/* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */ #include "remote/jsonrpcconnection.hpp" #include "remote/apilistener.hpp" #include "remote/apifunction.hpp" #include "remote/jsonrpc.hpp" #include "base/defer.hpp" #include "base/configtype.hpp" #include "base/io-engine.hpp" #include "base/json.hpp" #include "base/objectlock.hpp" #include "base/utility.hpp" #include "base/logger.hpp" #include "base/exception.hpp" #include "base/convert.hpp" #include "base/tlsstream.hpp" #include #include #include #include #include #include #include using namespace icinga; static Value SetLogPositionHandler(const MessageOrigin::Ptr& origin, const Dictionary::Ptr& params); REGISTER_APIFUNCTION(SetLogPosition, log, &SetLogPositionHandler); static RingBuffer l_TaskStats (15 * 60); JsonRpcConnection::JsonRpcConnection(const String& identity, bool authenticated, const AsioTlsStream::Ptr& stream, ConnectionRole role) : JsonRpcConnection(identity, authenticated, stream, role, IoEngine::Get().GetIoContext()) { } JsonRpcConnection::JsonRpcConnection(const String& identity, bool authenticated, const AsioTlsStream::Ptr& stream, ConnectionRole role, boost::asio::io_context& io) : m_Identity(identity), m_Authenticated(authenticated), m_Stream(stream), m_Role(role), m_Timestamp(Utility::GetTime()), m_Seen(Utility::GetTime()), m_IoStrand(io), m_OutgoingMessagesQueued(io), m_ReadLoopDone(io), m_State(State::Active) // m_CheckLivenessTimer(io), m_HeartbeatTimer(io) { if (authenticated) m_Endpoint = Endpoint::GetByName(identity); // SSL *ssl = m_Stream->next_layer().native_handle(); // SSL_set_msg_callback(ssl, SSL_trace); // SSL_set_msg_callback_arg(ssl, BIO_new_fp(stdout,0)); } void JsonRpcConnection::Start() { namespace asio = boost::asio; JsonRpcConnection::Ptr keepAlive (this); IoEngine::SpawnCoroutine(m_IoStrand, [this, keepAlive](asio::yield_context yc) { HandleIncomingMessages(yc); }); IoEngine::SpawnCoroutine(m_IoStrand, [this, keepAlive](asio::yield_context yc) { WriteOutgoingMessages(yc); }); //IoEngine::SpawnCoroutine(m_IoStrand, [this, keepAlive](asio::yield_context yc) { HandleAndWriteHeartbeats(yc); }); //IoEngine::SpawnCoroutine(m_IoStrand, [this, keepAlive](asio::yield_context yc) { CheckLiveness(yc); }); } void JsonRpcConnection::HandleIncomingMessages(boost::asio::yield_context yc) { Defer signalDone ([this]() { m_ReadLoopDone.Set(); }); m_Stream->next_layer().SetSeen(&m_Seen); // Log(LogInformation, "JsonRpcConnection") << "Starting read loop for " << m_Identity; while (true) { String message; try { // Log(LogInformation, "JsonRpcConnection") << "Waiting for message from " << m_Identity; message = JsonRpc::ReadMessage(m_Stream, yc, m_Endpoint ? -1 : 1024 * 1024); // Log(LogInformation, "JsonRpcConnection") << "Successfully read message from " << m_Identity; } catch (const std::exception& ex) { // Log(LogInformation, "JsonRpcConnection") << "Read loop exception for '" << m_Identity << "': " << ex.what(); if (auto err = dynamic_cast(&ex); err && err->code() == boost::asio::error::eof) { Log(LogInformation, "JsonRpcConnection") << "Remote " << m_Identity << " sent TLS shutdown"; break; } Log(LogInformation, "JsonRpcConnection") << "Error while reading JSON-RPC message for identity '" << m_Identity << "': " << DiagnosticInformation(ex); ForceDisconnectInternal(); return; } if (message == "{}") { Log(LogInformation, "JsonRpcConnection") << "Remote " << m_Identity << " send JSON-RPC shutdown"; break; } m_Seen = Utility::GetTime(); try { CpuBoundWork handleMessage (yc); MessageHandler(message); l_TaskStats.InsertValue(Utility::GetTime(), 1); } catch (const std::exception& ex) { Log(LogInformation, "JsonRpcConnection") << "Error while processing JSON-RPC message for identity '" << m_Identity << "': " << DiagnosticInformation(ex); ForceDisconnectInternal(); return; } } // Log(LogInformation, "JsonRpcConnection") << "Read loop for " << m_Identity << " terminated"; // boost::asio::deadline_timer timer (m_IoStrand); // timer.expires_from_now(boost::posix_time::seconds(3)); // boost::system::error_code ec; // timer.async_wait(yc[ec]); // Utility::Sleep(2); Log(LogInformation, "JsonRpcConnection") << "Trigger disconnect after read loop teminated for " << m_Identity; Disconnect(); } void JsonRpcConnection::WriteOutgoingMessages(boost::asio::yield_context yc) { // Log(LogInformation, "JsonRpcConnection") << "Starting write loop for " << m_Identity; do { m_OutgoingMessagesQueued.Wait(yc); auto queue (std::move(m_OutgoingMessagesQueue)); m_OutgoingMessagesQueue.clear(); m_OutgoingMessagesQueued.Clear(); if (!queue.empty()) { try { for (auto& message : queue) { size_t bytesSent = JsonRpc::SendRawMessage(m_Stream, message, yc); if (m_Endpoint) { m_Endpoint->AddMessageSent(bytesSent); } } m_Stream->async_flush(yc); } catch (const std::exception& ex) { Log(LogInformation, "JsonRpcConnection") << "Error while sending JSON-RPC message for identity '" << m_Identity << "'\n" << DiagnosticInformation(ex); ForceDisconnectInternal(); return; } } } while (m_State == State::Active); // Log(LogInformation, "JsonRpcConnection") << "Write loop for " << m_Identity << " terminated"; bool clean = true; boost::system::error_code ec; if (m_State == State::Disconnected) { return; } JsonRpc::SendRawMessage(m_Stream, "{}", yc[ec]); if (m_State == State::Disconnected) { return; } else if (ec) { Log(LogInformation, "JsonRpcConnection") << "JSON-RPC shutdown for " << m_Identity << " failed (write): " << ec.message(); clean = false; } m_Stream->async_flush(yc); if (m_State == State::Disconnected) { return; } else if (ec) { Log(LogInformation, "JsonRpcConnection") << "JSON-RPC shutdown for " << m_Identity << " failed (flush): " << ec.message(); clean = false; } // Log(LogInformation, "JsonRpcConnection") << "Starting TLS shutdown for " << m_Identity; m_Stream->next_layer().async_shutdown(yc[ec]); if (m_State == State::Disconnected) { return; } else if (ec == boost::asio::error::operation_aborted) { // Yada, yada, asio doing strange things... ec.clear(); m_Stream->next_layer().async_shutdown(yc[ec]); } if (m_State == State::Disconnected) { return; } else if (ec) { Log(LogInformation, "JsonRpcConnection") << "TLS shutdown for " << m_Identity << " failed: " << ec.message(); clean = false; } // async_shutdown() should wait for the close notify from the peer. However, in case of an error, it may exit // before the read loop finished, so wait for it explicitly. m_ReadLoopDone.Wait(yc); // Shut down the TCP connection. ec.clear(); // Log(LogInformation, "JsonRpcConnection") << "Starting TCP shutdown for " << m_Identity; m_Stream->lowest_layer().shutdown(AsioTlsStream::lowest_layer_type::shutdown_both, ec); if (m_State == State::Disconnected) { return; } else if (ec) { Log(LogInformation, "JsonRpcConnection") << "TCP shutdown for " << m_Identity << " failed: " << ec.message(); clean = false; } ForceDisconnectInternal(clean); if (clean) { Log(LogInformation, "JsonRpcConnection") << "Disconnected " << m_Identity << " cleanly"; } } double JsonRpcConnection::GetTimestamp() const { return m_Timestamp; } String JsonRpcConnection::GetIdentity() const { return m_Identity; } bool JsonRpcConnection::IsAuthenticated() const { return m_Authenticated; } Endpoint::Ptr JsonRpcConnection::GetEndpoint() const { return m_Endpoint; } AsioTlsStream::Ptr JsonRpcConnection::GetStream() const { return m_Stream; } ConnectionRole JsonRpcConnection::GetRole() const { return m_Role; } void JsonRpcConnection::SendMessage(const Dictionary::Ptr& message) { if (m_State != State::Active) { BOOST_THROW_EXCEPTION(std::runtime_error("Cannot send message to already disconnected API client '" + GetIdentity() + "'!")); } Ptr keepAlive (this); m_IoStrand.post([this, keepAlive, message]() { SendMessageInternal(message); }); } void JsonRpcConnection::SendRawMessage(const String& message) { if (m_State != State::Active) { BOOST_THROW_EXCEPTION(std::runtime_error("Cannot send message to already disconnected API client '" + GetIdentity() + "'!")); } Ptr keepAlive (this); m_IoStrand.post([this, keepAlive, message]() { if (m_State != State::Active) { return; } m_OutgoingMessagesQueue.emplace_back(message); m_OutgoingMessagesQueued.Set(); }); } void JsonRpcConnection::SendMessageInternal(const Dictionary::Ptr& message) { if (m_State != State::Active) { return; } m_OutgoingMessagesQueue.emplace_back(JsonEncode(message)); m_OutgoingMessagesQueued.Set(); } void JsonRpcConnection::Disconnect() { if (State expected = State::Active; m_State.compare_exchange_strong(expected, State::Disconnecting)) { Log(LogInformation, "JsonRpcConnection") << "Disconnecting " << m_Identity << (m_IoStrand.running_in_this_thread() ? " (internal trigger)" : " (external trigger)"); // Wake write loop to initiate the actual disconnect. if (m_IoStrand.running_in_this_thread()) { m_OutgoingMessagesQueued.Set(); } else { m_IoStrand.post([conn=Ptr(this)]() { conn->m_OutgoingMessagesQueued.Set(); }); } } } void JsonRpcConnection::ForceDisconnectInternal(bool clean) { ASSERT(m_IoStrand.running_in_this_thread()); if (m_State == State::Disconnected) { return; } if (!clean) { Log(LogWarning, "JsonRpcConnection") << "Forcefully disconnecting API client for identity '" << m_Identity << "'."; } m_Stream->ForceDisconnect(); Cleanup(); } void JsonRpcConnection::Cleanup() { ASSERT(m_IoStrand.running_in_this_thread()); if (m_State.exchange(State::Disconnected) == State::Disconnected) { return; } if (m_Endpoint) { m_Endpoint->RemoveClient(this); } else { ApiListener::GetInstance()->RemoveAnonymousClient(this); } // Wake WriteOutgoingMessages() if it waiting for new messages to send. m_OutgoingMessagesQueued.Set(); // Signal CheckLiveness() coroutine to terminate. // m_CheckLivenessTimer.cancel(); // Signal HandleAndWriteHeartbeats() coroutine to terminate. // m_HeartbeatTimer.cancel(); Log(LogWarning, "JsonRpcConnection") << "API client disconnected for identity '" << m_Identity << "'"; } void JsonRpcConnection::MessageHandler(const String& jsonString) { Dictionary::Ptr message = JsonRpc::DecodeMessage(jsonString); if (m_Endpoint && message->Contains("ts")) { double ts = message->Get("ts"); /* ignore old messages */ if (ts < m_Endpoint->GetRemoteLogPosition()) return; m_Endpoint->SetRemoteLogPosition(ts); } MessageOrigin::Ptr origin = new MessageOrigin(); origin->FromClient = this; if (m_Endpoint) { if (m_Endpoint->GetZone() != Zone::GetLocalZone()) origin->FromZone = m_Endpoint->GetZone(); else origin->FromZone = Zone::GetByName(message->Get("originZone")); m_Endpoint->AddMessageReceived(jsonString.GetLength()); } Value vmethod; if (!message->Get("method", &vmethod)) { Value vid; if (!message->Get("id", &vid)) return; Log(LogWarning, "JsonRpcConnection", "We received a JSON-RPC response message. This should never happen because we're only ever sending notifications."); return; } String method = vmethod; Log(LogNotice, "JsonRpcConnection") << "Received '" << method << "' message from identity '" << m_Identity << "'."; Dictionary::Ptr resultMessage = new Dictionary(); try { ApiFunction::Ptr afunc = ApiFunction::GetByName(method); if (!afunc) { Log(LogNotice, "JsonRpcConnection") << "Call to non-existent function '" << method << "' from endpoint '" << m_Identity << "'."; } else { Dictionary::Ptr params = message->Get("params"); if (params) resultMessage->Set("result", afunc->Invoke(origin, params)); else resultMessage->Set("result", Empty); } } catch (const std::exception& ex) { /* TODO: Add a user readable error message for the remote caller */ String diagInfo = DiagnosticInformation(ex); resultMessage->Set("error", diagInfo); Log(LogWarning, "JsonRpcConnection") << "Error while processing message for identity '" << m_Identity << "'\n" << diagInfo; } if (message->Contains("id")) { resultMessage->Set("jsonrpc", "2.0"); resultMessage->Set("id", message->Get("id")); SendMessageInternal(resultMessage); } } Value SetLogPositionHandler(const MessageOrigin::Ptr& origin, const Dictionary::Ptr& params) { double log_position = params->Get("log_position"); Endpoint::Ptr endpoint = origin->FromClient->GetEndpoint(); if (!endpoint) return Empty; if (log_position > endpoint->GetLocalLogPosition()) endpoint->SetLocalLogPosition(log_position); return Empty; } void JsonRpcConnection::CheckLiveness(boost::asio::yield_context yc) { #if 0 boost::system::error_code ec; if (!m_Authenticated) { /* Anonymous connections are normally only used for requesting a certificate and are closed after this request * is received. However, the request is only sent if the child has successfully verified the certificate of its * parent so that it is an authenticated connection from its perspective. In case this verification fails, both * ends view it as an anonymous connection and never actually use it but attempt a reconnect after 10 seconds * leaking the connection. Therefore close it after a timeout. */ m_CheckLivenessTimer.expires_from_now(boost::posix_time::seconds(10)); m_CheckLivenessTimer.async_wait(yc[ec]); if (m_State != State::Active) { return; } auto remote (m_Stream->lowest_layer().remote_endpoint()); Log(LogInformation, "JsonRpcConnection") << "Closing anonymous connection [" << remote.address() << "]:" << remote.port() << " after 10 seconds."; Disconnect(); } else { for (;;) { m_CheckLivenessTimer.expires_from_now(boost::posix_time::seconds(30)); m_CheckLivenessTimer.async_wait(yc[ec]); if (m_State != State::Active) { break; } if (m_Seen < Utility::GetTime() - 60 && (!m_Endpoint || !m_Endpoint->GetSyncing())) { Log(LogInformation, "JsonRpcConnection") << "No messages for identity '" << m_Identity << "' have been received in the last 60 seconds."; ForceDisconnectInternal(); break; } } } #endif } double JsonRpcConnection::GetWorkQueueRate() { return l_TaskStats.UpdateAndGetValues(Utility::GetTime(), 60) / 60.0; }