Clique kernel support (#295)

* Adding experimental clique-based kernels (opt-in only)

Co-authored-by: Stanley Tsang <stanley.tsang@amd.com>
Co-authored-by: Gilbert Lee <gilbert.lee@amd.com>
Co-authored-by: Wenkai Du <43822138+wenkaidu@users.noreply.github.com>

[ROCm/rccl commit: 41bcfb8878]
This commit is contained in:
gilbertlee-amd
2020-11-10 15:44:10 -07:00
zatwierdzone przez GitHub
rodzic 273974393e
commit a7ef699687
35 zmienionych plików z 2204 dodań i 92 usunięć
+9
Wyświetl plik
@@ -3,8 +3,17 @@
Full documentation for RCCL is available at [https://rccl.readthedocs.io](https://rccl.readthedocs.io)
## [Unreleased]
### Added
- Experimental support for clique-based kernels (opt in with RCCL_ENABLE_CLIQUE=1)
- Clique-based kernels may offer better performance for smaller input sizes
- Clique-based kernels are currently only enabled for AllReduce under a certain byte limit (controlled via RCCL_CLIQUE_ALLREDUCE_BYTE_LIMIT)
### Optimizations
- Performance improvements for Rome-based systems
### Known issues
- Clique-based kernels are currently experimental and have not been fully tested on all topologies. By default, clique-based kernels are disabled if the detected topology is not supported (override with RCCL_FORCE_ENABLE_CLIQUE)
- Clique-based kernels may hang if there are differences between environment variables set across ranks.
- Clique-based kernels may fail if the input / output device pointers are not the base device pointers returned by hipMalloc.
## [RCCL-2.7.8 for ROCm 3.9.0]
### Added
+6
Wyświetl plik
@@ -126,6 +126,12 @@ set(CC_SOURCES
src/collectives/all_to_all_api.cc
src/collectives/all_to_allv_api.cc
src/channel.cc
src/clique/CliqueManager.cc # RCCL
src/clique/HandleCache.cc # RCCL
src/clique/HandleShm.cc # RCCL
src/clique/Hash.cc # RCCL
src/clique/MsgQueue.cc # RCCL
src/clique/ShmObject.cc # RCCL
src/misc/argcheck.cc
src/misc/nvmlwrap_stub.cc
src/misc/utils.cc
+33 -3
Wyświetl plik
@@ -12,6 +12,11 @@
#include "socket.h"
#include <unistd.h>
#include <sys/types.h>
// [RCCL]
#include "clique/CliqueManager.h"
#include "clique/CliqueShmNames.h"
#include "clique/Hash.h"
// [/RCCL]
struct bootstrapNetComm {
int fd;
@@ -163,7 +168,14 @@ static ncclResult_t setFilesLimit() {
return ncclSuccess;
}
static void *bootstrapRoot(void* listenComm) {
static void *bootstrapRoot(void* bootstrapRootStruct) { // [RCCL] Modified to include hash argument)
// [RCCL] Unpack bootstrapRootStruct
struct bootstrapRootStruct* rootStruct = (struct bootstrapRootStruct*) bootstrapRootStruct;
void* listenComm = rootStruct->listenComm;
unsigned long hash = rootStruct->hash;
int pid = getpid(); // sharing PID to other ranks for creating shared memory files for CliqueManager
// [/RCCL]
struct extInfo info;
ncclNetHandle_t *rankHandles = NULL;
ncclNetHandle_t *rankHandlesRoot = NULL; // for initial rank <-> root information exchange
@@ -205,12 +217,19 @@ static void *bootstrapRoot(void* listenComm) {
} while (c < nranks);
TRACE(NCCL_INIT, "COLLECTED ALL %d HANDLES", nranks);
{ // [RCCL] Initialize message queues / shared memory files
NCCLCHECKGOTO(CliqueManager::BootstrapRootInit(pid, hash), res, out);
} // [/RCCL]
// Send the connect handle for the next rank in the AllGather ring
for (int r=0; r<nranks; ++r) {
int next = (r+1) % nranks;
void *tmpSendComm;
NCCLCHECKGOTO(bootstrapNetConnect(0, rankHandlesRoot+r, &tmpSendComm), res, out);
NCCLCHECKGOTO(bootstrapNetSend(tmpSendComm, rankHandles+next, sizeof(ncclNetHandle_t)), res, out);
{ // [RCCL] Send the root pid for shared file naming
NCCLCHECKGOTO(bootstrapNetSend(tmpSendComm, &pid, sizeof(int)), res, out);
} // [/RCCL]
NCCLCHECKGOTO(bootstrapNetCloseSend(tmpSendComm), res, out);
}
TRACE(NCCL_INIT, "SENT OUT ALL %d HANDLES", nranks);
@@ -229,7 +248,14 @@ ncclResult_t bootstrapCreateRoot(ncclUniqueId* id, bool idFromEnv) {
void* listenComm;
NCCLCHECK(bootstrapNetListen(idFromEnv ? dontCareIf : 0, netHandle, &listenComm));
pthread_t thread;
pthread_create(&thread, NULL, bootstrapRoot, listenComm);
// [RCCL] Use the ncclUniqueId to get a hash for bootstrap
struct bootstrapRootStruct* rootStruct = new bootstrapRootStruct;
rootStruct->hash = djb2Hash(id->internal);
rootStruct->listenComm = listenComm;
pthread_create(&thread, NULL, bootstrapRoot, (void *)rootStruct);
// [/RCCL]
return ncclSuccess;
}
@@ -267,9 +293,10 @@ struct extState {
int rank;
int nranks;
int dev;
int rootPid; // [RCCL] PID of root
};
ncclResult_t bootstrapInit(ncclUniqueId * id, int rank, int nranks, void** commState) {
ncclResult_t bootstrapInit(ncclUniqueId * id, int rank, int nranks, void** commState, int* rootPid) { // [RCCL] Adding rootPid
ncclNetHandle_t* netHandle = (ncclNetHandle_t*) id;
bool idFromEnv = getenv("NCCL_COMM_ID") != NULL;
struct extState* state;
@@ -314,6 +341,9 @@ ncclResult_t bootstrapInit(ncclUniqueId * id, int rank, int nranks, void** commS
ncclNetHandle_t extHandleNext;
NCCLCHECK(bootstrapNetAccept(extBstrapListenCommRoot, &tmpRecvComm));
NCCLCHECK(bootstrapNetRecv(tmpRecvComm, &extHandleNext, sizeof(extHandleNext)));
{ // [RCCL] Receive PID from root
NCCLCHECK(bootstrapNetRecv(tmpRecvComm, rootPid, sizeof(int)));
} // [/RCCL]
NCCLCHECK(bootstrapNetCloseRecv(tmpRecvComm));
NCCLCHECK(bootstrapNetCloseListen(extBstrapListenCommRoot));
@@ -0,0 +1,75 @@
/*
Copyright (c) 2020 Advanced Micro Devices, Inc. All rights reserved.
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
*/
#ifndef ALLREDUCECLIQUEKERNEL_H
#define ALLREDUCECLIQUEKERNEL_H
#include "CliqueCommon.h"
#include "devcomm.h"
#include "reduce_kernel.h"
#include "common_kernel.h"
template <class FUNC, typename T, int NUM_RANKS>
__device__ void AllReduceCliqueSplitKernel(struct CollectiveArgs* args)
{
// Clique-specific kernel arguments
cliqueDevicePtrs_t* cliquePtrs = args->clique.ptrs; // Collection of all input/output pointers across ranks in clique
size_t const N = args->clique.count; // Total number of elements to reduce
int const nBlocks = args->clique.nChannels; // Total number of blocks assigned to this kernel (may be different than gridDim.x)
int const blockId = args->clique.bid; // 0-indexed blockIdx for this threadblock (may be different than blockIdx.x)
int const rank = args->comm->rank; // Current rank
// Each threadblock works independently of others on a subsection of the input
// First split evently across ranks, while maintaining multiples of blocksize
size_t const perRankN = RoundUp((N + NUM_RANKS - 1) / NUM_RANKS, blockDim.x);
size_t const perBlockN = RoundUp((perRankN + nBlocks - 1) / nBlocks, blockDim.x);
size_t const currBlockStart = min((rank * nBlocks + blockId) * perBlockN, N);
size_t const currBlockStop = min(currBlockStart + perBlockN, N);
size_t const blockN = currBlockStop - currBlockStart;
if (blockN > 0)
{
// Prepare input / output subarrays
T const** inputs = (T const**)cliquePtrs->inputs;
T** outputs = (T **)cliquePtrs->outputs;
T const* srcs[NUM_RANKS];
T* dsts[NUM_RANKS];
#pragma unroll
for (int r = 0; r < NUM_RANKS; r++)
{
srcs[r] = inputs[r] + currBlockStart;
dsts[r] = outputs[r] + currBlockStart;
}
// Perform the reduction
#define ALL_REDUCE_CLIQUE_UNROLL 2
ReduceOrCopyMulti<ALL_REDUCE_CLIQUE_UNROLL, FUNC, T, NUM_RANKS, NUM_RANKS, NUM_RANKS, NUM_RANKS>(
threadIdx.x, blockDim.x, NUM_RANKS, srcs, NUM_RANKS, dsts, blockN);
}
// Even if there was nothing for this GPU to do, it must participate in a barrier
// because other GPUs may be modifying this GPUs output buffer still
if (blockId == 0) WaitForBarrier<NUM_RANKS>(cliquePtrs->barrier);
}
#endif
@@ -0,0 +1,93 @@
/*
Copyright (c) 2020 Advanced Micro Devices, Inc. All rights reserved.
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
*/
#ifndef CLIQUE_COMMON_H
#define CLIQUE_COMMON_H
#include "nccl.h"
#include <cstdint>
#define MIN_CLIQUE_SIZE 2
#define MAX_CLIQUE_SIZE 8
typedef struct
{
int* globalCount; // Shared across GPUs
int* globalSense; // Shared across GPUs
int* localSense; // Local to this GPU
} gpuBarrier_t;
typedef struct
{
// Input/output pointers from participating ranks
void const* inputs[MAX_CLIQUE_SIZE];
void* outputs[MAX_CLIQUE_SIZE];
// Barrier variable
gpuBarrier_t barrier;
} cliqueDevicePtrs_t;
// Helper macro to launch an appropriate kernel by converting rank to a template argument
#define LAUNCH_CLIQUE_KERNEL(kernelname, FUNC, T, args) \
{ \
switch (args->comm->nRanks){ \
case 2: kernelname<FUNC, T, 2>(args); break; \
case 3: kernelname<FUNC, T, 3>(args); break; \
case 4: kernelname<FUNC, T, 4>(args); break; \
case 5: kernelname<FUNC, T, 5>(args); break; \
case 6: kernelname<FUNC, T, 6>(args); break; \
case 7: kernelname<FUNC, T, 7>(args); break; \
case 8: kernelname<FUNC, T, 8>(args); break; \
} \
}
// Multi-GPU (on same node) barrier. One thread per grid per GPU updates barrier / waits
template <int NUM_RANKS>
__forceinline__ __device__ void WaitForBarrier(gpuBarrier_t const& barrier)
{
if (threadIdx.x == 0)
{
// Sense inversion barrier
*barrier.localSense = 1 - *barrier.localSense;
int localSense = *barrier.localSense;
int val = __atomic_add_fetch(barrier.globalCount, 1, __ATOMIC_SEQ_CST);
if (val == NUM_RANKS)
{
// Last arrival resets barrier
__atomic_store_n(barrier.globalCount, 0, __ATOMIC_SEQ_CST);
__atomic_store_n(barrier.globalSense, localSense, __ATOMIC_SEQ_CST);
}
else
{
// Wait for all ranks to reach barrier
while (__atomic_load_n(barrier.globalSense, __ATOMIC_SEQ_CST) != localSense);
}
}
}
__forceinline__ __host__ __device__ size_t RoundUp(size_t X, size_t Y)
{
return (X+Y-1)/Y * Y;
}
#endif
@@ -0,0 +1,519 @@
/*
Copyright (c) 2020 Advanced Micro Devices, Inc. All rights reserved.
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
*/
#include "CliqueManager.h"
#include "CliqueShmNames.h"
#include "MsgQueue.h"
#include "nccl.h"
#include "core.h"
#include "Hash.h"
#include "AllReduceCliqueKernel.h"
#include <hip/hip_runtime.h>
#include <hsa/hsa_ext_amd.h>
#include <stdio.h>
#include <stdlib.h>
#include <chrono>
#include <iomanip>
#include <iostream>
#include <sstream>
#include <thread>
cliqueDevicePtrs_t CliqueManager::m_staticCliquePtrs[NCCL_MAX_OPS] = {};
int* CliqueManager::m_staticGpuBarrierMem = NULL;
// Define some environment variables that affect clique-based kernels
RCCL_PARAM(EnableClique, "ENABLE_CLIQUE", 0); // Opt-in environment variable for clique-based kernels
RCCL_PARAM(AllReduceCliqueByteLimit, "CLIQUE_ALLREDUCE_BYTE_LIMIT", 2097152); // Max number of bytes to use clique-based kernels for all reduce
RCCL_PARAM(AllReduceNumChannels, "CLIQUE_ALLREDUCE_NCHANNELS", 4); // Number of channels to use for all-reduce
CliqueManager::CliqueManager(int const rank,
int const numRanks,
cliqueMode_t const cliqueMode) :
m_rank(rank),
m_numRanks(numRanks),
m_cliqueMode(cliqueMode),
m_init(false),
m_pinnedCliquePtrs(NULL),
m_fineGrainBarrierMem(NULL)
{
}
CliqueManager::~CliqueManager()
{
if (m_init)
{
CleanUp();
}
}
void CliqueManager::CleanUp()
{
if (m_cliqueMode == CLIQUE_DISABLED) return;
// Free variables that are shared between SINGLE_PROCESS / SINGLE_NODE
if (m_pinnedCliquePtrs) hipHostFree(m_pinnedCliquePtrs);
if (m_gpuBarrierLocalSense) hipFree(m_gpuBarrierLocalSense);
if (m_cliqueMode == CLIQUE_SINGLE_NODE)
{
// Release caches
if (m_ipcHandleSendCache) delete m_ipcHandleSendCache;
if (m_ipcHandleSendCache) delete m_ipcHandleRecvCache;
// Close shared memory
m_shmHandles.Close();
m_sharedCpuMemory.Close();
m_sharedIpcHandle.Close();
if (m_fineGrainBarrierMem)
{
if (m_rank == 0)
hipFree(m_fineGrainBarrierMem);
else
hipIpcCloseMemHandle(m_fineGrainBarrierMem);
}
}
else if (m_cliqueMode == CLIQUE_SINGLE_PROCESS)
{
if (m_rank == 0 && m_staticGpuBarrierMem)
hipFree(m_staticGpuBarrierMem);
}
m_init = false;
}
ncclResult_t CliqueManager::Init(ncclUniqueId const* commId, int suffix)
{
ncclResult_t res;
if (m_init) return ncclSuccess;
m_init = true;
if (m_cliqueMode == CLIQUE_DISABLED) return ncclSuccess;
// Check parameters
if (m_rank < 0 || m_rank >= m_numRanks)
{
WARN("Invalid rank specified. Expected 0 <= %d < %d for CliqueManager", m_rank, m_numRanks);
return ncclInvalidUsage;
}
if (commId == NULL)
{
WARN("CommId should not be empty");
return ncclInvalidUsage;
}
// For now, opt-into clique based kernels via RCCL_ENABLE_CLIQUE env var
if (!rcclParamEnableClique())
{
INFO(NCCL_INIT, "Disabling clique-based kernels (did not find env var RCCL_ENABLE_CLIQUE)");
m_cliqueMode = CLIQUE_DISABLED;
return ncclSuccess;
}
// Allocate pinned CPU memory for holding clique pointers, which kernels will have access to
if (hipHostMalloc(&m_pinnedCliquePtrs, sizeof(cliqueDevicePtrs_t) * NCCL_MAX_OPS) != hipSuccess)
{
WARN("Unable to allocated pinned host memory for clique pointers. Disabling clique-based kernels");
m_cliqueMode = CLIQUE_DISABLED;
m_init = true;
return ncclSuccess;
}
unsigned long hash = djb2Hash(commId->internal);
std::string shmSuffix = std::to_string(hash) + "_" + std::to_string(suffix);
// Allocate sense barrier variable on local GPU
NCCLCHECKGOTO(ncclCudaCalloc(&m_gpuBarrierLocalSense, NCCL_MAX_OPS * sizeof(int)), res, dropback);
if (m_cliqueMode == CLIQUE_SINGLE_NODE)
{
// Initialize shared memory file for IPC handles (based on commId hash)
m_shmHandles = NcclIpcHandleShm(m_rank, m_numRanks, hash, NUM_HANDLES_PER_RANK, NCCL_MAX_OPS, shmSuffix);
NCCLCHECKGOTO(m_shmHandles.Open(), res, dropback);
// Initialize IPC caches
m_ipcHandleSendCache = new NcclIpcHandleSendCache(m_numRanks * NUM_HANDLES_PER_RANK * NCCL_MAX_OPS);
m_ipcHandleRecvCache = new NcclIpcHandleRecvCache(m_numRanks * NUM_HANDLES_PER_RANK * NCCL_MAX_OPS,
100,
hipIpcMemHandleHash,
hipIpcMemHandleEqual);
// Initialize shared object for GPU barrier IPC handle
m_sharedIpcHandle = ShmObject<hipIpcMemHandle_t>(std::max(4096LU, sizeof(hipIpcMemHandle_t)),
CliqueShmNames["Barriers"] + shmSuffix,
m_rank,
m_numRanks,
hash);
NCCLCHECKGOTO(m_sharedIpcHandle.Open(), res, dropback);
if (m_rank == 0)
{
hipIpcMemHandle_t handle;
// Allocate fine-grained device memory on rank 0 and get IPC handle for it
// Re-usable barrier consists of (globalCount / globalSense) pair of integers
NCCLCHECKGOTO(ncclCudaCalloc(&m_fineGrainBarrierMem, NCCL_MAX_OPS * 2 * sizeof(int), true), res, dropback);
if (hipIpcGetMemHandle(&handle, m_fineGrainBarrierMem) != hipSuccess)
{
WARN("Unable to get IPC handle for barrier memory");
goto dropback;
}
// Write IPC handle to shared memory for other ranks to receive
*m_sharedIpcHandle.Get() = handle;
// Set up global count/sense for first rank
m_gpuBarrierGlobalCount = &m_fineGrainBarrierMem[0];
m_gpuBarrierGlobalSense = &m_fineGrainBarrierMem[NCCL_MAX_OPS];
}
// Initialize shared CPU memory to be used for barrier variables
m_sharedCpuMemory = ShmObject<int32_t>(2 * sizeof(int32_t),
CliqueShmNames["SharedCounters"] + shmSuffix,
m_rank,
m_numRanks,
hash);
NCCLCHECKGOTO(m_sharedCpuMemory.Open(), res, dropback);
// Split up the shared CPU memory for barrier counters / global sense
m_cpuBarrierGlobalCount = &m_sharedCpuMemory.Get()[0];
m_cpuBarrierGlobalSense = &m_sharedCpuMemory.Get()[1];
// Initialize CPU barriers
if (m_rank == 0)
{
*m_cpuBarrierGlobalCount = 0;
*m_cpuBarrierGlobalSense = 0;
}
m_cpuBarrierLocalSense = 0;
}
else if (m_cliqueMode == CLIQUE_SINGLE_PROCESS)
{
// First rank prepares fine-grained memory shared across ranks used for the two barrier variables
if (m_rank == 0)
{
NCCLCHECKGOTO(ncclCudaCalloc(&m_staticGpuBarrierMem, NCCL_MAX_OPS * 2 * sizeof(int), true), res, dropback);
// Prepare all barriers
for (int opIndex = 0; opIndex < NCCL_MAX_OPS; opIndex++)
{
m_staticCliquePtrs[opIndex].barrier.globalCount = &m_staticGpuBarrierMem[opIndex];
m_staticCliquePtrs[opIndex].barrier.globalSense = &m_staticGpuBarrierMem[opIndex + NCCL_MAX_OPS];;
}
}
}
m_init = true;
INFO(NCCL_INIT, "Clique-based kernels enabled (mode %d)", m_cliqueMode);
return ncclSuccess;
dropback:
// NOTE: This currently assumes that all ranks will fail the same way
// Additional support is required to handle cases when some processes succeed while others fail
WARN("Unable to initialize shared memory. Disabling clique-based kernels");
CleanUp();
m_cliqueMode = CLIQUE_DISABLED;
return ncclSuccess;
}
bool CliqueManager::IsSupported(ncclFunc_t const coll,
size_t const count,
ncclDataType_t const datatype,
ncclRedOp_t const op) const
{
if (m_cliqueMode == CLIQUE_DISABLED) return false;
// Filter based on total input size for each collective type
size_t totalBytes = count * ncclTypeSize(datatype);
if (coll == ncclCollAllReduce && (totalBytes <= rcclParamAllReduceCliqueByteLimit())) return true;
return false;
}
ncclResult_t CliqueManager::DeclarePointers(uint64_t opCount, void const* inputPtr, void* outputPtr)
{
// Do nothing if disabled
if (m_cliqueMode == CLIQUE_DISABLED) return ncclSuccess;
if (!m_init)
{
WARN("CliqueManager must be initialized before use");
return ncclInvalidUsage;
}
int const opIndex = opCount % NCCL_MAX_OPS;
// Add opIndex to queue of in-progress collectives
m_inProgress.push(opIndex);
if (m_cliqueMode == CLIQUE_SINGLE_NODE)
{
// Get fine-grained device memory if not already done
if (m_fineGrainBarrierMem == NULL)
{
hipIpcMemHandle_t handle = *m_sharedIpcHandle.Get();
CUDACHECK(hipIpcOpenMemHandle((void**)&m_fineGrainBarrierMem, handle, hipIpcMemLazyEnablePeerAccess));
// Prepare global count/sense barrier variables used the ipc-shared gpu device memory
m_gpuBarrierGlobalCount = &m_fineGrainBarrierMem[0];
m_gpuBarrierGlobalSense = &m_fineGrainBarrierMem[NCCL_MAX_OPS];
}
std::vector<std::pair<hipIpcMemHandle_t,size_t>> handles(NUM_HANDLES_PER_RANK);
// Get IPC handles for input/output pointers from cache
NCCLCHECK(CheckCacheForPtr(const_cast<void*>(inputPtr), m_ipcHandleSendCache, m_rank, &handles[0]));
NCCLCHECK(CheckCacheForPtr(outputPtr , m_ipcHandleSendCache, m_rank, &handles[1]));
// Prepare barrier pointers (done after the IpcOpenMemory)
m_pinnedCliquePtrs[opIndex].barrier.globalCount = &m_gpuBarrierGlobalCount[opIndex];
m_pinnedCliquePtrs[opIndex].barrier.globalSense = &m_gpuBarrierGlobalSense[opIndex];
m_pinnedCliquePtrs[opIndex].barrier.localSense = &m_gpuBarrierLocalSense[opIndex];
// Write IPC handles to shared memory for given rank / opCount
NCCLCHECK(m_shmHandles.WriteHandles(opIndex, handles));
}
else if (m_cliqueMode == CLIQUE_SINGLE_PROCESS)
{
// Store this rank's input/output pointers into static member
m_staticCliquePtrs[opIndex].inputs[m_rank] = inputPtr;
m_staticCliquePtrs[opIndex].outputs[m_rank] = outputPtr;
}
return ncclSuccess;
}
ncclResult_t CliqueManager::GetNumChannelsToUse(ncclFunc_t const coll,
size_t const count,
ncclDataType_t const datatype,
ncclRedOp_t const op,
int const totalNumChannels,
uint8_t* numChannelstoUse)
{
size_t const totalBytes = count * ncclTypeSize(datatype);
*numChannelstoUse = 1;
if (coll == ncclCollAllReduce) {
*numChannelstoUse = std::min((int)rcclParamAllReduceNumChannels(), totalNumChannels);
}
return ncclSuccess;
}
ncclResult_t CliqueManager::SetCliqueCollectiveArgs(CollectiveArgs* args)
{
// Do nothing if disabled
if (m_cliqueMode == CLIQUE_DISABLED) return ncclSuccess;
if (!m_init)
{
WARN("CliqueManager must be initialized before use");
return ncclInvalidUsage;
}
// Prepare clique argments (NOTE: clique pointers are not ready yet)
int opIndex = args->opCount % NCCL_MAX_OPS;
args->clique.ptrs = &m_pinnedCliquePtrs[opIndex];
// Determine number of channels to use for this collective
args->clique.nChannels = rcclParamAllReduceNumChannels();
return ncclSuccess;
}
ncclResult_t CliqueManager::WaitForPointers()
{
// Do nothing if disabled
if (m_cliqueMode == CLIQUE_DISABLED) return ncclSuccess;
if (!m_init)
{
WARN("CliqueManager must be initialized before use");
return ncclInvalidUsage;
}
// Do nothing if there are no outstanding clique-kernels
if (m_inProgress.empty()) return ncclSuccess;
// Copy clique device pointers to pinned device memory
if (m_cliqueMode == CLIQUE_SINGLE_NODE)
{
// Wait for all ranks to arrive
WaitForBarrier();
int numHandles = m_numRanks * NUM_HANDLES_PER_RANK;
std::vector<std::pair<hipIpcMemHandle_t,size_t>> handles(numHandles);
while (!m_inProgress.empty())
{
int const opIndex = m_inProgress.front();
m_inProgress.pop();
// Collect the ready handles from shared memory and convert them to device pointers
NCCLCHECK(m_shmHandles.ReadHandles(opIndex, handles));
for (int i = 0; i < m_numRanks; i++)
{
void *input;
NCCLCHECK(CheckCacheForHandle(handles[i * NUM_HANDLES_PER_RANK],
m_ipcHandleRecvCache, &input));
m_pinnedCliquePtrs[opIndex].inputs[i] = const_cast<const void *>(input);
NCCLCHECK(CheckCacheForHandle(handles[(i * NUM_HANDLES_PER_RANK) + 1],
m_ipcHandleRecvCache, &m_pinnedCliquePtrs[opIndex].outputs[i]));
}
}
}
else if (m_cliqueMode == CLIQUE_SINGLE_PROCESS)
{
while (!m_inProgress.empty())
{
int const opIndex = m_inProgress.front();
m_inProgress.pop();
// Copy from static memory to pinned host memory and set local sense
memcpy(&m_pinnedCliquePtrs[opIndex], &m_staticCliquePtrs[opIndex], sizeof(cliqueDevicePtrs_t));
m_pinnedCliquePtrs[opIndex].barrier.localSense = &m_gpuBarrierLocalSense[opIndex];
}
}
return ncclSuccess;
}
std::string HandleToString(hipIpcMemHandle_t handle)
{
char mapping[17] = "0123456789ABCDEF";
std::string result;
for (int i = 0; i < 4; i++)
{
unsigned char val = (unsigned char)handle.reserved[i];
result += mapping[val / 16];
result += mapping[val % 16];
}
return result;
}
ncclResult_t CliqueManager::CheckCacheForPtr(void* devPtr,
NcclIpcHandleSendCache* cache,
int rank,
std::pair<hipIpcMemHandle_t, size_t>* handlePair)
{
// Get the base address for this device allocation
hsa_status_t status;
hsa_amd_pointer_info_t info;
info.size = sizeof(hsa_amd_pointer_info_t);
status = hsa_amd_pointer_info(devPtr, &info, NULL, NULL, NULL);
if (status != HSA_STATUS_SUCCESS) {
WARN("Uanble to get pointer information for %p", devPtr);
return ncclInvalidArgument;
}
// Compute the offset between the device addres and the base address
uint64_t baseAddr = (uint64_t)info.agentBaseAddress;
uint64_t realAddr = (uint64_t)devPtr;
handlePair->second = realAddr - baseAddr;
// IPC handles are only supported for base address pointers
NcclIpcHandleSendCache::iterator it = cache->find(baseAddr);
if (it == cache->end())
{
CUDACHECK(hipIpcGetMemHandle(&handlePair->first, (void*)baseAddr));
cache->insert(baseAddr, handlePair->first);
}
else
{
handlePair->first = (it->second).first;
}
return ncclSuccess;
}
ncclResult_t CliqueManager::CheckCacheForHandle(std::pair<hipIpcMemHandle_t, size_t> const& handlePair,
NcclIpcHandleRecvCache* cache,
void** ptr)
{
NcclIpcHandleRecvCache::iterator it = cache->find(handlePair.first);
// Get base address pointer from cache if it exists
void* baseAddr;
if (it == cache->end())
{
CUDACHECK(hipIpcOpenMemHandle(&baseAddr, handlePair.first, hipIpcMemLazyEnablePeerAccess));
cache->insert(handlePair.first, baseAddr);
}
else
{
baseAddr = (it->second).first;
}
// Modify base address pointer with offset
uint64_t realAddr = (uint64_t)baseAddr + handlePair.second;
*ptr = (void*)realAddr;
return ncclSuccess;
}
void CliqueManager::WaitForBarrier()
{
// Sense inversion barrier
m_cpuBarrierLocalSense = 1 - m_cpuBarrierLocalSense;
if (__sync_add_and_fetch(m_cpuBarrierGlobalCount, 1) == m_numRanks)
{
// Reset the barrier
STORE(m_cpuBarrierGlobalCount, 0);
STORE(m_cpuBarrierGlobalSense, m_cpuBarrierLocalSense);
} else {
while (LOAD(m_cpuBarrierGlobalSense) != m_cpuBarrierLocalSense);
}
}
ncclResult_t CliqueManager::BootstrapRootInit(int pid, unsigned long hash)
{
for (auto it = CliqueShmNames.begin(); it != CliqueShmNames.end(); it++)
{
int msgid, fd;
std::string msgQueueName = "/tmp/" + it->second + std::to_string(hash) + "_" + std::to_string(pid);
SYSCHECKVAL(open(msgQueueName.c_str(), O_CREAT | O_RDWR, 0606), "open", fd);
NCCLCHECK(MsgQueueGetId(msgQueueName, hash, true, msgid));
SYSCHECK(close(fd), "close");
}
std::string shmDir = "/dev/shm/";
for (auto it = CliqueShmNames.begin(); it != CliqueShmNames.end(); it++)
{
struct stat fileStatus;
std::string shmFileName = it->second + std::to_string(hash) + "_" + std::to_string(pid);
std::string shmFullPath = shmDir + shmFileName;
// Check if shm file already exists; if so, unlink it
if (stat(shmFullPath.c_str(), &fileStatus) == 0)
{
NCCLCHECK(shmUnlink(shmFileName.c_str()));
}
}
return ncclSuccess;
}
@@ -0,0 +1,128 @@
/*
Copyright (c) 2020 Advanced Micro Devices, Inc. All rights reserved.
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
*/
#ifndef RCCL_CLIQUE_MANAGER_HPP_
#define RCCL_CLIQUE_MANAGER_HPP_
#include <semaphore.h>
#include <mutex>
#include <queue>
#include "nccl.h"
#include "devcomm.h"
#include "CliqueCommon.h"
#include "HandleCache.h"
#include "HandleShm.h"
#define NUM_HANDLES_PER_RANK 2
class CliqueManager
{
public:
typedef enum
{
CLIQUE_DISABLED = 0,
CLIQUE_SINGLE_PROCESS = 1,
CLIQUE_SINGLE_NODE = 2
} cliqueMode_t;
CliqueManager(int const rank, int const numRanks, cliqueMode_t const cliqueMode);
~CliqueManager();
void CleanUp();
ncclResult_t Init(ncclUniqueId const* commId, int suffix);
// Returns true if the collective is supported via a clique-based kernel
bool IsSupported(ncclFunc_t const coll,
size_t const count,
ncclDataType_t const datatype,
ncclRedOp_t const op) const;
// Provide the pointers to be exchanged across the clique for the given rank / opCount
ncclResult_t DeclarePointers(uint64_t opCount, void const* inputPtr, void* outputPtr);
// Determine the number of channels / CUs to use for this call
ncclResult_t GetNumChannelsToUse(ncclFunc_t const coll,
size_t const count,
ncclDataType_t const datatype,
ncclRedOp_t const op,
int const totalNumChannels,
uint8_t* numChannelstoUse);
// Set pointers for where clique-related arguments will be found
// This sets pointers to device-accessible memory where the arguments will eventually reside
ncclResult_t SetCliqueCollectiveArgs(CollectiveArgs* args);
// Blocking call that only returns after all out-standing clique pointers are ready
ncclResult_t WaitForPointers();
// Prepares shared memory files upon initialization
static ncclResult_t BootstrapRootInit(int pid, unsigned long hash);
protected:
ncclResult_t CheckCacheForPtr(void* devPtr,
NcclIpcHandleSendCache* cache,
int rank,
std::pair<hipIpcMemHandle_t, size_t>* handlePair);
ncclResult_t CheckCacheForHandle(std::pair<hipIpcMemHandle_t, size_t> const& handlePair,
NcclIpcHandleRecvCache* cache,
void** ptr);
// Race-condition helper functions
void WaitForBarrier();
int m_rank; // Associated rank
int m_numRanks; // Total number of ranks
cliqueMode_t m_cliqueMode; // Clique mode (off/single process/single node)
bool m_init; // Whether CliqueManager has been initialized
cliqueDevicePtrs_t* m_pinnedCliquePtrs; // Pinned-host-memory (device accessible) containing device pointers
int* m_gpuBarrierGlobalCount; // Part of GPU barrier (count variable shared across ranks)
int* m_gpuBarrierGlobalSense; // Part of GPU barrier (reset variable shared across ranks)
int* m_gpuBarrierLocalSense; // Part of GPU barrier (reset variable local to this rank)
std::queue<int> m_inProgress; // Queue of clique-based collectives waiting for pointers
// IPC-related (CLIQUE_SINGLE_NODE)
NcclIpcHandleShm m_shmHandles; // Used to exchange IPC handles between ranks
NcclIpcHandleSendCache* m_ipcHandleSendCache; // Caches pointers to IPC handles (to send to other processes)
NcclIpcHandleRecvCache* m_ipcHandleRecvCache; // Caches IPC handles to pointers (received from other processes)
ShmObject<int32_t> m_sharedCpuMemory; // Used to pass shared memory used for CPU barrier
ShmObject<hipIpcMemHandle_t> m_sharedIpcHandle; // Used to pass fine-grained device memory buffer IPC handle
int* m_fineGrainBarrierMem; // Fine-grained GPU memory barrier (allocated only on 1st rank, shared on others)
int* m_cpuBarrierGlobalCount; // Part of CPU barrier (count variable shared across ranks)
int* m_cpuBarrierGlobalSense; // Part of CPU barrier (reset variable shared across ranks)
int m_cpuBarrierLocalSense; // Part of CPU barrier (reset variable local to this rank)
// Single-process (CLIQUE_SINGLE_PROCESS)
static cliqueDevicePtrs_t m_staticCliquePtrs[NCCL_MAX_OPS]; // Use shared static memory to exchange pointer info
static int* m_staticGpuBarrierMem; // Static storage backing for fine-grained gpu barrier
};
// For use in bootstrapping code
struct bootstrapRootStruct {
void* listenComm;
unsigned long hash;
};
#endif
@@ -0,0 +1,37 @@
/*
Copyright (c) 2020 Advanced Micro Devices, Inc. All rights reserved.
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
*/
#ifndef NCCL_CLIQUE_SHM_NAMES_H_
#define NCCL_CLIQUE_SHM_NAMES_H_
#include <string>
#include <map>
static std::map<std::string, std::string> CliqueShmNames =
{
{"SharedCounters", "RcclCounters" },
{"Mutexes" , "RcclMutexes" },
{"IpcHandles" , "RcclIpcHandles"},
{"Barriers" , "RcclBarriers" }
};
#endif
@@ -0,0 +1,31 @@
/*
Copyright (c) 2020 Advanced Micro Devices, Inc. All rights reserved.
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
*/
#include "HandleCache.h"
#include "Hash.h"
// djb2 hash function for hashing char array in hipIpcMemHandle_t
unsigned long hipIpcMemHandleHash(const hipIpcMemHandle_t& handle)
{
return djb2Hash(handle.reserved);
}
+142
Wyświetl plik
@@ -0,0 +1,142 @@
/*
Copyright (c) 2020 Advanced Micro Devices, Inc. All rights reserved.
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
*/
#ifndef NCCL_HANDLE_CACHE_H_
#define NCCL_HANDLE_CACHE_H_
#include <list>
#include <unordered_map>
#include <functional>
#include "core.h"
//#include "llvm/ADT/DenseMap.h"
template <
class Key,
class Value,
class Hash,
class KeyEqual,
class Allocator
>
class NcclIpcHandleCache
{
public:
typedef std::pair<Value, typename std::list<Key>::iterator> NcclIpcHandleCacheValueType;
typedef std::unordered_map<Key, NcclIpcHandleCacheValueType, Hash, KeyEqual, Allocator> LRUCache;
using iterator = typename LRUCache::iterator;
NcclIpcHandleCache(size_t size,
size_t bucket_count = 100,
const Hash& hash = Hash(),
const KeyEqual& eql = KeyEqual(),
const Allocator& alloc = Allocator() ) : m_cache(bucket_count, hash, eql, alloc)
{
m_capacity = size;
}
~NcclIpcHandleCache()
{
m_lruHistory.clear();
m_cache.clear();
}
iterator begin()
{
return m_cache.begin();
}
iterator end()
{
return m_cache.end();
}
iterator find(const Key& key)
{
iterator it = m_cache.find(key);
if (it != m_cache.end())
{
updateHistory(it);
}
return it;
}
std::pair<iterator, bool> insert(const Key& key, const Value& value)
{
if (m_cache.size() == m_capacity)
{
// remove entry
pop();
}
typename LRUCache::iterator it = m_cache.find(key);
bool inserted;
if (it == m_cache.end())
{
typename std::list<Key>::iterator it = m_lruHistory.insert(m_lruHistory.end(), key);
m_cache.insert(std::make_pair(key, std::make_pair(value, it)));
inserted = true;
}
else
{
inserted = false;
}
return std::pair<iterator, bool>(it, inserted);
}
private:
void pop()
{
typename LRUCache::iterator it = m_cache.find(m_lruHistory.front());
m_cache.erase(it);
m_lruHistory.pop_front();
}
void updateHistory(const iterator& it)
{
if (m_lruHistory.size() > 0)
{
m_lruHistory.splice(m_lruHistory.end(), m_lruHistory, (it->second).second);
}
}
size_t m_capacity;
std::list<Key> m_lruHistory;
LRUCache m_cache;
};
// djb2 hash function for hashing char array in hipIpcMemHandle_t
unsigned long hipIpcMemHandleHash(const hipIpcMemHandle_t& handle);
// equality function required for unordered_map
auto hipIpcMemHandleEqual = [](const hipIpcMemHandle_t& l, const hipIpcMemHandle_t& r)
{
return memcmp(l.reserved, r.reserved, sizeof(l.reserved)) == 0;
};
//typedef llvm::DenseMap<uint64_t, hipIpcMemHandle_t> SendCache;
//typedef llvm::DenseMap<hipIpcMemHandle_t, void*, decltype(&HandleHash), decltype(HandleEqual)> RecvCache;
typedef NcclIpcHandleCache<uint64_t, hipIpcMemHandle_t, std::hash<uint64_t>, std::equal_to<uint64_t>, std::allocator< std::pair<const uint64_t, std::pair<hipIpcMemHandle_t, std::list<uint64_t>::iterator>>>> NcclIpcHandleSendCache;
typedef NcclIpcHandleCache<hipIpcMemHandle_t, void*, decltype(&hipIpcMemHandleHash), decltype(hipIpcMemHandleEqual), std::allocator< std::pair<const hipIpcMemHandle_t, std::pair<void*, std::list<hipIpcMemHandle_t>::iterator>>>> NcclIpcHandleRecvCache;
#endif
@@ -0,0 +1,67 @@
/*
Copyright (c) 2020 Advanced Micro Devices, Inc. All rights reserved.
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
*/
#include <hip/hip_runtime.h>
#include "HandleShm.h"
#include "CliqueShmNames.h"
#include "core.h"
#include "Hash.h"
#include "shm.h"
NcclIpcHandleShm::NcclIpcHandleShm(int rank, int numRanks, int projid, int numHandlesPerRank, int capacity, std::string suffix) :
ShmObject<std::pair<hipIpcMemHandle_t,size_t>>(numRanks * numHandlesPerRank * capacity * sizeof(std::pair<hipIpcMemHandle_t,size_t>),
CliqueShmNames["IpcHandles"] + suffix,
rank,
numRanks,
projid),
m_numHandlesPerRank(numHandlesPerRank),
m_numHandlesPerOpCount(numRanks * numHandlesPerRank)
{
}
NcclIpcHandleShm::NcclIpcHandleShm()
{
}
NcclIpcHandleShm::~NcclIpcHandleShm()
{
}
ncclResult_t NcclIpcHandleShm::Open()
{
return ShmObject::Open();
}
ncclResult_t NcclIpcHandleShm::WriteHandles(uint64_t opCount, std::vector<std::pair<hipIpcMemHandle_t,size_t>> const& sendHandles)
{
size_t idx = (opCount * m_numHandlesPerOpCount) + (m_rank * m_numHandlesPerRank);
memcpy(m_shmPtr + idx, sendHandles.data(), sizeof(std::pair<hipIpcMemHandle_t,size_t>) * m_numHandlesPerRank);
return ncclSuccess;
}
ncclResult_t NcclIpcHandleShm::ReadHandles(uint64_t opCount, std::vector<std::pair<hipIpcMemHandle_t,size_t>>& recvHandles)
{
size_t idx = opCount * m_numHandlesPerOpCount;
memcpy(recvHandles.data(), m_shmPtr + idx, m_numHandlesPerOpCount * sizeof(std::pair<hipIpcMemHandle_t,ssize_t>));
return ncclSuccess;
}
+53
Wyświetl plik
@@ -0,0 +1,53 @@
/*
Copyright (c) 2020 Advanced Micro Devices, Inc. All rights reserved.
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
*/
#ifndef NCCL_IPC_HANDLE_SHM_H_
#define NCCL_IPC_HANDLE_SHM_H_
#include <hip/hip_runtime.h>
#include <vector>
#include <string>
#include "nccl.h"
#include "ShmObject.h"
class NcclIpcHandleShm : public ShmObject<std::pair<hipIpcMemHandle_t,size_t>>
{
public:
NcclIpcHandleShm(int rank, int numRanks, int projid, int numHandlesPerRank, int capacity, std::string suffix);
NcclIpcHandleShm();
~NcclIpcHandleShm();
ncclResult_t Open();
ncclResult_t WriteHandles(uint64_t opCount, std::vector<std::pair<hipIpcMemHandle_t,size_t>> const& sendHandles);
ncclResult_t ReadHandles(uint64_t opCount, std::vector<std::pair<hipIpcMemHandle_t,size_t>>& recvHandles);
private:
int m_numHandlesPerRank;
int m_numHandlesPerOpCount;
};
#endif
+34
Wyświetl plik
@@ -0,0 +1,34 @@
/*
Copyright (c) 2020 Advanced Micro Devices, Inc. All rights reserved.
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
*/
#include "Hash.h"
unsigned long djb2Hash(const char* data)
{
unsigned long hash = 5381;
int c;
while ((c = *(data)++))
hash = ((hash << 5) + hash) + c; /* hash * 33 + c */
return hash;
}
+28
Wyświetl plik
@@ -0,0 +1,28 @@
/*
Copyright (c) 2020 Advanced Micro Devices, Inc. All rights reserved.
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
*/
#ifndef NCCL_HASH_H_
#define NCCL_HASH_H_
unsigned long djb2Hash(const char* data);
#endif
+72
Wyświetl plik
@@ -0,0 +1,72 @@
/*
Copyright (c) 2020 Advanced Micro Devices, Inc. All rights reserved.
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
*/
#include "MsgQueue.h"
#include <sys/ipc.h>
#include <sys/msg.h>
#define MSG_QUEUE_PERM 0666
ncclResult_t MsgQueueGetId(std::string name, int projid, bool exclusive, int& msgid)
{
key_t key;
SYSCHECKVAL(ftok(name.c_str(), projid), "ftok", key);
int flag = (exclusive == true ? IPC_CREAT | IPC_EXCL : IPC_CREAT);
msgid = msgget(key, MSG_QUEUE_PERM | flag);
// Check if we're trying to create message queue and it already exists; if so, delete existing queue
if (msgid == -1 && exclusive == true && errno == EEXIST)
{
NCCLCHECK(MsgQueueClose(name, projid));
SYSCHECKVAL(msgget(key, MSG_QUEUE_PERM | flag), "msgget", msgid);
}
else if (msgid == -1)
{
WARN("Call to MsgQueueGetId failed : %s", strerror(errno));
return ncclSystemError;
}
return ncclSuccess;
}
ncclResult_t MsgQueueSend(int msgid, const void* msgp, size_t msgsz, int msgflg)
{
SYSCHECK(msgsnd(msgid, msgp, msgsz, msgflg), "msgsnd");
return ncclSuccess;
}
ncclResult_t MsgQueueRecv(int msgid, void* msgp, size_t msgsz, long msgtyp, bool wait)
{
int msgflg = (wait == false ? IPC_NOWAIT : 0);
SYSCHECK(msgrcv(msgid, msgp, msgsz, msgtyp, msgflg), "msgrcv");
return ncclSuccess;
}
ncclResult_t MsgQueueClose(std::string name, int projid)
{
key_t key;
int msgid;
key = ftok(name.c_str(), projid);
SYSCHECKVAL(msgget(key, IPC_CREAT), "msgget", msgid);
SYSCHECK(msgctl(msgid, IPC_RMID, NULL), "msgctl");
return ncclSuccess;
}
+42
Wyświetl plik
@@ -0,0 +1,42 @@
/*
Copyright (c) 2020 Advanced Micro Devices, Inc. All rights reserved.
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
*/
#ifndef RCCL_MSG_QUEUE_HPP_
#define RCCL_MSG_QUEUE_HPP_
#include <string>
#include "nccl.h"
#include "core.h"
struct MsgBuffer
{
long msg_type;
char msg_text[1];
};
ncclResult_t MsgQueueGetId(std::string name, int projid, bool exclusive, int& msgid);
ncclResult_t MsgQueueSend(int msgid, const void* msgp, size_t msgsz, int msgflg);
ncclResult_t MsgQueueRecv(int msgid, void* msgp, size_t msgsz, long msgtyp, bool wait);
ncclResult_t MsgQueueClose(std::string name, int projid);
#endif
@@ -0,0 +1,43 @@
/*
Copyright (c) 2020 Advanced Micro Devices, Inc. All rights reserved.
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
*/
#ifndef SHAREDMEMHELPER_H
#define SHAREDMEMHELPER_H
class SharedMemHelper
{
public:
SharedMemHelper(int rank, int numRanks, int numEntries);
ncclStatus_t Init(std::string const& baseFilename);
ncclStatus_t
protected:
bool m_initialized;
int m_rank;
int m_numRanks;
};
#endif
@@ -0,0 +1,45 @@
/*
Copyright (c) 2020 Advanced Micro Devices, Inc. All rights reserved.
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
*/
#include "ShmObject.h"
#include <string>
// Template specializations for sem_t objects which require additional initialization
template<>
ncclResult_t ShmObject<sem_t>::Close()
{
size_t numMutexes = m_shmSize / sizeof(sem_t);
for (size_t i = 0; i < numMutexes; i++)
{
sem_destroy(static_cast<sem_t*>(&m_shmPtr[i]));
}
int retVal = shm_unlink(m_shmName.c_str());
if (retVal == -1 && errno != ENOENT)
{
WARN("Call to shm_unlink in ShmObject failed : %s", strerror(errno));
return ncclSystemError;
}
return ncclSuccess;
}
+210
Wyświetl plik
@@ -0,0 +1,210 @@
/*
Copyright (c) 2020 Advanced Micro Devices, Inc. All rights reserved.
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
*/
#ifndef NCCL_SHM_OBJECT_H_
#define NCCL_SHM_OBJECT_H_
#include <string>
#include <sys/mman.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <type_traits>
#include <semaphore.h>
#include "MsgQueue.h"
#include "nccl.h"
#include "core.h"
#include "shm.h"
// ShmObject abstracts away the nitty-gritty when multiple processes need to handle opening a shared
// memory object at the same time.
static ncclResult_t shmSetupExclusive(const char* shmname, const int shmsize, int* fd, void** ptr, int create) {
*fd = shm_open(shmname, O_CREAT | O_RDWR | O_EXCL, S_IRUSR | S_IWUSR);
if (*fd == -1) return ncclSystemError;
if (create) SYSCHECK(shm_allocate(*fd, shmsize), "posix_fallocate");
SYSCHECK(shm_map(*fd, shmsize, ptr), "mmap");
close(*fd);
*fd = -1;
if (create) memset(*ptr, 0, shmsize);
return ncclSuccess;
}
template <typename T>
class ShmObject
{
public:
ShmObject(size_t size, std::string fileName, int rank, int numRanks, int projid) :
m_shmSize(size),
m_shmName(fileName),
m_rank(rank),
m_numRanks(numRanks),
m_projid(projid),
m_alloc(false),
m_shmPtr(nullptr) {}
ShmObject() {}
~ShmObject() {}
ncclResult_t Open();
ncclResult_t Close()
{
if (m_alloc)
{
if (m_rank == 0)
{
std::string tmpFileName = "/tmp/" + m_shmName;
remove(tmpFileName.c_str());
}
int retVal = shm_unlink(m_shmName.c_str());
if (retVal == -1 && errno != ENOENT)
{
WARN("Call to shm_unlink in ShmObject failed : %s", strerror(errno));
return ncclSystemError;
}
}
return ncclSuccess;
}
T*& Get()
{
return m_shmPtr;
}
protected:
ncclResult_t BroadcastMessage(int msgid, bool pass)
{
MsgBuffer msg;
msg.msg_text[0] = (pass == 0 ? 'F': 'P');
for (int rank = 0; rank < m_numRanks; rank++)
{
if (rank == m_rank) continue;
msg.msg_type = rank;
NCCLCHECK(MsgQueueSend(msgid, &msg, sizeof(msg), 0));
}
return ncclSuccess;
}
// tag for dispatch
template<class U>
struct OpenTag{};
ncclResult_t InitIfSemaphore(OpenTag<int> tag);
ncclResult_t InitIfSemaphore(OpenTag<uint32_t> tag);
ncclResult_t InitIfSemaphore(OpenTag<hipIpcMemHandle_t> tag);
ncclResult_t InitIfSemaphore(OpenTag<sem_t> tag);
ncclResult_t InitIfSemaphore(OpenTag<std::pair<hipIpcMemHandle_t,size_t>> tag);
size_t m_shmSize;
std::string m_shmName;
int m_rank;
int m_numRanks;
int m_projid;
bool m_alloc;
T* m_shmPtr;
};
template <typename T>
ncclResult_t ShmObject<T>::Open()
{
if (m_alloc == false)
{
int shmFd;
int protection = PROT_READ | PROT_WRITE;
int visibility = MAP_SHARED;
int msgid;
std::string tmpFileName = "/tmp/" + m_shmName;
NCCLCHECK(MsgQueueGetId(tmpFileName, m_projid, false, msgid));
if (m_rank == 0)
{
ncclResult_t resultSetup = shmSetupExclusive(m_shmName.c_str(), m_shmSize, &shmFd, (void**)&m_shmPtr, 1);
ncclResult_t resultSemInit = InitIfSemaphore(OpenTag<T>{});
if ((resultSetup != ncclSuccess && errno != EEXIST) || (resultSemInit != ncclSuccess))
{
NCCLCHECK(BroadcastMessage(msgid, false));
WARN("Call to ShmObject::Open in root rank failed : %s", strerror(errno));
return ncclSystemError;
}
NCCLCHECK(BroadcastMessage(msgid, true));
}
else
{
MsgBuffer msg;
NCCLCHECK(MsgQueueRecv(msgid, &msg, sizeof(msg), m_rank, true));
if (msg.msg_text[0] == 'P')
{
NCCLCHECK(shmSetup(m_shmName.c_str(), m_shmSize, &shmFd, (void**)&m_shmPtr, 0));
}
else
{
WARN("Call to shm_open from non-root rank in ShmObject failed : %s", strerror(errno));
return ncclSystemError;
}
}
m_alloc = true;
}
else
{
WARN("Cannot allocate ShmObject twice.\n");
return ncclInvalidUsage;
}
return ncclSuccess;
}
template<typename T>
ncclResult_t ShmObject<T>::InitIfSemaphore(OpenTag<int> tag)
{
return ncclSuccess;
}
template<typename T>
ncclResult_t ShmObject<T>::InitIfSemaphore(OpenTag<unsigned int> tag)
{
return ncclSuccess;
}
template<typename T>
ncclResult_t ShmObject<T>::InitIfSemaphore(OpenTag<hipIpcMemHandle_t> tag)
{
return ncclSuccess;
}
template<typename T>
ncclResult_t ShmObject<T>::InitIfSemaphore(OpenTag<std::pair<hipIpcMemHandle_t,size_t>> tag)
{
return ncclSuccess;
}
template<typename T>
ncclResult_t ShmObject<T>::InitIfSemaphore(OpenTag<sem_t> tag)
{
size_t numMutexes = m_shmSize / sizeof(sem_t);
for (size_t i = 0; i < numMutexes; i++)
{
SYSCHECK(sem_init(static_cast<sem_t*>(&m_shmPtr[i]), 1, 1), "sem_init");
}
return ncclSuccess;
}
#endif
@@ -8,6 +8,7 @@
#include "devcomm.h"
#include "primitives.h"
#include "collectives.h"
#include "clique/AllReduceCliqueKernel.h" // [RCCL] AllReduce Clique-based kernel support
template<int UNROLL, class FUNC, typename T>
__attribute__((noinline))
@@ -310,6 +311,7 @@ __device__ void ncclAllReduceTreeLLKernel(struct CollectiveArgs* args) {
const ssize_t loopSize = nChannels*chunkSize;
const ssize_t size = args->coll.count;
if (loopSize > size) {
chunkSize = DIVUP(size, nChannels*minChunkSize)*minChunkSize;
}
@@ -417,76 +419,10 @@ __device__ void ncclAllReduceCollNetLLKernel(struct CollectiveArgs* args) {
template<int UNUSED, class FUNC, typename T>
__attribute__((noinline))
__device__ void ncclAllReduceRingLL128Kernel(struct CollectiveArgs* args) {
const int tid = threadIdx.x;
const int nthreads = args->coll.nThreads;
const int bid = args->coll.bid;
const int nChannels = args->coll.nChannels;
struct ncclDevComm* comm = args->comm;
struct ncclChannel* channel = comm->channels+blockIdx.x;
struct ncclRing* ring = &channel->ring;
const int stepSize = comm->buffSizes[NCCL_PROTO_LL128] / (sizeof(uint64_t)*NCCL_STEPS);
ssize_t chunkSize = stepSize*NCCL_LL128_DATAELEMS*sizeof(uint64_t) / (NCCL_LL128_LINEELEMS*sizeof(T));
// We should not need the final /2 but it makes performance much, much smoother. Might be a bug somewhere.
const ssize_t minChunkSize = (NCCL_LL128_SHMEM_ELEMS_PER_THREAD*nthreads*NCCL_LL128_DATAELEMS*sizeof(uint64_t))/(NCCL_LL128_LINEELEMS*sizeof(T))/2;
const int nranks = comm->nRanks;
const ssize_t loopSize = nChannels*nranks*chunkSize;
const ssize_t size = args->coll.count;
ncclLL128Primitives<T, FUNC, 1, 1> LLprims(tid, nthreads, &ring->prev, &ring->next, stepSize, channel, comm);
// Compute pointers
const T * __restrict__ thisInput = (const T*)args->sendbuff;
T * __restrict__ thisOutput = (T*)args->recvbuff;
for (ssize_t gridOffset = 0; gridOffset < size; gridOffset += loopSize) {
chunkSize = min(DIVUP(size-gridOffset, nChannels*nranks*minChunkSize)*minChunkSize, chunkSize);
/////////////// begin AllReduce steps ///////////////
ssize_t offset;
int nelem;
int chunk;
// step 0: push data to next GPU
chunk = ring->devUserRanks[nranks-1];
offset = gridOffset + (chunk*nChannels+bid) * chunkSize;
nelem = min(chunkSize, size-offset);
LLprims.send(thisInput+offset, nelem);
// k-2 steps: reduce and copy to next GPU
for (int j=2; j<nranks; ++j) {
chunk = ring->devUserRanks[nranks-j];
offset = gridOffset + (chunk*nChannels+bid) * chunkSize;
nelem = min(chunkSize, size-offset);
LLprims.recvReduceSend(thisInput+offset, nelem);
}
// step k-1: reduce this buffer and data, which will produce the final
// result that we store in this data and push to the next GPU
chunk = ring->devUserRanks[0];
offset = gridOffset + (chunk*nChannels+bid) * chunkSize;
nelem = min(chunkSize, size-offset);
LLprims.recvReduceCopySend(thisInput+offset, thisOutput+offset, nelem);
// k-2 steps: copy to next GPU
for (int j=1; j<nranks-1; ++j) {
chunk = ring->devUserRanks[nranks-j];
offset = gridOffset + (chunk*nChannels+bid) * chunkSize;
nelem = min(chunkSize, size-offset);
LLprims.recvCopySend(thisOutput+offset, nelem);
}
// Make final copy from buffer to dest.
chunk = ring->devUserRanks[1];
offset = gridOffset + (chunk*nChannels+bid) * chunkSize;
nelem = min(chunkSize, size-offset);
// Here we need to copy from buffer to this output.
LLprims.recv(thisOutput+offset, nelem);
}
// [RCCL] RingLL128 is re-purposed as clique-based kernel
LAUNCH_CLIQUE_KERNEL(AllReduceCliqueSplitKernel, FUNC, T, args);
// [/RCCL]
}
template<int UNUSED, class FUNC, typename T>
@@ -507,6 +443,7 @@ __device__ void ncclAllReduceTreeLL128Kernel(struct CollectiveArgs* args) {
int nthreadsSplit = NCCL_LL128_SPLIT(nthreads);
const ssize_t size = args->coll.count;
if (loopSize > size) {
chunkSize = DIVUP(size, nChannels*minChunkSize)*minChunkSize;
}
@@ -89,13 +89,45 @@ static inline __device__ void exitIfAbortBarrier(int abort) {
NCCL_FUNCS3B(coll, copy), \
NCCL_FUNCS3B(coll, copy)
// [RCCL] Adding clique-based kernels for AllReduce, in-place of unused RingLL28 kernels
#define NCCL_FUNC5B(coll, op, dtype) \
NCCL_COLL_NAME(coll##LL, op, dtype), \
NCCL_COLL_NAME(coll##LL128, op, dtype), \
NCCL_COLL_NAME(coll, op, dtype)
#define NCCL_FUNC4B(coll, op, dtype) \
NCCL_FUNC5(coll##Tree, op, dtype), \
NCCL_FUNC5B(coll##Ring, op, dtype), \
NCCL_FUNC5(coll##CollNet, op, dtype)
#define NCCL_FUNCS3C(coll, op) \
NCCL_FUNC4B(coll, op, i8), \
NCCL_FUNC4B(coll, op, u8), \
NCCL_FUNC4B(coll, op, i32), \
NCCL_FUNC4B(coll, op, u32), \
NCCL_FUNC4B(coll, op, i64), \
NCCL_FUNC4B(coll, op, u64), \
NCCL_FUNC4B(coll, op, f16), \
NCCL_FUNC4B(coll, op, f32), \
NCCL_FUNC4B(coll, op, f64), \
NCCL_FUNC4B(coll, op, b16)
#define NCCL_FUNCS2C(coll) \
NCCL_FUNCS3C(coll, sum ), \
NCCL_FUNCS3C(coll, prod), \
NCCL_FUNCS3C(coll, max ), \
NCCL_FUNCS3C(coll, min )
// [/RCCL]
// Must be consistent with ncclFunc_t
#define NCCL_FUNCS() { \
NCCL_FUNCS2B(ncclBroadcast), \
NCCL_FUNCS2A(ncclReduce), \
NCCL_FUNCS2B(ncclAllGather), \
NCCL_FUNCS2A(ncclReduceScatter), \
NCCL_FUNCS2A(ncclAllReduce), \
NCCL_FUNCS2C(ncclAllReduce), \
NCCL_COLL_NAME(ncclGather, copy, i8), \
NCCL_COLL_NAME(ncclScatter, copy, i8), \
NCCL_COLL_NAME(ncclAllToAll, copy, i8), \
@@ -114,7 +146,7 @@ static const __device__ constexpr ncclKernelFunc_t ncclFuncs[]{
NCCL_FUNCS2A(ncclReduce),
NCCL_FUNCS2B(ncclAllGather),
NCCL_FUNCS2A(ncclReduceScatter),
NCCL_FUNCS2A(ncclAllReduce),
NCCL_FUNCS2C(ncclAllReduce),
NCCL_COLL_NAME(ncclGather, copy, i8),
NCCL_COLL_NAME(ncclScatter, copy, i8),
NCCL_COLL_NAME(ncclAllToAll, copy, i8),
@@ -350,9 +350,14 @@ __device__ int ptrAlign128(T* ptr) { return (uint64_t)ptr % alignof(int32_t); }
__device__ int ptrAlign128(T* ptr) { return (uint64_t)ptr % alignof(Pack128); }
#endif
#if defined(__HIP_PLATFORM_HCC__) || defined(__HCC__) || defined(__HIPCC__)
// Multiply UNROLL by 2 if single source/single destination
#define AUTOUNROLL (UNROLL*((MINSRCS==1 && MINDSTS==1) ? 2 : 1))
#else
// Try to limit consecutive load/stores to 8.
// Use UNROLL 8 when we have a single source and a single destination, 4 otherwise
#define AUTOUNROLL (UNROLL*(4/(MINDSTS+MINSRCS)))
#endif
template<int UNROLL, class FUNC, typename T, int MINSRCS, int MAXSRCS, int MINDSTS, int MAXDSTS>
__device__ void ReduceOrCopyMulti(const int tid, const int nthreads,
+47 -2
Wyświetl plik
@@ -9,6 +9,8 @@
#include "argcheck.h"
#include "coll_net.h"
#include "../graph/topo.h"
#include <hip/hip_runtime.h>
#include <hip/hip_ext.h>
// Only generate inline kernels for LL
#define NCCL_FUNC5(coll, op, dtype) \
@@ -116,6 +118,10 @@ ncclResult_t setupLaunch(struct ncclComm* comm, hipLaunchParams* params) {
STORE(&channel->collectives[(channel->collStart+channel->collCount-1)%NCCL_MAX_OPS].active, 2);
}
{ // [RCCL] Wait for any clique-based collectives
NCCLCHECK(comm->cliqueManager->WaitForPointers());
} // [/RCCL]
// Find the first operation, choose the kernel accordingly and pass it
// as the first argument.
struct ncclColl* coll = comm->channels[0].collectives+comm->channels[0].collStart;
@@ -210,7 +216,8 @@ ncclResult_t ncclBarrierEnqueueWait(ncclComm_t comm) {
(comm->launchMode == ncclComm::GROUP && comm->groupCudaStream) ? "/Stream" : "");
}
hipEvent_t startEvent;
hipEvent_t stopEvent;
if (comm->launchMode == ncclComm::PARALLEL) {
hipLaunchKernelGGL(((void (*)(struct ncclDevComm*))params->func), params->gridDim, params->blockDim, params->sharedMem, params->stream, **((struct ncclDevComm ***)(params->args)));
} else {
@@ -257,6 +264,7 @@ static ncclResult_t getAlgoInfo(struct ncclInfo* info) {
info->algorithm = -1;
info->protocol = -1;
int nAlgos = NCCL_NUM_ALGORITHMS;
// Check collNet support
int collNetTypeSupport = 0;
if (info->comm->collNetSupport)
@@ -373,6 +381,7 @@ static ncclResult_t computeColl(struct ncclInfo* info /* input */, struct ncclCo
#endif
return ncclSuccess;
}
// Set nstepsPerLoop and nchunksPerLoop
NCCLCHECK(getAlgoInfo(info));
NCCLCHECK(getPatternInfo(info));
@@ -391,6 +400,33 @@ static ncclResult_t computeColl(struct ncclInfo* info /* input */, struct ncclCo
coll->funcIndex = FUNC_INDEX(info->coll, info->op, info->datatype, info->algorithm, info->protocol);
{ // [RCCL] Check for clique-based kernel support
if (info->comm->cliqueManager->IsSupported(info->coll,
info->count,
info->datatype,
info->op))
{
// Declare the input / output pointers being used (to exchange via IPC with other ranks)
NCCLCHECK(info->comm->cliqueManager->DeclarePointers(info->comm->opCount,
info->sendbuff,
info->recvbuff));
info->algorithm = NCCL_ALGO_RING;
info->protocol = NCCL_PROTO_CLIQUE;
// Determine the number of channels to use for clique-kernel
NCCLCHECK(info->comm->cliqueManager->GetNumChannelsToUse(info->coll,
info->count,
info->datatype,
info->op,
info->comm->nChannels,
&coll->args.clique.nChannels));
coll->args.clique.count = info->count;
coll->funcIndex = FUNC_INDEX(info->coll, info->op, info->datatype, info->algorithm, info->protocol);
return ncclSuccess;
}
} // [RCCL]
int stepSize = info->comm->buffSizes[info->protocol]/NCCL_STEPS;
int chunkSteps = (info->protocol == NCCL_PROTO_SIMPLE && info->algorithm == NCCL_ALGO_RING) ? info->chunkSteps : 1;
int sliceSteps = (info->protocol == NCCL_PROTO_SIMPLE && info->algorithm == NCCL_ALGO_RING) ? info->sliceSteps : 1;
@@ -478,6 +514,7 @@ ncclResult_t ncclSaveKernel(struct ncclInfo* info) {
info->comm->myParams->blockDim.x = std::max<unsigned>(info->comm->myParams->blockDim.x, info->nThreads);
int nChannels = info->coll == ncclCollSendRecv ? 1 : coll.args.coll.nChannels;
int nSubChannels = (info->pattern == ncclPatternCollTreeUp || info->pattern == ncclPatternCollTreeDown) ? 2 : 1;
for (int bid=0; bid<nChannels*nSubChannels; bid++) {
@@ -519,8 +556,15 @@ ncclResult_t ncclSaveKernel(struct ncclInfo* info) {
memcpy(c->args.a2av.extra+info->comm->nRanks*2, info->recvcounts, sizeof(size_t*)*(info->comm->nRanks));
memcpy(c->args.a2av.extra+info->comm->nRanks*3, info->rdispls, sizeof(size_t*)*(info->comm->nRanks));
c->args.a2av.bid = bid % coll.args.coll.nChannels;
} else if (info->coll != ncclCollSendRecv)
} else if (info->coll != ncclCollSendRecv) {
c->args.coll.bid = bid % coll.args.coll.nChannels;
}
// [RCCL] Setup pointers to where all the input/output pointers will be
if (info->protocol == NCCL_PROTO_CLIQUE) {
NCCLCHECK(info->comm->cliqueManager->SetCliqueCollectiveArgs(&c->args));
}
// [/RCCL]
STORE(&c->active, 1);
opIndex = (opIndex+1)%NCCL_MAX_OPS;
@@ -599,6 +643,7 @@ ncclResult_t ncclEnqueueCheck(struct ncclInfo* info) {
} else {
NCCLCHECKGOTO(ncclSaveKernel(info), ret, end);
}
end:
if (savedDev != -1) CUDACHECK(hipSetDevice(savedDev));
ncclAsyncErrCheck(ret);
+1 -1
Wyświetl plik
@@ -12,7 +12,7 @@
ncclResult_t bootstrapNetInit();
ncclResult_t bootstrapCreateRoot(ncclUniqueId* commId, bool idFromEnv);
ncclResult_t bootstrapGetUniqueId(ncclUniqueId* out);
ncclResult_t bootstrapInit(ncclUniqueId* id, int rank, int nranks, void** commState);
ncclResult_t bootstrapInit(ncclUniqueId* id, int rank, int nranks, void** commState, int* rootPid); // [RCCL] Adding rootPid
ncclResult_t bootstrapAllGather(void* commState, void* allData, int size);
ncclResult_t bootstrapSend(void* commState, int peer, void* data, int size);
ncclResult_t bootstrapRecv(void* commState, int peer, void* data, int size);
+9 -2
Wyświetl plik
@@ -10,6 +10,9 @@
#include "transport.h"
#include "p2p.h"
// [RCCL]
#include "clique/CliqueManager.h"
// [/RCCL]
#if defined(__HIP_PLATFORM_HCC__) || defined(__HCC__) || defined(__HIPCC__)
#else
@@ -143,8 +146,12 @@ struct ncclComm {
//list of async p2p operation queued in a group semantics
struct ncclP2Plist p2plist;
// RCCL AllToAll/Scatter/Gather API
bool alltoallDisable;
// [RCCL]
bool alltoallDisable; // RCCL AllToAll/Scatter/Gather API
CliqueManager* cliqueManager; // CliqueManager handles pointer collection / distribution for clique-based kernels
int rootPid; // Process ID of root
// [/RCCL]
};
#endif
+15
Wyświetl plik
@@ -12,6 +12,9 @@
#include "rccl_bfloat16.h"
#include "align.h"
#include <stdint.h>
// [RCCL] Support for clique-based kernels
#include "clique/CliqueCommon.h"
// [/RCCL]
// Convert volatile access to atomic
#if defined(__HIP_PLATFORM_HCC__) || defined(__HCC__) || defined(__HIPCC__)
@@ -22,6 +25,7 @@
#define STORE(DST, SRC) *(DST) = (SRC)
#endif
#define NCCL_NUM_FUNCTIONS 5 // SendRecv not included for now
typedef enum { ncclCollBroadcast, ncclCollReduce, ncclCollAllGather, ncclCollReduceScatter, ncclCollAllReduce, ncclCollGather, ncclCollScatter, ncclCollAllToAll, ncclCollAllToAllv, ncclCollSendRecv} ncclFunc_t;
extern const char* ncclFuncStr[NCCL_NUM_FUNCTIONS+4];
@@ -35,6 +39,7 @@ extern const char* ncclAlgoStr[NCCL_NUM_ALGORITHMS];
#define NCCL_NUM_PROTOCOLS 3 // Simple/LL/LL128
#define NCCL_PROTO_LL 0
#define NCCL_PROTO_LL128 1
#define NCCL_PROTO_CLIQUE 1 // [RCCL] Clique takes up same protocol as unused LL128
#define NCCL_PROTO_SIMPLE 2
extern const char* ncclProtoStr[NCCL_NUM_PROTOCOLS];
@@ -190,8 +195,18 @@ struct CollectiveArgs {
size_t count;
size_t* extra;
} a2av;
// [RCCL] Clique-based arguments
struct {
uint16_t nThreads;
uint8_t bid;
uint8_t nChannels;
size_t count;
cliqueDevicePtrs_t* ptrs;
} clique;
// [/RCCL]
};
};
struct ncclColl {
union {
struct {
+58 -1
Wyświetl plik
@@ -28,6 +28,10 @@
#include <unistd.h>
#include "graph/topo.h"
// [RCCL]
#include "clique/CliqueManager.h"
// [/RCCL]
#define STR2(v) #v
#define STR(v) STR2(v)
@@ -300,6 +304,7 @@ static ncclResult_t commFree(ncclComm_t comm) {
}
RCCL_PARAM(AllToAllDisable, "ALLTOALL_KERNEL_DISABLE", 1);
RCCL_PARAM(ForceEnableClique, "FORCE_ENABLE_CLIQUE", 0);
static ncclResult_t commAlloc(ncclComm_t* comret, int ndev, int rank) {
if (ndev < 1) {
@@ -678,7 +683,10 @@ static ncclResult_t initTransportsRank(struct ncclComm* comm, ncclUniqueId* comm
int nranks = comm->nRanks;
uint64_t commHash = getHash(commId->internal, NCCL_UNIQUE_ID_BYTES);
TRACE(NCCL_INIT, "comm %p, commHash %lx, rank %d nranks %d - BEGIN", comm, commHash, rank, nranks);
NCCLCHECK(bootstrapInit(commId, rank, nranks, &comm->bootstrap));
// [RCCL] Collect the PID of the root
int rootPid;
NCCLCHECK(bootstrapInit(commId, rank, nranks, &comm->bootstrap, &rootPid));
// [/RCCL]
// AllGather1 - begin
struct {
@@ -1016,8 +1024,53 @@ static ncclResult_t initTransportsRank(struct ncclComm* comm, ncclUniqueId* comm
return ncclInternalError;
}
NCCLCHECK(ncclCommSetIntra(comm, intraRank, intraRanks, allGather1Data[intraRank0].comm));
{ // [RCCL] Check if clique-based kernels can be enabled and initialize CliqueManager if so
CliqueManager::cliqueMode_t cliqueMode = CliqueManager::CLIQUE_DISABLED;
if (comm->localRanks == comm->nRanks)
{
// Check that all the GPUs have peer access to one another
bool hasPeerAccess = true;
for (int i = 0; i < nranks && hasPeerAccess; i++)
{
int cudaDev1 = allGather1Data[i].peerInfo.cudaDev;
for (int j = 0; j < nranks; j++)
{
if (i == j) continue;
int cudaDev2 = allGather1Data[j].peerInfo.cudaDev;
int p2p;
if (hipDeviceCanAccessPeer(&p2p, cudaDev1, cudaDev2) != hipSuccess || !p2p)
{
hasPeerAccess = false;
break;
}
}
}
if (hasPeerAccess)
{
if (intraRanks == nranks)
cliqueMode = CliqueManager::CLIQUE_SINGLE_PROCESS;
else
cliqueMode = CliqueManager::CLIQUE_SINGLE_NODE;
}
// For now, only enable clique-based kernels on CR8_G topologies, unless explicitly asked
if (!rcclParamForceEnableClique())
{
// Disable clique-kernel support if not on CR8 topology
if (!(comm->topo->nodes[NET].count == 0 && comm->topo->type == RCCL_TOPO_CR8G))
{
INFO(NCCL_INIT, "Disabling clique-based kernels due to topology (force enable with RCCL_FORCE_ENABLE_CLIQUE)");
cliqueMode = CliqueManager::CLIQUE_DISABLED;
}
}
}
comm->cliqueManager = new CliqueManager(rank, nranks, cliqueMode);
NCCLCHECK(comm->cliqueManager->Init(commId, rootPid));
} // [/RCCL]
} while(0);
// Done with AllGather1 data
free(allGather1Data);
@@ -1147,6 +1200,10 @@ ncclResult_t ncclCommDestroy(ncclComm_t comm) {
return ncclInvalidArgument;
}
// [RCCL] Delete CliqueManager if it exists
if (comm->cliqueManager) delete comm->cliqueManager;
// [/RCCL]
return commDestroy(comm);
}
+10 -9
Wyświetl plik
@@ -418,23 +418,24 @@ namespace CorrectnessTests
switch (dataset.dataType)
{
case ncclInt8:
printf("Expected %d. Output %d on device %d[%d]\n", outputI1[j], expectedI1[j], i, j); break;
printf("Expected %d. Output %d on device %d[%d]\n", expectedI1[j], outputI1[j], i, j);
break;
case ncclUint8:
printf("Expected %u. Output %u on device %d[%d]\n", outputU1[j], expectedU1[j], i, j); break;
printf("Expected %u. Output %u on device %d[%d]\n", expectedU1[j], outputU1[j], i, j); break;
case ncclInt32:
printf("Expected %d. Output %d on device %d[%d]\n", outputI4[j], expectedI4[j], i, j); break;
printf("Expected %d. Output %d on device %d[%d]\n", expectedI4[j], outputI4[j], i, j); break;
case ncclUint32:
printf("Expected %u. Output %u on device %d[%d]\n", outputU4[j], expectedU4[j], i, j); break;
printf("Expected %u. Output %u on device %d[%d]\n", expectedU4[j], outputU4[j], i, j); break;
case ncclInt64:
printf("Expected %ld. Output %ld on device %d[%d]\n", outputI8[j], expectedI8[j], i, j); break;
printf("Expected %ld. Output %ld on device %d[%d]\n", expectedI8[j], outputI8[j], i, j); break;
case ncclUint64:
printf("Expected %lu. Output %lu on device %d[%d]\n", outputU8[j], expectedU8[j], i, j); break;
printf("Expected %lu. Output %lu on device %d[%d]\n", expectedU8[j], outputU8[j], i, j); break;
case ncclFloat32:
printf("Expected %f. Output %f on device %d[%d]\n", outputF4[j], expectedF4[j], i, j); break;
printf("Expected %f. Output %f on device %d[%d]\n", expectedF4[j], outputF4[j], i, j); break;
case ncclFloat64:
printf("Expected %lf. Output %lf on device %d[%d]\n", outputF8[j], expectedF8[j], i, j); break;
printf("Expected %lf. Output %lf on device %d[%d]\n", expectedF8[j], outputF8[j], i, j); break;
case ncclBfloat16:
printf("Expected %f. Output %f on device %d[%d]\n", (float)outputB2[j], (float)expectedB2[j], i, j); break;
printf("Expected %f. Output %f on device %d[%d]\n", (float)expectedB2[j], (float)outputB2[j], i, j); break;
default:
fprintf(stderr, "[ERROR] Unsupported datatype\n");
exit(0);
+1 -1
Wyświetl plik
@@ -58,6 +58,6 @@ namespace CorrectnessTests
testing::Values(2,3,4,5,6,7,8),
// In-place or not
testing::Values(false, true),
testing::Values("")),
testing::Values("RCCL_ENABLE_CLIQUE=0", "RCCL_ENABLE_CLIQUE=1")),
CorrectnessTest::PrintToStringParamName());
} // namespace
@@ -96,6 +96,6 @@ namespace CorrectnessTests
testing::Values(2,3,4,5,6,7,8),
// In-place or not
testing::Values(false, true),
testing::Values("")),
testing::Values("RCCL_ENABLE_CLIQUE=0", "RCCL_ENABLE_CLIQUE=1")),
CorrectnessTest::PrintToStringParamName());
} // namespace
+1 -1
Wyświetl plik
@@ -116,6 +116,6 @@ namespace CorrectnessTests
testing::Values(2,3,4,5,6,7,8),
// In-place or not
testing::Values(false, true),
testing::Values("")),
testing::Values("RCCL_ENABLE_CLIQUE=0", "RCCL_ENABLE_CLIQUE=1")),
CorrectnessTest::PrintToStringParamName());
} // namespace
@@ -0,0 +1,257 @@
/*
Copyright (c) 2020 Advanced Micro Devices, Inc. All rights reserved.
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
*/
#include <sys/socket.h>
#include <ifaddrs.h>
#include <netdb.h>
#include <unistd.h>
#include <cstdio>
#include <string>
#include <chrono>
#include <hip/hip_runtime.h>
#include <rccl.h>
#include "HelloRccl.hpp"
void Usage(char *argv0);
void ExecuteTest(int numIntraRank, int intraRankStartId, int numTotalRanks, ncclComm_t* comm);
int main(int argc, char **argv)
{
if (getenv("NCCL_COMM_ID") && argc == 3) // Run in multi-process mode
{
int nranks = atoi(argv[1]);
int rank = atoi(argv[2]);
if (rank == 0) printf("Running in multi-process mode\n");
// Create communicator for this rank
ncclUniqueId commId;
NCCL_CALL(ncclGetUniqueId(&commId));
// Initialize communicator
ncclComm_t comm;
HIP_CALL(hipSetDevice(rank));
NCCL_CALL(ncclCommInitRank(&comm, nranks, commId, rank));
// Run the test
ExecuteTest(1, rank, nranks, &comm);
}
else if (argc == 2) // Run in single-process mode
{
printf("Running in single-process mode\n");
int nranks = atoi(argv[1]);
// Initialize communicators for each rank
ncclComm_t comm[nranks];
NCCL_CALL(ncclCommInitAll(comm, nranks, NULL));
// Run the test
ExecuteTest(nranks, 0, nranks, comm);
}
else
{
Usage(argv[0]);
return 1;
}
return 0;
}
void ExecuteTest(int numIntraRank, int intraRankStartId, int numTotalRanks, ncclComm_t* comm)
{
// Test configuration settings
int minPow = 10; // Starting power of 2 input size
int maxPow = 28; // Ending power of 2 input size
int numWarmups = 3; // Number of untimed warmup iterations
int numIterations = 10; // Number of timed iterations
// Allocate GPU resources for this process
hipStream_t stream[numIntraRank];
hipEvent_t startEvent[numIntraRank];
hipEvent_t stopEvent[numIntraRank];
for (int i = 0; i < numIntraRank; i++)
{
HIP_CALL(hipSetDevice(intraRankStartId + i));
HIP_CALL(hipStreamCreate(&stream[i]));
HIP_CALL(hipEventCreate(&startEvent[i]));
HIP_CALL(hipEventCreate(&stopEvent[i]));
}
if (intraRankStartId == 0)
{
printf("AllReduce Performance (sum of floats):\n");
printf("%10s %10s %10s\n", "Bytes", "CpuTime(ms)", "GpuTime(ms)");
}
// Loop over power-of-two input sizes
for (int power = minPow; power <= maxPow; power++)
{
int N = 1 << power;
// Allocate GPU memory
float *iputGpu[numIntraRank], *oputGpu[numIntraRank];
for (int r = 0; r < numIntraRank; r++)
{
HIP_CALL(hipSetDevice(intraRankStartId + r));
HIP_CALL(hipMalloc((void **)&iputGpu[r], N * sizeof(float)));
HIP_CALL(hipMalloc((void **)&oputGpu[r], N * sizeof(float)));
}
// Allocate CPU memory for input/output
float *iputCpu = (float *)malloc(N * sizeof(float));
float *oputCpu = (float *)malloc(N * sizeof(float));
// Fill CPU memory with a simple pattern
for (int i = 0; i < N; i++)
{
iputCpu[i] = 1.0f;
oputCpu[i] = 0.0f;
}
// Copy the input from CPU memory to GPU memory
for (int r = 0; r < numIntraRank; r++)
{
HIP_CALL(hipSetDevice(intraRankStartId + r));
HIP_CALL(hipMemcpy(iputGpu[r], iputCpu, N * sizeof(float), hipMemcpyHostToDevice));
}
// Perform some untimed initial warmup iterations
for (int iteration = 0; iteration < numWarmups; iteration++)
{
NCCL_CALL(ncclGroupStart());
for (int r = 0; r < numIntraRank; r++)
{
HIP_CALL(hipSetDevice(intraRankStartId + r));
NCCL_CALL(ncclAllReduce(iputGpu[r], oputGpu[r], N, ncclFloat, ncclSum, comm[r], stream[r]));
}
NCCL_CALL(ncclGroupEnd());
}
for (int r = 0; r < numIntraRank; r++)
HIP_CALL(hipStreamSynchronize(stream[r]));
// Perform timed iterations
auto cpuStart = std::chrono::high_resolution_clock::now();
for (int r = 0; r < numIntraRank; r++)
HIP_CALL(hipEventRecord(startEvent[r], stream[r]));
for (int iteration = 0; iteration < numIterations; iteration++)
{
NCCL_CALL(ncclGroupStart());
for (int r = 0; r < numIntraRank; r++)
{
HIP_CALL(hipSetDevice(intraRankStartId + r));
NCCL_CALL(ncclAllReduce(iputGpu[r], oputGpu[r], N, ncclFloat, ncclSum, comm[r], stream[r]));
}
NCCL_CALL(ncclGroupEnd());
}
for (int r = 0; r < numIntraRank; r++)
HIP_CALL(hipEventRecord(stopEvent[r], stream[r]));
for (int r = 0; r < numIntraRank; r++)
HIP_CALL(hipStreamSynchronize(stream[r]));
auto cpuDelta = std::chrono::high_resolution_clock::now() - cpuStart;
double totalCpuTime = std::chrono::duration_cast<std::chrono::duration<double, std::milli>>(cpuDelta).count();
float totalGpuTime;
HIP_CALL(hipEventElapsedTime(&totalGpuTime, startEvent[0], stopEvent[0]));
if (intraRankStartId == 0) printf("%10lu %10.3f %10.3f\n", N * sizeof(float), (totalCpuTime / numIterations), (totalGpuTime / numIterations));
// Validate results
for (int r = 0; r < numIntraRank; r++)
{
HIP_CALL(hipMemcpy(oputCpu, oputGpu[r], N * sizeof(float), hipMemcpyDeviceToHost));
bool isOK = true;
int expected = numTotalRanks;
for (int i = 0; i < N; i++)
{
isOK &= (oputCpu[i] == expected);
}
if (!isOK)
{
printf("[ERROR] Rank %d Incorrect results for N = %d\n", intraRankStartId + r, N);
NCCL_CALL(ncclCommDestroy(comm[r]));
exit(1);
}
}
// Release GPU resources
for (int r = 0; r < numIntraRank; r++)
{
HIP_CALL(hipFree(oputGpu[r]));
HIP_CALL(hipFree(iputGpu[r]));
}
free(iputCpu);
free(oputCpu);
}
for (int r = 0; r < numIntraRank; r++)
{
HIP_CALL(hipStreamDestroy(stream[r]));
HIP_CALL(hipEventDestroy(startEvent[r]));
HIP_CALL(hipEventDestroy(stopEvent[r]));
NCCL_CALL(ncclCommDestroy(comm[r]));
}
}
void Usage(char *argv0)
{
printf("Single Process Usage: %s numRanks\n", argv0);
printf("\n");
printf("Multi Process Usage: %s numRanks rank\n", argv0);
printf(" - NCCL_COMM_ID must be set in order to use this\n\n");
printf(" - To use this process as the root process you may use any of the following:\n");
char hostname[256];
gethostname(hostname, 256);
printf(" export NCCL_COMM_ID=%s:12345\n", hostname);
// Loop over linked list of interfaces
struct ifaddrs *ifaddr;
getifaddrs(&ifaddr);
for (struct ifaddrs* ifa = ifaddr; ifa != NULL; ifa = ifa->ifa_next)
{
// Skip anything not based on IPv4 / IPv6
int family = ifa->ifa_addr->sa_family;
if (family != AF_INET && family != AF_INET6) continue;
// Skip iPv6 loopback interface
if (family == AF_INET6)
{
struct sockaddr_in6* sa = (struct sockaddr_in6*)(ifa->ifa_addr);
if (IN6_IS_ADDR_LOOPBACK(&sa->sin6_addr)) continue;
}
socklen_t saLen = (family == AF_INET ? sizeof(struct sockaddr_in) : sizeof(struct sockaddr_in6));
char host[NI_MAXHOST];
char service[NI_MAXSERV];
getnameinfo(ifa->ifa_addr, saLen, host, NI_MAXHOST, service, NI_MAXSERV,
NI_NUMERICHOST|NI_NUMERICSERV);
std::string result = std::string(host) + ":12345";
printf(" export NCCL_COMM_ID=%s\n", result.c_str());
}
freeifaddrs(ifaddr);
}
@@ -0,0 +1,49 @@
/*
Copyright (c) 2020 Advanced Micro Devices, Inc. All rights reserved.
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
*/
#ifndef HELLORCCL_HPP
#define HELLORCCL_HPP
#include <iostream>
#define HIP_CALL(cmd) \
do { \
hipError_t error = (cmd); \
if (error != hipSuccess) \
{ \
std::cerr << "Encountered HIP error (" << hipGetErrorString(error) << ") at line " \
<< __LINE__ << " in file " << __FILE__ << "\n"; \
exit(-1); \
} \
} while (0)
#define NCCL_CALL(cmd) \
do { \
ncclResult_t error = (cmd); \
if (error != ncclSuccess) \
{ \
std::cerr << "Encountered NCCL error (" << ncclGetErrorString(error) << ") at line " \
<< __LINE__ << " in file " << __FILE__ << "\n"; \
exit(-1); \
} \
} while (0)
#endif
@@ -0,0 +1,21 @@
# Copyright (c) 2020 Advanced Micro Devices, Inc. All rights reserved.
# Set to where RCCL is installed
RCCL_INSTALL=../../build/release
HIP_PATH?= $(wildcard /opt/rocm/hip)
ifeq (,$(HIP_PATH))
HIP_PATH=../../..
endif
HIPCC=$(HIP_PATH)/bin/hipcc
EXE=HelloRccl
CXXFLAGS = -std=c++11 -O3 -I../../src/include -I$(RCCL_INSTALL) -L$(RCCL_INSTALL) -lrccl
all: $(EXE)
$(EXE): $(EXE).cpp $(shell find -regex ".*\.\hpp")
$(HIPCC) $(CXXFLAGS) $< -o $@
clean:
rm -f *.o $(EXE)
+22
Wyświetl plik
@@ -0,0 +1,22 @@
#!/bin/bash
RCCL_INSTALL=../../build/release
EXE=$PWD/HelloRccl
LDPATH=$LD_LIBRARY_PATH:$RCCL_INSTALL
echo "Single process - With clique-based kernels:"
RCCL_CLIQUE_ALLREDUCE_BYTE_LIMIT=1073741824 RCCL_FORCE_ENABLE_CLIQUE=1 NCCL_DEBUG=INFO RCCL_ENABLE_CLIQUE=1 LD_LIBRARY_PATH=$LDPATH $EXE 4
echo "Single process - Without clique-based kernels:"
NCCL_DEBUG=INFO LD_LIBRARY_PATH=$LDPATH $EXE 4
echo "With clique-based kernels:"
RCCL_CLIQUE_ALLREDUCE_BYTE_LIMIT=1073741824 RCCL_FORCE_ENABLE_CLIQUE=1 NCCL_DEBUG=INFO RCCL_ENABLE_CLIQUE=1 NCCL_COMM_ID=$HOSTNAME:12345 LD_LIBRARY_PATH=$LDPATH $EXE 4 0 &
RCCL_CLIQUE_ALLREDUCE_BYTE_LIMIT=1073741824 RCCL_FORCE_ENABLE_CLIQUE=1 NCCL_DEBUG=INFO RCCL_ENABLE_CLIQUE=1 NCCL_COMM_ID=$HOSTNAME:12345 LD_LIBRARY_PATH=$LDPATH $EXE 4 1 &
RCCL_CLIQUE_ALLREDUCE_BYTE_LIMIT=1073741824 RCCL_FORCE_ENABLE_CLIQUE=1 NCCL_DEBUG=INFO RCCL_ENABLE_CLIQUE=1 NCCL_COMM_ID=$HOSTNAME:12345 LD_LIBRARY_PATH=$LDPATH $EXE 4 2 &
RCCL_CLIQUE_ALLREDUCE_BYTE_LIMIT=1073741824 RCCL_FORCE_ENABLE_CLIQUE=1 NCCL_DEBUG=INFO RCCL_ENABLE_CLIQUE=1 NCCL_COMM_ID=$HOSTNAME:12345 LD_LIBRARY_PATH=$LDPATH $EXE 4 3
echo "Without clique-based kernels:"
NCCL_COMM_ID=$HOSTNAME:12345 LD_LIBRARY_PATH=$LDPATH $EXE 4 0 &
NCCL_COMM_ID=$HOSTNAME:12345 LD_LIBRARY_PATH=$LDPATH $EXE 4 1 &
NCCL_COMM_ID=$HOSTNAME:12345 LD_LIBRARY_PATH=$LDPATH $EXE 4 2 &
NCCL_COMM_ID=$HOSTNAME:12345 LD_LIBRARY_PATH=$LDPATH $EXE 4 3