python展开代码import json
import random
from concurrent.futures import ProcessPoolExecutor, as_completed
from tqdm import tqdm
import requests
def get_api_response(query, headers, url, model, prompt_guanfang, style_zhongwen):
    """
    模拟API请求,发送一个POST请求并返回响应。
    """
    payload = {
        "query": query,
        "model": model,
        "prompt_guanfang": prompt_guanfang,
        "style_zhongwen": style_zhongwen
    }
    try:
        response = requests.post(url, json=payload, headers=headers)
        return response.json()  # 假设API返回JSON数据
    except requests.exceptions.RequestException as e:
        return {"error": str(e)}
def worker(args):
    """
    Worker函数,用于多进程执行。
    """
    query, url, headers, model, prompt_guanfang, style_zhongwen = args
    return get_api_response(query, headers, url, model, prompt_guanfang, style_zhongwen)
if __name__ == '__main__':
    # 定义所有可用的API端口
    ports = list(range(8001, 8009))  # 8001到8008
    urls = [f"http://127.0.0.1:{port}/v1/chat/completions" for port in ports]
    headers = {
        "Content-Type": "application/json",
    }
    model = "gpt-4o-mini"  # 可以根据需要调整
    # 读取 JSON 文件内容(这里假设风格和查询数据以JSON文件存储)
    with open('风格翻译.json', 'r', encoding='utf-8') as json_file:
        style_dict = json.load(json_file)
    with open('用户原始输入.json', 'r', encoding='utf-8') as json_file:
        user_in = json.load(json_file)
    # 假设load_queries函数返回所有查询的列表
    def load_queries(user_in):
        return [entry["query"] for entry in user_in]
    queries = load_queries(user_in)
    queries = random.sample(queries, 200)  # 随机选择200条查询进行处理
    # 初始化结果字典
    responses = []
    # 准备多进程池
    num_processes = len(urls)  # 进程数等于可用URL数量
    with ProcessPoolExecutor(max_workers=num_processes) as executor:
        # 准备任务列表,将每个查询分配到一个URL(轮询分配)
        tasks = []
        idx_url = 0
        for idx, query in enumerate(queries):
            for key in style_dict:
                prompt_guanfang = style_dict[key]["prompt_guanfang"]
                style_zhongwen = style_dict[key]["style_zhongwen"]
                url = urls[idx_url % len(urls)]  # 轮询分配URL
                idx_url += 1
                tasks.append((query, url, headers, model, prompt_guanfang, style_zhongwen))
        # 使用tqdm显示进度条
        futures = {executor.submit(worker, task): task for task in tasks}
        for future in tqdm(as_completed(futures), total=len(futures), desc="处理查询"):
            response = future.result()
            responses.append(response)
    # 保存所有响应到JSON文件
    with open('result.json', 'w', encoding='utf-8') as json_file:
        json.dump(responses, json_file, ensure_ascii=False, indent=4)
    print("所有API请求处理完毕,结果已保存至result.json")


本文作者:Dong
本文链接:
版权声明:本博客所有文章除特别声明外,均采用 CC BY-NC。本作品采用《知识共享署名-非商业性使用 4.0 国际许可协议》进行许可。您可以在非商业用途下自由转载和修改,但必须注明出处并提供原作者链接。 许可协议。转载请注明出处!