From f418a4c6d04e6ff496dcedee806865927e5c0e99 Mon Sep 17 00:00:00 2001 From: "Kapil S. Pawar" Date: Fri, 5 Sep 2025 09:29:11 -0500 Subject: [PATCH] Added new tests for rccl_wrap - rcclSetPipelining (#1890) * Added tests for rcclSetPipelining * Added conditions to skip the test * Updated message size --- test/RcclWrapTests.cpp | 241 ++++++++++++++++++++++++++++++++++++++++- 1 file changed, 238 insertions(+), 3 deletions(-) diff --git a/test/RcclWrapTests.cpp b/test/RcclWrapTests.cpp index 31780c9b07..a97efc2f6a 100644 --- a/test/RcclWrapTests.cpp +++ b/test/RcclWrapTests.cpp @@ -167,6 +167,16 @@ static void CleanupMockComm(ncclComm_t &mockComm) { } } +// Helper function to determine if rcclSetPipelining test should be skipped +static bool ShouldSkipRcclSetPipeliningTests() { + const char *disable = getenv("RCCL_DISABLE_REDUCE_COPY_PIPELINING"); + // Skip the test if RCCL_DISABLE_REDUCE_COPY_PIPELINING is set + if (disable && strcmp(disable, "0") != 0) { + return true; + } + return false; +} + TEST(Rcclwrap, RcclFuncMaxSendRecvCount) { ncclResult_t staticCheckResult = testStaticExposeCheck(); #ifdef RCCL_EXPOSE_STATIC @@ -277,7 +287,7 @@ TEST(Rcclwrap, comm->nNodes = 2; // triggers inter-node logic comm->rank = 0; comm->topo = new ncclTopoSystem(); //(struct ncclTopoSystem*)calloc(1, - //sizeof(struct ncclTopoSystem)); + // sizeof(struct ncclTopoSystem)); *comm->topo = {}; comm->topo->ll128Enabled = true; comm->topo->nodes[GPU].nodes[0] = {}; @@ -308,7 +318,7 @@ TEST(Rcclwrap, RcclUpdateCollectiveProtocol_SimpleFallbackWhenNoRanges) { comm->nNodes = 2; // triggers inter-node logic comm->rank = 0; comm->topo = new ncclTopoSystem(); //(struct ncclTopoSystem*)calloc(1, - //sizeof(struct ncclTopoSystem)); + // sizeof(struct ncclTopoSystem)); *comm->topo = {}; comm->topo->ll128Enabled = true; comm->topo->nodes[GPU].nodes[0] = {}; @@ -339,7 +349,8 @@ TEST(Rcclwrap, validHsaScratchEnvSettingTest) { EXPECT_TRUE(validHsaScratchEnvSetting("1", 0, 0, "gfx942")); - // When HSA_NO_SCRATCH_RECLAIM is not set, looking at hip version and firmware version + // When HSA_NO_SCRATCH_RECLAIM is not set, looking at hip version and firmware + // version EXPECT_TRUE(validHsaScratchEnvSetting(nullptr, 60443484, 24, "gfx950")); EXPECT_FALSE(validHsaScratchEnvSetting(nullptr, 60443483, 24, "gfx950")); @@ -1711,4 +1722,228 @@ TEST(Rcclwrap, PXN_ZeroRanks_GFX950) { CleanupMockComm(mockComm); } +TEST(Rcclwrap, RcclSetPipelining_Invalid_DType) { + // Skip the test if pipelining has been disabled + // (RCCL_DISABLE_REDUCE_COPY_PIPELINING=1) + if (ShouldSkipRcclSetPipeliningTests()) { + GTEST_SKIP() + << "Skipping test: RCCL_DISABLE_REDUCE_COPY_PIPELINING environment " + "variable is set. Unset this variable to enable pipelining."; + } + + // Skip the test if pipelining has been enabled for all data types + // (RCCL_PIPELINE_ALL_DATA_TYPES=1) + const char *allowAllDTypes = getenv("RCCL_PIPELINE_ALL_DATA_TYPES"); + if (allowAllDTypes && strcmp(allowAllDTypes, "0") != 0) { + GTEST_SKIP() << "Skipping test: RCCL_PIPELINE_ALL_DATA_TYPES environment " + "variable is set. Unset this variable to enable pipelining " + "only for bf16 data type."; + } + + // Pipeline should not be set for non-bf16 datatypes, unless + // rcclParamPipelineAllDTypes() returns true + ncclComm_t comm = nullptr; + struct ncclTopoSystem topo; + struct ncclTopoNode gpu; + CreateMockComm(comm, topo, gpu, "gfx950", 8); + comm->nNodes = 2; // Multi node + + ncclTaskColl info = {}; + info.func = ncclFuncAllReduce; + info.datatype = ncclFloat32; + + size_t nBytes = 16 * 1024 * 1024; // 16MB + rcclSetPipelining(comm, nBytes, &info); + + EXPECT_EQ(info.pipeline, 0) << "Non-bf16 should not set pipeline by default"; + + CleanupMockComm(comm); +} + +TEST(Rcclwrap, RcclSetPipelining_GFX950_MultiNode_Enable) { + // Skip the test if pipelining has been disabled + // (RCCL_DISABLE_REDUCE_COPY_PIPELINING=1) + if (ShouldSkipRcclSetPipeliningTests()) { + GTEST_SKIP() + << "Skipping test: RCCL_DISABLE_REDUCE_COPY_PIPELINING environment " + "variable is set. Unset this variable to enable pipelining."; + } + + // For multi-node, pipeline is set to 1 for AllReduce with bf16 + ncclComm_t comm = nullptr; + struct ncclTopoSystem topo; + struct ncclTopoNode gpu; + CreateMockComm(comm, topo, gpu, "gfx950", 8); + comm->nNodes = 2; // Multi node + + ncclTaskColl info = {}; + // In rcclSetPipelining(), ncclFuncAllReduce, ncclFuncReduceScatter, and + // ncclFuncReduce share the same case body. Testing any one of them is + // sufficient to validate that code path. + info.func = ncclFuncAllReduce; + info.datatype = ncclBfloat16; + + size_t nBytes = 16 * 1024 * 1024; // 16MB + rcclSetPipelining(comm, nBytes, &info); + + EXPECT_EQ(info.pipeline, 1) + << "gfx950 multi-node AllReduce bf16 should enable pipelining"; + + CleanupMockComm(comm); +} + +TEST(Rcclwrap, RcclSetPipelining_GFX950_SingleNode_Disable) { + // Skip the test if pipelining has been disabled + // (RCCL_DISABLE_REDUCE_COPY_PIPELINING=1) + if (ShouldSkipRcclSetPipeliningTests()) { + GTEST_SKIP() + << "Skipping test: RCCL_DISABLE_REDUCE_COPY_PIPELINING environment " + "variable is set. Unset this variable to enable pipelining."; + } + + // For single-node, pipeline remains 0 + ncclComm_t comm = nullptr; + struct ncclTopoSystem topo; + struct ncclTopoNode gpu; + CreateMockComm(comm, topo, gpu, "gfx950", 8); + comm->nNodes = 1; // Single node + + ncclTaskColl info = {}; + // In rcclSetPipelining(), ncclFuncAllReduce, ncclFuncReduceScatter, and + // ncclFuncReduce share the same case body. Testing any one of them is + // sufficient to validate that code path. + info.func = ncclFuncAllReduce; + info.datatype = ncclBfloat16; + + size_t nBytes = 16 * 1024 * 1024; // 16MB + rcclSetPipelining(comm, nBytes, &info); + + EXPECT_EQ(info.pipeline, 0) + << "gfx950 single-node should not enable pipelining"; + + CleanupMockComm(comm); +} + +TEST(Rcclwrap, RcclSetPipelining_GFX942_SingleNode_AllReduce_Enable) { + // Skip the test if pipelining has been disabled + // (RCCL_DISABLE_REDUCE_COPY_PIPELINING=1) + if (ShouldSkipRcclSetPipeliningTests()) { + GTEST_SKIP() + << "Skipping test: RCCL_DISABLE_REDUCE_COPY_PIPELINING environment " + "variable is set. Unset this variable to enable pipelining."; + } + + // For single-node, pipeline is set to 1 for AllReduce with bf16 + ncclComm_t comm = nullptr; + struct ncclTopoSystem topo; + struct ncclTopoNode gpu; + CreateMockComm(comm, topo, gpu, "gfx942", 8); + comm->nNodes = 1; // Single node + + ncclTaskColl info = {}; + info.func = ncclFuncAllReduce; + info.datatype = ncclBfloat16; + + size_t nBytes = 16 * 1024 * 1024; // 16MB + rcclSetPipelining(comm, nBytes, &info); + + EXPECT_EQ(info.pipeline, 1) + << "gfx942 single-node AllReduce bf16 should enable pipelining"; + + CleanupMockComm(comm); +} + +TEST(Rcclwrap, RcclSetPipelining_GFX942_MultiNode_AllReduce_Enable) { + // Skip the test if pipelining has been disabled + // (RCCL_DISABLE_REDUCE_COPY_PIPELINING=1) + if (ShouldSkipRcclSetPipeliningTests()) { + GTEST_SKIP() + << "Skipping test: RCCL_DISABLE_REDUCE_COPY_PIPELINING environment " + "variable is set. Unset this variable to enable pipelining."; + } + + // For multi-node AllReduce with bf16, pipelining is enabled if + // nBytes <= 512MB * 2^(log2(nNodes)-1) + // Testing with nNodes = 4 => threshold = 512MB * 2^(2-1) = 1GB + ncclComm_t comm = nullptr; + struct ncclTopoSystem topo; + struct ncclTopoNode gpu; + CreateMockComm(comm, topo, gpu, "gfx942", 8); + comm->nNodes = 4; + + ncclTaskColl info = {}; + info.func = ncclFuncAllReduce; + info.datatype = ncclBfloat16; + + size_t nBytes = (1ULL << 30); // 1GB, exactly at threshold + rcclSetPipelining(comm, nBytes, &info); + + EXPECT_EQ(info.pipeline, 1) + << "gfx942 4-node AllReduce at threshold should enable pipelining"; + + CleanupMockComm(comm); +} + +TEST(Rcclwrap, RcclSetPipelining_GFX942_MultiNode_AllReduce_Disable) { + // Skip the test if pipelining has been disabled + // (RCCL_DISABLE_REDUCE_COPY_PIPELINING=1) + if (ShouldSkipRcclSetPipeliningTests()) { + GTEST_SKIP() + << "Skipping test: RCCL_DISABLE_REDUCE_COPY_PIPELINING environment " + "variable is set. Unset this variable to enable pipelining."; + } + + // When nBytes is just above the threshold, pipelining should be disabled + ncclComm_t comm = nullptr; + struct ncclTopoSystem topo; + struct ncclTopoNode gpu; + CreateMockComm(comm, topo, gpu, "gfx942", 8); + comm->nNodes = 4; + + ncclTaskColl info = {}; + info.func = ncclFuncAllReduce; + info.datatype = ncclBfloat16; + + size_t nBytes = (1ULL << 30) + 1024; // 1GB + 1KB, just above threshold + rcclSetPipelining(comm, nBytes, &info); + + EXPECT_EQ(info.pipeline, 0) + << "gfx942 4-node AllReduce above threshold should disable pipelining"; + + CleanupMockComm(comm); +} + +TEST(Rcclwrap, RcclSetPipelining_GFX942_Enable) { + // Skip the test if pipelining has been disabled + // (RCCL_DISABLE_REDUCE_COPY_PIPELINING=1) + if (ShouldSkipRcclSetPipeliningTests()) { + GTEST_SKIP() + << "Skipping test: RCCL_DISABLE_REDUCE_COPY_PIPELINING environment " + "variable is set. Unset this variable to enable pipelining."; + } + + // ReduceScatter & Reduce should enable pipelining regardless of no. of nodes + ncclComm_t comm = nullptr; + struct ncclTopoSystem topo; + struct ncclTopoNode gpu; + CreateMockComm(comm, topo, gpu, "gfx942", 8); + comm->nNodes = 8; + + ncclTaskColl info = {}; + // In rcclSetPipelining(), ncclFuncReduceScatter, and + // ncclFuncReduce share the same case body. Testing any one of them is + // sufficient to validate that code path. + info.func = ncclFuncReduceScatter; + info.datatype = ncclBfloat16; + + size_t nBytes = 16 * 1024 * 1024; // 16MB + rcclSetPipelining(comm, nBytes, &info); + + EXPECT_EQ(info.pipeline, 1) + << "gfx942 ReduceScatter and Reduce should enable " + "pipelining with single or multi-node"; + + CleanupMockComm(comm); +} + } // namespace RcclUnitTesting