Added Unit test for nccl send recv (#506)

Added Send Receive test that tests through all pairs

[ROCm/rccl commit: ff54e79799]
This commit is contained in:
akolliasAMD
2022-03-02 15:50:16 -05:00
کامیت شده توسط GitHub
والد a182076a0e
کامیت 2419a950fe
8فایلهای تغییر یافته به همراه135 افزوده شده و 9 حذف شده
@@ -85,6 +85,8 @@ if(BUILD_TESTS)
Gather_InPlace.cpp
Gather_ManagedMem.cpp
Gather_OutOfPlace.cpp
#SendRecv
SendRecv_SinglePairs.cpp
)
endif()
@@ -0,0 +1,79 @@
/*************************************************************************
* Copyright (c) 2022 Advanced Micro Devices, Inc. All rights reserved.
*
* See LICENSE.txt for license information
************************************************************************/
#include "TestBed.hpp"
namespace RcclUnitTesting
{
TEST(SendRecv, SinglePairs)
{
TestBed testBed;
// Configuration
std::vector<ncclDataType_t> const& dataTypes = {ncclInt32, ncclFloat64};
std::vector<int> const numElements = {1048576, 53327, 1024};
bool const inPlace = false;
bool const useManagedMem = false;
bool isCorrect = true;
int totalRanks = testBed.ev.maxGpus;
for (int isMultiProcess = 0; isMultiProcess <= 1 && isCorrect; ++isMultiProcess)
{
int const numProcesses = isMultiProcess ? totalRanks : 1;
testBed.InitComms(TestBed::GetDeviceIdsList(numProcesses, totalRanks), 1);
for (int dataIdx = 0; dataIdx < dataTypes.size() && isCorrect; ++dataIdx)
for (int numIdx = 0; numIdx < numElements.size() && isCorrect; ++numIdx)
for (int sendRank = 0; sendRank < totalRanks; ++sendRank)
{
for (int recvRank = 0; recvRank < totalRanks; ++recvRank)
{
testBed.SetCollectiveArgs(ncclCollSend,
dataTypes[dataIdx],
ncclSum, // This should be moved to optional variables struct
recvRank,
numElements[numIdx],
numElements[numIdx],
0,
sendRank);
if (recvRank == 0)
{
testBed.AllocateMem(inPlace, useManagedMem, 0, sendRank);
testBed.PrepareData(0, sendRank);
}
if (recvRank != sendRank)
{
if (testBed.ev.showNames) // Show test names
INFO("%s process Datatype: %s SendReceive test Rank %d -> Rank %d for %d Elements\n",
isMultiProcess ? "Multi " : "Single",
ncclDataTypeNames[dataTypes[dataIdx]],
sendRank,
recvRank,
numElements[numIdx]);
testBed.SetCollectiveArgs(ncclCollRecv,
dataTypes[dataIdx],
ncclSum, // This should be moved to optional variables struct
sendRank,
numElements[numIdx],
numElements[numIdx],
0,
recvRank);
testBed.AllocateMem(inPlace, useManagedMem, 0, recvRank);
testBed.PrepareData(0, recvRank);
testBed.ExecuteCollectives({sendRank,recvRank });
testBed.ValidateResults(isCorrect, 0, recvRank);
testBed.DeallocateMem(0, recvRank);
}
}
testBed.DeallocateMem(0, sendRank);
}
testBed.DestroyComms();
}
testBed.Finalize();
}
}
@@ -175,7 +175,7 @@ namespace RcclUnitTesting
case ncclCollScatter: ss << "ncclScatter"; break;
case ncclCollAllToAll: ss << "ncclAllToAll"; break;
case ncclCollSend: ss << "ncclSend"; break;
case ncclCollRecv: ss << "ncclRevv"; break;
case ncclCollRecv: ss << "ncclRecv"; break;
default: ss << "[Unknown]"; break;
}
@@ -277,6 +277,7 @@ namespace RcclUnitTesting
return (funcType == ncclCollBroadcast ||
funcType == ncclCollReduce ||
funcType == ncclCollGather ||
funcType == ncclCollScatter);
funcType == ncclCollScatter ||
funcType == ncclCollSend); // this is incorrect but it works because in Send root is not root it is the peer
}
}
@@ -23,7 +23,8 @@ namespace RcclUnitTesting
case ncclCollGather: return DefaultPrepData_Gather(collArgs, false);
case ncclCollScatter: return DefaultPrepData_Scatter(collArgs);
case ncclCollAllToAll: return DefaultPrepData_AllToAll(collArgs);
//case ncclCollSendRecv: return DefaultPrepData_SendRecv(collArgs);
case ncclCollSend: return DefaultPrepData_Send(collArgs);
case ncclCollRecv: return DefaultPrepData_Recv(collArgs);
default:
ERROR("Unknown func type %d\n", collArgs.funcType);
return TEST_FAIL;
@@ -339,4 +340,21 @@ namespace RcclUnitTesting
}
return TEST_SUCCESS;
}
ErrCode DefaultPrepData_Send(CollectiveArgs &collArgs)
{
CHECK_CALL(CheckAllocation(collArgs));
return collArgs.inputGpu.FillPattern(collArgs.dataType,
collArgs.numInputElements,
collArgs.globalRank, true);
}
ErrCode DefaultPrepData_Recv(CollectiveArgs &collArgs)
{
CHECK_CALL(CheckAllocation(collArgs));
return collArgs.expected.FillPattern(collArgs.dataType,
collArgs.numOutputElements,
collArgs.root,
false);
}
}
@@ -22,5 +22,6 @@ namespace RcclUnitTesting
ErrCode DefaultPrepData_ReduceScatter(CollectiveArgs &collArgs);
ErrCode DefaultPrepData_Scatter(CollectiveArgs &collArgs);
ErrCode DefaultPrepData_AllToAll(CollectiveArgs &collArgs);
ErrCode DefaultPrepData_SendRecv(CollectiveArgs &collArgs);
ErrCode DefaultPrepData_Send(CollectiveArgs &collArgs);
ErrCode DefaultPrepData_Recv(CollectiveArgs &collArgs);
}
@@ -220,21 +220,35 @@ namespace RcclUnitTesting
}
}
void TestBed::ExecuteCollectives()
void TestBed::ExecuteCollectives(std::vector<int> const &currentRanks)
{
int const cmd = TestBedChild::CHILD_EXECUTE_COLL;
++TestBed::NumTestsRun();
std::vector<std::vector<int>> ranksPerChild(this->numActiveChildren);
for (int rank = 0; rank < currentRanks.size(); ++rank)
{
ranksPerChild[rankToChildMap[currentRanks[rank]]].push_back(rank);
}
// Send ExecuteColl command to each active child process
for (int childId = 0; childId < this->numActiveChildren; ++childId)
{
PIPE_WRITE(childId, cmd);
if ((currentRanks.size() == 0) || (ranksPerChild[childId].size() > 0))
{
PIPE_WRITE(childId, cmd);
int tempCurrentRanks = currentRanks.size();
PIPE_WRITE(childId, tempCurrentRanks);
for (int rank = 0; rank < currentRanks.size(); ++rank){
PIPE_WRITE(childId, currentRanks[rank]);
}
}
}
// Wait for child acknowledgement
for (int childId = 0; childId < this->numActiveChildren; ++childId)
{
PIPE_CHECK(childId);
if ((currentRanks.size() == 0) || (ranksPerChild[childId].size() > 0)) PIPE_CHECK(childId);
}
}
@@ -72,7 +72,7 @@ namespace RcclUnitTesting
// Execute all collectives on all test children
// Blocks until collective is completed
void ExecuteCollectives();
void ExecuteCollectives(std::vector<int> const &currentRanks = {});
// Perform results validation - compare output to expected
void ValidateResults(bool& isCorrect, int collId = -1, int const rank = -1);
@@ -262,7 +262,7 @@ namespace RcclUnitTesting
{
if (collId == -1 || collId == collIdx)
{
CollectiveArgs& collArg = this->collArgs[localRank][collIdx];
CollectiveArgs& collArg = this->collArgs[localRank][collIdx];
CHECK_CALL(collArg.AllocateMem(inPlace, useManagedMem));
if (this->verbose) INFO("Rank %d on child %d allocates memory for collective %d on device %d (%s,%s) Input: %p Output %p\n",
globalRank, this->childId, collIdx, this->deviceIds[localRank],
@@ -315,6 +315,14 @@ namespace RcclUnitTesting
ErrCode TestBedChild::ExecuteCollectives()
{
int numRanksToExecute, tempRank;
std::vector<int> ranksToExecute = {};
PIPE_READ(numRanksToExecute);
for (int rank = 0; rank < numRanksToExecute; ++rank){
PIPE_READ(tempRank);
ranksToExecute.push_back(tempRank - this->rankOffset);
}
if (this->verbose) INFO("Child %d begins ExecuteCollectives()\n", this->childId);
// Start group call
@@ -326,6 +334,9 @@ namespace RcclUnitTesting
// Loop over all local ranks
for (int localRank = 0; localRank < this->deviceIds.size(); ++localRank)
{
// If ranks to execute is empty, execute all ranks belonging to child
if (!ranksToExecute.empty() && (std::count(ranksToExecute.begin(), ranksToExecute.end(), localRank) == 0)) continue;
CHECK_HIP(hipSetDevice(this->deviceIds[localRank]));
CollectiveArgs const& collArg = this->collArgs[localRank][collId];