icinga2/icinga/endpointmanager.cpp

168 lines
4.5 KiB
C++
Raw Normal View History

#include "i2-icinga.h"
using namespace icinga;
2012-04-24 14:02:15 +02:00
/**
 * Constructs an endpoint manager.
 *
 * @param sslContext SSL context that is stored in the manager; it is later
 *                   handed to the servers and outgoing connections created
 *                   by AddListener() and AddConnection().
 */
EndpointManager::EndpointManager(shared_ptr<SSL_CTX> sslContext)
	: m_SSLContext(sslContext)
{
}
void EndpointManager::AddListener(unsigned short port)
{
2012-04-26 21:33:23 +02:00
stringstream s;
s << "Adding new listener: port " << port;
Application::Log(s.str());
2012-04-24 14:02:15 +02:00
JsonRpcServer::Ptr server = make_shared<JsonRpcServer>(m_SSLContext);
RegisterServer(server);
2012-04-27 09:54:07 +02:00
server->Bind(port, AF_INET6);
server->Listen();
server->Start();
}
2012-04-18 15:22:25 +02:00
/**
 * Creates a JSON-RPC endpoint, connects it to a remote peer and registers
 * it with this manager.
 *
 * @param host Host to connect to.
 * @param port Port to connect to.
 */
void EndpointManager::AddConnection(string host, unsigned short port)
{
	stringstream logMessage;
	logMessage << "Adding new endpoint: [" << host << "]:" << port;
	Application::Log(logMessage.str());

	JsonRpcEndpoint::Ptr peer = make_shared<JsonRpcEndpoint>();

	/* The connection is initiated with the manager's SSL context before
	 * the endpoint is registered. */
	peer->Connect(host, port, m_SSLContext);
	RegisterEndpoint(peer);
}
/**
 * Adds a server to the manager's server list and subscribes to its
 * OnNewClient event.
 *
 * @param server The server to register.
 */
void EndpointManager::RegisterServer(JsonRpcServer::Ptr server)
{
	m_Servers.push_front(server);

	/* Route new-client notifications from this server to NewClientHandler(). */
	server->OnNewClient += bind_weak(
	    &EndpointManager::NewClientHandler,
	    shared_from_this());
}
2012-04-18 15:22:25 +02:00
int EndpointManager::NewClientHandler(const NewClientEventArgs& ncea)
{
string address = ncea.Client->GetPeerAddress();
Application::Log("Accepted new client from " + address);
2012-04-18 15:22:25 +02:00
JsonRpcEndpoint::Ptr endpoint = make_shared<JsonRpcEndpoint>();
endpoint->SetClient(static_pointer_cast<JsonRpcClient>(ncea.Client));
RegisterEndpoint(endpoint);
return 0;
}
2012-04-18 15:22:25 +02:00
/**
 * Removes a server from the manager's server list.
 *
 * The OnNewClient subscription made in RegisterServer() is not undone here.
 *
 * @param server The server to unregister.
 */
void EndpointManager::UnregisterServer(JsonRpcServer::Ptr server)
{
	m_Servers.remove(server);

	// TODO: unbind event
}
/**
 * Registers an endpoint with this manager.
 *
 * The endpoint is told about its manager, added to the endpoint list, and
 * its method sink/source events are hooked up. The handlers are also run
 * once for every sink and source the endpoint already has. Finally the
 * OnNewEndpoint event is raised.
 *
 * @param endpoint The endpoint to register.
 */
void EndpointManager::RegisterEndpoint(Endpoint::Ptr endpoint)
{
	endpoint->SetEndpointManager(
	    static_pointer_cast<EndpointManager>(shared_from_this()));
	m_Endpoints.push_front(endpoint);

	/* Method sinks: subscribe to future additions, then replay the
	 * existing ones through the same handler. */
	endpoint->OnNewMethodSink += bind_weak(
	    &EndpointManager::NewMethodSinkHandler, shared_from_this());
	endpoint->ForeachMethodSink(
	    bind(&EndpointManager::NewMethodSinkHandler, this, _1));

	/* Method sources: same pattern as for the sinks. */
	endpoint->OnNewMethodSource += bind_weak(
	    &EndpointManager::NewMethodSourceHandler, shared_from_this());
	endpoint->ForeachMethodSource(
	    bind(&EndpointManager::NewMethodSourceHandler, this, _1));

	/* Announce the new endpoint to OnNewEndpoint subscribers. */
	NewEndpointEventArgs eventArgs;
	eventArgs.Source = shared_from_this();
	eventArgs.Endpoint = endpoint;
	OnNewEndpoint(eventArgs);
}
/**
 * Removes an endpoint from the manager's endpoint list.
 *
 * Only the list entry is removed; the event subscriptions made in
 * RegisterEndpoint() are left in place.
 *
 * @param endpoint The endpoint to unregister.
 */
void EndpointManager::UnregisterEndpoint(Endpoint::Ptr endpoint)
{
	m_Endpoints.remove(endpoint);
}
/**
 * Delivers a request to a single recipient endpoint, if delivery is
 * permitted.
 *
 * The request is silently dropped when the sender and recipient are the
 * same endpoint, when neither the request origin nor the recipient is
 * local, or when the recipient is not an allowed sink for the method.
 *
 * @param sender    Endpoint the request originates from.
 * @param recipient Endpoint the request should be delivered to.
 * @param request   The request to deliver.
 * @param fromLocal Whether the request comes from a local endpoint.
 * @throws InvalidArgumentException if the request has no method.
 */
void EndpointManager::SendUnicastRequest(Endpoint::Ptr sender, Endpoint::Ptr recipient, const JsonRpcRequest& request, bool fromLocal)
{
	/* never echo a request back to its sender */
	if (sender == recipient)
		return;

	/* don't forward messages between non-local endpoints */
	if (!(fromLocal || recipient->IsLocal()))
		return;

	string methodName;
	if (!request.GetMethod(&methodName))
		throw InvalidArgumentException("Missing 'method' parameter.");

	/* the recipient must both be a sink for the method and be allowed to
	 * receive it */
	if (!recipient->IsMethodSink(methodName))
		return;

	if (!recipient->IsAllowedMethodSink(methodName))
		return;

	recipient->ProcessRequest(sender, request);
}
2012-04-18 15:22:25 +02:00
/**
 * Sends a request to any one suitable endpoint.
 *
 * Not implemented: every call throws.
 *
 * @param sender    Endpoint the request originates from (unused).
 * @param request   The request to deliver (unused).
 * @param fromLocal Whether the request comes from a local endpoint (unused).
 * @throws NotImplementedException always.
 */
void EndpointManager::SendAnycastRequest(Endpoint::Ptr sender, const JsonRpcRequest& request, bool fromLocal)
{
	throw NotImplementedException();
}
2012-04-18 15:22:25 +02:00
/**
 * Offers a request to every registered endpoint.
 *
 * Each endpoint is passed to SendUnicastRequest(), which decides whether
 * the request is actually delivered to it.
 *
 * @param sender    Endpoint the request originates from.
 * @param request   The request to deliver. In _DEBUG builds it is rejected
 *                  if it carries an ID.
 * @param fromLocal Whether the request comes from a local endpoint.
 * @throws InvalidArgumentException if the request has no method, or (in
 *         _DEBUG builds only) if it has an ID.
 */
void EndpointManager::SendMulticastRequest(Endpoint::Ptr sender, const JsonRpcRequest& request, bool fromLocal)
{
#ifdef _DEBUG
	string id;
	if (request.GetID(&id))
		throw InvalidArgumentException("Multicast requests must not have an ID.");
#endif /* _DEBUG */

	/* Validate up front so a malformed request fails even when there are
	 * no endpoints; the message matches the one SendUnicastRequest() uses
	 * for the same condition. */
	string method;
	if (!request.GetMethod(&method))
		throw InvalidArgumentException("Missing 'method' parameter.");

	for (list<Endpoint::Ptr>::iterator i = m_Endpoints.begin(); i != m_Endpoints.end(); ++i)
		SendUnicastRequest(sender, *i, request, fromLocal);
}
int EndpointManager::NewMethodSinkHandler(const NewMethodEventArgs& ea)
{
Endpoint::Ptr sender = static_pointer_cast<Endpoint>(ea.Source);
if (!sender->IsLocal())
return 0;
JsonRpcRequest request;
request.SetMethod("message::Subscribe");
2012-04-20 13:49:04 +02:00
SubscriptionMessage subscriptionMessage;
subscriptionMessage.SetMethod(ea.Method);
request.SetParams(subscriptionMessage);
2012-04-18 15:22:25 +02:00
SendMulticastRequest(sender, request);
return 0;
}
int EndpointManager::NewMethodSourceHandler(const NewMethodEventArgs& ea)
{
Endpoint::Ptr sender = static_pointer_cast<Endpoint>(ea.Source);
if (!sender->IsLocal())
return 0;
JsonRpcRequest request;
request.SetMethod("message::Provide");
2012-04-20 13:49:04 +02:00
SubscriptionMessage subscriptionMessage;
subscriptionMessage.SetMethod(ea.Method);
request.SetParams(subscriptionMessage);
2012-04-18 15:22:25 +02:00
SendMulticastRequest(sender, request);
return 0;
}
/**
 * Invokes a callback once for every endpoint in the manager's list.
 *
 * The iterator is advanced before the callback runs, so a callback that
 * unregisters the endpoint it was handed does not leave the loop holding
 * an invalidated iterator.
 *
 * @param callback Function called with event arguments whose Source is
 *                 this manager and whose Endpoint is the visited endpoint.
 *                 Its return value is ignored.
 */
void EndpointManager::ForeachEndpoint(function<int (const NewEndpointEventArgs&)> callback)
{
	NewEndpointEventArgs neea;
	neea.Source = shared_from_this();

	list<Endpoint::Ptr>::iterator i = m_Endpoints.begin();
	while (i != m_Endpoints.end()) {
		/* Copy the pointer and step forward first: removing an element
		 * from a list only invalidates iterators to that element. */
		neea.Endpoint = *i;
		++i;

		callback(neea);
	}
}