进程间通信(Inter-Process Communication,IPC)是操作系统中的一个重要概念,它允许不同进程之间进行数据交换、共享资源和相互协作。在多任务操作系统中,进程通常是相互独立的,彼此隔离且不能直接访问对方的内存。因此,为了实现进程间的交互,需要使用 IPC 机制。同时,本文章也记录了各种 IPC 通信机制中的常见系统调用 API 的功能描述。

进程间通信方式

  • 共享文件(Shared file):共享文件允许 多个进程访问同一个文件,并在这个文件上进行读取和写入操作。

  • 共享内存(Shared memory (with semaphores)):共享内存允许 多个进程访问同一块内存区域 ,这样它们就可以直接读取或写入数据,而无需进行复制。共享内存通常是 最快的 IPC 方法之一,因为它避免了数据的复制操作。

  • 信号量(Semaphores):信号量是一种用于控制对共享资源的访问的同步机制。它可以用来解决竞态条件(Race Condition)和临界区(Critical Section)问题,确保多个进程之间的互斥和同步。

  • 管道(Pipe (named and unnamed)):管道是一种 单向通信机制,用于在相关进程之间传输数据。它通常用于具有亲缘关系(父子进程)的进程之间的通信。管道有匿名管道和命名管道两种类型。

  • 消息队列(Message queues):消息队列是一种进程间通信的方式,其中的 消息被放置在一个队列中,并且可以由多个接收者进行读取。消息队列通常是通过消息队列标识符来进行引用的。

  • 套接字(Socket):套接字是一种在网络上进行进程间通信的通用方法。它允许 不同计算机上的进程进行通信,可以用于实现客户端 - 服务器模型等应用。

如果需要高效的数据传输,共享内存可能是一个不错的选择;如果需要在不同机器上的进程进行通信,套接字则是一个更合适的选择。

核心概念

进程是正在执行的程序,每个进程都有自己的地址空间,其中包括允许进程访问的内存位置。

进程内的线程共享各种资源,特别是地址空间。因此,进程内的线程可以通过共享内存直接通信。但是,不同的进程,默认情况下,不共享内存。

启动进程然后进行通信的方法有很多种,主要有两种方法:

  • 使用终端来启动一个进程,使用不同的终端来启动另一个进程。
  • 在一个进程(父进程)内调用系统函数 fork 以生成另一进程(子进程)。

Shared Storage

Shared File

共享文件可能是最基本的 IPC 机制。考虑相对简单的情况,其中一个进程(生产者)创建并写入文件,而另一个进程(消费者)从同一个文件中读取。

1
2
3
         writes  +-----------+  reads 
producer-------->| disk file |<-------consumer
+-----------+

使用此 IPC 机制的明显挑战是 可能会出现竞态条件:生产者和消费者可能会同时访问文件,从而导致结果不确定。为了避免竞态条件,必须以防止写入操作与任何其他操作(无论是读取还是写入)之间发生冲突的方式锁定文件。

  • 生产者应该在写入文件之前获得文件的独占锁。独占锁最多只能由一个进程持有,这排除了竞态条件,因为在释放锁之前没有其他进程可以访问该文件。
  • 在读取文件之前,消费者应该至少获得文件的共享锁。多个读取者可以同时持有共享锁,但是当单个读取者持有共享锁时,任何写入者都无法访问文件。

标准 I/O 库包含一个 名为 fcntl 的实用函数,可用于检查和操作文件上的独占锁和共享锁。该函数通过文件描述符进行工作,文件描述符是一个非负整数值,在进程内标识一个文件(不同进程中的不同文件描述符可能标识同一个物理文件)。

对于文件锁定,Linux 提供了库函数 flock,它是 fcntl 的 thin wrapper。第一个示例使用 fcntl 函数探索 API 细节。

producer

Example 1. the producer program producer.c

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
#include <fcntl.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

#define FileName "data.txt"
#define DataString "Now is the winter of our discontent\nMade glorious summer by this sun of York\n"

void report_and_exit(const char* msg) {
perror(msg);
exit(-1); /* EXIT_FAILURE */
}

int main() {
struct flock lock;
lock.l_type = F_WRLCK; /* read/write (exclusive versus shared) lock */
lock.l_whence = SEEK_SET; /* base for seek offsets */
lock.l_start = 0; /* 1st byte in file */
lock.l_len = 0; /* 0 here means 'until EOF' */
lock.l_pid = getpid(); /* process id */

int fd; /* file descriptor to identify a file within a process */

if ((fd = open(FileName, O_RDWR | O_CREAT, 0666)) < 0) /* -1 signals an error */
report_and_exit("open failed...");

if (fcntl(fd, F_SETLK, &lock) < 0) /** F_SETLK doesn't block, F_SETLKW does **/
report_and_exit("fcntl failed to get lock...");
else {
write(fd, DataString, strlen(DataString)); /* populate data file */
fprintf(stderr, "Process %d has written to data file...\n", lock.l_pid);
}

/* Now release the lock explicitly. */
lock.l_type = F_UNLCK;
if (fcntl(fd, F_SETLK, &lock) < 0)
report_and_exit("explicit unlocking failed...");

close(fd); /* close the file: would unlock if needed */
return 0; /* terminating the process would unlock as well */
}

上述生产者程序的主要步骤可以总结如下:

1)程序声明了一个 struct flock 类型的变量,它代表一个锁,并初始化了该结构的五个字段,F_WRLCK 使锁成为独占(读写)锁而不是共享(只读)锁。该变量主要作用是锁定整个文件,但也可以通过 l_startl_len 来锁定指定的字节。

1
2
3
4
5
6
7
8
9
// vim /usr/include/x86_64-linux-gnu/bits/fcntl.h +35
struct flock
{
short int l_type; /* Type of lock: F_RDLCK, F_WRLCK, or F_UNLCK. */
short int l_whence; /* How to interpret l_start: SEEK_SET, SEEK_CUR, SEEK_END. */
__off_t l_start; /* Offset where the lock begins. */
__off_t l_len; /* Size of the locked area; zero means until EOF. */
__pid_t l_pid; /* Process holding the lock. */
};

2)第一次调用 fcntl(fd, F_SETLK, &lock) 尝试以独占方式锁定文件。第二个参数 F_SETLK 表示对 fcntl 的调用不会阻塞:函数立即返回,要么授予锁,要么指示失败。如果改用标志 F_SETLKW(末尾的 W 表示 wait ),则对 fcntl 的调用将阻塞,直到可以获取锁为止。

1
2
3
4
5
   int fcntl(int fd, int cmd, ... /* arg */ );

DESCRIPTION
fcntl() performs one of the operations described below on the
open file descriptor fd. The operation is determined by cmd.

3)写入文件后,生产者将文件锁结构的 l_type 字段更改为解锁值 F_UNLCK,并调用 fcntl 执行解锁操作。

consumer

Example 2. the consumer program consumer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
#include <fcntl.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>

#define FileName "data.txt"

void report_and_exit(const char* msg) {
perror(msg);
exit(-1); /* EXIT_FAILURE */
}

int main() {
struct flock lock;
lock.l_type = F_WRLCK; /* read/write (exclusive) lock */
lock.l_whence = SEEK_SET; /* base for seek offsets */
lock.l_start = 0; /* 1st byte in file */
lock.l_len = 0; /* 0 here means 'until EOF' */
lock.l_pid = getpid(); /* process id */

int fd; /* file descriptor to identify a file within a process */

if ((fd = open(FileName, O_RDONLY)) < 0) /* -1 signals an error */
report_and_exit("open to read failed...");

/* If the file is write-locked, we can't continue. */
fcntl(fd, F_GETLK, &lock); /* sets lock.l_type to F_UNLCK if no write lock */
if (lock.l_type != F_UNLCK)
report_and_exit("file is still write locked...");

lock.l_type = F_RDLCK; /* prevents any writing during the reading */
if (fcntl(fd, F_SETLK, &lock) < 0)
report_and_exit("can't get a read-only lock...");

/* Read the bytes (they happen to be ASCII codes) one at a time. */
int c; /* buffer for read bytes */
while (read(fd, &c, 1) > 0) /* 0 signals EOF */
write(STDOUT_FILENO, &c, 1); /* write one byte to the standard output */

/* Release the lock explicitly. */
lock.l_type = F_UNLCK;
if (fcntl(fd, F_SETLK, &lock) < 0)
report_and_exit("explicit unlocking failed...");

close(fd);
return 0;
}

消费者程序首先检查文件是否被独占锁定,然后才尝试获得共享锁。fcntl 调用中指定的 F_GETLK 操作检查锁,在本例中,是上面第一个语句中以 F_WRLCK 形式给出的独占锁。如果指定的锁不存在,则 fcntl 调用会自动将锁类型字段更改为 F_UNLCK 以指示这一事实。如果文件被独占锁定,则消费者终止(该程序的更强大版本可能会让消费者稍微休息一下,然后重试几次)。

如果文件当前未锁定,则消费者尝试获取共享(只读)锁 F_RDLCK。获得共享锁后,消费者程序从文件中一次读取一个字节,将字节打印到标准输出,释放锁,关闭文件,然后终止。

为了缩短程序,尝试获取共享(只读)锁后,可以删除对 fcntlF_GETLK 调用,因为如果其他某个进程已持有互斥(读写)锁,则 F_RDLCK 赋值操作不会成功。

特点

共享文件的内容可能是大量的、任意字节(例如数字化电影),这使得文件共享成为一种非常灵活的 IPC 机制。缺点是文件访问无论是读还是写都比较慢。下一个示例通过共享内存(而不是共享文件)展现了 IPC 的优势,并相应提高了性能。

Shared memory

默认情况下,为共享内存提供的 POSIX API 将共享内存实现为内存映射文件:对于共享内存段,系统维护一个具有相应内容的后备文件(backing file)。POSIX 下的共享内存可以在没有后备文件的情况下进行配置,但这可能会影响可移植性。

下面的示例使用带有支持后备文件的 POSIX API,它 结合了内存访问(速度)和文件存储(持久性)的优点

共享内存示例有两个程序,名为 memwritermemreader,并 使用信号量来协调(同步)它们对共享内存的访问——因为无论是在多进程还是多线程中,基于内存的竞态条件的风险也是如此。

1
2
3
4
5
6
// file shmem.h
#define ByteSize 512
#define BackingFile "/shMemEx"
#define AccessPerms 0644
#define SemaphoreName "mysemaphore"
#define MemContents "This is the way the world ends...\n"

memwriter

Example 3. source code for the memwriter process memwriter.c

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
/** Compilation: gcc -o memwriter memwriter.c -lrt -lpthread **/
#include <fcntl.h>
#include <semaphore.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <unistd.h>
#include "shmem.h"

void report_and_exit(const char* msg) {
perror(msg);
exit(-1);
}

int main() {
int fd = shm_open(BackingFile, /* name from smem.h */
O_RDWR | O_CREAT, /* read/write, create if needed */
AccessPerms); /* access permissions (0644) */
if (fd < 0)
report_and_exit("Can't open shared mem segment...");

ftruncate(fd, ByteSize); /* get the bytes */

caddr_t memptr = mmap(NULL, /* let system pick where to put segment */
ByteSize, /* how many bytes */
PROT_READ | PROT_WRITE, /* access protections */
MAP_SHARED, /* mapping visible to other processes */
fd, /* file descriptor */
0); /* offset: start at 1st byte */
if ((caddr_t)-1 == memptr)
report_and_exit("Can't get segment...");

fprintf(stderr, "shared mem address: %p [0..%d]\n", memptr, ByteSize - 1);
fprintf(stderr, "backing file: /dev/shm%s\n", BackingFile);

/* semahore code to lock the shared mem */
sem_t* semptr = sem_open(SemaphoreName, /* name */
O_CREAT, /* create the semaphore */
AccessPerms, /* protection perms */
0); /* initial value */
if ((void*)-1 == semptr)
report_and_exit("sem_open");

strcpy(memptr, MemContents); /* copy some ASCII bytes to the segment */

/* increment the semaphore so that memreader can read */
if (sem_post(semptr) < 0)
report_and_exit("sem_post");

sleep(12); /* give reader a chance */

/* clean up */
munmap(memptr, ByteSize); /* unmap the storage */
close(fd);
sem_close(semptr);
shm_unlink(BackingFile); /* unlink from the backing file */
return 0;
}

二进制信号量是一种特殊情况,只需要两个值:0 和 1,充当互斥锁。共享内存示例使用信号量作为互斥体。当信号量的值为 0 时,只有 memwriter 可以访问共享内存。写入后,该进程会增加信号量的值,从而允许 memreader 读取共享内存。

以下概述了 memwriter 和 memreader 程序如何通过共享内存进行通信:

1)如上所示,memwriter 程序调用 shm_open 函数来获取系统与共享内存协调的 backing file 的文件描述符。此时,还没有分配任何内存。随后调用名称具有误导性的函数 ftruncate 分配 ByteSize 字节。

  • memwriter 和 memreader 程序 仅访问共享内存,而不访问后备文件。系统负责同步共享内存和后备文件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
   int shm_open(const char *name, int oflag, mode_t mode);
int shm_unlink(const char *name);

DESCRIPTION
shm_open() creates and opens a new, or opens an existing, POSIX
shared memory object. A POSIX shared memory object is in effect
a handle which can be used by unrelated processes to mmap(2) the
same region of shared memory. The shm_unlink() function performs
the converse operation, removing an object previously created by
shm_open().

int ftruncate(int fildes, off_t length);

DESCRIPTION
If fildes is not a valid file descriptor open for writing, the
ftruncate() function shall fail.

If fildes refers to a regular file, the ftruncate() function
shall cause the size of the file to be truncated to length. If
the size of the file previously exceeded length, the extra data
shall no longer be available to reads on the file. If the file
previously was smaller than this size, ftruncate() shall increase
the size of the file. If the file size is increased, the extended
area shall appear as if it were zero-filled. The value of the
seek pointer shall not be modified by a call to ftruncate().

2)然后 memwriter 调用 mmap 函数,获取指向共享内存的指针。指针类型 caddr_t 以 c 开头,代表 calloc,这是一个将动态分配的存储初始化为零(initializes dynamically allocated storage to zeroes)的系统函数。

  • mmap 的第一个参数是 NULL,这意味着系统决定在虚拟地址空间中分配内存的位置。
  • MAP_SHARED 标志指示分配的内存可在进程之间共享,最后一个参数(在本例中为零)意味着共享内存的偏移量应该是第一个字节。
  • size 参数指定要分配的字节数,protection 参数指示共享内存可以写入和读取。
  • 函数执行成功后,系统在 /dev/shm 目录下创建并维护后备文件 /shMemEx
1
2
3
4
5
6
7
8
9
   void *mmap(void addr[.length], size_t length, int prot, int flags,
int fd, off_t offset);
int munmap(void addr[.length], size_t length);

DESCRIPTION
mmap() creates a new mapping in the virtual address space of the
calling process. The starting address for the new mapping is
specified in addr. The length argument specifies the length of
the mapping (which must be greater than 0).

3)此时,memwriter 已准备好写入,但它首先创建一个信号量以确保对共享内存的独占访问。如果对 sem_open 的调用成功,然后就可以处理写操作了。

  • 在 memwriter 和 memreader 进程中,通过 调用 sem_open 时使用相同的名称标识,可以访问同一个信号量
1
2
3
4
5
6
7
8
9
10
   sem_t *sem_open(const char *name, int oflag);
sem_t *sem_open(const char *name, int oflag,
mode_t mode, unsigned int value);

DESCRIPTION
sem_open() creates a new POSIX semaphore or opens an existing
semaphore. The semaphore is identified by name.

The oflag argument specifies flags that control the operation of
the call.

4)写入后,memwriter 通过 sem_post() 将信号量值增加到 1。增加信号量会释放互斥锁,并使 memreader 能够执行其读取操作。

5)最后,memwriter 进程还从 memwriter 地址空间取消共享内存的映射,这会阻止 memwriter 进一步访问共享内存;以及其它关闭操作。

  • 如果省略 shm_unlink 语句,则后备文件在程序终止后仍然存在。
1
2
3
4
5
6
7
8
9
   int munmap(void *addr, size_t len);

DESCRIPTION
The munmap() function shall remove any mappings for those entire
pages containing any part of the address space of the process
starting at addr and continuing for len bytes. Further references
to these pages shall result in the generation of a SIGSEGV signal
to the process. If there are no mappings in the specified
address range, then munmap() has no effect.

在 memwriter 和 memreader 程序中,主要感兴趣的共享内存函数是 shm_openmmap:如果成功,第一个调用将返回后备文件的文件描述符,第二个调用将使用该文件描述符来获取指向共享内存的指针部分。

memreader

Example 4. source code for the memreader process memreader

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
/** Compilation: gcc -o memreader memreader.c -lrt -lpthread **/
#include <fcntl.h>
#include <semaphore.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <unistd.h>
#include "shmem.h"

void report_and_exit(const char* msg) {
perror(msg);
exit(-1);
}

int main() {
int fd = shm_open(BackingFile, O_RDWR, AccessPerms); /* empty to begin */
if (fd < 0)
report_and_exit("Can't get file descriptor...");

/* get a pointer to memory */
caddr_t memptr = mmap(NULL, /* let system pick where to put segment */
ByteSize, /* how many bytes */
PROT_READ | PROT_WRITE, /* access protections */
MAP_SHARED, /* mapping visible to other processes */
fd, /* file descriptor */
0); /* offset: start at 1st byte */
if ((caddr_t)-1 == memptr)
report_and_exit("Can't access segment...");

/* create a semaphore for mutual exclusion */
sem_t* semptr = sem_open(SemaphoreName, /* name */
O_CREAT, /* create the semaphore */
AccessPerms, /* protection perms */
0); /* initial value */
if ((void*)-1 == semptr)
report_and_exit("sem_open");

/* use semaphore as a mutex (lock) by waiting for writer to increment it */
if (!sem_wait(semptr)) { /* wait until semaphore != 0 */
int i;
for (i = 0; i < strlen(MemContents); i++)
write(STDOUT_FILENO, memptr + i, 1); /* one byte at a time */
sem_post(semptr);
}

/* cleanup */
munmap(memptr, ByteSize);
close(fd);
sem_close(semptr);
unlink(BackingFile);
return 0;
}

与 memwriter 一样,memreader 在调用 sem_open 通过其名称来访问信号量。但是 memreader 然后进入等待状态,直到 memwriter 递增信号量。一旦等待结束,memreader 就会从共享内存中读取 ASCII 字节,随后进行清理并终止。

共享内存 API 包含显式同步共享内存段和后备文件的操作 msync(NULL, ByteSize, MS_SYNC)。示例中省略了这些操作,以减少混乱并将重点放在内存共享和信号量代码上。

输出

先执行 memwriter 进程,然后执行 memreader 进程,输出如下:

1
2
3
4
5
6
7
8
9
% ./memwriter
shared mem address: 0x7f924123f000 [0..511]
backing file: /dev/shm/shMemEx
(...sleep 12s, then to end...)
%

% ./memreader
This is the way the world ends...
%

特点

共享文件和共享内存示例展示了进程如何通过共享存储进行通信,一种情况是文件,另一种情况是内存段。这两种方法的 API 都相对简单。这些方法有共同的缺点吗?现代应用程序经常处理流数据,实际上是处理大量数据流。共享文件和共享内存方法都不太适合海量数据流。Channels of one type or another are better suited. Next part thus introduces pipes and message queues.

Pipes

本节转向管道,管道(pipes)是连接进程进行通信的通道(channels)。一个 channel 有一个用于写入字节的写端(write-end),以及一个用于以 FIFO 顺序读取这些字节的读端(read-end)。在典型使用中,一个进程向 channel 写入数据,而另一个进程从同一 channel 读取数据。

管道有两种类型:命名(named)的和匿名(unnamed)的,可以在命令行交互式地使用,也可以在程序内部使用

一般来说,基于 channel 的 IPC 是并发安全(concurrent-safe)的。

Unnamed pipes

在交互式命令行

让我们从一个命令行示例开始,该示例展示了匿名管道的工作原理。在所有现代系统中,竖线 | 表示命令行中的匿名管道。假设 % 是命令行提示符,并考虑以下命令:

1
% sleep 5 | echo "Hello, world!" ## writer to the left of |, reader to the right

sleepecho 实用程序作为单独的进程执行,并且匿名管道允许它们进行通信——该示例是人为设计的(没有发生通信)。问候语 Hello, world! 出现在屏幕上;然后,大约五秒钟后,命令行提示符返回,表明 sleepecho 进程都已退出。这是怎么回事?

在命令行的 | 语法中,左侧的进程(sleep)是写者,右侧的进程(echo)是读者。默认情况下,读者会阻塞,直到能从通道读取字节,而写者在写入字节后,通过发送流结束标记来完成(即使写者提前终止,流结束标记也会发送给读者)。匿名管道将持续存在,直到写者和读者都终止

刚刚不是说“默认情况下,读者会阻塞,直到能从通道读取字节”吗?那为什么是立即打印呢?

sleep 进程不会向通道写入任何字节,但会在大约五秒后终止,这会向通道发送流结束标记。同时,echo 进程立即将问候语写入标准输出(屏幕),因为该进程不会从通道读取任何字节,因此无需等待。

两个需要通信的例子,写者写入字节、读者从管道中读取字节:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
% sleep 5 && echo "Data printed after waiting" && sleep 3 | echo "hello world!" | cat
Data printed after waiting (5s-th)
hello world! (5s-th)
% (8s-th)

% cat test.txt
this
is
the
way
the
world
ends
% cat test.txt | sort | uniq
ends
is
the
this
way
world
%

在程序内部

Example 1. a pipeUN.c with two processes communicating through an unnamed pipe.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
#include <sys/wait.h> /* wait */
#include <stdio.h>
#include <stdlib.h> /* exit functions */
#include <unistd.h> /* read, write, pipe, _exit */
#include <string.h>

#define ReadEnd 0
#define WriteEnd 1

void report_and_exit(const char* msg) {
perror(msg);
exit(-1); /** failure **/
}

int main() {
int pipeFDs[2]; /* two file descriptors */
char buf; /* 1-byte buffer */
const char* msg = "Nature's first green is gold\n"; /* bytes to write */

if (pipe(pipeFDs) < 0) report_and_exit("pipeFD");
pid_t cpid = fork(); /* fork a child process */
if (cpid < 0) report_and_exit("fork"); /* check for failure */

if (0 == cpid) { /*** child ***/ /* child process */
close(pipeFDs[WriteEnd]); /* child reads, doesn't write */

while (read(pipeFDs[ReadEnd], &buf, 1) > 0) /* read until end of byte stream */
write(STDOUT_FILENO, &buf, sizeof(buf)); /* echo to the standard output */

close(pipeFDs[ReadEnd]); /* close the ReadEnd: all done */
_exit(0); /* exit and notify parent at once */
}
else { /*** parent ***/
close(pipeFDs[ReadEnd]); /* parent writes, doesn't read */

write(pipeFDs[WriteEnd], msg, strlen(msg)); /* write the bytes to the pipe */
close(pipeFDs[WriteEnd]); /* done writing: generate eof */

wait(NULL); /* wait for child to exit */
exit(0); /* exit normally */
}
return 0;
}

上面的 pipeUN.c 程序使用系统函数 fork 来创建进程。尽管程序只有一个源文件,但在(成功)执行期间会发生多处理。

以下是库函数 fork 工作原理的快速回顾:

在父进程中调用的 fork 函数,返回值为整数类型的 pid_t 变量。如果 fork 调用成功,它就会生成(创建)一个新的子进程,向父进程返回一个值,但向子进程返回一个不同的值。父进程和子进程都执行调用 fork 之后的相同代码,但大概率(人为设计)走向不同的条件分支(通过 fork 成功时的返回值来分流)

  • 调用 fork 失败时,向父进程返回 -1。
  • 调用 fork 成功时,子进程返回 0、父进程返回子进程的 PID

如何在程序中使用匿名管道?

使用系统调用 pipe 来创建管道,数组参数用于返回:写入字节的写端(write-end)的文件描述符,以及一个用于以 FIFO 顺序读取这些字节的读端(read-end)的文件描述符。数组索引 0 对应读端文件描述符。索引 1 对应写端文件描述符。

1
2
3
4
5
6
7
8
9
10
   int pipe(int pipefd[2]);

DESCRIPTION
pipe() creates a pipe, a unidirectional data channel that can be
used for interprocess communication. The array pipefd is used to
return two file descriptors referring to the ends of the pipe.
pipefd[0] refers to the read end of the pipe. pipefd[1] refers
to the write end of the pipe. Data written to the write end of
the pipe is buffered by the kernel until it is read from the read
end of the pipe.

在这个例子中,父进程负责将数据写入匿名管道,子进程负责从匿名管道读取数据。所以,在各自进程的分支处理中,先把不使用的另一个文件描述符关闭了。

需要注意的细节

1)父进程中对 wait(NULL) 函数调用的作用?

虽然,子进程在很大程度上独立于其父进程、子进程可以执行与父进程无关的任意代码。但是,如果子进程终止,系统会通过信号通知父进程。

如果父进程先于子进程终止怎么办?在这种情况下,除非采取预防措施,否则子进程将成为并保持为僵尸进程,并在进程表中残留一个条目

预防措施有两大类。一种预防措施是让父进程通知系统,让其知道:父进程(自己)对子进程的终止没有兴趣

1
signal(SIGCHLD, SIG_IGN); /* in parent: ignore notification */

第二种方法是让父进程执行等待操作,等待任何一个子进程的终止,从而确保父进程的寿命比子进程的寿命长

wait() 系统调用

1
2
3
4
5
6
7
8
9
10
11
12
13
   pid_t wait(int *_Nullable wstatus);
pid_t waitpid(pid_t pid, int *_Nullable wstatus, int options);

DESCRIPTION
All of these system calls are used to wait for state changes in a
child of the calling process, and obtain information about the
child whose state has changed. A state change is considered to
be: the child terminated; the child was stopped by a signal; or
the child was resumed by a signal. In the case of a terminated
child, performing a wait allows the system to release the
resources associated with the child; if a wait is not performed,
then the terminated child remains in a "zombie" state (see NOTES
below).

系统调用 wait() 用于调用进程的子进程的状态改变时,获取它的状态:子进程终止、子进程通过信号被暂停、子进程通过信号被恢复。“子进程终止” 状态下,允许系统释放分配给子进程的资源,以防止子进程成为僵尸进程。

示例代码:

1
2
3
4
5
int wstatus;
pid_t wc = wait(&wstatus); // wait code
printf("parent process (PID: %d), wc: %d, wstatus: %d, wait if normal exited: %s\n", \
getpid(), wc, wstatus, WIFEXITED(wstatus) ? "true" : "false");
exit(EXIT_SUCCESS);

pipeUN.c 程序还采取了另一种预防措施。当父进程完成等待时,父进程将通过调用常规退出函数来终止。相比之下,子进程通过调用 _exit 变体来终止,这会快速跟踪终止通知。实际上,子进程会告诉系统:「尽快」通知父进程,自己已经终止

2)如果多个进程同时向管道中写入,会不会出现并发(交错)写入?

POSIX 标准确保只要写入不超过 PIPE_BUF 字节(4096 bytes on Linux),写入就不会交错。正如本节一开始说的:基于 channel 的 IPC 是并发安全的。

Named pipes

匿名管道没有后备文件:系统维护一个内存缓冲区(in-memory buffer),用于将字节从写者传输到读者。一旦写者和读者终止,缓冲区就会被回收,匿名管道就会消失。相比之下,命名管道有一个后备文件和一个独特的 API。

管道的基本特性是先进先出(FIFO),写入管道的数据会按照顺序被读取。当写者进程往管道中写入数据时,如果没有读者进程在等待读取数据,写者进程可能会被阻塞,直到有读者进程读取了数据。相反,如果读者进程试图从空管道中读取数据,读者进程也会被阻塞,直到有写者进程向管道中写入数据

在交互式命令行

让我们看另一个命令行示例来了解命名管道的要点。打开两个终端,两者的工作目录应该相同。

终端 1:

1
2
3
% mkfifo tester ## creates a backing file with FIFO named pipes
% cat tester ## type the pipe's contents to stdout

一开始,终端中不应出现任何内容,因为尚未将任何内容写入命名管道。

终端 2:

1
2
3
4
% cat > tester ## redirect keyboard input to the pipe
hello, world! ## then hit Return key
bye, bye ## ditto
<Control-C> ## terminate session with a Control-C

在这个终端中输入的任何内容都会在另一个终端中得到回显。输入 Ctrl+C 后,两个终端都会返回常规命令行提示符:管道已关闭。

使用 unlink tester 可以删除并清理已创建的命名管道的文件。

在程序内部

Linux 有一个名为 mkfifo 的库函数,它在程序中创建一个命名管道。

1
2
3
4
5
6
7
   int mkfifo(const char *pathname, mode_t mode);

DESCRIPTION
mkfifo() makes a FIFO special file with name pathname. mode
specifies the FIFO's permissions. It is modified by the
process's umask in the usual way: the permissions of the created
file are (mode & ~umask).

下面的示例给出两个进程:一个写入数据到命名管道的进程,一个从该管道读取数据并进行业务处理的进程。

Example 2. the fifoWriter program fifoWriter.c

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
#include <fcntl.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <time.h>
#include <unistd.h>

#define MaxLoops 12000 /* outer loop */
#define ChunkSize 16 /* how many written at a time */
#define IntsPerChunk 4 /* four 4-byte ints per chunk */
#define MaxZs 250 /* max microseconds to sleep */

int main() {
const char* pipeName = "./fifoChannel";
mkfifo(pipeName, 0666); /* read/write for user/group/others */
int fd = open(pipeName, O_CREAT | O_WRONLY); /* open as write-only */
if (fd < 0)
return -1; /** error **/

printf("FIFO named pipe has created and opend\n");
int i;
for (i = 0; i < MaxLoops; i++) { /* write MaxWrites times */
int j;
for (j = 0; j < ChunkSize; j++) { /* each time, write ChunkSize bytes */
int k;
int chunk[IntsPerChunk];
for (k = 0; k < IntsPerChunk; k++) {
chunk[k] = rand();
}
if (i == 0 && j == 0)
printf("debug, verify if the writer process is blocked\n");
write(fd, chunk, sizeof(chunk));
}
usleep((rand() % MaxZs) + 1); /* pause a bit for realism */
}

close(fd); /* close pipe: generates an end-of-file */
unlink(pipeName); /* unlink from the implementing file */
printf("%i ints sent to the pipe.\n", MaxLoops * ChunkSize * IntsPerChunk);

return 0;
}

其中,mkfifo 创建了一个指定名称的后备文件,并指定了文件权限。然后使用熟悉的 open 函数调用打开命名管道,该函数返回一个文件描述符。同时,为了更加真实,fifoWriter.c 不会一次写入所有数据,而是写入一个块,休眠随机微秒数。最后,关闭命名管道后,fifoWriter.c 还会取消文件链接。

当仅运行了这个进程后,你会发现不会有任何信息被打印到屏幕上,这是因为:如果没有读者进程在等待读取数据,写者进程可能会被阻塞,直到有读者进程读取了数据。

Example 3. the fifoReader program fifoReader.c

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
#include <fcntl.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

/* not pretty, but gets the job done efficiently */
unsigned int is_prime(unsigned n) {
if (n <= 3)
return n > 1;
if (0 == (n % 2) || 0 == (n % 3))
return 0;

unsigned int i;
for (i = 5; (i * i) <= n; i += 6)
if (0 == (n % i) || 0 == (n % (i + 2)))
return 0;

return 1; /* found a prime! */
}

int main() {
const char* file = "./fifoChannel";
int fd = open(file, O_RDONLY);
if (fd < 0)
return -1; /* no point in continuing */
unsigned int count = 0, total = 0, primes_count = 0;

while (1) {
int next;
int i;
ssize_t count = read(fd, &next, sizeof(int));

if (0 == count) {
break; /* end of stream */
} else if (count == sizeof(int)) { /* read a 4-byte int value */
total++;
if (is_prime(next)) {
primes_count++;
}
}
}

close(fd); /* close pipe from read end */
unlink(file); /* unlink from the underlying file */
printf("Received ints: %u, primes: %u\n", total, primes_count);

return 0;
}

因为 fifoWriter.c 创建命名管道,所以 fifoReader.c 仅需要以只读方式打开后备文件,来访问管道即可。

read 调用返回 0 表示结束流(end-of-stream)。在这种情况下,fifoReader.c 会跳出循环,关闭命名管道,并在终止之前取消链接后备文件。

输出

1
2
3
4
5
6
7
% ./fifoWriter
FIFO named pipe has created and opend # run ./fifoReader and print it out
debug, verify if the writer process is blocked
768000 ints sent to the pipe.

% ./fifoReader
Received ints: 768000, primes: 37682

特点

命名管道是一种高度可靠且高效的 IPC 机制,因此被广泛使用。

Message queue

管道具有严格的 FIFO 行为:写入的第一个字节是读取的第一个字节,写入的第二个字节是读取的第二个字节,依此类推。消息队列可以以相同的方式运行,但足够灵活,可以不按 FIFO 顺序检索字节块(byte chunks)

顾名思义,消息队列是一个消息序列,每个消息都有两部分:

  • 负载(payload),它是一个字节数组;
  • 类型(type),以无符号长整型形式给出;通过类型对消息进行分类,以便灵活检索(通过类型达到不按 FIFO 顺序检索字节块的效果)。

考虑以下消息队列,其中每条消息都标有整数类型:

1
2
3
          +-+    +-+    +-+    +-+
sender--->|3|--->|2|--->|2|--->|1|--->receiver
+-+ +-+ +-+ +-+

如果采用严格的 FIFO 行为,则消息将以 1-2-2-3 的顺序接收。然而,消息队列允许其他检索顺序。例如,接收方可以按 3-2-1-2 的顺序检索消息。

Example 4. The header file queue.h, use it in sender.c and receiver.c

1
2
3
4
5
6
7
8
9
#define ProjectId 123
#define PathName "queue.h" /* any existing, accessible file would do */
#define MsgLen 4
#define MsgCount 6

typedef struct {
long type; /* must be of type long */
char payload[MsgLen + 1]; /* bytes in the message */
} queuedMessage;

ProjectId 可以是任何正整数值,PathName 必须是现有的、可访问的文件。这两个参数用于使用 ftok 函数调用生成 System V IPC key。

1
2
3
4
5
6
7
   key_t ftok(const char *pathname, int proj_id);
DESCRIPTION
The ftok() function uses the identity of the file named by the
given pathname (which must refer to an existing, accessible file)
and the least significant 8 bits of proj_id (which must be
nonzero) to generate a key_t type System V IPC key, suitable for
use with msgget(2), semget(2), or shmget(2).

sender

Example 5. the message sender program sender.c

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include "queue.h"

void report_and_exit(const char* msg) {
perror(msg);
exit(-1); /* EXIT_FAILURE */
}

int main() {
key_t key = ftok(PathName, ProjectId);
if (key < 0)
report_and_exit("couldn't get key...");

int qid = msgget(key, 0666 | IPC_CREAT);
if (qid < 0)
report_and_exit("couldn't get queue id...");

char* payloads[] = {"msg1", "msg2", "msg3", "msg4", "msg5", "msg6"};
int types[] = {1, 1, 2, 2, 3, 3}; /* each must be > 0 */
int i;
for (i = 0; i < MsgCount; i++) {
/* build the message */
queuedMessage msg;
msg.type = types[i];
strcpy(msg.payload, payloads[i]);

/* send the message */
msgsnd(qid, &msg, MsgLen + 1, IPC_NOWAIT); /* don't block */
printf("%s sent as type %i\n", msg.payload, (int)msg.type);
}
return 0;
}

msgget() 系统调用

  • 描述:msgget() 系统调用返回与 key 参数值 关联的 System V 消息队列标识符。
  • 返回值:成功时,返回消息队列标识符(非负整数)。失败时,返回 -1,并设置 errno 来指示错误。
1
2
3
4
5
6
7
8
   int msgget(key_t key, int msgflg);

DESCRIPTION
The msgget() system call returns the System V message queue
identifier associated with the value of the key argument. It may
be used either to obtain the identifier of a previously created
message queue (when msgflg is zero and key does not have the
value IPC_PRIVATE), or to create a new set.

msgget 系统调用中,0666 | IPC_CREAT 表示:

  • 0666:八进制权限位,指定了消息队列的权限,表示该消息队列将被所有用户读写访问。
  • IPC_CREAT:这是一个标志位,指示如果消息队列不存在,则创建一个新的消息队列。如果消息队列已经存在,它将被忽略

msgsnd() 系统调用

  • 描述:msgsnd() 函数应将消息发送到与 msqid 指定的消息队列标识符 关联的 队列。
  • 返回值:成功完成后,将返回 0;否则,不发送任何消息,返回 -1,并设置 errno 来指示错误。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
   int msgsnd(int msqid, const void *msgp, size_t msgsz, int msgflg);

DESCRIPTION
The msgsnd() function shall send a message to the queue
associated with the message queue identifier specified by msqid.

The application shall ensure that the argument msgp points to a
user-defined buffer that contains first a field of type long
specifying the type of the message, and then a data portion that
holds the data bytes of the message. The structure below is an
example of what this user-defined buffer might look like:

struct mymsg {
long mtype; /* Message type. */
char mtext[1]; /* Message text. */
}

The structure member mtype is a non-zero positive type long that
can be used by the receiving process for message selection.

The structure member mtext is any text of length msgsz bytes. The
argument msgsz can range from 0 to a system-imposed maximum.

在本例中,发送方按照 1-1-2-2-3-3 的顺序发送消息,但接收方随后按照 3-1-2-1-3-2 的顺序检索消息,这表明 消息队列没有绑定严格的 FIFO 行为

receiver

Example 6. the message receiver program receiver.c

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
#include <stdio.h>
#include <stdlib.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include "queue.h"

void report_and_exit(const char* msg) {
perror(msg);
exit(-1); /* EXIT_FAILURE */
}

int main() {
key_t key = ftok(PathName, ProjectId); /* key to identify the queue */
if (key < 0)
report_and_exit("key not gotten...");

int qid = msgget(key, 0666 | IPC_CREAT); /* access but not create if created already */
if (qid < 0)
report_and_exit("no access to queue...");

int types[] = {3, 1, 2, 1, 3, 2}; /* different than in sender */
int i;
for (i = 0; i < MsgCount; i++) {
queuedMessage msg; /* defined in queue.h */
if (msgrcv(qid, &msg, MsgLen + 1, types[i], MSG_NOERROR | IPC_NOWAIT) < 0)
puts("msgrcv trouble...");
printf("%s received as type %i\n", msg.payload, (int)msg.type);
}

/** remove the queue **/
if (msgctl(qid, IPC_RMID, NULL) < 0) /* NULL = 'no flags' */
report_and_exit("trouble removing queue...");

return 0;
}

因为 sender.c 已经创建了消息队列,所以在 receiver.cmsgget 系统调用中,IPC_CREAT 标志位将被忽略。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
   ssize_t msgrcv(int msqid, void *msgp, size_t msgsz, long msgtyp,
int msgflg);

DESCRIPTION
The msgrcv() function shall read a message from the queue
associated with the message queue identifier specified by msqid
and place it in the user-defined buffer pointed to by msgp.

The received message shall be truncated to msgsz bytes if it is
larger than msgsz and (msgflg & MSG_NOERROR) is non-zero. The
truncated part of the message shall be lost and no indication of
the truncation shall be given to the calling process.
RETURN VALUE
Upon successful completion, msgrcv() shall return a value equal
to the number of bytes actually placed into the buffer mtext.
Otherwise, no message shall be received, msgrcv() shall return
-1, and errno shall be set to indicate the error.

注意:当接收进程通过调用 msgctl 显式删除队列后,队列才会消失;否则,队列仍然存在(即使写入队列进程已经退出)。

1
2
3
4
5
   int msgctl(int msqid, int cmd, struct msqid_ds *buf);

DESCRIPTION
msgctl() performs the control operation specified by cmd on the
System V message queue with identifier msqid.

输出

上述代码的输出:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
% ./sender
msg1 sent as type 1
msg2 sent as type 1
msg3 sent as type 2
msg4 sent as type 2
msg5 sent as type 3
msg6 sent as type 3
%

% ./receiver
msg5 received as type 3
msg1 received as type 1
msg3 received as type 2
msg2 received as type 1
msg6 received as type 3
msg4 received as type 2
%

如果将 receiver.c 代码中的 msgctl 函数调用注释掉,再把 for 循环改成 i < MsgCount/2,则接收方有以下输出:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
% ./receiver
msg5 received as type 3
msg1 received as type 1
msg3 received as type 2
% ./receiver
msg6 received as type 3 # 没接收完,且没显示删除消息队列 -> 还能继续接收
msg2 received as type 1
msg4 received as type 2
% ./receiver
msgrcv trouble... # 消息队列中没有消息了(如果这一步前再执行一次./sender,则这一步还能继续接收到消息)
received as type 0
msgrcv trouble...
received as type 0
msgrcv trouble...
received as type 0
%

特点

管道和消息队列的 API 本质上是单向的:一个进程写入,另一个进程读取。虽然也有双向管道的实现,但作者 Marty Kalin 认为 IPC 机制越简单越好。作者认为消息队列仍然是 IPC 工具箱中有用的工具,尽管它们的流行度下降了。

Sockets

正如 pipes 有两种类型(命名和匿名)一样,socket 也是如此。

  • IPC sockets(又名 Unix domain sockets)可以为 同一物理设备(主机)上的进程 启用基于通道的通信。
  • Network sockets 可以在 不同主机上运行的进程 启用基于通道的通信,从而发挥网络作用。

Network sockets 需要底层协议的支持,例如 tcp (transmission control protocol) or the lower-level udp (user datagram protocol)。相比之下,IPC sockets 依赖本地系统内核来支持通信;实际上,IPC sockets 使用本地文件作为 socket address 进行通信。尽管存在这些实现差异,但 IPC sockets 和 Network sockets 的 APIs 在本质上是相同的。

配置为流的套接字是双向的(bidirectional),并且控制遵循 client/server 模式:客户端通过尝试连接到服务器来发起对话,服务器尝试接受连接。如果一切正常,来自客户端的请求和来自服务器的响应可以流经通道,直到两端关闭,从而中断连接。

Example 0. The header file sock.h, use it in server.c and client.c

1
2
3
4
5
#define PortNumber 9876
#define MaxConnects 8
#define BuffSize 256
#define ConversationLen 3
#define Host "127.0.0.1"

下面的示例涵盖了 Network sockets,但服务器和客户端程序可以在同一台计算机上运行——服务器使用网络地址 localhost (127.0.0.1),即本地主机地址。

server

Example 1. The socket server server.c

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
#include <arpa/inet.h>
#include <netinet/tcp.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>
#include "sock.h"

void report(const char* msg, int terminate) {
perror(msg);
if (terminate)
exit(-1); /* failure */
}

int main() {
int fd = socket(AF_INET, /* network versus AF_LOCAL */
SOCK_STREAM, /* reliable, bidirectional: TCP */
0); /* system picks underlying protocol */
if (fd < 0)
report("socket", 1); /* terminate */

/* bind the server's local address in memory */
struct sockaddr_in saddr;
memset(&saddr, 0, sizeof(saddr)); /* clear the bytes */
saddr.sin_family = AF_INET; /* versus AF_LOCAL */
saddr.sin_addr.s_addr = htonl(INADDR_ANY); /* host-to-network endian */
saddr.sin_port = htons(PortNumber); /* for listening */

if (bind(fd, (struct sockaddr*)&saddr, sizeof(saddr)) < 0)
report("bind", 1); /* terminate */

/* listen to the socket */
if (listen(fd, MaxConnects) < 0) /* listen for clients, up to MaxConnects */
report("listen", 1); /* terminate */

fprintf(stderr, "Listening on port %i for clients...\n", PortNumber);
/* a server traditionally listens indefinitely */
while (1) {
struct sockaddr_in caddr; /* client address */
int len = sizeof(caddr); /* address length could change */

int client_fd = accept(fd, (struct sockaddr*)&caddr, &len); /* accept blocks */
if (client_fd < 0) {
report("accept", 0); /* don't terminated, though there's a problem */
continue;
}

/* read from client */
int i;
for (i = 0; i < ConversationLen; i++) {
char buffer[BuffSize + 1];
memset(buffer, '\0', sizeof(buffer));
int count = read(client_fd, buffer, sizeof(buffer));
if (count > 0) {
puts(buffer);
write(client_fd, buffer, sizeof(buffer)); /* echo as confirmation */
}
}
close(client_fd); /* break connection */
}
return 0;
}

上面的服务器程序执行经典的四步,为客户端请求做好准备,然后接受各个请求。每个步骤(也是调用顺序):

  1. socket():获取套接字连接的文件描述符;
  2. bind():将套接字绑定到服务器主机上的某个地址;
  3. listen():监听客户端的请求;
  4. accept():接受特定客户端的请求。

socket() 系统调用

1
2
3
4
5
6
7
8
9
10
11
   int socket(int domain, int type, int protocol);

DESCRIPTION
socket() creates an endpoint for communication and returns a file
descriptor that refers to that endpoint. The file descriptor
returned by a successful call will be the lowest-numbered file
descriptor not currently open for the process.

RETURN VALUE
On success, a file descriptor for the new socket is returned. On
error, -1 is returned, and errno is set to indicate the error.

第一个参数指定网络套接字 AF_INET 而不是 IPC 套接字 AF_LOCAL

1
2
AF_UNIX/AF_LOCAL    Local communication
AF_INET IPv4 Internet protocols

第二个参数有多个选项,但 SOCK_STREAMSOCK_DGRAM(数据报)可能是最常用的。

1
2
3
4
5
6
7
8
SOCK_STREAM
Provides sequenced, reliable, two-way, connection-based
byte streams. An out-of-band data transmission mechanism
may be supported.

SOCK_DGRAM
Supports datagrams (connectionless, unreliable messages of
a fixed maximum length).

第三个参数指定协议。对于这里使用的基于流的套接字——参数 0 代表 TCP。

返回值:因为成功调用套接字会返回熟悉的文件描述符,所以 套接字的写入和读取语法与本地文件等相同

bind() 系统调用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
   int bind(int sockfd, const struct sockaddr *addr, socklen_t addrlen);

DESCRIPTION
When a socket is created with socket(2), it exists in a name
space (address family) but has no address assigned to it. bind()
assigns the address specified by addr to the socket referred to
by the file descriptor sockfd. addrlen specifies the size, in
bytes, of the address structure pointed to by addr.
Traditionally, this operation is called “assigning a name to a
socket”.

RETURN VALUE
On success, zero is returned. On error, -1 is returned, and
errno is set to indicate the error.

功能:bind() 系统调用将 sockfd 套接字 绑定到分配的内存地址 addr 上

动态分配一个端口号

对于网络套接字,如果套接字地址结构中的端口号(如 sin_port of struct sockaddr_in)被指定为 0,则在 bind() 系统调用时,OS 会自动为套接字分配一个可用的临时端口号(如果 OS 定义的临时端口范围内的所有端口号丢已经被其他套接字使用,则会发生“EADDRINUSE 地址使用错误”)。

动态分配端口后,如何获取这个端口值

1
2
3
// 通过 getsockname 系统调用,获取 fd 绑定到的当前地址(位于 addr 指向的缓冲区中)
getsockname(fd, (struct sockaddr *)&saddr, &sizeof(saddr));
port = ntohs(saddr.sin_port); // 从当前地址中取出指定(或动态分配)的端口号

getsockname() 系统调用:

1
2
3
4
5
6
7
8
9
10
11
12
13
   int getsockname(int sockfd, struct sockaddr *restrict addr,
socklen_t *restrict addrlen);

DESCRIPTION
getsockname() returns the current address to which the socket
sockfd is bound, in the buffer pointed to by addr. The addrlen
argument should be initialized to indicate the amount of space
(in bytes) pointed to by addr. On return it contains the actual
size of the socket address.

RETURN VALUE
On success, zero is returned. On error, -1 is returned, and
errno is set to indicate the error.

server.c 中,bind() 的第二个参数为什么要强转成 struct sockaddr 类型呢

addr 是一个指向 sockaddr 结构体的指针,用于指定要绑定到 套接字的地址信息。这个结构体的具体类型取决于套接字的地址族(Address Family),通常是 sockaddr_in 结构体(用于 IPv4 地址族)或 sockaddr_un 结构体(用于 UNIX 本地通信域)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// Describes a socket address
#include <sys/socket.h>
struct sockaddr {
sa_family_t sa_family; /* Address family */
char sa_data[]; /* Socket address */
};

// Internet domain sockets
#include <netinet/in.h>
struct sockaddr_in {
sa_family_t sin_family; /* AF_INET for Internet domain sockets */
in_port_t sin_port; /* Port number */
struct in_addr sin_addr; /* IPv4 address */
};

struct in_addr {
in_addr_t s_addr;
};

typedef uint32_t in_addr_t;

// UNIX domain sockets
#include <sys/un.h>
struct sockaddr_un {
sa_family_t sun_family; /* AF_LOCAL(AF_UNIX) for UNIX domain sockets */
char sun_path[]; /* Socket pathname */
};

listen() 系统调用

1
2
3
4
5
6
   int listen(int sockfd, int backlog);

DESCRIPTION
listen() marks the socket referred to by sockfd as a passive
socket, that is, as a socket that will be used to accept incoming
connection requests using accept(2).

作用:listen() 函数是用于使套接字处于监听状态,以便接受来自客户端的连接请求。当调用 listen() 函数时,套接字会由主动套接字(active socket)变为被动套接字(passive socket),从而使其 处于监听状态 。在这个状态下,套接字将开始接受连接请求,但并 不立即处理这些连接请求 ,而是将它们 放入内核中的连接队列中等待处理

第二个参数 backlog 指定了 OS 在内核中维护的连接队列的最大长度。这个参数决定了同时等待被服务器接受的连接请求的数量。

accept() 系统调用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
   int accept(int sockfd, struct sockaddr *_Nullable restrict addr,
socklen_t *_Nullable restrict addrlen);

DESCRIPTION
The accept() system call is used with connection-based socket
types (SOCK_STREAM, SOCK_SEQPACKET). It extracts the first
connection request on the queue of pending connections for the
listening socket, sockfd, creates a new connected socket, and
returns a new file descriptor referring to that socket. The
newly created socket is not in the listening state. The original
socket sockfd is unaffected by this call.

RETURN VALUE
On success, these system calls return a file descriptor for the
accepted socket (a nonnegative integer). On error, -1 is
returned, errno is set to indicate the error, and addrlen is left
unchanged.

作用:accept() 函数用于在服务器端接受客户端连接请求。

  • 该调用默认为阻塞等待:服务器不执行任何操作,直到客户端尝试连接然后继续。
  • 调用成功时,返回一个新的套接字文件描述符(a read/write socket),用于与客户端进行通信:服务器使用 read/write socket 读取来自客户端的请求并写回响应

参数 sockfd 仅用于接受客户端连接。

参数 addraddrlen 用于获取客户端的地址信息。

  • 在函数调用之前,需要将一个足够大的 sockaddr 结构体传递给 addr 参数,并将其长度传递给 addrlen 参数。accept() 函数返回时,addr 指向的结构体将被填充为客户端的地址信息,而 addrlen 则会被更新为 addr 结构体的实际长度

client

Example 2. The socket client client.c

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
#include <arpa/inet.h>
#include <netdb.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>
#include "sock.h"

const char* books[] = {"War and Peace", "Pride and Prejudice", "The Sound and the Fury"};

void report(const char* msg, int terminate) {
perror(msg);
if (terminate)
exit(-1); /* failure */
}

int main() {
/* fd for the socket */
int sockfd = socket(AF_INET, /* versus AF_LOCAL */
SOCK_STREAM, /* reliable, bidirectional */
0); /* system picks protocol (TCP) */
if (sockfd < 0)
report("socket", 1); /* terminate */

/* connect to the server: configure server's address 1st */
struct sockaddr_in saddr;
memset(&saddr, 0, sizeof(saddr));
saddr.sin_family = AF_INET;
saddr.sin_addr.s_addr = inet_addr(Host);
saddr.sin_port = htons(PortNumber); /* port number in big-endian */

if (connect(sockfd, (struct sockaddr*)&saddr, sizeof(saddr)) < 0)
report("connect", 1);

/* Write some stuff and read the echoes. */
puts("Connect to server, about to write some stuff...");
int i;
for (i = 0; i < ConversationLen; i++) {
if (write(sockfd, books[i], strlen(books[i])) > 0) {
/* get confirmation echoed from server and print */
char buffer[BuffSize + 1];
memset(buffer, '\0', sizeof(buffer));
if (read(sockfd, buffer, sizeof(buffer)) > 0)
puts(buffer);
}
}
puts("Client done, about to exit...");
close(sockfd); /* close the connection */
return 0;
}

客户端程序的代码与服务器的类似。两者之间的主要区别在于客户端既不 listens 也不 accepts,而是 connects。

connect() 调用可能会因多种原因而失败,例如:

  • 客户端的服务器地址错误。
  • 已经有太多客户端连接到服务器。

在这个示例中,如果连接操作成功,客户端将写入请求,然后在 for 循环中读取服务器发回来的回显响应。会话结束后,服务器和客户端都 close the read/write socket(那个 accept() 调用返回的新的套接字文件描述符),尽管任何一方的关闭操作都足以关闭连接。此后客户端退出,但如前所述,服务器仍保持开放状态。

connect() 系统调用

1
2
3
4
5
6
7
8
9
10
11
12
    int connect(int sockfd, const struct sockaddr *addr, socklen_t addrlen);

DESCRIPTION
The connect() system call connects the socket referred to by the
file descriptor sockfd to the address specified by addr. The
addrlen argument specifies the size of addr. The format of the
address in addr is determined by the address space of the socket
sockfd; see socket(2) for further details.

RETURN VALUE
If the connection or binding succeeds, zero is returned. On
error, -1 is returned, and errno is set to indicate the error.

作用:connect() 函数是一个阻塞函数,在连接建立完成之前会阻塞程序的执行。其作用是将客户端的套接字连接到指定的服务器地址和端口上。在调用 connect() 函数之前,客户端需要先创建一个套接字,并调用 bind() 函数绑定到本地地址(如果需要的话),然后才能调用 connect() 函数与服务器建立连接。

inet_addr() 辅助函数

1
2
3
4
5
6
7
8
9
10
    in_addr_t inet_addr(const char *cp);

DESCRIPTION
The inet_addr() function shall convert the string pointed to by
cp, in the standard IPv4 dotted decimal notation, to an integer
value suitable for use as an Internet address.

RETURN VALUE
Upon successful completion, inet_addr() shall return the Internet
address. Otherwise, it shall return (in_addr_t)(-1).

作用:inet_addr() 将字符串格式的点分十进制 IPv4 地址,转换为网络地址格式。

输出

1
2
3
4
5
6
7
8
9
10
11
12
13
14
% ./server
Listening on port 9876 for clients...
War and Peace
Pride and Prejudice
The Sound and the Fury # while(1) -> 一直监听


% ./client
Connect to server, about to write some stuff...
War and Peace
Pride and Prejudice
The Sound and the Fury
Client done, about to exit...
%

总结

套接字示例将请求消息回显给客户端,暗示了服务器和客户端之间 进行任意丰富对话的可能性。也许这就是套接字的主要吸引力。在现代系统中,客户端应用程序(例如数据库客户端)通过套接字与服务器进行通信是很常见的。如前所述,本地 IPC sockets 和 Network sockets 仅在一些实现细节上有所不同;一般来说,IPC sockets 的开销较低,性能较好。两者的通信 API 本质上是相同的。

Signals

信号(signal)中断正在执行的程序并与其通信 :信号是一种用于通知进程发生了某种事件的机制。 当其它进程接收到信号时,OS 会中断进程的正常执行,然后根据接收到的信号执行相应的操作

除了由 OS 发出信号外,一个进程也可以向另一个进程发送信号,从而使信号成为一种 IPC 机制。大多数信号可以被忽略(阻止)或 处理(通过指定的代码)

在 UNIX 系统可以通过 kill -l 来查看支持的信号列表,以下是部分信号的功能:

信号名 POSIX 标准编号 描述
SIGKILL 9 强制终止进程,无法被捕获或忽略
SIGUSR1 10 用户自定义信号 1
SIGUSR2 12 用户自定义信号 2
SIGCHLD 17 子进程状态发生改变
SIGTERM 15 请求终止进程,通常是正常终止

用户交互中可能会产生(arise)信号。例如,用户从命令行按下 Ctrl+C 来终止从命令行启动的程序;Ctrl+C 生成 SIGTERM (15) 信号。与 SIGKILL (9) 不同,用于终止的 SIGTERM (15) 可以被阻止或处理。

kill() 系统调用

1
2
3
4
5
   int kill(pid_t pid, int sig);

DESCRIPTION
The kill() system call can be used to send any signal to any
process group or process.

如果函数 kill 的第一个参数大于 0,则该参数被视为目标进程的 PID;如果参数为 0,则该参数标识信号发送者所属的进程组。

shutdown

Example 3. the graceful shutdown of a multi-processing system shutdown.c

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/wait.h>
#include <unistd.h>

void graceful(int signum) {
printf("\tChild confirming received signal: %i\n", signum);
puts("\tChild about to terminate gracefully...");

sleep(1);

puts("\tChild terminating now...");
_exit(0); /* fast-track notification of parent */
}

void set_handler() {
struct sigaction current;

sigemptyset(&current.sa_mask); /* clear the signal set */
current.sa_flags = 0; /* enables setting sa_handler, not sa_action */
current.sa_handler = graceful; /* specify a handler */

sigaction(SIGTERM, &current, NULL); /* register the handler */
}

void child_code() {
set_handler();

while (1) { /** loop until interrupted **/
sleep(1);
puts("\tChild just woke up, but going back to sleep.");
}
}

void parent_code(pid_t cpid) {
puts("Parent sleeping for a time...");
sleep(5);

/* Try to terminate child. */
if (-1 == kill(cpid, SIGTERM)) {
perror("kill");
exit(-1);
}
wait(NULL); /** wait for child to terminate **/
puts("My child terminated, about to exit myself...");
}

int main() {
pid_t cpid = fork();
if (cpid < 0) {
perror("fork");
exit(-1);
} else if (0 == cpid) {
child_code();
} else {
parent_code(cpid);
}
return 0;
}

上述代码中,来自父进程的 SIGTERM 信号(通过 kill() 系统调用),导致子进程优雅地执行信号处理 callback 函数。父进程等待子进程正常终止后,自身也正常终止。

对于信号处理,该示例使用 sigaction 函数(POSIX 推荐的),而不是 signal 信号函数(它存在可移植性问题)。

struct sigaction 数据结构

1
2
3
4
5
6
7
struct sigaction {
void (*sa_handler)(int);
void (*sa_sigaction)(int, siginfo_t *, void *);
sigset_t sa_mask;
int sa_flags;
void (*sa_restorer)(void);
};
  • sa_flags:整数值,包含了一些标志位,用于控制信号处理的行为。
  • sa_mask:信号集,用于在执行信号处理函数期间阻止其他特定信号的发送。
  • sa_handler:函数指针,指向一个简单的信号处理函数,接收一个信号编号。当 sa_flags 没有设置 SA_SIGINFO 标志时使用。
  • sa_sigaction:函数指针,指向一个更复杂的信号处理函数。当 sa_flags 设置了 SA_SIGINFO 标志时使用。

sigaction() 系统调用

1
2
3
4
5
   int sigaction(int signum, const struct sigaction *act, struct sigaction *oldact);

DESCRIPTION
The sigaction() system call is used to change the action taken by
a process on receipt of a specific signal.
  • signum:指定了要捕获的信号的编号。比如,SIGTERM 代表终止信号等。
  • act:一个指向 struct sigaction 结构的指针,该结构定义了新的信号处理程序和一些信号处理的选项。如果想要忽略信号,可以将 act 设置为 NULL
  • oldact:用于存储以前关联的信号处理程序信息,以供后续使用。如果不需要此信息,也可以将其设置为 NULL

输出

1
2
3
4
5
6
7
8
9
10
11
% ./shutdown
Parent sleeping for a time...
Child just woke up, but going back to sleep.
Child just woke up, but going back to sleep.
Child just woke up, but going back to sleep.
Child just woke up, but going back to sleep.
Child confirming received signal: 15
Child about to terminate gracefully...
Child terminating now...
My child terminated, about to exit myself...
%

参考资料:

  1. A guide to inter-process communication in Linux