Initial support for clique-based kernels (#276)
* Initial support for clique-based kernels
This commit is contained in:
@@ -126,6 +126,12 @@ set(CC_SOURCES
|
||||
src/collectives/all_to_all_api.cc
|
||||
src/collectives/all_to_allv_api.cc
|
||||
src/channel.cc
|
||||
src/clique/CliqueManager.cc # RCCL
|
||||
src/clique/HandleCache.cc # RCCL
|
||||
src/clique/HandleShm.cc # RCCL
|
||||
src/clique/Hash.cc # RCCL
|
||||
src/clique/MsgQueue.cc # RCCL
|
||||
src/clique/ShmObject.cc # RCCL
|
||||
src/misc/argcheck.cc
|
||||
src/misc/nvmlwrap_stub.cc
|
||||
src/misc/utils.cc
|
||||
|
||||
+33
-3
@@ -12,6 +12,11 @@
|
||||
#include "socket.h"
|
||||
#include <unistd.h>
|
||||
#include <sys/types.h>
|
||||
// [RCCL]
|
||||
#include "clique/CliqueManager.h"
|
||||
#include "clique/CliqueShmNames.h"
|
||||
#include "clique/Hash.h"
|
||||
// [/RCCL]
|
||||
|
||||
struct bootstrapNetComm {
|
||||
int fd;
|
||||
@@ -163,7 +168,14 @@ static ncclResult_t setFilesLimit() {
|
||||
return ncclSuccess;
|
||||
}
|
||||
|
||||
static void *bootstrapRoot(void* listenComm) {
|
||||
static void *bootstrapRoot(void* bootstrapRootStruct) { // [RCCL] Modified to include hash argument)
|
||||
// [RCCL] Unpack bootstrapRootStruct
|
||||
struct bootstrapRootStruct* rootStruct = (struct bootstrapRootStruct*) bootstrapRootStruct;
|
||||
void* listenComm = rootStruct->listenComm;
|
||||
unsigned long hash = rootStruct->hash;
|
||||
int pid = getpid(); // sharing PID to other ranks for creating shared memory files for CliqueManager
|
||||
// [/RCCL]
|
||||
|
||||
struct extInfo info;
|
||||
ncclNetHandle_t *rankHandles = NULL;
|
||||
ncclNetHandle_t *rankHandlesRoot = NULL; // for initial rank <-> root information exchange
|
||||
@@ -205,12 +217,19 @@ static void *bootstrapRoot(void* listenComm) {
|
||||
} while (c < nranks);
|
||||
TRACE(NCCL_INIT, "COLLECTED ALL %d HANDLES", nranks);
|
||||
|
||||
{ // [RCCL] Initialize message queues / shared memory files
|
||||
NCCLCHECKGOTO(CliqueManager::BootstrapRootInit(pid, hash), res, out);
|
||||
} // [/RCCL]
|
||||
|
||||
// Send the connect handle for the next rank in the AllGather ring
|
||||
for (int r=0; r<nranks; ++r) {
|
||||
int next = (r+1) % nranks;
|
||||
void *tmpSendComm;
|
||||
NCCLCHECKGOTO(bootstrapNetConnect(0, rankHandlesRoot+r, &tmpSendComm), res, out);
|
||||
NCCLCHECKGOTO(bootstrapNetSend(tmpSendComm, rankHandles+next, sizeof(ncclNetHandle_t)), res, out);
|
||||
{ // [RCCL] Send the root pid for shared file naming
|
||||
NCCLCHECKGOTO(bootstrapNetSend(tmpSendComm, &pid, sizeof(int)), res, out);
|
||||
} // [/RCCL]
|
||||
NCCLCHECKGOTO(bootstrapNetCloseSend(tmpSendComm), res, out);
|
||||
}
|
||||
TRACE(NCCL_INIT, "SENT OUT ALL %d HANDLES", nranks);
|
||||
@@ -229,7 +248,14 @@ ncclResult_t bootstrapCreateRoot(ncclUniqueId* id, bool idFromEnv) {
|
||||
void* listenComm;
|
||||
NCCLCHECK(bootstrapNetListen(idFromEnv ? dontCareIf : 0, netHandle, &listenComm));
|
||||
pthread_t thread;
|
||||
pthread_create(&thread, NULL, bootstrapRoot, listenComm);
|
||||
|
||||
// [RCCL] Use the ncclUniqueId to get a hash for bootstrap
|
||||
struct bootstrapRootStruct* rootStruct = new bootstrapRootStruct;
|
||||
rootStruct->hash = djb2Hash(id->internal);
|
||||
rootStruct->listenComm = listenComm;
|
||||
pthread_create(&thread, NULL, bootstrapRoot, (void *)rootStruct);
|
||||
// [/RCCL]
|
||||
|
||||
return ncclSuccess;
|
||||
}
|
||||
|
||||
@@ -267,9 +293,10 @@ struct extState {
|
||||
int rank;
|
||||
int nranks;
|
||||
int dev;
|
||||
int rootPid; // [RCCL] PID of root
|
||||
};
|
||||
|
||||
ncclResult_t bootstrapInit(ncclUniqueId * id, int rank, int nranks, void** commState) {
|
||||
ncclResult_t bootstrapInit(ncclUniqueId * id, int rank, int nranks, void** commState, int* rootPid) { // [RCCL] Adding rootPid
|
||||
ncclNetHandle_t* netHandle = (ncclNetHandle_t*) id;
|
||||
bool idFromEnv = getenv("NCCL_COMM_ID") != NULL;
|
||||
struct extState* state;
|
||||
@@ -314,6 +341,9 @@ ncclResult_t bootstrapInit(ncclUniqueId * id, int rank, int nranks, void** commS
|
||||
ncclNetHandle_t extHandleNext;
|
||||
NCCLCHECK(bootstrapNetAccept(extBstrapListenCommRoot, &tmpRecvComm));
|
||||
NCCLCHECK(bootstrapNetRecv(tmpRecvComm, &extHandleNext, sizeof(extHandleNext)));
|
||||
{ // [RCCL] Receive PID from root
|
||||
NCCLCHECK(bootstrapNetRecv(tmpRecvComm, rootPid, sizeof(int)));
|
||||
} // [/RCCL]
|
||||
NCCLCHECK(bootstrapNetCloseRecv(tmpRecvComm));
|
||||
NCCLCHECK(bootstrapNetCloseListen(extBstrapListenCommRoot));
|
||||
|
||||
|
||||
@@ -0,0 +1,145 @@
|
||||
/*
|
||||
Copyright (c) 2020 Advanced Micro Devices, Inc. All rights reserved.
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
of this software and associated documentation files (the "Software"), to deal
|
||||
in the Software without restriction, including without limitation the rights
|
||||
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
copies of the Software, and to permit persons to whom the Software is
|
||||
furnished to do so, subject to the following conditions:
|
||||
|
||||
The above copyright notice and this permission notice shall be included in
|
||||
all copies or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
|
||||
THE SOFTWARE.
|
||||
*/
|
||||
|
||||
#ifndef ALLREDUCECLIQUEKERNEL_H
|
||||
#define ALLREDUCECLIQUEKERNEL_H
|
||||
|
||||
#include "CliqueCommon.h"
|
||||
#include "common_kernel.h"
|
||||
#include <hip/hip_runtime.h>
|
||||
#include "hip/hip_ext.h"
|
||||
|
||||
#define ALL_REDUCE_SPLIT_BLOCKSIZE 256
|
||||
|
||||
template <typename T, class FUNC, int NUM_RANKS>
|
||||
__global__ __launch_bounds__(ALL_REDUCE_SPLIT_BLOCKSIZE)
|
||||
void AllReduceCliqueSplitKernel(int N,
|
||||
size_t startIdx,
|
||||
cliqueDevicePtrs_t cliquePtrs)
|
||||
{
|
||||
if (N > 0)
|
||||
{
|
||||
// Each workgroup operates on a contiguous portion of memory
|
||||
// Divide the # of elements evenly across workgroups, then round up to multiple of blocksize
|
||||
int baseSize = (N + gridDim.x - 1) / gridDim.x;
|
||||
int chunkSize = RoundUp(baseSize, ALL_REDUCE_SPLIT_BLOCKSIZE);
|
||||
int blockOffsetStart = min(blockIdx.x * chunkSize, N);
|
||||
int blockOffsetStop = min(blockOffsetStart + chunkSize, N);
|
||||
int blockN = blockOffsetStop - blockOffsetStart;
|
||||
|
||||
if (blockN > 0)
|
||||
{
|
||||
T const** inputs = (T const**)cliquePtrs.inputs;
|
||||
T** outputs = (T **)cliquePtrs.outputs;
|
||||
|
||||
T const* srcs[NUM_RANKS];
|
||||
T* dsts[NUM_RANKS];
|
||||
|
||||
#pragma unroll
|
||||
for (int r = 0; r < NUM_RANKS; r++)
|
||||
{
|
||||
srcs[r] = inputs[r] + startIdx + blockOffsetStart;
|
||||
dsts[r] = outputs[r] + startIdx + blockOffsetStart;
|
||||
}
|
||||
|
||||
#define ALL_REDUCE_CLIQUE_UNROLL 4
|
||||
ReduceOrCopyMulti<ALL_REDUCE_CLIQUE_UNROLL, FUNC, T, NUM_RANKS, NUM_RANKS, NUM_RANKS, NUM_RANKS>(
|
||||
threadIdx.x, ALL_REDUCE_SPLIT_BLOCKSIZE, NUM_RANKS, srcs, NUM_RANKS, dsts, blockN);
|
||||
}
|
||||
}
|
||||
|
||||
// Each GPU works on a separate subsection, however we cannot finish the kernel
|
||||
// until all GPUs have finished otherwise part of the result may not be correct yet
|
||||
WaitForBarrier<NUM_RANKS>(cliquePtrs.barrierCounter);
|
||||
}
|
||||
|
||||
class AllReduceCliqueKernel
|
||||
{
|
||||
public:
|
||||
static ncclResult_t Launch(int const rank,
|
||||
int const numRanks,
|
||||
int const maxGridSize,
|
||||
size_t const count,
|
||||
ncclDataType_t const datatype,
|
||||
ncclRedOp_t const op,
|
||||
hipStream_t const stream,
|
||||
cliqueDevicePtrs_t const& cliquePtrs,
|
||||
bool const doTiming = false)
|
||||
{
|
||||
if (numRanks < MIN_CLIQUE_SIZE || numRanks > MAX_CLIQUE_SIZE)
|
||||
{
|
||||
WARN("Number of ranks exceeds supported. Expected %d <= %d < %d for numRanks",
|
||||
MIN_CLIQUE_SIZE, numRanks, MAX_CLIQUE_SIZE);
|
||||
return ncclInvalidUsage;
|
||||
}
|
||||
|
||||
// Divide the # of elements done per GPU evenly across ranks, then round up to blocksize
|
||||
int baseSize = (count + numRanks - 1) / numRanks;
|
||||
int chunkSize = RoundUp(baseSize, ALL_REDUCE_SPLIT_BLOCKSIZE);
|
||||
int startIdx = min(chunkSize * rank, count);
|
||||
int stopIdx = min(startIdx + chunkSize, count);
|
||||
int rankN = max(stopIdx - startIdx, 0);
|
||||
|
||||
// Adjust gridsize if there isn't enough work to prevent empty workgroups
|
||||
int realGridSize = std::max(std::min(maxGridSize,
|
||||
(rankN + ALL_REDUCE_SPLIT_BLOCKSIZE - 1) / ALL_REDUCE_SPLIT_BLOCKSIZE),
|
||||
1);
|
||||
|
||||
hipEvent_t startEvent = 0;
|
||||
hipEvent_t stopEvent = 0;
|
||||
float kernelTimeMs;
|
||||
if (doTiming)
|
||||
{
|
||||
hipEventCreate(&startEvent);
|
||||
hipEventCreate(&stopEvent);
|
||||
hipEventRecord(startEvent, stream);
|
||||
}
|
||||
|
||||
// Launch even if empty for this rank, because all ranks must hit sync barrier
|
||||
hipLaunchKernelGGL(m_allReduceCliqueKernels[datatype][op][numRanks - MIN_CLIQUE_SIZE],
|
||||
dim3(realGridSize, 1, 1),
|
||||
dim3(ALL_REDUCE_SPLIT_BLOCKSIZE, 1, 1),
|
||||
0, stream,
|
||||
rankN, startIdx, cliquePtrs);
|
||||
|
||||
if (doTiming)
|
||||
{
|
||||
hipEventRecord(stopEvent, stream);
|
||||
hipEventSynchronize(stopEvent);
|
||||
hipEventElapsedTime(&kernelTimeMs, startEvent, stopEvent);
|
||||
printf("[%d/%d:%d] %lu %13.6f ms\n", rank, numRanks, maxGridSize, count, kernelTimeMs);
|
||||
hipEventDestroy(startEvent);
|
||||
hipEventDestroy(stopEvent);
|
||||
}
|
||||
|
||||
return ncclSuccess;
|
||||
}
|
||||
|
||||
protected:
|
||||
// List of all templated device kernels function pointers
|
||||
typedef void(*allReduceCliqueFunc_t)(int, size_t, cliqueDevicePtrs_t);
|
||||
static constexpr allReduceCliqueFunc_t
|
||||
m_allReduceCliqueKernels[ncclNumTypes][ncclNumOps][MAX_CLIQUE_SIZE - MIN_CLIQUE_SIZE + 1] =
|
||||
KERNEL_LIST_MACRO(AllReduceCliqueSplitKernel);
|
||||
};
|
||||
|
||||
#endif
|
||||
@@ -0,0 +1,105 @@
|
||||
/*
|
||||
Copyright (c) 2020 Advanced Micro Devices, Inc. All rights reserved.
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
of this software and associated documentation files (the "Software"), to deal
|
||||
in the Software without restriction, including without limitation the rights
|
||||
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
copies of the Software, and to permit persons to whom the Software is
|
||||
furnished to do so, subject to the following conditions:
|
||||
|
||||
The above copyright notice and this permission notice shall be included in
|
||||
all copies or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
|
||||
THE SOFTWARE.
|
||||
*/
|
||||
|
||||
#ifndef CLIQUE_COMMON_H
|
||||
#define CLIQUE_COMMON_H
|
||||
|
||||
#include "nccl.h"
|
||||
#include <cstdint>
|
||||
#include "rccl_bfloat16.h"
|
||||
#include "reduce_kernel.h"
|
||||
|
||||
#define MIN_CLIQUE_SIZE 2
|
||||
#define MAX_CLIQUE_SIZE 8
|
||||
|
||||
typedef struct
|
||||
{
|
||||
void const* inputs[MAX_CLIQUE_SIZE];
|
||||
void* outputs[MAX_CLIQUE_SIZE];
|
||||
int* barrierCounter;
|
||||
} cliqueDevicePtrs_t;
|
||||
|
||||
// Helper macro to generate a table of templated kernel functions
|
||||
// This is expected to run between MIN_CLIQUE_SIZE to MAX_CLIQUE_SIZE
|
||||
#define KERNEL_LIST_RANK(kernelname, datatype, func) \
|
||||
{ \
|
||||
kernelname<datatype, func<datatype>, 2>, \
|
||||
kernelname<datatype, func<datatype>, 3>, \
|
||||
kernelname<datatype, func<datatype>, 4>, \
|
||||
kernelname<datatype, func<datatype>, 5>, \
|
||||
kernelname<datatype, func<datatype>, 6>, \
|
||||
kernelname<datatype, func<datatype>, 7>, \
|
||||
kernelname<datatype, func<datatype>, 8> \
|
||||
}
|
||||
|
||||
// Helper macro to generate a table of templated kernel functions
|
||||
// This is expected to match the number of supported reduction operations (ncclNumOps)
|
||||
#define KERNEL_LIST_OP(kernelname, datatype) \
|
||||
{ \
|
||||
KERNEL_LIST_RANK(kernelname, datatype, FuncSum), \
|
||||
KERNEL_LIST_RANK(kernelname, datatype, FuncProd), \
|
||||
KERNEL_LIST_RANK(kernelname, datatype, FuncMax), \
|
||||
KERNEL_LIST_RANK(kernelname, datatype, FuncMin) \
|
||||
}
|
||||
|
||||
// Helper Macro to generate table of templated kernel functions
|
||||
// This is expected to match the number of supported datatypes (ncclNumTypes)
|
||||
#define KERNEL_LIST_MACRO(kernelname) \
|
||||
{ \
|
||||
KERNEL_LIST_OP(kernelname, int8_t), \
|
||||
KERNEL_LIST_OP(kernelname, uint8_t), \
|
||||
KERNEL_LIST_OP(kernelname, int32_t), \
|
||||
KERNEL_LIST_OP(kernelname, uint32_t), \
|
||||
KERNEL_LIST_OP(kernelname, int64_t), \
|
||||
KERNEL_LIST_OP(kernelname, uint64_t), \
|
||||
KERNEL_LIST_OP(kernelname, half), \
|
||||
KERNEL_LIST_OP(kernelname, float), \
|
||||
KERNEL_LIST_OP(kernelname, double), \
|
||||
KERNEL_LIST_OP(kernelname, rccl_bfloat16) \
|
||||
}
|
||||
|
||||
template <int NUM_RANKS>
|
||||
__forceinline__ __device__ void WaitForBarrier(int* counter)
|
||||
{
|
||||
if (threadIdx.x == 0 & blockIdx.x == 0)
|
||||
{
|
||||
// Assumes counter starts at 0 prior to any rank access
|
||||
__atomic_add_fetch(counter, 1, __ATOMIC_SEQ_CST);
|
||||
|
||||
// Wait for all ranks to reach barrier
|
||||
while (__atomic_load_n(counter, __ATOMIC_SEQ_CST) < NUM_RANKS);
|
||||
|
||||
// Each rank increments again, last one resets barrier
|
||||
if (__atomic_add_fetch(counter, 1, __ATOMIC_SEQ_CST) == (2*NUM_RANKS))
|
||||
__atomic_store_n(counter, 0, __ATOMIC_SEQ_CST);
|
||||
|
||||
// Wait for counter to be zeroed
|
||||
while (__atomic_load_n(counter, __ATOMIC_SEQ_CST) != 0);
|
||||
}
|
||||
}
|
||||
|
||||
__forceinline__ __host__ __device__ int RoundUp(int X, int Y)
|
||||
{
|
||||
return (X+Y-1)/Y * Y;
|
||||
}
|
||||
|
||||
#endif
|
||||
@@ -0,0 +1,399 @@
|
||||
/*
|
||||
Copyright (c) 2020 Advanced Micro Devices, Inc. All rights reserved.
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
of this software and associated documentation files (the "Software"), to deal
|
||||
in the Software without restriction, including without limitation the rights
|
||||
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
copies of the Software, and to permit persons to whom the Software is
|
||||
furnished to do so, subject to the following conditions:
|
||||
|
||||
The above copyright notice and this permission notice shall be included in
|
||||
all copies or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
|
||||
THE SOFTWARE.
|
||||
*/
|
||||
|
||||
#include "CliqueManager.h"
|
||||
#include "CliqueShmNames.h"
|
||||
#include "MsgQueue.h"
|
||||
|
||||
#include "nccl.h"
|
||||
#include "core.h"
|
||||
|
||||
#include "Hash.h"
|
||||
|
||||
#include "AllReduceCliqueKernel.h"
|
||||
|
||||
#include <hip/hip_runtime.h>
|
||||
#include <stdio.h>
|
||||
#include <stdlib.h>
|
||||
#include <chrono>
|
||||
#include <iomanip>
|
||||
#include <iostream>
|
||||
#include <sstream>
|
||||
#include <thread>
|
||||
|
||||
cliqueDevicePtrs_t CliqueManager::m_cliquePtrs[NCCL_MAX_OPS] = {};
|
||||
uint32_t CliqueManager::m_staticCounters[NCCL_MAX_OPS] = {};
|
||||
int* CliqueManager::m_staticBarriers = NULL;
|
||||
|
||||
CliqueManager::CliqueManager(int const rank,
|
||||
int const numRanks,
|
||||
cliqueMode_t const cliqueMode) :
|
||||
m_rank(rank),
|
||||
m_numRanks(numRanks),
|
||||
m_cliqueMode(cliqueMode),
|
||||
m_init(false),
|
||||
m_deviceBarriers(NULL)
|
||||
{
|
||||
}
|
||||
|
||||
CliqueManager::~CliqueManager()
|
||||
{
|
||||
if (m_init)
|
||||
{
|
||||
CleanUp();
|
||||
}
|
||||
}
|
||||
|
||||
void CliqueManager::CleanUp()
|
||||
{
|
||||
if (m_cliqueMode == CLIQUE_SINGLE_NODE)
|
||||
{
|
||||
// Release caches
|
||||
if (m_ipcHandleSendCache) delete m_ipcHandleSendCache;
|
||||
if (m_ipcHandleSendCache) delete m_ipcHandleRecvCache;
|
||||
|
||||
// Close shared memory
|
||||
m_shmHandles.Close();
|
||||
m_sharedCounters.Close();
|
||||
m_sharedBarrier.Close();
|
||||
|
||||
if (m_rank == 0)
|
||||
{
|
||||
hipFree(m_deviceBarriers);
|
||||
}
|
||||
}
|
||||
else if (m_cliqueMode == CLIQUE_SINGLE_PROCESS)
|
||||
{
|
||||
if (m_staticBarriers) hipHostFree(m_staticBarriers);
|
||||
}
|
||||
m_init = false;
|
||||
}
|
||||
|
||||
ncclResult_t CliqueManager::Init(ncclUniqueId const* commId, int suffix)
|
||||
{
|
||||
ncclResult_t res;
|
||||
|
||||
if (m_init) return ncclSuccess;
|
||||
m_init = true;
|
||||
|
||||
// Check parameters
|
||||
if (m_rank < 0 || m_rank >= m_numRanks)
|
||||
{
|
||||
WARN("Invalid rank specified. Expected 0 <= %d < %d for CliqueManager", m_rank, m_numRanks);
|
||||
return ncclInvalidUsage;
|
||||
}
|
||||
if (commId == NULL)
|
||||
{
|
||||
WARN("CommId should not be empty");
|
||||
return ncclInvalidUsage;
|
||||
}
|
||||
|
||||
// For now, opt-into clique based kernels via RCCL_ENABLE_CLIQUE env var
|
||||
if (!getenv("RCCL_ENABLE_CLIQUE"))
|
||||
{
|
||||
if (m_rank == 0) INFO(NCCL_INIT, "Disabling clique-based kernels (did not find env var RCCL_ENABLE_CLIQUE)");
|
||||
m_cliqueMode = CLIQUE_DISABLED;
|
||||
return ncclSuccess;
|
||||
}
|
||||
|
||||
unsigned long hash = djb2Hash(commId->internal);
|
||||
std::string shmSuffix = std::to_string(hash) + "_" + std::to_string(suffix);
|
||||
|
||||
if (m_cliqueMode == CLIQUE_SINGLE_NODE)
|
||||
{
|
||||
// Initialize shared memory file for IPC handles (based on commId hash)
|
||||
m_shmHandles = NcclIpcHandleShm(m_rank, m_numRanks, hash, NUM_HANDLES_PER_RANK, NCCL_MAX_OPS, shmSuffix);
|
||||
NCCLCHECKGOTO(m_shmHandles.Open(), res, dropback);
|
||||
|
||||
// Initialize IPC caches
|
||||
m_ipcHandleSendCache = new NcclIpcHandleSendCache(m_numRanks * NUM_HANDLES_PER_RANK * NCCL_MAX_OPS);
|
||||
m_ipcHandleRecvCache = new NcclIpcHandleRecvCache(m_numRanks * NUM_HANDLES_PER_RANK * NCCL_MAX_OPS,
|
||||
100,
|
||||
hipIpcMemHandleHash,
|
||||
hipIpcMemHandleEqual);
|
||||
|
||||
// Initialize shared host barrier counters
|
||||
m_sharedCounters = ShmObject<uint32_t>(NCCL_MAX_OPS * sizeof(uint32_t),
|
||||
CliqueShmNames["SharedCounters"] + shmSuffix,
|
||||
m_rank,
|
||||
m_numRanks,
|
||||
hash);
|
||||
NCCLCHECKGOTO(m_sharedCounters.Open(), res, dropback);
|
||||
m_arrivalCounter = m_sharedCounters.Get();
|
||||
|
||||
// Initialized shared barriers
|
||||
m_sharedBarrier = ShmObject<hipIpcMemHandle_t>(std::max(4096LU, sizeof(hipIpcMemHandle_t)),
|
||||
CliqueShmNames["Barriers"] + shmSuffix,
|
||||
m_rank,
|
||||
m_numRanks,
|
||||
hash);
|
||||
NCCLCHECKGOTO(m_sharedBarrier.Open(), res, dropback);
|
||||
|
||||
if (m_rank == 0)
|
||||
{
|
||||
hipIpcMemHandle_t handle;
|
||||
// Allocate fine-grained device memory on rank 0 and get handle for it and store in IPC
|
||||
NCCLCHECKGOTO(ncclCudaCalloc(&m_deviceBarriers, NCCL_MAX_OPS * sizeof(int), true), res, dropback);
|
||||
if (hipIpcGetMemHandle(&handle, m_deviceBarriers) != hipSuccess)
|
||||
{
|
||||
WARN("Unable to get IPC handle for barrier memory");
|
||||
goto dropback;
|
||||
}
|
||||
|
||||
*m_sharedBarrier.Get() = handle;
|
||||
}
|
||||
}
|
||||
else if (m_cliqueMode == CLIQUE_SINGLE_PROCESS)
|
||||
{
|
||||
// Allocate and zero pinned host memory that all GPU kernels will have access to as a barrier
|
||||
if (hipHostMalloc(&m_staticBarriers, sizeof(int) * NCCL_MAX_OPS) != hipSuccess)
|
||||
{
|
||||
WARN("Unable to allocated pinned host memory for clique barrier. Disabling clique-based kernels");
|
||||
m_cliqueMode = CLIQUE_DISABLED;
|
||||
m_init = true;
|
||||
return ncclSuccess;
|
||||
}
|
||||
memset(m_staticBarriers, 0, NCCL_MAX_OPS * sizeof(int));
|
||||
m_arrivalCounter = m_staticCounters;
|
||||
}
|
||||
m_init = true;
|
||||
return ncclSuccess;
|
||||
|
||||
dropback:
|
||||
WARN("Unable to initialize shared memory. Disabling clique-based kernels");
|
||||
CleanUp();
|
||||
m_cliqueMode = CLIQUE_DISABLED;
|
||||
return ncclSuccess;
|
||||
}
|
||||
|
||||
bool CliqueManager::IsSupported(ncclFunc_t const coll,
|
||||
size_t const count,
|
||||
ncclDataType_t const datatype,
|
||||
ncclRedOp_t const op) const
|
||||
{
|
||||
if (m_cliqueMode == CLIQUE_DISABLED) return false;
|
||||
if (coll == ncclCollAllReduce) return true;
|
||||
|
||||
// NOTE: Currently we only support allReduce
|
||||
//#define ALL_REDUCE_COUNT 1048576
|
||||
//if (coll == ncclCollAllReduce && count < ALL_REDUCE_COUNT) return true;
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
ncclResult_t CliqueManager::DeclarePointers(uint64_t opCount, void const* inputPtr, void* outputPtr)
|
||||
{
|
||||
// Do nothing if disabled
|
||||
if (m_cliqueMode == CLIQUE_DISABLED) return ncclSuccess;
|
||||
|
||||
if (!m_init)
|
||||
{
|
||||
WARN("CliqueManager must be initialized before use");
|
||||
return ncclInvalidUsage;
|
||||
}
|
||||
|
||||
int const opIndex = opCount % NCCL_MAX_OPS;
|
||||
if (m_cliqueMode == CLIQUE_SINGLE_NODE)
|
||||
{
|
||||
// Get fine-grained device memory if not already done
|
||||
if (m_deviceBarriers == NULL)
|
||||
{
|
||||
hipIpcMemHandle_t handle = *m_sharedBarrier.Get();
|
||||
CUDACHECK(hipIpcOpenMemHandle((void**)&m_deviceBarriers, handle, hipIpcMemLazyEnablePeerAccess));
|
||||
}
|
||||
|
||||
std::vector<hipIpcMemHandle_t> handles(NUM_HANDLES_PER_RANK);
|
||||
|
||||
// Get IPC handles for input/output pointers from cache
|
||||
NCCLCHECK(CheckCacheForPtr(const_cast<void*>(inputPtr), m_ipcHandleSendCache, m_rank, &handles[0]));
|
||||
NCCLCHECK(CheckCacheForPtr(outputPtr , m_ipcHandleSendCache, m_rank, &handles[1]));
|
||||
|
||||
// Write IPC handles to shared memory for given rank / opCount
|
||||
NCCLCHECK(m_shmHandles.WriteHandles(opIndex, handles));
|
||||
|
||||
}
|
||||
else if (m_cliqueMode == CLIQUE_SINGLE_PROCESS)
|
||||
{
|
||||
// Store this rank's input/output pointers into static member
|
||||
m_cliquePtrs[opIndex].inputs[m_rank] = inputPtr;
|
||||
m_cliquePtrs[opIndex].outputs[m_rank] = outputPtr;
|
||||
}
|
||||
return ncclSuccess;
|
||||
}
|
||||
|
||||
ncclResult_t CliqueManager::QueueKernel(uint64_t const opCount,
|
||||
ncclFunc_t const coll,
|
||||
size_t const count,
|
||||
ncclDataType_t const datatype,
|
||||
ncclRedOp_t const op,
|
||||
int const root,
|
||||
hipStream_t const stream)
|
||||
{
|
||||
// Do nothing if disabled
|
||||
if (m_cliqueMode == CLIQUE_DISABLED) return ncclSuccess;
|
||||
if (!m_init)
|
||||
{
|
||||
WARN("CliqueManager must be initialized before use");
|
||||
return ncclInvalidUsage;
|
||||
}
|
||||
|
||||
// Wait for all ranks to declare pointers
|
||||
int opIndex = opCount % NCCL_MAX_OPS;
|
||||
WaitForBarrier(opIndex);
|
||||
|
||||
// Get cliqueDevicePointers
|
||||
cliqueDevicePtrs_t cliquePtrs;
|
||||
NCCLCHECK(GetCliqueDevicePointers(opCount, cliquePtrs));
|
||||
|
||||
// NOTE: The number of blocks to use per GPU will need to be further optimized
|
||||
int gridSize = (getenv("RCCL_CLIQUE_GRIDSIZE") ? atoi(getenv("RCCL_CLIQUE_GRIDSIZE")) : 2);
|
||||
|
||||
// Launch kernel
|
||||
switch (coll)
|
||||
{
|
||||
case ncclCollAllReduce:
|
||||
return AllReduceCliqueKernel::Launch(m_rank, m_numRanks, gridSize, count, datatype, op, stream, cliquePtrs);
|
||||
default:
|
||||
WARN("Unsupported collective type");
|
||||
return ncclInvalidUsage;
|
||||
}
|
||||
}
|
||||
|
||||
ncclResult_t CliqueManager::GetCliqueDevicePointers(uint64_t opCount, cliqueDevicePtrs_t& cliquePtrs)
|
||||
{
|
||||
// Wait for completion for current opCount
|
||||
int opIndex = opCount % NCCL_MAX_OPS;
|
||||
|
||||
if (m_cliqueMode == CLIQUE_SINGLE_NODE)
|
||||
{
|
||||
// Collect the ready handles from shared memory and convert them to device pointers
|
||||
int numHandles = m_numRanks * NUM_HANDLES_PER_RANK;
|
||||
std::vector<hipIpcMemHandle_t> handles(numHandles);
|
||||
|
||||
NCCLCHECK(m_shmHandles.ReadHandles(opIndex, handles));
|
||||
|
||||
for (int i = 0; i < m_numRanks; i++)
|
||||
{
|
||||
void *input;
|
||||
NCCLCHECK(CheckCacheForHandle(handles[i * NUM_HANDLES_PER_RANK],
|
||||
m_ipcHandleRecvCache, &input));
|
||||
cliquePtrs.inputs[i] = const_cast<const void *>(input);
|
||||
|
||||
NCCLCHECK(CheckCacheForHandle(handles[(i * NUM_HANDLES_PER_RANK) + 1],
|
||||
m_ipcHandleRecvCache, &cliquePtrs.outputs[i]));
|
||||
}
|
||||
cliquePtrs.barrierCounter = &(m_deviceBarriers[opIndex]);
|
||||
}
|
||||
else if (m_cliqueMode == CLIQUE_SINGLE_PROCESS)
|
||||
{
|
||||
m_cliquePtrs[opIndex].barrierCounter = &m_staticBarriers[opIndex];
|
||||
cliquePtrs = m_cliquePtrs[opIndex];
|
||||
}
|
||||
return ncclSuccess;
|
||||
}
|
||||
|
||||
ncclResult_t CliqueManager::CheckCacheForPtr(void* devPtr,
|
||||
NcclIpcHandleSendCache* cache,
|
||||
int rank,
|
||||
hipIpcMemHandle_t* handle)
|
||||
{
|
||||
uint64_t addr = (uint64_t)devPtr;
|
||||
|
||||
// handle NULL ptr case
|
||||
if (addr == 0)
|
||||
{
|
||||
WARN("Error while checking IPC memory handle cache for ptr: null pointer specified.\n");
|
||||
return ncclInternalError;
|
||||
}
|
||||
|
||||
NcclIpcHandleSendCache::iterator it = cache->find(addr);
|
||||
|
||||
if (it == cache->end())
|
||||
{
|
||||
CUDACHECK(hipIpcGetMemHandle(handle, devPtr));
|
||||
std::pair<uint64_t, hipIpcMemHandle_t> ptrHandleMap(addr, *handle) ;
|
||||
cache->insert(addr, *handle);
|
||||
}
|
||||
else
|
||||
{
|
||||
*handle = (it->second).first;
|
||||
}
|
||||
return ncclSuccess;
|
||||
}
|
||||
|
||||
ncclResult_t CliqueManager::CheckCacheForHandle(hipIpcMemHandle_t handle,
|
||||
NcclIpcHandleRecvCache* cache,
|
||||
void** ptr)
|
||||
{
|
||||
NcclIpcHandleRecvCache::iterator it = cache->find(handle);
|
||||
|
||||
if (it == cache->end())
|
||||
{
|
||||
CUDACHECK(hipIpcOpenMemHandle(ptr, handle, hipIpcMemLazyEnablePeerAccess));
|
||||
cache->insert(handle, *ptr);
|
||||
}
|
||||
else
|
||||
{
|
||||
*ptr = (it->second).first;
|
||||
}
|
||||
return ncclSuccess;
|
||||
}
|
||||
|
||||
void CliqueManager::WaitForBarrier(int opIndex)
|
||||
{
|
||||
m_nextBarrierValue[opIndex] += m_numRanks;
|
||||
int const nextValue = m_nextBarrierValue[opIndex];
|
||||
|
||||
__atomic_add_fetch(&m_arrivalCounter[opIndex], 1, __ATOMIC_SEQ_CST);
|
||||
while (m_arrivalCounter[opIndex] < nextValue)
|
||||
{
|
||||
std::this_thread::yield();
|
||||
}
|
||||
}
|
||||
|
||||
ncclResult_t CliqueManager::BootstrapRootInit(int pid, unsigned long hash)
|
||||
{
|
||||
for (auto it = CliqueShmNames.begin(); it != CliqueShmNames.end(); it++)
|
||||
{
|
||||
int msgid, fd;
|
||||
std::string msgQueueName = "/tmp/" + it->second + std::to_string(hash) + "_" + std::to_string(pid);
|
||||
SYSCHECKVAL(open(msgQueueName.c_str(), O_CREAT | O_RDWR, 0606), "open", fd);
|
||||
NCCLCHECK(MsgQueueGetId(msgQueueName, hash, true, msgid));
|
||||
SYSCHECK(close(fd), "close");
|
||||
}
|
||||
|
||||
std::string shmDir = "/dev/shm/";
|
||||
|
||||
for (auto it = CliqueShmNames.begin(); it != CliqueShmNames.end(); it++)
|
||||
{
|
||||
struct stat fileStatus;
|
||||
std::string shmFileName = it->second + std::to_string(hash) + "_" + std::to_string(pid);
|
||||
std::string shmFullPath = shmDir + shmFileName;
|
||||
|
||||
// Check if shm file already exists; if so, unlink it
|
||||
if (stat(shmFullPath.c_str(), &fileStatus) == 0)
|
||||
{
|
||||
NCCLCHECK(shmUnlink(shmFileName.c_str()));
|
||||
}
|
||||
}
|
||||
return ncclSuccess;
|
||||
}
|
||||
@@ -0,0 +1,121 @@
|
||||
/*
|
||||
Copyright (c) 2020 Advanced Micro Devices, Inc. All rights reserved.
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
of this software and associated documentation files (the "Software"), to deal
|
||||
in the Software without restriction, including without limitation the rights
|
||||
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
copies of the Software, and to permit persons to whom the Software is
|
||||
furnished to do so, subject to the following conditions:
|
||||
|
||||
The above copyright notice and this permission notice shall be included in
|
||||
all copies or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
|
||||
THE SOFTWARE.
|
||||
*/
|
||||
|
||||
#ifndef RCCL_CLIQUE_MANAGER_HPP_
|
||||
#define RCCL_CLIQUE_MANAGER_HPP_
|
||||
|
||||
#include <semaphore.h>
|
||||
#include <mutex>
|
||||
|
||||
#include "nccl.h"
|
||||
#include "devcomm.h"
|
||||
#include "CliqueCommon.h"
|
||||
#include "HandleCache.h"
|
||||
#include "HandleShm.h"
|
||||
|
||||
#define NUM_HANDLES_PER_RANK 2
|
||||
|
||||
class CliqueManager
|
||||
{
|
||||
public:
|
||||
typedef enum
|
||||
{
|
||||
CLIQUE_DISABLED = 0,
|
||||
CLIQUE_SINGLE_PROCESS = 1,
|
||||
CLIQUE_SINGLE_NODE = 2
|
||||
} cliqueMode_t;
|
||||
|
||||
CliqueManager(int const rank, int const numRanks, cliqueMode_t const cliqueMode);
|
||||
|
||||
~CliqueManager();
|
||||
|
||||
void CleanUp();
|
||||
|
||||
ncclResult_t Init(ncclUniqueId const* commId, int suffix);
|
||||
|
||||
// Returns true if the collective is supported via a clique-based kernel
|
||||
bool IsSupported(ncclFunc_t const coll,
|
||||
size_t const count,
|
||||
ncclDataType_t const datatype,
|
||||
ncclRedOp_t const op) const;
|
||||
|
||||
// Provide the pointers to be exchanged across the clique for the given rank / opCount
|
||||
ncclResult_t DeclarePointers(uint64_t opCount, void const* inputPtr, void* outputPtr);
|
||||
|
||||
// Launch a clique based kernel
|
||||
ncclResult_t QueueKernel(uint64_t const opCount,
|
||||
ncclFunc_t const coll,
|
||||
size_t const count,
|
||||
ncclDataType_t const datatype,
|
||||
ncclRedOp_t const op,
|
||||
int const root,
|
||||
hipStream_t const stream);
|
||||
|
||||
ncclResult_t CloseSharedMemory();
|
||||
|
||||
static ncclResult_t BootstrapRootInit(int pid, unsigned long hash);
|
||||
|
||||
protected:
|
||||
// Collect the device pointers from all GPUs for specified opCount
|
||||
ncclResult_t GetCliqueDevicePointers(uint64_t opCount, cliqueDevicePtrs_t& cliquePtrs);
|
||||
|
||||
|
||||
ncclResult_t CheckCacheForPtr(void* devPtr,
|
||||
NcclIpcHandleSendCache* cache,
|
||||
int rank,
|
||||
hipIpcMemHandle_t* handle);
|
||||
|
||||
ncclResult_t CheckCacheForHandle(hipIpcMemHandle_t handle,
|
||||
NcclIpcHandleRecvCache* cache,
|
||||
void** ptr);
|
||||
|
||||
// Race-condition helper functions
|
||||
void WaitForBarrier(int opIndex);
|
||||
|
||||
cliqueMode_t m_cliqueMode;
|
||||
int m_rank;
|
||||
int m_numRanks;
|
||||
bool m_init;
|
||||
int m_nextBarrierValue[NCCL_MAX_OPS];
|
||||
uint32_t* m_arrivalCounter;
|
||||
|
||||
// IPC-related (CLIQUE_SINGLE_NODE)
|
||||
NcclIpcHandleShm m_shmHandles;
|
||||
NcclIpcHandleSendCache* m_ipcHandleSendCache;
|
||||
NcclIpcHandleRecvCache* m_ipcHandleRecvCache;
|
||||
ShmObject<uint32_t> m_sharedCounters; // Tracks # of ranks that have finished declaring pointers
|
||||
ShmObject<hipIpcMemHandle_t> m_sharedBarrier; // Used to pass fine-grained device memory buffer
|
||||
int* m_deviceBarriers; // fine-grained barrier
|
||||
|
||||
// Single-process (CLIQUE_SINGLE_PROCESS)
|
||||
static cliqueDevicePtrs_t m_cliquePtrs[NCCL_MAX_OPS];
|
||||
static uint32_t m_staticCounters[NCCL_MAX_OPS];
|
||||
static int* m_staticBarriers;
|
||||
};
|
||||
|
||||
// For use in bootstrapping code
|
||||
struct bootstrapRootStruct {
|
||||
void* listenComm;
|
||||
unsigned long hash;
|
||||
};
|
||||
|
||||
#endif
|
||||
@@ -0,0 +1,37 @@
|
||||
/*
|
||||
Copyright (c) 2020 Advanced Micro Devices, Inc. All rights reserved.
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
of this software and associated documentation files (the "Software"), to deal
|
||||
in the Software without restriction, including without limitation the rights
|
||||
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
copies of the Software, and to permit persons to whom the Software is
|
||||
furnished to do so, subject to the following conditions:
|
||||
|
||||
The above copyright notice and this permission notice shall be included in
|
||||
all copies or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
|
||||
THE SOFTWARE.
|
||||
*/
|
||||
|
||||
#ifndef NCCL_CLIQUE_SHM_NAMES_H_
|
||||
#define NCCL_CLIQUE_SHM_NAMES_H_
|
||||
|
||||
#include <string>
|
||||
#include <map>
|
||||
|
||||
static std::map<std::string, std::string> CliqueShmNames =
|
||||
{
|
||||
{"SharedCounters", "RcclCounters" },
|
||||
{"Mutexes" , "RcclMutexes" },
|
||||
{"IpcHandles" , "RcclIpcHandles"},
|
||||
{"Barriers" , "RcclBarriers" }
|
||||
};
|
||||
|
||||
#endif
|
||||
@@ -0,0 +1,31 @@
|
||||
/*
|
||||
Copyright (c) 2020 Advanced Micro Devices, Inc. All rights reserved.
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
of this software and associated documentation files (the "Software"), to deal
|
||||
in the Software without restriction, including without limitation the rights
|
||||
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
copies of the Software, and to permit persons to whom the Software is
|
||||
furnished to do so, subject to the following conditions:
|
||||
|
||||
The above copyright notice and this permission notice shall be included in
|
||||
all copies or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
|
||||
THE SOFTWARE.
|
||||
*/
|
||||
|
||||
#include "HandleCache.h"
|
||||
|
||||
#include "Hash.h"
|
||||
|
||||
// djb2 hash function for hashing char array in hipIpcMemHandle_t
|
||||
unsigned long hipIpcMemHandleHash(const hipIpcMemHandle_t& handle)
|
||||
{
|
||||
return djb2Hash(handle.reserved);
|
||||
}
|
||||
@@ -0,0 +1,141 @@
|
||||
/*
|
||||
Copyright (c) 2020 Advanced Micro Devices, Inc. All rights reserved.
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
of this software and associated documentation files (the "Software"), to deal
|
||||
in the Software without restriction, including without limitation the rights
|
||||
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
copies of the Software, and to permit persons to whom the Software is
|
||||
furnished to do so, subject to the following conditions:
|
||||
|
||||
The above copyright notice and this permission notice shall be included in
|
||||
all copies or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
|
||||
THE SOFTWARE.
|
||||
*/
|
||||
|
||||
#ifndef NCCL_HANDLE_CACHE_H_
|
||||
#define NCCL_HANDLE_CACHE_H_
|
||||
|
||||
#include <list>
|
||||
#include <unordered_map>
|
||||
#include <functional>
|
||||
|
||||
#include "core.h"
|
||||
|
||||
//#include "llvm/ADT/DenseMap.h"
|
||||
|
||||
template <
|
||||
class Key,
|
||||
class Value,
|
||||
class Hash,
|
||||
class KeyEqual,
|
||||
class Allocator
|
||||
>
|
||||
class NcclIpcHandleCache
|
||||
{
|
||||
typedef std::unordered_map<Key, std::pair<Value, typename std::list<Key>::iterator>, Hash, KeyEqual, Allocator> LRUCache;
|
||||
public:
|
||||
using iterator = typename LRUCache::iterator;
|
||||
NcclIpcHandleCache(size_t size,
|
||||
size_t bucket_count = 100,
|
||||
const Hash& hash = Hash(),
|
||||
const KeyEqual& eql = KeyEqual(),
|
||||
const Allocator& alloc = Allocator() ) : m_cache(bucket_count, hash, eql, alloc)
|
||||
{
|
||||
m_capacity = size;
|
||||
}
|
||||
|
||||
~NcclIpcHandleCache()
|
||||
{
|
||||
m_lruHistory.clear();
|
||||
m_cache.clear();
|
||||
}
|
||||
|
||||
iterator begin()
|
||||
{
|
||||
return m_cache.begin();
|
||||
}
|
||||
|
||||
iterator end()
|
||||
{
|
||||
return m_cache.end();
|
||||
}
|
||||
|
||||
iterator find(const Key& key)
|
||||
{
|
||||
iterator it = m_cache.find(key);
|
||||
if (it != m_cache.end())
|
||||
{
|
||||
updateHistory(key);
|
||||
}
|
||||
|
||||
return it;
|
||||
}
|
||||
|
||||
std::pair<iterator, bool> insert(const Key& key, const Value& value)
|
||||
{
|
||||
if (m_cache.size() == m_capacity)
|
||||
{
|
||||
// remove entry
|
||||
pop();
|
||||
}
|
||||
|
||||
typename LRUCache::iterator it = m_cache.find(key);
|
||||
bool inserted;
|
||||
if (it == m_cache.end())
|
||||
{
|
||||
typename std::list<Key>::iterator it = m_lruHistory.insert(m_lruHistory.end(), key);
|
||||
m_cache.insert(std::make_pair(key, std::make_pair(value, it)));
|
||||
inserted = true;
|
||||
}
|
||||
else
|
||||
{
|
||||
inserted = false;
|
||||
}
|
||||
|
||||
return std::pair<iterator, bool>(it, inserted);
|
||||
}
|
||||
|
||||
private:
|
||||
void pop()
|
||||
{
|
||||
typename LRUCache::iterator it = m_cache.find(m_lruHistory.front());
|
||||
m_cache.erase(it);
|
||||
m_lruHistory.pop_front();
|
||||
}
|
||||
|
||||
void updateHistory(const Key& key)
|
||||
{
|
||||
if (m_lruHistory.size() > 0)
|
||||
{
|
||||
m_lruHistory.splice(m_lruHistory.end(), m_lruHistory, m_cache[key].second);
|
||||
}
|
||||
}
|
||||
size_t m_capacity;
|
||||
std::list<Key> m_lruHistory;
|
||||
LRUCache m_cache;
|
||||
};
|
||||
|
||||
// djb2 hash function for hashing char array in hipIpcMemHandle_t
|
||||
unsigned long hipIpcMemHandleHash(const hipIpcMemHandle_t& handle);
|
||||
|
||||
// equality function required for unordered_map
|
||||
auto hipIpcMemHandleEqual = [](const hipIpcMemHandle_t& l, const hipIpcMemHandle_t& r)
|
||||
{
|
||||
return memcmp(l.reserved, r.reserved, sizeof(l.reserved)) == 0;
|
||||
};
|
||||
|
||||
//typedef llvm::DenseMap<uint64_t, hipIpcMemHandle_t> SendCache;
|
||||
//typedef llvm::DenseMap<hipIpcMemHandle_t, void*, decltype(&HandleHash), decltype(HandleEqual)> RecvCache;
|
||||
|
||||
typedef NcclIpcHandleCache<uint64_t, hipIpcMemHandle_t, std::hash<uint64_t>, std::equal_to<uint64_t>, std::allocator< std::pair<const uint64_t, hipIpcMemHandle_t>>> NcclIpcHandleSendCache;
|
||||
typedef NcclIpcHandleCache<hipIpcMemHandle_t, void*, decltype(&hipIpcMemHandleHash), decltype(hipIpcMemHandleEqual), std::allocator< std::pair<const uint64_t, hipIpcMemHandle_t>>> NcclIpcHandleRecvCache;
|
||||
|
||||
#endif
|
||||
@@ -0,0 +1,67 @@
|
||||
/*
|
||||
Copyright (c) 2020 Advanced Micro Devices, Inc. All rights reserved.
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
of this software and associated documentation files (the "Software"), to deal
|
||||
in the Software without restriction, including without limitation the rights
|
||||
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
copies of the Software, and to permit persons to whom the Software is
|
||||
furnished to do so, subject to the following conditions:
|
||||
|
||||
The above copyright notice and this permission notice shall be included in
|
||||
all copies or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
|
||||
THE SOFTWARE.
|
||||
*/
|
||||
|
||||
#include <hip/hip_runtime.h>
|
||||
|
||||
#include "HandleShm.h"
|
||||
#include "CliqueShmNames.h"
|
||||
#include "core.h"
|
||||
#include "Hash.h"
|
||||
#include "shm.h"
|
||||
|
||||
NcclIpcHandleShm::NcclIpcHandleShm(int rank, int numRanks, int projid, int numHandlesPerRank, int capacity, std::string suffix) :
|
||||
ShmObject<hipIpcMemHandle_t>(numRanks * numHandlesPerRank * capacity * sizeof(hipIpcMemHandle_t),
|
||||
CliqueShmNames["IpcHandles"] + suffix,
|
||||
rank,
|
||||
numRanks,
|
||||
projid),
|
||||
m_numHandlesPerRank(numHandlesPerRank),
|
||||
m_numHandlesPerOpCount(numRanks * numHandlesPerRank)
|
||||
{
|
||||
}
|
||||
|
||||
NcclIpcHandleShm::NcclIpcHandleShm()
|
||||
{
|
||||
}
|
||||
|
||||
NcclIpcHandleShm::~NcclIpcHandleShm()
|
||||
{
|
||||
}
|
||||
|
||||
ncclResult_t NcclIpcHandleShm::Open()
|
||||
{
|
||||
return ShmObject::Open();
|
||||
}
|
||||
|
||||
ncclResult_t NcclIpcHandleShm::WriteHandles(uint64_t opCount, std::vector<hipIpcMemHandle_t> const& sendHandles)
|
||||
{
|
||||
size_t idx = (opCount * m_numHandlesPerOpCount) + (m_rank * m_numHandlesPerRank);
|
||||
memcpy(m_shmPtr + idx, sendHandles.data(), sizeof(hipIpcMemHandle_t) * m_numHandlesPerRank);
|
||||
return ncclSuccess;
|
||||
}
|
||||
|
||||
ncclResult_t NcclIpcHandleShm::ReadHandles(uint64_t opCount, std::vector<hipIpcMemHandle_t>& recvHandles)
|
||||
{
|
||||
size_t idx = opCount * m_numHandlesPerOpCount;
|
||||
memcpy(recvHandles.data(), m_shmPtr + idx, m_numHandlesPerOpCount * sizeof(hipIpcMemHandle_t));
|
||||
return ncclSuccess;
|
||||
}
|
||||
@@ -0,0 +1,53 @@
|
||||
/*
|
||||
Copyright (c) 2020 Advanced Micro Devices, Inc. All rights reserved.
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
of this software and associated documentation files (the "Software"), to deal
|
||||
in the Software without restriction, including without limitation the rights
|
||||
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
copies of the Software, and to permit persons to whom the Software is
|
||||
furnished to do so, subject to the following conditions:
|
||||
|
||||
The above copyright notice and this permission notice shall be included in
|
||||
all copies or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
|
||||
THE SOFTWARE.
|
||||
*/
|
||||
|
||||
#ifndef NCCL_IPC_HANDLE_SHM_H_
|
||||
#define NCCL_IPC_HANDLE_SHM_H_
|
||||
|
||||
#include <hip/hip_runtime.h>
|
||||
#include <vector>
|
||||
#include <string>
|
||||
|
||||
#include "nccl.h"
|
||||
#include "ShmObject.h"
|
||||
|
||||
class NcclIpcHandleShm : public ShmObject<hipIpcMemHandle_t>
|
||||
{
|
||||
public:
|
||||
NcclIpcHandleShm(int rank, int numRanks, int projid, int numHandlesPerRank, int capacity, std::string suffix);
|
||||
|
||||
NcclIpcHandleShm();
|
||||
|
||||
~NcclIpcHandleShm();
|
||||
|
||||
ncclResult_t Open();
|
||||
|
||||
ncclResult_t WriteHandles(uint64_t opCount, std::vector<hipIpcMemHandle_t> const& sendHandles);
|
||||
|
||||
ncclResult_t ReadHandles(uint64_t opCount, std::vector<hipIpcMemHandle_t>& recvHandles);
|
||||
|
||||
private:
|
||||
int m_numHandlesPerRank;
|
||||
int m_numHandlesPerOpCount;
|
||||
};
|
||||
|
||||
#endif
|
||||
@@ -0,0 +1,34 @@
|
||||
/*
|
||||
Copyright (c) 2020 Advanced Micro Devices, Inc. All rights reserved.
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
of this software and associated documentation files (the "Software"), to deal
|
||||
in the Software without restriction, including without limitation the rights
|
||||
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
copies of the Software, and to permit persons to whom the Software is
|
||||
furnished to do so, subject to the following conditions:
|
||||
|
||||
The above copyright notice and this permission notice shall be included in
|
||||
all copies or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
|
||||
THE SOFTWARE.
|
||||
*/
|
||||
|
||||
#include "Hash.h"
|
||||
|
||||
unsigned long djb2Hash(const char* data)
|
||||
{
|
||||
unsigned long hash = 5381;
|
||||
int c;
|
||||
|
||||
while ((c = *(data)++))
|
||||
hash = ((hash << 5) + hash) + c; /* hash * 33 + c */
|
||||
|
||||
return hash;
|
||||
}
|
||||
@@ -0,0 +1,28 @@
|
||||
/*
|
||||
Copyright (c) 2020 Advanced Micro Devices, Inc. All rights reserved.
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
of this software and associated documentation files (the "Software"), to deal
|
||||
in the Software without restriction, including without limitation the rights
|
||||
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
copies of the Software, and to permit persons to whom the Software is
|
||||
furnished to do so, subject to the following conditions:
|
||||
|
||||
The above copyright notice and this permission notice shall be included in
|
||||
all copies or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
|
||||
THE SOFTWARE.
|
||||
*/
|
||||
|
||||
#ifndef NCCL_HASH_H_
|
||||
#define NCCL_HASH_H_
|
||||
|
||||
unsigned long djb2Hash(const char* data);
|
||||
|
||||
#endif
|
||||
@@ -0,0 +1,72 @@
|
||||
/*
|
||||
Copyright (c) 2020 Advanced Micro Devices, Inc. All rights reserved.
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
of this software and associated documentation files (the "Software"), to deal
|
||||
in the Software without restriction, including without limitation the rights
|
||||
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
copies of the Software, and to permit persons to whom the Software is
|
||||
furnished to do so, subject to the following conditions:
|
||||
|
||||
The above copyright notice and this permission notice shall be included in
|
||||
all copies or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
|
||||
THE SOFTWARE.
|
||||
*/
|
||||
|
||||
#include "MsgQueue.h"
|
||||
|
||||
#include <sys/ipc.h>
|
||||
#include <sys/msg.h>
|
||||
|
||||
#define MSG_QUEUE_PERM 0666
|
||||
|
||||
ncclResult_t MsgQueueGetId(std::string name, int projid, bool exclusive, int& msgid)
|
||||
{
|
||||
key_t key;
|
||||
SYSCHECKVAL(ftok(name.c_str(), projid), "ftok", key);
|
||||
int flag = (exclusive == true ? IPC_CREAT | IPC_EXCL : IPC_CREAT);
|
||||
|
||||
msgid = msgget(key, MSG_QUEUE_PERM | flag);
|
||||
// Check if we're trying to create message queue and it already exists; if so, delete existing queue
|
||||
if (msgid == -1 && exclusive == true && errno == EEXIST)
|
||||
{
|
||||
NCCLCHECK(MsgQueueClose(name, projid));
|
||||
SYSCHECKVAL(msgget(key, MSG_QUEUE_PERM | flag), "msgget", msgid);
|
||||
}
|
||||
else if (msgid == -1)
|
||||
{
|
||||
WARN("Call to MsgQueueGetId failed : %s", strerror(errno));
|
||||
return ncclSystemError;
|
||||
}
|
||||
return ncclSuccess;
|
||||
}
|
||||
|
||||
ncclResult_t MsgQueueSend(int msgid, const void* msgp, size_t msgsz, int msgflg)
|
||||
{
|
||||
SYSCHECK(msgsnd(msgid, msgp, msgsz, msgflg), "msgsnd");
|
||||
return ncclSuccess;
|
||||
}
|
||||
|
||||
ncclResult_t MsgQueueRecv(int msgid, void* msgp, size_t msgsz, long msgtyp, bool wait)
|
||||
{
|
||||
int msgflg = (wait == false ? IPC_NOWAIT : 0);
|
||||
SYSCHECK(msgrcv(msgid, msgp, msgsz, msgtyp, msgflg), "msgrcv");
|
||||
return ncclSuccess;
|
||||
}
|
||||
|
||||
ncclResult_t MsgQueueClose(std::string name, int projid)
|
||||
{
|
||||
key_t key;
|
||||
int msgid;
|
||||
key = ftok(name.c_str(), projid);
|
||||
SYSCHECKVAL(msgget(key, IPC_CREAT), "msgget", msgid);
|
||||
SYSCHECK(msgctl(msgid, IPC_RMID, NULL), "msgctl");
|
||||
return ncclSuccess;
|
||||
}
|
||||
@@ -0,0 +1,42 @@
|
||||
/*
|
||||
Copyright (c) 2020 Advanced Micro Devices, Inc. All rights reserved.
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
of this software and associated documentation files (the "Software"), to deal
|
||||
in the Software without restriction, including without limitation the rights
|
||||
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
copies of the Software, and to permit persons to whom the Software is
|
||||
furnished to do so, subject to the following conditions:
|
||||
|
||||
The above copyright notice and this permission notice shall be included in
|
||||
all copies or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
|
||||
THE SOFTWARE.
|
||||
*/
|
||||
|
||||
#ifndef RCCL_MSG_QUEUE_HPP_
|
||||
#define RCCL_MSG_QUEUE_HPP_
|
||||
|
||||
#include <string>
|
||||
|
||||
#include "nccl.h"
|
||||
#include "core.h"
|
||||
|
||||
struct MsgBuffer
|
||||
{
|
||||
long msg_type;
|
||||
char msg_text[1];
|
||||
};
|
||||
|
||||
ncclResult_t MsgQueueGetId(std::string name, int projid, bool exclusive, int& msgid);
|
||||
ncclResult_t MsgQueueSend(int msgid, const void* msgp, size_t msgsz, int msgflg);
|
||||
ncclResult_t MsgQueueRecv(int msgid, void* msgp, size_t msgsz, long msgtyp, bool wait);
|
||||
ncclResult_t MsgQueueClose(std::string name, int projid);
|
||||
|
||||
#endif
|
||||
@@ -0,0 +1,43 @@
|
||||
/*
|
||||
Copyright (c) 2020 Advanced Micro Devices, Inc. All rights reserved.
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
of this software and associated documentation files (the "Software"), to deal
|
||||
in the Software without restriction, including without limitation the rights
|
||||
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
copies of the Software, and to permit persons to whom the Software is
|
||||
furnished to do so, subject to the following conditions:
|
||||
|
||||
The above copyright notice and this permission notice shall be included in
|
||||
all copies or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
|
||||
THE SOFTWARE.
|
||||
*/
|
||||
|
||||
#ifndef SHAREDMEMHELPER_H
|
||||
#define SHAREDMEMHELPER_H
|
||||
|
||||
|
||||
class SharedMemHelper
|
||||
{
|
||||
public:
|
||||
SharedMemHelper(int rank, int numRanks, int numEntries);
|
||||
|
||||
ncclStatus_t Init(std::string const& baseFilename);
|
||||
|
||||
ncclStatus_t
|
||||
|
||||
|
||||
protected:
|
||||
bool m_initialized;
|
||||
int m_rank;
|
||||
int m_numRanks;
|
||||
};
|
||||
|
||||
#endif
|
||||
@@ -0,0 +1,45 @@
|
||||
/*
|
||||
Copyright (c) 2020 Advanced Micro Devices, Inc. All rights reserved.
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
of this software and associated documentation files (the "Software"), to deal
|
||||
in the Software without restriction, including without limitation the rights
|
||||
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
copies of the Software, and to permit persons to whom the Software is
|
||||
furnished to do so, subject to the following conditions:
|
||||
|
||||
The above copyright notice and this permission notice shall be included in
|
||||
all copies or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
|
||||
THE SOFTWARE.
|
||||
*/
|
||||
|
||||
#include "ShmObject.h"
|
||||
#include <string>
|
||||
|
||||
// Template specializations for sem_t objects which require additional initialization
|
||||
template<>
|
||||
ncclResult_t ShmObject<sem_t>::Close()
|
||||
{
|
||||
size_t numMutexes = m_shmSize / sizeof(sem_t);
|
||||
|
||||
for (size_t i = 0; i < numMutexes; i++)
|
||||
{
|
||||
sem_destroy(static_cast<sem_t*>(&m_shmPtr[i]));
|
||||
}
|
||||
|
||||
int retVal = shm_unlink(m_shmName.c_str());
|
||||
if (retVal == -1 && errno != ENOENT)
|
||||
{
|
||||
WARN("Call to shm_unlink in ShmObject failed : %s", strerror(errno));
|
||||
return ncclSystemError;
|
||||
}
|
||||
|
||||
return ncclSuccess;
|
||||
}
|
||||
@@ -0,0 +1,203 @@
|
||||
/*
|
||||
Copyright (c) 2020 Advanced Micro Devices, Inc. All rights reserved.
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
of this software and associated documentation files (the "Software"), to deal
|
||||
in the Software without restriction, including without limitation the rights
|
||||
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
copies of the Software, and to permit persons to whom the Software is
|
||||
furnished to do so, subject to the following conditions:
|
||||
|
||||
The above copyright notice and this permission notice shall be included in
|
||||
all copies or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
|
||||
THE SOFTWARE.
|
||||
*/
|
||||
|
||||
#ifndef NCCL_SHM_OBJECT_H_
|
||||
#define NCCL_SHM_OBJECT_H_
|
||||
|
||||
#include <string>
|
||||
#include <sys/mman.h>
|
||||
#include <sys/stat.h>
|
||||
#include <fcntl.h>
|
||||
#include <type_traits>
|
||||
#include <semaphore.h>
|
||||
|
||||
#include "MsgQueue.h"
|
||||
#include "nccl.h"
|
||||
#include "core.h"
|
||||
#include "shm.h"
|
||||
|
||||
// ShmObject abstracts away the nitty-gritty when multiple processes need to handle opening a shared
|
||||
// memory object at the same time.
|
||||
|
||||
static ncclResult_t shmSetupExclusive(const char* shmname, const int shmsize, int* fd, void** ptr, int create) {
|
||||
*fd = shm_open(shmname, O_CREAT | O_RDWR | O_EXCL, S_IRUSR | S_IWUSR);
|
||||
if (*fd == -1) return ncclSystemError;
|
||||
if (create) SYSCHECK(shm_allocate(*fd, shmsize), "posix_fallocate");
|
||||
SYSCHECK(shm_map(*fd, shmsize, ptr), "mmap");
|
||||
close(*fd);
|
||||
*fd = -1;
|
||||
if (create) memset(*ptr, 0, shmsize);
|
||||
return ncclSuccess;
|
||||
}
|
||||
|
||||
template <typename T>
|
||||
class ShmObject
|
||||
{
|
||||
public:
|
||||
ShmObject(size_t size, std::string fileName, int rank, int numRanks, int projid) :
|
||||
m_shmSize(size),
|
||||
m_shmName(fileName),
|
||||
m_rank(rank),
|
||||
m_numRanks(numRanks),
|
||||
m_projid(projid),
|
||||
m_alloc(false),
|
||||
m_shmPtr(nullptr) {}
|
||||
|
||||
ShmObject() {}
|
||||
|
||||
~ShmObject() {}
|
||||
|
||||
ncclResult_t Open();
|
||||
|
||||
ncclResult_t Close()
|
||||
{
|
||||
if (m_alloc)
|
||||
{
|
||||
if (m_rank == 0)
|
||||
{
|
||||
std::string tmpFileName = "/tmp/" + m_shmName;
|
||||
remove(tmpFileName.c_str());
|
||||
}
|
||||
int retVal = shm_unlink(m_shmName.c_str());
|
||||
if (retVal == -1 && errno != ENOENT)
|
||||
{
|
||||
WARN("Call to shm_unlink in ShmObject failed : %s", strerror(errno));
|
||||
return ncclSystemError;
|
||||
}
|
||||
}
|
||||
return ncclSuccess;
|
||||
}
|
||||
|
||||
T*& Get()
|
||||
{
|
||||
return m_shmPtr;
|
||||
}
|
||||
protected:
|
||||
ncclResult_t BroadcastMessage(int msgid, bool pass)
|
||||
{
|
||||
MsgBuffer msg;
|
||||
msg.msg_text[0] = (pass == 0 ? 'F': 'P');
|
||||
for (int rank = 0; rank < m_numRanks; rank++)
|
||||
{
|
||||
if (rank == m_rank) continue;
|
||||
msg.msg_type = rank;
|
||||
NCCLCHECK(MsgQueueSend(msgid, &msg, sizeof(msg), 0));
|
||||
}
|
||||
return ncclSuccess;
|
||||
}
|
||||
|
||||
// tag for dispatch
|
||||
template<class U>
|
||||
struct OpenTag{};
|
||||
|
||||
ncclResult_t InitIfSemaphore(OpenTag<int> tag);
|
||||
ncclResult_t InitIfSemaphore(OpenTag<uint32_t> tag);
|
||||
ncclResult_t InitIfSemaphore(OpenTag<hipIpcMemHandle_t> tag);
|
||||
ncclResult_t InitIfSemaphore(OpenTag<sem_t> tag);
|
||||
|
||||
size_t m_shmSize;
|
||||
std::string m_shmName;
|
||||
int m_rank;
|
||||
int m_numRanks;
|
||||
int m_projid;
|
||||
bool m_alloc;
|
||||
T* m_shmPtr;
|
||||
};
|
||||
|
||||
template <typename T>
|
||||
ncclResult_t ShmObject<T>::Open()
|
||||
{
|
||||
if (m_alloc == false)
|
||||
{
|
||||
int shmFd;
|
||||
int protection = PROT_READ | PROT_WRITE;
|
||||
int visibility = MAP_SHARED;
|
||||
|
||||
int msgid;
|
||||
std::string tmpFileName = "/tmp/" + m_shmName;
|
||||
NCCLCHECK(MsgQueueGetId(tmpFileName, m_projid, false, msgid));
|
||||
if (m_rank == 0)
|
||||
{
|
||||
ncclResult_t resultSetup = shmSetupExclusive(m_shmName.c_str(), m_shmSize, &shmFd, (void**)&m_shmPtr, 1);
|
||||
ncclResult_t resultSemInit = InitIfSemaphore(OpenTag<T>{});
|
||||
if ((resultSetup != ncclSuccess && errno != EEXIST) || (resultSemInit != ncclSuccess))
|
||||
{
|
||||
NCCLCHECK(BroadcastMessage(msgid, false));
|
||||
WARN("Call to ShmObject::Open in root rank failed : %s", strerror(errno));
|
||||
return ncclSystemError;
|
||||
}
|
||||
NCCLCHECK(BroadcastMessage(msgid, true));
|
||||
}
|
||||
else
|
||||
{
|
||||
MsgBuffer msg;
|
||||
NCCLCHECK(MsgQueueRecv(msgid, &msg, sizeof(msg), m_rank, true));
|
||||
if (msg.msg_text[0] == 'P')
|
||||
{
|
||||
NCCLCHECK(shmSetup(m_shmName.c_str(), m_shmSize, &shmFd, (void**)&m_shmPtr, 0));
|
||||
}
|
||||
else
|
||||
{
|
||||
WARN("Call to shm_open from non-root rank in ShmObject failed : %s", strerror(errno));
|
||||
return ncclSystemError;
|
||||
}
|
||||
}
|
||||
m_alloc = true;
|
||||
}
|
||||
else
|
||||
{
|
||||
WARN("Cannot allocate ShmObject twice.\n");
|
||||
return ncclInvalidUsage;
|
||||
}
|
||||
return ncclSuccess;
|
||||
}
|
||||
|
||||
template<typename T>
|
||||
ncclResult_t ShmObject<T>::InitIfSemaphore(OpenTag<int> tag)
|
||||
{
|
||||
return ncclSuccess;
|
||||
}
|
||||
|
||||
template<typename T>
|
||||
ncclResult_t ShmObject<T>::InitIfSemaphore(OpenTag<unsigned int> tag)
|
||||
{
|
||||
return ncclSuccess;
|
||||
}
|
||||
|
||||
template<typename T>
|
||||
ncclResult_t ShmObject<T>::InitIfSemaphore(OpenTag<hipIpcMemHandle_t> tag)
|
||||
{
|
||||
return ncclSuccess;
|
||||
}
|
||||
|
||||
template<typename T>
|
||||
ncclResult_t ShmObject<T>::InitIfSemaphore(OpenTag<sem_t> tag)
|
||||
{
|
||||
size_t numMutexes = m_shmSize / sizeof(sem_t);
|
||||
|
||||
for (size_t i = 0; i < numMutexes; i++)
|
||||
{
|
||||
SYSCHECK(sem_init(static_cast<sem_t*>(&m_shmPtr[i]), 1, 1), "sem_init");
|
||||
}
|
||||
return ncclSuccess;
|
||||
}
|
||||
#endif
|
||||
@@ -350,9 +350,14 @@ __device__ int ptrAlign128(T* ptr) { return (uint64_t)ptr % alignof(int32_t); }
|
||||
__device__ int ptrAlign128(T* ptr) { return (uint64_t)ptr % alignof(Pack128); }
|
||||
#endif
|
||||
|
||||
#if defined(__HIP_PLATFORM_HCC__) || defined(__HCC__) || defined(__HIPCC__)
|
||||
// Multiply UNROLL by 2 if single source/single destination
|
||||
#define AUTOUNROLL (UNROLL*((MINSRCS==1 && MINDSTS==1) ? 2 : 1))
|
||||
#else
|
||||
// Try to limit consecutive load/stores to 8.
|
||||
// Use UNROLL 8 when we have a single source and a single destination, 4 otherwise
|
||||
#define AUTOUNROLL (UNROLL*(4/(MINDSTS+MINSRCS)))
|
||||
#endif
|
||||
|
||||
template<int UNROLL, class FUNC, typename T, int MINSRCS, int MAXSRCS, int MINDSTS, int MAXDSTS>
|
||||
__device__ void ReduceOrCopyMulti(const int tid, const int nthreads,
|
||||
|
||||
@@ -617,6 +617,31 @@ end:
|
||||
info->opName, info->comm->opCount, info->sendbuff, info->recvbuff, info->count,
|
||||
info->datatype, info->op, info->root, info->comm, info->comm->nRanks, info->stream);
|
||||
|
||||
// [RCCL] Alternative launch path for clique-based kernels (if supported and enabled)
|
||||
{
|
||||
if (info->comm->cliqueManager->IsSupported(info->coll,
|
||||
info->count,
|
||||
info->datatype,
|
||||
info->op))
|
||||
{
|
||||
// Declare the input / output pointers being used (to exchange via IPC with other ranks)
|
||||
NCCLCHECK(info->comm->cliqueManager->DeclarePointers(info->comm->opCount,
|
||||
info->sendbuff,
|
||||
info->recvbuff));
|
||||
|
||||
// Queue the clique-based kernel
|
||||
NCCLCHECK(info->comm->cliqueManager->QueueKernel(info->comm->opCount,
|
||||
info->coll,
|
||||
info->count,
|
||||
info->datatype,
|
||||
info->op,
|
||||
info->root,
|
||||
info->stream));
|
||||
return ncclSuccess;
|
||||
}
|
||||
}
|
||||
// [/RCCL]
|
||||
|
||||
NCCLCHECK(ncclSaveKernel(info));
|
||||
NCCLCHECK(ncclBarrierEnqueue(info->comm));
|
||||
NCCLCHECK(ncclBarrierEnqueueWait(info->comm));
|
||||
|
||||
@@ -12,7 +12,7 @@
|
||||
ncclResult_t bootstrapNetInit();
|
||||
ncclResult_t bootstrapCreateRoot(ncclUniqueId* commId, bool idFromEnv);
|
||||
ncclResult_t bootstrapGetUniqueId(ncclUniqueId* out);
|
||||
ncclResult_t bootstrapInit(ncclUniqueId* id, int rank, int nranks, void** commState);
|
||||
ncclResult_t bootstrapInit(ncclUniqueId* id, int rank, int nranks, void** commState, int* rootPid); // [RCCL] Adding rootPid
|
||||
ncclResult_t bootstrapAllGather(void* commState, void* allData, int size);
|
||||
ncclResult_t bootstrapSend(void* commState, int peer, void* data, int size);
|
||||
ncclResult_t bootstrapRecv(void* commState, int peer, void* data, int size);
|
||||
|
||||
+9
-2
@@ -10,6 +10,9 @@
|
||||
|
||||
#include "transport.h"
|
||||
#include "p2p.h"
|
||||
// [RCCL]
|
||||
#include "clique/CliqueManager.h"
|
||||
// [/RCCL]
|
||||
|
||||
#if defined(__HIP_PLATFORM_HCC__) || defined(__HCC__) || defined(__HIPCC__)
|
||||
#else
|
||||
@@ -143,8 +146,12 @@ struct ncclComm {
|
||||
//list of async p2p operation queued in a group semantics
|
||||
struct ncclP2Plist p2plist;
|
||||
|
||||
// RCCL AllToAll/Scatter/Gather API
|
||||
bool alltoallDisable;
|
||||
// [RCCL]
|
||||
bool alltoallDisable; // RCCL AllToAll/Scatter/Gather API
|
||||
CliqueManager* cliqueManager; // CliqueManager handles pointer collection / distribution for clique-based kernels
|
||||
int rootPid; // Process ID of root
|
||||
// [/RCCL]
|
||||
|
||||
};
|
||||
|
||||
#endif
|
||||
|
||||
+46
-1
@@ -28,6 +28,10 @@
|
||||
#include <unistd.h>
|
||||
#include "graph/topo.h"
|
||||
|
||||
// [RCCL]
|
||||
#include "clique/CliqueManager.h"
|
||||
// [/RCCL]
|
||||
|
||||
#define STR2(v) #v
|
||||
#define STR(v) STR2(v)
|
||||
|
||||
@@ -678,7 +682,10 @@ static ncclResult_t initTransportsRank(struct ncclComm* comm, ncclUniqueId* comm
|
||||
int nranks = comm->nRanks;
|
||||
uint64_t commHash = getHash(commId->internal, NCCL_UNIQUE_ID_BYTES);
|
||||
TRACE(NCCL_INIT, "comm %p, commHash %lx, rank %d nranks %d - BEGIN", comm, commHash, rank, nranks);
|
||||
NCCLCHECK(bootstrapInit(commId, rank, nranks, &comm->bootstrap));
|
||||
// [RCCL] Collect the PID of the root
|
||||
int rootPid;
|
||||
NCCLCHECK(bootstrapInit(commId, rank, nranks, &comm->bootstrap, &rootPid));
|
||||
// [/RCCL]
|
||||
|
||||
// AllGather1 - begin
|
||||
struct {
|
||||
@@ -1021,6 +1028,40 @@ affinity_restore:
|
||||
}
|
||||
NCCLCHECK(ncclCommSetIntra(comm, intraRank, intraRanks, allGather1Data[intraRank0].comm));
|
||||
|
||||
{ // [RCCL] Check if clique-based kernels can be enabled and initialize CliqueManager if so
|
||||
CliqueManager::cliqueMode_t cliqueMode = CliqueManager::CLIQUE_DISABLED;
|
||||
if (comm->localRanks == comm->nRanks)
|
||||
{
|
||||
// Check that all the GPUs have peer access to one another
|
||||
bool hasPeerAccess = true;
|
||||
for (int i = 0; i < nranks && hasPeerAccess; i++)
|
||||
{
|
||||
int cudaDev1 = allGather1Data[i].peerInfo.cudaDev;
|
||||
for (int j = 0; j < nranks; j++)
|
||||
{
|
||||
if (i == j) continue;
|
||||
int cudaDev2 = allGather1Data[j].peerInfo.cudaDev;
|
||||
int p2p;
|
||||
if (hipDeviceCanAccessPeer(&p2p, cudaDev1, cudaDev2) != hipSuccess || !p2p)
|
||||
{
|
||||
hasPeerAccess = false;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (hasPeerAccess)
|
||||
{
|
||||
if (intraRanks == nranks)
|
||||
cliqueMode = CliqueManager::CLIQUE_SINGLE_PROCESS;
|
||||
else
|
||||
cliqueMode = CliqueManager::CLIQUE_SINGLE_NODE;
|
||||
}
|
||||
}
|
||||
comm->cliqueManager = new CliqueManager(rank, nranks, cliqueMode);
|
||||
NCCLCHECK(comm->cliqueManager->Init(commId, rootPid));
|
||||
} // [/RCCL]
|
||||
|
||||
// Done with AllGather1 data
|
||||
free(allGather1Data);
|
||||
|
||||
@@ -1144,6 +1185,10 @@ ncclResult_t ncclCommDestroy(ncclComm_t comm) {
|
||||
return ncclInvalidArgument;
|
||||
}
|
||||
|
||||
// [RCCL] Delete CliqueManager if it exists
|
||||
if (comm->cliqueManager) delete comm->cliqueManager;
|
||||
// [/RCCL]
|
||||
|
||||
return commDestroy(comm);
|
||||
}
|
||||
|
||||
|
||||
@@ -0,0 +1,192 @@
|
||||
/*
|
||||
Copyright (c) 2020 Advanced Micro Devices, Inc. All rights reserved.
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
of this software and associated documentation files (the "Software"), to deal
|
||||
in the Software without restriction, including without limitation the rights
|
||||
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
copies of the Software, and to permit persons to whom the Software is
|
||||
furnished to do so, subject to the following conditions:
|
||||
|
||||
The above copyright notice and this permission notice shall be included in
|
||||
all copies or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
|
||||
THE SOFTWARE.
|
||||
*/
|
||||
|
||||
#include <sys/socket.h>
|
||||
#include <ifaddrs.h>
|
||||
#include <netdb.h>
|
||||
#include <unistd.h>
|
||||
#include <cstdio>
|
||||
#include <string>
|
||||
#include <chrono>
|
||||
#include <hip/hip_runtime.h>
|
||||
#include <rccl.h>
|
||||
#include "HelloRccl.hpp"
|
||||
|
||||
|
||||
void Usage(char *argv0);
|
||||
|
||||
int main(int argc, char **argv)
|
||||
{
|
||||
// This example program uses NCCL_COMM_ID to perform bootstrapping
|
||||
// to sidestep the need to communicate the ncclUniqueId (e.g. via MPI)
|
||||
if (argc < 3 || getenv("NCCL_COMM_ID") == NULL)
|
||||
{
|
||||
Usage(argv[0]);
|
||||
return 1;
|
||||
}
|
||||
// Collect command-line arguments
|
||||
int nranks = atoi(argv[1]);
|
||||
int rank = atoi(argv[2]);
|
||||
int deviceId = atoi(argv[3]);
|
||||
|
||||
// Allocate GPU resources
|
||||
hipStream_t stream;
|
||||
hipEvent_t startEvent, stopEvent;
|
||||
HIP_CALL(hipSetDevice(deviceId));
|
||||
HIP_CALL(hipStreamCreate(&stream));
|
||||
HIP_CALL(hipEventCreate(&startEvent));
|
||||
HIP_CALL(hipEventCreate(&stopEvent));
|
||||
|
||||
// Create communicator
|
||||
ncclUniqueId commId;
|
||||
NCCL_CALL(ncclGetUniqueId(&commId));
|
||||
|
||||
// Initialize communicator
|
||||
ncclComm_t comm;
|
||||
NCCL_CALL(ncclCommInitRank(&comm, nranks, commId, rank));
|
||||
|
||||
// Loop over powers of 2
|
||||
int minPow = 10;
|
||||
int maxPow = 28;
|
||||
|
||||
if (rank == 0)
|
||||
{
|
||||
printf("AllReduce Performance (sum of floats):\n");
|
||||
printf("%10s %10s %10s\n", "Bytes", "CpuTime(ms)", "GpuTime(ms)");
|
||||
}
|
||||
|
||||
for (int power = minPow; power <= maxPow; power++)
|
||||
{
|
||||
int N = 1 << power;
|
||||
|
||||
// Allocate GPU memory
|
||||
float *iputGpu, *oputGpu;
|
||||
HIP_CALL(hipMalloc((void **)&iputGpu, N * sizeof(float)));
|
||||
HIP_CALL(hipMalloc((void **)&oputGpu, N * sizeof(float)));
|
||||
|
||||
// Allocate CPU memory
|
||||
float *iputCpu = (float *)malloc(N * sizeof(float));
|
||||
float *oputCpu = (float *)malloc(N * sizeof(float));
|
||||
|
||||
// Fill CPU with a simple pattern
|
||||
for (int i = 0; i < N; i++)
|
||||
{
|
||||
iputCpu[i] = 1.0f;
|
||||
oputCpu[i] = 0.0f;
|
||||
}
|
||||
|
||||
// Copy the input from CPU memory to GPU memory
|
||||
HIP_CALL(hipMemcpy(iputGpu, iputCpu, N * sizeof(float), hipMemcpyHostToDevice));
|
||||
|
||||
// Perform some untimed initial warmup iterations
|
||||
int numWarmups = 3;
|
||||
for (int iteration = 0; iteration < numWarmups; iteration++)
|
||||
{
|
||||
NCCL_CALL(ncclAllReduce(iputGpu, oputGpu, N, ncclFloat, ncclSum, comm, stream));
|
||||
}
|
||||
HIP_CALL(hipStreamSynchronize(stream));
|
||||
|
||||
// Perform timed iterations
|
||||
int numIterations = 10;
|
||||
auto cpuStart = std::chrono::high_resolution_clock::now();
|
||||
HIP_CALL(hipEventRecord(startEvent, stream));
|
||||
for (int iteration = 0; iteration < numIterations; iteration++)
|
||||
{
|
||||
NCCL_CALL(ncclAllReduce(iputGpu, oputGpu, N, ncclFloat, ncclSum, comm, stream));
|
||||
}
|
||||
HIP_CALL(hipEventRecord(stopEvent, stream));
|
||||
HIP_CALL(hipStreamSynchronize(stream));
|
||||
auto cpuDelta = std::chrono::high_resolution_clock::now() - cpuStart;
|
||||
double totalCpuTime = std::chrono::duration_cast<std::chrono::duration<double, std::milli>>(cpuDelta).count();
|
||||
float totalGpuTime;
|
||||
HIP_CALL(hipEventElapsedTime(&totalGpuTime, startEvent, stopEvent));
|
||||
|
||||
if (rank == 0) printf("%10lu %10.3f %10.3f\n", N * sizeof(float), (totalCpuTime / numIterations), (totalGpuTime / numIterations));
|
||||
|
||||
|
||||
// Validate results
|
||||
HIP_CALL(hipMemcpy(oputCpu, oputGpu, N * sizeof(float), hipMemcpyDeviceToHost));
|
||||
bool isOK = true;
|
||||
int expected = nranks;
|
||||
for (int i = 0; i < N; i++)
|
||||
{
|
||||
isOK &= (oputCpu[i] == expected);
|
||||
}
|
||||
if (!isOK)
|
||||
{
|
||||
printf("[ERROR] Rank %d Incorrect results for N = %d\n", rank, N);
|
||||
exit(1);
|
||||
}
|
||||
|
||||
// Release GPU resources
|
||||
HIP_CALL(hipFree(oputGpu));
|
||||
HIP_CALL(hipFree(iputGpu));
|
||||
|
||||
free(iputCpu);
|
||||
free(oputCpu);
|
||||
}
|
||||
|
||||
HIP_CALL(hipStreamDestroy(stream));
|
||||
HIP_CALL(hipEventDestroy(startEvent));
|
||||
HIP_CALL(hipEventDestroy(stopEvent));
|
||||
NCCL_CALL(ncclCommDestroy(comm));
|
||||
return 0;
|
||||
}
|
||||
|
||||
void Usage(char *argv0)
|
||||
{
|
||||
printf("Usage: %s numRanks rank deviceId [N=8] [verbose=0]\n", argv0);
|
||||
printf(" - NCCL_COMM_ID must be set in order to use this\n\n");
|
||||
printf(" - To use this process as the root process you may use any of the following:\n");
|
||||
|
||||
char hostname[256];
|
||||
gethostname(hostname, 256);
|
||||
printf(" export NCCL_COMM_ID=%s:12345\n", hostname);
|
||||
|
||||
// Loop over linked list of interfaces
|
||||
struct ifaddrs *ifaddr;
|
||||
getifaddrs(&ifaddr);
|
||||
for (struct ifaddrs* ifa = ifaddr; ifa != NULL; ifa = ifa->ifa_next)
|
||||
{
|
||||
// Skip anything not based on IPv4 / IPv6
|
||||
int family = ifa->ifa_addr->sa_family;
|
||||
if (family != AF_INET && family != AF_INET6) continue;
|
||||
|
||||
// Skip iPv6 loopback interface
|
||||
if (family == AF_INET6)
|
||||
{
|
||||
struct sockaddr_in6* sa = (struct sockaddr_in6*)(ifa->ifa_addr);
|
||||
if (IN6_IS_ADDR_LOOPBACK(&sa->sin6_addr)) continue;
|
||||
}
|
||||
|
||||
socklen_t saLen = (family == AF_INET ? sizeof(struct sockaddr_in) : sizeof(struct sockaddr_in6));
|
||||
char host[NI_MAXHOST];
|
||||
char service[NI_MAXSERV];
|
||||
|
||||
getnameinfo(ifa->ifa_addr, saLen, host, NI_MAXHOST, service, NI_MAXSERV,
|
||||
NI_NUMERICHOST|NI_NUMERICSERV);
|
||||
|
||||
std::string result = std::string(host) + ":12345";
|
||||
printf(" export NCCL_COMM_ID=%s\n", result.c_str());
|
||||
}
|
||||
freeifaddrs(ifaddr);
|
||||
}
|
||||
@@ -0,0 +1,49 @@
|
||||
/*
|
||||
Copyright (c) 2020 Advanced Micro Devices, Inc. All rights reserved.
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
of this software and associated documentation files (the "Software"), to deal
|
||||
in the Software without restriction, including without limitation the rights
|
||||
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
copies of the Software, and to permit persons to whom the Software is
|
||||
furnished to do so, subject to the following conditions:
|
||||
|
||||
The above copyright notice and this permission notice shall be included in
|
||||
all copies or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
|
||||
THE SOFTWARE.
|
||||
*/
|
||||
|
||||
#ifndef HELLORCCL_HPP
|
||||
#define HELLORCCL_HPP
|
||||
#include <iostream>
|
||||
|
||||
#define HIP_CALL(cmd) \
|
||||
do { \
|
||||
hipError_t error = (cmd); \
|
||||
if (error != hipSuccess) \
|
||||
{ \
|
||||
std::cerr << "Encountered HIP error (" << hipGetErrorString(error) << ") at line " \
|
||||
<< __LINE__ << " in file " << __FILE__ << "\n"; \
|
||||
exit(-1); \
|
||||
} \
|
||||
} while (0)
|
||||
|
||||
#define NCCL_CALL(cmd) \
|
||||
do { \
|
||||
ncclResult_t error = (cmd); \
|
||||
if (error != ncclSuccess) \
|
||||
{ \
|
||||
std::cerr << "Encountered NCCL error (" << ncclGetErrorString(error) << ") at line " \
|
||||
<< __LINE__ << " in file " << __FILE__ << "\n"; \
|
||||
exit(-1); \
|
||||
} \
|
||||
} while (0)
|
||||
|
||||
#endif
|
||||
@@ -0,0 +1,21 @@
|
||||
# Copyright (c) 2020 Advanced Micro Devices, Inc. All rights reserved.
|
||||
|
||||
# Set to where RCCL is installed
|
||||
RCCL_INSTALL=../../build/release
|
||||
|
||||
HIP_PATH?= $(wildcard /opt/rocm/hip)
|
||||
ifeq (,$(HIP_PATH))
|
||||
HIP_PATH=../../..
|
||||
endif
|
||||
HIPCC=$(HIP_PATH)/bin/hipcc
|
||||
|
||||
EXE=HelloRccl
|
||||
CXXFLAGS = -std=c++11 -O3 -I../../src/include -I$(RCCL_INSTALL) -L$(RCCL_INSTALL) -lrccl
|
||||
|
||||
all: $(EXE)
|
||||
|
||||
$(EXE): $(EXE).cpp $(shell find -regex ".*\.\hpp")
|
||||
$(HIPCC) $(CXXFLAGS) $< -o $@
|
||||
|
||||
clean:
|
||||
rm -f *.o $(EXE)
|
||||
Executable
+16
@@ -0,0 +1,16 @@
|
||||
#!/bin/bash
|
||||
RCCL_INSTALL=../../build/release
|
||||
EXE=$PWD/HelloRccl
|
||||
LDPATH=$LD_LIBRARY_PATH:$RCCL_INSTALL
|
||||
|
||||
echo "With clique-based kernels:"
|
||||
RCCL_ENABLE_CLIQUE=1 NCCL_COMM_ID=$HOSTNAME:12345 LD_LIBRARY_PATH=$LDPATH $EXE 4 0 0 &
|
||||
RCCL_ENABLE_CLIQUE=1 NCCL_COMM_ID=$HOSTNAME:12345 LD_LIBRARY_PATH=$LDPATH $EXE 4 1 1 &
|
||||
RCCL_ENABLE_CLIQUE=1 NCCL_COMM_ID=$HOSTNAME:12345 LD_LIBRARY_PATH=$LDPATH $EXE 4 2 2 &
|
||||
RCCL_ENABLE_CLIQUE=1 NCCL_COMM_ID=$HOSTNAME:12345 LD_LIBRARY_PATH=$LDPATH $EXE 4 3 3
|
||||
|
||||
echo "Without clique-based kernels:"
|
||||
NCCL_COMM_ID=$HOSTNAME:12345 LD_LIBRARY_PATH=$LDPATH $EXE 4 0 0 &
|
||||
NCCL_COMM_ID=$HOSTNAME:12345 LD_LIBRARY_PATH=$LDPATH $EXE 4 1 1 &
|
||||
NCCL_COMM_ID=$HOSTNAME:12345 LD_LIBRARY_PATH=$LDPATH $EXE 4 2 2 &
|
||||
NCCL_COMM_ID=$HOSTNAME:12345 LD_LIBRARY_PATH=$LDPATH $EXE 4 3 3
|
||||
Reference in New Issue
Block a user