0%

TensorFlow 拆包(七):Profiling 踩坑 & Benchmark

接上篇:

开始分析性能瓶颈了,本篇记录一下研究 TF 中自带的 Profiling 工具时遇到的几个坑点。

profiler

大概 17 年 5 月左右,/tensorflow/core/ 中新加了一个 profiler 的目录,里面是把原本在 contrib 中的 profiling 工具移过来了,大概正式 release 应该是在 1.6、1.7 里面。

关于生成 profiling 的 context 文件详见 tf.profiler 相关的内容,这里直接开始记录怎么用 tfprof 这个工具。

试了一下 pip 包里面应该是没有单独包含的,需要从源码手动编译:

1
bazel build //tensorflow/core/profiler 

然后使用也是要从 bazel-bin 的目录中打开:

1
bazel-bin/tensorflow/core/profiler/profiler --profile_path=xxxxx

profiler_ui

profiler 的 README 中,示例代码除了 profiler 以外还有个 profiler_ui,基本上是一个类似 tensorboard 的网页前端,方便调用后端的 profiler 进行可视化查看用的。

这里虽然写着暂未开源,但是在 TensorFlow 的 github 总目录里面可以找到一个叫 profiler-ui 的项目,就是那个未完善开源的 ui 版了。

看了下,安装需要用到 go 以及 Google 自家的 pprof 工具,可能是因为耦合的其他部件比较多,所以暂时还没有并入 TF 的主代码中去。不过这里的 Installation 已经足够我们自己装上了。

装 pprof 的时候会有个坑点,CentOS 库中可以找到 gperftools 这个工具,也是 Google 提供的,yum 装上之后可执行文件的名字也叫 pprof !!但是跟这里用到的 pprof 不是一个玩意!!

之后按照示例上的说明:

1
python ui.py --profile_context_path=xxxx

即可启用。

在我尝试使用它的时候,距离这个库上一次 git 的更新已经过去 1 个月左右了,不知道是 python 版本还是什么原因,直接运行可能会遇到找不到 server 的路径等等的 bug,直接在 ui.py 里面稍微改一下就好。

Profiling

运行 TF 时保存出来的 profiling 文件包含了大量信息,主要有几个方面:

  • scope:应该是 python 层代码中用 tf.name_scope() 包起来的视图
  • graph:TensorFlow 计算图的视图
  • op:把 TensorFlow 计算图再细化一层
  • code:Python 代码视图

默认会按列表把所选的视图中的一些信息给输出出来,另外用-output 选项可以指定输出成另外的格式:

1
2
3
4
5
6
7
8
9
10
tfprof>
xxx xxx -output timeline:outfile=xxxxx
# 把结果输出成 chrome 用的时间线 trace 文件,可以在 chrome 地址栏中输入 chrome://tracing 打开
# 只支持 graph、scope、code 这 3 种视图
xxx xxx -output pprof:outfile=xxxxx
# 把结果输出成 pprof 用的可视化文件(所以前面装 pprof 就是为了这个)
# 只支持 code 这种视图
# --------------------------------------------------------------------------
# pprof 可视化文件之后可以用 pprof 来变成图片(猜测大概是类似 GraphViz 的数据结构)
pprof -svg --nodecount=10000 --sample_index=1 xxxxxx.prof > xxxxxx.svg

profiler_ui 打开时的第一个页面就是 graph 视图生成的 timeline:

其中包含了计算图中每个 node 在卡上的情况,运行时间、数据流动依赖关系等等。(话说显示的太复杂了,事实上我觉得还是很难看)

然后默认的 scope 视图以及 code 视图得到的 timeline 我也感觉并没有什么用。

code 视图输出成的 pprof 图片倒是还可以看一下,但是感觉用处也不大

所以最后感觉还是不知道该怎么用好这套 profiling 工具

Options

在 tfprof 界面直接回车可以看到默认的选项,然后这里面的内容都是可以改的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
tfprof>                             
-max_depth 10
-min_bytes 0
-min_peak_bytes 0
-min_residual_bytes 0
-min_output_bytes 0
-min_micros 0
-min_accelerator_micros 0
-min_cpu_micros 0
-min_params 0
-min_float_ops 0
-min_occurrence 0
-step -1
-order_by name
-account_type_regexes .*
-start_name_regexes .*
-trim_name_regexes
-show_name_regexes .*
-hide_name_regexes
-account_displayed_op_only false
-select micros
-output stdout:

稍微挑几个写一下:

-max_depth:指定显示前多少个 node(配合下面的 -order_by ?)

-step:profiling 记录的文件可能包含了很多个 step,用这个选项来指定当前分析哪个 step 的信息,默认 -1 是对所有 step 做平均

-order_by:打出来列表的时候,按照什么来排序:

  • name:node 的名称
  • depth:node 在节点树中的深度
  • bytes:占用的内存数
  • peak_bytes:占用的峰值内存数
  • residual_bytes:计算完成之后,还剩下不释放的内存数
  • output_bytes:输出的大小
  • micros:node 计算所花费的时间
  • accelerator_micros:node 计算所花费的加速卡时间(区别于 CPU 的其他设备)
  • cpu_micros:node 计算所花费的 CPU 时间
  • params:node 中包含的参数量
  • float_ops:node 所需要的浮点运算次数
  • occurrence:node 在图中出现的次数

-account_type_regexes:筛选出类型里面带有某些前缀的 node 有多少个

-start_name_regexes:筛选出名字中带某些前缀的 node

-trim_name_regexes:隐藏掉名字中带某些前缀的 node

-show_name_regexes:筛选出名字中带某些字符的 node

-hide_name_regexes:隐藏掉名字中带某些字符的 node

-select:选择视图中的哪些内容(有点像从数据库里面找东西的感觉),输出 timeline 的时候配合这个应该能够得到不同的数据:

  • bytes:占用的内存数
  • peak_bytes:占用的峰值内存数
  • residual_bytes:计算完成之后,还剩下不释放的内存数
  • output_bytes:输出的大小
  • micros:计算所花费的时间
  • accelerator_micros:计算所花费的加速卡时间
  • cpu_micros:计算所花费的 CPU 时间
  • params: 参数量
  • float_ops:浮点运算次数
  • occurrence:在计算图中出现的次数
  • tensor_value:tensor 数据的值(估计需要配合 checkpoint 用)
  • device:op 放在哪个设备上
  • op_types:op 类型
  • input_shapes:输入的形状

Trace

抛开上面那个目前还没有正式 Release 的 Profiling 接口不说,实际可以用来做分析的是一套生成 trace_file 的 API。

用法也很简单:

1
2
3
4
5
6
7
8
9
10
11
options = tf.RunOptions(trace_level=tf.RunOptions.FULL_TRACE)
run_metadata = tf.RunMetadata()

_ = sess.run(optimizer, options=options, run_metadata=run_metadata)

fetched_timeline = timeline.Timeline(run_metadata.step_stats)
chrome_trace = fetched_timeline.generate_chrome_trace_format()

with open(FLAGS.trace_file, 'w') as f:
f.write(chrome_trace)
print('Chrome Trace File write in %s' % FLAGS.trace_file)

在 RunOptions 中设置好追踪的级别,然后作为参数一起参与 Session.run(),最后记录得到的每个 step 的追踪数据通过 run_metadata 的结构返回出来。通过对追踪结果的解析即可生成我们可以理解的图形数据了,这个用的是 chrome 支持的 json 格式,在 chrome 地址栏中输入 chrome://tracing/ 即可很方便地查看,timeline 最后出来的效果跟上面的是一致的。

应该说,前面这个 Profiling 的 API 应该底层封装的也是这套机制。

在 DirectSession 中可以非常容易地找到与 trace_level 相关的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
  std::unique_ptr<DeviceTracer> tracer;
if (run_options.trace_level() >= RunOptions::HARDWARE_TRACE) {
tracer = CreateDeviceTracer();
// tracer may be NULL on platforms without accelerators.
if (tracer) {
Status s = tracer->Start();
if (!s.ok()) {
run_state.executors_done.Notify();
delete barrier;
return s;
}
}
}
...
if (tracer) {
TF_RETURN_IF_ERROR(tracer->Stop());
TF_RETURN_IF_ERROR(tracer->Collect(run_state.collector.get()));
}

其中 DeviceTracer 是一个预留给多种设备来方便进行性能分析的接口,可惜的是目前里面的实现只有 GPU 的,需要依靠 CUDA 提供的 CUPTI 库。所以大概追踪过程中得到的与 CPU 相关的信息应该也是 CUPTI 附带的,如果是纯 CPU 版本的 TensorFlow,CreateDeviceTracer() 直接返回的是一个空指针。

Distributed

由于分布式环境下的 Session 的执行模式与单机情况下有所不同,因而分布式下运行 trace 的工作方式也会有所区别。

MasterSession 中首次执行 PartialRun 时会初始化 PerStepState:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
// If this is the first partial run, initialize the PerStepState.
if (!run_state->step_started) {
run_state->step_started = true;
PerStepState pss;

const auto count = run_state->count;
pss.collect_timeline =
req.options().trace_level() == RunOptions::FULL_TRACE;
pss.collect_rpcs = req.options().trace_level() == RunOptions::FULL_TRACE;
pss.report_tensor_allocations_upon_oom =
req.options().report_tensor_allocations_upon_oom();

// Build the cost model every 'build_cost_model_every' steps after skipping
// an
// initial 'build_cost_model_after' steps.
const int64 build_cost_model_after =
session_opts_.config.graph_options().build_cost_model_after();
const int64 build_cost_model_every =
session_opts_.config.graph_options().build_cost_model();
pss.collect_costs =
build_cost_model_every > 0 &&
((count + 1 - build_cost_model_after) % build_cost_model_every == 0);
pss.collect_partition_graphs = req.options().output_partition_graphs();

std::unique_ptr<ProfileHandler> ph = run_state->rcg->GetProfileHandler(
run_state->step_id, count, req.options());
if (ph) {
pss.collect_timeline = true;
pss.collect_rpcs = ph->should_collect_rpcs();
}

run_state->pss = std::move(pss);
run_state->ph = std::move(ph);
}

这里会根据 trace_level 的值来设置一些标记。

下一步,pss 中的内容又会被写到 exec_opts 结构中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// Collect execution cost stats on a smoothly decreasing frequency.
ExecutorOpts exec_opts;
if (pss->report_tensor_allocations_upon_oom) {
exec_opts.set_report_tensor_allocations_upon_oom(true);
}
if (pss->collect_costs) {
exec_opts.set_record_costs(true);
}
if (pss->collect_timeline) {
exec_opts.set_record_timeline(true);
}
if (pss->collect_rpcs) {
SetRPCLogging(true);
}
if (pss->collect_partition_graphs) {
exec_opts.set_record_partition_graphs(true);
}
if (pss->collect_costs || pss->collect_timeline) {
pss->step_stats.resize(partitions_.size());
}

这个结构会被封装在 rpc 的 call 中发送给 WorkerService 来处理。

在 Worker 的运行结构中,可以看到这样的代码:

1
2
3
4
5
6
7
StepStatsCollector* collector = nullptr;
if (request->exec_opts().report_tensor_allocations_upon_oom() ||
request->exec_opts().record_timeline() ||
request->exec_opts().record_costs()) {
collector = new StepStatsCollector(response->mutable_step_stats());
// TODO(mrry,pbar): GPU tracing for distributed steps.
}

好吧,StepStatsCollector 都已经创建了,但是可惜后续具体的 GPU tracing 部分还没有往里面完善。

Benchmark

TensorFlow 官方的 Performance 页Benchmarks 页 中给出了官方测性能用的 benchmark 脚本:

基本上把目前 TF 里最高效的 API 都用上了,并且包含了各种常见的多机多卡方案,很值得作为高效的样例脚本来参照。

有个问题是这个库目前没有定期 release,各种更新全都合并到 master 分支里面去了,然后随着 TF 版本的不断更新,它的 master 分支是跟着 TF 的 master 分支走的。

因此要想正常跑最新的 benchmarks,就需要装 tf-nightly-gpu 包或者源码编译一个比较新的 TensorFlow 分支。

简单分析一下这份脚本的结构。

tf_cnn_benchmarks.py 这个入口进去之后,核心的执行流程在 benchmark_cnn.py 中。

只测试前向走 BenchmarkCNN._eval_cnn(),测试训练全过程走 BenchmarkCNN._benchmark_cnn()

前面的 FLAG 解析什么的直接略过,从训练部分开始看。

_benchmark_cnn

首先构建计算图:(image_producer_ops, enqueue_ops, fetches) = self._build_model()

image_producer_ops 是处理输入数据的部分,enqueue_ops 涉及到计算图中的流水线队列,最后的 fetches 是等一下 sess.run() 中的目标 op。

设置 tf.summary 以及 tf.train.Saver 等等,Saver 中传入的是 variable_mgr.savable_variables()

创建 tf.train.Supervisor 时同时完成变量初始化,初始化 op 组包含:

  • tf.local_variables_initializer 初始化本地变量
  • tf.tables_initializer 初始化用到的各种表(哈希表等等)
  • 本地变量初始化之后,执行variable_mgr.get_post_init_ops() 完成自定义的一些初始化执行动作,这个部分要根据不同的参数维护算法来定
  • 如果有同步用的队列 barrier,也一起在这里完成初始化

之后 sv.managed_session 开始真正的执行循环:

1
2
3
4
5
6
7
8
9
with sv.managed_session(...) as sess:
如果使用的是真实数据,则往 enqueue_ops 中插入提取数据到队列的 op
初始化 global_step
创建一个 global_step_watcher,在新线程中监控 global_step 的变动情况
while not done_fn(): 这个函数与 global_step_watcher 相关,用于控制训练循环什么时候结束
...
benchmark_one_step() 训练一个 step
...
后续再处理一些收尾内容

_build_model

回到第一步看一下计算图的构建部分。

(image_producer_ops, image_producer_stages) = self._build_image_processing(shift_ratio=0) 创建输入数据。

1
2
3
4
对于当前设备上的每一块 GPU,variable_mgr.create_outer_variable_scope() 创建命名域:
add_forward_pass_and_gradients() 添加网络的前向部分,并且计算得到梯度
根据当前的任务是训练还是预测,处理准备网络中需要返回的内容
从计算图中提取出 Batch Normalization 的更新部分,添加到 0 号卡的更新部分中,BatchNorm 只需要一块卡来计算

如果图中用了 staging_area 的数据组织方式,这里另外再添加一下,扩充 enqueue_ops。

fetches = self._build_fetches() 最终收集前面所有的信息,构建出等一下需要传入 sess.run() 中去的目标

完成前面的内容后,把 image_producer_ops,enqueue_ops,fetches 三部分内容返回给上一层的函数。

add_forward_pass_and_gradients

创建随机数据作为输入,或者处理传入的数据产生器。

logits, aux_logits = self.model.build_network() 构建完整的前向网络。

添加输出结果以及计算 loss 误差。

variable_mgr.trainable_variables_on_device() 获取当前 GPU 上所有的可训练参数。

如果当前是最后一块 GPU 卡,那么再额外计算 L2_loss,添加到前面的 loss 中去,L2_loss 只需要计算一次。

grads = tf.gradients(scaled_loss, params, aggregation_method=aggmeth) 根据前面收集的当前 GPU 上的可训练参数信息构建反向的梯度计算图,返回得到的是图中所有的梯度。

接下来再获取一次 variable_mgr.trainable_variables_on_device() ,然后把得到的参数与前面的梯度打包在一起返回回去,准备接下来的参数更新。需要注意的是,第一次调用 trainable_variables_on_device 时传入了一个 writable=False 的参数,这里传入的是 writable=True,在某些特别的多卡参数管理算法中,用于梯度计算和最终梯度更新写回的目标是不一样的。

所有前面的这些都封装在一个 results 的 dict 中返回回去。

_build_fetches

这里算是计算图构建的收尾部分了,传入的内容是所有 GPU 上计算图的合集。

variable_mgr.preprocess_device_grads() 预处理出需要在哪些设备上执行梯度更新操作。

1
2
3
4
5
6
对于梯度更新设备中的每一块 GPU:
tf.reduce_mean() 计算前面所有卡上梯度的平均值
variable_mgr.get_gradients_to_apply() 获取有哪些梯度是要在当前设备上更新的
get_learning_rate() 计算学习率
get_optimizer() 获取梯度更新用的 Optimizer
variable_mgr.append_apply_gradients_ops() 应用 Optimizer 进行梯度更新

把前面所有的东西打包在 fetches 这个 dict 中返回回去。

VariableMgr

可以看到上面有很多核心的操作都是通过 variable_mgr 结构完成的,这套脚本定义了一个 VariableMgr 类,想要自己修改参数管理、更新的算法只需要重写这里面的一些函数即可。

前面出现过的比较有用的几个接口函数:

  • def create_outer_variable_scope(self, device_num)

封装变量命名域,主要用于维护变量创建时要做的事情,一般情况下直接返回一个普通的 tf.variable_scope,需要对变量创建进行额外操作的话需要自己构造一个 custom_getter 作为参数传入tf.variable_scope

  • def preprocess_device_grads(self, device_grads)

预处理出需要做梯度更新操作的设备,以及对应设备上的梯度和参数。

  • def get_gradients_to_apply(self, device_num, gradient_state)

与上一个函数对应使用,用于找出每个设备需要处理哪些参数更新任务。

  • def append_apply_gradients_ops(self, gradient_state, opt, grads, training_ops, loss_scale_params)

在设备上针对每一对需要更新的变量及其梯度,应用 apply_gradients 操作。

  • def get_post_init_ops(self)

用于额外附加一些在所有变量完成初始化之后,开始训练之前,需要执行的操作。

  • def get_devices(self)

返回当前节点中可用的 GPU 列表,在某些 PS-WORKER 的实现方式中,返回的是 tf.replica_device_setter 的封装。

  • def savable_variables(self)

返回哪些变量是需要被 tf.Saver 保存进检查点的。

  • def trainable_variables_on_device(self, rel_device_num, abs_device_num, writable=False)

返回当前设备上的可训练参数(即能计算梯度,可以进行反向更新的参数)。输入的两个 device_num 分别是 GPU 在当前节点中的编号以及在全局环境中的编号。

writable 用于标识需要被写回更新的参数,在有些情况下图中可能存在多份参数备份,writable 为 False 时返回的是图中用于求梯度以及构建反向数据通路用的参数,为 True 时返回的是等一下 apply_gradients 需要应用梯度更新操作的参数。

replica_device_setter & variable_scope-custom_getter

前面建图时用到的两个很重要的接口,用于额外处理 op 在设备上的分配操作。

benchmarks 脚本中的用法大概是这样:

1
2
3
4
5
6
for device_num in range(len(self.devices)):
with self.variable_mgr.create_outer_variable_scope(device_num):
# ...
with tf.device(self.devices[rel_device_num]):
# self.devices[] 里面是实现设好的 tf.replica_device_setter
#...build_network

对节点中的每一块 GPU 卡,首先套上一个 variable_scope(里面可能会使用到 custom_getter),在构建 op 时再套一层 replica_device_setter。

tf.replica_device_setter 需要配合 tf.device 使用,作用范围是其 python 作用域以内的所有 op,这个函数简单地说就是对传入的 op 进行判断,如果是计算型的 op 就正常分配在运算设备上,如果是需要在 PS-WORKER 之间共享的参数型 op 则需要在参数服务器上。它的返回值是需要分配给的 device 的名字,所以直接用 tf.device 指定即可。

1
2
3
def replica_device_setter(ps_tasks=0, ps_device="/job:ps",
worker_device="/job:worker", merge_devices=True,
cluster=None, ps_ops=None, ps_strategy=None)

具体的源码实现上,主要是对新创建 op 的类型进行判断,如果在 ps_ops 包含的范围内(为 None 时会用一个 STANDARD_PS_OPS 来作为检查范围)则用某种 ps 分配策略放到参数服务器上,否则放到默认的计算设备上。

默认的 ps_strategy 不指定的话就是用的 round-robin,简单地说就是按顺序依次分。

tf.variable_scope 中 custom_getter 的作用范围就只限于作用域以内所有的 tf.get_variable 调用了(注意,必须是 tf.get_variable,这个对 tf.variable 是无效的)。前面 replica_device_setter 只是指定了参数存放的位置,这里则可以对参数创建进行更多的改动。

例如 StagedVariableGetter 做的事情就是把变量封装上一层 StagingArea,计算图中需要读取变量的时候返回一个 StagingArea.get,对于 apply_gradient 这种需要修改变量本身的操作,则返回参数本体(也就是前面看到的 writable 这个参数起作用的方式)。

VariableMgr instances

官方的 Benchmark 脚本中提供了 8 种内置的 VariableMgr 实例。

VariableMgrIndependent

不同卡之间完全不作数据交互,单纯用来测单机多卡的理论计算速度用。

不需要封装 custom_getter 和 replica_device_setter。

VariableMgrLocalFetchFromPS

多卡中的参数统一存储,不同卡在计算时直接从统一的 PS 中读取需要的数据。

不需要封装 custom_getter。

get_device 这里:

1
2
3
4
5
6
7
8
9
10
11
12
if self.benchmark_cnn.local_parameter_device_flag == 'gpu':
return [
variable_mgr_util.ParamServerDeviceSetter(d, raw_devices)
for d in raw_devices
]
else:
return [
tf.train.replica_device_setter(
worker_device=d,
ps_device=self.benchmark_cnn.param_server_device,
ps_tasks=1) for d in raw_devices
]

如果参数存放在 CPU 上,直接对每个 GPU 设备返回一个指定好 ps_device 的 replica_device_setter。

如果选择参数存放在 GPU 上,这里的做法是将所有参数均衡负载平分在各块卡上。

VariableMgrLocalFetchFromStagedPS

多卡中的参数统一存储,相比之前的增加了 StagingArea 的流水线操作。

custom_getter 中为每个变量额外创建了一个 StagingArea,计算图中需要读取变量的时候返回对应的 StagingArea.get。

trainable_variables_on_device 中 writable 为 True 时,返回变量本体,否则返回对应的 StagingArea.get。

其他部分与上一种方式相同。

VariableMgrLocalReplicated

每块卡上的计算图完全独立,各卡都是自己存储自己的参数,梯度更新的时候再采用某种 Allreduce 的算法对各卡上的参数做统一规约。

get_post_init_ops 在初始化完成后拷贝 GPU0 上的参数到其他卡上覆盖掉,保证所有卡的初始参数一致。

不需要封装 custom_getter 和 replica_device_setter。

preprocess_device_grads 中返回的梯度是调用某种规约算法去综合所有卡上的梯度值,之后再跟本地的参数一起交给 apply_gradient 去更新即可。

因此这里的计算流程是,初始所有卡上参数一致,训练完一步之后规约梯度,规约完成后所有卡上得到的梯度也都一致了,再 apply 更新到本地的卡上,这样下一步开始时所有卡上的参数仍然是一致的。

VariableMgrDistributedAllReduce

用于分布式。

这是脚本中唯一一种需要用到 single_session 的模式,基本上跟 Replicated 的方式一致,每块卡上都独立存数据,更新时全局规约,特殊点在于这种方式只需要由一个 python 进程来启动,所有 worker 上的图构造等等都是由一个 controller 的角色完成,其他所有的 worker 都像平时的 ps 一样 join_server 即可。

大体实现上跟上一种一致

VariableMgrDistributedFetchFromPS

用于分布式。

大体实现跟单节点的 FetchFromPS 一致。

custom_getter 使用了 OverrideCachingDevice,虽然由 replica_device_setter 指定好了所有参数都保存在 ps 上,但是在 worker 还可以做一次数据缓存。caching_device 这个参数与 tf.get_variable() 中的参数对应,即 worker 端的多块卡从远程 ps 获取数据只在第一块卡拉取数据时通过网络取一次,后续的几次直接从缓存中读取。缓存数据的分配方案跟单节点 ps 时 CPU/GPU 上存储参数的方案类似。

VariableMgrDistributedFetchFromStagedPS

用于分布式。

在上一种的基础上加上了 StagingArea。

VariableMgrDistributedReplicated

用于分布式。

计算流程其实跟 DistributedAllReduce 是一致的,大体上跟前面类似实现相一致。