icinga2/components/discovery/discoverycomponent.cpp

432 lines
13 KiB
C++
Raw Normal View History

#include "i2-discovery.h"
using namespace icinga;
/**
 * Returns the name of this component.
 *
 * @returns The fixed string "discoverycomponent".
 */
string DiscoveryComponent::GetName(void) const
{
	return string("discoverycomponent");
}
void DiscoveryComponent::Start(void)
{
m_DiscoveryEndpoint = make_shared<VirtualEndpoint>();
long isBroker = 0;
GetConfig()->GetPropertyInteger("broker", &isBroker);
m_Broker = (isBroker != 0);
if (IsBroker()) {
m_DiscoveryEndpoint->RegisterMethodSource("discovery::NewComponent");
m_DiscoveryEndpoint->RegisterMethodHandler("discovery::RegisterComponent",
bind_weak(&DiscoveryComponent::RegisterComponentMessageHandler, shared_from_this()));
}
m_DiscoveryEndpoint->RegisterMethodSource("discovery::RegisterComponent");
m_DiscoveryEndpoint->RegisterMethodHandler("discovery::NewComponent",
bind_weak(&DiscoveryComponent::NewComponentMessageHandler, shared_from_this()));
m_DiscoveryEndpoint->RegisterMethodHandler("discovery::Welcome",
bind_weak(&DiscoveryComponent::WelcomeMessageHandler, shared_from_this()));
2012-05-08 09:20:42 +02:00
GetEndpointManager()->ForEachEndpoint(bind(&DiscoveryComponent::NewEndpointHandler, this, _1));
GetEndpointManager()->OnNewEndpoint += bind_weak(&DiscoveryComponent::NewEndpointHandler, shared_from_this());
GetEndpointManager()->RegisterEndpoint(m_DiscoveryEndpoint);
2012-05-07 13:48:17 +02:00
2012-05-08 09:20:42 +02:00
m_DiscoveryTimer = make_shared<Timer>();
m_DiscoveryTimer->SetInterval(30);
m_DiscoveryTimer->OnTimerExpired += bind_weak(&DiscoveryComponent::DiscoveryTimerHandler, shared_from_this());
m_DiscoveryTimer->Start();
}
void DiscoveryComponent::Stop(void)
{
EndpointManager::Ptr mgr = GetEndpointManager();
if (mgr)
mgr->UnregisterEndpoint(m_DiscoveryEndpoint);
}
/**
 * Checks whether an already-registered endpoint has the same identity as
 * another endpoint and, if so, disconnects the already-registered one.
 *
 * Used as a ForEachEndpoint callback: neea.Endpoint is the endpoint
 * currently being visited.
 *
 * @param endpoint The endpoint whose identity is compared against.
 * @param neea Event arguments carrying the endpoint being visited.
 * @returns 0
 */
int DiscoveryComponent::CheckExistingEndpoint(Endpoint::Ptr endpoint, const NewEndpointEventArgs& neea)
{
	/* an endpoint is never a duplicate of itself */
	if (endpoint == neea.Endpoint)
		return 0;

	/* endpoints that are not connected are left alone */
	if (!neea.Endpoint->IsConnected())
		return 0;

	if (endpoint->GetIdentity() == neea.Endpoint->GetIdentity()) {
		/* the original message never closed its parenthesis */
		Application::Log("Detected duplicate identity (" + endpoint->GetIdentity() + ") - Disconnecting old endpoint.");

		neea.Endpoint->Stop();
		GetEndpointManager()->UnregisterEndpoint(neea.Endpoint);
	}

	return 0;
}
int DiscoveryComponent::NewEndpointHandler(const NewEndpointEventArgs& neea)
{
neea.Endpoint->OnIdentityChanged += bind_weak(&DiscoveryComponent::NewIdentityHandler, shared_from_this());
2012-05-07 13:48:17 +02:00
if (IsBroker()) {
/* accept discovery::RegisterComponent messages from any endpoint */
neea.Endpoint->RegisterMethodSource("discovery::RegisterComponent");
}
2012-05-08 09:20:42 +02:00
/* accept discovery::Welcome messages from any endpoint */
neea.Endpoint->RegisterMethodSource("discovery::Welcome");
return 0;
}
/**
 * Records a method sink in a component's discovery info.
 *
 * @param nmea Event arguments carrying the method name.
 * @param info The discovery info whose SubscribedMethods set is extended.
 * @returns 0
 */
int DiscoveryComponent::DiscoverySinkHandler(const NewMethodEventArgs& nmea, ComponentDiscoveryInfo::Ptr info) const
{
	/* a sink is a method the endpoint subscribes to */
	(*info).SubscribedMethods.insert(nmea.Method);

	return 0;
}
/**
 * Records a method source in a component's discovery info.
 *
 * @param nmea Event arguments carrying the method name.
 * @param info The discovery info whose PublishedMethods set is extended.
 * @returns 0
 */
int DiscoveryComponent::DiscoverySourceHandler(const NewMethodEventArgs& nmea, ComponentDiscoveryInfo::Ptr info) const
{
	/* a source is a method the endpoint publishes */
	(*info).PublishedMethods.insert(nmea.Method);

	return 0;
}
int DiscoveryComponent::DiscoveryEndpointHandler(const NewEndpointEventArgs& neea, ComponentDiscoveryInfo::Ptr info) const
{
2012-05-08 09:20:42 +02:00
neea.Endpoint->ForEachMethodSink(bind(&DiscoveryComponent::DiscoverySinkHandler, this, _1, info));
neea.Endpoint->ForEachMethodSource(bind(&DiscoveryComponent::DiscoverySourceHandler, this, _1, info));
return 0;
}
/**
 * Retrieves the discovery info for a component.
 *
 * For our own identity the info is built on the fly from the endpoints
 * registered with the endpoint manager; for any other identity it is
 * looked up in m_Components.
 *
 * @param component The identity of the component.
 * @param info Receives the discovery info; only written on success.
 * @returns true if discovery info is available, false otherwise.
 */
bool DiscoveryComponent::GetComponentDiscoveryInfo(string component, ComponentDiscoveryInfo::Ptr *info) const
{
	if (component == GetEndpointManager()->GetIdentity()) {
		/* we never store an entry for ourselves, so synthesize one */
		ComponentDiscoveryInfo::Ptr self = make_shared<ComponentDiscoveryInfo>();
		*info = self;

		GetEndpointManager()->ForEachEndpoint(bind(&DiscoveryComponent::DiscoveryEndpointHandler, this, _1, self));

		self->LastSeen = 0;
		self->Node = GetIcingaApplication()->GetNode();
		self->Service = GetIcingaApplication()->GetService();

		return true;
	}

	map<string, ComponentDiscoveryInfo::Ptr>::const_iterator entry = m_Components.find(component);

	if (entry != m_Components.end()) {
		*info = entry->second;
		return true;
	}

	return false;
}
/**
 * Tells whether this component acts as a broker.
 *
 * @returns The value of m_Broker, which Start() derives from the
 *          "broker" configuration property.
 */
bool DiscoveryComponent::IsBroker(void) const
{
	const bool broker = m_Broker;
	return broker;
}
int DiscoveryComponent::NewIdentityHandler(const EventArgs& ea)
{
Endpoint::Ptr endpoint = static_pointer_cast<Endpoint>(ea.Source);
string identity = endpoint->GetIdentity();
2012-05-07 13:48:17 +02:00
if (!GetIcingaApplication()->IsDebugging()) {
if (identity == GetEndpointManager()->GetIdentity()) {
Application::Log("Detected loop-back connection - Disconnecting endpoint.");
2012-05-07 13:48:17 +02:00
endpoint->Stop();
GetEndpointManager()->UnregisterEndpoint(endpoint);
2012-05-07 13:48:17 +02:00
return 0;
}
2012-05-08 09:20:42 +02:00
GetEndpointManager()->ForEachEndpoint(bind(&DiscoveryComponent::CheckExistingEndpoint, this, endpoint, _1));
2012-05-07 13:48:17 +02:00
}
2012-05-08 09:20:42 +02:00
ConfigCollection::Ptr brokerCollection = GetApplication()->GetConfigHive()->GetCollection("broker");
if (brokerCollection->GetObject(identity)) {
/* accept discovery::NewComponent messages from brokers */
endpoint->RegisterMethodSource("discovery::NewComponent");
}
// we assume the other component _always_ wants
// discovery::RegisterComponent messages from us
endpoint->RegisterMethodSink("discovery::RegisterComponent");
// send a discovery::RegisterComponent message, if the
// other component is a broker this makes sure
// the broker knows about our message types
SendDiscoveryMessage("discovery::RegisterComponent", GetEndpointManager()->GetIdentity(), endpoint);
map<string, ComponentDiscoveryInfo::Ptr>::iterator i;
if (IsBroker()) {
// we assume the other component _always_ wants
// discovery::NewComponent messages from us
endpoint->RegisterMethodSink("discovery::NewComponent");
// send discovery::NewComponent message for ourselves
SendDiscoveryMessage("discovery::NewComponent", GetEndpointManager()->GetIdentity(), endpoint);
// send discovery::NewComponent messages for all components
// we know about
for (i = m_Components.begin(); i != m_Components.end(); i++) {
SendDiscoveryMessage("discovery::NewComponent", i->first, endpoint);
}
}
// check if we already know the other component
i = m_Components.find(endpoint->GetIdentity());
if (i == m_Components.end()) {
// we don't know the other component yet, so
// wait until we get a discovery::NewComponent message
// from a broker
return 0;
}
FinishDiscoverySetup(endpoint);
return 0;
}
int DiscoveryComponent::WelcomeMessageHandler(const NewRequestEventArgs& nrea)
{
Endpoint::Ptr endpoint = nrea.Sender;
2012-05-08 10:13:15 +02:00
if (endpoint->GetReceivedWelcome())
return 0;
2012-05-08 10:13:15 +02:00
endpoint->SetReceivedWelcome(true);
2012-05-08 10:13:15 +02:00
if (endpoint->GetSentWelcome()) {
EventArgs ea;
ea.Source = shared_from_this();
endpoint->OnSessionEstablished(ea);
}
return 0;
}
/**
 * Finishes the discovery handshake with an endpoint.
 *
 * Sends a discovery::Welcome message (at most once per endpoint),
 * registers the endpoint's published and subscribed methods from its
 * discovery info, and fires OnSessionEstablished if the endpoint's
 * welcome has already been received.
 *
 * @param endpoint The endpoint to set up.
 */
void DiscoveryComponent::FinishDiscoverySetup(Endpoint::Ptr endpoint)
{
	/* the welcome has already been sent; nothing left to do */
	if (endpoint->GetSentWelcome())
		return;

	/* we assume the other component _always_ wants
	 * discovery::Welcome messages from us */
	endpoint->RegisterMethodSink("discovery::Welcome");

	JsonRpcRequest welcome;
	welcome.SetMethod("discovery::Welcome");
	GetEndpointManager()->SendUnicastRequest(m_DiscoveryEndpoint, endpoint, welcome);

	endpoint->SetSentWelcome(true);

	/* mirror the methods the component announced onto its endpoint */
	ComponentDiscoveryInfo::Ptr info;
	if (GetComponentDiscoveryInfo(endpoint->GetIdentity(), &info)) {
		set<string>::iterator method;

		for (method = info->PublishedMethods.begin(); method != info->PublishedMethods.end(); ++method)
			endpoint->RegisterMethodSource(*method);

		for (method = info->SubscribedMethods.begin(); method != info->SubscribedMethods.end(); ++method)
			endpoint->RegisterMethodSink(*method);
	}

	/* welcomes were exchanged in both directions: session is up */
	if (endpoint->GetReceivedWelcome()) {
		EventArgs sessionArgs;
		sessionArgs.Source = shared_from_this();
		endpoint->OnSessionEstablished(sessionArgs);
	}
}
/**
 * Sends a discovery message describing a component.
 *
 * The message carries the component's identity, its node/service (when
 * both are known) and the lists of methods it publishes and subscribes
 * to. Nothing is sent if no discovery info exists for the identity.
 *
 * @param method The message type ("discovery::NewComponent" or
 *               "discovery::RegisterComponent" in this file).
 * @param identity The identity of the component being described.
 * @param recipient The endpoint to send to; an empty pointer sends the
 *                  message as a multicast request instead.
 */
void DiscoveryComponent::SendDiscoveryMessage(string method, string identity, Endpoint::Ptr recipient)
{
	JsonRpcRequest request;
	request.SetMethod(method);

	/* NOTE(review): params and the two lists are attached before they are
	 * filled in, so this presumably relies on Message objects sharing
	 * their underlying storage when copied - confirm before reordering */
	DiscoveryMessage params;
	request.SetParams(params);

	params.SetIdentity(identity);

	Message subscriptions;
	params.SetSubscribes(subscriptions);

	Message publications;
	params.SetProvides(publications);

	ComponentDiscoveryInfo::Ptr info;
	if (!GetComponentDiscoveryInfo(identity, &info))
		return;

	/* the address is only included when both parts are known */
	const bool hasAddress = !info->Node.empty() && !info->Service.empty();
	if (hasAddress) {
		params.SetNode(info->Node);
		params.SetService(info->Service);
	}

	set<string>::iterator name;

	for (name = info->PublishedMethods.begin(); name != info->PublishedMethods.end(); ++name)
		publications.AddUnnamedPropertyString(*name);

	for (name = info->SubscribedMethods.begin(); name != info->SubscribedMethods.end(); ++name)
		subscriptions.AddUnnamedPropertyString(*name);

	if (!recipient) {
		GetEndpointManager()->SendMulticastRequest(m_DiscoveryEndpoint, request);
		return;
	}

	GetEndpointManager()->SendUnicastRequest(m_DiscoveryEndpoint, recipient, request);
}
/**
 * Processes a discovery message about a component and stores the
 * resulting discovery info in m_Components, replacing any previous entry.
 *
 * Brokers forward the information to all components as a
 * discovery::NewComponent multicast. If an endpoint with that identity
 * exists, the discovery handshake with it is finished.
 *
 * @param identity The identity of the component the message is about.
 * @param message The discovery message.
 */
void DiscoveryComponent::ProcessDiscoveryMessage(string identity, DiscoveryMessage message)
{
	/* ignore discovery messages that are about ourselves */
	if (identity == GetEndpointManager()->GetIdentity())
		return;

	ComponentDiscoveryInfo::Ptr info = make_shared<ComponentDiscoveryInfo>();

	/* the component was last seen right now */
	time(&(info->LastSeen));

	/* return values are not checked, so Node/Service keep whatever
	 * GetNode/GetService leave in them when the message lacks them */
	message.GetNode(&info->Node);
	message.GetService(&info->Service);

	Message provides;
	if (message.GetProvides(&provides)) {
		DictionaryIterator entry;
		for (entry = provides.GetDictionary()->Begin(); entry != provides.GetDictionary()->End(); ++entry) {
			if (IsBroker()) {
				/* TODO: Add authorisation checks here */
			}

			info->PublishedMethods.insert(entry->second);
		}
	}

	Message subscribes;
	if (message.GetSubscribes(&subscribes)) {
		DictionaryIterator entry;
		for (entry = subscribes.GetDictionary()->Begin(); entry != subscribes.GetDictionary()->End(); ++entry) {
			if (IsBroker()) {
				/* TODO: Add authorisation checks here */
			}

			info->SubscribedMethods.insert(entry->second);
		}
	}

	/* assigning through operator[] replaces an existing entry or
	 * creates a new one */
	m_Components[identity] = info;

	/* brokers tell everyone else about the component */
	if (IsBroker())
		SendDiscoveryMessage("discovery::NewComponent", identity, Endpoint::Ptr());

	Endpoint::Ptr endpoint = GetEndpointManager()->GetEndpointByIdentity(identity);
	if (endpoint)
		FinishDiscoverySetup(endpoint);
}
2012-05-07 13:48:17 +02:00
int DiscoveryComponent::NewComponentMessageHandler(const NewRequestEventArgs& nrea)
{
2012-05-07 13:48:17 +02:00
DiscoveryMessage message;
nrea.Request.GetParams(&message);
2012-05-07 13:48:17 +02:00
string identity;
if (!message.GetIdentity(&identity))
return 0;
2012-05-07 13:48:17 +02:00
ProcessDiscoveryMessage(identity, message);
return 0;
}
2012-05-07 13:48:17 +02:00
int DiscoveryComponent::RegisterComponentMessageHandler(const NewRequestEventArgs& nrea)
{
2012-05-08 09:20:42 +02:00
/* ignore discovery::RegisterComponent messages when we're not a broker */
if (!IsBroker())
return 0;
2012-05-07 13:48:17 +02:00
DiscoveryMessage message;
nrea.Request.GetParams(&message);
ProcessDiscoveryMessage(nrea.Sender->GetIdentity(), message);
2012-05-08 09:20:42 +02:00
2012-05-07 13:48:17 +02:00
return 0;
}
2012-05-08 09:20:42 +02:00
int DiscoveryComponent::BrokerConfigHandler(const EventArgs& ea)
2012-05-07 13:48:17 +02:00
{
2012-05-08 09:20:42 +02:00
ConfigObject::Ptr object = static_pointer_cast<ConfigObject>(ea.Source);
2012-05-07 13:48:17 +02:00
EndpointManager::Ptr endpointManager = GetEndpointManager();
2012-05-08 09:20:42 +02:00
/* Check if we're already connected to this broker. */
if (endpointManager->GetEndpointByIdentity(object->GetName()))
return 0;
string node;
if (!object->GetPropertyString("node", &node))
throw InvalidArgumentException("'node' property required for 'broker' config object.");
string service;
if (!object->GetPropertyString("service", &service))
throw InvalidArgumentException("'service' property required for 'broker' config object.");
/* reconnect to this broker */
endpointManager->AddConnection(node, service);
return 0;
}
int DiscoveryComponent::DiscoveryTimerHandler(const TimerEventArgs& tea)
{
EndpointManager::Ptr endpointManager = GetEndpointManager();
time_t now;
time(&now);
ConfigCollection::Ptr brokerCollection = GetApplication()->GetConfigHive()->GetCollection("broker");
brokerCollection->ForEachObject(bind(&DiscoveryComponent::BrokerConfigHandler, this, _1));
2012-05-07 13:48:17 +02:00
map<string, ComponentDiscoveryInfo::Ptr>::iterator i;
2012-05-08 09:20:42 +02:00
for (i = m_Components.begin(); i != m_Components.end(); ) {
string identity = i->first;
ComponentDiscoveryInfo::Ptr info = i->second;
if (info->LastSeen < now - DiscoveryComponent::RegistrationTTL) {
/* unregister this component if its registration has expired */
i = m_Components.erase(i);
2012-05-07 13:48:17 +02:00
continue;
2012-05-08 09:20:42 +02:00
}
2012-05-08 09:20:42 +02:00
if (IsBroker()) {
/* send discovery message to all connected components to
refresh their TTL for this component */
SendDiscoveryMessage("discovery::NewComponent", i->first, Endpoint::Ptr());
}
Endpoint::Ptr endpoint = endpointManager->GetEndpointByIdentity(identity);
2012-05-08 10:22:47 +02:00
if (endpoint && endpoint->IsConnected()) {
2012-05-08 09:20:42 +02:00
/* update LastSeen if we're still connected to this endpoint */
info->LastSeen = now;
} else {
2012-05-08 10:13:15 +02:00
/* TODO: figure out whether we actually want to connect to this component (_always_ if IsBroker() == true) */
2012-05-08 09:20:42 +02:00
/* try and reconnect to this component */
endpointManager->AddConnection(info->Node, info->Service);
}
i++;
2012-05-07 13:48:17 +02:00
}
2012-05-07 13:48:17 +02:00
return 0;
}
/* Exports DiscoveryComponent through the EXPORT_COMPONENT macro, which is
 * defined outside this file. NOTE(review): presumably this generates the
 * entry point the application uses to instantiate the component - confirm
 * against the macro definition. */
EXPORT_COMPONENT(DiscoveryComponent);