1. 基本结构

1.1 终端命令

消息传递接口(Message-Passing Interface, MPI):定义了一种可以被 C 和 C++ 程序调用的函数库,而不是一种新的语言,它主要用于分布式内存系统中进程之间的通信,通过显式地发送和接收消息来实现数据交换。

mpi.h:包含了所有 MPI 函数的声明和常量定义,在编写 MPI 程序时,需要在源代码中通过 #include <mpi.h> 引入该头文件

mpicc:是 MPI 提供的 C 语言编译器包装器,封装了调用系统中标准编译器(如 gcc 或 clang)的过程,并自动添加 MPI 头文件和库的搜索路径,编译 MPI 程序时通常使用 mpicc -o my_program my_program.c

mpiexec:用于启动 MPI 程序,负责在一个或多个节点上启动多个 MPI 进程,并建立进程间的通信环境,运行 MPI 程序时通常使用 mpiexec -n 4 ./my_program

1.2 基本框架

初始化 MPI 环境:当程序不适用参数时,可以简单设置为 NULL,函数返回程序错误码,在这之前不应该调用任何 MPI 函数

1
2
3
4
int MPI_Init(
int* argc_p,
char*** argv_p
);

关闭 MPI 环境并释放资源:在这之后不应该调用任何 MPI 函数

1
int MPI_Finalize(void);

1.3 通信子

通信子(communicator):是 MPI 中管理和组织进程通信的基本单位,用于定义了一个通信范围=进程组+通信上下文

  • 进程组:明确了参与该通信的所有进程,只有属于同一通信子的进程才能直接进行通信
  • 通信上下文:保证消息隔离,确保了不同通信子之间的消息不会互相干扰,即使消息的标签、数据类型等相同

MPI_COMM_WORLD:默认通信子,包括了启动程序时的所有进程

通信子操作

  1. MPI_Comm_dup(MPI_COMM_WORLD, &new_comm);:复制一个已有的通信子,复制后的通信子拥有相同的进程组,但会获得一个新的上下文,从而保证与原通信子中的消息互不干扰
  2. MPI_Comm_split(MPI_COMM_WORLD, color, key, &new_comm);:根据提供的**颜色(color)和关键字(key)**参数,将一个通信子划分为多个新的通信子
  3. MPI_Comm_create(MPI_COMM_WORLD, group, &new_comm);:根据一个进程组创建一个新的通信子,只包含指定组中的进程
  4. MPI_Comm_free(&comm);:释放通信子占用的资源
  5. int MPI_Comm_size(MPI_Comm comm, int *size);:获取指定通信子中包含的进程总数
  6. int MPI_Comm_rank(MPI_Comm comm, int *rank);:获取当前进程在指定通信子中的编号,编号范围通常从 0 到 size - 1

1.4 SPMD 模式

单程序多数据模式(Single Program Multiple Data, SPMD):是一种常见的并行编程范式,其基本思想是所有参与并行计算的进程或线程都运行同一份程序代码,但每个进程或线程根据自己的标识处理不同的数据子集或执行不同的任务

  • 灵活的数据分配:通过进程标识,可以灵活地将大数据集划分为多个子集,并由不同进程独立处理
  • 维护简单:所有参与计算的进程运行同一份代码
  • 良好的扩展性:只需调整数据划分或通信策略,程序整体结构基本不变

通常来说,利用if-else结构来区分主进程负责数据汇总、调度和 I/O 操作,而其他进程 则负责具体的计算或数据处理工作

1.5 MPI_Send 和 MPI_Recv

int MPI_Send(const void *msg_buf_p, int msg_size, MPI_Datatype msg_type, int dest, int tag, MPI_Comm comm);:将数据从发送进程传送给目标进程,是阻塞式的即当且仅当数据已被复制到内部缓冲区或目标进程已开始接收数据才返回

  • msg_buf_p:指向待发送数据的缓冲区的指针
  • msg_size:待发送数据中元素的数量
  • msg_type:数据类型
  • dest:目标进程在指定通信子中的编号
  • tag:消息标签,用于标识消息
  • comm:通信子

int MPI_Recv(void *buf, int count, MPI_Datatype datatype, int source, int tag, MPI_Comm comm, MPI_Status *status);:用于接收来自其他进程发送的数据,是阻塞式的即当且仅当数据完全接收到并放入指定缓冲区后才返回

  • msg_buf_p:指向接收数据的缓冲区的指针
  • buf_size:接收数据中最大允许存储的元素数量
  • buf_type:数据类型
  • tag:消息标签,用于标识消息
  • comm:通信子
  • status_p:返回接收状态信息的结构体指针,可以用来查询实际接收的数据量、来源、标签等

匹配条件

  1. 通信子相同:处于同一通信域内
  2. 消息标签相同:发送操作使用的标签必须与接收操作中指定的标签相等
  3. 消息标识匹配:接收操作中指定的 source 和发送操作中的 dest 必须对应
  4. 数据类型一致:发送的数据类型 msg_type 必须与接收端缓冲区预期的数据类型 buf_type 一致
  5. 消息大小不超过接收缓冲区:发送的数据大小 msg_size 必须小于等于接收端缓冲区的大小 buf_size

通配符:特殊关键字 MPI_ANY_SOURCEMPI_ANY_TAG,使得接收端可以接收任意来源和标签的消息

MPI_Status:作为一个结构体,是 MPI_Recv 函数中的最后一个参数,当使用通配符时,通过 status 参数可以查明实际匹配到的发送进程和消息标签,具有数据成员 MPI_SOURCE、MPI_TAG 和 MPI_ERROR

阻塞的影响

  • 消息是不可超越的,即对于同一个发送者向同一个接收者发送的消息,如果它们使用相同的通信子、标签和匹配条件,那么接收者必定以发送顺序接收到这些消息,而不会出现后发送的消息先到的情况
  • 进程悬挂,即如果进程调用了 MPI_Recv 但没有匹配的 MPI_Send,或者调用了 MPI_Send 但没有匹配的 MPI_Recv,则进程会被无限阻塞

2. 用 MPI 实现梯形积分法

2.1 伪代码

梯形面积和 =h[f(x0)/2+f(x1)++f(xn1+f(xn)/2)]h[f(x_0) / 2 + f(x_1) + \ldots + f(x_{n-1} + f(x_n) / 2)]

串行程序伪代码

1
2
3
4
5
6
7
h = (b-a) / n
approx = ( f(a) + f(b)) / 2
for (i = 1; i <= n-1; i++) {
x_i = a + i * h
approx += f(x_i)
}
approx = h * approx

并行程序伪代码(假设有k个核,划分为n个梯形)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
Get a, b, n;
h = (b - a) / n;
local_n = n / k;
local_a = a + rank * local_n * h
local_b = local_a + local_n * h
local_integral = Trap(local_a, local_b, local_n, h)
if (rank != 0)
Send local_integral to Process0
else {
total_integral = local_integral
for (proc = 1; proc < k; proc++){
Recv local_integral from proc
total_integral += local_integral
}
if (rank == 0)
print result
}

2.2 I/O 处理

输出存在的问题:如果多个进程试图写标准输出stdout,那么这些进程的输出顺序是无法预测的,每次运行的顺序都可能不同

输入存在的问题:大部分 MPI 只允许 0 号进程访问标准输入 stdin,因此 0 号进程还需要将输入数据发送到其他进程

输出缓冲区问题:如果要输出其他提示信息,必须添加换行符,因为 stdout 只有在遇到换行符或者缓冲区满时,才会真正把数据刷新到屏幕上

2.3 代码

算法流程

  1. 并行划分:将总的梯形数 n 分成 k 份
  2. 局部计算:每个进程独立计算其区间上的积分制
  3. 消息传递:非主进程将计算结果发送到主进程
  4. 结果输出:主进程汇总全部计算结果,输出积分结果
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
#include <stdio.h>
#include <string.h>
#include <mpi.h>

// 假设原函数为 f(x) = x^2,计算从 local_a 到 local_b,次数为 local_n,步长为 h 的梯形积分
double calcIntegral(double local_a, double local_b, int local_n, double h) {
double estimate = 0.0, x_i;
estimate = (local_a*local_a + local_b*local_b) / 2.0;
for (int i = 1; i < local_n; i++) {
x_i = local_a + i * h;
estimate += x_i * x_i;
}
estimate = estimate * h;
return estimate;
}

// 获取输入
void getInput(int my_rank, int comm_sz, double* a_p, double* b_p, int* n_p) {
int dest;
if (my_rank == 0) {
printf("Enter a, b, and n:\n");
scanf("%lf %lf %d", a_p, b_p, n_p);
for (dest = 1; dest < comm_sz; dest++) {
MPI_Send(a_p, 1, MPI_DOUBLE, dest, 0, MPI_COMM_WORLD);
MPI_Send(b_p, 1, MPI_DOUBLE, dest, 0, MPI_COMM_WORLD);
MPI_Send(n_p, 1, MPI_INT, dest, 0, MPI_COMM_WORLD);
}
} else {
MPI_Recv(a_p, 1, MPI_DOUBLE, 0, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
MPI_Recv(b_p, 1, MPI_DOUBLE, 0, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
MPI_Recv(n_p, 1, MPI_INT, 0, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
}
}

int main(void) {
int comm_sz;
int my_rank;

MPI_Init(NULL, NULL);
MPI_Comm_size(MPI_COMM_WORLD, &comm_sz);
MPI_Comm_rank(MPI_COMM_WORLD, &my_rank);

// 获取积分区间和次数
double a, b;
int n;
getInput(my_rank, comm_sz, &a, &b, &n);
double h = (b - a) / (double)n;

// 根据进程编号计算当前进程负责的积分区间
double local_n = n / comm_sz;
double local_a = a + my_rank * local_n * h;
double local_b = local_a + local_n * h;

// 计算局部积分
double local_integral = calcIntegral(local_a, local_b, local_n, h);
printf("Process %d of %d gets local_integral %.2f\n", my_rank, comm_sz, local_integral);

// 如果当前进程不是主进程,将局部积分发送到主进程
if (my_rank != 0) {
MPI_Send(&local_integral, 1, MPI_DOUBLE, 0, 0, MPI_COMM_WORLD);
}
// 主进程从其他进程接收局部积分,并累加计算总积分,最后输出
else {
double total_integral = local_integral;
for (int proc = 1; proc < comm_sz; proc++) {
double temp_integral;
MPI_Recv(&temp_integral, 1, MPI_DOUBLE, proc, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
total_integral += temp_integral;
}
printf("Estimate of the integral from %lf to %lf = %.15e\n", a, b, total_integral);
}

MPI_Finalize();
return 0;
}

3. 集合通信

点到点通信(point-to-point):单个发送进程与单个接收进程之间的通信,如 MPI_Send 和 MPI_Recv

树形结构通信:是点到点通信的变种,将汇总任务分散到不同进程节点上,而主进程负责将全部子汇总任务的结果汇总

如果是 0 号进程负责全部汇总,那么需要进行 7 次接收和 7 次加法,而使用树形结构通信,0 号进程只需要 3 次接收和 3 次加法
如果全部进程是同时启动的,那么前者需要 7 个时间量,而后者只需要 3 个时间量,减少了超过 50% 的总时间!

集合通信(collective):通信子中的所有进程都参与到通信中,而不需要指定单一发送方和接收方

3.1 MPI_Reduce

MPI_Reduce:将通信子中所有进程的数据按照某种归约操作合并,并将结果发送给指定进程

  • sendbuf:指向发送缓冲区的指针,包含了本进程要参加归约操作的数据
  • recvbuf:指向接收缓冲区的指针,存放归约操作后的结果
  • count:参与归约的数据个数
  • datatype:数据类型
  • op:归约操作符
  • root:归约结果存放的根进程编号
  • comm:通信子,规定了归约操作的进程范围

1
2
3
4
5
6
7
8
// 对所有进程的值进行求最大值和求和操作
int local_value, global_max, global_sum;
MPI_Reduce(&local_value, &global_max, 1, MPI_INT, MPI_MAX, 0, MPI_COMM_WORLD);
MPI_Reduce(&local_value, &global_sum, 1, MPI_DOUBLE, MPI_SUM, 0, MPI_COMM_WORLD);

// 对数组的每个分量进行求累乘积操作
double local_value[N], prod[N];
MPI_Reduce(local_value, prod, N, MPI_DOUBLE, MPI_PROD, 0, MPI_COMM_WORLD);

注意事项

  1. 所有进程必须调用相同的集合通信函数
  2. 每个进程传递给 MPI 集合通信函数的参数必须是相容的,即 count,root 等参数应该相同,否则会导致错误或未定义行为
  3. 只有根进程需要接收归约后的全局结果参数,即recv_buf 只能用在 root 上,其他进程只需要提供占位参数
  4. 不允许使用同一个缓冲区同时是输入缓冲和输出缓冲,这样的结果是不可预测的

MPI_AllReduce:相比于 MPI_Reduce,少了 root 参数,且允许全部进程都设置 recv_buf 参数,也就是说所有进程都可以得到归约结果

3.2 MPI_Bcast

MPI_Bcast:用于将某个进程中的数据发送给通信子内所有进程,常用于分发输入参数、配置数据、或者初始条件等

  • recv_buf:存放广播数据的缓冲区
  • count:要广播的数据元素个数
  • datatype:要广播的数据类型
  • root:根进程编号,数据源头
  • comm:通信子,指定参与广播的进程组

对上述 getInput 函数的一个改进

1
2
3
4
5
6
7
8
9
10
11
12
13
14
void getInput(
int my_rank,
int comm_sz,
double* a_p,
double* b_p,
double* n_p) {
if (my_rank == 0) {
printf("Enter a, b and n \n");
scanf("%lf %lf %d", a_p, b_p, n_p);
}
MPI_Bcast(a_p, 1, MPI_DOUBLE, 0, MPI_COMM_WORLD);
MPI_Bcast(b_p, 1, MPI_DOUBLE, 0, MPI_COMM_WORLD);
MPI_Bcast(n_p, 1, MPI_INT, 0, MPI_COMM_WORLD);
}

3.3 MPI_Scatter

数据分发

方式定义特点
块划分将数据或任务划分成连续区间的多个块,每个进程负责处理其中一个块数据局部性好,但是可能导致负载不均
循环划分按照轮询的方式将数据轮流分配给各个进程数据局部性差,负载均衡
块循环划分先将数据划分成若干小块,然后将这些块按照循环方式分配给各进程兼顾数据局部性与负载均衡,可以根据问题特性选择合适的块大小

MPI_Scatter:相当于块划分,用于将进程的数据缓冲区中的数据按进程数量等分,然后将每一份数据分散到通信子中的所有进程的接收缓冲区中进行处理

  • send_buf:指向发送缓冲区的指针,仅在根进程上有效
  • send_count:发送数据的个数
  • send_type:发送数据的数据类型
  • recv_buf:指向接收缓冲区的指针
  • recv_count:接收数据的个数
  • recv_type:接收数据的数据类型
  • root:根进程编号
  • comm:通信子,指定参与分散操作的进程集合

假设根进程有一个包含 20 个整数的数组,分散给 4 个进程处理加法(包括自己),则每个进程接收 5 个整数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
#include <stdio.h>
#include <mpi.h>

int main(void) {
int comm_sz, rank;
const int root = 0;
const int n = 20;
const int count = 5;
int sendbuf[n];
int recvbuf[count];

MPI_Init(NULL, NULL);
MPI_Comm_size(MPI_COMM_WORLD, &comm_sz);
MPI_Comm_rank(MPI_COMM_WORLD, &rank);

if (rank == root) {
for (int i = 0; i < n; i++) {
sendbuf[i] = i + 1;
}
}

MPI_Scatter(sendbuf, count, MPI_INT,
recvbuf, count, MPI_INT,
root, MPI_COMM_WORLD);

int local_sum = 0;
for (int i = 0; i < sendcount; i++) {
local_sum += recvbuf[i];
}
printf("Process %d gets sum %d\n", rank, local_sum);

MPI_Finalize();
return 0;
}

3.4 MPI_Gather

MPI_Gather:相当于块聚集,将每个进程中的数据收集到根进程上,并将这些数据放在一个连续的缓冲区中

  • send_buf:指向发送缓冲区的指针
  • send_count:发送数据的个数
  • send_type:发送数据的类型
  • recv_buf:指向接收缓冲区的指针,仅在根进程上有效
  • recv_count:接收数据的个数
  • recv_type:接收数据的类型
  • root:根进程编号
  • comm:通信子,指定了参与操作的进程组

根据 MPI 标准,聚集结果是按照进程的 rank 顺序排列的

打印分布式向量:每个进程传递一个数据给主进程,主进程聚集所有进程数据然后输出

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
void Print_vector(
double local_b[],
int local_n,
int n,
char title[],
int my_rank,
MPI_Comm comm
) {
double* b = NULL;
int i;
if (my_rank == 0) {
b = malloc(n * sizeof(double));
MPI_Gather(local_b, local_n, MPI_DOUBLE, b, local_n, MPI_DOUBLE, 0, comm);
printf("%s\n", title);
for (i = 0; i < n; i++)
printf("%f ", b[i]);
printf("\n");
free(b);
} else {
MPI_Gather(local_b, local_n, MPI_DOUBLE, b, local_n, MPI_DOUBLE, 0, comm);
}
}

MPI_Allgather:相比于 MPI_Gather,没有了根进程,因为所有进程都可以获得聚集结果

矩阵-向量乘法:计算 y 的行分量时,需要 A 的一个行分量,但需要 x 的全部行分量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
void Mat_vect_mult(
double local_A[], // 当前进程拥有的矩阵 A 的部分行
double local_x[], // 当前进程拥有的向量 x 的部分行
double local_y[], // 存放当前进程计算得到的结果向量 y 的对应部分
int local_m, // 当前进程获得的矩阵 A 的行数
int local_n, // 当前进程获得的向量 x 的行数
int n, // 矩阵 A 的总列数,向量 x 的总行数
MPI_Comm comm) {
double* x; // 局部变量,存放收集到的完整向量 x
x = malloc(n * sizeof(double));
MPI_Allgather(local_x, local_n, MPI_DOUBLE, x, local_n, MPI_DOUBLE, comm);

int i, j;
for (i = 0; i < local_m; i++) {
local_y[i] = 0.0;
for (j = 0; j < n; j++) {
local_y[i] += local_A[i * n + j] * x[j];
}
free(x);
}
}

4. 派生数据类型

派生数据类型:由一系列的 MPI 基本数据类型和每个数据类型的偏移所组成的

  • 在 MPI 中,消息的发送和接收通常存在较高的时间成本,与其将基础数据类型分多次发送,不如将多个基础数据组合成一个派生数据类型一次性发送,这样可以减少通信次数和启动开销,从而提高性能
  • 如果不使用派生数据类型,必须手动将非连续的数据打包到一个连续的缓冲区中,然后传输,接收端再拆包,不仅编程麻烦,而且可能增加额外的内存和时间开销

MPI_Type_contiguous:创建由相同类型的数据块组成的派生数据类型

  • count:连续元素的数量
  • datatype:基础数据类型
  • new_type_p:指向新数据类型的数据的起始位置的指针

MPI_Type_create_struct:创建一个由不同类型的数据块组成的派生数据类型

  • count:不同数据块的数量
  • array_of_lengths:每个数据块中的数据数量
  • array_of_offsets:每个数据块在结构体中相对于起始位置的字节偏移
  • array_of_types:每个数据块中数据的基础数据类型
  • new_type_p:指向新数据类型的数据的起始位置的指针

要用到的其他函数

  • int MPI_Get_address(const void *location, MPI_Aint *address):用于获取某个变量或结构体成员在内存中的绝对地址,存储在类型为 MPI_Aint 的变量中,在创建派生数据类型时用于计算关于起始地址的偏移量
  • int MPI_Type_commit(MPI_Datatype *datatype):将一个新创建的派生数据类型提交给 MPI 系统,使其准备好供通信操作使用
  • int MPI_Type_free(MPI_Datatype *datatype):释放一个已提交的派生数据类型所占用的资源

主进程一次性传递数据到其他进程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
void spreadData(double* a_p, double* b_p, int* c_p) {
// 设置派生数据类型的基础信息
int count = 3;
int lengths[3] = {1, 1, 1};
MPI_Aint offsets[3];
MPI_Datatype types[3] = {MPI_DOUBLE, MPI_DOUBLE, MPI_INT};

// 获取成员地址及其偏移量
MPI_Aint a_addr, b_addr, c_addr;
MPI_Get_address(a_p, &a_addr);
MPI_Get_address(b_p, &b_addr);
MPI_Get_address(c_p, &c_addr);
offsets[0] = 0;
offsets[1] = b_addr - a_addr;
offsets[2] = c_addr - a_addr;

// 创建派生数据类型
MPI_Datatype new_struct;
MPI_Type_create_struct(count, lengths, offsets, types, &new_struct);

// 提交派生数据类型
MPI_Type_commit(&new_struct);

// 广播派生数据类型
MPI_Bcast(a_p, 1, new_struct, 0, MPI_COMM_WORLD);

// 释放派生数据类型
MPI_Type_free(&new_struct);
}

5. 性能评估

5.1 计时

几种时间

  • 程序运行时间:从程序开始运行(MPI_INIT)到结束运行(MPI_Finalize)所耗费的时间,用来衡量整个程序包括初始化、通信、I/O 和计算等各部分的耗时
  • CPU时间:程序实际占用 CPU 执行指令的时间,不包括程序等待 I/O 或其他系统资源的时间
  • 并行计算时间:所有进程共同进行并行计算部分所花费的时间,整个并行计算的完成时间取决于最慢的进程

相关函数

  • double MPI_Wtime(void):返回一个双精度浮点数,表示当前的墙上时钟时间
  • int MPI_Barrier(MPI_Comm comm):用于同步所有进程,使通信子中所有进程在此调用处等待,直到所有进程都到达该调用点为止,然后再继续向下执行
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
double local_start, local_finish, local_elapsed, global_elapsed;

/* 初始化部分 */

// 同步进程,确保所有进程的开始时间相同
MPI_Barrier(comm);
local_start = MPI_Wtime();

/* 并行运算部分 */

// 获取当前进程的耗时
local_finish = MPI_Wtime();
local_elapsed = local_finish - local_start;

// 获取最慢进程的耗时作为整个并行程序的耗时
MPI_Reduce(&local_elapsed, &global_elapsed, 1, MPI_DOUBLE, MPI_MAX, 0, comm);

// 主进程输出计时结果
if (my_rank == 0)
printf("Elapsed time = %e seconds\n", global_elapsed);

由于操作系统的不可预知性,同一段程序的运行时间不可能每次都完全一样

5.2 影响因素分析

由下表,我们可以得到以下规律

  1. 随着进程数量 p 增加,运行时间下降
  2. 随着问题规模 n 增加,运行时间上升
  3. 当问题规模较小且进程数量足够多的时候,增加进程数量不会使得运行时间显著下降,这是因为此时进程间通信开销增大
  4. 在进程数量 p 较小,问题规模 n 较大的情况下,近似线性效率;在进程数量 p 较大,问题规模 n 较小的情况下,远远达不到线性效率

扩展性:当问题规模或资源数量发生变化时,系统能否保持原先较好的性能

  • 强扩展性:在不需要增加进程数量的前提下,增加问题规模依然能够保持恒定效率
  • 弱扩展性:增加问题规模后,需要增加进程数量来维持恒定效率

6. 并行奇偶换位排序算法

原理:反复交替执行两个阶段来排序,每个阶段内的比较和交换操作彼此独立,因此可以并行执行

  • 奇数阶段:比较序列中索引为奇数的元素与其后面的相邻元素,如果前者大于后者,则交换
  • 偶数阶段:比较序列中索引为偶数的元素与其后面的相邻元素,如果前者大于后者,则交换

定理:如果序列有 n 个键值,那么经过 n 个阶段后,序列一定可以排好序

0起始:5,9,4,3
1偶数:5,9,3,4
2奇数:5,3,9,4
3奇数:3,5,4,9
4偶数:3,4,5,9

算法流程

  1. 数据分发:主进程通过 MPI_Scatter 将待排序数组分发给各个进程
  2. 本地排序:每个进程对自己接收到的子数组进行本地排序
  3. 局部交换
    1. 偶数阶段:如果进程号为偶数,则与右侧进程交换,否则与左侧进程交换
    2. 奇数阶段:如果进程号为奇数,则与右侧进程交换,否则与左侧进程交换
    3. 第一个进程可能没有左交换对象,最后一个进程可能没有右交换对象,可以设置 MPI_PROC_NULL 来避免产生无效通信
  4. 合并数据:将两个局部有序数组合并,小编号进程保留小半部分,大编号进程保留大半部分
  5. 数据输出:主进程通过 MPI_Gather 得到全局排序结果

MPI_Sendrecv:当我们进行局部交换的时候,进程 x 发送消息等待进程 y 接收,而进程 y 也发送消息等待进程 x 接收,那么就会发生死锁,而 MPI_Sendrecv 会将发送和接收操作组合在一起,交给 MPI 库内部来协调这两个操作,不会出现死锁情况

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
#include <stdio.h>
#include <stdlib.h>
#include <mpi.h>

// 比较函数,用于 qsort 排序
int compare_ints(const void *a, const void *b) {
int arg1 = *(const int *)a;
int arg2 = *(const int *)b;
return (arg1 > arg2) - (arg1 < arg2);
}

// 合并函数:合并两个数组为一个大数组,然后小半部分的数组给编号较小的进程
void merge(int *local_arr, int *recv_arr, int local_n, int rank, int partner) {
int *temp = malloc(2 * local_n * sizeof(int));
int i = 0, j = 0, k = 0;
while (i < local_n && j < local_n) {
if (local_arr[i] <= recv_arr[j])
temp[k++] = local_arr[i++];
else
temp[k++] = recv_arr[j++];
}
while (i < local_n)
temp[k++] = local_arr[i++];
while (j < local_n)
temp[k++] = recv_arr[j++];

if (rank < partner)
for (i = 0; i < local_n; i++)
local_arr[i] = temp[i];
else
for (i = 0; i < local_n; i++)
local_arr[i] = temp[i + local_n];
free(temp);
}

int main(void) {
int size, rank;
const int N = 20;
int nums[N] = {9, 3, 7, 1, 2, 8, 6, 5, 4, 0, 13, 19, 15, 11, 12, 14, 18, 17, 16, 10};

MPI_Init(NULL, NULL);
MPI_Comm_size(MPI_COMM_WORLD, &size);
MPI_Comm_rank(MPI_COMM_WORLD, &rank);

int local_n = N / size;
int* local_nums = malloc(local_n * sizeof(int));
int* recv_nums = malloc(local_n * sizeof(int));

// 分发数据到每个进程的 local_nums 中
MPI_Scatter(nums, local_n, MPI_INT, local_nums, local_n, MPI_INT, 0, MPI_COMM_WORLD);

// 每个进程对 local_nums 进行首次本地排序
qsort(local_nums, local_n, sizeof(int), compare_ints);

// 进行 size 次奇偶交换排序
for(int phase = 0; phase < size; phase++) {
int partner;
// 偶数阶段
if (phase % 2 == 0) {
if (rank % 2 == 0)
partner = rank + 1;
else
partner = rank - 1;
}
// 奇数阶段
else if (phase % 2 != 0) {
if (rank % 2 != 0)
partner = rank + 1;
else
partner = rank - 1;
}

// 判断交换对象是否无效
if (partner == -1 || partner == size)
partner = MPI_PROC_NULL;

// 接发数据
MPI_Sendrecv(local_nums, local_n, MPI_INT, partner, 0, recv_nums, local_n, MPI_INT, partner, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);

// 如果交换对象有效,则执行局部合并操作
if (partner != MPI_PROC_NULL)
merge(local_nums, recv_nums, local_n, rank, partner);

// 同步,保证所有进程都完成局部合并操作
MPI_Barrier(MPI_COMM_WORLD);
}

// 收集每个进程的局部排序结果到主进程
MPI_Gather(local_nums, local_n, MPI_INT, nums, local_n, MPI_INT, 0, MPI_COMM_WORLD);

// 主进程输出排序好的结果
if (rank == 0) {
for (int i = 0; i < N; i++)
printf("%d ", nums[i]);
printf("\n");
}

// 释放动态分配的内存
free(local_nums);
free(recv_nums);

MPI_Finalize();
return 0;
}