1. 信号量

并发问题:

  1. 多线程并发导致资源竞争

同步概念:

  1. 协调多线程对共享数据的访问
  2. 任何时刻只能有一个线程执行临界区代码

确保同步正确的方法:

  1. 底层硬件支持
  2. 高层次的编程抽象

1

信号量和锁机制是同一层次的同步机制

信号量是操作系统提供的一种协调共享资源访问的方法

  • 软件同步是平等线程间的一种同步协商机制
  • OS是管理者,地位高于进程
  • 用信号量来表示系统资源的数量

信号是一种抽象的数据类型

  • 由一个整形(sem)变量和两个原子操作组成

$P()$:sem减1(sem < 0, 进入等待,否则继续)

$V()$:sem加1(sem <= 0,唤醒一个等待进程)

信号量与铁路的类比

2 个站台的车站
2个资源的信号量

2

特性:

信号量是被保护的整数数量

  • 初始化完成后,只能通过$P()$和$V()$操作修改
  • 由操作系统保证,PV操作是原子操作

$P()$可能阻塞和$V()$不会阻塞

信号量等待按先进先出排队,而自旋锁是不能实现先进先出(查空则进)。

1.1 信号量的实现

3个主要实现:定义这个信号量类,$P()$和$V()$

3

2. 信号量的使用

信号量分类:

  1. 二进制信号量:0 或 1
  2. 资源信号量:任何非负值
  3. 两者等价

2.1 用信号量实现临界区的互斥访问

每类资源设置一个信号量,其初值为1

1
mutex = new Semaphore(1);
1
2
3
mutex->P();
Critical Section
mutex->V();

注意:

  • 必须成对使用$P()$和$V()$操作

PV操作顺序不能颠倒

2.2 用信号量实现条件同步

条件同步设置一个信号量,其初值为0

1
condition = new Semaphore(0);

4

生产者-消费者问题

5

  • 一个或多个生产者在生成数据后放在缓存区
  • 单个消费者从缓存区取出数据处理
  • 任何时刻只能有一个生产者or消费者访问缓存区

问题分析:

  1. 任何时刻只能有一个线程操作缓冲区
  2. 缓存区空时,消费者必须等生产者
  3. 缓存区满时,生产者必须等消费者

用信号量描述每个约束

  1. 二进制信号量mutex
  2. 资源信号量 fullBuffers
  3. 资源信号量:emptyBuffers

代码实现:

消费者看生产者:
6

生产者看消费者:
7

整体:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
var items = 0, space = 10, mutex = 1;
var in = 0, out = 0;
item buf[10] = { NULL };

producer {
while( true ) {
wait( space ); // 等待缓冲区有空闲位置, 在使用PV操作时,条件变量需要在互斥锁之前
wait( mutex ); // 保证在product时不会有其他线程访问缓冲区

// product
buf.push( item, in ); // 将新资源放到buf[in]位置
in = ( in + 1 ) % 10;

signal( mutex ); // 唤醒的顺序可以不同
signal( items ); // 通知consumer缓冲区有资源可以取走
}
}

consumer {
while( true ) {
wait( items ); // 等待缓冲区有资源可以使用
wait( mutex ); // 保证在consume时不会有其他线程访问缓冲区

// consume
buf.pop( out ); // 将buf[out]位置的的资源取走
out = ( out + 1 ) % 10;

signal( mutex ); // 唤醒的顺序可以不同
signal( space ); // 通知缓冲区有空闲位置
}
}

不能将线程里两个wait的顺序调换否则会出现死锁。例如(调换后),将consumer的两个wait调换,在producer发出signal信号后,如果producer线程此时再次获得运行机会,执行完了wait(space),此时,另一个consumer线程获得运行机会,执行了 wait(mutex) ,如果此时缓冲区为空,那么consumer将会阻塞在wait(items),而producer也会因为无法获得锁的所有权所以阻塞在wait(mutex),这样两个线程都在阻塞,也就造成了死锁。

C++代码实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
#include <iostream>
#include <string.h>
#include <pthread.h>
#include <unistd.h>
using namespace std;

int current = 0; // producer运行加1,consumer运行减1
int buf[10];
int in = 0, out = 0;
int items = 0, spaces = 10;
bool flag; // 标记线程结束运行
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t notfull = PTHREAD_COND_INITIALIZER; // 缓冲区不满
pthread_cond_t notempty = PTHREAD_COND_INITIALIZER; // 缓冲区不空

void *producer( void *arg ) {
while( flag ) {
pthread_mutex_lock( &mutex ); // 为保证条件变量不会因为多线程混乱,所以先加锁
while( !spaces ) { // 避免“惊群”效应,避免因其他线程实现得到事件而导致该线程“假醒”
pthread_cond_wait( &notfull, &mutex );
}
buf[in] = current++;
in = ( in + 1 ) % 10;
items++;
spaces--;

printf( "producer %zu , current = %d\n", pthread_self(), current );
for( int i = 0; i < 10; i++ ) {
printf( "%-4d", buf[i] );
}
printf( "\n\n" );

pthread_cond_signal( &notempty );
pthread_mutex_unlock( &mutex );
}
pthread_exit( NULL );
}

void *consumer( void *arg ) {
while( flag ) {
pthread_mutex_lock( &mutex );
while( !items ) {
pthread_cond_wait( &notempty, &mutex );
}
buf[out] = -1;
out = ( out + 1 ) % 10;
current--;
items--;
spaces++;

printf( "consumer %zu , current = %d\n", pthread_self(), current );
for( int i = 0; i < 10; i++ ) {
printf( "%-4d", buf[i] );
}
printf( "\n\n" );

pthread_cond_signal( &notfull );
pthread_mutex_unlock( &mutex );
}
pthread_exit( NULL );
}

int main() {
memset( buf, -1, sizeof(buf) );
flag = true;
pthread_t pro[10], con[10];
int i = 0;

for( int i = 0; i < 10; i++ ) {
pthread_create( &pro[i], NULL, producer, NULL );
pthread_create( &con[i], NULL, consumer, NULL );
}

sleep(1); // 让线程运行一秒
flag = false;

for( int i = 0; i < 10; i++ ) {
pthread_join( pro[i], NULL );
pthread_join( con[i], NULL );
}

return 0;
}

参考链接:【操作系统】生产者消费者问题

P,V操作的顺序是不能换的!

缺点:

  1. 读/写代码比较困难
  2. 容易出错(忘记释放信号量)
  3. 不能够处理死锁问题

3. 管程

8

管程是一种用于多线程互斥访问共享资源的程度结构

  1. 采用面向对象方法,简化了线程间的同步控制
  2. 任一时刻最多只有一个线程执行管程代码
  3. 正在管程中的线程可临时放弃管程的互斥访问,等待事件出现时恢复

管程的使用:

  1. 在对象中,收集相关共享数据
  2. 定义共享方法

管程的组成:

  1. 一个锁:控制管程代码的互斥访问
  2. 0个或多个条件变量

9

3.1 条件变量

条件变量:管程内的等待机制

每个条件变量表示一种等待原因,对应一个等待队列

Wait()

  1. 将自己阻塞在等待队列中
  2. 唤醒一个等待者or释放管程的互斥访问

Signal()

  1. 将等待队列中的一个线程唤醒
  2. 如果等待队列为空,相当于空操作

条件变量 - 代码实现:

10

3.2 用信号量解决生产者-消费者问题

11

在管程中,把P/V操作集成到一个模块里

3.3 条件变量的释放处理方式

Hansen管程 / Hoare管程

12

优先角度看,T1合理

Hansen:真实系统,java
Hoare:教科书

13

4. 哲学家就餐问题

问题描述:

14

方案1:

信号量实现

1
2
3
4
5
6
7
8
9
10
11
12
#define N 5 // 哲学家个数
semaphore fork[5]; // 信号量初值为1
void philosopher(int i) // 哲学家编号:0...4
while(TRUE)
{
think(); // 思考
p(fork[i]); // 拿左边叉子
p(fork[(i + 1) % N]); // 拿右边叉子
eat(); // 吃
V(fork[i]); // 放下左边叉子
V(fork[(i + 1) % N]); // 放下右边叉子
}

当大家一起拿左边的叉子时,每个人都只拿到一个,永远拿不到右边的,导致死锁,gg。

方案2:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
#define N 5 // 哲学家个数
semaphore fork[5]; // 信号量初值为1
semaphore mutex; // 互斥信号量,初值1
void philosopher(int i) // 哲学家编号:0...4
while(TRUE)
{
think(); // 思考
P(mutex); // 进入临界区
p(fork[i]); // 拿左边叉子
P(fork[(i + 1) % N]); // 拿右边叉子
eat(); // 吃
V(fork[i]); // 放下左边叉子
V(fork[(i + 1) % N]); // 放下右边叉子
V(mutex); // 退出临界区
}

互斥访问正确,but,效率很低

方案3:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
#define N 5 // 哲学家个数
semaphore fork[5]; // 信号量初值为1
void philosopher(int i) // 哲学家编号:0...4
while(TRUE)
{
think(); // 思考
if(i % 2 == 0) {
p(fork[i]); // 拿左边叉子
P(fork[(i + 1) % N]); // 拿右边叉子
} else {
P(fork[(i + 1) % N]); // 拿右边叉子
p(fork[i]); // 拿左边叉子
}
eat(); // 吃
V(fork[i]); // 放下左边叉子
V(fork[(i + 1) % N]); // 放下右边叉子
}

没有死锁,可有多人同时就餐

5. 读者-写者问题

共享数据的两类使用者:

  1. 读者:只读取数据,不修改 (可同时)
  2. 写者:读取和修改数据 (不可同时)

三条原则:

  1. 读-读允许
  2. 读-写互斥
    1. 没有写者才能读
    2. 没有读者才能写
  3. 谢谢互斥

信号量解决方法:

  1. 信号量:WriteMutex(控制读写操作的互斥,初始为1)
  2. 读者计数:Rcount(读者数量,初始为0)
  3. 信号量:CountMutex(控制读者计数的互斥,初始为1)

15

这是能满足前面的3条原则。

优先策略:

读者优先策略:

  1. 只要有读者正在读状态,后来的读者都能直接进入
  2. 如果读者持续进入,写者处于饥饿

写者优先策略:

  1. 只要写者就绪,应尽快执行写操作
  2. 如果写者持续不断就绪,读者处于饥饿

管程解决方法:

16

读者:

17

写者:

18

评论