diff --git a/test/AllReduce_NonBlockingConf.cpp b/test/AllReduce_NonBlockingConf.cpp new file mode 100644 index 0000000000..2730b719c3 --- /dev/null +++ b/test/AllReduce_NonBlockingConf.cpp @@ -0,0 +1,64 @@ +/************************************************************************* + * Copyright (c) 2022 Advanced Micro Devices, Inc. All rights reserved. + * + * See LICENSE.txt for license information + ************************************************************************/ +#include "TestBed.hpp" +namespace RcclUnitTesting +{ + TEST(AllReduce, NonBlocking) + { + TestBed testBed; + // Configuration + ncclFunc_t const funcType = ncclCollAllReduce; + std::vector const& dataTypes = {ncclFloat}; + std::vector const& redOps = {ncclSum}; + std::vector const numElements = {1048576, 1024}; + bool const inPlace = false; + bool const useManagedMem = false; + bool const useBlocking = false; + + OptionalColArgs options; + // Terminate the test as soon as first failure occurs + bool isCorrect = true; + for (int totalRanks = testBed.ev.minGpus; totalRanks <= testBed.ev.maxGpus && isCorrect; ++totalRanks) + for (int isMultiProcess = 0; isMultiProcess <= 1 && isCorrect; ++isMultiProcess) + { + if (!(testBed.ev.processMask & (1 << isMultiProcess))) continue; + + // Test either single process all GPUs, or 1 process per GPU + int const numProcesses = isMultiProcess ? totalRanks : 1; + testBed.InitComms(TestBed::GetDeviceIdsList(numProcesses, totalRanks), 1, useBlocking); + + for (int redOpIdx = 0; redOpIdx < redOps.size() && isCorrect; ++redOpIdx) + { + options.redOp = redOps[redOpIdx]; + for (int dataIdx = 0; dataIdx < dataTypes.size() && isCorrect; ++dataIdx) + { + if (testBed.ev.showNames) + INFO("%s %d-ranks AllReduce %s Blocking Config (%s-%s)\n", + isMultiProcess ? "MP" : "SP", + totalRanks, useBlocking ? "true" : "false", + ncclRedOpNames[redOps[redOpIdx]], ncclDataTypeNames[dataTypes[dataIdx]]); + + + for (int numIdx = 0; numIdx < numElements.size() && isCorrect; ++numIdx) + { + testBed.SetCollectiveArgs(funcType, + dataTypes[dataIdx], + numElements[numIdx], + numElements[numIdx], + options); + } + testBed.AllocateMem(inPlace, useManagedMem); + testBed.PrepareData(); + testBed.ExecuteCollectives(); + testBed.ValidateResults(isCorrect); + testBed.DeallocateMem(); + } + } + testBed.DestroyComms(); + } + testBed.Finalize(); + } +} diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index a2eeaca7ae..2e2c83557c 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -46,6 +46,7 @@ if(BUILD_TESTS) set(TEST_SOURCE_FILES AllReduce_Clique.cpp AllReduce_GroupCall.cpp + AllReduce_NonBlockingConf.cpp AllReduce_InPlace.cpp AllReduce_ManagedMem.cpp AllReduce_OutOfPlace.cpp @@ -57,6 +58,7 @@ if(BUILD_TESTS) #AllReduce AllReduce_Clique.cpp AllReduce_GroupCall.cpp + AllReduce_NonBlockingConf.cpp AllReduce_InPlace.cpp AllReduce_ManagedMem.cpp AllReduce_OutOfPlace.cpp diff --git a/test/common/TestBed.cpp b/test/common/TestBed.cpp index f65a1ee59b..f922929237 100644 --- a/test/common/TestBed.cpp +++ b/test/common/TestBed.cpp @@ -85,12 +85,13 @@ namespace RcclUnitTesting } void TestBed::InitComms(std::vector> const& deviceIdsPerProcess, - int const numCollectivesInGroup) + int const numCollectivesInGroup, bool const useBlocking) { // Count up the total number of GPUs to use and track child/deviceId per rank this->numActiveChildren = deviceIdsPerProcess.size(); this->numActiveRanks = 0; this->numCollectivesInGroup = numCollectivesInGroup; + this->useBlocking = useBlocking; this->rankToChildMap.clear(); this->rankToDeviceMap.clear(); if (ev.verbose) INFO("Setting up %d active child processes\n", this->numActiveChildren); @@ -139,6 +140,9 @@ namespace RcclUnitTesting // Send the number of collectives to be run per group call PIPE_WRITE(childId, numCollectivesInGroup); + // Send the RCCL communication with blocking or non-blocking option + PIPE_WRITE(childId, useBlocking); + // Send whether to use MultiRank interfaces or not. PIPE_WRITE(childId, useMulti); @@ -159,9 +163,9 @@ namespace RcclUnitTesting } } - void TestBed::InitComms(int const numGpus, int const numCollectivesInGroup) + void TestBed::InitComms(int const numGpus, int const numCollectivesInGroup, bool const useBlocking) { - InitComms(TestBed::GetDeviceIdsList(1, numGpus), numCollectivesInGroup); + InitComms(TestBed::GetDeviceIdsList(1, numGpus), numCollectivesInGroup, useBlocking); } void TestBed::SetCollectiveArgs(ncclFunc_t const funcType, diff --git a/test/common/TestBed.hpp b/test/common/TestBed.hpp index e72daebae3..ccb22a177e 100644 --- a/test/common/TestBed.hpp +++ b/test/common/TestBed.hpp @@ -25,7 +25,7 @@ namespace RcclUnitTesting int numActiveChildren; // List of active children (with usable RCCL comms) int numActiveRanks; // Current # of ranks in use int numCollectivesInGroup; // # of collectives to execute per group call - + bool useBlocking; // RCCL communication with blocking or non-blocking option EnvVars ev; // Environment variables // Constructor - Creates one child process per detected GPU device that waits for further commands @@ -33,11 +33,11 @@ namespace RcclUnitTesting // Prepare TestBed for use with GPUs across multiple child processes void InitComms(std::vector> const& deviceIdsPerChild, - int const numCollectivesInGroup = 1); - + int const numCollectivesInGroup = 1, bool const useBlocking = true); + // Prepare TestBed for use with GPUs on a single child process void InitComms(int const numGpus, - int const numCollectivesInGroup = 1); + int const numCollectivesInGroup = 1, bool const useBlocking = true); // Set collectives arguments for specified collective / rank // Setting scalarsPerRank to non-null will create custom reduction operator diff --git a/test/common/TestBedChild.cpp b/test/common/TestBedChild.cpp index d6437effc5..b829530123 100644 --- a/test/common/TestBedChild.cpp +++ b/test/common/TestBedChild.cpp @@ -20,6 +20,26 @@ } \ } +#define CHILD_NCCL_CALL_NON_BLOCKING(msg) \ + { \ + for (int i = 0; i < this->comms.size(); ++i) \ + { \ + ncclResult_t ncclAsyncErr; \ + int loop_counter = 0; \ + do \ + { \ + loop_counter++; \ + if (loop_counter == MAX_LOOP_COUNTER) break; \ + ncclCommGetAsyncError(this->comms[i], &ncclAsyncErr); \ + } while(ncclAsyncErr == ncclInProgress); \ + if (ncclAsyncErr != ncclSuccess) \ + { \ + ERROR("Child process %d fails NCCL call %s with code %d\n", this->childId, msg, ncclAsyncErr); \ + return TEST_FAIL; \ + } \ + } \ + } + #define PIPE_READ(val) \ if (read(childReadFd, &val, sizeof(val)) != sizeof(val)) return TEST_FAIL; @@ -126,6 +146,7 @@ namespace RcclUnitTesting PIPE_READ(this->totalRanks); PIPE_READ(this->rankOffset); PIPE_READ(this->numCollectivesInGroup); + PIPE_READ(this->useBlocking); bool useMultiRankPerGpu; PIPE_READ(useMultiRankPerGpu); @@ -177,6 +198,18 @@ namespace RcclUnitTesting break; } } + else if (this->useBlocking == false) + { + // When non-blocking communicator is desired call ncclCommInitRankConfig with appropriate flag + ncclConfig_t config = NCCL_CONFIG_INITIALIZER; + config.blocking = 0; + if (ncclCommInitRankConfig(&this->comms[localRank], this->totalRanks, id, globalRank, &config) != ncclSuccess) + { + ERROR("Rank %d on child %d unable to call ncclCommInitRankConfig\n", globalRank, this->childId); + status = TEST_FAIL; + break; + } + } else { if (ncclCommInitRank(&this->comms[localRank], this->totalRanks, id, globalRank) != ncclSuccess) @@ -187,10 +220,26 @@ namespace RcclUnitTesting } } } - if (status == TEST_SUCCESS) + if (this->useBlocking == false) { - CHILD_NCCL_CALL(ncclGroupEnd(), "ncclGroupStart"); + CHILD_NCCL_CALL_NON_BLOCKING("ncclCommGetAsyncErrorInitRankConfig"); } + if (status == TEST_SUCCESS) + { + // Check if the communicator is non-blocking + if (this->useBlocking == false) + { + // handle the ncclGroupEnd in case of non-blocking communication + ncclResult_t Group_End_state = ncclGroupEnd(); + if (Group_End_state != ncclSuccess) CHILD_NCCL_CALL_NON_BLOCKING("ncclCommGetAsyncErrorGroup"); + } + else + { + // In case of blocking communication just call ncclGroupEnd + CHILD_NCCL_CALL(ncclGroupEnd(), "ncclGroupEnd"); + } + } + if (this->verbose) INFO("Child %d finishes InitComms() [%s]\n", this->childId, status == TEST_SUCCESS ? "SUCCESS" : "FAIL"); return status; @@ -680,6 +729,22 @@ namespace RcclUnitTesting if (this->verbose) INFO("Child %d begins DestroyComms\n", this->childId); // Release comms + for (int i = 0; i < this->comms.size(); ++i) + { + // Check if the communicator is non-blocking + if (this->useBlocking == false) + { + // handle the non-blocking case + ncclCommFinalize(this->comms[i]); + CHILD_NCCL_CALL_NON_BLOCKING("ncclCommGetAsyncErrorCommFinalize"); + } + else + { + // In case of blocking just call Finalize + CHILD_NCCL_CALL(ncclCommFinalize(this->comms[i]), "ncclCommFinalize"); + } + } + for (int i = 0; i < this->comms.size(); ++i) { CHILD_NCCL_CALL(ncclCommDestroy(this->comms[i]), "ncclCommDestroy"); diff --git a/test/common/TestBedChild.hpp b/test/common/TestBedChild.hpp index 43b511317e..2a0e43e6a6 100644 --- a/test/common/TestBedChild.hpp +++ b/test/common/TestBedChild.hpp @@ -12,6 +12,7 @@ #include "rccl/rccl.h" #define MAX_RANKS 32 +#define MAX_LOOP_COUNTER 1000000000 namespace RcclUnitTesting { class TestBedChild @@ -63,6 +64,7 @@ namespace RcclUnitTesting int totalRanks; // Total ranks int rankOffset; // Global rank offset for this child int numCollectivesInGroup; // # of collectives to run per group call + bool useBlocking; // RCCL communication with blocking or non-blocking option std::vector comms; // RCCL communicators for each rank std::vector deviceIds; // Device IDs for each rank std::vector streams; // Streams for executing collectives