Change MSCCL kernel signature to allow kernel arguments to be preloaded via SGPR (#911)

* Adding a script that will download/compile/run TransferBench/RCCL/UCX/RCCL-tests/RCCL-Unittests/hip-mpi-testsuite (#895)

Co-authored-by: Pedram Alizadeh <pmohamma@banff-pla-r27-05.pla.dcgpu>

* Only build gfx941

* demo

* fine tune malloc

* Fix merge errors

* Fix merge errors

* Disable parallel build

* Adopt --amdgpu-kernarg-preload-count

* Revert "Adding a script that will download/compile/run TransferBench/RCCL/UCX/RCCL-tests/RCCL-Unittests/hip-mpi-testsuite (#895)"

This reverts commit f5e252dddf02a41b4d1bc512f306f45f97166304.

* Revert CMake changes.

* NPKIT changes.

* Remove some license declarations.

* Address code review feedback on msccl_kernel_impl.h

* Update CMakeLists.txt

* Add CMake logic to check the existence of --amdgpu-kernarg-preload-count

* Fix NPKIT trace logic.

---------

Co-authored-by: Pedram Alizadeh <pmohamma@amd.com>
Co-authored-by: Pedram Alizadeh <pmohamma@banff-pla-r27-05.pla.dcgpu>
Co-authored-by: Ziyue Yang <ziyyang@microsoft.com>
Tento commit je obsažen v:
Wen-Heng (Jack) Chung
2023-10-12 20:17:08 -05:00
odevzdal GitHub
rodič 7e2d905376
revize 7ee5c1c28b
7 změnil soubory, kde provedl 133 přidání a 18 odebrání
+12
Zobrazit soubor
@@ -201,6 +201,12 @@ if(BUILD_BFD)
endif()
endif()
# Check for --amdgpu-kernarg-preload-count
# Probes whether the C++ compiler accepts the LLVM option
# "--amdgpu-kernarg-preload-count=16" (forwarded through -mllvm). The result is
# stored in HAVE_KERNARG_PRELOAD, which later gates the matching compile and
# link options on the rccl target.
check_cxx_compiler_flag("-mllvm --amdgpu-kernarg-preload-count=16" HAVE_KERNARG_PRELOAD)
if (HAVE_KERNARG_PRELOAD)
# Informational only: report in the configure log that the feature is on.
message(STATUS "Kernarg preloading to SGPR enabled")
endif()
# Determine version from makefiles/version.mk and fill in templates
#==================================================================================================
## parse version from Makefile NCCL_MAJOR, NCCL_MINOR, NCCL_PATCH must exist
@@ -590,6 +596,9 @@ target_compile_options(rccl PRIVATE -parallel-jobs=12)
target_compile_options(rccl PRIVATE -Wno-format-nonliteral)
target_compile_options(rccl PRIVATE -fgpu-rdc) # Generate relocatable device code (required for extern __shared__)
target_compile_options(rccl PRIVATE -fvisibility=hidden) # Set symbol visibility to hidden by default
# Request kernel-argument preloading (count = 16) at compile time, but only
# when the compiler probe set HAVE_KERNARG_PRELOAD.
# NOTE(review): CMake de-duplicates repeated options on a target; if rccl ever
# carries a second "-mllvm <opt>" pair, the "SHELL:" prefix may be needed to
# keep each -mllvm attached to its argument -- confirm.
if (HAVE_KERNARG_PRELOAD)
target_compile_options(rccl PRIVATE -mllvm --amdgpu-kernarg-preload-count=16)
endif()
## NOTE: This is currently being handled by rocm-cmake, however may need to be re-enabled in the future
#foreach(target ${GPU_TARGETS})
@@ -634,6 +643,9 @@ if(NOT BUILD_SHARED_LIBS)
else()
message(STATUS "Building shared RCCL library")
endif()
# Pass the same kernarg preload option at link time when the compiler probe
# set HAVE_KERNARG_PRELOAD.
# NOTE(review): presumably required because rccl is built with -fgpu-rdc, so
# device code generation is finished during the link step -- confirm.
if (HAVE_KERNARG_PRELOAD)
target_link_options(rccl PRIVATE -mllvm --amdgpu-kernarg-preload-count=16)
endif()
target_link_libraries(rccl PRIVATE Threads::Threads)
target_link_libraries(rccl INTERFACE hip::host)
+11 -9
Zobrazit soubor
@@ -130,7 +130,7 @@ for (int r = 0; r < numloops; r++) { \
template<typename T, typename RedOp, typename Proto>
__device__ __forceinline__ void mscclRunInterpreter(
struct ncclDevComm* comm, struct mscclAlgo* algo, struct mscclWork work) {
struct ncclDevComm* comm, struct mscclAlgo* algo, struct mscclWork* work) {
const int tid = threadIdx.x;
const int bid = blockIdx.x;
const int nthreads = NCCL_MAX_NTHREADS;
@@ -173,7 +173,7 @@ __device__ __forceinline__ void mscclRunInterpreter(
break;
case 2:
dst = &mscclShmem.work;
src = &work;
src = work + blockIdx.x;
bytes = sizeof(mscclWork);
static_assert(sizeof(mscclWork) <= sizeof(uint64_t) * WARP_SIZE, "mscclWork cannot be loaded by a single warp in one insn.");
break;
@@ -192,6 +192,8 @@ __device__ __forceinline__ void mscclRunInterpreter(
if (tid == 0) ncclShmem.event_buffer_head = 0;
#endif
__synclds(); // publish shmem
if (tid == 0)
*mscclShmem.work.workFifoDone = mscclShmem.work.workFifoDoneAck;
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_TIME_SYNC_CPU)
if (tid == 0) {
@@ -226,7 +228,7 @@ __device__ __forceinline__ void mscclRunInterpreter(
}
}
__synclds(); // publish shmem
// User pointers for primitives
T* thisInput = (T*)mscclShmem.work.sendBuff;
T* thisOutput = (T*)mscclShmem.work.recvBuff;
@@ -253,7 +255,7 @@ __device__ __forceinline__ void mscclRunInterpreter(
}
#endif
const ssize_t sizePerMscclChunk = mscclShmem.work.count / mscclShmem.work.nChunksPerLoop;
const ssize_t sizePerMscclChunk = mscclShmem.work.sizePerMscclChunk;
uint32_t maxAllowedCount = mscclShmem.work.maxAllowedCount;
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_MSCCL_RUN_ENTRY)
@@ -262,7 +264,7 @@ __device__ __forceinline__ void mscclRunInterpreter(
#if defined(__gfx940__) || defined(__gfx941__) || defined(__gfx942__)
asm volatile ("s_getreg_b32 %0, hwreg(HW_REG_XCC_ID)" : "=s" (xcc_id));
#endif
NpKit::CollectGpuEventLDS(NPKIT_EVENT_MSCCL_RUN_ENTRY, mscclShmem.work.count*sizeof(T), 0, NPKIT_GET_GPU_TIMESTAMP());
NpKit::CollectGpuEventLDS(NPKIT_EVENT_MSCCL_RUN_ENTRY, mscclShmem.work.sizePerMscclChunk*mscclShmem.work.nChunksPerLoop, 0, NPKIT_GET_GPU_TIMESTAMP());
}
#endif
@@ -435,7 +437,7 @@ __device__ __forceinline__ void mscclRunInterpreter(
}
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_MSCCL_RUN_EXIT)
if (tid == 0) {
NpKit::CollectGpuEventLDS(NPKIT_EVENT_MSCCL_RUN_EXIT, mscclShmem.work.count*sizeof(T), 0, NPKIT_GET_GPU_TIMESTAMP());
NpKit::CollectGpuEventLDS(NPKIT_EVENT_MSCCL_RUN_EXIT, mscclShmem.work.sizePerMscclChunk*mscclShmem.work.nChunksPerLoop, 0, NPKIT_GET_GPU_TIMESTAMP());
}
#endif
#if defined(ENABLE_NPKIT)
@@ -447,13 +449,13 @@ __device__ __forceinline__ void mscclRunInterpreter(
}
#define MSCCL_IMPL_KERNEL_ENTRY_FUNC_DEVREDOP_TYPE(devredop, type) \
__global__ void MSCCL_KERNEL_ENTRY_NAME(devredop, type, LL)(struct ncclDevComm* comm, struct mscclAlgo* algo, struct mscclWork work) { \
__global__ void MSCCL_KERNEL_ENTRY_NAME(devredop, type, LL)(struct ncclDevComm* comm, struct mscclAlgo* algo, struct mscclWork* work) { \
mscclRunInterpreter<type, Func##devredop<type>, ProtoLL>(comm, algo, work); \
} \
__global__ void MSCCL_KERNEL_ENTRY_NAME(devredop, type, LL128)(struct ncclDevComm* comm, struct mscclAlgo* algo, struct mscclWork work) { \
__global__ void MSCCL_KERNEL_ENTRY_NAME(devredop, type, LL128)(struct ncclDevComm* comm, struct mscclAlgo* algo, struct mscclWork* work) { \
mscclRunInterpreter<type, Func##devredop<type>, ProtoLL128>(comm, algo, work); \
} \
__global__ void MSCCL_KERNEL_ENTRY_NAME(devredop, type, Simple)(struct ncclDevComm* comm, struct mscclAlgo* algo, struct mscclWork work) { \
__global__ void MSCCL_KERNEL_ENTRY_NAME(devredop, type, Simple)(struct ncclDevComm* comm, struct mscclAlgo* algo, struct mscclWork* work) { \
mscclRunInterpreter<type, Func##devredop<type>, ProtoSimple<MSCCL_CHUNKSTEPS/MSCCL_SLICESTEPS, MSCCL_SLICESTEPS>>(comm, algo, work); \
}
+1 -1
Zobrazit soubor
@@ -78,7 +78,7 @@ ncclResult_t mscclUnloadAlgo(mscclAlgoHandle_t mscclAlgoHandle) {
free(status.hostAlgos[mscclAlgoHandle]);
status.hostAlgos.erase(mscclAlgoHandle);
CUDACHECK(hipFree(status.devAlgos[mscclAlgoHandle]));
NCCLCHECK(ncclCudaFree(status.devAlgos[mscclAlgoHandle]));
status.devAlgos.erase(mscclAlgoHandle);
status.freeAlgoHandles.push_back(mscclAlgoHandle);
+1 -1
Zobrazit soubor
@@ -9,7 +9,7 @@
#define MSCCL_KERNEL_ENTRY_NAME(devredop, type, proto) mscclKernel_##devredop##_##type##_##proto
#define MSCCL_DECL_KERNEL_ENTRY_FUNC_DEVREDOP_TYPE_PROTO(devredop, type, proto) \
__global__ void MSCCL_KERNEL_ENTRY_NAME(devredop, type, proto)(struct ncclDevComm* comm, struct mscclAlgo* algo, struct mscclWork work);
__global__ void MSCCL_KERNEL_ENTRY_NAME(devredop, type, proto)(struct ncclDevComm* comm, struct mscclAlgo* algo, struct mscclWork* work);
#define MSCCL_DECL_KERNEL_ENTRY_FUNC_DEVREDOP_TYPE(devredop, type) \
MSCCL_DECL_KERNEL_ENTRY_FUNC_DEVREDOP_TYPE_PROTO(devredop, type, LL) \
+20 -3
Zobrazit soubor
@@ -36,6 +36,8 @@
#define MSCCL_LOCAL_COPY 6
#define MSCCL_REDUCE 7
#define MSCCL_WORK_FIFO_DEPTH (64 << 10)
struct mscclTransmission {
int16_t dependencePointer; // index to the first dependence
int16_t numDependencies; // dependencePointer+numDependencies indicate the last dependence
@@ -210,21 +212,36 @@ struct mscclStatus {
bool graphEnabled;
bool graphFirstKernel;
bool needsProxy;
uint64_t workFifoDepth;
struct mscclWork* workFifo;
uint32_t* workFifoDone;
uint32_t workFifoSent;
uint32_t workFifoSentPerChannel[MAXCHANNELS];
uint32_t workFifoAckdMin;
};
struct alignas(16) mscclWork {
#pragma pack(push)
#pragma pack(8)
struct mscclWork {
volatile struct mscclFlag *syncFlags;
void *scratchBuffer;
const void *sendBuff;
void *recvBuff;
size_t count;
uint32_t* workFifoDone;
size_t sizePerMscclChunk;
uint64_t redOpArg;
uint32_t workIndex;
int nChunksPerLoop;
uint32_t maxAllowedCount;
uint32_t workFifoDoneAck;
int nChunksPerLoop;
bool hasReduce;
bool redOpArgIsPtr;
uint32_t pad[1];
};
static_assert(sizeof(struct mscclWork) % 16 == 0, "mscclWork needs to be 16B aligned");
#pragma pack(pop)
struct mscclShmemData {
struct mscclThreadBlock mscclTB;
+10
Zobrazit soubor
@@ -190,6 +190,14 @@ ncclResult_t mscclInit(ncclComm_t comm) {
NCCLCHECK(ncclCudaCalloc(&status.syncFlags, MSCCL_MAX_NUM_THREAD_BLOCKS));
status.lastStream = nullptr;
status.needsProxy = false;
status.workFifoDepth = MSCCL_WORK_FIFO_DEPTH;
NCCLCHECK(ncclCudaCalloc(&status.workFifo, status.workFifoDepth, nullptr, true));
NCCLCHECK(ncclCudaHostCalloc(&status.workFifoDone, MAXCHANNELS));
status.workFifoSent = 0;
for (int i = 0; i < MAXCHANNELS; i++) {
status.workFifoSentPerChannel[i] = 0;
}
status.workFifoAckdMin = 0;
mscclSchedulerTriedLoadAlgo = false;
NCCLCHECK(mscclSchedulerInit());
@@ -494,6 +502,8 @@ ncclResult_t mscclTeardown() {
} else {
NCCLCHECK(mscclInternalSchedulerTeardown());
}
NCCLCHECK(ncclCudaFree(status.workFifo));
NCCLCHECK(ncclCudaHostFree(status.workFifoDone));
mscclInitialized.store(false, std::memory_order_release);
}
+78 -4
Zobrazit soubor
@@ -68,7 +68,7 @@ ncclResult_t mscclSetupScratch(struct mscclAlgo* hostAlgo, hipStream_t stream) {
size_t sizeNeeded = (status.nBytes * (size_t)(hostAlgo->nScratchChunks)) / (size_t)(hostAlgo->nChunksPerLoop);
if (sizeNeeded > status.scratchBufferSize){
CUDACHECK(hipStreamSynchronize(stream));
CUDACHECK(hipFree(status.scratchBuffer));
NCCLCHECK(ncclCudaFree(status.scratchBuffer));
NCCLCHECK(ncclCudaCalloc((char**)&status.scratchBuffer, sizeNeeded));
status.scratchBufferSize = sizeNeeded;
}
@@ -313,6 +313,57 @@ void* mscclKernelEntries[(ncclNumDevRedOps - 2) * ncclNumTypes * NCCL_NUM_PROTOC
#endif
};
// Wraparound-aware "a < b" for monotonic 32-bit rolling counters.
// The difference a-b is taken modulo 2^32; `a` is considered to be behind `b`
// when that difference falls in the upper half of the uint32_t range, i.e.
// when its most significant bit is set.
static inline bool rollingLess32(uint32_t a, uint32_t b) {
  const uint32_t delta = a - b;  // unsigned subtraction wraps modulo 2^32
  return (delta >> 31) != 0u;
}
// Wraparound-aware minimum of two monotonic 32-bit rolling counters.
// Returns `a` when the modular distance b-a lies in the lower half of the
// uint32_t range (a is at or behind b); otherwise returns `b`.
static inline uint32_t rollingMin32(uint32_t a, uint32_t b) {
  const uint32_t lead = b - a;  // unsigned subtraction wraps modulo 2^32
  if ((lead & 0x80000000u) != 0u) {
    return b;
  }
  return a;
}
// Host-side back-pressure for the MSCCL work FIFO.
//
// Returns once `workFifoAckdMin + workFifoDepth` is no longer behind
// `desiredSent` (compared with rolling 32-bit arithmetic). If the cached
// acknowledgement minimum already satisfies that, nothing is polled.
// Otherwise the per-channel done counters in status.workFifoDone are polled,
// the cached minimum is refreshed, and the thread yields between attempts.
//
// desiredSent: rolling FIFO position the caller wants to be able to write up to.
static void mscclWaitWorkFifoAvailable(uint32_t desiredSent) {
  mscclStatus& status = mscclGetStatus();

  // Expected case: the cached minimum already shows enough room.
  if (__builtin_expect(!rollingLess32(status.workFifoAckdMin + status.workFifoDepth, desiredSent), true)) {
    return;
  }

  for (;;) {
    // We have to poll for notifications from device.
    uint32_t* doneLive = status.workFifoDone;
    uint32_t ackd[MAXCHANNELS];
    for (int c = 0; c < MAXCHANNELS; ++c) {
      ackd[c] = __atomic_load_n(doneLive + c, __ATOMIC_RELAXED);
    }
    // Compiler-only fence to prevent fusion of loops to encourage dense loads.
    __atomic_signal_fence(__ATOMIC_SEQ_CST);

    // ackdAll is the rolling minimum over all non-quiesced channels, starting
    // from the total number of entries sent so far.
    uint32_t ackdAll = status.workFifoSent;
    for (int c = 0; c < MAXCHANNELS; ++c) {
      if (ackd[c] == status.workFifoSentPerChannel[c]) continue;  // quiesced channel
      ackdAll = rollingMin32(ackdAll, ackd[c]);
    }
    // Compiler-only fence to prevent fusion of loops to encourage dense stores.
    __atomic_signal_fence(__ATOMIC_SEQ_CST);

    // Advance the counter on quiesced channels so they don't lag so far
    // behind that they get lost in 32-bit wraparound.
    for (int c = 0; c < MAXCHANNELS; ++c) {
      if (ackd[c] != status.workFifoSentPerChannel[c]) continue;  // still in flight
      status.workFifoSentPerChannel[c] = ackdAll;
      __atomic_store_n(doneLive + c, ackdAll, __ATOMIC_RELAXED);
    }
    status.workFifoAckdMin = ackdAll;

    // See if that was enough; otherwise give up the CPU and poll again.
    if (!rollingLess32(status.workFifoAckdMin + status.workFifoDepth, desiredSent)) {
      return;
    }
    sched_yield();
  }
}
ncclResult_t mscclSetupKernel(const void* sendBuff, void* recvBuff, size_t count,
ncclDataType_t dataType, ncclRedOp_t op, struct mscclAlgo* hostAlgo, struct mscclAlgo* devAlgo,
ncclComm_t comm, hipStream_t stream) {
@@ -329,7 +380,8 @@ ncclResult_t mscclSetupKernel(const void* sendBuff, void* recvBuff, size_t count
CUDACHECK(hipStreamWaitEvent(stream, comm->doneEvent, 0));
}
dim3 grid = {(uint32_t)hostAlgo->nBlocks, 1, 1};
uint32_t numBlocks = (uint32_t)hostAlgo->nBlocks;
dim3 grid = {numBlocks, 1, 1};
dim3 block = {NCCL_MAX_NTHREADS, 1, 1};
ncclDevRedOpFull opFull = {};
NCCLCHECK(hostToDevRedOp(&opFull, op, dataType, comm));
@@ -339,7 +391,7 @@ ncclResult_t mscclSetupKernel(const void* sendBuff, void* recvBuff, size_t count
work.scratchBuffer = status.scratchBuffer;
work.sendBuff = sendBuff;
work.recvBuff = recvBuff;
work.count = count * hostAlgo->sizeMultiplier; // count is sum of all ranks in MSCCL kernel
work.sizePerMscclChunk = count * hostAlgo->sizeMultiplier / hostAlgo->nChunksPerLoop; // count is sum of all ranks in MSCCL kernel
work.redOpArg = opFull.scalarArg;
work.workIndex = status.workIndex;
work.nChunksPerLoop = hostAlgo->nChunksPerLoop;
@@ -348,7 +400,29 @@ ncclResult_t mscclSetupKernel(const void* sendBuff, void* recvBuff, size_t count
work.redOpArgIsPtr = opFull.scalarArgIsPtr;
INFO(NCCL_INIT, "MSCCL: Setup Kernel finished");
void *args[3] = {&comm->devComm, &devAlgo, &work};
uint32_t workFifoIdxMask = status.workFifoDepth - 1;
uint32_t workFifoSent = status.workFifoSent;
// First work for a channel has to be at workHeap+blockIdx.x which means
// we cannot tolerate fifo wraparound. So round up to the wrap boundary
// if not doing so would incur crossing it.
if (((workFifoSent + numBlocks - 1) & workFifoIdxMask) < (workFifoSent & workFifoIdxMask)) {
workFifoSent = (workFifoSent + workFifoIdxMask) & ~workFifoIdxMask;
// Need to update workFifoSent so waitWorkFifoAvailable() knows we've
// skipped those elements. Consider if all the channels report quiesced,
// this way the skipped slots will be considered consumed as well.
status.workFifoSent = workFifoSent;
}
mscclWaitWorkFifoAvailable(workFifoSent + numBlocks);
for (int i = 0; i < numBlocks; i++) {
work.workFifoDoneAck = workFifoSent + i;
work.workFifoDone = status.workFifoDone + i;
status.workFifoSentPerChannel[i] = workFifoSent + i;
status.workFifo[(workFifoSent + i) & workFifoIdxMask] = work;
}
status.workFifoSent = workFifoSent + numBlocks;
struct mscclWork *workPtr = status.workFifo + (workFifoSent & workFifoIdxMask);
void *args[3] = {&comm->devComm, &devAlgo, &workPtr};
void *func = mscclKernelEntries[(opFull.op * ncclNumTypes + dataType) * NCCL_NUM_PROTOCOLS + hostAlgo->protocol];
if (enableDoneEvent) {
CUDACHECK(hipExtLaunchKernel(func, grid, block, args, 0, stream, NULL, comm->doneEvent, 0));