1. 概述

进程(process):是正在运行的程序实例,是系统资源分配和调度的基本单位,每个进程都有自己独立的地址空间、数据堆、全局变量和其他系统资源(如文件描述符)

  • 独立性:不同进程之间相互独立,互不影响
  • 隔离性:每个进程都有自己独立的地址空间,无法访问其他进程的内存数据
  • 开销大:进程切换需要保存和恢复很多状态信息

线程(thread):线程是进程内部的执行单元,也称为轻量级进程,一个进程可以包含多个线程,它们共享同一进程的内存空间和资源,但每个线程也有自己的栈和寄存器状态

  • 共享资源:多个线程共享所属同一进程的内存数据
  • 开销小:线程创建、销毁、切换、调度都比进程快得多
  • 并发性:线程可以在多核处理器上实现并行执行
不同之处PthreadsMPI
编程模型基于线程,一个进程内部创建多个线程并发执行基于进程,多个独立进程通过消息传递协同工作
内存模型共享内存,所有线程共享同一进程的地址空间分布式内存,每个进程拥有独立地址空间,数据通过消息传递交换
通信机制使用共享变量实现线程之间的数据共享使用显式的消息传递函数进行数据交换
启动方式由主线程显式创建多个线程通过 mpirun/mpiexec 等外部脚本工具启动多个进程

指令

  • 编译源文件:$ gcc -pthread program.c -o program
  • 运行可执行文件:$ ./program num_of_threads

2. 程序结构

2.1 派生与合并

pthread_t:用于标识线程的数据类型,它是一个不透明对象,即系统级代码无法直接访问到里面的数据,只有操作系统能够访问

pthread_create:用于创建新线程,每次调用都会启动一个独立的线程执行流,使得程序能够并发执行多个任务

  • pthread_t* thread:指向线程标识符的指针,新创建的线程 ID 会保存在这里,需要提前为 pthread_t 对象分配内存空间
  • const pthread_attr_t* attr_p:指向线程属性对象的指针,用于设置线程的属性(栈大小、调度策略等),传递 NULL 则使用默认属性
  • void* (*start_routine)(*void):指向线程启动函数的指针,新线程会从这个函数开始执行,要求该函数必须接受一个 void* 类型的参数,同时返回一个 void* 类型的结果
  • void* arg_p:传递给启动函数的参数,如果不需要传递数据则传递 NULL

pthread_join:用于阻塞当前调用线程,直到指定目标线程结束,常用于主线程确保其他线程执行完毕

  • pthread_t thread:要等待的线程的标识符
  • void** ret_val_p:接收线程退出的返回值,如果不关心返回值则传递 NULL

派生/合并成果则返回 0,失败则返回非零错误码

2.2 Hello 程序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>

// 全局变量,存储线程数量
int thread_count;

// 启动函数
void* Hello(void* rank) {
long my_rank = (long)rank;
printf("hello from thread %ld of %d!\n", my_rank, thread_count);
return NULL;
}

// argv[1]记录了传递的线程数量
int main(int argc, char* argv[]) {
// 获取线程数量,为 pthread_t 分配内存空间
pthread_t* thread_handles;
thread_count = strtol(argv[1], NULL, 10);
thread_handles = malloc(thread_count * sizeof(pthread_t));

// 派生线程,传递 thread 作为线程编号
long thread;
for (thread = 0; thread < thread_count; thread++)
pthread_create(&thread_handles[thread], NULL, Hello, (void*)thread);

// 主线程的输出
printf("hello from the main thread!\n");

// 合并线程,即主线程等待全部其他线程执行完毕
for (thread = 0; thread < thread_count; thread++)
pthread_join(thread_handles[thread], NULL);

// 释放分配空间
free(thread_handles);
return 0;
}

2.3 矩阵-向量乘法程序

假设是 mxn 维的矩阵 A 和 nx1 维的向量 x,有 t 个线程(可以被 m 整除),则第 k 个线程负责矩阵 A 的k×mtk \times \frac{m}{t} 行到(k+1)×mt1(k+1) \times \frac{m}{t} - 1行,此外假设 A,x,y,m,n 都是全局变量

1
2
3
4
5
6
7
8
9
10
11
12
void* Pth_mat_vec(void* rank) {
long my_rank = (long)rank;
local_m = m / thread_count;
int first_row = rank * local_m;
int last_row = (rank + 1) * local_m - 1;
for (int i = first_row; i <= last_row; i++) {
y[i] = 0.0;
for (int j = 0; j < n; j++)
y[i] += A[i][j] * x[j];
}
return NULL;
}

3. 竞争条件

3.1 临界区

竞争条件:当多个线程都要访问共享资源时,如果至少其中一个访问是更新操作,其执行顺序的不确定性可能导致最终结果错误

临界区:一个更新共享资源的代码段,一次只允许一个线程指向该代码段,从而防止竞争条件

利用公式π=113+1517++(1)n12n+1+\pi = 1 - \frac{1}{3} + \frac{1}{5} - \frac{1}{7} + \ldots + (-1)^n\frac{1}{2n+1} + \ldots估算圆周率,假设用 n 个项和 t 个线程来估计,则第 k 个线程负责计算k×n/tk \times n / t(k+1)×n/t1(k+1) \times n / t - 1的和

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
void* calPi(void* rank) {
long my_rank = (long)rank;
long local_n = n / thread_count;
long first_index = my_rank * local_n;
long last_index = (my_rank + 1) * local_n - 1;

double sign;
if (first_index % 2 == 0)
sign = 1.0;
else
sign = -1.0;

double local_sum = 0.0;
for (long i = first_index; i <= last_index; i++)
local_sum += sign / (2 * i + 1);

pi += local_sum;

return NULL;
}

存在以下问题:不同线程都对共享变量 local_sum 进行更新,导致先写的值被后写的值覆盖了,可以发现使用两个线程的结果具有较大偏差


3.2 忙等待

忙等待:线程在进入临界区前循环检查条件,一直等到条件满足才能进入临界区

通常来说是设置一个 int 类型标志共享变量 flag,主线程将其初始化为0,线程进入临界区前判断 flag 是否等于自己的编号,离开临界区后对 flag 进行更新,以解除某个其他线程的忙等待

1
2
3
while (flag != my_rank);
pi += local_sum;
flag = (flag + 1) % thread_count;

存在问题

  1. 编译优化可能会改变指令的书写顺序,导致忙等待失效
  2. 处于忙等待的线程实际上一直在运行着,会一直占用 CPU 资源,浪费 CPU 周期
  3. 线程不停地在等待和运行之间切换,造成较大开销,导致程序性能下降

3.3 互斥量

互斥量(mutex):一个特殊类型的变量,通过某些特定函数,可以限制每次只有一个线程进入临界区,防止竞争条件的发生

  • int pthread_mutex_init(pthread_mutex_t *mutex, const pthread_mutexattr_t *attr):传递一个指向 pthread_mutex_t 类型的互斥量对象进行初始化,第二个参数通常传递 NULL 表示使用默认属性
  • int pthread_mutex_destroy(pthread_mutex_t *mutex):释放指向 pthread_mutex_t 类型的互斥量对象占用的资源
  • int pthread_mutex_lock(pthread_mutex_t *mutex):用于加锁一个互斥量,确保进入临界区的代码同一时间只有一个线程执行
  • int pthread_mutex_unlock(pthread_mutex_t *mutex):用于解锁一个互斥量,释放之前加锁的互斥量,从而允许其他线程进入临界区

通常在程序开始的时候初始化互斥量,然后再创建其他线程使用,最后由主进程对其进行销毁

1
2
3
pthread_mutex_lock(&mutex);
pi += local_sum;
pthread_mutex_unlock(&mutex);
区别忙等待互斥量
线程个数多于核的个数每个忙等待线程会持续占用 CPU,不断轮询检查条件,导致 CPU 资源被大量浪费,同时也干扰其他线程的执行,整体性能显著下降等待线程会被挂起,不会占用 CPU 时间,操作系统可以更有效地调度其他就绪线程,从而保持较高的整体性能
线程进入临界区顺序可以满足特定顺序地进入临界区,但无法做到任意顺序线程进入临界区是随机的、无法预测的,由操作系统决定

3.4 信号量

信号量(semaphore):本质是一个整型计数器,通过对其执行“减一”和“加一”操作,从而控制对资源或事件的访问数量

  • 二元信号量:实现互斥,值只有 0 或 1,等效于互斥锁
  • 计数信号量:实现生产者-消费者模型,值可以大于 1,表示可以同时允许多个线程访问资源,或表示某个资源剩余的可用量

互斥量侧重互斥,用于保证同一时刻只有一个线程进入临界区,适用于“锁”的场景
信号量侧重计数,用于资源数量或事件的同步控制

常用函数定义在头文件 <semaphore.h>

  • windows 系统
    • int sem_init(sem_t *sem, int pshared, unsigned int value):初始化信号量,pshared 0/1表示信号量用于线程/进程间共享,value 指示信号量的初始值
    • int sem_destroy(sem_t *sem):销毁信号量,释放相关资源
  • mac 系统
    • sem_t *sem_open(const char *name, int oflag, mode_t mode, unsigned int value)创建和打开一个命名信号量,name 是以“/”开头信号量名称,oflag 控制着信号量的创建和行为(通常为O_CREAT),mode 指定权限(通常为 0644),value 指定初始值
    • int sem_close(sem_t *sem):关闭命名信号量,释放当前进程对该信号量的引用
    • int sem_unlink(const char *name):删除该信号量,释放资源
  • 通用
    • int sem_wait(sem_t *sem):如果信号量的值大于0,则减 1 并立即返回,若信号量的值为 0,则被阻塞挂起
    • int sem_post(sem_t *sem):对信号量的值加 1,如果有线程被阻塞挂起,则唤醒其中一个

线程 0 发送消息给其他线程,其他线程接收消息并输出,其他线程都输出完后,线程 0 才输出结束信息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <semaphore.h>

// 全局变量
int thread_count;
char** messages;
sem_t** semaphores;

// 传递消息
void* send_msg(void* rank) {
long my_rank = (long)rank;
if (my_rank == 0) {
// 线程 0 发送消息给其他线程,并唤醒阻塞线程
for (int i = 1; i < thread_count; i++) {
char* msg = malloc(20 * sizeof(char));
sprintf(msg, "hello from thread %ld to %d!", my_rank, i);
messages[i] = msg;
sem_post(semaphores[i]);
}
// 等待所有其他线程完成接收和输出后通知线程 0
for (int i = 1; i < thread_count; i++)
sem_wait(semaphores[0]);
printf("thread %ld has determined that other threads received the message!\n", my_rank);
}
else {
// 非线程 0 先等待收到自己的消息通知
sem_wait(semaphores[my_rank]);
printf("thread %ld get message: %s\n", my_rank, messages[my_rank]);
// 打印完成后通知线程 0
sem_post(semaphores[0]);
}
return NULL;
}

// argv[1]记录了传递的线程数量
int main(int argc, char* argv[]) {
pthread_t* thread_handles;
thread_count = strtol(argv[1], NULL, 10);
thread_handles = malloc(thread_count * sizeof(pthread_t));

// 为消息和信号量分配内存空间
messages = malloc(thread_count * sizeof(char*));
semaphores = malloc(thread_count * sizeof(sem_t*));

long thread;
char sem_name[32];

// 初始化消息数组和创建命名信号量
for (thread = 0; thread < thread_count; thread++) {
messages[thread] = NULL;
sprintf(sem_name, "/mysem_%ld", thread);
semaphores[thread] = sem_open(sem_name, O_CREAT, 0644, 0);
}

// 创建线程,传递线程编号
for (thread = 0; thread < thread_count; thread++)
pthread_create(&thread_handles[thread], NULL, send_msg, (void*)thread);

// 等待所有线程结束
for (thread = 0; thread < thread_count; thread++)
pthread_join(thread_handles[thread], NULL);

// 释放内存并关闭和删除信号量
for (thread = 0; thread < thread_count; thread++){
free(messages[thread]);
sem_close(semaphores[thread]);
sprintf(sem_name, "/mysem_%ld", thread);
sem_unlink(sem_name);
}
free(thread_handles);
free(messages);
free(semaphores);
return 0;
}

4. 路障

4.1 利用忙等待和互斥量实现

使用一个由互斥量 barrier_mutex 保护的共享计数器 counter,当计数器的值表面每个线程都已经进入临界区时,结束忙等待

1
2
3
4
5
6
7
8
9
10
11
int counter = 0;
pthread_mutex_t barrier_mutex;

...

void* Barrier(...) {
pthread_mutex_lock(&barrier_mutex);
counter++;
pthread_mutex_unlock(&barrier_mutex);
while (counter < thread_count);
}

无法重用 counter 实现第 2 个路障:某些线程可能已经进入第 2 个路障,但某些线程可能刚刚执行 counter = 0,导致计数错误

4.2 信号量实现

利用两个信号量,counter_sem 用于保护计数器,barrier_sem 用于阻塞已经进入路障的线程,当计数器的值小于线程总数时,阻塞当前线程,当最后一个线程即将执行完毕时,释放全部被阻塞的线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
int counter = 0;
sem_t counter_sem; // 初始值为 1
sem_t barrier_sem; // 初始值为 0

...

void* Barrier(...) {
sem_wait(&counter_sem);
if (counter == thread_count - 1) {
// 计数器归零
counter = 0;
sem_post(&counter_sem);
// 释放全部被阻塞线程
for (int i = 0; i < thread_count - 1; i++)
sem_post(&barrier_sem);
}
else {
counter++;
sem_post(&counter_sem);
// 阻塞当前线程
sem_wait(&barrier_sem);
}
}

无法重用 barrier_sem 实现第 2 个路障:某些线程可能已经进入第 2 个路障等待 barrier_sem,但是上一个路障中最后一个线程仍然在执行 sem_post(&barrier_sem),导致信号量作用域发生混乱

4.3 条件变量实现

条件变量:用于让线程阻塞以等待某个条件的发生,并在条件满足时被唤醒,通常与互斥量配合使用

  • int pthread_cond_init(pthread_cond_t *cond, const pthread_condattr_t *attr):初始化条件变量,通常第二个参数传 NULL 表示使用默认属性
  • int pthread_cond_destroy(pthread_cond_t *cond);:销毁条件变量,释放相关资源
  • int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex):阻塞等待条件变量 cond 的信号
  • int pthread_cond_signal(pthread_cond_t *cond):唤醒至少一个等待在条件变量 cond 上的线程
  • int pthread_cond_broadcast(pthread_cond_t *cond):唤醒所有等待在条件变量 cond 上的线程

pthread_cond_wait:调用该函数必须要拥有 mutex,否则会出现错误,线程被挂起后会自动释放mutex,如果线程被唤醒,又会重新拥有 mutex,实际上等价于下述环节

1
2
3
4
5
pthread_mutex_lock(&mutex_p);
block_thread(&cond);
pthread_mutex_unlock(&mutex_p);
wait_on_signal(&cond);
pthread_mutex_lock(&mutex_p);

因为存在其他函数或事件可以将挂起的线程解锁,因此 pthread_cond_wait 通常被放在 while 中使用,如果返回值不是 0,则需要再次执行该函数

条件变量的好处在于不需要像信号量那样管理一个计数器,只需要关注一个确切的条件就可以实现全部唤醒

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
int counter = 0;
pthread_mutex_t mutex;
pthread_cond_t cond;

...

void* Barrier(...) {
pthread_mutex_lock(&mutex);
counter++;
if (counter == thread_count) {
counter = 0;
pthread_cond_broadcast(&cond);
}
else
while (pthread_cond_wait(&cond, &mutex) != 0);
pthread_mutex_unlock(&mutex);
}

5. 读写锁

5.1 多线程链表

存在问题:当多个线程同时访问链表且至少有一个线程正在执行 Insert 或 Delete 操作,则会导致程序不安全

  • 某个线程读取链表中某个节点的值,但是再返回前该节点被其他线程删除,导致访问内存违规
  • 某个线程向链表中插入节点,但是在执行插入操作时前节点或后节点被其他线程删除,导致访问内存违规
方法流程优势局限性
对链表上锁所有对链表的操作都先获得锁才能执行实现简单并发性差,所有线程争用同一把锁
对节点上锁为链表中的每个节点分配独立的锁变量成员,在操作时仅对涉及的节点加锁细粒度增加存储开销,可能导致更多的锁等待延迟
对操作上锁针对具体操作(读和写操作)在操作期间对需要保护的区域加锁高并发性设计和实现较复杂

下图分析

  1. 在读多写少的情况下,读写锁能够充分发挥并行的优势
  2. 随着写操作的比重上升,写锁会更频繁地排他占用链表,导致并行性下降
  3. 无论哪种锁方案,写操作增多时,线程数增加都会导致更明显的争用和排他锁等待,性能增长受限
  4. 给节点进行加锁和解锁的开销太大,始终是最低效的

5.2 读写锁

读锁:当一个线程以读锁的方式加锁时,其他线程也可以获取读锁,因此又称为共享锁

写锁:当一个线程以写锁的方式加锁时,其他线程既不能获得写锁也不能获得读锁,因此又称为独占锁

读写锁使用类型 pthread_rwlock_t 表示

  • int pthread_rwlock_init(pthread_rwlock_t *rwlock, const pthread_rwlockattr_t *attr):初始化读写锁,attr 为 NULL 时使用默认属性
  • int pthread_rwlock_destroy(pthread_rwlock_t *rwlock):销毁读写锁,释放相关资源
  • int pthread_rwlock_rdlock(pthread_rwlock_t *rwlock):获取读锁,如果有线程持有写锁,则阻塞等待
  • int pthread_rwlock_wrlock(pthread_rwlock_t *rwlock):获取写锁,如果有线程持有读锁或写锁,则阻塞等待
  • int pthread_rwlock_unlock(pthread_rwlock_t *rwlock):释放读锁或写锁

因为写操作通常较难获得锁,会导致“写饥饿”问题,因此解锁时系统可能更偏向给予等待写锁的线程优先权

6. 缓存

局部性原理:如果一个处理器在时间 t 访问内存位置 a,那么很可能它在一个接近 t 的时间访问接近 a 的内存位置

每个核都有自己的缓存:在设计多线程算法时,需要注意数据布局,使每个线程主要操作其私有数据或尽量避免对共享数据的频繁写入,减少缓存冲突,线程就能更高效地利用 CPU 缓存,减少主内存访问延迟

考虑矩阵-向量乘法y=Axy = Ax,且第 k 个线程负责计算矩阵第 m / k 到 (m+1) /k 行的部分

  • 矩阵维数为8000000×88000000 \times 8 (高矩阵)时:y 的规模导致频繁写缺失,因缓存无法容纳所有待写入位置
  • 矩阵维数为8×80000008 \times 8000000 (宽矩阵)时:x 的规模导致频繁读缺失,因数据量远超缓存容量

伪共享:如果多个独立变量恰好存储在同一缓存行中,一个线程写操作会使得这条缓存行在其他核的副本失效,即使其他线程只想读或写与之无关的变量,也会受到影响

7. 线程安全的

线程安全的:多个线程并发执行同一代码时,不会导致数据竞态、数据损坏或其他未定义行为

char *strtok(char *str, const char *delim):第一次调用传入待分割的字符串 str,后续传入 NULL 表示继续分割上一次的字符串,始终传入分隔符 delim,返回指向被分割字符串的起始位置的指针

strtok 不是线程安全的,因为他使用一个静态指针来保存分割位置,当多个线程同时调用 strtok 时,这个内部状态会被共享和修改,可能导致数据混乱或竞态条件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
// 分词函数
void* split(void* rank) {
long my_rank = (long)rank;
char* word;

pthread_mutex_lock(&mutex);
// 线程 0 读取所有句子
if (my_rank == 0) {
for (int i = 0; i < thread_count; i++) {
char* buf = malloc(MAX_LEN * sizeof(char));
fgets(buf, MAX_LEN, stdin);
size_t len = strlen(buf);
buf[len - 1] = '\0';
sentences[i] = buf;
}
pthread_cond_broadcast(&cond);
}
// 其他线程等待直到自己的句子不为 NULL
else
while (sentences[my_rank] == NULL)
pthread_cond_wait(&cond, &mutex);

// 释放获得的锁
pthread_mutex_unlock(&mutex);

printf("Thread %ld > my sentence = %s\n", my_rank, sentences[my_rank]);

int count = 1;
word = strtok(sentences[my_rank], " ");
while (word != NULL) {
printf("Thread %ld > word %d = %s\n", my_rank, count, word);
word = strtok(NULL, " ");
count++;
}
return NULL;
}