2.9.6-1
Add support for CUDA graphs. Fuse BCM Gen4 switches to avoid suboptimal performance on some platforms. Issue #439. Fix bootstrap issue caused by connection reordering. Fix CPU locking block. Improve CollNet algorithm. Improve performance on DGX A100 for communicators with only one GPU per node.
Этот коммит содержится в:
+264
-254
@@ -1,12 +1,11 @@
|
||||
/*************************************************************************
|
||||
* Copyright (c) 2016-2020, NVIDIA CORPORATION. All rights reserved.
|
||||
* Copyright (c) 2016-2021, NVIDIA CORPORATION. All rights reserved.
|
||||
*
|
||||
* See LICENSE.txt for license information
|
||||
************************************************************************/
|
||||
|
||||
#include "comm.h"
|
||||
#include "info.h"
|
||||
#include "graph.h"
|
||||
#include "collectives.h"
|
||||
|
||||
enum { proxyRecv=0, proxySend=1 };
|
||||
@@ -34,26 +33,32 @@ struct ncclProxyPool {
|
||||
static ncclResult_t allocateArgs(struct ncclComm* comm, struct ncclProxyArgs** argsptr) {
|
||||
struct ncclProxyState* state = &comm->proxyState;
|
||||
struct ncclProxyArgs* elem;
|
||||
pthread_mutex_lock(&state->poolMutex);
|
||||
if (state->pool == NULL) {
|
||||
// Allocate a new pool of elements
|
||||
struct ncclProxyPool* newPool;
|
||||
NCCLCHECK(ncclCalloc(&newPool, 1));
|
||||
struct ncclProxyArgs* newElems = newPool->elems;
|
||||
// Chain newly allocated elements
|
||||
for (int i=0; i<PROXYARGS_ALLOCATE_SIZE; i++) {
|
||||
if (i+1 < PROXYARGS_ALLOCATE_SIZE) newElems[i].next = newElems+i+1;
|
||||
// Check whether there are freed elements
|
||||
if (state->poolReturned) {
|
||||
pthread_mutex_lock(&state->poolMutex);
|
||||
state->pool = state->poolReturned;
|
||||
state->poolReturned = NULL;
|
||||
pthread_mutex_unlock(&state->poolMutex);
|
||||
} else {
|
||||
// Allocate a new pool of elements
|
||||
struct ncclProxyPool* newPool;
|
||||
NCCLCHECK(ncclCalloc(&newPool, 1));
|
||||
struct ncclProxyArgs* newElems = newPool->elems;
|
||||
// Chain newly allocated elements
|
||||
for (int i=0; i<PROXYARGS_ALLOCATE_SIZE; i++) {
|
||||
if (i+1 < PROXYARGS_ALLOCATE_SIZE) newElems[i].next = newElems+i+1;
|
||||
}
|
||||
// Add them all to the pool list
|
||||
state->pool = newElems;
|
||||
// Save the pool memory block for later resource release
|
||||
newPool->next = state->pools;
|
||||
state->pools = newPool;
|
||||
}
|
||||
// Add them all to the pool list
|
||||
state->pool = newElems;
|
||||
// Save the pool memory block for later resource release
|
||||
newPool->next = state->pools;
|
||||
state->pools = newPool;
|
||||
}
|
||||
elem = state->pool;
|
||||
state->pool = state->pool->next;
|
||||
pthread_mutex_unlock(&state->poolMutex);
|
||||
elem->next = elem->nextPeer = elem->nextGroup = NULL;
|
||||
elem->next = elem->nextPeer = NULL;
|
||||
*argsptr = elem;
|
||||
return ncclSuccess;
|
||||
}
|
||||
@@ -75,23 +80,18 @@ ncclResult_t dumpProxyState(struct ncclProxyState* state) {
|
||||
WARN("Active list loop at element %ld", OP_INDEX(op));
|
||||
}
|
||||
op->idle |= OP_SEEN;
|
||||
printf("[%ld]", OP_INDEX(op));
|
||||
printf("[%ld(%ld/%d)]", OP_INDEX(op), op->opCount, op->nsubs);
|
||||
if (op->nextPeer) {
|
||||
printf("(%ld)", OP_INDEX(op->nextPeer));
|
||||
struct ncclProxyArgs* n = op->nextPeer;
|
||||
n->idle |= OP_SEEN;
|
||||
while (n->nextGroup || n->nextPeer) {
|
||||
n = n->nextGroup ? n->nextGroup : n->nextPeer;
|
||||
while (n->nextPeer) {
|
||||
n = n->nextPeer;
|
||||
n->idle |= OP_SEEN;
|
||||
}
|
||||
}
|
||||
if (op->nextGroup) {
|
||||
printf("--G->");
|
||||
op = op->nextGroup;
|
||||
} else {
|
||||
printf("--N->");
|
||||
op = op->next;
|
||||
}
|
||||
printf("->");
|
||||
op = op->next;
|
||||
}
|
||||
printf("[X]\n");
|
||||
|
||||
@@ -128,44 +128,62 @@ ncclResult_t dumpProxyState(struct ncclProxyState* state) {
|
||||
return ncclSuccess;
|
||||
}
|
||||
|
||||
static ncclResult_t ProxyAppend(struct ncclProxyState* state, struct ncclProxyArgs* args, int shared) {
|
||||
static ncclResult_t ProxyAppend(struct ncclProxyState* state, struct ncclProxyArgs* args) {
|
||||
struct ncclProxyArgs* proxyAppend = *args->proxyAppendPtr;
|
||||
int shared = args->subs[0].connector->conn.shared;
|
||||
if (proxyAppend) {
|
||||
if (shared && proxyAppend->opCount == args->opCount) {
|
||||
if ((proxyAppend->sliceSteps != args->sliceSteps) ||
|
||||
(proxyAppend->chunkSteps != args->chunkSteps) ||
|
||||
(proxyAppend->protocol != args->protocol) ||
|
||||
(proxyAppend->dtype != args->dtype) ||
|
||||
(proxyAppend->redOp != args->redOp)) {
|
||||
WARN("Proxy append mismatch");
|
||||
return ncclInternalError;
|
||||
}
|
||||
if (proxyAppend->nsubs >= NCCL_PROXY_MAX_SUBS) {
|
||||
WARN("Proxy append out of bound");
|
||||
return ncclInternalError;
|
||||
}
|
||||
memcpy(proxyAppend->subs+proxyAppend->nsubs, args->subs, sizeof(struct ncclProxySubArgs));
|
||||
proxyAppend->nsubs++;
|
||||
args->next = proxyAppend->next;
|
||||
proxyAppend->next = NULL;
|
||||
proxyAppend->nextGroup = args;
|
||||
DEBUG_PROXY_PRINT("Insert %5ld (%d/%5ld/%5ld) as group, prevGroup %5ld, next %5ld : \n", OP_INDEX(args), shared, proxyAppend->opCount, args->opCount, OP_INDEX(proxyAppend), OP_INDEX(args->next));
|
||||
// Free args as we merged them
|
||||
args->next = state->poolFreed;
|
||||
state->poolFreed = args;
|
||||
DEBUG_PROXY_PRINT("Insert %5ld (%d/%5ld/%5ld) as group with %5ld\n", OP_INDEX(args), shared, proxyAppend->opCount, args->opCount, OP_INDEX(proxyAppend));
|
||||
} else {
|
||||
proxyAppend->nextPeer = args;
|
||||
DEBUG_PROXY_PRINT("Insert %5ld (%d/%5ld/%5ld) as nextPeer of %5ld : \n", OP_INDEX(args), shared, proxyAppend->opCount, args->opCount, OP_INDEX(proxyAppend));
|
||||
DEBUG_PROXY_PRINT("Insert %5ld (%d/%5ld/%5ld) as nextPeer of %5ld\n", OP_INDEX(args), shared, proxyAppend->opCount, args->opCount, OP_INDEX(proxyAppend));
|
||||
*(args->proxyAppendPtr) = args;
|
||||
}
|
||||
} else {
|
||||
// Nothing running for that peer. Add to the list
|
||||
if (state->ops == NULL) {
|
||||
// Create the list
|
||||
DEBUG_PROXY_PRINT("Insert %5ld (%d/%5ld) as first element : \n", OP_INDEX(args), shared, args->opCount);
|
||||
DEBUG_PROXY_PRINT("Insert %5ld (%d/%5ld) as first element\n", OP_INDEX(args), shared, args->opCount);
|
||||
state->ops = args;
|
||||
} else {
|
||||
// Append element at the end of the list
|
||||
struct ncclProxyArgs* last = state->ops;
|
||||
while (last->nextGroup || last->next) last = last->nextGroup ? last->nextGroup : last->next;
|
||||
while (last->next) last = last->next;
|
||||
last->next = args;
|
||||
DEBUG_PROXY_PRINT("Insert %5ld (%d/%5ld) as last element : \n", OP_INDEX(args),shared, args->opCount);
|
||||
DEBUG_PROXY_PRINT("Insert %5ld (%d/%5ld) as last element\n", OP_INDEX(args),shared, args->opCount);
|
||||
}
|
||||
*(args->proxyAppendPtr) = args;
|
||||
}
|
||||
*(args->proxyAppendPtr) = args;
|
||||
return ncclSuccess;
|
||||
}
|
||||
|
||||
static ncclResult_t SaveProxy(int type, int peer, struct ncclProxyArgs* args) {
|
||||
static ncclResult_t SaveProxy(int type, int peer, struct ncclProxyArgs* args, int connIndex) {
|
||||
if (peer < 0) return ncclSuccess;
|
||||
|
||||
struct ncclPeer* peerComm = args->channel->peers+peer;
|
||||
struct ncclConnector* connector = type == proxyRecv ? &peerComm->recv : &peerComm->send;
|
||||
struct ncclChannel* channel = args->subs[0].channel;
|
||||
struct ncclPeer* peerComm = channel->peers+peer;
|
||||
struct ncclConnector* connector = type == proxyRecv ? peerComm->recv+connIndex : peerComm->send+connIndex;
|
||||
if (connector->transportComm == NULL) {
|
||||
WARN("[%d] Error no transport for %s peer %d on channel %d", connector->comm->rank,
|
||||
type == proxyRecv ? "recv" : "send", peer, args->channel->id);
|
||||
WARN("Rank %d has no transport for %s peer %d on channel %d", connector->comm->rank,
|
||||
type == proxyRecv ? "recv" : "send", peer, channel->id);
|
||||
return ncclInternalError;
|
||||
}
|
||||
if (connector->transportComm->proxy == NULL) return ncclSuccess;
|
||||
@@ -174,14 +192,10 @@ static ncclResult_t SaveProxy(int type, int peer, struct ncclProxyArgs* args) {
|
||||
struct ncclProxyArgs* op;
|
||||
NCCLCHECK(allocateArgs(connector->comm, &op));
|
||||
memcpy(op, args, sizeof(struct ncclProxyArgs));
|
||||
op->connector = connector;
|
||||
op->subs[0].connector = connector;
|
||||
op->progress = connector->transportComm->proxy;
|
||||
op->state = ncclProxyOpReady;
|
||||
|
||||
op->proxyAppendPtr =
|
||||
connector->conn.shared ?
|
||||
state->sharedBuffs->proxyAppend+2*args->channel->id+type : // Shared buffers
|
||||
&connector->proxyAppend; // Dedicated buffers
|
||||
op->proxyAppendPtr = connector->proxyAppendPtr;
|
||||
|
||||
if (state->nextOps == NULL) state->nextOps = op;
|
||||
else state->nextOpsEnd->next = op;
|
||||
@@ -189,120 +203,131 @@ static ncclResult_t SaveProxy(int type, int peer, struct ncclProxyArgs* args) {
|
||||
return ncclSuccess;
|
||||
}
|
||||
|
||||
ncclResult_t ncclProxySaveColl(struct ncclProxyArgs* args, int pattern, int root, int nranks) {
|
||||
ncclResult_t ncclProxySaveColl(struct ncclProxyArgs* args, int nranks) {
|
||||
struct ncclChannel* channel = args->subs[0].channel;
|
||||
int pattern = args->pattern;
|
||||
if (pattern == ncclPatternRing || pattern == ncclPatternRingTwice || pattern == ncclPatternPipelineFrom || pattern == ncclPatternPipelineTo) {
|
||||
struct ncclRing* ring = &args->channel->ring;
|
||||
if (NeedProxy(proxyRecv, pattern, root, ring, nranks)) NCCLCHECK(SaveProxy(proxyRecv, ring->prev, args));
|
||||
if (NeedProxy(proxySend, pattern, root, ring, nranks)) NCCLCHECK(SaveProxy(proxySend, ring->next, args));
|
||||
struct ncclRing* ring = &channel->ring;
|
||||
if (NeedProxy(proxyRecv, pattern, args->root, ring, nranks)) NCCLCHECK(SaveProxy(proxyRecv, ring->prev, args, 0));
|
||||
if (NeedProxy(proxySend, pattern, args->root, ring, nranks)) NCCLCHECK(SaveProxy(proxySend, ring->next, args, 0));
|
||||
}
|
||||
if (pattern == ncclPatternTreeUp || pattern == ncclPatternTreeUpDown) {
|
||||
// Tree up
|
||||
struct ncclTree* tree = &args->channel->tree;
|
||||
for (int i=0; i<NCCL_MAX_TREE_ARITY; i++) NCCLCHECK(SaveProxy(proxyRecv, tree->down[i], args));
|
||||
NCCLCHECK(SaveProxy(proxySend, tree->up, args));
|
||||
struct ncclTree* tree = &channel->tree;
|
||||
for (int i=0; i<NCCL_MAX_TREE_ARITY; i++) NCCLCHECK(SaveProxy(proxyRecv, tree->down[i], args, 0));
|
||||
NCCLCHECK(SaveProxy(proxySend, tree->up, args, 0));
|
||||
}
|
||||
if (pattern == ncclPatternTreeDown || pattern == ncclPatternTreeUpDown) {
|
||||
// Tree down
|
||||
struct ncclTree* tree = &args->channel->tree;
|
||||
for (int i=0; i< NCCL_MAX_TREE_ARITY; i++) NCCLCHECK(SaveProxy(proxySend, tree->down[i], args));
|
||||
NCCLCHECK(SaveProxy(proxyRecv, tree->up, args));
|
||||
struct ncclTree* tree = &channel->tree;
|
||||
for (int i=0; i< NCCL_MAX_TREE_ARITY; i++) NCCLCHECK(SaveProxy(proxySend, tree->down[i], args, 0));
|
||||
NCCLCHECK(SaveProxy(proxyRecv, tree->up, args, 0));
|
||||
}
|
||||
if (pattern == ncclPatternCollTreeUp) {
|
||||
if (pattern == ncclPatternCollTreeUpDown) {
|
||||
// CollTree up
|
||||
struct ncclTree* tree = &args->channel->collTree;
|
||||
NCCLCHECK(SaveProxy(proxyRecv, tree->down[0], args));
|
||||
NCCLCHECK(SaveProxy(proxySend, tree->up, args));
|
||||
}
|
||||
if (pattern == ncclPatternCollTreeDown) {
|
||||
NCCLCHECK(SaveProxy(proxySend, channel->collTree.out, args, 1)); // For CollTree up, we are using push
|
||||
// CollTree down
|
||||
struct ncclTree* tree = &args->channel->collTree;
|
||||
NCCLCHECK(SaveProxy(proxySend, tree->down[0], args));
|
||||
NCCLCHECK(SaveProxy(proxyRecv, tree->up, args));
|
||||
NCCLCHECK(SaveProxy(proxyRecv, channel->collTree.out, args, 0));
|
||||
}
|
||||
return ncclSuccess;
|
||||
}
|
||||
|
||||
ncclResult_t ncclProxySaveP2p(struct ncclInfo* info, struct ncclChannel* channel, int segment) {
|
||||
struct ncclProxyArgs args;
|
||||
memset(&args, 0, sizeof(struct ncclProxyArgs));
|
||||
args.channel = channel;
|
||||
args.sliceSteps = 1;
|
||||
args.chunkSteps = 1;
|
||||
args.protocol = NCCL_PROTO_SIMPLE;
|
||||
args.segment = segment;
|
||||
args.opCount = channel->workFifoTail-1;
|
||||
args.dtype = info->datatype;
|
||||
ncclResult_t ncclProxyComputeP2p(struct ncclInfo* info, struct ncclProxyArgs* args) {
|
||||
memset(args, 0, sizeof(struct ncclProxyArgs));
|
||||
int channelId = info->channelId;
|
||||
args->nsubs = 1;
|
||||
struct ncclProxySubArgs* sub = args->subs;
|
||||
|
||||
struct ncclChannel* channel = info->comm->channels+channelId;
|
||||
sub->channel = channel;
|
||||
args->sliceSteps = 1;
|
||||
args->chunkSteps = 1;
|
||||
args->protocol = NCCL_PROTO_SIMPLE;
|
||||
args->dtype = info->datatype;
|
||||
sub->delta = info->delta;
|
||||
sub->recvbytes = info->recvbytes;
|
||||
sub->sendbytes = info->sendbytes;
|
||||
|
||||
int stepSize = info->comm->buffSizes[NCCL_PROTO_SIMPLE]/NCCL_STEPS/SENDRECV_SLICEFACTOR;
|
||||
info->recvChunkSize = stepSize;
|
||||
info->sendChunkSize = stepSize;
|
||||
|
||||
if (info->delta > 0 && info->recvbytes >= 0) {
|
||||
int peerrecv = (info->comm->nRanks+info->comm->rank-info->delta)%info->comm->nRanks;
|
||||
args.nsteps = DIVUP(info->recvbytes, info->comm->buffSizes[NCCL_PROTO_SIMPLE]/NCCL_STEPS/SENDRECV_SLICEFACTOR);
|
||||
if (args.nsteps == 0) args.nsteps = 1;
|
||||
args.recvbytes = info->recvbytes;
|
||||
args.sendbytes = 0;
|
||||
NCCLCHECK(SaveProxy(proxyRecv, peerrecv, &args));
|
||||
if (channel->peers[peerrecv].recv[0].transportComm && channel->peers[peerrecv].recv[0].transportComm->proxy) {
|
||||
// Tune chunk size for the network
|
||||
if (info->recvbytes < stepSize) info->recvChunkSize /= 4;
|
||||
else if (info->recvbytes < 8*stepSize) info->recvChunkSize /= 2;
|
||||
}
|
||||
sub->recvChunkSize = info->recvChunkSize;
|
||||
}
|
||||
if (info->delta > 0 && info->sendbytes >= 0) {
|
||||
int peersend = (info->comm->rank+info->delta)%info->comm->nRanks;
|
||||
args.nsteps = DIVUP(info->sendbytes, info->comm->buffSizes[NCCL_PROTO_SIMPLE]/NCCL_STEPS/SENDRECV_SLICEFACTOR);
|
||||
if (args.nsteps == 0) args.nsteps = 1;
|
||||
args.sendbytes = info->sendbytes;
|
||||
args.recvbytes = 0;
|
||||
NCCLCHECK(SaveProxy(proxySend, peersend, &args));
|
||||
if (channel->peers[peersend].send[0].transportComm && channel->peers[peersend].send[0].transportComm->proxy) {
|
||||
// Tune chunk size for the network
|
||||
if (info->sendbytes < stepSize) info->sendChunkSize /= 4;
|
||||
else if (info->sendbytes < 8*stepSize) info->sendChunkSize /= 2;
|
||||
}
|
||||
sub->sendChunkSize = info->sendChunkSize;
|
||||
}
|
||||
return ncclSuccess;
|
||||
}
|
||||
|
||||
static ncclResult_t removeOp(struct ncclProxyState* state, struct ncclProxyArgs** opPtr, struct ncclProxyArgs** prevOpPtr, struct ncclProxyArgs** prevGroupPtr) {
|
||||
ncclResult_t ncclProxySaveP2p(struct ncclComm* comm, struct ncclProxyArgs* args) {
|
||||
struct ncclProxySubArgs* sub = args->subs;
|
||||
struct ncclChannel* channel = sub->channel;
|
||||
args->opCount = channel->workFifoTail-1;
|
||||
args->commOpCount = comm->opCount;
|
||||
const ssize_t recvbytesOrig = sub->recvbytes;
|
||||
const ssize_t sendbytesOrig = sub->sendbytes;
|
||||
if (sub->delta > 0 && recvbytesOrig >= ssize_t(0)) {
|
||||
int peerrecv = (comm->nRanks+comm->rank-sub->delta)%comm->nRanks;
|
||||
sub->recvbytes = recvbytesOrig;
|
||||
sub->sendbytes = 0;
|
||||
sub->nsteps = DIVUP(sub->recvbytes, sub->recvChunkSize);
|
||||
if (sub->nsteps == 0) sub->nsteps = 1;
|
||||
NCCLCHECK(SaveProxy(proxyRecv, peerrecv, args, 0));
|
||||
}
|
||||
if (sub->delta > 0 && sendbytesOrig >= ssize_t(0)) {
|
||||
int peersend = (comm->rank+sub->delta)%comm->nRanks;
|
||||
sub->sendbytes = sendbytesOrig;
|
||||
sub->recvbytes = 0;
|
||||
sub->nsteps = DIVUP(sub->sendbytes, sub->sendChunkSize);
|
||||
if (sub->nsteps == 0) sub->nsteps = 1;
|
||||
NCCLCHECK(SaveProxy(proxySend, peersend, args, 0));
|
||||
}
|
||||
// Reset proxy args for potentially multiple cuda graph launches
|
||||
// It is safe as long as SaveProxy copies contents of args to op
|
||||
sub->recvbytes = recvbytesOrig;
|
||||
sub->sendbytes = sendbytesOrig;
|
||||
return ncclSuccess;
|
||||
}
|
||||
|
||||
static ncclResult_t removeOp(struct ncclProxyState* state, struct ncclProxyArgs** opPtr, struct ncclProxyArgs** prevOpPtr) {
|
||||
struct ncclProxyArgs* freeOp = *opPtr;
|
||||
DEBUG_PROXY_PRINT("Remove %ld/%ld -> %ld -> %ld/%ld\n", OP_INDEX(*prevOpPtr), OP_INDEX(*prevGroupPtr), OP_INDEX(freeOp), OP_INDEX(freeOp->next), OP_INDEX(freeOp->nextGroup));
|
||||
if (*prevGroupPtr && *prevOpPtr) return ncclInternalError;
|
||||
if (freeOp->nextGroup) {
|
||||
// Part of a group : remove the element
|
||||
struct ncclProxyArgs* next = freeOp->nextGroup;
|
||||
*opPtr = next;
|
||||
if (*prevGroupPtr) {
|
||||
(*prevGroupPtr)->nextGroup = next;
|
||||
} else if (*prevOpPtr) {
|
||||
DEBUG_PROXY_PRINT("Remove %ld -> %ld -> %ld\n", OP_INDEX(*prevOpPtr), OP_INDEX(freeOp), OP_INDEX(freeOp->next));
|
||||
struct ncclProxyArgs* next = freeOp->next;
|
||||
*opPtr = next;
|
||||
if (freeOp->nextPeer) {
|
||||
// replace op by nextPeer
|
||||
struct ncclProxyArgs* nextPeer = freeOp->nextPeer;
|
||||
if (*prevOpPtr) {
|
||||
(*prevOpPtr)->next = nextPeer;
|
||||
} else {
|
||||
state->ops = nextPeer;
|
||||
}
|
||||
nextPeer->next = next;
|
||||
*(prevOpPtr) = nextPeer;
|
||||
} else {
|
||||
*(freeOp->proxyAppendPtr) = NULL;
|
||||
if (*prevOpPtr) {
|
||||
(*prevOpPtr)->next = next;
|
||||
} else {
|
||||
state->ops = next;
|
||||
}
|
||||
} else {
|
||||
struct ncclProxyArgs* next = freeOp->next;
|
||||
*opPtr = next;
|
||||
if ((*prevGroupPtr)) {
|
||||
(*prevGroupPtr)->next = next;
|
||||
(*prevGroupPtr)->nextGroup = NULL;
|
||||
(*prevGroupPtr)->nextPeer = freeOp->nextPeer;
|
||||
if (*(freeOp->proxyAppendPtr) == freeOp) *(freeOp->proxyAppendPtr) = *prevGroupPtr;
|
||||
(*prevOpPtr) = *prevGroupPtr;
|
||||
(*prevGroupPtr) = NULL;
|
||||
} else {
|
||||
if (freeOp->nextPeer) {
|
||||
// replace op by nextPeer
|
||||
struct ncclProxyArgs* nextPeer = freeOp->nextPeer;
|
||||
if (*prevOpPtr) {
|
||||
(*prevOpPtr)->next = nextPeer;
|
||||
} else {
|
||||
state->ops = nextPeer;
|
||||
}
|
||||
struct ncclProxyArgs* lastGroup = nextPeer;
|
||||
while (lastGroup->nextGroup) lastGroup = lastGroup->nextGroup;
|
||||
lastGroup->next = next;
|
||||
*(prevOpPtr) = lastGroup;
|
||||
} else {
|
||||
*(freeOp->proxyAppendPtr) = NULL;
|
||||
if (*prevOpPtr) {
|
||||
(*prevOpPtr)->next = next;
|
||||
} else {
|
||||
state->ops = next;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
pthread_mutex_lock(&state->poolMutex);
|
||||
freeOp->next = state->pool;
|
||||
state->pool = freeOp;
|
||||
pthread_mutex_unlock(&state->poolMutex);
|
||||
freeOp->next = state->poolFreed;
|
||||
state->poolFreed = freeOp;
|
||||
DEBUG_PROXY_PRINT("Removed %5ld (%5ld) : ", OP_INDEX(freeOp), OP_INDEX(*freeOp->proxyAppendPtr));
|
||||
NCCLCHECK(dumpProxyState(state));
|
||||
return ncclSuccess;
|
||||
@@ -310,33 +335,81 @@ static ncclResult_t removeOp(struct ncclProxyState* state, struct ncclProxyArgs*
|
||||
|
||||
static ncclResult_t progressOps(struct ncclProxyState* state, struct ncclProxyArgs** opsPtr, int* idle, struct ncclComm* comm) {
|
||||
struct ncclProxyArgs* prevOp = NULL;
|
||||
struct ncclProxyArgs* prevGroup = NULL;
|
||||
struct ncclProxyArgs* op = *opsPtr;
|
||||
while (op) {
|
||||
if (op->state == ncclProxyOpNone) return ncclInternalError;
|
||||
// opCount >= lastOpCount are part of an ongoing GroupStart/GroupEnd that hasn't started
|
||||
// yet and might be cancelled before they even start. Hold on on those.
|
||||
if (op->opCount < comm->lastOpCount) {
|
||||
NCCLCHECK(op->progress(op));
|
||||
*idle &= op->idle;
|
||||
}
|
||||
NCCLCHECK(op->progress(op));
|
||||
*idle &= op->idle;
|
||||
if (op->state == ncclProxyOpNone) {
|
||||
NCCLCHECK(removeOp(state, &op, &prevOp, &prevGroup));
|
||||
NCCLCHECK(removeOp(state, &op, &prevOp));
|
||||
} else {
|
||||
if (op->nextGroup) {
|
||||
prevGroup = op;
|
||||
prevOp = NULL;
|
||||
op = op->nextGroup;
|
||||
} else {
|
||||
prevOp = op;
|
||||
prevGroup = NULL;
|
||||
op = op->next;
|
||||
}
|
||||
prevOp = op;
|
||||
op = op->next;
|
||||
}
|
||||
}
|
||||
return ncclSuccess;
|
||||
}
|
||||
|
||||
ncclResult_t ncclProxyAppendPosted(struct ncclProxyState* state) {
|
||||
// Return any freed element first
|
||||
if (state->poolFreed) {
|
||||
struct ncclProxyArgs* end = state->poolFreed;
|
||||
while (end->next) end = end->next;
|
||||
pthread_mutex_lock(&state->poolMutex);
|
||||
end->next = state->poolReturned;
|
||||
state->poolReturned = state->poolFreed;
|
||||
pthread_mutex_unlock(&state->poolMutex);
|
||||
state->poolFreed = NULL;
|
||||
}
|
||||
|
||||
// Then wait until we have new work to do
|
||||
pthread_mutex_lock(&state->opsMutex);
|
||||
while (state->postedOps == NULL) {
|
||||
if (state->stop) return ncclSuccess;
|
||||
pthread_cond_wait(&state->cond, &state->opsMutex);
|
||||
}
|
||||
|
||||
// Sort operations as we append them : collectives and
|
||||
// receives first, then sends.
|
||||
|
||||
struct ncclProxyArgs* next, *prev = NULL, *op = state->postedOps;
|
||||
int commOpCount = op->commOpCount;
|
||||
while (op && op->commOpCount == commOpCount) {
|
||||
next = op->next;
|
||||
if (op->subs[0].sendbytes) {
|
||||
if (prev) prev->next = next;
|
||||
else state->postedOps = next;
|
||||
op->next = NULL;
|
||||
NCCLCHECK(ProxyAppend(state, op));
|
||||
} else prev = op;
|
||||
op = next;
|
||||
}
|
||||
op = state->postedOps;
|
||||
while (op && op->commOpCount == commOpCount) {
|
||||
next = op->next;
|
||||
op->next = NULL;
|
||||
NCCLCHECK(ProxyAppend(state, op));
|
||||
op = next;
|
||||
}
|
||||
state->postedOps = op;
|
||||
if (op == NULL) state->postedOpsEnd = NULL;
|
||||
NCCLCHECK(dumpProxyState(state));
|
||||
pthread_mutex_unlock(&state->opsMutex);
|
||||
|
||||
if (state->poolFreed) {
|
||||
struct ncclProxyArgs* end = state->poolFreed;
|
||||
while (end->next) end = end->next;
|
||||
pthread_mutex_lock(&state->poolMutex);
|
||||
end->next = state->poolReturned;
|
||||
state->poolReturned = state->poolFreed;
|
||||
pthread_mutex_unlock(&state->poolMutex);
|
||||
state->poolFreed = NULL;
|
||||
}
|
||||
|
||||
return ncclSuccess;
|
||||
}
|
||||
|
||||
|
||||
void* persistentThread(void *comm_) {
|
||||
struct ncclComm* comm = (struct ncclComm*)comm_;
|
||||
struct ncclProxyState* state = &comm->proxyState;
|
||||
@@ -344,158 +417,95 @@ void* persistentThread(void *comm_) {
|
||||
sprintf(threadName, "NCCLproxy %5d", comm->rank);
|
||||
nvtxNameOsThreadA(syscall(SYS_gettid), threadName);
|
||||
|
||||
pthread_mutex_lock(&state->opsMutex);
|
||||
struct ncclProxyArgs** opsPtr = &state->ops;
|
||||
while (1) {
|
||||
if (*comm->abortFlag) {
|
||||
pthread_mutex_unlock(&state->opsMutex);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
while (*opsPtr == NULL) {
|
||||
if (state->stop) {
|
||||
// No more commands to process and proxy has been requested to stop
|
||||
pthread_mutex_unlock(&state->opsMutex);
|
||||
return NULL;
|
||||
}
|
||||
pthread_cond_wait(&state->cond, &state->opsMutex);
|
||||
ncclResult_t ret = ncclProxyAppendPosted(state);
|
||||
if (ret != ncclSuccess) {
|
||||
comm->fatalError = ret;
|
||||
INFO(NCCL_ALL,"%s:%d -> %d [Proxy Thread]", __FILE__, __LINE__, ret);
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
int idle = 1;
|
||||
ncclResult_t ret = progressOps(state, opsPtr, &idle, comm);
|
||||
if (ret != ncclSuccess) {
|
||||
comm->fatalError = ret;
|
||||
INFO(NCCL_ALL,"%s:%d -> %d [Proxy Thread]", __FILE__, __LINE__, ret);
|
||||
pthread_mutex_unlock(&state->opsMutex);
|
||||
return NULL;
|
||||
}
|
||||
if (idle) {
|
||||
pthread_mutex_unlock(&state->opsMutex);
|
||||
sched_yield(); // No request progressed. Let others run.
|
||||
pthread_mutex_lock(&state->opsMutex);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
ncclResult_t ncclProxyStart(struct ncclComm* comm) {
|
||||
struct ncclProxyState* state = &comm->proxyState;
|
||||
if (state->nextOps == NULL) return ncclSuccess;
|
||||
pthread_mutex_lock(&state->opsMutex);
|
||||
|
||||
// Sort operations as we append them : collectives and
|
||||
// receives first, then sends.
|
||||
ncclProxyArgs* next, *prev = NULL, *op = state->nextOps;
|
||||
while (op) {
|
||||
next = op->next;
|
||||
if (op->sendbytes) {
|
||||
if (prev) prev->next = next;
|
||||
else state->nextOps = next;
|
||||
op->next = NULL;
|
||||
NCCLCHECK(ProxyAppend(state, op, op->connector->conn.shared));
|
||||
} else prev = op;
|
||||
op = next;
|
||||
}
|
||||
op = state->nextOps;
|
||||
while (op) {
|
||||
next = op->next;
|
||||
op->next = NULL;
|
||||
NCCLCHECK(ProxyAppend(state, op, op->connector->conn.shared));
|
||||
op = next;
|
||||
}
|
||||
if (state->postedOps) state->postedOpsEnd->next = state->nextOps;
|
||||
else state->postedOps = state->nextOps;
|
||||
state->postedOpsEnd = state->nextOpsEnd;
|
||||
state->nextOps = state->nextOpsEnd = NULL;
|
||||
NCCLCHECK(dumpProxyState(state));
|
||||
|
||||
if (state->ops != NULL)
|
||||
pthread_cond_signal(&state->cond);
|
||||
pthread_cond_signal(&state->cond);
|
||||
pthread_mutex_unlock(&state->opsMutex);
|
||||
comm->opCount++;
|
||||
return ncclSuccess;
|
||||
}
|
||||
|
||||
NCCL_PARAM(ProxySharedBuffersCount, "SHARED_BUFF_COUNT", -2);
|
||||
|
||||
ncclResult_t ncclProxySharedBuffersInit(struct ncclComm* comm, int cuda, int* size, char** ptr) {
|
||||
struct ncclProxySharedBuffers* state = comm->proxyState.sharedBuffs;
|
||||
if (state == NULL) {
|
||||
NCCLCHECK(ncclCalloc(&state, 1));
|
||||
comm->proxyState.sharedBuffs = state;
|
||||
state->nslots = ncclParamProxySharedBuffersCount();
|
||||
if (state->nslots == -2) {
|
||||
state->nslots = NCCL_STEPS*NCCL_MAX_WORK_ELEMENTS;
|
||||
}
|
||||
state->slotSize = comm->buffSizes[NCCL_PROTO_SIMPLE]/(NCCL_STEPS*SENDRECV_SLICEFACTOR);
|
||||
struct ncclProxySharedBuffers* state = &comm->proxyState.sharedBuffs;
|
||||
if (state->size == 0) {
|
||||
int p2pnChannels = 1;
|
||||
while (p2pnChannels < comm->nChannels) p2pnChannels *= 2;
|
||||
int p2pSize = 2*p2pnChannels*NCCL_MAX_WORK_ELEMENTS*comm->buffSizes[NCCL_PROTO_SIMPLE]/SENDRECV_SLICEFACTOR;
|
||||
int collNetSize = 2*comm->nChannels*comm->buffSizes[NCCL_PROTO_SIMPLE];
|
||||
state->size = std::max(p2pSize, collNetSize);
|
||||
}
|
||||
|
||||
char* buff;
|
||||
int* used;
|
||||
*size = 2*comm->p2pnChannels*state->slotSize*state->nslots;
|
||||
*size = state->size;
|
||||
|
||||
if (cuda && state->cudaBuff[0] == NULL) {
|
||||
NCCLCHECK(ncclCudaCalloc(&buff, *size));
|
||||
NCCLCHECK(ncclCalloc(&used, 2*comm->p2pnChannels*state->nslots));
|
||||
for (int i=0; i<2*comm->p2pnChannels; i++) {
|
||||
state->cudaBuff[i] = buff + state->nslots*state->slotSize*i;
|
||||
state->cudaUsed[i] = used + state->nslots*i;
|
||||
}
|
||||
} else if (state->hostBuff[0] == NULL) {
|
||||
NCCLCHECK(ncclCudaHostCalloc(&buff, *size));
|
||||
NCCLCHECK(ncclCalloc(&used, 2*comm->p2pnChannels*state->nslots));
|
||||
for (int i=0; i<2*comm->p2pnChannels; i++) {
|
||||
state->hostBuff[i] = buff + state->nslots*state->slotSize*i;
|
||||
state->hostUsed[i] = used + state->nslots*i;
|
||||
}
|
||||
if (cuda && state->cudaBuff == NULL) {
|
||||
NCCLCHECK(ncclCudaCalloc(&state->cudaBuff, *size));
|
||||
} else if (state->hostBuff == NULL) {
|
||||
NCCLCHECK(ncclCudaHostCalloc(&state->hostBuff, *size));
|
||||
}
|
||||
buff = cuda ? state->cudaBuff[0] : state->hostBuff[0];
|
||||
|
||||
*ptr = buff;
|
||||
*ptr = cuda ? state->cudaBuff : state->hostBuff;
|
||||
return ncclSuccess;
|
||||
}
|
||||
|
||||
ncclResult_t ncclProxySharedBuffersAlloc(struct ncclComm* comm, int cuda, int type, int channel, int size, char** ptr) {
|
||||
struct ncclProxySharedBuffers* state = comm->proxyState.sharedBuffs;
|
||||
// Use different pools for different channels and also separate send/recv.
|
||||
int p = 2*channel+type;
|
||||
int* used = cuda ? state->cudaUsed[p] : state->hostUsed[p];
|
||||
char* buff = cuda ? state->cudaBuff[p] : state->hostBuff[p];
|
||||
if (buff == NULL) return ncclInternalError;
|
||||
int nslots = 1;
|
||||
while (nslots*state->slotSize < size) nslots *= 2;
|
||||
for (int s=0; s<state->nslots; s+=nslots) {
|
||||
int u = 0;
|
||||
for (int i=0; i<nslots; i++) u += used[s+i];
|
||||
if (u == 0) {
|
||||
for (int i=0; i<nslots; i++) used[s+i] = 1;
|
||||
*ptr = buff+state->slotSize*s;
|
||||
return ncclSuccess;
|
||||
}
|
||||
}
|
||||
*ptr = NULL;
|
||||
ncclResult_t ncclProxySharedBuffersGetP2p(struct ncclComm* comm, int cuda, int type, int channel, int slot, int index, char** ptr) {
|
||||
struct ncclProxySharedBuffers* state = &comm->proxyState.sharedBuffs;
|
||||
// Use different pools for separate send/recv.
|
||||
char* buff = cuda ? state->cudaBuff : state->hostBuff;
|
||||
int slotSize = comm->buffSizes[NCCL_PROTO_SIMPLE]/(NCCL_STEPS*SENDRECV_SLICEFACTOR);
|
||||
int globalSlot = (((type*comm->p2pnChannels+channel)*NCCL_STEPS)+slot)*NCCL_MAX_WORK_ELEMENTS+index;
|
||||
*ptr = buff + slotSize * globalSlot;
|
||||
return ncclSuccess;
|
||||
}
|
||||
|
||||
ncclResult_t ncclProxySharedBuffersFree(struct ncclComm* comm, int cuda, int type, int channel, int size, char* ptr) {
|
||||
struct ncclProxySharedBuffers* state = comm->proxyState.sharedBuffs;
|
||||
int p = 2*channel+type;
|
||||
int* used = cuda ? state->cudaUsed[p] : state->hostUsed[p];
|
||||
char* buff = cuda ? state->cudaBuff[p] : state->hostBuff[p];
|
||||
if (buff == NULL) return ncclInternalError;
|
||||
int nslots = 1;
|
||||
while (nslots*state->slotSize < size) nslots *= 2;
|
||||
int s = (ptr-buff)/state->slotSize;
|
||||
if (s < 0 || s+nslots > state->nslots) {
|
||||
WARN("Error freeing shared buffer : freeing ptr %p size %d (start %p slot size %d nslots %d)", ptr, size, buff, state->slotSize, state->nslots);
|
||||
return ncclInternalError;
|
||||
}
|
||||
for (int i=0; i<nslots; i++) used[s+i] = 0;
|
||||
ncclResult_t ncclProxySharedBuffersGetCollNet(struct ncclComm* comm, int cuda, int type, int slot, int channel, char** ptr) {
|
||||
struct ncclProxySharedBuffers* state = &comm->proxyState.sharedBuffs;
|
||||
// Use different pools for different channels.
|
||||
char* buff = cuda ? state->cudaBuff : state->hostBuff;
|
||||
int slotSize = comm->buffSizes[NCCL_PROTO_SIMPLE]/NCCL_STEPS;
|
||||
int globalSlot = (type*NCCL_STEPS+slot)*comm->nChannels+channel;
|
||||
*ptr = buff + slotSize * globalSlot;
|
||||
return ncclSuccess;
|
||||
}
|
||||
|
||||
ncclResult_t ncclProxySharedBuffersDestroy(struct ncclComm* comm) {
|
||||
struct ncclProxySharedBuffers* state = comm->proxyState.sharedBuffs;
|
||||
if (state) {
|
||||
CUDACHECK(cudaFree(state->cudaBuff[0]));
|
||||
free(state->cudaUsed[0]);
|
||||
NCCLCHECK(ncclCudaHostFree(state->hostBuff[0]));
|
||||
free(state->hostUsed[0]);
|
||||
free(state);
|
||||
}
|
||||
struct ncclProxySharedBuffers* state = &comm->proxyState.sharedBuffs;
|
||||
CUDACHECK(cudaFree(state->cudaBuff));
|
||||
NCCLCHECK(ncclCudaHostFree(state->hostBuff));
|
||||
return ncclSuccess;
|
||||
}
|
||||
|
||||
|
||||
Ссылка в новой задаче
Block a user