This commit is contained in:
Ziyue Yang
2022-10-14 01:27:05 +00:00
والد 4972c129e3
کامیت 7d6bbc19d4
7 فایلهای تغییر یافته به همراه 158 افزوده شده و 0 حذف شده
@@ -25,11 +25,36 @@ namespace {
const ssize_t loopSize = nChannels*int(chunkSize);
const ssize_t size = args->count;
// ---- NPKit profiling hooks (compiled in only when ENABLE_NPKIT is defined) ----
// Index of the NPKit event-collect context used below; it is taken from `bid`
// (presumably the block index -- confirm against the enclosing kernel code).
#if defined(ENABLE_NPKIT)
int npKitCtxIdx = bid;
#endif
// Optional CPU time-sync marker: the thread with tid == 0 records an
// NPKIT_EVENT_TIME_SYNC_CPU event whose timestamp argument is the value currently
// stored at ncclShmem.comm.cpuTimestamp. The two literal 0 arguments are
// interpreted by NpKit::CollectGpuEvent, whose signature is not visible here.
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_TIME_SYNC_CPU)
if (tid == 0) {
uint64_t* cpuTimestamp = ncclShmem.comm.cpuTimestamp;
NpKit::CollectGpuEvent(NPKIT_EVENT_TIME_SYNC_CPU, 0, 0, *cpuTimestamp,
ncclShmem.comm.npKitEventCollectContexts + npKitCtxIdx);
}
#endif
// Optional GPU time-sync marker: same context and same tid == 0 guard, but the
// timestamp comes from the AMDGPU compiler builtin __builtin_amdgcn_s_memrealtime()
// (presumably a device real-time counter -- confirm units in the LLVM AMDGPU docs).
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_TIME_SYNC_GPU)
if (tid == 0) {
NpKit::CollectGpuEvent(NPKIT_EVENT_TIME_SYNC_GPU, 0, 0, __builtin_amdgcn_s_memrealtime(),
ncclShmem.comm.npKitEventCollectContexts + npKitCtxIdx);
}
#endif
T *inputBuf = (T*)args->sendbuff;
T *outputBuf = (T*)args->recvbuff;
Primitives<T, RedOp, FanSymmetric<1>, 0, Proto, 0> prims
(tid, nthreads, &ring->prev, &ring->next, inputBuf, outputBuf, args->redOpArg, args->connIndex << 16);
// NPKit: pass the selected event-collect context index to the primitives object.
// Only the thread with tid == 0 stores it, the same thread that records the
// time-sync events earlier in this function.
#if defined(ENABLE_NPKIT)
if (tid == 0) {
prims.npKitCtxIdx = npKitCtxIdx;
}
#endif
for (ssize_t gridOffset = 0; gridOffset < size; gridOffset += loopSize) {
ssize_t realChunkSize;
if (Proto::Id == NCCL_PROTO_SIMPLE) {
@@ -24,11 +24,36 @@ namespace {
const int nextRank = ring->userRanks[1];
const int root = args->root;
// ---- NPKit profiling hooks (compiled in only when ENABLE_NPKIT is defined) ----
// The event-collect context is selected by `bid` (presumably the block index --
// confirm against the enclosing kernel code).
#if defined(ENABLE_NPKIT)
int npKitCtxIdx = bid;
#endif
// CPU time-sync marker, recorded by the tid == 0 thread only. The timestamp
// argument is whatever value ncclShmem.comm.cpuTimestamp points at when this
// line executes. The meaning of the two 0 arguments is defined by
// NpKit::CollectGpuEvent, which is not visible in this chunk.
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_TIME_SYNC_CPU)
if (tid == 0) {
uint64_t* cpuTimestamp = ncclShmem.comm.cpuTimestamp;
NpKit::CollectGpuEvent(NPKIT_EVENT_TIME_SYNC_CPU, 0, 0, *cpuTimestamp,
ncclShmem.comm.npKitEventCollectContexts + npKitCtxIdx);
}
#endif
// GPU time-sync marker, recorded by the tid == 0 thread only. The timestamp is
// read with the AMDGPU builtin __builtin_amdgcn_s_memrealtime()
// (NOTE(review): AMD-specific; presumably a device real-time counter -- confirm).
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_TIME_SYNC_GPU)
if (tid == 0) {
NpKit::CollectGpuEvent(NPKIT_EVENT_TIME_SYNC_GPU, 0, 0, __builtin_amdgcn_s_memrealtime(),
ncclShmem.comm.npKitEventCollectContexts + npKitCtxIdx);
}
#endif
T *inputBuf = (T*)args->sendbuff;
T *outputBuf = (T*)args->recvbuff;
Primitives<T, RedOp, FanSymmetric<1>, 0, Proto, 0>
prims(tid, nthreads, &ring->prev, &ring->next, inputBuf, outputBuf, args->redOpArg, args->connIndex << 16);
// NPKit: store the event-collect context index on `prims`. The write is
// restricted to the tid == 0 thread, matching the guard used for the
// time-sync events recorded before `prims` is constructed.
#if defined(ENABLE_NPKIT)
if (tid == 0) {
prims.npKitCtxIdx = npKitCtxIdx;
}
#endif
for (ssize_t gridOffset = 0; gridOffset < size; gridOffset += loopSize) {
ssize_t realChunkSize;
if (Proto::Id == NCCL_PROTO_SIMPLE) {
@@ -1,3 +1,8 @@
/*************************************************************************
* Copyright (c) Microsoft Corporation.
* Licensed under the MIT License.
************************************************************************/
#ifndef NPKIT_H_
#define NPKIT_H_
@@ -1,3 +1,8 @@
/*************************************************************************
* Copyright (c) Microsoft Corporation.
* Licensed under the MIT License.
************************************************************************/
#ifndef NPKIT_EVENT_H_
#define NPKIT_EVENT_H_
@@ -1,3 +1,8 @@
/*************************************************************************
* Copyright (c) Microsoft Corporation.
* Licensed under the MIT License.
************************************************************************/
#ifndef NPKIT_STRUCT_H_
#define NPKIT_STRUCT_H_
+7
مشاهده پرونده
@@ -1,3 +1,8 @@
/*************************************************************************
* Copyright (c) Microsoft Corporation.
* Licensed under the MIT License.
************************************************************************/
#include <chrono>
#include <fstream>
#include <unistd.h>
@@ -53,6 +58,8 @@ ncclResult_t NpKit::Init(int rank) {
// Init timestamp: allocate one uint64_t through ncclCudaHostCalloc (presumably
// host memory that device code can also read -- confirm in the allocator) and
// seed it before the updater thread starts.
NCCLCHECK(ncclCudaHostCalloc(&cpu_timestamp_, 1));
// Store through a volatile-qualified pointer so the write is actually performed.
volatile uint64_t* volatile_cpu_timestamp = cpu_timestamp_;
// The value is the system_clock tick count since its epoch; the tick period is
// whatever std::chrono::system_clock uses on this platform.
// NOTE(review): consumers must assume the same unit -- confirm against CpuTimestampUpdateThread.
*volatile_cpu_timestamp = std::chrono::system_clock::now().time_since_epoch().count();
// Clear the stop flag, then start the thread that runs CpuTimestampUpdateThread.
cpu_timestamp_update_thread_should_stop_ = false;
cpu_timestamp_update_thread_ = new std::thread(CpuTimestampUpdateThread);
+86
مشاهده پرونده
@@ -15,6 +15,9 @@
#include "profiler.h"
#include "graph.h"
#include "graph/topo.h"
#if defined(ENABLE_NPKIT)
#include "npkit/npkit.h"
#endif
static_assert(sizeof(ncclNetHandle_t) <= CONNECT_SIZE, "NET Connect info is too large");
@@ -831,7 +834,16 @@ static ncclResult_t recvProxyFree(struct ncclProxyConnection* connection, struct
static_assert(NCCL_STEPS <= NCCL_NET_MAX_REQUESTS, "Not enough net requests to cover for steps");
// Poll counter for NPKit network events. It is incremented on every call to
// sendProxyProgress and recvProxyProgress and reset to 0 each time a net
// send/recv event is recorded; when ENABLE_NPKIT_NET_COLLECT_POLL_CNT is defined
// it is reported in the event in place of the transfer size.
// NOTE(review): this is a plain non-atomic int shared by both progress
// functions -- confirm they are only ever driven from a single thread.
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_NET_COLLECT_POLL_CNT)
static int g_npkit_net_poll_cnt = 0;
#endif
static ncclResult_t sendProxyProgress(struct ncclComm* comm, struct ncclProxyArgs* args) {
// NPKit: count this invocation of sendProxyProgress as one poll.
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_NET_COLLECT_POLL_CNT)
g_npkit_net_poll_cnt++;
#endif
if (args->state == ncclProxyOpReady) {
for (int s=0; s<args->nsubs; s++) {
struct ncclProxySubArgs* sub = args->subs+s;
@@ -885,6 +897,11 @@ static ncclResult_t sendProxyProgress(struct ncclComm* comm, struct ncclProxyArg
if (sizesFifo[buffSlot] != -1 && ((*recvTail > (sub->base+sub->transmitted)) || p == NCCL_PROTO_LL)) {
// We have something to receive, let's check if it's completely ready.
int size = sizesFifo[buffSlot];
// NPKit: keep a private copy of this slot's size. sizesFifo[buffSlot] is reset
// to -1 once the send is posted, so the NET_SEND_EXIT event reads the size from
// npKitSizesFifo instead.
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_NET_SEND_ENTRY) && defined(ENABLE_NPKIT_EVENT_NET_SEND_EXIT)
sub->npKitSizesFifo[buffSlot] = size;
#endif
char* buff = resources->shared ? localBuff+resources->recvMem->offsFifo[buffSlot] : localBuff+buffSlot*stepSize;
int ready = 1;
if (p == NCCL_PROTO_LL128) {
@@ -919,6 +936,22 @@ static ncclResult_t sendProxyProgress(struct ncclComm* comm, struct ncclProxyArg
// Data is ready, try to send.
NCCLCHECK(ncclNetIsend(comm, resources->netSendComm, buff, size, resources->rank, mhandle, sub->requests+buffSlot));
if (sub->requests[buffSlot] != NULL) {
// NPKit: record NET_SEND_ENTRY for a send that was just posted (this point is
// reached only when ncclNetIsend produced a non-NULL request for buffSlot).
// Emitted only when both the SEND_ENTRY and SEND_EXIT macros are defined.
// Arguments passed to CollectCpuEvent, in order:
//   - event type;
//   - g_npkit_net_poll_cnt when ENABLE_NPKIT_NET_COLLECT_POLL_CNT is defined,
//     otherwise the size being sent;
//   - the address of the request slot divided by sizeof(void*); the SEND_EXIT
//     event computes the same expression (presumably so the two can be paired);
//   - the CPU timestamp, read through a volatile-qualified pointer;
//   - the sub's channelId.
// In poll-count mode the counter restarts from 0 after the event is recorded.
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_NET_SEND_ENTRY) && defined(ENABLE_NPKIT_EVENT_NET_SEND_EXIT)
NpKit::CollectCpuEvent(
NPKIT_EVENT_NET_SEND_ENTRY,
#if defined(ENABLE_NPKIT_NET_COLLECT_POLL_CNT)
g_npkit_net_poll_cnt,
#else
size,
#endif
uint64_t(sub->requests+buffSlot)/sizeof(void*),
*(volatile uint64_t*)NpKit::GetCpuTimestamp(), sub->channelId);
#if defined(ENABLE_NPKIT_NET_COLLECT_POLL_CNT)
g_npkit_net_poll_cnt = 0;
#endif
#endif
TRACE(NCCL_NET, "sendProxy [%ld/%d] Isend posted, req %p", sub->transmitted, buffSlot, sub->requests[buffSlot]);
sizesFifo[buffSlot] = -1;
// Make sure size is reset to zero before we update the head.
@@ -937,6 +970,22 @@ static ncclResult_t sendProxyProgress(struct ncclComm* comm, struct ncclProxyArg
int buffSlot = (sub->base+sub->done)%NCCL_STEPS;
NCCLCHECK(ncclNetTest(comm, sub->requests[buffSlot], &done, NULL));
if (done) {
// NPKit: record NET_SEND_EXIT once ncclNetTest has reported the request for
// buffSlot as done. Emitted only when both the SEND_ENTRY and SEND_EXIT macros
// are defined.
// The second argument is g_npkit_net_poll_cnt in poll-count mode, otherwise the
// size saved in sub->npKitSizesFifo[buffSlot] when the data was found ready.
// The third argument is the request slot address divided by sizeof(void*), the
// same expression used by the SEND_ENTRY event for this slot.
// In poll-count mode the counter restarts from 0 after the event is recorded.
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_NET_SEND_ENTRY) && defined(ENABLE_NPKIT_EVENT_NET_SEND_EXIT)
NpKit::CollectCpuEvent(
NPKIT_EVENT_NET_SEND_EXIT,
#if defined(ENABLE_NPKIT_NET_COLLECT_POLL_CNT)
g_npkit_net_poll_cnt,
#else
sub->npKitSizesFifo[buffSlot],
#endif
uint64_t(sub->requests+buffSlot)/sizeof(void*),
*(volatile uint64_t*)NpKit::GetCpuTimestamp(), sub->channelId);
#if defined(ENABLE_NPKIT_NET_COLLECT_POLL_CNT)
g_npkit_net_poll_cnt = 0;
#endif
#endif
TRACE(NCCL_NET, "sendProxy [%ld/%d] request %p done", sub->done, buffSlot, sub->requests[buffSlot]);
sub->done += args->sliceSteps;
for (uint64_t step=sub->done-args->sliceSteps; step<sub->done; step++) ncclProfilingRecord(args, s, step, ncclProxyProfileEnd);
@@ -962,6 +1011,11 @@ static ncclResult_t sendProxyProgress(struct ncclComm* comm, struct ncclProxyArg
}
static ncclResult_t recvProxyProgress(struct ncclComm* comm, struct ncclProxyArgs* args) {
// NPKit: count this invocation of recvProxyProgress as one poll. The counter is
// the same file-scope variable that sendProxyProgress increments.
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_NET_COLLECT_POLL_CNT)
g_npkit_net_poll_cnt++;
#endif
if (args->state == ncclProxyOpReady) {
// Initialize subs and group them by same recvComm.
void* recvComm;
@@ -1043,6 +1097,22 @@ static ncclResult_t recvProxyProgress(struct ncclComm* comm, struct ncclProxyArg
if (*requestPtr) {
for (int i=0; i<subGroup->groupSize; i++) {
struct ncclProxySubArgs* sub = subGroup+i;
// NPKit: record NET_RECV_ENTRY for this sub. It runs once per sub in the group,
// inside the `if (*requestPtr)` branch and before sub->posted is advanced.
// Emitted only when both the RECV_ENTRY and RECV_EXIT macros are defined.
// The second argument is g_npkit_net_poll_cnt in poll-count mode, otherwise
// sizes[i]. The third argument is the address of request slot step%NCCL_STEPS
// divided by sizeof(void*); the RECV_EXIT event uses the same expression.
// In poll-count mode the counter is reset to 0 after each recorded event, so
// later subs in the same group report a count of 0.
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_NET_RECV_ENTRY) && defined(ENABLE_NPKIT_EVENT_NET_RECV_EXIT)
NpKit::CollectCpuEvent(
NPKIT_EVENT_NET_RECV_ENTRY,
#if defined(ENABLE_NPKIT_NET_COLLECT_POLL_CNT)
g_npkit_net_poll_cnt,
#else
sizes[i],
#endif
uint64_t(sub->requests+(step%NCCL_STEPS))/sizeof(void*),
*(volatile uint64_t*)NpKit::GetCpuTimestamp(), sub->channelId);
#if defined(ENABLE_NPKIT_NET_COLLECT_POLL_CNT)
g_npkit_net_poll_cnt = 0;
#endif
#endif
sub->posted += args->sliceSteps;
for (uint64_t step=sub->posted-args->sliceSteps; step<sub->posted; step++) ncclProfilingRecord(args, s+i, step, ncclProxyProfileRecvWait);
}
@@ -1068,6 +1138,22 @@ static ncclResult_t recvProxyProgress(struct ncclComm* comm, struct ncclProxyArg
for (int i=0; i<NCCL_PROXY_MAX_SUBS; i++) totalSize += sizes[i];
for (int i=0; i<subGroup->groupSize; i++) {
struct ncclProxySubArgs* sub = subGroup + i;
// NPKit: record NET_RECV_EXIT for this sub. It runs once per sub in the group,
// just before sub->received is advanced. Emitted only when both the RECV_ENTRY
// and RECV_EXIT macros are defined.
// The second argument is g_npkit_net_poll_cnt in poll-count mode, otherwise
// sizes[i]. The third argument is the address of request slot step%NCCL_STEPS
// divided by sizeof(void*), the same expression the RECV_ENTRY event uses.
// In poll-count mode the counter is reset to 0 after each recorded event.
#if defined(ENABLE_NPKIT) && defined(ENABLE_NPKIT_EVENT_NET_RECV_ENTRY) && defined(ENABLE_NPKIT_EVENT_NET_RECV_EXIT)
NpKit::CollectCpuEvent(
NPKIT_EVENT_NET_RECV_EXIT,
#if defined(ENABLE_NPKIT_NET_COLLECT_POLL_CNT)
g_npkit_net_poll_cnt,
#else
sizes[i],
#endif
uint64_t(sub->requests+(step%NCCL_STEPS))/sizeof(void*),
*(volatile uint64_t*)NpKit::GetCpuTimestamp(), sub->channelId);
#if defined(ENABLE_NPKIT_NET_COLLECT_POLL_CNT)
g_npkit_net_poll_cnt = 0;
#endif
#endif
sub->received += args->sliceSteps;
for (uint64_t step=sub->received-args->sliceSteps; step<sub->received; step++) ncclProfilingRecord(args, s+i, step, ncclProxyProfileRecvFlushWait);
if (step < sub->nsteps) {