From 0ec41f13864733421dcac67c9eb291738d7ea2ef Mon Sep 17 00:00:00 2001 From: Bertan Dogancay <111835151+BertanDogancay@users.noreply.github.com> Date: Thu, 25 Apr 2024 11:11:16 -0600 Subject: [PATCH] [UT] Start supporting multiple group calls and graphs (#1151) * Start supporting multiple group calls UT --- test/AllToAllVTests.cpp | 2 + test/GroupCallTests.cpp | 91 ++++++++- test/NonBlockingTests.cpp | 2 +- test/SendRecvTests.cpp | 17 +- test/common/TestBed.cpp | 237 +++++++++++++++++------ test/common/TestBed.hpp | 57 ++++-- test/common/TestBedChild.cpp | 358 +++++++++++++++++++++-------------- test/common/TestBedChild.hpp | 42 ++-- 8 files changed, 566 insertions(+), 240 deletions(-) diff --git a/test/AllToAllVTests.cpp b/test/AllToAllVTests.cpp index b86ff6c556..b8d1afd966 100644 --- a/test/AllToAllVTests.cpp +++ b/test/AllToAllVTests.cpp @@ -98,6 +98,7 @@ namespace RcclUnitTesting numOutputElements[rank], options, -1, + 0, rank); } testBed.AllocateMem(inPlace, useManagedMem); @@ -154,6 +155,7 @@ namespace RcclUnitTesting numOutputElements[rank], options, -1, + 0, rank); } testBed.AllocateMem(inPlace, useManagedMem); diff --git a/test/GroupCallTests.cpp b/test/GroupCallTests.cpp index 6ed0ff7f82..6bc01be052 100644 --- a/test/GroupCallTests.cpp +++ b/test/GroupCallTests.cpp @@ -195,14 +195,14 @@ namespace RcclUnitTesting isMultiProcess ? "MP" : "SP", totalRanks, numCollPerGroup, numStreamsPerGroup); testBed.InitComms(TestBed::GetDeviceIdsList(numProcesses, totalRanks), - numCollPerGroup, true, numStreamsPerGroup); + numCollPerGroup, numStreamsPerGroup); // Set up each collective in group in different stream (modulo numStreamsPerGroup) options.redOp = ncclSum; for (int collIdx = 0; collIdx < numCollPerGroup; ++collIdx) { testBed.SetCollectiveArgs(ncclCollAllReduce, ncclFloat, numElements, numElements, - options, collIdx, -1, collIdx % numStreamsPerGroup); + options, collIdx, 0, -1, collIdx % numStreamsPerGroup); } testBed.AllocateMem(inPlace, useManagedMem); @@ -216,4 +216,91 @@ namespace RcclUnitTesting } testBed.Finalize(); } + + TEST(GroupCall, MultiGroupCall) + { + TestBed testBed; + + // Configuration + std::vector> const groupCalls = {{ncclCollAllReduce, ncclCollAllGather}, + {ncclCollAllToAll, ncclCollGather}, + {ncclCollBroadcast, ncclCollReduceScatter}}; + std::vector> const numElements = {{1250, 1048576}, {384, 384 * 1024}, {1048576, 127}}; + std::vector const dataTypes = {ncclFloat16, ncclFloat32, ncclBfloat16}; + std::vector const redops = {ncclSum, ncclProd, ncclMax}; + std::vector const numCollsPerGroup = {2, 2, 2}; + std::vector const numStreamsPerGroup = {1, 1, 1}; + std::vector const useHipGraphList = {true, false, true}; + bool const inPlace = false; + bool const useManagedMem = false; + bool const useBlocking = true; + int const numGroupCalls = groupCalls.size(); + int const numIterations = 10; + + bool isCorrect = true; + for (int totalRanks : testBed.ev.GetNumGpusList()) + for (int isMultiProcess : testBed.ev.GetIsMultiProcessList()) + { + int const numProcesses = isMultiProcess ? totalRanks : 1; + + // Initialize comms by specifying the # of group calls + testBed.InitComms(TestBed::GetDeviceIdsList(numProcesses, totalRanks), numCollsPerGroup, numStreamsPerGroup, numGroupCalls, useBlocking); + + if (testBed.ev.showNames) + INFO("%s %d-ranks GroupCall MultiGroupCall\n", isMultiProcess ? "MP" : "SP", totalRanks); + + for (int groupCallIdx = 0; groupCallIdx < groupCalls.size(); ++groupCallIdx) + { + std::vector funcTypes = groupCalls[groupCallIdx]; + OptionalColArgs options; + options.redOp = redops[groupCallIdx]; + options.root = 0; + + for (int collIdx = 0; collIdx < numCollsPerGroup[groupCallIdx]; ++collIdx) + { + int numInputElements; + int numOutputElements; + CollectiveArgs::GetNumElementsForFuncType(funcTypes[collIdx], + numElements[groupCallIdx][collIdx], + totalRanks, + &numInputElements, + &numOutputElements); + + testBed.SetCollectiveArgs(funcTypes[collIdx], + dataTypes[groupCallIdx], + numInputElements, + numOutputElements, + options, + collIdx, + groupCallIdx); + } + + testBed.AllocateMem(inPlace, useManagedMem, groupCallIdx); + testBed.PrepareData(groupCallIdx); + + // Stream capture in advance for HIP graph enabled collective groups + if (useHipGraphList[groupCallIdx]) + { + testBed.ExecuteCollectives({}, groupCallIdx, useHipGraphList[groupCallIdx]); + } + } + + // Execute collectives based on groupIdx + for (int i = 0; i < numIterations; ++i) + { + // Select a random group call + int groupCallIdx = i % groupCalls.size(); + + // Use graphs if enabled otherwise execute the collective + if (useHipGraphList[groupCallIdx]) testBed.LaunchGraphs(groupCallIdx); + else testBed.ExecuteCollectives({}, groupCallIdx); + testBed.ValidateResults(isCorrect, groupCallIdx); + } + + testBed.DeallocateMem(); + testBed.DestroyGraphs(); + testBed.DestroyComms(); + } + testBed.Finalize(); + } } diff --git a/test/NonBlockingTests.cpp b/test/NonBlockingTests.cpp index fab505ad70..5b505c1869 100644 --- a/test/NonBlockingTests.cpp +++ b/test/NonBlockingTests.cpp @@ -34,7 +34,7 @@ namespace RcclUnitTesting { int const numProcesses = isMultiProcess ? totalRanks : 1; // Initialize communicators in non-blocking mode - testBed.InitComms(TestBed::GetDeviceIdsList(numProcesses, totalRanks), 1, useBlocking); + testBed.InitComms(TestBed::GetDeviceIdsList(numProcesses, totalRanks), 1, 1, 1, useBlocking); // Loop over various collective functions for (auto funcType : funcTypes) diff --git a/test/SendRecvTests.cpp b/test/SendRecvTests.cpp index 53c603e4c1..b2cd892e10 100644 --- a/test/SendRecvTests.cpp +++ b/test/SendRecvTests.cpp @@ -16,6 +16,7 @@ namespace RcclUnitTesting std::vector const numElements = {1048576, 53327, 1024, 0}; bool const inPlace = false; bool const useManagedMem = false; + int const groupCallId = 0; OptionalColArgs options; bool isCorrect = true; @@ -42,11 +43,12 @@ namespace RcclUnitTesting numElements[numIdx], options, 0, + groupCallId, sendRank); if (recvRank == 0) { - testBed.AllocateMem(inPlace, useManagedMem, 0, sendRank); - testBed.PrepareData(0, sendRank); + testBed.AllocateMem(inPlace, useManagedMem, groupCallId, 0, sendRank); + testBed.PrepareData(groupCallId, 0, sendRank); } if (recvRank != sendRank) { @@ -65,15 +67,16 @@ namespace RcclUnitTesting numElements[numIdx], options, 0, + groupCallId, recvRank); - testBed.AllocateMem(inPlace, useManagedMem, 0, recvRank); - testBed.PrepareData(0, recvRank); + testBed.AllocateMem(inPlace, useManagedMem, groupCallId, 0, recvRank); + testBed.PrepareData(groupCallId, 0, recvRank); testBed.ExecuteCollectives({sendRank, recvRank}); - testBed.ValidateResults(isCorrect, 0, recvRank); - testBed.DeallocateMem(0, recvRank); + testBed.ValidateResults(isCorrect, groupCallId, 0, recvRank); + testBed.DeallocateMem(groupCallId, 0, recvRank); } } - testBed.DeallocateMem(0, sendRank); + testBed.DeallocateMem(groupCallId, 0, sendRank); } testBed.DestroyComms(); } diff --git a/test/common/TestBed.cpp b/test/common/TestBed.cpp index d61b6b832c..df6f5f7593 100644 --- a/test/common/TestBed.cpp +++ b/test/common/TestBed.cpp @@ -58,15 +58,17 @@ namespace RcclUnitTesting } void TestBed::InitComms(std::vector> const& deviceIdsPerProcess, - int const numCollectivesInGroup, - bool const useBlocking, - int const numStreamsPerGroup) + std::vector const& numCollectivesInGroup, + std::vector const& numStreamsPerGroup, + int const numGroupCalls, + bool const useBlocking) { InteractiveWait("Starting InitComms"); // Count up the total number of GPUs to use and track child/deviceId per rank this->numActiveChildren = deviceIdsPerProcess.size(); this->numActiveRanks = 0; + this->numGroupCalls = numGroupCalls; this->numCollectivesInGroup = numCollectivesInGroup; this->useBlocking = useBlocking; this->numStreamsPerGroup = numStreamsPerGroup; @@ -150,6 +152,9 @@ namespace RcclUnitTesting // Send the rank offset for this child process PIPE_WRITE(childId, rankOffset); + // Send the total number of group calls for this child process + PIPE_WRITE(childId, numGroupCalls); + // Send the number of collectives to be run per group call PIPE_WRITE(childId, numCollectivesInGroup); @@ -180,9 +185,15 @@ namespace RcclUnitTesting InteractiveWait("Finishing InitComms"); } - void TestBed::InitComms(int const numGpus, int const numCollectivesInGroup, bool const useBlocking, int const numStreamsPerGroup) + void TestBed::InitComms(std::vector> const& deviceIdsPerProcess, + int const numCollectivesInGroup, int const numStreamsPerGroup, int const numGroupCalls, bool const useBlocking) { - InitComms(TestBed::GetDeviceIdsList(1, numGpus), numCollectivesInGroup, useBlocking, numStreamsPerGroup); + InitComms(deviceIdsPerProcess, TestBed::GetNumCollsPerGroup(numCollectivesInGroup, numGroupCalls), TestBed::GetNumStreamsPerGroup(numStreamsPerGroup, numGroupCalls), numGroupCalls, useBlocking); + } + + void TestBed::InitComms(int const numGpus, int const numCollectivesInGroup, int const numStreamsPerGroup, int const numGroupCalls, bool const useBlocking) + { + InitComms(TestBed::GetDeviceIdsList(1, numGpus), TestBed::GetNumCollsPerGroup(numCollectivesInGroup, numGroupCalls), TestBed::GetNumStreamsPerGroup(numStreamsPerGroup, numGroupCalls), numGroupCalls, useBlocking); } void TestBed::SetCollectiveArgs(ncclFunc_t const funcType, @@ -191,6 +202,7 @@ namespace RcclUnitTesting size_t const numOutputElements, OptionalColArgs const &optionalArgs, int const collId, + int const groupId, int const rank, int const streamIdx) { @@ -200,9 +212,9 @@ namespace RcclUnitTesting for (int i = 0; i < this->numActiveRanks; ++i) if (rank == -1 || rank == i) rankList.push_back(i); - if (streamIdx < 0 || streamIdx >= this->numStreamsPerGroup) + if (streamIdx < 0 || streamIdx >= this->numStreamsPerGroup[groupId]) { - ERROR("StreamIdx for collective %d is out of bounds (%d/%d):\n", collId, streamIdx, numStreamsPerGroup); + ERROR("StreamIdx for group %d collective %d is out of bounds (%d/%d):\n", groupId, collId, streamIdx, numStreamsPerGroup[groupId]); FAIL(); } @@ -214,6 +226,7 @@ namespace RcclUnitTesting PIPE_WRITE(childId, cmd); PIPE_WRITE(childId, currRank); PIPE_WRITE(childId, collId); + PIPE_WRITE(childId, groupId); PIPE_WRITE(childId, funcType); PIPE_WRITE(childId, dataType); PIPE_WRITE(childId, numInputElements); @@ -227,6 +240,7 @@ namespace RcclUnitTesting void TestBed::AllocateMem(bool const inPlace, bool const useManagedMem, + int const groupId, int const collId, int const rank) { @@ -236,23 +250,32 @@ namespace RcclUnitTesting std::vector rankList; for (int i = 0; i < this->numActiveRanks; ++i) if (rank == -1 || rank == i) rankList.push_back(i); + + // Build list of groups this applies to (-1 for groupId means to set for all) + std::vector groupList; + for (int i = 0; i < this->numGroupCalls; ++i) + if (groupId == -1 || groupId == i) groupList.push_back(i); // Loop over all ranks and send allocation command to appropriate child process int const cmd = TestBedChild::CHILD_ALLOCATE_MEM; - for (auto currRank : rankList) - { - int const childId = rankToChildMap[currRank]; - PIPE_WRITE(childId, cmd); - PIPE_WRITE(childId, currRank); - PIPE_WRITE(childId, collId); - PIPE_WRITE(childId, inPlace); - PIPE_WRITE(childId, useManagedMem); - PIPE_CHECK(childId); + for (auto currGroup : groupList) { + for (auto currRank : rankList) + { + int const childId = rankToChildMap[currRank]; + PIPE_WRITE(childId, cmd); + PIPE_WRITE(childId, currRank); + PIPE_WRITE(childId, collId); + PIPE_WRITE(childId, inPlace); + PIPE_WRITE(childId, useManagedMem); + PIPE_WRITE(childId, currGroup); + PIPE_CHECK(childId); + } } InteractiveWait("Finishing AllocateMem"); } - void TestBed::PrepareData(int const collId, + void TestBed::PrepareData(int const groupId, + int const collId, int const rank, CollFuncPtr const prepDataFunc) { @@ -261,22 +284,32 @@ namespace RcclUnitTesting std::vector rankList; for (int i = 0; i < this->numActiveRanks; ++i) if (rank == -1 || rank == i) rankList.push_back(i); + + // Build list of groups this applies to (-1 for groupId means to set for all) + std::vector groupList; + for (int i = 0; i < this->numGroupCalls; ++i) + if (groupId == -1 || groupId == i) groupList.push_back(i); // Loop over all ranks and send prepare data command to appropriate child process int const cmd = TestBedChild::CHILD_PREPARE_DATA; - for (auto currRank : rankList) + for (auto currGroup : groupList) { - int const childId = rankToChildMap[currRank]; - PIPE_WRITE(childId, cmd); - PIPE_WRITE(childId, currRank); - PIPE_WRITE(childId, collId); - PIPE_WRITE(childId, prepDataFunc); - PIPE_CHECK(childId); + for (auto currRank : rankList) + { + int const childId = rankToChildMap[currRank]; + PIPE_WRITE(childId, cmd); + PIPE_WRITE(childId, currRank); + PIPE_WRITE(childId, currGroup); + PIPE_WRITE(childId, collId); + PIPE_WRITE(childId, prepDataFunc); + PIPE_CHECK(childId); + } } InteractiveWait("Finishing PrepareData"); } - void TestBed::ExecuteCollectives(std::vector const ¤tRanks, bool const useHipGraph) + void TestBed::ExecuteCollectives(std::vector const ¤tRanks, int const groupId, + bool const useHipGraph) { InteractiveWait("Starting ExecuteCollectives"); @@ -289,19 +322,27 @@ namespace RcclUnitTesting ranksPerChild[rankToChildMap[currentRanks[rank]]].push_back(rank); } - // Send ExecuteColl command to each active child process - for (int childId = 0; childId < this->numActiveChildren; ++childId) - { - if ((currentRanks.size() == 0) || (ranksPerChild[childId].size() > 0)) + // Build list of groups this applies to (-1 for groupId means to set for all) + std::vector groupList; + for (int i = 0; i < this->numGroupCalls; ++i) + if (groupId == -1 || groupId == i) groupList.push_back(i); + + for (auto currGroup : groupList) { + // Send ExecuteColl command to each active child process + for (int childId = 0; childId < this->numActiveChildren; ++childId) { - InteractiveWait("Starting ExecuteCollectives for child " + std::to_string(childId)); - PIPE_WRITE(childId, cmd); - PIPE_WRITE(childId, ev.timeoutUs); - PIPE_WRITE(childId, useHipGraph); - int tempCurrentRanks = currentRanks.size(); - PIPE_WRITE(childId, tempCurrentRanks); - for (int rank = 0; rank < currentRanks.size(); ++rank){ - PIPE_WRITE(childId, currentRanks[rank]); + if ((currentRanks.size() == 0) || (ranksPerChild[childId].size() > 0)) + { + InteractiveWait("Starting ExecuteCollectives for child " + std::to_string(childId)); + PIPE_WRITE(childId, cmd); + PIPE_WRITE(childId, ev.timeoutUs); + PIPE_WRITE(childId, currGroup); + PIPE_WRITE(childId, useHipGraph); + int tempCurrentRanks = currentRanks.size(); + PIPE_WRITE(childId, tempCurrentRanks); + for (int rank = 0; rank < currentRanks.size(); ++rank){ + PIPE_WRITE(childId, currentRanks[rank]); + } } } } @@ -315,7 +356,7 @@ namespace RcclUnitTesting InteractiveWait("Finishing ExecuteCollectives"); } - void TestBed::ValidateResults(bool& isCorrect, int const collId, int const rank) + void TestBed::ValidateResults(bool& isCorrect, int const groupId, int const collId, int const rank) { InteractiveWait("Starting ValidateResults"); @@ -323,21 +364,30 @@ namespace RcclUnitTesting std::vector rankList; for (int i = 0; i < this->numActiveRanks; ++i) if (rank == -1 || rank == i) rankList.push_back(i); + + // Build list of groups this applies to (-1 for groupId means to set for all) + std::vector groupList; + for (int i = 0; i < this->numGroupCalls; ++i) + if (groupId == -1 || groupId == i) groupList.push_back(i); int const cmd = TestBedChild::CHILD_VALIDATE_RESULTS; isCorrect = true; - // Send ValidateResults command to each active child process - for (auto currRank : rankList) + for (auto currGroup : groupList) { - int const childId = rankToChildMap[currRank]; - PIPE_WRITE(childId, cmd); - PIPE_WRITE(childId, currRank); - PIPE_WRITE(childId, collId); + // Send ValidateResults command to each active child process + for (auto currRank : rankList) + { + int const childId = rankToChildMap[currRank]; + PIPE_WRITE(childId, cmd); + PIPE_WRITE(childId, currRank); + PIPE_WRITE(childId, currGroup); + PIPE_WRITE(childId, collId); - int response = 0; - ASSERT_EQ(read(childList[childId]->parentReadFd, &response, sizeof(int)), sizeof(int)); - isCorrect &= (response == TEST_SUCCESS); + int response = 0; + ASSERT_EQ(read(childList[childId]->parentReadFd, &response, sizeof(int)), sizeof(int)); + isCorrect &= (response == TEST_SUCCESS); + } } ASSERT_EQ(isCorrect, true) << "Output does not match expected"; @@ -345,27 +395,62 @@ namespace RcclUnitTesting InteractiveWait("Finishing ValidateResults"); } - void TestBed::DeallocateMem(int const collId, int const rank) + void TestBed::LaunchGraphs(int const groupId) { - InteractiveWait("Starting ValidateResults"); + InteractiveWait("Starting LaunchGraphs"); + + // Build list of groups this applies to (-1 for groupId means to set for all) + std::vector groupList; + for (int i = 0; i < this->numGroupCalls; ++i) + if (groupId == -1 || groupId == i) groupList.push_back(i); + + int const cmd = TestBedChild::CHILD_LAUNCH_GRAPHS; + for (auto currGroup : groupList) + { + for (int childId = 0; childId < this->numActiveChildren; ++childId) + { + // Send LaunchGraphs command to each active child process + PIPE_WRITE(childId, cmd); + PIPE_WRITE(childId, currGroup); + + // Wait for child acknowledgement + PIPE_CHECK(childId); + } + } + + InteractiveWait("Finishing LaunchGraphs"); + } + + void TestBed::DeallocateMem(int const groupId, int const collId, int const rank) + { + InteractiveWait("Starting DeallocateMem"); // Build list of ranks this applies to (-1 for rank means to set for all) std::vector rankList; for (int i = 0; i < this->numActiveRanks; ++i) if (rank == -1 || rank == i) rankList.push_back(i); + // Build list of groups this applies to (-1 for groupId means to set for all) + std::vector groupList; + for (int i = 0; i < this->numGroupCalls; ++i) + if (groupId == -1 || groupId == i) groupList.push_back(i); + int const cmd = TestBedChild::CHILD_DEALLOCATE_MEM; - for (auto currRank : rankList) + for (auto currGroup : groupList) { - int const childId = rankToChildMap[currRank]; - PIPE_WRITE(childId, cmd); - PIPE_WRITE(childId, currRank); - PIPE_WRITE(childId, collId); - PIPE_CHECK(childId); + for (auto currRank : rankList) + { + int const childId = rankToChildMap[currRank]; + PIPE_WRITE(childId, cmd); + PIPE_WRITE(childId, currRank); + PIPE_WRITE(childId, currGroup); + PIPE_WRITE(childId, collId); + PIPE_CHECK(childId); + } } - InteractiveWait("Finishing ValidateResults"); + InteractiveWait("Finishing DeallocateMem"); } void TestBed::DestroyComms() @@ -388,6 +473,27 @@ namespace RcclUnitTesting InteractiveWait("Finishing DestroyComms"); } + void TestBed::DestroyGraphs() + { + InteractiveWait("Starting DestroyGraphs"); + + int const cmd = TestBedChild::CHILD_DESTROY_GRAPHS; + for (int currGroup = 0; currGroup < this->numGroupCalls; ++currGroup) + { + for (int childId = 0; childId < this->numActiveChildren; ++childId) + { + // Send DestroyGraphs command to each active child process + PIPE_WRITE(childId, cmd); + PIPE_WRITE(childId, currGroup); + + // Wait for child acknowledgement + PIPE_CHECK(childId); + } + } + + InteractiveWait("Finishing DestroyGraphs"); + } + void TestBed::Finalize() { if (this->numActiveChildren == 0) @@ -422,7 +528,6 @@ namespace RcclUnitTesting // Reset bookkeeping this->numActiveChildren = 0; this->numActiveRanks = 0; - this->numCollectivesInGroup = 0; InteractiveWait("Finishing Finalize"); } @@ -442,6 +547,18 @@ namespace RcclUnitTesting return ev.GetAllSupportedDataTypes(); } + std::vector const TestBed::GetNumCollsPerGroup(int numCollectivesInGroup, + int numGroupCalls) + { + return std::vector(numGroupCalls, numCollectivesInGroup); + } + + std::vector const TestBed::GetNumStreamsPerGroup(int numStreamsPerGroup, + int numGroupCalls) + { + return std::vector(numGroupCalls, numStreamsPerGroup); + } + std::vector> TestBed::GetDeviceIdsList(int const numProcesses, int const numGpus) { @@ -618,7 +735,11 @@ namespace RcclUnitTesting } std::vector currentRanksEmpty = {}; - this->ExecuteCollectives(currentRanksEmpty, useHipGraphList[hgIdx]); + this->ExecuteCollectives(currentRanksEmpty, /*all groups*/ -1, useHipGraphList[hgIdx]); + if (useHipGraphList[hgIdx]) { + this->LaunchGraphs(); + this->DestroyGraphs(); + } if (testing::Test::HasFailure()) { isCorrect = false; diff --git a/test/common/TestBed.hpp b/test/common/TestBed.hpp index 74ba242f4d..dbc251ddf5 100644 --- a/test/common/TestBed.hpp +++ b/test/common/TestBed.hpp @@ -22,27 +22,37 @@ namespace RcclUnitTesting std::vector childList; // List of child processes std::vector rankToChildMap; // Tracks which child process each rank is assigned to std::vector rankToDeviceMap; // Tracks which device each rank is assigned to + std::vector numCollectivesInGroup; // # of collectives to execute per group call + std::vector numStreamsPerGroup; // # of different streams available per group call + int numGroupCalls; // Total # of group calls to be executed int numActiveChildren; // List of active children (with usable RCCL comms) int numActiveRanks; // Current # of ranks in use - int numCollectivesInGroup; // # of collectives to execute per group call bool useBlocking; // RCCL communication with blocking or non-blocking option - int numStreamsPerGroup; // # of different streams available per group call EnvVars ev; // Environment variables // Constructor - Creates one child process per detected GPU device that waits for further commands TestBed(); + // Prepare TestBed with multiple group call customization + void InitComms(std::vector> const& deviceIdsPerChild, + std::vector const& numCollectivesInGroup, + std::vector const& numStreamsPerGroup, + int const numGroupCalls = 1, + bool const useBlocking = true); + // Prepare TestBed for use with GPUs across multiple child processes void InitComms(std::vector> const& deviceIdsPerChild, int const numCollectivesInGroup = 1, - bool const useBlocking = true, - int const numStreamsPerGroup = 1); + int const numStreamsPerGroup = 1, + int const numGroupCalls = 1, + bool const useBlocking = true); // Prepare TestBed for use with GPUs on a single child process void InitComms(int const numGpus, int const numCollectivesInGroup = 1, - bool const useBlocking = true, - int const numStreamsPerGroup = 1); + int const numStreamsPerGroup = 1, + int const numGroupCalls = 1, + bool const useBlocking = true); // Set collectives arguments for specified collective / rank // Setting scalarsPerRank to non-null will create custom reduction operator @@ -54,6 +64,7 @@ namespace RcclUnitTesting size_t const numOutputElements, OptionalColArgs const &optionalArgs = {}, int const collId = -1, + int const groupId = 0, int const rank = -1, int const streamIdx = 0); @@ -61,33 +72,45 @@ namespace RcclUnitTesting // - Requires SetCollectiveArgs to have been called already // Using collId = -1 (default) applies settings to all collectives in group // Using rank = -1 (default) applies settings to all ranks + // Using groupIdx = -1 (default) applies setting to all groups void AllocateMem(bool const inPlace = false, bool const useManagedMem = false, - int const collId = -1, - int const rank = -1); + int const groupId = -1, + int const collId = -1, + int const rank = -1); // Initialize input and compute expected results // - requires that SetCollectiveArgs and AllocateMemory have already been called + // Setting groupId to -1 applies setting to all groups // Setting collId to -1 applies settings to all collectives in group // Setting rank to -1 applies settings to all ranks // Setting prepDataFunc to nullptr uses the default fill pattern routine - void PrepareData(int const collId = -1, - int const rank = -1, + void PrepareData(int const groupId = -1, + int const collId = -1, + int const rank = -1, CollFuncPtr const prepDataFunc = nullptr); // Execute all collectives on all test children // Blocks until collective is completed - void ExecuteCollectives(std::vector const ¤tRanks = {}, bool const useHipGraph = false); + void ExecuteCollectives(std::vector const ¤tRanks = {}, + int const groupId = -1, + bool const useHipGraph = false); // Perform results validation - compare output to expected - void ValidateResults(bool& isCorrect, int collId = -1, int const rank = -1); + void ValidateResults(bool& isCorrect, int const groupId = -1, int const collId = -1, int const rank = -1); + + // Launch instantiated graphs + void LaunchGraphs(int const groupId = -1); // Release allocated memory - void DeallocateMem(int collId = -1, int const rank = -1); + void DeallocateMem(int const groupId = -1, int const collId = -1, int const rank = -1); // Release the RCCL comms void DestroyComms(); + // Release created graphs + void DestroyGraphs(); + // Explicit TestBed destructor that releases all child processes // No further calls to TestBed should be performed after this call void Finalize(); @@ -101,6 +124,14 @@ namespace RcclUnitTesting // Return all the supported data types based on build settings std::vector const& GetAllSupportedDataTypes(); + // Return a list for # of collectives per group + std::vector const GetNumCollsPerGroup(int const numCollectivesInGroup, + int const numGroupCalls); + + // Return a list for # of streams per group + std::vector const GetNumStreamsPerGroup(int const numStreamsPerGroup, + int const numGroupCalls); + // Helper function that splits up GPUs to the given number of processes static std::vector> GetDeviceIdsList(int const numProcesses, int const numGpus, diff --git a/test/common/TestBedChild.cpp b/test/common/TestBedChild.cpp index bd8ed50014..cd9b8de8b3 100644 --- a/test/common/TestBedChild.cpp +++ b/test/common/TestBedChild.cpp @@ -97,8 +97,10 @@ namespace RcclUnitTesting case CHILD_PREPARE_DATA : status = PrepareData(); break; case CHILD_EXECUTE_COLL : status = ExecuteCollectives(); break; case CHILD_VALIDATE_RESULTS: status = ValidateResults(); break; + case CHILD_LAUNCH_GRAPHS : status = LaunchGraphs(); break; case CHILD_DEALLOCATE_MEM : status = DeallocateMem(); break; case CHILD_DESTROY_COMMS : status = DestroyComms(); break; + case CHILD_DESTROY_GRAPHS : status = DestroyGraphs(); break; case CHILD_STOP : goto stop; default: exit(0); } @@ -144,6 +146,7 @@ namespace RcclUnitTesting PIPE_READ(id); PIPE_READ(this->totalRanks); PIPE_READ(this->rankOffset); + PIPE_READ(this->numGroupCalls); PIPE_READ(this->numCollectivesInGroup); PIPE_READ(this->useBlocking); bool useMultiRankPerGpu; @@ -155,16 +158,29 @@ namespace RcclUnitTesting PIPE_READ(numGpus); this->deviceIds.resize(numGpus); this->streams.clear(); - this->streams.resize(numGpus); - this->collArgs.resize(numGpus); - for (int i = 0; i < numGpus; i++) + this->streams.resize(this->numGroupCalls); + this->collArgs.resize(this->numGroupCalls); + for (int i = 0; i < this->numGroupCalls; i++) { - PIPE_READ(this->deviceIds[i]); - this->collArgs[i].clear(); - this->collArgs[i].resize(numCollectivesInGroup); - this->streams[i].resize(numStreamsPerGroup); + this->collArgs[i].resize(numGpus); + this->streams[i].resize(numGpus); + for (int j = 0; j < numGpus; j++) + { + //PIPE_READ(this->deviceIds[j]); + this->collArgs[i][j].clear(); + this->collArgs[i][j].resize(numCollectivesInGroup[i]); + this->streams[i][j].resize(numStreamsPerGroup[i]); + } } + for (int i = 0; i < numGpus; i++) + PIPE_READ(this->deviceIds[i]); + + // Initialize graphs + this->graphs.resize(this->numGroupCalls); + this->graphExecs.resize(this->numGroupCalls); + this->graphEnabled.resize(this->numGroupCalls); + // Initialize communicators comms.clear(); comms.resize(numGpus); @@ -172,52 +188,57 @@ namespace RcclUnitTesting // Initialize within a group call to avoid deadlock when using multiple ranks per child ErrCode status = TEST_SUCCESS; CHILD_NCCL_CALL(ncclGroupStart(), "ncclGroupStart"); - for (int localRank = 0; localRank < numGpus; ++localRank) + for (int groupCallIdx = 0; groupCallIdx < this->numGroupCalls; ++groupCallIdx) { - int const globalRank = this->rankOffset + localRank; - int const currGpu = this->deviceIds[localRank]; - - if (hipSetDevice(currGpu) != hipSuccess) + for (int localRank = 0; localRank < numGpus; ++localRank) { - ERROR("Rank %d on child %d unable to switch to GPU %d\n", globalRank, this->childId, currGpu); - status = TEST_FAIL; - break; - } + int const globalRank = this->rankOffset + localRank; + int const currGpu = this->deviceIds[localRank]; - for (int i = 0; i < this->numStreamsPerGroup; i++) - { - if (hipStreamCreate(&(this->streams[localRank][i])) != hipSuccess) + if (hipSetDevice(currGpu) != hipSuccess) { - ERROR("Rank %d on child %d unable to create stream %d for GPU %d\n", globalRank, this->childId, i, currGpu); + ERROR("Rank %d on child %d unable to switch to GPU %d\n", globalRank, this->childId, currGpu); status = TEST_FAIL; break; } - } - if (useMultiRankPerGpu) - { - //if (ncclCommInitRankMulti(&this->comms[localRank], this->totalRanks, id, globalRank, globalRank) != ncclSuccess) + for (int i = 0; i < this->numStreamsPerGroup[groupCallIdx]; i++) { - ERROR("Rank %d on child %d unable to call ncclCommInitRankMulti\n", globalRank, this->childId); - status = TEST_FAIL; - break; + if (hipStreamCreate(&(this->streams[groupCallIdx][localRank][i])) != hipSuccess) + { + ERROR("Rank %d on child %d unable to create stream %d for GPU %d in group %d\n", globalRank, this->childId, i, currGpu, groupCallIdx); + status = TEST_FAIL; + break; + } } - } - else if (this->useBlocking == false) - { - // When non-blocking communicator is desired call ncclCommInitRankConfig with appropriate flag - ncclConfig_t config = NCCL_CONFIG_INITIALIZER; - config.blocking = 0; - ncclCommInitRankConfig(&this->comms[localRank], this->totalRanks, id, globalRank, &config); - CHILD_NCCL_CALL_NON_BLOCKING("ncclCommGetAsyncErrorInitRankConfig", localRank); - } - else - { - if (ncclCommInitRank(&this->comms[localRank], this->totalRanks, id, globalRank) != ncclSuccess) - { - ERROR("Rank %d on child %d unable to call ncclCommInitRank\n", globalRank, this->childId); - status = TEST_FAIL; - break; + + if (groupCallIdx == 0) { + if (useMultiRankPerGpu) + { + //if (ncclCommInitRankMulti(&this->comms[localRank], this->totalRanks, id, globalRank, globalRank) != ncclSuccess) + { + ERROR("Rank %d on child %d unable to call ncclCommInitRankMulti\n", globalRank, this->childId); + status = TEST_FAIL; + break; + } + } + else if (this->useBlocking == false) + { + // When non-blocking communicator is desired call ncclCommInitRankConfig with appropriate flag + ncclConfig_t config = NCCL_CONFIG_INITIALIZER; + config.blocking = 0; + ncclCommInitRankConfig(&this->comms[localRank], this->totalRanks, id, globalRank, &config); + CHILD_NCCL_CALL_NON_BLOCKING("ncclCommGetAsyncErrorInitRankConfig", localRank); + } + else + { + if (ncclCommInitRank(&this->comms[localRank], this->totalRanks, id, globalRank) != ncclSuccess) + { + ERROR("Rank %d on child %d unable to call ncclCommInitRank\n", globalRank, this->childId); + status = TEST_FAIL; + break; + } + } } } } @@ -256,6 +277,7 @@ namespace RcclUnitTesting // Read values sent by parent [see TestBed::SetCollectiveArgs()] int globalRank; int collId; + int groupId; ncclFunc_t funcType; ncclDataType_t dataType; size_t numInputElements; @@ -265,6 +287,7 @@ namespace RcclUnitTesting PIPE_READ(globalRank); PIPE_READ(collId); + PIPE_READ(groupId); PIPE_READ(funcType); PIPE_READ(dataType); PIPE_READ(numInputElements); @@ -280,19 +303,19 @@ namespace RcclUnitTesting int const localRank = globalRank - rankOffset; CHECK_HIP(hipSetDevice(this->deviceIds[localRank])); - for (int collIdx = 0; collIdx < collArgs[localRank].size(); ++collIdx) + for (int collIdx = 0; collIdx < collArgs[groupId][localRank].size(); ++collIdx) { if (collId == -1 || collId == collIdx) { - CollectiveArgs& collArg = this->collArgs[localRank][collIdx]; + CollectiveArgs& collArg = this->collArgs[groupId][localRank][collIdx]; CHECK_CALL(collArg.SetArgs(globalRank, this->totalRanks, this->deviceIds[localRank], funcType, dataType, numInputElements, numOutputElements, streamIdx, options)); - if (this->verbose) INFO("Rank %d on child %d sets collective %d [%s]\n", - globalRank, this->childId, collIdx, + if (this->verbose) INFO("Rank %d on child %d sets collective %d in group %d [%s]\n", + globalRank, this->childId, collIdx, groupId, collArg.GetDescription().c_str()); // If pre-mult scalars are provided, then create a custom reduction operator @@ -304,8 +327,8 @@ namespace RcclUnitTesting (ncclScalarResidence_t)options.scalarMode, this->comms[localRank]), "ncclRedOpCreatePreMulSum"); - if (verbose) INFO("Child %d created custom redop %d for collective %d\n", - this->childId, collArg.options.redOp, collIdx); + if (verbose) INFO("Child %d created custom redop %d for group %d collective %d\n", + this->childId, collArg.options.redOp, groupId, collIdx); } } } @@ -322,11 +345,13 @@ namespace RcclUnitTesting int collId; bool inPlace; bool useManagedMem; + int groupId; PIPE_READ(globalRank); PIPE_READ(collId); PIPE_READ(inPlace); PIPE_READ(useManagedMem); + PIPE_READ(groupId); if (globalRank < this->rankOffset || (this->rankOffset + comms.size() <= globalRank)) { @@ -336,14 +361,14 @@ namespace RcclUnitTesting int const localRank = globalRank - rankOffset; CHECK_HIP(hipSetDevice(this->deviceIds[localRank])); - for (int collIdx = 0; collIdx < collArgs[localRank].size(); ++collIdx) + for (int collIdx = 0; collIdx < collArgs[groupId][localRank].size(); ++collIdx) { if (collId == -1 || collId == collIdx) { - CollectiveArgs& collArg = this->collArgs[localRank][collIdx]; + CollectiveArgs& collArg = this->collArgs[groupId][localRank][collIdx]; CHECK_CALL(collArg.AllocateMem(inPlace, useManagedMem)); - if (this->verbose) INFO("Rank %d on child %d allocates memory for collective %d on device %d (%s,%s) Input: %p Output %p\n", - globalRank, this->childId, collIdx, this->deviceIds[localRank], + if (this->verbose) INFO("Rank %d on child %d allocates memory for collective %d in group %d on device %d (%s,%s) Input: %p Output %p\n", + globalRank, this->childId, collIdx, groupId, this->deviceIds[localRank], inPlace ? "in-place" : "out-of-place", useManagedMem ? "managed" : "unmanaged", collArg.inputGpu.ptr, @@ -363,9 +388,11 @@ namespace RcclUnitTesting // Read values sent by parent [see TestBed::PrepareData()] int globalRank; int collId; + int groupId; CollFuncPtr prepDataFunc; PIPE_READ(globalRank); + PIPE_READ(groupId); PIPE_READ(collId); PIPE_READ(prepDataFunc); @@ -378,13 +405,13 @@ namespace RcclUnitTesting int const localRank = globalRank - rankOffset; CHECK_HIP(hipSetDevice(this->deviceIds[localRank])); - for (int collIdx = 0; collIdx < collArgs[localRank].size(); ++collIdx) + for (int collIdx = 0; collIdx < collArgs[groupId][localRank].size(); ++collIdx) { if (collId == -1 || collId == collIdx) { - if (this->verbose) INFO("Rank %d on child %d prepares data for collective %d\n", - globalRank, this->childId, collIdx); - CHECK_CALL(this->collArgs[localRank][collIdx].PrepareData(prepDataFunc)); + if (this->verbose) INFO("Rank %d on child %d prepares data for collective %d in group %d\n", + globalRank, this->childId, collIdx, groupId); + CHECK_CALL(this->collArgs[groupId][localRank][collIdx].PrepareData(prepDataFunc)); } } if (this->verbose) INFO("Child %d finishes PrepareData()\n", this->childId); @@ -394,9 +421,11 @@ namespace RcclUnitTesting ErrCode TestBedChild::ExecuteCollectives() { int timeoutUs = 0; - PIPE_READ(timeoutUs); - + int groupId = 0; bool useHipGraph = false; + + PIPE_READ(timeoutUs); + PIPE_READ(groupId); PIPE_READ(useHipGraph); int numRanksToExecute, tempRank; @@ -420,14 +449,14 @@ namespace RcclUnitTesting } numRanksToExecute = (int)localRanksToExecute.size(); - std::vector> graphs; - std::vector> graphExec; - graphs.resize(numRanksToExecute); - graphExec.resize(numRanksToExecute); + this->graphs[groupId].resize(numRanksToExecute); + this->graphExecs[groupId].resize(numRanksToExecute); + this->graphEnabled[groupId].resize(numRanksToExecute); for (int i = 0; i < numRanksToExecute; i++) { - graphs[i].resize(this->numStreamsPerGroup); - graphExec[i].resize(this->numStreamsPerGroup); + this->graphs[groupId][i].resize(this->numStreamsPerGroup[groupId]); + this->graphExecs[groupId][i].resize(this->numStreamsPerGroup[groupId]); + this->graphEnabled[groupId][i].resize(this->numStreamsPerGroup[groupId]); } // Start HIP graph stream capture if requested @@ -435,11 +464,11 @@ namespace RcclUnitTesting { for (int localRank : localRanksToExecute) { - if (this->verbose) INFO("Capturing stream for rank %d\n", localRank); + if (this->verbose) INFO("Capturing stream for group %d rank %d\n", groupId, localRank); CHECK_HIP(hipSetDevice(this->deviceIds[localRank])); - for (int i = 0; i < this->numStreamsPerGroup; i++) + for (int i = 0; i < this->numStreamsPerGroup[groupId]; i++) { - CHECK_HIP(hipStreamBeginCapture(this->streams[localRank][i], hipStreamCaptureModeRelaxed)); + CHECK_HIP(hipStreamBeginCapture(this->streams[groupId][localRank][i], hipStreamCaptureModeRelaxed)); } } } @@ -448,14 +477,14 @@ namespace RcclUnitTesting CHILD_NCCL_CALL(ncclGroupStart(), "ncclGroupStart"); // Loop over all collectives to be executed in group call - for (int collId = 0; collId < this->numCollectivesInGroup; ++collId) + for (int collId = 0; collId < this->numCollectivesInGroup[groupId]; ++collId) { // Loop over all local ranks for (int localRank : localRanksToExecute) { CHECK_HIP(hipSetDevice(this->deviceIds[localRank])); - CollectiveArgs const& collArg = this->collArgs[localRank][collId]; + CollectiveArgs const& collArg = this->collArgs[groupId][localRank][collId]; if (this->printValues && !useHipGraph) { @@ -464,14 +493,14 @@ namespace RcclUnitTesting size_t const numInputBytes = numInputElementsToPrint * DataTypeToBytes(collArg.dataType); inputCpu.AllocateCpuMem(numInputBytes); CHECK_HIP(hipMemcpy(inputCpu.ptr, collArg.inputGpu.ptr, numInputBytes, hipMemcpyDeviceToHost)); - printf("[ DEBUG ] Rank %02d Coll %d %-10s: %s\n", collArg.globalRank, collId, "Input", + printf("[ DEBUG ] Rank %02d Group %d Coll %d %-10s: %s\n", collArg.globalRank, groupId, collId, "Input", inputCpu.ToString(collArg.dataType, numInputElementsToPrint).c_str()); inputCpu.FreeCpuMem(); int const numOutputElementsToPrint = (this->printValues < 0 ? collArg.numOutputElements : this->printValues); size_t const numOutputBytes = numOutputElementsToPrint * DataTypeToBytes(collArg.dataType); CHECK_HIP(hipMemcpy(collArg.outputCpu.ptr, collArg.outputGpu.ptr, numOutputBytes, hipMemcpyDeviceToHost)); - printf("[ DEBUG ] Rank %02d Coll %d %-10s: %s\n", collArg.globalRank, collId, "Pre-Output", + printf("[ DEBUG ] Rank %02d Group %d Coll %d %-10s: %s\n", collArg.globalRank, groupId, collId, "Pre-Output", collArg.outputCpu.ToString(collArg.dataType, numOutputElementsToPrint).c_str()); } @@ -484,7 +513,7 @@ namespace RcclUnitTesting collArg.dataType, collArg.options.root, this->comms[localRank], - this->streams[localRank][collArg.streamIdx]), + this->streams[groupId][localRank][collArg.streamIdx]), "ncclBroadcast"); break; case ncclCollReduce: @@ -495,7 +524,7 @@ namespace RcclUnitTesting collArg.options.redOp, collArg.options.root, this->comms[localRank], - this->streams[localRank][collArg.streamIdx]), + this->streams[groupId][localRank][collArg.streamIdx]), "ncclReduce"); break; case ncclCollAllGather: @@ -504,7 +533,7 @@ namespace RcclUnitTesting collArg.numInputElements, collArg.dataType, this->comms[localRank], - this->streams[localRank][collArg.streamIdx]), + this->streams[groupId][localRank][collArg.streamIdx]), "ncclAllGather"); break; case ncclCollReduceScatter: @@ -514,7 +543,7 @@ namespace RcclUnitTesting collArg.dataType, collArg.options.redOp, this->comms[localRank], - this->streams[localRank][collArg.streamIdx]), + this->streams[groupId][localRank][collArg.streamIdx]), "ncclReduceScatter"); break; case ncclCollAllReduce: @@ -524,7 +553,7 @@ namespace RcclUnitTesting collArg.dataType, collArg.options.redOp, this->comms[localRank], - this->streams[localRank][collArg.streamIdx]), + this->streams[groupId][localRank][collArg.streamIdx]), "ncclAllReduce"); break; case ncclCollGather: @@ -534,7 +563,7 @@ namespace RcclUnitTesting collArg.dataType, collArg.options.root, this->comms[localRank], - this->streams[localRank][collArg.streamIdx]), + this->streams[groupId][localRank][collArg.streamIdx]), "ncclGather"); break; case ncclCollScatter: @@ -544,7 +573,7 @@ namespace RcclUnitTesting collArg.dataType, collArg.options.root, this->comms[localRank], - this->streams[localRank][collArg.streamIdx]), + this->streams[groupId][localRank][collArg.streamIdx]), "ncclScatter"); break; case ncclCollAllToAll: @@ -553,7 +582,7 @@ namespace RcclUnitTesting collArg.numInputElements / collArg.totalRanks, collArg.dataType, this->comms[localRank], - this->streams[localRank][collArg.streamIdx]), + this->streams[groupId][localRank][collArg.streamIdx]), "ncclAllToAll"); break; case ncclCollAllToAllv: @@ -565,7 +594,7 @@ namespace RcclUnitTesting collArg.options.rdispls + (this->rankOffset + localRank)*this->totalRanks, collArg.dataType, this->comms[localRank], - this->streams[localRank][collArg.streamIdx]), + this->streams[groupId][localRank][collArg.streamIdx]), "ncclAllToAllv"); break; case ncclCollSend: @@ -574,7 +603,7 @@ namespace RcclUnitTesting collArg.dataType, collArg.options.root, this->comms[localRank], - this->streams[localRank][collArg.streamIdx]), + this->streams[groupId][localRank][collArg.streamIdx]), "ncclSend"); break; case ncclCollRecv: @@ -583,7 +612,7 @@ namespace RcclUnitTesting collArg.dataType, collArg.options.root, this->comms[localRank], - this->streams[localRank][collArg.streamIdx]), + this->streams[groupId][localRank][collArg.streamIdx]), "ncclRecv"); break; default: @@ -624,33 +653,24 @@ namespace RcclUnitTesting { if (this->verbose) INFO("Ending stream capture for rank %d\n", localRank); CHECK_HIP(hipSetDevice(this->deviceIds[localRank])); - for (int i = 0; i < this->numStreamsPerGroup; i++) + for (int i = 0; i < this->numStreamsPerGroup[groupId]; i++) { - CHECK_HIP(hipStreamEndCapture(this->streams[localRank][i], &graphs[localRank][i])); + CHECK_HIP(hipStreamEndCapture(this->streams[groupId][localRank][i], &this->graphs[groupId][localRank][i])); - if (this->verbose) - { - size_t numNodes; - hipGraphNode_t* nodes; - CHECK_HIP(hipGraphGetNodes(graphs[localRank][i], nodes, &numNodes)); - INFO("Graph for rank %d stream %d has %lu nodes\n", localRank, i, numNodes); - } + // if (this->verbose) + // { + // size_t numNodes; + // hipGraphNode_t* nodes; + // CHECK_HIP(hipGraphGetNodes(graphs[localRank][i], nodes, &numNodes)); + // INFO("Graph for rank %d stream %d has %lu nodes\n", localRank, i, numNodes); + // } } - if (this->verbose) INFO("Instantiating executable graph for rank %d\n", localRank); - for (int i = 0; i < this->numStreamsPerGroup; i++) + if (this->verbose) INFO("Instantiating executable graph for group %d rank %d\n", groupId, localRank); + for (int i = 0; i < this->numStreamsPerGroup[groupId]; i++) { - CHECK_HIP(hipGraphInstantiate(&graphExec[localRank][i], graphs[localRank][i], NULL, NULL, 0)); - } - } - - for (int localRank : localRanksToExecute) - { - if (this->verbose) INFO("Launch graph for rank %d\n", localRank); - CHECK_HIP(hipSetDevice(this->deviceIds[localRank])); - for (int i = 0; i < this->numStreamsPerGroup; i++) - { - CHECK_HIP(hipGraphLaunch(graphExec[localRank][i], this->streams[localRank][i])); + CHECK_HIP(hipGraphInstantiate(&this->graphExecs[groupId][localRank][i], this->graphs[groupId][localRank][i], NULL, NULL, 0)); + graphEnabled[groupId][localRank][i] = true; } } } @@ -664,8 +684,8 @@ namespace RcclUnitTesting std::vector streamsToComplete; for (int localRank : localRanksToExecute) { - for (int i = 0; i < this->numStreamsPerGroup; i++) - streamsToComplete.push_back(this->streams[localRank][i]); + for (int i = 0; i < this->numStreamsPerGroup[groupId]; i++) + streamsToComplete.push_back(this->streams[groupId][localRank][i]); } int usElapsed = 0, timedout = 0; using namespace std::chrono; @@ -701,40 +721,25 @@ namespace RcclUnitTesting // of fencing between kernels and at hipStreamQuery for (int localRank : localRanksToExecute) { - if (this->verbose) INFO("Starting synchronization for rank %d\n", localRank); - for (int i = 0; i < this->numStreamsPerGroup; i++) - CHECK_HIP(hipStreamSynchronize(this->streams[localRank][i])); - } - - // Destroy graphs - if (useHipGraph) - { - for (int localRank : localRanksToExecute) - { - if (this->verbose) INFO("Destroying graphs for rank %d\n", localRank); - CHECK_HIP(hipSetDevice(this->deviceIds[localRank])); - for (int i = 0; i < this->numStreamsPerGroup; i++) - { - CHECK_HIP(hipGraphDestroy(graphs[localRank][i])); - CHECK_HIP(hipGraphExecDestroy(graphExec[localRank][i])); - } - } + if (this->verbose) INFO("Starting synchronization for group %d rank %d\n", groupId, localRank); + for (int i = 0; i < this->numStreamsPerGroup[groupId]; i++) + CHECK_HIP(hipStreamSynchronize(this->streams[groupId][localRank][i])); } if (this->printValues) { - for (int collId = 0; collId < this->numCollectivesInGroup; ++collId) + for (int collId = 0; collId < this->numCollectivesInGroup[groupId]; ++collId) for (int localRank : localRanksToExecute) { - CollectiveArgs const& collArg = this->collArgs[localRank][collId]; + CollectiveArgs const& collArg = this->collArgs[groupId][localRank][collId]; CHECK_HIP(hipSetDevice(this->deviceIds[localRank])); int numOutputElementsToPrint = (this->printValues < 0 ? collArg.numOutputElements : this->printValues); size_t const numOutputBytes = numOutputElementsToPrint * DataTypeToBytes(collArg.dataType); CHECK_HIP(hipMemcpy(collArg.outputCpu.ptr, collArg.outputGpu.ptr, numOutputBytes, hipMemcpyDeviceToHost)); - printf("[ DEBUG ] Rank %02d Coll %d %-10s: %s\n", collArg.globalRank, collId, "Output", + printf("[ DEBUG ] Rank %02d Group %d Coll %d %-10s: %s\n", collArg.globalRank, groupId, collId, "Output", collArg.outputCpu.ToString(collArg.dataType, numOutputElementsToPrint).c_str()); - printf("[ DEBUG ] Rank %02d Coll %d %-10s: %s\n", collArg.globalRank, collId, "Expected", + printf("[ DEBUG ] Rank %02d Group %d Coll %d %-10s: %s\n", collArg.globalRank, groupId, collId, "Expected", collArg.expected.ToString(collArg.dataType, numOutputElementsToPrint).c_str()); } } @@ -752,8 +757,9 @@ namespace RcclUnitTesting ErrCode TestBedChild::ValidateResults() { // Read values sent by parent [see TestBed::ValidateResults()] - int globalRank, collId; + int globalRank, groupId, collId; PIPE_READ(globalRank); + PIPE_READ(groupId); PIPE_READ(collId); if (this->verbose) INFO("Child %d begins ValidateResults()\n", this->childId); @@ -767,15 +773,15 @@ namespace RcclUnitTesting CHECK_HIP(hipSetDevice(this->deviceIds[localRank])); ErrCode status = TEST_SUCCESS; - for (int collIdx = 0; collIdx < collArgs[localRank].size(); ++collIdx) + for (int collIdx = 0; collIdx < collArgs[groupId][localRank].size(); ++collIdx) { if (collId == -1 || collId == collIdx) { - if (this->verbose) INFO("Rank %d on child %d validating collective %d results\n", - globalRank, this->childId, collIdx); - if (this->collArgs[localRank][collIdx].ValidateResults() != TEST_SUCCESS) + if (this->verbose) INFO("Rank %d on child %d validating collective %d in group %d results\n", + globalRank, this->childId, collIdx, groupId); + if (this->collArgs[groupId][localRank][collIdx].ValidateResults() != TEST_SUCCESS) { - ERROR("Rank %d Collective %d output does not match expected\n", globalRank, collIdx); + ERROR("Rank %d Group %d Collective %d output does not match expected\n", globalRank, groupId, collIdx); status = TEST_FAIL; } } @@ -785,13 +791,35 @@ namespace RcclUnitTesting return status; } + ErrCode TestBedChild::LaunchGraphs() + { + int groupId; + PIPE_READ(groupId); + + if (this->verbose) INFO("Child %d begins LaunchGraphs for group %d\n", this->childId, groupId); + + for (int localRank = 0; localRank < this->deviceIds.size(); ++localRank) { + CHECK_HIP(hipSetDevice(this->deviceIds[localRank])); + + for (int streamIdx = 0; streamIdx < this->numStreamsPerGroup[groupId]; ++streamIdx) + { + if (this->verbose) INFO("Launch graph for group %d rank %d stream %d\n", groupId, localRank, streamIdx); + CHECK_HIP(hipGraphLaunch(this->graphExecs[groupId][localRank][streamIdx], this->streams[groupId][localRank][streamIdx])); + } + } + + if (this->verbose) INFO("Child %d finishes LaunchGraphs for group %d\n", this->childId, groupId); + return TEST_SUCCESS; + } + ErrCode TestBedChild::DeallocateMem() { if (this->verbose) INFO("Child %d begins DeallocateMem\n", this->childId); // Read values sent by parent [see TestBed::DeallocateMem()] - int globalRank, collId; + int globalRank, groupId, collId; PIPE_READ(globalRank); + PIPE_READ(groupId); PIPE_READ(collId); if (globalRank < this->rankOffset || (this->rankOffset + comms.size() <= globalRank)) @@ -802,15 +830,15 @@ namespace RcclUnitTesting int const localRank = globalRank - rankOffset; CHECK_HIP(hipSetDevice(this->deviceIds[localRank])); - for (int collIdx = 0; collIdx < collArgs[localRank].size(); ++collIdx) + for (int collIdx = 0; collIdx < collArgs[groupId][localRank].size(); ++collIdx) { - CollectiveArgs& collArg = this->collArgs[localRank][collIdx]; + CollectiveArgs& collArg = this->collArgs[groupId][localRank][collIdx]; if (collId == -1 || collId == collIdx) { if (this->verbose) { - INFO("Child %d release memory for collective %d (Input: %p Output %p\n", - this->childId, collIdx, collArg.inputGpu.ptr, collArg.outputGpu.ptr); + INFO("Child %d release memory for collective %d in group %d (Input: %p Output %p\n", + this->childId, collIdx, groupId, collArg.inputGpu.ptr, collArg.outputGpu.ptr); } CHECK_CALL(collArg.DeallocateMem()); @@ -819,8 +847,8 @@ namespace RcclUnitTesting { CHILD_NCCL_CALL(ncclRedOpDestroy(collArg.options.redOp, this->comms[localRank]), "ncclRedOpDestroy"); - if (verbose) INFO("Child %d destroys custom redop %d for collective %d\n", - this->childId, collArg.options.redOp, collIdx); + if (verbose) INFO("Child %d destroys custom redop %d for collective %d in group %d\n", + this->childId, collArg.options.redOp, collIdx, groupId); } } if (this->verbose) INFO("Child %d finishes DeallocateMem\n", this->childId); @@ -852,11 +880,14 @@ namespace RcclUnitTesting { CHILD_NCCL_CALL(ncclCommDestroy(this->comms[i]), "ncclCommDestroy"); } - for (int i = 0; i < this->streams.size(); ++i) + for (int i = 0; i < this->numGroupCalls; ++i) { - for (int j = 0; j < this->numStreamsPerGroup; j++) + for (int j = 0; j < this->streams[i].size(); ++j) { - CHECK_HIP(hipStreamDestroy(this->streams[i][j])); + for (int k = 0; k < this->streams[i][j].size(); ++k) + { + CHECK_HIP(hipStreamDestroy(this->streams[i][j][k])); + } } } this->comms.clear(); @@ -864,4 +895,41 @@ namespace RcclUnitTesting if (this->verbose) INFO("Child %d finishes DestroyComms\n", this->childId); return TEST_SUCCESS; } + + ErrCode TestBedChild::DestroyGraphs() + { + if (this->verbose) INFO("Child %d begins DestroyGraphs\n", this->childId); + + int groupId; + PIPE_READ(groupId); + + // Release graphs + for (int localRank = 0; localRank < this->deviceIds.size(); ++localRank) + { + CHECK_HIP(hipSetDevice(this->deviceIds[localRank])); + for (int streamIdx = 0; streamIdx < this->numStreamsPerGroup[groupId]; ++streamIdx) + { + if (graphEnabled[groupId][localRank][streamIdx]) + { + if (this->verbose) INFO("Destroying graphs for group %d rank %d stream %d\n", groupId, localRank, streamIdx); + + CHECK_HIP(hipGraphDestroy(this->graphs[groupId][localRank][streamIdx])); + CHECK_HIP(hipGraphExecDestroy(this->graphExecs[groupId][localRank][streamIdx])); + } + } + } + + for (int localRank = 0; localRank < this->deviceIds.size(); ++localRank) + { + for (int i = 0; i < this->numStreamsPerGroup[groupId]; ++i) + CHECK_HIP(hipStreamSynchronize(this->streams[groupId][localRank][i])); + } + + this->graphs[groupId].clear(); + this->graphExecs[groupId].clear(); + this->graphEnabled[groupId].clear(); + + if (this->verbose) INFO("Child %d finishes DestroyGraphs\n", this->childId); + return TEST_SUCCESS; + } } diff --git a/test/common/TestBedChild.hpp b/test/common/TestBedChild.hpp index 9e94c09b25..fdb65964ba 100644 --- a/test/common/TestBedChild.hpp +++ b/test/common/TestBedChild.hpp @@ -1,5 +1,5 @@ /************************************************************************* - * Copyright (c) 2022 Advanced Micro Devices, Inc. All rights reserved. + * Copyright (c) 2022-2024 Advanced Micro Devices, Inc. All rights reserved. * * See LICENSE.txt for license information ************************************************************************/ @@ -28,10 +28,12 @@ namespace RcclUnitTesting CHILD_PREPARE_DATA = 4, // PrepareData() CHILD_EXECUTE_COLL = 5, // ExecuteCollectives() CHILD_VALIDATE_RESULTS = 6, // ValidateResults() - CHILD_DEALLOCATE_MEM = 7, // DeallocateMem() - CHILD_DESTROY_COMMS = 8, // DestroyComms() - CHILD_STOP = 9, // Stop() - NUM_CHILD_COMMANDS = 10 + CHILD_LAUNCH_GRAPHS = 7, // LaunchGraphs() + CHILD_DEALLOCATE_MEM = 8, // DeallocateMem() + CHILD_DESTROY_COMMS = 9, // DestroyComms() + CHILD_DESTROY_GRAPHS = 10, // DestroyGraphs() + CHILD_STOP = 11, // Stop() + NUM_CHILD_COMMANDS = 12 }; char const ChildCommandNames[NUM_CHILD_COMMANDS][20] = @@ -43,8 +45,10 @@ namespace RcclUnitTesting "PREPARE_DATA", "EXECUTE_COLL", "VALIDATE_RESULTS", + "LAUNCH_GRAPHS", "DEALLOCATE_MEM", "DESTROY_COMMS", + "DESTROY_GRAPHS", "STOP" }; @@ -61,15 +65,19 @@ namespace RcclUnitTesting int childReadFd; // These varibles may change based on commands issued by parent - int totalRanks; // Total ranks - int rankOffset; // Global rank offset for this child - int numCollectivesInGroup; // # of collectives to run per group call - bool useBlocking; // RCCL communication with blocking or non-blocking option - int numStreamsPerGroup; // # of different streams allowed per group call - std::vector comms; // RCCL communicators for each rank - std::vector deviceIds; // Device IDs for each rank - std::vector> streams; // Streams for executing collectives - std::vector> collArgs; // Info for each collective for each rank + int totalRanks; // Total ranks + int rankOffset; // Global rank offset for this child + int numGroupCalls; // Toatal # of group calls to be executed + bool useBlocking; // RCCL communication with blocking or non-blocking option + std::vector numCollectivesInGroup; // # of collectives to run per group call + std::vector numStreamsPerGroup; // # of different streams allowed per group call + std::vector comms; // RCCL communicators for each rank + std::vector deviceIds; // Device IDs for each rank + std::vector>> streams; // Streams for executing collectives per group call + std::vector>> collArgs; // Info for each collective for each rank per group call + std::vector>> graphs; // Graphs for executing collectives per group call + std::vector>> graphExecs; // GraphExecs for executing collectives per group call + std::vector>> graphEnabled; // Constructor TestBedChild(int const childId, bool const verbose, int const printValues); @@ -102,10 +110,16 @@ namespace RcclUnitTesting // Validate that output matches expected ErrCode ValidateResults(); + // Launch instantiated graphs + ErrCode LaunchGraphs(); + // Release allocated memory ErrCode DeallocateMem(); // Destroys RCCL communicators ErrCode DestroyComms(); + + // Destroys graphs + ErrCode DestroyGraphs(); }; }