2012-05-10 12:06:41 +02:00
|
|
|
/******************************************************************************
|
|
|
|
* Icinga 2 *
|
|
|
|
* Copyright (C) 2012 Icinga Development Team (http://www.icinga.org/) *
|
|
|
|
* *
|
|
|
|
* This program is free software; you can redistribute it and/or *
|
|
|
|
* modify it under the terms of the GNU General Public License *
|
|
|
|
* as published by the Free Software Foundation; either version 2 *
|
|
|
|
* of the License, or (at your option) any later version. *
|
|
|
|
* *
|
|
|
|
* This program is distributed in the hope that it will be useful, *
|
|
|
|
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
|
|
|
|
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
|
|
|
|
* GNU General Public License for more details. *
|
|
|
|
* *
|
|
|
|
* You should have received a copy of the GNU General Public License *
|
|
|
|
* along with this program; if not, write to the Free Software Foundation *
|
2012-05-11 13:33:57 +02:00
|
|
|
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. *
|
2012-05-10 12:06:41 +02:00
|
|
|
******************************************************************************/
|
|
|
|
|
2012-09-10 14:07:32 +02:00
|
|
|
#include "i2-remoting.h"
|
2012-04-06 08:56:52 +02:00
|
|
|
|
|
|
|
using namespace icinga;
|
|
|
|
|
2012-06-21 12:51:50 +02:00
|
|
|
/**
|
|
|
|
* Constructor for the EndpointManager class.
|
|
|
|
*/
|
|
|
|
EndpointManager::EndpointManager(void)
|
|
|
|
: m_NextMessageID(0)
|
|
|
|
{
|
|
|
|
m_RequestTimer = boost::make_shared<Timer>();
|
|
|
|
m_RequestTimer->OnTimerExpired.connect(boost::bind(&EndpointManager::RequestTimerHandler, this));
|
|
|
|
m_RequestTimer->SetInterval(5);
|
|
|
|
m_RequestTimer->Start();
|
2012-09-03 10:28:14 +02:00
|
|
|
|
|
|
|
m_SubscriptionTimer = boost::make_shared<Timer>();
|
|
|
|
m_SubscriptionTimer->OnTimerExpired.connect(boost::bind(&EndpointManager::SubscriptionTimerHandler, this));
|
|
|
|
m_SubscriptionTimer->SetInterval(10);
|
|
|
|
m_SubscriptionTimer->Start();
|
|
|
|
|
|
|
|
m_ReconnectTimer = boost::make_shared<Timer>();
|
|
|
|
m_ReconnectTimer->OnTimerExpired.connect(boost::bind(&EndpointManager::ReconnectTimerHandler, this));
|
|
|
|
m_ReconnectTimer->SetInterval(10);
|
|
|
|
m_ReconnectTimer->Start();
|
2012-06-21 12:51:50 +02:00
|
|
|
}
|
|
|
|
|
2012-09-10 14:07:32 +02:00
|
|
|
/**
|
|
|
|
* Sets the SSL context.
|
|
|
|
*
|
|
|
|
* @param sslContext The new SSL context.
|
|
|
|
*/
|
|
|
|
void EndpointManager::SetSSLContext(const shared_ptr<SSL_CTX>& sslContext)
|
|
|
|
{
|
|
|
|
m_SSLContext = sslContext;
|
|
|
|
}
|
|
|
|
|
|
|
|
/**
|
|
|
|
* Retrieves the SSL context.
|
|
|
|
*
|
|
|
|
* @returns The SSL context.
|
|
|
|
*/
|
|
|
|
shared_ptr<SSL_CTX> EndpointManager::GetSSLContext(void) const
|
|
|
|
{
|
|
|
|
return m_SSLContext;
|
|
|
|
}
|
|
|
|
|
2012-05-15 16:24:04 +02:00
|
|
|
/**
|
|
|
|
* Sets the identity of the endpoint manager. This identity is used when
|
|
|
|
* connecting to remote peers.
|
|
|
|
*
|
|
|
|
* @param identity The new identity.
|
|
|
|
*/
|
2012-08-02 09:38:08 +02:00
|
|
|
void EndpointManager::SetIdentity(const String& identity)
|
2012-04-24 14:02:15 +02:00
|
|
|
{
|
2012-04-27 11:44:34 +02:00
|
|
|
m_Identity = identity;
|
2012-09-03 10:28:14 +02:00
|
|
|
|
|
|
|
if (m_Endpoint)
|
|
|
|
m_Endpoint->Unregister();
|
|
|
|
|
|
|
|
DynamicObject::Ptr object = DynamicObject::GetObject("Endpoint", identity);
|
|
|
|
|
|
|
|
if (object)
|
|
|
|
m_Endpoint = dynamic_pointer_cast<Endpoint>(object);
|
|
|
|
else
|
|
|
|
m_Endpoint = Endpoint::MakeEndpoint(identity, false);
|
2012-04-24 14:02:15 +02:00
|
|
|
}
|
|
|
|
|
2012-05-15 16:24:04 +02:00
|
|
|
/**
|
|
|
|
* Retrieves the identity for the endpoint manager.
|
|
|
|
*
|
|
|
|
* @returns The identity.
|
|
|
|
*/
|
2012-08-02 09:38:08 +02:00
|
|
|
String EndpointManager::GetIdentity(void) const
|
2012-04-27 11:44:34 +02:00
|
|
|
{
|
|
|
|
return m_Identity;
|
|
|
|
}
|
|
|
|
|
2012-05-15 16:24:04 +02:00
|
|
|
/**
|
|
|
|
* Creates a new JSON-RPC listener on the specified port.
|
|
|
|
*
|
2012-05-18 23:25:06 +02:00
|
|
|
* @param service The port to listen on.
|
2012-05-15 16:24:04 +02:00
|
|
|
*/
|
2012-08-02 09:38:08 +02:00
|
|
|
void EndpointManager::AddListener(const String& service)
|
2012-04-06 08:56:52 +02:00
|
|
|
{
|
2012-09-10 14:07:32 +02:00
|
|
|
shared_ptr<SSL_CTX> sslContext = GetSSLContext();
|
2012-09-03 10:28:14 +02:00
|
|
|
|
|
|
|
if (!sslContext)
|
2012-07-17 20:41:06 +02:00
|
|
|
throw_exception(logic_error("SSL context is required for AddListener()"));
|
2012-05-15 11:08:04 +02:00
|
|
|
|
2012-04-26 21:33:23 +02:00
|
|
|
stringstream s;
|
2012-05-07 13:48:17 +02:00
|
|
|
s << "Adding new listener: port " << service;
|
2012-07-10 12:21:19 +02:00
|
|
|
Logger::Write(LogInformation, "icinga", s.str());
|
2012-04-26 12:58:20 +02:00
|
|
|
|
2012-09-03 10:28:14 +02:00
|
|
|
JsonRpcServer::Ptr server = boost::make_shared<JsonRpcServer>(sslContext);
|
|
|
|
|
|
|
|
m_Servers.insert(server);
|
|
|
|
server->OnNewClient.connect(boost::bind(&EndpointManager::NewClientHandler,
|
|
|
|
this, _2));
|
2012-04-06 08:56:52 +02:00
|
|
|
|
2012-05-07 13:48:17 +02:00
|
|
|
server->Bind(service, AF_INET6);
|
2012-04-06 08:56:52 +02:00
|
|
|
server->Listen();
|
|
|
|
server->Start();
|
|
|
|
}
|
|
|
|
|
2012-05-15 16:24:04 +02:00
|
|
|
/**
|
|
|
|
* Creates a new JSON-RPC client and connects to the specified host and port.
|
|
|
|
*
|
2012-05-18 23:25:06 +02:00
|
|
|
* @param node The remote host.
|
|
|
|
* @param service The remote port.
|
2012-05-15 16:24:04 +02:00
|
|
|
*/
|
2012-09-03 10:28:14 +02:00
|
|
|
void EndpointManager::AddConnection(const String& node, const String& service) {
|
2012-09-10 14:07:32 +02:00
|
|
|
shared_ptr<SSL_CTX> sslContext = GetSSLContext();
|
|
|
|
|
|
|
|
if (!sslContext)
|
|
|
|
throw_exception(logic_error("SSL context is required for AddConnection()"));
|
|
|
|
|
|
|
|
JsonRpcClient::Ptr client = boost::make_shared<JsonRpcClient>(RoleOutbound, sslContext);
|
2012-09-03 10:28:14 +02:00
|
|
|
client->Connect(node, service);
|
|
|
|
NewClientHandler(client);
|
2012-04-06 08:56:52 +02:00
|
|
|
}
|
|
|
|
|
2012-05-15 16:24:04 +02:00
|
|
|
/**
|
|
|
|
* Processes a new client connection.
|
|
|
|
*
|
2012-09-03 10:28:14 +02:00
|
|
|
* @param client The new client.
|
2012-05-15 16:24:04 +02:00
|
|
|
*/
|
2012-06-16 03:42:54 +02:00
|
|
|
void EndpointManager::NewClientHandler(const TcpClient::Ptr& client)
|
2012-04-06 08:56:52 +02:00
|
|
|
{
|
2012-09-03 10:28:14 +02:00
|
|
|
JsonRpcClient::Ptr jclient = static_pointer_cast<JsonRpcClient>(client);
|
2012-04-26 12:58:20 +02:00
|
|
|
|
2012-09-03 10:28:14 +02:00
|
|
|
Logger::Write(LogInformation, "icinga", "New client connection from " + jclient->GetPeerAddress());
|
2012-04-06 08:56:52 +02:00
|
|
|
|
2012-09-03 10:28:14 +02:00
|
|
|
m_PendingClients.insert(jclient);
|
|
|
|
jclient->OnConnected.connect(boost::bind(&EndpointManager::ClientConnectedHandler, this, _1));
|
|
|
|
jclient->Start();
|
2012-04-06 08:56:52 +02:00
|
|
|
}
|
|
|
|
|
2012-09-03 10:28:14 +02:00
|
|
|
/**
 * Handler for a client's OnConnected signal.
 *
 * Removes the client from the set of pending clients, takes the CN of the
 * peer certificate as the peer's identity and hands the client to the
 * endpoint of that name, creating the endpoint if it does not exist.
 *
 * @param client The client that has connected.
 */
void EndpointManager::ClientConnectedHandler(const TcpClient::Ptr& client)
{
	JsonRpcClient::Ptr rpcClient = static_pointer_cast<JsonRpcClient>(client);

	m_PendingClients.erase(rpcClient);

	/* the certificate's CN names the endpoint this client belongs to */
	shared_ptr<X509> peerCert = rpcClient->GetPeerCertificate();
	String peerName = Utility::GetCertificateCN(peerCert);

	Endpoint::Ptr target;

	if (!Endpoint::Exists(peerName))
		target = Endpoint::MakeEndpoint(peerName, false);
	else
		target = Endpoint::GetByName(peerName);

	target->SetClient(rpcClient);
}
|
|
|
|
|
2012-05-15 16:24:04 +02:00
|
|
|
/**
|
|
|
|
* Sends a unicast message to the specified recipient.
|
|
|
|
*
|
|
|
|
* @param sender The sender of the message.
|
|
|
|
* @param recipient The recipient of the message.
|
|
|
|
* @param message The request.
|
|
|
|
*/
|
2012-07-18 11:43:41 +02:00
|
|
|
void EndpointManager::SendUnicastMessage(const Endpoint::Ptr& sender,
|
|
|
|
const Endpoint::Ptr& recipient, const MessagePart& message)
|
2012-04-23 16:48:40 +02:00
|
|
|
{
|
|
|
|
/* don't forward messages between non-local endpoints */
|
2012-05-15 16:24:04 +02:00
|
|
|
if (!sender->IsLocal() && !recipient->IsLocal())
|
2012-04-23 16:48:40 +02:00
|
|
|
return;
|
|
|
|
|
2012-06-14 11:18:20 +02:00
|
|
|
if (ResponseMessage::IsResponseMessage(message))
|
|
|
|
recipient->ProcessResponse(sender, message);
|
|
|
|
else
|
|
|
|
recipient->ProcessRequest(sender, message);
|
2012-04-23 16:48:40 +02:00
|
|
|
}
|
|
|
|
|
2012-05-15 16:24:04 +02:00
|
|
|
/**
|
|
|
|
* Sends a message to exactly one recipient out of all recipients who have a
|
|
|
|
* subscription for the message's topic.
|
|
|
|
*
|
|
|
|
* @param sender The sender of the message.
|
|
|
|
* @param message The message.
|
|
|
|
*/
|
2012-07-18 11:43:41 +02:00
|
|
|
void EndpointManager::SendAnycastMessage(const Endpoint::Ptr& sender,
|
2012-05-21 12:53:38 +02:00
|
|
|
const RequestMessage& message)
|
2012-04-06 08:56:52 +02:00
|
|
|
{
|
2012-08-02 09:38:08 +02:00
|
|
|
String method;
|
2012-06-14 11:18:20 +02:00
|
|
|
if (!message.GetMethod(&method))
|
2012-07-17 20:41:06 +02:00
|
|
|
throw_exception(invalid_argument("Message is missing the 'method' property."));
|
2012-06-14 11:18:20 +02:00
|
|
|
|
|
|
|
vector<Endpoint::Ptr> candidates;
|
2012-09-03 10:28:14 +02:00
|
|
|
DynamicObject::Ptr object;
|
|
|
|
BOOST_FOREACH(tie(tuples::ignore, object), DynamicObject::GetObjects("Endpoint")) {
|
|
|
|
Endpoint::Ptr endpoint = dynamic_pointer_cast<Endpoint>(object);
|
2012-06-22 13:40:09 +02:00
|
|
|
/* don't forward messages between non-local endpoints */
|
|
|
|
if (!sender->IsLocal() && !endpoint->IsLocal())
|
|
|
|
continue;
|
|
|
|
|
2012-06-14 11:18:20 +02:00
|
|
|
if (endpoint->HasSubscription(method))
|
|
|
|
candidates.push_back(endpoint);
|
|
|
|
}
|
|
|
|
|
2012-06-19 09:38:20 +02:00
|
|
|
if (candidates.empty())
|
2012-06-14 11:18:20 +02:00
|
|
|
return;
|
|
|
|
|
|
|
|
Endpoint::Ptr recipient = candidates[rand() % candidates.size()];
|
|
|
|
SendUnicastMessage(sender, recipient, message);
|
2012-04-16 16:27:41 +02:00
|
|
|
}
|
|
|
|
|
2012-05-15 16:24:04 +02:00
|
|
|
/**
|
|
|
|
* Sends a message to all recipients who have a subscription for the
|
|
|
|
* message's topic.
|
|
|
|
*
|
|
|
|
* @param sender The sender of the message.
|
|
|
|
* @param message The message.
|
|
|
|
*/
|
2012-07-18 11:43:41 +02:00
|
|
|
void EndpointManager::SendMulticastMessage(const Endpoint::Ptr& sender,
|
2012-05-21 12:53:38 +02:00
|
|
|
const RequestMessage& message)
|
2012-04-16 16:27:41 +02:00
|
|
|
{
|
2012-08-02 09:38:08 +02:00
|
|
|
String id;
|
2012-05-15 16:24:04 +02:00
|
|
|
if (message.GetID(&id))
|
2012-07-17 20:41:06 +02:00
|
|
|
throw_exception(invalid_argument("Multicast requests must not have an ID."));
|
2012-04-16 16:27:41 +02:00
|
|
|
|
2012-08-02 09:38:08 +02:00
|
|
|
String method;
|
2012-05-15 16:24:04 +02:00
|
|
|
if (!message.GetMethod(&method))
|
2012-07-17 20:41:06 +02:00
|
|
|
throw_exception(invalid_argument("Message is missing the 'method' property."));
|
2012-04-16 16:27:41 +02:00
|
|
|
|
2012-09-03 10:28:14 +02:00
|
|
|
DynamicObject::Ptr object;
|
|
|
|
BOOST_FOREACH(tie(tuples::ignore, object), DynamicObject::GetObjects("Endpoint")) {
|
|
|
|
Endpoint::Ptr recipient = dynamic_pointer_cast<Endpoint>(object);
|
|
|
|
|
2012-06-14 11:18:20 +02:00
|
|
|
/* don't forward messages back to the sender */
|
|
|
|
if (sender == recipient)
|
|
|
|
continue;
|
|
|
|
|
2012-05-15 16:24:04 +02:00
|
|
|
if (recipient->HasSubscription(method))
|
|
|
|
SendUnicastMessage(sender, recipient, message);
|
2012-04-18 15:22:25 +02:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2012-06-21 00:10:10 +02:00
|
|
|
/**
 * Sends an API request and records it as pending so that the callback can
 * be invoked later, either from ProcessResponseMessage() or from
 * RequestTimerHandler().
 *
 * The message is given a new ID taken from the manager's message counter.
 * An empty recipient causes the message to be sent as an anycast message;
 * otherwise it is sent to the given recipient only.
 *
 * @param sender The sender of the message.
 * @param recipient The recipient of the message; may be empty.
 * @param message The request. Its ID is overwritten.
 * @param callback The callback stored with the pending request.
 * @param timeout Added to the current time to obtain the request's
 *                expiry time.
 */
void EndpointManager::SendAPIMessage(const Endpoint::Ptr& sender, const Endpoint::Ptr& recipient,
    RequestMessage& message,
    const EndpointManager::APICallback& callback, double timeout)
{
	/* the textual form of the incremented counter serves as message ID */
	stringstream idbuf;
	idbuf << ++m_NextMessageID;

	String id = idbuf.str();
	message.SetID(id);

	PendingRequest pending;
	pending.Request = message;
	pending.Callback = callback;
	pending.Timeout = Utility::GetTime() + timeout;

	m_Requests[id] = pending;

	if (recipient)
		SendUnicastMessage(sender, recipient, message);
	else
		SendAnycastMessage(sender, message);
}
|
|
|
|
|
2012-08-02 09:38:08 +02:00
|
|
|
bool EndpointManager::RequestTimeoutLessComparer(const pair<String, PendingRequest>& a,
|
|
|
|
const pair<String, PendingRequest>& b)
|
2012-06-14 11:18:20 +02:00
|
|
|
{
|
|
|
|
return a.second.Timeout < b.second.Timeout;
|
|
|
|
}
|
|
|
|
|
2012-09-03 10:28:14 +02:00
|
|
|
void EndpointManager::SubscriptionTimerHandler(void)
|
|
|
|
{
|
|
|
|
Dictionary::Ptr subscriptions = boost::make_shared<Dictionary>();
|
|
|
|
|
|
|
|
DynamicObject::Ptr object;
|
|
|
|
BOOST_FOREACH(tie(tuples::ignore, object), DynamicObject::GetObjects("Endpoint")) {
|
|
|
|
Endpoint::Ptr endpoint = dynamic_pointer_cast<Endpoint>(object);
|
|
|
|
|
|
|
|
if (!endpoint->IsLocalEndpoint())
|
|
|
|
continue;
|
|
|
|
|
|
|
|
String topic;
|
|
|
|
BOOST_FOREACH(tie(tuples::ignore, topic), endpoint->GetSubscriptions()) {
|
|
|
|
subscriptions->Set(topic, topic);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2012-09-17 09:47:02 +02:00
|
|
|
if (m_Endpoint)
|
|
|
|
m_Endpoint->SetSubscriptions(subscriptions);
|
2012-09-03 10:28:14 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
void EndpointManager::ReconnectTimerHandler(void)
|
|
|
|
{
|
|
|
|
DynamicObject::Ptr object;
|
|
|
|
BOOST_FOREACH(tie(tuples::ignore, object), DynamicObject::GetObjects("Endpoint")) {
|
|
|
|
Endpoint::Ptr endpoint = dynamic_pointer_cast<Endpoint>(object);
|
|
|
|
|
2012-09-03 12:20:04 +02:00
|
|
|
if (endpoint->IsConnected() || endpoint == m_Endpoint)
|
2012-09-03 10:28:14 +02:00
|
|
|
continue;
|
|
|
|
|
|
|
|
String node, service;
|
|
|
|
node = endpoint->GetNode();
|
|
|
|
service = endpoint->GetService();
|
|
|
|
|
2012-09-03 12:20:04 +02:00
|
|
|
if (node.IsEmpty() || service.IsEmpty()) {
|
|
|
|
Logger::Write(LogWarning, "icinga", "Can't reconnect "
|
|
|
|
"to endpoint '" + endpoint->GetName() + "': No "
|
|
|
|
"node/service information.");
|
2012-09-03 10:28:14 +02:00
|
|
|
continue;
|
2012-09-03 12:20:04 +02:00
|
|
|
}
|
2012-09-03 10:28:14 +02:00
|
|
|
|
|
|
|
AddConnection(node, service);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2012-06-15 19:32:41 +02:00
|
|
|
void EndpointManager::RequestTimerHandler(void)
|
2012-06-14 11:18:20 +02:00
|
|
|
{
|
2012-08-02 09:38:08 +02:00
|
|
|
map<String, PendingRequest>::iterator it;
|
2012-06-14 11:18:20 +02:00
|
|
|
for (it = m_Requests.begin(); it != m_Requests.end(); it++) {
|
|
|
|
if (it->second.HasTimedOut()) {
|
2012-09-21 09:43:06 +02:00
|
|
|
it->second.Callback(GetSelf(), Endpoint::Ptr(),
|
|
|
|
it->second.Request, ResponseMessage(), true);
|
2012-06-14 11:18:20 +02:00
|
|
|
|
|
|
|
m_Requests.erase(it);
|
|
|
|
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2012-09-21 09:43:06 +02:00
|
|
|
/**
 * Matches a response message to its pending request.
 *
 * If a pending request with the response's ID exists, its callback is
 * invoked with the sender, the original request, the response and `false`
 * as the final argument, and the request is then removed from the table of
 * pending requests. A response without a matching pending request is
 * ignored.
 *
 * @param sender The endpoint the response came from.
 * @param message The response message.
 * @throws invalid_argument if the response has no message ID.
 */
void EndpointManager::ProcessResponseMessage(const Endpoint::Ptr& sender,
    const ResponseMessage& message)
{
	String id;
	if (!message.GetID(&id))
		throw_exception(invalid_argument("Response message must have a message ID."));

	map<String, PendingRequest>::iterator pending = m_Requests.find(id);

	if (pending != m_Requests.end()) {
		PendingRequest& request = pending->second;
		request.Callback(GetSelf(), sender, request.Request, message, false);

		m_Requests.erase(pending);
	}
}
|
2012-06-20 16:52:56 +02:00
|
|
|
|
2012-06-27 18:43:34 +02:00
|
|
|
/**
 * Returns the shared EndpointManager instance, creating it on the first
 * call.
 *
 * @returns The EndpointManager instance.
 */
EndpointManager::Ptr EndpointManager::GetInstance(void)
{
	static EndpointManager::Ptr singleton;

	if (singleton)
		return singleton;

	singleton = boost::make_shared<EndpointManager>();
	return singleton;
}
|