使用MPI测量延迟

延迟的测量一般采用Ping-Pong的方法,即A发送一则消息给B,B收到后立刻进行回复,最终由A计算得出消息一次来回所用的时间,也就是A与B延迟的2倍,本文也是基于这种方法测量不同消息大小时点对点的延迟,参考了osu-micro-benchmarks-5.8中的测量程序,因为osu-micro-benchmarks的不好配置在SimGrid虚拟环境下就仿照写了测量程序。

使用的数据结构以及一些参数的定义,measure_util.h:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
#define BW_SKIP_SMALL 10
#define BW_LOOP_SMALL 100
#define BW_SKIP_LARGE 2
#define BW_LOOP_LARGE 20
#define LAT_SKIP_SMALL 100
#define LAT_LOOP_SMALL 10000
#define LAT_SKIP_LARGE 10
#define LAT_LOOP_LARGE 1000
#define LARGE_MESSAGE_SIZE 8192
#define FIELD_WIDTH 20
#define FLOAT_PRECISION 2

struct parameters_t {
unsigned long min_message_size;
unsigned long max_message_size;
int window_size;
unsigned long iterations;
unsigned long skip;
unsigned long iterations_large;
unsigned long skip_large;
};

点到点延迟测量

首先需要为程序分配收发缓冲区的资源:

1
2
3
4
5
6
7
8
9
10
11
char *s_buf, *r_buf;
if (allocate_buffer(&s_buf, &r_buf, size)) {
if (!rank) {
fprintf(stderr, "Insufficient Memory!\n");
}
MPI_Finalize();
exit(0);
}
memset(s_buf, 's', size);
memset(r_buf, 'r', size);
}

allocate_buffer的定义如下:

1
2
3
4
5
6
7
int allocate_buffer(char** s_buf, char** r_buf, int size) {
if ((*s_buf = malloc(size)) == NULL)
return 1;
if ((*r_buf = malloc(size)) == NULL)
return 1;
return 0;
}

一次Ping-Pong的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
if (!rank) {

if (i >= params.skip) {
t_start = MPI_Wtime();
}

MPI_Send(s_buf, size, MPI_CHAR, 1, 1, MPI_COMM_WORLD);
MPI_Recv(r_buf, size, MPI_CHAR, 1, 1, MPI_COMM_WORLD, &reqstats);

if (i >= params.skip) {
t_end = MPI_Wtime();
t_total += t_end - t_start;
}

} else if (rank == 1) {

MPI_Recv(r_buf, size, MPI_CHAR, 0, 1, MPI_COMM_WORLD, &reqstats);
MPI_Send(s_buf, size, MPI_CHAR, 0, 1, MPI_COMM_WORLD);

}

为了保证延迟测量的准确性,采取了下属措施:

  • 测量多次取平均值,共测量parameters_t.iterations次

  • 不对最早开始测出的parameters_t.skip次结果进行取值,猜测可能是跟主机的缓存或者交换机有关

程序中对发送不同大小的消息的延迟都进行了测试,最终完整代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
/*latency_measure.c*/
#include <math.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "measure_util.h"
#include "mpi.h"

struct parameters_t params = {1, 4194304, 1, LAT_LOOP_SMALL, LAT_SKIP_SMALL, LAT_LOOP_LARGE, LAT_SKIP_LARGE};

int allocate_buffer(char** s_buf, char** r_buf, int size) {
if ((*s_buf = malloc(size)) == NULL)
return 1;
if ((*r_buf = malloc(size)) == NULL)
return 1;
return 0;
}
void main(int argc, char* argv[]) {

int rank, size;
char *s_buf, *r_buf;
double t_total, t_start, t_end;

MPI_Init(&argc, &argv);
MPI_Comm_rank(MPI_COMM_WORLD, &rank);

MPI_Status reqstats;

for (size = params.min_message_size; size <= params.max_message_size; size *= 2) {

if (allocate_buffer(&s_buf, &r_buf, size)) {
if (!rank) {
fprintf(stderr, "Insufficient Memory!\n");
}
MPI_Finalize();
exit(0);
}
memset(s_buf, 's', size);
memset(r_buf, 'r', size);

if (size > LARGE_MESSAGE_SIZE) {
params.iterations = params.iterations_large;
params.skip = params.skip_large;
}

MPI_Barrier(MPI_COMM_WORLD);
t_total = 0.0;

for (int i = 0; i < params.iterations + params.skip; i++) {

if (!rank) {

if (i >= params.skip) {
t_start = MPI_Wtime();
}

MPI_Send(s_buf, size, MPI_CHAR, 1, 1, MPI_COMM_WORLD);
MPI_Recv(r_buf, size, MPI_CHAR, 1, 1, MPI_COMM_WORLD, &reqstats);

if (i >= params.skip) {
t_end = MPI_Wtime();
t_total += t_end - t_start;
}

} else if (rank == 1) {

MPI_Recv(r_buf, size, MPI_CHAR, 0, 1, MPI_COMM_WORLD, &reqstats);
MPI_Send(s_buf, size, MPI_CHAR, 0, 1, MPI_COMM_WORLD);

}
}

if (!rank) {
double latency = (t_total * 1e6) / (2.0 * params.iterations);
fprintf(stdout, "%-*d%*.*f\n", 10, size, FIELD_WIDTH,
FLOAT_PRECISION, latency);
fflush(stdout);
}

free(s_buf);
free(r_buf);

}


MPI_Finalize();
}

多进程延迟矩阵的测量

延迟矩阵的测量仍然沿用上述方法,由于不同进程对之间的通信可能会用到相同的网络资源进而产生干扰,所以同一时间只能测试一对进程的延迟,导致进程数较大时程序可能运行的比较慢,另一方面暂时还没有用到大消息的延迟,就只对4字节的消息测量了延迟。

测量的主要部分,每个进程都会作为发起方测量跟比它序号大的进程之间的延迟:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
for (int i = 0; i < size; i ++) {
if (rank == i) {
for (int j = i+1; j < size; j ++) {
total = 0.0;
for (int k = 0; k < PING_COUNT + SKIP_COUNT; k ++) {
if (k >= SKIP_COUNT) {
start = MPI_Wtime();
}
MPI_Send(sbuf, MSG_SIZE, MPI_CHAR, j, 0, MPI_COMM_WORLD);
MPI_Recv(rbuf, MSG_SIZE, MPI_CHAR, j, 0, MPI_COMM_WORLD, &status);
if (k >= SKIP_COUNT) {
end = MPI_Wtime();
total += end - start;
}
}
latency[j] = (total * 1e6) / (2.0 * PING_COUNT);
}
} else if (rank > i) {
for (int k = 0; k < PING_COUNT + SKIP_COUNT; k++) {
MPI_Recv(rbuf, MSG_SIZE, MPI_CHAR, i, 0, MPI_COMM_WORLD, &status);
MPI_Send(sbuf, MSG_SIZE, MPI_CHAR, i, 0, MPI_COMM_WORLD);
}
}
}

完整代码,最后会将结果输出到./res/latency_matrix{procs_num}_measured.txt下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
/*lat_matrix_measure.c*/
#include "mpi.h"
#include <stdio.h>
#include <stdlib.h>
#include <math.h>
#include <string.h>

#define MSG_SIZE 4
#define PING_COUNT 64
#define SKIP_COUNT 8

char* itoa(int num,char* str,int radix)
{
char index[]="0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ";//索引表
unsigned unum;//存放要转换的整数的绝对值,转换的整数可能是负数
int i=0,j,k;//i用来指示设置字符串相应位,转换之后i其实就是字符串的长度;转换后顺序是逆序的,有正负的情况,k用来指示调整顺序的开始位置;j用来指示调整顺序时的交换。

//获取要转换的整数的绝对值
if(radix==10&&num<0)//要转换成十进制数并且是负数
{
unum=(unsigned)-num;//将num的绝对值赋给unum
str[i++]='-';//在字符串最前面设置为'-'号,并且索引加1
}
else unum=(unsigned)num;//若是num为正,直接赋值给unum

//转换部分,注意转换后是逆序的
do
{
str[i++]=index[unum%(unsigned)radix];//取unum的最后一位,并设置为str对应位,指示索引加1
unum/=radix;//unum去掉最后一位

}while(unum);//直至unum为0退出循环

str[i]='\0';//在字符串最后添加'\0'字符,c语言字符串以'\0'结束。

//将顺序调整过来
if(str[0]=='-') k=1;//如果是负数,符号不用调整,从符号后面开始调整
else k=0;//不是负数,全部都要调整

char temp;//临时变量,交换两个值时用到
for(j=k;j<=(i-1)/2;j++)//头尾一一对称交换,i其实就是字符串的长度,索引最大值比长度少1
{
temp=str[j];//头部赋值给临时变量
str[j]=str[i-1+k-j];//尾部赋值给头部
str[i-1+k-j]=temp;//将临时变量的值(其实就是之前的头部值)赋给尾部
}

return str;//返回转换后的字符串

}

void main(int argc, char* argv[]) {

int rank, size;
MPI_Init(&argc, &argv);
MPI_Comm_size(MPI_COMM_WORLD, &size);
MPI_Comm_rank(MPI_COMM_WORLD, &rank);

double *latency, *latency_matrix;
latency = malloc(sizeof(double) * size);
latency_matrix = malloc(sizeof(double) * size * size);
double start, end, total;
char *rbuf, *sbuf;
MPI_Status status;

rbuf = malloc(sizeof(char) * MSG_SIZE);
memset(rbuf, 'r', MSG_SIZE);
sbuf = malloc(sizeof(char) * MSG_SIZE);
memset(sbuf, 's', MSG_SIZE);
for (int i = 0; i < size; i ++) {
if (rank == i) {
for (int j = i+1; j < size; j ++) {
total = 0.0;
for (int k = 0; k < PING_COUNT + SKIP_COUNT; k ++) {
if (k >= SKIP_COUNT) {
start = MPI_Wtime();
}
MPI_Send(sbuf, MSG_SIZE, MPI_CHAR, j, 0, MPI_COMM_WORLD);
MPI_Recv(rbuf, MSG_SIZE, MPI_CHAR, j, 0, MPI_COMM_WORLD, &status);
if (k >= SKIP_COUNT) {
end = MPI_Wtime();
total += end - start;
}
}
latency[j] = (total * 1e6) / (2.0 * PING_COUNT);
}
} else if (rank > i) {
for (int k = 0; k < PING_COUNT + SKIP_COUNT; k++) {
MPI_Recv(rbuf, MSG_SIZE, MPI_CHAR, i, 0, MPI_COMM_WORLD, &status);
MPI_Send(sbuf, MSG_SIZE, MPI_CHAR, i, 0, MPI_COMM_WORLD);
}
}
}

MPI_Gather(latency, size, MPI_DOUBLE, latency_matrix, size, MPI_DOUBLE, 0, MPI_COMM_WORLD);
if (!rank) {
for (int i = 0; i < size; i ++) {
for (int j = 0; j < size; j ++) {
printf("%lf ", *(latency_matrix + i*size + j));
}
printf("\n");
}
FILE *fp;
char file_name[100] = "./res/latency_matrix";
char file_hostnum[5];
itoa(size, file_hostnum, 10);
strcat(file_name, file_hostnum);
strcat(file_name, "_measured.txt");
fp = fopen(file_name, "w");
for (int i = 0; i < size; i ++) {
for (int j = 0; j < size; j ++) {
fprintf(fp, "%d ", i <= j ? (int) *(latency_matrix + i*size + j) : (int) *(latency_matrix + j*size + i));
}
fprintf(fp, "\n");
}
fclose(fp);
printf("ok\n");
}

free(latency);
free(latency_matrix);
free(rbuf);
free(sbuf);

MPI_Finalize();
}