aboutsummaryrefslogtreecommitdiffstats
path: root/library/cpp/netliba/v6/ib_low.h
diff options
context:
space:
mode:
authormonster <monster@ydb.tech>2022-07-07 14:41:37 +0300
committermonster <monster@ydb.tech>2022-07-07 14:41:37 +0300
commit06e5c21a835c0e923506c4ff27929f34e00761c2 (patch)
tree75efcbc6854ef9bd476eb8bf00cc5c900da436a2 /library/cpp/netliba/v6/ib_low.h
parent03f024c4412e3aa613bb543cf1660176320ba8f4 (diff)
downloadydb-06e5c21a835c0e923506c4ff27929f34e00761c2.tar.gz
fix ya.make
Diffstat (limited to 'library/cpp/netliba/v6/ib_low.h')
-rw-r--r--library/cpp/netliba/v6/ib_low.h797
1 files changed, 797 insertions, 0 deletions
diff --git a/library/cpp/netliba/v6/ib_low.h b/library/cpp/netliba/v6/ib_low.h
new file mode 100644
index 0000000000..b2a3e341d2
--- /dev/null
+++ b/library/cpp/netliba/v6/ib_low.h
@@ -0,0 +1,797 @@
+#pragma once
+
+#include "udp_address.h"
+
+#if defined(_linux_) && !defined(CATBOOST_OPENSOURCE)
+#include <contrib/libs/ibdrv/include/infiniband/verbs.h>
+#include <contrib/libs/ibdrv/include/rdma/rdma_cma.h>
+#endif
+
+namespace NNetliba {
// Abort the process if a verbs call returns non-zero.
// Wrapped in do { } while (0) so CHECK_Z(x); behaves as a single statement
// and composes safely with if/else (the bare-brace form did not).
#define CHECK_Z(x)                                                  \
    do {                                                            \
        int rv = (x);                                               \
        if (rv != 0) {                                              \
            fprintf(stderr, "check_z failed, errno = %d\n", errno); \
            Y_VERIFY(0, "check_z");                                 \
        }                                                           \
    } while (0)
+
+ //////////////////////////////////////////////////////////////////////////
+ const int MAX_SGE = 1;
+ const size_t MAX_INLINE_DATA_SIZE = 16;
+ const int MAX_OUTSTANDING_RDMA = 10;
+
+#if defined(_linux_) && !defined(CATBOOST_OPENSOURCE)
+ class TIBContext: public TThrRefBase, TNonCopyable {
+ ibv_context* Context;
+ ibv_pd* ProtDomain;
+ TMutex Lock;
+
+ ~TIBContext() override {
+ if (Context) {
+ CHECK_Z(ibv_dealloc_pd(ProtDomain));
+ CHECK_Z(ibv_close_device(Context));
+ }
+ }
+
+ public:
+ TIBContext(ibv_device* device) {
+ Context = ibv_open_device(device);
+ if (Context) {
+ ProtDomain = ibv_alloc_pd(Context);
+ }
+ }
+ bool IsValid() const {
+ return Context != nullptr && ProtDomain != nullptr;
+ }
+
+ class TLock {
+ TIntrusivePtr<TIBContext> Ptr;
+ TGuard<TMutex> Guard;
+
+ public:
+ TLock(TPtrArg<TIBContext> ctx)
+ : Ptr(ctx)
+ , Guard(ctx->Lock)
+ {
+ }
+ ibv_context* GetContext() {
+ return Ptr->Context;
+ }
+ ibv_pd* GetProtDomain() {
+ return Ptr->ProtDomain;
+ }
+ };
+ };
+
    // One physical port of an IB device. Queries the port attributes once at
    // construction and caches the LID plus the first MAX_GID GID-table entries.
    class TIBPort: public TThrRefBase, TNonCopyable {
        int Port; // port number as passed to the constructor
        int LID;  // local identifier reported by ibv_query_port()
        TIntrusivePtr<TIBContext> IBCtx;
        enum {
            MAX_GID = 16 // number of GID table entries cached below
        };
        ibv_gid MyGidArr[MAX_GID]; // snapshot of the GID table, index -> gid

    public:
        TIBPort(TPtrArg<TIBContext> ctx, int port)
            : IBCtx(ctx)
        {
            ibv_port_attr portAttrs;
            TIBContext::TLock ibContext(IBCtx);
            CHECK_Z(ibv_query_port(ibContext.GetContext(), port, &portAttrs));
            Port = port;
            LID = portAttrs.lid;
            for (int i = 0; i < MAX_GID; ++i) {
                ibv_gid& dst = MyGidArr[i];
                Zero(dst);
                // return value deliberately ignored: entries the port does not
                // have simply stay zeroed
                ibv_query_gid(ibContext.GetContext(), Port, i, &dst);
            }
        }
        int GetPort() const {
            return Port;
        }
        int GetLID() const {
            return LID;
        }
        TIBContext* GetCtx() {
            return IBCtx.Get();
        }
        // Returns the primary (index 0) GID of the port.
        void GetGID(ibv_gid* res) const {
            *res = MyGidArr[0];
        }
        // Index of arg in the cached GID table; note: falls back to index 0
        // (not an error) when the GID is not present.
        int GetGIDIndex(const ibv_gid& arg) const {
            for (int i = 0; i < MAX_GID; ++i) {
                const ibv_gid& chk = MyGidArr[i];
                if (memcmp(&chk, &arg, sizeof(chk)) == 0) {
                    return i;
                }
            }
            return 0;
        }
        // Build address-handle attributes for replying to a received packet
        // (work completion + its global routing header).
        void GetAHAttr(ibv_wc* wc, ibv_grh* grh, ibv_ah_attr* res) {
            TIBContext::TLock ibContext(IBCtx);
            CHECK_Z(ibv_init_ah_from_wc(ibContext.GetContext(), Port, wc, grh, res));
        }
    };
+
    // RAII wrapper over an ibverbs completion queue. Created without a
    // completion channel or vector, so completions are gathered by Poll()
    // only. ("Complection" is a long-standing typo kept because the name is
    // part of the public interface.)
    class TComplectionQueue: public TThrRefBase, TNonCopyable {
        ibv_cq* CQ;
        TIntrusivePtr<TIBContext> IBCtx;

        ~TComplectionQueue() override {
            if (CQ) {
                CHECK_Z(ibv_destroy_cq(CQ));
            }
        }

    public:
        TComplectionQueue(TPtrArg<TIBContext> ctx, int maxCQEcount)
            : IBCtx(ctx)
        {
            TIBContext::TLock ibContext(IBCtx);
            // An ibv_create_cq_ex() variant was tried here and disabled:
            /* ibv_cq_init_attr_ex attr;
            Zero(attr);
            attr.cqe = maxCQEcount;
            attr.cq_create_flags = 0;
            ibv_cq_ex *vcq = ibv_create_cq_ex(ibContext.GetContext(), &attr);
            if (vcq) {
                CQ = (ibv_cq*)vcq; // doubtful trick but that's life
            } else {*/
            // no completion channel
            // no completion vector
            CQ = ibv_create_cq(ibContext.GetContext(), maxCQEcount, nullptr, nullptr, 0);
            // }
        }
        ibv_cq* GetCQ() {
            return CQ;
        }
        // Drain up to bufSize completed work requests into res.
        // Returns the number of completions written; aborts on poll failure.
        int Poll(ibv_wc* res, int bufSize) {
            Y_ASSERT(bufSize >= 1);
            int rv = ibv_poll_cq(CQ, bufSize, res);
            if (rv < 0) {
                Y_VERIFY(0, "ibv_poll_cq failed");
            }
            if (rv > 0) {
                // debug tracing of completed work requests (wr_id, status,
                // opcode, byte_len, ...) used to live here
            }
            return rv;
        }
    };
+
+ //struct ibv_mr
+ //{
+ // struct ibv_context *context;
+ // struct ibv_pd *pd;
+ // void *addr;
+ // size_t length;
+ // ui32 handle;
+ // ui32 lkey;
+ // ui32 rkey;
+ //};
+ class TMemoryRegion: public TThrRefBase, TNonCopyable {
+ ibv_mr* MR;
+ TIntrusivePtr<TIBContext> IBCtx;
+
+ ~TMemoryRegion() override {
+ if (MR) {
+ CHECK_Z(ibv_dereg_mr(MR));
+ }
+ }
+
+ public:
+ TMemoryRegion(TPtrArg<TIBContext> ctx, size_t len)
+ : IBCtx(ctx)
+ {
+ TIBContext::TLock ibContext(IBCtx);
+ int access = IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_REMOTE_READ; // TODO: IBV_ACCESS_ALLOCATE_MR
+ MR = ibv_reg_mr(ibContext.GetProtDomain(), 0, len, access);
+ Y_ASSERT(MR);
+ }
+ ui32 GetLKey() const {
+ static_assert(sizeof(ui32) == sizeof(MR->lkey), "expect sizeof(ui32) == sizeof(MR->lkey)");
+ return MR->lkey;
+ }
+ ui32 GetRKey() const {
+ static_assert(sizeof(ui32) == sizeof(MR->lkey), "expect sizeof(ui32) == sizeof(MR->lkey)");
+ return MR->lkey;
+ }
+ char* GetData() {
+ return MR ? (char*)MR->addr : nullptr;
+ }
+ bool IsCovered(const void* data, size_t len) const {
+ size_t dataAddr = (const char*)data - (const char*)nullptr;
+ size_t bufAddr = (const char*)MR->addr - (const char*)nullptr;
+ return (dataAddr >= bufAddr) && (dataAddr + len <= bufAddr + MR->length);
+ }
+ };
+
    // RAII wrapper over a shared receive queue (SRQ). A single SRQ supplies
    // receive buffers to every queue pair created on top of it.
    class TSharedReceiveQueue: public TThrRefBase, TNonCopyable {
        ibv_srq* SRQ;
        TIntrusivePtr<TIBContext> IBCtx;

        ~TSharedReceiveQueue() override {
            if (SRQ) {
                ibv_destroy_srq(SRQ);
            }
        }

    public:
        TSharedReceiveQueue(TPtrArg<TIBContext> ctx, int maxWR)
            : IBCtx(ctx)
        {
            ibv_srq_init_attr attr;
            Zero(attr);
            attr.srq_context = this;
            attr.attr.max_sge = MAX_SGE;
            attr.attr.max_wr = maxWR; // capacity: outstanding receive WRs

            TIBContext::TLock ibContext(IBCtx);
            SRQ = ibv_create_srq(ibContext.GetProtDomain(), &attr);
            Y_ASSERT(SRQ);
        }
        ibv_srq* GetSRQ() {
            return SRQ;
        }
        // Post one receive buffer [buf, buf+len); buf must lie inside mem.
        // id is echoed back as wr_id in the matching completion.
        void PostReceive(TPtrArg<TMemoryRegion> mem, ui64 id, const void* buf, size_t len) {
            Y_ASSERT(mem->IsCovered(buf, len));
            ibv_recv_wr wr, *bad;
            ibv_sge sg;
            sg.addr = (const char*)buf - (const char*)nullptr; // buf as integer address
            sg.length = len;
            sg.lkey = mem->GetLKey();
            Zero(wr);
            wr.wr_id = id;
            wr.sg_list = &sg;
            wr.num_sge = 1;
            CHECK_Z(ibv_post_srq_recv(SRQ, &wr, &bad));
        }
    };
+
+ inline void MakeAH(ibv_ah_attr* res, TPtrArg<TIBPort> port, int lid, int serviceLevel) {
+ Zero(*res);
+ res->dlid = lid;
+ res->port_num = port->GetPort();
+ res->sl = serviceLevel;
+ }
+
+ void MakeAH(ibv_ah_attr* res, TPtrArg<TIBPort> port, const TUdpAddress& remoteAddr, const TUdpAddress& localAddr, int serviceLevel);
+
    // RAII wrapper over an ibverbs address handle — the destination
    // description used by UD sends. Constructible from raw attributes, from
    // LID + service level, or from a pair of TUdpAddress values (resolved by
    // the out-of-line MakeAH overload declared above).
    class TAddressHandle: public TThrRefBase, TNonCopyable {
        ibv_ah* AH;
        TIntrusivePtr<TIBContext> IBCtx;

        ~TAddressHandle() override {
            if (AH) {
                CHECK_Z(ibv_destroy_ah(AH));
            }
            AH = nullptr;
            IBCtx = nullptr;
        }

    public:
        TAddressHandle(TPtrArg<TIBContext> ctx, ibv_ah_attr* attr)
            : IBCtx(ctx)
        {
            TIBContext::TLock ibContext(IBCtx);
            AH = ibv_create_ah(ibContext.GetProtDomain(), attr);
            Y_ASSERT(AH != nullptr);
        }
        // LID-routed destination on the given port / service level.
        TAddressHandle(TPtrArg<TIBPort> port, int lid, int serviceLevel)
            : IBCtx(port->GetCtx())
        {
            ibv_ah_attr attr;
            MakeAH(&attr, port, lid, serviceLevel);
            TIBContext::TLock ibContext(IBCtx);
            AH = ibv_create_ah(ibContext.GetProtDomain(), &attr);
            Y_ASSERT(AH != nullptr);
        }
        // Destination derived from remote/local UDP-style addresses; the
        // resolution logic lives in the non-inline MakeAH (see .cpp).
        TAddressHandle(TPtrArg<TIBPort> port, const TUdpAddress& remoteAddr, const TUdpAddress& localAddr, int serviceLevel)
            : IBCtx(port->GetCtx())
        {
            ibv_ah_attr attr;
            MakeAH(&attr, port, remoteAddr, localAddr, serviceLevel);
            TIBContext::TLock ibContext(IBCtx);
            AH = ibv_create_ah(ibContext.GetProtDomain(), &attr);
            Y_ASSERT(AH != nullptr);
        }
        ibv_ah* GetAH() {
            return AH;
        }
        bool IsValid() const {
            return AH != nullptr;
        }
    };
+
+ // GRH + wc -> address_handle_attr
+ //int ibv_init_ah_from_wc(struct ibv_context *context, ui8 port_num,
+ //struct ibv_wc *wc, struct ibv_grh *grh,
+ //struct ibv_ah_attr *ah_attr)
+ //ibv_create_ah_from_wc(struct ibv_pd *pd, struct ibv_wc *wc, struct ibv_grh
+ // *grh, ui8 port_num)
+
    // Common base of the RC/UD queue pairs: owns the ibv_qp, remembers the
    // initial packet sequence number, and keeps the context, completion queue
    // and shared receive queue alive for the QP's lifetime.
    class TQueuePair: public TThrRefBase, TNonCopyable {
    protected:
        ibv_qp* QP;
        int MyPSN; // start packet sequence number
        TIntrusivePtr<TIBContext> IBCtx;
        TIntrusivePtr<TComplectionQueue> CQ;
        TIntrusivePtr<TSharedReceiveQueue> SRQ;

        TQueuePair(TPtrArg<TIBContext> ctx, TPtrArg<TComplectionQueue> cq, TPtrArg<TSharedReceiveQueue> srq,
                   int sendQueueSize,
                   ibv_qp_type qp_type)
            : IBCtx(ctx)
            , CQ(cq)
            , SRQ(srq)
        {
            MyPSN = GetCycleCount() & 0xffffff; // should be random and different on different runs, 24bit

            ibv_qp_init_attr attr;
            Zero(attr);
            attr.qp_context = this; // not really useful
            attr.send_cq = cq->GetCQ();
            attr.recv_cq = cq->GetCQ();
            attr.srq = srq->GetSRQ();
            attr.cap.max_send_wr = sendQueueSize;
            attr.cap.max_recv_wr = 0; // we are using srq, no need for qp's rq
            attr.cap.max_send_sge = MAX_SGE;
            attr.cap.max_recv_sge = MAX_SGE;
            attr.cap.max_inline_data = MAX_INLINE_DATA_SIZE;
            attr.qp_type = qp_type;
            attr.sq_sig_all = 1; // inline sends need not be signaled, but if they are not work queue overflows

            TIBContext::TLock ibContext(IBCtx);
            QP = ibv_create_qp(ibContext.GetProtDomain(), &attr);
            Y_ASSERT(QP);
        }
        ~TQueuePair() override {
            if (QP) {
                CHECK_Z(ibv_destroy_qp(QP));
            }
        }
        // Fill a one-entry scatter/gather list plus a zeroed send work
        // request; payloads at most MAX_INLINE_DATA_SIZE are flagged inline
        // (copied into the WQE, so no memory region is needed for them).
        void FillSendAttrs(ibv_send_wr* wr, ibv_sge* sg,
                           ui64 localAddr, ui32 lKey, ui64 id, size_t len) {
            sg->addr = localAddr;
            sg->length = len;
            sg->lkey = lKey;
            Zero(*wr);
            wr->wr_id = id;
            wr->sg_list = sg;
            wr->num_sge = 1;
            if (len <= MAX_INLINE_DATA_SIZE) {
                wr->send_flags = IBV_SEND_INLINE;
            }
        }
        // Same, taking the local address from a (possibly null) memory
        // region; mem may be null only for inline-sized payloads.
        void FillSendAttrs(ibv_send_wr* wr, ibv_sge* sg,
                           TPtrArg<TMemoryRegion> mem, ui64 id, const void* data, size_t len) {
            ui64 localAddr = (const char*)data - (const char*)nullptr;
            ui32 lKey = 0;
            if (mem) {
                Y_ASSERT(mem->IsCovered(data, len));
                lKey = mem->GetLKey();
            } else {
                Y_ASSERT(len <= MAX_INLINE_DATA_SIZE);
            }
            FillSendAttrs(wr, sg, localAddr, lKey, id, len);
        }

    public:
        // QP number for handshakes with the peer; 0 if creation failed.
        int GetQPN() const {
            if (QP)
                return QP->qp_num;
            return 0;
        }
        int GetPSN() const {
            return MyPSN;
        }
        // no PostReceive() here: receive buffers are posted via the shared
        // receive queue (TSharedReceiveQueue::PostReceive)
    };
+
    // Reliable-connected queue pair. Init() walks the mandatory verbs state
    // machine RESET -> INIT -> RTR -> RTS against a known peer; after that,
    // two-sided sends and one-sided RDMA writes may be posted.
    class TRCQueuePair: public TQueuePair {
    public:
        TRCQueuePair(TPtrArg<TIBContext> ctx, TPtrArg<TComplectionQueue> cq, TPtrArg<TSharedReceiveQueue> srq, int sendQueueSize)
            : TQueuePair(ctx, cq, srq, sendQueueSize, IBV_QPT_RC)
        {
        }
        // SRQ should have receive posted
        // peerAddr/peerQPN/peerPSN describe the remote side (exchanged out of
        // band); each ibv_modify_qp mask names exactly the attrs set above it.
        void Init(const ibv_ah_attr& peerAddr, int peerQPN, int peerPSN) {
            Y_ASSERT(QP->qp_type == IBV_QPT_RC);
            ibv_qp_attr attr;
            // RESET -> INIT
            Zero(attr);
            attr.qp_state = IBV_QPS_INIT;
            attr.pkey_index = 0;
            attr.port_num = peerAddr.port_num;
            // for connected QP
            attr.qp_access_flags = IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_REMOTE_READ | IBV_ACCESS_REMOTE_ATOMIC;
            CHECK_Z(ibv_modify_qp(QP, &attr,
                                  IBV_QP_STATE | IBV_QP_PKEY_INDEX | IBV_QP_PORT | IBV_QP_ACCESS_FLAGS));

            // INIT -> ReadyToReceive
            attr.qp_state = IBV_QPS_RTR;
            attr.path_mtu = IBV_MTU_512; // allows more fine grained VL arbitration
            // for connected QP
            attr.ah_attr = peerAddr;
            attr.dest_qp_num = peerQPN;
            attr.rq_psn = peerPSN;
            attr.max_dest_rd_atomic = MAX_OUTSTANDING_RDMA; // number of outstanding RDMA requests
            attr.min_rnr_timer = 12; // recommended
            CHECK_Z(ibv_modify_qp(QP, &attr,
                                  IBV_QP_STATE | IBV_QP_PATH_MTU |
                                      IBV_QP_AV | IBV_QP_DEST_QPN | IBV_QP_RQ_PSN | IBV_QP_MAX_DEST_RD_ATOMIC | IBV_QP_MIN_RNR_TIMER));

            // ReadyToReceive -> ReadyToTransmit
            attr.qp_state = IBV_QPS_RTS;
            // for connected QP
            attr.timeout = 14; // increased to 18 for sometime, 14 recommended
            attr.retry_cnt = 7; // release configuration (set both to 0 when debugging)
            attr.rnr_retry = 7; // release configuration (try forever)
            attr.sq_psn = MyPSN;
            attr.max_rd_atomic = MAX_OUTSTANDING_RDMA; // number of outstanding RDMA requests
            CHECK_Z(ibv_modify_qp(QP, &attr,
                                  IBV_QP_STATE |
                                      IBV_QP_TIMEOUT | IBV_QP_RETRY_CNT | IBV_QP_RNR_RETRY | IBV_QP_SQ_PSN | IBV_QP_MAX_QP_RD_ATOMIC));
        }
        // Two-sided send; consumes a receive buffer on the peer side.
        void PostSend(TPtrArg<TMemoryRegion> mem, ui64 id, const void* data, size_t len) {
            ibv_send_wr wr, *bad;
            ibv_sge sg;
            FillSendAttrs(&wr, &sg, mem, id, data, len);
            wr.opcode = IBV_WR_SEND;
            // other opcodes: IBV_WR_SEND_WITH_IMM, IBV_WR_RDMA_READ, ...

            CHECK_Z(ibv_post_send(QP, &wr, &bad));
        }
        // One-sided write into the peer's memory at remoteAddr/remoteKey.
        void PostRDMAWrite(ui64 remoteAddr, ui32 remoteKey,
                           TPtrArg<TMemoryRegion> mem, ui64 id, const void* data, size_t len) {
            ibv_send_wr wr, *bad;
            ibv_sge sg;
            FillSendAttrs(&wr, &sg, mem, id, data, len);
            wr.opcode = IBV_WR_RDMA_WRITE;
            wr.wr.rdma.remote_addr = remoteAddr;
            wr.wr.rdma.rkey = remoteKey;

            CHECK_Z(ibv_post_send(QP, &wr, &bad));
        }
        // Same, with the local buffer given directly as address + lkey.
        void PostRDMAWrite(ui64 remoteAddr, ui32 remoteKey,
                           ui64 localAddr, ui32 localKey, ui64 id, size_t len) {
            ibv_send_wr wr, *bad;
            ibv_sge sg;
            FillSendAttrs(&wr, &sg, localAddr, localKey, id, len);
            wr.opcode = IBV_WR_RDMA_WRITE;
            wr.wr.rdma.remote_addr = remoteAddr;
            wr.wr.rdma.rkey = remoteKey;

            CHECK_Z(ibv_post_send(QP, &wr, &bad));
        }
        // RDMA write that additionally delivers immData through the peer's
        // receive queue (shows up as a completion there).
        void PostRDMAWriteImm(ui64 remoteAddr, ui32 remoteKey, ui32 immData,
                              TPtrArg<TMemoryRegion> mem, ui64 id, const void* data, size_t len) {
            ibv_send_wr wr, *bad;
            ibv_sge sg;
            FillSendAttrs(&wr, &sg, mem, id, data, len);
            wr.opcode = IBV_WR_RDMA_WRITE_WITH_IMM;
            wr.imm_data = immData;
            wr.wr.rdma.remote_addr = remoteAddr;
            wr.wr.rdma.rkey = remoteKey;

            CHECK_Z(ibv_post_send(QP, &wr, &bad));
        }
    };
+
    // Unreliable-datagram queue pair: a single QP talks to many peers, each
    // send addressed explicitly by an address handle + remote QPN/QKey.
    class TUDQueuePair: public TQueuePair {
        TIntrusivePtr<TIBPort> Port;

    public:
        TUDQueuePair(TPtrArg<TIBPort> port, TPtrArg<TComplectionQueue> cq, TPtrArg<TSharedReceiveQueue> srq, int sendQueueSize)
            : TQueuePair(port->GetCtx(), cq, srq, sendQueueSize, IBV_QPT_UD)
            , Port(port)
        {
        }
        // SRQ should have receive posted
        // Walks RESET -> INIT -> RTR -> RTS; for UD only the qkey and port
        // are needed (no peer address at QP level).
        void Init(int qkey) {
            Y_ASSERT(QP->qp_type == IBV_QPT_UD);
            ibv_qp_attr attr;
            // RESET -> INIT
            Zero(attr);
            attr.qp_state = IBV_QPS_INIT;
            attr.pkey_index = 0;
            attr.port_num = Port->GetPort();
            // for unconnected qp
            attr.qkey = qkey;
            CHECK_Z(ibv_modify_qp(QP, &attr,
                                  IBV_QP_STATE | IBV_QP_PKEY_INDEX | IBV_QP_PORT | IBV_QP_QKEY));

            // INIT -> ReadyToReceive
            attr.qp_state = IBV_QPS_RTR;
            CHECK_Z(ibv_modify_qp(QP, &attr, IBV_QP_STATE));

            // ReadyToReceive -> ReadyToTransmit
            attr.qp_state = IBV_QPS_RTS;
            attr.sq_psn = 0;
            CHECK_Z(ibv_modify_qp(QP, &attr, IBV_QP_STATE | IBV_QP_SQ_PSN));
        }
        // Datagram send to the destination described by ah + remoteQPN/remoteQKey.
        void PostSend(TPtrArg<TAddressHandle> ah, int remoteQPN, int remoteQKey,
                      TPtrArg<TMemoryRegion> mem, ui64 id, const void* data, size_t len) {
            ibv_send_wr wr, *bad;
            ibv_sge sg;
            FillSendAttrs(&wr, &sg, mem, id, data, len);
            wr.opcode = IBV_WR_SEND;
            wr.wr.ud.ah = ah->GetAH();
            wr.wr.ud.remote_qpn = remoteQPN;
            wr.wr.ud.remote_qkey = remoteQKey;
            // IBV_WR_SEND_WITH_IMM + wr.imm_data would piggyback 32 bits

            CHECK_Z(ibv_post_send(QP, &wr, &bad));
        }
    };
+
+ TIntrusivePtr<TIBPort> GetIBDevice();
+
+#else
+ //////////////////////////////////////////////////////////////////////////
+ // stub for OS without IB support
+ //////////////////////////////////////////////////////////////////////////
    // Minimal mirrors of the ibverbs types that leak through this header's
    // interface, so dependent code compiles on platforms without IB support.
    enum ibv_wc_opcode {
        IBV_WC_SEND,
        IBV_WC_RDMA_WRITE,
        IBV_WC_RDMA_READ,
        IBV_WC_COMP_SWAP,
        IBV_WC_FETCH_ADD,
        IBV_WC_BIND_MW,
        IBV_WC_RECV = 1 << 7, // receive completions carry the high bit
        IBV_WC_RECV_RDMA_WITH_IMM
    };

    enum ibv_wc_status {
        IBV_WC_SUCCESS,
        // lots of errors follow
    };
    //struct ibv_device;
    //struct ibv_pd;
    // 128-bit global identifier (only the layout matters for the stubs).
    union ibv_gid {
        ui8 raw[16];
        struct {
            ui64 subnet_prefix;
            ui64 interface_id;
        } global;
    };

    // Work completion record (subset of the real ibv_wc fields).
    struct ibv_wc {
        ui64 wr_id;
        enum ibv_wc_status status;
        enum ibv_wc_opcode opcode;
        ui32 imm_data; /* in network byte order */
        ui32 qp_num;
        ui32 src_qp;
    };
    struct ibv_grh {};
    struct ibv_ah_attr {
        ui8 sl;
    };
+ //struct ibv_cq;
    // Stub context: always reports invalid so callers take the no-IB path.
    class TIBContext: public TThrRefBase, TNonCopyable {
    public:
        bool IsValid() const {
            return false;
        }
        //ibv_context *GetContext() { return 0; }
        //ibv_pd *GetProtDomain() { return 0; }
    };

    // Stub port: fixed port/LID values, zeroed GID.
    class TIBPort: public TThrRefBase, TNonCopyable {
    public:
        TIBPort(TPtrArg<TIBContext>, int) {
        }
        int GetPort() const {
            return 1;
        }
        int GetLID() const {
            return 1;
        }
        TIBContext* GetCtx() {
            return 0;
        }
        // NOTE(review): non-const here but const in the real implementation
        // above — confirm the mismatch never matters to callers.
        void GetGID(ibv_gid* res) {
            Zero(*res);
        }
        void GetAHAttr(ibv_wc*, ibv_grh*, ibv_ah_attr*) {
        }
    };

    // Stub completion queue: polling never yields completions.
    class TComplectionQueue: public TThrRefBase, TNonCopyable {
    public:
        TComplectionQueue(TPtrArg<TIBContext>, int) {
        }
        //ibv_cq *GetCQ() { return 0; }
        int Poll(ibv_wc*, int) {
            return 0;
        }
    };
+
    // Stub memory region: no registration, zero keys, covers nothing.
    class TMemoryRegion: public TThrRefBase, TNonCopyable {
    public:
        TMemoryRegion(TPtrArg<TIBContext>, size_t) {
        }
        ui32 GetLKey() const {
            return 0;
        }
        ui32 GetRKey() const {
            return 0;
        }
        char* GetData() {
            return 0;
        }
        bool IsCovered(const void*, size_t) const {
            return false;
        }
    };

    // Stub shared receive queue: posted receives are silently dropped.
    class TSharedReceiveQueue: public TThrRefBase, TNonCopyable {
    public:
        TSharedReceiveQueue(TPtrArg<TIBContext>, int) {
        }
        //ibv_srq *GetSRQ() { return SRQ; }
        void PostReceive(TPtrArg<TMemoryRegion>, ui64, const void*, size_t) {
        }
    };

    // Stub MakeAH: nothing to fill in.
    inline void MakeAH(ibv_ah_attr*, TPtrArg<TIBPort>, int, int) {
    }
+
+ class TAddressHandle: public TThrRefBase, TNonCopyable {
+ public:
+ TAddressHandle(TPtrArg<TIBContext>, ibv_ah_attr*) {
+ }
+ TAddressHandle(TPtrArg<TIBPort>, int, int) {
+ }
+ TAddressHandle(TPtrArg<TIBPort>, const TUdpAddress&, const TUdpAddress&, int) {
+ }
+ //ibv_ah *GetAH() { return AH; }
+ bool IsValid() {
+ return true;
+ }
+ };
+
    // Stub queue-pair base: no underlying QP, zero identifiers.
    class TQueuePair: public TThrRefBase, TNonCopyable {
    public:
        int GetQPN() const {
            return 0;
        }
        int GetPSN() const {
            return 0;
        }
    };

    // Stub reliable-connected QP: Init and all posts are no-ops.
    class TRCQueuePair: public TQueuePair {
    public:
        TRCQueuePair(TPtrArg<TIBContext>, TPtrArg<TComplectionQueue>, TPtrArg<TSharedReceiveQueue>, int) {
        }
        // SRQ should have receive posted
        void Init(const ibv_ah_attr&, int, int) {
        }
        void PostSend(TPtrArg<TMemoryRegion>, ui64, const void*, size_t) {
        }
        void PostRDMAWrite(ui64, ui32, TPtrArg<TMemoryRegion>, ui64, const void*, size_t) {
        }
        void PostRDMAWrite(ui64, ui32, ui64, ui32, ui64, size_t) {
        }
        void PostRDMAWriteImm(ui64, ui32, ui32, TPtrArg<TMemoryRegion>, ui64, const void*, size_t) {
        }
    };

    // Stub unreliable-datagram QP: Init and all posts are no-ops.
    class TUDQueuePair: public TQueuePair {
        TIntrusivePtr<TIBPort> Port;

    public:
        TUDQueuePair(TPtrArg<TIBPort>, TPtrArg<TComplectionQueue>, TPtrArg<TSharedReceiveQueue>, int) {
        }
        // SRQ should have receive posted
        void Init(int) {
        }
        void PostSend(TPtrArg<TAddressHandle>, int, int, TPtrArg<TMemoryRegion>, ui64, const void*, size_t) {
        }
    };
+
+ inline TIntrusivePtr<TIBPort> GetIBDevice() {
+ return 0;
+ }
+#endif
+}