1. 概述

OpenMP:针对共享内存系统进行并行编程的 API,通过在源代码中插入编译指令,编译器根据程序中添加的 #pragma 指令,自动将程序并行处理

  • 头文件是 <omp.h>
  • 编译终端指令为 gcc -fopenmp -o program program.c
区别OpenMPPthreads
抽象层次高层次,基于编译器指令,隐藏线程细节低层次,需要显式创建、管理线程和同步
使用方式加入 #pragma 编译指令标记并行区域直接调用 pthread_h 库中的函数
控制粒度调度由运行时系统自动处理可实现自定义调度和同步策略

不是所有编译器都支持 OpenMP,需要添加以下检查代码

1
2
3
#ifdef _OPENMP
#include <omp.h>
#endif

2. 程序结构

2.1 编译指令

结构化代码块:由编译指令 #pragma omp parallel 标识,由花括号括起来

应该将花括号放在新的一行,而不是与指令同一行!

fork-join 的执行模式:主线程在串行执行过程中,当遇到需要进行并行区域时,派生出线程来执行并行任务,并等待全部派生线程执行完合并后,回到主线程继续进行串行执行

子句:添加在编译指令后,用来控制并行区域的行为、数据共享和任务调度等,如 num_threads(thread_count) 用于指定线程数

基本函数

  • int omp_get_thread_num(void):获得当前线程在线程组中的编号(从 0 开始,且主线程是 0)
  • int omp_get_num_threads(void):获得当前并行区域中线程的总数
  • int omp_in_parallel(void):判断当前是否处于并行区域
  • int omp_get_max_threads(void):获取当前环境中能使用的最大线程数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
#include <stdio.h>
#include <stdlib.h>
#include <omp.h>

void Hallo(void) {
int my_rank = omp_get_thread_num();
int thread_count = omp_get_num_threads();

printf("Hello from thread %d of %d\n", my_rank, thread_count);
}

int main(int argc, char* argv[]) {
int thread_count = strtol(argv[1], NULL, 10);

# pragma omp parallel num_threads(thread_count)
{
Hallo();
}
return 0;
}

2.2 归约子句

归约子句:用于并行循环中对共享变量进行归约操作,即将各线程的局部副本按照指定的运算合并到一个全局变量中,格式为 #pragma omp parallel for reduction(operator: variable)

对每个线程的结果求和:可以避免对共享变量更新的竞争条件

1
2
3
4
5
  int global_sum = 0;
# pragma omp parallel reduction(+: global_sum)
{
global_sum += local_trap(a, b);
}

2.3 parallel for

parallel for:生成一组线程来执行紧跟着的 for 循环结构化代码块,由系统自动在线程间划分循环迭代来实现并行化

  • 不能无限循环
  • 不能含有跳出语句,如 break 或 return
  • 循环变量必须是整数类型或指针类型
  • 循环变量只能被 for 语句中的增量表达式修改
  • 循环表达式不能被更改
  • 循环迭代间应该相互独立,不存在数据/循环依赖

parallel for 之后紧跟 for 循环,不需要用花括号括住

计算斐波那契数存在循环依赖,结果是不可预测的

1
2
3
4
  fibo[0] = fibo[1] = 1;
# pragma omp parallel for num_threads(thread_count)
for (i = 2; i < n; i++)
fibo[i] = fibo[i-1] + fibo[i-2];

OpenMP 允许在 parallel for 中使用 exit,即跳出整个程序,立即终止所有线程,但是这会使得并行环境中的资源释放和状态恢复无法正确进行,从而带来难以预测的后果,因此这种做法是不合理的也不安全的

2.4 parallel omp for

#pragma omp for:在一个已经存在的并行区域内部使用,用于将一个 for 循环的迭代划分给多个线程并行执行

内循环负责创建线程团队,并在最后销毁:造成大量开销

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
  for (phase = 0; phase < n; phase++) {
if (phase % 2 == 0)
# pragma omp parallel for num_threads(thread_count) default(none) shared(a, n) private(i, tmp)
for (i = 1; i < n; i += 2) {
if (a[i-1] > a[i]) {
tmp = a[i-1];
a[i-1] = a[i];
a[i] = tmp;
}
}
else
# pragma omp parallel for num_threads(thread_count) default(none) shared(a, n) private(i, tmp)
for (i = 1; i < n; i += 2) {
if (a[i] > a[i+1]) {
tmp = a[i+1];
a[i+1] = a[i];
a[i] = tmp;
}
}
}

外循环负责创建线程团队,内循环反复利用这个线程团队:尽管外层循环的每个迭代是分配给不同线程执行的,但当这些线程遇到内层的 #pragma omp for 时,整个线程团队都会“帮忙”共同执行这个内层循环

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# pragma omp parallel for num_threads(thread_count) default(none) shared(a, n) private(i, tmp, phase)
for (phase = 0; phase < n; phase++) {
if (phase % 2 == 0)
# pragma omp for
for (i = 1; i < n; i += 2)
if (a[i-1] > a[i]) {
tmp = a[i-1];
a[i-1] = a[i];
a[i] = tmp;
}
else
# pragma omp for
for (i = 1; i < n; i += 2)
if (a[i+1] > a[i]) {
tmp = a[i+1];
a[i+1] = a[i];
a[i] = tmp;
}
}

2.5 作用域

shared():用于显式声明变量是共享作用域,适用于不会更改或者进行归约操作的变量

private():用于显式声明变量是私有作用域,适用于每个线程都不一样且互不影响的变量

default():指定每个变量的默认作用域,如果是 none 则要求必须显式声明每个变量是 shared 还是 private

估计圆周率时,符号应该是独立私有的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
  double factor;
double sum = 0.0;
int n = 10000;
int i;

# pragma omp parallel for num_threads(thread_count) \
reduction(+: sum) \
default(none) private(i, factor) shared(n)
for (i = 0; i < n; i++) {
if (i % 2 == 0)
factor = 1.0;
else
factor = -1.0;
sum += factor / (2 * i + 1);
}
printf("pi = %lf\n", sum * 4);
return 0;

3. 调度方式

schedule(<type>, [<chunksize]):schedule子句用来控制并行 for 循环中迭代任务如何分配给各个线程

方式描述性质
default分成 n / thread_count 个块分配给线程本质是自动计算块大小的 static
static将所有迭代按照固定的块大小和顺序分配给各个线程适用于迭代负载均衡的场景
dynamic动态分配迭代块,线程完成一个块后从剩余迭代中动态获取新的块适用于负载不均衡的场景
guided起始块较大,随后块大小逐渐减小至用户指定的最小值综合降低调度开销和负载均衡问题
auto让编译器或运行时自动选择最合适的调度策略完全依赖于实现,不易预知具体行为
runtime调度方式由运行时环境变量 OMP_SCHEDULE 指定可在运行时调整,便于调试和性能调优

假设有 12 个迭代和 3 个线程

schedule(static, 2)

  • Thread0:0,1,6,7
  • Thread1:2,3,8,9
  • Thread2:4,5,10,11

schedule(static, 4):相当于缺省的默认调度

  • Thread0:0,1,2,3
  • Thread1:4,5,6,7
  • Thread2:8,9,10,11

调度选择

  • 如果循环的每次迭代需要几乎相同的计算量,那么可能默认的调度方式能提供最好的性能
  • 如果随着循环的进行,迭代的计算量线性递增,那么采用比较小的 chunksize 的 static 调度可能会提供最好的性能
  • 如果每次迭代的开销无法确定,那么应当使用 schedule(runtime),通过赋予环境变量 OMP_SCHEDULE 不同的值来比较不同调度策略下程序的性能

4. 互斥机制

4.1 atomic

用于对共享变量执行简单的更新操作,即只能保护由一条 C 语言原子赋值操作所形成的临界区,例如 x <op>= <expression>x++++xx----x

  • expression 不能引用 x
  • 只有 x 的装载和存储可以确保是受保护的,例如 x += y++ 中的 y 的更新不受保护

4.2 critical

用于标记代码块临界区,同一时间只能有一个线程能够执行该区域内的代码块,从而保证对共享数据的互斥访问

  • 简单直观,适用于较大或复杂的代码块保护
  • 线程级别的互斥,降低程序并行度,不利于程序的性能
  • 代码级别的互斥,无法实现不同线程利用相同代码实现不同操作

临界区命名:不同名称的 critical 块则使用不同的锁,不会相互排斥

1
2
3
4
# pragma omp critical(name)
{
// 临界区
}

4.3 omp_lock

通过 omp_lock_t 类型的锁和相关 API 进行互斥控制,适用于互斥的是某个数据结构而不是代码块

  • void omp_init_lock(omp_lock_t *lock):初始化锁
  • void omp_set_lock(omp_lock_t *lock):获得锁,如果锁已经被占用,则等待
  • void omp_unset_lock(omp_lock_t *lock):释放锁
  • void omp_destroy_lock(omp_lock_t *lock):销毁锁,释放相关资源

4.4 消息队列的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
#include <stdio.h>
#include <stdlib.h>
#include <stdbool.h>
#include <omp.h>

// 队列的最大消息数量和每条消息的最大长度
const int MSGNUM = 100;
const int MSGLEN = 20;

// 队列结构体:锁、消息数组、头/尾指针、入/出队计数器
typedef struct{
omp_lock_t lock;
char** message;
int head;
int tail;
int en_num;
int de_num;
} Queue;

// 全局消息队列数组,每个线程对应一个队列
Queue** msg_queue;

// 将一条消息加入队列尾部
void Enqueue(Queue* q, char* msg) {
q->message[q->tail] = msg;
q->tail = (q->tail + 1) % MSGNUM;
q->en_num += 1;
}

// 将一条消息从队列头部取出
void Dequeue(Queue* q) {
if (q->message[q->head] == NULL)
return;
printf("%s", q->message[q->head]);
free(q->message[q->head]);
q->head = (q->head + 1) % MSGNUM;
q->de_num += 1;
}

// 向随机目的地线程的队列发送一条消息
void send_msg(int my_rank, int thread_count) {
int dest = random() % thread_count;
char* msg = malloc(MSGLEN * sizeof(char));
sprintf(msg, "hello from thread %d to %d!\n", my_rank, dest);
Queue* q = msg_queue[dest];
omp_set_lock(&q->lock);
Enqueue(q, msg);
omp_unset_lock(&q->lock);
}

// 从当前线程对应的队列中取出一条消息并打印
void recv_msg(int my_rank) {
Queue* q = msg_queue[my_rank];
omp_set_lock(&q->lock);
Dequeue(q);
omp_unset_lock(&q->lock);
}

// 判断全部线程是否发送完消息
bool Done(int my_rank, int thread_count, int counter) {
Queue* q = msg_queue[my_rank];
int msg_size = q->en_num - q->de_num;
if (msg_size == 0 && counter == thread_count)
return true;
else
return false;
}

int main(int argc, char* argv[]) {
int counter = 0;
int thread_count = strtol(argv[1], NULL, 10);
msg_queue = malloc(thread_count * sizeof(Queue*));

// 开启 OpenMP 并行区域,线程数为 thread_count,counter 为共享变量
# pragma omp parallel num_threads(thread_count) shared(counter)
{
int my_rank = omp_get_thread_num();
int thread_count = omp_get_num_threads();

// 每个线程创建一个队列,并初始化其各项属性
Queue* q = malloc(sizeof(Queue));
q->head = 0;
q->tail = 0;
q->en_num = 0;
q->de_num = 0;
q->message = malloc(MSGNUM * sizeof(char*));
for (int i = 0; i < MSGNUM; i++)
q->message[i] = NULL;
omp_init_lock(&q->lock);
msg_queue[my_rank] = q;

// 屏障,确保所有线程完成队列的初始化
# pragma omp barrier

// 每个线程执行发送与接收操作
for (int i = 0; i < thread_count; i++) {
send_msg(my_rank, thread_count);
recv_msg(my_rank);
}

// 更新 counter(原子操作确保线程安全)
# pragma omp atomic
counter += 1;

// 当线程还未完成所有消息处理时,继续接收消息
while (!Done(my_rank, thread_count, counter))
recv_msg(my_rank);
printf("thread %d receive all the messages!\n", my_rank);
}

// 释放各线程创建的队列及其内部资源
for (int i = 0; i < thread_count; i++) {
omp_destroy_lock(&msg_queue[i]->lock);
free(msg_queue[i]->message);
free(msg_queue[i]);
}
free(msg_queue);

return 0;
}