0%

多线程相关整理

还是在整理秋招遭遇过的面试题的时候顺便补充一下多线程相关的东西。

一开始是分了 Basic 和 Project 两个大标题,主要还是想写三个面试遇到的多线程例子的实现,然后发现前面 Basic 部分的东西写的有点多了……篇幅有点长,然后想了想还是把 Project 部分另外开一篇吧。

这大概就是计划赶不上变化?……就是这么任性。


这里来理一下 C 和 C++ 里面与多线程相关的 API。

C Format

在 C11 之前,C 标准库里面本身不带线程支持,通常需要用 UNIX/LINUX 系统库里面的 clone、fork 之类或者用 POSIX 标准的 pthread 库来实现。虽然这些东西都包含在 glibc 的库里面了,但是事实上不算 C 语言本身的标准,所以在 cppreference 里面是搜不到的(当时奇怪了好久)。

pthread 底层用来创建线程的实现应该也是调的 clone、fork 这些系统调用来完成的。

至于 mutex 锁、信号量这些原语,从 Linux 2.6.x 版本内核之后都是通过 FUTEX 系统调用来实现的,具体以后有空再看。

另外需要对线程进程这两个概念作一下强调,Linux 中本身不分进程和线程统称为 task,后来用于区分这两个概念主要是看一个 task 所拥有的资源情况。进程有自己的内存空间,线程共享父进程的内存空间。


fork(), vfork(), clone()

先来看一下三个系统接口。

系统调用 描述
fork() 创建父进程的完整副本,复制父进程的资源,包括所有内存的内容。写时复制
vfork() 创建的子进程与父进程共享数据段,并且由 vfork 创建的子进程先运行,父进程在子进程返回前保持阻塞。
clone() 可以对创建的子进程做更多控制,启动的时候指定需要执行的函数内容。

需要注意的是这几个 API 在 Mingw 里面是用不了的。

Bash on Windows 大法好~~~

fork() 的使用方式特别简单:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
/* ***********************************************
MYID : Chen Fan
LANG : GCC
PROG :
************************************************ */

#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/types.h>

int main()
{
int aaa = 1;
pid_t fpid = fork();
int bbb = 1;

if (fpid < 0)
{
perror("Fork Error!");
} else if (fpid == 0)
{
aaa++;
bbb++;
printf("This is son, with pid: %d, aaa: %d(%p), bbb: %d(%p)\n", getpid(), aaa, &aaa, bbb, &bbb);
} else
{
printf("This is Father, with pid: %d, aaa: %d(%p), bbb: %d(%p)\n", getpid(), aaa, &aaa, bbb, &bbb);
printf("Son's pid: %d\n", fpid);
}

return 0;
}

结果:

1
2
3
4
5
6
7
8
# jcf @ J-CF-MSI in /mnt/c/Users/jcf/Desktop/multithread [15:45:20]
$ gcc test.c

# jcf @ J-CF-MSI in /mnt/c/Users/jcf/Desktop/multithread [15:45:50]
$ ./a.out
This is Father, with pid: 487, aaa: 1(0x7fffe2ce2d9c), bbb: 1(0x7fffe2ce2da0)
This is son, with pid: 488, aaa: 2(0x7fffe2ce2d9c), bbb: 2(0x7fffe2ce2da0)
Son's pid: 488

fork() 创建产生的子进程除了这个函数返回的 pid 值以外,与父进程完全一致,父子进程也都会从 fork() 函数返回之后继续向下执行相同的内容。另外,如果把本地的变量值的地址打出来,会发现它们的虚拟地址也都是一样的。

这里用了一个写时复制的技术,fork 出来的子进程一开始直接用的是父进程的内存空间,所有内容包括虚拟地址都是跟父进程完全一致的,直到发生了数据更改,才会在物理地址空间中作一个新的映射。这也就提高了创建子进程的效率,因为分配新的页表也会是一件比较耗时的工作。

然后是 vfork(),把上面这段代码中的 fork 直接改成 vfork 之后会得到这样的运行结果:

1
2
3
4
5
6
7
8
9
10
# jcf @ J-CF-MSI in /mnt/c/Users/jcf/Desktop/multithread [15:46:37] C:134
$ gcc test.c

# jcf @ J-CF-MSI in /mnt/c/Users/jcf/Desktop/multithread [15:46:42]
$ ./a.out
This is son, with pid: 550, aaa: 2(0x7ffff4873a2c), bbb: 2(0x7ffff4873a30)
This is Father, with pid: 549, aaa: 0(0x7ffff4873a2c), bbb: 1(0x7ffff4873a30)
Son's pid: 550
*** stack smashing detected ***: ./a.out terminated
[1] 549 abort (core dumped) ./a.out

vfork 调用之后,父进程会被阻塞,所以可以看到不同于之前的情况,这里永远都是 son 这句先输出,执行完毕之后才会轮到父进程执行。

那为什么会炸了呢……

在 fpid 等于 0 的分支末尾加上 exit(0); 之后程序就能够正常执行了:

1
2
3
4
5
6
7
8
# jcf @ J-CF-MSI in /mnt/c/Users/jcf/Desktop/multithread [15:47:10] C:134
$ gcc test.c

# jcf @ J-CF-MSI in /mnt/c/Users/jcf/Desktop/multithread [15:47:18]
$ ./a.out
This is son, with pid: 584, aaa: 2(0x7fffc8d7641c), bbb: 2(0x7fffc8d76420)
This is Father, with pid: 583, aaa: 2(0x7fffc8d7641c), bbb: 1(0x7fffc8d76420)
Son's pid: 584

原因是 vfork 不同于 fork 的一点是,创建出来的子进程直接共享父进程的数据段,当子进程跑完之后,他会像一个正常的进程一样对自己的栈空间等等做回收,则之后当父进程开始执行的时候自身的内存数据就被子进程破坏掉了一部分,这也是为什么前面第一次父进程的 aaa 输出来的结果是不对的,而加上 exit(0); 之后,父进程可以正常输出 2。

话说网上说 vfork 出来的子进程如果用 return 来返回的话会出现很奇怪的 bug,不过我这里测试的时候没有见到,可能跟 gcc 和系统库的版本有关系。

那么为什么会有 vfork 这个看上去有点问题的实现呢?

这就需要提到另外一个系统接口 exec 了。exec 的作用是拿另外一个程序的代码来替换到当前进程的正文、数据和堆栈,简单地说就是用来启动一个新程序。

exec 的接口实际上是一套,一共 6 个函数,具体的这里先不展开了。

vfork 自身设计的目标是为 exec 服务的,当需要创建一个新进程来执行一段完全不同的代码时,vfork 直接共享父进程地址空间的做法是开销最小的,即保证先有一个子进程,然后调用 exec 来载入一段新的代码并且创建自己的独立地址空间,在子进程开始新程序或者退出之前,内核保证父进程一直处于阻塞状态

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
/* ***********************************************
MYID : Chen Fan
LANG : GCC
PROG :
************************************************ */

#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/types.h>

int main()
{
int aaa = 1;
pid_t fpid = vfork();
int bbb = 1;

if (fpid < 0)
{
perror("Fork Error!");
} else if (fpid == 0)
{
aaa++;
bbb++;
printf("This is son, with pid: %d, aaa: %d(%p), bbb: %d(%p)\n", getpid(), aaa, &aaa, bbb, &bbb);
execv("hello.out", NULL);
} else
{
printf("This is Father, with pid: %d, aaa: %d(%p), bbb: %d(%p)\n", getpid(), aaa, &aaa, bbb, &bbb);
printf("Son's pid: %d\n", fpid);
}

return 0;
}

结果大概是这样:

1
2
3
4
5
6
7
8
9
10
11
12
13
# jcf @ J-CF-MSI in /mnt/c/Users/jcf/Desktop/multithread [16:07:41] C:1
$ gcc test.c
test.c: In function ‘main’:
test.c:27:9: warning: null argument where non-null required (argument 2) [-Wnonnull]
execv("hello.out", NULL);
^

# jcf @ J-CF-MSI in /mnt/c/Users/jcf/Desktop/multithread [16:09:07]
$ ./a.out
This is son, with pid: 654, aaa: 2(0x7fffec2f627c), bbb: 2(0x7fffec2f6280)
This is Father, with pid: 653, aaa: 2(0x7fffec2f627c), bbb: 1(0x7fffec2f6280)
Son's pid: 654
Hello World

最后是 clone,先看下 man 里面的定义:

1
2
3
4
5
6
7
8
9
10
/* Prototype for the glibc wrapper function */

#define _GNU_SOURCE
#include <sched.h>

int clone(int (*fn)(void *), void *child_stack,
int flags, void *arg, ...
/* pid_t *ptid, void *newtls, pid_t *ctid */ );

/* For the prototype of the raw system call, see NOTES */
  • fn 是需要执行的函数指针,即 clone 出来的子进程需要执行的函数内容。

  • child_stack 就明显是给子进程分配的系统堆栈空间的位置。

  • flags 用于描述子进程需要从父进程中继承哪些部分的内容,因此通过这个值可以控制产生进程、线程、甚至非父子关系而是兄弟关系的进程等等,功能强大。

  • 后面的就是传入新进程中的参数了

测试代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
/* ***********************************************
MYID : Chen Fan
LANG : GCC
PROG :
************************************************ */

#define _GNU_SOURCE
#include <sched.h>

#include <stdio.h>
#include <string.h>
#include <stdlib.h>

#include <unistd.h>

int hello()
{
printf("This is: %d, Hello World\n", getpid());
return 0;
}

int main()
{

void* stack = malloc(8192);

int pid = clone(&hello, (char*)stack+8192, CLONE_PARENT, 0);

printf("Father pid: %d, new pid: %d\n", getpid(), pid);

return 0;
}

上面这份代码中有两个地方需要额外注意一下:

  1. 在 <sched.h> 头文件引用前要加上 #define _GNU_SOURCE 的宏,表明下文代码不可移植,可能会用到一些非 GNU 标准的内容(例如 clone)。
  2. clone() 中的第二个参数指定的是栈空间,然后因为栈是反向增长的!!,所以这里需要传入申请的空间的尾部。

结果:

1
2
3
4
5
6
7
# jcf @ J-CF-MSI in /mnt/c/Users/jcf/Desktop/multithread [16:46:24]
$ gcc test.c

# jcf @ J-CF-MSI in /mnt/c/Users/jcf/Desktop/multithread [16:46:54]
$ ./a.out
Father pid: 989, new pid: 990
This is: 990, Hello World

这三个接口的最底层涉及到的都是 do_fork() 这个调用,只是传入的参数不同,clone 可以认为就是个 do_fork() 的 API 外衣。

pthreads

pthreads 的全称应该是 POSIX Threads,是 POSIX 的线程标准,它定义了一套 C 语言标准的线程控制 API,由一个 <pthread.h> 的头文件和一个线程库来实现,主要包含了:线程管理、互斥锁、条件变量、线程同步等等这些线程操作的 API。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/* ***********************************************
MYID : Chen Fan
LANG : GCC
PROG :
************************************************ */

#include <stdio.h>
#include <string.h>
#include <stdlib.h>

#include <pthread.h>

void* hello(void* arg)
{
printf("This is: %d, Hello World\n", getpid());
}

int main()
{
pthread_t tt;

if (pthread_create(&tt, NULL, hello, NULL))
{
printf("Thread Create Error\n");
return 0;
}

pthread_join(tt, NULL);

return 0;
}

结果:

1
2
[Running] cd "c:\Users\jcf\Desktop\multithread\" && gcc test.c -o test && "c:\Users\jcf\Desktop\multithread\"test
This is: 18308, Hello World

话说从使用方式上来看,pthread_create() 的接口跟 clone 就特别像,大概率底层实现就是用 clone 做的,不过传入的线程函数的格式不太一样。

pthread 也提供了互斥锁和条件变量这些结构: pthread_mutex_tpthread_cond_t。具体的使用方式跟后面的差别不大,下文再整理。

头文件 <semaphore.h> 中也提供了信号量的支持。

C11

C11 之后,标准库里面提供了线程支持,包含在头文件 <threads.h> 中(这下可以在 cppreference 里面查到啦)。

基本也就是线程创建、等待、互斥、条件变量等等的支持,感觉看上去跟 pthread 基本一致。

而且不知道为什么,虽然在标准库里面查到了这个库,但是似乎用的人特别少。

那 C 的部分还是用 pthread 吧。

C++ Format

从知乎讨论上面来看,大家对 C++11 的 thread 意见还是比较大的。


C++ 这部分……其实涉及到的东西非常多,需要一堆不同的库联合起来一起用:

线程支持库 <thread> 提供了线程创建、调度、等待等等一系列管理操作;

互斥库 <mutex> 提供了基本的互斥量 mutex,RAII 的锁控制方式 lock_guard 和 unique_lock 等等;

条件变量库 提供了条件变量的支持;

异步支持库 <future> 提供了像 promise、future、async 等等这种异步语义(在 Nodejs 里面用过,之前还真没听说 C++ 里面还带这种玩意);

原子操作库 <atomic> 提供了一系列与原子操作相关的支持;

另外 C++ 中任何可以被调用的东西都是函数对象,前面用来创建线程用的目标函数也需要由函数对象库 <functional> 来管理,std::bind、std::invoke 等等这些管理参数调用,也可以用 lambda 表达式等等。

话说 <thread> 库是不是也还是 pthread 的封装???

functional

有关 std::function 和 Lambda 表达式,很早之前稍微有记过一些:

mutex, lock_guard, unique_lock

C++11 中的基础互斥锁结构是 std::mutex,用法应该基本跟 pthread 的一样。<mutex> 中额外还提供了两个符合 RAII 标准的锁控制封装,以更加异常安全的方式来管理互斥锁。

std::lock_guard 就是个简单的互斥封装容器,构造时锁定给定的锁,然后析构的时候自动释放。事实上它能操作的锁不一定只限于 std::mutex,任何有 lock()un_lock() 两个成员函数的对象都可以。

std::unique_lock 功能更多一点。构造时可选地对传入的锁上锁(也可以选择不锁),析构时自动释放。并且同时它还提供了 lock()try_lock()unlock() 等等这些成员函数,使用起来就更灵活了,除了离开作用域自动析构释放这一点之外,在作用域中还可以手动控制加锁解锁。

condition_variable

条件变量需要结合互斥锁一起使用,这里的 std::condition_variable 尤其在 wait 的时候必须配合 std::unique_lock 来用。

条件变量的核心操作是等待(wait)和唤醒(notify),通常情况下,需要在条件变量上等待的线程需要:

  1. 首先需要获得 std::unique_lock<std::mutex> 锁(重要!!);
  2. 执行 wait()wait_for() 或者 wait_until() ,这三个函数需要把前面的 unique_lock 作为参数传入,执行时将原子地释放传入的 unique_lock,然后挂起当前线程进入等待状态;
  3. 当条件变量被其他线程唤醒(notify)或者超时(对于 wait_for、wait_until)时,当前线程结束等待状态,unique_lock 自动获得锁,然后往下继续执行。

在条件变量上执行唤醒操作的线程需要:

  1. 首先同样要获得锁,这里不是一定要用 std::unique_lock<std::mutex>,其他方式管理也行;
  2. 可以对用于判断的其他数据进行操作;
  3. 对条件变量执行 notify_one() 或者 notify_all(),则其他处于等待状态的线程会被唤醒。

文档中对 notify_one 的描述是会唤醒一个等待的线程,但是并不一定是哪一个,跟进入 wait 状态的线程的先后顺序无关,notify_all 则是唤醒当前正处于等待状态中的所有线程

由于 wait 操作本身自带对 unique_lock 的加锁解锁操作,因此 notify 这边也需要注意前面这个 mutex 锁的状况,必须保证 wait 在调用的时候 mutex 是锁上的,然后 wait 被唤醒的时候锁是开着的。如果 wait 唤醒时试图获取锁失败则会被阻塞在等互斥锁的状态,这个下面有测试。

还有一个重要的注意点是多个线程对某个条件变量的 notify 和 wait 操作可以看成是对一个原子变量的顺序操作,这就意味着如果先调用 notify,再调用 wait,则 wait 是不会从唤醒中恢复的。

看上去这个注意点很正常啊,正常就应该是这样的啊。但是实际多线程操作中非常容易出现:自认为 notify 会发生在 wait 以后,实际执行却不是,然后导致死锁的 bug。

Talk is cheap, show me the code!

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
/* ***********************************************
MYID : Chen Fan
LANG : G++
PROG : CV_TEST
************************************************ */

#include <stdio.h>

#include <condition_variable>
#include <mutex>
#include <thread>
#include <chrono>

using namespace std;

int main()
{
mutex cv_m;
condition_variable cv;

// ----------- 1 -----------
// {
// unique_lock<std::mutex> lock(cv_m);
// cv.notify_one();
// }

this_thread::sleep_for(chrono::seconds(2));

thread th1([&cv, &cv_m]
{
unique_lock<std::mutex> lock(cv_m);
printf("th1 Start Waiting\n");
cv.wait(lock);
printf("th1 wake up\n");
});

this_thread::sleep_for(chrono::seconds(2));

// ----------- 2 -----------
{
unique_lock<std::mutex> lock(cv_m);
cv.notify_one();
}

// ----------- 3 -----------
// cv_m.lock();
// cv.notify_one();
// cv_m.unlock();

// ----------- 4 -----------
// cv_m.lock();
// cv.notify_one();
// this_thread::sleep_for(chrono::seconds(2));
// cv_m.unlock();

th1.join();

return 0;
}

做了个小测试,对于上面打上标记的 4 块代码:

  1. 中间加了个 sleep 2 秒来确保先调用 notify,然后 wait,妥妥的死锁!
  2. 确保先调用 wait,然后 notify,正常工作!
  3. 这么写也是可以正常工作的,但是如果把下面那段 cv_m.unlock() 删掉,则结果就会死锁!!!原因恰恰是在于 wait 被唤醒的时候要首先试图获得锁,由于 cv_m 这时候是锁着的,然后 wait 线程就被阻塞在获取互斥锁的状态了。
  4. 为了确认 3 里面的这一点,我又写了 4 这个测试。从这里可以明确的是,notify 操作之后,wait 线程虽然仍然阻塞,但是这个阻塞状态跟前面线程挂起的等待状态是不同的,而是卡在 cv_m 这个锁上。

所以看上去最好的方式是 notify 的时候根本就别管锁?如果不上锁,不就没这么多麻烦了吗……事实上,更好的写法应该是这样的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
/* ***********************************************
MYID : Chen Fan
LANG : G++
PROG : CV_TEST
************************************************ */

#include <stdio.h>

#include <condition_variable>
#include <mutex>
#include <thread>
#include <chrono>

using namespace std;

int main()
{
mutex cv_m;
condition_variable cv;
bool ready = false;

// ----------- 1 -----------
// {
// lock_guard<mutex> lock(cv_m);
// ready = true;
// cv.notify_one();
// }

this_thread::sleep_for(chrono::seconds(2));

thread th1([&cv, &cv_m, &ready]
{
unique_lock<mutex> lock(cv_m);
printf("th1 Start Waiting\n");
//cv.wait(lock, [&ready]{return ready;});
while (!ready) cv.wait(lock);
printf("th1 wake up\n");
});

this_thread::sleep_for(chrono::seconds(2));

// ----------- 2 -----------
{
lock_guard<mutex> lock(cv_m);
ready = true;
cv.notify_one();
}

th1.join();

return 0;
}

三种 wait 函数均有一种附加条件的多参数调用方式,等同于在一个 while 循环中调用单参数版的 wait 函数。另外用一个 ready 变量标识等待情况,在 notify 时,cv_m 这个锁实际上是用于保护这个 ready 变量用的。用这种方式写则无论 notify 代码块发生在 wait 线程前还是发生在之后,wait 线程均会正常返回了。

future

这个头文件里面的内容很有意思,核心的类主要是 std::promisestd::packaged_taskstd::future 这几个。

std::future 是一个对异步操作结果的封装类,一般需要配合 std::asyncstd::packaged_taskstd::promise 的异步操作使用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
/* ***********************************************
MYID : Chen Fan
LANG : G++
PROG : future_test
************************************************ */

#include <iostream>

#include <future>
#include <chrono>

using namespace std;

int task(int i)
{
this_thread::sleep_for(chrono::seconds(1));
return i;
}

void promise_task(future<int>& future_int)
{
int x = future_int.get();
cout << x << endl;
}

int main()
{
// ----------- future + async -----------
future<int> a;
a = async(task, 10);
cout << a.get() << endl;

// ----------- future + packaged_task -----------
packaged_task<int(int)> b_pack(task);
future<int> b = b_pack.get_future();
b_pack(20);
//thread thb(move(b_pack), 20);
//thb.join();
cout << b.get() << endl;

// ----------- future + promise -----------
promise<int> c_prom;
future<int> c = c_prom.get_future();
thread thc(promise_task, ref(c));
this_thread::sleep_for(chrono::seconds(1));
c_prom.set_value(30);
thc.join();

return 0;
}

std::async 的作用是在另外一个线程中异步(默认操作,也可以设置在调用线程中同步执行)地执行给定的函数,函数执行的返回值则是由一个 std::future 对象来接收。

std::packaged_task 则是一个可调用目标(函数、Lambda表达式或者其他函数对象,即 std::function 的对象)的类模板封装,这个封装主要也就是把函数对象的执行和返回值给分开。package_task 对象可以直接加参数调用,或者放在另外一个线程中调用,结果会在函数体执行完毕之后存到对应的 future 结构中。

最后是 std::promise,这个对象感觉有点像 placeholder 占位符的作用。promise 通过 set_value() 来提供数据,在 future 绑定的 promise 准备完成之前,future 的 get() 会阻塞所在的线程。