diff --git a/projects/rccl/install.sh b/projects/rccl/install.sh index 9c95219e31..fd495513b0 100755 --- a/projects/rccl/install.sh +++ b/projects/rccl/install.sh @@ -200,10 +200,10 @@ if ($run_tests); then if (test -f "./test/UnitTests"); then if ($run_tests_all); then ./test/UnitTests - NCCL_COMM_ID=$HOSTNAME:55512 ./test/UnitTestsMultiProcess + ./test/UnitTestsMultiProcess else ./test/UnitTests --gtest_filter="BroadcastCorrectnessSweep*:*float32*" - NCCL_COMM_ID=$HOSTNAME:55512 ./test/UnitTestsMultiProcess --gtest_filter="BroadcastMultiProcessCorrectnessSweep*:*float32*" + ./test/UnitTestsMultiProcess --gtest_filter="BroadcastMultiProcessCorrectnessSweep*:*float32*" fi else echo "Unit tests have not been built yet; please re-run script with -t to build unit tests." diff --git a/projects/rccl/src/clique/CliqueManager.cc b/projects/rccl/src/clique/CliqueManager.cc index d99f332dee..06dc8f539c 100644 --- a/projects/rccl/src/clique/CliqueManager.cc +++ b/projects/rccl/src/clique/CliqueManager.cc @@ -40,6 +40,7 @@ THE SOFTWARE. #include #include #include +#include cliqueDevicePtrs_t CliqueManager::m_staticCliquePtrs[NCCL_MAX_OPS] = {}; int* CliqueManager::m_staticGpuBarrierMem = NULL; @@ -502,28 +503,36 @@ void CliqueManager::WaitForBarrier() ncclResult_t CliqueManager::BootstrapRootInit(int pid, unsigned long hash) { - for (auto it = CliqueShmNames.begin(); it != CliqueShmNames.end(); it++) + if (rcclParamEnableClique()) { - int msgid, fd; - std::string msgQueueName = "/tmp/" + it->second + std::to_string(hash) + "_" + std::to_string(pid); - SYSCHECKVAL(open(msgQueueName.c_str(), O_CREAT | O_RDWR, 0606), "open", fd); - NCCLCHECK(MsgQueueGetId(msgQueueName, hash, true, msgid)); - SYSCHECK(close(fd), "close"); + for (auto it = CliqueShmNames.begin(); it != CliqueShmNames.end(); it++) + { + int msgid, fd; + std::string msgQueueName = "/tmp/" + it->second + std::to_string(hash) + "_" + std::to_string(pid); + SYSCHECKVAL(open(msgQueueName.c_str(), O_CREAT | O_RDWR, 0606), "open", fd); + NCCLCHECK(MsgQueueGetId(msgQueueName, hash, true, msgid)); + SYSCHECK(unlink(msgQueueName.c_str()), "unlink"); + SYSCHECK(close(fd), "close"); + } + + std::string shmDir = "/dev/shm/"; + + for (auto it = CliqueShmNames.begin(); it != CliqueShmNames.end(); it++) + { + struct stat fileStatus; + std::string shmFileName = it->second + std::to_string(hash) + "_" + std::to_string(pid); + std::string shmFullPath = shmDir + shmFileName; + + // Check if shm file already exists; if so, unlink it + if (stat(shmFullPath.c_str(), &fileStatus) == 0) + { + NCCLCHECK(shmUnlink(shmFileName.c_str())); + } + } } - - std::string shmDir = "/dev/shm/"; - - for (auto it = CliqueShmNames.begin(); it != CliqueShmNames.end(); it++) + else { - struct stat fileStatus; - std::string shmFileName = it->second + std::to_string(hash) + "_" + std::to_string(pid); - std::string shmFullPath = shmDir + shmFileName; - - // Check if shm file already exists; if so, unlink it - if (stat(shmFullPath.c_str(), &fileStatus) == 0) - { - NCCLCHECK(shmUnlink(shmFileName.c_str())); - } + INFO(NCCL_INIT, "Not performing bootstrap root for clique kernels as clique mode not enabled."); } return ncclSuccess; } diff --git a/projects/rccl/test/CorrectnessTest.hpp b/projects/rccl/test/CorrectnessTest.hpp index b440db9d94..fd7187a810 100644 --- a/projects/rccl/test/CorrectnessTest.hpp +++ b/projects/rccl/test/CorrectnessTest.hpp @@ -702,13 +702,18 @@ namespace CorrectnessTests class MultiProcessCorrectnessTest : public CorrectnessTest { protected: + // IMPORTANT: We cannot have any HIP API calls in the parent process. + // Do any HIP setup in SetupPerProcess(). void SetUp() override { - // Check for NCCL_COMM_ID env variable (otherwise will not init) + // Check if NCCL_COMM_ID is already set; if not, set it now if (!getenv("NCCL_COMM_ID")) { - printf("Must set NCCL_COMM_ID prior to execution\n"); - exit(0); + char hostname[HOST_NAME_MAX+1]; + gethostname(hostname, HOST_NAME_MAX+1); + std::string hostnameString(hostname); + hostnameString.append(":55513"); + setenv("NCCL_COMM_ID", hostnameString.c_str(), 0); } // Make the test tuple parameters accessible @@ -737,10 +742,14 @@ namespace CorrectnessTests comms.resize(numDevices); streams.resize(numDevices); + dataset = (Dataset*)mmap(NULL, sizeof(Dataset), PROT_READ|PROT_WRITE, MAP_SHARED|MAP_ANONYMOUS, -1, 0); + Barrier::ClearShmFiles(std::atoi(getenv("NCCL_COMM_ID"))); } void TearDown() override { + munmap(dataset, sizeof(Dataset)); + // Restore env vars after tests for (int i = 0; i < numTokens/2; i++) { if (savedEnv[i]) { @@ -773,9 +782,11 @@ namespace CorrectnessTests // Only proceed with testing if there are enough GPUs if (numDevices > numDevicesAvailable) { - fprintf(stdout, "[ SKIPPED ] Test requires %d devices (only %d available)\n", - numDevices, numDevicesAvailable); - + if (rank == 0) + { + fprintf(stdout, "[ SKIPPED ] Test requires %d devices (only %d available)\n", + numDevices, numDevicesAvailable); + } // Modify the number of devices so that tear-down doesn't occur // This is temporary until GTEST_SKIP() becomes available numDevices = 0; @@ -795,7 +806,7 @@ namespace CorrectnessTests if (res != ncclSuccess) { printf("Test failure:%s %d '%s' numRanks:%d\n", __FILE__,__LINE__,ncclGetErrorString(res), numDevices); - ASSERT_EQ(res, hipSuccess); + ASSERT_EQ(res, ncclSuccess); } } @@ -803,17 +814,22 @@ namespace CorrectnessTests void SetUpPerProcess(int rank, ncclFunc_t const func, ncclComm_t& comm, hipStream_t& stream, Dataset& dataset) { SetUpPerProcessHelper(rank, comm, stream); - dataset.Initialize(numDevices, numElements, dataType, inPlace, func, rank); + if (numDevices <= numDevicesAvailable) + { + dataset.Initialize(numDevices, numElements, dataType, inPlace, func, rank); + } } // To be called by each process/rank individually (see GroupCallsMultiProcess) void SetUpPerProcess(int rank, std::vector const& func, ncclComm_t& comm, hipStream_t& stream, std::vector& datasets) { SetUpPerProcessHelper(rank, comm, stream); - - for (int i = 0; i < datasets.size(); i++) + if (numDevices <= numDevicesAvailable) { - datasets[i]->Initialize(numDevices, numElements, dataType, inPlace, func[i], rank); + for (int i = 0; i < datasets.size(); i++) + { + datasets[i]->Initialize(numDevices, numElements, dataType, inPlace, func[i], rank); + } } } @@ -875,7 +891,7 @@ namespace CorrectnessTests free(arrayI1); } - void ValidateResults(Dataset const& dataset, int rank, int root = 0) const + bool ValidateResults(Dataset const& dataset, int rank, int root = 0) const { int8_t* outputI1 = (int8_t *)malloc(dataset.NumBytes(ncclOutputBuffer)); uint8_t* outputU1 = (uint8_t *)outputI1; @@ -894,8 +910,11 @@ namespace CorrectnessTests // only output on root rank is valid for gather collective if (dataset.function == ncclCollGather && rank != root) - return; - HIP_CALL(hipMemcpy(outputI1, dataset.outputs[rank], dataset.NumBytes(ncclOutputBuffer), hipMemcpyDeviceToHost)); + return true; + + hipError_t err = hipMemcpy(outputI1, dataset.outputs[rank], dataset.NumBytes(ncclOutputBuffer), hipMemcpyDeviceToHost); + if (err != hipSuccess) + return false; int8_t* expectedI1 = (int8_t *)dataset.expected[rank]; uint8_t* expectedU1 = (uint8_t *)expectedI1; @@ -953,8 +972,35 @@ namespace CorrectnessTests } } } - ASSERT_EQ(isMatch, true); + return isMatch; } + + void ValidateProcesses(std::vector const& pids) + { + int numProcesses = pids.size(); + int status[numProcesses]; + for (int i = 0; i < numProcesses; i++) + { + waitpid(pids[i], &status[i], 0); + + ASSERT_NE(WIFEXITED(status[i]), 0) << "[ERROR] Child process " << i << " did not exit cleanly."; + ASSERT_EQ(WEXITSTATUS(status[i]), EXIT_SUCCESS) << "[ERROR] Child process " << i << " had a test failure."; + } + } + + void TerminateChildProcess(bool const pass) + { + if (pass) + { + exit(EXIT_SUCCESS); + } + else + { + exit(EXIT_FAILURE); + } + } + + Dataset* dataset; }; std::string GenerateTestNameString(testing::TestParamInfo& info); diff --git a/projects/rccl/test/test_AllGatherMultiProcess.cpp b/projects/rccl/test/test_AllGatherMultiProcess.cpp index 370f12cc71..1a41298485 100644 --- a/projects/rccl/test/test_AllGatherMultiProcess.cpp +++ b/projects/rccl/test/test_AllGatherMultiProcess.cpp @@ -9,63 +9,27 @@ namespace CorrectnessTests { TEST_P(AllGatherMultiProcessCorrectnessTest, Correctness) { - Dataset* dataset = (Dataset*)mmap(NULL, sizeof(Dataset), PROT_READ|PROT_WRITE, MAP_SHARED|MAP_ANONYMOUS, -1, 0); dataset->InitializeRootProcess(numDevices, numElements, dataType, inPlace, ncclCollAllGather); - Barrier::ClearShmFiles(std::atoi(getenv("NCCL_COMM_ID"))); + std::vector pids(numDevices); - int pid1 = 0; - int pid2 = 0; - int pid3 = 0; - pid1 = fork(); - - // From this point on, ignore original process as we cannot have it create a HIP context - if (pid1 == 0) + int gpu = -1; + for (int i = 0; i < numDevices; i++) { - pid2 = fork(); - if (numDevices > 2) + gpu++; + int pid = fork(); + if (pid == 0) { - pid3 = fork(); - } - if ((pid2 > 0 && pid3 == 0 && numDevices == 2) || (pid2 > 0 && pid3 > 0 && numDevices > 2)) - { - // Process 0 - TestAllGather(0, *dataset); - if (pid3 > 0) - { - waitpid(pid3, NULL, 0); - } - } - else if ((pid2 == 0 && pid3 == 0 && numDevices == 2) || (pid2 == 0 && pid3 > 0 && numDevices > 2)) - { - // Process 1 - TestAllGather(1, *dataset); - if (numDevices > 2) - { - waitpid(pid3, NULL, 0); - } - exit(0); - } - else if (pid2 > 0 && pid3 == 0 && numDevices > 2) - { - // Process 2 (available when numDevices > 2) - TestAllGather(2, *dataset); - exit(0); - } - else if (pid2 == 0 && pid3 == 0 && numDevices == 4) - { - // Process 3 (available when numDevices == 4) - TestAllGather(3, *dataset); - exit(0); + bool pass; + TestAllGather(gpu, *dataset, pass); + TerminateChildProcess(pass); } else { - exit(0); + pids[gpu] = pid; } - waitpid(pid2, NULL, 0); - exit(0); } - waitpid(pid1, NULL, 0); - munmap(dataset, sizeof(Dataset)); + + ValidateProcesses(pids); } INSTANTIATE_TEST_SUITE_P(AllGatherMultiProcessCorrectnessSweep, @@ -87,7 +51,7 @@ namespace CorrectnessTests // Number of elements testing::Values(3072, 3145728), // Number of devices - testing::Values(2,3,4), + testing::Values(2,3,4,8), // In-place or not testing::Values(false, true), testing::Values("")), diff --git a/projects/rccl/test/test_AllGatherMultiProcess.hpp b/projects/rccl/test/test_AllGatherMultiProcess.hpp index b881d4b7b9..46e4504d5d 100644 --- a/projects/rccl/test/test_AllGatherMultiProcess.hpp +++ b/projects/rccl/test/test_AllGatherMultiProcess.hpp @@ -13,38 +13,50 @@ namespace CorrectnessTests class AllGatherMultiProcessCorrectnessTest : public MultiProcessCorrectnessTest { public: - static void ComputeExpectedResults(Dataset& dataset, Barrier& barrier, int const rank, int const numDevices) + static void ComputeExpectedResults(Dataset& dataset, Barrier& barrier, int const numDevices, std::vector const& ranks) { size_t const byteCount = dataset.NumBytes() / dataset.numDevices; - HIP_CALL(hipMemcpy(static_cast(dataset.expected[0]) + rank * byteCount, (int8_t *)dataset.inputs[rank] + (rank * byteCount), - byteCount, hipMemcpyDeviceToHost)); - - barrier.Wait(); - // Rank 0 sends answer to other ranks - if (rank == 0) + for (int i = 0; i < ranks.size(); i++) { - for (int i = 0; i < dataset.numDevices; i++) + int rank = ranks[i]; + HIP_CALL(hipMemcpy(static_cast(dataset.expected[0]) + rank * byteCount, (int8_t *)dataset.inputs[rank] + (rank * byteCount), + byteCount, hipMemcpyDeviceToHost)); + } + barrier.Wait(); + + // Rank 0 sends answer to other ranks + for (int i = 0; i < ranks.size(); i++) + { + int rank = ranks[i]; + if (rank == 0) { - if (i == rank) continue; - memcpy(dataset.expected[i], dataset.expected[0], dataset.NumBytes()); + for (int i = 0; i < dataset.numDevices; i++) + { + if (i == rank) continue; + memcpy(dataset.expected[i], dataset.expected[0], dataset.NumBytes()); + } } } } - void TestAllGather(int rank, Dataset& dataset) + void TestAllGather(int rank, Dataset& dataset, bool& pass) { // Prepare input / output / expected results SetUpPerProcess(rank, ncclCollAllGather, comms[rank], streams[rank], dataset); - if (numDevices > numDevicesAvailable) return; - if (numElements % numDevices != 0) return; + if (numDevices > numDevicesAvailable || numElements % numDevices != 0) + { + pass = true; + return; + } Barrier barrier(rank, numDevices, std::atoi(getenv("NCCL_COMM_ID"))); // Prepare input / output / expected results FillDatasetWithPattern(dataset, rank); - ComputeExpectedResults(dataset, barrier, rank, numDevices); + + ComputeExpectedResults(dataset, barrier, numDevices, std::vector(1, rank)); size_t const byteCount = dataset.NumBytes() / numDevices; size_t const sendCount = dataset.numElements / numDevices; @@ -58,7 +70,7 @@ namespace CorrectnessTests HIP_CALL(hipStreamSynchronize(streams[rank])); // Check results - ValidateResults(dataset, rank); + pass = ValidateResults(dataset, rank); TearDownPerProcess(comms[rank], streams[rank]); dataset.Release(rank); diff --git a/projects/rccl/test/test_AllReduceMultiProcess.cpp b/projects/rccl/test/test_AllReduceMultiProcess.cpp index 2a5500e629..34ea532f40 100644 --- a/projects/rccl/test/test_AllReduceMultiProcess.cpp +++ b/projects/rccl/test/test_AllReduceMultiProcess.cpp @@ -10,63 +10,27 @@ namespace CorrectnessTests { TEST_P(AllReduceMultiProcessCorrectnessTest, Correctness) { - Dataset* dataset = (Dataset*)mmap(NULL, sizeof(Dataset), PROT_READ|PROT_WRITE, MAP_SHARED|MAP_ANONYMOUS, -1, 0); dataset->InitializeRootProcess(numDevices, numElements, dataType, inPlace, ncclCollAllReduce); - Barrier::ClearShmFiles(std::atoi(getenv("NCCL_COMM_ID"))); + std::vector pids(numDevices); - int pid1 = 0; - int pid2 = 0; - int pid3 = 0; - pid1 = fork(); - - // From this point on, ignore original process as we cannot have it create a HIP context - if (pid1 == 0) + int gpu = -1; + for (int i = 0; i < numDevices; i++) { - pid2 = fork(); - if (numDevices > 2) + gpu++; + int pid = fork(); + if (pid == 0) { - pid3 = fork(); - } - if ((pid2 > 0 && pid3 == 0 && numDevices == 2) || (pid2 > 0 && pid3 > 0 && numDevices > 2)) - { - // Process 0 - TestAllReduce(0, *dataset); - if (pid3 > 0) - { - waitpid(pid3, NULL, 0); - } - } - else if ((pid2 == 0 && pid3 == 0 && numDevices == 2) || (pid2 == 0 && pid3 > 0 && numDevices > 2)) - { - // Process 1 - TestAllReduce(1, *dataset); - if (numDevices > 2) - { - waitpid(pid3, NULL, 0); - } - exit(0); - } - else if (pid2 > 0 && pid3 == 0 && numDevices > 2) - { - // Process 2 (available when numDevices > 2) - TestAllReduce(2, *dataset); - exit(0); - } - else if (pid2 == 0 && pid3 == 0 && numDevices == 4) - { - // Process 3 (available when numDevices == 4) - TestAllReduce(3, *dataset); - exit(0); + bool pass; + TestAllReduce(gpu, *dataset, pass); + TerminateChildProcess(pass); } else { - exit(0); + pids[gpu] = pid; } - waitpid(pid2, NULL, 0); - exit(0); } - waitpid(pid1, NULL, 0); - munmap(dataset, sizeof(Dataset)); + + ValidateProcesses(pids); } INSTANTIATE_TEST_SUITE_P(AllReduceMultiProcessCorrectnessSweep, @@ -88,7 +52,7 @@ namespace CorrectnessTests // Number of elements testing::Values(1024, 1048576), // Number of devices - testing::Values(2,3,4), + testing::Values(2,3,4,8), // In-place or not testing::Values(false, true), testing::Values("")), diff --git a/projects/rccl/test/test_AllReduceMultiProcess.hpp b/projects/rccl/test/test_AllReduceMultiProcess.hpp index 2aa8e27087..f7c00de221 100644 --- a/projects/rccl/test/test_AllReduceMultiProcess.hpp +++ b/projects/rccl/test/test_AllReduceMultiProcess.hpp @@ -13,13 +13,17 @@ namespace CorrectnessTests class AllReduceMultiProcessCorrectnessTest : public MultiProcessCorrectnessTest { public: - static void ComputeExpectedResults(Dataset& dataset, Barrier& barrier, ncclRedOp_t const op, int const rank) + static void ComputeExpectedResults(Dataset& dataset, Barrier& barrier, ncclRedOp_t const op, std::vector const& ranks) { // Copy all inputs to expected arrays temporarily to perform reduction on host - HIP_CALL(hipMemcpy(dataset.expected[rank], dataset.inputs[rank], - dataset.NumBytes(), hipMemcpyDeviceToHost)); - + for (int i = 0; i < ranks.size(); i++) + { + int rank = ranks[i]; + HIP_CALL(hipMemcpy(dataset.expected[rank], dataset.inputs[rank], + dataset.NumBytes(), hipMemcpyDeviceToHost)); + } barrier.Wait(); + // Allocate temporary host array to accumulate results int8_t* resultI1 = (int8_t *)malloc(dataset.NumBytes()); uint8_t* resultU1 = (uint8_t *)resultI1; @@ -68,23 +72,31 @@ namespace CorrectnessTests } } barrier.Wait(); - // Copy results into expected array - memcpy(dataset.expected[rank], resultI1, dataset.NumBytes()); + // Copy results into expected array + for (int i = 0; i < ranks.size(); i++) + { + int rank = ranks[i]; + memcpy(dataset.expected[rank], resultI1, dataset.NumBytes()); + } free(resultI1); } - void TestAllReduce(int rank, Dataset& dataset) + void TestAllReduce(int rank, Dataset& dataset, bool& pass) { SetUpPerProcess(rank, ncclCollAllReduce, comms[rank], streams[rank], dataset); - if (numDevices > numDevicesAvailable) return; + if (numDevices > numDevicesAvailable) + { + pass = true; + return; + } Barrier barrier(rank, numDevices, std::atoi(getenv("NCCL_COMM_ID"))); // Prepare input / output / expected results FillDatasetWithPattern(dataset, rank); - ComputeExpectedResults(dataset, barrier, op, rank); + ComputeExpectedResults(dataset, barrier, op, std::vector(1, rank)); // Launch the reduction ncclAllReduce(dataset.inputs[rank], dataset.outputs[rank], @@ -94,7 +106,7 @@ namespace CorrectnessTests HIP_CALL(hipStreamSynchronize(streams[rank])); // Check results - ValidateResults(dataset, rank); + pass = ValidateResults(dataset, rank); TearDownPerProcess(comms[rank], streams[rank]); dataset.Release(rank); diff --git a/projects/rccl/test/test_AllToAllMultiProcess.cpp b/projects/rccl/test/test_AllToAllMultiProcess.cpp index e607861833..842c1ee9e7 100644 --- a/projects/rccl/test/test_AllToAllMultiProcess.cpp +++ b/projects/rccl/test/test_AllToAllMultiProcess.cpp @@ -10,62 +10,27 @@ namespace CorrectnessTests { TEST_P(AllToAllMultiProcessCorrectnessTest, Correctness) { - Dataset* dataset = (Dataset*)mmap(NULL, sizeof(Dataset), PROT_READ|PROT_WRITE, MAP_SHARED|MAP_ANONYMOUS, -1, 0); dataset->InitializeRootProcess(numDevices, numElements, dataType, inPlace, ncclCollAllToAll); + std::vector pids(numDevices); - int pid1 = 0; - int pid2 = 0; - int pid3 = 0; - pid1 = fork(); - - // From this point on, ignore original process as we cannot have it create a HIP context - if (pid1 == 0) + int gpu = -1; + for (int i = 0; i < numDevices; i++) { - pid2 = fork(); - if (numDevices > 2) + gpu++; + int pid = fork(); + if (pid == 0) { - pid3 = fork(); - } - if ((pid2 > 0 && pid3 == 0 && numDevices == 2) || (pid2 > 0 && pid3 > 0 && numDevices > 2)) - { - // Process 0 - TestAllToAll(0, *dataset); - if (pid3 > 0) - { - waitpid(pid3, NULL, 0); - } - } - else if ((pid2 == 0 && pid3 == 0 && numDevices == 2) || (pid2 == 0 && pid3 > 0 && numDevices > 2)) - { - // Process 1 - TestAllToAll(1, *dataset); - if (numDevices > 2) - { - waitpid(pid3, NULL, 0); - } - exit(0); - } - else if (pid2 > 0 && pid3 == 0 && numDevices > 2) - { - // Process 2 (available when numDevices > 2) - TestAllToAll(2, *dataset); - exit(0); - } - else if (pid2 == 0 && pid3 == 0 && numDevices == 4) - { - // Process 3 (available when numDevices == 4) - TestAllToAll(3, *dataset); - exit(0); + bool pass; + TestAllToAll(gpu, *dataset, pass); + TerminateChildProcess(pass); } else { - exit(0); + pids[gpu] = pid; } - waitpid(pid2, NULL, 0); - exit(0); } - waitpid(pid1, NULL, 0); - munmap(dataset, sizeof(Dataset)); + + ValidateProcesses(pids); } INSTANTIATE_TEST_SUITE_P(AllToAllMultiProcessCorrectnessSweep, @@ -87,7 +52,7 @@ namespace CorrectnessTests // Number of elements testing::Values(1024, 1048576), // Number of devices - testing::Values(2,3,4), + testing::Values(2,3,4,8), // In-place or not testing::Values(false), testing::Values("RCCL_ALLTOALL_KERNEL_DISABLE=0", "RCCL_ALLTOALL_KERNEL_DISABLE=1")), diff --git a/projects/rccl/test/test_AllToAllMultiProcess.hpp b/projects/rccl/test/test_AllToAllMultiProcess.hpp index 8fbd56f8a0..76205608c9 100644 --- a/projects/rccl/test/test_AllToAllMultiProcess.hpp +++ b/projects/rccl/test/test_AllToAllMultiProcess.hpp @@ -13,24 +13,32 @@ namespace CorrectnessTests class AllToAllMultiProcessCorrectnessTest : public MultiProcessCorrectnessTest { public: - static void ComputeExpectedResults(Dataset& dataset, int const rank) + static void ComputeExpectedResults(Dataset& dataset, std::vector const& ranks) { - for (int i = 0; i < dataset.numDevices; i++) + for (int i = 0; i < ranks.size(); i++) { - HIP_CALL(hipMemcpy((int8_t *)dataset.expected[i]+dataset.NumBytes()*rank, (int8_t *)dataset.inputs[rank]+dataset.NumBytes()*i, - dataset.NumBytes(), hipMemcpyDeviceToHost)); + int rank = ranks[i]; + for (int j = 0; j < dataset.numDevices; j++) + { + HIP_CALL(hipMemcpy((int8_t *)dataset.expected[j]+dataset.NumBytes()*rank, (int8_t *)dataset.inputs[rank]+dataset.NumBytes()*j, + dataset.NumBytes(), hipMemcpyDeviceToHost)); + } } } - void TestAllToAll(int rank, Dataset& dataset) + void TestAllToAll(int rank, Dataset& dataset, bool& pass) { SetUpPerProcess(rank, ncclCollAllToAll, comms[rank], streams[rank], dataset); - if (numDevices > numDevicesAvailable) return; + if (numDevices > numDevicesAvailable) + { + pass = true; + return; + } // Prepare input / output / expected results FillDatasetWithPattern(dataset, rank); - ComputeExpectedResults(dataset, rank); + ComputeExpectedResults(dataset, std::vector(1, rank)); // Launch the reduction ncclAllToAll(dataset.inputs[rank], @@ -42,7 +50,7 @@ namespace CorrectnessTests HIP_CALL(hipStreamSynchronize(streams[rank])); // Check results - ValidateResults(dataset, rank); + pass = ValidateResults(dataset, rank); TearDownPerProcess(comms[rank], streams[rank]); dataset.Release(rank); diff --git a/projects/rccl/test/test_BroadcastMultiProcess.cpp b/projects/rccl/test/test_BroadcastMultiProcess.cpp index 1d43c6440a..793f118eda 100644 --- a/projects/rccl/test/test_BroadcastMultiProcess.cpp +++ b/projects/rccl/test/test_BroadcastMultiProcess.cpp @@ -17,63 +17,27 @@ namespace CorrectnessTests { TEST_P(BroadcastMultiProcessCorrectnessTest, Correctness) { - Dataset* dataset = (Dataset*)mmap(NULL, sizeof(Dataset), PROT_READ|PROT_WRITE, MAP_SHARED|MAP_ANONYMOUS, -1, 0); dataset->InitializeRootProcess(numDevices, numElements, dataType, inPlace, ncclCollBroadcast); - Barrier::ClearShmFiles(std::atoi(getenv("NCCL_COMM_ID"))); + std::vector pids(numDevices); - int pid1 = 0; - int pid2 = 0; - int pid3 = 0; - pid1 = fork(); - - // From this point on, ignore original process as we cannot have it create a HIP context - if (pid1 == 0) + int gpu = -1; + for (int i = 0; i < numDevices; i++) { - pid2 = fork(); - if (numDevices > 2) + gpu++; + int pid = fork(); + if (pid == 0) { - pid3 = fork(); - } - if ((pid2 > 0 && pid3 == 0 && numDevices == 2) || (pid2 > 0 && pid3 > 0 && numDevices > 2)) - { - // Process 0 - TestBroadcast(0, *dataset); - if (pid3 > 0) - { - waitpid(pid3, NULL, 0); - } - } - else if ((pid2 == 0 && pid3 == 0 && numDevices == 2) || (pid2 == 0 && pid3 > 0 && numDevices > 2)) - { - // Process 1 - TestBroadcast(1, *dataset); - if (numDevices > 2) - { - waitpid(pid3, NULL, 0); - } - exit(0); - } - else if (pid2 > 0 && pid3 == 0 && numDevices > 2) - { - // Process 2 (available when numDevices > 2) - TestBroadcast(2, *dataset); - exit(0); - } - else if (pid2 == 0 && pid3 == 0 && numDevices == 4) - { - // Process 3 (available when numDevices == 4) - TestBroadcast(3, *dataset); - exit(0); + bool pass; + TestBroadcast(gpu, *dataset, pass); + TerminateChildProcess(pass); } else { - exit(0); + pids[gpu] = pid; } - waitpid(pid2, NULL, 0); - exit(0); } - waitpid(pid1, NULL, 0); - munmap(dataset, sizeof(Dataset)); + + ValidateProcesses(pids); } INSTANTIATE_TEST_SUITE_P(BroadcastMultiProcessCorrectnessSweep, @@ -95,7 +59,7 @@ namespace CorrectnessTests // Number of elements testing::Values(1024, 1048576), // Number of devices - testing::Values(2,3,4), + testing::Values(2,3,4,8), // In-place or not testing::Values(false, true), testing::Values("")), diff --git a/projects/rccl/test/test_BroadcastMultiProcess.hpp b/projects/rccl/test/test_BroadcastMultiProcess.hpp index 0075faeeb7..1b5a36e57c 100644 --- a/projects/rccl/test/test_BroadcastMultiProcess.hpp +++ b/projects/rccl/test/test_BroadcastMultiProcess.hpp @@ -13,26 +13,35 @@ namespace CorrectnessTests class BroadcastMultiProcessCorrectnessTest : public MultiProcessCorrectnessTest { public: - static void ComputeExpectedResults(Dataset& dataset, int const root, int const rank) + static void ComputeExpectedResults(Dataset& dataset, int const root, std::vector const& ranks) { - // Root has the answer; share it via host memcpy's - if (rank == root) + for (int h = 0; h < ranks.size(); h++) { - HIP_CALL(hipMemcpy(dataset.expected[rank], dataset.inputs[rank], - dataset.NumBytes(), hipMemcpyDeviceToHost)); - for (int i = 0; i < dataset.numDevices; i++) + int rank = ranks[h]; + // Root has the answer; share it via host memcpy's + if (rank == root) { - if (i == rank) continue; - memcpy(dataset.expected[i], dataset.expected[root], dataset.NumBytes()); + HIP_CALL(hipMemcpy(dataset.expected[rank], dataset.inputs[rank], + dataset.NumBytes(), hipMemcpyDeviceToHost)); + for (int i = 0; i < dataset.numDevices; i++) + { + if (i == rank) continue; + memcpy(dataset.expected[i], dataset.expected[root], dataset.NumBytes()); + } + break; } } } - void TestBroadcast(int rank, Dataset& dataset) + void TestBroadcast(int rank, Dataset& dataset, bool& pass) { SetUpPerProcess(rank, ncclCollBroadcast, comms[rank], streams[rank], dataset); - if (numDevices > numDevicesAvailable) return; + if (numDevices > numDevicesAvailable) + { + pass = true; + return; + } Barrier barrier(rank, numDevices, std::atoi(getenv("NCCL_COMM_ID"))); @@ -41,7 +50,7 @@ namespace CorrectnessTests { // Prepare input / output / expected results FillDatasetWithPattern(dataset, rank); - ComputeExpectedResults(dataset, root, rank); + ComputeExpectedResults(dataset, root, std::vector(1, rank)); // Launch the reduction (1 process per GPU) ncclResult_t res = ncclBroadcast(dataset.inputs[rank], @@ -53,7 +62,7 @@ namespace CorrectnessTests HIP_CALL(hipStreamSynchronize(streams[rank])); // Check results - ValidateResults(dataset, rank); + pass = ValidateResults(dataset, rank); // Ensure all processes have finished current iteration before proceeding barrier.Wait(); diff --git a/projects/rccl/test/test_CombinedCallsMultiProcess.cpp b/projects/rccl/test/test_CombinedCallsMultiProcess.cpp index f7a4f10b4e..9f030f3fcd 100644 --- a/projects/rccl/test/test_CombinedCallsMultiProcess.cpp +++ b/projects/rccl/test/test_CombinedCallsMultiProcess.cpp @@ -25,60 +25,28 @@ namespace CorrectnessTests datasets[i] = (Dataset*)mmap(NULL, sizeof(Dataset), PROT_READ|PROT_WRITE, MAP_SHARED|MAP_ANONYMOUS, -1, 0); datasets[i]->InitializeRootProcess(numDevices, numElements, dataType, inPlace, ncclFuncs[i]); } - Barrier::ClearShmFiles(std::atoi(getenv("NCCL_COMM_ID"))); - int pid1 = 0; - int pid2 = 0; - int pid3 = 0; - pid1 = fork(); + std::vector pids(numDevices); - // From this point on, ignore original process as we cannot have it create a HIP context - if (pid1 == 0) + int gpu = -1; + for (int i = 0; i < numDevices; i++) { - pid2 = fork(); - if (numDevices > 2) + gpu++; + int pid = fork(); + if (pid == 0) { - pid3 = fork(); - } - if ((pid2 > 0 && pid3 == 0 && numDevices == 2) || (pid2 > 0 && pid3 > 0 && numDevices > 2)) - { - // Process 0 - TestCombinedCalls(0, datasets, ncclFuncs); - if (pid3 > 0) - { - waitpid(pid3, NULL, 0); - } - } - else if ((pid2 == 0 && pid3 == 0 && numDevices == 2) || (pid2 == 0 && pid3 > 0 && numDevices > 2)) - { - // Process 1 - TestCombinedCalls(1, datasets, ncclFuncs); - if (numDevices > 2) - { - waitpid(pid3, NULL, 0); - } - exit(0); - } - else if (pid2 > 0 && pid3 == 0 && numDevices > 2) - { - // Process 2 (available when numDevices > 2) - TestCombinedCalls(2, datasets, ncclFuncs); - exit(0); - } - else if (pid2 == 0 && pid3 == 0 && numDevices == 4) - { - // Process 3 (available when numDevices == 4) - TestCombinedCalls(3, datasets, ncclFuncs); - exit(0); + bool pass; + TestCombinedCalls(gpu, datasets, ncclFuncs, pass); + TerminateChildProcess(pass); } else { - exit(0); + pids[gpu] = pid; } - waitpid(pid2, NULL, 0); - exit(0); } - waitpid(pid1, NULL, 0); + + ValidateProcesses(pids); + for (int i = 0; i < datasets.size(); i++) { munmap(datasets[i], sizeof(Dataset)); @@ -104,7 +72,7 @@ namespace CorrectnessTests // Number of elements testing::Values(3072, 3145728), // Number of devices - testing::Values(2,3,4), + testing::Values(2,3,4,8), // In-place or not testing::Values(false, true), testing::Values("")), diff --git a/projects/rccl/test/test_CombinedCallsMultiProcess.hpp b/projects/rccl/test/test_CombinedCallsMultiProcess.hpp index e48902514f..9b7ba6bf8b 100644 --- a/projects/rccl/test/test_CombinedCallsMultiProcess.hpp +++ b/projects/rccl/test/test_CombinedCallsMultiProcess.hpp @@ -20,21 +20,26 @@ namespace CorrectnessTests class CombinedCallsMultiProcessCorrectnessTest : public MultiProcessCorrectnessTest { public: - void TestCombinedCalls(int rank, std::vector& datasets, std::vector const& funcs) + void TestCombinedCalls(int rank, std::vector& datasets, std::vector const& funcs, bool& pass) { SetUpPerProcess(rank, funcs, comms[rank], streams[rank], datasets); - if (numDevices > numDevicesAvailable) return; + if (numDevices > numDevicesAvailable) + { + pass = true; + return; + } Barrier barrier(rank, numDevices, std::atoi(getenv("NCCL_COMM_ID"))); // Compute expected results for each dataset in combined int const root = 0; - AllGatherMultiProcessCorrectnessTest::ComputeExpectedResults(*datasets[0], barrier, rank, numDevices); - AllReduceMultiProcessCorrectnessTest::ComputeExpectedResults(*datasets[1], barrier, op, rank); - BroadcastMultiProcessCorrectnessTest::ComputeExpectedResults(*datasets[2], root, rank); - ReduceMultiProcessCorrectnessTest::ComputeExpectedResults(*datasets[3], barrier, op, root, rank); - ReduceScatterMultiProcessCorrectnessTest::ComputeExpectedResults(*datasets[4], barrier, op, rank); + std::vector ranks(1, rank); + AllGatherMultiProcessCorrectnessTest::ComputeExpectedResults(*datasets[0], barrier, numDevices, ranks); + AllReduceMultiProcessCorrectnessTest::ComputeExpectedResults(*datasets[1], barrier, op, ranks); + BroadcastMultiProcessCorrectnessTest::ComputeExpectedResults(*datasets[2], root, ranks); + ReduceMultiProcessCorrectnessTest::ComputeExpectedResults(*datasets[3], barrier, op, root, ranks); + ReduceScatterMultiProcessCorrectnessTest::ComputeExpectedResults(*datasets[4], barrier, op, ranks); size_t const byteCount = datasets[0]->NumBytes() / numDevices; size_t const elemCount = numElements / numDevices; @@ -64,12 +69,26 @@ namespace CorrectnessTests // Wait for reduction to complete HIP_CALL(hipStreamSynchronize(streams[rank])); - // Check results for each collective in the combined - for (int i = 0; i < 5; i++) + for (int i = 0; i < funcs.size(); i++) { - ValidateResults(*datasets[i], rank); + for (int j = 0; j < ranks.size(); j++) + { + pass = ValidateResults(*datasets[i], ranks[j], root); + if (!pass) + { + break; + } + } barrier.Wait(); - datasets[i]->Release(rank); + for (int j = 0; j < ranks.size(); j++) + { + datasets[i]->Release(ranks[j]); + } + } + + for (int i = 0; i < ranks.size(); i++) + { + TearDownPerProcess(comms[ranks[i]], streams[ranks[i]]); } } }; diff --git a/projects/rccl/test/test_GatherMultiProcess.cpp b/projects/rccl/test/test_GatherMultiProcess.cpp index 7627a15e89..2ba477e168 100644 --- a/projects/rccl/test/test_GatherMultiProcess.cpp +++ b/projects/rccl/test/test_GatherMultiProcess.cpp @@ -10,63 +10,27 @@ namespace CorrectnessTests { TEST_P(GatherMultiProcessCorrectnessTest, Correctness) { - Dataset* dataset = (Dataset*)mmap(NULL, sizeof(Dataset), PROT_READ|PROT_WRITE, MAP_SHARED|MAP_ANONYMOUS, -1, 0); dataset->InitializeRootProcess(numDevices, numElements, dataType, inPlace, ncclCollGather); - Barrier::ClearShmFiles(std::atoi(getenv("NCCL_COMM_ID"))); + std::vector pids(numDevices); - int pid1 = 0; - int pid2 = 0; - int pid3 = 0; - pid1 = fork(); - - // From this point on, ignore original process as we cannot have it create a HIP context - if (pid1 == 0) + int gpu = -1; + for (int i = 0; i < numDevices; i++) { - pid2 = fork(); - if (numDevices > 2) + gpu++; + int pid = fork(); + if (pid == 0) { - pid3 = fork(); - } - if ((pid2 > 0 && pid3 == 0 && numDevices == 2) || (pid2 > 0 && pid3 > 0 && numDevices > 2)) - { - // Process 0 - TestGather(0, *dataset); - if (pid3 > 0) - { - waitpid(pid3, NULL, 0); - } - } - else if ((pid2 == 0 && pid3 == 0 && numDevices == 2) || (pid2 == 0 && pid3 > 0 && numDevices > 2)) - { - // Process 1 - TestGather(1, *dataset); - if (numDevices > 2) - { - waitpid(pid3, NULL, 0); - } - exit(0); - } - else if (pid2 > 0 && pid3 == 0 && numDevices > 2) - { - // Process 2 (available when numDevices > 2) - TestGather(2, *dataset); - exit(0); - } - else if (pid2 == 0 && pid3 == 0 && numDevices == 4) - { - // Process 3 (available when numDevices == 4) - TestGather(3, *dataset); - exit(0); + bool pass; + TestGather(gpu, *dataset, pass); + TerminateChildProcess(pass); } else { - exit(0); + pids[gpu] = pid; } - waitpid(pid2, NULL, 0); - exit(0); } - waitpid(pid1, NULL, 0); - munmap(dataset, sizeof(Dataset)); + + ValidateProcesses(pids); } INSTANTIATE_TEST_SUITE_P(GatherMultiProcessCorrectnessSweep, @@ -88,7 +52,7 @@ namespace CorrectnessTests // Number of elements testing::Values(1024, 1048576), // Number of devices - testing::Values(2,3,4), + testing::Values(2,3,4,8), // In-place or not testing::Values(false), testing::Values("RCCL_ALLTOALL_KERNEL_DISABLE=0", "RCCL_ALLTOALL_KERNEL_DISABLE=1")), diff --git a/projects/rccl/test/test_GatherMultiProcess.hpp b/projects/rccl/test/test_GatherMultiProcess.hpp index e26c7f319b..0f05e7bef8 100644 --- a/projects/rccl/test/test_GatherMultiProcess.hpp +++ b/projects/rccl/test/test_GatherMultiProcess.hpp @@ -19,11 +19,15 @@ namespace CorrectnessTests dataset.NumBytes(), hipMemcpyDeviceToHost)); } - void TestGather(int rank, Dataset& dataset) + void TestGather(int rank, Dataset& dataset, bool& pass) { SetUpPerProcess(rank, ncclCollGather, comms[rank], streams[rank], dataset); - if (numDevices > numDevicesAvailable) return; + if (numDevices > numDevicesAvailable) + { + pass = true; + return; + } Barrier barrier(rank, numDevices, std::atoi(getenv("NCCL_COMM_ID"))); @@ -44,7 +48,7 @@ namespace CorrectnessTests HIP_CALL(hipStreamSynchronize(streams[rank])); // Check results - ValidateResults(dataset, rank, root); + pass = ValidateResults(dataset, rank, root); // Ensure all processes have finished current iteration before proceeding barrier.Wait(); diff --git a/projects/rccl/test/test_GroupCallsMultiProcess.cpp b/projects/rccl/test/test_GroupCallsMultiProcess.cpp index ce7b7ecfa7..b6f8c3fce3 100644 --- a/projects/rccl/test/test_GroupCallsMultiProcess.cpp +++ b/projects/rccl/test/test_GroupCallsMultiProcess.cpp @@ -25,75 +25,39 @@ namespace CorrectnessTests datasets[i] = (Dataset*)mmap(NULL, sizeof(Dataset), PROT_READ|PROT_WRITE, MAP_SHARED|MAP_ANONYMOUS, -1, 0); datasets[i]->InitializeRootProcess(numDevices, numElements, dataType, inPlace, ncclFuncs[i]); } - Barrier::ClearShmFiles(std::atoi(getenv("NCCL_COMM_ID"))); - int pid1 = 0; - int pid2 = 0; - int pid3 = 0; - pid1 = fork(); + int const numGpusPerProcess = 2; + int const numProcesses = numDevices / numGpusPerProcess; + std::vector pids(numProcesses); + int process = -1; - // From this point on, ignore original process as we cannot have it create a HIP context - if (pid1 == 0) + for (int i = 0; i < numDevices; i+= numGpusPerProcess) { - pid2 = fork(); - if (numDevices > 4) + process++; + int pid = fork(); + if (pid == 0) { - pid3 = fork(); - } - if ((pid2 > 0 && pid3 == 0 && numDevices == 4) || (pid2 > 0 && pid3 > 0 && numDevices > 4)) - { - // Process 0 - std::vector ranks; - ranks.push_back(0); - ranks.push_back(1); + int gpuIdx = i; + int maxIdx = gpuIdx + (numGpusPerProcess - 1) >= numDevices ? numDevices : gpuIdx + numGpusPerProcess; - TestGroupCalls(0, ranks, datasets, ncclFuncs); - if (pid3 > 0) + std::vector ranks; + for (; gpuIdx < maxIdx; gpuIdx++) { - waitpid(pid3, NULL, 0); + ranks.push_back(gpuIdx); } - } - else if ((pid2 == 0 && pid3 == 0 && numDevices == 4) || (pid2 == 0 && pid3 > 0 && numDevices > 4)) - { - // Process 1 - std::vector ranks; - ranks.push_back(2); - ranks.push_back(3); - TestGroupCalls(1, ranks, datasets, ncclFuncs); - if (pid3 > 0) - { - waitpid(pid3, NULL, 0); - } - exit(0); - } - else if (pid2 > 0 && pid3 == 0 && numDevices == 8) - { - // Process 2 (available when numDevices == 8) - std::vector ranks; - ranks.push_back(4); - ranks.push_back(5); - TestGroupCalls(2, ranks, datasets, ncclFuncs); - exit(0); - } - else if (pid2 == 0 && pid3 == 0 && numDevices == 8) - { - // Process 3 (available when numDevices == 8) - std::vector ranks; - ranks.push_back(6); - ranks.push_back(7); - - TestGroupCalls(3, ranks, datasets, ncclFuncs); - exit(0); + bool pass; + TestGroupCalls(process, ranks, datasets, ncclFuncs, pass); + TerminateChildProcess(pass); } else { - exit(0); + pids[process] = pid; } - waitpid(pid2, NULL, 0); - exit(0); } - waitpid(pid1, NULL, 0); + + ValidateProcesses(pids); + for (int i = 0; i < datasets.size(); i++) { munmap(datasets[i], sizeof(Dataset)); @@ -119,7 +83,7 @@ namespace CorrectnessTests // Number of elements testing::Values(3072, 3145728), // Number of devices - testing::Values(4), + testing::Values(4,8), // In-place or not testing::Values(false, true), testing::Values("")), diff --git a/projects/rccl/test/test_GroupCallsMultiProcess.hpp b/projects/rccl/test/test_GroupCallsMultiProcess.hpp index bc38688923..8942ba0b1e 100644 --- a/projects/rccl/test/test_GroupCallsMultiProcess.hpp +++ b/projects/rccl/test/test_GroupCallsMultiProcess.hpp @@ -21,34 +21,50 @@ namespace CorrectnessTests class GroupCallsMultiProcessCorrectnessTest : public MultiProcessCorrectnessTest { public: - void TestGroupCalls(int process, std::vector const& ranks, std::vector& datasets, std::vector const& funcs) + void TestGroupCalls(int process, std::vector const& ranks, std::vector& datasets, std::vector const& funcs, bool& pass) { - if (numDevices > numDevicesAvailable) return; - + ncclGroupStart(); for (int i = 0; i < ranks.size(); i++) { SetUpPerProcess(ranks[i], funcs, comms[ranks[i]], streams[ranks[i]], datasets); + if (numDevices > numDevicesAvailable) + { + break; + } + } + ncclGroupEnd(); + + if (numDevices > numDevicesAvailable) + { + pass = true; + return; } int numProcesses = numDevices / ranks.size(); Barrier barrier(process, numProcesses, std::atoi(getenv("NCCL_COMM_ID"))); - int const root = 0; for (int i = 0; i < ranks.size(); i++) { - AllGatherMultiProcessCorrectnessTest::ComputeExpectedResults(*datasets[0], barrier, numDevices, ranks[i]); - AllReduceMultiProcessCorrectnessTest::ComputeExpectedResults(*datasets[1], barrier, op, ranks[i]); - BroadcastMultiProcessCorrectnessTest::ComputeExpectedResults(*datasets[2], root, ranks[i]); - ReduceMultiProcessCorrectnessTest::ComputeExpectedResults(*datasets[3], barrier, op, root, ranks[i]); - ReduceScatterMultiProcessCorrectnessTest::ComputeExpectedResults(*datasets[4], barrier, op, ranks[i]); + for (int j = 0; j < datasets.size(); j++) + { + FillDatasetWithPattern(*datasets[j], ranks[i]); + } } + + int const root = 0; + + AllGatherMultiProcessCorrectnessTest::ComputeExpectedResults(*datasets[0], barrier, numDevices, ranks); + AllReduceMultiProcessCorrectnessTest::ComputeExpectedResults(*datasets[1], barrier, op, ranks); + BroadcastMultiProcessCorrectnessTest::ComputeExpectedResults(*datasets[2], root, ranks); + ReduceMultiProcessCorrectnessTest::ComputeExpectedResults(*datasets[3], barrier, op, root, ranks); + ReduceScatterMultiProcessCorrectnessTest::ComputeExpectedResults(*datasets[4], barrier, op, ranks); barrier.Wait(); - ncclGroupStart(); - - // AllGather size_t const byteCount = datasets[0]->NumBytes() / numDevices; size_t const elemCount = numElements / numDevices; + + ncclGroupStart(); + // AllGather for (int i = 0; i < ranks.size(); i++) { int rank = ranks[i]; @@ -90,7 +106,7 @@ namespace CorrectnessTests { int rank = ranks[i]; ncclReduceScatter(datasets[4]->inputs[rank], - (int8_t *)datasets[4]->outputs[rank] + (i * byteCount), + (int8_t *)datasets[4]->outputs[rank] + (rank * byteCount), elemCount, dataType, op, comms[rank], streams[rank]); } @@ -108,7 +124,11 @@ namespace CorrectnessTests { for (int j = 0; j < ranks.size(); j++) { - ValidateResults(*datasets[i], ranks[j]); + pass = ValidateResults(*datasets[i], ranks[j], root); + if (!pass) + { + break; + } } barrier.Wait(); for (int j = 0; j < ranks.size(); j++) diff --git a/projects/rccl/test/test_ReduceMultiProcess.cpp b/projects/rccl/test/test_ReduceMultiProcess.cpp index bf495d382a..7ac0462a98 100644 --- a/projects/rccl/test/test_ReduceMultiProcess.cpp +++ b/projects/rccl/test/test_ReduceMultiProcess.cpp @@ -10,63 +10,27 @@ namespace CorrectnessTests { TEST_P(ReduceMultiProcessCorrectnessTest, Correctness) { - Dataset* dataset = (Dataset*)mmap(NULL, sizeof(Dataset), PROT_READ|PROT_WRITE, MAP_SHARED|MAP_ANONYMOUS, -1, 0); dataset->InitializeRootProcess(numDevices, numElements, dataType, inPlace, ncclCollReduce); - Barrier::ClearShmFiles(std::atoi(getenv("NCCL_COMM_ID"))); + std::vector pids(numDevices); - int pid1 = 0; - int pid2 = 0; - int pid3 = 0; - pid1 = fork(); - - // From this point on, ignore original process as we cannot have it create a HIP context - if (pid1 == 0) + int gpu = -1; + for (int i = 0; i < numDevices; i++) { - pid2 = fork(); - if (numDevices > 2) + gpu++; + int pid = fork(); + if (pid == 0) { - pid3 = fork(); - } - if ((pid2 > 0 && pid3 == 0 && numDevices == 2) || (pid2 > 0 && pid3 > 0 && numDevices > 2)) - { - // Process 0 - TestReduce(0, *dataset); - if (pid3 > 0) - { - waitpid(pid3, NULL, 0); - } - } - else if ((pid2 == 0 && pid3 == 0 && numDevices == 2) || (pid2 == 0 && pid3 > 0 && numDevices > 2)) - { - // Process 1 - TestReduce(1, *dataset); - if (numDevices > 2) - { - waitpid(pid3, NULL, 0); - } - exit(0); - } - else if (pid2 > 0 && pid3 == 0 && numDevices > 2) - { - // Process 2 (available when numDevices > 2) - TestReduce(2, *dataset); - exit(0); - } - else if (pid2 == 0 && pid3 == 0 && numDevices == 4) - { - // Process 3 (available when numDevices == 4) - TestReduce(3, *dataset); - exit(0); + bool pass; + TestReduce(gpu, *dataset, pass); + TerminateChildProcess(pass); } else { - exit(0); + pids[gpu] = pid; } - waitpid(pid2, NULL, 0); - exit(0); } - waitpid(pid1, NULL, 0); - munmap(dataset, sizeof(Dataset)); + + ValidateProcesses(pids); } INSTANTIATE_TEST_SUITE_P(ReduceMultiProcessCorrectnessSweep, @@ -88,7 +52,7 @@ namespace CorrectnessTests // Number of elements testing::Values(1024, 1048576), // Number of devices - testing::Values(2,3,4), + testing::Values(2,3,4,8), // In-place or not testing::Values(false, true), testing::Values("")), diff --git a/projects/rccl/test/test_ReduceMultiProcess.hpp b/projects/rccl/test/test_ReduceMultiProcess.hpp index 65977608b8..173cb241b9 100644 --- a/projects/rccl/test/test_ReduceMultiProcess.hpp +++ b/projects/rccl/test/test_ReduceMultiProcess.hpp @@ -13,77 +13,93 @@ namespace CorrectnessTests class ReduceMultiProcessCorrectnessTest : public MultiProcessCorrectnessTest { public: - static void ComputeExpectedResults(Dataset& dataset, Barrier& barrier, ncclRedOp_t const op, int const root, int const rank) + static void ComputeExpectedResults(Dataset& dataset, Barrier& barrier, ncclRedOp_t const op, int const root, std::vector const& ranks) { // Copy all inputs to expected arrays temporarily to perform reduction on host - HIP_CALL(hipMemcpy(dataset.expected[rank], dataset.inputs[rank], - dataset.NumBytes(), hipMemcpyDeviceToHost)); + for (int i = 0; i < ranks.size(); i++) + { + int rank = ranks[i]; + HIP_CALL(hipMemcpy(dataset.expected[rank], dataset.inputs[rank], + dataset.NumBytes(), hipMemcpyDeviceToHost)); + } barrier.Wait(); - if (rank == root) + for (int h = 0; h < ranks.size(); h++) { - // Allocate temporary host array to accumulate results - int8_t* resultI1 = (int8_t *)malloc(dataset.NumBytes()); - uint8_t* resultU1 = (uint8_t *)resultI1; - int32_t* resultI4 = (int32_t *)resultI1; - uint32_t* resultU4 = (uint32_t *)resultI1; - int64_t* resultI8 = (int64_t *)resultI1; - uint64_t* resultU8 = (uint64_t *)resultI1; - float* resultF4 = (float *)resultI1; - double* resultF8 = (double *)resultI1; - rccl_bfloat16* resultB2 = (rccl_bfloat16 *)resultI1; - - // Initialize the result with the first device's array - memcpy(resultI1, dataset.expected[0], dataset.NumBytes()); - - // Perform reduction on the other device arrays - for (int i = 1; i < dataset.numDevices; i++) + int rank = ranks[h]; + if (rank == root) { - int8_t* arrayI1 = (int8_t *)dataset.expected[i]; - uint8_t* arrayU1 = (uint8_t *)arrayI1; - int32_t* arrayI4 = (int32_t *)arrayI1; - uint32_t* arrayU4 = (uint32_t *)arrayI1; - int64_t* arrayI8 = (int64_t *)arrayI1; - uint64_t* arrayU8 = (uint64_t *)arrayI1; - float* arrayF4 = (float *)arrayI1; - double* arrayF8 = (double *)arrayI1; - rccl_bfloat16* arrayB2 = (rccl_bfloat16 *)arrayI1; + // Allocate temporary host array to accumulate results + int8_t* resultI1 = (int8_t *)malloc(dataset.NumBytes()); + uint8_t* resultU1 = (uint8_t *)resultI1; + int32_t* resultI4 = (int32_t *)resultI1; + uint32_t* resultU4 = (uint32_t *)resultI1; + int64_t* resultI8 = (int64_t *)resultI1; + uint64_t* resultU8 = (uint64_t *)resultI1; + float* resultF4 = (float *)resultI1; + double* resultF8 = (double *)resultI1; + rccl_bfloat16* resultB2 = (rccl_bfloat16 *)resultI1; - for (int j = 0; j < dataset.numElements; j++) + // Initialize the result with the first device's array + memcpy(resultI1, dataset.expected[0], dataset.NumBytes()); + + // Perform reduction on the other device arrays + for (int i = 1; i < dataset.numDevices; i++) { - switch (dataset.dataType) + int8_t* arrayI1 = (int8_t *)dataset.expected[i]; + uint8_t* arrayU1 = (uint8_t *)arrayI1; + int32_t* arrayI4 = (int32_t *)arrayI1; + uint32_t* arrayU4 = (uint32_t *)arrayI1; + int64_t* arrayI8 = (int64_t *)arrayI1; + uint64_t* arrayU8 = (uint64_t *)arrayI1; + float* arrayF4 = (float *)arrayI1; + double* arrayF8 = (double *)arrayI1; + rccl_bfloat16* arrayB2 = (rccl_bfloat16 *)arrayI1; + + for (int j = 0; j < dataset.numElements; j++) { - case ncclInt8: resultI1[j] = ReduceOp(op, resultI1[j], arrayI1[j]); break; - case ncclUint8: resultU1[j] = ReduceOp(op, resultU1[j], arrayU1[j]); break; - case ncclInt32: resultI4[j] = ReduceOp(op, resultI4[j], arrayI4[j]); break; - case ncclUint32: resultU4[j] = ReduceOp(op, resultU4[j], arrayU4[j]); break; - case ncclInt64: resultI8[j] = ReduceOp(op, resultI8[j], arrayI8[j]); break; - case ncclUint64: resultU8[j] = ReduceOp(op, resultU8[j], arrayU8[j]); break; - case ncclFloat32: resultF4[j] = ReduceOp(op, resultF4[j], arrayF4[j]); break; - case ncclFloat64: resultF8[j] = ReduceOp(op, resultF8[j], arrayF8[j]); break; - case ncclBfloat16: resultB2[j] = ReduceOp(op, resultB2[j], arrayB2[j]); break; - default: - fprintf(stderr, "[ERROR] Unsupported datatype\n"); - exit(0); + switch (dataset.dataType) + { + case ncclInt8: resultI1[j] = ReduceOp(op, resultI1[j], arrayI1[j]); break; + case ncclUint8: resultU1[j] = ReduceOp(op, resultU1[j], arrayU1[j]); break; + case ncclInt32: resultI4[j] = ReduceOp(op, resultI4[j], arrayI4[j]); break; + case ncclUint32: resultU4[j] = ReduceOp(op, resultU4[j], arrayU4[j]); break; + case ncclInt64: resultI8[j] = ReduceOp(op, resultI8[j], arrayI8[j]); break; + case ncclUint64: resultU8[j] = ReduceOp(op, resultU8[j], arrayU8[j]); break; + case ncclFloat32: resultF4[j] = ReduceOp(op, resultF4[j], arrayF4[j]); break; + case ncclFloat64: resultF8[j] = ReduceOp(op, resultF8[j], arrayF8[j]); break; + case ncclBfloat16: resultB2[j] = ReduceOp(op, resultB2[j], arrayB2[j]); break; + default: + fprintf(stderr, "[ERROR] Unsupported datatype\n"); + exit(0); + } } } + memcpy(dataset.expected[root], resultI1, dataset.NumBytes()); + free(resultI1); } - memcpy(dataset.expected[root], resultI1, dataset.NumBytes()); - free(resultI1); - barrier.Wait(); } - else + barrier.Wait(); + + for (int i = 0; i < ranks.size(); i++) { - barrier.Wait(); - HIP_CALL(hipMemcpy(dataset.expected[rank], dataset.outputs[rank], dataset.NumBytes(), hipMemcpyDeviceToHost)); + int rank = ranks[i]; + if (rank != root) + { + HIP_CALL(hipMemcpy(dataset.expected[rank], dataset.outputs[rank], dataset.NumBytes(), hipMemcpyDeviceToHost)); + } } } - void TestReduce(int rank, Dataset& dataset) + void TestReduce(int rank, Dataset& dataset, bool& pass) { SetUpPerProcess(rank, ncclCollReduce, comms[rank], streams[rank], dataset); - if (numDevices > numDevicesAvailable) return; + if (numDevices > numDevicesAvailable) + { + pass = true; + return; + } Barrier barrier(rank, numDevices, std::atoi(getenv("NCCL_COMM_ID"))); @@ -92,7 +108,7 @@ namespace CorrectnessTests { // Prepare input / output / expected results FillDatasetWithPattern(dataset, rank); - ComputeExpectedResults(dataset, barrier, op, root, rank); + ComputeExpectedResults(dataset, barrier, op, root, std::vector(1, rank)); // Launch the reduction (1 process per GPU) ncclResult_t res = ncclReduce(dataset.inputs[rank], dataset.outputs[rank], @@ -101,7 +117,7 @@ namespace CorrectnessTests // Wait for reduction to complete HIP_CALL(hipStreamSynchronize(streams[rank])); // Check results - ValidateResults(dataset, rank); + pass = ValidateResults(dataset, rank); // Ensure all processes have finished current iteration before proceeding barrier.Wait(); } diff --git a/projects/rccl/test/test_ReduceScatterMultiProcess.cpp b/projects/rccl/test/test_ReduceScatterMultiProcess.cpp index 48c4b0cb9b..1f230c64a3 100644 --- a/projects/rccl/test/test_ReduceScatterMultiProcess.cpp +++ b/projects/rccl/test/test_ReduceScatterMultiProcess.cpp @@ -10,63 +10,27 @@ namespace CorrectnessTests { TEST_P(ReduceScatterMultiProcessCorrectnessTest, Correctness) { - Dataset* dataset = (Dataset*)mmap(NULL, sizeof(Dataset), PROT_READ|PROT_WRITE, MAP_SHARED|MAP_ANONYMOUS, -1, 0); dataset->InitializeRootProcess(numDevices, numElements, dataType, inPlace, ncclCollReduceScatter); - Barrier::ClearShmFiles(std::atoi(getenv("NCCL_COMM_ID"))); + std::vector pids(numDevices); - int pid1 = 0; - int pid2 = 0; - int pid3 = 0; - pid1 = fork(); - - // From this point on, ignore original process as we cannot have it create a HIP context - if (pid1 == 0) + int gpu = -1; + for (int i = 0; i < numDevices; i++) { - pid2 = fork(); - if (numDevices > 2) + gpu++; + int pid = fork(); + if (pid == 0) { - pid3 = fork(); - } - if ((pid2 > 0 && pid3 == 0 && numDevices == 2) || (pid2 > 0 && pid3 > 0 && numDevices > 2)) - { - // Process 0 - TestReduceScatter(0, *dataset); - if (pid3 > 0) - { - waitpid(pid3, NULL, 0); - } - } - else if ((pid2 == 0 && pid3 == 0 && numDevices == 2) || (pid2 == 0 && pid3 > 0 && numDevices > 2)) - { - // Process 1 - TestReduceScatter(1, *dataset); - if (numDevices > 2) - { - waitpid(pid3, NULL, 0); - } - exit(0); - } - else if (pid2 > 0 && pid3 == 0 && numDevices > 2) - { - // Process 2 (available when numDevices > 2) - TestReduceScatter(2, *dataset); - exit(0); - } - else if (pid2 == 0 && pid3 == 0 && numDevices == 4) - { - // Process 3 (available when numDevices == 4) - TestReduceScatter(3, *dataset); - exit(0); + bool pass; + TestReduceScatter(gpu, *dataset, pass); + TerminateChildProcess(pass); } else { - exit(0); + pids[gpu] = pid; } - waitpid(pid2, NULL, 0); - exit(0); } - waitpid(pid1, NULL, 0); - munmap(dataset, sizeof(Dataset)); + + ValidateProcesses(pids); } INSTANTIATE_TEST_SUITE_P(ReduceScatterMultiProcessCorrectnessSweep, @@ -88,7 +52,7 @@ namespace CorrectnessTests // Number of elements testing::Values(3072, 3145728), // Number of devices - testing::Values(2,3,4), + testing::Values(2,3,4,8), // In-place or not testing::Values(false, true), testing::Values("")), diff --git a/projects/rccl/test/test_ReduceScatterMultiProcess.hpp b/projects/rccl/test/test_ReduceScatterMultiProcess.hpp index a38ddf2148..2b57b42f63 100644 --- a/projects/rccl/test/test_ReduceScatterMultiProcess.hpp +++ b/projects/rccl/test/test_ReduceScatterMultiProcess.hpp @@ -13,60 +13,68 @@ namespace CorrectnessTests class ReduceScatterMultiProcessCorrectnessTest : public MultiProcessCorrectnessTest { public: - static void ComputeExpectedResults(Dataset& dataset, Barrier& barrier, ncclRedOp_t const op, int const rank) + static void ComputeExpectedResults(Dataset& dataset, Barrier& barrier, ncclRedOp_t const op, std::vector const& ranks) { // Copy all inputs to expected arrays temporarily to perform reduction on host - HIP_CALL(hipMemcpy(dataset.expected[rank], dataset.inputs[rank], - dataset.NumBytes(), hipMemcpyDeviceToHost)); + for (int i = 0; i < ranks.size(); i++) + { + int rank = ranks[i]; + HIP_CALL(hipMemcpy(dataset.expected[rank], dataset.inputs[rank], + dataset.NumBytes(), hipMemcpyDeviceToHost)); + } barrier.Wait(); // Have rank 0 do the expected calculation, then send results to other processes - int8_t* resultI1; - if (rank == 0) + int8_t* resultI1; + for (int h = 0; h < ranks.size(); h++) { - // Allocate temporary host array to accumulate results - resultI1 = (int8_t *)malloc(dataset.NumBytes()); - uint8_t* resultU1 = (uint8_t *)resultI1; - int32_t* resultI4 = (int32_t *)resultI1; - uint32_t* resultU4 = (uint32_t *)resultI1; - int64_t* resultI8 = (int64_t *)resultI1; - uint64_t* resultU8 = (uint64_t *)resultI1; - float* resultF4 = (float *)resultI1; - double* resultF8 = (double *)resultI1; - rccl_bfloat16* resultB2 = (rccl_bfloat16 *)resultI1; - - // Initialize the result with the first device's array - memcpy(resultI1, dataset.expected[0], dataset.NumBytes()); - - // Perform reduction on the other device arrays - for (int i = 1; i < dataset.numDevices; i++) + int rank = ranks[h]; + if (rank == 0) { - int8_t* arrayI1 = (int8_t *)dataset.expected[i]; - uint8_t* arrayU1 = (uint8_t *)arrayI1; - int32_t* arrayI4 = (int32_t *)arrayI1; - uint32_t* arrayU4 = (uint32_t *)arrayI1; - int64_t* arrayI8 = (int64_t *)arrayI1; - uint64_t* arrayU8 = (uint64_t *)arrayI1; - float* arrayF4 = (float *)arrayI1; - double* arrayF8 = (double *)arrayI1; - rccl_bfloat16* arrayB2 = (rccl_bfloat16 *)arrayI1; + // Allocate temporary host array to accumulate results + resultI1 = (int8_t *)malloc(dataset.NumBytes()); + uint8_t* resultU1 = (uint8_t *)resultI1; + int32_t* resultI4 = (int32_t *)resultI1; + uint32_t* resultU4 = (uint32_t *)resultI1; + int64_t* resultI8 = (int64_t *)resultI1; + uint64_t* resultU8 = (uint64_t *)resultI1; + float* resultF4 = (float *)resultI1; + double* resultF8 = (double *)resultI1; + rccl_bfloat16* resultB2 = (rccl_bfloat16 *)resultI1; - for (int j = 0; j < dataset.numElements; j++) + // Initialize the result with the first device's array + memcpy(resultI1, dataset.expected[0], dataset.NumBytes()); + + // Perform reduction on the other device arrays + for (int i = 1; i < dataset.numDevices; i++) { - switch (dataset.dataType) + int8_t* arrayI1 = (int8_t *)dataset.expected[i]; + uint8_t* arrayU1 = (uint8_t *)arrayI1; + int32_t* arrayI4 = (int32_t *)arrayI1; + uint32_t* arrayU4 = (uint32_t *)arrayI1; + int64_t* arrayI8 = (int64_t *)arrayI1; + uint64_t* arrayU8 = (uint64_t *)arrayI1; + float* arrayF4 = (float *)arrayI1; + double* arrayF8 = (double *)arrayI1; + rccl_bfloat16* arrayB2 = (rccl_bfloat16 *)arrayI1; + + for (int j = 0; j < dataset.numElements; j++) { - case ncclInt8: resultI1[j] = ReduceOp(op, resultI1[j], arrayI1[j]); break; - case ncclUint8: resultU1[j] = ReduceOp(op, resultU1[j], arrayU1[j]); break; - case ncclInt32: resultI4[j] = ReduceOp(op, resultI4[j], arrayI4[j]); break; - case ncclUint32: resultU4[j] = ReduceOp(op, resultU4[j], arrayU4[j]); break; - case ncclInt64: resultI8[j] = ReduceOp(op, resultI8[j], arrayI8[j]); break; - case ncclUint64: resultU8[j] = ReduceOp(op, resultU8[j], arrayU8[j]); break; - case ncclFloat32: resultF4[j] = ReduceOp(op, resultF4[j], arrayF4[j]); break; - case ncclFloat64: resultF8[j] = ReduceOp(op, resultF8[j], arrayF8[j]); break; - case ncclBfloat16: resultB2[j] = ReduceOp(op, resultB2[j], arrayB2[j]); break; - default: - fprintf(stderr, "[ERROR] Unsupported datatype\n"); - exit(0); + switch (dataset.dataType) + { + case ncclInt8: resultI1[j] = ReduceOp(op, resultI1[j], arrayI1[j]); break; + case ncclUint8: resultU1[j] = ReduceOp(op, resultU1[j], arrayU1[j]); break; + case ncclInt32: resultI4[j] = ReduceOp(op, resultI4[j], arrayI4[j]); break; + case ncclUint32: resultU4[j] = ReduceOp(op, resultU4[j], arrayU4[j]); break; + case ncclInt64: resultI8[j] = ReduceOp(op, resultI8[j], arrayI8[j]); break; + case ncclUint64: resultU8[j] = ReduceOp(op, resultU8[j], arrayU8[j]); break; + case ncclFloat32: resultF4[j] = ReduceOp(op, resultF4[j], arrayF4[j]); break; + case ncclFloat64: resultF8[j] = ReduceOp(op, resultF8[j], arrayF8[j]); break; + case ncclBfloat16: resultB2[j] = ReduceOp(op, resultB2[j], arrayB2[j]); break; + default: + fprintf(stderr, "[ERROR] Unsupported datatype\n"); + exit(0); + } } } } @@ -75,34 +83,44 @@ namespace CorrectnessTests // Copy results into expected arrays size_t const byteCount = dataset.NumBytes() / dataset.numDevices; - HIP_CALL(hipMemcpy(dataset.expected[rank], dataset.outputs[rank], - dataset.NumBytes(), hipMemcpyDeviceToHost)); - + for (int i = 0; i < ranks.size(); i++) + { + int rank = ranks[i]; + HIP_CALL(hipMemcpy(dataset.expected[rank], dataset.outputs[rank], + dataset.NumBytes(), hipMemcpyDeviceToHost)); + } barrier.Wait(); - if (rank == 0) + for (int h = 0; h < ranks.size(); h++) { - for (int i = 0; i < dataset.numDevices; i++) - memcpy((int8_t *)dataset.expected[i] + (i * byteCount), - resultI1 + (i * byteCount), byteCount); + int rank = ranks[h]; + if (rank == 0) + { + for (int i = 0; i < dataset.numDevices; i++) + memcpy((int8_t *)dataset.expected[i] + (i * byteCount), + resultI1 + (i * byteCount), byteCount); - free(resultI1); + free(resultI1); + } } } - void TestReduceScatter(int rank, Dataset& dataset) + void TestReduceScatter(int rank, Dataset& dataset, bool& pass) { // Prepare input / output / expected results SetUpPerProcess(rank, ncclCollAllGather, comms[rank], streams[rank], dataset); - if (numDevices > numDevicesAvailable) return; - if (numElements % numDevices != 0) return; + if (numDevices > numDevicesAvailable || numElements % numDevices != 0) + { + pass = true; + return; + } Barrier barrier(rank, numDevices, std::atoi(getenv("NCCL_COMM_ID"))); // Prepare input / output / expected results FillDatasetWithPattern(dataset, rank); - ComputeExpectedResults(dataset, barrier, op, rank); + ComputeExpectedResults(dataset, barrier, op, std::vector(1, rank)); size_t const byteCount = dataset.NumBytes() / numDevices; size_t const recvCount = dataset.numElements / numDevices; @@ -117,7 +135,7 @@ namespace CorrectnessTests HIP_CALL(hipStreamSynchronize(streams[rank])); // Check results - ValidateResults(dataset, rank); + pass = ValidateResults(dataset, rank); TearDownPerProcess(comms[rank], streams[rank]); dataset.Release(rank); diff --git a/projects/rccl/test/test_ScatterMultiProcess.cpp b/projects/rccl/test/test_ScatterMultiProcess.cpp index 4211ace652..0f12c84308 100644 --- a/projects/rccl/test/test_ScatterMultiProcess.cpp +++ b/projects/rccl/test/test_ScatterMultiProcess.cpp @@ -10,63 +10,27 @@ namespace CorrectnessTests { TEST_P(ScatterMultiProcessCorrectnessTest, Correctness) { - Dataset* dataset = (Dataset*)mmap(NULL, sizeof(Dataset), PROT_READ|PROT_WRITE, MAP_SHARED|MAP_ANONYMOUS, -1, 0); dataset->InitializeRootProcess(numDevices, numElements, dataType, inPlace, ncclCollScatter); - Barrier::ClearShmFiles(std::atoi(getenv("NCCL_COMM_ID"))); + std::vector pids(numDevices); - int pid1 = 0; - int pid2 = 0; - int pid3 = 0; - pid1 = fork(); - - // From this point on, ignore original process as we cannot have it create a HIP context - if (pid1 == 0) + int gpu = -1; + for (int i = 0; i < numDevices; i++) { - pid2 = fork(); - if (numDevices > 2) + gpu++; + int pid = fork(); + if (pid == 0) { - pid3 = fork(); - } - if ((pid2 > 0 && pid3 == 0 && numDevices == 2) || (pid2 > 0 && pid3 > 0 && numDevices > 2)) - { - // Process 0 - TestScatter(0, *dataset); - if (pid3 > 0) - { - waitpid(pid3, NULL, 0); - } - } - else if ((pid2 == 0 && pid3 == 0 && numDevices == 2) || (pid2 == 0 && pid3 > 0 && numDevices > 2)) - { - // Process 1 - TestScatter(1, *dataset); - if (numDevices > 2) - { - waitpid(pid3, NULL, 0); - } - exit(0); - } - else if (pid2 > 0 && pid3 == 0 && numDevices > 2) - { - // Process 2 (available when numDevices > 2) - TestScatter(2, *dataset); - exit(0); - } - else if (pid2 == 0 && pid3 == 0 && numDevices == 4) - { - // Process 3 (available when numDevices == 4) - TestScatter(3, *dataset); - exit(0); + bool pass; + TestScatter(gpu, *dataset, pass); + TerminateChildProcess(pass); } else { - exit(0); + pids[gpu] = pid; } - waitpid(pid2, NULL, 0); - exit(0); } - waitpid(pid1, NULL, 0); - munmap(dataset, sizeof(Dataset)); + + ValidateProcesses(pids); } INSTANTIATE_TEST_SUITE_P(ScatterMultiProcessCorrectnessSweep, @@ -88,7 +52,7 @@ namespace CorrectnessTests // Number of elements testing::Values(1024, 1048576), // Number of devices - testing::Values(2,3,4), + testing::Values(2,3,4,8), // In-place or not testing::Values(false), testing::Values("RCCL_ALLTOALL_KERNEL_DISABLE=0", "RCCL_ALLTOALL_KERNEL_DISABLE=1")), diff --git a/projects/rccl/test/test_ScatterMultiProcess.hpp b/projects/rccl/test/test_ScatterMultiProcess.hpp index d48040246d..0a14916c92 100644 --- a/projects/rccl/test/test_ScatterMultiProcess.hpp +++ b/projects/rccl/test/test_ScatterMultiProcess.hpp @@ -23,12 +23,16 @@ namespace CorrectnessTests } } - void TestScatter(int rank, Dataset& dataset) + void TestScatter(int rank, Dataset& dataset, bool& pass) { // Prepare input / output / expected results SetUpPerProcess(rank, ncclCollScatter, comms[rank], streams[rank], dataset); - if (numDevices > numDevicesAvailable) return; + if (numDevices > numDevicesAvailable) + { + pass = true; + return; + } Barrier barrier(rank, numDevices, std::atoi(getenv("NCCL_COMM_ID"))); @@ -50,7 +54,7 @@ namespace CorrectnessTests HIP_CALL(hipStreamSynchronize(streams[rank])); // Check results - ValidateResults(dataset, rank); + pass = ValidateResults(dataset, rank); barrier.Wait(); }