Added new tests for rccl_wrap - rcclSetPipelining (#1890)

* Added tests for rcclSetPipelining

* Added conditions to skip the test

* Updated message size
Этот коммит содержится в:
Kapil S. Pawar
2025-09-05 09:29:11 -05:00
коммит произвёл GitHub
родитель 6e45eaf75e
Коммит f418a4c6d0
+238 -3
Просмотреть файл
@@ -167,6 +167,16 @@ static void CleanupMockComm(ncclComm_t &mockComm) {
}
}
// Helper function to determine if rcclSetPipelining test should be skipped
static bool ShouldSkipRcclSetPipeliningTests() {
const char *disable = getenv("RCCL_DISABLE_REDUCE_COPY_PIPELINING");
// Skip the test if RCCL_DISABLE_REDUCE_COPY_PIPELINING is set
if (disable && strcmp(disable, "0") != 0) {
return true;
}
return false;
}
TEST(Rcclwrap, RcclFuncMaxSendRecvCount) {
ncclResult_t staticCheckResult = testStaticExposeCheck();
#ifdef RCCL_EXPOSE_STATIC
@@ -277,7 +287,7 @@ TEST(Rcclwrap,
comm->nNodes = 2; // triggers inter-node logic
comm->rank = 0;
comm->topo = new ncclTopoSystem(); //(struct ncclTopoSystem*)calloc(1,
//sizeof(struct ncclTopoSystem));
// sizeof(struct ncclTopoSystem));
*comm->topo = {};
comm->topo->ll128Enabled = true;
comm->topo->nodes[GPU].nodes[0] = {};
@@ -308,7 +318,7 @@ TEST(Rcclwrap, RcclUpdateCollectiveProtocol_SimpleFallbackWhenNoRanges) {
comm->nNodes = 2; // triggers inter-node logic
comm->rank = 0;
comm->topo = new ncclTopoSystem(); //(struct ncclTopoSystem*)calloc(1,
//sizeof(struct ncclTopoSystem));
// sizeof(struct ncclTopoSystem));
*comm->topo = {};
comm->topo->ll128Enabled = true;
comm->topo->nodes[GPU].nodes[0] = {};
@@ -339,7 +349,8 @@ TEST(Rcclwrap, validHsaScratchEnvSettingTest) {
EXPECT_TRUE(validHsaScratchEnvSetting("1", 0, 0, "gfx942"));
// When HSA_NO_SCRATCH_RECLAIM is not set, looking at hip version and firmware version
// When HSA_NO_SCRATCH_RECLAIM is not set, looking at hip version and firmware
// version
EXPECT_TRUE(validHsaScratchEnvSetting(nullptr, 60443484, 24, "gfx950"));
EXPECT_FALSE(validHsaScratchEnvSetting(nullptr, 60443483, 24, "gfx950"));
@@ -1711,4 +1722,228 @@ TEST(Rcclwrap, PXN_ZeroRanks_GFX950) {
CleanupMockComm(mockComm);
}
TEST(Rcclwrap, RcclSetPipelining_Invalid_DType) {
// Skip the test if pipelining has been disabled
// (RCCL_DISABLE_REDUCE_COPY_PIPELINING=1)
if (ShouldSkipRcclSetPipeliningTests()) {
GTEST_SKIP()
<< "Skipping test: RCCL_DISABLE_REDUCE_COPY_PIPELINING environment "
"variable is set. Unset this variable to enable pipelining.";
}
// Skip the test if pipelining has been enabled for all data types
// (RCCL_PIPELINE_ALL_DATA_TYPES=1)
const char *allowAllDTypes = getenv("RCCL_PIPELINE_ALL_DATA_TYPES");
if (allowAllDTypes && strcmp(allowAllDTypes, "0") != 0) {
GTEST_SKIP() << "Skipping test: RCCL_PIPELINE_ALL_DATA_TYPES environment "
"variable is set. Unset this variable to enable pipelining "
"only for bf16 data type.";
}
// Pipeline should not be set for non-bf16 datatypes, unless
// rcclParamPipelineAllDTypes() returns true
ncclComm_t comm = nullptr;
struct ncclTopoSystem topo;
struct ncclTopoNode gpu;
CreateMockComm(comm, topo, gpu, "gfx950", 8);
comm->nNodes = 2; // Multi node
ncclTaskColl info = {};
info.func = ncclFuncAllReduce;
info.datatype = ncclFloat32;
size_t nBytes = 16 * 1024 * 1024; // 16MB
rcclSetPipelining(comm, nBytes, &info);
EXPECT_EQ(info.pipeline, 0) << "Non-bf16 should not set pipeline by default";
CleanupMockComm(comm);
}
TEST(Rcclwrap, RcclSetPipelining_GFX950_MultiNode_Enable) {
// Skip the test if pipelining has been disabled
// (RCCL_DISABLE_REDUCE_COPY_PIPELINING=1)
if (ShouldSkipRcclSetPipeliningTests()) {
GTEST_SKIP()
<< "Skipping test: RCCL_DISABLE_REDUCE_COPY_PIPELINING environment "
"variable is set. Unset this variable to enable pipelining.";
}
// For multi-node, pipeline is set to 1 for AllReduce with bf16
ncclComm_t comm = nullptr;
struct ncclTopoSystem topo;
struct ncclTopoNode gpu;
CreateMockComm(comm, topo, gpu, "gfx950", 8);
comm->nNodes = 2; // Multi node
ncclTaskColl info = {};
// In rcclSetPipelining(), ncclFuncAllReduce, ncclFuncReduceScatter, and
// ncclFuncReduce share the same case body. Testing any one of them is
// sufficient to validate that code path.
info.func = ncclFuncAllReduce;
info.datatype = ncclBfloat16;
size_t nBytes = 16 * 1024 * 1024; // 16MB
rcclSetPipelining(comm, nBytes, &info);
EXPECT_EQ(info.pipeline, 1)
<< "gfx950 multi-node AllReduce bf16 should enable pipelining";
CleanupMockComm(comm);
}
TEST(Rcclwrap, RcclSetPipelining_GFX950_SingleNode_Disable) {
// Skip the test if pipelining has been disabled
// (RCCL_DISABLE_REDUCE_COPY_PIPELINING=1)
if (ShouldSkipRcclSetPipeliningTests()) {
GTEST_SKIP()
<< "Skipping test: RCCL_DISABLE_REDUCE_COPY_PIPELINING environment "
"variable is set. Unset this variable to enable pipelining.";
}
// For single-node, pipeline remains 0
ncclComm_t comm = nullptr;
struct ncclTopoSystem topo;
struct ncclTopoNode gpu;
CreateMockComm(comm, topo, gpu, "gfx950", 8);
comm->nNodes = 1; // Single node
ncclTaskColl info = {};
// In rcclSetPipelining(), ncclFuncAllReduce, ncclFuncReduceScatter, and
// ncclFuncReduce share the same case body. Testing any one of them is
// sufficient to validate that code path.
info.func = ncclFuncAllReduce;
info.datatype = ncclBfloat16;
size_t nBytes = 16 * 1024 * 1024; // 16MB
rcclSetPipelining(comm, nBytes, &info);
EXPECT_EQ(info.pipeline, 0)
<< "gfx950 single-node should not enable pipelining";
CleanupMockComm(comm);
}
TEST(Rcclwrap, RcclSetPipelining_GFX942_SingleNode_AllReduce_Enable) {
// Skip the test if pipelining has been disabled
// (RCCL_DISABLE_REDUCE_COPY_PIPELINING=1)
if (ShouldSkipRcclSetPipeliningTests()) {
GTEST_SKIP()
<< "Skipping test: RCCL_DISABLE_REDUCE_COPY_PIPELINING environment "
"variable is set. Unset this variable to enable pipelining.";
}
// For single-node, pipeline is set to 1 for AllReduce with bf16
ncclComm_t comm = nullptr;
struct ncclTopoSystem topo;
struct ncclTopoNode gpu;
CreateMockComm(comm, topo, gpu, "gfx942", 8);
comm->nNodes = 1; // Single node
ncclTaskColl info = {};
info.func = ncclFuncAllReduce;
info.datatype = ncclBfloat16;
size_t nBytes = 16 * 1024 * 1024; // 16MB
rcclSetPipelining(comm, nBytes, &info);
EXPECT_EQ(info.pipeline, 1)
<< "gfx942 single-node AllReduce bf16 should enable pipelining";
CleanupMockComm(comm);
}
TEST(Rcclwrap, RcclSetPipelining_GFX942_MultiNode_AllReduce_Enable) {
// Skip the test if pipelining has been disabled
// (RCCL_DISABLE_REDUCE_COPY_PIPELINING=1)
if (ShouldSkipRcclSetPipeliningTests()) {
GTEST_SKIP()
<< "Skipping test: RCCL_DISABLE_REDUCE_COPY_PIPELINING environment "
"variable is set. Unset this variable to enable pipelining.";
}
// For multi-node AllReduce with bf16, pipelining is enabled if
// nBytes <= 512MB * 2^(log2(nNodes)-1)
// Testing with nNodes = 4 => threshold = 512MB * 2^(2-1) = 1GB
ncclComm_t comm = nullptr;
struct ncclTopoSystem topo;
struct ncclTopoNode gpu;
CreateMockComm(comm, topo, gpu, "gfx942", 8);
comm->nNodes = 4;
ncclTaskColl info = {};
info.func = ncclFuncAllReduce;
info.datatype = ncclBfloat16;
size_t nBytes = (1ULL << 30); // 1GB, exactly at threshold
rcclSetPipelining(comm, nBytes, &info);
EXPECT_EQ(info.pipeline, 1)
<< "gfx942 4-node AllReduce at threshold should enable pipelining";
CleanupMockComm(comm);
}
TEST(Rcclwrap, RcclSetPipelining_GFX942_MultiNode_AllReduce_Disable) {
// Skip the test if pipelining has been disabled
// (RCCL_DISABLE_REDUCE_COPY_PIPELINING=1)
if (ShouldSkipRcclSetPipeliningTests()) {
GTEST_SKIP()
<< "Skipping test: RCCL_DISABLE_REDUCE_COPY_PIPELINING environment "
"variable is set. Unset this variable to enable pipelining.";
}
// When nBytes is just above the threshold, pipelining should be disabled
ncclComm_t comm = nullptr;
struct ncclTopoSystem topo;
struct ncclTopoNode gpu;
CreateMockComm(comm, topo, gpu, "gfx942", 8);
comm->nNodes = 4;
ncclTaskColl info = {};
info.func = ncclFuncAllReduce;
info.datatype = ncclBfloat16;
size_t nBytes = (1ULL << 30) + 1024; // 1GB + 1KB, just above threshold
rcclSetPipelining(comm, nBytes, &info);
EXPECT_EQ(info.pipeline, 0)
<< "gfx942 4-node AllReduce above threshold should disable pipelining";
CleanupMockComm(comm);
}
TEST(Rcclwrap, RcclSetPipelining_GFX942_Enable) {
// Skip the test if pipelining has been disabled
// (RCCL_DISABLE_REDUCE_COPY_PIPELINING=1)
if (ShouldSkipRcclSetPipeliningTests()) {
GTEST_SKIP()
<< "Skipping test: RCCL_DISABLE_REDUCE_COPY_PIPELINING environment "
"variable is set. Unset this variable to enable pipelining.";
}
// ReduceScatter & Reduce should enable pipelining regardless of no. of nodes
ncclComm_t comm = nullptr;
struct ncclTopoSystem topo;
struct ncclTopoNode gpu;
CreateMockComm(comm, topo, gpu, "gfx942", 8);
comm->nNodes = 8;
ncclTaskColl info = {};
// In rcclSetPipelining(), ncclFuncReduceScatter, and
// ncclFuncReduce share the same case body. Testing any one of them is
// sufficient to validate that code path.
info.func = ncclFuncReduceScatter;
info.datatype = ncclBfloat16;
size_t nBytes = 16 * 1024 * 1024; // 16MB
rcclSetPipelining(comm, nBytes, &info);
EXPECT_EQ(info.pipeline, 1)
<< "gfx942 ReduceScatter and Reduce should enable "
"pipelining with single or multi-node";
CleanupMockComm(comm);
}
} // namespace RcclUnitTesting