读者写者问题是并发程序设计中的经典问题。问题描述为:对于同一块共享资源,读 - 写操作互斥,写 - 写操作互斥,读 - 读操作允许(读操作可以同时并行)。本文是对 操作系统之经典同步问题 文章中读写问题的具体代码实现。

读者 - 写者问题的读、写操作限制(仅读者优先或写者优先)

  • 写 - 写互斥:即不能有两个写者同时进行写操作。
  • 读 - 写互斥:即不能同时有一个线程在读,而另一个线程在写。
  • 读 - 读允许:即可以有一个或多个读者线程在读。

读者优先信号量实现

模型分析

读者优先定义:只要有一个读者正在读取共享资源,后来的读者都会被接纳(即使这些读者是在正在等待的写者之后到来的,也会优先让这些读者读取共享资源)。如果读者源源不断的出现,那么写者将始终处于阻塞状态(可能会出现饿死)。

例如有以下访问序列:

  • 到达时刻:读者 1 -> 读者 2 -> 写者 a -> 读者 3,且写者 a 到达时有读者正在访问共享资源。
  • 访问顺序:读者 1 -> 读者 2 -> 读者 3 -> 写者 a,即写者后的读者 3 可以插队到先来的写者 a 之前,访问共享资源。

读者优先允许同时有多个读者在读取资源。若将这些读者分为第一个读者、中间的读者和最后一个读者,那么这三类读者有不同的动作:

  • 第一个读者:尝试读 -> 可以读 + 不可以写 -> 读操作 -> 结束读;
  • 中间的读者:尝试读 -> 可以读 -> 读操作 -> 结束读;
  • 最后的读者:尝试读 -> 可以读 -> 读操作 -> 结束读 + 可以写

也就是,第一个读者要负责禁止写者执行写操作,最后一个读者负责恢复写者执行写操作

写者线程就简单了,只要你能拿到锁,你就能执行写操作。能拿到锁的场景:没有读者了(一直没有或最后一个读者结束读)。

多线程读者优先的信号量实现(代码在后面)

模拟数据

测试数据文件包括多行测试数据,每行数据从左到右依次为线程标识、读写类型、动作前等待时间、动作持续时间。其中,

  • 动作前等待时间:即线程创建后,延迟相应时间(单位为秒)后发出对共享资源的读写申请。
  • 动作持续时间:一旦线程读写申请成功,开始对共享资源进行读写操作,该操作持续相应时间后结束,并释放共享资源。
线程标识 读写类型 动作前等待时间 (秒) 动作持续时间 (秒)
0 R 1 5
1 R 3 5
2 W 4 5
3 R 5 2
4 R 6 5
5 W 7 3

例如,第四行表示一个读者线程 3,在它被创建 5 秒后,尝试申请读操作;申请成功后,读取资源 2 秒并释放资源、退出线程。

功能实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
#include <pthread.h>
#include <semaphore.h>
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include <unistd.h>

#define MAX_THREAD_NR (100)

/* 线程的动作信息:依次为线程标识、类型、动作前等待时间、动作持续时间 */
typedef struct tagThreadAction {
unsigned int uiTid;
char cRwType;
unsigned int uiDelay;
unsigned int uiDuration;
} THREAD_ACTION_S;

/* 读者优先的锁结构:读者计数器、计数器信号量、写者信号量 ⭐ */
typedef struct {
unsigned int uiRdCnt;
sem_t stRdCntMutex;
sem_t stWrMutex;
} READ_FIRST_LOCK_S;

READ_FIRST_LOCK_S g_stRdFirstLock; // 多个线程共享的全局资源 ⭐

time_t stStartTime;

void* writer(void* arg) {
THREAD_ACTION_S* pstThrAction = (THREAD_ACTION_S*)arg;

sleep(pstThrAction->uiDelay);
printf("[%02.0lf 秒] 写者 #%d 等待写入 \n", difftime(time(NULL), stStartTime), pstThrAction->uiTid);

sem_wait(&g_stRdFirstLock.stWrMutex); // wait-W ⭐
printf("[%02.0lf 秒] 写者 #%d 开始写入 \n", difftime(time(NULL), stStartTime), pstThrAction->uiTid);
sleep(pstThrAction->uiDuration); // 模拟写入操作(这期间要持有写锁)
sem_post(&g_stRdFirstLock.stWrMutex); // post-W ⭐

printf("[%02.0lf 秒] 写者 #%d 写入结束 \n", difftime(time(NULL), stStartTime), pstThrAction->uiTid);

return NULL;
}

void* reader(void* arg) {
THREAD_ACTION_S* pstThrAction = (THREAD_ACTION_S*)arg;

sleep(pstThrAction->uiDelay);
printf("[%02.0lf 秒] 读者 #%d 等待读取 \n", difftime(time(NULL), stStartTime), pstThrAction->uiTid);

sem_wait(&g_stRdFirstLock.stRdCntMutex); // wait-R-START ⭐
if (0 == g_stRdFirstLock.uiRdCnt) { // 第一个读者
sem_wait(&g_stRdFirstLock.stWrMutex); // wait-W ⭐
}
g_stRdFirstLock.uiRdCnt++; // 拿读锁是为了修改共享的读计数变量、释放读锁是为了其它读者可以拿到
sem_post(&g_stRdFirstLock.stRdCntMutex); // post-R-START ⭐

printf("[%02.0lf 秒] 读者 #%d 开始读取 \n", difftime(time(NULL), stStartTime), pstThrAction->uiTid);
sleep(pstThrAction->uiDuration); // 模拟读取操作(这期间不用持有读锁、只需要有任一读者持有写锁即可)

sem_wait(&g_stRdFirstLock.stRdCntMutex); // wait-R-END ⭐
g_stRdFirstLock.uiRdCnt--; // 拿读锁是为了修改共享的读计数变量、以及最后一个读者释放写锁
if (0 == g_stRdFirstLock.uiRdCnt) { // 最后一个读者
sem_post(&g_stRdFirstLock.stWrMutex); // post-W ⭐
}
sem_post(&g_stRdFirstLock.stRdCntMutex); // post-R-END ⭐

printf("[%02.0lf 秒] 读者 #%d 读取结束 \n", difftime(time(NULL), stStartTime), pstThrAction->uiTid);

return NULL;
}

int main(int argc, char* argv[]) {
pthread_t thread[MAX_THREAD_NR];
THREAD_ACTION_S stThrAction[MAX_THREAD_NR];
THREAD_ACTION_S stArg; // 用于输入参数的存储
unsigned int uiThrNr = 0;

/* 读者优先锁结构初始化 ⭐ */
g_stRdFirstLock.uiRdCnt = 0;
sem_init(&g_stRdFirstLock.stWrMutex, 0, 1);
sem_init(&g_stRdFirstLock.stRdCntMutex, 0, 1);

/* 从文件读取每个线程的数据 */
while (4 == scanf("%d %c %d %d", &stArg.uiTid, &stArg.cRwType, &stArg.uiDelay, &stArg.uiDuration)) {
stThrAction[uiThrNr].uiTid = stArg.uiTid;
stThrAction[uiThrNr].cRwType = stArg.cRwType;
stThrAction[uiThrNr].uiDelay = stArg.uiDelay;
stThrAction[uiThrNr].uiDuration = stArg.uiDuration;
uiThrNr++;
}

stStartTime = time(NULL);

/* 创建线程,并根据动作类型,绑定线程函数 */
for (int i = 0; i < uiThrNr; i++) {
pthread_create(&thread[i], NULL, 'R' == stThrAction[i].cRwType ? reader : writer, (void*)(&stThrAction[i]));
printf("[%02.0lf 秒] 创建线程 #%d\n", difftime(time(NULL), stStartTime), stThrAction[i].uiTid);
}

/* 等待线程结束 */
for (int i = 0; i < uiThrNr; i++) {
pthread_join(thread[i], NULL);
}

/* 销毁信号量 */
sem_destroy(&g_stRdFirstLock.stRdCntMutex);
sem_destroy(&g_stRdFirstLock.stWrMutex);

return 0;
}

测试分析

上述读者优先代码执行结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
[00 秒] 创建线程 #0
[00 秒] 创建线程 #1
[00 秒] 创建线程 #2
[00 秒] 创建线程 #3
[00 秒] 创建线程 #4
[00 秒] 创建线程 #5
[01 秒] 读者 #0 等待读取
[01 秒] 读者 #0 开始读取
[03 秒] 读者 #1 等待读取
[03 秒] 读者 #1 开始读取
[04 秒] 写者 #2 等待写入
[05 秒] 读者 #3 等待读取
[05 秒] 读者 #3 开始读取
[06 秒] 读者 #4 等待读取
[06 秒] 读者 #4 开始读取
[06 秒] 读者 #0 读取结束
[07 秒] 写者 #5 等待写入
[07 秒] 读者 #3 读取结束
[08 秒] 读者 #1 读取结束
[11 秒] 读者 #4 读取结束
[11 秒] 写者 #2 开始写入
[16 秒] 写者 #2 写入结束
[16 秒] 写者 #5 开始写入
[19 秒] 写者 #5 写入结束

分析

所有线程均在 0 秒时被创建,然后 第一个尝试访问共享资源的是读者 #0,此时所有锁都是释放状态。读者 #0 顺利地拿到计数锁、写锁(禁止写入),开始访问资源,并在第 6 秒结束。

第 3 秒时,读者 #1 尝试访问共享资源,因为它是一个读者,且没有写者在访问共享资源。所以,读者 #1 顺利地开始访问资源,并在第 8 秒结束。

第 4 秒时,写者 #2 尝试访问共享资源,但此时写锁被读者线程持有。所以,写者 #2 开始等待写锁被释放。

第 5 秒时,读者 #3 尝试访问共享资源,因为它是一个读者,且没有写者在访问共享资源。所以,读者 #3 顺利地开始访问资源,并在第 7 秒结束。

第 6 秒时,读者 #4 尝试访问共享资源,因为它是一个读者,且没有写者在访问共享资源。所以,读者 #4 顺利地开始访问资源,并在第 11 秒结束。此时刻,它是最后一个读者(前面没有读者正在访问者共享资源),它负责通知等待在写锁上的线程(恢复写者执行写操作)

第 7 秒时,写者 #5 尝试访问共享资源,但此时写锁被读者线程持有。所以,写者 #5 开始等待锁被释放。

第 11 秒,由于读者 #4 作为最后一个读者,释放了写锁。因此,写者 #2 开始执行写操作,5 秒后写操作结束,写者 #5 继而开始执行写操作,直到结束。

结论

即使写者请求访问共享资源的时间靠前,但由于写者请求时有其它读者正在访问共享资源,并在其访问结束前来了新的读者请求。这时,后来的读者访问资源的时间比先来的写者早,这便是读者优先。

写者优先(非立即访问)管程实现

模型分析

写者优先定义:只要有一个写者处于活跃状态,后来的写者都会被接纳(即使这些写者是在正在等待的读者之后到来的,也会优先让这些写者修改共享资源)。如果写者源源不断的出现,那么读者始终处于阻塞状态(可能会出现饿死)。

例如有以下访问序列:

  • 到达时刻:读者 1 -> 读者 2 -> 写者 a -> 读者 3 -> 写者 b,且写者 b 到达时有写者正在访问共享资源。
  • 访问顺序:读者 1 -> 读者 2 -> 写者 a -> 写者 b -> 读者 3,即首个等待的写者要等正在访问的所有读者结束后才能访问(换句话说,首个等待的写者不能插队、立即访问,但也不会先让后来的读者先访问),但能在写者访问期间优先处理后续等待的写者、滞后读者。

读者写者问题的写者优先可以 抽象为两个方法

  • 读者线程:等待没有写者 -> 读取数据集 -> 检查:唤醒等待的写者。
  • 写者线程:等待没有读者、写者 -> 往数据集写入数据 -> 检查:唤醒等待的读者或写者。

具体的实现原理,可以参考 这篇文章

功能实现

管程方式实现中的数据结构是 WRITE_FIRST_LOCK_S,并基于这个数据结构实现了多个方法:

  • wrFirstLockInit():数据结构初始化;
  • startRead():读取共享资源前的动作;
  • doneRead():读取共享资源后的动作;
  • startWrite():修改共享资源前的动作;
  • doneWrite():修改共享资源后的动作;
  • wrFirstLockDestroy():数据结构销毁。

假设这些方法已经正确实现,那么:

  • 读者线程的一次执行过程就是:startRead() -> 读取共享资源 -> doneRead()
  • 读者线程的一次执行过程就是:startWrite() -> 修改共享资源 -> doneWrite()

特别注意:假如现在前面已经有 100 个读者正在访问着共享资源,此时来了一个写者,接着又来了 10 个读者和 20 个写者。下面实现的代码效果是 要等这 100 个读者访问结束后,这个写者才能访问(不能插队、立即访问);当这个写者访问完后,先让后面那 20 个写者访问,再让那 10 个读者访问。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
#include <pthread.h>
#include <semaphore.h>
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include <unistd.h>

#define MAX_THREAD_NR (100)

/* 线程的动作信息:依次为线程标识、类型、动作前等待时间、动作持续时间 */
typedef struct tagThreadAction {
unsigned int uiTid;
char cRwType;
unsigned int uiDelay;
unsigned int uiDuration;
} THREAD_ACTION_S;

typedef struct {
unsigned int uiAr; // active reader count
unsigned int uiAw; // active writer count
unsigned int uiWr; // waiting reader count
unsigned int uiWw; // waiting writer count
pthread_cond_t stOkToRead;
pthread_cond_t stOkToWrite;
pthread_mutex_t stCntMutex;
} WRITE_FIRST_LOCK_S; // ⭐

WRITE_FIRST_LOCK_S g_stWrFirstLock; // 多个线程共享的全局资源 ⭐

time_t stStartTime;

/* 写者优先锁结构初始化 */
void wrFirstLockInit(void) {
g_stWrFirstLock.uiAr = 0;
g_stWrFirstLock.uiAw = 0;
g_stWrFirstLock.uiWr = 0;
g_stWrFirstLock.uiWw = 0;
pthread_cond_init(&g_stWrFirstLock.stOkToRead, NULL);
pthread_cond_init(&g_stWrFirstLock.stOkToWrite, NULL);
pthread_mutex_init(&g_stWrFirstLock.stCntMutex, NULL);
}

/* 销毁条件变量和互斥锁 */
void wrFirstLockDestroy(void) {
pthread_cond_destroy(&g_stWrFirstLock.stOkToRead);
pthread_cond_destroy(&g_stWrFirstLock.stOkToWrite);
pthread_mutex_destroy(&g_stWrFirstLock.stCntMutex);
}

/* 准备读:等待没有写者 */
static void startRead(void) {
WRITE_FIRST_LOCK_S* pstWrFirstLock = &g_stWrFirstLock;

pthread_mutex_lock(&pstWrFirstLock->stCntMutex); // 请求计数锁 ⭐

// 只要有写者(活跃 / 等待),读者就要等待 ⭐
while (pstWrFirstLock->uiAw + pstWrFirstLock->uiWw > 0) {
pstWrFirstLock->uiWr++;
pthread_cond_wait(&pstWrFirstLock->stOkToRead,
&pstWrFirstLock->stCntMutex); // 释放锁,并等待条件变量的唤醒 ⭐
// 某一个等待的读者可以去读取共享资源了(阻塞结束,必定不会有写者),while 循环结束 ⭐
pstWrFirstLock->uiWr--;
}
pstWrFirstLock->uiAr++; // 这个读者将是活跃状态 ⭐

pthread_mutex_unlock(&pstWrFirstLock->stCntMutex); // 释放计数锁(以便其它读者可以一起读)⭐

return;
}

/* 读取结束:检查、(条件满足时)唤醒等待的写者 */
static void doneRead(void) {
WRITE_FIRST_LOCK_S* pstWrFirstLock = &g_stWrFirstLock;

pthread_mutex_lock(&pstWrFirstLock->stCntMutex); // 请求计数锁 ⭐

pstWrFirstLock->uiAr--; // 读完退出,活跃读者减一 ⭐
if (0 == pstWrFirstLock->uiAr && pstWrFirstLock->uiWw > 0) { // 在没有活跃的读者时,才有可能唤醒写者 ⭐
pthread_cond_signal(&pstWrFirstLock->stOkToWrite);
}

pthread_mutex_unlock(&pstWrFirstLock->stCntMutex); // 释放计数锁 ⭐

return;
}

/* 准备写:等待没有读者和写者 */
static void startWrite(void) {
WRITE_FIRST_LOCK_S* pstWrFirstLock = &g_stWrFirstLock;

pthread_mutex_lock(&pstWrFirstLock->stCntMutex); // 请求计数锁 ⭐

// 有活跃的写者或读者,新写者必须等待 ⭐
while (pstWrFirstLock->uiAw + pstWrFirstLock->uiAr > 0) {
pstWrFirstLock->uiWw++;
pthread_cond_wait(&pstWrFirstLock->stOkToWrite,
&pstWrFirstLock->stCntMutex); // 释放锁,并等待条件变量的唤醒 ⭐
pstWrFirstLock->uiWw--; // 新写者可以去修改共享资源了,while 循环结束 ⭐
}
pstWrFirstLock->uiAw++; // 新写者将是活跃状态 ⭐

// 释放计数锁(不会出现后续写者也能写,因为上面的 while 将会出现 Aw>0,新写者会被阻塞)⭐
pthread_mutex_unlock(&pstWrFirstLock->stCntMutex);

return;
}

/* 写结束:检查、(条件满足时)唤醒等待的写者(优先)或所有读者 */
static void doneWrite(void) {
WRITE_FIRST_LOCK_S* pstWrFirstLock = &g_stWrFirstLock;

pthread_mutex_lock(&pstWrFirstLock->stCntMutex); // 请求计数锁 ⭐

pstWrFirstLock->uiAw--; // 写完退出,活跃写者减一 ⭐
if (pstWrFirstLock->uiWw > 0) { // 优先唤醒等待的写者,其次广播唤醒所有等待的读者 ⭐
pthread_cond_signal(&pstWrFirstLock->stOkToWrite);
} else if (pstWrFirstLock->uiWr > 0) {
pthread_cond_broadcast(&pstWrFirstLock->stOkToRead);
}

pthread_mutex_unlock(&pstWrFirstLock->stCntMutex); // 释放计数锁 ⭐

return;
}

void* reader(void* arg) {
THREAD_ACTION_S* pstThrAction = (THREAD_ACTION_S*)arg;

sleep(pstThrAction->uiDelay);
printf("[%02.0lf 秒] 读者 #%d 等待读取 \n", difftime(time(NULL), stStartTime), pstThrAction->uiTid);
startRead(); // ⭐

printf("[%02.0lf 秒] 读者 #%d 开始读取 \n", difftime(time(NULL), stStartTime), pstThrAction->uiTid);
sleep(pstThrAction->uiDuration); // 模拟读取操作

doneRead(); // ⭐
printf("[%02.0lf 秒] 读者 #%d 读取结束 \n", difftime(time(NULL), stStartTime), pstThrAction->uiTid);

return NULL;
}

void* writer(void* arg) {
THREAD_ACTION_S* pstThrAction = (THREAD_ACTION_S*)arg;

sleep(pstThrAction->uiDelay);
printf("[%02.0lf 秒] 写者 #%d 等待写入 \n", difftime(time(NULL), stStartTime), pstThrAction->uiTid);
startWrite(); // ⭐

printf("[%02.0lf 秒] 写者 #%d 开始写入 \n", difftime(time(NULL), stStartTime), pstThrAction->uiTid);
sleep(pstThrAction->uiDuration); // 模拟写入操作

doneWrite(); // ⭐
printf("[%02.0lf 秒] 写者 #%d 写入结束 \n", difftime(time(NULL), stStartTime), pstThrAction->uiTid);

return NULL;
}

int main(int argc, char* argv[]) {
pthread_t thread[MAX_THREAD_NR];
THREAD_ACTION_S stThrAction[MAX_THREAD_NR];
THREAD_ACTION_S stArg; // 用于输入参数的存储
unsigned int uiThrNr = 0;

wrFirstLockInit();
// 同上一个程序,省略...
wrFirstLockDestroy();

return 0;
}

测试分析

还是使用上面的模拟数据,上述写者优先代码执行结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
[01 秒] 读者 #0 等待读取
[01 秒] 读者 #0 开始读取
[03 秒] 读者 #1 等待读取
[03 秒] 读者 #1 开始读取
[04 秒] 写者 #2 等待写入
[05 秒] 读者 #3 等待读取
[06 秒] 读者 #0 读取结束
[06 秒] 读者 #4 等待读取
[07 秒] 写者 #5 等待写入
[08 秒] 读者 #1 读取结束
[08 秒] 写者 #2 开始写入
[13 秒] 写者 #2 写入结束
[13 秒] 写者 #5 开始写入
[16 秒] 写者 #5 写入结束
[16 秒] 读者 #4 开始读取 // 因为广播唤醒,这里也可能是读者 #3 先读取
[16 秒] 读者 #3 开始读取
[18 秒] 读者 #3 读取结束
[21 秒] 读者 #4 读取结束

分析

所有线程均在 0 秒时被创建,然后 第一个尝试访问共享资源的是读者 #0,此时所有锁都是释放状态。读者 #0 顺利地拿到计数锁,开始访问资源,并在第 6 秒结束。

第 4 秒时,写者 #2 尝试访问共享资源,但此时 有活跃的读者 #0, #1。所以,写者 #2 释放锁并等待条件变量的唤醒。

第 5, 6 秒时,读者 #3, #4 尝试访问共享资源,因为 有等待的写者 #2。所以,读者 #3, #4 也必须停下来等待

第 7 秒时,写者 #5 尝试访问共享资源,但此时 有活跃的读者 #1。所以,写者 #5 释放锁并等待条件变量的唤醒。

第 8 秒时,读者 #1 离开共享资源,并 唤醒等待在写条件变量上的写者线程 。这时 按照写者优先 + FIFO 队列,依次是写者 #2, #5 访问共享资源。

第 16 秒时,没有写者、但有多个等待的读者。此时,通过广播唤醒所有读者,但只有一把锁,谁先拿到谁先访问共享资源(但真正开始访问共享资源时,会释放锁——这时锁已经可以被其它读者线程持有了,所有第 16 秒时一前一后有两个读者线程开始访问共享资源)。

结论

即使读者(#3, #4)请求访问共享资源的时间靠前,但由于读者请求时有其它写者(#2)正在等待或访问共享资源,并在其访问结束前来了新的写者(#5)请求。这时,后来的写者(#5)访问资源的时间比先来的读者(#3, #4)早,这便是写者优先、但写者不能插队以求立即访问

抛出一个问题?如何达到真正的写者优先——允许写者插队以立即访问?

如果能 保持最多一个(而不是多个)读者在同时 访问共享资源,就能实现写者立即访问。这就需要加锁来约束读者只能一个、一个地访问共享资源。

写者优先(立即访问)管程实现

保持最多一个(而不是多个)读者在同时访问共享资源,从而实现写者立即访问。

改造代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
typedef struct {
unsigned int uiAr, uiAw, uiWr, uiWw;
pthread_cond_t stOkToRead, stOkToWrite;
pthread_mutex_t stCntMutex;
sem_t stHasReader; // 是否有读者正在访问共享资源 ⭐
} WRITE_FIRST_LOCK_S;

void wrFirstLockInit(void) {
// ...
sem_init(&g_stWrFirstLock.stHasReader, 0, 1);
}

void wrFirstLockDestroy(void) {
// ...
sem_destroy(&g_stWrFirstLock.stHasReader);
}

static void startRead(void) {
WRITE_FIRST_LOCK_S* pstWrFirstLock = &g_stWrFirstLock;

sem_wait(&pstWrFirstLock->stHasReader); // 有读者正在访问则等待 ⭐
// ...
}

static void doneRead(void) {
// ...
sem_post(&pstWrFirstLock->stHasReader); // 唤醒可能等待访问的读者 ⭐
return;
}

这样,就约束了同一时间只有一个读者可以访问。其访问结束后,若有等待读者和写者,别管谁先来的,写者会先访问共享资源(即立即访问)。

测试分析

还是使用上面的模拟数据,上述写者优先(立即访问)代码执行结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
[01 秒] 读者 #0 等待读取
[01 秒] 读者 #0 开始读取
[03 秒] 读者 #1 等待读取
[04 秒] 写者 #2 等待写入
[05 秒] 读者 #3 等待读取
[06 秒] 读者 #4 等待读取
[06 秒] 读者 #0 读取结束
[06 秒] 写者 #2 开始写入
[07 秒] 写者 #5 等待写入
[11 秒] 写者 #2 写入结束
[11 秒] 写者 #5 开始写入
[14 秒] 写者 #5 写入结束
[14 秒] 读者 #4 开始读取 // 因为广播唤醒,这里可能是读者 #4/#3/#1 先读取
[19 秒] 读者 #4 读取结束
[19 秒] 读者 #3 开始读取
[21 秒] 读者 #3 读取结束
[21 秒] 读者 #1 开始读取
[26 秒] 读者 #1 读取结束

读写公平管程实现

模型分析

读写公平定义 :读操作可以同时进行,多个读操作之间不会互相阻塞, 读、写操作按到达顺序访问共享资源

例如有以下访问序列:

  • 到达时刻:读者 1 -> 读者 2 -> 写者 a -> 读者 3 -> 写者 b -> 写者 c -> 读者 4
  • 访问顺序:读者 1 -> 读者 2 -> 写者 a -> 读者 3 -> 写者 b -> 写者 c -> 读者 4

代码实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
typedef struct {
unsigned int uiRdCount;
pthread_mutex_t stCntMutex;
pthread_mutex_t stOkToWrite;
sem_t stWantVisit; // 访问者想访问共享资源
} RW_FAIR_LOCK_S; // ⭐

RW_FAIR_LOCK_S g_stRwFairLock;

time_t stStartTime;

void rwFairLockInit(void) {
g_stRwFairLock.uiRdCount = 0;
pthread_mutex_init(&g_stRwFairLock.stCntMutex, NULL);
pthread_mutex_init(&g_stRwFairLock.stOkToWrite, NULL);
sem_init(&g_stRwFairLock.stWantVisit, 0, 1);
}

void rwFairLockDestroy(void) {
pthread_mutex_destroy(&g_stRwFairLock.stCntMutex);
pthread_mutex_destroy(&g_stRwFairLock.stOkToWrite);
sem_destroy(&g_stRwFairLock.stWantVisit);
}

static void startRead(void) {
RW_FAIR_LOCK_S* pstRwFairLock = &g_stRwFairLock;

sem_wait(&pstRwFairLock->stWantVisit); // 有访问者正在访问则等待 ⭐
pthread_mutex_lock(&pstRwFairLock->stCntMutex);
if (0 == pstRwFairLock->uiRdCount) {
pthread_mutex_lock(&pstRwFairLock->stOkToWrite);
}
pstRwFairLock->uiRdCount++;
pthread_mutex_unlock(&pstRwFairLock->stCntMutex);
sem_post(&pstRwFairLock->stWantVisit); // 唤醒一个等待访问的访问者 ⭐

return;
}

static void doneRead(void) {
RW_FAIR_LOCK_S* pstRwFairLock = &g_stRwFairLock;

pthread_mutex_lock(&pstRwFairLock->stCntMutex);
pstRwFairLock->uiRdCount--;
if (0 == pstRwFairLock->uiRdCount) {
pthread_mutex_unlock(&pstRwFairLock->stOkToWrite);
}
pthread_mutex_unlock(&pstRwFairLock->stCntMutex);

return;
}

static void startWrite(void) {
RW_FAIR_LOCK_S* pstRwFairLock = &g_stRwFairLock;

sem_wait(&pstRwFairLock->stWantVisit); // 先表明访问想法,从而阻塞后续的读者 ⭐
pthread_mutex_lock(&pstRwFairLock->stOkToWrite);

return;
}

static void doneWrite(void) {
RW_FAIR_LOCK_S* pstRwFairLock = &g_stRwFairLock;

pthread_mutex_unlock(&pstRwFairLock->stOkToWrite);
sem_post(&pstRwFairLock->stWantVisit); // 唤醒一个等待访问的访问者 ⭐

return;
}

测试分析

还是使用上面的模拟数据,上述读写公平代码执行结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
[01 秒] 读者 #0 等待读取
[01 秒] 读者 #0 开始读取
[03 秒] 读者 #1 等待读取
[03 秒] 读者 #1 开始读取
[04 秒] 写者 #2 等待写入
[05 秒] 读者 #3 等待读取
[06 秒] 读者 #0 读取结束
[06 秒] 读者 #4 等待读取
[07 秒] 写者 #5 等待写入
[08 秒] 读者 #1 读取结束
[08 秒] 写者 #2 开始写入
[13 秒] 写者 #2 写入结束
[13 秒] 读者 #3 开始读取 // 按 FIFO 顺序访问
[13 秒] 读者 #4 开始读取
[15 秒] 读者 #3 读取结束
[18 秒] 读者 #4 读取结束
[18 秒] 写者 #5 开始写入
[21 秒] 写者 #5 写入结束

读者写者实现比较

读者优先 写者优先(不可插队) 写者优先(立即访问) 读写公平
[01 秒] 读者 #0 等待读取 [01 秒] 读者 #0 等待读取 [01 秒] 读者 #0 等待读取 [01 秒] 读者 #0 等待读取
[01 秒] 读者 #0 开始读取 [01 秒] 读者 #0 开始读取 [01 秒] 读者 #0 开始读取 [01 秒] 读者 #0 开始读取
[03 秒] 读者 #1 等待读取 [03 秒] 读者 #1 等待读取 [03 秒] 读者 #1 等待读取 [03 秒] 读者 #1 等待读取
[03 秒] 读者 #1 开始读取 [03 秒] 读者 #1 开始读取 [04 秒] 写者 #2 等待写入 [03 秒] 读者 #1 开始读取
[04 秒] 写者 #2 等待写入 [04 秒] 写者 #2 等待写入 [05 秒] 读者 #3 等待读取 [04 秒] 写者 #2 等待写入
[05 秒] 读者 #3 等待读取 [05 秒] 读者 #3 等待读取 [06 秒] 读者 #4 等待读取 [05 秒] 读者 #3 等待读取
[05 秒] 读者 #3 开始读取 [06 秒] 读者 #0 读取结束 [06 秒] 读者 #0 读取结束 [06 秒] 读者 #0 读取结束
[06 秒] 读者 #4 等待读取 [06 秒] 读者 #4 等待读取 [06 秒] 写者 #2 开始写入 [06 秒] 读者 #4 等待读取
[06 秒] 读者 #4 开始读取 [07 秒] 写者 #5 等待写入 [07 秒] 写者 #5 等待写入 [07 秒] 写者 #5 等待写入
[06 秒] 读者 #0 读取结束 [08 秒] 读者 #1 读取结束 [11 秒] 写者 #2 写入结束 [08 秒] 读者 #1 读取结束
[07 秒] 写者 #5 等待写入 [08 秒] 写者 #2 开始写入 [11 秒] 写者 #5 开始写入 [08 秒] 写者 #2 开始写入
[07 秒] 读者 #3 读取结束 [13 秒] 写者 #2 写入结束 [14 秒] 写者 #5 写入结束 [13 秒] 写者 #2 写入结束
[08 秒] 读者 #1 读取结束 [13 秒] 写者 #5 开始写入 [14 秒] 读者 #4 开始读取 [13 秒] 读者 #3 开始读取
[11 秒] 读者 #4 读取结束 [16 秒] 写者 #5 写入结束 [19 秒] 读者 #4 读取结束 [13 秒] 读者 #4 开始读取
[11 秒] 写者 #2 开始写入 [16 秒] 读者 #4 开始读取 [19 秒] 读者 #3 开始读取 [15 秒] 读者 #3 读取结束
[16 秒] 写者 #2 写入结束 [16 秒] 读者 #3 开始读取 [21 秒] 读者 #3 读取结束 [18 秒] 读者 #4 读取结束
[16 秒] 写者 #5 开始写入 [18 秒] 读者 #3 读取结束 [21 秒] 读者 #1 开始读取 [18 秒] 写者 #5 开始写入
[19 秒] 写者 #5 写入结束 [21 秒] 读者 #4 读取结束 [26 秒] 读者 #1 读取结束 [21 秒] 写者 #5 写入结束