Files
Atul Kulkarni 142860442a Enable MPI support to execute MPI specific unit/functional tests (#1996)
* Added MPI support to execute unit/functional tests

Update node and process validation
Updated node detection count and modified validation method
Update validation logic to include max procs and nodes

* Address review comments

* Fix warnings

* Added a new NET transport test and clean up

* Added MPI test logging mechanism

* Decoupled GTest framework

* Added Net IB functional tests

* Updated with resource guards

* Added NET IB tests and refactored code

* Update P2pWorkflow test

* Update documentation

* Add MPI_TESTS_ENABLED guard to the file

* Fix Shm and NetIB tests

* Applied refactoring and cleanup

* Replaced BufferGuard with AutoGuard

* Modified test debug logging

* Use macro to reduce NcclTypeTraits code duplication

- Replace repetitive template specializations with a single
  DEFINE_NCCL_TYPE_TRAIT macro
- Use stringification operator (#) to auto-generate type name strings
- Add #undef to keep macro from polluting namespace
- Makes adding new type mappings trivial

* Unify buffer initialization with generic pattern function

- Remove initializeBufferWithCustomPattern
- Make initializeBufferWithPattern generic with PatternFunc template param
- Now single function handles all patterns via lambda injection
- Updated all test files to use lambdas for pattern generation
- Pattern logic now visible at call site (self-documenting)

* Unify buffer verification with pluggable pattern function

- Remove verifyBufferWithCustomCheck
- Make verifyBufferData generic with PatternFunc template param
- Single function handles all verification patterns via lambda injection
- Updated all test files to use lambdas
- Better defaults: num_samples=0 means verify all elements
- Pattern logic now visible at call site (self-documenting)

* Docs: Add DeviceBufferHelpers section to MPITestRunner.md

- Document new refactored buffer initialization/verification API
- Explain pluggable pattern functions with lambda examples
- Show type mapping and automatic float/int comparison
- Include migration guide from old API to new unified functions
- Demonstrate best practices with real-world examples
- Reference recent refactoring commits (macro-based type traits)

* Docs: Update documentation and examples

- Update on DeviceBufferHelpers
- Update examples using DeviceBufferHelpers methods, e.g. data verification

* Address review comment.

- Replace manual pattern generation loop with initializeBufferWithPattern call
- Use downloadBuffer to get host copy instead of manual hipMemcpy

* Remove non-existent dependency

* Remove duplicate testcase

* Code cleanup in test files

* Moved common constants to base class

[ROCm/rccl commit: 29e1567b95]
2025-12-06 16:05:37 -06:00

1307 líneas
46 KiB
C++

/*************************************************************************
* Copyright (c) 2025 Advanced Micro Devices, Inc. All rights reserved.
*
* See LICENSE.txt for license information
************************************************************************/
#include <gtest/gtest.h>
#include <hip/hip_runtime.h>
#include "MPITestBase.hpp"
#include "ResourceGuards.hpp"
#include "TestChecks.hpp"
#include "DeviceBufferHelpers.hpp"
#include "nccl.h"
#include "net.h"
#include <vector>
#include <memory>
#include <cstring>
#include <algorithm>
#ifdef MPI_TESTS_ENABLED
// Import helper namespaces
using namespace RCCLTestGuards;
using namespace RCCLTestHelpers;
// External NET IB plugin
extern ncclNet_t ncclNetIb;
// NET IB-specific resource deleters
struct NetMHandleDeleter {
ncclNet_t* net;
void* comm;
NetMHandleDeleter(ncclNet_t* n = nullptr, void* c = nullptr) : net(n), comm(c) {}
void operator()(void* mhandle) const {
if (mhandle && net && comm) {
int rank = MPIEnvironment::world_rank;
TEST_INFO("Rank %d: NetMHandleDeleter - Deregistering memory handle (mhandle=%p, comm=%p)",
rank, mhandle, comm);
ncclResult_t result = net->deregMr(comm, mhandle);
TEST_INFO("Rank %d: NetMHandleDeleter - deregMr result: %d", rank, result);
}
}
};
// NET IB connection guard
class NetConnectionGuard {
private:
ncclNet_t* net_;
void* sendComm_;
void* recvComm_;
void* listenComm_;
public:
explicit NetConnectionGuard(ncclNet_t* net)
: net_(net), sendComm_(nullptr), recvComm_(nullptr), listenComm_(nullptr) {}
~NetConnectionGuard() {
if (sendComm_ && net_) {
net_->closeSend(sendComm_);
}
if (recvComm_ && net_) {
net_->closeRecv(recvComm_);
}
if (listenComm_ && net_) {
net_->closeListen(listenComm_);
}
}
void setSendComm(void* comm) { sendComm_ = comm; }
void setRecvComm(void* comm) { recvComm_ = comm; }
void setListenComm(void* comm) { listenComm_ = comm; }
NetConnectionGuard(const NetConnectionGuard&) = delete;
NetConnectionGuard& operator=(const NetConnectionGuard&) = delete;
};
// Type alias for NetMHandleGuard using ResourceGuard
using NetMHandleGuard = RCCLTestGuards::ResourceGuard<void*, NetMHandleDeleter>;
// Test fixture for NET IB tests
class NetIbMPITest : public MPITestBase {
protected:
static constexpr int kMinProcessesForMPI = 2;
static constexpr bool kRequirePowerOfTwo = true;
static constexpr int kNoNodeLimit = MPITestConstants::kNoNodeLimit;
// Buffer pattern constants
static constexpr int kBytePatternModulo = 256;
// Timing constants
static constexpr int kDefaultTimeoutMs = 5000;
static constexpr int kLargeTransferTimeoutMs = 30000;
static constexpr int kPollIntervalUs = 10000; // 10ms
static constexpr int kPollIntervalMs = 10;
static constexpr int kMaxRetryAttempts = 1000; // For NULL request handling
// Buffer size constants
static constexpr size_t kSmallBufferSize = 4096;
static constexpr size_t kLargeBufferSize = 16 * 1024 * 1024; // 16 MB
// Test seed constants
static constexpr int kBaseSeedOffset = 1000;
static constexpr int kMultiSizeSeedOffset = 2000;
// Debug output constants
static constexpr int kNumDebugSamples = 4;
// Invalid device ID offset for negative tests
static constexpr int kInvalidDeviceOffset = 100;
// Process count constants
static constexpr int kExactTwoProcesses = 2;
static constexpr int kMinGpusPerNode = 1;
// Transfer test constants
static constexpr int kNumSequentialTransfers = 100;
static constexpr int kTransferTagBase = 300;
// Timeout constants
static constexpr int kLargeTransferTimeout = 30000;
ncclNet_t* net_;
int numDevices_;
std::vector<int> deviceIds_;
void SetUp() override {
MPITestBase::SetUp();
net_ = &ncclNetIb;
numDevices_ = 0;
}
void TearDown() override {
MPITestBase::TearDown();
}
// Helper: Initialize NET IB plugin
ncclResult_t InitNetIb() {
return net_->init(nullptr, nullptr);
}
// Helper: Get number of devices
ncclResult_t GetDeviceCount(int* ndev) {
return net_->devices(ndev);
}
// Helper: Get device properties
ncclResult_t GetDeviceProperties(int dev, ncclNetProperties_t* props) {
return net_->getProperties(dev, props);
}
// Helper: Create listen comm
ncclResult_t CreateListenComm(int dev, ncclNetHandle_t* handle, void** listenComm) {
return net_->listen(dev, handle, listenComm);
}
// Helper: Connect to remote
ncclResult_t ConnectToRemote(int dev, ncclNetHandle_t* handle, void** sendComm) {
return net_->connect(dev, nullptr, handle, sendComm, nullptr);
}
// Helper: Accept connection
ncclResult_t AcceptConnection(void* listenComm, void** recvComm) {
return net_->accept(listenComm, recvComm, nullptr);
}
// Helper: Register memory
ncclResult_t RegisterMemory(void* comm, void* data, size_t size, int type, void** mhandle) {
return net_->regMr(comm, data, size, type, mhandle);
}
// Helper: Register DMA-BUF memory
ncclResult_t RegisterDmaBufMemory(void* comm, void* data, size_t size, int type,
uint64_t offset, int fd, void** mhandle) {
return net_->regMrDmaBuf(comm, data, size, type, offset, fd, mhandle);
}
// Helper: Deregister memory
ncclResult_t DeregisterMemory(void* comm, void* mhandle) {
return net_->deregMr(comm, mhandle);
}
// Helper: Post send operation
ncclResult_t PostSend(void* sendComm, void* data, size_t size, int tag,
void* mhandle, void** request) {
return net_->isend(sendComm, data, size, tag, mhandle, nullptr, request);
}
// Helper: Post recv operation
ncclResult_t PostRecv(void* recvComm, int n, void** data, size_t* sizes,
int* tags, void** mhandles, void** request) {
return net_->irecv(recvComm, n, data, sizes, tags, mhandles, nullptr, request);
}
// Helper: Flush operation
ncclResult_t FlushRecv(void* recvComm, int n, void** data, int* sizes,
void** mhandles, void** request) {
return net_->iflush(recvComm, n, data, sizes, mhandles, request);
}
// Helper: Test request completion
// No implementation for this method in the NET IB plugin
ncclResult_t TestRequest(void* request, int* done, int* sizes) {
return net_->test(request, done, sizes);
}
// Helper: Close send comm
ncclResult_t CloseSendComm(void* sendComm) {
return net_->closeSend(sendComm);
}
// Helper: Close recv comm
ncclResult_t CloseRecvComm(void* recvComm) {
return net_->closeRecv(recvComm);
}
// Helper: Close listen comm
ncclResult_t CloseListenComm(void* listenComm) {
return net_->closeListen(listenComm);
}
// Helper: Make virtual device
ncclResult_t MakeVirtualDevice(int* dev, ncclNetVDeviceProps_t* props) {
return net_->makeVDevice(dev, props);
}
// Helper: Setup connection between two ranks
struct ConnectionPair {
void* sendComm = nullptr;
void* recvComm = nullptr;
void* listenComm = nullptr;
ncclNetHandle_t handle;
};
ncclResult_t SetupConnection(int dev, ConnectionPair& pair, int rank, int peerRank) {
if (rank == 0) {
// Rank 0: Listen
RCCL_TEST_CHECK(CreateListenComm(dev, &pair.handle, &pair.listenComm));
// Send handle to peer
MPI_Send(&pair.handle, sizeof(ncclNetHandle_t), MPI_BYTE, peerRank, 0, MPI_COMM_WORLD);
// Accept connection
int done = 0;
while (!done) {
ncclResult_t result = AcceptConnection(pair.listenComm, &pair.recvComm);
if (result == ncclSuccess && pair.recvComm != nullptr) {
done = 1;
}
}
} else {
// Rank 1: Connect
MPI_Recv(&pair.handle, sizeof(ncclNetHandle_t), MPI_BYTE, peerRank, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
// Connect to peer
int done = 0;
while (!done) {
ncclResult_t result = ConnectToRemote(dev, &pair.handle, &pair.sendComm);
if (result == ncclSuccess && pair.sendComm != nullptr) {
done = 1;
}
}
}
MPI_Barrier(MPI_COMM_WORLD);
return ncclSuccess;
}
// Helper: Initialize device buffer with pattern using DeviceBufferHelpers
hipError_t InitializeBuffer(void* buffer, size_t size, int pattern) {
// Use template-based helper with custom pattern: (pattern + i) % kBytePatternModulo
return initializeBufferWithPattern<uint8_t>(
buffer, size,
[pattern](size_t i) { return static_cast<uint8_t>((pattern + i) % kBytePatternModulo); }
);
}
// Helper: Verify device buffer pattern using DeviceBufferHelpers
bool VerifyBuffer(void* buffer, size_t size, int pattern) {
// Use template-based helper with pattern verification
return verifyBufferData<uint8_t>(
buffer, size,
[pattern](size_t i) {
return static_cast<uint8_t>((pattern + i) % kBytePatternModulo);
}
);
}
// Helper: Wait for request completion with timeout
ncclResult_t WaitForCompletion(void* request, int* sizes, int timeoutMs = kDefaultTimeoutMs) {
int done = 0;
int attempts = 0;
const int maxAttempts = timeoutMs / kPollIntervalMs;
while (!done && attempts < maxAttempts) {
ncclResult_t result = TestRequest(request, &done, sizes);
if (result != ncclSuccess) {
return result;
}
if (done) {
break;
} else {
usleep(kPollIntervalUs); // 10ms
attempts++;
}
}
return done ? ncclSuccess : ncclInternalError;
}
};
// Initialization Tests
TEST_F(NetIbMPITest, InitializePlugin) {
ASSERT_TRUE(validateTestPrerequisites(kMinProcessesForMPI, MPITestConstants::kNoProcessLimit,
kRequirePowerOfTwo, 1, kNoNodeLimit))
<< "Test requirements not met";
ncclResult_t result = InitNetIb();
EXPECT_EQ(result, ncclSuccess) << "Failed to initialize NET IB plugin";
}
TEST_F(NetIbMPITest, GetDeviceCount) {
ASSERT_TRUE(validateTestPrerequisites(kMinProcessesForMPI, MPITestConstants::kNoProcessLimit,
kRequirePowerOfTwo, 1, kNoNodeLimit))
<< "Test requirements not met";
ASSERT_EQ(InitNetIb(), ncclSuccess);
int ndev = 0;
EXPECT_EQ(GetDeviceCount(&ndev), ncclSuccess);
EXPECT_GT(ndev, 0) << "No IB devices found";
if (MPIEnvironment::world_rank == 0) {
TEST_INFO("Found %d IB device(s)", ndev);
}
}
// Device Properties Tests
TEST_F(NetIbMPITest, GetDeviceProperties) {
ASSERT_TRUE(validateTestPrerequisites(kMinProcessesForMPI, MPITestConstants::kNoProcessLimit,
kRequirePowerOfTwo, 1, kNoNodeLimit))
<< "Test requirements not met";
ASSERT_EQ(InitNetIb(), ncclSuccess);
int ndev = 0;
ASSERT_EQ(GetDeviceCount(&ndev), ncclSuccess);
ASSERT_GT(ndev, 0);
for (int i = 0; i < ndev; i++) {
ncclNetProperties_t props;
memset(&props, 0, sizeof(props));
EXPECT_EQ(GetDeviceProperties(i, &props), ncclSuccess)
<< "Failed to get properties for device " << i;
// Verify properties are valid
EXPECT_NE(props.name, nullptr) << "Device " << i << " has NULL name";
EXPECT_GT(props.speed, 0) << "Device " << i << " has invalid speed";
EXPECT_NE(props.pciPath, nullptr) << "Device " << i << " has NULL pciPath";
if (MPIEnvironment::world_rank == 0) {
TEST_INFO("Device %d: name=%s speed=%d pciPath=%s",
i, props.name, props.speed, props.pciPath);
}
}
}
TEST_F(NetIbMPITest, GetDevicePropertiesInvalidDevice) {
ASSERT_TRUE(validateTestPrerequisites(kMinProcessesForMPI, MPITestConstants::kNoProcessLimit,
kRequirePowerOfTwo, 1, kNoNodeLimit))
<< "Test requirements not met";
ASSERT_EQ(InitNetIb(), ncclSuccess);
int ndev = 0;
ASSERT_EQ(GetDeviceCount(&ndev), ncclSuccess);
ncclNetProperties_t props;
// Invalid device ID (too large)
ncclResult_t result = GetDeviceProperties(ndev + kInvalidDeviceOffset, &props);
EXPECT_NE(result, ncclSuccess) << "Should fail for invalid device ID";
}
// Connection Setup Tests
TEST_F(NetIbMPITest, ListenAndConnect) {
ASSERT_TRUE(validateTestPrerequisites(kExactTwoProcesses, kExactTwoProcesses,
false, kMinGpusPerNode, kNoNodeLimit))
<< "Test requires exactly " << kExactTwoProcesses << " processes";
ASSERT_EQ(InitNetIb(), ncclSuccess);
int ndev = 0;
ASSERT_EQ(GetDeviceCount(&ndev), ncclSuccess);
ASSERT_GT(ndev, 0);
ConnectionPair pair;
int rank = MPIEnvironment::world_rank;
int peerRank = (rank + 1) % 2;
ASSERT_EQ(SetupConnection(0, pair, rank, peerRank), ncclSuccess);
// Guard connections for automatic cleanup
NetConnectionGuard connGuard(net_);
if (rank == 0) {
connGuard.setRecvComm(pair.recvComm);
connGuard.setListenComm(pair.listenComm);
} else {
connGuard.setSendComm(pair.sendComm);
}
if (rank == 0) {
EXPECT_NE(pair.recvComm, nullptr) << "Recv comm should be established";
EXPECT_NE(pair.listenComm, nullptr) << "Listen comm should exist";
} else {
EXPECT_NE(pair.sendComm, nullptr) << "Send comm should be established";
}
}
TEST_F(NetIbMPITest, ConnectWithInvalidHandle) {
ASSERT_TRUE(validateTestPrerequisites(kMinProcessesForMPI, MPITestConstants::kNoProcessLimit,
kRequirePowerOfTwo, 1, kNoNodeLimit))
<< "Test requirements not met";
ASSERT_EQ(InitNetIb(), ncclSuccess);
int ndev = 0;
ASSERT_EQ(GetDeviceCount(&ndev), ncclSuccess);
ASSERT_GT(ndev, 0);
ncclNetHandle_t invalidHandle;
memset(&invalidHandle, 0xFF, sizeof(invalidHandle));
void* sendComm = nullptr;
// Negative test: Connect with garbage handle
ncclResult_t result = ConnectToRemote(0, &invalidHandle, &sendComm);
EXPECT_EQ(result, ncclInternalError) << "Should fail with invalid handle";
}
// Memory Registration Tests
TEST_F(NetIbMPITest, RegisterHostMemory) {
ASSERT_TRUE(validateTestPrerequisites(kExactTwoProcesses, kExactTwoProcesses,
false, kMinGpusPerNode, kNoNodeLimit))
<< "Test requires exactly " << kExactTwoProcesses << " processes";
ASSERT_EQ(InitNetIb(), ncclSuccess);
int ndev = 0;
ASSERT_EQ(GetDeviceCount(&ndev), ncclSuccess);
ASSERT_GT(ndev, 0);
ConnectionPair pair;
int rank = MPIEnvironment::world_rank;
int peerRank = (rank + 1) % 2;
ASSERT_EQ(SetupConnection(0, pair, rank, peerRank), ncclSuccess);
// Guard connections for automatic cleanup
NetConnectionGuard connGuard(net_);
if (rank == 0) {
connGuard.setRecvComm(pair.recvComm);
connGuard.setListenComm(pair.listenComm);
} else {
connGuard.setSendComm(pair.sendComm);
}
const size_t bufferSize = kSmallBufferSize;
void* buffer = malloc(bufferSize);
ASSERT_NE(buffer, nullptr);
auto bufferGuard = makeHostBufferAutoGuard(buffer);
void* mhandle = nullptr;
void* comm = (rank == 0) ? pair.recvComm : pair.sendComm;
EXPECT_EQ(RegisterMemory(comm, buffer, bufferSize, NCCL_PTR_HOST, &mhandle), ncclSuccess);
EXPECT_NE(mhandle, nullptr);
// Use NetMHandleGuard for automatic deregistration before connection closes
NetMHandleGuard mhandleGuard(mhandle, NetMHandleDeleter(net_, comm));
}
TEST_F(NetIbMPITest, RegisterGpuMemory) {
ASSERT_TRUE(validateTestPrerequisites(kExactTwoProcesses, kExactTwoProcesses,
false, kMinGpusPerNode, kNoNodeLimit))
<< "Test requires exactly " << kExactTwoProcesses << " processes";
ASSERT_EQ(InitNetIb(), ncclSuccess);
int ndev = 0;
ASSERT_EQ(GetDeviceCount(&ndev), ncclSuccess);
ASSERT_GT(ndev, 0);
ConnectionPair pair;
int rank = MPIEnvironment::world_rank;
int peerRank = (rank + 1) % 2;
ASSERT_EQ(SetupConnection(0, pair, rank, peerRank), ncclSuccess);
// Guard connections for automatic cleanup
NetConnectionGuard connGuard(net_);
if (rank == 0) {
connGuard.setRecvComm(pair.recvComm);
connGuard.setListenComm(pair.listenComm);
} else {
connGuard.setSendComm(pair.sendComm);
}
const size_t bufferSize = kSmallBufferSize;
void* buffer = nullptr;
HIP_TEST_CHECK_GTEST_FAIL(hipMalloc(&buffer, bufferSize));
auto bufferGuard = makeDeviceBufferAutoGuard(buffer);
void* mhandle = nullptr;
void* comm = (rank == 0) ? pair.recvComm : pair.sendComm;
EXPECT_EQ(RegisterMemory(comm, buffer, bufferSize, NCCL_PTR_CUDA, &mhandle), ncclSuccess);
EXPECT_NE(mhandle, nullptr);
// Use NetMHandleGuard for automatic deregistration before connection closes
NetMHandleGuard mhandleGuard(mhandle, NetMHandleDeleter(net_, comm));
}
TEST_F(NetIbMPITest, RegisterMemoryNullPointer) {
ASSERT_TRUE(validateTestPrerequisites(kExactTwoProcesses, kExactTwoProcesses,
false, kMinGpusPerNode, kNoNodeLimit))
<< "Test requires exactly " << kExactTwoProcesses << " processes";
ASSERT_EQ(InitNetIb(), ncclSuccess);
int ndev = 0;
ASSERT_EQ(GetDeviceCount(&ndev), ncclSuccess);
ASSERT_GT(ndev, 0);
ConnectionPair pair;
int rank = MPIEnvironment::world_rank;
int peerRank = (rank + 1) % 2;
ASSERT_EQ(SetupConnection(0, pair, rank, peerRank), ncclSuccess);
// Guard connections for automatic cleanup
NetConnectionGuard connGuard(net_);
if (rank == 0) {
connGuard.setRecvComm(pair.recvComm);
connGuard.setListenComm(pair.listenComm);
} else {
connGuard.setSendComm(pair.sendComm);
}
void* mhandle = nullptr;
void* comm = (rank == 0) ? pair.recvComm : pair.sendComm;
// Negative test: NULL buffer pointer
ncclResult_t result = RegisterMemory(comm, nullptr, 4096, NCCL_PTR_HOST, &mhandle);
EXPECT_NE(result, ncclSuccess) << "Should fail with NULL buffer";
}
TEST_F(NetIbMPITest, DeregisterNullHandle) {
ASSERT_TRUE(validateTestPrerequisites(kExactTwoProcesses, kExactTwoProcesses,
false, kMinGpusPerNode, kNoNodeLimit))
<< "Test requires exactly " << kExactTwoProcesses << " processes";
ASSERT_EQ(InitNetIb(), ncclSuccess);
int ndev = 0;
ASSERT_EQ(GetDeviceCount(&ndev), ncclSuccess);
ASSERT_GT(ndev, 0);
ConnectionPair pair;
int rank = MPIEnvironment::world_rank;
int peerRank = (rank + 1) % 2;
ASSERT_EQ(SetupConnection(0, pair, rank, peerRank), ncclSuccess);
// Guard connections for automatic cleanup
NetConnectionGuard connGuard(net_);
if (rank == 0) {
connGuard.setRecvComm(pair.recvComm);
connGuard.setListenComm(pair.listenComm);
} else {
connGuard.setSendComm(pair.sendComm);
}
void* comm = (rank == 0) ? pair.recvComm : pair.sendComm;
// Edge case: Deregister NULL handle (should be no-op)
EXPECT_EQ(DeregisterMemory(comm, nullptr), ncclSuccess);
}
// Send/Recv Tests
TEST_F(NetIbMPITest, SimpleSendRecv) {
ASSERT_TRUE(validateTestPrerequisites(kExactTwoProcesses, kExactTwoProcesses,
false, kMinGpusPerNode, kNoNodeLimit))
<< "Test requires exactly " << kExactTwoProcesses << " processes";
ASSERT_EQ(InitNetIb(), ncclSuccess);
int ndev = 0;
ASSERT_EQ(GetDeviceCount(&ndev), ncclSuccess);
ASSERT_GT(ndev, 0);
ConnectionPair pair;
int rank = MPIEnvironment::world_rank;
int peerRank = (rank + 1) % 2;
ASSERT_EQ(SetupConnection(0, pair, rank, peerRank), ncclSuccess);
// Guard connections for automatic cleanup
NetConnectionGuard connGuard(net_);
if (rank == 0) {
connGuard.setRecvComm(pair.recvComm);
connGuard.setListenComm(pair.listenComm);
} else {
connGuard.setSendComm(pair.sendComm);
}
const size_t bufferSize = kSmallBufferSize;
const int tag = 42;
void* buffer = malloc(bufferSize);
ASSERT_NE(buffer, nullptr);
auto bufferGuard = makeHostBufferAutoGuard(buffer);
void* mhandle = nullptr;
void* comm = (rank == 0) ? pair.recvComm : pair.sendComm;
ASSERT_EQ(RegisterMemory(comm, buffer, bufferSize, NCCL_PTR_HOST, &mhandle), ncclSuccess);
// Use NetMHandleGuard for automatic cleanup on failure (exception safety)
NetMHandleGuard mhandleGuard(mhandle, NetMHandleDeleter(net_, comm));
void* request = nullptr;
if (rank == 0) {
// Receiver
void* recvBuffers[1] = {buffer};
size_t recvSizes[1] = {bufferSize};
int recvTags[1] = {tag};
void* recvHandles[1] = {mhandle};
ASSERT_EQ(PostRecv(pair.recvComm, 1, recvBuffers, recvSizes, recvTags,
recvHandles, &request), ncclSuccess);
} else {
// Sender - initialize host buffer directly
uint8_t* hostBuffer = static_cast<uint8_t*>(buffer);
for (size_t i = 0; i < bufferSize; i++) {
hostBuffer[i] = static_cast<uint8_t>((rank + i) % kBytePatternModulo);
}
ASSERT_EQ(PostSend(pair.sendComm, buffer, bufferSize, tag, mhandle, &request), ncclSuccess);
}
MPI_Barrier(MPI_COMM_WORLD);
// Wait for completion
int sizes[1] = {0};
ASSERT_EQ(WaitForCompletion(request, sizes), ncclSuccess);
if (rank == 0) {
EXPECT_EQ(sizes[0], bufferSize) << "Received size mismatch";
// Verify received data
uint8_t* hostBuffer = static_cast<uint8_t*>(buffer);
bool dataValid = true;
int senderRank = 1; // Data was sent by rank 1
for (size_t i = 0; i < bufferSize && dataValid; i++) {
uint8_t expected = static_cast<uint8_t>((senderRank + i) % kBytePatternModulo);
if (hostBuffer[i] != expected) {
dataValid = false;
}
}
EXPECT_TRUE(dataValid) << "Data validation failed";
}
// NetMHandleGuard will automatically deregister memory when test scope ends
// Destructor order ensures MR is deregistered before connection closes:
// 1. mhandleGuard destructor (deregisters MR)
// 2. bufferGuard destructor (frees buffer)
// 3. connGuard destructor (closes connection)
}
TEST_F(NetIbMPITest, SendRecvMultipleSizes) {
ASSERT_TRUE(validateTestPrerequisites(kExactTwoProcesses, kExactTwoProcesses,
false, kMinGpusPerNode, kNoNodeLimit))
<< "Test requires exactly " << kExactTwoProcesses << " processes";
int rank = MPIEnvironment::world_rank;
ASSERT_EQ(InitNetIb(), ncclSuccess);
int ndev = 0;
ASSERT_EQ(GetDeviceCount(&ndev), ncclSuccess);
ASSERT_GT(ndev, 0);
ConnectionPair pair;
int peerRank = (rank + 1) % 2;
ASSERT_EQ(SetupConnection(0, pair, rank, peerRank), ncclSuccess);
// Guard connections for automatic cleanup
NetConnectionGuard connGuard(net_);
if (rank == 0) {
connGuard.setRecvComm(pair.recvComm);
connGuard.setListenComm(pair.listenComm);
} else {
connGuard.setSendComm(pair.sendComm);
}
// Test various sizes
std::vector<size_t> testSizes = {1, 64, 256, 1024, 4096, 16384, 65536};
for (size_t size : testSizes) {
const int tag = 100;
const int seed = 2000 + static_cast<int>(size); // Unique seed per size
void* buffer = malloc(size);
ASSERT_NE(buffer, nullptr);
auto bufferGuard = makeHostBufferAutoGuard(buffer); // Local guard for loop iteration
void* mhandle = nullptr;
void* comm = (rank == 0) ? pair.recvComm : pair.sendComm;
ASSERT_EQ(RegisterMemory(comm, buffer, size, NCCL_PTR_HOST, &mhandle), ncclSuccess);
NetMHandleGuard mhandleGuard(mhandle, NetMHandleDeleter(net_, comm));
void* request = nullptr;
if (rank == 0) {
memset(buffer, 0, size);
void* recvBuffers[1] = {buffer};
size_t recvSizes[1] = {size};
int recvTags[1] = {tag};
void* recvHandles[1] = {mhandle};
ASSERT_EQ(PostRecv(pair.recvComm, 1, recvBuffers, recvSizes, recvTags,
recvHandles, &request), ncclSuccess);
ASSERT_NE(request, nullptr) << "Recv request should never be NULL";
} else {
// Initialize host buffer directly (not using InitializeBuffer which expects device memory)
uint8_t* hostBuffer = static_cast<uint8_t*>(buffer);
for (size_t i = 0; i < size; i++) {
hostBuffer[i] = static_cast<uint8_t>((seed + i) % kBytePatternModulo);
}
// NET IB isend can return success with NULL request if FIFO isn't ready
// This means the receiver hasn't posted recv yet - retry until ready
int attempts = 0;
do {
ncclResult_t result = PostSend(pair.sendComm, buffer, size, tag, mhandle, &request);
ASSERT_EQ(result, ncclSuccess);
if (request != nullptr) {
break;
}
// NULL request means "not ready yet", wait and retry
if (++attempts >= kMaxRetryAttempts) {
FAIL() << "PostSend returned NULL request after " << kMaxRetryAttempts << " attempts";
}
usleep(kPollIntervalUs);
} while (request == nullptr);
}
// Barrier 1: Ensure both ranks have posted their operations before waiting
MPI_Barrier(MPI_COMM_WORLD);
// Wait for completion
int sizes[1] = {0};
ASSERT_EQ(WaitForCompletion(request, sizes), ncclSuccess);
// Barrier 2: CRITICAL - Ensure BOTH ranks have completed before EITHER continues
// This prevents rank A from starting next transfer while rank B is still
// completing current transfer, which would cause request object reuse race conditions
MPI_Barrier(MPI_COMM_WORLD);
if (rank == 0) {
EXPECT_EQ(sizes[0], size) << "Size mismatch for transfer of " << size << " bytes";
// Validate received data matches expected pattern (host buffer verification)
uint8_t* hostBuffer = static_cast<uint8_t*>(buffer);
bool dataValid = true;
for (size_t j = 0; j < size && dataValid; j++) {
uint8_t expected = static_cast<uint8_t>((seed + j) % kBytePatternModulo);
if (hostBuffer[j] != expected) {
dataValid = false;
}
}
EXPECT_TRUE(dataValid) << "Data validation failed for size " << size;
}
// NetMHandleGuard will automatically deregister at end of loop iteration
}
}
TEST_F(NetIbMPITest, SendRecvZeroSize) {
ASSERT_TRUE(validateTestPrerequisites(kExactTwoProcesses, kExactTwoProcesses,
false, kMinGpusPerNode, kNoNodeLimit))
<< "Test requires exactly " << kExactTwoProcesses << " processes";
ASSERT_EQ(InitNetIb(), ncclSuccess);
int ndev = 0;
ASSERT_EQ(GetDeviceCount(&ndev), ncclSuccess);
ASSERT_GT(ndev, 0);
ConnectionPair pair;
int rank = MPIEnvironment::world_rank;
int peerRank = (rank + 1) % 2;
ASSERT_EQ(SetupConnection(0, pair, rank, peerRank), ncclSuccess);
// Guard connections for automatic cleanup
NetConnectionGuard connGuard(net_);
if (rank == 0) {
connGuard.setRecvComm(pair.recvComm);
connGuard.setListenComm(pair.listenComm);
} else {
connGuard.setSendComm(pair.sendComm);
}
const size_t bufferSize = kSmallBufferSize;
const int tag = 50;
void* buffer = malloc(bufferSize);
ASSERT_NE(buffer, nullptr);
auto bufferGuard = makeHostBufferAutoGuard(buffer);
void* mhandle = nullptr;
void* comm = (rank == 0) ? pair.recvComm : pair.sendComm;
ASSERT_EQ(RegisterMemory(comm, buffer, bufferSize, NCCL_PTR_HOST, &mhandle), ncclSuccess);
NetMHandleGuard mhandleGuard(mhandle, NetMHandleDeleter(net_, comm));
void* request = nullptr;
if (rank == 0) {
// Receiver - expect zero bytes
void* recvBuffers[1] = {buffer};
size_t recvSizes[1] = {bufferSize};
int recvTags[1] = {tag};
void* recvHandles[1] = {mhandle};
ASSERT_EQ(PostRecv(pair.recvComm, 1, recvBuffers, recvSizes, recvTags,
recvHandles, &request), ncclSuccess);
} else {
// Sender - send zero bytes
ASSERT_EQ(PostSend(pair.sendComm, buffer, 0, tag, mhandle, &request), ncclSuccess);
}
MPI_Barrier(MPI_COMM_WORLD);
// Wait for completion
int sizes[1] = {-1};
ASSERT_EQ(WaitForCompletion(request, sizes), ncclSuccess);
if (rank == 0) {
EXPECT_EQ(sizes[0], 0) << "Should receive zero bytes";
}
}
// Flush Tests
TEST_F(NetIbMPITest, FlushAfterRecv) {
ASSERT_TRUE(validateTestPrerequisites(kExactTwoProcesses, kExactTwoProcesses,
false, kMinGpusPerNode, kNoNodeLimit))
<< "Test requires exactly " << kExactTwoProcesses << " processes";
ASSERT_EQ(InitNetIb(), ncclSuccess);
int ndev = 0;
ASSERT_EQ(GetDeviceCount(&ndev), ncclSuccess);
ASSERT_GT(ndev, 0);
// Check if GDR is available
ncclNetProperties_t props;
ASSERT_EQ(GetDeviceProperties(0, &props), ncclSuccess);
if (!(props.ptrSupport & NCCL_PTR_CUDA)) {
GTEST_SKIP() << "GDR not supported, skipping flush test";
}
ConnectionPair pair;
int rank = MPIEnvironment::world_rank;
int peerRank = (rank + 1) % 2;
ASSERT_EQ(SetupConnection(0, pair, rank, peerRank), ncclSuccess);
// Guard connections for automatic cleanup
NetConnectionGuard connGuard(net_);
if (rank == 0) {
connGuard.setRecvComm(pair.recvComm);
connGuard.setListenComm(pair.listenComm);
} else {
connGuard.setSendComm(pair.sendComm);
}
const size_t bufferSize = kSmallBufferSize;
const int tag = 200;
void* buffer = nullptr;
HIP_TEST_CHECK_GTEST_FAIL(hipMalloc(&buffer, bufferSize));
auto bufferGuard = makeDeviceBufferAutoGuard(buffer); // false = device memory
void* mhandle = nullptr;
void* comm = (rank == 0) ? pair.recvComm : pair.sendComm;
ASSERT_EQ(RegisterMemory(comm, buffer, bufferSize, NCCL_PTR_CUDA, &mhandle), ncclSuccess);
NetMHandleGuard mhandleGuard(mhandle, NetMHandleDeleter(net_, comm));
void* request = nullptr;
if (rank == 0) {
// Receiver
void* recvBuffers[1] = {buffer};
size_t recvSizes[1] = {bufferSize};
int recvTags[1] = {tag};
void* recvHandles[1] = {mhandle};
ASSERT_EQ(PostRecv(pair.recvComm, 1, recvBuffers, recvSizes, recvTags,
recvHandles, &request), ncclSuccess);
} else {
// Sender
void* hostBuffer = malloc(bufferSize);
ASSERT_NE(hostBuffer, nullptr);
auto hostBufferGuard = makeHostBufferAutoGuard(hostBuffer);
// Initialize host buffer directly (not using InitializeBuffer which expects device memory)
uint8_t* hostBuf = static_cast<uint8_t*>(hostBuffer);
for (size_t i = 0; i < bufferSize; i++) {
hostBuf[i] = static_cast<uint8_t>((rank + i) % kBytePatternModulo);
}
HIP_TEST_CHECK_GTEST_FAIL(hipMemcpy(buffer, hostBuffer, bufferSize, hipMemcpyHostToDevice));
ASSERT_EQ(PostSend(pair.sendComm, buffer, bufferSize, tag, mhandle, &request), ncclSuccess);
}
MPI_Barrier(MPI_COMM_WORLD);
// Wait for completion
int sizes[1] = {0};
ASSERT_EQ(WaitForCompletion(request, sizes), ncclSuccess);
if (rank == 0) {
// Issue flush
void* flushBuffers[1] = {buffer};
int flushSizes[1] = {static_cast<int>(bufferSize)};
void* flushHandles[1] = {mhandle};
void* flushRequest = nullptr;
ncclResult_t result = FlushRecv(pair.recvComm, 1, flushBuffers, flushSizes,
flushHandles, &flushRequest);
if (result == ncclSuccess && flushRequest != nullptr) {
int flushDone = 0;
ASSERT_EQ(WaitForCompletion(flushRequest, nullptr), ncclSuccess);
}
}
// NetMHandleGuard will automatically deregister at scope end
}
// Virtual Device Tests
TEST_F(NetIbMPITest, MakeVirtualDevice) {
ASSERT_TRUE(validateTestPrerequisites(kMinProcessesForMPI, MPITestConstants::kNoProcessLimit,
kRequirePowerOfTwo, 1, kNoNodeLimit))
<< "Test requirements not met";
ASSERT_EQ(InitNetIb(), ncclSuccess);
int ndev = 0;
ASSERT_EQ(GetDeviceCount(&ndev), ncclSuccess);
ASSERT_GT(ndev, 0);
if (ndev < 2) {
GTEST_SKIP() << "Need at least 2 devices for virtual device test";
}
ncclNetVDeviceProps_t vProps;
vProps.ndevs = 2;
vProps.devs[0] = 0;
vProps.devs[1] = 1;
int vdev = -1;
ncclResult_t result = MakeVirtualDevice(&vdev, &vProps);
// Virtual device creation may or may not be supported
if (result == ncclSuccess) {
EXPECT_GE(vdev, 0) << "Virtual device ID should be non-negative";
if (MPIEnvironment::world_rank == 0) {
TEST_INFO("Created virtual device %d from physical devices 0 and 1", vdev);
}
}
}
TEST_F(NetIbMPITest, MakeVirtualDeviceInvalidProps) {
ASSERT_TRUE(validateTestPrerequisites(kMinProcessesForMPI, MPITestConstants::kNoProcessLimit,
kRequirePowerOfTwo, 1, kNoNodeLimit))
<< "Test requirements not met";
ASSERT_EQ(InitNetIb(), ncclSuccess);
int ndev = 0;
ASSERT_EQ(GetDeviceCount(&ndev), ncclSuccess);
ASSERT_GT(ndev, 0);
// Negative test: Zero devices
ncclNetVDeviceProps_t vProps;
vProps.ndevs = 0;
int vdev = -1;
ncclResult_t result = MakeVirtualDevice(&vdev, &vProps);
EXPECT_EQ(result, ncclInvalidUsage) << "Should fail with zero devices";
}
// Stress and Edge Case Tests
// Tests multiple sequential transfers on the same connection using host memory.
// This test validates that request objects can be properly reused across multiple
// send/recv operations without resource exhaustion or state corruption.
//
// SYNCHRONIZATION STRATEGY:
// Two barriers per iteration ensure proper ordering:
// 1. After Post: Ensures both ranks have posted before either waits
// 2. After Completion: CRITICAL - Ensures BOTH ranks complete before EITHER
// starts next iteration. Without this, rapid request reuse causes races.
//
// NULL REQUEST HANDLING:
// NET IB isend() can return ncclSuccess with NULL request when the FIFO
// isn't ready yet (receiver's irecv RDMA write hasn't reached sender yet).
// This is NOT an error - it means "try again". The sender must retry until
// it gets a valid request pointer. This is normal NET IB protocol behavior.
//
// NOTE: Flush (iflush) is intentionally NOT called because:
// 1. Flush is only needed for GPU Direct RDMA to ensure data visibility
// 2. For NCCL_PTR_HOST transfers, flush is unnecessary
TEST_F(NetIbMPITest, MultipleSequentialTransfers) {
ASSERT_TRUE(validateTestPrerequisites(kExactTwoProcesses, kExactTwoProcesses,
false, kMinGpusPerNode, kNoNodeLimit))
<< "Test requires exactly " << kExactTwoProcesses << " processes";
int rank = MPIEnvironment::world_rank;
ASSERT_EQ(InitNetIb(), ncclSuccess);
int ndev = 0;
ASSERT_EQ(GetDeviceCount(&ndev), ncclSuccess);
ASSERT_GT(ndev, 0);
ConnectionPair pair;
int peerRank = (rank + 1) % 2;
ASSERT_EQ(SetupConnection(0, pair, rank, peerRank), ncclSuccess);
// Guard connections for automatic cleanup
NetConnectionGuard connGuard(net_);
if (rank == 0) {
connGuard.setRecvComm(pair.recvComm);
connGuard.setListenComm(pair.listenComm);
} else {
connGuard.setSendComm(pair.sendComm);
}
const size_t bufferSize = kSmallBufferSize;
const int numTransfers = kNumSequentialTransfers;
void* sendBuffer = nullptr;
void* recvBuffer = nullptr;
HostBufferAutoGuard sendBufferGuard(nullptr);
HostBufferAutoGuard recvBufferGuard(nullptr);
if (rank == 0) {
recvBuffer = malloc(bufferSize);
ASSERT_NE(recvBuffer, nullptr);
recvBufferGuard = makeHostBufferAutoGuard(recvBuffer);
} else {
sendBuffer = malloc(bufferSize);
ASSERT_NE(sendBuffer, nullptr);
sendBufferGuard = makeHostBufferAutoGuard(sendBuffer);
}
void* mhandle = nullptr;
void* buffer = (rank == 0) ? recvBuffer : sendBuffer;
void* comm = (rank == 0) ? pair.recvComm : pair.sendComm;
ASSERT_EQ(RegisterMemory(comm, buffer, bufferSize, NCCL_PTR_HOST, &mhandle), ncclSuccess);
NetMHandleGuard mhandleGuard(mhandle, NetMHandleDeleter(net_, comm));
for (int i = 0; i < numTransfers; i++) {
const int tag = kTransferTagBase + i;
const int seed = kBaseSeedOffset + i; // Unique seed for each transfer
void* request = nullptr;
if (rank == 0) {
memset(recvBuffer, 0, bufferSize);
void* recvBuffers[1] = {recvBuffer};
size_t recvSizes[1] = {bufferSize};
int recvTags[1] = {tag};
void* recvHandles[1] = {mhandle};
ASSERT_EQ(PostRecv(pair.recvComm, 1, recvBuffers, recvSizes, recvTags,
recvHandles, &request), ncclSuccess);
ASSERT_NE(request, nullptr) << "Recv request should never be NULL";
} else {
// Initialize host buffer directly (not using InitializeBuffer which expects device memory)
uint8_t* hostBuffer = static_cast<uint8_t*>(sendBuffer);
for (size_t j = 0; j < bufferSize; j++) {
hostBuffer[j] = static_cast<uint8_t>((seed + j) % kBytePatternModulo);
}
// NET IB isend can return success with NULL request if FIFO isn't ready
// This means the receiver hasn't posted recv yet - retry until ready
int attempts = 0;
do {
ncclResult_t result = PostSend(pair.sendComm, sendBuffer, bufferSize, tag, mhandle, &request);
ASSERT_EQ(result, ncclSuccess);
if (request != nullptr) {
break;
}
// NULL request means "not ready yet", wait and retry
if (++attempts >= kMaxRetryAttempts) {
FAIL() << "PostSend returned NULL request after " << kMaxRetryAttempts << " attempts";
}
usleep(kPollIntervalUs);
} while (request == nullptr);
}
// Barrier 1: Ensure both ranks have posted their operations before waiting
MPI_Barrier(MPI_COMM_WORLD);
// Wait for completion
int sizes[1] = {0};
ASSERT_EQ(WaitForCompletion(request, sizes), ncclSuccess);
// Barrier 2: CRITICAL - Ensure BOTH ranks have completed before EITHER continues
// This prevents rank A from starting transfer N+1 while rank B is still
// completing transfer N, which would cause request object reuse race conditions
MPI_Barrier(MPI_COMM_WORLD);
if (rank == 0) {
EXPECT_EQ(sizes[0], bufferSize) << "Transfer " << i << " size mismatch";
// Validate received data matches expected pattern (host buffer verification)
uint8_t* hostBuffer = static_cast<uint8_t*>(recvBuffer);
bool dataValid = true;
for (size_t j = 0; j < bufferSize && dataValid; j++) {
uint8_t expected = static_cast<uint8_t>((seed + j) % kBytePatternModulo);
if (hostBuffer[j] != expected) {
dataValid = false;
}
}
EXPECT_TRUE(dataValid) << "Transfer " << i << " data validation failed (seed=" << seed << ")";
if (!dataValid) {
// Print first few mismatched values for debugging
TEST_WARN("Rank %d: Transfer %d data mismatch. First %d values:", rank, i, kNumDebugSamples);
for (size_t j = 0; j < kNumDebugSamples && j < bufferSize; j++) {
uint8_t expected = static_cast<uint8_t>((seed + j) % kBytePatternModulo);
TEST_WARN(" [%zu] expected=%u, got=%u %s",
j, expected, hostBuffer[j],
(hostBuffer[j] == expected) ? "PASS" : "FAIL");
}
}
// NOTE: Flush is NOT called for host memory transfers
// Flush (iflush) is only needed for GPU Direct RDMA to ensure data visibility on GPU.
// For NCCL_PTR_HOST transfers, flush is unnecessary and calling it can cause
// race conditions when request objects are rapidly reused.
// The NET IB implementation will no-op the flush call for host memory anyway.
}
}
// NetMHandleGuard will automatically deregister at scope end
}
TEST_F(NetIbMPITest, LargeTransfer) {
ASSERT_TRUE(validateTestPrerequisites(kExactTwoProcesses, kExactTwoProcesses,
false, kMinGpusPerNode, kNoNodeLimit))
<< "Test requires exactly " << kExactTwoProcesses << " processes";
ASSERT_EQ(InitNetIb(), ncclSuccess);
int ndev = 0;
ASSERT_EQ(GetDeviceCount(&ndev), ncclSuccess);
ASSERT_GT(ndev, 0);
ConnectionPair pair;
int rank = MPIEnvironment::world_rank;
int peerRank = (rank + 1) % 2;
ASSERT_EQ(SetupConnection(0, pair, rank, peerRank), ncclSuccess);
// Guard connections for automatic cleanup
NetConnectionGuard connGuard(net_);
if (rank == 0) {
connGuard.setRecvComm(pair.recvComm);
connGuard.setListenComm(pair.listenComm);
} else {
connGuard.setSendComm(pair.sendComm);
}
const size_t bufferSize = kLargeBufferSize; // 16 MB
const int tag = 400;
void* buffer = malloc(bufferSize);
ASSERT_NE(buffer, nullptr);
auto bufferGuard = makeHostBufferAutoGuard(buffer);
void* mhandle = nullptr;
void* comm = (rank == 0) ? pair.recvComm : pair.sendComm;
ASSERT_EQ(RegisterMemory(comm, buffer, bufferSize, NCCL_PTR_HOST, &mhandle), ncclSuccess);
NetMHandleGuard mhandleGuard(mhandle, NetMHandleDeleter(net_, comm));
void* request = nullptr;
if (rank == 0) {
// Receiver
void* recvBuffers[1] = {buffer};
size_t recvSizes[1] = {bufferSize};
int recvTags[1] = {tag};
void* recvHandles[1] = {mhandle};
ASSERT_EQ(PostRecv(pair.recvComm, 1, recvBuffers, recvSizes, recvTags,
recvHandles, &request), ncclSuccess);
} else {
// Sender - Initialize host buffer directly (not using InitializeBuffer which expects device memory)
uint8_t* hostBuffer = static_cast<uint8_t*>(buffer);
for (size_t i = 0; i < bufferSize; i++) {
hostBuffer[i] = static_cast<uint8_t>((rank + i) % kBytePatternModulo);
}
ASSERT_EQ(PostSend(pair.sendComm, buffer, bufferSize, tag, mhandle, &request), ncclSuccess);
}
MPI_Barrier(MPI_COMM_WORLD);
// Wait for completion with longer timeout for large transfer
int sizes[1] = {0};
ASSERT_EQ(WaitForCompletion(request, sizes, kLargeTransferTimeout), ncclSuccess);
if (rank == 0) {
EXPECT_EQ(sizes[0], bufferSize) << "Large transfer size mismatch";
// Verify received data
uint8_t* hostBuffer = static_cast<uint8_t*>(buffer);
bool dataValid = true;
int senderRank = 1; // Data was sent by rank 1
size_t errorsFound = 0;
const size_t maxErrorsToReport = 10;
for (size_t i = 0; i < bufferSize && errorsFound < maxErrorsToReport; i++) {
uint8_t expected = static_cast<uint8_t>((senderRank + i) % kBytePatternModulo);
if (hostBuffer[i] != expected) {
if (errorsFound == 0) {
TEST_WARN("Rank %d: Data validation errors found in large transfer:", rank);
}
TEST_WARN(" Index %zu: expected=%u, got=%u", i, expected, hostBuffer[i]);
dataValid = false;
errorsFound++;
}
}
if (!dataValid && errorsFound >= maxErrorsToReport) {
TEST_WARN(" ... (showing first %zu errors only)", maxErrorsToReport);
}
EXPECT_TRUE(dataValid) << "Large transfer data validation failed";
}
// NetMHandleGuard will automatically deregister at scope end
}
TEST_F(NetIbMPITest, CloseWithoutWaitingForCompletion) {
ASSERT_TRUE(validateTestPrerequisites(kExactTwoProcesses, kExactTwoProcesses,
false, kMinGpusPerNode, kNoNodeLimit))
<< "Test requires exactly " << kExactTwoProcesses << " processes";
ASSERT_EQ(InitNetIb(), ncclSuccess);
int ndev = 0;
ASSERT_EQ(GetDeviceCount(&ndev), ncclSuccess);
ASSERT_GT(ndev, 0);
ConnectionPair pair;
int rank = MPIEnvironment::world_rank;
int peerRank = (rank + 1) % 2;
ASSERT_EQ(SetupConnection(0, pair, rank, peerRank), ncclSuccess);
// Guard connections for automatic cleanup
NetConnectionGuard connGuard(net_);
if (rank == 0) {
connGuard.setRecvComm(pair.recvComm);
connGuard.setListenComm(pair.listenComm);
} else {
connGuard.setSendComm(pair.sendComm);
}
}
#endif // MPI_TESTS_ENABLED