1、引入的库文件
#include <stdlib.h>
#include <string.h>
#include <stdio.h>
#include <errno.h>
#include <limits.h>
#include <sys/types.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <netdb.h>
#include <unistd.h>
#include <getopt.h>
#include <rdma/rdma_cma.h>
#include <infiniband/ib.h>
#include "common.h"
/*
 * Per-node state. run() walks test.nodes[0 .. connections-1] and joins the
 * multicast group once for each entry.
 */
struct cmatest_node {
/* NOTE(review): presumably the node's index in test.nodes; assigned outside this view. */
int id;
/* RDMA CM identifier used for the bind/resolve/join/leave calls on this node. */
struct rdma_cm_id *cma_id;
/* Set to 1 by join_handler() once the address handle has been created. */
int connected;
/* Protection domain; passed to ibv_create_ah() in join_handler(). */
struct ibv_pd *pd;
/* Completion queue. NOTE(review): created and polled outside this view. */
struct ibv_cq *cq;
/* Memory region. NOTE(review): presumably registered over `mem` - confirm in init code. */
struct ibv_mr *mr;
/* Address handle created in join_handler() from the join event's ah_attr. */
struct ibv_ah *ah;
/* QP number reported by the multicast join event (param->qp_num). */
uint32_t remote_qpn;
/* Q_Key reported by the multicast join event (param->qkey). */
uint32_t remote_qkey;
/* NOTE(review): presumably the message buffer; allocated outside this view. */
void *mem;
};
/* Global state of one test run; the single instance is the file-scope `test`. */
struct cmatest {
/* Event channel created in main(); read by connect_events() and cma_thread(). */
struct rdma_event_channel *channel;
/* Thread started by run() that executes cma_thread(). */
pthread_t cmathread;
/* Array of per-node state, indexed 0 .. connections-1 in run(). */
struct cmatest_node *nodes;
/* NOTE(review): not referenced in the visible code - confirm its use elsewhere. */
int conn_index;
/*
 * Initialised to `connections` in main() and decremented by join_handler()
 * on each successful join; connect_events() loops until it reaches zero.
 */
int connects_left;
/* Storage for the destination address filled in by get_dst_addr(). */
struct sockaddr_storage dst_in;
/* Points at dst_in (set in main()). */
struct sockaddr *dst_addr;
/* Storage for the source address filled in by get_addr() when -b is given. */
struct sockaddr_storage src_in;
/* Points at src_in only when -b was given; otherwise left NULL. */
struct sockaddr *src_addr;
};
/* Shared test state used by every function in this file. */
static struct cmatest test;
/* Number of nodes to join with (-c); also the initial test.connects_left. */
static int connections = 1;
/* Message size (-S). NOTE(review): consumed outside this view; units presumably bytes. */
static int message_size = 100;
/* Message count (-C); run() skips the data-transfer phase when this is 0. */
static int message_count = 10;
/* Non-zero with -s: run() posts sends instead of polling for receives. */
static int is_sender;
/* Non-zero with -o: addr_handler() joins as a send-only full member. */
static int send_only;
/* Cleared by -l. NOTE(review): consumed outside this view. */
static int loopback = 1;
/* Non-zero with -M: run() calls addr_handler() directly instead of rdma_resolve_addr(). */
static int unmapped_addr;
/* Destination (multicast) address string from -m or -M. */
static char *dst_addr;
/* Optional bind address string from -b. */
static char *src_addr;
/* Port space from -p, RDMA_PS_UDP by default. NOTE(review): consumed outside this view. */
static enum rdma_port_space port_space = RDMA_PS_UDP;
2、common.h文件分析
最后一个include是从本地导入的common.h文件。它主要说明:当use_rs为0时,使用传统的socket方式通信;当use_rs为1时,使用rsocket方式通信。文件里还包含一些相关的结构体声明和函数声明。
#include <stdlib.h>
#include <sys/types.h>
#include <endian.h>
#include <poll.h>
#include <rdma/rdma_cma.h>
#include <rdma/rsocket.h>
#include <infiniband/ib.h>
/* Defined in common.c; used in all rsocket demos to determine whether to use
* rsocket calls or standard socket calls.
* Zero selects the standard socket call; any non-zero value selects the
* rsocket variant (see rs_socket() and the rs_* macros).
*/
extern int use_rs;
/*
 * Create a socket through either the rsocket or the standard socket API.
 *
 * f, t, p: address family, socket type and protocol, forwarded unchanged.
 *
 * Returns the descriptor from socket() when use_rs is zero, otherwise the
 * descriptor from rsocket(). A failing rsocket() call is reported on stderr
 * before its (negative) result is returned; a failing socket() call is
 * returned without any message.
 */
static inline int rs_socket(int f, int t, int p)
{
	if (use_rs) {
		int rfd = rsocket(f, t, p);

		if (rfd >= 0)
			return rfd;

		/* ENODEV on a stream socket gets a dedicated, friendlier message. */
		if (t == SOCK_STREAM && errno == ENODEV)
			fprintf(stderr, "No RDMA devices were detected\n");
		else
			perror("rsocket failed");
		return rfd;
	}

	return socket(f, t, p);
}
/*
 * Dispatch wrappers: each rs_xxx() expands to the rsocket call when use_rs is
 * non-zero and to the matching standard socket call otherwise.
 *
 * Every expansion is wrapped in parentheses. Without them the conditional
 * operator binds more loosely than the surrounding expression, so e.g.
 * `rs_send(...) < 0` would parse as `use_rs ? rsend(...) : (send(...) < 0)`
 * and yield the raw rsend() result instead of the comparison.
 */
#define rs_bind(s,a,l)    (use_rs ? rbind(s,a,l)    : bind(s,a,l))
#define rs_listen(s,b)    (use_rs ? rlisten(s,b)    : listen(s,b))
#define rs_connect(s,a,l) (use_rs ? rconnect(s,a,l) : connect(s,a,l))
#define rs_accept(s,a,l)  (use_rs ? raccept(s,a,l)  : accept(s,a,l))
#define rs_shutdown(s,h)  (use_rs ? rshutdown(s,h)  : shutdown(s,h))
#define rs_close(s)       (use_rs ? rclose(s)       : close(s))
#define rs_recv(s,b,l,f)  (use_rs ? rrecv(s,b,l,f)  : recv(s,b,l,f))
#define rs_send(s,b,l,f)  (use_rs ? rsend(s,b,l,f)  : send(s,b,l,f))
#define rs_recvfrom(s,b,l,f,a,al) \
	(use_rs ? rrecvfrom(s,b,l,f,a,al) : recvfrom(s,b,l,f,a,al))
#define rs_sendto(s,b,l,f,a,al) \
	(use_rs ? rsendto(s,b,l,f,a,al) : sendto(s,b,l,f,a,al))
#define rs_poll(f,n,t)    (use_rs ? rpoll(f,n,t)    : poll(f,n,t))
#define rs_fcntl(s,c,p)   (use_rs ? rfcntl(s,c,p)   : fcntl(s,c,p))
#define rs_setsockopt(s,l,n,v,ol) \
	(use_rs ? rsetsockopt(s,l,n,v,ol) : setsockopt(s,l,n,v,ol))
#define rs_getsockopt(s,l,n,v,ol) \
	(use_rs ? rgetsockopt(s,l,n,v,ol) : getsockopt(s,l,n,v,ol))
/*
 * Overlay of the generic, IPv4 and IPv6 socket address structures, so one
 * object can hold any of the three.
 */
union socket_addr {
struct sockaddr sa;
struct sockaddr_in sin;
struct sockaddr_in6 sin6;
};
/*
 * Optimization selector.
 * NOTE(review): not referenced in the visible code; the names suggest a
 * choice between latency-oriented, bandwidth-oriented and mixed tuning -
 * confirm against the demos that use it.
 */
enum rs_optimization {
opt_mixed,
opt_latency,
opt_bandwidth
};
/*
 * Helper routines shared by the demos. Only the prototypes are visible here.
 * NOTE(review): presumably implemented in common.c alongside use_rs - confirm.
 */
int get_rdma_addr(const char *src, const char *dst, const char *port,
struct rdma_addrinfo *hints, struct rdma_addrinfo **rai);
void size_str(char *str, size_t ssize, long long size);
void cnt_str(char *str, size_t ssize, long long cnt);
int size_to_count(int size);
void format_buf(void *buf, int size);
int verify_buf(void *buf, int size);
int do_poll(struct pollfd *fds, int timeout);
/* main() treats a NULL return from this call as a fatal setup failure. */
struct rdma_event_channel *create_first_event_channel(void);
3、主函数main分析
/*
 * Parse the argument of a numeric option as a non-negative decimal int.
 *
 * Unlike atoi(), this rejects empty input, trailing garbage, negative values
 * and values that do not fit in an int. On any such input it prints a
 * diagnostic naming the option and terminates the program with status 1.
 */
static int parse_count_arg(int opt, const char *arg)
{
	char *end;
	long val;

	errno = 0;
	val = strtol(arg, &end, 10);
	if (end == arg || *end != '\0' || errno == ERANGE ||
	    val < 0 || val > INT_MAX) {
		fprintf(stderr, "mckey: invalid value '%s' for option -%c\n",
			arg, opt);
		exit(1);
	}
	return (int) val;
}

/*
 * Program entry point: parse the command line, set up the event channel and
 * the per-connection nodes, run the test and tear everything down.
 *
 * Returns the status reported by run(). Exits with status 1 on a usage error
 * or when the event channel or the nodes cannot be set up.
 */
int main(int argc, char **argv)
{
	int op, ret;

	/* Parse command-line options. */
	while ((op = getopt(argc, argv, "m:M:sb:c:C:S:p:ol")) != -1) {
		switch (op) {
		case 'm':
			dst_addr = optarg;
			break;
		case 'M':
			unmapped_addr = 1;
			dst_addr = optarg;
			break;
		case 's':
			is_sender = 1;
			break;
		case 'b':
			src_addr = optarg;
			/* Only a bound test gets a non-NULL source sockaddr. */
			test.src_addr = (struct sockaddr *) &test.src_in;
			break;
		case 'c':
			connections = parse_count_arg(op, optarg);
			break;
		case 'C':
			message_count = parse_count_arg(op, optarg);
			break;
		case 'S':
			message_size = parse_count_arg(op, optarg);
			break;
		case 'p':
			port_space = strtol(optarg, NULL, 0);
			break;
		case 'o':
			send_only = 1;
			break;
		case 'l':
			loopback = 0;
			break;
		default:
			printf("usage: %s\n", argv[0]);
			printf("\t-m multicast_address\n");
			printf("\t[-M unmapped_multicast_address]\n"
			       "\t replaces -m and requires -b\n");
			printf("\t[-s(ender)]\n");
			printf("\t[-b bind_address]\n");
			printf("\t[-c connections]\n");
			printf("\t[-C message_count]\n");
			printf("\t[-S message_size]\n");
			printf("\t[-p port_space - %#x for UDP (default), "
			       "%#x for IPOIB]\n", RDMA_PS_UDP, RDMA_PS_IPOIB);
			printf("\t[-o join as a send-only full-member]\n");
			printf("\t[-l join without multicast loopback]\n");
			exit(1);
		}
	}

	/* The usage text lists the multicast address as the one mandatory option. */
	if (!dst_addr) {
		fprintf(stderr,
			"mckey: a multicast address (-m or -M) is required\n");
		exit(1);
	}

	if (unmapped_addr && !src_addr) {
		printf("unmapped multicast address requires binding "
		       "to source address\n");
		exit(1);
	}

	/*
	 * The resolved destination lives in test.dst_in; connects_left counts
	 * the joins that still have to complete.
	 */
	test.dst_addr = (struct sockaddr *) &test.dst_in;
	test.connects_left = connections;

	/* Create the event channel that delivers the CM events. */
	test.channel = create_first_event_channel();
	if (!test.channel) {
		exit(1);
	}

	/* Allocate the per-connection nodes (alloc_nodes() is defined elsewhere in the file). */
	if (alloc_nodes())
		exit(1);

	ret = run();

	printf("test complete\n");
	destroy_nodes();
	rdma_destroy_event_channel(test.channel);

	printf("return status %d\n", ret);
	return ret;
}
4、alloc_nodes函数与run函数分析
alloc_nodes主要是为每个需要创建的连接分配结构体,用于存储相关信息(该函数的代码未在此处列出)。
run函数是测试的入口函数,代码如下:
/*
 * Drive one complete test: resolve the addresses, join the multicast group on
 * every node, wait for all joins, optionally transfer data, then leave the
 * group.
 *
 * Returns 0 on success. On failure returns the first error encountered; an
 * error from the join or transfer phase is no longer overwritten by the
 * result of the rdma_leave_multicast() calls made during cleanup.
 * Failures before or during the join loop return immediately without
 * running the leave loop.
 */
static int run(void)
{
	int i, ret, err;

	/* Store the source and destination addresses in test.src_in / test.dst_in. */
	printf("mckey: starting %s\n", is_sender ? "client" : "server");
	if (src_addr) {
		ret = get_addr(src_addr, (struct sockaddr *) &test.src_in);
		if (ret)
			return ret;
	}

	ret = get_dst_addr(dst_addr, (struct sockaddr *) &test.dst_in);
	if (ret)
		return ret;

	printf("mckey: joining\n");
	for (i = 0; i < connections; i++) {
		/* With a source address, bind the node's cma_id to it first. */
		if (src_addr) {
			ret = rdma_bind_addr(test.nodes[i].cma_id,
					     test.src_addr);
			if (ret) {
				perror("mckey: addr bind failure");
				connect_error();
				return ret;
			}
		}

		/*
		 * An unmapped address skips resolution and joins directly;
		 * otherwise start address resolution (2000 ms timeout) and let
		 * the ADDR_RESOLVED event trigger the join.
		 */
		if (unmapped_addr)
			ret = addr_handler(&test.nodes[i]);
		else
			ret = rdma_resolve_addr(test.nodes[i].cma_id,
						test.src_addr, test.dst_addr,
						2000);
		if (ret) {
			perror("mckey: resolve addr failure");
			connect_error();
			return ret;
		}
	}

	/* Process CM events until every node has joined or one has failed. */
	ret = connect_events();
	if (ret)
		goto out;

	/*
	 * Start the background thread that keeps draining the event channel.
	 * It only logs events, so a failure to start it is reported but does
	 * not abort the test. pthread_create() returns the error number
	 * directly rather than setting errno.
	 */
	err = pthread_create(&test.cmathread, NULL, cma_thread, NULL);
	if (err)
		fprintf(stderr, "mckey: failed to start event thread: %s\n",
			strerror(err));

	/*
	 * Pause to give SM chance to configure switches. We don't want to
	 * handle reliability issue in this simple test program.
	 */
	sleep(3);

	if (message_count) {
		if (is_sender) {
			/* Sender side: post the sends on every node. */
			printf("initiating data transfers\n");
			for (i = 0; i < connections; i++) {
				ret = post_sends(&test.nodes[i], 0);
				if (ret)
					goto out;
			}
		} else {
			/* Receiver side: wait for completions on the CQs. */
			printf("receiving data transfers\n");
			ret = poll_cqs();
			if (ret)
				goto out;
		}
		printf("data transfers complete\n");
	}
out:
	for (i = 0; i < connections; i++) {
		err = rdma_leave_multicast(test.nodes[i].cma_id,
					   test.dst_addr);
		if (err) {
			perror("mckey: failure leaving");
			/* Keep the earliest error as the function result. */
			if (!ret)
				ret = err;
		}
	}
	return ret;
}
5、addr_handler函数
先检验加入的地址是否合法,然后判断是不是服务器,如果是,则下发一个WR到RQ里面,然后配置组播的相关参数,最后加入到组播里面。
static int addr_handler(struct cmatest_node *node)
{
int ret;
struct rdma_cm_join_mc_attr_ex mc_attr;
// 检验当前参数是否合法
ret = verify_test_params(node);
if (ret)
goto err;
ret = init_node(node);
if (ret)
goto err;
// 表示如果不是client,那么就直接下发WR到RQ队列中
if (!is_sender) {
ret = post_recvs(node);
if (ret)
goto err;
}
mc_attr.comp_mask =
RDMA_CM_JOIN_MC_ATTR_ADDRESS | RDMA_CM_JOIN_MC_ATTR_JOIN_FLAGS;
mc_attr.addr = test.dst_addr;
mc_attr.join_flags = send_only ? RDMA_MC_JOIN_FLAG_SENDONLY_FULLMEMBER
: RDMA_MC_JOIN_FLAG_FULLMEMBER;
ret = rdma_join_multicast_ex(node->cma_id, &mc_attr, node);// 加入多播组
if (ret) {
perror("mckey: failure joining");
goto err;
}
return 0;
err:
connect_error();
return ret;
}
6、connect_events函数
根据从事件通道里获取到的事件,进入cma_handler函数做进一步处理:要么检查地址的合法性并加入多播组,要么创建AH保存对方节点的信息,以便后续通信。
/*
 * Pump CM events until every pending join has completed.
 *
 * Each event read from test.channel is handed to cma_handler() and then
 * acknowledged. The loop ends when test.connects_left reaches zero, or as
 * soon as fetching or handling an event fails.
 *
 * Returns 0 on success, otherwise the first non-zero status seen.
 */
static int connect_events(void)
{
	struct rdma_cm_event *ev;
	int status = 0;

	while (test.connects_left) {
		status = rdma_get_cm_event(test.channel, &ev);
		if (status)
			break;

		status = cma_handler(ev->id, ev);
		/* The event is acknowledged whether or not the handler succeeded. */
		rdma_ack_cm_event(ev);
		if (status)
			break;
	}

	return status;
}
7、cma_handler函数
获取event,然后判断event是什么事件,然后进行对应的处理,addr_handler就是先验证地址合法性,然后加入到多播组;而join_handler就是保存对方的AH信息(UD服务模式的通信是采用ah来选择路径和目的地址的),类似建立连接(但是不是建立连接)。
/*
 * Dispatch a single CM event for the node stored in cma_id->context.
 *
 * - ADDR_RESOLVED:   continue with addr_handler().
 * - MULTICAST_JOIN:  continue with join_handler(), passing the UD parameters.
 * - ADDR_ERROR, ROUTE_ERROR, MULTICAST_ERROR: log the event, call
 *   connect_error() and return the event's status.
 * - anything else (including DEVICE_REMOVAL): ignored.
 *
 * Returns the invoked handler's result, the event status for error events,
 * or 0 for ignored events.
 */
static int cma_handler(struct rdma_cm_id *cma_id, struct rdma_cm_event *event)
{
	switch (event->event) {
	case RDMA_CM_EVENT_ADDR_RESOLVED:
		return addr_handler(cma_id->context);

	case RDMA_CM_EVENT_MULTICAST_JOIN:
		return join_handler(cma_id->context, &event->param.ud);

	case RDMA_CM_EVENT_ADDR_ERROR:
	case RDMA_CM_EVENT_ROUTE_ERROR:
	case RDMA_CM_EVENT_MULTICAST_ERROR:
		printf("mckey: event: %s, error: %d\n",
		       rdma_event_str(event->event), event->status);
		connect_error();
		return event->status;

	case RDMA_CM_EVENT_DEVICE_REMOVAL:
		/* Cleanup will occur after test completes. */
		return 0;

	default:
		return 0;
	}

	return 0;
}
8、join_handler函数
根据加入多播组事件返回的远端配置信息(例如qpn、qkey等),通过ibv_create_ah函数创建一个AH(地址句柄)。
/*
 * Handle a successful multicast join for one node.
 *
 * Logs the joined group, stores the QP number and Q_Key from the event, and
 * creates the address handle from the event's ah_attr. On success the node
 * is marked connected and test.connects_left is decremented.
 *
 * node:  node whose join completed.
 * param: UD parameters carried by the join event.
 *
 * Returns 0 on success. If the address handle cannot be created,
 * connect_error() is called and -1 is returned.
 */
static int join_handler(struct cmatest_node *node,
			struct rdma_ud_param *param)
{
	char buf[INET6_ADDRSTRLEN];

	/*
	 * The 16-byte GID is printed in IPv6 notation. Fall back to a
	 * placeholder so that buf is never printed uninitialised if the
	 * conversion fails.
	 */
	if (!inet_ntop(AF_INET6, param->ah_attr.grh.dgid.raw, buf, sizeof buf))
		snprintf(buf, sizeof buf, "<unknown>");
	printf("mckey: joined dgid: %s mlid 0x%x sl %d\n", buf,
	       param->ah_attr.dlid, param->ah_attr.sl);

	node->remote_qpn = param->qp_num;
	node->remote_qkey = param->qkey;
	node->ah = ibv_create_ah(node->pd, &param->ah_attr);
	if (!node->ah) {
		printf("mckey: failure creating address handle\n");
		goto err;
	}

	node->connected = 1;
	test.connects_left--;
	return 0;
err:
	connect_error();
	return -1;
}
9、cma_thread函数
通过事件通道,获取对应的事件,然后通过判断是否是RDMA_CM_EVENT_MULTICAST_ERROR和RDMA_CM_EVENT_ADDR_CHANGE的错误,如果是,打印对应的事件和状态信息。最后都是生成一个ack回馈。
/*
 * Thread body that keeps draining test.channel.
 *
 * MULTICAST_ERROR and ADDR_CHANGE events are logged with their status; every
 * other event is silently discarded. All events are acknowledged. The loop
 * only ends when rdma_get_cm_event() fails, which is reported via perror().
 *
 * arg is unused. Always returns NULL.
 */
static void *cma_thread(void *arg)
{
	struct rdma_cm_event *ev;

	for (;;) {
		if (rdma_get_cm_event(test.channel, &ev)) {
			perror("rdma_get_cm_event");
			return NULL;
		}

		if (ev->event == RDMA_CM_EVENT_MULTICAST_ERROR ||
		    ev->event == RDMA_CM_EVENT_ADDR_CHANGE)
			printf("mckey: event: %s, status: %d\n",
			       rdma_event_str(ev->event), ev->status);

		rdma_ack_cm_event(ev);
	}
}
总结:
使用的是UD服务模式通信,把client的代码和server的代码放在了一起,通过is_sender来判断是client还是server。本文详细讲述了整个通信过程:首先解析命令行,然后把双方的地址放入对应的地址变量里,接着检验双方地址的合法性,之后加入到多播组并生成AH保存对方的节点信息,最后由client发送数据,接收端通过轮询cq队列判断是否接收成功。