C++高性能服务器开发 epoll+线程池模型

您所在的位置:网站首页 epoll实现服务器 C++高性能服务器开发 epoll+线程池模型

C++高性能服务器开发 epoll+线程池模型

2023-08-28 04:10| 来源: 网络整理| 查看: 265

文章目录 前言epoll 部分线程池部分整体设计:

前言

最近在总结之前做的恋爱交由平台的项目。在优化服务器时,将一开始使用的同步阻塞+多线程,替换为现在的epoll+线程池模型。提高了并发的能力,可以实现C10k的目标。

因此,特写此文,用来记录epoll+线程池模型。为相同需求的同学提供优化思路。

epoll 部分 首先要做的还是socket的创建,绑定,监听。并且创建epoll的句柄,同时将监听socket挂载到红黑树上,方便以后客户端有连接请求时可以建立连接。 int TcpNet::InitNetWork() { pool_t *pool = NULL; m_pool = new thread_pool; bzero(&serveraddr,sizeof(serveraddr)); serveraddr.sin_family = AF_INET; if(inet_pton(AF_INET,_DEF_SERVERIP,&serveraddr.sin_addr.s_addr) == -1) { perror("Init Ip Error:"); return FALSE; } serveraddr.sin_port = htons(_DEF_PORT); //创建Socket if((sockfd = socket(AF_INET,SOCK_STREAM,0)) == -1) { perror("Create Socket Error:"); return FALSE; } int mw_optval; setsockopt(sockfd,SOL_SOCKET,SO_REUSEADDR,(char*)&mw_optval,sizeof(mw_optval)); //绑定端口号 if(bind(sockfd,(struct sockaddr*)&serveraddr,sizeof(serveraddr)) == -1) { perror("Bind Socket Error:"); return FALSE; } //监听socket if(listen(sockfd,_DEF_LISTEN) == -1) { perror("Listen Error:"); return FALSE; } epfd = epoll_create(_DEF_EPOLLSIZE);//创建epoll的句柄,可以监听的文件描述符为_DEF_EPOLLSIZE Addfd(sockfd,TRUE); //创建拥有10个线程的线程池 最大线程数200 环形队列最大值50 if((pool = (m_pool->Pool_create(200,10,50))) == NULL) err_str("Create Thread_Pool Failed:",-1); m_pool->Producer_add(pool, EPOLL_Jobs, pool); return TRUE; } // 这个函数是将要监控的socket挂载到红黑树上 void TcpNet::Addfd(int fd,int enable_et/*是否为边缘触发*/) { struct epoll_event eptemp; eptemp.events = EPOLLIN; // 对应文件描述符可读 eptemp.data.fd = fd; // 文件描述符 if(enable_et) eptemp.events |= EPOLLET; epoll_ctl(epfd,EPOLL_CTL_ADD,fd,&eptemp); } 将epoll_work函数加入到线程池的任务队列中,然后用一个线程负责调用epoll_wait(),当有读事件发生时,epoll_wait返回事件就绪的个数,并且从内核态将就绪事件拷贝到了用户态。在这里调用epoll_deal函数遍历返回的epoll事件集合。首先判断是否是监听socket,如果是则将 与客户端建立连接的 函数加入到线程池的任务队列中。如果是读事件就绪,则将读取数据函数加入到线程池的任务队列中。 // 这个是将 epoll_jobs函数加入到线程池任务队列中 m_pool->Producer_add(pool, EPOLL_Jobs, pool); // epoll工作内容 epoll_wait void * TcpNet::EPOLL_Jobs(void * arg) { pool_t *pool = (pool_t*)arg; int ready; int i = 0; while(1) { //阻塞-1监听socket printf("%d\n",i++); //epoll_wait返回值是就绪事件的个数 if((ready = epoll_wait(m_pThis->epfd,m_pThis->epollarr,_DEF_EPOLLSIZE,-1)) == -1) err_str("Epoll Call Failed:",-1);//出错 //调用epoll_deal 处理就绪的fd m_pThis->Epoll_Deal(ready,pool); bzero(m_pThis->epollarr,sizeof(epollarr)); } } // 处理就绪事件 根据继续个数,处理就绪事件 // 在这里判断是 客户端连接还是 // 客户端连接的话,就交个线程,执行Accept_Deal建立连接的任务 // 是读事件就绪,则分配一个线程执行读取内核缓冲区的内容,并且 void TcpNet::Epoll_Deal(int ready,pool_t *pool) { int i = 0; for(i=0; i Deletefd(fd); m_pool->Producer_add(pool,Info_Recv,(void*)fd); } } } // 创建客户端连接 void *TcpNet::Accept_Deal(void *arg) { struct sockaddr_in clientaddr; int clientsize = sizeof(clientaddr); int clientfd; char ipstr[_DEF_IPSIZE]; pthread_mutex_lock(&m_pThis->alock); if((clientfd = accept(m_pThis->sockfd,(struct sockaddr*)&clientaddr,(socklen_t*)&clientsize)) == -1) { err_str("Custom Thread Accept Error",-1); } pthread_mutex_unlock(&m_pThis->alock); m_pThis->Addfd(clientfd,TRUE); printf("Custom Thread TID:0x%x\tClient IP:%s\tClient PORT:%d\t\n",(unsigned int)pthread_self() ,inet_ntop(AF_INET,&clientaddr.sin_addr.s_addr,ipstr,sizeof(ipstr)),ntohs(clientaddr.sin_port)); return 0; } void *TcpNet::Info_Recv(void *arg) { int clientfd = (long)arg; int nRelReadNum = 0; int nPackSize = 0; char *pSzBuf = NULL; nRelReadNum = recv(clientfd,&nPackSize,sizeof(nPackSize),0); if(nRelReadNum nRelReadNum = recv(clientfd,pSzBuf+nOffSet,nPackSize,0); if(nRelReadNum > 0) { nOffSet += nRelReadNum; nPackSize -= nRelReadNum; } } m_pThis->m_kernel->DealData(clientfd,pSzBuf,nOffSet); m_pThis->Addfd(clientfd,TRUE ); printf("pszbuf = %p \n",pSzBuf); if(pSzBuf != NULL) { free(pSzBuf); pSzBuf = NULL; } return 0; } 线程池部分 整体设计:

在这里插入图片描述 分为三部分:管理线程(负责线程数的扩容与缩减)、任务队列(负责向任务队列填装任务)、工作线程(负责从任务队列取任务并处理)。

其中任务队列、工作线程采用生产者-消费者模型. 采用互斥量+条件变量实现线程同步。

/* * 函数任务: 创建线程 * max:最大线程数 * min:最少线程数 * que_max:队列最大长度 */ pool_t *thread_pool::Pool_create(int max,int min,int que_max) { pool_t *p; if((p = (pool_t*)malloc(sizeof(pool_t))) == NULL) { err_str("malloc pool error:",-1); } p->thread_max = max; p->thread_min = min; p->thread_alive = 0; p->thread_busy = 0; p->thread_shutdown = TRUE; p->thread_wait = 0; p->queue_max = que_max; p->queue_cur = 0; p->queue_front = 0; p->queue_rear = 0; if(pthread_cond_init(&p->not_full,NULL)!=0 || pthread_cond_init(&p->not_empty,NULL)!=0 || pthread_mutex_init(&p->lock,NULL)!=0) { err_str("init cond or mutex error:",-1); } if((p->tids = (pthread_t*)malloc(sizeof(pthread_t)*max)) == NULL) { err_str("malloc tids error:",-1); } bzero(p->tids,sizeof(pthread_t)*max); if((p->queue_task = (task_t*)malloc(sizeof(task_t)*que_max))==NULL) { err_str("malloc task queue error:",-1); } int err; for(int i=0; i printf("create custom error:%s\n",strerror(err)); return NULL; } ++(p->thread_alive); } if((err = pthread_create(&(p->manager_tid),NULL, Manager,(void*)p))>0) { printf("create Manager error:%s\n",strerror(err)); return NULL; } return p; } /* * 根据传进来的任务函数,和任务参数 * 将任务投递到队列中 (生产者) */ int thread_pool::Producer_add(pool_t * p,void *(task)(void *arg),void *arg) { pthread_mutex_lock(&p->lock); while(p->queue_cur == p->queue_max && p->thread_shutdown ) { // 当线程个数等于最大线程个数时 ,挂起线程,解锁 条件变量 pthread_cond_wait(&p->not_full,&p->lock); } if(!p->thread_shutdown ) { pthread_mutex_unlock(&p->lock); return -1; } p->queue_task[p->queue_front].task = task; // 将任务投递到队列中 p->queue_task[p->queue_front].arg = arg; // 任务函数的参数 p->queue_front = (p->queue_front + 1) % p->queue_max; // 将front后移 ++(p->queue_cur); //更新当前节点 pthread_cond_signal(&p->not_empty); pthread_mutex_unlock(&p->lock); return 0; } // 工作线程--负责从任务队列中取出任务并执行 // 消费者 void * thread_pool::Custom(void * arg) { pool_t * p = (pool_t*)arg; task_t task; while(p->thread_shutdown) { pthread_mutex_lock(&p->lock); while(p->queue_cur == 0 && p->thread_shutdown ) { pthread_cond_wait(&p->not_empty,&p->lock); } if(!p->thread_shutdown ) { pthread_mutex_unlock(&p->lock); pthread_exit(NULL); } if(p->thread_wait > 0 && p->thread_alive > p->thread_min) { --(p->thread_wait); --(p->thread_alive); pthread_mutex_unlock(&p->lock); pthread_exit(NULL); } task.task = p->queue_task[p->queue_rear].task; task.arg = p->queue_task[p->queue_rear].arg; p->queue_rear = (p->queue_rear + 1) % p->queue_max; --(p->queue_cur); pthread_cond_signal(&p->not_full); ++(p->thread_busy); pthread_mutex_unlock(&p->lock); //执行核心工作 (*task.task)(task.arg); // 调用任务函数 pthread_mutex_lock(&p->lock); --(p->thread_busy); pthread_mutex_unlock(&p->lock); } return 0; } // 管理线程 用于扩容和缩减线程个数 void *thread_pool::Manager(void *arg) { pool_t * p = (pool_t *)arg; int alive; int cur; int busy; int add = 0; while(p->thread_shutdown ) { pthread_mutex_lock(&p->lock); alive = p->thread_alive; busy = p->thread_busy; cur = p->queue_cur; pthread_mutex_unlock(&p->lock); if((cur > alive - busy || (float)busy / alive*100 >= (float)80 ) || p->thread_max > alive) { for(int i=0; ithread_max)&&add pthread_create(&p->tids[i],NULL,Custom,(void*)p); ++(p->thread_alive); } pthread_mutex_unlock(&p->lock); } } if(busy *2 p->thread_min) { pthread_mutex_lock(&p->lock); p->thread_wait = _DEF_COUNT; pthread_mutex_unlock(&p->lock); for(int i=0; i


【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3