From 2b8184808d3dad344edf26a495c75bf22f2de40c Mon Sep 17 00:00:00 2001 From: gilbertlee-amd <44450918+gilbertlee-amd@users.noreply.github.com> Date: Tue, 13 Oct 2020 11:22:04 -0600 Subject: [PATCH] Initial support for clique-based kernels (#276) * Initial support for clique-based kernels --- CMakeLists.txt | 6 + src/bootstrap.cc | 36 ++- src/clique/AllReduceCliqueKernel.h | 145 +++++++++ src/clique/CliqueCommon.h | 105 +++++++ src/clique/CliqueManager.cc | 399 +++++++++++++++++++++++++ src/clique/CliqueManager.h | 121 ++++++++ src/clique/CliqueShmNames.h | 37 +++ src/clique/HandleCache.cc | 31 ++ src/clique/HandleCache.h | 141 +++++++++ src/clique/HandleShm.cc | 67 +++++ src/clique/HandleShm.h | 53 ++++ src/clique/Hash.cc | 34 +++ src/clique/Hash.h | 28 ++ src/clique/MsgQueue.cc | 72 +++++ src/clique/MsgQueue.h | 42 +++ src/clique/SharedMemHelper.h | 43 +++ src/clique/ShmObject.cc | 45 +++ src/clique/ShmObject.h | 203 +++++++++++++ src/collectives/device/common_kernel.h | 5 + src/enqueue.cc | 25 ++ src/include/bootstrap.h | 2 +- src/include/comm.h | 11 +- src/init.cc | 47 ++- tools/HelloRccl/HelloRccl.cpp | 192 ++++++++++++ tools/HelloRccl/HelloRccl.hpp | 49 +++ tools/HelloRccl/Makefile | 21 ++ tools/HelloRccl/runTest.sh | 16 + 27 files changed, 1969 insertions(+), 7 deletions(-) create mode 100644 src/clique/AllReduceCliqueKernel.h create mode 100644 src/clique/CliqueCommon.h create mode 100644 src/clique/CliqueManager.cc create mode 100644 src/clique/CliqueManager.h create mode 100644 src/clique/CliqueShmNames.h create mode 100644 src/clique/HandleCache.cc create mode 100644 src/clique/HandleCache.h create mode 100644 src/clique/HandleShm.cc create mode 100644 src/clique/HandleShm.h create mode 100644 src/clique/Hash.cc create mode 100644 src/clique/Hash.h create mode 100644 src/clique/MsgQueue.cc create mode 100644 src/clique/MsgQueue.h create mode 100644 src/clique/SharedMemHelper.h create mode 100644 src/clique/ShmObject.cc create mode 100644 src/clique/ShmObject.h create mode 100644 tools/HelloRccl/HelloRccl.cpp create mode 100644 tools/HelloRccl/HelloRccl.hpp create mode 100644 tools/HelloRccl/Makefile create mode 100755 tools/HelloRccl/runTest.sh diff --git a/CMakeLists.txt b/CMakeLists.txt index 450dd48c22..088d7f79bd 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -126,6 +126,12 @@ set(CC_SOURCES src/collectives/all_to_all_api.cc src/collectives/all_to_allv_api.cc src/channel.cc + src/clique/CliqueManager.cc # RCCL + src/clique/HandleCache.cc # RCCL + src/clique/HandleShm.cc # RCCL + src/clique/Hash.cc # RCCL + src/clique/MsgQueue.cc # RCCL + src/clique/ShmObject.cc # RCCL src/misc/argcheck.cc src/misc/nvmlwrap_stub.cc src/misc/utils.cc diff --git a/src/bootstrap.cc b/src/bootstrap.cc index e90dd66823..71e7070284 100644 --- a/src/bootstrap.cc +++ b/src/bootstrap.cc @@ -12,6 +12,11 @@ #include "socket.h" #include #include +// [RCCL] +#include "clique/CliqueManager.h" +#include "clique/CliqueShmNames.h" +#include "clique/Hash.h" +// [/RCCL] struct bootstrapNetComm { int fd; @@ -163,7 +168,14 @@ static ncclResult_t setFilesLimit() { return ncclSuccess; } -static void *bootstrapRoot(void* listenComm) { +static void *bootstrapRoot(void* bootstrapRootStruct) { // [RCCL] Modified to include hash argument) + // [RCCL] Unpack bootstrapRootStruct + struct bootstrapRootStruct* rootStruct = (struct bootstrapRootStruct*) bootstrapRootStruct; + void* listenComm = rootStruct->listenComm; + unsigned long hash = rootStruct->hash; + int pid = getpid(); // sharing PID to other ranks for creating shared memory files for CliqueManager + // [/RCCL] + struct extInfo info; ncclNetHandle_t *rankHandles = NULL; ncclNetHandle_t *rankHandlesRoot = NULL; // for initial rank <-> root information exchange @@ -205,12 +217,19 @@ static void *bootstrapRoot(void* listenComm) { } while (c < nranks); TRACE(NCCL_INIT, "COLLECTED ALL %d HANDLES", nranks); + { // [RCCL] Initialize message queues / shared memory files + NCCLCHECKGOTO(CliqueManager::BootstrapRootInit(pid, hash), res, out); + } // [/RCCL] + // Send the connect handle for the next rank in the AllGather ring for (int r=0; rhash = djb2Hash(id->internal); + rootStruct->listenComm = listenComm; + pthread_create(&thread, NULL, bootstrapRoot, (void *)rootStruct); + // [/RCCL] + return ncclSuccess; } @@ -267,9 +293,10 @@ struct extState { int rank; int nranks; int dev; + int rootPid; // [RCCL] PID of root }; -ncclResult_t bootstrapInit(ncclUniqueId * id, int rank, int nranks, void** commState) { +ncclResult_t bootstrapInit(ncclUniqueId * id, int rank, int nranks, void** commState, int* rootPid) { // [RCCL] Adding rootPid ncclNetHandle_t* netHandle = (ncclNetHandle_t*) id; bool idFromEnv = getenv("NCCL_COMM_ID") != NULL; struct extState* state; @@ -314,6 +341,9 @@ ncclResult_t bootstrapInit(ncclUniqueId * id, int rank, int nranks, void** commS ncclNetHandle_t extHandleNext; NCCLCHECK(bootstrapNetAccept(extBstrapListenCommRoot, &tmpRecvComm)); NCCLCHECK(bootstrapNetRecv(tmpRecvComm, &extHandleNext, sizeof(extHandleNext))); + { // [RCCL] Receive PID from root + NCCLCHECK(bootstrapNetRecv(tmpRecvComm, rootPid, sizeof(int))); + } // [/RCCL] NCCLCHECK(bootstrapNetCloseRecv(tmpRecvComm)); NCCLCHECK(bootstrapNetCloseListen(extBstrapListenCommRoot)); diff --git a/src/clique/AllReduceCliqueKernel.h b/src/clique/AllReduceCliqueKernel.h new file mode 100644 index 0000000000..ebd4b16913 --- /dev/null +++ b/src/clique/AllReduceCliqueKernel.h @@ -0,0 +1,145 @@ +/* +Copyright (c) 2020 Advanced Micro Devices, Inc. All rights reserved. + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. +*/ + +#ifndef ALLREDUCECLIQUEKERNEL_H +#define ALLREDUCECLIQUEKERNEL_H + +#include "CliqueCommon.h" +#include "common_kernel.h" +#include +#include "hip/hip_ext.h" + +#define ALL_REDUCE_SPLIT_BLOCKSIZE 256 + +template +__global__ __launch_bounds__(ALL_REDUCE_SPLIT_BLOCKSIZE) +void AllReduceCliqueSplitKernel(int N, + size_t startIdx, + cliqueDevicePtrs_t cliquePtrs) +{ + if (N > 0) + { + // Each workgroup operates on a contiguous portion of memory + // Divide the # of elements evenly across workgroups, then round up to multiple of blocksize + int baseSize = (N + gridDim.x - 1) / gridDim.x; + int chunkSize = RoundUp(baseSize, ALL_REDUCE_SPLIT_BLOCKSIZE); + int blockOffsetStart = min(blockIdx.x * chunkSize, N); + int blockOffsetStop = min(blockOffsetStart + chunkSize, N); + int blockN = blockOffsetStop - blockOffsetStart; + + if (blockN > 0) + { + T const** inputs = (T const**)cliquePtrs.inputs; + T** outputs = (T **)cliquePtrs.outputs; + + T const* srcs[NUM_RANKS]; + T* dsts[NUM_RANKS]; + + #pragma unroll + for (int r = 0; r < NUM_RANKS; r++) + { + srcs[r] = inputs[r] + startIdx + blockOffsetStart; + dsts[r] = outputs[r] + startIdx + blockOffsetStart; + } + + #define ALL_REDUCE_CLIQUE_UNROLL 4 + ReduceOrCopyMulti( + threadIdx.x, ALL_REDUCE_SPLIT_BLOCKSIZE, NUM_RANKS, srcs, NUM_RANKS, dsts, blockN); + } + } + + // Each GPU works on a separate subsection, however we cannot finish the kernel + // until all GPUs have finished otherwise part of the result may not be correct yet + WaitForBarrier(cliquePtrs.barrierCounter); +} + +class AllReduceCliqueKernel +{ +public: + static ncclResult_t Launch(int const rank, + int const numRanks, + int const maxGridSize, + size_t const count, + ncclDataType_t const datatype, + ncclRedOp_t const op, + hipStream_t const stream, + cliqueDevicePtrs_t const& cliquePtrs, + bool const doTiming = false) + { + if (numRanks < MIN_CLIQUE_SIZE || numRanks > MAX_CLIQUE_SIZE) + { + WARN("Number of ranks exceeds supported. Expected %d <= %d < %d for numRanks", + MIN_CLIQUE_SIZE, numRanks, MAX_CLIQUE_SIZE); + return ncclInvalidUsage; + } + + // Divide the # of elements done per GPU evenly across ranks, then round up to blocksize + int baseSize = (count + numRanks - 1) / numRanks; + int chunkSize = RoundUp(baseSize, ALL_REDUCE_SPLIT_BLOCKSIZE); + int startIdx = min(chunkSize * rank, count); + int stopIdx = min(startIdx + chunkSize, count); + int rankN = max(stopIdx - startIdx, 0); + + // Adjust gridsize if there isn't enough work to prevent empty workgroups + int realGridSize = std::max(std::min(maxGridSize, + (rankN + ALL_REDUCE_SPLIT_BLOCKSIZE - 1) / ALL_REDUCE_SPLIT_BLOCKSIZE), + 1); + + hipEvent_t startEvent = 0; + hipEvent_t stopEvent = 0; + float kernelTimeMs; + if (doTiming) + { + hipEventCreate(&startEvent); + hipEventCreate(&stopEvent); + hipEventRecord(startEvent, stream); + } + + // Launch even if empty for this rank, because all ranks must hit sync barrier + hipLaunchKernelGGL(m_allReduceCliqueKernels[datatype][op][numRanks - MIN_CLIQUE_SIZE], + dim3(realGridSize, 1, 1), + dim3(ALL_REDUCE_SPLIT_BLOCKSIZE, 1, 1), + 0, stream, + rankN, startIdx, cliquePtrs); + + if (doTiming) + { + hipEventRecord(stopEvent, stream); + hipEventSynchronize(stopEvent); + hipEventElapsedTime(&kernelTimeMs, startEvent, stopEvent); + printf("[%d/%d:%d] %lu %13.6f ms\n", rank, numRanks, maxGridSize, count, kernelTimeMs); + hipEventDestroy(startEvent); + hipEventDestroy(stopEvent); + } + + return ncclSuccess; + } + +protected: + // List of all templated device kernels function pointers + typedef void(*allReduceCliqueFunc_t)(int, size_t, cliqueDevicePtrs_t); + static constexpr allReduceCliqueFunc_t + m_allReduceCliqueKernels[ncclNumTypes][ncclNumOps][MAX_CLIQUE_SIZE - MIN_CLIQUE_SIZE + 1] = + KERNEL_LIST_MACRO(AllReduceCliqueSplitKernel); +}; + +#endif diff --git a/src/clique/CliqueCommon.h b/src/clique/CliqueCommon.h new file mode 100644 index 0000000000..9dc57c067e --- /dev/null +++ b/src/clique/CliqueCommon.h @@ -0,0 +1,105 @@ +/* +Copyright (c) 2020 Advanced Micro Devices, Inc. All rights reserved. + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. +*/ + +#ifndef CLIQUE_COMMON_H +#define CLIQUE_COMMON_H + +#include "nccl.h" +#include +#include "rccl_bfloat16.h" +#include "reduce_kernel.h" + +#define MIN_CLIQUE_SIZE 2 +#define MAX_CLIQUE_SIZE 8 + +typedef struct +{ + void const* inputs[MAX_CLIQUE_SIZE]; + void* outputs[MAX_CLIQUE_SIZE]; + int* barrierCounter; +} cliqueDevicePtrs_t; + +// Helper macro to generate a table of templated kernel functions +// This is expected to run between MIN_CLIQUE_SIZE to MAX_CLIQUE_SIZE +#define KERNEL_LIST_RANK(kernelname, datatype, func) \ + { \ + kernelname, 2>, \ + kernelname, 3>, \ + kernelname, 4>, \ + kernelname, 5>, \ + kernelname, 6>, \ + kernelname, 7>, \ + kernelname, 8> \ + } + +// Helper macro to generate a table of templated kernel functions +// This is expected to match the number of supported reduction operations (ncclNumOps) +#define KERNEL_LIST_OP(kernelname, datatype) \ + { \ + KERNEL_LIST_RANK(kernelname, datatype, FuncSum), \ + KERNEL_LIST_RANK(kernelname, datatype, FuncProd), \ + KERNEL_LIST_RANK(kernelname, datatype, FuncMax), \ + KERNEL_LIST_RANK(kernelname, datatype, FuncMin) \ + } + +// Helper Macro to generate table of templated kernel functions +// This is expected to match the number of supported datatypes (ncclNumTypes) +#define KERNEL_LIST_MACRO(kernelname) \ + { \ + KERNEL_LIST_OP(kernelname, int8_t), \ + KERNEL_LIST_OP(kernelname, uint8_t), \ + KERNEL_LIST_OP(kernelname, int32_t), \ + KERNEL_LIST_OP(kernelname, uint32_t), \ + KERNEL_LIST_OP(kernelname, int64_t), \ + KERNEL_LIST_OP(kernelname, uint64_t), \ + KERNEL_LIST_OP(kernelname, half), \ + KERNEL_LIST_OP(kernelname, float), \ + KERNEL_LIST_OP(kernelname, double), \ + KERNEL_LIST_OP(kernelname, rccl_bfloat16) \ + } + +template +__forceinline__ __device__ void WaitForBarrier(int* counter) +{ + if (threadIdx.x == 0 & blockIdx.x == 0) + { + // Assumes counter starts at 0 prior to any rank access + __atomic_add_fetch(counter, 1, __ATOMIC_SEQ_CST); + + // Wait for all ranks to reach barrier + while (__atomic_load_n(counter, __ATOMIC_SEQ_CST) < NUM_RANKS); + + // Each rank increments again, last one resets barrier + if (__atomic_add_fetch(counter, 1, __ATOMIC_SEQ_CST) == (2*NUM_RANKS)) + __atomic_store_n(counter, 0, __ATOMIC_SEQ_CST); + + // Wait for counter to be zeroed + while (__atomic_load_n(counter, __ATOMIC_SEQ_CST) != 0); + } +} + +__forceinline__ __host__ __device__ int RoundUp(int X, int Y) +{ + return (X+Y-1)/Y * Y; +} + +#endif diff --git a/src/clique/CliqueManager.cc b/src/clique/CliqueManager.cc new file mode 100644 index 0000000000..b5cc0f0fb2 --- /dev/null +++ b/src/clique/CliqueManager.cc @@ -0,0 +1,399 @@ +/* +Copyright (c) 2020 Advanced Micro Devices, Inc. All rights reserved. + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. +*/ + +#include "CliqueManager.h" +#include "CliqueShmNames.h" +#include "MsgQueue.h" + +#include "nccl.h" +#include "core.h" + +#include "Hash.h" + +#include "AllReduceCliqueKernel.h" + +#include +#include +#include +#include +#include +#include +#include +#include + +cliqueDevicePtrs_t CliqueManager::m_cliquePtrs[NCCL_MAX_OPS] = {}; +uint32_t CliqueManager::m_staticCounters[NCCL_MAX_OPS] = {}; +int* CliqueManager::m_staticBarriers = NULL; + +CliqueManager::CliqueManager(int const rank, + int const numRanks, + cliqueMode_t const cliqueMode) : + m_rank(rank), + m_numRanks(numRanks), + m_cliqueMode(cliqueMode), + m_init(false), + m_deviceBarriers(NULL) +{ +} + +CliqueManager::~CliqueManager() +{ + if (m_init) + { + CleanUp(); + } +} + +void CliqueManager::CleanUp() +{ + if (m_cliqueMode == CLIQUE_SINGLE_NODE) + { + // Release caches + if (m_ipcHandleSendCache) delete m_ipcHandleSendCache; + if (m_ipcHandleSendCache) delete m_ipcHandleRecvCache; + + // Close shared memory + m_shmHandles.Close(); + m_sharedCounters.Close(); + m_sharedBarrier.Close(); + + if (m_rank == 0) + { + hipFree(m_deviceBarriers); + } + } + else if (m_cliqueMode == CLIQUE_SINGLE_PROCESS) + { + if (m_staticBarriers) hipHostFree(m_staticBarriers); + } + m_init = false; +} + +ncclResult_t CliqueManager::Init(ncclUniqueId const* commId, int suffix) +{ + ncclResult_t res; + + if (m_init) return ncclSuccess; + m_init = true; + + // Check parameters + if (m_rank < 0 || m_rank >= m_numRanks) + { + WARN("Invalid rank specified. Expected 0 <= %d < %d for CliqueManager", m_rank, m_numRanks); + return ncclInvalidUsage; + } + if (commId == NULL) + { + WARN("CommId should not be empty"); + return ncclInvalidUsage; + } + + // For now, opt-into clique based kernels via RCCL_ENABLE_CLIQUE env var + if (!getenv("RCCL_ENABLE_CLIQUE")) + { + if (m_rank == 0) INFO(NCCL_INIT, "Disabling clique-based kernels (did not find env var RCCL_ENABLE_CLIQUE)"); + m_cliqueMode = CLIQUE_DISABLED; + return ncclSuccess; + } + + unsigned long hash = djb2Hash(commId->internal); + std::string shmSuffix = std::to_string(hash) + "_" + std::to_string(suffix); + + if (m_cliqueMode == CLIQUE_SINGLE_NODE) + { + // Initialize shared memory file for IPC handles (based on commId hash) + m_shmHandles = NcclIpcHandleShm(m_rank, m_numRanks, hash, NUM_HANDLES_PER_RANK, NCCL_MAX_OPS, shmSuffix); + NCCLCHECKGOTO(m_shmHandles.Open(), res, dropback); + + // Initialize IPC caches + m_ipcHandleSendCache = new NcclIpcHandleSendCache(m_numRanks * NUM_HANDLES_PER_RANK * NCCL_MAX_OPS); + m_ipcHandleRecvCache = new NcclIpcHandleRecvCache(m_numRanks * NUM_HANDLES_PER_RANK * NCCL_MAX_OPS, + 100, + hipIpcMemHandleHash, + hipIpcMemHandleEqual); + + // Initialize shared host barrier counters + m_sharedCounters = ShmObject(NCCL_MAX_OPS * sizeof(uint32_t), + CliqueShmNames["SharedCounters"] + shmSuffix, + m_rank, + m_numRanks, + hash); + NCCLCHECKGOTO(m_sharedCounters.Open(), res, dropback); + m_arrivalCounter = m_sharedCounters.Get(); + + // Initialized shared barriers + m_sharedBarrier = ShmObject(std::max(4096LU, sizeof(hipIpcMemHandle_t)), + CliqueShmNames["Barriers"] + shmSuffix, + m_rank, + m_numRanks, + hash); + NCCLCHECKGOTO(m_sharedBarrier.Open(), res, dropback); + + if (m_rank == 0) + { + hipIpcMemHandle_t handle; + // Allocate fine-grained device memory on rank 0 and get handle for it and store in IPC + NCCLCHECKGOTO(ncclCudaCalloc(&m_deviceBarriers, NCCL_MAX_OPS * sizeof(int), true), res, dropback); + if (hipIpcGetMemHandle(&handle, m_deviceBarriers) != hipSuccess) + { + WARN("Unable to get IPC handle for barrier memory"); + goto dropback; + } + + *m_sharedBarrier.Get() = handle; + } + } + else if (m_cliqueMode == CLIQUE_SINGLE_PROCESS) + { + // Allocate and zero pinned host memory that all GPU kernels will have access to as a barrier + if (hipHostMalloc(&m_staticBarriers, sizeof(int) * NCCL_MAX_OPS) != hipSuccess) + { + WARN("Unable to allocated pinned host memory for clique barrier. Disabling clique-based kernels"); + m_cliqueMode = CLIQUE_DISABLED; + m_init = true; + return ncclSuccess; + } + memset(m_staticBarriers, 0, NCCL_MAX_OPS * sizeof(int)); + m_arrivalCounter = m_staticCounters; + } + m_init = true; + return ncclSuccess; + +dropback: + WARN("Unable to initialize shared memory. Disabling clique-based kernels"); + CleanUp(); + m_cliqueMode = CLIQUE_DISABLED; + return ncclSuccess; +} + +bool CliqueManager::IsSupported(ncclFunc_t const coll, + size_t const count, + ncclDataType_t const datatype, + ncclRedOp_t const op) const +{ + if (m_cliqueMode == CLIQUE_DISABLED) return false; + if (coll == ncclCollAllReduce) return true; + + // NOTE: Currently we only support allReduce +//#define ALL_REDUCE_COUNT 1048576 + //if (coll == ncclCollAllReduce && count < ALL_REDUCE_COUNT) return true; + + return false; +} + +ncclResult_t CliqueManager::DeclarePointers(uint64_t opCount, void const* inputPtr, void* outputPtr) +{ + // Do nothing if disabled + if (m_cliqueMode == CLIQUE_DISABLED) return ncclSuccess; + + if (!m_init) + { + WARN("CliqueManager must be initialized before use"); + return ncclInvalidUsage; + } + + int const opIndex = opCount % NCCL_MAX_OPS; + if (m_cliqueMode == CLIQUE_SINGLE_NODE) + { + // Get fine-grained device memory if not already done + if (m_deviceBarriers == NULL) + { + hipIpcMemHandle_t handle = *m_sharedBarrier.Get(); + CUDACHECK(hipIpcOpenMemHandle((void**)&m_deviceBarriers, handle, hipIpcMemLazyEnablePeerAccess)); + } + + std::vector handles(NUM_HANDLES_PER_RANK); + + // Get IPC handles for input/output pointers from cache + NCCLCHECK(CheckCacheForPtr(const_cast(inputPtr), m_ipcHandleSendCache, m_rank, &handles[0])); + NCCLCHECK(CheckCacheForPtr(outputPtr , m_ipcHandleSendCache, m_rank, &handles[1])); + + // Write IPC handles to shared memory for given rank / opCount + NCCLCHECK(m_shmHandles.WriteHandles(opIndex, handles)); + + } + else if (m_cliqueMode == CLIQUE_SINGLE_PROCESS) + { + // Store this rank's input/output pointers into static member + m_cliquePtrs[opIndex].inputs[m_rank] = inputPtr; + m_cliquePtrs[opIndex].outputs[m_rank] = outputPtr; + } + return ncclSuccess; +} + +ncclResult_t CliqueManager::QueueKernel(uint64_t const opCount, + ncclFunc_t const coll, + size_t const count, + ncclDataType_t const datatype, + ncclRedOp_t const op, + int const root, + hipStream_t const stream) +{ + // Do nothing if disabled + if (m_cliqueMode == CLIQUE_DISABLED) return ncclSuccess; + if (!m_init) + { + WARN("CliqueManager must be initialized before use"); + return ncclInvalidUsage; + } + + // Wait for all ranks to declare pointers + int opIndex = opCount % NCCL_MAX_OPS; + WaitForBarrier(opIndex); + + // Get cliqueDevicePointers + cliqueDevicePtrs_t cliquePtrs; + NCCLCHECK(GetCliqueDevicePointers(opCount, cliquePtrs)); + + // NOTE: The number of blocks to use per GPU will need to be further optimized + int gridSize = (getenv("RCCL_CLIQUE_GRIDSIZE") ? atoi(getenv("RCCL_CLIQUE_GRIDSIZE")) : 2); + + // Launch kernel + switch (coll) + { + case ncclCollAllReduce: + return AllReduceCliqueKernel::Launch(m_rank, m_numRanks, gridSize, count, datatype, op, stream, cliquePtrs); + default: + WARN("Unsupported collective type"); + return ncclInvalidUsage; + } +} + +ncclResult_t CliqueManager::GetCliqueDevicePointers(uint64_t opCount, cliqueDevicePtrs_t& cliquePtrs) +{ + // Wait for completion for current opCount + int opIndex = opCount % NCCL_MAX_OPS; + + if (m_cliqueMode == CLIQUE_SINGLE_NODE) + { + // Collect the ready handles from shared memory and convert them to device pointers + int numHandles = m_numRanks * NUM_HANDLES_PER_RANK; + std::vector handles(numHandles); + + NCCLCHECK(m_shmHandles.ReadHandles(opIndex, handles)); + + for (int i = 0; i < m_numRanks; i++) + { + void *input; + NCCLCHECK(CheckCacheForHandle(handles[i * NUM_HANDLES_PER_RANK], + m_ipcHandleRecvCache, &input)); + cliquePtrs.inputs[i] = const_cast(input); + + NCCLCHECK(CheckCacheForHandle(handles[(i * NUM_HANDLES_PER_RANK) + 1], + m_ipcHandleRecvCache, &cliquePtrs.outputs[i])); + } + cliquePtrs.barrierCounter = &(m_deviceBarriers[opIndex]); + } + else if (m_cliqueMode == CLIQUE_SINGLE_PROCESS) + { + m_cliquePtrs[opIndex].barrierCounter = &m_staticBarriers[opIndex]; + cliquePtrs = m_cliquePtrs[opIndex]; + } + return ncclSuccess; +} + +ncclResult_t CliqueManager::CheckCacheForPtr(void* devPtr, + NcclIpcHandleSendCache* cache, + int rank, + hipIpcMemHandle_t* handle) +{ + uint64_t addr = (uint64_t)devPtr; + + // handle NULL ptr case + if (addr == 0) + { + WARN("Error while checking IPC memory handle cache for ptr: null pointer specified.\n"); + return ncclInternalError; + } + + NcclIpcHandleSendCache::iterator it = cache->find(addr); + + if (it == cache->end()) + { + CUDACHECK(hipIpcGetMemHandle(handle, devPtr)); + std::pair ptrHandleMap(addr, *handle) ; + cache->insert(addr, *handle); + } + else + { + *handle = (it->second).first; + } + return ncclSuccess; +} + +ncclResult_t CliqueManager::CheckCacheForHandle(hipIpcMemHandle_t handle, + NcclIpcHandleRecvCache* cache, + void** ptr) +{ + NcclIpcHandleRecvCache::iterator it = cache->find(handle); + + if (it == cache->end()) + { + CUDACHECK(hipIpcOpenMemHandle(ptr, handle, hipIpcMemLazyEnablePeerAccess)); + cache->insert(handle, *ptr); + } + else + { + *ptr = (it->second).first; + } + return ncclSuccess; +} + +void CliqueManager::WaitForBarrier(int opIndex) +{ + m_nextBarrierValue[opIndex] += m_numRanks; + int const nextValue = m_nextBarrierValue[opIndex]; + + __atomic_add_fetch(&m_arrivalCounter[opIndex], 1, __ATOMIC_SEQ_CST); + while (m_arrivalCounter[opIndex] < nextValue) + { + std::this_thread::yield(); + } +} + +ncclResult_t CliqueManager::BootstrapRootInit(int pid, unsigned long hash) +{ + for (auto it = CliqueShmNames.begin(); it != CliqueShmNames.end(); it++) + { + int msgid, fd; + std::string msgQueueName = "/tmp/" + it->second + std::to_string(hash) + "_" + std::to_string(pid); + SYSCHECKVAL(open(msgQueueName.c_str(), O_CREAT | O_RDWR, 0606), "open", fd); + NCCLCHECK(MsgQueueGetId(msgQueueName, hash, true, msgid)); + SYSCHECK(close(fd), "close"); + } + + std::string shmDir = "/dev/shm/"; + + for (auto it = CliqueShmNames.begin(); it != CliqueShmNames.end(); it++) + { + struct stat fileStatus; + std::string shmFileName = it->second + std::to_string(hash) + "_" + std::to_string(pid); + std::string shmFullPath = shmDir + shmFileName; + + // Check if shm file already exists; if so, unlink it + if (stat(shmFullPath.c_str(), &fileStatus) == 0) + { + NCCLCHECK(shmUnlink(shmFileName.c_str())); + } + } + return ncclSuccess; +} diff --git a/src/clique/CliqueManager.h b/src/clique/CliqueManager.h new file mode 100644 index 0000000000..6b1acb1b19 --- /dev/null +++ b/src/clique/CliqueManager.h @@ -0,0 +1,121 @@ +/* +Copyright (c) 2020 Advanced Micro Devices, Inc. All rights reserved. + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. +*/ + +#ifndef RCCL_CLIQUE_MANAGER_HPP_ +#define RCCL_CLIQUE_MANAGER_HPP_ + +#include +#include + +#include "nccl.h" +#include "devcomm.h" +#include "CliqueCommon.h" +#include "HandleCache.h" +#include "HandleShm.h" + +#define NUM_HANDLES_PER_RANK 2 + +class CliqueManager +{ +public: + typedef enum + { + CLIQUE_DISABLED = 0, + CLIQUE_SINGLE_PROCESS = 1, + CLIQUE_SINGLE_NODE = 2 + } cliqueMode_t; + + CliqueManager(int const rank, int const numRanks, cliqueMode_t const cliqueMode); + + ~CliqueManager(); + + void CleanUp(); + + ncclResult_t Init(ncclUniqueId const* commId, int suffix); + + // Returns true if the collective is supported via a clique-based kernel + bool IsSupported(ncclFunc_t const coll, + size_t const count, + ncclDataType_t const datatype, + ncclRedOp_t const op) const; + + // Provide the pointers to be exchanged across the clique for the given rank / opCount + ncclResult_t DeclarePointers(uint64_t opCount, void const* inputPtr, void* outputPtr); + + // Launch a clique based kernel + ncclResult_t QueueKernel(uint64_t const opCount, + ncclFunc_t const coll, + size_t const count, + ncclDataType_t const datatype, + ncclRedOp_t const op, + int const root, + hipStream_t const stream); + + ncclResult_t CloseSharedMemory(); + + static ncclResult_t BootstrapRootInit(int pid, unsigned long hash); + +protected: + // Collect the device pointers from all GPUs for specified opCount + ncclResult_t GetCliqueDevicePointers(uint64_t opCount, cliqueDevicePtrs_t& cliquePtrs); + + + ncclResult_t CheckCacheForPtr(void* devPtr, + NcclIpcHandleSendCache* cache, + int rank, + hipIpcMemHandle_t* handle); + + ncclResult_t CheckCacheForHandle(hipIpcMemHandle_t handle, + NcclIpcHandleRecvCache* cache, + void** ptr); + + // Race-condition helper functions + void WaitForBarrier(int opIndex); + + cliqueMode_t m_cliqueMode; + int m_rank; + int m_numRanks; + bool m_init; + int m_nextBarrierValue[NCCL_MAX_OPS]; + uint32_t* m_arrivalCounter; + + // IPC-related (CLIQUE_SINGLE_NODE) + NcclIpcHandleShm m_shmHandles; + NcclIpcHandleSendCache* m_ipcHandleSendCache; + NcclIpcHandleRecvCache* m_ipcHandleRecvCache; + ShmObject m_sharedCounters; // Tracks # of ranks that have finished declaring pointers + ShmObject m_sharedBarrier; // Used to pass fine-grained device memory buffer + int* m_deviceBarriers; // fine-grained barrier + + // Single-process (CLIQUE_SINGLE_PROCESS) + static cliqueDevicePtrs_t m_cliquePtrs[NCCL_MAX_OPS]; + static uint32_t m_staticCounters[NCCL_MAX_OPS]; + static int* m_staticBarriers; +}; + +// For use in bootstrapping code +struct bootstrapRootStruct { + void* listenComm; + unsigned long hash; +}; + +#endif diff --git a/src/clique/CliqueShmNames.h b/src/clique/CliqueShmNames.h new file mode 100644 index 0000000000..577af8be8d --- /dev/null +++ b/src/clique/CliqueShmNames.h @@ -0,0 +1,37 @@ +/* +Copyright (c) 2020 Advanced Micro Devices, Inc. All rights reserved. + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. +*/ + +#ifndef NCCL_CLIQUE_SHM_NAMES_H_ +#define NCCL_CLIQUE_SHM_NAMES_H_ + +#include +#include + +static std::map CliqueShmNames = +{ + {"SharedCounters", "RcclCounters" }, + {"Mutexes" , "RcclMutexes" }, + {"IpcHandles" , "RcclIpcHandles"}, + {"Barriers" , "RcclBarriers" } +}; + +#endif diff --git a/src/clique/HandleCache.cc b/src/clique/HandleCache.cc new file mode 100644 index 0000000000..8fe4b6ab4b --- /dev/null +++ b/src/clique/HandleCache.cc @@ -0,0 +1,31 @@ +/* +Copyright (c) 2020 Advanced Micro Devices, Inc. All rights reserved. + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. +*/ + +#include "HandleCache.h" + +#include "Hash.h" + +// djb2 hash function for hashing char array in hipIpcMemHandle_t +unsigned long hipIpcMemHandleHash(const hipIpcMemHandle_t& handle) +{ + return djb2Hash(handle.reserved); +} diff --git a/src/clique/HandleCache.h b/src/clique/HandleCache.h new file mode 100644 index 0000000000..5652d444b7 --- /dev/null +++ b/src/clique/HandleCache.h @@ -0,0 +1,141 @@ +/* +Copyright (c) 2020 Advanced Micro Devices, Inc. All rights reserved. + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. +*/ + +#ifndef NCCL_HANDLE_CACHE_H_ +#define NCCL_HANDLE_CACHE_H_ + +#include +#include +#include + +#include "core.h" + +//#include "llvm/ADT/DenseMap.h" + +template < + class Key, + class Value, + class Hash, + class KeyEqual, + class Allocator +> +class NcclIpcHandleCache +{ + typedef std::unordered_map::iterator>, Hash, KeyEqual, Allocator> LRUCache; +public: + using iterator = typename LRUCache::iterator; + NcclIpcHandleCache(size_t size, + size_t bucket_count = 100, + const Hash& hash = Hash(), + const KeyEqual& eql = KeyEqual(), + const Allocator& alloc = Allocator() ) : m_cache(bucket_count, hash, eql, alloc) + { + m_capacity = size; + } + + ~NcclIpcHandleCache() + { + m_lruHistory.clear(); + m_cache.clear(); + } + + iterator begin() + { + return m_cache.begin(); + } + + iterator end() + { + return m_cache.end(); + } + + iterator find(const Key& key) + { + iterator it = m_cache.find(key); + if (it != m_cache.end()) + { + updateHistory(key); + } + + return it; + } + + std::pair insert(const Key& key, const Value& value) + { + if (m_cache.size() == m_capacity) + { + // remove entry + pop(); + } + + typename LRUCache::iterator it = m_cache.find(key); + bool inserted; + if (it == m_cache.end()) + { + typename std::list::iterator it = m_lruHistory.insert(m_lruHistory.end(), key); + m_cache.insert(std::make_pair(key, std::make_pair(value, it))); + inserted = true; + } + else + { + inserted = false; + } + + return std::pair(it, inserted); + } + +private: + void pop() + { + typename LRUCache::iterator it = m_cache.find(m_lruHistory.front()); + m_cache.erase(it); + m_lruHistory.pop_front(); + } + + void updateHistory(const Key& key) + { + if (m_lruHistory.size() > 0) + { + m_lruHistory.splice(m_lruHistory.end(), m_lruHistory, m_cache[key].second); + } + } + size_t m_capacity; + std::list m_lruHistory; + LRUCache m_cache; +}; + +// djb2 hash function for hashing char array in hipIpcMemHandle_t +unsigned long hipIpcMemHandleHash(const hipIpcMemHandle_t& handle); + +// equality function required for unordered_map +auto hipIpcMemHandleEqual = [](const hipIpcMemHandle_t& l, const hipIpcMemHandle_t& r) +{ + return memcmp(l.reserved, r.reserved, sizeof(l.reserved)) == 0; +}; + +//typedef llvm::DenseMap SendCache; +//typedef llvm::DenseMap RecvCache; + +typedef NcclIpcHandleCache, std::equal_to, std::allocator< std::pair>> NcclIpcHandleSendCache; +typedef NcclIpcHandleCache>> NcclIpcHandleRecvCache; + +#endif diff --git a/src/clique/HandleShm.cc b/src/clique/HandleShm.cc new file mode 100644 index 0000000000..27f44484ee --- /dev/null +++ b/src/clique/HandleShm.cc @@ -0,0 +1,67 @@ +/* +Copyright (c) 2020 Advanced Micro Devices, Inc. All rights reserved. + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. +*/ + +#include + +#include "HandleShm.h" +#include "CliqueShmNames.h" +#include "core.h" +#include "Hash.h" +#include "shm.h" + +NcclIpcHandleShm::NcclIpcHandleShm(int rank, int numRanks, int projid, int numHandlesPerRank, int capacity, std::string suffix) : + ShmObject(numRanks * numHandlesPerRank * capacity * sizeof(hipIpcMemHandle_t), + CliqueShmNames["IpcHandles"] + suffix, + rank, + numRanks, + projid), + m_numHandlesPerRank(numHandlesPerRank), + m_numHandlesPerOpCount(numRanks * numHandlesPerRank) + { + } + +NcclIpcHandleShm::NcclIpcHandleShm() +{ +} + +NcclIpcHandleShm::~NcclIpcHandleShm() +{ +} + +ncclResult_t NcclIpcHandleShm::Open() +{ + return ShmObject::Open(); +} + +ncclResult_t NcclIpcHandleShm::WriteHandles(uint64_t opCount, std::vector const& sendHandles) +{ + size_t idx = (opCount * m_numHandlesPerOpCount) + (m_rank * m_numHandlesPerRank); + memcpy(m_shmPtr + idx, sendHandles.data(), sizeof(hipIpcMemHandle_t) * m_numHandlesPerRank); + return ncclSuccess; +} + +ncclResult_t NcclIpcHandleShm::ReadHandles(uint64_t opCount, std::vector& recvHandles) +{ + size_t idx = opCount * m_numHandlesPerOpCount; + memcpy(recvHandles.data(), m_shmPtr + idx, m_numHandlesPerOpCount * sizeof(hipIpcMemHandle_t)); + return ncclSuccess; +} diff --git a/src/clique/HandleShm.h b/src/clique/HandleShm.h new file mode 100644 index 0000000000..c5b3d41f1c --- /dev/null +++ b/src/clique/HandleShm.h @@ -0,0 +1,53 @@ +/* +Copyright (c) 2020 Advanced Micro Devices, Inc. All rights reserved. + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. +*/ + +#ifndef NCCL_IPC_HANDLE_SHM_H_ +#define NCCL_IPC_HANDLE_SHM_H_ + +#include +#include +#include + +#include "nccl.h" +#include "ShmObject.h" + +class NcclIpcHandleShm : public ShmObject +{ +public: + NcclIpcHandleShm(int rank, int numRanks, int projid, int numHandlesPerRank, int capacity, std::string suffix); + + NcclIpcHandleShm(); + + ~NcclIpcHandleShm(); + + ncclResult_t Open(); + + ncclResult_t WriteHandles(uint64_t opCount, std::vector const& sendHandles); + + ncclResult_t ReadHandles(uint64_t opCount, std::vector& recvHandles); + +private: + int m_numHandlesPerRank; + int m_numHandlesPerOpCount; +}; + +#endif diff --git a/src/clique/Hash.cc b/src/clique/Hash.cc new file mode 100644 index 0000000000..249c66b329 --- /dev/null +++ b/src/clique/Hash.cc @@ -0,0 +1,34 @@ +/* +Copyright (c) 2020 Advanced Micro Devices, Inc. All rights reserved. + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. +*/ + +#include "Hash.h" + +unsigned long djb2Hash(const char* data) +{ + unsigned long hash = 5381; + int c; + + while ((c = *(data)++)) + hash = ((hash << 5) + hash) + c; /* hash * 33 + c */ + + return hash; +} diff --git a/src/clique/Hash.h b/src/clique/Hash.h new file mode 100644 index 0000000000..e6cbbaa569 --- /dev/null +++ b/src/clique/Hash.h @@ -0,0 +1,28 @@ +/* +Copyright (c) 2020 Advanced Micro Devices, Inc. All rights reserved. + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. +*/ + +#ifndef NCCL_HASH_H_ +#define NCCL_HASH_H_ + +unsigned long djb2Hash(const char* data); + +#endif diff --git a/src/clique/MsgQueue.cc b/src/clique/MsgQueue.cc new file mode 100644 index 0000000000..716c449e8f --- /dev/null +++ b/src/clique/MsgQueue.cc @@ -0,0 +1,72 @@ +/* +Copyright (c) 2020 Advanced Micro Devices, Inc. All rights reserved. + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. +*/ + +#include "MsgQueue.h" + +#include +#include + +#define MSG_QUEUE_PERM 0666 + +ncclResult_t MsgQueueGetId(std::string name, int projid, bool exclusive, int& msgid) +{ + key_t key; + SYSCHECKVAL(ftok(name.c_str(), projid), "ftok", key); + int flag = (exclusive == true ? IPC_CREAT | IPC_EXCL : IPC_CREAT); + + msgid = msgget(key, MSG_QUEUE_PERM | flag); + // Check if we're trying to create message queue and it already exists; if so, delete existing queue + if (msgid == -1 && exclusive == true && errno == EEXIST) + { + NCCLCHECK(MsgQueueClose(name, projid)); + SYSCHECKVAL(msgget(key, MSG_QUEUE_PERM | flag), "msgget", msgid); + } + else if (msgid == -1) + { + WARN("Call to MsgQueueGetId failed : %s", strerror(errno)); + return ncclSystemError; + } + return ncclSuccess; +} + +ncclResult_t MsgQueueSend(int msgid, const void* msgp, size_t msgsz, int msgflg) +{ + SYSCHECK(msgsnd(msgid, msgp, msgsz, msgflg), "msgsnd"); + return ncclSuccess; +} + +ncclResult_t MsgQueueRecv(int msgid, void* msgp, size_t msgsz, long msgtyp, bool wait) +{ + int msgflg = (wait == false ? IPC_NOWAIT : 0); + SYSCHECK(msgrcv(msgid, msgp, msgsz, msgtyp, msgflg), "msgrcv"); + return ncclSuccess; +} + +ncclResult_t MsgQueueClose(std::string name, int projid) +{ + key_t key; + int msgid; + key = ftok(name.c_str(), projid); + SYSCHECKVAL(msgget(key, IPC_CREAT), "msgget", msgid); + SYSCHECK(msgctl(msgid, IPC_RMID, NULL), "msgctl"); + return ncclSuccess; +} diff --git a/src/clique/MsgQueue.h b/src/clique/MsgQueue.h new file mode 100644 index 0000000000..346208a6e8 --- /dev/null +++ b/src/clique/MsgQueue.h @@ -0,0 +1,42 @@ +/* +Copyright (c) 2020 Advanced Micro Devices, Inc. All rights reserved. + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. +*/ + +#ifndef RCCL_MSG_QUEUE_HPP_ +#define RCCL_MSG_QUEUE_HPP_ + +#include + +#include "nccl.h" +#include "core.h" + +struct MsgBuffer +{ + long msg_type; + char msg_text[1]; +}; + +ncclResult_t MsgQueueGetId(std::string name, int projid, bool exclusive, int& msgid); +ncclResult_t MsgQueueSend(int msgid, const void* msgp, size_t msgsz, int msgflg); +ncclResult_t MsgQueueRecv(int msgid, void* msgp, size_t msgsz, long msgtyp, bool wait); +ncclResult_t MsgQueueClose(std::string name, int projid); + +#endif diff --git a/src/clique/SharedMemHelper.h b/src/clique/SharedMemHelper.h new file mode 100644 index 0000000000..6fc269e7fc --- /dev/null +++ b/src/clique/SharedMemHelper.h @@ -0,0 +1,43 @@ +/* +Copyright (c) 2020 Advanced Micro Devices, Inc. All rights reserved. + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. +*/ + +#ifndef SHAREDMEMHELPER_H +#define SHAREDMEMHELPER_H + + +class SharedMemHelper +{ +public: + SharedMemHelper(int rank, int numRanks, int numEntries); + + ncclStatus_t Init(std::string const& baseFilename); + + ncclStatus_t + + +protected: + bool m_initialized; + int m_rank; + int m_numRanks; +}; + +#endif diff --git a/src/clique/ShmObject.cc b/src/clique/ShmObject.cc new file mode 100644 index 0000000000..353779e41d --- /dev/null +++ b/src/clique/ShmObject.cc @@ -0,0 +1,45 @@ +/* +Copyright (c) 2020 Advanced Micro Devices, Inc. All rights reserved. + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. +*/ + +#include "ShmObject.h" +#include + +// Template specializations for sem_t objects which require additional initialization +template<> +ncclResult_t ShmObject::Close() +{ + size_t numMutexes = m_shmSize / sizeof(sem_t); + + for (size_t i = 0; i < numMutexes; i++) + { + sem_destroy(static_cast(&m_shmPtr[i])); + } + + int retVal = shm_unlink(m_shmName.c_str()); + if (retVal == -1 && errno != ENOENT) + { + WARN("Call to shm_unlink in ShmObject failed : %s", strerror(errno)); + return ncclSystemError; + } + + return ncclSuccess; +} diff --git a/src/clique/ShmObject.h b/src/clique/ShmObject.h new file mode 100644 index 0000000000..9262686158 --- /dev/null +++ b/src/clique/ShmObject.h @@ -0,0 +1,203 @@ +/* +Copyright (c) 2020 Advanced Micro Devices, Inc. All rights reserved. + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. +*/ + +#ifndef NCCL_SHM_OBJECT_H_ +#define NCCL_SHM_OBJECT_H_ + +#include +#include +#include +#include +#include +#include + +#include "MsgQueue.h" +#include "nccl.h" +#include "core.h" +#include "shm.h" + +// ShmObject abstracts away the nitty-gritty when multiple processes need to handle opening a shared +// memory object at the same time. + +static ncclResult_t shmSetupExclusive(const char* shmname, const int shmsize, int* fd, void** ptr, int create) { + *fd = shm_open(shmname, O_CREAT | O_RDWR | O_EXCL, S_IRUSR | S_IWUSR); + if (*fd == -1) return ncclSystemError; + if (create) SYSCHECK(shm_allocate(*fd, shmsize), "posix_fallocate"); + SYSCHECK(shm_map(*fd, shmsize, ptr), "mmap"); + close(*fd); + *fd = -1; + if (create) memset(*ptr, 0, shmsize); + return ncclSuccess; +} + +template +class ShmObject +{ +public: +ShmObject(size_t size, std::string fileName, int rank, int numRanks, int projid) : + m_shmSize(size), + m_shmName(fileName), + m_rank(rank), + m_numRanks(numRanks), + m_projid(projid), + m_alloc(false), + m_shmPtr(nullptr) {} + + ShmObject() {} + + ~ShmObject() {} + + ncclResult_t Open(); + + ncclResult_t Close() + { + if (m_alloc) + { + if (m_rank == 0) + { + std::string tmpFileName = "/tmp/" + m_shmName; + remove(tmpFileName.c_str()); + } + int retVal = shm_unlink(m_shmName.c_str()); + if (retVal == -1 && errno != ENOENT) + { + WARN("Call to shm_unlink in ShmObject failed : %s", strerror(errno)); + return ncclSystemError; + } + } + return ncclSuccess; + } + + T*& Get() + { + return m_shmPtr; + } +protected: + ncclResult_t BroadcastMessage(int msgid, bool pass) + { + MsgBuffer msg; + msg.msg_text[0] = (pass == 0 ? 'F': 'P'); + for (int rank = 0; rank < m_numRanks; rank++) + { + if (rank == m_rank) continue; + msg.msg_type = rank; + NCCLCHECK(MsgQueueSend(msgid, &msg, sizeof(msg), 0)); + } + return ncclSuccess; + } + + // tag for dispatch + template + struct OpenTag{}; + + ncclResult_t InitIfSemaphore(OpenTag tag); + ncclResult_t InitIfSemaphore(OpenTag tag); + ncclResult_t InitIfSemaphore(OpenTag tag); + ncclResult_t InitIfSemaphore(OpenTag tag); + + size_t m_shmSize; + std::string m_shmName; + int m_rank; + int m_numRanks; + int m_projid; + bool m_alloc; + T* m_shmPtr; +}; + +template +ncclResult_t ShmObject::Open() +{ + if (m_alloc == false) + { + int shmFd; + int protection = PROT_READ | PROT_WRITE; + int visibility = MAP_SHARED; + + int msgid; + std::string tmpFileName = "/tmp/" + m_shmName; + NCCLCHECK(MsgQueueGetId(tmpFileName, m_projid, false, msgid)); + if (m_rank == 0) + { + ncclResult_t resultSetup = shmSetupExclusive(m_shmName.c_str(), m_shmSize, &shmFd, (void**)&m_shmPtr, 1); + ncclResult_t resultSemInit = InitIfSemaphore(OpenTag{}); + if ((resultSetup != ncclSuccess && errno != EEXIST) || (resultSemInit != ncclSuccess)) + { + NCCLCHECK(BroadcastMessage(msgid, false)); + WARN("Call to ShmObject::Open in root rank failed : %s", strerror(errno)); + return ncclSystemError; + } + NCCLCHECK(BroadcastMessage(msgid, true)); + } + else + { + MsgBuffer msg; + NCCLCHECK(MsgQueueRecv(msgid, &msg, sizeof(msg), m_rank, true)); + if (msg.msg_text[0] == 'P') + { + NCCLCHECK(shmSetup(m_shmName.c_str(), m_shmSize, &shmFd, (void**)&m_shmPtr, 0)); + } + else + { + WARN("Call to shm_open from non-root rank in ShmObject failed : %s", strerror(errno)); + return ncclSystemError; + } + } + m_alloc = true; + } + else + { + WARN("Cannot allocate ShmObject twice.\n"); + return ncclInvalidUsage; + } + return ncclSuccess; +} + +template +ncclResult_t ShmObject::InitIfSemaphore(OpenTag tag) +{ + return ncclSuccess; +} + +template +ncclResult_t ShmObject::InitIfSemaphore(OpenTag tag) +{ + return ncclSuccess; +} + +template +ncclResult_t ShmObject::InitIfSemaphore(OpenTag tag) +{ + return ncclSuccess; +} + +template +ncclResult_t ShmObject::InitIfSemaphore(OpenTag tag) +{ + size_t numMutexes = m_shmSize / sizeof(sem_t); + + for (size_t i = 0; i < numMutexes; i++) + { + SYSCHECK(sem_init(static_cast(&m_shmPtr[i]), 1, 1), "sem_init"); + } + return ncclSuccess; +} +#endif diff --git a/src/collectives/device/common_kernel.h b/src/collectives/device/common_kernel.h index c5092cf52a..efa0b7a1aa 100644 --- a/src/collectives/device/common_kernel.h +++ b/src/collectives/device/common_kernel.h @@ -350,9 +350,14 @@ __device__ int ptrAlign128(T* ptr) { return (uint64_t)ptr % alignof(int32_t); } __device__ int ptrAlign128(T* ptr) { return (uint64_t)ptr % alignof(Pack128); } #endif +#if defined(__HIP_PLATFORM_HCC__) || defined(__HCC__) || defined(__HIPCC__) +// Multiply UNROLL by 2 if single source/single destination +#define AUTOUNROLL (UNROLL*((MINSRCS==1 && MINDSTS==1) ? 2 : 1)) +#else // Try to limit consecutive load/stores to 8. // Use UNROLL 8 when we have a single source and a single destination, 4 otherwise #define AUTOUNROLL (UNROLL*(4/(MINDSTS+MINSRCS))) +#endif template __device__ void ReduceOrCopyMulti(const int tid, const int nthreads, diff --git a/src/enqueue.cc b/src/enqueue.cc index 7f52e089ce..1d745cf490 100644 --- a/src/enqueue.cc +++ b/src/enqueue.cc @@ -617,6 +617,31 @@ end: info->opName, info->comm->opCount, info->sendbuff, info->recvbuff, info->count, info->datatype, info->op, info->root, info->comm, info->comm->nRanks, info->stream); + // [RCCL] Alternative launch path for clique-based kernels (if supported and enabled) + { + if (info->comm->cliqueManager->IsSupported(info->coll, + info->count, + info->datatype, + info->op)) + { + // Declare the input / output pointers being used (to exchange via IPC with other ranks) + NCCLCHECK(info->comm->cliqueManager->DeclarePointers(info->comm->opCount, + info->sendbuff, + info->recvbuff)); + + // Queue the clique-based kernel + NCCLCHECK(info->comm->cliqueManager->QueueKernel(info->comm->opCount, + info->coll, + info->count, + info->datatype, + info->op, + info->root, + info->stream)); + return ncclSuccess; + } + } + // [/RCCL] + NCCLCHECK(ncclSaveKernel(info)); NCCLCHECK(ncclBarrierEnqueue(info->comm)); NCCLCHECK(ncclBarrierEnqueueWait(info->comm)); diff --git a/src/include/bootstrap.h b/src/include/bootstrap.h index a7d6be965e..aa671b7066 100644 --- a/src/include/bootstrap.h +++ b/src/include/bootstrap.h @@ -12,7 +12,7 @@ ncclResult_t bootstrapNetInit(); ncclResult_t bootstrapCreateRoot(ncclUniqueId* commId, bool idFromEnv); ncclResult_t bootstrapGetUniqueId(ncclUniqueId* out); -ncclResult_t bootstrapInit(ncclUniqueId* id, int rank, int nranks, void** commState); +ncclResult_t bootstrapInit(ncclUniqueId* id, int rank, int nranks, void** commState, int* rootPid); // [RCCL] Adding rootPid ncclResult_t bootstrapAllGather(void* commState, void* allData, int size); ncclResult_t bootstrapSend(void* commState, int peer, void* data, int size); ncclResult_t bootstrapRecv(void* commState, int peer, void* data, int size); diff --git a/src/include/comm.h b/src/include/comm.h index 6df4f750d3..d527e83801 100644 --- a/src/include/comm.h +++ b/src/include/comm.h @@ -10,6 +10,9 @@ #include "transport.h" #include "p2p.h" +// [RCCL] +#include "clique/CliqueManager.h" +// [/RCCL] #if defined(__HIP_PLATFORM_HCC__) || defined(__HCC__) || defined(__HIPCC__) #else @@ -143,8 +146,12 @@ struct ncclComm { //list of async p2p operation queued in a group semantics struct ncclP2Plist p2plist; - // RCCL AllToAll/Scatter/Gather API - bool alltoallDisable; + // [RCCL] + bool alltoallDisable; // RCCL AllToAll/Scatter/Gather API + CliqueManager* cliqueManager; // CliqueManager handles pointer collection / distribution for clique-based kernels + int rootPid; // Process ID of root + // [/RCCL] + }; #endif diff --git a/src/init.cc b/src/init.cc index 65cf83342b..3b30c77c70 100644 --- a/src/init.cc +++ b/src/init.cc @@ -28,6 +28,10 @@ #include #include "graph/topo.h" +// [RCCL] +#include "clique/CliqueManager.h" +// [/RCCL] + #define STR2(v) #v #define STR(v) STR2(v) @@ -678,7 +682,10 @@ static ncclResult_t initTransportsRank(struct ncclComm* comm, ncclUniqueId* comm int nranks = comm->nRanks; uint64_t commHash = getHash(commId->internal, NCCL_UNIQUE_ID_BYTES); TRACE(NCCL_INIT, "comm %p, commHash %lx, rank %d nranks %d - BEGIN", comm, commHash, rank, nranks); - NCCLCHECK(bootstrapInit(commId, rank, nranks, &comm->bootstrap)); + // [RCCL] Collect the PID of the root + int rootPid; + NCCLCHECK(bootstrapInit(commId, rank, nranks, &comm->bootstrap, &rootPid)); + // [/RCCL] // AllGather1 - begin struct { @@ -1021,6 +1028,40 @@ affinity_restore: } NCCLCHECK(ncclCommSetIntra(comm, intraRank, intraRanks, allGather1Data[intraRank0].comm)); + { // [RCCL] Check if clique-based kernels can be enabled and initialize CliqueManager if so + CliqueManager::cliqueMode_t cliqueMode = CliqueManager::CLIQUE_DISABLED; + if (comm->localRanks == comm->nRanks) + { + // Check that all the GPUs have peer access to one another + bool hasPeerAccess = true; + for (int i = 0; i < nranks && hasPeerAccess; i++) + { + int cudaDev1 = allGather1Data[i].peerInfo.cudaDev; + for (int j = 0; j < nranks; j++) + { + if (i == j) continue; + int cudaDev2 = allGather1Data[j].peerInfo.cudaDev; + int p2p; + if (hipDeviceCanAccessPeer(&p2p, cudaDev1, cudaDev2) != hipSuccess || !p2p) + { + hasPeerAccess = false; + break; + } + } + } + + if (hasPeerAccess) + { + if (intraRanks == nranks) + cliqueMode = CliqueManager::CLIQUE_SINGLE_PROCESS; + else + cliqueMode = CliqueManager::CLIQUE_SINGLE_NODE; + } + } + comm->cliqueManager = new CliqueManager(rank, nranks, cliqueMode); + NCCLCHECK(comm->cliqueManager->Init(commId, rootPid)); + } // [/RCCL] + // Done with AllGather1 data free(allGather1Data); @@ -1144,6 +1185,10 @@ ncclResult_t ncclCommDestroy(ncclComm_t comm) { return ncclInvalidArgument; } + // [RCCL] Delete CliqueManager if it exists + if (comm->cliqueManager) delete comm->cliqueManager; + // [/RCCL] + return commDestroy(comm); } diff --git a/tools/HelloRccl/HelloRccl.cpp b/tools/HelloRccl/HelloRccl.cpp new file mode 100644 index 0000000000..22b6e463e8 --- /dev/null +++ b/tools/HelloRccl/HelloRccl.cpp @@ -0,0 +1,192 @@ +/* +Copyright (c) 2020 Advanced Micro Devices, Inc. All rights reserved. + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. +*/ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "HelloRccl.hpp" + + +void Usage(char *argv0); + +int main(int argc, char **argv) +{ + // This example program uses NCCL_COMM_ID to perform bootstrapping + // to sidestep the need to communicate the ncclUniqueId (e.g. via MPI) + if (argc < 3 || getenv("NCCL_COMM_ID") == NULL) + { + Usage(argv[0]); + return 1; + } + // Collect command-line arguments + int nranks = atoi(argv[1]); + int rank = atoi(argv[2]); + int deviceId = atoi(argv[3]); + + // Allocate GPU resources + hipStream_t stream; + hipEvent_t startEvent, stopEvent; + HIP_CALL(hipSetDevice(deviceId)); + HIP_CALL(hipStreamCreate(&stream)); + HIP_CALL(hipEventCreate(&startEvent)); + HIP_CALL(hipEventCreate(&stopEvent)); + + // Create communicator + ncclUniqueId commId; + NCCL_CALL(ncclGetUniqueId(&commId)); + + // Initialize communicator + ncclComm_t comm; + NCCL_CALL(ncclCommInitRank(&comm, nranks, commId, rank)); + + // Loop over powers of 2 + int minPow = 10; + int maxPow = 28; + + if (rank == 0) + { + printf("AllReduce Performance (sum of floats):\n"); + printf("%10s %10s %10s\n", "Bytes", "CpuTime(ms)", "GpuTime(ms)"); + } + + for (int power = minPow; power <= maxPow; power++) + { + int N = 1 << power; + + // Allocate GPU memory + float *iputGpu, *oputGpu; + HIP_CALL(hipMalloc((void **)&iputGpu, N * sizeof(float))); + HIP_CALL(hipMalloc((void **)&oputGpu, N * sizeof(float))); + + // Allocate CPU memory + float *iputCpu = (float *)malloc(N * sizeof(float)); + float *oputCpu = (float *)malloc(N * sizeof(float)); + + // Fill CPU with a simple pattern + for (int i = 0; i < N; i++) + { + iputCpu[i] = 1.0f; + oputCpu[i] = 0.0f; + } + + // Copy the input from CPU memory to GPU memory + HIP_CALL(hipMemcpy(iputGpu, iputCpu, N * sizeof(float), hipMemcpyHostToDevice)); + + // Perform some untimed initial warmup iterations + int numWarmups = 3; + for (int iteration = 0; iteration < numWarmups; iteration++) + { + NCCL_CALL(ncclAllReduce(iputGpu, oputGpu, N, ncclFloat, ncclSum, comm, stream)); + } + HIP_CALL(hipStreamSynchronize(stream)); + + // Perform timed iterations + int numIterations = 10; + auto cpuStart = std::chrono::high_resolution_clock::now(); + HIP_CALL(hipEventRecord(startEvent, stream)); + for (int iteration = 0; iteration < numIterations; iteration++) + { + NCCL_CALL(ncclAllReduce(iputGpu, oputGpu, N, ncclFloat, ncclSum, comm, stream)); + } + HIP_CALL(hipEventRecord(stopEvent, stream)); + HIP_CALL(hipStreamSynchronize(stream)); + auto cpuDelta = std::chrono::high_resolution_clock::now() - cpuStart; + double totalCpuTime = std::chrono::duration_cast>(cpuDelta).count(); + float totalGpuTime; + HIP_CALL(hipEventElapsedTime(&totalGpuTime, startEvent, stopEvent)); + + if (rank == 0) printf("%10lu %10.3f %10.3f\n", N * sizeof(float), (totalCpuTime / numIterations), (totalGpuTime / numIterations)); + + + // Validate results + HIP_CALL(hipMemcpy(oputCpu, oputGpu, N * sizeof(float), hipMemcpyDeviceToHost)); + bool isOK = true; + int expected = nranks; + for (int i = 0; i < N; i++) + { + isOK &= (oputCpu[i] == expected); + } + if (!isOK) + { + printf("[ERROR] Rank %d Incorrect results for N = %d\n", rank, N); + exit(1); + } + + // Release GPU resources + HIP_CALL(hipFree(oputGpu)); + HIP_CALL(hipFree(iputGpu)); + + free(iputCpu); + free(oputCpu); + } + + HIP_CALL(hipStreamDestroy(stream)); + HIP_CALL(hipEventDestroy(startEvent)); + HIP_CALL(hipEventDestroy(stopEvent)); + NCCL_CALL(ncclCommDestroy(comm)); + return 0; +} + +void Usage(char *argv0) +{ + printf("Usage: %s numRanks rank deviceId [N=8] [verbose=0]\n", argv0); + printf(" - NCCL_COMM_ID must be set in order to use this\n\n"); + printf(" - To use this process as the root process you may use any of the following:\n"); + + char hostname[256]; + gethostname(hostname, 256); + printf(" export NCCL_COMM_ID=%s:12345\n", hostname); + + // Loop over linked list of interfaces + struct ifaddrs *ifaddr; + getifaddrs(&ifaddr); + for (struct ifaddrs* ifa = ifaddr; ifa != NULL; ifa = ifa->ifa_next) + { + // Skip anything not based on IPv4 / IPv6 + int family = ifa->ifa_addr->sa_family; + if (family != AF_INET && family != AF_INET6) continue; + + // Skip iPv6 loopback interface + if (family == AF_INET6) + { + struct sockaddr_in6* sa = (struct sockaddr_in6*)(ifa->ifa_addr); + if (IN6_IS_ADDR_LOOPBACK(&sa->sin6_addr)) continue; + } + + socklen_t saLen = (family == AF_INET ? sizeof(struct sockaddr_in) : sizeof(struct sockaddr_in6)); + char host[NI_MAXHOST]; + char service[NI_MAXSERV]; + + getnameinfo(ifa->ifa_addr, saLen, host, NI_MAXHOST, service, NI_MAXSERV, + NI_NUMERICHOST|NI_NUMERICSERV); + + std::string result = std::string(host) + ":12345"; + printf(" export NCCL_COMM_ID=%s\n", result.c_str()); + } + freeifaddrs(ifaddr); +} diff --git a/tools/HelloRccl/HelloRccl.hpp b/tools/HelloRccl/HelloRccl.hpp new file mode 100644 index 0000000000..55d24ecd5d --- /dev/null +++ b/tools/HelloRccl/HelloRccl.hpp @@ -0,0 +1,49 @@ +/* +Copyright (c) 2020 Advanced Micro Devices, Inc. All rights reserved. + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. +*/ + +#ifndef HELLORCCL_HPP +#define HELLORCCL_HPP +#include + +#define HIP_CALL(cmd) \ + do { \ + hipError_t error = (cmd); \ + if (error != hipSuccess) \ + { \ + std::cerr << "Encountered HIP error (" << hipGetErrorString(error) << ") at line " \ + << __LINE__ << " in file " << __FILE__ << "\n"; \ + exit(-1); \ + } \ + } while (0) + +#define NCCL_CALL(cmd) \ + do { \ + ncclResult_t error = (cmd); \ + if (error != ncclSuccess) \ + { \ + std::cerr << "Encountered NCCL error (" << ncclGetErrorString(error) << ") at line " \ + << __LINE__ << " in file " << __FILE__ << "\n"; \ + exit(-1); \ + } \ + } while (0) + +#endif diff --git a/tools/HelloRccl/Makefile b/tools/HelloRccl/Makefile new file mode 100644 index 0000000000..8729c17feb --- /dev/null +++ b/tools/HelloRccl/Makefile @@ -0,0 +1,21 @@ +# Copyright (c) 2020 Advanced Micro Devices, Inc. All rights reserved. + +# Set to where RCCL is installed +RCCL_INSTALL=../../build/release + +HIP_PATH?= $(wildcard /opt/rocm/hip) +ifeq (,$(HIP_PATH)) +HIP_PATH=../../.. +endif +HIPCC=$(HIP_PATH)/bin/hipcc + +EXE=HelloRccl +CXXFLAGS = -std=c++11 -O3 -I../../src/include -I$(RCCL_INSTALL) -L$(RCCL_INSTALL) -lrccl + +all: $(EXE) + +$(EXE): $(EXE).cpp $(shell find -regex ".*\.\hpp") + $(HIPCC) $(CXXFLAGS) $< -o $@ + +clean: + rm -f *.o $(EXE) diff --git a/tools/HelloRccl/runTest.sh b/tools/HelloRccl/runTest.sh new file mode 100755 index 0000000000..b9b554c2b1 --- /dev/null +++ b/tools/HelloRccl/runTest.sh @@ -0,0 +1,16 @@ +#!/bin/bash +RCCL_INSTALL=../../build/release +EXE=$PWD/HelloRccl +LDPATH=$LD_LIBRARY_PATH:$RCCL_INSTALL + +echo "With clique-based kernels:" +RCCL_ENABLE_CLIQUE=1 NCCL_COMM_ID=$HOSTNAME:12345 LD_LIBRARY_PATH=$LDPATH $EXE 4 0 0 & +RCCL_ENABLE_CLIQUE=1 NCCL_COMM_ID=$HOSTNAME:12345 LD_LIBRARY_PATH=$LDPATH $EXE 4 1 1 & +RCCL_ENABLE_CLIQUE=1 NCCL_COMM_ID=$HOSTNAME:12345 LD_LIBRARY_PATH=$LDPATH $EXE 4 2 2 & +RCCL_ENABLE_CLIQUE=1 NCCL_COMM_ID=$HOSTNAME:12345 LD_LIBRARY_PATH=$LDPATH $EXE 4 3 3 + +echo "Without clique-based kernels:" +NCCL_COMM_ID=$HOSTNAME:12345 LD_LIBRARY_PATH=$LDPATH $EXE 4 0 0 & +NCCL_COMM_ID=$HOSTNAME:12345 LD_LIBRARY_PATH=$LDPATH $EXE 4 1 1 & +NCCL_COMM_ID=$HOSTNAME:12345 LD_LIBRARY_PATH=$LDPATH $EXE 4 2 2 & +NCCL_COMM_ID=$HOSTNAME:12345 LD_LIBRARY_PATH=$LDPATH $EXE 4 3 3