Linux 内核 kfifo 环形队列。

前置知识

2 的幂次方数

在二进制中,2 的幂次方很容易表示:一个数只有最高 bit 位是 1,其余全为 0。

如何判断某个数是否是 2 的幂次方,如果不是,则更新该数为大于该数的最小的 2 的幂次方的数

  • 判断一个数是不是 2 的幂次方:判断 n & (n - 1) 是否非零,非零则不是 2 的幂次方。如 3 不是(0b11 & 0b10)、4 是(0b100 & 0b011)。
  • 大于等于某数的最小的 2 的幂次方的数1UL << fls(n - 1)

fls(x) 即找到 x 的二进制表示中最高置 1 位的索引(索引从 1 开始),如 fls(0b101) = 3,则 1UL << fls(6 - 1) = 0b1000。fls 函数如何实现?

fls 即 find the last bit set,查找最后一个置 1 的 bit 位。32 位数则从 bit31 开始找,64 位数则从 bit63 开始找。检查完 bit0 后停止。

可以假想一下,第一个置 1bit 的前面 bit 都是 0 的,而第一个置 1bit 后面的位是不关心的(不管是 0 还是 1),就可以 转化为查找一堆 bit 中是否有置 1 的 bit 位,所以这里是可以使用二分法查找

以 32 位数 fls 为例,首先看前 16bit 是否是全 0,是全 0 就把后 16bit 移到前面,此时第一个置 1 的 bit 不在前 16bit,所以 n 要减去 16。如果前 16bit 非 0,那就说明置 1 的 bit 包含在前面 16bit 里了,继续下一步,即在包含 1 的那 16bit 中使用二分查找。

32 位数,按照二分法查找算法步骤需要 log(32)=5 步。64 位数,log(64)=6 步。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
#include <stdio.h>

// find last bit set, binary search
int fls(unsigned int v) {
int n = 32;

if (!v) return -1;
if (!(v & 0xFFFF0000)) { v <<= 16; n -= 16; } // 确定在不在 32bits 数中的高 16bits 位
if (!(v & 0xFF000000)) { v <<= 8; n -= 8; } // 确定在不在过滤后的 16bits 数中的高 8bits 位
if (!(v & 0xF0000000)) { v <<= 4; n -= 4; } // 确定在不在过滤后的 8bits 数中的高 4bits 位
if (!(v & 0xC0000000)) { v <<= 2; n -= 2; } // 确定在不在过滤后的 4bits 数中的高 2bits 位
if (!(v & 0x80000000)) { v <<= 1; n -= 1; } // 确定在不在过滤后的 2bits 数中的高 1bits 位

return n;
}

int main() {
unsigned int size = 1000;
if (size & (size - 1)) {
// roundup_pow_of_two(): 向上取大于等于某数的最小的 2 的幂次方的数
size = (1UL << fls(size - 1)); // 减一是避免传进来的数本就是 2 的幂次方数,达到返回结果还是这个数(而不是乘一倍)
}
printf("size = 0x%x\r\n", size); // 0x400 = 1024

return 0;
}

对 2 的幂次方取模

假设 size 等于 2 的幂次方(前置条件),则表达式 in & (size - 1) 的作用相当于 in % size

解释:

考虑 size 等于 2 的幂次方,那么 size 的二进制可以表示为 1 后面跟着 n 个 0 的形式。例如:size = 2^3 表示为 1000,size = 2^4 表示为 10000。则 size - 1 的二进制可以表示为 n 个 1。

对任何整数 in 进行 in & (size - 1) 按位与操作会 保留 in 的二进制表示中的最低的 n 位,而将其余的高位置为 0。也就是运算后的结果在 0~size-1 之间

模运算 in % size 也是保留 in 对 size 的余数,这个余数正好是 in 二进制表示中的最低的 n 位 。因此表达式 in & (size - 1)in % size 等价,但是前者的位运算效率更高。

原码反码补码

在计算机科学中,原码、反码和补码是三种用于表示整数的方法,特别是有符号整数 。它们主要 用于二进制系统中。无符号数不涉及补码的概念。

原码

原码是最直接的一种表示方法,它使用一个符号位和一个数值位来表示整数。最高位(最左边一位)表示符号,剩余位表示数值

  • 符号位:0 表示正数,1 表示负数
  • 数值位:直接表示数值的大小。

示例(char 类型数据)

  • +5 的原码表示为:0 0000101
  • -5 的原码表示为:1 0000101

原码表示的一个问题是存在两个零(正零和负零),即 0 00000001 0000000

反码

反码通过对原码的数值位取反来表示负数。正数的反码与其原码相同,负数的反码则是将其原码的数值位逐位取反。

示例(char 类型数据)

  • +5 的反码表示为:0 0000101
  • -5 的反码表示为:1 1111010

反码表示也存在两个零:0 0000000(正零)和 1 1111111(负零)。

补码

补码是现代计算机中 最常用 的有符号整数表示方法,负数在计算机中是以补码形式表示 。它通过将数值按二进制加法计算机的模数(通常是 2 的幂)进行减法来表示负数。具体来说, 负数的补码表示为原码数值位取反并加 1

示例(char 类型数据)

  • +5 的补码表示为:0 0000101
  • -5 的补码表示为:1 1111011(将 0 0000101 取反为 1 1111010,然后加 1)

补码表示的优点是只有一个零(0 0000000),且在硬件电路中,加法和减法运算可以使用相同的电路实现,这使得运算更加高效。

总结和比较

  1. 原码

    • 符号位表示正负,数值位表示大小。
    • 存在两个零(正零和负零)。
  2. 反码

    • 正数与原码相同,负数取原码数值位的反。
    • 存在两个零(正零和负零)。
  3. 补码

    • 正数与原码相同,负数取(负数)原码数值位的反 并加 1
    • 只有一个零,运算更高效。

示例对比

1
2
3
4
5
值       原码      反码      补码
+5 00000101 00000101 00000101
-5 10000101 11111010 11111011
+0 00000000 00000000 00000000
-0 10000000 11111111 00000000 (注意:补码没有负零)

正数的原码、反码与补码表示都一样。负数的补码表示是其对应的(负数)原码的数值位取反并加 1,或者是(正数)原码的符号位和数值位均取反并加 1

计算有符号数 3 - 5 = -2 的过程:

  • 正数 3 在计算机中的表示:补码,其值为 0000 0011
  • 负数 -5 在计算机中的表示:补码,其值为 正数的原码的符号位和数值位均取反 并加 1,即 5 的原码 0000 0101,均取反 1111 1010,加一 1111 1011
  • 3-5 等于 +3 的补码 + (-5 的补码),即 0000 0011 + 1111 1011 = 1111 1110,减一得到反码 1111 1101,取反得到原码 1000 0010 = (-2)

无符号整数溢出回绕

在 C 语言中,无符号整数的溢出行为是 定义良好的 (最大值 +1 后必定是 0)。当一个无符号整数在加法、减法、乘法或其他操作中超过其表示范围时,它会发生 溢出回绕(也称为模数回绕)。具体来说,结果会被截断为适合该类型的最大值,并从零重新开始。这种行为基于二进制算术模数。

何为溢出回绕行为:当无符号整数的运算结果超过其表示范围时,结果会模数化。例如,对于一个 8 位的 unsigned char 类型,它的范围是 0 到 255。如果结果大于 255,它会回绕到从 0 开始的值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
#include <stdio.h>

int main() {
unsigned int a = 0xfffffffa;
unsigned int b = a + 10;
unsigned int c = 4;
printf("a = %u, 0x%08x\n", a, a);
printf("b = %u, 0x%08x\n", b, b);
printf("b - a = %u, 0x%08x\n", b - a, b - a);
printf("c - a = %u, 0x%08x\n", c - a, c - a);
return 0;
}

/*
a = 4294967290, 0xfffffffa
b = 4, 0x00000004
-> a + 10 = 0x1 00 00 00 04 溢出
因为 uint32 共计 32 位,溢出时的最高位无法获取,只能获取低 32 位,即 0x00000004
b - a = 10, 0x0000000a
-> b - a = 4 + (-4294967290 的补码) = 0x04 + (~(0xfffffffa)+1)
= 0x04 + 0x00000006 = 0x0a = 10

-> b - a = 0x04 - 0xfffffffa = -4294967286 + 2^32 = 0x0000000a = 10
因为 4 - 4294967290 将变成一个负值,这在无符号整数表示中是不允许的。
为了处理这种情况,结果会被加上一个模数(即 2^32),以得到正确的无符号结果。
c - a = 10, 0x0000000a
*/

Linux 内核 kfifo 环形队列

Linux 内核中有一个先进先出的数据结构,采用环形队列的数据结构来实现,提供一个无边界的字节流服务。最重要的是,这个队列采用的是无锁的方式来实现。即当它用于只有一个入队线程和一个出队线程的场景时,两个线程可以并发操作,而不需要任何加锁行为,就可以保证 kfifo 的线程安全。这个队列名为 kfifo。

数据结构

1
2
3
4
5
6
7
struct kfifo {
unsigned char *buffer; /* the buffer holding the data */
unsigned int size; /* the size of the allocated buffer */
unsigned int in; /* data is added at offset (in % size) */
unsigned int out; /* data is extracted from off. (out % size) */
spinlock_t *lock; /* protects concurrent modifications */
};
  • buffer: 存放数据的缓存表
  • size: 缓存空间的大小,要求为 2 的幂次方
  • in: 指向 buffer 队列的队头,代码中计数持续增长(未取模)
  • out: 指向 buffer 队列的队尾,代码中计数持续增长(未取模)
  • lock: 多生产者 / 多消费者场景中用于同步互斥的锁

注意事项

对于 Linux 内核 kfifo 环形队列:

以上三种条件都满足的情况下,可以使用 kfifo 无锁队列。相反,如果存在多个生产者或多个消费者,则需要通过锁来进行同步:

buffer 大小为 2 的幂次方

为什么 buffer 大小要求为 2 的幂次方?为了使用位运算,达到快、快、不择手段的快。

如果用户给定的 size 大小不是 2 的幂次方会怎么样?程序会自动向上取到大于它的 2 的幂次方数作为 size。

kfifo_alloc 初始化

kfifo 的初始化是指为 kfifo 分配空间、初始化 kfifo 中的各项参数等操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
/**
* kfifo_alloc - allocates a new FIFO and its internal buffer
* @size: the size of the internal buffer to be allocated.
* @gfp_mask: get_free_pages mask, passed to kmalloc()
* @lock: the lock to be used to protect the fifo buffer
*
* The size will be rounded-up to a power of 2.
*/
struct kfifo *kfifo_alloc(unsigned int size, unsigned int __nocast gfp_mask, spinlock_t *lock)
{
unsigned char *buffer;
struct kfifo *ret;

/*
* round up to the next power of 2, since our 'let the indices
* wrap' tachnique works only in this case.
*/
if (size & (size - 1)) { // 如果不是 2 的幂次方,则向上取到 2 的幂次方
BUG_ON(size > 0x80000000);
size = roundup_pow_of_two(size);
}

buffer = kmalloc(size, gfp_mask);
if (!buffer)
return ERR_PTR(-ENOMEM);

ret = kfifo_init(buffer, size, gfp_mask, lock);

if (IS_ERR(ret))
kfree(buffer);

return ret;
}

/**
* kfifo_init - allocates a new FIFO using a preallocated buffer
* @buffer: the preallocated buffer to be used.
* @size: the size of the internal buffer, this have to be a power of 2.
* @gfp_mask: get_free_pages mask, passed to kmalloc()
* @lock: the lock to be used to protect the fifo buffer
*
* Do NOT pass the kfifo to kfifo_free() after use ! Simply free the
* struct kfifo with kfree().
*/
struct kfifo *kfifo_init(unsigned char *buffer, unsigned int size,
unsigned int __nocast gfp_mask, spinlock_t *lock)
{
struct kfifo *fifo;

/* size must be a power of 2 */
BUG_ON(size & (size - 1));

fifo = kmalloc(sizeof(struct kfifo), gfp_mask);
if (!fifo)
return ERR_PTR(-ENOMEM);

fifo->buffer = buffer;
fifo->size = size;
fifo->in = fifo->out = 0;
fifo->lock = lock;

return fifo;
}

__kfifo_put 入队(无锁)

__kfifo_put()是 kfifo 的入队函数(无锁版本),源码实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
/**
* __kfifo_put - puts some data into the FIFO, no locking version
* @fifo: the fifo to be used.
* @buffer: the data to be added.
* @len: the length of the data to be added.
*
* This function copies at most 'len' bytes from the 'buffer' into
* the FIFO depending on the free space, and returns the number of
* bytes copied.
*
* Note that with only one concurrent reader and one concurrent
* writer, you don't need extra locking to use these functions.
* 注意,对于仅一个并发的读者和写者,使用这些功能不需要额外的锁
*/
unsigned int __kfifo_put(struct kfifo *fifo,
unsigned char *buffer, unsigned int len)
{
unsigned int l;

/* (in-out) 表示已使用的空间,size-(in-out)表示剩余的空间。通过 min 来防止写越界 */
len = min(len, fifo->size - fifo->in + fifo->out);

/* 下面的设计考虑了:从队列尾部填充一部分、从队列首部填充剩余的部分的情况 */
/* first put the data starting from fifo->in to buffer end */
/* fifo->buffer = |0........in....size-1| 首先将前半部分数据缓存放在 in~size-1 所在的区间 */
l = min(len, fifo->size - (fifo->in & (fifo->size - 1)));
memcpy(fifo->buffer + (fifo->in & (fifo->size - 1)), buffer, l);

/* then put the rest (if any) at the beginning of the buffer */
/* 若 l<len,则表明上面 in~size-1 区间不够放下 len 长度,此时要将剩余后半部分数据从环的开始端继续放 */
memcpy(fifo->buffer, buffer + l, len - l);

fifo->in += len; /* 从这里可以说明,指针 in 一直在增加,其值未曾对 size 取模 */

return len;
}

根据前面的《无符号整数溢出回绕》小节可知,无符号数 in 相当于 b,无符号数 out 相当于 a,当 out 很大、in 溢出回绕时,in-out(b-a) 的距离仍为正确值。无论何时,即使发生整数回绕,kfifo 中的指针都有如下关系:

kfifo.inkfifo.out<=kfifo.sizekfifo.in - kfifo.out <= kfifo.size

__kfifo_get 出队(无锁)

__kfifo_get()是 kfifo 的出队函数(无锁版本),源码实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
/**
* __kfifo_get - gets some data from the FIFO, no locking version
* @fifo: the fifo to be used.
* @buffer: where the data must be copied.
* @len: the size of the destination buffer.
*
* This function copies at most 'len' bytes from the FIFO into the
* 'buffer' and returns the number of copied bytes.
*
* Note that with only one concurrent reader and one concurrent
* writer, you don't need extra locking to use these functions.
* 注意,对于仅一个并发的读者和写者,使用这些功能不需要额外的锁
*/
unsigned int __kfifo_get(struct kfifo *fifo,
unsigned char *buffer, unsigned int len)
{
unsigned int l;

/* 这里 in-out 可能会发生整数溢出回绕,如 in-out=0x04-0xfffffffa,
* 但不影响正确性,其结果为正确值 10,正好对应 abcdef0123 十处 */
len = min(len, fifo->in - fifo->out);

/* 后面的原理跟__kfifo_put 的原理一致 */
/* first get the data from fifo->out until the end of the buffer */
l = min(len, fifo->size - (fifo->out & (fifo->size - 1)));
memcpy(buffer, fifo->buffer + (fifo->out & (fifo->size - 1)), l);

/* then get the rest (if any) from the beginning of the buffer */
memcpy(buffer + l, fifo->buffer, len - l);

fifo->out += len; /* 从这里可以说明,指针 out 一直在增加,其值未曾对 size 取模 */

return len;
}

__kfifio_reset 重置队列

1
2
3
4
5
6
7
8
/**
* __kfifo_reset - removes the entire FIFO contents, no locking version
* @fifo: the fifo to be emptied.
*/
static inline void __kfifo_reset(struct kfifo *fifo)
{
fifo->in = fifo->out = 0;
}

入队出队加锁版本

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
/**
* kfifo_reset - removes the entire FIFO contents
* @fifo: the fifo to be emptied.
*/
static inline void kfifo_reset(struct kfifo *fifo)
{
unsigned long flags;

spin_lock_irqsave(fifo->lock, flags);
__kfifo_reset(fifo);
spin_unlock_irqrestore(fifo->lock, flags);
}

/**
* kfifo_put - puts some data into the FIFO
* @fifo: the fifo to be used.
* @buffer: the data to be added.
* @len: the length of the data to be added.
*
* This function copies at most 'len' bytes from the 'buffer' into
* the FIFO depending on the free space, and returns the number of
* bytes copied.
*/
static inline unsigned int kfifo_put(struct kfifo *fifo,
unsigned char *buffer, unsigned int len)
{
unsigned long flags;
unsigned int ret;

spin_lock_irqsave(fifo->lock, flags);
ret = __kfifo_put(fifo, buffer, len);
spin_unlock_irqrestore(fifo->lock, flags);

return ret;
}

/**
* kfifo_get - gets some data from the FIFO
* @fifo: the fifo to be used.
* @buffer: where the data must be copied.
* @len: the size of the destination buffer.
*
* This function copies at most 'len' bytes from the FIFO into the
* 'buffer' and returns the number of copied bytes.
*/
static inline unsigned int kfifo_get(struct kfifo *fifo,
unsigned char *buffer, unsigned int len)
{
unsigned long flags;
unsigned int ret;

spin_lock_irqsave(fifo->lock, flags);
ret = __kfifo_get(fifo, buffer, len);

/*
* optimization: if the FIFO is empty, set the indices to 0
* so we don't wrap the next time
*/
if (fifo->in == fifo->out)
fifo->in = fifo->out = 0;
spin_unlock_irqrestore(fifo->lock, flags);

return ret;
}

参考资料:

  1. https://blog.csdn.net/suz_cheney/article/details/112456368
  2. https://juejin.cn/post/6983865054147903518
  3. https://elixir.bootlin.com/linux/v2.6.12/source/kernel/kfifo.c
  4. https://blog.csdn.net/u012630961/article/details/80974857