mirror of https://github.com/Icinga/icinga2.git
JsonRpcConnection: re-add heartbeats
This commit is contained in:
parent
84b411501b
commit
7aae8bd265
|
@ -7,13 +7,55 @@
|
|||
#include "base/configtype.hpp"
|
||||
#include "base/logger.hpp"
|
||||
#include "base/utility.hpp"
|
||||
#include <boost/asio/deadline_timer.hpp>
|
||||
#include <boost/asio/spawn.hpp>
|
||||
#include <boost/date_time/posix_time/posix_time_duration.hpp>
|
||||
|
||||
using namespace icinga;
|
||||
|
||||
REGISTER_APIFUNCTION(Heartbeat, event, &JsonRpcConnection::HeartbeatAPIHandler);
|
||||
|
||||
void JsonRpcConnection::HandleAndWriteHeartbeats(boost::asio::yield_context yc)
|
||||
{
|
||||
boost::asio::deadline_timer timer (m_Stream->get_io_service());
|
||||
|
||||
for (;;) {
|
||||
timer.expires_from_now(boost::posix_time::seconds(10));
|
||||
timer.async_wait(yc);
|
||||
|
||||
if (m_ShuttingDown) {
|
||||
break;
|
||||
}
|
||||
|
||||
if (m_NextHeartbeat != 0 && m_NextHeartbeat < Utility::GetTime()) {
|
||||
Log(LogWarning, "JsonRpcConnection")
|
||||
<< "Client for endpoint '" << m_Endpoint->GetName() << "' has requested "
|
||||
<< "heartbeat message but hasn't responded in time. Closing connection.";
|
||||
|
||||
Disconnect();
|
||||
break;
|
||||
}
|
||||
|
||||
m_OutgoingMessagesQueue.emplace_back(new Dictionary({
|
||||
{ "jsonrpc", "2.0" },
|
||||
{ "method", "event::Heartbeat" },
|
||||
{ "params", new Dictionary({
|
||||
{ "timeout", 120 }
|
||||
}) }
|
||||
}));
|
||||
|
||||
m_OutgoingMessagesQueued.expires_at(boost::posix_time::neg_infin);
|
||||
}
|
||||
}
|
||||
|
||||
Value JsonRpcConnection::HeartbeatAPIHandler(const MessageOrigin::Ptr& origin, const Dictionary::Ptr& params)
|
||||
{
|
||||
Value vtimeout = params->Get("timeout");
|
||||
|
||||
if (!vtimeout.IsEmpty()) {
|
||||
origin->FromClient->m_NextHeartbeat = Utility::GetTime() + vtimeout;
|
||||
}
|
||||
|
||||
return Empty;
|
||||
}
|
||||
|
||||
|
|
|
@ -26,7 +26,7 @@ REGISTER_APIFUNCTION(SetLogPosition, log, &SetLogPositionHandler);
|
|||
JsonRpcConnection::JsonRpcConnection(const String& identity, bool authenticated,
|
||||
const std::shared_ptr<AsioTlsStream>& stream, ConnectionRole role)
|
||||
: m_Identity(identity), m_Authenticated(authenticated), m_Stream(stream),
|
||||
m_Role(role), m_Timestamp(Utility::GetTime()), m_IoStrand(stream->get_io_service()),
|
||||
m_Role(role), m_Timestamp(Utility::GetTime()), m_NextHeartbeat(0), m_IoStrand(stream->get_io_service()),
|
||||
m_OutgoingMessagesQueued(stream->get_io_service()), m_WriterDone(stream->get_io_service()), m_ShuttingDown(false)
|
||||
{
|
||||
if (authenticated)
|
||||
|
@ -44,6 +44,7 @@ void JsonRpcConnection::Start()
|
|||
|
||||
asio::spawn(m_IoStrand, [this, preventGc](asio::yield_context yc) { HandleIncomingMessages(yc); });
|
||||
asio::spawn(m_IoStrand, [this, preventGc](asio::yield_context yc) { WriteOutgoingMessages(yc); });
|
||||
asio::spawn(m_IoStrand, [this, preventGc](asio::yield_context yc) { HandleAndWriteHeartbeats(yc); });
|
||||
}
|
||||
|
||||
void JsonRpcConnection::HandleIncomingMessages(boost::asio::yield_context yc)
|
||||
|
|
|
@ -67,6 +67,7 @@ private:
|
|||
std::shared_ptr<AsioTlsStream> m_Stream;
|
||||
ConnectionRole m_Role;
|
||||
double m_Timestamp;
|
||||
double m_NextHeartbeat;
|
||||
boost::asio::io_service::strand m_IoStrand;
|
||||
std::vector<Dictionary::Ptr> m_OutgoingMessagesQueue;
|
||||
boost::asio::deadline_timer m_OutgoingMessagesQueued;
|
||||
|
@ -75,6 +76,7 @@ private:
|
|||
|
||||
void HandleIncomingMessages(boost::asio::yield_context yc);
|
||||
void WriteOutgoingMessages(boost::asio::yield_context yc);
|
||||
void HandleAndWriteHeartbeats(boost::asio::yield_context yc);
|
||||
|
||||
bool ProcessMessage();
|
||||
void MessageHandler(const String& jsonString);
|
||||
|
|
Loading…
Reference in New Issue