16 - Concurrency control: synchronization (2)

16 - 并发控制:同步(2)

同步实现了 happens-before 的关系,使程序能确保到达一个确定的 “已知简单状态”,从而实现并发控制。在同步的过程中,自然存在的 ”同步达成条件“ 带来了条件变量机制。

本讲内容:E. W. Dijkstra 发明的另一种共享内存系统中常用的同步方法:信号量。

同步:实现 Happen - before

“条件达成前” → “条件达成后”

  • 23:59:59 大活门口不见不散

    • 存在状态,两人同时到达大活门口且未进行下一步动作

  • 生产 (对象创建) → 消费 (对象释放)

    • 存在状态,对象已经创建,但还未被释放

  • 线程结束 → join 返回

    • 存在状态,线程全部结束且后续代码(join 之后的代码)还未开始

于是有了 “条件变量”

  • 首先是一把大锁!然后再拓展

模板!

  • 等待条件满足:while (!cond) wait(cv, lk);

  • 条件可能满足:broadcast(cv) ;

互斥:也实现了 Happens-before

Release → Acquire

void lock() {
    std::unique_lock<std::mutex> lk(mtx);
    cv.wait(lk, []{ return !lock_held; });
    lock_held = true;
}
 
void unlock() {
    std::lock_guard<std::mutex> lk(mtx);
    lock_held = false;
    cv.notify_one();  // 或 cv.notify_all()
}
  • 只需要互斥的场景 (例如 mymalloc),mutex lock 效率更高

1. 信号量

互斥锁来实现主进程和子进程的 Happens before...

  1. 在一开始 lock(L);

  2. 主进程希望 lock(L); 实现返回,但是一直失败

  3. 直到随后 在子进程 unlock(L);

  4. 主进程希望 lock(L); 实现返回,这时候成功

当然,这个 hack 是 undefined behavior,只作课堂展示,不能在生产环境使用。

实际上只能在同一个进程 lock 和 unlock 同一把锁

1.1. 互斥锁实现精确唤醒

一个奇妙的想法

  • 创建锁时,立即 “获得” 它 (总是成功)

  • 其他人想要获得时就会等待

    • 此时 release 就实现了 happens-before

  • 一个线程上锁,在另一个线程解锁

让我们来试一试吧 (demo)

  • 先把厕所门都锁上

  • 线程到达以后等待

  • 管理员把所有门都打开

1.2. 用互斥锁实现计算图

Acquire-Release 实现计算图

  • 为每一条边 e=(u,v)e=(u,v) 分配一个互斥锁

  • 初始时,全部处于锁定状态

  • 对于一个节点,它需要获得所有入边的锁才能继续

    • 可以直接计算的节点立即开始计算

  • 计算完成后,释放所有出边对应的锁

挺好用 (试试 orchestra 的实现)

  • 甚至比条件变量还好用嘞!

    • This is undefined behavior

1.3. 本质:“Release as Synchronization”

Release-Acquire 实现了 happens-before

  • Acquire = 等待信号

  • Release = 发出信号

    • (这不是 UNIX 的 signal...)

“信号” 可以理解为现实生活中的 “资源许可”

  • 共享,资源,凭 (信号) 进入

    • 游泳馆:有空位获得手环进入更衣室,否则排队等待

    • 餐厅:有空桌可以直接进入,否则取号等待

    • 停车场:有空车位可以直接进场,否则排队

      • 这是一大类同步问题

1.4. 适合处理:计数型的同类资源

为资源实现 acquire 和 release 操作

  • Acquire: 在有车位时进入停车场

  • Release: 出停车场;车位 + 1

1.5. 计数型的同类资源:实现同步

void enter_parking() {
    // 锁
    mutex_lock(&lk);

    // 进入parking lot的判断...
    while (!(in_parking < capacity)) {
        cond_wait(&cv, &lk);
    }
    in_parking++;  // 这个时候我已经 “进入” 了
    // 解锁
    mutex_unlock(&lk);
}

void leave_parking() {
    mutex_lock(&lk);
    in_parking--;
    cond_broadcast(&cv);  // 退出以后,可能有人能进入
    mutex_unlock(&lk);
}
  • 这是生产者-消费者呀

    • 确保进入前存在一个 “空位” (count < capacity) 就行

1.6. 计数型的同类资源:实现同步另一种方法

void consume_parking() {  // enter_parking
    mutex_lock(&lk);
    
    while (!(parking_spot > 0)) {
        cond_wait(&cv, &lk);
    }
    count--;
    
    mutex_unlock(&lk);
}

void magic_parking() {  // leave_parking
    mutex_lock(&lk);
    
    parking_spot++;
    
    cond_broadcast(&cv);
    mutex_unlock(&lk);
}
  • “空位” 和 “占用” 是相对的

    • 停车 = “吃掉” 车位;离开 = “创造” 车位 (允许凭空创造)

    • 转变为 PC Model

1.7. 于是你发明了“信号量”!

void P(sem_t *sem) {
    // Prolaag - try + decrease/down/wait/acquire
    mutex_lock(&sem->lk);
    while (!(sem->count > 0)) {
        cond_wait(&sem->cv, &sem->lk);
    }
    sem->count--;  // 消耗一个信号 (车位/手环/...)
    mutex_unlock(&sem->lk);
}

void V(sem_t *sem) {
    // Verhoog - increase/up/post/signal/release
    mutex_lock(&sem->lk);
    sem->count++;  // 创建一个信号 (车位/手环/...)
    cond_broadcast(&sem->cv);
    mutex_unlock(&sem->lk);
}

1.8. 我们可以把信号量当互斥锁使用

sem_t sem = SEM_INIT(1);

void lock() {
    P(&sem);
}

void unlock() {
    V(&sem);
}
  • 回到了我们开头的代码

    • 允许在不同的线程的 release/acquire

    • “信号量是扩展的互斥锁”

2. 用 “互斥” 实现同步

信号量:应用

2.1. 信号量的两种典型应用

1. 实现一次临时的 happens-before: A→BA→B

  • AV(s)→P(s)→B

    • 这就是刚才的 “互斥锁实现同步”

2. 管理计数型资源

  • 游泳池里的人不能超过 n

  • 停车场只有 n 个车位

  • 餐厅只有 n

    • 当然,可以分成大桌、中桌、小桌

2.2. 例子:线程 join()

1. 实现 happens-before

  • worker: V(donet)V(donet)

  • main: P(done1)→P(done2)…→P(doneT)P(done1)→P(done2)…→P(doneT)

  • 描述了一个 “计算图”

2. 实现计数型资源管理

  • worker: V(done)V(done)

    • worker 结束时释放一个手环

  • main: P(done)×TP(done)×T

    • 管理员收齐所有手环后可以关闭游泳池

2.3. 例子:优雅实现生产者-消费者

// 固定大小的缓冲区
sem_t empty = SEM_INIT(depth);
sem_t fill = SEM_INIT(0);

void T_produce() {
    P(&empty);
    printf("(");
    V(&fill);
}

void T_consume() {
    P(&fill);
    printf(")");
    V(&empty);
}
  • 从 empty 袋子里拿球 → produce (然后把球放到 fill)

  • 从 fill 袋子里拿球 → consume (然后把球放回 empty)

3. 信号量、条件变量 与 同步

3.1. 信号量 v.s. 条件变量

信号量

  • 干净、优雅,完美地解决了生产者-消费者问题

  • 但 “count” 不总是能很好地代表同步条件

条件变量

  • 万能:适用于任何同步条件

  • 丑陋:代码总感里有什么脏东西 (spin loop)

synchronized (obj) {  // reads better in Java
    while (!cond) {
        obj.wait();
    }
    ...
}

3.2. 让我们上一点难度

Fish

  • 保证打印出 <><_><>_ 的序列?

一个小困难

  • 假设刚打印完一条完整的鱼

    • <><_><>_

  • <> 都是可行的

    • 信号量似乎不太好表达 “二选一”

      • 作为一个信号量,难以做出决策

    • 除非手动构建 happens-before

3.3. 面对更复杂的同步问题...

学家吃饭问题 (E. W. Dijkstra, 1960)

  • 哲学家 (线程) 有时思考,有时吃饭

  • 吃饭需要同时得到左手和右手的叉子

3.4. 尝试

条件变量

  • 同步条件:avail[lhs] && avail[rhs]

  • 背模板即可

    • (期末考试 100% 会考,这就是通关密码)

信号量

  • P(&sem[lhs]) && P(&sem[rhs])

  • 看起来没什么问题?

    • 当互斥锁用就行了

    • 很有可能变成死锁!!!

可能存在的 并发情况

#include <thread.h>
#include <thread-sync.h>

#define N 5

sem_t table;
sem_t avail[N];

void Tphilosopher(int id)
{
    int lhs = (id + N - 1) % N;
    int rhs = id % N;

    while (1)
    {
        // Come to table
        // P(&table);

        P(&avail[lhs]);
        printf("+ %d by T%d\n", lhs, id);
        P(&avail[rhs]);
        printf("+ %d by T%d\n", rhs, id);

        // Eat.
        // Philosophers are allowed to eat in parallel.

        printf("- %d by T%d\n", lhs, id);
        printf("- %d by T%d\n", rhs, id);
        V(&avail[lhs]);
        V(&avail[rhs]);

        // Leave table
        // V(&table);
    }
}

int main()
{
    // SEM_INIT(&table, N - 1);

    for (int i = 0; i < N; i++)
    {
        SEM_INIT(&avail[i], 1);
    }

    for (int i = 0; i < N; i++)
    {
        create(Tphilosopher);
    }
}
......
......
+ 3 by T4
+ 2 by T2
- 1 by T2
- 2 by T2
+ 4 by T4
- 3 by T4
- 4 by T4
+ 3 by T4
+ 4 by T4
- 3 by T4
- 4 by T4
+ 3 by T4
+ 4 by T4
- 3 by T4
- 4 by T4
+ 3 by T4
+ 4 by T4
- 3 by T4
- 4 by T4
+ 3 by T4
+ 4 by T4
- 3 by T4
- 4 by T4
+ 2 by T3
+ 4 by T5
+ 3 by T4
+ 1 by T2
^C
...

陷入死锁,所有哲学家可能同时拿到了左边的叉子,都在等待右边的叉子...

可以加一个 table 的 信号...

这时候可以安全吃饭...

或者始终先拿编号小的,大的不拿,这样会有一个人一把都不拿!

3.5. 成功的尝试:信号量

如果 5 个哲学家同时举起左手的叉子……

  • 我们需要禁止这件事发生

Workaround 1: 从桌子上赶走一个人

  • 直观理解:大家先从桌上退出

    • 袋子里有 4 张卡

    • 拿到卡的可以上桌吃饭 (拿叉子)

    • 吃完以后把卡归还到袋子

Workaround 2: Lock Ordering

  • 给叉子编号,总是先拿编号小的

现实的程序设计总是更复杂的!

3.6. 不!这不成功!

信号量不总是 “优雅”

  • Systems 要的是 absolutely correct 的方案

  • 数值型资源不总是能很好地代表同步条件

3.7. 信号量实现条件变量 - Spicy

Implementing condition variables out of a simple primitive like semaphores is surprisingly tricky. (from a 2003 report)

void wait(cond_t *cv, mutex_t *mutex) {
    atomic_inc(&cv->nwait); // 原子操作,增加等待线程计数
    mutex_unlock(mutex);    // 释放互斥锁,允许其他线程进入临界区

    // 这里可能会有 boardcast
    
    P(&cv->sleep);          // 等待信号量,线程进入休眠状态
    mutex_lock(mutex);      // 再次加锁,准备继续执行
}

void broadcast(cond_t *cv) {
    mutex_lock(&cv->lock);  // 锁定条件变量的锁,确保操作的原子性
    for (int i = 0; i < cv->nwait; i++)
        V(&cv->sleep);      // 释放信号量,唤醒所有等待的线程
    cv->nwait = 0;          // 重置等待线程计数
    mutex_unlock(&cv->lock); // 解锁
}
  • 唤醒丢失: 一个 “早就 wait 但没有 P” 的线程会抢走唤醒

3.8. 实现困难的本质原因 - Spicy

先释放锁,再执行 P

  • 释放锁的一瞬间可能与 broadcast 并发

先执行 P,再释放锁

  • P(&cv->sleep) 会 “永久睡眠”

那怎么办

  • Release-wait 必须实现成 “不可分割的原子操作”

    • 解决不了,问操作系统吧 (实际实现靠得是 futex)

  • 信号量:在合适的时候好用;但不总是好用

4. 总结

Take-away messages: 信号量可以看做是互斥锁的一个 “推广”,可以理解成游泳馆的手环、停车场的车位、餐厅的桌子和袋子里的球,通过计数的方式实现同步——在符合这个抽象时,使用信号量能够带来优雅的代码。但信号量不是万能的——理解线程同步的条件才是真正至关重要的。

Last updated