JsonRpcConnection: re-add automatic disconnect

This commit is contained in:
Alexander A. Klimov 2019-02-20 13:49:50 +01:00
parent 7aae8bd265
commit a54bd9d5c4
2 changed files with 29 additions and 1 deletions

View File

@ -14,7 +14,9 @@
#include "base/tlsstream.hpp" #include "base/tlsstream.hpp"
#include <memory> #include <memory>
#include <utility> #include <utility>
#include <boost/asio/deadline_timer.hpp>
#include <boost/asio/spawn.hpp> #include <boost/asio/spawn.hpp>
#include <boost/date_time/posix_time/posix_time_duration.hpp>
#include <boost/date_time/posix_time/ptime.hpp> #include <boost/date_time/posix_time/ptime.hpp>
#include <boost/thread/once.hpp> #include <boost/thread/once.hpp>
@ -26,7 +28,7 @@ REGISTER_APIFUNCTION(SetLogPosition, log, &SetLogPositionHandler);
JsonRpcConnection::JsonRpcConnection(const String& identity, bool authenticated, JsonRpcConnection::JsonRpcConnection(const String& identity, bool authenticated,
const std::shared_ptr<AsioTlsStream>& stream, ConnectionRole role) const std::shared_ptr<AsioTlsStream>& stream, ConnectionRole role)
: m_Identity(identity), m_Authenticated(authenticated), m_Stream(stream), : m_Identity(identity), m_Authenticated(authenticated), m_Stream(stream),
m_Role(role), m_Timestamp(Utility::GetTime()), m_NextHeartbeat(0), m_IoStrand(stream->get_io_service()), m_Role(role), m_Timestamp(Utility::GetTime()), m_Seen(Utility::GetTime()), m_NextHeartbeat(0), m_IoStrand(stream->get_io_service()),
m_OutgoingMessagesQueued(stream->get_io_service()), m_WriterDone(stream->get_io_service()), m_ShuttingDown(false) m_OutgoingMessagesQueued(stream->get_io_service()), m_WriterDone(stream->get_io_service()), m_ShuttingDown(false)
{ {
if (authenticated) if (authenticated)
@ -45,6 +47,7 @@ void JsonRpcConnection::Start()
asio::spawn(m_IoStrand, [this, preventGc](asio::yield_context yc) { HandleIncomingMessages(yc); }); asio::spawn(m_IoStrand, [this, preventGc](asio::yield_context yc) { HandleIncomingMessages(yc); });
asio::spawn(m_IoStrand, [this, preventGc](asio::yield_context yc) { WriteOutgoingMessages(yc); }); asio::spawn(m_IoStrand, [this, preventGc](asio::yield_context yc) { WriteOutgoingMessages(yc); });
asio::spawn(m_IoStrand, [this, preventGc](asio::yield_context yc) { HandleAndWriteHeartbeats(yc); }); asio::spawn(m_IoStrand, [this, preventGc](asio::yield_context yc) { HandleAndWriteHeartbeats(yc); });
asio::spawn(m_IoStrand, [this, preventGc](asio::yield_context yc) { CheckLiveness(yc); });
} }
void JsonRpcConnection::HandleIncomingMessages(boost::asio::yield_context yc) void JsonRpcConnection::HandleIncomingMessages(boost::asio::yield_context yc)
@ -66,6 +69,8 @@ void JsonRpcConnection::HandleIncomingMessages(boost::asio::yield_context yc)
break; break;
} }
m_Seen = Utility::GetTime();
try { try {
CpuBoundWork handleMessage (yc); CpuBoundWork handleMessage (yc);
@ -293,3 +298,24 @@ Value SetLogPositionHandler(const MessageOrigin::Ptr& origin, const Dictionary::
return Empty; return Empty;
} }
void JsonRpcConnection::CheckLiveness(boost::asio::yield_context yc)
{
boost::asio::deadline_timer timer (m_Stream->get_io_service());
for (;;) {
timer.expires_from_now(boost::posix_time::seconds(30));
timer.async_wait(yc);
if (m_ShuttingDown) {
break;
}
if (m_Seen < Utility::GetTime() - 60 && (!m_Endpoint || !m_Endpoint->GetSyncing())) {
Log(LogInformation, "JsonRpcConnection")
<< "No messages for identity '" << m_Identity << "' have been received in the last 60 seconds.";
Disconnect();
break;
}
}
}

View File

@ -67,6 +67,7 @@ private:
std::shared_ptr<AsioTlsStream> m_Stream; std::shared_ptr<AsioTlsStream> m_Stream;
ConnectionRole m_Role; ConnectionRole m_Role;
double m_Timestamp; double m_Timestamp;
double m_Seen;
double m_NextHeartbeat; double m_NextHeartbeat;
boost::asio::io_service::strand m_IoStrand; boost::asio::io_service::strand m_IoStrand;
std::vector<Dictionary::Ptr> m_OutgoingMessagesQueue; std::vector<Dictionary::Ptr> m_OutgoingMessagesQueue;
@ -77,6 +78,7 @@ private:
void HandleIncomingMessages(boost::asio::yield_context yc); void HandleIncomingMessages(boost::asio::yield_context yc);
void WriteOutgoingMessages(boost::asio::yield_context yc); void WriteOutgoingMessages(boost::asio::yield_context yc);
void HandleAndWriteHeartbeats(boost::asio::yield_context yc); void HandleAndWriteHeartbeats(boost::asio::yield_context yc);
void CheckLiveness(boost::asio::yield_context yc);
bool ProcessMessage(); bool ProcessMessage();
void MessageHandler(const String& jsonString); void MessageHandler(const String& jsonString);