From 931b58f43883fdb207eb95bf7bc2124ccd1a0e3f Mon Sep 17 00:00:00 2001 From: Ben Sander Date: Wed, 16 Mar 2016 21:16:29 -0500 Subject: [PATCH] Checkpoint code cleanup. - Refactor ihipStream in prep for thread-safe implementation. - Do some work on PinInPlace implementation. [ROCm/clr commit: 8acb53e160cbaf7230d82dd3c85d4aa52ccd566a] --- projects/clr/hipamd/src/hip_hcc.cpp | 116 ++++++++++++++++++---------- 1 file changed, 76 insertions(+), 40 deletions(-) diff --git a/projects/clr/hipamd/src/hip_hcc.cpp b/projects/clr/hipamd/src/hip_hcc.cpp index 41aea51c0c..6f28f86d9c 100644 --- a/projects/clr/hipamd/src/hip_hcc.cpp +++ b/projects/clr/hipamd/src/hip_hcc.cpp @@ -201,6 +201,8 @@ typedef FakeMutex StreamMutex; #endif +// TODO - move async copy code into stream? Stream->async-copy. +// Add PreCopy / PostCopy to manage locks? // Internal stream structure. class ihipStream_t { @@ -209,28 +211,42 @@ public: ihipStream_t(unsigned device_index, hc::accelerator_view av, unsigned int flags); ~ihipStream_t(); - inline void reclaimSignals(SIGSEQNUM sigNum); - inline void waitAndReclaimOlder(ihipSignal_t *signal); - inline void wait(); - inline ihipDevice_t * getDevice() const; - - ihipSignal_t * getSignal() ; + void ihipSyncCopy(void* dst, const void* src, size_t sizeBytes, hipMemcpyKind kind); + //--- + // Thread-safe accessors - these acquire / release mutex: inline bool preKernelCommand(); inline void postKernelCommand(hc::completion_future &kernel_future); - inline int copyCommand(ihipSignal_t *lastCopy, hsa_signal_t *waitSignal, ihipCommand_t copyType); - inline void resetToEmpty(); + inline int preCopyCommand(ihipSignal_t *lastCopy, hsa_signal_t *waitSignal, ihipCommand_t copyType); + inline void postCopyCommand(); + inline void reclaimSignals_ts(SIGSEQNUM sigNum); + inline void wait(); + + + + // Non-threadsafe accessors - must be protected by high-level stream lock: inline SIGSEQNUM lastCopySeqId() { return _last_copy_signal ? _last_copy_signal->_sig_id : 0; }; + ihipSignal_t * allocSignal(); + + + //-- Non-racy accessors: + // These functions access fields set at initialization time and are non-racy (so do not acquire mutex) + inline ihipDevice_t * getDevice() const; StreamMutex & mutex() {return _mutex;}; //--- + //Member vars - these are set at initialization: hc::accelerator_view _av; unsigned _flags; private: void enqueueBarrier(hsa_queue_t* queue, ihipSignal_t *depSignal); + inline void waitCopy(ihipSignal_t *signal); + inline void resetToEmpty(); + + //--- unsigned _device_index; ihipCommand_t _last_command_type; // type of the last command @@ -417,7 +433,7 @@ void ihipStream_t::resetToEmpty() } //--- -void ihipStream_t::reclaimSignals(SIGSEQNUM sigNum) +void ihipStream_t::reclaimSignals_ts(SIGSEQNUM sigNum) { tprintf(DB_SIGNAL, "reclaim signal #%lu\n", sigNum); // Mark all signals older and including this one as available for @@ -426,23 +442,27 @@ void ihipStream_t::reclaimSignals(SIGSEQNUM sigNum) //--- -void ihipStream_t::waitAndReclaimOlder(ihipSignal_t *signal) +void ihipStream_t::waitCopy(ihipSignal_t *signal) { hsa_signal_wait_acquire(_last_copy_signal->_hsa_signal, HSA_SIGNAL_CONDITION_LT, 1, UINT64_MAX, HSA_WAIT_STATE_ACTIVE); - reclaimSignals(_last_copy_signal->_sig_id); + SIGSEQNUM sigNum = _last_copy_signal->_sig_id; + + tprintf(DB_SIGNAL, "reclaim signal #%lu\n", sigNum); + // Mark all signals older and including this one as available for + _oldest_live_sig_id = sigNum+1; } //--- -//Wait for all queues kernels in the associated accelerator_view to complete. +//Wait for all kernel and data copy commands in this stream to complete. void ihipStream_t::wait() { tprintf (DB_SYNC, "stream %p wait for queue-empty and lastCopy:#%lu...\n", this, _last_copy_signal ? _last_copy_signal->_sig_id: 0x0 ); _av.wait(); if (_last_copy_signal) { - this->waitAndReclaimOlder(_last_copy_signal); + this->waitCopy(_last_copy_signal); } resetToEmpty(); @@ -464,7 +484,7 @@ inline ihipDevice_t * ihipStream_t::getDevice() const // Allocate a new signal from the signal pool. // Returned signals have value of 0. // Signals are intended for use in this stream and are always reclaimed "in-order". -ihipSignal_t *ihipStream_t::getSignal() +ihipSignal_t *ihipStream_t::allocSignal() { int numToScan = _signalPool.size(); do { @@ -489,7 +509,7 @@ ihipSignal_t *ihipStream_t::getSignal() _signalCursor = _signalPool.size(); // set to the beginning of the new entries: _signalPool.resize(_signalPool.size() * 2); tprintf (DB_SIGNAL, "grow signal pool to %zu entries, cursor=%d\n", _signalPool.size(), _signalCursor); - return getSignal(); // try again, + return allocSignal(); // try again, // Should never reach here. assert(0); @@ -551,7 +571,7 @@ inline bool ihipStream_t::preKernelCommand() } else if (HIP_DISABLE_HW_KERNEL_DEP>0) { tprintf (DB_SYNC, "stream %p switch %s to %s (HOST wait for previous...)\n", this, ihipCommandName[_last_command_type], ihipCommandName[ihipCommandKernel]); - this->waitAndReclaimOlder(_last_copy_signal); + this->waitCopy(_last_copy_signal); } else if (HIP_DISABLE_HW_KERNEL_DEP==-1) { tprintf (DB_SYNC, "stream %p switch %s to %s (IGNORE dependency)\n", this, ihipCommandName[_last_command_type], ihipCommandName[ihipCommandKernel]); @@ -577,11 +597,14 @@ inline void ihipStream_t::postKernelCommand(hc::completion_future &kernelFuture) //--- // Called whenever a copy command is set to the stream. // Examines the last command sent to this stream and returns a signal to wait on, if required. -inline int ihipStream_t::copyCommand(ihipSignal_t *lastCopy, hsa_signal_t *waitSignal, ihipCommand_t copyType) +int ihipStream_t::preCopyCommand(ihipSignal_t *lastCopy, hsa_signal_t *waitSignal, ihipCommand_t copyType) { int needSync = 0; waitSignal->handle = 0; + + //_mutex.lock(); // will be unlocked in postCopyCommand + // If switching command types, we need to add a barrier packet to synchronize things. if (FORCE_SAMEDIR_COPY_DEP || (_last_command_type != copyType)) { @@ -622,6 +645,13 @@ inline int ihipStream_t::copyCommand(ihipSignal_t *lastCopy, hsa_signal_t *waitS } +//--- +inline void ihipStream_t::postCopyCommand() +{ + //_mutex.unlock(); +} + + //================================================================================================= // //Reset the device - this is called from hipDeviceReset. @@ -1139,9 +1169,6 @@ inline hipStream_t ihipSyncAndResolveStream(hipStream_t stream) - - - // TODO - data-up to data-down: // Called just before a kernel is launched from hipLaunchKernel. // Allows runtime to track some information about the stream. @@ -1351,8 +1378,6 @@ hipError_t hipDeviceReset(void) device->reset(); // re-allocate required resources. } - // TODO - reset all streams on the device. - return ihipLogStatus(hipSuccess); } @@ -1472,6 +1497,8 @@ hipError_t hipGetLastError() ihipLogStatus(hipSuccess); } + +//--- hipError_t hipPeakAtLastError() { std::call_once(hip_initialized, ihipInit); @@ -1547,6 +1574,8 @@ hipError_t hipStreamCreateWithFlags(hipStream_t *stream, unsigned int flags) return ihipLogStatus(hipSuccess); } + +//--- /** * @bug This function conservatively waits for all work in the specified stream to complete. */ @@ -1558,7 +1587,9 @@ hipError_t hipStreamWaitEvent(hipStream_t stream, hipEvent_t event, unsigned int hipError_t e = hipSuccess; { - // Super-conservative version of this - TODO - remove me: + // TODO-hcc Convert to use create_blocking_marker(...) functionality. + // Currently we have a super-conservative version of this - block on host, and drain the queue. + // This should create a barrier packet in the target queue. stream->wait(); e = hipSuccess; } @@ -1567,6 +1598,7 @@ hipError_t hipStreamWaitEvent(hipStream_t stream, hipEvent_t event, unsigned int }; +//--- hipError_t hipStreamSynchronize(hipStream_t stream) { std::call_once(hip_initialized, ihipInit); @@ -1740,7 +1772,7 @@ hipError_t hipEventSynchronize(hipEvent_t event) #else eh->_marker.wait(); #endif - eh->_stream->reclaimSignals(eh->_copy_seq_id); + eh->_stream->reclaimSignals_ts(eh->_copy_seq_id); return ihipLogStatus(hipSuccess); } @@ -2055,9 +2087,8 @@ hipError_t hipHostAlloc(void** ptr, size_t sizeBytes, unsigned int flags){ if(device){ if(flags & hipHostAllocDefault){ - const unsigned am_flags = amHostPinned; - *ptr = hc::am_alloc(sizeBytes, device->_acc, am_flags); + *ptr = hc::am_alloc(sizeBytes, device->_acc, amHostPinned); if(sizeBytes && (*ptr == NULL)){ hip_status = hipErrorMemoryAllocation; }else{ @@ -2066,9 +2097,8 @@ hipError_t hipHostAlloc(void** ptr, size_t sizeBytes, unsigned int flags){ tprintf(DB_MEM, " %s: pinned ptr=%p\n", __func__, *ptr); } if(flags & hipHostAllocMapped){ - const unsigned am_flags = amHostPinned; - *ptr = hc::am_alloc(sizeBytes, device->_acc, am_flags); + *ptr = hc::am_alloc(sizeBytes, device->_acc, amHostPinned); if(sizeBytes && (*ptr == NULL)){ hip_status = hipErrorMemoryAllocation; }else{ @@ -2189,7 +2219,7 @@ hipError_t hipMemcpyToSymbol(const char* symbolName, const void *src, size_t cou auto device = ihipGetTlsDefaultDevice(); //hsa_signal_t depSignal; - //int depSignalCnt = device._null_stream->copyCommand(NULL, &depSignal, ihipCommandCopyH2D); + //int depSignalCnt = device._null_stream->preCopyCommand(NULL, &depSignal, ihipCommandCopyH2D); assert(0); // Need to properly synchronize the copy - do something with depSignal if != NULL. device->_acc.memcpy_symbol(symbolName, (void*) src,count, offset); @@ -2255,12 +2285,14 @@ void StagingBuffer::CopyHostToDevicePinInPlace(void* dst, const void* src, size_ tprintf (DB_COPY2, "H2D: waiting... on completion signal handle=%lu\n", _completion_signal[bufferIndex].handle); hsa_signal_wait_acquire(_completion_signal[bufferIndex], HSA_SIGNAL_CONDITION_LT, 1, UINT64_MAX, HSA_WAIT_STATE_ACTIVE); - tprintf (DB_COPY2, "H2D: bytesRemaining=%zu: pin-in-place:%p+%zu bufferIndex[%d]\n", bytesRemaining, srcp, theseBytes, bufferIndex); - memcpy(_pinnedStagingBuffer[bufferIndex], srcp, theseBytes); + void * masked_srcp = (void*) ((uintptr_t)srcp & (uintptr_t)(~0x3f)) ; // TODO void *locked_srcp; - hsa_status_t hsa_status = hsa_amd_memory_lock(const_cast (srcp), theseBytes, &_device->_hsa_agent, 1, &locked_srcp); + hsa_status_t hsa_status = hsa_amd_memory_lock(masked_srcp, theseBytes, &_device->_hsa_agent, 1, &locked_srcp); + //hsa_status_t hsa_status = hsa_amd_memory_lock(const_cast (srcp), theseBytes, &_device->_hsa_agent, 1, &locked_srcp); + tprintf (DB_COPY2, "H2D: bytesRemaining=%zu: pin-in-place:%p+%zu bufferIndex[%d]\n", bytesRemaining, srcp, theseBytes, bufferIndex); + printf ("status=%x srcp=%p, masked_srcp=%p, locked_srcp=%p\n", hsa_status, srcp, masked_srcp, locked_srcp); if (hsa_status != HSA_STATUS_SUCCESS) { throw (ihipException(hipErrorUnknown)); @@ -2416,9 +2448,9 @@ void StagingBuffer::CopyDeviceToHost(void* dst, const void* src, size_t sizeByte -void ihipSyncCopy(ihipStream_t *stream, void* dst, const void* src, size_t sizeBytes, hipMemcpyKind kind) +void ihipStream_t::ihipSyncCopy(void* dst, const void* src, size_t sizeBytes, hipMemcpyKind kind) { - ihipDevice_t *device = stream->getDevice(); + ihipDevice_t *device = this->getDevice(); if (device == NULL) { @@ -2451,7 +2483,7 @@ void ihipSyncCopy(ihipStream_t *stream, void* dst, const void* src, size_t sizeB } hsa_signal_t depSignal; - int depSignalCnt = stream->copyCommand(NULL, &depSignal, ihipCommandCopyH2D); + int depSignalCnt = this->preCopyCommand(NULL, &depSignal, ihipCommandCopyH2D); if ((kind == hipMemcpyHostToDevice) && (srcNotTracked)) { @@ -2466,7 +2498,7 @@ void ihipSyncCopy(ihipStream_t *stream, void* dst, const void* src, size_t sizeB } // The copy waits for inputs and then completes before returning. - stream->resetToEmpty(); + this->resetToEmpty(); } else { // TODO - remove, slow path. tprintf(DB_COPY1, "H2D && srcNotTracked: am_copy dst=%p src=%p sz=%zu\n", dst, src, sizeBytes); @@ -2534,7 +2566,7 @@ hipError_t hipMemcpy(void* dst, const void* src, size_t sizeBytes, hipMemcpyKind hipError_t e = hipSuccess; try { - ihipSyncCopy(stream, dst, src, sizeBytes, kind); + stream->ihipSyncCopy(dst, src, sizeBytes, kind); } catch (ihipException ex) { e = ex._code; @@ -2616,7 +2648,7 @@ hipError_t hipMemcpyAsync(void* dst, const void* src, size_t sizeBytes, hipMemcp return hipErrorInvalidMemcpyDirection; } } - ihipSignal_t *ihip_signal = stream->getSignal(); + ihipSignal_t *ihip_signal = stream->allocSignal(); hsa_signal_store_relaxed(ihip_signal->_hsa_signal, 1); ihipCommand_t copyType; @@ -2635,7 +2667,7 @@ hipError_t hipMemcpyAsync(void* dst, const void* src, size_t sizeBytes, hipMemcp if(trueAsync == true){ hsa_signal_t depSignal; - int depSignalCnt = stream->copyCommand(ihip_signal, &depSignal, copyType); + int depSignalCnt = stream->preCopyCommand(ihip_signal, &depSignal, copyType); tprintf (DB_SYNC, " copy-async, waitFor=%lu completion=#%lu(%lu)\n", depSignalCnt? depSignal.handle:0x0, ihip_signal->_sig_id, ihip_signal->_hsa_signal.handle); @@ -2654,7 +2686,7 @@ hipError_t hipMemcpyAsync(void* dst, const void* src, size_t sizeBytes, hipMemcp e = hipErrorInvalidValue; } } else { - ihipSyncCopy(stream, dst, src, sizeBytes, kind); + stream->ihipSyncCopy(dst, src, sizeBytes, kind); } } } else { @@ -2905,3 +2937,7 @@ hipError_t hipHccGetAcceleratorView(hipStream_t stream, hc::accelerator_view **a } // TODO - review signal / error reporting code. +// TODO - describe naming convention. ihip _. No accessors. +// TODO - describe MT strategy +// +//