15 - Concurrency control: synchronization (1)
15 - 并发控制:同步(1)
借助硬件原子指令和操作系统的帮助,我们实现了高效的互斥锁,能够确保锁保护的代码块按照某个顺序执行。然而,互斥并不总是能满足多个并发线程协作完成任务的需求,我们很多时候还需要控制代码块执行的先后次序。如何能便捷地让共享内存的线程协作以共同完成计算任务?
本讲内容:并发控制:同步。同步问题的定义、生产者-消费者问题、“万能” 的同步方法:条件变量。
好消息!!!
终于可以实现正确的 1 + 1 了!
自旋锁 spin_lock, spin_unlock
互斥锁 mutex_lock, mutex_unlock
lock(&lock);
sum++;
// 任意代码
unlock(&lock);
坏消息
互斥似乎还不够
互斥实现了原子性:A→B 或 B→A
但没有给我们确定性:A→B
执行顺序的确定性
理解并发的方法
线程 = 我们自己
共享内存 = 物理空间
1. 同步和条件变量
1.1. 并发控制:同步 - Synchronization
“两个或两个以上随时间变化的量在变化过程中保持一定的相对关系”
同步电机、同步电路、红绿灯……
DeepSeek: 多个事件、进程或系统在时间上协调一致,确保按预定顺序或同时执行
我们希望控制事件发生的先后顺序
A→B→C
互斥锁只能确保 “分开” A,B,C,做不到顺序控制
可能是 A,C,B 或其他...
我们希望形成受我们控制的 “happens-before” 关系
1.2. 现实世界的同步(1)
比如:合唱团...
理解同步:什么是事件 (代码的执行)、什么是顺序
每个乐手都是一个 “线程” (状态机)
事件:发出节拍、演奏节拍
顺序:发出节拍 i → 演奏节拍 i
(这个模型允许一个节拍演奏得过长)
这样就会造成混乱...
实际上应该避免此情况
void T_player() {
while (!end) {
wait_next_beat();
play_next_beat();
}
}
1.3. 现实世界的同步(2)
理解同步:什么是事件 (代码的执行)、什么是顺序
事件 A: 第一个人到达、事件 B: 第二个人到达
不见不散:有一个共同完成的事件 XXXXXX (类似线程 join)
A→XXX, B→XXX
因此一定有一瞬间 A,B 完成、XXXXXX 还没有开始
A, B 都完成了...并且都还没有开始下一步
(同步点:一个确定的状态;从而实现并发控制)
“控制”:将发散的并发程序状态 “收束”
void T_A() {
arrive_at_activity_center(); // 大活
while (!both_A_and_B_are_here()) ; // 不见不散
xxx();
}
1.4. 程序世界中的同步
sum = 0; // A
spawn(T_sum); // T_1 start -> T_1 end
spawn(T_sum); // T_2 start -> T_2 end
join();
printf("sum = %ld\n", sum); // B
理解同步:什么是事件 (代码的执行)、什么是顺序
同一线程内的事件天然存在 happens-before 关系
线程的执行一定在创建之后发生的
start 一定在 spawn 之后发生的...
进入的 join 之后
没有 happens-before 的代码就是并发的
在多处理器系统中可以并行执行
1.5. 实现同步:实现 Happens-before
实现 A→B
A; can_proceed = true; (🚩)
while (!can_proceed) ; B
B: 如果同步的条件不满足,就等待
void T_player() {
for (int i = 0; i < n; i++) {
while (current_beat < i) ;
// 线程会检查是否完成节拍,否则一直等待...
play_beat(i);
}
}
“大致可用”,但存在一些问题
例如,current_beat 存在和 sum++ 类似的问题
form kimi:
忙等待(Busy Waiting)
while (current_beat < i) ; CPU 空转
共享变量的同步问题
current_beat 是一个共享变量,多个线程可能会同时访问和修改它。
可以加上锁来进行...互斥...访问问题...
1.6. 问问 OS?
能不能帮我们替代自旋的循环
while (!can_proceed) ;
“发明” 条件变量机制
一个理想的 API
wait_until(cond),把代码编译到内核里
当然,过去的操作系统设计者不会搞那么复杂
条件不满足时等待 (让出 CPU)
wait - 直接睡眠等待
条件满足时继续
signal/broadcast - 唤醒所有等待的线程
实现 A Happens Before B
1.7. C++: 我可以把条件放到 λ 表达式里
std::mutex mtx; // 互斥锁
std::condition_variable cv; // Since C++11 条件变量
void T_player() {
std::unique_lock lk(mtx); // 锁对象->mtx
cv.wait(lk,
// This reads better!
[]{ return can_proceed; }
); // 等待函数(锁对象,条件表达式)
// can_proceed holds here
// 线程运行到这,以及成功获取了锁 lk
cv.notify_all(); // 通知所有等待 cv 的线程
lk.unlock(); // 手动释放锁(此处会自动析构...)
}
注意使用条件变量之前...是必须要带一把锁的,否则编译不通过...
不要碰瓷并发编程...先使用绝对正确的方法...
2. 条件变量的正确打开方式
2.1. 经典同步问题
学废你就赢了
99% 的实际并发问题都可以用生产者-消费者解决
dependency graph...
Producer 和 Consumer 共享一个缓冲区
Producer (生产数据):如果缓冲区有空位,放入;否则等待
Consumer (消费数据):如果缓冲区有数据,取走;否则等待
同步:同一个 object 的生产必须 happens-before 消费
void produce(Object obj);
Object consume();
2.2. 生产者消费者问题简化
一个等价的描述
void produce() { printf("("); }
void consume() { printf(")"); }
生产 = 打印左括号 (push into buffer)
消费 = 打印右括号 (pop from buffer)
在 printf 前后增加代码,使得打印的括号序列满足
不能输出错误的括号序列
括号嵌套的深度不超过 n
n=3,
((())())(((
√n=3,
(((())))
,(()))
×
嵌套深度为 4(连续 4 个左括号出现了!)
并且右括号在左括号之前出现了
2.3. 条件变量的正确打开方式
想清楚程序继续执行的条件
什么时候可以 produce,什么时候可以 consume?
mutex_lock(🔒);
while (!cond) { // cond 可以是任意的计算
cond_wait(&cv, 🔒);
}
assert(cond); // 此时 cond 成立且持有锁 lk
mutex_unlock(🔒);
// 注意锁的使用
mutex_lock(🔒);
cond = true;
cond_broadcast(&cv); // 唤醒所有可能继续的线程
mutex_unlock(🔒);
2.4. 使用条件变量实现同步
生产/消费的条件是什么?
d<n 可以生产;d>0 可以消费 (然后,抄代码)
首先要学会,可以无脑写出正确的代码
void produce() {
mutex_lock(🔒);
while (!(depth < n)) {
cond_wait(&cv, 🔒);
}
assert(depth < n);
depth++;
printf("("); // put object to queue
cond_broadcast(&cv);
mutex_unlock(🔒);
}
2.5. Caveat: 小心并发!
“看起来正确” 其实很危险
Producer 如果唤醒了等待的 producer 就糟了……
void produce() {
mutex_lock(🔒); // 尝试获取互斥锁
while (!(depth < n)) { // 检查队列是否已满
cond_wait(&cv, 🔒); // 已满则进入等待
}
assert(depth < n);
depth++; // 队列未满,放对象进入队列
printf("("); // put object to queue
cond_signal(&cv); // ⚠ 通知其他线程:有可能唤醒另一个生产者?
mutex_unlock(🔒);
}
From KIMI
正确的做法是使用两个条件变量
一个用于生产者(
cv_producer
)一个用于消费者(
cv_consume
)。
生产者线程应该使用
cv_producer
来等待队列不满的条件消费者线程应该使用
cv_consumer
来等待队列不空的条件
这样可以确保cond_signal
只会唤醒正确的线程类型。
2.6. 条件变量:万能的同步法
有三种线程
Ta 若干: 死循环打印
<
Tb 若干: 死循环打印
>
Tc 若干: 死循环打印
_
任务:
对线程同步,使得屏幕打印出
<><_
和><>_
的组合
使用条件变量,只要回答三个问题:
打印 “
<
” 的条件?打印 “>
” 的条件?打印 “_
” 的条件?
大脑体操?
但这里是 OS 的课程...需要考虑的是,什么时候该做什么事情???
3. 实现并发计算图
3.1. 理解你的计算任务:计算图模型
G(V,E): 有向无环的 Dependency Graph
计算任务在节点上
(可以使用 shared memory)
边 (u,v)∈E 表示 v 的计算要用到 u 产生的值
(u,v) 也是一个 happens-before 关系
这是一个非常基础的模型
几乎总是可以用这个视角去理解并行计算
如果节点 “独立计算时间” 足够长,算法就是可高效并行的
3.2. 例子:Longest Common Subsequence

这个是如何进行并行计算的?
根据左上角的拓扑结构扩展,利用依赖关系进行并行计算...
此时还可以进行一个性能优化
每层的并行任务可以适当扩大,这样可以把任务缩减到16、32个线程...
3.3. 例子:电路模拟

由于个别 Logic 门的输入依赖于输出...所以也存在一些依赖关系
可以化为并行图...
3.4. 例子:深度神经网络

我们会专门有一次课讲各类计算任务的并行方法...
神经网络就是天生的并行图...
3.5. 同步:实现任意计算图(1)
为每个计算节点设置一个线程和条件变量
当然也可以几个节点分配一把大锁...
初学阶段建议不要太贪心?
void T_u() { // u -> v
... // u 的计算
mutex_lock(v->lock);
v->num_done++;
cond_signal(v->cv); // 这里是可以 signal 的
mutex_unlock(v->lock);
}
void T_v() {
mutex_lock(v->lock);
while (!(v->num_done == v->num_predecessors)) {
cond_wait(v->cv, v->lock);
}
mutex_unlock(v->lock);
... // v 的计算
}
3.6. 同步:实现任意计算图(2)
实现一个任务的调度器
一个生产者 (scheduler),许多消费者 (workers) 循环:
mutex_lock(🔒);
while (!(all_done || has_job(tid))) {
cond_wait(&worker_cv[tid], 🔒);
}
mutex_unlock(🔒);
if (all_done) {
break;
} else {
process_job(tid);
}
signal(&sched_cv);
3.7. T scheduler...
实现一个队列...
调度器 scheduler 生产任务 - workers 消费任务...
4. 总结
Take-away messages: 同步的本质是线程需要等待某件它所预期的事件发生,而事件的发生总是可以用条件 (例如 depth 满足某个条件,或是程序处于某个特定的状态) 来表达。因此计算机系统的设计者实现了条件变量,将条件检查和临界区 “放在一起”,以实现线程间的同步。
依赖关系 dependency....
生产者消费者...Model...
Last updated