EventsHandler(wip): start watchdog coroutine to detect disconnected clients

TBD...
This commit is contained in:
Yonas Habteab 2025-07-14 17:59:34 +02:00
parent a8553dd4d6
commit 739ee5dc56
3 changed files with 53 additions and 4 deletions

View File

@ -136,8 +136,13 @@ std::map<String, EventsInbox::Filter> EventsInbox::m_Filters ({{"", EventsInbox:
EventsRouter EventsRouter::m_Instance;
EventsInbox::EventsInbox(String filter, const String& filterSource)
: m_Timer(IoEngine::Get().GetIoContext(), boost::posix_time::neg_infin)
: m_Timer(IoEngine::Get().GetIoContext())
{
// Initialize the timer to a negative infinity state, so that the first call to Shift() doesn't block
// until an event is pushed into the inbox in its first AsyncLock() invocation. This is necessary because
// we want Shift() to wait for events only on a proper timeout, which by default is 5 seconds.
m_Timer.expires_at(boost::posix_time::neg_infin);
std::unique_lock<std::mutex> lock (m_FiltersMutex);
m_Filter = m_Filters.find(filter);

View File

@ -100,19 +100,52 @@ bool EventsHandler::HandleRequest(
response.set(http::field::content_type, "application/json");
response.StartStreaming();
/**
* We don't want to keep the connection alive once we return from this handler, as the client
* is expected to be gone at that point. Otherwise, this will cause a "broken pipe" error in
* the HttpServerConnection class when it tries to read from the stream again.
*/
request.keep_alive(false);
IoBoundWorkSlot dontLockTheIoThread (yc);
auto writer = std::make_shared<BeastHttpMessageAdapter<HttpResponse>>(response);
JsonEncoder encoder(writer);
/**
* Start a watchdog coroutine that will read from the hijacked stream until the client disconnects.
* Though, the read op should block like forever, since we don't expect the client to send any data
* at this point, it's just a way to keep track of the client liveness. Once the client disconnects,
* the read operation will complete with an error, and we can safely discard the inbox and stop processing events.
*
* This will ensure that the Shift operation below doesn't unnecessarily wait for events that are never going
* to be sent, and allows the HttpServerConnection to gracefully handle the disconnection.
*
* @note This coroutine is spawned on the very same executor used by the coroutine from which this handler is
* called (HttpServerConnection#m_IoStrand = yc.get_executor()), so we can safely perform I/O ops on the stream.
*/
IoEngine::SpawnCoroutine(yc.get_executor(), [inbox = subscriber.GetInbox(), &response](asio::yield_context yc) {
char buf[128];
asio::mutable_buffer readBuf(buf, 128);
boost::system::error_code ec;
const auto& stream(response.HijackStream());
do {
stream->async_read_some(readBuf, yc[ec]);
} while (!ec);
response.SetClientAlive(false);
inbox->Discard(yc);
});
for (;;) {
auto event(subscriber.GetInbox()->Shift(yc));
if (event && response.IsWritable()) {
if (event) {
encoder.Encode(event);
response.body() << '\n';
response.Write(yc);
} else {
} else if (!response.IsWritable()) {
return true;
}
}

View File

@ -286,7 +286,7 @@ public:
*/
void Write(boost::asio::yield_context yc);
bool IsWritable() const { return m_Stream->lowest_layer().is_open(); }
bool IsWritable() const { return m_Stream->lowest_layer().is_open() && m_ClientAlive; }
/**
* Resets the serializer of this message, so that it can be changed and written again.
@ -298,6 +298,16 @@ public:
*/
void StartStreaming();
const Shared<AsioTlsStream>::Ptr& HijackStream() const
{
return m_Stream;
}
void SetClientAlive(bool alive)
{
m_ClientAlive.store(alive);
}
void SendJsonBody(const Value& val, bool pretty = false);
void SendJsonError(const int code, String info = String(), String diagInfo = String(),
bool pretty = false, bool verbose = false);
@ -308,6 +318,7 @@ private:
using Serializer = boost::beast::http::response_serializer<HttpResponse::body_type>;
std::unique_ptr<Serializer> m_Serializer{new Serializer{*this}};
bool m_HeaderDone = false;
Atomic<bool> m_ClientAlive{true};
Shared<AsioTlsStream>::Ptr m_Stream;
};