From 7d6bbc19d4e0d3f1f4ec864dbd9c11f498524e24 Mon Sep 17 00:00:00 2001 From: Ziyue Yang Date: Fri, 14 Oct 2022 01:27:05 +0000 Subject: [PATCH] apply npkit --- src/collectives/device/all_gather.h | 25 +++++++++ src/collectives/device/broadcast.h | 25 +++++++++ src/include/npkit/npkit.h | 5 ++ src/include/npkit/npkit_event.h | 5 ++ src/include/npkit/npkit_struct.h | 5 ++ src/misc/npkit.cc | 7 +++ src/transport/net.cc | 86 +++++++++++++++++++++++++++++ 7 files changed, 158 insertions(+) diff --git a/src/collectives/device/all_gather.h b/src/collectives/device/all_gather.h index 81a091cb23..9307cddee5 100644 --- a/src/collectives/device/all_gather.h +++ b/src/collectives/device/all_gather.h @@ -25,11 +25,36 @@ namespace { const ssize_t loopSize = nChannels*int(chunkSize); const ssize_t size = args->count; +#if defined(ENABLE_NPKIT) + int npKitCtxIdx = bid; +#endif + +#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_TIME_SYNC_CPU) + if (tid == 0) { + uint64_t* cpuTimestamp = ncclShmem.comm.cpuTimestamp; + NpKit::CollectGpuEvent(NPKIT_EVENT_TIME_SYNC_CPU, 0, 0, *cpuTimestamp, + ncclShmem.comm.npKitEventCollectContexts + npKitCtxIdx); + } +#endif + +#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_TIME_SYNC_GPU) + if (tid == 0) { + NpKit::CollectGpuEvent(NPKIT_EVENT_TIME_SYNC_GPU, 0, 0, __builtin_amdgcn_s_memrealtime(), + ncclShmem.comm.npKitEventCollectContexts + npKitCtxIdx); + } +#endif + T *inputBuf = (T*)args->sendbuff; T *outputBuf = (T*)args->recvbuff; Primitives, 0, Proto, 0> prims (tid, nthreads, &ring->prev, &ring->next, inputBuf, outputBuf, args->redOpArg, args->connIndex << 16); +#if defined(ENABLE_NPKIT) + if (tid == 0) { + prims.npKitCtxIdx = npKitCtxIdx; + } +#endif + for (ssize_t gridOffset = 0; gridOffset < size; gridOffset += loopSize) { ssize_t realChunkSize; if (Proto::Id == NCCL_PROTO_SIMPLE) { diff --git a/src/collectives/device/broadcast.h b/src/collectives/device/broadcast.h index 85c1999ded..498143b6d9 100644 --- a/src/collectives/device/broadcast.h +++ b/src/collectives/device/broadcast.h @@ -24,11 +24,36 @@ namespace { const int nextRank = ring->userRanks[1]; const int root = args->root; +#if defined(ENABLE_NPKIT) + int npKitCtxIdx = bid; +#endif + +#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_TIME_SYNC_CPU) + if (tid == 0) { + uint64_t* cpuTimestamp = ncclShmem.comm.cpuTimestamp; + NpKit::CollectGpuEvent(NPKIT_EVENT_TIME_SYNC_CPU, 0, 0, *cpuTimestamp, + ncclShmem.comm.npKitEventCollectContexts + npKitCtxIdx); + } +#endif + +#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_TIME_SYNC_GPU) + if (tid == 0) { + NpKit::CollectGpuEvent(NPKIT_EVENT_TIME_SYNC_GPU, 0, 0, __builtin_amdgcn_s_memrealtime(), + ncclShmem.comm.npKitEventCollectContexts + npKitCtxIdx); + } +#endif + T *inputBuf = (T*)args->sendbuff; T *outputBuf = (T*)args->recvbuff; Primitives, 0, Proto, 0> prims(tid, nthreads, &ring->prev, &ring->next, inputBuf, outputBuf, args->redOpArg, args->connIndex << 16); +#if defined(ENABLE_NPKIT) + if (tid == 0) { + prims.npKitCtxIdx = npKitCtxIdx; + } +#endif + for (ssize_t gridOffset = 0; gridOffset < size; gridOffset += loopSize) { ssize_t realChunkSize; if (Proto::Id == NCCL_PROTO_SIMPLE) { diff --git a/src/include/npkit/npkit.h b/src/include/npkit/npkit.h index e12cf88ec1..8fea4df689 100644 --- a/src/include/npkit/npkit.h +++ b/src/include/npkit/npkit.h @@ -1,3 +1,8 @@ +/************************************************************************* + * Copyright (c) Microsoft Corporation. + * Licensed under the MIT License. + ************************************************************************/ + #ifndef NPKIT_H_ #define NPKIT_H_ diff --git a/src/include/npkit/npkit_event.h b/src/include/npkit/npkit_event.h index b328fc9e55..fd1f940a88 100644 --- a/src/include/npkit/npkit_event.h +++ b/src/include/npkit/npkit_event.h @@ -1,3 +1,8 @@ +/************************************************************************* + * Copyright (c) Microsoft Corporation. + * Licensed under the MIT License. + ************************************************************************/ + #ifndef NPKIT_EVENT_H_ #define NPKIT_EVENT_H_ diff --git a/src/include/npkit/npkit_struct.h b/src/include/npkit/npkit_struct.h index 89dadcb4e4..a31a0900fd 100644 --- a/src/include/npkit/npkit_struct.h +++ b/src/include/npkit/npkit_struct.h @@ -1,3 +1,8 @@ +/************************************************************************* + * Copyright (c) Microsoft Corporation. + * Licensed under the MIT License. + ************************************************************************/ + #ifndef NPKIT_STRUCT_H_ #define NPKIT_STRUCT_H_ diff --git a/src/misc/npkit.cc b/src/misc/npkit.cc index 89122fd495..5f1d7a2dd2 100644 --- a/src/misc/npkit.cc +++ b/src/misc/npkit.cc @@ -1,3 +1,8 @@ +/************************************************************************* + * Copyright (c) Microsoft Corporation. + * Licensed under the MIT License. + ************************************************************************/ + #include #include #include @@ -53,6 +58,8 @@ ncclResult_t NpKit::Init(int rank) { // Init timestamp NCCLCHECK(ncclCudaHostCalloc(&cpu_timestamp_, 1)); + volatile uint64_t* volatile_cpu_timestamp = cpu_timestamp_; + *volatile_cpu_timestamp = std::chrono::system_clock::now().time_since_epoch().count(); cpu_timestamp_update_thread_should_stop_ = false; cpu_timestamp_update_thread_ = new std::thread(CpuTimestampUpdateThread); diff --git a/src/transport/net.cc b/src/transport/net.cc index f818f2d4be..86af4a1fed 100644 --- a/src/transport/net.cc +++ b/src/transport/net.cc @@ -15,6 +15,9 @@ #include "profiler.h" #include "graph.h" #include "graph/topo.h" +#if defined(ENABLE_NPKIT) +#include "npkit/npkit.h" +#endif static_assert(sizeof(ncclNetHandle_t) <= CONNECT_SIZE, "NET Connect info is too large"); @@ -831,7 +834,16 @@ static ncclResult_t recvProxyFree(struct ncclProxyConnection* connection, struct static_assert(NCCL_STEPS <= NCCL_NET_MAX_REQUESTS, "Not enough net requests to cover for steps"); +#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_NET_COLLECT_POLL_CNT) +static int g_npkit_net_poll_cnt = 0; +#endif + static ncclResult_t sendProxyProgress(struct ncclComm* comm, struct ncclProxyArgs* args) { + +#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_NET_COLLECT_POLL_CNT) + g_npkit_net_poll_cnt++; +#endif + if (args->state == ncclProxyOpReady) { for (int s=0; snsubs; s++) { struct ncclProxySubArgs* sub = args->subs+s; @@ -885,6 +897,11 @@ static ncclResult_t sendProxyProgress(struct ncclComm* comm, struct ncclProxyArg if (sizesFifo[buffSlot] != -1 && ((*recvTail > (sub->base+sub->transmitted)) || p == NCCL_PROTO_LL)) { // We have something to receive, let's check if it's completely ready. int size = sizesFifo[buffSlot]; + +#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_NET_SEND_ENTRY) && defined(ENABLE_NPKIT_EVENT_NET_SEND_EXIT) + sub->npKitSizesFifo[buffSlot] = size; +#endif + char* buff = resources->shared ? localBuff+resources->recvMem->offsFifo[buffSlot] : localBuff+buffSlot*stepSize; int ready = 1; if (p == NCCL_PROTO_LL128) { @@ -919,6 +936,22 @@ static ncclResult_t sendProxyProgress(struct ncclComm* comm, struct ncclProxyArg // Data is ready, try to send. NCCLCHECK(ncclNetIsend(comm, resources->netSendComm, buff, size, resources->rank, mhandle, sub->requests+buffSlot)); if (sub->requests[buffSlot] != NULL) { + +#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_NET_SEND_ENTRY) && defined(ENABLE_NPKIT_EVENT_NET_SEND_EXIT) + NpKit::CollectCpuEvent( + NPKIT_EVENT_NET_SEND_ENTRY, +#if defined(ENABLE_NPKIT_NET_COLLECT_POLL_CNT) + g_npkit_net_poll_cnt, +#else + size, +#endif + uint64_t(sub->requests+buffSlot)/sizeof(void*), + *(volatile uint64_t*)NpKit::GetCpuTimestamp(), sub->channelId); +#if defined(ENABLE_NPKIT_NET_COLLECT_POLL_CNT) + g_npkit_net_poll_cnt = 0; +#endif +#endif + TRACE(NCCL_NET, "sendProxy [%ld/%d] Isend posted, req %p", sub->transmitted, buffSlot, sub->requests[buffSlot]); sizesFifo[buffSlot] = -1; // Make sure size is reset to zero before we update the head. @@ -937,6 +970,22 @@ static ncclResult_t sendProxyProgress(struct ncclComm* comm, struct ncclProxyArg int buffSlot = (sub->base+sub->done)%NCCL_STEPS; NCCLCHECK(ncclNetTest(comm, sub->requests[buffSlot], &done, NULL)); if (done) { + +#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_NET_SEND_ENTRY) && defined(ENABLE_NPKIT_EVENT_NET_SEND_EXIT) + NpKit::CollectCpuEvent( + NPKIT_EVENT_NET_SEND_EXIT, +#if defined(ENABLE_NPKIT_NET_COLLECT_POLL_CNT) + g_npkit_net_poll_cnt, +#else + sub->npKitSizesFifo[buffSlot], +#endif + uint64_t(sub->requests+buffSlot)/sizeof(void*), + *(volatile uint64_t*)NpKit::GetCpuTimestamp(), sub->channelId); +#if defined(ENABLE_NPKIT_NET_COLLECT_POLL_CNT) + g_npkit_net_poll_cnt = 0; +#endif +#endif + TRACE(NCCL_NET, "sendProxy [%ld/%d] request %p done", sub->done, buffSlot, sub->requests[buffSlot]); sub->done += args->sliceSteps; for (uint64_t step=sub->done-args->sliceSteps; stepdone; step++) ncclProfilingRecord(args, s, step, ncclProxyProfileEnd); @@ -962,6 +1011,11 @@ static ncclResult_t sendProxyProgress(struct ncclComm* comm, struct ncclProxyArg } static ncclResult_t recvProxyProgress(struct ncclComm* comm, struct ncclProxyArgs* args) { + +#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_NET_COLLECT_POLL_CNT) + g_npkit_net_poll_cnt++; +#endif + if (args->state == ncclProxyOpReady) { // Initialize subs and group them by same recvComm. void* recvComm; @@ -1043,6 +1097,22 @@ static ncclResult_t recvProxyProgress(struct ncclComm* comm, struct ncclProxyArg if (*requestPtr) { for (int i=0; igroupSize; i++) { struct ncclProxySubArgs* sub = subGroup+i; + +#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_NET_RECV_ENTRY) && defined(ENABLE_NPKIT_EVENT_NET_RECV_EXIT) + NpKit::CollectCpuEvent( + NPKIT_EVENT_NET_RECV_ENTRY, +#if defined(ENABLE_NPKIT_NET_COLLECT_POLL_CNT) + g_npkit_net_poll_cnt, +#else + sizes[i], +#endif + uint64_t(sub->requests+(step%NCCL_STEPS))/sizeof(void*), + *(volatile uint64_t*)NpKit::GetCpuTimestamp(), sub->channelId); +#if defined(ENABLE_NPKIT_NET_COLLECT_POLL_CNT) + g_npkit_net_poll_cnt = 0; +#endif +#endif + sub->posted += args->sliceSteps; for (uint64_t step=sub->posted-args->sliceSteps; stepposted; step++) ncclProfilingRecord(args, s+i, step, ncclProxyProfileRecvWait); } @@ -1068,6 +1138,22 @@ static ncclResult_t recvProxyProgress(struct ncclComm* comm, struct ncclProxyArg for (int i=0; igroupSize; i++) { struct ncclProxySubArgs* sub = subGroup + i; + +#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_NET_RECV_ENTRY) && defined(ENABLE_NPKIT_EVENT_NET_RECV_EXIT) + NpKit::CollectCpuEvent( + NPKIT_EVENT_NET_RECV_EXIT, +#if defined(ENABLE_NPKIT_NET_COLLECT_POLL_CNT) + g_npkit_net_poll_cnt, +#else + sizes[i], +#endif + uint64_t(sub->requests+(step%NCCL_STEPS))/sizeof(void*), + *(volatile uint64_t*)NpKit::GetCpuTimestamp(), sub->channelId); +#if defined(ENABLE_NPKIT_NET_COLLECT_POLL_CNT) + g_npkit_net_poll_cnt = 0; +#endif +#endif + sub->received += args->sliceSteps; for (uint64_t step=sub->received-args->sliceSteps; stepreceived; step++) ncclProfilingRecord(args, s+i, step, ncclProxyProfileRecvFlushWait); if (step < sub->nsteps) {