From cfa97eccd359b0053e7fcb720b7de4224aedc138 Mon Sep 17 00:00:00 2001 From: Wenkai Du Date: Fri, 5 Jun 2020 23:23:46 +0000 Subject: [PATCH] Add IB/RDMA unit test --- src/include/socket.h | 4 + tools/ib-test/Makefile | 20 ++ tools/ib-test/ib_test.cpp | 461 +++++++++++++++++++++++++++++++++++ tools/ib-test/include/nccl.h | 354 +++++++++++++++++++++++++++ tools/ib-test/utils.cpp | 119 +++++++++ 5 files changed, 958 insertions(+) create mode 100755 tools/ib-test/Makefile create mode 100755 tools/ib-test/ib_test.cpp create mode 100755 tools/ib-test/include/nccl.h create mode 100755 tools/ib-test/utils.cpp diff --git a/src/include/socket.h b/src/include/socket.h index 46b204db08..a3246eabb8 100644 --- a/src/include/socket.h +++ b/src/include/socket.h @@ -332,6 +332,10 @@ static ncclResult_t createListenSocket(int *fd, union socketAddress *localAddr) return ncclSystemError; } +#if defined(RCCL_IB_TEST) + localAddr->sin.sin_port = htons(23456); +#endif + if (socketToPort(&localAddr->sa)) { // Port is forced by env. Make sure we get the port. int opt = 1; diff --git a/tools/ib-test/Makefile b/tools/ib-test/Makefile new file mode 100755 index 0000000000..2cdd46e7e4 --- /dev/null +++ b/tools/ib-test/Makefile @@ -0,0 +1,20 @@ +# Copyright (c) 2019-2020 Advanced Micro Devices, Inc. All rights reserved. +HIP_PATH ?= $(wildcard /opt/rocm/hip) +ifeq (,$(HIP_PATH)) +HIP_PATH = ../../.. 
+endif +HIPCC = $(HIP_PATH)/bin/hipcc + +EXE = ib_test +CXXFLAGS = -g -O3 -Iinclude -I../../src/include -DENABLE_TRACE -DRCCL_IB_TEST -ldl -lnuma + +files = $(EXE).cpp utils.cpp ../../src/transport/net_ib.cc ../../src/misc/ibvwrap.cc ../../src/debug.cc + +all: $(EXE) + +$(EXE): $(files) + $(HIPCC) $(CXXFLAGS) $^ -o $@ + #scp $(EXE) rocm-framework-3:$(shell pwd) + +clean: + rm -f *.o $(EXE) diff --git a/tools/ib-test/ib_test.cpp b/tools/ib-test/ib_test.cpp new file mode 100755 index 0000000000..27c41218c0 --- /dev/null +++ b/tools/ib-test/ib_test.cpp @@ -0,0 +1,461 @@ +/************************************************************************* + * Copyright (c) 2016-2020, NVIDIA CORPORATION. All rights reserved. + * Modifications Copyright (c) 2019-2020 Advanced Micro Devices, Inc. All rights reserved. + * + * See LICENSE.txt for license information + ************************************************************************/ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "comm.h" +#include "net.h" +#include "graph.h" +#include +#include +#include +#include +#include + +ncclResult_t initNet(); + +char* getCmdOption(char ** begin, char ** end, const std::string & option) { + char ** itr = std::find(begin, end, option); + if (itr != end && ++itr != end) + { + return *itr; + } + return 0; +} + +bool cmdOptionExists(char** begin, char** end, const std::string& option) { + return std::find(begin, end, option) != end; +} + +#define DEFAULT_BUFFSIZE (1LL << 22) /* 4MiB */ +#define SLICE_STEPS 4 +#define ITERATIONS 2000 +#define VEGA_GPU_RTC_FREQUENCY 2.5E7 +#define ENABLE_VALIDATION + +typedef ulong2 Pack128; + +struct sockaddr_in netConnectAddr; +void* netSendComm; +int netSendDev; +char *sendDevBuffer; +char *sendHostBuffer; +void *sendDevHandle; +void *sendHostHandle; +int sendBuffSize; +uint64_t *sendHead, *sendTail, *sourceCycle, *sourceBytes; +struct timeval 
send_tvs; +uint64_t send_sizes; +int send_active_req; +float send_bw_cumulative; +int send_bw_count; + +struct sockaddr_in netListenAddr; +void* netListenComm; +void* netRecvComm; +int netRecvDev; +char *recvDevBuffer; +char *recvHostBuffer; +void *recvDevHandle; +void *recvHostHandle; +int recvBuffSize; +uint64_t *recvHead, *recvTail, *recvErrorCount, *sinkCycle, *sinkBytes; +struct timeval recv_tvs; +uint64_t recv_sizes; +int recv_active_req; +float recv_bw_cumulative; +int recv_bw_count; + +bool use_gdr_read = false, use_gdr_write = true; +bool runSend = false, runRecv = false; + +uint64_t send_byte; +uint64_t recv_byte; + +__device__ +inline __attribute((always_inline)) +long long int __rtc64() { +#if __HIP__ + return (long long int) __builtin_amdgcn_s_memrealtime(); +#else + return (long long int) __clock_u64(); +#endif +} + +inline __device__ void Fetch128(Pack128& v, const Pack128* p) { + v.x = p->x; + v.y = p->y; +} +inline __device__ void Store128(Pack128* p, Pack128& v) { + p->x = v.x; + p->y = v.y; +} + +template +inline __device__ void DataSourceOrSink(const int w, const int nw, const int t, + Pack128* buff, const int Npack, uint64_t seq, uint64_t *error) { + const int inc = nw * UNROLL * WARP_SIZE; + int offset = w * UNROLL * WARP_SIZE + t; + + Pack128* src = buff + offset; + + uint64_t x = (uint64_t)(offset) + (seq<<32); + uint64_t y = seq + (((uint64_t)(offset))<<32); + while (offset < Npack) { + Pack128 vals[UNROLL]; + if (SINK) { + for (int u = 0; u < UNROLL; ++u) Fetch128(vals[u], src + u*WARP_SIZE); + for (int u = 0; u < UNROLL; ++u) { + if (vals[u].x != x++ || vals[u].y != y++ ) { + __atomic_fetch_add(error, 1, __ATOMIC_SEQ_CST); + } + } + } else { + for (int u = 0; u < UNROLL; ++u) { + vals[u].x = x++; + vals[u].y = y++; + } + for (int u = 0; u < UNROLL; ++u) Store128(src + u*WARP_SIZE, vals[u]); + } + src += inc; + offset += inc; + } +} + +__global__ void DataSinkKernel(Pack128* data, uint64_t* recv_head, uint64_t* recv_tail, uint64_t* 
mismatch, uint64_t *sink_cycle, uint64_t *sink_bytes) { + const int N = DEFAULT_BUFFSIZE*SLICE_STEPS/NCCL_STEPS/sizeof(Pack128); + Pack128* recvBuff[NCCL_STEPS]; + const int tid = threadIdx.x; + uint64_t tail = LOAD(recv_tail); + __shared__ uint64_t error; + uint64_t t0; + if (tid == 0) error = 0; + __syncthreads(); + for (int i = 0; i < NCCL_STEPS; i++) + recvBuff[i] = data + (i/SLICE_STEPS)*N; + do { + if (tid == 0) while (LOAD(recv_head) < tail + SLICE_STEPS); + __syncthreads(); + if (tid == 0) t0 = __rtc64(); +#ifdef ENABLE_VALIDATION + Pack128* d = recvBuff[tail%NCCL_STEPS]; + int w = tid / WARP_SIZE; + int nw = blockDim.x / WARP_SIZE; + int t = tid % WARP_SIZE; + DataSourceOrSink<2, 1>(w, nw, t, recvBuff[tail%NCCL_STEPS], N, tail, &error); + __syncthreads(); +#endif + tail += SLICE_STEPS; + if (tid == 0) { + STORE(recv_tail, tail); + *sink_cycle += (__rtc64() - t0); + *sink_bytes += N; + } + } while (tail < NCCL_STEPS*ITERATIONS); + if (tid == 0) STORE(mismatch, error); +} + +ncclResult_t netRecvProxy(struct ncclProxyArgs* args) { + char* localBuff = use_gdr_write ? recvDevBuffer : recvHostBuffer; + void* mhandle = use_gdr_write ? 
recvDevHandle : recvHostHandle; + int stepSize = recvBuffSize / NCCL_STEPS; + if (args->head < args->end) { + if ((args->tail < args->head + NCCL_STEPS) && (args->tail < LOAD(recvTail) + NCCL_STEPS) && (args->tail < args->end)) { + int buffSlot = args->tail%NCCL_STEPS; + int sliceSize = stepSize * args->sliceSteps; + NCCLCHECK(ncclNetIrecv(netRecvComm, localBuff+buffSlot*stepSize, sliceSize, mhandle, args->requests+buffSlot)); + if (args->requests[buffSlot] != NULL) { + if (recv_active_req == 0) { + gettimeofday(&recv_tvs, NULL); + recv_sizes = 0; + } + recv_active_req ++; + args->tail += args->sliceSteps; + args->idle = 0; + } + } + if (args->tail > args->head) { + int buffSlot = args->head%NCCL_STEPS; + int done, size; + NCCLCHECK(ncclNetTest(args->requests[buffSlot], &done, &size)); + if (done) { + recv_active_req --; + recv_sizes += size; + if (recv_active_req == 0) { + struct timeval tv; + gettimeofday(&tv, NULL); + recv_bw_cumulative += (float)recv_sizes/((tv.tv_sec - recv_tvs.tv_sec)*1000*1000 + tv.tv_usec - recv_tvs.tv_usec)/1000.0; + recv_bw_count ++; + } + args->head += args->sliceSteps; + recv_byte += size; + NCCLCHECK(ncclNetFlush(netRecvComm, localBuff+buffSlot*stepSize, size, mhandle)); + STORE(recvHead, args->head); + args->idle = 0; + } + } + } else { + runRecv = false; + } + return ncclSuccess; +} + +__global__ void DataSourceKernel(Pack128* data, uint64_t* send_head, uint64_t* send_tail, uint64_t *source_cycle, uint64_t *source_bytes) { + const int N = DEFAULT_BUFFSIZE*SLICE_STEPS/NCCL_STEPS/sizeof(Pack128); + Pack128* sendBuff[NCCL_STEPS]; + const int tid = threadIdx.x; + uint64_t head = LOAD(send_head); + uint64_t t0; + for (int i = 0; i < NCCL_STEPS; i++) + sendBuff[i] = data + (i/SLICE_STEPS)*N; + do { + if (tid == 0) while (LOAD(send_tail) + NCCL_STEPS < head + SLICE_STEPS); + __syncthreads(); + if (tid == 0) t0 = __rtc64(); + int w = tid / WARP_SIZE; + int nw = blockDim.x / WARP_SIZE; + int t = tid % WARP_SIZE; + DataSourceOrSink<2, 0>(w, 
nw, t, sendBuff[head%NCCL_STEPS], N, head, 0); + __syncthreads(); + head += SLICE_STEPS; + if (tid == 0) { + STORE(send_head, head); + *source_cycle += (__rtc64() - t0); + *source_bytes += N; + } + } while (head < NCCL_STEPS*ITERATIONS); +} + +ncclResult_t netSendProxy(struct ncclProxyArgs* args) { + char* localBuff = use_gdr_read ? sendDevBuffer : sendHostBuffer; + void* mhandle = use_gdr_read ? sendDevHandle : sendHostHandle; + int stepSize = sendBuffSize / NCCL_STEPS; + int sliceSize = stepSize * args->sliceSteps; + if (args->head < args->end) { + if (args->tail < args->end && args->tail < args->head + NCCL_STEPS) { + if (args->tail < LOAD(sendHead)) { + int buffSlot = args->tail%NCCL_STEPS; + NCCLCHECK(ncclNetIsend(netSendComm, localBuff+buffSlot*stepSize, sliceSize, mhandle, args->requests+buffSlot)); + if (args->requests[buffSlot] != NULL) { + if (send_active_req == 0) { + gettimeofday(&send_tvs, NULL); + send_sizes = 0; + } + send_active_req ++; + send_sizes += sliceSize; + send_byte += sliceSize; + __sync_synchronize(); + args->tail += args->sliceSteps; + args->idle = 0; + } + } + } + if (args->head < args->tail) { + int done; + int buffSlot = args->head%NCCL_STEPS; + NCCLCHECK(ncclNetTest(args->requests[buffSlot], &done, NULL)); + if (done) { + send_active_req --; + if (send_active_req == 0) { + struct timeval tv; + gettimeofday(&tv, NULL); + send_bw_cumulative += (float)send_sizes/((tv.tv_sec - send_tvs.tv_sec)*1000*1000 + tv.tv_usec - send_tvs.tv_usec)/1000.0; + send_bw_count ++; + } + args->head += args->sliceSteps; + STORE(sendTail, args->head); + args->idle = 0; + } + } + } + else + runSend = false; + return ncclSuccess; +} + +int main(int argc,char* argv[]) +{ + struct ncclComm *comm; + int sliceSteps = SLICE_STEPS; + + NCCLCHECK(initNet()); + int ndev; + NCCLCHECK(ncclNetDevices(&ndev)); + if (ndev == 0) { + printf("No IB devices found.\n"); + return 0; + } + else + printf("Found %d IB devices\n", ndev); + + sendBuffSize = recvBuffSize = 
DEFAULT_BUFFSIZE;
+
+  // -g <n>: select the GPU to run on.
+  char *gpu = getCmdOption(argv, argv + argc, "-g");
+  if (gpu) {
+    printf("Select GPU %s\n", gpu);
+    CUDACHECK(hipSetDevice(atol(gpu)));
+  }
+
+  // -r <0|1>: override use_gdr_read (send from the device buffer when non-zero).
+  char *gdr_read = getCmdOption(argv, argv + argc, "-r");
+  if (gdr_read) {
+    use_gdr_read = atol(gdr_read);
+  }
+
+  // -w <0|1>: override use_gdr_write (receive into the device buffer when non-zero).
+  char *gdr_write = getCmdOption(argv, argv + argc, "-w");
+  if (gdr_write) {
+    use_gdr_write = atol(gdr_write);
+  }
+
+  // Presence of -d selects the sender role; without it this process is the receiver.
+  if (cmdOptionExists(argv, argv + argc, "-d")) {
+    // ip is NULL when -d is the last argument; the address then stays zeroed.
+    char *ip = getCmdOption(argv, argv + argc, "-d");
+    if (ip)
+      inet_pton(AF_INET, ip, &netConnectAddr.sin_addr);
+    // -p <port> is optional; 23456 is the default.
+    char *port = getCmdOption(argv, argv + argc, "-p");
+    if (port)
+      netConnectAddr.sin_port = htons(atoi(port));
+    else
+      netConnectAddr.sin_port = htons(23456);
+
+    netConnectAddr.sin_family = AF_INET;
+    // Print what is actually stored in netConnectAddr rather than the raw
+    // option strings: `port` is NULL whenever -p is omitted (and `ip` may be
+    // NULL too), and handing a NULL pointer to %s is undefined behaviour.
+    char dialed[INET_ADDRSTRLEN];
+    inet_ntop(AF_INET, &netConnectAddr.sin_addr, dialed, sizeof(dialed));
+    printf("Connecting to %s:%d\n", dialed, ntohs(netConnectAddr.sin_port));
+
+    printf("GDR Read %s\n", use_gdr_read ? "enabled" : "disabled");
+
+    // Both a device and a host staging buffer are allocated; use_gdr_read
+    // decides which one the kernel fills and the proxy sends from.
+    NCCLCHECK(ncclCudaCalloc(&sendDevBuffer, sendBuffSize, 1));
+    NCCLCHECK(ncclCudaHostCalloc(&sendHostBuffer, sendBuffSize));
+    // Informational only: the message is printed when move_pages returns 0,
+    // with status[0] reported as the node of the host buffer.
+    int status[1] = {-1};
+    if (!move_pages(0, 1, (void **)&sendHostBuffer, NULL, status, 0))
+      printf("Allocated sendHostBuffer %p of %d bytes on node %d, sliceSteps %d\n",
+        sendHostBuffer, sendBuffSize, status[0], sliceSteps);
+
+    // Host-allocated counters handed to the source kernel and read back by the host.
+    NCCLCHECK(ncclCudaHostCalloc(&sendHead, 1));
+    NCCLCHECK(ncclCudaHostCalloc(&sendTail, 1));
+    NCCLCHECK(ncclCudaHostCalloc(&sourceCycle, 1));
+    NCCLCHECK(ncclCudaHostCalloc(&sourceBytes, 1));
+    netSendDev = 0;
+    NCCLCHECK(ncclNetConnect(netSendDev, &netConnectAddr, &netSendComm));
+
+    // Register both buffers so either can be used as the send source.
+    NCCLCHECK(ncclNetRegMr(netSendComm, sendDevBuffer, sendBuffSize, NCCL_PTR_CUDA, &sendDevHandle));
+    NCCLCHECK(ncclNetRegMr(netSendComm, sendHostBuffer, sendBuffSize, NCCL_PTR_HOST, &sendHostHandle));
+
+    // Start the data generator: one block of 256 threads on the default stream.
+    hipLaunchKernelGGL(DataSourceKernel, dim3(1, 1, 1), dim3(256, 1, 1), 0, 0,
+      (Pack128 *)(use_gdr_read ? sendDevBuffer : sendHostBuffer), sendHead, sendTail, sourceCycle, sourceBytes);
+
+    runSend = true;
+  } else {
+    printf("GDR Write %s\n", use_gdr_write ?
"enabled" : "disabled"); + + NCCLCHECK(ncclCudaCalloc(&recvDevBuffer, recvBuffSize, 1)); + NCCLCHECK(ncclCudaHostCalloc(&recvHostBuffer, recvBuffSize)); + int status[1] = {-1}; + if (!move_pages(0, 1, (void **)&recvHostBuffer, NULL, status, 0)) + printf("Allocated recvHostBuffer %p of %d bytes on node %d, sliceSteps %d\n", + recvHostBuffer, recvBuffSize, status[0], sliceSteps); + + NCCLCHECK(ncclCudaHostCalloc(&recvHead, 1)); + NCCLCHECK(ncclCudaHostCalloc(&recvTail, 1)); + NCCLCHECK(ncclCudaHostCalloc(&recvErrorCount, 1)); + NCCLCHECK(ncclCudaHostCalloc(&sinkCycle, 1)); + NCCLCHECK(ncclCudaHostCalloc(&sinkBytes, 1)); + netRecvDev = 0; + NCCLCHECK(ncclNetListen(netRecvDev, (void *)&netListenAddr, &netListenComm)); + char ip[INET_ADDRSTRLEN]; + uint16_t port; + inet_ntop(AF_INET, &netListenAddr.sin_addr, ip, sizeof(ip)); + port = htons(netListenAddr.sin_port); + printf("Listening on socket %s:%d\n", ip, port); + + NCCLCHECK(ncclNetAccept(netListenComm, &netRecvComm)); + NCCLCHECK(ncclNetCloseListen(netListenComm)); + + NCCLCHECK(ncclNetRegMr(netRecvComm, recvDevBuffer, recvBuffSize, NCCL_PTR_CUDA, &recvDevHandle)); + NCCLCHECK(ncclNetRegMr(netRecvComm, recvHostBuffer, recvBuffSize, NCCL_PTR_HOST, &recvHostHandle)); + + hipLaunchKernelGGL(DataSinkKernel, dim3(1, 1, 1), dim3(256, 1, 1), 0, 0, + (Pack128 *)(use_gdr_write ? 
recvDevBuffer : recvHostBuffer), recvHead, recvTail, recvErrorCount, sinkCycle, sinkBytes); + + runRecv = true; + } + + struct ncclProxyArgs sendArgs, recvArgs; + + memset(&sendArgs, 0, sizeof(struct ncclProxyArgs)); + sendArgs.head = 0; + sendArgs.tail = 0; + sendArgs.end = NCCL_STEPS*ITERATIONS; + sendArgs.sliceSteps = sliceSteps; + sendArgs.opCount = 1; + + memset(&recvArgs, 0, sizeof(struct ncclProxyArgs)); + recvArgs.head = 0; + recvArgs.tail = 0; + recvArgs.end = NCCL_STEPS*ITERATIONS; + recvArgs.sliceSteps = sliceSteps; + recvArgs.opCount = 1; + + struct timeval tv_start, tv_end; + gettimeofday(&tv_start, NULL); + + do { + if (runRecv) + NCCLCHECK(netRecvProxy(&recvArgs)); + if (runSend) + NCCLCHECK(netSendProxy(&sendArgs)); + } while (runSend || runRecv); + + CUDACHECK(hipDeviceSynchronize()); + + gettimeofday(&tv_end, NULL); + uint64_t total_time = ((uint64_t)(tv_end.tv_sec - tv_start.tv_sec)*1000*1000 + tv_end.tv_usec - tv_start.tv_usec); + + if (send_byte) printf("# Send %6.2f GB/s (%ld bytes %ld us) Proxy %6.2f GB/s (%d mmts) Kernel %6.2f GB/s (%ld bytes)\n", + (total_time) ? (double)send_byte/total_time/1000.0 : 0, + send_byte, total_time, send_bw_count ? (float)send_bw_cumulative/send_bw_count : 0, send_bw_count, + *sourceCycle ? (double)(*sourceBytes)*sizeof(Pack128)/((double)(*sourceCycle)/VEGA_GPU_RTC_FREQUENCY*1.0E9) : 0, *sourceBytes*sizeof(Pack128)); + if (recv_byte) printf("# Recv %6.2f GB/s (%ld bytes %ld us) Proxy %6.2f GB/s (%d mmts) Kernel %6.2f GB/s (%ld bytes) Data Error Counts %ld\n", + (total_time) ? (double)recv_byte/total_time/1000.0 : 0, + recv_byte, total_time, recv_bw_count ? (float)recv_bw_cumulative/recv_bw_count : 0, recv_bw_count, + *sinkCycle ? 
(double)(*sinkBytes)*sizeof(Pack128)/((double)(*sinkCycle)/VEGA_GPU_RTC_FREQUENCY*1.0E9) : 0, *sinkBytes*sizeof(Pack128), + *recvErrorCount); + + if (cmdOptionExists(argv, argv + argc, "-d")) { + NCCLCHECK(ncclCudaHostFree(sourceCycle)); + NCCLCHECK(ncclCudaHostFree(sourceBytes)); + NCCLCHECK(ncclCudaHostFree(sendHead)); + NCCLCHECK(ncclCudaHostFree(sendTail)); + NCCLCHECK(ncclNetDeregMr(netSendComm, sendDevHandle)); + NCCLCHECK(ncclNetDeregMr(netSendComm, sendHostHandle)); + NCCLCHECK(ncclCudaHostFree(sendHostBuffer)); + CUDACHECK(hipFree(sendDevBuffer)); + NCCLCHECK(ncclNetCloseSend(netSendComm)); + } else { + NCCLCHECK(ncclCudaHostFree(sinkCycle)); + NCCLCHECK(ncclCudaHostFree(sinkBytes)); + NCCLCHECK(ncclCudaHostFree(recvErrorCount)); + NCCLCHECK(ncclCudaHostFree(recvHead)); + NCCLCHECK(ncclCudaHostFree(recvTail)); + NCCLCHECK(ncclNetDeregMr(netRecvComm, recvDevHandle)); + NCCLCHECK(ncclNetDeregMr(netRecvComm, recvHostHandle)); + NCCLCHECK(ncclCudaHostFree(recvHostBuffer)); + CUDACHECK(hipFree(recvDevBuffer)); + NCCLCHECK(ncclNetCloseRecv(netRecvComm)); + } + + return 0; +} diff --git a/tools/ib-test/include/nccl.h b/tools/ib-test/include/nccl.h new file mode 100755 index 0000000000..76da780694 --- /dev/null +++ b/tools/ib-test/include/nccl.h @@ -0,0 +1,354 @@ +/************************************************************************* + * Copyright (c) 2015-2020, NVIDIA CORPORATION. All rights reserved. + * Modifications Copyright (c) 2019-2020 Advanced Micro Devices, Inc. All rights reserved. 
+ * + * See LICENSE.txt for license information + ************************************************************************/ + +#ifndef NCCL_H_ +#define NCCL_H_ + +#include +#include + +#define NCCL_MAJOR 2 +#define NCCL_MINOR 7 +#define NCCL_PATCH 0 +#define NCCL_SUFFIX "" + +#define NCCL_VERSION_CODE 2700 +#define NCCL_VERSION(X,Y,Z) ((X) * 1000 + (Y) * 100 + (Z)) + +#define RCCL_BFLOAT16 1 +#define RCCL_GATHER_SCATTER 1 + +#ifdef __cplusplus +extern "C" { +#endif + +/* Opaque handle to communicator */ +typedef struct ncclComm* ncclComm_t; + +#define NCCL_UNIQUE_ID_BYTES 128 +typedef struct { char internal[NCCL_UNIQUE_ID_BYTES]; } ncclUniqueId; + +/* Error type */ +typedef enum { ncclSuccess = 0, + ncclUnhandledCudaError = 1, + ncclSystemError = 2, + ncclInternalError = 3, + ncclInvalidArgument = 4, + ncclInvalidUsage = 5, + ncclNumResults = 6 } ncclResult_t; + +/* Return the NCCL_VERSION_CODE of the NCCL library in the supplied integer. + * This integer is coded with the MAJOR, MINOR and PATCH level of the + * NCCL library + */ +ncclResult_t ncclGetVersion(int *version); +ncclResult_t pncclGetVersion(int *version); + +/* Generates an Id to be used in ncclCommInitRank. ncclGetUniqueId should be + * called once and the Id should be distributed to all ranks in the + * communicator before calling ncclCommInitRank. */ +ncclResult_t ncclGetUniqueId(ncclUniqueId* uniqueId); +ncclResult_t pncclGetUniqueId(ncclUniqueId* uniqueId); + +/* Creates a new communicator (multi thread/process version). + * rank must be between 0 and nranks-1 and unique within a communicator clique. + * Each rank is associated to a CUDA device, which has to be set before calling + * ncclCommInitRank. + * ncclCommInitRank implicitly syncronizes with other ranks, so it must be + * called by different threads/processes or use ncclGroupStart/ncclGroupEnd. 
*/ +ncclResult_t ncclCommInitRank(ncclComm_t* comm, int nranks, ncclUniqueId commId, int rank); +ncclResult_t pncclCommInitRank(ncclComm_t* comm, int nranks, ncclUniqueId commId, int rank); + +/* Creates a clique of communicators (single process version). + * This is a convenience function to create a single-process communicator clique. + * Returns an array of ndev newly initialized communicators in comm. + * comm should be pre-allocated with size at least ndev*sizeof(ncclComm_t). + * If devlist is NULL, the first ndev CUDA devices are used. + * Order of devlist defines user-order of processors within the communicator. */ +ncclResult_t ncclCommInitAll(ncclComm_t* comm, int ndev, const int* devlist); +ncclResult_t pncclCommInitAll(ncclComm_t* comm, int ndev, const int* devlist); + +/* Frees resources associated with communicator object, but waits for any operations + * that might still be running on the device. */ +ncclResult_t ncclCommDestroy(ncclComm_t comm); +ncclResult_t pncclCommDestroy(ncclComm_t comm); + +/* Frees resources associated with communicator object and aborts any operations + * that might still be running on the device. */ +ncclResult_t ncclCommAbort(ncclComm_t comm); +ncclResult_t pncclCommAbort(ncclComm_t comm); + +/* Returns a human-readable error message. */ +const char* ncclGetErrorString(ncclResult_t result); +const char* pncclGetErrorString(ncclResult_t result); + +/* Checks whether the comm has encountered any asynchronous errors */ +ncclResult_t ncclCommGetAsyncError(ncclComm_t comm, ncclResult_t *asyncError); +ncclResult_t pncclCommGetAsyncError(ncclComm_t comm, ncclResult_t *asyncError); + +/* Gets the number of ranks in the communicator clique. */ +ncclResult_t ncclCommCount(const ncclComm_t comm, int* count); +ncclResult_t pncclCommCount(const ncclComm_t comm, int* count); + +/* Returns the cuda device number associated with the communicator. 
*/ +ncclResult_t ncclCommCuDevice(const ncclComm_t comm, int* device); +ncclResult_t pncclCommCuDevice(const ncclComm_t comm, int* device); + +/* Returns the user-ordered "rank" associated with the communicator. */ +ncclResult_t ncclCommUserRank(const ncclComm_t comm, int* rank); +ncclResult_t pncclCommUserRank(const ncclComm_t comm, int* rank); + +/* Reduction operation selector */ +typedef enum { ncclSum = 0, + ncclProd = 1, + ncclMax = 2, + ncclMin = 3, + ncclNumOps = 4 } ncclRedOp_t; + +/* Data types */ +typedef enum { ncclInt8 = 0, ncclChar = 0, + ncclUint8 = 1, + ncclInt32 = 2, ncclInt = 2, + ncclUint32 = 3, + ncclInt64 = 4, + ncclUint64 = 5, + ncclFloat16 = 6, ncclHalf = 6, + ncclFloat32 = 7, ncclFloat = 7, + ncclFloat64 = 8, ncclDouble = 8, + ncclBfloat16 = 9, + ncclNumTypes = 10 } ncclDataType_t; + +/* + * Collective communication operations + * + * Collective communication operations must be called separately for each + * communicator in a communicator clique. + * + * They return when operations have been enqueued on the CUDA stream. + * + * Since they may perform inter-CPU synchronization, each call has to be done + * from a different thread or process, or need to use Group Semantics (see + * below). + */ + +/* + * Reduce + * + * Reduces data arrays of length count in sendbuff into recvbuff using op + * operation. + * recvbuff may be NULL on all calls except for root device. + * root is the rank (not the CUDA device) where data will reside after the + * operation is complete. + * + * In-place operation will happen if sendbuff == recvbuff. 
+ */ +ncclResult_t ncclReduce(const void* sendbuff, void* recvbuff, size_t count, ncclDataType_t datatype, + ncclRedOp_t op, int root, ncclComm_t comm, hipStream_t stream); +ncclResult_t pncclReduce(const void* sendbuff, void* recvbuff, size_t count, ncclDataType_t datatype, + ncclRedOp_t op, int root, ncclComm_t comm, hipStream_t stream); + +/* + * (deprecated) Broadcast (in-place) + * + * Copies count values from root to all other devices. + * root is the rank (not the CUDA device) where data resides before the + * operation is started. + * + * This operation is implicitely in place. + */ +ncclResult_t ncclBcast(void* buff, size_t count, ncclDataType_t datatype, int root, + ncclComm_t comm, hipStream_t stream); +ncclResult_t pncclBcast(void* buff, size_t count, ncclDataType_t datatype, int root, + ncclComm_t comm, hipStream_t stream); + +/* + * Broadcast + * + * Copies count values from root to all other devices. + * root is the rank (not the CUDA device) where data resides before the + * operation is started. + * + * In-place operation will happen if sendbuff == recvbuff. + */ +ncclResult_t ncclBroadcast(const void* sendbuff, void* recvbuff, size_t count, ncclDataType_t datatype, int root, + ncclComm_t comm, hipStream_t stream); +ncclResult_t pncclBroadcast(const void* sendbuff, void* recvbuff, size_t count, ncclDataType_t datatype, int root, + ncclComm_t comm, hipStream_t stream); + +/* + * All-Reduce + * + * Reduces data arrays of length count in sendbuff using op operation, and + * leaves identical copies of result on each recvbuff. + * + * In-place operation will happen if sendbuff == recvbuff. 
+ */ +ncclResult_t ncclAllReduce(const void* sendbuff, void* recvbuff, size_t count, + ncclDataType_t datatype, ncclRedOp_t op, ncclComm_t comm, hipStream_t stream); +ncclResult_t pncclAllReduce(const void* sendbuff, void* recvbuff, size_t count, + ncclDataType_t datatype, ncclRedOp_t op, ncclComm_t comm, hipStream_t stream); + +/* + * Reduce-Scatter + * + * Reduces data in sendbuff using op operation and leaves reduced result + * scattered over the devices so that recvbuff on rank i will contain the i-th + * block of the result. + * Assumes sendcount is equal to nranks*recvcount, which means that sendbuff + * should have a size of at least nranks*recvcount elements. + * + * In-place operations will happen if recvbuff == sendbuff + rank * recvcount. + */ +ncclResult_t ncclReduceScatter(const void* sendbuff, void* recvbuff, + size_t recvcount, ncclDataType_t datatype, ncclRedOp_t op, ncclComm_t comm, + hipStream_t stream); +ncclResult_t pncclReduceScatter(const void* sendbuff, void* recvbuff, + size_t recvcount, ncclDataType_t datatype, ncclRedOp_t op, ncclComm_t comm, + hipStream_t stream); + +/* + * All-Gather + * + * Each device gathers sendcount values from other GPUs into recvbuff, + * receiving data from rank i at offset i*sendcount. + * Assumes recvcount is equal to nranks*sendcount, which means that recvbuff + * should have a size of at least nranks*sendcount elements. + * + * In-place operations will happen if sendbuff == recvbuff + rank * sendcount. + */ +ncclResult_t ncclAllGather(const void* sendbuff, void* recvbuff, size_t sendcount, + ncclDataType_t datatype, ncclComm_t comm, hipStream_t stream); +ncclResult_t pncclAllGather(const void* sendbuff, void* recvbuff, size_t sendcount, + ncclDataType_t datatype, ncclComm_t comm, hipStream_t stream); + +/* + * Send + * + * Send data from sendbuff to rank peer. + * + * Rank peer needs to call ncclRecv with the same datatype and the same count from this + * rank. + * + * This operation is blocking for the GPU. 
If multiple ncclSend and ncclRecv operations + * need to progress concurrently to complete, they must be fused within a ncclGroupStart/ + * ncclGroupEnd section. + */ +ncclResult_t ncclSend(const void* sendbuff, size_t count, ncclDataType_t datatype, int peer, + ncclComm_t comm, hipStream_t stream); +ncclResult_t pncclSend(const void* sendbuff, size_t count, ncclDataType_t datatype, int peer, + ncclComm_t comm, hipStream_t stream); + +/* + * Receive + * + * Receive data from rank peer into recvbuff. + * + * Rank peer needs to call ncclSend with the same datatype and the same count to this + * rank. + * + * This operation is blocking for the GPU. If multiple ncclSend and ncclRecv operations + * need to progress concurrently to complete, they must be fused within a ncclGroupStart/ + * ncclGroupEnd section. + */ +ncclResult_t pncclRecv(void* recvbuff, size_t count, ncclDataType_t datatype, int peer, + ncclComm_t comm, hipStream_t stream); +ncclResult_t ncclRecv(void* recvbuff, size_t count, ncclDataType_t datatype, int peer, + ncclComm_t comm, hipStream_t stream); + +/* + * Gather + * + * Root device gathers sendcount values from other GPUs into recvbuff, + * receiving data from rank i at offset i*sendcount. + * Assumes recvcount is equal to nranks*sendcount, which means that recvbuff + * should have a size of at least nranks*sendcount elements. + * + * In-place operations will happen if sendbuff == recvbuff + rank * sendcount. + */ +ncclResult_t ncclGather(const void* sendbuff, void* recvbuff, size_t sendcount, + ncclDataType_t datatype, int root, ncclComm_t comm, hipStream_t stream); +ncclResult_t pncclGather(const void* sendbuff, void* recvbuff, size_t sendcount, + ncclDataType_t datatype, int root, ncclComm_t comm, hipStream_t stream); + +/* + * Scatter + * + * Scattered over the devices so that recvbuff on rank i will contain the i-th + * block of the data on root. 
+ * Assumes sendcount is equal to nranks*recvcount, which means that sendbuff + * should have a size of at least nranks*recvcount elements. + * + * In-place operations will happen if recvbuff == sendbuff + rank * recvcount. + */ +ncclResult_t ncclScatter(const void* sendbuff, void* recvbuff, + size_t recvcount, ncclDataType_t datatype, int root, ncclComm_t comm, + hipStream_t stream); +ncclResult_t pncclScatter(const void* sendbuff, void* recvbuff, + size_t recvcount, ncclDataType_t datatype, int root, ncclComm_t comm, + hipStream_t stream); + +/* + * All-To-All + * + * Device (i) send (j)th block of data to device (j) and be placed as (i)th + * block. Each block for sending/receiving has count elements, which means + * that recvbuff and sendbuff should have a size of nranks*count elements. + * + * In-place operation will happen if sendbuff == recvbuff. + */ +ncclResult_t ncclAllToAll(const void* sendbuff, void* recvbuff, size_t count, + ncclDataType_t datatype, ncclComm_t comm, hipStream_t stream); +ncclResult_t pncclAllToAll(const void* sendbuff, void* recvbuff, size_t count, + ncclDataType_t datatype, ncclComm_t comm, hipStream_t stream); + +/* + * Group semantics + * + * When managing multiple GPUs from a single thread, and since NCCL collective + * calls may perform inter-CPU synchronization, we need to "group" calls for + * different ranks/devices into a single call. + * + * Grouping NCCL calls as being part of the same collective operation is done + * using ncclGroupStart and ncclGroupEnd. ncclGroupStart will enqueue all + * collective calls until the ncclGroupEnd call, which will wait for all calls + * to be complete. Note that for collective communication, ncclGroupEnd only + * guarantees that the operations are enqueued on the streams, not that + * the operation is effectively done. + * + * Both collective communication and ncclCommInitRank can be used in conjunction + * of ncclGroupStart/ncclGroupEnd, but not together. 
+ * + * Group semantics also allow to fuse multiple operations on the same device + * to improve performance (for aggregated collective calls), or to permit + * concurrent progress of multiple send/receive operations. + */ + +/* + * Group Start + * + * Start a group call. All calls to NCCL until ncclGroupEnd will be fused into + * a single NCCL operation. Nothing will be started on the CUDA stream until + * ncclGroupEnd. + */ +ncclResult_t ncclGroupStart(); +ncclResult_t pncclGroupStart(); + +/* + * Group End + * + * End a group call. Start a fused NCCL operation consisting of all calls since + * ncclGroupStart. Operations on the CUDA stream depending on the NCCL operations + * need to be called after ncclGroupEnd. + */ +ncclResult_t ncclGroupEnd(); +ncclResult_t pncclGroupEnd(); + +#ifdef __cplusplus +} // end extern "C" +#endif + +#endif // end include guard diff --git a/tools/ib-test/utils.cpp b/tools/ib-test/utils.cpp new file mode 100755 index 0000000000..42d92b9082 --- /dev/null +++ b/tools/ib-test/utils.cpp @@ -0,0 +1,119 @@ +/************************************************************************* + * Copyright (c) 2016-2019, NVIDIA CORPORATION. All rights reserved. + * Modifications Copyright (c) 2019-2020 Advanced Micro Devices, Inc. All rights reserved. + * + * See LICENSE.txt for license information + ************************************************************************/ +#include "nccl.h" +#include "channel.h" +#include "nvmlwrap.h" +#include "bootstrap.h" +#include "transport.h" +#include "group.h" +#include "net.h" +#include "coll_net.h" +#include "enqueue.h" +#include "graph.h" +#include "argcheck.h" +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#ifdef ENABLE_TRACE +std::chrono::high_resolution_clock::time_point ncclEpoch; +#endif + +ncclNet_t* ncclNet = NULL; + +// Returns ncclInternalError if anything fails, causing that network to be ignored. 
+ncclResult_t initNet(ncclNet_t* net) {
+  // Probe one network implementation: run its init hook, then ask how many
+  // devices it exposes.
+  //   ncclInternalError - init() or devices() reported a failure
+  //   ncclSystemError   - the query worked but no device was found
+  //   ncclSuccess       - at least one device is available
+  int ndev;
+  if (net->init(ncclDebugLog) != ncclSuccess) return ncclInternalError;
+  if (net->devices(&ndev) != ncclSuccess) return ncclInternalError;
+  if (ndev <= 0) return ncclSystemError;
+  return ncclSuccess;
+}
+
+// Selects the network used by the test. Only the IB implementation is probed.
+// On success the global ncclNet points at ncclNetIb; on failure ncclNet is
+// left untouched and the probe's error code is returned.
+ncclResult_t initNet() {
+  // Propagate a failed probe instead of reporting success: ncclNet would
+  // still be NULL, and a caller that trusts the return value would go on to
+  // use a network that was never set up.
+  ncclResult_t res = initNet(&ncclNetIb);
+  if (res != ncclSuccess) return res;
+  ncclNet = &ncclNetIb;
+  return ncclSuccess;
+}
+
+// Writes the local host name into hostname (capacity maxlen), cut at the first
+// occurrence of delim. If gethostname() fails, "unknown" is copied instead and
+// ncclSystemError is returned.
+ncclResult_t getHostName(char* hostname, int maxlen, const char delim) {
+  if (gethostname(hostname, maxlen) != 0) {
+    // NOTE(review): strncpy leaves no terminator when maxlen <= 7.
+    strncpy(hostname, "unknown", maxlen);
+    return ncclSystemError;
+  }
+  // Stop at delim, at the terminator, or at the last byte of the buffer, and
+  // terminate there.
+  int i = 0;
+  while ((hostname[i] != delim) && (hostname[i] != '\0') && (i < maxlen-1)) i++;
+  hostname[i] = '\0';
+  return ncclSuccess;
+}
+
+// Parses a comma-separated list of "prefix" or "prefix:port" entries into
+// ifList and returns how many entries were stored. Parsing stops at the end of
+// the string or once maxList entries have been stored. Entries without a port
+// get port = -1. A NULL string yields 0.
+// NOTE(review): characters are appended to prefix without a capacity check;
+// the size of netIf::prefix is declared elsewhere - confirm long names cannot
+// overrun it.
+int parseStringList(const char* string, struct netIf* ifList, int maxList) {
+  if (!string) return 0;
+
+  const char* ptr = string;
+
+  int ifNum = 0;  // entries completed so far
+  int ifC = 0;    // characters collected for the current prefix
+  char c;
+  do {
+    c = *ptr;
+    if (c == ':') {
+      // End of prefix: the port follows. An empty prefix is dropped.
+      if (ifC > 0) {
+        ifList[ifNum].prefix[ifC] = '\0';
+        ifList[ifNum].port = atoi(ptr+1);
+        ifNum++; ifC = 0;
+      }
+      // Skip the port text up to the next separator or the end of the string.
+      while (c != ',' && c != '\0') c = *(++ptr);
+    } else if (c == ',' || c == '\0') {
+      // End of an entry that has no port.
+      if (ifC > 0) {
+        ifList[ifNum].prefix[ifC] = '\0';
+        ifList[ifNum].port = -1;
+        ifNum++; ifC = 0;
+      }
+    } else {
+      ifList[ifNum].prefix[ifC] = c;
+      ifC++;
+    }
+    ptr++;
+  } while (ifNum < maxList && c);
+  return ifNum;
+}
+
+// Compares string against ref: the whole string (terminator included) when
+// matchExact is set, otherwise only the first strlen(ref) characters.
+static bool matchIf(const char* string, const char* ref, bool matchExact) {
+  // Make sure to include '\0' in the exact case
+  int matchLen = matchExact ? strlen(string) + 1 : strlen(ref);
+  return strncmp(string, ref, matchLen) == 0;
+}
+
+// Two ports match when they are equal or when either one is -1.
+static bool matchPort(const int port1, const int port2) {
+  if (port1 == -1) return true;
+  if (port2 == -1) return true;
+  if (port1 == port2) return true;
+  return false;
+}
+
+
+bool matchIfList(const char* string, int port, struct netIf* ifList, int listSize, bool matchExact) {
+  // Make an exception for the case where no user list is defined
+  if (listSize == 0) return true;
+
+  for (int i=0; i