From e86b217182ca1c1f4f1836cc7ff1cdaa4df461a3 Mon Sep 17 00:00:00 2001 From: Wenkai Du <43822138+wenkaidu@users.noreply.github.com> Date: Thu, 20 Mar 2025 16:11:43 -0700 Subject: [PATCH] Add fault injection of starting warps with random variations (#1593) * Add fault injection of starting warps with random variations This is done by inserting random delays after __syncthreads(). The feature can be turned off by FAULT_INJECTION=OFF in cmake. * Remove manually introduced bug for demo purposes * Use only one thread per warp for checking wall clock [ROCm/rccl commit: 90ad586d9415a460e952c0dec4441cc485330294] --- projects/rccl/CMakeLists.txt | 33 +++++++++++++----- projects/rccl/cmake/scripts/add_faults.sh | 27 +++++++++++++++ projects/rccl/src/device/common.h | 42 ++++++++++++++++++++--- projects/rccl/src/include/comm.h | 4 +++ projects/rccl/src/include/device.h | 14 ++++++-- projects/rccl/src/init.cc | 26 +++++++++++--- 6 files changed, 126 insertions(+), 20 deletions(-) create mode 100755 projects/rccl/cmake/scripts/add_faults.sh diff --git a/projects/rccl/CMakeLists.txt b/projects/rccl/CMakeLists.txt index fd32754cef..177012a21d 100644 --- a/projects/rccl/CMakeLists.txt +++ b/projects/rccl/CMakeLists.txt @@ -33,6 +33,7 @@ option(ROCTX "Enable ROCTX" option(PROFILE "Enable profiling" OFF) option(TIMETRACE "Enable time-trace during compilation" OFF) option(TRACE "Enable additional tracing" OFF) +option(FAULT_INJECTION "Enable fault injection" ON) # Default GPU architectures to build #================================================================================================== @@ -651,14 +652,26 @@ foreach(SRC_FILE ${SRC_FILES}) list(APPEND HIP_SOURCES ${HIP_FILE}) # Create a custom command to create hipified source code - add_custom_command( - OUTPUT ${HIP_FILE} - COMMAND mkdir -p ${HIP_FILE_DIR} - && ${hipify-perl_executable} -quiet-warnings ${CMAKE_SOURCE_DIR}/${SRC_FILE} -o ${HIP_FILE} - && ${CMAKE_COMMAND} -E env bash 
${CMAKE_CURRENT_SOURCE_DIR}/cmake/scripts/add_unroll.sh ${HIP_FILE} - MAIN_DEPENDENCY ${SRC_FILE} - COMMENT "Hipifying ${SRC_FILE} -> ${HIP_FILE}" - ) + if (FAULT_INJECTION) + add_custom_command( + OUTPUT ${HIP_FILE} + COMMAND mkdir -p ${HIP_FILE_DIR} + && ${hipify-perl_executable} -quiet-warnings ${CMAKE_SOURCE_DIR}/${SRC_FILE} -o ${HIP_FILE} + && ${CMAKE_COMMAND} -E env bash ${CMAKE_CURRENT_SOURCE_DIR}/cmake/scripts/add_unroll.sh ${HIP_FILE} + && ${CMAKE_COMMAND} -E env bash ${CMAKE_CURRENT_SOURCE_DIR}/cmake/scripts/add_faults.sh ${HIP_FILE} + MAIN_DEPENDENCY ${SRC_FILE} + COMMENT "Hipifying ${SRC_FILE} -> ${HIP_FILE}" + ) + else() + add_custom_command( + OUTPUT ${HIP_FILE} + COMMAND mkdir -p ${HIP_FILE_DIR} + && ${hipify-perl_executable} -quiet-warnings ${CMAKE_SOURCE_DIR}/${SRC_FILE} -o ${HIP_FILE} + && ${CMAKE_COMMAND} -E env bash ${CMAKE_CURRENT_SOURCE_DIR}/cmake/scripts/add_unroll.sh ${HIP_FILE} + MAIN_DEPENDENCY ${SRC_FILE} + COMMENT "Hipifying ${SRC_FILE} -> ${HIP_FILE}" + ) + endif() endforeach() # Generate device/host tables and all the collective functions that are going to be in librccl.so @@ -821,6 +834,10 @@ endif() if(TIMETRACE) target_compile_options(rccl PRIVATE -ftime-trace) endif() +if (FAULT_INJECTION) + target_compile_definitions(rccl PRIVATE ENABLE_FAULT_INJECTION) + message(STATUS "Fault injection enabled") +endif() ## Set RCCL linked library directories target_link_directories(rccl PRIVATE ${ROCM_SMI_LIB_DIR}) diff --git a/projects/rccl/cmake/scripts/add_faults.sh b/projects/rccl/cmake/scripts/add_faults.sh new file mode 100755 index 0000000000..5d6c59fa83 --- /dev/null +++ b/projects/rccl/cmake/scripts/add_faults.sh @@ -0,0 +1,27 @@ +# Copyright (c) 2024 Advanced Micro Devices, Inc. All rights reserved. 
+# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. 
+ +HIP_FILE=$1 + +if [[ "$HIP_FILE" =~ .*/src/device/.*\.h ]]; then + sed -i "s/__syncthreads()/__syncthreads(); insert_random_delay_per_warp()/" "$HIP_FILE" + + echo "Added fault injection to $HIP_FILE" +fi \ No newline at end of file diff --git a/projects/rccl/src/device/common.h b/projects/rccl/src/device/common.h index 0eb3ea9915..01c529c437 100644 --- a/projects/rccl/src/device/common.h +++ b/projects/rccl/src/device/common.h @@ -44,15 +44,15 @@ // TODO: switch to atomicInc after llvm crash is fixed // uint32_t pos = atomicInc(&ncclShmem.collTraceTail->tail, COLLTRACE_NUM_ITEMS) - #define traceKernelLaunch(launch_type) { \ + #define traceKernelLaunch(launch_type, ix) { \ INC_COLL_TRACE \ collTrace->funcIndex = ncclShmem.funcId; \ __trace_hwreg()\ + collTrace->batchIx = ix; \ if (ncclShmem.workType == ncclDevWorkTypeP2p) { \ struct ncclDevWorkP2p *p2pWork = (struct ncclDevWorkP2p*)ncclShmem.workStorage; \ collTrace->p2p.sendRank = p2pWork->sendRank; \ collTrace->p2p.recvRank = p2pWork->recvRank; \ - collTrace->p2p.nP2pChannels = p2pWork->nP2pChannels; \ collTrace->p2p.nSendChannels = p2pWork->nSendChannels; \ collTrace->p2p.nRecvChannels = p2pWork->nRecvChannels; \ collTrace->p2p.channelBase = p2pWork->channelBase; \ @@ -60,6 +60,8 @@ collTrace->p2p.recvConnIndex = p2pWork->recvConnIndex; \ collTrace->p2p.sendProtoLL = p2pWork->sendProtoLL; \ collTrace->p2p.recvProtoLL = p2pWork->recvProtoLL; \ + collTrace->p2p.sendRegistered = p2pWork->sendRegistered; \ + collTrace->p2p.recvRegistered = p2pWork->recvRegistered; \ collTrace->p2pOpCount[0] = p2pWork->sendOpCount; \ collTrace->p2pOpCount[1] = p2pWork->recvOpCount; \ collTrace->type = (launch_type) | ncclCollTraceP2pElemType; \ @@ -95,7 +97,7 @@ collTrace->type = ncclCollTraceDataType; \ } #else -#define traceKernelLaunch(launch_type) +#define traceKernelLaunch(launch_type, batchIx) #define traceKernelEnd(end_type) #define traceData(data2, data4, data8_0, data8_1) #endif @@ -151,6 +153,9 @@ struct ncclShmemData { 
#ifdef ENABLE_PROFILING struct ncclProf prof; #endif +#ifdef ENABLE_FAULT_INJECTION + uint64_t faults; +#endif }; extern __shared__ ncclShmemData ncclShmem; @@ -160,6 +165,28 @@ extern __shared__ ncclShmemData ncclShmem; extern __shared__ ulong2 ncclShmemPerWarp[ncclShmemScratchWarpSize()*(NCCL_MAX_NTHREADS/WARP_SIZE)/sizeof(ulong2)]; #endif +#ifdef ENABLE_FAULT_INJECTION +__device__ inline void insert_random_delay_per_warp() { + if ((ncclShmem.faults & RANDOM_DELAY_ON_WARP_START) && (threadIdx.x%WARP_SIZE == 0)) { + switch ((wall_clock64()>>(threadIdx.x/WARP_SIZE*2))&0x3) { + case 0: + __builtin_amdgcn_s_sleep(0); + break; + case 1: + __builtin_amdgcn_s_sleep(8); + break; + case 2: + __builtin_amdgcn_s_sleep(16); + break; + case 3: + default: + __builtin_amdgcn_s_sleep(32); + break; + } + } +} +#endif + __device__ inline void* ncclScratchForWarp(int warp) { return (char*)ncclShmemPerWarp + warp*ncclShmemScratchWarpSize(); } @@ -468,6 +495,10 @@ __device__ __forceinline__ void ncclKernelMain(struct ncclDevKernelArgs const* a ncclShmem.groups[tid-WARP_SIZE].barrier = 0; break; case 2: +#ifdef ENABLE_FAULT_INJECTION + /* load faults injection before first sync threads */ + if (tid == 2*WARP_SIZE) ncclShmem.faults = args->comm->faults; +#endif break; case 3: /* set abort flag to 0 */ @@ -508,6 +539,7 @@ __device__ __forceinline__ void ncclKernelMain(struct ncclDevKernelArgs const* a } #endif __syncthreads(); // publish shmem + #ifdef ENABLE_PROFILING if (tid == 0) { ncclShmem.prof.count = 0; @@ -515,7 +547,7 @@ __device__ __forceinline__ void ncclKernelMain(struct ncclDevKernelArgs const* a } #endif if (tid == 0) __insert_timestamp(__LINE__); - if (COLLTRACE && tid%WARP_SIZE == 0) traceKernelLaunch(ncclCollTraceKernelLaunchType); + if (COLLTRACE && tid%WARP_SIZE == 0) traceKernelLaunch(ncclCollTraceKernelLaunchType, 0); if (tid == 0 && ncclShmem.args.workStorageType == ncclDevWorkStorageTypeFifo) { // ncclShmem.workConsumed written by loadWorkBatchToShmem before 
__syncthreads() @@ -565,7 +597,7 @@ __device__ __forceinline__ void ncclKernelMain(struct ncclDevKernelArgs const* a } if (aborted) break; - if (COLLTRACE && tid%WARP_SIZE == 0) traceKernelLaunch(ncclCollTraceCollLaunchType); + if (COLLTRACE && tid%WARP_SIZE == 0) traceKernelLaunch(ncclCollTraceCollLaunchType, batchIx); } if (COLLTRACE && tid%WARP_SIZE == 0) traceKernelEnd(ncclCollTraceKernelEndType); diff --git a/projects/rccl/src/include/comm.h b/projects/rccl/src/include/comm.h index db4df7c0cb..ad699a9a73 100644 --- a/projects/rccl/src/include/comm.h +++ b/projects/rccl/src/include/comm.h @@ -585,6 +585,10 @@ struct ncclComm { bool collTraceEnabled; #endif +#ifdef ENABLE_FAULT_INJECTION + uint64_t faults; +#endif + ncclConfig_t config; // initState is to more conveniently reclaim resources when errors happen. ncclResult_t initState; diff --git a/projects/rccl/src/include/device.h b/projects/rccl/src/include/device.h index 9c652c43cf..13c568e1d9 100644 --- a/projects/rccl/src/include/device.h +++ b/projects/rccl/src/include/device.h @@ -433,7 +433,8 @@ struct ncclCollTrace { uint8_t type; uint8_t bid; int16_t funcIndex; - uint32_t data_0:24; + uint16_t data_0; + uint8_t batchIx; uint8_t tid; uint8_t channelId; uint64_t timeStamp:56; @@ -452,7 +453,6 @@ struct ncclCollTrace { struct { uint8_t sendRank; uint8_t recvRank; - uint8_t nP2pChannels; uint8_t nSendChannels; uint8_t nRecvChannels; uint8_t channelBase; @@ -460,6 +460,8 @@ struct ncclCollTrace { uint8_t recvConnIndex:2; uint8_t sendProtoLL:1; uint8_t recvProtoLL:1; + uint8_t sendRegistered:1; + uint8_t recvRegistered:1; } p2p; }; }; @@ -518,8 +520,16 @@ struct ncclDevComm { #ifdef ENABLE_PROFILING struct ncclProf* devProf; #endif + +#ifdef ENABLE_FAULT_INJECTION + uint64_t faults; +#endif }; +#ifdef ENABLE_FAULT_INJECTION +#define RANDOM_DELAY_ON_WARP_START 0x1L +#endif + struct alignas(16) ncclDevCommAndChannels { struct ncclDevComm comm; struct ncclDevChannel channels[MAXCHANNELS]; diff --git 
a/projects/rccl/src/init.cc b/projects/rccl/src/init.cc index d7facdc439..d0427f187d 100644 --- a/projects/rccl/src/init.cc +++ b/projects/rccl/src/init.cc @@ -266,8 +266,8 @@ void *ncclCommThreadMain(void *arg) { if (type == ncclCollTraceCollElemType) { sprintf(line+offset, " CE %s nw %d bi %d nc %d root %d busId %lx nRanks %d", funcNames[fIdx], td->coll.nWarps, td->coll.bid, td->coll.nChannels, td->coll.root, comm->busId, comm->nRanks); } else if (type == ncclCollTraceP2pElemType) { - sprintf(line+offset, " Send %d -> %d/%d connIdx/LL %d/%d -> Recv %d nc %d cb %d busId %lx nRanks %d", - td->p2p.sendRank, td->p2p.sendConnIndex, td->p2p.sendProtoLL, td->p2p.recvConnIndex, td->p2p.recvProtoLL, td->p2p.recvRank, td->p2p.nP2pChannels, td->p2p.channelBase, + sprintf(line+offset, " Recv %d -> %d/%d/%d/%d ConnIdx/LL/Reg/nc %d/%d/%d/%d -> Send %d cb %d busId %lx nRanks %d", + td->p2p.recvRank, td->p2p.recvConnIndex, td->p2p.recvProtoLL, td->p2p.recvRegistered, td->p2p.nRecvChannels, td->p2p.sendConnIndex, td->p2p.sendProtoLL, td->p2p.sendRegistered, td->p2p.nSendChannels, td->p2p.sendRank, td->p2p.channelBase, comm->busId, comm->nRanks); } else { switch (type&0xf) { @@ -276,13 +276,13 @@ void *ncclCommThreadMain(void *arg) { if ((type&0xf) == ncclCollTraceKernelLaunchType) sprintf(line+offset, " KL HWID %8x %s", td->data_0, funcNames[fIdx]); else if ((type&0xf) == ncclCollTraceCollLaunchType) - sprintf(line+offset, " CL %s", funcNames[fIdx]); + sprintf(line+offset, " CL %d %s", td->batchIx, funcNames[fIdx]); offset = strlen(line); if ((type&0xf0) == ncclCollTraceCollElemType) sprintf(line+offset, " nw %d bi %d nc %d root %d busId %lx nRanks %d", td->coll.nWarps, td->coll.bid, td->coll.nChannels, td->coll.root, comm->busId, comm->nRanks); else if ((type&0xf0) == ncclCollTraceP2pElemType) - sprintf(line+offset, " Send %d -> %d/%d ConnIdx/LL %d/%d -> Recv %d nc %d cb %d busId %lx nRanks %d", - td->p2p.sendRank, td->p2p.sendConnIndex, td->p2p.sendProtoLL, 
td->p2p.recvConnIndex, td->p2p.recvProtoLL, td->p2p.recvRank, td->p2p.nP2pChannels, td->p2p.channelBase, + sprintf(line+offset, " Recv %d -> %d/%d/%d/%d ConnIdx/LL/Reg/nc %d/%d/%d/%d -> Send %d cb %d busId %lx nRanks %d", + td->p2p.recvRank, td->p2p.recvConnIndex, td->p2p.recvProtoLL, td->p2p.recvRegistered, td->p2p.nRecvChannels, td->p2p.sendConnIndex, td->p2p.sendProtoLL, td->p2p.sendRegistered, td->p2p.nSendChannels, td->p2p.sendRank, td->p2p.channelBase, comm->busId, comm->nRanks); break; case ncclCollTraceKernelEndType: @@ -544,6 +544,8 @@ exit: return ret; } +RCCL_PARAM(InjectFaults, "INJECT_FAULTS", 0); + static ncclResult_t commAlloc(struct ncclComm* comm, struct ncclComm* parent, int ndev, int rank) { if (ndev < 1) { WARN("invalid device count (%d) requested", ndev); @@ -608,6 +610,16 @@ static ncclResult_t commAlloc(struct ncclComm* comm, struct ncclComm* parent, in comm->collTraceThread = 0; } #endif + + if (rcclParamInjectFaults() != 0) { +#ifdef ENABLE_FAULT_INJECTION + comm->faults = rcclParamInjectFaults(); + if (comm->rank == 0) INFO(NCCL_INIT, "Enabled RCCL faults injection with value 0x%lx", comm->faults); +#else + WARN("Ignore faults injection of value 0x%lx as RCCL is not compiled to support it", rcclParamInjectFaults()); +#endif + } + comm->collNetSupport = 0; memset(comm->collNetSupportMatrix, 0, sizeof(comm->collNetSupportMatrix)); @@ -763,6 +775,10 @@ static ncclResult_t devCommSetup(ncclComm_t comm) { NCCLCHECK(ncclCudaCalloc(&tmpCommAndChans.comm.devProf, MAXCHANNELS*PROFILE_NUM_LAUNCHES, comm->sideStream)); #endif +#ifdef ENABLE_FAULT_INJECTION + tmpCommAndChans.comm.faults = comm->faults; +#endif + NCCLCHECKGOTO(ncclCudaMemcpyAsync(devCommAndChans, &tmpCommAndChans, 1, comm->sharedRes->deviceStream.cudaStream), ret, fail); exit: NCCLCHECK(ncclStrongStreamSynchronize(&comm->sharedRes->deviceStream));