P2P Update.

- add P2P staging buffer copy.
- If copy device does not have sufficient access permissions, fall back
  to staging buffer.
- improve docs for which copy device is used.
此提交包含在:
Ben Sander
2016-04-16 10:18:56 -05:00
父節點 22d15dcdbc
當前提交 dcabc9dbf7
共有 7 個檔案被更改,包括 172 行新增28 行删除
+7 -4
查看文件
@@ -38,7 +38,7 @@ THE SOFTWARE.
// Compile peer-to-peer support.
// >= 2 : use HCC hc:accelerator::get_is_peer
// >= 3 : use hc::am_memtracker_update_peers(...)
#define USE_PEER_TO_PEER 0
#define USE_PEER_TO_PEER 2
// Use new lock API in HCC:
#define USE_HCC_LOCK 0
@@ -247,11 +247,12 @@ enum ihipCommand_t {
ihipCommandCopyH2D,
ihipCommandCopyD2H,
ihipCommandCopyD2D,
ihipCommandCopyP2P,
ihipCommandKernel,
};
static const char* ihipCommandName[] = {
"CopyH2H", "CopyH2D", "CopyD2H", "CopyD2D", "Kernel"
"CopyH2H", "CopyH2D", "CopyD2H", "CopyD2D", "CopyP2P", "Kernel"
};
@@ -451,7 +452,7 @@ private:
// The unsigned return is hipMemcpyKind
unsigned resolveMemcpyDirection(bool srcInDeviceMem, bool dstInDeviceMem);
void setCopyAgents(unsigned kind, ihipCommand_t *commandType, hsa_agent_t *srcAgent, hsa_agent_t *dstAgent);
void setAsyncCopyAgents(unsigned kind, ihipCommand_t *commandType, hsa_agent_t *srcAgent, hsa_agent_t *dstAgent);
unsigned _device_index; // index into the g_device array
@@ -527,6 +528,7 @@ public:
// "Allocate" a stream ID:
ihipStream_t::SeqNum_t incStreamId() { return _stream_id++; };
bool isPeer(const ihipDevice_t *peer); // returns Trus if peer has access to memory physically located on this device.
bool addPeer(ihipDevice_t *peer);
bool removePeer(ihipDevice_t *peer);
void resetPeers(ihipDevice_t *thisDevice);
@@ -540,9 +542,10 @@ private:
ihipStream_t::SeqNum_t _stream_id;
// These reflect the currently Enabled set of peers for this GPU:
// Enabled peers have permissions to access the memory physically allocated on this device.
std::list<ihipDevice_t*> _peers; // list of enabled peer devices.
uint32_t _peerCnt; // number of enabled peers
hsa_agent_t *_peerAgents; // efficient packed array of enabled agents (to use for allocations.)
hsa_agent_t *_peerAgents; // efficient packed array of enabled agents (to use for allocations.)
private:
void recomputePeerAgents();
};
+16 -5
查看文件
@@ -798,6 +798,12 @@ hipError_t hipHostFree(void* ptr);
* device to host, device to device and host to host
* The src and dst must not overlap.
*
* For hipMemcpy, the copy is always performed by the current device (set by hipSetDevice).
* For multi-gpu or peer-to-peer configurations, it is recommended to set the current device to the device where the src data is physically located.
* For optimal peer-to-peer copies, the copy device must be able to access the src and dst pointers (by calling hipDeviceEnablePeerAccess with copy agent as the
* current device and src/dest as the peerDevice argument. if this is not done, the hipMemcpy will still work, but will perform the copy using a staging buffer
* on the host.
*
* @param[out] dst Data being copy to
* @param[in] src Data being copy from
* @param[in] sizeBytes Data size in bytes
@@ -830,6 +836,13 @@ hipError_t hipMemcpyToSymbol(const char* symbolName, const void *src, size_t siz
* @warning If host or dest are not pinned, the memory copy will be performed synchronously. For best performance, use hipHostMalloc to
* allocate host memory that is transferred asynchronously.
*
* For hipMemcpy, the copy is always performed by the device associated with the specified stream.
*
* For multi-gpu or peer-to-peer configurations, it is recommended to use a stream which is a attached to the device where the src data is physically located.
* For optimal peer-to-peer copies, the copy device must be able to access the src and dst pointers (by calling hipDeviceEnablePeerAccess with copy agent as the
* current device and src/dest as the peerDevice argument. if this is not done, the hipMemcpy will still work, but will perform the copy using a staging buffer
* on the host.
*
* @param[out] dst Data being copy to
* @param[in] src Data being copy from
* @param[in] sizeBytes Data size in bytes
@@ -902,17 +915,15 @@ hipError_t hipMemGetInfo (size_t * free, size_t * total) ;
/**
* @brief Determine if a device can access a peer's memory.
*
* @param [out] canAccessPeer returns true if specified devices are peers.
* @param [in] device
* @param [in] peerDevice
* @param [out] canAccessPeer Returns the peer access capability (0 or 1)
* @param [in] device - device from where memory may be accessed.
* @param [in] peerDevice - device where memory is physically located
*
* Returns "1" in @p canAccessPeer if the specified @p device is capable
* of directly accessing memory physically located on peerDevice , or "0" if not.
*
* Returns "0" in @p canAccessPeer if deviceId == peerDeviceId, and both are valid devices : a device is not a peer of itself.
*
*
*
* @returns #hipSuccess,
* @returns #hipErrorInvalidDevice if deviceId or peerDeviceId are not valid devices
*/
+3
查看文件
@@ -50,6 +50,8 @@ struct StagingBuffer {
void CopyDeviceToHost (void* dst, const void* src, size_t sizeBytes, hsa_signal_t *waitFor);
void CopyDeviceToHostPinInPlace(void* dst, const void* src, size_t sizeBytes, hsa_signal_t *waitFor);
void CopyPeerToPeer( void* dst, hsa_agent_t dstAgent, const void* src, hsa_agent_t srcAgent, size_t sizeBytes, hsa_signal_t *waitFor);
private:
hsa_agent_t _hsa_agent;
@@ -58,6 +60,7 @@ private:
char *_pinnedStagingBuffer[_max_buffers];
hsa_signal_t _completion_signal[_max_buffers];
hsa_signal_t _completion_signal2[_max_buffers]; // P2P needs another set of signals.
std::mutex _copy_lock; // provide thread-safe access
};
+62 -11
查看文件
@@ -210,6 +210,14 @@ void ihipDeviceCriticalBase_t<DeviceMutex>::recomputePeerAgents()
}
template<>
bool ihipDeviceCriticalBase_t<DeviceMutex>::isPeer(const ihipDevice_t *peer)
{
auto match = std::find(_peers.begin(), _peers.end(), peer);
return (match != std::end(_peers));
}
template<>
bool ihipDeviceCriticalBase_t<DeviceMutex>::addPeer(ihipDevice_t *peer)
{
@@ -252,16 +260,24 @@ void ihipDeviceCriticalBase_t<DeviceMutex>::resetPeers(ihipDevice_t *thisDevice)
//-------------------------------------------------------------------------------------------------
//---
ihipDevice_t * ihipStream_t::getDevice() const
//Flavor that takes device index.
ihipDevice_t * getDevice(unsigned deviceIndex)
{
if (ihipIsValidDevice(_device_index)) {
return &g_devices[_device_index];
if (ihipIsValidDevice(deviceIndex)) {
return &g_devices[deviceIndex];
} else {
return NULL;
}
};
//---
ihipDevice_t * ihipStream_t::getDevice() const
{
return ::getDevice(_device_index);
};
//---
// Allocate a new signal from the signal pool.
// Returned signals have value of 0.
@@ -1155,16 +1171,23 @@ unsigned ihipStream_t::resolveMemcpyDirection(bool srcInDeviceMem, bool dstInDev
// Setup the copyCommandType and the copy agents (for hsa_amd_memory_async_copy)
void ihipStream_t::setCopyAgents(unsigned kind, ihipCommand_t *commandType, hsa_agent_t *srcAgent, hsa_agent_t *dstAgent)
// srcPhysAcc is the physical location of the src data. For many copies this is the
void ihipStream_t::setAsyncCopyAgents(unsigned kind, ihipCommand_t *commandType, hsa_agent_t *srcAgent, hsa_agent_t *dstAgent)
{
ihipDevice_t *device = this->getDevice();
hsa_agent_t deviceAgent = device->_hsa_agent;
// current* represents the device associated with the specified stream.
ihipDevice_t *streamDevice = this->getDevice();
hsa_agent_t streamAgent = streamDevice->_hsa_agent;
// ROCR runtime logic is :
// - If both src and dst are cpu agent, launch thread and memcpy. We want to avoid this.
// - If either/both src or dst is a gpu agent, use the first gpu agent’s DMA engine to perform the copy.
switch (kind) {
//case hipMemcpyHostToHost : *commandType = ihipCommandCopyH2H; *srcAgent=streamAgent; *dstAgent=streamAgent; break; // TODO - enable me, for async copy use SDMA.
case hipMemcpyHostToHost : *commandType = ihipCommandCopyH2H; *srcAgent=g_cpu_agent; *dstAgent=g_cpu_agent; break;
case hipMemcpyHostToDevice : *commandType = ihipCommandCopyH2D; *srcAgent=g_cpu_agent; *dstAgent=deviceAgent; break;
case hipMemcpyDeviceToHost : *commandType = ihipCommandCopyD2H; *srcAgent=deviceAgent; *dstAgent=g_cpu_agent; break;
case hipMemcpyDeviceToDevice : *commandType = ihipCommandCopyD2D; *srcAgent=deviceAgent; *dstAgent=deviceAgent; break;
case hipMemcpyHostToDevice : *commandType = ihipCommandCopyH2D; *srcAgent=g_cpu_agent; *dstAgent=streamAgent; break;
case hipMemcpyDeviceToHost : *commandType = ihipCommandCopyD2H; *srcAgent=streamAgent; *dstAgent=g_cpu_agent; break;
case hipMemcpyDeviceToDevice : *commandType = ihipCommandCopyD2D; *srcAgent=streamAgent; *dstAgent=streamAgent; break;
default: throw ihipException(hipErrorInvalidMemcpyDirection);
};
}
@@ -1195,6 +1218,17 @@ void ihipStream_t::copySync(LockedAccessor_StreamCrit_t &crit, void* dst, const
hsa_signal_t depSignal;
bool copyEngineCanSeeSrcAndDest = false;
if (kind == hipMemcpyDeviceToDevice) {
#if USE_PEER_TO_PEER>=2
// TODO - consider refactor. Do we need to support simul access of enable/disable peers with access?
LockedAccessor_DeviceCrit_t dcrit(device->criticalData());
if (dcrit->isPeer(::getDevice(dstPtrInfo._appId)) && (dcrit->isPeer(::getDevice(srcPtrInfo._appId)))) {
copyEngineCanSeeSrcAndDest = true;
}
#endif
}
if ((kind == hipMemcpyHostToDevice) && (!srcTracked)) {
int depSignalCnt = preCopyCommand(crit, NULL, &depSignal, ihipCommandCopyH2D);
if (HIP_STAGING_BUFFERS) {
@@ -1246,11 +1280,28 @@ void ihipStream_t::copySync(LockedAccessor_StreamCrit_t &crit, void* dst, const
tprintf(DB_COPY1, "H2H memcpy dst=%p src=%p sz=%zu\n", dst, src, sizeBytes);
memcpy(dst, src, sizeBytes);
} else if ((kind == hipMemcpyDeviceToDevice) && !copyEngineCanSeeSrcAndDest) {
int depSignalCnt = preCopyCommand(crit, NULL, &depSignal, ihipCommandCopyP2P);
if (HIP_STAGING_BUFFERS) {
tprintf(DB_COPY1, "P2P but engine can't see both pointers: staged copy P2P dst=%p src=%p sz=%zu\n", dst, src, sizeBytes);
//printf ("staged-copy- read dep signals\n");
hsa_agent_t dstAgent = * (static_cast<hsa_agent_t*> (dstPtrInfo._acc.get_hsa_agent()));
hsa_agent_t srcAgent = * (static_cast<hsa_agent_t*> (srcPtrInfo._acc.get_hsa_agent()));
device->_staging_buffer[1]->CopyPeerToPeer(dst, dstAgent, src, srcAgent, sizeBytes, depSignalCnt ? &depSignal : NULL);
// The copy completes before returning so can reset queue to empty:
this->wait(crit, true);
} else {
assert(0); // currently no fallback for this path.
}
} else {
// If not special case - these can all be handled by the hsa async copy:
ihipCommand_t commandType;
hsa_agent_t srcAgent, dstAgent;
setCopyAgents(kind, &commandType, &srcAgent, &dstAgent);
setAsyncCopyAgents(kind, &commandType, &srcAgent, &dstAgent);
int depSignalCnt = preCopyCommand(crit, NULL, &depSignal, commandType);
@@ -1335,7 +1386,7 @@ void ihipStream_t::copyAsync(void* dst, const void* src, size_t sizeBytes, unsig
ihipCommand_t commandType;
hsa_agent_t srcAgent, dstAgent;
setCopyAgents(kind, &commandType, &srcAgent, &dstAgent);
setAsyncCopyAgents(kind, &commandType, &srcAgent, &dstAgent);
hsa_signal_t depSignal;
int depSignalCnt = preCopyCommand(crit, ihip_signal, &depSignal, commandType);
+3 -1
查看文件
@@ -108,7 +108,9 @@ hipError_t hipDeviceEnablePeerAccess (int peerDeviceId, unsigned int flags)
} else {
auto thisDevice = ihipGetTlsDefaultDevice();
auto peerDevice = ihipGetDevice(peerDeviceId);
if ((thisDevice != NULL) && (peerDevice != NULL)) {
if (thisDevice == peerDevice) {
err = hipErrorInvalidDevice; // Can't enable peer access to self.
} else if ((thisDevice != NULL) && (peerDevice != NULL)) {
LockedAccessor_DeviceCrit_t crit(thisDevice->criticalData());
bool isNewPeer = crit->addPeer(peerDevice);
if (isNewPeer) {
+78 -5
查看文件
@@ -44,6 +44,7 @@ StagingBuffer::StagingBuffer(hsa_agent_t hsaAgent, hsa_region_t systemRegion, si
THROW_ERROR(hipErrorMemoryAllocation);
}
hsa_signal_create(0, 0, NULL, &_completion_signal[i]);
hsa_signal_create(0, 0, NULL, &_completion_signal2[i]);
}
};
@@ -57,6 +58,7 @@ StagingBuffer::~StagingBuffer()
_pinnedStagingBuffer[i] = NULL;
}
hsa_signal_destroy(_completion_signal[i]);
hsa_signal_destroy(_completion_signal2[i]);
}
}
@@ -245,9 +247,80 @@ void StagingBuffer::CopyDeviceToHost(void* dst, const void* src, size_t sizeByte
dstp1 += theseBytes;
}
}
//for (int i=0; i<_numBuffers; i++) {
// hsa_signal_wait_acquire(_completion_signal[i], HSA_SIGNAL_CONDITION_LT, 1, UINT64_MAX, HSA_WAIT_STATE_ACTIVE);
//}
}
//---
//Copies sizeBytes from src to dst, using either a copy to a staging buffer or a staged pin-in-place strategy
//IN: dst - dest pointer - must be accessible from agent this buffer is associated with (via _hsa_agent).
//IN: src - src pointer for copy. Must be accessible from host CPU.
//IN: waitFor - hsaSignal to wait for - the copy will begin only when the specified dependency is resolved. May be NULL indicating no dependency.
void StagingBuffer::CopyPeerToPeer(void* dst, hsa_agent_t dstAgent, const void* src, hsa_agent_t srcAgent, size_t sizeBytes, hsa_signal_t *waitFor)
{
std::lock_guard<std::mutex> l (_copy_lock);
const char *srcp0 = static_cast<const char*> (src);
char *dstp1 = static_cast<char*> (dst);
for (int i=0; i<_numBuffers; i++) {
hsa_signal_store_relaxed(_completion_signal[i], 0);
hsa_signal_store_relaxed(_completion_signal2[i], 0);
}
if (sizeBytes >= UINT64_MAX/2) {
THROW_ERROR (hipErrorInvalidValue);
}
int64_t bytesRemaining0 = sizeBytes; // bytes to copy from dest into staging buffer.
int64_t bytesRemaining1 = sizeBytes; // bytes to copy from staging buffer into final dest
while (bytesRemaining1 > 0) {
// First launch the async copies to copy from dest to host
for (int bufferIndex = 0; (bytesRemaining0>0) && (bufferIndex < _numBuffers); bytesRemaining0 -= _bufferSize, bufferIndex++) {
size_t theseBytes = (bytesRemaining0 > _bufferSize) ? _bufferSize : bytesRemaining0;
tprintf (DB_COPY2, "P2P: bytesRemaining0=%zu async_copy %zu bytes src:%p to staging:%p\n", bytesRemaining0, theseBytes, srcp0, _pinnedStagingBuffer[bufferIndex]);
hsa_signal_store_relaxed(_completion_signal[bufferIndex], 1);
hsa_status_t hsa_status = hsa_amd_memory_async_copy(_pinnedStagingBuffer[bufferIndex], srcAgent, srcp0, srcAgent, theseBytes, waitFor ? 1:0, waitFor, _completion_signal[bufferIndex]);
if (hsa_status != HSA_STATUS_SUCCESS) {
THROW_ERROR (hipErrorRuntimeMemory);
}
srcp0 += theseBytes;
// Assume subsequent commands are dependent on previous and don't need dependency after first copy submitted, HIP_ONESHOT_COPY_DEP=1
waitFor = NULL;
}
// Now unload the staging buffers:
for (int bufferIndex=0; (bytesRemaining1>0) && (bufferIndex < _numBuffers); bytesRemaining1 -= _bufferSize, bufferIndex++) {
size_t theseBytes = (bytesRemaining1 > _bufferSize) ? _bufferSize : bytesRemaining1;
tprintf (DB_COPY2, "P2P: wait_completion[%d] bytesRemaining=%zu\n", bufferIndex, bytesRemaining1);
bool hostWait = 0;
if (hostWait) {
// Host-side wait, should not be necessary:
hsa_signal_wait_acquire(_completion_signal[bufferIndex], HSA_SIGNAL_CONDITION_LT, 1, UINT64_MAX, HSA_WAIT_STATE_ACTIVE);
}
tprintf (DB_COPY2, "P2P: bytesRemaining1=%zu copy %zu bytes stagingBuf[%d]:%p to device:%p\n", bytesRemaining1, theseBytes, bufferIndex, _pinnedStagingBuffer[bufferIndex], dstp1);
memcpy(dstp1, _pinnedStagingBuffer[bufferIndex], theseBytes);
hsa_status_t hsa_status = hsa_amd_memory_async_copy(dstp1, dstAgent, _pinnedStagingBuffer[bufferIndex], dstAgent /*not used*/, theseBytes,
hostWait ? 0:1, hostWait ? NULL : &_completion_signal[bufferIndex],
_completion_signal2[bufferIndex]);
dstp1 += theseBytes;
}
}
// Wait for the staging-buffer to dest copies to complete:
for (int i=0; i<_numBuffers; i++) {
hsa_signal_wait_acquire(_completion_signal2[i], HSA_SIGNAL_CONDITION_LT, 1, UINT64_MAX, HSA_WAIT_STATE_ACTIVE);
}
}
+3 -2
查看文件
@@ -88,6 +88,7 @@ void enablePeerFirst()
{
printf ("\n==testing: %s\n", __func__);
setupPeerTests();
HIPCHECK(hipSetDevice(g_currentDevice));
HIPCHECK(hipDeviceEnablePeerAccess(g_peerDevice, 0));
@@ -111,12 +112,12 @@ void enablePeerFirst()
// allocate and initialize memory on device0
HIPCHECK (hipSetDevice(g_currentDevice));
HIPCHECK (hipMalloc(&A_d0, Nbytes) );
HIPCHECK ( hipMemset(A_d0, memsetval, Nbytes) );
HIPCHECK (hipMemset(A_d0, memsetval, Nbytes) );
// allocate and initialize memory on peer device
HIPCHECK (hipSetDevice(g_peerDevice));
HIPCHECK (hipMalloc(&A_d1, Nbytes) );
HIPCHECK ( hipMemset(A_d1, 0x13, Nbytes) );
HIPCHECK (hipMemset(A_d1, 0x13, Nbytes) );