Allow MemoryPool::Write while Flushing

Before this change, when a producer was blocked by a flush operation,
no other producer could write to the memory pool.  This change allows
other producer threads to continue to write by releasing the producer
lock before waiting on the consumer condition variable.

Change-Id: Idc1c07173d2edb18fbe1a61961f10c02e7ca8c20
This commit is contained in:
Laurent Morichetti
2022-04-21 18:12:30 -07:00
parent 121a84b449
commit dc8717a6b5
+18 -14
View File
@@ -93,19 +93,22 @@ class MemoryPool {
// Flush the records and block until they are all made visible to the client.
void Flush() {
std::lock_guard producer_lock(producer_mutex_);
if (write_ptr_ == buffer_begin_) return;
{
std::lock_guard producer_lock(producer_mutex_);
if (write_ptr_ == buffer_begin_) return;
NotifyConsumerThread(buffer_begin_, write_ptr_);
NotifyConsumerThread(buffer_begin_, write_ptr_);
// Switch buffers
buffer_begin_ = (buffer_end_ == pool_end_) ? pool_begin_ : buffer_end_;
buffer_end_ = buffer_begin_ + properties_.buffer_size;
write_ptr_ = buffer_begin_;
// Wait for the current operation to complete.
std::unique_lock consumer_lock(consumer_mutex_);
consumer_cond_.wait(consumer_lock, [this]() { return !consumer_arg_.valid; });
// Switch buffers
buffer_begin_ = (buffer_end_ == pool_end_) ? pool_begin_ : buffer_end_;
buffer_end_ = buffer_begin_ + properties_.buffer_size;
write_ptr_ = buffer_begin_;
}
{
// Wait for the current operation to complete.
std::unique_lock consumer_lock(consumer_mutex_);
consumer_cond_.wait(consumer_lock, [this]() { return !consumer_arg_.valid; });
}
}
private:
@@ -124,10 +127,11 @@ class MemoryPool {
properties_.buffer_callback_fun(consumer_arg_.begin, consumer_arg_.end,
properties_.buffer_callback_arg);
// Mark this operation as complete (valid=false) and notify a producer, if any, that may be
// waiting to start a new operation. See comment below in NotifyConsumerThread().
// Mark this operation as complete (valid=false) and notify all producers that may be
// waiting for this operation to finish, or to start a new operation. See comment below in
// NotifyConsumerThread().
consumer_arg_.valid = false;
consumer_cond_.notify_one();
consumer_cond_.notify_all();
}
}