
Hands-On Notes on the Pipeline Parallelism Code

Python has two unpacking operators (see the minimal sketch below):

* unpacks iterables (e.g. lists, tuples)

** unpacks dictionaries
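
A minimal, self-contained sketch of both operators (the describe function and its arguments are purely illustrative, not taken from the Paddle code base):

def describe(op_type, ring_id, peer):
    # Receives positional arguments via * and keyword arguments via **.
    print(op_type, ring_id, peer)

args = ["send_v2"]                   # iterable, unpacked with *
kwargs = {"ring_id": 0, "peer": 1}   # dict, unpacked with **
describe(*args, **kwargs)            # equivalent to describe("send_v2", ring_id=0, peer=1)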

1. p2p_communication.py

hcg is the hybrid communication group; it holds the communication-group information for data parallelism (DP), model parallelism (MP), and pipeline parallelism (PP). For example:

# Query the degree of each kind of parallelism
dp_degree = hcg.get_data_parallel_world_size()  # data-parallel degree
mp_degree = hcg.get_model_parallel_world_size() # model-parallel degree
pp_degree = hcg.get_pipe_parallel_world_size()  # pipeline-parallel degree

# Query the role of the current process
dp_rank = hcg.get_data_parallel_rank()  # rank within the data-parallel group
mp_rank = hcg.get_model_parallel_rank() # rank within the model-parallel group
pp_rank = hcg.get_stage_id()            # stage ID within the pipeline

# Query the different communication groups
dp_group = hcg.get_data_parallel_group()  # data-parallel communication group
mp_group = hcg.get_model_parallel_group() # model-parallel communication group
pp_group = hcg.get_pipe_parallel_group()  # pipeline-parallel communication group

1.1 The recv_meta function

The receiving side is used as the example for this walkthrough.

def recv_meta(self, group):
    src_rank = _hcg._get_p2p_prev_rank()

    # First create a one-element buffer to receive the length of the metadata that
    # the sender is about to transmit. The metadata describes one or more tensors
    # and is laid out as follows (example for two tensors):
    #
    # [
    #     1,            # tensor_type=1 means multiple tensors
    #     2,            # tensor_num=2 means there are 2 tensors
    #     # info for the first tensor
    #     2,            # shape_len=2
    #     32, 64,       # shape=[32, 64]
    #     1,            # dtype_number
    #     0,            # stop_gradient=False
    #     # info for the second tensor
    #     3,            # shape_len=3
    #     16, 32, 64,   # shape=[16, 32, 64]
    #     2,            # dtype_number
    #     1,            # stop_gradient=True
    # ]
    data_numel = paddle.empty([1], dtype="int64")
    paddle.distributed.recv(data_numel, src=src_rank, group=group)
    # For the example above, data_numel=13. Note that this is the length of the
    # tensor *metadata*, not the tensor contents themselves.
    data_numel = data_numel.item()

    # Allocate a buffer of shape [data_numel] ([13] for the example above) and
    # receive the metadata itself.
    data = paddle.empty([data_numel], dtype="int64")
    paddle.distributed.recv(data, src=src_rank, group=group)
    # Convert the received tensor to numpy and flatten it into a Python list.
    data = data.numpy().tolist()
    # parse data
    tensor_type = data.pop(0)  # tells us whether there is one tensor or several

    if tensor_type == 1:
        tensor_num = data.pop(0)
    else:
        tensor_num = 1

    shapes = []
    dtypes = []
    stop_grads = []

    for _ in range(tensor_num):
        shape_len = data.pop(0)
        # For the first tensor above, shape=[32, 64], so shape_len=2 and we take
        # the next two entries of data as the shape.
        shape = data[:shape_len]
        data = data[shape_len:]
        dtype_number = data.pop(0)
        # PADDLE_TO_NUMBER = {paddle.float16: 0, paddle.float32: 1, paddle.float64: 2,
        #                     paddle.int32: 3, paddle.int64: 4, paddle.bfloat16: 5,
        #                     paddle.bool: 6}; each number encodes the tensor's dtype.
        stop_gradient = bool(data.pop(0))  # whether gradient flow stops at this tensor

        shapes.append(shape)
        dtypes.append(dtype_number)
        stop_grads.append(stop_gradient)

    assert (
        len(data) == 0
    ), f"send data must be parsed zero, now it is {data}"

    if tensor_type == 0:
        self.recv_shape_message = shapes[0]
        self.recv_dtype_message = dtypes[0]
        self.recv_stop_gradient = stop_grads[0]
    else:
        self.recv_shape_message = tuple(shapes)
        self.recv_dtype_message = tuple(dtypes)
        self.recv_stop_gradient = tuple(stop_grads)
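
To see the parsing logic on its own, here is a small pure-Python re-run of the loop above on the example metadata list (no distributed calls, just the list manipulation inside recv_meta):

# Example metadata for two tensors, as in the comment above (13 integers in total).
data = [1, 2,
        2, 32, 64, 1, 0,       # tensor 1: shape=[32, 64], dtype=1, stop_gradient=False
        3, 16, 32, 64, 2, 1]   # tensor 2: shape=[16, 32, 64], dtype=2, stop_gradient=True

tensor_type = data.pop(0)
tensor_num = data.pop(0) if tensor_type == 1 else 1

shapes, dtypes, stop_grads = [], [], []
for _ in range(tensor_num):
    shape_len = data.pop(0)
    shapes.append(data[:shape_len])
    data = data[shape_len:]
    dtypes.append(data.pop(0))
    stop_grads.append(bool(data.pop(0)))

print(shapes)      # [[32, 64], [16, 32, 64]]
print(dtypes)      # [1, 2]
print(stop_grads)  # [False, True]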

1.2 The batch_send_recv_on_calc_stream function


def batch_send_recv_on_calc_stream(p2p_op_list):
    group = p2p_op_list[0].group
    if _warn_cur_rank_not_in_group(group):
        return

    need_check = strtobool(os.getenv('FLAGS_pp_check_naninf', '0'))
    if need_check:
        for p2p_op in p2p_op_list:
            if p2p_op.op == _send_on_calc_stream:
                err_msg = check_naninf(p2p_op.tensor)
                if err_msg is not None:
                    raise ValueError(
                        f"{err_msg}. Tensor contains inf or nan values at rank {paddle.distributed.get_rank()}"
                    )

    group = _get_global_group() if group is None else group
    backend = group.backend
    tasks = []
    # Coalescing manager: the communication ops issued inside this context are
    # batched and executed together on the calc stream; each p2p_op corresponds
    # to one send/recv operation.
    with _coalescing_manager(group, tasks):
        for p2p_op in p2p_op_list:
            op = p2p_op.op
            tensor = p2p_op.tensor
            peer = p2p_op.peer
            comm_group = p2p_op.group
            nranks = p2p_op.nranks
            rank_id = p2p_op.rank_id
            op(tensor, comm_group, peer, nranks, rank_id)
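
This internal helper batches point-to-point ops much like the public batched p2p API. As a rough, hedged illustration only (assuming the public paddle.distributed.P2POp / batch_isend_irecv / isend / irecv interface, which is analogous to but not the same code path as the helper above), a rank in the middle of a pipeline might exchange tensors with both neighbours like this:

import paddle
import paddle.distributed as dist

# Assumed public API: P2POp wraps a single send/recv, and batch_isend_irecv
# issues the whole list as one batched operation.
dist.init_parallel_env()
rank = dist.get_rank()
world_size = dist.get_world_size()

send_buf = paddle.full([2, 3], float(rank))
recv_buf = paddle.empty([2, 3])

ops = []
if rank + 1 < world_size:   # send activations to the next stage
    ops.append(dist.P2POp(dist.isend, send_buf, rank + 1))
if rank - 1 >= 0:           # receive activations from the previous stage
    ops.append(dist.P2POp(dist.irecv, recv_buf, rank - 1))

tasks = dist.batch_isend_irecv(ops)
for task in tasks:
    task.wait()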

2. stream/send.py

LayerHelper is a helper class in PaddlePaddle used to create and append operators in static-graph mode.

2.1 The send function

def send(
    tensor: Tensor,
    dst: int = 0,
    group: Group | None = None,
    sync_op: bool = True,
    use_calc_stream: bool = False,
) -> task | None:
    """

    Send a tensor to the destination device.

    Args:
        tensor (Tensor): The tensor to send. Support float16, float32, float64, int32, int64, int8, uint8 or bool as its data type.
        dst (int, optional): Rank of the destination device. If none is given, use `0` as default.
        group (Group, optional): Communicate in which group. If none is given, use the global group as default.
        sync_op (bool, optional): Indicate whether the communication is sync or not. If none is given, use true as default.
        use_calc_stream (bool, optional): Indicate whether the communication is done on calculation stream. If none is given, use false as default. This
            option is designed for high performance demand, be careful to turn it on except you are clearly know its meaning.

    Returns:
        Return a task object.

    Examples:
        .. code-block:: python

            >>> # doctest: +REQUIRES(env: DISTRIBUTED)
            >>> import paddle
            >>> import paddle.distributed as dist

            >>> dist.init_parallel_env()
            >>> local_rank = dist.get_rank()
            >>> if local_rank == 0:
            ...     data = paddle.to_tensor([[4, 5, 6], [4, 5, 6]])
            ...     task = dist.stream.send(data, dst=1, sync_op=False)
            >>> else:
            ...     data = paddle.to_tensor([[1, 2, 3], [1, 2, 3]])
            ...     task = dist.stream.recv(data, src=0, sync_op=False)
            >>> task.wait()  # type: ignore[union-attr]
            >>> out = data.numpy()
            >>> print(out)
            [[4, 5, 6], [4, 5, 6]]
    """
    if _warn_cur_rank_not_in_group(group):
        return

    if not sync_op and use_calc_stream:
        raise RuntimeError(
            "use_calc_stream can only be True in sync op behavior."
        )

    if framework.in_dynamic_mode():
        group = _get_global_group() if group is None else group
        dst_rank_in_group = _get_or_throw_group_rank(dst, group)

        return _send_in_dygraph(
            tensor, dst_rank_in_group, group, sync_op, use_calc_stream
        )
    else:
        assert (
            group is None
        ), "Group can not be used in static graph mode for now."
        return _send_in_static_mode(
            tensor, dst, group, sync_op, use_calc_stream
        )

2.2 The _send_in_static_mode function

def _send_in_static_mode(
    tensor, dst_rank_in_group, group, sync_op, use_calc_stream
):
    op_type = 'send_v2'
    data_feeder.check_variable_and_dtype(
        tensor,
        'tensor',
        ['float16', 'float32', 'float64', 'int32', 'int64', 'uint16'],
        'send',
    )

    ring_id = 0 if group is None else group.id
    # 1. Create a LayerHelper instance.
    #    locals() returns a dict of the function's own parameters plus every local
    #    variable defined so far, and ** unpacks that dict into keyword arguments.
    # - op_type: the operator type, here 'send_v2'
    # - **locals(): passes in all local variables of the current function
    helper = framework.LayerHelper(op_type, **locals())

    # 2. Append the operator.
    helper.append_op(
        type=op_type,             # operator type
        inputs={'X': [tensor]},   # input tensor
        attrs={                   # operator attributes
            'ring_id': ring_id,
            'peer': dst_rank_in_group,
            'use_calc_stream': sync_op,
        },
    )

As noted above, LayerHelper is PaddlePaddle's helper class for creating and appending operators in static-graph mode; here it receives all of the function's local variables through **locals().
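
A minimal, framework-free sketch of what **locals() does in that call (consume_kwargs and fake_send are purely illustrative, not part of Paddle):

def consume_kwargs(op_type, **kwargs):
    # Receives whatever local variables the caller had at the point of the call.
    print(op_type, sorted(kwargs))

def fake_send(tensor, dst_rank_in_group, group, sync_op, use_calc_stream):
    op_type = 'send_v2'
    ring_id = 0 if group is None else group.id
    # locals() is now {'tensor': ..., 'dst_rank_in_group': ..., 'group': ...,
    #                  'sync_op': ..., 'use_calc_stream': ..., 'op_type': 'send_v2',
    #                  'ring_id': ...}; ** unpacks it into keyword arguments.
    consume_kwargs(**locals())

fake_send(tensor=None, dst_rank_in_group=1, group=None, sync_op=True, use_calc_stream=False)
# prints: send_v2 ['dst_rank_in_group', 'group', 'ring_id', 'sync_op', 'tensor', 'use_calc_stream']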

2.3 The _send_in_dygraph function


def _send_in_dygraph(
    tensor, dst_rank_in_group, group, sync_op, use_calc_stream
):
    if use_calc_stream:
        return group.process_group.send_on_calc_stream(
            tensor, dst_rank_in_group
        )

    # Launch the send and get back a task object.
    task = group.process_group.send(tensor, dst_rank_in_group, sync_op)
    if sync_op:
        task.wait()  # block until the send completes

    return task