Files
Atul Kulkarni 142860442a Enable MPI support to execute MPI specific unit/functional tests (#1996)
* Added MPI support to execute unit/functional tests

Update node and process validation
Updated node detection count and modified validation method
Update validation logic to include max procs and nodes

* Address review comments

* Fix warnings

* Added a new NET transport test and clean up

* Added MPI test logging mechanism

* Decoupled GTest framework

* Added Net IB functional tests

* Updated with resource guards

* Added NET IB tests and refactored code

* Update P2pWorkflow test

* Update documentation

* Add MPI_TESTS_ENABLED guard to the file

* Fix Shm and NetIB tests

* Applied refactoring and cleanup

* Replaced BufferGuard with AutoGuard

* Modified test debug logging

* Use macro to reduce NcclTypeTraits code duplication

- Replace repetitive template specializations with a single
  DEFINE_NCCL_TYPE_TRAIT macro
- Use stringification operator (#) to auto-generate type name strings
- Add #undef to keep macro from polluting namespace
- Makes adding new type mappings trivial

* Unify buffer initialization with generic pattern function

- Remove initializeBufferWithCustomPattern
- Make initializeBufferWithPattern generic with PatternFunc template param
- Now single function handles all patterns via lambda injection
- Updated all test files to use lambdas for pattern generation
- Pattern logic now visible at call site (self-documenting)

* Unify buffer verification with pluggable pattern function

- Remove verifyBufferWithCustomCheck
- Make verifyBufferData generic with PatternFunc template param
- Single function handles all verification patterns via lambda injection
- Updated all test files to use lambdas
- Better defaults: num_samples=0 means verify all elements
- Pattern logic now visible at call site (self-documenting)

* Docs: Add DeviceBufferHelpers section to MPITestRunner.md

- Document new refactored buffer initialization/verification API
- Explain pluggable pattern functions with lambda examples
- Show type mapping and automatic float/int comparison
- Include migration guide from old API to new unified functions
- Demonstrate best practices with real-world examples
- Reference recent refactoring commits (macro-based type traits)

* Docs: Update documentation and examples

- Update on DeviceBufferHelpers
- Update examples using DeviceBufferHelpers methods, e.g. data verification

* Address review comment.

- Replace manual pattern generation loop with initializeBufferWithPattern call
- Use downloadBuffer to get host copy instead of manual hipMemcpy

* Remove non-existent dependency

* Remove duplicate testcase

* Code cleanup in test files

* Moved common constants to base class

[ROCm/rccl commit: 29e1567b95]
2025-12-06 16:05:37 -06:00

314 строки
12 KiB
C++

/*************************************************************************
* Copyright (c) 2025 Advanced Micro Devices, Inc. All rights reserved.
*
* See LICENSE.txt for license information
************************************************************************/
#include "TransportMPIBase.hpp"
#ifdef MPI_TESTS_ENABLED
namespace
{
// Test pattern generation constants for TransportTestBase
inline constexpr int kDefaultPatternMultiplier = 100; // For transport base patterns
inline constexpr int kByteValueModulo = 256; // For uint8_t wraparound
} // namespace
// Override createTestCommunicator to also update config and transport components
ncclResult_t TransportTestBase::createTestCommunicator()
{
// Call base class implementation
ncclResult_t result = MPITestBase::createTestCommunicator();
if(result == ncclSuccess)
{
// Update config with the new communicator and stream
config.nccl_comm = getActiveCommunicator();
config.stream = getActiveStream();
// Initialize transport components now that we have a valid communicator
comm_handle = config.nccl_comm;
local_peer_info = &comm_handle->peerInfo[config.world_rank];
remote_peer_info = &comm_handle->peerInfo[config.peer_rank];
if(config.world_rank == 0)
{
TEST_INFO("TransportTestBase config and transport components updated with per-test "
"communicator");
}
}
return result;
}
// Set transport type and initialize connectors accordingly
void TransportTestBase::setTransportType(TransportType type)
{
initialized_transport = type;
switch(type)
{
case TransportType::P2P:
send_connector.transportComm = &p2pTransport.send;
recv_connector.transportComm = &p2pTransport.recv;
break;
case TransportType::Network:
send_connector.transportComm = &netTransport.send;
recv_connector.transportComm = &netTransport.recv;
break;
case TransportType::SHM:
send_connector.transportComm = &shmTransport.send;
recv_connector.transportComm = &shmTransport.recv;
break;
case TransportType::None:
send_connector.transportComm = nullptr;
recv_connector.transportComm = nullptr;
break;
}
}
// SetUp: Initialize common transport test components
void TransportTestBase::SetUp()
{
// Call GTest's SetUp (which will call MPITestCore::initializeTest())
MPITestBase::SetUp();
// Initialize test configuration using aggregate initialization
// Note: rccl_comm and stream are set to nullptr initially; tests must call createTestCommunicator()
config = {.world_rank = MPIEnvironment::world_rank,
.world_size = MPIEnvironment::world_size,
.peer_rank = (MPIEnvironment::world_rank == 0) ? 1 : 0,
.nccl_comm = nullptr,
.stream = nullptr};
// Require at least 2 MPI processes for testing
if(config.world_size < 2)
{
GTEST_SKIP() << "Transport testing requires at least 2 MPI processes";
}
// Check if MPIEnvironment was properly initialized
if(MPIEnvironment::retCode != 0)
{
GTEST_FAIL() << "MPIEnvironment initialization failed";
}
// Initialize transport component pointers to nullptr
// They will be set in createTestCommunicator() after the communicator is created
comm_handle = nullptr;
local_peer_info = nullptr;
remote_peer_info = nullptr;
// Create and initialize topology graph
topology_graph = static_cast<ncclTopoGraph*>(malloc(sizeof(ncclTopoGraph)));
if(topology_graph)
{
*topology_graph = {.id = 0,
.pattern = NCCL_TOPO_PATTERN_RING,
.nChannels = 1,
.bwIntra = 0.0f,
.bwInter = 0.0f,
.typeIntra = PATH_SYS,
.typeInter = PATH_NET};
}
// Initialize with P2P transport by default
// Tests can call setTransportType() to switch to SHM or Network
setTransportType(TransportType::P2P);
}
// TearDown: Cleanup common transport test components
void TransportTestBase::TearDown()
{
// CRITICAL: Synchronize device before freeing connectors
// The transport proxy may have its own internal stream for CE memcpy operations
// that must be idle before we can destroy it
// Note: We ignore errors here as we're in cleanup path
(void)hipDeviceSynchronize();
// Cleanup topology graph
if(topology_graph)
{
free(topology_graph);
topology_graph = nullptr;
}
// Cleanup transport resources based on initialized transport type
if(send_connector.transportResources)
{
if(initialized_transport == TransportType::P2P)
{
p2pTransport.send.free(&send_connector);
}
else if(initialized_transport == TransportType::SHM)
{
shmTransport.send.free(&send_connector);
}
else if(initialized_transport == TransportType::Network)
{
netTransport.send.free(&send_connector);
}
send_connector.transportResources = nullptr;
}
if(recv_connector.transportResources)
{
if(initialized_transport == TransportType::P2P)
{
p2pTransport.recv.free(&recv_connector);
}
else if(initialized_transport == TransportType::SHM)
{
shmTransport.recv.free(&recv_connector);
}
else if(initialized_transport == TransportType::Network)
{
netTransport.recv.free(&recv_connector);
}
recv_connector.transportResources = nullptr;
}
// Reset transport type
initialized_transport = TransportType::None;
// Nullify peer info pointers
local_peer_info = nullptr;
remote_peer_info = nullptr;
comm_handle = nullptr;
// Note: Clear RAII guard vectors BEFORE destroying communicator
// The guards (especially NcclRegHandleGuard) need the communicator to be valid
// when they call ncclCommDeregister() in their destructors
reg_handle_guards_.clear();
buffer_guards_.clear();
// Call base class TearDown to cleanup test communicator
// This calls MPITestBase::TearDown() -> MPITestCore::cleanupTest() -> cleanupTestCommunicator()
MPITestBase::TearDown();
}
// Allocate and initialize test buffers
void TransportTestBase::allocateAndInitBuffers(void** send_buffer,
void** recv_buffer,
size_t send_bytes,
size_t recv_bytes)
{
// Allocate send buffer
ASSERT_EQ(hipSuccess, hipMalloc(send_buffer, send_bytes))
<< "Rank " << config.world_rank << ": Failed to allocate send buffer";
// Allocate recv buffer
ASSERT_EQ(hipSuccess, hipMalloc(recv_buffer, recv_bytes))
<< "Rank " << config.world_rank << ": Failed to allocate recv buffer";
std::vector<uint8_t> host_data(send_bytes);
for(size_t i = 0; i < host_data.size(); i++)
{
host_data[i] = static_cast<uint8_t>((config.world_rank * kDefaultPatternMultiplier + i)
% kByteValueModulo);
}
ASSERT_EQ(hipSuccess,
hipMemcpy(*send_buffer, host_data.data(), send_bytes, hipMemcpyHostToDevice))
<< "Rank " << config.world_rank << ": Failed to initialize send buffer";
if(config.world_rank == 0)
{
TEST_INFO("Allocated and initialized buffers (%zu bytes each)", send_bytes);
}
}
// Pre-register buffers with ncclCommRegister
void TransportTestBase::preRegisterBuffers(void* send_buffer,
void* recv_buffer,
size_t send_bytes,
size_t recv_bytes,
void** send_reg_handle,
void** recv_reg_handle)
{
ncclComm_t comm = getActiveCommunicator();
// Register send buffer
ncclResult_t result = ncclCommRegister(comm, send_buffer, send_bytes, send_reg_handle);
ASSERT_EQ(ncclSuccess, result)
<< "Rank " << config.world_rank
<< ": Failed to pre-register send buffer: " << ncclGetErrorString(result);
// Register recv buffer
result = ncclCommRegister(comm, recv_buffer, recv_bytes, recv_reg_handle);
ASSERT_EQ(ncclSuccess, result)
<< "Rank " << config.world_rank
<< ": Failed to pre-register recv buffer: " << ncclGetErrorString(result);
}
// Buffer allocation with automatic RAII guards
std::pair<DeviceBufferAutoGuard, DeviceBufferAutoGuard>
TransportTestBase::allocateAndInitBuffersGuarded(void** send_buffer,
void** recv_buffer,
size_t send_bytes,
size_t recv_bytes,
bool store_in_base)
{
// Allocate buffers using existing method
allocateAndInitBuffers(send_buffer, recv_buffer, send_bytes, recv_bytes);
// Create guards
auto sendGuard = makeDeviceBufferAutoGuard(*send_buffer); // Device memory
auto recvGuard = makeDeviceBufferAutoGuard(*recv_buffer); // Device memory
if(store_in_base)
{
// Store guards in base class for cleanup at test end
buffer_guards_.push_back(std::move(sendGuard));
buffer_guards_.push_back(std::move(recvGuard));
// Return empty guards (resources now managed by base class)
return {makeDeviceBufferAutoGuard(nullptr), makeDeviceBufferAutoGuard(nullptr)};
}
else
{
// Return guards for caller to manage (cleanup at caller's scope exit)
return {std::move(sendGuard), std::move(recvGuard)};
}
}
// Buffer registration with automatic RAII guards
std::pair<NcclRegHandleGuard, NcclRegHandleGuard>
TransportTestBase::preRegisterBuffersGuarded(void* send_buffer,
void* recv_buffer,
size_t send_bytes,
size_t recv_bytes,
void** send_reg_handle,
void** recv_reg_handle,
bool store_in_base)
{
// Register buffers using existing method
preRegisterBuffers(send_buffer,
recv_buffer,
send_bytes,
recv_bytes,
send_reg_handle,
recv_reg_handle);
// Create guards (handles may be nullptr if registration is not needed)
NcclRegHandleGuard sendGuard(*send_reg_handle, NcclRegHandleDeleter(getActiveCommunicator()));
NcclRegHandleGuard recvGuard(*recv_reg_handle, NcclRegHandleDeleter(getActiveCommunicator()));
if(store_in_base)
{
// Store guards in base class for cleanup at test end
reg_handle_guards_.push_back(std::move(sendGuard));
reg_handle_guards_.push_back(std::move(recvGuard));
// Return empty guards (resources now managed by base class)
return {makeRegHandleGuard(nullptr, nullptr), makeRegHandleGuard(nullptr, nullptr)};
}
else
{
// Return guards for caller to manage (cleanup at caller's scope exit)
return {std::move(sendGuard), std::move(recvGuard)};
}
}
#endif // MPI_TESTS_ENABLED