Disconnect JSON-RPC clients on ApiListner::Stop()

This commit is contained in:
Johannes Schmidt 2025-06-06 09:53:03 +02:00
parent 00802ed9fa
commit 33777f6f3f
5 changed files with 36 additions and 9 deletions

View File

@ -370,7 +370,10 @@ void ApiListener::Stop(bool runtimeDeleted)
StopListener();
DisconnectJsonRpcConnections();
m_WaitGroup->Join();
ObjectImpl<ApiListener>::Stop(runtimeDeleted);
Log(LogInformation, "ApiListener")
@ -891,7 +894,7 @@ void ApiListener::NewClientHandlerInternal(
return;
}
JsonRpcConnection::Ptr aclient = new JsonRpcConnection(identity, verify_ok, client, role);
JsonRpcConnection::Ptr aclient = new JsonRpcConnection(m_WaitGroup, identity, verify_ok, client, role);
if (endpoint) {
endpoint->AddClient(aclient);
@ -1805,6 +1808,20 @@ std::set<JsonRpcConnection::Ptr> ApiListener::GetAnonymousClients() const
return m_AnonymousClients;
}
void ApiListener::DisconnectJsonRpcConnections()
{
for (auto endpoint : ConfigType::GetObjectsByType<Endpoint>()) {
for (const auto& client : endpoint->GetClients()) {
client->Disconnect();
}
}
std::unique_lock lock(m_AnonymousClientsLock);
for (const auto & client : m_AnonymousClients){
client->Disconnect();
}
}
void ApiListener::AddHttpClient(const HttpServerConnection::Ptr& aclient)
{
std::unique_lock<std::mutex> lock(m_HttpClientsLock);

View File

@ -114,6 +114,7 @@ public:
bool AddAnonymousClient(const JsonRpcConnection::Ptr& aclient);
void RemoveAnonymousClient(const JsonRpcConnection::Ptr& aclient);
std::set<JsonRpcConnection::Ptr> GetAnonymousClients() const;
void DisconnectJsonRpcConnections();
void AddHttpClient(const HttpServerConnection::Ptr& aclient);
void RemoveHttpClient(const HttpServerConnection::Ptr& aclient);

View File

@ -60,7 +60,7 @@ void Endpoint::RemoveClient(const JsonRpcConnection::Ptr& client)
std::unique_lock<std::mutex> lock(m_ClientsLock);
m_Clients.erase(client);
Log(LogWarning, "ApiListener")
Log(LogInformation, "ApiListener")
<< "Removing API client for endpoint '" << GetName() << "'. " << m_Clients.size() << " API clients left.";
SetConnecting(false);

View File

@ -29,17 +29,17 @@ REGISTER_APIFUNCTION(SetLogPosition, log, &SetLogPositionHandler);
static RingBuffer l_TaskStats (15 * 60);
JsonRpcConnection::JsonRpcConnection(const String& identity, bool authenticated,
JsonRpcConnection::JsonRpcConnection(const WaitGroup::Ptr& waitGroup, const String& identity, bool authenticated,
const Shared<AsioTlsStream>::Ptr& stream, ConnectionRole role)
: JsonRpcConnection(identity, authenticated, stream, role, IoEngine::Get().GetIoContext())
: JsonRpcConnection(waitGroup, identity, authenticated, stream, role, IoEngine::Get().GetIoContext())
{
}
JsonRpcConnection::JsonRpcConnection(const String& identity, bool authenticated,
JsonRpcConnection::JsonRpcConnection(const WaitGroup::Ptr& waitGroup, const String& identity, bool authenticated,
const Shared<AsioTlsStream>::Ptr& stream, ConnectionRole role, boost::asio::io_context& io)
: m_Identity(identity), m_Authenticated(authenticated), m_Stream(stream), m_Role(role),
m_Timestamp(Utility::GetTime()), m_Seen(Utility::GetTime()), m_IoStrand(io),
m_OutgoingMessagesQueued(io), m_WriterDone(io), m_ShuttingDown(false),
m_OutgoingMessagesQueued(io), m_WriterDone(io), m_ShuttingDown(false), m_WaitGroup(waitGroup),
m_CheckLivenessTimer(io), m_HeartbeatTimer(io)
{
if (authenticated)
@ -284,7 +284,7 @@ void JsonRpcConnection::Disconnect()
ApiListener::GetInstance()->RemoveAnonymousClient(this);
}
Log(LogWarning, "JsonRpcConnection")
Log(LogInformation, "JsonRpcConnection")
<< "API client disconnected for identity '" << m_Identity << "'";
});
}
@ -303,6 +303,11 @@ void JsonRpcConnection::Disconnect()
*/
void JsonRpcConnection::MessageHandler(const Dictionary::Ptr& message)
{
std::shared_lock wgLock(*m_WaitGroup, std::try_to_lock);
if (!wgLock) {
return;
}
if (m_Endpoint && message->Contains("ts")) {
double ts = message->Get("ts");

View File

@ -8,6 +8,7 @@
#include "base/atomic.hpp"
#include "base/io-engine.hpp"
#include "base/tlsstream.hpp"
#include "base/wait-group.hpp"
#include "base/timer.hpp"
#include "base/workqueue.hpp"
#include <memory>
@ -43,7 +44,8 @@ class JsonRpcConnection final : public Object
public:
DECLARE_PTR_TYPEDEFS(JsonRpcConnection);
JsonRpcConnection(const String& identity, bool authenticated, const Shared<AsioTlsStream>::Ptr& stream, ConnectionRole role);
JsonRpcConnection(const WaitGroup::Ptr& waitgroup, const String& identity, bool authenticated,
const Shared<AsioTlsStream>::Ptr& stream, ConnectionRole role);
void Start();
@ -78,9 +80,11 @@ private:
AsioEvent m_OutgoingMessagesQueued;
AsioEvent m_WriterDone;
Atomic<bool> m_ShuttingDown;
WaitGroup::Ptr m_WaitGroup;
boost::asio::deadline_timer m_CheckLivenessTimer, m_HeartbeatTimer;
JsonRpcConnection(const String& identity, bool authenticated, const Shared<AsioTlsStream>::Ptr& stream, ConnectionRole role, boost::asio::io_context& io);
JsonRpcConnection(const WaitGroup::Ptr& waitgroup, const String& identity, bool authenticated,
const Shared<AsioTlsStream>::Ptr& stream, ConnectionRole role, boost::asio::io_context& io);
void HandleIncomingMessages(boost::asio::yield_context yc);
void WriteOutgoingMessages(boost::asio::yield_context yc);