Multithreading and Thread Synchrony

多线程与线程同步

来自B站:爱编程的大丙

代码仓库:https://github.com/Rain0832/CSDIY_Demo/tree/main/Threads

我将其定义为操作系统导论与实践。

操作系统最重要的理解就是进程/线程,锁机制等。

1. 多线程特点

  • 线程是轻量级的进程(LWP:light weight process),在Linux环境线程的本质是进程

  • 进程是资源分配的最小单位,线程是操作系统调度执行的最小单位。

  • 线程不是越多越好。而是够用就好

    • 文件IO操作:文件IO对CPU是使用率不高, 因此可以分时复用CPU时间片, 线程的个数 = 2 * CPU核心数 (效率最高)

    • 处理复杂的算法(主要是CPU进行运算, 压力大),线程的个数 = CPU的核心数 (效率最高)

2. 线程的创建、退出、回收方法

线程库 - C语言:

  • #include <pthread.h>

2.1. 线程函数

线程ID函数:pthread_t pthread_self(void);

创建函数:int pthread_create(pthread_t *thread, const pthread_attr_t *attr, void *(*start_routine) (void *), void *arg);

2.2. 创建线程

#include<stdio.h>
#include<stdlib.h>
#include<unistd.h>
#include<string.h>
#include<pthread.h>

struct Test
{
	int num;
	int age;
};

// 子线程的处理代码:回调函数
void* callback(void* arg)
{
	for(int i = 0; i < 5; i++)
	{
		printf("child thread: i = %d\n", i);
	}
	printf("child thread: %ld\n", pthread_self());

	struct Test* t = (struct Test*)arg;
	t -> num = 100;
	t -> age = 6;

	pthread_exit(t);
	return NULL;

}

int main(void)
{
	struct Test t;

    // 创建一个子线程
	pthread_t tid;
	pthread_create(&tid, NULL, callback, &t);
    // 主线程代码
	printf("main thread: %ld\n", pthread_self());

	pthread_detach(tid);
    // 线程退出
	pthread_exit(NULL);
	return 0;
}

2.3. 线程退出

退出函数:void pthread_exit(void *retval);

2.4. 线程回收

回收函数:int pthread_join(pthread_t thread, void **retval);

这个函数是一个阻塞函数,如果还有子线程在运行,调用该函数就会阻塞,子线程退出函数解除阻塞进行资源的回收,函数被调用一次,只能回收一个子线程,如果有多个子线程则需要循环进行回收。

3. 其他线程函数

3.1. 线程分离

分离函数:int pthread_detach(pthread_t thread);

参数是子线程的线程ID, 主线程会和这个子线程分离

3.2. 线程取消

取消函数:int pthread_cancel(pthread_t thread);

3.3. 线程ID比较

比较函数:int pthread_equal(pthread_t t1, pthread_t t2);

比较两个线程的ID是否相同。

3.4. C++线程类

以上的函数C++中都支持。另外C++11也提供了线程类支持。

C++11中提供的线程类叫做std::thread

3.4.1. 构造函数

// ①
thread() noexcept;
// ②
thread( thread&& other ) noexcept;
// ③
template< class Function, class... Args >
explicit thread( Function&& f, Args&&... args );
// ④
thread( const thread& ) = delete;

3.4.2. 公共成员函数

获取线程ID:

get_id(): std::thread::id get_id() const noexcept;

连接线程:

join(): void join();

线程分离:

detach(): void detach();

判断主线程与子线程是否关联:

joinable(): bool joinable() const noexcept;

线程的复制、粘贴(类似):

  • 线程中的资源是不能被复制的,因此通过=操作符进行赋值操作最终并不会得到两个完全相同的对象。

// move (1)	
thread& operator= (thread&& other) noexcept;
// copy [deleted] (2)	
thread& operator= (const other&) = delete;

静态函数

static unsigned hardware_concurrency() noexcept;

4. 线程同步处理思路

假设有4个线程A、B、C、D,当前一个线程A对内存中的共享资源进行访问的时候,其他线程B, C, D都不可以对这块内存进行操作,直到线程A对这块内存访问完毕为止,B,C,D中的一个才能访问这块内存,剩余的两个需要继续阻塞等待,以此类推,直至所有的线程都对这块内存操作完毕。 线程对内存的这种访问方式就称之为线程同步,通过对概念的介绍,我们可以了解到所谓的同步并不是多个线程同时对内存进行访问,而是按照先后顺序依次进行的。

通过锁机制可以让程序能够按照预期执行,而不会因为线程对共享资源的访问沟通出错使得程序出现未知错误。

线程同步的大致处理思路是这样的:

  • 在临界区代码的上边,添加加锁函数,对临界区加锁。

    • 哪个线程调用这句代码,就会把这把锁锁上,其他线程就只能阻塞在锁上了。

  • 在临界区代码的下边,添加解锁函数,对临界区解锁。

    • 出临界区的线程会将锁定的那把锁打开,其他抢到锁的线程就可以进入到临界区了。

  • 通过锁机制能保证临界区代码最多只能同时有一个线程访问,这样并行访问就变为串行访问了。

5. 互斥锁

5.1. 互斥锁变量与函数

互斥锁变量:pthread_mutex_t mutex;

初始化互斥锁:int pthread_mutex_init(pthread_mutex_t *restrict mutex, const pthread_mutexattr_t *restrict attr);

释放互斥锁资源:int pthread_mutex_destroy(pthread_mutex_t *mutex);

互斥锁锁定:int pthread_mutex_lock(pthread_mutex_t *mutex);

尝试加锁函数:int pthread_mutex_trylock(pthread_mutex_t *mutex);

相当于在加锁前做一次判断。

互斥锁解锁:int pthread_mutex_unlock(pthread_mutex_t *mutex);

5.2. 互斥锁使用

#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <string.h>
#include <pthread.h>

#define MAX 50
// 全局变量
int number;

// thread mutex
pthread_mutex_t mutex;

// 线程处理函数
void *funcA_num(void *arg)
{
    for (int i = 0; i < MAX; ++i)
    {
        pthread_mutex_lock(&mutex);
        int cur = number;
        cur++;
        usleep(10);
        number = cur;
        printf("Thread A, id = %lu, number = %d\n", pthread_self(), number);
        pthread_mutex_unlock(&mutex);
    }
    return NULL;
}

void *funcB_num(void *arg)
{
    for (int i = 0; i < MAX; ++i)
    {
        pthread_mutex_lock(&mutex);
        int cur = number;
        cur++;
        number = cur;
        printf("Thread B, id = %lu, number = %d\n", pthread_self(), number);
        usleep(5);
        pthread_mutex_unlock(&mutex);
    }
    return NULL;
}

int main(int argc, const char *argv[])
{
    pthread_t p1, p2;
    pthread_mutex_init(&mutex, NULL);

    // 创建两个子线程
    pthread_create(&p1, NULL, funcA_num, NULL);
    pthread_create(&p2, NULL, funcB_num, NULL);

    // 阻塞,资源回收
    pthread_join(p1, NULL);
    pthread_join(p2, NULL);

    pthread_mutex_destroy(&mutex);
    return 0;
}

6. 线程死锁

6.1. 常见场景

  • 加锁后忘记解锁

  • 重复加锁,造成死锁

  • 有多个共享资源时,随意加锁,导致互相被阻塞

6.2. 如何避免死锁

  • 避免多次锁定,多检查

  • 对共享资源访问完毕之后,一定要解锁,或者在加锁的时候使用trylock

  • 如果程序中多把锁,可以控制对锁的访问顺序,也可以在对其他互斥锁进行加锁之前,先释放当前线程拥有的互斥锁

  • 项目中可以引入一些专门用于死锁检测的模块

7. 读写锁

其实是一把锁,只是有两种使用方式,基本类似互斥锁。

7.1. 读写锁变量与函数函数

读写锁变量:pthread_rwlock_t rwlock;

初始化读写锁:int pthread_rwlock_init(pthread_rwlock_t *restrict rwlock, const pthread_rwlockattr_t *restrict attr);

释放读写锁:int pthread_rwlock_destroy(pthread_rwlock_t *rwlock);

锁定读操作:int pthread_rwlock_rdlock(pthread_rwlock_t *rwlock);

尝试锁定读操作:int pthread_rwlock_tryrdlock(pthread_rwlock_t *rwlock);

锁定写操作:int pthread_rwlock_wrlock(pthread_rwlock_t *rwlock);

尝试锁定写操作:int pthread_rwlock_trywrlock(pthread_rwlock_t *rwlock);\

解锁操作:int pthread_rwlock_unlock(pthread_rwlock_t *rwlock);

7.2. 读写锁使用

#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <string.h>
#include <pthread.h>

#define MAX 50
// 全局变量
int number;

// thread mutex
pthread_rwlock_t rwlock;

// 线程处理函数
void *read_num(void *arg)
{
    for (int i = 0; i < MAX; ++i)
    {
        pthread_rwlock_rdlock(&rwlock);
        printf("Thread read, id = %lu, number = %d\n", pthread_self(), number);
        pthread_rwlock_unlock(&rwlock);
        usleep(rand() % 5);
    }

    return NULL;
}

void *write_num(void *arg)
{
    for (int i = 0; i < MAX; ++i)
    {
        pthread_rwlock_wrlock(&rwlock);
        int cur = number;
        cur++;
        number = cur;
        printf("Thread write, id = %lu, number = %d\n", pthread_self(), number);
        pthread_rwlock_unlock(&rwlock);
        usleep(5);
    }

    return NULL;
}

int main(int argc, const char *argv[])
{
    pthread_t p_read[5], p_write[3];
    pthread_rwlock_init(&rwlock, NULL);

    // 创建两个子线程
    for (int i = 0; i < 5; i++)
    {
        pthread_create(&p_read[i], NULL, read_num, NULL);
    }

    for (int i = 0; i < 3; i++)
    {
        pthread_create(&p_write[i], NULL, write_num, NULL);
    }

    // 阻塞,资源回收

    for (int i = 0; i < 3; i++)
    {
        pthread_join(p_write[i], NULL);
    }

    for (int i = 0; i < 5; i++)
    {
        pthread_join(p_read[i], NULL);
    }

    pthread_rwlock_destroy(&rwlock);
    return 0;
}

8. 条件变量

条件变量的主要作用不是处理线程同步, 而是进行线程的阻塞。

8.1. 条件变量与函数

条件变量:pthread_cond_t cond;

初始化:int pthread_cond_init(pthread_cond_t *restrict cond, const pthread_condattr_t *restrict attr);

销毁释放资源:int pthread_cond_destroy(pthread_cond_t *cond);

线程阻塞函数:int pthread_cond_wait(pthread_cond_t *restrict cond, pthread_mutex_t *restrict mutex);

其作用有:

    • 阻塞线程

    • 阻塞线程之前把线程的互斥锁打开

    • 在线程解除阻塞之后,自动给线程加上互斥锁

8.2. 生产者与消费者模型

  1. 生产者线程 -> 若干个

    • 生产商品或者任务放入到任务队列中

    • 任务队列满了就阻塞, 不满的时候就工作

    • 通过一个生产者的条件变量控制生产者线程阻塞和非阻塞

  1. 消费者线程 -> 若干个

    • 读任务队列, 将任务或者数据取出

    • 任务队列中有数据就消费,没有数据就阻塞

    • 通过一个消费者的条件变量控制消费者线程阻塞和非阻塞

  1. 队列 -> 存储任务/数据,对应一块内存,为了读写访问可以通过一个数据结构维护这块内存

    • 可以是数组、链表,也可以使用stl容器:queue / stack / list / vector

8.3. 条件变量使用

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <pthread.h>

pthread_cond_t cond;
pthread_mutex_t mutex;

// list sturcture type
struct Node
{
	int number;
	struct Node *next;
};

// list head node
struct Node *head = NULL;

// producer callback function
void *producer(void *arg)
{
	while (1)
	{
		pthread_mutex_lock(&mutex);

		// bound area<

		// create new node
		struct Node *newNode = (struct Node *)malloc(sizeof(struct Node));

		// init node
		newNode->number = rand() % 1000;
		newNode->next = head;
		head = newNode;
		printf("producer, id: %ld, number: %d\n", pthread_self(), newNode->number);

		// >bound area
		pthread_mutex_unlock(&mutex);

		// awake the blocked thread...
		pthread_cond_broadcast(&cond);
		sleep(rand() % 3);
	}

	return NULL;
}

// consumer callback function
void *consumer(void *arg)
{
	while (1)
	{
		pthread_mutex_lock(&mutex);
		// bound area

		while (head == NULL)
		{
			// block consumer thread
			pthread_cond_wait(&cond, &mutex);
		}
		struct Node *node = head;
		printf("consumer, id: %ld, number: %d\n", pthread_self(), node->number);
		head = head->next;
		free(node);

		// bound area
		pthread_mutex_unlock(&mutex);
		sleep(rand() % 3);
	}
	return NULL;
}

int main(void)
{
	pthread_mutex_init(&mutex, NULL);
	pthread_cond_init(&cond, NULL);

	pthread_t t_p[5], t_c[5];
	for (int i = 0; i < 5; i++)
	{
		pthread_create(&t_p[i], NULL, producer, NULL);
	}

	for (int i = 0; i < 5; i++)
	{
		pthread_create(&t_c[i], NULL, consumer, NULL);
	}

	for (int i = 0; i < 5; i++)
	{
		pthread_join(t_p[i], NULL);
		pthread_join(t_c[i], NULL);
	}

	pthread_mutex_destroy(&mutex);
	pthread_cond_destroy(&cond);
	return 0;
}

9. 信号量

信号量用在多线程多任务同步的,一个线程完成了某一个动作就通过信号量告诉别的线程,别的线程再进行某些动作。

9.1. 信号量变量与函数

信号量变量:sem_t sem;

初始化信号量/信号灯:int sem_init(sem_t *sem, int pshared, unsigned int value);

资源释放:int sem_destroy(sem_t *sem);

  • 线程销毁之后调用这个函数即可

消耗资源:int sem_wait(sem_t *sem);

  • sem中资源消耗1个,当资源 == 0,线程会被阻塞

  • 相当于一个计数器

尝试消耗资源:int sem_trywait(sem_t *sem);

增加资源:int sem_post(sem_t *sem);

  • sem中资源增加1个

查看资源:int sem_getvalue(sem_t *sem, int *sval);

  • 查看sem中当前值

9.2. 信号量使用

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <pthread.h>
#include <semaphore.h>

// producer's signal
sem_t semp;
// consumer's signal
sem_t semc;

pthread_mutex_t mutex;

// list sturcture type
struct Node
{
    int number;
    struct Node *next;
};

// list head node
struct Node *head = NULL;

// producer callback function
void *producer(void *arg)
{
    while (1)
    {
        sem_wait(&semp);
        pthread_mutex_lock(&mutex);
        // create new node

        struct Node *newNode = (struct Node *)malloc(sizeof(struct Node));

        // init node
        newNode->number = rand() % 1000;
        newNode->next = head;
        head = newNode;
        printf("producer, id: %ld, number: %d\n", pthread_self(), newNode->number);
        sem_post(&semp);

        pthread_mutex_unlock(&mutex);
        sleep(rand() % 3);
    }

    return NULL;
}

// consumer callback function
void *consumer(void *arg)
{
    while (1)
    {
        sem_wait(&semc);

        pthread_mutex_lock(&mutex);
        struct Node *node = head;
        printf("consumer, id: %ld, number: %d\n", pthread_self(), node->number);
        head = head->next;
        free(node);
        sem_post(&semc);

        pthread_mutex_unlock(&mutex);
        sleep(rand() % 3);
    }
    return NULL;
}

int main(void)
{
    // producer
    sem_init(&semp, 0, 5);

    // consumer -> init == 0; blocked
    sem_init(&semc, 0, 0);

    pthread_mutex_init(&mutex, NULL);

    pthread_t t_p[5], t_c[5];
    for (int i = 0; i < 5; i++)
    {
        pthread_create(&t_p[i], NULL, producer, NULL);
    }

    for (int i = 0; i < 5; i++)
    {
        pthread_create(&t_c[i], NULL, consumer, NULL);
    }

    for (int i = 0; i < 5; i++)
    {
        pthread_join(t_p[i], NULL);
        pthread_join(t_c[i], NULL);
    }

    pthread_mutex_destroy(&mutex);
    sem_destroy(&semp);
    sem_destroy(&semc);
    return 0;
}

10. 总结

操作系统的学习重点就在于理解系统的线程如何操作,线程又引申出了多线程与线程同步的概念,也就是这次副本的主要内容。另外也涉及到了系统内存的知识,这次的代码为C语言实现,80%的代码是手搓的。另外借此机会重新复用了Ubuntu系统进行编程。这次全程使用vim编辑器进行编程。在一次次的问kimi的过程中,一次次的又忘记,借助AI让自己的编程能力快速成长。

这次只能算是浅尝辄止,甚至不能算作操作系统的导论课,只能算作线程的导论课,关于操作系统和线程都还有很多知识和实践经验,单靠这几百行代码远远不够,只能是了解个大概,但这也够了。

Last updated