18年的第一篇,开一个估计又是会持续超长时间的坑。
要来拆包 TensorFlow 啦。
嗯,话说这件事情前年、去年就一直在做,做完 RDMA 写完论文就扔一边了,也没再整理过。没想到之后的工作还是回到了这里,所以重新过一遍,也好好整理一下。
GDB 调试大法
扔一个当时写的简单指南:
最近的新版 TF 在 dbg 模式下编 GPU 版会有奇怪的 bug,Eigen 在做内存分配的时候有个地方会报错,Github 上也能找到相关的 issues,但是至少目前的 1.4、1.5 版本都还没解决。
于是尝试了一下曲线救国的方案,在 opt 模式下面手动在编译选项里加上“-g”,虽然还不清楚 dbg 模式具体加了什么,但是至少后面 gdb 调试的时候 label 都是有的,还算能用,于是就这么用了。
1 | $ bazel build -c opt --config=cuda --copt="-g" --cxxopt="-g" //tensorflow/tools/pip_package:build_pip_package |
在要调的 python 代码前面加上这么一段:
1 | import os |
然后 gdb -p PID
进去就好了。
另外补充一个 python 和 gdb 的交互脚本。在 python 的源码目录下面有个 Tools/gdb
目录,里面是一个 python 脚本 libpython.py
。
启动 gdb 之后:
1 | (gdb) python |
或者把libpython.py
拷出来放到 gdb 启动的地方也行,比如为了让 gdb 能直接找到代码,通常我会在 TF 的目录下面开 gdb,那就把这个脚本放过去。导进来之后,后面 gdb 里面就会多出来一堆 py-
开头的命令,除了 gdb 原有支持的查看 c/c++ 层面的信息以外,可以用这些新命令查看 python 层面的东西
TF’s Dirs
源码中主要的部分在 /tensorflow
目录下:
目录 | 说明 |
---|---|
/c | C++ API,也是一些 Python API 与 C 层的接口部分 |
/cc | |
/compiler | 即时编译的工具内容 |
/contrib | 一些额外的库,大部分由第三方添加,其中一些正式确定的内容会移出去 |
/core | TensorFlow 的核心运行时代码 |
/core/distributed_runtime | 分布式运行时代码 |
/core/framework | 运行时中相对最底层的架构部分,涉及到很多基础结构的定义、与 Protobuf 的结合部分等等 |
/core/graph | 运行时中对计算图的定义和处理 |
/core/kernels | 计算图中 Op 的核心计算部分(即 Op 的 Kernel 函数) |
/core/lib | 运行时中调用的其他库的接口? |
/core/ops | C 部分的 Op 分成两个部分,核心计算函数在前面的 /kernels 目录中,这里存的是 Op 面向上层 Python 运行时的注册部分内容。 详见 TensorFlow 拆包(三):Graph 和 Node |
/core/platform | 针对不同平台的额外内容 |
/core/profiler | 运行时的调优工具? |
/core/protobuf | Protobuf 的定义 |
/core/util | 其他的一些工具 |
/python | TensorFlow Python 部分的运行时和 API |
Session.run()
python 代码里面的 run 函数是Session
类的父类BaseSession
里面来的。
BaseSession
中对 run 这个方法有详细的说明,调用一次run是执行一遍数据流图, 在 TensorFlow 的训练代码中通常是在一个循环中多次调用sess.run()
,一次 run 即为训练 过程中的一步。 fetches 是 run 方法的一个输入参数,这个参数可以是很多种形式的数据,run 最后的 返回值也会和 fetches 有相同的结构。
TF_Run()
中间追了一大圈,最后 c++ 的入口函数是TF_Run()
。
1 | #0 tensorflow::DirectSession::Run ( |
然后再往下走到底是tensorflow::DirectSession::Run()
,其中具体做的事情是:
- 累加 Session 计数器
- 处理输入,准备线程池,根据输入输出 tensor、目标节点,从当前 Session 中已有的 executor 中找是否存在 一个相同任务的 executor,找到则将其返回,否则创建一个新的 executor
- 设置一个 FunctionCallFrame,似乎是用来处理 executor 输入输出的一个接口结构。从前面解析过的输入中提取出具体的 Tensor,封装到 FunctionCallFrame 里面去
- 创建一个 RunState,用于标记运行的状态;一个 IntraProcessRendezvous 用于本地 Tensor 数据的管理;一个 CancellationManager,用于让 Session 响应
Session::Close()
- 中间还有用于预估网络情况相关的 cost_model、性能追踪工具等等的设置
- 接下来的一段就是让 executor 配合线程池去执行运行了,具体的代码是
item.executor->RunAsync(args, barrier->Get())
- 主线程这个时候会停下来等待 executor 跑完
- 结束返回之后,检查是否有输出,然后处理输出部分,返回到上层的 python 部分
Executor
看一下 executor 的情况:
1 | struct PerPartitionExecutorsAndLib { |
ExecutorsAndKeys->items
里面的每一个元素都是一个 Executor 的衍生类。注释里面写了 Executor 的用法:
1 | // Executor runs a graph computation. |
Executor 这个类本身只有一些基础接口,比较核心的是 RunAsync 这个虚函数接口,Run 函数是对 RunAsync 的同步封装。
为了搞清楚 items 里面的 Executor 到底是什么,回到上面看一些这个东西是怎么创建的,看一下DirectSession::GetOrCreateExecutors()
这个函数:
- 首先根据输入、输出、以及参数等等情况,在 DirectSession 的记录(一个 unordered_map)里面找是不是前面有创建过一样的 Executor
- 如果没有找到那么就准备创建一个新的。
- 用
CreateGraphs()
创建出当前运行需要的图,这个地方产生的 graphs 具体是个什么东西先放着待研究,ek->items.reserve(graphs.size())
这句表明等下创建的 Executor 的个数应该是跟 graphs 里面元素的个数相同的 - 处理 GraphExecutionState、FunctionLibraryRuntime
- 然后对 graphs 里面的每一个元素,分配运算设备、Runtime、用 GraphOptimizer 优化一遍,再用
NewLocalExecutor()
创建对应的 Executor 放到 ExecutorsAndKeys 这个结构的 items 里面去 NewLocalExecutor()
创建的是 ExecutorImpl,那问题就清楚了,ExecutorsAndKeys->items
里面的每一个东西其实是个 ExecutorImpl
回到上面,看一下 ExecutorImpl 的 RunAsync 函数,又引出来一个新的结构:
1 | void ExecutorImpl::RunAsync(const Args& args, DoneCallback done) { |
RunAsync() & ScheduleReady()
每一次的ExecutorImpl::Run()
都会产生一个 ExecutorState 的封装结构,它负责追踪 node 的前驱节点的状态,当依赖满足、可以执行的时候把它调度到线程池里面执行。例如ExecutorState::AsyncState
这个结构就是相当于打包了整个运行环境的上下文信息,用于多线程执行 node,把 node 当前运行需要的资源信息全部包在这里面扔进线程池,这样等新线程跑完 node 的内容还能够继续后续工作。
具体对图的执行是这样的:
首先展开 context map,这个地方已经是对对应的 device 进行调用了,在目标设备上准备运行时的上下文
初始化 ready 队列,这里面放的 TaggedNode 就是图里面入度为 0 的 note,也就是 root_node
接下来调用
ScheduleReady()
,交给线程池开始跑。这个函数本身的描述是:1
2
3
4// Schedule all the expensive nodes in 'ready', and put all the inexpensive
// nodes in 'ready' into 'inline_ready'.
void ScheduleReady(const TaggedNodeSeq& ready,
TaggedNodeReadyQueue* inline_ready);关键是这个 expensive node 到底是怎么定义的呢?在代码里面看的不是很清楚。
ExecutorImpl::RunAsync()
调用的时候是把 root_node 放在 ready 队列里面传进来,inline_ready 给的是空指针,那么看ScheduleReady()
的实现,这部分在这里执行之后就返回了:1
2
3
4
5
6
7
8
9
if (inline_ready == nullptr) {
// Schedule to run all the ready ops in thread pool.
for (auto& tagged_node : ready) {
runner_([=]() { Process(tagged_node, scheduled_usec); });
}
return;
}后面的部分先跳过,回头再看。
runner_()
接下来考虑一下这个runner_()
是什么东西:
1 | class Executor { |
这玩意就是个套了很多层的std::function<void()>
!!!在 gdb 里面追踪起来像函数指针一样难受,只能继续翻源码看它是从哪来的。它的初始化在 ExecutorState 的构造函数里面直接完成,赋值的是上面传进来的 args.runner
,向上需要追溯到tensorflow::DirectSession::Run()
里面。
1 | # direct_session.cc:585 |
好了,所以是runner_()
就是 SchedClosure()
,删掉这部分代码里面跟安卓有关的部分是这样的:
1 | void DirectSession::SchedClosure(thread::ThreadPool* pool, |
runner_([=]() { Process(tagged_node, scheduled_usec); })
即把 Lambda 里面的 Process()
传给对应线程池的Schedule()
函数。这个函数然后又套了一层:
1 | void ThreadPool::Schedule(std::function<void()> fn) { |
impl_ 是 ThreadPool 这个类的实际对象,继承于 Eigen::ThreadPoolTempl<EigenEnvironment>
这个模版,继承的目标这部分已经不在 TensorFlow 的源码包里面了,而是来源于 Eigen 的源码,在workspace.bzl
里面可以找到 :
1 | tf_http_archive( |
这条线就先不展开了,总之是扔进线程池里面跑就是了。
Process()
接下来来看tensorflow::(anonymous namespace)::ExecutorState::Process()
,也就是线程池里面实际跑的东西的内容:
- 处理好上下文以及一些运行中需要用到的参数
- inline_ready 是需要运行的 node 队列,这里的核心过程就是一个 while 循环,不断从队列里面把 node 给 pop 出来跑
- 准备当前 node 的输入等等一系列东西
- 如果 kernel 是异步的,那么调用
device->ComputeAsync()
,否则调用device->Compute()
- 处理输出结果
ProcessOutputs()
,然后PropagateOutputs()
,这个函数很重要,除了把算完的结果沿着当前 node 的出边传给下一个 node 以外,还把接下来满足依赖关系的 node 放到 ready 队列里面 - 最后调用
tensorflow::(anonymous namespace)::ExecutorState::NodeDone()
表明当前 node 的计算完成了
NodeDone() & ScheduleReady()
NodeDone()
的最后一步执行的也是ScheduleReady()
。有意思的是,异步 kernel 跑完之后,往NodeDone()
里面传进去的 inline_ready 还是空指针,那ScheduleReady()
的执行方式就跟前面一样,往线程池里面扔进去新的Process()
任务;同步的 kernel 往NodeDone()
里面传进去的就是当前线程的 inline_ready,那么回到上面继续看这个函数的后半部分:
1 |
|
下面这个循环一开始我是没看懂的,看明白了发现这里的逻辑还是挺有意思的。检查 ready 队列中的每一个元素,如果下一个 node 不是耗时的计算任务(expensive node),那就直接加到 inline_ready 队列里面去,否则当前线程至多只做一个耗时任务,其他的任务都扔到线程池里面去交给别的线程做。
也就是说,当前线程要么把全部的不耗时任务做了,要么只做一个耗时任务。
到这里为止,运行部分的逻辑闭环就结束了。这部分的整个流程图大概是这个样子:
后面继续深入 Executor 的运行时实现往下看看: