TensorFlow 拆包(九):High Level APIs

前篇:

这篇来研究一下 TF 中的一些高级 API。

TensorFlow 由于一直是在开源的社区环境中发展起来的,早期的一些 API 都比较简单粗暴(更直白地说就是不那么好用),以至于在它之上封装的更友好的 Keras 可能在大部分的使用者群体中会有更高的出现率。后来的 TensorFlow 中也有吸收 Keras 里面一些比较好的结构,有出现像 tf.layers 这样的更高层封装,可以期待一下 2.0 以后会不会大幅优化上层的编码 API 吧。

那这里说的高级 API 是什么呢?

官网的 guide 里面列了几个:

  • Keras:一个神奇的包 tf.keras,官方提供的从 TensorFlow 本身向 Keras API 兼容的实现(感觉怪怪的,底层库中包含了一个对高层封装的兼容???)
  • Eager Execution:TensorFlow 的动态计算图实现,类似普通的 Numpy 或者 Pytorch 的执行模式
  • Importing Data:对输入数据的流水线封装 API
  • Estimator:用于把封装好的模型方便地扩展到多卡/多机的高级 API

其他的还有包括像 StagingArea(构建软件流水,神器!!!)等等目前还在 tf.contrib 中处于实验阶段的很多东西,开源的力量太强大了,每隔一段时间就有很多新功能被社区添加进库中。


Estimator

tf.keras 和 Estimator 的设计都是为了让用户能够更方便地编写网络,话说简单看了下 Estimator 的用法,API 的设计方式应该大概率是从 Keras 里面借鉴的。

具体的使用这里就不多记了,这里 写了个很小的例子,直接开始拆 Estimator 的实现吧。

核心是 tf.estimator.Estimator 这个类,先看初始化参数:

1
2
3
4
5
6
7
__init__(
model_fn,
model_dir=None,
config=None,
params=None,
warm_start_from=None
)

第一个是用来构建网络的模型函数,具体后面详细分析;model_dir 用于指定保存的参数、checkpoint、log 信息等等存放的目录;再后面的几个都是一些额外的配置选项。

让我觉得非常怪的是官网的介绍页面说 Estimator 的优势是不需要用户建图……我真是一脸懵逼。或许对于 TF 内置的一些事先建好的 Estimator 是这样吧,但是如果想自定义呢?……写文档的人吹的有点过了吧。

创建 Estimator 时,首先初始化各项配置参数信息(值得一提的是 model_dir 是允许被 config 中的选项覆盖的),设置训练或者验证时用的数据分布策略(DistributionStrategy,后面再详细分析),设置参数和图节点分布的 device_fn;之后简单检查 model_fn 的参数是否符合规范,然后完处理完 warm_start 的一些设置就结束了。

传入的 model_fn 是用于构建 Estimator 代表的模型网络的核心函数,它能够接受的参数名有严格的规定:

  • features:网络的输入数据,即一个 batch 的数据;
  • labels:网络的标签数据,即一个 batch 的目标标签;
  • mode:可选,但是一般都必须要有,要不实现起来会很麻烦。这个值会根据执行的模式由 Estimator 传入,会有 3 种,tf.estimator.ModeKeys.PREDICTtf.estimator.ModeKeys.TRAINtf.estimator.ModeKeys.EVALUATE
  • params:可选,对应的是 Estimator 的初始化参数;
  • config:可选,对应的是 Estimator 的初始化参数

Run the Estimator

接下来是 Estimator 类的三个调用方法 evaluate、predict 和 train,从字面上就能够看出来各自对应的是什么功能了(Keras 里面对应的 API 应该是 evaluate、predict 和 fit)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
evaluate(
input_fn,
steps=None,
hooks=None,
checkpoint_path=None,
name=None
)

predict(
input_fn,
predict_keys=None,
hooks=None,
checkpoint_path=None,
yield_single_examples=True
)

train(
input_fn,
hooks=None,
steps=None,
max_steps=None,
saving_listeners=None
)

三个方法的共同参数是这个 input_fn,这是类似前面 model_fn 一样,也需要 Estimator 的创建者写好的输出数据产生函数。这个函数的返回值是一个二元组 (features, labels) 对应了 model_fn 的前两个输入参数。

train 中的 steps 表示从哪里开始训练,Estimator 将首先从保存的 checkpoint 中找到最接近的保存点,然后开始这次的训练,max_steps 则简单地就是训练的 batch 数了。

1
2
3
4
5
def _train_model(self, input_fn, hooks, saving_listeners):
if self._train_distribution:
return self._train_model_distributed(input_fn, hooks, saving_listeners)
else:
return self._train_model_default(input_fn, hooks, saving_listeners)

如果在初始化时没有配置 _train_distribution 项,则会使用默认的方式来执行 train 操作,最终把 model_fn 也绑定出来:

1
2
estimator_spec = self._call_model_fn(
features, labels, model_fn_lib.ModeKeys.TRAIN, self.config)

model_fn 中传入输入数据以及 ModeKeys.TRAIN,接下来实际的执行函数是:

1
2
def _train_with_estimator_spec(self, estimator_spec, worker_hooks, hooks,
global_step_tensor, saving_listeners)

添加 TensorBoard 中的 Summary、创建参数保存点、如果有 saving_listeners 则额外添加到运行的 hooks 中,之后:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
with training.MonitoredTrainingSession(
master=self._config.master,
is_chief=self._config.is_chief,
checkpoint_dir=self._model_dir,
scaffold=estimator_spec.scaffold,
hooks=worker_hooks,
chief_only_hooks=(
tuple(chief_hooks) + tuple(estimator_spec.training_chief_hooks)),
save_checkpoint_secs=0, # Saving is handled by a hook.
save_summaries_steps=self._config.save_summary_steps,
config=self._session_config,
log_step_count_steps=self._config.log_step_count_steps) as mon_sess:
loss = None
while not mon_sess.should_stop():
_, loss = mon_sess.run([estimator_spec.train_op, estimator_spec.loss])

嗯……这段代码是不是很熟悉,没错,官方建议的常规 TensorFlow 训练代码就是要写成这个格式。

至此,train 部分基本上分析完了(带 DistributionStrategy 的版本后面再说),整个过程就是把一套常规的 TensorFlow 代码的各个部分做了几级封装,要说有什么特别的就是它把 Summary 和 Saver 都默认包含在内了。

如果按照这个格式解开成普通的 TensorFlow 代码的话,可以说是非常好的官方范例了。

EstimatorSpec

然后再注意到 model_fn 的返回值,前面也提到了 evaluate、predict 和 train 这三个实际执行的方法其实最终都是把 input_fn 中产生的数据传给 model_fn 来跑,这里的控制差别就需要配合对不同的 mode 选项的分支判断来做,所以一个 model_fn 函数写出来大概是这个样子的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
def model_fn(features, labels, mode):

xxxxxx

if (mode == tf.estimator.ModeKeys.PREDICT):
...
return tf.estimator.EstimatorSpec(mode=mode, predictions=predictions)

if (mode == tf.estimator.ModeKeys.EVAL):
...
return tf.estimator.EstimatorSpec(mode=mode, loss=cross_entropy, eval_metric_ops=eval_metric_ops)

if (mode == tf.estimator.ModeKeys.TRAIN):
...
return tf.estimator.EstimatorSpec(mode=mode, loss=cross_entropy, train_op=train_step, eval_metric_ops=eval_metric_ops)

不管是哪种模式,model_fn 最终的返回值都需要通过 EstimatorSpec 这个结构来传出去,其属性有:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@staticmethod
__new__(
cls,
mode,
predictions=None,
loss=None,
train_op=None,
eval_metric_ops=None,
export_outputs=None,
training_chief_hooks=None,
training_hooks=None,
scaffold=None,
evaluation_hooks=None,
prediction_hooks=None
)
  • mode:对应三种不同的模式标识;
  • predictions:预测结果,要是一个 Tensor 或者 Tensor 组成的 dict;
  • loss:训练的损失函数值,必须是一个标量或者形状为 [1] 的 Tensor;
  • train_op:训练 step 的 op,一般是某个 Optimizer 的 minimize() 方法返回的那个;
  • eval_metric_ops:一个包含了验证结果的 dict,可以是 Metric 类,或者一个 (metric_tensor, update_op) 的元组;
  • 其他…略了

要求 ModeKeys.TRAIN 模式返回的必须包含 loss 和 train_op,ModeKeys.EVAL 模式返回的必须包含 lossModeKeys.PREDICT 模式返回的必须包含 predictions。

DistributionStrategy

前面说了,Estimator 这套 API 的出现是因为开发者希望能够方便用户快速搭网络,并且易于扩展到各种不同的计算结构上。

那么 Estimator 本体上面已经拆过了,没什么神秘的,就是个简单封装,距离实现单机到多卡/多机的这种扩展其实还差挺多的,而 DistributionStrategy 就是用来补上中间那个 Gap 的。

官方提供的资料是个 Github 上的 README:Distribution Strategy

里面给的使用的例子都非常简明易懂:

1
2
3
4
5
6
distribution = tf.contrib.distribute.MirroredStrategy()
config = tf.estimator.RunConfig(train_distribute=distribution)

classifier = tf.estimator.Estimator(model_fn=model_fn, config=config)
classifier.train(input_fn=input_fn)
classifier.evaluate(input_fn=input_fn)

下面的三行是个普通的 Estimator 的使用,跟前面一样,唯一区别的就是在 tf.estimator.RunConfig 中创建一个 DistributionStrategy,然后作为 config 选项传递给 Estimator 即可。

确实很方便啊。


如果有看过官方的 benchmarks中对多卡/多机的写法的话,可以发现那个写法大致上跟 DistributionStrategy 的设计非常像。

_train_model_distributed 的大致结构是这个样子的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
with self._train_distribution.scope():
...
features, labels = estimator_util.parse_iterator_result(
iterator.get_next())
grouped_estimator_spec = self._train_distribution.call_for_each_tower(
self._call_model_fn,
features,
labels, # although this will be None it seems
model_fn_lib.ModeKeys.TRAIN,
self.config)
loss = self._train_distribution.unwrap(
self._train_distribution.reduce(
distribute_lib.get_loss_reduction(),
grouped_estimator_spec.loss,
destinations='/device:CPU:0'))[0]
distributed_train_op = grouped_estimator_spec.train_op
...

_train_distribution.scope 中封装的是一些 variable_scopecustom_getter,用于在用tf.get_variable() 创建的变量之上再套上一层额外的封装控制。

对普通的 tf.get_variable() 套上 variable_scope 之后可以控制这个变量创建的设备位置等等很多东西,第一次看到这种写法的时候还觉得这像是一种 hack 的方法,但是不用改变原本里面的代码,还是非常方便的。

以单机多卡为例,call_for_each_tower 是在每块 GPU 卡上跑一遍 Estimator 中给定的 model_fn,即在每一块卡上独立创建一份数据并行的网络。基类里面这个函数是留空的,要求具体的实现类来完成这个部分。

话说这里我有个疑问……model_fn 中的正常写法应该是对训练模式返回一个包含了 Optimizer.minimize() 的 EstimatorSpec,但是多卡并行的过程中不是应该需要做梯度的聚合平均之后再更新到每个变量上吗?而且不同的并行模式下,这部分的处理方式应该也是不一样的,不知道这套 API 要怎么把这些全都统一起来。

看一下 MirroredStrategy 的这个实例里面是怎么实现这个函数的吧,中间部分的代码是这样的:

1
2
3
4
5
6
7
8
9
10
11
12
13
# TODO(isaprykin): Create these threads once instead of during every run()
# call.
threads = []
for index, d in enumerate(distribution.worker_devices):
variable_creator_fn = shared_variable_creator.make_fn(
shared_variable_store, index)
t = MirroredStrategy._MirroredTowerThread( # pylint: disable=protected-access
distribution, coord, d, variable_creator_fn, fn,
*values.select_device(d, args), **values.select_device(d, kwargs))
threads.append(t)

for t in threads:
t.start()

distribution.worker_devices 中保存了单机中的多块 GPU 卡设备,这里直接对每块卡挂一个 shared_variable_creator 并且开一些的一对一的线程去处理。

shared_variable_creator 用于处理多卡之间的参数共享,在 device_id 为 0 的设备上调用 get_variable() 函数是创建新变量,并且保存到给定的 shared_variable_store dict 中;在 device_id 大于 0 的设备上调用 get_variable() 则会尝试共用前面创建好的变量。

接下来看一下创建线程这部分的逻辑。

主线程和多个子线程之间的控制这里用了 should_runhas_paused 两个 threading.Event() 来控制。开始的时候,每个线程都调用 should_run.wait() 来等待,等待主线程调用对应的 should_run.set() 来唤醒它们。主线程随后阻塞在 has_paused.wait() 上,等到每个线程完成自己那部分图的构建之后再用 has_paused.set() 唤醒。

话说为啥一定要用多线程来实现这个部分呢……感觉就用普通的单线程循环一样可以做到这里想要的事情。

那么对梯度的聚合和最终的 apply 呢?似乎这部分代码里面根本没看到啊,每个线程的 run() 函数基本上就是跑完各自的网络部分就没了。唯一看上去非常让人介意的是 run() 中在执行 main_fn() 前套了一堆 Python 的 Context,难道又是用有点 hack 的方法完成的?

其中 MirroredTowerContext 这个结构继承了 distribute_lib.TowerContext,只用于call_for_each_tower 中用于处理多块卡之间相同代表数据的同步。

问题是我还是没有看到处理 reduce 等等这些的代码。

然后……抱着一种怀疑的想法,我重新打开了 Optimizer 中关于梯度计算部分的代码!!发现这里已经跟当时拆包第二篇(Optimizer in TF)里看到的不一样了。

例如新的 apply_gradients 中增加了这样的一段:

1
2
3
4
if distribution_strategy_context.has_distribution_strategy():
grads_and_vars = get_filtered_grad_fn(lambda: grads_and_vars)()
return distribution_strategy_context.get_tower_context().merge_call(
self._distributed_apply, grads_and_vars, global_step, name)

那么 DistributionStrategy 是如何配合 Estimator 把原本单机的代码直接扩展开来就很明白了。

当时最早拆包开始时大概是 TensorFlow 的 1.6 版左右。

查了下 Optimizer 中增加部分的 git 记录,差不多是在今年 3 月底的时候加上的,应该是在 TensorFlow 的 1.7 版左右,然后后来又有过一次较大的改动。

Design Philosophy

再看一下官方文档中对 DistributionStrategy 的设计思想。

首先是一些底层的概念:

  • Wrapped values:跨设备相关的变量可以被封装为两种类别,PerDevice 对象表示的变量在每个设备上的值不同,Mirrored 对象表示的变量在每个设备上的值都相同
  • Unwrapping and merging:考虑前面提过的这个函数 call_for_each_tower(fn, w),fn 是模型函数,w 代表一些 Wrapped values。这个函数的调用过程中就包含了变量的 unwrapping 和 merging,假定在设备 d0fn(w0) 得到的结果是 (x, a, v0),在设备 d1fn(w1) 得到的结果是 (x, b, v1)。首先在调用函数之前,w 需要被解包变成 w0 和 w1 然后分别调用 fn 函数。返回的结果有三种情况,第一个值都返回了一个相同的对象 x,则最终 merge 之后还是对象 x;第二个值是每个设备不一样的,则 merge 之后是一个 PerDevice 对象(其实就是个设备和对应值的 map);第三个值是每个设备返回的分别是一组 Mirrored 对象的成员,则 merge 之后是一个 Mirrored 对象。所以 call_for_each_tower(fn, w) 在这里返回得到的就是一组 (x, PerDevice{...}, Mirrored{...})
  • Tower context vs. Cross-tower context:Tower context 指的是对每个设备的封装上下文,通常对每个设备分别跑一遍模型函数就需要这种封装;Cross-tower context 指的是跨设备的封装上下文,比如说像 reduce() 这种所有设备共同参与的一个操作就需要这种封装
  • Worker devices vs. parameter devices:负责计算的设备和存参数的设备,没啥好说的。

更新一个变量的常规操作如下:

  1. 把输入数据集封装在 d.distribute_dataset() 中,然后创建一个 iterator
  2. 对每一个设备共同调用 d.call_for_each_tower() 来分别创建网络模型,并且最终各自得到一组梯度/变量对:d0 上有 {(g0, v0), (g1, v1), ...}d1 上有 {(g'0, v0), (g'1, v1), ...} 等等这样
  3. 调用 d.reduce(VariableAggregation.SUM, t, v) 或者 d.batch_reduce() 来对梯度求和,并且对应到各自的变量上:{(Sum(g0, g'0), v0), (Sum(g1, g'1), v1), ...}
  4. 调用 d.update(v) 来对每一个变量进行更新

3、4 两步如果用 Optimizer 中的 apply_gradients() 方法可以自动完成(……这就是 Optimizer 后来加进去那部分代码的作用),或者在一个 Cross-tower context 中调用 _distributed_apply() 方法也可以。常规的网络层都应该在 Tower context 中被调用。

话说这个 _distributed_apply() 为什么前面带下划线啊喂,这个方法本来不打算直接给人调的吧???大概是 API 还没最终设计好。

嗯,所以 Estimator 本身一点都不神奇,真正这套机制麻烦的地方在 DistributionStrategy 里面,手写一个 DistributionStrategy 应该会是一件很麻烦的事情。

不知道未来这套机制会如何改进得更好用一些。


2018 12月更新

最近正在试图手动 DIY 一个 DistributionStrategy,发现到处都找不到相关的资料,官方的文档方面对这部分也是写的不明不白。

试了下自己继承一个 DistributionStrategy 类,但是发现这个基类的几乎所有功能都是交给另外一个 DistributionStrategyExtend 类来做的,而且直接从空类开始写缺的东西也有点太多了,还没下手成功。准备之后再试试直接继承一个现有的类比如 MirroredStrategy 然后重载掉里面的功能函数试试看。

Estimator 这套机制想要达到的目标是非常好的,但是似乎……由于 TensorFlow 本身过于庞大和复杂,不知道什么时候这两个东西才能真正成为方便用户使用的好接口。


StagingArea

从名字上直译过来应该是用于暂存的区域,这套 API 用于跨 step 地把数据保存到网络 data path 之外的地方,然后可以在另外的 step 中把保存下来的数据取出来。

解释上看起来挺绕的,而实际上用这套 API 实现出来的效果就是——软件流水

例如以下面这个由三个阶段组成的计算过程为例:

在 a/b 和 b/c 之间分别加入 StagingArea 即:

更重要的是加入了暂存结构之后,事实上 a、b、c 三个计算阶段的依赖就被解耦了:

对执行的流程稍微进行一些修改:

1
2
3
4
5
step1: A1
step2: A2 B1
step3: A3 B2 C1
step4: A4 B3 C2
...

原本必须按顺序执行的三个计算阶段现在就可以互不相关地并行执行了,在某些计算与 I/O、数据通信共同存在的环境中,原本可能存在的数据延迟、等待等等就有可能通过流水线的方式隐藏掉!(例如多机分布式训练的情况,实测效果非常好)

关于这套 API 如何使用的介绍这里就不多记了,直接来看 TensorFlow 是怎么实现它的。

Implementation

StagingArea 这个类在 tensorflow/python/ops/data_flow_ops.py 中,很早以前应该是在 tf.contrib 里面的,大概试验成熟之后移到正式的包部分了。

put()get() 这两个方法的实现分别调用了 gen_data_flow_ops.stage()gen_data_flow_ops.unstage(),然后你会发现虽然一开始是从 from tensorflow.python.ops import gen_data_flow_ops 中引入了这个包,但是源代码里面是找不到这个包的。

原因在于这里面的东西都是在 C++ 层代码中定义然后在编译过程中生成的,追到 tensorflow/core/ops/data_flow_ops.cc 中可以看到大量用 REGISTER_OP 宏注册的 op,其中就有 StagingArea 用到的 stage()unstage() 等等函数。

当然到这里为止还是没办法找到它的实现,因为 REGISTER_OP 宏只是负责 Python 与 C++ 的接口部分的处理,具体 C++ 层调用的实际内容还要再往 Kernel 里面找:tensorflow/core/kernels/stage_op.cc 。这里才是真正最底层的实现内容了,然后还能看到很多 REGISTER_KERNEL_BUILDER 宏,用于把 C++ 部分编译成的库与上面的接口绑定起来。

拆包第三篇简单记过 TensorFlow 中 Op 的创建方式,嗯,在这里用上了。


然后就发现这玩意的实现就是个双向队列的封装,没啥神奇的……╮(╯_╰)╭

1
2
3
4
5
6
7
8
9
10
11
12
13
14
std::deque<Tuple> buf_;

void Get(Tuple* tuple) {
...
*tuple = std::move(buf_.front());
buf_.pop_front();
...
}

Status Put(Tuple* tuple) {
...
buf_.push_back(std::move(*tuple));
...
}
0%