From 5b66480595ed2e4ccee79e660396980e020ffc74 Mon Sep 17 00:00:00 2001 From: Ghadeer Ahmed H Alabandi Date: Fri, 7 Nov 2025 00:47:01 +0000 Subject: [PATCH] [NET] Enable capping the number of QPs created for send/recv colls (#1998) [ROCm/rccl commit: 45991fadadefe44d31ff8fd8278be981d57a3129] --- projects/rccl/CHANGELOG.md | 1 + projects/rccl/src/include/net.h | 2 ++ projects/rccl/src/transport/net.cc | 14 ++++++++ projects/rccl/src/transport/net_ib.cc | 49 ++++++++++++++++++++------- 4 files changed, 53 insertions(+), 13 deletions(-) diff --git a/projects/rccl/CHANGELOG.md b/projects/rccl/CHANGELOG.md index 7b7719971e..c1a3de4630 100644 --- a/projects/rccl/CHANGELOG.md +++ b/projects/rccl/CHANGELOG.md @@ -20,6 +20,7 @@ Full documentation for RCCL is available at [https://rccl.readthedocs.io](https: ## RCCL 2.27.7 for ROCm 7.1.0 ### Added +* Added `RCCL_IB_QPS_PER_P2P` to set the number of QPs per connection for P2P operations. When set to a value ≥1, P2P operations (Send/Recv) use `RCCL_IB_QPS_PER_P2P`, while collective operations continue to use `NCCL_IB_QPS_PER_CONNECTION`. When unset or set to 0 (the default), `NCCL_IB_QPS_PER_CONNECTION` applies to all operations. * Added `RCCL_FORCE_ENABLE_DMABUF` as a debugging feature if the user wants to explicitly enable DMABUF and forego system/kernel checks. * Added `RCCL_P2P_BATCH_THRESHOLD` to set the message size limit for batching P2P operations. This mainly affects small message performance for alltoall at a large scale but also applies to alltoallv. * Added `RCCL_P2P_BATCH_ENABLE` to enable batching P2P operations to receive performance gains for smaller messages up to 4MB for alltoall when the workload requires it. This is to avoid performance dips for larger messages. 
diff --git a/projects/rccl/src/include/net.h b/projects/rccl/src/include/net.h index 552e9bcb43..3c8567fc31 100644 --- a/projects/rccl/src/include/net.h +++ b/projects/rccl/src/include/net.h @@ -23,4 +23,6 @@ ncclResult_t ncclGpuGdrSupport(struct ncclComm* comm, int* gdrSupport); extern ncclNet_t ncclNetIb; extern ncclNet_t ncclNetSocket; +extern ncclResult_t rcclNetP2pPolicy(void* handle, int isP2p); + #endif diff --git a/projects/rccl/src/transport/net.cc b/projects/rccl/src/transport/net.cc index a7df9a155c..1ad8de99fa 100644 --- a/projects/rccl/src/transport/net.cc +++ b/projects/rccl/src/transport/net.cc @@ -122,6 +122,7 @@ struct sendNetResources { ncclNetDeviceHandle_t* netDeviceHandle; size_t maxP2pBytes; volatile uint32_t* curr_hdp_reg; // Curr GPU in ring (for rdma transport use only) + int isP2p; }; struct recvNetResources { @@ -194,6 +195,7 @@ struct setupReq { int channelId; int connIndex; uint32_t* curr_hdp_reg; + int isP2p; }; NCCL_PARAM(NetOptionalRecvCompletion, "NET_OPTIONAL_RECV_COMPLETION", 1); @@ -222,6 +224,12 @@ static ncclResult_t sendSetup(struct ncclComm* comm, struct ncclTopoGraph* graph req.connIndex = connIndex; req.curr_hdp_reg = 0; req.netDev = -1; + // Determine if this is a P2P connection or not based on the graph pointer + if(graph == NULL) { + req.isP2p = 1; + } else { + req.isP2p = 0; + } int proxyRank = myInfo->rank; int64_t netId; @@ -672,6 +680,7 @@ static ncclResult_t sendProxySetup(struct ncclProxyConnection* connection, struc resources->channelId = req->channelId; resources->connIndex = req->connIndex; resources->curr_hdp_reg = req->curr_hdp_reg; + resources->isP2p = req->isP2p; ncclNetProperties_t props; NCCLCHECK(proxyState->ncclNet->getProperties(req->netDev, &props)); /* DMA-BUF support */ @@ -766,6 +775,11 @@ static ncclResult_t sendProxyConnect(struct ncclProxyConnection* connection, str commConfig.trafficClass = req->trafficClass == NCCL_CONFIG_UNDEF_INT ? 
NCCL_NET_TRAFFIC_CLASS_UNDEF : req->trafficClass; NCCLCHECK(ncclNetGetDeviceHandle(resources->netDeviceType, resources->netDeviceVersion, false /*isRecv*/, &resources->netDeviceHandle)); bool rccl_anp = !(strcmp(proxyState->ncclNet->name, RCCL_ANP_PLUGIN_STR)); + + if (rcclNetP2pPolicy) { + NCCLCHECK(rcclNetP2pPolicy(req->handle, resources->isP2p)); + } + if (resources->shared) { // Shared buffers struct ncclProxyProgressState* progressState = &proxyState->progressState; diff --git a/projects/rccl/src/transport/net_ib.cc b/projects/rccl/src/transport/net_ib.cc index 1d3c7a0b25..4262fe50bf 100644 --- a/projects/rccl/src/transport/net_ib.cc +++ b/projects/rccl/src/transport/net_ib.cc @@ -129,6 +129,8 @@ NCCL_PARAM(IbFifoTc, "IB_FIFO_TC", -1); NCCL_PARAM(IbAsyncEvents,"IB_RETURN_ASYNC_EVENTS",1); NCCL_PARAM(IbEceEnable,"IB_ECE_ENABLE",1); NCCL_PARAM(IbDataDirect,"IB_DATA_DIRECT",1); +NCCL_PARAM(IbQpsPerConn, "IB_QPS_PER_CONNECTION", 1); +RCCL_PARAM(IbQpsPerP2p, "IB_QPS_PER_P2P", 0); static ncclResult_t ncclIbStatsInit(struct ncclIbStats* stat) { __atomic_store_n(&stat->fatalErrorCount, 0, __ATOMIC_RELAXED); @@ -151,6 +153,18 @@ static void ncclIbQpFatalError(struct ibv_qp* qp) { static void ncclIbCqFatalError(struct ibv_cq* cq) { ncclIbStatsFatalError((struct ncclIbStats*)cq->cq_context); } +// Calculate number of QPs based on P2P flag and device counts +static int ncclIbCalculateNqps(int isP2p, int localNdevs, int remoteNdevs, const char* funcName) { + auto qp_multiplier = (rcclParamIbQpsPerP2p() > 0 && isP2p) ? + rcclParamIbQpsPerP2p() : ncclParamIbQpsPerConn(); + int localNqps = qp_multiplier * localNdevs; + int remoteNqps = qp_multiplier * remoteNdevs; + int maxNqps = (remoteNqps > localNqps) ? 
remoteNqps : localNqps; + INFO(NCCL_NET, "NET/IB: %s Max Nqps=%d, localNqps=%d, remoteNqps=%d", + funcName, maxNqps, localNqps, remoteNqps); + return maxNqps; +} + static void ncclIbDevFatalError(struct ncclIbDev* dev) { ncclIbStatsFatalError(&dev->stats); } @@ -1009,6 +1023,7 @@ struct ncclIbConnectionMetadata { int ndevs; int tc; int sl; + int isP2p; }; enum ncclIbCommState { @@ -1034,6 +1049,7 @@ struct ncclIbCommStage { struct ncclIbHandle { union ncclSocketAddress connectAddr; // Filled by the target uint64_t magic; // random number to help debugging + int isP2p; // P2P flag struct ncclIbCommStage stage; // Used by the other side when connecting }; @@ -1208,8 +1224,6 @@ struct ncclIbRecvComm { }; static_assert((offsetof(struct ncclIbRecvComm, remFifo) % 32) == 0, "ncclIbRecvComm fifo must be 32-byte aligned"); -NCCL_PARAM(IbQpsPerConn, "IB_QPS_PER_CONNECTION", 1); - static void ncclIbAddEvent(struct ncclIbRequest* req, int devIndex, struct ncclIbNetCommDevBase* base) { req->events[devIndex]++; req->devBases[devIndex] = base; @@ -1364,6 +1378,7 @@ ncclResult_t ncclIbConnect(int dev, ncclNetCommConfig_t* config, void* opaqueHan struct ncclIbSendComm* comm = (struct ncclIbSendComm*)stage->comm; int ready; uint8_t link_layer = IBV_LINK_LAYER_UNSPECIFIED; + int isP2p = 0; *sendComm = NULL; if (stage->state == ncclIbCommStateConnect) goto ib_connect_check; @@ -1422,10 +1437,12 @@ ib_recv_dev_list: memcpy(&remoteVProps, stage->buffer, sizeof(ncclNetVDeviceProps_t)); mergedDev = ncclIbMergedDevs + dev; comm->base.vProps = mergedDev->vProps; - int localNqps, remoteNqps; - localNqps = ncclParamIbQpsPerConn() * comm->base.vProps.ndevs; // We must have at least 1 qp per-device - remoteNqps = ncclParamIbQpsPerConn() * remoteVProps.ndevs; - comm->base.nqps = remoteNqps > localNqps ? 
remoteNqps : localNqps; // Select max nqps (local or remote) + + // Read isP2p from handle + isP2p = handle->isP2p; + INFO(NCCL_NET, "NET/IB: ncclIbConnect isP2p=%d", isP2p); + comm->base.nqps = ncclIbCalculateNqps(isP2p, comm->base.vProps.ndevs, + remoteVProps.ndevs, __func__); // Init PD, Ctx for each IB device comm->ar = 1; // Set to 1 for logic @@ -1437,7 +1454,7 @@ ib_recv_dev_list: memset(&meta, 0, sizeof(meta)); meta.ndevs = comm->base.vProps.ndevs; - + meta.isP2p = isP2p; // Alternate QPs between devices int devIndex; devIndex = 0; @@ -1715,11 +1732,6 @@ ib_recv_dev_list: rComm->base.vProps = mergedDev->vProps; memcpy(stage->buffer, &rComm->base.vProps, sizeof(ncclNetVDeviceProps_t)); rComm->base.isSend = false; - int localNqps, remoteNqps; - localNqps = ncclParamIbQpsPerConn() * rComm->base.vProps.ndevs; // We must have at least 1 qp per-device - remoteNqps = ncclParamIbQpsPerConn() * remoteVProps.ndevs; - rComm->base.nqps = remoteNqps > localNqps ? remoteNqps : localNqps; // Select max nqps (local or remote) - stage->offset = 0; stage->state = ncclIbCommStateSendDevList; @@ -1744,10 +1756,12 @@ ib_recv: struct ncclIbRecvCommDev* rCommDev; struct ncclIbDevInfo* remDevInfo; struct ncclIbQp* qp; - bool useDmaBuf; + bool useDmaBuf; mergedDev = ncclIbMergedDevs + lComm->dev; rComm->base.nRemDevs = remMeta.ndevs; + rComm->base.nqps = ncclIbCalculateNqps(remMeta.isP2p, rComm->base.vProps.ndevs, + remMeta.ndevs, __func__); if (rComm->base.nRemDevs != rComm->base.vProps.ndevs) { INFO(NCCL_NET, "NET/IB : Local mergedDev %s has a different number of devices=%d as remote %s %d", mergedDev->devName, rComm->base.vProps.ndevs, remMeta.devName, rComm->base.nRemDevs); @@ -1907,6 +1921,7 @@ ib_recv: meta.qpInfo[q].devIndex = rComm->base.qps[q].devIndex; } meta.ndevs = rComm->base.vProps.ndevs; + meta.isP2p = remMeta.isP2p; strncpy(meta.devName, mergedDev->devName, MAX_MERGED_DEV_NAME); rComm->base.nDataQps = std::max(rComm->base.vProps.ndevs, rComm->base.nRemDevs); @@ 
-2718,6 +2733,14 @@ ncclResult_t ncclIbCloseListen(void* listenComm) { return ncclSuccess; } +ncclResult_t rcclNetP2pPolicy(void* handle, int isP2p) { + if (!handle) return ncclInvalidArgument; + struct ncclIbHandle* ibHandle = (struct ncclIbHandle*)handle; + if (ibHandle->magic != NCCL_SOCKET_MAGIC) return ncclInvalidArgument; + ibHandle->isP2p = isP2p; + return ncclSuccess; +} + ncclNet_t ncclNetIb = { "IB", ncclIbInit,