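The Zhihu column "RDMA杂谈" (RDMA Notes) explains the basic concepts of RDMA and many important details in depth. Building on that theory, I want to write some code to put RDMA into practice, and I am collecting the learning process and the code into this blog series.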
Code repository: RDMA-In-Practice
Ping-Pong
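The rdma-core repository provides example ping-pong programs written with RDMA: https://github.com/linux-rdma/rdma-core/tree/master/libibverbs/examples . Reading them directly can be confusing, though. This post combines the explanations from the RDMA杂谈 column with those ping-pong examples to build an RDMA ping-pong program step by step.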
This article gives an outline of the flow for writing a simple RDMA program:

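The overall flow can be roughly divided into these stages:
- Query and open the device
- Create and initialize the PD, MR, CQ, and QP
- Connect the QPs through a socket or CM
- Send and receive data
- Destroy the created resources and close the device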
The rest of this post follows that order to build a simple Ping-Pong program piece by piece. The complete program is in pingpong–send-recv–v1.0.
Opening the IB device
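A program that communicates over RDMA with the libibverbs API needs an IB NIC, so it first queries the available IB devices and opens one. The program reads the device name ib_devname given by the user on the command line and picks the matching device from the device list. The code for this part is: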
```cpp
// Get the list of RDMA devices present on this host.
dev_list = ibv_get_device_list(&num_devices);
if (!dev_list) {
    perror("Failed to get IB devices list");
    return 1;
}

if (ib_devname.empty()) {
    // No device name given: use the first device in the list.
    ib_dev = dev_list[0];
    if (ib_dev == nullptr) {
        fprintf(stderr, "No IB devices found\n");
        return 1;
    }
} else {
    // Look for the device whose name matches the command-line argument.
    for (int i = 0; i < num_devices && dev_list[i] != nullptr; i++) {
        if (strcmp(ibv_get_device_name(dev_list[i]), ib_devname.c_str()) == 0) {
            ib_dev = dev_list[i];
            break;
        }
    }
    if (ib_dev == nullptr) {
        fprintf(stderr, "IB device %s not found\n", ib_devname.c_str());
        return 1;
    }
}

// Open the device; the returned context is used by all later verbs calls.
ib_ctx = ibv_open_device(ib_dev);
if (ib_ctx == nullptr) {
    fprintf(stderr, "Couldn't get context for %s\n", ibv_get_device_name(ib_dev));
    goto clear_and_ret;
}
fprintf(stdout, "Get context for %s\n", ibv_get_device_name(ib_dev));
```
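The text says ib_devname is read from the command line but does not show the parsing. Below is a minimal sketch using POSIX getopt(). The option letters (-d, -i, -g, -p), the Options struct, and the default TCP port 18515 are assumptions loosely modeled on the rdma-core ibv_rc_pingpong example, not necessarily what the repository does. A trailing non-option argument is taken as the server name, so leaving it out means "run as the server".

```cpp
#include <cassert>
#include <cstdlib>
#include <string>
#include <unistd.h>  // POSIX getopt(), optarg, optind

// Hypothetical option set (assumed, modeled on rdma-core's pingpong flags).
struct Options {
    std::string ib_devname;   // empty: use the first device in the list
    int ib_port = 1;          // HCA port number (ports are numbered from 1)
    int gid_idx = -1;         // -1: no GID, address the peer by LID only
    int port = 18515;         // TCP port used to exchange QP information
    std::string server_name;  // empty: act as server; otherwise connect to it
};

// Parse a decimal integer; rejects empty input and trailing garbage.
static bool parse_int(const char *s, int &out) {
    char *end = nullptr;
    long v = std::strtol(s, &end, 10);
    if (end == s || *end != '\0') return false;
    out = static_cast<int>(v);
    return true;
}

// Returns false on an unknown option, a missing argument, or a bad number.
static bool parse_options(int argc, char *argv[], Options &opt) {
    assert(argc >= 1 && argv != nullptr);
    optind = 1;  // restart scanning so the function can be called more than once
    int c;
    while ((c = getopt(argc, argv, "d:i:g:p:")) != -1) {
        switch (c) {
        case 'd': opt.ib_devname = optarg; break;
        case 'i': if (!parse_int(optarg, opt.ib_port)) return false; break;
        case 'g': if (!parse_int(optarg, opt.gid_idx)) return false; break;
        case 'p': if (!parse_int(optarg, opt.port)) return false; break;
        default:  return false;  // getopt returned '?' (or ':')
        }
    }
    if (optind < argc) opt.server_name = argv[optind];  // optional peer host
    return true;
}
```

For example, `pingpong -d mlx5_0 -g 3 192.168.1.2` would select device mlx5_0, GID index 3, and run as a client of 192.168.1.2.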
Initializing resources
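To communicate over RDMA, memory allocated in user space has to be registered with the RDMA NIC as a Memory Region; only after obtaining its handle can communication actually start. The complete flow for creating and registering the communication buffer is: first allocate user-space memory, then create a Protection Domain, and finally register the memory as a Memory Region under that PD.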
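```cpp
// Page-aligned buffer used for both sending and receiving.
int page_size = sysconf(_SC_PAGESIZE);
buf = aligned_alloc(page_size, size);
if (buf == nullptr) {
    fprintf(stderr, "Couldn't allocate work buf.\n");
    goto clear_and_ret;
}
memset(buf, 0x7b, size);
// Terminate the buffer so it can safely be printed with "%s" later.
((char *)buf)[size - 1] = '\0';

// Protection Domain: the MR and QP must belong to the same PD to work together.
ib_pd = ibv_alloc_pd(ib_ctx);
if (ib_pd == nullptr) {
    fprintf(stderr, "Couldn't allocate PD\n");
    goto clear_and_ret;
}

// Register the buffer. access_flags must include IBV_ACCESS_LOCAL_WRITE,
// because incoming messages are written into this buffer by the NIC.
ib_mr = ibv_reg_mr(ib_pd, buf, size, access_flags);
if (ib_mr == nullptr) {
    fprintf(stderr, "Couldn't register MR\n");
    goto clear_and_ret;
}
```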
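The allocation above calls aligned_alloc(page_size, size). In C11/C++17, aligned_alloc is only guaranteed to work when the size is a multiple of the alignment (glibc is typically more lenient), so a portable variant rounds the length up to whole pages. The sketch below assumes that approach; round_up and alloc_page_aligned are hypothetical helpers, and only the first `size` bytes still need to be passed to ibv_reg_mr().

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstdlib>
#include <cstring>
#include <unistd.h>  // sysconf

// Round n up to the next multiple of align (align must be a power of two,
// which holds for the system page size).
static size_t round_up(size_t n, size_t align) {
    assert(align != 0 && (align & (align - 1)) == 0);
    return (n + align - 1) & ~(align - 1);
}

// Allocate a zeroed, page-aligned buffer spanning whole pages.
// *alloc_len receives the rounded length actually allocated.
static void *alloc_page_aligned(size_t size, size_t *alloc_len) {
    if (size == 0) return nullptr;
    size_t page = static_cast<size_t>(sysconf(_SC_PAGESIZE));
    size_t len = round_up(size, page);
    void *p = std::aligned_alloc(page, len);
    if (p != nullptr) {
        std::memset(p, 0, len);
        *alloc_len = len;
    }
    return p;
}
```

The buffer is released with free() as before; rounding up only wastes the tail of the last page.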
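Because registration pins the buffer's pages in physical memory, and pinning works at page granularity, it is best to allocate page-aligned memory so that no pages shared with unrelated data get pinned. With the buffer registered with the RDMA NIC, communication also needs a Completion Queue and a Queue Pair. They are created as follows: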
```cpp
// One CQ shared by the send and receive queues: room for rx_depth
// receive completions plus the single outstanding send.
ib_cq = ibv_create_cq(ib_ctx, rx_depth + 1, NULL, NULL, 0);
if (ib_cq == nullptr) {
    fprintf(stderr, "Couldn't create CQ\n");
    goto clear_and_ret;
}

{
    struct ibv_qp_attr attr;
    struct ibv_qp_init_attr init_attr = {
        .send_cq = ib_cq,
        .recv_cq = ib_cq,
        .cap = {
            .max_send_wr = 1,
            .max_recv_wr = rx_depth,
            .max_send_sge = 1,
            .max_recv_sge = 1
        },
        .qp_type = IBV_QPT_RC
    };

    ib_qp = ibv_create_qp(ib_pd, &init_attr);
    if (ib_qp == nullptr) {
        fprintf(stderr, "Couldn't create QP\n");
        goto clear_and_ret;
    }

    // If the device can inline a message of this size, send it inline:
    // the payload is copied into the work request, so the buffer may be
    // reused as soon as ibv_post_send() returns.
    ibv_query_qp(ib_qp, &attr, IBV_QP_CAP, &init_attr);
    if (init_attr.cap.max_inline_data >= size)
        send_flags |= IBV_SEND_INLINE;
}
```
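Here the QP's Send Queue and Receive Queue share the same Completion Queue. max_send_wr and max_recv_wr are the maximum numbers of Send and Receive requests the QP can hold, and qp_type sets the QP type to RC (Reliable Connection). After the QP is created it has to be moved from RESET to INIT, and enough Receive requests should be posted in advance so that every incoming message finds a receive buffer (on an RC QP, a Send that arrives with no Receive posted is answered with an RNR (Receiver Not Ready) NAK and has to be retried by the sender):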
```cpp
{
    // RESET -> INIT: bind the QP to a port and P_Key index. Plain
    // Send/Recv needs no remote access rights, so qp_access_flags is 0.
    struct ibv_qp_attr attr = {
        .qp_state = IBV_QPS_INIT,
        .qp_access_flags = 0,
        .pkey_index = 0,
        .port_num = ib_port
    };

    if (ibv_modify_qp(ib_qp, &attr,
                      IBV_QP_STATE | IBV_QP_ACCESS_FLAGS | IBV_QP_PKEY_INDEX | IBV_QP_PORT)) {
        fprintf(stderr, "Failed to modify QP to INIT\n");
        perror("ibv_modify_qp");
        goto clear_and_ret;
    }
}

{
    // Pre-post rx_depth Receive requests (allowed in INIT), all on the same buffer.
    struct ibv_sge list = {
        .addr = (uintptr_t) buf,
        .length = size,
        .lkey = ib_mr->lkey
    };
    struct ibv_recv_wr wr = {
        .wr_id = PINGPONG_RECV_WRID,
        .sg_list = &list,
        .num_sge = 1,
    };
    struct ibv_recv_wr *bad_wr;
    int i;

    for (i = 0; i < rx_depth; ++i)
        if (ibv_post_recv(ib_qp, &wr, &bad_wr))
            break;

    routs = i;
    if (routs < rx_depth) {
        fprintf(stderr, "Couldn't post receive (%d)\n", routs);
        return 1;
    }
    printf("routs %d\n", routs);
}
```
Connecting the two QPs
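Connecting two QPs requires exchanging the address (LID), the QP number, the Packet Sequence Number (PSN), and the Global Identifier (GID). For Send operations there is no need to exchange a virtual address or Remote Key. First, query the information to be exchanged: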
```cpp
// Port attributes carry the LID and the link layer (InfiniBand or Ethernet/RoCE).
ret = ibv_query_port(ib_ctx, ib_port, &port_attr);
if (ret != 0) {
    fprintf(stderr, "Couldn't get port info\n");
    return 1;
}

local_ibaddr.lid = port_attr.lid;
// On InfiniBand a valid LID is required; RoCE relies on the GID instead.
if (port_attr.link_layer != IBV_LINK_LAYER_ETHERNET && !port_attr.lid) {
    fprintf(stderr, "Couldn't get local LID\n");
    return 1;
}

if (gid_idx >= 0) {
    ret = ibv_query_gid(ib_ctx, ib_port, gid_idx, &local_ibaddr.gid);
    if (ret != 0) {
        fprintf(stderr, "can't read sgid of index %d\n", gid_idx);
        return 1;
    }
} else {
    memset(&local_ibaddr.gid, 0, sizeof(local_ibaddr.gid));
}

local_ibaddr.qpn = ib_qp->qp_num;
local_ibaddr.psn = lrand48() & 0xffffff;  // PSNs are 24 bits wide
```
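Two details of the query step are worth spelling out. PSNs are 24-bit values, hence the `& 0xffffff`, and lrand48() is deterministic across runs unless it is seeded. The sketch below seeds it once from the PID and the current time, similar in spirit to the rdma-core examples. It also shows why a GID can be printed with inet_ntop(AF_INET6, ...): a GID has the same 16-byte layout as an IPv6 address, and for RoCE v2 over IPv4 it is an IPv4-mapped IPv6 address. random_psn and gid_to_string are hypothetical names, and the GID is passed as raw bytes to avoid depending on <infiniband/verbs.h>.

```cpp
#include <cassert>
#include <cstdint>
#include <cstdlib>
#include <ctime>
#include <string>
#include <arpa/inet.h>   // inet_ntop
#include <netinet/in.h>  // INET6_ADDRSTRLEN
#include <sys/socket.h>  // AF_INET6
#include <unistd.h>      // getpid

// Random 24-bit PSN; the generator is seeded once per process.
static uint32_t random_psn() {
    static bool seeded = false;
    if (!seeded) {
        srand48(static_cast<long>(getpid()) ^ static_cast<long>(time(nullptr)));
        seeded = true;
    }
    return static_cast<uint32_t>(lrand48()) & 0xffffff;
}

// Format a 16-byte GID the same way an IPv6 address is formatted.
static std::string gid_to_string(const uint8_t gid[16]) {
    char buf[INET6_ADDRSTRLEN];
    if (inet_ntop(AF_INET6, gid, buf, sizeof buf) == nullptr) return "";
    return buf;
}
```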
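Next come setting up the socket connection, exchanging the information, and changing the QP state again. Note that to guarantee the receiver's QP is already in the Ready-To-Receive (RTR) state when the sender sends its message, the receiver can transition its own QP first during the exchange and only then send its information to the sender: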
```cpp
inet_ntop(AF_INET6, &local_ibaddr.gid, gid_str, sizeof gid_str);
printf("  local address:  LID 0x%04x, QPN 0x%06x, PSN 0x%06x, GID %s\n",
       local_ibaddr.lid, local_ibaddr.qpn, local_ibaddr.psn, gid_str);

if (server_name.empty()) {
    // Server: receives the client's info, connects its own QP, then replies.
    remote_ibaddr = server_exch_ibaddr(port, ib_port, gid_idx, ib_qp, &local_ibaddr);
} else {
    // Client: connects to the server and exchanges info with it.
    remote_ibaddr = client_exch_ibaddr(server_name.c_str(), port, &local_ibaddr);
}
if (remote_ibaddr == nullptr)
    return 1;

inet_ntop(AF_INET6, &remote_ibaddr->gid, gid_str, sizeof gid_str);
printf("  remote address: LID 0x%04x, QPN 0x%06x, PSN 0x%06x, GID %s\n",
       remote_ibaddr->lid, remote_ibaddr->qpn, remote_ibaddr->psn, gid_str);
```
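The post does not show the socket code inside server_exch_ibaddr() and client_exch_ibaddr(). One simple way to put the LID, QPN, PSN, and GID on the wire is a fixed-size text message; the rdma-core pingpong examples use the layout `"%04x:%06x:%06x:"` followed by the GID as 32 hex digits. The sketch below covers only encoding and decoding under that assumption. IbAddr, encode(), and decode() are hypothetical names; the GID is printed byte by byte, which should yield the same hex string as rdma-core's gid_to_wire_gid().

```cpp
#include <cassert>
#include <cstdint>
#include <cstdio>
#include <cstring>
#include <string>

// Hypothetical plain-data mirror of ibaddr_info; GID kept as raw bytes.
struct IbAddr {
    unsigned int lid;  // 16-bit value in practice
    unsigned int qpn;  // 24-bit
    unsigned int psn;  // 24-bit
    uint8_t gid[16];
};

// "LLLL:QQQQQQ:PPPPPP:" + 32 hex digits; 52 bytes including the NUL,
// so each side can write and read exactly kMsgSize bytes.
constexpr size_t kMsgSize = sizeof("0000:000000:000000:00000000000000000000000000000000");

// Returns an empty string if a field does not fit its fixed width.
static std::string encode(const IbAddr &a) {
    char gid_hex[33];
    for (int i = 0; i < 16; ++i)
        std::snprintf(gid_hex + 2 * i, 3, "%02x", static_cast<unsigned>(a.gid[i]));
    char msg[kMsgSize];
    int n = std::snprintf(msg, sizeof msg, "%04x:%06x:%06x:%s", a.lid, a.qpn, a.psn, gid_hex);
    if (n != static_cast<int>(kMsgSize - 1)) return "";
    return msg;
}

// Parse a message produced by encode(); returns false if it is malformed.
static bool decode(const std::string &msg, IbAddr &a) {
    char gid_hex[33] = {0};
    if (std::sscanf(msg.c_str(), "%x:%x:%x:%32s", &a.lid, &a.qpn, &a.psn, gid_hex) != 4)
        return false;
    if (std::strlen(gid_hex) != 32) return false;
    for (int i = 0; i < 16; ++i) {
        unsigned int byte;
        if (std::sscanf(gid_hex + 2 * i, "%2x", &byte) != 1) return false;
        a.gid[i] = static_cast<uint8_t>(byte);
    }
    return true;
}
```

A text format sidesteps endianness and struct-padding differences between the two hosts, at the cost of a few bytes.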
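Setting up the socket connection and exchanging the information is ordinary socket programming, so only the step that connects the QPs after the information has been received is shown here: using the received information, the QP is moved first to RTR and then to RTS: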
```cpp
static int connect_qp(const ibaddr_info *remote_ibaddr, const ibaddr_info *local_ibaddr,
                      int ib_port, int gid_idx, ibv_qp *ib_qp)
{
    // INIT -> RTR: describe the remote QP and the path to it.
    ibv_qp_attr attr = {
        .qp_state = IBV_QPS_RTR,
        .path_mtu = IBV_MTU_1024,
        .rq_psn = remote_ibaddr->psn,      // first PSN expected from the peer
        .dest_qp_num = remote_ibaddr->qpn,
        .ah_attr = {
            .dlid = remote_ibaddr->lid,
            .sl = 0,
            .src_path_bits = 0,
            .is_global = 0,
            .port_num = ib_port
        },
        .max_dest_rd_atomic = 1,
        .min_rnr_timer = 12
    };

    // A non-zero remote GID means a Global Route Header is used (required for RoCE).
    if (remote_ibaddr->gid.global.interface_id) {
        attr.ah_attr.is_global = 1;
        attr.ah_attr.grh.hop_limit = 1;
        attr.ah_attr.grh.dgid = remote_ibaddr->gid;
        attr.ah_attr.grh.sgid_index = gid_idx;
    }
    if (ibv_modify_qp(ib_qp, &attr,
                      IBV_QP_STATE | IBV_QP_AV | IBV_QP_PATH_MTU | IBV_QP_DEST_QPN |
                      IBV_QP_RQ_PSN | IBV_QP_MAX_DEST_RD_ATOMIC | IBV_QP_MIN_RNR_TIMER)) {
        fprintf(stderr, "Failed to modify QP to RTR\n");
        return 1;
    }

    // RTR -> RTS: configure the send side (timeouts, retries, own PSN).
    attr.qp_state = IBV_QPS_RTS;
    attr.timeout = 14;
    attr.retry_cnt = 7;
    attr.rnr_retry = 7;                // 7 means retry indefinitely on RNR NAK
    attr.sq_psn = local_ibaddr->psn;   // must match the rq_psn the peer configured
    attr.max_rd_atomic = 1;
    if (ibv_modify_qp(ib_qp, &attr,
                      IBV_QP_STATE | IBV_QP_TIMEOUT | IBV_QP_RETRY_CNT | IBV_QP_RNR_RETRY |
                      IBV_QP_SQ_PSN | IBV_QP_MAX_QP_RD_ATOMIC)) {
        fprintf(stderr, "Failed to modify QP to RTS\n");
        return 1;
    }
    return 0;
}
```
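connect_qp() follows the QP state machine: on the path used here a QP moves RESET, INIT, RTR, RTS in that order, which is why the receive side (RTR) is configured before the send side (RTS). The sketch below is a deliberately simplified model of only these forward transitions; the real verbs state machine also has SQD, SQE, and ERR, allows some same-state modifications, and lets any state move to RESET or ERR. QpState, is_valid_transition, and walk are hypothetical names used only to make the ordering explicit.

```cpp
#include <cassert>
#include <initializer_list>

// Simplified subset of the RC QP states used in this program.
enum class QpState { RESET, INIT, RTR, RTS };

// Forward transitions the program relies on: RESET->INIT during setup,
// then INIT->RTR and RTR->RTS inside connect_qp().
static bool is_valid_transition(QpState from, QpState to) {
    switch (from) {
    case QpState::RESET: return to == QpState::INIT;
    case QpState::INIT:  return to == QpState::RTR;
    case QpState::RTR:   return to == QpState::RTS;
    case QpState::RTS:   return false;  // steady state in this model
    }
    return false;
}

// Start from RESET and check that every step along `path` is allowed.
static bool walk(std::initializer_list<QpState> path) {
    QpState cur = QpState::RESET;
    for (QpState next : path) {
        if (!is_valid_transition(cur, next)) return false;
        cur = next;
    }
    return true;
}
```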
From this point on, requests can be posted to the QPs to communicate.
Communication
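The code in this part is fairly straightforward. The client first posts a Send request, waits for it to complete, then waits for the server's reply and exits once the reply arrives: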
```cpp
if (!server_name.empty()) {
    // Client: the server's QP is already in RTR, so connect ours now.
    if (connect_qp(remote_ibaddr, &local_ibaddr, ib_port, gid_idx, ib_qp))
        return -1;

    struct ibv_sge list = {
        .addr = (uintptr_t) buf,
        .length = size,
        .lkey = ib_mr->lkey
    };
    struct ibv_send_wr wr = {
        .wr_id = PINGPONG_SEND_WRID,
        .sg_list = &list,
        .num_sge = 1,
        .opcode = IBV_WR_SEND,
        // Must include IBV_SEND_SIGNALED, otherwise no Send completion is
        // generated and the poll loop below never finishes.
        .send_flags = send_flags,
    };
    struct ibv_send_wr *bad_wr;

    ret = ibv_post_send(ib_qp, &wr, &bad_wr);
    if (ret != 0) {
        fprintf(stderr, "Couldn't post send\n");
        return 1;
    }
    printf("post send msg:%s\n", (char *)buf);

    // Busy-poll the CQ until the Send completion arrives.
    int ne;
    ibv_wc wc;
    do {
        ne = ibv_poll_cq(ib_cq, 1, &wc);
        if (ne < 0) {
            fprintf(stderr, "poll CQ failed %d\n", ne);
            return 1;
        }
    } while (ne < 1);
    if (wc.status != IBV_WC_SUCCESS) {
        fprintf(stderr, "Failed status %s (%d) for wr_id %d\n",
                ibv_wc_status_str(wc.status), wc.status, (int)wc.wr_id);
        return 1;
    }
    if (wc.wr_id != PINGPONG_SEND_WRID) {
        fprintf(stderr, "Unexpected recv completion %d\n", (int)wc.wr_id);
        return 1;
    }
    printf("send complete\n");

    // Then poll until the server's reply (a Receive completion) arrives.
    do {
        ne = ibv_poll_cq(ib_cq, 1, &wc);
        if (ne < 0) {
            fprintf(stderr, "poll CQ failed %d\n", ne);
            return 1;
        }
    } while (ne < 1);
    if (wc.status != IBV_WC_SUCCESS) {
        fprintf(stderr, "Failed status %s (%d) for wr_id %d\n",
                ibv_wc_status_str(wc.status), wc.status, (int)wc.wr_id);
        return 1;
    }
    if (wc.wr_id != PINGPONG_RECV_WRID) {
        fprintf(stderr, "Unexpected send completion\n");
        return 1;
    }
    printf("recv msg:%s\n", (char *)buf);
}
```
The server does the opposite: after receiving the sender's message it sends back a reply and then exits immediately:
```cpp
else {
    // Server: its QP was connected during the information exchange.
    int ne;
    ibv_wc wc;

    // Wait for the client's message (a Receive completion).
    do {
        ne = ibv_poll_cq(ib_cq, 1, &wc);
        if (ne < 0) {
            fprintf(stderr, "poll CQ failed %d\n", ne);
            return 1;
        }
    } while (ne < 1);
    if (wc.status != IBV_WC_SUCCESS) {
        fprintf(stderr, "Failed status %s (%d) for wr_id %d\n",
                ibv_wc_status_str(wc.status), wc.status, (int)wc.wr_id);
        return 1;
    }
    if (wc.wr_id != PINGPONG_RECV_WRID) {
        fprintf(stderr, "Unexpected send completion\n");
        return 1;
    }
    printf("recv msg:%s\n", (char *)buf);

    // Reuse the buffer for the reply and send it back.
    memset(buf, 0x7c, size);
    ((char *)buf)[size - 1] = '\0';  // keep the reply printable with "%s"
    struct ibv_sge list = {
        .addr = (uintptr_t) buf,
        .length = size,
        .lkey = ib_mr->lkey
    };
    struct ibv_send_wr wr = {
        .wr_id = PINGPONG_SEND_WRID,
        .sg_list = &list,
        .num_sge = 1,
        .opcode = IBV_WR_SEND,
        .send_flags = send_flags,
    };
    struct ibv_send_wr *bad_wr;

    ret = ibv_post_send(ib_qp, &wr, &bad_wr);
    if (ret != 0) {
        fprintf(stderr, "Couldn't post send\n");
        return 1;
    }
    printf("post send msg:%s\n", (char *)buf);

    // Wait for the Send completion before exiting.
    do {
        ne = ibv_poll_cq(ib_cq, 1, &wc);
        if (ne < 0) {
            fprintf(stderr, "poll CQ failed %d\n", ne);
            return 1;
        }
    } while (ne < 1);
    if (wc.status != IBV_WC_SUCCESS) {
        fprintf(stderr, "Failed status %s (%d) for wr_id %d\n",
                ibv_wc_status_str(wc.status), wc.status, (int)wc.wr_id);
        return 1;
    }
    if (wc.wr_id != PINGPONG_SEND_WRID) {
        fprintf(stderr, "Unexpected recv completion\n");
        return 1;
    }
    printf("send complete\n");
}
```
Releasing resources
This step releases the resources allocated in the second stage and closes the device opened in the first stage:
```cpp
// Release in reverse order of creation: the QP before the CQ it uses,
// the MR before the PD it belongs to, and the context last.
if (ib_qp != nullptr && ibv_destroy_qp(ib_qp)) {
    fprintf(stderr, "Couldn't destroy QP\n");
    return 1;
}
if (ib_cq != nullptr && ibv_destroy_cq(ib_cq)) {
    fprintf(stderr, "Couldn't destroy CQ\n");
    return 1;
}
if (ib_mr != nullptr && ibv_dereg_mr(ib_mr)) {
    fprintf(stderr, "Couldn't deregister MR\n");
    return 1;
}
if (ib_pd != nullptr && ibv_dealloc_pd(ib_pd)) {
    fprintf(stderr, "Couldn't deallocate PD\n");
    return 1;
}
if (ib_ctx != nullptr && ibv_close_device(ib_ctx)) {
    fprintf(stderr, "Couldn't release context\n");
    return 1;
}
free(buf);  // safe only after the MR covering it has been deregistered

ibv_free_device_list(dev_list);

if (remote_ibaddr != nullptr) {
    free(remote_ibaddr);
}
```
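The setup code bails out with `goto clear_and_ret`, and teardown has to run in reverse order of creation. In C++ an alternative is a small cleanup stack that runs registered release callbacks in LIFO order when it goes out of scope; it also avoids the C++ rule that a goto may not jump past an initialized declaration. CleanupStack is a hypothetical helper, not part of libibverbs, and unlike the explicit code above its callbacks do not check the return values of the ibv_* destroy functions.

```cpp
#include <cassert>
#include <functional>
#include <utility>
#include <vector>

// Runs registered release callbacks in reverse registration order.
class CleanupStack {
public:
    CleanupStack() = default;
    CleanupStack(const CleanupStack &) = delete;
    CleanupStack &operator=(const CleanupStack &) = delete;
    ~CleanupStack() { run(); }

    // Register a callback right after the resource it releases was created.
    void push(std::function<void()> fn) { fns_.push_back(std::move(fn)); }

    // Run all pending callbacks, most recently registered first.
    void run() {
        while (!fns_.empty()) {
            std::function<void()> fn = std::move(fns_.back());
            fns_.pop_back();
            fn();
        }
    }

private:
    std::vector<std::function<void()>> fns_;
};
```

In the program, each callback would be pushed right after a successful creation call, e.g. `cleanup.push([&] { ibv_dealloc_pd(ib_pd); });` after ibv_alloc_pd(), so a failure at any step releases exactly the resources that already exist, in the right order.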
Encapsulating the code
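In the current code almost everything lives in main. To improve the structure, parts of it can be extracted into functions that can be reused from main or elsewhere. Posting Send and Recv requests, for example, can be wrapped into functions: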
```cpp
// Post one Send covering buf[0, size); returns 0 on success.
static int post_send(void *buf, unsigned int size, ibv_mr *ib_mr, ibv_qp *ib_qp, int send_flags)
{
    struct ibv_sge list = {
        .addr = (uintptr_t) buf,
        .length = size,
        .lkey = ib_mr->lkey
    };
    struct ibv_send_wr wr = {
        .wr_id = PINGPONG_SEND_WRID,
        .sg_list = &list,
        .num_sge = 1,
        .opcode = IBV_WR_SEND,
        .send_flags = send_flags,
    };
    struct ibv_send_wr *bad_wr;

    int ret = ibv_post_send(ib_qp, &wr, &bad_wr);
    if (ret != 0)
        fprintf(stderr, "Couldn't post send\n");
    return ret;
}

// Post up to n Receive requests on the same buffer; returns how many succeeded.
static int post_recv(void *buf, unsigned int size, ibv_mr *ib_mr, ibv_qp *ib_qp, int n)
{
    struct ibv_sge list = {
        .addr = (uintptr_t) buf,
        .length = size,
        .lkey = ib_mr->lkey
    };
    struct ibv_recv_wr wr = {
        .wr_id = PINGPONG_RECV_WRID,
        .sg_list = &list,
        .num_sge = 1,
    };
    struct ibv_recv_wr *bad_wr;
    int i;

    for (i = 0; i < n; ++i)
        if (ibv_post_recv(ib_qp, &wr, &bad_wr))
            break;
    return i;
}
```
Even with as much code as possible pulled out into functions, writing RDMA code is still quite involved, so I go one step further here and build a class:
```cpp
#include <infiniband/verbs.h>
#include <queue>
#include <string>
#include <unordered_map>

class RDMAEndpoint {
private:
    void* buf = nullptr;
    unsigned int size = -1;
    int txDepth = -1;
    int rxDepth = -1;
    // Completions polled while waiting for another wr_id, keyed by wr_id.
    std::unordered_map<int, std::queue<ibv_wc>> wcQueueMap;
    enum class EndpointStatus { INIT, CONN, FNLZ, FAIL } endpointStatus;
    enum class WorkRequestID {
        REP_RECV_WRID = 1,
        REP_SEND_WRID,
    };

    int ibPort = 1;
    // Send flags (not memory access flags): IBV_SEND_SIGNALED makes each
    // Send produce a completion that pollSendCompletion() can wait for.
    int sendFlags = IBV_SEND_SIGNALED;
    int gidIdx = -1;
    ibv_context *ibCtx = nullptr;
    ibv_pd *ibPD = nullptr;
    ibv_mr *ibMR = nullptr;
    ibv_cq *ibCQ = nullptr;
    ibv_qp *ibQP = nullptr;
    struct ibAddrInfo {
        int lid;
        int qpn;
        int psn;
        union ibv_gid gid;
    } remoteIBAddr = {0}, localIBAddr = {0};

    void connect_qp();
    void wire_gid_to_gid(const char* wgid, union ibv_gid* gid);
    void gid_to_wire_gid(const union ibv_gid* gid, char wgid[]);
    void client_exch_ibaddr(const char* servername, int port);
    void server_exch_ibaddr(int port);

public:
    RDMAEndpoint(std::string deviceName, int gidIdx, void* buf, unsigned int size,
                 int txDepth, int rxDepth);

    void connectToPeer(std::string peerHost, int peerPort);

    void postSend();
    void postSend(int wrid);

    int postRecv(int n);
    int postRecv(int n, int wrId);

    void pollCompletion(int tag);
    void pollSendCompletion();
    void pollRecvCompletion();
};
```
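The main job of RDMAEndpoint is to set up an endpoint for RDMA communication on top of a buffer with a given address and size. Inside RDMAEndpoint, the IB-related variables and the state used during communication are kept as member variables, and the class exposes three kinds of functionality: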
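- Initialization and connection setup
  - The constructor RDMAEndpoint opens the device, initializes the PD, MR, CQ, and QP, and moves the QP to the INIT state;
  - connectToPeer establishes the connection, linking the QPs of two endpoints;
- Posting requests
  - postSend posts a Send request; an overload takes the request ID as a parameter;
  - postRecv posts n Receive requests; an overload takes the request ID as a parameter;
- Waiting for completions: pollCompletion, pollSendCompletion, and pollRecvCompletion poll the CQ until a request with the corresponding ID completes; other completions pulled out along the way are stashed for later calls.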
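The stashing behavior of pollCompletion() can be sketched independently of libibverbs. In the sketch below, poll_one stands in for `ibv_poll_cq(cq, 1, &wc)` (1 when a completion was returned, 0 when the CQ is empty, negative on error), Completion stands in for struct ibv_wc, and CompletionWaiter is a hypothetical name. With this approach the client no longer depends on its Send completion being polled before the Receive completion of the server's reply.

```cpp
#include <cassert>
#include <cstdint>
#include <deque>
#include <functional>
#include <queue>
#include <unordered_map>
#include <utility>

// Stand-in for struct ibv_wc with only the fields used here.
struct Completion {
    uint64_t wr_id;
    int status;  // 0 means success (IBV_WC_SUCCESS is also 0)
};

class CompletionWaiter {
public:
    explicit CompletionWaiter(std::function<int(Completion &)> poll_one)
        : poll_one_(std::move(poll_one)) {}

    // Busy-poll until a completion with the wanted wr_id is available.
    // Completions for other wr_ids are stashed and served by later calls.
    bool wait_for(uint64_t wanted, Completion &out) {
        auto it = stash_.find(wanted);
        if (it != stash_.end() && !it->second.empty()) {
            out = it->second.front();
            it->second.pop();
            return true;
        }
        for (;;) {
            Completion wc{};
            int ne = poll_one_(wc);
            if (ne < 0) return false;  // polling error
            if (ne == 0) continue;     // CQ empty, keep polling
            if (wc.wr_id == wanted) { out = wc; return true; }
            stash_[wc.wr_id].push(wc);
        }
    }

private:
    std::function<int(Completion &)> poll_one_;
    std::unordered_map<uint64_t, std::queue<Completion>> stash_;
};
```

In RDMAEndpoint, the stash corresponds to wcQueueMap.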
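The RDMAEndpoint code is in pingpong–send-recv–v1.1/pingpong. Wrapping it in a class prepares for the code that follows: when implementing other RDMA features later, they can simply be added to this class, which saves a lot of repeated code.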