15 - Concurrency control: synchronization (1)

15 - 并发控制:同步(1)

借助硬件原子指令和操作系统的帮助,我们实现了高效的互斥锁,能够确保锁保护的代码块按照某个顺序执行。然而,互斥并不总是能满足多个并发线程协作完成任务的需求,我们很多时候还需要控制代码块执行的先后次序。如何能便捷地让共享内存的线程协作以共同完成计算任务?

本讲内容:并发控制:同步。同步问题的定义、生产者-消费者问题、“万能” 的同步方法:条件变量。

好消息!!!

终于可以实现正确的 1 + 1 了!

  • 自旋锁 spin_lock, spin_unlock

  • 互斥锁 mutex_lock, mutex_unlock

lock(&lock);
sum++;
// 任意代码
unlock(&lock);

坏消息

互斥似乎还不够

  • 互斥实现了原子性ABBA

  • 但没有给我们确定性AB

    • 执行顺序的确定性

理解并发的方法

  • 线程 = 我们自己

  • 共享内存 = 物理空间

1. 同步和条件变量

1.1. 并发控制:同步 - Synchronization

“两个或两个以上随时间变化的量在变化过程中保持一定的相对关系”

  • 同步电机、同步电路、红绿灯……

DeepSeek: 多个事件、进程或系统在时间上协调一致,确保按预定顺序同时执行

我们希望控制事件发生的先后顺序

  • A→B→C

    • 互斥锁只能确保 “分开” A,B,C,做不到顺序控制

    • 可能是 A,C,B 或其他...

  • 我们希望形成受我们控制的 “happens-before” 关系

1.2. 现实世界的同步(1)

比如:合唱团...

理解同步:什么是事件 (代码的执行)、什么是顺序

  • 每个乐手都是一个 “线程” (状态机)

    • 事件:发出节拍、演奏节拍

    • 顺序:发出节拍 i → 演奏节拍 i

    • (这个模型允许一个节拍演奏得过长)

      • 这样就会造成混乱...

      • 实际上应该避免此情况

void T_player() {
    while (!end) {
        wait_next_beat();
        play_next_beat();
    }
}

1.3. 现实世界的同步(2)

理解同步:什么是事件 (代码的执行)、什么是顺序

  • 事件 A: 第一个人到达、事件 B: 第二个人到达

  • 不见不散:有一个共同完成的事件 XXXXXX (类似线程 join)

    • A→XXX, B→XXX

      • 因此一定有一瞬间 A,B 完成、XXXXXX 还没有开始

        • A, B 都完成了...并且都还没有开始下一步

      • (同步点:一个确定的状态;从而实现并发控制)

      • “控制”:将发散的并发程序状态 “收束”

void T_A() {
    arrive_at_activity_center();  // 大活
    while (!both_A_and_B_are_here()) ;  // 不见不散
    xxx();
}

1.4. 程序世界中的同步

sum = 0;  // A
spawn(T_sum);  // T_1 start -> T_1 end
spawn(T_sum);  // T_2 start -> T_2 end
join();
printf("sum = %ld\n", sum);  // B

理解同步:什么是事件 (代码的执行)、什么是顺序

    • 同一线程内的事件天然存在 happens-before 关系

      • 线程的执行一定在创建之后发生的

      • start 一定在 spawn 之后发生的...

    • 进入的 join 之后

    • 没有 happens-before 的代码就是并发

      • 在多处理器系统中可以并行执行

1.5. 实现同步:实现 Happens-before

实现 A→B

  • A; can_proceed = true; (🚩)

  • while (!can_proceed) ; B

    • B: 如果同步的条件不满足,就等待

void T_player() {
    for (int i = 0; i < n; i++) {
        while (current_beat < i) ;
        // 线程会检查是否完成节拍,否则一直等待...
        play_beat(i);
    }
}
  • “大致可用”,但存在一些问题

    • 例如,current_beat 存在和 sum++ 类似的问题

  • form kimi:

    • 忙等待(Busy Waiting)

      • while (current_beat < i) ; CPU 空转

    • 共享变量的同步问题

      • current_beat 是一个共享变量,多个线程可能会同时访问和修改它。

  • 可以加上锁来进行...互斥...访问问题...

1.6. 问问 OS?

能不能帮我们替代自旋的循环

while (!can_proceed) ;

“发明” 条件变量机制

  • 一个理想的 API

    • wait_until(cond),把代码编译到内核里

  • 当然,过去的操作系统设计者不会搞那么复杂

    • 条件不满足时等待 (让出 CPU)

      • wait - 直接睡眠等待

    • 条件满足时继续

      • signal/broadcast - 唤醒所有等待的线程

    • 实现 A Happens Before B

1.7. C++: 我可以把条件放到 λ 表达式里

std::mutex mtx;					// 互斥锁
std::condition_variable cv;  	// Since C++11 条件变量

void T_player() {
    std::unique_lock lk(mtx);	// 锁对象->mtx
    
    cv.wait(lk,
        // This reads better!
        []{ return can_proceed; }
    );							// 等待函数(锁对象,条件表达式)

    // can_proceed holds here
    // 线程运行到这,以及成功获取了锁 lk

    cv.notify_all();	// 通知所有等待 cv 的线程
    lk.unlock();		// 手动释放锁(此处会自动析构...)
}

注意使用条件变量之前...是必须要带一把锁的,否则编译不通过...

不要碰瓷并发编程...先使用绝对正确的方法...

2. 条件变量的正确打开方式

2.1. 经典同步问题

学废你就赢了

  • 99% 的实际并发问题都可以用生产者-消费者解决

  • dependency graph...

Producer 和 Consumer 共享一个缓冲区

  • Producer (生产数据):如果缓冲区有空位,放入;否则等待

  • Consumer (消费数据):如果缓冲区有数据,取走;否则等待

    • 同步:同一个 object 的生产必须 happens-before 消费

void produce(Object obj);
Object consume();

2.2. 生产者消费者问题简化

一个等价的描述

void produce() { printf("("); }
void consume() { printf(")"); }
  • 生产 = 打印左括号 (push into buffer)

  • 消费 = 打印右括号 (pop from buffer)

  • 在 printf 前后增加代码,使得打印的括号序列满足

    • 不能输出错误的括号序列

    • 括号嵌套的深度不超过 n

      • n=3, ((())())(((

      • n=3, (((()))), (()))×

        • 嵌套深度为 4(连续 4 个左括号出现了!)

        • 并且右括号在左括号之前出现了

2.3. 条件变量的正确打开方式

想清楚程序继续执行的条件

  • 什么时候可以 produce,什么时候可以 consume?

mutex_lock(🔒);
while (!cond) {  // cond 可以是任意的计算
    cond_wait(&cv, 🔒);
}
assert(cond);    // 此时 cond 成立且持有锁 lk
mutex_unlock(🔒);
// 注意锁的使用
mutex_lock(🔒);
cond = true;
cond_broadcast(&cv); // 唤醒所有可能继续的线程
mutex_unlock(🔒);

2.4. 使用条件变量实现同步

生产/消费的条件是什么?

  • d<n 可以生产;d>0 可以消费 (然后,抄代码)

首先要学会,可以无脑写出正确的代码

void produce() {
    mutex_lock(🔒);
    while (!(depth < n)) {
        cond_wait(&cv, 🔒);
    }

    assert(depth < n);
    depth++;
    printf("("); // put object to queue

    cond_broadcast(&cv);
    mutex_unlock(🔒);
}

2.5. Caveat: 小心并发!

“看起来正确” 其实很危险

  • Producer 如果唤醒了等待的 producer 就糟了……

void produce() {
    mutex_lock(🔒);				// 尝试获取互斥锁
    while (!(depth < n)) {		// 检查队列是否已满
        cond_wait(&cv, 🔒);	// 已满则进入等待
    }

    assert(depth < n);
    depth++;					// 队列未满,放对象进入队列
    printf("("); 				// put object to queue

    cond_signal(&cv);  			// ⚠	通知其他线程:有可能唤醒另一个生产者?
    mutex_unlock(🔒);
}

From KIMI

正确的做法是使用两个条件变量

  1. 一个用于生产者(cv_producer

  2. 一个用于消费者(cv_consume)。

  • 生产者线程应该使用cv_producer来等待队列不满的条件

  • 消费者线程应该使用cv_consumer来等待队列不空的条件

这样可以确保cond_signal只会唤醒正确的线程类型。

2.6. 条件变量:万能的同步法

有三种线程

  • Ta 若干: 死循环打印 <

  • Tb 若干: 死循环打印 >

  • Tc 若干: 死循环打印 _

任务:

  • 对线程同步,使得屏幕打印出 <><_><>_ 的组合

使用条件变量,只要回答三个问题:

  • 打印 “<” 的条件?打印 “>” 的条件?打印 “_” 的条件?

大脑体操?

但这里是 OS 的课程...需要考虑的是,什么时候该做什么事情???

3. 实现并发计算图

3.1. 理解你的计算任务:计算图模型

G(V,E): 有向无环的 Dependency Graph

  • 计算任务在节点上

    • (可以使用 shared memory)

  • 边 (u,v)∈E 表示 v 的计算要用到 u 产生的值

    • (u,v) 也是一个 happens-before 关系

这是一个非常基础的模型

  • 几乎总是可以用这个视角去理解并行计算

  • 如果节点 “独立计算时间” 足够长,算法就是可高效并行的

3.2. 例子:Longest Common Subsequence

这个是如何进行并行计算的?

根据左上角的拓扑结构扩展,利用依赖关系进行并行计算...

此时还可以进行一个性能优化

  • 每层的并行任务可以适当扩大,这样可以把任务缩减到16、32个线程...

3.3. 例子:电路模拟

由于个别 Logic 门的输入依赖于输出...所以也存在一些依赖关系

可以化为并行图...

3.4. 例子:深度神经网络

  • 我们会专门有一次课讲各类计算任务的并行方法...

神经网络就是天生的并行图...

3.5. 同步:实现任意计算图(1)

为每个计算节点设置一个线程和条件变量

  • 当然也可以几个节点分配一把大锁...

  • 初学阶段建议不要太贪心?

void T_u() {  // u -> v
    ... // u 的计算
    mutex_lock(v->lock);
    v->num_done++;
    cond_signal(v->cv);  // 这里是可以 signal 的
    mutex_unlock(v->lock);
}

void T_v() {
    mutex_lock(v->lock);
    while (!(v->num_done == v->num_predecessors)) {
        cond_wait(v->cv, v->lock);
    }
    mutex_unlock(v->lock);
    ... // v 的计算
}

3.6. 同步:实现任意计算图(2)

实现一个任务的调度器

  • 一个生产者 (scheduler),许多消费者 (workers) 循环:

mutex_lock(🔒);
while (!(all_done || has_job(tid))) {
    cond_wait(&worker_cv[tid], 🔒);
}
mutex_unlock(🔒);

if (all_done) {
    break;
} else {
    process_job(tid);
}

signal(&sched_cv);

3.7. T scheduler...

实现一个队列...

调度器 scheduler 生产任务 - workers 消费任务...

4. 总结

Take-away messages: 同步的本质是线程需要等待某件它所预期的事件发生,而事件的发生总是可以用条件 (例如 depth 满足某个条件,或是程序处于某个特定的状态) 来表达。因此计算机系统的设计者实现了条件变量,将条件检查和临界区 “放在一起”,以实现线程间的同步。

  • 依赖关系 dependency....

  • 生产者消费者...Model...

Last updated