Add Feature - Add NPKit Support in RCCL (#564)

* apply npkit

* fix bug

* add npkit in readme
This commit is contained in:
Ziyue Yang
2022-06-21 05:30:19 +08:00
zatwierdzone przez GitHub
rodzic f274c865c1
commit 6e93fafdc3
14 zmienionych plików z 1236 dodań i 8 usunięć
+5
Wyświetl plik
@@ -188,6 +188,7 @@ set(CC_SOURCES
src/misc/nvmlwrap_stub.cc
src/misc/rocm_smi_wrap.cc
src/misc/profiler.cc
src/misc/npkit.cc
src/misc/shmutils.cc
src/misc/signals.cc # RCCL
src/misc/socket.cc
@@ -220,6 +221,10 @@ if(PROFILE)
add_definitions(-DENABLE_PROFILING)
endif()
if(NPKIT_FLAGS)
add_definitions(${NPKIT_FLAGS})
endif()
set(COLLTRACE 1 CACHE BOOL "Collective Trace Option")
if(COLLTRACE)
add_definitions(-DENABLE_COLLTRACE)
+12
Wyświetl plik
@@ -83,6 +83,18 @@ will run only AllReduce correctness tests with float32 datatype. See "Running a
There are also other performance and error-checking tests for RCCL. These are maintained separately at https://github.com/ROCmSoftwarePlatform/rccl-tests.
See the rccl-tests README for more information on how to build and run those tests.
## NPKit
RCCL integrates [NPKit](https://github.com/microsoft/npkit), a profiler framework that enables collecting fine-grained trace events in RCCL components, especially in giant collective GPU kernels.
Please check [NPKit sample workflow for RCCL](https://github.com/microsoft/NPKit/tree/main/rccl_samples) as a fully automated usage example. It also provides good templates for the following manual instructions.
To manually build RCCL with NPKit enabled, pass `-DNPKIT_FLAGS="-DENABLE_NPKIT -DENABLE_NPKIT_...(other NPKit compile-time switches)"` with cmake command. All NPKit compile-time switches are declared in the RCCL code base as macros with prefix `ENABLE_NPKIT_`, and they control which information will be collected. Also note that currently NPKit only supports collecting non-overlapped events on GPU, and `-DNPKIT_FLAGS` should follow this rule.
To manually run RCCL with NPKit enabled, environment variable `NPKIT_DUMP_DIR` needs to be set as the NPKit event dump directory. Also note that currently NPKit only supports 1 GPU per process.
To manually analyze NPKit dump results, please leverage [npkit_trace_generator.py](https://github.com/microsoft/NPKit/blob/main/rccl_samples/npkit_trace_generator.py).
## Library and API Documentation
Please refer to the [Library documentation](https://rccl.readthedocs.io/) for current documentation.
+315
Wyświetl plik
@@ -10,6 +10,10 @@
#include "primitives.h"
//#include "clique/AllReduceCliqueKernel.h" // [RCCL] AllReduce Clique-based kernel support
#if defined(ENABLE_NPKIT)
#include "npkit/npkit.h"
#endif
namespace {
template<typename T, typename RedOp, typename Proto>
__device__ __attribute__((noinline)) void runRing(ncclWorkElem *args) {
@@ -29,6 +33,32 @@ namespace {
#endif
const ssize_t size = args->count;
#if defined(ENABLE_NPKIT)
int npKitCtxIdx = bid;
#endif
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_TIME_SYNC_CPU)
if (tid == 0) {
uint64_t* cpuTimestamp = ncclShmem->comm.cpuTimestamp;
NpKit::CollectGpuEvent(NPKIT_EVENT_TIME_SYNC_CPU, 0, 0, *cpuTimestamp,
ncclShmem->comm.npKitEventCollectContexts + npKitCtxIdx);
}
#endif
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_TIME_SYNC_GPU)
if (tid == 0) {
NpKit::CollectGpuEvent(NPKIT_EVENT_TIME_SYNC_GPU, 0, 0, __builtin_amdgcn_s_memrealtime(),
ncclShmem->comm.npKitEventCollectContexts + npKitCtxIdx);
}
#endif
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_ALL_REDUCE_RING_ENTRY)
if (tid == 0) {
NpKit::CollectGpuEvent(NPKIT_EVENT_ALL_REDUCE_RING_ENTRY, size*sizeof(T), 0, __builtin_amdgcn_s_memrealtime(),
ncclShmem->comm.npKitEventCollectContexts + npKitCtxIdx);
}
#endif
int minChunkSize;
if (Proto::Id == NCCL_PROTO_LL)
minChunkSize = nthreads*(Proto::calcBytePerGrain()/sizeof(T));
@@ -42,6 +72,12 @@ namespace {
(tid, nthreads, &ring->prev, &ring->next, args->sendbuff, args->recvbuff, args->redOpArg, args->connIndex << 16);
ACCUMULATE_PRIM_COUNTER(prim);
#if defined(ENABLE_NPKIT)
if (tid == 0) {
prims.npKitCtxIdx = npKitCtxIdx;
}
#endif
for (ssize_t gridOffset = 0; gridOffset < size; gridOffset += loopSize) {
ssize_t realChunkSize;
if (Proto::Id == NCCL_PROTO_SIMPLE) {
@@ -70,11 +106,36 @@ namespace {
chunk = modRanks(ringIx + nranks-1);
offset = calcOffset(chunk);
nelem = min(realChunkSize, size-offset);
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_ALL_REDUCE_RING_SEND_ENTRY)
if (tid == 0) {
NpKit::CollectGpuEvent(NPKIT_EVENT_ALL_REDUCE_RING_SEND_ENTRY, nelem*sizeof(T), 0, __builtin_amdgcn_s_memrealtime(),
ncclShmem->comm.npKitEventCollectContexts + npKitCtxIdx);
prims.npKitDataProcessTotalTime = 0;
}
#endif
INIT_COUNTER;
prims.send(offset, nelem);
ACCUMULATE_COUNTER(send);
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_ALL_REDUCE_RING_SEND_EXIT)
if (tid == 0) {
NpKit::CollectGpuEvent(NPKIT_EVENT_ALL_REDUCE_RING_SEND_EXIT, nelem*sizeof(T), prims.npKitDataProcessTotalTime, __builtin_amdgcn_s_memrealtime(),
ncclShmem->comm.npKitEventCollectContexts + npKitCtxIdx);
}
#endif
// k-2 steps: reduce and copy to next GPU
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_ALL_REDUCE_RING_RECV_REDUCE_SEND_ENTRY)
if (tid == 0 && nranks > 2) {
NpKit::CollectGpuEvent(NPKIT_EVENT_ALL_REDUCE_RING_RECV_REDUCE_SEND_ENTRY, nelem*(nranks-2)*sizeof(T), 0, __builtin_amdgcn_s_memrealtime(),
ncclShmem->comm.npKitEventCollectContexts + npKitCtxIdx);
prims.npKitDataProcessTotalTime = 0;
}
#endif
for (int j=2; j<nranks; ++j) {
chunk = modRanks(ringIx + nranks-j);
offset = calcOffset(chunk);
@@ -84,15 +145,46 @@ namespace {
ACCUMULATE_COUNTER(recvReduceSend);
}
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_ALL_REDUCE_RING_RECV_REDUCE_SEND_EXIT)
if (tid == 0 && nranks > 2) {
NpKit::CollectGpuEvent(NPKIT_EVENT_ALL_REDUCE_RING_RECV_REDUCE_SEND_EXIT, nelem*(nranks-2)*sizeof(T), prims.npKitDataProcessTotalTime, __builtin_amdgcn_s_memrealtime(),
ncclShmem->comm.npKitEventCollectContexts + npKitCtxIdx);
}
#endif
// step k-1: reduce this buffer and data, which will produce the final
// result that we store in this data and push to the next GPU
chunk = ringIx + 0;
offset = calcOffset(chunk);
nelem = min(realChunkSize, size-offset);
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_ALL_REDUCE_RING_DIRECT_RECV_REDUCE_COPY_SEND_ENTRY)
if (tid == 0) {
NpKit::CollectGpuEvent(NPKIT_EVENT_ALL_REDUCE_RING_DIRECT_RECV_REDUCE_COPY_SEND_ENTRY, nelem*sizeof(T), 0, __builtin_amdgcn_s_memrealtime(),
ncclShmem->comm.npKitEventCollectContexts + npKitCtxIdx);
prims.npKitDataProcessTotalTime = 0;
}
#endif
INIT_COUNTER;
prims.directRecvReduceCopySend(offset, offset, offset, nelem, /*postOp=*/true);
ACCUMULATE_COUNTER(directRecvReduceCopySend);
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_ALL_REDUCE_RING_DIRECT_RECV_REDUCE_COPY_SEND_EXIT)
if (tid == 0) {
NpKit::CollectGpuEvent(NPKIT_EVENT_ALL_REDUCE_RING_DIRECT_RECV_REDUCE_COPY_SEND_EXIT, nelem*sizeof(T), prims.npKitDataProcessTotalTime, __builtin_amdgcn_s_memrealtime(),
ncclShmem->comm.npKitEventCollectContexts + npKitCtxIdx);
}
#endif
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_ALL_REDUCE_RING_DIRECT_RECV_COPY_SEND_ENTRY)
if (tid == 0 && nranks > 2) {
NpKit::CollectGpuEvent(NPKIT_EVENT_ALL_REDUCE_RING_DIRECT_RECV_COPY_SEND_ENTRY, nelem*(nranks-2)*sizeof(T), 0, __builtin_amdgcn_s_memrealtime(),
ncclShmem->comm.npKitEventCollectContexts + npKitCtxIdx);
prims.npKitDataProcessTotalTime = 0;
}
#endif
// k-2 steps: copy to next GPU
for (int j=1; j<nranks-1; ++j) {
chunk = modRanks(ringIx + nranks-j);
@@ -103,13 +195,37 @@ namespace {
ACCUMULATE_COUNTER(directRecvCopySend);
}
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_ALL_REDUCE_RING_DIRECT_RECV_COPY_SEND_EXIT)
if (tid == 0 && nranks > 2) {
NpKit::CollectGpuEvent(NPKIT_EVENT_ALL_REDUCE_RING_DIRECT_RECV_COPY_SEND_EXIT, nelem*(nranks-2)*sizeof(T), prims.npKitDataProcessTotalTime, __builtin_amdgcn_s_memrealtime(),
ncclShmem->comm.npKitEventCollectContexts + npKitCtxIdx);
}
#endif
// Make final copy from buffer to dest.
chunk = modRanks(ringIx + 1);
offset = calcOffset(chunk);
nelem = min(realChunkSize, size-offset);
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_ALL_REDUCE_RING_DIRECT_RECV_ENTRY)
if (tid == 0) {
NpKit::CollectGpuEvent(NPKIT_EVENT_ALL_REDUCE_RING_DIRECT_RECV_ENTRY, nelem*sizeof(T), 0, __builtin_amdgcn_s_memrealtime(),
ncclShmem->comm.npKitEventCollectContexts + npKitCtxIdx);
prims.npKitDataProcessTotalTime = 0;
}
#endif
INIT_COUNTER;
prims.directRecv(offset, nelem);
ACCUMULATE_COUNTER(directRecv);
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_ALL_REDUCE_RING_DIRECT_RECV_EXIT)
if (tid == 0) {
NpKit::CollectGpuEvent(NPKIT_EVENT_ALL_REDUCE_RING_DIRECT_RECV_EXIT, nelem*sizeof(T), prims.npKitDataProcessTotalTime, __builtin_amdgcn_s_memrealtime(),
ncclShmem->comm.npKitEventCollectContexts + npKitCtxIdx);
}
#endif
}
#ifdef ENABLE_PROFILING
if (tid == 0) {
@@ -117,6 +233,14 @@ namespace {
elem->elem[blockIdx.x].total_cycle += (__builtin_amdgcn_s_memrealtime() - clk);
}
#endif
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_ALL_REDUCE_RING_EXIT)
if (tid == 0) {
NpKit::CollectGpuEvent(NPKIT_EVENT_ALL_REDUCE_RING_EXIT, size*sizeof(T), 0, __builtin_amdgcn_s_memrealtime(),
ncclShmem->comm.npKitEventCollectContexts + npKitCtxIdx);
}
#endif
}
template<typename T, typename RedOp, typename Proto>
@@ -135,12 +259,53 @@ namespace {
const ssize_t loopSize = int(nChannels*chunkSize);
const ssize_t size = args->count;
#if defined(ENABLE_NPKIT)
int npKitCtxIdx = bid;
#endif
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_TIME_SYNC_CPU)
if (tid == 0) {
uint64_t* cpuTimestamp = ncclShmem->comm.cpuTimestamp;
NpKit::CollectGpuEvent(NPKIT_EVENT_TIME_SYNC_CPU, 0, 0, *cpuTimestamp,
ncclShmem->comm.npKitEventCollectContexts + npKitCtxIdx);
}
#endif
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_TIME_SYNC_GPU)
if (tid == 0) {
NpKit::CollectGpuEvent(NPKIT_EVENT_TIME_SYNC_GPU, 0, 0, __builtin_amdgcn_s_memrealtime(),
ncclShmem->comm.npKitEventCollectContexts + npKitCtxIdx);
}
#endif
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_ALL_REDUCE_TREE_UPDOWN_ENTRY)
if (tid == 0) {
NpKit::CollectGpuEvent(NPKIT_EVENT_ALL_REDUCE_TREE_UPDOWN_ENTRY, size*sizeof(T), 0, __builtin_amdgcn_s_memrealtime(),
ncclShmem->comm.npKitEventCollectContexts + npKitCtxIdx);
}
#endif
if (loopSize > size)
chunkSize = divUp((int)size, int(nChannels*minChunkSize))*int(minChunkSize);
{ // Reduce : max number of recv is 3, max number of send is 1 (binary tree + local)
Primitives<T, RedOp, FanAsymmetric<NCCL_MAX_DEV_ARITY, 1>, /*Direct=*/0, Proto, 0> prims
(tid, nthreads, tree->down, &tree->up, args->sendbuff, args->recvbuff, args->redOpArg);
#if defined(ENABLE_NPKIT)
if (tid == 0) {
prims.npKitCtxIdx = npKitCtxIdx;
}
#endif
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_ALL_REDUCE_TREE_UPDOWN_REDUCE_ENTRY)
if (tid == 0) {
NpKit::CollectGpuEvent(NPKIT_EVENT_ALL_REDUCE_TREE_UPDOWN_REDUCE_ENTRY, size*sizeof(T), 0, __builtin_amdgcn_s_memrealtime(),
ncclShmem->comm.npKitEventCollectContexts + npKitCtxIdx);
prims.npKitDataProcessTotalTime = 0;
}
#endif
if (tree->up == -1) {
for (ssize_t gridOffset = 0; gridOffset < size; gridOffset += loopSize) {
ssize_t offset = gridOffset + bid*int(chunkSize);
@@ -162,11 +327,34 @@ namespace {
prims.recvReduceSend(offset, nelem);
}
}
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_ALL_REDUCE_TREE_UPDOWN_REDUCE_EXIT)
if (tid == 0) {
NpKit::CollectGpuEvent(NPKIT_EVENT_ALL_REDUCE_TREE_UPDOWN_REDUCE_EXIT, size*sizeof(T), prims.npKitDataProcessTotalTime, __builtin_amdgcn_s_memrealtime(),
ncclShmem->comm.npKitEventCollectContexts + npKitCtxIdx);
}
#endif
}
{ // Broadcast : max number of recv is 1, max number of send is 3 (binary tree + local)
Primitives<T, RedOp, FanAsymmetric<1, NCCL_MAX_DEV_ARITY>, /*Direct=*/0, Proto, 0> prims
(tid, nthreads, &tree->up, tree->down, args->sendbuff, args->recvbuff, args->redOpArg);
#if defined(ENABLE_NPKIT)
if (tid == 0) {
prims.npKitCtxIdx = npKitCtxIdx;
}
#endif
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_ALL_REDUCE_TREE_UPDOWN_BROADCAST_ENTRY)
if (tid == 0) {
NpKit::CollectGpuEvent(NPKIT_EVENT_ALL_REDUCE_TREE_UPDOWN_BROADCAST_ENTRY, size*sizeof(T), 0, __builtin_amdgcn_s_memrealtime(),
ncclShmem->comm.npKitEventCollectContexts + npKitCtxIdx);
prims.npKitDataProcessTotalTime = 0;
}
#endif
if (tree->up == -1) {
for (ssize_t gridOffset = 0; gridOffset < size; gridOffset += loopSize) {
ssize_t offset = gridOffset + bid*int(chunkSize);
@@ -188,7 +376,23 @@ namespace {
prims.directRecvCopySend(offset, offset, nelem);
}
}
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_ALL_REDUCE_TREE_UPDOWN_BROADCAST_EXIT)
if (tid == 0) {
NpKit::CollectGpuEvent(NPKIT_EVENT_ALL_REDUCE_TREE_UPDOWN_BROADCAST_EXIT, size*sizeof(T), prims.npKitDataProcessTotalTime, __builtin_amdgcn_s_memrealtime(),
ncclShmem->comm.npKitEventCollectContexts + npKitCtxIdx);
}
#endif
}
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_ALL_REDUCE_TREE_UPDOWN_EXIT)
if (tid == 0) {
NpKit::CollectGpuEvent(NPKIT_EVENT_ALL_REDUCE_TREE_UPDOWN_EXIT, size*sizeof(T), 0, __builtin_amdgcn_s_memrealtime(),
ncclShmem->comm.npKitEventCollectContexts + npKitCtxIdx);
}
#endif
}
template<typename T, typename RedOp, typename Proto>
@@ -217,6 +421,40 @@ namespace {
nthreadsSplit = (nthreads*7/(10*WARP_SIZE))*WARP_SIZE;
}
#if defined(ENABLE_NPKIT)
bool isNpKitThread = false;
int npKitCtxIdx = 0;
if (threadIdx.x == 0) {
isNpKitThread = true;
npKitCtxIdx = bid * 2;
} else if (tree->up != -1 && threadIdx.x == nthreadsSplit) {
isNpKitThread = true;
npKitCtxIdx = bid * 2 + 1;
}
#endif
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_TIME_SYNC_CPU)
if (isNpKitThread) {
uint64_t* cpuTimestamp = ncclShmem->comm.cpuTimestamp;
NpKit::CollectGpuEvent(NPKIT_EVENT_TIME_SYNC_CPU, 0, 0, *cpuTimestamp,
ncclShmem->comm.npKitEventCollectContexts + npKitCtxIdx);
}
#endif
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_TIME_SYNC_GPU)
if (isNpKitThread) {
NpKit::CollectGpuEvent(NPKIT_EVENT_TIME_SYNC_GPU, 0, 0, __builtin_amdgcn_s_memrealtime(),
ncclShmem->comm.npKitEventCollectContexts + npKitCtxIdx);
}
#endif
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_ALL_REDUCE_TREE_SPLIT_ENTRY)
if (isNpKitThread) {
NpKit::CollectGpuEvent(NPKIT_EVENT_ALL_REDUCE_TREE_SPLIT_ENTRY, size*sizeof(T), 0, __builtin_amdgcn_s_memrealtime(),
ncclShmem->comm.npKitEventCollectContexts + npKitCtxIdx);
}
#endif
if (loopSize > size)
chunkSize = divUp((int)size, nChannels*int(minChunkSize))*int(minChunkSize);
@@ -224,11 +462,34 @@ namespace {
// Reduce and broadcast. Max number of recv is 3, max number of send is 3
Primitives<T, RedOp, FanSymmetric<NCCL_MAX_DEV_ARITY>, /*Direct=*/0, Proto, 0>
prims(tid, nthreads, tree->down, tree->down, args->sendbuff, args->recvbuff, args->redOpArg);
#if defined(ENABLE_NPKIT)
if (isNpKitThread) {
prims.npKitCtxIdx = npKitCtxIdx;
}
#endif
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_ALL_REDUCE_TREE_SPLIT_REDUCE_BROADCAST_ENTRY)
if (isNpKitThread) {
NpKit::CollectGpuEvent(NPKIT_EVENT_ALL_REDUCE_TREE_SPLIT_REDUCE_BROADCAST_ENTRY, size*sizeof(T), 0, __builtin_amdgcn_s_memrealtime(),
ncclShmem->comm.npKitEventCollectContexts + npKitCtxIdx);
prims.npKitDataProcessTotalTime = 0;
}
#endif
for (ssize_t gridOffset = 0; gridOffset < size; gridOffset += loopSize) {
ssize_t offset = gridOffset + bid*int(chunkSize);
int nelem = min(chunkSize, size-offset);
prims.directRecvReduceCopySend(offset, offset, offset, nelem, /*doPost=*/true);
}
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_ALL_REDUCE_TREE_SPLIT_REDUCE_BROADCAST_EXIT)
if (isNpKitThread) {
NpKit::CollectGpuEvent(NPKIT_EVENT_ALL_REDUCE_TREE_SPLIT_REDUCE_BROADCAST_EXIT, size*sizeof(T), prims.npKitDataProcessTotalTime, __builtin_amdgcn_s_memrealtime(),
ncclShmem->comm.npKitEventCollectContexts + npKitCtxIdx);
}
#endif
}
else if (tid < nthreadsSplit) {
/* Reduce up. Max number of recv is 3, max number of send is 1 (binary tree + local).
@@ -241,6 +502,21 @@ namespace {
*/
Primitives<T, RedOp, FanAsymmetric<NCCL_MAX_DEV_ARITY, 1>, /*Direct=*/0, Proto, 0>
prims(tid, nthreadsSplit, tree->down, &tree->up, args->sendbuff, args->recvbuff, args->redOpArg, 0*Proto::MaxGroupWidth);
#if defined(ENABLE_NPKIT)
if (isNpKitThread) {
prims.npKitCtxIdx = npKitCtxIdx;
}
#endif
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_ALL_REDUCE_TREE_SPLIT_REDUCE_ENTRY)
if (isNpKitThread) {
NpKit::CollectGpuEvent(NPKIT_EVENT_ALL_REDUCE_TREE_SPLIT_REDUCE_ENTRY, size*sizeof(T), 0, __builtin_amdgcn_s_memrealtime(),
ncclShmem->comm.npKitEventCollectContexts + npKitCtxIdx);
prims.npKitDataProcessTotalTime = 0;
}
#endif
if (tree->down[0] == -1) {
for (ssize_t gridOffset = 0; gridOffset < size; gridOffset += loopSize) {
ssize_t offset = gridOffset + bid*int(chunkSize);
@@ -255,11 +531,34 @@ namespace {
prims.recvReduceSend(offset, nelem);
}
}
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_ALL_REDUCE_TREE_SPLIT_REDUCE_EXIT)
if (isNpKitThread) {
NpKit::CollectGpuEvent(NPKIT_EVENT_ALL_REDUCE_TREE_SPLIT_REDUCE_EXIT, size*sizeof(T), prims.npKitDataProcessTotalTime, __builtin_amdgcn_s_memrealtime(),
ncclShmem->comm.npKitEventCollectContexts + npKitCtxIdx);
}
#endif
}
else {
// Broadcast down. Max number of recv is 1, max number of send is 3 (binary tree + local)
Primitives<T, RedOp, FanAsymmetric<1, NCCL_MAX_DEV_ARITY>, /*Direct=*/0, Proto, 0>
prims(tid-nthreadsSplit, nthreads-nthreadsSplit, &tree->up, tree->down, args->sendbuff, args->recvbuff, args->redOpArg, 1*Proto::MaxGroupWidth);
#if defined(ENABLE_NPKIT)
if (isNpKitThread) {
prims.npKitCtxIdx = npKitCtxIdx;
}
#endif
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_ALL_REDUCE_TREE_SPLIT_BROADCAST_ENTRY)
if (isNpKitThread) {
NpKit::CollectGpuEvent(NPKIT_EVENT_ALL_REDUCE_TREE_SPLIT_BROADCAST_ENTRY, size*sizeof(T), 0, __builtin_amdgcn_s_memrealtime(),
ncclShmem->comm.npKitEventCollectContexts + npKitCtxIdx);
prims.npKitDataProcessTotalTime = 0;
}
#endif
if (tree->down[0] == -1) {
for (ssize_t gridOffset = 0; gridOffset < size; gridOffset += loopSize) {
ssize_t offset = gridOffset + bid*int(chunkSize);
@@ -274,7 +573,23 @@ namespace {
prims.directRecvCopySend(offset, offset, nelem);
}
}
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_ALL_REDUCE_TREE_SPLIT_BROADCAST_EXIT)
if (isNpKitThread) {
NpKit::CollectGpuEvent(NPKIT_EVENT_ALL_REDUCE_TREE_SPLIT_BROADCAST_EXIT, size*sizeof(T), prims.npKitDataProcessTotalTime, __builtin_amdgcn_s_memrealtime(),
ncclShmem->comm.npKitEventCollectContexts + npKitCtxIdx);
}
#endif
}
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_ALL_REDUCE_TREE_SPLIT_EXIT)
if (isNpKitThread) {
NpKit::CollectGpuEvent(NPKIT_EVENT_ALL_REDUCE_TREE_SPLIT_EXIT, size*sizeof(T), 0, __builtin_amdgcn_s_memrealtime(),
ncclShmem->comm.npKitEventCollectContexts + npKitCtxIdx);
}
#endif
}
}
+208 -8
Wyświetl plik
@@ -5,6 +5,10 @@
* See LICENSE.txt for license information
************************************************************************/
#if defined(ENABLE_NPKIT)
#include "npkit/npkit.h"
#endif
template<typename T, typename RedOp, typename Fan, int Direct, int P2p>
class Primitives<T, RedOp, Fan, Direct, ProtoLL, P2p>:
public PrimitivesWithoutDirect<Primitives<T, RedOp, Fan, Direct, ProtoLL, P2p>> {
@@ -34,6 +38,22 @@ class Primitives<T, RedOp, Fan, Direct, ProtoLL, P2p>:
union ncclLLFifoLine* recvBuff[MaxRecv];
union ncclLLFifoLine* sendBuff[MaxSend];
#if defined(ENABLE_NPKIT)
public:
int npKitCtxIdx = 0;
uint64_t npKitDataProcessEntryTime = 0;
uint64_t npKitDataProcessExitTime = 0;
uint64_t npKitDataProcessTotalTime = 0;
private:
#endif
#if defined(ENABLE_NPKIT) && (defined(ENABLE_NPKIT_EVENT_PRIM_LL_DATA_PROCESS_ENTRY) && defined(ENABLE_NPKIT_EVENT_PRIM_LL_DATA_PROCESS_EXIT) || defined(ENABLE_NPKIT_PRIM_COLLECT_DATA_PROCESS_TIME))
uint64_t npKitWaitRecvDataProcessSize = 0;
uint64_t npKitWaitRecvEntryTime = 0;
uint64_t npKitWaitRecvExitTime = 0;
uint64_t npKitWaitRecvTotalTime = 0;
#endif
inline __device__ int recvOffset(int i) { return (recvStep[i]%NCCL_STEPS)*stepLines; }
inline __device__ int sendOffset(int i) { return (sendStep[i]%NCCL_STEPS)*stepLines; }
inline __device__ union ncclLLFifoLine* recvPtr(int i) { return recvBuff[i]+recvOffset(i); }
@@ -70,6 +90,12 @@ class Primitives<T, RedOp, Fan, Direct, ProtoLL, P2p>:
}
inline __device__ void waitSend(int nbytes) {
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_PRIM_LL_WAIT_SEND_ENTRY)
if (tid == 0) {
NpKit::CollectGpuEvent(NPKIT_EVENT_PRIM_LL_WAIT_SEND_ENTRY, nbytes, 0, __builtin_amdgcn_s_memrealtime(),
ncclShmem->comm.npKitEventCollectContexts + npKitCtxIdx);
}
#endif
if (sendConnHeadPtr) {
int spins = 0;
while (sendConnHeadCache + NCCL_STEPS < sendConnHead + 1) {
@@ -83,6 +109,12 @@ class Primitives<T, RedOp, Fan, Direct, ProtoLL, P2p>:
sendConnHead += 1;
}
barrier();
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_PRIM_LL_WAIT_SEND_EXIT)
if (tid == 0) {
NpKit::CollectGpuEvent(NPKIT_EVENT_PRIM_LL_WAIT_SEND_EXIT, nbytes, 0, __builtin_amdgcn_s_memrealtime(),
ncclShmem->comm.npKitEventCollectContexts + npKitCtxIdx);
}
#endif
}
inline __device__ void incRecv(int i) {
@@ -107,21 +139,43 @@ class Primitives<T, RedOp, Fan, Direct, ProtoLL, P2p>:
uint32_t flag = recvFlag(i);
uint32_t data1, flag1, data2, flag2;
int spins = 0;
#if defined(ENABLE_NPKIT) && (defined(ENABLE_NPKIT_EVENT_PRIM_LL_DATA_PROCESS_ENTRY) && defined(ENABLE_NPKIT_EVENT_PRIM_LL_DATA_PROCESS_EXIT) || defined(ENABLE_NPKIT_PRIM_COLLECT_DATA_PROCESS_TIME))
int npkitWaitRecvSpins = 0;
if (tid == 0) {
npKitWaitRecvEntryTime = __builtin_amdgcn_s_memrealtime();
}
#endif
#if defined(__HIP_PLATFORM_HCC__) || defined(__HCC__) || defined(__HIPCC__)
union ncclLLFifoLine i4;
do {
i4.v[0] = __builtin_nontemporal_load(src->v);
i4.v[1] = __builtin_nontemporal_load(src->v+1);
#if defined(ENABLE_NPKIT) && (defined(ENABLE_NPKIT_EVENT_PRIM_LL_DATA_PROCESS_ENTRY) && defined(ENABLE_NPKIT_EVENT_PRIM_LL_DATA_PROCESS_EXIT) || defined(ENABLE_NPKIT_PRIM_COLLECT_DATA_PROCESS_TIME))
npkitWaitRecvSpins++;
#endif
if (checkAbort(spins, 0)) break;
} while ((i4.flag1 != flag) || (i4.flag2 != flag));
uint64_t val64 = (uint64_t)(i4.data1) + (((uint64_t)i4.data2) << 32);
#else
do {
asm("ld.volatile.global.v4.u32 {%0,%1,%2,%3}, [%4];" : "=r"(data1), "=r"(flag1), "=r"(data2), "=r"(flag2) : "l"(&src->i4));
#if defined(ENABLE_NPKIT) && (defined(ENABLE_NPKIT_EVENT_PRIM_LL_DATA_PROCESS_ENTRY) && defined(ENABLE_NPKIT_EVENT_PRIM_LL_DATA_PROCESS_EXIT) || defined(ENABLE_NPKIT_PRIM_COLLECT_DATA_PROCESS_TIME))
npkitWaitRecvSpins++;
#endif
if (checkAbort(spins, 0)) break;
} while ((flag1 != flag) || (flag2 != flag));
uint64_t val64 = data1 + (((uint64_t)data2) << 32);
#endif
#if defined(ENABLE_NPKIT) && (defined(ENABLE_NPKIT_EVENT_PRIM_LL_DATA_PROCESS_ENTRY) && defined(ENABLE_NPKIT_EVENT_PRIM_LL_DATA_PROCESS_EXIT) || defined(ENABLE_NPKIT_PRIM_COLLECT_DATA_PROCESS_TIME))
if (tid == 0) {
npKitWaitRecvExitTime = __builtin_amdgcn_s_memrealtime();
npKitWaitRecvTotalTime += (npKitWaitRecvExitTime - npKitWaitRecvEntryTime) * (npkitWaitRecvSpins - 1) / npkitWaitRecvSpins;
}
#endif
return val64;
}
@@ -144,16 +198,35 @@ class Primitives<T, RedOp, Fan, Direct, ProtoLL, P2p>:
union ncclLLFifoLine* src = recvPtr(i) + offset;
uint32_t flag = recvFlag(i);
int spins = 0;
#if defined(ENABLE_NPKIT) && (defined(ENABLE_NPKIT_EVENT_PRIM_LL_DATA_PROCESS_ENTRY) && defined(ENABLE_NPKIT_EVENT_PRIM_LL_DATA_PROCESS_EXIT) || defined(ENABLE_NPKIT_PRIM_COLLECT_DATA_PROCESS_TIME))
int npkitWaitRecvSpins = 0;
if (tid == 0) {
npKitWaitRecvEntryTime = __builtin_amdgcn_s_memrealtime();
}
#endif
do {
#if defined(__HIP_PLATFORM_HCC__) || defined(__HCC__) || defined(__HIPCC__)
line[i].v[0] = __builtin_nontemporal_load(src->v);
line[i].v[1] = __builtin_nontemporal_load(src->v+1);
#else
asm("ld.volatile.global.v4.u32 {%0,%1,%2,%3}, [%4];" : "=r"(line[i].data1), "=r"(line[i].flag1), "=r"(line[i].data2), "=r"(line[i].flag2) : "l"(&src->i4));
#endif
#if defined(ENABLE_NPKIT) && (defined(ENABLE_NPKIT_EVENT_PRIM_LL_DATA_PROCESS_ENTRY) && defined(ENABLE_NPKIT_EVENT_PRIM_LL_DATA_PROCESS_EXIT) || defined(ENABLE_NPKIT_PRIM_COLLECT_DATA_PROCESS_TIME))
npkitWaitRecvSpins++;
#endif
if (checkAbort(spins, 0)) break;
} while(line[i].flag1 != flag || line[i].flag2 != flag);
uint64_t val64 = line[i].data1 + (((uint64_t)line[i].data2) << 32);
#if defined(ENABLE_NPKIT) && (defined(ENABLE_NPKIT_EVENT_PRIM_LL_DATA_PROCESS_ENTRY) && defined(ENABLE_NPKIT_EVENT_PRIM_LL_DATA_PROCESS_EXIT) || defined(ENABLE_NPKIT_PRIM_COLLECT_DATA_PROCESS_TIME))
if (tid == 0) {
npKitWaitRecvExitTime = __builtin_amdgcn_s_memrealtime();
npKitWaitRecvTotalTime += (npKitWaitRecvExitTime - npKitWaitRecvEntryTime) * (npkitWaitRecvSpins - 1) / npkitWaitRecvSpins;
}
#endif
return val64;
}
@@ -296,6 +369,22 @@ class Primitives<T, RedOp, Fan, Direct, ProtoLL, P2p>:
nelem = nelem < 0 ? 0 : nelem;
if (SEND) waitSend(divUp(nelem, EltPerLine)*sizeof(ncclLLFifoLine));
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_PRIM_LL_DATA_PROCESS_ENTRY) && defined(ENABLE_NPKIT_EVENT_PRIM_LL_DATA_PROCESS_EXIT)
if (tid == 0) {
npKitWaitRecvTotalTime = 0;
npKitWaitRecvDataProcessSize = nelem*sizeof(T);
NpKit::CollectGpuEvent(NPKIT_EVENT_PRIM_LL_DATA_PROCESS_ENTRY,
npKitWaitRecvDataProcessSize, 0, __builtin_amdgcn_s_memrealtime(), ncclShmem->comm.npKitEventCollectContexts + npKitCtxIdx);
}
#endif
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_PRIM_COLLECT_DATA_PROCESS_TIME)
if (tid == 0) {
npKitWaitRecvTotalTime = 0;
npKitDataProcessEntryTime = __builtin_amdgcn_s_memrealtime();
}
#endif
nelem -= tid*EltPerLine;
srcElts += tid*EltPerLine;
dstElts += tid*EltPerLine;
@@ -344,6 +433,21 @@ class Primitives<T, RedOp, Fan, Direct, ProtoLL, P2p>:
offset += nthreads;
}
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_PRIM_COLLECT_DATA_PROCESS_TIME)
if (tid == 0) {
npKitDataProcessExitTime = __builtin_amdgcn_s_memrealtime();
npKitDataProcessTotalTime += npKitDataProcessExitTime - npKitDataProcessEntryTime - npKitWaitRecvTotalTime;
}
#endif
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_PRIM_LL_DATA_PROCESS_ENTRY) && defined(ENABLE_NPKIT_EVENT_PRIM_LL_DATA_PROCESS_EXIT)
if (tid == 0) {
NpKit::CollectGpuEvent(NPKIT_EVENT_PRIM_LL_DATA_PROCESS_EXIT,
npKitWaitRecvDataProcessSize, npKitWaitRecvTotalTime, __builtin_amdgcn_s_memrealtime(),
ncclShmem->comm.npKitEventCollectContexts + npKitCtxIdx);
}
#endif
if (RECV) {
for (int i=0; i < MaxRecv; i++) incRecv(i);
postRecv();
@@ -430,28 +534,124 @@ class Primitives<T, RedOp, Fan, Direct, ProtoLL, P2p>:
}
__device__ void send(intptr_t inpIx, int eltN) {
return LLGenericOp<0, 1, Input, -1>(inpIx, -1, eltN, false);
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_SEND_ENTRY)
if (tid == 0) {
NpKit::CollectGpuEvent(NPKIT_EVENT_SEND_ENTRY, eltN*sizeof(T), 0, __builtin_amdgcn_s_memrealtime(),
ncclShmem->comm.npKitEventCollectContexts + npKitCtxIdx);
}
#endif
LLGenericOp<0, 1, Input, -1>(inpIx, -1, eltN, false);
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_SEND_EXIT)
if (tid == 0) {
NpKit::CollectGpuEvent(NPKIT_EVENT_SEND_EXIT, eltN*sizeof(T), 0, __builtin_amdgcn_s_memrealtime(),
ncclShmem->comm.npKitEventCollectContexts + npKitCtxIdx);
}
#endif
}
__device__ void sendFromOutput(intptr_t outIx, int eltN) {
return LLGenericOp<0, 1, Output, -1>(outIx, -1, eltN, false);
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_SEND_FROM_OUTPUT_ENTRY)
if (tid == 0) {
NpKit::CollectGpuEvent(NPKIT_EVENT_SEND_FROM_OUTPUT_ENTRY, eltN*sizeof(T), 0, __builtin_amdgcn_s_memrealtime(),
ncclShmem->comm.npKitEventCollectContexts + npKitCtxIdx);
}
#endif
LLGenericOp<0, 1, Output, -1>(outIx, -1, eltN, false);
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_SEND_FROM_OUTPUT_EXIT)
if (tid == 0) {
NpKit::CollectGpuEvent(NPKIT_EVENT_SEND_FROM_OUTPUT_EXIT, eltN*sizeof(T), 0, __builtin_amdgcn_s_memrealtime(),
ncclShmem->comm.npKitEventCollectContexts + npKitCtxIdx);
}
#endif
}
__device__ void recv(intptr_t outIx, int eltN, bool postOp=false) {
return LLGenericOp<1, 0, -1, Output>(-1, outIx, eltN, postOp);
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_RECV_ENTRY)
if (tid == 0) {
NpKit::CollectGpuEvent(NPKIT_EVENT_RECV_ENTRY, eltN*sizeof(T), 0, __builtin_amdgcn_s_memrealtime(),
ncclShmem->comm.npKitEventCollectContexts + npKitCtxIdx);
}
#endif
LLGenericOp<1, 0, -1, Output>(-1, outIx, eltN, postOp);
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_RECV_EXIT)
if (tid == 0) {
NpKit::CollectGpuEvent(NPKIT_EVENT_RECV_EXIT, eltN*sizeof(T), 0, __builtin_amdgcn_s_memrealtime(),
ncclShmem->comm.npKitEventCollectContexts + npKitCtxIdx);
}
#endif
}
__device__ void recvReduceSend(intptr_t inpIx, int eltN) {
return LLGenericOp<1, 1, Input, -1>(inpIx, -1, eltN, false);
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_RECV_REDUCE_SEND_ENTRY)
if (tid == 0) {
NpKit::CollectGpuEvent(NPKIT_EVENT_RECV_REDUCE_SEND_ENTRY, eltN*sizeof(T), 0, __builtin_amdgcn_s_memrealtime(),
ncclShmem->comm.npKitEventCollectContexts + npKitCtxIdx);
}
#endif
LLGenericOp<1, 1, Input, -1>(inpIx, -1, eltN, false);
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_RECV_REDUCE_SEND_EXIT)
if (tid == 0) {
NpKit::CollectGpuEvent(NPKIT_EVENT_RECV_REDUCE_SEND_EXIT, eltN*sizeof(T), 0, __builtin_amdgcn_s_memrealtime(),
ncclShmem->comm.npKitEventCollectContexts + npKitCtxIdx);
}
#endif
}
__device__ void recvReduceCopy(intptr_t inpIx, intptr_t outIx, int eltN, bool postOp=false) {
return LLGenericOp<1, 0, Input, Output>(inpIx, outIx, eltN, postOp);
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_RECV_REDUCE_COPY_ENTRY)
if (tid == 0) {
NpKit::CollectGpuEvent(NPKIT_EVENT_RECV_REDUCE_COPY_ENTRY, eltN*sizeof(T), 0, __builtin_amdgcn_s_memrealtime(),
ncclShmem->comm.npKitEventCollectContexts + npKitCtxIdx);
}
#endif
LLGenericOp<1, 0, Input, Output>(inpIx, outIx, eltN, postOp);
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_RECV_REDUCE_COPY_EXIT)
if (tid == 0) {
NpKit::CollectGpuEvent(NPKIT_EVENT_RECV_REDUCE_COPY_EXIT, eltN*sizeof(T), 0, __builtin_amdgcn_s_memrealtime(),
ncclShmem->comm.npKitEventCollectContexts + npKitCtxIdx);
}
#endif
}
__device__ void copySend(intptr_t inpIx, intptr_t outIx, int eltN, bool postOp=false) {
return LLGenericOp<0, 1, Input, Output>(inpIx, outIx, eltN, postOp);
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_COPY_SEND_ENTRY)
if (tid == 0) {
NpKit::CollectGpuEvent(NPKIT_EVENT_COPY_SEND_ENTRY, eltN*sizeof(T), 0, __builtin_amdgcn_s_memrealtime(),
ncclShmem->comm.npKitEventCollectContexts + npKitCtxIdx);
}
#endif
LLGenericOp<0, 1, Input, Output>(inpIx, outIx, eltN, postOp);
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_COPY_SEND_EXIT)
if (tid == 0) {
NpKit::CollectGpuEvent(NPKIT_EVENT_COPY_SEND_EXIT, eltN*sizeof(T), 0, __builtin_amdgcn_s_memrealtime(),
ncclShmem->comm.npKitEventCollectContexts + npKitCtxIdx);
}
#endif
}
__device__ void recvCopySend(intptr_t outIx, int eltN, bool postOp=false) {
return LLGenericOp<1, 1, -1, Output>(-1, outIx, eltN, postOp);
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_RECV_COPY_SEND_ENTRY)
if (tid == 0) {
NpKit::CollectGpuEvent(NPKIT_EVENT_RECV_COPY_SEND_ENTRY, eltN*sizeof(T), 0, __builtin_amdgcn_s_memrealtime(),
ncclShmem->comm.npKitEventCollectContexts + npKitCtxIdx);
}
#endif
LLGenericOp<1, 1, -1, Output>(-1, outIx, eltN, postOp);
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_RECV_COPY_SEND_EXIT)
if (tid == 0) {
NpKit::CollectGpuEvent(NPKIT_EVENT_RECV_COPY_SEND_EXIT, eltN*sizeof(T), 0, __builtin_amdgcn_s_memrealtime(),
ncclShmem->comm.npKitEventCollectContexts + npKitCtxIdx);
}
#endif
}
__device__ void recvReduceCopySend(intptr_t inpIx, intptr_t outIx, int eltN, bool postOp=false) {
return LLGenericOp<1, 1, Input, Output>(inpIx, outIx, eltN, postOp);
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_RECV_REDUCE_COPY_SEND_ENTRY)
if (tid == 0) {
NpKit::CollectGpuEvent(NPKIT_EVENT_RECV_REDUCE_COPY_SEND_ENTRY, eltN*sizeof(T), 0, __builtin_amdgcn_s_memrealtime(),
ncclShmem->comm.npKitEventCollectContexts + npKitCtxIdx);
}
#endif
LLGenericOp<1, 1, Input, Output>(inpIx, outIx, eltN, postOp);
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_RECV_REDUCE_COPY_SEND_EXIT)
if (tid == 0) {
NpKit::CollectGpuEvent(NPKIT_EVENT_RECV_REDUCE_COPY_SEND_EXIT, eltN*sizeof(T), 0, __builtin_amdgcn_s_memrealtime(),
ncclShmem->comm.npKitEventCollectContexts + npKitCtxIdx);
}
#endif
}
__device__ void recvSend(int eltN) {
return LLGenericOp<1, 1, -1, -1>(-1, -1, eltN, false);
+100
Wyświetl plik
@@ -5,6 +5,10 @@
* See LICENSE.txt for license information
************************************************************************/
#if defined(ENABLE_NPKIT)
#include "npkit/npkit.h"
#endif
template<typename T, typename RedOp, typename Fan, int Direct,
int SlicePerChunk, int StepPerSlice, int Unroll, int P2p>
class Primitives<
@@ -49,6 +53,15 @@ class Primitives<
const uint64_t opCount;
uint32_t* next_hdp_reg;
#if defined(ENABLE_NPKIT)
public:
int npKitCtxIdx = 0;
uint64_t npKitDataProcessEntryTime = 0;
uint64_t npKitDataProcessExitTime = 0;
uint64_t npKitDataProcessTotalTime = 0;
private:
#endif
// Don't use barrier 0 as it's used by the final sync
inline __device__ void barrier() {
#if defined(__HIP_PLATFORM_HCC__) || defined(__HCC__) || defined(__HIPCC__)
@@ -206,21 +219,93 @@ class Primitives<
if (DirectRecv && ncclShmem->groups[group].srcs[0] == ncclShmem->groups[group].dsts[0]) {
// We can only have one direct receive. Since srcs[0] == dstPtr+offset, skip one copy
if (Send) {
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_PRIM_SIMPLE_REDUCE_OR_COPY_MULTI_ENTRY)
if (tid == 0) {
NpKit::CollectGpuEvent(NPKIT_EVENT_PRIM_SIMPLE_REDUCE_OR_COPY_MULTI_ENTRY, sliceSize*sizeof(T), 0, __builtin_amdgcn_s_memrealtime(),
ncclShmem->comm.npKitEventCollectContexts + npKitCtxIdx);
}
#endif
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_PRIM_COLLECT_DATA_PROCESS_TIME)
if (tid == 0) {
npKitDataProcessEntryTime = __builtin_amdgcn_s_memrealtime();
}
#endif
// (1-Send) is only there to avoid compilation errors in case MaxSend=0 (and Send=0).
ReduceOrCopyMulti<Unroll, RedOp, T, 1, 1, 1, (1-Send)+MaxSend, 0>
(tid, nworkers, nullptr, false,
1, (T const**)ncclShmem->groups[group].srcs,
fan.nsend(), (T**)ncclShmem->groups[group].dsts+1,
sliceSize);
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_PRIM_COLLECT_DATA_PROCESS_TIME)
if (tid == 0) {
npKitDataProcessExitTime = __builtin_amdgcn_s_memrealtime();
npKitDataProcessTotalTime += npKitDataProcessExitTime - npKitDataProcessEntryTime;
}
#endif
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_PRIM_SIMPLE_REDUCE_OR_COPY_MULTI_EXIT)
if (tid == 0) {
NpKit::CollectGpuEvent(NPKIT_EVENT_PRIM_SIMPLE_REDUCE_OR_COPY_MULTI_EXIT, sliceSize*sizeof(T), 0, __builtin_amdgcn_s_memrealtime(),
ncclShmem->comm.npKitEventCollectContexts + npKitCtxIdx);
}
#endif
}
} else if (DirectSend && !DirectRecv && SrcBuf != Input && ncclShmem->groups[group].dsts[Dst] == nullptr) {
// For broadcast in CollNet to do empty send
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_PRIM_SIMPLE_REDUCE_OR_COPY_MULTI_ENTRY)
if (tid == 0) {
NpKit::CollectGpuEvent(NPKIT_EVENT_PRIM_SIMPLE_REDUCE_OR_COPY_MULTI_ENTRY, sliceSize*sizeof(T), 0, __builtin_amdgcn_s_memrealtime(),
ncclShmem->comm.npKitEventCollectContexts + npKitCtxIdx);
}
#endif
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_PRIM_COLLECT_DATA_PROCESS_TIME)
if (tid == 0) {
npKitDataProcessEntryTime = __builtin_amdgcn_s_memrealtime();
}
#endif
ReduceOrCopyMulti<Unroll, RedOp, T, 1, 1, 1, 1, 0>
(tid, nworkers, ncclShmem->redOpArgs, postOp,
Recv, (T const**)ncclShmem->groups[group].srcs,
Dst, (T**)ncclShmem->groups[group].dsts,
sliceSize);
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_PRIM_COLLECT_DATA_PROCESS_TIME)
if (tid == 0) {
npKitDataProcessExitTime = __builtin_amdgcn_s_memrealtime();
npKitDataProcessTotalTime += npKitDataProcessExitTime - npKitDataProcessEntryTime;
}
#endif
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_PRIM_SIMPLE_REDUCE_OR_COPY_MULTI_EXIT)
if (tid == 0) {
NpKit::CollectGpuEvent(NPKIT_EVENT_PRIM_SIMPLE_REDUCE_OR_COPY_MULTI_EXIT, sliceSize*sizeof(T), 0, __builtin_amdgcn_s_memrealtime(),
ncclShmem->comm.npKitEventCollectContexts + npKitCtxIdx);
}
#endif
} else {
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_PRIM_SIMPLE_REDUCE_OR_COPY_MULTI_ENTRY)
if (tid == 0) {
NpKit::CollectGpuEvent(NPKIT_EVENT_PRIM_SIMPLE_REDUCE_OR_COPY_MULTI_ENTRY, sliceSize*sizeof(T), 0, __builtin_amdgcn_s_memrealtime(),
ncclShmem->comm.npKitEventCollectContexts + npKitCtxIdx);
}
#endif
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_PRIM_COLLECT_DATA_PROCESS_TIME)
if (tid == 0) {
npKitDataProcessEntryTime = __builtin_amdgcn_s_memrealtime();
}
#endif
constexpr int PreOpN = SrcBuf != Input ? 0 :
DirectRecv*MaxRecv == NCCL_MAX_DIRECT_ARITY ? (1+NCCL_MAX_DIRECT_ARITY) : 1;
ReduceOrCopyMulti<Unroll, RedOp, T, Recv+Src, Recv*MaxRecv+Src, Send+Dst, Send*MaxSend+Dst, PreOpN>
@@ -228,6 +313,21 @@ class Primitives<
Recv*fan.nrecv()+Src, (T const**)ncclShmem->groups[group].srcs,
Send*fan.nsend()+Dst, (T**)ncclShmem->groups[group].dsts,
sliceSize);
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_PRIM_COLLECT_DATA_PROCESS_TIME)
if (tid == 0) {
npKitDataProcessExitTime = __builtin_amdgcn_s_memrealtime();
npKitDataProcessTotalTime += npKitDataProcessExitTime - npKitDataProcessEntryTime;
}
#endif
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_PRIM_SIMPLE_REDUCE_OR_COPY_MULTI_EXIT)
if (tid == 0) {
NpKit::CollectGpuEvent(NPKIT_EVENT_PRIM_SIMPLE_REDUCE_OR_COPY_MULTI_EXIT, sliceSize*sizeof(T), 0, __builtin_amdgcn_s_memrealtime(),
ncclShmem->comm.npKitEventCollectContexts + npKitCtxIdx);
}
#endif
}
barrier(); // This barrier has a counterpart in following loop
#if defined(__gfx1030__)
+120
Wyświetl plik
@@ -8,14 +8,68 @@
#include "devcomm.h"
#include "collectives.h"
#include "primitives.h"
#if defined(ENABLE_NPKIT)
#include "npkit/npkit.h"
#endif
template<typename T, typename RedOp>
struct RunWork<ncclFuncSendRecv, T, RedOp, NCCL_ALGO_RING, NCCL_PROTO_SIMPLE> {
__device__ __forceinline__ void runSend(const int tid, const int nthreads, const int group, struct ncclWorkElemP2p* args) {
#if defined(ENABLE_NPKIT)
bool isNpKitThread = (tid == 0);
int npKitCtxIdx = blockIdx.x * NCCL_MAX_WORK_ELEMENTS_P2P;
#endif
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_TIME_SYNC_CPU)
if (isNpKitThread) {
uint64_t* cpuTimestamp = ncclShmem->comm.cpuTimestamp;
NpKit::CollectGpuEvent(NPKIT_EVENT_TIME_SYNC_CPU, 0, 0, *cpuTimestamp,
ncclShmem->comm.npKitEventCollectContexts + npKitCtxIdx);
}
#endif
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_TIME_SYNC_GPU)
if (isNpKitThread) {
NpKit::CollectGpuEvent(NPKIT_EVENT_TIME_SYNC_GPU, 0, 0, __builtin_amdgcn_s_memrealtime(),
ncclShmem->comm.npKitEventCollectContexts + npKitCtxIdx);
}
#endif
if (args->peer == ncclShmem->comm.rank) {
struct ncclWorkElemP2p* recvArgs = args-1;
if (args->buff != recvArgs->buff) {
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_SEND_RECV_LOCAL_COPY_ENTRY)
if (isNpKitThread) {
NpKit::CollectGpuEvent(NPKIT_EVENT_SEND_RECV_LOCAL_COPY_ENTRY, args->count*sizeof(T), 0, __builtin_amdgcn_s_memrealtime(),
ncclShmem->comm.npKitEventCollectContexts + npKitCtxIdx);
}
#endif
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_PRIM_SIMPLE_REDUCE_OR_COPY_MULTI_ENTRY)
if (isNpKitThread) {
NpKit::CollectGpuEvent(NPKIT_EVENT_PRIM_SIMPLE_REDUCE_OR_COPY_MULTI_ENTRY, args->count*sizeof(T), 0, __builtin_amdgcn_s_memrealtime(),
ncclShmem->comm.npKitEventCollectContexts + npKitCtxIdx);
}
#endif
ReduceOrCopyMulti<COLL_UNROLL, RedOp, T, 1, 1, 1, 1, 0>(tid, nthreads, nullptr, false, 1, (const T**)&args->buff, 1, (T**)&recvArgs->buff, args->count);
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_PRIM_SIMPLE_REDUCE_OR_COPY_MULTI_EXIT)
if (isNpKitThread) {
NpKit::CollectGpuEvent(NPKIT_EVENT_PRIM_SIMPLE_REDUCE_OR_COPY_MULTI_EXIT, args->count*sizeof(T), 0, __builtin_amdgcn_s_memrealtime(),
ncclShmem->comm.npKitEventCollectContexts + npKitCtxIdx);
}
#endif
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_SEND_RECV_LOCAL_COPY_EXIT)
if (isNpKitThread) {
NpKit::CollectGpuEvent(NPKIT_EVENT_SEND_RECV_LOCAL_COPY_EXIT, args->count*sizeof(T), 0, __builtin_amdgcn_s_memrealtime(),
ncclShmem->comm.npKitEventCollectContexts + npKitCtxIdx);
}
#endif
}
} else {
using Proto = ProtoSimple<1, 1>;
@@ -24,16 +78,59 @@ struct RunWork<ncclFuncSendRecv, T, RedOp, NCCL_ALGO_RING, NCCL_PROTO_SIMPLE> {
int const peer = args->peer;
Primitives<T, RedOp, FanAsymmetric<0, 1>, 0, Proto, 1> prims
(tid, nthreads, nullptr, &peer, args->buff, nullptr, /*redOpArg(ignored)=*/0, group);
#if defined(ENABLE_NPKIT)
if (isNpKitThread) {
prims.npKitCtxIdx = npKitCtxIdx;
}
#endif
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_SEND_RECV_SEND_ENTRY)
if (isNpKitThread) {
NpKit::CollectGpuEvent(NPKIT_EVENT_SEND_RECV_SEND_ENTRY, count*sizeof(T), 0, __builtin_amdgcn_s_memrealtime(),
ncclShmem->comm.npKitEventCollectContexts + npKitCtxIdx);
prims.npKitDataProcessTotalTime = 0;
}
#endif
ssize_t offset = 0;
do {
int nelem = min(chunkSize, count-offset);
prims.directSend(offset, offset, nelem);
offset += nelem;
} while(offset < count);
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_SEND_RECV_SEND_EXIT)
if (isNpKitThread) {
NpKit::CollectGpuEvent(NPKIT_EVENT_SEND_RECV_SEND_EXIT, count*sizeof(T), prims.npKitDataProcessTotalTime, __builtin_amdgcn_s_memrealtime(),
ncclShmem->comm.npKitEventCollectContexts + npKitCtxIdx);
}
#endif
}
}
__device__ __forceinline__ void runRecv(const int tid, const int nthreads, const int group, struct ncclWorkElemP2p* args) {
#if defined(ENABLE_NPKIT)
bool isNpKitThread = (tid == 0);
int npKitCtxIdx = blockIdx.x * NCCL_MAX_WORK_ELEMENTS_P2P + 1;
#endif
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_TIME_SYNC_CPU)
if (isNpKitThread) {
uint64_t* cpuTimestamp = ncclShmem->comm.cpuTimestamp;
NpKit::CollectGpuEvent(NPKIT_EVENT_TIME_SYNC_CPU, 0, 0, *cpuTimestamp,
ncclShmem->comm.npKitEventCollectContexts + npKitCtxIdx);
}
#endif
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_TIME_SYNC_GPU)
if (isNpKitThread) {
NpKit::CollectGpuEvent(NPKIT_EVENT_TIME_SYNC_GPU, 0, 0, __builtin_amdgcn_s_memrealtime(),
ncclShmem->comm.npKitEventCollectContexts + npKitCtxIdx);
}
#endif
if (args->peer != ncclShmem->comm.rank) {
using Proto = ProtoSimple<1, 1>;
ssize_t const count = args->count;
@@ -41,12 +138,35 @@ struct RunWork<ncclFuncSendRecv, T, RedOp, NCCL_ALGO_RING, NCCL_PROTO_SIMPLE> {
int const peer = args->peer;
Primitives<T, RedOp, FanAsymmetric<1, 0>, 0, Proto, 1> prims
(tid, nthreads, &peer, nullptr, nullptr, args->buff, /*redOpArg(ignored)=*/0, group);
#if defined(ENABLE_NPKIT)
if (isNpKitThread) {
prims.npKitCtxIdx = npKitCtxIdx;
}
#endif
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_SEND_RECV_RECV_ENTRY)
if (isNpKitThread) {
NpKit::CollectGpuEvent(NPKIT_EVENT_SEND_RECV_RECV_ENTRY, count*sizeof(T), 0, __builtin_amdgcn_s_memrealtime(),
ncclShmem->comm.npKitEventCollectContexts + npKitCtxIdx);
prims.npKitDataProcessTotalTime = 0;
}
#endif
ssize_t offset = 0;
do {
int nelem = min(chunkSize, count-offset);
prims.directRecv(offset, nelem);
offset += nelem;
} while(offset < count);
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_SEND_RECV_RECV_EXIT)
if (isNpKitThread) {
NpKit::CollectGpuEvent(NPKIT_EVENT_SEND_RECV_RECV_EXIT, count*sizeof(T), prims.npKitDataProcessTotalTime, __builtin_amdgcn_s_memrealtime(),
ncclShmem->comm.npKitEventCollectContexts + npKitCtxIdx);
}
#endif
}
}
+8
Wyświetl plik
@@ -11,6 +11,9 @@
#include "nccl.h"
#include "rccl_bfloat16.h"
#include "align.h"
#if defined(ENABLE_NPKIT)
#include "npkit/npkit_struct.h"
#endif
#include <stdint.h>
// [RCCL] Support for clique-based kernels
//#include "clique/CliqueCommon.h"
@@ -419,6 +422,11 @@ struct ncclDevComm {
// Channels, device side
struct ncclChannel* channels;
#if defined(ENABLE_NPKIT)
NpKitEventCollectContext* npKitEventCollectContexts;
uint64_t* cpuTimestamp;
#endif
#ifdef ENABLE_PROFILING
// Profiling counters
struct ncclProf devProf;
+65
Wyświetl plik
@@ -0,0 +1,65 @@
#ifndef NPKIT_H_
#define NPKIT_H_
#include <string>
#include <thread>
#include <hip/hip_runtime.h>
#include "npkit/npkit_event.h"
#include "npkit/npkit_struct.h"
class NpKit {
public:
static const uint64_t kNumGpuEventBuffers = 512;
static const uint64_t kNumCpuEventBuffers = 32;
static ncclResult_t Init(int rank);
static ncclResult_t Dump(const std::string& dump_dir);
static ncclResult_t Shutdown();
static NpKitEventCollectContext* GetGpuEventCollectContexts();
static inline __device__ void CollectGpuEvent(uint8_t type, uint32_t size, uint32_t rsvd, uint64_t timestamp,
NpKitEventCollectContext* ctx) {
uint64_t event_buffer_head = ctx->event_buffer_head;
if (event_buffer_head < kMaxNumGpuEventsPerBuffer) {
NpKitEvent& event = ctx->event_buffer[event_buffer_head];
event.fields.type = type;
event.fields.size = size;
event.fields.rsvd = rsvd;
event.fields.timestamp = timestamp;
ctx->event_buffer_head++;
}
}
static void CollectCpuEvent(uint8_t type, uint32_t size, uint32_t rsvd, uint64_t timestamp, int channel_id);
static uint64_t* GetCpuTimestamp();
private:
static void CpuTimestampUpdateThread();
// 64K * 512 * 16B = 512MB per GPU
static const uint64_t kMaxNumGpuEventsPerBuffer = 1ULL << 16;
// 64K * 2 (send/recv) * (512/32) = 2M, 2M * 32 * 16B = 1GB per CPU
static const uint64_t kMaxNumCpuEventsPerBuffer = 1ULL << 21;
static NpKitEvent** gpu_event_buffers_;
static NpKitEvent** cpu_event_buffers_;
static NpKitEventCollectContext* gpu_collect_contexts_;
static NpKitEventCollectContext* cpu_collect_contexts_;
static uint64_t* cpu_timestamp_;
static uint64_t rank_;
static std::thread* cpu_timestamp_update_thread_;
static volatile bool cpu_timestamp_update_thread_should_stop_;
};
#endif
+98
Wyświetl plik
@@ -0,0 +1,98 @@
#ifndef NPKIT_EVENT_H_
#define NPKIT_EVENT_H_
#define NPKIT_EVENT_INVALID 0x0
#define NPKIT_EVENT_ALL_REDUCE_RING_ENTRY 0x1
#define NPKIT_EVENT_ALL_REDUCE_RING_EXIT 0x2
#define NPKIT_EVENT_ALL_REDUCE_TREE_UPDOWN_ENTRY 0x3
#define NPKIT_EVENT_ALL_REDUCE_TREE_UPDOWN_EXIT 0x4
#define NPKIT_EVENT_ALL_REDUCE_TREE_SPLIT_ENTRY 0x5
#define NPKIT_EVENT_ALL_REDUCE_TREE_SPLIT_EXIT 0x6
#define NPKIT_EVENT_COPY_SEND_ENTRY 0x7
#define NPKIT_EVENT_COPY_SEND_EXIT 0x8
#define NPKIT_EVENT_DIRECT_COPY_SEND_ENTRY 0x9
#define NPKIT_EVENT_DIRECT_COPY_SEND_EXIT 0xA
#define NPKIT_EVENT_DIRECT_RECV_ENTRY 0xB
#define NPKIT_EVENT_DIRECT_RECV_EXIT 0xC
#define NPKIT_EVENT_DIRECT_RECV_COPY_SEND_ENTRY 0xD
#define NPKIT_EVENT_DIRECT_RECV_COPY_SEND_EXIT 0xE
#define NPKIT_EVENT_DIRECT_RECV_REDUCE_COPY_SEND_ENTRY 0xF
#define NPKIT_EVENT_DIRECT_RECV_REDUCE_COPY_SEND_EXIT 0x10
#define NPKIT_EVENT_DIRECT_SEND_ENTRY 0x11
#define NPKIT_EVENT_DIRECT_SEND_EXIT 0x12
#define NPKIT_EVENT_DIRECT_SEND_FROM_OUTPUT_ENTRY 0x13
#define NPKIT_EVENT_DIRECT_SEND_FROM_OUTPUT_EXIT 0x14
#define NPKIT_EVENT_RECV_ENTRY 0x15
#define NPKIT_EVENT_RECV_EXIT 0x16
#define NPKIT_EVENT_RECV_COPY_SEND_ENTRY 0x17
#define NPKIT_EVENT_RECV_COPY_SEND_EXIT 0x18
#define NPKIT_EVENT_RECV_REDUCE_COPY_ENTRY 0x19
#define NPKIT_EVENT_RECV_REDUCE_COPY_EXIT 0x1A
#define NPKIT_EVENT_RECV_REDUCE_COPY_SEND_ENTRY 0x1B
#define NPKIT_EVENT_RECV_REDUCE_COPY_SEND_EXIT 0x1C
#define NPKIT_EVENT_RECV_REDUCE_SEND_ENTRY 0x1D
#define NPKIT_EVENT_RECV_REDUCE_SEND_EXIT 0x1E
#define NPKIT_EVENT_SEND_ENTRY 0x1F
#define NPKIT_EVENT_SEND_EXIT 0x20
#define NPKIT_EVENT_SEND_FROM_OUTPUT_ENTRY 0x21
#define NPKIT_EVENT_SEND_FROM_OUTPUT_EXIT 0x22
#define NPKIT_EVENT_PRIM_SIMPLE_WAIT_PEER_ENTRY 0x23
#define NPKIT_EVENT_PRIM_SIMPLE_WAIT_PEER_EXIT 0x24
#define NPKIT_EVENT_PRIM_SIMPLE_REDUCE_OR_COPY_MULTI_ENTRY 0x25
#define NPKIT_EVENT_PRIM_SIMPLE_REDUCE_OR_COPY_MULTI_EXIT 0x26
#define NPKIT_EVENT_PRIM_LL_WAIT_SEND_ENTRY 0x27
#define NPKIT_EVENT_PRIM_LL_WAIT_SEND_EXIT 0x28
#define NPKIT_EVENT_PRIM_LL_DATA_PROCESS_ENTRY 0x29
#define NPKIT_EVENT_PRIM_LL_DATA_PROCESS_EXIT 0x2A
#define NPKIT_EVENT_PRIM_LL128_WAIT_SEND_ENTRY 0x2B
#define NPKIT_EVENT_PRIM_LL128_WAIT_SEND_EXIT 0x2C
#define NPKIT_EVENT_PRIM_LL128_DATA_PROCESS_ENTRY 0x2D
#define NPKIT_EVENT_PRIM_LL128_DATA_PROCESS_EXIT 0x2E
#define NPKIT_EVENT_NET_SEND_ENTRY 0x2F
#define NPKIT_EVENT_NET_SEND_EXIT 0x30
#define NPKIT_EVENT_NET_RECV_ENTRY 0x31
#define NPKIT_EVENT_NET_RECV_EXIT 0x32
#define NPKIT_EVENT_TIME_SYNC_GPU 0x33
#define NPKIT_EVENT_TIME_SYNC_CPU 0x34
#define NPKIT_EVENT_ALL_REDUCE_RING_SEND_ENTRY 0x35
#define NPKIT_EVENT_ALL_REDUCE_RING_SEND_EXIT 0x36
#define NPKIT_EVENT_ALL_REDUCE_RING_RECV_REDUCE_SEND_ENTRY 0x37
#define NPKIT_EVENT_ALL_REDUCE_RING_RECV_REDUCE_SEND_EXIT 0x38
#define NPKIT_EVENT_ALL_REDUCE_RING_DIRECT_RECV_REDUCE_COPY_SEND_ENTRY 0x39
#define NPKIT_EVENT_ALL_REDUCE_RING_DIRECT_RECV_REDUCE_COPY_SEND_EXIT 0x3A
#define NPKIT_EVENT_ALL_REDUCE_RING_DIRECT_RECV_COPY_SEND_ENTRY 0x3B
#define NPKIT_EVENT_ALL_REDUCE_RING_DIRECT_RECV_COPY_SEND_EXIT 0x3C
#define NPKIT_EVENT_ALL_REDUCE_RING_DIRECT_RECV_ENTRY 0x3D
#define NPKIT_EVENT_ALL_REDUCE_RING_DIRECT_RECV_EXIT 0x3E
#define NPKIT_EVENT_ALL_REDUCE_TREE_UPDOWN_REDUCE_ENTRY 0x3F
#define NPKIT_EVENT_ALL_REDUCE_TREE_UPDOWN_REDUCE_EXIT 0x40
#define NPKIT_EVENT_ALL_REDUCE_TREE_UPDOWN_BROADCAST_ENTRY 0x41
#define NPKIT_EVENT_ALL_REDUCE_TREE_UPDOWN_BROADCAST_EXIT 0x42
#define NPKIT_EVENT_ALL_REDUCE_TREE_SPLIT_REDUCE_BROADCAST_ENTRY 0x43
#define NPKIT_EVENT_ALL_REDUCE_TREE_SPLIT_REDUCE_BROADCAST_EXIT 0x44
#define NPKIT_EVENT_ALL_REDUCE_TREE_SPLIT_REDUCE_ENTRY 0x45
#define NPKIT_EVENT_ALL_REDUCE_TREE_SPLIT_REDUCE_EXIT 0x46
#define NPKIT_EVENT_ALL_REDUCE_TREE_SPLIT_BROADCAST_ENTRY 0x47
#define NPKIT_EVENT_ALL_REDUCE_TREE_SPLIT_BROADCAST_EXIT 0x48
#define NPKIT_EVENT_SEND_RECV_LOCAL_COPY_ENTRY 0x49
#define NPKIT_EVENT_SEND_RECV_LOCAL_COPY_EXIT 0x4A
#define NPKIT_EVENT_SEND_RECV_SEND_ENTRY 0x4B
#define NPKIT_EVENT_SEND_RECV_SEND_EXIT 0x4C
#define NPKIT_EVENT_SEND_RECV_RECV_ENTRY 0x4D
#define NPKIT_EVENT_SEND_RECV_RECV_EXIT 0x4E
#define NPKIT_PRIM_COLLECT_DATA_PROCESS_TIME 0x4F
#endif
+25
Wyświetl plik
@@ -0,0 +1,25 @@
#ifndef NPKIT_STRUCT_H_
#define NPKIT_STRUCT_H_
#include <cstdint>
#pragma pack(push, 1)
union NpKitEvent {
uint64_t bits[2];
struct {
uint64_t type : 8;
uint64_t size : 32;
uint64_t rsvd : 24;
uint64_t timestamp;
} fields;
};
struct NpKitEventCollectContext {
NpKitEvent* event_buffer;
uint64_t event_buffer_head;
};
#pragma pack(pop)
#endif
+4
Wyświetl plik
@@ -58,6 +58,10 @@ struct ncclProxySubArgs {
uint64_t end;
void* requests[NCCL_STEPS];
void* profilingEvents[NCCL_STEPS];
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_NET_SEND_ENTRY) && defined(ENABLE_NPKIT_EVENT_NET_SEND_EXIT)
int npKitSizesFifo[NCCL_STEPS];
#endif
};
struct ncclProxyArgs {
+21
Wyświetl plik
@@ -17,6 +17,9 @@
#include "enqueue.h"
#include "graph.h"
#include "argcheck.h"
#if defined(ENABLE_NPKIT)
#include "npkit/npkit.h"
#endif
#include <fcntl.h>
#include <unistd.h>
#include <hip/hip_runtime.h>
@@ -501,6 +504,13 @@ static ncclResult_t devCommSetup(ncclComm_t comm) {
NCCLCHECK(ncclCudaMemcpy(comm->channels[r].ring.devUserRanks, comm->channels[r].ring.userRanks, comm->nRanks));
}
#if defined(ENABLE_NPKIT)
// Init NPKit
NCCLCHECK(NpKit::Init(comm->rank));
comm->hostDevComm.npKitEventCollectContexts = NpKit::GetGpuEventCollectContexts();
comm->hostDevComm.cpuTimestamp = NpKit::GetCpuTimestamp();
#endif
// Duplicate the dev comm on the device
NCCLCHECK(ncclCudaMemcpy(comm->devComm, &comm->hostDevComm, 1));
return ncclSuccess;
@@ -1399,6 +1409,17 @@ static ncclResult_t commDestroy(ncclComm_t comm) {
if (savedDevice != commDevice)
CUDACHECK(hipSetDevice(savedDevice));
#if defined(ENABLE_NPKIT)
// Dump NPKit events and shutdown
const char* npkitDumpDir = getenv("NPKIT_DUMP_DIR");
if (npkitDumpDir == nullptr) {
WARN("NPKIT_DUMP_DIR is empty");
} else {
NCCLCHECK(NpKit::Dump(npkitDumpDir));
}
NCCLCHECK(NpKit::Shutdown());
#endif
return ncclSuccess;
}
+169
Wyświetl plik
@@ -0,0 +1,169 @@
#include <chrono>
#include <fstream>
#include <unistd.h>
#include "alloc.h"
#include "npkit/npkit.h"
uint64_t NpKit::rank_ = 0;
NpKitEvent** NpKit::gpu_event_buffers_ = nullptr;
NpKitEvent** NpKit::cpu_event_buffers_ = nullptr;
NpKitEventCollectContext* NpKit::gpu_collect_contexts_ = nullptr;
NpKitEventCollectContext* NpKit::cpu_collect_contexts_ = nullptr;
uint64_t* NpKit::cpu_timestamp_ = nullptr;
std::thread* NpKit::cpu_timestamp_update_thread_ = nullptr;
volatile bool NpKit::cpu_timestamp_update_thread_should_stop_ = false;
void NpKit::CpuTimestampUpdateThread() {
uint64_t init_system_clock = std::chrono::system_clock::now().time_since_epoch().count();
uint64_t init_steady_clock = std::chrono::steady_clock::now().time_since_epoch().count();
uint64_t curr_steady_clock = 0;
volatile uint64_t* volatile_cpu_timestamp_ = cpu_timestamp_;
while (!cpu_timestamp_update_thread_should_stop_) {
curr_steady_clock = std::chrono::steady_clock::now().time_since_epoch().count();
*volatile_cpu_timestamp_ = init_system_clock + (curr_steady_clock - init_steady_clock);
}
}
ncclResult_t NpKit::Init(int rank) {
uint64_t i = 0;
NpKitEventCollectContext ctx;
ctx.event_buffer_head = 0;
rank_ = rank;
// Init event data structures
NCCLCHECK(ncclCalloc(&gpu_event_buffers_, kNumGpuEventBuffers));
NCCLCHECK(ncclCudaCalloc(&gpu_collect_contexts_, kNumGpuEventBuffers));
for (i = 0; i < kNumGpuEventBuffers; i++) {
NCCLCHECK(ncclCudaCalloc(gpu_event_buffers_ + i, kMaxNumGpuEventsPerBuffer));
ctx.event_buffer = gpu_event_buffers_[i];
NCCLCHECK(ncclCudaMemcpy(gpu_collect_contexts_ + i, &ctx, 1));
}
NCCLCHECK(ncclCalloc(&cpu_event_buffers_, kNumCpuEventBuffers));
NCCLCHECK(ncclCalloc(&cpu_collect_contexts_, kNumCpuEventBuffers));
for (i = 0; i < kNumCpuEventBuffers; i++) {
NCCLCHECK(ncclCalloc(cpu_event_buffers_ + i, kMaxNumCpuEventsPerBuffer));
ctx.event_buffer = cpu_event_buffers_[i];
cpu_collect_contexts_[i] = ctx;
}
// Init timestamp
NCCLCHECK(ncclCudaHostCalloc(&cpu_timestamp_, 1));
cpu_timestamp_update_thread_should_stop_ = false;
cpu_timestamp_update_thread_ = new std::thread(CpuTimestampUpdateThread);
return ncclSuccess;
}
ncclResult_t NpKit::Dump(const std::string& dump_dir) {
uint64_t i = 0;
std::string dump_file_path;
// Dump CPU events
for (i = 0; i < kNumCpuEventBuffers; i++) {
dump_file_path = dump_dir;
dump_file_path += "/cpu_events_rank_";
dump_file_path += std::to_string(rank_);
dump_file_path += "_channel_";
dump_file_path += std::to_string(i);
auto cpu_trace_file = std::fstream(dump_file_path, std::ios::out | std::ios::binary);
cpu_trace_file.write(reinterpret_cast<char*>(cpu_event_buffers_[i]),
cpu_collect_contexts_[i].event_buffer_head * sizeof(NpKitEvent));
cpu_trace_file.close();
}
// Dump CPU clock info
dump_file_path = dump_dir;
dump_file_path += "/cpu_clock_period_num_rank_";
dump_file_path += std::to_string(rank_);
std::string clock_period_num_str = std::to_string(std::chrono::steady_clock::duration::period::num);
auto clock_period_num_file = std::fstream(dump_file_path, std::ios::out);
clock_period_num_file.write(clock_period_num_str.c_str(), clock_period_num_str.length());
clock_period_num_file.close();
dump_file_path = dump_dir;
dump_file_path += "/cpu_clock_period_den_rank_";
dump_file_path += std::to_string(rank_);
std::string clock_period_den_str = std::to_string(std::chrono::steady_clock::duration::period::den);
auto clock_period_den_file = std::fstream(dump_file_path, std::ios::out);
clock_period_den_file.write(clock_period_den_str.c_str(), clock_period_den_str.length());
clock_period_den_file.close();
// Dump GPU events, reuse CPU struct
for (i = 0; i < kNumGpuEventBuffers; i++) {
dump_file_path = dump_dir;
dump_file_path += "/gpu_events_rank_";
dump_file_path += std::to_string(rank_);
dump_file_path += "_buf_";
dump_file_path += std::to_string(i);
NCCLCHECK(ncclCudaMemcpy(cpu_event_buffers_[0], gpu_event_buffers_[i], kMaxNumGpuEventsPerBuffer));
NCCLCHECK(ncclCudaMemcpy(cpu_collect_contexts_, gpu_collect_contexts_ + i, 1));
auto gpu_trace_file = std::fstream(dump_file_path, std::ios::out | std::ios::binary);
gpu_trace_file.write(reinterpret_cast<char*>(cpu_event_buffers_[0]),
cpu_collect_contexts_[0].event_buffer_head * sizeof(NpKitEvent));
gpu_trace_file.close();
}
// Dump GPU clockRate
dump_file_path = dump_dir;
dump_file_path += "/gpu_clock_rate_rank_";
dump_file_path += std::to_string(rank_);
constexpr int vega_gpu_rtc_freq_in_khz = 25000;
std::string clock_rate_str = std::to_string(vega_gpu_rtc_freq_in_khz);
auto gpu_clock_rate_file = std::fstream(dump_file_path, std::ios::out);
gpu_clock_rate_file.write(clock_rate_str.c_str(), clock_rate_str.length());
gpu_clock_rate_file.close();
return ncclSuccess;
}
ncclResult_t NpKit::Shutdown() {
uint64_t i = 0;
// Stop CPU timestamp updating thread
cpu_timestamp_update_thread_should_stop_ = true;
cpu_timestamp_update_thread_->join();
// Free CPU event data structures
for (i = 0; i < kNumCpuEventBuffers; i++) {
free(cpu_event_buffers_[i]);
}
free(cpu_event_buffers_);
free(cpu_collect_contexts_);
// Free GPU event data structures
for (i = 0; i < kNumGpuEventBuffers; i++) {
CUDACHECK(hipFree(gpu_event_buffers_[i]));
}
free(gpu_event_buffers_);
CUDACHECK(hipFree(gpu_collect_contexts_));
// Free timestamp
NCCLCHECK(ncclCudaHostFree(cpu_timestamp_));
return ncclSuccess;
}
NpKitEventCollectContext* NpKit::GetGpuEventCollectContexts() {
return gpu_collect_contexts_;
}
void NpKit::CollectCpuEvent(uint8_t type, uint32_t size, uint32_t rsvd, uint64_t timestamp, int channel_id) {
uint64_t event_buffer_head = cpu_collect_contexts_[channel_id].event_buffer_head;
if (event_buffer_head < kMaxNumCpuEventsPerBuffer) {
NpKitEvent& event = cpu_collect_contexts_[channel_id].event_buffer[event_buffer_head];
event.fields.type = type;
event.fields.size = size;
event.fields.rsvd = rsvd;
event.fields.timestamp = timestamp;
cpu_collect_contexts_[channel_id].event_buffer_head++;
}
}
uint64_t* NpKit::GetCpuTimestamp() {
return cpu_timestamp_;
}
+86
Wyświetl plik
@@ -14,6 +14,9 @@
#include "gdrwrap.h"
#include "shm.h"
#include "profiler.h"
#if defined(ENABLE_NPKIT)
#include "npkit/npkit.h"
#endif
static_assert(sizeof(ncclNetHandle_t) <= CONNECT_SIZE, "NET Connect info is too large");
@@ -777,7 +780,16 @@ static ncclResult_t recvProxyFree(struct ncclProxyConnection* connection, struct
static_assert(NCCL_STEPS <= NCCL_NET_MAX_REQUESTS, "Not enough net requests to cover for steps");
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_NET_COLLECT_POLL_CNT)
static int g_npkit_net_poll_cnt = 0;
#endif
static ncclResult_t sendProxyProgress(struct ncclComm* comm, struct ncclProxyArgs* args) {
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_NET_COLLECT_POLL_CNT)
g_npkit_net_poll_cnt++;
#endif
if (args->state == ncclProxyOpReady) {
for (int s=0; s<args->nsubs; s++) {
struct ncclProxySubArgs* sub = args->subs+s;
@@ -831,6 +843,11 @@ static ncclResult_t sendProxyProgress(struct ncclComm* comm, struct ncclProxyArg
if (sizesFifo[buffSlot] != -1 && ((*recvTail > (sub->base+sub->transmitted)) || p == NCCL_PROTO_LL)) {
// We have something to receive, let's check if it's completely ready.
int size = sizesFifo[buffSlot];
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_NET_SEND_ENTRY) && defined(ENABLE_NPKIT_EVENT_NET_SEND_EXIT)
sub->npKitSizesFifo[buffSlot] = size;
#endif
char* buff = resources->shared ? localBuff+resources->recvMem->offsFifo[buffSlot] : localBuff+buffSlot*stepSize;
int ready = 1;
if (p == NCCL_PROTO_LL128) {
@@ -865,6 +882,22 @@ static ncclResult_t sendProxyProgress(struct ncclComm* comm, struct ncclProxyArg
// Data is ready, try to send.
NCCLCHECK(ncclNetIsend(resources->netSendComm, buff, size, resources->rank, mhandle, sub->requests+buffSlot));
if (sub->requests[buffSlot] != NULL) {
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_NET_SEND_ENTRY) && defined(ENABLE_NPKIT_EVENT_NET_SEND_EXIT)
NpKit::CollectCpuEvent(
NPKIT_EVENT_NET_SEND_ENTRY,
#if defined(ENABLE_NPKIT_NET_COLLECT_POLL_CNT)
g_npkit_net_poll_cnt,
#else
size,
#endif
uint64_t(sub->requests+buffSlot)/sizeof(void*),
*(volatile uint64_t*)NpKit::GetCpuTimestamp(), sub->channelId);
#if defined(ENABLE_NPKIT_NET_COLLECT_POLL_CNT)
g_npkit_net_poll_cnt = 0;
#endif
#endif
TRACE(NCCL_NET, "sendProxy [%ld/%d] Isend posted, req %p", sub->transmitted, buffSlot, sub->requests[buffSlot]);
sizesFifo[buffSlot] = -1;
// Make sure size is reset to zero before we update the head.
@@ -883,6 +916,22 @@ static ncclResult_t sendProxyProgress(struct ncclComm* comm, struct ncclProxyArg
int buffSlot = (sub->base+sub->done)%NCCL_STEPS;
NCCLCHECK(ncclNetTest(sub->requests[buffSlot], &done, NULL));
if (done) {
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_NET_SEND_ENTRY) && defined(ENABLE_NPKIT_EVENT_NET_SEND_EXIT)
NpKit::CollectCpuEvent(
NPKIT_EVENT_NET_SEND_EXIT,
#if defined(ENABLE_NPKIT_NET_COLLECT_POLL_CNT)
g_npkit_net_poll_cnt,
#else
sub->npKitSizesFifo[buffSlot],
#endif
uint64_t(sub->requests+buffSlot)/sizeof(void*),
*(volatile uint64_t*)NpKit::GetCpuTimestamp(), sub->channelId);
#if defined(ENABLE_NPKIT_NET_COLLECT_POLL_CNT)
g_npkit_net_poll_cnt = 0;
#endif
#endif
TRACE(NCCL_NET, "sendProxy [%ld/%d] request %p done", sub->done, buffSlot, sub->requests[buffSlot]);
sub->done += args->sliceSteps;
for (uint64_t step=sub->done-args->sliceSteps; step<sub->done; step++) ncclProfilingRecord(args, s, step, ncclProxyProfileEnd);
@@ -908,6 +957,11 @@ static ncclResult_t sendProxyProgress(struct ncclComm* comm, struct ncclProxyArg
}
static ncclResult_t recvProxyProgress(struct ncclComm* comm, struct ncclProxyArgs* args) {
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_NET_COLLECT_POLL_CNT)
g_npkit_net_poll_cnt++;
#endif
if (args->state == ncclProxyOpReady) {
// Initialize subs and group them by same recvComm.
void* recvComm;
@@ -989,6 +1043,22 @@ static ncclResult_t recvProxyProgress(struct ncclComm* comm, struct ncclProxyArg
if (*requestPtr) {
for (int i=0; i<subGroup->groupSize; i++) {
struct ncclProxySubArgs* sub = subGroup+i;
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_NET_RECV_ENTRY) && defined(ENABLE_NPKIT_EVENT_NET_RECV_EXIT)
NpKit::CollectCpuEvent(
NPKIT_EVENT_NET_RECV_ENTRY,
#if defined(ENABLE_NPKIT_NET_COLLECT_POLL_CNT)
g_npkit_net_poll_cnt,
#else
sizes[i],
#endif
uint64_t(sub->requests+(step%NCCL_STEPS))/sizeof(void*),
*(volatile uint64_t*)NpKit::GetCpuTimestamp(), sub->channelId);
#if defined(ENABLE_NPKIT_NET_COLLECT_POLL_CNT)
g_npkit_net_poll_cnt = 0;
#endif
#endif
sub->posted += args->sliceSteps;
for (uint64_t step=sub->posted-args->sliceSteps; step<sub->posted; step++) ncclProfilingRecord(args, s+i, step, ncclProxyProfileRecvWait);
}
@@ -1014,6 +1084,22 @@ static ncclResult_t recvProxyProgress(struct ncclComm* comm, struct ncclProxyArg
for (int i=0; i<NCCL_PROXY_MAX_SUBS; i++) totalSize += sizes[i];
for (int i=0; i<subGroup->groupSize; i++) {
struct ncclProxySubArgs* sub = subGroup + i;
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_NET_RECV_ENTRY) && defined(ENABLE_NPKIT_EVENT_NET_RECV_EXIT)
NpKit::CollectCpuEvent(
NPKIT_EVENT_NET_RECV_EXIT,
#if defined(ENABLE_NPKIT_NET_COLLECT_POLL_CNT)
g_npkit_net_poll_cnt,
#else
sizes[i],
#endif
uint64_t(sub->requests+(step%NCCL_STEPS))/sizeof(void*),
*(volatile uint64_t*)NpKit::GetCpuTimestamp(), sub->channelId);
#if defined(ENABLE_NPKIT_NET_COLLECT_POLL_CNT)
g_npkit_net_poll_cnt = 0;
#endif
#endif
sub->received += args->sliceSteps;
for (uint64_t step=sub->received-args->sliceSteps; step<sub->received; step++) ncclProfilingRecord(args, s+i, step, ncclProxyProfileRecvFlushWait);
if (step < sub->nsteps) {