diff --git a/projects/rccl/CHANGELOG.md b/projects/rccl/CHANGELOG.md index 60276a7b01..2388e7c640 100644 --- a/projects/rccl/CHANGELOG.md +++ b/projects/rccl/CHANGELOG.md @@ -3,8 +3,17 @@ Full documentation for RCCL is available at [https://rccl.readthedocs.io](https://rccl.readthedocs.io) ## [Unreleased] +### Added +- Experimental support for clique-based kernels (opt in with RCCL_ENABLE_CLIQUE=1) +- Clique-based kernels may offer better performance for smaller input sizes +- Clique-based kernels are currently only enabled for AllReduce under a certain byte limit (controlled via RCCL_CLIQUE_ALLREDUCE_BYTE_LIMIT) ### Optimizations - Performance improvements for Rome-based systems +### Known issues +- Clique-based kernels are currently experimental and have not been fully tested on all topologies. By default, clique-based kernels are disabled if the detected topology is not supported (override with RCCL_FORCE_ENABLE_CLIQUE) +- Clique-based kernels may hang if there are differences between environment variables set across ranks. +- Clique-based kernels may fail if the input / output device pointers are not the base device pointers returned by hipMalloc. + ## [RCCL-2.7.8 for ROCm 3.9.0] ### Added diff --git a/projects/rccl/CMakeLists.txt b/projects/rccl/CMakeLists.txt index e10c785a5c..cc6ddcb929 100644 --- a/projects/rccl/CMakeLists.txt +++ b/projects/rccl/CMakeLists.txt @@ -126,6 +126,12 @@ set(CC_SOURCES src/collectives/all_to_all_api.cc src/collectives/all_to_allv_api.cc src/channel.cc + src/clique/CliqueManager.cc # RCCL + src/clique/HandleCache.cc # RCCL + src/clique/HandleShm.cc # RCCL + src/clique/Hash.cc # RCCL + src/clique/MsgQueue.cc # RCCL + src/clique/ShmObject.cc # RCCL src/misc/argcheck.cc src/misc/nvmlwrap_stub.cc src/misc/utils.cc diff --git a/projects/rccl/src/bootstrap.cc b/projects/rccl/src/bootstrap.cc index e90dd66823..71e7070284 100644 --- a/projects/rccl/src/bootstrap.cc +++ b/projects/rccl/src/bootstrap.cc @@ -12,6 +12,11 @@ #include "socket.h" #include #include +// [RCCL] +#include "clique/CliqueManager.h" +#include "clique/CliqueShmNames.h" +#include "clique/Hash.h" +// [/RCCL] struct bootstrapNetComm { int fd; @@ -163,7 +168,14 @@ static ncclResult_t setFilesLimit() { return ncclSuccess; } -static void *bootstrapRoot(void* listenComm) { +static void *bootstrapRoot(void* bootstrapRootStruct) { // [RCCL] Modified to include hash argument) + // [RCCL] Unpack bootstrapRootStruct + struct bootstrapRootStruct* rootStruct = (struct bootstrapRootStruct*) bootstrapRootStruct; + void* listenComm = rootStruct->listenComm; + unsigned long hash = rootStruct->hash; + int pid = getpid(); // sharing PID to other ranks for creating shared memory files for CliqueManager + // [/RCCL] + struct extInfo info; ncclNetHandle_t *rankHandles = NULL; ncclNetHandle_t *rankHandlesRoot = NULL; // for initial rank <-> root information exchange @@ -205,12 +217,19 @@ static void *bootstrapRoot(void* listenComm) { } while (c < nranks); TRACE(NCCL_INIT, "COLLECTED ALL %d HANDLES", nranks); + { // [RCCL] Initialize message queues / shared memory files + NCCLCHECKGOTO(CliqueManager::BootstrapRootInit(pid, hash), res, out); + } // [/RCCL] + // Send the connect handle for the next rank in the AllGather ring for (int r=0; rhash = djb2Hash(id->internal); + rootStruct->listenComm = listenComm; + pthread_create(&thread, NULL, bootstrapRoot, (void *)rootStruct); + // [/RCCL] + return ncclSuccess; } @@ -267,9 +293,10 @@ struct extState { int rank; int nranks; int dev; + int rootPid; // [RCCL] PID of root }; -ncclResult_t bootstrapInit(ncclUniqueId * id, int rank, int nranks, void** commState) { +ncclResult_t bootstrapInit(ncclUniqueId * id, int rank, int nranks, void** commState, int* rootPid) { // [RCCL] Adding rootPid ncclNetHandle_t* netHandle = (ncclNetHandle_t*) id; bool idFromEnv = getenv("NCCL_COMM_ID") != NULL; struct extState* state; @@ -314,6 +341,9 @@ ncclResult_t bootstrapInit(ncclUniqueId * id, int rank, int nranks, void** commS ncclNetHandle_t extHandleNext; NCCLCHECK(bootstrapNetAccept(extBstrapListenCommRoot, &tmpRecvComm)); NCCLCHECK(bootstrapNetRecv(tmpRecvComm, &extHandleNext, sizeof(extHandleNext))); + { // [RCCL] Receive PID from root + NCCLCHECK(bootstrapNetRecv(tmpRecvComm, rootPid, sizeof(int))); + } // [/RCCL] NCCLCHECK(bootstrapNetCloseRecv(tmpRecvComm)); NCCLCHECK(bootstrapNetCloseListen(extBstrapListenCommRoot)); diff --git a/projects/rccl/src/clique/AllReduceCliqueKernel.h b/projects/rccl/src/clique/AllReduceCliqueKernel.h new file mode 100644 index 0000000000..a46ed6a219 --- /dev/null +++ b/projects/rccl/src/clique/AllReduceCliqueKernel.h @@ -0,0 +1,75 @@ +/* +Copyright (c) 2020 Advanced Micro Devices, Inc. All rights reserved. + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. +*/ + +#ifndef ALLREDUCECLIQUEKERNEL_H +#define ALLREDUCECLIQUEKERNEL_H + +#include "CliqueCommon.h" +#include "devcomm.h" +#include "reduce_kernel.h" +#include "common_kernel.h" + +template +__device__ void AllReduceCliqueSplitKernel(struct CollectiveArgs* args) +{ + // Clique-specific kernel arguments + cliqueDevicePtrs_t* cliquePtrs = args->clique.ptrs; // Collection of all input/output pointers across ranks in clique + size_t const N = args->clique.count; // Total number of elements to reduce + int const nBlocks = args->clique.nChannels; // Total number of blocks assigned to this kernel (may be different than gridDim.x) + int const blockId = args->clique.bid; // 0-indexed blockIdx for this threadblock (may be different than blockIdx.x) + int const rank = args->comm->rank; // Current rank + + // Each threadblock works independently of others on a subsection of the input + // First split evently across ranks, while maintaining multiples of blocksize + size_t const perRankN = RoundUp((N + NUM_RANKS - 1) / NUM_RANKS, blockDim.x); + size_t const perBlockN = RoundUp((perRankN + nBlocks - 1) / nBlocks, blockDim.x); + size_t const currBlockStart = min((rank * nBlocks + blockId) * perBlockN, N); + size_t const currBlockStop = min(currBlockStart + perBlockN, N); + size_t const blockN = currBlockStop - currBlockStart; + + if (blockN > 0) + { + // Prepare input / output subarrays + T const** inputs = (T const**)cliquePtrs->inputs; + T** outputs = (T **)cliquePtrs->outputs; + T const* srcs[NUM_RANKS]; + T* dsts[NUM_RANKS]; + + #pragma unroll + for (int r = 0; r < NUM_RANKS; r++) + { + srcs[r] = inputs[r] + currBlockStart; + dsts[r] = outputs[r] + currBlockStart; + } + + // Perform the reduction + #define ALL_REDUCE_CLIQUE_UNROLL 2 + ReduceOrCopyMulti( + threadIdx.x, blockDim.x, NUM_RANKS, srcs, NUM_RANKS, dsts, blockN); + } + + // Even if there was nothing for this GPU to do, it must participate in a barrier + // because other GPUs may be modifying this GPUs output buffer still + if (blockId == 0) WaitForBarrier(cliquePtrs->barrier); +} + +#endif diff --git a/projects/rccl/src/clique/CliqueCommon.h b/projects/rccl/src/clique/CliqueCommon.h new file mode 100644 index 0000000000..b7bdee19ae --- /dev/null +++ b/projects/rccl/src/clique/CliqueCommon.h @@ -0,0 +1,93 @@ +/* +Copyright (c) 2020 Advanced Micro Devices, Inc. All rights reserved. + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. +*/ + +#ifndef CLIQUE_COMMON_H +#define CLIQUE_COMMON_H + +#include "nccl.h" +#include + +#define MIN_CLIQUE_SIZE 2 +#define MAX_CLIQUE_SIZE 8 + +typedef struct +{ + int* globalCount; // Shared across GPUs + int* globalSense; // Shared across GPUs + int* localSense; // Local to this GPU +} gpuBarrier_t; + +typedef struct +{ + // Input/output pointers from participating ranks + void const* inputs[MAX_CLIQUE_SIZE]; + void* outputs[MAX_CLIQUE_SIZE]; + + // Barrier variable + gpuBarrier_t barrier; +} cliqueDevicePtrs_t; + +// Helper macro to launch an appropriate kernel by converting rank to a template argument +#define LAUNCH_CLIQUE_KERNEL(kernelname, FUNC, T, args) \ + { \ + switch (args->comm->nRanks){ \ + case 2: kernelname(args); break; \ + case 3: kernelname(args); break; \ + case 4: kernelname(args); break; \ + case 5: kernelname(args); break; \ + case 6: kernelname(args); break; \ + case 7: kernelname(args); break; \ + case 8: kernelname(args); break; \ + } \ + } + +// Multi-GPU (on same node) barrier. One thread per grid per GPU updates barrier / waits +template +__forceinline__ __device__ void WaitForBarrier(gpuBarrier_t const& barrier) +{ + if (threadIdx.x == 0) + { + // Sense inversion barrier + *barrier.localSense = 1 - *barrier.localSense; + int localSense = *barrier.localSense; + + int val = __atomic_add_fetch(barrier.globalCount, 1, __ATOMIC_SEQ_CST); + if (val == NUM_RANKS) + { + // Last arrival resets barrier + __atomic_store_n(barrier.globalCount, 0, __ATOMIC_SEQ_CST); + __atomic_store_n(barrier.globalSense, localSense, __ATOMIC_SEQ_CST); + } + else + { + // Wait for all ranks to reach barrier + while (__atomic_load_n(barrier.globalSense, __ATOMIC_SEQ_CST) != localSense); + } + } +} + +__forceinline__ __host__ __device__ size_t RoundUp(size_t X, size_t Y) +{ + return (X+Y-1)/Y * Y; +} + +#endif diff --git a/projects/rccl/src/clique/CliqueManager.cc b/projects/rccl/src/clique/CliqueManager.cc new file mode 100644 index 0000000000..3afd3cfa31 --- /dev/null +++ b/projects/rccl/src/clique/CliqueManager.cc @@ -0,0 +1,519 @@ +/* +Copyright (c) 2020 Advanced Micro Devices, Inc. All rights reserved. + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. +*/ + +#include "CliqueManager.h" +#include "CliqueShmNames.h" +#include "MsgQueue.h" + +#include "nccl.h" +#include "core.h" + +#include "Hash.h" + +#include "AllReduceCliqueKernel.h" + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +cliqueDevicePtrs_t CliqueManager::m_staticCliquePtrs[NCCL_MAX_OPS] = {}; +int* CliqueManager::m_staticGpuBarrierMem = NULL; + +// Define some environment variables that affect clique-based kernels +RCCL_PARAM(EnableClique, "ENABLE_CLIQUE", 0); // Opt-in environment variable for clique-based kernels +RCCL_PARAM(AllReduceCliqueByteLimit, "CLIQUE_ALLREDUCE_BYTE_LIMIT", 2097152); // Max number of bytes to use clique-based kernels for all reduce +RCCL_PARAM(AllReduceNumChannels, "CLIQUE_ALLREDUCE_NCHANNELS", 4); // Number of channels to use for all-reduce + +CliqueManager::CliqueManager(int const rank, + int const numRanks, + cliqueMode_t const cliqueMode) : + m_rank(rank), + m_numRanks(numRanks), + m_cliqueMode(cliqueMode), + m_init(false), + m_pinnedCliquePtrs(NULL), + m_fineGrainBarrierMem(NULL) +{ +} + +CliqueManager::~CliqueManager() +{ + if (m_init) + { + CleanUp(); + } +} + +void CliqueManager::CleanUp() +{ + if (m_cliqueMode == CLIQUE_DISABLED) return; + + // Free variables that are shared between SINGLE_PROCESS / SINGLE_NODE + if (m_pinnedCliquePtrs) hipHostFree(m_pinnedCliquePtrs); + if (m_gpuBarrierLocalSense) hipFree(m_gpuBarrierLocalSense); + + if (m_cliqueMode == CLIQUE_SINGLE_NODE) + { + // Release caches + if (m_ipcHandleSendCache) delete m_ipcHandleSendCache; + if (m_ipcHandleSendCache) delete m_ipcHandleRecvCache; + + // Close shared memory + m_shmHandles.Close(); + m_sharedCpuMemory.Close(); + m_sharedIpcHandle.Close(); + + if (m_fineGrainBarrierMem) + { + if (m_rank == 0) + hipFree(m_fineGrainBarrierMem); + else + hipIpcCloseMemHandle(m_fineGrainBarrierMem); + } + } + else if (m_cliqueMode == CLIQUE_SINGLE_PROCESS) + { + if (m_rank == 0 && m_staticGpuBarrierMem) + hipFree(m_staticGpuBarrierMem); + } + m_init = false; +} + +ncclResult_t CliqueManager::Init(ncclUniqueId const* commId, int suffix) +{ + ncclResult_t res; + + if (m_init) return ncclSuccess; + m_init = true; + + if (m_cliqueMode == CLIQUE_DISABLED) return ncclSuccess; + + // Check parameters + if (m_rank < 0 || m_rank >= m_numRanks) + { + WARN("Invalid rank specified. Expected 0 <= %d < %d for CliqueManager", m_rank, m_numRanks); + return ncclInvalidUsage; + } + if (commId == NULL) + { + WARN("CommId should not be empty"); + return ncclInvalidUsage; + } + + // For now, opt-into clique based kernels via RCCL_ENABLE_CLIQUE env var + if (!rcclParamEnableClique()) + { + INFO(NCCL_INIT, "Disabling clique-based kernels (did not find env var RCCL_ENABLE_CLIQUE)"); + m_cliqueMode = CLIQUE_DISABLED; + return ncclSuccess; + } + + // Allocate pinned CPU memory for holding clique pointers, which kernels will have access to + if (hipHostMalloc(&m_pinnedCliquePtrs, sizeof(cliqueDevicePtrs_t) * NCCL_MAX_OPS) != hipSuccess) + { + WARN("Unable to allocated pinned host memory for clique pointers. Disabling clique-based kernels"); + m_cliqueMode = CLIQUE_DISABLED; + m_init = true; + return ncclSuccess; + } + + unsigned long hash = djb2Hash(commId->internal); + std::string shmSuffix = std::to_string(hash) + "_" + std::to_string(suffix); + + // Allocate sense barrier variable on local GPU + NCCLCHECKGOTO(ncclCudaCalloc(&m_gpuBarrierLocalSense, NCCL_MAX_OPS * sizeof(int)), res, dropback); + + if (m_cliqueMode == CLIQUE_SINGLE_NODE) + { + // Initialize shared memory file for IPC handles (based on commId hash) + m_shmHandles = NcclIpcHandleShm(m_rank, m_numRanks, hash, NUM_HANDLES_PER_RANK, NCCL_MAX_OPS, shmSuffix); + NCCLCHECKGOTO(m_shmHandles.Open(), res, dropback); + + // Initialize IPC caches + m_ipcHandleSendCache = new NcclIpcHandleSendCache(m_numRanks * NUM_HANDLES_PER_RANK * NCCL_MAX_OPS); + m_ipcHandleRecvCache = new NcclIpcHandleRecvCache(m_numRanks * NUM_HANDLES_PER_RANK * NCCL_MAX_OPS, + 100, + hipIpcMemHandleHash, + hipIpcMemHandleEqual); + + // Initialize shared object for GPU barrier IPC handle + m_sharedIpcHandle = ShmObject(std::max(4096LU, sizeof(hipIpcMemHandle_t)), + CliqueShmNames["Barriers"] + shmSuffix, + m_rank, + m_numRanks, + hash); + NCCLCHECKGOTO(m_sharedIpcHandle.Open(), res, dropback); + + if (m_rank == 0) + { + hipIpcMemHandle_t handle; + // Allocate fine-grained device memory on rank 0 and get IPC handle for it + // Re-usable barrier consists of (globalCount / globalSense) pair of integers + NCCLCHECKGOTO(ncclCudaCalloc(&m_fineGrainBarrierMem, NCCL_MAX_OPS * 2 * sizeof(int), true), res, dropback); + if (hipIpcGetMemHandle(&handle, m_fineGrainBarrierMem) != hipSuccess) + { + WARN("Unable to get IPC handle for barrier memory"); + goto dropback; + } + // Write IPC handle to shared memory for other ranks to receive + *m_sharedIpcHandle.Get() = handle; + + // Set up global count/sense for first rank + m_gpuBarrierGlobalCount = &m_fineGrainBarrierMem[0]; + m_gpuBarrierGlobalSense = &m_fineGrainBarrierMem[NCCL_MAX_OPS]; + } + + // Initialize shared CPU memory to be used for barrier variables + m_sharedCpuMemory = ShmObject(2 * sizeof(int32_t), + CliqueShmNames["SharedCounters"] + shmSuffix, + m_rank, + m_numRanks, + hash); + NCCLCHECKGOTO(m_sharedCpuMemory.Open(), res, dropback); + + // Split up the shared CPU memory for barrier counters / global sense + m_cpuBarrierGlobalCount = &m_sharedCpuMemory.Get()[0]; + m_cpuBarrierGlobalSense = &m_sharedCpuMemory.Get()[1]; + + // Initialize CPU barriers + if (m_rank == 0) + { + *m_cpuBarrierGlobalCount = 0; + *m_cpuBarrierGlobalSense = 0; + } + m_cpuBarrierLocalSense = 0; + } + else if (m_cliqueMode == CLIQUE_SINGLE_PROCESS) + { + // First rank prepares fine-grained memory shared across ranks used for the two barrier variables + if (m_rank == 0) + { + NCCLCHECKGOTO(ncclCudaCalloc(&m_staticGpuBarrierMem, NCCL_MAX_OPS * 2 * sizeof(int), true), res, dropback); + + // Prepare all barriers + for (int opIndex = 0; opIndex < NCCL_MAX_OPS; opIndex++) + { + m_staticCliquePtrs[opIndex].barrier.globalCount = &m_staticGpuBarrierMem[opIndex]; + m_staticCliquePtrs[opIndex].barrier.globalSense = &m_staticGpuBarrierMem[opIndex + NCCL_MAX_OPS];; + } + } + } + + + m_init = true; + INFO(NCCL_INIT, "Clique-based kernels enabled (mode %d)", m_cliqueMode); + return ncclSuccess; + +dropback: + // NOTE: This currently assumes that all ranks will fail the same way + // Additional support is required to handle cases when some processes succeed while others fail + WARN("Unable to initialize shared memory. Disabling clique-based kernels"); + CleanUp(); + m_cliqueMode = CLIQUE_DISABLED; + return ncclSuccess; +} + +bool CliqueManager::IsSupported(ncclFunc_t const coll, + size_t const count, + ncclDataType_t const datatype, + ncclRedOp_t const op) const +{ + if (m_cliqueMode == CLIQUE_DISABLED) return false; + + // Filter based on total input size for each collective type + size_t totalBytes = count * ncclTypeSize(datatype); + if (coll == ncclCollAllReduce && (totalBytes <= rcclParamAllReduceCliqueByteLimit())) return true; + + return false; +} + +ncclResult_t CliqueManager::DeclarePointers(uint64_t opCount, void const* inputPtr, void* outputPtr) +{ + // Do nothing if disabled + if (m_cliqueMode == CLIQUE_DISABLED) return ncclSuccess; + + if (!m_init) + { + WARN("CliqueManager must be initialized before use"); + return ncclInvalidUsage; + } + + int const opIndex = opCount % NCCL_MAX_OPS; + + // Add opIndex to queue of in-progress collectives + m_inProgress.push(opIndex); + + if (m_cliqueMode == CLIQUE_SINGLE_NODE) + { + // Get fine-grained device memory if not already done + if (m_fineGrainBarrierMem == NULL) + { + hipIpcMemHandle_t handle = *m_sharedIpcHandle.Get(); + CUDACHECK(hipIpcOpenMemHandle((void**)&m_fineGrainBarrierMem, handle, hipIpcMemLazyEnablePeerAccess)); + + // Prepare global count/sense barrier variables used the ipc-shared gpu device memory + m_gpuBarrierGlobalCount = &m_fineGrainBarrierMem[0]; + m_gpuBarrierGlobalSense = &m_fineGrainBarrierMem[NCCL_MAX_OPS]; + } + + std::vector> handles(NUM_HANDLES_PER_RANK); + + // Get IPC handles for input/output pointers from cache + NCCLCHECK(CheckCacheForPtr(const_cast(inputPtr), m_ipcHandleSendCache, m_rank, &handles[0])); + NCCLCHECK(CheckCacheForPtr(outputPtr , m_ipcHandleSendCache, m_rank, &handles[1])); + + // Prepare barrier pointers (done after the IpcOpenMemory) + m_pinnedCliquePtrs[opIndex].barrier.globalCount = &m_gpuBarrierGlobalCount[opIndex]; + m_pinnedCliquePtrs[opIndex].barrier.globalSense = &m_gpuBarrierGlobalSense[opIndex]; + m_pinnedCliquePtrs[opIndex].barrier.localSense = &m_gpuBarrierLocalSense[opIndex]; + + // Write IPC handles to shared memory for given rank / opCount + NCCLCHECK(m_shmHandles.WriteHandles(opIndex, handles)); + } + else if (m_cliqueMode == CLIQUE_SINGLE_PROCESS) + { + // Store this rank's input/output pointers into static member + m_staticCliquePtrs[opIndex].inputs[m_rank] = inputPtr; + m_staticCliquePtrs[opIndex].outputs[m_rank] = outputPtr; + } + + return ncclSuccess; +} + +ncclResult_t CliqueManager::GetNumChannelsToUse(ncclFunc_t const coll, + size_t const count, + ncclDataType_t const datatype, + ncclRedOp_t const op, + int const totalNumChannels, + uint8_t* numChannelstoUse) +{ + size_t const totalBytes = count * ncclTypeSize(datatype); + *numChannelstoUse = 1; + + if (coll == ncclCollAllReduce) { + *numChannelstoUse = std::min((int)rcclParamAllReduceNumChannels(), totalNumChannels); + } + + return ncclSuccess; +} + + + +ncclResult_t CliqueManager::SetCliqueCollectiveArgs(CollectiveArgs* args) +{ + // Do nothing if disabled + if (m_cliqueMode == CLIQUE_DISABLED) return ncclSuccess; + if (!m_init) + { + WARN("CliqueManager must be initialized before use"); + return ncclInvalidUsage; + } + + // Prepare clique argments (NOTE: clique pointers are not ready yet) + int opIndex = args->opCount % NCCL_MAX_OPS; + args->clique.ptrs = &m_pinnedCliquePtrs[opIndex]; + + + // Determine number of channels to use for this collective + args->clique.nChannels = rcclParamAllReduceNumChannels(); + + return ncclSuccess; +} + +ncclResult_t CliqueManager::WaitForPointers() +{ + // Do nothing if disabled + if (m_cliqueMode == CLIQUE_DISABLED) return ncclSuccess; + + if (!m_init) + { + WARN("CliqueManager must be initialized before use"); + return ncclInvalidUsage; + } + + // Do nothing if there are no outstanding clique-kernels + if (m_inProgress.empty()) return ncclSuccess; + + // Copy clique device pointers to pinned device memory + if (m_cliqueMode == CLIQUE_SINGLE_NODE) + { + // Wait for all ranks to arrive + WaitForBarrier(); + + int numHandles = m_numRanks * NUM_HANDLES_PER_RANK; + std::vector> handles(numHandles); + + while (!m_inProgress.empty()) + { + int const opIndex = m_inProgress.front(); + m_inProgress.pop(); + + // Collect the ready handles from shared memory and convert them to device pointers + NCCLCHECK(m_shmHandles.ReadHandles(opIndex, handles)); + for (int i = 0; i < m_numRanks; i++) + { + void *input; + NCCLCHECK(CheckCacheForHandle(handles[i * NUM_HANDLES_PER_RANK], + m_ipcHandleRecvCache, &input)); + m_pinnedCliquePtrs[opIndex].inputs[i] = const_cast(input); + + NCCLCHECK(CheckCacheForHandle(handles[(i * NUM_HANDLES_PER_RANK) + 1], + m_ipcHandleRecvCache, &m_pinnedCliquePtrs[opIndex].outputs[i])); + } + } + } + else if (m_cliqueMode == CLIQUE_SINGLE_PROCESS) + { + while (!m_inProgress.empty()) + { + int const opIndex = m_inProgress.front(); + m_inProgress.pop(); + + // Copy from static memory to pinned host memory and set local sense + memcpy(&m_pinnedCliquePtrs[opIndex], &m_staticCliquePtrs[opIndex], sizeof(cliqueDevicePtrs_t)); + m_pinnedCliquePtrs[opIndex].barrier.localSense = &m_gpuBarrierLocalSense[opIndex]; + } + } + return ncclSuccess; +} + +std::string HandleToString(hipIpcMemHandle_t handle) +{ + char mapping[17] = "0123456789ABCDEF"; + std::string result; + for (int i = 0; i < 4; i++) + { + unsigned char val = (unsigned char)handle.reserved[i]; + result += mapping[val / 16]; + result += mapping[val % 16]; + } + return result; +} + + +ncclResult_t CliqueManager::CheckCacheForPtr(void* devPtr, + NcclIpcHandleSendCache* cache, + int rank, + std::pair* handlePair) +{ + // Get the base address for this device allocation + hsa_status_t status; + hsa_amd_pointer_info_t info; + info.size = sizeof(hsa_amd_pointer_info_t); + status = hsa_amd_pointer_info(devPtr, &info, NULL, NULL, NULL); + if (status != HSA_STATUS_SUCCESS) { + WARN("Uanble to get pointer information for %p", devPtr); + return ncclInvalidArgument; + } + + // Compute the offset between the device addres and the base address + uint64_t baseAddr = (uint64_t)info.agentBaseAddress; + uint64_t realAddr = (uint64_t)devPtr; + handlePair->second = realAddr - baseAddr; + + // IPC handles are only supported for base address pointers + NcclIpcHandleSendCache::iterator it = cache->find(baseAddr); + + if (it == cache->end()) + { + CUDACHECK(hipIpcGetMemHandle(&handlePair->first, (void*)baseAddr)); + cache->insert(baseAddr, handlePair->first); + } + else + { + handlePair->first = (it->second).first; + } + return ncclSuccess; +} + +ncclResult_t CliqueManager::CheckCacheForHandle(std::pair const& handlePair, + NcclIpcHandleRecvCache* cache, + void** ptr) +{ + NcclIpcHandleRecvCache::iterator it = cache->find(handlePair.first); + + // Get base address pointer from cache if it exists + void* baseAddr; + if (it == cache->end()) + { + CUDACHECK(hipIpcOpenMemHandle(&baseAddr, handlePair.first, hipIpcMemLazyEnablePeerAccess)); + cache->insert(handlePair.first, baseAddr); + } + else + { + baseAddr = (it->second).first; + } + + // Modify base address pointer with offset + uint64_t realAddr = (uint64_t)baseAddr + handlePair.second; + *ptr = (void*)realAddr; + return ncclSuccess; +} + +void CliqueManager::WaitForBarrier() +{ + // Sense inversion barrier + m_cpuBarrierLocalSense = 1 - m_cpuBarrierLocalSense; + + if (__sync_add_and_fetch(m_cpuBarrierGlobalCount, 1) == m_numRanks) + { + // Reset the barrier + STORE(m_cpuBarrierGlobalCount, 0); + STORE(m_cpuBarrierGlobalSense, m_cpuBarrierLocalSense); + } else { + while (LOAD(m_cpuBarrierGlobalSense) != m_cpuBarrierLocalSense); + } +} + +ncclResult_t CliqueManager::BootstrapRootInit(int pid, unsigned long hash) +{ + for (auto it = CliqueShmNames.begin(); it != CliqueShmNames.end(); it++) + { + int msgid, fd; + std::string msgQueueName = "/tmp/" + it->second + std::to_string(hash) + "_" + std::to_string(pid); + SYSCHECKVAL(open(msgQueueName.c_str(), O_CREAT | O_RDWR, 0606), "open", fd); + NCCLCHECK(MsgQueueGetId(msgQueueName, hash, true, msgid)); + SYSCHECK(close(fd), "close"); + } + + std::string shmDir = "/dev/shm/"; + + for (auto it = CliqueShmNames.begin(); it != CliqueShmNames.end(); it++) + { + struct stat fileStatus; + std::string shmFileName = it->second + std::to_string(hash) + "_" + std::to_string(pid); + std::string shmFullPath = shmDir + shmFileName; + + // Check if shm file already exists; if so, unlink it + if (stat(shmFullPath.c_str(), &fileStatus) == 0) + { + NCCLCHECK(shmUnlink(shmFileName.c_str())); + } + } + return ncclSuccess; +} diff --git a/projects/rccl/src/clique/CliqueManager.h b/projects/rccl/src/clique/CliqueManager.h new file mode 100644 index 0000000000..bf8f028e58 --- /dev/null +++ b/projects/rccl/src/clique/CliqueManager.h @@ -0,0 +1,128 @@ +/* +Copyright (c) 2020 Advanced Micro Devices, Inc. All rights reserved. + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. +*/ + +#ifndef RCCL_CLIQUE_MANAGER_HPP_ +#define RCCL_CLIQUE_MANAGER_HPP_ + +#include +#include +#include + +#include "nccl.h" +#include "devcomm.h" +#include "CliqueCommon.h" +#include "HandleCache.h" +#include "HandleShm.h" + +#define NUM_HANDLES_PER_RANK 2 + +class CliqueManager +{ +public: + typedef enum + { + CLIQUE_DISABLED = 0, + CLIQUE_SINGLE_PROCESS = 1, + CLIQUE_SINGLE_NODE = 2 + } cliqueMode_t; + + CliqueManager(int const rank, int const numRanks, cliqueMode_t const cliqueMode); + + ~CliqueManager(); + + void CleanUp(); + + ncclResult_t Init(ncclUniqueId const* commId, int suffix); + + // Returns true if the collective is supported via a clique-based kernel + bool IsSupported(ncclFunc_t const coll, + size_t const count, + ncclDataType_t const datatype, + ncclRedOp_t const op) const; + + // Provide the pointers to be exchanged across the clique for the given rank / opCount + ncclResult_t DeclarePointers(uint64_t opCount, void const* inputPtr, void* outputPtr); + + // Determine the number of channels / CUs to use for this call + ncclResult_t GetNumChannelsToUse(ncclFunc_t const coll, + size_t const count, + ncclDataType_t const datatype, + ncclRedOp_t const op, + int const totalNumChannels, + uint8_t* numChannelstoUse); + + // Set pointers for where clique-related arguments will be found + // This sets pointers to device-accessible memory where the arguments will eventually reside + ncclResult_t SetCliqueCollectiveArgs(CollectiveArgs* args); + + // Blocking call that only returns after all out-standing clique pointers are ready + ncclResult_t WaitForPointers(); + + // Prepares shared memory files upon initialization + static ncclResult_t BootstrapRootInit(int pid, unsigned long hash); + +protected: + ncclResult_t CheckCacheForPtr(void* devPtr, + NcclIpcHandleSendCache* cache, + int rank, + std::pair* handlePair); + + ncclResult_t CheckCacheForHandle(std::pair const& handlePair, + NcclIpcHandleRecvCache* cache, + void** ptr); + + // Race-condition helper functions + void WaitForBarrier(); + + int m_rank; // Associated rank + int m_numRanks; // Total number of ranks + cliqueMode_t m_cliqueMode; // Clique mode (off/single process/single node) + bool m_init; // Whether CliqueManager has been initialized + cliqueDevicePtrs_t* m_pinnedCliquePtrs; // Pinned-host-memory (device accessible) containing device pointers + int* m_gpuBarrierGlobalCount; // Part of GPU barrier (count variable shared across ranks) + int* m_gpuBarrierGlobalSense; // Part of GPU barrier (reset variable shared across ranks) + int* m_gpuBarrierLocalSense; // Part of GPU barrier (reset variable local to this rank) + std::queue m_inProgress; // Queue of clique-based collectives waiting for pointers + + // IPC-related (CLIQUE_SINGLE_NODE) + NcclIpcHandleShm m_shmHandles; // Used to exchange IPC handles between ranks + NcclIpcHandleSendCache* m_ipcHandleSendCache; // Caches pointers to IPC handles (to send to other processes) + NcclIpcHandleRecvCache* m_ipcHandleRecvCache; // Caches IPC handles to pointers (received from other processes) + ShmObject m_sharedCpuMemory; // Used to pass shared memory used for CPU barrier + ShmObject m_sharedIpcHandle; // Used to pass fine-grained device memory buffer IPC handle + int* m_fineGrainBarrierMem; // Fine-grained GPU memory barrier (allocated only on 1st rank, shared on others) + int* m_cpuBarrierGlobalCount; // Part of CPU barrier (count variable shared across ranks) + int* m_cpuBarrierGlobalSense; // Part of CPU barrier (reset variable shared across ranks) + int m_cpuBarrierLocalSense; // Part of CPU barrier (reset variable local to this rank) + + // Single-process (CLIQUE_SINGLE_PROCESS) + static cliqueDevicePtrs_t m_staticCliquePtrs[NCCL_MAX_OPS]; // Use shared static memory to exchange pointer info + static int* m_staticGpuBarrierMem; // Static storage backing for fine-grained gpu barrier +}; + +// For use in bootstrapping code +struct bootstrapRootStruct { + void* listenComm; + unsigned long hash; +}; + +#endif diff --git a/projects/rccl/src/clique/CliqueShmNames.h b/projects/rccl/src/clique/CliqueShmNames.h new file mode 100644 index 0000000000..577af8be8d --- /dev/null +++ b/projects/rccl/src/clique/CliqueShmNames.h @@ -0,0 +1,37 @@ +/* +Copyright (c) 2020 Advanced Micro Devices, Inc. All rights reserved. + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. +*/ + +#ifndef NCCL_CLIQUE_SHM_NAMES_H_ +#define NCCL_CLIQUE_SHM_NAMES_H_ + +#include +#include + +static std::map CliqueShmNames = +{ + {"SharedCounters", "RcclCounters" }, + {"Mutexes" , "RcclMutexes" }, + {"IpcHandles" , "RcclIpcHandles"}, + {"Barriers" , "RcclBarriers" } +}; + +#endif diff --git a/projects/rccl/src/clique/HandleCache.cc b/projects/rccl/src/clique/HandleCache.cc new file mode 100644 index 0000000000..8fe4b6ab4b --- /dev/null +++ b/projects/rccl/src/clique/HandleCache.cc @@ -0,0 +1,31 @@ +/* +Copyright (c) 2020 Advanced Micro Devices, Inc. All rights reserved. + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. +*/ + +#include "HandleCache.h" + +#include "Hash.h" + +// djb2 hash function for hashing char array in hipIpcMemHandle_t +unsigned long hipIpcMemHandleHash(const hipIpcMemHandle_t& handle) +{ + return djb2Hash(handle.reserved); +} diff --git a/projects/rccl/src/clique/HandleCache.h b/projects/rccl/src/clique/HandleCache.h new file mode 100644 index 0000000000..dc479e00e8 --- /dev/null +++ b/projects/rccl/src/clique/HandleCache.h @@ -0,0 +1,142 @@ +/* +Copyright (c) 2020 Advanced Micro Devices, Inc. All rights reserved. + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. +*/ + +#ifndef NCCL_HANDLE_CACHE_H_ +#define NCCL_HANDLE_CACHE_H_ + +#include +#include +#include + +#include "core.h" + +//#include "llvm/ADT/DenseMap.h" + +template < + class Key, + class Value, + class Hash, + class KeyEqual, + class Allocator +> +class NcclIpcHandleCache +{ +public: + typedef std::pair::iterator> NcclIpcHandleCacheValueType; + typedef std::unordered_map LRUCache; + using iterator = typename LRUCache::iterator; + NcclIpcHandleCache(size_t size, + size_t bucket_count = 100, + const Hash& hash = Hash(), + const KeyEqual& eql = KeyEqual(), + const Allocator& alloc = Allocator() ) : m_cache(bucket_count, hash, eql, alloc) + { + m_capacity = size; + } + + ~NcclIpcHandleCache() + { + m_lruHistory.clear(); + m_cache.clear(); + } + + iterator begin() + { + return m_cache.begin(); + } + + iterator end() + { + return m_cache.end(); + } + + iterator find(const Key& key) + { + iterator it = m_cache.find(key); + if (it != m_cache.end()) + { + updateHistory(it); + } + + return it; + } + + std::pair insert(const Key& key, const Value& value) + { + if (m_cache.size() == m_capacity) + { + // remove entry + pop(); + } + + typename LRUCache::iterator it = m_cache.find(key); + bool inserted; + if (it == m_cache.end()) + { + typename std::list::iterator it = m_lruHistory.insert(m_lruHistory.end(), key); + m_cache.insert(std::make_pair(key, std::make_pair(value, it))); + inserted = true; + } + else + { + inserted = false; + } + + return std::pair(it, inserted); + } + +private: + void pop() + { + typename LRUCache::iterator it = m_cache.find(m_lruHistory.front()); + m_cache.erase(it); + m_lruHistory.pop_front(); + } + + void updateHistory(const iterator& it) + { + if (m_lruHistory.size() > 0) + { + m_lruHistory.splice(m_lruHistory.end(), m_lruHistory, (it->second).second); + } + } + size_t m_capacity; + std::list m_lruHistory; + LRUCache m_cache; +}; + +// djb2 hash function for hashing char array in hipIpcMemHandle_t +unsigned long hipIpcMemHandleHash(const hipIpcMemHandle_t& handle); + +// equality function required for unordered_map +auto hipIpcMemHandleEqual = [](const hipIpcMemHandle_t& l, const hipIpcMemHandle_t& r) +{ + return memcmp(l.reserved, r.reserved, sizeof(l.reserved)) == 0; +}; + +//typedef llvm::DenseMap SendCache; +//typedef llvm::DenseMap RecvCache; + +typedef NcclIpcHandleCache, std::equal_to, std::allocator< std::pair::iterator>>>> NcclIpcHandleSendCache; +typedef NcclIpcHandleCache::iterator>>>> NcclIpcHandleRecvCache; + +#endif diff --git a/projects/rccl/src/clique/HandleShm.cc b/projects/rccl/src/clique/HandleShm.cc new file mode 100644 index 0000000000..937390cf20 --- /dev/null +++ b/projects/rccl/src/clique/HandleShm.cc @@ -0,0 +1,67 @@ +/* +Copyright (c) 2020 Advanced Micro Devices, Inc. All rights reserved. + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. +*/ + +#include + +#include "HandleShm.h" +#include "CliqueShmNames.h" +#include "core.h" +#include "Hash.h" +#include "shm.h" + +NcclIpcHandleShm::NcclIpcHandleShm(int rank, int numRanks, int projid, int numHandlesPerRank, int capacity, std::string suffix) : + ShmObject>(numRanks * numHandlesPerRank * capacity * sizeof(std::pair), + CliqueShmNames["IpcHandles"] + suffix, + rank, + numRanks, + projid), + m_numHandlesPerRank(numHandlesPerRank), + m_numHandlesPerOpCount(numRanks * numHandlesPerRank) +{ +} + +NcclIpcHandleShm::NcclIpcHandleShm() +{ +} + +NcclIpcHandleShm::~NcclIpcHandleShm() +{ +} + +ncclResult_t NcclIpcHandleShm::Open() +{ + return ShmObject::Open(); +} + +ncclResult_t NcclIpcHandleShm::WriteHandles(uint64_t opCount, std::vector> const& sendHandles) +{ + size_t idx = (opCount * m_numHandlesPerOpCount) + (m_rank * m_numHandlesPerRank); + memcpy(m_shmPtr + idx, sendHandles.data(), sizeof(std::pair) * m_numHandlesPerRank); + return ncclSuccess; +} + +ncclResult_t NcclIpcHandleShm::ReadHandles(uint64_t opCount, std::vector>& recvHandles) +{ + size_t idx = opCount * m_numHandlesPerOpCount; + memcpy(recvHandles.data(), m_shmPtr + idx, m_numHandlesPerOpCount * sizeof(std::pair)); + return ncclSuccess; +} diff --git a/projects/rccl/src/clique/HandleShm.h b/projects/rccl/src/clique/HandleShm.h new file mode 100644 index 0000000000..c681de0eb4 --- /dev/null +++ b/projects/rccl/src/clique/HandleShm.h @@ -0,0 +1,53 @@ +/* +Copyright (c) 2020 Advanced Micro Devices, Inc. All rights reserved. + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. +*/ + +#ifndef NCCL_IPC_HANDLE_SHM_H_ +#define NCCL_IPC_HANDLE_SHM_H_ + +#include +#include +#include + +#include "nccl.h" +#include "ShmObject.h" + +class NcclIpcHandleShm : public ShmObject> +{ +public: + NcclIpcHandleShm(int rank, int numRanks, int projid, int numHandlesPerRank, int capacity, std::string suffix); + + NcclIpcHandleShm(); + + ~NcclIpcHandleShm(); + + ncclResult_t Open(); + + ncclResult_t WriteHandles(uint64_t opCount, std::vector> const& sendHandles); + + ncclResult_t ReadHandles(uint64_t opCount, std::vector>& recvHandles); + +private: + int m_numHandlesPerRank; + int m_numHandlesPerOpCount; +}; + +#endif diff --git a/projects/rccl/src/clique/Hash.cc b/projects/rccl/src/clique/Hash.cc new file mode 100644 index 0000000000..249c66b329 --- /dev/null +++ b/projects/rccl/src/clique/Hash.cc @@ -0,0 +1,34 @@ +/* +Copyright (c) 2020 Advanced Micro Devices, Inc. All rights reserved. + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. +*/ + +#include "Hash.h" + +unsigned long djb2Hash(const char* data) +{ + unsigned long hash = 5381; + int c; + + while ((c = *(data)++)) + hash = ((hash << 5) + hash) + c; /* hash * 33 + c */ + + return hash; +} diff --git a/projects/rccl/src/clique/Hash.h b/projects/rccl/src/clique/Hash.h new file mode 100644 index 0000000000..e6cbbaa569 --- /dev/null +++ b/projects/rccl/src/clique/Hash.h @@ -0,0 +1,28 @@ +/* +Copyright (c) 2020 Advanced Micro Devices, Inc. All rights reserved. + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. +*/ + +#ifndef NCCL_HASH_H_ +#define NCCL_HASH_H_ + +unsigned long djb2Hash(const char* data); + +#endif diff --git a/projects/rccl/src/clique/MsgQueue.cc b/projects/rccl/src/clique/MsgQueue.cc new file mode 100644 index 0000000000..716c449e8f --- /dev/null +++ b/projects/rccl/src/clique/MsgQueue.cc @@ -0,0 +1,72 @@ +/* +Copyright (c) 2020 Advanced Micro Devices, Inc. All rights reserved. + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. +*/ + +#include "MsgQueue.h" + +#include +#include + +#define MSG_QUEUE_PERM 0666 + +ncclResult_t MsgQueueGetId(std::string name, int projid, bool exclusive, int& msgid) +{ + key_t key; + SYSCHECKVAL(ftok(name.c_str(), projid), "ftok", key); + int flag = (exclusive == true ? IPC_CREAT | IPC_EXCL : IPC_CREAT); + + msgid = msgget(key, MSG_QUEUE_PERM | flag); + // Check if we're trying to create message queue and it already exists; if so, delete existing queue + if (msgid == -1 && exclusive == true && errno == EEXIST) + { + NCCLCHECK(MsgQueueClose(name, projid)); + SYSCHECKVAL(msgget(key, MSG_QUEUE_PERM | flag), "msgget", msgid); + } + else if (msgid == -1) + { + WARN("Call to MsgQueueGetId failed : %s", strerror(errno)); + return ncclSystemError; + } + return ncclSuccess; +} + +ncclResult_t MsgQueueSend(int msgid, const void* msgp, size_t msgsz, int msgflg) +{ + SYSCHECK(msgsnd(msgid, msgp, msgsz, msgflg), "msgsnd"); + return ncclSuccess; +} + +ncclResult_t MsgQueueRecv(int msgid, void* msgp, size_t msgsz, long msgtyp, bool wait) +{ + int msgflg = (wait == false ? IPC_NOWAIT : 0); + SYSCHECK(msgrcv(msgid, msgp, msgsz, msgtyp, msgflg), "msgrcv"); + return ncclSuccess; +} + +ncclResult_t MsgQueueClose(std::string name, int projid) +{ + key_t key; + int msgid; + key = ftok(name.c_str(), projid); + SYSCHECKVAL(msgget(key, IPC_CREAT), "msgget", msgid); + SYSCHECK(msgctl(msgid, IPC_RMID, NULL), "msgctl"); + return ncclSuccess; +} diff --git a/projects/rccl/src/clique/MsgQueue.h b/projects/rccl/src/clique/MsgQueue.h new file mode 100644 index 0000000000..346208a6e8 --- /dev/null +++ b/projects/rccl/src/clique/MsgQueue.h @@ -0,0 +1,42 @@ +/* +Copyright (c) 2020 Advanced Micro Devices, Inc. All rights reserved. + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. +*/ + +#ifndef RCCL_MSG_QUEUE_HPP_ +#define RCCL_MSG_QUEUE_HPP_ + +#include + +#include "nccl.h" +#include "core.h" + +struct MsgBuffer +{ + long msg_type; + char msg_text[1]; +}; + +ncclResult_t MsgQueueGetId(std::string name, int projid, bool exclusive, int& msgid); +ncclResult_t MsgQueueSend(int msgid, const void* msgp, size_t msgsz, int msgflg); +ncclResult_t MsgQueueRecv(int msgid, void* msgp, size_t msgsz, long msgtyp, bool wait); +ncclResult_t MsgQueueClose(std::string name, int projid); + +#endif diff --git a/projects/rccl/src/clique/SharedMemHelper.h b/projects/rccl/src/clique/SharedMemHelper.h new file mode 100644 index 0000000000..6fc269e7fc --- /dev/null +++ b/projects/rccl/src/clique/SharedMemHelper.h @@ -0,0 +1,43 @@ +/* +Copyright (c) 2020 Advanced Micro Devices, Inc. All rights reserved. + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. +*/ + +#ifndef SHAREDMEMHELPER_H +#define SHAREDMEMHELPER_H + + +class SharedMemHelper +{ +public: + SharedMemHelper(int rank, int numRanks, int numEntries); + + ncclStatus_t Init(std::string const& baseFilename); + + ncclStatus_t + + +protected: + bool m_initialized; + int m_rank; + int m_numRanks; +}; + +#endif diff --git a/projects/rccl/src/clique/ShmObject.cc b/projects/rccl/src/clique/ShmObject.cc new file mode 100644 index 0000000000..353779e41d --- /dev/null +++ b/projects/rccl/src/clique/ShmObject.cc @@ -0,0 +1,45 @@ +/* +Copyright (c) 2020 Advanced Micro Devices, Inc. All rights reserved. + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. +*/ + +#include "ShmObject.h" +#include + +// Template specializations for sem_t objects which require additional initialization +template<> +ncclResult_t ShmObject::Close() +{ + size_t numMutexes = m_shmSize / sizeof(sem_t); + + for (size_t i = 0; i < numMutexes; i++) + { + sem_destroy(static_cast(&m_shmPtr[i])); + } + + int retVal = shm_unlink(m_shmName.c_str()); + if (retVal == -1 && errno != ENOENT) + { + WARN("Call to shm_unlink in ShmObject failed : %s", strerror(errno)); + return ncclSystemError; + } + + return ncclSuccess; +} diff --git a/projects/rccl/src/clique/ShmObject.h b/projects/rccl/src/clique/ShmObject.h new file mode 100644 index 0000000000..f458014f5e --- /dev/null +++ b/projects/rccl/src/clique/ShmObject.h @@ -0,0 +1,210 @@ +/* +Copyright (c) 2020 Advanced Micro Devices, Inc. All rights reserved. + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. +*/ + +#ifndef NCCL_SHM_OBJECT_H_ +#define NCCL_SHM_OBJECT_H_ + +#include +#include +#include +#include +#include +#include + +#include "MsgQueue.h" +#include "nccl.h" +#include "core.h" +#include "shm.h" + +// ShmObject abstracts away the nitty-gritty when multiple processes need to handle opening a shared +// memory object at the same time. + +static ncclResult_t shmSetupExclusive(const char* shmname, const int shmsize, int* fd, void** ptr, int create) { + *fd = shm_open(shmname, O_CREAT | O_RDWR | O_EXCL, S_IRUSR | S_IWUSR); + if (*fd == -1) return ncclSystemError; + if (create) SYSCHECK(shm_allocate(*fd, shmsize), "posix_fallocate"); + SYSCHECK(shm_map(*fd, shmsize, ptr), "mmap"); + close(*fd); + *fd = -1; + if (create) memset(*ptr, 0, shmsize); + return ncclSuccess; +} + +template +class ShmObject +{ +public: +ShmObject(size_t size, std::string fileName, int rank, int numRanks, int projid) : + m_shmSize(size), + m_shmName(fileName), + m_rank(rank), + m_numRanks(numRanks), + m_projid(projid), + m_alloc(false), + m_shmPtr(nullptr) {} + + ShmObject() {} + + ~ShmObject() {} + + ncclResult_t Open(); + + ncclResult_t Close() + { + if (m_alloc) + { + if (m_rank == 0) + { + std::string tmpFileName = "/tmp/" + m_shmName; + remove(tmpFileName.c_str()); + } + int retVal = shm_unlink(m_shmName.c_str()); + if (retVal == -1 && errno != ENOENT) + { + WARN("Call to shm_unlink in ShmObject failed : %s", strerror(errno)); + return ncclSystemError; + } + } + return ncclSuccess; + } + + T*& Get() + { + return m_shmPtr; + } +protected: + ncclResult_t BroadcastMessage(int msgid, bool pass) + { + MsgBuffer msg; + msg.msg_text[0] = (pass == 0 ? 'F': 'P'); + for (int rank = 0; rank < m_numRanks; rank++) + { + if (rank == m_rank) continue; + msg.msg_type = rank; + NCCLCHECK(MsgQueueSend(msgid, &msg, sizeof(msg), 0)); + } + return ncclSuccess; + } + + // tag for dispatch + template + struct OpenTag{}; + + ncclResult_t InitIfSemaphore(OpenTag tag); + ncclResult_t InitIfSemaphore(OpenTag tag); + ncclResult_t InitIfSemaphore(OpenTag tag); + ncclResult_t InitIfSemaphore(OpenTag tag); + ncclResult_t InitIfSemaphore(OpenTag> tag); + + size_t m_shmSize; + std::string m_shmName; + int m_rank; + int m_numRanks; + int m_projid; + bool m_alloc; + T* m_shmPtr; +}; + +template +ncclResult_t ShmObject::Open() +{ + if (m_alloc == false) + { + int shmFd; + int protection = PROT_READ | PROT_WRITE; + int visibility = MAP_SHARED; + + int msgid; + std::string tmpFileName = "/tmp/" + m_shmName; + NCCLCHECK(MsgQueueGetId(tmpFileName, m_projid, false, msgid)); + if (m_rank == 0) + { + ncclResult_t resultSetup = shmSetupExclusive(m_shmName.c_str(), m_shmSize, &shmFd, (void**)&m_shmPtr, 1); + ncclResult_t resultSemInit = InitIfSemaphore(OpenTag{}); + if ((resultSetup != ncclSuccess && errno != EEXIST) || (resultSemInit != ncclSuccess)) + { + NCCLCHECK(BroadcastMessage(msgid, false)); + WARN("Call to ShmObject::Open in root rank failed : %s", strerror(errno)); + return ncclSystemError; + } + NCCLCHECK(BroadcastMessage(msgid, true)); + } + else + { + MsgBuffer msg; + NCCLCHECK(MsgQueueRecv(msgid, &msg, sizeof(msg), m_rank, true)); + if (msg.msg_text[0] == 'P') + { + NCCLCHECK(shmSetup(m_shmName.c_str(), m_shmSize, &shmFd, (void**)&m_shmPtr, 0)); + } + else + { + WARN("Call to shm_open from non-root rank in ShmObject failed : %s", strerror(errno)); + return ncclSystemError; + } + } + m_alloc = true; + } + else + { + WARN("Cannot allocate ShmObject twice.\n"); + return ncclInvalidUsage; + } + return ncclSuccess; +} + +template +ncclResult_t ShmObject::InitIfSemaphore(OpenTag tag) +{ + return ncclSuccess; +} + +template +ncclResult_t ShmObject::InitIfSemaphore(OpenTag tag) +{ + return ncclSuccess; +} + +template +ncclResult_t ShmObject::InitIfSemaphore(OpenTag tag) +{ + return ncclSuccess; +} + +template +ncclResult_t ShmObject::InitIfSemaphore(OpenTag> tag) +{ + return ncclSuccess; +} + +template +ncclResult_t ShmObject::InitIfSemaphore(OpenTag tag) +{ + size_t numMutexes = m_shmSize / sizeof(sem_t); + + for (size_t i = 0; i < numMutexes; i++) + { + SYSCHECK(sem_init(static_cast(&m_shmPtr[i]), 1, 1), "sem_init"); + } + return ncclSuccess; +} +#endif diff --git a/projects/rccl/src/collectives/device/all_reduce.h b/projects/rccl/src/collectives/device/all_reduce.h index 3af59a4c14..15d0c6768f 100644 --- a/projects/rccl/src/collectives/device/all_reduce.h +++ b/projects/rccl/src/collectives/device/all_reduce.h @@ -8,6 +8,7 @@ #include "devcomm.h" #include "primitives.h" #include "collectives.h" +#include "clique/AllReduceCliqueKernel.h" // [RCCL] AllReduce Clique-based kernel support template __attribute__((noinline)) @@ -310,6 +311,7 @@ __device__ void ncclAllReduceTreeLLKernel(struct CollectiveArgs* args) { const ssize_t loopSize = nChannels*chunkSize; const ssize_t size = args->coll.count; + if (loopSize > size) { chunkSize = DIVUP(size, nChannels*minChunkSize)*minChunkSize; } @@ -417,76 +419,10 @@ __device__ void ncclAllReduceCollNetLLKernel(struct CollectiveArgs* args) { template __attribute__((noinline)) __device__ void ncclAllReduceRingLL128Kernel(struct CollectiveArgs* args) { - const int tid = threadIdx.x; - const int nthreads = args->coll.nThreads; - const int bid = args->coll.bid; - const int nChannels = args->coll.nChannels; - struct ncclDevComm* comm = args->comm; - struct ncclChannel* channel = comm->channels+blockIdx.x; - struct ncclRing* ring = &channel->ring; - const int stepSize = comm->buffSizes[NCCL_PROTO_LL128] / (sizeof(uint64_t)*NCCL_STEPS); - ssize_t chunkSize = stepSize*NCCL_LL128_DATAELEMS*sizeof(uint64_t) / (NCCL_LL128_LINEELEMS*sizeof(T)); - // We should not need the final /2 but it makes performance much, much smoother. Might be a bug somewhere. - const ssize_t minChunkSize = (NCCL_LL128_SHMEM_ELEMS_PER_THREAD*nthreads*NCCL_LL128_DATAELEMS*sizeof(uint64_t))/(NCCL_LL128_LINEELEMS*sizeof(T))/2; - const int nranks = comm->nRanks; - const ssize_t loopSize = nChannels*nranks*chunkSize; - const ssize_t size = args->coll.count; - ncclLL128Primitives LLprims(tid, nthreads, &ring->prev, &ring->next, stepSize, channel, comm); - - // Compute pointers - const T * __restrict__ thisInput = (const T*)args->sendbuff; - T * __restrict__ thisOutput = (T*)args->recvbuff; - - for (ssize_t gridOffset = 0; gridOffset < size; gridOffset += loopSize) { - chunkSize = min(DIVUP(size-gridOffset, nChannels*nranks*minChunkSize)*minChunkSize, chunkSize); - - /////////////// begin AllReduce steps /////////////// - ssize_t offset; - int nelem; - int chunk; - - // step 0: push data to next GPU - chunk = ring->devUserRanks[nranks-1]; - offset = gridOffset + (chunk*nChannels+bid) * chunkSize; - nelem = min(chunkSize, size-offset); - - LLprims.send(thisInput+offset, nelem); - - // k-2 steps: reduce and copy to next GPU - for (int j=2; jdevUserRanks[nranks-j]; - offset = gridOffset + (chunk*nChannels+bid) * chunkSize; - nelem = min(chunkSize, size-offset); - - LLprims.recvReduceSend(thisInput+offset, nelem); - } - - // step k-1: reduce this buffer and data, which will produce the final - // result that we store in this data and push to the next GPU - chunk = ring->devUserRanks[0]; - offset = gridOffset + (chunk*nChannels+bid) * chunkSize; - nelem = min(chunkSize, size-offset); - - LLprims.recvReduceCopySend(thisInput+offset, thisOutput+offset, nelem); - - // k-2 steps: copy to next GPU - for (int j=1; jdevUserRanks[nranks-j]; - offset = gridOffset + (chunk*nChannels+bid) * chunkSize; - nelem = min(chunkSize, size-offset); - - LLprims.recvCopySend(thisOutput+offset, nelem); - } - - // Make final copy from buffer to dest. - chunk = ring->devUserRanks[1]; - offset = gridOffset + (chunk*nChannels+bid) * chunkSize; - nelem = min(chunkSize, size-offset); - - // Here we need to copy from buffer to this output. - LLprims.recv(thisOutput+offset, nelem); - } + // [RCCL] RingLL128 is re-purposed as clique-based kernel + LAUNCH_CLIQUE_KERNEL(AllReduceCliqueSplitKernel, FUNC, T, args); + // [/RCCL] } template @@ -507,6 +443,7 @@ __device__ void ncclAllReduceTreeLL128Kernel(struct CollectiveArgs* args) { int nthreadsSplit = NCCL_LL128_SPLIT(nthreads); const ssize_t size = args->coll.count; + if (loopSize > size) { chunkSize = DIVUP(size, nChannels*minChunkSize)*minChunkSize; } diff --git a/projects/rccl/src/collectives/device/common.h b/projects/rccl/src/collectives/device/common.h index f13957dd7e..efcdf75276 100644 --- a/projects/rccl/src/collectives/device/common.h +++ b/projects/rccl/src/collectives/device/common.h @@ -89,13 +89,45 @@ static inline __device__ void exitIfAbortBarrier(int abort) { NCCL_FUNCS3B(coll, copy), \ NCCL_FUNCS3B(coll, copy) +// [RCCL] Adding clique-based kernels for AllReduce, in-place of unused RingLL28 kernels +#define NCCL_FUNC5B(coll, op, dtype) \ + NCCL_COLL_NAME(coll##LL, op, dtype), \ + NCCL_COLL_NAME(coll##LL128, op, dtype), \ + NCCL_COLL_NAME(coll, op, dtype) + +#define NCCL_FUNC4B(coll, op, dtype) \ + NCCL_FUNC5(coll##Tree, op, dtype), \ + NCCL_FUNC5B(coll##Ring, op, dtype), \ + NCCL_FUNC5(coll##CollNet, op, dtype) + +#define NCCL_FUNCS3C(coll, op) \ + NCCL_FUNC4B(coll, op, i8), \ + NCCL_FUNC4B(coll, op, u8), \ + NCCL_FUNC4B(coll, op, i32), \ + NCCL_FUNC4B(coll, op, u32), \ + NCCL_FUNC4B(coll, op, i64), \ + NCCL_FUNC4B(coll, op, u64), \ + NCCL_FUNC4B(coll, op, f16), \ + NCCL_FUNC4B(coll, op, f32), \ + NCCL_FUNC4B(coll, op, f64), \ + NCCL_FUNC4B(coll, op, b16) + +#define NCCL_FUNCS2C(coll) \ + NCCL_FUNCS3C(coll, sum ), \ + NCCL_FUNCS3C(coll, prod), \ + NCCL_FUNCS3C(coll, max ), \ + NCCL_FUNCS3C(coll, min ) + +// [/RCCL] + + // Must be consistent with ncclFunc_t #define NCCL_FUNCS() { \ NCCL_FUNCS2B(ncclBroadcast), \ NCCL_FUNCS2A(ncclReduce), \ NCCL_FUNCS2B(ncclAllGather), \ NCCL_FUNCS2A(ncclReduceScatter), \ - NCCL_FUNCS2A(ncclAllReduce), \ + NCCL_FUNCS2C(ncclAllReduce), \ NCCL_COLL_NAME(ncclGather, copy, i8), \ NCCL_COLL_NAME(ncclScatter, copy, i8), \ NCCL_COLL_NAME(ncclAllToAll, copy, i8), \ @@ -114,7 +146,7 @@ static const __device__ constexpr ncclKernelFunc_t ncclFuncs[]{ NCCL_FUNCS2A(ncclReduce), NCCL_FUNCS2B(ncclAllGather), NCCL_FUNCS2A(ncclReduceScatter), - NCCL_FUNCS2A(ncclAllReduce), + NCCL_FUNCS2C(ncclAllReduce), NCCL_COLL_NAME(ncclGather, copy, i8), NCCL_COLL_NAME(ncclScatter, copy, i8), NCCL_COLL_NAME(ncclAllToAll, copy, i8), diff --git a/projects/rccl/src/collectives/device/common_kernel.h b/projects/rccl/src/collectives/device/common_kernel.h index c5092cf52a..efa0b7a1aa 100644 --- a/projects/rccl/src/collectives/device/common_kernel.h +++ b/projects/rccl/src/collectives/device/common_kernel.h @@ -350,9 +350,14 @@ __device__ int ptrAlign128(T* ptr) { return (uint64_t)ptr % alignof(int32_t); } __device__ int ptrAlign128(T* ptr) { return (uint64_t)ptr % alignof(Pack128); } #endif +#if defined(__HIP_PLATFORM_HCC__) || defined(__HCC__) || defined(__HIPCC__) +// Multiply UNROLL by 2 if single source/single destination +#define AUTOUNROLL (UNROLL*((MINSRCS==1 && MINDSTS==1) ? 2 : 1)) +#else // Try to limit consecutive load/stores to 8. // Use UNROLL 8 when we have a single source and a single destination, 4 otherwise #define AUTOUNROLL (UNROLL*(4/(MINDSTS+MINSRCS))) +#endif template __device__ void ReduceOrCopyMulti(const int tid, const int nthreads, diff --git a/projects/rccl/src/enqueue.cc b/projects/rccl/src/enqueue.cc index 7f52e089ce..ff5d982b1e 100644 --- a/projects/rccl/src/enqueue.cc +++ b/projects/rccl/src/enqueue.cc @@ -9,6 +9,8 @@ #include "argcheck.h" #include "coll_net.h" #include "../graph/topo.h" +#include +#include // Only generate inline kernels for LL #define NCCL_FUNC5(coll, op, dtype) \ @@ -116,6 +118,10 @@ ncclResult_t setupLaunch(struct ncclComm* comm, hipLaunchParams* params) { STORE(&channel->collectives[(channel->collStart+channel->collCount-1)%NCCL_MAX_OPS].active, 2); } + { // [RCCL] Wait for any clique-based collectives + NCCLCHECK(comm->cliqueManager->WaitForPointers()); + } // [/RCCL] + // Find the first operation, choose the kernel accordingly and pass it // as the first argument. struct ncclColl* coll = comm->channels[0].collectives+comm->channels[0].collStart; @@ -210,7 +216,8 @@ ncclResult_t ncclBarrierEnqueueWait(ncclComm_t comm) { (comm->launchMode == ncclComm::GROUP && comm->groupCudaStream) ? "/Stream" : ""); } - + hipEvent_t startEvent; + hipEvent_t stopEvent; if (comm->launchMode == ncclComm::PARALLEL) { hipLaunchKernelGGL(((void (*)(struct ncclDevComm*))params->func), params->gridDim, params->blockDim, params->sharedMem, params->stream, **((struct ncclDevComm ***)(params->args))); } else { @@ -257,6 +264,7 @@ static ncclResult_t getAlgoInfo(struct ncclInfo* info) { info->algorithm = -1; info->protocol = -1; int nAlgos = NCCL_NUM_ALGORITHMS; + // Check collNet support int collNetTypeSupport = 0; if (info->comm->collNetSupport) @@ -373,6 +381,7 @@ static ncclResult_t computeColl(struct ncclInfo* info /* input */, struct ncclCo #endif return ncclSuccess; } + // Set nstepsPerLoop and nchunksPerLoop NCCLCHECK(getAlgoInfo(info)); NCCLCHECK(getPatternInfo(info)); @@ -391,6 +400,33 @@ static ncclResult_t computeColl(struct ncclInfo* info /* input */, struct ncclCo coll->funcIndex = FUNC_INDEX(info->coll, info->op, info->datatype, info->algorithm, info->protocol); + { // [RCCL] Check for clique-based kernel support + if (info->comm->cliqueManager->IsSupported(info->coll, + info->count, + info->datatype, + info->op)) + { + // Declare the input / output pointers being used (to exchange via IPC with other ranks) + NCCLCHECK(info->comm->cliqueManager->DeclarePointers(info->comm->opCount, + info->sendbuff, + info->recvbuff)); + + + info->algorithm = NCCL_ALGO_RING; + info->protocol = NCCL_PROTO_CLIQUE; + // Determine the number of channels to use for clique-kernel + NCCLCHECK(info->comm->cliqueManager->GetNumChannelsToUse(info->coll, + info->count, + info->datatype, + info->op, + info->comm->nChannels, + &coll->args.clique.nChannels)); + coll->args.clique.count = info->count; + coll->funcIndex = FUNC_INDEX(info->coll, info->op, info->datatype, info->algorithm, info->protocol); + return ncclSuccess; + } + } // [RCCL] + int stepSize = info->comm->buffSizes[info->protocol]/NCCL_STEPS; int chunkSteps = (info->protocol == NCCL_PROTO_SIMPLE && info->algorithm == NCCL_ALGO_RING) ? info->chunkSteps : 1; int sliceSteps = (info->protocol == NCCL_PROTO_SIMPLE && info->algorithm == NCCL_ALGO_RING) ? info->sliceSteps : 1; @@ -478,6 +514,7 @@ ncclResult_t ncclSaveKernel(struct ncclInfo* info) { info->comm->myParams->blockDim.x = std::max(info->comm->myParams->blockDim.x, info->nThreads); int nChannels = info->coll == ncclCollSendRecv ? 1 : coll.args.coll.nChannels; + int nSubChannels = (info->pattern == ncclPatternCollTreeUp || info->pattern == ncclPatternCollTreeDown) ? 2 : 1; for (int bid=0; bidargs.a2av.extra+info->comm->nRanks*2, info->recvcounts, sizeof(size_t*)*(info->comm->nRanks)); memcpy(c->args.a2av.extra+info->comm->nRanks*3, info->rdispls, sizeof(size_t*)*(info->comm->nRanks)); c->args.a2av.bid = bid % coll.args.coll.nChannels; - } else if (info->coll != ncclCollSendRecv) + } else if (info->coll != ncclCollSendRecv) { c->args.coll.bid = bid % coll.args.coll.nChannels; + } + + // [RCCL] Setup pointers to where all the input/output pointers will be + if (info->protocol == NCCL_PROTO_CLIQUE) { + NCCLCHECK(info->comm->cliqueManager->SetCliqueCollectiveArgs(&c->args)); + } + // [/RCCL] STORE(&c->active, 1); opIndex = (opIndex+1)%NCCL_MAX_OPS; @@ -599,6 +643,7 @@ ncclResult_t ncclEnqueueCheck(struct ncclInfo* info) { } else { NCCLCHECKGOTO(ncclSaveKernel(info), ret, end); } + end: if (savedDev != -1) CUDACHECK(hipSetDevice(savedDev)); ncclAsyncErrCheck(ret); diff --git a/projects/rccl/src/include/bootstrap.h b/projects/rccl/src/include/bootstrap.h index a7d6be965e..aa671b7066 100644 --- a/projects/rccl/src/include/bootstrap.h +++ b/projects/rccl/src/include/bootstrap.h @@ -12,7 +12,7 @@ ncclResult_t bootstrapNetInit(); ncclResult_t bootstrapCreateRoot(ncclUniqueId* commId, bool idFromEnv); ncclResult_t bootstrapGetUniqueId(ncclUniqueId* out); -ncclResult_t bootstrapInit(ncclUniqueId* id, int rank, int nranks, void** commState); +ncclResult_t bootstrapInit(ncclUniqueId* id, int rank, int nranks, void** commState, int* rootPid); // [RCCL] Adding rootPid ncclResult_t bootstrapAllGather(void* commState, void* allData, int size); ncclResult_t bootstrapSend(void* commState, int peer, void* data, int size); ncclResult_t bootstrapRecv(void* commState, int peer, void* data, int size); diff --git a/projects/rccl/src/include/comm.h b/projects/rccl/src/include/comm.h index 6df4f750d3..d527e83801 100644 --- a/projects/rccl/src/include/comm.h +++ b/projects/rccl/src/include/comm.h @@ -10,6 +10,9 @@ #include "transport.h" #include "p2p.h" +// [RCCL] +#include "clique/CliqueManager.h" +// [/RCCL] #if defined(__HIP_PLATFORM_HCC__) || defined(__HCC__) || defined(__HIPCC__) #else @@ -143,8 +146,12 @@ struct ncclComm { //list of async p2p operation queued in a group semantics struct ncclP2Plist p2plist; - // RCCL AllToAll/Scatter/Gather API - bool alltoallDisable; + // [RCCL] + bool alltoallDisable; // RCCL AllToAll/Scatter/Gather API + CliqueManager* cliqueManager; // CliqueManager handles pointer collection / distribution for clique-based kernels + int rootPid; // Process ID of root + // [/RCCL] + }; #endif diff --git a/projects/rccl/src/include/devcomm.h b/projects/rccl/src/include/devcomm.h index f7b870c5e5..d9c3b02ff2 100644 --- a/projects/rccl/src/include/devcomm.h +++ b/projects/rccl/src/include/devcomm.h @@ -12,6 +12,9 @@ #include "rccl_bfloat16.h" #include "align.h" #include +// [RCCL] Support for clique-based kernels +#include "clique/CliqueCommon.h" +// [/RCCL] // Convert volatile access to atomic #if defined(__HIP_PLATFORM_HCC__) || defined(__HCC__) || defined(__HIPCC__) @@ -22,6 +25,7 @@ #define STORE(DST, SRC) *(DST) = (SRC) #endif + #define NCCL_NUM_FUNCTIONS 5 // SendRecv not included for now typedef enum { ncclCollBroadcast, ncclCollReduce, ncclCollAllGather, ncclCollReduceScatter, ncclCollAllReduce, ncclCollGather, ncclCollScatter, ncclCollAllToAll, ncclCollAllToAllv, ncclCollSendRecv} ncclFunc_t; extern const char* ncclFuncStr[NCCL_NUM_FUNCTIONS+4]; @@ -35,6 +39,7 @@ extern const char* ncclAlgoStr[NCCL_NUM_ALGORITHMS]; #define NCCL_NUM_PROTOCOLS 3 // Simple/LL/LL128 #define NCCL_PROTO_LL 0 #define NCCL_PROTO_LL128 1 +#define NCCL_PROTO_CLIQUE 1 // [RCCL] Clique takes up same protocol as unused LL128 #define NCCL_PROTO_SIMPLE 2 extern const char* ncclProtoStr[NCCL_NUM_PROTOCOLS]; @@ -190,8 +195,18 @@ struct CollectiveArgs { size_t count; size_t* extra; } a2av; + // [RCCL] Clique-based arguments + struct { + uint16_t nThreads; + uint8_t bid; + uint8_t nChannels; + size_t count; + cliqueDevicePtrs_t* ptrs; + } clique; + // [/RCCL] }; }; + struct ncclColl { union { struct { diff --git a/projects/rccl/src/init.cc b/projects/rccl/src/init.cc index 98c41b68e1..3c67b78708 100644 --- a/projects/rccl/src/init.cc +++ b/projects/rccl/src/init.cc @@ -28,6 +28,10 @@ #include #include "graph/topo.h" +// [RCCL] +#include "clique/CliqueManager.h" +// [/RCCL] + #define STR2(v) #v #define STR(v) STR2(v) @@ -300,6 +304,7 @@ static ncclResult_t commFree(ncclComm_t comm) { } RCCL_PARAM(AllToAllDisable, "ALLTOALL_KERNEL_DISABLE", 1); +RCCL_PARAM(ForceEnableClique, "FORCE_ENABLE_CLIQUE", 0); static ncclResult_t commAlloc(ncclComm_t* comret, int ndev, int rank) { if (ndev < 1) { @@ -678,7 +683,10 @@ static ncclResult_t initTransportsRank(struct ncclComm* comm, ncclUniqueId* comm int nranks = comm->nRanks; uint64_t commHash = getHash(commId->internal, NCCL_UNIQUE_ID_BYTES); TRACE(NCCL_INIT, "comm %p, commHash %lx, rank %d nranks %d - BEGIN", comm, commHash, rank, nranks); - NCCLCHECK(bootstrapInit(commId, rank, nranks, &comm->bootstrap)); + // [RCCL] Collect the PID of the root + int rootPid; + NCCLCHECK(bootstrapInit(commId, rank, nranks, &comm->bootstrap, &rootPid)); + // [/RCCL] // AllGather1 - begin struct { @@ -1016,8 +1024,53 @@ static ncclResult_t initTransportsRank(struct ncclComm* comm, ncclUniqueId* comm return ncclInternalError; } NCCLCHECK(ncclCommSetIntra(comm, intraRank, intraRanks, allGather1Data[intraRank0].comm)); + + { // [RCCL] Check if clique-based kernels can be enabled and initialize CliqueManager if so + CliqueManager::cliqueMode_t cliqueMode = CliqueManager::CLIQUE_DISABLED; + if (comm->localRanks == comm->nRanks) + { + // Check that all the GPUs have peer access to one another + bool hasPeerAccess = true; + for (int i = 0; i < nranks && hasPeerAccess; i++) + { + int cudaDev1 = allGather1Data[i].peerInfo.cudaDev; + for (int j = 0; j < nranks; j++) + { + if (i == j) continue; + int cudaDev2 = allGather1Data[j].peerInfo.cudaDev; + int p2p; + if (hipDeviceCanAccessPeer(&p2p, cudaDev1, cudaDev2) != hipSuccess || !p2p) + { + hasPeerAccess = false; + break; + } + } + } + if (hasPeerAccess) + { + if (intraRanks == nranks) + cliqueMode = CliqueManager::CLIQUE_SINGLE_PROCESS; + else + cliqueMode = CliqueManager::CLIQUE_SINGLE_NODE; + } + + // For now, only enable clique-based kernels on CR8_G topologies, unless explicitly asked + if (!rcclParamForceEnableClique()) + { + // Disable clique-kernel support if not on CR8 topology + if (!(comm->topo->nodes[NET].count == 0 && comm->topo->type == RCCL_TOPO_CR8G)) + { + INFO(NCCL_INIT, "Disabling clique-based kernels due to topology (force enable with RCCL_FORCE_ENABLE_CLIQUE)"); + cliqueMode = CliqueManager::CLIQUE_DISABLED; + } + } + } + comm->cliqueManager = new CliqueManager(rank, nranks, cliqueMode); + NCCLCHECK(comm->cliqueManager->Init(commId, rootPid)); + } // [/RCCL] } while(0); + // Done with AllGather1 data free(allGather1Data); @@ -1147,6 +1200,10 @@ ncclResult_t ncclCommDestroy(ncclComm_t comm) { return ncclInvalidArgument; } + // [RCCL] Delete CliqueManager if it exists + if (comm->cliqueManager) delete comm->cliqueManager; + // [/RCCL] + return commDestroy(comm); } diff --git a/projects/rccl/test/CorrectnessTest.hpp b/projects/rccl/test/CorrectnessTest.hpp index 95010ee701..fb11027b64 100644 --- a/projects/rccl/test/CorrectnessTest.hpp +++ b/projects/rccl/test/CorrectnessTest.hpp @@ -418,23 +418,24 @@ namespace CorrectnessTests switch (dataset.dataType) { case ncclInt8: - printf("Expected %d. Output %d on device %d[%d]\n", outputI1[j], expectedI1[j], i, j); break; + printf("Expected %d. Output %d on device %d[%d]\n", expectedI1[j], outputI1[j], i, j); + break; case ncclUint8: - printf("Expected %u. Output %u on device %d[%d]\n", outputU1[j], expectedU1[j], i, j); break; + printf("Expected %u. Output %u on device %d[%d]\n", expectedU1[j], outputU1[j], i, j); break; case ncclInt32: - printf("Expected %d. Output %d on device %d[%d]\n", outputI4[j], expectedI4[j], i, j); break; + printf("Expected %d. Output %d on device %d[%d]\n", expectedI4[j], outputI4[j], i, j); break; case ncclUint32: - printf("Expected %u. Output %u on device %d[%d]\n", outputU4[j], expectedU4[j], i, j); break; + printf("Expected %u. Output %u on device %d[%d]\n", expectedU4[j], outputU4[j], i, j); break; case ncclInt64: - printf("Expected %ld. Output %ld on device %d[%d]\n", outputI8[j], expectedI8[j], i, j); break; + printf("Expected %ld. Output %ld on device %d[%d]\n", expectedI8[j], outputI8[j], i, j); break; case ncclUint64: - printf("Expected %lu. Output %lu on device %d[%d]\n", outputU8[j], expectedU8[j], i, j); break; + printf("Expected %lu. Output %lu on device %d[%d]\n", expectedU8[j], outputU8[j], i, j); break; case ncclFloat32: - printf("Expected %f. Output %f on device %d[%d]\n", outputF4[j], expectedF4[j], i, j); break; + printf("Expected %f. Output %f on device %d[%d]\n", expectedF4[j], outputF4[j], i, j); break; case ncclFloat64: - printf("Expected %lf. Output %lf on device %d[%d]\n", outputF8[j], expectedF8[j], i, j); break; + printf("Expected %lf. Output %lf on device %d[%d]\n", expectedF8[j], outputF8[j], i, j); break; case ncclBfloat16: - printf("Expected %f. Output %f on device %d[%d]\n", (float)outputB2[j], (float)expectedB2[j], i, j); break; + printf("Expected %f. Output %f on device %d[%d]\n", (float)expectedB2[j], (float)outputB2[j], i, j); break; default: fprintf(stderr, "[ERROR] Unsupported datatype\n"); exit(0); diff --git a/projects/rccl/test/test_AllReduce.cpp b/projects/rccl/test/test_AllReduce.cpp index 992b7c32d4..28f76e52d2 100644 --- a/projects/rccl/test/test_AllReduce.cpp +++ b/projects/rccl/test/test_AllReduce.cpp @@ -58,6 +58,6 @@ namespace CorrectnessTests testing::Values(2,3,4,5,6,7,8), // In-place or not testing::Values(false, true), - testing::Values("")), + testing::Values("RCCL_ENABLE_CLIQUE=0", "RCCL_ENABLE_CLIQUE=1")), CorrectnessTest::PrintToStringParamName()); } // namespace diff --git a/projects/rccl/test/test_CombinedCalls.cpp b/projects/rccl/test/test_CombinedCalls.cpp index db4f938fc0..7d1b23c98c 100644 --- a/projects/rccl/test/test_CombinedCalls.cpp +++ b/projects/rccl/test/test_CombinedCalls.cpp @@ -96,6 +96,6 @@ namespace CorrectnessTests testing::Values(2,3,4,5,6,7,8), // In-place or not testing::Values(false, true), - testing::Values("")), + testing::Values("RCCL_ENABLE_CLIQUE=0", "RCCL_ENABLE_CLIQUE=1")), CorrectnessTest::PrintToStringParamName()); } // namespace diff --git a/projects/rccl/test/test_GroupCalls.cpp b/projects/rccl/test/test_GroupCalls.cpp index b3ab2ca09b..f94dd6cf6a 100644 --- a/projects/rccl/test/test_GroupCalls.cpp +++ b/projects/rccl/test/test_GroupCalls.cpp @@ -116,6 +116,6 @@ namespace CorrectnessTests testing::Values(2,3,4,5,6,7,8), // In-place or not testing::Values(false, true), - testing::Values("")), + testing::Values("RCCL_ENABLE_CLIQUE=0", "RCCL_ENABLE_CLIQUE=1")), CorrectnessTest::PrintToStringParamName()); } // namespace diff --git a/projects/rccl/tools/HelloRccl/HelloRccl.cpp b/projects/rccl/tools/HelloRccl/HelloRccl.cpp new file mode 100644 index 0000000000..4eb72b6a52 --- /dev/null +++ b/projects/rccl/tools/HelloRccl/HelloRccl.cpp @@ -0,0 +1,257 @@ +/* +Copyright (c) 2020 Advanced Micro Devices, Inc. All rights reserved. + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. +*/ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "HelloRccl.hpp" + + +void Usage(char *argv0); +void ExecuteTest(int numIntraRank, int intraRankStartId, int numTotalRanks, ncclComm_t* comm); + +int main(int argc, char **argv) +{ + if (getenv("NCCL_COMM_ID") && argc == 3) // Run in multi-process mode + { + int nranks = atoi(argv[1]); + int rank = atoi(argv[2]); + if (rank == 0) printf("Running in multi-process mode\n"); + + // Create communicator for this rank + ncclUniqueId commId; + NCCL_CALL(ncclGetUniqueId(&commId)); + + // Initialize communicator + ncclComm_t comm; + HIP_CALL(hipSetDevice(rank)); + NCCL_CALL(ncclCommInitRank(&comm, nranks, commId, rank)); + + // Run the test + ExecuteTest(1, rank, nranks, &comm); + } + else if (argc == 2) // Run in single-process mode + { + printf("Running in single-process mode\n"); + + int nranks = atoi(argv[1]); + + // Initialize communicators for each rank + ncclComm_t comm[nranks]; + NCCL_CALL(ncclCommInitAll(comm, nranks, NULL)); + + // Run the test + ExecuteTest(nranks, 0, nranks, comm); + } + else + { + Usage(argv[0]); + return 1; + } + return 0; +} + +void ExecuteTest(int numIntraRank, int intraRankStartId, int numTotalRanks, ncclComm_t* comm) +{ + // Test configuration settings + int minPow = 10; // Starting power of 2 input size + int maxPow = 28; // Ending power of 2 input size + int numWarmups = 3; // Number of untimed warmup iterations + int numIterations = 10; // Number of timed iterations + + // Allocate GPU resources for this process + hipStream_t stream[numIntraRank]; + hipEvent_t startEvent[numIntraRank]; + hipEvent_t stopEvent[numIntraRank]; + for (int i = 0; i < numIntraRank; i++) + { + HIP_CALL(hipSetDevice(intraRankStartId + i)); + HIP_CALL(hipStreamCreate(&stream[i])); + HIP_CALL(hipEventCreate(&startEvent[i])); + HIP_CALL(hipEventCreate(&stopEvent[i])); + } + + if (intraRankStartId == 0) + { + printf("AllReduce Performance (sum of floats):\n"); + printf("%10s %10s %10s\n", "Bytes", "CpuTime(ms)", "GpuTime(ms)"); + } + + // Loop over power-of-two input sizes + for (int power = minPow; power <= maxPow; power++) + { + int N = 1 << power; + + // Allocate GPU memory + float *iputGpu[numIntraRank], *oputGpu[numIntraRank]; + for (int r = 0; r < numIntraRank; r++) + { + HIP_CALL(hipSetDevice(intraRankStartId + r)); + HIP_CALL(hipMalloc((void **)&iputGpu[r], N * sizeof(float))); + HIP_CALL(hipMalloc((void **)&oputGpu[r], N * sizeof(float))); + } + + // Allocate CPU memory for input/output + float *iputCpu = (float *)malloc(N * sizeof(float)); + float *oputCpu = (float *)malloc(N * sizeof(float)); + + // Fill CPU memory with a simple pattern + for (int i = 0; i < N; i++) + { + iputCpu[i] = 1.0f; + oputCpu[i] = 0.0f; + } + + // Copy the input from CPU memory to GPU memory + for (int r = 0; r < numIntraRank; r++) + { + HIP_CALL(hipSetDevice(intraRankStartId + r)); + HIP_CALL(hipMemcpy(iputGpu[r], iputCpu, N * sizeof(float), hipMemcpyHostToDevice)); + } + + // Perform some untimed initial warmup iterations + for (int iteration = 0; iteration < numWarmups; iteration++) + { + NCCL_CALL(ncclGroupStart()); + for (int r = 0; r < numIntraRank; r++) + { + HIP_CALL(hipSetDevice(intraRankStartId + r)); + NCCL_CALL(ncclAllReduce(iputGpu[r], oputGpu[r], N, ncclFloat, ncclSum, comm[r], stream[r])); + } + NCCL_CALL(ncclGroupEnd()); + } + for (int r = 0; r < numIntraRank; r++) + HIP_CALL(hipStreamSynchronize(stream[r])); + + // Perform timed iterations + auto cpuStart = std::chrono::high_resolution_clock::now(); + for (int r = 0; r < numIntraRank; r++) + HIP_CALL(hipEventRecord(startEvent[r], stream[r])); + + for (int iteration = 0; iteration < numIterations; iteration++) + { + NCCL_CALL(ncclGroupStart()); + for (int r = 0; r < numIntraRank; r++) + { + HIP_CALL(hipSetDevice(intraRankStartId + r)); + NCCL_CALL(ncclAllReduce(iputGpu[r], oputGpu[r], N, ncclFloat, ncclSum, comm[r], stream[r])); + } + NCCL_CALL(ncclGroupEnd()); + } + + for (int r = 0; r < numIntraRank; r++) + HIP_CALL(hipEventRecord(stopEvent[r], stream[r])); + + for (int r = 0; r < numIntraRank; r++) + HIP_CALL(hipStreamSynchronize(stream[r])); + + auto cpuDelta = std::chrono::high_resolution_clock::now() - cpuStart; + double totalCpuTime = std::chrono::duration_cast>(cpuDelta).count(); + + float totalGpuTime; + HIP_CALL(hipEventElapsedTime(&totalGpuTime, startEvent[0], stopEvent[0])); + + if (intraRankStartId == 0) printf("%10lu %10.3f %10.3f\n", N * sizeof(float), (totalCpuTime / numIterations), (totalGpuTime / numIterations)); + + // Validate results + for (int r = 0; r < numIntraRank; r++) + { + HIP_CALL(hipMemcpy(oputCpu, oputGpu[r], N * sizeof(float), hipMemcpyDeviceToHost)); + bool isOK = true; + int expected = numTotalRanks; + for (int i = 0; i < N; i++) + { + isOK &= (oputCpu[i] == expected); + } + if (!isOK) + { + printf("[ERROR] Rank %d Incorrect results for N = %d\n", intraRankStartId + r, N); + NCCL_CALL(ncclCommDestroy(comm[r])); + exit(1); + } + } + + // Release GPU resources + for (int r = 0; r < numIntraRank; r++) + { + HIP_CALL(hipFree(oputGpu[r])); + HIP_CALL(hipFree(iputGpu[r])); + } + free(iputCpu); + free(oputCpu); + } + + for (int r = 0; r < numIntraRank; r++) + { + HIP_CALL(hipStreamDestroy(stream[r])); + HIP_CALL(hipEventDestroy(startEvent[r])); + HIP_CALL(hipEventDestroy(stopEvent[r])); + NCCL_CALL(ncclCommDestroy(comm[r])); + } +} + +void Usage(char *argv0) +{ + printf("Single Process Usage: %s numRanks\n", argv0); + printf("\n"); + printf("Multi Process Usage: %s numRanks rank\n", argv0); + printf(" - NCCL_COMM_ID must be set in order to use this\n\n"); + printf(" - To use this process as the root process you may use any of the following:\n"); + + char hostname[256]; + gethostname(hostname, 256); + printf(" export NCCL_COMM_ID=%s:12345\n", hostname); + + // Loop over linked list of interfaces + struct ifaddrs *ifaddr; + getifaddrs(&ifaddr); + for (struct ifaddrs* ifa = ifaddr; ifa != NULL; ifa = ifa->ifa_next) + { + // Skip anything not based on IPv4 / IPv6 + int family = ifa->ifa_addr->sa_family; + if (family != AF_INET && family != AF_INET6) continue; + + // Skip iPv6 loopback interface + if (family == AF_INET6) + { + struct sockaddr_in6* sa = (struct sockaddr_in6*)(ifa->ifa_addr); + if (IN6_IS_ADDR_LOOPBACK(&sa->sin6_addr)) continue; + } + + socklen_t saLen = (family == AF_INET ? sizeof(struct sockaddr_in) : sizeof(struct sockaddr_in6)); + char host[NI_MAXHOST]; + char service[NI_MAXSERV]; + + getnameinfo(ifa->ifa_addr, saLen, host, NI_MAXHOST, service, NI_MAXSERV, + NI_NUMERICHOST|NI_NUMERICSERV); + + std::string result = std::string(host) + ":12345"; + printf(" export NCCL_COMM_ID=%s\n", result.c_str()); + } + freeifaddrs(ifaddr); +} diff --git a/projects/rccl/tools/HelloRccl/HelloRccl.hpp b/projects/rccl/tools/HelloRccl/HelloRccl.hpp new file mode 100644 index 0000000000..55d24ecd5d --- /dev/null +++ b/projects/rccl/tools/HelloRccl/HelloRccl.hpp @@ -0,0 +1,49 @@ +/* +Copyright (c) 2020 Advanced Micro Devices, Inc. All rights reserved. + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. +*/ + +#ifndef HELLORCCL_HPP +#define HELLORCCL_HPP +#include + +#define HIP_CALL(cmd) \ + do { \ + hipError_t error = (cmd); \ + if (error != hipSuccess) \ + { \ + std::cerr << "Encountered HIP error (" << hipGetErrorString(error) << ") at line " \ + << __LINE__ << " in file " << __FILE__ << "\n"; \ + exit(-1); \ + } \ + } while (0) + +#define NCCL_CALL(cmd) \ + do { \ + ncclResult_t error = (cmd); \ + if (error != ncclSuccess) \ + { \ + std::cerr << "Encountered NCCL error (" << ncclGetErrorString(error) << ") at line " \ + << __LINE__ << " in file " << __FILE__ << "\n"; \ + exit(-1); \ + } \ + } while (0) + +#endif diff --git a/projects/rccl/tools/HelloRccl/Makefile b/projects/rccl/tools/HelloRccl/Makefile new file mode 100644 index 0000000000..8729c17feb --- /dev/null +++ b/projects/rccl/tools/HelloRccl/Makefile @@ -0,0 +1,21 @@ +# Copyright (c) 2020 Advanced Micro Devices, Inc. All rights reserved. + +# Set to where RCCL is installed +RCCL_INSTALL=../../build/release + +HIP_PATH?= $(wildcard /opt/rocm/hip) +ifeq (,$(HIP_PATH)) +HIP_PATH=../../.. +endif +HIPCC=$(HIP_PATH)/bin/hipcc + +EXE=HelloRccl +CXXFLAGS = -std=c++11 -O3 -I../../src/include -I$(RCCL_INSTALL) -L$(RCCL_INSTALL) -lrccl + +all: $(EXE) + +$(EXE): $(EXE).cpp $(shell find -regex ".*\.\hpp") + $(HIPCC) $(CXXFLAGS) $< -o $@ + +clean: + rm -f *.o $(EXE) diff --git a/projects/rccl/tools/HelloRccl/runTest.sh b/projects/rccl/tools/HelloRccl/runTest.sh new file mode 100755 index 0000000000..c6e832e3c4 --- /dev/null +++ b/projects/rccl/tools/HelloRccl/runTest.sh @@ -0,0 +1,22 @@ +#!/bin/bash +RCCL_INSTALL=../../build/release +EXE=$PWD/HelloRccl +LDPATH=$LD_LIBRARY_PATH:$RCCL_INSTALL + +echo "Single process - With clique-based kernels:" +RCCL_CLIQUE_ALLREDUCE_BYTE_LIMIT=1073741824 RCCL_FORCE_ENABLE_CLIQUE=1 NCCL_DEBUG=INFO RCCL_ENABLE_CLIQUE=1 LD_LIBRARY_PATH=$LDPATH $EXE 4 + +echo "Single process - Without clique-based kernels:" +NCCL_DEBUG=INFO LD_LIBRARY_PATH=$LDPATH $EXE 4 + +echo "With clique-based kernels:" +RCCL_CLIQUE_ALLREDUCE_BYTE_LIMIT=1073741824 RCCL_FORCE_ENABLE_CLIQUE=1 NCCL_DEBUG=INFO RCCL_ENABLE_CLIQUE=1 NCCL_COMM_ID=$HOSTNAME:12345 LD_LIBRARY_PATH=$LDPATH $EXE 4 0 & +RCCL_CLIQUE_ALLREDUCE_BYTE_LIMIT=1073741824 RCCL_FORCE_ENABLE_CLIQUE=1 NCCL_DEBUG=INFO RCCL_ENABLE_CLIQUE=1 NCCL_COMM_ID=$HOSTNAME:12345 LD_LIBRARY_PATH=$LDPATH $EXE 4 1 & +RCCL_CLIQUE_ALLREDUCE_BYTE_LIMIT=1073741824 RCCL_FORCE_ENABLE_CLIQUE=1 NCCL_DEBUG=INFO RCCL_ENABLE_CLIQUE=1 NCCL_COMM_ID=$HOSTNAME:12345 LD_LIBRARY_PATH=$LDPATH $EXE 4 2 & +RCCL_CLIQUE_ALLREDUCE_BYTE_LIMIT=1073741824 RCCL_FORCE_ENABLE_CLIQUE=1 NCCL_DEBUG=INFO RCCL_ENABLE_CLIQUE=1 NCCL_COMM_ID=$HOSTNAME:12345 LD_LIBRARY_PATH=$LDPATH $EXE 4 3 + +echo "Without clique-based kernels:" +NCCL_COMM_ID=$HOSTNAME:12345 LD_LIBRARY_PATH=$LDPATH $EXE 4 0 & +NCCL_COMM_ID=$HOSTNAME:12345 LD_LIBRARY_PATH=$LDPATH $EXE 4 1 & +NCCL_COMM_ID=$HOSTNAME:12345 LD_LIBRARY_PATH=$LDPATH $EXE 4 2 & +NCCL_COMM_ID=$HOSTNAME:12345 LD_LIBRARY_PATH=$LDPATH $EXE 4 3