= TfmdDL(list(range(50)), bs=12, num_workers=2)
dl for i in range(4):
= DistributedDL(dl, i, 4)
dl1 list(dl1), (torch.arange(i*13, i*13+12)%50,torch.tensor([i*13+12])%50)) test_eq(
分布式训练
当使用多 GPU 时,您很可能希望使用分布式训练进行拟合(fit)。
示例用法可在以下位置找到
- 以脚本形式:examples/distrib.py
- 以及所有应用示例中,使用Notebook 启动器
- 在本 notebook 底部,查看更多
notebook_launcher
示例。
要使用分布式训练,只需三个步骤
- 在调用
learn.fit
之前添加with learn.distrib_ctx():
- 可以通过从命令行运行
accelerate config
来自行配置 Accelerate,或者运行
from accelerate.utils import write_basic_config
write_basic_config()
- 使用
accelerate launch scriptname.py ...args...
运行您的训练脚本
如果您使用 untar_data
,或可能在脚本中下载或解压数据或模型,您应该将该代码用 rank0_first
包裹起来。这会强制该步骤首先只在主进程上运行一次,然后其余进程并行运行。例如,代替
= untar_data(URLs.IMAGEWOOF_320) path
…您应该使用
= rank0_first(untar_data, URLs.IMAGEWOOF_320) path
如果需要,请参阅下方了解完整 API 和底层辅助函数的详细信息——但是请注意,除非您需要改变分布式训练的实现方式,否则您将不需要上述内容之外的任何东西。
并行
DataParallel.reset
DataParallel.reset ()
将必需的 reset
调用补丁到 DataParallel
中
ParallelTrainer
ParallelTrainer (device_ids)
自动将模型包裹在 DataParallel
中
Learner.to_parallel
Learner.to_parallel (device_ids=None)
向 Learner
添加 ParallelTrainer
回调函数
Learner.detach_parallel
Learner.detach_parallel ()
从 Learner 中移除 ParallelTrainer
回调函数
parallel_ctx
parallel_ctx (device_ids=None)
一个上下文管理器,用于使 Learner 适应数据并行模式训练。
分布式
辅助函数
DistributedDataParallel.reset
DistributedDataParallel.reset ()
将必需的 reset
调用补丁到 DistributedDataParallel
中
setup_distrib
setup_distrib (gpu=None)
设置此进程参与分布式训练
teardown_distrib
teardown_distrib ()
释放分布式训练资源
DataLoader
DistributedDL
DistributedDL (dl, rank=None, world_size=None, device=None)
一个 TfmdDL
,它将批次数据分割成等大小的块分给每个工作进程。
DistributedTrainer
DistributedTrainer (sync_bn=True, device_placement:bool=True, split_batches:bool=<object object at 0x7f5154a49380>, gradient_accumulation_steps:int=1, cpu:bool=False, dataloader_config:DataLoaderConfiguration|None=None, deepspeed_plugin:DeepSpeedPlugin|dict[str,DeepSpeedPl ugin]|None=None, fsdp_plugin:FullyShardedDataParallelPlugin|None=None, torch_tp_plugin:TorchTensorParallelPlugin|None=None, megatron_lm_plugin:MegatronLMPlugin|None=None, rng_types:list[str|RNGType]|None=None, project_dir:str|os.PathLike|None=None, project_config:ProjectConfiguration|None=None, gradie nt_accumulation_plugin:GradientAccumulationPlugin|Non e=None, kwargs_handlers:list[KwargsHandler]|None=None, dynamo_backend:DynamoBackend|str|None=None, dynamo_plugin:TorchDynamoPlugin|None=None, deepspeed_ plugins:DeepSpeedPlugin|dict[str,DeepSpeedPlugin]|Non e=None)
将 model
包裹在 DistributedDataParallel
中,将 dls
包裹在 DistributedDL
中
类型 | 默认值 | 详情 | |
---|---|---|---|
sync_bn | 布尔值 | True | 是否将所有批归一化替换为 nn.SyncBatchNorm |
device_placement | 布尔值 | True | |
split_batches | 布尔值 | <object object at 0x7f5154a49380> | |
gradient_accumulation_steps | int | 1 | |
cpu | 布尔值 | False | |
dataloader_config | DataLoaderConfiguration | None | None | |
deepspeed_plugin | DeepSpeedPlugin | dict[str, DeepSpeedPlugin] | None | None | |
fsdp_plugin | FullyShardedDataParallelPlugin | None | None | |
torch_tp_plugin | TorchTensorParallelPlugin | None | None | |
megatron_lm_plugin | MegatronLMPlugin | None | None | |
rng_types | list[str | RNGType] | None | None | |
project_dir | str | os.PathLike | None | None | |
project_config | ProjectConfiguration | None | None | |
gradient_accumulation_plugin | GradientAccumulationPlugin | None | None | |
kwargs_handlers | list[KwargsHandler] | None | None | |
dynamo_backend | DynamoBackend | str | None | None | |
dynamo_plugin | TorchDynamoPlugin | None | None | |
deepspeed_plugins | DeepSpeedPlugin | dict[str, DeepSpeedPlugin] | None | None |
Learner.to_distributed
Learner.to_distributed (sync_bn=True, device_placement:bool=True, split_batches:bool=<object object at 0x7f5154a49380>, gradient_accumulation_steps:int=1, cpu:bool=False, dataloader_config:DataLoaderConfi guration|None=None, deepspeed_plugin:DeepSpeedPlu gin|dict[str,DeepSpeedPlugin]|None=None, fsdp_plu gin:FullyShardedDataParallelPlugin|None=None, tor ch_tp_plugin:TorchTensorParallelPlugin|None=None, megatron_lm_plugin:MegatronLMPlugin|None=None, rng_types:list[str|RNGType]|None=None, project_dir:str|os.PathLike|None=None, project_config:ProjectConfiguration|None=None, gr adient_accumulation_plugin:GradientAccumulationPl ugin|None=None, kwargs_handlers:list[KwargsHandler]|None=None, dynamo_backend:DynamoBackend|str|None=None, dynamo_plugin:TorchDynamoPlugin|None=None, deepsp eed_plugins:DeepSpeedPlugin|dict[str,DeepSpeedPlu gin]|None=None)
向 Learner 添加 AcceleratedTrainer
并配置 Accelerator
类型 | 默认值 | 详情 | |
---|---|---|---|
sync_bn | 布尔值 | True | 是否将所有批归一化替换为 nn.SyncBatchNorm |
device_placement | 布尔值 | True | |
split_batches | 布尔值 | <object object at 0x7f5154a49380> | |
gradient_accumulation_steps | int | 1 | |
cpu | 布尔值 | False | |
dataloader_config | DataLoaderConfiguration | None | None | |
deepspeed_plugin | DeepSpeedPlugin | dict[str, DeepSpeedPlugin] | None | None | |
fsdp_plugin | FullyShardedDataParallelPlugin | None | None | |
torch_tp_plugin | TorchTensorParallelPlugin | None | None | |
megatron_lm_plugin | MegatronLMPlugin | None | None | |
rng_types | list[str | RNGType] | None | None | |
project_dir | str | os.PathLike | None | None | |
project_config | ProjectConfiguration | None | None | |
gradient_accumulation_plugin | GradientAccumulationPlugin | None | None | |
kwargs_handlers | list[KwargsHandler] | None | None | |
dynamo_backend | DynamoBackend | str | None | None | |
dynamo_plugin | TorchDynamoPlugin | None | None | |
deepspeed_plugins | DeepSpeedPlugin | dict[str, DeepSpeedPlugin] | None | None |
Learner.detach_distributed
Learner.detach_distributed ()
从 Learner 中移除 DistributedTrainer
distrib_ctx
上下文管理器
distrib_ctx
distrib_ctx (sync_bn=True, in_notebook=False, device_placement:bool=True, split_batches:bool=<object object at 0x7f5154a49380>, gradient_accumulation_steps:int=1, cpu:bool=False, dataloader_config:DataLoaderConfiguration|None=None, deepspe ed_plugin:DeepSpeedPlugin|dict[str,DeepSpeedPlugin]|None=Non e, fsdp_plugin:FullyShardedDataParallelPlugin|None=None, torch_tp_plugin:TorchTensorParallelPlugin|None=None, megatron_lm_plugin:MegatronLMPlugin|None=None, rng_types:list[str|RNGType]|None=None, project_dir:str|os.PathLike|None=None, project_config:ProjectConfiguration|None=None, gradient_accu mulation_plugin:GradientAccumulationPlugin|None=None, kwargs_handlers:list[KwargsHandler]|None=None, dynamo_backend:DynamoBackend|str|None=None, dynamo_plugin:TorchDynamoPlugin|None=None, deepspeed_plugins :DeepSpeedPlugin|dict[str,DeepSpeedPlugin]|None=None)
一个上下文管理器,用于使 Learner 适应分布式数据并行模式训练。
类型 | 默认值 | 详情 | |
---|---|---|---|
sync_bn | 布尔值 | True | 是否将所有批归一化替换为 nn.SyncBatchNorm |
in_notebook | 布尔值 | False | 是否从 notebook 启动 |
device_placement | 布尔值 | True | |
split_batches | 布尔值 | <object object at 0x7f5154a49380> | |
gradient_accumulation_steps | int | 1 | |
cpu | 布尔值 | False | |
dataloader_config | DataLoaderConfiguration | None | None | |
deepspeed_plugin | DeepSpeedPlugin | dict[str, DeepSpeedPlugin] | None | None | |
fsdp_plugin | FullyShardedDataParallelPlugin | None | None | |
torch_tp_plugin | TorchTensorParallelPlugin | None | None | |
megatron_lm_plugin | MegatronLMPlugin | None | None | |
rng_types | list[str | RNGType] | None | None | |
project_dir | str | os.PathLike | None | None | |
project_config | ProjectConfiguration | None | None | |
gradient_accumulation_plugin | GradientAccumulationPlugin | None | None | |
kwargs_handlers | list[KwargsHandler] | None | None | |
dynamo_backend | DynamoBackend | str | None | None | |
dynamo_plugin | TorchDynamoPlugin | None | None | |
deepspeed_plugins | DeepSpeedPlugin | dict[str, DeepSpeedPlugin] | None | None |
distrib_ctx
准备 Learner 以在分布式数据并行模式下训练。它假定脚本/代码将通过命令行使用 accelerate launch
运行,或通过 Accelerate 的 notebook_launcher
函数运行。它还假定 Accelerate 已经通过运行 write_basic_config()
或通过 CLI 调用 accelerate config
并回答提示来配置。
典型用法
with learn.distrib_ctx(): learn.fit(.....)
它将 DistributedTrainer
回调和 DistributedDL
数据加载器附加到 Learner,然后执行 learn.fit(.....)
。退出上下文后,它将移除 DistributedTrainer
和 DistributedDL
,并销毁任何本地创建的分布式进程组。尽管如此,进程仍然附着在 GPU 上。
rank0_first
rank0_first (func, *args, **kwargs)
首先在 Rank-0 进程中执行 func
,然后在其他 Rank 中并行执行。
rank0_first
首先在 rank-0 进程中调用 f()
,然后在其余进程中并行调用,在分布式训练模式下。在单进程、非分布式训练模式下,f()
仅按预期调用一次。
rank0_first()
的一个应用是使通过 untar_data
进行的新下载在由 python -m fastai.launch
启动的分布式训练脚本中安全。
path = untar_data(URLs.IMDB)
变为
path = rank0_first(lambda: untar_data(URLs.IMDB))
一些 Learner 工厂方法可能使用 untar_data
下载预训练模型
learn = text_classifier_learner(dls, AWD_LSTM, drop_mult=0.5, metrics=accuracy)
变为
learn = rank0_first(lambda: text_classifier_learner(dls, AWD_LSTM, drop_mult=0.5, metrics=accuracy))
否则,多个进程将同时下载并损坏数据。
Notebook 启动器
Accelerate 提供 notebook_launcher 功能,让您可以在保持使用 Jupyter Notebook 的同时,在分布式环境中进行训练!
首先,确保 Accelerate 已正确配置。您可以通过从命令行运行 accelerate config
来自行配置 Accelerate,或者在 notebook 的第一个单元格中运行来设置自动填充配置。
from accelerate.utils import write_basic_config
write_basic_config()
配置 Accelerate 后,要使用 notebook_launcher
功能,请将您的训练代码迁移到一个函数中,并将其传递给 notebook_launcher
,例如
---
from fastai.vision.all import *
from fastai.distributed import *
def train():
99, True)
set_seed(= untar_data(URLs.PETS)/'images'
path = ImageDataLoaders.from_name_func(
dls =0.2,
path, get_image_files(path), valid_pct=lambda x: x[0].isupper(), item_tfms=Resize(224))
label_func
= vision_learner(dls, resnet34, metrics=error_rate).to_fp16()
learn with learn.distrib_ctx(in_notebook=True):
1)
learn.fine_tune(---
from accelerate import notebook_launcher
=2)
notebook_launcher(train, num_processes---