From 7ee5c1c28bd8d97f1693a07274deace9a85aa9f4 Mon Sep 17 00:00:00 2001 From: "Wen-Heng (Jack) Chung" Date: Thu, 12 Oct 2023 20:17:08 -0500 Subject: [PATCH] Change MSCCL kernel signature to allow kernel arguments be preloaded via SGPR (#911) * Adding a script that will download/compile/run TransferBench/RCCL/UCX/RCCL-tests/RCCL-Unittests/hip-mpi-testsuite (#895) Co-authored-by: Pedram Alizadeh * Only build gfx941 * demo * fine tune malloc * Fix merge errors * Fix merge errors * Disable parallel build * Adopt --amdgpu-kernarg-preload-count * Revert "Adding a script that will download/compile/run TransferBench/RCCL/UCX/RCCL-tests/RCCL-Unittests/hip-mpi-testsuite (#895)" This reverts commit f5e252dddf02a41b4d1bc512f306f45f97166304. * Revert CMake changes. * NPKIT changes. * Remove some license declarations. * Address code review feedbacks on msccl_kernel_impl.h * Update CMakeLists.txt * Add CMake logic to check the existence of --amdgpu-kernarg-preload-count * Fix NPKIT trace logic. --------- Co-authored-by: Pedram Alizadeh Co-authored-by: Pedram Alizadeh Co-authored-by: Ziyue Yang --- CMakeLists.txt | 12 ++++ src/collectives/device/msccl_kernel_impl.h | 20 +++--- src/collectives/msccl.cc | 2 +- src/include/msccl/msccl_kernel.h | 2 +- src/include/msccl/msccl_struct.h | 23 +++++- src/misc/msccl/msccl_lifecycle.cc | 10 +++ src/misc/msccl/msccl_setup.cc | 82 ++++++++++++++++++++-- 7 files changed, 133 insertions(+), 18 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index d8c9ed6802..1ea721f795 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -201,6 +201,12 @@ if(BUILD_BFD) endif() endif() +# Check for --amdgpu-kernarg-preload-count +check_cxx_compiler_flag("-mllvm --amdgpu-kernarg-preload-count=16" HAVE_KERNARG_PRELOAD) +if (HAVE_KERNARG_PRELOAD) + message(STATUS "Kernarg preloading to SGPR enabled") +endif() + # Determine version from makefiles/version.mk and fill in templates #================================================================================================== ## parse version from Makefile NCCL_MAJOR, NCCL_MINOR, NCCL_PATCH must exist @@ -590,6 +596,9 @@ target_compile_options(rccl PRIVATE -parallel-jobs=12) target_compile_options(rccl PRIVATE -Wno-format-nonliteral) target_compile_options(rccl PRIVATE -fgpu-rdc) # Generate relocatable device code (required for extern __shared__) target_compile_options(rccl PRIVATE -fvisibility=hidden) # Set symbol visibility to hidden by default +if (HAVE_KERNARG_PRELOAD) + target_compile_options(rccl PRIVATE -mllvm --amdgpu-kernarg-preload-count=16) +endif() ## NOTE: This is currently being handled by rocm-cmake, however may need to be re-enabled in the future #foreach(target ${GPU_TARGETS}) @@ -634,6 +643,9 @@ if(NOT BUILD_SHARED_LIBS) else() message(STATUS "Building shared RCCL library") endif() +if (HAVE_KERNARG_PRELOAD) + target_link_options(rccl PRIVATE -mllvm --amdgpu-kernarg-preload-count=16) +endif() target_link_libraries(rccl PRIVATE Threads::Threads) target_link_libraries(rccl INTERFACE hip::host) diff --git a/src/collectives/device/msccl_kernel_impl.h b/src/collectives/device/msccl_kernel_impl.h index ba9460d48f..6f50d09ac4 100644 --- a/src/collectives/device/msccl_kernel_impl.h +++ b/src/collectives/device/msccl_kernel_impl.h @@ -130,7 +130,7 @@ for (int r = 0; r < numloops; r++) { \ template __device__ __forceinline__ void mscclRunInterpreter( - struct ncclDevComm* comm, struct mscclAlgo* algo, struct mscclWork work) { + struct ncclDevComm* comm, struct mscclAlgo* algo, struct mscclWork* work) { const int tid = threadIdx.x; const int bid = blockIdx.x; const int nthreads = NCCL_MAX_NTHREADS; @@ -173,7 +173,7 @@ __device__ __forceinline__ void mscclRunInterpreter( break; case 2: dst = &mscclShmem.work; - src = &work; + src = work + blockIdx.x; bytes = sizeof(mscclWork); static_assert(sizeof(mscclWork) <= sizeof(uint64_t) * WARP_SIZE, "mscclWork cannot be loaded by a single warp in one insn."); break; @@ -192,6 +192,8 @@ __device__ __forceinline__ void mscclRunInterpreter( if (tid == 0) ncclShmem.event_buffer_head = 0; #endif __synclds(); // publish shmem + if (tid == 0) + *mscclShmem.work.workFifoDone = mscclShmem.work.workFifoDoneAck; #if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_TIME_SYNC_CPU) if (tid == 0) { @@ -226,7 +228,7 @@ __device__ __forceinline__ void mscclRunInterpreter( } } __synclds(); // publish shmem - + // User pointers for primitives T* thisInput = (T*)mscclShmem.work.sendBuff; T* thisOutput = (T*)mscclShmem.work.recvBuff; @@ -253,7 +255,7 @@ __device__ __forceinline__ void mscclRunInterpreter( } #endif - const ssize_t sizePerMscclChunk = mscclShmem.work.count / mscclShmem.work.nChunksPerLoop; + const ssize_t sizePerMscclChunk = mscclShmem.work.sizePerMscclChunk; uint32_t maxAllowedCount = mscclShmem.work.maxAllowedCount; #if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_MSCCL_RUN_ENTRY) @@ -262,7 +264,7 @@ __device__ __forceinline__ void mscclRunInterpreter( #if defined(__gfx940__) || defined(__gfx941__) || defined(__gfx942__) asm volatile ("s_getreg_b32 %0, hwreg(HW_REG_XCC_ID)" : "=s" (xcc_id)); #endif - NpKit::CollectGpuEventLDS(NPKIT_EVENT_MSCCL_RUN_ENTRY, mscclShmem.work.count*sizeof(T), 0, NPKIT_GET_GPU_TIMESTAMP()); + NpKit::CollectGpuEventLDS(NPKIT_EVENT_MSCCL_RUN_ENTRY, mscclShmem.work.sizePerMscclChunk*mscclShmem.work.nChunksPerLoop, 0, NPKIT_GET_GPU_TIMESTAMP()); } #endif @@ -435,7 +437,7 @@ __device__ __forceinline__ void mscclRunInterpreter( } #if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_MSCCL_RUN_EXIT) if (tid == 0) { - NpKit::CollectGpuEventLDS(NPKIT_EVENT_MSCCL_RUN_EXIT, mscclShmem.work.count*sizeof(T), 0, NPKIT_GET_GPU_TIMESTAMP()); + NpKit::CollectGpuEventLDS(NPKIT_EVENT_MSCCL_RUN_EXIT, mscclShmem.work.sizePerMscclChunk*mscclShmem.work.nChunksPerLoop, 0, NPKIT_GET_GPU_TIMESTAMP()); } #endif #if defined(ENABLE_NPKIT) @@ -447,13 +449,13 @@ __device__ __forceinline__ void mscclRunInterpreter( } #define MSCCL_IMPL_KERNEL_ENTRY_FUNC_DEVREDOP_TYPE(devredop, type) \ -__global__ void MSCCL_KERNEL_ENTRY_NAME(devredop, type, LL)(struct ncclDevComm* comm, struct mscclAlgo* algo, struct mscclWork work) { \ +__global__ void MSCCL_KERNEL_ENTRY_NAME(devredop, type, LL)(struct ncclDevComm* comm, struct mscclAlgo* algo, struct mscclWork* work) { \ mscclRunInterpreter, ProtoLL>(comm, algo, work); \ } \ -__global__ void MSCCL_KERNEL_ENTRY_NAME(devredop, type, LL128)(struct ncclDevComm* comm, struct mscclAlgo* algo, struct mscclWork work) { \ +__global__ void MSCCL_KERNEL_ENTRY_NAME(devredop, type, LL128)(struct ncclDevComm* comm, struct mscclAlgo* algo, struct mscclWork* work) { \ mscclRunInterpreter, ProtoLL128>(comm, algo, work); \ } \ -__global__ void MSCCL_KERNEL_ENTRY_NAME(devredop, type, Simple)(struct ncclDevComm* comm, struct mscclAlgo* algo, struct mscclWork work) { \ +__global__ void MSCCL_KERNEL_ENTRY_NAME(devredop, type, Simple)(struct ncclDevComm* comm, struct mscclAlgo* algo, struct mscclWork* work) { \ mscclRunInterpreter, ProtoSimple>(comm, algo, work); \ } diff --git a/src/collectives/msccl.cc b/src/collectives/msccl.cc index 9c93945cae..0d8197b731 100644 --- a/src/collectives/msccl.cc +++ b/src/collectives/msccl.cc @@ -78,7 +78,7 @@ ncclResult_t mscclUnloadAlgo(mscclAlgoHandle_t mscclAlgoHandle) { free(status.hostAlgos[mscclAlgoHandle]); status.hostAlgos.erase(mscclAlgoHandle); - CUDACHECK(hipFree(status.devAlgos[mscclAlgoHandle])); + NCCLCHECK(ncclCudaFree(status.devAlgos[mscclAlgoHandle])); status.devAlgos.erase(mscclAlgoHandle); status.freeAlgoHandles.push_back(mscclAlgoHandle); diff --git a/src/include/msccl/msccl_kernel.h b/src/include/msccl/msccl_kernel.h index 0f634e0f89..647183431b 100644 --- a/src/include/msccl/msccl_kernel.h +++ b/src/include/msccl/msccl_kernel.h @@ -9,7 +9,7 @@ #define MSCCL_KERNEL_ENTRY_NAME(devredop, type, proto) mscclKernel_##devredop##_##type##_##proto #define MSCCL_DECL_KERNEL_ENTRY_FUNC_DEVREDOP_TYPE_PROTO(devredop, type, proto) \ -__global__ void MSCCL_KERNEL_ENTRY_NAME(devredop, type, proto)(struct ncclDevComm* comm, struct mscclAlgo* algo, struct mscclWork work); +__global__ void MSCCL_KERNEL_ENTRY_NAME(devredop, type, proto)(struct ncclDevComm* comm, struct mscclAlgo* algo, struct mscclWork* work); #define MSCCL_DECL_KERNEL_ENTRY_FUNC_DEVREDOP_TYPE(devredop, type) \ MSCCL_DECL_KERNEL_ENTRY_FUNC_DEVREDOP_TYPE_PROTO(devredop, type, LL) \ diff --git a/src/include/msccl/msccl_struct.h b/src/include/msccl/msccl_struct.h index ec1ddca432..19a4c375ca 100644 --- a/src/include/msccl/msccl_struct.h +++ b/src/include/msccl/msccl_struct.h @@ -36,6 +36,8 @@ #define MSCCL_LOCAL_COPY 6 #define MSCCL_REDUCE 7 +#define MSCCL_WORK_FIFO_DEPTH (64 << 10) + struct mscclTransmission { int16_t dependencePointer; // index to the first dependence int16_t numDependencies; // dependencePointer+numDependencies indicate the last dependence @@ -210,21 +212,36 @@ struct mscclStatus { bool graphEnabled; bool graphFirstKernel; bool needsProxy; + uint64_t workFifoDepth; + struct mscclWork* workFifo; + uint32_t* workFifoDone; + uint32_t workFifoSent; + uint32_t workFifoSentPerChannel[MAXCHANNELS]; + uint32_t workFifoAckdMin; }; -struct alignas(16) mscclWork { +#pragma pack(push) +#pragma pack(8) + +struct mscclWork { volatile struct mscclFlag *syncFlags; void *scratchBuffer; const void *sendBuff; void *recvBuff; - size_t count; + uint32_t* workFifoDone; + size_t sizePerMscclChunk; uint64_t redOpArg; uint32_t workIndex; - int nChunksPerLoop; uint32_t maxAllowedCount; + uint32_t workFifoDoneAck; + int nChunksPerLoop; bool hasReduce; bool redOpArgIsPtr; + uint32_t pad[1]; }; +static_assert(sizeof(struct mscclWork) % 16 == 0, "mscclWork needs to be 16B aligned"); + +#pragma pack(pop) struct mscclShmemData { struct mscclThreadBlock mscclTB; diff --git a/src/misc/msccl/msccl_lifecycle.cc b/src/misc/msccl/msccl_lifecycle.cc index 540e7bf293..4b32e98837 100644 --- a/src/misc/msccl/msccl_lifecycle.cc +++ b/src/misc/msccl/msccl_lifecycle.cc @@ -190,6 +190,14 @@ ncclResult_t mscclInit(ncclComm_t comm) { NCCLCHECK(ncclCudaCalloc(&status.syncFlags, MSCCL_MAX_NUM_THREAD_BLOCKS)); status.lastStream = nullptr; status.needsProxy = false; + status.workFifoDepth = MSCCL_WORK_FIFO_DEPTH; + NCCLCHECK(ncclCudaCalloc(&status.workFifo, status.workFifoDepth, nullptr, true)); + NCCLCHECK(ncclCudaHostCalloc(&status.workFifoDone, MAXCHANNELS)); + status.workFifoSent = 0; + for (int i = 0; i < MAXCHANNELS; i++) { + status.workFifoSentPerChannel[i] = 0; + } + status.workFifoAckdMin = 0; mscclSchedulerTriedLoadAlgo = false; NCCLCHECK(mscclSchedulerInit()); @@ -494,6 +502,8 @@ ncclResult_t mscclTeardown() { } else { NCCLCHECK(mscclInternalSchedulerTeardown()); } + NCCLCHECK(ncclCudaFree(status.workFifo)); + NCCLCHECK(ncclCudaHostFree(status.workFifoDone)); mscclInitialized.store(false, std::memory_order_release); } diff --git a/src/misc/msccl/msccl_setup.cc b/src/misc/msccl/msccl_setup.cc index bf5aaa535c..5702fabd64 100644 --- a/src/misc/msccl/msccl_setup.cc +++ b/src/misc/msccl/msccl_setup.cc @@ -68,7 +68,7 @@ ncclResult_t mscclSetupScratch(struct mscclAlgo* hostAlgo, hipStream_t stream) { size_t sizeNeeded = (status.nBytes * (size_t)(hostAlgo->nScratchChunks)) / (size_t)(hostAlgo->nChunksPerLoop); if (sizeNeeded > status.scratchBufferSize){ CUDACHECK(hipStreamSynchronize(stream)); - CUDACHECK(hipFree(status.scratchBuffer)); + NCCLCHECK(ncclCudaFree(status.scratchBuffer)); NCCLCHECK(ncclCudaCalloc((char**)&status.scratchBuffer, sizeNeeded)); status.scratchBufferSize = sizeNeeded; } @@ -313,6 +313,57 @@ void* mscclKernelEntries[(ncclNumDevRedOps - 2) * ncclNumTypes * NCCL_NUM_PROTOC #endif }; +// Comparison of monotonic rolling counters. +static inline bool rollingLess32(uint32_t a, uint32_t b) { + constexpr uint32_t PositiveMax = uint32_t(-1)>>1; + return a-b > PositiveMax; +} + +static inline uint32_t rollingMin32(uint32_t a, uint32_t b) { + constexpr uint32_t PositiveMax = uint32_t(-1)>>1; + return (b-a <= PositiveMax) ? a : b; +} + +static void mscclWaitWorkFifoAvailable(uint32_t desiredSent) { + mscclStatus& status = mscclGetStatus(); + if (__builtin_expect(rollingLess32(status.workFifoAckdMin + status.workFifoDepth, desiredSent), false)) { + while (1) { + // We have to poll for notifications from device. + uint32_t* doneLive = status.workFifoDone; + uint32_t ackd[MAXCHANNELS]; + for (int c=0; c < MAXCHANNELS; c++) { + ackd[c] = __atomic_load_n(&doneLive[c], __ATOMIC_RELAXED); + } + // Compiler-only fence to prevent fusion of loops to encourage dense loads. + __atomic_signal_fence(__ATOMIC_SEQ_CST); + + uint32_t ackdAll = status.workFifoSent; + for (int c=0; c < MAXCHANNELS; c++) { + // ackdAll is min over all non-quiesced channels + if (ackd[c] != status.workFifoSentPerChannel[c]) + ackdAll = rollingMin32(ackdAll, ackd[c]); + } + + // Compiler only fence to prevent fusion of loops to encourage dense stores. + __atomic_signal_fence(__ATOMIC_SEQ_CST); + + for (int c=0; c < MAXCHANNELS; c++) { + // Advance counter on quiesced channels so they don't lag behind + // too far where they could get lost in 32-bit wraparound. + if (ackd[c] == status.workFifoSentPerChannel[c]) { + status.workFifoSentPerChannel[c] = ackdAll; + __atomic_store_n(&doneLive[c], ackdAll, __ATOMIC_RELAXED); + } + } + status.workFifoAckdMin = ackdAll; + + // See if that was enough. + if (!rollingLess32(status.workFifoAckdMin + status.workFifoDepth, desiredSent)) break; + sched_yield(); + } + } +} + ncclResult_t mscclSetupKernel(const void* sendBuff, void* recvBuff, size_t count, ncclDataType_t dataType, ncclRedOp_t op, struct mscclAlgo* hostAlgo, struct mscclAlgo* devAlgo, ncclComm_t comm, hipStream_t stream) { @@ -329,7 +380,8 @@ ncclResult_t mscclSetupKernel(const void* sendBuff, void* recvBuff, size_t count CUDACHECK(hipStreamWaitEvent(stream, comm->doneEvent, 0)); } - dim3 grid = {(uint32_t)hostAlgo->nBlocks, 1, 1}; + uint32_t numBlocks = (uint32_t)hostAlgo->nBlocks; + dim3 grid = {numBlocks, 1, 1}; dim3 block = {NCCL_MAX_NTHREADS, 1, 1}; ncclDevRedOpFull opFull = {}; NCCLCHECK(hostToDevRedOp(&opFull, op, dataType, comm)); @@ -339,7 +391,7 @@ ncclResult_t mscclSetupKernel(const void* sendBuff, void* recvBuff, size_t count work.scratchBuffer = status.scratchBuffer; work.sendBuff = sendBuff; work.recvBuff = recvBuff; - work.count = count * hostAlgo->sizeMultiplier; // count is sum of all ranks in MSCCL kernel + work.sizePerMscclChunk = count * hostAlgo->sizeMultiplier / hostAlgo->nChunksPerLoop; // count is sum of all ranks in MSCCL kernel work.redOpArg = opFull.scalarArg; work.workIndex = status.workIndex; work.nChunksPerLoop = hostAlgo->nChunksPerLoop; @@ -348,7 +400,29 @@ ncclResult_t mscclSetupKernel(const void* sendBuff, void* recvBuff, size_t count work.redOpArgIsPtr = opFull.scalarArgIsPtr; INFO(NCCL_INIT, "MSCCL: Setup Kernel finished"); - void *args[3] = {&comm->devComm, &devAlgo, &work}; + uint32_t workFifoIdxMask = status.workFifoDepth - 1; + uint32_t workFifoSent = status.workFifoSent; + // First work for a channel has to be at workHeap+blockIdx.x which means + // we cannot tolerate fifo wraparound. So round up to the wrap boundary + // if not doing so would incur crossing it. + if (((workFifoSent + numBlocks - 1) & workFifoIdxMask) < (workFifoSent & workFifoIdxMask)) { + workFifoSent = (workFifoSent + workFifoIdxMask) & ~workFifoIdxMask; + // Need to update workFifoSent so waitWorkFifoAvailable() knows we've + // skipped those elements. Consider if all the channels report quiesced, + // this way the skipped slots will be considered consumed as well. + status.workFifoSent = workFifoSent; + } + mscclWaitWorkFifoAvailable(workFifoSent + numBlocks); + for (int i = 0; i < numBlocks; i++) { + work.workFifoDoneAck = workFifoSent + i; + work.workFifoDone = status.workFifoDone + i; + status.workFifoSentPerChannel[i] = workFifoSent + i; + status.workFifo[(workFifoSent + i) & workFifoIdxMask] = work; + } + status.workFifoSent = workFifoSent + numBlocks; + + struct mscclWork *workPtr = status.workFifo + (workFifoSent & workFifoIdxMask); + void *args[3] = {&comm->devComm, &devAlgo, &workPtr}; void *func = mscclKernelEntries[(opFull.op * ncclNumTypes + dataType) * NCCL_NUM_PROTOCOLS + hostAlgo->protocol]; if (enableDoneEvent) { CUDACHECK(hipExtLaunchKernel(func, grid, block, args, 0, stream, NULL, comm->doneEvent, 0));