Use one side stream per process (#2063)

* Use one side stream per process

* Handle multiple GPUs per process

* Reset stream when not found

* Address review comments

* Fix missing mutex initializer

[ROCm/rccl commit: 185e78a8f0]
Этот коммит содержится в:
Wenkai Du
2025-12-02 10:03:15 -08:00
коммит произвёл GitHub
родитель 8e3f60e080
Коммит 3e650467fa
8 изменённых файлов: 117 добавлений и 40 удалений
+86 -7
Просмотреть файл
@@ -17,6 +17,7 @@
#include <unistd.h>
#include <stdlib.h>
#include <string.h>
#include <map>
#include "rccl_vars.h"
#if CUDART_VERSION >= 11030
@@ -31,6 +32,78 @@ constexpr size_t ncclSizeOfT() { return sizeof(T); }
template<>
constexpr size_t ncclSizeOfT<void>() { return 1; }
struct ncclSideStream {
cudaStream_t stream;
uint64_t refCount;
};
inline std::unordered_map<int64_t, ncclSideStream> sideStream;
inline pthread_mutex_t sideStreamLock = PTHREAD_MUTEX_INITIALIZER;
extern ncclResult_t getBusId(int cudaDev, int64_t *busId);
static inline ncclResult_t ncclCreateSideStream(int cudaDev) {
ncclResult_t res = ncclSuccess;
int64_t busId;
NCCLCHECK(getBusId(cudaDev, &busId));
pthread_mutex_lock(&sideStreamLock);
if (auto it = sideStream.find(busId); it != sideStream.end()) {
it->second.refCount++;
INFO(NCCL_ALLOC, "Side stream %p of dev %d busid %lx inc count to %ld",
it->second.stream, cudaDev, busId, it->second.refCount);
} else {
cudaStream_t stream;
CUDACHECKGOTO(cudaStreamCreateWithFlags(&stream, cudaStreamNonBlocking), res, fail);
sideStream.emplace(busId, ncclSideStream{stream, 1});
INFO(NCCL_ALLOC, "Created side stream %p of dev %d busid %lx",
stream, cudaDev, busId);
}
fail:
pthread_mutex_unlock(&sideStreamLock);
return res;
};
static inline ncclResult_t ncclDestroySideStream(int cudaDev) {
ncclResult_t res = ncclSuccess;
int64_t busId;
NCCLCHECK(getBusId(cudaDev, &busId));
pthread_mutex_lock(&sideStreamLock);
if (auto it = sideStream.find(busId); it != sideStream.end()) {
it->second.refCount--;
if (it->second.refCount== 0) {
INFO(NCCL_ALLOC, "Destroyed side stream %p of dev %d busid %lx",
it->second.stream, cudaDev, busId);
CUDACHECKGOTO(cudaStreamDestroy(it->second.stream), res, fail);
sideStream.erase(it);
} else {
INFO(NCCL_ALLOC, "Side stream %p of dev %d busid %lx dec count to %ld",
it->second.stream, cudaDev, busId, it->second.refCount);
}
} else {
WARN("Side stream of dev %d busid %lx was not found for destroy", cudaDev, busId);
}
fail:
pthread_mutex_unlock(&sideStreamLock);
return res;
};
static inline ncclResult_t getSideStream(cudaStream_t *stream) {
int cudaDev;
int64_t busId;
CUDACHECK(cudaGetDevice(&cudaDev));
NCCLCHECK(getBusId(cudaDev, &busId));
pthread_mutex_lock(&sideStreamLock);
if (auto it = sideStream.find(busId); it != sideStream.end()) {
*stream = it->second.stream;
INFO(NCCL_ALLOC, "Found side stream %p of dev %d busid %lx count %ld",
it->second.stream, cudaDev, busId, it->second.refCount);
} else {
*stream = 0;
WARN("Side stream of dev %d busid %lx was not found", cudaDev, busId);
}
pthread_mutex_unlock(&sideStreamLock);
return ncclSuccess;
}
#if CUDART_VERSION >= 12020
static inline ncclResult_t ncclCuMemHostAlloc(void** ptr, CUmemGenericAllocationHandle *handlep, size_t size) {
@@ -362,7 +435,7 @@ finish:
#define ncclCudaMalloc(...) ncclCudaMallocDebug( __FILE__, __LINE__, __VA_ARGS__)
template <typename T>
ncclResult_t ncclCudaCallocDebug(const char *filefunc, int line, T** ptr, size_t nelem, cudaStream_t sideStream = nullptr, unsigned int flags = hipDeviceMallocDefault) {
ncclResult_t ncclCudaCallocDebug(const char *filefunc, int line, T** ptr, size_t nelem, unsigned int flags = hipDeviceMallocDefault) {
ncclResult_t result = ncclSuccess;
cudaStreamCaptureMode mode = cudaStreamCaptureModeRelaxed;
*ptr = nullptr;
@@ -370,13 +443,15 @@ ncclResult_t ncclCudaCallocDebug(const char *filefunc, int line, T** ptr, size_t
CUDACHECK(cudaThreadExchangeStreamCaptureMode(&mode));
// Need a side stream so as not to interfere with graph capture.
cudaStream_t stream = sideStream;
if (stream == nullptr)
cudaStream_t stream, sidestream;
NCCLCHECK(getSideStream(&sidestream));
stream = sidestream;
if (sidestream == nullptr)
CUDACHECK(cudaStreamCreateWithFlags(&stream, cudaStreamNonBlocking));
CUDACHECKGOTO(hipExtMallocWithFlags((void**)ptr, nelem*ncclSizeOfT<T>(), flags), result, finish);
CUDACHECKGOTO(cudaMemsetAsync(*ptr, 0, nelem*ncclSizeOfT<T>(), stream), result, finish);
CUDACHECKGOTO(cudaStreamSynchronize(stream), result, finish);
if (sideStream == nullptr)
if (sidestream == nullptr)
CUDACHECKGOTO(cudaStreamDestroy(stream), result, finish);
finish:
CUDACHECK(cudaThreadExchangeStreamCaptureMode(&mode));
@@ -428,11 +503,15 @@ ncclResult_t ncclCudaMemcpy(T* dst, T* src, size_t nelem) {
cudaStreamCaptureMode mode = cudaStreamCaptureModeRelaxed;
CUDACHECK(cudaThreadExchangeStreamCaptureMode(&mode));
// Need a side stream so as not to interfere with graph capture.
cudaStream_t stream;
CUDACHECKGOTO(cudaStreamCreateWithFlags(&stream, cudaStreamNonBlocking), result, finish);
cudaStream_t stream, sidestream;
NCCLCHECK(getSideStream(&sidestream));
stream = sidestream;
if (sidestream == nullptr)
CUDACHECKGOTO(cudaStreamCreateWithFlags(&stream, cudaStreamNonBlocking), result, finish);
NCCLCHECKGOTO(ncclCudaMemcpyAsync(dst, src, nelem, stream), result, finish);
CUDACHECKGOTO(cudaStreamSynchronize(stream), result, finish);
CUDACHECKGOTO(cudaStreamDestroy(stream), result, finish);
if (sidestream == nullptr)
CUDACHECKGOTO(cudaStreamDestroy(stream), result, finish);
finish:
CUDACHECK(cudaThreadExchangeStreamCaptureMode(&mode));
return result;
-2
Просмотреть файл
@@ -629,8 +629,6 @@ struct ncclComm {
struct ncclKernelPlanner planner;
hipStream_t sideStream; // [RCCL] Cached non-captured stream
cudaMemPool_t memPool;
// Queue of events and associated callbacks for cleaning up asynchronous work.
// Using this is preferable to using CUDA host callbacks because host callbacks
+3 -3
Просмотреть файл
@@ -180,7 +180,7 @@ static gdr_t ncclGdrInit() {
}
template <typename T>
static ncclResult_t ncclGdrCudaCalloc(T** ptr, T** devPtr, size_t nelem, void** gdrHandle, hipStream_t stream) {
static ncclResult_t ncclGdrCudaCalloc(T** ptr, T** devPtr, size_t nelem, void** gdrHandle) {
// gdr_info_t info; // unused variable - compiler warning
size_t mapSize;
// gdr_mh_t mh; // unused variable - compiler warning
@@ -193,9 +193,9 @@ static ncclResult_t ncclGdrCudaCalloc(T** ptr, T** devPtr, size_t nelem, void**
ALIGN_SIZE(mapSize, GPU_PAGE_SIZE);
// GDRCOPY Pinned buffer has to be GPU_PAGE_SIZE aligned too
#if defined(HIP_UNCACHED_MEMORY)
NCCLCHECK(ncclCudaCalloc(&devMem, mapSize+GPU_PAGE_SIZE-1, stream, hipDeviceMallocUncached));
NCCLCHECK(ncclCudaCalloc(&devMem, mapSize+GPU_PAGE_SIZE-1, hipDeviceMallocUncached));
#else
NCCLCHECK(ncclCudaCalloc(&devMem, mapSize+GPU_PAGE_SIZE-1, stream, hipDeviceMallocFinegrained));
NCCLCHECK(ncclCudaCalloc(&devMem, mapSize+GPU_PAGE_SIZE-1, hipDeviceMallocFinegrained));
#endif
gdr_mem_desc_t* md;
NCCLCHECK(ncclCalloc(&md, 1));
+7 -7
Просмотреть файл
@@ -94,7 +94,7 @@ NCCL_PARAM(CtaPolicy, "CTA_POLICY", NCCL_CONFIG_UNDEF_INT);
NCCL_PARAM(NvlsChannels, "NVLS_NCHANNELS", NCCL_CONFIG_UNDEF_INT);
struct allocationTracker allocTracker[MAX_ALLOC_TRACK_NGPU] = {};
static ncclResult_t commReclaim(ncclComm_t comm);
ncclResult_t commReclaim(ncclComm_t comm);
#ifdef ENABLE_MSCCLPP
size_t std::hash<ncclUniqueId>::operator ()(const ncclUniqueId& uniqueId) const noexcept {
@@ -521,7 +521,6 @@ static ncclResult_t commFree(ncclComm_t comm) {
NCCLCHECK(dtor->fn(dtor));
dtor = dtor->next;
}
CUDACHECK(hipStreamDestroy(comm->sideStream));
ncclMemoryStackDestruct(&comm->memScoped);
ncclMemoryStackDestruct(&comm->memPermanent);
@@ -544,6 +543,7 @@ static ncclResult_t commFree(ncclComm_t comm) {
NCCLCHECK(ncclNvlsSymmetricFinalize(comm));
NCCLCHECK(ncclIpcSymmetricFinalize(comm));
}
NCCLCHECK(ncclDestroySideStream(comm->cudaDev));
INFO(NCCL_INIT,"comm %p rank %d nranks %d cudaDev %d busId %lx - %s COMPLETE", comm, comm->rank, comm->nRanks, comm->cudaDev, comm->busId, abort ? "Abort" : "Destroy");
commPoison(comm); // poison comm before free to avoid comm reuse.
@@ -650,6 +650,9 @@ static ncclResult_t commAlloc(struct ncclComm* comm, struct ncclComm* parent, in
comm->lastStream = nullptr;
CUDACHECK(cudaGetDevice(&comm->cudaDev));
// RCCL: create persistent stream for calloc
NCCLCHECK(ncclCreateSideStream(comm->cudaDev));
// Disable until we validate NCCL_LAUNCH_IMPLICIT_ORDER support.
// but can be enabled via environment variable
if (rcclParamEnableContextTracking() == 1) {
@@ -666,9 +669,6 @@ static ncclResult_t commAlloc(struct ncclComm* comm, struct ncclComm* parent, in
comm->compCap = ncclCudaCompCap();
TRACE(NCCL_INIT,"comm %p rank %d nranks %d cudaDev %d busId %lx compCap %d", comm, rank, ndev, comm->cudaDev, comm->busId, comm->compCap);
// RCCL: create persistent stream for calloc
CUDACHECK(hipStreamCreateWithFlags(&comm->sideStream, hipStreamNonBlocking));
comm->checkPointers = ncclParamCheckPointers() == 1 ? true : false;
comm->dmaBufSupport = (dmaBufSupported(comm) == ncclSuccess) ? true : false;
@@ -819,7 +819,7 @@ static ncclResult_t devCommSetup(ncclComm_t comm) {
if (ncclGdrCopy != NULL && ncclParamGdrCopyFifoEnable() == 1) {
// The workFifoBuf lives in GDR mapped CUDA memory.
NCCLCHECKGOTO(ncclGdrCudaCalloc(&comm->workFifoBuf, &comm->workFifoBufDev, comm->workFifoBytes, &comm->workFifoBufGdrHandle, comm->sideStream), ret, fail);
NCCLCHECKGOTO(ncclGdrCudaCalloc(&comm->workFifoBuf, &comm->workFifoBufDev, comm->workFifoBytes, &comm->workFifoBufGdrHandle), ret, fail);
ncclCommPushCudaGdrFree(comm, comm->workFifoBufGdrHandle);
} else {
// The workFifoBuf lives in cudaHost memory.
@@ -876,7 +876,7 @@ static ncclResult_t devCommSetup(ncclComm_t comm) {
#endif
#ifdef ENABLE_PROFILING
NCCLCHECK(ncclCudaCalloc(&tmpCommAndChans.comm.devProf, MAXCHANNELS*PROFILE_NUM_LAUNCHES, comm->sideStream));
NCCLCHECK(ncclCudaCalloc(&tmpCommAndChans.comm.devProf, MAXCHANNELS*PROFILE_NUM_LAUNCHES));
#endif
#ifdef ENABLE_FAULT_INJECTION
+4 -4
Просмотреть файл
@@ -409,9 +409,9 @@ static ncclResult_t sharedBuffersInit(struct ncclCollNetSharedRes* collNet, int
if (cuda && collNet->cudaBuff == NULL) {
#if defined(HIP_UNCACHED_MEMORY)
NCCLCHECK(ncclCudaCalloc(&collNet->cudaBuff, *size, nullptr, cuda ? hipDeviceMallocUncached : hipDeviceMallocDefault));
NCCLCHECK(ncclCudaCalloc(&collNet->cudaBuff, *size, cuda ? hipDeviceMallocUncached : hipDeviceMallocDefault));
#else
NCCLCHECK(ncclCudaCalloc(&collNet->cudaBuff, *size, nullptr, cuda ? hipDeviceMallocFinegrained : hipDeviceMallocDefault));
NCCLCHECK(ncclCudaCalloc(&collNet->cudaBuff, *size, cuda ? hipDeviceMallocFinegrained : hipDeviceMallocDefault));
#endif
}
if (!cuda && collNet->hostBuff == NULL) {
@@ -504,7 +504,7 @@ static ncclResult_t sendProxyConnect(struct ncclProxyConnection* connection, str
map->mems[NCCL_NET_MAP_HOSTMEM].gpuPtr = map->mems[NCCL_NET_MAP_HOSTMEM].cpuPtr;
if (ncclGdrCopy && ncclParamGdrCopySyncEnable()) {
uint64_t *cpuPtr, *gpuPtr;
NCCLCHECK(ncclGdrCudaCalloc(&cpuPtr, &gpuPtr, 1, &resources->gdrDesc, nullptr));
NCCLCHECK(ncclGdrCudaCalloc(&cpuPtr, &gpuPtr, 1, &resources->gdrDesc));
resources->gdcSync = cpuPtr;
struct connectMapMem* gdcMem = map->mems+NCCL_NET_MAP_GDCMEM;
@@ -582,7 +582,7 @@ static ncclResult_t recvProxyConnect(struct ncclProxyConnection* connection, str
map->mems[NCCL_NET_MAP_HOSTMEM].gpuPtr = map->mems[NCCL_NET_MAP_HOSTMEM].cpuPtr;
if (ncclGdrCopy) {
uint64_t *cpuPtr, *gpuPtr;
NCCLCHECK(ncclGdrCudaCalloc(&cpuPtr, &gpuPtr, 2, &resources->gdrDesc, nullptr));
NCCLCHECK(ncclGdrCudaCalloc(&cpuPtr, &gpuPtr, 2, &resources->gdrDesc));
if (ncclParamGdrCopySyncEnable()) {
resources->gdcSync = cpuPtr;
+11 -11
Просмотреть файл
@@ -597,14 +597,14 @@ static ncclResult_t sharedNetBuffersInit(struct ncclProxyState* proxyState, int
} else {
#if defined(HIP_UNCACHED_MEMORY)
#if defined(HIP_CONTIGUOUS_MEMORY)
NCCLCHECK(ncclCudaCalloc(&state->cudaBuff, state->size, nullptr,
NCCLCHECK(ncclCudaCalloc(&state->cudaBuff, state->size,
cuda ? (rcclParamNetContiguousMem() ? hipDeviceMallocContiguous : hipDeviceMallocUncached) : hipDeviceMallocDefault));
#else
NCCLCHECK(ncclCudaCalloc(&state->cudaBuff, state->size, nullptr,
NCCLCHECK(ncclCudaCalloc(&state->cudaBuff, state->size,
cuda ? hipDeviceMallocUncached : hipDeviceMallocDefault));
#endif
#else
NCCLCHECK(ncclCudaCalloc(&state->cudaBuff, state->size, nullptr,
NCCLCHECK(ncclCudaCalloc(&state->cudaBuff, state->size,
cuda ? hipDeviceMallocFinegrained : hipDeviceMallocDefault));
#endif
}
@@ -888,14 +888,14 @@ static ncclResult_t sendProxyConnect(struct ncclProxyConnection* connection, str
} else {
#if defined(HIP_UNCACHED_MEMORY)
#if defined(HIP_CONTIGUOUS_MEMORY)
NCCLCHECK(ncclCudaCalloc(&map->mems[NCCL_NET_MAP_DEVMEM].gpuPtr, map->mems[NCCL_NET_MAP_DEVMEM].size, nullptr,
NCCLCHECK(ncclCudaCalloc(&map->mems[NCCL_NET_MAP_DEVMEM].gpuPtr, map->mems[NCCL_NET_MAP_DEVMEM].size,
resources->useGdr ? (rcclParamNetContiguousMem() ? hipDeviceMallocContiguous : hipDeviceMallocUncached) : hipDeviceMallocDefault));
#else
NCCLCHECK(ncclCudaCalloc(&map->mems[NCCL_NET_MAP_DEVMEM].gpuPtr, map->mems[NCCL_NET_MAP_DEVMEM].size, nullptr,
NCCLCHECK(ncclCudaCalloc(&map->mems[NCCL_NET_MAP_DEVMEM].gpuPtr, map->mems[NCCL_NET_MAP_DEVMEM].size,
resources->useGdr ? hipDeviceMallocUncached : hipDeviceMallocDefault));
#endif
#else
NCCLCHECK(ncclCudaCalloc(&map->mems[NCCL_NET_MAP_DEVMEM].gpuPtr, map->mems[NCCL_NET_MAP_DEVMEM].size, nullptr,
NCCLCHECK(ncclCudaCalloc(&map->mems[NCCL_NET_MAP_DEVMEM].gpuPtr, map->mems[NCCL_NET_MAP_DEVMEM].size,
resources->useGdr ? hipDeviceMallocFinegrained : hipDeviceMallocDefault));
#endif
}
@@ -914,7 +914,7 @@ static ncclResult_t sendProxyConnect(struct ncclProxyConnection* connection, str
}
if (ncclGdrCopy && map->sameProcess && ncclParamGdrCopySyncEnable()) {
uint64_t *cpuPtr, *gpuPtr;
NCCLCHECK(ncclGdrCudaCalloc(&cpuPtr, &gpuPtr, 1, &resources->gdrDesc, nullptr));
NCCLCHECK(ncclGdrCudaCalloc(&cpuPtr, &gpuPtr, 1, &resources->gdrDesc));
resources->gdcSync = cpuPtr;
struct connectMapMem* gdcMem = map->mems+NCCL_NET_MAP_GDCMEM;
@@ -1091,14 +1091,14 @@ static ncclResult_t recvProxyConnect(struct ncclProxyConnection* connection, str
} else {
#if defined(HIP_UNCACHED_MEMORY)
#if defined(HIP_CONTIGUOUS_MEMORY)
NCCLCHECK(ncclCudaCalloc(&map->mems[NCCL_NET_MAP_DEVMEM].gpuPtr, map->mems[NCCL_NET_MAP_DEVMEM].size, nullptr,
NCCLCHECK(ncclCudaCalloc(&map->mems[NCCL_NET_MAP_DEVMEM].gpuPtr, map->mems[NCCL_NET_MAP_DEVMEM].size,
resources->useGdr ? (rcclParamNetContiguousMem() ? hipDeviceMallocContiguous : hipDeviceMallocUncached) : hipDeviceMallocDefault));
#else
NCCLCHECK(ncclCudaCalloc(&map->mems[NCCL_NET_MAP_DEVMEM].gpuPtr, map->mems[NCCL_NET_MAP_DEVMEM].size, nullptr,
NCCLCHECK(ncclCudaCalloc(&map->mems[NCCL_NET_MAP_DEVMEM].gpuPtr, map->mems[NCCL_NET_MAP_DEVMEM].size,
resources->useGdr ? hipDeviceMallocUncached : hipDeviceMallocDefault));
#endif
#else
NCCLCHECK(ncclCudaCalloc(&map->mems[NCCL_NET_MAP_DEVMEM].gpuPtr, map->mems[NCCL_NET_MAP_DEVMEM].size, nullptr,
NCCLCHECK(ncclCudaCalloc(&map->mems[NCCL_NET_MAP_DEVMEM].gpuPtr, map->mems[NCCL_NET_MAP_DEVMEM].size,
resources->useGdr ? hipDeviceMallocFinegrained : hipDeviceMallocDefault));
#endif
}
@@ -1109,7 +1109,7 @@ static ncclResult_t recvProxyConnect(struct ncclProxyConnection* connection, str
map->mems[NCCL_NET_MAP_HOSTMEM].gpuPtr = map->mems[NCCL_NET_MAP_HOSTMEM].cpuPtr;
if (ncclGdrCopy && map->sameProcess) {
uint64_t *cpuPtr, *gpuPtr;
NCCLCHECK(ncclGdrCudaCalloc(&cpuPtr, &gpuPtr, 2, &resources->gdrDesc, nullptr));
NCCLCHECK(ncclGdrCudaCalloc(&cpuPtr, &gpuPtr, 2, &resources->gdrDesc));
if (ncclParamGdrCopySyncEnable()) {
resources->gdcSync = cpuPtr;
+2 -2
Просмотреть файл
@@ -1855,9 +1855,9 @@ ib_recv:
if (rComm->flushEnabled) {
if (rcclParamIbGdrFlushGpuMemNoRelaxedOrdering()) {
#if defined(HIP_UNCACHED_MEMORY)
NCCLCHECKGOTO(ncclCudaCalloc(&rCommDev->gpuFlush.gpuFlushGpuMem, sizeof(int), nullptr, hipDeviceMallocUncached), ret, fail);
NCCLCHECKGOTO(ncclCudaCalloc(&rCommDev->gpuFlush.gpuFlushGpuMem, sizeof(int), hipDeviceMallocUncached), ret, fail);
#else
NCCLCHECKGOTO(ncclCudaCalloc(&rCommDev->gpuFlush.gpuFlushGpuMem, sizeof(int), nullptr, hipDeviceMallocFinegrained), ret, fail);
NCCLCHECKGOTO(ncclCudaCalloc(&rCommDev->gpuFlush.gpuFlushGpuMem, sizeof(int), hipDeviceMallocFinegrained), ret, fail);
#endif
if (useDmaBuf)
{
+4 -4
Просмотреть файл
@@ -247,9 +247,9 @@ ncclResult_t ncclP2pAllocateShareableBuffer(size_t size, int refcount, ncclIpcDe
} else {
// Allocate a CUDA buffer and generate an IPC handle for it
#if defined(HIP_UNCACHED_MEMORY)
NCCLCHECK(ncclCudaCalloc((char **)ptr, size, nullptr, hipDeviceMallocUncached));
NCCLCHECK(ncclCudaCalloc((char **)ptr, size, hipDeviceMallocUncached));
#else
NCCLCHECK(ncclCudaCalloc((char **)ptr, size, nullptr, hipDeviceMallocFinegrained));
NCCLCHECK(ncclCudaCalloc((char **)ptr, size, hipDeviceMallocFinegrained));
#endif
cudaError_t res = cudaIpcGetMemHandle(&ipcDesc->devIpc, *ptr);
if (res != cudaSuccess) {
@@ -667,9 +667,9 @@ static ncclResult_t p2pSendProxySetup(struct ncclProxyConnection* connection, st
connection->transportResources = proxyInfo;
#if defined(HIP_UNCACHED_MEMORY)
NCCLCHECK(ncclCudaCalloc(&proxyInfo->ceDevBuff, proxyState->buffSizes[NCCL_PROTO_SIMPLE], nullptr, hipDeviceMallocUncached));
NCCLCHECK(ncclCudaCalloc(&proxyInfo->ceDevBuff, proxyState->buffSizes[NCCL_PROTO_SIMPLE], hipDeviceMallocUncached));
#else
NCCLCHECK(ncclCudaCalloc(&proxyInfo->ceDevBuff, proxyState->buffSizes[NCCL_PROTO_SIMPLE], nullptr, hipDeviceMallocFinegrained));
NCCLCHECK(ncclCudaCalloc(&proxyInfo->ceDevBuff, proxyState->buffSizes[NCCL_PROTO_SIMPLE], hipDeviceMallocFinegrained));
#endif
// Create a SHM segment for the peer to attach to