头部信息
#define _GNU_SOURCE
#include <endian.h>
#include <getopt.h>
#include <stdlib.h>
#include <string.h>
#include <stdio.h>
#include <errno.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netdb.h>
#include <semaphore.h>
#include <pthread.h>
#include <inttypes.h>
#include <limits.h>
#include <rdma/rdma_cma.h>
#include "common.h"
/* Debug output switch; main() increments it for every -d option. */
static int debug = 0;

/*
 * printf-style logging that only prints while debug is non-zero.
 *
 * The body is wrapped in do { } while (0) so the macro expands to exactly one
 * statement: a bare "if (debug) printf" would capture an `else` that follows
 * the macro call in the caller.
 */
#define DEBUG_LOG(...) \
	do { \
		if (debug) \
			printf(__VA_ARGS__); \
	} while (0)
/*
* rping "ping/pong" loop:
* client sends source rkey/addr/len
* server receives source rkey/addr/len
* server rdma reads "ping" data from source
* server sends "go ahead" on rdma read completion
* client sends sink rkey/addr/len
* server receives sink rkey/addr/len
* server rdma writes "pong" data to sink
* server sends "go ahead" on rdma write completion
* <repeat loop>
*/
/*
* These states are used to signal events between the completion handler
* and the main client or server thread.
*
* Once CONNECTED, they cycle through RDMA_READ_ADV, RDMA_WRITE_ADV,
* and RDMA_WRITE_COMPLETE for each ping.
*/
/*
 * State values stored in rping_cb.state.
 * main() initialises a control block to IDLE, and the persistent server loop
 * expects CONNECT_REQUEST each time it is woken through the semaphore.
 */
enum test_state {
IDLE = 1, /* numbering starts at 1, so a zeroed control block holds no valid state */
CONNECT_REQUEST,
ADDR_RESOLVED,
ROUTE_RESOLVED,
CONNECTED,
RDMA_READ_ADV,
RDMA_READ_COMPLETE,
RDMA_WRITE_ADV,
RDMA_WRITE_COMPLETE,
DISCONNECTED,
ERROR
};
/*
 * RDMA buffer description record. All three fields use big-endian types.
 * struct rping_cb embeds one instance as recv_buf and one as send_buf.
 * NOTE(review): presumably this is the rkey/addr/len message described in the
 * ping/pong protocol comment - confirm against the send/recv code.
 */
struct rping_rdma_info {
__be64 buf; /* presumably the advertised buffer address - confirm */
__be32 rkey; /* presumably the key for remote access to that buffer - confirm */
__be32 size; /* presumably the buffer length in bytes - confirm */
};
/*
 * Default max buffer size for IO...
 * Parenthesized so the value stays intact inside larger expressions
 * (for example next to '/', '%' or '*').
 */
#define RPING_BUFSIZE (64 * 1024)
#define RPING_SQ_DEPTH 16
/* Default string for print data and
 * minimum buffer size
 */
#define _stringify( _x ) # _x
#define stringify( _x ) _stringify( _x )
#define RPING_MSG_FMT "rdma-ping-%d: "
/*
 * Room for the message prefix plus the decimal/hex text of INT_MAX; both
 * sizeof terms include a terminating NUL. INT_MAX must be visible as a macro
 * (from <limits.h>) or the literal text "INT_MAX" is measured instead.
 */
#define RPING_MIN_BUFSIZE \
	(sizeof(stringify(INT_MAX)) + sizeof(RPING_MSG_FMT))
/*
* Control block struct.
*
* One instance is allocated by main(); the persistent server clones the
* listening control block for every incoming connection.
*/
struct rping_cb {
int server; /* -1 until a role is chosen, 0 for client (-c), 1 for server (-s) */
pthread_t cqthread; /* thread running cq_thread */
pthread_t persistent_server_thread; /* per-connection thread of the persistent server */
struct ibv_comp_channel *channel;
struct ibv_cq *cq;
struct ibv_pd *pd;
struct ibv_qp *qp;
struct ibv_recv_wr rq_wr; /* recv work request record */
struct ibv_sge recv_sgl; /* recv single SGE */
struct rping_rdma_info recv_buf;/* receive record, embedded here (not separately allocated) */
struct ibv_mr *recv_mr; /* MR associated with this buffer */
struct ibv_send_wr sq_wr; /* send work request record */
struct ibv_sge send_sgl;
struct rping_rdma_info send_buf;/* single send buf */
struct ibv_mr *send_mr;
struct ibv_send_wr rdma_sq_wr; /* rdma work request record */
struct ibv_sge rdma_sgl; /* rdma single SGE */
char *rdma_buf; /* used as rdma sink */
struct ibv_mr *rdma_mr;
uint32_t remote_rkey; /* remote side's rkey */
uint64_t remote_addr; /* remote side's buffer address */
uint32_t remote_len; /* remote side's buffer length */
char *start_buf; /* rdma read src */
struct ibv_mr *start_mr;
enum test_state state; /* used for cond/signalling */
sem_t sem; /* waited on by the main/server threads together with state */
struct sockaddr_storage sin; /* address given with -a */
struct sockaddr_storage ssource; /* address given with -I */
__be16 port; /* dst port in NBO (default 7174, overridden by -p) */
int verbose; /* verbose logging */
int self_create_qp; /* Create QP not via cma */
int count; /* ping count */
int size; /* ping data size */
int validate; /* validate ping data */
/* CM stuff */
pthread_t cmthread; /* thread running cm_thread */
struct rdma_event_channel *cm_channel;
struct rdma_cm_id *cm_id; /* connection on client side,*/
/* listener on service side. */
struct rdma_cm_id *child_cm_id; /* connection on server side */
};
main函数
一开始解析命令参数,提取需要的参数信息。
/*
 * Program entry point.
 *
 * Parses the command line into a freshly allocated control block, creates the
 * CM event channel and cm_id, starts the CM event thread and then runs as
 * client, server or persistent server depending on the options.
 *
 * Returns 0 on success. On failure returns the error code of the first option
 * that failed to parse, EINVAL for bad usage, or the error reported by the
 * failing setup/run step.
 */
int main(int argc, char *argv[])
{
	struct rping_cb *cb;
	int op;
	int ret = 0;
	int persistent_server = 0;

	cb = calloc(1, sizeof(*cb));
	if (!cb)
		return -ENOMEM;

	cb->server = -1;	/* role not chosen yet; -s or -c is mandatory */
	cb->state = IDLE;
	cb->size = 64;
	cb->sin.ss_family = PF_INET;
	cb->port = htobe16(7174);
	sem_init(&cb->sem, 0, 0);

	opterr = 0;
	while ((op = getopt(argc, argv, "a:I:Pp:C:S:t:scvVdq")) != -1) {
		switch (op) {
		case 'a':
			ret = get_addr(optarg, (struct sockaddr *) &cb->sin);
			break;
		case 'I':
			ret = get_addr(optarg, (struct sockaddr *) &cb->ssource);
			break;
		case 'P':
			persistent_server = 1;
			break;
		case 'p':
			cb->port = htobe16(atoi(optarg));
			DEBUG_LOG("port %d\n", (int) atoi(optarg));
			break;
		case 's':
			cb->server = 1;
			DEBUG_LOG("server\n");
			break;
		case 'c':
			cb->server = 0;
			DEBUG_LOG("client\n");
			break;
		case 'S':
			cb->size = atoi(optarg);
			/*
			 * Compare as int: the minimum is a size_t expression, and an
			 * unsigned comparison would let negative sizes through.
			 */
			if (cb->size < (int) (RPING_MIN_BUFSIZE) ||
			    cb->size > (RPING_BUFSIZE) - 1) {
				fprintf(stderr, "Invalid size %d "
					"(valid range is %zu to %d)\n",
					cb->size, (size_t) (RPING_MIN_BUFSIZE),
					(RPING_BUFSIZE) - 1);
				ret = EINVAL;
			} else {
				DEBUG_LOG("size %d\n", (int) atoi(optarg));
			}
			break;
		case 'C':
			cb->count = atoi(optarg);
			if (cb->count < 0) {
				fprintf(stderr, "Invalid count %d\n",
					cb->count);
				ret = EINVAL;
			} else {
				DEBUG_LOG("count %d\n", (int) cb->count);
			}
			break;
		case 'v':
			cb->verbose++;
			DEBUG_LOG("verbose\n");
			break;
		case 'V':
			cb->validate++;
			DEBUG_LOG("validate data\n");
			break;
		case 'd':
			debug++;
			break;
		case 'q':
			cb->self_create_qp = 1;
			break;
		default:
			usage("rping");
			ret = EINVAL;
			goto out;
		}

		/*
		 * Stop at the first failing option; otherwise a later, valid
		 * option would overwrite ret and hide the error.
		 */
		if (ret)
			goto out;
	}

	if (cb->server == -1) {
		usage("rping");
		ret = EINVAL;
		goto out;
	}

	/* Event channel on which the RDMA CM events are delivered. */
	cb->cm_channel = create_first_event_channel();
	if (!cb->cm_channel) {
		ret = errno;
		goto out;
	}

	/* CM identifier bound to that channel; cb is stored as its context. */
	ret = rdma_create_id(cb->cm_channel, &cb->cm_id, cb, RDMA_PS_TCP);
	if (ret) {
		perror("rdma_create_id");
		goto out2;
	}
	DEBUG_LOG("created cm_id %p\n", cb->cm_id);

	/* Background thread that reads and dispatches CM events. */
	ret = pthread_create(&cb->cmthread, NULL, cm_thread, cb);
	if (ret) {
		/* pthread_create() returns the error number; errno is not set. */
		fprintf(stderr, "pthread_create: %s\n", strerror(ret));
		goto out3;
	}

	/* Run in the role selected on the command line. */
	if (cb->server) {
		if (persistent_server)
			ret = rping_run_persistent_server(cb);
		else
			ret = rping_run_server(cb);
	} else {
		ret = rping_run_client(cb);
	}

out3:
	DEBUG_LOG("destroy cm_id %p\n", cb->cm_id);
	rdma_destroy_id(cb->cm_id);
out2:
	rdma_destroy_event_channel(cb->cm_channel);
out:
	free(cb);
	return ret;
}
在main函数中通过pthread_create启用一个线程来运行函数cm_thread,参数是cb(控制块)
/*
 * CM event pump, run as a thread with the control block as its argument.
 *
 * Repeatedly fetches the next event from cb->cm_channel, passes it to
 * rping_cma_event_handler() and acknowledges it with rdma_ack_cm_event().
 * The whole process exits if an event cannot be fetched or the handler
 * reports a failure, so this function never returns.
 */
static void *cm_thread(void *arg)
{
	struct rping_cb *cb = arg;

	for (;;) {
		struct rdma_cm_event *event;
		int rc = rdma_get_cm_event(cb->cm_channel, &event);

		if (rc != 0) {
			perror("rdma_get_cm_event");
			exit(rc);
		}

		/* Acknowledge the event even when the handler failed. */
		rc = rping_cma_event_handler(event->id, event);
		rdma_ack_cm_event(event);
		if (rc != 0)
			exit(rc);
	}
}
rping_run_persistent_server函数
/*
 * Run the persistent server: bind/listen once, then start one detached
 * thread for every connection request that is signalled.
 *
 * listening_cb: control block that owns the listening cm_id; its semaphore
 *               and state are used to learn about connection requests.
 *
 * Only returns on failure: the error from rping_bind_server() or from a
 * pthread call, or -1 when an unexpected state is seen or the control block
 * cannot be cloned.
 */
static int rping_run_persistent_server(struct rping_cb *listening_cb)
{
	int ret;
	struct rping_cb *cb;
	pthread_attr_t attr;

	ret = rping_bind_server(listening_cb);
	if (ret)
		return ret;

	/*
	 * Set persistent server threads to DETACHED state so
	 * they release all their resources when they exit.
	 *
	 * The pthread_* calls return the error number instead of setting
	 * errno, hence strerror(ret) rather than perror().
	 */
	ret = pthread_attr_init(&attr);
	if (ret) {
		fprintf(stderr, "pthread_attr_init: %s\n", strerror(ret));
		return ret;
	}

	ret = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
	if (ret) {
		fprintf(stderr, "pthread_attr_setdetachstate: %s\n",
			strerror(ret));
		goto out;
	}

	while (1) {
		sem_wait(&listening_cb->sem);
		if (listening_cb->state != CONNECT_REQUEST) {
			fprintf(stderr, "wait for CONNECT_REQUEST state %d\n",
				listening_cb->state);
			ret = -1;
			break;
		}

		/* Each connection gets its own copy of the control block. */
		cb = clone_cb(listening_cb);
		if (!cb) {
			ret = -1;
			break;
		}

		ret = pthread_create(&cb->persistent_server_thread, &attr,
				     rping_persistent_server_thread, cb);
		if (ret) {
			fprintf(stderr, "pthread_create: %s\n", strerror(ret));
			/* No thread took ownership of the clone; release it here. */
			free_cb(cb);
			break;
		}
	}

out:
	pthread_attr_destroy(&attr);
	return ret;
}
rping_persistent_server_thread函数
创建QP,设置buffer,然后向接收队列下发一个接收WR,再启动线程运行cq_thread函数(参数是cb);随后接受连接、运行测试,最后释放资源。
/*
 * Per-connection thread of the persistent server.
 *
 * arg: the cloned control block for this connection; this thread owns it
 *      and releases it before returning.
 *
 * Sets up the QP and buffers, posts the first receive, starts the CQ thread,
 * accepts the connection and runs the server side of the test. Everything
 * that was set up is torn down again on both the success and the failure
 * paths, including the connection's cm_id.
 *
 * Always returns NULL.
 */
static void *rping_persistent_server_thread(void *arg)
{
	struct rping_cb *cb = arg;
	struct ibv_recv_wr *bad_wr;
	int ret;

	ret = rping_setup_qp(cb, cb->child_cm_id);
	if (ret) {
		fprintf(stderr, "setup_qp failed: %d\n", ret);
		goto destroy_id;
	}

	ret = rping_setup_buffers(cb);
	if (ret) {
		fprintf(stderr, "rping_setup_buffers failed: %d\n", ret);
		goto free_qp;
	}

	/* Post a receive before accepting the connection. */
	ret = ibv_post_recv(cb->qp, &cb->rq_wr, &bad_wr);
	if (ret) {
		fprintf(stderr, "ibv_post_recv failed: %d\n", ret);
		goto free_buffers;
	}

	ret = pthread_create(&cb->cqthread, NULL, cq_thread, cb);
	if (ret) {
		/* pthread_create() returns the error number; errno is not set. */
		fprintf(stderr, "pthread_create: %s\n", strerror(ret));
		goto free_buffers;
	}

	ret = rping_accept(cb);
	if (ret) {
		fprintf(stderr, "connect error %d\n", ret);
		goto stop_cq_thread;
	}

	rping_test_server(cb);
	rping_disconnect(cb, cb->child_cm_id);
	pthread_join(cb->cqthread, NULL);
	goto free_buffers;

stop_cq_thread:
	pthread_cancel(cb->cqthread);
	pthread_join(cb->cqthread, NULL);
free_buffers:
	rping_free_buffers(cb);
free_qp:
	rping_free_qp(cb);
destroy_id:
	/*
	 * The connection's cm_id is destroyed on every path, after the QP is
	 * released; the failure paths previously left it allocated.
	 */
	rdma_destroy_id(cb->child_cm_id);
	free_cb(cb);
	return NULL;
}