* Enabling clique for any XGMI-connected topology, adding tuning
* Updating CHANGELOG for clique tuning
* Re-working clique barrier system to work on multi-process / multi-gpu

[ROCm/rccl commit: 9d7232c091]
Этот коммит содержится в:
gilbertlee-amd
2021-05-06 09:50:07 -06:00
коммит произвёл GitHub
родитель ffdf00a2fa
Коммит f4a12be69b
12 изменённых файлов: 605 добавлений и 225 удалений
+9
Просмотреть файл
@@ -2,6 +2,15 @@
Full documentation for RCCL is available at [https://rccl.readthedocs.io](https://rccl.readthedocs.io)
## [Unreleased]
### Optimizations
- Additional tuning for clique-based kernel AllReduce performance (still requires opt in with RCCL_ENABLE_CLIQUE=1)
### Changed
- Replaced RCCL_FORCE_ENABLE_CLIQUE with RCCL_CLIQUE_IGNORE_TOPO
- Clique-based kernels can now be enabled on topologies where all active GPUs are XGMI-connected
- Topologies not normally supported by clique-based kernels require RCCL_CLIQUE_IGNORE_TOPO=1
## [RCCL-2.7.8 for ROCm 4.1.0]
### Added
- Experimental support for clique-based kernels (opt in with RCCL_ENABLE_CLIQUE=1)
+126 -89
Просмотреть файл
@@ -42,13 +42,14 @@ THE SOFTWARE.
#include <thread>
#include <unistd.h>
cliqueDevicePtrs_t CliqueManager::m_staticCliquePtrs[NCCL_MAX_OPS] = {};
int* CliqueManager::m_staticGpuBarrierMem = NULL;
cliqueDevicePtrs_t CliqueManager::m_staticCliquePtrs[NCCL_MAX_OPS] = {};
int CliqueManager::m_staticBarrierCount[NCCL_MAX_OPS*2] = {};
int* CliqueManager::m_staticGpuBarrierMem = NULL;
// Define some environment variables that affect clique-based kernels
RCCL_PARAM(EnableClique, "ENABLE_CLIQUE", 0); // Opt-in environment variable for clique-based kernels
RCCL_PARAM(AllReduceCliqueByteLimit, "CLIQUE_ALLREDUCE_BYTE_LIMIT", 16777216); // Max number of bytes to use clique-based kernels for all reduce
RCCL_PARAM(AllReduceNumChannels, "CLIQUE_ALLREDUCE_NCHANNELS", 0); // Number of channels to use for all-reduce. (0 for auto-select)
RCCL_PARAM(EnableClique, "ENABLE_CLIQUE", 0); // Opt-in environment variable for clique-based kernels
RCCL_PARAM(AllReduceCliqueByteLimit, "CLIQUE_ALLREDUCE_BYTE_LIMIT", 0); // Max number of bytes to use clique-based kernels for all reduce (0 for auto-select)
RCCL_PARAM(AllReduceNumChannels, "CLIQUE_ALLREDUCE_NCHANNELS", 0); // Number of channels to use for all-reduce. (0 for auto-select)
CliqueManager::CliqueManager(int const rank,
int const numRanks,
@@ -56,6 +57,8 @@ CliqueManager::CliqueManager(int const rank,
m_rank(rank),
m_numRanks(numRanks),
m_cliqueMode(cliqueMode),
m_opIndexHead(0),
m_opIndexTail(0),
m_init(false),
m_pinnedCliquePtrs(NULL),
m_fineGrainBarrierMem(NULL)
@@ -113,7 +116,11 @@ ncclResult_t CliqueManager::Init(ncclUniqueId const* commId, int suffix)
m_init = true;
m_hash = djb2Hash(commId->internal);
if (m_cliqueMode == CLIQUE_DISABLED) return ncclSuccess;
if (m_cliqueMode == CLIQUE_DISABLED)
{
INFO(NCCL_INIT, "Clique kernels disabled");
return ncclSuccess;
}
// Check parameters
if (m_rank < 0 || m_rank >= m_numRanks)
@@ -190,7 +197,7 @@ ncclResult_t CliqueManager::Init(ncclUniqueId const* commId, int suffix)
}
// Initialize shared CPU memory to be used for barrier variables
m_sharedCpuMemory = ShmObject<int32_t>(2 * sizeof(int32_t),
m_sharedCpuMemory = ShmObject<int32_t>(NCCL_MAX_OPS * 2 * sizeof(int32_t),
CliqueShmNames["SharedCounters"] + shmSuffix,
m_rank,
m_numRanks,
@@ -198,19 +205,18 @@ ncclResult_t CliqueManager::Init(ncclUniqueId const* commId, int suffix)
NCCLCHECKGOTO(m_sharedCpuMemory.Open(), res, dropback);
// Split up the shared CPU memory for barrier counters / global sense
m_cpuBarrierGlobalCount = &m_sharedCpuMemory.Get()[0];
m_cpuBarrierGlobalSense = &m_sharedCpuMemory.Get()[1];
m_cpuBarrierCount = m_sharedCpuMemory.Get();
// Initialize CPU barriers
if (m_rank == 0)
{
*m_cpuBarrierGlobalCount = 0;
*m_cpuBarrierGlobalSense = 0;
memset(m_cpuBarrierCount, 0, NCCL_MAX_OPS * 2 * sizeof(int32_t));
}
m_cpuBarrierLocalSense = 0;
}
else if (m_cliqueMode == CLIQUE_SINGLE_PROCESS)
{
m_cpuBarrierCount = &m_staticBarrierCount[0];
// First rank prepares fine-grained memory shared across ranks used for the two barrier variables
if (m_rank == 0)
{
@@ -224,9 +230,18 @@ ncclResult_t CliqueManager::Init(ncclUniqueId const* commId, int suffix)
}
}
// Figure out device arch for tuning
int deviceId;
CUDACHECK(hipGetDevice(&deviceId));
hipDeviceProp_t devProp;
CUDACHECK(hipGetDeviceProperties(&devProp, deviceId));
m_gcnArch = devProp.gcnArch;
// Establish when to use clique-based kernels based on input size
SetByteLimits();
m_init = true;
INFO(NCCL_INIT, "Clique-based kernels enabled (mode %d)", m_cliqueMode);
INFO(NCCL_INIT, "Clique-based kernels enabled (mode %d) [GCN %d]", m_cliqueMode, m_gcnArch);
return ncclSuccess;
dropback:
@@ -238,6 +253,20 @@ dropback:
return ncclSuccess;
}
// Establishes the largest AllReduce input size (in bytes) for which clique-based
// kernels are used. A non-zero CLIQUE_ALLREDUCE_BYTE_LIMIT parameter is taken as-is;
// a value of 0 selects a built-in limit based on the detected GCN arch.
void CliqueManager::SetByteLimits()
{
  m_allReduceByteLimit = rcclParamAllReduceCliqueByteLimit();

  // An explicit (non-zero) user setting takes priority over the built-in limits
  if (m_allReduceByteLimit != 0) return;

  // Arch 908 uses a 512 MiB limit; arch 906 and every other arch use 16 MiB
  m_allReduceByteLimit = (m_gcnArch == 908) ? 536870912 : 16777216;
}
bool CliqueManager::IsSupported(ncclFunc_t const coll,
size_t const count,
ncclDataType_t const datatype,
@@ -247,12 +276,11 @@ bool CliqueManager::IsSupported(ncclFunc_t const coll,
// Filter based on total input size for each collective type
size_t totalBytes = count * ncclTypeSize(datatype);
if (coll == ncclFuncAllReduce && (totalBytes <= rcclParamAllReduceCliqueByteLimit())) return true;
if (coll == ncclFuncAllReduce && (totalBytes <= m_allReduceByteLimit)) return true;
return false;
}
ncclResult_t CliqueManager::DeclarePointers(uint64_t opCount, void const* inputPtr, void* outputPtr)
ncclResult_t CliqueManager::DeclarePointers(void const* inputPtr, void* outputPtr)
{
// Do nothing if disabled
if (m_cliqueMode == CLIQUE_DISABLED) return ncclSuccess;
@@ -263,11 +291,11 @@ ncclResult_t CliqueManager::DeclarePointers(uint64_t opCount, void const* inputP
return ncclInvalidUsage;
}
int const opIndex = opCount % NCCL_MAX_OPS;
// Add opIndex to queue of in-progress collectives
m_inProgress.push(opIndex);
// Add to queue of in-progress collectives
int32_t const opIndex = m_opIndexTail;
m_opIndexTail = (m_opIndexTail + 1) % NCCL_MAX_OPS;
INFO(NCCL_COLL, "Rank %d declaring pointers for opIndex %d", m_rank, opIndex);
if (m_cliqueMode == CLIQUE_SINGLE_NODE)
{
// Get fine-grained device memory if not already done
@@ -302,6 +330,18 @@ ncclResult_t CliqueManager::DeclarePointers(uint64_t opCount, void const* inputP
m_staticCliquePtrs[opIndex].outputs[m_rank] = outputPtr;
}
// Increment entry barrier counter - must not block
volatile int* entryCounter = &m_cpuBarrierCount[2 * opIndex];
int entryVal = LOAD(entryCounter);
// Loop until successful atomic update to counter
bool done = false;
while (done == false) {
// Last rank resets exit barrier counter prior to incrementing entry count to numRanks
if (entryVal+1 == m_numRanks)
m_cpuBarrierCount[2 * opIndex + 1] = 0;
done = __sync_bool_compare_and_swap(entryCounter, entryVal, entryVal+1);
entryVal++;
}
return ncclSuccess;
}
@@ -320,12 +360,27 @@ ncclResult_t CliqueManager::GetNumChannelsToUse(ncclFunc_t const coll,
{
// NOTE: These are currently based on collected data and not necessarily ideal for all hardware
int numChannels;
if (totalBytes <= 65536) numChannels = 1;
else if (totalBytes <= 262144) numChannels = 2;
else if (totalBytes <= 524288) numChannels = 4;
else if (totalBytes <= 2097152) numChannels = 8;
else numChannels = 11;
switch (m_gcnArch)
{
case 906:
if (totalBytes <= 16384) numChannels = 1;
else numChannels = 2;
break;
case 908:
if (totalBytes <= 262144) numChannels = 4;
else numChannels = 14;
break;
case 910:
if (totalBytes <= 262144) numChannels = 4;
else numChannels = 8;
break;
default:
if (totalBytes <= 65536) numChannels = 1;
else if (totalBytes <= 262144) numChannels = 2;
else if (totalBytes <= 524288) numChannels = 4;
else if (totalBytes <= 2097152) numChannels = 8;
else numChannels = 11;
}
*numChannelstoUse = std::min(numChannels, totalNumChannels);
}
else
@@ -333,82 +388,79 @@ ncclResult_t CliqueManager::GetNumChannelsToUse(ncclFunc_t const coll,
*numChannelstoUse = std::min((int)rcclParamAllReduceNumChannels(), totalNumChannels);
}
}
return ncclSuccess;
}
ncclResult_t CliqueManager::SetCliqueArgs(ncclWorkElem* args)
ncclResult_t CliqueManager::WaitForPointers(ncclWorkElem* args)
{
// Do nothing if disabled
if (m_cliqueMode == CLIQUE_DISABLED) return ncclSuccess;
if (!m_init)
{
WARN("CliqueManager must be initialized before use");
return ncclInvalidUsage;
}
// Prepare clique argments (NOTE: clique pointers are not ready yet)
int opIndex = args->opCount % NCCL_MAX_OPS;
// Check that collective queue is not empty
if (m_opIndexHead == m_opIndexTail)
{
WARN("WaitForPointers must be called after DeclarePointers");
return ncclInvalidUsage;
}
// Pop first collective off queue
int32_t const opIndex = m_opIndexHead;
INFO(NCCL_COLL, "Rank %d waiting for pointers for opIndex %d", m_rank, opIndex);
m_opIndexHead = (m_opIndexHead + 1) % NCCL_MAX_OPS;
args->clique.ptrs = &m_pinnedCliquePtrs[opIndex];
return ncclSuccess;
}
// Wait for all ranks to declare pointers for this opIndex
volatile int* entryCounter = (volatile int*)(&m_cpuBarrierCount[2 * opIndex]);
int entryVal = LOAD(entryCounter);
while (entryVal != m_numRanks) entryVal = LOAD(entryCounter);
ncclResult_t CliqueManager::WaitForPointers()
{
// Do nothing if disabled
if (m_cliqueMode == CLIQUE_DISABLED) return ncclSuccess;
if (!m_init)
{
WARN("CliqueManager must be initialized before use");
return ncclInvalidUsage;
// Last rank to pass the barrier resets entry barrier
// NOTE: There is another GPU-barrier performed during the kernels therefore it should
// not be possible for any rank to modify entry count prior to being reset
volatile int* exitCounter = &m_cpuBarrierCount[2 * opIndex + 1];
int exitVal = LOAD(exitCounter);
// Loop until successful atomic update to counter
bool done = false;
while (done == false) {
// Last rank resets entry counter
if (exitVal+1 == m_numRanks)
m_cpuBarrierCount[2 * opIndex] = 0;
done = __sync_bool_compare_and_swap(exitCounter, exitVal, exitVal+1);
exitVal++;
}
INFO(NCCL_COLL, "Rank %d past opIndex barrier %d", m_rank, opIndex);
// Do nothing if there are no outstanding clique-kernels
if (m_inProgress.empty()) return ncclSuccess;
// Copy clique device pointers to pinned device memory
// Collect pointers
if (m_cliqueMode == CLIQUE_SINGLE_NODE)
{
// Wait for all ranks to arrive
WaitForBarrier();
int numHandles = m_numRanks * NUM_HANDLES_PER_RANK;
std::vector<std::pair<hipIpcMemHandle_t,size_t>> handles(numHandles);
while (!m_inProgress.empty())
// Collect the ready handles from shared memory and convert them to device pointers
NCCLCHECK(m_shmHandles.ReadHandles(opIndex, handles));
for (int i = 0; i < m_numRanks; i++)
{
int const opIndex = m_inProgress.front();
m_inProgress.pop();
void *input;
NCCLCHECK(CheckCacheForHandle(handles[i * NUM_HANDLES_PER_RANK],
m_ipcHandleRecvCache, &input));
m_pinnedCliquePtrs[opIndex].inputs[i] = const_cast<const void *>(input);
// Collect the ready handles from shared memory and convert them to device pointers
NCCLCHECK(m_shmHandles.ReadHandles(opIndex, handles));
for (int i = 0; i < m_numRanks; i++)
{
void *input;
NCCLCHECK(CheckCacheForHandle(handles[i * NUM_HANDLES_PER_RANK],
m_ipcHandleRecvCache, &input));
m_pinnedCliquePtrs[opIndex].inputs[i] = const_cast<const void *>(input);
NCCLCHECK(CheckCacheForHandle(handles[(i * NUM_HANDLES_PER_RANK) + 1],
m_ipcHandleRecvCache, &m_pinnedCliquePtrs[opIndex].outputs[i]));
}
NCCLCHECK(CheckCacheForHandle(handles[(i * NUM_HANDLES_PER_RANK) + 1],
m_ipcHandleRecvCache, &m_pinnedCliquePtrs[opIndex].outputs[i]));
}
}
else if (m_cliqueMode == CLIQUE_SINGLE_PROCESS)
{
while (!m_inProgress.empty())
{
int const opIndex = m_inProgress.front();
m_inProgress.pop();
// Copy from static memory to pinned host memory and set local sense
memcpy(&m_pinnedCliquePtrs[opIndex], &m_staticCliquePtrs[opIndex], sizeof(cliqueDevicePtrs_t));
m_pinnedCliquePtrs[opIndex].barrier.localSense = &m_gpuBarrierLocalSense[opIndex];
}
// Copy from static memory to pinned host memory and set local sense
memcpy(&m_pinnedCliquePtrs[opIndex], &m_staticCliquePtrs[opIndex], sizeof(cliqueDevicePtrs_t));
m_pinnedCliquePtrs[opIndex].barrier.localSense = &m_gpuBarrierLocalSense[opIndex];
}
return ncclSuccess;
}
@@ -486,21 +538,6 @@ ncclResult_t CliqueManager::CheckCacheForHandle(std::pair<hipIpcMemHandle_t, siz
return ncclSuccess;
}
void CliqueManager::WaitForBarrier()
{
// Sense inversion barrier
m_cpuBarrierLocalSense = 1 - m_cpuBarrierLocalSense;
if (__sync_add_and_fetch(m_cpuBarrierGlobalCount, 1) == m_numRanks)
{
// Reset the barrier
STORE(m_cpuBarrierGlobalCount, 0);
STORE(m_cpuBarrierGlobalSense, m_cpuBarrierLocalSense);
} else {
while (LOAD(m_cpuBarrierGlobalSense) != m_cpuBarrierLocalSense);
}
}
ncclResult_t CliqueManager::BootstrapRootInit(int pid, unsigned long hash)
{
if (rcclParamEnableClique())
+13 -15
Просмотреть файл
@@ -25,7 +25,6 @@ THE SOFTWARE.
#include <semaphore.h>
#include <mutex>
#include <queue>
#include "nccl.h"
#include "devcomm.h"
@@ -53,6 +52,8 @@ public:
ncclResult_t Init(ncclUniqueId const* commId, int suffix);
void SetByteLimits();
// Returns true if the collective is supported via a clique-based kernel
bool IsSupported(ncclFunc_t const coll,
size_t const count,
@@ -60,7 +61,7 @@ public:
ncclRedOp_t const op) const;
// Provide the pointers to be exchanged across the clique for the given rank / opCount
ncclResult_t DeclarePointers(uint64_t opCount, void const* inputPtr, void* outputPtr);
ncclResult_t DeclarePointers(void const* inputPtr, void* outputPtr);
// Determine the number of channels / CUs to use for this call
ncclResult_t GetNumChannelsToUse(ncclFunc_t const coll,
@@ -70,12 +71,9 @@ public:
int const totalNumChannels,
uint8_t* numChannelstoUse);
// Set pointers for where clique-related arguments will be found
// This sets pointers to device-accessible memory where the arguments will eventually reside
ncclResult_t SetCliqueArgs(ncclWorkElem* args);
// Blocking call that only returns after all out-standing clique pointers are ready
ncclResult_t WaitForPointers();
// Blocking call that only returns once the in-progress clique pointers are ready
// This needs to be called in same order as DeclarePointers
ncclResult_t WaitForPointers(ncclWorkElem* args);
// Prepares shared memory files upon initialization
static ncclResult_t BootstrapRootInit(int pid, unsigned long hash);
@@ -90,19 +88,20 @@ protected:
NcclIpcHandleRecvCache* cache,
void** ptr);
// Race-condition helper functions
void WaitForBarrier();
int m_rank; // Associated rank
int m_numRanks; // Total number of ranks
unsigned long m_hash; // Hash used for identifying message queues & shared memory
cliqueMode_t m_cliqueMode; // Clique mode (off/single process/single node)
int32_t m_opIndexHead; // Track start of outstanding requests
int32_t m_opIndexTail; // Track end of outstanding requests
bool m_init; // Whether CliqueManager has been initialized
int m_gcnArch; // Device GCN arch value
size_t m_allReduceByteLimit; // Byte limit for AllReduce
cliqueDevicePtrs_t* m_pinnedCliquePtrs; // Pinned-host-memory (device accessible) containing device pointers
int* m_gpuBarrierGlobalCount; // Part of GPU barrier (count variable shared across ranks)
int* m_gpuBarrierGlobalSense; // Part of GPU barrier (reset variable shared across ranks)
int* m_gpuBarrierLocalSense; // Part of GPU barrier (reset variable local to this rank)
std::queue<int> m_inProgress; // Queue of clique-based collectives waiting for pointers
int* m_cpuBarrierCount; // Points to either m_sharedBarrierCount or m_staticBarrierCount
// IPC-related (CLIQUE_SINGLE_NODE)
NcclIpcHandleShm m_shmHandles; // Used to exchange IPC handles between ranks
@@ -111,12 +110,11 @@ protected:
ShmObject<int32_t> m_sharedCpuMemory; // Used to pass shared memory used for CPU barrier
ShmObject<hipIpcMemHandle_t> m_sharedIpcHandle; // Used to pass fine-grained device memory buffer IPC handle
int* m_fineGrainBarrierMem; // Fine-grained GPU memory barrier (allocated only on 1st rank, shared on others)
int* m_cpuBarrierGlobalCount; // Part of CPU barrier (count variable shared across ranks)
int* m_cpuBarrierGlobalSense; // Part of CPU barrier (reset variable shared across ranks)
int m_cpuBarrierLocalSense; // Part of CPU barrier (reset variable local to this rank)
int* m_sharedBarrierCount; // Part of CPU barrier (count variable shared across ranks)
// Single-process (CLIQUE_SINGLE_PROCESS)
static cliqueDevicePtrs_t m_staticCliquePtrs[NCCL_MAX_OPS]; // Use shared static memory to exchange pointer info
static int m_staticBarrierCount[2*NCCL_MAX_OPS]; // Part of CPU barrier (count variable shared across ranks)
static int* m_staticGpuBarrierMem; // Static storage backing for fine-grained gpu barrier
};
+17 -16
Просмотреть файл
@@ -128,10 +128,6 @@ static ncclResult_t setupLaunch(struct ncclComm* comm, hipLaunchParams* params)
STORE(&channel->workFifo[(channel->workFifoTail-1)%NCCL_MAX_OPS].elems[0].active, 2);
}
{ // [RCCL] Wait for any clique-based collectives
NCCLCHECK(comm->cliqueManager->WaitForPointers());
} // [/RCCL]
// Find the first operation, choose the kernel accordingly and pass it
// as the first argument.
struct ncclChannel* c0 = comm->channels;
@@ -394,12 +390,6 @@ static ncclResult_t computeColl(struct ncclInfo* info /* input */, struct ncclWo
info->datatype,
info->op))
{
// Declare the input / output pointers being used (to exchange via IPC with other ranks)
NCCLCHECK(info->comm->cliqueManager->DeclarePointers(info->comm->opCount,
info->sendbuff,
info->recvbuff));
info->algorithm = NCCL_ALGO_RING;
info->protocol = NCCL_PROTO_CLIQUE;
// Determine the number of channels to use for clique-kernel
@@ -411,6 +401,9 @@ static ncclResult_t computeColl(struct ncclInfo* info /* input */, struct ncclWo
&work->clique.nChannels));
work->clique.count = info->count;
work->funcIndex = FUNC_INDEX(info->coll, info->op, info->datatype, info->algorithm, info->protocol);
// Setup pointers to where all the input/output pointers will be
NCCLCHECK(info->comm->cliqueManager->WaitForPointers(work));
return ncclSuccess;
}
} // [RCCL]
@@ -519,12 +512,6 @@ ncclResult_t ncclSaveKernel(struct ncclInfo* info) {
info->comm->myParams->gridDim.x++;
// [RCCL] Setup pointers to where all the input/output pointers will be
if (info->protocol == NCCL_PROTO_CLIQUE) {
NCCLCHECK(info->comm->cliqueManager->SetCliqueArgs(&work));
}
// [/RCCL]
work.coll.bid = bid % nChannels;
NCCLCHECK(getNextOp(channel, NULL, &work));
}
@@ -664,6 +651,20 @@ ncclResult_t ncclSaveP2pKernel(struct ncclInfo* info) {
}
ncclResult_t ncclEnqueueCheck(struct ncclInfo* info) {
// [RCCL] Check for clique-based kernel support
{
if (info->comm->cliqueManager->IsSupported(info->coll,
info->count,
info->datatype,
info->op))
{
// Declare the input / output pointers being used (to exchange via IPC with other ranks)
// This is done immediately, and does not block
NCCLCHECK(info->comm->cliqueManager->DeclarePointers(info->sendbuff, info->recvbuff));
}
}
// [/RCCL]
// Launch asynchronously if needed
if (ncclAsyncMode()) {
ncclResult_t ret = ncclSuccess;
+12 -10
Просмотреть файл
@@ -30,6 +30,7 @@
// [RCCL]
#include "clique/CliqueManager.h"
#include <hsa/hsa_ext_amd.h>
// [/RCCL]
#define STR2(v) #v
@@ -363,7 +364,7 @@ static ncclResult_t commFree(ncclComm_t comm) {
return ncclSuccess;
}
RCCL_PARAM(ForceEnableClique, "FORCE_ENABLE_CLIQUE", 0);
RCCL_PARAM(CliqueIgnoreTopo, "CLIQUE_IGNORE_TOPO", 0);
RCCL_PARAM(P2pNetDisable, "P2P_NET_DISABLE", 0);
static ncclResult_t commAlloc(ncclComm_t* comret, int ndev, int rank) {
@@ -865,7 +866,8 @@ static ncclResult_t initTransportsRank(struct ncclComm* comm, ncclUniqueId* comm
CliqueManager::cliqueMode_t cliqueMode = CliqueManager::CLIQUE_DISABLED;
if (comm->localRanks == comm->nRanks)
{
// Check that all the GPUs have peer access to one another
// Check that all the GPUs have peer access to one another and are XGMI connected
bool allXgmi = true;
bool hasPeerAccess = true;
for (int i = 0; i < nranks && hasPeerAccess; i++)
{
@@ -880,6 +882,10 @@ static ncclResult_t initTransportsRank(struct ncclComm* comm, ncclUniqueId* comm
hasPeerAccess = false;
break;
}
uint32_t linkType, hopCount;
CUDACHECK(hipExtGetLinkTypeAndHopCount(i, j, &linkType, &hopCount));
allXgmi &= (linkType == HSA_AMD_LINK_INFO_TYPE_XGMI);
}
}
if (hasPeerAccess)
@@ -890,15 +896,11 @@ static ncclResult_t initTransportsRank(struct ncclComm* comm, ncclUniqueId* comm
cliqueMode = CliqueManager::CLIQUE_SINGLE_NODE;
}
// For now, only enable clique-based kernels on CR8_G topologies, unless explicitly asked
if (!rcclParamForceEnableClique())
// For now, only enable clique-based kernels on nodes where all GPUs are XGMI connected
if (!allXgmi && !rcclParamCliqueIgnoreTopo())
{
// Disable clique-kernel support if not on CR8 topology
if (!(comm->topo->nodes[GPU].count == comm->topo->nRanks && (comm->topo->type & RCCL_TOPO_CR8G)))
{
INFO(NCCL_INIT, "Disabling clique-based kernels due to topology (force enable with RCCL_FORCE_ENABLE_CLIQUE)");
cliqueMode = CliqueManager::CLIQUE_DISABLED;
}
INFO(NCCL_INIT, "Disabling clique-based kernels due to topology (ignore with RCCL_CLIQUE_IGNORE_TOPO)");
cliqueMode = CliqueManager::CLIQUE_DISABLED;
}
}
comm->cliqueManager = new CliqueManager(rank, nranks, cliqueMode);
+2
Просмотреть файл
@@ -11,6 +11,7 @@ if(BUILD_TESTS)
set(TEST_SOURCES_SINGLE_PROCESS
test_AllGather.cpp
test_AllReduce.cpp
test_AllReduceGroup.cpp
test_Broadcast.cpp
test_Reduce.cpp
test_ReduceScatter.cpp
@@ -27,6 +28,7 @@ if(BUILD_TESTS)
set(TEST_SOURCES_MULTI_PROCESS
test_AllGatherMultiProcess.cpp
test_AllReduceMultiProcess.cpp
test_AllReduceGroupMultiProcess.cpp
test_AllToAllMultiProcess.cpp
test_BroadcastMultiProcess.cpp
test_CombinedCallsMultiProcess.cpp
+49 -49
Просмотреть файл
@@ -8,56 +8,56 @@
namespace CorrectnessTests
{
TEST_P(AllReduceCorrectnessTest, Correctness)
TEST_P(AllReduceCorrectnessTest, Correctness)
{
if (numDevices > numDevicesAvailable) return;
// Prepare input / output / expected results
Dataset dataset;
dataset.Initialize(numDevices, numElements, dataType, inPlace);
FillDatasetWithPattern(dataset);
ComputeExpectedResults(dataset, op);
// Launch the reduction (1 thread per GPU)
ncclGroupStart();
for (int i = 0; i < numDevices; i++)
{
if (numDevices > numDevicesAvailable) return;
// Prepare input / output / expected results
Dataset dataset;
dataset.Initialize(numDevices, numElements, dataType, inPlace);
FillDatasetWithPattern(dataset);
ComputeExpectedResults(dataset, op);
// Launch the reduction (1 thread per GPU)
ncclGroupStart();
for (int i = 0; i < numDevices; i++)
{
ncclAllReduce(dataset.inputs[i], dataset.outputs[i],
numElements, dataType, op, comms[i], streams[i]);
}
ncclGroupEnd();
// Wait for reduction to complete
Synchronize();
// Check results
ValidateResults(dataset);
dataset.Release();
ncclAllReduce(dataset.inputs[i], dataset.outputs[i],
numElements, dataType, op, comms[i], streams[i]);
}
ncclGroupEnd();
INSTANTIATE_TEST_SUITE_P(AllReduceCorrectnessSweep,
AllReduceCorrectnessTest,
testing::Combine(
// Reduction operator
testing::Values(ncclSum, ncclProd, ncclMax, ncclMin),
// Data types
testing::Values(ncclInt8,
ncclUint8,
ncclInt32,
ncclUint32,
ncclInt64,
ncclUint64,
//ncclFloat16,
ncclFloat32,
ncclFloat64,
ncclBfloat16),
// Number of elements
testing::Values(1024, 1048576),
// Number of devices
testing::Values(2,3,4,5,6,7,8),
// In-place or not
testing::Values(false, true),
testing::Values("RCCL_ENABLE_CLIQUE=0", "RCCL_ENABLE_CLIQUE=1")),
CorrectnessTest::PrintToStringParamName());
// Wait for reduction to complete
Synchronize();
// Check results
ValidateResults(dataset);
dataset.Release();
}
INSTANTIATE_TEST_SUITE_P(AllReduceCorrectnessSweep,
AllReduceCorrectnessTest,
testing::Combine(
// Reduction operator
testing::Values(ncclSum, ncclProd, ncclMax, ncclMin),
// Data types
testing::Values(ncclInt8,
ncclUint8,
ncclInt32,
ncclUint32,
ncclInt64,
ncclUint64,
//ncclFloat16,
ncclFloat32,
ncclFloat64,
ncclBfloat16),
// Number of elements
testing::Values(1024, 1048576),
// Number of devices
testing::Values(2,3,4,5,6,7,8),
// In-place or not
testing::Values(false, true),
testing::Values("RCCL_ENABLE_CLIQUE=0", "RCCL_ENABLE_CLIQUE=1")),
CorrectnessTest::PrintToStringParamName());
} // namespace
+66
Просмотреть файл
@@ -0,0 +1,66 @@
/*************************************************************************
* Copyright (c) 2021 Advanced Micro Devices, Inc. All rights reserved.
*
* See LICENSE.txt for license information
************************************************************************/
#include "test_AllReduceGroup.hpp"
namespace CorrectnessTests
{
// Exercises several AllReduce calls aggregated inside a single group call and
// checks each call's output against results computed on the host
TEST_P(AllReduceGroupCorrectnessTest, Correctness)
{
  // Nothing to do when the configuration asks for more devices than are available
  if (numDevices > numDevicesAvailable) return;

  // One dataset per aggregated AllReduce call (input / output / expected results)
  Dataset groupedSets[3];
  for (Dataset& ds : groupedSets) ds.Initialize(numDevices, numElements, dataType, inPlace);
  for (Dataset& ds : groupedSets) FillDatasetWithPattern(ds);
  for (Dataset& ds : groupedSets) ComputeExpectedResults(ds, op);

  // Issue every reduction for every device within one group
  ncclGroupStart();
  for (int dev = 0; dev < numDevices; dev++)
  {
    for (Dataset& ds : groupedSets)
    {
      ncclAllReduce(ds.inputs[dev], ds.outputs[dev], numElements, dataType, op, comms[dev], streams[dev]);
    }
  }
  ncclGroupEnd();

  // Block until the reductions have completed
  Synchronize();

  // Compare outputs against expected results, then free the datasets
  for (Dataset& ds : groupedSets) ValidateResults(ds);
  for (Dataset& ds : groupedSets) ds.Release();
}
// Instantiates the grouped AllReduce test over every combination of the values below
INSTANTIATE_TEST_SUITE_P(AllReduceGroupCorrectnessSweep,
AllReduceGroupCorrectnessTest,
testing::Combine(
// Reduction operator
testing::Values(ncclSum),
// Data types
testing::Values(ncclFloat32, ncclFloat64),
// Number of elements
testing::Values(1024, 1048576),
// Number of devices
testing::Values(2,3,4,5,6,7,8),
// In-place or not
testing::Values(false, true),
// Clique-based kernels off / on
// NOTE(review): presumably the fixture applies this string as an environment variable - confirm
testing::Values("RCCL_ENABLE_CLIQUE=0", "RCCL_ENABLE_CLIQUE=1")),
CorrectnessTest::PrintToStringParamName());
} // namespace
+79
Просмотреть файл
@@ -0,0 +1,79 @@
/*************************************************************************
* Copyright (c) 2021 Advanced Micro Devices, Inc. All rights reserved.
*
* See LICENSE.txt for license information
************************************************************************/
#ifndef TEST_ALLREDUCEGROUP_HPP
#define TEST_ALLREDUCEGROUP_HPP
#include "CorrectnessTest.hpp"
namespace CorrectnessTests
{
// Test fixture for AllReduce calls that are aggregated within a single group call
class AllReduceGroupCorrectnessTest : public CorrectnessTest
{
public:
  // Computes the expected AllReduce output on the host.
  //
  // Every device's input is copied into that device's `expected` array, the arrays are
  // reduced element-wise with `op` into a temporary host buffer, and the reduced buffer
  // is then copied back into every device's `expected` array.
  //
  // dataset - dataset whose inputs are read and whose expected arrays are overwritten
  // op      - reduction operator applied element-wise across devices
  //
  // Terminates the process with a failure status if the dataset's datatype is unsupported.
  static void ComputeExpectedResults(Dataset& dataset, ncclRedOp_t const op)
  {
    // Copy all inputs to expected arrays temporarily to perform reduction on host
    for (int i = 0; i < dataset.numDevices; i++)
      HIP_CALL(hipMemcpy(dataset.expected[i], dataset.inputs[i],
                         dataset.NumBytes(), hipMemcpyDeviceToHost));

    // Allocate temporary host array to accumulate results
    // (the typed pointers below are all views onto this one allocation)
    int8_t*        resultI1 = (int8_t   *)malloc(dataset.NumBytes());
    uint8_t*       resultU1 = (uint8_t  *)resultI1;
    int32_t*       resultI4 = (int32_t  *)resultI1;
    uint32_t*      resultU4 = (uint32_t *)resultI1;
    int64_t*       resultI8 = (int64_t  *)resultI1;
    uint64_t*      resultU8 = (uint64_t *)resultI1;
    float*         resultF4 = (float    *)resultI1;
    double*        resultF8 = (double   *)resultI1;
    rccl_bfloat16* resultB2 = (rccl_bfloat16 *)resultI1;

    // Initialize the result with the first device's array
    memcpy(resultI1, dataset.expected[0], dataset.NumBytes());

    // Perform reduction on the other device arrays
    for (int i = 1; i < dataset.numDevices; i++)
    {
      int8_t*        arrayI1 = (int8_t   *)dataset.expected[i];
      uint8_t*       arrayU1 = (uint8_t  *)arrayI1;
      int32_t*       arrayI4 = (int32_t  *)arrayI1;
      uint32_t*      arrayU4 = (uint32_t *)arrayI1;
      int64_t*       arrayI8 = (int64_t  *)arrayI1;
      uint64_t*      arrayU8 = (uint64_t *)arrayI1;
      float*         arrayF4 = (float    *)arrayI1;
      double*        arrayF8 = (double   *)arrayI1;
      rccl_bfloat16* arrayB2 = (rccl_bfloat16 *)arrayI1;

      for (int j = 0; j < dataset.numElements; j++)
      {
        switch (dataset.dataType)
        {
        case ncclInt8:     resultI1[j] = ReduceOp(op, resultI1[j], arrayI1[j]); break;
        case ncclUint8:    resultU1[j] = ReduceOp(op, resultU1[j], arrayU1[j]); break;
        case ncclInt32:    resultI4[j] = ReduceOp(op, resultI4[j], arrayI4[j]); break;
        case ncclUint32:   resultU4[j] = ReduceOp(op, resultU4[j], arrayU4[j]); break;
        case ncclInt64:    resultI8[j] = ReduceOp(op, resultI8[j], arrayI8[j]); break;
        case ncclUint64:   resultU8[j] = ReduceOp(op, resultU8[j], arrayU8[j]); break;
        case ncclFloat32:  resultF4[j] = ReduceOp(op, resultF4[j], arrayF4[j]); break;
        case ncclFloat64:  resultF8[j] = ReduceOp(op, resultF8[j], arrayF8[j]); break;
        case ncclBfloat16: resultB2[j] = ReduceOp(op, resultB2[j], arrayB2[j]); break;
        default:
          fprintf(stderr, "[ERROR] Unsupported datatype\n");
          // Exit with a failure status so the error is not reported as a success
          exit(EXIT_FAILURE);
        }
      }
    }

    // Copy results into expected arrays
    for (int i = 0; i < dataset.numDevices; i++)
      memcpy(dataset.expected[i], resultI1, dataset.NumBytes());

    free(resultI1);
  }
};
}
#endif
+81
Просмотреть файл
@@ -0,0 +1,81 @@
/*************************************************************************
* Copyright (c) 2021 Advanced Micro Devices, Inc. All rights reserved.
*
* See LICENSE.txt for license information
************************************************************************/
#include "test_AllReduceGroupMultiProcess.hpp"
namespace CorrectnessTests
{
// Runs three grouped AllReduce calls across several child processes (up to two GPUs
// per process) and validates the exit status of every child.
TEST_P(AllReduceGroupMultiProcessCorrectnessTest, Correctness)
{
  // Important: Make sure the order of ncclFunc_t's here match the order of ncclFunc_ts
  // as they appear in TestGroupCalls()
  std::vector<ncclFunc_t> ncclFuncs;
  ncclFuncs.push_back(ncclCollAllReduce);
  ncclFuncs.push_back(ncclCollAllReduce);
  ncclFuncs.push_back(ncclCollAllReduce);

  // Create multiple datasets for combined operation
  // (mapped MAP_SHARED so the forked child processes see the same Dataset objects)
  std::vector<Dataset*> datasets(ncclFuncs.size());
  for (size_t i = 0; i < datasets.size(); i++)
  {
    datasets[i] = (Dataset*)mmap(NULL, sizeof(Dataset), PROT_READ|PROT_WRITE, MAP_SHARED|MAP_ANONYMOUS, -1, 0);
    datasets[i]->InitializeRootProcess(numDevices, numElements, dataType, inPlace, ncclFuncs[i]);
  }

  int const numGpusPerProcess = 2;
  // Round up: the fork loop below also launches a trailing process that holds fewer
  // GPUs when numDevices is not a multiple of numGpusPerProcess, and that process
  // needs its own slot in pids
  int const numProcesses = (numDevices + numGpusPerProcess - 1) / numGpusPerProcess;

  std::vector<int> pids(numProcesses);

  int process = -1;
  for (int i = 0; i < numDevices; i+= numGpusPerProcess)
  {
    process++;
    int pid = fork();
    if (pid == 0)
    {
      // Child process: collect the ranks [i, maxIdx) handled by this process
      int gpuIdx = i;
      int maxIdx = gpuIdx + (numGpusPerProcess - 1) >= numDevices ? numDevices : gpuIdx + numGpusPerProcess;
      std::vector<int> ranks;
      for (; gpuIdx < maxIdx; gpuIdx++)
      {
        ranks.push_back(gpuIdx);
      }

      bool pass;
      TestGroupCalls(process, ranks, datasets, ncclFuncs, pass);
      TerminateChildProcess(pass);
    }
    else
    {
      // Parent process: remember the child's pid for validation below
      pids[process] = pid;
    }
  }

  ValidateProcesses(pids);

  for (size_t i = 0; i < datasets.size(); i++)
  {
    munmap(datasets[i], sizeof(Dataset));
  }
}
// Instantiates the multi-process grouped AllReduce test over every combination of the values below
INSTANTIATE_TEST_SUITE_P(AllReduceGroupMultiProcessCorrectnessSweep,
AllReduceGroupMultiProcessCorrectnessTest,
testing::Combine(
// Reduction operator
testing::Values(ncclSum),
// Data types
testing::Values(ncclFloat32,
ncclFloat64),
// Number of elements
testing::Values(3072, 3145728),
// Number of devices
testing::Values(4,8),
// In-place or not
testing::Values(false, true),
// Clique-based kernels off / on
// NOTE(review): presumably the fixture applies this string as an environment variable - confirm
testing::Values("RCCL_ENABLE_CLIQUE=0", "RCCL_ENABLE_CLIQUE=1")),
CorrectnessTest::PrintToStringParamName());
} // namespace
+105
Просмотреть файл
@@ -0,0 +1,105 @@
/*************************************************************************
* Copyright (c) 2021 Advanced Micro Devices, Inc. All rights reserved.
*
* See LICENSE.txt for license information
************************************************************************/
#ifndef TEST_ALLREDUCEGROUP_MULTI_PROCESS_HPP
#define TEST_ALLREDUCEGROUP_MULTI_PROCESS_HPP
#include "CorrectnessTest.hpp"
#include "test_AllReduceMultiProcess.hpp"
#include <string>
namespace CorrectnessTests
{
// Multi-process correctness fixture that issues one AllReduce per dataset for
// every local rank inside a single ncclGroupStart()/ncclGroupEnd() pair and
// then validates each dataset on each local rank.
class AllReduceGroupMultiProcessCorrectnessTest : public MultiProcessCorrectnessTest
{
public:
    // Runs the grouped AllReduce test for the ranks owned by this process.
    //
    // process  - index of the calling process, forwarded to the Barrier
    // ranks    - ranks (also used as device indices) handled by this process
    // datasets - one dataset per grouped collective; each is filled, reduced,
    //            validated and released here
    // funcs    - collective functions, forwarded to SetUpPerProcess
    // pass     - [out] true only if every dataset validated on every local rank
    //            (also true when the test is skipped for lack of devices)
    void TestGroupCalls(int process, std::vector<int> const& ranks, std::vector<Dataset*>& datasets, std::vector<ncclFunc_t> const& funcs, bool& pass)
    {
        // Give the out-parameter a defined value even if nothing gets validated
        pass = true;

        // Per-rank setup is wrapped in a group call
        ncclGroupStart();
        for (size_t i = 0; i < ranks.size(); i++)
        {
            SetUpPerProcess(ranks[i], funcs, comms[ranks[i]], streams[ranks[i]], datasets);
            if (numDevices > numDevicesAvailable)
            {
                break;
            }
        }
        ncclGroupEnd();

        // Not enough devices for this configuration: report success without running
        if (numDevices > numDevicesAvailable)
        {
            pass = true;
            return;
        }

        int numProcesses = numDevices / ranks.size();
        // NOTE(review): assumes NCCL_COMM_ID is set in the environment; getenv()
        // returning NULL here would be undefined behaviour in atoi - confirm the
        // fixture always exports it.
        Barrier barrier(process, numProcesses, std::atoi(getenv("NCCL_COMM_ID")));

        // Every dataset is processed the same way, so drive all loops from the
        // actual dataset count instead of a hard-coded 3
        size_t const numDatasets = datasets.size();

        for (size_t i = 0; i < ranks.size(); i++)
        {
            for (size_t j = 0; j < numDatasets; j++)
            {
                FillDatasetWithPattern(*datasets[j], ranks[i]);
            }
        }

        int const root = 0;
        for (size_t i = 0; i < numDatasets; i++)
        {
            AllReduceMultiProcessCorrectnessTest::ComputeExpectedResults(*datasets[i], barrier, op, ranks);
        }
        barrier.Wait();

        // Queue one AllReduce per dataset for every local rank in a single group
        ncclGroupStart();
        for (size_t i = 0; i < ranks.size(); i++)
        {
            int const rank = ranks[i];
            for (size_t j = 0; j < numDatasets; j++)
            {
                ncclAllReduce(datasets[j]->inputs[rank], datasets[j]->outputs[rank],
                              numElements, dataType, op, comms[rank], streams[rank]);
            }
        }
        // Signal end of group call
        ncclGroupEnd();

        // Wait for the queued work to finish on every local device
        for (size_t i = 0; i < ranks.size(); i++)
        {
            HIP_CALL(hipSetDevice(ranks[i]));
            HIP_CALL(hipStreamSynchronize(streams[ranks[i]]));
        }

        // Accumulate the verdict across datasets: a failure in an earlier
        // dataset must not be overwritten by a later dataset that validates.
        // The loop still visits every dataset after a failure so that each
        // process performs the same number of barrier.Wait() calls.
        bool allValid = true;
        for (size_t i = 0; i < numDatasets; i++)
        {
            for (size_t j = 0; j < ranks.size(); j++)
            {
                if (!ValidateResults(*datasets[i], ranks[j], root))
                {
                    allValid = false;
                    break;
                }
            }

            barrier.Wait();

            for (size_t j = 0; j < ranks.size(); j++)
            {
                datasets[i]->Release(ranks[j]);
            }
        }
        pass = allValid;

        for (size_t i = 0; i < ranks.size(); i++)
        {
            TearDownPerProcess(comms[ranks[i]], streams[ranks[i]]);
        }
    }
};
}
#endif
+46 -46
Просмотреть файл
@@ -8,53 +8,53 @@
namespace CorrectnessTests
{
TEST_P(AllReduceMultiProcessCorrectnessTest, Correctness)
TEST_P(AllReduceMultiProcessCorrectnessTest, Correctness)
{
dataset->InitializeRootProcess(numDevices, numElements, dataType, inPlace, ncclCollAllReduce);
std::vector<int> pids(numDevices);
int gpu = -1;
for (int i = 0; i < numDevices; i++)
{
dataset->InitializeRootProcess(numDevices, numElements, dataType, inPlace, ncclCollAllReduce);
std::vector<int> pids(numDevices);
int gpu = -1;
for (int i = 0; i < numDevices; i++)
{
gpu++;
int pid = fork();
if (pid == 0)
{
bool pass;
TestAllReduce(gpu, *dataset, pass);
TerminateChildProcess(pass);
}
else
{
pids[gpu] = pid;
}
}
ValidateProcesses(pids);
gpu++;
int pid = fork();
if (pid == 0)
{
bool pass;
TestAllReduce(gpu, *dataset, pass);
TerminateChildProcess(pass);
}
else
{
pids[gpu] = pid;
}
}
INSTANTIATE_TEST_SUITE_P(AllReduceMultiProcessCorrectnessSweep,
AllReduceMultiProcessCorrectnessTest,
testing::Combine(
// Reduction operator
testing::Values(ncclSum, ncclProd, ncclMax, ncclMin),
// Data types
testing::Values(ncclInt8,
ncclUint8,
ncclInt32,
ncclUint32,
ncclInt64,
ncclUint64,
//ncclFloat16,
ncclFloat32,
ncclFloat64,
ncclBfloat16),
// Number of elements
testing::Values(1024, 1048576),
// Number of devices
testing::Values(2,3,4,8),
// In-place or not
testing::Values(false, true),
testing::Values("")),
CorrectnessTest::PrintToStringParamName());
ValidateProcesses(pids);
}
// Registers the parameter sweep for AllReduceMultiProcessCorrectnessTest.
// testing::Combine produces the cross product of every value list below, and
// CorrectnessTest::PrintToStringParamName() supplies the per-case test name.
INSTANTIATE_TEST_SUITE_P(AllReduceMultiProcessCorrectnessSweep,
AllReduceMultiProcessCorrectnessTest,
testing::Combine(
// Reduction operator
testing::Values(ncclSum, ncclProd, ncclMax, ncclMin),
// Data types
testing::Values(ncclInt8,
ncclUint8,
ncclInt32,
ncclUint32,
ncclInt64,
ncclUint64,
// ncclFloat16 is excluded from the sweep (reason not visible here)
//ncclFloat16,
ncclFloat32,
ncclFloat64,
ncclBfloat16),
// Number of elements
testing::Values(1024, 1048576),
// Number of devices
testing::Values(2,3,4,8),
// In-place or not
testing::Values(false, true),
// Environment setting in NAME=VALUE form: sweeps clique-based kernels off (0) and on (1).
// NOTE(review): presumably applied to the environment by the fixture before the test runs - confirm.
testing::Values("RCCL_ENABLE_CLIQUE=0", "RCCL_ENABLE_CLIQUE=1")),
CorrectnessTest::PrintToStringParamName());
} // namespace