UDP Performance Improvements (Theory)

While a lot of work has gone into optimizing TCP implementations over the years, including offload capabilities in both software (operating systems) and hardware (network interfaces), UDP has not received nearly as much attention, which puts QUIC at a disadvantage. This post looks at a few tricks that help mitigate this disadvantage for UDP and, by extension, for QUIC.

sendmsg

Currently the code that implements QUIC in NGINX uses the sendmsg() system call to send a single UDP packet at a time.

ssize_t sendmsg(int sockfd, const struct msghdr *msg, int flags);

A struct msghdr points to an array of struct iovec (msg_iov, with msg_iovlen entries), and each iovec describes one buffer. However, all of the buffers referenced by a single msghdr are merged into a single UDP datagram during transmission: the kernel encapsulates the combined data in one UDP packet and sends it over the wire.

/* Structure describing messages sent by `sendmsg' and received by `recvmsg'.  */
struct msghdr
{
    void *msg_name;           /* Address to send to/receive from.  */
    socklen_t msg_namelen;    /* Length of address data.  */

    struct iovec *msg_iov;    /* Vector of data to send/receive into.  */
    size_t msg_iovlen;        /* Number of elements in the vector.  */

    void *msg_control;        /* Ancillary data (eg BSD filedesc passing). */
    size_t msg_controllen;    /* Ancillary data buffer length.
                                 !! The type should be socklen_t but the
                                 definition of the kernel is incompatible
                                 with this.  */

    int msg_flags;            /* Flags on received message.  */
};

/* Structure for scatter/gather I/O.  */
struct iovec
{
    void *iov_base;           /* Pointer to data.  */
    size_t iov_len;           /* Length of data.  */
};
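To make the "one msghdr, one datagram" rule concrete, here is a minimal sketch. It assumes an AF_UNIX datagram socketpair instead of a UDP socket so it runs without any network setup; datagram sockets of both families gather all iovecs of one sendmsg() call into a single message. The helper names send_gathered and gather_demo are hypothetical.

```c
#include <assert.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/uio.h>
#include <unistd.h>

/* Send hdr and payload as ONE datagram: both buffers are gathered through
 * a single msghdr. fd must be a connected datagram socket, so msg_name
 * can stay NULL. Returns whatever sendmsg() returns. */
static ssize_t send_gathered(int fd, const void *hdr, size_t hdr_len,
                             const void *payload, size_t payload_len)
{
    struct iovec iov[2] = {
        { .iov_base = (void *)hdr, .iov_len = hdr_len },
        { .iov_base = (void *)payload, .iov_len = payload_len },
    };
    struct msghdr msg;
    memset(&msg, 0, sizeof(msg));
    msg.msg_iov = iov;
    msg.msg_iovlen = 2;
    return sendmsg(fd, &msg, 0);
}

/* Demo over an AF_UNIX datagram socketpair: send a 4-byte header plus a
 * 7-byte payload, then read ONE datagram into out.
 * Returns the received length, or -1 on error. */
static ssize_t gather_demo(char *out, size_t cap)
{
    int sv[2];
    assert(out != NULL && cap > 0);
    if (socketpair(AF_UNIX, SOCK_DGRAM, 0, sv) < 0)
        return -1;

    ssize_t n = -1;
    if (send_gathered(sv[0], "HDR:", 4, "payload", 7) == 11)
        n = recv(sv[1], out, cap, MSG_DONTWAIT);   /* one recv() returns one datagram */

    close(sv[0]);
    close(sv[1]);
    return n;
}
```

A single recv() returns all 11 bytes at once, because the two iovecs became one datagram.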

sendmmsg

Because sendmsg() sends only a single UDP packet per call, it has to be invoked many times to transmit all of the QUIC packets needed to deliver the requested resources.

Each of those system calls incurs an expensive switch between the application and the kernel, which hurts throughput.

But while sendmsg() transmits only one UDP packet per invocation, its close cousin sendmmsg() (note the extra "m" in the name) can batch multiple packets in a single system call:

int sendmmsg(int sockfd, struct mmsghdr *msgvec, unsigned int vlen, int flags);

Multiple struct mmsghdr structures are passed to the kernel as an array. Each one carries its own struct msghdr with its own struct iovec array, and each element of the msgvec array represents a single UDP datagram.

/* For `recvmmsg' and `sendmmsg'.  */
struct mmsghdr
{
    struct msghdr msg_hdr;    /* Actual message header.  */
    unsigned int msg_len;     /* Number of received or sent bytes for the entry.  */
};

Example call (note that msg_len is an output field that the kernel fills in for each entry):

int ret = sendmmsg(sockfd, msgs, num_packets, 0);
if (ret < 0) {
    perror("sendmmsg");
} else {
    printf("sendmmsg sent %d out of %d packets\n", ret, num_packets);

    // Count the bytes sent: the kernel stores each entry's byte count in msg_len
    size_t total_sent = 0;
    for (int i = 0; i < ret; i++) {
        total_sent += msgs[i].msg_len;
    }
    printf("Total bytes sent via sendmmsg: %zu\n", total_sent);
}
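The call above reports how many messages were sent but does not retry the rest. The sendmmsg(2) man page notes that the return value can be smaller than vlen, for example on a non-blocking socket or when an error occurs after at least one message was sent. A minimal retry loop is sketched below; send_all_mmsg and mmsg_demo are hypothetical names, and the demo assumes an AF_UNIX datagram socketpair so it runs without a network.

```c
#define _GNU_SOURCE            /* sendmmsg() and struct mmsghdr need _GNU_SOURCE */
#include <assert.h>
#include <errno.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/uio.h>
#include <unistd.h>

/* Keep calling sendmmsg() from the first unsent entry until all n messages
 * are queued. Returns the number sent, or -1 if the first call fails.
 * EINTR is retried. */
static int send_all_mmsg(int fd, struct mmsghdr *msgs, unsigned int n)
{
    unsigned int done = 0;
    while (done < n) {
        int ret = sendmmsg(fd, msgs + done, n - done, 0);
        if (ret < 0 && errno == EINTR)
            continue;
        if (ret <= 0)                        /* error (or nothing sent): stop */
            return done > 0 ? (int)done : -1;
        done += (unsigned int)ret;           /* partial batch: retry the rest */
    }
    return (int)done;
}

/* Demo: three datagrams of 100, 200 and 300 bytes in one batch. Stores the
 * received sizes in lens[] and returns the number of messages sent. */
static int mmsg_demo(ssize_t lens[3])
{
    static char a[100], b[200], c[300];
    struct iovec iov[3] = { { a, sizeof(a) }, { b, sizeof(b) }, { c, sizeof(c) } };
    struct mmsghdr msgs[3];
    int sv[2];

    if (socketpair(AF_UNIX, SOCK_DGRAM, 0, sv) < 0)
        return -1;

    memset(msgs, 0, sizeof(msgs));
    for (int i = 0; i < 3; i++) {
        msgs[i].msg_hdr.msg_iov = &iov[i];   /* one entry = one datagram */
        msgs[i].msg_hdr.msg_iovlen = 1;
    }

    int sent = send_all_mmsg(sv[0], msgs, 3);
    for (int i = 0; i < sent; i++) {
        char buf[512];
        lens[i] = recv(sv[1], buf, sizeof(buf), MSG_DONTWAIT);
        assert(msgs[i].msg_len == (unsigned int)iov[i].iov_len);  /* filled in by the kernel */
    }

    close(sv[0]);
    close(sv[1]);
    return sent;
}
```

Each entry arrives as its own datagram, which is the difference from sendmsg() with several iovecs.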

UDP segmentation offload

How it works

With both sendmsg() and sendmmsg(), the application must place each QUIC packet in its own buffer so that the kernel can transmit it. NGINX uses static buffers for this, so there is no allocation overhead, but the kernel still has to traverse all of these buffers during transmission, which can add significant overhead.

Linux supports Generic Segmentation Offload (GSO), which lets the application pass a single "super buffer" to the kernel, which then takes care of segmenting it into smaller packets. The kernel postpones segmentation as long as possible to reduce the overhead of traversing outgoing buffers (some NICs even support segmentation in hardware). GSO was originally available only for TCP; UDP GSO support was added in Linux 4.18.

This feature can be controlled using the UDP_SEGMENT socket option:

#ifndef SOL_UDP
#define SOL_UDP (17)
#endif
#ifndef UDP_SEGMENT
#define UDP_SEGMENT (103)
#endif
#ifndef UDP_GRO
#define UDP_GRO (104)
#endif
#ifndef UDP_MAX_SEGMENTS
#define UDP_MAX_SEGMENTS (1 << 6UL)
#endif

int use_gso = -1;
socklen_t opt_len = sizeof(use_gso);
if (getsockopt(sockfd, SOL_UDP, UDP_SEGMENT, &use_gso, &opt_len) < 0) {
    printf("GSO not supported by system, strerror: %s\n", strerror(errno));
} else {
    int gso_size = (1500 - 20 - 8); // MTU - IPv4 header - UDP header
    if (setsockopt(sockfd, SOL_UDP, UDP_SEGMENT, &gso_size, sizeof(gso_size)) < 0) {
        printf("setsockopt: UDP GSO enabled with size %d failed, strerror: %s\n", gso_size, strerror(errno));
    } else {
        printf("UDP GSO supported: UDP GSO enabled with size %d\n", gso_size);
    }
}

It can also be set per call through ancillary data (a control message) passed to sendmsg():

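char control[CMSG_SPACE(sizeof(uint16_t))] = { 0 };
msg.msg_control = control;               // CMSG_FIRSTHDR() needs a control buffer first
msg.msg_controllen = sizeof(control);

struct cmsghdr *cm = CMSG_FIRSTHDR(&msg);
cm->cmsg_level = SOL_UDP;
cm->cmsg_type = UDP_SEGMENT;
cm->cmsg_len = CMSG_LEN(sizeof(uint16_t));
*((uint16_t *) CMSG_DATA(cm)) = gso_size;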

Here gso_size is the size of each segment in the "super buffer" that the application passes to the kernel. Once configured, the application can provide one contiguous large buffer containing several packets of exactly gso_size bytes (optionally followed by one final, shorter packet); the kernel, or the NIC if hardware segmentation offload is supported and enabled, then splits it into individual datagrams.
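The control-message setup above can be wrapped in a small helper. This is a sketch under stated assumptions: attach_udp_segment, find_udp_segment and union udp_segment_ctrl are hypothetical names, the union gives the control buffer the alignment that cmsg(3) recommends, and the payload is written with memcpy() to avoid an unaligned store. The kernel rejects a UDP_SEGMENT message whose cmsg_len is not CMSG_LEN(sizeof(uint16_t)).

```c
#include <assert.h>
#include <netinet/in.h>
#include <netinet/udp.h>
#include <stdint.h>
#include <string.h>
#include <sys/socket.h>

#ifndef SOL_UDP
#define SOL_UDP 17
#endif
#ifndef UDP_SEGMENT
#define UDP_SEGMENT 103
#endif

/* Control buffer with the alignment CMSG_FIRSTHDR() expects (see cmsg(3)). */
union udp_segment_ctrl {
    char buf[CMSG_SPACE(sizeof(uint16_t))];
    struct cmsghdr align;
};

/* Attach a UDP_SEGMENT cmsg carrying gso_size to msg.
 * ctrl must stay alive until sendmsg() returns. */
static void attach_udp_segment(struct msghdr *msg, union udp_segment_ctrl *ctrl,
                               uint16_t gso_size)
{
    assert(gso_size > 0);
    memset(ctrl, 0, sizeof(*ctrl));
    msg->msg_control = ctrl->buf;
    msg->msg_controllen = sizeof(ctrl->buf);

    struct cmsghdr *cm = CMSG_FIRSTHDR(msg);
    cm->cmsg_level = SOL_UDP;
    cm->cmsg_type = UDP_SEGMENT;
    cm->cmsg_len = CMSG_LEN(sizeof(uint16_t));   /* payload is exactly a 16-bit value */
    memcpy(CMSG_DATA(cm), &gso_size, sizeof(gso_size));
}

/* Walk the control messages and return the UDP_SEGMENT value (0 if absent). */
static uint16_t find_udp_segment(struct msghdr *msg)
{
    for (struct cmsghdr *cm = CMSG_FIRSTHDR(msg); cm != NULL; cm = CMSG_NXTHDR(msg, cm)) {
        if (cm->cmsg_level == SOL_UDP && cm->cmsg_type == UDP_SEGMENT) {
            uint16_t v;
            memcpy(&v, CMSG_DATA(cm), sizeof(v));
            return v;
        }
    }
    return 0;
}
```

Usage: fill msg_name and msg_iov as usual, call attach_udp_segment(&msg, &ctrl, 1216), then sendmsg(fd, &msg, 0).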


Up to 64 segments can be batched with the UDP_SEGMENT option.

As expected, the number of system calls also drops significantly compared with plain sendmsg().

GSO can also be combined with sendmmsg() for an even bigger improvement. The idea is that each struct msghdr can be segmented in the kernel by setting the UDP_SEGMENT option through ancillary data, so the application can pass multiple "super buffers", each carrying up to 64 segments, to the kernel in a single system call.


sendmsg + GSO: practical code

The code below is abstracted from a BSD TCP socket-style API that I recently built on top of Alibaba's XQUIC.

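Consider a msg_iov array made up of several independent QUIC packets (not one contiguous super buffer) in which most packets have the same length (not necessarily the gso_size configured with setsockopt, but a fixed length no larger than it), while a few packets have other lengths. Such an array has to be sent in runs: whenever the length changes, call sendmsg once to send all of the preceding equal-length packets with GSO, then continue with the packets that follow.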

// Sends msg_iov[0..vlen) as individual UDP datagrams, batching each run of
// equal-length packets into one GSO sendmsg() call.
// Returns the number of packets handed to the kernel, or -1 if nothing was sent.
// PKT_MAX_MSS (maximum packet size) and UDP_MAX_SEGMENTS come from the surrounding project.
ssize_t sendmsg_with_gso(const struct iovec *msg_iov, unsigned int vlen,
                         const struct sockaddr *peer_addr, socklen_t peer_addrlen, int fd)
{
    ssize_t res;
    ssize_t sent_batch = 0;
    unsigned int start = 0;
    // The packets combined in one sendmsg call must not exceed the 64 KB UDP limit,
    // otherwise the call fails with "Message too long" (EMSGSIZE),
    // and one call may carry at most UDP_MAX_SEGMENTS segments.
    const unsigned int by_bytes = 65536 / PKT_MAX_MSS;
    const unsigned int max_sendmsg_once =
        UDP_MAX_SEGMENTS < by_bytes ? UDP_MAX_SEGMENTS : by_bytes;

    struct msghdr msg;
    memset(&msg, 0, sizeof(msg));
    msg.msg_name = (void *)peer_addr;
    msg.msg_namelen = peer_addrlen;

    char control[CMSG_SPACE(sizeof(uint16_t))];
    memset(control, 0, sizeof(control));

    for (unsigned int i = 1; i <= vlen; i++) {
        // Extend the run while lengths stay equal, capping the datagrams per sendmsg call
        if (i < vlen && msg_iov[i].iov_len == msg_iov[start].iov_len &&
            i - start < max_sendmsg_once) {
            continue;
        }

        unsigned int batch_len = i - start;
        const struct iovec *batch_iov = &msg_iov[start];
        size_t segment_size = msg_iov[start].iov_len;

        msg.msg_iov = (struct iovec *)batch_iov;
        msg.msg_iovlen = batch_len;

        // Only use GSO when the run holds more than one packet
        if (batch_len > 1) {
            msg.msg_control = control;
            msg.msg_controllen = sizeof(control);

            struct cmsghdr *cm = CMSG_FIRSTHDR(&msg);
            cm->cmsg_level = SOL_UDP;
            cm->cmsg_type = UDP_SEGMENT;
            cm->cmsg_len = CMSG_LEN(sizeof(uint16_t));
            *((uint16_t *)CMSG_DATA(cm)) = (uint16_t)segment_size;
        } else {
            msg.msg_control = NULL;
            msg.msg_controllen = 0;
        }

        res = sendmsg(fd, &msg, 0);
        if (res < 0) {
            // e.g. EAGAIN on a non-blocking socket: report what was sent so far
            return sent_batch > 0 ? sent_batch : -1;
        }

        sent_batch += batch_len;
        start = i;
    }

    return sent_batch;
}
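The run-splitting rule in sendmsg_with_gso can be pulled out into a pure function, which makes it easy to test. The sketch below is a variant, not the code above: because GSO allows the final segment to be shorter than the segment size, it can optionally let one shorter packet close a run (allow_short_tail), saving a system call whenever a short packet follows a run of equal ones. gso_run_length is a hypothetical name, and max_segs plays the role of max_sendmsg_once.

```c
#include <assert.h>
#include <stddef.h>

/* Number of packets, starting at lens[start], that can go into one GSO
 * sendmsg() call. The run holds consecutive packets equal in length to
 * lens[start], capped at max_segs. If allow_short_tail is set, one
 * following packet SHORTER than lens[start] may close the run, since
 * GSO permits a shorter final segment. */
static size_t gso_run_length(const size_t *lens, size_t n, size_t start,
                             size_t max_segs, int allow_short_tail)
{
    assert(start < n && max_segs > 0);
    size_t seg = lens[start];
    size_t i = start + 1;

    while (i < n && i - start < max_segs && lens[i] == seg)
        i++;                                /* extend the equal-length run */

    if (allow_short_tail && i < n && i - start < max_segs && lens[i] < seg)
        i++;                                /* a shorter packet ends the run */

    return i - start;
}
```

With allow_short_tail set to 0 it reproduces the equal-length grouping used above; for lengths 1200, 1200, 1100 it yields a run of 2 without the tail and 3 with it.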

Large packets, small packets, and GSO

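When an application sends UDP datagrams, it usually keeps each one below MTU - sizeof(IP header) - sizeof(UDP header) to avoid IP-layer fragmentation. Fragmentation is worth avoiding because the extra IP fragmentation and reassembly can cause a number of problems: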

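  • When the IP fragments of one upper-layer packet are sent, losing any single fragment forces the whole upper-layer packet to be retransmitted.
  • To complete reassembly, the receiver has to allocate extra memory and other resources to store and track the fragments it has received so far. If the sender emits a large number of fragments, the receiver's fragment buffer can fill up.
  • Worse, if one packet is split into too many fragments, the receiver may never be able to complete reassembly. As an extreme example, suppose receiver R can buffer at most 4 IP fragments, and sender S sends one 8000-byte UDP datagram. With an MTU of 1500, the datagram is split into 6 fragments, so R can never reassemble it and the transmission keeps failing.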
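The arithmetic behind the 6-fragment example can be checked with a small helper. It assumes IPv4 with a 20-byte header (no options) and an 8-byte UDP header, and it ignores the receiver's actual reassembly limits; ipv4_fragment_count is a hypothetical name.

```c
#include <assert.h>
#include <stddef.h>

/* Number of IPv4 fragments needed for one UDP datagram carrying
 * udp_payload bytes. Every fragment except the last must carry a
 * multiple of 8 bytes of IP payload, and the UDP header travels in
 * the first fragment. */
static size_t ipv4_fragment_count(size_t udp_payload, size_t mtu)
{
    const size_t ip_hdr = 20, udp_hdr = 8;
    assert(mtu >= ip_hdr + 8);

    size_t ip_payload = udp_payload + udp_hdr;
    size_t per_frag = ((mtu - ip_hdr) / 8) * 8;      /* 1480 for MTU 1500 */
    return (ip_payload + per_frag - 1) / per_frag;   /* ceiling division */
}
```

For the example above, 8000 + 8 = 8008 bytes of IP payload at 1480 bytes per fragment needs 6 fragments, while a 1472-byte payload (the MTU - 20 - 8 limit) still fits in one.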

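In short, with UDP GSO the kernel receives a batch of application data D, splits D into chunks according to the configuration the application provided earlier, adds a UDP header to each chunk, and sends each chunk as its own UDP datagram. We use gsosize for the maximum chunk size that the application tells the kernel.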

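Take QUIC as an example. When the application wants to send 2 KB of content without GSO, a QUIC implementation typically splits it across 2 QUIC packets and sends them as 2 UDP datagrams, as in the pseudocode below. This clearly takes two sendto system calls: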

// Wrap the first 1 KB of application data into a QUIC packet, then send it with sendto
sendto(fd, "quic short packet header + frame header + [0, 1k) app data");
// Send the remaining 1 KB of application data the same way
sendto(fd, "quic short packet header + frame header + [1k, 2k) app data");

With GSO, the application first tells the kernel the gsosize value through SOL_UDP + UDP_SEGMENT, and then assembles the data according to gsosize:

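// Segment 1: its total length must be exactly gsosize, otherwise part1 is not a complete QUIC packet
part1 = "quic short packet header + frame header + app data"
// Segment 2: carries the remaining application data; it may be shorter than gsosize
part2 = "quic short packet header + frame header + app data"

// Concatenate segments 1 and 2 into one large buffer and send it with a single sendto call.
// Every segment except the last must be exactly gsosize bytes.
sendto(fd, part1 + part2); // part1 + part2 is one contiguous super buffer
// The kernel splits the data at gsosize back into 'segment 1' and 'segment 2' and sends them as two UDP datagrams.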

This also shows that GSO is transparent to the receiver: every UDP datagram sent after GSO segmentation is a complete, standalone datagram.
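The split rule described above (full segments of gsosize plus one shorter tail) can be expressed as simple arithmetic. A minimal sketch, modelling only that rule and not the kernel's other checks (segment-count limit, 64 KB datagram limit, MTU validation); gso_segments is a hypothetical name.

```c
#include <assert.h>
#include <stddef.h>

/* How a GSO super buffer is cut: full segments of gso_size bytes, plus a
 * shorter final one if total is not a multiple of gso_size. Returns the
 * number of UDP datagrams and stores the final datagram's size in *last. */
static size_t gso_segments(size_t total, size_t gso_size, size_t *last)
{
    assert(gso_size > 0 && total > 0);
    size_t n = (total + gso_size - 1) / gso_size;   /* ceiling division */
    *last = total - (n - 1) * gso_size;
    return n;
}
```

For instance, 38912 bytes (32 packets of 1216 bytes) cut with a segment size of 1472, as in the socket-level GSO tests of the practice part, come out as 27 datagrams with a 640-byte last one, so the datagram boundaries no longer line up with the 1216-byte packets. With a segment size of 1216 the same buffer yields exactly 32 datagrams.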

UDP GRO

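UDP GRO confused me for a long time. Taken literally, GRO means the kernel tries to coalesce several received packets at the lowest protocol layer before passing them up, so the upper layers see only a single packet. For TCP GRO this makes sense, because the kernel follows TCP semantics when merging: if it receives three consecutive TCP segments (say, numbered 33, 34 and 35), it can merge them into one before passing them to the upper layer. But most applications that use UDP as their transport rely on UDP datagram boundaries. In a QUIC short header packet, for example, the packet header has no length field; the packet size is implied by the length field of the UDP header. So once GRO merges several UDP datagrams into one, how is the application supposed to recover those boundaries?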

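I searched Google for a long time and asked several experts without getting an answer. Just as I was about to give up, I looked at the kernel's GRO/GSO selftests and finally understood how UDP GRO coalesces packets. It is simple: UDP datagrams are merged into one large buffer only when they have the same size (the final one may be shorter), and the kernel also tells the application the original datagram size, so the application can recover the datagram boundaries when needed. As shown in udpgso_bench_rx.c, an application receives packets with UDP GRO enabled like this: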

static int recv_msg(int fd, char *buf, int len, int *gso_size)
{
    // The kernel reports the UDP_GRO segment size as an int
    char control[CMSG_SPACE(sizeof(int))] = {0};
    struct msghdr msg = {0};
    struct iovec iov = {0};
    struct cmsghdr *cmsg;
    int ret;

    iov.iov_base = buf;
    iov.iov_len = len;

    msg.msg_iov = &iov;
    msg.msg_iovlen = 1;

    msg.msg_control = control;
    msg.msg_controllen = sizeof(control);

    *gso_size = -1;   // stays -1 if no UDP_GRO cmsg arrives: buf holds a single datagram

    // Example: the sender sends six datagrams of 1200, 1200, 1100, 1200, 1200 and 1200 bytes.
    // GRO merges equal-sized datagrams and may append one shorter datagram as the
    // last segment, so the receiver may get, for instance:
    //   call 1: 3500 bytes in buf, *gso_size == 1200 (1200 + 1200 + 1100)
    //   call 2: 3600 bytes in buf, *gso_size == 1200
    // GRO is best effort, so other groupings are possible; always split buf by *gso_size.
    ret = recvmsg(fd, &msg, MSG_TRUNC | MSG_DONTWAIT);
    if (ret != -1) {
        for (cmsg = CMSG_FIRSTHDR(&msg); cmsg != NULL;
             cmsg = CMSG_NXTHDR(&msg, cmsg)) {
            if (cmsg->cmsg_level == SOL_UDP && cmsg->cmsg_type == UDP_GRO) {
                memcpy(gso_size, CMSG_DATA(cmsg), sizeof(*gso_size));
                break;
            }
        }
    }

    return ret;
}

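When recv_msg() returns ret >= 0, the first ret bytes of buf may hold one or more UDP datagrams concatenated together, and *gso_size holds the size of each original datagram. The application can split buf every *gso_size bytes and process each datagram: every datagram except the last is exactly *gso_size bytes, and the last one may be shorter. If *gso_size is still -1, no UDP_GRO control message arrived and buf holds a single datagram.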
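The splitting step can be sketched as a pure function. It assumes ret did not exceed the buffer size (with MSG_TRUNC, recvmsg() can report more bytes than fit), and it assumes the receiving socket opted in to GRO beforehand with setsockopt(fd, SOL_UDP, UDP_GRO, &one, sizeof(one)); without that, no UDP_GRO control message is delivered. split_gro_buffer is a hypothetical name.

```c
#include <assert.h>
#include <stddef.h>

/* Given the byte count returned by recv_msg() and the reported gso_size,
 * compute the lengths of the original datagrams. gso_size <= 0 means no
 * UDP_GRO cmsg arrived, so the buffer is a single datagram.
 * Writes up to max lengths into out[] and returns the datagram count. */
static size_t split_gro_buffer(size_t len, int gso_size, size_t *out, size_t max)
{
    assert(out != NULL || max == 0);
    if (gso_size <= 0 || len == 0) {          /* single (possibly empty) datagram */
        if (max > 0)
            out[0] = len;
        return 1;
    }

    size_t seg = (size_t)gso_size;
    size_t count = 0;
    for (size_t off = 0; off < len; off += seg, count++) {
        if (count < max)
            out[count] = (len - off < seg) ? len - off : seg;   /* last may be shorter */
    }
    return count;
}
```

For the first call in the recv_msg() example, 3500 bytes with gso_size 1200 split into 1200, 1200 and 1100.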

UDP Performance Improvements (Practice)

Enabling and disabling socket-level GSO

#define GSO_SIZE (1500 - 20 - 8) // GSO segment size: MTU - IPv4 header - UDP header

// Enable UDP GSO on the socket
int enable_udp_gso(int sockfd)
{
    int gso_size = GSO_SIZE;

    if (setsockopt(sockfd, SOL_UDP, UDP_SEGMENT, &gso_size, sizeof(gso_size)) < 0) {
        perror("setsockopt UDP_SEGMENT failed");
        return -1;
    }

    printf("UDP GSO enabled with segment size: %d\n", gso_size);
    return 0;
}

// Disable UDP GSO on the socket (a segment size of 0 turns it off)
int disable_udp_gso(int sockfd)
{
    int gso_size = 0;

    if (setsockopt(sockfd, SOL_UDP, UDP_SEGMENT, &gso_size, sizeof(gso_size)) < 0) {
        perror("setsockopt UDP_SEGMENT disable failed");
        return -1;
    }

    printf("UDP GSO disabled\n");
    return 0;
}

Sending the same amount of data with different system calls (client side)

Each function sends num_packets packets of packet_size bytes, packet_size * num_packets bytes in total, but each one uses a different system call and tuning strategy.

sendto (sending many small packets)

void send_sendto(int sockfd, struct sockaddr_in *dest_addr,
                 int packet_size, int num_packets)
{
    size_t total_size = (size_t)packet_size * num_packets;
    size_t total_sent_size = 0;
    char *send_buffer = malloc(packet_size);

    if (!send_buffer) {
        perror("malloc failed for send buffer");
        return;
    }

    memset(send_buffer, 'X', packet_size);
    send_buffer[packet_size - 1] = '\0';

    printf("Sending %d packets using sendto, total buffer: %zu bytes\n",
           num_packets, total_size);

    for (int i = 0; i < num_packets; i++) {
        int written = snprintf(send_buffer, packet_size,
                               "Send packet %d at timestamp: %ld, total packets: %d",
                               i, (long)time(NULL), num_packets);
        // Remove the NUL written by snprintf so the receiver can print the whole buffer
        // (only if snprintf did not truncate, so the index stays inside the buffer)
        if (written >= 0 && written < packet_size)
            send_buffer[written] = 'X';

        ssize_t sent = sendto(sockfd, send_buffer, packet_size, 0,
                              (struct sockaddr *)dest_addr, sizeof(*dest_addr));

        if (sent < 0) {
            perror("sendto without GSO failed");
        } else {
            total_sent_size += sent;
            // printf("Send sent %zd bytes, expected %d bytes\n", sent, packet_size);
        }
    }
    printf("Approximately %zu packets sent without GSO, total sent %zu bytes\n",
           total_sent_size / packet_size, total_sent_size);

    free(send_buffer);
}

sendto + GSO (sending one large buffer)

Prerequisite: call enable_udp_gso(sockfd) first to enable GSO on sockfd.

void send_gso_batch(int sockfd, struct sockaddr_in *dest_addr,
                    int packet_size, int num_packets)
{
    // Total size of the super buffer
    size_t total_size = (size_t)packet_size * num_packets;
    char *gso_buffer = malloc(total_size);

    if (!gso_buffer) {
        perror("malloc failed for GSO buffer");
        return;
    }

    memset(gso_buffer, 'X', total_size);
    gso_buffer[total_size - 1] = '\0';

    // Prepare the data: write a header into the first bytes of each logical packet
    for (int i = 0; i < num_packets; i++) {
        char *packet_start = gso_buffer + ((size_t)i * packet_size);
        int written = snprintf(packet_start, packet_size,
                               "GSO Batch packet %d at timestamp: %ld, total packets: %d",
                               i, (long)time(NULL), num_packets);
        if (written >= 0 && written < packet_size)
            packet_start[written] = 'X';   // remove the NUL written by snprintf
    }

    printf("Sending %d packets using UDP GSO, total buffer: %zu bytes\n",
           num_packets, total_size);

    // Send the whole contiguous super buffer in one call. With socket-level GSO the kernel
    // cuts it every GSO_SIZE bytes (1472 here), not every packet_size bytes, so datagram
    // boundaries match the logical packets only when packet_size == GSO_SIZE.
    ssize_t sent = sendto(sockfd, gso_buffer, total_size, 0,
                          (struct sockaddr *)dest_addr, sizeof(*dest_addr));

    if (sent < 0) {
        perror("sendto with GSO failed");
    } else {
        printf("GSO sent %zd bytes, expected %zu bytes\n", sent, total_size);
        printf("Approximately %zd packets sent via GSO\n", sent / packet_size);
    }

    free(gso_buffer);
}

sendmsg + GSO (scatter-gather, sending many small packets)

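Prerequisite: when is_segment is 0, segmentation relies on socket-level GSO, so call enable_udp_gso(sockfd) first. When is_segment is 1, the per-call UDP_SEGMENT control message (set to packet_size) overrides the socket-level value.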

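This function sends one struct msghdr with a single sendmsg system call. The msghdr holds a struct iovec array of num_packets entries, and each entry points to one packet of packet_size bytes; the kernel gathers them into one buffer, which GSO then splits into UDP datagrams.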

void send_gso_batch_sendmsg(int sockfd, struct sockaddr_in *dest_addr,
                            int packet_size, int num_packets, int is_segment)
{
    // One allocation: the iovec array followed by all packet buffers
    size_t total_buffer_size = (size_t)packet_size * num_packets;
    char *memory_block = malloc(num_packets * sizeof(struct iovec) + total_buffer_size);

    if (!memory_block) {
        perror("malloc failed for memory block");
        return;
    }

    // Pointers into the block
    struct iovec *msg_iov = (struct iovec *)memory_block;
    char *data_buffer = memory_block + num_packets * sizeof(struct iovec);

    // Fill the data region (not the iovec region) with filler bytes
    memset(data_buffer, 'X', total_buffer_size);
    data_buffer[total_buffer_size - 1] = '\0';

    // Initialise the packets and iovecs
    for (int i = 0; i < num_packets; i++) {
        char *packet_start = data_buffer + ((size_t)i * packet_size);

        int written = snprintf(packet_start, packet_size,
                               "Optimized Merged Batch packet %d at timestamp: %ld, total packets: %d",
                               i, (long)time(NULL), num_packets);
        if (written >= 0 && written < packet_size)
            packet_start[written] = 'X';

        msg_iov[i].iov_base = packet_start;
        msg_iov[i].iov_len = packet_size;
    }

    // Set up the msghdr
    struct msghdr msg;
    memset(&msg, 0, sizeof(msg));
    msg.msg_name = (void *)dest_addr;
    msg.msg_namelen = sizeof(*dest_addr);
    msg.msg_iov = msg_iov;
    msg.msg_iovlen = num_packets;

    // The control buffer must outlive the sendmsg() call, so it lives at function scope
    char control[CMSG_SPACE(sizeof(uint16_t))] = { 0 };
    if (is_segment) {
        msg.msg_control = control;
        msg.msg_controllen = sizeof(control);

        // UDP_SEGMENT = packet_size: each iovec becomes exactly one datagram
        struct cmsghdr *cm = CMSG_FIRSTHDR(&msg);
        cm->cmsg_level = SOL_UDP;
        cm->cmsg_type = UDP_SEGMENT;
        cm->cmsg_len = CMSG_LEN(sizeof(uint16_t));
        *((uint16_t *)CMSG_DATA(cm)) = packet_size;
    }

    printf("Sending %d optimized merged packets, total size: %zu bytes\n",
           num_packets, total_buffer_size);

    ssize_t sent = sendmsg(sockfd, &msg, 0);

    if (sent < 0) {
        perror("sendmsg with optimized merged packets failed");
    } else {
        printf("Optimized merged batch sent %zd total bytes\n", sent);
    }

    free(memory_block);
}

sendmsg + GSO (sending one large buffer)

Prerequisite: none. This function always attaches a UDP_SEGMENT control message, so socket-level GSO does not need to be enabled.

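This function sends one struct msghdr with a single sendmsg system call. The msghdr holds a struct iovec array of length 1 that points to one contiguous super buffer of packet_size * num_packets bytes, and the kernel, or the NIC below it, handles segmentation offload for this oversized packet.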

void send_gso_batch_improved(int sockfd, struct sockaddr_in *dest_addr,
                             int packet_size, int num_packets)
{
    // Total size of the super buffer
    size_t total_size = (size_t)packet_size * num_packets;
    char *gso_buffer = malloc(total_size);

    if (!gso_buffer) {
        perror("malloc failed for GSO buffer");
        return;
    }

    memset(gso_buffer, 'X', total_size);
    gso_buffer[total_size - 1] = '\0';

    // Prepare the batch data
    for (int i = 0; i < num_packets; i++) {
        char *packet_start = gso_buffer + ((size_t)i * packet_size);
        int written = snprintf(packet_start, packet_size,
                               "GSO Batch packet %d at timestamp: %ld, total packets: %d",
                               i, (long)time(NULL), num_packets);
        if (written >= 0 && written < packet_size)
            packet_start[written] = 'X';
    }

    printf("Sending %d packets using UDP GSO, total buffer: %zu bytes\n",
           num_packets, total_size);

    struct iovec iov = {
        .iov_base = gso_buffer,
        .iov_len = total_size
    };

    struct msghdr msg = {
        .msg_name = dest_addr,
        .msg_namelen = sizeof(*dest_addr),
        .msg_iov = &iov,
        .msg_iovlen = 1,
        .msg_control = NULL,    // not needed when socket-level GSO is enabled
        .msg_controllen = 0,
        .msg_flags = 0
    };

    // Per-call UDP_SEGMENT, so segmentation also works with socket-level GSO disabled.
    // It uses GSO_SIZE (1472), so datagrams are cut every 1472 bytes, not every packet_size bytes.
    char control[CMSG_SPACE(sizeof(uint16_t))] = { 0 };
    msg.msg_control = control;
    msg.msg_controllen = sizeof(control);
    struct cmsghdr *cm = CMSG_FIRSTHDR(&msg);
    cm->cmsg_level = SOL_UDP;
    cm->cmsg_type = UDP_SEGMENT;
    cm->cmsg_len = CMSG_LEN(sizeof(uint16_t));
    *((uint16_t *)CMSG_DATA(cm)) = GSO_SIZE;

    ssize_t sent = sendmsg(sockfd, &msg, 0);

    if (sent < 0) {
        perror("sendmsg with GSO failed");
    } else {
        printf("GSO sent %zd bytes, expected %zu bytes, Approximately %zd packets sent via GSO\n",
               sent, total_size, sent / packet_size);

        if ((size_t)sent != total_size) {
            // UDP sendmsg() is all-or-nothing, so this is not expected to happen
            printf("Warn: Sent %zu bytes less than expected\n",
                   total_size - (size_t)sent);
        }
    }

    free(gso_buffer);
}

sendmmsg without GSO (sending small packets in a batch)

Prerequisite: none (GSO does not need to be enabled).

This function sends small packets in a batch: a single sendmmsg system call sends a struct mmsghdr array of num_packets entries. The struct msghdr in each entry holds a struct iovec array of length 1, which points to one small packet buffer of packet_size bytes.

void send_mmsg_batch(int sockfd, struct sockaddr_in *dest_addr,
                     int packet_size, int num_packets)
{
    struct mmsghdr msgs[num_packets];
    struct iovec iovecs[num_packets];
    char *buffers[num_packets];

    // Prepare each message
    for (int i = 0; i < num_packets; i++) {
        buffers[i] = malloc(packet_size);
        if (!buffers[i]) {
            perror("malloc failed for mmsg buffer");
            for (int j = 0; j < i; j++) {
                free(buffers[j]);
            }
            return;
        }

        memset(buffers[i], 'X', packet_size);
        buffers[i][packet_size - 1] = '\0';

        int written = snprintf(buffers[i], packet_size,
                               "sendmmsg packet %d at timestamp: %ld",
                               i, (long)time(NULL));
        if (written >= 0 && written < packet_size)
            buffers[i][written] = 'X';   // remove the NUL written by snprintf

        iovecs[i].iov_base = buffers[i];
        iovecs[i].iov_len = packet_size; // use the full packet size

        memset(&msgs[i], 0, sizeof(msgs[i]));
        msgs[i].msg_hdr.msg_name = dest_addr;
        msgs[i].msg_hdr.msg_namelen = sizeof(*dest_addr);
        msgs[i].msg_hdr.msg_iov = &iovecs[i];
        msgs[i].msg_hdr.msg_iovlen = 1;
    }

    printf("Sending %d packets using sendmmsg\n", num_packets);

    int ret = sendmmsg(sockfd, msgs, num_packets, 0);

    if (ret < 0) {
        perror("sendmmsg failed");
    } else {
        printf("sendmmsg sent %d out of %d packets\n", ret, num_packets);

        // Count the bytes sent
        size_t total_sent = 0;
        for (int i = 0; i < ret; i++) {
            total_sent += msgs[i].msg_len;
        }
        printf("Total bytes sent via sendmmsg: %zu\n", total_sent);
    }

    for (int i = 0; i < num_packets; i++) {
        free(buffers[i]);
    }
}

sendmmsg + GSO (sending small packets in a batch)

Prerequisite: call enable_udp_gso(sockfd) first to enable GSO on sockfd.

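This function sends small packets in a batch: a single sendmmsg system call sends a struct mmsghdr array of num_packets entries. The struct msghdr in each entry holds a struct iovec array of length 1, which points to one small packet buffer of packet_size bytes. The difference from send_mmsg_batch is that this function attaches a GSO control message to every message.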

void send_mmsg_gso_batch(int sockfd, struct sockaddr_in *dest_addr,
                         int packet_size, int num_packets)
{
    struct mmsghdr msgs[num_packets];
    struct iovec iovecs[num_packets];
    char *buffers[num_packets];
    char *control_buffers[num_packets]; // one control buffer per message

    // Prepare each message
    for (int i = 0; i < num_packets; i++) {
        buffers[i] = malloc(packet_size);
        if (!buffers[i]) {
            perror("malloc failed for mmsg_gso buffer");
            for (int j = 0; j < i; j++) {
                free(buffers[j]);
                free(control_buffers[j]);
            }
            return;
        }

        memset(buffers[i], 'X', packet_size);
        buffers[i][packet_size - 1] = '\0';

        int written = snprintf(buffers[i], packet_size,
                               "sendmmsg+GSO packet %d at timestamp: %ld",
                               i, (long)time(NULL));
        if (written >= 0 && written < packet_size)
            buffers[i][written] = 'X';

        iovecs[i].iov_base = buffers[i];
        iovecs[i].iov_len = packet_size; // use the full packet size

        // Space for the control message
        control_buffers[i] = malloc(CMSG_SPACE(sizeof(uint16_t)));
        if (!control_buffers[i]) {
            perror("malloc failed for control buffer");
            for (int j = 0; j <= i; j++) {
                free(buffers[j]);
            }
            for (int j = 0; j < i; j++) {
                free(control_buffers[j]);
            }
            return;
        }
        memset(control_buffers[i], 0, CMSG_SPACE(sizeof(uint16_t)));

        memset(&msgs[i], 0, sizeof(msgs[i]));
        msgs[i].msg_hdr.msg_name = dest_addr;
        msgs[i].msg_hdr.msg_namelen = sizeof(*dest_addr);
        msgs[i].msg_hdr.msg_iov = &iovecs[i];
        msgs[i].msg_hdr.msg_iovlen = 1;

        // GSO control message; CMSG_SPACE() is the usual msg_controllen for a single cmsg
        msgs[i].msg_hdr.msg_control = control_buffers[i];
        msgs[i].msg_hdr.msg_controllen = CMSG_SPACE(sizeof(uint16_t));

        struct cmsghdr *cmsg = CMSG_FIRSTHDR(&msgs[i].msg_hdr);
        cmsg->cmsg_level = SOL_UDP;
        cmsg->cmsg_type = UDP_SEGMENT;
        cmsg->cmsg_len = CMSG_LEN(sizeof(uint16_t));
        // Each message holds one packet_size (1216) byte packet, which is not larger than
        // GSO_SIZE (1472), so the kernel sends it as a single ordinary datagram.
        *((uint16_t *)CMSG_DATA(cmsg)) = GSO_SIZE;
    }

    printf("Sending %d packets using sendmmsg + UDP GSO\n", num_packets);

    int ret = sendmmsg(sockfd, msgs, num_packets, 0);

    if (ret < 0) {
        perror("sendmmsg with GSO failed");
    } else {
        printf("sendmmsg+GSO sent %d out of %d packets\n", ret, num_packets);

        // Count the bytes sent
        size_t total_sent = 0;
        for (int i = 0; i < ret; i++) {
            total_sent += msgs[i].msg_len;
        }
        printf("Total bytes sent via sendmmsg+GSO: %zu\n", total_sent);
    }

    for (int i = 0; i < num_packets; i++) {
        free(buffers[i]);
        free(control_buffers[i]);
    }
}
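In send_mmsg_gso_batch each message carries a single 1216-byte packet with UDP_SEGMENT set to GSO_SIZE (1472). Because no message is larger than its segment size, the kernel has nothing to split, so the call behaves much like plain sendmmsg. Combining the two techniques as described in the theory part would require each message to hold a super buffer of several packets. The sketch below builds such a batch without sending it; pack_mmsg_gso and union seg_ctrl are hypothetical names, and it assumes the packets are laid out back to back in one buffer.

```c
#define _GNU_SOURCE            /* struct mmsghdr needs _GNU_SOURCE */
#include <assert.h>
#include <netinet/in.h>
#include <netinet/udp.h>
#include <stdint.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/uio.h>

#ifndef SOL_UDP
#define SOL_UDP 17
#endif
#ifndef UDP_SEGMENT
#define UDP_SEGMENT 103
#endif

union seg_ctrl {                         /* aligned cmsg buffer, see cmsg(3) */
    char buf[CMSG_SPACE(sizeof(uint16_t))];
    struct cmsghdr align;
};

/* Split num_packets packets of packet_size bytes, stored contiguously in
 * data, into messages of up to per_msg packets. Each message is one GSO
 * super buffer with UDP_SEGMENT = packet_size, so packet boundaries
 * survive on the wire. The arrays need room for ceil(num_packets / per_msg)
 * entries; the return value is the vlen to pass to sendmmsg(). */
static unsigned int pack_mmsg_gso(char *data, size_t packet_size,
                                  unsigned int num_packets, unsigned int per_msg,
                                  struct sockaddr_in *dst, struct mmsghdr *msgs,
                                  struct iovec *iovs, union seg_ctrl *ctrls)
{
    assert(packet_size > 0 && packet_size <= UINT16_MAX && per_msg > 0);
    unsigned int m = 0;
    for (unsigned int first = 0; first < num_packets; first += per_msg, m++) {
        unsigned int k = num_packets - first < per_msg ? num_packets - first : per_msg;
        iovs[m].iov_base = data + (size_t)first * packet_size;
        iovs[m].iov_len = (size_t)k * packet_size;   /* k segments, all packet_size */

        memset(&msgs[m], 0, sizeof(msgs[m]));
        memset(&ctrls[m], 0, sizeof(ctrls[m]));
        struct msghdr *h = &msgs[m].msg_hdr;
        h->msg_name = dst;
        h->msg_namelen = dst ? sizeof(*dst) : 0;
        h->msg_iov = &iovs[m];
        h->msg_iovlen = 1;
        h->msg_control = ctrls[m].buf;
        h->msg_controllen = sizeof(ctrls[m].buf);

        struct cmsghdr *cm = CMSG_FIRSTHDR(h);
        cm->cmsg_level = SOL_UDP;
        cm->cmsg_type = UDP_SEGMENT;
        cm->cmsg_len = CMSG_LEN(sizeof(uint16_t));
        uint16_t seg = (uint16_t)packet_size;
        memcpy(CMSG_DATA(cm), &seg, sizeof(seg));
    }
    return m;
}
```

The result would be passed to sendmmsg(sockfd, msgs, m, 0). Each message must still respect the limits discussed earlier (at most 64 segments and under 64 KB in total), so per_msg should be chosen with packet_size in mind; 16 packets of 1216 bytes make 19456 bytes per message.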

Performance comparison of the different functions

// Performance test
void performance_test(int sockfd, struct sockaddr_in *dest_addr)
{
    const int iterations = 10000;
    const int packets_per_batch = 32;
    const int packet_size = 1216;
    const unsigned long long total_bits = 8ULL * iterations * packets_per_batch * packet_size;

    printf("\n=== Performance Test ===\n");
    printf("Iterations: %d, Packets per batch: %d, Packet size: %d, GSO_SIZE: %d\n",
           iterations, packets_per_batch, packet_size, GSO_SIZE);

    // 1. sendto
    printf("\n1. Testing sendto without GSO...\n");
    struct timespec start, end;
    clock_gettime(CLOCK_MONOTONIC, &start);
    for (int i = 0; i < iterations; i++) {
        send_sendto(sockfd, dest_addr, packet_size, packets_per_batch);
    }
    clock_gettime(CLOCK_MONOTONIC, &end);
    double sendto_time = (end.tv_sec - start.tv_sec) + (end.tv_nsec - start.tv_nsec) / 1e9;
    sleep(3);

    // 2. sendmmsg
    printf("\n2. Testing sendmmsg...\n");
    clock_gettime(CLOCK_MONOTONIC, &start);
    for (int i = 0; i < iterations; i++) {
        send_mmsg_batch(sockfd, dest_addr, packet_size, packets_per_batch);
    }
    clock_gettime(CLOCK_MONOTONIC, &end);
    double mmsg_time = (end.tv_sec - start.tv_sec) + (end.tv_nsec - start.tv_nsec) / 1e9;
    sleep(3);

    // 3. UDP GSO + sendto with one large buffer
    printf("\n3. Testing UDP GSO (merge sendto)...\n");
    enable_udp_gso(sockfd);
    clock_gettime(CLOCK_MONOTONIC, &start);
    for (int i = 0; i < iterations; i++) {
        send_gso_batch(sockfd, dest_addr, packet_size, packets_per_batch);
    }
    clock_gettime(CLOCK_MONOTONIC, &end);
    double gso_time = (end.tv_sec - start.tv_sec) + (end.tv_nsec - start.tv_nsec) / 1e9;
    disable_udp_gso(sockfd);
    sleep(3);

    // 4. UDP GSO + scatter-gather sendmsg without a per-call segment size
    printf("\n4. Testing UDP GSO (scatter-gather sendmsg without segment)...\n");
    enable_udp_gso(sockfd);
    clock_gettime(CLOCK_MONOTONIC, &start);
    for (int i = 0; i < iterations; i++) {
        send_gso_batch_sendmsg(sockfd, dest_addr, packet_size, packets_per_batch, 0);
    }
    clock_gettime(CLOCK_MONOTONIC, &end);
    double gso_sendmsg_time0 = (end.tv_sec - start.tv_sec) + (end.tv_nsec - start.tv_nsec) / 1e9;
    disable_udp_gso(sockfd);
    sleep(3);

    // 5. UDP GSO + scatter-gather sendmsg with a per-call segment size
    printf("\n5. Testing UDP GSO (scatter-gather sendmsg with segment)...\n");
    enable_udp_gso(sockfd);
    clock_gettime(CLOCK_MONOTONIC, &start);
    for (int i = 0; i < iterations; i++) {
        send_gso_batch_sendmsg(sockfd, dest_addr, packet_size, packets_per_batch, 1);
    }
    clock_gettime(CLOCK_MONOTONIC, &end);
    double gso_sendmsg_time1 = (end.tv_sec - start.tv_sec) + (end.tv_nsec - start.tv_nsec) / 1e9;
    disable_udp_gso(sockfd);
    sleep(3);

    // 6. UDP GSO + sendmsg with one large buffer (per-call UDP_SEGMENT, socket-level GSO off)
    printf("\n6. Testing UDP GSO (merge sendmsg)...\n");
    // enable_udp_gso(sockfd);
    clock_gettime(CLOCK_MONOTONIC, &start);
    for (int i = 0; i < iterations; i++) {
        send_gso_batch_improved(sockfd, dest_addr, packet_size, packets_per_batch);
    }
    clock_gettime(CLOCK_MONOTONIC, &end);
    double gso_improved_time = (end.tv_sec - start.tv_sec) + (end.tv_nsec - start.tv_nsec) / 1e9;
    // disable_udp_gso(sockfd);
    sleep(3);

    // 7. sendmmsg + UDP GSO
    printf("\n7. Testing sendmmsg + UDP GSO...\n");
    enable_udp_gso(sockfd);
    clock_gettime(CLOCK_MONOTONIC, &start);
    for (int i = 0; i < iterations; i++) {
        send_mmsg_gso_batch(sockfd, dest_addr, packet_size, packets_per_batch);
    }
    clock_gettime(CLOCK_MONOTONIC, &end);
    double mmsg_gso_time = (end.tv_sec - start.tv_sec) + (end.tv_nsec - start.tv_nsec) / 1e9;
    disable_udp_gso(sockfd);

    printf("\n=== Results ===\n");
    printf("test params: iterations=%d, packets_per_batch=%d, packet_size=%d, GSO_SIZE=%d, total_bits=%llu\n",
           iterations, packets_per_batch, packet_size, GSO_SIZE, total_bits);

    printf("sendto time: %.6f seconds, bitrate=%.2f Mbps\n", sendto_time, total_bits / 1e6 / sendto_time);
    printf("sendmmsg time: %.6f seconds, bitrate=%.2f Mbps\n", mmsg_time, total_bits / 1e6 / mmsg_time);
    printf("UDP GSO(merge sendto) time: %.6f seconds, bitrate=%.2f Mbps\n", gso_time, total_bits / 1e6 / gso_time);
    printf("UDP GSO(scatter-gather sendmsg without segment) time: %.6f seconds, bitrate=%.2f Mbps\n", gso_sendmsg_time0, total_bits / 1e6 / gso_sendmsg_time0);
    printf("UDP GSO(scatter-gather sendmsg with segment) time: %.6f seconds, bitrate=%.2f Mbps\n", gso_sendmsg_time1, total_bits / 1e6 / gso_sendmsg_time1);
    printf("UDP GSO(merge sendmsg) time: %.6f seconds, bitrate=%.2f Mbps\n", gso_improved_time, total_bits / 1e6 / gso_improved_time);
    printf("sendmmsg + UDP GSO time: %.6f seconds, bitrate=%.2f Mbps\n", mmsg_gso_time, total_bits / 1e6 / mmsg_gso_time);

    printf("sendmmsg speedup over sendto: %.2fx\n", sendto_time / mmsg_time);
    printf("GSO(merge sendto) speedup over sendto: %.2fx\n", sendto_time / gso_time);
    printf("GSO(scatter-gather sendmsg without segment) speedup over sendto: %.2fx\n", sendto_time / gso_sendmsg_time0);
    printf("GSO(scatter-gather sendmsg with segment) speedup over sendto: %.2fx\n", sendto_time / gso_sendmsg_time1);
    printf("GSO(merge sendmsg) speedup over sendto: %.2fx\n", sendto_time / gso_improved_time);
    printf("sendmmsg + UDP GSO speedup over sendto: %.2fx\n", sendto_time / mmsg_gso_time);
}

Test results

System and hardware information:

[root@svr22 ~]# uname -a
Linux svr22 5.15.117 #1 SMP Mon Mar 24 07:44:16 CST 2025 x86_64 x86_64 x86_64 GNU/Linux
[root@svr22 ~]# ethtool ens7f0np0 | grep Speed
Speed: 100000Mb/s
[root@svr22 ~]# ethtool -i ens7f0np0 | grep bus-info
bus-info: 0000:d8:00.0
[root@svr22 ~]# lspci | grep d8:00.0
d8:00.0 Ethernet controller: Mellanox Technologies MT27800 Family [ConnectX-5]
[root@svr22 ~]#

System buffer configuration:

100G:
sudo sysctl -w net.core.wmem_max=134217728
sudo sysctl -w net.core.wmem_default=134217728
sudo sysctl -w net.core.rmem_max=134217728
sudo sysctl -w net.core.rmem_default=134217728

ethtool -K ens7f0np0 gso on
ethtool -K ens7f0np0 gro on
ethtool -K ens7f0np0 tx-udp-segmentation on
ethtool -K ens7f0np0 rx-udp-gro-forwarding on
ethtool -K ens7f0np0 rx-gro-list on

Running every function once:

int packet_size = 1216;
int num_packets = 32;
int mode = 0; // 0: run everything, 1-6: run a single mode

if (argc > 1) {
    mode = atoi(argv[1]);
}

switch (mode) {
case 0: // run everything
    printf("\n=== sendto(without GSO) Example ===\n");
    send_sendto(sockfd, &dest_addr, packet_size, num_packets);

    printf("\n=== sendmmsg Example ===\n");
    send_mmsg_batch(sockfd, &dest_addr, packet_size, num_packets);

    printf("\n=== UDP GSO(merge sendto) Example ===\n");
    enable_udp_gso(sockfd);
    send_gso_batch(sockfd, &dest_addr, packet_size, num_packets);
    disable_udp_gso(sockfd);

    printf("\n=== UDP GSO(scatter-gather sendmsg without segment) Example ===\n");
    enable_udp_gso(sockfd);
    send_gso_batch_sendmsg(sockfd, &dest_addr, packet_size, num_packets, 0);
    disable_udp_gso(sockfd);

    printf("\n=== UDP GSO(scatter-gather sendmsg with segment) Example ===\n");
    enable_udp_gso(sockfd);
    send_gso_batch_sendmsg(sockfd, &dest_addr, packet_size, num_packets, 1);
    disable_udp_gso(sockfd);

    printf("\n=== UDP GSO(merge sendmsg) Example ===\n");
    enable_udp_gso(sockfd);
    send_gso_batch_improved(sockfd, &dest_addr, packet_size, num_packets);
    disable_udp_gso(sockfd);

    printf("\n=== sendmmsg + UDP GSO Example ===\n");
    enable_udp_gso(sockfd);
    send_mmsg_gso_batch(sockfd, &dest_addr, packet_size, num_packets);
    disable_udp_gso(sockfd);
    break;
// ... single-mode cases omitted in this excerpt ...
}

Client output:

[root@svr22 202509281610]# ./client7_udp_gso_test 0
=== sendto(without GSO) Example ===
Sending 32 packets using sendto, total buffer: 38912 bytes
Approximately 32 packets sent without GSO, total sent 38912 bytes

=== sendmmsg Example ===
Sending 32 packets using sendmmsg
sendmmsg sent 32 out of 32 packets
Total bytes sent via sendmmsg: 38912

=== UDP GSO(merge sendto) Example ===
UDP GSO enabled with segment size: 1472
Sending 32 packets using UDP GSO, total buffer: 38912 bytes
GSO sent 38912 bytes, expected 38912 bytes
Approximately 32 packets sent via GSO
UDP GSO disabled

=== UDP GSO(scatter-gather sendmsg without segment) Example ===
UDP GSO enabled with segment size: 1472
Sending 32 optimized merged packets, total size: 38912 bytes
Optimized merged batch sent 38912 total bytes
UDP GSO disabled

=== UDP GSO(scatter-gather sendmsg with segment) Example ===
UDP GSO enabled with segment size: 1472
Sending 32 optimized merged packets, total size: 38912 bytes
Optimized merged batch sent 38912 total bytes
UDP GSO disabled

=== UDP GSO(merge sendmsg) Example ===
UDP GSO enabled with segment size: 1472
Sending 32 packets using UDP GSO, total buffer: 38912 bytes
GSO sent 38912 bytes, expected 38912 bytes, Approximately 32 packets sent via GSO
UDP GSO disabled

=== sendmmsg + UDP GSO Example ===
UDP GSO enabled with segment size: 1472
Sending 32 packets using sendmmsg + UDP GSO
sendmmsg+GSO sent 32 out of 32 packets
Total bytes sent via sendmmsg+GSO: 38912
UDP GSO disabled
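The "Approximately 32 packets" lines divide the byte count by packet_size (1216), so they count logical packets, not datagrams on the wire. In the merge sendto, merge sendmsg and scatter-gather-without-segment modes the segment size is GSO_SIZE (1472), so the kernel cuts the 38912-byte buffer at 1472-byte boundaries; only the scatter-gather-with-segment mode uses 1216. The small helper below (`gso_split` is a hypothetical name, not part of the test programs) works out that layout. It assumes a per-send limit of 64 segments, taken here to be UDP_MAX_SEGMENTS on kernels of this generation; check include/linux/udp.h for your kernel.

```c
#include <assert.h>
#include <stddef.h>

/* Assumption: UDP_MAX_SEGMENTS is 64 on the 5.15 kernel used above;
 * newer kernels may allow more segments per send. */
#define ASSUMED_UDP_MAX_SEGMENTS 64

/* How one GSO send of `total` payload bytes is cut into datagrams. */
struct gso_layout {
    size_t full_segments; /* datagrams of exactly gso_size bytes */
    size_t tail_bytes;    /* size of the final shorter datagram, 0 if none */
    size_t datagrams;     /* total datagrams put on the wire */
    int within_limit;     /* 1 if datagrams <= ASSUMED_UDP_MAX_SEGMENTS */
};

struct gso_layout gso_split(size_t total, size_t gso_size)
{
    assert(gso_size > 0);

    struct gso_layout l;
    l.full_segments = total / gso_size;
    l.tail_bytes = total % gso_size;
    l.datagrams = l.full_segments + (l.tail_bytes ? 1 : 0);
    l.within_limit = l.datagrams <= ASSUMED_UDP_MAX_SEGMENTS;
    return l;
}
```

For this test, gso_split(38912, 1472) gives 26 full 1472-byte datagrams plus a 640-byte tail (27 datagrams), while gso_split(38912, 1216) gives exactly 32 datagrams that line up with the logical packets.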

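In the mode 3 run below, only packets 1 to 5 print a "GSO packet detected" line, even though most later reads are also larger than one 1216-byte segment. The msg_recv_mode() used for this run set msg_controllen only once, before its receive loop, and recvmsg() overwrites that field with the control length actually used. Packet 6 is a single datagram without a UDP_GRO cmsg, so msg_controllen drops to 0 and the cmsgs of every later read are truncated.

Server output (a batch may be received over several reads, but the total number of bytes received should match what the client sent):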

[root@svr20 202509281610]# ./server7_udp_gso_test 1
UDP GSO Receiver listening on port 8888...
Using simple recvfrom mode
Received packet 1 from 192.168.110.222:45888, size: 10944 bytes
Likely GSO packet, containing multiple segments
Received packet 2 from 192.168.110.222:45888, size: 59584 bytes
Likely GSO packet, containing multiple segments
Received packet 3 from 192.168.110.222:45888, size: 7296 bytes
Likely GSO packet, containing multiple segments
Received packet 4 from 192.168.110.222:45888, size: 38912 bytes
Likely GSO packet, containing multiple segments
Received packet 5 from 192.168.110.222:45888, size: 38912 bytes
Likely GSO packet, containing multiple segments
Received packet 6 from 192.168.110.222:45888, size: 38912 bytes
Likely GSO packet, containing multiple segments
Received packet 7 from 192.168.110.222:45888, size: 38912 bytes
Likely GSO packet, containing multiple segments
Received packet 8 from 192.168.110.222:45888, size: 38912 bytes
Likely GSO packet, containing multiple segments

[root@svr20 202509281610]# ./server7_udp_gso_test 3
UDP GSO Receiver listening on port 8888...
Using recvmsg mode with GRO support
Received packet 1 from 192.168.110.222:34281, size: 3648 bytes
GSO packet detected, segment size: 1216 bytes
Received packet 2 from 192.168.110.222:34281, size: 4864 bytes
GSO packet detected, segment size: 1216 bytes
Received packet 3 from 192.168.110.222:34281, size: 4864 bytes
GSO packet detected, segment size: 1216 bytes
Received packet 4 from 192.168.110.222:34281, size: 4864 bytes
GSO packet detected, segment size: 1216 bytes
Received packet 5 from 192.168.110.222:34281, size: 2432 bytes
GSO packet detected, segment size: 1216 bytes
Received packet 6 from 192.168.110.222:34281, size: 1216 bytes
Received packet 7 from 192.168.110.222:34281, size: 2432 bytes
Received packet 8 from 192.168.110.222:34281, size: 2432 bytes
Received packet 9 from 192.168.110.222:34281, size: 2432 bytes
Received packet 10 from 192.168.110.222:34281, size: 2432 bytes
Received packet 11 from 192.168.110.222:34281, size: 7296 bytes
Received packet 12 from 192.168.110.222:34281, size: 2432 bytes
Received packet 13 from 192.168.110.222:34281, size: 36480 bytes
Received packet 14 from 192.168.110.222:34281, size: 38912 bytes
Received packet 15 from 192.168.110.222:34281, size: 38912 bytes
Received packet 16 from 192.168.110.222:34281, size: 38912 bytes
Received packet 17 from 192.168.110.222:34281, size: 38912 bytes
Received packet 18 from 192.168.110.222:34281, size: 1216 bytes
Received packet 19 from 192.168.110.222:34281, size: 37696 bytes
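With UDP_GRO enabled, one read can return several datagrams back to back; when the kernel attaches the UDP_GRO cmsg, every datagram in the buffer except possibly the last is exactly gso_size bytes. A minimal sketch of walking such a buffer follows; `split_gro_buffer` and the `datagram_cb` callback type are hypothetical names, not part of the test programs. Pass gso_size <= 0 when no UDP_GRO cmsg arrived, meaning the buffer holds a single datagram.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical per-datagram handler. */
typedef void (*datagram_cb)(const unsigned char *data, size_t len, void *ctx);

/* Splits a buffer returned by recvmsg() on a UDP_GRO socket back into the
 * original datagrams: every chunk is gso_size bytes except possibly the last.
 * gso_size <= 0 means no UDP_GRO cmsg was received, so the buffer is one
 * datagram. on_datagram may be NULL to only count. Returns the count. */
size_t split_gro_buffer(const unsigned char *buf, size_t len, int gso_size,
                        datagram_cb on_datagram, void *ctx)
{
    assert(buf != NULL || len == 0);
    if (len == 0)
        return 0;

    size_t step = gso_size > 0 ? (size_t)gso_size : len;
    size_t count = 0;

    for (size_t off = 0; off < len; off += step) {
        size_t chunk = (len - off < step) ? len - off : step;
        if (on_datagram)
            on_datagram(buf + off, chunk, ctx);
        count++;
    }
    return count;
}
```

For example, packet 2 of the mode 3 run (4864 bytes, segment size 1216) would reach the callback as four 1216-byte datagrams.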

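Performance results over many iterations (the bitrate only reflects how fast this data was handed to the kernel through system calls; it may not reflect the actual transmit rate, and says even less about the receive rate):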

=== Results ===
test params: iterations=10000, packets_per_batch=32, packet_size=1216, GSO_SIZE=1472, total_bits=3112960000
sendto time: 0.974089 seconds, bitrate=3195.76 Mbps
sendmmsg time: 0.763818 seconds, bitrate=4075.52 Mbps
UDP GSO(merge sendto) time: 0.299842 seconds, bitrate=10382.01 Mbps
UDP GSO(scatter-gather sendmsg without segment) time: 0.290046 seconds, bitrate=10732.66 Mbps
UDP GSO(scatter-gather sendmsg with segment) time: 0.287964 seconds, bitrate=10810.25 Mbps
UDP GSO(merge sendmsg) time: 0.272673 seconds, bitrate=11416.46 Mbps
sendmmsg + UDP GSO time: 0.856162 seconds, bitrate=3635.95 Mbps
sendmmsg speedup over sendto: 1.28x
GSO(merge sendto) speedup over sendto: 3.25x
GSO(scatter-gather sendmsg without segment) speedup over sendto: 3.36x
GSO(scatter-gather sendmsg with segment) speedup over sendto: 3.38x
GSO(merge sendmsg) speedup over sendto: 3.57x
sendmmsg + UDP GSO speedup over sendto: 1.14x
[Figure: perf-cmp]
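In the sendmmsg + UDP GSO mode each message carries a single 1216-byte packet, which is smaller than the 1472-byte segment size, so every message still leaves as one ordinary datagram and GSO never segments anything; the 1.14x result is consistent with that. The sketch below shows one way the two mechanisms could actually combine: each mmsghdr points at a buffer holding several packets and carries its own UDP_SEGMENT cmsg equal to the packet size. `build_gso_mmsg`, `union gso_ctrl` and the fixed packets-per-message grouping are illustrative assumptions, not part of the original test, and this layout was not benchmarked here.

```c
#ifndef _GNU_SOURCE
#define _GNU_SOURCE /* for struct mmsghdr / sendmmsg() */
#endif
#include <assert.h>
#include <netinet/in.h>
#include <stdint.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/uio.h>

#ifndef SOL_UDP
#define SOL_UDP 17
#endif
#ifndef UDP_SEGMENT
#define UDP_SEGMENT 103
#endif

/* One control buffer per message, aligned for struct cmsghdr. */
union gso_ctrl {
    char buf[CMSG_SPACE(sizeof(uint16_t))];
    struct cmsghdr align;
};

/* Splits `data` (num_msgs * pkts_per_msg * pkt_size bytes) into num_msgs
 * messages. Each message gets one iovec covering pkts_per_msg packets and a
 * UDP_SEGMENT cmsg of pkt_size, so the kernel cuts it back into
 * pkts_per_msg datagrams. The caller then passes msgs to sendmmsg(). */
void build_gso_mmsg(struct mmsghdr *msgs, struct iovec *iovs,
                    union gso_ctrl *ctrls, unsigned char *data,
                    int num_msgs, int pkts_per_msg, uint16_t pkt_size,
                    struct sockaddr_in *dest)
{
    assert(num_msgs > 0 && pkts_per_msg > 0 && pkt_size > 0);
    size_t msg_bytes = (size_t)pkts_per_msg * pkt_size;

    for (int i = 0; i < num_msgs; i++) {
        iovs[i].iov_base = data + (size_t)i * msg_bytes;
        iovs[i].iov_len = msg_bytes;

        memset(&msgs[i], 0, sizeof(msgs[i]));
        memset(&ctrls[i], 0, sizeof(ctrls[i]));
        msgs[i].msg_hdr.msg_name = dest;
        msgs[i].msg_hdr.msg_namelen = sizeof(*dest);
        msgs[i].msg_hdr.msg_iov = &iovs[i];
        msgs[i].msg_hdr.msg_iovlen = 1;
        msgs[i].msg_hdr.msg_control = ctrls[i].buf;
        msgs[i].msg_hdr.msg_controllen = sizeof(ctrls[i].buf);

        /* Per-message segment size: one datagram per logical packet. */
        struct cmsghdr *cm = CMSG_FIRSTHDR(&msgs[i].msg_hdr);
        cm->cmsg_level = SOL_UDP;
        cm->cmsg_type = UDP_SEGMENT;
        cm->cmsg_len = CMSG_LEN(sizeof(uint16_t));
        memcpy(CMSG_DATA(cm), &pkt_size, sizeof(pkt_size));
    }
}
```

With a 38912-byte buffer, build_gso_mmsg(msgs, iovs, ctrls, data, 4, 8, 1216, &dest_addr) followed by sendmmsg(sockfd, msgs, 4, 0) would hand the kernel four 9728-byte messages, each segmented into eight 1216-byte datagrams, well inside the per-send segment and size limits.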

Full code

Receiver (server)

#define _GNU_SOURCE
#include <arpa/inet.h>
#include <errno.h>
#include <netinet/in.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <unistd.h>

#ifndef SOL_UDP
#define SOL_UDP (17)
#endif
#ifndef UDP_SEGMENT
#define UDP_SEGMENT (103)
#endif
#ifndef UDP_GRO
#define UDP_GRO (104)
#endif

#define PORT 8888
#define BUFFER_SIZE 65536
#define GSO_SIZE (1500 - 20 - 8) // max GSO segment size: 1500-byte MTU - IPv4 header - UDP header = 1472

typedef enum {
RECV_MODE_SIMPLE = 1,
RECV_MODE_MMSG,
RECV_MODE_MSG
} recv_mode_t;

void simple_recv_mode(int sockfd)
{
struct sockaddr_in client_addr;
socklen_t addr_len = sizeof(client_addr);
char buffer[BUFFER_SIZE];
int packet_count = 0;

printf("Using simple recvfrom mode\n");

while (1) {
// memset(buffer, 0, BUFFER_SIZE);
int n = recvfrom(sockfd, buffer, BUFFER_SIZE, 0, (struct sockaddr *)&client_addr, &addr_len);

if (n > 0) {
packet_count++;
printf("Received packet %d from %s:%d, size: %d bytes\n",
packet_count, inet_ntoa(client_addr.sin_addr), ntohs(client_addr.sin_port), n);

if (n > 1500) {
printf(" Likely GSO packet, containing multiple segments\n");
}

int print_len = n < 100 ? n : 100;
// printf(" Data: %.*s\n", print_len, buffer);
} else if (n < 0 && errno != EAGAIN && errno != EWOULDBLOCK) {
perror("recvfrom failed");
break;
}
}
}

void mmsg_recv_mode(int sockfd)
{
struct sockaddr_in client_addr;
const int vlen = 256;
const int bufsize = 1500; // with UDP_GRO enabled, coalesced reads larger than this are truncated (MSG_TRUNC)
struct sockaddr peer_addrs[vlen];
struct mmsghdr msgs[vlen];
struct iovec iovecs[vlen];
unsigned char bufs[vlen][bufsize];
int packet_count = 0;

printf("Using recvmmsg mode\n");

memset(msgs, 0, sizeof(msgs));
for (int i = 0; i < vlen; i++) {
iovecs[i].iov_base = bufs[i];
iovecs[i].iov_len = bufsize;
msgs[i].msg_hdr.msg_iov = &iovecs[i];
msgs[i].msg_hdr.msg_iovlen = 1;
msgs[i].msg_hdr.msg_name = &peer_addrs[i];
msgs[i].msg_hdr.msg_namelen = sizeof(struct sockaddr);
}

while (1) {
int n = recvmmsg(sockfd, msgs, vlen, MSG_DONTWAIT, NULL);
if (n > 0) {
for (int i = 0; i < n; i++) {
packet_count++;
struct sockaddr_in *client_addr_ptr = (struct sockaddr_in *)&peer_addrs[i];
printf("Received packet %d from %s:%d, size: %d bytes\n",
packet_count, inet_ntoa(client_addr_ptr->sin_addr),
ntohs(client_addr_ptr->sin_port), msgs[i].msg_len);

if (msgs[i].msg_len > 1500) {
printf(" Likely GSO packet, containing multiple segments\n");
}

int print_len = msgs[i].msg_len < 100 ? msgs[i].msg_len : 100;
// printf(" Data: %.*s\n", print_len, (char *)iovecs[i].iov_base);
}
} else if (n < 0 && errno != EAGAIN && errno != EWOULDBLOCK) {
printf("recvmmsg failed: %s\n", strerror(errno));
break;
}
}
}

void msg_recv_mode(int sockfd)
{
struct sockaddr_in client_addr;
socklen_t peer_addrlen = sizeof(struct sockaddr);
unsigned char bufs[BUFFER_SIZE];
int packet_count = 0;

printf("Using recvmsg mode with GRO support\n");

struct sockaddr peer_addr;
char control[CMSG_SPACE(sizeof(int))] = { 0 };
struct msghdr msg = { 0 };
struct iovec iov = { 0 };
struct cmsghdr *cmsg;
int recv_bytes;

iov.iov_base = bufs;
iov.iov_len = BUFFER_SIZE;

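msg.msg_iov = &iov;
msg.msg_iovlen = 1;
msg.msg_name = &peer_addr;
msg.msg_control = control;

while (1) {
// recvmsg() overwrites msg_namelen and msg_controllen with the lengths it actually used,
// so reset both before every call. Otherwise a datagram without a UDP_GRO cmsg leaves
// msg_controllen at 0 and the cmsgs of all later datagrams are truncated (MSG_CTRUNC).
msg.msg_namelen = peer_addrlen;
msg.msg_controllen = sizeof(control);
recv_bytes = recvmsg(sockfd, &msg, MSG_DONTWAIT);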
if (recv_bytes > 0) {
int gso_size = -1;
for (cmsg = CMSG_FIRSTHDR(&msg); cmsg != NULL; cmsg = CMSG_NXTHDR(&msg, cmsg)) {
if (cmsg->cmsg_level == SOL_UDP && cmsg->cmsg_type == UDP_GRO) {
gso_size = *(int *)CMSG_DATA(cmsg);
break;
}
}

packet_count++;
struct sockaddr_in *client_addr_ptr = (struct sockaddr_in *)&peer_addr;
printf("Received packet %d from %s:%d, size: %d bytes\n",
packet_count, inet_ntoa(client_addr_ptr->sin_addr),
ntohs(client_addr_ptr->sin_port), recv_bytes);

if (gso_size > 0) {
printf(" GSO packet detected, segment size: %d bytes\n", gso_size);
}

int print_len = recv_bytes < 100 ? recv_bytes : 100;
// printf(" Data: %.*s\n", print_len, (char *)iov.iov_base);
} else if (recv_bytes < 0 && errno != EAGAIN && errno != EWOULDBLOCK) {
printf("recvmsg failed: %s\n", strerror(errno));
break;
}
}
}

void print_usage(const char *program_name)
{
printf("Usage: %s <mode>\n", program_name);
printf("Modes:\n");
printf(" 1 - Simple recvfrom mode\n");
printf(" 2 - recvmmsg mode (batch receiving)\n");
printf(" 3 - recvmsg mode (with GRO support)\n");
}

int main(int argc, char *argv[])
{
if (argc != 2) {
print_usage(argv[0]);
return EXIT_FAILURE;
}

int mode = atoi(argv[1]);
if (mode < RECV_MODE_SIMPLE || mode > RECV_MODE_MSG) {
printf("Error: Invalid mode. Please choose 1, 2, or 3.\n");
print_usage(argv[0]);
return EXIT_FAILURE;
}

int sockfd;
struct sockaddr_in server_addr;

sockfd = socket(AF_INET, SOCK_DGRAM, 0);
if (sockfd < 0) {
perror("socket creation failed");
return EXIT_FAILURE;
}

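// UDP_SEGMENT only affects sending; it is not required on this receive-only socket
int gso_size = GSO_SIZE;
if (setsockopt(sockfd, SOL_UDP, UDP_SEGMENT, &gso_size, sizeof(gso_size)) < 0) {
perror("setsockopt UDP_SEGMENT failed");
close(sockfd);
return EXIT_FAILURE;
}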

int val = 1;
if (setsockopt(sockfd, IPPROTO_UDP, UDP_GRO, &val, sizeof(val)) < 0) {
perror("setsockopt UDP_GRO failed");
}

memset(&server_addr, 0, sizeof(server_addr));
server_addr.sin_family = AF_INET;
server_addr.sin_addr.s_addr = INADDR_ANY;
server_addr.sin_port = htons(PORT);

if (bind(sockfd, (const struct sockaddr *)&server_addr, sizeof(server_addr)) < 0) {
perror("bind failed");
close(sockfd);
return EXIT_FAILURE;
}

printf("UDP GSO Receiver listening on port %d...\n", PORT);

switch (mode) {
case RECV_MODE_SIMPLE:
simple_recv_mode(sockfd);
break;
case RECV_MODE_MMSG:
mmsg_recv_mode(sockfd);
break;
case RECV_MODE_MSG:
msg_recv_mode(sockfd);
break;
}

close(sockfd);
return 0;
}

Sender (client)

#define _GNU_SOURCE
#include <arpa/inet.h>
#include <errno.h>
#include <fcntl.h>
#include <linux/if_packet.h>
#include <net/if.h>
#include <netinet/in.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/ioctl.h>
#include <sys/socket.h>
#include <time.h>
#include <unistd.h>

#ifndef SOL_UDP
#define SOL_UDP (17)
#endif
#ifndef UDP_SEGMENT
#define UDP_SEGMENT (103)
#endif

#define DEST_IP "192.168.110.220"
#define DEST_PORT 8888
#define GSO_SIZE (1500 - 20 - 8) // max GSO segment size: 1500-byte MTU - IPv4 header - UDP header = 1472
#define SLEEP_SEC 3
#define MALLOC_INIT

// Enable UDP GSO at socket level (UDP_SEGMENT socket option)
int enable_udp_gso(int sockfd)
{
int gso_size = GSO_SIZE;

if (setsockopt(sockfd, SOL_UDP, UDP_SEGMENT, &gso_size, sizeof(gso_size)) < 0) {
perror("setsockopt UDP_SEGMENT failed");
return -1;
}

printf("UDP GSO enabled with segment size: %d\n", gso_size);
return 0;
}

// Disable socket-level UDP GSO (segment size 0)
int disable_udp_gso(int sockfd)
{
int gso_size = 0;

if (setsockopt(sockfd, SOL_UDP, UDP_SEGMENT, &gso_size, sizeof(gso_size)) < 0) {
perror("setsockopt UDP_SEGMENT disable failed");
return -1;
}

printf("UDP GSO disabled\n");
return 0;
}

// Create a UDP socket
int create_udp_socket()
{
int sockfd = socket(AF_INET, SOCK_DGRAM, 0);
if (sockfd < 0) {
perror("socket creation failed");
exit(EXIT_FAILURE);
}

// Put the socket in blocking mode (optional: new sockets are blocking by default)
int flags = fcntl(sockfd, F_GETFL, 0);
fcntl(sockfd, F_SETFL, flags & ~O_NONBLOCK);

return sockfd;
}

void setup_dest_addr(struct sockaddr_in *dest_addr, const char *ip, int port)
{
memset(dest_addr, 0, sizeof(*dest_addr));
dest_addr->sin_family = AF_INET;
dest_addr->sin_port = htons(port);

if (inet_pton(AF_INET, ip, &dest_addr->sin_addr) <= 0) {
perror("invalid address");
exit(EXIT_FAILURE);
}
}

void send_sendto(int sockfd, struct sockaddr_in *dest_addr,
int packet_size, int num_packets)
{
size_t total_size = packet_size * num_packets;
size_t total_sent_size = 0;
char *send_buffer = malloc(packet_size);

if (!send_buffer) {
perror("malloc failed for send buffer");
return;
}

#ifdef MALLOC_INIT
memset(send_buffer, 'X', packet_size);
send_buffer[packet_size - 1] = '\0';
#endif

printf("Sending %d packets using sendto, total buffer: %zu bytes\n", num_packets, total_size);

for (int i = 0; i < num_packets; i++) {
int written = snprintf(send_buffer, packet_size,
"Send packet %d at timestamp: %ld, total packets: %d",
i, time(NULL), num_packets);
#ifdef MALLOC_INIT
send_buffer[written] = 'X';
#endif

ssize_t sent = sendto(sockfd, send_buffer, packet_size, 0,
(struct sockaddr *)dest_addr, sizeof(*dest_addr));

if (sent < 0) {
perror("sendto without GSO failed");
} else {
total_sent_size += sent;
// printf("Send sent %zd bytes, expected %u bytes\n", sent, packet_size);
}
}
printf("Approximately %zu packets sent without GSO, total sent %zu bytes\n",
total_sent_size / packet_size, total_sent_size);

free(send_buffer);
}

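// Send a batch with UDP GSO: the whole batch goes out in a single sendto().
// Note: the kernel cuts the buffer at the socket-level segment size (GSO_SIZE = 1472),
// not at packet_size (1216), so the wire datagrams do not line up with the logical packets.
void send_gso_batch(int sockfd, struct sockaddr_in *dest_addr,
int packet_size, int num_packets)
{
// Compute the total buffer size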
size_t total_size = packet_size * num_packets;
char *gso_buffer = malloc(total_size);

if (!gso_buffer) {
perror("malloc failed for GSO buffer");
return;
}

#ifdef MALLOC_INIT
memset(gso_buffer, 'X', total_size);
gso_buffer[total_size - 1] = '\0';
#endif

// Fill in each packet of the batch
for (int i = 0; i < num_packets; i++) {
char *packet_start = gso_buffer + (i * packet_size);
int written = snprintf(packet_start, packet_size,
"GSO Batch packet %d at timestamp: %ld, total packets: %d",
i, time(NULL), num_packets);
#ifdef MALLOC_INIT
packet_start[written] = 'X';
#endif
}

printf("Sending %d packets using UDP GSO, total buffer: %zu bytes\n",
num_packets, total_size);

// Send the whole buffer in one call; with UDP_SEGMENT set, the kernel segments it into GSO_SIZE-byte datagrams
ssize_t sent = sendto(sockfd, gso_buffer, total_size, 0,
(struct sockaddr *)dest_addr, sizeof(*dest_addr));

if (sent < 0) {
perror("sendto with GSO failed");
} else {
printf("GSO sent %zd bytes, expected %zu bytes\n", sent, total_size);
printf("Approximately %zd packets sent via GSO\n", sent / packet_size);
}

free(gso_buffer);
}

void send_gso_batch_sendmsg(int sockfd, struct sockaddr_in *dest_addr,
int packet_size, int num_packets, int is_segment)
{
// Allocate everything at once: packet_buffers pointer array (unused) + iovec array + all packet data
size_t total_buffer_size = (size_t)packet_size * num_packets;
char *memory_block = malloc(num_packets * sizeof(char *) + num_packets * sizeof(struct iovec) + total_buffer_size);

if (!memory_block) {
perror("malloc failed for memory block");
return;
}

// Locate the iovec array and the packet-data region inside the block
struct iovec *msg_iov = (struct iovec *)(memory_block + num_packets * sizeof(char *));
char *data_buffer = (char *)(memory_block + num_packets * sizeof(char *) + num_packets * sizeof(struct iovec));

#ifdef MALLOC_INIT
// Fill only the packet-data region, not the pointer and iovec arrays in front of it
memset(data_buffer, 'X', total_buffer_size);
data_buffer[total_buffer_size - 1] = '\0';
#endif

// Fill each packet and point one iovec at it
for (int i = 0; i < num_packets; i++) {
char *packet_start = data_buffer + (i * packet_size);

int written = snprintf(packet_start, packet_size,
"Optimized Merged Batch packet %d at timestamp: %ld, total packets: %d",
i, time(NULL), num_packets);
#ifdef MALLOC_INIT
packet_start[written] = 'X';
#endif

msg_iov[i].iov_base = packet_start;
msg_iov[i].iov_len = packet_size;
}

// Build the msghdr
struct msghdr msg;
memset(&msg, 0, sizeof(msg));
msg.msg_name = (void *)dest_addr;
msg.msg_namelen = sizeof(*dest_addr);
msg.msg_iov = msg_iov;
msg.msg_iovlen = num_packets;

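// Control-message buffer: declared at function scope so it is still alive when sendmsg()
// runs below; the union keeps it aligned for struct cmsghdr
union {
char buf[CMSG_SPACE(sizeof(uint16_t))];
struct cmsghdr align;
} control;
memset(&control, 0, sizeof(control));

if (is_segment) {
msg.msg_control = control.buf;
msg.msg_controllen = sizeof(control.buf);

// Per-call UDP_SEGMENT cmsg with segment size = packet_size. Since every iovec is exactly
// packet_size bytes, each one leaves as its own datagram. Without this cmsg, the
// socket-level GSO_SIZE set by enable_udp_gso() applies instead.
struct cmsghdr *cm = CMSG_FIRSTHDR(&msg);
cm->cmsg_level = SOL_UDP;
cm->cmsg_type = UDP_SEGMENT;
cm->cmsg_len = CMSG_LEN(sizeof(uint16_t));
*((uint16_t *)CMSG_DATA(cm)) = packet_size;
}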

printf("Sending %d optimized merged packets, total size: %zu bytes\n",
num_packets, total_buffer_size);

ssize_t sent = sendmsg(sockfd, &msg, 0);

if (sent < 0) {
perror("sendmsg with optimized merged packets failed");
} else {
printf("Optimized merged batch sent %zd total bytes\n", sent);
}

free(memory_block);
}

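// Improved UDP GSO batch send: one large buffer through sendmsg() with a per-call UDP_SEGMENT cmsg
// (segment size GSO_SIZE, so the wire layout is the same as in send_gso_batch)
void send_gso_batch_improved(int sockfd, struct sockaddr_in *dest_addr,
int packet_size, int num_packets)
{
// Compute the total buffer size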
size_t total_size = packet_size * num_packets;
char *gso_buffer = malloc(total_size);

if (!gso_buffer) {
perror("malloc failed for GSO buffer");
return;
}

#ifdef MALLOC_INIT
memset(gso_buffer, 'X', total_size);
gso_buffer[total_size - 1] = '\0';
#endif

// Fill in each packet of the batch
for (int i = 0; i < num_packets; i++) {
char *packet_start = gso_buffer + (i * packet_size);
int written = snprintf(packet_start, packet_size,
"GSO Batch packet %d at timestamp: %ld, total packets: %d",
i, time(NULL), num_packets);
#ifdef MALLOC_INIT
packet_start[written] = 'X';
#endif
}

printf("Sending %d packets using UDP GSO, total buffer: %zu bytes\n",
num_packets, total_size);

struct iovec iov = {
.iov_base = gso_buffer,
.iov_len = total_size
};

struct msghdr msg = {
.msg_name = dest_addr,
.msg_namelen = sizeof(*dest_addr),
.msg_iov = &iov,
.msg_iovlen = 1,
.msg_control = NULL, // no cmsg needed when socket-level GSO is enabled
.msg_controllen = 0,
.msg_flags = 0
};

// Add the UDP_SEGMENT cmsg so GSO still works when socket-level GSO is disabled
char control[CMSG_SPACE(sizeof(uint16_t))] = { 0 };
msg.msg_control = control;
msg.msg_controllen = sizeof(control);
struct cmsghdr *cm = CMSG_FIRSTHDR(&msg);
cm->cmsg_level = SOL_UDP;
cm->cmsg_type = UDP_SEGMENT;
cm->cmsg_len = CMSG_LEN(sizeof(uint16_t));
*((uint16_t *)CMSG_DATA(cm)) = GSO_SIZE;

ssize_t sent = sendmsg(sockfd, &msg, 0);

if (sent < 0) {
perror("sendmsg with GSO failed");
} else {
printf("GSO sent %zd bytes, expected %zu bytes, Approximately %zd packets sent via GSO\n",
sent, total_size, sent / packet_size);

if ((size_t)sent != total_size) {
printf("Warn: Sent %zu bytes less than expected (packet fragmentation may occur)\n",
total_size - (size_t)sent);
}
}

free(gso_buffer);
}

// Send the batch with sendmmsg, one packet per message (for comparison)
void send_mmsg_batch(int sockfd, struct sockaddr_in *dest_addr,
int packet_size, int num_packets)
{
struct mmsghdr msgs[num_packets];
struct iovec iovecs[num_packets];
char *buffers[num_packets];

// Prepare each message
for (int i = 0; i < num_packets; i++) {
buffers[i] = malloc(packet_size);
if (!buffers[i]) {
perror("malloc failed for mmsg buffer");
for (int j = 0; j < i; j++) {
free(buffers[j]);
}
return;
}

#ifdef MALLOC_INIT
memset(buffers[i], 'X', packet_size);
buffers[i][packet_size - 1] = '\0';
#endif

int written = snprintf(buffers[i], packet_size,
"sendmmsg packet %d at timestamp: %ld",
i, time(NULL));
#ifdef MALLOC_INIT
buffers[i][written] = 'X';
#endif

iovecs[i].iov_base = buffers[i];
iovecs[i].iov_len = packet_size; // send the full packet size

memset(&msgs[i], 0, sizeof(msgs[i]));
msgs[i].msg_hdr.msg_name = dest_addr;
msgs[i].msg_hdr.msg_namelen = sizeof(*dest_addr);
msgs[i].msg_hdr.msg_iov = &iovecs[i];
msgs[i].msg_hdr.msg_iovlen = 1;
}

printf("Sending %d packets using sendmmsg\n", num_packets);

int ret = sendmmsg(sockfd, msgs, num_packets, 0);

if (ret < 0) {
perror("sendmmsg failed");
} else {
printf("sendmmsg sent %d out of %d packets\n", ret, num_packets);

// Count the bytes sent
size_t total_sent = 0;
for (int i = 0; i < ret; i++) {
total_sent += msgs[i].msg_len;
}
printf("Total bytes sent via sendmmsg: %zu\n", total_sent);
}

for (int i = 0; i < num_packets; i++) {
free(buffers[i]);
}
}

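// Send the batch with sendmmsg + UDP GSO (a UDP_SEGMENT cmsg on every message).
// Note: each message carries a single packet_size (1216) payload, which is smaller than
// GSO_SIZE (1472), so every message still leaves as one ordinary datagram and no
// segmentation takes place.
void send_mmsg_gso_batch(int sockfd, struct sockaddr_in *dest_addr,
int packet_size, int num_packets)
{
struct mmsghdr msgs[num_packets];
struct iovec iovecs[num_packets];
char *buffers[num_packets];
char *control_buffers[num_packets]; // one control-message buffer per message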

// Prepare each message
for (int i = 0; i < num_packets; i++) {
buffers[i] = malloc(packet_size);
if (!buffers[i]) {
perror("malloc failed for mmsg_gso buffer");
for (int j = 0; j < i; j++) {
free(buffers[j]);
free(control_buffers[j]); // control buffers 0..i-1 are already allocated
}
return;
}

#ifdef MALLOC_INIT
memset(buffers[i], 'X', packet_size);
buffers[i][packet_size - 1] = '\0';
#endif

int written = snprintf(buffers[i], packet_size,
"sendmmsg+GSO packet %d at timestamp: %ld",
i, time(NULL));
#ifdef MALLOC_INIT
buffers[i][written] = 'X';
#endif

iovecs[i].iov_base = buffers[i];
iovecs[i].iov_len = packet_size; // send the full packet size

// Allocate space for the control message
control_buffers[i] = malloc(CMSG_SPACE(sizeof(uint16_t)));
if (!control_buffers[i]) {
perror("malloc failed for control buffer");
for (int j = 0; j <= i; j++) {
free(buffers[j]);
}
for (int j = 0; j < i; j++) {
free(control_buffers[j]);
}
return;
}

memset(&msgs[i], 0, sizeof(msgs[i]));
msgs[i].msg_hdr.msg_name = dest_addr;
msgs[i].msg_hdr.msg_namelen = sizeof(*dest_addr);
msgs[i].msg_hdr.msg_iov = &iovecs[i];
msgs[i].msg_hdr.msg_iovlen = 1;

// Attach the GSO control message
msgs[i].msg_hdr.msg_control = control_buffers[i];
msgs[i].msg_hdr.msg_controllen = CMSG_SPACE(sizeof(uint16_t));

struct cmsghdr *cmsg = CMSG_FIRSTHDR(&msgs[i].msg_hdr);
cmsg->cmsg_level = SOL_UDP;
cmsg->cmsg_type = UDP_SEGMENT;
cmsg->cmsg_len = CMSG_LEN(sizeof(uint16_t));
*((uint16_t *)CMSG_DATA(cmsg)) = GSO_SIZE; // set the GSO segment size

// Set msg_controllen to the exact length of this single cmsg; the CMSG_SPACE() value set above would also be accepted
msgs[i].msg_hdr.msg_controllen = cmsg->cmsg_len;
}

printf("Sending %d packets using sendmmsg + UDP GSO\n", num_packets);

int ret = sendmmsg(sockfd, msgs, num_packets, 0);

if (ret < 0) {
perror("sendmmsg with GSO failed");
} else {
printf("sendmmsg+GSO sent %d out of %d packets\n", ret, num_packets);

// Count the bytes sent
size_t total_sent = 0;
for (int i = 0; i < ret; i++) {
total_sent += msgs[i].msg_len;
}
printf("Total bytes sent via sendmmsg+GSO: %zu\n", total_sent);
}

for (int i = 0; i < num_packets; i++) {
free(buffers[i]);
free(control_buffers[i]);
}
}

// Performance test
void performance_test(int sockfd, struct sockaddr_in *dest_addr)
{
const int iterations = 10000;
const int packets_per_batch = 32;
const int packet_size = 1216;
const unsigned long long total_bits = 8ULL * iterations * packets_per_batch * packet_size;

printf("\n=== Performance Test ===\n");
printf("Iterations: %d, Packets per batch: %d, Packet size: %d, GSO_SIZE: %d\n",
iterations, packets_per_batch, packet_size, GSO_SIZE);

// Test sendto
printf("\n1. Testing sendto without GSO...\n");
struct timespec start, end;
clock_gettime(CLOCK_MONOTONIC, &start);
for (int i = 0; i < iterations; i++) {
send_sendto(sockfd, dest_addr, packet_size, packets_per_batch);
}
clock_gettime(CLOCK_MONOTONIC, &end);
double sendto_time = (end.tv_sec - start.tv_sec) + (end.tv_nsec - start.tv_nsec) / 1e9;
sleep(SLEEP_SEC);

// Test sendmmsg
printf("\n2. Testing sendmmsg...\n");
clock_gettime(CLOCK_MONOTONIC, &start);
for (int i = 0; i < iterations; i++) {
send_mmsg_batch(sockfd, dest_addr, packet_size, packets_per_batch);
}
clock_gettime(CLOCK_MONOTONIC, &end);
double mmsg_time = (end.tv_sec - start.tv_sec) + (end.tv_nsec - start.tv_nsec) / 1e9;
sleep(SLEEP_SEC);

// Test UDP GSO + one large sendto()
printf("\n3. Testing UDP GSO (merge sendto)...\n");
enable_udp_gso(sockfd); // enable GSO
clock_gettime(CLOCK_MONOTONIC, &start);
for (int i = 0; i < iterations; i++) {
send_gso_batch(sockfd, dest_addr, packet_size, packets_per_batch);
}
clock_gettime(CLOCK_MONOTONIC, &end);
double gso_time = (end.tv_sec - start.tv_sec) + (end.tv_nsec - start.tv_nsec) / 1e9;
disable_udp_gso(sockfd); // disable GSO
sleep(SLEEP_SEC);

// Test UDP GSO + scatter-gather sendmsg without segment
printf("\n4. Testing UDP GSO (scatter-gather sendmsg without segment)...\n");
enable_udp_gso(sockfd); // enable GSO
clock_gettime(CLOCK_MONOTONIC, &start);
for (int i = 0; i < iterations; i++) {
send_gso_batch_sendmsg(sockfd, dest_addr, packet_size, packets_per_batch, 0);
}
clock_gettime(CLOCK_MONOTONIC, &end);
double gso_sendmsg_time0 = (end.tv_sec - start.tv_sec) + (end.tv_nsec - start.tv_nsec) / 1e9;
disable_udp_gso(sockfd); // disable GSO
sleep(SLEEP_SEC);

// Test UDP GSO + scatter-gather sendmsg with segment
printf("\n5. Testing UDP GSO (scatter-gather sendmsg with segment)...\n");
enable_udp_gso(sockfd); // enable GSO
clock_gettime(CLOCK_MONOTONIC, &start);
for (int i = 0; i < iterations; i++) {
send_gso_batch_sendmsg(sockfd, dest_addr, packet_size, packets_per_batch, 1);
}
clock_gettime(CLOCK_MONOTONIC, &end);
double gso_sendmsg_time1 = (end.tv_sec - start.tv_sec) + (end.tv_nsec - start.tv_nsec) / 1e9;
disable_udp_gso(sockfd); // disable GSO
sleep(SLEEP_SEC);

// Test UDP GSO + one large sendmsg()
printf("\n6. Testing UDP GSO (merge sendmsg)...\n");
// enable_udp_gso(sockfd); // socket-level GSO stays off: this test relies on the per-call UDP_SEGMENT cmsg
clock_gettime(CLOCK_MONOTONIC, &start);
for (int i = 0; i < iterations; i++) {
send_gso_batch_improved(sockfd, dest_addr, packet_size, packets_per_batch);
}
clock_gettime(CLOCK_MONOTONIC, &end);
double gso_improved_time = (end.tv_sec - start.tv_sec) + (end.tv_nsec - start.tv_nsec) / 1e9;
// disable_udp_gso(sockfd); // not needed: socket-level GSO was never enabled for this test
sleep(SLEEP_SEC);

// Test sendmmsg + UDP GSO
printf("\n7. Testing sendmmsg + UDP GSO...\n");
enable_udp_gso(sockfd); // enable GSO
clock_gettime(CLOCK_MONOTONIC, &start);
for (int i = 0; i < iterations; i++) {
send_mmsg_gso_batch(sockfd, dest_addr, packet_size, packets_per_batch);
}
clock_gettime(CLOCK_MONOTONIC, &end);
double mmsg_gso_time = (end.tv_sec - start.tv_sec) + (end.tv_nsec - start.tv_nsec) / 1e9;
disable_udp_gso(sockfd); // disable GSO

printf("\n=== Results ===\n");
printf("test params: iterations=%d, packets_per_batch=%d, packet_size=%d, GSO_SIZE=%d, total_bits=%llu\n",
iterations, packets_per_batch, packet_size, GSO_SIZE, total_bits);

printf("sendto time: %.6f seconds, bitrate=%.2f Mbps\n", sendto_time, total_bits / 1e6 / sendto_time);
printf("sendmmsg time: %.6f seconds, bitrate=%.2f Mbps\n", mmsg_time, total_bits / 1e6 / mmsg_time);
printf("UDP GSO(merge sendto) time: %.6f seconds, bitrate=%.2f Mbps\n", gso_time, total_bits / 1e6 / gso_time);
printf("UDP GSO(scatter-gather sendmsg without segment) time: %.6f seconds, bitrate=%.2f Mbps\n", gso_sendmsg_time0, total_bits / 1e6 / gso_sendmsg_time0);
printf("UDP GSO(scatter-gather sendmsg with segment) time: %.6f seconds, bitrate=%.2f Mbps\n", gso_sendmsg_time1, total_bits / 1e6 / gso_sendmsg_time1);
printf("UDP GSO(merge sendmsg) time: %.6f seconds, bitrate=%.2f Mbps\n", gso_improved_time, total_bits / 1e6 / gso_improved_time);
printf("sendmmsg + UDP GSO time: %.6f seconds, bitrate=%.2f Mbps\n", mmsg_gso_time, total_bits / 1e6 / mmsg_gso_time);

printf("sendmmsg speedup over sendto: %.2fx\n", sendto_time / mmsg_time);
printf("GSO(merge sendto) speedup over sendto: %.2fx\n", sendto_time / gso_time);
printf("GSO(scatter-gather sendmsg without segment) speedup over sendto: %.2fx\n", sendto_time / gso_sendmsg_time0);
printf("GSO(scatter-gather sendmsg with segment) speedup over sendto: %.2fx\n", sendto_time / gso_sendmsg_time1);
printf("GSO(merge sendmsg) speedup over sendto: %.2fx\n", sendto_time / gso_improved_time);
printf("sendmmsg + UDP GSO speedup over sendto: %.2fx\n", sendto_time / mmsg_gso_time);
}

int main(int argc, char *argv[])
{
int sockfd;
struct sockaddr_in dest_addr;

printf("UDP GSO vs sendmmsg Performance Comparison\n");

// Create the socket
sockfd = create_udp_socket();
setup_dest_addr(&dest_addr, DEST_IP, DEST_PORT);

if (argc > 1 && strcmp(argv[1], "test") == 0) {
// 运行性能测试
performance_test(sockfd, &dest_addr);
} else {
// 分别演示
int packet_size = 1216;
int num_packets = 32;
int mode = 0; // 0: 全部执行, 1-6: 执行单个模式

if (argc > 1) {
mode = atoi(argv[1]);
}

switch (mode) {
case 0: // 全部执行
printf("\n=== sendto(without GSO) Example ===\n");
send_sendto(sockfd, &dest_addr, packet_size, num_packets);

printf("\n=== sendmmsg Example ===\n");
send_mmsg_batch(sockfd, &dest_addr, packet_size, num_packets);

printf("\n=== UDP GSO(merge sendto) Example ===\n");
enable_udp_gso(sockfd);
send_gso_batch(sockfd, &dest_addr, packet_size, num_packets);
disable_udp_gso(sockfd);

printf("\n=== UDP GSO(scatter-gather sendmsg without segment) Example ===\n");
enable_udp_gso(sockfd);
send_gso_batch_sendmsg(sockfd, &dest_addr, packet_size, num_packets, 0);
disable_udp_gso(sockfd);

printf("\n=== UDP GSO(scatter-gather sendmsg with segment) Example ===\n");
enable_udp_gso(sockfd);
send_gso_batch_sendmsg(sockfd, &dest_addr, packet_size, num_packets, 1);
disable_udp_gso(sockfd);

printf("\n=== UDP GSO(merge sendmsg) Example ===\n");
enable_udp_gso(sockfd);
send_gso_batch_improved(sockfd, &dest_addr, packet_size, num_packets);
disable_udp_gso(sockfd);

printf("\n=== sendmmsg + UDP GSO Example ===\n");
enable_udp_gso(sockfd);
send_mmsg_gso_batch(sockfd, &dest_addr, packet_size, num_packets);
disable_udp_gso(sockfd);
break;

case 1:
printf("\n=== sendto(without GSO) Example ===\n");
send_sendto(sockfd, &dest_addr, packet_size, num_packets);
break;

case 2:
printf("\n=== sendmmsg Example ===\n");
send_mmsg_batch(sockfd, &dest_addr, packet_size, num_packets);
break;

case 3:
printf("\n=== UDP GSO(merge sendto) Example ===\n");
enable_udp_gso(sockfd);
send_gso_batch(sockfd, &dest_addr, packet_size, num_packets);
disable_udp_gso(sockfd);
break;

case 4:
printf("\n=== UDP GSO(scatter-gather sendmsg without segment) Example ===\n");
enable_udp_gso(sockfd);
send_gso_batch_sendmsg(sockfd, &dest_addr, packet_size, num_packets, 0);
disable_udp_gso(sockfd);
break;

case 5:
printf("\n=== UDP GSO(scatter-gather sendmsg with segment) Example ===\n");
enable_udp_gso(sockfd);
send_gso_batch_sendmsg(sockfd, &dest_addr, packet_size, num_packets, 1);
disable_udp_gso(sockfd);
break;

case 6:
printf("\n=== UDP GSO(merge sendmsg) Example ===\n");
enable_udp_gso(sockfd);
send_gso_batch_improved(sockfd, &dest_addr, packet_size, num_packets);
disable_udp_gso(sockfd);
break;

case 7:
printf("\n=== sendmmsg + UDP GSO Example ===\n");
enable_udp_gso(sockfd);
send_mmsg_gso_batch(sockfd, &dest_addr, packet_size, num_packets);
disable_udp_gso(sockfd);
break;

default:
printf("Invalid mode: %d\n", mode);
printf("Usage:\n");
printf(" %s # Run all examples\n", argv[0]);
printf(" %s <mode> # Run specific mode (1-6)\n", argv[0]);
break;
}
}

close(sockfd);
return 0;
}
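The demo driver switches GSO on and off for the whole socket around each mode with enable_udp_gso()/disable_udp_gso(). Linux (4.18 and later) also accepts the segment size per call, as a UDP_SEGMENT control message (level SOL_UDP, a uint16_t payload) on sendmsg(), which would make that toggling unnecessary. The sketch below assumes Linux and glibc headers; gso_segment_count, attach_gso_cmsg and send_gso_once are hypothetical helper names that do not appear elsewhere in this program, and the fallback #define values are copied from the kernel's linux/udp.h in case the libc headers lack them.

```c
#define _GNU_SOURCE
#include <assert.h>
#include <stdint.h>
#include <string.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netinet/udp.h>

#ifndef SOL_UDP
#define SOL_UDP 17          /* socket level for UDP, equal to IPPROTO_UDP */
#endif
#ifndef UDP_SEGMENT
#define UDP_SEGMENT 103     /* from linux/udp.h, Linux >= 4.18 */
#endif

/* Datagrams produced by one GSO send of len bytes: ceil(len / gso_size).
 * Every segment carries gso_size bytes except possibly the last one.
 * gso_size must be nonzero. */
size_t gso_segment_count(size_t len, uint16_t gso_size)
{
    return (len + gso_size - 1) / gso_size;
}

/* Attach a UDP_SEGMENT control message so that GSO applies to this single
 * sendmsg() call, with no setsockopt() on the socket. ctrl must be aligned
 * for struct cmsghdr and hold at least CMSG_SPACE(sizeof(uint16_t)) bytes. */
void attach_gso_cmsg(struct msghdr *msg, char *ctrl, size_t ctrl_len,
                     uint16_t gso_size)
{
    assert(ctrl_len >= CMSG_SPACE(sizeof(uint16_t)));
    memset(ctrl, 0, ctrl_len);
    msg->msg_control = ctrl;
    msg->msg_controllen = CMSG_SPACE(sizeof(uint16_t));

    struct cmsghdr *cm = CMSG_FIRSTHDR(msg);
    cm->cmsg_level = SOL_UDP;
    cm->cmsg_type = UDP_SEGMENT;
    cm->cmsg_len = CMSG_LEN(sizeof(uint16_t));
    memcpy(CMSG_DATA(cm), &gso_size, sizeof(gso_size));
}

/* Send len bytes from buf in one system call; the kernel (or the NIC, if it
 * offloads UDP segmentation) splits it into datagrams of gso_size bytes. */
ssize_t send_gso_once(int fd, const struct sockaddr_in *dst,
                      const void *buf, size_t len, uint16_t gso_size)
{
    union {
        char bytes[CMSG_SPACE(sizeof(uint16_t))];
        struct cmsghdr align;   /* forces cmsghdr alignment of bytes[] */
    } ctrl;
    struct iovec iov = { .iov_base = (void *)buf, .iov_len = len };
    struct msghdr msg = {0};

    msg.msg_name = (void *)dst;
    msg.msg_namelen = sizeof(*dst);
    msg.msg_iov = &iov;
    msg.msg_iovlen = 1;
    attach_gso_cmsg(&msg, ctrl.bytes, sizeof(ctrl.bytes), gso_size);
    return sendmsg(fd, &msg, 0);
}
```

With packet_size = 1216 and num_packets = 32 as in main(), a single send_gso_once() call on a 32 × 1216 = 38912-byte buffer with gso_size = 1216 would leave the host as 32 datagrams. The kernel rejects the call with EINVAL if the buffer would need more than UDP_MAX_SEGMENTS segments.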
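In main() every mode is written out twice, once under case 0 and once under its own case label, and the valid range is also hard-coded in a comment and in the usage message, so the copies can easily drift apart. A table of modes is one way to keep them in sync. This is a hypothetical sketch: demo_mode, modes, run_one and dispatch are names invented here, and the GSO toggle and the send_* call are replaced by stand-in variables so the code compiles and runs on its own.

```c
#include <assert.h>
#include <stdio.h>

/* One row per demo mode; the mode number on the command line is index + 1. */
struct demo_mode {
    const char *title;  /* banner text, as printed by main() */
    int uses_gso;       /* wrap the call in enable/disable GSO */
};

static const struct demo_mode modes[] = {
    { "sendto(without GSO)", 0 },
    { "sendmmsg", 0 },
    { "UDP GSO(merge sendto)", 1 },
    { "UDP GSO(scatter-gather sendmsg without segment)", 1 },
    { "UDP GSO(scatter-gather sendmsg with segment)", 1 },
    { "UDP GSO(merge sendmsg)", 1 },
    { "sendmmsg + UDP GSO", 1 },
};
#define NUM_MODES ((int)(sizeof(modes) / sizeof(modes[0])))

/* Stand-ins: the real program would call enable_udp_gso()/disable_udp_gso()
 * and the send_* function that belongs to the mode. */
static int gso_on;
static int runs[NUM_MODES + 1];

static void run_one(int mode)
{
    assert(mode >= 1 && mode <= NUM_MODES);
    const struct demo_mode *m = &modes[mode - 1];

    printf("\n=== %s Example ===\n", m->title);
    if (m->uses_gso)
        gso_on = 1;     /* enable_udp_gso(sockfd) */
    runs[mode]++;       /* the mode's send_* call goes here */
    if (m->uses_gso)
        gso_on = 0;     /* disable_udp_gso(sockfd) */
}

/* 0 runs every mode in order, 1..NUM_MODES runs one mode, anything else
 * fails. The range in the error message is derived from the table. */
static int dispatch(int mode)
{
    if (mode == 0) {
        for (int i = 1; i <= NUM_MODES; i++)
            run_one(i);
        return 0;
    }
    if (mode < 1 || mode > NUM_MODES) {
        printf("Invalid mode: %d (expected 0-%d)\n", mode, NUM_MODES);
        return -1;
    }
    run_one(mode);
    return 0;
}
```

Adding an eighth mode then means adding one table row (plus its send call), and both "run all" and the range check pick it up automatically.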

References:

  1. https://blog.hidva.com/2020/05/11/udp-gro-gso/
  2. https://calendar.perfplanet.com/2019/accelerating-udp-packet-transmission-for-quic/