From 84b411501b9736d4b1b4489f8d43e79f128ff6dc Mon Sep 17 00:00:00 2001 From: "Alexander A. Klimov" Date: Wed, 20 Feb 2019 12:00:11 +0100 Subject: [PATCH] Re-add JsonRpcConnection#Disconnect() --- lib/remote/apilistener.cpp | 12 ++++ lib/remote/jsonrpcconnection.cpp | 109 +++++++++++++++++++------------ lib/remote/jsonrpcconnection.hpp | 7 +- 3 files changed, 82 insertions(+), 46 deletions(-) diff --git a/lib/remote/apilistener.cpp b/lib/remote/apilistener.cpp index 56fda20e5..dccb8867f 100644 --- a/lib/remote/apilistener.cpp +++ b/lib/remote/apilistener.cpp @@ -213,6 +213,16 @@ void ApiListener::UpdateSSLContext() } m_SSLContext = context; + + for (const Endpoint::Ptr& endpoint : ConfigType::GetObjectsByType()) { + for (const JsonRpcConnection::Ptr& client : endpoint->GetClients()) { + client->Disconnect(); + } + } + + for (const JsonRpcConnection::Ptr& client : m_AnonymousClients) { + client->Disconnect(); + } } void ApiListener::OnAllConfigLoaded() @@ -841,6 +851,8 @@ void ApiListener::ApiTimerHandler() for (const JsonRpcConnection::Ptr& client : endpoint->GetClients()) { if (client->GetTimestamp() == maxTs) { client->SendMessage(lmessage); + } else { + client->Disconnect(); } } diff --git a/lib/remote/jsonrpcconnection.cpp b/lib/remote/jsonrpcconnection.cpp index 3840a8c89..ff2e67ebf 100644 --- a/lib/remote/jsonrpcconnection.cpp +++ b/lib/remote/jsonrpcconnection.cpp @@ -27,32 +27,28 @@ JsonRpcConnection::JsonRpcConnection(const String& identity, bool authenticated, const std::shared_ptr& stream, ConnectionRole role) : m_Identity(identity), m_Authenticated(authenticated), m_Stream(stream), m_Role(role), m_Timestamp(Utility::GetTime()), m_IoStrand(stream->get_io_service()), - m_OutgoingMessagesQueued(stream->get_io_service()), m_ReaderHasError(false), m_RunningCoroutines(0) + m_OutgoingMessagesQueued(stream->get_io_service()), m_WriterDone(stream->get_io_service()), m_ShuttingDown(false) { if (authenticated) m_Endpoint = Endpoint::GetByName(identity); m_OutgoingMessagesQueued.expires_at(boost::posix_time::pos_infin); + m_WriterDone.expires_at(boost::posix_time::pos_infin); } void JsonRpcConnection::Start() { namespace asio = boost::asio; - m_RunningCoroutines = 2; + JsonRpcConnection::Ptr preventGc (this); - asio::spawn(m_IoStrand, [this](asio::yield_context yc) { HandleIncomingMessages(yc); }); - asio::spawn(m_IoStrand, [this](asio::yield_context yc) { WriteOutgoingMessages(yc); }); + asio::spawn(m_IoStrand, [this, preventGc](asio::yield_context yc) { HandleIncomingMessages(yc); }); + asio::spawn(m_IoStrand, [this, preventGc](asio::yield_context yc) { WriteOutgoingMessages(yc); }); } void JsonRpcConnection::HandleIncomingMessages(boost::asio::yield_context yc) { - Defer shutdownStreamOnce ([this, &yc]() { - m_ReaderHasError = true; - m_OutgoingMessagesQueued.expires_at(boost::posix_time::neg_infin); - - ShutdownStreamOnce(yc); - }); + Defer disconnect ([this]() { Disconnect(); }); for (;;) { String message; @@ -60,9 +56,11 @@ void JsonRpcConnection::HandleIncomingMessages(boost::asio::yield_context yc) try { message = JsonRpc::ReadMessage(m_Stream, yc, m_Endpoint ? -1 : 1024 * 1024); } catch (const std::exception& ex) { - Log(LogWarning, "JsonRpcConnection") - << "Error while reading JSON-RPC message for identity '" << m_Identity - << "': " << DiagnosticInformation(ex); + if (!m_ShuttingDown) { + Log(LogWarning, "JsonRpcConnection") + << "Error while reading JSON-RPC message for identity '" << m_Identity + << "': " << DiagnosticInformation(ex); + } break; } @@ -72,9 +70,11 @@ void JsonRpcConnection::HandleIncomingMessages(boost::asio::yield_context yc) MessageHandler(message); } catch (const std::exception& ex) { - Log(LogWarning, "JsonRpcConnection") - << "Error while processing JSON-RPC message for identity '" << m_Identity - << "': " << DiagnosticInformation(ex); + if (!m_ShuttingDown) { + Log(LogWarning, "JsonRpcConnection") + << "Error while processing JSON-RPC message for identity '" << m_Identity + << "': " << DiagnosticInformation(ex); + } break; } @@ -83,7 +83,9 @@ void JsonRpcConnection::HandleIncomingMessages(boost::asio::yield_context yc) void JsonRpcConnection::WriteOutgoingMessages(boost::asio::yield_context yc) { - Defer shutdownStreamOnce ([this, &yc]() { ShutdownStreamOnce(yc); }); + Defer disconnect ([this]() { Disconnect(); }); + + Defer signalWriterDone ([this]() { m_WriterDone.expires_at(boost::posix_time::neg_infin); }); do { try { @@ -108,36 +110,17 @@ void JsonRpcConnection::WriteOutgoingMessages(boost::asio::yield_context yc) m_Stream->async_flush(yc); } catch (const std::exception& ex) { - std::ostringstream info; - info << "Error while sending JSON-RPC message for identity '" << m_Identity << "'"; - Log(LogWarning, "JsonRpcConnection") - << info.str() << "\n" << DiagnosticInformation(ex); + if (!m_ShuttingDown) { + std::ostringstream info; + info << "Error while sending JSON-RPC message for identity '" << m_Identity << "'"; + Log(LogWarning, "JsonRpcConnection") + << info.str() << "\n" << DiagnosticInformation(ex); + } break; } } - } while (!m_ReaderHasError); -} - -void JsonRpcConnection::ShutdownStreamOnce(boost::asio::yield_context& yc) -{ - if (!--m_RunningCoroutines) { - try { - m_Stream->next_layer().async_shutdown(yc); - } catch (...) { - // https://stackoverflow.com/questions/130117/throwing-exceptions-out-of-a-destructor - } - - Log(LogWarning, "JsonRpcConnection") - << "API client disconnected for identity '" << m_Identity << "'"; - - if (m_Endpoint) { - m_Endpoint->RemoveClient(this); - } else { - auto listener (ApiListener::GetInstance()); - listener->RemoveAnonymousClient(this); - } - } + } while (!m_ShuttingDown); } double JsonRpcConnection::GetTimestamp() const @@ -178,6 +161,46 @@ void JsonRpcConnection::SendMessage(const Dictionary::Ptr& message) }); } +void JsonRpcConnection::Disconnect() +{ + namespace asio = boost::asio; + + JsonRpcConnection::Ptr preventGc (this); + + asio::spawn(m_IoStrand, [this, preventGc](asio::yield_context yc) { + if (!m_ShuttingDown) { + m_ShuttingDown = true; + + Log(LogWarning, "JsonRpcConnection") + << "API client disconnected for identity '" << m_Identity << "'"; + + m_OutgoingMessagesQueued.expires_at(boost::posix_time::neg_infin); + + try { + m_WriterDone.async_wait(yc); + } catch (...) { + } + + try { + m_Stream->next_layer().async_shutdown(yc); + } catch (...) { + } + + try { + m_Stream->lowest_layer().shutdown(m_Stream->lowest_layer().shutdown_both); + } catch (...) { + } + + if (m_Endpoint) { + m_Endpoint->RemoveClient(this); + } else { + auto listener (ApiListener::GetInstance()); + listener->RemoveAnonymousClient(this); + } + } + }); +} + void JsonRpcConnection::MessageHandler(const String& jsonString) { Dictionary::Ptr message = JsonRpc::DecodeMessage(jsonString); diff --git a/lib/remote/jsonrpcconnection.hpp b/lib/remote/jsonrpcconnection.hpp index 5263b30ac..51787244e 100644 --- a/lib/remote/jsonrpcconnection.hpp +++ b/lib/remote/jsonrpcconnection.hpp @@ -52,6 +52,8 @@ public: std::shared_ptr GetStream() const; ConnectionRole GetRole() const; + void Disconnect(); + void SendMessage(const Dictionary::Ptr& request); static Value HeartbeatAPIHandler(const intrusive_ptr& origin, const Dictionary::Ptr& params); @@ -68,12 +70,11 @@ private: boost::asio::io_service::strand m_IoStrand; std::vector m_OutgoingMessagesQueue; boost::asio::deadline_timer m_OutgoingMessagesQueued; - bool m_ReaderHasError; - unsigned char m_RunningCoroutines; + boost::asio::deadline_timer m_WriterDone; + bool m_ShuttingDown; void HandleIncomingMessages(boost::asio::yield_context yc); void WriteOutgoingMessages(boost::asio::yield_context yc); - void ShutdownStreamOnce(boost::asio::yield_context& yc); bool ProcessMessage(); void MessageHandler(const String& jsonString);