diff --git a/projects/rccl/CHANGELOG.md b/projects/rccl/CHANGELOG.md index 54d391ca54..0db0cc936b 100644 --- a/projects/rccl/CHANGELOG.md +++ b/projects/rccl/CHANGELOG.md @@ -2,6 +2,15 @@ Full documentation for RCCL is available at [https://rccl.readthedocs.io](https://rccl.readthedocs.io) +## [Unreleased] +### Optimizations +- Additional tuning for clique-based kernel AllReduce performance (still requires opt in with RCCL_ENABLE_CLIQUE=1) + +### Changed +- Replaced RCCL_FORCE_ENABLE_CLIQUE with RCCL_CLIQUE_IGNORE_TOPO +- Clique-based kernels can now be enabled on topologies where all active GPUs are XGMI-connected +- Topologies not normally supported by clique-based kernels require RCCL_CLIQUE_IGNORE_TOPO=1 + ## [RCCL-2.7.8 for ROCm 4.1.0] ### Added - Experimental support for clique-based kernels (opt in with RCCL_ENABLE_CLIQUE=1) diff --git a/projects/rccl/src/clique/CliqueManager.cc b/projects/rccl/src/clique/CliqueManager.cc index 7e6100d603..ee38baf860 100644 --- a/projects/rccl/src/clique/CliqueManager.cc +++ b/projects/rccl/src/clique/CliqueManager.cc @@ -42,13 +42,14 @@ THE SOFTWARE. #include #include -cliqueDevicePtrs_t CliqueManager::m_staticCliquePtrs[NCCL_MAX_OPS] = {}; -int* CliqueManager::m_staticGpuBarrierMem = NULL; +cliqueDevicePtrs_t CliqueManager::m_staticCliquePtrs[NCCL_MAX_OPS] = {}; +int CliqueManager::m_staticBarrierCount[NCCL_MAX_OPS*2] = {}; +int* CliqueManager::m_staticGpuBarrierMem = NULL; // Define some environment variables that affect clique-based kernels -RCCL_PARAM(EnableClique, "ENABLE_CLIQUE", 0); // Opt-in environment variable for clique-based kernels -RCCL_PARAM(AllReduceCliqueByteLimit, "CLIQUE_ALLREDUCE_BYTE_LIMIT", 16777216); // Max number of bytes to use clique-based kernels for all reduce -RCCL_PARAM(AllReduceNumChannels, "CLIQUE_ALLREDUCE_NCHANNELS", 0); // Number of channels to use for all-reduce. 
(0 for auto-select) +RCCL_PARAM(EnableClique, "ENABLE_CLIQUE", 0); // Opt-in environment variable for clique-based kernels +RCCL_PARAM(AllReduceCliqueByteLimit, "CLIQUE_ALLREDUCE_BYTE_LIMIT", 0); // Max number of bytes to use clique-based kernels for all reduce (0 for auto-select) +RCCL_PARAM(AllReduceNumChannels, "CLIQUE_ALLREDUCE_NCHANNELS", 0); // Number of channels to use for all-reduce. (0 for auto-select) CliqueManager::CliqueManager(int const rank, int const numRanks, @@ -56,6 +57,8 @@ CliqueManager::CliqueManager(int const rank, m_rank(rank), m_numRanks(numRanks), m_cliqueMode(cliqueMode), + m_opIndexHead(0), + m_opIndexTail(0), m_init(false), m_pinnedCliquePtrs(NULL), m_fineGrainBarrierMem(NULL) @@ -113,7 +116,11 @@ ncclResult_t CliqueManager::Init(ncclUniqueId const* commId, int suffix) m_init = true; m_hash = djb2Hash(commId->internal); - if (m_cliqueMode == CLIQUE_DISABLED) return ncclSuccess; + if (m_cliqueMode == CLIQUE_DISABLED) + { + INFO(NCCL_INIT, "Clique kernels disabled"); + return ncclSuccess; + } // Check parameters if (m_rank < 0 || m_rank >= m_numRanks) @@ -190,7 +197,7 @@ ncclResult_t CliqueManager::Init(ncclUniqueId const* commId, int suffix) } // Initialize shared CPU memory to be used for barrier variables - m_sharedCpuMemory = ShmObject(2 * sizeof(int32_t), + m_sharedCpuMemory = ShmObject(NCCL_MAX_OPS * 2 * sizeof(int32_t), CliqueShmNames["SharedCounters"] + shmSuffix, m_rank, m_numRanks, @@ -198,19 +205,18 @@ ncclResult_t CliqueManager::Init(ncclUniqueId const* commId, int suffix) NCCLCHECKGOTO(m_sharedCpuMemory.Open(), res, dropback); // Split up the shared CPU memory for barrier counters / global sense - m_cpuBarrierGlobalCount = &m_sharedCpuMemory.Get()[0]; - m_cpuBarrierGlobalSense = &m_sharedCpuMemory.Get()[1]; + m_cpuBarrierCount = m_sharedCpuMemory.Get(); // Initialize CPU barriers if (m_rank == 0) { - *m_cpuBarrierGlobalCount = 0; - *m_cpuBarrierGlobalSense = 0; + memset(m_cpuBarrierCount, 0, NCCL_MAX_OPS * 2 * 
sizeof(int32_t)); } - m_cpuBarrierLocalSense = 0; } else if (m_cliqueMode == CLIQUE_SINGLE_PROCESS) { + m_cpuBarrierCount = &m_staticBarrierCount[0]; + // First rank prepares fine-grained memory shared across ranks used for the two barrier variables if (m_rank == 0) { @@ -224,9 +230,18 @@ ncclResult_t CliqueManager::Init(ncclUniqueId const* commId, int suffix) } } + // Figure out device arch for tuning + int deviceId; + CUDACHECK(hipGetDevice(&deviceId)); + hipDeviceProp_t devProp; + CUDACHECK(hipGetDeviceProperties(&devProp, deviceId)); + m_gcnArch = devProp.gcnArch; + + // Establish when to use clique-based kernels based on input size + SetByteLimits(); m_init = true; - INFO(NCCL_INIT, "Clique-based kernels enabled (mode %d)", m_cliqueMode); + INFO(NCCL_INIT, "Clique-based kernels enabled (mode %d) [GCN %d]", m_cliqueMode, m_gcnArch); return ncclSuccess; dropback: @@ -238,6 +253,20 @@ dropback: return ncclSuccess; } +void CliqueManager::SetByteLimits() +{ + m_allReduceByteLimit = rcclParamAllReduceCliqueByteLimit(); + if (m_allReduceByteLimit == 0) + { + switch (m_gcnArch) + { + case 906: m_allReduceByteLimit = 16777216; break; + case 908: m_allReduceByteLimit = 536870912; break; + default: m_allReduceByteLimit = 16777216; break; + } + } +} + bool CliqueManager::IsSupported(ncclFunc_t const coll, size_t const count, ncclDataType_t const datatype, @@ -247,12 +276,11 @@ bool CliqueManager::IsSupported(ncclFunc_t const coll, // Filter based on total input size for each collective type size_t totalBytes = count * ncclTypeSize(datatype); - if (coll == ncclFuncAllReduce && (totalBytes <= rcclParamAllReduceCliqueByteLimit())) return true; - + if (coll == ncclFuncAllReduce && (totalBytes <= m_allReduceByteLimit)) return true; return false; } -ncclResult_t CliqueManager::DeclarePointers(uint64_t opCount, void const* inputPtr, void* outputPtr) +ncclResult_t CliqueManager::DeclarePointers(void const* inputPtr, void* outputPtr) { // Do nothing if disabled if (m_cliqueMode == 
CLIQUE_DISABLED) return ncclSuccess; @@ -263,11 +291,11 @@ ncclResult_t CliqueManager::DeclarePointers(uint64_t opCount, void const* inputP return ncclInvalidUsage; } - int const opIndex = opCount % NCCL_MAX_OPS; - - // Add opIndex to queue of in-progress collectives - m_inProgress.push(opIndex); + // Add to queue of in-progress collectives + int32_t const opIndex = m_opIndexTail; + m_opIndexTail = (m_opIndexTail + 1) % NCCL_MAX_OPS; + INFO(NCCL_COLL, "Rank %d declaring pointers for opIndex %d", m_rank, opIndex); if (m_cliqueMode == CLIQUE_SINGLE_NODE) { // Get fine-grained device memory if not already done @@ -302,6 +330,18 @@ ncclResult_t CliqueManager::DeclarePointers(uint64_t opCount, void const* inputP m_staticCliquePtrs[opIndex].outputs[m_rank] = outputPtr; } + // Increment entry barrier counter - must not block + volatile int* entryCounter = &m_cpuBarrierCount[2 * opIndex]; + int entryVal = LOAD(entryCounter); + // Loop until successful atomic update to counter + bool done = false; + while (done == false) { + // Last rank resets exit barrier counter prior to incrementing entry count to numRanks + if (entryVal+1 == m_numRanks) + m_cpuBarrierCount[2 * opIndex + 1] = 0; + done = __sync_bool_compare_and_swap(entryCounter, entryVal, entryVal+1); + entryVal++; + } return ncclSuccess; } @@ -320,12 +360,27 @@ ncclResult_t CliqueManager::GetNumChannelsToUse(ncclFunc_t const coll, { // NOTE: These are currently based on collected data and not necessarily ideal for all hardware int numChannels; - if (totalBytes <= 65536) numChannels = 1; - else if (totalBytes <= 262144) numChannels = 2; - else if (totalBytes <= 524288) numChannels = 4; - else if (totalBytes <= 2097152) numChannels = 8; - else numChannels = 11; - + switch (m_gcnArch) + { + case 906: + if (totalBytes <= 16384) numChannels = 1; + else numChannels = 2; + break; + case 908: + if (totalBytes <= 262144) numChannels = 4; + else numChannels = 14; + break; + case 910: + if (totalBytes <= 262144) numChannels = 4; 
+ else numChannels = 8; + break; + default: + if (totalBytes <= 65536) numChannels = 1; + else if (totalBytes <= 262144) numChannels = 2; + else if (totalBytes <= 524288) numChannels = 4; + else if (totalBytes <= 2097152) numChannels = 8; + else numChannels = 11; + } *numChannelstoUse = std::min(numChannels, totalNumChannels); } else @@ -333,82 +388,79 @@ ncclResult_t CliqueManager::GetNumChannelsToUse(ncclFunc_t const coll, *numChannelstoUse = std::min((int)rcclParamAllReduceNumChannels(), totalNumChannels); } } - return ncclSuccess; } - - -ncclResult_t CliqueManager::SetCliqueArgs(ncclWorkElem* args) +ncclResult_t CliqueManager::WaitForPointers(ncclWorkElem* args) { // Do nothing if disabled if (m_cliqueMode == CLIQUE_DISABLED) return ncclSuccess; + if (!m_init) { WARN("CliqueManager must be initialized before use"); return ncclInvalidUsage; } - // Prepare clique argments (NOTE: clique pointers are not ready yet) - int opIndex = args->opCount % NCCL_MAX_OPS; + // Check that collective queue is not empty + if (m_opIndexHead == m_opIndexTail) + { + WARN("WaitForPointers must be called after DeclarePointers"); + return ncclInvalidUsage; + } + + // Pop first collective off queue + int32_t const opIndex = m_opIndexHead; + INFO(NCCL_COLL, "Rank %d waiting for pointers for opIndex %d", m_rank, opIndex); + + m_opIndexHead = (m_opIndexHead + 1) % NCCL_MAX_OPS; args->clique.ptrs = &m_pinnedCliquePtrs[opIndex]; - return ncclSuccess; -} + // Wait for all ranks to declare pointers for this opIndex + volatile int* entryCounter = (volatile int*)(&m_cpuBarrierCount[2 * opIndex]); + int entryVal = LOAD(entryCounter); + while (entryVal != m_numRanks) entryVal = LOAD(entryCounter); -ncclResult_t CliqueManager::WaitForPointers() -{ - // Do nothing if disabled - if (m_cliqueMode == CLIQUE_DISABLED) return ncclSuccess; - - if (!m_init) - { - WARN("CliqueManager must be initialized before use"); - return ncclInvalidUsage; + // Last rank to pass the barrier resets entry barrier + // NOTE: 
There is another GPU-barrier performed during the kernels therefore it should + // not be possible for any rank to modify entry count prior to being reset + volatile int* exitCounter = &m_cpuBarrierCount[2 * opIndex + 1]; + int exitVal = LOAD(exitCounter); + // Loop until successful atomic update to counter + bool done = false; + while (done == false) { + // Last rank resets entry counter + if (exitVal+1 == m_numRanks) + m_cpuBarrierCount[2 * opIndex] = 0; + done = __sync_bool_compare_and_swap(exitCounter, exitVal, exitVal+1); + exitVal++; } + INFO(NCCL_COLL, "Rank %d past opIndex barrier %d", m_rank, opIndex); - // Do nothing if there are no outstanding clique-kernels - if (m_inProgress.empty()) return ncclSuccess; - - // Copy clique device pointers to pinned device memory + // Collect pointers if (m_cliqueMode == CLIQUE_SINGLE_NODE) { - // Wait for all ranks to arrive - WaitForBarrier(); - int numHandles = m_numRanks * NUM_HANDLES_PER_RANK; std::vector> handles(numHandles); - while (!m_inProgress.empty()) + // Collect the ready handles from shared memory and convert them to device pointers + NCCLCHECK(m_shmHandles.ReadHandles(opIndex, handles)); + for (int i = 0; i < m_numRanks; i++) { - int const opIndex = m_inProgress.front(); - m_inProgress.pop(); + void *input; + NCCLCHECK(CheckCacheForHandle(handles[i * NUM_HANDLES_PER_RANK], + m_ipcHandleRecvCache, &input)); + m_pinnedCliquePtrs[opIndex].inputs[i] = const_cast(input); - // Collect the ready handles from shared memory and convert them to device pointers - NCCLCHECK(m_shmHandles.ReadHandles(opIndex, handles)); - for (int i = 0; i < m_numRanks; i++) - { - void *input; - NCCLCHECK(CheckCacheForHandle(handles[i * NUM_HANDLES_PER_RANK], - m_ipcHandleRecvCache, &input)); - m_pinnedCliquePtrs[opIndex].inputs[i] = const_cast(input); - - NCCLCHECK(CheckCacheForHandle(handles[(i * NUM_HANDLES_PER_RANK) + 1], - m_ipcHandleRecvCache, &m_pinnedCliquePtrs[opIndex].outputs[i])); - } + 
NCCLCHECK(CheckCacheForHandle(handles[(i * NUM_HANDLES_PER_RANK) + 1], + m_ipcHandleRecvCache, &m_pinnedCliquePtrs[opIndex].outputs[i])); } } else if (m_cliqueMode == CLIQUE_SINGLE_PROCESS) { - while (!m_inProgress.empty()) - { - int const opIndex = m_inProgress.front(); - m_inProgress.pop(); - - // Copy from static memory to pinned host memory and set local sense - memcpy(&m_pinnedCliquePtrs[opIndex], &m_staticCliquePtrs[opIndex], sizeof(cliqueDevicePtrs_t)); - m_pinnedCliquePtrs[opIndex].barrier.localSense = &m_gpuBarrierLocalSense[opIndex]; - } + // Copy from static memory to pinned host memory and set local sense + memcpy(&m_pinnedCliquePtrs[opIndex], &m_staticCliquePtrs[opIndex], sizeof(cliqueDevicePtrs_t)); + m_pinnedCliquePtrs[opIndex].barrier.localSense = &m_gpuBarrierLocalSense[opIndex]; } return ncclSuccess; } @@ -486,21 +538,6 @@ ncclResult_t CliqueManager::CheckCacheForHandle(std::pair #include -#include #include "nccl.h" #include "devcomm.h" @@ -53,6 +52,8 @@ public: ncclResult_t Init(ncclUniqueId const* commId, int suffix); + void SetByteLimits(); + // Returns true if the collective is supported via a clique-based kernel bool IsSupported(ncclFunc_t const coll, size_t const count, @@ -60,7 +61,7 @@ public: ncclRedOp_t const op) const; // Provide the pointers to be exchanged across the clique for the given rank / opCount - ncclResult_t DeclarePointers(uint64_t opCount, void const* inputPtr, void* outputPtr); + ncclResult_t DeclarePointers(void const* inputPtr, void* outputPtr); // Determine the number of channels / CUs to use for this call ncclResult_t GetNumChannelsToUse(ncclFunc_t const coll, @@ -70,12 +71,9 @@ public: int const totalNumChannels, uint8_t* numChannelstoUse); - // Set pointers for where clique-related arguments will be found - // This sets pointers to device-accessible memory where the arguments will eventually reside - ncclResult_t SetCliqueArgs(ncclWorkElem* args); - - // Blocking call that only returns after all out-standing clique 
pointers are ready - ncclResult_t WaitForPointers(); + // Blocking call that only returns once the in-progress clique pointers are ready + // This needs to be called in the same order as DeclarePointers + ncclResult_t WaitForPointers(ncclWorkElem* args); // Prepares shared memory files upon initialization static ncclResult_t BootstrapRootInit(int pid, unsigned long hash); @@ -90,19 +88,20 @@ protected: NcclIpcHandleRecvCache* cache, void** ptr); - // Race-condition helper functions - void WaitForBarrier(); - int m_rank; // Associated rank int m_numRanks; // Total number of ranks unsigned long m_hash; // Hash used for identifying message queues & shared memory cliqueMode_t m_cliqueMode; // Clique mode (off/single process/single node) + int32_t m_opIndexHead; // Track start of outstanding requests + int32_t m_opIndexTail; // Track end of outstanding requests bool m_init; // Whether CliqueManager has been initialized + int m_gcnArch; // Device GCN arch value + size_t m_allReduceByteLimit; // Byte limit for AllReduce cliqueDevicePtrs_t* m_pinnedCliquePtrs; // Pinned-host-memory (device accessible) containing device pointers int* m_gpuBarrierGlobalCount; // Part of GPU barrier (count variable shared across ranks) int* m_gpuBarrierGlobalSense; // Part of GPU barrier (reset variable shared across ranks) int* m_gpuBarrierLocalSense; // Part of GPU barrier (reset variable local to this rank) - std::queue m_inProgress; // Queue of clique-based collectives waiting for pointers + int* m_cpuBarrierCount; // Points to either the shared CPU memory (m_sharedCpuMemory) or m_staticBarrierCount // IPC-related (CLIQUE_SINGLE_NODE) NcclIpcHandleShm m_shmHandles; // Used to exchange IPC handles between ranks @@ -111,12 +110,11 @@ protected: ShmObject m_sharedCpuMemory; // Used to pass shared memory used for CPU barrier ShmObject m_sharedIpcHandle; // Used to pass fine-grained device memory buffer IPC handle int* m_fineGrainBarrierMem; // Fine-grained GPU memory barrier (allocated only on 1st rank, shared on others) 
- int* m_cpuBarrierGlobalCount; // Part of CPU barrier (count variable shared across ranks) - int* m_cpuBarrierGlobalSense; // Part of CPU barrier (reset variable shared across ranks) - int m_cpuBarrierLocalSense; // Part of CPU barrier (reset variable local to this rank) + int* m_sharedBarrierCount; // Part of CPU barrier (count variable shared across ranks) // Single-process (CLIQUE_SINGLE_PROCESS) static cliqueDevicePtrs_t m_staticCliquePtrs[NCCL_MAX_OPS]; // Use shared static memory to exchange pointer info + static int m_staticBarrierCount[2*NCCL_MAX_OPS]; // Part of CPU barrier (count variable shared across ranks) static int* m_staticGpuBarrierMem; // Static storage backing for fine-grained gpu barrier }; diff --git a/projects/rccl/src/enqueue.cc b/projects/rccl/src/enqueue.cc index b5365a492f..35dae896e8 100644 --- a/projects/rccl/src/enqueue.cc +++ b/projects/rccl/src/enqueue.cc @@ -128,10 +128,6 @@ static ncclResult_t setupLaunch(struct ncclComm* comm, hipLaunchParams* params) STORE(&channel->workFifo[(channel->workFifoTail-1)%NCCL_MAX_OPS].elems[0].active, 2); } - { // [RCCL] Wait for any clique-based collectives - NCCLCHECK(comm->cliqueManager->WaitForPointers()); - } // [/RCCL] - // Find the first operation, choose the kernel accordingly and pass it // as the first argument. 
struct ncclChannel* c0 = comm->channels; @@ -394,12 +390,6 @@ static ncclResult_t computeColl(struct ncclInfo* info /* input */, struct ncclWo info->datatype, info->op)) { - // Declare the input / output pointers being used (to exchange via IPC with other ranks) - NCCLCHECK(info->comm->cliqueManager->DeclarePointers(info->comm->opCount, - info->sendbuff, - info->recvbuff)); - - info->algorithm = NCCL_ALGO_RING; info->protocol = NCCL_PROTO_CLIQUE; // Determine the number of channels to use for clique-kernel @@ -411,6 +401,9 @@ static ncclResult_t computeColl(struct ncclInfo* info /* input */, struct ncclWo &work->clique.nChannels)); work->clique.count = info->count; work->funcIndex = FUNC_INDEX(info->coll, info->op, info->datatype, info->algorithm, info->protocol); + + // Setup pointers to where all the input/output pointers will be + NCCLCHECK(info->comm->cliqueManager->WaitForPointers(work)); return ncclSuccess; } } // [RCCL] @@ -519,12 +512,6 @@ ncclResult_t ncclSaveKernel(struct ncclInfo* info) { info->comm->myParams->gridDim.x++; - // [RCCL] Setup pointers to where all the input/output pointers will be - if (info->protocol == NCCL_PROTO_CLIQUE) { - NCCLCHECK(info->comm->cliqueManager->SetCliqueArgs(&work)); - } - // [/RCCL] - work.coll.bid = bid % nChannels; NCCLCHECK(getNextOp(channel, NULL, &work)); } @@ -664,6 +651,20 @@ ncclResult_t ncclSaveP2pKernel(struct ncclInfo* info) { } ncclResult_t ncclEnqueueCheck(struct ncclInfo* info) { + // [RCCL] Check for clique-based kernel support + { + if (info->comm->cliqueManager->IsSupported(info->coll, + info->count, + info->datatype, + info->op)) + { + // Declare the input / output pointers being used (to exchange via IPC with other ranks) + // This is done immediately, and does not block + NCCLCHECK(info->comm->cliqueManager->DeclarePointers(info->sendbuff, info->recvbuff)); + } + } + // [/RCCL] + // Launch asynchronously if needed if (ncclAsyncMode()) { ncclResult_t ret = ncclSuccess; diff --git 
a/projects/rccl/src/init.cc b/projects/rccl/src/init.cc index 80de771fe0..3c981e07fb 100644 --- a/projects/rccl/src/init.cc +++ b/projects/rccl/src/init.cc @@ -30,6 +30,7 @@ // [RCCL] #include "clique/CliqueManager.h" +#include // [/RCCL] #define STR2(v) #v @@ -363,7 +364,7 @@ static ncclResult_t commFree(ncclComm_t comm) { return ncclSuccess; } -RCCL_PARAM(ForceEnableClique, "FORCE_ENABLE_CLIQUE", 0); +RCCL_PARAM(CliqueIgnoreTopo, "CLIQUE_IGNORE_TOPO", 0); RCCL_PARAM(P2pNetDisable, "P2P_NET_DISABLE", 0); static ncclResult_t commAlloc(ncclComm_t* comret, int ndev, int rank) { @@ -865,7 +866,8 @@ static ncclResult_t initTransportsRank(struct ncclComm* comm, ncclUniqueId* comm CliqueManager::cliqueMode_t cliqueMode = CliqueManager::CLIQUE_DISABLED; if (comm->localRanks == comm->nRanks) { - // Check that all the GPUs have peer access to one another + // Check that all the GPUs have peer access to one another and are XGMI connected + bool allXgmi = true; bool hasPeerAccess = true; for (int i = 0; i < nranks && hasPeerAccess; i++) { @@ -880,6 +882,10 @@ static ncclResult_t initTransportsRank(struct ncclComm* comm, ncclUniqueId* comm hasPeerAccess = false; break; } + + uint32_t linkType, hopCount; + CUDACHECK(hipExtGetLinkTypeAndHopCount(i, j, &linkType, &hopCount)); + allXgmi &= (linkType == HSA_AMD_LINK_INFO_TYPE_XGMI); } } if (hasPeerAccess) @@ -890,15 +896,11 @@ static ncclResult_t initTransportsRank(struct ncclComm* comm, ncclUniqueId* comm cliqueMode = CliqueManager::CLIQUE_SINGLE_NODE; } - // For now, only enable clique-based kernels on CR8_G topologies, unless explicitly asked - if (!rcclParamForceEnableClique()) + // For now, only enable clique-based kernels on nodes where all GPUs are XGMI connected + if (!allXgmi && !rcclParamCliqueIgnoreTopo()) { - // Disable clique-kernel support if not on CR8 topology - if (!(comm->topo->nodes[GPU].count == comm->topo->nRanks && (comm->topo->type & RCCL_TOPO_CR8G))) - { - INFO(NCCL_INIT, "Disabling clique-based kernels due 
to topology (force enable with RCCL_FORCE_ENABLE_CLIQUE)"); - cliqueMode = CliqueManager::CLIQUE_DISABLED; - } + INFO(NCCL_INIT, "Disabling clique-based kernels due to topology (ignore with RCCL_CLIQUE_IGNORE_TOPO)"); + cliqueMode = CliqueManager::CLIQUE_DISABLED; } } comm->cliqueManager = new CliqueManager(rank, nranks, cliqueMode); diff --git a/projects/rccl/test/CMakeLists.txt b/projects/rccl/test/CMakeLists.txt index 485f83311d..b9311a355e 100644 --- a/projects/rccl/test/CMakeLists.txt +++ b/projects/rccl/test/CMakeLists.txt @@ -11,6 +11,7 @@ if(BUILD_TESTS) set(TEST_SOURCES_SINGLE_PROCESS test_AllGather.cpp test_AllReduce.cpp + test_AllReduceGroup.cpp test_Broadcast.cpp test_Reduce.cpp test_ReduceScatter.cpp @@ -27,6 +28,7 @@ if(BUILD_TESTS) set(TEST_SOURCES_MULTI_PROCESS test_AllGatherMultiProcess.cpp test_AllReduceMultiProcess.cpp + test_AllReduceGroupMultiProcess.cpp test_AllToAllMultiProcess.cpp test_BroadcastMultiProcess.cpp test_CombinedCallsMultiProcess.cpp diff --git a/projects/rccl/test/test_AllReduce.cpp b/projects/rccl/test/test_AllReduce.cpp index 28f76e52d2..1ef7b410e5 100644 --- a/projects/rccl/test/test_AllReduce.cpp +++ b/projects/rccl/test/test_AllReduce.cpp @@ -8,56 +8,56 @@ namespace CorrectnessTests { - TEST_P(AllReduceCorrectnessTest, Correctness) + TEST_P(AllReduceCorrectnessTest, Correctness) + { + if (numDevices > numDevicesAvailable) return; + + // Prepare input / output / expected results + Dataset dataset; + dataset.Initialize(numDevices, numElements, dataType, inPlace); + FillDatasetWithPattern(dataset); + ComputeExpectedResults(dataset, op); + + // Launch the reduction (1 thread per GPU) + ncclGroupStart(); + for (int i = 0; i < numDevices; i++) { - if (numDevices > numDevicesAvailable) return; - - // Prepare input / output / expected results - Dataset dataset; - dataset.Initialize(numDevices, numElements, dataType, inPlace); - FillDatasetWithPattern(dataset); - ComputeExpectedResults(dataset, op); - - // Launch the reduction (1 
thread per GPU) - ncclGroupStart(); - for (int i = 0; i < numDevices; i++) - { - ncclAllReduce(dataset.inputs[i], dataset.outputs[i], - numElements, dataType, op, comms[i], streams[i]); - } - ncclGroupEnd(); - - // Wait for reduction to complete - Synchronize(); - - // Check results - ValidateResults(dataset); - - dataset.Release(); + ncclAllReduce(dataset.inputs[i], dataset.outputs[i], + numElements, dataType, op, comms[i], streams[i]); } + ncclGroupEnd(); - INSTANTIATE_TEST_SUITE_P(AllReduceCorrectnessSweep, - AllReduceCorrectnessTest, - testing::Combine( - // Reduction operator - testing::Values(ncclSum, ncclProd, ncclMax, ncclMin), - // Data types - testing::Values(ncclInt8, - ncclUint8, - ncclInt32, - ncclUint32, - ncclInt64, - ncclUint64, - //ncclFloat16, - ncclFloat32, - ncclFloat64, - ncclBfloat16), - // Number of elements - testing::Values(1024, 1048576), - // Number of devices - testing::Values(2,3,4,5,6,7,8), - // In-place or not - testing::Values(false, true), - testing::Values("RCCL_ENABLE_CLIQUE=0", "RCCL_ENABLE_CLIQUE=1")), - CorrectnessTest::PrintToStringParamName()); + // Wait for reduction to complete + Synchronize(); + + // Check results + ValidateResults(dataset); + + dataset.Release(); + } + + INSTANTIATE_TEST_SUITE_P(AllReduceCorrectnessSweep, + AllReduceCorrectnessTest, + testing::Combine( + // Reduction operator + testing::Values(ncclSum, ncclProd, ncclMax, ncclMin), + // Data types + testing::Values(ncclInt8, + ncclUint8, + ncclInt32, + ncclUint32, + ncclInt64, + ncclUint64, + //ncclFloat16, + ncclFloat32, + ncclFloat64, + ncclBfloat16), + // Number of elements + testing::Values(1024, 1048576), + // Number of devices + testing::Values(2,3,4,5,6,7,8), + // In-place or not + testing::Values(false, true), + testing::Values("RCCL_ENABLE_CLIQUE=0", "RCCL_ENABLE_CLIQUE=1")), + CorrectnessTest::PrintToStringParamName()); } // namespace diff --git a/projects/rccl/test/test_AllReduceGroup.cpp b/projects/rccl/test/test_AllReduceGroup.cpp new file 
mode 100644 index 0000000000..5b4cbad4e5 --- /dev/null +++ b/projects/rccl/test/test_AllReduceGroup.cpp @@ -0,0 +1,66 @@ +/************************************************************************* + * Copyright (c) 2021 Advanced Micro Devices, Inc. All rights reserved. + * + * See LICENSE.txt for license information + ************************************************************************/ + +#include "test_AllReduceGroup.hpp" + +namespace CorrectnessTests +{ + // This tests aggregated AllReduce calls within a group + TEST_P(AllReduceGroupCorrectnessTest, Correctness) + { + if (numDevices > numDevicesAvailable) return; + + // Prepare input / output / expected results + Dataset dataset1, dataset2, dataset3; + dataset1.Initialize(numDevices, numElements, dataType, inPlace); + dataset2.Initialize(numDevices, numElements, dataType, inPlace); + dataset3.Initialize(numDevices, numElements, dataType, inPlace); + FillDatasetWithPattern(dataset1); + FillDatasetWithPattern(dataset2); + FillDatasetWithPattern(dataset3); + ComputeExpectedResults(dataset1, op); + ComputeExpectedResults(dataset2, op); + ComputeExpectedResults(dataset3, op); + + // Launch the reduction (1 thread per GPU) + ncclGroupStart(); + for (int i = 0; i < numDevices; i++) + { + ncclAllReduce(dataset1.inputs[i], dataset1.outputs[i], numElements, dataType, op, comms[i], streams[i]); + ncclAllReduce(dataset2.inputs[i], dataset2.outputs[i], numElements, dataType, op, comms[i], streams[i]); + ncclAllReduce(dataset3.inputs[i], dataset3.outputs[i], numElements, dataType, op, comms[i], streams[i]); + } + ncclGroupEnd(); + + // Wait for reduction to complete + Synchronize(); + + // Check results + ValidateResults(dataset1); + ValidateResults(dataset2); + ValidateResults(dataset3); + + dataset1.Release(); + dataset2.Release(); + dataset3.Release(); + } + + INSTANTIATE_TEST_SUITE_P(AllReduceGroupCorrectnessSweep, + AllReduceGroupCorrectnessTest, + testing::Combine( + // Reduction operator + testing::Values(ncclSum), 
+ // Data types + testing::Values(ncclFloat32, ncclFloat64), + // Number of elements + testing::Values(1024, 1048576), + // Number of devices + testing::Values(2,3,4,5,6,7,8), + // In-place or not + testing::Values(false, true), + testing::Values("RCCL_ENABLE_CLIQUE=0", "RCCL_ENABLE_CLIQUE=1")), + CorrectnessTest::PrintToStringParamName()); +} // namespace diff --git a/projects/rccl/test/test_AllReduceGroup.hpp b/projects/rccl/test/test_AllReduceGroup.hpp new file mode 100644 index 0000000000..e21da66dd2 --- /dev/null +++ b/projects/rccl/test/test_AllReduceGroup.hpp @@ -0,0 +1,79 @@ +/************************************************************************* + * Copyright (c) 2021 Advanced Micro Devices, Inc. All rights reserved. + * + * See LICENSE.txt for license information + ************************************************************************/ +#ifndef TEST_ALLREDUCEGROUP_HPP +#define TEST_ALLREDUCEGROUP_HPP + +#include "CorrectnessTest.hpp" + +namespace CorrectnessTests +{ + class AllReduceGroupCorrectnessTest : public CorrectnessTest + { + public: + static void ComputeExpectedResults(Dataset& dataset, ncclRedOp_t const op) + { + // Copy all inputs to expected arrays temporarily to perform reduction on host + for (int i = 0; i < dataset.numDevices; i++) + HIP_CALL(hipMemcpy(dataset.expected[i], dataset.inputs[i], + dataset.NumBytes(), hipMemcpyDeviceToHost)); + + // Allocate temporary host array to accumulate results + int8_t* resultI1 = (int8_t *)malloc(dataset.NumBytes()); + uint8_t* resultU1 = (uint8_t *)resultI1; + int32_t* resultI4 = (int32_t *)resultI1; + uint32_t* resultU4 = (uint32_t *)resultI1; + int64_t* resultI8 = (int64_t *)resultI1; + uint64_t* resultU8 = (uint64_t *)resultI1; + float* resultF4 = (float *)resultI1; + double* resultF8 = (double *)resultI1; + rccl_bfloat16* resultB2 = (rccl_bfloat16 *)resultI1; + + // Initialize the result with the first device's array + memcpy(resultI1, dataset.expected[0], dataset.NumBytes()); + + // Perform 
reduction on the other device arrays + for (int i = 1; i < dataset.numDevices; i++) + { + int8_t* arrayI1 = (int8_t *)dataset.expected[i]; + uint8_t* arrayU1 = (uint8_t *)arrayI1; + int32_t* arrayI4 = (int32_t *)arrayI1; + uint32_t* arrayU4 = (uint32_t *)arrayI1; + int64_t* arrayI8 = (int64_t *)arrayI1; + uint64_t* arrayU8 = (uint64_t *)arrayI1; + float* arrayF4 = (float *)arrayI1; + double* arrayF8 = (double *)arrayI1; + rccl_bfloat16* arrayB2 = (rccl_bfloat16 *)arrayI1; + + for (int j = 0; j < dataset.numElements; j++) + { + switch (dataset.dataType) + { + case ncclInt8: resultI1[j] = ReduceOp(op, resultI1[j], arrayI1[j]); break; + case ncclUint8: resultU1[j] = ReduceOp(op, resultU1[j], arrayU1[j]); break; + case ncclInt32: resultI4[j] = ReduceOp(op, resultI4[j], arrayI4[j]); break; + case ncclUint32: resultU4[j] = ReduceOp(op, resultU4[j], arrayU4[j]); break; + case ncclInt64: resultI8[j] = ReduceOp(op, resultI8[j], arrayI8[j]); break; + case ncclUint64: resultU8[j] = ReduceOp(op, resultU8[j], arrayU8[j]); break; + case ncclFloat32: resultF4[j] = ReduceOp(op, resultF4[j], arrayF4[j]); break; + case ncclFloat64: resultF8[j] = ReduceOp(op, resultF8[j], arrayF8[j]); break; + case ncclBfloat16: resultB2[j] = ReduceOp(op, resultB2[j], arrayB2[j]); break; + default: + fprintf(stderr, "[ERROR] Unsupported datatype\n"); + exit(0); + } + } + } + + // Copy results into expected arrays + for (int i = 0; i < dataset.numDevices; i++) + memcpy(dataset.expected[i], resultI1, dataset.NumBytes()); + + free(resultI1); + } + }; +} + +#endif diff --git a/projects/rccl/test/test_AllReduceGroupMultiProcess.cpp b/projects/rccl/test/test_AllReduceGroupMultiProcess.cpp new file mode 100644 index 0000000000..1c43a9d3e2 --- /dev/null +++ b/projects/rccl/test/test_AllReduceGroupMultiProcess.cpp @@ -0,0 +1,81 @@ +/************************************************************************* + * Copyright (c) 2021 Advanced Micro Devices, Inc. All rights reserved. 
+ * + * See LICENSE.txt for license information + ************************************************************************/ +#include "test_AllReduceGroupMultiProcess.hpp" + +namespace CorrectnessTests +{ + TEST_P(AllReduceGroupMultiProcessCorrectnessTest, Correctness) + { + // Important: Make sure the order of ncclFunc_t's here match the order of ncclFunc_ts + // as they appear in TestGroupCalls() + std::vector ncclFuncs; + ncclFuncs.push_back(ncclCollAllReduce); + ncclFuncs.push_back(ncclCollAllReduce); + ncclFuncs.push_back(ncclCollAllReduce); + + // Create multiple datasets for combined operation + std::vector datasets(ncclFuncs.size()); + for (int i = 0; i < datasets.size(); i++) + { + datasets[i] = (Dataset*)mmap(NULL, sizeof(Dataset), PROT_READ|PROT_WRITE, MAP_SHARED|MAP_ANONYMOUS, -1, 0); + datasets[i]->InitializeRootProcess(numDevices, numElements, dataType, inPlace, ncclFuncs[i]); + } + + int const numGpusPerProcess = 2; + int const numProcesses = numDevices / numGpusPerProcess; + std::vector pids(numProcesses); + int process = -1; + + for (int i = 0; i < numDevices; i+= numGpusPerProcess) + { + process++; + int pid = fork(); + if (pid == 0) + { + int gpuIdx = i; + int maxIdx = gpuIdx + (numGpusPerProcess - 1) >= numDevices ? 
numDevices : gpuIdx + numGpusPerProcess; + + std::vector ranks; + for (; gpuIdx < maxIdx; gpuIdx++) + { + ranks.push_back(gpuIdx); + } + + bool pass; + TestGroupCalls(process, ranks, datasets, ncclFuncs, pass); + TerminateChildProcess(pass); + } + else + { + pids[process] = pid; + } + } + + ValidateProcesses(pids); + + for (int i = 0; i < datasets.size(); i++) + { + munmap(datasets[i], sizeof(Dataset)); + } + } + + INSTANTIATE_TEST_SUITE_P(AllReduceGroupMultiProcessCorrectnessSweep, + AllReduceGroupMultiProcessCorrectnessTest, + testing::Combine( + // Reduction operator (not used) + testing::Values(ncclSum), + // Data types + testing::Values(ncclFloat32, + ncclFloat64), + // Number of elements + testing::Values(3072, 3145728), + // Number of devices + testing::Values(4,8), + // In-place or not + testing::Values(false, true), + testing::Values("RCCL_ENABLE_CLIQUE=0", "RCCL_ENABLE_CLIQUE=1")), + CorrectnessTest::PrintToStringParamName()); +} // namespace diff --git a/projects/rccl/test/test_AllReduceGroupMultiProcess.hpp b/projects/rccl/test/test_AllReduceGroupMultiProcess.hpp new file mode 100644 index 0000000000..9eb141ba58 --- /dev/null +++ b/projects/rccl/test/test_AllReduceGroupMultiProcess.hpp @@ -0,0 +1,105 @@ +/************************************************************************* + * Copyright (c) 2021 Advanced Micro Devices, Inc. All rights reserved. 
+ * + * See LICENSE.txt for license information + ************************************************************************/ + +#ifndef TEST_ALLREDUCEGROUP_MULTI_PROCESS_HPP +#define TEST_ALLREDUCEGROUP_MULTI_PROCESS_HPP + +#include "CorrectnessTest.hpp" +#include "test_AllReduceMultiProcess.hpp" +#include + +namespace CorrectnessTests +{ + class AllReduceGroupMultiProcessCorrectnessTest : public MultiProcessCorrectnessTest + { + public: + void TestGroupCalls(int process, std::vector const& ranks, std::vector& datasets, std::vector const& funcs, bool& pass) + { + ncclGroupStart(); + for (int i = 0; i < ranks.size(); i++) + { + SetUpPerProcess(ranks[i], funcs, comms[ranks[i]], streams[ranks[i]], datasets); + if (numDevices > numDevicesAvailable) + { + break; + } + } + ncclGroupEnd(); + + if (numDevices > numDevicesAvailable) + { + pass = true; + return; + } + + int numProcesses = numDevices / ranks.size(); + Barrier barrier(process, numProcesses, std::atoi(getenv("NCCL_COMM_ID"))); + + for (int i = 0; i < ranks.size(); i++) + { + for (int j = 0; j < datasets.size(); j++) + { + FillDatasetWithPattern(*datasets[j], ranks[i]); + } + } + + int const root = 0; + + for (int i = 0; i < 3; i++) + { + AllReduceMultiProcessCorrectnessTest::ComputeExpectedResults(*datasets[i], barrier, op, ranks); + } + barrier.Wait(); + + size_t const byteCount = datasets[0]->NumBytes() / numDevices; + size_t const elemCount = numElements / numDevices; + + ncclGroupStart(); + // AllReduce + for (int i = 0; i < ranks.size(); i++) + { + int rank = ranks[i]; + for (int j = 0; j < 3; j++) + { + ncclAllReduce(datasets[j]->inputs[rank], datasets[j]->outputs[rank], + numElements, dataType, op, comms[rank], streams[rank]); + } + } + // Signal end of group call + ncclGroupEnd(); + + for (int i = 0; i < ranks.size(); i++) + { + HIP_CALL(hipSetDevice(ranks[i])); + HIP_CALL(hipStreamSynchronize(streams[ranks[i]])); + } + + for (int i = 0; i < funcs.size(); i++) + { + for (int j = 0; j < ranks.size(); j++) 
+ { + pass = ValidateResults(*datasets[i], ranks[j], root); + if (!pass) + { + break; + } + } + barrier.Wait(); + for (int j = 0; j < ranks.size(); j++) + { + datasets[i]->Release(ranks[j]); + } + } + + for (int i = 0; i < ranks.size(); i++) + { + TearDownPerProcess(comms[ranks[i]], streams[ranks[i]]); + } + } + }; +} + +#endif diff --git a/projects/rccl/test/test_AllReduceMultiProcess.cpp b/projects/rccl/test/test_AllReduceMultiProcess.cpp index 34ea532f40..dbea0fec75 100644 --- a/projects/rccl/test/test_AllReduceMultiProcess.cpp +++ b/projects/rccl/test/test_AllReduceMultiProcess.cpp @@ -8,53 +8,53 @@ namespace CorrectnessTests { - TEST_P(AllReduceMultiProcessCorrectnessTest, Correctness) + TEST_P(AllReduceMultiProcessCorrectnessTest, Correctness) + { + dataset->InitializeRootProcess(numDevices, numElements, dataType, inPlace, ncclCollAllReduce); + std::vector pids(numDevices); + + int gpu = -1; + for (int i = 0; i < numDevices; i++) { - dataset->InitializeRootProcess(numDevices, numElements, dataType, inPlace, ncclCollAllReduce); - std::vector pids(numDevices); - - int gpu = -1; - for (int i = 0; i < numDevices; i++) - { - gpu++; - int pid = fork(); - if (pid == 0) - { - bool pass; - TestAllReduce(gpu, *dataset, pass); - TerminateChildProcess(pass); - } - else - { - pids[gpu] = pid; - } - } - - ValidateProcesses(pids); + gpu++; + int pid = fork(); + if (pid == 0) + { + bool pass; + TestAllReduce(gpu, *dataset, pass); + TerminateChildProcess(pass); + } + else + { + pids[gpu] = pid; + } } - INSTANTIATE_TEST_SUITE_P(AllReduceMultiProcessCorrectnessSweep, - AllReduceMultiProcessCorrectnessTest, - testing::Combine( - // Reduction operator - testing::Values(ncclSum, ncclProd, ncclMax, ncclMin), - // Data types - testing::Values(ncclInt8, - ncclUint8, - ncclInt32, - ncclUint32, - ncclInt64, - ncclUint64, - //ncclFloat16, - ncclFloat32, - ncclFloat64, - ncclBfloat16), - // Number of elements - testing::Values(1024, 1048576), - // Number of devices - 
testing::Values(2,3,4,8), - // In-place or not - testing::Values(false, true), - testing::Values("")), - CorrectnessTest::PrintToStringParamName()); + ValidateProcesses(pids); + } + + INSTANTIATE_TEST_SUITE_P(AllReduceMultiProcessCorrectnessSweep, + AllReduceMultiProcessCorrectnessTest, + testing::Combine( + // Reduction operator + testing::Values(ncclSum, ncclProd, ncclMax, ncclMin), + // Data types + testing::Values(ncclInt8, + ncclUint8, + ncclInt32, + ncclUint32, + ncclInt64, + ncclUint64, + //ncclFloat16, + ncclFloat32, + ncclFloat64, + ncclBfloat16), + // Number of elements + testing::Values(1024, 1048576), + // Number of devices + testing::Values(2,3,4,8), + // In-place or not + testing::Values(false, true), + testing::Values("RCCL_ENABLE_CLIQUE=0", "RCCL_ENABLE_CLIQUE=1")), + CorrectnessTest::PrintToStringParamName()); } // namespace