Files
rocm-systems/src/ipc_policy.cpp
T
Omri Mor a0fcbf8d35 Unify environment variable management (#235)
* Add environment variable configuration infrastructure
  - Namespace rocshmem::envvar
  - Track all config env vars in per-category lists
  - Remove duplicates from list of allowed env var types
  - Reject negative inputs for unsigned integer types
  - Accept empty strings for std::string
  - Print error source location using C++20 std::source_location
  - Unit tests
* Port environment variables
  - ROCSHMEM_UNIQUEID_WITH_MPI
  - ROCSHMEM_RO_DISABLE_IPC
  - ROCSHMEM_BOOTSTRAP_TIMEOUT
  - ROCSHMEM_BOOTSTRAP_HOSTID
  - ROCSHMEM_BOOTSTRAP_SOCKET_IFNAME
  - ROCSHMEM_RO_PROGRESS_DELAY
  - ROCSHMEM_BOOTSTRAP_SOCKET_FAMILY
  - ROCSHMEM_MAX_NUM_CONTEXTS
    + Merge the independent per-backend copies into a single variable
      that is used by all three backends (IPC, RO, GDA).
    + Set default to 32 (for GDA); prior default for IPC and RO was 1024.
  - ROCSHMEM_MAX_NUM_HOST_CONTEXTS
  - ROCSHMEM_MAX_WF_BUFFERS
  - ROCSHMEM_SQ_SIZE
  - ROCSHMEM_RO_NET_CPU_QUEUE
    + Renamed from RO_NET_CPU_QUEUE
    + Change env var input type to bool, default to false
    + Invert code logic: setting RO_NET_CPU_QUEUE to anything
      would /disable/ a variable gpu_queue, which defaulted to true.
      Variable is now named config::ro::net_cpu_queue,
      with all prior checks for gpu_queue inverted.
  - ROCSHMEM_USE_IB_HCA
  - ROCSHMEM_HEAP_SIZE
    + Defaults to 1L << 30 i.e. 1 GiB,
      from default heap size in memory/heap_memory.hpp.
  - ROCSHMEM_MAX_NUM_TEAMS
    + Unlike other env vars, this can be referenced from devices.
    + Function currently narrows from size_t to int: uses need to be audited
      for safety and correctness in using size_t directly.
  - ROCSHMEM_GDA_ALTERNATE_QP_PORTS
* New env var ROCSHMEM_DEBUG
  - Debug levels:
    + NONE
    + VERSION
    + WARN
    + INFO
    + TRACE
  - Currently unused - will be added later
  - Mirrors RCCL debug control
* Remove rocshmem::rocshmem_env_config
* Change interface for GetClosestNicToGpu
  to accept const char** instead of char**:
  the pointed-to string does not need to be modified
  - Files were not audited for inclusion of util.hpp only for env vars
---------
Signed-off-by: Omri Mor <Omri.Mor@amd.com>
2025-10-06 10:05:57 -07:00

224 خطوط
7.3 KiB
C++

/******************************************************************************
* Copyright (c) Advanced Micro Devices, Inc. All rights reserved.
*
* SPDX-License-Identifier: MIT
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to
* deal in the Software without restriction, including without limitation the
* rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
* sell copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
* IN THE SOFTWARE.
*****************************************************************************/
#include "ipc_policy.hpp"
#include "rocshmem/rocshmem_config.h" // NOLINT(build/include_subdir)
#include "backend_bc.hpp"
#include "context_incl.hpp"
#include "envvar.hpp"
#include "util.hpp"
namespace rocshmem {
__host__ void IpcOnImpl::ipcHostInit(int my_pe, const HEAP_BASES_T &heap_bases,
MPI_Comm thread_comm) {
/*
* Create an MPI communicator that deals only with local processes.
*/
MPI_Comm shmcomm;
mpilib_ftable_.Comm_split_type(thread_comm, MPI_COMM_TYPE_SHARED, 0, MPI_INFO_NULL,
&shmcomm);
/*
* Figure out how many local process there are.
*/
int Shm_size;
mpilib_ftable_.Comm_size(shmcomm, &Shm_size);
shm_size = Shm_size;
/*
* Figure out how this process' rank among local processes.
*/
mpilib_ftable_.Comm_rank(shmcomm, &shm_rank);
/*
* Allocate a host-side c-array to hold the IPC handles.
*/
void *ipc_mem_handle_uncast = malloc(shm_size * sizeof(hipIpcMemHandle_t));
hipIpcMemHandle_t *vec_ipc_handle =
reinterpret_cast<hipIpcMemHandle_t *>(ipc_mem_handle_uncast);
/*
* Call into the hip runtime to get an IPC handle for my symmetric
* heap and store that IPC handle into the host-side c-array which was
* just allocated.
*/
char *base_heap = heap_bases[my_pe];
CHECK_HIP(hipIpcGetMemHandle(&vec_ipc_handle[shm_rank], base_heap));
/*
* Do an all-to-all exchange with each local processing element to
* share the symmetric heap IPC handles.
*/
mpilib_ftable_.Allgather(MPI_IN_PLACE, sizeof(hipIpcMemHandle_t), MPI_CHAR,
vec_ipc_handle, sizeof(hipIpcMemHandle_t), MPI_CHAR, shmcomm);
/*
* Allocate device-side array to hold the IPC symmetric heap base
* addresses.
*/
char **ipc_base;
CHECK_HIP(hipMalloc(reinterpret_cast<void **>(&ipc_base),
shm_size * sizeof(char **)));
/*
* For all local processing elements, initialize the device-side array
* with the IPC symmetric heap base addresses.
*/
for (int i = 0; i < shm_size; i++) {
if (i != shm_rank) {
void **ipc_base_uncast = reinterpret_cast<void **>(&ipc_base[i]);
CHECK_HIP(hipIpcOpenMemHandle(ipc_base_uncast, vec_ipc_handle[i],
hipIpcMemLazyEnablePeerAccess));
} else {
ipc_base[i] = base_heap;
}
}
/*
* Set member variables used by subsequent method calls.
*/
ipc_bases = ipc_base;
/*
* Free the host-side memory used to exchange the symmetric heap base
* addresses.
*/
free(vec_ipc_handle);
if (!envvar::ro::disable_ipc) {
int thread_comm_rank {-1};
CHECK_HIP(hipMalloc(reinterpret_cast<void**>(&pes_with_ipc_avail), shm_size * sizeof(int)));
MPI_Group thread_grp;
MPI_Group shm_grp;
mpilib_ftable_.Comm_group(thread_comm, &thread_grp);
mpilib_ftable_.Comm_group(shmcomm, &shm_grp);
int *seqranks = new int[shm_size];
for(int i = 0; i < shm_size; i++)
seqranks[i] = i;
mpilib_ftable_.Group_translate_ranks(shm_grp, shm_size, seqranks, thread_grp, pes_with_ipc_avail);
delete [] seqranks;
mpilib_ftable_.Group_free(&shm_grp);
mpilib_ftable_.Group_free(&thread_grp);
}
}
__host__ void IpcOnImpl::ipcHostInit(int my_pe, const HEAP_BASES_T &heap_bases,
TcpBootstrap *bootstr) {
shm_size = bootstr->getNranksPerNode();
auto shm_ranks = bootstr->getLocalRanks();
shm_rank = std::find(shm_ranks.begin(), shm_ranks.end(), my_pe) - shm_ranks.begin();
/*
* Allocate a host-side c-array to hold the IPC handles.
*/
void *ipc_mem_handle_uncast = malloc(shm_size * sizeof(hipIpcMemHandle_t));
hipIpcMemHandle_t *vec_ipc_handle =
reinterpret_cast<hipIpcMemHandle_t *>(ipc_mem_handle_uncast);
/*
* Call into the hip runtime to get an IPC handle for my symmetric
* heap and store that IPC handle into the host-side c-array which was
* just allocated.
*/
char *base_heap = heap_bases[my_pe];
CHECK_HIP(hipIpcGetMemHandle(&vec_ipc_handle[shm_rank], base_heap));
/*
* Do an all-to-all exchange with each local processing element to
* share the symmetric heap IPC handles.
*/
bootstr->groupAllGather(vec_ipc_handle, sizeof(hipIpcMemHandle_t), shm_ranks);
/*
* Allocate device-side array to hold the IPC symmetric heap base
* addresses.
*/
char **ipc_base;
CHECK_HIP(hipMalloc(reinterpret_cast<void **>(&ipc_base),
shm_size * sizeof(char **)));
/*
* For all local processing elements, initialize the device-side array
* with the IPC symmetric heap base addresses.
*/
for (int i = 0; i < shm_size; i++) {
if (i != shm_rank) {
void **ipc_base_uncast = reinterpret_cast<void **>(&ipc_base[i]);
CHECK_HIP(hipIpcOpenMemHandle(ipc_base_uncast, vec_ipc_handle[i],
hipIpcMemLazyEnablePeerAccess));
} else {
ipc_base[i] = base_heap;
}
}
/*
* Set member variables used by subsequent method calls.
*/
ipc_bases = ipc_base;
/*
* Free the host-side memory used to exchange the symmetric heap base
* addresses.
*/
free(vec_ipc_handle);
if (!envvar::ro::disable_ipc) {
int thread_comm_rank {-1};
CHECK_HIP(hipMalloc(reinterpret_cast<void**>(&pes_with_ipc_avail), shm_size * sizeof(int)));
std::copy(shm_ranks.begin(), shm_ranks.end(), pes_with_ipc_avail);
}
}
__host__ void IpcOnImpl::ipcHostStop() {
for (int i = 0; i < shm_size; i++) {
if (i != shm_rank) {
CHECK_HIP(hipIpcCloseMemHandle(ipc_bases[i]));
}
}
CHECK_HIP(hipFree(ipc_bases));
if (nullptr != pes_with_ipc_avail) {
CHECK_HIP(hipFree(pes_with_ipc_avail));
}
}
__device__ void IpcOnImpl::ipcCopy(void *dst, void *src, size_t size) {
memcpy(dst, src, size);
}
__device__ void IpcOnImpl::ipcCopy_wave(void *dst, void *src, size_t size) {
memcpy_wave(dst, src, size);
}
__device__ void IpcOnImpl::ipcCopy_wg(void *dst, void *src, size_t size) {
memcpy_wg(dst, src, size);
}
} // namespace rocshmem