2205-MPI and timer

How MPI and OpenMP work together

OpenMP is better suited to shared-memory parallelism, but it has no facility for communication between processes; MPI fills exactly that gap.
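The division of labor above can be sketched as a hybrid program (illustrative only, not from the original post; compile with something like mpicxx -fopenmp): OpenMP threads parallelize the loop inside each process, and MPI supplies the cross-process communication by reducing the per-process results.

```cpp
#include <mpi.h>
#include <omp.h>
#include <cstdio>

int main(int argc, char **argv) {
    // Request MPI_THREAD_FUNNELED: threads may exist, but only the
    // main thread makes MPI calls.
    int provided;
    MPI_Init_thread(&argc, &argv, MPI_THREAD_FUNNELED, &provided);

    int rank;
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    // OpenMP handles the shared-memory part inside one process.
    double local = 0.0;
    #pragma omp parallel for reduction(+ : local)
    for (int i = 0; i < 1000; ++i)
        local += 1.0;

    // MPI provides the communication OpenMP lacks: combine across processes.
    double total = 0.0;
    MPI_Reduce(&local, &total, 1, MPI_DOUBLE, MPI_SUM, 0, MPI_COMM_WORLD);
    if (rank == 0)
        std::printf("total = %.0f\n", total);
    MPI_Finalize();
}
```

Run with, e.g., mpirun -np 4 ./a.out; each process contributes 1000 and rank 0 prints the combined total.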

Getting started with MPI

Basic concepts

For processes and threads, I find the English descriptions clearest; they pin down the two concepts precisely.

  1. Process: A process is (traditionally) a program counter and address space.
  2. Thread: Processes may have multiple threads (program counters and associated stacks) sharing a single address space. MPI is for communication among processes, which have separate address spaces.
  3. Working with multiple processes requires:
    1. Synchronization
    2. Data communication

OpenMPI and MPICH

You can read the following part later.
First, understand that these are not the same thing; their implementations differ in small ways at the interface level. MPICH is meant to be a high-quality reference implementation of the latest MPI standard and a base for derived implementations serving special-purpose needs. OpenMPI is more of a general-purpose implementation in terms of usage and network transports. Most of the time either one works, but on machines that communicate over InfiniBand, MPICH may not be usable.

In everyday use, the biggest difference shows up in the hostfile format used when launching with Hydra.

The OpenMPI hostfile format is:

# This is an example hostfile.  Comments begin with #
#
# The following node is a single processor machine:
foo.example.com

# The following node is a dual-processor machine:
bar.example.com slots=2

# The following node is a quad-processor machine, and we absolutely
# want to disallow over-subscribing it:
yow.example.com slots=4 max-slots=4

The MPICH hostfile format is:

donner:2     # The first 2 procs are scheduled to run here
foo:3 # The next 3 procs run on this host
shakey:2 # The last 2 procs run on this host

More detailed references:
MPICH Hydra
OpenMPI FAQ

For learning purposes, I personally recommend studying and experimenting with MPICH.

Installing MPICH

Download

wget http://www.mpich.org/static/downloads/3.4.2/mpich-3.4.2.tar.gz

Build

Not covered in detail here.

Three pairs of basic functions

MPI Hello World


#include <mpi.h>
#include <stdio.h>

int main(int argc, char** argv) {
    // Initialize the MPI environment
    MPI_Init(NULL, NULL);

    // Get the number of processes
    int world_size;
    MPI_Comm_size(MPI_COMM_WORLD, &world_size);

    // Get the rank of the process
    int world_rank;
    MPI_Comm_rank(MPI_COMM_WORLD, &world_rank);

    // Get the name of the processor
    char processor_name[MPI_MAX_PROCESSOR_NAME];
    int name_len;
    MPI_Get_processor_name(processor_name, &name_len);

    // Print off a hello world message
    printf("Hello world from processor %s, rank %d out of %d processors\n",
           processor_name, world_rank, world_size);

    // Finalize the MPI environment.
    MPI_Finalize();
}

Compile with:

mpicxx hello.cpp -o hello

MPI_Init & MPI_Finalize

  • MPI_Init initializes the MPI environment
  • MPI_Finalize shuts it down

MPI_Comm_rank & MPI_Comm_size

  • MPI_Comm_rank returns the rank of the calling process
  • MPI_Comm_size returns the total number of processes in the communicator

MPI_Send & MPI_Recv

  • MPI_Send sends data
  • MPI_Recv receives data

The interfaces are:

MPI_Send(const void *buf, int count, MPI_Datatype datatype,
int dest, int tag, MPI_Comm comm)

MPI_Recv(void *buf, int count, MPI_Datatype datatype,
int source, int tag, MPI_Comm comm, MPI_Status *status)

A minimal send/receive example:


#include <mpi.h>

int main(int argc, char **argv)
{
    int rank, data[100];

    MPI_Init(&argc, &argv);

    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    if (rank == 0)
        MPI_Send(data, 100, MPI_INT, 1, 0, MPI_COMM_WORLD);
    else if (rank == 1)
        MPI_Recv(data, 100, MPI_INT, 0, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);

    MPI_Finalize();
    return 0;
}

Here the communication pattern is fixed: rank 0 sends and rank 1 receives. More detail comes later; this is just an introductory example.

MPI communication datatypes

Basic datatypes

(Tip: the table below is for looking things up, not for memorizing.)

MPI datatype                   C datatype
MPI_CHAR                       char (treated as printable character)
MPI_SHORT                      signed short int
MPI_INT                        signed int
MPI_LONG                       signed long int
MPI_LONG_LONG_INT              signed long long int
MPI_LONG_LONG (as a synonym)   signed long long int
MPI_SIGNED_CHAR                signed char (treated as integral value)
MPI_UNSIGNED_CHAR              unsigned char (treated as integral value)
MPI_UNSIGNED_SHORT             unsigned short int
MPI_UNSIGNED                   unsigned int
MPI_UNSIGNED_LONG              unsigned long int
MPI_UNSIGNED_LONG_LONG         unsigned long long int
MPI_FLOAT                      float
MPI_DOUBLE                     double
MPI_LONG_DOUBLE                long double
MPI_WCHAR                      wchar_t (treated as printable character)
MPI_C_BOOL                     _Bool
MPI_INT8_T                     int8_t
MPI_INT16_T                    int16_t
MPI_INT32_T                    int32_t
MPI_INT64_T                    int64_t
MPI_UINT8_T                    uint8_t
MPI_UINT16_T                   uint16_t
MPI_UINT32_T                   uint32_t
MPI_UINT64_T                   uint64_t
MPI_C_COMPLEX                  float _Complex
MPI_C_FLOAT_COMPLEX (synonym)  float _Complex
MPI_C_DOUBLE_COMPLEX           double _Complex
MPI_BYTE                       (no corresponding C type)
MPI_PACKED                     (no corresponding C type)

There are some extensions for C++:

MPI datatype                   C++ datatype
MPI_CXX_BOOL                   bool
MPI_CXX_FLOAT_COMPLEX          std::complex<float>
MPI_CXX_DOUBLE_COMPLEX         std::complex<double>
MPI_CXX_LONG_DOUBLE_COMPLEX    std::complex<long double>

MPI_PACKED and MPI_BYTE serve as extension message types: data is packed into a buffer and then sent.

The MPI_PACKED datatype

MPI_PACKED is a special wrapper datatype, typically used to transfer data items that are not contiguous in the address space.

The interface consists of (OpenMPI's interface documentation is better here, so later sections may reference the OpenMPI docs):

MPI_Pack
MPI_Unpack
MPI_Pack_size

An example:


#include <mpi.h>
#include <iostream>

using namespace std;

int main(int argc, char *argv[]) {
    MPI_Init(&argc, &argv);
    int position;
    int i = 1;
    int j = 2;
    int a[2];

    int myrank;
    MPI_Comm_rank(MPI_COMM_WORLD, &myrank);

    if (myrank == 0) {
        /* I am sending */
        position = 0;
        int true_packed_size;
        MPI_Pack_size(2, MPI_INT, MPI_COMM_WORLD, &true_packed_size);
        cout << "True packed size = " << true_packed_size << endl;
        char *buffer = new char[true_packed_size];
        cout << "Packing number i = " << i << ", pos = " << position << endl;
        MPI_Pack(&i, 1, MPI_INT, buffer, true_packed_size, &position, MPI_COMM_WORLD);
        cout << "Packing number j = " << j << ", pos = " << position << endl;
        MPI_Pack(&j, 1, MPI_INT, buffer, true_packed_size, &position, MPI_COMM_WORLD);
        cout << "Packing finished, pos = " << position << endl;
        MPI_Send(buffer, position, MPI_PACKED, 1, 0, MPI_COMM_WORLD);
        delete[] buffer;
    } else {
        // A packed message of two ints can be received directly as MPI_INT
        MPI_Recv(a, 2, MPI_INT, 0, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
        cout << "a = " << a[0] << ", " << a[1] << endl;
    }
    MPI_Finalize();
}

MPI point-to-point communication

Blocking and non-blocking communication

The defining feature of blocking communication: suppose process A sends and process B receives. Until this send/receive pair completes, A stays blocked at the send call and B stays blocked at the receive call.

There are four main send modes:

  1. Standard mode MPI_Send/MPI_Isend: send freely, without regard to the state of other processes
  2. Buffered mode MPI_Bsend/MPI_Ibsend: the user explicitly supplies a buffer to assist the communication
  3. Synchronous mode MPI_Ssend/MPI_Issend: the two sides first establish contact, then communicate
  4. Ready mode MPI_Rsend/MPI_Irsend: the receiving process must post its receive before the sender starts

For ordinary use, the standard mode is usually all you need.

MPI primitive      Blocking    Non-blocking
Standard Send      MPI_Send    MPI_Isend
Buffered Send      MPI_Bsend   MPI_Ibsend
Synchronous Send   MPI_Ssend   MPI_Issend
Ready Send         MPI_Rsend   MPI_Irsend
Receive            MPI_Recv    MPI_Irecv
Completion Check   MPI_Wait    MPI_Test

Pipelined communication

The pattern:

┌───────┐   ┌───────┐   ┌───────┐
│       │   │       │   │       │
│   x   ├──>│   y   ├──>│   z   │
│       │   │       │   │       │
└───────┘   └───────┘   └───────┘

Example:


#include <mpi.h>
#include <iostream>

using namespace std;

int function_p(int w) {
    return w + w;
}

int function_q(int x) {
    return x * x;
}

int function_r(int y) {
    return y - 2;
}

int main(int argc, char *argv[]) {
    MPI_Init(&argc, &argv);

    int world_size, my_rank;

    MPI_Comm_size(MPI_COMM_WORLD, &world_size);
    MPI_Comm_rank(MPI_COMM_WORLD, &my_rank);

    if (my_rank == 0) {
        int number[10], ans[10];
        MPI_Request req[10];
        for (int index = 0; index < 10; ++index) {
            // each send gets its own buffer, kept alive until the send completes
            number[index] = index;
            MPI_Isend(&number[index], 1, MPI_INT, 1, 0, MPI_COMM_WORLD, &req[index]);
            ans[index] = function_p(number[index]);
            ans[index] = function_q(ans[index]);
            ans[index] = function_r(ans[index]);
        }
        MPI_Waitall(10, req, MPI_STATUSES_IGNORE);
        cout << "should be: ";
        for (int index = 0; index < 10; ++index) {
            cout << ans[index] << " ";
        }
        cout << endl;
    } else if (my_rank == 1) {
        MPI_Request req;
        int number;
        for (int cnt = 0; cnt < 10; ++cnt) {
            MPI_Irecv(&number, 1, MPI_INT, 0, 0, MPI_COMM_WORLD, &req);
            MPI_Wait(&req, MPI_STATUS_IGNORE);
            number = function_p(number);
            MPI_Isend(&number, 1, MPI_INT, 2, 0, MPI_COMM_WORLD, &req);
            // number is reused next iteration, so the send must finish first
            MPI_Wait(&req, MPI_STATUS_IGNORE);
        }
    } else if (my_rank == 2) {
        MPI_Request req;
        int number;
        for (int cnt = 0; cnt < 10; ++cnt) {
            MPI_Irecv(&number, 1, MPI_INT, 1, 0, MPI_COMM_WORLD, &req);
            MPI_Wait(&req, MPI_STATUS_IGNORE);
            number = function_q(number);
            MPI_Isend(&number, 1, MPI_INT, 3, 0, MPI_COMM_WORLD, &req);
            MPI_Wait(&req, MPI_STATUS_IGNORE);
        }
    } else if (my_rank == 3) {
        MPI_Request req;
        int number[10];
        for (int cnt = 0; cnt < 10; ++cnt) {
            MPI_Irecv(&number[cnt], 1, MPI_INT, 2, 0, MPI_COMM_WORLD, &req);
            MPI_Wait(&req, MPI_STATUS_IGNORE);
            number[cnt] = function_r(number[cnt]);
        }
        for (int index = 0; index < 10; index++)
            cout << number[index] << " ";
        cout << endl;
    }
    MPI_Finalize();
}

Double buffering

The pattern:

┌─────────┐   ┌─────────────┐   ┌─────────┐
│ x buf 0 ├──>│             ├──>│ y buf 0 │
└─────────┘   │             │   └─────────┘
              │  Y = F(X)   │
┌─────────┐   │             │   ┌─────────┐
│ x buf 1 ├──>│             ├──>│ y buf 1 │
└─────────┘   └─────────────┘   └─────────┘

When the computation is expensive, round one uses xbuf0 and ybuf0 while round two uses xbuf1 and ybuf1; the two buffer pairs are used alternately, so communication for the next round can overlap with computation on the current one.
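A minimal sketch of this pattern (the chunk size, chunk count, and compute function are made up for illustration): the receiver computes on one buffer while a non-blocking receive fills the other.

```cpp
#include <mpi.h>

// Hypothetical compute step on one chunk.
void compute(const double *x, double *y, int n) {
    for (int i = 0; i < n; ++i) y[i] = x[i] * x[i];
}

int main(int argc, char **argv) {
    enum { N = 1024, NCHUNK = 8 };
    static double xbuf[2][N], ybuf[2][N];
    MPI_Request req[2];

    MPI_Init(&argc, &argv);
    int rank;
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    if (rank == 0) {
        for (int k = 0; k < NCHUNK; ++k) {
            // produce data in xbuf[k % 2] here (left uninitialized in this sketch)
            MPI_Send(xbuf[k % 2], N, MPI_DOUBLE, 1, 0, MPI_COMM_WORLD);
        }
    } else if (rank == 1) {
        // While chunk k is computed out of xbuf[k % 2], chunk k+1 is
        // already arriving into xbuf[(k + 1) % 2].
        MPI_Irecv(xbuf[0], N, MPI_DOUBLE, 0, 0, MPI_COMM_WORLD, &req[0]);
        for (int k = 0; k < NCHUNK; ++k) {
            MPI_Wait(&req[k % 2], MPI_STATUS_IGNORE);      // chunk k has arrived
            if (k + 1 < NCHUNK)                            // prefetch chunk k+1
                MPI_Irecv(xbuf[(k + 1) % 2], N, MPI_DOUBLE, 0, 0,
                          MPI_COMM_WORLD, &req[(k + 1) % 2]);
            compute(xbuf[k % 2], ybuf[k % 2], N);          // overlaps the receive
        }
    }
    MPI_Finalize();
}
```

Run with at least two ranks, e.g. mpirun -np 2; with blocking receives instead, the receiver would idle during every transfer.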

Probing incoming messages

The main function here is MPI_Probe.

MPI_Probe has an interface similar to MPI_Recv; it is essentially a Recv that does not consume the data. The following example helps build intuition (from MPI Tutorial):

int number_amount;
if (world_rank == 0) {
    const int MAX_NUMBERS = 100;
    int numbers[MAX_NUMBERS];
    // Pick a random amount of integers to send to process one
    srand(time(NULL));
    number_amount = (rand() / (float)RAND_MAX) * MAX_NUMBERS;

    // Send the random amount of integers to process one
    MPI_Send(numbers, number_amount, MPI_INT, 1, 0, MPI_COMM_WORLD);
    printf("0 sent %d numbers to 1\n", number_amount);
} else if (world_rank == 1) {
    MPI_Status status;
    // Probe for an incoming message from process zero
    MPI_Probe(0, 0, MPI_COMM_WORLD, &status);

    // When probe returns, the status object has the size and other
    // attributes of the incoming message. Get the message size
    MPI_Get_count(&status, MPI_INT, &number_amount);

    // Allocate a buffer to hold the incoming numbers
    int* number_buf = (int*)malloc(sizeof(int) * number_amount);

    // Now receive the message with the allocated buffer
    MPI_Recv(number_buf, number_amount, MPI_INT, 0, 0,
             MPI_COMM_WORLD, MPI_STATUS_IGNORE);
    printf("1 dynamically received %d numbers from 0.\n",
           number_amount);
    free(number_buf);
}

MPI communicators
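The central operation on communicators is MPI_Comm_split, which partitions an existing communicator into sub-communicators by a color value. A hedged sketch (splitting by rank parity is purely illustrative):

```cpp
#include <mpi.h>
#include <cstdio>

int main(int argc, char **argv) {
    MPI_Init(&argc, &argv);
    int world_rank, world_size;
    MPI_Comm_rank(MPI_COMM_WORLD, &world_rank);
    MPI_Comm_size(MPI_COMM_WORLD, &world_size);

    // Split ranks into two groups by parity; ranks with the same color
    // land in the same sub-communicator, ordered by the key (world_rank).
    MPI_Comm sub_comm;
    MPI_Comm_split(MPI_COMM_WORLD, world_rank % 2, world_rank, &sub_comm);

    int sub_rank, sub_size;
    MPI_Comm_rank(sub_comm, &sub_rank);
    MPI_Comm_size(sub_comm, &sub_size);
    std::printf("world %d/%d -> group %d: %d/%d\n",
                world_rank, world_size, world_rank % 2, sub_rank, sub_size);

    MPI_Comm_free(&sub_comm);
    MPI_Finalize();
}
```

Collectives called on sub_comm then involve only the processes in that group.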

MPI collective communication

Type             Function            Meaning
Communication    MPI_Bcast           one-to-all broadcast of the same message
                 MPI_Gather          all-to-one gather of each process's message
                 MPI_Gatherv         generalization of MPI_Gather
                 MPI_Allgather       gather with the result on all processes
                 MPI_Allgatherv      generalization of MPI_Allgather
                 MPI_Scatter         one-to-all scatter of distinct messages
                 MPI_Scatterv        generalization of MPI_Scatter
                 MPI_Alltoall        all-to-all exchange of messages
                 MPI_Alltoallv       generalization of MPI_Alltoall
Reduction        MPI_Reduce          all-to-one reduction
                 MPI_Allreduce       generalization of MPI_Reduce
                 MPI_Reduce_scatter  generalization of MPI_Reduce
                 MPI_Scan            prefix scan
Synchronization  MPI_Barrier         barrier synchronization
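A small sketch combining two entries from the table above (the values 100 and the per-rank contributions are made up for illustration): broadcast a value from rank 0, then sum a per-rank contribution back to rank 0.

```cpp
#include <mpi.h>
#include <cstdio>

int main(int argc, char **argv) {
    MPI_Init(&argc, &argv);
    int rank, size;
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);

    int base = (rank == 0) ? 100 : 0;
    MPI_Bcast(&base, 1, MPI_INT, 0, MPI_COMM_WORLD);   // every rank now has 100

    int local = base + rank;                           // per-rank contribution
    int total = 0;
    MPI_Reduce(&local, &total, 1, MPI_INT, MPI_SUM, 0, MPI_COMM_WORLD);
    if (rank == 0)
        std::printf("sum = %d\n", total);  // 100*size + size*(size-1)/2
    MPI_Finalize();
}
```

Note that every rank in the communicator must call the collective, or the program deadlocks.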

Getting started with OpenMP

Examples

Before the examples

To show the effect and performance of the examples, we first need a way to measure time.

Method 1: time


#include <stdio.h>
#include <time.h>

int main()
{
    time_t tm_now;

    time(&tm_now);

    // seconds since 1970-01-01 00:00:00
    printf("now time is %ld second\n", tm_now);

    // convert to local time, second precision
    struct tm *p_local_tm;
    p_local_tm = localtime(&tm_now);
    printf("now datetime: %04d-%02d-%02d %02d:%02d:%02d\n",
           p_local_tm->tm_year + 1900,
           p_local_tm->tm_mon + 1,
           p_local_tm->tm_mday,
           p_local_tm->tm_hour,
           p_local_tm->tm_min,
           p_local_tm->tm_sec);

    return 0;
}

Method 2: gettimeofday


#include <stdio.h>
#include <time.h>
#include <sys/time.h>

int main()
{
    struct timeval tm_now;

    // get the current timestamp (tv_sec, tv_usec);
    // the second argument is the timezone, usually NULL
    gettimeofday(&tm_now, NULL);

    // convert to local time, second precision
    struct tm *p_local_tm;
    p_local_tm = localtime(&tm_now.tv_sec);
    printf("now datetime: %04d-%02d-%02d %02d:%02d:%02d.%06ld\n",
           p_local_tm->tm_year + 1900,
           p_local_tm->tm_mon + 1,
           p_local_tm->tm_mday,
           p_local_tm->tm_hour,
           p_local_tm->tm_min,
           p_local_tm->tm_sec,
           tm_now.tv_usec); // now with microsecond resolution

    return 0;
}

Method 3: clock_gettime


#include <stdio.h>
#include <time.h>

void print_timestamp(int use_monotonic)
{
    struct timespec tm_now;

    // get the current timestamp (tv_sec, tv_nsec)
    if (use_monotonic)
        clock_gettime(CLOCK_MONOTONIC, &tm_now);
        // monotonic time, immune to manual clock adjustments
    else
        clock_gettime(CLOCK_REALTIME, &tm_now);
        // wall-clock time

    // convert to local time, second precision
    struct tm *p_local_tm;
    p_local_tm = localtime(&tm_now.tv_sec);
    printf("now datetime: %04d-%02d-%02d %02d:%02d:%02d.%09ld\n",
           p_local_tm->tm_year + 1900,
           p_local_tm->tm_mon + 1,
           p_local_tm->tm_mday,
           p_local_tm->tm_hour,
           p_local_tm->tm_min,
           p_local_tm->tm_sec,
           tm_now.tv_nsec); // nanosecond resolution
}

Method 4: the chrono library


auto start = std::chrono::system_clock::now();

// ... work to be timed ...

auto end = std::chrono::system_clock::now();
std::chrono::duration<double> elapsed_seconds = end - start;
double time1 = elapsed_seconds.count();

Method 5: rdtsc

The most precise and also the hardest method to use.

#include <stdint.h>

// Read the Time Stamp Counter register (x86-64)
uint64_t get_tsc()
{
    uint64_t a, d;
    __asm__ volatile("rdtsc" : "=a"(a), "=d"(d));
    return (d << 32) | a;
}

// 32-bit x86 variant: "=A" binds the edx:eax pair as one 64-bit value
//     uint64_t x;
//     __asm__ volatile("rdtsc" : "=A"(x));
//     return x;

// ARMv7 variant: read the cycle counter from coprocessor p15
//     uint32_t cc = 0;
//     __asm__ volatile("mrc p15, 0, %0, c9, c13, 0" : "=r"(cc));
//     return (uint64_t)cc;

Usage restrictions:

  1. The machine needs the constant_tsc feature; check with cat /proc/cpuinfo | grep constant_tsc

  2. Out-of-order execution can perturb cycle measurements; when necessary, insert dependent instructions to prevent reordering

  3. A memory barrier may be needed

  4. If cat /proc/cpuinfo | grep rdtscp shows the flag, you can use rdtscp, which is slightly more accurate. Usage is essentially the same:

    uint64_t get_tscp()
    {
        uint64_t a, d;
        // rdtscp also writes the processor id into ecx, so mark it clobbered
        __asm__ volatile("rdtscp" : "=a"(a), "=d"(d) : : "ecx");
        return (d << 32) | a;
    }

Summary

If you are using C++, the chrono library is the best choice; the STL is unlikely to let you down.
If the machine has the constant_tsc feature, you can use the rdtsc method.
Otherwise, gettimeofday is a reasonably stable method.

Example 1: a case that should not be parallelized

Forced parallelization, or parallelizing at too low a level, often causes a large slowdown. We illustrate this with a summation example: sum an array in the following way.

MPI implementation
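A hedged sketch of one possible MPI scheme for this sum (the array size, fill values, and slice distribution are chosen for illustration): each rank sums its own slice, and MPI_Reduce combines the partial sums on rank 0.

```cpp
#include <mpi.h>
#include <cstdio>

int main(int argc, char **argv) {
    enum { N = 1 << 20 };
    static double data[N];

    MPI_Init(&argc, &argv);
    int rank, size;
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    MPI_Comm_size(MPI_COMM_WORLD, &size);

    // Each rank fills only its own slice; in a real program rank 0
    // would MPI_Scatter real data instead.
    int chunk = N / size;
    int lo = rank * chunk;
    int hi = (rank == size - 1) ? N : lo + chunk;
    for (int i = lo; i < hi; ++i) data[i] = 1.0;

    double local = 0.0, total = 0.0;
    for (int i = lo; i < hi; ++i) local += data[i];
    MPI_Reduce(&local, &total, 1, MPI_DOUBLE, MPI_SUM, 0, MPI_COMM_WORLD);
    if (rank == 0)
        std::printf("sum = %.0f (expected %d)\n", total, N);
    MPI_Finalize();
}
```

For a cheap operation like summation, the communication and startup overhead of this scheme can easily exceed the compute time saved, which is the point of this example.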


http://blog.chivier.site/2022-05-13/8124c549cbee/
Author: Chivier Humber
Posted on: May 13, 2022
Licensed under