文章目录
Socket编程:常见的协议:套接字概念:预备知识:网络字节序:IP转换函数:sockaddr数据结构:socket文件描述符号:
网络套接字函数:基于TCP的socket模型:创建socket函数:bind绑定(命名)socket:listen监听socket:accept接受连接:connect发起连接:
基于UDP的socket模型:recvfrom函数:sendto函数:
close和shutdown函数对比:
C/S模型-TCP实现:TCP三次握手、状态变换、函数调用返回、队列状态综合图:TCP状态转换图:TCP通信流程:2MSL:等待`2MSL`的意义?端口复用:
TCP连接和释放的过程:简单的通信过程实现:server.c:client.c:
C/S模型-UDP实现:简单的通信过程实现:server.c:client.c:
高并发服务器:补充细节:高并发服务器的通讯示意图:read函数返回值:
多进程并发服务器:使用细节:server.c
多线程并发服务器:使用细节:server.c
多路I/O转接服务器:select函数:相关函数:server.c:select的优缺点:
epoll函数:引入:常用的函数:epoll_create函数:epoll_ctl函数:epoll_wait函数:
epoll原理详解:epoll_create详解:epoll_ctl、epoll_wait详解:总结:
ET/LT深度解析:epoll监听管道的ET和LT模式:事件模型ET和LT的比较:举例:
server.cepoll反应堆模型:
select、epoll对比:
libevent库:libevent库介绍:libevent源码包的安装:libevent特点和组成:libevent的特点和优势:libevent的组成:
libevent框架:常规事件:使用fifo进行读写实现:read.cwrite.c
未决和非未决:
带缓冲区的事件`bufferevent`:常用的函数:基于libevent的TCP通信实现:server.cclient.c
Socket编程:
常见的协议:
![在这里插入图片描述](https://img-blog.csdnimg.cn/ec24ef5b8c884c1ea085bb139fe197e5.jpeg#pic_center)
套接字概念:
Socket,在Linux环境下用于表示进程间网络通信的特殊文件类型,本质是内核借助缓冲区形成的伪文件。与管道类似,读写套接字和读写文件操作一致。区别:管道主要应用于本地进程间通信,套接字多用于网络进程间通信。套接字内核实现较为复杂,有待进一步学习。。。在TCP/IP协议中,IP+TCP/UDP端口可唯一标识网络通信中的一个进程,也就唯一对应一个socket。如果要在两个进程间网络通信,则需要一对socket,即可标识一个唯一的网络连接。通信过程中,套接字是成对出现的。一个文件描述符指向一个socket套接字(内部由内核借助两个发送/接受缓冲区实现)。(这里结合TCP、IP四次挥手进行理解)。
![在这里插入图片描述](https://img-blog.csdnimg.cn/278eaf1e0dc64471a12dfe1882beb180.png#pic_center)
预备知识:
网络字节序:
大端法(PC本地存储):高地址存高位,低地址存底位。小端法(网络存储):高位存低地址,低位存高地址。
#include
// Convert values between host and network byte order.
uint32_t htonl(uint32_t hostlong); --> IP
uint32_t htons(uint32_t hostshort); --> Port
uint32_t ntohl(uint32_t hostlong); --> IP
uint32_t ntohs(uint32_t hostshort); --> Port
IP转换函数:
#include
/* 本地字节序(const char* IP) --> 网络字节序 */
// 参数:
// af:AF_INET(IPv4)、AF_INET6(IPv6)
// src:传入 IP 地址(点分十进制形式)
// dst:传出转换后的网络字节序的 IP 地址
int inet_pton(int af, const char* src, void* dst);
// 返回值:成功则返回1,异常则返回0即src指向的不是一个有效的IP地址,失败则返回-1
/* 网络字节序 --> 本地字节序(const char* IP) */
// af:AF_INET、AF_INET6
// src:网络字节序的 IP 地址
// dst:本地字节序(点分十进制形式的 IP 地址)
// size:dst 的大小
const char* inet_ntop(int af, const void* src, char* dst, socklen_t size);
sockaddr数据结构:
现在的sockaddr退化成了void*的作用,传递一个地址给函数,其会根据地址族确定到底调用按个函数sockaddr_in/sockaddr_in6,函数内部会强制将类型转化为所需的地址类型。
struct sockaddr
{
sa_family_t sin_family; /* address family : AF_INET、AF_INET6、AF_UNIX */
char sa_data[14]; /* 14 bytes, 包括套接字中的目标地址和端口信息 */
};
// 注意:
// 1. sockaddr是给操作系统用的
// 2. 程序员应该使用sockaddr_in来表示地址,(sockaddr_in区分了地址和端口,使用更方便)
// 3. 使用系统调用时,用“类型、IP地址、端口”填充sockaddr_in结构体,之后需要强制转换成sockaddr,才能作为参数传递给系统调用函数
struct sockaddr_in
{
sa_family_t sin_family; /* address family : AF_INET、AF_INET6、AF_UNIX */
in_port_t sin_port; /* port in network byte order */
struct in_addr sin_addr; /* internet address */
char sin_zero[8]; /* 8 bytes zero this if you want to */
// sin_zero用来填充字节,使sockaddr_in、sockaddr保持一样大小
};
/*
// 支持IPv4和IPv6
// inet_pton()函数,可转换如下:IPv4 --> in_addr、IPv6 --> in6_addr
struct in_addr
{
uint32_t s_addr; // Internet address : network byte order
};
struct in6_addr
{
uint64_t l_addr; // Internet address : network byte order
};
*/
![在这里插入图片描述](https://img-blog.csdnimg.cn/989498f73c0c4e7aa3c8c0761d9d4b1a.png#pic_center)
注意:所有专用socket地址结构体类型的变量,都需要强制转换成通用socket地址类型sockaddr(所有的socket编程接口使用的地址参数类型都是sockaddr)。
// 使用举例:
struct sockaddr_in addr;
addr.sin_family = AF_INET/AF_INET6;
addr.sin_port = htons(端口号);
// 方式一:
inet_pton(AF_INET, string类型的IP, &(addr.sin_addr.s_addr));
// 方式二:
addr.sin_addr.s_addr = htonl(INADDR_ANY); // 取出系统中,有效的任意IP地址。
socket文件描述符号:
使用cat命令查看一个进程可以打开的socket描述符的上限。
cat /proc/sys/fs/file-max
一个进程允许使用的文件描述符的个数:ulimit - a查看。
![在这里插入图片描述](https://img-blog.csdnimg.cn/ee2cc4e50907425e82bea7756e418f4a.png#pic_center)
如果有需要,可通过修改配置文件的方式,修改该值。
sudo vi /etc/security/limits.conf
网络套接字函数:
基于TCP的socket模型:
![在这里插入图片描述](https://img-blog.csdnimg.cn/dcb84a7882b348ca996e0ecc991202d4.png#pic_center)
创建socket函数:
#include
// domain:底层协议族:AF_INET、AF_INET6、AF_UNIX
// type:哪个服务:SOCK_STREAM(TCP字节流)、SOCK_DGRAM(UDP数据报)
// protocol:默认为0(前两个参数已经足够)
int fd = socket(int domain, int type, int protocol);
// 返回值:成功则返回一个socket文件句柄fd(或称文件描述符),失败则返回-1并设置errno
bind绑定(命名)socket:
#include
// sockfd:socket文件描述符,即socket()函数的返回值
// addr:将addr所指的socket地址分配给 “未命名的sockfd”,即将 “IP + port” 与 sockfd 绑定
// addrlen:该socket地址结构的大小 sizeof(addr);
int bind(int sockfd, const struct sockaddr* addr, socklen_t addrlen); // 给socket绑定一个地址结构 “IP + Port”
// 返回值:成功则返回0,失败则返回-1并设置errno。
listen监听socket:
socket被绑定之后,还不可以马上接受客户连接,需要创建一个监听队列存放待处理的客户连接,即设置监听的上限。
#include
/* 设置同时与服务器建立连接的上限数(同时进行3次握手的客户端数量),并不阻塞于此 */
// sockfd:被监听的socket,本质是socket()函数的返回值
// backlog:提示内核监听队列的最大长度,即同时与服务器建立连接的上限数(最大为128)
int listen(int sockfd, int backlog);
// 返回值:成功则返回0,失败则返回-1并设置errno
accept接受连接:
阻塞等待直到有客户端尝试与该服务器监听的端口连接,则从listen监听队列中取出连接,最终返回能与客户端通信的文件描述符。
#include
// sockfd:socket函数返回值
// 传出参数addr:成功与服务器建立连接的那个 “客户端的地址结构”
// 传入传出参数addrlen:传入的是addr的大小、传出的是实际的客户端addr的大小
int accept(int sockfd, struct sockaddr* addr, socklen_t* addrlen); // 阻塞直到有客户端连接
// 返回值:
// 1. 成功则返回一个 “服务器能与客户端进行通信的socket对应的文件描述符”
// 2. 失败则返回-1并设置errno
connect发起连接:
服务器是通过listen调用来被动接受连接,客户端则是通过connect主动与服务器建立连接。
#include
// sockfd:系统调用socket()返回的一个fd文件句柄(文件描述符)
// addr:服务器监听的socket地址
// addrlen:服务器监听的socket地址的长度
int connect(int sockfd, const struct sockaddr* addr, socklen_t addrlen);
// 返回值:
// 1. 成功则返回0,之后“客户端就可以通过读写sockfd来与服务器进行通信”
// 2. 失败返回-1并设置errno
基于UDP的socket模型:
![在这里插入图片描述](https://img-blog.csdnimg.cn/f532b1ab177242f1bf9699e97f90c3bd.png#pic_center)
recvfrom函数:
#include
#include
// 读取sockfd上的数据
// buf、len参数:分别指定缓冲区的位置和大小
// 传出参数src_addr:每次读取数据都需要获取发送端的socket地址
// 传入传出参数addrlen:指定地址长度
ssize_t recvfrom(int sockfd, void* buf, size_t len, int flags, struct sockaddr* src_addr, socklen_t* addrlen);
// 返回值:成功则返回接收到的字节数,失败则返回-1并置errno,0则表示对端关闭
sendto函数:
#include
#include
// 往sockfd上写入数据
// buf和len参数:分别为存储数据的缓冲区的位置和大小
// dest_addr指定接收方的socket地址,addrlen指定地址长度
size_t sendto(int sockfd, void* buf, size_t len, int flags, struct sockaddr* dest_addr, socklen_t addrlen);
// 返回值:成功则返回写出的字节数,失败则返回-1并置errno
close和shutdown函数对比:
// close()只关闭指向该缓冲区(用来socket通信的)的一个的文件描述符;shutdown()关闭所有指向该缓冲区的文件描述符
shutdown(int fd, int how);
// how : SHUT_RD 关读端(读缓冲区)
// SHUT_WR 关写端(写缓冲区)
// SHUT_RDWR 关读写端(读/写缓冲区)
C/S模型-TCP实现:
TCP三次握手、状态变换、函数调用返回、队列状态综合图:
客户端:socket() --> connect()阻塞在这里、SYN_SENT状态 --> RTT --> connect()返回、ESTABLISHED状态
服务端:socket()、bind()、listen() --> accept()阻塞在这里、LISTEN状态 --> 在未完成连接队列中建立条目、SYN_RCVD状态 --> RTT --> ESTABLISHED状态、该条目从未完成队列中移到已完成队列中且accept返回
TCP状态转换图:
![在这里插入图片描述](https://img-blog.csdnimg.cn/678e7285058f433cb4bad78fea216246.png#pic_center)
TCP通信流程:
![在这里插入图片描述](https://img-blog.csdnimg.cn/dca4d42d7467446d92b28968b1d61c0e.png#pic_center)
注意:主动关闭的一方可能是client/server,且主动关闭的一方均要等通过TIME_WAIT等待2MSL的时长。
2MSL:
等待2MSL的意义?
保证最后一个ACK能成功被对端接收(等待期间,如果对端没有收到我发的ACK,对端可以再发送FIN请求)。
端口复用:
如果server先关闭进入TIME_WAIT状态(client后关闭),之后server立即重启,但会提示“端口已被占用”,因为此时server是主动发起关闭的一端,会经历从TIME_WAIT状态开始的2MSL的等待时间。
j解决办法:使用setsockopt()设置sock描述符的选项为SO_REUSEADDR = 1,表示允许创建端口号相同但IP地址不同的多个sock描述符。
主要解决的问题:TIME_WAIT状态,导致bind失败的问题。
// 在server.c中,socket()和bind()之间,插入如下代码:(解决了server关闭后,可立即重启的问题)
int opt = 1;
setsockopt(listen_sockfd, SOL_SOCKET, SO_REUSEADDR, (void*)&opt, sizeof(opt));
// 返回值:成功则返回0,失败则返回-1并置errno
TCP连接和释放的过程:
三次握手:
![在这里插入图片描述](https://img-blog.csdnimg.cn/eed898e7d2864ec6ba6c0c411b0fd8bb.png#pic_center)
主动发起连接请求端:发送SYN标志位,请求建立连接。携带序列号seq、数据字节数(0)、滑动窗口大小。被动接受连接请求端:发送ACK标志位,同时携带SYN请求标志位。携带序列号seq、确认序号ack、数据字节数(0)、滑动窗口大小。主动发起连接请求端:发送ACK标志位,应答服务器连接请求,携带确认序号ack。因为第三次握手,如果携带数据,则也需要携带序列号seq、数据字节数(非0)、滑动窗口大小。
四次挥手:
半关闭过程: 主动关闭连接请求端,发送FIN标志位。 被动关闭连接请求端,应答ACK标志位。 连接全部关闭: 被动关闭连接请求端,发送FIN标志位。 主动关闭连接请求端,应答ACK标志位。
简单的通信过程实现:
server.c:
#include
#include
#include
#include // toupper()/tolower()
#include
#include
#define SERVER_PORT 8000
void sys_error(const char* str)
{
perror(str);
exit(1);
}
int main()
{
// 创建socket
int listen_sockfd = socket(AF_INET, SOCK_STREAM, 0); // 使用IPv4且基于TCP进行通信
if (listen_sockfd == -1)
{
sys_error("socket error");
}
struct sockaddr_in server_addr;
server_addr.sin_family = AF_INET;
server_addr.sin_port = htons(SERVER_PORT);
server_addr.sin_addr.s_addr = htonl(INADDR_ANY); // 表示接收任意IP的连接请求
// 绑定服务器地址结构
int ret = bind(listen_sockfd, (struct sockaddr*)&server_addr, sizeof(server_addr));
if (ret == -1)
{
sys_error("bind error");
}
// 设置同时监听上限
ret = listen(listen_sockfd, 128);
if (ret == -1)
{
sys_error("listen error");
}
// 阻塞监听客户端连接,获取客户端的sockaddr_in的socket结构,并返回用于与客户端通信的fd文件描述符
struct sockaddr_in client_addr; socklen_t client_addrlen;
int client_sockfd = accept(listen_sockfd, (struct sockaddr*)&client_addr, &client_addrlen);
if (client_sockfd == -1)
{
sys_error("accept error");
}
// 获取客户端的地址
char client_IP[1024];
inet_ntop(AF_INET, &client_addr.sin_addr.s_addr, client_IP, sizeof(client_IP)); // 返回值与client_IP一致
printf("%s : %d\n", client_IP, ntohs(client_addr.sin_port));
while (1)
{
char buf[1024];
// 阻塞等待获取客户端数据,并返回实际读到的字节数
ret = read(client_sockfd, buf, sizeof(buf));
if (ret == 0) // !!!重点:已经读到结尾(对端已关闭)
{
break;
}
write(STDOUT_FILENO, buf, ret);
// 逻辑处理:将小写转大写
for (int i = 0; i
perror(str);
exit(1);
}
int main()
{
// 创建socket
int client_sockfd = socket(AF_INET, SOCK_STREAM, 0);
if (client_sockfd == -1)
{
sys_error("socket error");
}
// 将服务器监听的IP地址和端口号port,以及使用的IP地址族,写入sockaddr_in结构体,组成服务器的socket地址
struct sockaddr_in server_addr;
server_addr.sin_family = AF_INET;
server_addr.sin_port = htons(SERVER_PORT);
inet_pton(AF_INET, "127.0.0.1", &(server_addr.sin_addr.s_addr));
// 与服务器建立连接
int ret = connect(client_sockfd, (struct sockaddr*)&server_addr, sizeof(server_addr));
if (ret == -1)
{
sys_error("connect error");
}
char buf[1024];
while (1)
{
// 阻塞等待客户端输入数据,并将数据写入buf,返回实际写入buf的字节数ret
ret = read(STDIN_FILENO, buf, sizeof(buf));
if (ret == 0) // !!!重点:已经读到结尾(对端已关闭)
{
break;
}
write(client_sockfd, buf, ret);
// 阻塞等待服务端的返回数据,并将数据写入buf,返回实际写入buf的字节数ret
ret = read(client_sockfd, buf, sizeof(buf));
write(STDIN_FILENO, buf, ret);
}
close(client_sockfd);
}
![在这里插入图片描述](https://img-blog.csdnimg.cn/7fa98143d0d64e219421c7e5956a1ceb.png#pic_center)
C/S模型-UDP实现:
由于不需要建立连接,故无状态(通过netstat -apn观察不到该IP:Port的状态)、默认支持并发,即多路I/O。
简单的通信过程实现:
server.c:
#include
#include
#include
#include
#include
#include
#define SERVER_PORT 8000
#define BUFSIZE 1024
void sys_error(const char* str)
{
perror(str);
exit(1);
}
int main()
{
int listen_sockfd = socket(AF_INET, SOCK_DGRAM, 0);
if (listen_sockfd
sys_error("bind error");
}
struct sockaddr_in client_sockaddr; int client_sockaddr_len = sizeof(struct sockaddr_in);
char buf[BUFSIZE];
printf("accepting connection ... \n");
while(1)
{
ret = recvfrom(listen_sockfd, buf, sizeof(buf), 0, (struct sockaddr*)&client_sockaddr, &client_sockaddr_len);
if (ret == -1)
{
sys_error("recvfrom error");
}
// 获取发送数据的客户端的IP地址和port端口号:
char client_ip_addr[1024];
inet_ntop(AF_INET, &client_sockaddr.sin_addr, client_ip_addr, sizeof(client_ip_addr));
printf("data from %s : %d\n", client_ip_addr, ntohs(client_sockaddr.sin_port));
// 服务器执行逻辑,如小写转大写
for (int i = 0; i
sys_error("sendto error");
}
// 将数据发送到客户端的IP地址和port端口号:
printf("data sent to %s : %d\n", client_ip_addr, ntohs(client_sockaddr.sin_port));
}
close(listen_sockfd);
return 0;
}
client.c:
#include
#include
#include
#include
#include
#include
#define SERVER_PORT 8000
#define BUFSIZE 1024
void sys_error(const char* str)
{
perror(str);
exit(1);
}
int main()
{
int client_sockfd = socket(AF_INET, SOCK_DGRAM, 0);
// 初始化服务器的socket地址结构
struct sockaddr_in server_sockaddr;
memset(&server_sockaddr, 0, sizeof(struct sockaddr_in));
server_sockaddr.sin_family = AF_INET;
server_sockaddr.sin_port = htons(SERVER_PORT);
inet_pton(AF_INET, "127.0.0.1", &(server_sockaddr.sin_addr.s_addr));
char buf[BUFSIZE]; int ret;
while (fgets(buf, BUFSIZE, stdin) != NULL)
{
int server_sockaddr_len = sizeof(struct sockaddr_in);
ret = sendto(client_sockfd, buf, strlen(buf), 0, (struct sockaddr*)&server_sockaddr, server_sockaddr_len);
if (ret == -1)
{
sys_error("sendto error");
}
ret = recvfrom(client_sockfd, buf, sizeof(buf), 0, NULL, 0); // NULL和0表示不关心对端的socket地址信息
if (ret == -1)
{
sys_error("recv_from");
}
write(STDOUT_FILENO, buf, ret);
}
close(client_sockfd);
return 0;
}
![在这里插入图片描述](https://img-blog.csdnimg.cn/41e4b052bbdb492caf920d96784890a3.png#pic_center)
高并发服务器:
补充细节:
高并发服务器的通讯示意图:
![在这里插入图片描述](https://img-blog.csdnimg.cn/92309b0d64ab4e25bcb18ba0a5078d54.png#pic_center)
read函数返回值:
> 0,表示实际读到的字节数= 0,已经读到结尾(对端(客户/服务端)已关闭)-1,则需要进一步判断errno的值:
errno = EAGIN || EWOULDBLOCK:设置了非阻塞方式,读时没有数据到达。errno == EINTR,表示慢速系统调用read被中断。errno == ECONNRESET说明连接被重置,需要close,移出监听队列。errno为“其他情况,出现异常”。
多进程并发服务器:
使用细节:
父进程中,最大文件描述符的个数(父进程中,需要close关闭accept返回的新的文件描述符)。系统内,创建进程个数(与内存大小有关)。进程创建过多,是否会降低整个服务器的性能(进程调度)。需要在父进程中,通过注册的捕捉函数(终止掉所有已经结束的子进程,防止僵尸进程出现)捕捉子进程发生状态变化的**SIGCHILD信号**,同时还会继续阻塞监听客户端的连接…。
server.c
#include
#include
#include
#include
#include
#include
#include
#define SERVER_PORT 8000
void sys_error(const char* str)
{
perror(str);
exit(1);
}
void tfunc(int signo)
{
if (signo == SIGCHILD)
{
// 非阻塞形式,回收所有给父进程发送SIGCHILD信号的子进程
while (waitpid(0, NULL, WNOHANG) > 0);
}
}
int main(int argc, char* argv[])
{
int listen_sockfd = socket(AF_INET, SOCK_STREAM, 0);
if (listen_sockfd == -1)
{
sys_error("socket error");
}
struct sockaddr_in server_addr;
// memset(&addr, 0, sizeof(addr));
bzero(&server_addr, sizeof(server_addr)); // 将addr结构体清零
server_addr.sin_family = AF_INET;
server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
server_addr.sin_port = htons(SERVER_PORT);
int ret = bind(listen_sockfd, (struct sockaddr*)&server_addr, sizeof(server_addr));
if (ret == -1)
{
sys_error("bind error");
}
ret = listen(listen_sockfd, 128);
if (ret == -1)
{
sys_error("listen error");
}
while (1)
{
struct sockaddr_in client_addr; socklen_t client_addr_len;
int client_sockfd = accept(listen_sockfd, (struct sockaddr*)&client_addr, &client_addr_len);
// 获取客户端的地址
char client_IP[1024];
inet_ntop(AF_INET, &client_addr.sin_addr.s_addr, client_IP, sizeof(client_IP));
printf("%s : %d\n", client_IP, ntohs(client_addr.sin_port));
pid_t pid = fork(); // fork的每个子进程处理一个客户端的连接
if (pid // 子进程,用来处理某一个客户端连接
close(listen_sockfd);
while (1)
{
char buf[1024];
ret = read(client_sockfd, buf, sizeof(buf));
if (ret == 0) // !!!重点:已经读到结尾(对端已关闭)
{
break;
}
write(STDOUT_FILENO, buf, ret);
// 逻辑处理:将小写转大写
for (int i = 0; i // 父进程,等待其他客户端连接,并回收已经处理结束的子进程(僵尸进程)
// 当子进程状态变化时(结束、终止等),会发送SIGCHILD信号给父进程
// 父进程调用注册的捕捉函数,捕捉该SIGCHILD信号
struct sigaction act;
act.sa_handler = sig_catch;
sigemptyset(&act.sa_mask);
// flags=0,即在捕捉函数执行期间,屏蔽子进程的SIGCHILD信号,使用waitpid(0, NULL, WNOHANG)回收所有结束的子进程
act.sa_flags = 0;
ret = sigaction(SIGCHILD, &act, NULL);
if (ret == -1)
{
sys_error("sigaction error");
}
close(client_sockfd);
continue;
}
}
return 0;
}
![在这里插入图片描述](https://img-blog.csdnimg.cn/013d6d01095147e3856506e9e5e72f7d.png#pic_center)
多线程并发服务器:
使用细节:
注意一次最多能连接的客户端的数,本例中为256个注意需要在主线程中,设置子线程为“分离状态”,即可自行回收线程资源。
server.c
#include
#include
#include
#include
#include
#include
#define SERVER_PORT 8000
struct sock_info {
struct sockaddr_in connect_addr;
int connect_fd;
};
void sys_error(const char* str)
{
perror(str);
exit(1);
}
void* tfunc(void* arg)
{
// 打印已连接的客户端的"Port : IP"
struct sock_info* ts = (struct sock_info*)arg;
char client_addr[1024];
inet_ntop(AF_INET, &(*ts).connect_addr.sin_addr, client_addr, sizeof(client_addr));
printf("%s : %d\n", client_addr, ntohs((*ts).connect_addr.sin_port));
while (1)
{
char buf[1024];
int ret = read(ts->connect_fd, buf, sizeof(buf));
if (ret == 0) // !!!重点:已经读到结尾(对端已关闭)
{
printf("....... the client %d closed ........\n", ts->connect_fd);
break;
}
write(STDOUT_FILENO, buf, ret);
// 逻辑处理:将小写转大写
for (int i = 0; i
int listen_fd = socket(AF_INET, SOCK_STREAM, 0);
struct sockaddr_in server_addr;
memset(&server_addr, sizeof(server_addr), 0);
server_addr.sin_family = AF_INET;
server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
server_addr.sin_port = htons(SERVER_PORT);
int ret = bind(listen_fd, (struct sockaddr*)&server_addr, sizeof(server_addr));
if (ret == -1)
{
sys_error("bind error");
}
ret = listen(listen_fd, 128); // 设置同一时刻,连接服务器的上限数
if (ret == -1)
{
sys_error("listen error");
}
struct sock_info ts[256]; // 只能允许创建的256个线程,来处理客户端的连接
int i = 0; // 0
perror(str);
exit(1);
}
int main()
{
int connect_sockfd;
int listen_sockfd = socket(AF_INET, SOCK_STREAM, 0);
if (listen_sockfd == 0)
{
sys_error("socket error");
}
int opt = 1;
setsockopt(listen_sockfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
struct sockaddr_in server_addr;
server_addr.sin_family = AF_INET;
server_addr.sin_port = htons(SERVER_PORT);
server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
int ret = bind(listen_sockfd, (struct sockaddr*)&server_addr, sizeof(server_addr));
if (ret == -1)
{
sys_error("bind error");
}
ret = listen(listen_sockfd, 128);
if (ret == -1)
{
sys_error("listen error");
}
fd_set rset, allset; // 定义读集合rset、备份集合allset
FD_ZERO(&allset); // 清空备份集合
FD_SET(listen_sockfd, &allset); // 将待监听listen_sockfd添加到监听集合中
int max_sockfd = listen_sockfd;
while (1)
{
rset = allset; // 备份
// 使用select监听信号集rset,且每次循环都需要更新rset
int nReady = select(max_sockfd + 1, &rset, NULL, NULL, NULL);
if (ret
struct sockaddr_in client_addr;
socklen_t client_addr_len = sizeof(client_addr);
// 获取客户端的socket地址,并与客户端建立连接,得到连接后的文件描述符connect_sockfd
int connect_sockfd = accept(listen_sockfd, (struct sockaddr*)&client_addr, &client_addr_len); // 建立连接,不会发生阻塞
FD_SET(connect_sockfd, &allset); // 将新产生的connect_fd,添加到监听集合中,监听数据“读事件”
if (max_sockfd
continue;
}
}
// 轮询所有的读文件描述符,是否有数据(优化:通过将所有待读文件描述符放入一个数组,来缩小轮询读文件描述符的范围)
for (int i = listen_sockfd + 1; i
char buf[1024];
ret = read(i, buf, sizeof(buf));
if (ret == -1) {
sys_error("read error");
} else if (ret == 0) { // 监听到客户端已经关闭连接
close(i);
FD_CLR(i, &rset); // 将关闭的文件描述符,移除监听集合
} else if (ret > 0) { // 服务端处理客户端发来的数据,并将结果再发送回客户端
for (int j = 0; j
...
/* 红黑树中,存储着所有添加到epoll中的事件,即该epoll监控的事件(epoll_ctl将要传来的socket) */
struct rb_root rbr; // 红黑树的根节点
/* 双向链表rdllist保存着将要通过epoll_wait返回给用户的、满足条件的事件,即存储“准备就绪的事件” */
struct list_head rdllist; // 当epoll_wait调用时,仅仅观察这个rdllist双向链表里有没有数据即可。有数据就返回,没有数据就sleep,等到timeout时间到后即使链表没数据也返回。
...
};
在调用epoll_create时,内核的工作:
在epoll文件系统里建了个file结点在内核cache里建了个红黑树再建立一个rdllist双向链表
所有添加到epoll中的事件都会与设备(如网卡)驱动程序建立“回调关系”,也就是说相应事件的发生时会调用这里的回调方法。这个回调方法在内核中叫做ep_poll_callback,它会把这样的事件放到上面的rdllist双向链表中。
epoll_ctl、epoll_wait详解:
在epoll中对于每一个事件都会建立一个epitem结构体,如下所示:
struct epitem {
...
//红黑树节点
struct rb_node rbn;
//双向链表节点
struct list_head rdllink;
//事件句柄等信息
struct epoll_filefd ffd;
//指向其所属的eventepoll对象
struct eventpoll *ep;
//期待的事件类型
struct epoll_event event;
...
}; // 这里包含每一个事件对应着的信息。
![在这里插入图片描述](https://img-blog.csdnimg.cn/9775da206f06495ca81897219fe2a43c.jpeg#pic_center)
当调用epoll_wait检查是否有发生事件的连接时,只是检查eventpoll对象中的rdllist双向链表是否有epitem元素,如果不为空,则将这里的事件复制到用户态内存(使用共享内存提高效率)中,同时将事件数量返回给用户。因此epoll_wait效率非常高。 epoll_ctl在向epoll对象中添加、删除、修改事件时,都是基于rbtree红黑树操作,非常快。 总的来说,epoll是非常高效的,它可以轻易地处理百万级别的并发连接。
总结:
一颗红黑树,一张准备就绪句柄双向链表,少量的内核cache,就帮我们解决了大并发下的socket处理问题。
![在这里插入图片描述](https://img-blog.csdnimg.cn/869f86774c90475b997e9c6fffd4dc36.png#pic_center)
执行epoll_create()时,创建了红黑树和就绪链表;执行epoll_ctl()时,如果增加socket句柄,则检查在红黑树中是否存在,存在立即返回,不存在则添加到树干上,然后向内核注册回调函数,用于当中断事件来临时向准备就绪链表中插入数据;执行epoll_wait()时,会立刻返回就绪链表里的数据。
ET/LT深度解析:
LT是水平触发(默认的触发方式),属于低速模式。如果事件没有处理完,就会被一直触发。
ET是边沿触发,属于高速模式,该事件的通知只会出现一次。
epoll监听管道的ET和LT模式:
#include
#include
#include
#include
#define maxLen 10
void sys_error(const char* str)
{
perror(str);
exit(1);
}
int main(void)
{
int fd[2];
pipe(fd);
pid_t pid = fork();
if (pid == -1) {
sys_error("fork error");
} else if (pid == 0) { // 子进程向管道fd[1]写端写入数据,故关闭fd[0]读端
close(fd[0]);
char buf[maxLen] = "aaaa\nbbbb\n";
for (int cnt = 0; cnt
if (i == maxLen / 2 - 1)
{
continue;
}
buf[i] += 1;
}
write(fd[1], buf, sizeof(buf));
sleep(1);
printf("........ write one time ........\n");
}
} else if (pid > 0) { // 父进程用epoll监听管道的读端fd[0],故关闭fd[1]写端
close(fd[1]);
int epfd = epoll_create(10);
struct epoll_event event;
event.events = EPOLLIN; // LT水平触发(默认)
//event.events = EPOLLIN | EPOLLET; // ET边沿触发
event.data.fd = fd[0];
epoll_ctl(epfd, EPOLL_CTL_ADD, fd[0], &event);
struct epoll_event resevent[10]; // epoll_wait返回的就绪event,但这里只有一个需要监听的文件描述符fd[0]
while (1)
{
int ret = epoll_wait(epfd, resevent, 10, -1);
if (resevent[0].data.fd = fd[0])
{
char buf[maxLen];
ret = read(fd[0], buf, maxLen / 2);
if (ret == -1)
{
sys_error("read error");
}
write(STDOUT_FILENO, buf, ret);
}
}
close(fd[0]);
close(epfd);
}
return 0;
}
![在这里插入图片描述](https://img-blog.csdnimg.cn/294760e50e1b492cb3c2352b8bab8f5b.png#pic_center)
事件模型ET和LT的比较:
LT(Level Trigger)水平触发(缺省的工作方式),并且同时支持block和non-block socket。该模式中,内核会告诉你文件描述符是否就绪,之后可对该文件描述符进行I/O操作。如果你不做任何操作,内核还是会继续通知你。 该方式出错可能性小,故传统的select和poll均采用这种方式。 只要有数据都会触发,缓冲区剩余未读尽的数据会导致epoll_wait返回。 ET(Edge Trigger)边沿触发(高速工作方式),只支持non-block socket。该模式下,当文件描述符从未就绪 --> 就绪时,内核会通过epoll通知你。收到文件描述符已准备就绪的通知后,就不会再为文件描述符发送更多的就绪通知。 只有数据到来才触发,不管缓存区中是否还有数据,缓冲区剩余未读尽的数据不会导致epoll_wait返回。 注意:如果收到就绪通知后,一直不再对该文件描述符进行I/O操作(从而导致它再次变成未就绪状态),内核不会发送更多的通知。
![在这里插入图片描述](https://img-blog.csdnimg.cn/5074cb2ab35f4a7fa6a32024e0699aa5.png#pic_center)
举例:
#include
#include
#include
#include
#include
#include
#include
#define SERVER_PORT 8000
#define MAXLEN 1024
void sys_error(const char* str)
{
perror(str);
exit(1);
}
int main(void)
{
int listen_sockfd = socket(AF_INET, SOCK_STREAM, 0);
struct sockaddr_in listen_addr;
listen_addr.sin_family = AF_INET;
listen_addr.sin_port = htons(SERVER_PORT);
listen_addr.sin_addr.s_addr = htonl(INADDR_ANY);
int ret = bind(listen_sockfd, (struct sockaddr*)&listen_addr, sizeof(listen_addr));
if (ret == -1)
{
sys_error("bind error");
}
ret = listen(listen_sockfd, 128);
if (ret == -1)
{
sys_error("listen error");
}
struct epoll_event event;
struct epoll_event resevent[10];
int epfd = epoll_create(10);
/* epoll 的 ET模式(高效模式),但只支持“非阻塞模式” */
event.events = EPOLLIN | EPOLLET; // ET是边沿触发,默认为水平触发
struct sockaddr_in client_addr;
socklen_t client_addr_len;
int connect_sockfd = accept(listen_sockfd, (struct sockaddr*)&client_addr, &client_addr_len);
char clientaddr[1024];
inet_ntop(AF_INET, &client_addr.sin_addr, clientaddr, sizeof(clientaddr));
printf("%s : %d\n", clientaddr, ntohs(client_addr.sin_port));
/* !!!修改connect_sockfd为非阻塞读 */
int flag = fcntl(connect_sockfd, F_GETFL);
flag |= O_NONBLOCK;
fcntl(connect_sockfd, F_SETFL, flag);
event.data.fd = connect_sockfd;
epoll_ctl(epfd, EPOLL_CTL_ADD, connect_sockfd, &event); // 将connect_sockfd添加到监听红黑树中
/* 这段验证程序,只需要监听一个客户端连接的数据 */
while (1)
{
ret = epoll_wait(epfd, &resevent, 10, -1);
if (resevent[0].data.fd == connect_sockfd)
{
char buf[MAXLEN];
while ((ret = read(connect_sockfd, buf, MAXLEN / 2)) > 0) // 非阻塞、忙轮询
{
for (int i = 0; i
perror(str);
exit(1);
}
int main(void)
{
int listen_sockfd = socket(AF_INET, SOCK_STREAM, 0);
struct sockaddr_in listen_addr;
listen_addr.sin_family = AF_INET;
listen_addr.sin_port = htons(SERVER_PORT);
listen_addr.sin_addr.s_addr = htonl(INADDR_ANY);
int ret = bind(listen_sockfd, (struct sockaddr*)&listen_addr, sizeof(listen_addr));
if (ret == -1)
{
sys_error("bind error");
}
ret = listen(listen_sockfd, 128);
if (ret == -1)
{
sys_error("listen error");
}
// epfd,创建监听红黑树
int epfd = epoll_create(1024);
if (epfd == -1)
{
sys_error("epoll_create error");
}
// 初始化listen_sockfd的监听属性,并将其添加到红黑树上
struct epoll_event tmp;
tmp.events = EPOLLIN; // 默认事件模式,水平触发
tmp.data.fd = listen_sockfd;
ret = epoll_ctl(epfd, EPOLL_CTL_ADD, listen_sockfd, &tmp);
if (ret == -1)
{
sys_error("epoll_ctl error");
}
struct epoll_event ep[1024]; // ep为epoll_wait传出的满足监听事件的数组
while (1)
{
ret = epoll_wait(epfd, ep, 1024, -1); // 阻塞实施监听,并传出的满足监听事件的数组ep
if (ret == -1)
{
sys_error("epoll_wait error");
}
for (int i = 0; i
continue;
}
// listen_sock满足读事件,表示有客户端发起连接请求
else if (ep[i].data.fd == listen_sockfd)
{
struct sockaddr_in client_addr;
socklen_t clientaddr_len;
int connect_sockfd = accept(listen_sockfd, (struct sockaddr*)&client_addr, &clientaddr_len);
char clientaddr[1024];
inet_ntop(AF_INET, &client_addr.sin_addr, clientaddr, sizeof(clientaddr));
printf("%s : %d\n", clientaddr, ntohs(client_addr.sin_port));
// 初始化connect_sockfd的监听属性,并将其添加到红黑树上
tmp.events = EPOLLIN;
tmp.data.fd = connect_sockfd;
epoll_ctl(epfd, EPOLL_CTL_ADD, connect_sockfd, &tmp);
}
// connect_sockfd满足读事件,表示有客户端写数据
else
{
char buf[1024];
int connect_sockfd = ep[i].data.fd;
ret = read(connect_sockfd, buf, sizeof(buf));
if (ret
write(STDOUT_FILENO, buf, ret);
// 执行服务器的逻辑操作,并将结果返回给客户端
for (int j = 0; j
int fd; // 要监听的文件描述符
int events; // 对应的监听事件,EPOLLIN、EPLLOUT
void *arg; // 指向自己结构体指针
void (*call_back)(int fd,int events,void *arg); //回调函数
int status; // 是否在监听:1 -> 在红黑树上(监听)、0 -> 不在(不监听)
char buf[BUFLEN];
int len;
long last_active; // 记录每次加入红黑树 g_efd 的时间值
};
int g_efd; // 全局变量,作为红黑树根
struct myevent_s g_events[MAX_EVENTS+1]; // 自定义结构体类型数组. +1-->listen fd
// listen_sockfd放在数组的最末尾,其余为connect_sockfd
/*
* 封装一个自定义事件,包括fd、fd的回调函数、额外的参数项
* 注意:在封装这个事件的时候,为这个事件指明了回调函数,一般来说,一个fd只对一个特定的事件
* 感兴趣,当这个事件发生的时候,就调用这个回调函数
*/
void eventset(struct myevent_s *ev, int fd, void (*call_back)(int fd,int events,void *arg), void *arg)
{
ev->fd = fd;
ev->call_back = call_back;
ev->events = 0;
ev->arg = arg;
ev->status = 0;
if(ev->len
struct epoll_event epv={0, {0}};
int op = 0;
epv.data.ptr = ev; // ptr指向一个结构体(之前的epoll模型红黑树上挂载的是文件描述符cfd和lfd,现在是ptr指针)
epv.events = ev->events = events; // EPOLLIN 或 EPOLLOUT
if(ev->status == 0) // status 说明文件描述符是否在红黑树上 0不在,1在
{
op = EPOLL_CTL_ADD; // 将其加入红黑树 g_efd, 并将status置1
ev->status = 1;
}
// 添加一个节点
if(epoll_ctl(efd, op, ev->fd, &epv)
printf("event add OK [fd=%d], events[%d]\n", ev->fd, events);
}
}
/* 从epoll 监听的 红黑树中删除一个文件描述符 */
void eventdel(int efd, struct myevent_s* ev)
{
struct epoll_event epv = {0, {0}};
if(ev->status != 1) // 如果fd没有添加到监听树上,就不用删除,直接返回
{
return;
}
epv.data.ptr = NULL;
ev->status = 0;
epoll_ctl(efd, EPOLL_CTL_DEL, ev->fd, &epv);
return;
}
/* 当有文件描述符就绪, epoll返回, 调用该函数与客户端建立链接 */
void acceptconn(int listen_sockfd, int events, void* arg)
{
struct sockaddr_in cin;
socklen_t len = sizeof(cin);
int connect_sockfd, i;
if((connect_sockfd = accept(listen_sockfd, (struct sockaddr *)&cin, &len)) == -1)
{
if(errno != EAGAIN && errno != EINTR)
{
sleep(1);
}
printf("%s:accept,%s\n", __func__, strerror(errno)); // __func__用来获取函数名称
return;
}
do
{
for(i = 0; i
break;
}
}
if(i == MAX_EVENTS) // 超出连接数上限
{
printf("%s: max connect limit[%d]\n", __func__, MAX_EVENTS);
break;
}
int flag = 0;
if((flag = fcntl(connect_sockfd, F_SETFL, O_NONBLOCK))
struct myevent_s *ev = (struct myevent_s *)arg;
int len;
len = recv(fd, ev->buf, sizeof(ev->buf), 0); // 读取客户端发过来的数据
eventdel(g_efd, ev); // 将该节点从红黑树上摘除
if (len > 0)
{
ev->len = len;
ev->buf[len] = '\0'; // 手动添加字符串结束标记
printf("C[%d]:%s\n", fd, ev->buf);
eventset(ev, fd, senddata, ev); // 设置该fd对应的回调函数为senddata
eventadd(g_efd, EPOLLOUT, ev); // 将fd加入红黑树g_efd中,监听其写事件
}
else if (len == 0)
{
close(ev->fd);
/* ev-g_events 地址相减得到偏移元素位置 */
printf("[fd=%d] pos[%ld], closed\n", fd, ev - g_events);
}
else
{
close(ev->fd);
printf("recv[fd=%d] error[%d]:%s\n", fd, errno, strerror(errno));
}
return;
}
/*发送给客户端数据*/
void senddata(int fd, int events, void *arg)
{
struct myevent_s *ev = (struct myevent_s *)arg;
int len;
len = send(fd, ev->buf, ev->len, 0); // 直接将数据回射给客户端
eventdel(g_efd, ev); // 从红黑树g_efd中移除
if (len > 0)
{
printf("send[fd=%d], [%d]%s\n", fd, len, ev->buf);
eventset(ev, fd, recvdata, ev); // 将该fd的回调函数改为recvdata
eventadd(g_efd, EPOLLIN, ev); // 重新添加到红黑树上,设为监听读事件
}
else
{
close(ev->fd); // 关闭连接
printf("send[fd=%d] error %s\n", fd, strerror(errno));
}
return ;
}
/*创建 socket, 初始化listen_sockfd */
void initlistensocket(int efd, short port)
{
struct sockaddr_in sin;
int listen_sockfd = socket(AF_INET, SOCK_STREAM, 0);
int flag = fcntl(listen_sockfd, F_GETFL);
flag |= O_NONBLOCK;
fcntl(listen_sockfd, flag); // 将socket设为非阻塞
memset(&sin, 0, sizeof(sin));
sin.sin_family = AF_INET;
sin.sin_addr.s_addr = htonl(INADDR_ANY);
sin.sin_port = htons(port);
bind(listen_sockfd, (struct sockaddr*)&sin, sizeof(sin));
listen(listen_sockfd, 20);
/* void eventset(struct myevent_s *ev, int fd, void (*call_back)(int, int, void *), void *arg); */
eventset(&g_events[MAX_EVENTS], listen_sockfd, acceptconn, &g_events[MAX_EVENTS]);
/* void eventadd(int efd, int events, struct myevent_s *ev) */
eventadd(efd, EPOLLIN, &g_events[MAX_EVENTS]); // 将listen_sockfd添加到监听树上,监听读事件
return;
}
int main()
{
int port = SERV_PORT;
g_efd = epoll_create(MAX_EVENTS + 1); //创建红黑树,返回给全局 g_efd
if(g_efd
/*
// 超时验证,每次测试100个连接,不测试listen_sockfd
// 当客户端60s内没有与服务器通信,则关闭此客户端
long now = time(NULL);
for(i=0; i < 100; i++, checkpos++)
{
if(checkpos == MAX_EVENTS);
{
checkpos = 0;
}
if(g_events[checkpos].status != 1)
{
continue;
}
long duration = now -g_events[checkpos].last_active;
if(duration >= 60)
{
close(g_events[checkpos].fd);
printf("[fd=%d] timeout\n", g_events[checkpos].fd);
eventdel(g_efd, &g_events[checkpos]);
}
}
*/
//调用eppoll_wait等待接入的客户端事件,epoll_wait传出的是满足监听条件的那些fd的struct epoll_event类型
int nfd = epoll_wait(g_efd, events, MAX_EVENTS + 1, 1000);
if (nfd
// eventadd()函数中,添加监听事件到监听树的时候,会将myevents_t结构体类型给了ptr指针
// 这里epoll_wait返回的时候,在返回的events中有对应fd的myevents_t类型的指针
struct myevent_s *ev = (struct myevent_s *)events[i].data.ptr;
// 如果监听的是读事件,且返回的也是读事件
if ((events[i].events & EPOLLIN) &&(ev->events & EPOLLIN))
{
ev->call_back(ev->fd, events[i].events, ev->arg);
}
// 如果监听的是写事件,且返回的也是写事件
if ((events[i].events & EPOLLOUT) && (ev->events & EPOLLOUT))
{
ev->call_back(ev->fd, events[i].events, ev->arg);
}
}
}
return 0;
}
select、epoll对比:
系统调用selectepoll事件集合用户通过三个参数分别传入感兴趣的可读、可写、异常等事件,内核通过对这些参数的在线修改来反馈其中的就绪事件,使得每次调用select都需要重置三个参数。内核通过一个(红黑树)事件表直接管理用户感兴趣的所有事件,因此每次调用epoll_wait时,不需要反复传入用户感兴趣的事件。epoll_wait系统调用的参数events仅用来反馈就绪的事件。应用程序索引就绪文件描述符的时间复杂度O(n)O(1)最大支持的文件描述符数一般有最大限制值1024受系统最大值限制65535工作模式LT支持ET高效模式内核实现和工作效率采用“轮询”来检测就绪事件,算法时间复杂度为O(n)采用“回调函数”来检测就绪事件,算法时间复杂度为O(1)
![在这里插入图片描述](https://img-blog.csdnimg.cn/3bf031f7f131492d8b65ccea77d6decf.png#pic_center)
libevent库:
libevent库介绍:
libevent源码包的安装:
在libevent官网下载源码包,并拷贝到centos7后,使用tar zxvf libevent-2.1.8-stable.tar.gz 解压到/opt目录下。 源码包的安装过程: ./configure # 检查安装环境,并生成makefile文件
make # 生成.o文件和可执行文件
sudo make install # 将必要的资源放入系统指定目录
# 检查是否安装成功
cd /opt/libevent-2.1.8-stable/sample # 检查该文件下的demo是否可以正常编译和运行,即可
# 注意:编译使用该库的.c文件时,需要加上`-levent`选项,如下:
gcc ./hello-world.c -o helo-world -levent
# 查看库名和头文件
/usr/local/lib --> libevent.so
链接路径的配置: export LD_LIBRARY_PATH=/usr/local/lib:$LD_LIBRARY_PATH
# 防止error while loading shared libraries: libevent-2.1.so.6: cannot open shared object file: No such file or directory
# 将`/usr/local/lib`目录添加到`LD_LIBRARY_PATH`环境变量中:
# 解决方法:修改动态链接库搜索路径的配置文件
vim /etc/ld.so.conf
sudo ldconfig # 更新系统的动态链接库缓存
libevent特点和组成:
libevent的特点和优势:
事件驱动,高性能; 轻量级,专注于网络; 跨平台,支持Windows、Linux、Mac Os等; 支持多种I/O多路复用技术, epoll、poll、select和kqueue等; #include
// 查看支持哪些多路I/O
const char** event_get_supported_methods(void);
// 查看当前使用的多路I/O
const char* event_base_get_method(const struct event_base* base);
支持I/O,定时器和信号等事件;
注意:基于“事件”的异步通信模型(内核通过回调执行之前注册好的函数)。
libevent的组成:
事件管理包括各种I/O(socket)、定时器、信号等事件,也是libevent应用最广的模块;缓存管理是指evbuffer功能;DNS是libevent提供的一个异步DNS查询功能;HTTP是libevent的一个轻量级http实现,包括服务器和客户端;
libevent框架:
#include
// 1. 创建`event_base`
struct event_base* event_base_new(void);
// 查看fork后,子进程使用的event_base
// 注意:使用该函数后,父进程创建的base才能在子进程中生效
int event_reinit(struct event_base* base);
// 返回值:成功返回0,失败返回-1
// 2. 创建事件`event`:常规事件event、带缓冲区的事件bufferevent
event_new();
bufferevent_scoket_new();
// 3. 将event添加到事件链表上,注册事件,即将事件添加到`base`上
// tv参数:设置超时时间,为NULL则一直等待事件被触发、回调函数被调用
int event_add(struct event* ev, const struct timeval* tv);
// 4. 循环、检测、分发事件,即循环监听事件满足
int event_base_dispatch(struct event_base* base);
// base:event_base_new函数的返回值
// 返回值:成功返回0,失败返回-1
/*
注意:只有event_new()中,EV_PERSIST才能持续触发,否则只触发一次就跳出循环
通常的设置为:EV_READ | EV_PERSIST、EV_WRITE | EV_PERSIST
*/
// 指定时间后,停止循环
int event_base_loopexit(struct event_base* base, const struct timeval* tv);
// 立即停止循环
int event_base_loopbreak(struct event_base* base);
// 5. 释放`event_base`
void event_base_free(struct event_base* base);
常规事件:
/* 创建一个事件 */
// what参数:
// EV_READ:一次读事件
// EV_WRITE:一次写事件
// EV_READ | EV_PERSIST(持续触发读)、EV_WRITE | EV_PERSIST(持续触发写),只有与event_base_dispatch()结合使用才能发挥作用
// cb回调函数:typedef void(*event_callback_fn)(evutil_socket_t fd, short, void*)
struct event event_new(struct event_base* base, evutil_socket_t fd, short what, event_callback_fn cb, void* arg);
/* 将事件添加到base上 */
int event_add(struct event* ev, const struct timeval* tv);
/* 将事件从base上拿下来 */
int event_del(struct event* ev);
/* 释放事件 */
int event_free(struct event* ev);
使用fifo进行读写实现:
read.c
#include
#include
#include
#include
#include
#include
#include
#define BUFSIZE 1024
void sys_error(const char* str)
{
perror(str);
exit(1);
}
void read_cb(evutil_socket_t fd, short what, void* arg)
{
char buf[BUFSIZE];
int ret = read(fd, buf, sizeof(buf));
printf("%s : %s\n", what & EV_READ ? "read satisfy" : "read non-satisfy", buf);
sleep(1);
}
int main(void)
{
unlink("testfifo");
mkfifo("testfifo", 0644);
// 打开fifo命名管道的读端
int fd = open("testfifo", O_RDONLY | O_NONBLOCK);
if (fd == -1)
{
sys_error("open error");
}
// 创建event_base
struct event_base* base = event_base_new();
// 创建事件,会被持续触发写
struct event* ev = event_new(base, fd, EV_READ | EV_PERSIST, read_cb, NULL);
// 添加事件ev到base上
event_add(ev, NULL);
// 循环、检测、分发事件
event_base_dispatch(base);
// 释放资源
event_free(ev);
event_base_free(base);
close(fd);
return 0;
}
write.c
#include
#include
#include
#include
#include
#include
#include
#define BUFSIZE 1024
void sys_error(const char* str)
{
perror(str);
exit(1);
}
void write_cb(evutil_socket_t fd, short what, void* arg)
{
char buf[BUFSIZE];
static int count = 0;
sprintf(buf, "hello world %d\n", count++);
printf("%s : %s\n", what & EV_WRITE ? "write satisfy" : "write non-satisfy", buf);
write(fd, buf, strlen(buf));
sleep(1);
}
int main(void)
{
// 打开fifo命名管道的读端
int fd = open("testfifo", O_WRONLY | O_NONBLOCK);
if (fd == -1)
{
sys_error("open error");
}
// 创建event_base
struct event_base* base = event_base_new();
// 创建事件,会被持续触发读
struct event* ev = event_new(base, fd, EV_WRITE | EV_PERSIST, write_cb, NULL);
// 添加事件ev到base上
event_add(ev, NULL);
// 循环、检测、分发事件
event_base_dispatch(base);
// 释放资源
event_free(ev);
event_base_free(base);
close(fd);
return 0;
}
![在这里插入图片描述](https://img-blog.csdnimg.cn/40be258301fe47289fe85b710993f38e.png#pic_center)
未决和非未决:
未决态:有资格但还没有被处理
非未决态:没有资格被处理
![在这里插入图片描述](https://img-blog.csdnimg.cn/2b199450ca2845069f3285c551113456.png#pic_center)
带缓冲区的事件bufferevent:
常用的函数:
#include
#include
// 原理:bufferevent中有两个缓冲区(就基于队列实现,即读走就不再有数据、“先进先出”)
// 1)读缓冲区:有数据 --> 读回调函数被调用 --> 使用bufferevent_read() --> 读数据
// 2)写缓冲区:使用bufferevent_write() --> 向写缓冲中写数据 --> 该缓冲区有数据自动写出 --> 写完,回调函数被调用
/* 创建bufferevent */
// fd参数:与bufferevent绑定的文件描述符,类比event_new()
// options参数:BEV_OPT_CLOSE_ON_FREE(释放bufferevent时,关闭底层传输出端口(套接字))
struct bufferevent* bufferevent_socket_new(struct event_base* base, evutil_socket_t fd, enum bufferevent_options options);
/* 释放bufferevent */
void bufferevent_free(struct bufferevent* bev);
/* 给读写缓冲区设置回调 */
// readcb:设置bufferevent读缓冲,对应的回调(自己封装,在其内部读数据)
// writecb:设置bufferevent写缓冲,对应的回调,可为NULL
// eventcb:可传NULL
// cbarg:回调函数的参数
void bufferevent_setcb(struct bufferevent* bufev, bufferevent_data_cb readcb, bufferevent_data_cb writecb, , bufferevent_event_cb eventcb, void* cbarg);
// 1、readcb/writecb对应的回调函数
typedef void(*bufferevent_data_cb)(struct bufferevent* bev, void* ctx);
size_t bufferevent_read(struct bufferevent* bufev, void* data, size_t size); // 用来代替read
size_t bufferevent_write(struct bufferevent* bufev, const void* data, size_t size); // 用来代替write
// 2、eventcb对应的回调函数
typedef void(*bufferevent_event_cb)(struct bufferevent* bev, short events, void* ctx);
// events:不同标志位,代表不同的事件
// BEV_EVENT_READING:读取操作时发生某事件
// BEV_EVENT_WRITING:写入操作时发生某事件
// BEV_EVENT_ERROR:操作时发生错误,关于错误更多信息,EVUTIL_SOCKET_ERROR()
// BEV_EVENT_TIMEOUT:发生超时
// BEV_EVENT_EOF:遇到文件结束指示
// BEV_EVENT_CONNECTED:请求连接过程已经完成,实现客户端时可用
/* 禁用、启用缓冲区 */
// 默认:新建的bufferevent,写缓冲是enable,读缓冲是disable的
// events取值:EV_READ、EV_WRITE、EV_READ | EV_WRITE
void bufferevent_disable(struct bufferevent* bufev, short events); // 禁用缓冲区
void bufferevent_enable(struct bufferevent* bufev, short events); // 启用缓冲区
short bufferevent_get_enabled(struct bufferevent* bufev); // 获取缓冲区的禁用状态,需要借助&获取
/* 创建连接客户端 */
int bufevent_socket_connect(struct bufferevent* bev, struct sockaddr* address, int addrlen); // 替代系统调用socket()、connect()
/* 创建监听服务器 */
// cb监听回调函数:接受连接之后,执行用户要做的操作
// ptr回调函数的参数
// flags标志位:LEV_OPT_CLOSE_ON_FREE(释放bufferevent时关闭底层传输端口,即会释放底层套接字、释放底层bufferevent等)、LEV_OPT_REUSEABLE(端口可复用)、LEV_OPT_CLOSE_ON_FREE | LEV_OPT_REUSEABLE
// backlog(-1表示默认最大值128) -- listen的第2个参数,即“全连接队列的长度”
// sa和port(服务端自己的地址结构和大小) -- bind()的参数
struct evconnlistener* evconnlistener_new_bind(struct event_base* base, evconnlistener_cb cb, void* ptr, unsigned flags, int backlog, const struct sockaddr* sa, int socklen); // 替代系统调用socket()、bind()、listen()、accept()
// 返回值:返回成功创建的listener
// 回调函数类型evconnlistener_cb:
// listener:evconnlistener_new_bind函数的返回值
// fd:用于通信的文件描述符
// addr、len:客户端的地址结构和addr的大小
// ptr:外部ptr传递进来的值
typedef void(*evconnlistener_cb)(struct evconnlistener* listener, evutil_socket_t fd, struct sockaddr* addr, int len, void* ptr);
/* 释放监听服务器 */
void evconnlistener_free(struct evconnlistener* lev);
基于libevent的TCP通信实现:
server.c
#include
#include
#include
#include
#include
#include
#include
#include
#define SERVER_PORT 8000
#define BUFSIZE 1024
// 读缓冲回调函数
void read_cb(struct bufferevent* bev, void* arg)
{
// 读缓冲区中的数据
char buf[BUFSIZE];
bufferevent_read(bev, buf, sizeof(buf));
printf("from client data : %s\n", buf);
char* result = "I'm server, I have successfully received your datas.\n";
// 写数据给客户端
bufferevent_write(bev, result, strlen(result));
sleep(1);
}
// 写缓冲回调函数(bufferevent_write将数据写缓冲区后(客户端会去读),调用该回调函数)
void write_cb(struct bufferevent* bev, void* arg)
{
printf("results has been successfully sent to client\n");
}
// 事件回调函数
void event_cb(struct bufferevent* bev, short events, void* arg)
{
if (events & BEV_EVENT_EOF)
{
printf("connection closed\n");
}
if (events & BEV_EVENT_TIMEOUT)
{
printf("connection timeout\n");
}
else if (events & BEV_EVENT_ERROR)
{
printf("occur others' error\n");
}
bufferevent_free(bev);
printf("bufferevent free\n");
}
// 监听回调函数
void cb_listener(struct evconnlistener* listener, evutil_socket_t fd, struct sockaddr* addr, int len, void* ptr)
{
printf("connect new client\n");
struct event_base* base = (struct event_base*)ptr;
// 创建bufferevent对象
struct bufferevent* bev = bufferevent_socket_new(base, fd, BEV_OPT_CLOSE_ON_FREE);
// 给bufferevent缓冲区设置回调函数
bufferevent_setcb(bev, read_cb, write_cb, event_cb, NULL);
// 启用bufferevent的读缓冲(默认为disable)
bufferevent_enable(bev, EV_READ);
}
int main(void)
{
struct sockaddr_in server_addr;
memset(&server_addr, 0, sizeof(server_addr));
server_addr.sin_family = AF_INET;
server_addr.sin_port = htons(SERVER_PORT);
server_addr.sin_addr.s_addr = htonl(INADDR_ANY);
// 创建event_base
struct event_base* base = event_base_new();
// 创建监听服务器(创建套接字、绑定、接收连接情趣):实现了这四个函数的功能,socket()创建套接字、bind()绑定、listen()监听、accept()接收客户端的连接请求
struct evconnlistener* listener = evconnlistener_new_bind(base, cb_listener, base, LEV_OPT_REUSEABLE | LEV_OPT_CLOSE_ON_FREE, 10, (struct sockaddr*)&server_addr, sizeof(server_addr));
// 循环、监听、分发:
event_base_dispatch(base);
evconnlistener_free(listener);
event_base_free(base);
return 0;
}
client.c
#include
#include
#include
#include
#include
#include
#include
#define SERVER_PORT 8000
#define SERVER_IP "127.0.0.1"
#define BUFSIZE 1024
// 读缓冲回调函数
void read_cb(struct bufferevent* bev, void* arg)
{
// 读缓冲区中的数据
char buf[BUFSIZE] = {0};
bufferevent_read(bev, buf, sizeof(buf));
printf("from server data : %s\n", buf);
char* result = "I'm client, your results has been successfully received\n";
// 写数据给服务端
bufferevent_write(bev, result, strlen(result));
sleep(1);
}
// 写缓冲回调函数(bufferevent_write将数据写缓冲区后(客户端会去读),调用该回调函数)
void write_cb(struct bufferevent* bev, void* arg)
{
printf("results has been successfully sent to server\n");
}
// 事件回调函数:用来处理连接成功、错误事件
void event_cb(struct bufferevent* bev, short events, void* arg)
{
if (events & BEV_EVENT_EOF)
{
printf("connection closed\n");
}
else if (events & BEV_EVENT_ERROR)
{
printf("occur others' error\n");
}
else if (events & BEV_EVENT_CONNECTED)
{
printf("connect successfully\n");
return;
}
bufferevent_free(bev);
printf("bufferevent free\n");
}
void read_terminal(evutil_socket_t fd, short what, void* arg)
{
char buf[BUFSIZE] = {0};
int ret = read(fd, buf, sizeof(buf));
// 将从客户端fd中,读到的数据写入给bufferevent的write缓冲区,发送给服务端
struct bufferevent* bev = (struct bufferevent*)arg;
bufferevent_write(bev, buf, ret);
}
int main(void)
{
struct event_base* base = event_base_new();
int client_sockfd = socket(AF_INET, SOCK_STREAM, 0);
// 通信fd放在bufferevent中
struct bufferevent* bev = bufferevent_socket_new(base, client_sockfd, BEV_OPT_CLOSE_ON_FREE);
struct sockaddr_in server_addr;
memset(&server_addr, 0, sizeof(server_addr));
server_addr.sin_family = AF_INET;
server_addr.sin_port = htons(SERVER_PORT);
inet_pton(AF_INET, SERVER_IP, &server_addr.sin_addr.s_addr);
// 连接服务器
bufferevent_socket_connect(bev, (struct sockaddr*)&server_addr, sizeof(server_addr));
// 设置回调函数
bufferevent_setcb(bev, read_cb, write_cb, event_cb, NULL);
/* 注意:该函数对程序执行的影响!!! */
// 启用bufferevent的读缓冲(默认为disable)(注销该语句,则read_cb()函数不能被调用)
//bufferevent_enable(bev, EV_READ); // 添加该语句,会导致bufferevent读缓冲中一旦有数据,则会一直触发读回调函数
// 创建事件:用来获取“终端的读事件”
struct event* ev = event_new(base, STDIN_FILENO, EV_READ | EV_PERSIST, read_terminal, bev);
// 添加事件
event_add(ev, NULL); // 不设置超时时间
// 循环、检测、分发:
event_base_dispatch(base);
event_free(ev);
event_base_free(base);
return 0;
}
![在这里插入图片描述](https://img-blog.csdnimg.cn/495c3828c4844e45b8a2b0a152d0ed6e.png#pic_center)
|