Emm……接前篇。
主要是整理几个多线程小例子,读写锁和定时器队列是面试的时候考到的,线程池是自己很早就想写着玩的(挖了好久的坑一直没填,内存池也是……)。
整理出来的代码会放在这里:
ThreadPool 参考了一下别人的实现方案:
总体上还是比较容易理解的,每个工作线程队里。
当任务队列为空时,每一条工作线程在一个条件变量上等待;当任务队列非空时,则空闲线程直接从任务队列中取出封装好的 std::function
来执行。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 void ThreadRunner::operator () () { std::function<void ()> task; while (true ) { { std::unique_lock<std::mutex> lock (pool_.tasks_mutex_) ; while (!pool_.stop_ && pool_.tasks_.empty ()) { pool_.cv_.wait (lock); } if (pool_.stop_) return ; task = pool_.tasks_.front (); pool_.tasks_.pop (); } task (); } }
TimerQueue 定时器即让一个注册好的回调函数在某个设定时间到达之后执行。
话说现在应该有很多系统级的 API 都能够提供这种延迟执行 或者阻塞当前线程一段时间 的功能,但是当定时的任务比较多的时候,就有可能还是自己写一个定时器队列 来维护性能更好点了(主要是不确定到时候所用的定时 API 是怎么实现的,例如如果所有的这些定时任务都用 epoll_wait 的超时触发做,那 epoll 的底层实际上是用红黑树来维护的,猜想效率应该不会太差)。
另外还有一个定时精度的问题,不同的 API 可能能够提供的精度支持也是不一样的。
当时面试的时候被问到这个是远程共享写代码,由于之前没接触过这种 case,一开始不是很明白面试官到底想考我什么,就觉得定时器这种东西不是现有的 API 一大堆嘛,然后一脸懵逼。
最后写了一个线程每隔一个最小的周期(Tick)处理一次队列中的任务,把到达时间的任务取出来执行,任务列表就用优先队列或者说小根堆来实现,保证每次都能够高效地把设定时间最靠前的任务取出来。
网上查了下,基本上比较不错的思路有两种,一种就是小根堆的实现,另外一种是 Linux 内核中采用的时间轮的定时器方法。
由于小根堆的堆顶元素是最近触发的一个任务,因此还可以动态改变下一次处理需要等待的 Tick 时间来节省 CPU 资源的消耗。但是感觉这种方式会不会影响定时精度?以及这样似乎没有办法处理“在堆顶元素之前又插入了一个更近的任务”这种情况,所以最后想想还是觉得固定 Tick 比较稳妥。
然后就是重点要来学习一下时间轮(Timing-Wheel)了。
这个思路其实很简单,想象有一个钟表盘,上面有一圈时间刻度(从 1 到 N),有一个指针每隔 Tick 时间转动一个刻度,而所有的任务也是按照等待时间除以 Tick 分布在每个刻度上面,当指针扫到某个刻度时,即遍历一遍这个刻度上的任务列表,把到时间的拿出来执行即可。
当 N 足够大时,显然这个定时器的各个操作的复杂度都是 O(1)
的。
但是如果 N 过大,则这个时间轮的内存消耗将会比较大,因此又有了多级时间轮的优化思路:以钟表上的时针、分针、秒针为例,时针分为 12 格,分针、秒针分别为 60 格,这样就有一共 132 个格子,每个格子都是一条任务链表。一分钟以内的任务被加到秒针的格子中,一小时以内的任务加到分针的格子中,更久的任务加到时针的格子中。秒针每个 Tick 移动一次,每次移动后直接把当前格子中的所有任务取出来执行;分针则是每 60 个 Tick 移动一次,每次移动把当前格子中的任务下放到秒针轮对应的格子中;时针也是类似。
更厉害的优化思路可见下面这篇:
ReadWriteLock 最后这个读写锁是面试完之后让半小时内写完,然后邮件回面试官交作业……
结果读写锁的逻辑很快写完之后 main 函数的测试 case 却调 bug 调了好久……狂汗
读写锁从功能上来看也可以叫做共享锁,相对普通的互斥锁来说,它会有三种状态:未锁定、读锁定和写锁定。
读写锁的读操作是可以共享 的,即处于读锁定的时候另外一个线程还可以继续对其施加读锁定,而写锁定跟一般的锁一样是独占的。
因此基本的思路就是用一个读计数 来记录读操作,当读写锁处于解锁或者读锁定状态时,获取读锁即增加读的锁定计数,而此时想要获取写锁就需要等到所有的读操作都释放之后才行。
写操作可以用一个布尔变量来记录,当读写锁处于写锁定状态时,继续获取读锁或者写锁都需要等前一次的写操作释放。
下面分别是简单地直接用一个锁的版本:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 void rwlock::getRead () { while (is_writing == true ) { _lock.lock (); } read_count++; } void rwlock::getWrite () { while (read_count!=0 || is_writing==true ) { _lock.lock (); } is_writing = true ; } void rwlock::unlockRead () { read_count--; if (read_count == 0 ) { _lock.unlock (); } } void rwlock::unlockWrite () { is_writing = false ; _lock.unlock (); }
以及用条件变量实现的版本:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 void ReadWriteLock::getRead () { std::unique_lock<std::mutex> lock (cv_mutex_) ; cv_.wait (lock, [this ](){return !this ->is_writing_;}); read_count_++; } void ReadWriteLock::getWrite () { std::unique_lock<std::mutex> lock (cv_mutex_) ; cv_.wait (lock, [this ](){return this ->read_count_==0 && !this ->is_writing_;}); is_writing_ = true ; } void ReadWriteLock::unlockRead () { std::lock_guard<std::mutex> lock (cv_mutex_) ; read_count_--; if (read_count_ == 0 ) { cv_.notify_one (); } } void ReadWriteLock::unlockWrite () { std::lock_guard<std::mutex> lock (cv_mutex_) ; is_writing_ = false ; cv_.notify_one (); }
Coroutine 协程。
严格地说这个跟多线程关系不大,其实这个东西本身大概类似一种用户态的线程,然后关键在于这个线程的运行和调度都是要靠用户自己来管理的。
很多语言里面都自带协程的支持,例如,举一个别人的例子 ,一个 js 写的生成斐波那契数列的函数:
1 2 3 4 5 6 7 8 9 10 function fibonacii ( ) { let i = 1 ; let j = 1 ; for (let k = 0 ; k < 10 ; k++) { console .log (i); console .log (j); i += j; j += i; } }
用协程的写法则是:
1 2 3 4 5 6 7 8 9 10 11 12 13 function *fibonaciiUsingYield ( ) { let i = 1 ; let j = 1 ; while (true ) { yield i; yield j; i += j; j += i; } } ------ const gen = fibonaciiUsingYield ();gen.next ();
相当于每一次这个函数都断点 在 yield 对象上,当调用 next 函数时,函数向前执行一轮。
那么如何在本身不支持协程的 C/C++ 里面把这种特性用上呢?
答案是手动实现一个状态机……
详见轮子哥的考不上三本系列:
下面是按照这种方式写的一个简单例子:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 #include <iostream> #include <stdio.h> #include <string.h> #include <algorithm> using namespace std;void original_func () { int i=0 ; while (i<10 ) { printf ("%d\n" , i); i++; } } class func_callable { public : func_callable (int n) : total_ (n) {} int result; bool operator () () { int ans; while (true ) { switch (state_) { case 0 : i_ = 0 ; if (i_ < total_) {state_ = 1 ; continue ;} else {state_ = 3 ; continue ;} break ; case 1 : result = i_; {state_ = 2 ; return false ;} break ; case 2 : i_++; if (i_ < total_) {state_ = 1 ; continue ;} else {state_ = 3 ; continue ;} break ; case 3 : {state_ = -1 ; return true ;} break ; default : throw system_error (); } } return ans; } private : int i_; int total_; int state_ = 0 ; }; func_callable coroutine_func (int n) { return func_callable (n); } int main () { original_func (); auto a = coroutine_func (10 ); printf ("Coroutine Test:\n" ); a (); printf ("%d\n" , a.result); a ();a (); printf ("%d\n" , a.result); a (); printf ("%d\n" , a.result); a ();a (); printf ("%d\n" , a.result); return 0 ; }
Result:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 0 1 2 3 4 5 6 7 8 9 Coroutine Test: 0 2 3 5