- Refactor ihipStream in prep for thread-safe implementation.
- Do some work on PinInPlace implementation.


[ROCm/clr commit: 8acb53e160]
Этот коммит содержится в:
Ben Sander
2016-03-16 21:16:29 -05:00
родитель 0573890c39
Коммит 931b58f438
+76 -40
Просмотреть файл
@@ -201,6 +201,8 @@ typedef FakeMutex StreamMutex;
#endif
// TODO - move async copy code into stream? Stream->async-copy.
// Add PreCopy / PostCopy to manage locks?
// Internal stream structure.
class ihipStream_t {
@@ -209,28 +211,42 @@ public:
ihipStream_t(unsigned device_index, hc::accelerator_view av, unsigned int flags);
~ihipStream_t();
inline void reclaimSignals(SIGSEQNUM sigNum);
inline void waitAndReclaimOlder(ihipSignal_t *signal);
inline void wait();
inline ihipDevice_t * getDevice() const;
ihipSignal_t * getSignal() ;
void ihipSyncCopy(void* dst, const void* src, size_t sizeBytes, hipMemcpyKind kind);
//---
// Thread-safe accessors - these acquire / release mutex:
inline bool preKernelCommand();
inline void postKernelCommand(hc::completion_future &kernel_future);
inline int copyCommand(ihipSignal_t *lastCopy, hsa_signal_t *waitSignal, ihipCommand_t copyType);
inline void resetToEmpty();
inline int preCopyCommand(ihipSignal_t *lastCopy, hsa_signal_t *waitSignal, ihipCommand_t copyType);
inline void postCopyCommand();
inline void reclaimSignals_ts(SIGSEQNUM sigNum);
inline void wait();
// Non-threadsafe accessors - must be protected by high-level stream lock:
inline SIGSEQNUM lastCopySeqId() { return _last_copy_signal ? _last_copy_signal->_sig_id : 0; };
ihipSignal_t * allocSignal();
//-- Non-racy accessors:
// These functions access fields set at initialization time and are non-racy (so do not acquire mutex)
inline ihipDevice_t * getDevice() const;
StreamMutex & mutex() {return _mutex;};
//---
//Member vars - these are set at initialization:
hc::accelerator_view _av;
unsigned _flags;
private:
void enqueueBarrier(hsa_queue_t* queue, ihipSignal_t *depSignal);
inline void waitCopy(ihipSignal_t *signal);
inline void resetToEmpty();
//---
unsigned _device_index;
ihipCommand_t _last_command_type; // type of the last command
@@ -417,7 +433,7 @@ void ihipStream_t::resetToEmpty()
}
//---
void ihipStream_t::reclaimSignals(SIGSEQNUM sigNum)
void ihipStream_t::reclaimSignals_ts(SIGSEQNUM sigNum)
{
tprintf(DB_SIGNAL, "reclaim signal #%lu\n", sigNum);
// Mark all signals older and including this one as available for
@@ -426,23 +442,27 @@ void ihipStream_t::reclaimSignals(SIGSEQNUM sigNum)
//---
void ihipStream_t::waitAndReclaimOlder(ihipSignal_t *signal)
void ihipStream_t::waitCopy(ihipSignal_t *signal)
{
hsa_signal_wait_acquire(_last_copy_signal->_hsa_signal, HSA_SIGNAL_CONDITION_LT, 1, UINT64_MAX, HSA_WAIT_STATE_ACTIVE);
reclaimSignals(_last_copy_signal->_sig_id);
SIGSEQNUM sigNum = _last_copy_signal->_sig_id;
tprintf(DB_SIGNAL, "reclaim signal #%lu\n", sigNum);
// Mark all signals older and including this one as available for
_oldest_live_sig_id = sigNum+1;
}
//---
//Wait for all queues kernels in the associated accelerator_view to complete.
//Wait for all kernel and data copy commands in this stream to complete.
void ihipStream_t::wait()
{
tprintf (DB_SYNC, "stream %p wait for queue-empty and lastCopy:#%lu...\n", this, _last_copy_signal ? _last_copy_signal->_sig_id: 0x0 );
_av.wait();
if (_last_copy_signal) {
this->waitAndReclaimOlder(_last_copy_signal);
this->waitCopy(_last_copy_signal);
}
resetToEmpty();
@@ -464,7 +484,7 @@ inline ihipDevice_t * ihipStream_t::getDevice() const
// Allocate a new signal from the signal pool.
// Returned signals have value of 0.
// Signals are intended for use in this stream and are always reclaimed "in-order".
ihipSignal_t *ihipStream_t::getSignal()
ihipSignal_t *ihipStream_t::allocSignal()
{
int numToScan = _signalPool.size();
do {
@@ -489,7 +509,7 @@ ihipSignal_t *ihipStream_t::getSignal()
_signalCursor = _signalPool.size(); // set to the beginning of the new entries:
_signalPool.resize(_signalPool.size() * 2);
tprintf (DB_SIGNAL, "grow signal pool to %zu entries, cursor=%d\n", _signalPool.size(), _signalCursor);
return getSignal(); // try again,
return allocSignal(); // try again,
// Should never reach here.
assert(0);
@@ -551,7 +571,7 @@ inline bool ihipStream_t::preKernelCommand()
} else if (HIP_DISABLE_HW_KERNEL_DEP>0) {
tprintf (DB_SYNC, "stream %p switch %s to %s (HOST wait for previous...)\n",
this, ihipCommandName[_last_command_type], ihipCommandName[ihipCommandKernel]);
this->waitAndReclaimOlder(_last_copy_signal);
this->waitCopy(_last_copy_signal);
} else if (HIP_DISABLE_HW_KERNEL_DEP==-1) {
tprintf (DB_SYNC, "stream %p switch %s to %s (IGNORE dependency)\n",
this, ihipCommandName[_last_command_type], ihipCommandName[ihipCommandKernel]);
@@ -577,11 +597,14 @@ inline void ihipStream_t::postKernelCommand(hc::completion_future &kernelFuture)
//---
// Called whenever a copy command is set to the stream.
// Examines the last command sent to this stream and returns a signal to wait on, if required.
inline int ihipStream_t::copyCommand(ihipSignal_t *lastCopy, hsa_signal_t *waitSignal, ihipCommand_t copyType)
int ihipStream_t::preCopyCommand(ihipSignal_t *lastCopy, hsa_signal_t *waitSignal, ihipCommand_t copyType)
{
int needSync = 0;
waitSignal->handle = 0;
//_mutex.lock(); // will be unlocked in postCopyCommand
// If switching command types, we need to add a barrier packet to synchronize things.
if (FORCE_SAMEDIR_COPY_DEP || (_last_command_type != copyType)) {
@@ -622,6 +645,13 @@ inline int ihipStream_t::copyCommand(ihipSignal_t *lastCopy, hsa_signal_t *waitS
}
//---
inline void ihipStream_t::postCopyCommand()
{
//_mutex.unlock();
}
//=================================================================================================
//
//Reset the device - this is called from hipDeviceReset.
@@ -1139,9 +1169,6 @@ inline hipStream_t ihipSyncAndResolveStream(hipStream_t stream)
// TODO - data-up to data-down:
// Called just before a kernel is launched from hipLaunchKernel.
// Allows runtime to track some information about the stream.
@@ -1351,8 +1378,6 @@ hipError_t hipDeviceReset(void)
device->reset(); // re-allocate required resources.
}
// TODO - reset all streams on the device.
return ihipLogStatus(hipSuccess);
}
@@ -1472,6 +1497,8 @@ hipError_t hipGetLastError()
ihipLogStatus(hipSuccess);
}
//---
hipError_t hipPeakAtLastError()
{
std::call_once(hip_initialized, ihipInit);
@@ -1547,6 +1574,8 @@ hipError_t hipStreamCreateWithFlags(hipStream_t *stream, unsigned int flags)
return ihipLogStatus(hipSuccess);
}
//---
/**
* @bug This function conservatively waits for all work in the specified stream to complete.
*/
@@ -1558,7 +1587,9 @@ hipError_t hipStreamWaitEvent(hipStream_t stream, hipEvent_t event, unsigned int
hipError_t e = hipSuccess;
{
// Super-conservative version of this - TODO - remove me:
// TODO-hcc Convert to use create_blocking_marker(...) functionality.
// Currently we have a super-conservative version of this - block on host, and drain the queue.
// This should create a barrier packet in the target queue.
stream->wait();
e = hipSuccess;
}
@@ -1567,6 +1598,7 @@ hipError_t hipStreamWaitEvent(hipStream_t stream, hipEvent_t event, unsigned int
};
//---
hipError_t hipStreamSynchronize(hipStream_t stream)
{
std::call_once(hip_initialized, ihipInit);
@@ -1740,7 +1772,7 @@ hipError_t hipEventSynchronize(hipEvent_t event)
#else
eh->_marker.wait();
#endif
eh->_stream->reclaimSignals(eh->_copy_seq_id);
eh->_stream->reclaimSignals_ts(eh->_copy_seq_id);
return ihipLogStatus(hipSuccess);
}
@@ -2055,9 +2087,8 @@ hipError_t hipHostAlloc(void** ptr, size_t sizeBytes, unsigned int flags){
if(device){
if(flags & hipHostAllocDefault){
const unsigned am_flags = amHostPinned;
*ptr = hc::am_alloc(sizeBytes, device->_acc, am_flags);
*ptr = hc::am_alloc(sizeBytes, device->_acc, amHostPinned);
if(sizeBytes && (*ptr == NULL)){
hip_status = hipErrorMemoryAllocation;
}else{
@@ -2066,9 +2097,8 @@ hipError_t hipHostAlloc(void** ptr, size_t sizeBytes, unsigned int flags){
tprintf(DB_MEM, " %s: pinned ptr=%p\n", __func__, *ptr);
}
if(flags & hipHostAllocMapped){
const unsigned am_flags = amHostPinned;
*ptr = hc::am_alloc(sizeBytes, device->_acc, am_flags);
*ptr = hc::am_alloc(sizeBytes, device->_acc, amHostPinned);
if(sizeBytes && (*ptr == NULL)){
hip_status = hipErrorMemoryAllocation;
}else{
@@ -2189,7 +2219,7 @@ hipError_t hipMemcpyToSymbol(const char* symbolName, const void *src, size_t cou
auto device = ihipGetTlsDefaultDevice();
//hsa_signal_t depSignal;
//int depSignalCnt = device._null_stream->copyCommand(NULL, &depSignal, ihipCommandCopyH2D);
//int depSignalCnt = device._null_stream->preCopyCommand(NULL, &depSignal, ihipCommandCopyH2D);
assert(0); // Need to properly synchronize the copy - do something with depSignal if != NULL.
device->_acc.memcpy_symbol(symbolName, (void*) src,count, offset);
@@ -2255,12 +2285,14 @@ void StagingBuffer::CopyHostToDevicePinInPlace(void* dst, const void* src, size_
tprintf (DB_COPY2, "H2D: waiting... on completion signal handle=%lu\n", _completion_signal[bufferIndex].handle);
hsa_signal_wait_acquire(_completion_signal[bufferIndex], HSA_SIGNAL_CONDITION_LT, 1, UINT64_MAX, HSA_WAIT_STATE_ACTIVE);
tprintf (DB_COPY2, "H2D: bytesRemaining=%zu: pin-in-place:%p+%zu bufferIndex[%d]\n", bytesRemaining, srcp, theseBytes, bufferIndex);
memcpy(_pinnedStagingBuffer[bufferIndex], srcp, theseBytes);
void * masked_srcp = (void*) ((uintptr_t)srcp & (uintptr_t)(~0x3f)) ; // TODO
void *locked_srcp;
hsa_status_t hsa_status = hsa_amd_memory_lock(const_cast<char *> (srcp), theseBytes, &_device->_hsa_agent, 1, &locked_srcp);
hsa_status_t hsa_status = hsa_amd_memory_lock(masked_srcp, theseBytes, &_device->_hsa_agent, 1, &locked_srcp);
//hsa_status_t hsa_status = hsa_amd_memory_lock(const_cast<char*> (srcp), theseBytes, &_device->_hsa_agent, 1, &locked_srcp);
tprintf (DB_COPY2, "H2D: bytesRemaining=%zu: pin-in-place:%p+%zu bufferIndex[%d]\n", bytesRemaining, srcp, theseBytes, bufferIndex);
printf ("status=%x srcp=%p, masked_srcp=%p, locked_srcp=%p\n", hsa_status, srcp, masked_srcp, locked_srcp);
if (hsa_status != HSA_STATUS_SUCCESS) {
throw (ihipException(hipErrorUnknown));
@@ -2416,9 +2448,9 @@ void StagingBuffer::CopyDeviceToHost(void* dst, const void* src, size_t sizeByte
void ihipSyncCopy(ihipStream_t *stream, void* dst, const void* src, size_t sizeBytes, hipMemcpyKind kind)
void ihipStream_t::ihipSyncCopy(void* dst, const void* src, size_t sizeBytes, hipMemcpyKind kind)
{
ihipDevice_t *device = stream->getDevice();
ihipDevice_t *device = this->getDevice();
if (device == NULL) {
@@ -2451,7 +2483,7 @@ void ihipSyncCopy(ihipStream_t *stream, void* dst, const void* src, size_t sizeB
}
hsa_signal_t depSignal;
int depSignalCnt = stream->copyCommand(NULL, &depSignal, ihipCommandCopyH2D);
int depSignalCnt = this->preCopyCommand(NULL, &depSignal, ihipCommandCopyH2D);
if ((kind == hipMemcpyHostToDevice) && (srcNotTracked)) {
@@ -2466,7 +2498,7 @@ void ihipSyncCopy(ihipStream_t *stream, void* dst, const void* src, size_t sizeB
}
// The copy waits for inputs and then completes before returning.
stream->resetToEmpty();
this->resetToEmpty();
} else {
// TODO - remove, slow path.
tprintf(DB_COPY1, "H2D && srcNotTracked: am_copy dst=%p src=%p sz=%zu\n", dst, src, sizeBytes);
@@ -2534,7 +2566,7 @@ hipError_t hipMemcpy(void* dst, const void* src, size_t sizeBytes, hipMemcpyKind
hipError_t e = hipSuccess;
try {
ihipSyncCopy(stream, dst, src, sizeBytes, kind);
stream->ihipSyncCopy(dst, src, sizeBytes, kind);
}
catch (ihipException ex) {
e = ex._code;
@@ -2616,7 +2648,7 @@ hipError_t hipMemcpyAsync(void* dst, const void* src, size_t sizeBytes, hipMemcp
return hipErrorInvalidMemcpyDirection;
}
}
ihipSignal_t *ihip_signal = stream->getSignal();
ihipSignal_t *ihip_signal = stream->allocSignal();
hsa_signal_store_relaxed(ihip_signal->_hsa_signal, 1);
ihipCommand_t copyType;
@@ -2635,7 +2667,7 @@ hipError_t hipMemcpyAsync(void* dst, const void* src, size_t sizeBytes, hipMemcp
if(trueAsync == true){
hsa_signal_t depSignal;
int depSignalCnt = stream->copyCommand(ihip_signal, &depSignal, copyType);
int depSignalCnt = stream->preCopyCommand(ihip_signal, &depSignal, copyType);
tprintf (DB_SYNC, " copy-async, waitFor=%lu completion=#%lu(%lu)\n", depSignalCnt? depSignal.handle:0x0, ihip_signal->_sig_id, ihip_signal->_hsa_signal.handle);
@@ -2654,7 +2686,7 @@ hipError_t hipMemcpyAsync(void* dst, const void* src, size_t sizeBytes, hipMemcp
e = hipErrorInvalidValue;
}
} else {
ihipSyncCopy(stream, dst, src, sizeBytes, kind);
stream->ihipSyncCopy(dst, src, sizeBytes, kind);
}
}
} else {
@@ -2905,3 +2937,7 @@ hipError_t hipHccGetAcceleratorView(hipStream_t stream, hc::accelerator_view **a
}
// TODO - review signal / error reporting code.
// TODO - describe naming convention. ihip _. No accessors.
// TODO - describe MT strategy
//
//