diff --git a/projects/rccl/tools/TransferBench/TransferBench.cpp b/projects/rccl/tools/TransferBench/TransferBench.cpp index 447eb6ca96..9751e726a1 100644 --- a/projects/rccl/tools/TransferBench/TransferBench.cpp +++ b/projects/rccl/tools/TransferBench/TransferBench.cpp @@ -63,6 +63,7 @@ int main(int argc, char **argv) printf(" USE_INTERACTIVE - Waits for user-input prior to start and after transfer loop (for profiling)\n"); printf(" USE_ITERATIONS=N - Sets number of iterations to run (default is 10)\n"); printf(" USE_SLEEP - Adds a 100ms sleep after sync (for profiling)\n"); + printf(" REUSE_STREAMS - Re-uses streams instead of creating/destroying per topology\n"); exit(0); } @@ -83,6 +84,7 @@ int main(int argc, char **argv) bool useSingleSync = getenv("USE_SINGLE_SYNC"); bool useInteractive = getenv("USE_INTERACTIVE"); bool useSleep = getenv("USE_SLEEP"); + bool reuseStreams = getenv("REUSE_STREAMS"); int numWarmups = 3; int numIterations = getenv("USE_ITERATIONS") ? atoi(getenv("USE_ITERATIONS")) : 10; @@ -112,8 +114,12 @@ int main(int argc, char **argv) printf("Adding 100ms sleep after sync (USE_SLEEP)\n"); else printf("No sleep per sync (enable sleep via USE_SLEEP)\n"); + if (reuseStreams) + printf("Re-using streams per topology (REUSE_STREAMS)\n"); + else + printf("Creating/destroying streams per topology (re-use streams via REUSE_STREAMS)\n"); - printf("Executing %d warmup iteration(s), and %d timed iteration(s) (Set via USE_ITERATION=#)\n", + printf("Executing %d warmup iteration(s), and %d timed iteration(s) (Set via USE_ITERATIONS=#)\n", numWarmups, numIterations); // Currently an environment variable is required in order to enable fine-grained VRAM allocations @@ -162,6 +168,7 @@ int main(int argc, char **argv) // Track links that get used std::map<std::pair<int, int>, int> linkMap; + std::vector<std::vector<hipStream_t>> streamCache(numDevices); char line[2048]; while(fgets(line, 2048, fp)) @@ -216,18 +223,31 @@ int main(int argc, char **argv) HIP_CALL(hipDeviceEnablePeerAccess(dst, 0)); } - // 
Count # of links / total blocks each GPU will be working on - linkCount[src]++; - // Allocate GPU memory on source GPU / streams / events - HIP_CALL(hipSetDevice(links[i].srcGpu)); - HIP_CALL(hipStreamCreate(&streams[i])); + HIP_CALL(hipSetDevice(src)); + if (reuseStreams) + { + // Create new stream if necessary + if (streamCache[src].size() <= linkCount[src]) + { + streamCache[src].resize(linkCount[src] + 1); + HIP_CALL(hipStreamCreate(&streamCache[src][linkCount[src]])); + } + streams[i] = streamCache[src][linkCount[src]]; + } + else + { + HIP_CALL(hipStreamCreate(&streams[i])); + } HIP_CALL(hipEventCreate(&startEvents[i])); HIP_CALL(hipEventCreate(&stopEvents[i])); HIP_CALL(hipMalloc((void **)&linkSrcMem[i], numBytesPerLink)); HIP_CALL(hipMalloc((void**)&gpuBlockParams[i], sizeof(BlockParam) * numLinks)); CheckOrFill(N, linkSrcMem[i], false, useMemset, useHipCall); + // Count # of links / total blocks each GPU will be working on + linkCount[src]++; + // Allocate GPU memory on destination GPU HIP_CALL(hipSetDevice(links[i].dstGpu)); if (useCoarseMem) @@ -416,7 +436,8 @@ int main(int argc, char **argv) HIP_CALL(hipFree(linkSrcMem[i])); HIP_CALL(hipFree(linkDstMem[i])); HIP_CALL(hipFree(gpuBlockParams[i])); - HIP_CALL(hipStreamDestroy(streams[i])); + if (!reuseStreams) + HIP_CALL(hipStreamDestroy(streams[i])); HIP_CALL(hipEventDestroy(startEvents[i])); HIP_CALL(hipEventDestroy(stopEvents[i])); @@ -424,6 +445,13 @@ int main(int argc, char **argv) } fclose(fp); + if (reuseStreams) + { + for (auto streamVector : streamCache) + for (auto stream : streamVector) + HIP_CALL(hipStreamDestroy(stream)); + } + // Print link information for (int i = 0; i < MAX_NAME_LEN + (8 * (numDevices + 1)); i++) printf("="); printf("=|=");