[UT] Start supporting multiple group calls and graphs (#1151)

* Start supporting multiple group calls UT
This commit is contained in:
Bertan Dogancay
2024-04-25 11:11:16 -06:00
committed by GitHub
parent efe99057b0
commit 0ec41f1386
8 changed files with 566 additions and 240 deletions
+2
View File
@@ -98,6 +98,7 @@ namespace RcclUnitTesting
numOutputElements[rank],
options,
-1,
0,
rank);
}
testBed.AllocateMem(inPlace, useManagedMem);
@@ -154,6 +155,7 @@ namespace RcclUnitTesting
numOutputElements[rank],
options,
-1,
0,
rank);
}
testBed.AllocateMem(inPlace, useManagedMem);
+89 -2
View File
@@ -195,14 +195,14 @@ namespace RcclUnitTesting
isMultiProcess ? "MP" : "SP", totalRanks, numCollPerGroup, numStreamsPerGroup);
testBed.InitComms(TestBed::GetDeviceIdsList(numProcesses, totalRanks),
numCollPerGroup, true, numStreamsPerGroup);
numCollPerGroup, numStreamsPerGroup);
// Set up each collective in group in different stream (modulo numStreamsPerGroup)
options.redOp = ncclSum;
for (int collIdx = 0; collIdx < numCollPerGroup; ++collIdx)
{
testBed.SetCollectiveArgs(ncclCollAllReduce, ncclFloat, numElements, numElements,
options, collIdx, -1, collIdx % numStreamsPerGroup);
options, collIdx, 0, -1, collIdx % numStreamsPerGroup);
}
testBed.AllocateMem(inPlace, useManagedMem);
@@ -216,4 +216,91 @@ namespace RcclUnitTesting
}
testBed.Finalize();
}
TEST(GroupCall, MultiGroupCall)
{
TestBed testBed;
// Configuration
std::vector<std::vector<ncclFunc_t>> const groupCalls = {{ncclCollAllReduce, ncclCollAllGather},
{ncclCollAllToAll, ncclCollGather},
{ncclCollBroadcast, ncclCollReduceScatter}};
std::vector<std::vector<int>> const numElements = {{1250, 1048576}, {384, 384 * 1024}, {1048576, 127}};
std::vector<ncclDataType_t> const dataTypes = {ncclFloat16, ncclFloat32, ncclBfloat16};
std::vector<ncclRedOp_t> const redops = {ncclSum, ncclProd, ncclMax};
std::vector<int> const numCollsPerGroup = {2, 2, 2};
std::vector<int> const numStreamsPerGroup = {1, 1, 1};
std::vector<bool> const useHipGraphList = {true, false, true};
bool const inPlace = false;
bool const useManagedMem = false;
bool const useBlocking = true;
int const numGroupCalls = groupCalls.size();
int const numIterations = 10;
bool isCorrect = true;
for (int totalRanks : testBed.ev.GetNumGpusList())
for (int isMultiProcess : testBed.ev.GetIsMultiProcessList())
{
int const numProcesses = isMultiProcess ? totalRanks : 1;
// Initialize comms by specifying the # of group calls
testBed.InitComms(TestBed::GetDeviceIdsList(numProcesses, totalRanks), numCollsPerGroup, numStreamsPerGroup, numGroupCalls, useBlocking);
if (testBed.ev.showNames)
INFO("%s %d-ranks GroupCall MultiGroupCall\n", isMultiProcess ? "MP" : "SP", totalRanks);
for (int groupCallIdx = 0; groupCallIdx < groupCalls.size(); ++groupCallIdx)
{
std::vector<ncclFunc_t> funcTypes = groupCalls[groupCallIdx];
OptionalColArgs options;
options.redOp = redops[groupCallIdx];
options.root = 0;
for (int collIdx = 0; collIdx < numCollsPerGroup[groupCallIdx]; ++collIdx)
{
int numInputElements;
int numOutputElements;
CollectiveArgs::GetNumElementsForFuncType(funcTypes[collIdx],
numElements[groupCallIdx][collIdx],
totalRanks,
&numInputElements,
&numOutputElements);
testBed.SetCollectiveArgs(funcTypes[collIdx],
dataTypes[groupCallIdx],
numInputElements,
numOutputElements,
options,
collIdx,
groupCallIdx);
}
testBed.AllocateMem(inPlace, useManagedMem, groupCallIdx);
testBed.PrepareData(groupCallIdx);
// Stream capture in advance for HIP graph enabled collective groups
if (useHipGraphList[groupCallIdx])
{
testBed.ExecuteCollectives({}, groupCallIdx, useHipGraphList[groupCallIdx]);
}
}
// Execute collectives based on groupIdx
for (int i = 0; i < numIterations; ++i)
{
// Select a random group call
int groupCallIdx = i % groupCalls.size();
// Use graphs if enabled otherwise execute the collective
if (useHipGraphList[groupCallIdx]) testBed.LaunchGraphs(groupCallIdx);
else testBed.ExecuteCollectives({}, groupCallIdx);
testBed.ValidateResults(isCorrect, groupCallIdx);
}
testBed.DeallocateMem();
testBed.DestroyGraphs();
testBed.DestroyComms();
}
testBed.Finalize();
}
}
+1 -1
View File
@@ -34,7 +34,7 @@ namespace RcclUnitTesting
{
int const numProcesses = isMultiProcess ? totalRanks : 1;
// Initialize communicators in non-blocking mode
testBed.InitComms(TestBed::GetDeviceIdsList(numProcesses, totalRanks), 1, useBlocking);
testBed.InitComms(TestBed::GetDeviceIdsList(numProcesses, totalRanks), 1, 1, 1, useBlocking);
// Loop over various collective functions
for (auto funcType : funcTypes)
+10 -7
View File
@@ -16,6 +16,7 @@ namespace RcclUnitTesting
std::vector<int> const numElements = {1048576, 53327, 1024, 0};
bool const inPlace = false;
bool const useManagedMem = false;
int const groupCallId = 0;
OptionalColArgs options;
bool isCorrect = true;
@@ -42,11 +43,12 @@ namespace RcclUnitTesting
numElements[numIdx],
options,
0,
groupCallId,
sendRank);
if (recvRank == 0)
{
testBed.AllocateMem(inPlace, useManagedMem, 0, sendRank);
testBed.PrepareData(0, sendRank);
testBed.AllocateMem(inPlace, useManagedMem, groupCallId, 0, sendRank);
testBed.PrepareData(groupCallId, 0, sendRank);
}
if (recvRank != sendRank)
{
@@ -65,15 +67,16 @@ namespace RcclUnitTesting
numElements[numIdx],
options,
0,
groupCallId,
recvRank);
testBed.AllocateMem(inPlace, useManagedMem, 0, recvRank);
testBed.PrepareData(0, recvRank);
testBed.AllocateMem(inPlace, useManagedMem, groupCallId, 0, recvRank);
testBed.PrepareData(groupCallId, 0, recvRank);
testBed.ExecuteCollectives({sendRank, recvRank});
testBed.ValidateResults(isCorrect, 0, recvRank);
testBed.DeallocateMem(0, recvRank);
testBed.ValidateResults(isCorrect, groupCallId, 0, recvRank);
testBed.DeallocateMem(groupCallId, 0, recvRank);
}
}
testBed.DeallocateMem(0, sendRank);
testBed.DeallocateMem(groupCallId, 0, sendRank);
}
testBed.DestroyComms();
}
+179 -58
View File
@@ -58,15 +58,17 @@ namespace RcclUnitTesting
}
void TestBed::InitComms(std::vector<std::vector<int>> const& deviceIdsPerProcess,
int const numCollectivesInGroup,
bool const useBlocking,
int const numStreamsPerGroup)
std::vector<int> const& numCollectivesInGroup,
std::vector<int> const& numStreamsPerGroup,
int const numGroupCalls,
bool const useBlocking)
{
InteractiveWait("Starting InitComms");
// Count up the total number of GPUs to use and track child/deviceId per rank
this->numActiveChildren = deviceIdsPerProcess.size();
this->numActiveRanks = 0;
this->numGroupCalls = numGroupCalls;
this->numCollectivesInGroup = numCollectivesInGroup;
this->useBlocking = useBlocking;
this->numStreamsPerGroup = numStreamsPerGroup;
@@ -150,6 +152,9 @@ namespace RcclUnitTesting
// Send the rank offset for this child process
PIPE_WRITE(childId, rankOffset);
// Send the total number of group calls for this child process
PIPE_WRITE(childId, numGroupCalls);
// Send the number of collectives to be run per group call
PIPE_WRITE(childId, numCollectivesInGroup);
@@ -180,9 +185,15 @@ namespace RcclUnitTesting
InteractiveWait("Finishing InitComms");
}
void TestBed::InitComms(int const numGpus, int const numCollectivesInGroup, bool const useBlocking, int const numStreamsPerGroup)
void TestBed::InitComms(std::vector<std::vector<int>> const& deviceIdsPerProcess,
int const numCollectivesInGroup, int const numStreamsPerGroup, int const numGroupCalls, bool const useBlocking)
{
InitComms(TestBed::GetDeviceIdsList(1, numGpus), numCollectivesInGroup, useBlocking, numStreamsPerGroup);
InitComms(deviceIdsPerProcess, TestBed::GetNumCollsPerGroup(numCollectivesInGroup, numGroupCalls), TestBed::GetNumStreamsPerGroup(numStreamsPerGroup, numGroupCalls), numGroupCalls, useBlocking);
}
void TestBed::InitComms(int const numGpus, int const numCollectivesInGroup, int const numStreamsPerGroup, int const numGroupCalls, bool const useBlocking)
{
InitComms(TestBed::GetDeviceIdsList(1, numGpus), TestBed::GetNumCollsPerGroup(numCollectivesInGroup, numGroupCalls), TestBed::GetNumStreamsPerGroup(numStreamsPerGroup, numGroupCalls), numGroupCalls, useBlocking);
}
void TestBed::SetCollectiveArgs(ncclFunc_t const funcType,
@@ -191,6 +202,7 @@ namespace RcclUnitTesting
size_t const numOutputElements,
OptionalColArgs const &optionalArgs,
int const collId,
int const groupId,
int const rank,
int const streamIdx)
{
@@ -200,9 +212,9 @@ namespace RcclUnitTesting
for (int i = 0; i < this->numActiveRanks; ++i)
if (rank == -1 || rank == i) rankList.push_back(i);
if (streamIdx < 0 || streamIdx >= this->numStreamsPerGroup)
if (streamIdx < 0 || streamIdx >= this->numStreamsPerGroup[groupId])
{
ERROR("StreamIdx for collective %d is out of bounds (%d/%d):\n", collId, streamIdx, numStreamsPerGroup);
ERROR("StreamIdx for group %d collective %d is out of bounds (%d/%d):\n", groupId, collId, streamIdx, numStreamsPerGroup[groupId]);
FAIL();
}
@@ -214,6 +226,7 @@ namespace RcclUnitTesting
PIPE_WRITE(childId, cmd);
PIPE_WRITE(childId, currRank);
PIPE_WRITE(childId, collId);
PIPE_WRITE(childId, groupId);
PIPE_WRITE(childId, funcType);
PIPE_WRITE(childId, dataType);
PIPE_WRITE(childId, numInputElements);
@@ -227,6 +240,7 @@ namespace RcclUnitTesting
void TestBed::AllocateMem(bool const inPlace,
bool const useManagedMem,
int const groupId,
int const collId,
int const rank)
{
@@ -236,23 +250,32 @@ namespace RcclUnitTesting
std::vector<int> rankList;
for (int i = 0; i < this->numActiveRanks; ++i)
if (rank == -1 || rank == i) rankList.push_back(i);
// Build list of groups this applies to (-1 for groupId means to set for all)
std::vector<int> groupList;
for (int i = 0; i < this->numGroupCalls; ++i)
if (groupId == -1 || groupId == i) groupList.push_back(i);
// Loop over all ranks and send allocation command to appropriate child process
int const cmd = TestBedChild::CHILD_ALLOCATE_MEM;
for (auto currRank : rankList)
{
int const childId = rankToChildMap[currRank];
PIPE_WRITE(childId, cmd);
PIPE_WRITE(childId, currRank);
PIPE_WRITE(childId, collId);
PIPE_WRITE(childId, inPlace);
PIPE_WRITE(childId, useManagedMem);
PIPE_CHECK(childId);
for (auto currGroup : groupList) {
for (auto currRank : rankList)
{
int const childId = rankToChildMap[currRank];
PIPE_WRITE(childId, cmd);
PIPE_WRITE(childId, currRank);
PIPE_WRITE(childId, collId);
PIPE_WRITE(childId, inPlace);
PIPE_WRITE(childId, useManagedMem);
PIPE_WRITE(childId, currGroup);
PIPE_CHECK(childId);
}
}
InteractiveWait("Finishing AllocateMem");
}
void TestBed::PrepareData(int const collId,
void TestBed::PrepareData(int const groupId,
int const collId,
int const rank,
CollFuncPtr const prepDataFunc)
{
@@ -261,22 +284,32 @@ namespace RcclUnitTesting
std::vector<int> rankList;
for (int i = 0; i < this->numActiveRanks; ++i)
if (rank == -1 || rank == i) rankList.push_back(i);
// Build list of groups this applies to (-1 for groupId means to set for all)
std::vector<int> groupList;
for (int i = 0; i < this->numGroupCalls; ++i)
if (groupId == -1 || groupId == i) groupList.push_back(i);
// Loop over all ranks and send prepare data command to appropriate child process
int const cmd = TestBedChild::CHILD_PREPARE_DATA;
for (auto currRank : rankList)
for (auto currGroup : groupList)
{
int const childId = rankToChildMap[currRank];
PIPE_WRITE(childId, cmd);
PIPE_WRITE(childId, currRank);
PIPE_WRITE(childId, collId);
PIPE_WRITE(childId, prepDataFunc);
PIPE_CHECK(childId);
for (auto currRank : rankList)
{
int const childId = rankToChildMap[currRank];
PIPE_WRITE(childId, cmd);
PIPE_WRITE(childId, currRank);
PIPE_WRITE(childId, currGroup);
PIPE_WRITE(childId, collId);
PIPE_WRITE(childId, prepDataFunc);
PIPE_CHECK(childId);
}
}
InteractiveWait("Finishing PrepareData");
}
void TestBed::ExecuteCollectives(std::vector<int> const &currentRanks, bool const useHipGraph)
void TestBed::ExecuteCollectives(std::vector<int> const &currentRanks, int const groupId,
bool const useHipGraph)
{
InteractiveWait("Starting ExecuteCollectives");
@@ -289,19 +322,27 @@ namespace RcclUnitTesting
ranksPerChild[rankToChildMap[currentRanks[rank]]].push_back(rank);
}
// Send ExecuteColl command to each active child process
for (int childId = 0; childId < this->numActiveChildren; ++childId)
{
if ((currentRanks.size() == 0) || (ranksPerChild[childId].size() > 0))
// Build list of groups this applies to (-1 for groupId means to set for all)
std::vector<int> groupList;
for (int i = 0; i < this->numGroupCalls; ++i)
if (groupId == -1 || groupId == i) groupList.push_back(i);
for (auto currGroup : groupList) {
// Send ExecuteColl command to each active child process
for (int childId = 0; childId < this->numActiveChildren; ++childId)
{
InteractiveWait("Starting ExecuteCollectives for child " + std::to_string(childId));
PIPE_WRITE(childId, cmd);
PIPE_WRITE(childId, ev.timeoutUs);
PIPE_WRITE(childId, useHipGraph);
int tempCurrentRanks = currentRanks.size();
PIPE_WRITE(childId, tempCurrentRanks);
for (int rank = 0; rank < currentRanks.size(); ++rank){
PIPE_WRITE(childId, currentRanks[rank]);
if ((currentRanks.size() == 0) || (ranksPerChild[childId].size() > 0))
{
InteractiveWait("Starting ExecuteCollectives for child " + std::to_string(childId));
PIPE_WRITE(childId, cmd);
PIPE_WRITE(childId, ev.timeoutUs);
PIPE_WRITE(childId, currGroup);
PIPE_WRITE(childId, useHipGraph);
int tempCurrentRanks = currentRanks.size();
PIPE_WRITE(childId, tempCurrentRanks);
for (int rank = 0; rank < currentRanks.size(); ++rank){
PIPE_WRITE(childId, currentRanks[rank]);
}
}
}
}
@@ -315,7 +356,7 @@ namespace RcclUnitTesting
InteractiveWait("Finishing ExecuteCollectives");
}
void TestBed::ValidateResults(bool& isCorrect, int const collId, int const rank)
void TestBed::ValidateResults(bool& isCorrect, int const groupId, int const collId, int const rank)
{
InteractiveWait("Starting ValidateResults");
@@ -323,21 +364,30 @@ namespace RcclUnitTesting
std::vector<int> rankList;
for (int i = 0; i < this->numActiveRanks; ++i)
if (rank == -1 || rank == i) rankList.push_back(i);
// Build list of groups this applies to (-1 for groupId means to set for all)
std::vector<int> groupList;
for (int i = 0; i < this->numGroupCalls; ++i)
if (groupId == -1 || groupId == i) groupList.push_back(i);
int const cmd = TestBedChild::CHILD_VALIDATE_RESULTS;
isCorrect = true;
// Send ValidateResults command to each active child process
for (auto currRank : rankList)
for (auto currGroup : groupList)
{
int const childId = rankToChildMap[currRank];
PIPE_WRITE(childId, cmd);
PIPE_WRITE(childId, currRank);
PIPE_WRITE(childId, collId);
// Send ValidateResults command to each active child process
for (auto currRank : rankList)
{
int const childId = rankToChildMap[currRank];
PIPE_WRITE(childId, cmd);
PIPE_WRITE(childId, currRank);
PIPE_WRITE(childId, currGroup);
PIPE_WRITE(childId, collId);
int response = 0;
ASSERT_EQ(read(childList[childId]->parentReadFd, &response, sizeof(int)), sizeof(int));
isCorrect &= (response == TEST_SUCCESS);
int response = 0;
ASSERT_EQ(read(childList[childId]->parentReadFd, &response, sizeof(int)), sizeof(int));
isCorrect &= (response == TEST_SUCCESS);
}
}
ASSERT_EQ(isCorrect, true) << "Output does not match expected";
@@ -345,27 +395,62 @@ namespace RcclUnitTesting
InteractiveWait("Finishing ValidateResults");
}
void TestBed::DeallocateMem(int const collId, int const rank)
void TestBed::LaunchGraphs(int const groupId)
{
InteractiveWait("Starting ValidateResults");
InteractiveWait("Starting LaunchGraphs");
// Build list of groups this applies to (-1 for groupId means to set for all)
std::vector<int> groupList;
for (int i = 0; i < this->numGroupCalls; ++i)
if (groupId == -1 || groupId == i) groupList.push_back(i);
int const cmd = TestBedChild::CHILD_LAUNCH_GRAPHS;
for (auto currGroup : groupList)
{
for (int childId = 0; childId < this->numActiveChildren; ++childId)
{
// Send LaunchGraphs command to each active child process
PIPE_WRITE(childId, cmd);
PIPE_WRITE(childId, currGroup);
// Wait for child acknowledgement
PIPE_CHECK(childId);
}
}
InteractiveWait("Finishing LaunchGraphs");
}
void TestBed::DeallocateMem(int const groupId, int const collId, int const rank)
{
InteractiveWait("Starting DeallocateMem");
// Build list of ranks this applies to (-1 for rank means to set for all)
std::vector<int> rankList;
for (int i = 0; i < this->numActiveRanks; ++i)
if (rank == -1 || rank == i) rankList.push_back(i);
// Build list of groups this applies to (-1 for groupId means to set for all)
std::vector<int> groupList;
for (int i = 0; i < this->numGroupCalls; ++i)
if (groupId == -1 || groupId == i) groupList.push_back(i);
int const cmd = TestBedChild::CHILD_DEALLOCATE_MEM;
for (auto currRank : rankList)
for (auto currGroup : groupList)
{
int const childId = rankToChildMap[currRank];
PIPE_WRITE(childId, cmd);
PIPE_WRITE(childId, currRank);
PIPE_WRITE(childId, collId);
PIPE_CHECK(childId);
for (auto currRank : rankList)
{
int const childId = rankToChildMap[currRank];
PIPE_WRITE(childId, cmd);
PIPE_WRITE(childId, currRank);
PIPE_WRITE(childId, currGroup);
PIPE_WRITE(childId, collId);
PIPE_CHECK(childId);
}
}
InteractiveWait("Finishing ValidateResults");
InteractiveWait("Finishing DeallocateMem");
}
void TestBed::DestroyComms()
@@ -388,6 +473,27 @@ namespace RcclUnitTesting
InteractiveWait("Finishing DestroyComms");
}
void TestBed::DestroyGraphs()
{
InteractiveWait("Starting DestroyGraphs");
int const cmd = TestBedChild::CHILD_DESTROY_GRAPHS;
for (int currGroup = 0; currGroup < this->numGroupCalls; ++currGroup)
{
for (int childId = 0; childId < this->numActiveChildren; ++childId)
{
// Send DestroyGraphs command to each active child process
PIPE_WRITE(childId, cmd);
PIPE_WRITE(childId, currGroup);
// Wait for child acknowledgement
PIPE_CHECK(childId);
}
}
InteractiveWait("Finishing DestroyGraphs");
}
void TestBed::Finalize()
{
if (this->numActiveChildren == 0)
@@ -422,7 +528,6 @@ namespace RcclUnitTesting
// Reset bookkeeping
this->numActiveChildren = 0;
this->numActiveRanks = 0;
this->numCollectivesInGroup = 0;
InteractiveWait("Finishing Finalize");
}
@@ -442,6 +547,18 @@ namespace RcclUnitTesting
return ev.GetAllSupportedDataTypes();
}
std::vector<int> const TestBed::GetNumCollsPerGroup(int numCollectivesInGroup,
int numGroupCalls)
{
return std::vector<int>(numGroupCalls, numCollectivesInGroup);
}
std::vector<int> const TestBed::GetNumStreamsPerGroup(int numStreamsPerGroup,
int numGroupCalls)
{
return std::vector<int>(numGroupCalls, numStreamsPerGroup);
}
std::vector<std::vector<int>> TestBed::GetDeviceIdsList(int const numProcesses,
int const numGpus)
{
@@ -618,7 +735,11 @@ namespace RcclUnitTesting
}
std::vector<int> currentRanksEmpty = {};
this->ExecuteCollectives(currentRanksEmpty, useHipGraphList[hgIdx]);
this->ExecuteCollectives(currentRanksEmpty, /*all groups*/ -1, useHipGraphList[hgIdx]);
if (useHipGraphList[hgIdx]) {
this->LaunchGraphs();
this->DestroyGraphs();
}
if (testing::Test::HasFailure())
{
isCorrect = false;
+44 -13
View File
@@ -22,27 +22,37 @@ namespace RcclUnitTesting
std::vector<TestBedChild*> childList; // List of child processes
std::vector<int> rankToChildMap; // Tracks which child process each rank is assigned to
std::vector<int> rankToDeviceMap; // Tracks which device each rank is assigned to
std::vector<int> numCollectivesInGroup; // # of collectives to execute per group call
std::vector<int> numStreamsPerGroup; // # of different streams available per group call
int numGroupCalls; // Total # of group calls to be executed
int numActiveChildren; // List of active children (with usable RCCL comms)
int numActiveRanks; // Current # of ranks in use
int numCollectivesInGroup; // # of collectives to execute per group call
bool useBlocking; // RCCL communication with blocking or non-blocking option
int numStreamsPerGroup; // # of different streams available per group call
EnvVars ev; // Environment variables
// Constructor - Creates one child process per detected GPU device that waits for further commands
TestBed();
// Prepare TestBed with multiple group call customization
void InitComms(std::vector<std::vector<int>> const& deviceIdsPerChild,
std::vector<int> const& numCollectivesInGroup,
std::vector<int> const& numStreamsPerGroup,
int const numGroupCalls = 1,
bool const useBlocking = true);
// Prepare TestBed for use with GPUs across multiple child processes
void InitComms(std::vector<std::vector<int>> const& deviceIdsPerChild,
int const numCollectivesInGroup = 1,
bool const useBlocking = true,
int const numStreamsPerGroup = 1);
int const numStreamsPerGroup = 1,
int const numGroupCalls = 1,
bool const useBlocking = true);
// Prepare TestBed for use with GPUs on a single child process
void InitComms(int const numGpus,
int const numCollectivesInGroup = 1,
bool const useBlocking = true,
int const numStreamsPerGroup = 1);
int const numStreamsPerGroup = 1,
int const numGroupCalls = 1,
bool const useBlocking = true);
// Set collectives arguments for specified collective / rank
// Setting scalarsPerRank to non-null will create custom reduction operator
@@ -54,6 +64,7 @@ namespace RcclUnitTesting
size_t const numOutputElements,
OptionalColArgs const &optionalArgs = {},
int const collId = -1,
int const groupId = 0,
int const rank = -1,
int const streamIdx = 0);
@@ -61,33 +72,45 @@ namespace RcclUnitTesting
// - Requires SetCollectiveArgs to have been called already
// Using collId = -1 (default) applies settings to all collectives in group
// Using rank = -1 (default) applies settings to all ranks
// Using groupIdx = -1 (default) applies setting to all groups
void AllocateMem(bool const inPlace = false,
bool const useManagedMem = false,
int const collId = -1,
int const rank = -1);
int const groupId = -1,
int const collId = -1,
int const rank = -1);
// Initialize input and compute expected results
// - requires that SetCollectiveArgs and AllocateMemory have already been called
// Setting groupId to -1 applies setting to all groups
// Setting collId to -1 applies settings to all collectives in group
// Setting rank to -1 applies settings to all ranks
// Setting prepDataFunc to nullptr uses the default fill pattern routine
void PrepareData(int const collId = -1,
int const rank = -1,
void PrepareData(int const groupId = -1,
int const collId = -1,
int const rank = -1,
CollFuncPtr const prepDataFunc = nullptr);
// Execute all collectives on all test children
// Blocks until collective is completed
void ExecuteCollectives(std::vector<int> const &currentRanks = {}, bool const useHipGraph = false);
void ExecuteCollectives(std::vector<int> const &currentRanks = {},
int const groupId = -1,
bool const useHipGraph = false);
// Perform results validation - compare output to expected
void ValidateResults(bool& isCorrect, int collId = -1, int const rank = -1);
void ValidateResults(bool& isCorrect, int const groupId = -1, int const collId = -1, int const rank = -1);
// Launch instantiated graphs
void LaunchGraphs(int const groupId = -1);
// Release allocated memory
void DeallocateMem(int collId = -1, int const rank = -1);
void DeallocateMem(int const groupId = -1, int const collId = -1, int const rank = -1);
// Release the RCCL comms
void DestroyComms();
// Release created graphs
void DestroyGraphs();
// Explicit TestBed destructor that releases all child processes
// No further calls to TestBed should be performed after this call
void Finalize();
@@ -101,6 +124,14 @@ namespace RcclUnitTesting
// Return all the supported data types based on build settings
std::vector<ncclDataType_t> const& GetAllSupportedDataTypes();
// Return a list for # of collectives per group
std::vector<int> const GetNumCollsPerGroup(int const numCollectivesInGroup,
int const numGroupCalls);
// Return a list for # of streams per group
std::vector<int> const GetNumStreamsPerGroup(int const numStreamsPerGroup,
int const numGroupCalls);
// Helper function that splits up GPUs to the given number of processes
static std::vector<std::vector<int>> GetDeviceIdsList(int const numProcesses,
int const numGpus,
+213 -145
View File
@@ -97,8 +97,10 @@ namespace RcclUnitTesting
case CHILD_PREPARE_DATA : status = PrepareData(); break;
case CHILD_EXECUTE_COLL : status = ExecuteCollectives(); break;
case CHILD_VALIDATE_RESULTS: status = ValidateResults(); break;
case CHILD_LAUNCH_GRAPHS : status = LaunchGraphs(); break;
case CHILD_DEALLOCATE_MEM : status = DeallocateMem(); break;
case CHILD_DESTROY_COMMS : status = DestroyComms(); break;
case CHILD_DESTROY_GRAPHS : status = DestroyGraphs(); break;
case CHILD_STOP : goto stop;
default: exit(0);
}
@@ -144,6 +146,7 @@ namespace RcclUnitTesting
PIPE_READ(id);
PIPE_READ(this->totalRanks);
PIPE_READ(this->rankOffset);
PIPE_READ(this->numGroupCalls);
PIPE_READ(this->numCollectivesInGroup);
PIPE_READ(this->useBlocking);
bool useMultiRankPerGpu;
@@ -155,16 +158,29 @@ namespace RcclUnitTesting
PIPE_READ(numGpus);
this->deviceIds.resize(numGpus);
this->streams.clear();
this->streams.resize(numGpus);
this->collArgs.resize(numGpus);
for (int i = 0; i < numGpus; i++)
this->streams.resize(this->numGroupCalls);
this->collArgs.resize(this->numGroupCalls);
for (int i = 0; i < this->numGroupCalls; i++)
{
PIPE_READ(this->deviceIds[i]);
this->collArgs[i].clear();
this->collArgs[i].resize(numCollectivesInGroup);
this->streams[i].resize(numStreamsPerGroup);
this->collArgs[i].resize(numGpus);
this->streams[i].resize(numGpus);
for (int j = 0; j < numGpus; j++)
{
//PIPE_READ(this->deviceIds[j]);
this->collArgs[i][j].clear();
this->collArgs[i][j].resize(numCollectivesInGroup[i]);
this->streams[i][j].resize(numStreamsPerGroup[i]);
}
}
for (int i = 0; i < numGpus; i++)
PIPE_READ(this->deviceIds[i]);
// Initialize graphs
this->graphs.resize(this->numGroupCalls);
this->graphExecs.resize(this->numGroupCalls);
this->graphEnabled.resize(this->numGroupCalls);
// Initialize communicators
comms.clear();
comms.resize(numGpus);
@@ -172,52 +188,57 @@ namespace RcclUnitTesting
// Initialize within a group call to avoid deadlock when using multiple ranks per child
ErrCode status = TEST_SUCCESS;
CHILD_NCCL_CALL(ncclGroupStart(), "ncclGroupStart");
for (int localRank = 0; localRank < numGpus; ++localRank)
for (int groupCallIdx = 0; groupCallIdx < this->numGroupCalls; ++groupCallIdx)
{
int const globalRank = this->rankOffset + localRank;
int const currGpu = this->deviceIds[localRank];
if (hipSetDevice(currGpu) != hipSuccess)
for (int localRank = 0; localRank < numGpus; ++localRank)
{
ERROR("Rank %d on child %d unable to switch to GPU %d\n", globalRank, this->childId, currGpu);
status = TEST_FAIL;
break;
}
int const globalRank = this->rankOffset + localRank;
int const currGpu = this->deviceIds[localRank];
for (int i = 0; i < this->numStreamsPerGroup; i++)
{
if (hipStreamCreate(&(this->streams[localRank][i])) != hipSuccess)
if (hipSetDevice(currGpu) != hipSuccess)
{
ERROR("Rank %d on child %d unable to create stream %d for GPU %d\n", globalRank, this->childId, i, currGpu);
ERROR("Rank %d on child %d unable to switch to GPU %d\n", globalRank, this->childId, currGpu);
status = TEST_FAIL;
break;
}
}
if (useMultiRankPerGpu)
{
//if (ncclCommInitRankMulti(&this->comms[localRank], this->totalRanks, id, globalRank, globalRank) != ncclSuccess)
for (int i = 0; i < this->numStreamsPerGroup[groupCallIdx]; i++)
{
ERROR("Rank %d on child %d unable to call ncclCommInitRankMulti\n", globalRank, this->childId);
status = TEST_FAIL;
break;
if (hipStreamCreate(&(this->streams[groupCallIdx][localRank][i])) != hipSuccess)
{
ERROR("Rank %d on child %d unable to create stream %d for GPU %d in group %d\n", globalRank, this->childId, i, currGpu, groupCallIdx);
status = TEST_FAIL;
break;
}
}
}
else if (this->useBlocking == false)
{
// When non-blocking communicator is desired call ncclCommInitRankConfig with appropriate flag
ncclConfig_t config = NCCL_CONFIG_INITIALIZER;
config.blocking = 0;
ncclCommInitRankConfig(&this->comms[localRank], this->totalRanks, id, globalRank, &config);
CHILD_NCCL_CALL_NON_BLOCKING("ncclCommGetAsyncErrorInitRankConfig", localRank);
}
else
{
if (ncclCommInitRank(&this->comms[localRank], this->totalRanks, id, globalRank) != ncclSuccess)
{
ERROR("Rank %d on child %d unable to call ncclCommInitRank\n", globalRank, this->childId);
status = TEST_FAIL;
break;
if (groupCallIdx == 0) {
if (useMultiRankPerGpu)
{
//if (ncclCommInitRankMulti(&this->comms[localRank], this->totalRanks, id, globalRank, globalRank) != ncclSuccess)
{
ERROR("Rank %d on child %d unable to call ncclCommInitRankMulti\n", globalRank, this->childId);
status = TEST_FAIL;
break;
}
}
else if (this->useBlocking == false)
{
// When non-blocking communicator is desired call ncclCommInitRankConfig with appropriate flag
ncclConfig_t config = NCCL_CONFIG_INITIALIZER;
config.blocking = 0;
ncclCommInitRankConfig(&this->comms[localRank], this->totalRanks, id, globalRank, &config);
CHILD_NCCL_CALL_NON_BLOCKING("ncclCommGetAsyncErrorInitRankConfig", localRank);
}
else
{
if (ncclCommInitRank(&this->comms[localRank], this->totalRanks, id, globalRank) != ncclSuccess)
{
ERROR("Rank %d on child %d unable to call ncclCommInitRank\n", globalRank, this->childId);
status = TEST_FAIL;
break;
}
}
}
}
}
@@ -256,6 +277,7 @@ namespace RcclUnitTesting
// Read values sent by parent [see TestBed::SetCollectiveArgs()]
int globalRank;
int collId;
int groupId;
ncclFunc_t funcType;
ncclDataType_t dataType;
size_t numInputElements;
@@ -265,6 +287,7 @@ namespace RcclUnitTesting
PIPE_READ(globalRank);
PIPE_READ(collId);
PIPE_READ(groupId);
PIPE_READ(funcType);
PIPE_READ(dataType);
PIPE_READ(numInputElements);
@@ -280,19 +303,19 @@ namespace RcclUnitTesting
int const localRank = globalRank - rankOffset;
CHECK_HIP(hipSetDevice(this->deviceIds[localRank]));
for (int collIdx = 0; collIdx < collArgs[localRank].size(); ++collIdx)
for (int collIdx = 0; collIdx < collArgs[groupId][localRank].size(); ++collIdx)
{
if (collId == -1 || collId == collIdx)
{
CollectiveArgs& collArg = this->collArgs[localRank][collIdx];
CollectiveArgs& collArg = this->collArgs[groupId][localRank][collIdx];
CHECK_CALL(collArg.SetArgs(globalRank, this->totalRanks,
this->deviceIds[localRank],
funcType, dataType,
numInputElements, numOutputElements,
streamIdx,
options));
if (this->verbose) INFO("Rank %d on child %d sets collective %d [%s]\n",
globalRank, this->childId, collIdx,
if (this->verbose) INFO("Rank %d on child %d sets collective %d in group %d [%s]\n",
globalRank, this->childId, collIdx, groupId,
collArg.GetDescription().c_str());
// If pre-mult scalars are provided, then create a custom reduction operator
@@ -304,8 +327,8 @@ namespace RcclUnitTesting
(ncclScalarResidence_t)options.scalarMode,
this->comms[localRank]),
"ncclRedOpCreatePreMulSum");
if (verbose) INFO("Child %d created custom redop %d for collective %d\n",
this->childId, collArg.options.redOp, collIdx);
if (verbose) INFO("Child %d created custom redop %d for group %d collective %d\n",
this->childId, collArg.options.redOp, groupId, collIdx);
}
}
}
@@ -322,11 +345,13 @@ namespace RcclUnitTesting
int collId;
bool inPlace;
bool useManagedMem;
int groupId;
PIPE_READ(globalRank);
PIPE_READ(collId);
PIPE_READ(inPlace);
PIPE_READ(useManagedMem);
PIPE_READ(groupId);
if (globalRank < this->rankOffset || (this->rankOffset + comms.size() <= globalRank))
{
@@ -336,14 +361,14 @@ namespace RcclUnitTesting
int const localRank = globalRank - rankOffset;
CHECK_HIP(hipSetDevice(this->deviceIds[localRank]));
for (int collIdx = 0; collIdx < collArgs[localRank].size(); ++collIdx)
for (int collIdx = 0; collIdx < collArgs[groupId][localRank].size(); ++collIdx)
{
if (collId == -1 || collId == collIdx)
{
CollectiveArgs& collArg = this->collArgs[localRank][collIdx];
CollectiveArgs& collArg = this->collArgs[groupId][localRank][collIdx];
CHECK_CALL(collArg.AllocateMem(inPlace, useManagedMem));
if (this->verbose) INFO("Rank %d on child %d allocates memory for collective %d on device %d (%s,%s) Input: %p Output %p\n",
globalRank, this->childId, collIdx, this->deviceIds[localRank],
if (this->verbose) INFO("Rank %d on child %d allocates memory for collective %d in group %d on device %d (%s,%s) Input: %p Output %p\n",
globalRank, this->childId, collIdx, groupId, this->deviceIds[localRank],
inPlace ? "in-place" : "out-of-place",
useManagedMem ? "managed" : "unmanaged",
collArg.inputGpu.ptr,
@@ -363,9 +388,11 @@ namespace RcclUnitTesting
// Read values sent by parent [see TestBed::PrepareData()]
int globalRank;
int collId;
int groupId;
CollFuncPtr prepDataFunc;
PIPE_READ(globalRank);
PIPE_READ(groupId);
PIPE_READ(collId);
PIPE_READ(prepDataFunc);
@@ -378,13 +405,13 @@ namespace RcclUnitTesting
int const localRank = globalRank - rankOffset;
CHECK_HIP(hipSetDevice(this->deviceIds[localRank]));
for (int collIdx = 0; collIdx < collArgs[localRank].size(); ++collIdx)
for (int collIdx = 0; collIdx < collArgs[groupId][localRank].size(); ++collIdx)
{
if (collId == -1 || collId == collIdx)
{
if (this->verbose) INFO("Rank %d on child %d prepares data for collective %d\n",
globalRank, this->childId, collIdx);
CHECK_CALL(this->collArgs[localRank][collIdx].PrepareData(prepDataFunc));
if (this->verbose) INFO("Rank %d on child %d prepares data for collective %d in group %d\n",
globalRank, this->childId, collIdx, groupId);
CHECK_CALL(this->collArgs[groupId][localRank][collIdx].PrepareData(prepDataFunc));
}
}
if (this->verbose) INFO("Child %d finishes PrepareData()\n", this->childId);
@@ -394,9 +421,11 @@ namespace RcclUnitTesting
ErrCode TestBedChild::ExecuteCollectives()
{
int timeoutUs = 0;
PIPE_READ(timeoutUs);
int groupId = 0;
bool useHipGraph = false;
PIPE_READ(timeoutUs);
PIPE_READ(groupId);
PIPE_READ(useHipGraph);
int numRanksToExecute, tempRank;
@@ -420,14 +449,14 @@ namespace RcclUnitTesting
}
numRanksToExecute = (int)localRanksToExecute.size();
std::vector<std::vector<hipGraph_t>> graphs;
std::vector<std::vector<hipGraphExec_t>> graphExec;
graphs.resize(numRanksToExecute);
graphExec.resize(numRanksToExecute);
this->graphs[groupId].resize(numRanksToExecute);
this->graphExecs[groupId].resize(numRanksToExecute);
this->graphEnabled[groupId].resize(numRanksToExecute);
for (int i = 0; i < numRanksToExecute; i++)
{
graphs[i].resize(this->numStreamsPerGroup);
graphExec[i].resize(this->numStreamsPerGroup);
this->graphs[groupId][i].resize(this->numStreamsPerGroup[groupId]);
this->graphExecs[groupId][i].resize(this->numStreamsPerGroup[groupId]);
this->graphEnabled[groupId][i].resize(this->numStreamsPerGroup[groupId]);
}
// Start HIP graph stream capture if requested
@@ -435,11 +464,11 @@ namespace RcclUnitTesting
{
for (int localRank : localRanksToExecute)
{
if (this->verbose) INFO("Capturing stream for rank %d\n", localRank);
if (this->verbose) INFO("Capturing stream for group %d rank %d\n", groupId, localRank);
CHECK_HIP(hipSetDevice(this->deviceIds[localRank]));
for (int i = 0; i < this->numStreamsPerGroup; i++)
for (int i = 0; i < this->numStreamsPerGroup[groupId]; i++)
{
CHECK_HIP(hipStreamBeginCapture(this->streams[localRank][i], hipStreamCaptureModeRelaxed));
CHECK_HIP(hipStreamBeginCapture(this->streams[groupId][localRank][i], hipStreamCaptureModeRelaxed));
}
}
}
@@ -448,14 +477,14 @@ namespace RcclUnitTesting
CHILD_NCCL_CALL(ncclGroupStart(), "ncclGroupStart");
// Loop over all collectives to be executed in group call
for (int collId = 0; collId < this->numCollectivesInGroup; ++collId)
for (int collId = 0; collId < this->numCollectivesInGroup[groupId]; ++collId)
{
// Loop over all local ranks
for (int localRank : localRanksToExecute)
{
CHECK_HIP(hipSetDevice(this->deviceIds[localRank]));
CollectiveArgs const& collArg = this->collArgs[localRank][collId];
CollectiveArgs const& collArg = this->collArgs[groupId][localRank][collId];
if (this->printValues && !useHipGraph)
{
@@ -464,14 +493,14 @@ namespace RcclUnitTesting
size_t const numInputBytes = numInputElementsToPrint * DataTypeToBytes(collArg.dataType);
inputCpu.AllocateCpuMem(numInputBytes);
CHECK_HIP(hipMemcpy(inputCpu.ptr, collArg.inputGpu.ptr, numInputBytes, hipMemcpyDeviceToHost));
printf("[ DEBUG ] Rank %02d Coll %d %-10s: %s\n", collArg.globalRank, collId, "Input",
printf("[ DEBUG ] Rank %02d Group %d Coll %d %-10s: %s\n", collArg.globalRank, groupId, collId, "Input",
inputCpu.ToString(collArg.dataType, numInputElementsToPrint).c_str());
inputCpu.FreeCpuMem();
int const numOutputElementsToPrint = (this->printValues < 0 ? collArg.numOutputElements : this->printValues);
size_t const numOutputBytes = numOutputElementsToPrint * DataTypeToBytes(collArg.dataType);
CHECK_HIP(hipMemcpy(collArg.outputCpu.ptr, collArg.outputGpu.ptr, numOutputBytes, hipMemcpyDeviceToHost));
printf("[ DEBUG ] Rank %02d Coll %d %-10s: %s\n", collArg.globalRank, collId, "Pre-Output",
printf("[ DEBUG ] Rank %02d Group %d Coll %d %-10s: %s\n", collArg.globalRank, groupId, collId, "Pre-Output",
collArg.outputCpu.ToString(collArg.dataType, numOutputElementsToPrint).c_str());
}
@@ -484,7 +513,7 @@ namespace RcclUnitTesting
collArg.dataType,
collArg.options.root,
this->comms[localRank],
this->streams[localRank][collArg.streamIdx]),
this->streams[groupId][localRank][collArg.streamIdx]),
"ncclBroadcast");
break;
case ncclCollReduce:
@@ -495,7 +524,7 @@ namespace RcclUnitTesting
collArg.options.redOp,
collArg.options.root,
this->comms[localRank],
this->streams[localRank][collArg.streamIdx]),
this->streams[groupId][localRank][collArg.streamIdx]),
"ncclReduce");
break;
case ncclCollAllGather:
@@ -504,7 +533,7 @@ namespace RcclUnitTesting
collArg.numInputElements,
collArg.dataType,
this->comms[localRank],
this->streams[localRank][collArg.streamIdx]),
this->streams[groupId][localRank][collArg.streamIdx]),
"ncclAllGather");
break;
case ncclCollReduceScatter:
@@ -514,7 +543,7 @@ namespace RcclUnitTesting
collArg.dataType,
collArg.options.redOp,
this->comms[localRank],
this->streams[localRank][collArg.streamIdx]),
this->streams[groupId][localRank][collArg.streamIdx]),
"ncclReduceScatter");
break;
case ncclCollAllReduce:
@@ -524,7 +553,7 @@ namespace RcclUnitTesting
collArg.dataType,
collArg.options.redOp,
this->comms[localRank],
this->streams[localRank][collArg.streamIdx]),
this->streams[groupId][localRank][collArg.streamIdx]),
"ncclAllReduce");
break;
case ncclCollGather:
@@ -534,7 +563,7 @@ namespace RcclUnitTesting
collArg.dataType,
collArg.options.root,
this->comms[localRank],
this->streams[localRank][collArg.streamIdx]),
this->streams[groupId][localRank][collArg.streamIdx]),
"ncclGather");
break;
case ncclCollScatter:
@@ -544,7 +573,7 @@ namespace RcclUnitTesting
collArg.dataType,
collArg.options.root,
this->comms[localRank],
this->streams[localRank][collArg.streamIdx]),
this->streams[groupId][localRank][collArg.streamIdx]),
"ncclScatter");
break;
case ncclCollAllToAll:
@@ -553,7 +582,7 @@ namespace RcclUnitTesting
collArg.numInputElements / collArg.totalRanks,
collArg.dataType,
this->comms[localRank],
this->streams[localRank][collArg.streamIdx]),
this->streams[groupId][localRank][collArg.streamIdx]),
"ncclAllToAll");
break;
case ncclCollAllToAllv:
@@ -565,7 +594,7 @@ namespace RcclUnitTesting
collArg.options.rdispls + (this->rankOffset + localRank)*this->totalRanks,
collArg.dataType,
this->comms[localRank],
this->streams[localRank][collArg.streamIdx]),
this->streams[groupId][localRank][collArg.streamIdx]),
"ncclAllToAllv");
break;
case ncclCollSend:
@@ -574,7 +603,7 @@ namespace RcclUnitTesting
collArg.dataType,
collArg.options.root,
this->comms[localRank],
this->streams[localRank][collArg.streamIdx]),
this->streams[groupId][localRank][collArg.streamIdx]),
"ncclSend");
break;
case ncclCollRecv:
@@ -583,7 +612,7 @@ namespace RcclUnitTesting
collArg.dataType,
collArg.options.root,
this->comms[localRank],
this->streams[localRank][collArg.streamIdx]),
this->streams[groupId][localRank][collArg.streamIdx]),
"ncclRecv");
break;
default:
@@ -624,33 +653,24 @@ namespace RcclUnitTesting
{
if (this->verbose) INFO("Ending stream capture for rank %d\n", localRank);
CHECK_HIP(hipSetDevice(this->deviceIds[localRank]));
for (int i = 0; i < this->numStreamsPerGroup; i++)
for (int i = 0; i < this->numStreamsPerGroup[groupId]; i++)
{
CHECK_HIP(hipStreamEndCapture(this->streams[localRank][i], &graphs[localRank][i]));
CHECK_HIP(hipStreamEndCapture(this->streams[groupId][localRank][i], &this->graphs[groupId][localRank][i]));
if (this->verbose)
{
size_t numNodes;
hipGraphNode_t* nodes;
CHECK_HIP(hipGraphGetNodes(graphs[localRank][i], nodes, &numNodes));
INFO("Graph for rank %d stream %d has %lu nodes\n", localRank, i, numNodes);
}
// if (this->verbose)
// {
// size_t numNodes;
// hipGraphNode_t* nodes;
// CHECK_HIP(hipGraphGetNodes(graphs[localRank][i], nodes, &numNodes));
// INFO("Graph for rank %d stream %d has %lu nodes\n", localRank, i, numNodes);
// }
}
if (this->verbose) INFO("Instantiating executable graph for rank %d\n", localRank);
for (int i = 0; i < this->numStreamsPerGroup; i++)
if (this->verbose) INFO("Instantiating executable graph for group %d rank %d\n", groupId, localRank);
for (int i = 0; i < this->numStreamsPerGroup[groupId]; i++)
{
CHECK_HIP(hipGraphInstantiate(&graphExec[localRank][i], graphs[localRank][i], NULL, NULL, 0));
}
}
for (int localRank : localRanksToExecute)
{
if (this->verbose) INFO("Launch graph for rank %d\n", localRank);
CHECK_HIP(hipSetDevice(this->deviceIds[localRank]));
for (int i = 0; i < this->numStreamsPerGroup; i++)
{
CHECK_HIP(hipGraphLaunch(graphExec[localRank][i], this->streams[localRank][i]));
CHECK_HIP(hipGraphInstantiate(&this->graphExecs[groupId][localRank][i], this->graphs[groupId][localRank][i], NULL, NULL, 0));
graphEnabled[groupId][localRank][i] = true;
}
}
}
@@ -664,8 +684,8 @@ namespace RcclUnitTesting
std::vector<hipStream_t> streamsToComplete;
for (int localRank : localRanksToExecute)
{
for (int i = 0; i < this->numStreamsPerGroup; i++)
streamsToComplete.push_back(this->streams[localRank][i]);
for (int i = 0; i < this->numStreamsPerGroup[groupId]; i++)
streamsToComplete.push_back(this->streams[groupId][localRank][i]);
}
int usElapsed = 0, timedout = 0;
using namespace std::chrono;
@@ -701,40 +721,25 @@ namespace RcclUnitTesting
// of fencing between kernels and at hipStreamQuery
for (int localRank : localRanksToExecute)
{
if (this->verbose) INFO("Starting synchronization for rank %d\n", localRank);
for (int i = 0; i < this->numStreamsPerGroup; i++)
CHECK_HIP(hipStreamSynchronize(this->streams[localRank][i]));
}
// Destroy graphs
if (useHipGraph)
{
for (int localRank : localRanksToExecute)
{
if (this->verbose) INFO("Destroying graphs for rank %d\n", localRank);
CHECK_HIP(hipSetDevice(this->deviceIds[localRank]));
for (int i = 0; i < this->numStreamsPerGroup; i++)
{
CHECK_HIP(hipGraphDestroy(graphs[localRank][i]));
CHECK_HIP(hipGraphExecDestroy(graphExec[localRank][i]));
}
}
if (this->verbose) INFO("Starting synchronization for group %d rank %d\n", groupId, localRank);
for (int i = 0; i < this->numStreamsPerGroup[groupId]; i++)
CHECK_HIP(hipStreamSynchronize(this->streams[groupId][localRank][i]));
}
if (this->printValues)
{
for (int collId = 0; collId < this->numCollectivesInGroup; ++collId)
for (int collId = 0; collId < this->numCollectivesInGroup[groupId]; ++collId)
for (int localRank : localRanksToExecute)
{
CollectiveArgs const& collArg = this->collArgs[localRank][collId];
CollectiveArgs const& collArg = this->collArgs[groupId][localRank][collId];
CHECK_HIP(hipSetDevice(this->deviceIds[localRank]));
int numOutputElementsToPrint = (this->printValues < 0 ? collArg.numOutputElements : this->printValues);
size_t const numOutputBytes = numOutputElementsToPrint * DataTypeToBytes(collArg.dataType);
CHECK_HIP(hipMemcpy(collArg.outputCpu.ptr, collArg.outputGpu.ptr, numOutputBytes, hipMemcpyDeviceToHost));
printf("[ DEBUG ] Rank %02d Coll %d %-10s: %s\n", collArg.globalRank, collId, "Output",
printf("[ DEBUG ] Rank %02d Group %d Coll %d %-10s: %s\n", collArg.globalRank, groupId, collId, "Output",
collArg.outputCpu.ToString(collArg.dataType, numOutputElementsToPrint).c_str());
printf("[ DEBUG ] Rank %02d Coll %d %-10s: %s\n", collArg.globalRank, collId, "Expected",
printf("[ DEBUG ] Rank %02d Group %d Coll %d %-10s: %s\n", collArg.globalRank, groupId, collId, "Expected",
collArg.expected.ToString(collArg.dataType, numOutputElementsToPrint).c_str());
}
}
@@ -752,8 +757,9 @@ namespace RcclUnitTesting
ErrCode TestBedChild::ValidateResults()
{
// Read values sent by parent [see TestBed::ValidateResults()]
int globalRank, collId;
int globalRank, groupId, collId;
PIPE_READ(globalRank);
PIPE_READ(groupId);
PIPE_READ(collId);
if (this->verbose) INFO("Child %d begins ValidateResults()\n", this->childId);
@@ -767,15 +773,15 @@ namespace RcclUnitTesting
CHECK_HIP(hipSetDevice(this->deviceIds[localRank]));
ErrCode status = TEST_SUCCESS;
for (int collIdx = 0; collIdx < collArgs[localRank].size(); ++collIdx)
for (int collIdx = 0; collIdx < collArgs[groupId][localRank].size(); ++collIdx)
{
if (collId == -1 || collId == collIdx)
{
if (this->verbose) INFO("Rank %d on child %d validating collective %d results\n",
globalRank, this->childId, collIdx);
if (this->collArgs[localRank][collIdx].ValidateResults() != TEST_SUCCESS)
if (this->verbose) INFO("Rank %d on child %d validating collective %d in group %d results\n",
globalRank, this->childId, collIdx, groupId);
if (this->collArgs[groupId][localRank][collIdx].ValidateResults() != TEST_SUCCESS)
{
ERROR("Rank %d Collective %d output does not match expected\n", globalRank, collIdx);
ERROR("Rank %d Group %d Collective %d output does not match expected\n", globalRank, groupId, collIdx);
status = TEST_FAIL;
}
}
@@ -785,13 +791,35 @@ namespace RcclUnitTesting
return status;
}
ErrCode TestBedChild::LaunchGraphs()
{
int groupId;
PIPE_READ(groupId);
if (this->verbose) INFO("Child %d begins LaunchGraphs for group %d\n", this->childId, groupId);
for (int localRank = 0; localRank < this->deviceIds.size(); ++localRank) {
CHECK_HIP(hipSetDevice(this->deviceIds[localRank]));
for (int streamIdx = 0; streamIdx < this->numStreamsPerGroup[groupId]; ++streamIdx)
{
if (this->verbose) INFO("Launch graph for group %d rank %d stream %d\n", groupId, localRank, streamIdx);
CHECK_HIP(hipGraphLaunch(this->graphExecs[groupId][localRank][streamIdx], this->streams[groupId][localRank][streamIdx]));
}
}
if (this->verbose) INFO("Child %d finishes LaunchGraphs for group %d\n", this->childId, groupId);
return TEST_SUCCESS;
}
ErrCode TestBedChild::DeallocateMem()
{
if (this->verbose) INFO("Child %d begins DeallocateMem\n", this->childId);
// Read values sent by parent [see TestBed::DeallocateMem()]
int globalRank, collId;
int globalRank, groupId, collId;
PIPE_READ(globalRank);
PIPE_READ(groupId);
PIPE_READ(collId);
if (globalRank < this->rankOffset || (this->rankOffset + comms.size() <= globalRank))
@@ -802,15 +830,15 @@ namespace RcclUnitTesting
int const localRank = globalRank - rankOffset;
CHECK_HIP(hipSetDevice(this->deviceIds[localRank]));
for (int collIdx = 0; collIdx < collArgs[localRank].size(); ++collIdx)
for (int collIdx = 0; collIdx < collArgs[groupId][localRank].size(); ++collIdx)
{
CollectiveArgs& collArg = this->collArgs[localRank][collIdx];
CollectiveArgs& collArg = this->collArgs[groupId][localRank][collIdx];
if (collId == -1 || collId == collIdx)
{
if (this->verbose)
{
INFO("Child %d release memory for collective %d (Input: %p Output %p\n",
this->childId, collIdx, collArg.inputGpu.ptr, collArg.outputGpu.ptr);
INFO("Child %d release memory for collective %d in group %d (Input: %p Output %p\n",
this->childId, collIdx, groupId, collArg.inputGpu.ptr, collArg.outputGpu.ptr);
}
CHECK_CALL(collArg.DeallocateMem());
@@ -819,8 +847,8 @@ namespace RcclUnitTesting
{
CHILD_NCCL_CALL(ncclRedOpDestroy(collArg.options.redOp, this->comms[localRank]),
"ncclRedOpDestroy");
if (verbose) INFO("Child %d destroys custom redop %d for collective %d\n",
this->childId, collArg.options.redOp, collIdx);
if (verbose) INFO("Child %d destroys custom redop %d for collective %d in group %d\n",
this->childId, collArg.options.redOp, collIdx, groupId);
}
}
if (this->verbose) INFO("Child %d finishes DeallocateMem\n", this->childId);
@@ -852,11 +880,14 @@ namespace RcclUnitTesting
{
CHILD_NCCL_CALL(ncclCommDestroy(this->comms[i]), "ncclCommDestroy");
}
for (int i = 0; i < this->streams.size(); ++i)
for (int i = 0; i < this->numGroupCalls; ++i)
{
for (int j = 0; j < this->numStreamsPerGroup; j++)
for (int j = 0; j < this->streams[i].size(); ++j)
{
CHECK_HIP(hipStreamDestroy(this->streams[i][j]));
for (int k = 0; k < this->streams[i][j].size(); ++k)
{
CHECK_HIP(hipStreamDestroy(this->streams[i][j][k]));
}
}
}
this->comms.clear();
@@ -864,4 +895,41 @@ namespace RcclUnitTesting
if (this->verbose) INFO("Child %d finishes DestroyComms\n", this->childId);
return TEST_SUCCESS;
}
ErrCode TestBedChild::DestroyGraphs()
{
if (this->verbose) INFO("Child %d begins DestroyGraphs\n", this->childId);
int groupId;
PIPE_READ(groupId);
// Release graphs
for (int localRank = 0; localRank < this->deviceIds.size(); ++localRank)
{
CHECK_HIP(hipSetDevice(this->deviceIds[localRank]));
for (int streamIdx = 0; streamIdx < this->numStreamsPerGroup[groupId]; ++streamIdx)
{
if (graphEnabled[groupId][localRank][streamIdx])
{
if (this->verbose) INFO("Destroying graphs for group %d rank %d stream %d\n", groupId, localRank, streamIdx);
CHECK_HIP(hipGraphDestroy(this->graphs[groupId][localRank][streamIdx]));
CHECK_HIP(hipGraphExecDestroy(this->graphExecs[groupId][localRank][streamIdx]));
}
}
}
for (int localRank = 0; localRank < this->deviceIds.size(); ++localRank)
{
for (int i = 0; i < this->numStreamsPerGroup[groupId]; ++i)
CHECK_HIP(hipStreamSynchronize(this->streams[groupId][localRank][i]));
}
this->graphs[groupId].clear();
this->graphExecs[groupId].clear();
this->graphEnabled[groupId].clear();
if (this->verbose) INFO("Child %d finishes DestroyGraphs\n", this->childId);
return TEST_SUCCESS;
}
}
+28 -14
View File
@@ -1,5 +1,5 @@
/*************************************************************************
* Copyright (c) 2022 Advanced Micro Devices, Inc. All rights reserved.
* Copyright (c) 2022-2024 Advanced Micro Devices, Inc. All rights reserved.
*
* See LICENSE.txt for license information
************************************************************************/
@@ -28,10 +28,12 @@ namespace RcclUnitTesting
CHILD_PREPARE_DATA = 4, // PrepareData()
CHILD_EXECUTE_COLL = 5, // ExecuteCollectives()
CHILD_VALIDATE_RESULTS = 6, // ValidateResults()
CHILD_DEALLOCATE_MEM = 7, // DeallocateMem()
CHILD_DESTROY_COMMS = 8, // DestroyComms()
CHILD_STOP = 9, // Stop()
NUM_CHILD_COMMANDS = 10
CHILD_LAUNCH_GRAPHS = 7, // LaunchGraphs()
CHILD_DEALLOCATE_MEM = 8, // DeallocateMem()
CHILD_DESTROY_COMMS = 9, // DestroyComms()
CHILD_DESTROY_GRAPHS = 10, // DestroyGraphs()
CHILD_STOP = 11, // Stop()
NUM_CHILD_COMMANDS = 12
};
char const ChildCommandNames[NUM_CHILD_COMMANDS][20] =
@@ -43,8 +45,10 @@ namespace RcclUnitTesting
"PREPARE_DATA",
"EXECUTE_COLL",
"VALIDATE_RESULTS",
"LAUNCH_GRAPHS",
"DEALLOCATE_MEM",
"DESTROY_COMMS",
"DESTROY_GRAPHS",
"STOP"
};
@@ -61,15 +65,19 @@ namespace RcclUnitTesting
int childReadFd;
// These varibles may change based on commands issued by parent
int totalRanks; // Total ranks
int rankOffset; // Global rank offset for this child
int numCollectivesInGroup; // # of collectives to run per group call
bool useBlocking; // RCCL communication with blocking or non-blocking option
int numStreamsPerGroup; // # of different streams allowed per group call
std::vector<ncclComm_t> comms; // RCCL communicators for each rank
std::vector<int> deviceIds; // Device IDs for each rank
std::vector<std::vector<hipStream_t>> streams; // Streams for executing collectives
std::vector<std::vector<CollectiveArgs>> collArgs; // Info for each collective for each rank
int totalRanks; // Total ranks
int rankOffset; // Global rank offset for this child
int numGroupCalls; // Toatal # of group calls to be executed
bool useBlocking; // RCCL communication with blocking or non-blocking option
std::vector<int> numCollectivesInGroup; // # of collectives to run per group call
std::vector<int> numStreamsPerGroup; // # of different streams allowed per group call
std::vector<ncclComm_t> comms; // RCCL communicators for each rank
std::vector<int> deviceIds; // Device IDs for each rank
std::vector<std::vector<std::vector<hipStream_t>>> streams; // Streams for executing collectives per group call
std::vector<std::vector<std::vector<CollectiveArgs>>> collArgs; // Info for each collective for each rank per group call
std::vector<std::vector<std::vector<hipGraph_t>>> graphs; // Graphs for executing collectives per group call
std::vector<std::vector<std::vector<hipGraphExec_t>>> graphExecs; // GraphExecs for executing collectives per group call
std::vector<std::vector<std::vector<bool>>> graphEnabled;
// Constructor
TestBedChild(int const childId, bool const verbose, int const printValues);
@@ -102,10 +110,16 @@ namespace RcclUnitTesting
// Validate that output matches expected
ErrCode ValidateResults();
// Launch instantiated graphs
ErrCode LaunchGraphs();
// Release allocated memory
ErrCode DeallocateMem();
// Destroys RCCL communicators
ErrCode DestroyComms();
// Destroys graphs
ErrCode DestroyGraphs();
};
}