From 9ff3d585351d97b839974d02d683d2a59b6979d2 Mon Sep 17 00:00:00 2001 From: Julian Brost Date: Wed, 13 Nov 2024 15:28:49 +0100 Subject: [PATCH] WIP: rework JsonRpcConnection disconnect logic --- CMakeLists.txt | 2 + lib/remote/apilistener.cpp | 14 +- lib/remote/jsonrpcconnection-heartbeat.cpp | 28 +-- lib/remote/jsonrpcconnection.cpp | 215 ++++++++++++++++----- lib/remote/jsonrpcconnection.hpp | 15 +- 5 files changed, 209 insertions(+), 65 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 6e531a059..5ee0e244c 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -185,6 +185,8 @@ add_definitions(-DBOOST_FILESYSTEM_NO_DEPRECATED) # Required for Boost v1.74+ add_definitions(-DBOOST_ASIO_USE_TS_EXECUTOR_AS_DEFAULT) +#add_definitions(-DBOOST_ASIO_ENABLE_HANDLER_TRACKING) + link_directories(${Boost_LIBRARY_DIRS}) include_directories(${Boost_INCLUDE_DIRS}) diff --git a/lib/remote/apilistener.cpp b/lib/remote/apilistener.cpp index 2c2bb7c81..4df8a267e 100644 --- a/lib/remote/apilistener.cpp +++ b/lib/remote/apilistener.cpp @@ -368,6 +368,18 @@ void ApiListener::Stop(bool runtimeDeleted) m_Timer->Stop(true); m_RenewOwnCertTimer->Stop(true); + for (const Endpoint::Ptr& endpoint : ConfigType::GetObjectsByType()) { + for (const JsonRpcConnection::Ptr& client : endpoint->GetClients()) { + client->Disconnect(); + } + } + + for (const JsonRpcConnection::Ptr& client : m_AnonymousClients) { + client->Disconnect(); + } + + Utility::Sleep(3); + ObjectImpl::Stop(runtimeDeleted); Log(LogInformation, "ApiListener") @@ -507,7 +519,7 @@ void ApiListener::ListenerCoroutineProc(boost::asio::yield_context yc, const Sha lastModified = Utility::GetFileCreationTime(crlPath); } - for (;;) { + while (IsActive()) { try { asio::ip::tcp::socket socket (io); diff --git a/lib/remote/jsonrpcconnection-heartbeat.cpp b/lib/remote/jsonrpcconnection-heartbeat.cpp index 9b83c1363..310b278f7 100644 --- a/lib/remote/jsonrpcconnection-heartbeat.cpp +++ b/lib/remote/jsonrpcconnection-heartbeat.cpp @@ -23,22 +23,22 @@ REGISTER_APIFUNCTION(Heartbeat, event, &JsonRpcConnection::HeartbeatAPIHandler); void JsonRpcConnection::HandleAndWriteHeartbeats(boost::asio::yield_context yc) { - boost::system::error_code ec; + //boost::system::error_code ec; - for (;;) { - m_HeartbeatTimer.expires_from_now(boost::posix_time::seconds(20)); - m_HeartbeatTimer.async_wait(yc[ec]); + //for (;;) { + // m_HeartbeatTimer.expires_from_now(boost::posix_time::seconds(20)); + // m_HeartbeatTimer.async_wait(yc[ec]); - if (m_ShuttingDown) { - break; - } - - SendMessageInternal(new Dictionary({ - { "jsonrpc", "2.0" }, - { "method", "event::Heartbeat" }, - { "params", new Dictionary() } - })); - } + // if (m_State != State::Active) { + // break; + // } + // + // SendMessageInternal(new Dictionary({ + // { "jsonrpc", "2.0" }, + // { "method", "event::Heartbeat" }, + // { "params", new Dictionary() } + // })); + //} } Value JsonRpcConnection::HeartbeatAPIHandler(const MessageOrigin::Ptr& origin, const Dictionary::Ptr& params) diff --git a/lib/remote/jsonrpcconnection.cpp b/lib/remote/jsonrpcconnection.cpp index 588154fe5..fa9444f92 100644 --- a/lib/remote/jsonrpcconnection.cpp +++ b/lib/remote/jsonrpcconnection.cpp @@ -39,11 +39,15 @@ JsonRpcConnection::JsonRpcConnection(const String& identity, bool authenticated, const AsioTlsStream::Ptr& stream, ConnectionRole role, boost::asio::io_context& io) : m_Identity(identity), m_Authenticated(authenticated), m_Stream(stream), m_Role(role), m_Timestamp(Utility::GetTime()), m_Seen(Utility::GetTime()), m_IoStrand(io), - m_OutgoingMessagesQueued(io), m_WriterDone(io), m_ShuttingDown(false), - m_CheckLivenessTimer(io), m_HeartbeatTimer(io) + m_OutgoingMessagesQueued(io), m_ReadLoopDone(io), m_State(State::Active) + // m_CheckLivenessTimer(io), m_HeartbeatTimer(io) { if (authenticated) m_Endpoint = Endpoint::GetByName(identity); + + // SSL *ssl = m_Stream->next_layer().native_handle(); + // SSL_set_msg_callback(ssl, SSL_trace); + // SSL_set_msg_callback_arg(ssl, BIO_new_fp(stdout,0)); } void JsonRpcConnection::Start() @@ -54,24 +58,42 @@ void JsonRpcConnection::Start() IoEngine::SpawnCoroutine(m_IoStrand, [this, keepAlive](asio::yield_context yc) { HandleIncomingMessages(yc); }); IoEngine::SpawnCoroutine(m_IoStrand, [this, keepAlive](asio::yield_context yc) { WriteOutgoingMessages(yc); }); - IoEngine::SpawnCoroutine(m_IoStrand, [this, keepAlive](asio::yield_context yc) { HandleAndWriteHeartbeats(yc); }); - IoEngine::SpawnCoroutine(m_IoStrand, [this, keepAlive](asio::yield_context yc) { CheckLiveness(yc); }); + //IoEngine::SpawnCoroutine(m_IoStrand, [this, keepAlive](asio::yield_context yc) { HandleAndWriteHeartbeats(yc); }); + //IoEngine::SpawnCoroutine(m_IoStrand, [this, keepAlive](asio::yield_context yc) { CheckLiveness(yc); }); } void JsonRpcConnection::HandleIncomingMessages(boost::asio::yield_context yc) { + Defer signalDone ([this]() { m_ReadLoopDone.Set(); }); + m_Stream->next_layer().SetSeen(&m_Seen); - while (!m_ShuttingDown) { + // Log(LogInformation, "JsonRpcConnection") << "Starting read loop for " << m_Identity; + while (true) { String message; try { + // Log(LogInformation, "JsonRpcConnection") << "Waiting for message from " << m_Identity; message = JsonRpc::ReadMessage(m_Stream, yc, m_Endpoint ? -1 : 1024 * 1024); + // Log(LogInformation, "JsonRpcConnection") << "Successfully read message from " << m_Identity; } catch (const std::exception& ex) { - Log(m_ShuttingDown ? LogDebug : LogNotice, "JsonRpcConnection") + // Log(LogInformation, "JsonRpcConnection") << "Read loop exception for '" << m_Identity << "': " << ex.what(); + + if (auto err = dynamic_cast(&ex); err && err->code() == boost::asio::error::eof) { + Log(LogInformation, "JsonRpcConnection") << "Remote " << m_Identity << " sent TLS shutdown"; + break; + } + + Log(LogInformation, "JsonRpcConnection") << "Error while reading JSON-RPC message for identity '" << m_Identity << "': " << DiagnosticInformation(ex); + ForceDisconnectInternal(); + return; + } + + if (message == "{}") { + Log(LogInformation, "JsonRpcConnection") << "Remote " << m_Identity << " send JSON-RPC shutdown"; break; } @@ -84,20 +106,31 @@ void JsonRpcConnection::HandleIncomingMessages(boost::asio::yield_context yc) l_TaskStats.InsertValue(Utility::GetTime(), 1); } catch (const std::exception& ex) { - Log(m_ShuttingDown ? LogDebug : LogWarning, "JsonRpcConnection") + Log(LogInformation, "JsonRpcConnection") << "Error while processing JSON-RPC message for identity '" << m_Identity << "': " << DiagnosticInformation(ex); - break; + ForceDisconnectInternal(); + return; } } + // Log(LogInformation, "JsonRpcConnection") << "Read loop for " << m_Identity << " terminated"; + + // boost::asio::deadline_timer timer (m_IoStrand); + // timer.expires_from_now(boost::posix_time::seconds(3)); + // boost::system::error_code ec; + // timer.async_wait(yc[ec]); + // Utility::Sleep(2); + + Log(LogInformation, "JsonRpcConnection") << "Trigger disconnect after read loop teminated for " << m_Identity; + Disconnect(); } void JsonRpcConnection::WriteOutgoingMessages(boost::asio::yield_context yc) { - Defer signalWriterDone ([this]() { m_WriterDone.Set(); }); + // Log(LogInformation, "JsonRpcConnection") << "Starting write loop for " << m_Identity; do { m_OutgoingMessagesQueued.Wait(yc); @@ -119,16 +152,78 @@ void JsonRpcConnection::WriteOutgoingMessages(boost::asio::yield_context yc) m_Stream->async_flush(yc); } catch (const std::exception& ex) { - Log(m_ShuttingDown ? LogDebug : LogWarning, "JsonRpcConnection") + Log(LogInformation, "JsonRpcConnection") << "Error while sending JSON-RPC message for identity '" << m_Identity << "'\n" << DiagnosticInformation(ex); - break; + ForceDisconnectInternal(); + return; } } - } while (!m_ShuttingDown); + } while (m_State == State::Active); - Disconnect(); + // Log(LogInformation, "JsonRpcConnection") << "Write loop for " << m_Identity << " terminated"; + + bool clean = true; + boost::system::error_code ec; + + if (m_State == State::Disconnected) { + return; + } + + JsonRpc::SendRawMessage(m_Stream, "{}", yc[ec]); + if (m_State == State::Disconnected) { + return; + } else if (ec) { + Log(LogInformation, "JsonRpcConnection") << "JSON-RPC shutdown for " << m_Identity << " failed (write): " << ec.message(); + clean = false; + } + + m_Stream->async_flush(yc); + if (m_State == State::Disconnected) { + return; + } else if (ec) { + Log(LogInformation, "JsonRpcConnection") << "JSON-RPC shutdown for " << m_Identity << " failed (flush): " << ec.message(); + clean = false; + } + + // Log(LogInformation, "JsonRpcConnection") << "Starting TLS shutdown for " << m_Identity; + m_Stream->next_layer().async_shutdown(yc[ec]); + if (m_State == State::Disconnected) { + return; + } else if (ec == boost::asio::error::operation_aborted) { + // Yada, yada, asio doing strange things... + ec.clear(); + m_Stream->next_layer().async_shutdown(yc[ec]); + } + + if (m_State == State::Disconnected) { + return; + } else if (ec) { + Log(LogInformation, "JsonRpcConnection") << "TLS shutdown for " << m_Identity << " failed: " << ec.message(); + clean = false; + } + + // async_shutdown() should wait for the close notify from the peer. However, in case of an error, it may exit + // before the read loop finished, so wait for it explicitly. + m_ReadLoopDone.Wait(yc); + + // Shut down the TCP connection. + ec.clear(); + // Log(LogInformation, "JsonRpcConnection") << "Starting TCP shutdown for " << m_Identity; + m_Stream->lowest_layer().shutdown(AsioTlsStream::lowest_layer_type::shutdown_both, ec); + if (m_State == State::Disconnected) { + return; + } else if (ec) { + Log(LogInformation, "JsonRpcConnection") << "TCP shutdown for " << m_Identity << " failed: " << ec.message(); + clean = false; + } + + ForceDisconnectInternal(clean); + + if (clean) { + Log(LogInformation, "JsonRpcConnection") << "Disconnected " << m_Identity << " cleanly"; + } } double JsonRpcConnection::GetTimestamp() const @@ -163,7 +258,7 @@ ConnectionRole JsonRpcConnection::GetRole() const void JsonRpcConnection::SendMessage(const Dictionary::Ptr& message) { - if (m_ShuttingDown) { + if (m_State != State::Active) { BOOST_THROW_EXCEPTION(std::runtime_error("Cannot send message to already disconnected API client '" + GetIdentity() + "'!")); } @@ -174,14 +269,14 @@ void JsonRpcConnection::SendMessage(const Dictionary::Ptr& message) void JsonRpcConnection::SendRawMessage(const String& message) { - if (m_ShuttingDown) { + if (m_State != State::Active) { BOOST_THROW_EXCEPTION(std::runtime_error("Cannot send message to already disconnected API client '" + GetIdentity() + "'!")); } Ptr keepAlive (this); m_IoStrand.post([this, keepAlive, message]() { - if (m_ShuttingDown) { + if (m_State != State::Active) { return; } @@ -192,7 +287,7 @@ void JsonRpcConnection::SendRawMessage(const String& message) void JsonRpcConnection::SendMessageInternal(const Dictionary::Ptr& message) { - if (m_ShuttingDown) { + if (m_State != State::Active) { return; } @@ -202,37 +297,61 @@ void JsonRpcConnection::SendMessageInternal(const Dictionary::Ptr& message) void JsonRpcConnection::Disconnect() { - namespace asio = boost::asio; - - if (!m_ShuttingDown.exchange(true)) { - JsonRpcConnection::Ptr keepAlive (this); - - IoEngine::SpawnCoroutine(m_IoStrand, [this, keepAlive](asio::yield_context yc) { - Log(LogWarning, "JsonRpcConnection") - << "API client disconnected for identity '" << m_Identity << "'"; - - // We need to unregister the endpoint client as soon as possible not to confuse Icinga 2, - // given that Endpoint::GetConnected() is just performing a check that the endpoint's client - // cache is not empty, which could result in an already disconnected endpoint never trying to - // reconnect again. See #7444. - if (m_Endpoint) { - m_Endpoint->RemoveClient(this); - } else { - ApiListener::GetInstance()->RemoveAnonymousClient(this); - } - + if (State expected = State::Active; m_State.compare_exchange_strong(expected, State::Disconnecting)) { + Log(LogInformation, "JsonRpcConnection") << "Disconnecting " << m_Identity << (m_IoStrand.running_in_this_thread() ? " (internal trigger)" : " (external trigger)"); + // Wake write loop to initiate the actual disconnect. + if (m_IoStrand.running_in_this_thread()) { m_OutgoingMessagesQueued.Set(); - - m_WriterDone.Wait(yc); - - m_CheckLivenessTimer.cancel(); - m_HeartbeatTimer.cancel(); - - m_Stream->GracefulDisconnect(m_IoStrand, yc); - }); + } else { + m_IoStrand.post([conn=Ptr(this)]() { + conn->m_OutgoingMessagesQueued.Set(); + }); + } } } +void JsonRpcConnection::ForceDisconnectInternal(bool clean) +{ + ASSERT(m_IoStrand.running_in_this_thread()); + + if (m_State == State::Disconnected) { + return; + } + + if (!clean) { + Log(LogWarning, "JsonRpcConnection") << "Forcefully disconnecting API client for identity '" << m_Identity << "'."; + } + + m_Stream->ForceDisconnect(); + Cleanup(); +} + +void JsonRpcConnection::Cleanup() +{ + ASSERT(m_IoStrand.running_in_this_thread()); + + if (m_State.exchange(State::Disconnected) == State::Disconnected) { + return; + } + + if (m_Endpoint) { + m_Endpoint->RemoveClient(this); + } else { + ApiListener::GetInstance()->RemoveAnonymousClient(this); + } + + // Wake WriteOutgoingMessages() if it waiting for new messages to send. + m_OutgoingMessagesQueued.Set(); + + // Signal CheckLiveness() coroutine to terminate. + // m_CheckLivenessTimer.cancel(); + + // Signal HandleAndWriteHeartbeats() coroutine to terminate. + // m_HeartbeatTimer.cancel(); + + Log(LogWarning, "JsonRpcConnection") << "API client disconnected for identity '" << m_Identity << "'"; +} + void JsonRpcConnection::MessageHandler(const String& jsonString) { Dictionary::Ptr message = JsonRpc::DecodeMessage(jsonString); @@ -325,6 +444,7 @@ Value SetLogPositionHandler(const MessageOrigin::Ptr& origin, const Dictionary:: void JsonRpcConnection::CheckLiveness(boost::asio::yield_context yc) { +#if 0 boost::system::error_code ec; if (!m_Authenticated) { @@ -338,7 +458,7 @@ void JsonRpcConnection::CheckLiveness(boost::asio::yield_context yc) m_CheckLivenessTimer.expires_from_now(boost::posix_time::seconds(10)); m_CheckLivenessTimer.async_wait(yc[ec]); - if (m_ShuttingDown) { + if (m_State != State::Active) { return; } @@ -353,7 +473,7 @@ void JsonRpcConnection::CheckLiveness(boost::asio::yield_context yc) m_CheckLivenessTimer.expires_from_now(boost::posix_time::seconds(30)); m_CheckLivenessTimer.async_wait(yc[ec]); - if (m_ShuttingDown) { + if (m_State != State::Active) { break; } @@ -361,11 +481,12 @@ void JsonRpcConnection::CheckLiveness(boost::asio::yield_context yc) Log(LogInformation, "JsonRpcConnection") << "No messages for identity '" << m_Identity << "' have been received in the last 60 seconds."; - Disconnect(); + ForceDisconnectInternal(); break; } } } +#endif } double JsonRpcConnection::GetWorkQueueRate() diff --git a/lib/remote/jsonrpcconnection.hpp b/lib/remote/jsonrpcconnection.hpp index 3ecf76d5d..6e34af63e 100644 --- a/lib/remote/jsonrpcconnection.hpp +++ b/lib/remote/jsonrpcconnection.hpp @@ -66,6 +66,13 @@ public: static void SendCertificateRequest(const JsonRpcConnection::Ptr& aclient, const intrusive_ptr& origin, const String& path); private: + enum class State + { + Active, // up and running (initial state, as JsonRpcConnection is constructed from an open connection) + Disconnecting, // in the process of being shut down gracefully + Disconnected, // completely shut down + }; + String m_Identity; bool m_Authenticated; Endpoint::Ptr m_Endpoint; @@ -76,9 +83,9 @@ private: boost::asio::io_context::strand m_IoStrand; std::vector m_OutgoingMessagesQueue; AsioConditionVariable m_OutgoingMessagesQueued; - AsioConditionVariable m_WriterDone; - Atomic m_ShuttingDown; - boost::asio::deadline_timer m_CheckLivenessTimer, m_HeartbeatTimer; + AsioConditionVariable m_ReadLoopDone; + Atomic m_State; + //boost::asio::deadline_timer m_CheckLivenessTimer, m_HeartbeatTimer; JsonRpcConnection(const String& identity, bool authenticated, const AsioTlsStream::Ptr& stream, ConnectionRole role, boost::asio::io_context& io); @@ -93,6 +100,8 @@ private: void CertificateRequestResponseHandler(const Dictionary::Ptr& message); void SendMessageInternal(const Dictionary::Ptr& request); + void ForceDisconnectInternal(bool clean = false); + void Cleanup(); }; }