在现代计算中,利用多核CPU的能力来提高程序的性能已成为必不可少的技能。Python的multiprocessing模块提供了简便的接口来实现多进程并行计算。本文将通过一个实际的代码示例,详细介绍如何使用Python的多进程来分发和处理任务。
在处理大量任务时,单线程或单进程往往无法充分利用多核CPU的性能。通过多进程并行处理,可以显著提高程序的执行效率。下面,我们将介绍一个完整的代码示例,演示如何使用multiprocessing模块实现任务的并行处理。
首先,让我们先看一下代码的整体结构:
python展开代码import logging
import time
from multiprocessing import cpu_count, get_context
def process_task(task_queue, enable_trigger):
    # 处理任务的工作进程函数
    pass
def distribute_tasks(task_queue, batch_size, incremmental, task_size):
    # 分发任务到任务队列
    pass
def main():
    # 主函数,设置多进程环境并启动任务处理
    pass
if __name__ == '__main__':
    main()
代码主要包含以下几个部分:
logging、time和multiprocessing。process_task,用于处理从任务队列中获取的任务。distribute_tasks,用于将任务放入任务队列。main,设置多进程环境,启动工作进程,并分发任务。接下来,我们将逐步解析代码的每个部分。
python展开代码import logging
import time
from multiprocessing import cpu_count, get_context
logging:用于记录日志信息,方便调试和监控程序运行。time:用于记录时间,计算程序运行耗时。multiprocessing:核心模块,提供创建多进程所需的功能。process_taskpython展开代码def process_task(task_queue, enable_trigger):
    while True:
        task = task_queue.get()
        if task is None:
            # 接收到退出信号,结束循环
            task_queue.task_done()
            break
        # 处理任务(此处为占位符,实际逻辑需自行实现)
        logging.info(f"Processing task: {task}")
        time.sleep(0.1)  # 模拟处理时间
        task_queue.task_done()
功能说明:
task_queue中获取任务。None,表示需要结束进程,跳出循环。time.sleep(0.1)模拟处理时间)。task_queue.task_done(),表示任务已处理完毕。关键点:
while True循环,使进程持续运行,直到接收到退出信号。task_queue.get()从队列中获取任务,队列是进程间通信的主要方式。task_queue.task_done()通知队列,任务已完成。distribute_taskspython展开代码def distribute_tasks(task_queue, batch_size, incremmental, task_size):
    # 将任务分发到任务队列中
    for i in range(batch_size):
        task_queue.put(f"Task {i}")
    logging.info(f"Distributed {batch_size} tasks.")
功能说明:
task_queue中,供工作进程消费。batch_size决定了任务的数量。关键点:
task_queue.put()将任务放入队列。mainpython展开代码def main():
    start = time.time()
    max_workers = 0
    enable_trigger = False
    incremmental = True  # 假设这是一个布尔标志
    num_processor = int(max_workers) or cpu_count()
    task_size = num_processor * 3
    ctx = get_context('spawn')  # 使用'spawn'以兼容Windows
    batch_size = 1000
    logging.basicConfig(level=logging.INFO)
    logging.info(
        f"[start update candidate resume lang]: "
        f"use processors {num_processor}, "
        f"task_length {task_size}"
    )
    task_queue = ctx.JoinableQueue(maxsize=task_size)
    # 创建工作进程
    workers = []
    for _ in range(num_processor):
        p = ctx.Process(target=process_task, args=(task_queue, enable_trigger))
        p.start()
        workers.append(p)
    # 分发任务
    distribute_tasks(task_queue, batch_size, incremmental, task_size)
    # 等待所有任务完成
    task_queue.join()
    # 发送信号以停止工作进程
    for _ in workers:
        task_queue.put(None)
    # 等待所有工作进程结束
    for p in workers:
        p.join()
    logging.info(f"Finished processing in {time.time() - start} seconds.")
功能说明:
关键点详解:
python展开代码start = time.time()
max_workers = 0
enable_trigger = False
incremmental = True  # 假设这是一个布尔标志
start:记录开始时间,用于计算总耗时。max_workers:最大工作进程数,默认为0。enable_trigger和incremmental:控制流程的标志变量。python展开代码num_processor = int(max_workers) or cpu_count()
task_size = num_processor * 3
ctx = get_context('spawn')  # 使用'spawn'以兼容Windows
batch_size = 1000
num_processor:如果max_workers为0,则使用cpu_count()获取CPU核心数。task_size:任务队列的最大大小,设置为进程数的3倍。ctx:获取上下文,这里使用spawn以兼容Windows操作系统。batch_size:一次分发的任务数量。python展开代码logging.basicConfig(level=logging.INFO)
logging.info(
    f"[start update candidate resume lang]: "
    f"use processors {num_processor}, "
    f"task_length {task_size}"
)
INFO,用于输出信息。python展开代码task_queue = ctx.JoinableQueue(maxsize=task_size)
# 创建工作进程
workers = []
for _ in range(num_processor):
    p = ctx.Process(target=process_task, args=(task_queue, enable_trigger))
    p.start()
    workers.append(p)
JoinableQueue,指定最大大小maxsize。Process创建工作进程,目标函数为process_task,并传入参数。workers列表中。python展开代码distribute_tasks(task_queue, batch_size, incremmental, task_size)
distribute_tasks函数,将任务放入任务队列中。python展开代码# 等待所有任务完成
task_queue.join()
# 发送信号以停止工作进程
for _ in workers:
    task_queue.put(None)
# 等待所有工作进程结束
for p in workers:
    p.join()
task_queue.join():阻塞主进程,直到队列中的所有任务都被处理完毕。None,通知它们退出循环,结束进程。p.join()等待所有进程结束,确保资源被正确释放。python展开代码logging.info(f"Finished processing in {time.time() - start} seconds.")
python展开代码if __name__ == '__main__':
    main()
main()函数。通过上述代码,我们实现了一个简单而实用的多进程任务处理程序。关键点包括:
multiprocessing模块的Process和JoinableQueue来实现进程间通信和任务分发。get_context('spawn')来兼容不同的操作系统,特别是Windows。None),优雅地结束工作进程。task_queue.join()和task_queue.task_done()来同步任务的完成情况。num_processor和task_size等参数。python展开代码import logging
import time
from multiprocessing import cpu_count, get_context
def process_task(task_queue, enable_trigger):
    while True:
        task = task_queue.get()
        if task is None:
            # Signal to exit
            task_queue.task_done()
            break
        # Process the task (placeholder for actual processing logic)
        logging.info(f"Processing task: {task}")
        time.sleep(0.1)  # Simulate processing time
        task_queue.task_done()
def distribute_tasks(task_queue, batch_size, incremmental, task_size):
    # Distribute tasks into the task_queue
    for i in range(batch_size):
        task_queue.put(f"Task {i}")
    logging.info(f"Distributed {batch_size} tasks.")
def main():
    start = time.time()
    max_workers = 0
    enable_trigger = False
    incremmental = True  # Assuming this is a boolean flag
    num_processor = int(max_workers) or cpu_count()
    task_size = num_processor * 3
    ctx = get_context('spawn')  # Use 'spawn' for Windows compatibility
    batch_size = 1000
    logging.basicConfig(level=logging.INFO)
    logging.info(
        f"[start update candidate resume lang]: "
        f"use processors {num_processor}, "
        f"task_length {task_size}"
    )
    task_queue = ctx.JoinableQueue(maxsize=task_size)
    # Create worker processes
    workers = []
    for _ in range(num_processor):
        p = ctx.Process(target=process_task, args=(task_queue, enable_trigger))
        p.start()
        workers.append(p)
    # Corrected function call with missing comma
    distribute_tasks(task_queue, batch_size, incremmental, task_size)
    # Wait for all tasks to be processed
    task_queue.join()
    # Send a signal to stop the workers
    for _ in workers:
        task_queue.put(None)
    # Properly close the pool
    for p in workers:
        p.join()
    logging.info(f"Finished processing in {time.time() - start} seconds.")
if __name__ == '__main__':
    main()


本文作者:Dong
本文链接:
版权声明:本博客所有文章除特别声明外,均采用 CC BY-NC。本作品采用《知识共享署名-非商业性使用 4.0 国际许可协议》进行许可。您可以在非商业用途下自由转载和修改,但必须注明出处并提供原作者链接。 许可协议。转载请注明出处!