从源码角度来领略一下内核的 epoll 轮询机制。

1
2
3
kernel/fs/eventpoll.c
kernel/include/linux/poll.h
kernel/include/uapi/linux/eventpoll.h

一、概述

在 Linux 2.6.8 之前还没有 epoll 机制,select 和 poll 作为 I/O 多路复用的机制实现并发程序,但这两种方式有着如下缺点:

  • 通过 select 方式单个进程能够监控的 fd 不得超过进程可打开的文件个数上限,默认为 1024,即便强行修改了这个上限,还会遇到性能问题。
  • select 轮询效率 O(n) 随着监控个数的增加而性能变差。
  • select 从内核空间返回到用户空间的是整个 fd 数组,应用程序还需要额外再遍历整个数组才知道哪些 fd 触发了相应事件。

本文主要介绍 epoll 机制,有不少人可能都知道相比 select/poll 之下,epoll 有着明显优势,这些优势的底层实现原理又是什么呢?

epoll 实现原理的流程图见 这一小节

epoll 函数

用户态主要的 epoll 函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
int epoll_create(int size);
int epoll_ctl(int epfd, int op, int fd, struct epoll_event* event);
int epoll_wait(int epfd, struct epoll_event* events, int maxevents, int timeout);

struct epoll_event {
__uint32_t events;
epoll_data_t data;
};

union epoll_data {
void* ptr;
int fd;
uint32_t u32;
uint64_t u64;
};

typedef union epoll_data epoll_data_t;

接下来从源码角度剖析这 3 个方法。

二、epoll_create

用户态进程在调用 epoll_create 创建 epoll 句柄 epfd 时,会陷入内核,系统调用 SYSCALL_DEFINE1(epoll_create, int, size) 创建一个 file 实例,同时在内核内部分配 eventpoll 资源,并将其指向 file 实例的 void* private_data 成员下。后续就可以通过 epfd 找到 file,进而找到内核维护的 eventpoll 资源。

2.1 sys_epoll_create

1
2
3
4
5
SYSCALL_DEFINE1(epoll_create, int, size) {
if (size <= 0)
return -EINVAL;
return sys_epoll_create1(0);
}

size 仅仅用来检测是否大于 0,并没有真正使用。sys_epoll_create1 过程检查参数,然后再调用 epoll_create1。

2.2 sys_epoll_create1

epoll_create1 的过程主要是创建并初始化 eventpoll 资源,创建 file 实例,并将 ep 放入 file->private 成员下、file 放在 ep->file 成员下,以及将 fd 与 file 关联起来。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
SYSCALL_DEFINE1(epoll_create1, int, flags) {
int error, fd;
struct eventpoll* ep = NULL;
struct file* file;

// 创建内部数据结构 eventpoll 【小节 2.3】
error = ep_alloc(&ep); // ⭐

// 获取一个未使用的 fd
fd = get_unused_fd_flags(O_RDWR | (flags & O_CLOEXEC));

// 创建一个匿名 inode 文件,使用 eventpoll_fops 文件操作结构,allocl_file() 创建一个 file,
// 并将 ep 传递给 file,即 file->private_data = ep ⭐
file = anon_inode_getfile("[eventpoll]", &eventpoll_fops, ep, O_RDWR | (flags & O_CLOEXEC));

ep->file = file;
fd_install(fd, file); // 建立 fd 和 file 的关联关系
return fd; // fd 即 epfd

out_free_fd:
put_unused_fd(fd);
out_free_ep:
ep_free(ep);
return error;
}

2.3 ep_alloc

分配 eventpoll 资源、初始化相关成员,并将分配的 eventpoll 资源地址传给入参的二级指针(这样在函数外部,二级指针的值就是刚才分配的资源的地址)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
static int ep_alloc(struct eventpoll** pep) {
int error;
struct user_struct* user;
struct eventpoll* ep; // 【小节 2.4.1】

user = get_current_user();
error = -ENOMEM;
ep = kzalloc(sizeof(*ep), GFP_KERNEL);
if (unlikely(!ep))
goto free_uid;

// 相关成员初始化
spin_lock_init(&ep->lock);
mutex_init(&ep->mtx);
init_waitqueue_head(&ep->wq); // 阻塞进程的等待队列
init_waitqueue_head(&ep->poll_wait);
INIT_LIST_HEAD(&ep->rdllist); // 就绪事件列表
ep->rbr = RB_ROOT;
ep->ovflist = EP_UNACTIVE_PTR;
ep->user = user;
*pep = ep;
return 0;

free_uid:
free_uid(user);
return error;
}

2.4 相关结构体

为了方便后续源码的阅读,这里列举前后文所涉及到的核心 struct。

eventpoll 相关结构组织关系

2.4.1 struct eventpoll

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
/*
* This structure is stored inside the "private_data" member of the file ⭐
* structure and represents the main data structure for the eventpoll
* interface.
*/
struct eventpoll {
/* Protect the access to this structure */
spinlock_t lock;

/*
* This mutex is used to ensure that files are not removed
* while epoll is using them. This is held during the event
* collection loop, the file cleanup path, the epoll file exit
* code and the ctl operations.
*/
struct mutex mtx;

/* Wait queue used by sys_epoll_wait() */
wait_queue_head_t wq; // <- 将即将阻塞的进程加入到这个等待队列里 ⭐
// <- 后续 fd 事件触发时,会唤醒这个列表

/* Wait queue used by file->poll() */
wait_queue_head_t poll_wait;

/* List of ready file descriptors */
struct list_head rdllist; // <- 就绪事件的 epi->rdllist 会链接到这里哦 ⭐
// (epi 中的 ffd 和 event 存储着用户关心的 fd 和 event)
// <- 通过列表判空,高效判断是否有就绪事件

/* RB tree root used to store monitored fd structs */
struct rb_root rbr; // <- 红黑树的根节点,树的节点是 epitem 数据 ⭐
// <- 用红黑树来管理所有用户关心(被监听的)的 fd
// <- 节点排序规则:file 地址和 fd 为第一二优先级
/*
static inline int ep_cmp_ffd(struct epoll_filefd *p1,
struct epoll_filefd *p2) {
return (p1->file > p2->file ? +1 :
(p1->file < p2->file ? -1 : p1->fd - p2->fd));}
*/

/*
* This is a single linked list that chains all the "struct epitem" that
* happened while transferring ready events to userspace w/out
* holding ->lock.
*/
struct epitem* ovflist; // <- ep->ovflist = epi in the func "ep_poll_callback"

/* wakeup_source used when ep_scan_ready_list is running */
struct wakeup_source* ws;

/* The user that created the eventpoll descriptor */
struct user_struct* user;

struct file* file; // <- ep 对应的的文件地址 ⭐

/* used to optimize loop detection check */
int visited;
struct list_head visited_list_link;
};

2.4.2 struct epitem

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
/* <- epoll_ctl 的 op=EPOLL_CTL_ADD 时,会往红黑树中插入一个 epitem 节点
* Each file descriptor added to the eventpoll interface will
* have an entry of this type linked to the "rbr" RB tree.
*/
struct epitem {
/* RB tree node used to link this structure to the eventpoll RB tree */
struct rb_node rbn; // <- 放在首位,可以用于从红黑树中找到节点后,强转得到 epitem 结构

/* List header used to link this structure to the eventpoll ready list */
struct list_head rdllink; // <- 事件就绪时,将其链接到 ep->rdllist 列表下 ⭐

/*
* Works together "struct eventpoll"->ovflist in keeping the
* single linked chain of items.
*/
struct epitem* next; // <- epi->next = ep->ovflist in the func "ep_poll_callback"

/* The file descriptor information this item refers to */
struct epoll_filefd ffd; // <- 这个节点所属的 eventpoll 文件,ffd.fd 就是 epfd 吧 ⭐

/* Number of active wait queue attached to poll operations */
int nwait;

/* List containing poll wait queues */
struct list_head pwqlist;

/* The "container" of this item */
struct eventpoll* ep; // <- 这个节点所属的 eventpoll 的地址 ⭐

/* List header used to link this item to the "struct file" items list */
struct list_head fllink;

/* wakeup_source used when EPOLLWAKEUP is set */
struct wakeup_source __rcu* ws;

/* The structure that describe the interested events and the source fd */
struct epoll_event event; // <- 从用户态 epool_ctl 拷贝过来的用户关心的事件 ⭐
};

2.4.3 struct epoll_event

1
2
3
4
struct epoll_event {
__u32 events;
__u64 data;
} EPOLL_PACKED;

2.4.4 struct epoll_filefd

1
2
3
4
struct epoll_filefd {
struct file *file;
int fd;
} __packed;

2.4.5 struct ep_pqueue

epoll 优先队列,包含红黑树下对应的节点 epitem 和其对应的 poll 处理函数。

1
2
3
4
struct ep_pqueue {
poll_table pt;
struct epitem *epi;
};

2.4.6 struct poll_table

1
2
3
4
typedef struct poll_table_struct {
poll_queue_proc _qproc;
unsigned long _key;
} poll_table;

2.4.7 struct eppoll_entry

1
2
3
4
5
6
7
8
struct eppoll_entry {
struct list_head llink; // 指向 epi->pwqlist
struct epitem* base; // 指向 epitem 的地址
wait_queue_t wait; // 监视的目标 fd 的等待队列,指定了 fd 就绪时的回调函数 ⭐
wait_queue_head_t* whead; // 文件系统中,目标 fd 对应的等待队列头的地址 ⭐
// 后面会把 ->wait 挂到 ->whead 下,这样 fd 就绪唤醒 whead 队列时,
// 就可以通过 ->wait.func 唤醒回调函数来往 ep->rdllist 中挂就绪事件
};

三、epoll_ctl

3.1 sys_epoll_ctl

该系统调用主要从 epfd 对应的 file 中获取 eventepoll 数据 ep,然后通过 key=<file, fd> 在 ep->rbr 中查找节点。根据查找结果 epi 和入参 op 操作进行增加、删除或修改处理分支。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
SYSCALL_DEFINE4(epoll_ctl, int, epfd, int, op, int, fd, struct epoll_event __user*, event) {
int error;
int full_check = 0;
struct fd f, tf;
struct eventpoll* ep; // 【小节 2.4.1】
struct epitem* epi; // 【小节 2.4.2】
struct epoll_event epds; // 【小节 2.4.3】

error = -EFAULT;
// 如果操作携带事件(不是 EPOLL_CTL_DEL),则将用户空间的 epoll_event 拷贝到内核
if (ep_op_has_event(op) && copy_from_user(&epds, event, sizeof(struct epoll_event)))
goto error_return;

f = fdget(epfd); // 通过 epfd 获取对应的 fd 结构
tf = fdget(fd); // 通过 fd 获取对应的 fd 结构

// 目标 fd 必须支持 poll,因为后面的代码在文件系统的 poll() 下挂着 ep 的 poll() 处理机制
if (!tf.file->f_op->poll)
goto error_tgt_fput;

if (ep_op_has_event(op)) // 检查是否允许 EPOLLWAKEUP
ep_take_care_of_epollwakeup(&epds);

// 从 epfd 对应的 file 中获取 eventepoll 数据(即 epoll_create 过程创建的 ep) ⭐
ep = f.file->private_data;

mutex_lock_nested(&ep->mtx, 0);
// ...
epi = ep_find(ep, tf.file, fd); // 通过 key=<file, fd> 在 ep 的红黑树中查找节点 ⭐
switch (op) {
case EPOLL_CTL_ADD: // 添加操作,要求条目不存在于 rb 树中
if (!epi) {
epds.events |= POLLERR | POLLHUP; // 发生错误 | 挂起
// 申请一个 epitem 节点并设置其 event,然后添加到 ep 下的红黑树中,见【小节 3.2】 ⭐
error = ep_insert(ep, &epds, tf.file, fd, full_check);
}
if (full_check) // 进行完整性检查
clear_tfile_check_list();
break;
case EPOLL_CTL_DEL: // 删除操作,要求条目存在于 rb 树中
if (epi)
error = ep_remove(ep, epi); // 见【小节 3.3】
break;
case EPOLL_CTL_MOD: // 修改操作,要求条目存在于 rb 树中
if (epi) {
epds.events |= POLLERR | POLLHUP;
error = ep_modify(ep, epi, &epds); // 见【小节 3.4】
}
break;
}
mutex_unlock(&ep->mtx);
fdput(tf); // 释放 epfd 的引用
fdput(f);
// ...
return error;
}

3.2 ep_insert

ep_insert 函数用于:

  1. 初始化 poll table,指定 ep 的回调函数,调用文件系统的 poll 以及挂上的 ep poll 回调处理,检查 fd 文件的状态(可读、可写、挂起…),处理 ep 就绪队列;
  2. 将新的监视项(其查找 key=<file, fd>)封装成一个 epitem 对象,插入到 eventepoll 实例 ep 的红黑树中;
  3. 若 fd 上有用户关心的事件,按需看看是否需要(能够)唤醒进程。

想想,为什么先执行 step1 再执行的 step2 呢?可以反过来吗?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
/* 
* ep: 内核私有 eventpoll 实例
* event: 从用户态 epoll_event 事件拷贝到内核态下的事件
* tfile: 目标文件(作为红黑树插入的第一优先查找 key)
* fd: 目标文件对应的 fd(作为红黑树插入的第二优先查找 key)
* full_check: 是否进行完整性检查
*/
static int ep_insert(struct eventpoll* ep, struct epoll_event* event, struct file* tfile, int fd, int full_check) {
int error, revents, pwake = 0;
unsigned long flags;
long user_watches; // 当前用户监视的数量
struct epitem* epi; // 新分配的监视项,要往红黑树中添加哦
struct ep_pqueue epq; // 【小节 2.4.5】

user_watches = atomic_long_read(&ep->user->epoll_watches);
if (unlikely(user_watches >= max_user_watches))
return -ENOSPC;

// 从内存池中分配新的监视项
if (!(epi = kmem_cache_alloc(epi_cache, GFP_KERNEL)))
return -ENOMEM;

// 构造并填充 epi 结构体
INIT_LIST_HEAD(&epi->rdllink);
INIT_LIST_HEAD(&epi->fllink);
INIT_LIST_HEAD(&epi->pwqlist);
epi->ep = ep; // 所属的 eventpoll 实例
ep_set_ffd(&epi->ffd, tfile, fd); // 将 tfile 和 fd 赋值给 ffd 对应成员,ffd 结构体见【小节 2.4.4】 ⭐
epi->event = *event; // 记录从用户态拷贝到内核的事件 ⭐
epi->nwait = 0;
epi->next = EP_UNACTIVE_PTR; // 设置为无效值

// 如果包含 EPOLLWAKEUP,则创建一个唤醒源;否则初始化为 NULL
if (epi->event.events & EPOLLWAKEUP) {
error = ep_create_wakeup_source(epi);
} else {
RCU_INIT_POINTER(epi->ws, NULL);
}

epq.epi = epi;
// 初始化 poll table【小节 3.2.1 & 3.2.3】 ⭐
init_poll_funcptr(&epq.pt, ep_ptable_queue_proc);

// 执行 file->poll 方法【小节 3.2.2】,返回满足条件的事件类型 ⭐
revents = ep_item_poll(epi, &epq.pt);

// 加锁,将当前的 epi 与目标 file 关联起来
spin_lock(&tfile->f_lock);
list_add_tail_rcu(&epi->fllink, &tfile->f_ep_links);
spin_unlock(&tfile->f_lock);

// 将当前 epi 添加到 RB 树(比较的 key 已经存在 epi->ffd 里了哦)⭐
ep_rbtree_insert(ep, epi);

spin_lock_irqsave(&ep->lock, flags);
// 添加操作这会儿,如果就存在满足的就绪事件 并且 epi 的就绪队列无数据(链表为空)
// 那么,就链接节点到 ep->rdllist 列表中,并唤醒调用 epoll_wait() 的进程
if ((revents & event->events) && !ep_is_linked(&epi->rdllink)) {
list_add_tail(&epi->rdllink, &ep->rdllist); // 添加节点到 ep->rdllist 列表中 ⭐
ep_pm_stay_awake(epi);

// 唤醒正在等待事件就绪的进程(即调用 epoll_wait 的进程)⭐
if (waitqueue_active(&ep->wq))
wake_up_locked(&ep->wq);
if (waitqueue_active(&ep->poll_wait))
pwake++; // 唤醒计数
}

spin_unlock_irqrestore(&ep->lock, flags);
atomic_long_inc(&ep->user->epoll_watches);

if (pwake)
ep_poll_safewake(&ep->poll_wait); // 唤醒等待 eventpoll 文件就绪的进程
return 0;
}

3.2.1 init_poll_funcptr

1
2
3
4
static inline void init_poll_funcptr(poll_table* pt, poll_queue_proc qproc) {
pt->_qproc = qproc;
pt->_key = ~0UL; /* all events enabled */ // <- 一个初始化而已,后面用时还会重新赋值
}

在 ep_insert 中,将 ep_pqueue 结构的 pt 成员中的_qproc 设置为 ep_ptable_queue_proc 函数。

3.2.2 ep_item_poll

f_op->poll() 就是调用 文件系统的 poll 方法,不同驱动设备实现方法略有不同,但都会执行 poll_wait()。

1
2
3
4
5
static inline unsigned int ep_item_poll(struct epitem* epi, poll_table* pt) {
pt->_key = epi->event.events; // <- 重新赋值
// 调用文件系统的 poll 核心方法【小节 3.2.3】 ⭐
return epi->ffd.file->f_op->poll(epi->ffd.file, pt) & epi->event.events;
}

poll_wait() 会调用 epq.pt.qproc 所对应的回调函数 ep_ptable_queue_proc,其主要工作是初始化等待队列项 pwq->wait 的唤醒函数为 ep_poll_callback(),并把 pwq->wait 挂到文件系统对应的等待队列头 whead 下,源码如下所示。由此,当目标 fd 的就绪事件到来时,就会调用 ep_poll_callback() 函数。

3.2.3 ep_ptable_queue_proc

该函数设置 pwq->wait 的成员 func 的唤醒回调函数为 ep_poll_callback,并将 pwq->wait 添加到 fd 的 whead 中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
/* 
* 用于处理文件的等待队列
* file: 这里是 epi->ffd.file
* whead: 【文件】的等待队列头
* pt: 轮询的回调处理函数表
*/
static void ep_ptable_queue_proc(struct file* file, wait_queue_head_t* whead, poll_table* pt) {
struct epitem* epi = ep_item_from_epqueue(pt); // 从 ep_pqueue 中获取 epi
struct eppoll_entry* pwq; // 【小节 2.4.7】

// 从内存池中分配一个新的 eppoll_entry 条目
if (epi->nwait >= 0 && (pwq = kmem_cache_alloc(pwq_cache, GFP_KERNEL))) {
// 初始化等待队列项的唤醒回调函数 ⭐
init_waitqueue_func_entry(&pwq->wait, ep_poll_callback); // 【小节 3.2.4】
pwq->whead = whead; // 记录文件 fd 的等待队列头
pwq->base = epi;
// 将等待项 pwq->wait(内含 fd 就绪时的回调函数 ep_poll_callback)头插到 fd 的 whead 中 ⭐
add_wait_queue(whead, &pwq->wait);
// 将 pwq->llink 放入 epi->pwqlist 的尾部
list_add_tail(&pwq->llink, &epi->pwqlist);
epi->nwait++;
} else {
epi->nwait = -1; // 标记错误发生
}
}

/* Get the "struct epitem" from an epoll queue wrapper */
static inline struct epitem* ep_item_from_epqueue(poll_table* p) {
return container_of(p, struct ep_pqueue, pt)->epi;
}

static inline void init_waitqueue_func_entry(wait_queue_t* q, wait_queue_func_t func) {
q->flags = 0;
q->private = NULL;
q->func = func;
}

3.2.4 ep_poll_callback

ep_poll_callback 函数核心功能是当目标 fd 的就绪事件到来时,将 fd 对应的 epitem 实例添加到 ep 就绪队列,并唤醒正在等待在 epoll_wait() 上的进程。这样,内核就会将 ep 就绪队列中的 event 报告给用户态应用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
/*
* This is the callback that is passed to the wait queue wakeup
* mechanism. It is called by the stored file descriptors when they
* have events to report.
*/
static int ep_poll_callback(wait_queue_t* wait, unsigned mode, int sync, void* key) {
int pwake = 0;
unsigned long flags;
struct epitem* epi = ep_item_from_wait(wait); // 从 eppoll_entry 中获取 epi
struct eventpoll* ep = epi->ep; // 从 epi 中获取 ep

spin_lock_irqsave(&ep->lock, flags);
/*
* 如果正在将事件传递给用户空间,我们不能持有锁
* (因为我们正在访问用户内存,以及 linux f_op->poll() 语义)
* 在那段时间内调用 epool_ctl 发生的所有事件,
* 都链接在 ep->ovflist 中并在稍后重新入队(加入红黑树中)
*/
if (unlikely(ep->ovflist != EP_UNACTIVE_PTR)) {
if (epi->next == EP_UNACTIVE_PTR) {
epi->next = ep->ovflist;
ep->ovflist = epi;
if (epi->ws) {
__pm_stay_awake(ep->ws);
}
}
goto out_unlock;
}

/* If this file is already in the ready list we exit soon */
if (!ep_is_linked(&epi->rdllink)) {
// 将 epi 就绪事件插入到 ep 就绪队列 ⭐
list_add_tail(&epi->rdllink, &ep->rdllist);
ep_pm_stay_awake_rcu(epi);
}

// 如果活跃(有进程在等待),唤醒调用 epoll_wait() 而阻塞的进程 ⭐
if (waitqueue_active(&ep->wq))
wake_up_locked(&ep->wq);

if (waitqueue_active(&ep->poll_wait))
pwake++;

out_unlock:
spin_unlock_irqrestore(&ep->lock, flags);
if (pwake)
ep_poll_safewake(&ep->poll_wait);

if ((unsigned long)key & POLLFREE) {
list_del_init(&wait->task_list); // 删除相应的 wait
smp_store_release(&ep_pwq_from_wait(wait)->whead, NULL);
}
return 1;
}

// 判断等待队列是否为空
static inline int waitqueue_active(wait_queue_head_t* q) {
return !list_empty(&q->task_list);
}
ep_remove 和 ep_modify(点击展开)

3.3 ep_remove

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
static int ep_remove(struct eventpoll* ep, struct epitem* epi) {
unsigned long flags;
struct file* file = epi->ffd.file;

ep_unregister_pollwait(ep, epi);

/* Remove the current item from the list of epoll hooks */
spin_lock(&file->f_lock);
list_del_rcu(&epi->fllink);
spin_unlock(&file->f_lock);

rb_erase(&epi->rbn, &ep->rbr);

spin_lock_irqsave(&ep->lock, flags);
if (ep_is_linked(&epi->rdllink))
list_del_init(&epi->rdllink);
spin_unlock_irqrestore(&ep->lock, flags);

wakeup_source_unregister(ep_wakeup_source(epi));
call_rcu(&epi->rcu, epi_rcu_free);
atomic_long_dec(&ep->user->epoll_watches);

return 0;
}

3.4 ep_modify

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
static int ep_modify(struct eventpoll* ep, struct epitem* epi, struct epoll_event* event) {
int pwake = 0;
unsigned int revents;
poll_table pt;

init_poll_funcptr(&pt, NULL);

epi->event.events = event->events; /* need barrier below */
epi->event.data = event->data; /* protected by mtx */
if (epi->event.events & EPOLLWAKEUP) {
if (!ep_has_wakeup_source(epi))
ep_create_wakeup_source(epi);
} else if (ep_has_wakeup_source(epi)) {
ep_destroy_wakeup_source(epi);
}

smp_mb();

revents = ep_item_poll(epi, &pt);

if (revents & event->events) {
spin_lock_irq(&ep->lock);
if (!ep_is_linked(&epi->rdllink)) {
list_add_tail(&epi->rdllink, &ep->rdllist);
ep_pm_stay_awake(epi);

/* Notify waiting tasks that events are available */
if (waitqueue_active(&ep->wq))
wake_up_locked(&ep->wq);
if (waitqueue_active(&ep->poll_wait))
pwake++;
}
spin_unlock_irq(&ep->lock);
}

/* We have to call this outside the lock */
if (pwake)
ep_poll_safewake(&ep->poll_wait);

return 0;
}

四、epoll_wait

4.1 sys_epoll_wait

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
SYSCALL_DEFINE4(epoll_wait, int, epfd, struct epoll_event __user*, events, int, maxevents, int, timeout) {
int error;
struct fd f;
struct eventpoll* ep;

// #define EP_MAX_EVENTS (INT_MAX / sizeof(struct epoll_event))
if (maxevents <= 0 || maxevents > EP_MAX_EVENTS)
return -EINVAL;

// 检查用户空间传递的内存是否可写,因为后面内核要拷贝就绪事件到用户内存
if (!access_ok(VERIFY_WRITE, events, maxevents * sizeof(struct epoll_event)))
return -EFAULT;

// 获取 epfd 对应的 file,进而通过 file 的 private_data 拿到 eventpoll 实例 ⭐
f = fdget(epfd);
ep = f.file->private_data;

error = ep_poll(ep, events, maxevents, timeout); // 【小节 4.2】 ⭐

error_fput:
fdput(f);
return error;
}

4.2 ep_poll

该函数主要做以下几件事:

  1. 判断被监听的 fds 中是否有就绪的 fd,如果有就返回(序号 4&5)。
  2. 如果没有就把当前进程添加到 epoll 的等待队列中,并且进入睡眠。
  3. 进程会一直睡眠直到有以下几种情况发生:
    • 被监听的 fds 中有就绪的事件;
    • 设置了 timeout 并且超时了;
    • 接收到中断信号。
  4. 如果有就绪的文件,那么就调用 ep_send_events() 函数把就绪事件复制到入参指定的用户空间的 events 数组中。
  5. 返回就绪文件的个数。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
static int ep_poll(struct eventpoll* ep, struct epoll_event __user* events, int maxevents, long timeout) {
int res = 0, eavail, timed_out = 0;
unsigned long flags;
long slack = 0;
wait_queue_t wait;
ktime_t expires, *to = NULL;

if (timeout > 0) { // 超时设置
struct timespec end_time = ep_set_mstimeout(timeout);
slack = select_estimate_accuracy(&end_time);
to = &expires;
*to = timespec_to_ktime(end_time);
} else if (timeout == 0) {
// timeout 等于 0 为非阻塞操作,此处避免不必要的等待队列循环
timed_out = 1;
spin_lock_irqsave(&ep->lock, flags);
goto check_events;
}

fetch_events:
spin_lock_irqsave(&ep->lock, flags);

// 没有事件就绪则进入睡眠状态,当事件就绪后可通过 ep_poll_callback() 来唤醒
if (!ep_events_available(ep)) { // 【小节 4.2.1】
// 初始化当前进程 current 的 wait 等待队列项 ⭐
init_waitqueue_entry(&wait, current); // 【小节 4.2.2】

// 将当前进程加入 ep->wq 等待队列,等待文件就绪、超时或中断信号 ⭐
__add_wait_queue_exclusive(&ep->wq, &wait);

for (;;) {
set_current_state(TASK_INTERRUPTIBLE);
// 有就绪队列或者超时,则跳出循环
if (ep_events_available(ep) || timed_out)
break;
// 有待处理信号,则跳出循环
if (signal_pending(current)) {
res = -EINTR;
break;
}

spin_unlock_irqrestore(&ep->lock, flags);
// 主动出让 CPU,从这里开始进入睡眠状态
if (!freezable_schedule_hrtimeout_range(to, slack, HRTIMER_MODE_ABS))
timed_out = 1;

spin_lock_irqsave(&ep->lock, flags);
}
// 有 3 种情况会执行到这里:从等待队列中移除该进程、设置进程为运行态 ⭐
// 1. 被监听的 fds 中有就绪的事件、2. 设置了 timeout 并且超时了、3. 接收到中断信号
__remove_wait_queue(&ep->wq, &wait);
set_current_state(TASK_RUNNING);
}

check_events:
eavail = ep_events_available(ep);
spin_unlock_irqrestore(&ep->lock, flags);

/* 尝试传输就绪事件到用户空间,如果没有获取就绪事件,但还未超时,会再次 fetch_events ⭐
* 1. eavail == 0 ? 返回 0 个就绪事件
* 2. res == 0 ? 尝试传输就绪事件到用户空间(数量为 res 值)
* 3. 尝试后 res == 0 ? 传输了 0 个或传输失败,未超时下则转向 fetch_events : 返回 res
*/
if (!res && eavail && !(res = ep_send_events(ep, events, maxevents)) && !timed_out)
goto fetch_events;

return res;
}

4.2.1 ep_events_available

1
2
3
4
// eventpoll 就绪列表不为空 或 暂存列表不为空
static inline int ep_events_available(struct eventpoll* ep) {
return !list_empty(&ep->rdllist) || ep->ovflist != EP_UNACTIVE_PTR;
}

4.2.2 init_waitqueue_entry

1
2
3
4
5
static inline void init_waitqueue_entry(wait_queue_t* q, struct task_struct* p) {
q->flags = 0;
q->private = p;
q->func = default_wake_function; // 设置等待队列项的唤醒函数
}

五、总结

  1. epoll_create():获取一个未使用的 fd 作为 epfd,创建并初始化 eventpoll 结构体 ep,并将 ep 放入 file->private、file 放入 ep->file,最后将 file 与 fd 关联起来并返回 fd。
  2. epoll_ctl():以插入 epi 为例(进入 ep_insert 函数)。
    • init_poll_funcptr():将 ep_pqueue->pt 的成员变量_qproc 设置为 ep_ptable_queue_proc 函数,用于文件系统中 poll_wait()的回调函数;
    • ep_item_poll():通过这个函数,执行上面设置的 ep_ptable_queue_proc 回调函数;
    • ep_ptable_queue_proc():将 pwq->wait 的成员变量 func 的唤醒回调函数设置为 ep_poll_callback——这是用于文件系统中有就绪事件触发时,唤醒进程所用的回调函数。再将 pwq->wait 添加到文件的等待队列头 whead 中——在就绪时,就能通过 wq 头找到 wq 项,在 wq 项中获取并执行 func 回调函数。
  3. epoll_wait():主要工作是执行 ep_poll()方法。
    • 设置将要阻塞的 current 进程的等待队列项,其唤醒函数为 default_wake_function(),然后将其加入 ep->wq;
    • freezable_schedule_hrtimeout_range():出让 CPU,进入睡眠状态;
    • 等待被唤醒。

之后,当其他进程的就绪事件发生时便会唤醒相应等待队列上的进程。比如监控的是可写事件,则会在 write()方法中调用 wake_up 方法唤醒相对应的等待队列上的进程,当唤醒后执行前面设置的唤醒回调函数 ep_poll_callback 函数。

  1. ep_poll_callback():目标 fd 的就绪事件到来时,将 epi->rdllink 加入 ep->rdllist 的队列,导致 rdlist 不空,从而进程被唤醒,epoll_wait 得以继续执行。
  2. 回到 epoll_wait(),从队列中移除 wait,再传输就绪事件到用户空间。

epoll 比 select 更高效的一点是:epoll 监控的每一个文件 fd 就绪事件触发,导致相应 fd 上的回调函数 ep_poll_callback()被调用。

Linux 内核的 epoll 实现原理

参考资料:

  1. http://gityuan.com/2019/01/06/linux-epoll/
  2. https://www.bilibili.com/video/BV12p4y1372v/
  3. https://github.com/liexusong/linux-source-code-analyze/blob/master/epoll-principle.md
  4. https://www.bluepuni.com/archives/epoll-in-depth/