From a54bd9d5c4c454cbd557199f01e4c88fab4c4edd Mon Sep 17 00:00:00 2001 From: "Alexander A. Klimov" Date: Wed, 20 Feb 2019 13:49:50 +0100 Subject: [PATCH] JsonRpcConnection: re-add automatic disconnect --- lib/remote/jsonrpcconnection.cpp | 28 +++++++++++++++++++++++++++- lib/remote/jsonrpcconnection.hpp | 2 ++ 2 files changed, 29 insertions(+), 1 deletion(-) diff --git a/lib/remote/jsonrpcconnection.cpp b/lib/remote/jsonrpcconnection.cpp index 4a50ea20c..73fcc6398 100644 --- a/lib/remote/jsonrpcconnection.cpp +++ b/lib/remote/jsonrpcconnection.cpp @@ -14,7 +14,9 @@ #include "base/tlsstream.hpp" #include #include +#include #include +#include #include #include @@ -26,7 +28,7 @@ REGISTER_APIFUNCTION(SetLogPosition, log, &SetLogPositionHandler); JsonRpcConnection::JsonRpcConnection(const String& identity, bool authenticated, const std::shared_ptr& stream, ConnectionRole role) : m_Identity(identity), m_Authenticated(authenticated), m_Stream(stream), - m_Role(role), m_Timestamp(Utility::GetTime()), m_NextHeartbeat(0), m_IoStrand(stream->get_io_service()), + m_Role(role), m_Timestamp(Utility::GetTime()), m_Seen(Utility::GetTime()), m_NextHeartbeat(0), m_IoStrand(stream->get_io_service()), m_OutgoingMessagesQueued(stream->get_io_service()), m_WriterDone(stream->get_io_service()), m_ShuttingDown(false) { if (authenticated) @@ -45,6 +47,7 @@ void JsonRpcConnection::Start() asio::spawn(m_IoStrand, [this, preventGc](asio::yield_context yc) { HandleIncomingMessages(yc); }); asio::spawn(m_IoStrand, [this, preventGc](asio::yield_context yc) { WriteOutgoingMessages(yc); }); asio::spawn(m_IoStrand, [this, preventGc](asio::yield_context yc) { HandleAndWriteHeartbeats(yc); }); + asio::spawn(m_IoStrand, [this, preventGc](asio::yield_context yc) { CheckLiveness(yc); }); } void JsonRpcConnection::HandleIncomingMessages(boost::asio::yield_context yc) @@ -66,6 +69,8 @@ void JsonRpcConnection::HandleIncomingMessages(boost::asio::yield_context yc) break; } + m_Seen = Utility::GetTime(); + try { CpuBoundWork handleMessage (yc); @@ -293,3 +298,24 @@ Value SetLogPositionHandler(const MessageOrigin::Ptr& origin, const Dictionary:: return Empty; } +void JsonRpcConnection::CheckLiveness(boost::asio::yield_context yc) +{ + boost::asio::deadline_timer timer (m_Stream->get_io_service()); + + for (;;) { + timer.expires_from_now(boost::posix_time::seconds(30)); + timer.async_wait(yc); + + if (m_ShuttingDown) { + break; + } + + if (m_Seen < Utility::GetTime() - 60 && (!m_Endpoint || !m_Endpoint->GetSyncing())) { + Log(LogInformation, "JsonRpcConnection") + << "No messages for identity '" << m_Identity << "' have been received in the last 60 seconds."; + + Disconnect(); + break; + } + } +} diff --git a/lib/remote/jsonrpcconnection.hpp b/lib/remote/jsonrpcconnection.hpp index 54a71b889..13ee5f62d 100644 --- a/lib/remote/jsonrpcconnection.hpp +++ b/lib/remote/jsonrpcconnection.hpp @@ -67,6 +67,7 @@ private: std::shared_ptr m_Stream; ConnectionRole m_Role; double m_Timestamp; + double m_Seen; double m_NextHeartbeat; boost::asio::io_service::strand m_IoStrand; std::vector m_OutgoingMessagesQueue; @@ -77,6 +78,7 @@ private: void HandleIncomingMessages(boost::asio::yield_context yc); void WriteOutgoingMessages(boost::asio::yield_context yc); void HandleAndWriteHeartbeats(boost::asio::yield_context yc); + void CheckLiveness(boost::asio::yield_context yc); bool ProcessMessage(); void MessageHandler(const String& jsonString);