Sample file: laion_aesthetics_1024_33M_9.parquet

I downloaded a sample file, laion_aesthetics_1024_33M_9.parquet, and placed it in /ssd/xiedong/parquet_test. Next, I will download the data it references by launching a Docker container.
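Before wiring anything up, it can help to confirm the file's columns (the download script below expects URL and TEXT columns). A quick check, assuming pandas and pyarrow are available on the host:

```bash
# Peek at the sample file's row count and column names
python -c "import pandas as pd; \
df = pd.read_parquet('/ssd/xiedong/parquet_test/laion_aesthetics_1024_33M_9.parquet'); \
print(df.shape); print(df.columns.tolist())"
```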
The core idea: each container is paired with one configuration file that defines its download settings (input and output file locations, and so on). On startup the container runs its task according to that configuration, then shuts down automatically when the task completes.
First, build an image with the required environment:
```bash
docker run -it -v /ssd/xiedong/parquet_test:/ssd/xiedong/parquet_test buluma/python:3.10.4-ubuntu22.04 bash
pip install img2dataset
apt-get clean && rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/*
pip cache purge
docker commit e768addf27ec kevinchina/deeplearning:ubuntu22.python3.10.img2dataset
```
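Before relying on the committed image, a quick smoke test confirms that img2dataset imports cleanly (a sanity check I'd run, not part of the original flow):

```bash
# The committed image should be able to import the download entry point
docker run --rm kevinchina/deeplearning:ubuntu22.python3.10.img2dataset \
    python -c "from img2dataset import download; print('img2dataset OK')"
```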
download_parquet.py

This script downloads the Parquet data according to the configuration file, then retries the failed downloads:
```python
import json
import multiprocessing
import os
import time

import pandas as pd
from img2dataset import download

def prepare_retry_download(url_list_dir, output_folder):
    # Build a parquet file of the entries that failed to download,
    # so they can be retried in a second pass.
    src_in = os.path.dirname(url_list_dir)
    src_in = os.path.join(src_in, "retry_download")
    os.makedirs(src_in, exist_ok=True)
    parquet_file = os.path.join(src_in, "retry_download.parquet")
    if os.path.exists(parquet_file):
        print("retry_download.parquet already exists")
        return parquet_file
    
    # Collect the per-shard parquet files that img2dataset wrote to the output folder
    files = [f for f in os.listdir(output_folder) if f.endswith(".parquet")]
    files_abs = sorted([os.path.join(output_folder, f) for f in files])
    df = pd.read_parquet(files_abs[0], engine='pyarrow')
    df = df[df['status'] == "failed_to_download"]

    # Merge failure records from up to the first 1000 shards; only failures
    # from shards with a success rate at or below 60% are collected
    for file in files_abs[1:1000]:
        file_b = os.path.splitext(os.path.basename(file))[0] + "_stats.json"
        stats_file = os.path.join(output_folder, file_b)
        if not os.path.exists(stats_file):
            continue
        with open(stats_file, "r") as f:
            data = json.load(f)
        if data["successes"] / data["count"] > 0.60:
            continue
        df_tmp = pd.read_parquet(file, engine='pyarrow')
        df = pd.concat([df, df_tmp[df_tmp['status'] == "failed_to_download"]], axis=0)
    
    # Drop errors that are not worth retrying (permanent HTTP failures, etc.)
    exclude_errors = ["Max retries exceeded with url", "HTTP Error 400", "HTTP Error 403", "HTTP Error 404",
                      "HTTP Error 415", "HTTP Error 500", "HTTP Error 501", "HTTP Error 502", "HTTP Error 503"]
    for e in exclude_errors:
        df = df[~df['error_message'].str.contains(e, na=False)]

    # Keep only the columns img2dataset needs, renamed to match the input schema
    df = df.rename(columns={"caption": "TEXT", "url": "URL", "key": "KEY"})[["TEXT", "URL", "KEY"]]
    df.to_parquet(parquet_file, engine='pyarrow')
    return parquet_file

def wait_for_env_variable(env_var_name, check_interval=5):
    # Block until the given environment variable is set, polling every few seconds
    while True:
        env_var_value = os.getenv(env_var_name)
        if env_var_value:
            print(f"Found environment variable '{env_var_name}': {env_var_value}")
            return env_var_value
        else:
            print(f"Waiting for environment variable '{env_var_name}' to be set...")
            time.sleep(check_interval)

if __name__ == '__main__':
    # Wait for the config-file path to be passed in via the PFVAR environment variable
    env_var_value = wait_for_env_variable("PFVAR")

    # Read the configuration file
    with open(env_var_value, 'r', encoding='utf-8') as f:
        config = json.load(f)

    # Download parameters
    url_list = config["url_list"]
    output_folder = config["output_folder"]
    thread_count = config.get("thread_count", 16)
    timeout = config.get("timeout", 120)
    number_sample_per_shard = config.get("number_sample_per_shard", 2000)
    print("Concurrent requests:", multiprocessing.cpu_count() * thread_count)
    download(
        processes_count=multiprocessing.cpu_count(),
        thread_count=thread_count,
        url_list=url_list,
        output_folder=output_folder,
        image_size=1024,
        resize_mode="no",
        skip_reencode=True,
        output_format="webdataset",
        input_format="parquet",
        url_col="URL",
        caption_col="TEXT",
        enable_wandb=False,
        number_sample_per_shard=number_sample_per_shard,
        distributor="multiprocessing",
        min_image_size=128,
        max_aspect_ratio=5,
        incremental_mode="incremental",
        timeout=timeout,
    )
    # Build the retry list and run a second download pass over it
    parquet_file = prepare_retry_download(url_list, output_folder)
    retry_output_folder = os.path.join(output_folder, "_retry_download")
    download(
        processes_count=multiprocessing.cpu_count(),
        thread_count=thread_count,
        url_list=parquet_file,
        output_folder=retry_output_folder,
        image_size=1024,
        resize_mode="no",
        skip_reencode=True,
        output_format="webdataset",
        input_format="parquet",
        url_col="URL",
        caption_col="TEXT",
        save_additional_columns=["KEY"],
        enable_wandb=False,
        number_sample_per_shard=number_sample_per_shard,
        distributor="multiprocessing",
        min_image_size=128,
        max_aspect_ratio=5,
        incremental_mode="incremental",
        timeout=timeout,
    )
    print("重试成功率可能为0,说明第一次下载已经非常成功。重试时无需关注成功率。")
    print("程序执行完毕,无需重新运行。")
This Dockerfile wraps the script into an application image:

```dockerfile
FROM kevinchina/deeplearning:ubuntu22.python3.10.img2dataset
WORKDIR /app
COPY download_parquet.py .
ENTRYPOINT ["python", "download_parquet.py"]
```
Build the application image:

```bash
docker build -t kevinchina/deeplearning:ubuntu22.python3.10.img2dataset.app .
```
config_part00059.json

The configuration file specifies the location of the Parquet file and the persistent storage directory for the downloaded data:
```json
{
  "url_list": "/ssd/xiedong/parquet_test/laion_aesthetics_1024_33M_9.parquet",
  "output_folder": "/ssd/xiedong/parquet_test/laion_aesthetics_1024_33M_9",
  "thread_count": 16,
  "timeout": 120,
  "number_sample_per_shard": 2000
}
```
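With many parquet parts, the config files can be generated rather than hand-written. A minimal sketch, assuming the parts follow the laion_aesthetics_1024_33M_*.parquet naming above (the config_<name>.json naming here is illustrative; my actual files used names like config_part00059.json):

```bash
# Emit one config file per parquet part, with a matching output folder
for f in /ssd/xiedong/parquet_test/laion_aesthetics_1024_33M_*.parquet; do
    name=$(basename "$f" .parquet)
    cat > "/ssd/xiedong/parquet_test/config_${name}.json" <<EOF
{
  "url_list": "$f",
  "output_folder": "/ssd/xiedong/parquet_test/${name}",
  "thread_count": 16,
  "timeout": 120,
  "number_sample_per_shard": 2000
}
EOF
done
```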
Finally, start the download task with:
```bash
docker run -d \
  -v /ssd/xiedong/parquet_test:/ssd/xiedong/parquet_test \
  -e PFVAR=/ssd/xiedong/parquet_test/config_part00059.json \
  kevinchina/deeplearning:ubuntu22.python3.10.img2dataset.app | xargs -I {} docker logs -f {}
```

`docker run -d` prints the new container's ID, which xargs feeds into `docker logs -f` so the task's output is followed immediately.
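Once the container is running, the per-shard *_stats.json files in the output folder track progress. The same successes and count fields the retry logic reads can be aggregated into an overall success rate, for example with jq (assuming it is installed):

```bash
# Overall success rate: sum of successes divided by sum of counts across all shards
jq -s '(map(.successes) | add) / (map(.count) | add)' \
  /ssd/xiedong/parquet_test/laion_aesthetics_1024_33M_9/*_stats.json
```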

