Adding option to re-use streams instead of re-creating per topology

[ROCm/rccl commit: 339bf9ff19]
This commit is contained in:
Gilbert Lee
2020-04-23 15:53:40 +00:00
orang tua 7d31b2d4d1
melakukan eebc6f2844
@@ -63,6 +63,7 @@ int main(int argc, char **argv)
printf(" USE_INTERACTIVE - Waits for user-input prior to start and after transfer loop (for profiling)\n");
printf(" USE_ITERATIONS=N - Sets number of iterations to run (default is 10)\n");
printf(" USE_SLEEP - Adds a 100ms sleep after sync (for profiling)\n");
printf(" REUSE_STREAMS - Re-uses streams instead of creating/destroying per topology\n");
exit(0);
}
@@ -83,6 +84,7 @@ int main(int argc, char **argv)
bool useSingleSync = getenv("USE_SINGLE_SYNC");
bool useInteractive = getenv("USE_INTERACTIVE");
bool useSleep = getenv("USE_SLEEP");
bool reuseStreams = getenv("REUSE_STREAMS");
int numWarmups = 3;
int numIterations = getenv("USE_ITERATIONS") ? atoi(getenv("USE_ITERATIONS")) : 10;
@@ -112,8 +114,12 @@ int main(int argc, char **argv)
printf("Adding 100ms sleep after sync (USE_SLEEP)\n");
else
printf("No sleep per sync (enable sleep via USE_SLEEP)\n");
if (reuseStreams)
printf("Re-using streams per topology (REUSE_STREAMS)\n");
else
printf("Creating/destroying streams per topology (re-use streams via REUSE_STREAMS)\n");
printf("Executing %d warmup iteration(s), and %d timed iteration(s) (Set via USE_ITERATION=#)\n",
printf("Executing %d warmup iteration(s), and %d timed iteration(s) (Set via USE_ITERATIONS=#)\n",
numWarmups, numIterations);
// Currently an environment variable is required in order to enable fine-grained VRAM allocations
@@ -162,6 +168,7 @@ int main(int argc, char **argv)
// Track links that get used
std::map<std::pair<int, int>, int> linkMap;
std::vector<std::vector<hipStream_t>> streamCache(numDevices);
char line[2048];
while(fgets(line, 2048, fp))
@@ -216,18 +223,31 @@ int main(int argc, char **argv)
HIP_CALL(hipDeviceEnablePeerAccess(dst, 0));
}
// Count # of links / total blocks each GPU will be working on
linkCount[src]++;
// Allocate GPU memory on source GPU / streams / events
HIP_CALL(hipSetDevice(links[i].srcGpu));
HIP_CALL(hipStreamCreate(&streams[i]));
HIP_CALL(hipSetDevice(src));
if (reuseStreams)
{
// Create new stream if necessary
if (streamCache[src].size() <= linkCount[src])
{
streamCache[src].resize(linkCount[src] + 1);
HIP_CALL(hipStreamCreate(&streamCache[src][linkCount[src]]));
}
streams[i] = streamCache[src][linkCount[src]];
}
else
{
HIP_CALL(hipStreamCreate(&streams[i]));
}
HIP_CALL(hipEventCreate(&startEvents[i]));
HIP_CALL(hipEventCreate(&stopEvents[i]));
HIP_CALL(hipMalloc((void **)&linkSrcMem[i], numBytesPerLink));
HIP_CALL(hipMalloc((void**)&gpuBlockParams[i], sizeof(BlockParam) * numLinks));
CheckOrFill(N, linkSrcMem[i], false, useMemset, useHipCall);
// Count # of links / total blocks each GPU will be working on
linkCount[src]++;
// Allocate GPU memory on destination GPU
HIP_CALL(hipSetDevice(links[i].dstGpu));
if (useCoarseMem)
@@ -416,7 +436,8 @@ int main(int argc, char **argv)
HIP_CALL(hipFree(linkSrcMem[i]));
HIP_CALL(hipFree(linkDstMem[i]));
HIP_CALL(hipFree(gpuBlockParams[i]));
HIP_CALL(hipStreamDestroy(streams[i]));
if (!reuseStreams)
HIP_CALL(hipStreamDestroy(streams[i]));
HIP_CALL(hipEventDestroy(startEvents[i]));
HIP_CALL(hipEventDestroy(stopEvents[i]));
@@ -424,6 +445,13 @@ int main(int argc, char **argv)
}
fclose(fp);
if (reuseStreams)
{
for (auto streamVector : streamCache)
for (auto stream : streamVector)
HIP_CALL(hipStreamDestroy(stream));
}
// Print link information
for (int i = 0; i < MAX_NAME_LEN + (8 * (numDevices + 1)); i++) printf("=");
printf("=|=");