Chapter 29 Lock-Based Concurrent Data Structures

By adding locks, a data structure can be made thread safe.

The question is how to add those locks so that the structure stays correct while still delivering high performance under concurrent access from many threads.

1. Concurrent Counters

A counter is about the simplest data structure there is: widely used, with a trivial interface.

1.1. A Non-Concurrent Counter

typedef struct counter_t
{
    int value;
} counter_t;

void init(counter_t *c)
{
    c->value = 0;
}

void increment(counter_t *c)
{
    c->value++;
}

void decrement(counter_t *c)
{
    c->value--;
}

int get(counter_t *c)
{
    return c->value;
}

This counter has no synchronization at all, so it is not thread safe. Let's add a lock.

1.2. Simple But Not Scalable

typedef struct counter_t
{
    int value;
    pthread_mutex_t lock;
} counter_t;

void init(counter_t *c)
{
    c->value = 0;
    Pthread_mutex_init(&c->lock, NULL);
}

void increment(counter_t *c)
{
    Pthread_mutex_lock(&c->lock);
    c->value++;
    Pthread_mutex_unlock(&c->lock);
}

void decrement(counter_t *c)
{
    Pthread_mutex_lock(&c->lock);
    c->value--;
    Pthread_mutex_unlock(&c->lock);
}

int get(counter_t *c)
{
    Pthread_mutex_lock(&c->lock);
    int rc = c->value;
    Pthread_mutex_unlock(&c->lock);
    return rc;
}

All this does is add a single lock; nothing else changes.

Every call acquires the lock on entry and releases it when the operation is done.

Now we have a working concurrent data structure; the remaining question is performance. If the structure runs too slowly, simply adding one lock is not enough and further optimization is needed.

Note that if the data structure is not too slow, you are done! No need to do something fancy if something simple will work.

Benchmarking

Perfect scaling: even though there is more total work, it is done in parallel, so the time to complete the task does not increase.
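
As a rough sketch of how such a benchmark might look (this harness is my own, not the book's; the thread count, iteration count, and timing details are arbitrary assumptions), each thread hammers the locked counter from 1.2 a fixed number of times and the whole run is timed:

#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/time.h>

#define ITERATIONS 1000000 // increments per thread (arbitrary)

counter_t counter; // the simple locked counter from above

void *worker(void *arg)
{
  int i;
  for (i = 0; i < ITERATIONS; i++)
    increment(&counter);
  return NULL;
}

int main(int argc, char *argv[])
{
  int nthreads = (argc > 1) ? atoi(argv[1]) : 1;
  pthread_t threads[nthreads];
  struct timeval start, end;
  int i;

  init(&counter);
  gettimeofday(&start, NULL);
  for (i = 0; i < nthreads; i++)
    pthread_create(&threads[i], NULL, worker, NULL);
  for (i = 0; i < nthreads; i++)
    pthread_join(threads[i], NULL);
  gettimeofday(&end, NULL);

  double secs = (end.tv_sec - start.tv_sec) +
                (end.tv_usec - start.tv_usec) / 1e6;
  printf("threads %d, final count %d, time %.2f s\n",
         nthreads, get(&counter), secs);
  return 0;
}

With perfect scaling the measured time would stay roughly flat as the thread count grows; with the single global lock it instead grows sharply, which is what motivates the next section.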

1.3. A Scalable Technique

Sloppy counter

A more accurate name is the approximate counter; the Chinese edition's rendering ("lazy counter") is a rough translation!

It is built from several local counters (one per CPU) plus a single global counter; each local counter has its own lock, and the global counter has one as well.

The basic idea:

The basic idea of approximate counting is as follows. When a thread running on a given core wishes to increment the counter, it increments its local counter; access to this local counter is synchronized via the corresponding local lock. Because each CPU has its own local counter, threads across CPUs can update local counters without contention, and thus updates to the counter are scalable. To keep the global count reasonably up to date, local values are periodically transferred to the global counter: once a local counter reaches the threshold, the thread acquires the global lock, adds the local value to the global count, and resets the local counter to zero.

#define NUMCPUS 4 // assumed number of CPUs; adjust for the machine

typedef struct counter_t
{
  int global;                     // global count
  pthread_mutex_t glock;          // global lock
  int local[NUMCPUS];             // per-CPU local counts
  pthread_mutex_t llock[NUMCPUS]; // ... and per-CPU locks
  int threshold;                  // update frequency
} counter_t;

void init(counter_t *c, int threshold)
{
  c->threshold = threshold;
  c->global = 0;
  pthread_mutex_init(&c->glock, NULL);

  int i;
  for (i = 0; i < NUMCPUS; i++)
  {
    c->local[i] = 0;
    pthread_mutex_init(&c->llock[i], NULL);
  }
}

// usually, just grab the local lock and update the local amount;
// once the local count reaches 'threshold', grab the global lock
// and transfer the local value to the global count
void update(counter_t *c, int threadID, int amt)
{
  int cpu = threadID % NUMCPUS;
  pthread_mutex_lock(&c->llock[cpu]);
  c->local[cpu] += amt;
  if (c->local[cpu] >= c->threshold)
  {
    pthread_mutex_lock(&c->glock);
    c->global += c->local[cpu];
    pthread_mutex_unlock(&c->glock);
    c->local[cpu] = 0;
  }
  pthread_mutex_unlock(&c->llock[cpu]);
}

// return the global amount (which may not be perfectly accurate)
int get(counter_t *c)
{
  pthread_mutex_lock(&c->glock);
  int rc = c->global;
  pthread_mutex_unlock(&c->glock);
  return rc;
}

2. Concurrent Linked Lists

// basic node structure
typedef struct __node_t
{
  int key;
  struct __node_t *next;
} node_t;

// basic list structure (one used per list)
typedef struct __list_t
{
  node_t *head;
  pthread_mutex_t lock;
} list_t;

void List_Init(list_t *L)
{
  L->head = NULL;
  pthread_mutex_init(&L->lock, NULL);
}

// insert a new node at the head of the list
int List_Insert(list_t *L, int key)
{
  pthread_mutex_lock(&L->lock);
  node_t *new = malloc(sizeof(node_t));
  if (new == NULL)
  {
    perror("malloc");
    pthread_mutex_unlock(&L->lock);
    return -1; // fail
  }
  new->key = key;
  new->next = L->head;
  L->head = new;
  pthread_mutex_unlock(&L->lock);
  return 0; // success
}

// remove the first node with the given key from the list
int List_Remove(list_t *L, int key)
{
  pthread_mutex_lock(&L->lock);
  node_t *prev = NULL;
  node_t *curr = L->head;
  while (curr)
  {
    if (curr->key == key)
    {
      if (prev)
      {
        prev->next = curr->next;
      }
      else
      {
        L->head = curr->next;
      }
      free(curr);
      pthread_mutex_unlock(&L->lock);
      return 0; // success
    }
    prev = curr;
    curr = curr->next;
  }
  pthread_mutex_unlock(&L->lock);
  return -1; // failure
}

// check if a node with the given key is in the list
int List_Lookup(list_t *L, int key)
{
  pthread_mutex_lock(&L->lock);
  node_t *curr = L->head;
  while (curr)
  {
    if (curr->key == key)
    {
      pthread_mutex_unlock(&L->lock);
      return 0; // success
    }
    curr = curr->next;
  }
  pthread_mutex_unlock(&L->lock);
  return -1; // failure
}

Note that the code above has to release the lock on the failure path when malloc fails; this kind of exceptional control flow is quite error prone.

2.1. Rewriting the List

Assuming malloc() itself is thread safe, it can be called without holding the list lock; only the actual update of the list needs to sit inside the critical section.

void List_Init(list_t *L)
{
  L->head = NULL;
  pthread_mutex_init(&L->lock, NULL);
}

int List_Insert(list_t *L, int key)
{
  // malloc() is thread safe: no need to hold the list lock here
  node_t *new = malloc(sizeof(node_t));
  if (new == NULL)
  {
    perror("malloc");
    return -1;
  }
  new->key = key;
  // just lock critical section
  pthread_mutex_lock(&L->lock);
  new->next = L->head;
  L->head = new;
  pthread_mutex_unlock(&L->lock);
  return 0; // success
}

int List_Lookup(list_t *L, int key)
{
  int rv = -1;
  pthread_mutex_lock(&L->lock);
  node_t *curr = L->head;
  while (curr)
  {
    if (curr->key == key)
    {
      rv = 0;
      break;
    }
    curr = curr->next;
  }
  pthread_mutex_unlock(&L->lock);
  return rv; // now both success and failure
}

Even so, this list does not scale well.

2.2. Scaling the List

A system with good scalability improves performance linearly (or nearly so) as resources are added, rather than hitting bottlenecks or losing efficiency as more resources come online.

One technique for this is hand-over-hand locking, also known as lock coupling.

The idea is pretty simple. Instead of having a single lock for the entire list, you instead add a lock per node of the list. When traversing the list, the code first grabs the next node’s lock and then releases the current node’s lock (which inspires the name hand-over-hand).
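
As a minimal sketch of what a hand-over-hand lookup could look like (this is an illustration, not the book's code; the __hoh_node_t and __hoh_list_t types are assumed here), every node carries its own mutex:

// per-node lock for hand-over-hand traversal (illustrative types)
typedef struct __hoh_node_t
{
  int key;
  struct __hoh_node_t *next;
  pthread_mutex_t lock;
} hoh_node_t;

typedef struct __hoh_list_t
{
  hoh_node_t *head;
  pthread_mutex_t head_lock; // protects the head pointer itself
} hoh_list_t;

int HOH_Lookup(hoh_list_t *L, int key)
{
  pthread_mutex_lock(&L->head_lock);
  hoh_node_t *curr = L->head;
  if (curr == NULL)
  {
    pthread_mutex_unlock(&L->head_lock);
    return -1; // empty list
  }
  pthread_mutex_lock(&curr->lock);     // grab the first node's lock...
  pthread_mutex_unlock(&L->head_lock); // ...then let go of the head lock
  while (curr)
  {
    if (curr->key == key)
    {
      pthread_mutex_unlock(&curr->lock);
      return 0; // found
    }
    hoh_node_t *next = curr->next;
    if (next)
      pthread_mutex_lock(&next->lock); // grab the next node's lock first...
    pthread_mutex_unlock(&curr->lock); // ...then release the current one
    curr = next;
  }
  return -1; // not found
}

In practice, as the book points out, acquiring and releasing a lock for every node is so costly that this is often slower than the single-lock list, which leads directly to the next point.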

More concurrency is not necessarily faster.

If the concurrency scheme becomes too elaborate, the added complexity (and the cost of all the extra lock acquires and releases) can cancel out the benefit.

So in general there are two options:

  1. Simple, but with less concurrency

  2. Complex, but with more concurrency

Use performance tests to decide which approach to adopt; after all, the measurements can't be cheated.

Be wary of locks and control flow

Watch out for control-flow changes that make a function return or exit early, or otherwise stop executing.

Because many functions will begin by acquiring a lock, allocating some memory, or doing other similar stateful operations, when errors arise, the code has to undo all of the state before returning, which is error-prone. Thus, it is best to structure code to minimize this pattern.

In other words, when an error occurs partway through, all of that acquired state has to be undone before returning.
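
One common way to keep this manageable (a sketch of the general pattern, not code from the book; the function and its names are made up) is to give the function a single exit path that releases the lock:

#include <pthread.h>
#include <stdlib.h>
#include <string.h>

// make a locked copy of a string; every path out of the function,
// including the allocation failure, goes through the single unlock below
int Locked_Copy(pthread_mutex_t *lock, char **dst, const char *src)
{
  int rv = 0;
  pthread_mutex_lock(lock);

  char *copy = malloc(strlen(src) + 1);
  if (copy == NULL)
  {
    rv = -1;  // record the failure...
    goto out; // ...and jump to the one cleanup point
  }
  strcpy(copy, src);
  *dst = copy;

out:
  pthread_mutex_unlock(lock); // the only place the lock is released
  return rv;
}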

3. Concurrent Queues

The standard way to make a data structure concurrent: add one big lock.

To add scalability: use smaller, finer-grained locks (for the list, one per node).

One trick used by Michael and Scott is to add a dummy node (allocated in the queue initialization code); this dummy enables the separation of head and tail operations. Study the code, or better yet, type it in, run it, and measure it, to understand how it works deeply.

Here there are two locks, one for the head and one for the tail, so enqueue and dequeue operations can proceed concurrently.

typedef struct __node_t {
    int value;
    struct __node_t *next;
} node_t;

typedef struct __queue_t {
    node_t *head;
    node_t *tail;
    pthread_mutex_t head_lock, tail_lock;
} queue_t;

void Queue_Init(queue_t *q) {
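    // allocate a dummy node; it separates head operations from tail operations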
    node_t *tmp = malloc(sizeof(node_t));
    tmp->next = NULL;
    q->head = q->tail = tmp;
    pthread_mutex_init(&q->head_lock, NULL);
    pthread_mutex_init(&q->tail_lock, NULL);
}

// enqueue only needs the tail lock
void Queue_Enqueue(queue_t *q, int value) {
    node_t *tmp = malloc(sizeof(node_t));
    assert(tmp != NULL);
    tmp->value = value;
    tmp->next = NULL;

    pthread_mutex_lock(&q->tail_lock);
    q->tail->next = tmp;
    q->tail = tmp;
    pthread_mutex_unlock(&q->tail_lock);
}

// dequeue only needs the head lock
int Queue_Dequeue(queue_t *q, int *value) {
    pthread_mutex_lock(&q->head_lock);
    node_t *tmp = q->head;
    node_t *new_head = tmp->next;
    if (new_head == NULL) {
        pthread_mutex_unlock(&q->head_lock);
        return -1; // queue was empty
    }
    *value = new_head->value;
    q->head = new_head;
    pthread_mutex_unlock(&q->head_lock);
    free(tmp);
    return 0;
}

Correctly identifying the critical sections is one of the real tricks of working with locks.

4. Concurrent Hash Table

That is, what's usually just called a hash table.

#define BUCKETS (101)

typedef struct __hash_t
{
  list_t lists[BUCKETS];
} hash_t;

void Hash_Init(hash_t *H)
{
  int i;
  for (i = 0; i < BUCKETS; i++)
    List_Init(&H->lists[i]);
}

int Hash_Insert(hash_t *H, int key)
{
  return List_Insert(&H->lists[key % BUCKETS], key);
}

int Hash_Lookup(hash_t *H, int key)
{
  return List_Lookup(&H->lists[key % BUCKETS], key);
}

The hash table here has a fixed number of buckets...

...and each bucket is one of the concurrent linked lists implemented above.

Because each bucket has its own lock (the lock of its list) rather than one lock for the whole table, many operations can proceed in parallel and the table scales extremely well.
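
A quick usage sketch (my own, not from the book; NTHREADS and KEYS_PER_THREAD are arbitrary): several threads insert disjoint ranges of keys at once, and because each bucket has its own list lock, inserts into different buckets proceed in parallel:

#include <assert.h>
#include <pthread.h>

#define NTHREADS 4            // arbitrary
#define KEYS_PER_THREAD 10000 // arbitrary

hash_t table; // uses hash_t / Hash_Init / Hash_Insert / Hash_Lookup from above

void *inserter(void *arg)
{
  int id = *(int *)arg;
  int i;
  for (i = 0; i < KEYS_PER_THREAD; i++)
    Hash_Insert(&table, id * KEYS_PER_THREAD + i);
  return NULL;
}

int main(void)
{
  pthread_t threads[NTHREADS];
  int ids[NTHREADS];
  int i;

  Hash_Init(&table);
  for (i = 0; i < NTHREADS; i++)
  {
    ids[i] = i;
    pthread_create(&threads[i], NULL, inserter, &ids[i]);
  }
  for (i = 0; i < NTHREADS; i++)
    pthread_join(threads[i], NULL);

  // spot-check that inserted keys can be found afterwards
  assert(Hash_Lookup(&table, 0) == 0);
  assert(Hash_Lookup(&table, NTHREADS * KEYS_PER_THREAD - 1) == 0);
  return 0;
}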

Avoid premature optimization

In plain terms: make it correct first, even if it's crude, and only then worry about optimizing it.

5. Homework (Code)

In this homework, you’ll gain some experience with writing concurrent code and measuring its performance. Learning to build code that performs well is a critical skill and thus gaining a little experience here with it is quite worthwhile.

In short: write some concurrent programs and study their performance.
