线程池主要是用于解决多线程使用时由于线程频繁创建销毁带来的性能损耗导致的实时性降低问题,例如我们常见的订单处理、Web服务器及数据库服务器等经常需要同时处理大量链接请求,并且单个任务本身处理时间较短,此时若采用使用时创建,结束时销毁的方式就会由于频繁的创建及销毁带来很多不必要的开销,这时就可以利用线程池统一进行任务分配及线程的创建销毁管理,线程池可以预先创建一定数量的线程,当没有足够数量的任务需要处理时多余的线程就处于阻塞状态无需占用CPU,等任务处理完毕后再进行线程的销毁,这样处理这种短暂任务就可以不需要频繁的去创建销毁线程,从而解决问题,但如果是本身任务不多、耗时长且任务实时性要求不高的情况(此时线程创建销毁带来的耗时相比任务时间可以忽略)反而可能会因为线程池本身的开销带来性能降低如文件传输这种可能耗时较长的就不一定适合使用线程池,反而传统的即时创建,即时销毁可能会是更好的选择。
线程池主要由管理线程、工作线程及任务队列构成;任务队列用于存放暂未处理的任务,相当于任务排队等待调度执行,工作线程用于执行分配的任务,若未分配到任务则阻塞等待;管理线程用于创建、启动、停止、销毁线程及任务分配,起到管理线程池的作用。
线程池的原理就好比现在的XX打车,打车软件平台服务器就相当于管理线程、运输车辆就好比工作线程、每天的运输订单就相当于任务,订单一时可能过多就需要排队等待接单,排队的订单就相当于任务队列。平台就负责管理运营车辆及订单任务的分配,运输车辆负责完成订单任务,订单任务暂时没有车辆可安排则放入等待队列。
编程实现线程池就是围绕上面的内容完成的,任务队列可以选择使用队列、链表或者数组实现,为了防止类似不同车辆抢到同一订单即多个线程抢到同一任务的情况我们会使用到互斥锁,车辆等待订单及执行订单(线程空闲等待及执行)则可使用条件变量。理想状态是订单有多少车辆就有多少,那我们就可以同时完成多个订单,但现实是订单太多可能平台可能会因为链接过多崩溃类似以前的12306购票系统,在线程池中也就是内存溢出(OOM)没有足够的内存存放所有等待的订单;同样我们的车辆(线程数)也不能无限多,就好比城市道路若道路上跑满了汽车,那也就会导致交通瘫痪汽车移动缓慢甚至无法移动(工作线程数过多CPU就会100%负载这样就可能出现整个系统运行缓慢甚至卡死)。
下面是C语言中线程、互斥锁及条件变量使用的相关函数,编写线程池就会用到这些函数接口:
1.线程操作函数
int pthread_create(pthread_t *tid, const pthread_attr_t *attr, (void*)(*start_run)(void *), void *arg); //创建线程
void pthread_exit(void *retval); //终止当前线程
int pthread_cancel(pthread_t tid); //终止其他线程.发送终止信号后目标线程不一定终止,要调用join函数等待
int pthread_join(pthread_t tid, void **retval); //等待线程结束状态
int pthread_equal(pthread_t t1, pthread_t t2); //比较两线程ID是否相等
int pthread_detach(pthread_t tid); //从主线程中分离线程,该线程资源由分离后线程自身回收释放,注意调用了线程detach函数则不能调用上面的线程join函数
pthread_t pthread_self(void); //获取当前线程ID
int pthread_kill(pthread_t tid, int sig); //给当前进程中的线程发送信号,若发送0信号,可检测线程是否还存在,返回ESRCH表示不存在或线程已结束
2.互斥锁与条件变量相关函数
int pthread_mutex_init(pthread_mutex_t *restrict mutex, const pthread_mutexattr_t *restrict attr); //初始化互斥锁锁
int pthread_mutex_destroy(pthread_mutex_t *mutex); //销毁锁
int pthread_mutex_lock(pthread_mutex_t *mutex); //加锁
int pthread_mutex_trylock(pthread_mutex_t *mutex); //尝试加锁,上面lock的非阻塞版本
int pthread_mutex_unlock(pthread_mutex_t *mutex); //解锁
int pthread_cond_init(pthread_cond_t *cv, const pthread_condattr_t *cattr); //初始化条件变量
int pthread_cond_destroy(pthread_cond_t *cond); //销毁条件变量
int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex); //等待条件
int pthread_cond_broadcast(pthread_cond_t *cond); //唤醒所有调用pthread_cond_wait()而进入睡眠等待的线程
int pthread_cond_signal(pthread_cond_t *cond); //唤醒至少一个调用pthread_cond_wait()而进入睡眠等待的线程
下面是模拟实现线程池的代码
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
#include <signal.h> //pthread_kill
#include <errno.h> //使用ESRCH宏定义须包含
#define QUEUE_MAX_SIZE 100 //队列最大大小
#define MIN_TPOOL_CAPACITY 20 //线程池最小线程容量
#define PER_MAX_CHANGE 10 //线程池单次最多追加或减少线程数
#define FTASK 300000001 //第一个工作任务
#define LTASK 300001000 //最后一个工作任务
//用于封装保存工作任务对象信息
typedef struct task
{
void *(*work)(void *); //指向具体线程工作任务函数
void *arg; //用于保存传递工作任务参数
}task_t;
//线程池对象
typedef struct pthread_pool
{
pthread_mutex_t lock; //用于各线程互斥操作锁
pthread_mutex_t busy_count; //统计在忙线程个数的锁
pthread_cond_t full; //任务队列满时等待的条件变量
pthread_cond_t unused; //线程闲时等待的条件变量
pthread_t *wthdid; //用于保存工作线程ID
pthread_t mthdid; //用于保存管理线程ID
task_t *task_queue; //用于创建任务队列使用,这是构建的一个简单的循环队列保存任务
int queue_task_num; //保存任务队列现有任务个数
int queue_front; //定位任务队列起始位置索引
int queue_tail; //定位任务队列结束位置索引
int min_threads; //用于保存设定线程池中最小工作线程数
int max_threads; //用于保存设定线程池中最大工作线程数
int live_threads; //记录当前存在的工作线程数
int busy_threads; //记录正在工作中的工作线程数
int extra_threads; //记录目前多余的空闲不必要线程数,管理线程清除多余空闲工作线程时使用
int shutdown; //记录线程池工作状态,运行(0) or 结束(1)
}pthread_pool_t;
/*
* 工作线程
* 主要功能:
* 1.等待队列任务
* 2.结束多余线程(多余工作任务的线程及,线程池所有任务结束时的线程)
* 3.从队列中获取并执行任务
* 4.统计忙碌线程数
*/
void *workers_run(void *arg)
{
pthread_pool_t *pool = (pthread_pool_t *)arg;
while(1) {
pthread_mutex_lock(&pool->lock);
//线程池运行中并且线程池任务队列为空,工作线程进入闲时条件等待
while((0 == pool->queue_task_num) && !pool->shutdown) {
pthread_cond_wait(&pool->unused, &pool->lock); //闲时等待,类似等待接单过程
//当线程池中有多余空闲线程并且当前线程数大于线程池中最小线程数,则在工作线程唤醒后就结束本线程
if( (pool->extra_threads > 0) && (pool->live_threads > pool->min_threads) ) {
// printf("extra threads 0x%lx exit ", pthread_self());
pool->extra_threads--;
pool->live_threads--;
pthread_mutex_unlock(&pool->lock);
pthread_exit(NULL);
}
}
//当线程池结束并且线程池任务队列为空时,当前线程直接退出
if(pool->shutdown && (0 == pool->queue_task_num)) {
pool->live_threads--;
pthread_mutex_unlock(&pool->lock);
pthread_exit(NULL);
}
task_t task; //用于获取保存当前任务对象信息
//按顺序取出一个任务队列中的任务
task.work = pool->task_queue[pool->queue_front].work;
task.arg = pool->task_queue[pool->queue_front].arg;
pool->queue_front = (pool->queue_front + 1) % QUEUE_MAX_SIZE;
pool->queue_task_num--;
pthread_mutex_unlock(&pool->lock);
//唤醒等待添加新任务到队列的任务
pthread_cond_broadcast(&pool->full);
//工作线程开始取出的队列任务,忙碌线程数增加
pthread_mutex_lock(&pool->busy_count);
pool->busy_threads++;
pthread_mutex_unlock(&pool->busy_count);
task.work(task.arg); //执行取出的队列任务
//工作线程完成取出的队列任务,忙碌线程数减少
pthread_mutex_lock(&pool->busy_count);
pool->busy_threads--;
pthread_mutex_unlock(&pool->busy_count);
}
}
/*
* 管理线程
* 主要功能:
* 1.当线程池中工作线程数不够时增加一定数量的线程
* 2.当线程池中空闲工作线程数过多时通知闲置工作线程退出,让出资源
*/
#define INTERVAL_TIME 2 //设置管理线程轮询的时间间隔
void *manager_run(void *arg)
{
pthread_pool_t *pool = (pthread_pool_t *)arg;
while(!pool->shutdown || pool->busy_threads) {
sleep(INTERVAL_TIME); //定时轮询检测当前工作线程总数量、忙碌线程数量及队列任务量,根据实际使用情况决定是否该创建或删除工作线程
pthread_mutex_lock(&pool->lock);
int queue_size = pool->queue_task_num;
int live_threads = pool->live_threads;
pthread_mutex_unlock(&pool->lock);
pthread_mutex_lock(&pool->busy_count);
int busy_threads = pool->busy_threads;
pthread_mutex_unlock(&pool->busy_count);
printf("%d threads live, %d threads busy ", live_threads, busy_threads);
//已有工作线程数低于线程池最大设定线程数并且队列任务多余最小线程池线程容量时增加工作线程
if( (live_threads < pool->max_threads) && (queue_size > MIN_TPOOL_CAPACITY) ) {
pthread_mutex_lock(&pool->lock);
int i = 0, add = 0;
//在保证不超过线程池最大线程数的同时增加PER_MAX_CHANGE个工作线程
for(i = 0; i < pool->max_threads && add < PER_MAX_CHANGE; i++) {
//pthread_kill发送0给指定线程ID的线程判断当前线程是否存在或结束,返回ESRCH表示不存在该线程或该线程已结束
//创建的线程不能占用覆盖已有工作中的线程ID
if( (0 == pool->wthdid[i]) || (ESRCH == pthread_kill(pool->wthdid[i], 0)) ) {
pthread_create(&pool->wthdid[i], NULL, workers_run, (void *)pool);
pool->live_threads++;
add++;
}
}
pthread_mutex_unlock(&pool->lock);
}
//工作线程多余线程池设定的最小线程数和忙碌线程数则通知多余闲置线程退出
if( (busy_threads < live_threads) && (live_threads > pool->min_threads) ) {
pthread_mutex_lock(&pool->lock);
pool->extra_threads = live_threads - busy_threads;
pthread_mutex_unlock(&pool->lock);
int i = 0;
for(i = 0; i < (live_threads - busy_threads); i++) {
pthread_cond_signal(&pool->unused);
}
}
}
}
//线程池初始化创建函数,可设定最小和最大线程数及最大队列大小
pthread_pool_t *pthread_pool_create(int min_thds, int max_thds, int max_queue_size)
{
pthread_pool_t *thdpool = (pthread_pool_t *)malloc(sizeof(pthread_pool_t));
if(NULL == thdpool) {
printf("malloc thdpool failed ");
return NULL;
}
thdpool->min_threads = min_thds;
thdpool->max_threads = max_thds;
thdpool->live_threads = min_thds;
thdpool->busy_threads = 0;
thdpool->extra_threads = 0;
thdpool->queue_task_num = 0;
thdpool->queue_front = 0;
thdpool->queue_tail = 0;
thdpool->shutdown = 0;
//分配max_thds个pthread_t大小的空间存放用于保存对应的工作线程ID
thdpool->wthdid = (pthread_t *)malloc(sizeof(pthread_t) * max_thds);
if(NULL == thdpool->wthdid) {
printf("malloc workthd failed ");
free(thdpool);
return NULL;
}
memset(thdpool->wthdid, 0, sizeof(pthread_t) * max_thds); //创建线程前将所有工作线程ID设为0
//创建max_queue_size大小的任务队列
thdpool->task_queue = (task_t *)malloc(sizeof(task_t) * max_queue_size);
if(NULL == thdpool->task_queue) {
printf("malloc task_queue failed ");
free(thdpool->wthdid);
free(thdpool);
return NULL;
}
pthread_mutex_init(&thdpool->lock, NULL);
pthread_mutex_init(&thdpool->busy_count, NULL);
pthread_cond_init(&thdpool->full, NULL);
pthread_cond_init(&thdpool->unused, NULL);
//初始先创建最小线程个数的工作线程
int i = 0;
for(i = 0; i < min_thds; i++) {
pthread_create(&thdpool->wthdid[i], NULL, workers_run, (void *)thdpool);
}
//创建管理线程
pthread_create(&thdpool->mthdid, NULL, manager_run, (void *)thdpool);
return thdpool;
}
//线程池销毁函数
void pthread_pool_destroy(pthread_pool_t *pool)
{
if(NULL == pool) return;
pool->shutdown = 1;
int i = 0;
pthread_cond_broadcast(&pool->unused);
for(i = 0; i < pool->live_threads; i++) {
pthread_join(pool->wthdid[i], NULL);
}
pthread_join(pool->mthdid, NULL);
pthread_mutex_destroy(&pool->lock);
pthread_cond_destroy(&pool->full);
pthread_cond_destroy(&pool->unused);
free(pool->wthdid);
free(pool->task_queue);
free(pool);
pool = NULL;
}
//向线程池中增加任务的函数,work为需添加的任务,arg为任务可能需要的参数
void tpool_add_task(pthread_pool_t *pool, void *(*work)(void *), void *arg)
{
pthread_mutex_lock(&pool->lock);
//线程池关闭则直接退出
if(1 == pool->shutdown) {
pthread_mutex_unlock(&pool->lock);
return;
}
//当任务队列满时阻塞等待工作线程将队列中的任务取出
while(QUEUE_MAX_SIZE == pool->queue_task_num) {
pthread_cond_wait(&pool->full, &pool->lock);
}
//向队列中添加工作任务
pool->task_queue[pool->queue_tail].work = work;
pool->task_queue[pool->queue_tail].arg = arg;
pool->queue_tail = (pool->queue_tail + 1) % QUEUE_MAX_SIZE;
pool->queue_task_num++;
pthread_cond_signal(&pool->unused);
pthread_mutex_unlock(&pool->lock);
}
这里用求一个较大数是否是质数来模拟线程耗时工作任务,下面是测试任务函数及主函数
static int compsite_count; //统计合数个数
static int prime_count; //统计质数个数
//统计数量用于判定程序是否会出现漏处理任务
void *work_deal(void *arg)
{
unsigned long n = (unsigned long)arg;
int i = 2;
for(i = 2; i < (n/2 + 1); i++) {
if(0 == n%i) {
compsite_count++;
return arg;
}
}
printf("current thread is 0x%x deal num %lu is a prime num ", (unsigned int)pthread_self(), n);
prime_count++;
return arg;
}
int main(int argc, char **argv)
{
//创建线程池
pthread_pool_t *thdp = pthread_pool_create(10, 50, QUEUE_MAX_SIZE);
if(NULL == thdp) {
printf("create thread pool failed! ");
return 0;
}
//分配任务添加到线程池中
unsigned long task_num = FTASK;
for(task_num = FTASK; task_num < LTASK + 1; task_num++) {
tpool_add_task(thdp, work_deal, (void *)task_num);
}
//等待任务处理完毕并且工作线程任务结束
while(thdp->busy_threads || thdp->queue_task_num);
pthread_pool_destroy(thdp);
//这里质数与合数个数加起来应等于总任务数
printf("prime_count is %d, compsite_countn is %d ", prime_count, compsite_count);
return 0;
}
下面为线程池任务工作效果
通过运行效果我们可以看出在前期我们线程池中由于任务过多会逐渐创建新的线程,当空闲线程过多时退出空闲线程,就达成了自动管理线程的效果。
下图是为了方便更改原理图使用与文章内容无关