mirror of https://github.com/Icinga/icinga2.git
commit
8105f51608
|
@ -144,13 +144,11 @@ void AgentListener::NewClientHandler(const Socket::Ptr& client, TlsRole role)
|
|||
{
|
||||
CONTEXT("Handling new agent client connection");
|
||||
|
||||
NetworkStream::Ptr netStream = make_shared<NetworkStream>(client);
|
||||
|
||||
TlsStream::Ptr tlsStream;
|
||||
|
||||
{
|
||||
ObjectLock olock(this);
|
||||
tlsStream = make_shared<TlsStream>(netStream, role, m_SSLContext);
|
||||
tlsStream = make_shared<TlsStream>(client, role, m_SSLContext);
|
||||
}
|
||||
|
||||
tlsStream->Handshake();
|
||||
|
|
|
@ -20,7 +20,7 @@ mkclass_target(clusterlistener.ti clusterlistener.th)
|
|||
mkembedconfig_target(cluster-type.conf cluster-type.cpp)
|
||||
|
||||
add_library(cluster SHARED
|
||||
clusterchecktask.cpp clusterlistener.cpp clusterlistener.th
|
||||
clusterchecktask.cpp clusterlink.cpp clusterlistener.cpp clusterlistener.th
|
||||
cluster-type.cpp
|
||||
)
|
||||
|
||||
|
|
|
@ -0,0 +1,69 @@
|
|||
/******************************************************************************
|
||||
* Icinga 2 *
|
||||
* Copyright (C) 2012-2014 Icinga Development Team (http://www.icinga.org) *
|
||||
* *
|
||||
* This program is free software; you can redistribute it and/or *
|
||||
* modify it under the terms of the GNU General Public License *
|
||||
* as published by the Free Software Foundation; either version 2 *
|
||||
* of the License, or (at your option) any later version. *
|
||||
* *
|
||||
* This program is distributed in the hope that it will be useful, *
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
|
||||
* GNU General Public License for more details. *
|
||||
* *
|
||||
* You should have received a copy of the GNU General Public License *
|
||||
* along with this program; if not, write to the Free Software Foundation *
|
||||
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. *
|
||||
******************************************************************************/
|
||||
|
||||
#include "cluster/clusterlink.h"
|
||||
|
||||
using namespace icinga;
|
||||
|
||||
ClusterLink::ClusterLink(const String& from, const String& to)
|
||||
{
|
||||
if (from < to) {
|
||||
From = from;
|
||||
To = to;
|
||||
} else {
|
||||
From = to;
|
||||
To = from;
|
||||
}
|
||||
}
|
||||
|
||||
int ClusterLink::GetMetric(void) const
|
||||
{
|
||||
int metric = 0;
|
||||
|
||||
Endpoint::Ptr fromEp = Endpoint::GetByName(From);
|
||||
if (fromEp)
|
||||
metric += fromEp->GetMetric();
|
||||
|
||||
Endpoint::Ptr toEp = Endpoint::GetByName(To);
|
||||
if (toEp)
|
||||
metric += toEp->GetMetric();
|
||||
|
||||
return metric;
|
||||
}
|
||||
|
||||
bool ClusterLink::operator<(const ClusterLink& other) const
|
||||
{
|
||||
if (From < other.From)
|
||||
return true;
|
||||
else
|
||||
return To < other.To;
|
||||
}
|
||||
|
||||
bool ClusterLinkMetricLessComparer::operator()(const ClusterLink& a, const ClusterLink& b) const
|
||||
{
|
||||
int metricA = a.GetMetric();
|
||||
int metricB = b.GetMetric();
|
||||
|
||||
if (metricA < metricB)
|
||||
return true;
|
||||
else if (metricB > metricA)
|
||||
return false;
|
||||
else
|
||||
return a < b;
|
||||
}
|
|
@ -0,0 +1,49 @@
|
|||
/******************************************************************************
|
||||
* Icinga 2 *
|
||||
* Copyright (C) 2012-2014 Icinga Development Team (http://www.icinga.org) *
|
||||
* *
|
||||
* This program is free software; you can redistribute it and/or *
|
||||
* modify it under the terms of the GNU General Public License *
|
||||
* as published by the Free Software Foundation; either version 2 *
|
||||
* of the License, or (at your option) any later version. *
|
||||
* *
|
||||
* This program is distributed in the hope that it will be useful, *
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
|
||||
* GNU General Public License for more details. *
|
||||
* *
|
||||
* You should have received a copy of the GNU General Public License *
|
||||
* along with this program; if not, write to the Free Software Foundation *
|
||||
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. *
|
||||
******************************************************************************/
|
||||
|
||||
#ifndef CLUSTERLINK_H
|
||||
#define CLUSTERLINK_H
|
||||
|
||||
#include "remote/endpoint.h"
|
||||
|
||||
namespace icinga
|
||||
{
|
||||
|
||||
/**
|
||||
* @ingroup cluster
|
||||
*/
|
||||
struct ClusterLink
|
||||
{
|
||||
String From;
|
||||
String To;
|
||||
|
||||
ClusterLink(const String& from, const String& to);
|
||||
|
||||
int GetMetric(void) const;
|
||||
bool operator<(const ClusterLink& other) const;
|
||||
};
|
||||
|
||||
struct ClusterLinkMetricLessComparer
|
||||
{
|
||||
bool operator()(const ClusterLink& a, const ClusterLink& b) const;
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
#endif /* CLUSTERLINK_H */
|
|
@ -222,9 +222,9 @@ void ClusterListener::AddConnection(const String& node, const String& service) {
|
|||
Utility::QueueAsyncCallback(boost::bind(&ClusterListener::NewClientHandler, this, client, TlsRoleClient));
|
||||
}
|
||||
|
||||
void ClusterListener::AsyncRelayMessage(const Endpoint::Ptr& source, const Dictionary::Ptr& message, bool persistent)
|
||||
void ClusterListener::AsyncRelayMessage(const Endpoint::Ptr& source, const Endpoint::Ptr& destination, const Dictionary::Ptr& message, bool persistent)
|
||||
{
|
||||
m_RelayQueue.Enqueue(boost::bind(&ClusterListener::RelayMessage, this, source, message, persistent));
|
||||
m_RelayQueue.Enqueue(boost::bind(&ClusterListener::RelayMessage, this, source, destination, message, persistent));
|
||||
}
|
||||
|
||||
void ClusterListener::PersistMessage(const Endpoint::Ptr& source, const Dictionary::Ptr& message)
|
||||
|
@ -256,7 +256,7 @@ void ClusterListener::PersistMessage(const Endpoint::Ptr& source, const Dictiona
|
|||
}
|
||||
}
|
||||
|
||||
void ClusterListener::RelayMessage(const Endpoint::Ptr& source, const Dictionary::Ptr& message, bool persistent)
|
||||
void ClusterListener::RelayMessage(const Endpoint::Ptr& source, const Endpoint::Ptr& destination, const Dictionary::Ptr& message, bool persistent)
|
||||
{
|
||||
double ts = Utility::GetTime();
|
||||
message->Set("ts", ts);
|
||||
|
@ -288,8 +288,16 @@ void ClusterListener::RelayMessage(const Endpoint::Ptr& source, const Dictionary
|
|||
privs = security->Get("privs");
|
||||
}
|
||||
|
||||
double now = Utility::GetTime();
|
||||
|
||||
BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects<Endpoint>()) {
|
||||
if (!persistent && !endpoint->IsConnected())
|
||||
if (!endpoint->IsConnected())
|
||||
continue;
|
||||
|
||||
if (destination && endpoint != destination)
|
||||
continue;
|
||||
|
||||
if (!destination && endpoint->GetBlockedUntil() > now)
|
||||
continue;
|
||||
|
||||
if (endpoint == source)
|
||||
|
@ -526,16 +534,12 @@ void ClusterListener::NewClientHandler(const Socket::Ptr& client, TlsRole role)
|
|||
{
|
||||
CONTEXT("Handling new cluster client connection");
|
||||
|
||||
NetworkStream::Ptr netStream = make_shared<NetworkStream>(client);
|
||||
|
||||
TlsStream::Ptr tlsStream = make_shared<TlsStream>(netStream, role, m_SSLContext);
|
||||
TlsStream::Ptr tlsStream = make_shared<TlsStream>(client, role, m_SSLContext);
|
||||
tlsStream->Handshake();
|
||||
|
||||
shared_ptr<X509> cert = tlsStream->GetPeerCertificate();
|
||||
String identity = GetCertificateCN(cert);
|
||||
|
||||
Log(LogInformation, "cluster", "New client connection for identity '" + identity + "'");
|
||||
|
||||
Endpoint::Ptr endpoint = Endpoint::GetByName(identity);
|
||||
|
||||
if (!endpoint) {
|
||||
|
@ -544,6 +548,13 @@ void ClusterListener::NewClientHandler(const Socket::Ptr& client, TlsRole role)
|
|||
return;
|
||||
}
|
||||
|
||||
if (endpoint->GetClient()) {
|
||||
tlsStream->Close();
|
||||
return;
|
||||
}
|
||||
|
||||
Log(LogInformation, "cluster", "New client connection for identity '" + identity + "'");
|
||||
|
||||
{
|
||||
ObjectLock olock(endpoint);
|
||||
|
||||
|
@ -595,26 +606,109 @@ void ClusterListener::NewClientHandler(const Socket::Ptr& client, TlsRole role)
|
|||
ReplayLog(endpoint, tlsStream);
|
||||
}
|
||||
|
||||
void ClusterListener::UpdateLinks(void)
|
||||
{
|
||||
ObjectLock olock(this);
|
||||
/* build a set of potential routes */
|
||||
std::set<ClusterLink> links;
|
||||
std::pair<String, EndpointPeerInfo> kv;
|
||||
BOOST_FOREACH(kv, m_VisibleEndpoints) {
|
||||
String endpoint = kv.first;
|
||||
const EndpointPeerInfo& epi = kv.second;
|
||||
|
||||
if (GetIdentity() == endpoint)
|
||||
continue;
|
||||
|
||||
if (epi.Seen > Utility::GetTime() - 30)
|
||||
links.insert(ClusterLink(GetIdentity(), endpoint));
|
||||
|
||||
if (!epi.Peers)
|
||||
continue;
|
||||
|
||||
ObjectLock olock(epi.Peers);
|
||||
BOOST_FOREACH(const String& peer, epi.Peers)
|
||||
links.insert(ClusterLink(endpoint, peer));
|
||||
}
|
||||
olock.Unlock();
|
||||
|
||||
/* sort the routes by metric */
|
||||
std::vector<ClusterLink> sortedLinks;
|
||||
std::copy(links.begin(), links.end(), std::back_inserter(sortedLinks));
|
||||
std::sort(sortedLinks.begin(), sortedLinks.end(), ClusterLinkMetricLessComparer());
|
||||
|
||||
/* pick routes */
|
||||
std::set<String> visitedEndpoints;
|
||||
BOOST_FOREACH(const ClusterLink& link, sortedLinks) {
|
||||
Endpoint::Ptr other;
|
||||
|
||||
if (link.From == GetIdentity())
|
||||
other = Endpoint::GetByName(link.To);
|
||||
else if (link.To == GetIdentity())
|
||||
other = Endpoint::GetByName(link.From);
|
||||
|
||||
if (visitedEndpoints.find(link.From) != visitedEndpoints.end() &&
|
||||
visitedEndpoints.find(link.To) != visitedEndpoints.end()) {
|
||||
if (other) {
|
||||
Log(LogInformation, "cluster", "Blocking link to '" + other->GetName() + "'");
|
||||
|
||||
Dictionary::Ptr message = make_shared<Dictionary>();
|
||||
message->Set("jsonrpc", "2.0");
|
||||
message->Set("method", "cluster::BlockLink");
|
||||
message->Set("params", make_shared<Dictionary>());
|
||||
|
||||
AsyncRelayMessage(Endpoint::Ptr(), other, message, false);
|
||||
}
|
||||
|
||||
continue;
|
||||
}
|
||||
|
||||
visitedEndpoints.insert(link.From);
|
||||
visitedEndpoints.insert(link.To);
|
||||
}
|
||||
}
|
||||
|
||||
void ClusterListener::ClusterTimerHandler(void)
|
||||
{
|
||||
/* broadcast a heartbeat message */
|
||||
Dictionary::Ptr params = make_shared<Dictionary>();
|
||||
params->Set("identity", GetIdentity());
|
||||
/* Update endpoint routes */
|
||||
UpdateLinks();
|
||||
|
||||
/* Eww. */
|
||||
Dictionary::Ptr features = make_shared<Dictionary>();
|
||||
features->Set("checker", SupportsChecks() ? 1 : 0);
|
||||
features->Set("notification", SupportsNotifications() ? 1 : 0);
|
||||
params->Set("features", features);
|
||||
features->Set("checker", SupportsChecks());
|
||||
features->Set("notification", SupportsNotifications());
|
||||
|
||||
Dictionary::Ptr message = make_shared<Dictionary>();
|
||||
message->Set("jsonrpc", "2.0");
|
||||
message->Set("method", "cluster::HeartBeat");
|
||||
message->Set("params", params);
|
||||
/* broadcast a heartbeat message */
|
||||
BOOST_FOREACH(const Endpoint::Ptr& destination, DynamicType::GetObjects<Endpoint>()) {
|
||||
std::set<String> connected_endpoints;
|
||||
|
||||
Endpoint::GetByName(GetIdentity())->SetFeatures(features);
|
||||
BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects<Endpoint>()) {
|
||||
if (endpoint->GetName() == GetIdentity())
|
||||
continue;
|
||||
|
||||
AsyncRelayMessage(Endpoint::Ptr(), message, false);
|
||||
if (!endpoint->IsConnected())
|
||||
continue;
|
||||
|
||||
connected_endpoints.insert(endpoint->GetName());
|
||||
}
|
||||
|
||||
Array::Ptr epnames = make_shared<Array>();
|
||||
BOOST_FOREACH(const String& name, connected_endpoints)
|
||||
epnames->Add(name);
|
||||
|
||||
Dictionary::Ptr params = make_shared<Dictionary>();
|
||||
params->Set("identity", GetIdentity());
|
||||
params->Set("features", features);
|
||||
params->Set("connected_endpoints", epnames);
|
||||
|
||||
Dictionary::Ptr message = make_shared<Dictionary>();
|
||||
message->Set("jsonrpc", "2.0");
|
||||
message->Set("method", "cluster::HeartBeat");
|
||||
message->Set("params", params);
|
||||
|
||||
Endpoint::GetByName(GetIdentity())->SetFeatures(features);
|
||||
|
||||
AsyncRelayMessage(Endpoint::Ptr(), destination, message, false);
|
||||
}
|
||||
|
||||
{
|
||||
ObjectLock olock(this);
|
||||
|
@ -623,6 +717,8 @@ void ClusterListener::ClusterTimerHandler(void)
|
|||
if (endpoint->GetSeen() > Utility::GetTime() - 60)
|
||||
continue;
|
||||
|
||||
m_VisibleEndpoints.erase(endpoint->GetName());
|
||||
|
||||
Stream::Ptr client = endpoint->GetClient();
|
||||
|
||||
if (client) {
|
||||
|
@ -729,7 +825,7 @@ void ClusterListener::CheckResultHandler(const Checkable::Ptr& checkable, const
|
|||
|
||||
SetSecurityInfo(message, checkable, DomainPrivRead);
|
||||
|
||||
AsyncRelayMessage(Endpoint::Ptr(), message, true);
|
||||
AsyncRelayMessage(Endpoint::Ptr(), Endpoint::Ptr(), message, true);
|
||||
}
|
||||
|
||||
void ClusterListener::NextCheckChangedHandler(const Checkable::Ptr& checkable, double nextCheck, const String& authority)
|
||||
|
@ -749,7 +845,7 @@ void ClusterListener::NextCheckChangedHandler(const Checkable::Ptr& checkable, d
|
|||
|
||||
SetSecurityInfo(message, checkable, DomainPrivRead);
|
||||
|
||||
AsyncRelayMessage(Endpoint::Ptr(), message, true);
|
||||
AsyncRelayMessage(Endpoint::Ptr(), Endpoint::Ptr(), message, true);
|
||||
}
|
||||
|
||||
void ClusterListener::NextNotificationChangedHandler(const Notification::Ptr& notification, double nextNotification, const String& authority)
|
||||
|
@ -768,7 +864,7 @@ void ClusterListener::NextNotificationChangedHandler(const Notification::Ptr& no
|
|||
|
||||
SetSecurityInfo(message, notification->GetCheckable(), DomainPrivRead);
|
||||
|
||||
AsyncRelayMessage(Endpoint::Ptr(), message, true);
|
||||
AsyncRelayMessage(Endpoint::Ptr(), Endpoint::Ptr(), message, true);
|
||||
}
|
||||
|
||||
void ClusterListener::ForceNextCheckChangedHandler(const Checkable::Ptr& checkable, bool forced, const String& authority)
|
||||
|
@ -788,7 +884,7 @@ void ClusterListener::ForceNextCheckChangedHandler(const Checkable::Ptr& checkab
|
|||
|
||||
SetSecurityInfo(message, checkable, DomainPrivRead);
|
||||
|
||||
AsyncRelayMessage(Endpoint::Ptr(), message, true);
|
||||
AsyncRelayMessage(Endpoint::Ptr(), Endpoint::Ptr(), message, true);
|
||||
}
|
||||
|
||||
void ClusterListener::ForceNextNotificationChangedHandler(const Checkable::Ptr& checkable, bool forced, const String& authority)
|
||||
|
@ -808,7 +904,7 @@ void ClusterListener::ForceNextNotificationChangedHandler(const Checkable::Ptr&
|
|||
|
||||
SetSecurityInfo(message, checkable, DomainPrivRead);
|
||||
|
||||
AsyncRelayMessage(Endpoint::Ptr(), message, true);
|
||||
AsyncRelayMessage(Endpoint::Ptr(), Endpoint::Ptr(), message, true);
|
||||
}
|
||||
|
||||
void ClusterListener::EnableActiveChecksChangedHandler(const Checkable::Ptr& checkable, bool enabled, const String& authority)
|
||||
|
@ -828,7 +924,7 @@ void ClusterListener::EnableActiveChecksChangedHandler(const Checkable::Ptr& che
|
|||
|
||||
SetSecurityInfo(message, checkable, DomainPrivRead);
|
||||
|
||||
AsyncRelayMessage(Endpoint::Ptr(), message, true);
|
||||
AsyncRelayMessage(Endpoint::Ptr(), Endpoint::Ptr(), message, true);
|
||||
}
|
||||
|
||||
void ClusterListener::EnablePassiveChecksChangedHandler(const Checkable::Ptr& checkable, bool enabled, const String& authority)
|
||||
|
@ -848,7 +944,7 @@ void ClusterListener::EnablePassiveChecksChangedHandler(const Checkable::Ptr& ch
|
|||
|
||||
SetSecurityInfo(message, checkable, DomainPrivRead);
|
||||
|
||||
AsyncRelayMessage(Endpoint::Ptr(), message, true);
|
||||
AsyncRelayMessage(Endpoint::Ptr(), Endpoint::Ptr(), message, true);
|
||||
}
|
||||
|
||||
void ClusterListener::EnableNotificationsChangedHandler(const Checkable::Ptr& checkable, bool enabled, const String& authority)
|
||||
|
@ -868,7 +964,7 @@ void ClusterListener::EnableNotificationsChangedHandler(const Checkable::Ptr& ch
|
|||
|
||||
SetSecurityInfo(message, checkable, DomainPrivRead);
|
||||
|
||||
AsyncRelayMessage(Endpoint::Ptr(), message, true);
|
||||
AsyncRelayMessage(Endpoint::Ptr(), Endpoint::Ptr(), message, true);
|
||||
}
|
||||
|
||||
void ClusterListener::EnableFlappingChangedHandler(const Checkable::Ptr& checkable, bool enabled, const String& authority)
|
||||
|
@ -888,7 +984,7 @@ void ClusterListener::EnableFlappingChangedHandler(const Checkable::Ptr& checkab
|
|||
|
||||
SetSecurityInfo(message, checkable, DomainPrivRead);
|
||||
|
||||
AsyncRelayMessage(Endpoint::Ptr(), message, true);
|
||||
AsyncRelayMessage(Endpoint::Ptr(), Endpoint::Ptr(), message, true);
|
||||
}
|
||||
|
||||
void ClusterListener::CommentAddedHandler(const Checkable::Ptr& checkable, const Comment::Ptr& comment, const String& authority)
|
||||
|
@ -908,7 +1004,7 @@ void ClusterListener::CommentAddedHandler(const Checkable::Ptr& checkable, const
|
|||
|
||||
SetSecurityInfo(message, checkable, DomainPrivRead);
|
||||
|
||||
AsyncRelayMessage(Endpoint::Ptr(), message, true);
|
||||
AsyncRelayMessage(Endpoint::Ptr(), Endpoint::Ptr(), message, true);
|
||||
}
|
||||
|
||||
void ClusterListener::CommentRemovedHandler(const Checkable::Ptr& checkable, const Comment::Ptr& comment, const String& authority)
|
||||
|
@ -928,7 +1024,7 @@ void ClusterListener::CommentRemovedHandler(const Checkable::Ptr& checkable, con
|
|||
|
||||
SetSecurityInfo(message, checkable, DomainPrivRead);
|
||||
|
||||
AsyncRelayMessage(Endpoint::Ptr(), message, true);
|
||||
AsyncRelayMessage(Endpoint::Ptr(), Endpoint::Ptr(), message, true);
|
||||
}
|
||||
|
||||
void ClusterListener::DowntimeAddedHandler(const Checkable::Ptr& checkable, const Downtime::Ptr& downtime, const String& authority)
|
||||
|
@ -948,7 +1044,7 @@ void ClusterListener::DowntimeAddedHandler(const Checkable::Ptr& checkable, cons
|
|||
|
||||
SetSecurityInfo(message, checkable, DomainPrivRead);
|
||||
|
||||
AsyncRelayMessage(Endpoint::Ptr(), message, true);
|
||||
AsyncRelayMessage(Endpoint::Ptr(), Endpoint::Ptr(), message, true);
|
||||
}
|
||||
|
||||
void ClusterListener::DowntimeRemovedHandler(const Checkable::Ptr& checkable, const Downtime::Ptr& downtime, const String& authority)
|
||||
|
@ -968,7 +1064,7 @@ void ClusterListener::DowntimeRemovedHandler(const Checkable::Ptr& checkable, co
|
|||
|
||||
SetSecurityInfo(message, checkable, DomainPrivRead);
|
||||
|
||||
AsyncRelayMessage(Endpoint::Ptr(), message, true);
|
||||
AsyncRelayMessage(Endpoint::Ptr(), Endpoint::Ptr(), message, true);
|
||||
}
|
||||
|
||||
void ClusterListener::AcknowledgementSetHandler(const Checkable::Ptr& checkable, const String& author, const String& comment, AcknowledgementType type, double expiry, const String& authority)
|
||||
|
@ -991,7 +1087,7 @@ void ClusterListener::AcknowledgementSetHandler(const Checkable::Ptr& checkable,
|
|||
|
||||
SetSecurityInfo(message, checkable, DomainPrivRead);
|
||||
|
||||
AsyncRelayMessage(Endpoint::Ptr(), message, true);
|
||||
AsyncRelayMessage(Endpoint::Ptr(), Endpoint::Ptr(), message, true);
|
||||
}
|
||||
|
||||
void ClusterListener::AcknowledgementClearedHandler(const Checkable::Ptr& checkable, const String& authority)
|
||||
|
@ -1010,7 +1106,7 @@ void ClusterListener::AcknowledgementClearedHandler(const Checkable::Ptr& checka
|
|||
|
||||
SetSecurityInfo(message, checkable, DomainPrivRead);
|
||||
|
||||
AsyncRelayMessage(Endpoint::Ptr(), message, true);
|
||||
AsyncRelayMessage(Endpoint::Ptr(), Endpoint::Ptr(), message, true);
|
||||
}
|
||||
|
||||
void ClusterListener::AsyncMessageHandler(const Endpoint::Ptr& sender, const Dictionary::Ptr& message)
|
||||
|
@ -1047,9 +1143,27 @@ void ClusterListener::MessageHandler(const Endpoint::Ptr& sender, const Dictiona
|
|||
|
||||
sender->SendMessage(lmessage);
|
||||
|
||||
Log(LogInformation, "cluster", "Acknowledging log position for identity '" + sender->GetName() + "': " + Utility::FormatDateTime("%Y/%m/%d %H:%M:%S", message->Get("ts")));
|
||||
sender->SetRemoteLogPosition(message->Get("ts"));
|
||||
|
||||
Log(LogInformation, "cluster", "Acknowledging log position for identity '" + sender->GetName() + "': " + Utility::FormatDateTime("%Y/%m/%d %H:%M:%S", message->Get("ts")));
|
||||
ObjectLock olock(this);
|
||||
const EndpointPeerInfo& epi = m_VisibleEndpoints[sender->GetName()];
|
||||
|
||||
if (epi.Peers) {
|
||||
ObjectLock olock(epi.Peers);
|
||||
BOOST_FOREACH(const String& epname, epi.Peers) {
|
||||
if (epname == GetIdentity())
|
||||
continue;
|
||||
|
||||
Endpoint::Ptr peer_endpoint = Endpoint::GetByName(epname);
|
||||
|
||||
if (!peer_endpoint)
|
||||
continue;
|
||||
|
||||
Log(LogInformation, "cluster", "Acknowledging log position for identity '" + peer_endpoint->GetName() + "' (via '" + sender->GetName() + "'): " + Utility::FormatDateTime("%Y/%m/%d %H:%M:%S", message->Get("ts")));
|
||||
peer_endpoint->SetRemoteLogPosition(message->Get("ts"));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1061,6 +1175,14 @@ void ClusterListener::MessageHandler(const Endpoint::Ptr& sender, const Dictiona
|
|||
|
||||
String identity = params->Get("identity");
|
||||
|
||||
{
|
||||
ObjectLock olock(this);
|
||||
EndpointPeerInfo epi;
|
||||
epi.Seen = Utility::GetTime();
|
||||
epi.Peers = params->Get("connected_endpoints");
|
||||
m_VisibleEndpoints[identity] = epi;
|
||||
}
|
||||
|
||||
Endpoint::Ptr endpoint = Endpoint::GetByName(identity);
|
||||
|
||||
if (endpoint) {
|
||||
|
@ -1068,7 +1190,10 @@ void ClusterListener::MessageHandler(const Endpoint::Ptr& sender, const Dictiona
|
|||
endpoint->SetFeatures(params->Get("features"));
|
||||
}
|
||||
|
||||
AsyncRelayMessage(sender, message, true);
|
||||
AsyncRelayMessage(sender, Endpoint::Ptr(), message, false);
|
||||
} else if (message->Get("method") == "cluster::BlockLink") {
|
||||
Log(LogDebug, "cluster", "Got cluster::BlockLink message. Blocking direct link for '" + sender->GetName() + "'");
|
||||
sender->SetBlockedUntil(Utility::GetTime() + 30);
|
||||
} else if (message->Get("method") == "cluster::CheckResult") {
|
||||
if (!params)
|
||||
return;
|
||||
|
@ -1079,9 +1204,9 @@ void ClusterListener::MessageHandler(const Endpoint::Ptr& sender, const Dictiona
|
|||
Checkable::Ptr checkable;
|
||||
|
||||
if (type == "Host")
|
||||
checkable = DynamicObject::GetObject<Host>(chk);
|
||||
checkable = Host::GetByName(chk);
|
||||
else if (type == "Service")
|
||||
checkable = DynamicObject::GetObject<Service>(chk);
|
||||
checkable = Service::GetByName(chk);
|
||||
else
|
||||
return;
|
||||
|
||||
|
@ -1100,7 +1225,7 @@ void ClusterListener::MessageHandler(const Endpoint::Ptr& sender, const Dictiona
|
|||
|
||||
checkable->ProcessCheckResult(cr, sender->GetName());
|
||||
|
||||
AsyncRelayMessage(sender, message, true);
|
||||
AsyncRelayMessage(sender, Endpoint::Ptr(), message, true);
|
||||
} else if (message->Get("method") == "cluster::SetNextCheck") {
|
||||
if (!params)
|
||||
return;
|
||||
|
@ -1129,7 +1254,7 @@ void ClusterListener::MessageHandler(const Endpoint::Ptr& sender, const Dictiona
|
|||
|
||||
checkable->SetNextCheck(nextCheck, sender->GetName());
|
||||
|
||||
AsyncRelayMessage(sender, message, true);
|
||||
AsyncRelayMessage(sender, Endpoint::Ptr(), message, true);
|
||||
} else if (message->Get("method") == "cluster::SetForceNextCheck") {
|
||||
if (!params)
|
||||
return;
|
||||
|
@ -1158,7 +1283,7 @@ void ClusterListener::MessageHandler(const Endpoint::Ptr& sender, const Dictiona
|
|||
|
||||
checkable->SetForceNextCheck(forced, sender->GetName());
|
||||
|
||||
AsyncRelayMessage(sender, message, true);
|
||||
AsyncRelayMessage(sender, Endpoint::Ptr(), message, true);
|
||||
} else if (message->Get("method") == "cluster::SetForceNextNotification") {
|
||||
if (!params)
|
||||
return;
|
||||
|
@ -1187,7 +1312,7 @@ void ClusterListener::MessageHandler(const Endpoint::Ptr& sender, const Dictiona
|
|||
|
||||
checkable->SetForceNextNotification(forced, sender->GetName());
|
||||
|
||||
AsyncRelayMessage(sender, message, true);
|
||||
AsyncRelayMessage(sender, Endpoint::Ptr(), message, true);
|
||||
} else if (message->Get("method") == "cluster::SetEnableActiveChecks") {
|
||||
if (!params)
|
||||
return;
|
||||
|
@ -1216,7 +1341,7 @@ void ClusterListener::MessageHandler(const Endpoint::Ptr& sender, const Dictiona
|
|||
|
||||
checkable->SetEnableActiveChecks(enabled, sender->GetName());
|
||||
|
||||
AsyncRelayMessage(sender, message, true);
|
||||
AsyncRelayMessage(sender, Endpoint::Ptr(), message, true);
|
||||
} else if (message->Get("method") == "cluster::SetEnablePassiveChecks") {
|
||||
if (!params)
|
||||
return;
|
||||
|
@ -1245,7 +1370,7 @@ void ClusterListener::MessageHandler(const Endpoint::Ptr& sender, const Dictiona
|
|||
|
||||
checkable->SetEnablePassiveChecks(enabled, sender->GetName());
|
||||
|
||||
AsyncRelayMessage(sender, message, true);
|
||||
AsyncRelayMessage(sender, Endpoint::Ptr(), message, true);
|
||||
} else if (message->Get("method") == "cluster::SetEnableNotifications") {
|
||||
if (!params)
|
||||
return;
|
||||
|
@ -1274,7 +1399,7 @@ void ClusterListener::MessageHandler(const Endpoint::Ptr& sender, const Dictiona
|
|||
|
||||
checkable->SetEnableNotifications(enabled, sender->GetName());
|
||||
|
||||
AsyncRelayMessage(sender, message, true);
|
||||
AsyncRelayMessage(sender, Endpoint::Ptr(), message, true);
|
||||
} else if (message->Get("method") == "cluster::SetEnableFlapping") {
|
||||
if (!params)
|
||||
return;
|
||||
|
@ -1303,7 +1428,7 @@ void ClusterListener::MessageHandler(const Endpoint::Ptr& sender, const Dictiona
|
|||
|
||||
checkable->SetEnableFlapping(enabled, sender->GetName());
|
||||
|
||||
AsyncRelayMessage(sender, message, true);
|
||||
AsyncRelayMessage(sender, Endpoint::Ptr(), message, true);
|
||||
} else if (message->Get("method") == "cluster::SetNextNotification") {
|
||||
if (!params)
|
||||
return;
|
||||
|
@ -1322,11 +1447,11 @@ void ClusterListener::MessageHandler(const Endpoint::Ptr& sender, const Dictiona
|
|||
return;
|
||||
}
|
||||
|
||||
bool nextNotification = params->Get("next_notification");
|
||||
double nextNotification = params->Get("next_notification");
|
||||
|
||||
notification->SetNextNotification(nextNotification, sender->GetName());
|
||||
|
||||
AsyncRelayMessage(sender, message, true);
|
||||
AsyncRelayMessage(sender, Endpoint::Ptr(), message, true);
|
||||
} else if (message->Get("method") == "cluster::AddComment") {
|
||||
if (!params)
|
||||
return;
|
||||
|
@ -1356,7 +1481,7 @@ void ClusterListener::MessageHandler(const Endpoint::Ptr& sender, const Dictiona
|
|||
checkable->AddComment(comment->GetEntryType(), comment->GetAuthor(),
|
||||
comment->GetText(), comment->GetExpireTime(), comment->GetId(), sender->GetName());
|
||||
|
||||
AsyncRelayMessage(sender, message, true);
|
||||
AsyncRelayMessage(sender, Endpoint::Ptr(), message, true);
|
||||
} else if (message->Get("method") == "cluster::RemoveComment") {
|
||||
if (!params)
|
||||
return;
|
||||
|
@ -1385,7 +1510,7 @@ void ClusterListener::MessageHandler(const Endpoint::Ptr& sender, const Dictiona
|
|||
|
||||
checkable->RemoveComment(id, sender->GetName());
|
||||
|
||||
AsyncRelayMessage(sender, message, true);
|
||||
AsyncRelayMessage(sender, Endpoint::Ptr(), message, true);
|
||||
} else if (message->Get("method") == "cluster::AddDowntime") {
|
||||
if (!params)
|
||||
return;
|
||||
|
@ -1418,7 +1543,7 @@ void ClusterListener::MessageHandler(const Endpoint::Ptr& sender, const Dictiona
|
|||
downtime->GetDuration(), downtime->GetScheduledBy(),
|
||||
downtime->GetId(), sender->GetName());
|
||||
|
||||
AsyncRelayMessage(sender, message, true);
|
||||
AsyncRelayMessage(sender, Endpoint::Ptr(), message, true);
|
||||
} else if (message->Get("method") == "cluster::RemoveDowntime") {
|
||||
if (!params)
|
||||
return;
|
||||
|
@ -1447,7 +1572,7 @@ void ClusterListener::MessageHandler(const Endpoint::Ptr& sender, const Dictiona
|
|||
|
||||
checkable->RemoveDowntime(id, false, sender->GetName());
|
||||
|
||||
AsyncRelayMessage(sender, message, true);
|
||||
AsyncRelayMessage(sender, Endpoint::Ptr(), message, true);
|
||||
} else if (message->Get("method") == "cluster::SetAcknowledgement") {
|
||||
if (!params)
|
||||
return;
|
||||
|
@ -1479,7 +1604,7 @@ void ClusterListener::MessageHandler(const Endpoint::Ptr& sender, const Dictiona
|
|||
|
||||
checkable->AcknowledgeProblem(author, comment, static_cast<AcknowledgementType>(acktype), expiry, sender->GetName());
|
||||
|
||||
AsyncRelayMessage(sender, message, true);
|
||||
AsyncRelayMessage(sender, Endpoint::Ptr(), message, true);
|
||||
} else if (message->Get("method") == "cluster::ClearAcknowledgement") {
|
||||
if (!params)
|
||||
return;
|
||||
|
@ -1509,7 +1634,7 @@ void ClusterListener::MessageHandler(const Endpoint::Ptr& sender, const Dictiona
|
|||
checkable->ClearAcknowledgement(sender->GetName());
|
||||
}
|
||||
|
||||
AsyncRelayMessage(sender, message, true);
|
||||
AsyncRelayMessage(sender, Endpoint::Ptr(), message, true);
|
||||
} else if (message->Get("method") == "cluster::SetLogPosition") {
|
||||
if (!params)
|
||||
return;
|
||||
|
@ -1616,19 +1741,21 @@ void ClusterListener::MessageHandler(const Endpoint::Ptr& sender, const Dictiona
|
|||
Application::RequestRestart();
|
||||
}
|
||||
|
||||
AsyncRelayMessage(sender, message, true);
|
||||
AsyncRelayMessage(sender, Endpoint::Ptr(), message, true);
|
||||
}
|
||||
}
|
||||
|
||||
bool ClusterListener::IsAuthority(const DynamicObject::Ptr& object, const String& type)
|
||||
{
|
||||
double now = Utility::GetTime();
|
||||
|
||||
Array::Ptr authorities = object->GetAuthorities();
|
||||
std::vector<String> endpoints;
|
||||
|
||||
BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects<Endpoint>()) {
|
||||
bool match = false;
|
||||
|
||||
if ((!endpoint->IsConnected() && endpoint->GetName() != GetIdentity()) || !endpoint->HasFeature(type))
|
||||
if ((endpoint->GetSeen() < now - 30 && endpoint->GetName() != GetIdentity()) || !endpoint->HasFeature(type))
|
||||
continue;
|
||||
|
||||
if (authorities) {
|
||||
|
@ -1691,22 +1818,12 @@ void ClusterListener::UpdateAuthority(void)
|
|||
|
||||
bool ClusterListener::SupportsChecks(void)
|
||||
{
|
||||
DynamicType::Ptr type = DynamicType::GetByName("CheckerComponent");
|
||||
|
||||
if (!type)
|
||||
return false;
|
||||
|
||||
return std::distance(type->GetObjects().first, type->GetObjects().second) > 0 && (IcingaApplication::GetInstance()->GetEnableHostChecks() || IcingaApplication::GetInstance()->GetEnableServiceChecks());
|
||||
return SupportsFeature("CheckerComponent") && (IcingaApplication::GetInstance()->GetEnableHostChecks() || IcingaApplication::GetInstance()->GetEnableServiceChecks());
|
||||
}
|
||||
|
||||
bool ClusterListener::SupportsNotifications(void)
|
||||
{
|
||||
DynamicType::Ptr type = DynamicType::GetByName("NotificationComponent");
|
||||
|
||||
if (!type)
|
||||
return false;
|
||||
|
||||
return std::distance(type->GetObjects().first, type->GetObjects().second) > 0 && IcingaApplication::GetInstance()->GetEnableNotifications();
|
||||
return SupportsFeature("NotificationComponent") && IcingaApplication::GetInstance()->GetEnableNotifications();
|
||||
}
|
||||
|
||||
bool ClusterListener::SupportsFeature(const String& name)
|
||||
|
@ -1735,9 +1852,9 @@ std::pair<Dictionary::Ptr, Dictionary::Ptr> ClusterListener::GetClusterStatus(vo
|
|||
BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects<Endpoint>()) {
|
||||
count_endpoints++;
|
||||
|
||||
if(!endpoint->IsConnected() && endpoint->GetName() != GetIdentity())
|
||||
if(!endpoint->IsAvailable() && endpoint->GetName() != GetIdentity())
|
||||
not_connected_endpoints->Add(endpoint->GetName());
|
||||
else if(endpoint->IsConnected() && endpoint->GetName() != GetIdentity())
|
||||
else if(endpoint->IsAvailable() && endpoint->GetName() != GetIdentity())
|
||||
connected_endpoints->Add(endpoint->GetName());
|
||||
}
|
||||
|
||||
|
@ -1753,4 +1870,3 @@ std::pair<Dictionary::Ptr, Dictionary::Ptr> ClusterListener::GetClusterStatus(vo
|
|||
|
||||
return std::make_pair(status, perfdata);
|
||||
}
|
||||
|
||||
|
|
|
@ -21,6 +21,7 @@
|
|||
#define CLUSTERLISTENER_H
|
||||
|
||||
#include "cluster/clusterlistener.th"
|
||||
#include "cluster/clusterlink.h"
|
||||
#include "base/dynamicobject.h"
|
||||
#include "base/timer.h"
|
||||
#include "base/array.h"
|
||||
|
@ -36,6 +37,15 @@
|
|||
namespace icinga
|
||||
{
|
||||
|
||||
/**
|
||||
* @ingroup cluster
|
||||
*/
|
||||
struct EndpointPeerInfo
|
||||
{
|
||||
double Seen;
|
||||
Array::Ptr Peers;
|
||||
};
|
||||
|
||||
/**
|
||||
* @ingroup cluster
|
||||
*/
|
||||
|
@ -75,8 +85,12 @@ private:
|
|||
void NewClientHandler(const Socket::Ptr& client, TlsRole role);
|
||||
void ListenerThreadProc(const Socket::Ptr& server);
|
||||
|
||||
void AsyncRelayMessage(const Endpoint::Ptr& source, const Dictionary::Ptr& message, bool persistent);
|
||||
void RelayMessage(const Endpoint::Ptr& source, const Dictionary::Ptr& message, bool persistent);
|
||||
std::map<String, EndpointPeerInfo> m_VisibleEndpoints;
|
||||
|
||||
void UpdateLinks(void);
|
||||
|
||||
void AsyncRelayMessage(const Endpoint::Ptr& source, const Endpoint::Ptr& destination, const Dictionary::Ptr& message, bool persistent);
|
||||
void RelayMessage(const Endpoint::Ptr& source, const Endpoint::Ptr& destination, const Dictionary::Ptr& message, bool persistent);
|
||||
|
||||
void OpenLogFile(void);
|
||||
void RotateLogFile(void);
|
||||
|
|
|
@ -942,6 +942,8 @@ Example:
|
|||
host = "192.168.5.46"
|
||||
port = 7777
|
||||
|
||||
metric = 0
|
||||
|
||||
config_files = [ "/etc/icinga2/cluster.d/*" ]
|
||||
|
||||
config_files_recursive = [
|
||||
|
@ -956,6 +958,7 @@ Attributes:
|
|||
----------------|----------------
|
||||
host |**Required.** The hostname/IP address of the remote Icinga 2 instance.
|
||||
port |**Required.** The service name/port of the remote Icinga 2 instance.
|
||||
metric |**Optional.** The link metric for this endpoint. Defaults to 0.
|
||||
config\_files |**Optional.** A list of configuration files sent to remote peers (wildcards possible).
|
||||
config_files_recursive |**Optional.** A list of configuration files sent to remote peers. Array elements can either be a string (in which case all files in that directory matching the pattern *.conf are included) or a dictionary with elements "path" and "pattern".
|
||||
accept\_config |**Optional.** A list of endpoint names from which this endpoint accepts configuration files.
|
||||
|
|
|
@ -28,6 +28,10 @@
|
|||
#include <boost/exception/errinfo_errno.hpp>
|
||||
#include <boost/exception/errinfo_file_name.hpp>
|
||||
|
||||
#ifndef _WIN32
|
||||
# include <poll.h>
|
||||
#endif /* _WIN32 */
|
||||
|
||||
using namespace icinga;
|
||||
|
||||
/**
|
||||
|
@ -284,3 +288,45 @@ Socket::Ptr Socket::Accept(void)
|
|||
|
||||
return make_shared<Socket>(fd);
|
||||
}
|
||||
|
||||
void Socket::Poll(bool read, bool write)
|
||||
{
|
||||
#ifdef _WIN32
|
||||
fd_set readfds, writefds, exceptfds;
|
||||
|
||||
FD_ZERO(&readfds);
|
||||
if (read)
|
||||
FD_SET(GetFD(), &readfds);
|
||||
|
||||
FD_ZERO(&writefds);
|
||||
if (write)
|
||||
FD_SET(GetFD(), &writefds);
|
||||
|
||||
FD_ZERO(&exceptfds);
|
||||
FD_SET(GetFD(), &exceptfds);
|
||||
|
||||
if (select(GetFD() + 1, &readfds, &writefds, &exceptfds, NULL) < 0)
|
||||
BOOST_THROW_EXCEPTION(socket_error()
|
||||
<< boost::errinfo_api_function("select")
|
||||
<< errinfo_win32_error(WSAGetLastError()));
|
||||
#else /* _WIN32 */
|
||||
pollfd pfd;
|
||||
pfd.fd = GetFD();
|
||||
pfd.events = (read ? POLLIN : 0) | (write ? POLLOUT : 0);
|
||||
pfd.revents = 0;
|
||||
|
||||
if (poll(&pfd, 1, -1) < 0)
|
||||
BOOST_THROW_EXCEPTION(socket_error()
|
||||
<< boost::errinfo_api_function("poll")
|
||||
<< boost::errinfo_errno(errno));
|
||||
#endif /* _WIN32 */
|
||||
}
|
||||
|
||||
void Socket::MakeNonBlocking(void)
|
||||
{
|
||||
#ifdef _WIN32
|
||||
Utility::SetNonBlockingSocket(GetFD());
|
||||
#else /* _WIN32 */
|
||||
Utility::SetNonBlocking(GetFD());
|
||||
#endif /* _WIN32 */
|
||||
}
|
|
@ -43,6 +43,8 @@ public:
|
|||
Socket(SOCKET fd);
|
||||
~Socket(void);
|
||||
|
||||
SOCKET GetFD(void) const;
|
||||
|
||||
void Close(void);
|
||||
|
||||
String GetClientAddress(void);
|
||||
|
@ -54,9 +56,12 @@ public:
|
|||
void Listen(void);
|
||||
Socket::Ptr Accept(void);
|
||||
|
||||
void Poll(bool read, bool write);
|
||||
|
||||
void MakeNonBlocking(void);
|
||||
|
||||
protected:
|
||||
void SetFD(SOCKET fd);
|
||||
SOCKET GetFD(void) const;
|
||||
|
||||
int GetError(void) const;
|
||||
|
||||
|
|
|
@ -18,7 +18,6 @@
|
|||
******************************************************************************/
|
||||
|
||||
#include "base/tlsstream.h"
|
||||
#include "base/stream_bio.h"
|
||||
#include "base/objectlock.h"
|
||||
#include "base/debug.h"
|
||||
#include "base/utility.h"
|
||||
|
@ -37,8 +36,8 @@ bool I2_EXPORT TlsStream::m_SSLIndexInitialized = false;
|
|||
* @param role The role of the client.
|
||||
* @param sslContext The SSL context for the client.
|
||||
*/
|
||||
TlsStream::TlsStream(const Stream::Ptr& innerStream, TlsRole role, shared_ptr<SSL_CTX> sslContext)
|
||||
: m_InnerStream(innerStream), m_Role(role)
|
||||
TlsStream::TlsStream(const Socket::Ptr& socket, TlsRole role, shared_ptr<SSL_CTX> sslContext)
|
||||
: m_Socket(socket), m_Role(role)
|
||||
{
|
||||
m_SSL = shared_ptr<SSL>(SSL_new(sslContext.get()), SSL_free);
|
||||
|
||||
|
@ -57,7 +56,10 @@ TlsStream::TlsStream(const Stream::Ptr& innerStream, TlsRole role, shared_ptr<SS
|
|||
|
||||
SSL_set_verify(m_SSL.get(), SSL_VERIFY_PEER | SSL_VERIFY_FAIL_IF_NO_PEER_CERT, NULL);
|
||||
|
||||
m_BIO = BIO_new_I2Stream(m_InnerStream);
|
||||
socket->MakeNonBlocking();
|
||||
|
||||
m_BIO = BIO_new_socket(socket->GetFD(), 0);
|
||||
BIO_set_nbio(m_BIO, 1);
|
||||
SSL_set_bio(m_SSL.get(), m_BIO, m_BIO);
|
||||
|
||||
if (m_Role == TlsRoleServer)
|
||||
|
@ -92,19 +94,29 @@ void TlsStream::Handshake(void)
|
|||
|
||||
int rc;
|
||||
|
||||
ObjectLock olock(this);
|
||||
for (;;) {
|
||||
int rc;
|
||||
|
||||
while ((rc = SSL_do_handshake(m_SSL.get())) <= 0) {
|
||||
switch (SSL_get_error(m_SSL.get(), rc)) {
|
||||
{
|
||||
ObjectLock olock(this);
|
||||
rc = SSL_do_handshake(m_SSL.get());
|
||||
}
|
||||
|
||||
if (rc > 0)
|
||||
break;
|
||||
|
||||
int err = SSL_get_error(m_SSL.get(), rc);
|
||||
switch (err) {
|
||||
case SSL_ERROR_WANT_READ:
|
||||
m_Socket->Poll(true, false);
|
||||
continue;
|
||||
case SSL_ERROR_WANT_WRITE:
|
||||
m_Socket->Poll(false, true);
|
||||
continue;
|
||||
case SSL_ERROR_ZERO_RETURN:
|
||||
Close();
|
||||
return;
|
||||
default:
|
||||
I2Stream_check_exception(m_BIO);
|
||||
BOOST_THROW_EXCEPTION(openssl_error()
|
||||
<< boost::errinfo_api_function("SSL_do_handshake")
|
||||
<< errinfo_openssl_error(ERR_get_error()));
|
||||
|
@ -121,22 +133,27 @@ size_t TlsStream::Read(void *buffer, size_t count)
|
|||
|
||||
size_t left = count;
|
||||
|
||||
ObjectLock olock(this);
|
||||
|
||||
while (left > 0) {
|
||||
int rc = SSL_read(m_SSL.get(), ((char *)buffer) + (count - left), left);
|
||||
int rc;
|
||||
|
||||
{
|
||||
ObjectLock olock(this);
|
||||
rc = SSL_read(m_SSL.get(), ((char *)buffer) + (count - left), left);
|
||||
}
|
||||
|
||||
if (rc <= 0) {
|
||||
switch (SSL_get_error(m_SSL.get(), rc)) {
|
||||
int err = SSL_get_error(m_SSL.get(), rc);
|
||||
switch (err) {
|
||||
case SSL_ERROR_WANT_READ:
|
||||
m_Socket->Poll(true, false);
|
||||
continue;
|
||||
case SSL_ERROR_WANT_WRITE:
|
||||
m_Socket->Poll(false, true);
|
||||
continue;
|
||||
case SSL_ERROR_ZERO_RETURN:
|
||||
Close();
|
||||
return count - left;
|
||||
default:
|
||||
I2Stream_check_exception(m_BIO);
|
||||
BOOST_THROW_EXCEPTION(openssl_error()
|
||||
<< boost::errinfo_api_function("SSL_read")
|
||||
<< errinfo_openssl_error(ERR_get_error()));
|
||||
|
@ -155,22 +172,27 @@ void TlsStream::Write(const void *buffer, size_t count)
|
|||
|
||||
size_t left = count;
|
||||
|
||||
ObjectLock olock(this);
|
||||
|
||||
while (left > 0) {
|
||||
int rc = SSL_write(m_SSL.get(), ((const char *)buffer) + (count - left), left);
|
||||
int rc;
|
||||
|
||||
{
|
||||
ObjectLock olock(this);
|
||||
rc = SSL_write(m_SSL.get(), ((const char *)buffer) + (count - left), left);
|
||||
}
|
||||
|
||||
if (rc <= 0) {
|
||||
switch (SSL_get_error(m_SSL.get(), rc)) {
|
||||
int err = SSL_get_error(m_SSL.get(), rc);
|
||||
switch (err) {
|
||||
case SSL_ERROR_WANT_READ:
|
||||
m_Socket->Poll(true, false);
|
||||
continue;
|
||||
case SSL_ERROR_WANT_WRITE:
|
||||
m_Socket->Poll(false, true);
|
||||
continue;
|
||||
case SSL_ERROR_ZERO_RETURN:
|
||||
Close();
|
||||
return;
|
||||
default:
|
||||
I2Stream_check_exception(m_BIO);
|
||||
BOOST_THROW_EXCEPTION(openssl_error()
|
||||
<< boost::errinfo_api_function("SSL_write")
|
||||
<< errinfo_openssl_error(ERR_get_error()));
|
||||
|
@ -186,10 +208,10 @@ void TlsStream::Write(const void *buffer, size_t count)
|
|||
*/
|
||||
void TlsStream::Close(void)
|
||||
{
|
||||
m_InnerStream->Close();
|
||||
m_Socket->Close();
|
||||
}
|
||||
|
||||
bool TlsStream::IsEof(void) const
|
||||
{
|
||||
return m_InnerStream->IsEof();
|
||||
return BIO_eof(m_BIO);
|
||||
}
|
||||
|
|
|
@ -21,7 +21,7 @@
|
|||
#define TLSSTREAM_H
|
||||
|
||||
#include "base/i2-base.h"
|
||||
#include "base/stream.h"
|
||||
#include "base/socket.h"
|
||||
#include "base/fifo.h"
|
||||
#include "base/tlsutility.h"
|
||||
|
||||
|
@ -44,7 +44,7 @@ class I2_BASE_API TlsStream : public Stream
|
|||
public:
|
||||
DECLARE_PTR_TYPEDEFS(TlsStream);
|
||||
|
||||
TlsStream(const Stream::Ptr& innerStream, TlsRole role, shared_ptr<SSL_CTX> sslContext);
|
||||
TlsStream(const Socket::Ptr& socket, TlsRole role, shared_ptr<SSL_CTX> sslContext);
|
||||
|
||||
shared_ptr<X509> GetClientCertificate(void) const;
|
||||
shared_ptr<X509> GetPeerCertificate(void) const;
|
||||
|
@ -62,7 +62,7 @@ private:
|
|||
shared_ptr<SSL> m_SSL;
|
||||
BIO *m_BIO;
|
||||
|
||||
Stream::Ptr m_InnerStream;
|
||||
Socket::Ptr m_Socket;
|
||||
TlsRole m_Role;
|
||||
|
||||
static int m_SSLIndex;
|
||||
|
|
|
@ -45,6 +45,11 @@ bool Endpoint::IsConnected(void) const
|
|||
return GetClient() != NULL;
|
||||
}
|
||||
|
||||
bool Endpoint::IsAvailable(void) const
|
||||
{
|
||||
return GetSeen() > Utility::GetTime() - 30;
|
||||
}
|
||||
|
||||
Stream::Ptr Endpoint::GetClient(void) const
|
||||
{
|
||||
return m_Client;
|
||||
|
@ -52,6 +57,8 @@ Stream::Ptr Endpoint::GetClient(void) const
|
|||
|
||||
void Endpoint::SetClient(const Stream::Ptr& client)
|
||||
{
|
||||
SetBlockedUntil(Utility::GetTime() + 15);
|
||||
|
||||
if (m_Client)
|
||||
m_Client->Close();
|
||||
|
||||
|
@ -62,10 +69,10 @@ void Endpoint::SetClient(const Stream::Ptr& client)
|
|||
thread.detach();
|
||||
|
||||
OnConnected(GetSelf());
|
||||
Log(LogWarning, "remote", "Endpoint connected: " + GetName());
|
||||
Log(LogInformation, "remote", "Endpoint connected: " + GetName());
|
||||
} else {
|
||||
OnDisconnected(GetSelf());
|
||||
Log(LogWarning, "remote", "Endpoint disconnected: " + GetName());
|
||||
Log(LogInformation, "remote", "Endpoint disconnected: " + GetName());
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -50,6 +50,7 @@ public:
|
|||
void SetClient(const Stream::Ptr& client);
|
||||
|
||||
bool IsConnected(void) const;
|
||||
bool IsAvailable(void) const;
|
||||
|
||||
void SendMessage(const Dictionary::Ptr& request);
|
||||
|
||||
|
@ -58,6 +59,7 @@ public:
|
|||
private:
|
||||
Stream::Ptr m_Client;
|
||||
boost::thread m_Thread;
|
||||
Array::Ptr m_ConnectedEndpoints;
|
||||
|
||||
void MessageThreadProc(const Stream::Ptr& stream);
|
||||
};
|
||||
|
|
|
@ -10,15 +10,15 @@ class Endpoint : DynamicObject
|
|||
[config] Array::Ptr config_files;
|
||||
[config] Array::Ptr config_files_recursive;
|
||||
[config] Array::Ptr accept_config;
|
||||
[config] int metric;
|
||||
|
||||
[state] double seen;
|
||||
[state] double local_log_position;
|
||||
[state] double remote_log_position;
|
||||
[state] Dictionary::Ptr features;
|
||||
|
||||
bool syncing {
|
||||
default {{{ return false; }}}
|
||||
};
|
||||
bool syncing;
|
||||
double blocked_until;
|
||||
};
|
||||
|
||||
}
|
||||
|
|
|
@ -34,7 +34,7 @@ using namespace icinga;
|
|||
void JsonRpc::SendMessage(const Stream::Ptr& stream, const Dictionary::Ptr& message)
|
||||
{
|
||||
String json = JsonSerialize(message);
|
||||
// std::cerr << ">> " << json << std::endl;
|
||||
//std::cerr << ">> " << json << std::endl;
|
||||
NetString::WriteStringToStream(stream, json);
|
||||
}
|
||||
|
||||
|
@ -44,7 +44,7 @@ Dictionary::Ptr JsonRpc::ReadMessage(const Stream::Ptr& stream)
|
|||
if (!NetString::ReadStringFromStream(stream, &jsonString))
|
||||
BOOST_THROW_EXCEPTION(std::runtime_error("ReadStringFromStream signalled EOF."));
|
||||
|
||||
// std::cerr << "<< " << jsonString << std::endl;
|
||||
//std::cerr << "<< " << jsonString << std::endl;
|
||||
Value value = JsonDeserialize(jsonString);
|
||||
|
||||
if (!value.IsObjectType<Dictionary>()) {
|
||||
|
|
|
@ -24,6 +24,8 @@
|
|||
%require "port",
|
||||
%attribute %string "port",
|
||||
|
||||
%attribute %number "metric",
|
||||
|
||||
%attribute %array "config_files" {
|
||||
%attribute %string "*"
|
||||
},
|
||||
|
|
Loading…
Reference in New Issue