线程池(Thread Pool)是一种基于 池化思想 管理线程的工具,经常出现在多线程服务器中,如 MySQL。
线程过多会带来额外的开销,其中包括创建销毁线程的开销、调度线程的开销等等,同时也降低了计算机的整体性能。线程池维护多个线程,等待监督管理者(worker)分配可并发执行的任务。这种做法,一方面避免了处理任务时创建销毁线程开销的代价,另一方面避免了线程数量膨胀导致的过分调度问题,保证了对内核的充分利用。
创建线程可能是非常昂贵的。通常每个线程都会执行几乎相同的任务,包括创建线程、执行任务和销毁线程等。线程本身相当 heavy —— 创建或销毁线程会占用很多本来想要执行的任务时间。因此,重用线程是一个很好的选择。
线程池的另一个好处是它可以防止系统过载(overloaded),它允许 限制线程数量、任务排队,并仅在线程可用时运行任务。
线程池维持固定数量的线程,并等待执行特定任务。该线程池可以设计为根据您需要完成的工作量进行扩展,但我更喜欢 指定固定数量的线程 。为了确定这个数量,通常可以考虑 使用系统上的核心 / 处理器数量加一。
使用线程池可以带来一系列好处:
线程池解决的核心问题就是资源管理问题。在并发环境下,系统不能够确定在任意时刻中,有多少任务需要执行,有多少资源需要投入。这种不确定性将带来以下若干问题:
可以将任务传递到线程池,而不是为每个任务启动一个新线程来并发地执行。一旦池中有任何空闲线程(idle threads),任务就会分配给其中一个空闲线程并执行。在内部,任务被插入到阻塞队列(Blocking Queue)中,池中的线程将从该队列中出队。当一个新任务被插入队列时,其中一个空闲线程将成功地将其出队并执行它。池中的其余空闲线程将被阻塞,等待任务出队。
何为阻塞队列(Blocking Queue)?
后续内容是对基于 C 语言实现的线程池的开源项目 C-Thread-Pool 的学习。
Function example | Description |
---|---|
thpool_init(4) |
Will return a new threadpool with 4 threads. |
thpool_add_work(thpool,func_p,arg_p) |
Will add new work to the pool. Work is simply a function. You can pass a single argument to the function if you wish. If not, NULL should be passed. |
thpool_wait(thpool) |
Will wait for all jobs (both in queue and currently running) to finish. |
thpool_destroy(thpool) |
This will destroy the threadpool. If jobs are currently being executed, then it will wait for them to finish. |
thpool_pause(thpool) |
All threads in the threadpool will pause no matter if they are idle or executing work. |
thpool_resume(thpool) |
If the threadpool is paused, then all threads will resume from where they were. |
thpool_num_threads_working(thpool) |
Will return the number of currently working threads. |
We create a pool of 4 threads and then add 40 tasks to the pool (20 task1 functions and 20 task2 functions). task1 and task2 simply print which thread is running them.
1 |
|
As soon as we add the tasks to the pool, the threads will run them. It can happen that you see a single thread running all the tasks (highly unlikely). It is up the OS to decide which thread will run what. So it is not an error of the thread pool but rather a decision of the OS.
C-Thread-Pool 实现的线程池包括如下数据结构:
流程图左一:struct thpool_* thpool_init(int num_threads)
1 | typedef struct thpool_ { |
1 | jobqueue_init(&thpool_p->jobqueue); |
thpool_p->threads
,它指向一个 struct thread *
类型指针的 指针数组。1 | thpool_p->threads = (struct thread**)malloc(num_threads * sizeof(struct thread*)); |
后续将在
thread_init()
函数中,为每个指向struct thread *
的指针分配一段内存空间。正因为是在 其它函数内部 为这里申请的二级指针指向的指针数组中的每个指针分配一段内存空间,所以在thread_init()
函数中使用了二级指针。看这里,为什么用二级指针
1 | for (i = 0; i < num_threads; i++) { |
流程图左二:int thpool_add_work(thpool_* thpool_p, void (*function_p)(void*), void* arg_p)
1 | typedef struct job { |
1 | newjob->function = function_p; |
1 | jobqueue_push(&thpool_p->jobqueue, newjob); |
关注点:
对于任务队列中 没有任务时的处理:
不是采用轮询的方式,而是使用条件变量 has_jobs->cond
,在入队 / 出队任务后,若仍有任务在队列中,则唤醒条件变量。
对于 函数指针类型转换的处理:
自定义的函数原型可以与 api 的原型不一致。例如:这里 api 的函数指针原型为void (*)(void*)
,而「接口使用实例」中的 task1 和 task2 的函数原型为void* (*)(void*)
。
流程图左三:void thpool_wait(thpool_* thpool_p)
1 | pthread_mutex_lock(&thpool_p->thcount_lock); |
这个函数的工作原理是:
thpool_
结构中的计数变量);threads_all_idle
上,同时释放 thcount_lock
互斥锁(以让其它线程有机会拿到锁),并阻塞在这个条件变量上。这样其它线程可以在条件满足时唤醒等待在 threads_all_idle
条件变量上的这个线程。thcount_lock
互斥锁。一旦获取到互斥锁后,它会再次检查 while 条件,若条件成立,说明还有任务或工作线程。那么,这个线程会继续等待在条件变量上,等待被通知唤醒。关注点:
流程图右一:void thpool_destroy(thpool_* thpool_p)
1 | threads_keepalive = 0; |
复位的目的是,通过这个标志结束每个活跃线程的轮询(详见 thread_do
函数),并等待被唤醒后退出。
bsem_p->cond
的唤醒,线程退出1 | /* Give one second to kill idle threads */ |
这里被唤醒后,thread_do
函数中的 bsem_wait
将被唤醒,随之跳出轮询并线程退出。
1 | jobqueue_destroy(&thpool_p->jobqueue); |
1 | for (int n = 0; n < threads_total; n++) { |
关注点:
threads_keepalive
标志,使得在 thread_do
中被唤醒的线程,跳过任务执行(if)和轮询(while),达到优雅地退出。1 | // in function thread_do() |
void thpool_pause(thpool_* thpool_p)
1 | for (int n = 0; n < thpool_p->num_threads_alive; n++) { |
函数原型:int pthread_kill(pthread_t thread, int sig)
函数描述:pthread_kill()
函数向与调用者在同一进程中的线程 thread
发送信号 sig
。该信号会 异步地 被发送到 thread
线程中。发送成功返回 0,不成功返回非 0。
对「异步地」的理解:pthread_kill()
函数向目标线程发送信号时,不会阻塞调用线程的执行 。换句话说,调用pthread_kill()
函数发送信号是一个异步操作,它会立即返回,而不会等待目标线程处理完信号后再继续执行。这意味着调用线程可以继续执行自己的任务,而不必等待目标线程对接收到的信号做出响应。
void thpool_resume(thpool_* thpool_p)
1 | (void)thpool_p; |
关注点:
thread_hold
函数,进入循环睡眠,实现线程的暂停;当线程池发出恢复时,将 threads_on_hold
复位,thread_hold
函数将退出循环睡眠,线程继续执行。所有的线程接口都被 static
关键字修饰。
流程图左一:static int thread_init(thpool_* thpool_p, struct thread** thread_p, int id)
1 | *thread_p = (struct thread*)malloc(sizeof(struct thread)); |
为什么用二级指针作为入参?参考「线程池初始化 -3」小节。
1 | (*thread_p)->thpool_p = thpool_p; |
这一步有何作用?通过线程找到对应的线程池,多个线程可以互斥地改变对应线程池的共享计数资源(发生在
thread_do
)。
1 | /* |
这是核心代码,Worker 作为一个无休止的循环,唯一的被中断是调用 thpool_destroy()
或程序退出。它作为一个 Worker,在内部执行不断入队的 Job。
1 | thpool_* thpool_p = thread_p->thpool_p; |
1 | pthread_mutex_lock(&thpool_p->thcount_lock); |
在「线程初始化」中,每创建一个线程示例,都会执行对应的启动例程,即thread_do
,这个线程运行起来,共享计数量加一。
1 | while (threads_keepalive) { |
1 | if (threads_keepalive) { |
thpool_destroy()
唤醒,if 不成立,while 不成立,退出轮询,活跃线程退出。1 | pthread_mutex_lock(&thpool_p->thcount_lock); |
任务队列主要是为线程服务的。当有新任务到来时,thpool_add_work
调用 push 函数将新任务插入队尾;当线程检测到任务队列有任务时,及时将任务从队列中 pull 并执行它。
具体接口在这里就不再介绍了,直接看流程吧,挺详细的。核心就在 push&pull 操作后,若队列中仍有剩余任务,则会通知唤醒等待在条件变量上的线程,从而使得线程可以执行队列中的任务;当队列为空时,不再通知唤醒线程,线程便会阻塞在条件变量上。
参考资料: