[NET] Enable capping the number of QPs created for send/recv colls (#1998)

[ROCm/rccl commit: 45991fadad]
Этот коммит содержится в:
Ghadeer Ahmed H Alabandi
2025-11-07 00:47:01 +00:00
коммит произвёл AbandiGa
родитель bd614458c3
Коммит 5b66480595
4 изменённых файлов: 53 добавлений и 13 удалений
+1
Просмотреть файл
@@ -20,6 +20,7 @@ Full documentation for RCCL is available at [https://rccl.readthedocs.io](https:
## RCCL 2.27.7 for ROCm 7.1.0
### Added
* Added `RCCL_IB_QPS_PER_P2P` to set the number of QPs per connection for P2P operations. When set (≥1), P2P operations (Send/Recv) use `RCCL_IB_QPS_PER_P2P`, while other collective operations continue to use `NCCL_IB_QPS_PER_CONNECTION`. When not set, `NCCL_IB_QPS_PER_CONNECTION` applies to all operations.
* Added `RCCL_FORCE_ENABLE_DMABUF` as a debugging feature if the user wants to explicitly enable DMABUF and forego system/kernel checks.
* Added `RCCL_P2P_BATCH_THRESHOLD` to set the message size limit for batching P2P operations. This mainly affects small message performance for alltoall at a large scale but also applies to alltoallv.
* Added `RCCL_P2P_BATCH_ENABLE` to enable batching P2P operations to receive performance gains for smaller messages up to 4MB for alltoall when the workload requires it. This is to avoid performance dips for larger messages.
+2
Просмотреть файл
@@ -23,4 +23,6 @@ ncclResult_t ncclGpuGdrSupport(struct ncclComm* comm, int* gdrSupport);
extern ncclNet_t ncclNetIb;
extern ncclNet_t ncclNetSocket;
extern ncclResult_t rcclNetP2pPolicy(void* handle, int isP2p);
#endif
+14
Просмотреть файл
@@ -122,6 +122,7 @@ struct sendNetResources {
ncclNetDeviceHandle_t* netDeviceHandle;
size_t maxP2pBytes;
volatile uint32_t* curr_hdp_reg; // Curr GPU in ring (for rdma transport use only)
int isP2p;
};
struct recvNetResources {
@@ -194,6 +195,7 @@ struct setupReq {
int channelId;
int connIndex;
uint32_t* curr_hdp_reg;
int isP2p;
};
NCCL_PARAM(NetOptionalRecvCompletion, "NET_OPTIONAL_RECV_COMPLETION", 1);
@@ -222,6 +224,12 @@ static ncclResult_t sendSetup(struct ncclComm* comm, struct ncclTopoGraph* graph
req.connIndex = connIndex;
req.curr_hdp_reg = 0;
req.netDev = -1;
// Determine if this is a P2P connection or not based on the graph pointer
if(graph == NULL) {
req.isP2p = 1;
} else {
req.isP2p = 0;
}
int proxyRank = myInfo->rank;
int64_t netId;
@@ -672,6 +680,7 @@ static ncclResult_t sendProxySetup(struct ncclProxyConnection* connection, struc
resources->channelId = req->channelId;
resources->connIndex = req->connIndex;
resources->curr_hdp_reg = req->curr_hdp_reg;
resources->isP2p = req->isP2p;
ncclNetProperties_t props;
NCCLCHECK(proxyState->ncclNet->getProperties(req->netDev, &props));
/* DMA-BUF support */
@@ -766,6 +775,11 @@ static ncclResult_t sendProxyConnect(struct ncclProxyConnection* connection, str
commConfig.trafficClass = req->trafficClass == NCCL_CONFIG_UNDEF_INT ? NCCL_NET_TRAFFIC_CLASS_UNDEF : req->trafficClass;
NCCLCHECK(ncclNetGetDeviceHandle(resources->netDeviceType, resources->netDeviceVersion, false /*isRecv*/, &resources->netDeviceHandle));
bool rccl_anp = !(strcmp(proxyState->ncclNet->name, RCCL_ANP_PLUGIN_STR));
if (rcclNetP2pPolicy) {
NCCLCHECK(rcclNetP2pPolicy(req->handle, resources->isP2p));
}
if (resources->shared) {
// Shared buffers
struct ncclProxyProgressState* progressState = &proxyState->progressState;
+36 -13
Просмотреть файл
@@ -129,6 +129,8 @@ NCCL_PARAM(IbFifoTc, "IB_FIFO_TC", -1);
NCCL_PARAM(IbAsyncEvents,"IB_RETURN_ASYNC_EVENTS",1);
NCCL_PARAM(IbEceEnable,"IB_ECE_ENABLE",1);
NCCL_PARAM(IbDataDirect,"IB_DATA_DIRECT",1);
NCCL_PARAM(IbQpsPerConn, "IB_QPS_PER_CONNECTION", 1);
RCCL_PARAM(IbQpsPerP2p, "IB_QPS_PER_P2P", 0);
static ncclResult_t ncclIbStatsInit(struct ncclIbStats* stat) {
__atomic_store_n(&stat->fatalErrorCount, 0, __ATOMIC_RELAXED);
@@ -151,6 +153,18 @@ static void ncclIbQpFatalError(struct ibv_qp* qp) {
static void ncclIbCqFatalError(struct ibv_cq* cq) {
ncclIbStatsFatalError((struct ncclIbStats*)cq->cq_context);
}
// Calculate number of QPs based on P2P flag and device counts
static int ncclIbCalculateNqps(int isP2p, int localNdevs, int remoteNdevs, const char* funcName) {
auto qp_multiplier = (rcclParamIbQpsPerP2p() > 0 && isP2p) ?
rcclParamIbQpsPerP2p() : ncclParamIbQpsPerConn();
int localNqps = qp_multiplier * localNdevs;
int remoteNqps = qp_multiplier * remoteNdevs;
int maxNqps = (remoteNqps > localNqps) ? remoteNqps : localNqps;
INFO(NCCL_NET, "NET/IB: %s Max Nqps=%d, localNqps=%d, remoteNqps=%d",
funcName, maxNqps, localNqps, remoteNqps);
return maxNqps;
}
static void ncclIbDevFatalError(struct ncclIbDev* dev) {
ncclIbStatsFatalError(&dev->stats);
}
@@ -1009,6 +1023,7 @@ struct ncclIbConnectionMetadata {
int ndevs;
int tc;
int sl;
int isP2p;
};
enum ncclIbCommState {
@@ -1034,6 +1049,7 @@ struct ncclIbCommStage {
struct ncclIbHandle {
union ncclSocketAddress connectAddr; // Filled by the target
uint64_t magic; // random number to help debugging
int isP2p; // P2P flag
struct ncclIbCommStage stage; // Used by the other side when connecting
};
@@ -1208,8 +1224,6 @@ struct ncclIbRecvComm {
};
static_assert((offsetof(struct ncclIbRecvComm, remFifo) % 32) == 0, "ncclIbRecvComm fifo must be 32-byte aligned");
NCCL_PARAM(IbQpsPerConn, "IB_QPS_PER_CONNECTION", 1);
static void ncclIbAddEvent(struct ncclIbRequest* req, int devIndex, struct ncclIbNetCommDevBase* base) {
req->events[devIndex]++;
req->devBases[devIndex] = base;
@@ -1364,6 +1378,7 @@ ncclResult_t ncclIbConnect(int dev, ncclNetCommConfig_t* config, void* opaqueHan
struct ncclIbSendComm* comm = (struct ncclIbSendComm*)stage->comm;
int ready;
uint8_t link_layer = IBV_LINK_LAYER_UNSPECIFIED;
int isP2p = 0;
*sendComm = NULL;
if (stage->state == ncclIbCommStateConnect) goto ib_connect_check;
@@ -1422,10 +1437,12 @@ ib_recv_dev_list:
memcpy(&remoteVProps, stage->buffer, sizeof(ncclNetVDeviceProps_t));
mergedDev = ncclIbMergedDevs + dev;
comm->base.vProps = mergedDev->vProps;
int localNqps, remoteNqps;
localNqps = ncclParamIbQpsPerConn() * comm->base.vProps.ndevs; // We must have at least 1 qp per-device
remoteNqps = ncclParamIbQpsPerConn() * remoteVProps.ndevs;
comm->base.nqps = remoteNqps > localNqps ? remoteNqps : localNqps; // Select max nqps (local or remote)
// Read isP2p from handle
isP2p = handle->isP2p;
INFO(NCCL_NET, "NET/IB: ncclIbConnect isP2p=%d", isP2p);
comm->base.nqps = ncclIbCalculateNqps(isP2p, comm->base.vProps.ndevs,
remoteVProps.ndevs, __func__);
// Init PD, Ctx for each IB device
comm->ar = 1; // Set to 1 for logic
@@ -1437,7 +1454,7 @@ ib_recv_dev_list:
memset(&meta, 0, sizeof(meta));
meta.ndevs = comm->base.vProps.ndevs;
meta.isP2p = isP2p;
// Alternate QPs between devices
int devIndex;
devIndex = 0;
@@ -1715,11 +1732,6 @@ ib_recv_dev_list:
rComm->base.vProps = mergedDev->vProps;
memcpy(stage->buffer, &rComm->base.vProps, sizeof(ncclNetVDeviceProps_t));
rComm->base.isSend = false;
int localNqps, remoteNqps;
localNqps = ncclParamIbQpsPerConn() * rComm->base.vProps.ndevs; // We must have at least 1 qp per-device
remoteNqps = ncclParamIbQpsPerConn() * remoteVProps.ndevs;
rComm->base.nqps = remoteNqps > localNqps ? remoteNqps : localNqps; // Select max nqps (local or remote)
stage->offset = 0;
stage->state = ncclIbCommStateSendDevList;
@@ -1744,10 +1756,12 @@ ib_recv:
struct ncclIbRecvCommDev* rCommDev;
struct ncclIbDevInfo* remDevInfo;
struct ncclIbQp* qp;
bool useDmaBuf;
bool useDmaBuf;
mergedDev = ncclIbMergedDevs + lComm->dev;
rComm->base.nRemDevs = remMeta.ndevs;
rComm->base.nqps = ncclIbCalculateNqps(remMeta.isP2p, rComm->base.vProps.ndevs,
remMeta.ndevs, __func__);
if (rComm->base.nRemDevs != rComm->base.vProps.ndevs) {
INFO(NCCL_NET, "NET/IB : Local mergedDev %s has a different number of devices=%d as remote %s %d",
mergedDev->devName, rComm->base.vProps.ndevs, remMeta.devName, rComm->base.nRemDevs);
@@ -1907,6 +1921,7 @@ ib_recv:
meta.qpInfo[q].devIndex = rComm->base.qps[q].devIndex;
}
meta.ndevs = rComm->base.vProps.ndevs;
meta.isP2p = remMeta.isP2p;
strncpy(meta.devName, mergedDev->devName, MAX_MERGED_DEV_NAME);
rComm->base.nDataQps = std::max(rComm->base.vProps.ndevs, rComm->base.nRemDevs);
@@ -2718,6 +2733,14 @@ ncclResult_t ncclIbCloseListen(void* listenComm) {
return ncclSuccess;
}
ncclResult_t rcclNetP2pPolicy(void* handle, int isP2p) {
if (!handle) return ncclInvalidArgument;
struct ncclIbHandle* ibHandle = (struct ncclIbHandle*)handle;
if (ibHandle->magic != NCCL_SOCKET_MAGIC) return ncclInvalidArgument;
ibHandle->isP2p = isP2p;
return ncclSuccess;
}
ncclNet_t ncclNetIb = {
"IB",
ncclIbInit,