TensorFlow 拆包(十):Allreduce

前篇:

不知不觉居然写了十篇了……写这个的初衷是觉得自己是个容易忘事的人,不找个地方做点笔记可能回过头就忘了自己看过什么了。

这篇要分析的是 TensorFlow 自带的 Allreduce 实现。


APIs outside of tf wrapper

TensorFlow 中常规的使用操作是:

1
import tensorflow as tf

但是实际上 TensorFlow 目录下的 __init__.py 里面并没有把全部的内容都包含进去,另外的内容需要通过:

1
from tensorflow.xxx.xxx import xxx

这样的方式直接导入单独的包。

然后更奇怪的是有的 API 明明不在 contrib 中,按理说应该算是官方库中正式内容了,但是在官网的文档中却找不到。也可能是正要从 contrib 往官方库中移?

举例来说前面提过的 StagingArea 就是这样,然后这里的 Allreduce 操作的 API 也是,这就让人很好奇是不是还有别的什么不常用到的 API 可能在某些场景下会有奇效?

扯远了,

collective op

首先是 tensorflow.python.ops 下的 collective_ops.py 这个文件,一共包含了 3 个 API:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
def all_reduce(t, group_size, group_key, instance_key, merge_op, final_op,
subdiv_offsets=(0,))
"""Reduces tensors collectively, across devices.

Args:
t: the tensor to be reduced.
group_size: the total number of tensors to be collectively reduced.
Each must reside on a different device.
group_key: an integer identifying the group of devices.
instance_key: an integer identifying the participating group of Ops.
merge_op: string naming the binary Op to be applied to compute each
partial reduction.
final_op: string naming the unary Op to be applied to each fully
reduced value. Can be 'Id' for no operation.
subdiv_offsets: a list of integer offsets into the tensor at which each
independent subdivision should begin. Use [0] if no subdivision should
be done.

Returns:
An Op implementing the distributed reduction.

Raises:
ValueError: if any of the input parameter constraints are not met.
"""

def broadcast_send(t, shape, dtype, group_size, group_key, instance_key)
"""Broadcasts one tensor to a group of others, across devices.

Args:
t: the tensor to be sent.
shape: the shape of the tensor being sent, which must agree with t.
dtype: the type of the tensor being sent, which must agree with t.
group_size: one plus the number of receiving tensors, i.e. the total
number of devices participating. Each tensor must reside on a
different device.
group_key: an integer identifying the group of devices.
instance_key: an integer identifying the participating group of Ops.

Returns:
An Op implementing the distributed broadcast send.

Raises:
ValueError: if any of the input parameter constraints are not met.

Note that the shape and dtype arguments appear redundant since they
should be obtainable from t. The are two reasons for including
them. First, the shape and type of tensors passed via broadcast must
be known ahead of time in their most specific form so that the receive
side can allocate memory for the operation and shape/type inference can
carry forward from there. Including the same declarations on the
send side clarifies a commitment already made. Secondly, having nearly
identical use syntax for send and receive sides may simplify tool-driven
generation of broadcast.
"""

def broadcast_recv(shape, dtype, group_size, group_key, instance_key)
"""Receives a broadcasts tensor, across devices.

Args:
shape: Shape of the tensor to be received.
dtype: Type of the tensor to be received.
group_size: one plus the number of receiving tensors, i.e. the total
number of devices participating. Each tensor must reside on a
different device.
group_key: an integer identifying the group of devices.
instance_key: an integer identifying the participating group of Ops.

Returns:
An Op implementing the broadcast receive.

Raises:
ValueError: if any of the input parameter constraints are not met.
"""

基本的用法示例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
with tf.device('cpu:0'):
v0 = tf.Variable([1, 1, 1], dtype=tf.float32)

with tf.device('gpu:0'):
v1 = tf.Variable([2, 2, 2], dtype=tf.float32)

with tf.device('gpu:1'):
v2 = tf.Variable([3, 3, 3], dtype=tf.float32)

sess = tf.Session(config=CONFIG)

sess.run(tf.global_variables_initializer())

print(sess.run([v0, v1, v2]))

sum_reduce = []
# with tf.device('cpu:0'):
# out.append(collective_ops.all_reduce(v0, 3, 1, 1, 'Add', 'Id'))
with tf.device('gpu:0'):
sum_reduce.append(collective_ops.all_reduce(v1, 2, 0, 1, 'Add', 'Id'))
with tf.device('gpu:1'):
sum_reduce.append(collective_ops.all_reduce(v2, 2, 0, 1, 'Add', 'Id'))
print(sess.run(sum_reduce))

average_reduce = []
with tf.device('gpu:0'):
average_reduce.append(collective_ops.all_reduce(v1, 2, 1, 1, 'Add', 'Div'))
with tf.device('gpu:1'):
average_reduce.append(collective_ops.all_reduce(v2, 2, 1, 1, 'Add', 'Div'))
print(sess.run(average_reduce))

print(sess.run([v0, v1, v2]))

print('==========================')

bcast = []
# with tf.device('cpu:0'):
# bcast.append(collective_ops.broadcast_send(v0, v0.shape, v0.dtype, 2, 3, 1))
with tf.device('gpu:0'):
bcast.append(collective_ops.broadcast_send(v0, v0.shape, v0.dtype, 2, 3, 2))
with tf.device('gpu:1'):
bcast.append(collective_ops.broadcast_recv(v0.shape, v0.dtype, 2, 3, 2))

print(sess.run(bcast))

参数中的 t 是每个 device 上要进行 reduce 的本地 Tensor,group_size 是一共需要参与的 Tensor 数量,merge_op 和 final_op 分别是 reduce 聚合时和聚合结束之后要做的事情。

然后比较坑的是 group_key 和 instance_key 这两个参数,注释写的太不明确了(还是我理解能力有问题?)。instance_key 应该是用于标识完整的一次 reduce 操作,而 group_key 具体是怎么来的还不是很清楚。测试的时候整体换成另外的数字都是可以正常工作的。

另外 all_reduce 和 broadcast 都似乎不能在 GPU 和 CPU 间进行工作?选择两块 GPU 卡之前是没有问题的,而再加上一个 CPU 就报错了。


其中的具体实现来自于 gen_collective_ops 这个包,那我们就知道这又是个用 C++ 写的然后封装到 python 下面用的操作了。

接口定义在 tensorflow/core/ops/collective_ops.cc 中,C++ 部分的实际实现在 tensorflow/core/kernels/collective_ops.cc 中,但是这里的 CollectiveReduceOpKernelCollectiveBcastSendOpKernelCollectiveBcastRecvOpKernel 三个类也主要还是用于设置输入输出的参数信息、检查结果等等,更进一步的实现要到 tensorflow/core/common_runtime/base_collective_executor.cc 中。

继续深挖下去之后,在 collective_param_resolver_local.cc 中看到了这样一段:

1
2
3
cp->instance.impl_details.collective_name =
(cp->instance.type == BROADCAST_COLLECTIVE) ? "HierarchicalTreeBroadcast"
: "RingReduce";

……

嗯,所以这里的实现又是注册、又是各种封装的,结果到最后只有两种算法。

目前这里的 all_reduce 只有环状通信的算法实现,broadcast 则只有二叉树广播方式的算法实现。

group_key

要想搞清楚这个 group_key 是怎么回事,还得从 tensorflow/core/kernels/collective_ops.cc 这个接口开始。

CollectiveReduceOpKernelCollectiveBcastSendOpKernelCollectiveBcastRecvOpKernel 三个类的 ComputeAsync() 方法的基本结构都差不多:

  • 准备输出用的 Tensor,由于 Reduce 操作本身带有输入,这里也会尝试是否可以重用输入的 Tensor
  • 调用 CanProceedWithCompute() 方法检查各项参数,对 group_key 的检查也在这里完成
  • 调用对应的实现算法完成计算

单机环境下的 CanProceedWithCompute() 最后会调用 CollectiveParamResolverLocal::CompleteGroupLocal() ,group_key 在这里只是完全作为一个 map 的关键字来使用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
{
mutex_lock l(group_mu_);
auto it = group_table_.find(cp->group.group_key);
if (it == group_table_.end()) {
gr = new GroupRec;
gr->group.group_key = cp->group.group_key;
gr->group.group_size = cp->group.group_size;
gr->group.device_type = cp->group.device_type;
group_table_[gr->group.group_key].reset(gr);
VLOG(2) << "New group_key=" << gr->group.group_key
<< " group_size=" << gr->group.group_size;
} else {
gr = it->second.get();
}
}

若某个 group_key 是第一次被使用,则与之关联的 GroupRec 会用当前创建该 op 的默认设备类型来作为整个 GroupRec 的设备类型。

当下一个 collective_op 创建时,再对目标的设备类型和根据 group_key 找出来的 GroupRec 作对比,不一致则报错,因此这里确实是限制了参与 reduce 或者 broadcast 的所有 op 都要是同一种设备类型的。

另外需要注意的是,同一个 group 中参与 all_reduce 和 broadcast 的 op 必须要和设备独立一一对应,所以也不可以在一块卡上同时发起 broadcast 的 send 和 recv,或者在同一块卡上的两个变量间进行 allreduce。

这个设备限制我觉得还是比较奇怪的,从 CPU 向当前节点下的所有 GPU 设备广播我觉得是很常规的一种逻辑啊……

instance_key 的作用也与 group_key 类似,InstanceRec 在这里主要是用来维护几个 mutex,主要负责多个 op 之间的同步,同时发生的多个 collective_op 操作只要 instance 不一样相互之间是不影响的。


To be continued.

0%