/******************************************************************************
 * Icinga 2                                                                   *
 * Copyright (C) 2012-2018 Icinga Development Team (https://www.icinga.com/)  *
 *                                                                            *
 * This program is free software; you can redistribute it and/or              *
 * modify it under the terms of the GNU General Public License                *
 * as published by the Free Software Foundation; either version 2             *
 * of the License, or (at your option) any later version.                     *
 *                                                                            *
 * This program is distributed in the hope that it will be useful,            *
 * but WITHOUT ANY WARRANTY; without even the implied warranty of             *
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the              *
 * GNU General Public License for more details.                               *
 *                                                                            *
 * You should have received a copy of the GNU General Public License          *
 * along with this program; if not, write to the Free Software Foundation     *
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA.             *
 ******************************************************************************/

#include "remote/jsonrpcconnection.hpp"
#include "remote/apilistener.hpp"
#include "remote/apifunction.hpp"
#include "remote/jsonrpc.hpp"
#include "base/configtype.hpp"
#include "base/objectlock.hpp"
#include "base/utility.hpp"
#include "base/logger.hpp"
#include "base/exception.hpp"
#include "base/convert.hpp"
#include <boost/thread/once.hpp>

using namespace icinga;

static Value SetLogPositionHandler(const MessageOrigin::Ptr& origin, const Dictionary::Ptr& params);
REGISTER_APIFUNCTION(SetLogPosition, log, &SetLogPositionHandler);

/* File-local state shared by all connections; set up once by StaticInitialize(). */
static boost::once_flag l_JsonRpcConnectionOnceFlag = BOOST_ONCE_INIT;
static Timer::Ptr l_JsonRpcConnectionTimeoutTimer;
static WorkQueue *l_JsonRpcConnectionWorkQueues;
static size_t l_JsonRpcConnectionWorkQueueCount;
static int l_JsonRpcConnectionNextID;
static Timer::Ptr l_HeartbeatTimer;

/**
 * Creates a connection object for the given stream.
 *
 * The first constructed connection triggers StaticInitialize() via
 * boost::call_once. For authenticated peers the endpoint is looked up
 * by the identity name; the lookup result is stored as-is.
 *
 * @param identity Identity of the remote peer.
 * @param authenticated Whether the peer has been authenticated.
 * @param stream The TLS stream used for this connection.
 * @param role The role of this side of the connection.
 */
JsonRpcConnection::JsonRpcConnection(const String& identity, bool authenticated,
	TlsStream::Ptr stream, ConnectionRole role)
	: m_ID(l_JsonRpcConnectionNextID++), m_Identity(identity), m_Authenticated(authenticated),
	m_Stream(std::move(stream)), m_Role(role), m_Timestamp(Utility::GetTime()),
	m_Seen(Utility::GetTime()), m_NextHeartbeat(0), m_HeartbeatTimeout(0)
{
	boost::call_once(l_JsonRpcConnectionOnceFlag, &JsonRpcConnection::StaticInitialize);

	if (authenticated)
		m_Endpoint = Endpoint::GetByName(identity);
}

/**
 * One-time setup of the shared timers and work queues.
 *
 * Starts the liveness timer (interval 15) and the heartbeat timer
 * (interval 10), and allocates Configuration::Concurrency work queues
 * which are never released.
 */
void JsonRpcConnection::StaticInitialize()
{
	l_JsonRpcConnectionTimeoutTimer = new Timer();
	l_JsonRpcConnectionTimeoutTimer->OnTimerExpired.connect(std::bind(&JsonRpcConnection::TimeoutTimerHandler));
	l_JsonRpcConnectionTimeoutTimer->SetInterval(15);
	l_JsonRpcConnectionTimeoutTimer->Start();

	l_JsonRpcConnectionWorkQueueCount = Configuration::Concurrency;
	l_JsonRpcConnectionWorkQueues = new WorkQueue[l_JsonRpcConnectionWorkQueueCount];

	for (size_t i = 0; i < l_JsonRpcConnectionWorkQueueCount; i++) {
		l_JsonRpcConnectionWorkQueues[i].SetName("JsonRpcConnection, #" + Convert::ToString(i));
	}

	l_HeartbeatTimer = new Timer();
	l_HeartbeatTimer->OnTimerExpired.connect(std::bind(&JsonRpcConnection::HeartbeatTimerHandler));
	l_HeartbeatTimer->SetInterval(10);
	l_HeartbeatTimer->Start();
}

/**
 * Registers the data handler on the stream and processes any data
 * that is already available.
 */
void JsonRpcConnection::Start()
{
	/* the stream holds an owning reference to this object through the callback we're registering here */
	m_Stream->RegisterDataHandler(std::bind(&JsonRpcConnection::DataAvailableHandler, JsonRpcConnection::Ptr(this)));

	if (m_Stream->IsDataAvailable())
		DataAvailableHandler();
}

/** @return The time at which this connection object was created. */
double JsonRpcConnection::GetTimestamp() const
{
	return m_Timestamp;
}

/** @return The identity passed to the constructor. */
String JsonRpcConnection::GetIdentity() const
{
	return m_Identity;
}

/** @return Whether the peer was flagged as authenticated on construction. */
bool JsonRpcConnection::IsAuthenticated() const
{
	return m_Authenticated;
}

/** @return The endpoint associated with this connection; may be empty. */
Endpoint::Ptr JsonRpcConnection::GetEndpoint() const
{
	return m_Endpoint;
}

/** @return The underlying TLS stream. */
TlsStream::Ptr JsonRpcConnection::GetStream() const
{
	return m_Stream;
}

/** @return The connection role passed to the constructor. */
ConnectionRole JsonRpcConnection::GetRole() const
{
	return m_Role;
}

/**
 * Sends a JSON-RPC message to the peer.
 *
 * Nothing is sent once the stream has reached EOF. Failures are logged
 * and cause the connection to be dropped; they are not propagated.
 *
 * @param message The message to send.
 */
void JsonRpcConnection::SendMessage(const Dictionary::Ptr& message)
{
	try {
		ObjectLock olock(m_Stream);

		if (m_Stream->IsEof())
			return;

		size_t bytesSent = JsonRpc::SendMessage(m_Stream, message);

		if (m_Endpoint)
			m_Endpoint->AddMessageSent(bytesSent);
	} catch (const std::exception& ex) {
		/* The stream lock has been released at this point. */
		Log(LogWarning, "JsonRpcConnection")
			<< "Error while sending JSON-RPC message for identity '" << m_Identity << "'"
			<< "\n" << DiagnosticInformation(ex);

		Disconnect();
	}
}

/**
 * Closes the stream and unregisters this client, either from its
 * endpoint or from the API listener's anonymous clients.
 */
void JsonRpcConnection::Disconnect()
{
	Log(LogWarning, "JsonRpcConnection")
		<< "API client disconnected for identity '" << m_Identity << "'";

	m_Stream->Close();

	if (m_Endpoint) {
		m_Endpoint->RemoveClient(this);
		return;
	}

	ApiListener::Ptr listener = ApiListener::GetInstance();

	/* Without a listener instance there is nothing to unregister from. */
	if (listener)
		listener->RemoveAnonymousClient(this);
}

/**
 * Work queue entry point: runs MessageHandler() and turns any exception
 * into a log message followed by a disconnect.
 *
 * @param jsonString The raw JSON message.
 */
void JsonRpcConnection::MessageHandlerWrapper(const String& jsonString)
{
	if (m_Stream->IsEof())
		return;

	try {
		MessageHandler(jsonString);
	} catch (const std::exception& ex) {
		Log(LogWarning, "JsonRpcConnection")
			<< "Error while reading JSON-RPC message for identity '" << m_Identity
			<< "': " << DiagnosticInformation(ex);

		Disconnect();
	}
}

/**
 * Decodes a single message and dispatches it to the registered API function.
 *
 * A reply is only sent when the incoming message carries an "id".
 *
 * @param jsonString The raw JSON message.
 */
void JsonRpcConnection::MessageHandler(const String& jsonString)
{
	Dictionary::Ptr message = JsonRpc::DecodeMessage(jsonString);

	m_Seen = Utility::GetTime();

	if (m_HeartbeatTimeout != 0)
		m_NextHeartbeat = Utility::GetTime() + m_HeartbeatTimeout;

	if (m_Endpoint && message->Contains("ts")) {
		double ts = message->Get("ts");

		/* ignore old messages */
		if (ts < m_Endpoint->GetRemoteLogPosition())
			return;

		m_Endpoint->SetRemoteLogPosition(ts);
	}

	MessageOrigin::Ptr origin = new MessageOrigin();
	origin->FromClient = this;

	if (m_Endpoint) {
		/* Peers in our own zone name the originating zone in the message. */
		if (m_Endpoint->GetZone() != Zone::GetLocalZone())
			origin->FromZone = m_Endpoint->GetZone();
		else
			origin->FromZone = Zone::GetByName(message->Get("originZone"));

		m_Endpoint->AddMessageReceived(jsonString.GetLength());
	}

	Value vmethod;

	if (!message->Get("method", &vmethod)) {
		Value vid;

		if (!message->Get("id", &vid))
			return;

		Log(LogWarning, "JsonRpcConnection",
			"We received a JSON-RPC response message. This should never happen because we're only ever sending notifications.");

		return;
	}

	String method = vmethod;

	Log(LogNotice, "JsonRpcConnection")
		<< "Received '" << method << "' message from '" << m_Identity << "'";

	Dictionary::Ptr resultMessage = new Dictionary();

	try {
		ApiFunction::Ptr afunc = ApiFunction::GetByName(method);

		if (!afunc) {
			Log(LogNotice, "JsonRpcConnection")
				<< "Call to non-existent function '" << method << "' from endpoint '" << m_Identity << "'.";
		} else {
			Dictionary::Ptr params = message->Get("params");

			/* The function is only invoked when parameters were supplied. */
			if (params)
				resultMessage->Set("result", afunc->Invoke(origin, params));
			else
				resultMessage->Set("result", Empty);
		}
	} catch (const std::exception& ex) {
		/* TODO: Add a user readable error message for the remote caller */
		String diagInfo = DiagnosticInformation(ex);
		resultMessage->Set("error", diagInfo);

		Log(LogWarning, "JsonRpcConnection")
			<< "Error while processing message for identity '" << m_Identity << "'\n" << diagInfo;
	}

	if (message->Contains("id")) {
		resultMessage->Set("jsonrpc", "2.0");
		resultMessage->Set("id", message->Get("id"));
		SendMessage(resultMessage);
	}
}

/**
 * Reads at most one message from the stream and queues it for processing.
 *
 * Connections without an endpoint are limited to 64 KiB per message;
 * connections with an endpoint have no limit. All messages of one
 * connection go to the same work queue (selected by connection ID).
 *
 * @return true if a message was read and enqueued, false otherwise.
 */
bool JsonRpcConnection::ProcessMessage()
{
	ssize_t maxMessageLength = 64 * 1024;

	if (m_Endpoint)
		maxMessageLength = -1; /* no limit */

	String message;

	StreamReadStatus srs = JsonRpc::ReadMessage(m_Stream, &message, m_Context, false, maxMessageLength);

	if (srs != StatusNewItem)
		return false;

	l_JsonRpcConnectionWorkQueues[m_ID % l_JsonRpcConnectionWorkQueueCount].Enqueue(
		std::bind(&JsonRpcConnection::MessageHandlerWrapper, JsonRpcConnection::Ptr(this), message));

	return true;
}

/**
 * Stream callback: drains all complete messages from the stream.
 *
 * The stream is corked while reading; the uncork is enqueued on the
 * connection's work queue, i.e. behind the messages queued here.
 * A stream at EOF or a read error results in a disconnect.
 */
void JsonRpcConnection::DataAvailableHandler()
{
	if (!m_Stream)
		return;

	if (m_Stream->IsEof()) {
		Disconnect();
		return;
	}

	boost::mutex::scoped_lock lock(m_DataHandlerMutex);

	m_Stream->SetCorked(true);

	try {
		while (ProcessMessage())
			; /* empty loop body */
	} catch (const std::exception& ex) {
		Log(LogWarning, "JsonRpcConnection")
			<< "Error while reading JSON-RPC message for identity '" << m_Identity
			<< "': " << DiagnosticInformation(ex);

		Disconnect();

		return;
	}

	l_JsonRpcConnectionWorkQueues[m_ID % l_JsonRpcConnectionWorkQueueCount].Enqueue(
		std::bind(&Stream::SetCorked, m_Stream, false));
}

/**
 * API function "log::SetLogPosition": advances the local log position
 * of the sending client's endpoint. The position is never moved backwards.
 *
 * @param origin Origin of the message; its client provides the endpoint.
 * @param params Message parameters; "log_position" is read from them.
 * @return Always Empty.
 */
Value SetLogPositionHandler(const MessageOrigin::Ptr& origin, const Dictionary::Ptr& params)
{
	double log_position = params->Get("log_position");
	Endpoint::Ptr endpoint = origin->FromClient->GetEndpoint();

	if (!endpoint)
		return Empty;

	if (log_position > endpoint->GetLocalLogPosition())
		endpoint->SetLocalLogPosition(log_position);

	return Empty;
}

/**
 * Disconnects the client if no message has been seen for more than 60
 * seconds, unless its endpoint is currently syncing.
 */
void JsonRpcConnection::CheckLiveness()
{
	if (m_Seen < Utility::GetTime() - 60 && (!m_Endpoint || !m_Endpoint->GetSyncing())) {
		Log(LogInformation, "JsonRpcConnection")
			<< "No messages for identity '" << m_Identity << "' have been received in the last 60 seconds.";

		Disconnect();
	}
}

/**
 * Timer callback: runs CheckLiveness() on all anonymous clients and on
 * the clients of every Endpoint object.
 */
void JsonRpcConnection::TimeoutTimerHandler()
{
	ApiListener::Ptr listener = ApiListener::GetInstance();

	/* Anonymous clients are tracked by the listener; skip them if there is none. */
	if (listener) {
		for (const JsonRpcConnection::Ptr& client : listener->GetAnonymousClients()) {
			client->CheckLiveness();
		}
	}

	for (const Endpoint::Ptr& endpoint : ConfigType::GetObjectsByType<Endpoint>()) {
		for (const JsonRpcConnection::Ptr& client : endpoint->GetClients()) {
			client->CheckLiveness();
		}
	}
}

/** @return The number of shared work queues (0 before initialization). */
size_t JsonRpcConnection::GetWorkQueueCount()
{
	return l_JsonRpcConnectionWorkQueueCount;
}

/** @return The summed length of all shared work queues. */
size_t JsonRpcConnection::GetWorkQueueLength()
{
	size_t itemCount = 0;

	for (size_t i = 0; i < GetWorkQueueCount(); i++)
		itemCount += l_JsonRpcConnectionWorkQueues[i].GetLength();

	return itemCount;
}

/**
 * @return The per-second task rate over the last 60 seconds, averaged
 *         across all work queues; 0 when there are no queues.
 */
double JsonRpcConnection::GetWorkQueueRate()
{
	double rate = 0.0;
	size_t count = GetWorkQueueCount();

	/* If this is a standalone environment, we don't have any queues. */
	if (count == 0)
		return 0.0;

	for (size_t i = 0; i < count; i++)
		rate += l_JsonRpcConnectionWorkQueues[i].GetTaskCount(60) / 60.0;

	return rate / count;
}