DPDK 中的无锁环形队列利用原子操作(CAS)、出入队前后的内存屏蔽和高效的数据结构,提供了一个多生产者、多消费者并发安全的队列实现。但真的是“无锁”实现吗?

严格来说,“无锁”指的是避免使用传统的锁(例如互斥锁、读写锁等)来控制并发访问。在某些情况下,使用 CAS(Compare-And-Swap)等原子操作也会被称为无锁编程,因为它们不需要锁的开销(如上下文切换开销)。

其实,DPDK 中的无锁环形队的单生产者、单消费者是真正无锁的(也没有使用 CAS 原子操作);但是,对于多生产者、多消费者来说,其利用了 CAS 原子操作来实现互斥地修改生产者、消费者的头指针和尾指针,达到安全地进行数据的并发访问。

为什么单生产者、单消费者是无锁的?因为单生产者只会竞争写入位置,单消费者只会竞争读取位置,不会出现跨线程的竞争同一位置。

前置知识

柔性数组成员

概念

void *ring[0]void *ring 都是指针类型,但它们有不同的含义和用法。

  • void *ring是一个指向 void 类型的 指针变量。它可以指向任意类型的数据,但它本身只是一块内存地址。
  • void *ring[0]是一个 指针数组 ,数组的大小为 0, 放在结构体的最后一个成员中以声明变长数组

在 C 语言中,数组大小为 0 是非法的,通常不被允许。然而,在某些编译器扩展和特定用法(例如在结构体的最后一个成员中)中,使用零长度数组是一种常见的技巧,用于实现柔性数组成员(Flexible Array Member,FAM)。FAM 是一种用于在结构体中声明变长数组的方法。

示例

void *ring 用法示例:

1
2
3
4
5
6
7
struct example1 {
void *ring;
};

struct example1 *ex = malloc(sizeof(struct example1));
ex->ring = malloc(10 * sizeof(int)); // 分配一个存储 10 个 int 的空间
// ex->ring 是一个单个指针,指向一块动态分配的内存

void *ring[0] 用法示例:

1
2
3
4
5
6
7
8
struct example2 {
int size;
void *ring[0]; // 柔性数组成员
};

struct example2 *ex = malloc(sizeof(struct example2) + 10 * sizeof(void *)); // NOTE
ex->size = 10;
// ex->ring 是一个指向 10 个 void* 的数组

ring[0]是一个柔性数组成员,用于在结构体中实现变长数据。分配内存后,ring[0] 表示数据的开始位置。

实际使用时,柔性数组成员可 作为数据项 ,通常会分配比结构体定义更大的内存,如上面额外的10 * sizeof(void *) 资源,以便容纳实际的数据项。

优点

在结构体中使用柔性数组成员的优点:

  • 不占用任何空间 :由于ring[0] 是一个柔性数组成员,其大小在编译时为 0,不占用结构体本身的空间。
  • 分配时连续 :当为这个结构体 malloc 分配内存时, 会同时分配结构体和需要存储的数据项所需的所有内存,且这两块内存是连续的
  • 方便内存释放 :因为内存是一次性分配的(不像上面的 example1 和它的 ring 成员是分别 malloc 的),释放时也只需要一次 free 操作即可释放整个结构体 及其包含的数据项
  • 能提高速度:连续内存分配使得缓存命中率更高,内存访问速度更快。这对于需要高性能的应用来说是非常重要的。

DPDK 无锁环形队列

  • DPDK 版本:v2.2.0,与高版本(如 v20.08)中 rte_ring 的结构有所不同,这里还是从简单的结构学起吧。

rte_ring 数据结构

数据结构 rte_ring 中需要关注的点:

  • 生产者和消费者各有一对 head 和 tail 索引(而不是共用一对 head 和 tail 索引):其实这是为了实现多个线程同时出队、入队(具体看后面的代码、配图);
  • [0, size) 区间,而是在 [0, 2^32) 区间。
  • void *ring[0] 作为数据项(其优点见《前置知识》小节)。
  • ring[] 数组。
  • __rte_cache_aligned,将其分配到不同的 cache line。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
/**
* An RTE ring structure.
*
* The producer and the consumer have a head and a tail index. The particularity
* of these index is that they are not between 0 and size(ring). These indexes
* are between 0 and 2^32, and we mask their value when we access the ring[]
* field. Thanks to this assumption, we can do subtractions between 2 index
* values in a modulo-32bit base: that's why the overflow of the indexes is not
* a problem.
*/
struct rte_ring {
char name[RTE_RING_NAMESIZE]; /**< Name of the ring. */
int flags; /**< 创建模式,支持多 / 单生产者和多 / 单消费者 */
const struct rte_memzone* memzone;
/**< Memzone, if any, containing the rte_ring */
/**< rte_menzone 是 dpdk 内存管理底层的数据结构,memzone 用于记录在哪块 mem 分配的 rte_ring,释放的时候使用 */

/** Ring producer status. 生产者数据结构 */
struct prod {
uint32_t watermark; /**< Maximum items before EDQUOT. 水位线,用于检查入队后是否超过水位线 */
uint32_t sp_enqueue; /**< True, if single producer. */
uint32_t size; /**< Size of ring. */
uint32_t mask; /**< Mask (size-1) of ring. */
volatile uint32_t head; /**< Producer head. 入队前更新 head 字段,然后进行入队 */
volatile uint32_t tail; /**< Producer tail. 入队完更新 tail 字段(让先更新过 head 字段的线程
优先更新 tail 字段,自己等待一会),更新后 tail == head */
} prod __rte_cache_aligned;

/** Ring consumer status. 消费者数据结构 */
struct cons {
uint32_t sc_dequeue; /**< True, if single consumer. */
uint32_t size; /**< Size of the ring. */
uint32_t mask; /**< Mask (size-1) of ring. */
volatile uint32_t head; /**< Consumer head. 与生产者处理逻辑一样 */
volatile uint32_t tail; /**< Consumer tail. 与生产者处理逻辑一样 */
#ifdef RTE_RING_SPLIT_PROD_CONS
/* 这个属性就是要求 gcc 在编译的时候,把 cons/prod 结构分配到不同的 cache line,为什么这样做?
因为如果没有这些的话,这两个结构在内存上是连续的,编译器不会把他们分配到不同 cache line,
而一般上这两个结构是要被不同的 CPU 核访问的,如果连续的话这两个核就会产生伪共享问题 */
} cons __rte_cache_aligned;
#else
} cons;
#endif

#ifdef RTE_LIBRTE_RING_DEBUG
struct rte_ring_debug_stats stats[RTE_MAX_LCORE];
#endif

void* ring[0] __rte_cache_aligned; /**< Memory space of ring starts here.
* not volatile so need to be careful
* about compiler re-ordering */
};

相关宏

TAILQ_HEAD 和 TAILQ_ENTRY 尾队列设计细节可参考 这篇文章

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
/* 尾队列表头 TAILQ_HEAD 宏定义 */
#define TAILQ_HEAD(name, type) \
struct name { \
struct type* tqh_first; /* first element */ \
struct type** tqh_last; /* addr of last next element */ \
}

/* 尾队列实体 TAILQ_ENTRY 宏定义 */
#define TAILQ_ENTRY(type) \
struct { \
struct type* tqe_next; /* next element */ \
struct type** tqe_prev; /* address of previous next element */ \
}

/** dummy structure type used by the rte_tailq APIs */
struct rte_tailq_entry {
/* 扩展到:
struct {
struct rte_tailq_entry* tqe_next;
struct rte_tailq_entry** tqe_prev;
} next;
*/
TAILQ_ENTRY(rte_tailq_entry) next; /**< Pointer entries for a tailq list */
void* data; /**< Pointer to the data referenced by this tailq entry */
};

/** dummy */
/* 扩展到:
struct rte_tailq_entry_head {
struct rte_tailq_entry* tqh_first;
struct rte_tailq_entry** tqh_last;
};
*/
TAILQ_HEAD(rte_tailq_entry_head, rte_tailq_entry);

#define RTE_TAILQ_NAMESIZE 32

/**
* The structure defining a tailq header entry for storing
* in the rte_config structure in shared memory. Each tailq
* is identified by name.
* Any library storing a set of objects e.g. rings, mempools, hash-tables,
* is recommended to use an entry here, so as to make it easy for
* a multi-process app to find already-created elements in shared memory.
*/
struct rte_tailq_head {
struct rte_tailq_entry_head tailq_head; /**< NOTE: must be first element */
char name[RTE_TAILQ_NAMESIZE];
};

struct rte_tailq_elem {
/**
* Reference to head in shared mem, updated at init time by
* rte_eal_tailqs_init()
* 在初始化时由 rte_eal_tailqs_init() 更新的共享内存中的头部引用
*/
struct rte_tailq_head* head;
/* 扩展到:
struct {
struct rte_tailq_elem* tqe_next;
struct rte_tailq_elem** tqe_prev;
} next;
*/
TAILQ_ENTRY(rte_tailq_elem) next;
const char name[RTE_TAILQ_NAMESIZE];
};

/**
* Return the first tailq entry casted to the right struct.
* 返回转换为正确结构的第一个 tailq 条目
*/
#define RTE_TAILQ_CAST(tailq_entry, struct_name) (struct struct_name*)&(tailq_entry)->tailq_head

宏扩展后的数据结构

上面的宏定义,使用 gcc -E 预处理后是这样的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
struct rte_tailq_entry {
struct {
struct rte_tailq_entry* tqe_next;
struct rte_tailq_entry** tqe_prev;
} next;
void* data;
};

struct rte_tailq_entry_head {
struct rte_tailq_entry* tqh_first;
struct rte_tailq_entry** tqh_last;
};

struct rte_tailq_head {
struct rte_tailq_entry_head tailq_head;
char name[32];
};

struct rte_tailq_elem {
struct rte_tailq_head* head;

struct {
struct rte_tailq_elem* tqe_next;
struct rte_tailq_elem** tqe_prev;
} next;
const char name[32];
};

struct rte_ring_list {
struct rte_tailq_entry* tqh_first;
struct rte_tailq_entry** tqh_last;
};

/* 定义全局 rte_ring_tailq 尾队列,里面可以添加很多个创建的 ring_list(也是一个尾队列) */
static struct rte_tailq_elem rte_ring_tailq = {
.name = RTE_TAILQ_RING_NAME,
};

内存分布及挂接方式

全局 rte_ring_tailq 尾队列的内存分布及挂接方式:

DPDK 无锁环形队列内存分布及挂接方式

rte_ring_create 队列创建

几处要点:

  • ring_size:计算待申请的 rte_ring 的大小(而不是申请空间),包含 void* ring[]数据块的空间。
  • te:申请一块 rte_tailq_entry 的空间。
  • r = mz->addr:申请一块内存,其中 mz->addr 指向了申请的大小为 ring_size 的 rte_ring 结构的首地址。
  • rte_ring_init:对上一步申请的大小为 ring_size 的 rte_ring 结构 r 进行初始化。
  • te->data = (void*)r:初始化参数之后,把 rte_tailq_entry 的 data 节点指向 rte_ring 结构地址。
    • 要干嘛?尾队列里面的每个 entry 的 data 项都指向一个 rte_ring 无锁环形队列。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
/**
* rte_ring_create - create the ring
* @name: 唯一标识 ring 的 name,不同的 ring 具有不同的 name
* @count: ring 队列的长度,必须是 2 的幂次方
* @socket_id: ring 位于的 socket(不是那个网络 socket)
* @flags: 指定创建的 ring 的属性:单 / 多生产者、单 / 多消费者之间的组合。
* 0 表示使用默认属性(多生产者、多消费者)。不同的属性出入队的操作会有所不同。
*/
struct rte_ring* rte_ring_create(const char* name, unsigned count, int socket_id, unsigned flags) {
char mz_name[RTE_MEMZONE_NAMESIZE];
struct rte_ring* r;
struct rte_tailq_entry* te;
const struct rte_memzone* mz;
ssize_t ring_size;
int mz_flags = 0;
struct rte_ring_list* ring_list = NULL;

/* 这里是把 rte_tailq_entry_head 结构强转成了 rte_ring_list 结构,这俩结构的构成完全一样,强转没啥影响 */
ring_list = RTE_TAILQ_CAST(rte_ring_tailq.head, rte_ring_list);

/* 返回申请的资源大小(包含结构体和数据块):sizeof(struct rte_ring) + (count * sizeof(void *)) */
ring_size = rte_ring_get_memsize(count);
if (ring_size < 0) {
rte_errno = ring_size;
return NULL;
}

/* 申请一个 TAILQ ENTRY 尾队列节点的资源,在创建完成 rte_ring 后,挂接这个无锁队列到尾队列 entry 中 */
te = rte_zmalloc("RING_TAILQ_ENTRY", sizeof(*te), 0);
if (te == NULL) {
RTE_LOG(ERR, RING, "Cannot reserve memory for tailq\n");
rte_errno = ENOMEM;
return NULL;
}

/* 给唯一标识 ring 的名字增加一个前缀 */
snprintf(mz_name, sizeof(mz_name), "%s%s", RTE_RING_MZ_PREFIX, name);

rte_rwlock_write_lock(RTE_EAL_TAILQ_RWLOCK);

/* reserve a memory zone for this ring. If we can't get rte_config or
* we are secondary process, the memzone_reserve function will set
* rte_errno for us appropriately - hence no check in this this function
* 为新创建的 ring 分配内存空间 */
mz = rte_memzone_reserve(mz_name, ring_size, socket_id, mz_flags);
if (mz != NULL) {
r = mz->addr; // 获取分配的内存空间的起始地址
/* no need to check return value here, we already checked the
* arguments above */
rte_ring_init(r, name, count, flags); // ring 队列初始化

te->data = (void*)r; // 挂接这个无锁队列到尾队列 entry 中
r->memzone = mz;
/* 在全局尾队列 ring_list 上挂接一个节点 te(其数据为 ring 结构的节点 r 的地址)*/
TAILQ_INSERT_TAIL(ring_list, te, next);
} else {
r = NULL;
RTE_LOG(ERR, RING, "Cannot reserve memory\n");
rte_free(te);
}
rte_rwlock_write_unlock(RTE_EAL_TAILQ_RWLOCK);

return r;
}

rte_ring_init 环初始化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
int rte_ring_init(struct rte_ring* r, const char* name, unsigned count, unsigned flags) {
/* compilation-time checks */
RTE_BUILD_BUG_ON((sizeof(struct rte_ring) & RTE_CACHE_LINE_MASK) != 0);
RTE_BUILD_BUG_ON((offsetof(struct rte_ring, prod) & RTE_CACHE_LINE_MASK) != 0);

/* init the ring structure */
memset(r, 0, sizeof(*r));
snprintf(r->name, sizeof(r->name), "%s", name);
r->flags = flags;
r->prod.watermark = count;
r->prod.sp_enqueue = !!(flags & RING_F_SP_ENQ); // 是否是 single-producer
r->cons.sc_dequeue = !!(flags & RING_F_SC_DEQ); // 是否是 single-consumer
r->prod.size = r->cons.size = count;
r->prod.mask = r->cons.mask = count - 1;
r->prod.head = r->cons.head = 0; // 生产者与消费者的头指针索引
r->prod.tail = r->cons.tail = 0; // 生产者与消费者的尾指针索引

return 0;
}

rte_ring_free 环释放

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
/* free the ring */
void rte_ring_free(struct rte_ring* r) {
struct rte_ring_list* ring_list = NULL;
struct rte_tailq_entry* te;

if (r == NULL)
return;

/*
* Ring was not created with rte_ring_create,
* therefore, there is no memzone to free.
*/
if (r->memzone == NULL) {
RTE_LOG(ERR, RING, "Cannot free ring (not created with rte_ring_create()");
return;
}

if (rte_memzone_free(r->memzone) != 0) {
RTE_LOG(ERR, RING, "Cannot free memory\n");
return;
}

ring_list = RTE_TAILQ_CAST(rte_ring_tailq.head, rte_ring_list);
rte_rwlock_write_lock(RTE_EAL_TAILQ_RWLOCK);

/* find out tailq entry,找出尾队列节点中 data 指向入参 r 的 entry */
TAILQ_FOREACH(te, ring_list, next) {
if (te->data == (void*)r)
break;
}

if (te == NULL) { // 没找到
rte_rwlock_write_unlock(RTE_EAL_TAILQ_RWLOCK);
return;
}

TAILQ_REMOVE(ring_list, te, next); // 找到,从全局尾队列中摘除该 rte_ring 无锁队列

rte_rwlock_write_unlock(RTE_EAL_TAILQ_RWLOCK);

rte_free(te);
}

单生产者单消费者出入队

单生产者 sp 入队

核心流程:

  1. 首先,获取 prod.head 和 cons.tail 位置,然后计算空余 buffer 数量为 mask-(prod.head - cons.tail);
  2. 其次,在入队操作前,修改 prod.head,后移 n 个单位,表明生产者将要生产到这里;
  3. 然后,进行入队操作 ENQUEUE_PTRS;
  4. 最后,在入队操作后,修改 prod.tail,使其值为生产前的 prod.head+n,表明生产完成后其尾节点的正确位置。
    • 如果是单生产者模式,其实可以直接使用当前的 prod.head;但是如果多生产者模式,不能直接使用,因为多个线程会竞争写入。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
/**
* @internal Enqueue several objects on a ring (NOT multi-producers safe).
*
* @param r
* A pointer to the ring structure.
* @param obj_table
* A pointer to a table of void * pointers (objects).
* @param n
* The number of objects to add in the ring from the obj_table.
* @param behavior 环的两种入队模式:固定数量入队、尽力而为入队
* RTE_RING_QUEUE_FIXED: Enqueue a fixed number of items from a ring
* RTE_RING_QUEUE_VARIABLE: Enqueue as many items a possible from ring
* @return
* Depend on the behavior value
* if behavior = RTE_RING_QUEUE_FIXED
* - 0: Success; objects enqueue.
* - -EDQUOT: Quota exceeded. The objects have been enqueued, but the
* high water mark is exceeded.
* - -ENOBUFS: Not enough room in the ring to enqueue, no object is enqueued.
* if behavior = RTE_RING_QUEUE_VARIABLE
* - n: Actual number of objects enqueued.
*/
static inline int __attribute__((always_inline))
__rte_ring_sp_do_enqueue(struct rte_ring *r, void * const *obj_table,
unsigned n, enum rte_ring_queue_behavior behavior)
{
uint32_t prod_head, cons_tail; // 假设无边界时,两指针的相对顺序:0____ct------ph________+inf
uint32_t prod_next, free_entries;
unsigned i;
uint32_t mask = r->prod.mask; // 等于 size(ring)-1
int ret;

prod_head = r->prod.head;
cons_tail = r->cons.tail;

/* The subtraction is done between two unsigned 32bits value
* (the result is always modulo 32 bits even if we have
* prod_head > cons_tail). So 'free_entries' is always between 0
* and size(ring)-1. */
free_entries = mask + cons_tail - prod_head;

/* check that we have enough room in ring */
if (unlikely(n > free_entries)) { // 入队的 obj 数量大于队列空余空间
if (behavior == RTE_RING_QUEUE_FIXED) { // 固定数量入队模式:无法全部入队,异常退出
__RING_STAT_ADD(r, enq_fail, n);
return -ENOBUFS;
}
else { // 尽力而为入队模式:入队全部空余空间,即入队 free_entries 个 obj
/* No free entry available */
if (unlikely(free_entries == 0)) {
__RING_STAT_ADD(r, enq_fail, n);
return 0;
}

n = free_entries; // 实际入队 free_entries 个 obj
}
}

prod_next = prod_head + n;
r->prod.head = prod_next; // 修改 ring 结构中的 prod.head 指针

/* write entries in ring */
ENQUEUE_PTRS(); // 入队操作
rte_smp_wmb(); // 内存屏障

/* if we exceed the watermark */
if (unlikely(((mask + 1) - free_entries + n) > r->prod.watermark)) { // 已经用的空间 + 本次实际使用的空间 大于 水位线
ret = (behavior == RTE_RING_QUEUE_FIXED) ? -EDQUOT :
(int)(n | RTE_RING_QUOT_EXCEED); // 按模式指定异常返回值
__RING_STAT_ADD(r, enq_quota, n);
}
else {
ret = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : n; // 返回对应模式的成功状态下的返回值
__RING_STAT_ADD(r, enq_success, n);
}

r->prod.tail = prod_next; // 修改 ring 结构中的 prod.tail 指针(sp 模式下,这里等于 prod.head 的值)
return ret;
}

入队宏

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/* the actual enqueue of pointers on the ring.
* Placed here since identical code needed in both
* single and multi producer enqueue functions */
#define ENQUEUE_PTRS() do { \
const uint32_t size = r->prod.size; \
uint32_t idx = prod_head & mask; \ // 获取入队的起始索引
if (likely(idx + n < size)) { \ // 起始索引 + 入队量 小于 ring 大小,则直接往后入队,不用回绕到最开始继续入队
// 先 4 个、4 个的入队(循环步长是 4),依次赋值,相当于循环展开优化,减少计算循环索引和预测条件分支判断等优化
for (i = 0; i < (n & ((~(unsigned)0x3))); i+=4, idx+=4) { \
r->ring[idx] = obj_table[i]; \
r->ring[idx+1] = obj_table[i+1]; \
r->ring[idx+2] = obj_table[i+2]; \
r->ring[idx+3] = obj_table[i+3]; \
} \
switch (n & 0x3) { \ // 最后不足 4 个的 obj 入队
case 3: r->ring[idx++] = obj_table[i++]; \
case 2: r->ring[idx++] = obj_table[i++]; \
case 1: r->ring[idx++] = obj_table[i++]; \
} \
} else { \ // 起始索引 + 入队量 不小于 ring 大小,则往后入队一部分到右边界,然后从左边界入队余下的一部分
for (i = 0; idx < size; i++, idx++)\ // 入队部分 obj 到 ring 的[idx, size-1]
r->ring[idx] = obj_table[i]; \
for (idx = 0; i < n; i++, idx++) \ // 入队余下的 obj 到 ring 的[0, n-(size-idx)-1]
r->ring[idx] = obj_table[i]; \
} \
} while(0)

单消费者 sc 出队

核心流程:

  1. 首先,获取 prod.tail 和 cons.head 位置,然后计算有效 buffer 数量为(prod.tail - cons.head);
  2. 其次,在出队操作前,修改 cons.head,后移 n 个单位,表明消费者将要消费到这里;
  3. 然后,进行出队操作 DEQUEUE_PTRS;
  4. 最后,在出队操作后,修改 cons.tail,使其值为消费前的 cons.head+n,表明消费完成后其尾节点的正确位置。
    • 如果是单消费者模式,其实可以直接使用当前的 cons.head;但是如果多消费者模式,不能直接使用,因为多个线程会竞争写入。
1
2
3
4
5
6
7
8
9
生产者计算有效 buffer:
____ct******ch---------------------pt+++++++++++ph______
| |
------------------used buffer----------------

消费者计算有效 buffer:
____ct------ch---------------------pt-----------ph______
| |
-------used buffer------

为什么生产者、消费者计算有效使用 buffer 数量时的公式不一样?

(个人理解)使用不同计算公式,可能是为了兼顾 MPMC 模式。因为 +++ 可能是被其它生产者线程预留的将要生成的 buffer、但还没来得及生成完成;同样的,***可能是被其它消费者线程预留的将要消费的 buffer、但还没来得及消费完成。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
/**
* @internal Dequeue several objects from a ring (NOT multi-consumers safe).
* When the request objects are more than the available objects, only dequeue
* the actual number of objects 当请求出队量大于队列剩余量,则仅出队实际余量
*
* @param r
* A pointer to the ring structure.
* @param obj_table
* A pointer to a table of void * pointers (objects) that will be filled.
* 从 ring 中出队的 obj 存放到 obj_table 指向的内存里
* @param n
* The number of objects to dequeue from the ring to the obj_table.
* @param behavior 环的两种出队模式:固定数量出队、尽力而为出队
* RTE_RING_QUEUE_FIXED: Dequeue a fixed number of items from a ring
* RTE_RING_QUEUE_VARIABLE: Dequeue as many items a possible from ring
* @return
* Depend on the behavior value
* if behavior = RTE_RING_QUEUE_FIXED
* - 0: Success; objects dequeued.
* - -ENOENT: Not enough entries in the ring to dequeue; no object is
* dequeued. 余量不足,则不出队任何 obj
* if behavior = RTE_RING_QUEUE_VARIABLE
* - n: Actual number of objects dequeued.
*/
static inline int __attribute__((always_inline))
__rte_ring_sc_do_dequeue(struct rte_ring *r, void **obj_table,
unsigned n, enum rte_ring_queue_behavior behavior)
{
uint32_t cons_head, prod_tail; // 假设无边界时,两指针的相对顺序:0____ct------ph________+inf
uint32_t cons_next, entries;
unsigned i;
uint32_t mask = r->prod.mask;

cons_head = r->cons.head;
prod_tail = r->prod.tail;

/* The subtraction is done between two unsigned 32bits value
* (the result is always modulo 32 bits even if we have
* cons_head > prod_tail). So 'entries' is always between 0
* and size(ring)-1. */
entries = prod_tail - cons_head; // ring 中的 obj 数量

if (n > entries) { // 请求出队量大于实际余量,根据模式不同、处理不同
if (behavior == RTE_RING_QUEUE_FIXED) { // 固定数量出队,无法出队指定数量,返回异常
__RING_STAT_ADD(r, deq_fail, n);
return -ENOENT;
}
else {
if (unlikely(entries == 0)){ // 尽力而为出队,出队实际余量
__RING_STAT_ADD(r, deq_fail, n);
return 0;
}

n = entries; // 出队量为实际余量
}
}

cons_next = cons_head + n;
r->cons.head = cons_next; // 修改 ring 结构中的 cons.head 指针

/* copy in table */
DEQUEUE_PTRS(); // 出队操作
rte_smp_rmb(); // 内存屏蔽

__RING_STAT_ADD(r, deq_success, n);
r->cons.tail = cons_next; // 修改 ring 结构中的 cons.tail 指针(sp 模式下,这里等于 cons.head 的值)
return behavior == RTE_RING_QUEUE_FIXED ? 0 : n;
}

出队宏

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/* the actual copy of pointers on the ring to obj_table.
* Placed here since identical code needed in both
* single and multi consumer dequeue functions */
#define DEQUEUE_PTRS() do { \
uint32_t idx = cons_head & mask; \ // 获取出队的起始索引
const uint32_t size = r->cons.size; \
if (likely(idx + n < size)) { \ // 起始索引 + 出队量 小于 ring 大小,则直接往后出队,不用回绕到最开始继续出队
// 先 4 个、4 个的出队(循环步长是 4),依次赋值,相当于循环展开优化,减少计算循环索引和预测条件分支判断等优化
for (i = 0; i < (n & (~(unsigned)0x3)); i+=4, idx+=4) {\
obj_table[i] = r->ring[idx]; \
obj_table[i+1] = r->ring[idx+1]; \
obj_table[i+2] = r->ring[idx+2]; \
obj_table[i+3] = r->ring[idx+3]; \
} \
switch (n & 0x3) { \ // 最后不足 4 个的 obj 出队
case 3: obj_table[i++] = r->ring[idx++]; \
case 2: obj_table[i++] = r->ring[idx++]; \
case 1: obj_table[i++] = r->ring[idx++]; \
} \
} else { \ // 起始索引 + 出队量 不小于 ring 大小,则往后出队一部分到右边界,然后从左边界出队余下的一部分
for (i = 0; idx < size; i++, idx++) \ // 出队 ring 的[idx, size-1] 区间的 obj 到 obj_table 的前一段
obj_table[i] = r->ring[idx]; \
for (idx = 0; i < n; i++, idx++) \ // 出队 ring 的[0, n-(size-idx)-1] 区间的 obj 到 obj_table 的后一段
obj_table[i] = r->ring[idx]; \
} \
} while (0)

sp&sc 场景分析

在单生产者、单消费者场景下,简单考虑 push 和 pop 数据,假设有生产者 A 和消费者 B 两个线程,分别指向 ring。

有几个问题:B 去 pop 数据的时候会不会因为判断有数据而取不到的情况?(A 在 push 数据时,不小心先更新了 prod.tail 后 push 数据);A 在 push 数据时,是否会因为 B 先更新 cons.tail 而导致覆盖 B 还没来得及 pop 的数据?还有一些其他情况…

因为这里有内存屏障,并不会出现以上问题,以上的实现在单生产者和单消费者的情况下比较简单,但对于多读多写同一个 ring 可能比较复杂。

多生产者多消费者出入队

以下分析下复杂情况,即多个线程 push 和 pop 数据,这里仅分析生产和消费的源码,其他的一些比较细节且不是很重要的代码不在这里分析。

DPDK 无锁环形队列 MPMC 模式下 head 和 tail 同步逻辑

多生产者 mp 入队

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
/**
* @internal Enqueue several objects on the ring (multi-producers safe).
*
* This function uses a "compare and set" instruction to move the
* producer index atomically.
* 该函数使用 CAS 指令,原子地修改生产者索引
*
* @param r
* A pointer to the ring structure.
* @param obj_table
* A pointer to a table of void * pointers (objects).
* @param n
* The number of objects to add in the ring from the obj_table.
* @param behavior
* RTE_RING_QUEUE_FIXED: Enqueue a fixed number of items from a ring
* RTE_RING_QUEUE_VARIABLE: Enqueue as many items a possible from ring
* @return
* Depend on the behavior value
* if behavior = RTE_RING_QUEUE_FIXED
* - 0: Success; objects enqueue.
* - -EDQUOT: Quota exceeded. The objects have been enqueued, but the
* high water mark is exceeded.
* - -ENOBUFS: Not enough room in the ring to enqueue, no object is enqueued.
* if behavior = RTE_RING_QUEUE_VARIABLE
* - n: Actual number of objects enqueued.
*/
static inline int __attribute__((always_inline))
__rte_ring_mp_do_enqueue(struct rte_ring *r, void * const *obj_table,
unsigned n, enum rte_ring_queue_behavior behavior)
{
uint32_t prod_head, prod_next; // 假设无边界时,两指针的相对顺序:......____ct------ph________...
uint32_t cons_tail, free_entries;
const unsigned max = n;
int success;
unsigned i, rep = 0;
uint32_t mask = r->prod.mask;
int ret;

/* Avoid the unnecessary cmpset operation below, which is also
* potentially harmful when n equals 0. */
if (n == 0)
return 0;

/* move prod.head atomically */
do {
/* Reset n to the initial burst count */
n = max;

/*
* while (unlikely(success == 0)) 时,说明有其他线程修改了 r->prod.head,
* 因此,这里要重新读取,并执行 CAS 指令,否则永远不会 success == 1 退出
*/
prod_head = r->prod.head;
cons_tail = r->cons.tail;
/* The subtraction is done between two unsigned 32bits value
* (the result is always modulo 32 bits even if we have
* prod_head > cons_tail). So 'free_entries' is always between 0
* and size(ring)-1. */
free_entries = (mask + cons_tail - prod_head);

/* check that we have enough room in ring */
if (unlikely(n > free_entries)) { // 入队的 obj 数量大于队列空余空间
if (behavior == RTE_RING_QUEUE_FIXED) { // 固定数量入队模式:无法全部入队,异常退出
__RING_STAT_ADD(r, enq_fail, n);
return -ENOBUFS;
}
else { // 尽力而为入队模式:入队全部空余空间,即入队 free_entries 个 obj
/* No free entry available */
if (unlikely(free_entries == 0)) {
__RING_STAT_ADD(r, enq_fail, n);
return 0;
}

n = free_entries; // 实际入队 free_entries 个 obj
}
}

prod_next = prod_head + n;
/*
* 这里与 sp 模式不同,mp 模式下使用 CAS 指令实现原子比较并交换,这行代码试图在同一个时间点内,
* 判断并更新 r->prod.head 的值,即当 r->prod.head 等于 prod_head 时,更新其值为 prod_next
* 这种操作通常用于多线程编程中的同步,以确保没有两个线程同时修改共享变量 r->prod.head
*/
success = rte_atomic32_cmpset(&r->prod.head, prod_head, prod_next);
} while (unlikely(success == 0)); // 操作失败时,再来一次

/* write entries in ring */
ENQUEUE_PTRS(); // 入队操作(与单生产者入队操作执行过程一样)
rte_smp_wmb(); // 内存屏蔽

/* if we exceed the watermark */
if (unlikely(((mask + 1) - free_entries + n) > r->prod.watermark)) { // 已经用的空间 + 本次实际使用的空间 大于 水位线
ret = (behavior == RTE_RING_QUEUE_FIXED) ? -EDQUOT :
(int)(n | RTE_RING_QUOT_EXCEED); // 按模式指定异常返回值
__RING_STAT_ADD(r, enq_quota, n);
}
else {
ret = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : n; // 返回对应模式的成功状态下的返回值
__RING_STAT_ADD(r, enq_success, n);
}

/*
* If there are other enqueues in progress that preceded us,
* we need to wait for them to complete
* 如果在我们之前,还有其他更早的生产者线程正在入队,
* 我们需要等待他们完成,然后再更新 prod.tail
*/
while (unlikely(r->prod.tail != prod_head)) {
rte_pause();

/* Set RTE_RING_PAUSE_REP_COUNT to avoid spin too long waiting
* for other thread finish. It gives pre-empted thread a chance
* to proceed and finish with ring dequeue operation. */
if (RTE_RING_PAUSE_REP_COUNT &&
++rep == RTE_RING_PAUSE_REP_COUNT) {
rep = 0;
sched_yield();
}
}
r->prod.tail = prod_next; // 修改 ring 结构中的 prod.tail 指针(mp 模式下,这里等于 CAS 更新后的 r->prod.head 的值)
return ret;
}

CAS 函数源码

x86 平台的实现:lib\librte_eal\common\include\arch\x86\rte_atomic.h

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/*------------------------- 32 bit atomic operations -------------------------*/

static inline int
rte_atomic32_cmpset(volatile uint32_t *dst, uint32_t exp, uint32_t src)
{
uint8_t res;

asm volatile(
MPLOCKED
"cmpxchgl %[src], %[dst];"
"sete %[res];"
: [res] "=a" (res), /* output */
[dst] "=m" (*dst)
: [src] "r" (src), /* input */
"a" (exp),
"m" (*dst)
: "memory"); /* no-clobber list */
return res;
}

多消费者 mc 出队

多消费者出队的实现,CAS 实现差不多和上面的多生产者一样,出队和单消费者实现一样。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
/**
* @internal Dequeue several objects from a ring (multi-consumers safe). When
* the request objects are more than the available objects, only dequeue the
* actual number of objects
*
* This function uses a "compare and set" instruction to move the
* consumer index atomically.
*
* @param r
* A pointer to the ring structure.
* @param obj_table
* A pointer to a table of void * pointers (objects) that will be filled.
* @param n
* The number of objects to dequeue from the ring to the obj_table.
* @param behavior
* RTE_RING_QUEUE_FIXED: Dequeue a fixed number of items from a ring
* RTE_RING_QUEUE_VARIABLE: Dequeue as many items a possible from ring
* @return
* Depend on the behavior value
* if behavior = RTE_RING_QUEUE_FIXED
* - 0: Success; objects dequeued.
* - -ENOENT: Not enough entries in the ring to dequeue; no object is
* dequeued.
* if behavior = RTE_RING_QUEUE_VARIABLE
* - n: Actual number of objects dequeued.
*/

static inline int __attribute__((always_inline))
__rte_ring_mc_do_dequeue(struct rte_ring *r, void **obj_table,
unsigned n, enum rte_ring_queue_behavior behavior)
{
uint32_t cons_head, prod_tail; // 假设无边界时,两指针的相对顺序:......____ch------pt________...
uint32_t cons_next, entries;
const unsigned max = n;
int success;
unsigned i, rep = 0;
uint32_t mask = r->prod.mask;

/* Avoid the unnecessary cmpset operation below, which is also
* potentially harmful when n equals 0. */
if (n == 0)
return 0;

/* move cons.head atomically */
do {
/* Restore n as it may change every loop */
n = max;

cons_head = r->cons.head;
prod_tail = r->prod.tail;
/* The subtraction is done between two unsigned 32bits value
* (the result is always modulo 32 bits even if we have
* cons_head > prod_tail). So 'entries' is always between 0
* and size(ring)-1. */
entries = (prod_tail - cons_head);

/* Set the actual entries for dequeue */
if (n > entries) {
if (behavior == RTE_RING_QUEUE_FIXED) {
__RING_STAT_ADD(r, deq_fail, n);
return -ENOENT;
}
else {
if (unlikely(entries == 0)){
__RING_STAT_ADD(r, deq_fail, n);
return 0;
}

n = entries;
}
}

cons_next = cons_head + n;
success = rte_atomic32_cmpset(&r->cons.head, cons_head, cons_next);
} while (unlikely(success == 0));

/* copy in table */
DEQUEUE_PTRS();
rte_smp_rmb();

/*
* If there are other dequeues in progress that preceded us,
* we need to wait for them to complete
*/
while (unlikely(r->cons.tail != cons_head)) {
rte_pause();

/* Set RTE_RING_PAUSE_REP_COUNT to avoid spin too long waiting
* for other thread finish. It gives pre-empted thread a chance
* to proceed and finish with ring dequeue operation. */
if (RTE_RING_PAUSE_REP_COUNT &&
++rep == RTE_RING_PAUSE_REP_COUNT) {
rep = 0;
sched_yield();
}
}
__RING_STAT_ADD(r, deq_success, n);
r->cons.tail = cons_next;

return behavior == RTE_RING_QUEUE_FIXED ? 0 : n;
}

参考资料:

  1. https://www.jianshu.com/p/ab14b3d3aa83
  2. https://blog.csdn.net/u012630961/article/details/80974857
  3. https://elixir.bootlin.com/dpdk/v2.2.0/source/lib/librte_ring/rte_ring.c