Linux 内核的等待队列是以双循环链表为基础数据结构,与进程调度机制紧密结合,能够用于实现核心的异步事件通知机制、可以使用等待队列实现阻塞进程的唤醒。等待队列在 Linux 内核中有着举足轻重的作用,很多 Linux 驱动都或多或少涉及到了等待队列。因此,对于 Linux 内核及驱动开发者来说,掌握等待队列是必修课之一。

在介绍等待队列机制的技术实现之前,先从功能角度谈下等待队列在内核中的角色。

你理解阻塞吗

写程序的时候,我们常常说某个系统调用是阻塞调用。

从用户层的角度,基本理解是:进程在执行某个系统调用的时候,因为需要的资源不满足(I/O 操作,加锁等),导致进程“停”在那里,等到资源就绪了或者设置的 timeout 时间超时了,进程得以继续执行。

从内核的角度,面对用户层对阻塞调用的需求,需要实现哪些机制呢?

  1. 首先,进程陷入内核,内核发现进程所要求的资源暂时无法满足,需要将其设置为睡眠状态,然后调度其它进程执行。

    • 这里引出一个问题:1)内核如何将一个进程睡眠的
  2. 其次,等到资源就绪时,我们需要唤醒等待在该资源上的进程。

    • 这里存在两个问题:2)内核是怎么知道资源就绪的 ?以及,3) 某个资源就绪了,内核怎么找到对应的等待进程,并将它唤醒的

一个场景:我们希望对某个 socket fd 进行阻塞 write 操作。发起写操作的时候,陷入内核,内核发现该 socket fd 的写缓冲区是满的(即需要的资源不满足),暂时不能写。这时,内核会将进程设置为睡眠状态。转而调用执行其它运行态的进程。等到该写缓冲区可以写的时候(即需要的资源满足了),内核将进程设置为运行状态,然后执行写操作,拷贝数据到内核写缓冲区。执行完,切换回用户态。

1)内核如何将一个进程睡眠的

在 Linux 中每一个进程都由 task_struct 数据结构来定义。它有一个 .state 状态成员,将其设置为睡眠(TASK_INTERRUPTIBLE 或 TASK_UNINTERRUPTIBLE),并将表示该进程的 task_struct 结构从就绪队列中移走(转移到等待队列中),内核就不会调度其执行,也就相当于睡眠了。

2)内核是怎么知道资源就绪的

中断机制:内核的所有工作都是由中断驱动的。不管是系统调用陷入内核,还是调度,还是其它的内核活动,都是由各种各样的中断来触发执行的。对于设备 I/O,如果设备空闲了,会触发一个外部中断,该中断触发内核执行中断处理程序,通知等待进程、执行回调等等。

资源可用 -> 中断触发 -> 内核接收到中断信号 -> 中断处理程序(更新某种状态标记以表示资源可用) -> 通知唤醒等待该资源的等待进程,重新进入调度队列(到这里中断处理结束) -> 被调度时,执行资源处理(这通常是在进程的上下文中进行,而不是在中断上下文中)。

3)某个资源就绪了,内核怎么找到对应的等待进程,并将它唤醒的

等待队列:我们将一个资源和一个等待队列关联起来。如果进程所请求的资源还未就绪,就先加入到该资源的等待队列中。等到资源就绪了,就唤醒等待队列中的进程,加入到调度。

怎么知道进程等待的资源是否就绪了呢(资源就绪后会发生什么变化)?在等待队列中,“资源”是一个通过条件变量 condition 模拟的抽象,当资源满足时,会将 condition 修改为真,从而 wake_up 等待队列中的进程。

等待队列与惊群

“惊群”的基本行为是:有多个进程 / 线程等待在同一个资源上,而该资源一次只能有一个进程处理。比如文件描述符的写操作、accept 一个新连接等。那么,在资源就绪的时候,如果内核采取的策略是唤醒所有的进程,这样,只有一个进程获取了该资源,其它进程发现没有资源就绪,继续进入睡眠(所谓虚假唤醒)。这样的行为浪费了系统的 CPU 资源。

那是不是,内核在资源就绪的时候,就唤醒一个进程不就得了。其实也不是,因为不是所有资源都是互斥的。比如某个文件的读操作。

那么,惊群问题怎么解决

在用户态,可以有不同的解决方式。或者忽略惊群所带来的开销,或者使用锁方式保证一次只有一个进程来阻塞在一个资源上。

而对于内核来说,在等待队列上增加了一个是否“互斥等待”的标志。即如果是互斥等待的,一次唤醒一个进程;如果不是互斥等待的,一次唤醒所有进程。

互斥等待的经典例子:accept。因为我们很明确知道,对一个 listen fd 的 accept,肯定是一次只有一个进程可以处理。那么,我们在 listen fd 上的等待队列,就毫无疑问可以设置为“互斥等待”。所以,现今版本的 linux 内核,解决了 accept 的惊群问题。

但是像 epoll_wait 的惊群问题,就无法从等待队列的互斥等待来解决。首先,epoll fd 上也有一个等待队列,代表 epoll fd 所监听的其他若干文件描述符(资源)就绪时,唤醒等待队列上的进程。因为我们无法确定,进程对这些资源是不是都是互斥访问的,还是都不是。所以,只好唤醒所有进程。更多的惊群问题,可以查阅相关资料。

等待队列

作用

Linux 内核的 等待队列是以双循环链表为基础数据结构,与进程调度机制紧密结合,能够用于实现核心的异步事件通知机制、可以使用等待队列实现阻塞进程的唤醒。

它有两种数据结构:等待队列头(wait_queue_head_t)和等待队列项(wait_queue_t)。等待队列头和等待队列中都包含一个 list_head 类型的域作为“连接件”。它通过一个双链表把等待 task 的头(wait_queue_head_t类型)和等待的进程列表(wait_queue_t类型)链接起来。

等待队列的每个节点都代表一个进程 task_struct 的封装,它通过等待队列项的 .private = current 来“绑定”进程,具体看后面的代码实现。

结构组成

(1)等待队列头

1
2
3
4
5
6
7
8
9
10
// 等待队列头
struct __wait_queue_head {
spinlock_t lock;
struct list_head task_list;
};

struct list_head {
struct list_head *next, *prev;
};
typedef struct __wait_queue_head wait_queue_head_t;
成员 描述
lock 自旋锁,在对 task_list 操作的过程中,使用该锁实现对等待队列(如添加、删除操作)的互斥访问
task_list 双向循环链表,存放等待的进程

(2)等待队列

1
2
3
4
5
6
7
8
9
10
11
// 等待队列
struct __wait_queue {
unsigned int flags; // 指明该等待的进程是互斥进程还是非互斥进程
#define WQ_FLAG_EXCLUSIVE 0x01
void* private;
wait_queue_func_t func; // 挂唤醒函数用的,如 default_wake_function
struct list_head task_list;
};

typedef int (*wait_queue_func_t)(wait_queue_t* wait, unsigned mode, int flags, void* key);
typedef struct __wait_queue wait_queue_t;

(3)等待队列头(wait_queue_head_t)和等待队列(wait_queue_t)的区别是等待队列是等待队列头的成员。也就是说等待队列头的 .task_list 域链接的成员就是等待队列类型的(wait_queue_t)。

等待队列

通过 ->task_list->next 得到的是下一个等待项的 task_list 成员的地址。那么,如何找到 wait_queue_t 结构的首地址呢?如果是通过减法操作,不会受字节对齐影响吗?不会的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
* container_of - 从结构体的成员指针获取指向整个结构体的指针
* @ptr: 指向结构体成员的指针
* @type: 结构体类型
* @member: 成员在结构体中的名字
* typeof(((type*)0)->member) 创建一个 type 类型的空指针,并访问其成员,
* 不会实际访问内存,只是获取成员的类型
*/
#define container_of(ptr, type, member) \
({ \
const typeof(((type*)0)->member)* __mptr = (ptr); \
(type*)((char*)__mptr - offsetof(type, member)); \
})

#define offsetof(TYPE, MEMBER) ((size_t) &((TYPE *)0)->MEMBER)

操作 / 常用函数接口

Linux-2.6 提供如下关于等待队列的操作:

(1)定义等待队列头

1
wait_queue_head_t my_queue;

(2)初始化等待队列头

1
2
init_waitqueue_head(&my_queue);     // 对已定义的结构初始化
DECLARE_WAIT_QUEUE_HEAD(my_queue); // 定义结构并初始化

(3)定义等待队列

1
DECLARE_WAITQUEUE(name, tsk);  // 定义并初始化一个名为 name 的等待队列,并设置 private 域为 tsk

(4)添加 / 移除等待队列

1
2
3
4
5
// 用于将等待队列 wait,(头插)添加到等待队列头 q 指向的等待队列链表中
void fastcall add_wait_queue(wait_queue_head_t* q, wait_queue_t* wait);

// 用于将等待队列 wait,从等待队列头 q 指向的等待队列链表中移除
void fastcall remove_wait_queue(wait_queue_head_t* q, wait_queue_t* wait);

(5)等待事件

1
2
3
4
5
6
7
8
9
wait_event(queue, condition);
wait_event_interruptible(queue, condition);
wait_event_timeout(queue, condition, timeout);
wait_event_interruptible_timeout(queue, condition, timeout);
/*
等待第一个参数 queue 作为等待队列头的等待队列被唤醒,而且第二个参数 condition 必须满足,否则阻塞
wait_event 和 wait_event_interruptible 的区别在于后者可以被信号打断,而前者不能
加上 timeout 意味着阻塞等待的超时时间,以 jiffy 为单位,在第三个参数的 timeout 到达时,不论 condition 是否满足,均返回
*/

(6)唤醒队列

1
2
3
4
5
6
7
8
9
10
11
void wake_up(wait_queue_head_t* queue);
void wake_up_interruptible(wait_queue_head_t* queue);
/*
上述操作会唤醒以 queue 作为等待队列头的【所有】等待队列对应的进程
wake_up() <---> wait_event()
wait_event_timeout()
wake_up_interruptible() <---> wait_event_interruptible()
wait_event_interruptible_timeout()
wake_up() 可以唤醒处于 TASK_INTERRUPTIBLE 和 TASK_UNINTERRUPTIBLE 的进程
wake_up_interruptble() 只能唤醒处于 TASK_INTERRUPTIBLE 的进程
*/

(7)在等待队列上睡眠。sleep_on 无需条件的睡眠,可能导致竞态,3.15 版本后废弃,采用 wait_event 代替 sleep_on。

1
2
3
4
5
6
7
8
9
// 作用是将当前进程的状态置成 TASK_UNINTERRUPTIBLE,定义一个等待队列,并把它添加到等待队列头 q,直到资源获得,q 指向的等待队列被唤醒
sleep_on(wait_queue_head_t* q);

// 作用是将当前进程的状态置成 TASK_INTERRUPTIBLE,并定义一个等待队列,之后把它附属到等待队列头 q,直到资源可获得,q 指向的等待队列被唤醒或者进程收到信号
interruptible_sleep_on(wait_queue_head_t* q);
/*
sleep_on() <---> wake_up()
interruptible_sleep_on() <---> wake_up_interruptible()
*/

接口实现

初始化

(1)直接定义后初始化。init_waitqueue_head()函数会将自旋锁初始化为未锁,等待队列初始化为空的双向循环链表。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
wait_queue_head_t my_queue;
init_waitqueue_head(&my_queue);

#define init_waitqueue_head(q) \
do { \
static struct lock_class_key __key; \
\
__init_waitqueue_head((q), &__key); \
} while (0)

void __init_waitqueue_head(wait_queue_head_t* q, struct lock_class_key* key) {
spin_lock_init(&q->lock); // 初始化自旋锁
lockdep_set_class(&q->lock, key); // 高级调试,具体代码看不懂,只知道可以检测是否发生死锁
INIT_LIST_HEAD(&q->task_list); // 初始化链表
}

static inline void INIT_LIST_HEAD(struct list_head* list) {
list->next = list;
list->prev = list;
}

(2)定义并初始化的快捷方式,等价于(1)。

1
2
3
4
5
6
7
8
DECLARE_WAIT_QUEUE_HEAD(my_queue);

#define DECLARE_WAIT_QUEUE_HEAD(name) \
wait_queue_head_t name = __WAIT_QUEUE_HEAD_INITIALIZER(name)

#define __WAIT_QUEUE_HEAD_INITIALIZER(name) { \
.lock = __SPIN_LOCK_UNLOCKED(name.lock), \
.task_list = { &(name).task_list, &(name).task_list } }

(3)定义等待队列,注意此处是定义一个 wait_queue_t 类型的变量 name,并将其 private 与设置为 tsk。

1
2
3
4
5
6
7
8
9
DECLARE_WAITQUEUE(name, tsk);

#define DECLARE_WAITQUEUE(name, tsk) \
wait_queue_t name = __WAITQUEUE_INITIALIZER(name, tsk)

#define __WAITQUEUE_INITIALIZER(name, tsk) { \
.private = tsk, \
.func = default_wake_function, \
.task_list = { NULL, NULL } }

添加/移出等待队列

(1)向等待队列头中添加等待队列,设置等待的进程为非互斥进程,并将其添加进等待队列头的 队头中(而非队尾)

1
2
3
4
5
6
7
8
9
10
11
12
13
void add_wait_queue(wait_queue_head_t* q, wait_queue_t* wait) {
unsigned long flags;

wait->flags &= ~WQ_FLAG_EXCLUSIVE; // ~WQ_FLAG_EXCLUSIVE 位运算表示非互斥
spin_lock_irqsave(&q->lock, flags); // 需要修改等待队列,申请锁
__add_wait_queue(q, wait); // 添加到等待队列的队头
spin_unlock_irqrestore(&q->lock, flags);
}
EXPORT_SYMBOL(add_wait_queue);

static inline void __add_wait_queue(wait_queue_head_t* head, wait_queue_t* new) {
list_add(&new->task_list, &head->task_list);
}

(2)向等待队列头中添加等待队列,设置等待的进程为互斥进程,并将其添加进等待队列头的 队尾中(而非队头)

1
2
3
4
5
6
7
8
9
10
11
12
13
void add_wait_queue_exclusive(wait_queue_head_t* q, wait_queue_t* wait) {
unsigned long flags;

wait->flags |= WQ_FLAG_EXCLUSIVE; // 互斥进程 WQ_FLAG_EXCLUSIVE
spin_lock_irqsave(&q->lock, flags); // 需要修改等待队列,申请锁
__add_wait_queue_tail(q, wait); // 添加到等待队列的队尾
spin_unlock_irqrestore(&q->lock, flags);
}
EXPORT_SYMBOL(add_wait_queue_exclusive);

static inline void __add_wait_queue_tail(wait_queue_head_t* head, wait_queue_t* new) {
list_add_tail(&new->task_list, &head->task_list);
}

该函数和 add_wait_queue()函数功能基本一样,只不过它是将等待的进程设置为互斥进程。

(3)在等待的资源或事件满足时,进程被唤醒,使用该函数从等待头指向的等待队列中删除一项。

1
2
3
4
5
6
7
8
9
10
11
12
void remove_wait_queue(wait_queue_head_t* q, wait_queue_t* wait) {
unsigned long flags;

spin_lock_irqsave(&q->lock, flags); // 需要修改等待队列,申请锁
__remove_wait_queue(q, wait); // 删除
spin_unlock_irqrestore(&q->lock, flags);
}
EXPORT_SYMBOL(remove_wait_queue);

static inline void __remove_wait_queue(wait_queue_head_t* head, wait_queue_t* old) {
list_del(&old->task_list);
}

等待事件

wait_event

wait_event 宏使进程被置于深睡眠状态(不可被信号中断)进行等待,直到 @condition 条件满足。等待队列头 @wq 在每次被 wakeup 时都会检查 @condition 条件,如果不满足则继续睡眠等待

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
// linux/wait.h

/**
* wait_event - 睡眠直到条件满足
* @wq: 要等待的等待队列头
* @condition: 要等待的条件表达式
*
* 进程会进入不可中断的睡眠状态 (TASK_UNINTERRUPTIBLE),直到
* @condition 条件为真。每次等待队列头 @wq 被唤醒时都会检查该条件。
*
* 在改变可能影响等待条件的任何变量之后,必须调用 wake_up()。
*/
#define wait_event(wq, condition) \
do { \
if (condition) \
break; \
__wait_event(wq, condition); \
} while (0)

#define __wait_event(wq, condition) \
do { \
DEFINE_WAIT(__wait); \
\
for (;;) { \
prepare_to_wait(&wq, &__wait, TASK_UNINTERRUPTIBLE); \
if (condition) \
break; \
schedule(); \
} \
finish_wait(&wq, &__wait); \
} while (0)

// linux/wait.h

#define DEFINE_WAIT_FUNC(name, function) \
wait_queue_t name = { \
.private = current, \
.func = function, \
.task_list = LIST_HEAD_INIT((name).task_list), \
} // current 指向当前运行的进程的指针

// 定义并初始化一个等待队列项,同时指定了 private 和 func 分别为 current 和 autoremove_wake_function
#define DEFINE_WAIT(name) DEFINE_WAIT_FUNC(name, autoremove_wake_function)

// include/asm-generic/current.h
#define get_current() (current_thread_info()->task) // 获取当前执行的任务(进程)的 task_struct 类型指针
#define current get_current() // 当前运行进程的指针

// prepare_to_wait() 修改当前进程状态为不可中断的睡眠状态(TASK_UNINTERRUPTIBLE)
// schedule()开始进行睡眠
// finish_wait() 修改当前进程状态为 TASK_RUNNING,并将当前进程从等待队列中移除(若它还在)

// kernel/wait.h

/*
* 注意:我们需要在添加到等待队列之后再使用 "set_current_state()",
* 因为在 SMP 系统中需要一个内存屏障,避免编译器优化
* 这样任何检测等待队列是否活跃的唤醒函数
* 将被保证看到等待队列的添加,或者本线程中随后的测试将看到唤醒已经发生。
*/
void prepare_to_wait(wait_queue_head_t* q, wait_queue_t* wait, int state) {
unsigned long flags;

/* 睡眠前的准备工作,用于防止 wait 不在队列中,而事件已产生,则会无限等待 */

wait->flags &= ~WQ_FLAG_EXCLUSIVE; // 设置为非互斥模式
spin_lock_irqsave(&q->lock, flags); // 需要修改等待队列,申请锁
if (list_empty(&wait->task_list)) // 当 wait 不在队列 q,则加入其中,防止无法唤醒
__add_wait_queue(q, wait); // 添加事件到等待队列
set_current_state(state); // 修改当前进程状态
spin_unlock_irqrestore(&q->lock, flags);
}
EXPORT_SYMBOL(prepare_to_wait);

/**
* finish_wait - clean up after waiting in a queue
* @q: waitqueue waited on
* @wait: wait descriptor
*
* Sets current thread back to running state and removes
* the wait descriptor from the given waitqueue if still
* queued.
*/
void finish_wait(wait_queue_head_t* q, wait_queue_t* wait) {
unsigned long flags;

__set_current_state(TASK_RUNNING);

if (!list_empty_careful(&wait->task_list)) {
spin_lock_irqsave(&q->lock, flags);
list_del_init(&wait->task_list); // deletes entry from list and reinitialize it
spin_unlock_irqrestore(&q->lock, flags);
}
}
EXPORT_SYMBOL(finish_wait);

// linux/sched.h

#define set_current_state(state_value) set_mb(current->state, (state_value))

#define set_mb(var, value) \
do { \
var = value; \
barrier(); \
} while (0) // 设置变量值并加上内存屏障避免指令重排

有一个疑问,这里 DEFINE_WAIT(__wait) 定义一个 局部 等待队列项__wait,其用意在哪?

  • 管理当前等待状态:使用 __wait 来表示当前进程(.private = current)的等待状态(set_current_state(state)),用来跟踪哪个线程(current)在等待哪个条件(condition)。
  • 自动清理:使用 autoremove_wake_function 回调函数,__wait 可以在条件满足时自动移除。

另一个疑问,在 set_current_state(state) 设置进程状态时,为什么不是修改 (&__wait)->private->state,而是再次获取current->state 来修改?它俩会不会已经不是同一个进程了呢?

  • 单线程上下文 wait_event__wait_event 宏都 运行在调用进程的上下文中,不会被其它进程异步更改。因此,在循环等待或状态设置之前和之后,current 不会改变。
  • 调度保障:在进入不可中断状态后,进程会在调度器 schedule() 中等待条件发生改变并被唤醒。在这一过程中,进程上下文保持一致。

所以,正常情况下 current.private 所指的进程将保持一致。

wait_event_interruptible

wait_event_interruptible 与 wait_event 宏的不同之处是:该宏使进程被置于浅睡眠状态(可被信号中断)进行等待。在每次被唤醒的时候,首先检查 condition 是否为真,如果为真则返回 0;否则,检查进程是被信号唤醒,是则返回 -ERESTARTSYS 错误码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
/**
* wait_event_interruptible - sleep until a condition gets true
* @wq: the waitqueue to wait on
* @condition: a C expression for the event to wait for
*
* The process is put to sleep (TASK_INTERRUPTIBLE) until the
* @condition evaluates to true or a signal is received.
* The @condition is checked each time the waitqueue @wq is woken up.
*
* wake_up() has to be called after changing any variable that could
* change the result of the wait condition.
*
* The function will return -ERESTARTSYS if it was interrupted by a
* signal and 0 if @condition evaluated to true.
*/
#define wait_event_interruptible(wq, condition) \
({ \
int __ret = 0; \
if (!(condition)) \
__wait_event_interruptible(wq, condition, __ret); \
__ret; \
})

#define __wait_event_interruptible(wq, condition, ret) \
do { \
DEFINE_WAIT(__wait); \
\
for (;;) { \
prepare_to_wait(&wq, &__wait, TASK_INTERRUPTIBLE); \
if (condition) \
break; \
if (!signal_pending(current)) { \
schedule(); \
continue; \
} \
ret = -ERESTARTSYS; \
break; \
} \
finish_wait(&wq, &__wait); \
} while (0)

wait_event_timeout

wait_event_timeout 也与 wait_event 类似,均使进程被置于深睡眠状态(不可被中断)进行等待。不过如果所给的睡眠时间为负数则立即返回;如果在睡眠期间被唤醒,且 condition 为真则返回剩余的睡眠时间,否则继续睡眠直到到达或超过给定的睡眠时间,然后返回 0。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
#define wait_event_timeout(wq, condition, timeout)      \
({ \
long __ret = timeout; \
if (!(condition)) \
__wait_event_timeout(wq, condition, __ret); \
__ret; \
})

#define __wait_event_timeout(wq, condition, ret) \
do { \
DEFINE_WAIT(__wait); \
\
for (;;) { \
prepare_to_wait(&wq, &__wait, TASK_UNINTERRUPTIBLE); \
if (condition) \
break; \
ret = schedule_timeout(ret); \
if (!ret) \
break; \
} \
finish_wait(&wq, &__wait); \
} while (0)

wait_event_interruptible_timeout

wait_event_interruptible_timeout 是 interruptible 和 timeout 的结合,可使进程被置于浅睡眠状态(可被信号中断)进行等待。如果在睡眠期间被信号打断,则返回 -ERESTARTSYS 错误码;到达或超过给定的睡眠时间,则返回 0。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
#define wait_event_interruptible_timeout(wq, condition, timeout)      \
({ \
long __ret = timeout; \
if (!(condition)) \
__wait_event_interruptible_timeout(wq, condition, __ret); \
__ret; \
})

#define __wait_event_interruptible_timeout(wq, condition, ret) \
do { \
DEFINE_WAIT(__wait); \
\
for (;;) { \
prepare_to_wait(&wq, &__wait, TASK_INTERRUPTIBLE); \
if (condition) \
break; \
if (!signal_pending(current)) { \
ret = schedule_timeout(ret); \
if (!ret) \
break; \
continue; \
} \
ret = -ERESTARTSYS; \
break; \
} \
finish_wait(&wq, &__wait); \
} while (0)

wait_event_interruptible_exclusive

同样和 wait_event_interruptible 一样,不过该睡眠的进程是一个互斥进程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
#define wait_event_interruptible_exclusive(wq, condition)               \
({ \
int __ret = 0; \
if (!(condition)) \
__wait_event_interruptible_exclusive(wq, condition, __ret); \
__ret; \
})

#define __wait_event_interruptible_exclusive(wq, condition, ret) \
do { \
DEFINE_WAIT(__wait); \
\
for (;;) { \
prepare_to_wait_exclusive(&wq, &__wait, TASK_INTERRUPTIBLE); \
if (condition) { \
finish_wait(&wq, &__wait); \
break; \
} \
if (!signal_pending(current)) { \
schedule(); \
continue; \
} \
ret = -ERESTARTSYS; \
abort_exclusive_wait(&wq, &__wait, TASK_INTERRUPTIBLE, NULL); \
break; \
} \
} while (0)

void prepare_to_wait_exclusive(wait_queue_head_t* q, wait_queue_t* wait, int state) {
unsigned long flags;

wait->flags |= WQ_FLAG_EXCLUSIVE; // 互斥进程
spin_lock_irqsave(&q->lock, flags);
if (list_empty(&wait->task_list))
__add_wait_queue_tail(q, wait); // 添加到队列尾
set_current_state(state);
spin_unlock_irqrestore(&q->lock, flags);
}
EXPORT_SYMBOL(prepare_to_wait_exclusive);

/**
* abort_exclusive_wait - 终止在队列中的独占等待(abort exclusive waiting in a queue)
* @q: waitqueue waited on
* @wait: wait descriptor
* @mode: runstate of the waiter to be woken
* @key: key to identify a wait bit queue or %NULL
*
* 将当前线程设置回运行状态,并从给定的等待队列中删除等待描述符(如果仍在队列中)。
*
* 如果调用者并发地唤醒该队列,则唤醒队列里的下一个等待者。
*
* This prevents waiter starvation where an exclusive waiter
* aborts and is woken up concurrently and no one wakes up
* the next waiter.
*/
void abort_exclusive_wait(wait_queue_head_t* q, wait_queue_t* wait, unsigned int mode, void* key) {
unsigned long flags;

__set_current_state(TASK_RUNNING);
spin_lock_irqsave(&q->lock, flags);
if (!list_empty(&wait->task_list))
list_del_init(&wait->task_list);
else if (waitqueue_active(q)) // (!list_empty(&q->task_list))
__wake_up_locked_key(q, mode, key);
spin_unlock_irqrestore(&q->lock, flags);
}
EXPORT_SYMBOL(abort_exclusive_wait);

唤醒队列

wake_up

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
#define wake_up(x)      __wake_up(x, TASK_NORMAL, 1, NULL)

#define TASK_NORMAL (TASK_INTERRUPTIBLE | TASK_UNINTERRUPTIBLE)

/**
* __wake_up - wake up threads blocked on a waitqueue.
* @q: the waitqueue
* @mode: which threads
* @nr_exclusive: how many wake-one or wake-many threads to wake up
* @key: is directly passed to the wakeup function
*
* It may be assumed that this function implies a write memory barrier before
* changing the task state if and only if any tasks are woken up.
*/
void __wake_up(wait_queue_head_t *q, unsigned int mode,
int nr_exclusive, void *key)
{
unsigned long flags;

spin_lock_irqsave(&q->lock, flags);
__wake_up_common(q, mode, nr_exclusive, 0, key);
spin_unlock_irqrestore(&q->lock, flags);
}
EXPORT_SYMBOL(__wake_up);

/*
* The core wakeup function. Non-exclusive wakeups (nr_exclusive == 0) just
* wake everything up. If it's an exclusive wakeup (nr_exclusive == small +ve
* number) then we wake all the non-exclusive tasks and one exclusive task.
*
* There are circumstances in which we can try to wake a task which has already
* started to run but is not in state TASK_RUNNING. try_to_wake_up() returns
* zero in this (rare) case, and we handle it by continuing to scan the queue.
*/
static void __wake_up_common(wait_queue_head_t *q, unsigned int mode,
int nr_exclusive, int wake_flags, void *key)
{
wait_queue_t *curr, *next;

list_for_each_entry_safe(curr, next, &q->task_list, task_list) {
unsigned flags = curr->flags;

// 通过 curr->func 获取唤醒回调函数并执行
if (curr->func(curr, mode, wake_flags, key) &&
(flags & WQ_FLAG_EXCLUSIVE) && !--nr_exclusive)
break;
}
}

唤醒等待队列:可唤醒处于 TASK_INTERRUPTIBLE 和 TASK_UNINTERUPTIBLE 状态的进程,和 wait_event/wait_event_timeout 成对使用。

被唤醒的进程,都会检查自己等待的条件是否满足,满足的进程会修改自己的状态为 TASK_RUNNING;如果条件不满足会继续睡眠,等待下次被唤醒(睡眠的进程可能支持可中断,所以并发所有的唤醒都是由类似函数唤醒)。

wake_up_interruptible

1
#define wake_up_interruptible(x)        __wake_up(x, TASK_INTERRUPTIBLE, 1, NULL)

与 wake_up 的唯一区别是它只能唤醒 TASK_INTERRUPTIBLE 状态的进程。与 wait_event_interruptible/wait_event_interruptible_timeout/wait_event_interruptible_exclusive 成对使用。

wake_up_…

1
2
3
4
#define wake_up_all(x)      __wake_up(x, TASK_NORMAL, 0, NULL)  // Non-exclusive wakeups (nr_exclusive == 0) just wake everything up.

#define wake_up_interruptible_nr(x, nr) __wake_up(x, TASK_INTERRUPTIBLE, nr, NULL)
#define wake_up_interruptible_all(x) __wake_up(x, TASK_INTERRUPTIBLE, 0, NULL)

这些也基本都和 wake_up/wake_up_interruptible 一样。

在等待队列上睡眠

sleep_on

该函数的作用是定义一个等待队列(wait),并将 current 进程添加到等待队列中(wait),然后将当前进程的状态置为 TASK_UNINTERRUPTIBLE,并将等待队列(wait)添加到等待队列头(q)中。之后就被挂起直到资源可以获取,才被从等待队列头(q)中唤醒,从等待队列头(q)中移出等待队列(wait)。在被挂起等待资源期间,该进程不能被信号唤醒。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
void __sched sleep_on(wait_queue_head_t* q) {
sleep_on_common(q, TASK_UNINTERRUPTIBLE, MAX_SCHEDULE_TIMEOUT);
}
EXPORT_SYMBOL(sleep_on);

static long __sched sleep_on_common(wait_queue_head_t* q, int state, long timeout) {
unsigned long flags;
wait_queue_t wait;

// wait 变量用于当前线程 current 挂接,并指定唤醒函数
init_waitqueue_entry(&wait, current);

__set_current_state(state);

spin_lock_irqsave(&q->lock, flags);
__add_wait_queue(q, &wait);
spin_unlock(&q->lock);

// 让线程调度并进入睡眠,直到资源可以获取或者超时
timeout = schedule_timeout(timeout);
spin_lock_irq(&q->lock);
__remove_wait_queue(q, &wait);
spin_unlock_irqrestore(&q->lock, flags);

return timeout;
}

// kernel/wait.h
static inline void init_waitqueue_entry(wait_queue_t* q, struct task_struct* p) {
q->flags = 0;
q->private = p;
q->func = default_wake_function;
}

#define __set_current_state(state_value) \
do { \
current->state = (state_value); \
} while (0)

上面,四个自旋锁操作的区别在于中断处理和锁的解锁方式:

  1. spin_lock_irqsave(&q->lock, flags):

    • 获取自旋锁并禁用本地 CPU 中断。
    • 保存中断状态到 flags 中,以便之后恢复,用于确保在锁持有期间,不会因中断处理导致竞态条件。
  2. spin_unlock(&q->lock):

    • 释放自旋锁。
    • 不恢复中断状态,常用于不涉及中断的情况下。
  3. spin_lock_irq(&q->lock):

    • 获取自旋锁并禁用本地 CPU 中断。
    • 不保存中断状态,一般用于不关心中断状态恢复的临界区。
  4. spin_unlock_irqrestore(&q->lock, flags):

    • 释放自旋锁。
    • 恢复之前保存的中断状态,用于结束一段可能影响中断处理的代码块,并恢复到进入时的中断状态。

sleep_on_timeout

与 sleep_on() 函数的区别在于调用该函数时,如果在指定的时间内(timeout)没有获得等待的资源就会返回。实际上是调用 schedule_timeout() 函数实现的。值得注意的是,如果给定的 timeout 小于 0,则不会睡眠。该函数返回的是真正的睡眠时间。

1
2
3
4
long __sched sleep_on_timeout(wait_queue_head_t* q, long timeout) {
return sleep_on_common(q, TASK_UNINTERRUPTIBLE, timeout);
}
EXPORT_SYMBOL(sleep_on_timeout);

interruptible_sleep_on

该函数和 sleep_on() 函数唯一的区别是:将当前进程的状态置为 TASK_INTERRUPTINLE,这意味在睡眠过程中,如果该进程收到信号则会被唤醒。

1
2
3
4
void __sched interruptible_sleep_on(wait_queue_head_t* q) {
sleep_on_common(q, TASK_INTERRUPTIBLE, MAX_SCHEDULE_TIMEOUT);
}
EXPORT_SYMBOL(interruptible_sleep_on);

interruptible_sleep_on_timeout

类似于 sleep_on_timeout() 函数。进程在睡眠中可能在等待的时间没有到达就被信号打断而被唤醒,也可能是等待的时间到达而被唤醒。

1
2
3
4
long __sched interruptible_sleep_on_timeout(wait_queue_head_t* q, long timeout) {
return sleep_on_common(q, TASK_INTERRUPTIBLE, timeout);
}
EXPORT_SYMBOL(interruptible_sleep_on_timeout);

以上四个函数都是让进程在等待队列上睡眠,不过是小有差异而已。在实际用的过程中,根据需要选择合适的函数使用就是了。例如,

  • 在对软驱数据的读写中,如果设备没有就绪,则调用 sleep_on() 函数睡眠直到数据可读(可写);
  • 在打开串口的时候,如果串口端口处于关闭状态,则调用 interruptible_sleep_on() 函数尝试等待其打开;
  • 在声卡驱动中,读取声音数据时,如果没有数据可读,就会等待足够常的时间直到可读取。

字符设备驱动示例

要在 Linux 内核中使用 wait_queue_head_t 来实现进程的等待和唤醒,我们可以编写一个简单的字符设备驱动示例。该示例将展示如何让一个进程进入睡眠状态,直到另一个进程或事件唤醒它。

内核代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
#include <linux/fs.h>
#include <linux/init.h>
#include <linux/module.h>
#include <linux/sched.h>
#include <linux/uaccess.h>
#include <linux/wait.h>

#define DEVICE_NAME "wait_queue_example"

static int major_num;
static DECLARE_WAIT_QUEUE_HEAD(wq);
static int condition = 0;

static ssize_t device_read(struct file* filp, char* buffer, size_t length, loff_t* offset) {
printk("Process %d (%s) going to sleep\n", current->pid, current->comm);

// 让进程进入睡眠,并在条件满足(等待数据到达)时被唤醒
wait_event_interruptible(wq, condition != 0); // 该函数将进程设置为非互斥的 TASK_INTERRUPTIBLE 状态的进程

condition = 0; // Reset the condition

printk("Process %d (%s) woken up\n", current->pid, current->comm);

return 0;
}

static ssize_t device_write(struct file* filp, const char* buffer, size_t length, loff_t* offset) {
printk("Process %d (%s) waking up the readers\n", current->pid, current->comm);

condition = 1;

// 唤醒等待在条件上的所有进程,通知已经有数据可以读取
wake_up_interruptible(&wq);

return length;
}

static struct file_operations fops = {
.read = device_read,
.write = device_write,
};

static int __init wait_queue_example_init(void) {
major_num = register_chrdev(0, DEVICE_NAME, &fops);

if (major_num < 0) {
printk("Registering char device failed with %d\n", major_num);
return major_num;
}

printk("I was assigned major number %d.", major_num);
printk("To talk to the driver, create a dev file with 'mknod /dev/%s c %d 0'.\n", DEVICE_NAME, major_num);
printk("Try various minor numbers. Try to cat and echo to the device file.\n");

return 0;
}

static void __exit wait_queue_example_exit(void) {
unregister_chrdev(major_num, DEVICE_NAME);
printk("Goodbye, world!\n");
}

module_init(wait_queue_example_init);
module_exit(wait_queue_example_exit);

MODULE_LICENSE("GPL");
MODULE_AUTHOR("ahaaa");
MODULE_DESCRIPTION("A simple Linux driver with wait queues");

编译模块

使用 make 编译模块,在包含源代码和 Makefile 的目录中运行 make 命令,这会生成一个.ko(内核模块)文件,比如 wait_queue_example.ko。

1
2
3
4
5
6
7
obj-m += wait_queue_example.o

all:
make -C /lib/modules/$(shell uname -r)/build M=$(PWD) modules

clean:
make -C /lib/modules/$(shell uname -r)/build M=$(PWD) clean

加载模块

使用 insmod 命令加载模块:sudo insmod wait_queue_example.ko。如果一切顺利,你将看不到任何输出,因为 printk 函数不会输出到控制台,而是输出到内核日志。

查看模块是否成功加载 / 卸载:lsmod | grep wait_queue_example

查看内核日志,以确保模块正常启动:dmesg | tail

使用 rmmod 命令卸载模块:sudo rmmod wait_queue_example

创建设备文件

查找 wait_queue_example 对应的主设备号:cat /proc/devices

使用 mknod 创建设备文件,假设主设备号是 major_num:sudo mknod /dev/wait_queue_example c <major_num> 0

设置设备文件的权限,使其可读写:sudo chmod 666 /dev/wait_queue_example

运行进程

在一个终端中运行cat /dev/wait_queue_example,此进程将进入等待队列进行睡眠。

在另一个终端中使用echo "wake" > /dev/wait_queue_example,这将唤醒等待的进程。

再次通过 dmesg | tail 查看内核日志,可以看到:

1
2
3
[28154.120944] Process 9571 (cat) going to sleep
[28200.935241] Process 9603 (bash) waking up the readers
[28200.935265] Process 9571 (cat) woken up

这两个进程,能共享 condition 这个变量吗?一个进行修改了这个变量,另一个进程可以读到修改后的变量吗?

是的,这两个进程可以共享和访问 condition 变量,因为 它们在同一个内核模块中运行,并共享相同的内存地址空间(同一块内存)。因此,一个进程对 condition 的更改对其它进程立即可见。

参考资料:

  1. https://shunlqing.github.io/2018/05/19/2018_5_19LinuxKernel_WaitQueue/
  2. https://cslqm.github.io/2020/01/08/wait_queue_head_t/
  3. https://juejin.cn/post/7083852861930471432