python展开代码#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
多进程文件快速复制工具
支持递归复制目录结构,使用多进程提高复制速度
"""
import os
import shutil
import argparse
from multiprocessing import Pool, cpu_count
from pathlib import Path
import time
from typing import List, Tuple
def get_all_files(source_dir: str) -> List[Tuple[str, str]]:
"""
递归获取源目录下所有文件的路径对
返回: [(源文件路径, 相对路径), ...]
"""
file_pairs = []
source_path = Path(source_dir)
for root, dirs, files in os.walk(source_dir):
for file in files:
source_file = os.path.join(root, file)
# 计算相对路径
relative_path = os.path.relpath(source_file, source_dir)
file_pairs.append((source_file, relative_path))
return file_pairs
def copy_single_file(args: Tuple[str, str, str]) -> Tuple[bool, str, str]:
"""
复制单个文件
args: (源文件路径, 相对路径, 目标根目录)
返回: (是否成功, 源文件路径, 错误信息)
"""
source_file, relative_path, dest_root = args
try:
# 构建目标文件路径
dest_file = os.path.join(dest_root, relative_path)
# 确保目标目录存在
dest_dir = os.path.dirname(dest_file)
os.makedirs(dest_dir, exist_ok=True)
# 复制文件
shutil.copy2(source_file, dest_file)
return True, source_file, ""
except Exception as e:
return False, source_file, str(e)
def fast_copy(source_dir: str, dest_dir: str, num_processes: int = 300):
"""
多进程快速复制目录
Args:
source_dir: 源目录路径
dest_dir: 目标目录路径
num_processes: 进程数量,默认300
"""
print(f"开始扫描源目录: {source_dir}")
start_time = time.time()
# 获取所有文件路径
file_pairs = get_all_files(source_dir)
scan_time = time.time() - start_time
print(f"扫描完成,共找到 {len(file_pairs)} 个文件,耗时 {scan_time:.2f} 秒")
if not file_pairs:
print("源目录中没有找到文件")
return
# 确保目标根目录存在
os.makedirs(dest_dir, exist_ok=True)
# 准备多进程参数
copy_args = [(source_file, relative_path, dest_dir)
for source_file, relative_path in file_pairs]
print(f"开始使用 {num_processes} 个进程复制文件...")
copy_start_time = time.time()
# 使用多进程复制文件
success_count = 0
error_count = 0
with Pool(processes=num_processes) as pool:
results = pool.map(copy_single_file, copy_args)
for success, source_file, error_msg in results:
if success:
success_count += 1
else:
error_count += 1
print(f"复制失败: {source_file} - {error_msg}")
copy_time = time.time() - copy_start_time
total_time = time.time() - start_time
print(f"\n复制完成!")
print(f"成功复制: {success_count} 个文件")
print(f"复制失败: {error_count} 个文件")
print(f"复制耗时: {copy_time:.2f} 秒")
print(f"总耗时: {total_time:.2f} 秒")
print(f"平均速度: {success_count/copy_time:.2f} 文件/秒")
def main():
parser = argparse.ArgumentParser(description='多进程文件快速复制工具')
parser.add_argument('source', help='源目录路径')
parser.add_argument('dest', help='目标目录路径')
parser.add_argument('-p', '--processes', type=int, default=300,
help='进程数量 (默认: 300)')
args = parser.parse_args()
# 检查源目录是否存在
if not os.path.exists(args.source):
print(f"错误: 源目录不存在: {args.source}")
return
if not os.path.isdir(args.source):
print(f"错误: 源路径不是目录: {args.source}")
return
# 调整进程数量
max_processes = min(args.processes, cpu_count() * 4) # 限制最大进程数
if args.processes != max_processes:
print(f"进程数量调整为: {max_processes} (原设置: {args.processes})")
print(f"源目录: {args.source}")
print(f"目标目录: {args.dest}")
print(f"进程数量: {max_processes}")
print("-" * 50)
fast_copy(args.source, args.dest, max_processes)
if __name__ == '__main__':
main()
本文作者:Dong
本文链接:
版权声明:本博客所有文章除特别声明外,均采用 CC BY-NC。本作品采用《知识共享署名-非商业性使用 4.0 国际许可协议》进行许可。您可以在非商业用途下自由转载和修改,但必须注明出处并提供原作者链接。 许可协议。转载请注明出处!