/******************************************************************************
 * Icinga 2                                                                   *
 * Copyright (C) 2012-2014 Icinga Development Team (http://www.icinga.org)    *
 *                                                                            *
 * This program is free software; you can redistribute it and/or              *
 * modify it under the terms of the GNU General Public License                *
 * as published by the Free Software Foundation; either version 2             *
 * of the License, or (at your option) any later version.                     *
 *                                                                            *
 * This program is distributed in the hope that it will be useful,            *
 * but WITHOUT ANY WARRANTY; without even the implied warranty of             *
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the               *
 * GNU General Public License for more details.                               *
 *                                                                            *
 * You should have received a copy of the GNU General Public License          *
 * along with this program; if not, write to the Free Software Foundation     *
 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA.             *
 ******************************************************************************/

#include "cluster/clusterlistener.h"
#include "remote/endpoint.h"
#include "icinga/cib.h"
#include "icinga/domain.h"
#include "icinga/icingaapplication.h"
#include "base/netstring.h"
#include "base/dynamictype.h"
#include "base/logger_fwd.h"
#include "base/objectlock.h"
#include "base/networkstream.h"
#include "base/zlibstream.h"
#include "base/application.h"
#include "base/convert.h"
#include "base/context.h"
#include "base/statsfunction.h"
#include /* NOTE(review): the header name after this directive is missing in the file as received - restore it. */

using namespace icinga;

/*
 * NOTE(review): template argument lists (on make_shared, shared_ptr, std::pair,
 * DynamicType::GetObjects, ...) appear to be missing throughout this file as
 * received. They are left untouched here - confirm against the upstream source.
 */

REGISTER_TYPE(ClusterListener);
REGISTER_STATSFUNCTION(ClusterListenerStats, &ClusterListener::StatsFunc);

/**
 * Publishes the cluster status of every listener object.
 *
 * For each listener the first element of GetClusterStatus() is stored under
 * the listener's name; the resulting dictionary is set as
 * status["clusterlistener"]. Every entry of the second element is copied into
 * perfdata using the key prefix "clusterlistener_<listener name>_".
 *
 * @param status Dictionary receiving the "clusterlistener" entry.
 * @param perfdata Dictionary receiving the prefixed performance data entries.
 * @returns Always 0.
 */
Value ClusterListener::StatsFunc(Dictionary::Ptr& status, Dictionary::Ptr& perfdata)
{
	Dictionary::Ptr listenerStates = make_shared();
	std::pair clusterStatus;

	BOOST_FOREACH(const ClusterListener::Ptr& listener, DynamicType::GetObjects()) {
		const String listenerName = listener->GetName();

		clusterStatus = listener->GetClusterStatus();
		listenerStates->Set(listenerName, clusterStatus.first);

		String keyPrefix = "clusterlistener_" + listenerName + "_";

		BOOST_FOREACH(Dictionary::Pair const& entry, clusterStatus.second) {
			perfdata->Set(keyPrefix + entry.first, entry.second);
		}
	}

	status->Set("clusterlistener", listenerStates);

	return 0;
}

/**
 * Starts the component.
 *
 * Rotates and opens the message log, derives the local identity from the
 * certificate's CN, builds the SSL context, optionally opens the listener
 * port, starts the cluster timer, hooks up all signal handlers and finally
 * computes the privileges of every endpoint for every configured object.
 *
 * Throws std::invalid_argument if no Endpoint object matches the local
 * identity or if an object references an unknown domain.
 */
void ClusterListener::Start(void)
{
	DynamicObject::Start();

	{
		ObjectLock olock(this);

		RotateLogFile();
		OpenLogFile();
	}

	/* the identity of this instance is the CN of its own certificate */
	shared_ptr cert = GetX509Certificate(GetCertPath());
	SetIdentity(GetCertificateCN(cert));
	Log(LogInformation, "cluster", "My identity: " + GetIdentity());

	Endpoint::Ptr localEndpoint = Endpoint::GetByName(GetIdentity());

	if (!localEndpoint)
		BOOST_THROW_EXCEPTION(std::invalid_argument("No configuration available for the local endpoint."));

	/* set up SSL context */
	m_SSLContext = MakeSSLContext(GetCertPath(), GetKeyPath(), GetCaPath());

	if (!GetCrlPath().IsEmpty())
		AddCRLToSSLContext(m_SSLContext, GetCrlPath());

	/* create the primary JSON-RPC listener */
	if (!GetBindPort().IsEmpty())
		AddListener(GetBindPort());

	/* periodic housekeeping, see ClusterTimerHandler() */
	m_ClusterTimer = make_shared();
	m_ClusterTimer->OnTimerExpired.connect(boost::bind(&ClusterListener::ClusterTimerHandler, this));
	m_ClusterTimer->SetInterval(5);
	m_ClusterTimer->Start();

	m_MessageQueue.SetExceptionCallback(&ClusterListener::MessageExceptionHandler);

	/* local state changes which are relayed to the other cluster instances */
	Service::OnNewCheckResult.connect(boost::bind(&ClusterListener::CheckResultHandler, this, _1, _2, _3));
	Service::OnNextCheckChanged.connect(boost::bind(&ClusterListener::NextCheckChangedHandler, this, _1, _2, _3));
	Notification::OnNextNotificationChanged.connect(boost::bind(&ClusterListener::NextNotificationChangedHandler, this, _1, _2, _3));
	Service::OnForceNextCheckChanged.connect(boost::bind(&ClusterListener::ForceNextCheckChangedHandler, this, _1, _2, _3));
	Service::OnForceNextNotificationChanged.connect(boost::bind(&ClusterListener::ForceNextNotificationChangedHandler, this, _1, _2, _3));
	Service::OnEnableActiveChecksChanged.connect(boost::bind(&ClusterListener::EnableActiveChecksChangedHandler, this, _1, _2, _3));
	Service::OnEnablePassiveChecksChanged.connect(boost::bind(&ClusterListener::EnablePassiveChecksChangedHandler, this, _1, _2, _3));
	Service::OnEnableNotificationsChanged.connect(boost::bind(&ClusterListener::EnableNotificationsChangedHandler, this, _1, _2, _3));
	Service::OnEnableFlappingChanged.connect(boost::bind(&ClusterListener::EnableFlappingChangedHandler, this, _1, _2, _3));
	Service::OnCommentAdded.connect(boost::bind(&ClusterListener::CommentAddedHandler, this, _1, _2, _3));
	Service::OnCommentRemoved.connect(boost::bind(&ClusterListener::CommentRemovedHandler, this, _1, _2, _3));
	Service::OnDowntimeAdded.connect(boost::bind(&ClusterListener::DowntimeAddedHandler, this, _1, _2, _3));
	Service::OnDowntimeRemoved.connect(boost::bind(&ClusterListener::DowntimeRemovedHandler, this, _1, _2, _3));
	Service::OnAcknowledgementSet.connect(boost::bind(&ClusterListener::AcknowledgementSetHandler, this, _1, _2, _3, _4, _5, _6));
	Service::OnAcknowledgementCleared.connect(boost::bind(&ClusterListener::AcknowledgementClearedHandler, this, _1, _2));

	/* messages arriving from other cluster instances */
	Endpoint::OnMessageReceived.connect(boost::bind(&ClusterListener::AsyncMessageHandler, this, _1, _2));

	/* pre-compute which endpoint holds which privileges for which object */
	BOOST_FOREACH(const DynamicType::Ptr& type, DynamicType::GetTypes()) {
		BOOST_FOREACH(const DynamicObject::Ptr& object, type->GetObjects()) {
			BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects()) {
				int grantedPrivs = 0;

				Array::Ptr objectDomains = object->GetDomains();

				if (!objectDomains) {
					/* objects without a domain list get every privilege bit */
					grantedPrivs = INT_MAX;
				} else {
					ObjectLock olock(objectDomains);

					/* otherwise the privileges of all listed domains are OR-ed together */
					BOOST_FOREACH(const String& domainName, objectDomains) {
						Domain::Ptr domainEntry = Domain::GetByName(domainName);

						if (!domainEntry)
							BOOST_THROW_EXCEPTION(std::invalid_argument("Invalid domain: " + domainName));

						grantedPrivs |= domainEntry->GetPrivileges(endpoint->GetName());
					}
				}

				Log(LogDebug, "cluster", "Privileges for object '" + object->GetName() + "' of type '" + object->GetType()->GetName() + "' for instance '" + endpoint->GetName() + "' are '" + Convert::ToString(grantedPrivs) + "'");
				object->SetPrivileges(endpoint->GetName(), grantedPrivs);
			}
		}
	}
}

/**
 * Stops the component.
 *
 * Closes the current log file and rotates it while holding the object lock.
 */
void ClusterListener::Stop(void)
{
	ObjectLock olock(this);

	CloseLogFile();
	RotateLogFile();
}

/**
 * Returns the SSL context stored in m_SSLContext (assigned in Start()).
 *
 * @returns The SSL context.
 */
shared_ptr ClusterListener::GetSSLContext(void) const
{
	return m_SSLContext;
}

/**
 * Creates a new JSON-RPC listener on the specified port.
 *
 * The socket is bound using AF_INET6, served by a detached thread running
 * ListenerThreadProc() and remembered in m_Servers. Throws std::logic_error
 * if no SSL context has been set up yet.
 *
 * @param service The port to listen on.
 */
void ClusterListener::AddListener(const String& service)
{
	ObjectLock olock(this);

	if (!m_SSLContext)
		BOOST_THROW_EXCEPTION(std::logic_error("SSL context is required for AddListener()"));

	std::ostringstream msgbuf;
	msgbuf << "Adding new listener: port " << service;
	Log(LogInformation, "cluster", msgbuf.str());

	TcpSocket::Ptr listenSocket = make_shared();
	listenSocket->Bind(service, AF_INET6);

	boost::thread acceptThread(boost::bind(&ClusterListener::ListenerThreadProc, this, listenSocket));
	acceptThread.detach();

	m_Servers.insert(listenSocket);
}

/**
 * Thread procedure for a listener socket.
 *
 * Names the thread, puts the socket into listening mode and then accepts
 * clients in an endless loop. Every accepted client is passed to
 * NewClientHandler() in the TLS server role through
 * Utility::QueueAsyncCallback().
 *
 * @param server The listener socket.
 */
void ClusterListener::ListenerThreadProc(const Socket::Ptr& server)
{
	Utility::SetThreadName("Cluster Listener");

	server->Listen();

	while (true) {
		Socket::Ptr accepted = server->Accept();

		Utility::QueueAsyncCallback(boost::bind(&ClusterListener::NewClientHandler, this, accepted, TlsRoleServer));
	}
}

/**
 * Creates a new JSON-RPC client and connects to the specified host and port.
 *
 * @param node The remote host.
 * @param service The remote port.
*/ void ClusterListener::AddConnection(const String& node, const String& service) { { ObjectLock olock(this); shared_ptr sslContext = m_SSLContext; if (!sslContext) BOOST_THROW_EXCEPTION(std::logic_error("SSL context is required for AddConnection()")); } TcpSocket::Ptr client = make_shared(); client->Connect(node, service); Utility::QueueAsyncCallback(boost::bind(&ClusterListener::NewClientHandler, this, client, TlsRoleClient)); } void ClusterListener::AsyncRelayMessage(const Endpoint::Ptr& source, const Dictionary::Ptr& message, bool persistent) { m_RelayQueue.Enqueue(boost::bind(&ClusterListener::RelayMessage, this, source, message, persistent)); } void ClusterListener::PersistMessage(const Endpoint::Ptr& source, const Dictionary::Ptr& message) { double ts = message->Get("ts"); ASSERT(ts != 0); Dictionary::Ptr pmessage = make_shared(); pmessage->Set("timestamp", ts); if (source) pmessage->Set("source", source->GetName()); pmessage->Set("message", JsonSerialize(message)); pmessage->Set("security", message->Get("security")); ObjectLock olock(this); if (m_LogFile) { NetString::WriteStringToStream(m_LogFile, JsonSerialize(pmessage)); m_LogMessageCount++; SetLogMessageTimestamp(ts); if (m_LogMessageCount > 50000) { CloseLogFile(); RotateLogFile(); OpenLogFile(); } } } void ClusterListener::RelayMessage(const Endpoint::Ptr& source, const Dictionary::Ptr& message, bool persistent) { double ts = Utility::GetTime(); message->Set("ts", ts); if (persistent) m_LogQueue.Enqueue(boost::bind(&ClusterListener::PersistMessage, this, source, message)); Dictionary::Ptr security = message->Get("security"); DynamicObject::Ptr secobj; int privs = 0; if (security) { String type = security->Get("type"); DynamicType::Ptr dtype = DynamicType::GetByName(type); if (!dtype) { Log(LogWarning, "cluster", "Invalid type in security attribute: " + type); return; } String name = security->Get("name"); secobj = dtype->GetObject(name); if (!secobj) { Log(LogWarning, "cluster", "Invalid object name 
in security attribute: " + name + " (of type '" + type + "')"); return; } privs = security->Get("privs"); } BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects()) { if (!persistent && !endpoint->IsConnected()) continue; if (endpoint == source) continue; if (endpoint->GetName() == GetIdentity()) continue; if (secobj && !secobj->HasPrivileges(endpoint->GetName(), privs)) { Log(LogDebug, "cluster", "Not sending message to endpoint '" + endpoint->GetName() + "': Insufficient privileges."); continue; } { ObjectLock olock(endpoint); if (!endpoint->GetSyncing()) endpoint->SendMessage(message); } } } String ClusterListener::GetClusterDir(void) const { return Application::GetLocalStateDir() + "/lib/icinga2/cluster/"; } void ClusterListener::OpenLogFile(void) { ASSERT(OwnsLock()); String path = GetClusterDir() + "log/current"; std::fstream *fp = new std::fstream(path.CStr(), std::fstream::out | std::ofstream::app); if (!fp->good()) { Log(LogWarning, "cluster", "Could not open spool file: " + path); return; } StdioStream::Ptr logStream = make_shared(fp, true); #ifdef HAVE_BIOZLIB m_LogFile = make_shared(logStream); #else /* HAVE_BIOZLIB */ m_LogFile = logStream; #endif /* HAVE_BIOZLIB */ m_LogMessageCount = 0; SetLogMessageTimestamp(0); } void ClusterListener::CloseLogFile(void) { ASSERT(OwnsLock()); if (!m_LogFile) return; m_LogFile->Close(); m_LogFile.reset(); } void ClusterListener::RotateLogFile(void) { ASSERT(OwnsLock()); double ts = GetLogMessageTimestamp(); if (ts == 0) ts = Utility::GetTime(); String oldpath = GetClusterDir() + "log/current"; String newpath = GetClusterDir() + "log/" + Convert::ToString(static_cast(ts) + 1); (void) rename(oldpath.CStr(), newpath.CStr()); } void ClusterListener::LogGlobHandler(std::vector& files, const String& file) { String name = Utility::BaseName(file); int ts; try { ts = Convert::ToLong(name); } catch (const std::exception&) { return; } files.push_back(ts); } void ClusterListener::ReplayLog(const Endpoint::Ptr& 
endpoint, const Stream::Ptr& stream) { CONTEXT("Replaying log for Endpoint '" + endpoint->GetName() + "'"); int count = -1; double peer_ts = endpoint->GetLocalLogPosition(); bool last_sync = false; ASSERT(!OwnsLock()); for (;;) { ObjectLock olock(this); CloseLogFile(); RotateLogFile(); if (count == -1 || count > 50000) { OpenLogFile(); olock.Unlock(); } else { last_sync = true; } count = 0; std::vector files; Utility::Glob(GetClusterDir() + "log/*", boost::bind(&ClusterListener::LogGlobHandler, boost::ref(files), _1), GlobFile); std::sort(files.begin(), files.end()); BOOST_FOREACH(int ts, files) { String path = GetClusterDir() + "log/" + Convert::ToString(ts); if (ts < peer_ts) continue; Log(LogInformation, "cluster", "Replaying log: " + path); std::fstream *fp = new std::fstream(path.CStr(), std::fstream::in); StdioStream::Ptr logStream = make_shared(fp, true); #ifdef HAVE_BIOZLIB ZlibStream::Ptr lstream = make_shared(logStream); #else /* HAVE_BIOZLIB */ Stream::Ptr lstream = logStream; #endif /* HAVE_BIOZLIB */ String message; while (true) { Dictionary::Ptr pmessage; try { if (!NetString::ReadStringFromStream(lstream, &message)) break; pmessage = JsonDeserialize(message); } catch (std::exception&) { Log(LogWarning, "cluster", "Unexpected end-of-file for cluster log: " + path); /* Log files may be incomplete or corrupted. This is perfectly OK. 
*/ break; } if (pmessage->Get("timestamp") < peer_ts) continue; if (pmessage->Get("source") == endpoint->GetName()) continue; Dictionary::Ptr security = pmessage->Get("security"); DynamicObject::Ptr secobj; int privs; if (security) { String type = security->Get("type"); DynamicType::Ptr dtype = DynamicType::GetByName(type); if (!dtype) { Log(LogDebug, "cluster", "Invalid type in security attribute: " + type); continue; } String name = security->Get("name"); secobj = dtype->GetObject(name); if (!secobj) { Log(LogDebug, "cluster", "Invalid object name in security attribute: " + name + " (of type '" + type + "')"); continue; } privs = security->Get("privs"); } if (secobj && !secobj->HasPrivileges(endpoint->GetName(), privs)) { Log(LogDebug, "cluster", "Not replaying message to endpoint '" + endpoint->GetName() + "': Insufficient privileges."); continue; } NetString::WriteStringToStream(stream, pmessage->Get("message")); count++; peer_ts = pmessage->Get("timestamp"); } lstream->Close(); } Log(LogInformation, "cluster", "Replayed " + Convert::ToString(count) + " messages."); if (last_sync) { { ObjectLock olock2(endpoint); endpoint->SetSyncing(false); } OpenLogFile(); break; } } } void ClusterListener::ConfigGlobHandler(const Dictionary::Ptr& config, const String& file, bool basename) { CONTEXT("Creating config update for file '" + file + "'"); Dictionary::Ptr elem = make_shared(); std::ifstream fp(file.CStr()); if (!fp) return; String content((std::istreambuf_iterator(fp)), std::istreambuf_iterator()); elem->Set("content", content); config->Set(basename ? Utility::BaseName(file) : file, elem); } /** * Processes a new client connection. * * @param client The new client. 
*/ void ClusterListener::NewClientHandler(const Socket::Ptr& client, TlsRole role) { CONTEXT("Handling new cluster client connection"); NetworkStream::Ptr netStream = make_shared(client); TlsStream::Ptr tlsStream = make_shared(netStream, role, m_SSLContext); tlsStream->Handshake(); shared_ptr cert = tlsStream->GetPeerCertificate(); String identity = GetCertificateCN(cert); Log(LogInformation, "cluster", "New client connection for identity '" + identity + "'"); Endpoint::Ptr endpoint = Endpoint::GetByName(identity); if (!endpoint) { Log(LogInformation, "cluster", "Closing endpoint '" + identity + "': No configuration available."); tlsStream->Close(); return; } { ObjectLock olock(endpoint); endpoint->SetSyncing(true); endpoint->SetSeen(Utility::GetTime()); endpoint->SetClient(tlsStream); } Dictionary::Ptr config = make_shared(); Array::Ptr configFiles = endpoint->GetConfigFiles(); if (configFiles) { ObjectLock olock(configFiles); BOOST_FOREACH(const String& pattern, configFiles) { Utility::Glob(pattern, boost::bind(&ClusterListener::ConfigGlobHandler, boost::cref(config), _1, false), GlobFile); } } Array::Ptr configFilesRecursive = endpoint->GetConfigFilesRecursive(); if (configFilesRecursive) { ObjectLock olock(configFilesRecursive); BOOST_FOREACH(const Value& configFile, configFilesRecursive) { if (configFile.IsObjectType()) { Dictionary::Ptr configFileDict = configFile; String path = configFileDict->Get("path"); String pattern = configFileDict->Get("pattern"); Utility::GlobRecursive(path, pattern, boost::bind(&ClusterListener::ConfigGlobHandler, boost::cref(config), _1, false), GlobFile); } else { String configFilePath = configFile; Utility::GlobRecursive(configFilePath, "*.conf", boost::bind(&ClusterListener::ConfigGlobHandler, boost::cref(config), _1, false), GlobFile); } } } Log(LogInformation, "cluster", "Sending " + Convert::ToString(static_cast(config->GetLength())) + " config files to endpoint '" + endpoint->GetName() + "'."); Dictionary::Ptr params = 
make_shared(); params->Set("identity", GetIdentity()); params->Set("config_files", config); Dictionary::Ptr message = make_shared(); message->Set("jsonrpc", "2.0"); message->Set("method", "cluster::Config"); message->Set("params", params); NetString::WriteStringToStream(tlsStream, JsonSerialize(message)); ReplayLog(endpoint, tlsStream); } void ClusterListener::ClusterTimerHandler(void) { /* broadcast a heartbeat message */ Dictionary::Ptr params = make_shared(); params->Set("identity", GetIdentity()); /* Eww. */ Dictionary::Ptr features = make_shared(); features->Set("checker", SupportsChecks() ? 1 : 0); features->Set("notification", SupportsNotifications() ? 1 : 0); params->Set("features", features); Dictionary::Ptr message = make_shared(); message->Set("jsonrpc", "2.0"); message->Set("method", "cluster::HeartBeat"); message->Set("params", params); Endpoint::GetByName(GetIdentity())->SetFeatures(features); AsyncRelayMessage(Endpoint::Ptr(), message, false); { ObjectLock olock(this); /* check if we've recently seen heartbeat messages from our peers */ BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects()) { if (endpoint->GetSeen() > Utility::GetTime() - 60) continue; Stream::Ptr client = endpoint->GetClient(); if (client) { Log(LogWarning, "cluster", "Closing connection for endpoint '" + endpoint->GetName() + "' due to inactivity."); client->Close(); endpoint->SetClient(Stream::Ptr()); } } } std::vector files; Utility::Glob(GetClusterDir() + "log/*", boost::bind(&ClusterListener::LogGlobHandler, boost::ref(files), _1), GlobFile); std::sort(files.begin(), files.end()); BOOST_FOREACH(int ts, files) { bool need = false; BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects()) { if (endpoint->GetName() == GetIdentity()) continue; double position = endpoint->GetLocalLogPosition(); if (position != 0 && ts > position) { need = true; break; } } if (!need) { String path = GetClusterDir() + "log/" + Convert::ToString(ts); Log(LogInformation, 
"cluster", "Removing old log file: " + path); (void) unlink(path.CStr()); } } UpdateAuthority(); Array::Ptr peers = GetPeers(); if (peers) { ObjectLock olock(peers); BOOST_FOREACH(const String& peer, peers) { Endpoint::Ptr endpoint = Endpoint::GetByName(peer); if (!endpoint) { Log(LogWarning, "cluster", "Attempted to reconnect to endpoint '" + peer + "': No configuration found."); continue; } if (endpoint->IsConnected()) continue; String host, port; host = endpoint->GetHost(); port = endpoint->GetPort(); if (host.IsEmpty() || port.IsEmpty()) { Log(LogWarning, "cluster", "Can't reconnect " "to endpoint '" + endpoint->GetName() + "': No " "host/port information."); continue; } try { Log(LogInformation, "cluster", "Attempting to reconnect to cluster endpoint '" + endpoint->GetName() + "' via '" + host + ":" + port + "'."); AddConnection(host, port); } catch (std::exception& ex) { std::ostringstream msgbuf; msgbuf << "Exception occured while reconnecting to endpoint '" << endpoint->GetName() << "': " << DiagnosticInformation(ex); Log(LogWarning, "cluster", msgbuf.str()); } } } } void ClusterListener::SetSecurityInfo(const Dictionary::Ptr& message, const DynamicObject::Ptr& object, int privs) { ASSERT(object); Dictionary::Ptr security = make_shared(); security->Set("type", object->GetType()->GetName()); security->Set("name", object->GetName()); security->Set("privs", privs); message->Set("security", security); } void ClusterListener::CheckResultHandler(const Service::Ptr& service, const CheckResult::Ptr& cr, const String& authority) { if (!authority.IsEmpty() && authority != GetIdentity()) return; Dictionary::Ptr params = make_shared(); params->Set("service", service->GetName()); params->Set("check_result", Serialize(cr)); Dictionary::Ptr message = make_shared(); message->Set("jsonrpc", "2.0"); message->Set("method", "cluster::CheckResult"); message->Set("params", params); SetSecurityInfo(message, service, DomainPrivRead); AsyncRelayMessage(Endpoint::Ptr(), message, 
true); } void ClusterListener::NextCheckChangedHandler(const Service::Ptr& service, double nextCheck, const String& authority) { if (!authority.IsEmpty() && authority != GetIdentity()) return; Dictionary::Ptr params = make_shared(); params->Set("service", service->GetName()); params->Set("next_check", nextCheck); Dictionary::Ptr message = make_shared(); message->Set("jsonrpc", "2.0"); message->Set("method", "cluster::SetNextCheck"); message->Set("params", params); SetSecurityInfo(message, service, DomainPrivRead); AsyncRelayMessage(Endpoint::Ptr(), message, true); } void ClusterListener::NextNotificationChangedHandler(const Notification::Ptr& notification, double nextNotification, const String& authority) { if (!authority.IsEmpty() && authority != GetIdentity()) return; Dictionary::Ptr params = make_shared(); params->Set("notification", notification->GetName()); params->Set("next_notification", nextNotification); Dictionary::Ptr message = make_shared(); message->Set("jsonrpc", "2.0"); message->Set("method", "cluster::SetNextNotification"); message->Set("params", params); SetSecurityInfo(message, notification->GetService(), DomainPrivRead); AsyncRelayMessage(Endpoint::Ptr(), message, true); } void ClusterListener::ForceNextCheckChangedHandler(const Service::Ptr& service, bool forced, const String& authority) { if (!authority.IsEmpty() && authority != GetIdentity()) return; Dictionary::Ptr params = make_shared(); params->Set("service", service->GetName()); params->Set("forced", forced); Dictionary::Ptr message = make_shared(); message->Set("jsonrpc", "2.0"); message->Set("method", "cluster::SetForceNextCheck"); message->Set("params", params); SetSecurityInfo(message, service, DomainPrivRead); AsyncRelayMessage(Endpoint::Ptr(), message, true); } void ClusterListener::ForceNextNotificationChangedHandler(const Service::Ptr& service, bool forced, const String& authority) { if (!authority.IsEmpty() && authority != GetIdentity()) return; Dictionary::Ptr params = 
make_shared(); params->Set("service", service->GetName()); params->Set("forced", forced); Dictionary::Ptr message = make_shared(); message->Set("jsonrpc", "2.0"); message->Set("method", "cluster::SetForceNextNotification"); message->Set("params", params); SetSecurityInfo(message, service, DomainPrivRead); AsyncRelayMessage(Endpoint::Ptr(), message, true); } void ClusterListener::EnableActiveChecksChangedHandler(const Service::Ptr& service, bool enabled, const String& authority) { if (!authority.IsEmpty() && authority != GetIdentity()) return; Dictionary::Ptr params = make_shared(); params->Set("service", service->GetName()); params->Set("enabled", enabled); Dictionary::Ptr message = make_shared(); message->Set("jsonrpc", "2.0"); message->Set("method", "cluster::SetEnableActiveChecks"); message->Set("params", params); SetSecurityInfo(message, service, DomainPrivRead); AsyncRelayMessage(Endpoint::Ptr(), message, true); } void ClusterListener::EnablePassiveChecksChangedHandler(const Service::Ptr& service, bool enabled, const String& authority) { if (!authority.IsEmpty() && authority != GetIdentity()) return; Dictionary::Ptr params = make_shared(); params->Set("service", service->GetName()); params->Set("enabled", enabled); Dictionary::Ptr message = make_shared(); message->Set("jsonrpc", "2.0"); message->Set("method", "cluster::SetEnablePassiveChecks"); message->Set("params", params); SetSecurityInfo(message, service, DomainPrivRead); AsyncRelayMessage(Endpoint::Ptr(), message, true); } void ClusterListener::EnableNotificationsChangedHandler(const Service::Ptr& service, bool enabled, const String& authority) { if (!authority.IsEmpty() && authority != GetIdentity()) return; Dictionary::Ptr params = make_shared(); params->Set("service", service->GetName()); params->Set("enabled", enabled); Dictionary::Ptr message = make_shared(); message->Set("jsonrpc", "2.0"); message->Set("method", "cluster::SetEnableNotifications"); message->Set("params", params); 
SetSecurityInfo(message, service, DomainPrivRead); AsyncRelayMessage(Endpoint::Ptr(), message, true); } void ClusterListener::EnableFlappingChangedHandler(const Service::Ptr& service, bool enabled, const String& authority) { if (!authority.IsEmpty() && authority != GetIdentity()) return; Dictionary::Ptr params = make_shared(); params->Set("service", service->GetName()); params->Set("enabled", enabled); Dictionary::Ptr message = make_shared(); message->Set("jsonrpc", "2.0"); message->Set("method", "cluster::SetEnableFlapping"); message->Set("params", params); SetSecurityInfo(message, service, DomainPrivRead); AsyncRelayMessage(Endpoint::Ptr(), message, true); } void ClusterListener::CommentAddedHandler(const Service::Ptr& service, const Comment::Ptr& comment, const String& authority) { if (!authority.IsEmpty() && authority != GetIdentity()) return; Dictionary::Ptr params = make_shared(); params->Set("service", service->GetName()); params->Set("comment", Serialize(comment)); Dictionary::Ptr message = make_shared(); message->Set("jsonrpc", "2.0"); message->Set("method", "cluster::AddComment"); message->Set("params", params); SetSecurityInfo(message, service, DomainPrivRead); AsyncRelayMessage(Endpoint::Ptr(), message, true); } void ClusterListener::CommentRemovedHandler(const Service::Ptr& service, const Comment::Ptr& comment, const String& authority) { if (!authority.IsEmpty() && authority != GetIdentity()) return; Dictionary::Ptr params = make_shared(); params->Set("service", service->GetName()); params->Set("id", comment->GetId()); Dictionary::Ptr message = make_shared(); message->Set("jsonrpc", "2.0"); message->Set("method", "cluster::RemoveComment"); message->Set("params", params); SetSecurityInfo(message, service, DomainPrivRead); AsyncRelayMessage(Endpoint::Ptr(), message, true); } void ClusterListener::DowntimeAddedHandler(const Service::Ptr& service, const Downtime::Ptr& downtime, const String& authority) { if (!authority.IsEmpty() && authority != 
GetIdentity()) return; Dictionary::Ptr params = make_shared(); params->Set("service", service->GetName()); params->Set("downtime", Serialize(downtime)); Dictionary::Ptr message = make_shared(); message->Set("jsonrpc", "2.0"); message->Set("method", "cluster::AddDowntime"); message->Set("params", params); SetSecurityInfo(message, service, DomainPrivRead); AsyncRelayMessage(Endpoint::Ptr(), message, true); } void ClusterListener::DowntimeRemovedHandler(const Service::Ptr& service, const Downtime::Ptr& downtime, const String& authority) { if (!authority.IsEmpty() && authority != GetIdentity()) return; Dictionary::Ptr params = make_shared(); params->Set("service", service->GetName()); params->Set("id", downtime->GetId()); Dictionary::Ptr message = make_shared(); message->Set("jsonrpc", "2.0"); message->Set("method", "cluster::RemoveDowntime"); message->Set("params", params); SetSecurityInfo(message, service, DomainPrivRead); AsyncRelayMessage(Endpoint::Ptr(), message, true); } void ClusterListener::AcknowledgementSetHandler(const Service::Ptr& service, const String& author, const String& comment, AcknowledgementType type, double expiry, const String& authority) { if (!authority.IsEmpty() && authority != GetIdentity()) return; Dictionary::Ptr params = make_shared(); params->Set("service", service->GetName()); params->Set("author", author); params->Set("comment", comment); params->Set("type", type); params->Set("expiry", expiry); Dictionary::Ptr message = make_shared(); message->Set("jsonrpc", "2.0"); message->Set("method", "cluster::SetAcknowledgement"); message->Set("params", params); SetSecurityInfo(message, service, DomainPrivRead); AsyncRelayMessage(Endpoint::Ptr(), message, true); } void ClusterListener::AcknowledgementClearedHandler(const Service::Ptr& service, const String& authority) { if (!authority.IsEmpty() && authority != GetIdentity()) return; Dictionary::Ptr params = make_shared(); params->Set("service", service->GetName()); Dictionary::Ptr message = 
make_shared(); message->Set("jsonrpc", "2.0"); message->Set("method", "cluster::ClearAcknowledgement"); message->Set("params", params); SetSecurityInfo(message, service, DomainPrivRead); AsyncRelayMessage(Endpoint::Ptr(), message, true); } void ClusterListener::AsyncMessageHandler(const Endpoint::Ptr& sender, const Dictionary::Ptr& message) { m_MessageQueue.Enqueue(boost::bind(&ClusterListener::MessageHandler, this, sender, message)); } void ClusterListener::MessageExceptionHandler(boost::exception_ptr exp) { Log(LogCritical, "cluster", "Exception while processing cluster message: " + DiagnosticInformation(exp)); } void ClusterListener::MessageHandler(const Endpoint::Ptr& sender, const Dictionary::Ptr& message) { CONTEXT("Processing cluster message of type '" + message->Get("method") + "'"); sender->SetSeen(Utility::GetTime()); if (message->Contains("ts")) { double ts = message->Get("ts"); /* ignore old messages */ if (ts < sender->GetRemoteLogPosition()) return; if (sender->GetRemoteLogPosition() + 10 < ts) { Dictionary::Ptr lparams = make_shared(); lparams->Set("log_position", message->Get("ts")); Dictionary::Ptr lmessage = make_shared(); lmessage->Set("jsonrpc", "2.0"); lmessage->Set("method", "cluster::SetLogPosition"); lmessage->Set("params", lparams); sender->SendMessage(lmessage); sender->SetRemoteLogPosition(message->Get("ts")); Log(LogInformation, "cluster", "Acknowledging log position for identity '" + sender->GetName() + "': " + Utility::FormatDateTime("%Y/%m/%d %H:%M:%S", message->Get("ts"))); } } Dictionary::Ptr params = message->Get("params"); if (message->Get("method") == "cluster::HeartBeat") { if (!params) return; String identity = params->Get("identity"); Endpoint::Ptr endpoint = Endpoint::GetByName(identity); if (endpoint) { endpoint->SetSeen(Utility::GetTime()); endpoint->SetFeatures(params->Get("features")); } AsyncRelayMessage(sender, message, true); } else if (message->Get("method") == "cluster::CheckResult") { if (!params) return; String svc 
= params->Get("service"); Service::Ptr service = Service::GetByName(svc); if (!service) return; if (!service->HasPrivileges(sender->GetName(), DomainPrivCheckResult)) { Log(LogDebug, "cluster", "Not accepting cluster::CheckResult message from endpoint '" + sender->GetName() + "' for service '" + service->GetName() + "': Insufficient privileges."); return; } CheckResult::Ptr cr = Deserialize(params->Get("check_result"), true); if (!cr) return; service->ProcessCheckResult(cr, sender->GetName()); AsyncRelayMessage(sender, message, true); } else if (message->Get("method") == "cluster::SetNextCheck") { if (!params) return; String svc = params->Get("service"); Service::Ptr service = Service::GetByName(svc); if (!service) return; if (!service->HasPrivileges(sender->GetName(), DomainPrivCommand)) { Log(LogDebug, "cluster", "Not accepting cluster::SetNextCheck message from endpoint '" + sender->GetName() + "' for service '" + service->GetName() + "': Insufficient privileges."); return; } double nextCheck = params->Get("next_check"); service->SetNextCheck(nextCheck, sender->GetName()); AsyncRelayMessage(sender, message, true); } else if (message->Get("method") == "cluster::SetForceNextCheck") { if (!params) return; String svc = params->Get("service"); Service::Ptr service = Service::GetByName(svc); if (!service) return; if (!service->HasPrivileges(sender->GetName(), DomainPrivCommand)) { Log(LogDebug, "cluster", "Not accepting cluster::SetForceNextCheck message from endpoint '" + sender->GetName() + "' for service '" + service->GetName() + "': Insufficient privileges."); return; } bool forced = params->Get("forced"); service->SetForceNextCheck(forced, sender->GetName()); AsyncRelayMessage(sender, message, true); } else if (message->Get("method") == "cluster::SetForceNextNotification") { if (!params) return; String svc = params->Get("service"); Service::Ptr service = Service::GetByName(svc); if (!service) return; if (!service->HasPrivileges(sender->GetName(), 
DomainPrivCommand)) { Log(LogDebug, "cluster", "Not accepting cluster::SetForceNextNotification message from endpoint '" + sender->GetName() + "' for service '" + service->GetName() + "': Insufficient privileges."); return; } bool forced = params->Get("forced"); service->SetForceNextNotification(forced, sender->GetName()); AsyncRelayMessage(sender, message, true); } else if (message->Get("method") == "cluster::SetEnableActiveChecks") { if (!params) return; String svc = params->Get("service"); Service::Ptr service = Service::GetByName(svc); if (!service) return; if (!service->HasPrivileges(sender->GetName(), DomainPrivCommand)) { Log(LogDebug, "cluster", "Not accepting cluster::SetEnableActiveChecks message from endpoint '" + sender->GetName() + "' for service '" + service->GetName() + "': Insufficient privileges."); return; } bool enabled = params->Get("enabled"); service->SetEnableActiveChecks(enabled, sender->GetName()); AsyncRelayMessage(sender, message, true); } else if (message->Get("method") == "cluster::SetEnablePassiveChecks") { if (!params) return; String svc = params->Get("service"); Service::Ptr service = Service::GetByName(svc); if (!service) return; if (!service->HasPrivileges(sender->GetName(), DomainPrivCommand)) { Log(LogDebug, "cluster", "Not accepting cluster::SetEnablePassiveChecks message from endpoint '" + sender->GetName() + "' for service '" + service->GetName() + "': Insufficient privileges."); return; } bool enabled = params->Get("enabled"); service->SetEnablePassiveChecks(enabled, sender->GetName()); AsyncRelayMessage(sender, message, true); } else if (message->Get("method") == "cluster::SetEnableNotifications") { if (!params) return; String svc = params->Get("service"); Service::Ptr service = Service::GetByName(svc); if (!service) return; if (!service->HasPrivileges(sender->GetName(), DomainPrivCommand)) { Log(LogDebug, "cluster", "Not accepting cluster::SetEnableNotifications message from endpoint '" + sender->GetName() + "' for service 
'" + service->GetName() + "': Insufficient privileges."); return; } bool enabled = params->Get("enabled"); service->SetEnableNotifications(enabled, sender->GetName()); AsyncRelayMessage(sender, message, true); } else if (message->Get("method") == "cluster::SetEnableFlapping") { if (!params) return; String svc = params->Get("service"); Service::Ptr service = Service::GetByName(svc); if (!service) return; if (!service->HasPrivileges(sender->GetName(), DomainPrivCommand)) { Log(LogDebug, "cluster", "Not accepting cluster::SetEnableFlapping message from endpoint '" + sender->GetName() + "' for service '" + service->GetName() + "': Insufficient privileges."); return; } bool enabled = params->Get("enabled"); service->SetEnableFlapping(enabled, sender->GetName()); AsyncRelayMessage(sender, message, true); } else if (message->Get("method") == "cluster::SetNextNotification") { if (!params) return; String nfc = params->Get("notification"); Notification::Ptr notification = Notification::GetByName(nfc); if (!notification) return; Service::Ptr service = notification->GetService(); if (!service->HasPrivileges(sender->GetName(), DomainPrivCommand)) { Log(LogDebug, "cluster", "Not accepting cluster::SetNextNotification message from endpoint '" + sender->GetName() + "' for service '" + service->GetName() + "': Insufficient privileges."); return; } bool nextNotification = params->Get("next_notification"); notification->SetNextNotification(nextNotification, sender->GetName()); AsyncRelayMessage(sender, message, true); } else if (message->Get("method") == "cluster::AddComment") { if (!params) return; String svc = params->Get("service"); Service::Ptr service = Service::GetByName(svc); if (!service) return; if (!service->HasPrivileges(sender->GetName(), DomainPrivCommand)) { Log(LogDebug, "cluster", "Not accepting cluster::AddComment message from endpoint '" + sender->GetName() + "' for service '" + service->GetName() + "': Insufficient privileges."); return; } Comment::Ptr comment = 
Deserialize(params->Get("comment"), true); service->AddComment(comment->GetEntryType(), comment->GetAuthor(), comment->GetText(), comment->GetExpireTime(), comment->GetId(), sender->GetName()); AsyncRelayMessage(sender, message, true); } else if (message->Get("method") == "cluster::RemoveComment") { if (!params) return; String svc = params->Get("service"); Service::Ptr service = Service::GetByName(svc); if (!service) return; if (!service->HasPrivileges(sender->GetName(), DomainPrivCommand)) { Log(LogDebug, "cluster", "Not accepting cluster::RemoveComment message from endpoint '" + sender->GetName() + "' for service '" + service->GetName() + "': Insufficient privileges."); return; } String id = params->Get("id"); service->RemoveComment(id, sender->GetName()); AsyncRelayMessage(sender, message, true); } else if (message->Get("method") == "cluster::AddDowntime") { if (!params) return; String svc = params->Get("service"); Service::Ptr service = Service::GetByName(svc); if (!service) return; if (!service->HasPrivileges(sender->GetName(), DomainPrivCommand)) { Log(LogDebug, "cluster", "Not accepting cluster::AddDowntime message from endpoint '" + sender->GetName() + "' for service '" + service->GetName() + "': Insufficient privileges."); return; } Downtime::Ptr downtime = Deserialize(params->Get("downtime"), true); service->AddDowntime(downtime->GetAuthor(), downtime->GetComment(), downtime->GetStartTime(), downtime->GetEndTime(), downtime->GetFixed(), downtime->GetTriggeredBy(), downtime->GetDuration(), downtime->GetScheduledBy(), downtime->GetId(), sender->GetName()); AsyncRelayMessage(sender, message, true); } else if (message->Get("method") == "cluster::RemoveDowntime") { if (!params) return; String svc = params->Get("service"); Service::Ptr service = Service::GetByName(svc); if (!service) return; if (!service->HasPrivileges(sender->GetName(), DomainPrivCommand)) { Log(LogDebug, "cluster", "Not accepting cluster::RemoveDowntime message from endpoint '" + 
sender->GetName() + "' for service '" + service->GetName() + "': Insufficient privileges."); return; } String id = params->Get("id"); service->RemoveDowntime(id, false, sender->GetName()); AsyncRelayMessage(sender, message, true); } else if (message->Get("method") == "cluster::SetAcknowledgement") { if (!params) return; String svc = params->Get("service"); Service::Ptr service = Service::GetByName(svc); if (!service) return; if (!service->HasPrivileges(sender->GetName(), DomainPrivCommand)) { Log(LogDebug, "cluster", "Not accepting cluster::SetAcknowledgement message from endpoint '" + sender->GetName() + "' for service '" + service->GetName() + "': Insufficient privileges."); return; } String author = params->Get("author"); String comment = params->Get("comment"); int type = params->Get("type"); double expiry = params->Get("expiry"); service->AcknowledgeProblem(author, comment, static_cast(type), expiry, sender->GetName()); AsyncRelayMessage(sender, message, true); } else if (message->Get("method") == "cluster::ClearAcknowledgement") { if (!params) return; String svc = params->Get("service"); Service::Ptr service = Service::GetByName(svc); if (!service) return; if (!service->HasPrivileges(sender->GetName(), DomainPrivCommand)) { Log(LogDebug, "cluster", "Not accepting cluster::ClearAcknowledgement message from endpoint '" + sender->GetName() + "' for service '" + service->GetName() + "': Insufficient privileges."); return; } { ObjectLock olock(service); service->ClearAcknowledgement(sender->GetName()); } AsyncRelayMessage(sender, message, true); } else if (message->Get("method") == "cluster::SetLogPosition") { if (!params) return; sender->SetLocalLogPosition(params->Get("log_position")); } else if (message->Get("method") == "cluster::Config") { if (!params) return; Dictionary::Ptr remoteConfig = params->Get("config_files"); if (!remoteConfig) return; Endpoint::Ptr self = Endpoint::GetByName(GetIdentity()); Array::Ptr acceptConfig = self->GetAcceptConfig(); bool 
accept = false; if (acceptConfig) { ObjectLock olock(acceptConfig); BOOST_FOREACH(const String& pattern, acceptConfig) { if (pattern == sender->GetName()) { accept = true; break; } } } String identity = params->Get("identity"); if (!accept) { Log(LogWarning, "cluster", "Ignoring config update from endpoint '" + sender->GetName() + "' for identity '" + identity + "'."); return; } Log(LogInformation, "cluster", "Processing config update for identity '" + identity + "'."); String dir = GetClusterDir() + "config/" + SHA256(identity); #ifndef _WIN32 if (mkdir(dir.CStr(), 0700) < 0 && errno != EEXIST) { #else /*_ WIN32 */ if (mkdir(dir.CStr()) < 0 && errno != EEXIST) { #endif /* _WIN32 */ BOOST_THROW_EXCEPTION(posix_error() << boost::errinfo_api_function("localtime") << boost::errinfo_errno(errno)); } Dictionary::Ptr localConfig = make_shared(); Utility::Glob(dir + "/*", boost::bind(&ClusterListener::ConfigGlobHandler, boost::cref(localConfig), _1, true), GlobFile); bool configChange = false; /* figure out whether config files were removed */ if (localConfig->GetLength() != remoteConfig->GetLength()) configChange = true; ObjectLock olock(remoteConfig); BOOST_FOREACH(const Dictionary::Pair& kv, remoteConfig) { Dictionary::Ptr remoteFile = kv.second; bool writeFile = false; String hash = SHA256(kv.first); String path = dir + "/" + hash; if (!localConfig->Contains(hash)) writeFile = true; else { Dictionary::Ptr localFile = localConfig->Get(hash); String localContent = localFile->Get("content"); String remoteContent = remoteFile->Get("content"); if (localContent != remoteContent) writeFile = true; } if (writeFile) { configChange = true; Log(LogInformation, "cluster", "Updating configuration file: " + path); std::ofstream fp(path.CStr(), std::ofstream::out | std::ostream::trunc); fp << remoteFile->Get("content"); fp.close(); } localConfig->Remove(hash); } olock.Unlock(); ObjectLock olock2(localConfig); BOOST_FOREACH(const Dictionary::Pair& kv, localConfig) { String path = dir 
+ "/" + kv.first; Log(LogInformation, "cluster", "Removing obsolete config file: " + path); (void) unlink(path.CStr()); configChange = true; } olock2.Unlock(); if (configChange) { Log(LogInformation, "cluster", "Restarting after configuration change."); Application::RequestRestart(); } AsyncRelayMessage(sender, message, true); } } bool ClusterListener::IsAuthority(const DynamicObject::Ptr& object, const String& type) { Array::Ptr authorities = object->GetAuthorities(); std::vector endpoints; BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects()) { bool match = false; if ((!endpoint->IsConnected() && endpoint->GetName() != GetIdentity()) || !endpoint->HasFeature(type)) continue; if (authorities) { ObjectLock olock(authorities); BOOST_FOREACH(const String& authority, authorities) { if (authority == endpoint->GetName()) { match = true; break; } } } else { match = true; } if (match) endpoints.push_back(endpoint->GetName()); } if (endpoints.empty()) return false; std::sort(endpoints.begin(), endpoints.end()); String key = object->GetType()->GetName() + "\t" + object->GetName(); unsigned long hash = Utility::SDBM(key); unsigned long index = hash % endpoints.size(); // Log(LogDebug, "cluster", "Authority for object '" + object->GetName() + "' of type '" + object->GetType()->GetName() + "' is '" + endpoints[index] + "'."); return (endpoints[index] == GetIdentity()); } void ClusterListener::UpdateAuthority(void) { Log(LogDebug, "cluster", "Updating authority for objects."); int checker_count = 0, notifications_count = 0; BOOST_FOREACH(const DynamicType::Ptr& type, DynamicType::GetTypes()) { BOOST_FOREACH(const DynamicObject::Ptr& object, type->GetObjects()) { bool checkerAuthority = IsAuthority(object, "checker"); if (checkerAuthority) checker_count++; object->SetAuthority("checker", checkerAuthority); bool notificationAuthority = IsAuthority(object, "notifications"); if (notificationAuthority) notifications_count++; object->SetAuthority("notifications", 
notificationAuthority); } } Log(LogDebug, "cluster", "Cluster authority: " + Convert::ToString(checker_count) + "x checker, " + Convert::ToString(notifications_count) + "x notifications"); } bool ClusterListener::SupportsChecks(void) { DynamicType::Ptr type = DynamicType::GetByName("CheckerComponent"); if (!type) return false; return !type->GetObjects().empty() && IcingaApplication::GetInstance()->GetEnableChecks(); } bool ClusterListener::SupportsNotifications(void) { DynamicType::Ptr type = DynamicType::GetByName("NotificationComponent"); if (!type) return false; return !type->GetObjects().empty() && IcingaApplication::GetInstance()->GetEnableNotifications(); } bool ClusterListener::SupportsFeature(const String& name) { DynamicType::Ptr type = DynamicType::GetByName(name); if (!type) return false; return !type->GetObjects().empty(); } std::pair ClusterListener::GetClusterStatus(void) { Dictionary::Ptr status = make_shared(); Dictionary::Ptr perfdata = make_shared(); /* cluster stats */ status->Set("node", IcingaApplication::GetInstance()->GetNodeName()); status->Set("identity", GetIdentity()); double count_endpoints = 0; Array::Ptr not_connected_endpoints = make_shared(); Array::Ptr connected_endpoints = make_shared(); BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects()) { count_endpoints++; if(!endpoint->IsConnected() && endpoint->GetName() != GetIdentity()) not_connected_endpoints->Add(endpoint->GetName()); else if(endpoint->IsConnected() && endpoint->GetName() != GetIdentity()) connected_endpoints->Add(endpoint->GetName()); } status->Set("num_endpoints", count_endpoints); status->Set("num_conn_endpoints", connected_endpoints->GetLength()); status->Set("num_not_conn_endpoints", not_connected_endpoints->GetLength()); status->Set("conn_endpoints", connected_endpoints); status->Set("not_conn_endpoints", not_connected_endpoints); perfdata->Set("num_endpoints", count_endpoints); perfdata->Set("num_conn_endpoints", 
Convert::ToDouble(connected_endpoints->GetLength())); perfdata->Set("num_not_conn_endpoints", Convert::ToDouble(not_connected_endpoints->GetLength())); return std::make_pair(status, perfdata); }