UnitTest: add test cases for 2.14 API (ncclCommInitRankConfig and ncclCommFinalize for non-blocking communicator) (#674)
[ROCm/rccl commit: fddb5e6be8]
This commit is contained in:
committed by
GitHub
vanhempi
847899dd7e
commit
f7982e9bed
@@ -0,0 +1,64 @@
|
||||
/*************************************************************************
|
||||
* Copyright (c) 2022 Advanced Micro Devices, Inc. All rights reserved.
|
||||
*
|
||||
* See LICENSE.txt for license information
|
||||
************************************************************************/
|
||||
#include "TestBed.hpp"
|
||||
namespace RcclUnitTesting
|
||||
{
|
||||
TEST(AllReduce, NonBlocking)
|
||||
{
|
||||
TestBed testBed;
|
||||
// Configuration
|
||||
ncclFunc_t const funcType = ncclCollAllReduce;
|
||||
std::vector<ncclDataType_t> const& dataTypes = {ncclFloat};
|
||||
std::vector<ncclRedOp_t> const& redOps = {ncclSum};
|
||||
std::vector<int> const numElements = {1048576};
|
||||
bool const inPlace = false;
|
||||
bool const useManagedMem = false;
|
||||
bool const useBlocking = false;
|
||||
|
||||
OptionalColArgs options;
|
||||
// Terminate the test as soon as first failure occurs
|
||||
bool isCorrect = true;
|
||||
for (int totalRanks = testBed.ev.minGpus; totalRanks <= testBed.ev.maxGpus && isCorrect; ++totalRanks)
|
||||
for (int isMultiProcess = 0; isMultiProcess <= 1 && isCorrect; ++isMultiProcess)
|
||||
{
|
||||
if (!(testBed.ev.processMask & (1 << isMultiProcess))) continue;
|
||||
|
||||
// Test either single process all GPUs, or 1 process per GPU
|
||||
int const numProcesses = isMultiProcess ? totalRanks : 1;
|
||||
testBed.InitComms(TestBed::GetDeviceIdsList(numProcesses, totalRanks), 1, useBlocking);
|
||||
|
||||
for (int redOpIdx = 0; redOpIdx < redOps.size() && isCorrect; ++redOpIdx)
|
||||
{
|
||||
options.redOp = redOps[redOpIdx];
|
||||
for (int dataIdx = 0; dataIdx < dataTypes.size() && isCorrect; ++dataIdx)
|
||||
{
|
||||
if (testBed.ev.showNames)
|
||||
INFO("%s %d-ranks AllReduce %s Blocking Config (%s-%s)\n",
|
||||
isMultiProcess ? "MP" : "SP",
|
||||
totalRanks, useBlocking ? "true" : "false",
|
||||
ncclRedOpNames[redOps[redOpIdx]], ncclDataTypeNames[dataTypes[dataIdx]]);
|
||||
|
||||
|
||||
for (int numIdx = 0; numIdx < numElements.size() && isCorrect; ++numIdx)
|
||||
{
|
||||
testBed.SetCollectiveArgs(funcType,
|
||||
dataTypes[dataIdx],
|
||||
numElements[numIdx],
|
||||
numElements[numIdx],
|
||||
options);
|
||||
}
|
||||
testBed.AllocateMem(inPlace, useManagedMem);
|
||||
testBed.PrepareData();
|
||||
testBed.ExecuteCollectives();
|
||||
testBed.ValidateResults(isCorrect);
|
||||
testBed.DeallocateMem();
|
||||
}
|
||||
}
|
||||
testBed.DestroyComms();
|
||||
}
|
||||
testBed.Finalize();
|
||||
}
|
||||
}
|
||||
@@ -46,6 +46,7 @@ if(BUILD_TESTS)
|
||||
set(TEST_SOURCE_FILES
|
||||
AllReduce_Clique.cpp
|
||||
AllReduce_GroupCall.cpp
|
||||
AllReduce_NonBlockingConf.cpp
|
||||
AllReduce_InPlace.cpp
|
||||
AllReduce_ManagedMem.cpp
|
||||
AllReduce_OutOfPlace.cpp
|
||||
@@ -57,6 +58,7 @@ if(BUILD_TESTS)
|
||||
#AllReduce
|
||||
AllReduce_Clique.cpp
|
||||
AllReduce_GroupCall.cpp
|
||||
AllReduce_NonBlockingConf.cpp
|
||||
AllReduce_InPlace.cpp
|
||||
AllReduce_ManagedMem.cpp
|
||||
AllReduce_OutOfPlace.cpp
|
||||
|
||||
@@ -85,12 +85,13 @@ namespace RcclUnitTesting
|
||||
}
|
||||
|
||||
void TestBed::InitComms(std::vector<std::vector<int>> const& deviceIdsPerProcess,
|
||||
int const numCollectivesInGroup)
|
||||
int const numCollectivesInGroup, bool const useBlocking)
|
||||
{
|
||||
// Count up the total number of GPUs to use and track child/deviceId per rank
|
||||
this->numActiveChildren = deviceIdsPerProcess.size();
|
||||
this->numActiveRanks = 0;
|
||||
this->numCollectivesInGroup = numCollectivesInGroup;
|
||||
this->useBlocking = useBlocking;
|
||||
this->rankToChildMap.clear();
|
||||
this->rankToDeviceMap.clear();
|
||||
if (ev.verbose) INFO("Setting up %d active child processes\n", this->numActiveChildren);
|
||||
@@ -139,6 +140,9 @@ namespace RcclUnitTesting
|
||||
// Send the number of collectives to be run per group call
|
||||
PIPE_WRITE(childId, numCollectivesInGroup);
|
||||
|
||||
// Send whether the RCCL communicators should be created as blocking or non-blocking
|
||||
PIPE_WRITE(childId, useBlocking);
|
||||
|
||||
// Send whether to use MultiRank interfaces or not.
|
||||
PIPE_WRITE(childId, useMulti);
|
||||
|
||||
@@ -159,9 +163,9 @@ namespace RcclUnitTesting
|
||||
}
|
||||
}
|
||||
|
||||
// Single-child convenience overload: builds the device-id list for one child
// process that uses numGpus GPUs and forwards to the multi-child InitComms,
// passing numCollectivesInGroup and useBlocking through unchanged.
void TestBed::InitComms(int const numGpus, int const numCollectivesInGroup, bool const useBlocking)
{
  InitComms(TestBed::GetDeviceIdsList(1, numGpus), numCollectivesInGroup, useBlocking);
}
|
||||
|
||||
void TestBed::SetCollectiveArgs(ncclFunc_t const funcType,
|
||||
|
||||
@@ -25,7 +25,7 @@ namespace RcclUnitTesting
|
||||
int numActiveChildren; // List of active children (with usable RCCL comms)
|
||||
int numActiveRanks; // Current # of ranks in use
|
||||
int numCollectivesInGroup; // # of collectives to execute per group call
|
||||
|
||||
bool useBlocking; // RCCL communication with blocking or non-blocking option
|
||||
EnvVars ev; // Environment variables
|
||||
|
||||
// Constructor - Creates one child process per detected GPU device that waits for further commands
|
||||
@@ -33,11 +33,11 @@ namespace RcclUnitTesting
|
||||
|
||||
// Prepare TestBed for use with GPUs across multiple child processes
// - deviceIdsPerChild     : one list of device IDs per child process to activate
// - numCollectivesInGroup : # of collectives to execute per group call
// - useBlocking           : true (default) for blocking communicators; false requests
//                           non-blocking communicators from the child processes
void InitComms(std::vector<std::vector<int>> const& deviceIdsPerChild,
               int const numCollectivesInGroup = 1, bool const useBlocking = true);
|
||||
|
||||
// Prepare TestBed for use with GPUs on a single child process
// - numGpus               : # of GPUs the single child process should use
// - numCollectivesInGroup : # of collectives to execute per group call
// - useBlocking           : true (default) for blocking communicators; false requests
//                           non-blocking communicators from the child process
void InitComms(int const numGpus,
               int const numCollectivesInGroup = 1, bool const useBlocking = true);
|
||||
|
||||
// Set collectives arguments for specified collective / rank
|
||||
// Setting scalarsPerRank to non-null will create custom reduction operator
|
||||
|
||||
@@ -20,6 +20,24 @@
|
||||
} \
|
||||
}
|
||||
|
||||
// Waits for the outstanding non-blocking operation on this->comms[localRank].
//
// Polls ncclCommGetAsyncError until the reported state is no longer
// ncclInProgress, or until MAX_LOOP_COUNTER - 1 polls have been made. If the
// final state is anything other than ncclSuccess (including a poll budget that
// ran out while still in progress, or a failing ncclCommGetAsyncError call), an
// error naming msg is logged and the enclosing function returns TEST_FAIL.
//
// The state starts as ncclInProgress and the query's own return code is
// checked, so a failed query can never leave an indeterminate value to be read.
// Must be used inside a member function that returns an ErrCode-compatible type.
#define CHILD_NCCL_CALL_NON_BLOCKING(msg, localRank)                                    \
  {                                                                                     \
    ncclResult_t ncclAsyncErr = ncclInProgress;                                         \
    for (unsigned long long loop_counter = 1;                                           \
         ncclAsyncErr == ncclInProgress && loop_counter < MAX_LOOP_COUNTER;             \
         ++loop_counter)                                                                \
    {                                                                                   \
      ncclResult_t const ncclQueryResult =                                              \
        ncclCommGetAsyncError(this->comms[(localRank)], &ncclAsyncErr);                 \
      if (ncclQueryResult != ncclSuccess)                                               \
      {                                                                                 \
        /* The query itself failed: report its code instead of a stale state */        \
        ncclAsyncErr = ncclQueryResult;                                                 \
      }                                                                                 \
    }                                                                                   \
    if (ncclAsyncErr != ncclSuccess)                                                    \
    {                                                                                   \
      ERROR("Child process %d fails NCCL call %s with code %d\n",                       \
            this->childId, (msg), ncclAsyncErr);                                        \
      return TEST_FAIL;                                                                 \
    }                                                                                   \
  }
|
||||
|
||||
// Reads exactly sizeof(val) bytes from childReadFd into val; on a short read,
// end-of-file or error the enclosing function returns TEST_FAIL.
// The expansion keeps its own trailing semicolon so existing call sites are unaffected.
#define PIPE_READ(val) \
  if (read(childReadFd, &(val), sizeof(val)) != static_cast<ssize_t>(sizeof(val))) return TEST_FAIL;
|
||||
|
||||
@@ -126,6 +144,7 @@ namespace RcclUnitTesting
|
||||
PIPE_READ(this->totalRanks);
|
||||
PIPE_READ(this->rankOffset);
|
||||
PIPE_READ(this->numCollectivesInGroup);
|
||||
PIPE_READ(this->useBlocking);
|
||||
bool useMultiRankPerGpu;
|
||||
PIPE_READ(useMultiRankPerGpu);
|
||||
|
||||
@@ -177,6 +196,14 @@ namespace RcclUnitTesting
|
||||
break;
|
||||
}
|
||||
}
|
||||
else if (this->useBlocking == false)
|
||||
{
|
||||
// When non-blocking communicator is desired call ncclCommInitRankConfig with appropriate flag
|
||||
ncclConfig_t config = NCCL_CONFIG_INITIALIZER;
|
||||
config.blocking = 0;
|
||||
ncclCommInitRankConfig(&this->comms[localRank], this->totalRanks, id, globalRank, &config);
|
||||
CHILD_NCCL_CALL_NON_BLOCKING("ncclCommGetAsyncErrorInitRankConfig", localRank);
|
||||
}
|
||||
else
|
||||
{
|
||||
if (ncclCommInitRank(&this->comms[localRank], this->totalRanks, id, globalRank) != ncclSuccess)
|
||||
@@ -187,10 +214,29 @@ namespace RcclUnitTesting
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (status == TEST_SUCCESS)
|
||||
{
|
||||
CHILD_NCCL_CALL(ncclGroupEnd(), "ncclGroupStart");
|
||||
{
|
||||
// Check if the communicator is non-blocking
|
||||
if (this->useBlocking == false)
|
||||
{
|
||||
// handle the ncclGroupEnd in case of non-blocking communication
|
||||
ncclResult_t Group_End_state = ncclGroupEnd();
|
||||
if (Group_End_state != ncclSuccess)
|
||||
{
|
||||
for (int localRank = 0; localRank < numGpus; ++localRank)
|
||||
{
|
||||
CHILD_NCCL_CALL_NON_BLOCKING("ncclCommGetAsyncErrorGroupEnd", localRank);
|
||||
}
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
// In case of blocking communication just call ncclGroupEnd
|
||||
CHILD_NCCL_CALL(ncclGroupEnd(), "ncclGroupEnd");
|
||||
}
|
||||
}
|
||||
|
||||
if (this->verbose) INFO("Child %d finishes InitComms() [%s]\n",
|
||||
this->childId, status == TEST_SUCCESS ? "SUCCESS" : "FAIL");
|
||||
return status;
|
||||
@@ -520,11 +566,31 @@ namespace RcclUnitTesting
|
||||
ERROR("Unknown func type %d\n", collArg.funcType);
|
||||
return TEST_FAIL;
|
||||
}
|
||||
if (this->useBlocking == false)
|
||||
{
|
||||
CHILD_NCCL_CALL_NON_BLOCKING("ncclCommGetAsyncErrorExecuteCollectives", localRank);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
// End group call
|
||||
if (this->useBlocking == false)
|
||||
{
|
||||
// handle the ncclGroupEnd in case of non-blocking communication
|
||||
ncclResult_t Group_End_state = ncclGroupEnd();
|
||||
if (Group_End_state != ncclSuccess)
|
||||
{
|
||||
for (int localRank = 0; localRank < this->comms.size(); ++localRank)
|
||||
{
|
||||
CHILD_NCCL_CALL_NON_BLOCKING("ncclCommGetAsyncErrorGroupEnd", localRank);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// End group call
|
||||
CHILD_NCCL_CALL(ncclGroupEnd(), "ncclGroupEnd");
|
||||
else
|
||||
{
|
||||
// In case of blocking communication just call ncclGroupEnd
|
||||
CHILD_NCCL_CALL(ncclGroupEnd(), "ncclGroupEnd");
|
||||
}
|
||||
|
||||
// Instantiate and launch HIP graph if requested
|
||||
if (useHipGraph)
|
||||
@@ -680,6 +746,22 @@ namespace RcclUnitTesting
|
||||
if (this->verbose) INFO("Child %d begins DestroyComms\n", this->childId);
|
||||
|
||||
// Release comms
|
||||
for (int i = 0; i < this->comms.size(); ++i)
|
||||
{
|
||||
// Check if the communicator is non-blocking
|
||||
if (this->useBlocking == false)
|
||||
{
|
||||
// handle the non-blocking case
|
||||
ncclCommFinalize(this->comms[i]);
|
||||
CHILD_NCCL_CALL_NON_BLOCKING("ncclCommGetAsyncErrorCommFinalize", i);
|
||||
}
|
||||
else
|
||||
{
|
||||
// In case of blocking just call Finalize
|
||||
CHILD_NCCL_CALL(ncclCommFinalize(this->comms[i]), "ncclCommFinalize");
|
||||
}
|
||||
}
|
||||
|
||||
for (int i = 0; i < this->comms.size(); ++i)
|
||||
{
|
||||
CHILD_NCCL_CALL(ncclCommDestroy(this->comms[i]), "ncclCommDestroy");
|
||||
|
||||
@@ -12,6 +12,7 @@
|
||||
#include "rccl/rccl.h"
|
||||
|
||||
#define MAX_RANKS 32
|
||||
#define MAX_LOOP_COUNTER 400000000000
|
||||
namespace RcclUnitTesting
|
||||
{
|
||||
class TestBedChild
|
||||
@@ -63,6 +64,7 @@ namespace RcclUnitTesting
|
||||
int totalRanks; // Total ranks
|
||||
int rankOffset; // Global rank offset for this child
|
||||
int numCollectivesInGroup; // # of collectives to run per group call
|
||||
bool useBlocking; // RCCL communication with blocking or non-blocking option
|
||||
std::vector<ncclComm_t> comms; // RCCL communicators for each rank
|
||||
std::vector<int> deviceIds; // Device IDs for each rank
|
||||
std::vector<hipStream_t> streams; // Streams for executing collectives
|
||||
|
||||
Viittaa uudesa ongelmassa
Block a user