使用多进程快速复制文件夹文件到目标文件夹
2025-10-11
usefulScripts
00
python
展开代码
#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 多进程文件快速复制工具 支持递归复制目录结构,使用多进程提高复制速度 """ import os import shutil import argparse from multiprocessing import Pool, cpu_count from pathlib import Path import time from typing import List, Tuple def get_all_files(source_dir: str) -> List[Tuple[str, str]]: """ 递归获取源目录下所有文件的路径对 返回: [(源文件路径, 相对路径), ...] """ file_pairs = [] source_path = Path(source_dir) for root, dirs, files in os.walk(source_dir): for file in files: source_file = os.path.join(root, file) # 计算相对路径 relative_path = os.path.relpath(source_file, source_dir) file_pairs.append((source_file, relative_path)) return file_pairs def copy_single_file(args: Tuple[str, str, str]) -> Tuple[bool, str, str]: """ 复制单个文件 args: (源文件路径, 相对路径, 目标根目录) 返回: (是否成功, 源文件路径, 错误信息) """ source_file, relative_path, dest_root = args try: # 构建目标文件路径 dest_file = os.path.join(dest_root, relative_path) # 确保目标目录存在 dest_dir = os.path.dirname(dest_file) os.makedirs(dest_dir, exist_ok=True) # 复制文件 shutil.copy2(source_file, dest_file) return True, source_file, "" except Exception as e: return False, source_file, str(e) def fast_copy(source_dir: str, dest_dir: str, num_processes: int = 300): """ 多进程快速复制目录 Args: source_dir: 源目录路径 dest_dir: 目标目录路径 num_processes: 进程数量,默认300 """ print(f"开始扫描源目录: {source_dir}") start_time = time.time() # 获取所有文件路径 file_pairs = get_all_files(source_dir) scan_time = time.time() - start_time print(f"扫描完成,共找到 {len(file_pairs)} 个文件,耗时 {scan_time:.2f} 秒") if not file_pairs: print("源目录中没有找到文件") return # 确保目标根目录存在 os.makedirs(dest_dir, exist_ok=True) # 准备多进程参数 copy_args = [(source_file, relative_path, dest_dir) for source_file, relative_path in file_pairs] print(f"开始使用 {num_processes} 个进程复制文件...") copy_start_time = time.time() # 使用多进程复制文件 success_count = 0 error_count = 0 with Pool(processes=num_processes) as pool: results = pool.map(copy_single_file, copy_args) for success, source_file, error_msg in results: if success: success_count += 1 else: error_count += 1 print(f"复制失败: {source_file} - {error_msg}") copy_time = time.time() - copy_start_time total_time = time.time() - start_time print(f"\n复制完成!") print(f"成功复制: {success_count} 个文件") print(f"复制失败: {error_count} 个文件") print(f"复制耗时: {copy_time:.2f} 秒") print(f"总耗时: {total_time:.2f} 秒") print(f"平均速度: {success_count/copy_time:.2f} 文件/秒") def main(): parser = argparse.ArgumentParser(description='多进程文件快速复制工具') parser.add_argument('source', help='源目录路径') parser.add_argument('dest', help='目标目录路径') parser.add_argument('-p', '--processes', type=int, default=300, help='进程数量 (默认: 300)') args = parser.parse_args() # 检查源目录是否存在 if not os.path.exists(args.source): print(f"错误: 源目录不存在: {args.source}") return if not os.path.isdir(args.source): print(f"错误: 源路径不是目录: {args.source}") return # 调整进程数量 max_processes = min(args.processes, cpu_count() * 4) # 限制最大进程数 if args.processes != max_processes: print(f"进程数量调整为: {max_processes} (原设置: {args.processes})") print(f"源目录: {args.source}") print(f"目标目录: {args.dest}") print(f"进程数量: {max_processes}") print("-" * 50) fast_copy(args.source, args.dest, max_processes) if __name__ == '__main__': main()
如果对你有用的话,可以打赏哦
打赏
ali pay
wechat pay

本文作者:Dong

本文链接:

版权声明:本博客所有文章除特别声明外,均采用 CC BY-NC。本作品采用《知识共享署名-非商业性使用 4.0 国际许可协议》进行许可。您可以在非商业用途下自由转载和修改,但必须注明出处并提供原作者链接。 许可协议。转载请注明出处!