From dcabc9dbf738294f2cd44d5c408690bc5035c487 Mon Sep 17 00:00:00 2001 From: Ben Sander Date: Sat, 16 Apr 2016 10:18:56 -0500 Subject: [PATCH] P2P Update. - add P2P staging buffer copy. - If copy device does not have sufficient access permissions, fall back to staging buffer. - improve docs for which copy device is used. --- include/hcc_detail/hip_hcc.h | 11 ++-- include/hcc_detail/hip_runtime_api.h | 21 +++++-- include/hcc_detail/staging_buffer.h | 3 + src/hip_hcc.cpp | 73 ++++++++++++++++++++---- src/hip_peer.cpp | 4 +- src/staging_buffer.cpp | 83 ++++++++++++++++++++++++++-- tests/src/hipPeerToPeer_simple.cpp | 5 +- 7 files changed, 172 insertions(+), 28 deletions(-) diff --git a/include/hcc_detail/hip_hcc.h b/include/hcc_detail/hip_hcc.h index deb9fd0b04..45af34a152 100644 --- a/include/hcc_detail/hip_hcc.h +++ b/include/hcc_detail/hip_hcc.h @@ -38,7 +38,7 @@ THE SOFTWARE. // Compile peer-to-peer support. // >= 2 : use HCC hc:accelerator::get_is_peer // >= 3 : use hc::am_memtracker_update_peers(...) -#define USE_PEER_TO_PEER 0 +#define USE_PEER_TO_PEER 2 // Use new lock API in HCC: #define USE_HCC_LOCK 0 @@ -247,11 +247,12 @@ enum ihipCommand_t { ihipCommandCopyH2D, ihipCommandCopyD2H, ihipCommandCopyD2D, + ihipCommandCopyP2P, ihipCommandKernel, }; static const char* ihipCommandName[] = { - "CopyH2H", "CopyH2D", "CopyD2H", "CopyD2D", "Kernel" + "CopyH2H", "CopyH2D", "CopyD2H", "CopyD2D", "CopyP2P", "Kernel" }; @@ -451,7 +452,7 @@ private: // The unsigned return is hipMemcpyKind unsigned resolveMemcpyDirection(bool srcInDeviceMem, bool dstInDeviceMem); - void setCopyAgents(unsigned kind, ihipCommand_t *commandType, hsa_agent_t *srcAgent, hsa_agent_t *dstAgent); + void setAsyncCopyAgents(unsigned kind, ihipCommand_t *commandType, hsa_agent_t *srcAgent, hsa_agent_t *dstAgent); unsigned _device_index; // index into the g_device array @@ -527,6 +528,7 @@ public: // "Allocate" a stream ID: ihipStream_t::SeqNum_t incStreamId() { return _stream_id++; }; + bool isPeer(const ihipDevice_t *peer); // returns Trus if peer has access to memory physically located on this device. bool addPeer(ihipDevice_t *peer); bool removePeer(ihipDevice_t *peer); void resetPeers(ihipDevice_t *thisDevice); @@ -540,9 +542,10 @@ private: ihipStream_t::SeqNum_t _stream_id; // These reflect the currently Enabled set of peers for this GPU: + // Enabled peers have permissions to access the memory physically allocated on this device. std::list _peers; // list of enabled peer devices. uint32_t _peerCnt; // number of enabled peers - hsa_agent_t *_peerAgents; // efficient packed array of enabled agents (to use for allocations.) + hsa_agent_t *_peerAgents; // efficient packed array of enabled agents (to use for allocations.) private: void recomputePeerAgents(); }; diff --git a/include/hcc_detail/hip_runtime_api.h b/include/hcc_detail/hip_runtime_api.h index 013597bba6..e92636dade 100644 --- a/include/hcc_detail/hip_runtime_api.h +++ b/include/hcc_detail/hip_runtime_api.h @@ -798,6 +798,12 @@ hipError_t hipHostFree(void* ptr); * device to host, device to device and host to host * The src and dst must not overlap. * + * For hipMemcpy, the copy is always performed by the current device (set by hipSetDevice). + * For multi-gpu or peer-to-peer configurations, it is recommended to set the current device to the device where the src data is physically located. + * For optimal peer-to-peer copies, the copy device must be able to access the src and dst pointers (by calling hipDeviceEnablePeerAccess with copy agent as the + * current device and src/dest as the peerDevice argument. if this is not done, the hipMemcpy will still work, but will perform the copy using a staging buffer + * on the host. + * * @param[out] dst Data being copy to * @param[in] src Data being copy from * @param[in] sizeBytes Data size in bytes @@ -830,6 +836,13 @@ hipError_t hipMemcpyToSymbol(const char* symbolName, const void *src, size_t siz * @warning If host or dest are not pinned, the memory copy will be performed synchronously. For best performance, use hipHostMalloc to * allocate host memory that is transferred asynchronously. * + * For hipMemcpy, the copy is always performed by the device associated with the specified stream. + * + * For multi-gpu or peer-to-peer configurations, it is recommended to use a stream which is a attached to the device where the src data is physically located. + * For optimal peer-to-peer copies, the copy device must be able to access the src and dst pointers (by calling hipDeviceEnablePeerAccess with copy agent as the + * current device and src/dest as the peerDevice argument. if this is not done, the hipMemcpy will still work, but will perform the copy using a staging buffer + * on the host. + * * @param[out] dst Data being copy to * @param[in] src Data being copy from * @param[in] sizeBytes Data size in bytes @@ -902,17 +915,15 @@ hipError_t hipMemGetInfo (size_t * free, size_t * total) ; /** * @brief Determine if a device can access a peer's memory. * - * @param [out] canAccessPeer returns true if specified devices are peers. - * @param [in] device - * @param [in] peerDevice + * @param [out] canAccessPeer Returns the peer access capability (0 or 1) + * @param [in] device - device from where memory may be accessed. + * @param [in] peerDevice - device where memory is physically located * * Returns "1" in @p canAccessPeer if the specified @p device is capable * of directly accessing memory physically located on peerDevice , or "0" if not. * * Returns "0" in @p canAccessPeer if deviceId == peerDeviceId, and both are valid devices : a device is not a peer of itself. * - * - * * @returns #hipSuccess, * @returns #hipErrorInvalidDevice if deviceId or peerDeviceId are not valid devices */ diff --git a/include/hcc_detail/staging_buffer.h b/include/hcc_detail/staging_buffer.h index fe53f8474d..4dd4b251e7 100644 --- a/include/hcc_detail/staging_buffer.h +++ b/include/hcc_detail/staging_buffer.h @@ -50,6 +50,8 @@ struct StagingBuffer { void CopyDeviceToHost (void* dst, const void* src, size_t sizeBytes, hsa_signal_t *waitFor); void CopyDeviceToHostPinInPlace(void* dst, const void* src, size_t sizeBytes, hsa_signal_t *waitFor); + void CopyPeerToPeer( void* dst, hsa_agent_t dstAgent, const void* src, hsa_agent_t srcAgent, size_t sizeBytes, hsa_signal_t *waitFor); + private: hsa_agent_t _hsa_agent; @@ -58,6 +60,7 @@ private: char *_pinnedStagingBuffer[_max_buffers]; hsa_signal_t _completion_signal[_max_buffers]; + hsa_signal_t _completion_signal2[_max_buffers]; // P2P needs another set of signals. std::mutex _copy_lock; // provide thread-safe access }; diff --git a/src/hip_hcc.cpp b/src/hip_hcc.cpp index 41469c5ee1..7cf8d3d1f6 100644 --- a/src/hip_hcc.cpp +++ b/src/hip_hcc.cpp @@ -210,6 +210,14 @@ void ihipDeviceCriticalBase_t::recomputePeerAgents() } +template<> +bool ihipDeviceCriticalBase_t::isPeer(const ihipDevice_t *peer) +{ + auto match = std::find(_peers.begin(), _peers.end(), peer); + return (match != std::end(_peers)); +} + + template<> bool ihipDeviceCriticalBase_t::addPeer(ihipDevice_t *peer) { @@ -252,16 +260,24 @@ void ihipDeviceCriticalBase_t::resetPeers(ihipDevice_t *thisDevice) //------------------------------------------------------------------------------------------------- //--- -ihipDevice_t * ihipStream_t::getDevice() const +//Flavor that takes device index. +ihipDevice_t * getDevice(unsigned deviceIndex) { - if (ihipIsValidDevice(_device_index)) { - return &g_devices[_device_index]; + if (ihipIsValidDevice(deviceIndex)) { + return &g_devices[deviceIndex]; } else { return NULL; } }; +//--- +ihipDevice_t * ihipStream_t::getDevice() const +{ + return ::getDevice(_device_index); +}; + + //--- // Allocate a new signal from the signal pool. // Returned signals have value of 0. @@ -1155,16 +1171,23 @@ unsigned ihipStream_t::resolveMemcpyDirection(bool srcInDeviceMem, bool dstInDev // Setup the copyCommandType and the copy agents (for hsa_amd_memory_async_copy) -void ihipStream_t::setCopyAgents(unsigned kind, ihipCommand_t *commandType, hsa_agent_t *srcAgent, hsa_agent_t *dstAgent) +// srcPhysAcc is the physical location of the src data. For many copies this is the +void ihipStream_t::setAsyncCopyAgents(unsigned kind, ihipCommand_t *commandType, hsa_agent_t *srcAgent, hsa_agent_t *dstAgent) { - ihipDevice_t *device = this->getDevice(); - hsa_agent_t deviceAgent = device->_hsa_agent; + // current* represents the device associated with the specified stream. + ihipDevice_t *streamDevice = this->getDevice(); + hsa_agent_t streamAgent = streamDevice->_hsa_agent; + + // ROCR runtime logic is : + // - If both src and dst are cpu agent, launch thread and memcpy. We want to avoid this. + // - If either/both src or dst is a gpu agent, use the first gpu agent’s DMA engine to perform the copy. switch (kind) { + //case hipMemcpyHostToHost : *commandType = ihipCommandCopyH2H; *srcAgent=streamAgent; *dstAgent=streamAgent; break; // TODO - enable me, for async copy use SDMA. case hipMemcpyHostToHost : *commandType = ihipCommandCopyH2H; *srcAgent=g_cpu_agent; *dstAgent=g_cpu_agent; break; - case hipMemcpyHostToDevice : *commandType = ihipCommandCopyH2D; *srcAgent=g_cpu_agent; *dstAgent=deviceAgent; break; - case hipMemcpyDeviceToHost : *commandType = ihipCommandCopyD2H; *srcAgent=deviceAgent; *dstAgent=g_cpu_agent; break; - case hipMemcpyDeviceToDevice : *commandType = ihipCommandCopyD2D; *srcAgent=deviceAgent; *dstAgent=deviceAgent; break; + case hipMemcpyHostToDevice : *commandType = ihipCommandCopyH2D; *srcAgent=g_cpu_agent; *dstAgent=streamAgent; break; + case hipMemcpyDeviceToHost : *commandType = ihipCommandCopyD2H; *srcAgent=streamAgent; *dstAgent=g_cpu_agent; break; + case hipMemcpyDeviceToDevice : *commandType = ihipCommandCopyD2D; *srcAgent=streamAgent; *dstAgent=streamAgent; break; default: throw ihipException(hipErrorInvalidMemcpyDirection); }; } @@ -1195,6 +1218,17 @@ void ihipStream_t::copySync(LockedAccessor_StreamCrit_t &crit, void* dst, const hsa_signal_t depSignal; + bool copyEngineCanSeeSrcAndDest = false; + if (kind == hipMemcpyDeviceToDevice) { +#if USE_PEER_TO_PEER>=2 + // TODO - consider refactor. Do we need to support simul access of enable/disable peers with access? + LockedAccessor_DeviceCrit_t dcrit(device->criticalData()); + if (dcrit->isPeer(::getDevice(dstPtrInfo._appId)) && (dcrit->isPeer(::getDevice(srcPtrInfo._appId)))) { + copyEngineCanSeeSrcAndDest = true; + } +#endif + } + if ((kind == hipMemcpyHostToDevice) && (!srcTracked)) { int depSignalCnt = preCopyCommand(crit, NULL, &depSignal, ihipCommandCopyH2D); if (HIP_STAGING_BUFFERS) { @@ -1246,11 +1280,28 @@ void ihipStream_t::copySync(LockedAccessor_StreamCrit_t &crit, void* dst, const tprintf(DB_COPY1, "H2H memcpy dst=%p src=%p sz=%zu\n", dst, src, sizeBytes); memcpy(dst, src, sizeBytes); + } else if ((kind == hipMemcpyDeviceToDevice) && !copyEngineCanSeeSrcAndDest) { + int depSignalCnt = preCopyCommand(crit, NULL, &depSignal, ihipCommandCopyP2P); + if (HIP_STAGING_BUFFERS) { + tprintf(DB_COPY1, "P2P but engine can't see both pointers: staged copy P2P dst=%p src=%p sz=%zu\n", dst, src, sizeBytes); + //printf ("staged-copy- read dep signals\n"); + hsa_agent_t dstAgent = * (static_cast (dstPtrInfo._acc.get_hsa_agent())); + hsa_agent_t srcAgent = * (static_cast (srcPtrInfo._acc.get_hsa_agent())); + + device->_staging_buffer[1]->CopyPeerToPeer(dst, dstAgent, src, srcAgent, sizeBytes, depSignalCnt ? &depSignal : NULL); + + // The copy completes before returning so can reset queue to empty: + this->wait(crit, true); + + } else { + assert(0); // currently no fallback for this path. + } + } else { // If not special case - these can all be handled by the hsa async copy: ihipCommand_t commandType; hsa_agent_t srcAgent, dstAgent; - setCopyAgents(kind, &commandType, &srcAgent, &dstAgent); + setAsyncCopyAgents(kind, &commandType, &srcAgent, &dstAgent); int depSignalCnt = preCopyCommand(crit, NULL, &depSignal, commandType); @@ -1335,7 +1386,7 @@ void ihipStream_t::copyAsync(void* dst, const void* src, size_t sizeBytes, unsig ihipCommand_t commandType; hsa_agent_t srcAgent, dstAgent; - setCopyAgents(kind, &commandType, &srcAgent, &dstAgent); + setAsyncCopyAgents(kind, &commandType, &srcAgent, &dstAgent); hsa_signal_t depSignal; int depSignalCnt = preCopyCommand(crit, ihip_signal, &depSignal, commandType); diff --git a/src/hip_peer.cpp b/src/hip_peer.cpp index d45f95dc6c..abe80ee54d 100644 --- a/src/hip_peer.cpp +++ b/src/hip_peer.cpp @@ -108,7 +108,9 @@ hipError_t hipDeviceEnablePeerAccess (int peerDeviceId, unsigned int flags) } else { auto thisDevice = ihipGetTlsDefaultDevice(); auto peerDevice = ihipGetDevice(peerDeviceId); - if ((thisDevice != NULL) && (peerDevice != NULL)) { + if (thisDevice == peerDevice) { + err = hipErrorInvalidDevice; // Can't enable peer access to self. + } else if ((thisDevice != NULL) && (peerDevice != NULL)) { LockedAccessor_DeviceCrit_t crit(thisDevice->criticalData()); bool isNewPeer = crit->addPeer(peerDevice); if (isNewPeer) { diff --git a/src/staging_buffer.cpp b/src/staging_buffer.cpp index 9ce458797b..ffc1566380 100644 --- a/src/staging_buffer.cpp +++ b/src/staging_buffer.cpp @@ -44,6 +44,7 @@ StagingBuffer::StagingBuffer(hsa_agent_t hsaAgent, hsa_region_t systemRegion, si THROW_ERROR(hipErrorMemoryAllocation); } hsa_signal_create(0, 0, NULL, &_completion_signal[i]); + hsa_signal_create(0, 0, NULL, &_completion_signal2[i]); } }; @@ -57,6 +58,7 @@ StagingBuffer::~StagingBuffer() _pinnedStagingBuffer[i] = NULL; } hsa_signal_destroy(_completion_signal[i]); + hsa_signal_destroy(_completion_signal2[i]); } } @@ -245,9 +247,80 @@ void StagingBuffer::CopyDeviceToHost(void* dst, const void* src, size_t sizeByte dstp1 += theseBytes; } } - - - //for (int i=0; i<_numBuffers; i++) { - // hsa_signal_wait_acquire(_completion_signal[i], HSA_SIGNAL_CONDITION_LT, 1, UINT64_MAX, HSA_WAIT_STATE_ACTIVE); - //} +} + + +//--- +//Copies sizeBytes from src to dst, using either a copy to a staging buffer or a staged pin-in-place strategy +//IN: dst - dest pointer - must be accessible from agent this buffer is associated with (via _hsa_agent). +//IN: src - src pointer for copy. Must be accessible from host CPU. +//IN: waitFor - hsaSignal to wait for - the copy will begin only when the specified dependency is resolved. May be NULL indicating no dependency. +void StagingBuffer::CopyPeerToPeer(void* dst, hsa_agent_t dstAgent, const void* src, hsa_agent_t srcAgent, size_t sizeBytes, hsa_signal_t *waitFor) +{ + std::lock_guard l (_copy_lock); + + const char *srcp0 = static_cast (src); + char *dstp1 = static_cast (dst); + + for (int i=0; i<_numBuffers; i++) { + hsa_signal_store_relaxed(_completion_signal[i], 0); + hsa_signal_store_relaxed(_completion_signal2[i], 0); + } + + if (sizeBytes >= UINT64_MAX/2) { + THROW_ERROR (hipErrorInvalidValue); + } + + int64_t bytesRemaining0 = sizeBytes; // bytes to copy from dest into staging buffer. + int64_t bytesRemaining1 = sizeBytes; // bytes to copy from staging buffer into final dest + + while (bytesRemaining1 > 0) { + // First launch the async copies to copy from dest to host + for (int bufferIndex = 0; (bytesRemaining0>0) && (bufferIndex < _numBuffers); bytesRemaining0 -= _bufferSize, bufferIndex++) { + + size_t theseBytes = (bytesRemaining0 > _bufferSize) ? _bufferSize : bytesRemaining0; + + tprintf (DB_COPY2, "P2P: bytesRemaining0=%zu async_copy %zu bytes src:%p to staging:%p\n", bytesRemaining0, theseBytes, srcp0, _pinnedStagingBuffer[bufferIndex]); + hsa_signal_store_relaxed(_completion_signal[bufferIndex], 1); + hsa_status_t hsa_status = hsa_amd_memory_async_copy(_pinnedStagingBuffer[bufferIndex], srcAgent, srcp0, srcAgent, theseBytes, waitFor ? 1:0, waitFor, _completion_signal[bufferIndex]); + if (hsa_status != HSA_STATUS_SUCCESS) { + THROW_ERROR (hipErrorRuntimeMemory); + } + + srcp0 += theseBytes; + + + // Assume subsequent commands are dependent on previous and don't need dependency after first copy submitted, HIP_ONESHOT_COPY_DEP=1 + waitFor = NULL; + } + + // Now unload the staging buffers: + for (int bufferIndex=0; (bytesRemaining1>0) && (bufferIndex < _numBuffers); bytesRemaining1 -= _bufferSize, bufferIndex++) { + + size_t theseBytes = (bytesRemaining1 > _bufferSize) ? _bufferSize : bytesRemaining1; + + tprintf (DB_COPY2, "P2P: wait_completion[%d] bytesRemaining=%zu\n", bufferIndex, bytesRemaining1); + + bool hostWait = 0; + + if (hostWait) { + // Host-side wait, should not be necessary: + hsa_signal_wait_acquire(_completion_signal[bufferIndex], HSA_SIGNAL_CONDITION_LT, 1, UINT64_MAX, HSA_WAIT_STATE_ACTIVE); + } + + tprintf (DB_COPY2, "P2P: bytesRemaining1=%zu copy %zu bytes stagingBuf[%d]:%p to device:%p\n", bytesRemaining1, theseBytes, bufferIndex, _pinnedStagingBuffer[bufferIndex], dstp1); + memcpy(dstp1, _pinnedStagingBuffer[bufferIndex], theseBytes); + hsa_status_t hsa_status = hsa_amd_memory_async_copy(dstp1, dstAgent, _pinnedStagingBuffer[bufferIndex], dstAgent /*not used*/, theseBytes, + hostWait ? 0:1, hostWait ? NULL : &_completion_signal[bufferIndex], + _completion_signal2[bufferIndex]); + + dstp1 += theseBytes; + } + } + + + // Wait for the staging-buffer to dest copies to complete: + for (int i=0; i<_numBuffers; i++) { + hsa_signal_wait_acquire(_completion_signal2[i], HSA_SIGNAL_CONDITION_LT, 1, UINT64_MAX, HSA_WAIT_STATE_ACTIVE); + } } diff --git a/tests/src/hipPeerToPeer_simple.cpp b/tests/src/hipPeerToPeer_simple.cpp index 2c0dd95b36..d756ee3e18 100644 --- a/tests/src/hipPeerToPeer_simple.cpp +++ b/tests/src/hipPeerToPeer_simple.cpp @@ -88,6 +88,7 @@ void enablePeerFirst() { printf ("\n==testing: %s\n", __func__); + setupPeerTests(); HIPCHECK(hipSetDevice(g_currentDevice)); HIPCHECK(hipDeviceEnablePeerAccess(g_peerDevice, 0)); @@ -111,12 +112,12 @@ void enablePeerFirst() // allocate and initialize memory on device0 HIPCHECK (hipSetDevice(g_currentDevice)); HIPCHECK (hipMalloc(&A_d0, Nbytes) ); - HIPCHECK ( hipMemset(A_d0, memsetval, Nbytes) ); + HIPCHECK (hipMemset(A_d0, memsetval, Nbytes) ); // allocate and initialize memory on peer device HIPCHECK (hipSetDevice(g_peerDevice)); HIPCHECK (hipMalloc(&A_d1, Nbytes) ); - HIPCHECK ( hipMemset(A_d1, 0x13, Nbytes) ); + HIPCHECK (hipMemset(A_d1, 0x13, Nbytes) );