diff --git a/lib/base/defer.hpp b/lib/base/defer.hpp index 2a232619a..2625e02ff 100644 --- a/lib/base/defer.hpp +++ b/lib/base/defer.hpp @@ -22,6 +22,8 @@ public: { } + Defer() = default; + Defer(const Defer&) = delete; Defer(Defer&&) = delete; Defer& operator=(const Defer&) = delete; @@ -39,6 +41,11 @@ public: } } + inline void SetFunc(std::function func) + { + m_Func = std::move(func); + } + inline void Cancel() { diff --git a/lib/remote/apilistener.cpp b/lib/remote/apilistener.cpp index 85443e218..b689bf7a3 100644 --- a/lib/remote/apilistener.cpp +++ b/lib/remote/apilistener.cpp @@ -1020,17 +1020,22 @@ void ApiListener::ApiTimerHandler() maxTs = client->GetTimestamp(); } + Log(LogNotice, "ApiListener") + << "Setting log position for identity '" << endpoint->GetName() << "': " + << Utility::FormatDateTime("%Y/%m/%d %H:%M:%S", ts); + for (const JsonRpcConnection::Ptr& client : endpoint->GetClients()) { if (client->GetTimestamp() == maxTs) { - client->SendMessage(lmessage); + try { + client->SendMessage(lmessage); + } catch (const std::runtime_error& ex) { + Log(LogNotice, "ApiListener") + << "Error while setting log position for identity '" << endpoint->GetName() << "': " << DiagnosticInformation(ex, false); + } } else { client->Disconnect(); } } - - Log(LogNotice, "ApiListener") - << "Setting log position for identity '" << endpoint->GetName() << "': " - << Utility::FormatDateTime("%Y/%m/%d %H:%M:%S", ts); } } @@ -1194,7 +1199,12 @@ void ApiListener::SyncSendMessage(const Endpoint::Ptr& endpoint, const Dictionar if (client->GetTimestamp() != maxTs) continue; - client->SendMessage(message); + try { + client->SendMessage(message); + } catch (const std::runtime_error& ex) { + Log(LogNotice, "ApiListener") + << "Error while sending message to endpoint '" << endpoint->GetName() << "': " << DiagnosticInformation(ex, false); + } } } } @@ -1434,10 +1444,12 @@ void ApiListener::LogGlobHandler(std::vector& files, const String& file) void ApiListener::ReplayLog(const JsonRpcConnection::Ptr& client) { Endpoint::Ptr endpoint = client->GetEndpoint(); + Defer resetEndpointSyncing ([&endpoint]() { + ObjectLock olock(endpoint); + endpoint->SetSyncing(false); + }); if (endpoint->GetLogDuration() == 0) { - ObjectLock olock2(endpoint); - endpoint->SetSyncing(false); return; } @@ -1454,8 +1466,6 @@ void ApiListener::ReplayLog(const JsonRpcConnection::Ptr& client) Zone::Ptr target_zone = target_endpoint->GetZone(); if (!target_zone) { - ObjectLock olock2(endpoint); - endpoint->SetSyncing(false); return; } @@ -1463,12 +1473,14 @@ void ApiListener::ReplayLog(const JsonRpcConnection::Ptr& client) std::unique_lock lock(m_LogLock); CloseLogFile(); + Defer reopenLog; if (count == -1 || count > 50000) { OpenLogFile(); lock.unlock(); } else { last_sync = true; + reopenLog.SetFunc([this]() { OpenLogFile(); }); } count = 0; @@ -1541,8 +1553,7 @@ void ApiListener::ReplayLog(const JsonRpcConnection::Ptr& client) Log(LogDebug, "ApiListener") << "Error while replaying log for endpoint '" << endpoint->GetName() << "': " << DiagnosticInformation(ex); - - break; + return; } peer_ts = pmessage->Get("timestamp"); @@ -1575,14 +1586,7 @@ void ApiListener::ReplayLog(const JsonRpcConnection::Ptr& client) } if (last_sync) { - { - ObjectLock olock2(endpoint); - endpoint->SetSyncing(false); - } - - OpenLogFile(); - - break; + return; } } } diff --git a/lib/remote/jsonrpcconnection.cpp b/lib/remote/jsonrpcconnection.cpp index 3bae3cafd..30f1a12db 100644 --- a/lib/remote/jsonrpcconnection.cpp +++ b/lib/remote/jsonrpcconnection.cpp @@ -165,6 +165,10 @@ ConnectionRole JsonRpcConnection::GetRole() const void JsonRpcConnection::SendMessage(const Dictionary::Ptr& message) { + if (m_ShuttingDown) { + BOOST_THROW_EXCEPTION(std::runtime_error("Cannot send message to already disconnected API client '" + GetIdentity() + "'!")); + } + Ptr keepAlive (this); m_IoStrand.post([this, keepAlive, message]() { SendMessageInternal(message); }); @@ -172,9 +176,17 @@ void JsonRpcConnection::SendMessage(const Dictionary::Ptr& message) void JsonRpcConnection::SendRawMessage(const String& message) { + if (m_ShuttingDown) { + BOOST_THROW_EXCEPTION(std::runtime_error("Cannot send message to already disconnected API client '" + GetIdentity() + "'!")); + } + Ptr keepAlive (this); m_IoStrand.post([this, keepAlive, message]() { + if (m_ShuttingDown) { + return; + } + m_OutgoingMessagesQueue.emplace_back(message); m_OutgoingMessagesQueued.Set(); }); @@ -182,6 +194,10 @@ void JsonRpcConnection::SendRawMessage(const String& message) void JsonRpcConnection::SendMessageInternal(const Dictionary::Ptr& message) { + if (m_ShuttingDown) { + return; + } + m_OutgoingMessagesQueue.emplace_back(JsonEncode(message)); m_OutgoingMessagesQueued.Set(); } @@ -190,12 +206,10 @@ void JsonRpcConnection::Disconnect() { namespace asio = boost::asio; - JsonRpcConnection::Ptr keepAlive (this); - - IoEngine::SpawnCoroutine(m_IoStrand, [this, keepAlive](asio::yield_context yc) { - if (!m_ShuttingDown) { - m_ShuttingDown = true; + if (!m_ShuttingDown.exchange(true)) { + JsonRpcConnection::Ptr keepAlive (this); + IoEngine::SpawnCoroutine(m_IoStrand, [this, keepAlive](asio::yield_context yc) { Log(LogWarning, "JsonRpcConnection") << "API client disconnected for identity '" << m_Identity << "'"; @@ -243,8 +257,8 @@ void JsonRpcConnection::Disconnect() shutdownTimeout->Cancel(); m_Stream->lowest_layer().shutdown(m_Stream->lowest_layer().shutdown_both, ec); - } - }); + }); + } } void JsonRpcConnection::MessageHandler(const String& jsonString) diff --git a/lib/remote/jsonrpcconnection.hpp b/lib/remote/jsonrpcconnection.hpp index 591ddcb1f..3515573bb 100644 --- a/lib/remote/jsonrpcconnection.hpp +++ b/lib/remote/jsonrpcconnection.hpp @@ -5,6 +5,7 @@ #include "remote/i2-remote.hpp" #include "remote/endpoint.hpp" +#include "base/atomic.hpp" #include "base/io-engine.hpp" #include "base/tlsstream.hpp" #include "base/timer.hpp" @@ -77,7 +78,7 @@ private: std::vector m_OutgoingMessagesQueue; AsioConditionVariable m_OutgoingMessagesQueued; AsioConditionVariable m_WriterDone; - bool m_ShuttingDown; + Atomic m_ShuttingDown; boost::asio::deadline_timer m_CheckLivenessTimer, m_HeartbeatTimer; JsonRpcConnection(const String& identity, bool authenticated, const Shared::Ptr& stream, ConnectionRole role, boost::asio::io_context& io);