0%

TensorFlow 拆包(一):Session.Run()

18年的第一篇,开一个估计又是会持续超长时间的坑。

要来拆包 TensorFlow 啦。

嗯,话说这件事情前年、去年就一直在做,做完 RDMA 写完论文就扔一边了,也没再整理过。没想到之后的工作还是回到了这里,所以重新过一遍,也好好整理一下。

GDB 调试大法

扔一个当时写的简单指南:

最近的新版 TF 在 dbg 模式下编 GPU 版会有奇怪的 bug,Eigen 在做内存分配的时候有个地方会报错,Github 上也能找到相关的 issues,但是至少目前的 1.4、1.5 版本都还没解决。

于是尝试了一下曲线救国的方案,在 opt 模式下面手动在编译选项里加上“-g”,虽然还不清楚 dbg 模式具体加了什么,但是至少后面 gdb 调试的时候 label 都是有的,还算能用,于是就这么用了。

1
$ bazel build -c opt --config=cuda --copt="-g" --cxxopt="-g" //tensorflow/tools/pip_package:build_pip_package

在要调的 python 代码前面加上这么一段:

1
2
3
4
5
import os
PID = os.getpid()
print('Program pid:', PID)
print('Pause here to enter DBG')
os.system("read")

然后 gdb -p PID进去就好了。

另外补充一个 python 和 gdb 的交互脚本。在 python 的源码目录下面有个 Tools/gdb目录,里面是一个 python 脚本 libpython.py

启动 gdb 之后:

1
2
3
4
5
6
(gdb) python
>import sys
>sys.path.append('/path/to/libpython.py')
>import libpython
>end
(gdb) ...

或者把libpython.py拷出来放到 gdb 启动的地方也行,比如为了让 gdb 能直接找到代码,通常我会在 TF 的目录下面开 gdb,那就把这个脚本放过去。导进来之后,后面 gdb 里面就会多出来一堆 py-开头的命令,除了 gdb 原有支持的查看 c/c++ 层面的信息以外,可以用这些新命令查看 python 层面的东西

TF’s Dirs

源码中主要的部分在 /tensorflow目录下:

目录 说明
/c C++ API,也是一些 Python API 与 C 层的接口部分
/cc
/compiler 即时编译的工具内容
/contrib 一些额外的库,大部分由第三方添加,其中一些正式确定的内容会移出去
/core TensorFlow 的核心运行时代码
/core/distributed_runtime 分布式运行时代码
/core/framework 运行时中相对最底层的架构部分,涉及到很多基础结构的定义、与 Protobuf 的结合部分等等
/core/graph 运行时中对计算图的定义和处理
/core/kernels 计算图中 Op 的核心计算部分(即 Op 的 Kernel 函数)
/core/lib 运行时中调用的其他库的接口?
/core/ops C 部分的 Op 分成两个部分,核心计算函数在前面的 /kernels 目录中,这里存的是 Op 面向上层 Python 运行时的注册部分内容。
详见 TensorFlow 拆包(三):Graph 和 Node
/core/platform 针对不同平台的额外内容
/core/profiler 运行时的调优工具?
/core/protobuf Protobuf 的定义
/core/util 其他的一些工具
/python TensorFlow Python 部分的运行时和 API

Session.run()

python 代码里面的 run 函数是Session 类的父类BaseSession里面来的。

BaseSession中对 run 这个方法有详细的说明,调用一次run是执行一遍数据流图, 在 TensorFlow 的训练代码中通常是在一个循环中多次调用sess.run(),一次 run 即为训练 过程中的一步。 fetches 是 run 方法的一个输入参数,这个参数可以是很多种形式的数据,run 最后的 返回值也会和 fetches 有相同的结构。

TF_Run()

中间追了一大圈,最后 c++ 的入口函数是TF_Run()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
#0  tensorflow::DirectSession::Run (
this=0x7fb03026c390, run_options=..., inputs=std::vector of length 0, capacity 0,
output_names=std::vector of length 0, capacity 0,
target_nodes=std::vector of length 1, capacity 1 = {...},
outputs=0x7fff23b07e50, run_metadata=0x7fff23b07ea0)
at tensorflow/core/common_runtime/direct_session.cc:437
#1 0x00007fb1130a469a in TF_Run_Helper (
session=0x7fb03026c390, handle=handle@entry=0x0,
run_options=run_options@entry=0x0,
input_pairs=std::vector of length 0, capacity 0,
output_tensor_names=std::vector of length 0, capacity 0,
c_outputs=c_outputs@entry=0x7fff23b08220,
target_oper_names=std::vector of length 1, capacity 1 = {...},
run_metadata=run_metadata@entry=0x0, status=status@entry=0x7fb030307f60)
at tensorflow/c/c_api.cc:698
#2 0x00007fb1130a49d4 in TF_Run (
s=0x7fb03026bdd0, run_options=0x0, c_input_names=<optimized out>,
c_inputs=0x7fff23b081d0, ninputs=<optimized out>, c_output_names=<optimized out>,
c_outputs=0x7fff23b08220, noutputs=0, c_target_oper_names=0x7fff23b084d0, ntargets=1,
run_metadata=0x0, status=0x7fb030307f60)
at tensorflow/c/c_api.cc:753

然后再往下走到底是tensorflow::DirectSession::Run(),其中具体做的事情是:

  • 累加 Session 计数器
  • 处理输入,准备线程池,根据输入输出 tensor、目标节点,从当前 Session 中已有的 executor 中找是否存在 一个相同任务的 executor,找到则将其返回,否则创建一个新的 executor
  • 设置一个 FunctionCallFrame,似乎是用来处理 executor 输入输出的一个接口结构。从前面解析过的输入中提取出具体的 Tensor,封装到 FunctionCallFrame 里面去
  • 创建一个 RunState,用于标记运行的状态;一个 IntraProcessRendezvous 用于本地 Tensor 数据的管理;一个 CancellationManager,用于让 Session 响应 Session::Close()
  • 中间还有用于预估网络情况相关的 cost_model、性能追踪工具等等的设置
  • 接下来的一段就是让 executor 配合线程池去执行运行了,具体的代码是item.executor->RunAsync(args, barrier->Get())
  • 主线程这个时候会停下来等待 executor 跑完
  • 结束返回之后,检查是否有输出,然后处理输出部分,返回到上层的 python 部分

Executor

看一下 executor 的情况:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
struct PerPartitionExecutorsAndLib {
Graph* graph = nullptr; // not owned.
Device* device = nullptr; // not owned.
FunctionLibraryRuntime* flib = nullptr; // not owned.
std::unique_ptr<Executor> executor;
};

struct ExecutorsAndKeys {
ExecutorsAndKeys() : step_count(0) {}

std::atomic_int_fast64_t step_count;
std::unique_ptr<Graph> graph;
NameNodeMap name_to_node;
std::unique_ptr<FunctionLibraryDefinition> flib_def;
std::unique_ptr<ProcessFunctionLibraryRuntime> proc_flr;
std::vector<PerPartitionExecutorsAndLib> items;
std::unordered_map<string, size_t> input_name_to_index;
std::unordered_map<string, string> input_name_to_rendezvous_key;
std::unordered_map<string, size_t> output_name_to_index;
std::unordered_map<string, string> output_name_to_rendezvous_key;

DataTypeVector input_types;
DataTypeVector output_types;
};

ExecutorsAndKeys->items里面的每一个元素都是一个 Executor 的衍生类。注释里面写了 Executor 的用法:

1
2
3
4
5
6
7
8
9
10
11
12
13
// Executor runs a graph computation.
// Example:
// Graph* graph = ...;
// ... construct graph ...
// Executor* executor;
// TF_CHECK_OK(NewSimpleExecutor(my_device, graph, &executor));
// Rendezvous* rendezvous = NewNaiveRendezvous();
// TF_CHECK_OK(rendezvous->Send("input", some_input_tensor));
// TF_CHECK_OK(executor->Run({ExecutorOpts, rendezvous, nullptr}));
// TF_CHECK_OK(rendezvous->Recv("output", &output_tensor));
// ... ...
//
// Multiple threads can call Executor::Run concurrently.

Executor 这个类本身只有一些基础接口,比较核心的是 RunAsync 这个虚函数接口,Run 函数是对 RunAsync 的同步封装。

为了搞清楚 items 里面的 Executor 到底是什么,回到上面看一些这个东西是怎么创建的,看一下DirectSession::GetOrCreateExecutors()这个函数:

  • 首先根据输入、输出、以及参数等等情况,在 DirectSession 的记录(一个 unordered_map)里面找是不是前面有创建过一样的 Executor
  • 如果没有找到那么就准备创建一个新的。
  • CreateGraphs()创建出当前运行需要的图,这个地方产生的 graphs 具体是个什么东西先放着待研究,ek->items.reserve(graphs.size())这句表明等下创建的 Executor 的个数应该是跟 graphs 里面元素的个数相同的
  • 处理 GraphExecutionState、FunctionLibraryRuntime
  • 然后对 graphs 里面的每一个元素,分配运算设备、Runtime、用 GraphOptimizer 优化一遍,再用 NewLocalExecutor()创建对应的 Executor 放到 ExecutorsAndKeys 这个结构的 items 里面去
  • NewLocalExecutor()创建的是 ExecutorImpl,那问题就清楚了,ExecutorsAndKeys->items里面的每一个东西其实是个 ExecutorImpl

回到上面,看一下 ExecutorImpl 的 RunAsync 函数,又引出来一个新的结构:

1
2
3
void ExecutorImpl::RunAsync(const Args& args, DoneCallback done) {
(new ExecutorState(args, this))->RunAsync(std::move(done));
}

RunAsync() & ScheduleReady()

每一次的ExecutorImpl::Run()都会产生一个 ExecutorState 的封装结构,它负责追踪 node 的前驱节点的状态,当依赖满足、可以执行的时候把它调度到线程池里面执行。例如ExecutorState::AsyncState这个结构就是相当于打包了整个运行环境的上下文信息,用于多线程执行 node,把 node 当前运行需要的资源信息全部包在这里面扔进线程池,这样等新线程跑完 node 的内容还能够继续后续工作。

具体对图的执行是这样的:

  • 首先展开 context map,这个地方已经是对对应的 device 进行调用了,在目标设备上准备运行时的上下文

  • 初始化 ready 队列,这里面放的 TaggedNode 就是图里面入度为 0 的 note,也就是 root_node

  • 接下来调用ScheduleReady(),交给线程池开始跑。这个函数本身的描述是:

    1
    2
    3
    4
    // Schedule all the expensive nodes in 'ready', and put all the inexpensive
    // nodes in 'ready' into 'inline_ready'.
    void ScheduleReady(const TaggedNodeSeq& ready,
    TaggedNodeReadyQueue* inline_ready);

    关键是这个 expensive node 到底是怎么定义的呢?在代码里面看的不是很清楚。ExecutorImpl::RunAsync()调用的时候是把 root_node 放在 ready 队列里面传进来,inline_ready 给的是空指针,那么看 ScheduleReady()的实现,这部分在这里执行之后就返回了:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    # executor.cc:2085

    if (inline_ready == nullptr) {
    // Schedule to run all the ready ops in thread pool.
    for (auto& tagged_node : ready) {
    runner_([=]() { Process(tagged_node, scheduled_usec); });
    }
    return;
    }

    后面的部分先跳过,回头再看。

runner_()

接下来考虑一下这个runner_()是什么东西:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
class Executor {
...
struct Args {
...
typedef std::function<void()> Closure;
typedef std::function<void(Closure)> Runner;
...
}
...
}

class ExecutorState {
...
Executor::Args::Runner runner_;
...
}

这玩意就是个套了很多层的std::function<void()>!!!在 gdb 里面追踪起来像函数指针一样难受,只能继续翻源码看它是从哪来的。它的初始化在 ExecutorState 的构造函数里面直接完成,赋值的是上面传进来的 args.runner,向上需要追溯到tensorflow::DirectSession::Run()里面。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# direct_session.cc:585

Executor::Args::Runner default_runner = [this,
pool](Executor::Args::Closure c) {
SchedClosure(pool, std::move(c));
};
for (const auto& item : executors_and_keys->items) {
// TODO(zhengxq): support partial run.
// TODO(zhengxq): if the device picks its own threadpool, we need to assign
// less threads to the main compute pool by default.
thread::ThreadPool* device_thread_pool =
item.device->tensorflow_device_thread_pool();
if (!device_thread_pool) {
args.runner = default_runner;
} else {
args.runner = [this, device_thread_pool](Executor::Args::Closure c) {
SchedClosure(device_thread_pool, std::move(c));
};
}
item.executor->RunAsync(args, barrier->Get());
}

好了,所以是runner_()就是 SchedClosure(),删掉这部分代码里面跟安卓有关的部分是这样的:

1
2
3
4
void DirectSession::SchedClosure(thread::ThreadPool* pool,
std::function<void()> c) {
pool->Schedule(std::move(c));
}

runner_([=]() { Process(tagged_node, scheduled_usec); })即把 Lambda 里面的 Process() 传给对应线程池的Schedule() 函数。这个函数然后又套了一层:

1
2
3
4
void ThreadPool::Schedule(std::function<void()> fn) {
CHECK(fn != nullptr);
impl_->Schedule(std::move(fn));
}

impl_ 是 ThreadPool 这个类的实际对象,继承于 Eigen::ThreadPoolTempl<EigenEnvironment>这个模版,继承的目标这部分已经不在 TensorFlow 的源码包里面了,而是来源于 Eigen 的源码,在workspace.bzl里面可以找到 :

1
2
3
4
5
6
7
8
9
10
tf_http_archive(
name = "eigen_archive",
urls = [
"https://mirror.bazel.build/bitbucket.org/eigen/eigen/get/c2947c341c68.tar.gz",
"https://bitbucket.org/eigen/eigen/get/c2947c341c68.tar.gz",
],
sha256 = "f21f8ab8a8dbcb91cd0deeade19a043f47708d0da7a4000164cdf203b4a71e34",
strip_prefix = "eigen-eigen-c2947c341c68",
build_file = str(Label("//third_party:eigen.BUILD")),
)

这条线就先不展开了,总之是扔进线程池里面跑就是了。

Process()

接下来来看tensorflow::(anonymous namespace)::ExecutorState::Process(),也就是线程池里面实际跑的东西的内容:

  • 处理好上下文以及一些运行中需要用到的参数
  • inline_ready 是需要运行的 node 队列,这里的核心过程就是一个 while 循环,不断从队列里面把 node 给 pop 出来跑
  • 准备当前 node 的输入等等一系列东西
  • 如果 kernel 是异步的,那么调用 device->ComputeAsync(),否则调用device->Compute()
  • 处理输出结果ProcessOutputs(),然后PropagateOutputs(),这个函数很重要,除了把算完的结果沿着当前 node 的出边传给下一个 node 以外,还把接下来满足依赖关系的 node 放到 ready 队列里面
  • 最后调用tensorflow::(anonymous namespace)::ExecutorState::NodeDone()表明当前 node 的计算完成了

NodeDone() & ScheduleReady()

NodeDone() 的最后一步执行的也是ScheduleReady()。有意思的是,异步 kernel 跑完之后,往NodeDone()里面传进去的 inline_ready 还是空指针,那ScheduleReady()的执行方式就跟前面一样,往线程池里面扔进去新的Process()任务;同步的 kernel 往NodeDone()里面传进去的就是当前线程的 inline_ready,那么回到上面继续看这个函数的后半部分:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# executor.cc:2092

const GraphView& gview = impl_->gview_;
const TaggedNode* curr_expensive_node = nullptr;
for (auto& tagged_node : ready) {
const NodeItem& item = *gview.node(tagged_node.node->id());
if (tagged_node.is_dead || !item.kernel_is_expensive) {
// Inline this inexpensive node.
inline_ready->push_back(tagged_node);
} else {
if (curr_expensive_node) {
// Dispatch to another thread since there is plenty of work to
// do for this thread.
runner_(std::bind(&ExecutorState::Process, this, *curr_expensive_node,
scheduled_usec));
}
curr_expensive_node = &tagged_node;
}
}
if (curr_expensive_node) {
if (inline_ready->empty()) {
// Tail recursion optimization
inline_ready->push_back(*curr_expensive_node);
} else {
// There are inline nodes to run already. We dispatch this expensive
// node to other thread.
runner_(std::bind(&ExecutorState::Process, this, *curr_expensive_node,
scheduled_usec));
}
}

下面这个循环一开始我是没看懂的,看明白了发现这里的逻辑还是挺有意思的。检查 ready 队列中的每一个元素,如果下一个 node 不是耗时的计算任务(expensive node),那就直接加到 inline_ready 队列里面去,否则当前线程至多只做一个耗时任务,其他的任务都扔到线程池里面去交给别的线程做。

也就是说,当前线程要么把全部的不耗时任务做了,要么只做一个耗时任务。


到这里为止,运行部分的逻辑闭环就结束了。这部分的整个流程图大概是这个样子:

Session.Run()


后面继续深入 Executor 的运行时实现往下看看: