Linux高并发学习

您所在的位置:网站首页 epoll实现基于TCP的聊天室 Linux高并发学习

Linux高并发学习

2023-11-25 22:27| 来源: 网络整理| 查看: 265

Reactor模式

目前主流的网络通信库,如C/C++的libevent、Java的Netty等,使用的都是Reactor模式。Reactor模式是一种事件处理的设计模式,在I/O请求到达后,服务处理程序使用I/O复用技术同步地讲这些请求派发给相关的请求处理程序。 如何具体的说明Reactor模式呢?我们以饭店运营模式为例: 饭店作为服务器,顾客则是I/O请求,很显然,几乎绝大部分饭店都不会给每位顾客单独分配一个服务员,这样开销太大了。而采用了Reactor模式的饭店是这样做的:一般由某个服务员接待进来用餐的顾客,里面有服务员接待几桌的顾客,当顾客有需求时(如点菜、结账)告诉服务员,服务员会将这些需求转告给对应的工作人员进行处理(如点菜需要转告给厨房、结账转告给收银台),这样的话,即使顾客爆满(I/O事件频繁),饭店依然能够有条不紊的运作。 在这里插入图片描述

具体使用“非阻塞IO+IO复用”,基本结构是一个事件循环(event loop),以事件驱动(event-driven)和事件回调的方式实现业务逻辑,即Reactor模式。 伪代码如下:

while(!done) { int nready = epoll_wait(epollfd, epollEvents, EPOLL_EVENT_SIZE, timeout); if(nready // 处理到期的timer,回调用户的timer handler if(nready > 0){ // 处理IO事件,回调用户的IO handler } } } Epoll的Reactor实现

在reactor模式的设计中,我们首先得设计一个功能较完善的结构,这个结构至少得包含基本的socketfd、读写事件的回调函数等等,这样在后续epoll_wait返回events事件时,能够根据事件类型直接调用所绑定的回调函数。本例程设计的结构体变量如下:

typedef int (*callBack)(int, int, void*); typedef struct _event_item { int fd; int events; void *args; callBack rhandle; // 读事件回调 callBack whandle; // 写事件回调 unsigned char sendBuf[BUFFER_SIZE]; int sendLen; unsigned char recvBuf[BUFFER_SIZE]; int recvLen; } event_item; typedef struct _reactor { int epollfd; event_item *events; }reactor;

其中event_item是一个基本结点,对应着一个socketfd,以及相关的读写回调和发送接收缓存;reactor结点则是用于管理epoll_create()创建的epollfd和所有的event_item结点,event_item *events在本例程中是一个大小为512的数组,也就是说最多能支持512个TCP连接,后续支持更多连接的话,读者可以使用链表来组织结构。

和原始服务器模型一样,首先需要创建socket到listen状态,我们使用一个init_socket()的函数,返回监听状态的socket给后续使用:

sockfd = init_socket(SERVER_PORT); int init_socket(short port){ int reuseAddr = 1; int fd = socket(AF_INET, SOCK_STREAM, 0); if(fd printf("errno: %s, fcntl() failed!\n", strerror(errno)); return -1; } struct sockaddr_in serverAddr; memset(&serverAddr, 0, sizeof(serverAddr)); serverAddr.sin_family = AF_INET; serverAddr.sin_port = htons(port); serverAddr.sin_addr.s_addr = htonl(INADDR_ANY); setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (const void*)&reuseAddr, sizeof(reuseAddr)); if(bind(fd, (struct sockaddr*)&serverAddr, sizeof(serverAddr)) printf("errno: %s, listen() failed!\n", strerror(errno)); return -1; } return fd; }

在得到监听socket之后,我们接着就可以获取新的socket连接了,在此之前,我们先初始化上述设定的结构变量:

int init_reactor(reactor *r){ if(r == NULL) return -1; memset(r, 0, sizeof(reactor)); r->epollfd = epoll_create(1); if(r->epollfd printf("errno: %s, (event_item*)malloc() failed!\n", strerror(errno)); return -1; } return 0; } reactor *instance = NULL; reactor *getInstance(){ if(instance == NULL){ instance = (reactor*)malloc(sizeof(reactor)); if(instance == NULL) return NULL; memset(instance, 0, sizeof(reactor)); if(init_reactor(instance) struct epoll_event ev; reactor *r = getInstance(); if(r == NULL){ printf("set_reactor_events(): getInstance() failed!\n"); return -1; } if(event == ACCEPT){ r->events[fd].fd = fd; r->events[fd].args = args; r->events[fd].rhandle = acceptCB; ev.events = EPOLLIN; }else if(event == READ){ r->events[fd].fd = fd; r->events[fd].args = args; r->events[fd].rhandle = readCB; ev.events = EPOLLIN; // ev.events |= EPOLLET; }else if(event == WRITE){ r->events[fd].fd = fd; r->events[fd].args = args; r->events[fd].whandle = writeCB; ev.events = EPOLLOUT; } ev.data.ptr = &r->events[fd]; if(r->events[fd].events == INIT){ epoll_ctl(r->epollfd, EPOLL_CTL_ADD, fd, &ev); r->events[fd].events = event; }else if(r->events[fd].events != event){ epoll_ctl(r->epollfd, EPOLL_CTL_MOD, fd, &ev); r->events[fd].events = event; } return 0; }

在该函数中,只要传入fd和需要设定的event事件类型,再封装epoll_ctl()函数完成事件的注册。 那么如何使用呢?我们使用reactor_loop()函数进行事件循环,根据不同类型的事件,来分配给不同的回调函数处理,代码如下:

int reactor_loop(){ struct epoll_event events[MAX_EPOLL_SIZE] = {0}; reactor *r = getInstance(); if(r == NULL){ printf("reactor_loop(): getInstance() failed!\n"); return -1; } while(1){ int nready = epoll_wait(r->epollfd, events, MAX_EPOLL_SIZE, -1); if(nready == -1) continue; for(int i = 0; i struct sockaddr_in clientAdr; socklen_t len = sizeof(clientAdr); int connfd = accept(fd, (struct sockaddr*)&clientAdr, &len); if(fcntl(connfd, F_SETFL, O_NONBLOCK) reactor *r = getInstance(); if(r == NULL){ printf("reactor_loop(): getInstance() failed!\n"); return -1; } unsigned char *rbuf = r->events[fd].recvBuf; #if 0 // ET int cnt = 0, num = 0; while(1){ num = recv(fd, rbuf+cnt, BUFFER_SIZE-cnt, 0); if(num == -1){ if(errno == EAGAIN || errno == EWOULDBLOCK) break; } else if(num > 0) cnt += num; else break; } if (cnt == BUFFER_SIZE && num != -1) { set_reactor_events(fd, READ, NULL); } else if (num == 0) { printf("对端连接断开!clientfd = %d\n", fd); del_reactor_events(fd); } else { unsigned char *sbuf = r->events[fd].sendBuf; memcpy(sbuf, rbuf, cnt); r->events[fd].sendLen = cnt; printf("recv from fd = %d: %s\n", fd, sbuf); set_reactor_events(fd, WRITE, NULL); } #else int n = recv(fd, rbuf, BUFFER_SIZE, 0); if(n == 0){ printf("errno: %s, clientfd %d closed!\n", strerror(errno), fd); del_reactor_events(fd); return -1; }else if(n printf("errno: %s, error!\n", strerror(errno)); return -1; } } else{ unsigned char *sbuf = r->events[fd].sendBuf; memcpy(sbuf, rbuf, n); r->events[fd].sendLen = n; printf("recv from fd = %d: %s\n", fd, sbuf); set_reactor_events(fd, WRITE, args); } #endif return 0; } int writeCB(int fd, int events, void* args){ reactor *r = getInstance(); if(r == NULL){ printf("reactor_loop(): getInstance() failed!\n"); return -1; } unsigned char *sbuf = r->events[fd].sendBuf; int len = r->events[fd].sendLen; int ret = send(fd, sbuf, len, 0); if(ret set_reactor_events(fd, READ, args); } return 0; } 后续

文章到这里就告一段落啦,下一篇会基于reactor实现单机百万并发的代码例程,此外代码有需要的同学可以在评论区留言或者私信我哈,我也上传了对应的代码,链接在这:添加链接描述 有问题的地方请告知我哈~ 互相学习!



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3