Fixes for P2P and hipDeviceReset
- devicereset would lose track of default stream and thus subsequent
synchronization calls might not actually sychronize.
- Also deviceReset now correctly frees streams.
- fix waits in P2P staging copy - first phase (Device0-to-Staging) must
wait for second phase (Staging to Device1) to finish draining the
buffer.
[ROCm/hip commit: e020d68309]
このコミットが含まれているのは:
@@ -530,12 +530,16 @@ public:
|
||||
bool removePeer(ihipDevice_t *peer);
|
||||
void resetPeers(ihipDevice_t *thisDevice);
|
||||
|
||||
|
||||
void addStream(ihipStream_t *stream);
|
||||
|
||||
uint32_t peerCnt() const { return _peerCnt; };
|
||||
hsa_agent_t *peerAgents() const { return _peerAgents; };
|
||||
|
||||
|
||||
private:
|
||||
std::list<ihipStream_t*> _streams; // streams associated with this device.
|
||||
//std::list< std::shared_ptr<ihipStream_t> > _streams; // streams associated with this device. TODO - convert to shared_ptr.
|
||||
std::list< ihipStream_t* > _streams; // streams associated with this device.
|
||||
ihipStream_t::SeqNum_t _stream_id;
|
||||
|
||||
// These reflect the currently Enabled set of peers for this GPU:
|
||||
|
||||
@@ -172,12 +172,6 @@ hipError_t hipDeviceReset(void)
|
||||
|
||||
|
||||
if (device) {
|
||||
//---
|
||||
//Wait for pending activity to complete? TODO - check if this is required behavior:
|
||||
//TODO, also we have small window between wait and reset.
|
||||
|
||||
device->locked_waitAllStreams();
|
||||
|
||||
// Release device resources (streams and memory):
|
||||
device->locked_reset();
|
||||
}
|
||||
|
||||
@@ -151,9 +151,11 @@ void ihipStream_t::locked_reclaimSignals(SIGSEQNUM sigNum)
|
||||
//---
|
||||
void ihipStream_t::waitCopy(LockedAccessor_StreamCrit_t &crit, ihipSignal_t *signal)
|
||||
{
|
||||
SIGSEQNUM sigNum = signal->_sig_id;
|
||||
tprintf(DB_SYNC, "waitCopy signal:#%lu\n", sigNum);
|
||||
|
||||
hsa_signal_wait_acquire(signal->_hsa_signal, HSA_SIGNAL_CONDITION_LT, 1, UINT64_MAX, HSA_WAIT_STATE_ACTIVE);
|
||||
|
||||
SIGSEQNUM sigNum = signal->_sig_id;
|
||||
|
||||
tprintf(DB_SIGNAL, "waitCopy reclaim signal #%lu\n", sigNum);
|
||||
// Mark all signals older and including this one as available for reclaim
|
||||
@@ -252,6 +254,14 @@ void ihipDeviceCriticalBase_t<DeviceMutex>::resetPeers(ihipDevice_t *thisDevice)
|
||||
addPeer(thisDevice); // peer-list always contains self agent.
|
||||
}
|
||||
|
||||
|
||||
template<>
|
||||
void ihipDeviceCriticalBase_t<DeviceMutex>::addStream(ihipStream_t *stream)
|
||||
{
|
||||
_streams.push_back(stream);
|
||||
stream->_id = incStreamId();
|
||||
}
|
||||
|
||||
//-------------------------------------------------------------------------------------------------
|
||||
|
||||
//---
|
||||
@@ -461,9 +471,29 @@ void ihipDevice_t::locked_reset()
|
||||
// Obtain mutex access to the device critical data, release by destructor
|
||||
LockedAccessor_DeviceCrit_t crit(_criticalData);
|
||||
|
||||
|
||||
//---
|
||||
//Wait for pending activity to complete? TODO - check if this is required behavior:
|
||||
tprintf(DB_SYNC, "locked_reset waiting for activity to complete.\n");
|
||||
|
||||
// Reset and remove streams:
|
||||
// Delete all created streams including the default one.
|
||||
for (auto streamI=crit->const_streams().begin(); streamI!=crit->const_streams().end(); streamI++) {
|
||||
ihipStream_t *stream = *streamI;
|
||||
(*streamI)->locked_wait();
|
||||
tprintf(DB_SYNC, " delete stream=%p\n", stream);
|
||||
|
||||
delete stream;
|
||||
}
|
||||
// Clear the list.
|
||||
crit->streams().clear();
|
||||
|
||||
|
||||
// Create a fresh default stream and add it:
|
||||
_default_stream = new ihipStream_t(_device_index, _acc.get_default_view(), hipStreamDefault);
|
||||
crit->addStream(_default_stream);
|
||||
|
||||
|
||||
// This resest peer list to just me:
|
||||
crit->resetPeers(this);
|
||||
|
||||
@@ -499,10 +529,7 @@ void ihipDevice_t::init(unsigned device_index, unsigned deviceCnt, hc::accelerat
|
||||
|
||||
locked_reset();
|
||||
|
||||
_default_stream = new ihipStream_t(device_index, acc.get_default_view(), hipStreamDefault);
|
||||
locked_addStream(_default_stream);
|
||||
|
||||
|
||||
tprintf(DB_SYNC, "created device with default_stream=%p\n", _default_stream);
|
||||
|
||||
hsa_region_t *pinnedHostRegion;
|
||||
@@ -783,8 +810,7 @@ void ihipDevice_t::locked_addStream(ihipStream_t *s)
|
||||
{
|
||||
LockedAccessor_DeviceCrit_t crit(_criticalData);
|
||||
|
||||
crit->streams().push_back(s);
|
||||
s->_id = crit->incStreamId();
|
||||
crit->addStream(s);
|
||||
}
|
||||
|
||||
//---
|
||||
@@ -914,7 +940,7 @@ void ihipInit()
|
||||
|
||||
READ_ENV_I(release, HIP_LAUNCH_BLOCKING, CUDA_LAUNCH_BLOCKING, "Make HIP APIs 'host-synchronous', so they block until any kernel launches or data copy commands complete. Alias: CUDA_LAUNCH_BLOCKING." );
|
||||
READ_ENV_I(release, HIP_DB, 0, "Print various debug info. Bitmask, see hip_hcc.cpp for more information.");
|
||||
if ((HIP_DB & DB_API) && (HIP_TRACE_API == 0)) {
|
||||
if ((HIP_DB & (1<<DB_API)) && (HIP_TRACE_API == 0)) {
|
||||
// Set HIP_TRACE_API default before we read it, so it is printed correctly.
|
||||
HIP_TRACE_API = 1;
|
||||
}
|
||||
|
||||
@@ -381,7 +381,7 @@ hipError_t hipMemsetAsync(void* dst, int value, size_t sizeBytes, hipStream_t s
|
||||
hc::completion_future cf ;
|
||||
|
||||
if ((sizeBytes & 0x3) == 0) {
|
||||
// use a faster word-per-workitem copy:
|
||||
// use a faster dword-per-workitem copy:
|
||||
try {
|
||||
value = value & 0xff;
|
||||
unsigned value32 = (value << 24) | (value << 16) | (value << 8) | (value) ;
|
||||
@@ -404,9 +404,9 @@ hipError_t hipMemsetAsync(void* dst, int value, size_t sizeBytes, hipStream_t s
|
||||
|
||||
|
||||
if (HIP_LAUNCH_BLOCKING) {
|
||||
tprintf (DB_SYNC, "'%s' LAUNCH_BLOCKING wait for completion [stream:%p].\n", __func__, (void*)stream);
|
||||
tprintf (DB_SYNC, "'%s' LAUNCH_BLOCKING wait for memset [stream:%p].\n", __func__, (void*)stream);
|
||||
cf.wait();
|
||||
tprintf (DB_SYNC, "'%s' LAUNCH_BLOCKING completed [stream:%p].\n", __func__, (void*)stream);
|
||||
tprintf (DB_SYNC, "'%s' LAUNCH_BLOCKING memset completed [stream:%p].\n", __func__, (void*)stream);
|
||||
}
|
||||
} else {
|
||||
e = hipErrorInvalidValue;
|
||||
|
||||
@@ -282,6 +282,9 @@ void StagingBuffer::CopyPeerToPeer(void* dst, hsa_agent_t dstAgent, const void*
|
||||
|
||||
size_t theseBytes = (bytesRemaining0 > _bufferSize) ? _bufferSize : bytesRemaining0;
|
||||
|
||||
// Wait to make sure we are not overwriting a buffer before it has been drained:
|
||||
hsa_signal_wait_acquire(_completion_signal2[bufferIndex], HSA_SIGNAL_CONDITION_LT, 1, UINT64_MAX, HSA_WAIT_STATE_ACTIVE);
|
||||
|
||||
tprintf (DB_COPY2, "P2P: bytesRemaining0=%zu async_copy %zu bytes src:%p to staging:%p\n", bytesRemaining0, theseBytes, srcp0, _pinnedStagingBuffer[bufferIndex]);
|
||||
hsa_signal_store_relaxed(_completion_signal[bufferIndex], 1);
|
||||
hsa_status_t hsa_status = hsa_amd_memory_async_copy(_pinnedStagingBuffer[bufferIndex], srcAgent, srcp0, srcAgent, theseBytes, waitFor ? 1:0, waitFor, _completion_signal[bufferIndex]);
|
||||
@@ -311,7 +314,7 @@ void StagingBuffer::CopyPeerToPeer(void* dst, hsa_agent_t dstAgent, const void*
|
||||
}
|
||||
|
||||
tprintf (DB_COPY2, "P2P: bytesRemaining1=%zu copy %zu bytes stagingBuf[%d]:%p to device:%p\n", bytesRemaining1, theseBytes, bufferIndex, _pinnedStagingBuffer[bufferIndex], dstp1);
|
||||
memcpy(dstp1, _pinnedStagingBuffer[bufferIndex], theseBytes);
|
||||
hsa_signal_store_relaxed(_completion_signal2[bufferIndex], 1);
|
||||
hsa_status_t hsa_status = hsa_amd_memory_async_copy(dstp1, dstAgent, _pinnedStagingBuffer[bufferIndex], dstAgent /*not used*/, theseBytes,
|
||||
hostWait ? 0:1, hostWait ? NULL : &_completion_signal[bufferIndex],
|
||||
_completion_signal2[bufferIndex]);
|
||||
|
||||
@@ -113,37 +113,22 @@ void enablePeerFirst()
|
||||
HIPCHECK (hipSetDevice(g_currentDevice));
|
||||
HIPCHECK (hipMalloc(&A_d0, Nbytes) );
|
||||
HIPCHECK (hipMemset(A_d0, memsetval, Nbytes) );
|
||||
// TODO - remove me:
|
||||
HIPCHECK (hipDeviceSynchronize());
|
||||
|
||||
// allocate and initialize memory on peer device
|
||||
HIPCHECK (hipSetDevice(g_peerDevice));
|
||||
HIPCHECK (hipMalloc(&A_d1, Nbytes) );
|
||||
HIPCHECK (hipMemset(A_d1, 0x13, Nbytes) );
|
||||
|
||||
// TODO - remove me:
|
||||
HIPCHECK (hipDeviceSynchronize());
|
||||
|
||||
|
||||
// Device0 push to device1, using P2P:
|
||||
HIPCHECK (hipSetDevice(p_memcpyWithPeer ? g_peerDevice : g_currentDevice));
|
||||
HIPCHECK (hipMemcpy(A_d1, A_d0, Nbytes, hipMemcpyDefault)); // This is P2P copy.
|
||||
|
||||
// TODO - remove me:
|
||||
if (1) {
|
||||
HIPCHECK (hipSetDevice(g_currentDevice));
|
||||
HIPCHECK (hipDeviceSynchronize());
|
||||
HIPCHECK (hipSetDevice(g_peerDevice));
|
||||
HIPCHECK (hipDeviceSynchronize());
|
||||
}
|
||||
|
||||
// Copy data back to host:
|
||||
HIPCHECK (hipSetDevice(g_peerDevice));
|
||||
HIPCHECK (hipMemcpy(A_h, A_d1, Nbytes, hipMemcpyDeviceToHost));
|
||||
|
||||
// TODO - remove me:
|
||||
HIPCHECK (hipDeviceSynchronize());
|
||||
|
||||
// Check host data:
|
||||
for (int i=0; i<N; i++) {
|
||||
if (A_h[i] != memsetval) {
|
||||
|
||||
新しいイシューから参照
ユーザーをブロックする