Misc fixes and improvements for 2.5.6

1. Fix RCCL unit test
2. Add ROME detection and tuning
3. Change default P2P level
4. Fix search algorithm for XGMI
5. Replace explicit channel duplication with implicit duplication by using half of the link speed
6. Add collective trace support
7. Correct Intel Skylake CPU detection and bandwidth
8. Fix topo connect function
9. Disable GDR read and remove unreachable code
10. Disable LL128 kernels
11. Add tuning parameters
12. Use original clock64() implementation which returns RTC counter value
13. Print out timestamp of collective trace
14. Do not use struct ncclColl in kernel launch parameter
15. Fix abort handling and add tracing
17. Add __launch_bounds__ to kernel functions
18. Remove unused abortCount
19. Unset default MIN_NRINGS and MIN_NCHANNELS
20. Do not allocate shared memory when not using LL128 kernels
21. Correct time print out in tuning log


[ROCm/rccl commit: 1e55645d97]
Αυτή η υποβολή περιλαμβάνεται σε:
Wenkai Du
2019-11-26 16:33:13 -08:00
γονέας d7d4175df0
υποβολή d2fbcfea02
20 αρχεία άλλαξαν με 307 προσθήκες και 121 διαγραφές
@@ -147,6 +147,11 @@ if(PROFILE)
add_definitions(-DENABLE_PROFILING)
endif()
set(COLLTRACE 1 CACHE BOOL "Collective Trace Option")
if(COLLTRACE)
add_definitions(-DENABLE_COLLTRACE)
endif()
foreach(target ${AMDGPU_TARGETS})
target_link_libraries(rccl PRIVATE --amdgpu-target=${target})
endforeach()
@@ -26,7 +26,7 @@ __device__ void ncclAllReduceRingKernel(struct CollectiveArgs* args) {
#ifdef ENABLE_PROFILING
auto devProf = comm->devProf;
uint64_t clk, t0 = 0ULL, ws, wr;
if (tid == 0) clk = clock64();
if (tid == 0) clk = __rtc64();
#endif
// Compute pointers
@@ -98,7 +98,7 @@ __device__ void ncclAllReduceRingKernel(struct CollectiveArgs* args) {
ACCUMULATE_COUNTER(directRecv);
}
#ifdef ENABLE_PROFILING
if (tid == 0) __atomic_fetch_add(&(devProf->total_cycle), clock64() - clk, __ATOMIC_SEQ_CST);
if (tid == 0) __atomic_fetch_add(&(devProf->total_cycle), __rtc64() - clk, __ATOMIC_SEQ_CST);
#endif
}
@@ -28,7 +28,7 @@ __device__ void ncclBroadcastRingKernel(struct CollectiveArgs* args) {
#ifdef ENABLE_PROFILING
auto devProf = comm->devProf;
uint64_t clk, t0 = 0ULL, ws, wr;
if (tid == 0) clk = clock64();
if (tid == 0) clk = __rtc64();
#endif
// Compute pointers
@@ -65,7 +65,7 @@ __device__ void ncclBroadcastRingKernel(struct CollectiveArgs* args) {
}
}
#ifdef ENABLE_PROFILING
if (tid == 0) __atomic_fetch_add(&(devProf->total_cycle), clock64() - clk, __ATOMIC_SEQ_CST);
if (tid == 0) __atomic_fetch_add(&(devProf->total_cycle), __rtc64() - clk, __ATOMIC_SEQ_CST);
#endif
}
@@ -12,6 +12,16 @@
#include "collectives.h"
#include "devcomm.h"
// Device-side timestamp helper: returns the current value of the GPU
// real-time counter as a signed 64-bit integer. On HIP builds the value is
// read through the s_memrealtime builtin; other builds fall back to
// __clock_u64().
__device__
inline __attribute((always_inline))
long long int __rtc64() {
#if __HIP__
  const auto ticks = __builtin_amdgcn_s_memrealtime();
#else
  const auto ticks = __clock_u64();
#endif
  return static_cast<long long int>(ticks);
}
// Exit If Abort Barrier across CTA: make sure all threads exit consistently
// Each thread sets a predicate to true if abort == 1
// all CTA's threads enter the barrier and do a popc on their predicates being True
@@ -20,7 +30,7 @@
#define exitIfAbortBarrier(abort, abortCount) \
if (abort) __atomic_fetch_add(abortCount, 1, __ATOMIC_SEQ_CST); \
__syncthreads(); \
if (LOAD(abortCount)) { asm volatile ("s_endpgm"); return; }
if (LOAD(abortCount)) { /*asm volatile ("s_endpgm");*/ return false; }
#define __syncwarp()
#else
static inline __device__ void exitIfAbortBarrier(int abort) {
@@ -36,7 +46,7 @@ static inline __device__ void exitIfAbortBarrier(int abort) {
#define NCCL_FUNC5(coll, op, dtype) \
NCCL_COLL_NAME(coll##LL, op, dtype), \
NCCL_COLL_NAME(coll##LL128, op, dtype), \
NCCL_COLL_NAME(coll##LL, op, dtype), \
NCCL_COLL_NAME(coll, op, dtype)
#define NCCL_FUNC4(coll, op, dtype) \
@@ -149,17 +159,54 @@ static __device__ void load_parallel(void* dst, void* src, size_t size, int tid,
for (int o = tid; o < (size/sizeof(int)); o += blockDim.x) d[o] = s[o];
}
static __device__ void load_coll(struct ncclColl* localColl, struct ncclColl* hostColl, int tid, struct ncclDevComm* comm, uint32_t* abortCount) {
static __device__ bool load_coll(struct ncclColl* localColl, struct ncclColl* hostColl, int tid, struct ncclDevComm* comm, uint32_t* abortCount) {
// Check whether the last operation was aborted and make sure all threads exit
int abort = tid == 0 ? *(comm->abortFlag) : 0;
exitIfAbortBarrier(abort, abortCount);
load_parallel(localColl, hostColl, sizeof(struct ncclColl), tid, abortCount);
__syncthreads();
if (tid == 0) hostColl->active = 0;
return true;
}
#ifdef ENABLE_COLLTRACE
// Collective-trace helpers. They expect `comm`, `localColl` and `bid` to be
// in scope at the expansion site and must be expanded by a single thread
// (call sites guard them with `if (tid == 0)`).

// Reserves the next slot of the trace ring buffer (the tail counter is bumped
// atomically and wrapped to COLLTRACE_NUM_ITEMS) and fills in the fields that
// are common to every record. Declares `pos` for use by the wrappers below.
#define traceColl(fIdx) \
  uint32_t pos = __atomic_fetch_add(comm->collTraceTail, 1, __ATOMIC_SEQ_CST)%COLLTRACE_NUM_ITEMS; \
  comm->collTrace[pos].timeStamp = __rtc64(); \
  comm->collTrace[pos].opCount = localColl.args.opCount; \
  comm->collTrace[pos].bid = bid; \
  comm->collTrace[pos].funcIndex = fIdx;
// Records a kernel-launch event; data_0 receives the HW_ID hardware register.
#define traceKernelLaunch(fIdx) { \
  traceColl(fIdx); \
  comm->collTrace[pos].type = ncclCollTraceKernelLaunchType; \
  asm volatile ("s_getreg_b32 %0, hwreg(HW_REG_HW_ID)" : "=s" (comm->collTrace[pos].data_0)); \
  }
// Records the end of a collective (or of the kernel when fIdx is -1).
#define traceCollEnd(fIdx) { \
  traceColl(fIdx); \
  comm->collTrace[pos].type = ncclCollTraceCollEndType; \
  }
// Records that the kernel is exiting because an abort was observed.
#define traceAbort(fIdx) { \
  traceColl(fIdx); \
  comm->collTrace[pos].type = ncclCollTraceAbortType; \
  }
#else
// Tracing disabled: the stubs expand to nothing. They must accept the same
// single argument as the real macros, otherwise call sites such as
// `traceAbort(-1)` fail to preprocess ("macro passed 1 arguments, but takes
// just 0").
#define traceKernelLaunch(fIdx)
#define traceCollEnd(fIdx)
#define traceAbort(fIdx)
#endif
// Device-global pointer that ALLOCATE_SHMEM points at the per-block LL128
// staging buffer when LL128 support is compiled in.
extern __device__ volatile uint64_t* ncclShmem;
// ALLOCATE_SHMEM is expanded at the top of each collective kernel and must
// leave a `sync` identifier in scope, which the kernel stores in channel->sync.
#ifdef ENABLE_LL128
// LL128 enabled: declare the shared staging buffer, publish it through
// ncclShmem, and declare the per-warp `sync` array in shared memory.
#define ALLOCATE_SHMEM \
__shared__ volatile uint64_t shmem[NCCL_LL128_SHMEM_SIZE]; \
ncclShmem = shmem; \
__shared__ uint32_t sync[NCCL_LL128_MAX_NTHREADS/WARP_SIZE];
#else
// LL128 disabled: no shared memory is declared; `sync` is a null pointer.
#define ALLOCATE_SHMEM \
uint32_t* sync = 0;
#endif
/* Functions for aggregation case */
#define IMPL_COLL_FUNC(coll, op, ncclFunc, dtype, ctype) \
__device__ void NCCL_COLL_NAME(coll, op, dtype)(struct CollectiveArgs* args) { \
@@ -168,47 +215,45 @@ __device__ void NCCL_COLL_NAME(coll, op, dtype)(struct CollectiveArgs* args) { \
/* Kernels with the first operation inlined */
#define IMPL_COLL_KERN(coll, op, ncclFunc, dtype, ctype, fIndex) \
__global__ void NCCL_KERN_NAME(coll, op, dtype)(struct ncclColl firstColl) { \
__launch_bounds__(NCCL_MAX_NTHREADS, 1) \
__global__ void NCCL_KERN_NAME(coll, op, dtype)(struct ncclDevComm* comm) { \
int tid = threadIdx.x; \
int bid = blockIdx.x; \
__shared__ volatile uint64_t shmem[NCCL_LL128_SHMEM_SIZE]; \
ncclShmem = shmem; \
ALLOCATE_SHMEM; \
__shared__ struct ncclColl localColl; \
__shared__ uint32_t abortCount; \
__shared__ uint32_t sync[NCCL_LL128_MAX_NTHREADS/WARP_SIZE]; \
if (tid == 0) abortCount = 0; \
__syncthreads(); \
\
struct ncclDevComm* comm = firstColl.args.comm; \
struct ncclChannel* channel = comm->channels+bid; \
struct ncclColl* c; \
channel->abortCount = &abortCount; \
channel->sync = sync; \
if (bid == 0) { \
/* To optimize for latency, (only) the first operation is passed as argument.*/ \
c = &firstColl; \
} else { \
c = &localColl; \
load_coll(c, channel->devCollectives+channel->collFifoHead, tid, comm, &abortCount); \
if (!load_coll(&localColl, channel->devCollectives+channel->collFifoHead, tid, comm, &abortCount)) { \
if (tid == 0) traceAbort(-1); \
return; \
} \
if (tid == 0) traceKernelLaunch(localColl.funcIndex); \
while (1) { \
if (tid < c->args.nThreads) { \
if (c->funcIndex == fIndex) { \
coll##Kernel<COLL_UNROLL, ncclFunc<ctype>, ctype>(&c->args); \
if (tid < localColl.args.nThreads) { \
if (localColl.funcIndex == fIndex) { \
coll##Kernel<COLL_UNROLL, ncclFunc<ctype>, ctype>(&localColl.args); \
} else { \
NCCL_CALL_FUNCTIONS(c); \
NCCL_CALL_FUNCTIONS(&localColl); \
} \
} \
int nextIndex = c->nextIndex; \
int nextIndex = localColl.nextIndex; \
if (tid == 0) channel->collFifoHead = nextIndex; \
\
if (c->active == 2) { \
if (localColl.active == 2) { \
if (tid == 0) traceCollEnd(-1); \
return; \
} \
\
/* Load next collective operation*/ \
c = &localColl; /* for bid 0 */ \
load_coll(c, channel->devCollectives+nextIndex, tid, comm, &abortCount); \
if (!load_coll(&localColl, channel->devCollectives+nextIndex, tid, comm, &abortCount)) { \
if (tid == 0) traceAbort(-1); \
break; \
} \
if (tid == 0) traceCollEnd(localColl.funcIndex); \
} \
}
@@ -65,8 +65,7 @@ class ncclPrimitives {
#endif
const T* recvBuff[NRECV];
T* sendBuff[NSEND];
struct ncclDevComm* comm;
uint32_t* abortCount;
const struct ncclDevComm* comm;
inline __device__ int recvOffset(int i) { return (recvStep[i]%NCCL_STEPS)*stepSize; }
inline __device__ int sendOffset(int i) { return (sendStep[i]%NCCL_STEPS)*stepSize; }
@@ -112,14 +111,14 @@ class ncclPrimitives {
if (sendConnHeadPtr) {
#ifdef ENABLE_PROFILING
auto devProf = comm->devProf;
uint64_t t0 = clock64();
uint64_t t0 = __rtc64();
#endif
while (sendConnHeadCache + NCCL_STEPS < sendConnHead + SLICESTEPS) {
sendConnHeadCache = LOAD(sendConnHeadPtr);
if (checkAbort(wid, 1)) break;
}
#ifdef ENABLE_PROFILING
__atomic_fetch_add(&devProf->wait_send_cycle[blockIdx.x], clock64() - t0, __ATOMIC_SEQ_CST);
__atomic_fetch_add(&devProf->wait_send_cycle[blockIdx.x], __rtc64() - t0, __ATOMIC_SEQ_CST);
#endif
if (sendConnFifoPtr) {
STORE(sendConnFifoPtr+sendConnHead%NCCL_STEPS, nbytes);
@@ -134,14 +133,14 @@ class ncclPrimitives {
if (recvConnTailPtr) {
#ifdef ENABLE_PROFILING
auto devProf = comm->devProf;
uint64_t t0 = clock64();
uint64_t t0 = __rtc64();
#endif
while (recvConnTailCache < recvConnTail + SLICESTEPS) {
recvConnTailCache = LOAD(recvConnTailPtr);
if (checkAbort(wid, 0)) break;
}
#ifdef ENABLE_PROFILING
__atomic_fetch_add(&devProf->wait_recv_cycle[blockIdx.x], clock64() - t0, __ATOMIC_SEQ_CST);
__atomic_fetch_add(&devProf->wait_recv_cycle[blockIdx.x], __rtc64() - t0, __ATOMIC_SEQ_CST);
#endif
recvConnTail += SLICESTEPS;
}
@@ -340,7 +339,6 @@ inline __device__ int directSendInc(int i, int directInc, int sliceInc) {
ncclPrimitives(const int tid, const int nthreads, int* recvPeers, int* sendPeers, T* directBuff, int stepSize, struct ncclChannel* channel, struct ncclDevComm* comm, const uint64_t opCount)
: comm(comm), tid(tid), nthreads(nthreads), wid(tid%WARP_SIZE), stepSize(stepSize), opCount(opCount) {
// Make sure step is updated before we read it.
abortCount = channel->abortCount;
barrier();
for (int i=0; i<NRECV && recvPeers[i] >= 0; i++) loadRecvConn(&channel->devPeers[recvPeers[i]].recv.conn, i, 0);
@@ -417,11 +415,11 @@ inline __device__ int directSendInc(int i, int directInc, int sliceInc) {
#ifdef ENABLE_PROFILING
#define INIT_COUNTER \
if (tid == 0) { t0 = clock64(); ws = LOAD(&(devProf->wait_send_cycle[blockIdx.x])); \
if (tid == 0) { t0 = __rtc64(); ws = LOAD(&(devProf->wait_send_cycle[blockIdx.x])); \
wr = LOAD(&(devProf->wait_recv_cycle[blockIdx.x])); }
#define ACCUMULATE_COUNTER(prim) \
if (tid == 0) { __atomic_fetch_add(&(devProf->prim##_cycle), clock64() - t0 \
if (tid == 0) { __atomic_fetch_add(&(devProf->prim##_cycle), __rtc64() - t0 \
+ ws - LOAD(&(devProf->wait_send_cycle[blockIdx.x])) \
+ wr - LOAD(&(devProf->wait_recv_cycle[blockIdx.x])), \
__ATOMIC_SEQ_CST); \
@@ -54,7 +54,7 @@
NCCL_FUNCS3B(coll, copy), \
NCCL_FUNCS3B(coll, copy)
typedef void(*ncclKern_t)(struct ncclColl);
typedef void(*ncclKern_t)(struct ncclDevComm*);
// Must be consistent with the ncclFuncSet enum
static ncclKern_t const ncclKerns[NCCL_NUM_FUNCTIONS*ncclNumOps*ncclNumTypes*NCCL_NUM_ALGORITHMS*NCCL_NUM_PROTOCOLS] = {
NCCL_FUNCS2B(ncclBroadcast),
@@ -80,7 +80,7 @@ ncclResult_t ncclLaunchCooperativeKernelMultiDevice(hipLaunchParams *paramsList,
for (int i = 0; i < numDevices; i++) {
hipLaunchParams* params = paramsList+i;
CUDACHECK(hipSetDevice(cudaDevs[i]));
hipLaunchKernelGGL(((void (*)(struct ncclColl))params->func), params->gridDim, params->blockDim, params->sharedMem, params->stream, **((struct ncclColl **)(params->args)));
hipLaunchKernelGGL(((void (*)(struct ncclDevComm*))params->func), params->gridDim, params->blockDim, params->sharedMem, params->stream, **((struct ncclDevComm ***)(params->args)));
}
CUDACHECK(hipSetDevice(savedDev));
return ncclSuccess;
@@ -98,10 +98,8 @@ ncclResult_t setupLaunch(struct ncclComm* comm, hipLaunchParams* params) {
// Find the first operation, choose the kernel accordingly and pass it
// as the first argument.
struct ncclColl* coll = comm->channels[0].collectives+comm->channels[0].collStart;
memcpy(&comm->args, coll, sizeof(struct ncclColl));
// As we pass that coll directly, we can free it immediately.
STORE(&coll->active, 0);
comm->args = comm->devComm;
params->func = (void *)ncclKerns[coll->funcIndex];
return ncclSuccess;
}
@@ -194,7 +192,7 @@ ncclResult_t ncclBarrierEnqueueWait(ncclComm_t comm) {
hipLaunchParams *params = comm->myParams;
if (comm->launchMode == ncclComm::PARALLEL) {
hipLaunchKernelGGL(((void (*)(struct ncclColl))params->func), params->gridDim, params->blockDim, params->sharedMem, params->stream, **((struct ncclColl **)(params->args)));
hipLaunchKernelGGL(((void (*)(struct ncclDevComm*))params->func), params->gridDim, params->blockDim, params->sharedMem, params->stream, **((struct ncclDevComm ***)(params->args)));
}
// Start the network proxies as soon as the kernel has been launched. We can't
// perform any CUDA call between the two or having a hipFree between the CUDA
@@ -232,9 +230,9 @@ ncclResult_t ncclEnqueueEvents(ncclComm_t comm) {
// Trees are not perfectly sticking to the model for medium sizes. Applying a static correction
// factor is not ideal but works quite well. Powers of two, 64 B to 1 GB.
static float treeCorrectionFactor[NCCL_NUM_PROTOCOLS][22] = {
{ 1.0, 1.0, 1.0, 1.0, .9, .8, .7, .7, .7, .7, .6, .5, .5, .5, .6, .7, .8, .9, .9, 1.0, 1.0, 1.0 },
{ 1.0, 1.0, 1.0, 1.0, 1.0, .9, .8, .8, .8, .8, .7, .7, .7, .6, .6, .7, .7, .8, .8, .9, .9, 1.0 },
{ .9, .9, .9, .9, .9, .9, .9, .8, .7, .6, .6, .5, .5, .5, .5, .5, .5, .6, .6, .7, .8, .9 }
{ 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, .84, .49, .42, .60, .75, .87, .94, .94, .99, 1.0, 1.0 , 1.0 , 1.0 , 1.0 , 1.0 },
{ 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, .84, .49, .42, .60, .75, .87, .94, .94, .99, 1.0, 1.0 , 1.0 , 1.0 , 1.0 , 1.0 },
{ 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, .41, .27, .25, .39, .46, .72, .76, .87, .92, .97, 1.0, 1.0 , 1.0 , 1.0 , 1.0 , 1.0 }
};
static ncclResult_t getAlgoInfo(struct ncclInfo* info) {
@@ -262,7 +260,7 @@ static ncclResult_t getAlgoInfo(struct ncclInfo* info) {
return ncclInternalError;
}
if (comm->rank == 0) INFO(NCCL_COLL, "%ld Bytes -> Algo %d proto %d time %d", info->nBytes, info->algorithm, info->protocol, minTime);
if (comm->rank == 0) INFO(NCCL_COLL, "%ld Bytes -> Algo %d proto %d time %d", info->nBytes, info->algorithm, info->protocol, (int)minTime);
TRACE(NCCL_COLL, "%ld Bytes -> Algo %d proto %d time %f", info->nBytes, info->algorithm, info->protocol, minTime);
int nc = comm->nChannels;
@@ -182,7 +182,7 @@ NCCL_PARAM(MinNchannels, "MIN_NCHANNELS", -2);
NCCL_PARAM(MaxNchannels, "MAX_NCHANNELS", -2);
int ncclMinNchannels() {
int minNchannels = 0;
int minNchannels = 2;
if (ncclParamMinNrings() != -2) minNchannels = ncclParamMinNrings();
if (ncclParamMinNchannels() != -2) minNchannels = ncclParamMinNchannels();
if (minNchannels > MAXCHANNELS) {
@@ -234,12 +234,14 @@ ncclResult_t ncclTopoPostset(struct ncclComm* comm, int* firstRanks, struct nccl
NCCLCHECK(connectRings(comm, ringRecv, ringSend, ringPrev, ringNext, firstRanks));
NCCLCHECK(connectTrees(comm, treeUpRecv, treeUpSend, treeDnRecv, treeDnSend, firstRanks));
// Duplicate ringPrev/ringNext for ncclBuildRing
memcpy(ringPrev+nChannels*nranks, ringPrev, nChannels*nranks*sizeof(int));
memcpy(ringNext+nChannels*nranks, ringNext, nChannels*nranks*sizeof(int));
if (nChannels == 1) {
// Duplicate ringPrev/ringNext for ncclBuildRing
memcpy(ringPrev+nChannels*nranks, ringPrev, nChannels*nranks*sizeof(int));
memcpy(ringNext+nChannels*nranks, ringNext, nChannels*nranks*sizeof(int));
// Duplication should be complete now
nChannels = comm->nChannels = std::min(MAXCHANNELS,nChannels*2);
// Duplication should be complete now
nChannels = comm->nChannels = std::min(MAXCHANNELS,nChannels*2);
}
// Honor NCCL_MIN_NRINGS/NCCL_MAX_NRINGS.
// We permit combining max, then min, to only use the first channels, then duplicate them.
@@ -12,6 +12,7 @@ static ncclResult_t ncclTopoFollowPath(struct ncclTopoGraph* graph, struct ncclT
if (path->count == 0) return ncclSuccess;
*node = NULL;
width /= 2;
if (width > 0) {
if (path->type > graph->type) return ncclSuccess;
graph->type = std::max(graph->type, path->type);
@@ -204,7 +205,7 @@ ncclResult_t ncclTopoSearchRecGpu(struct ncclTopoSystem* system, struct ncclTopo
NCCLCHECK(ncclTopoCompareGraphs(graph, saveGraph, &copy));
if (copy) {
memcpy(saveGraph, graph, sizeof(struct ncclTopoGraph));
if (graph->nChannels*graph->speedIntra == maxSpeed) *time = -1;
if (graph->nChannels*graph->speedIntra/2 == maxSpeed) *time = -1;
}
if (graph->nChannels < MAXCHANNELS/2) {
NCCLCHECK(ncclTopoSearchRec(system, graph, saveGraph, maxSpeed, time));
@@ -13,6 +13,10 @@
#include "net.h"
#include <sys/stat.h>
#include <fcntl.h>
#if defined(__HIP_PLATFORM_HCC__) || defined(__HCC__) || defined(__HIPCC__)
#include <hsa/hsa.h>
#include <hsa/hsa_ext_amd.h>
#endif
#define BUSID_SIZE (sizeof("0000:00:00.0"))
#define BUSID_REDUCED_SIZE (sizeof("0000:00"))
@@ -96,15 +100,16 @@ ncclResult_t ncclTopoCudaPath(int cudaDev, char** path) {
int interCpuWidth = 0;
int cpuPciWidth = 0;
int p2pPciWidth = 0;
static ncclResult_t getCpuWidths() {
// Check if already detected
if (interCpuWidth + cpuPciWidth) return ncclSuccess;
if (interCpuWidth + cpuPciWidth + p2pPciWidth) return ncclSuccess;
// Defaults
char cpu[256];
sprintf(cpu, "Generic");
cpuPciWidth = interCpuWidth = PCI_WIDTH;
cpuPciWidth = interCpuWidth = p2pPciWidth = PCI_WIDTH;
#ifdef __PPC__
sprintf(cpu, "ppc64");
@@ -124,6 +129,7 @@ static ncclResult_t getCpuWidths() {
asm volatile("cpuid" : "=b" (cpuid0.ebx), "=c" (cpuid0.ecx), "=d" (cpuid0.edx) : "a" (0));
if (strncmp(cpuid0.vendor, "GenuineIntel", 12) == 0) sprintf(cpu, "Intel");
else if (strncmp(cpuid0.vendor, "AuthenticAMD", 12) == 0) sprintf(cpu, "AMD");
if (strcmp(cpu, "Intel") == 0) {
union {
@@ -133,22 +139,47 @@ static ncclResult_t getCpuWidths() {
int familyId:4;
int processorType:2;
int resv0:2;
int extModelId:4;
int modelId:8;
int extModel:4;
int extFamily:8;
int resv1:4;
};
uint32_t val;
} cpuid1;
asm volatile("cpuid" : "=a" (cpuid1.val) : "a" (1));
if (cpuid1.familyId == 6 && cpuid1.modelId >= 0x55) { // Skylake
if (cpuid1.familyId == 6 && (cpuid1.model + cpuid1.extModel * 16) >= 0x55) { // Skylake
sprintf(cpu, "Intel/Skylake (or later)");
interCpuWidth = SKL_QPI_WIDTH;
cpuPciWidth = SKL_CPUPCI_WIDTH;
p2pPciWidth = SKL_PCI_WIDTH;
} else {
interCpuWidth = QPI_WIDTH;
}
}
else if (strcmp(cpu, "AMD") == 0) {
union {
struct {
uint32_t steppingId:4;
uint32_t model:4;
uint32_t family:4;
uint32_t resv0:4;
uint32_t extModel:4;
uint32_t extFamily:8;
uint32_t resv1:4;
};
uint32_t val;
} cpuid1;
asm volatile("cpuid" : "=a" (cpuid1.val) : "a" (1));
if ((cpuid1.family + cpuid1.extFamily) == 23 && (cpuid1.model + cpuid1.extModel * 16) >= 49) {
sprintf(cpu, "AMD/Rome (or later)");
interCpuWidth = ROME_QPI_WIDTH;
cpuPciWidth = ROME_CPUPCI_WIDTH;
p2pPciWidth = ROME_PCI_WIDTH;
} else {
interCpuWidth = QPI_WIDTH;
}
}
#endif
INFO(NCCL_GRAPH, "%s CPU (PCI %d, InterCpu %d)", cpu, cpuPciWidth, interCpuWidth);
INFO(NCCL_GRAPH, "%s CPU (CPU-PCI %d, PCI/P2P %d, InterCpu %d)", cpu, cpuPciWidth, p2pPciWidth, interCpuWidth);
return ncclSuccess;
}
@@ -163,7 +194,8 @@ static ncclResult_t ncclTopoGetCpuPciP2pWidth(int* width) {
return ncclSuccess;
}
static ncclResult_t ncclTopoGetPciWidth(int* width) {
*width = PCI_WIDTH;
NCCLCHECK(getCpuWidths());
*width = p2pPciWidth;
return ncclSuccess;
}
static ncclResult_t ncclTopoGetNetWidth(int* width) {
@@ -226,8 +258,9 @@ ncclResult_t ncclTopoConnectCpu(struct ncclTopoSystem* system, int numaId, struc
#if defined(__HIP_PLATFORM_HCC__) || defined(__HCC__) || defined(__HIPCC__)
#define VEGA_XGMI_WIDTH 20
extern int busIdToCudaDev(int64_t busId);
ncclResult_t ncclTopoConnectXGMI(int num_gpus, struct ncclTopoSystem* system) {
ncclResult_t ncclTopoConnectXGMI(struct ncclComm* comm, struct ncclTopoSystem* system) {
struct ncclTopoNode* nvsNode = NULL;
int minNvlinks = 2, minWidth = VEGA_XGMI_WIDTH;
@@ -237,7 +270,9 @@ ncclResult_t ncclTopoConnectXGMI(int num_gpus, struct ncclTopoSystem* system) {
struct ncclTopoNode* gpu1 = system->nodes[GPU].nodes+g1;
struct ncclTopoNode* gpu2 = system->nodes[GPU].nodes+g2;
uint32_t link_type, hops;
if (hipExtGetLinkTypeAndHopCount(gpu1->rank, gpu2->rank, &link_type, &hops) == hipSuccess) {
int cudaDev1 = busIdToCudaDev(comm->peerInfo[gpu1->rank].busId);
int cudaDev2 = busIdToCudaDev(comm->peerInfo[gpu2->rank].busId);
if (hipExtGetLinkTypeAndHopCount(cudaDev1, cudaDev2, &link_type, &hops) == hipSuccess) {
if (link_type == HSA_AMD_LINK_INFO_TYPE_XGMI && hops == 1) {
NCCLCHECK(ncclTopoConnectNodes(gpu1, gpu2, LINK_NVL, minWidth));
}
@@ -613,7 +648,7 @@ ncclResult_t ncclTopoGetSystem(struct ncclComm* comm, struct ncclTopoSystem** sy
}
#if defined(__HIP_PLATFORM_HCC__) || defined(__HCC__) || defined(__HIPCC__)
NCCLCHECK(ncclTopoConnectXGMI(g, s));
NCCLCHECK(ncclTopoConnectXGMI(comm, s));
#else
NCCLCHECK(ncclTopoConnectNVLink(nvmlDevs, s));
#endif
@@ -16,8 +16,13 @@
#define PCI_WIDTH 12 // PCI Gen3 x16
#define QPI_WIDTH 8
#define SKL_QPI_WIDTH 12
#define SKL_PCI_WIDTH 14
#define SKL_CPUPCI_WIDTH 10
#define P9_WIDTH 32
#define NET_WIDTH 12 // 100Gbit
#define ROME_QPI_WIDTH 12
#define ROME_PCI_WIDTH 22
#define ROME_CPUPCI_WIDTH 16
// Intel CPUs convert GPU P2P traffic into 64B PCI TLPs, so GPU
// to GPU traffic consumes more PCI bandwidth.
@@ -58,7 +58,7 @@ static const char* ncclProtoStr[] = { "LL", "LL128", "Simple" };
// Latencies in us, Bandwidths in GB/s
// Tree { LL, LL128, Simple } , Ring { LL, LL128, Simple }
static const float baseLat [NCCL_NUM_ALGORITHMS][NCCL_NUM_PROTOCOLS] = { { 4.4, 4.4, 0 }, { 3.6, 3.6, 8.4 } };
static const float baseLat [NCCL_NUM_ALGORITHMS][NCCL_NUM_PROTOCOLS] = { { 37.9, 37.9, 40.4 }, { 20.5, 20.5, 27.9 } };
// NVLink, PCI, Network
#define NCCL_HW_NVLINK 0
@@ -67,11 +67,11 @@ static const float baseLat [NCCL_NUM_ALGORITHMS][NCCL_NUM_PROTOCOLS] = { { 4.4,
// Tree/Simple is the latency of a 256kB chunk, which is ~ base lat + 256k/12GB/s (+ 256k/12GB/s for the network).
static const float hwLat [3][NCCL_NUM_ALGORITHMS][NCCL_NUM_PROTOCOLS] =
{ /* NVLINK */
{ /* Tree (LL/LL128/Simple)*/ { .5, 1.9, 28 }, /* Ring (LL/LL128/Simple)*/ { .4, 2.5, 5.7 } },
{ /* Tree (LL/LL128/Simple)*/ { 1.2, 1.2, 3.8 }, /* Ring (LL/LL128/Simple)*/ { 2.3, 2.3, 2.7 } },
/* PCI */
{ /* Tree (LL/LL128/Simple)*/ { 1.0, 1.9, 28 }, /* Ring (LL/LL128/Simple)*/ { 1.0, 2.5, 5.7 } },
{ /* Tree (LL/LL128/Simple)*/ { 2.2, 2.2, 5.7 }, /* Ring (LL/LL128/Simple)*/ { 1.3, 1.3, 1.9 } },
/* NET */
{ /* Tree (LL/LL128/Simple)*/ { 5.0, 7.5, 50 }, /* Ring (LL/LL128/Simple)*/ { .9, 2.5, 6.6 } }
{ /* Tree (LL/LL128/Simple)*/ { 9.8, 9.8, 19.5 }, /* Ring (LL/LL128/Simple)*/ { 2.0, 2.0, 4.5 } }
};
// LL128 max BW for the different collectives
@@ -102,13 +102,13 @@ ncclResult_t ncclSetThresholds(struct ncclComm* comm, int minCompCap, int maxCom
for (int p=0; p<NCCL_NUM_PROTOCOLS; p++) {
int speed = comm->nNodes <= 2 ? graphs[a]->speedIntra : graphs[a]->speedInter;
float busBw = graphs[a]->nChannels * speed * 1.0;
float busBw = graphs[a]->nChannels * speed * 0.6;
// Various model refinements
if (a == NCCL_ALGO_RING && p == NCCL_PROTO_LL) busBw *= 1.0/4.0;
if (a == NCCL_ALGO_RING && p == NCCL_PROTO_LL) busBw *= 1.0/5.0;
if (a == NCCL_ALGO_RING && p == NCCL_PROTO_LL128) busBw = std::min(busBw*120.0/128.0, ll128MaxBw[coll]);
if (a == NCCL_ALGO_TREE) busBw = std::min(busBw*.9, comm->nNodes > 1 ? 70.0 : 90.0);
if (a == NCCL_ALGO_TREE && p == NCCL_PROTO_LL) busBw *= 1.0/3.0;
if (a == NCCL_ALGO_TREE) busBw = std::min(busBw*.27, comm->nNodes > 1 ? 70.0 : 90.0);
if (a == NCCL_ALGO_TREE && p == NCCL_PROTO_LL) busBw *= 1.0/2.3;
if (a == NCCL_ALGO_TREE && p == NCCL_PROTO_LL128) busBw *= 7.0/9.0;
// Convert bus BW to algorithm BW
@@ -23,7 +23,7 @@
/* Declare all collective operations */
#define DECL_COLL5(coll, op, dtype) \
extern __device__ __attribute__((noinline)) void NCCL_COLL_NAME(coll, op, dtype)(struct CollectiveArgs* args); \
extern __global__ void NCCL_KERN_NAME(coll, op, dtype)(struct ncclColl c); \
extern __global__ void NCCL_KERN_NAME(coll, op, dtype)(struct ncclDevComm* comm); \
#define DECL_COLL4(coll, op, dtype) \
DECL_COLL5(coll, op, dtype) \
@@ -134,8 +134,8 @@ struct ncclComm {
int* intraCudaDevs;
int* intraCGMode; // Whether we can use CUDA9 CGMD or not
int* intraCC; // Only to check all have the same ComputeCap and disable CGMode if not
struct ncclColl args;
struct ncclColl* argsptr;
struct ncclDevComm* args;
struct ncclDevComm** argsptr;
// Global proxy thread
pthread_t proxyThread;
@@ -207,7 +207,6 @@ struct ncclChannel {
int collFifoHead; // Only used by GPU
int collFifoTail; // Only used by CPU
uint32_t* abortCount;
uint32_t* sync;
};
int data[0x80];
@@ -255,6 +254,27 @@ struct ncclProf {
};
#endif
#ifdef ENABLE_COLLTRACE
// Kind of event stored in a collective-trace record (ncclCollTrace::type).
typedef enum {
ncclCollTraceKernelLaunchType,
ncclCollTraceCollEndType,
ncclCollTraceAbortType
} ncclCollTraceDataType_t;

// One record of the collective-trace ring buffer. Records are written by the
// device-side trace macros and printed by the host trace thread.
struct ncclCollTrace {
// Event kind, one of ncclCollTraceDataType_t
uint8_t type;
// Block index (blockIdx.x) of the kernel block that wrote the record
uint8_t bid;
// Function index associated with the event; -1 is passed by some call sites
int16_t funcIndex;
// Event-specific payload; kernel-launch records store the HW_ID register here
uint32_t data_0;
// Value of __rtc64() at the time the record was written
uint64_t timeStamp;
// opCount of the collective being processed
uint64_t opCount;
// NOTE(review): not written by any trace macro visible here — presumably reserved/padding
uint64_t data_1;
};
// The record is expected to be exactly 32 bytes (8 ints).
static_assert(sizeof(struct ncclCollTrace) == 8*sizeof(int), "ncclCollTrace must have a pow2 size");

// Number of records in the trace ring buffer
#define COLLTRACE_NUM_ITEMS 1024
#endif
typedef enum {
ncclDevSuccess,
ncclDevAssertedMismatch,
@@ -276,6 +296,13 @@ struct ncclDevComm {
// Profiling counters
struct ncclProf* devProf;
#endif
#ifdef ENABLE_COLLTRACE
struct ncclCollTrace* collTrace;
uint32_t collTraceHead, *collTraceTail;
pthread_t collTraceThread;
bool collTraceExit;
#endif
};
#endif
@@ -131,6 +131,54 @@ void __attribute__((optimize("O0"))) commPoison(ncclComm_t comm) {
comm->rank = comm->cudaDev = comm->busId = comm->nRanks = -1;
}
#ifdef ENABLE_COLLTRACE
void *ncclCommThreadMain(void *arg) {
ncclComm_t comm = (ncclComm_t)arg;
do {
int tail = LOAD(comm->hostDevComm.collTraceTail)%COLLTRACE_NUM_ITEMS;
int head = comm->hostDevComm.collTraceHead;
int count;
if (head <= tail)
count = tail - head;
else
count = COLLTRACE_NUM_ITEMS + head - tail;
usleep(1000); //sleep 1ms
for (int i = 0; i < count; i++) {
char line[1024];
int offset = 0;
#define VEGA_GPU_RTC_FREQUENCY 2.5E7
sprintf(line, "## [%12.6f] [%02d:%02d] %06lx",
(double)(comm->hostDevComm.collTrace[head].timeStamp)/VEGA_GPU_RTC_FREQUENCY, comm->rank, comm->hostDevComm.collTrace[head].bid, comm->hostDevComm.collTrace[head].opCount);
offset = strlen(line);
switch (comm->hostDevComm.collTrace[head].type) {
case ncclCollTraceKernelLaunchType:
sprintf(line+offset, " KL hwid %8x funcIndex %d",
comm->hostDevComm.collTrace[head].data_0, comm->hostDevComm.collTrace[head].funcIndex);
break;
case ncclCollTraceCollEndType:
if (comm->hostDevComm.collTrace[head].funcIndex != -1)
sprintf(line+offset, " CE next funcIndex %d",
comm->hostDevComm.collTrace[head].funcIndex);
else
sprintf(line+offset, " KE");
break;
case ncclCollTraceAbortType:
sprintf(line+offset, " Abort");
break;
default:
sprintf(line+offset, " unknown collective trace data type");
break;
}
INFO(NCCL_COLL, "%s", line);
head ++;
head %= COLLTRACE_NUM_ITEMS;
}
comm->hostDevComm.collTraceHead = tail;
} while(!LOAD(&comm->hostDevComm.collTraceExit));
pthread_exit(NULL);
}
#endif
static ncclResult_t commFree(ncclComm_t comm) {
if (comm == NULL)
return ncclSuccess;
@@ -164,6 +212,13 @@ static ncclResult_t commFree(ncclComm_t comm) {
CUDACHECK(hipFree(comm->hostDevComm.devProf));
#endif
#ifdef ENABLE_COLLTRACE
STORE(&comm->hostDevComm.collTraceExit, 1);
if (comm->hostDevComm.collTraceThread) pthread_join(comm->hostDevComm.collTraceThread, NULL);
CUDACHECK(hipHostFree((void *)comm->hostDevComm.collTrace));
CUDACHECK(hipHostFree((void *)comm->hostDevComm.collTraceTail));
#endif
free(comm->peerInfo);
ncclTopoFree(comm->topo);
@@ -248,6 +303,17 @@ static ncclResult_t commAlloc(ncclComm_t* comret, int ndev, int rank) {
NCCLCHECK(ncclCudaCalloc(&comm->hostDevComm.devProf, 1));
#endif
#ifdef ENABLE_COLLTRACE
CUDACHECK(hipHostMalloc((void**) &comm->hostDevComm.collTraceTail, sizeof(uint32_t), hipHostMallocMapped));
CUDACHECK(hipHostMalloc((void**) &comm->hostDevComm.collTrace, sizeof(struct ncclCollTrace) * COLLTRACE_NUM_ITEMS, hipHostMallocMapped));
memset(comm->hostDevComm.collTrace, 0, sizeof(struct ncclCollTrace) * COLLTRACE_NUM_ITEMS);
comm->hostDevComm.collTraceExit = comm->hostDevComm.collTraceHead = *comm->hostDevComm.collTraceTail = 0;
if ((ncclDebugLevel >= NCCL_LOG_INFO) && (ncclDebugMask & NCCL_COLL))
pthread_create(&comm->hostDevComm.collTraceThread, NULL, ncclCommThreadMain, (void *)comm);
else
comm->hostDevComm.collTraceThread = 0;
#endif
*comret = comm;
return ncclSuccess;
}
@@ -64,11 +64,15 @@ static ncclResult_t netGetGdrSupport(struct ncclTopoSystem* topo, int64_t busId,
if (read) { // For reads (sends) only enable under certain conditions
int gdrReadParam = ncclParamNetGdrRead();
if (gdrReadParam == 0) return ncclSuccess;
#if defined(__HIP_PLATFORM_HCC__) || defined(__HCC__) || defined(__HIPCC__)
return ncclSuccess;
#else
if (gdrReadParam < 0) {
int nvlink;
NCCLCHECK(ncclTopoHasNvlink(topo, busId, &nvlink));
if (!nvlink) return ncclSuccess;
}
#endif
}
// Check if we are close enough that it makes sense to enable GDR
@@ -234,7 +238,7 @@ ncclResult_t netSendProxy(struct ncclProxyArgs* args) {
struct netSendResources* resources = (struct netSendResources*) (args->connector->transportResources);
if (args->state == ncclProxyOpReady) {
// Update opCount
resources->hostRecvMem->opCount = args->opCount;
STORE(&resources->hostRecvMem->opCount, args->opCount);
// Round to next multiple of sliceSteps
resources->step = ROUNDUP(resources->step, args->chunkSteps);
@@ -251,9 +255,9 @@ ncclResult_t netSendProxy(struct ncclProxyArgs* args) {
volatile uint64_t* recvTail = &resources->hostRecvMem->tail;
if (args->protocol == NCCL_PROTO_LL128) {
int stepSize = NCCL_LL128_BUFF_SIZE/NCCL_STEPS;
if (args->tail < *recvTail) {
if (args->tail < LOAD(recvTail)) {
int buffSlot = args->tail%NCCL_STEPS;
if (sizesFifo[buffSlot] != -1) {
if (LOAD(sizesFifo+buffSlot) != -1) {
struct ncclRecvMem* localMem = resources->useGdr ? resources->devRecvMem : resources->hostRecvMem;
char* localBuff = (char*)localMem->ll128Buff;
int ready = resources->useGdr;
@@ -261,18 +265,18 @@ ncclResult_t netSendProxy(struct ncclProxyArgs* args) {
// When data is in sysmem, we need to wait until all flags are correct since the GPU only
// called threadfence()
uint64_t flag = args->tail + 1;
int nFifoLines = DIVUP(sizesFifo[buffSlot], sizeof(uint64_t)*NCCL_LL128_LINEELEMS);
int nFifoLines = DIVUP(LOAD(sizesFifo+buffSlot), sizeof(uint64_t)*NCCL_LL128_LINEELEMS);
volatile uint64_t* lines = (volatile uint64_t*)(localBuff+buffSlot*stepSize);
ready = 1;
for (int i=0; i<nFifoLines; i++) {
if (lines[i*NCCL_LL128_LINEELEMS+NCCL_LL128_DATAELEMS] != flag) { ready = 0; break; }
if (LOAD(lines+i*NCCL_LL128_LINEELEMS+NCCL_LL128_DATAELEMS) != flag) { ready = 0; break; }
}
}
if (ready) {
// Send through network
NCCLCHECK(ncclNetIsend(resources->netSendComm, localBuff+buffSlot*stepSize, sizesFifo[buffSlot], resources->ll128Mhandle, args->requests+buffSlot));
NCCLCHECK(ncclNetIsend(resources->netSendComm, localBuff+buffSlot*stepSize, LOAD(sizesFifo+buffSlot), resources->ll128Mhandle, args->requests+buffSlot));
if (args->requests[buffSlot] != NULL) {
sizesFifo[buffSlot] = -1;
STORE(sizesFifo+buffSlot, -1);
// Make sure size is reset to zero before we update the head.
__sync_synchronize();
args->tail += args->sliceSteps;
@@ -311,7 +315,7 @@ ncclResult_t netSendProxy(struct ncclProxyArgs* args) {
struct ncclRecvMem* localMem = resources->useGdr ? resources->devRecvMem : resources->hostRecvMem;
// Send through network
int buffSlot = args->tail%NCCL_STEPS;
if (sizesFifo[buffSlot] != -1) {
if (LOAD(sizesFifo+buffSlot) != -1) {
NCCLCHECK(ncclNetIsend(resources->netSendComm, localMem->buff+buffSlot*stepSize, LOAD(sizesFifo+buffSlot), resources->mhandle, args->requests+buffSlot));
if (args->requests[buffSlot] != NULL) {
STORE(sizesFifo+buffSlot, -1);
@@ -347,7 +351,7 @@ ncclResult_t netRecvProxy(struct ncclProxyArgs* args) {
struct netRecvResources* resources = (struct netRecvResources*) (args->connector->transportResources);
if (args->state == ncclProxyOpReady) {
// Update opCount
resources->hostSendMem->opCount = args->opCount;
STORE(&resources->hostSendMem->opCount, args->opCount);
// Round to next multiple of chunkSteps
resources->step = ROUNDUP(resources->step, args->chunkSteps);
@@ -51,7 +51,7 @@ NCCL_PARAM(P2pLevel, "P2P_LEVEL", -2);
NCCL_PARAM(P2pDisable, "P2P_DISABLE", -2);
/* Convert a PCI busId string into a local cudaDev device index (cf. CUDA_VISIBLE_DEVICES) */
static int busIdToCudaDev(int64_t busId) {
int busIdToCudaDev(int64_t busId) {
int ndev;
if (hipGetDeviceCount(&ndev) != hipSuccess)
return -1;
@@ -71,10 +71,14 @@ static int busIdToCudaDev(int64_t busId) {
ncclResult_t p2pCanConnect(int* ret, struct ncclTopoSystem* topo, struct ncclTopoGraph* graph, struct ncclPeerInfo* info1, struct ncclPeerInfo* info2) {
int cpuCount;
NCCLCHECK(ncclTopoCpuCount(topo, &cpuCount));
#if defined(__HIP_PLATFORM_HCC__) || defined(__HCC__) || defined(__HIPCC__)
int p2pLevel = PATH_ARRAY_SIZE;
#else
// Do not use P2P across sockets by default (provided CUDA permits it).
// When we are on a single socket, don't even use P2P through the CPU as
// it should be able to sustain two flows to sysmem faster than PCI P2P.
int p2pLevel = cpuCount == 1 ? PATH_PHB : PATH_NODE;
#endif
if (ncclParamP2pDisable() == 1) p2pLevel = 0;
if (ncclParamP2pLevel() != -2) p2pLevel = ncclParamP2pLevel();
@@ -5,7 +5,7 @@
************************************************************************/
#include "test_AllReduceAbort.hpp"
#include "../include/core.h"
#include "../include/comm.h"
#include <omp.h>
#define NUM_ITER 8
@@ -41,31 +41,24 @@ namespace CorrectnessTests
hipStream_t stream;
HIPCHECK(hipStreamCreateWithFlags(&stream, hipStreamNonBlocking));
struct ncclChannel* channel = comm->channels;
struct ncclRing *ring = &channel->ring;
struct ncclConnector* send = &channel->peers[ring->next].send;
size_t op_offset = &(send->conn.opCountRem) - (uint64_t **)channel->peers;
size_t head_offset = &(send->conn.head) - (uint64_t **)channel->peers;
uint64_t **p_dev_opCount = (uint64_t **)(channel->devPeers) + op_offset;
uint64_t **p_dev_head = (uint64_t **)(channel->devPeers) + head_offset;
uint64_t **p_dev_opCount = (uint64_t **)((uint8_t*)(channel->devPeers + channel->ring.next) + offsetof(struct ncclPeer, send.conn.opCountRem));
uint64_t **p_dev_head = (uint64_t **)((uint8_t*)(channel->devPeers + channel->ring.next) + offsetof(struct ncclPeer, send.conn.head));
uint64_t *real_opCount, *fake_opCount, *fake_o;
uint64_t *real_head, *fake_head, *fake_h;
// get original opCount and head
HIPCHECK(hipMemcpyAsync(&real_opCount, p_dev_opCount, sizeof(uint64_t*), hipMemcpyDeviceToHost, stream));
HIPCHECK(hipMemcpyAsync(&real_head, p_dev_head, sizeof(uint64_t*), hipMemcpyDeviceToHost, stream));
HIPCHECK(hipStreamSynchronize(stream));
HIPCHECK(hipMemcpy(&real_opCount, p_dev_opCount, sizeof(uint64_t*), hipMemcpyDefault));
HIPCHECK(hipMemcpy(&real_head, p_dev_head, sizeof(uint64_t*), hipMemcpyDefault));
// allocate and install fakes
HIPCHECK(hipHostMalloc(&fake_opCount, sizeof(uint64_t*), hipHostMallocMapped));
HIPCHECK(hipMemcpyAsync(p_dev_opCount, &fake_opCount, sizeof(uint64_t*), hipMemcpyHostToDevice, stream));
HIPCHECK(hipMemcpy(p_dev_opCount, &fake_opCount, sizeof(uint64_t*), hipMemcpyDefault));
*fake_opCount = FAKE_OP_COUNT;
HIPCHECK(hipHostMalloc(&fake_head, sizeof(uint64_t*), hipHostMallocMapped));
HIPCHECK(hipMemcpyAsync(p_dev_head, &fake_head, sizeof(uint64_t*), hipMemcpyHostToDevice, stream));
HIPCHECK(hipMemcpy(p_dev_head, &fake_head, sizeof(uint64_t*), hipMemcpyDefault));
*fake_head = 0;
HIPCHECK(hipStreamSynchronize(stream));
// read back fakes to confirm
HIPCHECK(hipMemcpyAsync(&fake_o, p_dev_opCount, sizeof(uint64_t*), hipMemcpyDeviceToHost, stream));
HIPCHECK(hipMemcpyAsync(&fake_h, p_dev_head, sizeof(uint64_t*), hipMemcpyDeviceToHost, stream));
HIPCHECK(hipStreamSynchronize(stream));
HIPCHECK(hipMemcpy(&fake_o, p_dev_opCount, sizeof(uint64_t*), hipMemcpyDefault));
HIPCHECK(hipMemcpy(&fake_h, p_dev_head, sizeof(uint64_t*), hipMemcpyDefault));
//std::cerr << "[ ] replaced gpu " << gpu << " real_opCount = " << real_opCount << " to fake_opCount = " << fake_o << std::endl;
//std::cerr << "[ ] replaced gpu " << gpu << " real_head = " << real_head << " to fake_head = " << fake_h << std::endl;
@@ -5,7 +5,7 @@
************************************************************************/
#include "test_BroadcastAbort.hpp"
#include "../include/core.h"
#include "../include/comm.h"
#include <omp.h>
#define NUM_ITER 8
@@ -42,31 +42,24 @@ namespace CorrectnessTests
hipStream_t stream;
HIPCHECK(hipStreamCreateWithFlags(&stream, hipStreamNonBlocking));
struct ncclChannel* channel = comm->channels;
struct ncclRing *ring = &channel->ring;
struct ncclConnector* send = &channel->peers[ring->next].send;
size_t op_offset = &(send->conn.opCountRem) - (uint64_t **)channel->peers;
size_t head_offset = &(send->conn.head) - (uint64_t **)channel->peers;
uint64_t **p_dev_opCount = (uint64_t **)(channel->devPeers) + op_offset;
uint64_t **p_dev_head = (uint64_t **)(channel->devPeers) + head_offset;
uint64_t **p_dev_opCount = (uint64_t **)((uint8_t*)(channel->devPeers + channel->ring.next) + offsetof(struct ncclPeer, send.conn.opCountRem));
uint64_t **p_dev_head = (uint64_t **)((uint8_t*)(channel->devPeers + channel->ring.next) + offsetof(struct ncclPeer, send.conn.head));
uint64_t *real_opCount, *fake_opCount, *fake_o;
uint64_t *real_head, *fake_head, *fake_h;
// get original opCount and head
HIPCHECK(hipMemcpyAsync(&real_opCount, p_dev_opCount, sizeof(uint64_t*), hipMemcpyDeviceToHost, stream));
HIPCHECK(hipMemcpyAsync(&real_head, p_dev_head, sizeof(uint64_t*), hipMemcpyDeviceToHost, stream));
HIPCHECK(hipStreamSynchronize(stream));
HIPCHECK(hipMemcpy(&real_opCount, p_dev_opCount, sizeof(uint64_t*), hipMemcpyDefault));
HIPCHECK(hipMemcpy(&real_head, p_dev_head, sizeof(uint64_t*), hipMemcpyDefault));
// allocate and install fakes
HIPCHECK(hipHostMalloc(&fake_opCount, sizeof(uint64_t*), hipHostMallocMapped));
HIPCHECK(hipMemcpyAsync(p_dev_opCount, &fake_opCount, sizeof(uint64_t*), hipMemcpyHostToDevice, stream));
HIPCHECK(hipMemcpy(p_dev_opCount, &fake_opCount, sizeof(uint64_t*), hipMemcpyDefault));
*fake_opCount = FAKE_OP_COUNT;
HIPCHECK(hipHostMalloc(&fake_head, sizeof(uint64_t*), hipHostMallocMapped));
HIPCHECK(hipMemcpyAsync(p_dev_head, &fake_head, sizeof(uint64_t*), hipMemcpyHostToDevice, stream));
HIPCHECK(hipMemcpy(p_dev_head, &fake_head, sizeof(uint64_t*), hipMemcpyDefault));
*fake_head = 0;
HIPCHECK(hipStreamSynchronize(stream));
// read back fakes to confirm
HIPCHECK(hipMemcpyAsync(&fake_o, p_dev_opCount, sizeof(uint64_t*), hipMemcpyDeviceToHost, stream));
HIPCHECK(hipMemcpyAsync(&fake_h, p_dev_head, sizeof(uint64_t*), hipMemcpyDeviceToHost, stream));
HIPCHECK(hipStreamSynchronize(stream));
HIPCHECK(hipMemcpy(&fake_o, p_dev_opCount, sizeof(uint64_t*), hipMemcpyDefault));
HIPCHECK(hipMemcpy(&fake_h, p_dev_head, sizeof(uint64_t*), hipMemcpyDefault));
//std::cerr << "[ ] replaced gpu " << gpu << " real_opCount = " << real_opCount << " to fake_opCount = " << fake_o << std::endl;
//std::cerr << "[ ] replaced gpu " << gpu << " real_head = " << real_head << " to fake_head = " << fake_h << std::endl;
@@ -34,7 +34,7 @@ THE SOFTWARE.
#include "copy_kernel.h"
#define MAX_GPU 8
#define MAX_WORKGROUPS 16
#define MAX_WORKGROUPS 32
#define THREADS 256
#define COPY_UNROLL 4
@@ -56,6 +56,16 @@ THE SOFTWARE.
#define RTC_CLOCK_FREQ_MI100 2.5E07
#define RTC_CLOCK_FREQ_DEFAULT 2.7E07
// Returns a 64-bit device timestamp converted to a signed long long.
// HIP builds read it through __builtin_amdgcn_s_memrealtime(); every other
// build falls back to __clock_u64().
// NOTE(review): presumably this counter ticks at the RTC_CLOCK_FREQ_* rates
// defined in this file rather than at the shader clock -- confirm.
__device__
inline __attribute((always_inline))
long long int __rtc64() {
#if __HIP__
  const auto ticks = __builtin_amdgcn_s_memrealtime();
#else
  const auto ticks = __clock_u64();
#endif
  return static_cast<long long int>(ticks);
}
struct transfer_data_t {
float *dest0[MAX_WORKGROUPS]; //remote fine grain
float *src0[MAX_WORKGROUPS]; //local fine grain
@@ -116,7 +126,7 @@ __global__ void flag_sync_kernel(struct transfer_data_t* transfer_data, struct p
__syncthreads();
if (idx == 0) {
curr_time = clock64();
curr_time = __rtc64();
}
if (op == OP_COPY) Copy<COPY_UNROLL, THREADS, float>(transfer_data->dest0[bid], transfer_data->src0[bid], n);
@@ -129,7 +139,7 @@ __global__ void flag_sync_kernel(struct transfer_data_t* transfer_data, struct p
if (op == OP_READ) Copy<COPY_UNROLL, THREADS, float>(transfer_data->src0[bid],transfer_data->dest0[bid], n);
__syncthreads();
if (idx == 0) {
next_time = clock64();
next_time = __rtc64();
__atomic_fetch_add(&(profiling_data->write_cycles[bid]), next_time - curr_time, __ATOMIC_SEQ_CST);
__atomic_fetch_add(&(profiling_data->bytes_transferred[bid]), n * sizeof(float), __ATOMIC_SEQ_CST);
}