mirror of
				https://github.com/Icinga/icinga2.git
				synced 2025-11-04 05:34:12 +01:00 
			
		
		
		
	
		
			
				
	
	
		
			343 lines
		
	
	
		
			10 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
			
		
		
	
	
			343 lines
		
	
	
		
			10 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
/******************************************************************************
 | 
						|
 * Icinga 2                                                                   *
 | 
						|
 * Copyright (C) 2012-2016 Icinga Development Team (https://www.icinga.org/)  *
 | 
						|
 *                                                                            *
 | 
						|
 * This program is free software; you can redistribute it and/or              *
 | 
						|
 * modify it under the terms of the GNU General Public License                *
 | 
						|
 * as published by the Free Software Foundation; either version 2             *
 | 
						|
 * of the License, or (at your option) any later version.                     *
 | 
						|
 *                                                                            *
 | 
						|
 * This program is distributed in the hope that it will be useful,            *
 | 
						|
 * but WITHOUT ANY WARRANTY; without even the implied warranty of             *
 | 
						|
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the              *
 | 
						|
 * GNU General Public License for more details.                               *
 | 
						|
 *                                                                            *
 | 
						|
 * You should have received a copy of the GNU General Public License          *
 | 
						|
 * along with this program; if not, write to the Free Software Foundation     *
 | 
						|
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA.             *
 | 
						|
 ******************************************************************************/
 | 
						|
 | 
						|
#include "remote/jsonrpcconnection.hpp"
 | 
						|
#include "remote/apilistener.hpp"
 | 
						|
#include "remote/apifunction.hpp"
 | 
						|
#include "remote/jsonrpc.hpp"
 | 
						|
#include "base/configtype.hpp"
 | 
						|
#include "base/objectlock.hpp"
 | 
						|
#include "base/utility.hpp"
 | 
						|
#include "base/logger.hpp"
 | 
						|
#include "base/exception.hpp"
 | 
						|
#include "base/convert.hpp"
 | 
						|
#include <boost/thread/once.hpp>
 | 
						|
 | 
						|
using namespace icinga;
 | 
						|
 | 
						|
static Value SetLogPositionHandler(const MessageOrigin::Ptr& origin, const Dictionary::Ptr& params);
 | 
						|
REGISTER_APIFUNCTION(SetLogPosition, log, &SetLogPositionHandler);
 | 
						|
static Value RequestCertificateHandler(const MessageOrigin::Ptr& origin, const Dictionary::Ptr& params);
 | 
						|
REGISTER_APIFUNCTION(RequestCertificate, pki, &RequestCertificateHandler);
 | 
						|
 | 
						|
static boost::once_flag l_JsonRpcConnectionOnceFlag = BOOST_ONCE_INIT;
 | 
						|
static Timer::Ptr l_JsonRpcConnectionTimeoutTimer;
 | 
						|
static WorkQueue *l_JsonRpcConnectionWorkQueues;
 | 
						|
static size_t l_JsonRpcConnectionWorkQueueCount;
 | 
						|
static int l_JsonRpcConnectionNextID;
 | 
						|
 | 
						|
/**
 * Sets up a new JSON-RPC connection object.
 *
 * The first construction also triggers the one-time static initialization
 * (timeout timer and work queues).
 *
 * @param identity The identity of the peer; used as the endpoint name for authenticated peers.
 * @param authenticated Whether the peer is authenticated. Only then is the endpoint looked up.
 * @param stream The TLS stream used to talk to the peer.
 * @param role The role of this connection.
 */
JsonRpcConnection::JsonRpcConnection(const String& identity, bool authenticated,
    const TlsStream::Ptr& stream, ConnectionRole role)
	: m_ID(l_JsonRpcConnectionNextID++),
	  m_Identity(identity),
	  m_Authenticated(authenticated),
	  m_Stream(stream),
	  m_Role(role),
	  m_Timestamp(Utility::GetTime()),
	  m_Seen(Utility::GetTime()),
	  m_NextHeartbeat(0),
	  m_HeartbeatTimeout(0)
{
	boost::call_once(l_JsonRpcConnectionOnceFlag, &JsonRpcConnection::StaticInitialize);

	/* unauthenticated peers are never associated with an endpoint */
	if (authenticated) {
		m_Endpoint = Endpoint::GetByName(identity);
	}
}
 | 
						|
 | 
						|
void JsonRpcConnection::StaticInitialize(void)
 | 
						|
{
 | 
						|
	l_JsonRpcConnectionTimeoutTimer = new Timer();
 | 
						|
	l_JsonRpcConnectionTimeoutTimer->OnTimerExpired.connect(boost::bind(&JsonRpcConnection::TimeoutTimerHandler));
 | 
						|
	l_JsonRpcConnectionTimeoutTimer->SetInterval(15);
 | 
						|
	l_JsonRpcConnectionTimeoutTimer->Start();
 | 
						|
 | 
						|
	l_JsonRpcConnectionWorkQueueCount = Application::GetConcurrency();
 | 
						|
	l_JsonRpcConnectionWorkQueues = new WorkQueue[l_JsonRpcConnectionWorkQueueCount];
 | 
						|
 | 
						|
	for (size_t i = 0; i < l_JsonRpcConnectionWorkQueueCount; i++) {
 | 
						|
		l_JsonRpcConnectionWorkQueues[i].SetName("JsonRpcConnection, #" + Convert::ToString(i));
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
void JsonRpcConnection::Start(void)
 | 
						|
{
 | 
						|
	/* the stream holds an owning reference to this object through the callback we're registering here */
 | 
						|
	m_Stream->RegisterDataHandler(boost::bind(&JsonRpcConnection::DataAvailableHandler, JsonRpcConnection::Ptr(this)));
 | 
						|
	if (m_Stream->IsDataAvailable())
 | 
						|
		DataAvailableHandler();
 | 
						|
}
 | 
						|
 | 
						|
double JsonRpcConnection::GetTimestamp(void) const
 | 
						|
{
 | 
						|
	return m_Timestamp;
 | 
						|
}
 | 
						|
 | 
						|
/**
 * Retrieves the identity of the peer.
 *
 * @returns The identity string passed to the constructor.
 */
String JsonRpcConnection::GetIdentity(void) const
{
	return m_Identity;
}
 | 
						|
 | 
						|
bool JsonRpcConnection::IsAuthenticated(void) const
 | 
						|
{
 | 
						|
	return m_Authenticated;
 | 
						|
}
 | 
						|
 | 
						|
/**
 * Retrieves the endpoint associated with this connection.
 *
 * @returns The endpoint looked up by the constructor. The constructor only
 *          performs that lookup for authenticated peers.
 */
Endpoint::Ptr JsonRpcConnection::GetEndpoint(void) const
{
	return m_Endpoint;
}
 | 
						|
 | 
						|
/**
 * Retrieves the TLS stream used by this connection.
 *
 * @returns The stream passed to the constructor.
 */
TlsStream::Ptr JsonRpcConnection::GetStream(void) const
{
	return m_Stream;
}
 | 
						|
 | 
						|
/**
 * Retrieves the role of this connection.
 *
 * @returns The role passed to the constructor.
 */
ConnectionRole JsonRpcConnection::GetRole(void) const
{
	return m_Role;
}
 | 
						|
 | 
						|
/**
 * Sends a JSON-RPC message to the peer.
 *
 * Nothing is sent once the stream has reached EOF. Errors derived from
 * std::exception are logged and cause the connection to be disconnected;
 * they are not propagated to the caller.
 *
 * @param message The message to send.
 */
void JsonRpcConnection::SendMessage(const Dictionary::Ptr& message)
{
	try {
		/* the lock only covers the write; it is released before the catch block runs */
		ObjectLock streamLock(m_Stream);

		if (!m_Stream->IsEof())
			JsonRpc::SendMessage(m_Stream, message);
	} catch (const std::exception& ex) {
		Log(LogWarning, "JsonRpcConnection")
		    << "Error while sending JSON-RPC message for identity '" << m_Identity << "'"
		    << "\n" << DiagnosticInformation(ex);

		Disconnect();
	}
}
 | 
						|
 | 
						|
void JsonRpcConnection::Disconnect(void)
 | 
						|
{
 | 
						|
	Log(LogWarning, "JsonRpcConnection")
 | 
						|
	    << "API client disconnected for identity '" << m_Identity << "'";
 | 
						|
 | 
						|
	m_Stream->Close();
 | 
						|
 | 
						|
	if (m_Endpoint)
 | 
						|
		m_Endpoint->RemoveClient(this);
 | 
						|
	else {
 | 
						|
		ApiListener::Ptr listener = ApiListener::GetInstance();
 | 
						|
		listener->RemoveAnonymousClient(this);
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
void JsonRpcConnection::MessageHandlerWrapper(const String& jsonString)
 | 
						|
{
 | 
						|
	if (m_Stream->IsEof())
 | 
						|
		return;
 | 
						|
 | 
						|
	try {
 | 
						|
		MessageHandler(jsonString);
 | 
						|
	} catch (const std::exception& ex) {
 | 
						|
		Log(LogWarning, "JsonRpcConnection")
 | 
						|
		    << "Error while reading JSON-RPC message for identity '" << m_Identity
 | 
						|
		    << "': " << DiagnosticInformation(ex);
 | 
						|
 | 
						|
		Disconnect();
 | 
						|
 | 
						|
		return;
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
void JsonRpcConnection::MessageHandler(const String& jsonString)
 | 
						|
{
 | 
						|
	Dictionary::Ptr message = JsonRpc::DecodeMessage(jsonString);
 | 
						|
 | 
						|
	m_Seen = Utility::GetTime();
 | 
						|
 | 
						|
	if (m_HeartbeatTimeout != 0)
 | 
						|
		m_NextHeartbeat = Utility::GetTime() + m_HeartbeatTimeout;
 | 
						|
 | 
						|
	if (m_Endpoint && message->Contains("ts")) {
 | 
						|
		double ts = message->Get("ts");
 | 
						|
 | 
						|
		/* ignore old messages */
 | 
						|
		if (ts < m_Endpoint->GetRemoteLogPosition())
 | 
						|
			return;
 | 
						|
 | 
						|
		m_Endpoint->SetRemoteLogPosition(ts);
 | 
						|
	}
 | 
						|
 | 
						|
	MessageOrigin::Ptr origin = new MessageOrigin();
 | 
						|
	origin->FromClient = this;
 | 
						|
 | 
						|
	if (m_Endpoint) {
 | 
						|
		if (m_Endpoint->GetZone() != Zone::GetLocalZone())
 | 
						|
			origin->FromZone = m_Endpoint->GetZone();
 | 
						|
		else
 | 
						|
			origin->FromZone = Zone::GetByName(message->Get("originZone"));
 | 
						|
	}
 | 
						|
 | 
						|
	String method = message->Get("method");
 | 
						|
 | 
						|
	Log(LogNotice, "JsonRpcConnection")
 | 
						|
	    << "Received '" << method << "' message from '" << m_Identity << "'";
 | 
						|
 | 
						|
	Dictionary::Ptr resultMessage = new Dictionary();
 | 
						|
 | 
						|
	try {
 | 
						|
		ApiFunction::Ptr afunc = ApiFunction::GetByName(method);
 | 
						|
 | 
						|
		if (!afunc)
 | 
						|
			BOOST_THROW_EXCEPTION(std::invalid_argument("Function '" + method + "' does not exist."));
 | 
						|
 | 
						|
		resultMessage->Set("result", afunc->Invoke(origin, message->Get("params")));
 | 
						|
	} catch (const std::exception& ex) {
 | 
						|
		/* TODO: Add a user readable error message for the remote caller */
 | 
						|
		resultMessage->Set("error", DiagnosticInformation(ex));
 | 
						|
		std::ostringstream info;
 | 
						|
		info << "Error while processing message for identity '" << m_Identity << "'";
 | 
						|
		Log(LogWarning, "JsonRpcConnection")
 | 
						|
		    << info.str() << "\n" << DiagnosticInformation(ex);
 | 
						|
	}
 | 
						|
 | 
						|
	if (message->Contains("id")) {
 | 
						|
		resultMessage->Set("jsonrpc", "2.0");
 | 
						|
		resultMessage->Set("id", message->Get("id"));
 | 
						|
		SendMessage(resultMessage);
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
bool JsonRpcConnection::ProcessMessage(void)
 | 
						|
{
 | 
						|
	String message;
 | 
						|
 | 
						|
	StreamReadStatus srs = JsonRpc::ReadMessage(m_Stream, &message, m_Context, false);
 | 
						|
 | 
						|
	if (srs != StatusNewItem)
 | 
						|
		return false;
 | 
						|
 | 
						|
	l_JsonRpcConnectionWorkQueues[m_ID % l_JsonRpcConnectionWorkQueueCount].Enqueue(boost::bind(&JsonRpcConnection::MessageHandlerWrapper, JsonRpcConnection::Ptr(this), message));
 | 
						|
 | 
						|
	return true;
 | 
						|
}
 | 
						|
 | 
						|
void JsonRpcConnection::DataAvailableHandler(void)
 | 
						|
{
 | 
						|
	bool close = false;
 | 
						|
 | 
						|
	if (!m_Stream)
 | 
						|
		return;
 | 
						|
 | 
						|
	if (!m_Stream->IsEof()) {
 | 
						|
		boost::mutex::scoped_lock lock(m_DataHandlerMutex);
 | 
						|
 | 
						|
		try {
 | 
						|
			while (ProcessMessage())
 | 
						|
				; /* empty loop body */
 | 
						|
		} catch (const std::exception& ex) {
 | 
						|
			Log(LogWarning, "JsonRpcConnection")
 | 
						|
			    << "Error while reading JSON-RPC message for identity '" << m_Identity
 | 
						|
			    << "': " << DiagnosticInformation(ex);
 | 
						|
 | 
						|
			Disconnect();
 | 
						|
 | 
						|
			return;
 | 
						|
		}
 | 
						|
	} else
 | 
						|
		close = true;
 | 
						|
 | 
						|
	if (close)
 | 
						|
		Disconnect();
 | 
						|
}
 | 
						|
 | 
						|
/**
 * API handler registered as SetLogPosition: moves the sending endpoint's local
 * log position forward to the value given in "log_position".
 *
 * The position is never moved backwards, and requests from clients without an
 * endpoint are ignored.
 *
 * @param origin The origin of the message; its FromClient identifies the sender.
 * @param params The message parameters; may be null, in which case nothing happens.
 * @returns Always Empty.
 */
Value SetLogPositionHandler(const MessageOrigin::Ptr& origin, const Dictionary::Ptr& params)
{
	if (!params)
		return Empty;

	double requestedPosition = params->Get("log_position");
	Endpoint::Ptr sender = origin->FromClient->GetEndpoint();

	if (sender && requestedPosition > sender->GetLocalLogPosition())
		sender->SetLocalLogPosition(requestedPosition);

	return Empty;
}
 | 
						|
 | 
						|
/**
 * API handler registered as RequestCertificate: issues a certificate signed by
 * the Icinga CA for the public key and subject of the peer's certificate.
 *
 * Unauthenticated clients must supply a "ticket" parameter matching
 * PBKDF2_SHA1(identity, ticket salt, 50000). Failures are reported through an
 * "error" member in the returned dictionary.
 *
 * @param origin The origin of the message; its FromClient identifies the requester.
 * @param params The message parameters; may be null, in which case nothing happens.
 * @returns Empty if params is null; otherwise a dictionary containing either
 *          "error", or "cert" (the new certificate) and "ca" (the CA certificate).
 */
Value RequestCertificateHandler(const MessageOrigin::Ptr& origin, const Dictionary::Ptr& params)
{
	if (!params)
		return Empty;

	Dictionary::Ptr result = new Dictionary();

	/* authenticated clients do not need a ticket */
	if (!origin->FromClient->IsAuthenticated()) {
		ApiListener::Ptr listener = ApiListener::GetInstance();
		String salt = listener->GetTicketSalt();

		if (salt.IsEmpty()) {
			result->Set("error", "Ticket salt is not configured.");
			return result;
		}

		String ticket = params->Get("ticket");
		String realTicket = PBKDF2_SHA1(origin->FromClient->GetIdentity(), salt, 50000);

		/* NOTE(review): presumably not a constant-time comparison - confirm whether that matters here */
		if (ticket != realTicket) {
			result->Set("error", "Invalid ticket.");
			return result;
		}
	}

	boost::shared_ptr<X509> cert = origin->FromClient->GetStream()->GetPeerCertificate();

	/* without a peer certificate there is neither a public key nor a subject to sign */
	if (!cert) {
		result->Set("error", "No peer certificate available.");
		return result;
	}

	/* X509_get_pubkey() returns a new reference which has to be released with EVP_PKEY_free() */
	boost::shared_ptr<EVP_PKEY> pubkey(X509_get_pubkey(cert.get()), EVP_PKEY_free);

	if (!pubkey) {
		result->Set("error", "Could not read the public key from the peer certificate.");
		return result;
	}

	/* internal pointer owned by the certificate, must not be freed */
	X509_NAME *subject = X509_get_subject_name(cert.get());

	boost::shared_ptr<X509> newcert = CreateCertIcingaCA(pubkey.get(), subject);
	result->Set("cert", CertificateToString(newcert));

	String cacertfile = GetIcingaCADir() + "/ca.crt";
	boost::shared_ptr<X509> cacert = GetX509Certificate(cacertfile);
	result->Set("ca", CertificateToString(cacert));

	return result;
}
 | 
						|
 | 
						|
void JsonRpcConnection::CheckLiveness(void)
 | 
						|
{
 | 
						|
	if (m_Seen < Utility::GetTime() - 60 && (!m_Endpoint || !m_Endpoint->GetSyncing())) {
 | 
						|
		Log(LogInformation, "JsonRpcConnection")
 | 
						|
		    <<  "No messages for identity '" << m_Identity << "' have been received in the last 60 seconds.";
 | 
						|
		Disconnect();
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
void JsonRpcConnection::TimeoutTimerHandler(void)
 | 
						|
{
 | 
						|
	ApiListener::Ptr listener = ApiListener::GetInstance();
 | 
						|
 | 
						|
	for (const JsonRpcConnection::Ptr& client : listener->GetAnonymousClients()) {
 | 
						|
		client->CheckLiveness();
 | 
						|
	}
 | 
						|
 | 
						|
	for (const Endpoint::Ptr& endpoint : ConfigType::GetObjectsByType<Endpoint>()) {
 | 
						|
		for (const JsonRpcConnection::Ptr& client : endpoint->GetClients()) {
 | 
						|
			client->CheckLiveness();
 | 
						|
		}
 | 
						|
	}
 | 
						|
}
 | 
						|
 |