C/C++手撕线程池(线程池的封装和实现)

您所在的位置:网站首页 线程从创建到销毁的全过程图解 C/C++手撕线程池(线程池的封装和实现)

C/C++手撕线程池(线程池的封装和实现)

2023-12-20 01:16| 来源: 网络整理| 查看: 265

C/C++手撕线程池(线程池的封装和实现) 线程池描述池式结构线程池线程池解决的问题 朴实无华但不枯燥的代码(以c++为例)线程池中比较关键的东西工作队列任务队列线程池本池回调函数static?函数本身 缝缝补补添加任务面向用户的添加任务构造函数析构函数 测试性能测试测试结果不使用线程池使用线程池 一些合理的建议以及后来觉得没必要的骚操作

本文使用的源码地址: https://github.com/SCljh/thread_pool

线程池描述

池式结构

  在计算机体系结构中有许多池式结构:内存池、数据库连接池、请求池、消息队列、对象池等等。

  池式结构解决的主要问题为缓冲问题,起到的是缓冲区的作用。  

线程池

  通过使用线程池,我们可以有效降低多线程操作中任务申请和释放产生的性能消耗。特别是当我们每个线程的任务处理比较快时,系统大部分性能消耗都花在了pthread_create以及释放线程的过程中。那既然是这样的话,何不在程序开始运行阶段提前创建好一堆线程,等我们需要用的时候只要去这一堆线程中领一个线程,用完了再放回去,等程序运行结束时统一释放这一堆线程呢?按照这个想法,线程池出现了。  

线程池解决的问题 解决任务处理阻塞IO解决线程创建于销毁的成本问题管理线程

  线程池应用之一:日志存储

  在服务器保存日志至磁盘上时,性能往往压在磁盘读写上,而引入线程池利用异步解耦可以解决磁盘读写性能问题。

  线程池的主要作用:异步解耦   说了那么多线程池的优点,那接下来要做的就是手撕这诱人的玩意了。  

朴实无华但不枯燥的代码(以c++为例)

本文主要讲解的是c++线程池的实现,C语言实现其实思想和c++是一致的,具体的代码可见文章开头的链接。

线程池中比较关键的东西

  若想自己编写一个线程池框架,那么可以先关注线程池中比较关键的东西:

工作队列任务队列线程池的池pthread_create中的回调函数

为什么说这些东西比较关键?因为这“大四元”基本上支撑起了整个线程池的框架。而线程池的框架如下所示: 线程池   如图所示,我们将整个框架以及任务添加接口定义为线程池的“池”,那么在这个池子中重要的就是工作队列、任务队列、以及决定工作队列中的thread到底应该工作还是休息的回调函数。

  那么手撕线程池,我们就从这几个关键点入手。  

工作队列

  worker队列,首先要有worker才有队列,我们首先定义worker结构体:   可想而知,worker中要有create_pthread函数的id参数,还需要有控制每一个worker live or die的标志terminate,我们最好再设置一个标志表示这个worker是否在工作。最后,我们要知道这个worker隶属于那个线程池(至于为什么下文将介绍)

struct NWORKER{ pthread_t threadid; //线程id bool terminate; //是否需要结束该worker的标志 int isWorking; //该worker是否在工作 ThreadPool *pool; //隶属于的线程池 }

任务队列

  任务队列就简单得多了,想想编程语言中的任务应该是什么?不就是函数嘛。所以我们只需要定义一个函数该有的东西就行了。

struct NJOB{ void (*func)(void *arg); //任务函数 void *user_data; //函数参数 };

线程池本池

  对于一个线程池,任务队列和工作队列已经是必不可少的东西了,那线程池还有需要哪些东西辅助它以达到它该有的功能呢?   一说到线程,那处理好线程同步就是一个绕不开的话题,那在线程池中我们需要处理的临界资源有哪些呢?想想我们工作队列中的每个worker都在等待一个任务队列看其是否有任务到来,所以很容易得出结论我们必须要在线程池中实现两把锁:一把是用来控制对任务队列操作的互斥锁,另一把是当任务队列有新任务时唤醒worker的条件锁。   有了这两把锁,线程池中再加点必要的一些数字以及对线程池操作的函数,那么这个类就写完了。实现代码如下:

class ThreadPool{ private: struct NWORKER{ pthread_t threadid; bool terminate; int isWorking; ThreadPool *pool; } *m_workers; struct NJOB{ void (*func)(void *arg); //任务函数 void *user_data; }; public: //线程池初始化 //numWorkers:线程数量 ThreadPool(int numWorkers, int max_jobs); //销毁线程池 ~ThreadPool(); //面向用户的添加任务 int pushJob(void (*func)(void *data), void *arg, int len); private: //向线程池中添加任务 bool _addJob(NJOB* job); //回调函数 static void* _run(void *arg); void _threadLoop(void *arg); private: std::list m_jobs_list; int m_max_jobs; //任务队列中的最大任务数 int m_sum_thread; //worker总数 int m_free_thread; //空闲worker数 pthread_cond_t m_jobs_cond; //线程条件等待 pthread_mutex_t m_jobs_mutex; //为任务加锁防止一个任务被两个线程执行等其他情况 };

可以看到我们做了一些必要的封装,只给用户提供了构造函数、析构函数以及添加任务的函数。这也是一个基本的线程池框架必要的接口。

回调函数 static?

  根据上方代码可以看见,回调函数为static函数。我可不想在我使用使用回调函数的时候自动给我加上*this参数。   首先回调函数是每个线程创建之后就开始执行的函数,该函数作为pthread_create的第三个参数传入。我们来看看pthread_create的函数原型:

int pthread_create(pthread_t *tidp,const pthread_attr_t *attr, void *(*start_rtn)(void*),void *arg);

  注意到,此处的我们传入的回调函数必须是接受一个void*参数,且返回类型为void*的函数。如果我们将回调函数写成线程池的普通成员函数,那么c++会在这个函数参数前默认加上一个*this参数,**这也是为什么我们能在成员函数中使用当前对象中的一些属性。**然而就是这个原因,若我们传入的回调函数指针为类的成员函数,那c++编译器会破坏我们的函数结构(因为给我们加了一个形参),导致pthread_create的第三个参数不符合要求而报错: NON-STATIC   看吧,编译器不让我们用non-static的成员函数作为回调函数传入pthread_create中。其实在c++中,大多数回调函数都有这个要求。

  那为什么static就可以呢?

这是因为static函数为类的静态函数,当类的成员函数被static修饰后,调用该函数将不会默认传递*this指针,这也是为什么static成员函数中不能使用对象的非static属性:你*this指针都没传我上哪去找你的对象?   

函数本身

  在运行回调函数的时候,我们又想用对象里的东西(比如锁),编译器又不让我们用,那怎么办?别忘了虽然static函数没有*this指针,但它却可以有一个*void的参数啊。有了这个*void,我们还怕少一个*this指针?我们可以先写一个static函数,将需要的对象指针通过形参传到这个函数里,再在这个函数中通过这个对象调用成员函数的方法,就能使用这个对象的成员属性了。

  就像这样:

//run为static函数 void* ThreadPool::_run(void *arg) { NWORKER *worker = (NWORKER *)arg; worker->pool->_threadLoop(arg); } //threadLoop为普通成员函数 void ThreadPool::_threadLoop(void *arg) { //在这里就能直接用当前ThreadPool对象的东西了 }

  至于threadLoop的实现,由于线程是要一直存在的,一个while(true)的循环肯定少不了了。这个循环中具体做什么呢:不断检查任务队列中是否有job:

如果有,则取出这个job,并将该job从任务队列中删除,且执行job中的func函数。如果没有,调用pthread_cond_wait函数等待job到来时被唤醒。若当前worker的terminate为真,则退出循环结束线程。

注意在对job操作前别忘了加锁,函数实现如下:

void ThreadPool::_threadLoop(void *arg) { NWORKER *worker = (NWORKER*)arg; while (1){ //线程只有两个状态:执行\等待 //查看任务队列前先获取锁 pthread_mutex_lock(&m_jobs_mutex); //当前没有任务 while (m_jobs_list.size() == 0) { //检查worker是否需要结束生命 if (worker->terminate) break; //条件等待直到被唤醒 pthread_cond_wait(&m_jobs_cond,&m_jobs_mutex); } //检查worker是否需要结束生命 if (worker->terminate){ pthread_mutex_unlock(&m_jobs_mutex); break; } //获取到job后将该job从任务队列移出,免得其他worker过来重复做这个任务 struct NJOB *job = m_jobs_list.front(); m_jobs_list.pop_front(); //对任务队列的操作结束,释放锁 pthread_mutex_unlock(&m_jobs_mutex); m_free_thread--; worker->isWorking = true; //执行job中的func job->func(job->user_data); worker->isWorking = false; free(job->user_data); free(job); } free(worker); pthread_exit(NULL); }

  此处需要注意的是pthread_cond_wait,大家或许会有疑惑:线程调用pthread_cond_wait前没有释放m_jobs_mutex锁就进入了等待,那其他线程不是就一直拿不到这把互斥锁了吗?其实不然,因为pthread_cond_wait在进入等待之前会释放第二个参数的锁,而在被唤醒时又将尝试获取第二个参数传入的锁。具体pthread_cond_wait的实现原理大家可百度或google详细了解,在此就不赘述。  

缝缝补补

  至此,四大元的代码已经介绍完毕,接下来就是一些缝缝补补的工作:添加线程之类的。

添加任务

  添加线程的逻辑其实也挺容易理解:传入一个job --> 尝试获取互斥锁 --> 将job添加到线程池的任务队列中 --> 释放锁 --> 通知worker来取线程,代码如下所示:

bool ThreadPool::_addJob(NJOB *job) { //尝试获取锁 pthread_mutex_lock(&m_jobs_mutex); //判断队列是否超过任务数量上限 if (m_jobs_list.size() >= m_max_jobs){ pthread_mutex_unlock(&m_jobs_mutex); return false; } //向任务队列添加job m_jobs_list.push_back(job); //唤醒休眠的线程 pthread_cond_signal(&m_jobs_cond); //释放锁 pthread_mutex_unlock(&m_jobs_mutex); return true; }

面向用户的添加任务

  当然,我们不希望用户在使用我们的线程池的时候都需要自己定义job并添加到任务队列,job这种私密的关于内部实现的东西,我们也不希望用户能看到,所以我们可以封装一层面向用户的添加任务函数,一来可以方便线程池的使用者,二来也能隐藏内部实现:

//面向用户的添加任务 int ThreadPool::pushJob(void (*func)(void *), void *arg, int len) { struct NJOB *job = (struct NJOB*)malloc(sizeof(struct NJOB)); if (job == NULL){ perror("malloc"); return -2; } memset(job, 0, sizeof(struct NJOB)); job->user_data = malloc(len); memcpy(job->user_data, arg, len); job->func = func; _addJob(job); return 1; }

  接下来,我们还有线程池的构造函数以及析构函数没有实现。  

构造函数

  构造函数所做的工作就是根据用户传入的参数创建线程,并且初始化一些属性值。值得注意的是,我们最好在创建完线程后,调用pthread_detach函数,这样能让我们的worker都能安详得结束一切:

ThreadPool::ThreadPool(int numWorkers, int max_jobs = 10) : m_sum_thread(numWorkers), m_free_thread(numWorkers), m_max_jobs(max_jobs){ //numWorkers:线程数量 if (numWorkers perror("create workers failed!\n"); } //初始化每个worker for (int i = 0; i delete[] m_workers; perror("create worker fail\n"); } if (pthread_detach(m_workers[i].threadid)){ delete[] m_workers; perror("detach worder fail\n"); } m_workers[i].terminate = 0; } }

  

析构函数

  析构函数无非就是做释放资源的事情,注意,由于我们detach了我们创造的线程,所以我们必须手动唤醒所有在条件等待的线程,并将worker的terminate值置为true:

ThreadPool::~ThreadPool(){ //terminate值置1 for (int i = 0; i printf("i = %d\n", *(int *)arg); } int main(){ ThreadPool *pool = new ThreadPool(1000, 2000); printf("线程池初始化成功\n"); int i = 0; for (i = 0; i


【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3