From 0ce20e7e07a9b3344f355ada621d80e7fa1681b0 Mon Sep 17 00:00:00 2001 From: Mustafa Abduljabbar Date: Fri, 25 Jul 2025 10:57:05 -0400 Subject: [PATCH] Add optional bf16 software-triggered pipelining for reduceCopyPacks (#1758) - Introduced double-buffering to reduce copy overhead and overlap BF16 arithmetic with data prefetching. - Aimed to improve performance of reduction-based collectives by up to 10%. - Implemented based on recommendations from Guennadi Riguer (AMD) - Added --force-reduce-pipeline option to install.sh to activate this optimization for BF16 reductions. - Feature is disabled by default to prevent regressions with large messages until auto-tuning logic is upstreamed. --------- Co-authored-by: Jeffrey Novotny Co-authored-by: Pedram Alizadeh --- CHANGELOG.md | 3 + CMakeLists.txt | 13 +++ install.sh | 10 +- src/device/common_kernel.h | 213 +++++++++++++++++++++++++++++++++++++ 4 files changed, 238 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 8522ef7cba..3f09e86568 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -22,6 +22,9 @@ Full documentation for RCCL is available at [https://rccl.readthedocs.io](https: * Multi-node tuning for AllGather, AllReduce, and ReduceScatter that leverages LL/LL64/LL128 protocol to use nontemporal vector load/store for tunable message size ranges. * LL/LL128 usage ranges for AR, AG, and RS are part of the tuning models, which enable architecture-specific tuning in conjunction with the existing Rome Models scheme in RCCL. * Two new APIs are exposed as part of an initiative to separate RCCL code. These APIs are `rcclGetAlgoInfo` and `rcclFuncMaxSendRecvCount`. However, user-level invocation requires that RCCL be built with `RCCL_EXPOSE_STATIC` enabled. +* Enabled double-buffering in `reduceCopyPacks` to trigger pipelining, especially to overlap bf16 arithmetic. +* Added `--force-reduce-pipeline` as an option that can be passed to the `install.sh` script. Passing this option will enable software-triggered pipelining `bfloat16` reductions (i.e. `all_reduce`, `reduce_scatter` and `reduce`). + ### Changed diff --git a/CMakeLists.txt b/CMakeLists.txt index 6c57e352f0..e7b8ad3fa3 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -38,6 +38,7 @@ option(PROFILE "Enable profiling" option(TIMETRACE "Enable time-trace during compilation" OFF) option(TRACE "Enable additional tracing" OFF) option(FAULT_INJECTION "Enable fault injection" ON) +option(FORCE_REDUCE_PIPELINING "Force reduce pipelining" OFF) option(DISABLE_CHEAP_THREADFENCE "Compile-time killswitch for simpler fence" OFF) # Default GPU architectures to build @@ -799,6 +800,18 @@ foreach(file ${GENERATED_FILES}) list(APPEND HIP_SOURCES ${file}) endforeach() +# Enable SW pipelining where needed +foreach(SOURCE_FILE ${HIP_SOURCES}) + # TODO: enable bf16 pipelining by default upon having the pipelined/scalar switching feature + # if (FORCE_REDUCE_PIPELINING AND (SOURCE_FILE MATCHES "gensrc/reduce_.*" OR SOURCE_FILE MATCHES "gensrc/reduce_scatter_.*" OR SOURCE_FILE MATCHES "gensrc/all_reduce_.*")) + # message(STATUS "RCCL_ENABLE_SW_PIPELINE enabled for ${SOURCE_FILE}") + # set_source_files_properties(${SOURCE_FILE} PROPERTIES COMPILE_FLAGS "-DRCCL_ENABLE_SW_PIPELINE") + if(FORCE_REDUCE_PIPELINING AND SOURCE_FILE MATCHES "gensrc/(reduce|reduce_scatter|all_reduce).*_bf16\\.cpp$") + message(STATUS "BF16 pipelining support enabled for ${SOURCE_FILE}") + set_source_files_properties(${SOURCE_FILE} PROPERTIES COMPILE_FLAGS "-DRCCL_ENABLE_SW_PIPELINE") + endif() +endforeach() + # Create an initial git_version.cpp file (that will be updated with latest git version) #================================================================================================== file(WRITE ${CMAKE_CURRENT_BINARY_DIR}/git_version.cpp "") diff --git a/install.sh b/install.sh index d67759e0d7..21d563a7ce 100755 --- a/install.sh +++ b/install.sh @@ -35,6 +35,7 @@ roctx_enabled=true run_tests=false run_tests_all=false time_trace=false +force_reduce_pipeline=false # ################################################# # helper functions @@ -71,6 +72,7 @@ function display_help() echo " -t|--tests_build Build rccl unit tests, but do not run" echo " --time-trace Plot the build time of RCCL (requires \`ninja-build\` package installed on the system)" echo " --verbose Show compile commands" + echo " --force-reduce-pipeline Force reduce_copy sw pipeline to be used for every reduce-based collectives and datatypes" } # ################################################# @@ -80,7 +82,7 @@ function display_help() # check if we have a modern version of getopt that can handle whitespace and long parameters getopt -T if [[ "$?" -eq 4 ]]; then - GETOPT_PARSE=$(getopt --name "${0}" --options cdfhij:lprt --longoptions address-sanitizer,dependencies,debug,enable-code-coverage,enable_backtrace,disable-colltrace,disable-msccl-kernel,disable-mscclpp,fast,help,install,jobs:,local_gpu_only,amdgpu_targets:,no_clean,npkit-enable,log-trace,openmp-test-enable,roctx-enable,package_build,prefix:,rm-legacy-include-dir,run_tests_all,run_tests_quick,static,tests_build,time-trace,verbose -- "$@") + GETOPT_PARSE=$(getopt --name "${0}" --options cdfhij:lprt --longoptions address-sanitizer,dependencies,debug,enable-code-coverage,enable_backtrace,disable-colltrace,disable-msccl-kernel,disable-mscclpp,fast,help,install,jobs:,local_gpu_only,amdgpu_targets:,no_clean,npkit-enable,log-trace,openmp-test-enable,roctx-enable,package_build,prefix:,rm-legacy-include-dir,run_tests_all,run_tests_quick,static,tests_build,time-trace,force-reduce-pipeline,verbose -- "$@") else echo "Need a new version of getopt" exit 1 @@ -123,6 +125,7 @@ while true; do -t | --tests_build) build_tests=true; shift ;; --time-trace) time_trace=true; shift ;; --verbose) build_verbose=true; shift ;; + --force-reduce-pipeline) force_reduce_pipeline=true; shift ;; --) shift ; break ;; *) echo "Unexpected command line parameter received; aborting"; exit 1 @@ -277,6 +280,11 @@ if [[ "${openmp_test_enabled}" == true ]]; then cmake_common_options="${cmake_common_options} -DOPENMP_TESTS_ENABLED=ON" fi +# Force Reduce pipeline +if [[ "${force_reduce_pipeline}" == true ]]; then + cmake_common_options="${cmake_common_options} -DFORCE_REDUCE_PIPELINING=ON" +fi + # Enable NPKit if [[ "${npkit_enabled}" == true ]]; then cmake_common_options="${cmake_common_options} -DENABLE_NPKIT=ON" diff --git a/src/device/common_kernel.h b/src/device/common_kernel.h index 9223c8304c..35cace5d88 100644 --- a/src/device/common_kernel.h +++ b/src/device/common_kernel.h @@ -27,6 +27,7 @@ inline __device__ int loadInt(int* ptr) { return v; } +#ifndef RCCL_ENABLE_SW_PIPELINE template +__device__ __forceinline__ void loadSources( + const RedFn& redFn, + const SrcPtrFn& srcPtrFn, + IntBytes& globalOffset, + uintptr_t* minSrcs, + uint64_t *preOpArgs, + BytePack buff[MaxSrcs + !MaxSrcs][Unroll], + int nSrcs +) { + #pragma unroll Unroll + for (int s = 0; s < MinSrcs; s++) { + RedFn preFn(s < PreOpSrcs ? preOpArgs[s] : 0); + #pragma unroll Unroll + for (int u = 0; u < Unroll; u++) { + if (s < MultimemSrcs) { + buff[s][u] = applyLoadMultimem(redFn, minSrcs[s]); + } else { + buff[s][u] = ld_volatile_global(minSrcs[s]); + } + minSrcs[s] += WARP_SIZE * BytePerPack; + } + } + for (int s = MinSrcs; (MinSrcs < MaxSrcs) && (s < MaxSrcs) && (s < nSrcs); s++) { + uintptr_t src = cvta_to_global(srcPtrFn(s)) + globalOffset; + #pragma unroll Unroll + for (int u = 0; u < Unroll; u++) { + buff[s][u] = ld_volatile_global(src); + src += WARP_SIZE * BytePerPack; + } + } +} + +template + __device__ __forceinline__ void reduceAndStore( + RedFn redFn, uint64_t *preOpArgs, BytePack buff[MaxSrcs + !MaxSrcs][Unroll], + uintptr_t *minDsts, bool postOp, int nDsts, DstPtrFn const &dstPtrFn, IntBytes tailThreadBytesBehind, int nSrcs) { + for (int s = 0; s < MinSrcs; s++) { + RedFn preFn(s < PreOpSrcs ? preOpArgs[s] : 0); + #pragma unroll Unroll + for (int u = 0; u < Unroll; u++) { + if (s < PreOpSrcs) buff[s][u] = applyPreOp(preFn, buff[s][u]); + if (s > 0) buff[0][u] = applyReduce(redFn, buff[0][u], buff[s][u]); + } + } + for (int s = MinSrcs; (MinSrcs < MaxSrcs) && (s < MaxSrcs) && (s < nSrcs); s++) { + RedFn preFn(s < PreOpSrcs ? preOpArgs[s] : 0); + #pragma unroll Unroll + for (int u = 0; u < Unroll; u++) { + if (s < PreOpSrcs) buff[s][u] = applyPreOp(preFn, buff[s][u]); + buff[0][u] = applyReduce(redFn, buff[0][u], buff[s][u]); + } + } + if (postOp) { + #pragma unroll Unroll + for (int u = 0; u < Unroll; u++) + buff[0][u] = applyPostOp(redFn, buff[0][u]); + } + + #pragma unroll Unroll + for (int d = 0; d < MinDsts; d++) { + #pragma unroll Unroll + for (int u = 0; u < Unroll; u++) { + if (d < MultimemDsts) { + multimem_st_global(minDsts[d], buff[0][u]); + } else { + st_global(minDsts[d], buff[0][u]); + } + minDsts[d] += WARP_SIZE * BytePerPack; + } + } + for (int d = MinDsts; (MinDsts < MaxDsts) && (d < MaxDsts) && (d < nDsts); d++) { + uintptr_t dstPtr = cvta_to_global(dstPtrFn(d)); + uintptr_t dst = dstPtr + tailThreadBytesBehind; + #pragma unroll Unroll + for (int u = 0; u < Unroll; u++) { + st_global(dst, buff[0][u]); + dst += WARP_SIZE * BytePerPack; + } + } +} + +template +__device__ __forceinline__ void reduceCopyPacks( + int nThreads, int &thread, + uint64_t redArg, uint64_t *preOpArgs, bool postOp, + int nSrcs, SrcPtrFn const &srcPtrFn, int nDsts, DstPtrFn const &dstPtrFn, + IntBytes &nBytesBehind, IntBytes &nBytesAhead + ) { + static_assert(std::is_signed::value, "IntBytes must be a signed integral type."); + static_assert(MinSrcs <= MaxSrcs, "MinSrcs must be less than or equal to MaxSrcs."); + //if (BytePerPack == 0) __trap(); + + // A hunk is the amount of contiguous data a warp consumes per loop iteration + // assuming all threads partake. + constexpr int BytePerHunk = Unroll*WARP_SIZE*BytePerPack; + int nWarps = nThreads/WARP_SIZE; + int warp = thread/WARP_SIZE; + int lane = thread%WARP_SIZE; + + // This thread's initial position. + IntBytes threadBytesBehind = nBytesBehind + (warp*BytePerHunk + lane*BytePerPack); + IntBytes threadBytesAhead = nBytesAhead - (warp*BytePerHunk + lane*BytePerPack); + // Number of hunks to be consumed over all warps. + IntBytes nHunksAhead = nBytesAhead/(BytePerHunk + !BytePerHunk); + // Advance collective position. + nBytesBehind += nHunksAhead*BytePerHunk; + nBytesAhead -= nHunksAhead*BytePerHunk; + if (Unroll==1 && BytePerPack <= nBytesAhead) { + // Only Unroll=1 can do partial hunks (where not all threads partake). + nHunksAhead += 1; + nBytesBehind += nBytesAhead - (nBytesAhead%(BytePerPack + !BytePerPack)); + nBytesAhead = nBytesAhead%(BytePerPack + !BytePerPack); + } + nHunksAhead -= warp; + + RedFn redFn(redArg); + uintptr_t minSrcs[MinSrcs + !MinSrcs]; + uintptr_t minDsts[MinDsts + !MinDsts]; + #pragma unroll + for (int s=0; s < MinSrcs; s++) { + minSrcs[s] = cvta_to_global(srcPtrFn(s)) + threadBytesBehind; + } + + #pragma unroll + for (int d=0; d < MinDsts; d++) { + // Yes, for some template arguments this code will be unreachable. That's fine. + // coverity[dead_error_line] + minDsts[d] = cvta_to_global(dstPtrFn(d)) + threadBytesBehind; + } + BytePack acc1[MaxSrcs + !MaxSrcs][Unroll]; + BytePack acc2[MaxSrcs + !MaxSrcs][Unroll]; + bool tailProcess = false; + IntBytes tailThreadBytesBehind; + // We dictate loop termination condition according to whether partial hunks + // can be handled or not. + while (Unroll==1 ? (BytePerPack <= threadBytesAhead) : (0 < nHunksAhead)) { + + // load sources into acc1 + loadSources( + redFn, srcPtrFn, threadBytesBehind, minSrcs, preOpArgs, acc1, nSrcs + ); + + if(tailProcess) { + reduceAndStore( + redFn, preOpArgs, acc2, minDsts, postOp, nDsts, dstPtrFn, tailThreadBytesBehind, nSrcs + ); + + #pragma unroll + for (int d=0; d < MinDsts; d++) { + minDsts[d] += (nWarps-1)*BytePerHunk; + } + } + + #pragma unroll + for (int s=0; s < MinSrcs; s++) { + minSrcs[s] += (nWarps-1)*BytePerHunk; + } + threadBytesAhead -= nWarps*BytePerHunk; + nHunksAhead -= nWarps; + tailProcess = Unroll==1 ? (BytePerPack <= threadBytesAhead) : (0 < nHunksAhead); + + tailThreadBytesBehind = threadBytesBehind; + threadBytesBehind += nWarps*BytePerHunk; + if(tailProcess) { + loadSources( + redFn, srcPtrFn, threadBytesBehind, minSrcs, preOpArgs, acc2, nSrcs + ); + } + reduceAndStore( + redFn, preOpArgs, acc1, minDsts, postOp, nDsts, dstPtrFn, tailThreadBytesBehind, nSrcs + ); + + if(tailProcess) { + #pragma unroll + for (int d=0; d < MinDsts; d++) { + minDsts[d] += (nWarps-1)*BytePerHunk; + } + #pragma unroll + for (int s=0; s < MinSrcs; s++) { + minSrcs[s] += (nWarps-1)*BytePerHunk; + } + tailThreadBytesBehind = threadBytesBehind; + threadBytesBehind += nWarps*BytePerHunk; + threadBytesAhead -= nWarps*BytePerHunk; + nHunksAhead -= nWarps; + } + } + + if(tailProcess) { + reduceAndStore( + redFn, preOpArgs, acc2, minDsts, postOp, nDsts, dstPtrFn, tailThreadBytesBehind, nSrcs + ); + } + nWarps = nThreads/WARP_SIZE; + warp = thread/WARP_SIZE; + lane = thread%WARP_SIZE; + // The last loop iteration could have been partial, i.e. not taken by all + // threads. The threads that weren't included need an extra subtraction to + // make the value warp uniform. + if (Unroll==1 && nHunksAhead > 0) nHunksAhead -= nWarps; + // Rotate warps so the warp which got the least work here will be warp 0. + // This effectively assigns: warp = (warp-nHunks+nWarps)%nWarps; + warp = -nHunksAhead; + thread = warp*WARP_SIZE + lane; +} +#endif template