Chapter 30 Conditional Variables

第 30 章 条件变量

很多情况下,线程需要检查某一条件(condition)满足之后,才会继续运行...

例如:

  • join() 等待子线程,父线程检查子线程是否执行完毕

我们也可以尝试使用共享变量,但是一般效率低下

例如:

  • CPU 自旋 spin 检查是否符合条件

1. 定义和程序

线程可以使用 condition variable 来等待一个条件变成真...

条件变量是一个显式队列...

当 condition 不满足,线程把自己加入队列,waiting 该条件

另外的线程改变了 condition 的时候,会唤醒队列里面的线程

1.1. 条件变量使用

声明:

pthread_cond_t c;

c是一个条件变量,加上适当初始化

其有两个操作

wait();
signal();

线程要休眠:调用 wait()

线程要唤醒别人:调用 signal()

1.2. 为什么需要状态变量(比如done

文档里举了一个例子,如果没有状态变量,只有条件变量,可能会出问题。比如,子线程运行得很快,直接调用了signal()通知父线程,但父线程还没开始等待(还没进入wait())。这时候,父线程会错过这个通知,因为它还没开始等待就被通知了。结果就是父线程会一直等下去,因为没有其他线程再通知它了。状态变量的作用:状态变量(比如done)是一个标记,用来记录条件是否已经满足。子线程在完成任务后,会设置这个状态变量(比如把done设置为1),然后调用signal()通知父线程。父线程在等待时会检查这个状态变量,确保不会错过通知。

1.3. signal() 和 wait()

都要确保是持有锁的进程在调用....防止态条件

wait() 语法上需要持有锁,signal 虽然不强制,但是建议!

signal() 也需要锁:虽然在某些情况下, signal() 操作可能不需要锁,但为了避免复杂的竞态条件,最好的做法是在调用 signal() 时也持有锁。这样可以确保状态变量和通知操作是一致的。

2. 生产者/消费者(有界缓冲区)问题

producer/consumer problem

bounded-buffer...

很多实际应用都可以归为这类问题

比如:

2.1. 管道

A bounded buffer is also used when you pipe the output of one program into another,

e.g., grep foo file.txt | wc -l.

This example runs two processes concurrently;

grep writes lines from file.txt with the string foo in them to what it thinks is standard output;

the UNIX shell redirects the output to what is called a UNIX pipe (created by the pipe system call). The other end of this pipe is connected to the standard input of the process wc, which simply counts the number of lines in the input stream and prints out the result. Thus, the grep process is the producer; the wc process is the consumer; between them is an in-kernel bounded buffer; you, in this example, are just the happy user.

2.2. 生产者线程

生成数据项 - 进入消费缓冲区

2.3. 消费者线程

从缓冲区取走线程,某种方式消费...

2.4. 有问题的方案

#include <assert.h>
#include <pthread.h>
#include <semaphore.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include "common.h"
#include "common_threads.h"

int max;
int loops;
int *buffer; 

int use_ptr  = 0;
int fill_ptr = 0;
int num_full = 0;

pthread_cond_t cv = PTHREAD_COND_INITIALIZER;
pthread_mutex_t m = PTHREAD_MUTEX_INITIALIZER;

int consumers = 1;
int verbose = 1;


void do_fill(int value) {
    buffer[fill_ptr] = value;
    fill_ptr = (fill_ptr + 1) % max;
    num_full++;
}

int do_get() {
    int tmp = buffer[use_ptr];
    use_ptr = (use_ptr + 1) % max;
    num_full--;
    return tmp;
}

void *producer(void *arg) {
    int i;
    for (i = 0; i < loops; i++) {
	Mutex_lock(&m);            // p1
	while (num_full == max)    // p2
	    Cond_wait(&cv, &m);    // p3
	do_fill(i);                // p4
	Cond_signal(&cv);          // p5
	Mutex_unlock(&m);          // p6
    }

    // end case: put an end-of-production marker (-1) 
    // into shared buffer, one per consumer
    for (i = 0; i < consumers; i++) {
	Mutex_lock(&m);
	while (num_full == max) 
	    Cond_wait(&cv, &m);
	do_fill(-1);
	Cond_signal(&cv);
	Mutex_unlock(&m);
    }

    return NULL;
}
                                                                               
void *consumer(void *arg) {
    int tmp = 0;
    // consumer: keep pulling data out of shared buffer
    // until you receive a -1 (end-of-production marker)
    while (tmp != -1) { 
	Mutex_lock(&m);           // c1
	while (num_full == 0)     // c2 
	    Cond_wait(&cv, &m);   // c3
	tmp = do_get();           // c4
	Cond_signal(&cv);         // c5
	Mutex_unlock(&m);         // c6
    }
    return NULL;
}

int
main(int argc, char *argv[])
{
    if (argc != 4) {
	fprintf(stderr, "usage: %s <buffersize> <loops> <consumers>\n", argv[0]);
	exit(1);
    }
    max = atoi(argv[1]);
    loops = atoi(argv[2]);
    consumers = atoi(argv[3]);

    buffer = (int *) malloc(max * sizeof(int));
    assert(buffer != NULL);

    int i;
    for (i = 0; i < max; i++) {
	buffer[i] = 0;
    }

    pthread_t pid, cid[consumers];
    Pthread_create(&pid, NULL, producer, NULL); 
    for (i = 0; i < consumers; i++) {
	Pthread_create(&cid[i], NULL, consumer, (void *) (long long int) i); 
    }
    Pthread_join(pid, NULL); 
    for (i = 0; i < consumers; i++) {
	Pthread_join(cid[i], NULL); 
    }
    return 0;
}

The problem arises for a simple reason: after the producer woke Tc1, but before Tc1 ever ran, the state of the bounded buffer changed (thanks to Tc2). Signaling a thread only wakes them up; it is thus a hint that the state of the world has changed (in this case, that a value has been placed in the buffer), but there is no guarantee that when the woken thread runs, the state will still be as desired. This interpretation of what a signal means is often referred to as Mesa semantics, after the first research that built a condition variable in such a manner [LR80]; the contrast, referred to as Hoare semantics, is harder to build but provides a stronger guarantee that the woken thread will run immediately upon being woken [H74]. Virtually every system ever built employs Mesa semantics.

2.5. 较好但有问题的方案:while 语句 替代 if

由于 Mesa 语义,牢记下面的规则:

  • 总是使用 while 循环

  • 虽然有时候不需要重新检查条件,但这样做总是安全的...

  • just do it and be happy.

但是有新问题

如果 1 producer 和 2 consumer

当一个 C_1 消费了数据

需要唤醒队列里的进程(P 和 C_2)

正确情况是唤醒 P

但绝对有可能唤醒 C_2,取决于睡眠队列如何管理...

C_2 醒来发现没法执行(while)所以又回去睡了

C_1 刚刚唤醒了 C_2 之后检查条件之后也会去睡了

P 本来就在睡

现在三个都在睡觉...

所以信号很重要

需要指定唤醒的是 producer 还是 consumer...

2.6. 单值缓冲的生产者/消费者方案

解决方案简单:使用两个 condition variable

2.7. 最终方案

虽然能用了,但是扩展性不强,需要多值缓冲...也就是缓冲区大小 != 1

The last change we make is to enable more concurrency and efficiency; specifically, we add more buffer slots, so that multiple values can be produced before sleeping, and similarly multiple values can be consumed before sleeping.

With just a single producer and consumer, this approach is more efficient as it reduces context switches 减少上下文切换; with multiple producers or consumers (or both), it even allows concurrent producing or consuming to take place, thus increasing concurrency. Fortunately, it is a small change from our current solution.

always use while

解决了 spurious wakeup 假唤醒的问题

3. 覆盖条件

使用 boardcast 唤醒所有等待的线程,避免不知道唤醒哪个等待的线程的情况...

这样可能带来性能的下降,但是很好解决了问题(所有唤醒的进程都进入 while 判断执行条件)

广播一般来说有可能(但不一定是)最佳方案,要好好斟酌!

4. 作业(编码作业)

This homework lets you explore some real code that uses locks and condition variables to implement various forms of the producer/consumer queue discussed in the chapter. You’ll look at the real code, run it in various configurations, and use it to learn about what works and what doesn’t, as well as other intricacies. Read the README for details.

Last updated