Update MP UT to support arbitrary # of GPUs; multiple bugfixes (#16)

* Fixing temp file creation/deletion for Clique kernel mode.

* Refactoring of MP unit tests; include bugfixes and general support for any number of GPUs

* GroupCall MP UT properly quits when too many devices specified

* MP UT will programmatically set NCCL_COMM_ID if not specified; updated install script

[ROCm/rccl commit: d00b7d17bd]
Cette révision appartient à :
Stanley Tsang
2021-02-05 17:49:25 -07:00
révisé par GitHub
Parent fe8923ebba
révision f152c8d160
23 fichiers modifiés avec 538 ajouts et 716 suppressions
+2 -2
Voir le fichier
@@ -200,10 +200,10 @@ if ($run_tests); then
if (test -f "./test/UnitTests"); then
if ($run_tests_all); then
./test/UnitTests
NCCL_COMM_ID=$HOSTNAME:55512 ./test/UnitTestsMultiProcess
./test/UnitTestsMultiProcess
else
./test/UnitTests --gtest_filter="BroadcastCorrectnessSweep*:*float32*"
NCCL_COMM_ID=$HOSTNAME:55512 ./test/UnitTestsMultiProcess --gtest_filter="BroadcastMultiProcessCorrectnessSweep*:*float32*"
./test/UnitTestsMultiProcess --gtest_filter="BroadcastMultiProcessCorrectnessSweep*:*float32*"
fi
else
echo "Unit tests have not been built yet; please re-run script with -t to build unit tests."
+28 -19
Voir le fichier
@@ -40,6 +40,7 @@ THE SOFTWARE.
#include <iostream>
#include <sstream>
#include <thread>
#include <unistd.h>
cliqueDevicePtrs_t CliqueManager::m_staticCliquePtrs[NCCL_MAX_OPS] = {};
int* CliqueManager::m_staticGpuBarrierMem = NULL;
@@ -502,28 +503,36 @@ void CliqueManager::WaitForBarrier()
ncclResult_t CliqueManager::BootstrapRootInit(int pid, unsigned long hash)
{
for (auto it = CliqueShmNames.begin(); it != CliqueShmNames.end(); it++)
if (rcclParamEnableClique())
{
int msgid, fd;
std::string msgQueueName = "/tmp/" + it->second + std::to_string(hash) + "_" + std::to_string(pid);
SYSCHECKVAL(open(msgQueueName.c_str(), O_CREAT | O_RDWR, 0606), "open", fd);
NCCLCHECK(MsgQueueGetId(msgQueueName, hash, true, msgid));
SYSCHECK(close(fd), "close");
for (auto it = CliqueShmNames.begin(); it != CliqueShmNames.end(); it++)
{
int msgid, fd;
std::string msgQueueName = "/tmp/" + it->second + std::to_string(hash) + "_" + std::to_string(pid);
SYSCHECKVAL(open(msgQueueName.c_str(), O_CREAT | O_RDWR, 0606), "open", fd);
NCCLCHECK(MsgQueueGetId(msgQueueName, hash, true, msgid));
SYSCHECK(unlink(msgQueueName.c_str()), "unlink");
SYSCHECK(close(fd), "close");
}
std::string shmDir = "/dev/shm/";
for (auto it = CliqueShmNames.begin(); it != CliqueShmNames.end(); it++)
{
struct stat fileStatus;
std::string shmFileName = it->second + std::to_string(hash) + "_" + std::to_string(pid);
std::string shmFullPath = shmDir + shmFileName;
// Check if shm file already exists; if so, unlink it
if (stat(shmFullPath.c_str(), &fileStatus) == 0)
{
NCCLCHECK(shmUnlink(shmFileName.c_str()));
}
}
}
std::string shmDir = "/dev/shm/";
for (auto it = CliqueShmNames.begin(); it != CliqueShmNames.end(); it++)
else
{
struct stat fileStatus;
std::string shmFileName = it->second + std::to_string(hash) + "_" + std::to_string(pid);
std::string shmFullPath = shmDir + shmFileName;
// Check if shm file already exists; if so, unlink it
if (stat(shmFullPath.c_str(), &fileStatus) == 0)
{
NCCLCHECK(shmUnlink(shmFileName.c_str()));
}
INFO(NCCL_INIT, "Not performing bootstrap root for clique kernels as clique mode not enabled.");
}
return ncclSuccess;
}
+61 -15
Voir le fichier
@@ -702,13 +702,18 @@ namespace CorrectnessTests
class MultiProcessCorrectnessTest : public CorrectnessTest
{
protected:
// IMPORTANT: We cannot have any HIP API calls in the parent process.
// Do any HIP setup in SetupPerProcess().
void SetUp() override
{
// Check for NCCL_COMM_ID env variable (otherwise will not init)
// Check if NCCL_COMM_ID is already set; if not, set it now
if (!getenv("NCCL_COMM_ID"))
{
printf("Must set NCCL_COMM_ID prior to execution\n");
exit(0);
char hostname[HOST_NAME_MAX+1];
gethostname(hostname, HOST_NAME_MAX+1);
std::string hostnameString(hostname);
hostnameString.append(":55513");
setenv("NCCL_COMM_ID", hostnameString.c_str(), 0);
}
// Make the test tuple parameters accessible
@@ -737,10 +742,14 @@ namespace CorrectnessTests
comms.resize(numDevices);
streams.resize(numDevices);
dataset = (Dataset*)mmap(NULL, sizeof(Dataset), PROT_READ|PROT_WRITE, MAP_SHARED|MAP_ANONYMOUS, -1, 0);
Barrier::ClearShmFiles(std::atoi(getenv("NCCL_COMM_ID")));
}
void TearDown() override
{
munmap(dataset, sizeof(Dataset));
// Restore env vars after tests
for (int i = 0; i < numTokens/2; i++) {
if (savedEnv[i]) {
@@ -773,9 +782,11 @@ namespace CorrectnessTests
// Only proceed with testing if there are enough GPUs
if (numDevices > numDevicesAvailable)
{
fprintf(stdout, "[ SKIPPED ] Test requires %d devices (only %d available)\n",
numDevices, numDevicesAvailable);
if (rank == 0)
{
fprintf(stdout, "[ SKIPPED ] Test requires %d devices (only %d available)\n",
numDevices, numDevicesAvailable);
}
// Modify the number of devices so that tear-down doesn't occur
// This is temporary until GTEST_SKIP() becomes available
numDevices = 0;
@@ -795,7 +806,7 @@ namespace CorrectnessTests
if (res != ncclSuccess)
{
printf("Test failure:%s %d '%s' numRanks:%d\n", __FILE__,__LINE__,ncclGetErrorString(res), numDevices);
ASSERT_EQ(res, hipSuccess);
ASSERT_EQ(res, ncclSuccess);
}
}
@@ -803,17 +814,22 @@ namespace CorrectnessTests
void SetUpPerProcess(int rank, ncclFunc_t const func, ncclComm_t& comm, hipStream_t& stream, Dataset& dataset)
{
SetUpPerProcessHelper(rank, comm, stream);
dataset.Initialize(numDevices, numElements, dataType, inPlace, func, rank);
if (numDevices <= numDevicesAvailable)
{
dataset.Initialize(numDevices, numElements, dataType, inPlace, func, rank);
}
}
// To be called by each process/rank individually (see GroupCallsMultiProcess)
void SetUpPerProcess(int rank, std::vector<ncclFunc_t> const& func, ncclComm_t& comm, hipStream_t& stream, std::vector<Dataset*>& datasets)
{
SetUpPerProcessHelper(rank, comm, stream);
for (int i = 0; i < datasets.size(); i++)
if (numDevices <= numDevicesAvailable)
{
datasets[i]->Initialize(numDevices, numElements, dataType, inPlace, func[i], rank);
for (int i = 0; i < datasets.size(); i++)
{
datasets[i]->Initialize(numDevices, numElements, dataType, inPlace, func[i], rank);
}
}
}
@@ -875,7 +891,7 @@ namespace CorrectnessTests
free(arrayI1);
}
void ValidateResults(Dataset const& dataset, int rank, int root = 0) const
bool ValidateResults(Dataset const& dataset, int rank, int root = 0) const
{
int8_t* outputI1 = (int8_t *)malloc(dataset.NumBytes(ncclOutputBuffer));
uint8_t* outputU1 = (uint8_t *)outputI1;
@@ -894,8 +910,11 @@ namespace CorrectnessTests
// only output on root rank is valid for gather collective
if (dataset.function == ncclCollGather && rank != root)
return;
HIP_CALL(hipMemcpy(outputI1, dataset.outputs[rank], dataset.NumBytes(ncclOutputBuffer), hipMemcpyDeviceToHost));
return true;
hipError_t err = hipMemcpy(outputI1, dataset.outputs[rank], dataset.NumBytes(ncclOutputBuffer), hipMemcpyDeviceToHost);
if (err != hipSuccess)
return false;
int8_t* expectedI1 = (int8_t *)dataset.expected[rank];
uint8_t* expectedU1 = (uint8_t *)expectedI1;
@@ -953,8 +972,35 @@ namespace CorrectnessTests
}
}
}
ASSERT_EQ(isMatch, true);
return isMatch;
}
void ValidateProcesses(std::vector<int> const& pids)
{
int numProcesses = pids.size();
int status[numProcesses];
for (int i = 0; i < numProcesses; i++)
{
waitpid(pids[i], &status[i], 0);
ASSERT_NE(WIFEXITED(status[i]), 0) << "[ERROR] Child process " << i << " did not exit cleanly.";
ASSERT_EQ(WEXITSTATUS(status[i]), EXIT_SUCCESS) << "[ERROR] Child process " << i << " had a test failure.";
}
}
// Ends the calling (child) process, reporting the test outcome through the
// process exit status: EXIT_SUCCESS when `pass` is true, EXIT_FAILURE otherwise.
// This function does not return.
void TerminateChildProcess(bool const pass)
{
  int const exitCode = pass ? EXIT_SUCCESS : EXIT_FAILURE;
  exit(exitCode);
}
Dataset* dataset;
};
std::string GenerateTestNameString(testing::TestParamInfo<MultiProcessCorrectnessTest::ParamType>& info);
+13 -49
Voir le fichier
@@ -9,63 +9,27 @@ namespace CorrectnessTests
{
TEST_P(AllGatherMultiProcessCorrectnessTest, Correctness)
{
Dataset* dataset = (Dataset*)mmap(NULL, sizeof(Dataset), PROT_READ|PROT_WRITE, MAP_SHARED|MAP_ANONYMOUS, -1, 0);
dataset->InitializeRootProcess(numDevices, numElements, dataType, inPlace, ncclCollAllGather);
Barrier::ClearShmFiles(std::atoi(getenv("NCCL_COMM_ID")));
std::vector<int> pids(numDevices);
int pid1 = 0;
int pid2 = 0;
int pid3 = 0;
pid1 = fork();
// From this point on, ignore original process as we cannot have it create a HIP context
if (pid1 == 0)
int gpu = -1;
for (int i = 0; i < numDevices; i++)
{
pid2 = fork();
if (numDevices > 2)
gpu++;
int pid = fork();
if (pid == 0)
{
pid3 = fork();
}
if ((pid2 > 0 && pid3 == 0 && numDevices == 2) || (pid2 > 0 && pid3 > 0 && numDevices > 2))
{
// Process 0
TestAllGather(0, *dataset);
if (pid3 > 0)
{
waitpid(pid3, NULL, 0);
}
}
else if ((pid2 == 0 && pid3 == 0 && numDevices == 2) || (pid2 == 0 && pid3 > 0 && numDevices > 2))
{
// Process 1
TestAllGather(1, *dataset);
if (numDevices > 2)
{
waitpid(pid3, NULL, 0);
}
exit(0);
}
else if (pid2 > 0 && pid3 == 0 && numDevices > 2)
{
// Process 2 (available when numDevices > 2)
TestAllGather(2, *dataset);
exit(0);
}
else if (pid2 == 0 && pid3 == 0 && numDevices == 4)
{
// Process 3 (available when numDevices == 4)
TestAllGather(3, *dataset);
exit(0);
bool pass;
TestAllGather(gpu, *dataset, pass);
TerminateChildProcess(pass);
}
else
{
exit(0);
pids[gpu] = pid;
}
waitpid(pid2, NULL, 0);
exit(0);
}
waitpid(pid1, NULL, 0);
munmap(dataset, sizeof(Dataset));
ValidateProcesses(pids);
}
INSTANTIATE_TEST_SUITE_P(AllGatherMultiProcessCorrectnessSweep,
@@ -87,7 +51,7 @@ namespace CorrectnessTests
// Number of elements
testing::Values(3072, 3145728),
// Number of devices
testing::Values(2,3,4),
testing::Values(2,3,4,8),
// In-place or not
testing::Values(false, true),
testing::Values("")),
+27 -15
Voir le fichier
@@ -13,38 +13,50 @@ namespace CorrectnessTests
class AllGatherMultiProcessCorrectnessTest : public MultiProcessCorrectnessTest
{
public:
static void ComputeExpectedResults(Dataset& dataset, Barrier& barrier, int const rank, int const numDevices)
static void ComputeExpectedResults(Dataset& dataset, Barrier& barrier, int const numDevices, std::vector<int> const& ranks)
{
size_t const byteCount = dataset.NumBytes() / dataset.numDevices;
HIP_CALL(hipMemcpy(static_cast<char*>(dataset.expected[0]) + rank * byteCount, (int8_t *)dataset.inputs[rank] + (rank * byteCount),
byteCount, hipMemcpyDeviceToHost));
barrier.Wait();
// Rank 0 sends answer to other ranks
if (rank == 0)
for (int i = 0; i < ranks.size(); i++)
{
for (int i = 0; i < dataset.numDevices; i++)
int rank = ranks[i];
HIP_CALL(hipMemcpy(static_cast<char*>(dataset.expected[0]) + rank * byteCount, (int8_t *)dataset.inputs[rank] + (rank * byteCount),
byteCount, hipMemcpyDeviceToHost));
}
barrier.Wait();
// Rank 0 sends answer to other ranks
for (int i = 0; i < ranks.size(); i++)
{
int rank = ranks[i];
if (rank == 0)
{
if (i == rank) continue;
memcpy(dataset.expected[i], dataset.expected[0], dataset.NumBytes());
for (int i = 0; i < dataset.numDevices; i++)
{
if (i == rank) continue;
memcpy(dataset.expected[i], dataset.expected[0], dataset.NumBytes());
}
}
}
}
void TestAllGather(int rank, Dataset& dataset)
void TestAllGather(int rank, Dataset& dataset, bool& pass)
{
// Prepare input / output / expected results
SetUpPerProcess(rank, ncclCollAllGather, comms[rank], streams[rank], dataset);
if (numDevices > numDevicesAvailable) return;
if (numElements % numDevices != 0) return;
if (numDevices > numDevicesAvailable || numElements % numDevices != 0)
{
pass = true;
return;
}
Barrier barrier(rank, numDevices, std::atoi(getenv("NCCL_COMM_ID")));
// Prepare input / output / expected results
FillDatasetWithPattern(dataset, rank);
ComputeExpectedResults(dataset, barrier, rank, numDevices);
ComputeExpectedResults(dataset, barrier, numDevices, std::vector<int>(1, rank));
size_t const byteCount = dataset.NumBytes() / numDevices;
size_t const sendCount = dataset.numElements / numDevices;
@@ -58,7 +70,7 @@ namespace CorrectnessTests
HIP_CALL(hipStreamSynchronize(streams[rank]));
// Check results
ValidateResults(dataset, rank);
pass = ValidateResults(dataset, rank);
TearDownPerProcess(comms[rank], streams[rank]);
dataset.Release(rank);
+13 -49
Voir le fichier
@@ -10,63 +10,27 @@ namespace CorrectnessTests
{
TEST_P(AllReduceMultiProcessCorrectnessTest, Correctness)
{
Dataset* dataset = (Dataset*)mmap(NULL, sizeof(Dataset), PROT_READ|PROT_WRITE, MAP_SHARED|MAP_ANONYMOUS, -1, 0);
dataset->InitializeRootProcess(numDevices, numElements, dataType, inPlace, ncclCollAllReduce);
Barrier::ClearShmFiles(std::atoi(getenv("NCCL_COMM_ID")));
std::vector<int> pids(numDevices);
int pid1 = 0;
int pid2 = 0;
int pid3 = 0;
pid1 = fork();
// From this point on, ignore original process as we cannot have it create a HIP context
if (pid1 == 0)
int gpu = -1;
for (int i = 0; i < numDevices; i++)
{
pid2 = fork();
if (numDevices > 2)
gpu++;
int pid = fork();
if (pid == 0)
{
pid3 = fork();
}
if ((pid2 > 0 && pid3 == 0 && numDevices == 2) || (pid2 > 0 && pid3 > 0 && numDevices > 2))
{
// Process 0
TestAllReduce(0, *dataset);
if (pid3 > 0)
{
waitpid(pid3, NULL, 0);
}
}
else if ((pid2 == 0 && pid3 == 0 && numDevices == 2) || (pid2 == 0 && pid3 > 0 && numDevices > 2))
{
// Process 1
TestAllReduce(1, *dataset);
if (numDevices > 2)
{
waitpid(pid3, NULL, 0);
}
exit(0);
}
else if (pid2 > 0 && pid3 == 0 && numDevices > 2)
{
// Process 2 (available when numDevices > 2)
TestAllReduce(2, *dataset);
exit(0);
}
else if (pid2 == 0 && pid3 == 0 && numDevices == 4)
{
// Process 3 (available when numDevices == 4)
TestAllReduce(3, *dataset);
exit(0);
bool pass;
TestAllReduce(gpu, *dataset, pass);
TerminateChildProcess(pass);
}
else
{
exit(0);
pids[gpu] = pid;
}
waitpid(pid2, NULL, 0);
exit(0);
}
waitpid(pid1, NULL, 0);
munmap(dataset, sizeof(Dataset));
ValidateProcesses(pids);
}
INSTANTIATE_TEST_SUITE_P(AllReduceMultiProcessCorrectnessSweep,
@@ -88,7 +52,7 @@ namespace CorrectnessTests
// Number of elements
testing::Values(1024, 1048576),
// Number of devices
testing::Values(2,3,4),
testing::Values(2,3,4,8),
// In-place or not
testing::Values(false, true),
testing::Values("")),
+22 -10
Voir le fichier
@@ -13,13 +13,17 @@ namespace CorrectnessTests
class AllReduceMultiProcessCorrectnessTest : public MultiProcessCorrectnessTest
{
public:
static void ComputeExpectedResults(Dataset& dataset, Barrier& barrier, ncclRedOp_t const op, int const rank)
static void ComputeExpectedResults(Dataset& dataset, Barrier& barrier, ncclRedOp_t const op, std::vector<int> const& ranks)
{
// Copy all inputs to expected arrays temporarily to perform reduction on host
HIP_CALL(hipMemcpy(dataset.expected[rank], dataset.inputs[rank],
dataset.NumBytes(), hipMemcpyDeviceToHost));
for (int i = 0; i < ranks.size(); i++)
{
int rank = ranks[i];
HIP_CALL(hipMemcpy(dataset.expected[rank], dataset.inputs[rank],
dataset.NumBytes(), hipMemcpyDeviceToHost));
}
barrier.Wait();
// Allocate temporary host array to accumulate results
int8_t* resultI1 = (int8_t *)malloc(dataset.NumBytes());
uint8_t* resultU1 = (uint8_t *)resultI1;
@@ -68,23 +72,31 @@ namespace CorrectnessTests
}
}
barrier.Wait();
// Copy results into expected array
memcpy(dataset.expected[rank], resultI1, dataset.NumBytes());
// Copy results into expected array
for (int i = 0; i < ranks.size(); i++)
{
int rank = ranks[i];
memcpy(dataset.expected[rank], resultI1, dataset.NumBytes());
}
free(resultI1);
}
void TestAllReduce(int rank, Dataset& dataset)
void TestAllReduce(int rank, Dataset& dataset, bool& pass)
{
SetUpPerProcess(rank, ncclCollAllReduce, comms[rank], streams[rank], dataset);
if (numDevices > numDevicesAvailable) return;
if (numDevices > numDevicesAvailable)
{
pass = true;
return;
}
Barrier barrier(rank, numDevices, std::atoi(getenv("NCCL_COMM_ID")));
// Prepare input / output / expected results
FillDatasetWithPattern(dataset, rank);
ComputeExpectedResults(dataset, barrier, op, rank);
ComputeExpectedResults(dataset, barrier, op, std::vector<int>(1, rank));
// Launch the reduction
ncclAllReduce(dataset.inputs[rank], dataset.outputs[rank],
@@ -94,7 +106,7 @@ namespace CorrectnessTests
HIP_CALL(hipStreamSynchronize(streams[rank]));
// Check results
ValidateResults(dataset, rank);
pass = ValidateResults(dataset, rank);
TearDownPerProcess(comms[rank], streams[rank]);
dataset.Release(rank);
+13 -48
Voir le fichier
@@ -10,62 +10,27 @@ namespace CorrectnessTests
{
TEST_P(AllToAllMultiProcessCorrectnessTest, Correctness)
{
Dataset* dataset = (Dataset*)mmap(NULL, sizeof(Dataset), PROT_READ|PROT_WRITE, MAP_SHARED|MAP_ANONYMOUS, -1, 0);
dataset->InitializeRootProcess(numDevices, numElements, dataType, inPlace, ncclCollAllToAll);
std::vector<int> pids(numDevices);
int pid1 = 0;
int pid2 = 0;
int pid3 = 0;
pid1 = fork();
// From this point on, ignore original process as we cannot have it create a HIP context
if (pid1 == 0)
int gpu = -1;
for (int i = 0; i < numDevices; i++)
{
pid2 = fork();
if (numDevices > 2)
gpu++;
int pid = fork();
if (pid == 0)
{
pid3 = fork();
}
if ((pid2 > 0 && pid3 == 0 && numDevices == 2) || (pid2 > 0 && pid3 > 0 && numDevices > 2))
{
// Process 0
TestAllToAll(0, *dataset);
if (pid3 > 0)
{
waitpid(pid3, NULL, 0);
}
}
else if ((pid2 == 0 && pid3 == 0 && numDevices == 2) || (pid2 == 0 && pid3 > 0 && numDevices > 2))
{
// Process 1
TestAllToAll(1, *dataset);
if (numDevices > 2)
{
waitpid(pid3, NULL, 0);
}
exit(0);
}
else if (pid2 > 0 && pid3 == 0 && numDevices > 2)
{
// Process 2 (available when numDevices > 2)
TestAllToAll(2, *dataset);
exit(0);
}
else if (pid2 == 0 && pid3 == 0 && numDevices == 4)
{
// Process 3 (available when numDevices == 4)
TestAllToAll(3, *dataset);
exit(0);
bool pass;
TestAllToAll(gpu, *dataset, pass);
TerminateChildProcess(pass);
}
else
{
exit(0);
pids[gpu] = pid;
}
waitpid(pid2, NULL, 0);
exit(0);
}
waitpid(pid1, NULL, 0);
munmap(dataset, sizeof(Dataset));
ValidateProcesses(pids);
}
INSTANTIATE_TEST_SUITE_P(AllToAllMultiProcessCorrectnessSweep,
@@ -87,7 +52,7 @@ namespace CorrectnessTests
// Number of elements
testing::Values(1024, 1048576),
// Number of devices
testing::Values(2,3,4),
testing::Values(2,3,4,8),
// In-place or not
testing::Values(false),
testing::Values("RCCL_ALLTOALL_KERNEL_DISABLE=0", "RCCL_ALLTOALL_KERNEL_DISABLE=1")),
+16 -8
Voir le fichier
@@ -13,24 +13,32 @@ namespace CorrectnessTests
class AllToAllMultiProcessCorrectnessTest : public MultiProcessCorrectnessTest
{
public:
static void ComputeExpectedResults(Dataset& dataset, int const rank)
static void ComputeExpectedResults(Dataset& dataset, std::vector<int> const& ranks)
{
for (int i = 0; i < dataset.numDevices; i++)
for (int i = 0; i < ranks.size(); i++)
{
HIP_CALL(hipMemcpy((int8_t *)dataset.expected[i]+dataset.NumBytes()*rank, (int8_t *)dataset.inputs[rank]+dataset.NumBytes()*i,
dataset.NumBytes(), hipMemcpyDeviceToHost));
int rank = ranks[i];
for (int j = 0; j < dataset.numDevices; j++)
{
HIP_CALL(hipMemcpy((int8_t *)dataset.expected[j]+dataset.NumBytes()*rank, (int8_t *)dataset.inputs[rank]+dataset.NumBytes()*j,
dataset.NumBytes(), hipMemcpyDeviceToHost));
}
}
}
void TestAllToAll(int rank, Dataset& dataset)
void TestAllToAll(int rank, Dataset& dataset, bool& pass)
{
SetUpPerProcess(rank, ncclCollAllToAll, comms[rank], streams[rank], dataset);
if (numDevices > numDevicesAvailable) return;
if (numDevices > numDevicesAvailable)
{
pass = true;
return;
}
// Prepare input / output / expected results
FillDatasetWithPattern(dataset, rank);
ComputeExpectedResults(dataset, rank);
ComputeExpectedResults(dataset, std::vector<int>(1, rank));
// Launch the reduction
ncclAllToAll(dataset.inputs[rank],
@@ -42,7 +50,7 @@ namespace CorrectnessTests
HIP_CALL(hipStreamSynchronize(streams[rank]));
// Check results
ValidateResults(dataset, rank);
pass = ValidateResults(dataset, rank);
TearDownPerProcess(comms[rank], streams[rank]);
dataset.Release(rank);
+13 -49
Voir le fichier
@@ -17,63 +17,27 @@ namespace CorrectnessTests
{
TEST_P(BroadcastMultiProcessCorrectnessTest, Correctness)
{
Dataset* dataset = (Dataset*)mmap(NULL, sizeof(Dataset), PROT_READ|PROT_WRITE, MAP_SHARED|MAP_ANONYMOUS, -1, 0);
dataset->InitializeRootProcess(numDevices, numElements, dataType, inPlace, ncclCollBroadcast);
Barrier::ClearShmFiles(std::atoi(getenv("NCCL_COMM_ID")));
std::vector<int> pids(numDevices);
int pid1 = 0;
int pid2 = 0;
int pid3 = 0;
pid1 = fork();
// From this point on, ignore original process as we cannot have it create a HIP context
if (pid1 == 0)
int gpu = -1;
for (int i = 0; i < numDevices; i++)
{
pid2 = fork();
if (numDevices > 2)
gpu++;
int pid = fork();
if (pid == 0)
{
pid3 = fork();
}
if ((pid2 > 0 && pid3 == 0 && numDevices == 2) || (pid2 > 0 && pid3 > 0 && numDevices > 2))
{
// Process 0
TestBroadcast(0, *dataset);
if (pid3 > 0)
{
waitpid(pid3, NULL, 0);
}
}
else if ((pid2 == 0 && pid3 == 0 && numDevices == 2) || (pid2 == 0 && pid3 > 0 && numDevices > 2))
{
// Process 1
TestBroadcast(1, *dataset);
if (numDevices > 2)
{
waitpid(pid3, NULL, 0);
}
exit(0);
}
else if (pid2 > 0 && pid3 == 0 && numDevices > 2)
{
// Process 2 (available when numDevices > 2)
TestBroadcast(2, *dataset);
exit(0);
}
else if (pid2 == 0 && pid3 == 0 && numDevices == 4)
{
// Process 3 (available when numDevices == 4)
TestBroadcast(3, *dataset);
exit(0);
bool pass;
TestBroadcast(gpu, *dataset, pass);
TerminateChildProcess(pass);
}
else
{
exit(0);
pids[gpu] = pid;
}
waitpid(pid2, NULL, 0);
exit(0);
}
waitpid(pid1, NULL, 0);
munmap(dataset, sizeof(Dataset));
ValidateProcesses(pids);
}
INSTANTIATE_TEST_SUITE_P(BroadcastMultiProcessCorrectnessSweep,
@@ -95,7 +59,7 @@ namespace CorrectnessTests
// Number of elements
testing::Values(1024, 1048576),
// Number of devices
testing::Values(2,3,4),
testing::Values(2,3,4,8),
// In-place or not
testing::Values(false, true),
testing::Values("")),
+21 -12
Voir le fichier
@@ -13,26 +13,35 @@ namespace CorrectnessTests
class BroadcastMultiProcessCorrectnessTest : public MultiProcessCorrectnessTest
{
public:
static void ComputeExpectedResults(Dataset& dataset, int const root, int const rank)
static void ComputeExpectedResults(Dataset& dataset, int const root, std::vector<int> const& ranks)
{
// Root has the answer; share it via host memcpy's
if (rank == root)
for (int h = 0; h < ranks.size(); h++)
{
HIP_CALL(hipMemcpy(dataset.expected[rank], dataset.inputs[rank],
dataset.NumBytes(), hipMemcpyDeviceToHost));
for (int i = 0; i < dataset.numDevices; i++)
int rank = ranks[h];
// Root has the answer; share it via host memcpy's
if (rank == root)
{
if (i == rank) continue;
memcpy(dataset.expected[i], dataset.expected[root], dataset.NumBytes());
HIP_CALL(hipMemcpy(dataset.expected[rank], dataset.inputs[rank],
dataset.NumBytes(), hipMemcpyDeviceToHost));
for (int i = 0; i < dataset.numDevices; i++)
{
if (i == rank) continue;
memcpy(dataset.expected[i], dataset.expected[root], dataset.NumBytes());
}
break;
}
}
}
void TestBroadcast(int rank, Dataset& dataset)
void TestBroadcast(int rank, Dataset& dataset, bool& pass)
{
SetUpPerProcess(rank, ncclCollBroadcast, comms[rank], streams[rank], dataset);
if (numDevices > numDevicesAvailable) return;
if (numDevices > numDevicesAvailable)
{
pass = true;
return;
}
Barrier barrier(rank, numDevices, std::atoi(getenv("NCCL_COMM_ID")));
@@ -41,7 +50,7 @@ namespace CorrectnessTests
{
// Prepare input / output / expected results
FillDatasetWithPattern(dataset, rank);
ComputeExpectedResults(dataset, root, rank);
ComputeExpectedResults(dataset, root, std::vector<int>(1, rank));
// Launch the reduction (1 process per GPU)
ncclResult_t res = ncclBroadcast(dataset.inputs[rank],
@@ -53,7 +62,7 @@ namespace CorrectnessTests
HIP_CALL(hipStreamSynchronize(streams[rank]));
// Check results
ValidateResults(dataset, rank);
pass = ValidateResults(dataset, rank);
// Ensure all processes have finished current iteration before proceeding
barrier.Wait();
+14 -46
Voir le fichier
@@ -25,60 +25,28 @@ namespace CorrectnessTests
datasets[i] = (Dataset*)mmap(NULL, sizeof(Dataset), PROT_READ|PROT_WRITE, MAP_SHARED|MAP_ANONYMOUS, -1, 0);
datasets[i]->InitializeRootProcess(numDevices, numElements, dataType, inPlace, ncclFuncs[i]);
}
Barrier::ClearShmFiles(std::atoi(getenv("NCCL_COMM_ID")));
int pid1 = 0;
int pid2 = 0;
int pid3 = 0;
pid1 = fork();
std::vector<int> pids(numDevices);
// From this point on, ignore original process as we cannot have it create a HIP context
if (pid1 == 0)
int gpu = -1;
for (int i = 0; i < numDevices; i++)
{
pid2 = fork();
if (numDevices > 2)
gpu++;
int pid = fork();
if (pid == 0)
{
pid3 = fork();
}
if ((pid2 > 0 && pid3 == 0 && numDevices == 2) || (pid2 > 0 && pid3 > 0 && numDevices > 2))
{
// Process 0
TestCombinedCalls(0, datasets, ncclFuncs);
if (pid3 > 0)
{
waitpid(pid3, NULL, 0);
}
}
else if ((pid2 == 0 && pid3 == 0 && numDevices == 2) || (pid2 == 0 && pid3 > 0 && numDevices > 2))
{
// Process 1
TestCombinedCalls(1, datasets, ncclFuncs);
if (numDevices > 2)
{
waitpid(pid3, NULL, 0);
}
exit(0);
}
else if (pid2 > 0 && pid3 == 0 && numDevices > 2)
{
// Process 2 (available when numDevices > 2)
TestCombinedCalls(2, datasets, ncclFuncs);
exit(0);
}
else if (pid2 == 0 && pid3 == 0 && numDevices == 4)
{
// Process 3 (available when numDevices == 4)
TestCombinedCalls(3, datasets, ncclFuncs);
exit(0);
bool pass;
TestCombinedCalls(gpu, datasets, ncclFuncs, pass);
TerminateChildProcess(pass);
}
else
{
exit(0);
pids[gpu] = pid;
}
waitpid(pid2, NULL, 0);
exit(0);
}
waitpid(pid1, NULL, 0);
ValidateProcesses(pids);
for (int i = 0; i < datasets.size(); i++)
{
munmap(datasets[i], sizeof(Dataset));
@@ -104,7 +72,7 @@ namespace CorrectnessTests
// Number of elements
testing::Values(3072, 3145728),
// Number of devices
testing::Values(2,3,4),
testing::Values(2,3,4,8),
// In-place or not
testing::Values(false, true),
testing::Values("")),
+30 -11
Voir le fichier
@@ -20,21 +20,26 @@ namespace CorrectnessTests
class CombinedCallsMultiProcessCorrectnessTest : public MultiProcessCorrectnessTest
{
public:
void TestCombinedCalls(int rank, std::vector<Dataset*>& datasets, std::vector<ncclFunc_t> const& funcs)
void TestCombinedCalls(int rank, std::vector<Dataset*>& datasets, std::vector<ncclFunc_t> const& funcs, bool& pass)
{
SetUpPerProcess(rank, funcs, comms[rank], streams[rank], datasets);
if (numDevices > numDevicesAvailable) return;
if (numDevices > numDevicesAvailable)
{
pass = true;
return;
}
Barrier barrier(rank, numDevices, std::atoi(getenv("NCCL_COMM_ID")));
// Compute expected results for each dataset in combined
int const root = 0;
AllGatherMultiProcessCorrectnessTest::ComputeExpectedResults(*datasets[0], barrier, rank, numDevices);
AllReduceMultiProcessCorrectnessTest::ComputeExpectedResults(*datasets[1], barrier, op, rank);
BroadcastMultiProcessCorrectnessTest::ComputeExpectedResults(*datasets[2], root, rank);
ReduceMultiProcessCorrectnessTest::ComputeExpectedResults(*datasets[3], barrier, op, root, rank);
ReduceScatterMultiProcessCorrectnessTest::ComputeExpectedResults(*datasets[4], barrier, op, rank);
std::vector<int> ranks(1, rank);
AllGatherMultiProcessCorrectnessTest::ComputeExpectedResults(*datasets[0], barrier, numDevices, ranks);
AllReduceMultiProcessCorrectnessTest::ComputeExpectedResults(*datasets[1], barrier, op, ranks);
BroadcastMultiProcessCorrectnessTest::ComputeExpectedResults(*datasets[2], root, ranks);
ReduceMultiProcessCorrectnessTest::ComputeExpectedResults(*datasets[3], barrier, op, root, ranks);
ReduceScatterMultiProcessCorrectnessTest::ComputeExpectedResults(*datasets[4], barrier, op, ranks);
size_t const byteCount = datasets[0]->NumBytes() / numDevices;
size_t const elemCount = numElements / numDevices;
@@ -64,12 +69,26 @@ namespace CorrectnessTests
// Wait for reduction to complete
HIP_CALL(hipStreamSynchronize(streams[rank]));
// Check results for each collective in the combined
for (int i = 0; i < 5; i++)
for (int i = 0; i < funcs.size(); i++)
{
ValidateResults(*datasets[i], rank);
for (int j = 0; j < ranks.size(); j++)
{
pass = ValidateResults(*datasets[i], ranks[j], root);
if (!pass)
{
break;
}
}
barrier.Wait();
datasets[i]->Release(rank);
for (int j = 0; j < ranks.size(); j++)
{
datasets[i]->Release(ranks[j]);
}
}
for (int i = 0; i < ranks.size(); i++)
{
TearDownPerProcess(comms[ranks[i]], streams[ranks[i]]);
}
}
};
+13 -49
Voir le fichier
@@ -10,63 +10,27 @@ namespace CorrectnessTests
{
TEST_P(GatherMultiProcessCorrectnessTest, Correctness)
{
Dataset* dataset = (Dataset*)mmap(NULL, sizeof(Dataset), PROT_READ|PROT_WRITE, MAP_SHARED|MAP_ANONYMOUS, -1, 0);
dataset->InitializeRootProcess(numDevices, numElements, dataType, inPlace, ncclCollGather);
Barrier::ClearShmFiles(std::atoi(getenv("NCCL_COMM_ID")));
std::vector<int> pids(numDevices);
int pid1 = 0;
int pid2 = 0;
int pid3 = 0;
pid1 = fork();
// From this point on, ignore original process as we cannot have it create a HIP context
if (pid1 == 0)
int gpu = -1;
for (int i = 0; i < numDevices; i++)
{
pid2 = fork();
if (numDevices > 2)
gpu++;
int pid = fork();
if (pid == 0)
{
pid3 = fork();
}
if ((pid2 > 0 && pid3 == 0 && numDevices == 2) || (pid2 > 0 && pid3 > 0 && numDevices > 2))
{
// Process 0
TestGather(0, *dataset);
if (pid3 > 0)
{
waitpid(pid3, NULL, 0);
}
}
else if ((pid2 == 0 && pid3 == 0 && numDevices == 2) || (pid2 == 0 && pid3 > 0 && numDevices > 2))
{
// Process 1
TestGather(1, *dataset);
if (numDevices > 2)
{
waitpid(pid3, NULL, 0);
}
exit(0);
}
else if (pid2 > 0 && pid3 == 0 && numDevices > 2)
{
// Process 2 (available when numDevices > 2)
TestGather(2, *dataset);
exit(0);
}
else if (pid2 == 0 && pid3 == 0 && numDevices == 4)
{
// Process 3 (available when numDevices == 4)
TestGather(3, *dataset);
exit(0);
bool pass;
TestGather(gpu, *dataset, pass);
TerminateChildProcess(pass);
}
else
{
exit(0);
pids[gpu] = pid;
}
waitpid(pid2, NULL, 0);
exit(0);
}
waitpid(pid1, NULL, 0);
munmap(dataset, sizeof(Dataset));
ValidateProcesses(pids);
}
INSTANTIATE_TEST_SUITE_P(GatherMultiProcessCorrectnessSweep,
@@ -88,7 +52,7 @@ namespace CorrectnessTests
// Number of elements
testing::Values(1024, 1048576),
// Number of devices
testing::Values(2,3,4),
testing::Values(2,3,4,8),
// In-place or not
testing::Values(false),
testing::Values("RCCL_ALLTOALL_KERNEL_DISABLE=0", "RCCL_ALLTOALL_KERNEL_DISABLE=1")),
+7 -3
Voir le fichier
@@ -19,11 +19,15 @@ namespace CorrectnessTests
dataset.NumBytes(), hipMemcpyDeviceToHost));
}
void TestGather(int rank, Dataset& dataset)
void TestGather(int rank, Dataset& dataset, bool& pass)
{
SetUpPerProcess(rank, ncclCollGather, comms[rank], streams[rank], dataset);
if (numDevices > numDevicesAvailable) return;
if (numDevices > numDevicesAvailable)
{
pass = true;
return;
}
Barrier barrier(rank, numDevices, std::atoi(getenv("NCCL_COMM_ID")));
@@ -44,7 +48,7 @@ namespace CorrectnessTests
HIP_CALL(hipStreamSynchronize(streams[rank]));
// Check results
ValidateResults(dataset, rank, root);
pass = ValidateResults(dataset, rank, root);
// Ensure all processes have finished current iteration before proceeding
barrier.Wait();
+21 -57
Voir le fichier
@@ -25,75 +25,39 @@ namespace CorrectnessTests
datasets[i] = (Dataset*)mmap(NULL, sizeof(Dataset), PROT_READ|PROT_WRITE, MAP_SHARED|MAP_ANONYMOUS, -1, 0);
datasets[i]->InitializeRootProcess(numDevices, numElements, dataType, inPlace, ncclFuncs[i]);
}
Barrier::ClearShmFiles(std::atoi(getenv("NCCL_COMM_ID")));
int pid1 = 0;
int pid2 = 0;
int pid3 = 0;
pid1 = fork();
int const numGpusPerProcess = 2;
int const numProcesses = numDevices / numGpusPerProcess;
std::vector<int> pids(numProcesses);
int process = -1;
// From this point on, ignore original process as we cannot have it create a HIP context
if (pid1 == 0)
for (int i = 0; i < numDevices; i+= numGpusPerProcess)
{
pid2 = fork();
if (numDevices > 4)
process++;
int pid = fork();
if (pid == 0)
{
pid3 = fork();
}
if ((pid2 > 0 && pid3 == 0 && numDevices == 4) || (pid2 > 0 && pid3 > 0 && numDevices > 4))
{
// Process 0
std::vector<int> ranks;
ranks.push_back(0);
ranks.push_back(1);
int gpuIdx = i;
int maxIdx = gpuIdx + (numGpusPerProcess - 1) >= numDevices ? numDevices : gpuIdx + numGpusPerProcess;
TestGroupCalls(0, ranks, datasets, ncclFuncs);
if (pid3 > 0)
std::vector<int> ranks;
for (; gpuIdx < maxIdx; gpuIdx++)
{
waitpid(pid3, NULL, 0);
ranks.push_back(gpuIdx);
}
}
else if ((pid2 == 0 && pid3 == 0 && numDevices == 4) || (pid2 == 0 && pid3 > 0 && numDevices > 4))
{
// Process 1
std::vector<int> ranks;
ranks.push_back(2);
ranks.push_back(3);
TestGroupCalls(1, ranks, datasets, ncclFuncs);
if (pid3 > 0)
{
waitpid(pid3, NULL, 0);
}
exit(0);
}
else if (pid2 > 0 && pid3 == 0 && numDevices == 8)
{
// Process 2 (available when numDevices == 8)
std::vector<int> ranks;
ranks.push_back(4);
ranks.push_back(5);
TestGroupCalls(2, ranks, datasets, ncclFuncs);
exit(0);
}
else if (pid2 == 0 && pid3 == 0 && numDevices == 8)
{
// Process 3 (available when numDevices == 8)
std::vector<int> ranks;
ranks.push_back(6);
ranks.push_back(7);
TestGroupCalls(3, ranks, datasets, ncclFuncs);
exit(0);
bool pass;
TestGroupCalls(process, ranks, datasets, ncclFuncs, pass);
TerminateChildProcess(pass);
}
else
{
exit(0);
pids[process] = pid;
}
waitpid(pid2, NULL, 0);
exit(0);
}
waitpid(pid1, NULL, 0);
ValidateProcesses(pids);
for (int i = 0; i < datasets.size(); i++)
{
munmap(datasets[i], sizeof(Dataset));
@@ -119,7 +83,7 @@ namespace CorrectnessTests
// Number of elements
testing::Values(3072, 3145728),
// Number of devices
testing::Values(4),
testing::Values(4,8),
// In-place or not
testing::Values(false, true),
testing::Values("")),
+34 -14
Voir le fichier
@@ -21,34 +21,50 @@ namespace CorrectnessTests
class GroupCallsMultiProcessCorrectnessTest : public MultiProcessCorrectnessTest
{
public:
void TestGroupCalls(int process, std::vector<int> const& ranks, std::vector<Dataset*>& datasets, std::vector<ncclFunc_t> const& funcs)
void TestGroupCalls(int process, std::vector<int> const& ranks, std::vector<Dataset*>& datasets, std::vector<ncclFunc_t> const& funcs, bool& pass)
{
if (numDevices > numDevicesAvailable) return;
ncclGroupStart();
for (int i = 0; i < ranks.size(); i++)
{
SetUpPerProcess(ranks[i], funcs, comms[ranks[i]], streams[ranks[i]], datasets);
if (numDevices > numDevicesAvailable)
{
break;
}
}
ncclGroupEnd();
if (numDevices > numDevicesAvailable)
{
pass = true;
return;
}
int numProcesses = numDevices / ranks.size();
Barrier barrier(process, numProcesses, std::atoi(getenv("NCCL_COMM_ID")));
int const root = 0;
for (int i = 0; i < ranks.size(); i++)
{
AllGatherMultiProcessCorrectnessTest::ComputeExpectedResults(*datasets[0], barrier, numDevices, ranks[i]);
AllReduceMultiProcessCorrectnessTest::ComputeExpectedResults(*datasets[1], barrier, op, ranks[i]);
BroadcastMultiProcessCorrectnessTest::ComputeExpectedResults(*datasets[2], root, ranks[i]);
ReduceMultiProcessCorrectnessTest::ComputeExpectedResults(*datasets[3], barrier, op, root, ranks[i]);
ReduceScatterMultiProcessCorrectnessTest::ComputeExpectedResults(*datasets[4], barrier, op, ranks[i]);
for (int j = 0; j < datasets.size(); j++)
{
FillDatasetWithPattern(*datasets[j], ranks[i]);
}
}
int const root = 0;
AllGatherMultiProcessCorrectnessTest::ComputeExpectedResults(*datasets[0], barrier, numDevices, ranks);
AllReduceMultiProcessCorrectnessTest::ComputeExpectedResults(*datasets[1], barrier, op, ranks);
BroadcastMultiProcessCorrectnessTest::ComputeExpectedResults(*datasets[2], root, ranks);
ReduceMultiProcessCorrectnessTest::ComputeExpectedResults(*datasets[3], barrier, op, root, ranks);
ReduceScatterMultiProcessCorrectnessTest::ComputeExpectedResults(*datasets[4], barrier, op, ranks);
barrier.Wait();
ncclGroupStart();
// AllGather
size_t const byteCount = datasets[0]->NumBytes() / numDevices;
size_t const elemCount = numElements / numDevices;
ncclGroupStart();
// AllGather
for (int i = 0; i < ranks.size(); i++)
{
int rank = ranks[i];
@@ -90,7 +106,7 @@ namespace CorrectnessTests
{
int rank = ranks[i];
ncclReduceScatter(datasets[4]->inputs[rank],
(int8_t *)datasets[4]->outputs[rank] + (i * byteCount),
(int8_t *)datasets[4]->outputs[rank] + (rank * byteCount),
elemCount, dataType, op,
comms[rank], streams[rank]);
}
@@ -108,7 +124,11 @@ namespace CorrectnessTests
{
for (int j = 0; j < ranks.size(); j++)
{
ValidateResults(*datasets[i], ranks[j]);
pass = ValidateResults(*datasets[i], ranks[j], root);
if (!pass)
{
break;
}
}
barrier.Wait();
for (int j = 0; j < ranks.size(); j++)
+13 -49
Voir le fichier
@@ -10,63 +10,27 @@ namespace CorrectnessTests
{
TEST_P(ReduceMultiProcessCorrectnessTest, Correctness)
{
Dataset* dataset = (Dataset*)mmap(NULL, sizeof(Dataset), PROT_READ|PROT_WRITE, MAP_SHARED|MAP_ANONYMOUS, -1, 0);
dataset->InitializeRootProcess(numDevices, numElements, dataType, inPlace, ncclCollReduce);
Barrier::ClearShmFiles(std::atoi(getenv("NCCL_COMM_ID")));
std::vector<int> pids(numDevices);
int pid1 = 0;
int pid2 = 0;
int pid3 = 0;
pid1 = fork();
// From this point on, ignore original process as we cannot have it create a HIP context
if (pid1 == 0)
int gpu = -1;
for (int i = 0; i < numDevices; i++)
{
pid2 = fork();
if (numDevices > 2)
gpu++;
int pid = fork();
if (pid == 0)
{
pid3 = fork();
}
if ((pid2 > 0 && pid3 == 0 && numDevices == 2) || (pid2 > 0 && pid3 > 0 && numDevices > 2))
{
// Process 0
TestReduce(0, *dataset);
if (pid3 > 0)
{
waitpid(pid3, NULL, 0);
}
}
else if ((pid2 == 0 && pid3 == 0 && numDevices == 2) || (pid2 == 0 && pid3 > 0 && numDevices > 2))
{
// Process 1
TestReduce(1, *dataset);
if (numDevices > 2)
{
waitpid(pid3, NULL, 0);
}
exit(0);
}
else if (pid2 > 0 && pid3 == 0 && numDevices > 2)
{
// Process 2 (available when numDevices > 2)
TestReduce(2, *dataset);
exit(0);
}
else if (pid2 == 0 && pid3 == 0 && numDevices == 4)
{
// Process 3 (available when numDevices == 4)
TestReduce(3, *dataset);
exit(0);
bool pass;
TestReduce(gpu, *dataset, pass);
TerminateChildProcess(pass);
}
else
{
exit(0);
pids[gpu] = pid;
}
waitpid(pid2, NULL, 0);
exit(0);
}
waitpid(pid1, NULL, 0);
munmap(dataset, sizeof(Dataset));
ValidateProcesses(pids);
}
INSTANTIATE_TEST_SUITE_P(ReduceMultiProcessCorrectnessSweep,
@@ -88,7 +52,7 @@ namespace CorrectnessTests
// Number of elements
testing::Values(1024, 1048576),
// Number of devices
testing::Values(2,3,4),
testing::Values(2,3,4,8),
// In-place or not
testing::Values(false, true),
testing::Values("")),
+69 -53
Voir le fichier
@@ -13,77 +13,93 @@ namespace CorrectnessTests
class ReduceMultiProcessCorrectnessTest : public MultiProcessCorrectnessTest
{
public:
static void ComputeExpectedResults(Dataset& dataset, Barrier& barrier, ncclRedOp_t const op, int const root, int const rank)
static void ComputeExpectedResults(Dataset& dataset, Barrier& barrier, ncclRedOp_t const op, int const root, std::vector<int> const& ranks)
{
// Copy all inputs to expected arrays temporarily to perform reduction on host
HIP_CALL(hipMemcpy(dataset.expected[rank], dataset.inputs[rank],
dataset.NumBytes(), hipMemcpyDeviceToHost));
for (int i = 0; i < ranks.size(); i++)
{
int rank = ranks[i];
HIP_CALL(hipMemcpy(dataset.expected[rank], dataset.inputs[rank],
dataset.NumBytes(), hipMemcpyDeviceToHost));
}
barrier.Wait();
if (rank == root)
for (int h = 0; h < ranks.size(); h++)
{
// Allocate temporary host array to accumulate results
int8_t* resultI1 = (int8_t *)malloc(dataset.NumBytes());
uint8_t* resultU1 = (uint8_t *)resultI1;
int32_t* resultI4 = (int32_t *)resultI1;
uint32_t* resultU4 = (uint32_t *)resultI1;
int64_t* resultI8 = (int64_t *)resultI1;
uint64_t* resultU8 = (uint64_t *)resultI1;
float* resultF4 = (float *)resultI1;
double* resultF8 = (double *)resultI1;
rccl_bfloat16* resultB2 = (rccl_bfloat16 *)resultI1;
// Initialize the result with the first device's array
memcpy(resultI1, dataset.expected[0], dataset.NumBytes());
// Perform reduction on the other device arrays
for (int i = 1; i < dataset.numDevices; i++)
int rank = ranks[h];
if (rank == root)
{
int8_t* arrayI1 = (int8_t *)dataset.expected[i];
uint8_t* arrayU1 = (uint8_t *)arrayI1;
int32_t* arrayI4 = (int32_t *)arrayI1;
uint32_t* arrayU4 = (uint32_t *)arrayI1;
int64_t* arrayI8 = (int64_t *)arrayI1;
uint64_t* arrayU8 = (uint64_t *)arrayI1;
float* arrayF4 = (float *)arrayI1;
double* arrayF8 = (double *)arrayI1;
rccl_bfloat16* arrayB2 = (rccl_bfloat16 *)arrayI1;
// Allocate temporary host array to accumulate results
int8_t* resultI1 = (int8_t *)malloc(dataset.NumBytes());
uint8_t* resultU1 = (uint8_t *)resultI1;
int32_t* resultI4 = (int32_t *)resultI1;
uint32_t* resultU4 = (uint32_t *)resultI1;
int64_t* resultI8 = (int64_t *)resultI1;
uint64_t* resultU8 = (uint64_t *)resultI1;
float* resultF4 = (float *)resultI1;
double* resultF8 = (double *)resultI1;
rccl_bfloat16* resultB2 = (rccl_bfloat16 *)resultI1;
for (int j = 0; j < dataset.numElements; j++)
// Initialize the result with the first device's array
memcpy(resultI1, dataset.expected[0], dataset.NumBytes());
// Perform reduction on the other device arrays
for (int i = 1; i < dataset.numDevices; i++)
{
switch (dataset.dataType)
int8_t* arrayI1 = (int8_t *)dataset.expected[i];
uint8_t* arrayU1 = (uint8_t *)arrayI1;
int32_t* arrayI4 = (int32_t *)arrayI1;
uint32_t* arrayU4 = (uint32_t *)arrayI1;
int64_t* arrayI8 = (int64_t *)arrayI1;
uint64_t* arrayU8 = (uint64_t *)arrayI1;
float* arrayF4 = (float *)arrayI1;
double* arrayF8 = (double *)arrayI1;
rccl_bfloat16* arrayB2 = (rccl_bfloat16 *)arrayI1;
for (int j = 0; j < dataset.numElements; j++)
{
case ncclInt8: resultI1[j] = ReduceOp(op, resultI1[j], arrayI1[j]); break;
case ncclUint8: resultU1[j] = ReduceOp(op, resultU1[j], arrayU1[j]); break;
case ncclInt32: resultI4[j] = ReduceOp(op, resultI4[j], arrayI4[j]); break;
case ncclUint32: resultU4[j] = ReduceOp(op, resultU4[j], arrayU4[j]); break;
case ncclInt64: resultI8[j] = ReduceOp(op, resultI8[j], arrayI8[j]); break;
case ncclUint64: resultU8[j] = ReduceOp(op, resultU8[j], arrayU8[j]); break;
case ncclFloat32: resultF4[j] = ReduceOp(op, resultF4[j], arrayF4[j]); break;
case ncclFloat64: resultF8[j] = ReduceOp(op, resultF8[j], arrayF8[j]); break;
case ncclBfloat16: resultB2[j] = ReduceOp(op, resultB2[j], arrayB2[j]); break;
default:
fprintf(stderr, "[ERROR] Unsupported datatype\n");
exit(0);
switch (dataset.dataType)
{
case ncclInt8: resultI1[j] = ReduceOp(op, resultI1[j], arrayI1[j]); break;
case ncclUint8: resultU1[j] = ReduceOp(op, resultU1[j], arrayU1[j]); break;
case ncclInt32: resultI4[j] = ReduceOp(op, resultI4[j], arrayI4[j]); break;
case ncclUint32: resultU4[j] = ReduceOp(op, resultU4[j], arrayU4[j]); break;
case ncclInt64: resultI8[j] = ReduceOp(op, resultI8[j], arrayI8[j]); break;
case ncclUint64: resultU8[j] = ReduceOp(op, resultU8[j], arrayU8[j]); break;
case ncclFloat32: resultF4[j] = ReduceOp(op, resultF4[j], arrayF4[j]); break;
case ncclFloat64: resultF8[j] = ReduceOp(op, resultF8[j], arrayF8[j]); break;
case ncclBfloat16: resultB2[j] = ReduceOp(op, resultB2[j], arrayB2[j]); break;
default:
fprintf(stderr, "[ERROR] Unsupported datatype\n");
exit(0);
}
}
}
memcpy(dataset.expected[root], resultI1, dataset.NumBytes());
free(resultI1);
}
memcpy(dataset.expected[root], resultI1, dataset.NumBytes());
free(resultI1);
barrier.Wait();
}
else
barrier.Wait();
for (int i = 0; i < ranks.size(); i++)
{
barrier.Wait();
HIP_CALL(hipMemcpy(dataset.expected[rank], dataset.outputs[rank], dataset.NumBytes(), hipMemcpyDeviceToHost));
int rank = ranks[i];
if (rank != root)
{
HIP_CALL(hipMemcpy(dataset.expected[rank], dataset.outputs[rank], dataset.NumBytes(), hipMemcpyDeviceToHost));
}
}
}
void TestReduce(int rank, Dataset& dataset)
void TestReduce(int rank, Dataset& dataset, bool& pass)
{
SetUpPerProcess(rank, ncclCollReduce, comms[rank], streams[rank], dataset);
if (numDevices > numDevicesAvailable) return;
if (numDevices > numDevicesAvailable)
{
pass = true;
return;
}
Barrier barrier(rank, numDevices, std::atoi(getenv("NCCL_COMM_ID")));
@@ -92,7 +108,7 @@ namespace CorrectnessTests
{
// Prepare input / output / expected results
FillDatasetWithPattern(dataset, rank);
ComputeExpectedResults(dataset, barrier, op, root, rank);
ComputeExpectedResults(dataset, barrier, op, root, std::vector<int>(1, rank));
// Launch the reduction (1 process per GPU)
ncclResult_t res = ncclReduce(dataset.inputs[rank],
dataset.outputs[rank],
@@ -101,7 +117,7 @@ namespace CorrectnessTests
// Wait for reduction to complete
HIP_CALL(hipStreamSynchronize(streams[rank]));
// Check results
ValidateResults(dataset, rank);
pass = ValidateResults(dataset, rank);
// Ensure all processes have finished current iteration before proceeding
barrier.Wait();
}
+13 -49
Voir le fichier
@@ -10,63 +10,27 @@ namespace CorrectnessTests
{
TEST_P(ReduceScatterMultiProcessCorrectnessTest, Correctness)
{
Dataset* dataset = (Dataset*)mmap(NULL, sizeof(Dataset), PROT_READ|PROT_WRITE, MAP_SHARED|MAP_ANONYMOUS, -1, 0);
dataset->InitializeRootProcess(numDevices, numElements, dataType, inPlace, ncclCollReduceScatter);
Barrier::ClearShmFiles(std::atoi(getenv("NCCL_COMM_ID")));
std::vector<int> pids(numDevices);
int pid1 = 0;
int pid2 = 0;
int pid3 = 0;
pid1 = fork();
// From this point on, ignore original process as we cannot have it create a HIP context
if (pid1 == 0)
int gpu = -1;
for (int i = 0; i < numDevices; i++)
{
pid2 = fork();
if (numDevices > 2)
gpu++;
int pid = fork();
if (pid == 0)
{
pid3 = fork();
}
if ((pid2 > 0 && pid3 == 0 && numDevices == 2) || (pid2 > 0 && pid3 > 0 && numDevices > 2))
{
// Process 0
TestReduceScatter(0, *dataset);
if (pid3 > 0)
{
waitpid(pid3, NULL, 0);
}
}
else if ((pid2 == 0 && pid3 == 0 && numDevices == 2) || (pid2 == 0 && pid3 > 0 && numDevices > 2))
{
// Process 1
TestReduceScatter(1, *dataset);
if (numDevices > 2)
{
waitpid(pid3, NULL, 0);
}
exit(0);
}
else if (pid2 > 0 && pid3 == 0 && numDevices > 2)
{
// Process 2 (available when numDevices > 2)
TestReduceScatter(2, *dataset);
exit(0);
}
else if (pid2 == 0 && pid3 == 0 && numDevices == 4)
{
// Process 3 (available when numDevices == 4)
TestReduceScatter(3, *dataset);
exit(0);
bool pass;
TestReduceScatter(gpu, *dataset, pass);
TerminateChildProcess(pass);
}
else
{
exit(0);
pids[gpu] = pid;
}
waitpid(pid2, NULL, 0);
exit(0);
}
waitpid(pid1, NULL, 0);
munmap(dataset, sizeof(Dataset));
ValidateProcesses(pids);
}
INSTANTIATE_TEST_SUITE_P(ReduceScatterMultiProcessCorrectnessSweep,
@@ -88,7 +52,7 @@ namespace CorrectnessTests
// Number of elements
testing::Values(3072, 3145728),
// Number of devices
testing::Values(2,3,4),
testing::Values(2,3,4,8),
// In-place or not
testing::Values(false, true),
testing::Values("")),
+75 -57
Voir le fichier
@@ -13,60 +13,68 @@ namespace CorrectnessTests
class ReduceScatterMultiProcessCorrectnessTest : public MultiProcessCorrectnessTest
{
public:
static void ComputeExpectedResults(Dataset& dataset, Barrier& barrier, ncclRedOp_t const op, int const rank)
static void ComputeExpectedResults(Dataset& dataset, Barrier& barrier, ncclRedOp_t const op, std::vector<int> const& ranks)
{
// Copy all inputs to expected arrays temporarily to perform reduction on host
HIP_CALL(hipMemcpy(dataset.expected[rank], dataset.inputs[rank],
dataset.NumBytes(), hipMemcpyDeviceToHost));
for (int i = 0; i < ranks.size(); i++)
{
int rank = ranks[i];
HIP_CALL(hipMemcpy(dataset.expected[rank], dataset.inputs[rank],
dataset.NumBytes(), hipMemcpyDeviceToHost));
}
barrier.Wait();
// Have rank 0 do the expected calculation, then send results to other processes
int8_t* resultI1;
if (rank == 0)
int8_t* resultI1;
for (int h = 0; h < ranks.size(); h++)
{
// Allocate temporary host array to accumulate results
resultI1 = (int8_t *)malloc(dataset.NumBytes());
uint8_t* resultU1 = (uint8_t *)resultI1;
int32_t* resultI4 = (int32_t *)resultI1;
uint32_t* resultU4 = (uint32_t *)resultI1;
int64_t* resultI8 = (int64_t *)resultI1;
uint64_t* resultU8 = (uint64_t *)resultI1;
float* resultF4 = (float *)resultI1;
double* resultF8 = (double *)resultI1;
rccl_bfloat16* resultB2 = (rccl_bfloat16 *)resultI1;
// Initialize the result with the first device's array
memcpy(resultI1, dataset.expected[0], dataset.NumBytes());
// Perform reduction on the other device arrays
for (int i = 1; i < dataset.numDevices; i++)
int rank = ranks[h];
if (rank == 0)
{
int8_t* arrayI1 = (int8_t *)dataset.expected[i];
uint8_t* arrayU1 = (uint8_t *)arrayI1;
int32_t* arrayI4 = (int32_t *)arrayI1;
uint32_t* arrayU4 = (uint32_t *)arrayI1;
int64_t* arrayI8 = (int64_t *)arrayI1;
uint64_t* arrayU8 = (uint64_t *)arrayI1;
float* arrayF4 = (float *)arrayI1;
double* arrayF8 = (double *)arrayI1;
rccl_bfloat16* arrayB2 = (rccl_bfloat16 *)arrayI1;
// Allocate temporary host array to accumulate results
resultI1 = (int8_t *)malloc(dataset.NumBytes());
uint8_t* resultU1 = (uint8_t *)resultI1;
int32_t* resultI4 = (int32_t *)resultI1;
uint32_t* resultU4 = (uint32_t *)resultI1;
int64_t* resultI8 = (int64_t *)resultI1;
uint64_t* resultU8 = (uint64_t *)resultI1;
float* resultF4 = (float *)resultI1;
double* resultF8 = (double *)resultI1;
rccl_bfloat16* resultB2 = (rccl_bfloat16 *)resultI1;
for (int j = 0; j < dataset.numElements; j++)
// Initialize the result with the first device's array
memcpy(resultI1, dataset.expected[0], dataset.NumBytes());
// Perform reduction on the other device arrays
for (int i = 1; i < dataset.numDevices; i++)
{
switch (dataset.dataType)
int8_t* arrayI1 = (int8_t *)dataset.expected[i];
uint8_t* arrayU1 = (uint8_t *)arrayI1;
int32_t* arrayI4 = (int32_t *)arrayI1;
uint32_t* arrayU4 = (uint32_t *)arrayI1;
int64_t* arrayI8 = (int64_t *)arrayI1;
uint64_t* arrayU8 = (uint64_t *)arrayI1;
float* arrayF4 = (float *)arrayI1;
double* arrayF8 = (double *)arrayI1;
rccl_bfloat16* arrayB2 = (rccl_bfloat16 *)arrayI1;
for (int j = 0; j < dataset.numElements; j++)
{
case ncclInt8: resultI1[j] = ReduceOp(op, resultI1[j], arrayI1[j]); break;
case ncclUint8: resultU1[j] = ReduceOp(op, resultU1[j], arrayU1[j]); break;
case ncclInt32: resultI4[j] = ReduceOp(op, resultI4[j], arrayI4[j]); break;
case ncclUint32: resultU4[j] = ReduceOp(op, resultU4[j], arrayU4[j]); break;
case ncclInt64: resultI8[j] = ReduceOp(op, resultI8[j], arrayI8[j]); break;
case ncclUint64: resultU8[j] = ReduceOp(op, resultU8[j], arrayU8[j]); break;
case ncclFloat32: resultF4[j] = ReduceOp(op, resultF4[j], arrayF4[j]); break;
case ncclFloat64: resultF8[j] = ReduceOp(op, resultF8[j], arrayF8[j]); break;
case ncclBfloat16: resultB2[j] = ReduceOp(op, resultB2[j], arrayB2[j]); break;
default:
fprintf(stderr, "[ERROR] Unsupported datatype\n");
exit(0);
switch (dataset.dataType)
{
case ncclInt8: resultI1[j] = ReduceOp(op, resultI1[j], arrayI1[j]); break;
case ncclUint8: resultU1[j] = ReduceOp(op, resultU1[j], arrayU1[j]); break;
case ncclInt32: resultI4[j] = ReduceOp(op, resultI4[j], arrayI4[j]); break;
case ncclUint32: resultU4[j] = ReduceOp(op, resultU4[j], arrayU4[j]); break;
case ncclInt64: resultI8[j] = ReduceOp(op, resultI8[j], arrayI8[j]); break;
case ncclUint64: resultU8[j] = ReduceOp(op, resultU8[j], arrayU8[j]); break;
case ncclFloat32: resultF4[j] = ReduceOp(op, resultF4[j], arrayF4[j]); break;
case ncclFloat64: resultF8[j] = ReduceOp(op, resultF8[j], arrayF8[j]); break;
case ncclBfloat16: resultB2[j] = ReduceOp(op, resultB2[j], arrayB2[j]); break;
default:
fprintf(stderr, "[ERROR] Unsupported datatype\n");
exit(0);
}
}
}
}
@@ -75,34 +83,44 @@ namespace CorrectnessTests
// Copy results into expected arrays
size_t const byteCount = dataset.NumBytes() / dataset.numDevices;
HIP_CALL(hipMemcpy(dataset.expected[rank], dataset.outputs[rank],
dataset.NumBytes(), hipMemcpyDeviceToHost));
for (int i = 0; i < ranks.size(); i++)
{
int rank = ranks[i];
HIP_CALL(hipMemcpy(dataset.expected[rank], dataset.outputs[rank],
dataset.NumBytes(), hipMemcpyDeviceToHost));
}
barrier.Wait();
if (rank == 0)
for (int h = 0; h < ranks.size(); h++)
{
for (int i = 0; i < dataset.numDevices; i++)
memcpy((int8_t *)dataset.expected[i] + (i * byteCount),
resultI1 + (i * byteCount), byteCount);
int rank = ranks[h];
if (rank == 0)
{
for (int i = 0; i < dataset.numDevices; i++)
memcpy((int8_t *)dataset.expected[i] + (i * byteCount),
resultI1 + (i * byteCount), byteCount);
free(resultI1);
free(resultI1);
}
}
}
void TestReduceScatter(int rank, Dataset& dataset)
void TestReduceScatter(int rank, Dataset& dataset, bool& pass)
{
// Prepare input / output / expected results
SetUpPerProcess(rank, ncclCollAllGather, comms[rank], streams[rank], dataset);
if (numDevices > numDevicesAvailable) return;
if (numElements % numDevices != 0) return;
if (numDevices > numDevicesAvailable || numElements % numDevices != 0)
{
pass = true;
return;
}
Barrier barrier(rank, numDevices, std::atoi(getenv("NCCL_COMM_ID")));
// Prepare input / output / expected results
FillDatasetWithPattern(dataset, rank);
ComputeExpectedResults(dataset, barrier, op, rank);
ComputeExpectedResults(dataset, barrier, op, std::vector<int>(1, rank));
size_t const byteCount = dataset.NumBytes() / numDevices;
size_t const recvCount = dataset.numElements / numDevices;
@@ -117,7 +135,7 @@ namespace CorrectnessTests
HIP_CALL(hipStreamSynchronize(streams[rank]));
// Check results
ValidateResults(dataset, rank);
pass = ValidateResults(dataset, rank);
TearDownPerProcess(comms[rank], streams[rank]);
dataset.Release(rank);
+13 -49
Voir le fichier
@@ -10,63 +10,27 @@ namespace CorrectnessTests
{
TEST_P(ScatterMultiProcessCorrectnessTest, Correctness)
{
Dataset* dataset = (Dataset*)mmap(NULL, sizeof(Dataset), PROT_READ|PROT_WRITE, MAP_SHARED|MAP_ANONYMOUS, -1, 0);
dataset->InitializeRootProcess(numDevices, numElements, dataType, inPlace, ncclCollScatter);
Barrier::ClearShmFiles(std::atoi(getenv("NCCL_COMM_ID")));
std::vector<int> pids(numDevices);
int pid1 = 0;
int pid2 = 0;
int pid3 = 0;
pid1 = fork();
// From this point on, ignore original process as we cannot have it create a HIP context
if (pid1 == 0)
int gpu = -1;
for (int i = 0; i < numDevices; i++)
{
pid2 = fork();
if (numDevices > 2)
gpu++;
int pid = fork();
if (pid == 0)
{
pid3 = fork();
}
if ((pid2 > 0 && pid3 == 0 && numDevices == 2) || (pid2 > 0 && pid3 > 0 && numDevices > 2))
{
// Process 0
TestScatter(0, *dataset);
if (pid3 > 0)
{
waitpid(pid3, NULL, 0);
}
}
else if ((pid2 == 0 && pid3 == 0 && numDevices == 2) || (pid2 == 0 && pid3 > 0 && numDevices > 2))
{
// Process 1
TestScatter(1, *dataset);
if (numDevices > 2)
{
waitpid(pid3, NULL, 0);
}
exit(0);
}
else if (pid2 > 0 && pid3 == 0 && numDevices > 2)
{
// Process 2 (available when numDevices > 2)
TestScatter(2, *dataset);
exit(0);
}
else if (pid2 == 0 && pid3 == 0 && numDevices == 4)
{
// Process 3 (available when numDevices == 4)
TestScatter(3, *dataset);
exit(0);
bool pass;
TestScatter(gpu, *dataset, pass);
TerminateChildProcess(pass);
}
else
{
exit(0);
pids[gpu] = pid;
}
waitpid(pid2, NULL, 0);
exit(0);
}
waitpid(pid1, NULL, 0);
munmap(dataset, sizeof(Dataset));
ValidateProcesses(pids);
}
INSTANTIATE_TEST_SUITE_P(ScatterMultiProcessCorrectnessSweep,
@@ -88,7 +52,7 @@ namespace CorrectnessTests
// Number of elements
testing::Values(1024, 1048576),
// Number of devices
testing::Values(2,3,4),
testing::Values(2,3,4,8),
// In-place or not
testing::Values(false),
testing::Values("RCCL_ALLTOALL_KERNEL_DISABLE=0", "RCCL_ALLTOALL_KERNEL_DISABLE=1")),
+7 -3
Voir le fichier
@@ -23,12 +23,16 @@ namespace CorrectnessTests
}
}
void TestScatter(int rank, Dataset& dataset)
void TestScatter(int rank, Dataset& dataset, bool& pass)
{
// Prepare input / output / expected results
SetUpPerProcess(rank, ncclCollScatter, comms[rank], streams[rank], dataset);
if (numDevices > numDevicesAvailable) return;
if (numDevices > numDevicesAvailable)
{
pass = true;
return;
}
Barrier barrier(rank, numDevices, std::atoi(getenv("NCCL_COMM_ID")));
@@ -50,7 +54,7 @@ namespace CorrectnessTests
HIP_CALL(hipStreamSynchronize(streams[rank]));
// Check results
ValidateResults(dataset, rank);
pass = ValidateResults(dataset, rank);
barrier.Wait();
}