From 81da1cdb26227d252abb93071b158348e0b8f144 Mon Sep 17 00:00:00 2001 From: "Alexander A. Klimov" Date: Wed, 7 Feb 2024 13:46:13 +0100 Subject: [PATCH 1/9] JsonRpcConnection#Disconnect(): spawn coroutine only if necessary by checking the now atomic #m_ShuttingDown outside of it. --- lib/remote/jsonrpcconnection.cpp | 12 +++++------- lib/remote/jsonrpcconnection.hpp | 3 ++- 2 files changed, 7 insertions(+), 8 deletions(-) diff --git a/lib/remote/jsonrpcconnection.cpp b/lib/remote/jsonrpcconnection.cpp index 3bae3cafd..c86d59eb0 100644 --- a/lib/remote/jsonrpcconnection.cpp +++ b/lib/remote/jsonrpcconnection.cpp @@ -190,12 +190,10 @@ void JsonRpcConnection::Disconnect() { namespace asio = boost::asio; - JsonRpcConnection::Ptr keepAlive (this); - - IoEngine::SpawnCoroutine(m_IoStrand, [this, keepAlive](asio::yield_context yc) { - if (!m_ShuttingDown) { - m_ShuttingDown = true; + if (!m_ShuttingDown.exchange(true)) { + JsonRpcConnection::Ptr keepAlive (this); + IoEngine::SpawnCoroutine(m_IoStrand, [this, keepAlive](asio::yield_context yc) { Log(LogWarning, "JsonRpcConnection") << "API client disconnected for identity '" << m_Identity << "'"; @@ -243,8 +241,8 @@ void JsonRpcConnection::Disconnect() shutdownTimeout->Cancel(); m_Stream->lowest_layer().shutdown(m_Stream->lowest_layer().shutdown_both, ec); - } - }); + }); + } } void JsonRpcConnection::MessageHandler(const String& jsonString) diff --git a/lib/remote/jsonrpcconnection.hpp b/lib/remote/jsonrpcconnection.hpp index 591ddcb1f..3515573bb 100644 --- a/lib/remote/jsonrpcconnection.hpp +++ b/lib/remote/jsonrpcconnection.hpp @@ -5,6 +5,7 @@ #include "remote/i2-remote.hpp" #include "remote/endpoint.hpp" +#include "base/atomic.hpp" #include "base/io-engine.hpp" #include "base/tlsstream.hpp" #include "base/timer.hpp" @@ -77,7 +78,7 @@ private: std::vector m_OutgoingMessagesQueue; AsioConditionVariable m_OutgoingMessagesQueued; AsioConditionVariable m_WriterDone; - bool m_ShuttingDown; + Atomic m_ShuttingDown; boost::asio::deadline_timer m_CheckLivenessTimer, m_HeartbeatTimer; JsonRpcConnection(const String& identity, bool authenticated, const Shared::Ptr& stream, ConnectionRole role, boost::asio::io_context& io); From a6946f9dbf83af6ec7376cfa5cef98689a37ede0 Mon Sep 17 00:00:00 2001 From: "Alexander A. Klimov" Date: Wed, 7 Feb 2024 13:56:31 +0100 Subject: [PATCH 2/9] JsonRpcConnection#Send*(): discard messages ASAP once shutting down Especially ApiListener#ReplayLog() enqueued lots of messages into JsonRpcConnection#{m_IoStrand,m_OutgoingMessagesQueue} (RAM) even if the connection was shut(ting) down. Now #Disconnect() takes effect ASAP. --- lib/remote/jsonrpcconnection.cpp | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/lib/remote/jsonrpcconnection.cpp b/lib/remote/jsonrpcconnection.cpp index c86d59eb0..2c67b78d0 100644 --- a/lib/remote/jsonrpcconnection.cpp +++ b/lib/remote/jsonrpcconnection.cpp @@ -165,6 +165,10 @@ ConnectionRole JsonRpcConnection::GetRole() const void JsonRpcConnection::SendMessage(const Dictionary::Ptr& message) { + if (m_ShuttingDown) { + return; + } + Ptr keepAlive (this); m_IoStrand.post([this, keepAlive, message]() { SendMessageInternal(message); }); @@ -172,9 +176,17 @@ void JsonRpcConnection::SendMessage(const Dictionary::Ptr& message) void JsonRpcConnection::SendRawMessage(const String& message) { + if (m_ShuttingDown) { + return; + } + Ptr keepAlive (this); m_IoStrand.post([this, keepAlive, message]() { + if (m_ShuttingDown) { + return; + } + m_OutgoingMessagesQueue.emplace_back(message); m_OutgoingMessagesQueued.Set(); }); @@ -182,6 +194,10 @@ void JsonRpcConnection::SendRawMessage(const String& message) void JsonRpcConnection::SendMessageInternal(const Dictionary::Ptr& message) { + if (m_ShuttingDown) { + return; + } + m_OutgoingMessagesQueue.emplace_back(JsonEncode(message)); m_OutgoingMessagesQueued.Set(); } From 9a0c7d7c75526413b6354e58133c429d3da6cd91 Mon Sep 17 00:00:00 2001 From: "Alexander A. Klimov" Date: Fri, 9 Feb 2024 14:44:52 +0100 Subject: [PATCH 3/9] ApiListener#ReplayLog(): stop reading files ASAP on send error --- lib/remote/apilistener.cpp | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/lib/remote/apilistener.cpp b/lib/remote/apilistener.cpp index 85443e218..7d62c28bb 100644 --- a/lib/remote/apilistener.cpp +++ b/lib/remote/apilistener.cpp @@ -1459,7 +1459,9 @@ void ApiListener::ReplayLog(const JsonRpcConnection::Ptr& client) return; } - for (;;) { + bool stopReplay = false; + + do { std::unique_lock lock(m_LogLock); CloseLogFile(); @@ -1542,6 +1544,7 @@ void ApiListener::ReplayLog(const JsonRpcConnection::Ptr& client) Log(LogDebug, "ApiListener") << "Error while replaying log for endpoint '" << endpoint->GetName() << "': " << DiagnosticInformation(ex); + stopReplay = true; break; } @@ -1563,6 +1566,10 @@ void ApiListener::ReplayLog(const JsonRpcConnection::Ptr& client) } logStream->Close(); + + if (stopReplay) { + break; + } } if (count > 0) { @@ -1575,16 +1582,14 @@ void ApiListener::ReplayLog(const JsonRpcConnection::Ptr& client) } if (last_sync) { - { - ObjectLock olock2(endpoint); - endpoint->SetSyncing(false); - } - OpenLogFile(); break; } - } + } while (!stopReplay); + + ObjectLock olock2 (endpoint); + endpoint->SetSyncing(false); } void ApiListener::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata) From 5f2d31bf3c9e8105ba0002e4195f2eceb6f33de9 Mon Sep 17 00:00:00 2001 From: Yonas Habteab Date: Tue, 13 Feb 2024 09:30:26 +0100 Subject: [PATCH 4/9] Use `Defer` class for cleanup in `ApiListener::ReplayLog()` --- lib/remote/apilistener.cpp | 27 ++++++++------------------- 1 file changed, 8 insertions(+), 19 deletions(-) diff --git a/lib/remote/apilistener.cpp b/lib/remote/apilistener.cpp index 7d62c28bb..e5530515b 100644 --- a/lib/remote/apilistener.cpp +++ b/lib/remote/apilistener.cpp @@ -1434,10 +1434,12 @@ void ApiListener::LogGlobHandler(std::vector& files, const String& file) void ApiListener::ReplayLog(const JsonRpcConnection::Ptr& client) { Endpoint::Ptr endpoint = client->GetEndpoint(); + Defer resetEndpointSyncing ([&endpoint]() { + ObjectLock olock(endpoint); + endpoint->SetSyncing(false); + }); if (endpoint->GetLogDuration() == 0) { - ObjectLock olock2(endpoint); - endpoint->SetSyncing(false); return; } @@ -1454,14 +1456,10 @@ void ApiListener::ReplayLog(const JsonRpcConnection::Ptr& client) Zone::Ptr target_zone = target_endpoint->GetZone(); if (!target_zone) { - ObjectLock olock2(endpoint); - endpoint->SetSyncing(false); return; } - bool stopReplay = false; - - do { + for (;;) { std::unique_lock lock(m_LogLock); CloseLogFile(); @@ -1543,9 +1541,7 @@ void ApiListener::ReplayLog(const JsonRpcConnection::Ptr& client) Log(LogDebug, "ApiListener") << "Error while replaying log for endpoint '" << endpoint->GetName() << "': " << DiagnosticInformation(ex); - - stopReplay = true; - break; + return; } peer_ts = pmessage->Get("timestamp"); @@ -1566,10 +1562,6 @@ void ApiListener::ReplayLog(const JsonRpcConnection::Ptr& client) } logStream->Close(); - - if (stopReplay) { - break; - } } if (count > 0) { @@ -1584,12 +1576,9 @@ void ApiListener::ReplayLog(const JsonRpcConnection::Ptr& client) if (last_sync) { OpenLogFile(); - break; + return; } - } while (!stopReplay); - - ObjectLock olock2 (endpoint); - endpoint->SetSyncing(false); + } } void ApiListener::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata) From b0b41b1affaaca946b684442ec769c89d6e471d7 Mon Sep 17 00:00:00 2001 From: Yonas Habteab Date: Tue, 13 Feb 2024 09:31:40 +0100 Subject: [PATCH 5/9] Defer: Allow empty initialization & add `SetFunc()` method --- lib/base/defer.hpp | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/lib/base/defer.hpp b/lib/base/defer.hpp index 2a232619a..2625e02ff 100644 --- a/lib/base/defer.hpp +++ b/lib/base/defer.hpp @@ -22,6 +22,8 @@ public: { } + Defer() = default; + Defer(const Defer&) = delete; Defer(Defer&&) = delete; Defer& operator=(const Defer&) = delete; @@ -39,6 +41,11 @@ public: } } + inline void SetFunc(std::function func) + { + m_Func = std::move(func); + } + inline void Cancel() { From 02334c5f29477d9a6926851ce304df07c251c586 Mon Sep 17 00:00:00 2001 From: Julian Brost Date: Tue, 13 Feb 2024 09:33:47 +0100 Subject: [PATCH 6/9] Make sure log file is reopened when `ApiListener::ReplayLog()` returns --- lib/remote/apilistener.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/remote/apilistener.cpp b/lib/remote/apilistener.cpp index e5530515b..401ccb63e 100644 --- a/lib/remote/apilistener.cpp +++ b/lib/remote/apilistener.cpp @@ -1463,12 +1463,14 @@ void ApiListener::ReplayLog(const JsonRpcConnection::Ptr& client) std::unique_lock lock(m_LogLock); CloseLogFile(); + Defer reopenLog; if (count == -1 || count > 50000) { OpenLogFile(); lock.unlock(); } else { last_sync = true; + reopenLog.SetFunc([this]() { OpenLogFile(); }); } count = 0; @@ -1574,8 +1576,6 @@ void ApiListener::ReplayLog(const JsonRpcConnection::Ptr& client) } if (last_sync) { - OpenLogFile(); - return; } } From 561aedab1d91a3d966131b19ceb1f990176bb53d Mon Sep 17 00:00:00 2001 From: Yonas Habteab Date: Wed, 21 Feb 2024 12:04:40 +0100 Subject: [PATCH 7/9] JsonRpcConnection: Raise an exception when trying to send to disconnected clients --- lib/remote/jsonrpcconnection.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/remote/jsonrpcconnection.cpp b/lib/remote/jsonrpcconnection.cpp index 2c67b78d0..30f1a12db 100644 --- a/lib/remote/jsonrpcconnection.cpp +++ b/lib/remote/jsonrpcconnection.cpp @@ -166,7 +166,7 @@ ConnectionRole JsonRpcConnection::GetRole() const void JsonRpcConnection::SendMessage(const Dictionary::Ptr& message) { if (m_ShuttingDown) { - return; + BOOST_THROW_EXCEPTION(std::runtime_error("Cannot send message to already disconnected API client '" + GetIdentity() + "'!")); } Ptr keepAlive (this); @@ -177,7 +177,7 @@ void JsonRpcConnection::SendMessage(const Dictionary::Ptr& message) void JsonRpcConnection::SendRawMessage(const String& message) { if (m_ShuttingDown) { - return; + BOOST_THROW_EXCEPTION(std::runtime_error("Cannot send message to already disconnected API client '" + GetIdentity() + "'!")); } Ptr keepAlive (this); From b9b3e7a925e4466ad5e1de7b0176dae1bb5ab153 Mon Sep 17 00:00:00 2001 From: Yonas Habteab Date: Wed, 21 Feb 2024 12:06:11 +0100 Subject: [PATCH 8/9] ApiListener: Catch & supress clients runtime errors --- lib/remote/apilistener.cpp | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/lib/remote/apilistener.cpp b/lib/remote/apilistener.cpp index 401ccb63e..2162c2120 100644 --- a/lib/remote/apilistener.cpp +++ b/lib/remote/apilistener.cpp @@ -1022,7 +1022,12 @@ void ApiListener::ApiTimerHandler() for (const JsonRpcConnection::Ptr& client : endpoint->GetClients()) { if (client->GetTimestamp() == maxTs) { - client->SendMessage(lmessage); + try { + client->SendMessage(lmessage); + } catch (const std::runtime_error& ex) { + Log(LogNotice, "ApiListener") + << "Error while setting log position for identity '" << endpoint->GetName() << "': " << DiagnosticInformation(ex, false); + } } else { client->Disconnect(); } @@ -1194,7 +1199,12 @@ void ApiListener::SyncSendMessage(const Endpoint::Ptr& endpoint, const Dictionar if (client->GetTimestamp() != maxTs) continue; - client->SendMessage(message); + try { + client->SendMessage(message); + } catch (const std::runtime_error& ex) { + Log(LogNotice, "ApiListener") + << "Error while sending message to endpoint '" << endpoint->GetName() << "': " << DiagnosticInformation(ex, false); + } } } } From 96839d829b23daf0f1cdd4c4e282c274b2252f30 Mon Sep 17 00:00:00 2001 From: Yonas Habteab Date: Wed, 21 Feb 2024 12:18:35 +0100 Subject: [PATCH 9/9] ApiListener: Reorder logging in `ApiTimerHandler()` --- lib/remote/apilistener.cpp | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/lib/remote/apilistener.cpp b/lib/remote/apilistener.cpp index 2162c2120..b689bf7a3 100644 --- a/lib/remote/apilistener.cpp +++ b/lib/remote/apilistener.cpp @@ -1020,6 +1020,10 @@ void ApiListener::ApiTimerHandler() maxTs = client->GetTimestamp(); } + Log(LogNotice, "ApiListener") + << "Setting log position for identity '" << endpoint->GetName() << "': " + << Utility::FormatDateTime("%Y/%m/%d %H:%M:%S", ts); + for (const JsonRpcConnection::Ptr& client : endpoint->GetClients()) { if (client->GetTimestamp() == maxTs) { try { @@ -1032,10 +1036,6 @@ void ApiListener::ApiTimerHandler() client->Disconnect(); } } - - Log(LogNotice, "ApiListener") - << "Setting log position for identity '" << endpoint->GetName() << "': " - << Utility::FormatDateTime("%Y/%m/%d %H:%M:%S", ts); } }