diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 3ba633d633..b405660d9d 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -52,6 +52,7 @@ if(BUILD_TESTS) AllReduce_OutOfPlace.cpp AllReduce_PreMultScalar.cpp AllReduce_Msccl.cpp + Multistream.cpp ) else() set(TEST_SOURCE_FILES @@ -95,6 +96,8 @@ if(BUILD_TESTS) Gather_OutOfPlace.cpp #SendRecv SendRecv_SinglePairs.cpp + #Misc + Multistream.cpp ) endif() diff --git a/test/Multistream.cpp b/test/Multistream.cpp new file mode 100644 index 0000000000..fa601ccbfb --- /dev/null +++ b/test/Multistream.cpp @@ -0,0 +1,61 @@ +/************************************************************************* + * Copyright (c) 2022 Advanced Micro Devices, Inc. All rights reserved. + * + * See LICENSE.txt for license information + ************************************************************************/ +#include "TestBed.hpp" +#include +namespace RcclUnitTesting +{ + TEST(Multistream, NoGraph) + { + TestBed testBed; + + // Configuration + int const numElements = 1048576; + bool const inPlace = false; + bool const useManagedMem = false; + + OptionalColArgs options; + + // This test runs multiple AllReduce collectives on different streams within the same group call + bool isCorrect = true; + for (int totalRanks = testBed.ev.minGpus; totalRanks <= testBed.ev.maxGpus && isCorrect; ++totalRanks) + for (int isMultiProcess = 0; isMultiProcess <= 1 && isCorrect; ++isMultiProcess) + { + if (!(testBed.ev.processMask & (1 << isMultiProcess))) continue; + + // Test either single process all GPUs, or 1 process per GPU + int const numProcesses = isMultiProcess ? totalRanks : 1; + + for (int numCollPerGroup = 2; numCollPerGroup <= 6; numCollPerGroup += 2) + { + for (int numStreamsPerGroup = numCollPerGroup; numStreamsPerGroup >= 2; numStreamsPerGroup -= 3) + { + if (testBed.ev.showNames) + INFO("%s %d-ranks Multistream %d-Group Calls across %d streams\n", + isMultiProcess ? "MP" : "SP", totalRanks, numCollPerGroup, numStreamsPerGroup); + + testBed.InitComms(TestBed::GetDeviceIdsList(numProcesses, totalRanks), + numCollPerGroup, false, numStreamsPerGroup); + + // Set up each collective in group in different stream (modulo numStreamsPerGroup) + options.redOp = ncclSum; + for (int collIdx = 0; collIdx < numCollPerGroup; ++collIdx) + { + testBed.SetCollectiveArgs(ncclCollAllReduce, ncclFloat, numElements, numElements, + options, collIdx, -1, collIdx % numStreamsPerGroup); + } + + testBed.AllocateMem(inPlace, useManagedMem); + testBed.PrepareData(); + testBed.ExecuteCollectives(); + testBed.ValidateResults(isCorrect); + testBed.DeallocateMem(); + testBed.DestroyComms(); + } + } + } + testBed.Finalize(); + } +} diff --git a/test/common/CollectiveArgs.cpp b/test/common/CollectiveArgs.cpp index 08beed737d..36c9b45e40 100644 --- a/test/common/CollectiveArgs.cpp +++ b/test/common/CollectiveArgs.cpp @@ -16,6 +16,7 @@ namespace RcclUnitTesting ncclDataType_t const dataType, size_t const numInputElements, size_t const numOutputElements, + int const streamIdx, OptionalColArgs const &optionalColArgs) { // Free scalar based on previous scalarMode @@ -35,6 +36,7 @@ namespace RcclUnitTesting this->dataType = dataType; this->numInputElements = numInputElements; this->numOutputElements = numOutputElements; + this->streamIdx = streamIdx; this->options = optionalColArgs; if (this->options.scalarMode != -1) diff --git a/test/common/CollectiveArgs.hpp b/test/common/CollectiveArgs.hpp index 1d26155c29..7bdd005f93 100644 --- a/test/common/CollectiveArgs.hpp +++ b/test/common/CollectiveArgs.hpp @@ -103,6 +103,7 @@ namespace RcclUnitTesting size_t numInputElements; size_t numOutputElements; PtrUnion localScalar; + int streamIdx; OptionalColArgs options; // Data @@ -125,6 +126,7 @@ namespace RcclUnitTesting ncclDataType_t const dataType, size_t const numInputElements, size_t const numOutputElements, + int const streamIdx, OptionalColArgs const &optionalArgs = {}); // Allocates GPU memory for input/output and CPU memory for expected diff --git a/test/common/TestBed.cpp b/test/common/TestBed.cpp index 64185712ff..8a44d570e9 100644 --- a/test/common/TestBed.cpp +++ b/test/common/TestBed.cpp @@ -86,13 +86,16 @@ namespace RcclUnitTesting } void TestBed::InitComms(std::vector> const& deviceIdsPerProcess, - int const numCollectivesInGroup, bool const useBlocking) + int const numCollectivesInGroup, + bool const useBlocking, + int const numStreamsPerGroup) { // Count up the total number of GPUs to use and track child/deviceId per rank this->numActiveChildren = deviceIdsPerProcess.size(); this->numActiveRanks = 0; this->numCollectivesInGroup = numCollectivesInGroup; this->useBlocking = useBlocking; + this->numStreamsPerGroup = numStreamsPerGroup; this->rankToChildMap.clear(); this->rankToDeviceMap.clear(); if (ev.verbose) INFO("Setting up %d active child processes\n", this->numActiveChildren); @@ -147,6 +150,9 @@ namespace RcclUnitTesting // Send whether to use MultiRank interfaces or not. PIPE_WRITE(childId, useMulti); + // Send how many streams to use per group call + PIPE_WRITE(childId, numStreamsPerGroup); + // Send the GPUs this child uses int const numGpus = deviceIdsPerProcess[childId].size(); PIPE_WRITE(childId, numGpus); @@ -164,9 +170,9 @@ namespace RcclUnitTesting } } - void TestBed::InitComms(int const numGpus, int const numCollectivesInGroup, bool const useBlocking) + void TestBed::InitComms(int const numGpus, int const numCollectivesInGroup, bool const useBlocking, int const numStreamsPerGroup) { - InitComms(TestBed::GetDeviceIdsList(1, numGpus), numCollectivesInGroup, useBlocking); + InitComms(TestBed::GetDeviceIdsList(1, numGpus), numCollectivesInGroup, useBlocking, numStreamsPerGroup); } void TestBed::SetCollectiveArgs(ncclFunc_t const funcType, @@ -175,13 +181,20 @@ namespace RcclUnitTesting size_t const numOutputElements, OptionalColArgs const &optionalArgs, int const collId, - int const rank) + int const rank, + int const streamIdx) { // Build list of ranks this applies to (-1 for rank means to set for all) std::vector rankList; for (int i = 0; i < this->numActiveRanks; ++i) if (rank == -1 || rank == i) rankList.push_back(i); + if (streamIdx < 0 || streamIdx >= this->numStreamsPerGroup) + { + ERROR("StreamIdx for collective %d is out of bounds (%d/%d):\n", collId, streamIdx, numStreamsPerGroup); + FAIL(); + } + // Loop over all ranks and send CollectiveArgs to appropriate child process int const cmd = TestBedChild::CHILD_SET_COLL_ARGS; for (auto currRank : rankList) @@ -194,6 +207,7 @@ namespace RcclUnitTesting PIPE_WRITE(childId, dataType); PIPE_WRITE(childId, numInputElements); PIPE_WRITE(childId, numOutputElements); + PIPE_WRITE(childId, streamIdx); PIPE_WRITE(childId, optionalArgs); PIPE_CHECK(childId); } diff --git a/test/common/TestBed.hpp b/test/common/TestBed.hpp index d3fc621d57..d9d9cf7a10 100644 --- a/test/common/TestBed.hpp +++ b/test/common/TestBed.hpp @@ -26,6 +26,7 @@ namespace RcclUnitTesting int numActiveRanks; // Current # of ranks in use int numCollectivesInGroup; // # of collectives to execute per group call bool useBlocking; // RCCL communication with blocking or non-blocking option + int numStreamsPerGroup; // # of different streams available per group call EnvVars ev; // Environment variables // Constructor - Creates one child process per detected GPU device that waits for further commands @@ -33,24 +34,28 @@ namespace RcclUnitTesting // Prepare TestBed for use with GPUs across multiple child processes void InitComms(std::vector> const& deviceIdsPerChild, - int const numCollectivesInGroup = 1, bool const useBlocking = true); + int const numCollectivesInGroup = 1, + bool const useBlocking = true, + int const numStreamsPerGroup = 1); // Prepare TestBed for use with GPUs on a single child process - void InitComms(int const numGpus, - int const numCollectivesInGroup = 1, bool const useBlocking = true); + void InitComms(int const numGpus, + int const numCollectivesInGroup = 1, + bool const useBlocking = true, + int const numStreamsPerGroup = 1); // Set collectives arguments for specified collective / rank // Setting scalarsPerRank to non-null will create custom reduction operator // Using collId = -1 (default) applies settings to all collectives in group // Using rank = -1 (default) applies settings to all ranks - void SetCollectiveArgs(ncclFunc_t const funcType, ncclDataType_t const dataType, size_t const numInputElements, size_t const numOutputElements, OptionalColArgs const &optionalArgs = {}, - int const collId = -1, - int const rank = -1); + int const collId = -1, + int const rank = -1, + int const streamIdx = 0); // Allocate memory for specified collective / rank // - Requires SetCollectiveArgs to have been called already diff --git a/test/common/TestBedChild.cpp b/test/common/TestBedChild.cpp index e063599ace..feb9c0a346 100644 --- a/test/common/TestBedChild.cpp +++ b/test/common/TestBedChild.cpp @@ -147,11 +147,13 @@ namespace RcclUnitTesting PIPE_READ(this->useBlocking); bool useMultiRankPerGpu; PIPE_READ(useMultiRankPerGpu); + PIPE_READ(this->numStreamsPerGroup); // Read the GPUs this child uses and prepare storage for collective args / datasets int numGpus; PIPE_READ(numGpus); this->deviceIds.resize(numGpus); + this->streams.clear(); this->streams.resize(numGpus); this->collArgs.resize(numGpus); for (int i = 0; i < numGpus; i++) @@ -159,6 +161,7 @@ namespace RcclUnitTesting PIPE_READ(this->deviceIds[i]); this->collArgs[i].clear(); this->collArgs[i].resize(numCollectivesInGroup); + this->streams[i].resize(numStreamsPerGroup); } // Initialize communicators @@ -180,11 +183,14 @@ namespace RcclUnitTesting break; } - if (hipStreamCreate(&this->streams[localRank]) != hipSuccess) + for (int i = 0; i < this->numStreamsPerGroup; i++) { - ERROR("Rank %d on child %d unable to create stream for GPU %d\n", globalRank, this->childId, currGpu); - status = TEST_FAIL; - break; + if (hipStreamCreate(&(this->streams[localRank][i])) != hipSuccess) + { + ERROR("Rank %d on child %d unable to create stream %d for GPU %d\n", globalRank, this->childId, i, currGpu); + status = TEST_FAIL; + break; + } } if (useMultiRankPerGpu) @@ -253,6 +259,7 @@ namespace RcclUnitTesting ncclDataType_t dataType; size_t numInputElements; size_t numOutputElements; + int streamIdx; OptionalColArgs options; PIPE_READ(globalRank); @@ -261,6 +268,7 @@ namespace RcclUnitTesting PIPE_READ(dataType); PIPE_READ(numInputElements); PIPE_READ(numOutputElements); + PIPE_READ(streamIdx); PIPE_READ(options); if (globalRank < this->rankOffset || (this->rankOffset + comms.size() <= globalRank)) @@ -280,6 +288,7 @@ namespace RcclUnitTesting this->deviceIds[localRank], funcType, dataType, numInputElements, numOutputElements, + streamIdx, options)); if (this->verbose) INFO("Rank %d on child %d sets collective %d [%s]\n", globalRank, this->childId, collIdx, @@ -407,8 +416,15 @@ namespace RcclUnitTesting } numRanksToExecute = (int)localRanksToExecute.size(); - hipGraph_t graphs[numRanksToExecute]; - hipGraphExec_t graphExec[numRanksToExecute]; + std::vector> graphs; + std::vector> graphExec; + graphs.resize(numRanksToExecute); + graphExec.resize(numRanksToExecute); + for (int i = 0; i < numRanksToExecute; i++) + { + graphs[i].resize(this->numStreamsPerGroup); + graphExec[i].resize(this->numStreamsPerGroup); + } // Start HIP graph stream capture if requested if (useHipGraph) @@ -416,7 +432,10 @@ namespace RcclUnitTesting for (int localRank : localRanksToExecute) { if (this->verbose) INFO("Capturing stream for rank %d\n", localRank); - CHECK_HIP(hipStreamBeginCapture(this->streams[localRank], hipStreamCaptureModeRelaxed)); + for (int i = 0; i < this->numStreamsPerGroup; i++) + { + CHECK_HIP(hipStreamBeginCapture(this->streams[localRank][i], hipStreamCaptureModeRelaxed)); + } } } @@ -460,7 +479,7 @@ namespace RcclUnitTesting collArg.dataType, collArg.options.root, this->comms[localRank], - this->streams[localRank]), + this->streams[localRank][collArg.streamIdx]), "ncclBroadcast"); break; case ncclCollReduce: @@ -471,7 +490,7 @@ namespace RcclUnitTesting collArg.options.redOp, collArg.options.root, this->comms[localRank], - this->streams[localRank]), + this->streams[localRank][collArg.streamIdx]), "ncclReduce"); break; case ncclCollAllGather: @@ -480,7 +499,7 @@ namespace RcclUnitTesting collArg.numInputElements, collArg.dataType, this->comms[localRank], - this->streams[localRank]), + this->streams[localRank][collArg.streamIdx]), "ncclAllGather"); break; case ncclCollReduceScatter: @@ -490,7 +509,7 @@ namespace RcclUnitTesting collArg.dataType, collArg.options.redOp, this->comms[localRank], - this->streams[localRank]), + this->streams[localRank][collArg.streamIdx]), "ncclReduceScatter"); break; case ncclCollAllReduce: @@ -500,7 +519,7 @@ namespace RcclUnitTesting collArg.dataType, collArg.options.redOp, this->comms[localRank], - this->streams[localRank]), + this->streams[localRank][collArg.streamIdx]), "ncclAllReduce"); break; case ncclCollGather: @@ -510,7 +529,7 @@ namespace RcclUnitTesting collArg.dataType, collArg.options.root, this->comms[localRank], - this->streams[localRank]), + this->streams[localRank][collArg.streamIdx]), "ncclGather"); break; case ncclCollScatter: @@ -520,7 +539,7 @@ namespace RcclUnitTesting collArg.dataType, collArg.options.root, this->comms[localRank], - this->streams[localRank]), + this->streams[localRank][collArg.streamIdx]), "ncclScatter"); break; case ncclCollAllToAll: @@ -529,7 +548,7 @@ namespace RcclUnitTesting collArg.numInputElements / collArg.totalRanks, collArg.dataType, this->comms[localRank], - this->streams[localRank]), + this->streams[localRank][collArg.streamIdx]), "ncclAllToAll"); break; case ncclCollAllToAllv: @@ -541,7 +560,7 @@ namespace RcclUnitTesting collArg.options.rdispls + (this->rankOffset + localRank)*this->totalRanks, collArg.dataType, this->comms[localRank], - this->streams[localRank]), + this->streams[localRank][collArg.streamIdx]), "ncclAllToAllv"); break; case ncclCollSend: @@ -550,7 +569,7 @@ namespace RcclUnitTesting collArg.dataType, collArg.options.root, this->comms[localRank], - this->streams[localRank]), + this->streams[localRank][collArg.streamIdx]), "ncclSend"); break; case ncclCollRecv: @@ -559,7 +578,7 @@ namespace RcclUnitTesting collArg.dataType, collArg.options.root, this->comms[localRank], - this->streams[localRank]), + this->streams[localRank][collArg.streamIdx]), "ncclRecv"); break; default: @@ -599,23 +618,33 @@ namespace RcclUnitTesting { if (this->verbose) INFO("Ending stream capture for rank %d\n", localRank); - CHECK_HIP(hipStreamEndCapture(this->streams[localRank], &graphs[localRank])); - if (this->verbose) + for (int i = 0; i < this->numStreamsPerGroup; i++) { - size_t numNodes; - hipGraphNode_t* nodes; - CHECK_HIP(hipGraphGetNodes(graphs[localRank], nodes, &numNodes)); - INFO("Graph for rank %d has %lu nodes\n", localRank, numNodes); + CHECK_HIP(hipStreamEndCapture(this->streams[localRank][i], &graphs[localRank][i])); + + if (this->verbose) + { + size_t numNodes; + hipGraphNode_t* nodes; + CHECK_HIP(hipGraphGetNodes(graphs[localRank][i], nodes, &numNodes)); + INFO("Graph for rank %d stream %d has %lu nodes\n", localRank, i, numNodes); + } } if (this->verbose) INFO("Instantiating executable graph for rank %d\n", localRank); - CHECK_HIP(hipGraphInstantiate(&graphExec[localRank], graphs[localRank], NULL, NULL, 0)); + for (int i = 0; i < this->numStreamsPerGroup; i++) + { + CHECK_HIP(hipGraphInstantiate(&graphExec[localRank][i], graphs[localRank][i], NULL, NULL, 0)); + } } for (int localRank : localRanksToExecute) { if (this->verbose) INFO("Launch graph for rank %d\n", localRank); - CHECK_HIP(hipGraphLaunch(graphExec[localRank], this->streams[localRank])); + for (int i = 0; i < this->numStreamsPerGroup; i++) + { + CHECK_HIP(hipGraphLaunch(graphExec[localRank][i], this->streams[localRank][i])); + } } } else @@ -628,7 +657,8 @@ namespace RcclUnitTesting for (int localRank : localRanksToExecute) { if (this->verbose) INFO("Starting synchronization for rank %d\n", localRank); - CHECK_HIP(hipStreamSynchronize(this->streams[localRank])); + for (int i = 0; i < this->numStreamsPerGroup; i++) + CHECK_HIP(hipStreamSynchronize(this->streams[localRank][i])); } // Destroy graphs @@ -637,8 +667,11 @@ namespace RcclUnitTesting for (int localRank : localRanksToExecute) { if (this->verbose) INFO("Destroying graphs for rank %d\n", localRank); - CHECK_HIP(hipGraphDestroy(graphs[localRank])); - CHECK_HIP(hipGraphExecDestroy(graphExec[localRank])); + for (int i = 0; i < this->numStreamsPerGroup; i++) + { + CHECK_HIP(hipGraphDestroy(graphs[localRank][i])); + CHECK_HIP(hipGraphExecDestroy(graphExec[localRank][i])); + } } } @@ -768,7 +801,10 @@ namespace RcclUnitTesting } for (int i = 0; i < this->streams.size(); ++i) { - CHECK_HIP(hipStreamDestroy(this->streams[i])); + for (int j = 0; j < this->numStreamsPerGroup; j++) + { + CHECK_HIP(hipStreamDestroy(this->streams[i][j])); + } } this->comms.clear(); this->streams.clear(); diff --git a/test/common/TestBedChild.hpp b/test/common/TestBedChild.hpp index 3c30138e3c..0394fd170a 100644 --- a/test/common/TestBedChild.hpp +++ b/test/common/TestBedChild.hpp @@ -65,9 +65,10 @@ namespace RcclUnitTesting int rankOffset; // Global rank offset for this child int numCollectivesInGroup; // # of collectives to run per group call bool useBlocking; // RCCL communication with blocking or non-blocking option + int numStreamsPerGroup; // # of different streams allowed per group call std::vector comms; // RCCL communicators for each rank std::vector deviceIds; // Device IDs for each rank - std::vector streams; // Streams for executing collectives + std::vector> streams; // Streams for executing collectives std::vector> collArgs; // Info for each collective for each rank // Constructor