c语言+pthread实现线程池

您所在的位置:网站首页 线程创建和销毁的开销 c语言+pthread实现线程池

c语言+pthread实现线程池

2023-09-07 14:00| 来源: 网络整理| 查看: 265

文章目录 1.线程池原理2.项目结构项目源代码tpool.htpool.cdemo.c线程池完整源码见

1.线程池原理

线程池(thread pool)技术是指能够保证所创建的任一线程都处于繁忙状态,而不需要频繁地为了某一任务而创建和销毁线程,因为系统在创建和销毁线程时所耗费的cpu资源很大。如果任务很多,频率很高,为了单一一个任务而起线程而后销线程,那么这种情况效率相当低下的。线程池技术就是用于解决这样一种应用场景而应运而生的。 pthread线程池实现原理,首先创建多个线程组成线程池,其中每个线程都运行一个被阻塞的循环函数,等待管理者线程唤醒,被唤醒的线程通过函数指针来执行用户所指定的函数,其中线程的阻塞和唤醒使用信号量机制。 ps:该项目改编自知乎文章:https://zhuanlan.zhihu.com/p/44971598 ps:笔者在大佬代码的基础上修改了线程池销毁过程,当线程池中的任务未全部完成时,线程池不会被立刻销毁,直到任务全部执行完成。 ps:在提交的任务函数中不要有sleep这种带有线程取消点的函数,否则在sleep过程中,如果调用destroy_tpool会被主线程默认取消。

2.项目结构

请添加图片描述 提供的api有

一、创建线程池,create_tpool

二、销毁线程池,destroy_tpool

三、分派任务,add_task_2_tpool

项目源代码 tpool.h #ifndef T_POOL #define T_POOL #include #include typedef struct tpool_work{ void* (*work_routine)(void*); //function to be called void* args; //arguments struct tpool_work* next; }tpool_work_t; typedef struct tpool{ size_t shutdown; //is tpool shutdown or not, 1 ---> yes; 0 ---> no size_t maxnum_thread; // maximum of threads pthread_t *thread_id; // a array of threads tpool_work_t* tpool_head; // tpool_work queue pthread_cond_t queue_ready; // condition varaible pthread_mutex_t queue_lock; // queue lock }tpool_t; /*************************************************** *@brief: * create thread pool *@args: * max_thread_num ---> maximum of threads * pool ---> address of thread pool *@return value: * 0 ---> create thread pool successfully * othres ---> create thread pool failed ***************************************************/ int create_tpool(tpool_t** pool,size_t max_thread_num); /*************************************************** *@brief: * destroy thread pool *@args: * pool ---> address of pool ***************************************************/ void destroy_tpool(tpool_t* pool); /************************************************** *@brief: * add tasks to thread pool *@args: * pool ---> thread pool * routine ---> entry function of each thread * args ---> arguments *@return value: * 0 ---> add ok * others ---> add failed **************************************************/ int add_task_2_tpool(tpool_t* pool,void* (*routine)(void*),void* args); #endif//tpool.h tpool.c #include "tpool.h" #include #include #include #include #include int is_taskover(tpool_t *pool){ if(pool->tpool_head!=NULL)return 0; return 1; } static void* work_routine(void* args) { tpool_t* pool = (tpool_t*)args; tpool_work_t* work = NULL; while(1){ pthread_mutex_lock(&pool->queue_lock); while(!pool->tpool_head && !pool->shutdown){ // if there is no works and pool is not shutdown, it should be suspended for being awake pthread_cond_wait(&pool->queue_ready,&pool->queue_lock); } if(pool->shutdown){ pthread_mutex_unlock(&pool->queue_lock);//pool shutdown,release the mutex and exit pthread_exit(NULL); } /* tweak a work*/ work = pool->tpool_head; pool->tpool_head = (tpool_work_t*)pool->tpool_head->next; pthread_mutex_unlock(&pool->queue_lock); work->work_routine(work->args); free(work); } return NULL; } int create_tpool(tpool_t** pool,size_t max_thread_num) { (*pool) = (tpool_t*)malloc(sizeof(tpool_t)); if(NULL == *pool){ printf("in %s,malloc tpool_t failed!,errno = %d,explain:%s\n",__func__,errno,strerror(errno)); exit(-1); } (*pool)->shutdown = 0; (*pool)->maxnum_thread = max_thread_num; (*pool)->thread_id = (pthread_t*)malloc(sizeof(pthread_t)*max_thread_num); if((*pool)->thread_id == NULL){ printf("in %s,init thread id failed,errno = %d,explain:%s",__func__,errno,strerror(errno)); exit(-1); } (*pool)->tpool_head = NULL; if(pthread_mutex_init(&((*pool)->queue_lock),NULL) != 0){ printf("in %s,initial mutex failed,errno = %d,explain:%s",__func__,errno,strerror(errno)); exit(-1); } if(pthread_cond_init(&((*pool)->queue_ready),NULL) != 0){ printf("in %s,initial condition variable failed,errno = %d,explain:%s",__func__,errno,strerror(errno)); exit(-1); } for(int i = 0; i printf("pthread_create failed!\n"); exit(-1); } } return 0; } int add_task_2_tpool(tpool_t* pool,void* (*routine)(void*),void* args) { tpool_work_t* work,*member; if(!routine){ printf("rontine is null!\n"); return -1; } work = (tpool_work_t*)malloc(sizeof(tpool_work_t)); if(!work){ printf("in %s,malloc work error!,errno = %d,explain:%s\n",__func__,errno,strerror(errno)); return -1; } work->work_routine = routine; work->args = args; work->next = NULL; pthread_mutex_lock(&pool->queue_lock); member = pool->tpool_head; if(!member){ pool->tpool_head = work; } else{ while(member->next){ member = (tpool_work_t*)member->next; } member->next =(tpool_work_t*)work; } //notify the pool that new task arrived! pthread_cond_signal(&pool->queue_ready); pthread_mutex_unlock(&pool->queue_lock); return 0; } void destroy_tpool(tpool_t* pool) { tpool_work_t* tmp_work; while(pool->shutdown || !is_taskover(pool)); pool->shutdown = 1; pthread_mutex_lock(&pool->queue_lock); pthread_cond_broadcast(&pool->queue_ready); pthread_mutex_unlock(&pool->queue_lock); for(int i = 0; i maxnum_thread; i++){ pthread_join(pool->thread_id[i],NULL); } free(pool->thread_id); while(pool->tpool_head){ tmp_work = pool->tpool_head; pool->tpool_head = (tpool_work_t*)pool->tpool_head->next; free(tmp_work); } pthread_mutex_destroy(&pool->queue_lock); pthread_cond_destroy(&pool->queue_ready); free(pool); } demo.c #include "tpool.h" #include #include #include void* fun(void* args) { int thread = (int)args; printf("running the thread of %d\n",thread); return NULL; } int main(int argc, char* args[]) { tpool_t* pool = NULL; if(0 != create_tpool(&pool,5)){ printf("create_tpool failed!\n"); return -1; } for(int i = 0; i


【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3