From 739ee5dc560a112312b796ab14219aeb9ecdf1d7 Mon Sep 17 00:00:00 2001 From: Yonas Habteab Date: Mon, 14 Jul 2025 17:59:34 +0200 Subject: [PATCH] EventsHandler(wip): start watchdog coroutine to detect disconnected clients TBD... --- lib/remote/eventqueue.cpp | 7 ++++++- lib/remote/eventshandler.cpp | 37 ++++++++++++++++++++++++++++++++++-- lib/remote/httpmessage.hpp | 13 ++++++++++++- 3 files changed, 53 insertions(+), 4 deletions(-) diff --git a/lib/remote/eventqueue.cpp b/lib/remote/eventqueue.cpp index c8674d89d..19d9f5fd4 100644 --- a/lib/remote/eventqueue.cpp +++ b/lib/remote/eventqueue.cpp @@ -136,8 +136,13 @@ std::map EventsInbox::m_Filters ({{"", EventsInbox: EventsRouter EventsRouter::m_Instance; EventsInbox::EventsInbox(String filter, const String& filterSource) - : m_Timer(IoEngine::Get().GetIoContext(), boost::posix_time::neg_infin) + : m_Timer(IoEngine::Get().GetIoContext()) { + // Initialize the timer to a negative infinity state, so that the first call to Shift() doesn't block + // until an event is pushed into the inbox in its first AsyncLock() invocation. This is necessary because + // we want Shift() to wait for events only on a proper timeout, which by default is 5 seconds. + m_Timer.expires_at(boost::posix_time::neg_infin); + std::unique_lock lock (m_FiltersMutex); m_Filter = m_Filters.find(filter); diff --git a/lib/remote/eventshandler.cpp b/lib/remote/eventshandler.cpp index 30a2f888d..aff930927 100644 --- a/lib/remote/eventshandler.cpp +++ b/lib/remote/eventshandler.cpp @@ -100,19 +100,52 @@ bool EventsHandler::HandleRequest( response.set(http::field::content_type, "application/json"); response.StartStreaming(); + /** + * We don't want to keep the connection alive once we return from this handler, as the client + * is expected to be gone at that point. Otherwise, this will cause a "broken pipe" error in + * the HttpServerConnection class when it tries to read from the stream again. + */ + request.keep_alive(false); + IoBoundWorkSlot dontLockTheIoThread (yc); auto writer = std::make_shared>(response); JsonEncoder encoder(writer); + /** + * Start a watchdog coroutine that will read from the hijacked stream until the client disconnects. + * Though, the read op should block like forever, since we don't expect the client to send any data + * at this point, it's just a way to keep track of the client liveness. Once the client disconnects, + * the read operation will complete with an error, and we can safely discard the inbox and stop processing events. + * + * This will ensure that the Shift operation below doesn't unnecessarily wait for events that are never going + * to be sent, and allows the HttpServerConnection to gracefully handle the disconnection. + * + * @note This coroutine is spawned on the very same executor used by the coroutine from which this handler is + * called (HttpServerConnection#m_IoStrand = yc.get_executor()), so we can safely perform I/O ops on the stream. + */ + IoEngine::SpawnCoroutine(yc.get_executor(), [inbox = subscriber.GetInbox(), &response](asio::yield_context yc) { + char buf[128]; + asio::mutable_buffer readBuf(buf, 128); + boost::system::error_code ec; + + const auto& stream(response.HijackStream()); + do { + stream->async_read_some(readBuf, yc[ec]); + } while (!ec); + + response.SetClientAlive(false); + inbox->Discard(yc); + }); + for (;;) { auto event(subscriber.GetInbox()->Shift(yc)); - if (event && response.IsWritable()) { + if (event) { encoder.Encode(event); response.body() << '\n'; response.Write(yc); - } else { + } else if (!response.IsWritable()) { return true; } } diff --git a/lib/remote/httpmessage.hpp b/lib/remote/httpmessage.hpp index 8b178f977..9073acbd8 100644 --- a/lib/remote/httpmessage.hpp +++ b/lib/remote/httpmessage.hpp @@ -286,7 +286,7 @@ public: */ void Write(boost::asio::yield_context yc); - bool IsWritable() const { return m_Stream->lowest_layer().is_open(); } + bool IsWritable() const { return m_Stream->lowest_layer().is_open() && m_ClientAlive; } /** * Resets the serializer of this message, so that it can be changed and written again. @@ -298,6 +298,16 @@ public: */ void StartStreaming(); + const Shared::Ptr& HijackStream() const + { + return m_Stream; + } + + void SetClientAlive(bool alive) + { + m_ClientAlive.store(alive); + } + void SendJsonBody(const Value& val, bool pretty = false); void SendJsonError(const int code, String info = String(), String diagInfo = String(), bool pretty = false, bool verbose = false); @@ -308,6 +318,7 @@ private: using Serializer = boost::beast::http::response_serializer; std::unique_ptr m_Serializer{new Serializer{*this}}; bool m_HeaderDone = false; + Atomic m_ClientAlive{true}; Shared::Ptr m_Stream; };