DeepSpeed分布式训练最佳实践:ShareGPT数据集加载方法
摘要
大规模分布式训练中,使用ShareGPT对话数据集配合DeepSpeed进行多卡训练,最棘手的痛点在于
大规模分布式训练中,使用ShareGPT对话数据集配合DeepSpeed进行多卡训练,最棘手的痛点在于数据分片未对齐——即便每个GPU均已加载数据,训练时却常出现样本重复、进程间数据错乱,甚至个别GPU因无数据而闲置。根本原因通常是数据集索引机制未适配DistributedSampler的切分逻辑,或是变长文本带来的动态性未被妥善处理。以下分享三种经过工业验证的封装策略,覆盖不同数据规模与组织形式。

首先明确一个判断标准:若你的ShareGPT数据总量在几十GB级别,且可一次性加载至内存,最稳妥的方式是借助HuggingFace Datasets的原生接口,配合PyTorch的DistributedSampler实现均匀分割。无需额外自定义扩展。
一、基于HuggingFace Datasets与DistributedSampler实现数据切分
此方案核心在于将ShareGPT JSONL文件转换为Dataset对象,并通过torch.utils.data.DistributedSampler实现进程间互斥切片,同时保留原始样本顺序。具体流程包含六个关键步骤:
1. 安装必要依赖:pip install datasets torch transformers,这是标准化操作。
2. 加载原始数据并构建Dataset对象:dataset = load_dataset("json", data_files={"train": "sharegpt_clean.jsonl"}, split="train")。确保data_files路径指向你的实际文件。
3. 定义预处理函数,将对话结构归一化为连续文本格式:def format_sharegpt(example): return {"text": "".join([f"### {msg['from']}: {msg['value']}" for msg in example["conversations"]])}。分隔符可根据业务需求调整,例如改用\n换行。
4. 执行映射与分词操作:tokenized_ds = dataset.map(format_sharegpt).map(lambda x: tokenizer(x["text"], truncation=True, max_length=2048), batched=True)。truncation与max_length参数需严格对齐模型上下文窗口,避免超出支持范围。
5. 初始化DistributedSampler:sampler = DistributedSampler(tokenized_ds, shuffle=True, drop_last=True)。shuffle开关依训练需求而定,但建议开启drop_last,防止最后一个批次样本数不一致引发的梯度同步异常。
6. 构建DataLoader:dataloader = DataLoader(tokenized_ds, batch_size=4, sampler=sampler, num_workers=4)。num_workers根据CPU核心数适度调整,推荐不超过16。
该方案运行后,每个GPU获取的样本确定互斥,且顺序与原始数据一致,适合调试与结果复现。
二、自定义IterableDataset配合DeepSpeed数据并行配置
当ShareGPT数据量达到数百GB甚至TB级,且已按日期或ID拆分为多个JSONL文件时,一次性全量加载不再现实。此时需采用流式读取,结合rank感知的路径选择,构建无状态且支持断点续训的数据供给。
具体实现通过继承torch.utils.data.IterableDataset并重写__iter__方法。关键操作如下:
1. 定义类时传入rank与world_size,筛选当前进程负责的文件:class ShareGPTIterableDataset(IterableDataset): def __init__(self, file_list, rank, world_size): self.file_list = [f for i, f in enumerate(file_list) if i % world_size == rank]。通过取模运算均匀分配文件,确保各进程仅读取自身归属部分。
2. 在__iter__中按行解析JSONL并逐条生成样本:for file_path in self.file_list: with open(file_path) as f: for line in f: yield json.loads(line)。此处避免过多预处理,保持轻量。
3. 实例化数据集时传入当前进程的rank与world_size:ds = ShareGPTIterableDataset(glob.glob("sharegpt_*.jsonl"), dist.get_rank(), dist.get_world_size())。glob匹配所有分片文件。
4. 由于IterableDataset无法搭配DistributedSampler,需直接构建DataLoader并禁用自动采样:dataloader = DataLoader(ds, batch_size=2, num_workers=2)。num_workers不宜过大,以免引发文件句柄冲突。
5. 在DeepSpeed配置中显式关闭可能干扰数据流的自动采样功能,否则DeepSpeed会尝试为DataLoader附加Sampler,与IterableDataset冲突:"data_efficiency": {"enabled": false}。该字段位于ZeRO优化对应的JSON配置文件内。
此方案内存占用极低,且天然支持断点续训——只需在保存checkpoint时记录各进程当前读取的行号或文件偏移量即可。
三、基于DeepSpeed的DataLoader Hook注入分片逻辑
第三种方案直接作用于DeepSpeed初始化阶段,绕过PyTorch原生采样器,注入rank专属的数据路径与偏移量。适用于已预分片且需严格管控每张GPU token吞吐量的场景,例如配合梯度累积进行精细调参。
具体操作步骤:
1. 手动或通过脚本将ShareGPT原始文件按world_size切分为独立文件:split -l 50000 sharegpt_full.jsonl sharegpt_part_。分片行数(如50000)可根据实际显存动态调整。
2. 在init_process_group完成后,依据当前rank定位对应分片文件:part_file = f"sharegpt_part_{dist.get_rank():02d}"。注意补零以确保排序正确。
3. 使用HuggingFace的load_dataset加载单个分片:local_ds = load_dataset("json", data_files=part_file, split="train")。每个进程仅读取一个文件,彻底杜绝数据交叉。
4. 调用deepspeed.initialize时通过training_data参数传入本地数据集:model_engine, optimizer, _, _ = deepspeed.initialize(model=model, training_data=local_ds, ...)。注意该参数在较新版本DeepSpeed中可能更名为train_dataset,需参照官方文档。
5. 确保DeepSpeed配置中禁用可能干扰数据流的选项,如partition_activations与stage3_gather_16bit_weights_on_model_save。推荐的安全配置为:{"zero_optimization": {"stage": 2}}。若使用stage3,需额外关注数据分片与激活检查点的交互。
此方法灵活性高,但需手动管理分片文件,适合数据预处理已标准化且流程稳定的团队。若数据源为动态生成且每轮训练分片可能变化,建议优先采用前两种方案。
来源:互联网
本网站新闻资讯均来自公开渠道,力求准确但不保证绝对无误,内容观点仅代表作者本人,与本站无关。若涉及侵权,请联系我们处理。本站保留对声明的修改权,最终解释权归本站所有。