Removing the experimental clique kernel files (#1610)

这个提交包含在:
gilbertlee-amd
2025-03-20 18:10:01 -06:00
提交者 GitHub
父节点 90ad586d94
当前提交 626dc50ab5
修改 22 个文件,包含 9 行新增1804 行删除
+1 -17
查看文件
@@ -424,22 +424,6 @@ set(SRC_FILES
src/proxy.cc
src/register.cc
src/transport.cc
# src/clique/AllReduceCliqueKernel.h
# src/clique/CliqueCommon.h
# src/clique/CliqueManager.cc
# src/clique/CliqueManager.h
# src/clique/CliqueShmNames.h
# src/clique/HandleCache.cc
# src/clique/HandleCache.h
# src/clique/HandleShm.cc
# src/clique/HandleShm.h
# src/clique/Hash.cc
# src/clique/Hash.h
# src/clique/MsgQueue.cc
# src/clique/MsgQueue.h
# src/clique/SharedMemHelper.h
# src/clique/ShmObject.cc
# src/clique/ShmObject.h
src/device/all_gather.h
src/device/all_reduce.h
src/device/alltoall_pivot.h
@@ -888,7 +872,7 @@ else()
execute_process(
COMMAND bash "-c" "free | grep -o '[[:digit:]]*' | head -1"
OUTPUT_VARIABLE memory_max_string)
## memory_max_string holds the free memory in KB
## memory_max_string holds the free memory in KB
if (${memory_max_string} MATCHES "^[0-9]+")
math(EXPR memory_in_gb "${memory_max_string} / (1024 * 1024)") ## KB to GB conversion
else()
-75
查看文件
@@ -1,75 +0,0 @@
/*
Copyright (c) 2020-2021 Advanced Micro Devices, Inc. All rights reserved.
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
*/
#ifndef ALLREDUCECLIQUEKERNEL_H
#define ALLREDUCECLIQUEKERNEL_H
#include "CliqueCommon.h"
#include "devcomm.h"
#include "reduce_kernel.h"
#include "common_kernel.h"
template <class FUNC, typename T, int NUM_RANKS>
__device__ void AllReduceCliqueSplitKernel(struct ncclWorkElem* args)
{
// Clique-specific kernel arguments
cliqueDevicePtrs_t* cliquePtrs = args->clique.ptrs; // Collection of all input/output pointers across ranks in clique
size_t const N = args->clique.count; // Total number of elements to reduce
int const nBlocks = args->clique.nChannels; // Total number of blocks assigned to this kernel (may be different than gridDim.x)
int const blockId = args->clique.bid; // 0-indexed blockIdx for this threadblock (may be different than blockIdx.x)
int const rank = args->comm->rank; // Current rank
// Each threadblock works independently of others on a subsection of the input
// First split evently across ranks, while maintaining multiples of blocksize
size_t const perRankN = RoundUp((N + NUM_RANKS - 1) / NUM_RANKS, blockDim.x);
size_t const perBlockN = RoundUp((perRankN + nBlocks - 1) / nBlocks, blockDim.x);
size_t const currBlockStart = min((rank * nBlocks + blockId) * perBlockN, N);
size_t const currBlockStop = min(currBlockStart + perBlockN, N);
size_t const blockN = currBlockStop - currBlockStart;
if (blockN > 0)
{
// Prepare input / output subarrays
T const** inputs = (T const**)cliquePtrs->inputs;
T** outputs = (T **)cliquePtrs->outputs;
T const* srcs[NUM_RANKS];
T* dsts[NUM_RANKS];
#pragma unroll
for (int r = 0; r < NUM_RANKS; r++)
{
srcs[r] = inputs[r] + currBlockStart;
dsts[r] = outputs[r] + currBlockStart;
}
// Perform the reduction
#define ALL_REDUCE_CLIQUE_UNROLL 1
ReduceOrCopyMulti<ALL_REDUCE_CLIQUE_UNROLL, FUNC, T, NUM_RANKS, NUM_RANKS, NUM_RANKS, NUM_RANKS, 0>(
threadIdx.x, blockDim.x, nullptr, false, NUM_RANKS, srcs, NUM_RANKS, dsts, blockN);
}
// Even if there was nothing for this GPU to do, it must participate in a barrier
// because other GPUs may be modifying this GPUs output buffer still
if (blockId == 0) WaitForBarrier<NUM_RANKS>(cliquePtrs->barrier);
}
#endif
-93
查看文件
@@ -1,93 +0,0 @@
/*
Copyright (c) 2020-2021 Advanced Micro Devices, Inc. All rights reserved.
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
*/
#ifndef CLIQUE_COMMON_H
#define CLIQUE_COMMON_H
#include "nccl.h"
#include <cstdint>
#define MIN_CLIQUE_SIZE 2
#define MAX_CLIQUE_SIZE 8
typedef struct
{
int* globalCount; // Shared across GPUs
int* globalSense; // Shared across GPUs
int* localSense; // Local to this GPU
} gpuBarrier_t;
typedef struct
{
// Input/output pointers from participating ranks
void const* inputs[MAX_CLIQUE_SIZE];
void* outputs[MAX_CLIQUE_SIZE];
// Barrier variable
gpuBarrier_t barrier;
} cliqueDevicePtrs_t;
// Helper macro to launch an appropriate kernel by converting rank to a template argument
#define LAUNCH_CLIQUE_KERNEL(kernelname, FUNC, T, args) \
{ \
switch (args->comm->nRanks){ \
case 2: kernelname<FUNC, T, 2>(args); break; \
case 3: kernelname<FUNC, T, 3>(args); break; \
case 4: kernelname<FUNC, T, 4>(args); break; \
case 5: kernelname<FUNC, T, 5>(args); break; \
case 6: kernelname<FUNC, T, 6>(args); break; \
case 7: kernelname<FUNC, T, 7>(args); break; \
case 8: kernelname<FUNC, T, 8>(args); break; \
} \
}
// Multi-GPU (on same node) barrier. One thread per grid per GPU updates barrier / waits
template <int NUM_RANKS>
__forceinline__ __device__ void WaitForBarrier(gpuBarrier_t const& barrier)
{
if (threadIdx.x == 0)
{
// Sense inversion barrier
*barrier.localSense = 1 - *barrier.localSense;
int localSense = *barrier.localSense;
int val = __atomic_add_fetch(barrier.globalCount, 1, __ATOMIC_SEQ_CST);
if (val == NUM_RANKS)
{
// Last arrival resets barrier
__atomic_store_n(barrier.globalCount, 0, __ATOMIC_SEQ_CST);
__atomic_store_n(barrier.globalSense, localSense, __ATOMIC_SEQ_CST);
}
else
{
// Wait for all ranks to reach barrier
while (__atomic_load_n(barrier.globalSense, __ATOMIC_SEQ_CST) != localSense);
}
}
}
__forceinline__ __host__ __device__ size_t RoundUp(size_t X, size_t Y)
{
return (X+Y-1)/Y * Y;
}
#endif
-577
查看文件
@@ -1,577 +0,0 @@
/*
Copyright (c) 2020-2021 Advanced Micro Devices, Inc. All rights reserved.
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
*/
#include "CliqueManager.h"
#include "CliqueShmNames.h"
#include "MsgQueue.h"
#include "nccl.h"
#include "core.h"
#include "Hash.h"
#include "AllReduceCliqueKernel.h"
#include <hip/hip_runtime.h>
#include <hsa/hsa_ext_amd.h>
#include <stdio.h>
#include <stdlib.h>
#include <chrono>
#include <iomanip>
#include <iostream>
#include <sstream>
#include <thread>
#include <unistd.h>
cliqueDevicePtrs_t CliqueManager::m_staticCliquePtrs[NCCL_MAX_OPS] = {};
int CliqueManager::m_staticBarrierCount[NCCL_MAX_OPS*2] = {};
int* CliqueManager::m_staticGpuBarrierMem = NULL;
// Define some environment variables that affect clique-based kernels
RCCL_PARAM(EnableClique, "ENABLE_CLIQUE", 0); // Opt-in environment variable for clique-based kernels
RCCL_PARAM(AllReduceCliqueByteLimit, "CLIQUE_ALLREDUCE_BYTE_LIMIT", 0); // Max number of bytes to use clique-based kernels for all reduce (0 for auto-select)
RCCL_PARAM(AllReduceNumChannels, "CLIQUE_ALLREDUCE_NCHANNELS", 0); // Number of channels to use for all-reduce. (0 for auto-select)
CliqueManager::CliqueManager(int const rank,
int const numRanks,
cliqueMode_t const cliqueMode) :
m_rank(rank),
m_numRanks(numRanks),
m_hash(0),
m_cliqueMode(cliqueMode),
m_opIndexHead(0),
m_opIndexTail(0),
m_init(false),
m_gcnArchName(char[256]),
m_allReduceByteLimit(0),
m_pinnedCliquePtrs(NULL),
m_gpuBarrierGlobalCount(NULL),
m_gpuBarrierGlobalSense(NULL),
m_gpuBarrierLocalSense(NULL),
m_cpuBarrierCount(NULL),
m_shmHandles(),
m_ipcHandleSendCache(),
m_ipcHandleRecvCache(),
m_sharedCpuMemory(),
m_sharedIpcHandle(),
m_fineGrainBarrierMem(NULL),
m_sharedBarrierCount(NULL)
{}
CliqueManager::~CliqueManager()
{
if (m_init)
{
CleanUp();
}
}
void CliqueManager::CleanUp()
{
if (m_cliqueMode == CLIQUE_DISABLED) return;
// Free variables that are shared between SINGLE_PROCESS / SINGLE_NODE
if (m_pinnedCliquePtrs) hipHostFree(m_pinnedCliquePtrs);
if (m_gpuBarrierLocalSense) hipFree(m_gpuBarrierLocalSense);
if (m_cliqueMode == CLIQUE_SINGLE_NODE)
{
// Release caches
INFO(NCCL_COLL, "Rank %d deleting IPC caches", m_rank);
if (m_ipcHandleSendCache) delete m_ipcHandleSendCache;
if (m_ipcHandleRecvCache) delete m_ipcHandleRecvCache;
// Close shared memory
m_shmHandles.Close();
m_sharedCpuMemory.Close();
m_sharedIpcHandle.Close();
if (m_fineGrainBarrierMem)
{
if (m_rank == 0)
hipFree(m_fineGrainBarrierMem);
else
hipIpcCloseMemHandle(m_fineGrainBarrierMem);
}
}
else if (m_cliqueMode == CLIQUE_SINGLE_PROCESS)
{
if (m_rank == 0 && m_staticGpuBarrierMem)
hipFree(m_staticGpuBarrierMem);
}
m_init = false;
}
ncclResult_t CliqueManager::Init(ncclUniqueId const* commId, int suffix)
{
ncclResult_t res;
if (m_init) return ncclSuccess;
m_init = true;
m_hash = djb2Hash(commId->internal);
if (m_cliqueMode == CLIQUE_DISABLED)
{
INFO(NCCL_INIT, "Clique kernels disabled");
return ncclSuccess;
}
// Check parameters
if (m_rank < 0 || m_rank >= m_numRanks)
{
WARN("Invalid rank specified. Expected 0 <= %d < %d for CliqueManager", m_rank, m_numRanks);
return ncclInvalidUsage;
}
// For now, opt-into clique based kernels via RCCL_ENABLE_CLIQUE env var
if (!rcclParamEnableClique())
{
INFO(NCCL_INIT, "Disabling clique-based kernels (did not find env var RCCL_ENABLE_CLIQUE)");
m_cliqueMode = CLIQUE_DISABLED;
return ncclSuccess;
}
// Allocate pinned CPU memory for holding clique pointers, which kernels will have access to
if (hipHostMalloc(&m_pinnedCliquePtrs, sizeof(cliqueDevicePtrs_t) * NCCL_MAX_OPS) != hipSuccess)
{
WARN("Unable to allocated pinned host memory for clique pointers. Disabling clique-based kernels");
m_cliqueMode = CLIQUE_DISABLED;
m_init = true;
return ncclSuccess;
}
std::string shmSuffix = std::to_string(m_hash) + "_" + std::to_string(suffix);
// Allocate sense barrier variable on local GPU
NCCLCHECKGOTO(ncclCudaCalloc(&m_gpuBarrierLocalSense, NCCL_MAX_OPS * sizeof(int)), res, dropback);
if (m_cliqueMode == CLIQUE_SINGLE_NODE)
{
// Initialize shared memory file for IPC handles (based on commId hash)
m_shmHandles = NcclIpcHandleShm(m_rank, m_numRanks, m_hash, NUM_HANDLES_PER_RANK, NCCL_MAX_OPS, shmSuffix);
NCCLCHECKGOTO(m_shmHandles.Open(), res, dropback);
// Initialize IPC caches
m_ipcHandleSendCache = new NcclIpcHandleSendCache(m_numRanks * NUM_HANDLES_PER_RANK * NCCL_MAX_OPS);
m_ipcHandleRecvCache = new NcclIpcHandleRecvCache(m_numRanks * NUM_HANDLES_PER_RANK * NCCL_MAX_OPS,
100,
hipIpcMemHandleHash,
hipIpcMemHandleEqual);
// Initialize shared object for GPU barrier IPC handle
m_sharedIpcHandle = ShmObject<hipIpcMemHandle_t>(std::max(4096LU, sizeof(hipIpcMemHandle_t)),
CliqueShmNames["Barriers"] + shmSuffix,
m_rank,
m_numRanks,
m_hash);
NCCLCHECKGOTO(m_sharedIpcHandle.Open(), res, dropback);
if (m_rank == 0)
{
hipIpcMemHandle_t handle;
// Allocate fine-grained device memory on rank 0 and get IPC handle for it
// Re-usable barrier consists of (globalCount / globalSense) pair of integers
NCCLCHECKGOTO(ncclCudaCalloc(&m_fineGrainBarrierMem, NCCL_MAX_OPS * 2 * sizeof(int), nullptr, hipDeviceMallocFinegrained), res, dropback);
if (hipIpcGetMemHandle(&handle, m_fineGrainBarrierMem) != hipSuccess)
{
WARN("Unable to get IPC handle for barrier memory");
goto dropback;
}
// Write IPC handle to shared memory for other ranks to receive
*m_sharedIpcHandle.Get() = handle;
// Set up global count/sense for first rank
m_gpuBarrierGlobalCount = &m_fineGrainBarrierMem[0];
m_gpuBarrierGlobalSense = &m_fineGrainBarrierMem[NCCL_MAX_OPS];
}
// Initialize shared CPU memory to be used for barrier variables
m_sharedCpuMemory = ShmObject<int32_t>(NCCL_MAX_OPS * 2 * sizeof(int32_t),
CliqueShmNames["SharedCounters"] + shmSuffix,
m_rank,
m_numRanks,
m_hash);
NCCLCHECKGOTO(m_sharedCpuMemory.Open(), res, dropback);
// Split up the shared CPU memory for barrier counters / global sense
m_cpuBarrierCount = m_sharedCpuMemory.Get();
// Initialize CPU barriers
if (m_rank == 0)
{
memset(m_cpuBarrierCount, 0, NCCL_MAX_OPS * 2 * sizeof(int32_t));
}
}
else if (m_cliqueMode == CLIQUE_SINGLE_PROCESS)
{
m_cpuBarrierCount = &m_staticBarrierCount[0];
// First rank prepares fine-grained memory shared across ranks used for the two barrier variables
if (m_rank == 0)
{
NCCLCHECKGOTO(ncclCudaCalloc(&m_staticGpuBarrierMem, NCCL_MAX_OPS * 2 * sizeof(int), nullptr, hipDeviceMallocFinegrained), res, dropback);
// Prepare all barriers
for (int opIndex = 0; opIndex < NCCL_MAX_OPS; opIndex++)
{
m_staticCliquePtrs[opIndex].barrier.globalCount = &m_staticGpuBarrierMem[opIndex];
m_staticCliquePtrs[opIndex].barrier.globalSense = &m_staticGpuBarrierMem[opIndex + NCCL_MAX_OPS];;
}
}
}
// Figure out device arch for tuning
int deviceId;
CUDACHECK(hipGetDevice(&deviceId));
hipDeviceProp_t devProp;
CUDACHECK(hipGetDeviceProperties(&devProp, deviceId));
m_gcnArchName = devProp.gcnArchName;
// Establish when to use clique-based kernels based on input size
SetByteLimits();
m_init = true;
INFO(NCCL_INIT, "Clique-based kernels enabled (mode %d) [GCN %d]", m_cliqueMode, m_gcnArchName);
return ncclSuccess;
dropback:
// NOTE: This currently assumes that all ranks will fail the same way
// Additional support is required to handle cases when some processes succeed while others fail
WARN("Unable to initialize shared memory. Disabling clique-based kernels");
CleanUp();
m_cliqueMode = CLIQUE_DISABLED;
return ncclSuccess;
}
void CliqueManager::SetByteLimits()
{
m_allReduceByteLimit = rcclParamAllReduceCliqueByteLimit();
if (m_allReduceByteLimit == 0)
{
if (IsArchMatch(m_gcnArchName, "gfx906"))
m_allReduceByteLimit = 16777216;
else if (IsArchMatch(m_gcnArchName, "gfx908"))
m_allReduceByteLimit = 8388608;
else
m_allReduceByteLimit = 16777216;
}
}
bool CliqueManager::IsSupported(ncclFunc_t const coll,
size_t const count,
ncclDataType_t const datatype,
ncclRedOp_t const op) const
{
if (m_cliqueMode == CLIQUE_DISABLED) return false;
// Filter based on total input size for each collective type and ops sum/prod/min/max
size_t totalBytes = count * ncclTypeSize(datatype);
if (coll == ncclFuncAllReduce && (totalBytes <= m_allReduceByteLimit) && op < ncclAvg) return true;
return false;
}
ncclResult_t CliqueManager::DeclarePointers(void const* inputPtr, void* outputPtr)
{
// Do nothing if disabled
if (m_cliqueMode == CLIQUE_DISABLED) return ncclSuccess;
if (!m_init)
{
WARN("CliqueManager must be initialized before use");
return ncclInvalidUsage;
}
// Add to queue of in-progress collectives
int32_t const opIndex = m_opIndexTail;
m_opIndexTail = (m_opIndexTail + 1) % NCCL_MAX_OPS;
INFO(NCCL_COLL, "Rank %d declaring pointers for opIndex %d", m_rank, opIndex);
if (m_cliqueMode == CLIQUE_SINGLE_NODE)
{
// Get fine-grained device memory if not already done
if (m_fineGrainBarrierMem == NULL)
{
hipIpcMemHandle_t handle = *m_sharedIpcHandle.Get();
CUDACHECK(hipIpcOpenMemHandle((void**)&m_fineGrainBarrierMem, handle, hipIpcMemLazyEnablePeerAccess));
// Prepare global count/sense barrier variables used the ipc-shared gpu device memory
m_gpuBarrierGlobalCount = &m_fineGrainBarrierMem[0];
m_gpuBarrierGlobalSense = &m_fineGrainBarrierMem[NCCL_MAX_OPS];
}
std::vector<std::pair<hipIpcMemHandle_t,size_t>> handles(NUM_HANDLES_PER_RANK);
// Get IPC handles for input/output pointers from cache
NCCLCHECK(CheckCacheForPtr(const_cast<void*>(inputPtr), m_ipcHandleSendCache, m_rank, &handles[0]));
NCCLCHECK(CheckCacheForPtr(outputPtr , m_ipcHandleSendCache, m_rank, &handles[1]));
// Prepare barrier pointers (done after the IpcOpenMemory)
m_pinnedCliquePtrs[opIndex].barrier.globalCount = &m_gpuBarrierGlobalCount[opIndex];
m_pinnedCliquePtrs[opIndex].barrier.globalSense = &m_gpuBarrierGlobalSense[opIndex];
m_pinnedCliquePtrs[opIndex].barrier.localSense = &m_gpuBarrierLocalSense[opIndex];
// Write IPC handles to shared memory for given rank / opCount
NCCLCHECK(m_shmHandles.WriteHandles(opIndex, handles));
}
else if (m_cliqueMode == CLIQUE_SINGLE_PROCESS)
{
// Store this rank's input/output pointers into static member
m_staticCliquePtrs[opIndex].inputs[m_rank] = inputPtr;
m_staticCliquePtrs[opIndex].outputs[m_rank] = outputPtr;
}
// Increment entry barrier counter - must not block
volatile int* entryCounter = &m_cpuBarrierCount[2 * opIndex];
int entryVal = LOAD(entryCounter);
// Loop until successful atomic update to counter
bool done = false;
while (done == false) {
// Last rank resets exit barrier counter prior to incrementing entry count to numRanks
if (entryVal+1 == m_numRanks)
m_cpuBarrierCount[2 * opIndex + 1] = 0;
done = __sync_bool_compare_and_swap(entryCounter, entryVal, entryVal+1);
entryVal++;
}
return ncclSuccess;
}
ncclResult_t CliqueManager::GetNumChannelsToUse(ncclFunc_t const coll,
size_t const count,
ncclDataType_t const datatype,
ncclRedOp_t const op,
int const totalNumChannels,
uint8_t* numChannelstoUse) const
{
size_t const totalBytes = count * ncclTypeSize(datatype);
*numChannelstoUse = 1;
if (coll == ncclFuncAllReduce) {
if (rcclParamAllReduceNumChannels() == 0)
{
// NOTE: These are currently based on collected data and not necessarily ideal for all hardware
int numChannels;
if (IsArchMatch(m_gcnArchName, "gfx906")) {
if (totalBytes <= 16384) numChannels = 1;
else numChannels = 2;
} else if (IsArchMatch(m_gcnArchName, "gfx908")) {
if (totalBytes <= 131072) numChannels = 2;
else if (totalBytes <= 524288) numChannels = 6;
else if (totalBytes <= 1048576) numChannels = 13;
else numChannels = 16;
} else if (IsArchMatch(m_gcnArchName, "gfx90a")) {
if (totalBytes <= 262144) numChannels = 4;
else numChannels = 8;
} else {
if (totalBytes <= 65536) numChannels = 1;
else if (totalBytes <= 262144) numChannels = 2;
else if (totalBytes <= 524288) numChannels = 4;
else if (totalBytes <= 2097152) numChannels = 8;
else numChannels = 11;
}
*numChannelstoUse = std::min(numChannels, totalNumChannels);
}
else
{
*numChannelstoUse = std::min((int)rcclParamAllReduceNumChannels(), totalNumChannels);
}
}
return ncclSuccess;
}
ncclResult_t CliqueManager::WaitForPointers(ncclWorkElem* args)
{
// Do nothing if disabled
if (m_cliqueMode == CLIQUE_DISABLED) return ncclSuccess;
if (!m_init)
{
WARN("CliqueManager must be initialized before use");
return ncclInvalidUsage;
}
// Check that collective queue is not empty
if (m_opIndexHead == m_opIndexTail)
{
WARN("WaitForPointers must be called after DeclarePointers");
return ncclInvalidUsage;
}
// Pop first collective off queue
int32_t const opIndex = m_opIndexHead;
INFO(NCCL_COLL, "Rank %d waiting for pointers for opIndex %d", m_rank, opIndex);
m_opIndexHead = (m_opIndexHead + 1) % NCCL_MAX_OPS;
args->clique.ptrs = &m_pinnedCliquePtrs[opIndex];
// Wait for all ranks to declare pointers for this opIndex
volatile int* entryCounter = (volatile int*)(&m_cpuBarrierCount[2 * opIndex]);
int entryVal = LOAD(entryCounter);
while (entryVal != m_numRanks) entryVal = LOAD(entryCounter);
// Last rank to past barrier resets entry barrier
// NOTE: There is another GPU-barrier performed during the kernels therefore it should
// not be possible for any rank to modify entry count prior to being reset
volatile int* exitCounter = &m_cpuBarrierCount[2 * opIndex + 1];
int exitVal = LOAD(exitCounter);
// Loop until successful atomic update to counter
bool done = false;
while (done == false) {
// Last rank resets entry counter
if (exitVal+1 == m_numRanks)
m_cpuBarrierCount[2 * opIndex] = 0;
done = __sync_bool_compare_and_swap(exitCounter, exitVal, exitVal+1);
exitVal++;
}
INFO(NCCL_COLL, "Rank %d past opIndex barrier %d", m_rank, opIndex);
// Collect pointers
if (m_cliqueMode == CLIQUE_SINGLE_NODE)
{
int numHandles = m_numRanks * NUM_HANDLES_PER_RANK;
std::vector<std::pair<hipIpcMemHandle_t,size_t>> handles(numHandles);
// Collect the ready handles from shared memory and convert them to device pointers
NCCLCHECK(m_shmHandles.ReadHandles(opIndex, handles));
for (int i = 0; i < m_numRanks; i++)
{
void *input;
NCCLCHECK(CheckCacheForHandle(handles[i * NUM_HANDLES_PER_RANK],
m_ipcHandleRecvCache, &input));
m_pinnedCliquePtrs[opIndex].inputs[i] = const_cast<const void *>(input);
NCCLCHECK(CheckCacheForHandle(handles[(i * NUM_HANDLES_PER_RANK) + 1],
m_ipcHandleRecvCache, &m_pinnedCliquePtrs[opIndex].outputs[i]));
}
}
else if (m_cliqueMode == CLIQUE_SINGLE_PROCESS)
{
// Copy from static memory to pinned host memory and set local sense
memcpy(&m_pinnedCliquePtrs[opIndex], &m_staticCliquePtrs[opIndex], sizeof(cliqueDevicePtrs_t));
m_pinnedCliquePtrs[opIndex].barrier.localSense = &m_gpuBarrierLocalSense[opIndex];
}
return ncclSuccess;
}
ncclResult_t CliqueManager::CheckCacheForPtr(void* devPtr,
NcclIpcHandleSendCache* cache,
int rank,
std::pair<hipIpcMemHandle_t, size_t>* handlePair)
{
// Get the base address for this device allocation
hsa_status_t status;
hsa_amd_pointer_info_t info;
info.size = sizeof(hsa_amd_pointer_info_t);
status = hsa_amd_pointer_info(devPtr, &info, NULL, NULL, NULL);
if (status != HSA_STATUS_SUCCESS) {
WARN("Uanble to get pointer information for %p", devPtr);
return ncclInvalidArgument;
}
// Compute the offset between the device addres and the base address
uint64_t baseAddr = (uint64_t)info.agentBaseAddress;
uint64_t realAddr = (uint64_t)devPtr;
handlePair->second = realAddr - baseAddr;
CUDACHECK(hipIpcGetMemHandle(&handlePair->first, (void*)baseAddr));
/* Disabling cache until proper deallocation methods are available
// IPC handles are only supported for base address pointers
NcclIpcHandleSendCache::iterator it = cache->find(baseAddr);
if (it == cache->end())
{
INFO(NCCL_COLL, "Rank %d searching IPC handle cache for %p (not found)", rank, devPtr);
CUDACHECK(hipIpcGetMemHandle(&handlePair->first, (void*)baseAddr));
cache->insert(baseAddr, handlePair->first);
}
else
{
INFO(NCCL_COLL, "Rank %d searching IPC handle cache for %p (found!)", rank, devPtr);
handlePair->first = (it->second).first;
}
*/
return ncclSuccess;
}
ncclResult_t CliqueManager::CheckCacheForHandle(std::pair<hipIpcMemHandle_t, size_t> const& handlePair,
NcclIpcHandleRecvCache* cache,
void** ptr)
{
// Until proper deallocation hooks are implemented, receive cache can not be used
// Handles will need to be extract each time
void* baseAddr;
CUDACHECK(hipIpcOpenMemHandle(&baseAddr, handlePair.first, hipIpcMemLazyEnablePeerAccess));
/*
NcclIpcHandleRecvCache::iterator it = cache->find(handlePair.first);
// Get base address pointer from cache if it exists
if (it == cache->end())
{
CUDACHECK(hipIpcOpenMemHandle(&baseAddr, handlePair.first, hipIpcMemLazyEnablePeerAccess));
cache->insert(handlePair.first, baseAddr);
}
else
{
baseAddr = (it->second).first;
}
*/
// Modify base address pointer with offset
uint64_t realAddr = (uint64_t)baseAddr + handlePair.second;
*ptr = (void*)realAddr;
return ncclSuccess;
}
ncclResult_t CliqueManager::BootstrapRootInit(int pid, unsigned long hash)
{
if (rcclParamEnableClique())
{
for (auto it = CliqueShmNames.begin(); it != CliqueShmNames.end(); it++)
{
mqd_t mq_desc;
std::string msgQueueName = it->second + std::to_string(hash) + "_" + std::to_string(pid);
NCCLCHECK(MsgQueueGetId(msgQueueName, true, mq_desc));
NCCLCHECK(MsgQueueClose(msgQueueName, mq_desc, true));
}
std::string shmDir = "/dev/shm/";
for (auto it = CliqueShmNames.begin(); it != CliqueShmNames.end(); it++)
{
struct stat fileStatus;
std::string shmFileName = it->second + std::to_string(hash) + "_" + std::to_string(pid);
std::string shmFullPath = shmDir + shmFileName;
// Check if shm file already exists; if so, unlink it
if (stat(shmFullPath.c_str(), &fileStatus) == 0)
{
NCCLCHECK(shmUnlink(shmFileName.c_str()));
}
}
}
else
{
INFO(NCCL_INIT, "Not performing bootstrap root for clique kernels as clique mode not enabled.");
}
return ncclSuccess;
}
-127
查看文件
@@ -1,127 +0,0 @@
/*
Copyright (c) 2020-2021 Advanced Micro Devices, Inc. All rights reserved.
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
*/
#ifndef RCCL_CLIQUE_MANAGER_HPP_
#define RCCL_CLIQUE_MANAGER_HPP_
#include <semaphore.h>
#include <mutex>
#include "nccl.h"
#include "devcomm.h"
#include "CliqueCommon.h"
#include "HandleCache.h"
#include "HandleShm.h"
#define NUM_HANDLES_PER_RANK 2
class CliqueManager
{
public:
typedef enum
{
CLIQUE_DISABLED = 0,
CLIQUE_SINGLE_PROCESS = 1,
CLIQUE_SINGLE_NODE = 2
} cliqueMode_t;
CliqueManager(int const rank, int const numRanks, cliqueMode_t const cliqueMode);
~CliqueManager();
void CleanUp();
ncclResult_t Init(ncclUniqueId const* commId, int suffix);
void SetByteLimits();
// Returns true if the collective is supported via a clique-based kernel
bool IsSupported(ncclFunc_t const coll,
size_t const count,
ncclDataType_t const datatype,
ncclRedOp_t const op) const;
// Provide the pointers to be exchanged across the clique for the given rank / opCount
ncclResult_t DeclarePointers(void const* inputPtr, void* outputPtr);
// Determine the number of channels / CUs to use for this call
ncclResult_t GetNumChannelsToUse(ncclFunc_t const coll,
size_t const count,
ncclDataType_t const datatype,
ncclRedOp_t const op,
int const totalNumChannels,
uint8_t* numChannelstoUse) const;
// Blocking call that only returns the in-progress clique pointers are ready
// This needs to be called in same order as DeclarePointers
ncclResult_t WaitForPointers(ncclWorkElem* args);
// Prepares shared memory files upon initialization
static ncclResult_t BootstrapRootInit(int pid, unsigned long hash);
protected:
ncclResult_t CheckCacheForPtr(void* devPtr,
NcclIpcHandleSendCache* cache,
int rank,
std::pair<hipIpcMemHandle_t, size_t>* handlePair);
ncclResult_t CheckCacheForHandle(std::pair<hipIpcMemHandle_t, size_t> const& handlePair,
NcclIpcHandleRecvCache* cache,
void** ptr);
int m_rank; // Associated rank
int m_numRanks; // Total number of ranks
unsigned long m_hash; // Hash used for identifying message queues & shared memory
cliqueMode_t m_cliqueMode; // Clique mode (off/single process/single node)
int32_t m_opIndexHead; // Track start of outstanding requests
int32_t m_opIndexTail; // Track end of outstanding requests
bool m_init; // Whether CliqueManager has been initialized
char[256] m_gcnArchName; // Device GCN arch value
size_t m_allReduceByteLimit; // Byte limit for AllReduce
cliqueDevicePtrs_t* m_pinnedCliquePtrs; // Pinned-host-memory (device accessible) containing device pointers
int* m_gpuBarrierGlobalCount; // Part of GPU barrier (count variable shared across ranks)
int* m_gpuBarrierGlobalSense; // Part of GPU barrier (reset variable shared across ranks)
int* m_gpuBarrierLocalSense; // Part of GPU barrier (reset variable local to this rank)
int* m_cpuBarrierCount; // Points to either m_sharedBarrierCount or m_staticBarrierCount
// IPC-related (CLIQUE_SINGLE_NODE)
NcclIpcHandleShm m_shmHandles; // Used to exchange IPC handles between ranks
NcclIpcHandleSendCache* m_ipcHandleSendCache; // Caches pointers to IPC handles (to send to other processes)
NcclIpcHandleRecvCache* m_ipcHandleRecvCache; // Caches IPC handles to pointers (received from other processes)
ShmObject<int32_t> m_sharedCpuMemory; // Used to pass shared memory used for CPU barrier
ShmObject<hipIpcMemHandle_t> m_sharedIpcHandle; // Used to pass fine-grained device memory buffer IPC handle
int* m_fineGrainBarrierMem; // Fine-grained GPU memory barrier (allocated only on 1st rank, shared on others)
int* m_sharedBarrierCount; // Part of CPU barrier (count variable shared across ranks)
// Single-process (CLIQUE_SINGLE_PROCESS)
static cliqueDevicePtrs_t m_staticCliquePtrs[NCCL_MAX_OPS]; // Use shared static memory to exchange pointer info
static int m_staticBarrierCount[2*NCCL_MAX_OPS]; // Part of CPU barrier (count variable shared across ranks)
static int* m_staticGpuBarrierMem; // Static storage backing for fine-grained gpu barrier
};
// For use in bootstrapping code
struct bootstrapRootStruct {
int listenFd;
unsigned long hash;
};
#endif
-37
查看文件
@@ -1,37 +0,0 @@
/*
Copyright (c) 2020 Advanced Micro Devices, Inc. All rights reserved.
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
*/
#ifndef NCCL_CLIQUE_SHM_NAMES_H_
#define NCCL_CLIQUE_SHM_NAMES_H_
#include <string>
#include <map>
static std::map<std::string, std::string> CliqueShmNames =
{
{"SharedCounters", "RcclCounters" },
{"Mutexes" , "RcclMutexes" },
{"IpcHandles" , "RcclIpcHandles"},
{"Barriers" , "RcclBarriers" }
};
#endif
-31
查看文件
@@ -1,31 +0,0 @@
/*
Copyright (c) 2020 Advanced Micro Devices, Inc. All rights reserved.
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
*/
#include "HandleCache.h"
#include "Hash.h"
// djb2 hash function for hashing char array in hipIpcMemHandle_t
unsigned long hipIpcMemHandleHash(const hipIpcMemHandle_t& handle)
{
return djb2Hash(handle.reserved);
}
-142
查看文件
@@ -1,142 +0,0 @@
/*
Copyright (c) 2020 Advanced Micro Devices, Inc. All rights reserved.
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
*/
#ifndef NCCL_HANDLE_CACHE_H_
#define NCCL_HANDLE_CACHE_H_
#include <list>
#include <unordered_map>
#include <functional>
#include "core.h"
//#include "llvm/ADT/DenseMap.h"
template <
class Key,
class Value,
class Hash,
class KeyEqual,
class Allocator
>
class NcclIpcHandleCache
{
public:
typedef std::pair<Value, typename std::list<Key>::iterator> NcclIpcHandleCacheValueType;
typedef std::unordered_map<Key, NcclIpcHandleCacheValueType, Hash, KeyEqual, Allocator> LRUCache;
using iterator = typename LRUCache::iterator;
NcclIpcHandleCache(size_t size,
size_t bucket_count = 100,
const Hash& hash = Hash(),
const KeyEqual& eql = KeyEqual(),
const Allocator& alloc = Allocator() ) : m_cache(bucket_count, hash, eql, alloc)
{
m_capacity = size;
}
~NcclIpcHandleCache()
{
m_lruHistory.clear();
m_cache.clear();
}
iterator begin()
{
return m_cache.begin();
}
iterator end()
{
return m_cache.end();
}
iterator find(const Key& key)
{
iterator it = m_cache.find(key);
if (it != m_cache.end())
{
updateHistory(it);
}
return it;
}
std::pair<iterator, bool> insert(const Key& key, const Value& value)
{
if (m_cache.size() == m_capacity)
{
// remove entry
pop();
}
typename LRUCache::iterator it = m_cache.find(key);
bool inserted;
if (it == m_cache.end())
{
typename std::list<Key>::iterator it = m_lruHistory.insert(m_lruHistory.end(), key);
m_cache.insert(std::make_pair(key, std::make_pair(value, it)));
inserted = true;
}
else
{
inserted = false;
}
return std::pair<iterator, bool>(it, inserted);
}
private:
void pop()
{
typename LRUCache::iterator it = m_cache.find(m_lruHistory.front());
m_cache.erase(it);
m_lruHistory.pop_front();
}
void updateHistory(const iterator& it)
{
if (!m_lruHistory.empty())
{
m_lruHistory.splice(m_lruHistory.end(), m_lruHistory, (it->second).second);
}
}
size_t m_capacity;
std::list<Key> m_lruHistory;
LRUCache m_cache;
};
// djb2 hash function for hashing char array in hipIpcMemHandle_t
unsigned long hipIpcMemHandleHash(const hipIpcMemHandle_t& handle);
// equality function required for unordered_map
auto hipIpcMemHandleEqual = [](const hipIpcMemHandle_t& l, const hipIpcMemHandle_t& r)
{
return memcmp(l.reserved, r.reserved, sizeof(l.reserved)) == 0;
};
//typedef llvm::DenseMap<uint64_t, hipIpcMemHandle_t> SendCache;
//typedef llvm::DenseMap<hipIpcMemHandle_t, void*, decltype(&HandleHash), decltype(HandleEqual)> RecvCache;
typedef NcclIpcHandleCache<uint64_t, hipIpcMemHandle_t, std::hash<uint64_t>, std::equal_to<uint64_t>, std::allocator< std::pair<const uint64_t, std::pair<hipIpcMemHandle_t, std::list<uint64_t>::iterator>>>> NcclIpcHandleSendCache;
typedef NcclIpcHandleCache<hipIpcMemHandle_t, void*, decltype(&hipIpcMemHandleHash), decltype(hipIpcMemHandleEqual), std::allocator< std::pair<const hipIpcMemHandle_t, std::pair<void*, std::list<hipIpcMemHandle_t>::iterator>>>> NcclIpcHandleRecvCache;
#endif
-69
查看文件
@@ -1,69 +0,0 @@
/*
Copyright (c) 2020 Advanced Micro Devices, Inc. All rights reserved.
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
*/
#include <hip/hip_runtime.h>
#include "HandleShm.h"
#include "CliqueShmNames.h"
#include "core.h"
#include "Hash.h"
#include "shm.h"
NcclIpcHandleShm::NcclIpcHandleShm(int rank, int numRanks, int projid, int numHandlesPerRank, int capacity, std::string const& suffix) :
ShmObject<std::pair<hipIpcMemHandle_t,size_t>>(numRanks * numHandlesPerRank * capacity * sizeof(std::pair<hipIpcMemHandle_t,size_t>),
CliqueShmNames["IpcHandles"] + suffix,
rank,
numRanks,
projid),
m_numHandlesPerRank(numHandlesPerRank),
m_numHandlesPerOpCount(numRanks * numHandlesPerRank)
{
}
NcclIpcHandleShm::NcclIpcHandleShm() :
m_numHandlesPerRank(0),
m_numHandlesPerOpCount(0)
{
}
NcclIpcHandleShm::~NcclIpcHandleShm()
{
}
ncclResult_t NcclIpcHandleShm::Open()
{
return ShmObject::Open();
}
ncclResult_t NcclIpcHandleShm::WriteHandles(uint64_t opCount, std::vector<std::pair<hipIpcMemHandle_t,size_t>> const& sendHandles)
{
size_t idx = (opCount * m_numHandlesPerOpCount) + (m_rank * m_numHandlesPerRank);
memcpy(m_shmPtr + idx, sendHandles.data(), sizeof(std::pair<hipIpcMemHandle_t,size_t>) * m_numHandlesPerRank);
return ncclSuccess;
}
ncclResult_t NcclIpcHandleShm::ReadHandles(uint64_t opCount, std::vector<std::pair<hipIpcMemHandle_t,size_t>>& recvHandles)
{
size_t idx = opCount * m_numHandlesPerOpCount;
memcpy(recvHandles.data(), m_shmPtr + idx, m_numHandlesPerOpCount * sizeof(std::pair<hipIpcMemHandle_t,ssize_t>));
return ncclSuccess;
}
-53
查看文件
@@ -1,53 +0,0 @@
/*
Copyright (c) 2020 Advanced Micro Devices, Inc. All rights reserved.
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
*/
#ifndef NCCL_IPC_HANDLE_SHM_H_
#define NCCL_IPC_HANDLE_SHM_H_
#include <hip/hip_runtime.h>
#include <vector>
#include <string>
#include "nccl.h"
#include "ShmObject.h"
class NcclIpcHandleShm : public ShmObject<std::pair<hipIpcMemHandle_t,size_t>>
{
public:
NcclIpcHandleShm(int rank, int numRanks, int projid, int numHandlesPerRank, int capacity, std::string const& suffix);
NcclIpcHandleShm();
~NcclIpcHandleShm();
ncclResult_t Open();
ncclResult_t WriteHandles(uint64_t opCount, std::vector<std::pair<hipIpcMemHandle_t,size_t>> const& sendHandles);
ncclResult_t ReadHandles(uint64_t opCount, std::vector<std::pair<hipIpcMemHandle_t,size_t>>& recvHandles);
private:
int m_numHandlesPerRank;
int m_numHandlesPerOpCount;
};
#endif
-34
查看文件
@@ -1,34 +0,0 @@
/*
Copyright (c) 2020 Advanced Micro Devices, Inc. All rights reserved.
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
*/
#include "Hash.h"
unsigned long djb2Hash(const char* data)
{
unsigned long hash = 5381;
int c;
while ((c = *(data)++))
hash = ((hash << 5) + hash) + c; /* hash * 33 + c */
return hash;
}
-28
查看文件
@@ -1,28 +0,0 @@
/*
Copyright (c) 2020 Advanced Micro Devices, Inc. All rights reserved.
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
*/
#ifndef NCCL_HASH_H_
#define NCCL_HASH_H_
unsigned long djb2Hash(const char* data);
#endif
-101
查看文件
@@ -1,101 +0,0 @@
/*
Copyright (c) 2020 Advanced Micro Devices, Inc. All rights reserved.
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
*/
#include "MsgQueue.h"
#include <chrono>
#define MSG_QUEUE_PERM S_IRUSR | S_IWUSR
#define MSG_QUEUE_MODE O_RDWR
#define MSG_SIZE 1
#define MSG_QUEUE_TIMEOUT 60
ncclResult_t MsgQueueGetId(std::string const& name, bool exclusive, mqd_t& mq_desc)
{
int flag = (exclusive == true ? O_CREAT | O_EXCL : O_CREAT);
struct mq_attr attr;
attr.mq_maxmsg = 10;
attr.mq_msgsize = MSG_SIZE;
attr.mq_flags = 0;
std::string mq_name = "/" + name;
mq_desc = mq_open(mq_name.c_str(), flag | MSG_QUEUE_MODE, MSG_QUEUE_PERM, &attr);
// Check if we're trying to create message queue and it already exists; if so, delete existing queue
if (mq_desc == -1 && exclusive == true && errno == EBUSY)
{
NCCLCHECK(MsgQueueClose(name, mq_desc, true));
SYSCHECKVAL(mq_open(mq_name.c_str(), flag | MSG_QUEUE_MODE, MSG_QUEUE_PERM, attr), "mq_open", mq_desc);
}
else if (mq_desc == -1)
{
WARN("Call to MsgQueueGetId failed : %s", strerror(errno));
return ncclSystemError;
}
return ncclSuccess;
}
ncclResult_t MsgQueueSend(mqd_t const& mq_desc, const char* msgp, size_t msgsz)
{
SYSCHECK(mq_send(mq_desc, msgp, msgsz, 0), "mq_send");
return ncclSuccess;
}
ncclResult_t MsgQueueRecv(mqd_t const& mq_desc, char* msgp, size_t msgsz)
{
SYSCHECK(mq_receive(mq_desc, msgp, msgsz, NULL), "mq_receive");
return ncclSuccess;
}
ncclResult_t MsgQueueWaitUntilEmpty(mqd_t const& mq_desc)
{
mq_attr attr;
mq_getattr(mq_desc, &attr);
auto start = std::chrono::steady_clock::now();
while(attr.mq_curmsgs > 0)
{
SYSCHECK(mq_getattr(mq_desc, &attr), "mq_getattr");
if(std::chrono::steady_clock::now() - start > std::chrono::seconds(MSG_QUEUE_TIMEOUT))
{
WARN("Message Queue timed out waiting for all ranks to receive messages.");
return ncclSystemError;
}
}
return ncclSuccess;
}
ncclResult_t MsgQueueClose(std::string const& name, mqd_t& mq_desc, bool unlink)
{
if (unlink)
{
NCCLCHECK(MsgQueueUnlink(name));
}
SYSCHECK(mq_close(mq_desc), "mq_close");
return ncclSuccess;
}
ncclResult_t MsgQueueUnlink(std::string const& name)
{
std::string mq_name = "/" + name;
SYSCHECK(mq_unlink(mq_name.c_str()), "mq_unlink");
return ncclSuccess;
}
-39
查看文件
@@ -1,39 +0,0 @@
/*
Copyright (c) 2020 Advanced Micro Devices, Inc. All rights reserved.
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
*/
#ifndef RCCL_MSG_QUEUE_HPP_
#define RCCL_MSG_QUEUE_HPP_
#include <string>
#include <mqueue.h>
#include "nccl.h"
#include "core.h"
ncclResult_t MsgQueueGetId(std::string const& name, bool exclusive, mqd_t& mq_desc);
ncclResult_t MsgQueueSend(mqd_t const& mq_desc, const char* msgp, size_t msgsz);
ncclResult_t MsgQueueRecv(mqd_t const& mq_desc, char* msgp, size_t msgsz);
ncclResult_t MsgQueueWaitUntilEmpty(mqd_t const& mq_desc);
ncclResult_t MsgQueueClose(std::string const& name, mqd_t& mq_desc, bool unlink);
ncclResult_t MsgQueueUnlink(std::string const& name);
#endif
-43
查看文件
@@ -1,43 +0,0 @@
/*
Copyright (c) 2020 Advanced Micro Devices, Inc. All rights reserved.
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
*/
#ifndef SHAREDMEMHELPER_H
#define SHAREDMEMHELPER_H
class SharedMemHelper
{
public:
SharedMemHelper(int rank, int numRanks, int numEntries);
ncclStatus_t Init(std::string const& baseFilename);
ncclStatus_t
protected:
bool m_initialized;
int m_rank;
int m_numRanks;
};
#endif
-45
查看文件
@@ -1,45 +0,0 @@
/*
Copyright (c) 2020 Advanced Micro Devices, Inc. All rights reserved.
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
*/
#include "ShmObject.h"
#include <string>
// Template specializations for sem_t objects which require additional initialization
template<>
ncclResult_t ShmObject<sem_t>::Close()
{
size_t numMutexes = m_shmSize / sizeof(sem_t);
for (size_t i = 0; i < numMutexes; i++)
{
sem_destroy(static_cast<sem_t*>(&m_shmPtr[i]));
}
int retVal = shm_unlink(m_shmName.c_str());
if (retVal == -1 && errno != ENOENT)
{
WARN("Call to shm_unlink in ShmObject failed : %s", strerror(errno));
return ncclSystemError;
}
return ncclSuccess;
}
-247
查看文件
@@ -1,247 +0,0 @@
/*
Copyright (c) 2020 Advanced Micro Devices, Inc. All rights reserved.
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
*/
#ifndef NCCL_SHM_OBJECT_H_
#define NCCL_SHM_OBJECT_H_
#include <string>
#include <sys/mman.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <type_traits>
#include <semaphore.h>
#include "MsgQueue.h"
#include "nccl.h"
#include "core.h"
#include "shm.h"
// ShmObject abstracts away the nitty-gritty when multiple processes need to handle opening a shared
// memory object at the same time.
static ncclResult_t shmSetupExclusive(const char* shmname, const int shmsize, int* fd, void** ptr, int create) {
*fd = shm_open(shmname, O_CREAT | O_RDWR | O_EXCL, S_IRUSR | S_IWUSR);
if (*fd == -1) return ncclSystemError;
if (create) SYSCHECK(shm_allocate(*fd, shmsize), "posix_fallocate");
SYSCHECK(shm_map(*fd, shmsize, ptr), "mmap");
close(*fd);
*fd = -1;
if (create) memset(*ptr, 0, shmsize);
return ncclSuccess;
}
template <typename T>
class ShmObject
{
public:
ShmObject(size_t size, std::string const& fileName, int rank, int numRanks, int projid) :
m_shmSize(size),
m_shmName(fileName),
m_rank(rank),
m_numRanks(numRanks),
m_projid(projid),
m_alloc(false),
m_shmPtr(nullptr) {}
ShmObject() :
m_shmSize(0),
m_shmName(""),
m_rank(0),
m_numRanks(0),
m_projid(0),
m_alloc(false),
m_shmPtr(nullptr) {}
~ShmObject() {}
ncclResult_t Open();
ncclResult_t Close()
{
if (m_alloc)
{
SYSCHECK(munmap(m_shmPtr, m_shmSize), "munmap");
}
return ncclSuccess;
}
T*& Get()
{
return m_shmPtr;
}
protected:
ncclResult_t BroadcastMessage(mqd_t& mq_desc, bool pass) const
{
char msg_text[1];
msg_text[0] = (pass == 0 ? 'F': 'P');
for (int rank = 0; rank < m_numRanks; rank++)
{
if (rank == m_rank) continue;
NCCLCHECK(MsgQueueSend(mq_desc, &msg_text[0], sizeof(msg_text)));
}
return ncclSuccess;
}
ncclResult_t BroadcastAndCloseMessageQueue(mqd_t& mq_desc, bool pass)
{
ncclResult_t res;
NCCLCHECKGOTO(BroadcastMessage(mq_desc, pass), res, dropback);
NCCLCHECKGOTO(MsgQueueWaitUntilEmpty(mq_desc), res, dropback);
NCCLCHECK(MsgQueueClose(m_shmName, mq_desc, true));
return ncclSuccess;
dropback:
WARN("Root rank unable to broadcast across message queue. Closing message queue.");
NCCLCHECK(MsgQueueClose(m_shmName, mq_desc, true));
return ncclSystemError;
}
// tag for dispatch
template<class U>
struct OpenTag{};
static ncclResult_t InitIfSemaphore(OpenTag<int> tag);
ncclResult_t InitIfSemaphore(OpenTag<uint32_t> tag);
static ncclResult_t InitIfSemaphore(OpenTag<hipIpcMemHandle_t> tag);
ncclResult_t InitIfSemaphore(OpenTag<sem_t> tag);
static ncclResult_t InitIfSemaphore(OpenTag<std::pair<hipIpcMemHandle_t,size_t>> tag);
size_t m_shmSize;
std::string m_shmName;
int m_rank;
int m_numRanks;
int m_projid;
bool m_alloc;
T* m_shmPtr;
};
template <typename T>
ncclResult_t ShmObject<T>::Open()
{
mqd_t mq_desc;
if (m_alloc == false)
{
int shmFd;
INFO(NCCL_INIT, "Rank %d Initializing message queue for %s\n", m_rank, m_shmName.c_str());
NCCLCHECK(MsgQueueGetId(m_shmName, false, mq_desc));
if (m_rank == 0)
{
ncclResult_t resultSetup = shmSetupExclusive(m_shmName.c_str(), m_shmSize, &shmFd, (void**)&m_shmPtr, 1);
ncclResult_t resultSemInit = InitIfSemaphore(OpenTag<T>{});
if ((resultSetup != ncclSuccess && errno != EEXIST) || (resultSemInit != ncclSuccess))
{
NCCLCHECK(BroadcastAndCloseMessageQueue(mq_desc, false));
WARN("Call to ShmObject::Open in root rank failed : %s", strerror(errno));
if (resultSetup == ncclSuccess)
{
Close();
}
return ncclSystemError;
}
ncclResult_t result;
// Broadcast two sets of messages: one set is consumed by the other ranks to acknowledge root rank
// has successfully opened shared memory; second set is consumed by the other ranks to indicate
// that they have successfully opened shared memory and root rank can now unlink shared memory
NCCLCHECK(BroadcastMessage(mq_desc, true));
NCCLCHECK(BroadcastAndCloseMessageQueue(mq_desc, true));
int retVal = shm_unlink(m_shmName.c_str());
if (retVal == -1 && errno != ENOENT)
{
WARN("Call to shm_unlink in ShmObject failed : %s", strerror(errno));
return ncclSystemError;
}
}
else
{
char msg_text[1];
ncclResult_t res;
NCCLCHECKGOTO(MsgQueueRecv(mq_desc, &msg_text[0], sizeof(msg_text)), res, dropback);
if (msg_text[0] == 'P')
{
NCCLCHECK(shmSetup(m_shmName.c_str(), m_shmSize, &shmFd, (void**)&m_shmPtr, 0));
NCCLCHECKGOTO(MsgQueueRecv(mq_desc, &msg_text[0], sizeof(msg_text)), res, dropback);
NCCLCHECK(MsgQueueClose(m_shmName, mq_desc, false));
}
else
{
NCCLCHECK(MsgQueueClose(m_shmName, mq_desc, false));
WARN("Call to shm_open from non-root rank in ShmObject failed : %s", strerror(errno));
return ncclSystemError;
}
}
m_alloc = true;
}
else
{
WARN("Cannot allocate ShmObject twice.\n");
return ncclInvalidUsage;
}
return ncclSuccess;
dropback:
WARN("Rank %d failed ShmObject::Open(). Closing message queue.", m_rank);
NCCLCHECK(MsgQueueClose(m_shmName, mq_desc, false));
SYSCHECK(shm_unlink(m_shmName.c_str()), "shm_unlink");
NCCLCHECK(Close());
return ncclSystemError;
}
template<typename T>
ncclResult_t ShmObject<T>::InitIfSemaphore(OpenTag<int> tag)
{
return ncclSuccess;
}
template<typename T>
ncclResult_t ShmObject<T>::InitIfSemaphore(OpenTag<unsigned int> tag)
{
return ncclSuccess;
}
template<typename T>
ncclResult_t ShmObject<T>::InitIfSemaphore(OpenTag<hipIpcMemHandle_t> tag)
{
return ncclSuccess;
}
template<typename T>
ncclResult_t ShmObject<T>::InitIfSemaphore(OpenTag<std::pair<hipIpcMemHandle_t,size_t>> tag)
{
return ncclSuccess;
}
template<typename T>
ncclResult_t ShmObject<T>::InitIfSemaphore(OpenTag<sem_t> tag)
{
size_t numMutexes = m_shmSize / sizeof(sem_t);
for (size_t i = 0; i < numMutexes; i++)
{
SYSCHECK(sem_init(static_cast<sem_t*>(&m_shmPtr[i]), 1, 1), "sem_init");
}
return ncclSuccess;
}
#endif
-2
查看文件
@@ -1059,7 +1059,6 @@ template<typename T, typename RedOp>
struct RunWorkColl<ncclFuncAllReduce, T, RedOp, NCCL_ALGO_RING, NCCL_PROTO_LL128> {
__device__ __forceinline__ void run(int tid, int nthreads, struct ncclDevWorkColl* work) {
runRing<T, RedOp, ProtoLL128>(tid, nthreads, work);
//LAUNCH_CLIQUE_KERNEL(AllReduceCliqueSplitKernel, RedOp, T, work);
}
};
@@ -1067,6 +1066,5 @@ template<typename T, typename RedOp>
struct RunWorkColl<ncclFuncAllReduce, T, RedOp, NCCL_ALGO_TREE, NCCL_PROTO_LL128> {
__device__ __forceinline__ void run(int tid, int nthreads, struct ncclDevWorkColl* work) {
runTreeSplit<T, RedOp, ProtoLL128>(tid, nthreads, work);
//LAUNCH_CLIQUE_KERNEL(AllReduceCliqueSplitKernel, RedOp, T, worrk);
}
};
+3 -5
查看文件
@@ -43,7 +43,6 @@
#include "git_version.h"
#include "rccl_vars.h"
#include "hip_rocm_version_info.h"
//#include "clique/CliqueManager.h"
//#include <hsa/hsa_ext_amd.h>
#ifdef ENABLE_MSCCLPP
#include "mscclpp/mscclpp_nccl.h"
@@ -485,7 +484,6 @@ static ncclResult_t commFree(ncclComm_t comm) {
return ncclSuccess;
}
RCCL_PARAM(CliqueIgnoreTopo, "CLIQUE_IGNORE_TOPO", 0);
RCCL_PARAM(P2pNetDisable, "P2P_NET_DISABLE", 0);
RCCL_PARAM(PivotAlltoallEnable, "PIVOT_ALLTOALL_ENABLE", 1);
RCCL_PARAM(LL128ForceEnable, "LL128_FORCE_ENABLE", 0);
@@ -1578,7 +1576,7 @@ static ncclResult_t initTransportsRank(struct ncclComm* comm, struct ncclComm* p
} else {
NCCLCHECKGOTO(ncclProxyCreate(comm), ret, fail);
}
timers[TIMER_INIT_CONNECT] = clockNano();
do { // Build p2p schedule
int node = comm->node;
@@ -1979,7 +1977,7 @@ static ncclResult_t ncclCommInitRankFunc(struct ncclAsyncJob* job_) {
if (rcclParamMscclppForceEnabled()) {
comm->mscclppForceEnable = true;
} else {
comm->mscclppForceEnable = false;
comm->mscclppForceEnable = false;
}
} else {
WARN("MSCCL++: Cannot enable MSCCL++ on %s architecture", devProp.gcnArchName);
@@ -2410,7 +2408,7 @@ static ncclResult_t commDestroySync(struct ncclAsyncJob* job_) {
// And keep polling until all graphs referencing us die.
while (comm->persistentRefs != 0) {
NCCLCHECKGOTO(ncclCommPollCallbacks(comm, /*waitSome=*/true), ret, fail);
}
}
}
if ((ret = ncclProxyStop(comm)) != ncclSuccess) {
+3 -27
查看文件
@@ -118,7 +118,7 @@ namespace RcclUnitTesting
std::vector<bool> const managedMemList = {false};
std::vector<bool> const useHipGraphList = {false, true};
std::vector<const char *> const channelList = {"84", "112"};
bool const enableSweep = false;
bool const enableSweep = false;
for (auto channel : channelList) {
setenv("NCCL_MIN_NCHANNELS", channel, 1);
testBed.RunSimpleSweep(funcTypes, dataTypes, redOps, roots, numElements,
@@ -149,30 +149,6 @@ namespace RcclUnitTesting
testBed.Finalize();
}
TEST(AllReduce, DISABLED_Clique)
{
// Set clique env var prior to TestBed
setenv("RCCL_ENABLE_CLIQUE", "1", 1);
TestBed testBed;
// Configuration
std::vector<ncclFunc_t> const funcTypes = {ncclCollAllReduce};
std::vector<ncclDataType_t> const dataTypes = testBed.GetAllSupportedDataTypes();
std::vector<ncclRedOp_t> const redOps = testBed.GetAllSupportedRedOps();
std::vector<int> const roots = {0};
std::vector<int> const numElements = {1048576, 1024};
std::vector<bool> const inPlaceList = {false, true};
std::vector<bool> const managedMemList = {false};
std::vector<bool> const useHipGraphList = {false, true};
testBed.RunSimpleSweep(funcTypes, dataTypes, redOps, roots, numElements,
inPlaceList, managedMemList, useHipGraphList);
testBed.Finalize();
unsetenv("RCCL_ENABLE_CLIQUE");
}
// This tests using custom pre-mult scalars reductions
TEST(AllReduce, PreMultScalar)
{
@@ -245,7 +221,7 @@ namespace RcclUnitTesting
}
TEST(AllReduce, UserBufferRegistration)
{
{
const int nranks = 8;
size_t count = 2048;
std::vector<int> sendBuff(count, 0);
@@ -260,7 +236,7 @@ namespace RcclUnitTesting
}
TEST(AllReduce, ManagedMemUserBufferRegistration)
{
{
const int nranks = 8;
size_t count = 2048;
std::vector<int> sendBuff(count, 0);
+2 -11
查看文件
@@ -3,19 +3,10 @@ RCCL_INSTALL=../../build/release
EXE=$PWD/HelloRccl
LDPATH=$LD_LIBRARY_PATH:$RCCL_INSTALL
echo "Single process - With clique-based kernels:"
RCCL_CLIQUE_ALLREDUCE_BYTE_LIMIT=1073741824 RCCL_FORCE_ENABLE_CLIQUE=1 NCCL_DEBUG=INFO RCCL_ENABLE_CLIQUE=1 LD_LIBRARY_PATH=$LDPATH $EXE 4
echo "Single process - Without clique-based kernels:"
echo "Single process:"
NCCL_DEBUG=INFO LD_LIBRARY_PATH=$LDPATH $EXE 4
echo "With clique-based kernels:"
RCCL_CLIQUE_ALLREDUCE_BYTE_LIMIT=1073741824 RCCL_FORCE_ENABLE_CLIQUE=1 NCCL_DEBUG=INFO RCCL_ENABLE_CLIQUE=1 NCCL_COMM_ID=$HOSTNAME:12345 LD_LIBRARY_PATH=$LDPATH $EXE 4 0 &
RCCL_CLIQUE_ALLREDUCE_BYTE_LIMIT=1073741824 RCCL_FORCE_ENABLE_CLIQUE=1 NCCL_DEBUG=INFO RCCL_ENABLE_CLIQUE=1 NCCL_COMM_ID=$HOSTNAME:12345 LD_LIBRARY_PATH=$LDPATH $EXE 4 1 &
RCCL_CLIQUE_ALLREDUCE_BYTE_LIMIT=1073741824 RCCL_FORCE_ENABLE_CLIQUE=1 NCCL_DEBUG=INFO RCCL_ENABLE_CLIQUE=1 NCCL_COMM_ID=$HOSTNAME:12345 LD_LIBRARY_PATH=$LDPATH $EXE 4 2 &
RCCL_CLIQUE_ALLREDUCE_BYTE_LIMIT=1073741824 RCCL_FORCE_ENABLE_CLIQUE=1 NCCL_DEBUG=INFO RCCL_ENABLE_CLIQUE=1 NCCL_COMM_ID=$HOSTNAME:12345 LD_LIBRARY_PATH=$LDPATH $EXE 4 3
echo "Without clique-based kernels:"
echo "Multi-process:"
NCCL_COMM_ID=$HOSTNAME:12345 LD_LIBRARY_PATH=$LDPATH $EXE 4 0 &
NCCL_COMM_ID=$HOSTNAME:12345 LD_LIBRARY_PATH=$LDPATH $EXE 4 1 &
NCCL_COMM_ID=$HOSTNAME:12345 LD_LIBRARY_PATH=$LDPATH $EXE 4 2 &
-1
查看文件
@@ -38,7 +38,6 @@ const char* ncclProtoStr[NCCL_NUM_PROTOCOLS] = { "LL", "LL128", "Simple" };
extern NodeModel *node_model;
RCCL_PARAM(CliqueIgnoreTopo, "CLIQUE_IGNORE_TOPO", 0);
RCCL_PARAM(P2pNetDisable, "P2P_NET_DISABLE", 0);
RCCL_PARAM(PivotAlltoallEnable, "PIVOT_ALLTOALL_ENABLE", 1);
RCCL_PARAM(LL128ForceEnable, "LL128_FORCE_ENABLE", 0);