Add fault injection of starting warps with random variations (#1593)

* Add fault injection of starting warps with random variations

This is done by inserting randomly delays after __syncthreads().
The feature can be turned off by FAULT_INJECTION=OFF in cmake.

* Remove manually introduced bug for demo purpose

* Use only one thread per warp for checking wall clock

[ROCm/rccl commit: 90ad586d94]
This commit is contained in:
Wenkai Du
2025-03-20 16:11:43 -07:00
committad av GitHub
förälder 12c1fe8fdf
incheckning e86b217182
6 ändrade filer med 126 tillägg och 20 borttagningar
+25 -8
Visa fil
@@ -33,6 +33,7 @@ option(ROCTX "Enable ROCTX"
option(PROFILE "Enable profiling" OFF)
option(TIMETRACE "Enable time-trace during compilation" OFF)
option(TRACE "Enable additional tracing" OFF)
option(FAULT_INJECTION "Enable fault injection" ON)
# Default GPU architectures to build
#==================================================================================================
@@ -651,14 +652,26 @@ foreach(SRC_FILE ${SRC_FILES})
list(APPEND HIP_SOURCES ${HIP_FILE})
# Create a custom command to create hipified source code
add_custom_command(
OUTPUT ${HIP_FILE}
COMMAND mkdir -p ${HIP_FILE_DIR}
&& ${hipify-perl_executable} -quiet-warnings ${CMAKE_SOURCE_DIR}/${SRC_FILE} -o ${HIP_FILE}
&& ${CMAKE_COMMAND} -E env bash ${CMAKE_CURRENT_SOURCE_DIR}/cmake/scripts/add_unroll.sh ${HIP_FILE}
MAIN_DEPENDENCY ${SRC_FILE}
COMMENT "Hipifying ${SRC_FILE} -> ${HIP_FILE}"
)
if (FAULT_INJECTION)
add_custom_command(
OUTPUT ${HIP_FILE}
COMMAND mkdir -p ${HIP_FILE_DIR}
&& ${hipify-perl_executable} -quiet-warnings ${CMAKE_SOURCE_DIR}/${SRC_FILE} -o ${HIP_FILE}
&& ${CMAKE_COMMAND} -E env bash ${CMAKE_CURRENT_SOURCE_DIR}/cmake/scripts/add_unroll.sh ${HIP_FILE}
&& ${CMAKE_COMMAND} -E env bash ${CMAKE_CURRENT_SOURCE_DIR}/cmake/scripts/add_faults.sh ${HIP_FILE}
MAIN_DEPENDENCY ${SRC_FILE}
COMMENT "Hipifying ${SRC_FILE} -> ${HIP_FILE}"
)
else()
add_custom_command(
OUTPUT ${HIP_FILE}
COMMAND mkdir -p ${HIP_FILE_DIR}
&& ${hipify-perl_executable} -quiet-warnings ${CMAKE_SOURCE_DIR}/${SRC_FILE} -o ${HIP_FILE}
&& ${CMAKE_COMMAND} -E env bash ${CMAKE_CURRENT_SOURCE_DIR}/cmake/scripts/add_unroll.sh ${HIP_FILE}
MAIN_DEPENDENCY ${SRC_FILE}
COMMENT "Hipifying ${SRC_FILE} -> ${HIP_FILE}"
)
endif()
endforeach()
# Generate device/host tables and all the collective functions that are going to be in librccl.so
@@ -821,6 +834,10 @@ endif()
if(TIMETRACE)
target_compile_options(rccl PRIVATE -ftime-trace)
endif()
if (FAULT_INJECTION)
target_compile_definitions(rccl PRIVATE ENABLE_FAULT_INJECTION)
message(STATUS "Fault injection enabled")
endif()
## Set RCCL linked library directories
target_link_directories(rccl PRIVATE ${ROCM_SMI_LIB_DIR})
+27
Visa fil
@@ -0,0 +1,27 @@
# Copyright (c) 2024 Advanced Micro Devices, Inc. All rights reserved.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
HIP_FILE=$1
if [[ "$HIP_FILE" =~ .*/src/device/.*\.h ]]; then
sed -i "s/__syncthreads()/__syncthreads(); insert_random_delay_per_warp()/" "$HIP_FILE"
echo "Added fault injection to $HIP_FILE"
fi
+37 -5
Visa fil
@@ -44,15 +44,15 @@
// TODO: switch to atomicInc after llvm crash is fixed
// uint32_t pos = atomicInc(&ncclShmem.collTraceTail->tail, COLLTRACE_NUM_ITEMS)
#define traceKernelLaunch(launch_type) { \
#define traceKernelLaunch(launch_type, ix) { \
INC_COLL_TRACE \
collTrace->funcIndex = ncclShmem.funcId; \
__trace_hwreg()\
collTrace->batchIx = ix; \
if (ncclShmem.workType == ncclDevWorkTypeP2p) { \
struct ncclDevWorkP2p *p2pWork = (struct ncclDevWorkP2p*)ncclShmem.workStorage; \
collTrace->p2p.sendRank = p2pWork->sendRank; \
collTrace->p2p.recvRank = p2pWork->recvRank; \
collTrace->p2p.nP2pChannels = p2pWork->nP2pChannels; \
collTrace->p2p.nSendChannels = p2pWork->nSendChannels; \
collTrace->p2p.nRecvChannels = p2pWork->nRecvChannels; \
collTrace->p2p.channelBase = p2pWork->channelBase; \
@@ -60,6 +60,8 @@
collTrace->p2p.recvConnIndex = p2pWork->recvConnIndex; \
collTrace->p2p.sendProtoLL = p2pWork->sendProtoLL; \
collTrace->p2p.recvProtoLL = p2pWork->recvProtoLL; \
collTrace->p2p.sendRegistered = p2pWork->sendRegistered; \
collTrace->p2p.recvRegistered = p2pWork->recvRegistered; \
collTrace->p2pOpCount[0] = p2pWork->sendOpCount; \
collTrace->p2pOpCount[1] = p2pWork->recvOpCount; \
collTrace->type = (launch_type) | ncclCollTraceP2pElemType; \
@@ -95,7 +97,7 @@
collTrace->type = ncclCollTraceDataType; \
}
#else
#define traceKernelLaunch(launch_type)
#define traceKernelLaunch(launch_type, batchIx)
#define traceKernelEnd(end_type)
#define traceData(data2, data4, data8_0, data8_1)
#endif
@@ -151,6 +153,9 @@ struct ncclShmemData {
#ifdef ENABLE_PROFILING
struct ncclProf prof;
#endif
#ifdef ENABLE_FAULT_INJECTION
uint64_t faults;
#endif
};
extern __shared__ ncclShmemData ncclShmem;
@@ -160,6 +165,28 @@ extern __shared__ ncclShmemData ncclShmem;
extern __shared__ ulong2 ncclShmemPerWarp[ncclShmemScratchWarpSize()*(NCCL_MAX_NTHREADS/WARP_SIZE)/sizeof(ulong2)];
#endif
#ifdef ENABLE_FAULT_INJECTION
__device__ inline void insert_random_delay_per_warp() {
if ((ncclShmem.faults & RANDOM_DELAY_ON_WARP_START) && (threadIdx.x%WARP_SIZE == 0)) {
switch ((wall_clock64()>>(threadIdx.x/WARP_SIZE*2))&0x3) {
case 0:
__builtin_amdgcn_s_sleep(0);
break;
case 1:
__builtin_amdgcn_s_sleep(8);
break;
case 2:
__builtin_amdgcn_s_sleep(16);
break;
case 3:
default:
__builtin_amdgcn_s_sleep(32);
break;
}
}
}
#endif
__device__ inline void* ncclScratchForWarp(int warp) {
return (char*)ncclShmemPerWarp + warp*ncclShmemScratchWarpSize();
}
@@ -468,6 +495,10 @@ __device__ __forceinline__ void ncclKernelMain(struct ncclDevKernelArgs const* a
ncclShmem.groups[tid-WARP_SIZE].barrier = 0;
break;
case 2:
#ifdef ENABLE_FAULT_INJECTION
/* load faults injection before first sync threads */
if (tid == 2*WARP_SIZE) ncclShmem.faults = args->comm->faults;
#endif
break;
case 3:
/* set abort flag to 0 */
@@ -508,6 +539,7 @@ __device__ __forceinline__ void ncclKernelMain(struct ncclDevKernelArgs const* a
}
#endif
__syncthreads(); // publish shmem
#ifdef ENABLE_PROFILING
if (tid == 0) {
ncclShmem.prof.count = 0;
@@ -515,7 +547,7 @@ __device__ __forceinline__ void ncclKernelMain(struct ncclDevKernelArgs const* a
}
#endif
if (tid == 0) __insert_timestamp(__LINE__);
if (COLLTRACE && tid%WARP_SIZE == 0) traceKernelLaunch(ncclCollTraceKernelLaunchType);
if (COLLTRACE && tid%WARP_SIZE == 0) traceKernelLaunch(ncclCollTraceKernelLaunchType, 0);
if (tid == 0 && ncclShmem.args.workStorageType == ncclDevWorkStorageTypeFifo) {
// ncclShmem.workConsumed written by loadWorkBatchToShmem before __syncthreads()
@@ -565,7 +597,7 @@ __device__ __forceinline__ void ncclKernelMain(struct ncclDevKernelArgs const* a
}
if (aborted) break;
if (COLLTRACE && tid%WARP_SIZE == 0) traceKernelLaunch(ncclCollTraceCollLaunchType);
if (COLLTRACE && tid%WARP_SIZE == 0) traceKernelLaunch(ncclCollTraceCollLaunchType, batchIx);
}
if (COLLTRACE && tid%WARP_SIZE == 0) traceKernelEnd(ncclCollTraceKernelEndType);
+4
Visa fil
@@ -585,6 +585,10 @@ struct ncclComm {
bool collTraceEnabled;
#endif
#ifdef ENABLE_FAULT_INJECTION
uint64_t faults;
#endif
ncclConfig_t config;
// initState is to more conveniently reclaim resources when errors happen.
ncclResult_t initState;
+12 -2
Visa fil
@@ -433,7 +433,8 @@ struct ncclCollTrace {
uint8_t type;
uint8_t bid;
int16_t funcIndex;
uint32_t data_0:24;
uint16_t data_0;
uint8_t batchIx;
uint8_t tid;
uint8_t channelId;
uint64_t timeStamp:56;
@@ -452,7 +453,6 @@ struct ncclCollTrace {
struct {
uint8_t sendRank;
uint8_t recvRank;
uint8_t nP2pChannels;
uint8_t nSendChannels;
uint8_t nRecvChannels;
uint8_t channelBase;
@@ -460,6 +460,8 @@ struct ncclCollTrace {
uint8_t recvConnIndex:2;
uint8_t sendProtoLL:1;
uint8_t recvProtoLL:1;
uint8_t sendRegistered:1;
uint8_t recvRegistered:1;
} p2p;
};
};
@@ -518,8 +520,16 @@ struct ncclDevComm {
#ifdef ENABLE_PROFILING
struct ncclProf* devProf;
#endif
#ifdef ENABLE_FAULT_INJECTION
uint64_t faults;
#endif
};
#ifdef ENABLE_FAULT_INJECTION
#define RANDOM_DELAY_ON_WARP_START 0x1L
#endif
struct alignas(16) ncclDevCommAndChannels {
struct ncclDevComm comm;
struct ncclDevChannel channels[MAXCHANNELS];
+21 -5
Visa fil
@@ -266,8 +266,8 @@ void *ncclCommThreadMain(void *arg) {
if (type == ncclCollTraceCollElemType) {
sprintf(line+offset, " CE %s nw %d bi %d nc %d root %d busId %lx nRanks %d", funcNames[fIdx], td->coll.nWarps, td->coll.bid, td->coll.nChannels, td->coll.root, comm->busId, comm->nRanks);
} else if (type == ncclCollTraceP2pElemType) {
sprintf(line+offset, " Send %d -> %d/%d connIdx/LL %d/%d -> Recv %d nc %d cb %d busId %lx nRanks %d",
td->p2p.sendRank, td->p2p.sendConnIndex, td->p2p.sendProtoLL, td->p2p.recvConnIndex, td->p2p.recvProtoLL, td->p2p.recvRank, td->p2p.nP2pChannels, td->p2p.channelBase,
sprintf(line+offset, " Recv %d -> %d/%d/%d/%d ConnIdx/LL/Reg/nc %d/%d/%d/%d -> Send %d cb %d busId %lx nRanks %d",
td->p2p.recvRank, td->p2p.recvConnIndex, td->p2p.recvProtoLL, td->p2p.recvRegistered, td->p2p.nRecvChannels, td->p2p.sendConnIndex, td->p2p.sendProtoLL, td->p2p.sendRegistered, td->p2p.nSendChannels, td->p2p.sendRank, td->p2p.channelBase,
comm->busId, comm->nRanks);
} else {
switch (type&0xf) {
@@ -276,13 +276,13 @@ void *ncclCommThreadMain(void *arg) {
if ((type&0xf) == ncclCollTraceKernelLaunchType)
sprintf(line+offset, " KL HWID %8x %s", td->data_0, funcNames[fIdx]);
else if ((type&0xf) == ncclCollTraceCollLaunchType)
sprintf(line+offset, " CL %s", funcNames[fIdx]);
sprintf(line+offset, " CL %d %s", td->batchIx, funcNames[fIdx]);
offset = strlen(line);
if ((type&0xf0) == ncclCollTraceCollElemType)
sprintf(line+offset, " nw %d bi %d nc %d root %d busId %lx nRanks %d", td->coll.nWarps, td->coll.bid, td->coll.nChannels, td->coll.root, comm->busId, comm->nRanks);
else if ((type&0xf0) == ncclCollTraceP2pElemType)
sprintf(line+offset, " Send %d -> %d/%d ConnIdx/LL %d/%d -> Recv %d nc %d cb %d busId %lx nRanks %d",
td->p2p.sendRank, td->p2p.sendConnIndex, td->p2p.sendProtoLL, td->p2p.recvConnIndex, td->p2p.recvProtoLL, td->p2p.recvRank, td->p2p.nP2pChannels, td->p2p.channelBase,
sprintf(line+offset, " Recv %d -> %d/%d/%d/%d ConnIdx/LL/Reg/nc %d/%d/%d/%d -> Send %d cb %d busId %lx nRanks %d",
td->p2p.recvRank, td->p2p.recvConnIndex, td->p2p.recvProtoLL, td->p2p.recvRegistered, td->p2p.nRecvChannels, td->p2p.sendConnIndex, td->p2p.sendProtoLL, td->p2p.sendRegistered, td->p2p.nSendChannels, td->p2p.sendRank, td->p2p.channelBase,
comm->busId, comm->nRanks);
break;
case ncclCollTraceKernelEndType:
@@ -544,6 +544,8 @@ exit:
return ret;
}
RCCL_PARAM(InjectFaults, "INJECT_FAULTS", 0);
static ncclResult_t commAlloc(struct ncclComm* comm, struct ncclComm* parent, int ndev, int rank) {
if (ndev < 1) {
WARN("invalid device count (%d) requested", ndev);
@@ -608,6 +610,16 @@ static ncclResult_t commAlloc(struct ncclComm* comm, struct ncclComm* parent, in
comm->collTraceThread = 0;
}
#endif
if (rcclParamInjectFaults() != 0) {
#ifdef ENABLE_FAULT_INJECTION
comm->faults = rcclParamInjectFaults();
if (comm->rank == 0) INFO(NCCL_INIT, "Enabled RCCL faults injection with value 0x%lx", comm->faults);
#else
WARN("Ignore faults injection of value 0x%lx as RCCL is not compiled to support it", rcclParamInjectFaults());
#endif
}
comm->collNetSupport = 0;
memset(comm->collNetSupportMatrix, 0, sizeof(comm->collNetSupportMatrix));
@@ -763,6 +775,10 @@ static ncclResult_t devCommSetup(ncclComm_t comm) {
NCCLCHECK(ncclCudaCalloc(&tmpCommAndChans.comm.devProf, MAXCHANNELS*PROFILE_NUM_LAUNCHES, comm->sideStream));
#endif
#ifdef ENABLE_FAULT_INJECTION
tmpCommAndChans.comm.faults = comm->faults;
#endif
NCCLCHECKGOTO(ncclCudaMemcpyAsync(devCommAndChans, &tmpCommAndChans, 1, comm->sharedRes->deviceStream.cudaStream), ret, fail);
exit:
NCCLCHECK(ncclStrongStreamSynchronize(&comm->sharedRes->deviceStream));