From 33777f6f3f3a8fa94d462d4e4b561acc0360034a Mon Sep 17 00:00:00 2001 From: Johannes Schmidt Date: Fri, 6 Jun 2025 09:53:03 +0200 Subject: [PATCH] Disconnect JSON-RPC clients on ApiListner::Stop() --- lib/remote/apilistener.cpp | 19 ++++++++++++++++++- lib/remote/apilistener.hpp | 1 + lib/remote/endpoint.cpp | 2 +- lib/remote/jsonrpcconnection.cpp | 15 ++++++++++----- lib/remote/jsonrpcconnection.hpp | 8 ++++++-- 5 files changed, 36 insertions(+), 9 deletions(-) diff --git a/lib/remote/apilistener.cpp b/lib/remote/apilistener.cpp index d8f7b0888..9285f747f 100644 --- a/lib/remote/apilistener.cpp +++ b/lib/remote/apilistener.cpp @@ -370,7 +370,10 @@ void ApiListener::Stop(bool runtimeDeleted) StopListener(); + DisconnectJsonRpcConnections(); + m_WaitGroup->Join(); + ObjectImpl::Stop(runtimeDeleted); Log(LogInformation, "ApiListener") @@ -891,7 +894,7 @@ void ApiListener::NewClientHandlerInternal( return; } - JsonRpcConnection::Ptr aclient = new JsonRpcConnection(identity, verify_ok, client, role); + JsonRpcConnection::Ptr aclient = new JsonRpcConnection(m_WaitGroup, identity, verify_ok, client, role); if (endpoint) { endpoint->AddClient(aclient); @@ -1805,6 +1808,20 @@ std::set ApiListener::GetAnonymousClients() const return m_AnonymousClients; } +void ApiListener::DisconnectJsonRpcConnections() +{ + for (auto endpoint : ConfigType::GetObjectsByType()) { + for (const auto& client : endpoint->GetClients()) { + client->Disconnect(); + } + } + + std::unique_lock lock(m_AnonymousClientsLock); + for (const auto & client : m_AnonymousClients){ + client->Disconnect(); + } +} + void ApiListener::AddHttpClient(const HttpServerConnection::Ptr& aclient) { std::unique_lock lock(m_HttpClientsLock); diff --git a/lib/remote/apilistener.hpp b/lib/remote/apilistener.hpp index 82137fa32..f278c2e9b 100644 --- a/lib/remote/apilistener.hpp +++ b/lib/remote/apilistener.hpp @@ -114,6 +114,7 @@ public: bool AddAnonymousClient(const JsonRpcConnection::Ptr& aclient); void RemoveAnonymousClient(const JsonRpcConnection::Ptr& aclient); std::set GetAnonymousClients() const; + void DisconnectJsonRpcConnections(); void AddHttpClient(const HttpServerConnection::Ptr& aclient); void RemoveHttpClient(const HttpServerConnection::Ptr& aclient); diff --git a/lib/remote/endpoint.cpp b/lib/remote/endpoint.cpp index e534fc178..55ab68f12 100644 --- a/lib/remote/endpoint.cpp +++ b/lib/remote/endpoint.cpp @@ -60,7 +60,7 @@ void Endpoint::RemoveClient(const JsonRpcConnection::Ptr& client) std::unique_lock lock(m_ClientsLock); m_Clients.erase(client); - Log(LogWarning, "ApiListener") + Log(LogInformation, "ApiListener") << "Removing API client for endpoint '" << GetName() << "'. " << m_Clients.size() << " API clients left."; SetConnecting(false); diff --git a/lib/remote/jsonrpcconnection.cpp b/lib/remote/jsonrpcconnection.cpp index 889d4452c..a84f98d9f 100644 --- a/lib/remote/jsonrpcconnection.cpp +++ b/lib/remote/jsonrpcconnection.cpp @@ -29,17 +29,17 @@ REGISTER_APIFUNCTION(SetLogPosition, log, &SetLogPositionHandler); static RingBuffer l_TaskStats (15 * 60); -JsonRpcConnection::JsonRpcConnection(const String& identity, bool authenticated, +JsonRpcConnection::JsonRpcConnection(const WaitGroup::Ptr& waitGroup, const String& identity, bool authenticated, const Shared::Ptr& stream, ConnectionRole role) - : JsonRpcConnection(identity, authenticated, stream, role, IoEngine::Get().GetIoContext()) + : JsonRpcConnection(waitGroup, identity, authenticated, stream, role, IoEngine::Get().GetIoContext()) { } -JsonRpcConnection::JsonRpcConnection(const String& identity, bool authenticated, +JsonRpcConnection::JsonRpcConnection(const WaitGroup::Ptr& waitGroup, const String& identity, bool authenticated, const Shared::Ptr& stream, ConnectionRole role, boost::asio::io_context& io) : m_Identity(identity), m_Authenticated(authenticated), m_Stream(stream), m_Role(role), m_Timestamp(Utility::GetTime()), m_Seen(Utility::GetTime()), m_IoStrand(io), - m_OutgoingMessagesQueued(io), m_WriterDone(io), m_ShuttingDown(false), + m_OutgoingMessagesQueued(io), m_WriterDone(io), m_ShuttingDown(false), m_WaitGroup(waitGroup), m_CheckLivenessTimer(io), m_HeartbeatTimer(io) { if (authenticated) @@ -284,7 +284,7 @@ void JsonRpcConnection::Disconnect() ApiListener::GetInstance()->RemoveAnonymousClient(this); } - Log(LogWarning, "JsonRpcConnection") + Log(LogInformation, "JsonRpcConnection") << "API client disconnected for identity '" << m_Identity << "'"; }); } @@ -303,6 +303,11 @@ void JsonRpcConnection::Disconnect() */ void JsonRpcConnection::MessageHandler(const Dictionary::Ptr& message) { + std::shared_lock wgLock(*m_WaitGroup, std::try_to_lock); + if (!wgLock) { + return; + } + if (m_Endpoint && message->Contains("ts")) { double ts = message->Get("ts"); diff --git a/lib/remote/jsonrpcconnection.hpp b/lib/remote/jsonrpcconnection.hpp index 826d3b46a..df846527a 100644 --- a/lib/remote/jsonrpcconnection.hpp +++ b/lib/remote/jsonrpcconnection.hpp @@ -8,6 +8,7 @@ #include "base/atomic.hpp" #include "base/io-engine.hpp" #include "base/tlsstream.hpp" +#include "base/wait-group.hpp" #include "base/timer.hpp" #include "base/workqueue.hpp" #include @@ -43,7 +44,8 @@ class JsonRpcConnection final : public Object public: DECLARE_PTR_TYPEDEFS(JsonRpcConnection); - JsonRpcConnection(const String& identity, bool authenticated, const Shared::Ptr& stream, ConnectionRole role); + JsonRpcConnection(const WaitGroup::Ptr& waitgroup, const String& identity, bool authenticated, + const Shared::Ptr& stream, ConnectionRole role); void Start(); @@ -78,9 +80,11 @@ private: AsioEvent m_OutgoingMessagesQueued; AsioEvent m_WriterDone; Atomic m_ShuttingDown; + WaitGroup::Ptr m_WaitGroup; boost::asio::deadline_timer m_CheckLivenessTimer, m_HeartbeatTimer; - JsonRpcConnection(const String& identity, bool authenticated, const Shared::Ptr& stream, ConnectionRole role, boost::asio::io_context& io); + JsonRpcConnection(const WaitGroup::Ptr& waitgroup, const String& identity, bool authenticated, + const Shared::Ptr& stream, ConnectionRole role, boost::asio::io_context& io); void HandleIncomingMessages(boost::asio::yield_context yc); void WriteOutgoingMessages(boost::asio::yield_context yc);