管道并行

DeepSpeed v0.3 包含对管道并行的全新支持!管道并行通过将模型的层划分为可以并行处理的阶段,来提高深度学习训练的内存和计算效率。DeepSpeed 的训练引擎提供混合数据和管道并行,并且可以与模型并行(例如 Megatron-LM)进一步结合。下面展示了 3D 并行的示意图。我们最新的 结果 表明,这种 3D 并行使训练具有超过 **万亿** 个参数的模型成为可能。

3D parallelism in DeepSpeed

DeepSpeed 使用梯度累积来提取管道并行(如下所示)。每批训练数据被划分为可以由管道阶段并行处理的微批次。一旦一个阶段完成对微批次的正向传递,激活内存就会被传递到管道中的下一个阶段。类似地,当下一个阶段完成对微批次的反向传递时,关于激活的梯度会通过管道反向传递。每个反向传递都会在本地累积梯度。接下来,所有数据并行组都会并行执行梯度归约。最后,优化器会更新模型权重。

下面展示了 DeepSpeed 如何使用混合双路数据并行和两阶段管道并行训练一个包含八个微批次的批次。GPU 0 和 2 被安排在一个管道中,并且将交替执行正向 (F) 和反向 (B) 传递。然后,它们将与各自的数据并行对应部分 GPU 1 和 3 进行全归约 (AR) 梯度。最后,两个管道阶段会更新它们的模型权重。

Pipeline Schedule

管道并行的入门

DeepSpeed 努力加速简化管道并行训练的过程。本节通过准备 torchvisionAlexNet 模型,提供混合数据和管道并行训练的第一步。

表达管道模型

管道并行要求模型被表达为一系列层。在正向传递中,每层都会使用前一层的输出。事实上,不需要为管道并行模型指定 forward()!管道并行模型的正向传递隐式地采用以下形式

def forward(self, inputs):
    x = inputs
    for layer in self.layers:
        x = layer(x)
    return x

PyTorch 的 torch.nn.Sequential 是一个方便的容器,用于表达管道并行模型,并且可以在没有任何修改的情况下被 DeepSpeed 并行化

net = nn.Sequential(
    nn.Linear(in_features, hidden_dim),
    nn.ReLU(inplace=True),
    nn.Linear(hidden_dim, out_features)
)
from deepspeed.pipe import PipelineModule
net = PipelineModule(layers=net, num_stages=2)

PipelineModule 使用其 layers 参数作为组成模型的层序列。初始化后,net 被划分为两个管道阶段,并且它的层被移动到相应的 GPU。如果存在多个 GPU,DeepSpeed 也会使用混合数据并行。

注意: GPU 的总数必须能被管道阶段的数量整除。

注意: 对于大型模型训练,请参阅 内存高效的模型构建

AlexNet

让我们来看一个简化的 torchvisionAlexNet 实现

class AlexNet(nn.Module):
    def __init__(self, num_classes=1000):
        super(AlexNet, self).__init__()
        self.features = nn.Sequential(
            nn.Conv2d(3, 64, kernel_size=11, stride=4, padding=2),
            ...
            nn.MaxPool2d(kernel_size=3, stride=2),
        )
        self.avgpool = nn.AdaptiveAvgPool2d((6, 6))
        self.classifier = nn.Sequential(
            nn.Dropout(),
            ...
            nn.Linear(4096, num_classes),
        )

    def forward(self, x):
        x = self.features(x)
        x = self.avgpool(x)
        x = torch.flatten(x, 1)
        x = self.classifier(x)
        return x

AlexNet 主要是一些 Sequential 子模块的组合。我们可以通过将它的子模块展平成一个单一的层序列,将它转换为 PipelineModule

class AlexNetPipe(AlexNet):
    def to_layers(self):
        layers = [
            *self.features,
            self.avgpool,
            lambda x: torch.flatten(x, 1),
            *self.classifier
        ]
        return layers

from deepspeed.pipe import PipelineModule
net = AlexNetPipe()
net = PipelineModule(layers=net.to_layers(), num_stages=2)

注意: 上面 layers 中间的 lambda 不是 torch.nn.Module 类型。任何实现了 __call__() 的对象都可以是 PipelineModule 中的层:这允许在管道中进行方便的数据转换。

输入和输出

遵循 torch.nn.Sequential,每层的输入和输出必须是单个 torch.Tensor 或一个 tuple 张量。在实践中,一些模型可能需要修改它们的正向传递,以打包和解包传递给 forward() 的参数。考虑一个 Transformer 块堆栈的简化实现

class TransformerBlock(nn.Module)
    ...
    def forward(self, hidden, mask):
        output = self.compute(hidden, mask)
        return output
    ...

stack = [ TransformerBlock() for _ in range(num_layers) ]

需要对 TransformerBlock 进行两个修改

  1. 参数必须被收集到一个 tuple 中。
  2. mask 也必须从 forward() 返回,以便传递到下一层。

这些修改可以使用一个简短的子类来完成

class TransformerBlockPipe(TransformerBlock)
    def forward(self, inputs):
        hidden, mask = inputs
        output = super().forward(hidden, mask)
        return (output, mask)
stack = [ TransformerBlockPipe() for _ in range(num_layers) ]

训练循环

管道并行交织了正向和反向传递,因此训练循环不能被划分为 forward()backward()step() 的独立阶段。相反,DeepSpeed 的管道引擎提供了一个 train_batch() 方法,该方法会推进管道引擎,直到下一批训练数据被使用并且模型权重被更新。

train_iter = iter(train_loader)
loss = engine.train_batch(data_iter=train_iter)

上面的 train_batch() 示例等同于使用传统数据并行 DeepSpeed 的以下示例

train_iter = iter(train_loader)
for micro_batch in engine.gradient_accumulation_steps():
    batch = next(data_iter)
    loss = engine(batch)
    engine.backward(loss)
    engine.step()

处理数据

数据并行训练通常让每个工作器在每批的开始独立地执行 IO。但是,在管道并行环境中,只有第一个阶段会使用输入数据,只有最后一个阶段会使用标签来计算损失。

注意: 管道引擎期望数据加载器返回两个项目的 tuple。第一个返回的项目是输入批次数据,第二个项目是用于损失计算的数据。与之前一样,输入和标签应该是 torch.Tensor 类型或一个 tuple 张量。

为了方便起见,当数据集被提供给 deepspeed.initialize() 时,DeepSpeed 管道引擎可以构建一个分布式数据加载器。DeepSpeed 处理数据加载的其他复杂性,因此管道训练循环变为

engine, _, _, _ = deepspeed.initialize(
    args=args,
    model=net,
    model_parameters=[p for p in net.parameters() if p.requires_grad],
    training_data=cifar_trainset())

for step in range(args.steps):
    loss = engine.train_batch()

当然,DeepSpeed 可以与你想要使用的任何数据加载器一起使用。数据加载器应该由管道中的第一个和最后一个阶段构建。每个工作器应该加载大小为 engine.train_micro_batch_size_per_gpu() 的微批次,并且每个 train_batch() 会被查询总共 engine.gradient_accumulation_steps() 次。

注意! 管道引擎会拉取数据,而不是迭代它。数据流在训练批次中间不能被清空,这一点至关重要。每次调用 train_batch() 会从数据迭代器中拉取总共 engine.gradient_accumulation_steps() 个微批次数据。

DeepSpeed 提供了一个便捷类 deepspeed.utils.RepeatingLoader,它只是简单地包装一个可迭代对象(例如数据加载器),并在到达末尾时重新启动它

train_loader = deepspeed.utils.RepeatingLoader(train_loader)
train_iter = iter(train_loader)
for step in range(args.steps):
    loss = engine.train_batch(data_iter=train_iter)

高级主题

负载平衡管道模块

管道并行训练的性能很大程度上依赖于负载平衡。DeepSpeed 提供了几种将模型划分为 GPU 的机制。这些策略可以使用 partition_method 关键字参数设置为 PipelineModule。以下是 DeepSpeed 当前提供的分区方法

  • partition_method="parameters" (默认) 平衡每个管道阶段的可训练参数数量。这在内存受限的环境中以及当层的尺寸与计算时间成正比时特别有用。
  • partition_method="type:[regex]" 平衡类名与 [regex] 匹配的层。正则表达式不区分大小写。例如,partition_method="type:transformer" 会平衡每个阶段的 Transformer 层数量。
  • partition_method="uniform" 平衡每个阶段的层数量。

内存高效的模型构建

构建一个 Sequential 容器并将其提供给 PipelineModule 是指定管道并行模型的一种便捷方式。但是,这种方法在处理大型模型时会遇到可扩展性问题,因为每个工作器都会在 CPU 内存中复制整个模型。例如,一台拥有 16 个 GPU 的机器必须拥有与模型大小 16 倍相等的本地 CPU 内存。

DeepSpeed 提供了一个 LayerSpec 类,它会延迟模块的构建,直到模型层被划分为不同的工作器。然后,每个工作器只分配分配给它的层。因此,与上一段中的示例相比,使用 LayerSpec,一台拥有 16 个 GPU 的机器只需要在 CPU 内存中分配模型大小的 1 倍,而不是 16 倍。

以下是一个简化的 AlexNet 模型示例,但仅使用 LayerSpec 表达。请注意,语法几乎没有变化:nn.ReLU(inplace=True) 仅仅变成 LayerSpec(nn.ReLU, inplace=True)

from deepspeed.pipe import PipelineModule, LayerSpec
class AlexNetPipe(PipelineModule):
    def __init__(self, num_classes=10, **kwargs):
        self.num_classes = num_classes
        specs = [
            LayerSpec(nn.Conv2d, 3, 64, kernel_size=11, stride=4, padding=2),
            LayerSpec(nn.ReLU, inplace=True),
            ...
            LayerSpec(nn.ReLU, inplace=True),
            LayerSpec(nn.Linear, 4096, self.num_classes),
        ]
        super().__init__(layers=specs, loss_fn=nn.CrossEntropyLoss(), **kwargs)

绑定层

一些模型无法完全表达为流水线并行模型,因为一些层在流水线中被重复使用。例如,基于 Transformer 的语言模型通常在流水线的早期使用嵌入层将词汇映射到隐藏状态,然后在流水线的末尾使用嵌入层将隐藏状态映射回词汇。如果模型被限制为纯粹的流水线并行,这种嵌入重复将禁止流水线并行。

DeepSpeed 提供了一个 TiedLayerSpec,它是 LayerSpec 的扩展。 TiedLayerSpec 需要一个额外的参数: key。每个层的重复都使用 TiedLayerSpec 指定,key 字段用于标识层被重复使用的位置。

绑定层在每个拥有重复实例的流水线阶段上复制。训练然后像往常一样进行,但所有反向传播完成后,会额外添加一个绑定梯度的全约简。全约简确保绑定层的权重在流水线阶段之间保持同步。

更新: