Files
Anatolii Rozanov f98c72d627 Add host API for *_on_stream operations (#340)
* Add functional test for barrier_all_on_stream

* Add rocshmem_barrier_all_on_stream support for GDA and RO backends

Implements rocshmem_barrier_all_on_stream operation for
GPU Direct Access and Reverse Offload backends.

Previously, rocshmem_barrier_all_on_stream was only supported for IPC backend.

* Add functional test for rocshmem_broadcastmem_on_stream

* Add host-side rocshmem_broadcastmem_on_stream API

Implement stream-based broadcast collective operation

- Add rocshmem_broadcastmem_on_stream host API and kernel implementation
- Add functional test TeamBroadcastmemOnStreamTester with multi-stream
  support and correctness verification
- Use per-workgroup contexts to avoid contention across parallel streams

API:
rocshmem_broadcastmem_on_stream(team, dest, source, nelems, pe_root, stream)

* Add functional test for rocshmem_getmem_on_stream

* Add host-side rocshmem_getmem_on_stream API

Implement stream-based point-to-point RMA get operation

- Add rocshmem_getmem_on_stream host API and kernel implementation
- Support for asynchronous getmem operations on HIP streams
- Add backend support for GDA, RO, and IPC contexts
- Use work-group collective getmem for efficient memory transfer

API:
rocshmem_getmem_on_stream(dest, source, nelems, pe, stream)

(AI Assist)

* Add host-side rocshmem_putmem_on_stream API

- Add rocshmem_putmem_on_stream for asynchronous remote writes
- Support for concurrent RMA operations on HIP streams
- Add backend support for GDA, RO, and IPC contexts
- Use work-group device collective operation

API:
rocshmem_putmem_on_stream(dest, source, bytes, pe, stream)

(AI Assist)

* Add functional test for rocshmem_putmem_on_stream

* Add host-side rocshmem_putmem_signal_on_stream API

Enables asynchronous putmem operations with signaling on HIP streams.

The implementation includes:
- Kernel wrapper rocshmem_putmem_signal_kernel
- Host interface putmem_signal_on_stream method
- Context layer support across all backends (IPC, GDA, RO)
- Public API

Function signature:
void rocshmem_putmem_signal_on_stream(void *dest, const void *source,
                                      size_t bytes, uint64_t *sig_addr,
                                      uint64_t signal, int sig_op,
                                      int pe, hipStream_t stream);

* Add functional test for rocshmem_putmem_signal_on_stream

* Add host-side rocshmem_signal_wait_until_on_stream API

Enables asynchronous signal wait operations on HIP streams.

The implementation includes:
- Kernel wrapper rocshmem_signal_wait_until_kernel
- Host interface signal_wait_until_on_stream method
- Context layer support across all backends (IPC, GDA, RO)
- Native uint64_t support in wait_until API (generated from P2P_SYNC.py)

Function signature:
void rocshmem_signal_wait_until_on_stream(uint64_t *sig_addr, int cmp,
                                          uint64_t cmp_value,
                                          hipStream_t stream);

(AI Assist)

* Add functional test for rocshmem_signal_wait_until_on_stream

* Add documentation for stream API functions

This commit adds API documentation for the following host-side
stream functions:

- rocshmem_barrier_all_on_stream (collective routines)
- rocshmem_broadcastmem_on_stream (collective routines)
- rocshmem_getmem_on_stream (RMA operations)
- rocshmem_putmem_on_stream (RMA operations)
- rocshmem_putmem_signal_on_stream (signaling operations)
- rocshmem_signal_wait_until_on_stream (point-to-point sync)

The documentation includes function signatures, parameter descriptions,
and detailed explanations of asynchronous behavior and stream handling.

(AI Assist)

* Rename "bytes" -> "nelems"

* Add "_TEST_" to the variables used in tests

* Remove incorrect hipStreamDefault usage

hipStreamDefault is not a default stream. This is a flag.

If stream == nullptr, then just pass it to kernel. It will launch the kernel on the default stream

[ROCm/rocshmem commit: d0c8380650]
2025-12-09 08:55:46 -06:00

178 lines
6.3 KiB
Python

###############################################################################
# Copyright (c) Advanced Micro Devices, Inc. All rights reserved.
#
# SPDX-License-Identifier: MIT
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to
# deal in the Software without restriction, including without limitation the
# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
# sell copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
# IN THE SOFTWARE.
###############################################################################
import os
types = [
("float", "float"),
("double", "double"),
("char", "char"),
("signed char", "schar"),
("short", "short"),
("int", "int"),
("long", "long"),
("long long", "longlong"),
("unsigned char", "uchar"),
("unsigned short", "ushort"),
("unsigned int", "uint"),
("unsigned long", "ulong"),
("unsigned long long", "ulonglong"),
("uint64_t", "uint64"),
]
def wait_until_api(T, TNAME):
return (
f"__device__ void rocshmem_{TNAME}_wait_until(\n"
f" {T} *ivars, int cmp, {T} val);\n"
f"__device__ size_t rocshmem_{TNAME}_wait_until_any(\n"
f" {T} *ivars, size_t nelems, const int* status,\n"
f" int cmp, {T} val);\n"
f"__device__ void rocshmem_{TNAME}_wait_until_all(\n"
f" {T} *ivars, size_t nelems, const int* status,\n"
f" int cmp, {T} val);\n"
f"__device__ size_t rocshmem_{TNAME}_wait_until_some(\n"
f" {T} *ivars, size_t nelems, size_t* indices, const int* status,\n"
f" int cmp, {T} val);\n"
f"__device__ size_t rocshmem_{TNAME}_wait_until_any_vector(\n"
f" {T} *ivars, size_t nelems, const int* status,\n"
f" int cmp, {T} val);\n"
f"__device__ void rocshmem_{TNAME}_wait_until_all_vector(\n"
f" {T} *ivars, size_t nelems, const int* status,\n"
f" int cmp, {T} val);\n"
f"__device__ size_t rocshmem_{TNAME}_wait_until_some_vector(\n"
f" {T} *ivars, size_t nelems, size_t* indices, const int* status,\n"
f" int cmp, {T} val);\n"
f"__host__ void rocshmem_{TNAME}_wait_until(\n"
f" {T} *ivars, int cmp, {T} val);\n"
f"__host__ size_t rocshmem_{TNAME}_wait_until_any(\n"
f" {T} *ivars, size_t nelems, const int* status,\n"
f" int cmp, {T} val);\n"
f"__host__ void rocshmem_{TNAME}_wait_until_all(\n"
f" {T} *ivars, size_t nelems, const int* status,\n"
f" int cmp, {T} val);\n"
f"__host__ size_t rocshmem_{TNAME}_wait_until_some(\n"
f" {T} *ivars, size_t nelems, size_t* indices, const int* status,\n"
f" int cmp, {T} val);\n"
f"__host__ size_t rocshmem_{TNAME}_wait_until_any_vector(\n"
f" {T} *ivars, size_t nelems, const int* status,\n"
f" int cmp, {T} val);\n"
f"__host__ void rocshmem_{TNAME}_wait_until_all_vector(\n"
f" {T} *ivars, size_t nelems, const int* status,\n"
f" int cmp, {T} val);\n"
f"__host__ size_t rocshmem_{TNAME}_wait_until_some_vector(\n"
f" {T} *ivars, size_t nelems, size_t* indices, const int* status,\n"
f" int cmp, {T} val);\n\n"
)
def generate_wait_until_api():
expanded_code = """
/**
* @name SHMEM_WAIT_UNTIL
* @brief Block the caller until the condition (* \p ptr \p cmps \p val) is
* true.
*
* This function can be called from divergent control paths at per-thread
* granularity. However, performance may be improved if the caller can
* coalesce contiguous messages and elect a leader thread to call into the
* ROCSHMEM function.
*
* @param[in] ivars Pointer to memory on the symmetric heap to wait for.
* @param[in] cmp Operation for the comparison.
* @param[in] val Value to compare the memory at \p ptr to.
*
* @return void
*/\n"""
for type_, tname_ in types:
expanded_code += wait_until_api(type_, tname_)
return expanded_code
def test_api(T, TNAME):
return (
f"__device__ int rocshmem_{TNAME}_test(\n"
f" {T} *ivars, int cmp, {T} val);\n"
f"__host__ int rocshmem_{TNAME}_test(\n"
f" {T} *ivars, int cmp, {T} val);\n\n"
)
def generate_test_api():
expanded_code = """
/**
* @name SHMEM_TEST
* @brief test if the condition (* \p ptr \p cmps \p val) is
* true.
*
* This function can be called from divergent control paths at per-thread
* granularity. However, performance may be improved if the caller can
* coalesce contiguous messages and elect a leader thread to call into the
* ROCSHMEM function.
*
* @param[in] ivars Pointer to memory on the symmetric heap to wait for.
* @param[in] cmp Operation for the comparison.
* @param[in] val Value to compare the memory at \p ptr to.
*
* @return 1 if the evaluation is true else 0
*/\n"""
for type_, tname_ in types:
expanded_code += test_api(type_, tname_)
return expanded_code
def write_to_file(filename, content):
with open(filename, 'w') as file:
file.write(content)
def generate_P2P_SYNC_header(output_dir, copyright):
expanded_code = copyright
expanded_code += """
#ifndef LIBRARY_INCLUDE_ROCSHMEM_P2P_SYNC_HPP
#define LIBRARY_INCLUDE_ROCSHMEM_P2P_SYNC_HPP
namespace rocshmem {
"""
expanded_code += (
generate_wait_until_api() +
generate_test_api()
)
expanded_code += """
} // namespace rocshmem
#endif // LIBRARY_INCLUDE_ROCSHMEM_P2P_SYNC_HPP
"""
output_file = os.path.join(
output_dir, 'rocshmem_P2P_SYNC.hpp'
)
write_to_file(output_file, expanded_code)