Linux 内核的等待队列是以双循环链表为基础数据结构,与进程调度机制紧密结合,能够用于实现核心的异步事件通知机制、可以使用等待队列实现阻塞进程的唤醒。等待队列在 Linux 内核中有着举足轻重的作用,很多 Linux 驱动都或多或少涉及到了等待队列。因此,对于 Linux 内核及驱动开发者来说,掌握等待队列是必修课之一。
在介绍等待队列机制的技术实现之前,先从功能角度谈下等待队列在内核中的角色。
你理解阻塞吗
写程序的时候,我们常常说某个系统调用是阻塞调用。
从用户层的角度,基本理解是:进程在执行某个系统调用的时候,因为需要的资源不满足(I/O 操作,加锁等),导致进程“停”在那里,等到资源就绪了或者设置的 timeout 时间超时了,进程得以继续执行。
从内核的角度,面对用户层对阻塞调用的需求,需要实现哪些机制呢?
首先,进程陷入内核,内核发现进程所要求的资源暂时无法满足,需要将其设置为睡眠状态,然后调度其它进程执行。
其次,等到资源就绪时,我们需要唤醒等待在该资源上的进程。
这里存在两个问题:2)内核是怎么知道资源就绪的 ?以及,3) 某个资源就绪了,内核怎么找到对应的等待进程,并将它唤醒的 ?
一个场景:我们希望对某个 socket fd 进行阻塞 write 操作。发起写操作的时候,陷入内核,内核发现该 socket fd 的写缓冲区是满的(即需要的资源不满足),暂时不能写。这时,内核会将进程设置为睡眠状态。转而调用执行其它运行态的进程。等到该写缓冲区可以写的时候(即需要的资源满足了),内核将进程设置为运行状态,然后执行写操作,拷贝数据到内核写缓冲区。执行完,切换回用户态。
1)内核如何将一个进程睡眠的 ?
在 Linux 中每一个进程都由 task_struct
数据结构来定义。它有一个 .state
状态成员,将其设置为睡眠(TASK_INTERRUPTIBLE 或 TASK_UNINTERRUPTIBLE),并将表示该进程的 task_struct
结构从就绪队列中移走(转移到等待队列中),内核就不会调度其执行,也就相当于睡眠了。
2)内核是怎么知道资源就绪的 ?
中断机制:内核的所有工作都是由中断驱动的。不管是系统调用陷入内核,还是调度,还是其它的内核活动,都是由各种各样的中断来触发执行的。对于设备 I/O,如果设备空闲了,会触发一个外部中断,该中断触发内核执行中断处理程序,通知等待进程、执行回调等等。
资源可用 -> 中断触发 -> 内核接收到中断信号 -> 中断处理程序(更新某种状态标记以表示资源可用 ) -> 通知唤醒等待该资源的等待进程,重新进入调度队列(到这里中断处理结束) -> 被调度时,执行资源处理(这通常是在进程的上下文中进行,而不是在中断上下文中)。
3)某个资源就绪了,内核怎么找到对应的等待进程,并将它唤醒的 ?
等待队列:我们将一个资源和一个等待队列关联起来。如果进程所请求的资源还未就绪,就先加入到该资源的等待队列中。等到资源就绪了,就唤醒等待队列中的进程,加入到调度。
怎么知道进程等待的资源是否就绪了呢(资源就绪后会发生什么变化)?在等待队列中,“资源”是一个通过条件变量 condition 模拟的抽象,当资源满足时,会将 condition 修改为真,从而 wake_up 等待队列中的进程。
等待队列与惊群
“惊群”的基本行为是 :有多个进程 / 线程等待在同一个资源上,而该资源一次只能有一个进程处理。比如文件描述符的写操作、accept 一个新连接等。那么,在资源就绪的时候,如果内核采取的策略是唤醒所有的进程,这样,只有一个进程获取了该资源,其它进程发现没有资源就绪,继续进入睡眠(所谓虚假唤醒)。这样的行为浪费了系统的 CPU 资源。
那是不是,内核在资源就绪的时候,就唤醒一个进程不就得了。其实也不是,因为不是所有资源都是互斥的。比如某个文件的读操作。
那么,惊群问题怎么解决 ?
在用户态,可以有不同的解决方式。或者忽略惊群所带来的开销,或者使用锁方式保证一次只有一个进程来阻塞在一个资源上。
而对于内核来说,在等待队列上增加了一个是否“互斥等待”的标志。即如果是互斥等待的,一次唤醒一个进程;如果不是互斥等待的,一次唤醒所有进程。
互斥等待的经典例子:accept。因为我们很明确知道,对一个 listen fd 的 accept,肯定是一次只有一个进程可以处理。那么,我们在 listen fd 上的等待队列,就毫无疑问可以设置为“互斥等待”。所以,现今版本的 linux 内核,解决了 accept 的惊群问题。
但是像 epoll_wait 的惊群问题,就无法从等待队列的互斥等待来解决。首先,epoll fd 上也有一个等待队列,代表 epoll fd 所监听的其他若干文件描述符(资源)就绪时,唤醒等待队列上的进程。因为我们无法确定,进程对这些资源是不是都是互斥访问的,还是都不是。所以,只好唤醒所有进程。更多的惊群问题,可以查阅相关资料。
等待队列
作用
Linux 内核的 等待队列是以双循环链表为基础数据结构 ,与进程调度机制紧密结合,能够用于实现核心的异步事件通知机制、可以使用等待队列实现阻塞进程的唤醒。
它有两种数据结构:等待队列头(wait_queue_head_t
)和等待队列项(wait_queue_t
)。等待队列头和等待队列中都包含一个 list_head
类型的域作为“连接件”。它通过一个双链表把等待 task 的头(wait_queue_head_t
类型)和等待的进程列表(wait_queue_t
类型)链接起来。
等待队列的每个节点都代表一个进程 task_struct
的封装,它通过等待队列项的 .private = current
来“绑定”进程 ,具体看后面的代码实现。
结构组成
(1)等待队列头
1 2 3 4 5 6 7 8 9 10 struct __wait_queue_head { spinlock_t lock; struct list_head task_list ; }; struct list_head { struct list_head *next , *prev ; }; typedef struct __wait_queue_head wait_queue_head_t ;
成员
描述
lock
自旋锁,在对 task_list 操作的过程中,使用该锁实现对等待队列(如添加、删除操作)的互斥访问
task_list
双向循环链表,存放等待的进程
(2)等待队列
1 2 3 4 5 6 7 8 9 10 11 struct __wait_queue { unsigned int flags; #define WQ_FLAG_EXCLUSIVE 0x01 void * private; wait_queue_func_t func; struct list_head task_list ; }; typedef int (*wait_queue_func_t ) (wait_queue_t * wait, unsigned mode, int flags, void * key) ;typedef struct __wait_queue wait_queue_t ;
(3)等待队列头(wait_queue_head_t
)和等待队列(wait_queue_t
)的区别是等待队列是等待队列头的成员。也就是说等待队列头的 .task_list
域链接的成员就是等待队列类型的(wait_queue_t
)。
通过 ->task_list->next
得到的是下一个等待项的 task_list
成员的地址。那么,如何找到 wait_queue_t
结构的首地址呢?如果是通过减法操作,不会受字节对齐影响吗?不会的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 #define container_of(ptr, type, member) \ ({ \ const typeof(((type*)0)->member)* __mptr = (ptr); \ (type*)((char*)__mptr - offsetof(type, member)); \ }) #define offsetof(TYPE, MEMBER) ((size_t) &((TYPE *)0)->MEMBER)
操作 / 常用函数接口
Linux-2.6 提供如下关于等待队列的操作:
(1)定义等待队列头
1 wait_queue_head_t my_queue;
(2)初始化等待队列头
1 2 init_waitqueue_head(&my_queue); DECLARE_WAIT_QUEUE_HEAD(my_queue);
(3)定义等待队列
1 DECLARE_WAITQUEUE(name, tsk);
(4)添加 / 移除等待队列
1 2 3 4 5 void fastcall add_wait_queue (wait_queue_head_t * q, wait_queue_t * wait) ;void fastcall remove_wait_queue (wait_queue_head_t * q, wait_queue_t * wait) ;
(5)等待事件
1 2 3 4 5 6 7 8 9 wait_event(queue , condition); wait_event_interruptible(queue , condition); wait_event_timeout(queue , condition, timeout); wait_event_interruptible_timeout(queue , condition, timeout);
(6)唤醒队列
1 2 3 4 5 6 7 8 9 10 11 void wake_up (wait_queue_head_t * queue ) ;void wake_up_interruptible (wait_queue_head_t * queue ) ;
(7)在等待队列上睡眠。sleep_on 无需条件的睡眠,可能导致竞态,3.15 版本后废弃,采用 wait_event 代替 sleep_on。
1 2 3 4 5 6 7 8 9 sleep_on(wait_queue_head_t * q); interruptible_sleep_on(wait_queue_head_t * q);
接口实现
初始化
(1)直接定义后初始化。init_waitqueue_head()函数会将自旋锁初始化为未锁,等待队列初始化为空的双向循环链表。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 wait_queue_head_t my_queue;init_waitqueue_head(&my_queue); #define init_waitqueue_head(q) \ do { \ static struct lock_class_key __key; \ \ __init_waitqueue_head((q), &__key); \ } while (0) void __init_waitqueue_head(wait_queue_head_t * q, struct lock_class_key* key) { spin_lock_init(&q->lock); lockdep_set_class(&q->lock, key); INIT_LIST_HEAD(&q->task_list); } static inline void INIT_LIST_HEAD (struct list_head* list ) { list ->next = list ; list ->prev = list ; }
(2)定义并初始化的快捷方式,等价于(1)。
1 2 3 4 5 6 7 8 DECLARE_WAIT_QUEUE_HEAD(my_queue); #define DECLARE_WAIT_QUEUE_HEAD(name) \ wait_queue_head_t name = __WAIT_QUEUE_HEAD_INITIALIZER(name) #define __WAIT_QUEUE_HEAD_INITIALIZER(name) { \ .lock = __SPIN_LOCK_UNLOCKED(name.lock), \ .task_list = { &(name).task_list, &(name).task_list } }
(3)定义等待队列,注意此处是定义一个 wait_queue_t
类型的变量 name,并将其 private 与设置为 tsk。
1 2 3 4 5 6 7 8 9 DECLARE_WAITQUEUE(name, tsk); #define DECLARE_WAITQUEUE(name, tsk) \ wait_queue_t name = __WAITQUEUE_INITIALIZER(name, tsk) #define __WAITQUEUE_INITIALIZER(name, tsk) { \ .private = tsk, \ .func = default_wake_function, \ .task_list = { NULL, NULL } }
添加/移出等待队列
(1)向等待队列头中添加等待队列,设置等待的进程为非互斥进程,并将其添加进等待队列头的 队头中(而非队尾) 。
1 2 3 4 5 6 7 8 9 10 11 12 13 void add_wait_queue (wait_queue_head_t * q, wait_queue_t * wait) { unsigned long flags; wait->flags &= ~WQ_FLAG_EXCLUSIVE; spin_lock_irqsave(&q->lock, flags); __add_wait_queue(q, wait); spin_unlock_irqrestore(&q->lock, flags); } EXPORT_SYMBOL(add_wait_queue); static inline void __add_wait_queue(wait_queue_head_t * head, wait_queue_t * new) { list_add(&new->task_list, &head->task_list); }
(2)向等待队列头中添加等待队列,设置等待的进程为互斥进程,并将其添加进等待队列头的 队尾中(而非队头) 。
1 2 3 4 5 6 7 8 9 10 11 12 13 void add_wait_queue_exclusive (wait_queue_head_t * q, wait_queue_t * wait) { unsigned long flags; wait->flags |= WQ_FLAG_EXCLUSIVE; spin_lock_irqsave(&q->lock, flags); __add_wait_queue_tail(q, wait); spin_unlock_irqrestore(&q->lock, flags); } EXPORT_SYMBOL(add_wait_queue_exclusive); static inline void __add_wait_queue_tail(wait_queue_head_t * head, wait_queue_t * new) { list_add_tail(&new->task_list, &head->task_list); }
该函数和 add_wait_queue()函数功能基本一样,只不过它是将等待的进程设置为互斥进程。
(3)在等待的资源或事件满足时,进程被唤醒,使用该函数从等待头指向的等待队列中删除一项。
1 2 3 4 5 6 7 8 9 10 11 12 void remove_wait_queue (wait_queue_head_t * q, wait_queue_t * wait) { unsigned long flags; spin_lock_irqsave(&q->lock, flags); __remove_wait_queue(q, wait); spin_unlock_irqrestore(&q->lock, flags); } EXPORT_SYMBOL(remove_wait_queue); static inline void __remove_wait_queue(wait_queue_head_t * head, wait_queue_t * old) { list_del(&old->task_list); }
等待事件
wait_event
wait_event 宏使进程被置于深睡眠状态(不可被信号中断)进行等待,直到 @condition 条件满足。等待队列头 @wq 在每次被 wakeup 时都会检查 @condition 条件,如果不满足则继续睡眠等待 。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 #define wait_event(wq, condition) \ do { \ if (condition) \ break; \ __wait_event(wq, condition); \ } while (0) #define __wait_event(wq, condition) \ do { \ DEFINE_WAIT(__wait); \ \ for (;;) { \ prepare_to_wait(&wq, &__wait, TASK_UNINTERRUPTIBLE); \ if (condition) \ break; \ schedule(); \ } \ finish_wait(&wq, &__wait); \ } while (0) #define DEFINE_WAIT_FUNC(name, function) \ wait_queue_t name = { \ .private = current, \ .func = function, \ .task_list = LIST_HEAD_INIT((name).task_list), \ } #define DEFINE_WAIT(name) DEFINE_WAIT_FUNC(name, autoremove_wake_function) #define get_current() (current_thread_info()->task) #define current get_current() void prepare_to_wait (wait_queue_head_t * q, wait_queue_t * wait, int state) { unsigned long flags; wait->flags &= ~WQ_FLAG_EXCLUSIVE; spin_lock_irqsave(&q->lock, flags); if (list_empty(&wait->task_list)) __add_wait_queue(q, wait); set_current_state(state); spin_unlock_irqrestore(&q->lock, flags); } EXPORT_SYMBOL(prepare_to_wait); void finish_wait (wait_queue_head_t * q, wait_queue_t * wait) { unsigned long flags; __set_current_state(TASK_RUNNING); if (!list_empty_careful(&wait->task_list)) { spin_lock_irqsave(&q->lock, flags); list_del_init(&wait->task_list); spin_unlock_irqrestore(&q->lock, flags); } } EXPORT_SYMBOL(finish_wait); #define set_current_state(state_value) set_mb(current->state, (state_value)) #define set_mb(var, value) \ do { \ var = value; \ barrier(); \ } while (0)
有一个疑问,这里 DEFINE_WAIT(__wait)
定义一个 局部 等待队列项__wait
,其用意在哪?
管理当前等待状态 :使用 __wait
来表示当前进程(.private = current
)的等待状态(set_current_state(state)
),用来跟踪哪个线程(current
)在等待哪个条件(condition
)。
自动清理 :使用 autoremove_wake_function
回调函数,__wait
可以在条件满足时自动移除。
另一个疑问,在 set_current_state(state)
设置进程状态时,为什么不是修改 (&__wait)->private->state
,而是再次获取current->state
来修改?它俩会不会已经不是同一个进程了呢?
单线程上下文 :wait_event
和 __wait_event
宏都 运行在调用进程的上下文中,不会被其它进程异步更改 。因此,在循环等待或状态设置之前和之后,current
不会改变。
调度保障 :在进入不可中断状态后,进程会在调度器 schedule()
中等待条件发生改变并被唤醒。在这一过程中,进程上下文保持一致。
所以,正常情况下 current
和 .private
所指的进程将保持一致。
wait_event_interruptible
wait_event_interruptible 与 wait_event 宏的不同之处是:该宏使进程被置于浅睡眠状态(可被信号中断)进行等待。在每次被唤醒的时候,首先检查 condition 是否为真,如果为真则返回 0;否则,检查进程是被信号唤醒,是则返回 -ERESTARTSYS 错误码。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 #define wait_event_interruptible(wq, condition) \ ({ \ int __ret = 0; \ if (!(condition)) \ __wait_event_interruptible(wq, condition, __ret); \ __ret; \ }) #define __wait_event_interruptible(wq, condition, ret) \ do { \ DEFINE_WAIT(__wait); \ \ for (;;) { \ prepare_to_wait(&wq, &__wait, TASK_INTERRUPTIBLE); \ if (condition) \ break; \ if (!signal_pending(current)) { \ schedule(); \ continue; \ } \ ret = -ERESTARTSYS; \ break; \ } \ finish_wait(&wq, &__wait); \ } while (0)
wait_event_timeout
wait_event_timeout 也与 wait_event 类似,均使进程被置于深睡眠状态(不可被中断)进行等待。不过如果所给的睡眠时间为负数则立即返回;如果在睡眠期间被唤醒,且 condition 为真则返回剩余的睡眠时间,否则继续睡眠直到到达或超过给定的睡眠时间,然后返回 0。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 #define wait_event_timeout(wq, condition, timeout) \ ({ \ long __ret = timeout; \ if (!(condition)) \ __wait_event_timeout(wq, condition, __ret); \ __ret; \ }) #define __wait_event_timeout(wq, condition, ret) \ do { \ DEFINE_WAIT(__wait); \ \ for (;;) { \ prepare_to_wait(&wq, &__wait, TASK_UNINTERRUPTIBLE); \ if (condition) \ break; \ ret = schedule_timeout(ret); \ if (!ret) \ break; \ } \ finish_wait(&wq, &__wait); \ } while (0)
wait_event_interruptible_timeout
wait_event_interruptible_timeout 是 interruptible 和 timeout 的结合,可使进程被置于浅睡眠状态(可被信号中断)进行等待。如果在睡眠期间被信号打断,则返回 -ERESTARTSYS 错误码;到达或超过给定的睡眠时间,则返回 0。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 #define wait_event_interruptible_timeout(wq, condition, timeout) \ ({ \ long __ret = timeout; \ if (!(condition)) \ __wait_event_interruptible_timeout(wq, condition, __ret); \ __ret; \ }) #define __wait_event_interruptible_timeout(wq, condition, ret) \ do { \ DEFINE_WAIT(__wait); \ \ for (;;) { \ prepare_to_wait(&wq, &__wait, TASK_INTERRUPTIBLE); \ if (condition) \ break; \ if (!signal_pending(current)) { \ ret = schedule_timeout(ret); \ if (!ret) \ break; \ continue; \ } \ ret = -ERESTARTSYS; \ break; \ } \ finish_wait(&wq, &__wait); \ } while (0)
wait_event_interruptible_exclusive
同样和 wait_event_interruptible 一样,不过该睡眠的进程是一个互斥进程。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 #define wait_event_interruptible_exclusive(wq, condition) \ ({ \ int __ret = 0; \ if (!(condition)) \ __wait_event_interruptible_exclusive(wq, condition, __ret); \ __ret; \ }) #define __wait_event_interruptible_exclusive(wq, condition, ret) \ do { \ DEFINE_WAIT(__wait); \ \ for (;;) { \ prepare_to_wait_exclusive(&wq, &__wait, TASK_INTERRUPTIBLE); \ if (condition) { \ finish_wait(&wq, &__wait); \ break; \ } \ if (!signal_pending(current)) { \ schedule(); \ continue; \ } \ ret = -ERESTARTSYS; \ abort_exclusive_wait(&wq, &__wait, TASK_INTERRUPTIBLE, NULL); \ break; \ } \ } while (0) void prepare_to_wait_exclusive (wait_queue_head_t * q, wait_queue_t * wait, int state) { unsigned long flags; wait->flags |= WQ_FLAG_EXCLUSIVE; spin_lock_irqsave(&q->lock, flags); if (list_empty(&wait->task_list)) __add_wait_queue_tail(q, wait); set_current_state(state); spin_unlock_irqrestore(&q->lock, flags); } EXPORT_SYMBOL(prepare_to_wait_exclusive); void abort_exclusive_wait (wait_queue_head_t * q, wait_queue_t * wait, unsigned int mode, void * key) { unsigned long flags; __set_current_state(TASK_RUNNING); spin_lock_irqsave(&q->lock, flags); if (!list_empty(&wait->task_list)) list_del_init(&wait->task_list); else if (waitqueue_active(q)) __wake_up_locked_key(q, mode, key); spin_unlock_irqrestore(&q->lock, flags); } EXPORT_SYMBOL(abort_exclusive_wait);
唤醒队列
wake_up
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 #define wake_up(x) __wake_up(x, TASK_NORMAL, 1, NULL) #define TASK_NORMAL (TASK_INTERRUPTIBLE | TASK_UNINTERRUPTIBLE) void __wake_up(wait_queue_head_t *q, unsigned int mode, int nr_exclusive, void *key) { unsigned long flags; spin_lock_irqsave(&q->lock, flags); __wake_up_common(q, mode, nr_exclusive, 0 , key); spin_unlock_irqrestore(&q->lock, flags); } EXPORT_SYMBOL(__wake_up); static void __wake_up_common(wait_queue_head_t *q, unsigned int mode, int nr_exclusive, int wake_flags, void *key) { wait_queue_t *curr, *next; list_for_each_entry_safe(curr, next, &q->task_list, task_list) { unsigned flags = curr->flags; if (curr->func(curr, mode, wake_flags, key) && (flags & WQ_FLAG_EXCLUSIVE) && !--nr_exclusive) break ; } }
唤醒等待队列:可唤醒处于 TASK_INTERRUPTIBLE 和 TASK_UNINTERUPTIBLE 状态的进程,和 wait_event/wait_event_timeout 成对使用。
被唤醒的进程,都会检查自己等待的条件是否满足,满足的进程会修改自己的状态为 TASK_RUNNING;如果条件不满足会继续睡眠,等待下次被唤醒(睡眠的进程可能支持可中断,所以并发所有的唤醒都是由类似函数唤醒)。
wake_up_interruptible
1 #define wake_up_interruptible(x) __wake_up(x, TASK_INTERRUPTIBLE, 1, NULL)
与 wake_up 的唯一区别是它只能唤醒 TASK_INTERRUPTIBLE 状态的进程。与 wait_event_interruptible/wait_event_interruptible_timeout/wait_event_interruptible_exclusive 成对使用。
wake_up_…
1 2 3 4 #define wake_up_all(x) __wake_up(x, TASK_NORMAL, 0, NULL) #define wake_up_interruptible_nr(x, nr) __wake_up(x, TASK_INTERRUPTIBLE, nr, NULL) #define wake_up_interruptible_all(x) __wake_up(x, TASK_INTERRUPTIBLE, 0, NULL)
这些也基本都和 wake_up/wake_up_interruptible 一样。
在等待队列上睡眠
sleep_on
该函数的作用是定义一个等待队列(wait),并将 current 进程添加到等待队列中(wait),然后将当前进程的状态置为 TASK_UNINTERRUPTIBLE,并将等待队列(wait)添加到等待队列头(q)中。之后就被挂起直到资源可以获取,才被从等待队列头(q)中唤醒,从等待队列头(q)中移出等待队列(wait)。在被挂起等待资源期间,该进程不能被信号唤醒。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 void __sched sleep_on (wait_queue_head_t * q) { sleep_on_common(q, TASK_UNINTERRUPTIBLE, MAX_SCHEDULE_TIMEOUT); } EXPORT_SYMBOL(sleep_on); static long __sched sleep_on_common (wait_queue_head_t * q, int state, long timeout) { unsigned long flags; wait_queue_t wait; init_waitqueue_entry(&wait, current); __set_current_state(state); spin_lock_irqsave(&q->lock, flags); __add_wait_queue(q, &wait); spin_unlock(&q->lock); timeout = schedule_timeout(timeout); spin_lock_irq(&q->lock); __remove_wait_queue(q, &wait); spin_unlock_irqrestore(&q->lock, flags); return timeout; } static inline void init_waitqueue_entry (wait_queue_t * q, struct task_struct* p) { q->flags = 0 ; q->private = p; q->func = default_wake_function; } #define __set_current_state(state_value) \ do { \ current->state = (state_value); \ } while (0)
上面,四个自旋锁操作的区别在于中断处理和锁的解锁方式:
spin_lock_irqsave(&q->lock, flags)
:
获取自旋锁并禁用本地 CPU 中断。
保存中断状态到 flags
中,以便之后恢复 ,用于确保在锁持有期间,不会因中断处理导致竞态条件。
spin_unlock(&q->lock)
:
释放自旋锁。
不恢复中断状态 ,常用于不涉及中断的情况下。
spin_lock_irq(&q->lock)
:
获取自旋锁并禁用本地 CPU 中断。
不保存中断状态 ,一般用于不关心中断状态恢复的临界区。
spin_unlock_irqrestore(&q->lock, flags)
:
释放自旋锁。
恢复之前保存的中断状态 ,用于结束一段可能影响中断处理的代码块,并恢复到进入时的中断状态。
sleep_on_timeout
与 sleep_on() 函数的区别在于调用该函数时,如果在指定的时间内(timeout)没有获得等待的资源就会返回。实际上是调用 schedule_timeout() 函数实现的。值得注意的是,如果给定的 timeout 小于 0,则不会睡眠。该函数返回的是真正的睡眠时间。
1 2 3 4 long __sched sleep_on_timeout (wait_queue_head_t * q, long timeout) { return sleep_on_common(q, TASK_UNINTERRUPTIBLE, timeout); } EXPORT_SYMBOL(sleep_on_timeout);
interruptible_sleep_on
该函数和 sleep_on() 函数唯一的区别是:将当前进程的状态置为 TASK_INTERRUPTINLE,这意味在睡眠过程中,如果该进程收到信号则会被唤醒。
1 2 3 4 void __sched interruptible_sleep_on (wait_queue_head_t * q) { sleep_on_common(q, TASK_INTERRUPTIBLE, MAX_SCHEDULE_TIMEOUT); } EXPORT_SYMBOL(interruptible_sleep_on);
interruptible_sleep_on_timeout
类似于 sleep_on_timeout() 函数。进程在睡眠中可能在等待的时间没有到达就被信号打断而被唤醒,也可能是等待的时间到达而被唤醒。
1 2 3 4 long __sched interruptible_sleep_on_timeout (wait_queue_head_t * q, long timeout) { return sleep_on_common(q, TASK_INTERRUPTIBLE, timeout); } EXPORT_SYMBOL(interruptible_sleep_on_timeout);
以上四个函数都是让进程在等待队列上睡眠,不过是小有差异而已。在实际用的过程中,根据需要选择合适的函数使用就是了。例如,
在对软驱数据的读写中,如果设备没有就绪,则调用 sleep_on() 函数睡眠直到数据可读(可写);
在打开串口的时候,如果串口端口处于关闭状态,则调用 interruptible_sleep_on() 函数尝试等待其打开;
在声卡驱动中,读取声音数据时,如果没有数据可读,就会等待足够常的时间直到可读取。
字符设备驱动示例
要在 Linux 内核中使用 wait_queue_head_t
来实现进程的等待和唤醒,我们可以编写一个简单的字符设备驱动示例。该示例将展示如何让一个进程进入睡眠状态,直到另一个进程或事件唤醒它。
内核代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 #include <linux/fs.h> #include <linux/init.h> #include <linux/module.h> #include <linux/sched.h> #include <linux/uaccess.h> #include <linux/wait.h> #define DEVICE_NAME "wait_queue_example" static int major_num;static DECLARE_WAIT_QUEUE_HEAD (wq) ;static int condition = 0 ;static ssize_t device_read (struct file* filp, char * buffer, size_t length, loff_t * offset) { printk("Process %d (%s) going to sleep\n" , current->pid, current->comm); wait_event_interruptible(wq, condition != 0 ); condition = 0 ; printk("Process %d (%s) woken up\n" , current->pid, current->comm); return 0 ; } static ssize_t device_write (struct file* filp, const char * buffer, size_t length, loff_t * offset) { printk("Process %d (%s) waking up the readers\n" , current->pid, current->comm); condition = 1 ; wake_up_interruptible(&wq); return length; } static struct file_operations fops = { .read = device_read, .write = device_write, }; static int __init wait_queue_example_init (void ) { major_num = register_chrdev(0 , DEVICE_NAME, &fops); if (major_num < 0 ) { printk("Registering char device failed with %d\n" , major_num); return major_num; } printk("I was assigned major number %d." , major_num); printk("To talk to the driver, create a dev file with 'mknod /dev/%s c %d 0'.\n" , DEVICE_NAME, major_num); printk("Try various minor numbers. Try to cat and echo to the device file.\n" ); return 0 ; } static void __exit wait_queue_example_exit (void ) { unregister_chrdev(major_num, DEVICE_NAME); printk("Goodbye, world!\n" ); } module_init(wait_queue_example_init); module_exit(wait_queue_example_exit); MODULE_LICENSE("GPL" ); MODULE_AUTHOR("ahaaa" ); MODULE_DESCRIPTION("A simple Linux driver with wait queues" );
编译模块
使用 make 编译模块,在包含源代码和 Makefile 的目录中运行 make 命令,这会生成一个.ko(内核模块)文件,比如 wait_queue_example.ko。
1 2 3 4 5 6 7 obj-m += wait_queue_example.o all: make -C /lib/modules/$(shell uname -r) /build M=$(PWD) modules clean: make -C /lib/modules/$(shell uname -r) /build M=$(PWD) clean
加载模块
使用 insmod 命令加载模块:sudo insmod wait_queue_example.ko
。如果一切顺利,你将看不到任何输出,因为 printk
函数不会输出到控制台,而是输出到内核日志。
查看模块是否成功加载 / 卸载:lsmod | grep wait_queue_example
查看内核日志,以确保模块正常启动:dmesg | tail
使用 rmmod 命令卸载模块:sudo rmmod wait_queue_example
创建设备文件
查找 wait_queue_example 对应的主设备号:cat /proc/devices
使用 mknod 创建设备文件,假设主设备号是 major_num:sudo mknod /dev/wait_queue_example c <major_num> 0
设置设备文件的权限,使其可读写:sudo chmod 666 /dev/wait_queue_example
运行进程
在一个终端中运行cat /dev/wait_queue_example
,此进程将进入等待队列进行睡眠。
在另一个终端中使用echo "wake" > /dev/wait_queue_example
,这将唤醒等待的进程。
再次通过 dmesg | tail
查看内核日志,可以看到:
1 2 3 [28154.120944] Process 9571 (cat) going to sleep [28200.935241] Process 9603 (bash) waking up the readers [28200.935265] Process 9571 (cat) woken up
这两个进程,能共享 condition 这个变量吗?一个进行修改了这个变量,另一个进程可以读到修改后的变量吗?
是的,这两个进程可以共享和访问 condition 变量,因为 它们在同一个内核模块中运行,并共享相同的内存地址空间(同一块内存) 。因此,一个进程对 condition 的更改对其它进程立即可见。
参考资料:
https://shunlqing.github.io/2018/05/19/2018_5_19LinuxKernel_WaitQueue/
https://cslqm.github.io/2020/01/08/wait_queue_head_t/
https://juejin.cn/post/7083852861930471432