From 8d2504d6c15eff20ea904148a11b989b34c0b91d Mon Sep 17 00:00:00 2001 From: Edgar Gabriel Date: Tue, 6 Jan 2026 10:15:38 -0600 Subject: [PATCH] fix reduction test for gfx1201 (#374) * fix reduction for gfx942 and 1201 match the synchronizaation of internal_putmem_wg and internal_getmem_wg to their non-internal counterparts. the internal_putmem_wg is used in the ipc reduction * move specialization to internal_putmem --- scripts/functional_tests/driver.sh | 3 --- src/ipc/context_ipc_device.cpp | 24 ++++++++++++++++++++++++ src/ipc/context_ipc_tmpl_device.hpp | 15 +++------------ 3 files changed, 27 insertions(+), 15 deletions(-) diff --git a/scripts/functional_tests/driver.sh b/scripts/functional_tests/driver.sh index 53aa28b71b..b1f4597b80 100755 --- a/scripts/functional_tests/driver.sh +++ b/scripts/functional_tests/driver.sh @@ -178,7 +178,6 @@ ExecTest() { fi CMD+=" >> $LOG_DIR/$TEST_LOG_NAME.log 2>&1" - # Run Test if [ $NUM_GPUS -ge $NUM_RANKS ] || [[ "" != "$HOSTFILE" ]]; then echo $TEST_LOG_NAME @@ -452,7 +451,6 @@ TestColl() { ExecTest "teambroadcast" 2 1 64 32768 - ExecTest "fcollect" 2 1 64 512 ExecTest "fcollect" 2 1 64 32768 ExecTest "teamreduction" 2 1 64 32768 @@ -641,7 +639,6 @@ TestGDA() { ExecTest "teambroadcast" 2 1 1 32768 - ExecTest "fcollect" 2 1 1 512 ExecTest "fcollect" 2 1 1 32768 # deadlock on gda, size 8KB diff --git a/src/ipc/context_ipc_device.cpp b/src/ipc/context_ipc_device.cpp index bc0ef91d09..3ed4831d70 100644 --- a/src/ipc/context_ipc_device.cpp +++ b/src/ipc/context_ipc_device.cpp @@ -164,7 +164,13 @@ __device__ void IPCContext::internal_putmem(void *dest, const void *source, size_t nelems, int pe) { uint64_t L_offset = reinterpret_cast(dest) - wrk_sync_pool_bases_[my_pe]; memcpy_lane(wrk_sync_pool_bases_[pe] + L_offset, const_cast(source), nelems); +#if defined(__gfx90a__) + __threadfence_system(); +#elif defined (__gfx1201__) || defined (__gfx1100__) + fence(pe); +#else ipcImpl_.ipcFence(); +#endif } __device__ void IPCContext::internal_getmem(void *dest, const void *source, @@ -180,6 +186,15 @@ __device__ void IPCContext::internal_putmem_wg(void *dest, const void *source, uint64_t L_offset = reinterpret_cast(dest) - wrk_sync_pool_bases_[my_pe]; memcpy_wg(wrk_sync_pool_bases_[pe] + L_offset, const_cast(source), nelems); __syncthreads(); +#if defined(__gfx90a__) + __threadfence_system(); +#elif defined (__gfx1201__) || defined (__gfx1100__) + if (is_thread_zero_in_block() ) { + fence(pe); + } +#else + ipcImpl_.ipcFence(); +#endif } __device__ void IPCContext::internal_getmem_wg(void *dest, const void *source, @@ -188,13 +203,22 @@ __device__ void IPCContext::internal_getmem_wg(void *dest, const void *source, uint64_t L_offset = const_cast(src_typed) - wrk_sync_pool_bases_[my_pe]; memcpy_wg(dest, wrk_sync_pool_bases_[pe] + L_offset, nelems); __syncthreads(); + ipcImpl_.ipcFence(); } __device__ void IPCContext::internal_putmem_wave(void *dest, const void *source, size_t nelems, int pe) { uint64_t L_offset = reinterpret_cast(dest) - wrk_sync_pool_bases_[my_pe]; memcpy_wave(wrk_sync_pool_bases_[pe] + L_offset, const_cast(source), nelems); +#if defined(__gfx90a__) + __threadfence_system(); +#elif defined (__gfx1201__) || defined (__gfx1100__) + if (is_thread_zero_in_wave() ) { + fence(pe); + } +#else ipcImpl_.ipcFence(); +#endif } __device__ void IPCContext::internal_getmem_wave(void *dest, diff --git a/src/ipc/context_ipc_tmpl_device.hpp b/src/ipc/context_ipc_tmpl_device.hpp index f4a76a9b7d..36f1c17247 100644 --- a/src/ipc/context_ipc_tmpl_device.hpp +++ b/src/ipc/context_ipc_tmpl_device.hpp @@ -217,7 +217,6 @@ __device__ void IPCContext::internal_direct_allreduce( threadfence_system(); } } - __syncthreads(); for (int i = wg_id; i < num_pes; i += wg_size) { @@ -311,7 +310,7 @@ __device__ void IPCContext::internal_ring_allreduce( for (int seg = 0; seg < n_seg; seg++) { off_seg = seg * seg_size; - // Loop 2 in the algorithm above + // Loop 1 in the algorithm above for (int iter = 0; iter < PE_size - 1; iter++) { off_send = (((my_pe_in_team + 1 - iter + 2 * PE_size) % PE_size) * chunk_size); off_recv = (((my_pe_in_team - iter + 2 * PE_size) % PE_size) * chunk_size); @@ -322,12 +321,8 @@ __device__ void IPCContext::internal_ring_allreduce( if (is_thread_zero_in_block()) { fence(); - wait_val = seg + 100; internal_putmem(&pSync[iter], &wait_val, sizeof(*pSync), send_pe); -#if defined(__gfx90a__) - __threadfence_system(); -#endif /* __gfx90a__ */ wait_until(&pSync[iter], ROCSHMEM_CMP_EQ, wait_val); } __syncthreads(); @@ -344,21 +339,18 @@ __device__ void IPCContext::internal_ring_allreduce( if (is_thread_zero_in_block()) { fence(); - wait_val = seg + 100; + wait_val = seg + 10; internal_putmem(&pSync[iter], &wait_val, sizeof(*pSync), send_pe); -#if defined(__gfx90a__) - __threadfence_system(); -#endif /* __gfx90a__ */ wait_until(&pSync[iter], ROCSHMEM_CMP_EQ, wait_val); } __syncthreads(); } } - __syncthreads(); for (int i = wg_id; i < 2 * num_pes - 2; i += wg_size) { pSync[i] = ROCSHMEM_SYNC_VALUE; } + threadfence_system(); __syncthreads(); } @@ -400,7 +392,6 @@ __device__ int IPCContext::reduce(rocshmem_team_t team, T *dest, const T *p_src = (source + (n_seg * seg_size)); int p_count = nreduce - (n_seg * seg_size); int p_chunk = p_count / PE_size; - internal_ring_allreduce(p_dst, p_src, p_count, team_obj, 1, (p_chunk * PE_size), p_chunk);