EventsInbox: simplify Shift & make it cancellable

This simplifies the `Shift()` method and makes it also cancellable from
external. Apart from that, there were a potential race condition in the
`Shift` method, where it called `m_Timer.expires_at(boost::posix_time::neg_infin)`
without holding the lock, and vilated so the boost Asio operationguarantee on
the shared objects.
This commit is contained in:
Yonas Habteab 2025-07-14 17:52:58 +02:00
parent 1d29260d77
commit a8553dd4d6
2 changed files with 71 additions and 19 deletions

View File

@ -136,7 +136,7 @@ std::map<String, EventsInbox::Filter> EventsInbox::m_Filters ({{"", EventsInbox:
EventsRouter EventsRouter::m_Instance;
EventsInbox::EventsInbox(String filter, const String& filterSource)
: m_Timer(IoEngine::Get().GetIoContext())
: m_Timer(IoEngine::Get().GetIoContext(), boost::posix_time::neg_infin)
{
std::unique_lock<std::mutex> lock (m_FiltersMutex);
m_Filter = m_Filters.find(filter);
@ -176,40 +176,68 @@ const Expression::Ptr& EventsInbox::GetFilter()
void EventsInbox::Push(Dictionary::Ptr event)
{
if (m_Discarded.load(std::memory_order_relaxed)) {
return; // If the inbox has been discarded, do not push any events.
}
std::unique_lock<std::mutex> lock (m_Mutex);
m_Queue.emplace(std::move(event));
m_Timer.expires_at(boost::posix_time::neg_infin);
}
Dictionary::Ptr EventsInbox::Shift(boost::asio::yield_context yc, double timeout)
/**
* Discards all events in the inbox and sets the inbox to a discarded state.
*
* This method discards all events currently in the inbox and cancels @c Shift() operation that might be waiting
* for an event to be pushed into the inbox. After this method is called, any subsequent calls to @c Shift() will
* return nullptr immediately, indicating that the inbox has been discarded and no further events will be processed.
*
* This method is typically used when the inbox is no longer needed, as it allows for immediate wake-up of any
* waiting operations in @c Shift() and prevents further processing of events.
*
* @param yc The yield_context for this operation. This method must be called from within a coroutine that uses @c yc.
*/
void EventsInbox::Discard(boost::asio::yield_context& yc)
{
std::unique_lock<std::mutex> lock (m_Mutex, std::defer_lock);
m_Discarded.store(true, std::memory_order_relaxed);
auto lock(AsyncLock(yc));
m_Timer.expires_at(boost::posix_time::neg_infin);
}
{
boost::system::error_code ec;
while (!lock.try_lock()) {
m_Timer.async_wait(yc[ec]);
}
/**
* Shifts the first event from the inbox, blocking until an event is available or the timeout expires.
*
* This method attempts to retrieve the first event from the inbox. If the inbox is empty, it will block until
* either an event is pushed into the inbox or the specified timeout expires. If the inbox has been discarded,
* this method will return nullptr immediately.
*
* @param yc The yield_context for this operation. This method must be called from within a coroutine that uses @c yc.
* @param timeout The maximum time to wait for an event in seconds. Defaults to 5 seconds.
*
* @return A pointer to the first event in the inbox, or nullptr if the inbox has been discarded or no events are available.
*/
Dictionary::Ptr EventsInbox::Shift(boost::asio::yield_context& yc, double timeout)
{
if (m_Discarded.load(std::memory_order_relaxed)) {
return nullptr; // Nothing to shift, inbox has been discarded.
}
auto lock(AsyncLock(yc));
if (m_Queue.empty()) {
m_Timer.expires_from_now(boost::posix_time::milliseconds((unsigned long)(timeout * 1000.0)));
m_Timer.expires_from_now(boost::posix_time::milliseconds(static_cast<long>(timeout * 1000.0)));
lock.unlock();
{
boost::system::error_code ec;
m_Timer.async_wait(yc[ec]);
boost::system::error_code ec;
m_Timer.async_wait(yc[ec]);
while (!lock.try_lock()) {
m_Timer.async_wait(yc[ec]);
}
}
// Someone has woken us up (either by pushing an event, by timeout or by Discard()), so we need
// to re-acquire the lock to check the queue again and return the event if not discarded.
lock = AsyncLock(yc);
if (m_Queue.empty()) {
if (m_Discarded || m_Queue.empty()) {
return nullptr;
}
}
@ -219,6 +247,26 @@ Dictionary::Ptr EventsInbox::Shift(boost::asio::yield_context yc, double timeout
return event;
}
/**
* Asynchronously acquires a unique lock on @c m_Mutex and returns it.
*
* @param yc The yield_context for this operation.
*
* @return A unique_lock on @c m_Mutex that is acquired asynchronously.
*/
std::unique_lock<std::mutex> EventsInbox::AsyncLock(boost::asio::yield_context& yc)
{
std::unique_lock lock(m_Mutex, std::defer_lock);
boost::system::error_code ec;
// Try to acquire the lock without blocking, since there might be some mutex contention,
// and we want to avoid blocking the I/O thread for too long.
while (!lock.try_lock()) {
m_Timer.async_wait(yc[ec]);
}
return std::move(lock);
}
EventsSubscriber::EventsSubscriber(std::set<EventType> types, String filter, const String& filterSource)
: m_Types(std::move(types)), m_Inbox(new EventsInbox(std::move(filter), filterSource))
{

View File

@ -99,9 +99,12 @@ public:
const Expression::Ptr& GetFilter();
void Push(Dictionary::Ptr event);
Dictionary::Ptr Shift(boost::asio::yield_context yc, double timeout = 5);
void Discard(boost::asio::yield_context& yc);
Dictionary::Ptr Shift(boost::asio::yield_context& yc, double timeout = 5);
private:
std::unique_lock<std::mutex> AsyncLock(boost::asio::yield_context& yc);
struct Filter
{
std::size_t Refs;
@ -114,6 +117,7 @@ private:
std::mutex m_Mutex;
decltype(m_Filters.begin()) m_Filter;
std::queue<Dictionary::Ptr> m_Queue;
Atomic<bool> m_Discarded{false}; // Set when Discard() is called, so that Shift() can return nullptr instead of blocking indefinitely.
boost::asio::deadline_timer m_Timer;
};