icinga2/components/discovery/discoverycomponent.cpp

529 lines
17 KiB
C++
Raw Normal View History

/******************************************************************************
* Icinga 2 *
* Copyright (C) 2012 Icinga Development Team (http://www.icinga.org/) *
* *
* This program is free software; you can redistribute it and/or *
* modify it under the terms of the GNU General Public License *
* as published by the Free Software Foundation; either version 2 *
* of the License, or (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program; if not, write to the Free Software Foundation *
2012-05-11 13:33:57 +02:00
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. *
******************************************************************************/
#include "i2-discovery.h"
using namespace icinga;
/**
* Returns the name of this component.
*
* @returns The name.
*/
string DiscoveryComponent::GetName(void) const
{
return "discoverycomponent";
}
/**
* Starts the discovery component.
*/
void DiscoveryComponent::Start(void)
{
2012-06-15 19:32:41 +02:00
m_DiscoveryEndpoint = boost::make_shared<VirtualEndpoint>();
m_DiscoveryEndpoint->RegisterPublication("discovery::RegisterComponent");
m_DiscoveryEndpoint->RegisterTopicHandler("discovery::RegisterComponent",
2012-06-16 03:42:54 +02:00
boost::bind(&DiscoveryComponent::RegisterComponentMessageHandler, this, _2, _3));
m_DiscoveryEndpoint->RegisterPublication("discovery::NewComponent");
m_DiscoveryEndpoint->RegisterTopicHandler("discovery::NewComponent",
2012-06-16 03:42:54 +02:00
boost::bind(&DiscoveryComponent::NewComponentMessageHandler, this, _3));
m_DiscoveryEndpoint->RegisterTopicHandler("discovery::Welcome",
2012-06-16 03:42:54 +02:00
boost::bind(&DiscoveryComponent::WelcomeMessageHandler, this, _2, _3));
2012-06-16 03:42:54 +02:00
GetEndpointManager()->ForEachEndpoint(boost::bind(&DiscoveryComponent::NewEndpointHandler, this, _2));
GetEndpointManager()->OnNewEndpoint.connect(boost::bind(&DiscoveryComponent::NewEndpointHandler, this, _2));
GetEndpointManager()->RegisterEndpoint(m_DiscoveryEndpoint);
2012-05-07 13:48:17 +02:00
/* create the reconnect timer */
2012-06-15 19:32:41 +02:00
m_DiscoveryTimer = boost::make_shared<Timer>();
2012-05-08 09:20:42 +02:00
m_DiscoveryTimer->SetInterval(30);
2012-06-15 19:32:41 +02:00
m_DiscoveryTimer->OnTimerExpired.connect(boost::bind(&DiscoveryComponent::DiscoveryTimerHandler, this));
2012-05-08 09:20:42 +02:00
m_DiscoveryTimer->Start();
/* call the timer as soon as possible */
m_DiscoveryTimer->Reschedule(0);
}
/**
* Stops the discovery component.
*/
void DiscoveryComponent::Stop(void)
{
EndpointManager::Ptr mgr = GetEndpointManager();
if (mgr)
mgr->UnregisterEndpoint(m_DiscoveryEndpoint);
}
/**
* Checks whether the specified endpoint is already connected
* and disconnects older endpoints.
*
2012-06-16 03:42:54 +02:00
* @param self The endpoint that is to be checked.
* @param other The other endpoint.
*/
2012-06-16 03:42:54 +02:00
void DiscoveryComponent::CheckExistingEndpoint(const Endpoint::Ptr& self, const Endpoint::Ptr& other)
{
2012-06-16 03:42:54 +02:00
if (self == other)
2012-06-15 19:32:41 +02:00
return;
2012-06-16 03:42:54 +02:00
if (!other->IsConnected())
2012-06-15 19:32:41 +02:00
return;
2012-06-16 03:42:54 +02:00
if (self->GetIdentity() == other->GetIdentity()) {
Application::Log(LogWarning, "discovery", "Detected duplicate identity:" + other->GetIdentity() + " - Disconnecting old endpoint.");
2012-06-16 03:42:54 +02:00
other->Stop();
GetEndpointManager()->UnregisterEndpoint(other);
}
}
/**
* Deals with a new endpoint.
*
2012-06-16 03:42:54 +02:00
* @param endpoint The endpoint.
*/
2012-06-16 03:42:54 +02:00
void DiscoveryComponent::NewEndpointHandler(const Endpoint::Ptr& endpoint)
{
2012-06-21 17:39:16 +02:00
/* immediately finish session setup for local endpoints */
if (endpoint->IsLocal()) {
endpoint->OnSessionEstablished(endpoint);
2012-06-21 12:51:50 +02:00
return;
2012-06-21 17:39:16 +02:00
}
2012-06-21 12:51:50 +02:00
/* accept discovery::RegisterComponent messages from any endpoint */
2012-06-16 03:42:54 +02:00
endpoint->RegisterPublication("discovery::RegisterComponent");
2012-05-07 13:48:17 +02:00
2012-05-08 09:20:42 +02:00
/* accept discovery::Welcome messages from any endpoint */
2012-06-16 03:42:54 +02:00
endpoint->RegisterPublication("discovery::Welcome");
string identity = endpoint->GetIdentity();
if (identity == GetEndpointManager()->GetIdentity()) {
2012-06-15 19:32:41 +02:00
Application::Log(LogWarning, "discovery", "Detected loop-back connection - Disconnecting endpoint.");
endpoint->Stop();
GetEndpointManager()->UnregisterEndpoint(endpoint);
2012-06-15 19:32:41 +02:00
return;
2012-05-07 13:48:17 +02:00
}
2012-06-16 03:42:54 +02:00
GetEndpointManager()->ForEachEndpoint(boost::bind(&DiscoveryComponent::CheckExistingEndpoint, this, endpoint, _2));
// we assume the other component _always_ wants
// discovery::RegisterComponent messages from us
endpoint->RegisterSubscription("discovery::RegisterComponent");
// send a discovery::RegisterComponent message, if the
// other component is a broker this makes sure
// the broker knows about our message types
SendDiscoveryMessage("discovery::RegisterComponent", GetEndpointManager()->GetIdentity(), endpoint);
2012-05-09 12:34:11 +02:00
map<string, ComponentDiscoveryInfo::Ptr>::iterator ic;
2012-05-10 12:12:48 +02:00
// we assume the other component _always_ wants
// discovery::NewComponent messages from us
endpoint->RegisterSubscription("discovery::NewComponent");
2012-05-10 12:12:48 +02:00
// send discovery::NewComponent message for ourselves
SendDiscoveryMessage("discovery::NewComponent", GetEndpointManager()->GetIdentity(), endpoint);
2012-05-10 12:12:48 +02:00
// send discovery::NewComponent messages for all components
// we know about
for (ic = m_Components.begin(); ic != m_Components.end(); ic++) {
SendDiscoveryMessage("discovery::NewComponent", ic->first, endpoint);
}
// check if we already know the other component
2012-05-09 12:34:11 +02:00
ic = m_Components.find(endpoint->GetIdentity());
2012-05-09 12:34:11 +02:00
if (ic == m_Components.end()) {
// we don't know the other component yet, so
// wait until we get a discovery::NewComponent message
// from a broker
2012-06-15 19:32:41 +02:00
return;
}
// register published/subscribed topics for this endpoint
2012-05-09 12:34:11 +02:00
ComponentDiscoveryInfo::Ptr info = ic->second;
set<string>::iterator it;
for (it = info->Publications.begin(); it != info->Publications.end(); it++)
endpoint->RegisterPublication(*it);
2012-05-09 12:34:11 +02:00
for (it = info->Subscriptions.begin(); it != info->Subscriptions.end(); it++)
endpoint->RegisterSubscription(*it);
2012-05-09 12:34:11 +02:00
FinishDiscoverySetup(endpoint);
}
/**
* Registers message Subscriptions/sources in the specified component information object.
*
* @param neea Event arguments for the endpoint.
* @param info Component information object.
* @return 0
*/
void DiscoveryComponent::DiscoveryEndpointHandler(const Endpoint::Ptr& endpoint, const ComponentDiscoveryInfo::Ptr& info) const
{
Endpoint::ConstTopicIterator i;
for (i = endpoint->BeginSubscriptions(); i != endpoint->EndSubscriptions(); i++)
info->Subscriptions.insert(*i);
for (i = endpoint->BeginPublications(); i != endpoint->EndPublications(); i++)
info->Publications.insert(*i);
}
/**
* Retrieves the component information object for the specified component.
*
* @param component The identity of the component.
* @param info Pointer to the information object.
* @returns true if the info object was successfully retrieved, false otherwise.
*/
bool DiscoveryComponent::GetComponentDiscoveryInfo(string component, ComponentDiscoveryInfo::Ptr *info) const
{
if (component == GetEndpointManager()->GetIdentity()) {
/* Build fake discovery info for ourselves */
*info = boost::make_shared<ComponentDiscoveryInfo>();
GetEndpointManager()->ForEachEndpoint(boost::bind(&DiscoveryComponent::DiscoveryEndpointHandler, this, _2, *info));
(*info)->LastSeen = 0;
(*info)->Node = GetIcingaApplication()->GetNode();
(*info)->Service = GetIcingaApplication()->GetService();
return true;
}
map<string, ComponentDiscoveryInfo::Ptr>::const_iterator i;
i = m_Components.find(component);
if (i == m_Components.end())
return false;
*info = i->second;
return true;
}
/**
* Processes discovery::Welcome messages.
*
* @param nrea Event arguments for the request.
* @returns 0
*/
2012-06-16 03:42:54 +02:00
void DiscoveryComponent::WelcomeMessageHandler(const Endpoint::Ptr& sender, const RequestMessage& request)
{
2012-06-16 03:42:54 +02:00
if (sender->HasReceivedWelcome())
2012-06-15 19:32:41 +02:00
return;
2012-06-16 03:42:54 +02:00
sender->SetReceivedWelcome(true);
2012-06-16 03:42:54 +02:00
if (sender->HasSentWelcome())
sender->OnSessionEstablished(sender);
}
/**
* Finishes the welcome handshake for a new component
* by registering message Subscriptions/sources for the component
* and sending a welcome message if necessary.
*
* @param endpoint The endpoint to set up.
*/
2012-06-16 03:42:54 +02:00
void DiscoveryComponent::FinishDiscoverySetup(const Endpoint::Ptr& endpoint)
{
if (endpoint->HasSentWelcome())
return;
// we assume the other component _always_ wants
// discovery::Welcome messages from us
endpoint->RegisterSubscription("discovery::Welcome");
RequestMessage request;
request.SetMethod("discovery::Welcome");
GetEndpointManager()->SendUnicastMessage(m_DiscoveryEndpoint, endpoint, request);
2012-05-08 10:13:15 +02:00
endpoint->SetSentWelcome(true);
2012-06-16 03:42:54 +02:00
if (endpoint->HasReceivedWelcome())
endpoint->OnSessionEstablished(endpoint);
}
/**
* Sends a discovery message for the specified identity using the
* specified message type.
*
* @param method The method to use for the message ("discovery::NewComponent" or "discovery::RegisterComponent").
* @param identity The identity of the component for which a message should be sent.
* @param recipient The recipient of the message. A multicast message is sent if this parameter is empty.
*/
2012-06-16 03:42:54 +02:00
void DiscoveryComponent::SendDiscoveryMessage(const string& method, const string& identity, const Endpoint::Ptr& recipient)
{
RequestMessage request;
request.SetMethod(method);
DiscoveryMessage params;
request.SetParams(params);
params.SetIdentity(identity);
ComponentDiscoveryInfo::Ptr info;
if (!GetComponentDiscoveryInfo(identity, &info))
return;
if (!info->Node.empty() && !info->Service.empty()) {
2012-05-07 13:48:17 +02:00
params.SetNode(info->Node);
params.SetService(info->Service);
}
set<string>::iterator i;
2012-05-16 11:30:54 +02:00
MessagePart subscriptions;
for (i = info->Subscriptions.begin(); i != info->Subscriptions.end(); i++)
subscriptions.AddUnnamedProperty(*i);
params.SetSubscriptions(subscriptions);
MessagePart publications;
for (i = info->Publications.begin(); i != info->Publications.end(); i++)
2012-05-16 11:30:54 +02:00
publications.AddUnnamedProperty(*i);
2012-05-16 11:30:54 +02:00
params.SetPublications(publications);
if (recipient)
GetEndpointManager()->SendUnicastMessage(m_DiscoveryEndpoint, recipient, request);
else
GetEndpointManager()->SendMulticastMessage(m_DiscoveryEndpoint, request);
}
2012-06-16 03:42:54 +02:00
bool DiscoveryComponent::HasMessagePermission(const Dictionary::Ptr& roles, const string& messageType, const string& message)
{
if (!roles)
return false;
ConfigObject::TMap::Range range = ConfigObject::GetObjects("role");
for (ConfigObject::TMap::Iterator ip = range.first; ip != range.second; ip++) {
ConfigObject::Ptr role = ip->second;
2012-05-17 19:14:03 +02:00
Object::Ptr object;
if (!role->GetProperty(messageType, &object))
continue;
2012-05-17 19:14:03 +02:00
Dictionary::Ptr permissions = dynamic_pointer_cast<Dictionary>(object);
if (!permissions)
2012-05-26 21:30:04 +02:00
throw runtime_error("Object is not a dictionary.");
2012-05-17 19:14:03 +02:00
for (DictionaryIterator is = permissions->Begin(); is != permissions->End(); is++) {
if (Utility::Match(is->second.GetString(), message))
return true;
}
}
return false;
}
/**
* Processes a discovery message by registering the component in the
* discovery component registry.
*
* @param identity The authorative identity of the component.
* @param message The discovery message.
* @param trusted Whether the message comes from a trusted source (i.e. a broker).
*/
2012-06-16 03:42:54 +02:00
void DiscoveryComponent::ProcessDiscoveryMessage(const string& identity, const DiscoveryMessage& message, bool trusted)
{
2012-05-08 09:20:42 +02:00
/* ignore discovery messages that are about ourselves */
if (identity == GetEndpointManager()->GetIdentity())
return;
2012-06-15 19:32:41 +02:00
ComponentDiscoveryInfo::Ptr info = boost::make_shared<ComponentDiscoveryInfo>();
2012-05-08 09:20:42 +02:00
time(&(info->LastSeen));
2012-06-21 12:51:50 +02:00
string node;
if (message.GetNode(&node) && !node.empty())
info->Node = node;
string service;
if (message.GetService(&service) && !service.empty())
info->Service = service;
ConfigObject::Ptr endpointConfig = ConfigObject::GetObject("endpoint", identity);
Dictionary::Ptr roles;
2012-05-17 19:14:03 +02:00
if (endpointConfig) {
Object::Ptr object;
if (endpointConfig->GetProperty("roles", &object)) {
roles = dynamic_pointer_cast<Dictionary>(object);
if (!roles)
2012-05-26 21:30:04 +02:00
throw runtime_error("Object is not a dictionary.");
2012-05-17 19:14:03 +02:00
}
}
2012-05-09 12:34:11 +02:00
Endpoint::Ptr endpoint = GetEndpointManager()->GetEndpointByIdentity(identity);
2012-05-16 11:30:54 +02:00
MessagePart publications;
if (message.GetPublications(&publications)) {
2012-05-07 13:48:17 +02:00
DictionaryIterator i;
2012-05-16 11:30:54 +02:00
for (i = publications.Begin(); i != publications.End(); i++) {
if (trusted || HasMessagePermission(roles, "publications", i->second)) {
info->Publications.insert(i->second);
2012-05-09 12:34:11 +02:00
if (endpoint)
endpoint->RegisterPublication(i->second);
2012-05-09 12:34:11 +02:00
}
2012-05-07 13:48:17 +02:00
}
}
2012-05-16 11:30:54 +02:00
MessagePart subscriptions;
if (message.GetSubscriptions(&subscriptions)) {
2012-05-07 13:48:17 +02:00
DictionaryIterator i;
2012-05-16 11:30:54 +02:00
for (i = subscriptions.Begin(); i != subscriptions.End(); i++) {
if (trusted || HasMessagePermission(roles, "subscriptions", i->second)) {
info->Subscriptions.insert(i->second);
2012-05-09 12:34:11 +02:00
if (endpoint)
endpoint->RegisterSubscription(i->second);
2012-05-09 12:34:11 +02:00
}
2012-05-07 13:48:17 +02:00
}
}
2012-05-07 13:48:17 +02:00
map<string, ComponentDiscoveryInfo::Ptr>::iterator i;
i = m_Components.find(identity);
2012-05-07 13:48:17 +02:00
if (i != m_Components.end())
m_Components.erase(i);
2012-05-07 13:48:17 +02:00
m_Components[identity] = info;
2012-05-10 12:12:48 +02:00
SendDiscoveryMessage("discovery::NewComponent", identity, Endpoint::Ptr());
2012-06-21 17:39:16 +02:00
/* don't send a welcome message for discovery::NewComponent messages */
if (endpoint && !trusted)
FinishDiscoverySetup(endpoint);
}
/**
* Processes "discovery::NewComponent" messages.
*
* @param nrea Event arguments for the request.
*/
2012-06-16 03:42:54 +02:00
void DiscoveryComponent::NewComponentMessageHandler(const RequestMessage& request)
{
2012-05-07 13:48:17 +02:00
DiscoveryMessage message;
2012-06-16 03:42:54 +02:00
request.GetParams(&message);
2012-05-07 13:48:17 +02:00
string identity;
if (!message.GetIdentity(&identity))
2012-06-15 19:32:41 +02:00
return;
ProcessDiscoveryMessage(identity, message, true);
}
/**
* Processes "discovery::RegisterComponent" messages.
*
* @param nrea Event arguments for the request.
*/
2012-06-16 03:42:54 +02:00
void DiscoveryComponent::RegisterComponentMessageHandler(const Endpoint::Ptr& sender, const RequestMessage& request)
{
2012-05-07 13:48:17 +02:00
DiscoveryMessage message;
2012-06-16 03:42:54 +02:00
request.GetParams(&message);
ProcessDiscoveryMessage(sender->GetIdentity(), message, false);
2012-05-07 13:48:17 +02:00
}
/**
* Checks whether we have to reconnect to other components and removes stale
* components from the registry.
*/
2012-06-15 19:32:41 +02:00
void DiscoveryComponent::DiscoveryTimerHandler(void)
2012-05-08 09:20:42 +02:00
{
EndpointManager::Ptr endpointManager = GetEndpointManager();
time_t now;
time(&now);
/* check whether we have to reconnect to one of our upstream endpoints */
ConfigObject::TMap::Range range = ConfigObject::GetObjects("endpoint");
for (ConfigObject::TMap::Iterator it = range.first; it != range.second; it++) {
ConfigObject::Ptr object = it->second;
/* Check if we're already connected to this endpoint. */
if (endpointManager->GetEndpointByIdentity(object->GetName()))
continue;
string node, service;
if (object->GetProperty("node", &node) && object->GetProperty("service", &service)) {
/* reconnect to this endpoint */
endpointManager->AddConnection(node, service);
}
}
2012-05-08 09:20:42 +02:00
2012-05-09 13:49:26 +02:00
map<string, ComponentDiscoveryInfo::Ptr>::iterator curr, i;
2012-05-08 09:20:42 +02:00
for (i = m_Components.begin(); i != m_Components.end(); ) {
string identity = i->first;
ComponentDiscoveryInfo::Ptr info = i->second;
2012-06-21 12:51:50 +02:00
/* there's no need to reconnect to ourself */
if (identity == GetEndpointManager()->GetIdentity())
continue;
2012-06-21 17:39:16 +02:00
/* for explicitly-configured upstream endpoints
* we prefer to use the node/service from the
* config object - which is what the for loop above does */
if (ConfigObject::GetObject("endpoint", identity))
continue;
2012-05-09 13:49:26 +02:00
curr = i;
i++;
2012-05-08 09:20:42 +02:00
if (info->LastSeen < now - DiscoveryComponent::RegistrationTTL) {
/* unregister this component if its registration has expired */
2012-05-09 13:49:26 +02:00
m_Components.erase(curr);
2012-05-07 13:48:17 +02:00
continue;
2012-05-08 09:20:42 +02:00
}
2012-05-10 12:12:48 +02:00
/* send discovery message to all connected components to
refresh their TTL for this component */
SendDiscoveryMessage("discovery::NewComponent", identity, Endpoint::Ptr());
2012-05-08 09:20:42 +02:00
Endpoint::Ptr endpoint = endpointManager->GetEndpointByIdentity(identity);
2012-05-08 10:22:47 +02:00
if (endpoint && endpoint->IsConnected()) {
2012-05-08 09:20:42 +02:00
/* update LastSeen if we're still connected to this endpoint */
info->LastSeen = now;
} else {
2012-05-10 12:12:48 +02:00
/* TODO: figure out whether we actually want to connect to this component */
2012-05-08 09:20:42 +02:00
/* try and reconnect to this component */
2012-06-21 12:51:50 +02:00
try {
if (!info->Node.empty() && !info->Service.empty())
endpointManager->AddConnection(info->Node, info->Service);
} catch (const std::exception& ex) {
stringstream msgbuf;
msgbuf << "Exception while trying to reconnect to endpoint '" << endpoint->GetIdentity() << "': " << ex.what();;
Application::Log(LogInformation, "discovery", msgbuf.str());
}
2012-05-08 09:20:42 +02:00
}
2012-05-07 13:48:17 +02:00
}
}
2012-05-12 16:12:26 +02:00
EXPORT_COMPONENT(discovery, DiscoveryComponent);