diff --git a/CMakeLists.txt b/CMakeLists.txt index 5027018786..1aa2a7c668 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -188,6 +188,7 @@ set(CC_SOURCES src/misc/nvmlwrap_stub.cc src/misc/rocm_smi_wrap.cc src/misc/profiler.cc + src/misc/npkit.cc src/misc/shmutils.cc src/misc/signals.cc # RCCL src/misc/socket.cc @@ -220,6 +221,10 @@ if(PROFILE) add_definitions(-DENABLE_PROFILING) endif() +if(NPKIT_FLAGS) + add_definitions(${NPKIT_FLAGS}) +endif() + set(COLLTRACE 1 CACHE BOOL "Collective Trace Option") if(COLLTRACE) add_definitions(-DENABLE_COLLTRACE) diff --git a/README.md b/README.md index 6c3aa69d63..e0026476a9 100644 --- a/README.md +++ b/README.md @@ -83,6 +83,18 @@ will run only AllReduce correctness tests with float32 datatype. See "Running a There are also other performance and error-checking tests for RCCL. These are maintained separately at https://github.com/ROCmSoftwarePlatform/rccl-tests. See the rccl-tests README for more information on how to build and run those tests. +## NPKit + +RCCL integrates [NPKit](https://github.com/microsoft/npkit), a profiler framework that enables collecting fine-grained trace events in RCCL components, especially in giant collective GPU kernels. + +Please check [NPKit sample workflow for RCCL](https://github.com/microsoft/NPKit/tree/main/rccl_samples) as a fully automated usage example. It also provides good templates for the following manual instructions. + +To manually build RCCL with NPKit enabled, pass `-DNPKIT_FLAGS="-DENABLE_NPKIT -DENABLE_NPKIT_...(other NPKit compile-time switches)"` with cmake command. All NPKit compile-time switches are declared in the RCCL code base as macros with prefix `ENABLE_NPKIT_`, and they control which information will be collected. Also note that currently NPKit only supports collecting non-overlapped events on GPU, and `-DNPKIT_FLAGS` should follow this rule. + +To manually run RCCL with NPKit enabled, environment variable `NPKIT_DUMP_DIR` needs to be set as the NPKit event dump directory. Also note that currently NPKit only supports 1 GPU per process. + +To manually analyze NPKit dump results, please leverage [npkit_trace_generator.py](https://github.com/microsoft/NPKit/blob/main/rccl_samples/npkit_trace_generator.py). + ## Library and API Documentation Please refer to the [Library documentation](https://rccl.readthedocs.io/) for current documentation. diff --git a/src/collectives/device/all_reduce.h b/src/collectives/device/all_reduce.h index 0529653248..b041b6bf1b 100644 --- a/src/collectives/device/all_reduce.h +++ b/src/collectives/device/all_reduce.h @@ -10,6 +10,10 @@ #include "primitives.h" //#include "clique/AllReduceCliqueKernel.h" // [RCCL] AllReduce Clique-based kernel support +#if defined(ENABLE_NPKIT) +#include "npkit/npkit.h" +#endif + namespace { template __device__ __attribute__((noinline)) void runRing(ncclWorkElem *args) { @@ -29,6 +33,32 @@ namespace { #endif const ssize_t size = args->count; +#if defined(ENABLE_NPKIT) + int npKitCtxIdx = bid; +#endif + +#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_TIME_SYNC_CPU) + if (tid == 0) { + uint64_t* cpuTimestamp = ncclShmem->comm.cpuTimestamp; + NpKit::CollectGpuEvent(NPKIT_EVENT_TIME_SYNC_CPU, 0, 0, *cpuTimestamp, + ncclShmem->comm.npKitEventCollectContexts + npKitCtxIdx); + } +#endif + +#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_TIME_SYNC_GPU) + if (tid == 0) { + NpKit::CollectGpuEvent(NPKIT_EVENT_TIME_SYNC_GPU, 0, 0, __builtin_amdgcn_s_memrealtime(), + ncclShmem->comm.npKitEventCollectContexts + npKitCtxIdx); + } +#endif + +#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_ALL_REDUCE_RING_ENTRY) + if (tid == 0) { + NpKit::CollectGpuEvent(NPKIT_EVENT_ALL_REDUCE_RING_ENTRY, size*sizeof(T), 0, __builtin_amdgcn_s_memrealtime(), + ncclShmem->comm.npKitEventCollectContexts + npKitCtxIdx); + } +#endif + int minChunkSize; if (Proto::Id == NCCL_PROTO_LL) minChunkSize = nthreads*(Proto::calcBytePerGrain()/sizeof(T)); @@ -42,6 +72,12 @@ namespace { (tid, nthreads, &ring->prev, &ring->next, args->sendbuff, args->recvbuff, args->redOpArg, args->connIndex << 16); ACCUMULATE_PRIM_COUNTER(prim); +#if defined(ENABLE_NPKIT) + if (tid == 0) { + prims.npKitCtxIdx = npKitCtxIdx; + } +#endif + for (ssize_t gridOffset = 0; gridOffset < size; gridOffset += loopSize) { ssize_t realChunkSize; if (Proto::Id == NCCL_PROTO_SIMPLE) { @@ -70,11 +106,36 @@ namespace { chunk = modRanks(ringIx + nranks-1); offset = calcOffset(chunk); nelem = min(realChunkSize, size-offset); + +#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_ALL_REDUCE_RING_SEND_ENTRY) + if (tid == 0) { + NpKit::CollectGpuEvent(NPKIT_EVENT_ALL_REDUCE_RING_SEND_ENTRY, nelem*sizeof(T), 0, __builtin_amdgcn_s_memrealtime(), + ncclShmem->comm.npKitEventCollectContexts + npKitCtxIdx); + prims.npKitDataProcessTotalTime = 0; + } +#endif + INIT_COUNTER; prims.send(offset, nelem); ACCUMULATE_COUNTER(send); +#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_ALL_REDUCE_RING_SEND_EXIT) + if (tid == 0) { + NpKit::CollectGpuEvent(NPKIT_EVENT_ALL_REDUCE_RING_SEND_EXIT, nelem*sizeof(T), prims.npKitDataProcessTotalTime, __builtin_amdgcn_s_memrealtime(), + ncclShmem->comm.npKitEventCollectContexts + npKitCtxIdx); + } +#endif + // k-2 steps: reduce and copy to next GPU + +#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_ALL_REDUCE_RING_RECV_REDUCE_SEND_ENTRY) + if (tid == 0 && nranks > 2) { + NpKit::CollectGpuEvent(NPKIT_EVENT_ALL_REDUCE_RING_RECV_REDUCE_SEND_ENTRY, nelem*(nranks-2)*sizeof(T), 0, __builtin_amdgcn_s_memrealtime(), + ncclShmem->comm.npKitEventCollectContexts + npKitCtxIdx); + prims.npKitDataProcessTotalTime = 0; + } +#endif + for (int j=2; j 2) { + NpKit::CollectGpuEvent(NPKIT_EVENT_ALL_REDUCE_RING_RECV_REDUCE_SEND_EXIT, nelem*(nranks-2)*sizeof(T), prims.npKitDataProcessTotalTime, __builtin_amdgcn_s_memrealtime(), + ncclShmem->comm.npKitEventCollectContexts + npKitCtxIdx); + } +#endif + // step k-1: reduce this buffer and data, which will produce the final // result that we store in this data and push to the next GPU chunk = ringIx + 0; offset = calcOffset(chunk); nelem = min(realChunkSize, size-offset); + +#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_ALL_REDUCE_RING_DIRECT_RECV_REDUCE_COPY_SEND_ENTRY) + if (tid == 0) { + NpKit::CollectGpuEvent(NPKIT_EVENT_ALL_REDUCE_RING_DIRECT_RECV_REDUCE_COPY_SEND_ENTRY, nelem*sizeof(T), 0, __builtin_amdgcn_s_memrealtime(), + ncclShmem->comm.npKitEventCollectContexts + npKitCtxIdx); + prims.npKitDataProcessTotalTime = 0; + } +#endif + INIT_COUNTER; prims.directRecvReduceCopySend(offset, offset, offset, nelem, /*postOp=*/true); ACCUMULATE_COUNTER(directRecvReduceCopySend); +#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_ALL_REDUCE_RING_DIRECT_RECV_REDUCE_COPY_SEND_EXIT) + if (tid == 0) { + NpKit::CollectGpuEvent(NPKIT_EVENT_ALL_REDUCE_RING_DIRECT_RECV_REDUCE_COPY_SEND_EXIT, nelem*sizeof(T), prims.npKitDataProcessTotalTime, __builtin_amdgcn_s_memrealtime(), + ncclShmem->comm.npKitEventCollectContexts + npKitCtxIdx); + } +#endif + +#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_ALL_REDUCE_RING_DIRECT_RECV_COPY_SEND_ENTRY) + if (tid == 0 && nranks > 2) { + NpKit::CollectGpuEvent(NPKIT_EVENT_ALL_REDUCE_RING_DIRECT_RECV_COPY_SEND_ENTRY, nelem*(nranks-2)*sizeof(T), 0, __builtin_amdgcn_s_memrealtime(), + ncclShmem->comm.npKitEventCollectContexts + npKitCtxIdx); + prims.npKitDataProcessTotalTime = 0; + } +#endif + // k-2 steps: copy to next GPU for (int j=1; j 2) { + NpKit::CollectGpuEvent(NPKIT_EVENT_ALL_REDUCE_RING_DIRECT_RECV_COPY_SEND_EXIT, nelem*(nranks-2)*sizeof(T), prims.npKitDataProcessTotalTime, __builtin_amdgcn_s_memrealtime(), + ncclShmem->comm.npKitEventCollectContexts + npKitCtxIdx); + } +#endif + // Make final copy from buffer to dest. chunk = modRanks(ringIx + 1); offset = calcOffset(chunk); nelem = min(realChunkSize, size-offset); + +#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_ALL_REDUCE_RING_DIRECT_RECV_ENTRY) + if (tid == 0) { + NpKit::CollectGpuEvent(NPKIT_EVENT_ALL_REDUCE_RING_DIRECT_RECV_ENTRY, nelem*sizeof(T), 0, __builtin_amdgcn_s_memrealtime(), + ncclShmem->comm.npKitEventCollectContexts + npKitCtxIdx); + prims.npKitDataProcessTotalTime = 0; + } +#endif + INIT_COUNTER; prims.directRecv(offset, nelem); ACCUMULATE_COUNTER(directRecv); + +#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_ALL_REDUCE_RING_DIRECT_RECV_EXIT) + if (tid == 0) { + NpKit::CollectGpuEvent(NPKIT_EVENT_ALL_REDUCE_RING_DIRECT_RECV_EXIT, nelem*sizeof(T), prims.npKitDataProcessTotalTime, __builtin_amdgcn_s_memrealtime(), + ncclShmem->comm.npKitEventCollectContexts + npKitCtxIdx); + } +#endif + } #ifdef ENABLE_PROFILING if (tid == 0) { @@ -117,6 +233,14 @@ namespace { elem->elem[blockIdx.x].total_cycle += (__builtin_amdgcn_s_memrealtime() - clk); } #endif + +#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_ALL_REDUCE_RING_EXIT) + if (tid == 0) { + NpKit::CollectGpuEvent(NPKIT_EVENT_ALL_REDUCE_RING_EXIT, size*sizeof(T), 0, __builtin_amdgcn_s_memrealtime(), + ncclShmem->comm.npKitEventCollectContexts + npKitCtxIdx); + } +#endif + } template @@ -135,12 +259,53 @@ namespace { const ssize_t loopSize = int(nChannels*chunkSize); const ssize_t size = args->count; +#if defined(ENABLE_NPKIT) + int npKitCtxIdx = bid; +#endif + +#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_TIME_SYNC_CPU) + if (tid == 0) { + uint64_t* cpuTimestamp = ncclShmem->comm.cpuTimestamp; + NpKit::CollectGpuEvent(NPKIT_EVENT_TIME_SYNC_CPU, 0, 0, *cpuTimestamp, + ncclShmem->comm.npKitEventCollectContexts + npKitCtxIdx); + } +#endif + +#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_TIME_SYNC_GPU) + if (tid == 0) { + NpKit::CollectGpuEvent(NPKIT_EVENT_TIME_SYNC_GPU, 0, 0, __builtin_amdgcn_s_memrealtime(), + ncclShmem->comm.npKitEventCollectContexts + npKitCtxIdx); + } +#endif + +#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_ALL_REDUCE_TREE_UPDOWN_ENTRY) + if (tid == 0) { + NpKit::CollectGpuEvent(NPKIT_EVENT_ALL_REDUCE_TREE_UPDOWN_ENTRY, size*sizeof(T), 0, __builtin_amdgcn_s_memrealtime(), + ncclShmem->comm.npKitEventCollectContexts + npKitCtxIdx); + } +#endif + if (loopSize > size) chunkSize = divUp((int)size, int(nChannels*minChunkSize))*int(minChunkSize); { // Reduce : max number of recv is 3, max number of send is 1 (binary tree + local) Primitives, /*Direct=*/0, Proto, 0> prims (tid, nthreads, tree->down, &tree->up, args->sendbuff, args->recvbuff, args->redOpArg); + +#if defined(ENABLE_NPKIT) + if (tid == 0) { + prims.npKitCtxIdx = npKitCtxIdx; + } +#endif + +#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_ALL_REDUCE_TREE_UPDOWN_REDUCE_ENTRY) + if (tid == 0) { + NpKit::CollectGpuEvent(NPKIT_EVENT_ALL_REDUCE_TREE_UPDOWN_REDUCE_ENTRY, size*sizeof(T), 0, __builtin_amdgcn_s_memrealtime(), + ncclShmem->comm.npKitEventCollectContexts + npKitCtxIdx); + prims.npKitDataProcessTotalTime = 0; + } +#endif + if (tree->up == -1) { for (ssize_t gridOffset = 0; gridOffset < size; gridOffset += loopSize) { ssize_t offset = gridOffset + bid*int(chunkSize); @@ -162,11 +327,34 @@ namespace { prims.recvReduceSend(offset, nelem); } } + +#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_ALL_REDUCE_TREE_UPDOWN_REDUCE_EXIT) + if (tid == 0) { + NpKit::CollectGpuEvent(NPKIT_EVENT_ALL_REDUCE_TREE_UPDOWN_REDUCE_EXIT, size*sizeof(T), prims.npKitDataProcessTotalTime, __builtin_amdgcn_s_memrealtime(), + ncclShmem->comm.npKitEventCollectContexts + npKitCtxIdx); + } +#endif + } { // Broadcast : max number of recv is 1, max number of send is 3 (binary tree + local) Primitives, /*Direct=*/0, Proto, 0> prims (tid, nthreads, &tree->up, tree->down, args->sendbuff, args->recvbuff, args->redOpArg); + +#if defined(ENABLE_NPKIT) + if (tid == 0) { + prims.npKitCtxIdx = npKitCtxIdx; + } +#endif + +#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_ALL_REDUCE_TREE_UPDOWN_BROADCAST_ENTRY) + if (tid == 0) { + NpKit::CollectGpuEvent(NPKIT_EVENT_ALL_REDUCE_TREE_UPDOWN_BROADCAST_ENTRY, size*sizeof(T), 0, __builtin_amdgcn_s_memrealtime(), + ncclShmem->comm.npKitEventCollectContexts + npKitCtxIdx); + prims.npKitDataProcessTotalTime = 0; + } +#endif + if (tree->up == -1) { for (ssize_t gridOffset = 0; gridOffset < size; gridOffset += loopSize) { ssize_t offset = gridOffset + bid*int(chunkSize); @@ -188,7 +376,23 @@ namespace { prims.directRecvCopySend(offset, offset, nelem); } } + +#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_ALL_REDUCE_TREE_UPDOWN_BROADCAST_EXIT) + if (tid == 0) { + NpKit::CollectGpuEvent(NPKIT_EVENT_ALL_REDUCE_TREE_UPDOWN_BROADCAST_EXIT, size*sizeof(T), prims.npKitDataProcessTotalTime, __builtin_amdgcn_s_memrealtime(), + ncclShmem->comm.npKitEventCollectContexts + npKitCtxIdx); + } +#endif + } + +#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_ALL_REDUCE_TREE_UPDOWN_EXIT) + if (tid == 0) { + NpKit::CollectGpuEvent(NPKIT_EVENT_ALL_REDUCE_TREE_UPDOWN_EXIT, size*sizeof(T), 0, __builtin_amdgcn_s_memrealtime(), + ncclShmem->comm.npKitEventCollectContexts + npKitCtxIdx); + } +#endif + } template @@ -217,6 +421,40 @@ namespace { nthreadsSplit = (nthreads*7/(10*WARP_SIZE))*WARP_SIZE; } +#if defined(ENABLE_NPKIT) + bool isNpKitThread = false; + int npKitCtxIdx = 0; + if (threadIdx.x == 0) { + isNpKitThread = true; + npKitCtxIdx = bid * 2; + } else if (tree->up != -1 && threadIdx.x == nthreadsSplit) { + isNpKitThread = true; + npKitCtxIdx = bid * 2 + 1; + } +#endif + +#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_TIME_SYNC_CPU) + if (isNpKitThread) { + uint64_t* cpuTimestamp = ncclShmem->comm.cpuTimestamp; + NpKit::CollectGpuEvent(NPKIT_EVENT_TIME_SYNC_CPU, 0, 0, *cpuTimestamp, + ncclShmem->comm.npKitEventCollectContexts + npKitCtxIdx); + } +#endif + +#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_TIME_SYNC_GPU) + if (isNpKitThread) { + NpKit::CollectGpuEvent(NPKIT_EVENT_TIME_SYNC_GPU, 0, 0, __builtin_amdgcn_s_memrealtime(), + ncclShmem->comm.npKitEventCollectContexts + npKitCtxIdx); + } +#endif + +#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_ALL_REDUCE_TREE_SPLIT_ENTRY) + if (isNpKitThread) { + NpKit::CollectGpuEvent(NPKIT_EVENT_ALL_REDUCE_TREE_SPLIT_ENTRY, size*sizeof(T), 0, __builtin_amdgcn_s_memrealtime(), + ncclShmem->comm.npKitEventCollectContexts + npKitCtxIdx); + } +#endif + if (loopSize > size) chunkSize = divUp((int)size, nChannels*int(minChunkSize))*int(minChunkSize); @@ -224,11 +462,34 @@ namespace { // Reduce and broadcast. Max number of recv is 3, max number of send is 3 Primitives, /*Direct=*/0, Proto, 0> prims(tid, nthreads, tree->down, tree->down, args->sendbuff, args->recvbuff, args->redOpArg); + +#if defined(ENABLE_NPKIT) + if (isNpKitThread) { + prims.npKitCtxIdx = npKitCtxIdx; + } +#endif + +#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_ALL_REDUCE_TREE_SPLIT_REDUCE_BROADCAST_ENTRY) + if (isNpKitThread) { + NpKit::CollectGpuEvent(NPKIT_EVENT_ALL_REDUCE_TREE_SPLIT_REDUCE_BROADCAST_ENTRY, size*sizeof(T), 0, __builtin_amdgcn_s_memrealtime(), + ncclShmem->comm.npKitEventCollectContexts + npKitCtxIdx); + prims.npKitDataProcessTotalTime = 0; + } +#endif + for (ssize_t gridOffset = 0; gridOffset < size; gridOffset += loopSize) { ssize_t offset = gridOffset + bid*int(chunkSize); int nelem = min(chunkSize, size-offset); prims.directRecvReduceCopySend(offset, offset, offset, nelem, /*doPost=*/true); } + +#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_ALL_REDUCE_TREE_SPLIT_REDUCE_BROADCAST_EXIT) + if (isNpKitThread) { + NpKit::CollectGpuEvent(NPKIT_EVENT_ALL_REDUCE_TREE_SPLIT_REDUCE_BROADCAST_EXIT, size*sizeof(T), prims.npKitDataProcessTotalTime, __builtin_amdgcn_s_memrealtime(), + ncclShmem->comm.npKitEventCollectContexts + npKitCtxIdx); + } +#endif + } else if (tid < nthreadsSplit) { /* Reduce up. Max number of recv is 3, max number of send is 1 (binary tree + local). @@ -241,6 +502,21 @@ namespace { */ Primitives, /*Direct=*/0, Proto, 0> prims(tid, nthreadsSplit, tree->down, &tree->up, args->sendbuff, args->recvbuff, args->redOpArg, 0*Proto::MaxGroupWidth); + +#if defined(ENABLE_NPKIT) + if (isNpKitThread) { + prims.npKitCtxIdx = npKitCtxIdx; + } +#endif + +#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_ALL_REDUCE_TREE_SPLIT_REDUCE_ENTRY) + if (isNpKitThread) { + NpKit::CollectGpuEvent(NPKIT_EVENT_ALL_REDUCE_TREE_SPLIT_REDUCE_ENTRY, size*sizeof(T), 0, __builtin_amdgcn_s_memrealtime(), + ncclShmem->comm.npKitEventCollectContexts + npKitCtxIdx); + prims.npKitDataProcessTotalTime = 0; + } +#endif + if (tree->down[0] == -1) { for (ssize_t gridOffset = 0; gridOffset < size; gridOffset += loopSize) { ssize_t offset = gridOffset + bid*int(chunkSize); @@ -255,11 +531,34 @@ namespace { prims.recvReduceSend(offset, nelem); } } + +#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_ALL_REDUCE_TREE_SPLIT_REDUCE_EXIT) + if (isNpKitThread) { + NpKit::CollectGpuEvent(NPKIT_EVENT_ALL_REDUCE_TREE_SPLIT_REDUCE_EXIT, size*sizeof(T), prims.npKitDataProcessTotalTime, __builtin_amdgcn_s_memrealtime(), + ncclShmem->comm.npKitEventCollectContexts + npKitCtxIdx); + } +#endif + } else { // Broadcast down. Max number of recv is 1, max number of send is 3 (binary tree + local) Primitives, /*Direct=*/0, Proto, 0> prims(tid-nthreadsSplit, nthreads-nthreadsSplit, &tree->up, tree->down, args->sendbuff, args->recvbuff, args->redOpArg, 1*Proto::MaxGroupWidth); + +#if defined(ENABLE_NPKIT) + if (isNpKitThread) { + prims.npKitCtxIdx = npKitCtxIdx; + } +#endif + +#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_ALL_REDUCE_TREE_SPLIT_BROADCAST_ENTRY) + if (isNpKitThread) { + NpKit::CollectGpuEvent(NPKIT_EVENT_ALL_REDUCE_TREE_SPLIT_BROADCAST_ENTRY, size*sizeof(T), 0, __builtin_amdgcn_s_memrealtime(), + ncclShmem->comm.npKitEventCollectContexts + npKitCtxIdx); + prims.npKitDataProcessTotalTime = 0; + } +#endif + if (tree->down[0] == -1) { for (ssize_t gridOffset = 0; gridOffset < size; gridOffset += loopSize) { ssize_t offset = gridOffset + bid*int(chunkSize); @@ -274,7 +573,23 @@ namespace { prims.directRecvCopySend(offset, offset, nelem); } } + +#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_ALL_REDUCE_TREE_SPLIT_BROADCAST_EXIT) + if (isNpKitThread) { + NpKit::CollectGpuEvent(NPKIT_EVENT_ALL_REDUCE_TREE_SPLIT_BROADCAST_EXIT, size*sizeof(T), prims.npKitDataProcessTotalTime, __builtin_amdgcn_s_memrealtime(), + ncclShmem->comm.npKitEventCollectContexts + npKitCtxIdx); + } +#endif + } + +#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_ALL_REDUCE_TREE_SPLIT_EXIT) + if (isNpKitThread) { + NpKit::CollectGpuEvent(NPKIT_EVENT_ALL_REDUCE_TREE_SPLIT_EXIT, size*sizeof(T), 0, __builtin_amdgcn_s_memrealtime(), + ncclShmem->comm.npKitEventCollectContexts + npKitCtxIdx); + } +#endif + } } diff --git a/src/collectives/device/prims_ll.h b/src/collectives/device/prims_ll.h index ef916628bb..bdc711058f 100644 --- a/src/collectives/device/prims_ll.h +++ b/src/collectives/device/prims_ll.h @@ -5,6 +5,10 @@ * See LICENSE.txt for license information ************************************************************************/ +#if defined(ENABLE_NPKIT) +#include "npkit/npkit.h" +#endif + template class Primitives: public PrimitivesWithoutDirect> { @@ -34,6 +38,22 @@ class Primitives: union ncclLLFifoLine* recvBuff[MaxRecv]; union ncclLLFifoLine* sendBuff[MaxSend]; +#if defined(ENABLE_NPKIT) +public: + int npKitCtxIdx = 0; + uint64_t npKitDataProcessEntryTime = 0; + uint64_t npKitDataProcessExitTime = 0; + uint64_t npKitDataProcessTotalTime = 0; +private: +#endif + +#if defined(ENABLE_NPKIT) && (defined(ENABLE_NPKIT_EVENT_PRIM_LL_DATA_PROCESS_ENTRY) && defined(ENABLE_NPKIT_EVENT_PRIM_LL_DATA_PROCESS_EXIT) || defined(ENABLE_NPKIT_PRIM_COLLECT_DATA_PROCESS_TIME)) + uint64_t npKitWaitRecvDataProcessSize = 0; + uint64_t npKitWaitRecvEntryTime = 0; + uint64_t npKitWaitRecvExitTime = 0; + uint64_t npKitWaitRecvTotalTime = 0; +#endif + inline __device__ int recvOffset(int i) { return (recvStep[i]%NCCL_STEPS)*stepLines; } inline __device__ int sendOffset(int i) { return (sendStep[i]%NCCL_STEPS)*stepLines; } inline __device__ union ncclLLFifoLine* recvPtr(int i) { return recvBuff[i]+recvOffset(i); } @@ -70,6 +90,12 @@ class Primitives: } inline __device__ void waitSend(int nbytes) { +#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_PRIM_LL_WAIT_SEND_ENTRY) + if (tid == 0) { + NpKit::CollectGpuEvent(NPKIT_EVENT_PRIM_LL_WAIT_SEND_ENTRY, nbytes, 0, __builtin_amdgcn_s_memrealtime(), + ncclShmem->comm.npKitEventCollectContexts + npKitCtxIdx); + } +#endif if (sendConnHeadPtr) { int spins = 0; while (sendConnHeadCache + NCCL_STEPS < sendConnHead + 1) { @@ -83,6 +109,12 @@ class Primitives: sendConnHead += 1; } barrier(); +#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_PRIM_LL_WAIT_SEND_EXIT) + if (tid == 0) { + NpKit::CollectGpuEvent(NPKIT_EVENT_PRIM_LL_WAIT_SEND_EXIT, nbytes, 0, __builtin_amdgcn_s_memrealtime(), + ncclShmem->comm.npKitEventCollectContexts + npKitCtxIdx); + } +#endif } inline __device__ void incRecv(int i) { @@ -107,21 +139,43 @@ class Primitives: uint32_t flag = recvFlag(i); uint32_t data1, flag1, data2, flag2; int spins = 0; + +#if defined(ENABLE_NPKIT) && (defined(ENABLE_NPKIT_EVENT_PRIM_LL_DATA_PROCESS_ENTRY) && defined(ENABLE_NPKIT_EVENT_PRIM_LL_DATA_PROCESS_EXIT) || defined(ENABLE_NPKIT_PRIM_COLLECT_DATA_PROCESS_TIME)) + int npkitWaitRecvSpins = 0; + if (tid == 0) { + npKitWaitRecvEntryTime = __builtin_amdgcn_s_memrealtime(); + } +#endif + #if defined(__HIP_PLATFORM_HCC__) || defined(__HCC__) || defined(__HIPCC__) union ncclLLFifoLine i4; do { i4.v[0] = __builtin_nontemporal_load(src->v); i4.v[1] = __builtin_nontemporal_load(src->v+1); +#if defined(ENABLE_NPKIT) && (defined(ENABLE_NPKIT_EVENT_PRIM_LL_DATA_PROCESS_ENTRY) && defined(ENABLE_NPKIT_EVENT_PRIM_LL_DATA_PROCESS_EXIT) || defined(ENABLE_NPKIT_PRIM_COLLECT_DATA_PROCESS_TIME)) + npkitWaitRecvSpins++; +#endif if (checkAbort(spins, 0)) break; } while ((i4.flag1 != flag) || (i4.flag2 != flag)); uint64_t val64 = (uint64_t)(i4.data1) + (((uint64_t)i4.data2) << 32); #else do { asm("ld.volatile.global.v4.u32 {%0,%1,%2,%3}, [%4];" : "=r"(data1), "=r"(flag1), "=r"(data2), "=r"(flag2) : "l"(&src->i4)); +#if defined(ENABLE_NPKIT) && (defined(ENABLE_NPKIT_EVENT_PRIM_LL_DATA_PROCESS_ENTRY) && defined(ENABLE_NPKIT_EVENT_PRIM_LL_DATA_PROCESS_EXIT) || defined(ENABLE_NPKIT_PRIM_COLLECT_DATA_PROCESS_TIME)) + npkitWaitRecvSpins++; +#endif if (checkAbort(spins, 0)) break; } while ((flag1 != flag) || (flag2 != flag)); uint64_t val64 = data1 + (((uint64_t)data2) << 32); #endif + +#if defined(ENABLE_NPKIT) && (defined(ENABLE_NPKIT_EVENT_PRIM_LL_DATA_PROCESS_ENTRY) && defined(ENABLE_NPKIT_EVENT_PRIM_LL_DATA_PROCESS_EXIT) || defined(ENABLE_NPKIT_PRIM_COLLECT_DATA_PROCESS_TIME)) + if (tid == 0) { + npKitWaitRecvExitTime = __builtin_amdgcn_s_memrealtime(); + npKitWaitRecvTotalTime += (npKitWaitRecvExitTime - npKitWaitRecvEntryTime) * (npkitWaitRecvSpins - 1) / npkitWaitRecvSpins; + } +#endif + return val64; } @@ -144,16 +198,35 @@ class Primitives: union ncclLLFifoLine* src = recvPtr(i) + offset; uint32_t flag = recvFlag(i); int spins = 0; + +#if defined(ENABLE_NPKIT) && (defined(ENABLE_NPKIT_EVENT_PRIM_LL_DATA_PROCESS_ENTRY) && defined(ENABLE_NPKIT_EVENT_PRIM_LL_DATA_PROCESS_EXIT) || defined(ENABLE_NPKIT_PRIM_COLLECT_DATA_PROCESS_TIME)) + int npkitWaitRecvSpins = 0; + if (tid == 0) { + npKitWaitRecvEntryTime = __builtin_amdgcn_s_memrealtime(); + } +#endif + do { #if defined(__HIP_PLATFORM_HCC__) || defined(__HCC__) || defined(__HIPCC__) line[i].v[0] = __builtin_nontemporal_load(src->v); line[i].v[1] = __builtin_nontemporal_load(src->v+1); #else asm("ld.volatile.global.v4.u32 {%0,%1,%2,%3}, [%4];" : "=r"(line[i].data1), "=r"(line[i].flag1), "=r"(line[i].data2), "=r"(line[i].flag2) : "l"(&src->i4)); +#endif +#if defined(ENABLE_NPKIT) && (defined(ENABLE_NPKIT_EVENT_PRIM_LL_DATA_PROCESS_ENTRY) && defined(ENABLE_NPKIT_EVENT_PRIM_LL_DATA_PROCESS_EXIT) || defined(ENABLE_NPKIT_PRIM_COLLECT_DATA_PROCESS_TIME)) + npkitWaitRecvSpins++; #endif if (checkAbort(spins, 0)) break; } while(line[i].flag1 != flag || line[i].flag2 != flag); uint64_t val64 = line[i].data1 + (((uint64_t)line[i].data2) << 32); + +#if defined(ENABLE_NPKIT) && (defined(ENABLE_NPKIT_EVENT_PRIM_LL_DATA_PROCESS_ENTRY) && defined(ENABLE_NPKIT_EVENT_PRIM_LL_DATA_PROCESS_EXIT) || defined(ENABLE_NPKIT_PRIM_COLLECT_DATA_PROCESS_TIME)) + if (tid == 0) { + npKitWaitRecvExitTime = __builtin_amdgcn_s_memrealtime(); + npKitWaitRecvTotalTime += (npKitWaitRecvExitTime - npKitWaitRecvEntryTime) * (npkitWaitRecvSpins - 1) / npkitWaitRecvSpins; + } +#endif + return val64; } @@ -296,6 +369,22 @@ class Primitives: nelem = nelem < 0 ? 0 : nelem; if (SEND) waitSend(divUp(nelem, EltPerLine)*sizeof(ncclLLFifoLine)); +#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_PRIM_LL_DATA_PROCESS_ENTRY) && defined(ENABLE_NPKIT_EVENT_PRIM_LL_DATA_PROCESS_EXIT) + if (tid == 0) { + npKitWaitRecvTotalTime = 0; + npKitWaitRecvDataProcessSize = nelem*sizeof(T); + NpKit::CollectGpuEvent(NPKIT_EVENT_PRIM_LL_DATA_PROCESS_ENTRY, + npKitWaitRecvDataProcessSize, 0, __builtin_amdgcn_s_memrealtime(), ncclShmem->comm.npKitEventCollectContexts + npKitCtxIdx); + } +#endif + +#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_PRIM_COLLECT_DATA_PROCESS_TIME) + if (tid == 0) { + npKitWaitRecvTotalTime = 0; + npKitDataProcessEntryTime = __builtin_amdgcn_s_memrealtime(); + } +#endif + nelem -= tid*EltPerLine; srcElts += tid*EltPerLine; dstElts += tid*EltPerLine; @@ -344,6 +433,21 @@ class Primitives: offset += nthreads; } +#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_PRIM_COLLECT_DATA_PROCESS_TIME) + if (tid == 0) { + npKitDataProcessExitTime = __builtin_amdgcn_s_memrealtime(); + npKitDataProcessTotalTime += npKitDataProcessExitTime - npKitDataProcessEntryTime - npKitWaitRecvTotalTime; + } +#endif + +#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_PRIM_LL_DATA_PROCESS_ENTRY) && defined(ENABLE_NPKIT_EVENT_PRIM_LL_DATA_PROCESS_EXIT) + if (tid == 0) { + NpKit::CollectGpuEvent(NPKIT_EVENT_PRIM_LL_DATA_PROCESS_EXIT, + npKitWaitRecvDataProcessSize, npKitWaitRecvTotalTime, __builtin_amdgcn_s_memrealtime(), + ncclShmem->comm.npKitEventCollectContexts + npKitCtxIdx); + } +#endif + if (RECV) { for (int i=0; i < MaxRecv; i++) incRecv(i); postRecv(); @@ -430,28 +534,124 @@ class Primitives: } __device__ void send(intptr_t inpIx, int eltN) { - return LLGenericOp<0, 1, Input, -1>(inpIx, -1, eltN, false); +#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_SEND_ENTRY) + if (tid == 0) { + NpKit::CollectGpuEvent(NPKIT_EVENT_SEND_ENTRY, eltN*sizeof(T), 0, __builtin_amdgcn_s_memrealtime(), + ncclShmem->comm.npKitEventCollectContexts + npKitCtxIdx); + } +#endif + LLGenericOp<0, 1, Input, -1>(inpIx, -1, eltN, false); +#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_SEND_EXIT) + if (tid == 0) { + NpKit::CollectGpuEvent(NPKIT_EVENT_SEND_EXIT, eltN*sizeof(T), 0, __builtin_amdgcn_s_memrealtime(), + ncclShmem->comm.npKitEventCollectContexts + npKitCtxIdx); + } +#endif } __device__ void sendFromOutput(intptr_t outIx, int eltN) { - return LLGenericOp<0, 1, Output, -1>(outIx, -1, eltN, false); +#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_SEND_FROM_OUTPUT_ENTRY) + if (tid == 0) { + NpKit::CollectGpuEvent(NPKIT_EVENT_SEND_FROM_OUTPUT_ENTRY, eltN*sizeof(T), 0, __builtin_amdgcn_s_memrealtime(), + ncclShmem->comm.npKitEventCollectContexts + npKitCtxIdx); + } +#endif + LLGenericOp<0, 1, Output, -1>(outIx, -1, eltN, false); +#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_SEND_FROM_OUTPUT_EXIT) + if (tid == 0) { + NpKit::CollectGpuEvent(NPKIT_EVENT_SEND_FROM_OUTPUT_EXIT, eltN*sizeof(T), 0, __builtin_amdgcn_s_memrealtime(), + ncclShmem->comm.npKitEventCollectContexts + npKitCtxIdx); + } +#endif } __device__ void recv(intptr_t outIx, int eltN, bool postOp=false) { - return LLGenericOp<1, 0, -1, Output>(-1, outIx, eltN, postOp); +#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_RECV_ENTRY) + if (tid == 0) { + NpKit::CollectGpuEvent(NPKIT_EVENT_RECV_ENTRY, eltN*sizeof(T), 0, __builtin_amdgcn_s_memrealtime(), + ncclShmem->comm.npKitEventCollectContexts + npKitCtxIdx); + } +#endif + LLGenericOp<1, 0, -1, Output>(-1, outIx, eltN, postOp); +#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_RECV_EXIT) + if (tid == 0) { + NpKit::CollectGpuEvent(NPKIT_EVENT_RECV_EXIT, eltN*sizeof(T), 0, __builtin_amdgcn_s_memrealtime(), + ncclShmem->comm.npKitEventCollectContexts + npKitCtxIdx); + } +#endif } __device__ void recvReduceSend(intptr_t inpIx, int eltN) { - return LLGenericOp<1, 1, Input, -1>(inpIx, -1, eltN, false); +#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_RECV_REDUCE_SEND_ENTRY) + if (tid == 0) { + NpKit::CollectGpuEvent(NPKIT_EVENT_RECV_REDUCE_SEND_ENTRY, eltN*sizeof(T), 0, __builtin_amdgcn_s_memrealtime(), + ncclShmem->comm.npKitEventCollectContexts + npKitCtxIdx); + } +#endif + LLGenericOp<1, 1, Input, -1>(inpIx, -1, eltN, false); +#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_RECV_REDUCE_SEND_EXIT) + if (tid == 0) { + NpKit::CollectGpuEvent(NPKIT_EVENT_RECV_REDUCE_SEND_EXIT, eltN*sizeof(T), 0, __builtin_amdgcn_s_memrealtime(), + ncclShmem->comm.npKitEventCollectContexts + npKitCtxIdx); + } +#endif } __device__ void recvReduceCopy(intptr_t inpIx, intptr_t outIx, int eltN, bool postOp=false) { - return LLGenericOp<1, 0, Input, Output>(inpIx, outIx, eltN, postOp); +#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_RECV_REDUCE_COPY_ENTRY) + if (tid == 0) { + NpKit::CollectGpuEvent(NPKIT_EVENT_RECV_REDUCE_COPY_ENTRY, eltN*sizeof(T), 0, __builtin_amdgcn_s_memrealtime(), + ncclShmem->comm.npKitEventCollectContexts + npKitCtxIdx); + } +#endif + LLGenericOp<1, 0, Input, Output>(inpIx, outIx, eltN, postOp); +#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_RECV_REDUCE_COPY_EXIT) + if (tid == 0) { + NpKit::CollectGpuEvent(NPKIT_EVENT_RECV_REDUCE_COPY_EXIT, eltN*sizeof(T), 0, __builtin_amdgcn_s_memrealtime(), + ncclShmem->comm.npKitEventCollectContexts + npKitCtxIdx); + } +#endif } __device__ void copySend(intptr_t inpIx, intptr_t outIx, int eltN, bool postOp=false) { - return LLGenericOp<0, 1, Input, Output>(inpIx, outIx, eltN, postOp); +#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_COPY_SEND_ENTRY) + if (tid == 0) { + NpKit::CollectGpuEvent(NPKIT_EVENT_COPY_SEND_ENTRY, eltN*sizeof(T), 0, __builtin_amdgcn_s_memrealtime(), + ncclShmem->comm.npKitEventCollectContexts + npKitCtxIdx); + } +#endif + LLGenericOp<0, 1, Input, Output>(inpIx, outIx, eltN, postOp); +#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_COPY_SEND_EXIT) + if (tid == 0) { + NpKit::CollectGpuEvent(NPKIT_EVENT_COPY_SEND_EXIT, eltN*sizeof(T), 0, __builtin_amdgcn_s_memrealtime(), + ncclShmem->comm.npKitEventCollectContexts + npKitCtxIdx); + } +#endif } __device__ void recvCopySend(intptr_t outIx, int eltN, bool postOp=false) { - return LLGenericOp<1, 1, -1, Output>(-1, outIx, eltN, postOp); +#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_RECV_COPY_SEND_ENTRY) + if (tid == 0) { + NpKit::CollectGpuEvent(NPKIT_EVENT_RECV_COPY_SEND_ENTRY, eltN*sizeof(T), 0, __builtin_amdgcn_s_memrealtime(), + ncclShmem->comm.npKitEventCollectContexts + npKitCtxIdx); + } +#endif + LLGenericOp<1, 1, -1, Output>(-1, outIx, eltN, postOp); +#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_RECV_COPY_SEND_EXIT) + if (tid == 0) { + NpKit::CollectGpuEvent(NPKIT_EVENT_RECV_COPY_SEND_EXIT, eltN*sizeof(T), 0, __builtin_amdgcn_s_memrealtime(), + ncclShmem->comm.npKitEventCollectContexts + npKitCtxIdx); + } +#endif } __device__ void recvReduceCopySend(intptr_t inpIx, intptr_t outIx, int eltN, bool postOp=false) { - return LLGenericOp<1, 1, Input, Output>(inpIx, outIx, eltN, postOp); +#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_RECV_REDUCE_COPY_SEND_ENTRY) + if (tid == 0) { + NpKit::CollectGpuEvent(NPKIT_EVENT_RECV_REDUCE_COPY_SEND_ENTRY, eltN*sizeof(T), 0, __builtin_amdgcn_s_memrealtime(), + ncclShmem->comm.npKitEventCollectContexts + npKitCtxIdx); + } +#endif + LLGenericOp<1, 1, Input, Output>(inpIx, outIx, eltN, postOp); +#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_RECV_REDUCE_COPY_SEND_EXIT) + if (tid == 0) { + NpKit::CollectGpuEvent(NPKIT_EVENT_RECV_REDUCE_COPY_SEND_EXIT, eltN*sizeof(T), 0, __builtin_amdgcn_s_memrealtime(), + ncclShmem->comm.npKitEventCollectContexts + npKitCtxIdx); + } +#endif } __device__ void recvSend(int eltN) { return LLGenericOp<1, 1, -1, -1>(-1, -1, eltN, false); diff --git a/src/collectives/device/prims_simple.h b/src/collectives/device/prims_simple.h index 45c8544fdb..cc6f275041 100644 --- a/src/collectives/device/prims_simple.h +++ b/src/collectives/device/prims_simple.h @@ -5,6 +5,10 @@ * See LICENSE.txt for license information ************************************************************************/ +#if defined(ENABLE_NPKIT) +#include "npkit/npkit.h" +#endif + template class Primitives< @@ -49,6 +53,15 @@ class Primitives< const uint64_t opCount; uint32_t* next_hdp_reg; +#if defined(ENABLE_NPKIT) +public: + int npKitCtxIdx = 0; + uint64_t npKitDataProcessEntryTime = 0; + uint64_t npKitDataProcessExitTime = 0; + uint64_t npKitDataProcessTotalTime = 0; +private: +#endif + // Don't use barrier 0 as it's used by the final sync inline __device__ void barrier() { #if defined(__HIP_PLATFORM_HCC__) || defined(__HCC__) || defined(__HIPCC__) @@ -206,21 +219,93 @@ class Primitives< if (DirectRecv && ncclShmem->groups[group].srcs[0] == ncclShmem->groups[group].dsts[0]) { // We can only have one direct receive. Since srcs[0] == dstPtr+offset, skip one copy if (Send) { + +#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_PRIM_SIMPLE_REDUCE_OR_COPY_MULTI_ENTRY) + if (tid == 0) { + NpKit::CollectGpuEvent(NPKIT_EVENT_PRIM_SIMPLE_REDUCE_OR_COPY_MULTI_ENTRY, sliceSize*sizeof(T), 0, __builtin_amdgcn_s_memrealtime(), + ncclShmem->comm.npKitEventCollectContexts + npKitCtxIdx); + } +#endif + +#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_PRIM_COLLECT_DATA_PROCESS_TIME) + if (tid == 0) { + npKitDataProcessEntryTime = __builtin_amdgcn_s_memrealtime(); + } +#endif + // (1-Send) is only there to avoid compilation errors in case MaxSend=0 (and Send=0). ReduceOrCopyMulti (tid, nworkers, nullptr, false, 1, (T const**)ncclShmem->groups[group].srcs, fan.nsend(), (T**)ncclShmem->groups[group].dsts+1, sliceSize); + +#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_PRIM_COLLECT_DATA_PROCESS_TIME) + if (tid == 0) { + npKitDataProcessExitTime = __builtin_amdgcn_s_memrealtime(); + npKitDataProcessTotalTime += npKitDataProcessExitTime - npKitDataProcessEntryTime; + } +#endif + +#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_PRIM_SIMPLE_REDUCE_OR_COPY_MULTI_EXIT) + if (tid == 0) { + NpKit::CollectGpuEvent(NPKIT_EVENT_PRIM_SIMPLE_REDUCE_OR_COPY_MULTI_EXIT, sliceSize*sizeof(T), 0, __builtin_amdgcn_s_memrealtime(), + ncclShmem->comm.npKitEventCollectContexts + npKitCtxIdx); + } +#endif + } } else if (DirectSend && !DirectRecv && SrcBuf != Input && ncclShmem->groups[group].dsts[Dst] == nullptr) { // For broadcast in CollNet to do empty send + +#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_PRIM_SIMPLE_REDUCE_OR_COPY_MULTI_ENTRY) + if (tid == 0) { + NpKit::CollectGpuEvent(NPKIT_EVENT_PRIM_SIMPLE_REDUCE_OR_COPY_MULTI_ENTRY, sliceSize*sizeof(T), 0, __builtin_amdgcn_s_memrealtime(), + ncclShmem->comm.npKitEventCollectContexts + npKitCtxIdx); + } +#endif + +#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_PRIM_COLLECT_DATA_PROCESS_TIME) + if (tid == 0) { + npKitDataProcessEntryTime = __builtin_amdgcn_s_memrealtime(); + } +#endif + ReduceOrCopyMulti (tid, nworkers, ncclShmem->redOpArgs, postOp, Recv, (T const**)ncclShmem->groups[group].srcs, Dst, (T**)ncclShmem->groups[group].dsts, sliceSize); + +#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_PRIM_COLLECT_DATA_PROCESS_TIME) + if (tid == 0) { + npKitDataProcessExitTime = __builtin_amdgcn_s_memrealtime(); + npKitDataProcessTotalTime += npKitDataProcessExitTime - npKitDataProcessEntryTime; + } +#endif + +#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_PRIM_SIMPLE_REDUCE_OR_COPY_MULTI_EXIT) + if (tid == 0) { + NpKit::CollectGpuEvent(NPKIT_EVENT_PRIM_SIMPLE_REDUCE_OR_COPY_MULTI_EXIT, sliceSize*sizeof(T), 0, __builtin_amdgcn_s_memrealtime(), + ncclShmem->comm.npKitEventCollectContexts + npKitCtxIdx); + } +#endif + } else { + +#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_PRIM_SIMPLE_REDUCE_OR_COPY_MULTI_ENTRY) + if (tid == 0) { + NpKit::CollectGpuEvent(NPKIT_EVENT_PRIM_SIMPLE_REDUCE_OR_COPY_MULTI_ENTRY, sliceSize*sizeof(T), 0, __builtin_amdgcn_s_memrealtime(), + ncclShmem->comm.npKitEventCollectContexts + npKitCtxIdx); + } +#endif + +#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_PRIM_COLLECT_DATA_PROCESS_TIME) + if (tid == 0) { + npKitDataProcessEntryTime = __builtin_amdgcn_s_memrealtime(); + } +#endif + constexpr int PreOpN = SrcBuf != Input ? 0 : DirectRecv*MaxRecv == NCCL_MAX_DIRECT_ARITY ? (1+NCCL_MAX_DIRECT_ARITY) : 1; ReduceOrCopyMulti @@ -228,6 +313,21 @@ class Primitives< Recv*fan.nrecv()+Src, (T const**)ncclShmem->groups[group].srcs, Send*fan.nsend()+Dst, (T**)ncclShmem->groups[group].dsts, sliceSize); + +#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_PRIM_COLLECT_DATA_PROCESS_TIME) + if (tid == 0) { + npKitDataProcessExitTime = __builtin_amdgcn_s_memrealtime(); + npKitDataProcessTotalTime += npKitDataProcessExitTime - npKitDataProcessEntryTime; + } +#endif + +#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_PRIM_SIMPLE_REDUCE_OR_COPY_MULTI_EXIT) + if (tid == 0) { + NpKit::CollectGpuEvent(NPKIT_EVENT_PRIM_SIMPLE_REDUCE_OR_COPY_MULTI_EXIT, sliceSize*sizeof(T), 0, __builtin_amdgcn_s_memrealtime(), + ncclShmem->comm.npKitEventCollectContexts + npKitCtxIdx); + } +#endif + } barrier(); // This barrier has a counterpart in following loop #if defined(__gfx1030__) diff --git a/src/collectives/device/sendrecv.h b/src/collectives/device/sendrecv.h index 28eef3ea59..fa906d2d65 100644 --- a/src/collectives/device/sendrecv.h +++ b/src/collectives/device/sendrecv.h @@ -8,14 +8,68 @@ #include "devcomm.h" #include "collectives.h" #include "primitives.h" +#if defined(ENABLE_NPKIT) +#include "npkit/npkit.h" +#endif template struct RunWork { __device__ __forceinline__ void runSend(const int tid, const int nthreads, const int group, struct ncclWorkElemP2p* args) { + +#if defined(ENABLE_NPKIT) + bool isNpKitThread = (tid == 0); + int npKitCtxIdx = blockIdx.x * NCCL_MAX_WORK_ELEMENTS_P2P; +#endif + +#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_TIME_SYNC_CPU) + if (isNpKitThread) { + uint64_t* cpuTimestamp = ncclShmem->comm.cpuTimestamp; + NpKit::CollectGpuEvent(NPKIT_EVENT_TIME_SYNC_CPU, 0, 0, *cpuTimestamp, + ncclShmem->comm.npKitEventCollectContexts + npKitCtxIdx); + } +#endif + +#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_TIME_SYNC_GPU) + if (isNpKitThread) { + NpKit::CollectGpuEvent(NPKIT_EVENT_TIME_SYNC_GPU, 0, 0, __builtin_amdgcn_s_memrealtime(), + ncclShmem->comm.npKitEventCollectContexts + npKitCtxIdx); + } +#endif + if (args->peer == ncclShmem->comm.rank) { struct ncclWorkElemP2p* recvArgs = args-1; if (args->buff != recvArgs->buff) { + +#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_SEND_RECV_LOCAL_COPY_ENTRY) + if (isNpKitThread) { + NpKit::CollectGpuEvent(NPKIT_EVENT_SEND_RECV_LOCAL_COPY_ENTRY, args->count*sizeof(T), 0, __builtin_amdgcn_s_memrealtime(), + ncclShmem->comm.npKitEventCollectContexts + npKitCtxIdx); + } +#endif + +#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_PRIM_SIMPLE_REDUCE_OR_COPY_MULTI_ENTRY) + if (isNpKitThread) { + NpKit::CollectGpuEvent(NPKIT_EVENT_PRIM_SIMPLE_REDUCE_OR_COPY_MULTI_ENTRY, args->count*sizeof(T), 0, __builtin_amdgcn_s_memrealtime(), + ncclShmem->comm.npKitEventCollectContexts + npKitCtxIdx); + } +#endif + ReduceOrCopyMulti(tid, nthreads, nullptr, false, 1, (const T**)&args->buff, 1, (T**)&recvArgs->buff, args->count); + +#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_PRIM_SIMPLE_REDUCE_OR_COPY_MULTI_EXIT) + if (isNpKitThread) { + NpKit::CollectGpuEvent(NPKIT_EVENT_PRIM_SIMPLE_REDUCE_OR_COPY_MULTI_EXIT, args->count*sizeof(T), 0, __builtin_amdgcn_s_memrealtime(), + ncclShmem->comm.npKitEventCollectContexts + npKitCtxIdx); + } +#endif + +#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_SEND_RECV_LOCAL_COPY_EXIT) + if (isNpKitThread) { + NpKit::CollectGpuEvent(NPKIT_EVENT_SEND_RECV_LOCAL_COPY_EXIT, args->count*sizeof(T), 0, __builtin_amdgcn_s_memrealtime(), + ncclShmem->comm.npKitEventCollectContexts + npKitCtxIdx); + } +#endif + } } else { using Proto = ProtoSimple<1, 1>; @@ -24,16 +78,59 @@ struct RunWork { int const peer = args->peer; Primitives, 0, Proto, 1> prims (tid, nthreads, nullptr, &peer, args->buff, nullptr, /*redOpArg(ignored)=*/0, group); + +#if defined(ENABLE_NPKIT) + if (isNpKitThread) { + prims.npKitCtxIdx = npKitCtxIdx; + } +#endif + +#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_SEND_RECV_SEND_ENTRY) + if (isNpKitThread) { + NpKit::CollectGpuEvent(NPKIT_EVENT_SEND_RECV_SEND_ENTRY, count*sizeof(T), 0, __builtin_amdgcn_s_memrealtime(), + ncclShmem->comm.npKitEventCollectContexts + npKitCtxIdx); + prims.npKitDataProcessTotalTime = 0; + } +#endif + ssize_t offset = 0; do { int nelem = min(chunkSize, count-offset); prims.directSend(offset, offset, nelem); offset += nelem; } while(offset < count); + +#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_SEND_RECV_SEND_EXIT) + if (isNpKitThread) { + NpKit::CollectGpuEvent(NPKIT_EVENT_SEND_RECV_SEND_EXIT, count*sizeof(T), prims.npKitDataProcessTotalTime, __builtin_amdgcn_s_memrealtime(), + ncclShmem->comm.npKitEventCollectContexts + npKitCtxIdx); + } +#endif + } } __device__ __forceinline__ void runRecv(const int tid, const int nthreads, const int group, struct ncclWorkElemP2p* args) { +#if defined(ENABLE_NPKIT) + bool isNpKitThread = (tid == 0); + int npKitCtxIdx = blockIdx.x * NCCL_MAX_WORK_ELEMENTS_P2P + 1; +#endif + +#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_TIME_SYNC_CPU) + if (isNpKitThread) { + uint64_t* cpuTimestamp = ncclShmem->comm.cpuTimestamp; + NpKit::CollectGpuEvent(NPKIT_EVENT_TIME_SYNC_CPU, 0, 0, *cpuTimestamp, + ncclShmem->comm.npKitEventCollectContexts + npKitCtxIdx); + } +#endif + +#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_TIME_SYNC_GPU) + if (isNpKitThread) { + NpKit::CollectGpuEvent(NPKIT_EVENT_TIME_SYNC_GPU, 0, 0, __builtin_amdgcn_s_memrealtime(), + ncclShmem->comm.npKitEventCollectContexts + npKitCtxIdx); + } +#endif + if (args->peer != ncclShmem->comm.rank) { using Proto = ProtoSimple<1, 1>; ssize_t const count = args->count; @@ -41,12 +138,35 @@ struct RunWork { int const peer = args->peer; Primitives, 0, Proto, 1> prims (tid, nthreads, &peer, nullptr, nullptr, args->buff, /*redOpArg(ignored)=*/0, group); + +#if defined(ENABLE_NPKIT) + if (isNpKitThread) { + prims.npKitCtxIdx = npKitCtxIdx; + } +#endif + +#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_SEND_RECV_RECV_ENTRY) + if (isNpKitThread) { + NpKit::CollectGpuEvent(NPKIT_EVENT_SEND_RECV_RECV_ENTRY, count*sizeof(T), 0, __builtin_amdgcn_s_memrealtime(), + ncclShmem->comm.npKitEventCollectContexts + npKitCtxIdx); + prims.npKitDataProcessTotalTime = 0; + } +#endif + ssize_t offset = 0; do { int nelem = min(chunkSize, count-offset); prims.directRecv(offset, nelem); offset += nelem; } while(offset < count); + +#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_SEND_RECV_RECV_EXIT) + if (isNpKitThread) { + NpKit::CollectGpuEvent(NPKIT_EVENT_SEND_RECV_RECV_EXIT, count*sizeof(T), prims.npKitDataProcessTotalTime, __builtin_amdgcn_s_memrealtime(), + ncclShmem->comm.npKitEventCollectContexts + npKitCtxIdx); + } +#endif + } } diff --git a/src/include/devcomm.h b/src/include/devcomm.h index 0c7fb3dfbe..855923cfb1 100644 --- a/src/include/devcomm.h +++ b/src/include/devcomm.h @@ -11,6 +11,9 @@ #include "nccl.h" #include "rccl_bfloat16.h" #include "align.h" +#if defined(ENABLE_NPKIT) +#include "npkit/npkit_struct.h" +#endif #include // [RCCL] Support for clique-based kernels //#include "clique/CliqueCommon.h" @@ -419,6 +422,11 @@ struct ncclDevComm { // Channels, device side struct ncclChannel* channels; +#if defined(ENABLE_NPKIT) + NpKitEventCollectContext* npKitEventCollectContexts; + uint64_t* cpuTimestamp; +#endif + #ifdef ENABLE_PROFILING // Profiling counters struct ncclProf devProf; diff --git a/src/include/npkit/npkit.h b/src/include/npkit/npkit.h new file mode 100644 index 0000000000..e12cf88ec1 --- /dev/null +++ b/src/include/npkit/npkit.h @@ -0,0 +1,65 @@ +#ifndef NPKIT_H_ +#define NPKIT_H_ + +#include +#include + +#include + +#include "npkit/npkit_event.h" +#include "npkit/npkit_struct.h" + +class NpKit { + public: + static const uint64_t kNumGpuEventBuffers = 512; + + static const uint64_t kNumCpuEventBuffers = 32; + + static ncclResult_t Init(int rank); + + static ncclResult_t Dump(const std::string& dump_dir); + + static ncclResult_t Shutdown(); + + static NpKitEventCollectContext* GetGpuEventCollectContexts(); + + static inline __device__ void CollectGpuEvent(uint8_t type, uint32_t size, uint32_t rsvd, uint64_t timestamp, + NpKitEventCollectContext* ctx) { + uint64_t event_buffer_head = ctx->event_buffer_head; + if (event_buffer_head < kMaxNumGpuEventsPerBuffer) { + NpKitEvent& event = ctx->event_buffer[event_buffer_head]; + event.fields.type = type; + event.fields.size = size; + event.fields.rsvd = rsvd; + event.fields.timestamp = timestamp; + ctx->event_buffer_head++; + } + } + + static void CollectCpuEvent(uint8_t type, uint32_t size, uint32_t rsvd, uint64_t timestamp, int channel_id); + + static uint64_t* GetCpuTimestamp(); + + private: + static void CpuTimestampUpdateThread(); + + // 64K * 512 * 16B = 512MB per GPU + static const uint64_t kMaxNumGpuEventsPerBuffer = 1ULL << 16; + + // 64K * 2 (send/recv) * (512/32) = 2M, 2M * 32 * 16B = 1GB per CPU + static const uint64_t kMaxNumCpuEventsPerBuffer = 1ULL << 21; + + static NpKitEvent** gpu_event_buffers_; + static NpKitEvent** cpu_event_buffers_; + + static NpKitEventCollectContext* gpu_collect_contexts_; + static NpKitEventCollectContext* cpu_collect_contexts_; + static uint64_t* cpu_timestamp_; + + static uint64_t rank_; + + static std::thread* cpu_timestamp_update_thread_; + static volatile bool cpu_timestamp_update_thread_should_stop_; +}; + +#endif diff --git a/src/include/npkit/npkit_event.h b/src/include/npkit/npkit_event.h new file mode 100644 index 0000000000..b328fc9e55 --- /dev/null +++ b/src/include/npkit/npkit_event.h @@ -0,0 +1,98 @@ +#ifndef NPKIT_EVENT_H_ +#define NPKIT_EVENT_H_ + +#define NPKIT_EVENT_INVALID 0x0 + +#define NPKIT_EVENT_ALL_REDUCE_RING_ENTRY 0x1 +#define NPKIT_EVENT_ALL_REDUCE_RING_EXIT 0x2 +#define NPKIT_EVENT_ALL_REDUCE_TREE_UPDOWN_ENTRY 0x3 +#define NPKIT_EVENT_ALL_REDUCE_TREE_UPDOWN_EXIT 0x4 +#define NPKIT_EVENT_ALL_REDUCE_TREE_SPLIT_ENTRY 0x5 +#define NPKIT_EVENT_ALL_REDUCE_TREE_SPLIT_EXIT 0x6 + +#define NPKIT_EVENT_COPY_SEND_ENTRY 0x7 +#define NPKIT_EVENT_COPY_SEND_EXIT 0x8 +#define NPKIT_EVENT_DIRECT_COPY_SEND_ENTRY 0x9 +#define NPKIT_EVENT_DIRECT_COPY_SEND_EXIT 0xA +#define NPKIT_EVENT_DIRECT_RECV_ENTRY 0xB +#define NPKIT_EVENT_DIRECT_RECV_EXIT 0xC +#define NPKIT_EVENT_DIRECT_RECV_COPY_SEND_ENTRY 0xD +#define NPKIT_EVENT_DIRECT_RECV_COPY_SEND_EXIT 0xE +#define NPKIT_EVENT_DIRECT_RECV_REDUCE_COPY_SEND_ENTRY 0xF +#define NPKIT_EVENT_DIRECT_RECV_REDUCE_COPY_SEND_EXIT 0x10 +#define NPKIT_EVENT_DIRECT_SEND_ENTRY 0x11 +#define NPKIT_EVENT_DIRECT_SEND_EXIT 0x12 +#define NPKIT_EVENT_DIRECT_SEND_FROM_OUTPUT_ENTRY 0x13 +#define NPKIT_EVENT_DIRECT_SEND_FROM_OUTPUT_EXIT 0x14 +#define NPKIT_EVENT_RECV_ENTRY 0x15 +#define NPKIT_EVENT_RECV_EXIT 0x16 +#define NPKIT_EVENT_RECV_COPY_SEND_ENTRY 0x17 +#define NPKIT_EVENT_RECV_COPY_SEND_EXIT 0x18 +#define NPKIT_EVENT_RECV_REDUCE_COPY_ENTRY 0x19 +#define NPKIT_EVENT_RECV_REDUCE_COPY_EXIT 0x1A +#define NPKIT_EVENT_RECV_REDUCE_COPY_SEND_ENTRY 0x1B +#define NPKIT_EVENT_RECV_REDUCE_COPY_SEND_EXIT 0x1C +#define NPKIT_EVENT_RECV_REDUCE_SEND_ENTRY 0x1D +#define NPKIT_EVENT_RECV_REDUCE_SEND_EXIT 0x1E +#define NPKIT_EVENT_SEND_ENTRY 0x1F +#define NPKIT_EVENT_SEND_EXIT 0x20 +#define NPKIT_EVENT_SEND_FROM_OUTPUT_ENTRY 0x21 +#define NPKIT_EVENT_SEND_FROM_OUTPUT_EXIT 0x22 + +#define NPKIT_EVENT_PRIM_SIMPLE_WAIT_PEER_ENTRY 0x23 +#define NPKIT_EVENT_PRIM_SIMPLE_WAIT_PEER_EXIT 0x24 +#define NPKIT_EVENT_PRIM_SIMPLE_REDUCE_OR_COPY_MULTI_ENTRY 0x25 +#define NPKIT_EVENT_PRIM_SIMPLE_REDUCE_OR_COPY_MULTI_EXIT 0x26 + +#define NPKIT_EVENT_PRIM_LL_WAIT_SEND_ENTRY 0x27 +#define NPKIT_EVENT_PRIM_LL_WAIT_SEND_EXIT 0x28 +#define NPKIT_EVENT_PRIM_LL_DATA_PROCESS_ENTRY 0x29 +#define NPKIT_EVENT_PRIM_LL_DATA_PROCESS_EXIT 0x2A + +#define NPKIT_EVENT_PRIM_LL128_WAIT_SEND_ENTRY 0x2B +#define NPKIT_EVENT_PRIM_LL128_WAIT_SEND_EXIT 0x2C +#define NPKIT_EVENT_PRIM_LL128_DATA_PROCESS_ENTRY 0x2D +#define NPKIT_EVENT_PRIM_LL128_DATA_PROCESS_EXIT 0x2E + +#define NPKIT_EVENT_NET_SEND_ENTRY 0x2F +#define NPKIT_EVENT_NET_SEND_EXIT 0x30 + +#define NPKIT_EVENT_NET_RECV_ENTRY 0x31 +#define NPKIT_EVENT_NET_RECV_EXIT 0x32 + +#define NPKIT_EVENT_TIME_SYNC_GPU 0x33 +#define NPKIT_EVENT_TIME_SYNC_CPU 0x34 + +#define NPKIT_EVENT_ALL_REDUCE_RING_SEND_ENTRY 0x35 +#define NPKIT_EVENT_ALL_REDUCE_RING_SEND_EXIT 0x36 +#define NPKIT_EVENT_ALL_REDUCE_RING_RECV_REDUCE_SEND_ENTRY 0x37 +#define NPKIT_EVENT_ALL_REDUCE_RING_RECV_REDUCE_SEND_EXIT 0x38 +#define NPKIT_EVENT_ALL_REDUCE_RING_DIRECT_RECV_REDUCE_COPY_SEND_ENTRY 0x39 +#define NPKIT_EVENT_ALL_REDUCE_RING_DIRECT_RECV_REDUCE_COPY_SEND_EXIT 0x3A +#define NPKIT_EVENT_ALL_REDUCE_RING_DIRECT_RECV_COPY_SEND_ENTRY 0x3B +#define NPKIT_EVENT_ALL_REDUCE_RING_DIRECT_RECV_COPY_SEND_EXIT 0x3C +#define NPKIT_EVENT_ALL_REDUCE_RING_DIRECT_RECV_ENTRY 0x3D +#define NPKIT_EVENT_ALL_REDUCE_RING_DIRECT_RECV_EXIT 0x3E + +#define NPKIT_EVENT_ALL_REDUCE_TREE_UPDOWN_REDUCE_ENTRY 0x3F +#define NPKIT_EVENT_ALL_REDUCE_TREE_UPDOWN_REDUCE_EXIT 0x40 +#define NPKIT_EVENT_ALL_REDUCE_TREE_UPDOWN_BROADCAST_ENTRY 0x41 +#define NPKIT_EVENT_ALL_REDUCE_TREE_UPDOWN_BROADCAST_EXIT 0x42 + +#define NPKIT_EVENT_ALL_REDUCE_TREE_SPLIT_REDUCE_BROADCAST_ENTRY 0x43 +#define NPKIT_EVENT_ALL_REDUCE_TREE_SPLIT_REDUCE_BROADCAST_EXIT 0x44 +#define NPKIT_EVENT_ALL_REDUCE_TREE_SPLIT_REDUCE_ENTRY 0x45 +#define NPKIT_EVENT_ALL_REDUCE_TREE_SPLIT_REDUCE_EXIT 0x46 +#define NPKIT_EVENT_ALL_REDUCE_TREE_SPLIT_BROADCAST_ENTRY 0x47 +#define NPKIT_EVENT_ALL_REDUCE_TREE_SPLIT_BROADCAST_EXIT 0x48 + +#define NPKIT_EVENT_SEND_RECV_LOCAL_COPY_ENTRY 0x49 +#define NPKIT_EVENT_SEND_RECV_LOCAL_COPY_EXIT 0x4A +#define NPKIT_EVENT_SEND_RECV_SEND_ENTRY 0x4B +#define NPKIT_EVENT_SEND_RECV_SEND_EXIT 0x4C +#define NPKIT_EVENT_SEND_RECV_RECV_ENTRY 0x4D +#define NPKIT_EVENT_SEND_RECV_RECV_EXIT 0x4E + +#define NPKIT_PRIM_COLLECT_DATA_PROCESS_TIME 0x4F + +#endif diff --git a/src/include/npkit/npkit_struct.h b/src/include/npkit/npkit_struct.h new file mode 100644 index 0000000000..89dadcb4e4 --- /dev/null +++ b/src/include/npkit/npkit_struct.h @@ -0,0 +1,25 @@ +#ifndef NPKIT_STRUCT_H_ +#define NPKIT_STRUCT_H_ + +#include + +#pragma pack(push, 1) + +union NpKitEvent { + uint64_t bits[2]; + struct { + uint64_t type : 8; + uint64_t size : 32; + uint64_t rsvd : 24; + uint64_t timestamp; + } fields; +}; + +struct NpKitEventCollectContext { + NpKitEvent* event_buffer; + uint64_t event_buffer_head; +}; + +#pragma pack(pop) + +#endif diff --git a/src/include/proxy.h b/src/include/proxy.h index 1cf88d7b1c..f3c59ec9ba 100644 --- a/src/include/proxy.h +++ b/src/include/proxy.h @@ -58,6 +58,10 @@ struct ncclProxySubArgs { uint64_t end; void* requests[NCCL_STEPS]; void* profilingEvents[NCCL_STEPS]; + +#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_NET_SEND_ENTRY) && defined(ENABLE_NPKIT_EVENT_NET_SEND_EXIT) + int npKitSizesFifo[NCCL_STEPS]; +#endif }; struct ncclProxyArgs { diff --git a/src/init.cc b/src/init.cc index 9e2fd2dcf8..bd20ec1519 100644 --- a/src/init.cc +++ b/src/init.cc @@ -17,6 +17,9 @@ #include "enqueue.h" #include "graph.h" #include "argcheck.h" +#if defined(ENABLE_NPKIT) +#include "npkit/npkit.h" +#endif #include #include #include @@ -501,6 +504,13 @@ static ncclResult_t devCommSetup(ncclComm_t comm) { NCCLCHECK(ncclCudaMemcpy(comm->channels[r].ring.devUserRanks, comm->channels[r].ring.userRanks, comm->nRanks)); } +#if defined(ENABLE_NPKIT) + // Init NPKit + NCCLCHECK(NpKit::Init(comm->rank)); + comm->hostDevComm.npKitEventCollectContexts = NpKit::GetGpuEventCollectContexts(); + comm->hostDevComm.cpuTimestamp = NpKit::GetCpuTimestamp(); +#endif + // Duplicate the dev comm on the device NCCLCHECK(ncclCudaMemcpy(comm->devComm, &comm->hostDevComm, 1)); return ncclSuccess; @@ -1399,6 +1409,17 @@ static ncclResult_t commDestroy(ncclComm_t comm) { if (savedDevice != commDevice) CUDACHECK(hipSetDevice(savedDevice)); +#if defined(ENABLE_NPKIT) + // Dump NPKit events and shutdown + const char* npkitDumpDir = getenv("NPKIT_DUMP_DIR"); + if (npkitDumpDir == nullptr) { + WARN("NPKIT_DUMP_DIR is empty"); + } else { + NCCLCHECK(NpKit::Dump(npkitDumpDir)); + } + NCCLCHECK(NpKit::Shutdown()); +#endif + return ncclSuccess; } diff --git a/src/misc/npkit.cc b/src/misc/npkit.cc new file mode 100644 index 0000000000..89122fd495 --- /dev/null +++ b/src/misc/npkit.cc @@ -0,0 +1,169 @@ +#include +#include +#include + +#include "alloc.h" +#include "npkit/npkit.h" + +uint64_t NpKit::rank_ = 0; + +NpKitEvent** NpKit::gpu_event_buffers_ = nullptr; +NpKitEvent** NpKit::cpu_event_buffers_ = nullptr; + +NpKitEventCollectContext* NpKit::gpu_collect_contexts_ = nullptr; +NpKitEventCollectContext* NpKit::cpu_collect_contexts_ = nullptr; +uint64_t* NpKit::cpu_timestamp_ = nullptr; + +std::thread* NpKit::cpu_timestamp_update_thread_ = nullptr; +volatile bool NpKit::cpu_timestamp_update_thread_should_stop_ = false; + +void NpKit::CpuTimestampUpdateThread() { + uint64_t init_system_clock = std::chrono::system_clock::now().time_since_epoch().count(); + uint64_t init_steady_clock = std::chrono::steady_clock::now().time_since_epoch().count(); + uint64_t curr_steady_clock = 0; + volatile uint64_t* volatile_cpu_timestamp_ = cpu_timestamp_; + while (!cpu_timestamp_update_thread_should_stop_) { + curr_steady_clock = std::chrono::steady_clock::now().time_since_epoch().count(); + *volatile_cpu_timestamp_ = init_system_clock + (curr_steady_clock - init_steady_clock); + } +} + +ncclResult_t NpKit::Init(int rank) { + uint64_t i = 0; + NpKitEventCollectContext ctx; + ctx.event_buffer_head = 0; + rank_ = rank; + + // Init event data structures + NCCLCHECK(ncclCalloc(&gpu_event_buffers_, kNumGpuEventBuffers)); + NCCLCHECK(ncclCudaCalloc(&gpu_collect_contexts_, kNumGpuEventBuffers)); + for (i = 0; i < kNumGpuEventBuffers; i++) { + NCCLCHECK(ncclCudaCalloc(gpu_event_buffers_ + i, kMaxNumGpuEventsPerBuffer)); + ctx.event_buffer = gpu_event_buffers_[i]; + NCCLCHECK(ncclCudaMemcpy(gpu_collect_contexts_ + i, &ctx, 1)); + } + + NCCLCHECK(ncclCalloc(&cpu_event_buffers_, kNumCpuEventBuffers)); + NCCLCHECK(ncclCalloc(&cpu_collect_contexts_, kNumCpuEventBuffers)); + for (i = 0; i < kNumCpuEventBuffers; i++) { + NCCLCHECK(ncclCalloc(cpu_event_buffers_ + i, kMaxNumCpuEventsPerBuffer)); + ctx.event_buffer = cpu_event_buffers_[i]; + cpu_collect_contexts_[i] = ctx; + } + + // Init timestamp + NCCLCHECK(ncclCudaHostCalloc(&cpu_timestamp_, 1)); + cpu_timestamp_update_thread_should_stop_ = false; + cpu_timestamp_update_thread_ = new std::thread(CpuTimestampUpdateThread); + + return ncclSuccess; +} + +ncclResult_t NpKit::Dump(const std::string& dump_dir) { + uint64_t i = 0; + std::string dump_file_path; + + // Dump CPU events + for (i = 0; i < kNumCpuEventBuffers; i++) { + dump_file_path = dump_dir; + dump_file_path += "/cpu_events_rank_"; + dump_file_path += std::to_string(rank_); + dump_file_path += "_channel_"; + dump_file_path += std::to_string(i); + auto cpu_trace_file = std::fstream(dump_file_path, std::ios::out | std::ios::binary); + cpu_trace_file.write(reinterpret_cast(cpu_event_buffers_[i]), + cpu_collect_contexts_[i].event_buffer_head * sizeof(NpKitEvent)); + cpu_trace_file.close(); + } + + // Dump CPU clock info + dump_file_path = dump_dir; + dump_file_path += "/cpu_clock_period_num_rank_"; + dump_file_path += std::to_string(rank_); + std::string clock_period_num_str = std::to_string(std::chrono::steady_clock::duration::period::num); + auto clock_period_num_file = std::fstream(dump_file_path, std::ios::out); + clock_period_num_file.write(clock_period_num_str.c_str(), clock_period_num_str.length()); + clock_period_num_file.close(); + + dump_file_path = dump_dir; + dump_file_path += "/cpu_clock_period_den_rank_"; + dump_file_path += std::to_string(rank_); + std::string clock_period_den_str = std::to_string(std::chrono::steady_clock::duration::period::den); + auto clock_period_den_file = std::fstream(dump_file_path, std::ios::out); + clock_period_den_file.write(clock_period_den_str.c_str(), clock_period_den_str.length()); + clock_period_den_file.close(); + + // Dump GPU events, reuse CPU struct + for (i = 0; i < kNumGpuEventBuffers; i++) { + dump_file_path = dump_dir; + dump_file_path += "/gpu_events_rank_"; + dump_file_path += std::to_string(rank_); + dump_file_path += "_buf_"; + dump_file_path += std::to_string(i); + NCCLCHECK(ncclCudaMemcpy(cpu_event_buffers_[0], gpu_event_buffers_[i], kMaxNumGpuEventsPerBuffer)); + NCCLCHECK(ncclCudaMemcpy(cpu_collect_contexts_, gpu_collect_contexts_ + i, 1)); + auto gpu_trace_file = std::fstream(dump_file_path, std::ios::out | std::ios::binary); + gpu_trace_file.write(reinterpret_cast(cpu_event_buffers_[0]), + cpu_collect_contexts_[0].event_buffer_head * sizeof(NpKitEvent)); + gpu_trace_file.close(); + } + + // Dump GPU clockRate + dump_file_path = dump_dir; + dump_file_path += "/gpu_clock_rate_rank_"; + dump_file_path += std::to_string(rank_); + constexpr int vega_gpu_rtc_freq_in_khz = 25000; + std::string clock_rate_str = std::to_string(vega_gpu_rtc_freq_in_khz); + auto gpu_clock_rate_file = std::fstream(dump_file_path, std::ios::out); + gpu_clock_rate_file.write(clock_rate_str.c_str(), clock_rate_str.length()); + gpu_clock_rate_file.close(); + + return ncclSuccess; +} + +ncclResult_t NpKit::Shutdown() { + uint64_t i = 0; + + // Stop CPU timestamp updating thread + cpu_timestamp_update_thread_should_stop_ = true; + cpu_timestamp_update_thread_->join(); + + // Free CPU event data structures + for (i = 0; i < kNumCpuEventBuffers; i++) { + free(cpu_event_buffers_[i]); + } + free(cpu_event_buffers_); + free(cpu_collect_contexts_); + + // Free GPU event data structures + for (i = 0; i < kNumGpuEventBuffers; i++) { + CUDACHECK(hipFree(gpu_event_buffers_[i])); + } + free(gpu_event_buffers_); + CUDACHECK(hipFree(gpu_collect_contexts_)); + + // Free timestamp + NCCLCHECK(ncclCudaHostFree(cpu_timestamp_)); + + return ncclSuccess; +} + +NpKitEventCollectContext* NpKit::GetGpuEventCollectContexts() { + return gpu_collect_contexts_; +} + +void NpKit::CollectCpuEvent(uint8_t type, uint32_t size, uint32_t rsvd, uint64_t timestamp, int channel_id) { + uint64_t event_buffer_head = cpu_collect_contexts_[channel_id].event_buffer_head; + if (event_buffer_head < kMaxNumCpuEventsPerBuffer) { + NpKitEvent& event = cpu_collect_contexts_[channel_id].event_buffer[event_buffer_head]; + event.fields.type = type; + event.fields.size = size; + event.fields.rsvd = rsvd; + event.fields.timestamp = timestamp; + cpu_collect_contexts_[channel_id].event_buffer_head++; + } +} + +uint64_t* NpKit::GetCpuTimestamp() { + return cpu_timestamp_; +} diff --git a/src/transport/net.cc b/src/transport/net.cc index b7b8b753b4..2867e35452 100644 --- a/src/transport/net.cc +++ b/src/transport/net.cc @@ -14,6 +14,9 @@ #include "gdrwrap.h" #include "shm.h" #include "profiler.h" +#if defined(ENABLE_NPKIT) +#include "npkit/npkit.h" +#endif static_assert(sizeof(ncclNetHandle_t) <= CONNECT_SIZE, "NET Connect info is too large"); @@ -777,7 +780,16 @@ static ncclResult_t recvProxyFree(struct ncclProxyConnection* connection, struct static_assert(NCCL_STEPS <= NCCL_NET_MAX_REQUESTS, "Not enough net requests to cover for steps"); +#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_NET_COLLECT_POLL_CNT) +static int g_npkit_net_poll_cnt = 0; +#endif + static ncclResult_t sendProxyProgress(struct ncclComm* comm, struct ncclProxyArgs* args) { + +#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_NET_COLLECT_POLL_CNT) + g_npkit_net_poll_cnt++; +#endif + if (args->state == ncclProxyOpReady) { for (int s=0; snsubs; s++) { struct ncclProxySubArgs* sub = args->subs+s; @@ -831,6 +843,11 @@ static ncclResult_t sendProxyProgress(struct ncclComm* comm, struct ncclProxyArg if (sizesFifo[buffSlot] != -1 && ((*recvTail > (sub->base+sub->transmitted)) || p == NCCL_PROTO_LL)) { // We have something to receive, let's check if it's completely ready. int size = sizesFifo[buffSlot]; + +#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_NET_SEND_ENTRY) && defined(ENABLE_NPKIT_EVENT_NET_SEND_EXIT) + sub->npKitSizesFifo[buffSlot] = size; +#endif + char* buff = resources->shared ? localBuff+resources->recvMem->offsFifo[buffSlot] : localBuff+buffSlot*stepSize; int ready = 1; if (p == NCCL_PROTO_LL128) { @@ -865,6 +882,22 @@ static ncclResult_t sendProxyProgress(struct ncclComm* comm, struct ncclProxyArg // Data is ready, try to send. NCCLCHECK(ncclNetIsend(resources->netSendComm, buff, size, resources->rank, mhandle, sub->requests+buffSlot)); if (sub->requests[buffSlot] != NULL) { + +#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_NET_SEND_ENTRY) && defined(ENABLE_NPKIT_EVENT_NET_SEND_EXIT) + NpKit::CollectCpuEvent( + NPKIT_EVENT_NET_SEND_ENTRY, +#if defined(ENABLE_NPKIT_NET_COLLECT_POLL_CNT) + g_npkit_net_poll_cnt, +#else + size, +#endif + uint64_t(sub->requests+buffSlot)/sizeof(void*), + *(volatile uint64_t*)NpKit::GetCpuTimestamp(), sub->channelId); +#if defined(ENABLE_NPKIT_NET_COLLECT_POLL_CNT) + g_npkit_net_poll_cnt = 0; +#endif +#endif + TRACE(NCCL_NET, "sendProxy [%ld/%d] Isend posted, req %p", sub->transmitted, buffSlot, sub->requests[buffSlot]); sizesFifo[buffSlot] = -1; // Make sure size is reset to zero before we update the head. @@ -883,6 +916,22 @@ static ncclResult_t sendProxyProgress(struct ncclComm* comm, struct ncclProxyArg int buffSlot = (sub->base+sub->done)%NCCL_STEPS; NCCLCHECK(ncclNetTest(sub->requests[buffSlot], &done, NULL)); if (done) { + +#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_NET_SEND_ENTRY) && defined(ENABLE_NPKIT_EVENT_NET_SEND_EXIT) + NpKit::CollectCpuEvent( + NPKIT_EVENT_NET_SEND_EXIT, +#if defined(ENABLE_NPKIT_NET_COLLECT_POLL_CNT) + g_npkit_net_poll_cnt, +#else + sub->npKitSizesFifo[buffSlot], +#endif + uint64_t(sub->requests+buffSlot)/sizeof(void*), + *(volatile uint64_t*)NpKit::GetCpuTimestamp(), sub->channelId); +#if defined(ENABLE_NPKIT_NET_COLLECT_POLL_CNT) + g_npkit_net_poll_cnt = 0; +#endif +#endif + TRACE(NCCL_NET, "sendProxy [%ld/%d] request %p done", sub->done, buffSlot, sub->requests[buffSlot]); sub->done += args->sliceSteps; for (uint64_t step=sub->done-args->sliceSteps; stepdone; step++) ncclProfilingRecord(args, s, step, ncclProxyProfileEnd); @@ -908,6 +957,11 @@ static ncclResult_t sendProxyProgress(struct ncclComm* comm, struct ncclProxyArg } static ncclResult_t recvProxyProgress(struct ncclComm* comm, struct ncclProxyArgs* args) { + +#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_NET_COLLECT_POLL_CNT) + g_npkit_net_poll_cnt++; +#endif + if (args->state == ncclProxyOpReady) { // Initialize subs and group them by same recvComm. void* recvComm; @@ -989,6 +1043,22 @@ static ncclResult_t recvProxyProgress(struct ncclComm* comm, struct ncclProxyArg if (*requestPtr) { for (int i=0; igroupSize; i++) { struct ncclProxySubArgs* sub = subGroup+i; + +#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_NET_RECV_ENTRY) && defined(ENABLE_NPKIT_EVENT_NET_RECV_EXIT) + NpKit::CollectCpuEvent( + NPKIT_EVENT_NET_RECV_ENTRY, +#if defined(ENABLE_NPKIT_NET_COLLECT_POLL_CNT) + g_npkit_net_poll_cnt, +#else + sizes[i], +#endif + uint64_t(sub->requests+(step%NCCL_STEPS))/sizeof(void*), + *(volatile uint64_t*)NpKit::GetCpuTimestamp(), sub->channelId); +#if defined(ENABLE_NPKIT_NET_COLLECT_POLL_CNT) + g_npkit_net_poll_cnt = 0; +#endif +#endif + sub->posted += args->sliceSteps; for (uint64_t step=sub->posted-args->sliceSteps; stepposted; step++) ncclProfilingRecord(args, s+i, step, ncclProxyProfileRecvWait); } @@ -1014,6 +1084,22 @@ static ncclResult_t recvProxyProgress(struct ncclComm* comm, struct ncclProxyArg for (int i=0; igroupSize; i++) { struct ncclProxySubArgs* sub = subGroup + i; + +#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_NET_RECV_ENTRY) && defined(ENABLE_NPKIT_EVENT_NET_RECV_EXIT) + NpKit::CollectCpuEvent( + NPKIT_EVENT_NET_RECV_EXIT, +#if defined(ENABLE_NPKIT_NET_COLLECT_POLL_CNT) + g_npkit_net_poll_cnt, +#else + sizes[i], +#endif + uint64_t(sub->requests+(step%NCCL_STEPS))/sizeof(void*), + *(volatile uint64_t*)NpKit::GetCpuTimestamp(), sub->channelId); +#if defined(ENABLE_NPKIT_NET_COLLECT_POLL_CNT) + g_npkit_net_poll_cnt = 0; +#endif +#endif + sub->received += args->sliceSteps; for (uint64_t step=sub->received-args->sliceSteps; stepreceived; step++) ncclProfilingRecord(args, s+i, step, ncclProxyProfileRecvFlushWait); if (step < sub->nsteps) {