Implement loop detection for cluster links.

Refs #5467
This commit is contained in:
Gunnar Beutner 2014-04-25 14:33:45 +02:00
parent 83fd836c7b
commit e6fb8caace
12 changed files with 345 additions and 67 deletions

View File

@ -20,7 +20,7 @@ mkclass_target(clusterlistener.ti clusterlistener.th)
mkembedconfig_target(cluster-type.conf cluster-type.cpp)
add_library(cluster SHARED
clusterchecktask.cpp clusterlistener.cpp clusterlistener.th
clusterchecktask.cpp clusterlink.cpp clusterlistener.cpp clusterlistener.th
cluster-type.cpp
)

View File

@ -0,0 +1,69 @@
/******************************************************************************
* Icinga 2 *
* Copyright (C) 2012-2014 Icinga Development Team (http://www.icinga.org) *
* *
* This program is free software; you can redistribute it and/or *
* modify it under the terms of the GNU General Public License *
* as published by the Free Software Foundation; either version 2 *
* of the License, or (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program; if not, write to the Free Software Foundation *
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. *
******************************************************************************/
#include "cluster/clusterlink.h"
using namespace icinga;
ClusterLink::ClusterLink(const String& from, const String& to)
{
if (from < to) {
From = from;
To = to;
} else {
From = to;
To = from;
}
}
int ClusterLink::GetMetric(void) const
{
int metric = 0;
Endpoint::Ptr fromEp = Endpoint::GetByName(From);
if (fromEp)
metric += fromEp->GetMetric();
Endpoint::Ptr toEp = Endpoint::GetByName(To);
if (toEp)
metric += toEp->GetMetric();
return metric;
}
bool ClusterLink::operator<(const ClusterLink& other) const
{
if (From < other.From)
return true;
else
return To < other.To;
}
bool ClusterLinkMetricLessComparer::operator()(const ClusterLink& a, const ClusterLink& b) const
{
int metricA = a.GetMetric();
int metricB = b.GetMetric();
if (metricA < metricB)
return true;
else if (metricB > metricA)
return false;
else
return a < b;
}

View File

@ -0,0 +1,49 @@
/******************************************************************************
* Icinga 2 *
* Copyright (C) 2012-2014 Icinga Development Team (http://www.icinga.org) *
* *
* This program is free software; you can redistribute it and/or *
* modify it under the terms of the GNU General Public License *
* as published by the Free Software Foundation; either version 2 *
* of the License, or (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program; if not, write to the Free Software Foundation *
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. *
******************************************************************************/
#ifndef CLUSTERLINK_H
#define CLUSTERLINK_H
#include "remote/endpoint.h"
namespace icinga
{
/**
* @ingroup cluster
*/
struct ClusterLink
{
String From;
String To;
ClusterLink(const String& from, const String& to);
int GetMetric(void) const;
bool operator<(const ClusterLink& other) const;
};
struct ClusterLinkMetricLessComparer
{
bool operator()(const ClusterLink& a, const ClusterLink& b) const;
};
}
#endif /* CLUSTERLINK_H */

View File

@ -222,9 +222,9 @@ void ClusterListener::AddConnection(const String& node, const String& service) {
Utility::QueueAsyncCallback(boost::bind(&ClusterListener::NewClientHandler, this, client, TlsRoleClient));
}
void ClusterListener::AsyncRelayMessage(const Endpoint::Ptr& source, const Dictionary::Ptr& message, bool persistent)
void ClusterListener::AsyncRelayMessage(const Endpoint::Ptr& source, const Endpoint::Ptr& destination, const Dictionary::Ptr& message, bool persistent)
{
m_RelayQueue.Enqueue(boost::bind(&ClusterListener::RelayMessage, this, source, message, persistent));
m_RelayQueue.Enqueue(boost::bind(&ClusterListener::RelayMessage, this, source, destination, message, persistent));
}
void ClusterListener::PersistMessage(const Endpoint::Ptr& source, const Dictionary::Ptr& message)
@ -256,7 +256,7 @@ void ClusterListener::PersistMessage(const Endpoint::Ptr& source, const Dictiona
}
}
void ClusterListener::RelayMessage(const Endpoint::Ptr& source, const Dictionary::Ptr& message, bool persistent)
void ClusterListener::RelayMessage(const Endpoint::Ptr& source, const Endpoint::Ptr& destination, const Dictionary::Ptr& message, bool persistent)
{
double ts = Utility::GetTime();
message->Set("ts", ts);
@ -288,8 +288,16 @@ void ClusterListener::RelayMessage(const Endpoint::Ptr& source, const Dictionary
privs = security->Get("privs");
}
double now = Utility::GetTime();
BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects<Endpoint>()) {
if (!persistent && !endpoint->IsConnected())
if (!endpoint->IsConnected())
continue;
if (destination && endpoint != destination)
continue;
if (!destination && endpoint->GetBlockedUntil() > now)
continue;
if (endpoint == source)
@ -532,8 +540,6 @@ void ClusterListener::NewClientHandler(const Socket::Ptr& client, TlsRole role)
shared_ptr<X509> cert = tlsStream->GetPeerCertificate();
String identity = GetCertificateCN(cert);
Log(LogInformation, "cluster", "New client connection for identity '" + identity + "'");
Endpoint::Ptr endpoint = Endpoint::GetByName(identity);
if (!endpoint) {
@ -542,6 +548,13 @@ void ClusterListener::NewClientHandler(const Socket::Ptr& client, TlsRole role)
return;
}
if (endpoint->GetClient()) {
tlsStream->Close();
return;
}
Log(LogInformation, "cluster", "New client connection for identity '" + identity + "'");
{
ObjectLock olock(endpoint);
@ -593,26 +606,109 @@ void ClusterListener::NewClientHandler(const Socket::Ptr& client, TlsRole role)
ReplayLog(endpoint, tlsStream);
}
void ClusterListener::UpdateLinks(void)
{
ObjectLock olock(this);
/* build a set of potential routes */
std::set<ClusterLink> links;
std::pair<String, EndpointPeerInfo> kv;
BOOST_FOREACH(kv, m_VisibleEndpoints) {
String endpoint = kv.first;
const EndpointPeerInfo& epi = kv.second;
if (GetIdentity() == endpoint)
continue;
if (epi.Seen > Utility::GetTime() - 30)
links.insert(ClusterLink(GetIdentity(), endpoint));
if (!epi.Peers)
continue;
ObjectLock olock(epi.Peers);
BOOST_FOREACH(const String& peer, epi.Peers)
links.insert(ClusterLink(endpoint, peer));
}
olock.Unlock();
/* sort the routes by metric */
std::vector<ClusterLink> sortedLinks;
std::copy(links.begin(), links.end(), std::back_inserter(sortedLinks));
std::sort(sortedLinks.begin(), sortedLinks.end(), ClusterLinkMetricLessComparer());
/* pick routes */
std::set<String> visitedEndpoints;
BOOST_FOREACH(const ClusterLink& link, sortedLinks) {
Endpoint::Ptr other;
if (link.From == GetIdentity())
other = Endpoint::GetByName(link.To);
else if (link.To == GetIdentity())
other = Endpoint::GetByName(link.From);
if (visitedEndpoints.find(link.From) != visitedEndpoints.end() &&
visitedEndpoints.find(link.To) != visitedEndpoints.end()) {
if (other) {
Log(LogInformation, "cluster", "Blocking link to '" + other->GetName() + "'");
Dictionary::Ptr message = make_shared<Dictionary>();
message->Set("jsonrpc", "2.0");
message->Set("method", "cluster::BlockLink");
message->Set("params", make_shared<Dictionary>());
AsyncRelayMessage(Endpoint::Ptr(), other, message, false);
}
continue;
}
visitedEndpoints.insert(link.From);
visitedEndpoints.insert(link.To);
}
}
void ClusterListener::ClusterTimerHandler(void)
{
/* broadcast a heartbeat message */
Dictionary::Ptr params = make_shared<Dictionary>();
params->Set("identity", GetIdentity());
/* Update endpoint routes */
UpdateLinks();
/* Eww. */
Dictionary::Ptr features = make_shared<Dictionary>();
features->Set("checker", SupportsChecks());
features->Set("notification", SupportsNotifications());
params->Set("features", features);
Dictionary::Ptr message = make_shared<Dictionary>();
message->Set("jsonrpc", "2.0");
message->Set("method", "cluster::HeartBeat");
message->Set("params", params);
/* broadcast a heartbeat message */
BOOST_FOREACH(const Endpoint::Ptr& destination, DynamicType::GetObjects<Endpoint>()) {
std::set<String> connected_endpoints;
Endpoint::GetByName(GetIdentity())->SetFeatures(features);
BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects<Endpoint>()) {
if (endpoint->GetName() == GetIdentity())
continue;
AsyncRelayMessage(Endpoint::Ptr(), message, false);
if (!endpoint->IsConnected())
continue;
connected_endpoints.insert(endpoint->GetName());
}
Array::Ptr epnames = make_shared<Array>();
BOOST_FOREACH(const String& name, connected_endpoints)
epnames->Add(name);
Dictionary::Ptr params = make_shared<Dictionary>();
params->Set("identity", GetIdentity());
params->Set("features", features);
params->Set("connected_endpoints", epnames);
Dictionary::Ptr message = make_shared<Dictionary>();
message->Set("jsonrpc", "2.0");
message->Set("method", "cluster::HeartBeat");
message->Set("params", params);
Endpoint::GetByName(GetIdentity())->SetFeatures(features);
AsyncRelayMessage(Endpoint::Ptr(), destination, message, false);
}
{
ObjectLock olock(this);
@ -621,6 +717,8 @@ void ClusterListener::ClusterTimerHandler(void)
if (endpoint->GetSeen() > Utility::GetTime() - 60)
continue;
m_VisibleEndpoints.erase(endpoint->GetName());
Stream::Ptr client = endpoint->GetClient();
if (client) {
@ -727,7 +825,7 @@ void ClusterListener::CheckResultHandler(const Checkable::Ptr& checkable, const
SetSecurityInfo(message, checkable, DomainPrivRead);
AsyncRelayMessage(Endpoint::Ptr(), message, true);
AsyncRelayMessage(Endpoint::Ptr(), Endpoint::Ptr(), message, true);
}
void ClusterListener::NextCheckChangedHandler(const Checkable::Ptr& checkable, double nextCheck, const String& authority)
@ -747,7 +845,7 @@ void ClusterListener::NextCheckChangedHandler(const Checkable::Ptr& checkable, d
SetSecurityInfo(message, checkable, DomainPrivRead);
AsyncRelayMessage(Endpoint::Ptr(), message, true);
AsyncRelayMessage(Endpoint::Ptr(), Endpoint::Ptr(), message, true);
}
void ClusterListener::NextNotificationChangedHandler(const Notification::Ptr& notification, double nextNotification, const String& authority)
@ -766,7 +864,7 @@ void ClusterListener::NextNotificationChangedHandler(const Notification::Ptr& no
SetSecurityInfo(message, notification->GetCheckable(), DomainPrivRead);
AsyncRelayMessage(Endpoint::Ptr(), message, true);
AsyncRelayMessage(Endpoint::Ptr(), Endpoint::Ptr(), message, true);
}
void ClusterListener::ForceNextCheckChangedHandler(const Checkable::Ptr& checkable, bool forced, const String& authority)
@ -786,7 +884,7 @@ void ClusterListener::ForceNextCheckChangedHandler(const Checkable::Ptr& checkab
SetSecurityInfo(message, checkable, DomainPrivRead);
AsyncRelayMessage(Endpoint::Ptr(), message, true);
AsyncRelayMessage(Endpoint::Ptr(), Endpoint::Ptr(), message, true);
}
void ClusterListener::ForceNextNotificationChangedHandler(const Checkable::Ptr& checkable, bool forced, const String& authority)
@ -806,7 +904,7 @@ void ClusterListener::ForceNextNotificationChangedHandler(const Checkable::Ptr&
SetSecurityInfo(message, checkable, DomainPrivRead);
AsyncRelayMessage(Endpoint::Ptr(), message, true);
AsyncRelayMessage(Endpoint::Ptr(), Endpoint::Ptr(), message, true);
}
void ClusterListener::EnableActiveChecksChangedHandler(const Checkable::Ptr& checkable, bool enabled, const String& authority)
@ -826,7 +924,7 @@ void ClusterListener::EnableActiveChecksChangedHandler(const Checkable::Ptr& che
SetSecurityInfo(message, checkable, DomainPrivRead);
AsyncRelayMessage(Endpoint::Ptr(), message, true);
AsyncRelayMessage(Endpoint::Ptr(), Endpoint::Ptr(), message, true);
}
void ClusterListener::EnablePassiveChecksChangedHandler(const Checkable::Ptr& checkable, bool enabled, const String& authority)
@ -846,7 +944,7 @@ void ClusterListener::EnablePassiveChecksChangedHandler(const Checkable::Ptr& ch
SetSecurityInfo(message, checkable, DomainPrivRead);
AsyncRelayMessage(Endpoint::Ptr(), message, true);
AsyncRelayMessage(Endpoint::Ptr(), Endpoint::Ptr(), message, true);
}
void ClusterListener::EnableNotificationsChangedHandler(const Checkable::Ptr& checkable, bool enabled, const String& authority)
@ -866,7 +964,7 @@ void ClusterListener::EnableNotificationsChangedHandler(const Checkable::Ptr& ch
SetSecurityInfo(message, checkable, DomainPrivRead);
AsyncRelayMessage(Endpoint::Ptr(), message, true);
AsyncRelayMessage(Endpoint::Ptr(), Endpoint::Ptr(), message, true);
}
void ClusterListener::EnableFlappingChangedHandler(const Checkable::Ptr& checkable, bool enabled, const String& authority)
@ -886,7 +984,7 @@ void ClusterListener::EnableFlappingChangedHandler(const Checkable::Ptr& checkab
SetSecurityInfo(message, checkable, DomainPrivRead);
AsyncRelayMessage(Endpoint::Ptr(), message, true);
AsyncRelayMessage(Endpoint::Ptr(), Endpoint::Ptr(), message, true);
}
void ClusterListener::CommentAddedHandler(const Checkable::Ptr& checkable, const Comment::Ptr& comment, const String& authority)
@ -906,7 +1004,7 @@ void ClusterListener::CommentAddedHandler(const Checkable::Ptr& checkable, const
SetSecurityInfo(message, checkable, DomainPrivRead);
AsyncRelayMessage(Endpoint::Ptr(), message, true);
AsyncRelayMessage(Endpoint::Ptr(), Endpoint::Ptr(), message, true);
}
void ClusterListener::CommentRemovedHandler(const Checkable::Ptr& checkable, const Comment::Ptr& comment, const String& authority)
@ -926,7 +1024,7 @@ void ClusterListener::CommentRemovedHandler(const Checkable::Ptr& checkable, con
SetSecurityInfo(message, checkable, DomainPrivRead);
AsyncRelayMessage(Endpoint::Ptr(), message, true);
AsyncRelayMessage(Endpoint::Ptr(), Endpoint::Ptr(), message, true);
}
void ClusterListener::DowntimeAddedHandler(const Checkable::Ptr& checkable, const Downtime::Ptr& downtime, const String& authority)
@ -946,7 +1044,7 @@ void ClusterListener::DowntimeAddedHandler(const Checkable::Ptr& checkable, cons
SetSecurityInfo(message, checkable, DomainPrivRead);
AsyncRelayMessage(Endpoint::Ptr(), message, true);
AsyncRelayMessage(Endpoint::Ptr(), Endpoint::Ptr(), message, true);
}
void ClusterListener::DowntimeRemovedHandler(const Checkable::Ptr& checkable, const Downtime::Ptr& downtime, const String& authority)
@ -966,7 +1064,7 @@ void ClusterListener::DowntimeRemovedHandler(const Checkable::Ptr& checkable, co
SetSecurityInfo(message, checkable, DomainPrivRead);
AsyncRelayMessage(Endpoint::Ptr(), message, true);
AsyncRelayMessage(Endpoint::Ptr(), Endpoint::Ptr(), message, true);
}
void ClusterListener::AcknowledgementSetHandler(const Checkable::Ptr& checkable, const String& author, const String& comment, AcknowledgementType type, double expiry, const String& authority)
@ -989,7 +1087,7 @@ void ClusterListener::AcknowledgementSetHandler(const Checkable::Ptr& checkable,
SetSecurityInfo(message, checkable, DomainPrivRead);
AsyncRelayMessage(Endpoint::Ptr(), message, true);
AsyncRelayMessage(Endpoint::Ptr(), Endpoint::Ptr(), message, true);
}
void ClusterListener::AcknowledgementClearedHandler(const Checkable::Ptr& checkable, const String& authority)
@ -1008,7 +1106,7 @@ void ClusterListener::AcknowledgementClearedHandler(const Checkable::Ptr& checka
SetSecurityInfo(message, checkable, DomainPrivRead);
AsyncRelayMessage(Endpoint::Ptr(), message, true);
AsyncRelayMessage(Endpoint::Ptr(), Endpoint::Ptr(), message, true);
}
void ClusterListener::AsyncMessageHandler(const Endpoint::Ptr& sender, const Dictionary::Ptr& message)
@ -1045,9 +1143,27 @@ void ClusterListener::MessageHandler(const Endpoint::Ptr& sender, const Dictiona
sender->SendMessage(lmessage);
Log(LogInformation, "cluster", "Acknowledging log position for identity '" + sender->GetName() + "': " + Utility::FormatDateTime("%Y/%m/%d %H:%M:%S", message->Get("ts")));
sender->SetRemoteLogPosition(message->Get("ts"));
Log(LogInformation, "cluster", "Acknowledging log position for identity '" + sender->GetName() + "': " + Utility::FormatDateTime("%Y/%m/%d %H:%M:%S", message->Get("ts")));
ObjectLock olock(this);
const EndpointPeerInfo& epi = m_VisibleEndpoints[sender->GetName()];
if (epi.Peers) {
ObjectLock olock(epi.Peers);
BOOST_FOREACH(const String& epname, epi.Peers) {
if (epname == GetIdentity())
continue;
Endpoint::Ptr peer_endpoint = Endpoint::GetByName(epname);
if (!peer_endpoint)
continue;
Log(LogInformation, "cluster", "Acknowledging log position for identity '" + peer_endpoint->GetName() + "' (via '" + sender->GetName() + "'): " + Utility::FormatDateTime("%Y/%m/%d %H:%M:%S", message->Get("ts")));
peer_endpoint->SetRemoteLogPosition(message->Get("ts"));
}
}
}
}
@ -1059,6 +1175,14 @@ void ClusterListener::MessageHandler(const Endpoint::Ptr& sender, const Dictiona
String identity = params->Get("identity");
{
ObjectLock olock(this);
EndpointPeerInfo epi;
epi.Seen = Utility::GetTime();
epi.Peers = params->Get("connected_endpoints");
m_VisibleEndpoints[identity] = epi;
}
Endpoint::Ptr endpoint = Endpoint::GetByName(identity);
if (endpoint) {
@ -1066,7 +1190,10 @@ void ClusterListener::MessageHandler(const Endpoint::Ptr& sender, const Dictiona
endpoint->SetFeatures(params->Get("features"));
}
AsyncRelayMessage(sender, message, true);
AsyncRelayMessage(sender, Endpoint::Ptr(), message, false);
} else if (message->Get("method") == "cluster::BlockLink") {
Log(LogDebug, "cluster", "Got cluster::BlockLink message. Blocking direct link for '" + sender->GetName() + "'");
sender->SetBlockedUntil(Utility::GetTime() + 30);
} else if (message->Get("method") == "cluster::CheckResult") {
if (!params)
return;
@ -1077,9 +1204,9 @@ void ClusterListener::MessageHandler(const Endpoint::Ptr& sender, const Dictiona
Checkable::Ptr checkable;
if (type == "Host")
checkable = DynamicObject::GetObject<Host>(chk);
checkable = Host::GetByName(chk);
else if (type == "Service")
checkable = DynamicObject::GetObject<Service>(chk);
checkable = Service::GetByName(chk);
else
return;
@ -1098,7 +1225,7 @@ void ClusterListener::MessageHandler(const Endpoint::Ptr& sender, const Dictiona
checkable->ProcessCheckResult(cr, sender->GetName());
AsyncRelayMessage(sender, message, true);
AsyncRelayMessage(sender, Endpoint::Ptr(), message, true);
} else if (message->Get("method") == "cluster::SetNextCheck") {
if (!params)
return;
@ -1127,7 +1254,7 @@ void ClusterListener::MessageHandler(const Endpoint::Ptr& sender, const Dictiona
checkable->SetNextCheck(nextCheck, sender->GetName());
AsyncRelayMessage(sender, message, true);
AsyncRelayMessage(sender, Endpoint::Ptr(), message, true);
} else if (message->Get("method") == "cluster::SetForceNextCheck") {
if (!params)
return;
@ -1156,7 +1283,7 @@ void ClusterListener::MessageHandler(const Endpoint::Ptr& sender, const Dictiona
checkable->SetForceNextCheck(forced, sender->GetName());
AsyncRelayMessage(sender, message, true);
AsyncRelayMessage(sender, Endpoint::Ptr(), message, true);
} else if (message->Get("method") == "cluster::SetForceNextNotification") {
if (!params)
return;
@ -1185,7 +1312,7 @@ void ClusterListener::MessageHandler(const Endpoint::Ptr& sender, const Dictiona
checkable->SetForceNextNotification(forced, sender->GetName());
AsyncRelayMessage(sender, message, true);
AsyncRelayMessage(sender, Endpoint::Ptr(), message, true);
} else if (message->Get("method") == "cluster::SetEnableActiveChecks") {
if (!params)
return;
@ -1214,7 +1341,7 @@ void ClusterListener::MessageHandler(const Endpoint::Ptr& sender, const Dictiona
checkable->SetEnableActiveChecks(enabled, sender->GetName());
AsyncRelayMessage(sender, message, true);
AsyncRelayMessage(sender, Endpoint::Ptr(), message, true);
} else if (message->Get("method") == "cluster::SetEnablePassiveChecks") {
if (!params)
return;
@ -1243,7 +1370,7 @@ void ClusterListener::MessageHandler(const Endpoint::Ptr& sender, const Dictiona
checkable->SetEnablePassiveChecks(enabled, sender->GetName());
AsyncRelayMessage(sender, message, true);
AsyncRelayMessage(sender, Endpoint::Ptr(), message, true);
} else if (message->Get("method") == "cluster::SetEnableNotifications") {
if (!params)
return;
@ -1272,7 +1399,7 @@ void ClusterListener::MessageHandler(const Endpoint::Ptr& sender, const Dictiona
checkable->SetEnableNotifications(enabled, sender->GetName());
AsyncRelayMessage(sender, message, true);
AsyncRelayMessage(sender, Endpoint::Ptr(), message, true);
} else if (message->Get("method") == "cluster::SetEnableFlapping") {
if (!params)
return;
@ -1301,7 +1428,7 @@ void ClusterListener::MessageHandler(const Endpoint::Ptr& sender, const Dictiona
checkable->SetEnableFlapping(enabled, sender->GetName());
AsyncRelayMessage(sender, message, true);
AsyncRelayMessage(sender, Endpoint::Ptr(), message, true);
} else if (message->Get("method") == "cluster::SetNextNotification") {
if (!params)
return;
@ -1324,7 +1451,7 @@ void ClusterListener::MessageHandler(const Endpoint::Ptr& sender, const Dictiona
notification->SetNextNotification(nextNotification, sender->GetName());
AsyncRelayMessage(sender, message, true);
AsyncRelayMessage(sender, Endpoint::Ptr(), message, true);
} else if (message->Get("method") == "cluster::AddComment") {
if (!params)
return;
@ -1354,7 +1481,7 @@ void ClusterListener::MessageHandler(const Endpoint::Ptr& sender, const Dictiona
checkable->AddComment(comment->GetEntryType(), comment->GetAuthor(),
comment->GetText(), comment->GetExpireTime(), comment->GetId(), sender->GetName());
AsyncRelayMessage(sender, message, true);
AsyncRelayMessage(sender, Endpoint::Ptr(), message, true);
} else if (message->Get("method") == "cluster::RemoveComment") {
if (!params)
return;
@ -1383,7 +1510,7 @@ void ClusterListener::MessageHandler(const Endpoint::Ptr& sender, const Dictiona
checkable->RemoveComment(id, sender->GetName());
AsyncRelayMessage(sender, message, true);
AsyncRelayMessage(sender, Endpoint::Ptr(), message, true);
} else if (message->Get("method") == "cluster::AddDowntime") {
if (!params)
return;
@ -1416,7 +1543,7 @@ void ClusterListener::MessageHandler(const Endpoint::Ptr& sender, const Dictiona
downtime->GetDuration(), downtime->GetScheduledBy(),
downtime->GetId(), sender->GetName());
AsyncRelayMessage(sender, message, true);
AsyncRelayMessage(sender, Endpoint::Ptr(), message, true);
} else if (message->Get("method") == "cluster::RemoveDowntime") {
if (!params)
return;
@ -1445,7 +1572,7 @@ void ClusterListener::MessageHandler(const Endpoint::Ptr& sender, const Dictiona
checkable->RemoveDowntime(id, false, sender->GetName());
AsyncRelayMessage(sender, message, true);
AsyncRelayMessage(sender, Endpoint::Ptr(), message, true);
} else if (message->Get("method") == "cluster::SetAcknowledgement") {
if (!params)
return;
@ -1477,7 +1604,7 @@ void ClusterListener::MessageHandler(const Endpoint::Ptr& sender, const Dictiona
checkable->AcknowledgeProblem(author, comment, static_cast<AcknowledgementType>(acktype), expiry, sender->GetName());
AsyncRelayMessage(sender, message, true);
AsyncRelayMessage(sender, Endpoint::Ptr(), message, true);
} else if (message->Get("method") == "cluster::ClearAcknowledgement") {
if (!params)
return;
@ -1507,7 +1634,7 @@ void ClusterListener::MessageHandler(const Endpoint::Ptr& sender, const Dictiona
checkable->ClearAcknowledgement(sender->GetName());
}
AsyncRelayMessage(sender, message, true);
AsyncRelayMessage(sender, Endpoint::Ptr(), message, true);
} else if (message->Get("method") == "cluster::SetLogPosition") {
if (!params)
return;
@ -1614,19 +1741,21 @@ void ClusterListener::MessageHandler(const Endpoint::Ptr& sender, const Dictiona
Application::RequestRestart();
}
AsyncRelayMessage(sender, message, true);
AsyncRelayMessage(sender, Endpoint::Ptr(), message, true);
}
}
bool ClusterListener::IsAuthority(const DynamicObject::Ptr& object, const String& type)
{
double now = Utility::GetTime();
Array::Ptr authorities = object->GetAuthorities();
std::vector<String> endpoints;
BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects<Endpoint>()) {
bool match = false;
if ((!endpoint->IsConnected() && endpoint->GetName() != GetIdentity()) || !endpoint->HasFeature(type))
if ((endpoint->GetSeen() < now - 30 && endpoint->GetName() != GetIdentity()) || !endpoint->HasFeature(type))
continue;
if (authorities) {
@ -1723,9 +1852,9 @@ std::pair<Dictionary::Ptr, Dictionary::Ptr> ClusterListener::GetClusterStatus(vo
BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects<Endpoint>()) {
count_endpoints++;
if(!endpoint->IsConnected() && endpoint->GetName() != GetIdentity())
if(!endpoint->IsAvailable() && endpoint->GetName() != GetIdentity())
not_connected_endpoints->Add(endpoint->GetName());
else if(endpoint->IsConnected() && endpoint->GetName() != GetIdentity())
else if(endpoint->IsAvailable() && endpoint->GetName() != GetIdentity())
connected_endpoints->Add(endpoint->GetName());
}

View File

@ -21,6 +21,7 @@
#define CLUSTERLISTENER_H
#include "cluster/clusterlistener.th"
#include "cluster/clusterlink.h"
#include "base/dynamicobject.h"
#include "base/timer.h"
#include "base/array.h"
@ -36,6 +37,15 @@
namespace icinga
{
/**
* @ingroup cluster
*/
struct EndpointPeerInfo
{
double Seen;
Array::Ptr Peers;
};
/**
* @ingroup cluster
*/
@ -75,8 +85,12 @@ private:
void NewClientHandler(const Socket::Ptr& client, TlsRole role);
void ListenerThreadProc(const Socket::Ptr& server);
void AsyncRelayMessage(const Endpoint::Ptr& source, const Dictionary::Ptr& message, bool persistent);
void RelayMessage(const Endpoint::Ptr& source, const Dictionary::Ptr& message, bool persistent);
std::map<String, EndpointPeerInfo> m_VisibleEndpoints;
void UpdateLinks(void);
void AsyncRelayMessage(const Endpoint::Ptr& source, const Endpoint::Ptr& destination, const Dictionary::Ptr& message, bool persistent);
void RelayMessage(const Endpoint::Ptr& source, const Endpoint::Ptr& destination, const Dictionary::Ptr& message, bool persistent);
void OpenLogFile(void);
void RotateLogFile(void);

View File

@ -942,6 +942,8 @@ Example:
host = "192.168.5.46"
port = 7777
metric = 0
config_files = [ "/etc/icinga2/cluster.d/*" ]
config_files_recursive = [
@ -956,6 +958,7 @@ Attributes:
----------------|----------------
host |**Required.** The hostname/IP address of the remote Icinga 2 instance.
port |**Required.** The service name/port of the remote Icinga 2 instance.
metric |**Optional.** The link metric for this endpoint. Defaults to 0.
config\_files |**Optional.** A list of configuration files sent to remote peers (wildcards possible).
config_files_recursive |**Optional.** A list of configuration files sent to remote peers. Array elements can either be a string (in which case all files in that directory matching the pattern *.conf are included) or a dictionary with elements "path" and "pattern".
accept\_config |**Optional.** A list of endpoint names from which this endpoint accepts configuration files.

View File

@ -105,7 +105,8 @@ void TlsStream::Handshake(void)
if (rc > 0)
break;
switch (SSL_get_error(m_SSL.get(), rc)) {
int err = SSL_get_error(m_SSL.get(), rc);
switch (err) {
case SSL_ERROR_WANT_READ:
m_Socket->Poll(true, false);
continue;
@ -141,7 +142,8 @@ size_t TlsStream::Read(void *buffer, size_t count)
}
if (rc <= 0) {
switch (SSL_get_error(m_SSL.get(), rc)) {
int err = SSL_get_error(m_SSL.get(), rc);
switch (err) {
case SSL_ERROR_WANT_READ:
m_Socket->Poll(true, false);
continue;
@ -179,7 +181,8 @@ void TlsStream::Write(const void *buffer, size_t count)
}
if (rc <= 0) {
switch (SSL_get_error(m_SSL.get(), rc)) {
int err = SSL_get_error(m_SSL.get(), rc);
switch (err) {
case SSL_ERROR_WANT_READ:
m_Socket->Poll(true, false);
continue;

View File

@ -45,6 +45,11 @@ bool Endpoint::IsConnected(void) const
return GetClient() != NULL;
}
bool Endpoint::IsAvailable(void) const
{
return GetSeen() > Utility::GetTime() - 30;
}
Stream::Ptr Endpoint::GetClient(void) const
{
return m_Client;
@ -52,6 +57,8 @@ Stream::Ptr Endpoint::GetClient(void) const
void Endpoint::SetClient(const Stream::Ptr& client)
{
SetBlockedUntil(Utility::GetTime() + 15);
if (m_Client)
m_Client->Close();
@ -62,10 +69,10 @@ void Endpoint::SetClient(const Stream::Ptr& client)
thread.detach();
OnConnected(GetSelf());
Log(LogWarning, "remote", "Endpoint connected: " + GetName());
Log(LogInformation, "remote", "Endpoint connected: " + GetName());
} else {
OnDisconnected(GetSelf());
Log(LogWarning, "remote", "Endpoint disconnected: " + GetName());
Log(LogInformation, "remote", "Endpoint disconnected: " + GetName());
}
}

View File

@ -50,6 +50,7 @@ public:
void SetClient(const Stream::Ptr& client);
bool IsConnected(void) const;
bool IsAvailable(void) const;
void SendMessage(const Dictionary::Ptr& request);
@ -58,6 +59,7 @@ public:
private:
Stream::Ptr m_Client;
boost::thread m_Thread;
Array::Ptr m_ConnectedEndpoints;
void MessageThreadProc(const Stream::Ptr& stream);
};

View File

@ -10,15 +10,15 @@ class Endpoint : DynamicObject
[config] Array::Ptr config_files;
[config] Array::Ptr config_files_recursive;
[config] Array::Ptr accept_config;
[config] int metric;
[state] double seen;
[state] double local_log_position;
[state] double remote_log_position;
[state] Dictionary::Ptr features;
bool syncing {
default {{{ return false; }}}
};
bool syncing;
double blocked_until;
};
}

View File

@ -34,7 +34,7 @@ using namespace icinga;
void JsonRpc::SendMessage(const Stream::Ptr& stream, const Dictionary::Ptr& message)
{
String json = JsonSerialize(message);
// std::cerr << ">> " << json << std::endl;
//std::cerr << ">> " << json << std::endl;
NetString::WriteStringToStream(stream, json);
}
@ -44,7 +44,7 @@ Dictionary::Ptr JsonRpc::ReadMessage(const Stream::Ptr& stream)
if (!NetString::ReadStringFromStream(stream, &jsonString))
BOOST_THROW_EXCEPTION(std::runtime_error("ReadStringFromStream signalled EOF."));
// std::cerr << "<< " << jsonString << std::endl;
//std::cerr << "<< " << jsonString << std::endl;
Value value = JsonDeserialize(jsonString);
if (!value.IsObjectType<Dictionary>()) {

View File

@ -24,6 +24,8 @@
%require "port",
%attribute %string "port",
%attribute %number "metric",
%attribute %array "config_files" {
%attribute %string "*"
},