diff --git a/install.sh b/install.sh index 8453ac53ed..9c95219e31 100755 --- a/install.sh +++ b/install.sh @@ -1,5 +1,5 @@ #!/bin/bash -# Copyright (c) 2019-2020 Advanced Micro Devices, Inc. All rights reserved. +# Copyright (c) 2019-2021 Advanced Micro Devices, Inc. All rights reserved. # ################################################# # helper functions @@ -200,8 +200,10 @@ if ($run_tests); then if (test -f "./test/UnitTests"); then if ($run_tests_all); then ./test/UnitTests + NCCL_COMM_ID=$HOSTNAME:55512 ./test/UnitTestsMultiProcess else ./test/UnitTests --gtest_filter="BroadcastCorrectnessSweep*:*float32*" + NCCL_COMM_ID=$HOSTNAME:55512 ./test/UnitTestsMultiProcess --gtest_filter="BroadcastMultiProcessCorrectnessSweep*:*float32*" fi else echo "Unit tests have not been built yet; please re-run script with -t to build unit tests." diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 617e5799cc..485f83311d 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -1,14 +1,14 @@ -# Copyright (c) 2019-2020 Advanced Micro Devices, Inc. All rights reserved. +# Copyright (c) 2019-2021 Advanced Micro Devices, Inc. All rights reserved. cmake_minimum_required(VERSION 2.8.12) if(BUILD_TESTS) message("Going to build unit tests (Installed in /test/UnitTests)") - include_directories(${GTEST_INCLUDE_DIRS}) - + include_directories(${GTEST_INCLUDE_DIRS}) + # Collect source files for tests - set(TEST_SOURCES + set(TEST_SOURCES_SINGLE_PROCESS test_AllGather.cpp test_AllReduce.cpp test_Broadcast.cpp @@ -24,18 +24,39 @@ if(BUILD_TESTS) test_AllToAllv.cpp ) - add_executable(UnitTests ${TEST_SOURCES}) + set(TEST_SOURCES_MULTI_PROCESS + test_AllGatherMultiProcess.cpp + test_AllReduceMultiProcess.cpp + test_AllToAllMultiProcess.cpp + test_BroadcastMultiProcess.cpp + test_CombinedCallsMultiProcess.cpp + test_GatherMultiProcess.cpp + test_GroupCallsMultiProcess.cpp + test_ReduceMultiProcess.cpp + test_ReduceScatterMultiProcess.cpp + test_ScatterMultiProcess.cpp + ) + + add_executable(UnitTests ${TEST_SOURCES_SINGLE_PROCESS}) target_include_directories(UnitTests PRIVATE /opt/rocm ${GTEST_INCLUDE_DIRS}) target_link_libraries(UnitTests PRIVATE ${GTEST_BOTH_LIBRARIES}) + add_executable(UnitTestsMultiProcess ${TEST_SOURCES_MULTI_PROCESS}) + target_include_directories(UnitTestsMultiProcess PRIVATE /opt/rocm ${GTEST_INCLUDE_DIRS}) + target_link_libraries(UnitTestsMultiProcess PRIVATE ${GTEST_BOTH_LIBRARIES}) + # UnitTests using static library of rccl requires passing rccl # through -l and -L instead of command line input. if(BUILD_STATIC) add_dependencies(UnitTests rccl) target_link_libraries(UnitTests PRIVATE dl rt numa -lrccl -L${CMAKE_BINARY_DIR}) target_link_libraries(UnitTests PRIVATE amdhip64 amd_comgr hsa-runtime64::hsa-runtime64) + add_dependencies(UnitTestsMultiProcess rccl) + target_link_libraries(UnitTestsMultiProcess PRIVATE dl rt numa -lrccl -L${CMAKE_BINARY_DIR}) + target_link_libraries(UnitTestsMultiProcess PRIVATE amdhip64 amd_comgr hsa-runtime64::hsa-runtime64) else() target_link_libraries(UnitTests PRIVATE rccl) + target_link_libraries(UnitTestsMultiProcess PRIVATE rt rccl) endif() else() message("Not building unit tests") diff --git a/test/CorrectnessTest.hpp b/test/CorrectnessTest.hpp index 12c5fbe3f1..54f376d93e 100644 --- a/test/CorrectnessTest.hpp +++ b/test/CorrectnessTest.hpp @@ -1,15 +1,28 @@ /************************************************************************* - * Copyright (c) 2019-2020 Advanced Micro Devices, Inc. All rights reserved. + * Copyright (c) 2019-2021 Advanced Micro Devices, Inc. All rights reserved. * * See LICENSE.txt for license information ************************************************************************/ #ifndef CORRECTNESSTEST_HPP #define CORRECTNESSTEST_HPP +#include #include +#include +#include +#include +#include +#include +#include +#include +#include +#include #include +#include #include + #include + #include "rccl.h" #include "../include/rccl_bfloat16.h" @@ -88,11 +101,12 @@ namespace CorrectnessTests return numElements * DataTypeToBytes(dataType); } - void Initialize(int const numDevices_, - size_t const numElements_, - ncclDataType_t const dataType_, - bool const inPlace_, - ncclFunc_t const func_ = ncclCollBroadcast) + // To be used in multi-process tests, in the parent process before forking children. + void InitializeRootProcess(int const numDevices_, + size_t const numElements_, + ncclDataType_t const dataType_, + bool const inPlace_, + ncclFunc_t const func_ = ncclCollBroadcast) { numDevices = numDevices_; numElements = numElements_; @@ -100,22 +114,68 @@ namespace CorrectnessTests inPlace = inPlace_; function = func_; - inputs.resize(numDevices); - outputs.resize(numDevices); - expected.resize(numDevices); + for (int i = 0; i < numDevices_; i++) + { + void* ptr = (void*)mmap(NULL, sizeof(void*), PROT_READ|PROT_WRITE, MAP_SHARED|MAP_ANONYMOUS, -1, 0); + inputs.push_back(ptr); + } + for (int i = 0; i < numDevices_; i++) + { + void* ptr = (void*)mmap(NULL, sizeof(void*), PROT_READ|PROT_WRITE, MAP_SHARED|MAP_ANONYMOUS, -1, 0); + outputs.push_back(ptr); + } + for (int i = 0; i < numDevices_; i++) + { + void* ptr = (void*)mmap(NULL, NumBytes(ncclOutputBuffer), PROT_READ|PROT_WRITE, MAP_SHARED|MAP_ANONYMOUS, -1, 0); + expected.push_back(ptr); + } + } + + void Initialize(int const numDevices_, + size_t const numElements_, + ncclDataType_t const dataType_, + bool const inPlace_, + ncclFunc_t const func_ = ncclCollBroadcast, + int const multiProcessRank_ = -1) + { + numDevices = numDevices_; + numElements = numElements_; + dataType = dataType_; + inPlace = inPlace_; + function = func_; + + if (multiProcessRank_ == -1) + { + inputs.resize(numDevices); + outputs.resize(numDevices); + expected.resize(numDevices); + } // Allocate per-device memory - for (int i = 0; i < numDevices; i++) + if (multiProcessRank_ > -1) { - HIP_CALL(hipSetDevice(i)); - HIP_CALL(hipMalloc((void **)&inputs[i], NumBytes(ncclInputBuffer))); + HIP_CALL(hipSetDevice(multiProcessRank_)); + HIP_CALL(hipMalloc((void **)&inputs[multiProcessRank_], NumBytes(ncclInputBuffer))); if (inPlace) - outputs[i] = inputs[i]; + outputs[multiProcessRank_] = inputs[multiProcessRank_]; else - HIP_CALL(hipMalloc((void **)&outputs[i], NumBytes(ncclOutputBuffer))); - - expected[i] = malloc(NumBytes(ncclOutputBuffer)); + HIP_CALL(hipMalloc((void **)&outputs[multiProcessRank_], NumBytes(ncclOutputBuffer))); } + else + { + for (int i = 0; i < numDevices; i++) + { + HIP_CALL(hipSetDevice(i)); + HIP_CALL(hipMalloc((void **)&inputs[i], NumBytes(ncclInputBuffer))); + if (inPlace) + outputs[i] = inputs[i]; + else + HIP_CALL(hipMalloc((void **)&outputs[i], NumBytes(ncclOutputBuffer))); + + expected[i] = malloc(NumBytes(ncclOutputBuffer)); + } + } + } // Explicit memory release to avoid double-free from subDatasets @@ -131,11 +191,19 @@ namespace CorrectnessTests outputs.clear(); } + // Multi-process version of Release() where each process frees its own data + void Release(int rank) + { + if (!inPlace) hipFree(outputs[rank]); + hipFree(inputs[rank]); + } + // Creates a dataset by pointing to an existing dataset // Primarily to allow for testing with different starting byte-alignments void ExtractSubDataset(size_t const startElement, size_t const lastElement, - Dataset& subDataset) + Dataset& subDataset, + int const multiProcessRank = -1) { ASSERT_LE(startElement, lastElement); ASSERT_LT(lastElement, numElements); @@ -150,15 +218,182 @@ namespace CorrectnessTests subDataset.expected.resize(numDevices); size_t const byteOffset = (startElement * DataTypeToBytes(dataType)); - for (int i = 0; i < numDevices; i++) + if (multiProcessRank != -1) { - subDataset.inputs[i] = (int8_t *)inputs[i] + byteOffset; - subDataset.outputs[i] = (int8_t *)outputs[i] + byteOffset; - subDataset.expected[i] = (int8_t *)expected[i] + byteOffset; + subDataset.inputs[multiProcessRank] = (int8_t *)inputs[multiProcessRank] + byteOffset; + subDataset.outputs[multiProcessRank] = (int8_t *)outputs[multiProcessRank] + byteOffset; + subDataset.expected[multiProcessRank] = (int8_t *)expected[multiProcessRank] + byteOffset; + } + else + { + for (int i = 0; i < numDevices; i++) + { + subDataset.inputs[i] = (int8_t *)inputs[i] + byteOffset; + subDataset.outputs[i] = (int8_t *)outputs[i] + byteOffset; + subDataset.expected[i] = (int8_t *)expected[i] + byteOffset; + } } } }; + class Barrier + { + public: + Barrier(){}; + + Barrier(int rank, int numRanks, int uniqueId) + { + this->numRanks = numRanks; + std::string uniqueIdString = std::to_string(uniqueId); + mutexName = std::string("mutex").append(uniqueIdString); + turnstile1Name = std::string("turnstile1").append(uniqueIdString); + turnstile2Name = std::string("turnstile2").append(uniqueIdString); + counterName = std::string("counter").append(uniqueIdString); + tinyBarrierName = std::string("tinyBarrier").append(uniqueIdString); + + size_t smSize = sizeof(sem_t); + + if (rank == 0) + { + InitSemaphore(smSize, mutexName, 1, mutex); + InitSemaphore(smSize, turnstile1Name, 0, turnstile1); + InitSemaphore(smSize, turnstile2Name, 0, turnstile2); + OpenSharedMemoryVariable(sizeof(int), counterName, true, counter); + OpenSharedMemoryVariable(smSize, tinyBarrierName, true, tinyBarrier); + } + else + { + OpenSharedMemoryVariable(smSize, tinyBarrierName, false, tinyBarrier); + OpenSemaphore(smSize, mutexName, mutex); + OpenSemaphore(smSize, turnstile1Name, turnstile1); + OpenSemaphore(smSize, turnstile2Name, turnstile2); + OpenSharedMemoryVariable(sizeof(int), counterName, false, counter); + } + } + + void Wait() + { + Part1(); + Part2(); + } + + ~Barrier() + { + shm_unlink(mutexName.c_str()); + shm_unlink(turnstile1Name.c_str()); + shm_unlink(turnstile2Name.c_str()); + shm_unlink(counterName.c_str()); + shm_unlink(tinyBarrierName.c_str()); + } + + static void ClearShmFiles(int uniqueId) + { + std::string uniqueIdString = std::to_string(uniqueId); + std::vector names; + names.push_back(std::string("mutex").append(uniqueIdString)); + names.push_back(std::string("turnstile1").append(uniqueIdString)); + names.push_back(std::string("turnstile2").append(uniqueIdString)); + names.push_back(std::string("counter").append(uniqueIdString)); + names.push_back(std::string("tinyBarrier").append(uniqueIdString)); + + std::string shmDir = "/dev/shm/"; + for (auto it = names.begin(); it != names.end(); it++) + { + struct stat fileStatus; + std::string shmFullPath = shmDir + *it; + + // Check if shm file already exists; if so, unlink it + if (stat(shmFullPath.c_str(), &fileStatus) == 0) + { + shm_unlink(it->c_str()); + } + } + } + private: + template + void OpenSharedMemoryVariable(size_t size, std::string name, bool create, T& val) + { + int protection = PROT_READ | PROT_WRITE; + int visibility = MAP_SHARED; + int fd; + + if (create) + { + fd = shm_open(name.c_str(), O_CREAT | O_RDWR, S_IRUSR | S_IWUSR); + ftruncate(fd, size); + } + else + { + do + { + // TODO: Error checking so we don't just infinite loop + fd = shm_open(name.c_str(), O_RDWR, S_IRUSR | S_IWUSR); + } while (fd == -1 && errno == ENOENT); + } + val = (T)mmap(NULL, size, protection, visibility, fd, 0); + close(fd); + } + + void InitSemaphore(size_t size, std::string name, int semValue, sem_t*& semaphore) + { + OpenSharedMemoryVariable(size, name, true, semaphore); + sem_init(semaphore, 1, semValue); + } + + void OpenSemaphore(size_t size, std::string name, sem_t*& semaphore) + { + OpenSharedMemoryVariable(size, name, false, semaphore); + } + + void Part1() + { + sem_wait(mutex); + if (++(*counter) == numRanks) + { + sem_post_batch(turnstile1, numRanks); + } + sem_post(mutex); + sem_wait(turnstile1); + } + + void Part2() + { + sem_wait(mutex); + if (--(*counter) == 0) + { + sem_post_batch(turnstile2, numRanks); + } + sem_post(mutex); + sem_wait(turnstile2); + } + + int sem_post_batch(sem_t*& sem, int n) + { + int ret = 0; + for (int i = 0; i < n; i++) + { + ret = sem_post(sem); + if (ret != 0) break; + } + + return ret; + } + int numRanks; + + int* counter; + + sem_t* mutex; + sem_t* turnstile1; + sem_t* turnstile2; + sem_t* tinyBarrier; + + std::string mutexName; + std::string turnstile1Name; + std::string turnstile2Name; + std::string tinyBarrierName; + std::string counterName; + }; + typedef std::tuple dataTypeStrings { {ncclInt8, "int8"}, @@ -216,7 +450,6 @@ namespace CorrectnessTests }; }; protected: - // This code is called per test-tuple void SetUp() override { @@ -466,6 +699,265 @@ namespace CorrectnessTests char* savedEnv[MAX_ENV_TOKENS/2]; }; + class MultiProcessCorrectnessTest : public CorrectnessTest + { + protected: + void SetUp() override + { + // Check for NCCL_COMM_ID env variable (otherwise will not init) + if (!getenv("NCCL_COMM_ID")) + { + printf("Must set NCCL_COMM_ID prior to execution\n"); + exit(0); + } + + // Make the test tuple parameters accessible + std::tie(op, dataType, numElements, numDevices, inPlace, envVals) = GetParam(); + + envString = 0; + numTokens = 0; + if (strcmp(envVals, "")) { + // enable RCCL env vars testing + setenv("RCCL_TEST_ENV_VARS", "ENABLE", 1); + envString = strdup(envVals); + tokens[numTokens] = strtok(envString, "=, "); + numTokens++; + while (tokens[numTokens-1] != NULL && numTokens < MAX_ENV_TOKENS) + tokens[numTokens++] = strtok(NULL, "=, "); + for (int i = 0; i < numTokens/2; i++) { + char *val = getenv(tokens[i*2]); + if (val) + savedEnv[i] = strdup(val); + else + savedEnv[i] = 0; + setenv(tokens[i*2], tokens[i*2+1], 1); + fprintf(stdout, "[ ] setting environmental variable %s to %s\n", tokens[i*2], getenv(tokens[i*2])); + } + } + + comms.resize(numDevices); + streams.resize(numDevices); + } + + void TearDown() override + { + // Restore env vars after tests + for (int i = 0; i < numTokens/2; i++) { + if (savedEnv[i]) { + setenv(tokens[i*2], savedEnv[i], 1); + fprintf(stdout, "[ ] restored environmental variable %s to %s\n", tokens[i*2], getenv(tokens[i*2])); + free(savedEnv[i]); + } + else { + unsetenv(tokens[i*2]); + fprintf(stdout, "[ ] removed environmental variable %s\n", tokens[i*2]); + } + } + // Cleanup + unsetenv("RCCL_TEST_ENV_VARS"); + free(envString); + } + + void SetUpPerProcessHelper(int rank, ncclComm_t& comm, hipStream_t& stream) + { + // Check for NCCL_COMM_ID env variable (otherwise will not init) + if (!getenv("NCCL_COMM_ID")) + { + printf("Must set NCCL_COMM_ID prior to execution\n"); + exit(0); + } + + // Collect the number of available GPUs + HIP_CALL(hipGetDeviceCount(&numDevicesAvailable)); + + // Only proceed with testing if there are enough GPUs + if (numDevices > numDevicesAvailable) + { + fprintf(stdout, "[ SKIPPED ] Test requires %d devices (only %d available)\n", + numDevices, numDevicesAvailable); + + // Modify the number of devices so that tear-down doesn't occur + // This is temporary until GTEST_SKIP() becomes available + numDevices = 0; + numDevicesAvailable = -1; + return; + } + + HIP_CALL(hipSetDevice(rank)); + HIP_CALL(hipStreamCreate(&stream)); + + ncclUniqueId id; + NCCL_CALL(ncclGetUniqueId(&id)); + + ncclResult_t res; + res = ncclCommInitRank(&comm, numDevices, id, rank); // change to local comm and stream per process + + if (res != ncclSuccess) + { + printf("Test failure:%s %d '%s' numRanks:%d\n", __FILE__,__LINE__,ncclGetErrorString(res), numDevices); + ASSERT_EQ(res, hipSuccess); + } + } + + // To be called by each process individually + void SetUpPerProcess(int rank, ncclFunc_t const func, ncclComm_t& comm, hipStream_t& stream, Dataset& dataset) + { + SetUpPerProcessHelper(rank, comm, stream); + dataset.Initialize(numDevices, numElements, dataType, inPlace, func, rank); + } + + // To be called by each process/rank individually (see GroupCallsMultiProcess) + void SetUpPerProcess(int rank, std::vector const& func, ncclComm_t& comm, hipStream_t& stream, std::vector& datasets) + { + SetUpPerProcessHelper(rank, comm, stream); + + for (int i = 0; i < datasets.size(); i++) + { + datasets[i]->Initialize(numDevices, numElements, dataType, inPlace, func[i], rank); + } + } + + // Clean up per process + void TearDownPerProcess(ncclComm_t& comm, hipStream_t& stream) + { + NCCL_CALL(ncclCommDestroy(comm)); + HIP_CALL(hipStreamDestroy(stream)); + } + + void FillDatasetWithPattern(Dataset& dataset, int rank) + { + int8_t* arrayI1 = (int8_t *)malloc(dataset.NumBytes(ncclInputBuffer)); + uint8_t* arrayU1 = (uint8_t *)arrayI1; + int32_t* arrayI4 = (int32_t *)arrayI1; + uint32_t* arrayU4 = (uint32_t *)arrayI1; + int64_t* arrayI8 = (int64_t *)arrayI1; + uint64_t* arrayU8 = (uint64_t *)arrayI1; + float* arrayF4 = (float *)arrayI1; + double* arrayF8 = (double *)arrayI1; + rccl_bfloat16* arrayB2 = (rccl_bfloat16 *)arrayI1; + + // NOTE: Currently half-precision float tests are unsupported due to half being supported + // on GPU only and not host + + // Fills input data[i][j] with (i + j) % 6 + // - Keeping range small to reduce likelihood of overflow + // - Sticking with floating points values that are perfectly representable + + for (int j = 0; j < dataset.NumBytes(ncclInputBuffer)/DataTypeToBytes(dataset.dataType); j++) + { + int valueI = (rank + j) % 6; + float valueF = (float)valueI; + + switch (dataset.dataType) + { + case ncclInt8: arrayI1[j] = valueI; break; + case ncclUint8: arrayU1[j] = valueI; break; + case ncclInt32: arrayI4[j] = valueI; break; + case ncclUint32: arrayU4[j] = valueI; break; + case ncclInt64: arrayI8[j] = valueI; break; + case ncclUint64: arrayU8[j] = valueI; break; + case ncclFloat32: arrayF4[j] = valueF; break; + case ncclFloat64: arrayF8[j] = valueF; break; + case ncclBfloat16: arrayB2[j] = rccl_bfloat16(valueF); break; + default: + fprintf(stderr, "[ERROR] Unsupported datatype\n"); + exit(0); + } + } + + HIP_CALL(hipSetDevice(rank)); + HIP_CALL(hipMemcpy(dataset.inputs[rank], arrayI1, dataset.NumBytes(ncclInputBuffer), hipMemcpyHostToDevice)); + + // Fills output data[i][j] with 0 (if not inplace) + if (!dataset.inPlace) + HIP_CALL(hipMemset(dataset.outputs[rank], 0, dataset.NumBytes(ncclOutputBuffer))); + + free(arrayI1); + } + + void ValidateResults(Dataset const& dataset, int rank, int root = 0) const + { + int8_t* outputI1 = (int8_t *)malloc(dataset.NumBytes(ncclOutputBuffer)); + uint8_t* outputU1 = (uint8_t *)outputI1; + int32_t* outputI4 = (int32_t *)outputI1; + uint32_t* outputU4 = (uint32_t *)outputI1; + int64_t* outputI8 = (int64_t *)outputI1; + uint64_t* outputU8 = (uint64_t *)outputI1; + float* outputF4 = (float *)outputI1; + double* outputF8 = (double *)outputI1; + rccl_bfloat16* outputB2 = (rccl_bfloat16 *)outputI1; + + bool isMatch = true; + + // Loop over each device's output and compare it to the expected output + // (Each collective operation computes its own expected results) + + // only output on root rank is valid for gather collective + if (dataset.function == ncclCollGather && rank != root) + return; + HIP_CALL(hipMemcpy(outputI1, dataset.outputs[rank], dataset.NumBytes(ncclOutputBuffer), hipMemcpyDeviceToHost)); + + int8_t* expectedI1 = (int8_t *)dataset.expected[rank]; + uint8_t* expectedU1 = (uint8_t *)expectedI1; + int32_t* expectedI4 = (int32_t *)expectedI1; + uint32_t* expectedU4 = (uint32_t *)expectedI1; + int64_t* expectedI8 = (int64_t *)expectedI1; + uint64_t* expectedU8 = (uint64_t *)expectedI1; + float* expectedF4 = (float *)expectedI1; + double* expectedF8 = (double *)expectedI1; + rccl_bfloat16* expectedB2 = (rccl_bfloat16 *)expectedI1; + + for (int j = 0; j < dataset.numElements && isMatch; j++) + { + switch (dataset.dataType) + { + case ncclInt8: isMatch &= (outputI1[j] == expectedI1[j]); break; + case ncclUint8: isMatch &= (outputU1[j] == expectedU1[j]); break; + case ncclInt32: isMatch &= (outputI4[j] == expectedI4[j]); break; + case ncclUint32: isMatch &= (outputU4[j] == expectedU4[j]); break; + case ncclInt64: isMatch &= (outputI8[j] == expectedI8[j]); break; + case ncclUint64: isMatch &= (outputU8[j] == expectedU8[j]); break; + case ncclFloat32: isMatch &= (outputF4[j] == expectedF4[j]); break; + case ncclFloat64: isMatch &= (outputF8[j] == expectedF8[j]); break; + case ncclBfloat16: isMatch &= (outputB2[j] == expectedB2[j]); break; + default: + fprintf(stderr, "[ERROR] Unsupported datatype\n"); + exit(0); + } + + if (!isMatch) + { + switch (dataset.dataType) + { + case ncclInt8: + printf("Output %d. Expected %d on device %d[%d]\n", outputI1[j], expectedI1[j], rank, j); break; + case ncclUint8: + printf("Output %u. Expected %u on device %d[%d]\n", outputU1[j], expectedU1[j], rank, j); break; + case ncclInt32: + printf("Output %d. Expected %d on device %d[%d]\n", outputI4[j], expectedI4[j], rank, j); break; + case ncclUint32: + printf("Output %u. Expected %u on device %d[%d]\n", outputU4[j], expectedU4[j], rank, j); break; + case ncclInt64: + printf("Output %ld. Expected %ld on device %d[%d]\n", outputI8[j], expectedI8[j], rank, j); break; + case ncclUint64: + printf("Output %lu. Expected %lu on device %d[%d]\n", outputU8[j], expectedU8[j], rank, j); break; + case ncclFloat32: + printf("Output %f. Expected %f on device %d[%d]\n", outputF4[j], expectedF4[j], rank, j); break; + case ncclFloat64: + printf("Output %lf. Expected %lf on device %d[%d]\n", outputF8[j], expectedF8[j], rank, j); break; + case ncclBfloat16: + printf("Output %f. Expected %f on device %d[%d]\n", (float)outputB2[j], (float)expectedB2[j], rank, j); break; + default: + fprintf(stderr, "[ERROR] Unsupported datatype\n"); + exit(0); + } + } + } + ASSERT_EQ(isMatch, true); + } + }; + + std::string GenerateTestNameString(testing::TestParamInfo& info); } #endif diff --git a/test/test_AllGatherMultiProcess.cpp b/test/test_AllGatherMultiProcess.cpp new file mode 100644 index 0000000000..370f12cc71 --- /dev/null +++ b/test/test_AllGatherMultiProcess.cpp @@ -0,0 +1,95 @@ +/************************************************************************* + * Copyright (c) 2019-2021 Advanced Micro Devices, Inc. All rights reserved. + * + * See LICENSE.txt for license information + ************************************************************************/ +#include "test_AllGatherMultiProcess.hpp" + +namespace CorrectnessTests +{ + TEST_P(AllGatherMultiProcessCorrectnessTest, Correctness) + { + Dataset* dataset = (Dataset*)mmap(NULL, sizeof(Dataset), PROT_READ|PROT_WRITE, MAP_SHARED|MAP_ANONYMOUS, -1, 0); + dataset->InitializeRootProcess(numDevices, numElements, dataType, inPlace, ncclCollAllGather); + Barrier::ClearShmFiles(std::atoi(getenv("NCCL_COMM_ID"))); + + int pid1 = 0; + int pid2 = 0; + int pid3 = 0; + pid1 = fork(); + + // From this point on, ignore original process as we cannot have it create a HIP context + if (pid1 == 0) + { + pid2 = fork(); + if (numDevices > 2) + { + pid3 = fork(); + } + if ((pid2 > 0 && pid3 == 0 && numDevices == 2) || (pid2 > 0 && pid3 > 0 && numDevices > 2)) + { + // Process 0 + TestAllGather(0, *dataset); + if (pid3 > 0) + { + waitpid(pid3, NULL, 0); + } + } + else if ((pid2 == 0 && pid3 == 0 && numDevices == 2) || (pid2 == 0 && pid3 > 0 && numDevices > 2)) + { + // Process 1 + TestAllGather(1, *dataset); + if (numDevices > 2) + { + waitpid(pid3, NULL, 0); + } + exit(0); + } + else if (pid2 > 0 && pid3 == 0 && numDevices > 2) + { + // Process 2 (available when numDevices > 2) + TestAllGather(2, *dataset); + exit(0); + } + else if (pid2 == 0 && pid3 == 0 && numDevices == 4) + { + // Process 3 (available when numDevices == 4) + TestAllGather(3, *dataset); + exit(0); + } + else + { + exit(0); + } + waitpid(pid2, NULL, 0); + exit(0); + } + waitpid(pid1, NULL, 0); + munmap(dataset, sizeof(Dataset)); + } + + INSTANTIATE_TEST_SUITE_P(AllGatherMultiProcessCorrectnessSweep, + AllGatherMultiProcessCorrectnessTest, + testing::Combine( + // Reduction operator (not used) + testing::Values(ncclSum), + // Data types + testing::Values(ncclInt8, + ncclUint8, + ncclInt32, + ncclUint32, + ncclInt64, + ncclUint64, + //ncclFloat16, + ncclFloat32, + ncclFloat64, + ncclBfloat16), + // Number of elements + testing::Values(3072, 3145728), + // Number of devices + testing::Values(2,3,4), + // In-place or not + testing::Values(false, true), + testing::Values("")), + CorrectnessTest::PrintToStringParamName()); +} // namespace diff --git a/test/test_AllGatherMultiProcess.hpp b/test/test_AllGatherMultiProcess.hpp new file mode 100644 index 0000000000..b881d4b7b9 --- /dev/null +++ b/test/test_AllGatherMultiProcess.hpp @@ -0,0 +1,69 @@ +/************************************************************************* + * Copyright (c) 2019-2021 Advanced Micro Devices, Inc. All rights reserved. + * + * See LICENSE.txt for license information + ************************************************************************/ +#ifndef TEST_ALLGATHER_MULTI_PROCESS_HPP +#define TEST_ALLGATHER_MULTI_PROCESS_HPP + +#include "CorrectnessTest.hpp" + +namespace CorrectnessTests +{ + class AllGatherMultiProcessCorrectnessTest : public MultiProcessCorrectnessTest + { + public: + static void ComputeExpectedResults(Dataset& dataset, Barrier& barrier, int const rank, int const numDevices) + { + size_t const byteCount = dataset.NumBytes() / dataset.numDevices; + + HIP_CALL(hipMemcpy(static_cast(dataset.expected[0]) + rank * byteCount, (int8_t *)dataset.inputs[rank] + (rank * byteCount), + byteCount, hipMemcpyDeviceToHost)); + + barrier.Wait(); + // Rank 0 sends answer to other ranks + if (rank == 0) + { + for (int i = 0; i < dataset.numDevices; i++) + { + if (i == rank) continue; + memcpy(dataset.expected[i], dataset.expected[0], dataset.NumBytes()); + } + } + } + + void TestAllGather(int rank, Dataset& dataset) + { + // Prepare input / output / expected results + SetUpPerProcess(rank, ncclCollAllGather, comms[rank], streams[rank], dataset); + + if (numDevices > numDevicesAvailable) return; + if (numElements % numDevices != 0) return; + + Barrier barrier(rank, numDevices, std::atoi(getenv("NCCL_COMM_ID"))); + + // Prepare input / output / expected results + FillDatasetWithPattern(dataset, rank); + ComputeExpectedResults(dataset, barrier, rank, numDevices); + + size_t const byteCount = dataset.NumBytes() / numDevices; + size_t const sendCount = dataset.numElements / numDevices; + + // Launch the reduction (1 process per GPU) + ncclAllGather((int8_t *)dataset.inputs[rank] + (rank * byteCount), + dataset.outputs[rank], sendCount, + dataType, comms[rank], streams[rank]); + + // Wait for reduction to complete + HIP_CALL(hipStreamSynchronize(streams[rank])); + + // Check results + ValidateResults(dataset, rank); + + TearDownPerProcess(comms[rank], streams[rank]); + dataset.Release(rank); + } + }; +} + +#endif diff --git a/test/test_AllReduceMultiProcess.cpp b/test/test_AllReduceMultiProcess.cpp new file mode 100644 index 0000000000..2a5500e629 --- /dev/null +++ b/test/test_AllReduceMultiProcess.cpp @@ -0,0 +1,96 @@ +/************************************************************************* + * Copyright (c) 2019-2021 Advanced Micro Devices, Inc. All rights reserved. + * + * See LICENSE.txt for license information + ************************************************************************/ + +#include "test_AllReduceMultiProcess.hpp" + +namespace CorrectnessTests +{ + TEST_P(AllReduceMultiProcessCorrectnessTest, Correctness) + { + Dataset* dataset = (Dataset*)mmap(NULL, sizeof(Dataset), PROT_READ|PROT_WRITE, MAP_SHARED|MAP_ANONYMOUS, -1, 0); + dataset->InitializeRootProcess(numDevices, numElements, dataType, inPlace, ncclCollAllReduce); + Barrier::ClearShmFiles(std::atoi(getenv("NCCL_COMM_ID"))); + + int pid1 = 0; + int pid2 = 0; + int pid3 = 0; + pid1 = fork(); + + // From this point on, ignore original process as we cannot have it create a HIP context + if (pid1 == 0) + { + pid2 = fork(); + if (numDevices > 2) + { + pid3 = fork(); + } + if ((pid2 > 0 && pid3 == 0 && numDevices == 2) || (pid2 > 0 && pid3 > 0 && numDevices > 2)) + { + // Process 0 + TestAllReduce(0, *dataset); + if (pid3 > 0) + { + waitpid(pid3, NULL, 0); + } + } + else if ((pid2 == 0 && pid3 == 0 && numDevices == 2) || (pid2 == 0 && pid3 > 0 && numDevices > 2)) + { + // Process 1 + TestAllReduce(1, *dataset); + if (numDevices > 2) + { + waitpid(pid3, NULL, 0); + } + exit(0); + } + else if (pid2 > 0 && pid3 == 0 && numDevices > 2) + { + // Process 2 (available when numDevices > 2) + TestAllReduce(2, *dataset); + exit(0); + } + else if (pid2 == 0 && pid3 == 0 && numDevices == 4) + { + // Process 3 (available when numDevices == 4) + TestAllReduce(3, *dataset); + exit(0); + } + else + { + exit(0); + } + waitpid(pid2, NULL, 0); + exit(0); + } + waitpid(pid1, NULL, 0); + munmap(dataset, sizeof(Dataset)); + } + + INSTANTIATE_TEST_SUITE_P(AllReduceMultiProcessCorrectnessSweep, + AllReduceMultiProcessCorrectnessTest, + testing::Combine( + // Reduction operator + testing::Values(ncclSum, ncclProd, ncclMax, ncclMin), + // Data types + testing::Values(ncclInt8, + ncclUint8, + ncclInt32, + ncclUint32, + ncclInt64, + ncclUint64, + //ncclFloat16, + ncclFloat32, + ncclFloat64, + ncclBfloat16), + // Number of elements + testing::Values(1024, 1048576), + // Number of devices + testing::Values(2,3,4), + // In-place or not + testing::Values(false, true), + testing::Values("")), + CorrectnessTest::PrintToStringParamName()); +} // namespace diff --git a/test/test_AllReduceMultiProcess.hpp b/test/test_AllReduceMultiProcess.hpp new file mode 100644 index 0000000000..2aa8e27087 --- /dev/null +++ b/test/test_AllReduceMultiProcess.hpp @@ -0,0 +1,105 @@ +/************************************************************************* + * Copyright (c) 2019-2021 Advanced Micro Devices, Inc. All rights reserved. + * + * See LICENSE.txt for license information + ************************************************************************/ +#ifndef TEST_ALLREDUCE_MULTI_PROCESS_HPP +#define TEST_ALLREDUCE_MULTI_PROCESS_HPP + +#include "CorrectnessTest.hpp" + +namespace CorrectnessTests +{ + class AllReduceMultiProcessCorrectnessTest : public MultiProcessCorrectnessTest + { + public: + static void ComputeExpectedResults(Dataset& dataset, Barrier& barrier, ncclRedOp_t const op, int const rank) + { + // Copy all inputs to expected arrays temporarily to perform reduction on host + HIP_CALL(hipMemcpy(dataset.expected[rank], dataset.inputs[rank], + dataset.NumBytes(), hipMemcpyDeviceToHost)); + + barrier.Wait(); + // Allocate temporary host array to accumulate results + int8_t* resultI1 = (int8_t *)malloc(dataset.NumBytes()); + uint8_t* resultU1 = (uint8_t *)resultI1; + int32_t* resultI4 = (int32_t *)resultI1; + uint32_t* resultU4 = (uint32_t *)resultI1; + int64_t* resultI8 = (int64_t *)resultI1; + uint64_t* resultU8 = (uint64_t *)resultI1; + float* resultF4 = (float *)resultI1; + double* resultF8 = (double *)resultI1; + rccl_bfloat16* resultB2 = (rccl_bfloat16 *)resultI1; + + // Initialize the result with the first device's array + memcpy(resultI1, dataset.expected[0], dataset.NumBytes()); + barrier.Wait(); + + // Perform reduction + for (int i = 1; i < dataset.numDevices; i++) + { + int8_t* arrayI1 = (int8_t *)dataset.expected[i]; + uint8_t* arrayU1 = (uint8_t *)arrayI1; + int32_t* arrayI4 = (int32_t *)arrayI1; + uint32_t* arrayU4 = (uint32_t *)arrayI1; + int64_t* arrayI8 = (int64_t *)arrayI1; + uint64_t* arrayU8 = (uint64_t *)arrayI1; + float* arrayF4 = (float *)arrayI1; + double* arrayF8 = (double *)arrayI1; + rccl_bfloat16* arrayB2 = (rccl_bfloat16 *)arrayI1; + + for (int j = 0; j < dataset.numElements; j++) + { + switch (dataset.dataType) + { + case ncclInt8: resultI1[j] = ReduceOp(op, resultI1[j], arrayI1[j]); break; + case ncclUint8: resultU1[j] = ReduceOp(op, resultU1[j], arrayU1[j]); break; + case ncclInt32: resultI4[j] = ReduceOp(op, resultI4[j], arrayI4[j]); break; + case ncclUint32: resultU4[j] = ReduceOp(op, resultU4[j], arrayU4[j]); break; + case ncclInt64: resultI8[j] = ReduceOp(op, resultI8[j], arrayI8[j]); break; + case ncclUint64: resultU8[j] = ReduceOp(op, resultU8[j], arrayU8[j]); break; + case ncclFloat32: resultF4[j] = ReduceOp(op, resultF4[j], arrayF4[j]); break; + case ncclFloat64: resultF8[j] = ReduceOp(op, resultF8[j], arrayF8[j]); break; + case ncclBfloat16: resultB2[j] = ReduceOp(op, resultB2[j], arrayB2[j]); break; + default: + fprintf(stderr, "[ERROR] Unsupported datatype\n"); + exit(0); + } + } + } + barrier.Wait(); + // Copy results into expected array + memcpy(dataset.expected[rank], resultI1, dataset.NumBytes()); + + free(resultI1); + } + + void TestAllReduce(int rank, Dataset& dataset) + { + SetUpPerProcess(rank, ncclCollAllReduce, comms[rank], streams[rank], dataset); + + if (numDevices > numDevicesAvailable) return; + + Barrier barrier(rank, numDevices, std::atoi(getenv("NCCL_COMM_ID"))); + + // Prepare input / output / expected results + FillDatasetWithPattern(dataset, rank); + ComputeExpectedResults(dataset, barrier, op, rank); + + // Launch the reduction + ncclAllReduce(dataset.inputs[rank], dataset.outputs[rank], + numElements, dataType, op, comms[rank], streams[rank]); + + // Wait for reduction to complete + HIP_CALL(hipStreamSynchronize(streams[rank])); + + // Check results + ValidateResults(dataset, rank); + + TearDownPerProcess(comms[rank], streams[rank]); + dataset.Release(rank); + } + }; +} + +#endif diff --git a/test/test_AllToAllMultiProcess.cpp b/test/test_AllToAllMultiProcess.cpp new file mode 100644 index 0000000000..e607861833 --- /dev/null +++ b/test/test_AllToAllMultiProcess.cpp @@ -0,0 +1,95 @@ +/************************************************************************* + * Copyright (c) 2019-2021 Advanced Micro Devices, Inc. All rights reserved. + * + * See LICENSE.txt for license information + ************************************************************************/ + +#include "test_AllToAllMultiProcess.hpp" + +namespace CorrectnessTests +{ + TEST_P(AllToAllMultiProcessCorrectnessTest, Correctness) + { + Dataset* dataset = (Dataset*)mmap(NULL, sizeof(Dataset), PROT_READ|PROT_WRITE, MAP_SHARED|MAP_ANONYMOUS, -1, 0); + dataset->InitializeRootProcess(numDevices, numElements, dataType, inPlace, ncclCollAllToAll); + + int pid1 = 0; + int pid2 = 0; + int pid3 = 0; + pid1 = fork(); + + // From this point on, ignore original process as we cannot have it create a HIP context + if (pid1 == 0) + { + pid2 = fork(); + if (numDevices > 2) + { + pid3 = fork(); + } + if ((pid2 > 0 && pid3 == 0 && numDevices == 2) || (pid2 > 0 && pid3 > 0 && numDevices > 2)) + { + // Process 0 + TestAllToAll(0, *dataset); + if (pid3 > 0) + { + waitpid(pid3, NULL, 0); + } + } + else if ((pid2 == 0 && pid3 == 0 && numDevices == 2) || (pid2 == 0 && pid3 > 0 && numDevices > 2)) + { + // Process 1 + TestAllToAll(1, *dataset); + if (numDevices > 2) + { + waitpid(pid3, NULL, 0); + } + exit(0); + } + else if (pid2 > 0 && pid3 == 0 && numDevices > 2) + { + // Process 2 (available when numDevices > 2) + TestAllToAll(2, *dataset); + exit(0); + } + else if (pid2 == 0 && pid3 == 0 && numDevices == 4) + { + // Process 3 (available when numDevices == 4) + TestAllToAll(3, *dataset); + exit(0); + } + else + { + exit(0); + } + waitpid(pid2, NULL, 0); + exit(0); + } + waitpid(pid1, NULL, 0); + munmap(dataset, sizeof(Dataset)); + } + + INSTANTIATE_TEST_SUITE_P(AllToAllMultiProcessCorrectnessSweep, + AllToAllMultiProcessCorrectnessTest, + testing::Combine( + // Reduction operator is not used + testing::Values(ncclSum), + // Data types + testing::Values(ncclInt8, + ncclUint8, + ncclInt32, + ncclUint32, + ncclInt64, + ncclUint64, + //ncclFloat16, + ncclFloat32, + ncclFloat64, + ncclBfloat16), + // Number of elements + testing::Values(1024, 1048576), + // Number of devices + testing::Values(2,3,4), + // In-place or not + testing::Values(false), + testing::Values("RCCL_ALLTOALL_KERNEL_DISABLE=0", "RCCL_ALLTOALL_KERNEL_DISABLE=1")), + CorrectnessTest::PrintToStringParamName()); +} // namespace diff --git a/test/test_AllToAllMultiProcess.hpp b/test/test_AllToAllMultiProcess.hpp new file mode 100644 index 0000000000..8fbd56f8a0 --- /dev/null +++ b/test/test_AllToAllMultiProcess.hpp @@ -0,0 +1,53 @@ +/************************************************************************* + * Copyright (c) 2019-2021 Advanced Micro Devices, Inc. All rights reserved. + * + * See LICENSE.txt for license information + ************************************************************************/ +#ifndef TEST_ALLTOALL_MULTI_PROCESS_HPP +#define TEST_ALLTOALL_MULTI_PROCESS_HPP + +#include "CorrectnessTest.hpp" + +namespace CorrectnessTests +{ + class AllToAllMultiProcessCorrectnessTest : public MultiProcessCorrectnessTest + { + public: + static void ComputeExpectedResults(Dataset& dataset, int const rank) + { + for (int i = 0; i < dataset.numDevices; i++) + { + HIP_CALL(hipMemcpy((int8_t *)dataset.expected[i]+dataset.NumBytes()*rank, (int8_t *)dataset.inputs[rank]+dataset.NumBytes()*i, + dataset.NumBytes(), hipMemcpyDeviceToHost)); + } + } + + void TestAllToAll(int rank, Dataset& dataset) + { + SetUpPerProcess(rank, ncclCollAllToAll, comms[rank], streams[rank], dataset); + + if (numDevices > numDevicesAvailable) return; + + // Prepare input / output / expected results + FillDatasetWithPattern(dataset, rank); + ComputeExpectedResults(dataset, rank); + + // Launch the reduction + ncclAllToAll(dataset.inputs[rank], + dataset.outputs[rank], + numElements, dataType, + comms[rank], streams[rank]); + + // Wait for reduction to complete + HIP_CALL(hipStreamSynchronize(streams[rank])); + + // Check results + ValidateResults(dataset, rank); + + TearDownPerProcess(comms[rank], streams[rank]); + dataset.Release(rank); + } + }; +} + +#endif diff --git a/test/test_BroadcastMultiProcess.cpp b/test/test_BroadcastMultiProcess.cpp new file mode 100644 index 0000000000..1d43c6440a --- /dev/null +++ b/test/test_BroadcastMultiProcess.cpp @@ -0,0 +1,103 @@ +/************************************************************************* + * Copyright (c) 2019-2021 Advanced Micro Devices, Inc. All rights reserved. + * + * See LICENSE.txt for license information + ************************************************************************/ + +#include "test_BroadcastMultiProcess.hpp" + +#include +#include +#include +#include +#include +#include + +namespace CorrectnessTests +{ + TEST_P(BroadcastMultiProcessCorrectnessTest, Correctness) + { + Dataset* dataset = (Dataset*)mmap(NULL, sizeof(Dataset), PROT_READ|PROT_WRITE, MAP_SHARED|MAP_ANONYMOUS, -1, 0); + dataset->InitializeRootProcess(numDevices, numElements, dataType, inPlace, ncclCollBroadcast); + Barrier::ClearShmFiles(std::atoi(getenv("NCCL_COMM_ID"))); + + int pid1 = 0; + int pid2 = 0; + int pid3 = 0; + pid1 = fork(); + + // From this point on, ignore original process as we cannot have it create a HIP context + if (pid1 == 0) + { + pid2 = fork(); + if (numDevices > 2) + { + pid3 = fork(); + } + if ((pid2 > 0 && pid3 == 0 && numDevices == 2) || (pid2 > 0 && pid3 > 0 && numDevices > 2)) + { + // Process 0 + TestBroadcast(0, *dataset); + if (pid3 > 0) + { + waitpid(pid3, NULL, 0); + } + } + else if ((pid2 == 0 && pid3 == 0 && numDevices == 2) || (pid2 == 0 && pid3 > 0 && numDevices > 2)) + { + // Process 1 + TestBroadcast(1, *dataset); + if (numDevices > 2) + { + waitpid(pid3, NULL, 0); + } + exit(0); + } + else if (pid2 > 0 && pid3 == 0 && numDevices > 2) + { + // Process 2 (available when numDevices > 2) + TestBroadcast(2, *dataset); + exit(0); + } + else if (pid2 == 0 && pid3 == 0 && numDevices == 4) + { + // Process 3 (available when numDevices == 4) + TestBroadcast(3, *dataset); + exit(0); + } + else + { + exit(0); + } + waitpid(pid2, NULL, 0); + exit(0); + } + waitpid(pid1, NULL, 0); + munmap(dataset, sizeof(Dataset)); + } + + INSTANTIATE_TEST_SUITE_P(BroadcastMultiProcessCorrectnessSweep, + BroadcastMultiProcessCorrectnessTest, + testing::Combine( + // Reduction operator is not used + testing::Values(ncclSum), + // Data types + testing::Values(ncclInt8, + ncclUint8, + ncclInt32, + ncclUint32, + ncclInt64, + ncclUint64, + //ncclFloat16, + ncclFloat32, + ncclFloat64, + ncclBfloat16), + // Number of elements + testing::Values(1024, 1048576), + // Number of devices + testing::Values(2,3,4), + // In-place or not + testing::Values(false, true), + testing::Values("")), + CorrectnessTest::PrintToStringParamName()); +} // namespace diff --git a/test/test_BroadcastMultiProcess.hpp b/test/test_BroadcastMultiProcess.hpp new file mode 100644 index 0000000000..0075faeeb7 --- /dev/null +++ b/test/test_BroadcastMultiProcess.hpp @@ -0,0 +1,68 @@ +/************************************************************************* + * Copyright (c) 2019-2021 Advanced Micro Devices, Inc. All rights reserved. + * + * See LICENSE.txt for license information + ************************************************************************/ +#ifndef TEST_BROADCAST_MULTI_PROCESS_HPP +#define TEST_BROADCAST_MULTI_PROCESS_HPP + +#include "CorrectnessTest.hpp" + +namespace CorrectnessTests +{ + class BroadcastMultiProcessCorrectnessTest : public MultiProcessCorrectnessTest + { + public: + static void ComputeExpectedResults(Dataset& dataset, int const root, int const rank) + { + // Root has the answer; share it via host memcpy's + if (rank == root) + { + HIP_CALL(hipMemcpy(dataset.expected[rank], dataset.inputs[rank], + dataset.NumBytes(), hipMemcpyDeviceToHost)); + for (int i = 0; i < dataset.numDevices; i++) + { + if (i == rank) continue; + memcpy(dataset.expected[i], dataset.expected[root], dataset.NumBytes()); + } + } + } + + void TestBroadcast(int rank, Dataset& dataset) + { + SetUpPerProcess(rank, ncclCollBroadcast, comms[rank], streams[rank], dataset); + + if (numDevices > numDevicesAvailable) return; + + Barrier barrier(rank, numDevices, std::atoi(getenv("NCCL_COMM_ID"))); + + // Test each possible root + for (int root = 0; root < numDevices; root++) + { + // Prepare input / output / expected results + FillDatasetWithPattern(dataset, rank); + ComputeExpectedResults(dataset, root, rank); + + // Launch the reduction (1 process per GPU) + ncclResult_t res = ncclBroadcast(dataset.inputs[rank], + dataset.outputs[rank], + numElements, dataType, + root, comms[rank], streams[rank]); + + // Wait for reduction to complete + HIP_CALL(hipStreamSynchronize(streams[rank])); + + // Check results + ValidateResults(dataset, rank); + + // Ensure all processes have finished current iteration before proceeding + barrier.Wait(); + } + + TearDownPerProcess(comms[rank], streams[rank]); + dataset.Release(rank); + } + }; +} + +#endif diff --git a/test/test_CombinedCallsMultiProcess.cpp b/test/test_CombinedCallsMultiProcess.cpp new file mode 100644 index 0000000000..f7a4f10b4e --- /dev/null +++ b/test/test_CombinedCallsMultiProcess.cpp @@ -0,0 +1,112 @@ +/************************************************************************* + * Copyright (c) 2019-2021 Advanced Micro Devices, Inc. All rights reserved. + * + * See LICENSE.txt for license information + ************************************************************************/ +#include "test_CombinedCallsMultiProcess.hpp" + +namespace CorrectnessTests +{ + TEST_P(CombinedCallsMultiProcessCorrectnessTest, Correctness) + { + // Important: Make sure the order of ncclFunc_t's here match the order of ncclFunc_ts + // as they appear in TestCombinedCalls() + std::vector ncclFuncs; + ncclFuncs.push_back(ncclCollAllGather); + ncclFuncs.push_back(ncclCollAllReduce); + ncclFuncs.push_back(ncclCollBroadcast); + ncclFuncs.push_back(ncclCollReduce); + ncclFuncs.push_back(ncclCollReduceScatter); + + // Create multiple datasets for combined operation + std::vector datasets(ncclFuncs.size()); + for (int i = 0; i < datasets.size(); i++) + { + datasets[i] = (Dataset*)mmap(NULL, sizeof(Dataset), PROT_READ|PROT_WRITE, MAP_SHARED|MAP_ANONYMOUS, -1, 0); + datasets[i]->InitializeRootProcess(numDevices, numElements, dataType, inPlace, ncclFuncs[i]); + } + Barrier::ClearShmFiles(std::atoi(getenv("NCCL_COMM_ID"))); + + int pid1 = 0; + int pid2 = 0; + int pid3 = 0; + pid1 = fork(); + + // From this point on, ignore original process as we cannot have it create a HIP context + if (pid1 == 0) + { + pid2 = fork(); + if (numDevices > 2) + { + pid3 = fork(); + } + if ((pid2 > 0 && pid3 == 0 && numDevices == 2) || (pid2 > 0 && pid3 > 0 && numDevices > 2)) + { + // Process 0 + TestCombinedCalls(0, datasets, ncclFuncs); + if (pid3 > 0) + { + waitpid(pid3, NULL, 0); + } + } + else if ((pid2 == 0 && pid3 == 0 && numDevices == 2) || (pid2 == 0 && pid3 > 0 && numDevices > 2)) + { + // Process 1 + TestCombinedCalls(1, datasets, ncclFuncs); + if (numDevices > 2) + { + waitpid(pid3, NULL, 0); + } + exit(0); + } + else if (pid2 > 0 && pid3 == 0 && numDevices > 2) + { + // Process 2 (available when numDevices > 2) + TestCombinedCalls(2, datasets, ncclFuncs); + exit(0); + } + else if (pid2 == 0 && pid3 == 0 && numDevices == 4) + { + // Process 3 (available when numDevices == 4) + TestCombinedCalls(3, datasets, ncclFuncs); + exit(0); + } + else + { + exit(0); + } + waitpid(pid2, NULL, 0); + exit(0); + } + waitpid(pid1, NULL, 0); + for (int i = 0; i < datasets.size(); i++) + { + munmap(datasets[i], sizeof(Dataset)); + } + } + + INSTANTIATE_TEST_SUITE_P(CombinedCallsMultiProcessCorrectnessSweep, + CombinedCallsMultiProcessCorrectnessTest, + testing::Combine( + // Reduction operator (not used) + testing::Values(ncclSum), + // Data types + testing::Values(ncclInt8, + ncclUint8, + ncclInt32, + ncclUint32, + ncclInt64, + ncclUint64, + //ncclFloat16, + ncclFloat32, + ncclFloat64, + ncclBfloat16), + // Number of elements + testing::Values(3072, 3145728), + // Number of devices + testing::Values(2,3,4), + // In-place or not + testing::Values(false, true), + testing::Values("")), + CorrectnessTest::PrintToStringParamName()); +} // namespace diff --git a/test/test_CombinedCallsMultiProcess.hpp b/test/test_CombinedCallsMultiProcess.hpp new file mode 100644 index 0000000000..e48902514f --- /dev/null +++ b/test/test_CombinedCallsMultiProcess.hpp @@ -0,0 +1,78 @@ +/************************************************************************* + * Copyright (c) 2019-2021 Advanced Micro Devices, Inc. All rights reserved. + * + * See LICENSE.txt for license information + ************************************************************************/ + +#ifndef TEST_COMBINEDCALLS_MULTI_PROCESS_HPP +#define TEST_COMBINEDCALLS_MULTI_PROCESS_HPP + +#include "CorrectnessTest.hpp" + +#include "test_AllGatherMultiProcess.hpp" +#include "test_AllReduceMultiProcess.hpp" +#include "test_BroadcastMultiProcess.hpp" +#include "test_ReduceMultiProcess.hpp" +#include "test_ReduceScatterMultiProcess.hpp" + +namespace CorrectnessTests +{ + class CombinedCallsMultiProcessCorrectnessTest : public MultiProcessCorrectnessTest + { + public: + void TestCombinedCalls(int rank, std::vector& datasets, std::vector const& funcs) + { + SetUpPerProcess(rank, funcs, comms[rank], streams[rank], datasets); + + if (numDevices > numDevicesAvailable) return; + + Barrier barrier(rank, numDevices, std::atoi(getenv("NCCL_COMM_ID"))); + + // Compute expected results for each dataset in combined + int const root = 0; + AllGatherMultiProcessCorrectnessTest::ComputeExpectedResults(*datasets[0], barrier, rank, numDevices); + AllReduceMultiProcessCorrectnessTest::ComputeExpectedResults(*datasets[1], barrier, op, rank); + BroadcastMultiProcessCorrectnessTest::ComputeExpectedResults(*datasets[2], root, rank); + ReduceMultiProcessCorrectnessTest::ComputeExpectedResults(*datasets[3], barrier, op, root, rank); + ReduceScatterMultiProcessCorrectnessTest::ComputeExpectedResults(*datasets[4], barrier, op, rank); + + size_t const byteCount = datasets[0]->NumBytes() / numDevices; + size_t const elemCount = numElements / numDevices; + + ncclAllGather((int8_t *)datasets[0]->inputs[rank] + (rank * byteCount), + datasets[0]->outputs[rank], elemCount, + dataType, comms[rank], streams[rank]); + + ncclAllReduce(datasets[1]->inputs[rank], datasets[1]->outputs[rank], + numElements, dataType, op, comms[rank], streams[rank]); + + ncclBroadcast(datasets[2]->inputs[rank], + datasets[2]->outputs[rank], + numElements, dataType, + root, comms[rank], streams[rank]); + + ncclReduce(datasets[3]->inputs[rank], + datasets[3]->outputs[rank], + numElements, dataType, op, + root, comms[rank], streams[rank]); + + ncclReduceScatter(datasets[4]->inputs[rank], + (int8_t *)datasets[4]->outputs[rank] + (rank * byteCount), + elemCount, dataType, op, + comms[rank], streams[rank]); + + // Wait for reduction to complete + HIP_CALL(hipStreamSynchronize(streams[rank])); + + // Check results for each collective in the combined + for (int i = 0; i < 5; i++) + { + ValidateResults(*datasets[i], rank); + barrier.Wait(); + datasets[i]->Release(rank); + } + } + }; +} + +#endif diff --git a/test/test_GatherMultiProcess.cpp b/test/test_GatherMultiProcess.cpp new file mode 100644 index 0000000000..7627a15e89 --- /dev/null +++ b/test/test_GatherMultiProcess.cpp @@ -0,0 +1,96 @@ +/************************************************************************* + * Copyright (c) 2019-2021 Advanced Micro Devices, Inc. All rights reserved. + * + * See LICENSE.txt for license information + ************************************************************************/ + +#include "test_GatherMultiProcess.hpp" + +namespace CorrectnessTests +{ + TEST_P(GatherMultiProcessCorrectnessTest, Correctness) + { + Dataset* dataset = (Dataset*)mmap(NULL, sizeof(Dataset), PROT_READ|PROT_WRITE, MAP_SHARED|MAP_ANONYMOUS, -1, 0); + dataset->InitializeRootProcess(numDevices, numElements, dataType, inPlace, ncclCollGather); + Barrier::ClearShmFiles(std::atoi(getenv("NCCL_COMM_ID"))); + + int pid1 = 0; + int pid2 = 0; + int pid3 = 0; + pid1 = fork(); + + // From this point on, ignore original process as we cannot have it create a HIP context + if (pid1 == 0) + { + pid2 = fork(); + if (numDevices > 2) + { + pid3 = fork(); + } + if ((pid2 > 0 && pid3 == 0 && numDevices == 2) || (pid2 > 0 && pid3 > 0 && numDevices > 2)) + { + // Process 0 + TestGather(0, *dataset); + if (pid3 > 0) + { + waitpid(pid3, NULL, 0); + } + } + else if ((pid2 == 0 && pid3 == 0 && numDevices == 2) || (pid2 == 0 && pid3 > 0 && numDevices > 2)) + { + // Process 1 + TestGather(1, *dataset); + if (numDevices > 2) + { + waitpid(pid3, NULL, 0); + } + exit(0); + } + else if (pid2 > 0 && pid3 == 0 && numDevices > 2) + { + // Process 2 (available when numDevices > 2) + TestGather(2, *dataset); + exit(0); + } + else if (pid2 == 0 && pid3 == 0 && numDevices == 4) + { + // Process 3 (available when numDevices == 4) + TestGather(3, *dataset); + exit(0); + } + else + { + exit(0); + } + waitpid(pid2, NULL, 0); + exit(0); + } + waitpid(pid1, NULL, 0); + munmap(dataset, sizeof(Dataset)); + } + + INSTANTIATE_TEST_SUITE_P(GatherMultiProcessCorrectnessSweep, + GatherMultiProcessCorrectnessTest, + testing::Combine( + // Reduction operator is not used + testing::Values(ncclSum), + // Data types + testing::Values(ncclInt8, + ncclUint8, + ncclInt32, + ncclUint32, + ncclInt64, + ncclUint64, + //ncclFloat16, + ncclFloat32, + ncclFloat64, + ncclBfloat16), + // Number of elements + testing::Values(1024, 1048576), + // Number of devices + testing::Values(2,3,4), + // In-place or not + testing::Values(false), + testing::Values("RCCL_ALLTOALL_KERNEL_DISABLE=0", "RCCL_ALLTOALL_KERNEL_DISABLE=1")), + CorrectnessTest::PrintToStringParamName()); +} // namespace diff --git a/test/test_GatherMultiProcess.hpp b/test/test_GatherMultiProcess.hpp new file mode 100644 index 0000000000..e26c7f319b --- /dev/null +++ b/test/test_GatherMultiProcess.hpp @@ -0,0 +1,59 @@ +/************************************************************************* + * Copyright (c) 2019-2021 Advanced Micro Devices, Inc. All rights reserved. + * + * See LICENSE.txt for license information + ************************************************************************/ +#ifndef TEST_GATHER_MULTI_PROCESS_HPP +#define TEST_GATHER_MULTI_PROCESS_HPP + +#include "CorrectnessTest.hpp" + +namespace CorrectnessTests +{ + class GatherMultiProcessCorrectnessTest : public MultiProcessCorrectnessTest + { + public: + static void ComputeExpectedResults(Dataset& dataset, int const root, int const rank) + { + HIP_CALL(hipMemcpy((int8_t *)dataset.expected[root]+dataset.NumBytes()*rank, dataset.inputs[rank], + dataset.NumBytes(), hipMemcpyDeviceToHost)); + } + + void TestGather(int rank, Dataset& dataset) + { + SetUpPerProcess(rank, ncclCollGather, comms[rank], streams[rank], dataset); + + if (numDevices > numDevicesAvailable) return; + + Barrier barrier(rank, numDevices, std::atoi(getenv("NCCL_COMM_ID"))); + + // Test each possible root + for (int root = 0; root < numDevices; root++) + { + // Prepare input / output / expected results + FillDatasetWithPattern(dataset, rank); + ComputeExpectedResults(dataset, root, rank); + + // Launch the reduction (1 process per GPU) + ncclGather(dataset.inputs[rank], + dataset.outputs[rank], + numElements, dataType, + root, comms[rank], streams[rank]); + + // Wait for reduction to complete + HIP_CALL(hipStreamSynchronize(streams[rank])); + + // Check results + ValidateResults(dataset, rank, root); + + // Ensure all processes have finished current iteration before proceeding + barrier.Wait(); + } + + TearDownPerProcess(comms[rank], streams[rank]); + dataset.Release(rank); + } + }; +} + +#endif diff --git a/test/test_GroupCallsMultiProcess.cpp b/test/test_GroupCallsMultiProcess.cpp new file mode 100644 index 0000000000..ce7b7ecfa7 --- /dev/null +++ b/test/test_GroupCallsMultiProcess.cpp @@ -0,0 +1,127 @@ +/************************************************************************* + * Copyright (c) 2019-2021 Advanced Micro Devices, Inc. All rights reserved. + * + * See LICENSE.txt for license information + ************************************************************************/ +#include "test_GroupCallsMultiProcess.hpp" + +namespace CorrectnessTests +{ + TEST_P(GroupCallsMultiProcessCorrectnessTest, Correctness) + { + // Important: Make sure the order of ncclFunc_t's here match the order of ncclFunc_ts + // as they appear in TestGroupCalls() + std::vector ncclFuncs; + ncclFuncs.push_back(ncclCollAllGather); + ncclFuncs.push_back(ncclCollAllReduce); + ncclFuncs.push_back(ncclCollBroadcast); + ncclFuncs.push_back(ncclCollReduce); + ncclFuncs.push_back(ncclCollReduceScatter); + + // Create multiple datasets for combined operation + std::vector datasets(ncclFuncs.size()); + for (int i = 0; i < datasets.size(); i++) + { + datasets[i] = (Dataset*)mmap(NULL, sizeof(Dataset), PROT_READ|PROT_WRITE, MAP_SHARED|MAP_ANONYMOUS, -1, 0); + datasets[i]->InitializeRootProcess(numDevices, numElements, dataType, inPlace, ncclFuncs[i]); + } + Barrier::ClearShmFiles(std::atoi(getenv("NCCL_COMM_ID"))); + + int pid1 = 0; + int pid2 = 0; + int pid3 = 0; + pid1 = fork(); + + // From this point on, ignore original process as we cannot have it create a HIP context + if (pid1 == 0) + { + pid2 = fork(); + if (numDevices > 4) + { + pid3 = fork(); + } + if ((pid2 > 0 && pid3 == 0 && numDevices == 4) || (pid2 > 0 && pid3 > 0 && numDevices > 4)) + { + // Process 0 + std::vector ranks; + ranks.push_back(0); + ranks.push_back(1); + + TestGroupCalls(0, ranks, datasets, ncclFuncs); + if (pid3 > 0) + { + waitpid(pid3, NULL, 0); + } + } + else if ((pid2 == 0 && pid3 == 0 && numDevices == 4) || (pid2 == 0 && pid3 > 0 && numDevices > 4)) + { + // Process 1 + std::vector ranks; + ranks.push_back(2); + ranks.push_back(3); + TestGroupCalls(1, ranks, datasets, ncclFuncs); + if (pid3 > 0) + { + waitpid(pid3, NULL, 0); + } + exit(0); + } + else if (pid2 > 0 && pid3 == 0 && numDevices == 8) + { + // Process 2 (available when numDevices == 8) + std::vector ranks; + ranks.push_back(4); + ranks.push_back(5); + + TestGroupCalls(2, ranks, datasets, ncclFuncs); + exit(0); + } + else if (pid2 == 0 && pid3 == 0 && numDevices == 8) + { + // Process 3 (available when numDevices == 8) + std::vector ranks; + ranks.push_back(6); + ranks.push_back(7); + + TestGroupCalls(3, ranks, datasets, ncclFuncs); + exit(0); + } + else + { + exit(0); + } + waitpid(pid2, NULL, 0); + exit(0); + } + waitpid(pid1, NULL, 0); + for (int i = 0; i < datasets.size(); i++) + { + munmap(datasets[i], sizeof(Dataset)); + } + } + + INSTANTIATE_TEST_SUITE_P(GroupCallsMultiProcessCorrectnessSweep, + GroupCallsMultiProcessCorrectnessTest, + testing::Combine( + // Reduction operator (not used) + testing::Values(ncclSum), + // Data types + testing::Values(ncclInt8, + ncclUint8, + ncclInt32, + ncclUint32, + ncclInt64, + ncclUint64, + //ncclFloat16, + ncclFloat32, + ncclFloat64, + ncclBfloat16), + // Number of elements + testing::Values(3072, 3145728), + // Number of devices + testing::Values(4), + // In-place or not + testing::Values(false, true), + testing::Values("")), + CorrectnessTest::PrintToStringParamName()); +} // namespace diff --git a/test/test_GroupCallsMultiProcess.hpp b/test/test_GroupCallsMultiProcess.hpp new file mode 100644 index 0000000000..bc38688923 --- /dev/null +++ b/test/test_GroupCallsMultiProcess.hpp @@ -0,0 +1,128 @@ +/************************************************************************* + * Copyright (c) 2019-2021 Advanced Micro Devices, Inc. All rights reserved. + * + * See LICENSE.txt for license information + ************************************************************************/ + +#ifndef TEST_GROUPCALLS_MULTI_PROCESS_HPP +#define TEST_GROUPCALLS_MULTI_PROCESS_HPP + +#include "CorrectnessTest.hpp" +#include "test_AllGatherMultiProcess.hpp" +#include "test_AllReduceMultiProcess.hpp" +#include "test_BroadcastMultiProcess.hpp" +#include "test_ReduceMultiProcess.hpp" +#include "test_ReduceScatterMultiProcess.hpp" + +#include + +namespace CorrectnessTests +{ + class GroupCallsMultiProcessCorrectnessTest : public MultiProcessCorrectnessTest + { + public: + void TestGroupCalls(int process, std::vector const& ranks, std::vector& datasets, std::vector const& funcs) + { + if (numDevices > numDevicesAvailable) return; + + for (int i = 0; i < ranks.size(); i++) + { + SetUpPerProcess(ranks[i], funcs, comms[ranks[i]], streams[ranks[i]], datasets); + } + + int numProcesses = numDevices / ranks.size(); + Barrier barrier(process, numProcesses, std::atoi(getenv("NCCL_COMM_ID"))); + + int const root = 0; + for (int i = 0; i < ranks.size(); i++) + { + AllGatherMultiProcessCorrectnessTest::ComputeExpectedResults(*datasets[0], barrier, numDevices, ranks[i]); + AllReduceMultiProcessCorrectnessTest::ComputeExpectedResults(*datasets[1], barrier, op, ranks[i]); + BroadcastMultiProcessCorrectnessTest::ComputeExpectedResults(*datasets[2], root, ranks[i]); + ReduceMultiProcessCorrectnessTest::ComputeExpectedResults(*datasets[3], barrier, op, root, ranks[i]); + ReduceScatterMultiProcessCorrectnessTest::ComputeExpectedResults(*datasets[4], barrier, op, ranks[i]); + } + barrier.Wait(); + + ncclGroupStart(); + + // AllGather + size_t const byteCount = datasets[0]->NumBytes() / numDevices; + size_t const elemCount = numElements / numDevices; + for (int i = 0; i < ranks.size(); i++) + { + int rank = ranks[i]; + ncclAllGather((int8_t *)datasets[0]->inputs[rank] + (rank * byteCount), + datasets[0]->outputs[rank], elemCount, + dataType, comms[rank], streams[rank]); + } + + // AllReduce + for (int i = 0; i < ranks.size(); i++) + { + int rank = ranks[i]; + ncclAllReduce(datasets[1]->inputs[rank], datasets[1]->outputs[rank], + numElements, dataType, op, comms[rank], streams[rank]); + } + + // Broadcast + for (int i = 0; i < ranks.size(); i++) + { + int rank = ranks[i]; + ncclBroadcast(datasets[2]->inputs[rank], + datasets[2]->outputs[rank], + numElements, dataType, + root, comms[rank], streams[rank]); + } + + // Reduce + for (int i = 0; i < ranks.size(); i++) + { + int rank = ranks[i]; + ncclReduce(datasets[3]->inputs[rank], + datasets[3]->outputs[rank], + numElements, dataType, op, + root, comms[rank], streams[rank]); + } + + // ReduceScatter + for (int i = 0; i < ranks.size(); i++) + { + int rank = ranks[i]; + ncclReduceScatter(datasets[4]->inputs[rank], + (int8_t *)datasets[4]->outputs[rank] + (i * byteCount), + elemCount, dataType, op, + comms[rank], streams[rank]); + } + + // Signal end of group call + ncclGroupEnd(); + + for (int i = 0; i < ranks.size(); i++) + { + HIP_CALL(hipSetDevice(ranks[i])); + HIP_CALL(hipStreamSynchronize(streams[ranks[i]])); + } + + for (int i = 0; i < funcs.size(); i++) + { + for (int j = 0; j < ranks.size(); j++) + { + ValidateResults(*datasets[i], ranks[j]); + } + barrier.Wait(); + for (int j = 0; j < ranks.size(); j++) + { + datasets[i]->Release(ranks[j]); + } + } + + for (int i = 0; i < ranks.size(); i++) + { + TearDownPerProcess(comms[ranks[i]], streams[ranks[i]]); + } + } + }; +} + +#endif diff --git a/test/test_ReduceMultiProcess.cpp b/test/test_ReduceMultiProcess.cpp new file mode 100644 index 0000000000..bf495d382a --- /dev/null +++ b/test/test_ReduceMultiProcess.cpp @@ -0,0 +1,96 @@ +/************************************************************************* + * Copyright (c) 2019-2021 Advanced Micro Devices, Inc. All rights reserved. + * + * See LICENSE.txt for license information + ************************************************************************/ + +#include "test_ReduceMultiProcess.hpp" + +namespace CorrectnessTests +{ + TEST_P(ReduceMultiProcessCorrectnessTest, Correctness) + { + Dataset* dataset = (Dataset*)mmap(NULL, sizeof(Dataset), PROT_READ|PROT_WRITE, MAP_SHARED|MAP_ANONYMOUS, -1, 0); + dataset->InitializeRootProcess(numDevices, numElements, dataType, inPlace, ncclCollReduce); + Barrier::ClearShmFiles(std::atoi(getenv("NCCL_COMM_ID"))); + + int pid1 = 0; + int pid2 = 0; + int pid3 = 0; + pid1 = fork(); + + // From this point on, ignore original process as we cannot have it create a HIP context + if (pid1 == 0) + { + pid2 = fork(); + if (numDevices > 2) + { + pid3 = fork(); + } + if ((pid2 > 0 && pid3 == 0 && numDevices == 2) || (pid2 > 0 && pid3 > 0 && numDevices > 2)) + { + // Process 0 + TestReduce(0, *dataset); + if (pid3 > 0) + { + waitpid(pid3, NULL, 0); + } + } + else if ((pid2 == 0 && pid3 == 0 && numDevices == 2) || (pid2 == 0 && pid3 > 0 && numDevices > 2)) + { + // Process 1 + TestReduce(1, *dataset); + if (numDevices > 2) + { + waitpid(pid3, NULL, 0); + } + exit(0); + } + else if (pid2 > 0 && pid3 == 0 && numDevices > 2) + { + // Process 2 (available when numDevices > 2) + TestReduce(2, *dataset); + exit(0); + } + else if (pid2 == 0 && pid3 == 0 && numDevices == 4) + { + // Process 3 (available when numDevices == 4) + TestReduce(3, *dataset); + exit(0); + } + else + { + exit(0); + } + waitpid(pid2, NULL, 0); + exit(0); + } + waitpid(pid1, NULL, 0); + munmap(dataset, sizeof(Dataset)); + } + + INSTANTIATE_TEST_SUITE_P(ReduceMultiProcessCorrectnessSweep, + ReduceMultiProcessCorrectnessTest, + testing::Combine( + // Reduction operator + testing::Values(ncclSum, ncclProd, ncclMax, ncclMin), + // Data types + testing::Values(ncclInt8, + ncclUint8, + ncclInt32, + ncclUint32, + ncclInt64, + ncclUint64, + //ncclFloat16, + ncclFloat32, + ncclFloat64, + ncclBfloat16), + // Number of elements + testing::Values(1024, 1048576), + // Number of devices + testing::Values(2,3,4), + // In-place or not + testing::Values(false, true), + testing::Values("")), + CorrectnessTest::PrintToStringParamName()); +} // namespace diff --git a/test/test_ReduceMultiProcess.hpp b/test/test_ReduceMultiProcess.hpp new file mode 100644 index 0000000000..65977608b8 --- /dev/null +++ b/test/test_ReduceMultiProcess.hpp @@ -0,0 +1,115 @@ +/************************************************************************* + * Copyright (c) 2019-2021 Advanced Micro Devices, Inc. All rights reserved. + * + * See LICENSE.txt for license information + ************************************************************************/ +#ifndef TEST_REDUCE_MULTI_PROCESS_HPP +#define TEST_REDUCE_MULTI_PROCESS_HPP + +#include "CorrectnessTest.hpp" + +namespace CorrectnessTests +{ + class ReduceMultiProcessCorrectnessTest : public MultiProcessCorrectnessTest + { + public: + static void ComputeExpectedResults(Dataset& dataset, Barrier& barrier, ncclRedOp_t const op, int const root, int const rank) + { + // Copy all inputs to expected arrays temporarily to perform reduction on host + HIP_CALL(hipMemcpy(dataset.expected[rank], dataset.inputs[rank], + dataset.NumBytes(), hipMemcpyDeviceToHost)); + barrier.Wait(); + + if (rank == root) + { + // Allocate temporary host array to accumulate results + int8_t* resultI1 = (int8_t *)malloc(dataset.NumBytes()); + uint8_t* resultU1 = (uint8_t *)resultI1; + int32_t* resultI4 = (int32_t *)resultI1; + uint32_t* resultU4 = (uint32_t *)resultI1; + int64_t* resultI8 = (int64_t *)resultI1; + uint64_t* resultU8 = (uint64_t *)resultI1; + float* resultF4 = (float *)resultI1; + double* resultF8 = (double *)resultI1; + rccl_bfloat16* resultB2 = (rccl_bfloat16 *)resultI1; + + // Initialize the result with the first device's array + memcpy(resultI1, dataset.expected[0], dataset.NumBytes()); + + // Perform reduction on the other device arrays + for (int i = 1; i < dataset.numDevices; i++) + { + int8_t* arrayI1 = (int8_t *)dataset.expected[i]; + uint8_t* arrayU1 = (uint8_t *)arrayI1; + int32_t* arrayI4 = (int32_t *)arrayI1; + uint32_t* arrayU4 = (uint32_t *)arrayI1; + int64_t* arrayI8 = (int64_t *)arrayI1; + uint64_t* arrayU8 = (uint64_t *)arrayI1; + float* arrayF4 = (float *)arrayI1; + double* arrayF8 = (double *)arrayI1; + rccl_bfloat16* arrayB2 = (rccl_bfloat16 *)arrayI1; + + for (int j = 0; j < dataset.numElements; j++) + { + switch (dataset.dataType) + { + case ncclInt8: resultI1[j] = ReduceOp(op, resultI1[j], arrayI1[j]); break; + case ncclUint8: resultU1[j] = ReduceOp(op, resultU1[j], arrayU1[j]); break; + case ncclInt32: resultI4[j] = ReduceOp(op, resultI4[j], arrayI4[j]); break; + case ncclUint32: resultU4[j] = ReduceOp(op, resultU4[j], arrayU4[j]); break; + case ncclInt64: resultI8[j] = ReduceOp(op, resultI8[j], arrayI8[j]); break; + case ncclUint64: resultU8[j] = ReduceOp(op, resultU8[j], arrayU8[j]); break; + case ncclFloat32: resultF4[j] = ReduceOp(op, resultF4[j], arrayF4[j]); break; + case ncclFloat64: resultF8[j] = ReduceOp(op, resultF8[j], arrayF8[j]); break; + case ncclBfloat16: resultB2[j] = ReduceOp(op, resultB2[j], arrayB2[j]); break; + default: + fprintf(stderr, "[ERROR] Unsupported datatype\n"); + exit(0); + } + } + } + memcpy(dataset.expected[root], resultI1, dataset.NumBytes()); + free(resultI1); + barrier.Wait(); + } + else + { + barrier.Wait(); + HIP_CALL(hipMemcpy(dataset.expected[rank], dataset.outputs[rank], dataset.NumBytes(), hipMemcpyDeviceToHost)); + } + } + + void TestReduce(int rank, Dataset& dataset) + { + SetUpPerProcess(rank, ncclCollReduce, comms[rank], streams[rank], dataset); + + if (numDevices > numDevicesAvailable) return; + + Barrier barrier(rank, numDevices, std::atoi(getenv("NCCL_COMM_ID"))); + + // Test each possible root + for (int root = 0; root < numDevices; root++) + { + // Prepare input / output / expected results + FillDatasetWithPattern(dataset, rank); + ComputeExpectedResults(dataset, barrier, op, root, rank); + // Launch the reduction (1 process per GPU) + ncclResult_t res = ncclReduce(dataset.inputs[rank], + dataset.outputs[rank], + numElements, dataType, op, + root, comms[rank], streams[rank]); + // Wait for reduction to complete + HIP_CALL(hipStreamSynchronize(streams[rank])); + // Check results + ValidateResults(dataset, rank); + // Ensure all processes have finished current iteration before proceeding + barrier.Wait(); + } + + TearDownPerProcess(comms[rank], streams[rank]); + dataset.Release(rank); + } + }; +} + +#endif diff --git a/test/test_ReduceScatterMultiProcess.cpp b/test/test_ReduceScatterMultiProcess.cpp new file mode 100644 index 0000000000..48c4b0cb9b --- /dev/null +++ b/test/test_ReduceScatterMultiProcess.cpp @@ -0,0 +1,96 @@ +/************************************************************************* + * Copyright (c) 2019-2021 Advanced Micro Devices, Inc. All rights reserved. + * + * See LICENSE.txt for license information + ************************************************************************/ + +#include "test_ReduceScatterMultiProcess.hpp" + +namespace CorrectnessTests +{ + TEST_P(ReduceScatterMultiProcessCorrectnessTest, Correctness) + { + Dataset* dataset = (Dataset*)mmap(NULL, sizeof(Dataset), PROT_READ|PROT_WRITE, MAP_SHARED|MAP_ANONYMOUS, -1, 0); + dataset->InitializeRootProcess(numDevices, numElements, dataType, inPlace, ncclCollReduceScatter); + Barrier::ClearShmFiles(std::atoi(getenv("NCCL_COMM_ID"))); + + int pid1 = 0; + int pid2 = 0; + int pid3 = 0; + pid1 = fork(); + + // From this point on, ignore original process as we cannot have it create a HIP context + if (pid1 == 0) + { + pid2 = fork(); + if (numDevices > 2) + { + pid3 = fork(); + } + if ((pid2 > 0 && pid3 == 0 && numDevices == 2) || (pid2 > 0 && pid3 > 0 && numDevices > 2)) + { + // Process 0 + TestReduceScatter(0, *dataset); + if (pid3 > 0) + { + waitpid(pid3, NULL, 0); + } + } + else if ((pid2 == 0 && pid3 == 0 && numDevices == 2) || (pid2 == 0 && pid3 > 0 && numDevices > 2)) + { + // Process 1 + TestReduceScatter(1, *dataset); + if (numDevices > 2) + { + waitpid(pid3, NULL, 0); + } + exit(0); + } + else if (pid2 > 0 && pid3 == 0 && numDevices > 2) + { + // Process 2 (available when numDevices > 2) + TestReduceScatter(2, *dataset); + exit(0); + } + else if (pid2 == 0 && pid3 == 0 && numDevices == 4) + { + // Process 3 (available when numDevices == 4) + TestReduceScatter(3, *dataset); + exit(0); + } + else + { + exit(0); + } + waitpid(pid2, NULL, 0); + exit(0); + } + waitpid(pid1, NULL, 0); + munmap(dataset, sizeof(Dataset)); + } + + INSTANTIATE_TEST_SUITE_P(ReduceScatterMultiProcessCorrectnessSweep, + ReduceScatterMultiProcessCorrectnessTest, + testing::Combine( + // Reduction operator + testing::Values(ncclSum, ncclProd, ncclMax, ncclMin), + // Data types + testing::Values(ncclInt8, + ncclUint8, + ncclInt32, + ncclUint32, + ncclInt64, + ncclUint64, + //ncclFloat16, + ncclFloat32, + ncclFloat64, + ncclBfloat16), + // Number of elements + testing::Values(3072, 3145728), + // Number of devices + testing::Values(2,3,4), + // In-place or not + testing::Values(false, true), + testing::Values("")), + CorrectnessTest::PrintToStringParamName()); +} // namespace diff --git a/test/test_ReduceScatterMultiProcess.hpp b/test/test_ReduceScatterMultiProcess.hpp new file mode 100644 index 0000000000..a38ddf2148 --- /dev/null +++ b/test/test_ReduceScatterMultiProcess.hpp @@ -0,0 +1,128 @@ +/************************************************************************* + * Copyright (c) 2019-2021 Advanced Micro Devices, Inc. All rights reserved. + * + * See LICENSE.txt for license information + ************************************************************************/ +#ifndef TEST_REDUCE_SCATTER_MULTI_PROCESS_HPP +#define TEST_REDUCE_SCATTER_MULTI_PROCESS_HPP + +#include "CorrectnessTest.hpp" + +namespace CorrectnessTests +{ + class ReduceScatterMultiProcessCorrectnessTest : public MultiProcessCorrectnessTest + { + public: + static void ComputeExpectedResults(Dataset& dataset, Barrier& barrier, ncclRedOp_t const op, int const rank) + { + // Copy all inputs to expected arrays temporarily to perform reduction on host + HIP_CALL(hipMemcpy(dataset.expected[rank], dataset.inputs[rank], + dataset.NumBytes(), hipMemcpyDeviceToHost)); + barrier.Wait(); + + // Have rank 0 do the expected calculation, then send results to other processes + int8_t* resultI1; + if (rank == 0) + { + // Allocate temporary host array to accumulate results + resultI1 = (int8_t *)malloc(dataset.NumBytes()); + uint8_t* resultU1 = (uint8_t *)resultI1; + int32_t* resultI4 = (int32_t *)resultI1; + uint32_t* resultU4 = (uint32_t *)resultI1; + int64_t* resultI8 = (int64_t *)resultI1; + uint64_t* resultU8 = (uint64_t *)resultI1; + float* resultF4 = (float *)resultI1; + double* resultF8 = (double *)resultI1; + rccl_bfloat16* resultB2 = (rccl_bfloat16 *)resultI1; + + // Initialize the result with the first device's array + memcpy(resultI1, dataset.expected[0], dataset.NumBytes()); + + // Perform reduction on the other device arrays + for (int i = 1; i < dataset.numDevices; i++) + { + int8_t* arrayI1 = (int8_t *)dataset.expected[i]; + uint8_t* arrayU1 = (uint8_t *)arrayI1; + int32_t* arrayI4 = (int32_t *)arrayI1; + uint32_t* arrayU4 = (uint32_t *)arrayI1; + int64_t* arrayI8 = (int64_t *)arrayI1; + uint64_t* arrayU8 = (uint64_t *)arrayI1; + float* arrayF4 = (float *)arrayI1; + double* arrayF8 = (double *)arrayI1; + rccl_bfloat16* arrayB2 = (rccl_bfloat16 *)arrayI1; + + for (int j = 0; j < dataset.numElements; j++) + { + switch (dataset.dataType) + { + case ncclInt8: resultI1[j] = ReduceOp(op, resultI1[j], arrayI1[j]); break; + case ncclUint8: resultU1[j] = ReduceOp(op, resultU1[j], arrayU1[j]); break; + case ncclInt32: resultI4[j] = ReduceOp(op, resultI4[j], arrayI4[j]); break; + case ncclUint32: resultU4[j] = ReduceOp(op, resultU4[j], arrayU4[j]); break; + case ncclInt64: resultI8[j] = ReduceOp(op, resultI8[j], arrayI8[j]); break; + case ncclUint64: resultU8[j] = ReduceOp(op, resultU8[j], arrayU8[j]); break; + case ncclFloat32: resultF4[j] = ReduceOp(op, resultF4[j], arrayF4[j]); break; + case ncclFloat64: resultF8[j] = ReduceOp(op, resultF8[j], arrayF8[j]); break; + case ncclBfloat16: resultB2[j] = ReduceOp(op, resultB2[j], arrayB2[j]); break; + default: + fprintf(stderr, "[ERROR] Unsupported datatype\n"); + exit(0); + } + } + } + } + barrier.Wait(); + // Copy results into expected arrays + size_t const byteCount = dataset.NumBytes() / dataset.numDevices; + + HIP_CALL(hipMemcpy(dataset.expected[rank], dataset.outputs[rank], + dataset.NumBytes(), hipMemcpyDeviceToHost)); + + barrier.Wait(); + + if (rank == 0) + { + for (int i = 0; i < dataset.numDevices; i++) + memcpy((int8_t *)dataset.expected[i] + (i * byteCount), + resultI1 + (i * byteCount), byteCount); + + free(resultI1); + } + } + + void TestReduceScatter(int rank, Dataset& dataset) + { + // Prepare input / output / expected results + SetUpPerProcess(rank, ncclCollAllGather, comms[rank], streams[rank], dataset); + + if (numDevices > numDevicesAvailable) return; + if (numElements % numDevices != 0) return; + + Barrier barrier(rank, numDevices, std::atoi(getenv("NCCL_COMM_ID"))); + + // Prepare input / output / expected results + FillDatasetWithPattern(dataset, rank); + ComputeExpectedResults(dataset, barrier, op, rank); + + size_t const byteCount = dataset.NumBytes() / numDevices; + size_t const recvCount = dataset.numElements / numDevices; + + // Launch the reduction (1 process per GPU) + ncclReduceScatter(dataset.inputs[rank], + (int8_t *)dataset.outputs[rank] + (rank * byteCount), + recvCount, dataType, op, + comms[rank], streams[rank]); + + // Wait for reduction to complete + HIP_CALL(hipStreamSynchronize(streams[rank])); + + // Check results + ValidateResults(dataset, rank); + + TearDownPerProcess(comms[rank], streams[rank]); + dataset.Release(rank); + } + }; +} + +#endif diff --git a/test/test_ScatterMultiProcess.cpp b/test/test_ScatterMultiProcess.cpp new file mode 100644 index 0000000000..4211ace652 --- /dev/null +++ b/test/test_ScatterMultiProcess.cpp @@ -0,0 +1,96 @@ +/************************************************************************* + * Copyright (c) 2019-2021 Advanced Micro Devices, Inc. All rights reserved. + * + * See LICENSE.txt for license information + ************************************************************************/ + +#include "test_ScatterMultiProcess.hpp" + +namespace CorrectnessTests +{ + TEST_P(ScatterMultiProcessCorrectnessTest, Correctness) + { + Dataset* dataset = (Dataset*)mmap(NULL, sizeof(Dataset), PROT_READ|PROT_WRITE, MAP_SHARED|MAP_ANONYMOUS, -1, 0); + dataset->InitializeRootProcess(numDevices, numElements, dataType, inPlace, ncclCollScatter); + Barrier::ClearShmFiles(std::atoi(getenv("NCCL_COMM_ID"))); + + int pid1 = 0; + int pid2 = 0; + int pid3 = 0; + pid1 = fork(); + + // From this point on, ignore original process as we cannot have it create a HIP context + if (pid1 == 0) + { + pid2 = fork(); + if (numDevices > 2) + { + pid3 = fork(); + } + if ((pid2 > 0 && pid3 == 0 && numDevices == 2) || (pid2 > 0 && pid3 > 0 && numDevices > 2)) + { + // Process 0 + TestScatter(0, *dataset); + if (pid3 > 0) + { + waitpid(pid3, NULL, 0); + } + } + else if ((pid2 == 0 && pid3 == 0 && numDevices == 2) || (pid2 == 0 && pid3 > 0 && numDevices > 2)) + { + // Process 1 + TestScatter(1, *dataset); + if (numDevices > 2) + { + waitpid(pid3, NULL, 0); + } + exit(0); + } + else if (pid2 > 0 && pid3 == 0 && numDevices > 2) + { + // Process 2 (available when numDevices > 2) + TestScatter(2, *dataset); + exit(0); + } + else if (pid2 == 0 && pid3 == 0 && numDevices == 4) + { + // Process 3 (available when numDevices == 4) + TestScatter(3, *dataset); + exit(0); + } + else + { + exit(0); + } + waitpid(pid2, NULL, 0); + exit(0); + } + waitpid(pid1, NULL, 0); + munmap(dataset, sizeof(Dataset)); + } + + INSTANTIATE_TEST_SUITE_P(ScatterMultiProcessCorrectnessSweep, + ScatterMultiProcessCorrectnessTest, + testing::Combine( + // Reduction operator is not used + testing::Values(ncclSum), + // Data types + testing::Values(ncclInt8, + ncclUint8, + ncclInt32, + ncclUint32, + ncclInt64, + ncclUint64, + //ncclFloat16, + ncclFloat32, + ncclFloat64, + ncclBfloat16), + // Number of elements + testing::Values(1024, 1048576), + // Number of devices + testing::Values(2,3,4), + // In-place or not + testing::Values(false), + testing::Values("RCCL_ALLTOALL_KERNEL_DISABLE=0", "RCCL_ALLTOALL_KERNEL_DISABLE=1")), + CorrectnessTest::PrintToStringParamName()); +} // namespace diff --git a/test/test_ScatterMultiProcess.hpp b/test/test_ScatterMultiProcess.hpp new file mode 100644 index 0000000000..d48040246d --- /dev/null +++ b/test/test_ScatterMultiProcess.hpp @@ -0,0 +1,64 @@ +/************************************************************************* + * Copyright (c) 2019-2021 Advanced Micro Devices, Inc. All rights reserved. + * + * See LICENSE.txt for license information + ************************************************************************/ +#ifndef TEST_SCATTER_MULTI_PROCESS_HPP +#define TEST_SCATTER_MULTI_PROCESS_HPP + +#include "CorrectnessTest.hpp" + +namespace CorrectnessTests +{ + class ScatterMultiProcessCorrectnessTest : public MultiProcessCorrectnessTest + { + public: + static void ComputeExpectedResults(Dataset& dataset, int const root, int const rank) + { + if (rank == root) + { + for (int i = 0; i < dataset.numDevices; i++) + HIP_CALL(hipMemcpy(dataset.expected[i], (int8_t *)dataset.inputs[root]+dataset.NumBytes()*i, + dataset.NumBytes(), hipMemcpyDeviceToHost)); + } + } + + void TestScatter(int rank, Dataset& dataset) + { + // Prepare input / output / expected results + SetUpPerProcess(rank, ncclCollScatter, comms[rank], streams[rank], dataset); + + if (numDevices > numDevicesAvailable) return; + + Barrier barrier(rank, numDevices, std::atoi(getenv("NCCL_COMM_ID"))); + + // Test each possible root + for (int root = 0; root < numDevices; root++) + { + // Prepare input / output / expected results + FillDatasetWithPattern(dataset, rank); + + ComputeExpectedResults(dataset, root, rank); + + // Launch the reduction (1 process per GPU) + ncclScatter(dataset.inputs[rank], + dataset.outputs[rank], + numElements, dataType, + root, comms[rank], streams[rank]); + + // Wait for reduction to complete + HIP_CALL(hipStreamSynchronize(streams[rank])); + + // Check results + ValidateResults(dataset, rank); + + barrier.Wait(); + } + + TearDownPerProcess(comms[rank], streams[rank]); + dataset.Release(rank); + } + }; +} + +#endif