diff --git a/src/core/memory_pool.h b/src/core/memory_pool.h index 6d84305eae..27eae906e1 100644 --- a/src/core/memory_pool.h +++ b/src/core/memory_pool.h @@ -93,19 +93,22 @@ class MemoryPool { // Flush the records and block until they are all made visible to the client. void Flush() { - std::lock_guard producer_lock(producer_mutex_); - if (write_ptr_ == buffer_begin_) return; + { + std::lock_guard producer_lock(producer_mutex_); + if (write_ptr_ == buffer_begin_) return; - NotifyConsumerThread(buffer_begin_, write_ptr_); + NotifyConsumerThread(buffer_begin_, write_ptr_); - // Switch buffers - buffer_begin_ = (buffer_end_ == pool_end_) ? pool_begin_ : buffer_end_; - buffer_end_ = buffer_begin_ + properties_.buffer_size; - write_ptr_ = buffer_begin_; - - // Wait for the current operation to complete. - std::unique_lock consumer_lock(consumer_mutex_); - consumer_cond_.wait(consumer_lock, [this]() { return !consumer_arg_.valid; }); + // Switch buffers + buffer_begin_ = (buffer_end_ == pool_end_) ? pool_begin_ : buffer_end_; + buffer_end_ = buffer_begin_ + properties_.buffer_size; + write_ptr_ = buffer_begin_; + } + { + // Wait for the current operation to complete. + std::unique_lock consumer_lock(consumer_mutex_); + consumer_cond_.wait(consumer_lock, [this]() { return !consumer_arg_.valid; }); + } } private: @@ -124,10 +127,11 @@ class MemoryPool { properties_.buffer_callback_fun(consumer_arg_.begin, consumer_arg_.end, properties_.buffer_callback_arg); - // Mark this operation as complete (valid=false) and notify a producer, if any, that may be - // waiting to start a new operation. See comment below in NotifyConsumerThread(). + // Mark this operation as complete (valid=false) and notify all producers that may be + // waiting for this operation to finish, or to start a new operation. See comment below in + // NotifyConsumerThread(). consumer_arg_.valid = false; - consumer_cond_.notify_one(); + consumer_cond_.notify_all(); } }