python展开代码import requests
import json
import os
from tqdm import tqdm
import time
class FishAudioTTS:
def __init__(self, api_key, reference_id):
"""
初始化Fish Audio TTS客户端
Args:
api_key (str): Fish Audio API密钥
reference_id (str): 参考音频ID
"""
self.api_key = api_key
self.reference_id = reference_id
self.base_url = "https://api.fish.audio/v1/tts"
self.headers = {
'Authorization': f'Bearer {api_key}',
'Content-Type': 'application/json',
'model': 's1' # 使用s1模型
}
def text_to_speech(self, text, output_file="output.wav", **kwargs):
"""
将文本转换为语音
Args:
text (str): 要转换的文本
output_file (str): 输出音频文件名
**kwargs: 其他API参数
"""
# 默认参数
default_params = {
"text": text,
"reference_id": self.reference_id, # 使用参考音频ID
"temperature": 0.7,
"top_p": 0.7,
"chunk_length": 200,
"normalize": True,
"format": "wav",
"sample_rate": 44100,
"latency": "normal"
}
# 合并用户提供的参数
params = {**default_params, **kwargs}
print(f"🎵 开始文本转语音...")
print(f"📝 文本内容: {text[:50]}{'...' if len(text) > 50 else ''}")
print(f"🎯 参考音频ID: {self.reference_id}")
print(f"🤖 使用模型: s1")
try:
# 发送请求
with tqdm(desc="🚀 发送请求到Fish Audio API", unit="req") as pbar:
response = requests.post(
self.base_url,
headers=self.headers,
json=params,
timeout=60
)
pbar.update(1)
# 检查响应状态
if response.status_code == 200:
print("✅ API请求成功!")
# 保存音频文件
with tqdm(desc="💾 保存音频文件", unit="B", unit_scale=True) as pbar:
with open(output_file, "wb") as f:
for chunk in response.iter_content(chunk_size=1024):
if chunk:
f.write(chunk)
pbar.update(len(chunk))
print(f"🎉 音频文件已保存到: {output_file}")
print(f"📊 文件大小: {os.path.getsize(output_file)} 字节")
return True
elif response.status_code == 401:
print("❌ 认证失败: API密钥无效")
return False
elif response.status_code == 402:
print("❌ 支付失败: 账户余额不足")
return False
elif response.status_code == 422:
print("❌ 请求参数错误")
try:
error_info = response.json()
print(f"错误详情: {error_info}")
except:
print(f"响应内容: {response.text}")
return False
else:
print(f"❌ 请求失败: HTTP {response.status_code}")
print(f"响应内容: {response.text}")
return False
except requests.exceptions.Timeout:
print("❌ 请求超时,请检查网络连接")
return False
except requests.exceptions.RequestException as e:
print(f"❌ 网络请求错误: {e}")
return False
except Exception as e:
print(f"❌ 未知错误: {e}")
return False
def main():
"""主函数 - 演示Fish Audio TTS的使用"""
# 配置信息
API_KEY = "xx"
REFERENCE_ID = "ef45348243c74d0d83ceb0f2f90c2d38" # 参考音频ID
# 创建TTS客户端
tts_client = FishAudioTTS(API_KEY, REFERENCE_ID)
# 测试文本
test_text = "你好,这是一个使用Fish Audio API的文本转语音测试。今天天气真不错,希望你能喜欢这个声音效果!"
print("=" * 60)
print("🎤 Fish Audio 文本转语音测试")
print("=" * 60)
# 执行文本转语音
success = tts_client.text_to_speech(
text=test_text,
output_file="fish_audio_output.wav",
# 可以自定义参数
temperature=0.8,
top_p=0.8,
format="wav",
sample_rate=44100
)
if success:
print("\n🎉 文本转语音完成!")
print(f"📁 输出文件: fish_audio_output.wav")
else:
print("\n❌ 文本转语音失败,请检查错误信息")
if __name__ == "__main__":
main()
GPT-SoVITS v4出来好久了,使用一下。
代码
bash展开代码git cl
服务:
bash展开代码python -m vllm.entrypoints.openai.api
关键点: 这个测试会让GPU"吃饱",充分利用显卡算力,最能反映显卡的计算性能差异。
找到带 launcher.py 参数的 python 进程 全部杀死
js展开代码sudo pkill -9 -f launcher.py
下载模型
python展开代码curl -s https://packagecloud.io/install/repositories/github/git-lfs/script.deb.sh | bash
apt-get install git-lfs
git clone https://huggingface.co/Qwen/Qwen2.5-3B-Instruct
模型路径:
bash展开代码/mnt/jfs6/model/Qwen2.5-3B-Instruct
根据搜索结果,启动 GGUF 文件为 OpenAI 式接口主要有以下几种方法:
这是最常用的方法,llama-cpp-python 提供了一个网络服务器,可以作为 OpenAI API 的直接替代品。
安装和启动:
bash展开代码# 安装服务器版本
pip install 'llama-cpp-python[server]' # 无gpu支持
# 或者安装GPU支持的
# NVIDIA GPU (CUDA)
CMAKE_ARGS="-DGGML_CUDA=on" pip install llama-cpp-python[server]
# 启动服务器
python3 -m llama_cpp.server \
--model /mnt/jfs6/model/Satyr-V0.1-4B/Satyr-V0.1-4B-F16.gguf \
--host 0.0.0.0 \
--port 8000 \
--n_ctx 10240 \
--n_gpu_layers -1 # 全部层都放gpu
比如这个:
bash展开代码https://huggingface.co/PantheonUnbound/Satyr-V0.1-4B/blob/main/Satyr-V0.1-4B-F16.gguf
使用resolve就可以下载:
bash展开代码wget https://huggingface.co/PantheonUnbound/Satyr-V0.1-4B/resolve/main/Satyr-V0.1-4B-F16.gguf
python展开代码#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
多进程文件快速复制工具
支持递归复制目录结构,使用多进程提高复制速度
"""
import os
import shutil
import argparse
from multiprocessing import Pool, cpu_count
from pathlib import Path
import time
from typing import List, Tuple
def get_all_files(source_dir: str) -> List[Tuple[str, str]]:
"""
递归获取源目录下所有文件的路径对
返回: [(源文件路径, 相对路径), ...]
"""
file_pairs = []
source_path = Path(source_dir)
for root, dirs, files in os.walk(source_dir):
for file in files:
source_file = os.path.join(root, file)
# 计算相对路径
relative_path = os.path.relpath(source_file, source_dir)
file_pairs.append((source_file, relative_path))
return file_pairs
def copy_single_file(args: Tuple[str, str, str]) -> Tuple[bool, str, str]:
"""
复制单个文件
args: (源文件路径, 相对路径, 目标根目录)
返回: (是否成功, 源文件路径, 错误信息)
"""
source_file, relative_path, dest_root = args
try:
# 构建目标文件路径
dest_file = os.path.join(dest_root, relative_path)
# 确保目标目录存在
dest_dir = os.path.dirname(dest_file)
os.makedirs(dest_dir, exist_ok=True)
# 复制文件
shutil.copy2(source_file, dest_file)
return True, source_file, ""
except Exception as e:
return False, source_file, str(e)
def fast_copy(source_dir: str, dest_dir: str, num_processes: int = 300):
"""
多进程快速复制目录
Args:
source_dir: 源目录路径
dest_dir: 目标目录路径
num_processes: 进程数量,默认300
"""
print(f"开始扫描源目录: {source_dir}")
start_time = time.time()
# 获取所有文件路径
file_pairs = get_all_files(source_dir)
scan_time = time.time() - start_time
print(f"扫描完成,共找到 {len(file_pairs)} 个文件,耗时 {scan_time:.2f} 秒")
if not file_pairs:
print("源目录中没有找到文件")
return
# 确保目标根目录存在
os.makedirs(dest_dir, exist_ok=True)
# 准备多进程参数
copy_args = [(source_file, relative_path, dest_dir)
for source_file, relative_path in file_pairs]
print(f"开始使用 {num_processes} 个进程复制文件...")
copy_start_time = time.time()
# 使用多进程复制文件
success_count = 0
error_count = 0
with Pool(processes=num_processes) as pool:
results = pool.map(copy_single_file, copy_args)
for success, source_file, error_msg in results:
if success:
success_count += 1
else:
error_count += 1
print(f"复制失败: {source_file} - {error_msg}")
copy_time = time.time() - copy_start_time
total_time = time.time() - start_time
print(f"\n复制完成!")
print(f"成功复制: {success_count} 个文件")
print(f"复制失败: {error_count} 个文件")
print(f"复制耗时: {copy_time:.2f} 秒")
print(f"总耗时: {total_time:.2f} 秒")
print(f"平均速度: {success_count/copy_time:.2f} 文件/秒")
def main():
parser = argparse.ArgumentParser(description='多进程文件快速复制工具')
parser.add_argument('source', help='源目录路径')
parser.add_argument('dest', help='目标目录路径')
parser.add_argument('-p', '--processes', type=int, default=300,
help='进程数量 (默认: 300)')
args = parser.parse_args()
# 检查源目录是否存在
if not os.path.exists(args.source):
print(f"错误: 源目录不存在: {args.source}")
return
if not os.path.isdir(args.source):
print(f"错误: 源路径不是目录: {args.source}")
return
# 调整进程数量
max_processes = min(args.processes, cpu_count() * 4) # 限制最大进程数
if args.processes != max_processes:
print(f"进程数量调整为: {max_processes} (原设置: {args.processes})")
print(f"源目录: {args.source}")
print(f"目标目录: {args.dest}")
print(f"进程数量: {max_processes}")
print("-" * 50)
fast_copy(args.source, args.dest, max_processes)
if __name__ == '__main__':
main()