linux poll的使用及其原理实现

您所在的位置:网站首页 in标志 linux poll的使用及其原理实现

linux poll的使用及其原理实现

2023-11-15 08:59| 来源: 网络整理| 查看: 265

1 poll的使用

相对于select来说,poll 也是在指定时间内论询一定数量的文件描述符,来测试其中是否有就绪的,不过,poll 提供了一个易用的方法,来实现 i/o 复用。

声明如下:

#include int poll(struct pollfd *fds, nfds_t nfds, int timeout);

其中,struct pollfd 定义为:

struct pollfd { int fd; /* file descriptor */ short events; /* requested events */ short revents; /* returned events */ };

fd 为文件描述符,events 告诉poll 监听fd 上哪些事件,它是一系列事件按位或。revents 由内核修改,来通知应用程序fd 上实际上发生了哪些事件。

nfds 为监听事件集合fds的大小

timeout 为poll的超时时间,单位毫秒。timeout 为-1时,poll永远阻塞,直到有事件发生。timeout为0时,poll立即返回。

下面举一个例子来看一下具体如何使用poll来监听文件的状态

#include #include #include #include #include #include #include #include #include #include #include #include using namespace std; #define BUFSIZE 10 #define CLIENTSIZE 100 int createSocket() { struct sockaddr_in servaddr; int listenfd = -1; if (-1 == (listenfd = socket(PF_INET, SOCK_STREAM, 0))) { fprintf(stderr, "socket: %d, %s\n", errno, strerror(errno)); exit(1); } int reuseaddr = 1; if (-1 == setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &reuseaddr, sizeof(reuseaddr))) { fprintf(stderr, "setsockopt: %d, %s\n", errno, strerror(errno)); exit(1); } memset(&servaddr, 0, sizeof(servaddr)); servaddr.sin_family = PF_INET; servaddr.sin_port = htons(8008); inet_pton(PF_INET, "0.0.0.0", &servaddr.sin_addr); if (-1 == bind(listenfd, (struct sockaddr*)&servaddr, sizeof(servaddr))) { fprintf(stderr, "bind: %d, %s\n", errno, strerror(errno)); exit(1); } if (-1 == listen(listenfd, 5)) { fprintf(stderr, "listen: %d, %s\n", errno, strerror(errno)); exit(1); } return listenfd; } int setnoblock(int fd) { int oldopt = fcntl(fd, F_GETFL); int newopt = oldopt | O_NONBLOCK; fcntl(fd, F_SETFL, newopt); return oldopt; } int main() { struct pollfd fds[CLIENTSIZE]; int listenfd = createSocket(); map mapdata; fds[0].fd = listenfd; fds[0].events = POLLIN | POLLERR;//监听 in和err事件 fds[0].revents = 0; int conncount = 0; while (1) { int ret = poll(fds, conncount + 1, -1); if (ret < 0) { fprintf(stderr, "poll: %d, %s\n", errno, strerror(errno)); exit(1); } for (int i = 0; i < conncount + 1; i++) { // 客户端关闭,或者错误。错误的原因是由于客户端关闭导致的,这里一并处理 if ((fds[i].revents & POLLRDHUP) || (fds[i].revents & POLLERR)) { int fd = fds[i].fd; fds[i] = fds[conncount]; i--; conncount--; mapdata.erase(fd); close(fd); printf("delete connection: %d\n", fd); } // 新的连接 else if ((fds[i].fd == listenfd) && (fds[i].revents & POLLIN)) { struct sockaddr_in client; socklen_t lenaddr = sizeof(client); int conn = -1; if (-1 == (conn = accept(listenfd, (struct sockaddr*)&client, &lenaddr))) { fprintf(stderr, "accept: %d, %s\n", errno, strerror(errno)); exit(1); } printf("get connection %d from %s:%d\n", conn, inet_ntoa(client.sin_addr), client.sin_port); conncount++; setnoblock(conn); fds[conncount].fd = conn; fds[conncount].events = POLLIN | POLLRDHUP | POLLERR; fds[conncount].revents = 0; } // 有可读数据 else if (fds[i].revents & POLLIN) { char buf[BUFSIZE] = {0}; int lenrecv = recv(fds[i].fd, buf, BUFSIZE-1, 0); if (lenrecv > 0) { mapdata[fds[i].fd] = buf; fds[i].events &= (~POLLIN); fds[i].events |= POLLOUT; } else if (lenrecv == 0) { printf("------- client %d exit (not print) --------\n", fds[i].fd); } else { fprintf(stderr, "recv: %d, %s\n", errno, strerror(errno)); exit(1); } } // 可写数据 else if (fds[i].revents & POLLOUT) { if (send(fds[i].fd, mapdata[fds[i].fd].c_str(), mapdata[fds[i].fd].size(), 0) < 0) { if (ECONNRESET == errno) { continue; } fprintf(stderr, "send: %d, %s\n", errno, strerror(errno)); exit(1); } fds[i].events &= (~POLLOUT); fds[i].events |= POLLIN; } } } }

可以看到设置了监听事件以后,当进程返回的时候,可以从返回的revents中读取就绪的事件。

上面内容参考自:

https://www.cnblogs.com/zuofaqi/p/9631601.html

2 poll的原理实现

poll的实现依赖于内核的等待队列。等待队列的原理可以参考这篇文章:

https://blog.csdn.net/oqqYuJi12345678/article/details/106304644

应用层调用poll函数的时候,内核为:

// poll 使用的结构体 struct pollfd { int fd; // 描述符 short events; // 关注的事件掩码 short revents; // 返回的事件掩码 }; // long sys_poll(struct pollfd *ufds, unsigned int nfds, long timeout_msecs) SYSCALL_DEFINE3(poll, struct pollfd __user *, ufds, unsigned int, nfds, long, timeout_msecs) { struct timespec end_time, *to = NULL; int ret; if (timeout_msecs >= 0) { to = &end_time; // 将相对超时时间msec 转化为绝对时间 poll_select_set_timeout(to, timeout_msecs / MSEC_PER_SEC, NSEC_PER_MSEC * (timeout_msecs % MSEC_PER_SEC)); } // do sys poll ret = do_sys_poll(ufds, nfds, to); // do_sys_poll 被信号中断, 重新调用, 对使用者来说 poll 是不会被信号中断的. if (ret == -EINTR) { struct restart_block *restart_block; restart_block = ¤t_thread_info()->restart_block; restart_block->fn = do_restart_poll; // 设置重启的函数 restart_block->poll.ufds = ufds; restart_block->poll.nfds = nfds; if (timeout_msecs >= 0) { restart_block->poll.tv_sec = end_time.tv_sec; restart_block->poll.tv_nsec = end_time.tv_nsec; restart_block->poll.has_timeout = 1; } else { restart_block->poll.has_timeout = 0; } // ERESTART_RESTARTBLOCK 不会返回给用户进程, // 而是会被系统捕获, 然后调用 do_restart_poll, ret = -ERESTART_RESTARTBLOCK; } return ret; } int do_sys_poll(struct pollfd __user *ufds, unsigned int nfds, struct timespec *end_time) { struct poll_wqueues table; int err = -EFAULT, fdcount, len, size; /* 首先使用栈上的空间,节约内存,加速访问 */ long stack_pps[POLL_STACK_ALLOC/sizeof(long)]; struct poll_list *const head = (struct poll_list *)stack_pps; struct poll_list *walk = head; unsigned long todo = nfds; if (nfds > rlimit(RLIMIT_NOFILE)) { // 文件描述符数量超过当前进程限制 return -EINVAL; } // 复制用户空间数据到内核 len = min_t(unsigned int, nfds, N_STACK_PPS); for (;;) { walk->next = NULL; walk->len = len; if (!len) { break; } ...................................................(1) // 复制到当前的 entries if (copy_from_user(walk->entries, ufds + nfds-todo, sizeof(struct pollfd) * walk->len)) { goto out_fds; } todo -= walk->len; if (!todo) { break; } // 栈上空间不足,在堆上申请剩余部分 len = min(todo, POLLFD_PER_PAGE); size = sizeof(struct poll_list) + sizeof(struct pollfd) * len; walk = walk->next = kmalloc(size, GFP_KERNEL); if (!walk) { err = -ENOMEM; goto out_fds; } } // 初始化 poll_wqueues 结构, 设置函数指针_qproc 为__pollwait ..................................................................(2) poll_initwait(&table); // poll .....................................................................(3) fdcount = do_poll(nfds, head, &table, end_time); // 从文件wait queue 中移除对应的节点, 释放entry. poll_freewait(&table); // 复制结果到用户空间 for (walk = head; walk; walk = walk->next) { struct pollfd *fds = walk->entries; int j; for (j = 0; j < len; j++, ufds++) .......................................................................(4) if (__put_user(fds[j].revents, &ufds->revents)) { goto out_fds; } } err = fdcount; out_fds: // 释放申请的内存 walk = head->next; while (walk) { struct poll_list *pos = walk; walk = walk->next; kfree(pos); } return err; }

(1)把用户空间的pollfd 赋值到内核空间,用户 空间的pollfd包含有设置的需要监听的fd和events

(2)初始化 poll的等待队列

void poll_initwait(struct poll_wqueues *pwq) { init_poll_funcptr(&pwq->pt, __pollwait); pwq->polling_task = current; pwq->triggered = 0; pwq->error = 0; pwq->table = NULL; pwq->inline_index = 0; }

__pollwait函数在调用监控文件的poll函数的时候会被回掉到,用来向驱动函数的等待队列头插入该等待队列。

(3)do_poll的详细实现,遍历每个文件描述符实现的poll函数,检查是否该文件有数据可读,如果有数据可读,就返回,否则就把自己插入到驱动的等待队列头中睡眠,直到有数据来杯唤醒

(4)把各个文件描述符poll调用得到的返回事件复制到用户空间,用户空间据此返回值可以来判断文件的状态。

下面接着看do_poll的详细实现:

// 真正的处理函数 static int do_poll(unsigned int nfds, struct poll_list *list, struct poll_wqueues *wait, struct timespec *end_time) { poll_table* pt = &wait->pt; ktime_t expire, *to = NULL; int timed_out = 0, count = 0; unsigned long slack = 0; if (end_time && !end_time->tv_sec && !end_time->tv_nsec) { // 已经超时,直接遍历所有文件描述符, 然后返回 pt = NULL; timed_out = 1; } if (end_time && !timed_out) { // 估计进程等待时间,纳秒 slack = select_estimate_accuracy(end_time); } // 遍历文件,为每个文件的等待队列添加唤醒函数(pollwake) for (;;) { struct poll_list *walk; for (walk = list; walk != NULL; walk = walk->next) { struct pollfd * pfd, * pfd_end; pfd = walk->entries; pfd_end = pfd + walk->len; for (; pfd != pfd_end; pfd++) { // do_pollfd 会向文件对应的wait queue 中添加节点 // 和回调函数(如果 pt 不为空) // 并检查当前文件状态并设置返回的掩码 -----------------------------------------------------------------(1) if (do_pollfd(pfd, pt)) { // 该文件已经准备好了. // 不需要向后面文件的wait queue 中添加唤醒函数了. count++; pt = NULL; } } } // 下次循环的时候不需要向文件的wait queue 中添加节点, // 因为前面的循环已经把该添加的都添加了 pt = NULL; // 第一次遍历没有发现ready的文件 if (!count) { count = wait->error; // 有信号产生 if (signal_pending(current)) { count = -EINTR; } } -----------------------------------------------------------------------(2) // 有ready的文件或已经超时 if (count || timed_out) { break; } // 转换为内核时间 if (end_time && !to) { expire = timespec_to_ktime(*end_time); to = &expire; } // 等待事件就绪, 如果有事件发生或超时,就再循 // 环一遍,取得事件状态掩码并计数, // 注意此次循环中, 文件 wait queue 中的节点依然存在 ---------------------------------------------------------------------------(3) if (!poll_schedule_timeout(wait, TASK_INTERRUPTIBLE, to, slack)) { timed_out = 1; } } return count; }

(1)调用文件描述符提供的poll方法,可以看到,如果返回的mask不为0,则表示有数据变化,该mask 值作为do_pollfd的返回值不为0,则count++

static inline unsigned int do_pollfd(struct pollfd *pollfd, poll_table *pwait) { unsigned int mask; int fd; mask = 0; fd = pollfd->fd; if (fd >= 0) { struct fd f = fdget(fd); mask = POLLNVAL; if (f.file) { mask = DEFAULT_POLLMASK; if (f.file->f_op && f.file->f_op->poll) { pwait->_key = pollfd->events|POLLERR|POLLHUP; mask = f.file->f_op->poll(f.file, pwait); } /* Mask out unneeded events. */ mask &= pollfd->events | POLLERR | POLLHUP; fdput(f); } } pollfd->revents = mask; return mask; }

(2)如果count不为0,或者超时,则poll不会睡眠,直接返回

(3)如果 超时,或者没有数据变化,则睡眠,由于外围函数是个for的死循环,所以醒来以后又会遍历所有的描述符事件,来获取数据以及决定是否跳出poll方法

从上面的实现可以看出,如果应用层要实现poll这样的机制,是需要驱动层来提供自己的poll函数的。下面看一下,驱动层如何实现一个简单的poll方法

3 驱动的poll实现 #include #include #include #include #include #include #include #include #include #include #include #include #include "memdev.h" static mem_major = MEMDEV_MAJOR; bool have_data = false; /*表明设备有足够数据可供读*/ module_param(mem_major, int, S_IRUGO); struct mem_dev *mem_devp; /*设备结构体指针*/ struct cdev cdev; /*文件打开函数*/ int mem_open(struct inode *inode, struct file *filp) { struct mem_dev *dev; /*获取次设备号*/ int num = MINOR(inode->i_rdev); if (num >= MEMDEV_NR_DEVS) return -ENODEV; dev = &mem_devp[num]; /*将设备描述结构指针赋值给文件私有数据指针*/ filp->private_data = dev; return 0; } /*文件释放函数*/ int mem_release(struct inode *inode, struct file *filp) { return 0; } /*读函数*/ static ssize_t mem_read(struct file *filp, char __user *buf, size_t size, loff_t *ppos) { unsigned long p = *ppos; unsigned int count = size; int ret = 0; struct mem_dev *dev = filp->private_data; /*获得设备结构体指针*/ /*判断读位置是否有效*/ if (p >= MEMDEV_SIZE) return 0; if (count > MEMDEV_SIZE - p) count = MEMDEV_SIZE - p; while (!have_data) /* 没有数据可读,考虑为什么不用if,而用while */ { if (filp->f_flags & O_NONBLOCK) return -EAGAIN; wait_event_interruptible(dev->inq,have_data); } /*读数据到用户空间*/ if (copy_to_user(buf, (void*)(dev->data + p), count)) { ret = - EFAULT; } else { *ppos += count; ret = count; printk(KERN_INFO "read %d bytes(s) from %d\n", count, p); } have_data = false; /* 表明不再有数据可读 */ /* 唤醒写进程 */ return ret; } /*写函数*/ static ssize_t mem_write(struct file *filp, const char __user *buf, size_t size, loff_t *ppos) { unsigned long p = *ppos; unsigned int count = size; int ret = 0; struct mem_dev *dev = filp->private_data; /*获得设备结构体指针*/ /*分析和获取有效的写长度*/ if (p >= MEMDEV_SIZE) return 0; if (count > MEMDEV_SIZE - p) count = MEMDEV_SIZE - p; /*从用户空间写入数据*/ if (copy_from_user(dev->data + p, buf, count)) ret = - EFAULT; else { *ppos += count; ret = count; printk(KERN_INFO "written %d bytes(s) from %d\n", count, p); } have_data = true; /* 有新的数据可读 */ /* 唤醒读进程 */ wake_up(&(dev->inq)); return ret; } /* seek文件定位函数 */ static loff_t mem_llseek(struct file *filp, loff_t offset, int whence) { loff_t newpos; switch(whence) { case 0: /* SEEK_SET */ newpos = offset; break; case 1: /* SEEK_CUR */ newpos = filp->f_pos + offset; break; case 2: /* SEEK_END */ newpos = MEMDEV_SIZE -1 + offset; break; default: /* can't happen */ return -EINVAL; } if ((newposMEMDEV_SIZE)) return -EINVAL; filp->f_pos = newpos; return newpos; } unsigned int mem_poll(struct file *filp, poll_table *wait) { struct mem_dev *dev = filp->private_data; unsigned int mask = 0; /*将等待队列添加到poll_table */ poll_wait(filp, &dev->inq, wait); if (have_data) mask |= POLLIN | POLLRDNORM; /* readable */ return mask; } /*文件操作结构体*/ static const struct file_operations mem_fops = { .owner = THIS_MODULE, .llseek = mem_llseek, .read = mem_read, .write = mem_write, .open = mem_open, .release = mem_release, .poll = mem_poll, }; /*设备驱动模块加载函数*/ static int memdev_init(void) { int result; int i; dev_t devno = MKDEV(mem_major, 0); /* 静态申请设备号*/ if (mem_major) result = register_chrdev_region(devno, 2, "memdev"); else /* 动态分配设备号 */ { result = alloc_chrdev_region(&devno, 0, 2, "memdev"); mem_major = MAJOR(devno); } if (result < 0) return result; /*初始化cdev结构*/ cdev_init(&cdev, &mem_fops); cdev.owner = THIS_MODULE; cdev.ops = &mem_fops; /* 注册字符设备 */ cdev_add(&cdev, MKDEV(mem_major, 0), MEMDEV_NR_DEVS); /* 为设备描述结构分配内存*/ mem_devp = kmalloc(MEMDEV_NR_DEVS * sizeof(struct mem_dev), GFP_KERNEL); if (!mem_devp) /*申请失败*/ { result = - ENOMEM; goto fail_malloc; } memset(mem_devp, 0, sizeof(struct mem_dev)); /*为设备分配内存*/ for (i=0; i < MEMDEV_NR_DEVS; i++) { mem_devp[i].size = MEMDEV_SIZE; mem_devp[i].data = kmalloc(MEMDEV_SIZE, GFP_KERNEL); memset(mem_devp[i].data, 0, MEMDEV_SIZE); /*初始化等待队列*/ init_waitqueue_head(&(mem_devp[i].inq)); //init_waitqueue_head(&(mem_devp[i].outq)); } return 0; fail_malloc: unregister_chrdev_region(devno, 1); return result; } /*模块卸载函数*/ static void memdev_exit(void) { cdev_del(&cdev); /*注销设备*/ kfree(mem_devp); /*释放设备结构体内存*/ unregister_chrdev_region(MKDEV(mem_major, 0), 2); /*释放设备号*/ } MODULE_AUTHOR("David Xie"); MODULE_LICENSE("GPL"); module_init(memdev_init); module_exit(memdev_exit);

其中poll_wait是调用poll_table初始化时赋值的__pollwait函数,该函数为__pollwait,把当前进程插入到该驱动提供的等待队列头中,最后数据到达时,一般会由该驱动来唤醒等待队列头中的进程

static inline void poll_wait(struct file * filp, wait_queue_head_t * wait_address, poll_table *p) { if (p && p->_qproc && wait_address) p->_qproc(filp, wait_address, p); }

 



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3