icinga2/lib/remote/jsonrpcconnection.cpp
Michael Friedrich 46ed01348c Ensure that API/JSON-RPC messages in the same session are processed and not stalled
This basically drops the "corked" implementation which just stalled the
TLS IO polling after some requests. If you need sort of rate limiting
for these events, use an external TLS proxy which terminates that in front
of Icinga.

fixes #6635
2018-11-12 17:07:38 +01:00

366 lines
10 KiB
C++

/******************************************************************************
* Icinga 2 *
* Copyright (C) 2012-2018 Icinga Development Team (https://icinga.com/) *
* *
* This program is free software; you can redistribute it and/or *
* modify it under the terms of the GNU General Public License *
* as published by the Free Software Foundation; either version 2 *
* of the License, or (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program; if not, write to the Free Software Foundation *
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. *
******************************************************************************/
#include "remote/jsonrpcconnection.hpp"
#include "remote/apilistener.hpp"
#include "remote/apifunction.hpp"
#include "remote/jsonrpc.hpp"
#include "base/configtype.hpp"
#include "base/objectlock.hpp"
#include "base/utility.hpp"
#include "base/logger.hpp"
#include "base/exception.hpp"
#include "base/convert.hpp"
#include <boost/thread/once.hpp>
using namespace icinga;
static Value SetLogPositionHandler(const MessageOrigin::Ptr& origin, const Dictionary::Ptr& params);
REGISTER_APIFUNCTION(SetLogPosition, log, &SetLogPositionHandler);
static boost::once_flag l_JsonRpcConnectionOnceFlag = BOOST_ONCE_INIT;
static Timer::Ptr l_JsonRpcConnectionTimeoutTimer;
static WorkQueue *l_JsonRpcConnectionWorkQueues;
static size_t l_JsonRpcConnectionWorkQueueCount;
static int l_JsonRpcConnectionNextID;
static Timer::Ptr l_HeartbeatTimer;
JsonRpcConnection::JsonRpcConnection(const String& identity, bool authenticated,
TlsStream::Ptr stream, ConnectionRole role)
: m_ID(l_JsonRpcConnectionNextID++), m_Identity(identity), m_Authenticated(authenticated), m_Stream(std::move(stream)),
m_Role(role), m_Timestamp(Utility::GetTime()), m_Seen(Utility::GetTime()), m_NextHeartbeat(0), m_HeartbeatTimeout(0)
{
boost::call_once(l_JsonRpcConnectionOnceFlag, &JsonRpcConnection::StaticInitialize);
if (authenticated)
m_Endpoint = Endpoint::GetByName(identity);
}
void JsonRpcConnection::StaticInitialize()
{
l_JsonRpcConnectionTimeoutTimer = new Timer();
l_JsonRpcConnectionTimeoutTimer->OnTimerExpired.connect(std::bind(&JsonRpcConnection::TimeoutTimerHandler));
l_JsonRpcConnectionTimeoutTimer->SetInterval(15);
l_JsonRpcConnectionTimeoutTimer->Start();
l_JsonRpcConnectionWorkQueueCount = Configuration::Concurrency;
l_JsonRpcConnectionWorkQueues = new WorkQueue[l_JsonRpcConnectionWorkQueueCount];
for (size_t i = 0; i < l_JsonRpcConnectionWorkQueueCount; i++) {
l_JsonRpcConnectionWorkQueues[i].SetName("JsonRpcConnection, #" + Convert::ToString(i));
}
l_HeartbeatTimer = new Timer();
l_HeartbeatTimer->OnTimerExpired.connect(std::bind(&JsonRpcConnection::HeartbeatTimerHandler));
l_HeartbeatTimer->SetInterval(10);
l_HeartbeatTimer->Start();
}
void JsonRpcConnection::Start()
{
/* the stream holds an owning reference to this object through the callback we're registering here */
m_Stream->RegisterDataHandler(std::bind(&JsonRpcConnection::DataAvailableHandler, JsonRpcConnection::Ptr(this)));
if (m_Stream->IsDataAvailable())
DataAvailableHandler();
}
double JsonRpcConnection::GetTimestamp() const
{
return m_Timestamp;
}
String JsonRpcConnection::GetIdentity() const
{
return m_Identity;
}
bool JsonRpcConnection::IsAuthenticated() const
{
return m_Authenticated;
}
Endpoint::Ptr JsonRpcConnection::GetEndpoint() const
{
return m_Endpoint;
}
TlsStream::Ptr JsonRpcConnection::GetStream() const
{
return m_Stream;
}
ConnectionRole JsonRpcConnection::GetRole() const
{
return m_Role;
}
void JsonRpcConnection::SendMessage(const Dictionary::Ptr& message)
{
try {
ObjectLock olock(m_Stream);
if (m_Stream->IsEof())
return;
size_t bytesSent = JsonRpc::SendMessage(m_Stream, message);
if (m_Endpoint)
m_Endpoint->AddMessageSent(bytesSent);
} catch (const std::exception& ex) {
std::ostringstream info;
info << "Error while sending JSON-RPC message for identity '" << m_Identity << "'";
Log(LogWarning, "JsonRpcConnection")
<< info.str() << "\n" << DiagnosticInformation(ex);
Disconnect();
}
}
void JsonRpcConnection::Disconnect()
{
Log(LogWarning, "JsonRpcConnection")
<< "API client disconnected for identity '" << m_Identity << "'";
m_Stream->Close();
if (m_Endpoint)
m_Endpoint->RemoveClient(this);
else {
ApiListener::Ptr listener = ApiListener::GetInstance();
listener->RemoveAnonymousClient(this);
}
}
void JsonRpcConnection::MessageHandlerWrapper(const String& jsonString)
{
if (m_Stream->IsEof())
return;
try {
MessageHandler(jsonString);
} catch (const std::exception& ex) {
Log(LogWarning, "JsonRpcConnection")
<< "Error while reading JSON-RPC message for identity '" << m_Identity
<< "': " << DiagnosticInformation(ex);
Disconnect();
return;
}
}
void JsonRpcConnection::MessageHandler(const String& jsonString)
{
Dictionary::Ptr message = JsonRpc::DecodeMessage(jsonString);
m_Seen = Utility::GetTime();
if (m_HeartbeatTimeout != 0)
m_NextHeartbeat = Utility::GetTime() + m_HeartbeatTimeout;
if (m_Endpoint && message->Contains("ts")) {
double ts = message->Get("ts");
/* ignore old messages */
if (ts < m_Endpoint->GetRemoteLogPosition())
return;
m_Endpoint->SetRemoteLogPosition(ts);
}
MessageOrigin::Ptr origin = new MessageOrigin();
origin->FromClient = this;
if (m_Endpoint) {
if (m_Endpoint->GetZone() != Zone::GetLocalZone())
origin->FromZone = m_Endpoint->GetZone();
else
origin->FromZone = Zone::GetByName(message->Get("originZone"));
m_Endpoint->AddMessageReceived(jsonString.GetLength());
}
Value vmethod;
if (!message->Get("method", &vmethod)) {
Value vid;
if (!message->Get("id", &vid))
return;
Log(LogWarning, "JsonRpcConnection",
"We received a JSON-RPC response message. This should never happen because we're only ever sending notifications.");
return;
}
String method = vmethod;
Log(LogNotice, "JsonRpcConnection")
<< "Received '" << method << "' message from '" << m_Identity << "'";
Dictionary::Ptr resultMessage = new Dictionary();
try {
ApiFunction::Ptr afunc = ApiFunction::GetByName(method);
if (!afunc) {
Log(LogNotice, "JsonRpcConnection")
<< "Call to non-existent function '" << method << "' from endpoint '" << m_Identity << "'.";
} else {
Dictionary::Ptr params = message->Get("params");
if (params)
resultMessage->Set("result", afunc->Invoke(origin, params));
else
resultMessage->Set("result", Empty);
}
} catch (const std::exception& ex) {
/* TODO: Add a user readable error message for the remote caller */
String diagInfo = DiagnosticInformation(ex);
resultMessage->Set("error", diagInfo);
Log(LogWarning, "JsonRpcConnection")
<< "Error while processing message for identity '" << m_Identity << "'\n" << diagInfo;
}
if (message->Contains("id")) {
resultMessage->Set("jsonrpc", "2.0");
resultMessage->Set("id", message->Get("id"));
SendMessage(resultMessage);
}
}
bool JsonRpcConnection::ProcessMessage()
{
/* Limit for anonymous clients (signing requests and not configured endpoints. */
ssize_t maxMessageLength = 1024 * 1024;
if (m_Endpoint)
maxMessageLength = -1; /* no limit */
String message;
StreamReadStatus srs = JsonRpc::ReadMessage(m_Stream, &message, m_Context, false, maxMessageLength);
if (srs != StatusNewItem)
return false;
l_JsonRpcConnectionWorkQueues[m_ID % l_JsonRpcConnectionWorkQueueCount].Enqueue(std::bind(&JsonRpcConnection::MessageHandlerWrapper, JsonRpcConnection::Ptr(this), message));
return true;
}
void JsonRpcConnection::DataAvailableHandler()
{
bool close = false;
if (!m_Stream)
return;
if (!m_Stream->IsEof()) {
boost::mutex::scoped_lock lock(m_DataHandlerMutex);
try {
while (ProcessMessage())
; /* empty loop body */
} catch (const std::exception& ex) {
Log(LogWarning, "JsonRpcConnection")
<< "Error while reading JSON-RPC message for identity '" << m_Identity
<< "': " << DiagnosticInformation(ex);
Disconnect();
return;
}
} else
close = true;
if (close)
Disconnect();
}
Value SetLogPositionHandler(const MessageOrigin::Ptr& origin, const Dictionary::Ptr& params)
{
double log_position = params->Get("log_position");
Endpoint::Ptr endpoint = origin->FromClient->GetEndpoint();
if (!endpoint)
return Empty;
if (log_position > endpoint->GetLocalLogPosition())
endpoint->SetLocalLogPosition(log_position);
return Empty;
}
void JsonRpcConnection::CheckLiveness()
{
if (m_Seen < Utility::GetTime() - 60 && (!m_Endpoint || !m_Endpoint->GetSyncing())) {
Log(LogInformation, "JsonRpcConnection")
<< "No messages for identity '" << m_Identity << "' have been received in the last 60 seconds.";
Disconnect();
}
}
void JsonRpcConnection::TimeoutTimerHandler()
{
ApiListener::Ptr listener = ApiListener::GetInstance();
for (const JsonRpcConnection::Ptr& client : listener->GetAnonymousClients()) {
client->CheckLiveness();
}
for (const Endpoint::Ptr& endpoint : ConfigType::GetObjectsByType<Endpoint>()) {
for (const JsonRpcConnection::Ptr& client : endpoint->GetClients()) {
client->CheckLiveness();
}
}
}
size_t JsonRpcConnection::GetWorkQueueCount()
{
return l_JsonRpcConnectionWorkQueueCount;
}
size_t JsonRpcConnection::GetWorkQueueLength()
{
size_t itemCount = 0;
for (size_t i = 0; i < GetWorkQueueCount(); i++)
itemCount += l_JsonRpcConnectionWorkQueues[i].GetLength();
return itemCount;
}
double JsonRpcConnection::GetWorkQueueRate()
{
double rate = 0.0;
size_t count = GetWorkQueueCount();
/* If this is a standalone environment, we don't have any queues. */
if (count == 0)
return 0.0;
for (size_t i = 0; i < count; i++)
rate += l_JsonRpcConnectionWorkQueues[i].GetTaskCount(60) / 60.0;
return rate / count;
}