几个多线程的练手 case

Emm……接前篇。

主要是整理几个多线程小例子,读写锁和定时器队列是面试的时候考到的,线程池是自己很早就想写着玩的(挖了好久的坑一直没填,内存池也是……)。

整理出来的代码会放在这里:

ThreadPool

参考了一下别人的实现方案:

总体上还是比较容易理解的,每个工作线程队里。

当任务队列为空时,每一条工作线程在一个条件变量上等待;当任务队列非空时,则空闲线程直接从任务队列中取出封装好的 std::function 来执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
void ThreadRunner::operator()()
{
std::function<void()> task;

while (true)
{
{
std::unique_lock<std::mutex> lock(pool_.tasks_mutex_);

while (!pool_.stop_ && pool_.tasks_.empty())
{
pool_.cv_.wait(lock);
}

if (pool_.stop_) return;

task = pool_.tasks_.front();
pool_.tasks_.pop();
}

task();
}
}

TimerQueue

定时器即让一个注册好的回调函数在某个设定时间到达之后执行。

话说现在应该有很多系统级的 API 都能够提供这种延迟执行或者阻塞当前线程一段时间的功能,但是当定时的任务比较多的时候,就有可能还是自己写一个定时器队列来维护性能更好点了(主要是不确定到时候所用的定时 API 是怎么实现的,例如如果所有的这些定时任务都用 epoll_wait 的超时触发做,那 epoll 的底层实际上是用红黑树来维护的,猜想效率应该不会太差)。

另外还有一个定时精度的问题,不同的 API 可能能够提供的精度支持也是不一样的。

当时面试的时候被问到这个是远程共享写代码,由于之前没接触过这种 case,一开始不是很明白面试官到底想考我什么,就觉得定时器这种东西不是现有的 API 一大堆嘛,然后一脸懵逼。

最后写了一个线程每隔一个最小的周期(Tick)处理一次队列中的任务,把到达时间的任务取出来执行,任务列表就用优先队列或者说小根堆来实现,保证每次都能够高效地把设定时间最靠前的任务取出来。


网上查了下,基本上比较不错的思路有两种,一种就是小根堆的实现,另外一种是 Linux 内核中采用的时间轮的定时器方法。

由于小根堆的堆顶元素是最近触发的一个任务,因此还可以动态改变下一次处理需要等待的 Tick 时间来节省 CPU 资源的消耗。但是感觉这种方式会不会影响定时精度?以及这样似乎没有办法处理“在堆顶元素之前又插入了一个更近的任务”这种情况,所以最后想想还是觉得固定 Tick 比较稳妥。

然后就是重点要来学习一下时间轮(Timing-Wheel)了。

Timing-Wheel

这个思路其实很简单,想象有一个钟表盘,上面有一圈时间刻度(从 1 到 N),有一个指针每隔 Tick 时间转动一个刻度,而所有的任务也是按照等待时间除以 Tick 分布在每个刻度上面,当指针扫到某个刻度时,即遍历一遍这个刻度上的任务列表,把到时间的拿出来执行即可。

当 N 足够大时,显然这个定时器的各个操作的复杂度都是 O(1) 的。

但是如果 N 过大,则这个时间轮的内存消耗将会比较大,因此又有了多级时间轮的优化思路:以钟表上的时针、分针、秒针为例,时针分为 12 格,分针、秒针分别为 60 格,这样就有一共 132 个格子,每个格子都是一条任务链表。一分钟以内的任务被加到秒针的格子中,一小时以内的任务加到分针的格子中,更久的任务加到时针的格子中。秒针每个 Tick 移动一次,每次移动后直接把当前格子中的所有任务取出来执行;分针则是每 60 个 Tick 移动一次,每次移动把当前格子中的任务下放到秒针轮对应的格子中;时针也是类似。

更厉害的优化思路可见下面这篇:

ReadWriteLock

最后这个读写锁是面试完之后让半小时内写完,然后邮件回面试官交作业……

结果读写锁的逻辑很快写完之后 main 函数的测试 case 却调 bug 调了好久……狂汗


读写锁从功能上来看也可以叫做共享锁,相对普通的互斥锁来说,它会有三种状态:未锁定、读锁定和写锁定。

读写锁的读操作是可以共享的,即处于读锁定的时候另外一个线程还可以继续对其施加读锁定,而写锁定跟一般的锁一样是独占的。

因此基本的思路就是用一个读计数来记录读操作,当读写锁处于解锁或者读锁定状态时,获取读锁即增加读的锁定计数,而此时想要获取写锁就需要等到所有的读操作都释放之后才行。

写操作可以用一个布尔变量来记录,当读写锁处于写锁定状态时,继续获取读锁或者写锁都需要等前一次的写操作释放。

下面分别是简单地直接用一个锁的版本:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
void rwlock::getRead()
{
while (is_writing == true)
{
_lock.lock();
}
read_count++;
}

void rwlock::getWrite()
{
while (read_count!=0 || is_writing==true)
{
_lock.lock();
}
is_writing = true;
}

void rwlock::unlockRead()
{
read_count--;
if (read_count == 0)
{
_lock.unlock();
}
}

void rwlock::unlockWrite()
{
is_writing = false;
_lock.unlock();
}

以及用条件变量实现的版本:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
void ReadWriteLock::getRead()
{
std::unique_lock<std::mutex> lock(cv_mutex_);

cv_.wait(lock, [this](){return !this->is_writing_;});

read_count_++;
}

void ReadWriteLock::getWrite()
{
std::unique_lock<std::mutex> lock(cv_mutex_);

cv_.wait(lock, [this](){return this->read_count_==0 && !this->is_writing_;});

is_writing_ = true;
}

void ReadWriteLock::unlockRead()
{
std::lock_guard<std::mutex> lock(cv_mutex_);
read_count_--;
if (read_count_ == 0)
{
cv_.notify_one();
}
}

void ReadWriteLock::unlockWrite()
{
std::lock_guard<std::mutex> lock(cv_mutex_);
is_writing_ = false;
cv_.notify_one();
}

Coroutine

协程。

严格地说这个跟多线程关系不大,其实这个东西本身大概类似一种用户态的线程,然后关键在于这个线程的运行和调度都是要靠用户自己来管理的。

很多语言里面都自带协程的支持,例如,举一个别人的例子,一个 js 写的生成斐波那契数列的函数:

1
2
3
4
5
6
7
8
9
10
function fibonacii() {
let i = 1;
let j = 1;
for (let k = 0; k < 10; k++) {
console.log(i);
console.log(j);
i += j;
j += i;
}
}

用协程的写法则是:

1
2
3
4
5
6
7
8
9
10
11
12
13
function *fibonaciiUsingYield() {
let i = 1;
let j = 1;
while (true) {
yield i;
yield j;
i += j;
j += i;
}
}
------
const gen = fibonaciiUsingYield();
gen.next();

相当于每一次这个函数都断点在 yield 对象上,当调用 next 函数时,函数向前执行一轮。

那么如何在本身不支持协程的 C/C++ 里面把这种特性用上呢?

答案是手动实现一个状态机……

详见轮子哥的考不上三本系列:

下面是按照这种方式写的一个简单例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
/* ***********************************************
MYID : Chen Fan
LANG : G++
PROG : Coroutine_test
************************************************ */

#include <iostream>
#include <stdio.h>
#include <string.h>
#include <algorithm>

using namespace std;

void original_func()
{
int i=0;
while (i<10)
{
printf("%d\n", i);
i++;
}
}

class func_callable
{
public:
func_callable(int n)
: total_(n) {}

int result;

bool operator()()
{
int ans;
while (true)
{
switch(state_)
{
case 0:
i_ = 0;
if (i_ < total_) {state_ = 1; continue;}
else {state_ = 3; continue;}
break;
case 1:
result = i_;
{state_ = 2; return false;}
break;
case 2:
i_++;
if (i_ < total_) {state_ = 1; continue;}
else {state_ = 3; continue;}
break;
case 3:
{state_ = -1; return true;}
break;
default:
throw system_error();
}
}
return ans;
}

private:
int i_;
int total_;
int state_ = 0;
};

func_callable coroutine_func(int n)
{
return func_callable(n);
}

int main()
{
original_func();

auto a = coroutine_func(10);
printf("Coroutine Test:\n");
a();
printf("%d\n", a.result);
a();a();
printf("%d\n", a.result);
a();
printf("%d\n", a.result);
a();a();
printf("%d\n", a.result);

return 0;
}

Result:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
0
1
2
3
4
5
6
7
8
9
Coroutine Test:
0
2
3
5
0%