diff --git a/lib/remote/eventqueue.cpp b/lib/remote/eventqueue.cpp index 819f95a6a..c8674d89d 100644 --- a/lib/remote/eventqueue.cpp +++ b/lib/remote/eventqueue.cpp @@ -136,7 +136,7 @@ std::map EventsInbox::m_Filters ({{"", EventsInbox: EventsRouter EventsRouter::m_Instance; EventsInbox::EventsInbox(String filter, const String& filterSource) - : m_Timer(IoEngine::Get().GetIoContext()) + : m_Timer(IoEngine::Get().GetIoContext(), boost::posix_time::neg_infin) { std::unique_lock lock (m_FiltersMutex); m_Filter = m_Filters.find(filter); @@ -176,40 +176,68 @@ const Expression::Ptr& EventsInbox::GetFilter() void EventsInbox::Push(Dictionary::Ptr event) { + if (m_Discarded.load(std::memory_order_relaxed)) { + return; // If the inbox has been discarded, do not push any events. + } + std::unique_lock lock (m_Mutex); m_Queue.emplace(std::move(event)); m_Timer.expires_at(boost::posix_time::neg_infin); } -Dictionary::Ptr EventsInbox::Shift(boost::asio::yield_context yc, double timeout) +/** + * Discards all events in the inbox and sets the inbox to a discarded state. + * + * This method discards all events currently in the inbox and cancels @c Shift() operation that might be waiting + * for an event to be pushed into the inbox. After this method is called, any subsequent calls to @c Shift() will + * return nullptr immediately, indicating that the inbox has been discarded and no further events will be processed. + * + * This method is typically used when the inbox is no longer needed, as it allows for immediate wake-up of any + * waiting operations in @c Shift() and prevents further processing of events. + * + * @param yc The yield_context for this operation. This method must be called from within a coroutine that uses @c yc. + */ +void EventsInbox::Discard(boost::asio::yield_context& yc) { - std::unique_lock lock (m_Mutex, std::defer_lock); + m_Discarded.store(true, std::memory_order_relaxed); + auto lock(AsyncLock(yc)); m_Timer.expires_at(boost::posix_time::neg_infin); +} - { - boost::system::error_code ec; - - while (!lock.try_lock()) { - m_Timer.async_wait(yc[ec]); - } +/** + * Shifts the first event from the inbox, blocking until an event is available or the timeout expires. + * + * This method attempts to retrieve the first event from the inbox. If the inbox is empty, it will block until + * either an event is pushed into the inbox or the specified timeout expires. If the inbox has been discarded, + * this method will return nullptr immediately. + * + * @param yc The yield_context for this operation. This method must be called from within a coroutine that uses @c yc. + * @param timeout The maximum time to wait for an event in seconds. Defaults to 5 seconds. + * + * @return A pointer to the first event in the inbox, or nullptr if the inbox has been discarded or no events are available. + */ +Dictionary::Ptr EventsInbox::Shift(boost::asio::yield_context& yc, double timeout) +{ + if (m_Discarded.load(std::memory_order_relaxed)) { + return nullptr; // Nothing to shift, inbox has been discarded. } + auto lock(AsyncLock(yc)); + if (m_Queue.empty()) { - m_Timer.expires_from_now(boost::posix_time::milliseconds((unsigned long)(timeout * 1000.0))); + m_Timer.expires_from_now(boost::posix_time::milliseconds(static_cast(timeout * 1000.0))); lock.unlock(); - { - boost::system::error_code ec; - m_Timer.async_wait(yc[ec]); + boost::system::error_code ec; + m_Timer.async_wait(yc[ec]); - while (!lock.try_lock()) { - m_Timer.async_wait(yc[ec]); - } - } + // Someone has woken us up (either by pushing an event, by timeout or by Discard()), so we need + // to re-acquire the lock to check the queue again and return the event if not discarded. + lock = AsyncLock(yc); - if (m_Queue.empty()) { + if (m_Discarded || m_Queue.empty()) { return nullptr; } } @@ -219,6 +247,26 @@ Dictionary::Ptr EventsInbox::Shift(boost::asio::yield_context yc, double timeout return event; } +/** + * Asynchronously acquires a unique lock on @c m_Mutex and returns it. + * + * @param yc The yield_context for this operation. + * + * @return A unique_lock on @c m_Mutex that is acquired asynchronously. + */ +std::unique_lock EventsInbox::AsyncLock(boost::asio::yield_context& yc) +{ + std::unique_lock lock(m_Mutex, std::defer_lock); + + boost::system::error_code ec; + // Try to acquire the lock without blocking, since there might be some mutex contention, + // and we want to avoid blocking the I/O thread for too long. + while (!lock.try_lock()) { + m_Timer.async_wait(yc[ec]); + } + return std::move(lock); +} + EventsSubscriber::EventsSubscriber(std::set types, String filter, const String& filterSource) : m_Types(std::move(types)), m_Inbox(new EventsInbox(std::move(filter), filterSource)) { diff --git a/lib/remote/eventqueue.hpp b/lib/remote/eventqueue.hpp index 833714f9d..34a79d80f 100644 --- a/lib/remote/eventqueue.hpp +++ b/lib/remote/eventqueue.hpp @@ -99,9 +99,12 @@ public: const Expression::Ptr& GetFilter(); void Push(Dictionary::Ptr event); - Dictionary::Ptr Shift(boost::asio::yield_context yc, double timeout = 5); + void Discard(boost::asio::yield_context& yc); + Dictionary::Ptr Shift(boost::asio::yield_context& yc, double timeout = 5); private: + std::unique_lock AsyncLock(boost::asio::yield_context& yc); + struct Filter { std::size_t Refs; @@ -114,6 +117,7 @@ private: std::mutex m_Mutex; decltype(m_Filters.begin()) m_Filter; std::queue m_Queue; + Atomic m_Discarded{false}; // Set when Discard() is called, so that Shift() can return nullptr instead of blocking indefinitely. boost::asio::deadline_timer m_Timer; };