Add optional bf16 software-triggered pipelining for reduceCopyPacks (#1758)

- Introduced double-buffering to reduce copy overhead and overlap BF16 arithmetic with data prefetching.
- Aimed to improve performance of reduction-based collectives by up to 10%.
- Implemented based on recommendations from Guennadi Riguer (AMD)
- Added --force-reduce-pipeline option to install.sh to activate this optimization for BF16 reductions.
- Feature is disabled by default to prevent regressions with large messages until auto-tuning logic is upstreamed.
---------

Co-authored-by: Jeffrey Novotny <jnovotny@amd.com>
Co-authored-by: Pedram Alizadeh <pmohamma@amd.com>
此提交包含在:
Mustafa Abduljabbar
2025-07-25 10:57:05 -04:00
提交者 GitHub
父節點 1c3d1b3842
當前提交 0ce20e7e07
共有 4 個檔案被更改,包括 238 行新增1 行删除
+3
查看文件
@@ -22,6 +22,9 @@ Full documentation for RCCL is available at [https://rccl.readthedocs.io](https:
* Multi-node tuning for AllGather, AllReduce, and ReduceScatter that leverages LL/LL64/LL128 protocol to use nontemporal vector load/store for tunable message size ranges.
* LL/LL128 usage ranges for AR, AG, and RS are part of the tuning models, which enable architecture-specific tuning in conjunction with the existing Rome Models scheme in RCCL.
* Two new APIs are exposed as part of an initiative to separate RCCL code. These APIs are `rcclGetAlgoInfo` and `rcclFuncMaxSendRecvCount`. However, user-level invocation requires that RCCL be built with `RCCL_EXPOSE_STATIC` enabled.
* Enabled double-buffering in `reduceCopyPacks` to trigger pipelining, especially to overlap bf16 arithmetic.
* Added `--force-reduce-pipeline` as an option that can be passed to the `install.sh` script. Passing this option will enable software-triggered pipelining `bfloat16` reductions (i.e. `all_reduce`, `reduce_scatter` and `reduce`).
### Changed
+13
查看文件
@@ -38,6 +38,7 @@ option(PROFILE "Enable profiling"
option(TIMETRACE "Enable time-trace during compilation" OFF)
option(TRACE "Enable additional tracing" OFF)
option(FAULT_INJECTION "Enable fault injection" ON)
option(FORCE_REDUCE_PIPELINING "Force reduce pipelining" OFF)
option(DISABLE_CHEAP_THREADFENCE "Compile-time killswitch for simpler fence" OFF)
# Default GPU architectures to build
@@ -799,6 +800,18 @@ foreach(file ${GENERATED_FILES})
list(APPEND HIP_SOURCES ${file})
endforeach()
# Enable SW pipelining where needed
foreach(SOURCE_FILE ${HIP_SOURCES})
# TODO: enable bf16 pipelining by default upon having the pipelined/scalar switching feature
# if (FORCE_REDUCE_PIPELINING AND (SOURCE_FILE MATCHES "gensrc/reduce_.*" OR SOURCE_FILE MATCHES "gensrc/reduce_scatter_.*" OR SOURCE_FILE MATCHES "gensrc/all_reduce_.*"))
# message(STATUS "RCCL_ENABLE_SW_PIPELINE enabled for ${SOURCE_FILE}")
# set_source_files_properties(${SOURCE_FILE} PROPERTIES COMPILE_FLAGS "-DRCCL_ENABLE_SW_PIPELINE")
if(FORCE_REDUCE_PIPELINING AND SOURCE_FILE MATCHES "gensrc/(reduce|reduce_scatter|all_reduce).*_bf16\\.cpp$")
message(STATUS "BF16 pipelining support enabled for ${SOURCE_FILE}")
set_source_files_properties(${SOURCE_FILE} PROPERTIES COMPILE_FLAGS "-DRCCL_ENABLE_SW_PIPELINE")
endif()
endforeach()
# Create an initial git_version.cpp file (that will be updated with latest git version)
#==================================================================================================
file(WRITE ${CMAKE_CURRENT_BINARY_DIR}/git_version.cpp "")
+9 -1
查看文件
@@ -35,6 +35,7 @@ roctx_enabled=true
run_tests=false
run_tests_all=false
time_trace=false
force_reduce_pipeline=false
# #################################################
# helper functions
@@ -71,6 +72,7 @@ function display_help()
echo " -t|--tests_build Build rccl unit tests, but do not run"
echo " --time-trace Plot the build time of RCCL (requires \`ninja-build\` package installed on the system)"
echo " --verbose Show compile commands"
echo " --force-reduce-pipeline Force reduce_copy sw pipeline to be used for every reduce-based collectives and datatypes"
}
# #################################################
@@ -80,7 +82,7 @@ function display_help()
# check if we have a modern version of getopt that can handle whitespace and long parameters
getopt -T
if [[ "$?" -eq 4 ]]; then
GETOPT_PARSE=$(getopt --name "${0}" --options cdfhij:lprt --longoptions address-sanitizer,dependencies,debug,enable-code-coverage,enable_backtrace,disable-colltrace,disable-msccl-kernel,disable-mscclpp,fast,help,install,jobs:,local_gpu_only,amdgpu_targets:,no_clean,npkit-enable,log-trace,openmp-test-enable,roctx-enable,package_build,prefix:,rm-legacy-include-dir,run_tests_all,run_tests_quick,static,tests_build,time-trace,verbose -- "$@")
GETOPT_PARSE=$(getopt --name "${0}" --options cdfhij:lprt --longoptions address-sanitizer,dependencies,debug,enable-code-coverage,enable_backtrace,disable-colltrace,disable-msccl-kernel,disable-mscclpp,fast,help,install,jobs:,local_gpu_only,amdgpu_targets:,no_clean,npkit-enable,log-trace,openmp-test-enable,roctx-enable,package_build,prefix:,rm-legacy-include-dir,run_tests_all,run_tests_quick,static,tests_build,time-trace,force-reduce-pipeline,verbose -- "$@")
else
echo "Need a new version of getopt"
exit 1
@@ -123,6 +125,7 @@ while true; do
-t | --tests_build) build_tests=true; shift ;;
--time-trace) time_trace=true; shift ;;
--verbose) build_verbose=true; shift ;;
--force-reduce-pipeline) force_reduce_pipeline=true; shift ;;
--) shift ; break ;;
*) echo "Unexpected command line parameter received; aborting";
exit 1
@@ -277,6 +280,11 @@ if [[ "${openmp_test_enabled}" == true ]]; then
cmake_common_options="${cmake_common_options} -DOPENMP_TESTS_ENABLED=ON"
fi
# Force Reduce pipeline
if [[ "${force_reduce_pipeline}" == true ]]; then
cmake_common_options="${cmake_common_options} -DFORCE_REDUCE_PIPELINING=ON"
fi
# Enable NPKit
if [[ "${npkit_enabled}" == true ]]; then
cmake_common_options="${cmake_common_options} -DENABLE_NPKIT=ON"
+213
查看文件
@@ -27,6 +27,7 @@ inline __device__ int loadInt(int* ptr) {
return v;
}
#ifndef RCCL_ENABLE_SW_PIPELINE
template<typename RedFn, typename T, int Unroll, int BytePerPack,
int MultimemSrcs, int MinSrcs, int MaxSrcs,
int MultimemDsts, int MinDsts, int MaxDsts, int PreOpSrcs,
@@ -210,6 +211,218 @@ __device__ __attribute__((noinline)) void reduceCopyPacks(
warp = -nHunksAhead;
thread = warp*WARP_SIZE + lane;
}
#else
template <typename RedFn, typename SrcPtrFn, typename IntBytes, int MultimemSrcs, int MinSrcs, int MaxSrcs, int PreOpSrcs, int Unroll, int BytePerPack>
__device__ __forceinline__ void loadSources(
const RedFn& redFn,
const SrcPtrFn& srcPtrFn,
IntBytes& globalOffset,
uintptr_t* minSrcs,
uint64_t *preOpArgs,
BytePack<BytePerPack> buff[MaxSrcs + !MaxSrcs][Unroll],
int nSrcs
) {
#pragma unroll Unroll
for (int s = 0; s < MinSrcs; s++) {
RedFn preFn(s < PreOpSrcs ? preOpArgs[s] : 0);
#pragma unroll Unroll
for (int u = 0; u < Unroll; u++) {
if (s < MultimemSrcs) {
buff[s][u] = applyLoadMultimem<RedFn, BytePerPack>(redFn, minSrcs[s]);
} else {
buff[s][u] = ld_volatile_global<BytePerPack>(minSrcs[s]);
}
minSrcs[s] += WARP_SIZE * BytePerPack;
}
}
for (int s = MinSrcs; (MinSrcs < MaxSrcs) && (s < MaxSrcs) && (s < nSrcs); s++) {
uintptr_t src = cvta_to_global(srcPtrFn(s)) + globalOffset;
#pragma unroll Unroll
for (int u = 0; u < Unroll; u++) {
buff[s][u] = ld_volatile_global<BytePerPack>(src);
src += WARP_SIZE * BytePerPack;
}
}
}
template <typename RedFn, typename DstPtrFn, typename IntBytes, int MultimemDsts, int MinSrcs, int MaxSrcs, int MinDsts, int MaxDsts, int PreOpSrcs, int Unroll, int BytePerPack>
__device__ __forceinline__ void reduceAndStore(
RedFn redFn, uint64_t *preOpArgs, BytePack<BytePerPack> buff[MaxSrcs + !MaxSrcs][Unroll],
uintptr_t *minDsts, bool postOp, int nDsts, DstPtrFn const &dstPtrFn, IntBytes tailThreadBytesBehind, int nSrcs) {
for (int s = 0; s < MinSrcs; s++) {
RedFn preFn(s < PreOpSrcs ? preOpArgs[s] : 0);
#pragma unroll Unroll
for (int u = 0; u < Unroll; u++) {
if (s < PreOpSrcs) buff[s][u] = applyPreOp(preFn, buff[s][u]);
if (s > 0) buff[0][u] = applyReduce(redFn, buff[0][u], buff[s][u]);
}
}
for (int s = MinSrcs; (MinSrcs < MaxSrcs) && (s < MaxSrcs) && (s < nSrcs); s++) {
RedFn preFn(s < PreOpSrcs ? preOpArgs[s] : 0);
#pragma unroll Unroll
for (int u = 0; u < Unroll; u++) {
if (s < PreOpSrcs) buff[s][u] = applyPreOp(preFn, buff[s][u]);
buff[0][u] = applyReduce(redFn, buff[0][u], buff[s][u]);
}
}
if (postOp) {
#pragma unroll Unroll
for (int u = 0; u < Unroll; u++)
buff[0][u] = applyPostOp(redFn, buff[0][u]);
}
#pragma unroll Unroll
for (int d = 0; d < MinDsts; d++) {
#pragma unroll Unroll
for (int u = 0; u < Unroll; u++) {
if (d < MultimemDsts) {
multimem_st_global(minDsts[d], buff[0][u]);
} else {
st_global<BytePerPack>(minDsts[d], buff[0][u]);
}
minDsts[d] += WARP_SIZE * BytePerPack;
}
}
for (int d = MinDsts; (MinDsts < MaxDsts) && (d < MaxDsts) && (d < nDsts); d++) {
uintptr_t dstPtr = cvta_to_global(dstPtrFn(d));
uintptr_t dst = dstPtr + tailThreadBytesBehind;
#pragma unroll Unroll
for (int u = 0; u < Unroll; u++) {
st_global<BytePerPack>(dst, buff[0][u]);
dst += WARP_SIZE * BytePerPack;
}
}
}
template<typename RedFn, typename T, int Unroll, int BytePerPack,
int MultimemSrcs, int MinSrcs, int MaxSrcs,
int MultimemDsts, int MinDsts, int MaxDsts, int PreOpSrcs,
typename IntBytes, typename SrcPtrFn, typename DstPtrFn>
__device__ __forceinline__ void reduceCopyPacks(
int nThreads, int &thread,
uint64_t redArg, uint64_t *preOpArgs, bool postOp,
int nSrcs, SrcPtrFn const &srcPtrFn, int nDsts, DstPtrFn const &dstPtrFn,
IntBytes &nBytesBehind, IntBytes &nBytesAhead
) {
static_assert(std::is_signed<IntBytes>::value, "IntBytes must be a signed integral type.");
static_assert(MinSrcs <= MaxSrcs, "MinSrcs must be less than or equal to MaxSrcs.");
//if (BytePerPack == 0) __trap();
// A hunk is the amount of contiguous data a warp consumes per loop iteration
// assuming all threads partake.
constexpr int BytePerHunk = Unroll*WARP_SIZE*BytePerPack;
int nWarps = nThreads/WARP_SIZE;
int warp = thread/WARP_SIZE;
int lane = thread%WARP_SIZE;
// This thread's initial position.
IntBytes threadBytesBehind = nBytesBehind + (warp*BytePerHunk + lane*BytePerPack);
IntBytes threadBytesAhead = nBytesAhead - (warp*BytePerHunk + lane*BytePerPack);
// Number of hunks to be consumed over all warps.
IntBytes nHunksAhead = nBytesAhead/(BytePerHunk + !BytePerHunk);
// Advance collective position.
nBytesBehind += nHunksAhead*BytePerHunk;
nBytesAhead -= nHunksAhead*BytePerHunk;
if (Unroll==1 && BytePerPack <= nBytesAhead) {
// Only Unroll=1 can do partial hunks (where not all threads partake).
nHunksAhead += 1;
nBytesBehind += nBytesAhead - (nBytesAhead%(BytePerPack + !BytePerPack));
nBytesAhead = nBytesAhead%(BytePerPack + !BytePerPack);
}
nHunksAhead -= warp;
RedFn redFn(redArg);
uintptr_t minSrcs[MinSrcs + !MinSrcs];
uintptr_t minDsts[MinDsts + !MinDsts];
#pragma unroll
for (int s=0; s < MinSrcs; s++) {
minSrcs[s] = cvta_to_global(srcPtrFn(s)) + threadBytesBehind;
}
#pragma unroll
for (int d=0; d < MinDsts; d++) {
// Yes, for some template arguments this code will be unreachable. That's fine.
// coverity[dead_error_line]
minDsts[d] = cvta_to_global(dstPtrFn(d)) + threadBytesBehind;
}
BytePack<BytePerPack> acc1[MaxSrcs + !MaxSrcs][Unroll];
BytePack<BytePerPack> acc2[MaxSrcs + !MaxSrcs][Unroll];
bool tailProcess = false;
IntBytes tailThreadBytesBehind;
// We dictate loop termination condition according to whether partial hunks
// can be handled or not.
while (Unroll==1 ? (BytePerPack <= threadBytesAhead) : (0 < nHunksAhead)) {
// load sources into acc1
loadSources<RedFn, SrcPtrFn, IntBytes, MultimemSrcs, MinSrcs, MaxSrcs, PreOpSrcs, Unroll, BytePerPack>(
redFn, srcPtrFn, threadBytesBehind, minSrcs, preOpArgs, acc1, nSrcs
);
if(tailProcess) {
reduceAndStore<RedFn, DstPtrFn, IntBytes, MultimemDsts, MinSrcs, MaxSrcs, MinDsts, MaxDsts, PreOpSrcs, Unroll, BytePerPack>(
redFn, preOpArgs, acc2, minDsts, postOp, nDsts, dstPtrFn, tailThreadBytesBehind, nSrcs
);
#pragma unroll
for (int d=0; d < MinDsts; d++) {
minDsts[d] += (nWarps-1)*BytePerHunk;
}
}
#pragma unroll
for (int s=0; s < MinSrcs; s++) {
minSrcs[s] += (nWarps-1)*BytePerHunk;
}
threadBytesAhead -= nWarps*BytePerHunk;
nHunksAhead -= nWarps;
tailProcess = Unroll==1 ? (BytePerPack <= threadBytesAhead) : (0 < nHunksAhead);
tailThreadBytesBehind = threadBytesBehind;
threadBytesBehind += nWarps*BytePerHunk;
if(tailProcess) {
loadSources<RedFn, SrcPtrFn, IntBytes, MultimemSrcs, MinSrcs, MaxSrcs, PreOpSrcs, Unroll, BytePerPack>(
redFn, srcPtrFn, threadBytesBehind, minSrcs, preOpArgs, acc2, nSrcs
);
}
reduceAndStore<RedFn, DstPtrFn, IntBytes, MultimemDsts, MinSrcs, MaxSrcs, MinDsts, MaxDsts, PreOpSrcs, Unroll, BytePerPack>(
redFn, preOpArgs, acc1, minDsts, postOp, nDsts, dstPtrFn, tailThreadBytesBehind, nSrcs
);
if(tailProcess) {
#pragma unroll
for (int d=0; d < MinDsts; d++) {
minDsts[d] += (nWarps-1)*BytePerHunk;
}
#pragma unroll
for (int s=0; s < MinSrcs; s++) {
minSrcs[s] += (nWarps-1)*BytePerHunk;
}
tailThreadBytesBehind = threadBytesBehind;
threadBytesBehind += nWarps*BytePerHunk;
threadBytesAhead -= nWarps*BytePerHunk;
nHunksAhead -= nWarps;
}
}
if(tailProcess) {
reduceAndStore<RedFn, DstPtrFn, IntBytes, MultimemDsts, MinSrcs, MaxSrcs, MinDsts, MaxDsts, PreOpSrcs, Unroll, BytePerPack>(
redFn, preOpArgs, acc2, minDsts, postOp, nDsts, dstPtrFn, tailThreadBytesBehind, nSrcs
);
}
nWarps = nThreads/WARP_SIZE;
warp = thread/WARP_SIZE;
lane = thread%WARP_SIZE;
// The last loop iteration could have been partial, i.e. not taken by all
// threads. The threads that weren't included need an extra subtraction to
// make the value warp uniform.
if (Unroll==1 && nHunksAhead > 0) nHunksAhead -= nWarps;
// Rotate warps so the warp which got the least work here will be warp 0.
// This effectively assigns: warp = (warp-nHunks+nWarps)%nWarps;
warp = -nHunksAhead;
thread = warp*WARP_SIZE + lane;
}
#endif
template<typename RedFn, typename T, int Unroll, int BytePerPack,
int MultimemSrcs, int MinSrcs, int MaxSrcs,