RDMA-In-Practice[0] Preface & Ping-Pong

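The Zhihu column "RDMA杂谈" explains the basic concepts of RDMA and many important details very thoroughly. Building on that theory, I want to write some code to put RDMA into practice, and to turn the learning process and the code into this series of blog posts.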

Code repository: RDMA-In-Practice

Ping-Pong

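The rdma-core repository provides example ping-pong programs written with RDMA: https://github.com/linux-rdma/rdma-core/tree/master/libibverbs/examples . Reading them directly, however, can be confusing. This post combines the explanations in the RDMA杂谈 column with the ping-pong example code to implement an RDMA ping-pong program step by step.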

This article outlines the general workflow for writing a simple RDMA program:

[Figure: general workflow of a simple RDMA program]

The overall workflow can be roughly divided into the following stages:

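  1. Query and open the device

  2. Create and initialize the PD, MR, CQ, and QP

  3. Connect the QPs through a socket or the Connection Manager (CM)

  4. Send and receive data

  5. Destroy the created resources and close the device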

The rest of this post follows that order to build a simple ping-pong program piece by piece. The complete program is in pingpong–send-recv–v1.0.

Opening the IB Device

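A program that communicates over RDMA through the libibverbs API needs an IB network adapter, so it first queries the available IB devices and opens one. The program can read a user-specified device name ib_devname from the command line and select that device from the device list. Without a name, it falls back to the first device in the list. The code is as follows: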

```cpp
/* Get the device list and choose a device */
dev_list = ibv_get_device_list(&num_devices);
if (!dev_list) {
    perror("Failed to get IB devices list");
    return 1;
}
if (ib_devname.empty()) {
    /* No name given: use the first device */
    ib_dev = dev_list[0];
    if (ib_dev == nullptr) {
        fprintf(stderr, "No IB devices found\n");
        return 1;
    }
} else {
    /* Look for the device whose name matches ib_devname */
    for (int i = 0; i < num_devices && dev_list[i] != nullptr; i++) {
        if (strcmp(ibv_get_device_name(dev_list[i]), ib_devname.c_str()) == 0) {
            ib_dev = dev_list[i];
            break;
        }
    }
    if (ib_dev == nullptr) {
        fprintf(stderr, "IB device %s not found\n", ib_devname.c_str());
        return 1;
    }
}

/* Open the IB device and initialize the ibv_context */
ib_ctx = ibv_open_device(ib_dev);
if (ib_ctx == nullptr) {
    fprintf(stderr, "Couldn't get context for %s\n", ibv_get_device_name(ib_dev));
    goto clear_and_ret;
}
fprintf(stdout, "Get context for %s\n", ibv_get_device_name(ib_dev));
```
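The selection logic above handles two different failure cases: an empty device list, and a device name that does not exist. As a hedged sketch, the decision can be pulled into a pure function so it can be tested without an RDMA NIC. pick_device is a hypothetical helper. Its strings stand in for the names that ibv_get_device_name() would return for each entry of dev_list.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical pure version of the device selection logic.
// `names` stands in for ibv_get_device_name(dev_list[i]) for each device.
// Returns the index of the chosen device, or -1 if none is suitable.
static int pick_device(const std::vector<std::string> &names,
                       const std::string &wanted)
{
    if (names.empty())
        return -1;                          // no IB devices at all
    if (wanted.empty())
        return 0;                           // no name given: first device
    for (std::size_t i = 0; i < names.size(); ++i)
        if (names[i] == wanted)
            return static_cast<int>(i);     // exact name match
    return -1;                              // the named device does not exist
}
```

A caller would still use ibv_get_device_list() and ibv_open_device() as above, and use the returned index only to pick dev_list[i]. Names such as mlx5_0 or rxe0 are just examples.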

Initializing Resources

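To communicate over RDMA, memory allocated in user space must first be registered with the RDMA NIC as a Memory Region. Communication can start only once the MR handle has been obtained. The complete flow for creating and registering the communication buffer is:

  1. Allocate the user-space memory.

  2. Create a Protection Domain.

  3. Register the memory as a Memory Region under that PD.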

```cpp
/* Allocate page-aligned memory. aligned_alloc requires the size to be a
 * multiple of the alignment, so round the allocation up to whole pages. */
int page_size = sysconf(_SC_PAGESIZE);
buf = aligned_alloc(page_size, (size + page_size - 1) / page_size * page_size);
if (buf == nullptr) {
    fprintf(stderr, "Couldn't allocate work buf.\n");
    goto clear_and_ret;
}
memset(buf, 0x7b, size);
((char *)buf)[size - 1] = '\0';  /* NUL-terminate so the buffer can be printed with %s */

/* Allocate the Protection Domain */
ib_pd = ibv_alloc_pd(ib_ctx);
if (ib_pd == nullptr) {
    fprintf(stderr, "Couldn't allocate PD\n");
    goto clear_and_ret;
}

/* Register the Memory Region */
ib_mr = ibv_reg_mr(ib_pd, buf, size, access_flags);
if (ib_mr == nullptr) {
    fprintf(stderr, "Couldn't register MR\n");
    goto clear_and_ret;
}
```
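Below is a minimal sketch of the allocation step, assuming a Linux/POSIX system. It does two things:

- It rounds the request up to whole pages, because std::aligned_alloc requires the size to be a multiple of the alignment.
- It NUL-terminates the message, so later printf("%s") calls stay inside the buffer.

alloc_msg_buf and round_up are hypothetical helpers, not part of the original program.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstdlib>
#include <cstring>
#include <unistd.h>

// Round n up to the next multiple of align; align must be a power of two,
// which holds for the page size on Linux.
static size_t round_up(size_t n, size_t align)
{
    assert(align != 0 && (align & (align - 1)) == 0);
    return (n + align - 1) & ~(align - 1);
}

// Hypothetical helper: allocate a page-aligned buffer for a message of `size`
// bytes, fill it with `fill`, and NUL-terminate it so printf("%s") stays in
// bounds. Returns nullptr on failure; release the result with std::free().
static void *alloc_msg_buf(size_t size, char fill)
{
    if (size == 0)
        return nullptr;
    size_t page = static_cast<size_t>(sysconf(_SC_PAGESIZE));
    // std::aligned_alloc requires the size to be a multiple of the alignment.
    void *buf = std::aligned_alloc(page, round_up(size, page));
    if (buf == nullptr)
        return nullptr;
    std::memset(buf, fill, size);
    static_cast<char *>(buf)[size - 1] = '\0';
    return buf;
}
```

With a 4 KiB page, size = 4096 allocates exactly one page. size = 5000 allocates two pages, of which only the first 5000 bytes would be passed to ibv_reg_mr.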

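Registration pins the buffer's pages in physical memory, and pinning works on whole pages, so the buffer is allocated page-aligned where possible. Once the communication buffer is registered with the RDMA NIC, a Completion Queue and a Queue Pair are still needed before any communication can happen. The CQ and QP are created as follows: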

```cpp
/* Create the Completion Queue */
ib_cq = ibv_create_cq(ib_ctx, rx_depth + 1, NULL, NULL, 0);
if (ib_cq == nullptr) {
    fprintf(stderr, "Couldn't create CQ\n");
    goto clear_and_ret;
}

/* Create the Queue Pair */
{
    struct ibv_qp_attr attr;
    struct ibv_qp_init_attr init_attr = {
        .send_cq = ib_cq,
        .recv_cq = ib_cq,
        .cap = {
            .max_send_wr = 1,
            .max_recv_wr = rx_depth,
            .max_send_sge = 1,
            .max_recv_sge = 1
        },
        .qp_type = IBV_QPT_RC
    };

    ib_qp = ibv_create_qp(ib_pd, &init_attr);
    if (ib_qp == nullptr) {
        fprintf(stderr, "Couldn't create QP\n");
        goto clear_and_ret;
    }

    /* Query the capabilities actually granted; send inline if the message fits */
    ibv_query_qp(ib_qp, &attr, IBV_QP_CAP, &init_attr);
    if (init_attr.cap.max_inline_data >= size)
        send_flags |= IBV_SEND_INLINE;
}
```
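The CQ gets rx_depth + 1 entries because both queues share it. Each outstanding receive and each signaled send can produce at most one completion, so the CQ needs room for max_recv_wr + max_send_wr entries or it may overflow. The sketch below encodes that arithmetic together with the inline check. QpCap is a hypothetical mirror of the relevant ibv_qp_cap fields, and the numbers are only examples.

```cpp
#include <cassert>

// Hypothetical mirror of the ibv_qp_cap fields that matter here.
struct QpCap {
    unsigned max_send_wr;
    unsigned max_recv_wr;
    unsigned max_inline_data;
};

// With one CQ shared by both queues, each outstanding receive and each
// signaled send yields at most one completion, so this many entries
// are enough to never overflow the CQ.
constexpr unsigned min_shared_cq_entries(const QpCap &cap)
{
    return cap.max_send_wr + cap.max_recv_wr;
}

// A message can be sent with IBV_SEND_INLINE only if it fits entirely
// into the inline data area reported by ibv_query_qp.
constexpr bool fits_inline(const QpCap &cap, unsigned msg_size)
{
    return msg_size <= cap.max_inline_data;
}

// The QP above uses max_send_wr = 1 and max_recv_wr = rx_depth, hence
// rx_depth + 1 CQ entries (500 is only an example value for rx_depth).
static_assert(min_shared_cq_entries(QpCap{1, 500, 0}) == 500 + 1,
              "CQ must hold rx_depth + 1 completions");
```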

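The QP created here has its Send Queue and Receive Queue share the same Completion Queue. Its init attributes set the following:

  • max_send_wr and max_recv_wr: the maximum number of outstanding Send and Receive requests the QP can hold;
  • max_send_sge and max_recv_sge: limit each request to one scatter/gather entry;
  • qp_type: sets the QP type to RC (Reliable Connection).

After creation, ibv_query_qp reports the capabilities actually granted. If max_inline_data is at least the message size, IBV_SEND_INLINE is added to send_flags, so the payload is copied into the work request instead of being fetched from the MR by the NIC.

Once the QP exists, its state has to be changed. Enough Receive requests should also be posted in advance, so that every incoming Send finds a Receive to consume; otherwise the responder answers with an RNR NAK and the sender has to retry. Receive requests can be posted as soon as the QP is in INIT: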

```cpp
/* Move the QP to INIT */
{
    struct ibv_qp_attr attr = {
        .qp_state = IBV_QPS_INIT,
        .qp_access_flags = 0,
        .pkey_index = 0,
        .port_num = ib_port
    };

    if (ibv_modify_qp(ib_qp, &attr,
                      IBV_QP_STATE |
                      IBV_QP_ACCESS_FLAGS |
                      IBV_QP_PKEY_INDEX |
                      IBV_QP_PORT)) {
        fprintf(stderr, "Failed to modify QP to INIT\n");
        perror("ibv_modify_qp");
        goto clear_and_ret;
    }
}

/* Pre-post enough receive requests to the Receive Queue. All of them use
 * the same buffer, which is sufficient for a single ping-pong exchange. */
{
    struct ibv_sge list = {
        .addr = (uintptr_t) buf,
        .length = size,
        .lkey = ib_mr->lkey
    };
    struct ibv_recv_wr wr = {
        .wr_id = PINGPONG_RECV_WRID,
        .sg_list = &list,
        .num_sge = 1,
    };
    struct ibv_recv_wr *bad_wr;
    int i;

    for (i = 0; i < rx_depth; ++i)
        if (ibv_post_recv(ib_qp, &wr, &bad_wr))
            break;

    routs = i;
    if (routs < rx_depth) {
        fprintf(stderr, "Couldn't post receive (%d)\n", routs);
        return 1;
    }
    printf("routs %d\n", routs);
}
```

Connecting the Two QPs

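Connecting the QPs requires exchanging the LID, the QP number, the Packet Sequence Number (PSN), the GID, and similar information. Send operations do not need the buffer's virtual address or the Remote Key, which are only used by RDMA Read/Write. First, query the information to be exchanged: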

```cpp
/* Query the information of the local QP */
ret = ibv_query_port(ib_ctx, ib_port, &port_attr);
if (ret != 0) {
    fprintf(stderr, "Couldn't get port info\n");
    return 1;
}
/* LID: required on InfiniBand, may be 0 on RoCE (Ethernet link layer) */
local_ibaddr.lid = port_attr.lid;
if (port_attr.link_layer != IBV_LINK_LAYER_ETHERNET && !port_attr.lid) {
    fprintf(stderr, "Couldn't get local LID\n");
    return 1;
}
/* GID: only queried when a GID index is given (needed e.g. for RoCE) */
if (gid_idx >= 0) {
    ret = ibv_query_gid(ib_ctx, ib_port, gid_idx, &local_ibaddr.gid);
    if (ret != 0) {
        fprintf(stderr, "can't read sgid of index %d\n", gid_idx);
        return 1;
    }
} else {
    memset(&local_ibaddr.gid, 0, sizeof(local_ibaddr.gid));
}
/* QP number and a random 24-bit initial Packet Sequence Number */
local_ibaddr.qpn = ib_qp->qp_num;
local_ibaddr.psn = lrand48() & 0xffffff;
```
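server_exch_ibaddr and client_exch_ibaddr are not shown, but the queried values have to be serialized before they go over the socket. The hedged sketch below is modeled on the text format of rdma-core's pingpong examples:

- the LID, QPN and PSN in hex, separated by colons;
- followed by the 16 GID bytes as 32 hex digits.

The wire_gid_to_gid/gid_to_wire_gid declarations in the class later in this post suggest a similar conversion. AddrInfo, encode_addr and decode_addr are hypothetical. QPN and PSN are masked to 24 bits, which is also why the code above uses lrand48() & 0xffffff.

```cpp
#include <array>
#include <cassert>
#include <cstdint>
#include <cstdio>
#include <string>

// Hypothetical stand-in for ibaddr_info; the GID is kept as its 16 raw bytes.
struct AddrInfo {
    uint16_t lid;
    uint32_t qpn;                  // QP numbers are 24 bits
    uint32_t psn;                  // PSNs are 24 bits
    std::array<uint8_t, 16> gid;
};

// Encode as "LLLL:QQQQQQ:PPPPPP:" followed by one hex pair per GID byte,
// in memory order, so the result does not depend on host endianness.
static std::string encode_addr(const AddrInfo &a)
{
    char msg[sizeof "0000:000000:000000:00000000000000000000000000000000"];
    int n = std::snprintf(msg, sizeof msg, "%04x:%06x:%06x:",
                          static_cast<unsigned>(a.lid),
                          a.qpn & 0xffffff, a.psn & 0xffffff);
    for (uint8_t b : a.gid)
        n += std::snprintf(msg + n, sizeof msg - n, "%02x", b);
    return std::string(msg, n);
}

// Parse the format produced by encode_addr; returns false on malformed input.
static bool decode_addr(const std::string &s, AddrInfo *a)
{
    unsigned lid, qpn, psn;
    char gid_hex[33];
    if (std::sscanf(s.c_str(), "%x:%x:%x:%32s", &lid, &qpn, &psn, gid_hex) != 4)
        return false;
    if (std::string(gid_hex).size() != 32)
        return false;
    a->lid = static_cast<uint16_t>(lid);
    a->qpn = qpn;
    a->psn = psn;
    for (int i = 0; i < 16; ++i) {
        unsigned byte;
        if (std::sscanf(gid_hex + 2 * i, "%2x", &byte) != 1)
            return false;
        a->gid[i] = static_cast<uint8_t>(byte);
    }
    return true;
}
```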

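Next comes setting up the socket connection, exchanging the information, and changing the QP state again. Note that the receiver's QP must already be in the Ready-To-Receive (RTR) state when the sender's message arrives. To guarantee this, the receiving side can connect its QP during the exchange, right after it has received the peer's information. Only then does it send its own information back to the sender: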

```cpp
/* Exchange the IB address information */
inet_ntop(AF_INET6, &local_ibaddr.gid, gid_str, sizeof gid_str);
printf(" local address: LID 0x%04x, QPN 0x%06x, PSN 0x%06x, GID %s\n",
       local_ibaddr.lid, local_ibaddr.qpn, local_ibaddr.psn, gid_str);
if (server_name.empty()) {
    /* server: receive the client's info, connect the QP, then reply */
    remote_ibaddr = server_exch_ibaddr(port, ib_port, gid_idx, ib_qp, &local_ibaddr);
} else {
    /* client: send its own info first, then wait for the server's */
    remote_ibaddr = client_exch_ibaddr(server_name.c_str(), port, &local_ibaddr);
}
if (remote_ibaddr == nullptr)
    return 1;
inet_ntop(AF_INET6, &remote_ibaddr->gid, gid_str, sizeof gid_str);
printf(" remote address: LID 0x%04x, QPN 0x%06x, PSN 0x%06x, GID %s\n",
       remote_ibaddr->lid, remote_ibaddr->qpn, remote_ibaddr->psn, gid_str);
```
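The ordering argument above can be checked with a small model that needs no RDMA hardware:

- two threads play server and client;
- std::promise/std::future stand in for the socket messages;
- an atomic stands in for the server's QP state.

This only sketches the ordering. It assumes the server connects its QP before replying, as described above. run_handshake is hypothetical.

```cpp
#include <atomic>
#include <cassert>
#include <future>
#include <thread>

// Hypothetical model of the exchange; no real RDMA calls are made.
enum class QpState { Init, Rtr, Rts };

// Returns true if the server's QP had already left INIT at the moment the
// client would post its first send.
static bool run_handshake()
{
    std::atomic<QpState> server_qp{QpState::Init};
    std::promise<int> to_server;   // carries the client's (fake) address info
    std::promise<int> to_client;   // carries the server's (fake) address info
    std::future<int> server_inbox = to_server.get_future();
    std::future<int> client_inbox = to_client.get_future();

    std::thread server([&] {
        int client_info = server_inbox.get();  // 1. wait for the client's info
        (void)client_info;                     //    (connect_qp would use it)
        server_qp = QpState::Rtr;              // 2. INIT -> RTR -> RTS
        server_qp = QpState::Rts;
        to_client.set_value(2);                // 3. only now send own info
    });

    to_server.set_value(1);                    // client sends its info first
    int server_info = client_inbox.get();      // returns only after step 3
    (void)server_info;                         // client's connect_qp goes here
    bool server_ready = server_qp.load() != QpState::Init;  // client posts now
    server.join();
    return server_ready;
}
```

The client can only continue after the server's reply, and the server replies only after connecting its QP. So in this model the client's first send never reaches a server QP that is still in INIT.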

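Setting up the socket connection and exchanging the information is ordinary socket programming, so only the step that connects the QPs after the information has arrived is shown here. Using the received information, connect_qp moves the QP first to RTR and then to RTS: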

```cpp
static int connect_qp(const ibaddr_info *remote_ibaddr,
                      const ibaddr_info *local_ibaddr,
                      int ib_port, int gid_idx, ibv_qp *ib_qp)
{
    /* INIT -> RTR: configure the receive side with the peer's QPN, PSN and address */
    ibv_qp_attr attr = {
        .qp_state = IBV_QPS_RTR,
        .path_mtu = IBV_MTU_1024,
        .rq_psn = remote_ibaddr->psn,
        .dest_qp_num = remote_ibaddr->qpn,
        .ah_attr = {
            .dlid = remote_ibaddr->lid,
            .sl = 0,
            .src_path_bits = 0,
            .is_global = 0,
            .port_num = ib_port
        },
        .max_dest_rd_atomic = 1,
        .min_rnr_timer = 12
    };

    /* A non-zero GID means the peer is addressed through the GRH (e.g. RoCE) */
    if (remote_ibaddr->gid.global.interface_id) {
        attr.ah_attr.is_global = 1;
        attr.ah_attr.grh.hop_limit = 1;
        attr.ah_attr.grh.dgid = remote_ibaddr->gid;
        attr.ah_attr.grh.sgid_index = gid_idx;
    }
    if (ibv_modify_qp(ib_qp, &attr,
                      IBV_QP_STATE |
                      IBV_QP_AV |
                      IBV_QP_PATH_MTU |
                      IBV_QP_DEST_QPN |
                      IBV_QP_RQ_PSN |
                      IBV_QP_MAX_DEST_RD_ATOMIC |
                      IBV_QP_MIN_RNR_TIMER)) {
        fprintf(stderr, "Failed to modify QP to RTR\n");
        return 1;
    }

    /* RTR -> RTS: configure the send side (timeouts, retries, initial send PSN) */
    attr.qp_state = IBV_QPS_RTS;
    attr.timeout = 14;
    attr.retry_cnt = 7;
    attr.rnr_retry = 7;
    attr.sq_psn = local_ibaddr->psn;
    attr.max_rd_atomic = 1;
    if (ibv_modify_qp(ib_qp, &attr,
                      IBV_QP_STATE |
                      IBV_QP_TIMEOUT |
                      IBV_QP_RETRY_CNT |
                      IBV_QP_RNR_RETRY |
                      IBV_QP_SQ_PSN |
                      IBV_QP_MAX_QP_RD_ATOMIC)) {
        fprintf(stderr, "Failed to modify QP to RTS\n");
        return 1;
    }
    return 0;
}
```
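connect_qp walks the QP through a fixed sequence of states. The hedged model below covers that subset of the RC QP state machine: RESET → INIT → RTR → RTS, plus the moves to RESET/ERR that any state allows. It makes the rules explicit, including why receives can be pre-posted in INIT while sends need RTS. It models states only. The enum and functions are hypothetical, and SQD/SQE are omitted.

```cpp
#include <cassert>

// Hypothetical model of the RC QP states used in this post (no libibverbs).
enum class QpState { Reset, Init, Rtr, Rts, Err };

// Transitions performed in this post, plus RESET/ERR, which are reachable
// from any state. Other states such as SQD/SQE are not modeled.
static bool can_transition(QpState from, QpState to)
{
    if (to == QpState::Reset || to == QpState::Err)
        return true;
    switch (from) {
    case QpState::Reset: return to == QpState::Init;
    case QpState::Init:  return to == QpState::Init || to == QpState::Rtr;
    case QpState::Rtr:   return to == QpState::Rts;
    case QpState::Rts:   return to == QpState::Rts;
    default:             return false;   // ERR can only go back to RESET
    }
}

// Receive WRs can be posted from INIT on; send WRs are only executed in RTS.
// (Posting in ERR is accepted but the WRs are flushed, so it is left out.)
static bool can_post_recv(QpState s)
{
    return s == QpState::Init || s == QpState::Rtr || s == QpState::Rts;
}

static bool can_post_send(QpState s) { return s == QpState::Rts; }
```

In this model a jump from INIT straight to RTS is rejected. Correspondingly, connect_qp issues two separate ibv_modify_qp calls, each with the attribute mask required for its transition.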

From this point on, requests can be posted to the QP to communicate with the peer.

Communication

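This part of the code is fairly straightforward. The client first posts a send request and waits for the send to complete. It then waits for the server's reply and exits once the reply arrives. The QP was created without sq_sig_all, so this assumes send_flags contains IBV_SEND_SIGNALED; an unsignaled send produces no completion for the client to wait for: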

```cpp
/* ping-pong */
if (!server_name.empty()) {
    if (connect_qp(remote_ibaddr, &local_ibaddr, ib_port, gid_idx, ib_qp))
        return -1;

    /* client: post one send */
    struct ibv_sge list = {
        .addr = (uintptr_t) buf,
        .length = size,
        .lkey = ib_mr->lkey
    };
    struct ibv_send_wr wr = {
        .wr_id = PINGPONG_SEND_WRID,
        .sg_list = &list,
        .num_sge = 1,
        .opcode = IBV_WR_SEND,
        .send_flags = send_flags,
    };
    struct ibv_send_wr *bad_wr;

    ret = ibv_post_send(ib_qp, &wr, &bad_wr);
    if (ret != 0) {
        fprintf(stderr, "Couldn't post send\n");
        return 1;
    }
    printf("post send msg:%s\n", (char *)buf);

    /* client: wait for the send to complete */
    int ne;
    ibv_wc wc;
    do {
        ne = ibv_poll_cq(ib_cq, 1, &wc);
        if (ne < 0) {
            fprintf(stderr, "poll CQ failed %d\n", ne);
            return 1;
        }
    } while (ne < 1);
    if (wc.status != IBV_WC_SUCCESS) {
        fprintf(stderr, "Failed status %s (%d) for wr_id %d\n",
                ibv_wc_status_str(wc.status),
                wc.status, (int)wc.wr_id);
        return 1;
    }
    if (wc.wr_id != PINGPONG_SEND_WRID) {
        fprintf(stderr, "Unexpected recv completion %d\n", (int)wc.wr_id);
        return 1;
    }
    printf("send complete\n");

    /* client: wait for the server's reply */
    do {
        ne = ibv_poll_cq(ib_cq, 1, &wc);
        if (ne < 0) {
            fprintf(stderr, "poll CQ failed %d\n", ne);
            return 1;
        }
    } while (ne < 1);
    if (wc.status != IBV_WC_SUCCESS) {
        fprintf(stderr, "Failed status %s (%d) for wr_id %d\n",
                ibv_wc_status_str(wc.status),
                wc.status, (int)wc.wr_id);
        return 1;
    }
    if (wc.wr_id != PINGPONG_RECV_WRID) {
        fprintf(stderr, "Unexpected send completion\n");
        return 1;
    }
    printf("recv msg:%s\n", (char *)buf);
}
```

The server does the opposite: it waits for the sender's message, replies with a message of its own, and then exits:

```cpp
else {
    /* server: wait for the client's message */
    int ne;
    ibv_wc wc;
    do {
        ne = ibv_poll_cq(ib_cq, 1, &wc);
        if (ne < 0) {
            fprintf(stderr, "poll CQ failed %d\n", ne);
            return 1;
        }
    } while (ne < 1);
    if (wc.status != IBV_WC_SUCCESS) {
        fprintf(stderr, "Failed status %s (%d) for wr_id %d\n",
                ibv_wc_status_str(wc.status),
                wc.status, (int)wc.wr_id);
        return 1;
    }
    if (wc.wr_id != PINGPONG_RECV_WRID) {
        fprintf(stderr, "Unexpected send completion\n");
        return 1;
    }
    printf("recv msg:%s\n", (char *)buf);

    /* server: post one send as the reply */
    memset(buf, 0x7c, size);
    ((char *)buf)[size - 1] = '\0';  /* keep the reply printable with %s */
    struct ibv_sge list = {
        .addr = (uintptr_t) buf,
        .length = size,
        .lkey = ib_mr->lkey
    };
    struct ibv_send_wr wr = {
        .wr_id = PINGPONG_SEND_WRID,
        .sg_list = &list,
        .num_sge = 1,
        .opcode = IBV_WR_SEND,
        .send_flags = send_flags,
    };
    struct ibv_send_wr *bad_wr;

    ret = ibv_post_send(ib_qp, &wr, &bad_wr);
    if (ret != 0) {
        fprintf(stderr, "Couldn't post send\n");
        return 1;
    }
    printf("post send msg:%s\n", (char *)buf);

    /* server: wait for the send to complete */
    do {
        ne = ibv_poll_cq(ib_cq, 1, &wc);
        if (ne < 0) {
            fprintf(stderr, "poll CQ failed %d\n", ne);
            return 1;
        }
    } while (ne < 1);
    if (wc.status != IBV_WC_SUCCESS) {
        fprintf(stderr, "Failed status %s (%d) for wr_id %d\n",
                ibv_wc_status_str(wc.status),
                wc.status, (int)wc.wr_id);
        return 1;
    }
    if (wc.wr_id != PINGPONG_SEND_WRID) {
        fprintf(stderr, "Unexpected recv completion\n");
        return 1;
    }
    printf("send complete\n");
}
```

Releasing Resources

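This step releases the resources allocated in step 2 and closes the device opened in step 1. They are released in reverse order of creation. A QP must be destroyed before its CQ, and an MR must be deregistered before its PD is deallocated; otherwise the destroy calls fail because the resource is still in use: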

```cpp
if (ib_qp != nullptr && ibv_destroy_qp(ib_qp)) {
    fprintf(stderr, "Couldn't destroy QP\n");
    return 1;
}

if (ib_cq != nullptr && ibv_destroy_cq(ib_cq)) {
    fprintf(stderr, "Couldn't destroy CQ\n");
    return 1;
}

if (ib_mr != nullptr && ibv_dereg_mr(ib_mr)) {
    fprintf(stderr, "Couldn't deregister MR\n");
    return 1;
}

if (ib_pd != nullptr && ibv_dealloc_pd(ib_pd)) {
    fprintf(stderr, "Couldn't deallocate PD\n");
    return 1;
}

if (ib_ctx != nullptr && ibv_close_device(ib_ctx)) {
    fprintf(stderr, "Couldn't release context\n");
    return 1;
}
free(buf);

ibv_free_device_list(dev_list);

if (remote_ibaddr != nullptr) {
    free(remote_ibaddr);
}
```
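C++ already destroys class members in reverse declaration order, which is exactly the release order used above. The sketch below demonstrates this with a hypothetical FakeResource type that logs its own destruction. In real code, each member could instead hold the actual handle, for example in a std::unique_ptr<ibv_qp, decltype(&ibv_destroy_qp)>. That is an assumption about how one might wrap libibverbs, not what the original program does.

```cpp
#include <cassert>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for an ibverbs object: records its teardown in a log.
class FakeResource {
public:
    FakeResource(std::string name, std::vector<std::string> &log)
        : name_(std::move(name)), log_(log) {}
    FakeResource(const FakeResource &) = delete;
    FakeResource &operator=(const FakeResource &) = delete;
    ~FakeResource() { log_.push_back(name_); }

private:
    std::string name_;
    std::vector<std::string> &log_;
};

// Members are declared in creation order (ctx, pd, mr, cq, qp); C++ destroys
// them in reverse, so qp goes first and ctx last, matching the cleanup code.
struct Endpoint {
    FakeResource ctx, pd, mr, cq, qp;
    explicit Endpoint(std::vector<std::string> &log)
        : ctx("ctx", log), pd("pd", log), mr("mr", log),
          cq("cq", log), qp("qp", log) {}
};
```

With this layout, leaving the scope of an Endpoint releases everything correctly even on an early return, which is what the goto clear_and_ret pattern emulates by hand.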

Encapsulating the Code

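In the current code almost everything lives in main. To improve the structure, parts of it can be extracted into functions that can be reused in main or elsewhere. Posting Send/Recv requests, for example, can be wrapped like this: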

```cpp
static int post_send(void *buf, unsigned int size, ibv_mr *ib_mr, ibv_qp *ib_qp, int send_flags)
{
    int ret;
    struct ibv_sge list = {
        .addr = (uintptr_t) buf,
        .length = size,
        .lkey = ib_mr->lkey
    };
    struct ibv_send_wr wr = {
        .wr_id = PINGPONG_SEND_WRID,
        .sg_list = &list,
        .num_sge = 1,
        .opcode = IBV_WR_SEND,
        .send_flags = send_flags,
    };
    struct ibv_send_wr *bad_wr;

    ret = ibv_post_send(ib_qp, &wr, &bad_wr);
    if (ret != 0) {
        fprintf(stderr, "Couldn't post send\n");
        return ret;
    }
    return ret;
}

static int post_recv(void *buf, unsigned int size, ibv_mr *ib_mr, ibv_qp *ib_qp, int n)
{
    struct ibv_sge list = {
        .addr = (uintptr_t) buf,
        .length = size,
        .lkey = ib_mr->lkey
    };
    struct ibv_recv_wr wr = {
        .wr_id = PINGPONG_RECV_WRID,
        .sg_list = &list,
        .num_sge = 1,
    };
    struct ibv_recv_wr *bad_wr;
    int i;

    for (i = 0; i < n; ++i)
        if (ibv_post_recv(ib_qp, &wr, &bad_wr))
            break;

    return i;
}
```

Even with as much code as possible extracted into functions, writing RDMA code is still a fairly involved process, so the next step is to build a class:

```cpp
class RDMAEndpoint {
private:
    /* Endpoint related members */
    void* buf = nullptr;
    unsigned int size = -1;
    int txDepth = -1;
    int rxDepth = -1;
    std::unordered_map<int, std::queue<ibv_wc>> wcQueueMap;
    enum class EndpointStatus {
        INIT,
        CONN,
        FNLZ,
        FAIL
    } endpointStatus;
    enum class WorkRequestID {
        REP_RECV_WRID = 1,
        REP_SEND_WRID,
    };

    /* IB related members */
    int ibPort = 1;
    /* Flags for ibv_post_send. IBV_ACCESS_LOCAL_WRITE is an MR access flag,
     * not a send flag; sends must be signaled to produce completions. */
    int sendFlags = IBV_SEND_SIGNALED;
    int gidIdx = -1;
    ibv_context *ibCtx = nullptr;
    ibv_pd *ibPD = nullptr;
    ibv_mr *ibMR = nullptr;
    ibv_cq *ibCQ = nullptr;
    ibv_qp *ibQP = nullptr;
    struct ibAddrInfo {
        int lid;
        int qpn;
        int psn;
        union ibv_gid gid;
    } remoteIBAddr = {0}, localIBAddr = {0};

    void connect_qp();
    void wire_gid_to_gid(const char* wgid, union ibv_gid* gid);
    void gid_to_wire_gid(const union ibv_gid* gid, char wgid[]);
    void client_exch_ibaddr(const char* servername, int port);
    void server_exch_ibaddr(int port);

public:
    RDMAEndpoint(std::string deviceName, int gidIdx, void* buf,
                 unsigned int size, int txDepth, int rxDepth);

    void connectToPeer(std::string peerHost, int peerPort);

    void postSend();
    void postSend(int wrid);

    int postRecv(int n);
    int postRecv(int n, int wrId);

    void pollCompletion(int tag);
    void pollSendCompletion();
    void pollRecvCompletion();
};
```

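RDMAEndpoint sets up an Endpoint for RDMA communication on top of a buffer with a given address and size. The IB-related variables and the state used during communication are encapsulated as member variables, and the class offers three kinds of functionality:

  1. Initialization and connection setup

    • The constructor RDMAEndpoint opens the device, initializes the PD, MR, and QP, and moves the QP to INIT;
    • connectToPeer sets up the link, connecting the QPs of two Endpoints;
  2. Posting requests

    • postSend posts a send request; an optional argument specifies the request ID;
    • postRecv posts n receive requests; an optional argument specifies the request ID;
  3. Waiting for completions: pollCompletion, pollSendCompletion, pollRecvCompletion

    Poll the CQ until a request with the matching request ID completes; other completions pulled from the CQ along the way are stashed for later calls.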
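Below is a hedged model of the stash described in item 3. It uses a fake CQ instead of ibv_poll_cq, so it runs without hardware:

- Completion stands in for ibv_wc;
- FakeCQ::poll mimics ibv_poll_cq(cq, 1, &wc), returning 0 or 1;
- CompletionWaiter corresponds to the wcQueueMap idea.

All three types are hypothetical. A real version must also check wc.status, as the ping-pong code does.

```cpp
#include <cassert>
#include <cstdint>
#include <deque>
#include <queue>
#include <unordered_map>

// Hypothetical stand-in for ibv_wc.
struct Completion {
    uint64_t wr_id;
    int status;  // 0 = success in this model
};

// Hypothetical CQ: poll() returns 1 and fills *wc if a completion is
// available, 0 otherwise, like ibv_poll_cq with num_entries = 1.
struct FakeCQ {
    std::deque<Completion> pending;
    int poll(Completion *wc)
    {
        if (pending.empty())
            return 0;
        *wc = pending.front();
        pending.pop_front();
        return 1;
    }
};

// Return the first completion whose wr_id equals `tag`, stashing any other
// completions per wr_id so that later calls can pick them up.
class CompletionWaiter {
public:
    explicit CompletionWaiter(FakeCQ &cq) : cq_(cq) {}

    Completion wait(uint64_t tag)
    {
        auto it = stash_.find(tag);
        if (it != stash_.end() && !it->second.empty()) {
            Completion wc = it->second.front();  // already polled earlier
            it->second.pop();
            return wc;
        }
        for (;;) {                    // busy-poll like the do/while loops above
            Completion wc;
            if (cq_.poll(&wc) == 0)
                continue;
            if (wc.wr_id == tag)
                return wc;
            stash_[wc.wr_id].push(wc);  // not ours yet: keep it for later
        }
    }

private:
    FakeCQ &cq_;
    std::unordered_map<uint64_t, std::queue<Completion>> stash_;
};
```

The stash lets the class cope with completions arriving in an unexpected order, for example a receive completion polled while waiting for a send completion. In that case the v1.0 code above simply reports an error.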

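The RDMAEndpoint code is in pingpong–send-recv–v1.1/pingpong. Wrapping it in a class prepares for the following posts: other RDMA features can simply be added to the class, which avoids a lot of repetitive code.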