Port JsonRpcConnection to Boost ASIO

This commit is contained in:
Alexander A. Klimov 2019-02-19 13:57:36 +01:00
parent c76947e8b9
commit 6c86c127f1
6 changed files with 179 additions and 287 deletions

View File

@ -323,7 +323,7 @@ void ApiListener::UpdateConfigObject(const ConfigObject::Ptr& object, const Mess
#endif /* I2_DEBUG */
if (client)
JsonRpc::SendMessage(client->GetStream(), message);
client->SendMessage(message);
else {
Zone::Ptr target = static_pointer_cast<Zone>(object->GetZone());
@ -373,7 +373,7 @@ void ApiListener::DeleteConfigObject(const ConfigObject::Ptr& object, const Mess
#endif /* I2_DEBUG */
if (client)
JsonRpc::SendMessage(client->GetStream(), message);
client->SendMessage(message);
else {
Zone::Ptr target = static_pointer_cast<Zone>(object->GetZone());

View File

@ -212,16 +212,6 @@ void ApiListener::UpdateSSLContext()
}
m_SSLContext = context;
for (const Endpoint::Ptr& endpoint : ConfigType::GetObjectsByType<Endpoint>()) {
for (const JsonRpcConnection::Ptr& client : endpoint->GetClients()) {
client->Disconnect();
}
}
for (const JsonRpcConnection::Ptr& client : m_AnonymousClients) {
client->Disconnect();
}
}
void ApiListener::OnAllConfigLoaded()
@ -669,7 +659,33 @@ void ApiListener::NewClientHandlerInternal(boost::asio::yield_context yc, const
}
}
if (ctype != ClientJsonRpc) {
if (ctype == ClientJsonRpc) {
Log(LogNotice, "ApiListener", "New JSON-RPC client");
JsonRpcConnection::Ptr aclient = new JsonRpcConnection(identity, verify_ok, client, role);
if (endpoint) {
bool needSync = !endpoint->GetConnected();
endpoint->AddClient(aclient);
asio::spawn(client->get_io_service(), [this, aclient, endpoint, needSync](asio::yield_context yc) {
CpuBoundWork syncClient (yc);
SyncClient(aclient, endpoint, needSync);
});
} else if (!AddAnonymousClient(aclient)) {
Log(LogNotice, "ApiListener")
<< "Ignoring anonymous JSON-RPC connection " << conninfo
<< ". Max connections (" << GetMaxAnonymousClients() << ") exceeded.";
aclient = nullptr;
}
if (aclient) {
aclient->Start();
}
} else {
Log(LogNotice, "ApiListener", "New HTTP client");
HttpServerConnection::Ptr aclient = new HttpServerConnection(identity, verify_ok, client);
@ -810,10 +826,9 @@ void ApiListener::ApiTimerHandler()
}
for (const JsonRpcConnection::Ptr& client : endpoint->GetClients()) {
if (client->GetTimestamp() != maxTs)
client->Disconnect();
else
if (client->GetTimestamp() == maxTs) {
client->SendMessage(lmessage);
}
}
Log(LogNotice, "ApiListener")
@ -1280,8 +1295,7 @@ void ApiListener::ReplayLog(const JsonRpcConnection::Ptr& client)
}
try {
size_t bytesSent = NetString::WriteStringToStream(client->GetStream(), pmessage->Get("message"));
endpoint->AddMessageSent(bytesSent);
client->SendMessage(JsonDecode(pmessage->Get("message")));
count++;
} catch (const std::exception& ex) {
Log(LogWarning, "ApiListener")
@ -1306,8 +1320,7 @@ void ApiListener::ReplayLog(const JsonRpcConnection::Ptr& client)
}) }
});
size_t bytesSent = JsonRpc::SendMessage(client->GetStream(), lmessage);
endpoint->AddMessageSent(bytesSent);
client->SendMessage(lmessage);
}
}
@ -1426,11 +1439,8 @@ std::pair<Dictionary::Ptr, Dictionary::Ptr> ApiListener::GetStatus()
/* connection stats */
size_t jsonRpcAnonymousClients = GetAnonymousClients().size();
size_t httpClients = GetHttpClients().size();
size_t workQueueItems = JsonRpcConnection::GetWorkQueueLength();
size_t workQueueCount = JsonRpcConnection::GetWorkQueueCount();
size_t syncQueueItems = m_SyncQueue.GetLength();
size_t relayQueueItems = m_RelayQueue.GetLength();
double workQueueItemRate = JsonRpcConnection::GetWorkQueueRate();
double syncQueueItemRate = m_SyncQueue.GetTaskCount(60) / 60.0;
double relayQueueItemRate = m_RelayQueue.GetTaskCount(60) / 60.0;
@ -1446,11 +1456,8 @@ std::pair<Dictionary::Ptr, Dictionary::Ptr> ApiListener::GetStatus()
{ "json_rpc", new Dictionary({
{ "anonymous_clients", jsonRpcAnonymousClients },
{ "work_queue_items", workQueueItems },
{ "work_queue_count", workQueueCount },
{ "sync_queue_items", syncQueueItems },
{ "relay_queue_items", relayQueueItems },
{ "work_queue_item_rate", workQueueItemRate },
{ "sync_queue_item_rate", syncQueueItemRate },
{ "relay_queue_item_rate", relayQueueItemRate }
}) },
@ -1467,12 +1474,9 @@ std::pair<Dictionary::Ptr, Dictionary::Ptr> ApiListener::GetStatus()
perfdata->Set("num_json_rpc_anonymous_clients", jsonRpcAnonymousClients);
perfdata->Set("num_http_clients", httpClients);
perfdata->Set("num_json_rpc_work_queue_items", workQueueItems);
perfdata->Set("num_json_rpc_work_queue_count", workQueueCount);
perfdata->Set("num_json_rpc_sync_queue_items", syncQueueItems);
perfdata->Set("num_json_rpc_relay_queue_items", relayQueueItems);
perfdata->Set("num_json_rpc_work_queue_item_rate", workQueueItemRate);
perfdata->Set("num_json_rpc_sync_queue_item_rate", syncQueueItemRate);
perfdata->Set("num_json_rpc_relay_queue_item_rate", relayQueueItemRate);

View File

@ -12,41 +12,8 @@ using namespace icinga;
REGISTER_APIFUNCTION(Heartbeat, event, &JsonRpcConnection::HeartbeatAPIHandler);
void JsonRpcConnection::HeartbeatTimerHandler()
{
for (const Endpoint::Ptr& endpoint : ConfigType::GetObjectsByType<Endpoint>()) {
for (const JsonRpcConnection::Ptr& client : endpoint->GetClients()) {
if (client->m_NextHeartbeat != 0 && client->m_NextHeartbeat < Utility::GetTime()) {
Log(LogWarning, "JsonRpcConnection")
<< "Client for endpoint '" << endpoint->GetName() << "' has requested "
<< "heartbeat message but hasn't responded in time. Closing connection.";
client->Disconnect();
continue;
}
Dictionary::Ptr request = new Dictionary({
{ "jsonrpc", "2.0" },
{ "method", "event::Heartbeat" },
{ "params", new Dictionary({
{ "timeout", 120 }
}) }
});
client->SendMessage(request);
}
}
}
Value JsonRpcConnection::HeartbeatAPIHandler(const MessageOrigin::Ptr& origin, const Dictionary::Ptr& params)
{
Value vtimeout = params->Get("timeout");
if (!vtimeout.IsEmpty()) {
origin->FromClient->m_NextHeartbeat = Utility::GetTime() + vtimeout;
origin->FromClient->m_HeartbeatTimeout = vtimeout;
}
return Empty;
}

View File

@ -13,6 +13,8 @@
#include <boost/thread/once.hpp>
#include <boost/regex.hpp>
#include <fstream>
#include <openssl/ssl.h>
#include <openssl/x509.h>
using namespace icinga;
@ -30,10 +32,12 @@ Value RequestCertificateHandler(const MessageOrigin::Ptr& origin, const Dictiona
Dictionary::Ptr result = new Dictionary();
/* Use the presented client certificate if not provided. */
if (certText.IsEmpty())
cert = origin->FromClient->GetStream()->GetPeerCertificate();
else
if (certText.IsEmpty()) {
auto stream (origin->FromClient->GetStream());
cert = std::shared_ptr<X509>(SSL_get_peer_certificate(stream->next_layer().native_handle()), X509_free);
} else {
cert = StringToCertificate(certText);
}
if (!cert) {
Log(LogWarning, "JsonRpcConnection") << "No certificate or CSR received";
@ -121,7 +125,7 @@ Value RequestCertificateHandler(const MessageOrigin::Ptr& origin, const Dictiona
{ "method", "pki::UpdateCertificate" },
{ "params", result }
});
JsonRpc::SendMessage(client->GetStream(), message);
client->SendMessage(message);
return result;
}
@ -192,7 +196,7 @@ Value RequestCertificateHandler(const MessageOrigin::Ptr& origin, const Dictiona
{ "method", "pki::UpdateCertificate" },
{ "params", result }
});
JsonRpc::SendMessage(client->GetStream(), message);
client->SendMessage(message);
return result;
@ -255,7 +259,7 @@ void JsonRpcConnection::SendCertificateRequest(const JsonRpcConnection::Ptr& acl
* or b) the local zone and all parents.
*/
if (aclient)
JsonRpc::SendMessage(aclient->GetStream(), message);
aclient->SendMessage(message);
else
listener->RelayMessage(origin, Zone::GetLocalZone(), message, false);
}

View File

@ -5,11 +5,17 @@
#include "remote/apifunction.hpp"
#include "remote/jsonrpc.hpp"
#include "base/configtype.hpp"
#include "base/io-engine.hpp"
#include "base/objectlock.hpp"
#include "base/utility.hpp"
#include "base/logger.hpp"
#include "base/exception.hpp"
#include "base/convert.hpp"
#include "base/tlsstream.hpp"
#include <memory>
#include <utility>
#include <boost/asio/spawn.hpp>
#include <boost/date_time/posix_time/ptime.hpp>
#include <boost/thread/once.hpp>
using namespace icinga;
@ -17,50 +23,121 @@ using namespace icinga;
static Value SetLogPositionHandler(const MessageOrigin::Ptr& origin, const Dictionary::Ptr& params);
REGISTER_APIFUNCTION(SetLogPosition, log, &SetLogPositionHandler);
static boost::once_flag l_JsonRpcConnectionOnceFlag = BOOST_ONCE_INIT;
static Timer::Ptr l_JsonRpcConnectionTimeoutTimer;
static WorkQueue *l_JsonRpcConnectionWorkQueues;
static size_t l_JsonRpcConnectionWorkQueueCount;
static int l_JsonRpcConnectionNextID;
static Timer::Ptr l_HeartbeatTimer;
JsonRpcConnection::JsonRpcConnection(const String& identity, bool authenticated,
TlsStream::Ptr stream, ConnectionRole role)
: m_ID(l_JsonRpcConnectionNextID++), m_Identity(identity), m_Authenticated(authenticated), m_Stream(std::move(stream)),
m_Role(role), m_Timestamp(Utility::GetTime()), m_Seen(Utility::GetTime()), m_NextHeartbeat(0), m_HeartbeatTimeout(0)
const std::shared_ptr<AsioTlsStream>& stream, ConnectionRole role)
: m_Identity(identity), m_Authenticated(authenticated), m_Stream(stream),
m_Role(role), m_Timestamp(Utility::GetTime()), m_IoStrand(stream->get_io_service()),
m_OutgoingMessagesQueued(stream->get_io_service()), m_ReaderHasError(false), m_RunningCoroutines(0)
{
boost::call_once(l_JsonRpcConnectionOnceFlag, &JsonRpcConnection::StaticInitialize);
if (authenticated)
m_Endpoint = Endpoint::GetByName(identity);
}
void JsonRpcConnection::StaticInitialize()
{
l_JsonRpcConnectionTimeoutTimer = new Timer();
l_JsonRpcConnectionTimeoutTimer->OnTimerExpired.connect(std::bind(&JsonRpcConnection::TimeoutTimerHandler));
l_JsonRpcConnectionTimeoutTimer->SetInterval(15);
l_JsonRpcConnectionTimeoutTimer->Start();
l_JsonRpcConnectionWorkQueueCount = Configuration::Concurrency;
l_JsonRpcConnectionWorkQueues = new WorkQueue[l_JsonRpcConnectionWorkQueueCount];
for (size_t i = 0; i < l_JsonRpcConnectionWorkQueueCount; i++) {
l_JsonRpcConnectionWorkQueues[i].SetName("JsonRpcConnection, #" + Convert::ToString(i));
}
l_HeartbeatTimer = new Timer();
l_HeartbeatTimer->OnTimerExpired.connect(std::bind(&JsonRpcConnection::HeartbeatTimerHandler));
l_HeartbeatTimer->SetInterval(10);
l_HeartbeatTimer->Start();
m_OutgoingMessagesQueued.expires_at(boost::posix_time::pos_infin);
}
void JsonRpcConnection::Start()
{
/* the stream holds an owning reference to this object through the callback we're registering here */
m_Stream->RegisterDataHandler(std::bind(&JsonRpcConnection::DataAvailableHandler, JsonRpcConnection::Ptr(this)));
if (m_Stream->IsDataAvailable())
DataAvailableHandler();
namespace asio = boost::asio;
m_RunningCoroutines = 2;
asio::spawn(m_IoStrand, [this](asio::yield_context yc) { HandleIncomingMessages(yc); });
asio::spawn(m_IoStrand, [this](asio::yield_context yc) { WriteOutgoingMessages(yc); });
}
void JsonRpcConnection::HandleIncomingMessages(boost::asio::yield_context yc)
{
Defer shutdownStreamOnce ([this, &yc]() {
m_ReaderHasError = true;
m_OutgoingMessagesQueued.expires_at(boost::posix_time::neg_infin);
ShutdownStreamOnce(yc);
});
for (;;) {
String message;
try {
message = JsonRpc::ReadMessage(m_Stream, yc, m_Endpoint ? -1 : 1024 * 1024);
} catch (const std::exception& ex) {
Log(LogWarning, "JsonRpcConnection")
<< "Error while reading JSON-RPC message for identity '" << m_Identity
<< "': " << DiagnosticInformation(ex);
break;
}
try {
CpuBoundWork handleMessage (yc);
MessageHandler(message);
} catch (const std::exception& ex) {
Log(LogWarning, "JsonRpcConnection")
<< "Error while processing JSON-RPC message for identity '" << m_Identity
<< "': " << DiagnosticInformation(ex);
break;
}
}
}
void JsonRpcConnection::WriteOutgoingMessages(boost::asio::yield_context yc)
{
Defer shutdownStreamOnce ([this, &yc]() { ShutdownStreamOnce(yc); });
do {
try {
m_OutgoingMessagesQueued.async_wait(yc);
} catch (...) {
}
auto queue (std::move(m_OutgoingMessagesQueue));
m_OutgoingMessagesQueue.clear();
m_OutgoingMessagesQueued.expires_at(boost::posix_time::pos_infin);
if (!queue.empty()) {
try {
for (auto& message : queue) {
size_t bytesSent = JsonRpc::SendMessage(m_Stream, message, yc);
if (m_Endpoint) {
m_Endpoint->AddMessageSent(bytesSent);
}
}
m_Stream->async_flush(yc);
} catch (const std::exception& ex) {
std::ostringstream info;
info << "Error while sending JSON-RPC message for identity '" << m_Identity << "'";
Log(LogWarning, "JsonRpcConnection")
<< info.str() << "\n" << DiagnosticInformation(ex);
break;
}
}
} while (!m_ReaderHasError);
}
void JsonRpcConnection::ShutdownStreamOnce(boost::asio::yield_context& yc)
{
if (!--m_RunningCoroutines) {
try {
m_Stream->next_layer().async_shutdown(yc);
} catch (...) {
// https://stackoverflow.com/questions/130117/throwing-exceptions-out-of-a-destructor
}
Log(LogWarning, "JsonRpcConnection")
<< "API client disconnected for identity '" << m_Identity << "'";
if (m_Endpoint) {
m_Endpoint->RemoveClient(this);
} else {
auto listener (ApiListener::GetInstance());
listener->RemoveAnonymousClient(this);
}
}
}
double JsonRpcConnection::GetTimestamp() const
@ -83,7 +160,7 @@ Endpoint::Ptr JsonRpcConnection::GetEndpoint() const
return m_Endpoint;
}
TlsStream::Ptr JsonRpcConnection::GetStream() const
std::shared_ptr<AsioTlsStream> JsonRpcConnection::GetStream() const
{
return m_Stream;
}
@ -95,69 +172,16 @@ ConnectionRole JsonRpcConnection::GetRole() const
void JsonRpcConnection::SendMessage(const Dictionary::Ptr& message)
{
try {
ObjectLock olock(m_Stream);
if (m_Stream->IsEof())
return;
size_t bytesSent = JsonRpc::SendMessage(m_Stream, message);
if (m_Endpoint)
m_Endpoint->AddMessageSent(bytesSent);
} catch (const std::exception& ex) {
std::ostringstream info;
info << "Error while sending JSON-RPC message for identity '" << m_Identity << "'";
Log(LogWarning, "JsonRpcConnection")
<< info.str() << "\n" << DiagnosticInformation(ex);
Disconnect();
}
}
void JsonRpcConnection::Disconnect()
{
Log(LogWarning, "JsonRpcConnection")
<< "API client disconnected for identity '" << m_Identity << "'";
m_Stream->Close();
if (m_Endpoint)
m_Endpoint->RemoveClient(this);
else {
ApiListener::Ptr listener = ApiListener::GetInstance();
listener->RemoveAnonymousClient(this);
}
}
void JsonRpcConnection::MessageHandlerWrapper(const String& jsonString)
{
if (m_Stream->IsEof())
return;
try {
MessageHandler(jsonString);
} catch (const std::exception& ex) {
Log(LogWarning, "JsonRpcConnection")
<< "Error while reading JSON-RPC message for identity '" << m_Identity
<< "': " << DiagnosticInformation(ex);
Disconnect();
return;
}
m_IoStrand.post([this, message]() {
m_OutgoingMessagesQueue.emplace_back(message);
m_OutgoingMessagesQueued.expires_at(boost::posix_time::neg_infin);
});
}
void JsonRpcConnection::MessageHandler(const String& jsonString)
{
Dictionary::Ptr message = JsonRpc::DecodeMessage(jsonString);
m_Seen = Utility::GetTime();
if (m_HeartbeatTimeout != 0)
m_NextHeartbeat = Utility::GetTime() + m_HeartbeatTimeout;
if (m_Endpoint && message->Contains("ts")) {
double ts = message->Get("ts");
@ -225,59 +249,12 @@ void JsonRpcConnection::MessageHandler(const String& jsonString)
if (message->Contains("id")) {
resultMessage->Set("jsonrpc", "2.0");
resultMessage->Set("id", message->Get("id"));
SendMessage(resultMessage);
m_OutgoingMessagesQueue.emplace_back(resultMessage);
m_OutgoingMessagesQueued.expires_at(boost::posix_time::neg_infin);
}
}
bool JsonRpcConnection::ProcessMessage()
{
/* Limit for anonymous clients (signing requests and not configured endpoints. */
ssize_t maxMessageLength = 1024 * 1024;
if (m_Endpoint)
maxMessageLength = -1; /* no limit */
String message;
StreamReadStatus srs = JsonRpc::ReadMessage(m_Stream, &message, m_Context, false, maxMessageLength);
if (srs != StatusNewItem)
return false;
l_JsonRpcConnectionWorkQueues[m_ID % l_JsonRpcConnectionWorkQueueCount].Enqueue(std::bind(&JsonRpcConnection::MessageHandlerWrapper, JsonRpcConnection::Ptr(this), message));
return true;
}
void JsonRpcConnection::DataAvailableHandler()
{
bool close = false;
if (!m_Stream)
return;
if (!m_Stream->IsEof()) {
boost::mutex::scoped_lock lock(m_DataHandlerMutex);
try {
while (ProcessMessage())
; /* empty loop body */
} catch (const std::exception& ex) {
Log(LogWarning, "JsonRpcConnection")
<< "Error while reading JSON-RPC message for identity '" << m_Identity
<< "': " << DiagnosticInformation(ex);
Disconnect();
return;
}
} else
close = true;
if (close)
Disconnect();
}
Value SetLogPositionHandler(const MessageOrigin::Ptr& origin, const Dictionary::Ptr& params)
{
double log_position = params->Get("log_position");
@ -292,57 +269,3 @@ Value SetLogPositionHandler(const MessageOrigin::Ptr& origin, const Dictionary::
return Empty;
}
void JsonRpcConnection::CheckLiveness()
{
if (m_Seen < Utility::GetTime() - 60 && (!m_Endpoint || !m_Endpoint->GetSyncing())) {
Log(LogInformation, "JsonRpcConnection")
<< "No messages for identity '" << m_Identity << "' have been received in the last 60 seconds.";
Disconnect();
}
}
void JsonRpcConnection::TimeoutTimerHandler()
{
ApiListener::Ptr listener = ApiListener::GetInstance();
for (const JsonRpcConnection::Ptr& client : listener->GetAnonymousClients()) {
client->CheckLiveness();
}
for (const Endpoint::Ptr& endpoint : ConfigType::GetObjectsByType<Endpoint>()) {
for (const JsonRpcConnection::Ptr& client : endpoint->GetClients()) {
client->CheckLiveness();
}
}
}
size_t JsonRpcConnection::GetWorkQueueCount()
{
return l_JsonRpcConnectionWorkQueueCount;
}
size_t JsonRpcConnection::GetWorkQueueLength()
{
size_t itemCount = 0;
for (size_t i = 0; i < GetWorkQueueCount(); i++)
itemCount += l_JsonRpcConnectionWorkQueues[i].GetLength();
return itemCount;
}
double JsonRpcConnection::GetWorkQueueRate()
{
double rate = 0.0;
size_t count = GetWorkQueueCount();
/* If this is a standalone environment, we don't have any queues. */
if (count == 0)
return 0.0;
for (size_t i = 0; i < count; i++)
rate += l_JsonRpcConnectionWorkQueues[i].GetTaskCount(60) / 60.0;
return rate / count;
}

View File

@ -8,6 +8,11 @@
#include "base/tlsstream.hpp"
#include "base/timer.hpp"
#include "base/workqueue.hpp"
#include <memory>
#include <vector>
#include <boost/asio/io_service_strand.hpp>
#include <boost/asio/spawn.hpp>
#include <boost/asio/deadline_timer.hpp>
namespace icinga
{
@ -36,7 +41,7 @@ class JsonRpcConnection final : public Object
public:
DECLARE_PTR_TYPEDEFS(JsonRpcConnection);
JsonRpcConnection(const String& identity, bool authenticated, TlsStream::Ptr stream, ConnectionRole role);
JsonRpcConnection(const String& identity, bool authenticated, const std::shared_ptr<AsioTlsStream>& stream, ConnectionRole role);
void Start();
@ -44,45 +49,34 @@ public:
String GetIdentity() const;
bool IsAuthenticated() const;
Endpoint::Ptr GetEndpoint() const;
TlsStream::Ptr GetStream() const;
std::shared_ptr<AsioTlsStream> GetStream() const;
ConnectionRole GetRole() const;
void Disconnect();
void SendMessage(const Dictionary::Ptr& request);
static void HeartbeatTimerHandler();
static Value HeartbeatAPIHandler(const intrusive_ptr<MessageOrigin>& origin, const Dictionary::Ptr& params);
static size_t GetWorkQueueCount();
static size_t GetWorkQueueLength();
static double GetWorkQueueRate();
static void SendCertificateRequest(const JsonRpcConnection::Ptr& aclient, const intrusive_ptr<MessageOrigin>& origin, const String& path);
private:
int m_ID;
String m_Identity;
bool m_Authenticated;
Endpoint::Ptr m_Endpoint;
TlsStream::Ptr m_Stream;
std::shared_ptr<AsioTlsStream> m_Stream;
ConnectionRole m_Role;
double m_Timestamp;
double m_Seen;
double m_NextHeartbeat;
double m_HeartbeatTimeout;
boost::mutex m_DataHandlerMutex;
boost::asio::io_service::strand m_IoStrand;
std::vector<Dictionary::Ptr> m_OutgoingMessagesQueue;
boost::asio::deadline_timer m_OutgoingMessagesQueued;
bool m_ReaderHasError;
unsigned char m_RunningCoroutines;
StreamReadContext m_Context;
void HandleIncomingMessages(boost::asio::yield_context yc);
void WriteOutgoingMessages(boost::asio::yield_context yc);
void ShutdownStreamOnce(boost::asio::yield_context& yc);
bool ProcessMessage();
void MessageHandlerWrapper(const String& jsonString);
void MessageHandler(const String& jsonString);
void DataAvailableHandler();
static void StaticInitialize();
static void TimeoutTimerHandler();
void CheckLiveness();
void CertificateRequestResponseHandler(const Dictionary::Ptr& message);
};