Revert "Initial support for clique-based kernels (#276)" (#280)

This reverts commit 2b8184808d.
Этот коммит содержится в:
gilbertlee-amd
2020-10-15 12:30:18 -06:00
коммит произвёл GitHub
родитель 33babcb5e2
Коммит 84a2541e01
27 изменённых файлов: 7 добавлений и 1969 удалений
-6
Просмотреть файл
@@ -126,12 +126,6 @@ set(CC_SOURCES
src/collectives/all_to_all_api.cc
src/collectives/all_to_allv_api.cc
src/channel.cc
src/clique/CliqueManager.cc # RCCL
src/clique/HandleCache.cc # RCCL
src/clique/HandleShm.cc # RCCL
src/clique/Hash.cc # RCCL
src/clique/MsgQueue.cc # RCCL
src/clique/ShmObject.cc # RCCL
src/misc/argcheck.cc
src/misc/nvmlwrap_stub.cc
src/misc/utils.cc
+3 -33
Просмотреть файл
@@ -12,11 +12,6 @@
#include "socket.h"
#include <unistd.h>
#include <sys/types.h>
// [RCCL]
#include "clique/CliqueManager.h"
#include "clique/CliqueShmNames.h"
#include "clique/Hash.h"
// [/RCCL]
struct bootstrapNetComm {
int fd;
@@ -168,14 +163,7 @@ static ncclResult_t setFilesLimit() {
return ncclSuccess;
}
static void *bootstrapRoot(void* bootstrapRootStruct) { // [RCCL] Modified to include hash argument)
// [RCCL] Unpack bootstrapRootStruct
struct bootstrapRootStruct* rootStruct = (struct bootstrapRootStruct*) bootstrapRootStruct;
void* listenComm = rootStruct->listenComm;
unsigned long hash = rootStruct->hash;
int pid = getpid(); // sharing PID to other ranks for creating shared memory files for CliqueManager
// [/RCCL]
static void *bootstrapRoot(void* listenComm) {
struct extInfo info;
ncclNetHandle_t *rankHandles = NULL;
ncclNetHandle_t *rankHandlesRoot = NULL; // for initial rank <-> root information exchange
@@ -217,19 +205,12 @@ static void *bootstrapRoot(void* bootstrapRootStruct) { // [RCCL] Modified to in
} while (c < nranks);
TRACE(NCCL_INIT, "COLLECTED ALL %d HANDLES", nranks);
{ // [RCCL] Initialize message queues / shared memory files
NCCLCHECKGOTO(CliqueManager::BootstrapRootInit(pid, hash), res, out);
} // [/RCCL]
// Send the connect handle for the next rank in the AllGather ring
for (int r=0; r<nranks; ++r) {
int next = (r+1) % nranks;
void *tmpSendComm;
NCCLCHECKGOTO(bootstrapNetConnect(0, rankHandlesRoot+r, &tmpSendComm), res, out);
NCCLCHECKGOTO(bootstrapNetSend(tmpSendComm, rankHandles+next, sizeof(ncclNetHandle_t)), res, out);
{ // [RCCL] Send the root pid for shared file naming
NCCLCHECKGOTO(bootstrapNetSend(tmpSendComm, &pid, sizeof(int)), res, out);
} // [/RCCL]
NCCLCHECKGOTO(bootstrapNetCloseSend(tmpSendComm), res, out);
}
TRACE(NCCL_INIT, "SENT OUT ALL %d HANDLES", nranks);
@@ -248,14 +229,7 @@ ncclResult_t bootstrapCreateRoot(ncclUniqueId* id, bool idFromEnv) {
void* listenComm;
NCCLCHECK(bootstrapNetListen(idFromEnv ? dontCareIf : 0, netHandle, &listenComm));
pthread_t thread;
// [RCCL] Use the ncclUniqueId to get a hash for bootstrap
struct bootstrapRootStruct* rootStruct = new bootstrapRootStruct;
rootStruct->hash = djb2Hash(id->internal);
rootStruct->listenComm = listenComm;
pthread_create(&thread, NULL, bootstrapRoot, (void *)rootStruct);
// [/RCCL]
pthread_create(&thread, NULL, bootstrapRoot, listenComm);
return ncclSuccess;
}
@@ -293,10 +267,9 @@ struct extState {
int rank;
int nranks;
int dev;
int rootPid; // [RCCL] PID of root
};
ncclResult_t bootstrapInit(ncclUniqueId * id, int rank, int nranks, void** commState, int* rootPid) { // [RCCL] Adding rootPid
ncclResult_t bootstrapInit(ncclUniqueId * id, int rank, int nranks, void** commState) {
ncclNetHandle_t* netHandle = (ncclNetHandle_t*) id;
bool idFromEnv = getenv("NCCL_COMM_ID") != NULL;
struct extState* state;
@@ -341,9 +314,6 @@ ncclResult_t bootstrapInit(ncclUniqueId * id, int rank, int nranks, void** commS
ncclNetHandle_t extHandleNext;
NCCLCHECK(bootstrapNetAccept(extBstrapListenCommRoot, &tmpRecvComm));
NCCLCHECK(bootstrapNetRecv(tmpRecvComm, &extHandleNext, sizeof(extHandleNext)));
{ // [RCCL] Receive PID from root
NCCLCHECK(bootstrapNetRecv(tmpRecvComm, rootPid, sizeof(int)));
} // [/RCCL]
NCCLCHECK(bootstrapNetCloseRecv(tmpRecvComm));
NCCLCHECK(bootstrapNetCloseListen(extBstrapListenCommRoot));
-145
Просмотреть файл
@@ -1,145 +0,0 @@
/*
Copyright (c) 2020 Advanced Micro Devices, Inc. All rights reserved.
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
*/
#ifndef ALLREDUCECLIQUEKERNEL_H
#define ALLREDUCECLIQUEKERNEL_H
#include "CliqueCommon.h"
#include "common_kernel.h"
#include <hip/hip_runtime.h>
#include "hip/hip_ext.h"
#define ALL_REDUCE_SPLIT_BLOCKSIZE 256
// Clique all-reduce kernel: reduces N elements, beginning at element startIdx,
// from every rank's input buffer and writes the result to every rank's output
// buffer.  The N elements are split into contiguous per-workgroup pieces.
// All workgroups finish by calling WaitForBarrier on the clique barrier counter.
template <typename T, class FUNC, int NUM_RANKS>
__global__ __launch_bounds__(ALL_REDUCE_SPLIT_BLOCKSIZE)
void AllReduceCliqueSplitKernel(int N,
                                size_t startIdx,
                                cliqueDevicePtrs_t cliquePtrs)
{
  // Range [firstElem, firstElem + numElems) handled by this workgroup;
  // stays empty when there is nothing to do
  int firstElem = 0;
  int numElems  = 0;

  if (N > 0)
  {
    // Even split across workgroups, rounded up to a multiple of the blocksize
    int perBlock = (N + gridDim.x - 1) / gridDim.x;
    int span     = RoundUp(perBlock, ALL_REDUCE_SPLIT_BLOCKSIZE);

    // Clamp both ends so trailing workgroups may end up with no elements
    firstElem    = min(blockIdx.x * span, N);
    int lastElem = min(firstElem + span, N);
    numElems     = lastElem - firstElem;
  }

  if (numElems > 0)
  {
    T const** allInputs  = (T const**)cliquePtrs.inputs;
    T**       allOutputs = (T **)cliquePtrs.outputs;

    // Per-rank source / destination pointers, advanced to this workgroup's piece
    T const* srcs[NUM_RANKS];
    T*       dsts[NUM_RANKS];

    #pragma unroll
    for (int rankIdx = 0; rankIdx < NUM_RANKS; rankIdx++)
    {
      srcs[rankIdx] = allInputs[rankIdx]  + startIdx + firstElem;
      dsts[rankIdx] = allOutputs[rankIdx] + startIdx + firstElem;
    }

    #define ALL_REDUCE_CLIQUE_UNROLL 4
    ReduceOrCopyMulti<ALL_REDUCE_CLIQUE_UNROLL, FUNC, T, NUM_RANKS, NUM_RANKS, NUM_RANKS, NUM_RANKS>(
      threadIdx.x, ALL_REDUCE_SPLIT_BLOCKSIZE, NUM_RANKS, srcs, NUM_RANKS, dsts, numElems);
  }

  // Each GPU works on a separate subsection, so the kernel must not finish
  // until all GPUs have finished; otherwise part of the result may not be
  // correct yet
  WaitForBarrier<NUM_RANKS>(cliquePtrs.barrierCounter);
}
// Host-side launcher for the clique-based all-reduce kernel.
class AllReduceCliqueKernel
{
public:
  // Launches AllReduceCliqueSplitKernel for this rank's share of the data.
  //
  // rank        - index of the calling rank, 0 <= rank < numRanks
  // numRanks    - clique size, MIN_CLIQUE_SIZE <= numRanks <= MAX_CLIQUE_SIZE
  // maxGridSize - upper bound on the number of workgroups to launch
  // count       - total number of elements in the all-reduce
  // datatype/op - select the kernel instantiation from the function table
  // stream      - stream the kernel (and timing events) are queued on
  // cliquePtrs  - per-rank input/output pointers and the barrier counter
  // doTiming    - when true, synchronizes on the kernel and prints its duration
  //
  // Returns ncclSuccess once the kernel has been queued, ncclInvalidUsage for a
  // bad rank / clique size / oversized per-rank share, and ncclInvalidArgument
  // for a datatype or op outside the kernel table.
  static ncclResult_t Launch(int const                 rank,
                             int const                 numRanks,
                             int const                 maxGridSize,
                             size_t const              count,
                             ncclDataType_t const      datatype,
                             ncclRedOp_t const         op,
                             hipStream_t const         stream,
                             cliqueDevicePtrs_t const& cliquePtrs,
                             bool const                doTiming = false)
  {
    if (numRanks < MIN_CLIQUE_SIZE || numRanks > MAX_CLIQUE_SIZE)
    {
      // Both bounds are inclusive, matching the check above
      WARN("Number of ranks exceeds supported. Expected %d <= %d <= %d for numRanks",
           MIN_CLIQUE_SIZE, numRanks, MAX_CLIQUE_SIZE);
      return ncclInvalidUsage;
    }

    if (rank < 0 || rank >= numRanks)
    {
      WARN("Invalid rank specified. Expected 0 <= %d < %d for clique all-reduce", rank, numRanks);
      return ncclInvalidUsage;
    }

    // datatype and op are used directly as indices into the kernel table
    if (static_cast<int>(datatype) < 0 || static_cast<int>(datatype) >= ncclNumTypes ||
        static_cast<int>(op)       < 0 || static_cast<int>(op)       >= ncclNumOps)
    {
      WARN("Unsupported datatype (%d) or reduction op (%d) for clique all-reduce",
           static_cast<int>(datatype), static_cast<int>(op));
      return ncclInvalidArgument;
    }

    // Divide the # of elements done per GPU evenly across ranks, then round up to blocksize.
    // Done in size_t so that large counts are not truncated before the split.
    size_t const blockSize = ALL_REDUCE_SPLIT_BLOCKSIZE;
    size_t const baseSize  = (count + numRanks - 1) / numRanks;
    size_t const chunkSize = (baseSize + blockSize - 1) / blockSize * blockSize;
    size_t const startIdx  = std::min<size_t>(chunkSize * rank, count);
    size_t const stopIdx   = std::min<size_t>(startIdx + chunkSize, count);
    size_t const rankCount = stopIdx - startIdx;

    // The kernel takes its element count as an int; refuse shares that do not
    // survive the narrowing instead of launching with a wrapped value
    int const rankN = static_cast<int>(rankCount);
    if (rankN < 0 || static_cast<size_t>(rankN) != rankCount)
    {
      WARN("Per-rank element count %zu is too large for the clique all-reduce kernel", rankCount);
      return ncclInvalidUsage;
    }

    // Adjust gridsize if there isn't enough work to prevent empty workgroups (always at least 1)
    size_t const neededBlocks = (rankCount + blockSize - 1) / blockSize;
    int realGridSize = 1;
    if (maxGridSize > 1 && neededBlocks > 1)
    {
      realGridSize = static_cast<int>(std::min<size_t>(static_cast<size_t>(maxGridSize), neededBlocks));
    }

    hipEvent_t startEvent   = 0;
    hipEvent_t stopEvent    = 0;
    float      kernelTimeMs = 0.0f;
    if (doTiming)
    {
      hipEventCreate(&startEvent);
      hipEventCreate(&stopEvent);
      hipEventRecord(startEvent, stream);
    }

    // Launch even if empty for this rank, because all ranks must hit sync barrier
    hipLaunchKernelGGL(m_allReduceCliqueKernels[datatype][op][numRanks - MIN_CLIQUE_SIZE],
                       dim3(realGridSize, 1, 1),
                       dim3(ALL_REDUCE_SPLIT_BLOCKSIZE, 1, 1),
                       0, stream,
                       rankN, startIdx, cliquePtrs);

    if (doTiming)
    {
      // Timing blocks the host until the kernel has completed
      hipEventRecord(stopEvent, stream);
      hipEventSynchronize(stopEvent);
      hipEventElapsedTime(&kernelTimeMs, startEvent, stopEvent);
      printf("[%d/%d:%d] %zu %13.6f ms\n", rank, numRanks, maxGridSize, count, kernelTimeMs);
      hipEventDestroy(startEvent);
      hipEventDestroy(stopEvent);
    }
    return ncclSuccess;
  }

protected:
  // List of all templated device kernels function pointers,
  // indexed by [datatype][op][numRanks - MIN_CLIQUE_SIZE]
  typedef void(*allReduceCliqueFunc_t)(int, size_t, cliqueDevicePtrs_t);
  static constexpr allReduceCliqueFunc_t
  m_allReduceCliqueKernels[ncclNumTypes][ncclNumOps][MAX_CLIQUE_SIZE - MIN_CLIQUE_SIZE + 1] =
    KERNEL_LIST_MACRO(AllReduceCliqueSplitKernel);
};
#endif
-105
Просмотреть файл
@@ -1,105 +0,0 @@
/*
Copyright (c) 2020 Advanced Micro Devices, Inc. All rights reserved.
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
*/
#ifndef CLIQUE_COMMON_H
#define CLIQUE_COMMON_H
#include "nccl.h"
#include <cstdint>
#include "rccl_bfloat16.h"
#include "reduce_kernel.h"
#define MIN_CLIQUE_SIZE 2
#define MAX_CLIQUE_SIZE 8
// Bundle of pointers handed by value to a clique kernel.
// inputs / outputs hold one pointer per rank (indexed by rank, up to
// MAX_CLIQUE_SIZE entries); barrierCounter is the counter passed to the
// device-side WaitForBarrier.
typedef struct
{
void const* inputs[MAX_CLIQUE_SIZE];
void* outputs[MAX_CLIQUE_SIZE];
int* barrierCounter;
} cliqueDevicePtrs_t;
// Helper macro to generate a table of templated kernel functions
// Expands to one instantiation of `kernelname` per clique size, 2 through 8.
// The entry count (7) is expected to cover MIN_CLIQUE_SIZE to MAX_CLIQUE_SIZE;
// this list must be extended by hand if those limits change.
#define KERNEL_LIST_RANK(kernelname, datatype, func) \
{ \
kernelname<datatype, func<datatype>, 2>, \
kernelname<datatype, func<datatype>, 3>, \
kernelname<datatype, func<datatype>, 4>, \
kernelname<datatype, func<datatype>, 5>, \
kernelname<datatype, func<datatype>, 6>, \
kernelname<datatype, func<datatype>, 7>, \
kernelname<datatype, func<datatype>, 8> \
}
// Helper macro to generate a table of templated kernel functions
// Expands to one KERNEL_LIST_RANK row per reduction functor (Sum, Prod, Max, Min).
// This is expected to match the number of supported reduction operations (ncclNumOps)
// NOTE(review): the row order presumably has to follow the ncclRedOp_t enum
// order, because the resulting table is indexed by op - confirm against nccl.h
#define KERNEL_LIST_OP(kernelname, datatype) \
{ \
KERNEL_LIST_RANK(kernelname, datatype, FuncSum), \
KERNEL_LIST_RANK(kernelname, datatype, FuncProd), \
KERNEL_LIST_RANK(kernelname, datatype, FuncMax), \
KERNEL_LIST_RANK(kernelname, datatype, FuncMin) \
}
// Helper Macro to generate table of templated kernel functions
// Expands to one KERNEL_LIST_OP block per element type (10 types).
// This is expected to match the number of supported datatypes (ncclNumTypes)
// NOTE(review): the type order presumably has to follow the ncclDataType_t
// enum order, because the resulting table is indexed by datatype - confirm
#define KERNEL_LIST_MACRO(kernelname) \
{ \
KERNEL_LIST_OP(kernelname, int8_t), \
KERNEL_LIST_OP(kernelname, uint8_t), \
KERNEL_LIST_OP(kernelname, int32_t), \
KERNEL_LIST_OP(kernelname, uint32_t), \
KERNEL_LIST_OP(kernelname, int64_t), \
KERNEL_LIST_OP(kernelname, uint64_t), \
KERNEL_LIST_OP(kernelname, half), \
KERNEL_LIST_OP(kernelname, float), \
KERNEL_LIST_OP(kernelname, double), \
KERNEL_LIST_OP(kernelname, rccl_bfloat16) \
}
// Device-side barrier across NUM_RANKS participants sharing `counter`.
// Only thread 0 of workgroup 0 takes part; every other thread returns
// immediately.  The counter is driven from 0 up to 2*NUM_RANKS and then reset
// to 0:
//   1. arrive  : increment, then spin until the counter reaches NUM_RANKS
//   2. release : increment again; whoever reaches 2*NUM_RANKS stores 0
//   3. drain   : spin until the counter reads 0
template <int NUM_RANKS>
__forceinline__ __device__ void WaitForBarrier(int* counter)
{
  // Logical AND: both operands are boolean comparisons
  if (threadIdx.x == 0 && blockIdx.x == 0)
  {
    // Assumes counter starts at 0 prior to any rank access
    __atomic_add_fetch(counter, 1, __ATOMIC_SEQ_CST);

    // Wait for all ranks to reach barrier
    while (__atomic_load_n(counter, __ATOMIC_SEQ_CST) < NUM_RANKS);

    // Each rank increments again, last one resets barrier
    if (__atomic_add_fetch(counter, 1, __ATOMIC_SEQ_CST) == (2*NUM_RANKS))
      __atomic_store_n(counter, 0, __ATOMIC_SEQ_CST);

    // Wait for counter to be zeroed
    while (__atomic_load_n(counter, __ATOMIC_SEQ_CST) != 0);
  }
}
// Rounds X up to a multiple of Y (integer arithmetic; usable on host and device)
__forceinline__ __host__ __device__ int RoundUp(int X, int Y)
{
  int const numChunks = (X + Y - 1) / Y;
  return numChunks * Y;
}
#endif
-399
Просмотреть файл
@@ -1,399 +0,0 @@
/*
Copyright (c) 2020 Advanced Micro Devices, Inc. All rights reserved.
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
*/
#include "CliqueManager.h"
#include "CliqueShmNames.h"
#include "MsgQueue.h"
#include "nccl.h"
#include "core.h"
#include "Hash.h"
#include "AllReduceCliqueKernel.h"
#include <hip/hip_runtime.h>
#include <stdio.h>
#include <stdlib.h>
#include <chrono>
#include <iomanip>
#include <iostream>
#include <sstream>
#include <thread>
// Definitions of the static members used in CLIQUE_SINGLE_PROCESS mode.
// Being static, they are shared by every CliqueManager instance in the process.
// Per-op-slot input/output pointers, filled in rank by rank in DeclarePointers
cliqueDevicePtrs_t CliqueManager::m_cliquePtrs[NCCL_MAX_OPS] = {};
// Per-op-slot host arrival counters (Init points m_arrivalCounter at this array)
uint32_t CliqueManager::m_staticCounters[NCCL_MAX_OPS] = {};
// Pinned host memory used as the kernel barrier counters; allocated in Init
int* CliqueManager::m_staticBarriers = NULL;
// Records the rank layout and requested clique mode.  No resources are
// acquired here; Init() must be called before the manager is used.
// Every pointer member starts out NULL and the barrier targets start at zero,
// so CleanUp() and WaitForBarrier() never read indeterminate values.
// Initializers are listed in member declaration order.
CliqueManager::CliqueManager(int const          rank,
                             int const          numRanks,
                             cliqueMode_t const cliqueMode) :
  m_cliqueMode(cliqueMode),
  m_rank(rank),
  m_numRanks(numRanks),
  m_init(false),
  m_nextBarrierValue(),
  m_arrivalCounter(NULL),
  m_ipcHandleSendCache(NULL),
  m_ipcHandleRecvCache(NULL),
  m_deviceBarriers(NULL)
{
}
// Releases clique resources, but only if Init() left the manager initialized
CliqueManager::~CliqueManager()
{
  if (!m_init) return;
  CleanUp();
}
void CliqueManager::CleanUp()
{
if (m_cliqueMode == CLIQUE_SINGLE_NODE)
{
// Release caches
if (m_ipcHandleSendCache) delete m_ipcHandleSendCache;
if (m_ipcHandleSendCache) delete m_ipcHandleRecvCache;
// Close shared memory
m_shmHandles.Close();
m_sharedCounters.Close();
m_sharedBarrier.Close();
if (m_rank == 0)
{
hipFree(m_deviceBarriers);
}
}
else if (m_cliqueMode == CLIQUE_SINGLE_PROCESS)
{
if (m_staticBarriers) hipHostFree(m_staticBarriers);
}
m_init = false;
}
// Sets up the resources for the configured clique mode.
//
// commId - communicator id; its hash names the shared-memory objects
// suffix - extra value appended to the shared-memory names
//
// Returns ncclInvalidUsage for a bad rank or NULL commId (the manager is then
// left uninitialized).  Every other outcome returns ncclSuccess: if the
// RCCL_ENABLE_CLIQUE env var is absent, or a resource cannot be set up,
// clique kernels are simply disabled (m_cliqueMode = CLIQUE_DISABLED).
ncclResult_t CliqueManager::Init(ncclUniqueId const* commId, int suffix)
{
  ncclResult_t res;

  if (m_init) return ncclSuccess;

  // Check parameters.  m_init is deliberately still false here so that a
  // rejected manager is never mistaken for a usable one later on.
  if (m_rank < 0 || m_rank >= m_numRanks)
  {
    WARN("Invalid rank specified. Expected 0 <= %d < %d for CliqueManager", m_rank, m_numRanks);
    return ncclInvalidUsage;
  }

  if (commId == NULL)
  {
    WARN("CommId should not be empty");
    return ncclInvalidUsage;
  }

  // For now, opt-into clique based kernels via RCCL_ENABLE_CLIQUE env var
  if (!getenv("RCCL_ENABLE_CLIQUE"))
  {
    if (m_rank == 0) INFO(NCCL_INIT, "Disabling clique-based kernels (did not find env var RCCL_ENABLE_CLIQUE)");
    m_cliqueMode = CLIQUE_DISABLED;
    m_init = true;
    return ncclSuccess;
  }

  // WaitForBarrier() only ever adds to these targets, so they must start at zero
  memset(m_nextBarrierValue, 0, sizeof(m_nextBarrierValue));

  unsigned long hash = djb2Hash(commId->internal);
  std::string shmSuffix = std::to_string(hash) + "_" + std::to_string(suffix);

  if (m_cliqueMode == CLIQUE_SINGLE_NODE)
  {
    // Make the cache pointers well-defined before the first failure path,
    // because dropback hands them to CleanUp()
    m_ipcHandleSendCache = NULL;
    m_ipcHandleRecvCache = NULL;

    // Initialize shared memory file for IPC handles (based on commId hash)
    m_shmHandles = NcclIpcHandleShm(m_rank, m_numRanks, hash, NUM_HANDLES_PER_RANK, NCCL_MAX_OPS, shmSuffix);
    NCCLCHECKGOTO(m_shmHandles.Open(), res, dropback);

    // Initialize IPC caches
    m_ipcHandleSendCache = new NcclIpcHandleSendCache(m_numRanks * NUM_HANDLES_PER_RANK * NCCL_MAX_OPS);
    m_ipcHandleRecvCache = new NcclIpcHandleRecvCache(m_numRanks * NUM_HANDLES_PER_RANK * NCCL_MAX_OPS,
                                                      100,
                                                      hipIpcMemHandleHash,
                                                      hipIpcMemHandleEqual);

    // Initialize shared host barrier counters
    m_sharedCounters = ShmObject<uint32_t>(NCCL_MAX_OPS * sizeof(uint32_t),
                                           CliqueShmNames["SharedCounters"] + shmSuffix,
                                           m_rank,
                                           m_numRanks,
                                           hash);
    NCCLCHECKGOTO(m_sharedCounters.Open(), res, dropback);
    m_arrivalCounter = m_sharedCounters.Get();

    // Initialize shared barriers
    m_sharedBarrier = ShmObject<hipIpcMemHandle_t>(std::max(4096LU, sizeof(hipIpcMemHandle_t)),
                                                   CliqueShmNames["Barriers"] + shmSuffix,
                                                   m_rank,
                                                   m_numRanks,
                                                   hash);
    NCCLCHECKGOTO(m_sharedBarrier.Open(), res, dropback);

    if (m_rank == 0)
    {
      hipIpcMemHandle_t handle;

      // Allocate fine-grained device memory on rank 0 and get handle for it and store in IPC
      NCCLCHECKGOTO(ncclCudaCalloc(&m_deviceBarriers, NCCL_MAX_OPS * sizeof(int), true), res, dropback);
      if (hipIpcGetMemHandle(&handle, m_deviceBarriers) != hipSuccess)
      {
        WARN("Unable to get IPC handle for barrier memory");
        goto dropback;
      }
      *m_sharedBarrier.Get() = handle;
    }
  }
  else if (m_cliqueMode == CLIQUE_SINGLE_PROCESS)
  {
    // Allocate and zero pinned host memory that all GPU kernels will have access to as a barrier
    // NOTE(review): m_staticBarriers is a static member, so each manager that
    // reaches this point replaces the previous allocation - confirm whether a
    // single shared allocation was intended
    if (hipHostMalloc(&m_staticBarriers, sizeof(int) * NCCL_MAX_OPS) != hipSuccess)
    {
      WARN("Unable to allocated pinned host memory for clique barrier. Disabling clique-based kernels");
      m_cliqueMode = CLIQUE_DISABLED;
      m_init = true;
      return ncclSuccess;
    }
    memset(m_staticBarriers, 0, NCCL_MAX_OPS * sizeof(int));
    m_arrivalCounter = m_staticCounters;
  }

  m_init = true;
  return ncclSuccess;

dropback:
  // Fall back to the regular kernels instead of failing communicator creation
  WARN("Unable to initialize shared memory. Disabling clique-based kernels");
  CleanUp();
  m_cliqueMode = CLIQUE_DISABLED;
  return ncclSuccess;
}
// Returns true when the given collective can be served by a clique kernel.
// count, datatype and op are accepted but do not influence the answer yet.
bool CliqueManager::IsSupported(ncclFunc_t const     coll,
                                size_t const         count,
                                ncclDataType_t const datatype,
                                ncclRedOp_t const    op) const
{
  // NOTE: Currently we only support allReduce, and only while clique mode is enabled.
  // A size cut-off (count < 1048576) has been considered but is not applied.
  bool const cliqueEnabled = (m_cliqueMode != CLIQUE_DISABLED);
  return cliqueEnabled && (coll == ncclCollAllReduce);
}
// Publishes this rank's input/output pointers for the op slot derived from
// opCount (opCount % NCCL_MAX_OPS).
// Returns ncclSuccess (also when clique mode is disabled), ncclInvalidUsage if
// Init() has not completed, or the error of a failing HIP / cache / shm call.
ncclResult_t CliqueManager::DeclarePointers(uint64_t opCount, void const* inputPtr, void* outputPtr)
{
  // Do nothing if disabled
  if (m_cliqueMode == CLIQUE_DISABLED) return ncclSuccess;

  if (!m_init)
  {
    WARN("CliqueManager must be initialized before use");
    return ncclInvalidUsage;
  }

  int const slot = opCount % NCCL_MAX_OPS;

  if (m_cliqueMode == CLIQUE_SINGLE_PROCESS)
  {
    // All ranks live in this process: store the raw pointers in the static table
    m_cliquePtrs[slot].inputs[m_rank]  = inputPtr;
    m_cliquePtrs[slot].outputs[m_rank] = outputPtr;
    return ncclSuccess;
  }

  if (m_cliqueMode != CLIQUE_SINGLE_NODE) return ncclSuccess;

  // Map the fine-grained barrier memory the first time it is needed
  if (m_deviceBarriers == NULL)
  {
    hipIpcMemHandle_t barrierHandle = *m_sharedBarrier.Get();
    CUDACHECK(hipIpcOpenMemHandle((void**)&m_deviceBarriers, barrierHandle, hipIpcMemLazyEnablePeerAccess));
  }

  // Slot 0 carries the input handle, slot 1 the output handle
  std::vector<hipIpcMemHandle_t> handles(NUM_HANDLES_PER_RANK);
  hipIpcMemHandle_t* const inputHandle  = &handles[0];
  hipIpcMemHandle_t* const outputHandle = &handles[1];

  // Get IPC handles for input/output pointers from cache
  NCCLCHECK(CheckCacheForPtr(const_cast<void*>(inputPtr), m_ipcHandleSendCache, m_rank, inputHandle));
  NCCLCHECK(CheckCacheForPtr(outputPtr, m_ipcHandleSendCache, m_rank, outputHandle));

  // Write IPC handles to shared memory for given rank / opCount
  NCCLCHECK(m_shmHandles.WriteHandles(slot, handles));

  return ncclSuccess;
}
// Waits until every rank has declared its pointers for this op slot, gathers
// the clique pointers and launches the matching clique kernel on `stream`.
// Returns ncclSuccess without doing anything when clique mode is disabled,
// ncclInvalidUsage if the manager is not initialized or the collective is not
// handled here, otherwise the result of the pointer lookup / kernel launch.
ncclResult_t CliqueManager::QueueKernel(uint64_t const       opCount,
                                        ncclFunc_t const     coll,
                                        size_t const         count,
                                        ncclDataType_t const datatype,
                                        ncclRedOp_t const    op,
                                        int const            root,
                                        hipStream_t const    stream)
{
  // Do nothing if disabled
  if (m_cliqueMode == CLIQUE_DISABLED) return ncclSuccess;

  if (!m_init)
  {
    WARN("CliqueManager must be initialized before use");
    return ncclInvalidUsage;
  }

  // Wait for all ranks to declare pointers
  int slot = opCount % NCCL_MAX_OPS;
  WaitForBarrier(slot);

  // Get cliqueDevicePointers
  cliqueDevicePtrs_t devicePtrs;
  NCCLCHECK(GetCliqueDevicePointers(opCount, devicePtrs));

  // NOTE: The number of blocks to use per GPU will need to be further optimized
  // Defaults to 2 unless overridden through RCCL_CLIQUE_GRIDSIZE
  char const* gridSizeEnv = getenv("RCCL_CLIQUE_GRIDSIZE");
  int gridSize = 2;
  if (gridSizeEnv)
  {
    gridSize = atoi(gridSizeEnv);
  }

  // Launch kernel
  if (coll == ncclCollAllReduce)
  {
    return AllReduceCliqueKernel::Launch(m_rank, m_numRanks, gridSize, count, datatype, op, stream, devicePtrs);
  }

  WARN("Unsupported collective type");
  return ncclInvalidUsage;
}
// Fills cliquePtrs with every rank's input/output pointers and the barrier
// counter for the op slot derived from opCount.
// In CLIQUE_SINGLE_NODE mode the pointers come from IPC handles read out of
// shared memory; in CLIQUE_SINGLE_PROCESS mode they are copied from the static
// table.  In any other mode cliquePtrs is left untouched.
ncclResult_t CliqueManager::GetCliqueDevicePointers(uint64_t opCount, cliqueDevicePtrs_t& cliquePtrs)
{
  int slot = opCount % NCCL_MAX_OPS;

  if (m_cliqueMode == CLIQUE_SINGLE_PROCESS)
  {
    m_cliquePtrs[slot].barrierCounter = &m_staticBarriers[slot];
    cliquePtrs = m_cliquePtrs[slot];
  }
  else if (m_cliqueMode == CLIQUE_SINGLE_NODE)
  {
    // Collect the ready handles from shared memory and convert them to device pointers
    int totalHandles = m_numRanks * NUM_HANDLES_PER_RANK;
    std::vector<hipIpcMemHandle_t> handles(totalHandles);
    NCCLCHECK(m_shmHandles.ReadHandles(slot, handles));

    for (int rankIdx = 0; rankIdx < m_numRanks; rankIdx++)
    {
      // Each rank contributes its input handle followed by its output handle
      int const firstHandle = rankIdx * NUM_HANDLES_PER_RANK;

      void* inputPtr;
      NCCLCHECK(CheckCacheForHandle(handles[firstHandle], m_ipcHandleRecvCache, &inputPtr));
      cliquePtrs.inputs[rankIdx] = const_cast<const void *>(inputPtr);

      NCCLCHECK(CheckCacheForHandle(handles[firstHandle + 1], m_ipcHandleRecvCache,
                                    &cliquePtrs.outputs[rankIdx]));
    }
    cliquePtrs.barrierCounter = &(m_deviceBarriers[slot]);
  }
  return ncclSuccess;
}
// Looks up the IPC memory handle for devPtr, creating and caching it on a miss.
//
// devPtr - device pointer to export; must not be NULL
// cache  - send-side cache keyed by pointer address
// rank   - currently unused
// handle - receives the IPC handle
//
// Returns ncclInternalError for a NULL devPtr, the CUDACHECK error if
// hipIpcGetMemHandle fails, ncclSuccess otherwise.
ncclResult_t CliqueManager::CheckCacheForPtr(void*                   devPtr,
                                             NcclIpcHandleSendCache* cache,
                                             int                     rank,
                                             hipIpcMemHandle_t*      handle)
{
  uint64_t addr = (uint64_t)devPtr;

  // handle NULL ptr case
  if (addr == 0)
  {
    WARN("Error while checking IPC memory handle cache for ptr: null pointer specified.\n");
    return ncclInternalError;
  }

  NcclIpcHandleSendCache::iterator it = cache->find(addr);

  if (it == cache->end())
  {
    // Miss: export a handle for this pointer and remember it
    CUDACHECK(hipIpcGetMemHandle(handle, devPtr));
    cache->insert(addr, *handle);
  }
  else
  {
    // Hit: the cached entry is a (handle, LRU position) pair
    *handle = (it->second).first;
  }
  return ncclSuccess;
}
// Resolves an IPC memory handle to a local pointer, opening the handle and
// caching the mapping on a miss.
// Returns the CUDACHECK error if hipIpcOpenMemHandle fails, ncclSuccess otherwise.
ncclResult_t CliqueManager::CheckCacheForHandle(hipIpcMemHandle_t       handle,
                                                NcclIpcHandleRecvCache* cache,
                                                void**                  ptr)
{
  NcclIpcHandleRecvCache::iterator cached = cache->find(handle);

  // Hit: reuse the pointer stored with the cached entry
  if (cached != cache->end())
  {
    *ptr = (cached->second).first;
    return ncclSuccess;
  }

  // Miss: map the remote allocation, then remember the mapping
  CUDACHECK(hipIpcOpenMemHandle(ptr, handle, hipIpcMemLazyEnablePeerAccess));
  cache->insert(handle, *ptr);
  return ncclSuccess;
}
// Host-side barrier for one op slot: announces this rank's arrival on the
// shared counter, then yields until all m_numRanks ranks have arrived.
// The counter is never reset; each call raises this rank's target by m_numRanks.
void CliqueManager::WaitForBarrier(int opIndex)
{
  m_nextBarrierValue[opIndex] += m_numRanks;
  int const nextValue = m_nextBarrierValue[opIndex];

  __atomic_add_fetch(&m_arrivalCounter[opIndex], 1, __ATOMIC_SEQ_CST);

  // The counter is written concurrently by the other ranks, so it must be
  // re-read atomically on every iteration; a plain load is a data race and
  // may be hoisted out of the loop by the compiler
  while (__atomic_load_n(&m_arrivalCounter[opIndex], __ATOMIC_SEQ_CST) < static_cast<uint32_t>(nextValue))
  {
    std::this_thread::yield();
  }
}
// Run by the bootstrap root before the ranks start: for every entry in
// CliqueShmNames it creates the /tmp file backing the message queue and
// obtains the queue id, then removes any stale /dev/shm file of the same name.
//
// pid  - root process id, part of every generated file name
// hash - communicator hash, part of every generated file name
//
// Returns ncclSuccess, or the first error reported by a failing call.
ncclResult_t CliqueManager::BootstrapRootInit(int pid, unsigned long hash)
{
  for (auto it = CliqueShmNames.begin(); it != CliqueShmNames.end(); it++)
  {
    int msgid, fd;
    std::string msgQueueName = "/tmp/" + it->second + std::to_string(hash) + "_" + std::to_string(pid);

    // NOTE(review): mode 0606 makes the file readable and writable by "other"
    // but not by the group - confirm this is intended for a file in /tmp
    SYSCHECKVAL(open(msgQueueName.c_str(), O_CREAT | O_RDWR, 0606), "open", fd);

    ncclResult_t const queueResult = MsgQueueGetId(msgQueueName, hash, true, msgid);
    if (queueResult != ncclSuccess)
    {
      // Do not leak the descriptor on the error path; the queue error is the one reported
      close(fd);
      NCCLCHECK(queueResult);
    }
    SYSCHECK(close(fd), "close");
  }

  std::string shmDir = "/dev/shm/";
  for (auto it = CliqueShmNames.begin(); it != CliqueShmNames.end(); it++)
  {
    struct stat fileStatus;
    std::string shmFileName = it->second + std::to_string(hash) + "_" + std::to_string(pid);
    std::string shmFullPath = shmDir + shmFileName;

    // Check if shm file already exists; if so, unlink it
    if (stat(shmFullPath.c_str(), &fileStatus) == 0)
    {
      NCCLCHECK(shmUnlink(shmFileName.c_str()));
    }
  }
  return ncclSuccess;
}
-121
Просмотреть файл
@@ -1,121 +0,0 @@
/*
Copyright (c) 2020 Advanced Micro Devices, Inc. All rights reserved.
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
*/
#ifndef RCCL_CLIQUE_MANAGER_HPP_
#define RCCL_CLIQUE_MANAGER_HPP_
#include <semaphore.h>
#include <mutex>
#include "nccl.h"
#include "devcomm.h"
#include "CliqueCommon.h"
#include "HandleCache.h"
#include "HandleShm.h"
#define NUM_HANDLES_PER_RANK 2
// Manages the per-communicator state needed to run clique-based kernels:
// exchanging each rank's input/output pointers for an operation, the host-side
// barrier that waits for all ranks to declare them, and launching the kernel.
class CliqueManager
{
public:
// How the ranks of the clique can reach each other's memory
typedef enum
{
CLIQUE_DISABLED = 0, // clique kernels are not used
CLIQUE_SINGLE_PROCESS = 1, // pointers and barriers are shared through static members
CLIQUE_SINGLE_NODE = 2 // pointers are exchanged as IPC handles through shared memory
} cliqueMode_t;
// Stores the arguments only; Init() must be called before use
CliqueManager(int const rank, int const numRanks, cliqueMode_t const cliqueMode);
// Calls CleanUp() if the manager is still initialized
~CliqueManager();
// Releases the resources of the current mode and marks the manager uninitialized
void CleanUp();
// Sets up the resources for the configured mode; falls back to
// CLIQUE_DISABLED (still returning ncclSuccess) when setup is not possible
ncclResult_t Init(ncclUniqueId const* commId, int suffix);
// Returns true if the collective is supported via a clique-based kernel
bool IsSupported(ncclFunc_t const coll,
size_t const count,
ncclDataType_t const datatype,
ncclRedOp_t const op) const;
// Provide the pointers to be exchanged across the clique for the given rank / opCount
ncclResult_t DeclarePointers(uint64_t opCount, void const* inputPtr, void* outputPtr);
// Launch a clique based kernel
ncclResult_t QueueKernel(uint64_t const opCount,
ncclFunc_t const coll,
size_t const count,
ncclDataType_t const datatype,
ncclRedOp_t const op,
int const root,
hipStream_t const stream);
// NOTE(review): declared here, but no definition appears in the
// CliqueManager.cc shown alongside this header - confirm it is still needed
ncclResult_t CloseSharedMemory();
// Called by the bootstrap root to prepare message-queue / shared-memory files
static ncclResult_t BootstrapRootInit(int pid, unsigned long hash);
protected:
// Collect the device pointers from all GPUs for specified opCount
ncclResult_t GetCliqueDevicePointers(uint64_t opCount, cliqueDevicePtrs_t& cliquePtrs);
// Returns the IPC handle for devPtr, creating and caching it if necessary
ncclResult_t CheckCacheForPtr(void* devPtr,
NcclIpcHandleSendCache* cache,
int rank,
hipIpcMemHandle_t* handle);
// Returns the local pointer for an IPC handle, opening and caching it if necessary
ncclResult_t CheckCacheForHandle(hipIpcMemHandle_t handle,
NcclIpcHandleRecvCache* cache,
void** ptr);
// Race-condition helper functions
// Host-side wait until all ranks have arrived for the given op slot
void WaitForBarrier(int opIndex);
cliqueMode_t m_cliqueMode;
int m_rank;
int m_numRanks;
bool m_init;
// Arrival count this rank waits for next, per op slot (grows by m_numRanks per barrier)
int m_nextBarrierValue[NCCL_MAX_OPS];
// Arrival counters, per op slot; points at shared memory or at m_staticCounters
uint32_t* m_arrivalCounter;
// IPC-related (CLIQUE_SINGLE_NODE)
NcclIpcHandleShm m_shmHandles;
NcclIpcHandleSendCache* m_ipcHandleSendCache;
NcclIpcHandleRecvCache* m_ipcHandleRecvCache;
ShmObject<uint32_t> m_sharedCounters; // Tracks # of ranks that have finished declaring pointers
ShmObject<hipIpcMemHandle_t> m_sharedBarrier; // Used to pass fine-grained device memory buffer
int* m_deviceBarriers; // fine-grained barrier
// Single-process (CLIQUE_SINGLE_PROCESS)
static cliqueDevicePtrs_t m_cliquePtrs[NCCL_MAX_OPS];
static uint32_t m_staticCounters[NCCL_MAX_OPS];
static int* m_staticBarriers;
};
// For use in bootstrapping code
// Argument bundle passed as the single void* argument of the bootstrap root thread
struct bootstrapRootStruct {
void* listenComm; // listen communicator the root thread accepts connections on
unsigned long hash; // hash of the ncclUniqueId, used to name the clique shared files
};
#endif
-37
Просмотреть файл
@@ -1,37 +0,0 @@
/*
Copyright (c) 2020 Advanced Micro Devices, Inc. All rights reserved.
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
*/
#ifndef NCCL_CLIQUE_SHM_NAMES_H_
#define NCCL_CLIQUE_SHM_NAMES_H_
#include <string>
#include <map>
// Maps a logical resource name to the file-name prefix used for its
// shared-memory / message-queue files (a hash and suffix are appended by the users).
// Declared static in a header, so every translation unit including it gets
// its own copy of the map.
static std::map<std::string, std::string> CliqueShmNames =
{
{"SharedCounters", "RcclCounters" },
{"Mutexes" , "RcclMutexes" },
{"IpcHandles" , "RcclIpcHandles"},
{"Barriers" , "RcclBarriers" }
};
#endif
-31
Просмотреть файл
@@ -1,31 +0,0 @@
/*
Copyright (c) 2020 Advanced Micro Devices, Inc. All rights reserved.
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
*/
#include "HandleCache.h"
#include "Hash.h"
// Hash functor for hipIpcMemHandle_t keys: applies djb2Hash to the handle's
// `reserved` character array
unsigned long hipIpcMemHandleHash(const hipIpcMemHandle_t& handle)
{
  unsigned long const digest = djb2Hash(handle.reserved);
  return digest;
}
-141
Просмотреть файл
@@ -1,141 +0,0 @@
/*
Copyright (c) 2020 Advanced Micro Devices, Inc. All rights reserved.
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
*/
#ifndef NCCL_HANDLE_CACHE_H_
#define NCCL_HANDLE_CACHE_H_
#include <list>
#include <unordered_map>
#include <functional>
#include "core.h"
//#include "llvm/ADT/DenseMap.h"
// Fixed-capacity key/value cache with least-recently-used eviction.
//
// Every entry stores its value together with an iterator into m_lruHistory,
// a list of keys ordered from least to most recently used.  find() counts as a
// use; insert() evicts the least recently used entry when the cache is full.
//
// Hash / KeyEqual / Allocator are forwarded to the underlying
// std::unordered_map<Key, std::pair<Value, std::list<Key>::iterator>>.
template <
  class Key,
  class Value,
  class Hash,
  class KeyEqual,
  class Allocator
>
class NcclIpcHandleCache
{
  typedef std::unordered_map<Key, std::pair<Value, typename std::list<Key>::iterator>, Hash, KeyEqual, Allocator> LRUCache;

public:
  using iterator = typename LRUCache::iterator;

  // size         - maximum number of entries kept
  // bucket_count - initial bucket count of the underlying map
  NcclIpcHandleCache(size_t size,
                     size_t bucket_count = 100,
                     const Hash& hash = Hash(),
                     const KeyEqual& eql = KeyEqual(),
                     const Allocator& alloc = Allocator() ) :
    m_capacity(size),
    m_cache(bucket_count, hash, eql, alloc)
  {
  }

  iterator begin()
  {
    return m_cache.begin();
  }

  iterator end()
  {
    return m_cache.end();
  }

  // Returns the entry for key (value is it->second.first) or end().
  // A hit marks the entry as most recently used.
  iterator find(const Key& key)
  {
    iterator it = m_cache.find(key);
    if (it != m_cache.end())
    {
      // Move the key to the back of the history; list iterators stay valid
      m_lruHistory.splice(m_lruHistory.end(), m_lruHistory, it->second.second);
    }
    return it;
  }

  // Inserts key -> value unless key is already cached.
  // Returns (iterator to the entry, true) on insertion and
  // (iterator to the existing entry, false) when key was already present;
  // an existing entry is left untouched and nothing is evicted.
  // A cache created with capacity 0 stores nothing and returns (end(), false).
  std::pair<iterator, bool> insert(const Key& key, const Value& value)
  {
    iterator existing = m_cache.find(key);
    if (existing != m_cache.end())
    {
      return std::pair<iterator, bool>(existing, false);
    }

    if (m_capacity == 0)
    {
      return std::pair<iterator, bool>(m_cache.end(), false);
    }

    // Make room only when a new entry is really going to be added
    if (m_cache.size() >= m_capacity)
    {
      pop();
    }

    typename std::list<Key>::iterator historyIt = m_lruHistory.insert(m_lruHistory.end(), key);
    return m_cache.insert(std::make_pair(key, std::make_pair(value, historyIt)));
  }

private:
  // Evicts the least recently used entry, if there is one
  void pop()
  {
    if (m_lruHistory.empty()) return;
    m_cache.erase(m_lruHistory.front());
    m_lruHistory.pop_front();
  }

  size_t m_capacity;
  std::list<Key> m_lruHistory;
  LRUCache m_cache;
};
// djb2 hash function for hashing char array in hipIpcMemHandle_t
unsigned long hipIpcMemHandleHash(const hipIpcMemHandle_t& handle);
// equality function required for unordered_map: two handles are equal when
// their opaque `reserved` payloads are byte-for-byte identical
// NOTE(review): this is a non-inline variable defined in a header; confirm the
// header is only included from a single translation unit.
auto hipIpcMemHandleEqual = [](const hipIpcMemHandle_t& l, const hipIpcMemHandle_t& r)
{
  return memcmp(l.reserved, r.reserved, sizeof(l.reserved)) == 0;
};
//typedef llvm::DenseMap<uint64_t, hipIpcMemHandle_t> SendCache;
//typedef llvm::DenseMap<hipIpcMemHandle_t, void*, decltype(&HandleHash), decltype(HandleEqual)> RecvCache;

// Send-side cache: 64-bit key -> IPC handle.
// The allocator's value_type must match the value_type of the map used inside
// NcclIpcHandleCache, i.e. pair<const Key, pair<Value, list<Key>::iterator>>.
typedef NcclIpcHandleCache<uint64_t,
                           hipIpcMemHandle_t,
                           std::hash<uint64_t>,
                           std::equal_to<uint64_t>,
                           std::allocator<std::pair<const uint64_t, std::pair<hipIpcMemHandle_t, std::list<uint64_t>::iterator>>>> NcclIpcHandleSendCache;

// Receive-side cache: IPC handle -> pointer.
// The hash (function pointer) and equality (lambda) objects are not
// default-constructible in C++11, so they must be passed to the constructor.
typedef NcclIpcHandleCache<hipIpcMemHandle_t,
                           void*,
                           decltype(&hipIpcMemHandleHash),
                           decltype(hipIpcMemHandleEqual),
                           std::allocator<std::pair<const hipIpcMemHandle_t, std::pair<void*, std::list<hipIpcMemHandle_t>::iterator>>>> NcclIpcHandleRecvCache;
#endif
-67
Просмотреть файл
@@ -1,67 +0,0 @@
/*
Copyright (c) 2020 Advanced Micro Devices, Inc. All rights reserved.
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
*/
#include <hip/hip_runtime.h>
#include "HandleShm.h"
#include "CliqueShmNames.h"
#include "core.h"
#include "Hash.h"
#include "shm.h"
// Describes (but does not yet open) a shared-memory segment holding
// numRanks * numHandlesPerRank * capacity IPC handles.
//   rank / numRanks   : this rank and the number of participating ranks
//   projid            : project id forwarded to the ShmObject base
//   numHandlesPerRank : handles each rank publishes per operation
//   capacity          : number of per-operation slots in the segment
//   suffix            : appended to CliqueShmNames["IpcHandles"] to form the segment name
NcclIpcHandleShm::NcclIpcHandleShm(int rank, int numRanks, int projid, int numHandlesPerRank, int capacity, std::string suffix) :
  // Widen to size_t before multiplying so the byte count is not computed in int
  ShmObject<hipIpcMemHandle_t>(static_cast<size_t>(numRanks) * numHandlesPerRank * capacity * sizeof(hipIpcMemHandle_t),
                               CliqueShmNames["IpcHandles"] + suffix,
                               rank,
                               numRanks,
                               projid),
  m_numHandlesPerRank(numHandlesPerRank),
  m_numHandlesPerOpCount(numRanks * numHandlesPerRank)
{
}
// Default constructor: creates an unconfigured object. The handle counts are
// zeroed so they never hold indeterminate values.
NcclIpcHandleShm::NcclIpcHandleShm() :
  m_numHandlesPerRank(0),
  m_numHandlesPerOpCount(0)
{
}
// Destructor: intentionally empty; it does not call Close().
NcclIpcHandleShm::~NcclIpcHandleShm()
{
}
// Opens the shared-memory segment by delegating to the ShmObject base class.
ncclResult_t NcclIpcHandleShm::Open()
{
  return ShmObject<hipIpcMemHandle_t>::Open();
}
// Publishes this rank's handles for operation slot `opCount`.
// Copies m_numHandlesPerRank handles from `sendHandles` into the segment at
// element offset opCount * m_numHandlesPerOpCount + m_rank * m_numHandlesPerRank.
// Returns ncclInvalidUsage if the segment is not open, ncclInvalidArgument if
// `sendHandles` is too short or the slot lies outside the segment.
ncclResult_t NcclIpcHandleShm::WriteHandles(uint64_t opCount, std::vector<hipIpcMemHandle_t> const& sendHandles)
{
  if (!m_alloc || m_shmPtr == nullptr)
  {
    WARN("NcclIpcHandleShm::WriteHandles called before shared memory was opened");
    return ncclInvalidUsage;
  }

  size_t const perRank      = static_cast<size_t>(m_numHandlesPerRank);
  size_t const perOp        = static_cast<size_t>(m_numHandlesPerOpCount);
  size_t const totalHandles = m_shmSize / sizeof(hipIpcMemHandle_t);

  if (sendHandles.size() < perRank)
  {
    WARN("NcclIpcHandleShm::WriteHandles expected %d handles but was given %zu", m_numHandlesPerRank, sendHandles.size());
    return ncclInvalidArgument;
  }

  // Division-based check avoids overflow in opCount * perOp
  if (perOp == 0 || opCount >= totalHandles / perOp)
  {
    WARN("NcclIpcHandleShm::WriteHandles opCount %llu is outside the shared memory segment", (unsigned long long)opCount);
    return ncclInvalidArgument;
  }

  size_t const idx = (opCount * perOp) + (static_cast<size_t>(m_rank) * perRank);
  if (idx > totalHandles || totalHandles - idx < perRank)
  {
    WARN("NcclIpcHandleShm::WriteHandles rank %d slot is outside the shared memory segment", m_rank);
    return ncclInvalidArgument;
  }

  memcpy(m_shmPtr + idx, sendHandles.data(), sizeof(hipIpcMemHandle_t) * perRank);
  return ncclSuccess;
}
// Reads the handles of all ranks for operation slot `opCount`.
// Copies m_numHandlesPerOpCount handles starting at element offset
// opCount * m_numHandlesPerOpCount into `recvHandles`, which must already be
// sized to hold at least that many elements.
// Returns ncclInvalidUsage if the segment is not open, ncclInvalidArgument if
// `recvHandles` is too small or the slot lies outside the segment.
ncclResult_t NcclIpcHandleShm::ReadHandles(uint64_t opCount, std::vector<hipIpcMemHandle_t>& recvHandles)
{
  if (!m_alloc || m_shmPtr == nullptr)
  {
    WARN("NcclIpcHandleShm::ReadHandles called before shared memory was opened");
    return ncclInvalidUsage;
  }

  size_t const perOp        = static_cast<size_t>(m_numHandlesPerOpCount);
  size_t const totalHandles = m_shmSize / sizeof(hipIpcMemHandle_t);

  if (recvHandles.size() < perOp)
  {
    WARN("NcclIpcHandleShm::ReadHandles needs room for %d handles but was given %zu", m_numHandlesPerOpCount, recvHandles.size());
    return ncclInvalidArgument;
  }

  // Division-based check avoids overflow in opCount * perOp
  if (perOp == 0 || opCount >= totalHandles / perOp)
  {
    WARN("NcclIpcHandleShm::ReadHandles opCount %llu is outside the shared memory segment", (unsigned long long)opCount);
    return ncclInvalidArgument;
  }

  size_t const idx = opCount * perOp;
  memcpy(recvHandles.data(), m_shmPtr + idx, perOp * sizeof(hipIpcMemHandle_t));
  return ncclSuccess;
}
-53
Просмотреть файл
@@ -1,53 +0,0 @@
/*
Copyright (c) 2020 Advanced Micro Devices, Inc. All rights reserved.
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
*/
#ifndef NCCL_IPC_HANDLE_SHM_H_
#define NCCL_IPC_HANDLE_SHM_H_
#include <hip/hip_runtime.h>
#include <vector>
#include <string>
#include "nccl.h"
#include "ShmObject.h"
// Shared-memory table of hipIpcMemHandle_t entries, built on ShmObject.
// The table is addressed by (opCount, rank, handle index): each opCount slot
// holds m_numHandlesPerOpCount handles, of which each rank owns a contiguous
// run of m_numHandlesPerRank.
class NcclIpcHandleShm : public ShmObject<hipIpcMemHandle_t>
{
public:
// Configures a table of numRanks * numHandlesPerRank * capacity handles; `suffix` is appended to the segment name
NcclIpcHandleShm(int rank, int numRanks, int projid, int numHandlesPerRank, int capacity, std::string suffix);
// Creates an unconfigured object
NcclIpcHandleShm();
~NcclIpcHandleShm();
// Opens the shared-memory segment (delegates to ShmObject::Open)
ncclResult_t Open();
// Copies this rank's m_numHandlesPerRank handles from sendHandles into slot opCount
ncclResult_t WriteHandles(uint64_t opCount, std::vector<hipIpcMemHandle_t> const& sendHandles);
// Copies all m_numHandlesPerOpCount handles of slot opCount into recvHandles (caller pre-sizes the vector)
ncclResult_t ReadHandles(uint64_t opCount, std::vector<hipIpcMemHandle_t>& recvHandles);
private:
// Number of handles each rank publishes per operation
int m_numHandlesPerRank;
// numRanks * numHandlesPerRank: handles stored per operation slot
int m_numHandlesPerOpCount;
};
#endif
-34
Просмотреть файл
@@ -1,34 +0,0 @@
/*
Copyright (c) 2020 Advanced Micro Devices, Inc. All rights reserved.
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
*/
#include "Hash.h"
/* djb2 string hash: starts from 5381 and folds in each character of the
   NUL-terminated input as hash = hash * 33 + c. `data` must not be NULL. */
unsigned long djb2Hash(const char* data)
{
  unsigned long result = 5381;
  const char* cursor;
  for (cursor = data; *cursor != 0; ++cursor)
  {
    int ch = *cursor;
    result = result * 33 + ch;
  }
  return result;
}
-28
Просмотреть файл
@@ -1,28 +0,0 @@
/*
Copyright (c) 2020 Advanced Micro Devices, Inc. All rights reserved.
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
*/
#ifndef NCCL_HASH_H_
#define NCCL_HASH_H_
// djb2 hash of a NUL-terminated character string (hash = hash * 33 + c, seeded with 5381)
unsigned long djb2Hash(const char* data);
#endif
-72
Просмотреть файл
@@ -1,72 +0,0 @@
/*
Copyright (c) 2020 Advanced Micro Devices, Inc. All rights reserved.
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
*/
#include "MsgQueue.h"
#include <sys/ipc.h>
#include <sys/msg.h>
#define MSG_QUEUE_PERM 0666
// Obtains the System V message queue identified by ftok(name, projid).
//   exclusive == false : get the queue, creating it if necessary
//   exclusive == true  : create a fresh queue; if one already exists it is
//                        removed via MsgQueueClose and created again
// On success the queue id is stored in `msgid`.
ncclResult_t MsgQueueGetId(std::string name, int projid, bool exclusive, int& msgid)
{
  key_t key;
  SYSCHECKVAL(ftok(name.c_str(), projid), "ftok", key);

  int flag = IPC_CREAT;
  if (exclusive)
  {
    flag |= IPC_EXCL;
  }

  msgid = msgget(key, MSG_QUEUE_PERM | flag);
  if (msgid != -1)
  {
    return ncclSuccess;
  }

  // Exclusive creation hit a pre-existing queue: delete it and try again
  if (exclusive && errno == EEXIST)
  {
    NCCLCHECK(MsgQueueClose(name, projid));
    SYSCHECKVAL(msgget(key, MSG_QUEUE_PERM | flag), "msgget", msgid);
    return ncclSuccess;
  }

  WARN("Call to MsgQueueGetId failed : %s", strerror(errno));
  return ncclSystemError;
}
// Thin wrapper around msgsnd(): sends the message at `msgp` to queue `msgid`.
// All arguments are forwarded unchanged; failures are reported through SYSCHECK.
ncclResult_t MsgQueueSend(int msgid, const void* msgp, size_t msgsz, int msgflg)
{
SYSCHECK(msgsnd(msgid, msgp, msgsz, msgflg), "msgsnd");
return ncclSuccess;
}
// Thin wrapper around msgrcv(): receives a message of type `msgtyp` from queue
// `msgid` into `msgp`. When `wait` is false the call is made with IPC_NOWAIT.
ncclResult_t MsgQueueRecv(int msgid, void* msgp, size_t msgsz, long msgtyp, bool wait)
{
  int msgflg = 0;
  if (!wait)
  {
    msgflg = IPC_NOWAIT;
  }
  SYSCHECK(msgrcv(msgid, msgp, msgsz, msgtyp, msgflg), "msgrcv");
  return ncclSuccess;
}
// Removes the System V message queue identified by ftok(name, projid).
// Returns ncclSuccess when the queue was removed or did not exist, and
// ncclSystemError when the key cannot be derived or removal fails.
ncclResult_t MsgQueueClose(std::string name, int projid)
{
  key_t key;
  // A failed ftok yields key -1; never operate on that bogus key
  SYSCHECKVAL(ftok(name.c_str(), projid), "ftok", key);

  // Look the queue up without IPC_CREAT so that closing a queue that does not
  // exist does not first create one
  int msgid = msgget(key, 0);
  if (msgid == -1)
  {
    if (errno == ENOENT)
    {
      return ncclSuccess;
    }
    WARN("Call to msgget failed : %s", strerror(errno));
    return ncclSystemError;
  }

  SYSCHECK(msgctl(msgid, IPC_RMID, NULL), "msgctl");
  return ncclSuccess;
}
-42
Просмотреть файл
@@ -1,42 +0,0 @@
/*
Copyright (c) 2020 Advanced Micro Devices, Inc. All rights reserved.
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
*/
#ifndef RCCL_MSG_QUEUE_HPP_
#define RCCL_MSG_QUEUE_HPP_
#include <string>
#include "nccl.h"
#include "core.h"
// Message layout used with the System V message queue wrappers below:
// a long message type followed by a one-byte payload.
struct MsgBuffer
{
long msg_type;
char msg_text[1];
};
// Gets (or, when `exclusive`, freshly creates) the queue keyed by ftok(name, projid); the id is returned in `msgid`
ncclResult_t MsgQueueGetId(std::string name, int projid, bool exclusive, int& msgid);
// Wrapper around msgsnd()
ncclResult_t MsgQueueSend(int msgid, const void* msgp, size_t msgsz, int msgflg);
// Wrapper around msgrcv(); non-blocking (IPC_NOWAIT) when `wait` is false
ncclResult_t MsgQueueRecv(int msgid, void* msgp, size_t msgsz, long msgtyp, bool wait);
// Removes the queue keyed by ftok(name, projid)
ncclResult_t MsgQueueClose(std::string name, int projid);
#endif
-43
Просмотреть файл
@@ -1,43 +0,0 @@
/*
Copyright (c) 2020 Advanced Micro Devices, Inc. All rights reserved.
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
*/
#ifndef SHAREDMEMHELPER_H
#define SHAREDMEMHELPER_H
// Declaration of a helper for per-rank shared memory bookkeeping.
// Only the declaration is visible here; no implementation accompanies it.
// NOTE(review): `ncclStatus_t` is not declared anywhere in view (the rest of
// the code base uses ncclResult_t) and <string> is not included by this
// header - confirm before using this class.
class SharedMemHelper
{
public:
  SharedMemHelper(int rank, int numRanks, int numEntries);
  ncclStatus_t Init(std::string const& baseFilename);

protected:
  bool m_initialized;
  int m_rank;
  int m_numRanks;
};
#endif
-45
Просмотреть файл
@@ -1,45 +0,0 @@
/*
Copyright (c) 2020 Advanced Micro Devices, Inc. All rights reserved.
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
*/
#include "ShmObject.h"
#include <string>
// Template specialization for sem_t objects, which require additional teardown:
// every semaphore stored in the segment is destroyed before the shared-memory
// name is unlinked. Like the generic Close(), this is a no-op when the object
// was never opened.
template<>
ncclResult_t ShmObject<sem_t>::Close()
{
  if (!m_alloc)
  {
    return ncclSuccess;
  }

  // Only touch the semaphores if the segment is actually mapped
  if (m_shmPtr != nullptr)
  {
    size_t numMutexes = m_shmSize / sizeof(sem_t);
    for (size_t i = 0; i < numMutexes; i++)
    {
      sem_destroy(static_cast<sem_t*>(&m_shmPtr[i]));
    }
  }

  int retVal = shm_unlink(m_shmName.c_str());
  if (retVal == -1 && errno != ENOENT)
  {
    WARN("Call to shm_unlink in ShmObject failed : %s", strerror(errno));
    return ncclSystemError;
  }
  return ncclSuccess;
}
-203
Просмотреть файл
@@ -1,203 +0,0 @@
/*
Copyright (c) 2020 Advanced Micro Devices, Inc. All rights reserved.
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
*/
#ifndef NCCL_SHM_OBJECT_H_
#define NCCL_SHM_OBJECT_H_
#include <string>
#include <sys/mman.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <type_traits>
#include <semaphore.h>
#include "MsgQueue.h"
#include "nccl.h"
#include "core.h"
#include "shm.h"
// ShmObject abstracts away the nitty-gritty when multiple processes need to handle opening a shared
// memory object at the same time.
// Helper for shmSetupExclusive: sizes (when `create` is set) and maps an
// already opened shared-memory descriptor. Kept separate so the caller can
// release the descriptor on every exit path.
static ncclResult_t shmSetupExclusiveMap(int fd, const int shmsize, void** ptr, int create) {
  if (create) SYSCHECK(shm_allocate(fd, shmsize), "posix_fallocate");
  SYSCHECK(shm_map(fd, shmsize, ptr), "mmap");
  return ncclSuccess;
}

// Creates a shared-memory object that must not exist yet (O_CREAT | O_EXCL)
// and maps it at *ptr.
//   shmname : name passed to shm_open
//   shmsize : size in bytes to allocate and map
//   fd      : scratch output; always -1 on return (the descriptor is closed)
//   ptr     : receives the mapping on success, NULL on failure
//   create  : when non-zero the object is sized and zero-filled
// Returns ncclSystemError on failure. If shm_open itself fails, errno is left
// as set by shm_open (e.g. EEXIST) so callers can inspect it.
static ncclResult_t shmSetupExclusive(const char* shmname, const int shmsize, int* fd, void** ptr, int create) {
  *fd = shm_open(shmname, O_CREAT | O_RDWR | O_EXCL, S_IRUSR | S_IWUSR);
  if (*fd == -1) return ncclSystemError;

  ncclResult_t res = shmSetupExclusiveMap(*fd, shmsize, ptr, create);
  int savedErrno = errno;

  // The descriptor is not needed once the mapping exists (or has failed)
  close(*fd);
  *fd = -1;

  if (res != ncclSuccess) {
    // We created the object exclusively, so remove the half-initialized name
    shm_unlink(shmname);
    *ptr = NULL;
    errno = savedErrno;
    return res;
  }

  if (create) memset(*ptr, 0, shmsize);
  return ncclSuccess;
}
// Shared-memory array of T that several ranks open cooperatively.
// Rank 0 creates the segment and then tells every other rank, through a
// System V message queue, whether creation passed ('P') or failed ('F').
template <typename T>
class ShmObject
{
public:
  // size     : size of the segment in bytes
  // fileName : shared-memory object name; "/tmp/" + fileName is used for the message queue key
  // rank / numRanks : this rank and the number of ranks sharing the segment
  // projid   : project id used for the message queue key
  ShmObject(size_t size, std::string fileName, int rank, int numRanks, int projid) :
    m_shmSize(size),
    m_shmName(fileName),
    m_rank(rank),
    m_numRanks(numRanks),
    m_projid(projid),
    m_alloc(false),
    m_shmPtr(nullptr) {}

  // Default construction yields an unopened, empty object. Every member is
  // initialized so that Open()/Close() never read indeterminate values.
  ShmObject() :
    m_shmSize(0),
    m_shmName(),
    m_rank(0),
    m_numRanks(0),
    m_projid(0),
    m_alloc(false),
    m_shmPtr(nullptr) {}

  ~ShmObject() {}

  ncclResult_t Open();

  // Unlinks the shared-memory name (and, on rank 0, removes the /tmp file).
  // Does nothing when the object was never opened. The mapping is not unmapped.
  ncclResult_t Close()
  {
    if (m_alloc)
    {
      if (m_rank == 0)
      {
        std::string tmpFileName = "/tmp/" + m_shmName;
        remove(tmpFileName.c_str());
      }
      int retVal = shm_unlink(m_shmName.c_str());
      if (retVal == -1 && errno != ENOENT)
      {
        WARN("Call to shm_unlink in ShmObject failed : %s", strerror(errno));
        return ncclSystemError;
      }
    }
    return ncclSuccess;
  }

  // Returns a reference to the pointer to the mapped array (nullptr before Open)
  T*& Get()
  {
    return m_shmPtr;
  }

protected:
  // Sends a one-character pass ('P') / fail ('F') message to every other rank,
  // using the destination rank as the message type.
  ncclResult_t BroadcastMessage(int msgid, bool pass)
  {
    MsgBuffer msg;
    msg.msg_text[0] = (pass ? 'P' : 'F');
    for (int rank = 0; rank < m_numRanks; rank++)
    {
      if (rank == m_rank) continue;
      msg.msg_type = rank;
      // msgsnd's size argument counts only the payload, not the leading
      // message type; passing sizeof(msg) would read past the end of `msg`.
      NCCLCHECK(MsgQueueSend(msgid, &msg, sizeof(msg.msg_text), 0));
    }
    return ncclSuccess;
  }

  // tag for dispatch
  template<class U>
  struct OpenTag{};

  // Per-type post-creation hook; only the sem_t overload does any work
  ncclResult_t InitIfSemaphore(OpenTag<int> tag);
  ncclResult_t InitIfSemaphore(OpenTag<uint32_t> tag);
  ncclResult_t InitIfSemaphore(OpenTag<hipIpcMemHandle_t> tag);
  ncclResult_t InitIfSemaphore(OpenTag<sem_t> tag);

  size_t      m_shmSize;   // segment size in bytes
  std::string m_shmName;   // shared-memory object name
  int         m_rank;
  int         m_numRanks;
  int         m_projid;
  bool        m_alloc;     // true once Open() has succeeded
  T*          m_shmPtr;    // mapped array, nullptr until opened
};
// Opens the shared-memory segment.
// Rank 0 creates and maps the segment, runs the per-type initialization hook,
// and broadcasts pass/fail to the other ranks. Every other rank blocks on its
// message and maps the segment only after a pass.
// Returns ncclInvalidUsage when called on an already opened object and
// ncclSystemError when creation or mapping fails.
template <typename T>
ncclResult_t ShmObject<T>::Open()
{
  if (m_alloc)
  {
    WARN("Cannot allocate ShmObject twice.\n");
    return ncclInvalidUsage;
  }

  int shmFd;
  int msgid;
  std::string tmpFileName = "/tmp/" + m_shmName;
  NCCLCHECK(MsgQueueGetId(tmpFileName, m_projid, false, msgid));

  if (m_rank == 0)
  {
    ncclResult_t resultSetup = shmSetupExclusive(m_shmName.c_str(), m_shmSize, &shmFd, (void**)&m_shmPtr, 1);
    int setupErrno = errno;  // capture before any later call can overwrite it

    if (resultSetup != ncclSuccess && setupErrno == EEXIST)
    {
      // An object with this name already exists. That case is tolerated, but
      // the segment still has to be mapped, otherwise m_shmPtr stays unusable
      // while the other ranks are told that setup passed.
      // NOTE(review): the existing object is re-sized and zero-filled so it
      // matches a freshly created one - confirm this is the intended policy.
      resultSetup = shmSetup(m_shmName.c_str(), m_shmSize, &shmFd, (void**)&m_shmPtr, 1);
      setupErrno = errno;
    }

    // Only initialize semaphores inside a segment that is actually mapped
    ncclResult_t resultSemInit = ncclSuccess;
    if (resultSetup == ncclSuccess)
    {
      resultSemInit = InitIfSemaphore(OpenTag<T>{});
    }

    if (resultSetup != ncclSuccess || resultSemInit != ncclSuccess)
    {
      int failErrno = (resultSetup != ncclSuccess ? setupErrno : errno);
      NCCLCHECK(BroadcastMessage(msgid, false));
      WARN("Call to ShmObject::Open in root rank failed : %s", strerror(failErrno));
      return ncclSystemError;
    }
    NCCLCHECK(BroadcastMessage(msgid, true));
  }
  else
  {
    // msgrcv's size argument counts only the payload that follows the message
    // type. The payload buffer is sized generously so that a sender passing
    // sizeof(MsgBuffer) as its payload size can never overflow it.
    struct
    {
      long msg_type;
      char msg_text[sizeof(MsgBuffer)];
    } msg;
    NCCLCHECK(MsgQueueRecv(msgid, &msg, sizeof(msg.msg_text), m_rank, true));
    if (msg.msg_text[0] == 'P')
    {
      NCCLCHECK(shmSetup(m_shmName.c_str(), m_shmSize, &shmFd, (void**)&m_shmPtr, 0));
    }
    else
    {
      WARN("Call to shm_open from non-root rank in ShmObject failed : %s", strerror(errno));
      return ncclSystemError;
    }
  }

  m_alloc = true;
  return ncclSuccess;
}
// InitIfSemaphore overloads for element types that need no extra
// initialization after the segment is created: each one simply reports success.

// int elements: nothing to initialize
template<typename T>
ncclResult_t ShmObject<T>::InitIfSemaphore(OpenTag<int> tag)
{
return ncclSuccess;
}
// unsigned int elements: nothing to initialize
template<typename T>
ncclResult_t ShmObject<T>::InitIfSemaphore(OpenTag<unsigned int> tag)
{
return ncclSuccess;
}
// hipIpcMemHandle_t elements: nothing to initialize
template<typename T>
ncclResult_t ShmObject<T>::InitIfSemaphore(OpenTag<hipIpcMemHandle_t> tag)
{
return ncclSuccess;
}
// sem_t elements: initialize every semaphore stored in the segment with
// sem_init(sem, 1, 1), i.e. pshared = 1 and an initial value of 1.
template<typename T>
ncclResult_t ShmObject<T>::InitIfSemaphore(OpenTag<sem_t> tag)
{
  const size_t semCount = m_shmSize / sizeof(sem_t);
  size_t idx = 0;
  while (idx < semCount)
  {
    sem_t* sem = static_cast<sem_t*>(&m_shmPtr[idx]);
    SYSCHECK(sem_init(sem, 1, 1), "sem_init");
    ++idx;
  }
  return ncclSuccess;
}
#endif
-5
Просмотреть файл
@@ -350,14 +350,9 @@ __device__ int ptrAlign128(T* ptr) { return (uint64_t)ptr % alignof(int32_t); }
__device__ int ptrAlign128(T* ptr) { return (uint64_t)ptr % alignof(Pack128); }
#endif
#if defined(__HIP_PLATFORM_HCC__) || defined(__HCC__) || defined(__HIPCC__)
// Multiply UNROLL by 2 if single source/single destination
#define AUTOUNROLL (UNROLL*((MINSRCS==1 && MINDSTS==1) ? 2 : 1))
#else
// Try to limit consecutive load/stores to 8.
// Use UNROLL 8 when we have a single source and a single destination, 4 otherwise
#define AUTOUNROLL (UNROLL*(4/(MINDSTS+MINSRCS)))
#endif
template<int UNROLL, class FUNC, typename T, int MINSRCS, int MAXSRCS, int MINDSTS, int MAXDSTS>
__device__ void ReduceOrCopyMulti(const int tid, const int nthreads,
-25
Просмотреть файл
@@ -617,31 +617,6 @@ end:
info->opName, info->comm->opCount, info->sendbuff, info->recvbuff, info->count,
info->datatype, info->op, info->root, info->comm, info->comm->nRanks, info->stream);
// [RCCL] Alternative launch path for clique-based kernels (if supported and enabled)
{
if (info->comm->cliqueManager->IsSupported(info->coll,
info->count,
info->datatype,
info->op))
{
// Declare the input / output pointers being used (to exchange via IPC with other ranks)
NCCLCHECK(info->comm->cliqueManager->DeclarePointers(info->comm->opCount,
info->sendbuff,
info->recvbuff));
// Queue the clique-based kernel
NCCLCHECK(info->comm->cliqueManager->QueueKernel(info->comm->opCount,
info->coll,
info->count,
info->datatype,
info->op,
info->root,
info->stream));
return ncclSuccess;
}
}
// [/RCCL]
NCCLCHECK(ncclSaveKernel(info));
NCCLCHECK(ncclBarrierEnqueue(info->comm));
NCCLCHECK(ncclBarrierEnqueueWait(info->comm));
+1 -1
Просмотреть файл
@@ -12,7 +12,7 @@
ncclResult_t bootstrapNetInit();
ncclResult_t bootstrapCreateRoot(ncclUniqueId* commId, bool idFromEnv);
ncclResult_t bootstrapGetUniqueId(ncclUniqueId* out);
ncclResult_t bootstrapInit(ncclUniqueId* id, int rank, int nranks, void** commState, int* rootPid); // [RCCL] Adding rootPid
ncclResult_t bootstrapInit(ncclUniqueId* id, int rank, int nranks, void** commState);
ncclResult_t bootstrapAllGather(void* commState, void* allData, int size);
ncclResult_t bootstrapSend(void* commState, int peer, void* data, int size);
ncclResult_t bootstrapRecv(void* commState, int peer, void* data, int size);
+2 -9
Просмотреть файл
@@ -10,9 +10,6 @@
#include "transport.h"
#include "p2p.h"
// [RCCL]
#include "clique/CliqueManager.h"
// [/RCCL]
#if defined(__HIP_PLATFORM_HCC__) || defined(__HCC__) || defined(__HIPCC__)
#else
@@ -146,12 +143,8 @@ struct ncclComm {
//list of async p2p operation queued in a group semantics
struct ncclP2Plist p2plist;
// [RCCL]
bool alltoallDisable; // RCCL AllToAll/Scatter/Gather API
CliqueManager* cliqueManager; // CliqueManager handles pointer collection / distribution for clique-based kernels
int rootPid; // Process ID of root
// [/RCCL]
// RCCL AllToAll/Scatter/Gather API
bool alltoallDisable;
};
#endif
+1 -46
Просмотреть файл
@@ -28,10 +28,6 @@
#include <unistd.h>
#include "graph/topo.h"
// [RCCL]
#include "clique/CliqueManager.h"
// [/RCCL]
#define STR2(v) #v
#define STR(v) STR2(v)
@@ -682,10 +678,7 @@ static ncclResult_t initTransportsRank(struct ncclComm* comm, ncclUniqueId* comm
int nranks = comm->nRanks;
uint64_t commHash = getHash(commId->internal, NCCL_UNIQUE_ID_BYTES);
TRACE(NCCL_INIT, "comm %p, commHash %lx, rank %d nranks %d - BEGIN", comm, commHash, rank, nranks);
// [RCCL] Collect the PID of the root
int rootPid;
NCCLCHECK(bootstrapInit(commId, rank, nranks, &comm->bootstrap, &rootPid));
// [/RCCL]
NCCLCHECK(bootstrapInit(commId, rank, nranks, &comm->bootstrap));
// AllGather1 - begin
struct {
@@ -1028,40 +1021,6 @@ affinity_restore:
}
NCCLCHECK(ncclCommSetIntra(comm, intraRank, intraRanks, allGather1Data[intraRank0].comm));
{ // [RCCL] Check if clique-based kernels can be enabled and initialize CliqueManager if so
CliqueManager::cliqueMode_t cliqueMode = CliqueManager::CLIQUE_DISABLED;
if (comm->localRanks == comm->nRanks)
{
// Check that all the GPUs have peer access to one another
bool hasPeerAccess = true;
for (int i = 0; i < nranks && hasPeerAccess; i++)
{
int cudaDev1 = allGather1Data[i].peerInfo.cudaDev;
for (int j = 0; j < nranks; j++)
{
if (i == j) continue;
int cudaDev2 = allGather1Data[j].peerInfo.cudaDev;
int p2p;
if (hipDeviceCanAccessPeer(&p2p, cudaDev1, cudaDev2) != hipSuccess || !p2p)
{
hasPeerAccess = false;
break;
}
}
}
if (hasPeerAccess)
{
if (intraRanks == nranks)
cliqueMode = CliqueManager::CLIQUE_SINGLE_PROCESS;
else
cliqueMode = CliqueManager::CLIQUE_SINGLE_NODE;
}
}
comm->cliqueManager = new CliqueManager(rank, nranks, cliqueMode);
NCCLCHECK(comm->cliqueManager->Init(commId, rootPid));
} // [/RCCL]
// Done with AllGather1 data
free(allGather1Data);
@@ -1185,10 +1144,6 @@ ncclResult_t ncclCommDestroy(ncclComm_t comm) {
return ncclInvalidArgument;
}
// [RCCL] Delete CliqueManager if it exists
if (comm->cliqueManager) delete comm->cliqueManager;
// [/RCCL]
return commDestroy(comm);
}
-192
Просмотреть файл
@@ -1,192 +0,0 @@
/*
Copyright (c) 2020 Advanced Micro Devices, Inc. All rights reserved.
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
*/
#include <sys/socket.h>
#include <ifaddrs.h>
#include <netdb.h>
#include <unistd.h>
#include <cstdio>
#include <string>
#include <chrono>
#include <hip/hip_runtime.h>
#include <rccl.h>
#include "HelloRccl.hpp"
void Usage(char *argv0);
// Example driver: each process is one rank. It times ncclAllReduce (sum of
// floats) for message sizes 2^10 .. 2^28 elements and validates the result.
// Usage: <exe> numRanks rank deviceId   (NCCL_COMM_ID must be set)
int main(int argc, char **argv)
{
  // This example program uses NCCL_COMM_ID to perform bootstrapping
  // to sidestep the need to communicate the ncclUniqueId (e.g. via MPI)
  // Three positional arguments are read below, so argv[3] must exist
  if (argc < 4 || getenv("NCCL_COMM_ID") == NULL)
  {
    Usage(argv[0]);
    return 1;
  }

  // Collect command-line arguments
  int nranks   = atoi(argv[1]);
  int rank     = atoi(argv[2]);
  int deviceId = atoi(argv[3]);

  // Allocate GPU resources
  hipStream_t stream;
  hipEvent_t startEvent, stopEvent;
  HIP_CALL(hipSetDevice(deviceId));
  HIP_CALL(hipStreamCreate(&stream));
  HIP_CALL(hipEventCreate(&startEvent));
  HIP_CALL(hipEventCreate(&stopEvent));

  // Create communicator
  ncclUniqueId commId;
  NCCL_CALL(ncclGetUniqueId(&commId));

  // Initialize communicator
  ncclComm_t comm;
  NCCL_CALL(ncclCommInitRank(&comm, nranks, commId, rank));

  // Loop over powers of 2
  int minPow = 10;
  int maxPow = 28;
  if (rank == 0)
  {
    printf("AllReduce Performance (sum of floats):\n");
    printf("%10s %10s %10s\n", "Bytes", "CpuTime(ms)", "GpuTime(ms)");
  }

  for (int power = minPow; power <= maxPow; power++)
  {
    int N = 1 << power;

    // Allocate GPU memory
    float *iputGpu, *oputGpu;
    HIP_CALL(hipMalloc((void **)&iputGpu, N * sizeof(float)));
    HIP_CALL(hipMalloc((void **)&oputGpu, N * sizeof(float)));

    // Allocate CPU memory
    float *iputCpu = (float *)malloc(N * sizeof(float));
    float *oputCpu = (float *)malloc(N * sizeof(float));
    if (iputCpu == NULL || oputCpu == NULL)
    {
      printf("[ERROR] Rank %d Unable to allocate host memory for N = %d\n", rank, N);
      exit(1);
    }

    // Fill CPU with a simple pattern
    for (int i = 0; i < N; i++)
    {
      iputCpu[i] = 1.0f;
      oputCpu[i] = 0.0f;
    }

    // Copy the input from CPU memory to GPU memory
    HIP_CALL(hipMemcpy(iputGpu, iputCpu, N * sizeof(float), hipMemcpyHostToDevice));

    // Perform some untimed initial warmup iterations
    int numWarmups = 3;
    for (int iteration = 0; iteration < numWarmups; iteration++)
    {
      NCCL_CALL(ncclAllReduce(iputGpu, oputGpu, N, ncclFloat, ncclSum, comm, stream));
    }
    HIP_CALL(hipStreamSynchronize(stream));

    // Perform timed iterations
    int numIterations = 10;
    auto cpuStart = std::chrono::high_resolution_clock::now();
    HIP_CALL(hipEventRecord(startEvent, stream));
    for (int iteration = 0; iteration < numIterations; iteration++)
    {
      NCCL_CALL(ncclAllReduce(iputGpu, oputGpu, N, ncclFloat, ncclSum, comm, stream));
    }
    HIP_CALL(hipEventRecord(stopEvent, stream));
    HIP_CALL(hipStreamSynchronize(stream));
    auto cpuDelta = std::chrono::high_resolution_clock::now() - cpuStart;
    double totalCpuTime = std::chrono::duration_cast<std::chrono::duration<double, std::milli>>(cpuDelta).count();
    float totalGpuTime;
    HIP_CALL(hipEventElapsedTime(&totalGpuTime, startEvent, stopEvent));

    // N * sizeof(float) is a size_t, which is printed with %zu
    if (rank == 0) printf("%10zu %10.3f %10.3f\n", N * sizeof(float), (totalCpuTime / numIterations), (totalGpuTime / numIterations));

    // Validate results: every input element is 1.0f, so the sum equals nranks
    HIP_CALL(hipMemcpy(oputCpu, oputGpu, N * sizeof(float), hipMemcpyDeviceToHost));
    bool isOK = true;
    int expected = nranks;
    for (int i = 0; i < N; i++)
    {
      isOK &= (oputCpu[i] == expected);
    }
    if (!isOK)
    {
      printf("[ERROR] Rank %d Incorrect results for N = %d\n", rank, N);
      exit(1);
    }

    // Release GPU resources
    HIP_CALL(hipFree(oputGpu));
    HIP_CALL(hipFree(iputGpu));
    free(iputCpu);
    free(oputCpu);
  }

  HIP_CALL(hipStreamDestroy(stream));
  HIP_CALL(hipEventDestroy(startEvent));
  HIP_CALL(hipEventDestroy(stopEvent));
  NCCL_CALL(ncclCommDestroy(comm));
  return 0;
}
// Prints the command-line usage followed by candidate NCCL_COMM_ID values:
// one based on the host name and one per IPv4 / IPv6 interface address.
// NOTE(review): the usage line advertises [N=8] [verbose=0], which main()
// does not parse.
void Usage(char *argv0)
{
  printf("Usage: %s numRanks rank deviceId [N=8] [verbose=0]\n", argv0);
  printf(" - NCCL_COMM_ID must be set in order to use this\n\n");
  printf(" - To use this process as the root process you may use any of the following:\n");

  char hostname[256];
  if (gethostname(hostname, sizeof(hostname)) == 0)
  {
    // gethostname does not guarantee termination when the name is truncated
    hostname[sizeof(hostname) - 1] = '\0';
    printf(" export NCCL_COMM_ID=%s:12345\n", hostname);
  }

  // Loop over linked list of interfaces
  struct ifaddrs *ifaddr = NULL;
  if (getifaddrs(&ifaddr) != 0)
  {
    return;
  }
  for (struct ifaddrs* ifa = ifaddr; ifa != NULL; ifa = ifa->ifa_next)
  {
    // Interfaces without an address have a NULL ifa_addr
    if (ifa->ifa_addr == NULL) continue;

    // Skip anything not based on IPv4 / IPv6
    int family = ifa->ifa_addr->sa_family;
    if (family != AF_INET && family != AF_INET6) continue;

    // Skip IPv6 loopback interface
    if (family == AF_INET6)
    {
      struct sockaddr_in6* sa = (struct sockaddr_in6*)(ifa->ifa_addr);
      if (IN6_IS_ADDR_LOOPBACK(&sa->sin6_addr)) continue;
    }

    socklen_t saLen = (family == AF_INET ? sizeof(struct sockaddr_in) : sizeof(struct sockaddr_in6));
    char host[NI_MAXHOST];
    char service[NI_MAXSERV];
    // On failure `host` is not filled in, so skip this interface
    if (getnameinfo(ifa->ifa_addr, saLen, host, NI_MAXHOST, service, NI_MAXSERV,
                    NI_NUMERICHOST|NI_NUMERICSERV) != 0) continue;

    std::string result = std::string(host) + ":12345";
    printf(" export NCCL_COMM_ID=%s\n", result.c_str());
  }
  freeifaddrs(ifaddr);
}
-49
Просмотреть файл
@@ -1,49 +0,0 @@
/*
Copyright (c) 2020 Advanced Micro Devices, Inc. All rights reserved.
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
*/
#ifndef HELLORCCL_HPP
#define HELLORCCL_HPP
#include <iostream>
// HIP_CALL(cmd): evaluates `cmd` exactly once and stores the result in a
// hipError_t. If the result is not hipSuccess, writes the text returned by
// hipGetErrorString() together with __LINE__ and __FILE__ of the call site
// to std::cerr, then terminates the process via exit(-1).
// The body is wrapped in do { ... } while (0) so that the macro behaves as a
// single statement (e.g. inside an unbraced if/else).
// NOTE: the temporary is named `error`; a `cmd` expression that itself
// refers to a variable called `error` would bind to this temporary instead.
#define HIP_CALL(cmd) \
do { \
hipError_t error = (cmd); \
if (error != hipSuccess) \
{ \
std::cerr << "Encountered HIP error (" << hipGetErrorString(error) << ") at line " \
<< __LINE__ << " in file " << __FILE__ << "\n"; \
exit(-1); \
} \
} while (0)
// NCCL_CALL(cmd): evaluates `cmd` exactly once and stores the result in an
// ncclResult_t. If the result is not ncclSuccess, writes the text returned by
// ncclGetErrorString() together with __LINE__ and __FILE__ of the call site
// to std::cerr, then terminates the process via exit(-1).
// The body is wrapped in do { ... } while (0) so that the macro behaves as a
// single statement (e.g. inside an unbraced if/else).
// NOTE: the temporary is named `error`; a `cmd` expression that itself
// refers to a variable called `error` would bind to this temporary instead.
#define NCCL_CALL(cmd) \
do { \
ncclResult_t error = (cmd); \
if (error != ncclSuccess) \
{ \
std::cerr << "Encountered NCCL error (" << ncclGetErrorString(error) << ") at line " \
<< __LINE__ << " in file " << __FILE__ << "\n"; \
exit(-1); \
} \
} while (0)
#endif
-21
Просмотреть файл
@@ -1,21 +0,0 @@
# Copyright (c) 2020 Advanced Micro Devices, Inc. All rights reserved.

# Builds the HelloRccl example with hipcc against an RCCL build tree.

# Set to where RCCL is installed
RCCL_INSTALL=../../build/release

# Use /opt/rocm/hip when it exists (and HIP_PATH was not supplied by the
# caller); otherwise fall back to a path relative to this directory.
HIP_PATH?= $(wildcard /opt/rocm/hip)
ifeq (,$(HIP_PATH))
HIP_PATH=../../..
endif
HIPCC=$(HIP_PATH)/bin/hipcc

EXE=HelloRccl

# Compile flags are kept separate from link flags so that the library is
# named after the source file on the link line (link order is significant).
CXXFLAGS = -std=c++11 -O3 -I../../src/include -I$(RCCL_INSTALL)
LDFLAGS = -L$(RCCL_INSTALL)
LDLIBS = -lrccl

# These targets do not produce files of the same name
.PHONY: all clean

all: $(EXE)

# Rebuild when the source or any header below this directory changes
$(EXE): $(EXE).cpp $(shell find . -name '*.hpp')
	$(HIPCC) $(CXXFLAGS) $< -o $@ $(LDFLAGS) $(LDLIBS)

clean:
	rm -f *.o $(EXE)
-16
Просмотреть файл
@@ -1,16 +0,0 @@
#!/bin/bash
# Runs the HelloRccl example as 4 local ranks (rank i on device i), first with
# RCCL_ENABLE_CLIQUE=1 and then without it. Exits non-zero if any rank of
# either run fails.

RCCL_INSTALL=../../build/release
EXE=$PWD/HelloRccl
NUM_RANKS=4

# Append the RCCL build directory to the library search path. When
# LD_LIBRARY_PATH is unset/empty, do not emit a leading ':' (an empty entry
# makes the dynamic loader search the current working directory).
LDPATH=${LD_LIBRARY_PATH:+$LD_LIBRARY_PATH:}$RCCL_INSTALL

# launch_ranks [VAR=value ...]
# Starts one background process per rank with the given extra environment
# assignments, then waits for every rank so that the next run cannot start
# while processes from this one are still alive.
# Returns 0 only if all ranks exited successfully.
launch_ranks() {
  local rank pid status=0
  local pids=()

  for ((rank = 0; rank < NUM_RANKS; rank++)); do
    env "$@" NCCL_COMM_ID="$HOSTNAME:12345" LD_LIBRARY_PATH="$LDPATH" \
      "$EXE" "$NUM_RANKS" "$rank" "$rank" &
    pids+=("$!")
  done

  for pid in "${pids[@]}"; do
    wait "$pid" || status=1
  done
  return "$status"
}

overall=0

echo "With clique-based kernels:"
launch_ranks RCCL_ENABLE_CLIQUE=1 || overall=1

echo "Without clique-based kernels:"
launch_ranks || overall=1

exit "$overall"