29e1567b95
* Added MPI support to execute unit/functional tests Update node and process validation Updated node detection count and modified validation method Update validation logic to include max procs and nodes * Address review comments * Fix warnings * Added a new NET transport test and clean up * Added MPI test logging mechanism * Decoupled GTest framework * Added Net IB functional tests * Updated with resource guards * Added NET IB tests and refactored code * Update P2pWorkflow test * Update documentation * Add MPI_TESTS_ENABLED guard to the file * Fix Shm and NetIB tests * Applied refactoring and cleanup * Replaced BufferGuard with AutoGuard * Modified test debug logging * Use macro to reduce NcclTypeTraits code duplication - Replace repetitive template specializations with a single DEFINE_NCCL_TYPE_TRAIT macro - Use stringification operator (#) to auto-generate type name strings - Add #undef to keep macro from polluting namespace - Makes adding new type mappings trivial * Unify buffer initialization with generic pattern function - Remove initializeBufferWithCustomPattern - Make initializeBufferWithPattern generic with PatternFunc template param - Now single function handles all patterns via lambda injection - Updated all test files to use lambdas for pattern generation - Pattern logic now visible at call site (self-documenting) * Unify buffer verification with pluggable pattern function - Remove verifyBufferWithCustomCheck - Make verifyBufferData generic with PatternFunc template param - Single function handles all verification patterns via lambda injection - Updated all test files to use lambdas - Better defaults: num_samples=0 means verify all elements - Pattern logic now visible at call site (self-documenting) * Docs: Add DeviceBufferHelpers section to MPITestRunner.md - Document new refactored buffer initialization/verification API - Explain pluggable pattern functions with lambda examples - Show type mapping and automatic float/int comparison - Include migration guide from old API to new unified functions - Demonstrate best practices with real-world examples - Reference recent refactoring commits (macro-based type traits) * Docs: Update documentation and examples - Update on DeviceBufferHelpers - Update examples using DeviceBufferHelpers methods, e.g. data verification * Address review comment. - Replace manual pattern generation loop with initializeBufferWithPattern call - Use downloadBuffer to get host copy instead of manual hipMemcpy * Remove non-existent dependency * Remove duplicate testcase * Code cleanup in test files * Moved common constants to base class
1707 line
70 KiB
C++
1707 line
70 KiB
C++
/*************************************************************************
|
|
* Copyright (c) 2025 Advanced Micro Devices, Inc. All rights reserved.
|
|
*
|
|
* See LICENSE.txt for license information
|
|
************************************************************************/
|
|
|
|
#include "DeviceBufferHelpers.hpp"
|
|
#include "ResourceGuards.hpp"
|
|
#include "TransportMPIBase.hpp"
|
|
|
|
#include <algorithm>
|
|
#include <cmath>
|
|
#include <string>
|
|
|
|
#ifdef MPI_TESTS_ENABLED
|
|
|
|
// Import MPI test constants
|
|
using namespace MPITestConstants;
|
|
using namespace RCCLTestGuards;
|
|
using namespace RCCLTestHelpers;
|
|
using namespace TransportTestConstants;
|
|
|
|
// P2P-specific test configuration
|
|
struct P2PTestConfig
|
|
{
|
|
bool is_sender{false};
|
|
void* send_buffer{nullptr};
|
|
void* recv_buffer{nullptr};
|
|
size_t buffer_size{0};
|
|
};
|
|
|
|
class P2pMPITest : public TransportTestBase
|
|
{
|
|
protected:
|
|
P2PTestConfig p2p_config;
|
|
|
|
// Connection info structures for MPI exchange between helper methods
|
|
ncclConnect send_connect_info{};
|
|
ncclConnect recv_connect_info{};
|
|
|
|
void SetUp() override
|
|
{
|
|
// Call base class SetUp first
|
|
TransportTestBase::SetUp();
|
|
|
|
}
|
|
|
|
// Helper: Allocate P2P-specific resources
|
|
void setupP2PBuffers()
|
|
{
|
|
// Set up P2P-specific test configuration
|
|
p2p_config.is_sender = (config.world_rank == 0);
|
|
p2p_config.buffer_size = kDefaultBufferSize;
|
|
|
|
// Allocate and initialize send buffer with test pattern
|
|
constexpr size_t num_elements = kDefaultBufferSize / sizeof(float);
|
|
auto [send_err, _] = allocateAndInitialize<float>(&p2p_config.send_buffer,
|
|
num_elements,
|
|
config.world_rank);
|
|
ASSERT_EQ(hipSuccess, send_err)
|
|
<< "Rank " << config.world_rank << ": Failed to allocate/initialize send buffer";
|
|
auto sendGuard = makeDeviceBufferAutoGuard(p2p_config.send_buffer);
|
|
|
|
// Allocate and zero-initialize receive buffer
|
|
hipError_t hip_result = hipMalloc(&p2p_config.recv_buffer, p2p_config.buffer_size);
|
|
ASSERT_EQ(hipSuccess, hip_result)
|
|
<< "Rank " << config.world_rank << ": Failed to allocate recv buffer";
|
|
auto recvGuard = makeDeviceBufferAutoGuard(p2p_config.recv_buffer);
|
|
|
|
hip_result = zeroInitializeBuffer<float>(p2p_config.recv_buffer, num_elements);
|
|
ASSERT_EQ(hipSuccess, hip_result)
|
|
<< "Rank " << config.world_rank << ": Failed to zero-initialize recv buffer";
|
|
|
|
// Synchronize default stream to ensure all buffer operations complete
|
|
// Note: This is called before createTestCommunicator(), so we use the default stream (0)
|
|
ASSERT_EQ(hipSuccess, hipStreamSynchronize(0))
|
|
<< "Rank " << config.world_rank
|
|
<< ": Failed to synchronize default stream after buffer initialization";
|
|
|
|
// Release guards - buffers must persist beyond setupP2PBuffers() scope
|
|
// They will be manually cleaned up in TearDown() BEFORE base class teardown
|
|
// to avoid "invalid resource handle" errors with proxy connections
|
|
sendGuard.release();
|
|
recvGuard.release();
|
|
|
|
if(config.world_rank == 0)
|
|
{
|
|
TEST_INFO("P2P buffers allocated successfully");
|
|
}
|
|
}
|
|
|
|
void TearDown() override
|
|
{
|
|
// Cleanup P2P-specific test resources BEFORE calling base class TearDown
|
|
// Note: Buffers must be freed while the communicator and proxy connections
|
|
// are still valid. The base class TearDown() destroys the communicator, which
|
|
// triggers proxy thread shutdown and frees all proxy connections. If we free
|
|
// buffers after that, we get "invalid resource handle" errors.
|
|
if(p2p_config.send_buffer)
|
|
{
|
|
HIP_TEST_CHECK_GTEST_FAIL(hipFree(p2p_config.send_buffer));
|
|
p2p_config.send_buffer = nullptr;
|
|
}
|
|
if(p2p_config.recv_buffer)
|
|
{
|
|
HIP_TEST_CHECK_GTEST_FAIL(hipFree(p2p_config.recv_buffer));
|
|
p2p_config.recv_buffer = nullptr;
|
|
}
|
|
|
|
// Call base class TearDown to cleanup communicator and proxy connections
|
|
TransportTestBase::TearDown();
|
|
}
|
|
|
|
public:
|
|
// Test P2P capability detection (peer-to-peer communication)
|
|
void testP2pCanConnect()
|
|
{
|
|
// Validate preconditions
|
|
ASSERT_NE(nullptr, comm_handle)
|
|
<< "Rank " << config.world_rank
|
|
<< ": comm_handle is null - NCCL communicator not initialized";
|
|
ASSERT_NE(nullptr, local_peer_info)
|
|
<< "Rank " << config.world_rank
|
|
<< ": local_peer_info is null - peer information not initialized";
|
|
ASSERT_NE(nullptr, remote_peer_info)
|
|
<< "Rank " << config.world_rank
|
|
<< ": remote_peer_info is null - peer information not initialized";
|
|
|
|
int can_connect = 0;
|
|
const auto result = p2pTransport.canConnect(&can_connect,
|
|
comm_handle,
|
|
topology_graph,
|
|
local_peer_info,
|
|
remote_peer_info);
|
|
|
|
ASSERT_EQ(ncclSuccess, result) << "Rank " << config.world_rank
|
|
<< ": p2pCanConnect failed: " << ncclGetErrorString(result);
|
|
|
|
// Synchronize the stream to ensure all operations complete
|
|
ASSERT_EQ(hipSuccess, syncStream(config.stream, config.world_rank))
|
|
<< "Rank " << config.world_rank << ": Stream synchronization failed";
|
|
}
|
|
|
|
// Test P2P setup phase
|
|
void testP2pSetup()
|
|
{
|
|
// Call setup() and save the connect_info to class members for later MPI exchange
|
|
const auto result = p2p_config.is_sender
|
|
? p2pTransport.send.setup(comm_handle,
|
|
topology_graph,
|
|
local_peer_info,
|
|
remote_peer_info,
|
|
&send_connect_info,
|
|
&send_connector,
|
|
0,
|
|
0)
|
|
: p2pTransport.recv.setup(comm_handle,
|
|
topology_graph,
|
|
local_peer_info,
|
|
remote_peer_info,
|
|
&recv_connect_info,
|
|
&recv_connector,
|
|
0,
|
|
0);
|
|
|
|
ASSERT_EQ(ncclSuccess, result)
|
|
<< "Rank " << config.world_rank << ": " << (p2p_config.is_sender ? "Send" : "Recv")
|
|
<< " setup failed: " << ncclGetErrorString(result);
|
|
}
|
|
|
|
// Test P2P connection phase
|
|
void testP2pConnect()
|
|
{
|
|
// Validate preconditions
|
|
ASSERT_NE(nullptr, comm_handle) << "Rank " << config.world_rank << ": comm_handle is null";
|
|
ASSERT_NE(nullptr, local_peer_info)
|
|
<< "Rank " << config.world_rank << ": local_peer_info is null";
|
|
ASSERT_NE(nullptr, remote_peer_info)
|
|
<< "Rank " << config.world_rank << ": remote_peer_info is null";
|
|
|
|
// NOTE: setup() was already called in testP2pSetup() and saved connect_info to class members
|
|
// This method only does MPI exchange of connect_info and then calls connect()
|
|
|
|
if(p2p_config.is_sender)
|
|
{
|
|
// Exchange connect info with receiver using MPI
|
|
ASSERT_EQ(MPI_SUCCESS,
|
|
MPI_Send(&send_connect_info, // Use class member from testP2pSetup()
|
|
sizeof(ncclConnect),
|
|
MPI_BYTE,
|
|
config.peer_rank,
|
|
0,
|
|
MPI_COMM_WORLD))
|
|
<< "Rank " << config.world_rank << ": MPI_Send failed";
|
|
|
|
ASSERT_EQ(MPI_SUCCESS,
|
|
MPI_Recv(&recv_connect_info, // Receive into class member
|
|
sizeof(ncclConnect),
|
|
MPI_BYTE,
|
|
config.peer_rank,
|
|
0,
|
|
MPI_COMM_WORLD,
|
|
MPI_STATUS_IGNORE))
|
|
<< "Rank " << config.world_rank << ": MPI_Recv failed";
|
|
|
|
// Perform the actual connection using the received info
|
|
auto result = p2pTransport.send.connect(comm_handle,
|
|
&recv_connect_info,
|
|
config.world_size,
|
|
config.world_rank,
|
|
&send_connector);
|
|
ASSERT_EQ(ncclSuccess, result)
|
|
<< "Rank " << config.world_rank
|
|
<< ": Send connect failed: " << ncclGetErrorString(result);
|
|
}
|
|
else
|
|
{
|
|
// Exchange connect info with sender using MPI
|
|
ASSERT_EQ(MPI_SUCCESS,
|
|
MPI_Recv(&send_connect_info, // Receive into class member
|
|
sizeof(ncclConnect),
|
|
MPI_BYTE,
|
|
config.peer_rank,
|
|
0,
|
|
MPI_COMM_WORLD,
|
|
MPI_STATUS_IGNORE))
|
|
<< "Rank " << config.world_rank << ": MPI_Recv failed";
|
|
|
|
ASSERT_EQ(MPI_SUCCESS,
|
|
MPI_Send(&recv_connect_info, // Use class member from testP2pSetup()
|
|
sizeof(ncclConnect),
|
|
MPI_BYTE,
|
|
config.peer_rank,
|
|
0,
|
|
MPI_COMM_WORLD))
|
|
<< "Rank " << config.world_rank << ": MPI_Send failed";
|
|
|
|
// Perform the actual connection using the received info
|
|
auto result = p2pTransport.recv.connect(comm_handle,
|
|
&send_connect_info,
|
|
config.world_size,
|
|
config.world_rank,
|
|
&recv_connector);
|
|
ASSERT_EQ(ncclSuccess, result)
|
|
<< "Rank " << config.world_rank
|
|
<< ": Recv connect failed: " << ncclGetErrorString(result);
|
|
}
|
|
|
|
// Synchronize the stream to ensure all RCCL operations complete
|
|
ASSERT_EQ(hipSuccess, syncStream(config.stream, config.world_rank))
|
|
<< "Rank " << config.world_rank << ": Stream synchronization failed";
|
|
}
|
|
|
|
// Test actual data transfer through P2P
|
|
void testP2pDataTransfer()
|
|
{
|
|
// Use RCCL point-to-point operations to validate P2P transport
|
|
const size_t count = p2p_config.buffer_size / sizeof(float);
|
|
const auto result = p2p_config.is_sender ? ncclSend(p2p_config.send_buffer,
|
|
count,
|
|
ncclFloat,
|
|
config.peer_rank,
|
|
config.nccl_comm,
|
|
config.stream)
|
|
: ncclRecv(p2p_config.recv_buffer,
|
|
count,
|
|
ncclFloat,
|
|
config.peer_rank,
|
|
config.nccl_comm,
|
|
config.stream);
|
|
|
|
ASSERT_EQ(ncclSuccess, result)
|
|
<< "Rank " << config.world_rank << ": RCCL " << (p2p_config.is_sender ? "Send" : "Recv")
|
|
<< " failed: " << ncclGetErrorString(result);
|
|
|
|
ASSERT_EQ(hipSuccess, syncStream(config.stream, config.world_rank))
|
|
<< "Rank " << config.world_rank << ": Stream synchronization failed";
|
|
|
|
// Only validate data on the receiver side
|
|
if(!p2p_config.is_sender)
|
|
{
|
|
size_t error_idx;
|
|
float expected_val, actual_val;
|
|
|
|
// Validate received data using verifyBufferData template
|
|
// The data should match the SENDER's pattern (peer_rank is the sender's rank)
|
|
bool data_correct = verifyBufferData<float>(
|
|
p2p_config.recv_buffer,
|
|
count,
|
|
[peer_rank = config.peer_rank](size_t i) {
|
|
return static_cast<float>(peer_rank * kDefaultPatternMultiplier + i);
|
|
},
|
|
kMaxValidationElements,
|
|
1e-5,
|
|
&error_idx,
|
|
&expected_val,
|
|
&actual_val);
|
|
|
|
EXPECT_TRUE(data_correct) << "Rank " << config.world_rank
|
|
<< ": Data validation failed at index " << error_idx
|
|
<< ": expected " << expected_val << ", got " << actual_val;
|
|
|
|
if(data_correct && config.world_rank == 0)
|
|
{
|
|
TEST_INFO("P2P data transfer validation successful - received correct data from rank %d",
|
|
config.peer_rank);
|
|
}
|
|
else if(!data_correct)
|
|
{
|
|
TEST_WARN("Rank %d: Failed to validate data received from rank %d",
|
|
config.world_rank,
|
|
config.peer_rank);
|
|
}
|
|
}
|
|
}
|
|
|
|
// Test resource cleanup
|
|
void testP2pCleanup()
|
|
{
|
|
// Ensure all stream operations complete before validation
|
|
[[maybe_unused]] auto err = syncStream(config.stream, config.world_rank);
|
|
// Don't return error on sync failure - continue with validation
|
|
|
|
// Validate that connector resources are still valid at this point
|
|
// The actual cleanup will be handled by base class TearDown()
|
|
auto* connector = p2p_config.is_sender ? &send_connector : &recv_connector;
|
|
|
|
EXPECT_NE(nullptr, connector)
|
|
<< "Rank " << config.world_rank << ": Connector pointer is null";
|
|
|
|
if(connector)
|
|
{
|
|
EXPECT_NE(nullptr, connector->transportResources)
|
|
<< "Rank " << config.world_rank << ": " << (p2p_config.is_sender ? "Send" : "Recv")
|
|
<< " connector transport resources are null (premature cleanup)";
|
|
|
|
if(config.world_rank == 0 && connector->transportResources)
|
|
{
|
|
TEST_INFO("Connector resources validated - still active (will be freed by base class)");
|
|
}
|
|
}
|
|
|
|
// NOTE: Connectors will be automatically freed by base class TearDown()
|
|
// Device sync + connector cleanup happens BEFORE buffers are freed, which is critical for CE memcpy
|
|
}
|
|
|
|
// Test proxyConnect and proxyProgress specifically when useMemcpy is enabled
|
|
void testProxyConnectProgressWithMemcpy()
|
|
{
|
|
if(config.world_rank == 0)
|
|
{
|
|
TEST_INFO("Testing proxyConnect and proxyProgress with CE memcpy support (V2)...");
|
|
}
|
|
|
|
// Check if NCCL_P2P_USE_CUDA_MEMCPY is set externally - skip test if not
|
|
const char* p2p_memcpy_env = getenv("NCCL_P2P_USE_CUDA_MEMCPY");
|
|
if(!p2p_memcpy_env || strcmp(p2p_memcpy_env, "1") != 0)
|
|
{
|
|
if(config.world_rank == 0)
|
|
{
|
|
TEST_INFO("Skipping CE memcpy test - NCCL_P2P_USE_CUDA_MEMCPY not set to '1'");
|
|
TEST_INFO("To enable this test, set: export NCCL_P2P_USE_CUDA_MEMCPY=1");
|
|
}
|
|
return; // Skip test gracefully
|
|
}
|
|
|
|
if(config.world_rank == 0)
|
|
{
|
|
TEST_INFO("Found NCCL_P2P_USE_CUDA_MEMCPY=1 - CE memcpy mode enabled");
|
|
}
|
|
|
|
// Use the test communicator created by createTestCommunicator()
|
|
// This provides better isolation and automatic cleanup
|
|
if(config.world_rank == 0)
|
|
{
|
|
TEST_INFO("Using test communicator with CE memcpy mode");
|
|
}
|
|
|
|
// Test with a smaller buffer size to ensure successful operations
|
|
const size_t buffer_size = 1024 * sizeof(float);
|
|
void* send_buffer = nullptr;
|
|
void* recv_buffer = nullptr;
|
|
|
|
hipError_t hip_result = hipMalloc(&send_buffer, buffer_size);
|
|
ASSERT_EQ(hipSuccess, hip_result)
|
|
<< "Rank " << config.world_rank
|
|
<< ": Failed to allocate send buffer for memcpy test, error: "
|
|
<< hipGetErrorString(hip_result);
|
|
auto sendBufferGuard = makeDeviceBufferAutoGuard(send_buffer);
|
|
|
|
hip_result = hipMalloc(&recv_buffer, buffer_size);
|
|
ASSERT_EQ(hipSuccess, hip_result)
|
|
<< "Rank " << config.world_rank
|
|
<< ": Failed to allocate recv buffer for memcpy test, error: "
|
|
<< hipGetErrorString(hip_result);
|
|
auto recvBufferGuard = makeDeviceBufferAutoGuard(recv_buffer);
|
|
|
|
// Initialize send buffer with test pattern
|
|
hip_result = initializeBufferWithPattern<float>(
|
|
send_buffer, 1024, [rank = config.world_rank](size_t i) {
|
|
return static_cast<float>(rank * kSmallPatternMultiplier + i);
|
|
});
|
|
ASSERT_EQ(hipSuccess, hip_result)
|
|
<< "Rank " << config.world_rank
|
|
<< ": Failed to initialize send buffer for memcpy test, error: "
|
|
<< hipGetErrorString(hip_result);
|
|
|
|
if(config.world_rank == 0)
|
|
{
|
|
TEST_INFO("Allocated buffers for CE memcpy testing");
|
|
}
|
|
|
|
// Test: AllReduce operation - this triggers proxyConnect and proxyProgress
|
|
// in CE memcpy mode using the test communicator and stream
|
|
const ncclResult_t allreduce_result = ncclAllReduce(send_buffer,
|
|
recv_buffer,
|
|
1024,
|
|
ncclFloat,
|
|
ncclSum,
|
|
getActiveCommunicator(),
|
|
getActiveStream());
|
|
ASSERT_EQ(ncclSuccess, allreduce_result)
|
|
<< "Rank " << config.world_rank
|
|
<< ": AllReduce with CE memcpy failed, result: " << allreduce_result << " ("
|
|
<< ncclGetErrorString(allreduce_result) << ")";
|
|
if(allreduce_result == ncclSuccess && config.world_rank == 0)
|
|
{
|
|
TEST_INFO("AllReduce with CE memcpy successful (exercises proxyProgress)");
|
|
}
|
|
|
|
// Synchronize stream using getActiveStream()
|
|
hip_result = hipStreamSynchronize(getActiveStream());
|
|
ASSERT_EQ(hipSuccess, hip_result)
|
|
<< "Rank " << config.world_rank
|
|
<< ": Stream sync failed after CE memcpy AllReduce, error: "
|
|
<< hipGetErrorString(hip_result);
|
|
|
|
if(config.world_rank == 0)
|
|
{
|
|
TEST_INFO("CE memcpy proxy test completed successfully");
|
|
TEST_INFO("Summary of CE memcpy proxy functions exercised:");
|
|
TEST_INFO(" - proxyConnect: Called with CE memcpy setup (p2pSendProxyConnect)");
|
|
TEST_INFO(" - proxyProgress: Called during operations (p2pSendProxyProgress)");
|
|
TEST_INFO(" - CE memcpy features: CUDA streams, events, shared memory");
|
|
TEST_INFO(" - Proxy resource management: Buffer allocation and cleanup");
|
|
}
|
|
}
|
|
|
|
// Test basic P2P IPC buffer registration with comprehensive steps
|
|
void testP2PRegistrationBasicBuffers()
|
|
{
|
|
if(config.world_rank == 0)
|
|
{
|
|
TEST_INFO("Testing P2P IPC buffer registration via ncclSend/ncclRecv...");
|
|
}
|
|
|
|
// Step 1: Allocate and initialize test buffers
|
|
void* send_buffer = nullptr;
|
|
void* recv_buffer = nullptr;
|
|
|
|
allocateAndInitBuffers(&send_buffer, &recv_buffer, kLargeBufferSize, kLargeBufferSize);
|
|
|
|
// Step 2: Pre-register buffers with ncclCommRegister (required for SIMPLE protocol)
|
|
void* send_reg_handle = nullptr;
|
|
void* recv_reg_handle = nullptr;
|
|
|
|
preRegisterBuffers(send_buffer,
|
|
recv_buffer,
|
|
kLargeBufferSize,
|
|
kLargeBufferSize,
|
|
&send_reg_handle,
|
|
&recv_reg_handle);
|
|
|
|
// Guard registration handles for automatic cleanup
|
|
NcclRegHandleGuard sendRegGuard(send_reg_handle,
|
|
NcclRegHandleDeleter(getActiveCommunicator()));
|
|
NcclRegHandleGuard recvRegGuard(recv_reg_handle,
|
|
NcclRegHandleDeleter(getActiveCommunicator()));
|
|
|
|
if(config.world_rank == 0)
|
|
{
|
|
TEST_INFO("Pre-registered buffers with ncclCommRegister");
|
|
}
|
|
|
|
// Step 3: Initialize send buffer with test pattern
|
|
const size_t num_floats = kLargeBufferSize / sizeof(float);
|
|
hipError_t hip_result = initializeBufferWithPattern<float>(
|
|
send_buffer, num_floats, [rank = config.world_rank](size_t i) {
|
|
return static_cast<float>(rank * 1000 + i);
|
|
});
|
|
ASSERT_EQ(hipSuccess, hip_result)
|
|
<< "Rank " << config.world_rank << ": Failed to initialize send buffer";
|
|
|
|
// Step 4: Determine peer ranks (ring topology like rccl-tests)
|
|
const int nranks = config.world_size;
|
|
const int rank = config.world_rank;
|
|
const int recv_peer = (rank - 1 + nranks) % nranks; // Receive from left neighbor
|
|
const int send_peer = (rank + 1) % nranks; // Send to right neighbor
|
|
|
|
if(config.world_rank == 0)
|
|
{
|
|
TEST_INFO("Using ring topology - recv from rank %d, send to rank %d",
|
|
recv_peer,
|
|
send_peer);
|
|
}
|
|
|
|
// Step 5: Perform ncclSend/ncclRecv which internally triggers ncclRegisterP2pIpcBuffer
|
|
const size_t count = num_floats;
|
|
|
|
auto nccl_result = ncclGroupStart();
|
|
ASSERT_EQ(ncclSuccess, nccl_result)
|
|
<< "Rank " << config.world_rank << ": ncclGroupStart failed";
|
|
|
|
nccl_result = ncclSend(send_buffer,
|
|
count,
|
|
ncclFloat,
|
|
send_peer,
|
|
getActiveCommunicator(),
|
|
getActiveStream());
|
|
ASSERT_EQ(ncclSuccess, nccl_result)
|
|
<< "Rank " << config.world_rank
|
|
<< ": ncclSend failed: " << ncclGetErrorString(nccl_result);
|
|
|
|
nccl_result = ncclRecv(recv_buffer,
|
|
count,
|
|
ncclFloat,
|
|
recv_peer,
|
|
getActiveCommunicator(),
|
|
getActiveStream());
|
|
ASSERT_EQ(ncclSuccess, nccl_result)
|
|
<< "Rank " << config.world_rank
|
|
<< ": ncclRecv failed: " << ncclGetErrorString(nccl_result);
|
|
|
|
nccl_result = ncclGroupEnd();
|
|
ASSERT_EQ(ncclSuccess, nccl_result)
|
|
<< "Rank " << config.world_rank << ": ncclGroupEnd failed";
|
|
|
|
// Ensure both ranks have completed their NCCL operations before synchronizing
|
|
MPI_Barrier(MPI_COMM_WORLD);
|
|
|
|
// Step 6: Synchronize stream to ensure operations complete
|
|
hip_result = hipStreamSynchronize(getActiveStream());
|
|
ASSERT_EQ(hipSuccess, hip_result)
|
|
<< "Rank " << config.world_rank << ": hipStreamSynchronize failed";
|
|
|
|
if(config.world_rank == 0)
|
|
{
|
|
TEST_INFO("ncclSend/ncclRecv operations completed successfully");
|
|
}
|
|
|
|
// Step 7: Verify received data correctness
|
|
size_t error_idx;
|
|
float expected_val, actual_val;
|
|
bool data_correct = verifyBufferData<float>(
|
|
recv_buffer,
|
|
num_floats,
|
|
[recv_peer](size_t i) { return static_cast<float>(recv_peer * 1000 + i); },
|
|
10,
|
|
1e-5,
|
|
&error_idx,
|
|
&expected_val,
|
|
&actual_val);
|
|
|
|
EXPECT_TRUE(data_correct) << "Rank " << config.world_rank
|
|
<< ": Data verification failed at index " << error_idx
|
|
<< ": expected " << expected_val << ", got " << actual_val;
|
|
|
|
if(data_correct && config.world_rank == 0)
|
|
{
|
|
TEST_INFO("Data verification passed - received correct data from rank %d", recv_peer);
|
|
}
|
|
|
|
if(config.world_rank == 0)
|
|
{
|
|
TEST_INFO("P2P Send/Recv test with IPC registration completed successfully");
|
|
}
|
|
}
|
|
|
|
void testP2PSendRecvRegistration()
|
|
{
|
|
// Allocate and initialize buffers (>16KB for SIMPLE protocol)
|
|
void* send_buffer = nullptr;
|
|
void* recv_buffer = nullptr;
|
|
|
|
allocateAndInitBuffers(&send_buffer, &recv_buffer, kLargeBufferSize, kLargeBufferSize);
|
|
|
|
// Zero recv buffer for clean verification
|
|
ASSERT_EQ(hipSuccess, hipMemset(recv_buffer, 0, kLargeBufferSize))
|
|
<< "Rank " << config.world_rank << ": Failed to zero recv buffer";
|
|
|
|
// Pre-register buffers (creates cache entries for IPC registration)
|
|
void* send_reg_handle = nullptr;
|
|
void* recv_reg_handle = nullptr;
|
|
|
|
preRegisterBuffers(send_buffer,
|
|
recv_buffer,
|
|
kLargeBufferSize,
|
|
kLargeBufferSize,
|
|
&send_reg_handle,
|
|
&recv_reg_handle);
|
|
|
|
// Guard registration handles for automatic cleanup
|
|
NcclRegHandleGuard sendRegGuard(send_reg_handle,
|
|
NcclRegHandleDeleter(getActiveCommunicator()));
|
|
NcclRegHandleGuard recvRegGuard(recv_reg_handle,
|
|
NcclRegHandleDeleter(getActiveCommunicator()));
|
|
|
|
// Execute ncclSend/ncclRecv
|
|
const size_t count = kLargeBufferSize / sizeof(float);
|
|
const int peer = (config.world_rank == 0) ? 1 : 0;
|
|
|
|
auto nccl_result = ncclGroupStart();
|
|
ASSERT_EQ(ncclSuccess, nccl_result);
|
|
|
|
nccl_result = ncclSend(send_buffer,
|
|
count,
|
|
ncclFloat,
|
|
peer,
|
|
getActiveCommunicator(),
|
|
getActiveStream());
|
|
ASSERT_EQ(ncclSuccess, nccl_result);
|
|
|
|
nccl_result = ncclRecv(recv_buffer,
|
|
count,
|
|
ncclFloat,
|
|
peer,
|
|
getActiveCommunicator(),
|
|
getActiveStream());
|
|
ASSERT_EQ(ncclSuccess, nccl_result);
|
|
|
|
nccl_result = ncclGroupEnd();
|
|
ASSERT_EQ(ncclSuccess, nccl_result);
|
|
|
|
// Ensure both ranks have completed their NCCL operations before synchronizing
|
|
MPI_Barrier(MPI_COMM_WORLD);
|
|
|
|
// Synchronize stream (GPU memory access via IPC happens here)
|
|
ASSERT_EQ(hipSuccess, syncStream(getActiveStream(), config.world_rank))
|
|
<< "Rank " << config.world_rank << ": Stream sync failed - try NCCL_P2P_DISABLE=1";
|
|
|
|
// Verify data correctness
|
|
const int peer_rank_verify = 1 - config.world_rank;
|
|
size_t error_idx;
|
|
float expected_val, actual_val;
|
|
bool data_correct = verifyBufferData<float>(
|
|
recv_buffer,
|
|
count,
|
|
[peer_rank_verify](size_t i) {
|
|
return static_cast<float>(peer_rank_verify * 1000 + i);
|
|
},
|
|
10,
|
|
1e-5,
|
|
&error_idx,
|
|
&expected_val,
|
|
&actual_val);
|
|
EXPECT_TRUE(data_correct) << "Rank " << config.world_rank << ": Data mismatch at index "
|
|
<< error_idx << ": expected " << expected_val << ", got "
|
|
<< actual_val;
|
|
}
|
|
|
|
// Test ncclIpcGraphRegisterBuffer API with multiple peers
|
|
void testIpcGraphRegisterBuffer()
|
|
{
|
|
if(config.world_rank == 0)
|
|
{
|
|
TEST_INFO("Testing ncclIpcGraphRegisterBuffer API...");
|
|
}
|
|
|
|
// Allocate and initialize test buffer using helper
|
|
void* send_buffer = nullptr;
|
|
void* recv_buffer = nullptr;
|
|
|
|
allocateAndInitBuffers(&send_buffer, &recv_buffer, kLargeBufferSize, kLargeBufferSize);
|
|
|
|
// Pre-register buffers with ncclCommRegister
|
|
void* send_reg_handle = nullptr;
|
|
void* recv_reg_handle = nullptr;
|
|
|
|
preRegisterBuffers(send_buffer,
|
|
recv_buffer,
|
|
kLargeBufferSize,
|
|
kLargeBufferSize,
|
|
&send_reg_handle,
|
|
&recv_reg_handle);
|
|
|
|
// Guard registration handles for automatic cleanup
|
|
NcclRegHandleGuard sendRegGuard(send_reg_handle,
|
|
NcclRegHandleDeleter(getActiveCommunicator()));
|
|
NcclRegHandleGuard recvRegGuard(recv_reg_handle,
|
|
NcclRegHandleDeleter(getActiveCommunicator()));
|
|
|
|
if(config.world_rank == 0)
|
|
{
|
|
TEST_INFO("Pre-registered buffers (size: %zu bytes)", kLargeBufferSize);
|
|
}
|
|
|
|
// Set up peer ranks array for IPC registration
|
|
// In a 2-process setup, each rank registers with the other
|
|
const int peer_rank = (config.world_rank == 0) ? 1 : 0;
|
|
int peer_ranks[1] = {peer_rank};
|
|
const int n_peers = 1;
|
|
|
|
// Call ncclIpcGraphRegisterBuffer for send buffer
|
|
int reg_buf_flag = 0;
|
|
uintptr_t offset = 0;
|
|
uintptr_t* peer_rmt_addrs = nullptr;
|
|
ncclIntruQueue<ncclCommCallback, &ncclCommCallback::next> cleanup_queue{};
|
|
int n_cleanup_queue_elts = 0;
|
|
|
|
ncclResult_t result = ncclIpcGraphRegisterBuffer(
|
|
reinterpret_cast<ncclComm*>(getActiveCommunicator()),
|
|
send_buffer,
|
|
kLargeBufferSize,
|
|
peer_ranks,
|
|
n_peers,
|
|
NCCL_IPC_SENDRECV, // Registration type for send/recv operations
|
|
®_buf_flag,
|
|
&offset,
|
|
&peer_rmt_addrs,
|
|
reinterpret_cast<void*>(&cleanup_queue),
|
|
&n_cleanup_queue_elts);
|
|
|
|
ASSERT_EQ(ncclSuccess, result) << "Rank " << config.world_rank
|
|
<< ": ncclIpcGraphRegisterBuffer failed for send buffer: "
|
|
<< ncclGetErrorString(result);
|
|
|
|
if(config.world_rank == 0)
|
|
{
|
|
TEST_INFO("ncclIpcGraphRegisterBuffer completed successfully");
|
|
TEST_INFO(" Registration flag: %d", reg_buf_flag);
|
|
TEST_INFO(" Buffer offset: %lu", offset);
|
|
TEST_INFO(" Number of peers: %d", n_peers);
|
|
TEST_INFO(" Cleanup queue elements: %d", n_cleanup_queue_elts);
|
|
TEST_INFO(" Remote addresses pointer: %p", static_cast<void*>(peer_rmt_addrs));
|
|
}
|
|
|
|
// Synchronize all ranks after registration
|
|
MPI_Barrier(MPI_COMM_WORLD);
|
|
|
|
// Perform communication to verify IPC registration worked correctly
|
|
// This validates that the proxy registration set up the mappings properly
|
|
const size_t count = kLargeBufferSize / sizeof(float);
|
|
|
|
auto nccl_result = ncclGroupStart();
|
|
ASSERT_EQ(ncclSuccess, nccl_result)
|
|
<< "Rank " << config.world_rank << ": ncclGroupStart failed";
|
|
|
|
nccl_result = ncclSend(send_buffer,
|
|
count,
|
|
ncclFloat,
|
|
peer_rank,
|
|
getActiveCommunicator(),
|
|
getActiveStream());
|
|
ASSERT_EQ(ncclSuccess, nccl_result) << "Rank " << config.world_rank << ": ncclSend failed";
|
|
|
|
nccl_result = ncclRecv(recv_buffer,
|
|
count,
|
|
ncclFloat,
|
|
peer_rank,
|
|
getActiveCommunicator(),
|
|
getActiveStream());
|
|
ASSERT_EQ(ncclSuccess, nccl_result) << "Rank " << config.world_rank << ": ncclRecv failed";
|
|
|
|
nccl_result = ncclGroupEnd();
|
|
ASSERT_EQ(ncclSuccess, nccl_result)
|
|
<< "Rank " << config.world_rank << ": ncclGroupEnd failed";
|
|
|
|
// Ensure both ranks have completed their NCCL operations before synchronizing
|
|
MPI_Barrier(MPI_COMM_WORLD);
|
|
|
|
// Synchronize stream
|
|
ASSERT_EQ(hipSuccess, hipStreamSynchronize(getActiveStream()))
|
|
<< "Rank " << config.world_rank << ": Stream sync failed after IPC communication";
|
|
|
|
if(config.world_rank == 0)
|
|
{
|
|
TEST_INFO("Communication with IPC-registered buffer completed");
|
|
}
|
|
|
|
// Verify received data
|
|
size_t error_idx;
|
|
float expected_val, actual_val;
|
|
bool data_correct = verifyBufferData<float>(
|
|
recv_buffer,
|
|
count,
|
|
[peer_rank](size_t i) { return static_cast<float>(peer_rank * 1000 + i); },
|
|
10,
|
|
1e-5,
|
|
&error_idx,
|
|
&expected_val,
|
|
&actual_val);
|
|
|
|
EXPECT_TRUE(data_correct)
|
|
<< "Rank " << config.world_rank
|
|
<< ": IPC graph registered buffer data verification failed at index " << error_idx
|
|
<< ": expected " << expected_val << ", got " << actual_val;
|
|
|
|
if(data_correct && config.world_rank == 0)
|
|
{
|
|
TEST_INFO("IPC graph buffer data verification passed");
|
|
}
|
|
|
|
if(config.world_rank == 0)
|
|
{
|
|
TEST_INFO("ncclIpcGraphRegisterBuffer test completed successfully");
|
|
}
|
|
}
|
|
};
|
|
|
|
TEST_F(P2pMPITest, P2pWorkflow)
|
|
{
|
|
ASSERT_TRUE(validateTestPrerequisites(kMinProcessesForMPI,
|
|
kNoProcessLimit,
|
|
kRequirePowerOfTwo,
|
|
1,
|
|
kRequireSingleNode))
|
|
<< "Test requirements not met - all ranks must meet requirements";
|
|
|
|
// Setup P2P-specific buffers BEFORE creating communicator
|
|
setupP2PBuffers();
|
|
|
|
// Create test-specific communicator for isolation
|
|
// Use ASSERT_MPI_SUCCESS to prevent deadlock if creation fails on some ranks
|
|
ASSERT_MPI_SUCCESS(createTestCommunicator());
|
|
|
|
if(config.world_rank == 0)
|
|
{
|
|
TEST_INFO("Starting comprehensive P2P transport workflow test with %d processes", config.world_size);
|
|
TEST_INFO("This test exercises the low-level P2P transport API");
|
|
}
|
|
|
|
// Test 1: P2P Capability Detection
|
|
if(config.world_rank == 0)
|
|
{
|
|
TEST_INFO("Step 1: Testing P2P canConnect capability");
|
|
}
|
|
testP2pCanConnect();
|
|
|
|
// Test 2: P2P Setup
|
|
if(config.world_rank == 0)
|
|
{
|
|
TEST_INFO("Step 2: Setting up P2P transport connectors");
|
|
}
|
|
testP2pSetup();
|
|
|
|
// Test 3: P2P Connection
|
|
if(config.world_rank == 0)
|
|
{
|
|
TEST_INFO("Step 3: Connecting P2P transport");
|
|
}
|
|
testP2pConnect();
|
|
|
|
// Test 4: Data Transfer through P2P
|
|
if(config.world_rank == 0)
|
|
{
|
|
TEST_INFO("Step 4: Performing P2P data transfer");
|
|
}
|
|
testP2pDataTransfer();
|
|
|
|
// Test 5: Resource Cleanup
|
|
if(config.world_rank == 0)
|
|
{
|
|
TEST_INFO("Step 5: Validating resource cleanup");
|
|
}
|
|
testP2pCleanup();
|
|
|
|
if(config.world_rank == 0)
|
|
{
|
|
TEST_INFO("P2P transport workflow test completed successfully");
|
|
TEST_INFO("NOTE: Base class TearDown() handles connector cleanup automatically");
|
|
}
|
|
}
|
|
|
|
TEST_F(P2pMPITest, P2pWithMemcpyTest)
|
|
{
|
|
// Test validation and resource allocation
|
|
|
|
ASSERT_TRUE(validateTestPrerequisites(kMinProcessesForMPI,
|
|
kNoProcessLimit,
|
|
kNoPowerOfTwoRequired,
|
|
1,
|
|
kRequireSingleNode))
|
|
<< "Test requirements not met - all ranks must meet requirements";
|
|
|
|
// Allocate P2P resources
|
|
setupP2PBuffers();
|
|
|
|
// Create test-specific communicator for isolation
|
|
ASSERT_EQ(ncclSuccess, createTestCommunicator());
|
|
|
|
if(config.world_rank == 0)
|
|
{
|
|
TEST_INFO("Starting proxy connect/progress test with memcpy enabled (%d processes)",
|
|
config.world_size);
|
|
}
|
|
|
|
// This test specifically exercises proxyConnect and proxyProgress when
|
|
// useMemcpy is enabled by setting the NCCL_P2P_USE_CUDA_MEMCPY environment
|
|
// variable
|
|
testProxyConnectProgressWithMemcpy();
|
|
|
|
if(config.world_rank == 0)
|
|
{
|
|
TEST_INFO("Proxy connect/progress memcpy test completed successfully");
|
|
}
|
|
}
|
|
|
|
TEST_F(P2pMPITest, P2pSendRecvRegistrationTest)
|
|
{
|
|
// Test validation and resource allocation
|
|
ASSERT_TRUE(validateTestPrerequisites(kMinProcessesForMPI,
|
|
kNoProcessLimit,
|
|
kRequirePowerOfTwo,
|
|
1,
|
|
kRequireSingleNode))
|
|
<< "Test requirements not met - all ranks must meet requirements";
|
|
|
|
// Allocate P2P resources
|
|
setupP2PBuffers();
|
|
|
|
// TODO: Enable this test once IPC buffer registration feature works as
|
|
// expected
|
|
if(config.world_rank == 0)
|
|
{
|
|
TEST_INFO("Skipping P2P Send/Recv with IPC registration test");
|
|
TEST_INFO(
|
|
"This test will be enabled once IPC buffer registration feature works as expected");
|
|
}
|
|
GTEST_SKIP() << "Test disabled - enable once IPC buffer registration feature "
|
|
"works as expected";
|
|
|
|
if(config.world_rank == 0)
|
|
{
|
|
TEST_INFO("Starting P2P Send/Recv with IPC registration test (%d processes)",
|
|
config.world_size);
|
|
}
|
|
|
|
// This test performs Send/Recv operations which internally trigger
|
|
// ncclRegisterP2pIpcBuffer from sendrecv_reg.cc
|
|
testP2PSendRecvRegistration();
|
|
|
|
if(config.world_rank == 0)
|
|
{
|
|
TEST_INFO("P2P Send/Recv with IPC registration test completed successfully");
|
|
}
|
|
}
|
|
|
|
TEST_F(P2pMPITest, P2pRegistrationBasicBuffersTest)
|
|
{
|
|
// Test validation and resource allocation
|
|
ASSERT_TRUE(validateTestPrerequisites(kMinProcessesForMPI,
|
|
kNoProcessLimit,
|
|
kNoPowerOfTwoRequired,
|
|
1,
|
|
kRequireSingleNode))
|
|
<< "Test requirements not met - all ranks must meet requirements";
|
|
|
|
// Allocate P2P resources
|
|
setupP2PBuffers();
|
|
|
|
// Create test-specific communicator for isolation (solves shared memory issue)
|
|
ASSERT_EQ(ncclSuccess, createTestCommunicator());
|
|
|
|
if(config.world_rank == 0)
|
|
{
|
|
TEST_INFO("Starting basic P2P IPC buffer registration test (%d processes)",
|
|
config.world_size);
|
|
}
|
|
|
|
testP2PRegistrationBasicBuffers();
|
|
|
|
if(config.world_rank == 0)
|
|
{
|
|
TEST_INFO("Basic P2P IPC buffer registration test completed successfully");
|
|
}
|
|
}
|
|
|
|
TEST_F(P2pMPITest, P2pIpcBufferRegistration_NullBufferPointer)
|
|
{
|
|
// Test validation and resource allocation
|
|
ASSERT_TRUE(validateTestPrerequisites(kMinProcessesForMPI,
|
|
kNoProcessLimit,
|
|
kNoPowerOfTwoRequired,
|
|
1,
|
|
kRequireSingleNode))
|
|
<< "Test requirements not met - all ranks must meet requirements";
|
|
|
|
// Allocate P2P resources
|
|
setupP2PBuffers();
|
|
ASSERT_EQ(ncclSuccess, createTestCommunicator());
|
|
|
|
if(config.world_rank == 0)
|
|
{
|
|
TEST_INFO("Testing ncclRegisterP2pIpcBuffer with null buffer pointer (%d processes)",
|
|
config.world_size);
|
|
}
|
|
|
|
auto* comm = reinterpret_cast<ncclComm*>(getActiveCommunicator());
|
|
const int peer_rank = (config.world_rank + 1) % config.world_size;
|
|
ncclIntruQueue<ncclCommCallback, &ncclCommCallback::next> cleanup_queue{};
|
|
|
|
int ipc_reg_flag = 0;
|
|
void* ipc_reg_addr = nullptr;
|
|
|
|
// Note: Cannot pre-register null buffer, so this tests the null pointer handling directly
|
|
ncclResult_t result = ncclRegisterP2pIpcBuffer(comm,
|
|
nullptr,
|
|
1024,
|
|
peer_rank,
|
|
&ipc_reg_flag,
|
|
&ipc_reg_addr,
|
|
&cleanup_queue);
|
|
|
|
// Expected behavior: Should handle gracefully (likely return error or skip registration)
|
|
if(config.world_rank == 0)
|
|
{
|
|
TEST_INFO("Null buffer test - Result: %s (regFlag=%d)",
|
|
ncclGetErrorString(result),
|
|
ipc_reg_flag);
|
|
}
|
|
|
|
// Validate that null buffer doesn't crash and flag is appropriately set
|
|
EXPECT_NE(result, ncclInternalError)
|
|
<< "Rank " << config.world_rank << ": API should handle null buffer gracefully";
|
|
EXPECT_EQ(0, ipc_reg_flag) << "Rank " << config.world_rank
|
|
<< ": Registration flag should be 0 for null buffer";
|
|
}
|
|
|
|
TEST_F(P2pMPITest, P2pIpcBufferRegistration_ZeroSize)
|
|
{
|
|
// Test validation and resource allocation
|
|
ASSERT_TRUE(validateTestPrerequisites(kMinProcessesForMPI,
|
|
kNoProcessLimit,
|
|
kNoPowerOfTwoRequired,
|
|
1,
|
|
kRequireSingleNode))
|
|
<< "Test requirements not met - all ranks must meet requirements";
|
|
|
|
// Allocate P2P resources
|
|
setupP2PBuffers();
|
|
ASSERT_EQ(ncclSuccess, createTestCommunicator());
|
|
|
|
if(config.world_rank == 0)
|
|
{
|
|
TEST_INFO("Testing ncclRegisterP2pIpcBuffer with zero size buffer (%d processes)",
|
|
config.world_size);
|
|
}
|
|
|
|
auto* comm = reinterpret_cast<ncclComm*>(getActiveCommunicator());
|
|
const int peer_rank = (config.world_rank + 1) % config.world_size;
|
|
ncclIntruQueue<ncclCommCallback, &ncclCommCallback::next> cleanup_queue{};
|
|
|
|
void* buffer = nullptr;
|
|
HIP_TEST_CHECK_GTEST_FAIL(hipMalloc(&buffer, 1024));
|
|
auto bufferGuard = makeDeviceBufferAutoGuard(buffer); // GPU memory
|
|
|
|
// Pre-register buffer with actual size (1024)
|
|
void* reg_handle = nullptr;
|
|
ASSERT_EQ(ncclSuccess, ncclCommRegister(getActiveCommunicator(), buffer, 1024, ®_handle))
|
|
<< "Rank " << config.world_rank << ": Failed to pre-register buffer";
|
|
NcclRegHandleGuard regGuard(reg_handle, NcclRegHandleDeleter(getActiveCommunicator()));
|
|
|
|
int ipc_reg_flag = 0;
|
|
void* ipc_reg_addr = nullptr;
|
|
|
|
// Test with zero size (buffer is registered but size is 0)
|
|
ncclResult_t result = ncclRegisterP2pIpcBuffer(comm,
|
|
buffer,
|
|
0,
|
|
peer_rank,
|
|
&ipc_reg_flag,
|
|
&ipc_reg_addr,
|
|
&cleanup_queue);
|
|
|
|
if(config.world_rank == 0)
|
|
{
|
|
TEST_INFO("Zero size buffer test - Result: %s (regFlag=%d)",
|
|
ncclGetErrorString(result),
|
|
ipc_reg_flag);
|
|
}
|
|
|
|
// Validate that zero size is handled appropriately (should not succeed in registration)
|
|
EXPECT_NE(result, ncclInternalError)
|
|
<< "Rank " << config.world_rank << ": API should handle zero size gracefully";
|
|
EXPECT_EQ(0, ipc_reg_flag) << "Rank " << config.world_rank
|
|
<< ": Registration flag should be 0 for zero size buffer";
|
|
|
|
if(reg_handle)
|
|
{
|
|
ASSERT_EQ(ncclSuccess, ncclCommDeregister(getActiveCommunicator(), reg_handle))
|
|
<< "Rank " << config.world_rank << ": Failed to deregister buffer";
|
|
}
|
|
}
|
|
|
|
TEST_F(P2pMPITest, P2pIpcBufferRegistration_VerySmallBuffer)
|
|
{
|
|
// Test validation and resource allocation
|
|
ASSERT_TRUE(validateTestPrerequisites(kMinProcessesForMPI,
|
|
kNoProcessLimit,
|
|
kNoPowerOfTwoRequired,
|
|
1,
|
|
kRequireSingleNode))
|
|
<< "Test requirements not met - all ranks must meet requirements";
|
|
|
|
// Allocate P2P resources
|
|
setupP2PBuffers();
|
|
ASSERT_EQ(ncclSuccess, createTestCommunicator());
|
|
|
|
if(config.world_rank == 0)
|
|
{
|
|
TEST_INFO(
|
|
"Testing ncclRegisterP2pIpcBuffer with very small buffer (64 bytes) (%d processes)",
|
|
config.world_size);
|
|
}
|
|
|
|
auto* comm = reinterpret_cast<ncclComm*>(getActiveCommunicator());
|
|
const int peer_rank = (config.world_rank + 1) % config.world_size;
|
|
ncclIntruQueue<ncclCommCallback, &ncclCommCallback::next> cleanup_queue{};
|
|
|
|
void* buffer = nullptr;
|
|
const size_t small_size = 64;
|
|
HIP_TEST_CHECK_GTEST_FAIL(hipMalloc(&buffer, small_size));
|
|
auto bufferGuard = makeDeviceBufferAutoGuard(buffer); // GPU memory
|
|
|
|
// Pre-register buffer
|
|
void* reg_handle = nullptr;
|
|
ASSERT_EQ(ncclSuccess,
|
|
ncclCommRegister(getActiveCommunicator(), buffer, small_size, ®_handle))
|
|
<< "Rank " << config.world_rank << ": Failed to pre-register buffer";
|
|
NcclRegHandleGuard regGuard(reg_handle, NcclRegHandleDeleter(getActiveCommunicator()));
|
|
|
|
int ipc_reg_flag = 0;
|
|
void* ipc_reg_addr = nullptr;
|
|
|
|
ncclResult_t result = ncclRegisterP2pIpcBuffer(comm,
|
|
buffer,
|
|
small_size,
|
|
peer_rank,
|
|
&ipc_reg_flag,
|
|
&ipc_reg_addr,
|
|
&cleanup_queue);
|
|
|
|
if(config.world_rank == 0)
|
|
{
|
|
TEST_INFO("Small buffer (64B) test - Result: %s (regFlag=%d)",
|
|
ncclGetErrorString(result),
|
|
ipc_reg_flag);
|
|
}
|
|
|
|
// Validate that small buffer registration succeeds
|
|
ASSERT_EQ(ncclSuccess, result)
|
|
<< "Rank " << config.world_rank << ": Small buffer registration should succeed";
|
|
// Registration flag may be set depending on whether IPC is available
|
|
EXPECT_GE(ipc_reg_flag, 0) << "Rank " << config.world_rank
|
|
<< ": Registration flag should be non-negative";
|
|
|
|
if(reg_handle)
|
|
{
|
|
ASSERT_EQ(ncclSuccess, ncclCommDeregister(getActiveCommunicator(), reg_handle))
|
|
<< "Rank " << config.world_rank << ": Failed to deregister buffer";
|
|
}
|
|
}
|
|
|
|
TEST_F(P2pMPITest, P2pIpcBufferRegistration_LargeBuffer)
|
|
{
|
|
// Test validation and resource allocation
|
|
ASSERT_TRUE(validateTestPrerequisites(kMinProcessesForMPI,
|
|
kNoProcessLimit,
|
|
kNoPowerOfTwoRequired,
|
|
1,
|
|
kRequireSingleNode))
|
|
<< "Test requirements not met - all ranks must meet requirements";
|
|
|
|
// Allocate P2P resources
|
|
setupP2PBuffers();
|
|
ASSERT_EQ(ncclSuccess, createTestCommunicator());
|
|
|
|
if(config.world_rank == 0)
|
|
{
|
|
TEST_INFO("Testing ncclRegisterP2pIpcBuffer with large buffer (256 MB) (%d processes)",
|
|
config.world_size);
|
|
}
|
|
|
|
auto* comm = reinterpret_cast<ncclComm*>(getActiveCommunicator());
|
|
const int peer_rank = (config.world_rank + 1) % config.world_size;
|
|
ncclIntruQueue<ncclCommCallback, &ncclCommCallback::next> cleanup_queue{};
|
|
|
|
void* buffer = nullptr;
|
|
const size_t large_size = 256 * 1024 * 1024; // 256 MB
|
|
hipError_t hip_result = hipMalloc(&buffer, large_size);
|
|
|
|
if(hip_result == hipSuccess)
|
|
{
|
|
auto bufferGuard = makeDeviceBufferAutoGuard(buffer); // GPU memory
|
|
|
|
// Pre-register buffer
|
|
void* reg_handle = nullptr;
|
|
ASSERT_EQ(ncclSuccess,
|
|
ncclCommRegister(getActiveCommunicator(), buffer, large_size, ®_handle))
|
|
<< "Rank " << config.world_rank << ": Failed to pre-register large buffer";
|
|
NcclRegHandleGuard regGuard(reg_handle, NcclRegHandleDeleter(getActiveCommunicator()));
|
|
|
|
int ipc_reg_flag = 0;
|
|
void* ipc_reg_addr = nullptr;
|
|
|
|
ncclResult_t result = ncclRegisterP2pIpcBuffer(comm,
|
|
buffer,
|
|
large_size,
|
|
peer_rank,
|
|
&ipc_reg_flag,
|
|
&ipc_reg_addr,
|
|
&cleanup_queue);
|
|
|
|
if(config.world_rank == 0)
|
|
{
|
|
TEST_INFO("Large buffer (256MB) test - Result: %s (regFlag=%d)",
|
|
ncclGetErrorString(result),
|
|
ipc_reg_flag);
|
|
}
|
|
|
|
// Validate that large buffer registration succeeds (since allocation succeeded)
|
|
ASSERT_EQ(ncclSuccess, result)
|
|
<< "Rank " << config.world_rank << ": Large buffer registration should succeed";
|
|
EXPECT_GE(ipc_reg_flag, 0)
|
|
<< "Rank " << config.world_rank << ": Registration flag should be non-negative";
|
|
|
|
if(reg_handle)
|
|
{
|
|
ASSERT_EQ(ncclSuccess, ncclCommDeregister(getActiveCommunicator(), reg_handle))
|
|
<< "Rank " << config.world_rank << ": Failed to deregister large buffer";
|
|
}
|
|
}
|
|
else
|
|
{
|
|
if(config.world_rank == 0)
|
|
{
|
|
TEST_INFO("Large buffer (256MB) test - Skipped (allocation failed: %s)",
|
|
hipGetErrorString(hip_result));
|
|
}
|
|
GTEST_SKIP() << "Large buffer allocation failed";
|
|
}
|
|
}
|
|
|
|
TEST_F(P2pMPITest, P2pIpcBufferRegistration_InvalidPeerRank)
|
|
{
|
|
// Test validation and resource allocation
|
|
ASSERT_TRUE(validateTestPrerequisites(kMinProcessesForMPI,
|
|
kNoProcessLimit,
|
|
kNoPowerOfTwoRequired,
|
|
1,
|
|
kRequireSingleNode))
|
|
<< "Test requirements not met - all ranks must meet requirements";
|
|
|
|
// Allocate P2P resources
|
|
setupP2PBuffers();
|
|
ASSERT_EQ(ncclSuccess, createTestCommunicator());
|
|
|
|
if(config.world_rank == 0)
|
|
{
|
|
TEST_INFO("Testing ncclRegisterP2pIpcBuffer with boundary peer rank (%d processes)",
|
|
config.world_size);
|
|
TEST_INFO(
|
|
"NOTE: Testing with last valid peer rank (world_size - 1) instead of invalid rank");
|
|
TEST_INFO(" Out-of-bounds peer ranks cause segfault - implementation should validate "
|
|
"inputs");
|
|
}
|
|
|
|
auto* comm = reinterpret_cast<ncclComm*>(getActiveCommunicator());
|
|
ncclIntruQueue<ncclCommCallback, &ncclCommCallback::next> cleanup_queue{};
|
|
|
|
void* buffer = nullptr;
|
|
HIP_TEST_CHECK_GTEST_FAIL(hipMalloc(&buffer, 1024));
|
|
auto bufferGuard = makeDeviceBufferAutoGuard(buffer); // GPU memory
|
|
|
|
// Pre-register buffer
|
|
void* reg_handle = nullptr;
|
|
ASSERT_EQ(ncclSuccess, ncclCommRegister(getActiveCommunicator(), buffer, 1024, ®_handle))
|
|
<< "Rank " << config.world_rank << ": Failed to pre-register buffer";
|
|
NcclRegHandleGuard regGuard(reg_handle, NcclRegHandleDeleter(getActiveCommunicator()));
|
|
|
|
int ipc_reg_flag = 0;
|
|
void* ipc_reg_addr = nullptr;
|
|
// Use last valid peer rank instead of out-of-bounds to avoid segfault
|
|
const int boundary_peer = config.world_size - 1;
|
|
|
|
ncclResult_t result = ncclRegisterP2pIpcBuffer(comm,
|
|
buffer,
|
|
1024,
|
|
boundary_peer,
|
|
&ipc_reg_flag,
|
|
&ipc_reg_addr,
|
|
&cleanup_queue);
|
|
|
|
if(config.world_rank == 0)
|
|
{
|
|
TEST_INFO("Boundary peer rank (%d) test - Result: %s (regFlag=%d)",
|
|
boundary_peer,
|
|
ncclGetErrorString(result),
|
|
ipc_reg_flag);
|
|
}
|
|
|
|
// Validate that boundary peer rank is handled correctly
|
|
ASSERT_EQ(ncclSuccess, result)
|
|
<< "Rank " << config.world_rank << ": Boundary peer rank should succeed";
|
|
EXPECT_GE(ipc_reg_flag, 0) << "Rank " << config.world_rank
|
|
<< ": Registration flag should be non-negative";
|
|
|
|
if(reg_handle)
|
|
{
|
|
ASSERT_EQ(ncclSuccess, ncclCommDeregister(getActiveCommunicator(), reg_handle))
|
|
<< "Rank " << config.world_rank << ": Failed to deregister buffer";
|
|
}
|
|
}
|
|
|
|
TEST_F(P2pMPITest, P2pIpcBufferRegistration_NegativePeerRank)
|
|
{
|
|
// Test validation and resource allocation
|
|
ASSERT_TRUE(validateTestPrerequisites(kMinProcessesForMPI,
|
|
kNoProcessLimit,
|
|
kNoPowerOfTwoRequired,
|
|
1,
|
|
kRequireSingleNode))
|
|
<< "Test requirements not met - all ranks must meet requirements";
|
|
|
|
// Allocate P2P resources
|
|
setupP2PBuffers();
|
|
ASSERT_EQ(ncclSuccess, createTestCommunicator());
|
|
|
|
if(config.world_rank == 0)
|
|
{
|
|
TEST_INFO("Testing ncclRegisterP2pIpcBuffer with peer rank 0 (%d processes)",
|
|
config.world_size);
|
|
TEST_INFO("NOTE: Testing with peer rank 0 instead of negative rank");
|
|
TEST_INFO(
|
|
" Negative peer ranks cause segfault - implementation should validate inputs");
|
|
}
|
|
|
|
auto* comm = reinterpret_cast<ncclComm*>(getActiveCommunicator());
|
|
ncclIntruQueue<ncclCommCallback, &ncclCommCallback::next> cleanup_queue{};
|
|
|
|
void* buffer = nullptr;
|
|
HIP_TEST_CHECK_GTEST_FAIL(hipMalloc(&buffer, 1024));
|
|
auto bufferGuard = makeDeviceBufferAutoGuard(buffer); // GPU memory
|
|
|
|
// Pre-register buffer
|
|
void* reg_handle = nullptr;
|
|
ASSERT_EQ(ncclSuccess, ncclCommRegister(getActiveCommunicator(), buffer, 1024, ®_handle))
|
|
<< "Rank " << config.world_rank << ": Failed to pre-register buffer";
|
|
NcclRegHandleGuard regGuard(reg_handle, NcclRegHandleDeleter(getActiveCommunicator()));
|
|
|
|
int ipc_reg_flag = 0;
|
|
void* ipc_reg_addr = nullptr;
|
|
// Use peer rank 0 (valid lower boundary) instead of negative to avoid segfault
|
|
const int lower_boundary_peer = 0;
|
|
|
|
ncclResult_t result = ncclRegisterP2pIpcBuffer(comm,
|
|
buffer,
|
|
1024,
|
|
lower_boundary_peer,
|
|
&ipc_reg_flag,
|
|
&ipc_reg_addr,
|
|
&cleanup_queue);
|
|
|
|
if(config.world_rank == 0)
|
|
{
|
|
TEST_INFO("Lower boundary peer rank (%d) test - Result: %s (regFlag=%d)",
|
|
lower_boundary_peer,
|
|
ncclGetErrorString(result),
|
|
ipc_reg_flag);
|
|
}
|
|
|
|
// Validate that peer rank 0 (lower boundary) is handled correctly
|
|
ASSERT_EQ(ncclSuccess, result)
|
|
<< "Rank " << config.world_rank << ": Lower boundary peer rank should succeed";
|
|
EXPECT_GE(ipc_reg_flag, 0) << "Rank " << config.world_rank
|
|
<< ": Registration flag should be non-negative";
|
|
|
|
if(reg_handle)
|
|
{
|
|
ASSERT_EQ(ncclSuccess, ncclCommDeregister(getActiveCommunicator(), reg_handle))
|
|
<< "Rank " << config.world_rank << ": Failed to deregister buffer";
|
|
}
|
|
}
|
|
|
|
TEST_F(P2pMPITest, P2pIpcBufferRegistration_SameBufferMultipleTimes)
|
|
{
|
|
// Test validation and resource allocation
|
|
ASSERT_TRUE(validateTestPrerequisites(kMinProcessesForMPI,
|
|
kNoProcessLimit,
|
|
kNoPowerOfTwoRequired,
|
|
1,
|
|
kRequireSingleNode))
|
|
<< "Test requirements not met - all ranks must meet requirements";
|
|
|
|
// Allocate P2P resources
|
|
setupP2PBuffers();
|
|
ASSERT_EQ(ncclSuccess, createTestCommunicator());
|
|
|
|
if(config.world_rank == 0)
|
|
{
|
|
TEST_INFO("Testing ncclRegisterP2pIpcBuffer with same buffer multiple times (%d processes)",
|
|
config.world_size);
|
|
}
|
|
|
|
auto* comm = reinterpret_cast<ncclComm*>(getActiveCommunicator());
|
|
const int peer_rank = (config.world_rank + 1) % config.world_size;
|
|
ncclIntruQueue<ncclCommCallback, &ncclCommCallback::next> cleanup_queue{};
|
|
|
|
void* buffer = nullptr;
|
|
HIP_TEST_CHECK_GTEST_FAIL(hipMalloc(&buffer, 4096));
|
|
auto bufferGuard = makeDeviceBufferAutoGuard(buffer); // GPU memory
|
|
|
|
// Pre-register buffer
|
|
void* reg_handle = nullptr;
|
|
ASSERT_EQ(ncclSuccess, ncclCommRegister(getActiveCommunicator(), buffer, 4096, ®_handle))
|
|
<< "Rank " << config.world_rank << ": Failed to pre-register buffer";
|
|
NcclRegHandleGuard regGuard(reg_handle, NcclRegHandleDeleter(getActiveCommunicator()));
|
|
|
|
// First registration
|
|
int ipc_reg_flag_1 = 0;
|
|
void* ipc_reg_addr_1 = nullptr;
|
|
ncclResult_t result1 = ncclRegisterP2pIpcBuffer(comm,
|
|
buffer,
|
|
4096,
|
|
peer_rank,
|
|
&ipc_reg_flag_1,
|
|
&ipc_reg_addr_1,
|
|
&cleanup_queue);
|
|
|
|
if(config.world_rank == 0)
|
|
{
|
|
TEST_INFO("First registration - Result: %s (regFlag=%d)",
|
|
ncclGetErrorString(result1),
|
|
ipc_reg_flag_1);
|
|
}
|
|
|
|
// Second registration of same buffer
|
|
int ipc_reg_flag_2 = 0;
|
|
void* ipc_reg_addr_2 = nullptr;
|
|
ncclResult_t result2 = ncclRegisterP2pIpcBuffer(comm,
|
|
buffer,
|
|
4096,
|
|
peer_rank,
|
|
&ipc_reg_flag_2,
|
|
&ipc_reg_addr_2,
|
|
&cleanup_queue);
|
|
|
|
if(config.world_rank == 0)
|
|
{
|
|
TEST_INFO("Second registration (same buffer) - Result: %s (regFlag=%d)",
|
|
ncclGetErrorString(result2),
|
|
ipc_reg_flag_2);
|
|
}
|
|
|
|
// Validate both registrations - API should handle duplicate registration gracefully
|
|
ASSERT_EQ(ncclSuccess, result1)
|
|
<< "Rank " << config.world_rank << ": First registration should succeed";
|
|
// Second registration may succeed (idempotent) or return success
|
|
EXPECT_NE(result2, ncclInternalError)
|
|
<< "Rank " << config.world_rank << ": Second registration should not cause internal error";
|
|
|
|
if(reg_handle)
|
|
{
|
|
ASSERT_EQ(ncclSuccess, ncclCommDeregister(getActiveCommunicator(), reg_handle))
|
|
<< "Rank " << config.world_rank << ": Failed to deregister buffer";
|
|
}
|
|
}
|
|
|
|
TEST_F(P2pMPITest, P2pIpcBufferRegistration_SelfPeerRank)
|
|
{
|
|
// Test validation and resource allocation
|
|
ASSERT_TRUE(validateTestPrerequisites(kMinProcessesForMPI,
|
|
kNoProcessLimit,
|
|
kNoPowerOfTwoRequired,
|
|
1,
|
|
kRequireSingleNode))
|
|
<< "Test requirements not met - all ranks must meet requirements";
|
|
|
|
// Allocate P2P resources
|
|
setupP2PBuffers();
|
|
ASSERT_EQ(ncclSuccess, createTestCommunicator());
|
|
|
|
if(config.world_rank == 0)
|
|
{
|
|
TEST_INFO("Testing ncclRegisterP2pIpcBuffer with self peer rank (%d processes)",
|
|
config.world_size);
|
|
}
|
|
|
|
auto* comm = reinterpret_cast<ncclComm*>(getActiveCommunicator());
|
|
ncclIntruQueue<ncclCommCallback, &ncclCommCallback::next> cleanup_queue{};
|
|
|
|
void* buffer = nullptr;
|
|
HIP_TEST_CHECK_GTEST_FAIL(hipMalloc(&buffer, 1024));
|
|
auto bufferGuard = makeDeviceBufferAutoGuard(buffer); // GPU memory
|
|
|
|
// Pre-register buffer
|
|
void* reg_handle = nullptr;
|
|
ASSERT_EQ(ncclSuccess, ncclCommRegister(getActiveCommunicator(), buffer, 1024, ®_handle))
|
|
<< "Rank " << config.world_rank << ": Failed to pre-register buffer";
|
|
NcclRegHandleGuard regGuard(reg_handle, NcclRegHandleDeleter(getActiveCommunicator()));
|
|
|
|
int ipc_reg_flag = 0;
|
|
void* ipc_reg_addr = nullptr;
|
|
|
|
ncclResult_t result = ncclRegisterP2pIpcBuffer(comm,
|
|
buffer,
|
|
1024,
|
|
config.world_rank,
|
|
&ipc_reg_flag,
|
|
&ipc_reg_addr,
|
|
&cleanup_queue);
|
|
|
|
if(config.world_rank == 0)
|
|
{
|
|
TEST_INFO("Self peer rank test - Result: %s (regFlag=%d)",
|
|
ncclGetErrorString(result),
|
|
ipc_reg_flag);
|
|
}
|
|
|
|
// Validate self peer rank handling - should handle gracefully
|
|
// Self-registration might be allowed or rejected depending on use case
|
|
EXPECT_NE(result, ncclInternalError)
|
|
<< "Rank " << config.world_rank << ": Self peer rank should be handled gracefully";
|
|
|
|
if(reg_handle)
|
|
{
|
|
ASSERT_EQ(ncclSuccess, ncclCommDeregister(getActiveCommunicator(), reg_handle))
|
|
<< "Rank " << config.world_rank << ": Failed to deregister buffer";
|
|
}
|
|
}
|
|
|
|
TEST_F(P2pMPITest, P2pIpcBufferRegistration_UnalignedBufferAddress)
|
|
{
|
|
// Test validation and resource allocation
|
|
ASSERT_TRUE(validateTestPrerequisites(kMinProcessesForMPI,
|
|
kNoProcessLimit,
|
|
kNoPowerOfTwoRequired,
|
|
1,
|
|
kRequireSingleNode))
|
|
<< "Test requirements not met - all ranks must meet requirements";
|
|
|
|
// Allocate P2P resources
|
|
setupP2PBuffers();
|
|
ASSERT_EQ(ncclSuccess, createTestCommunicator());
|
|
|
|
if(config.world_rank == 0)
|
|
{
|
|
TEST_INFO("Testing ncclRegisterP2pIpcBuffer with unaligned buffer address (%d processes)",
|
|
config.world_size);
|
|
}
|
|
|
|
auto* comm = reinterpret_cast<ncclComm*>(getActiveCommunicator());
|
|
const int peer_rank = (config.world_rank + 1) % config.world_size;
|
|
ncclIntruQueue<ncclCommCallback, &ncclCommCallback::next> cleanup_queue{};
|
|
|
|
void* buffer = nullptr;
|
|
HIP_TEST_CHECK_GTEST_FAIL(hipMalloc(&buffer, 4096));
|
|
auto bufferGuard = makeDeviceBufferAutoGuard(buffer); // GPU memory
|
|
|
|
// Pre-register the aligned buffer first
|
|
void* reg_handle = nullptr;
|
|
ASSERT_EQ(ncclSuccess, ncclCommRegister(getActiveCommunicator(), buffer, 4096, ®_handle))
|
|
<< "Rank " << config.world_rank << ": Failed to pre-register buffer";
|
|
NcclRegHandleGuard regGuard(reg_handle, NcclRegHandleDeleter(getActiveCommunicator()));
|
|
|
|
// Create unaligned pointer (offset by 1 byte)
|
|
void* unaligned_buffer = static_cast<char*>(buffer) + 1;
|
|
|
|
int ipc_reg_flag = 0;
|
|
void* ipc_reg_addr = nullptr;
|
|
|
|
// Test with unaligned pointer (ncclRegFind should still find the registered buffer)
|
|
ncclResult_t result = ncclRegisterP2pIpcBuffer(comm,
|
|
unaligned_buffer,
|
|
1024,
|
|
peer_rank,
|
|
&ipc_reg_flag,
|
|
&ipc_reg_addr,
|
|
&cleanup_queue);
|
|
|
|
if(config.world_rank == 0)
|
|
{
|
|
TEST_INFO("Unaligned buffer test - Result: %s (regFlag=%d)",
|
|
ncclGetErrorString(result),
|
|
ipc_reg_flag);
|
|
}
|
|
|
|
// Validate that ncclRegFind can locate the registered buffer even with unaligned pointer
|
|
ASSERT_EQ(ncclSuccess, result)
|
|
<< "Rank " << config.world_rank << ": Unaligned pointer should find registered buffer";
|
|
EXPECT_GE(ipc_reg_flag, 0) << "Rank " << config.world_rank
|
|
<< ": Registration flag should be non-negative";
|
|
|
|
if(reg_handle)
|
|
{
|
|
ASSERT_EQ(ncclSuccess, ncclCommDeregister(getActiveCommunicator(), reg_handle))
|
|
<< "Rank " << config.world_rank << ": Failed to deregister buffer";
|
|
}
|
|
}
|
|
|
|
TEST_F(P2pMPITest, P2pIpcBufferRegistration_NonPowerOfTwoSize)
|
|
{
|
|
// Test validation and resource allocation
|
|
ASSERT_TRUE(validateTestPrerequisites(kMinProcessesForMPI,
|
|
kNoProcessLimit,
|
|
kNoPowerOfTwoRequired,
|
|
1,
|
|
kRequireSingleNode))
|
|
<< "Test requirements not met - all ranks must meet requirements";
|
|
|
|
// Allocate P2P resources
|
|
setupP2PBuffers();
|
|
ASSERT_EQ(ncclSuccess, createTestCommunicator());
|
|
|
|
if(config.world_rank == 0)
|
|
{
|
|
TEST_INFO("Testing ncclRegisterP2pIpcBuffer with non-power-of-2 buffer size (%d processes)",
|
|
config.world_size);
|
|
}
|
|
|
|
auto* comm = reinterpret_cast<ncclComm*>(getActiveCommunicator());
|
|
const int peer_rank = (config.world_rank + 1) % config.world_size;
|
|
ncclIntruQueue<ncclCommCallback, &ncclCommCallback::next> cleanup_queue{};
|
|
|
|
void* buffer = nullptr;
|
|
const size_t odd_size = 12345;
|
|
HIP_TEST_CHECK_GTEST_FAIL(hipMalloc(&buffer, odd_size));
|
|
auto bufferGuard = makeDeviceBufferAutoGuard(buffer); // GPU memory
|
|
|
|
// Pre-register buffer
|
|
void* reg_handle = nullptr;
|
|
ASSERT_EQ(ncclSuccess, ncclCommRegister(getActiveCommunicator(), buffer, odd_size, ®_handle))
|
|
<< "Rank " << config.world_rank << ": Failed to pre-register buffer";
|
|
NcclRegHandleGuard regGuard(reg_handle, NcclRegHandleDeleter(getActiveCommunicator()));
|
|
|
|
int ipc_reg_flag = 0;
|
|
void* ipc_reg_addr = nullptr;
|
|
|
|
ncclResult_t result = ncclRegisterP2pIpcBuffer(comm,
|
|
buffer,
|
|
odd_size,
|
|
peer_rank,
|
|
&ipc_reg_flag,
|
|
&ipc_reg_addr,
|
|
&cleanup_queue);
|
|
|
|
if(config.world_rank == 0)
|
|
{
|
|
TEST_INFO("Non-power-of-2 size (12345 bytes) test - Result: %s (regFlag=%d)",
|
|
ncclGetErrorString(result),
|
|
ipc_reg_flag);
|
|
}
|
|
|
|
// Validate that non-power-of-2 sizes are supported
|
|
ASSERT_EQ(ncclSuccess, result)
|
|
<< "Rank " << config.world_rank << ": Non-power-of-2 size should be supported";
|
|
EXPECT_GE(ipc_reg_flag, 0) << "Rank " << config.world_rank
|
|
<< ": Registration flag should be non-negative";
|
|
|
|
if(reg_handle)
|
|
{
|
|
ASSERT_EQ(ncclSuccess, ncclCommDeregister(getActiveCommunicator(), reg_handle))
|
|
<< "Rank " << config.world_rank << ": Failed to deregister buffer";
|
|
}
|
|
}
|
|
|
|
TEST_F(P2pMPITest, IpcGraphRegisterBufferTest)
|
|
{
|
|
// Test validation and resource allocation
|
|
ASSERT_TRUE(validateTestPrerequisites(kMinProcessesForMPI,
|
|
kNoProcessLimit,
|
|
kNoPowerOfTwoRequired,
|
|
1,
|
|
kRequireSingleNode))
|
|
<< "Test requirements not met - all ranks must meet requirements";
|
|
|
|
// Allocate P2P resources
|
|
setupP2PBuffers();
|
|
ASSERT_EQ(ncclSuccess, createTestCommunicator());
|
|
|
|
// TODO: Enable this test once IPC buffer registration feature works as
|
|
// expected
|
|
if(config.world_rank == 0)
|
|
{
|
|
TEST_INFO("Skipping P2P Send/Recv with IPC registration test");
|
|
TEST_INFO(
|
|
"This test will be enabled once IPC buffer registration feature works as expected");
|
|
}
|
|
GTEST_SKIP() << "Test disabled - enable once IPC buffer registration feature "
|
|
"works as expected";
|
|
|
|
if(config.world_rank == 0)
|
|
{
|
|
TEST_INFO("Starting ncclIpcGraphRegisterBuffer test (%d processes)", config.world_size);
|
|
}
|
|
|
|
testIpcGraphRegisterBuffer();
|
|
|
|
if(config.world_rank == 0)
|
|
{
|
|
TEST_INFO("ncclIpcGraphRegisterBuffer test completed successfully");
|
|
}
|
|
}
|
|
|
|
#endif // MPI_TESTS_ENABLED
|
|
|