mirror of
https://github.com/Icinga/icinga2.git
synced 2025-07-23 21:55:03 +02:00
Refactor the cluster library.
This commit is contained in:
parent
21999fe51e
commit
c3746e7c73
@ -10,8 +10,8 @@ CLEANFILES = \
|
|||||||
$(top_builddir)/tools/mkembedconfig/mkembedconfig $< $@
|
$(top_builddir)/tools/mkembedconfig/mkembedconfig $< $@
|
||||||
|
|
||||||
libcluster_la_SOURCES = \
|
libcluster_la_SOURCES = \
|
||||||
clustercomponent.cpp \
|
clusterlistener.cpp \
|
||||||
clustercomponent.h \
|
clusterlistener.h \
|
||||||
cluster-type.conf \
|
cluster-type.conf \
|
||||||
endpoint.cpp \
|
endpoint.cpp \
|
||||||
endpoint.h \
|
endpoint.h \
|
||||||
|
@ -17,7 +17,7 @@
|
|||||||
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. *
|
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. *
|
||||||
******************************************************************************/
|
******************************************************************************/
|
||||||
|
|
||||||
type ClusterComponent {
|
type ClusterListener {
|
||||||
%attribute string "cert_path",
|
%attribute string "cert_path",
|
||||||
%require "cert_path",
|
%require "cert_path",
|
||||||
|
|
||||||
|
@ -17,7 +17,7 @@
|
|||||||
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. *
|
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. *
|
||||||
******************************************************************************/
|
******************************************************************************/
|
||||||
|
|
||||||
#include "cluster/clustercomponent.h"
|
#include "cluster/clusterlistener.h"
|
||||||
#include "cluster/endpoint.h"
|
#include "cluster/endpoint.h"
|
||||||
#include "icinga/domain.h"
|
#include "icinga/domain.h"
|
||||||
#include "base/netstring.h"
|
#include "base/netstring.h"
|
||||||
@ -34,12 +34,12 @@
|
|||||||
|
|
||||||
using namespace icinga;
|
using namespace icinga;
|
||||||
|
|
||||||
REGISTER_TYPE(ClusterComponent);
|
REGISTER_TYPE(ClusterListener);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Starts the component.
|
* Starts the component.
|
||||||
*/
|
*/
|
||||||
void ClusterComponent::Start(void)
|
void ClusterListener::Start(void)
|
||||||
{
|
{
|
||||||
DynamicObject::Start();
|
DynamicObject::Start();
|
||||||
|
|
||||||
@ -66,27 +66,27 @@ void ClusterComponent::Start(void)
|
|||||||
AddListener(GetBindPort());
|
AddListener(GetBindPort());
|
||||||
|
|
||||||
m_ClusterTimer = boost::make_shared<Timer>();
|
m_ClusterTimer = boost::make_shared<Timer>();
|
||||||
m_ClusterTimer->OnTimerExpired.connect(boost::bind(&ClusterComponent::ClusterTimerHandler, this));
|
m_ClusterTimer->OnTimerExpired.connect(boost::bind(&ClusterListener::ClusterTimerHandler, this));
|
||||||
m_ClusterTimer->SetInterval(5);
|
m_ClusterTimer->SetInterval(5);
|
||||||
m_ClusterTimer->Start();
|
m_ClusterTimer->Start();
|
||||||
|
|
||||||
Service::OnNewCheckResult.connect(boost::bind(&ClusterComponent::CheckResultHandler, this, _1, _2, _3));
|
Service::OnNewCheckResult.connect(boost::bind(&ClusterListener::CheckResultHandler, this, _1, _2, _3));
|
||||||
Service::OnNextCheckChanged.connect(boost::bind(&ClusterComponent::NextCheckChangedHandler, this, _1, _2, _3));
|
Service::OnNextCheckChanged.connect(boost::bind(&ClusterListener::NextCheckChangedHandler, this, _1, _2, _3));
|
||||||
Notification::OnNextNotificationChanged.connect(boost::bind(&ClusterComponent::NextNotificationChangedHandler, this, _1, _2, _3));
|
Notification::OnNextNotificationChanged.connect(boost::bind(&ClusterListener::NextNotificationChangedHandler, this, _1, _2, _3));
|
||||||
Service::OnForceNextCheckChanged.connect(boost::bind(&ClusterComponent::ForceNextCheckChangedHandler, this, _1, _2, _3));
|
Service::OnForceNextCheckChanged.connect(boost::bind(&ClusterListener::ForceNextCheckChangedHandler, this, _1, _2, _3));
|
||||||
Service::OnForceNextNotificationChanged.connect(boost::bind(&ClusterComponent::ForceNextNotificationChangedHandler, this, _1, _2, _3));
|
Service::OnForceNextNotificationChanged.connect(boost::bind(&ClusterListener::ForceNextNotificationChangedHandler, this, _1, _2, _3));
|
||||||
Service::OnEnableActiveChecksChanged.connect(boost::bind(&ClusterComponent::EnableActiveChecksChangedHandler, this, _1, _2, _3));
|
Service::OnEnableActiveChecksChanged.connect(boost::bind(&ClusterListener::EnableActiveChecksChangedHandler, this, _1, _2, _3));
|
||||||
Service::OnEnablePassiveChecksChanged.connect(boost::bind(&ClusterComponent::EnablePassiveChecksChangedHandler, this, _1, _2, _3));
|
Service::OnEnablePassiveChecksChanged.connect(boost::bind(&ClusterListener::EnablePassiveChecksChangedHandler, this, _1, _2, _3));
|
||||||
Service::OnEnableNotificationsChanged.connect(boost::bind(&ClusterComponent::EnableNotificationsChangedHandler, this, _1, _2, _3));
|
Service::OnEnableNotificationsChanged.connect(boost::bind(&ClusterListener::EnableNotificationsChangedHandler, this, _1, _2, _3));
|
||||||
Service::OnEnableFlappingChanged.connect(boost::bind(&ClusterComponent::EnableFlappingChangedHandler, this, _1, _2, _3));
|
Service::OnEnableFlappingChanged.connect(boost::bind(&ClusterListener::EnableFlappingChangedHandler, this, _1, _2, _3));
|
||||||
Service::OnCommentAdded.connect(boost::bind(&ClusterComponent::CommentAddedHandler, this, _1, _2, _3));
|
Service::OnCommentAdded.connect(boost::bind(&ClusterListener::CommentAddedHandler, this, _1, _2, _3));
|
||||||
Service::OnCommentRemoved.connect(boost::bind(&ClusterComponent::CommentRemovedHandler, this, _1, _2, _3));
|
Service::OnCommentRemoved.connect(boost::bind(&ClusterListener::CommentRemovedHandler, this, _1, _2, _3));
|
||||||
Service::OnDowntimeAdded.connect(boost::bind(&ClusterComponent::DowntimeAddedHandler, this, _1, _2, _3));
|
Service::OnDowntimeAdded.connect(boost::bind(&ClusterListener::DowntimeAddedHandler, this, _1, _2, _3));
|
||||||
Service::OnDowntimeRemoved.connect(boost::bind(&ClusterComponent::DowntimeRemovedHandler, this, _1, _2, _3));
|
Service::OnDowntimeRemoved.connect(boost::bind(&ClusterListener::DowntimeRemovedHandler, this, _1, _2, _3));
|
||||||
Service::OnAcknowledgementSet.connect(boost::bind(&ClusterComponent::AcknowledgementSetHandler, this, _1, _2, _3, _4, _5, _6));
|
Service::OnAcknowledgementSet.connect(boost::bind(&ClusterListener::AcknowledgementSetHandler, this, _1, _2, _3, _4, _5, _6));
|
||||||
Service::OnAcknowledgementCleared.connect(boost::bind(&ClusterComponent::AcknowledgementClearedHandler, this, _1, _2));
|
Service::OnAcknowledgementCleared.connect(boost::bind(&ClusterListener::AcknowledgementClearedHandler, this, _1, _2));
|
||||||
|
|
||||||
Endpoint::OnMessageReceived.connect(boost::bind(&ClusterComponent::AsyncMessageHandler, this, _1, _2));
|
Endpoint::OnMessageReceived.connect(boost::bind(&ClusterListener::AsyncMessageHandler, this, _1, _2));
|
||||||
|
|
||||||
BOOST_FOREACH(const DynamicType::Ptr& type, DynamicType::GetTypes()) {
|
BOOST_FOREACH(const DynamicType::Ptr& type, DynamicType::GetTypes()) {
|
||||||
BOOST_FOREACH(const DynamicObject::Ptr& object, type->GetObjects()) {
|
BOOST_FOREACH(const DynamicObject::Ptr& object, type->GetObjects()) {
|
||||||
@ -119,56 +119,56 @@ void ClusterComponent::Start(void)
|
|||||||
/**
|
/**
|
||||||
* Stops the component.
|
* Stops the component.
|
||||||
*/
|
*/
|
||||||
void ClusterComponent::Stop(void)
|
void ClusterListener::Stop(void)
|
||||||
{
|
{
|
||||||
ObjectLock olock(this);
|
ObjectLock olock(this);
|
||||||
CloseLogFile();
|
CloseLogFile();
|
||||||
RotateLogFile();
|
RotateLogFile();
|
||||||
}
|
}
|
||||||
|
|
||||||
String ClusterComponent::GetCertificateFile(void) const
|
String ClusterListener::GetCertificateFile(void) const
|
||||||
{
|
{
|
||||||
ObjectLock olock(this);
|
ObjectLock olock(this);
|
||||||
|
|
||||||
return m_CertPath;
|
return m_CertPath;
|
||||||
}
|
}
|
||||||
|
|
||||||
String ClusterComponent::GetCAFile(void) const
|
String ClusterListener::GetCAFile(void) const
|
||||||
{
|
{
|
||||||
ObjectLock olock(this);
|
ObjectLock olock(this);
|
||||||
|
|
||||||
return m_CAPath;
|
return m_CAPath;
|
||||||
}
|
}
|
||||||
|
|
||||||
String ClusterComponent::GetBindHost(void) const
|
String ClusterListener::GetBindHost(void) const
|
||||||
{
|
{
|
||||||
ObjectLock olock(this);
|
ObjectLock olock(this);
|
||||||
|
|
||||||
return m_BindHost;
|
return m_BindHost;
|
||||||
}
|
}
|
||||||
|
|
||||||
String ClusterComponent::GetBindPort(void) const
|
String ClusterListener::GetBindPort(void) const
|
||||||
{
|
{
|
||||||
ObjectLock olock(this);
|
ObjectLock olock(this);
|
||||||
|
|
||||||
return m_BindPort;
|
return m_BindPort;
|
||||||
}
|
}
|
||||||
|
|
||||||
Array::Ptr ClusterComponent::GetPeers(void) const
|
Array::Ptr ClusterListener::GetPeers(void) const
|
||||||
{
|
{
|
||||||
ObjectLock olock(this);
|
ObjectLock olock(this);
|
||||||
|
|
||||||
return m_Peers;
|
return m_Peers;
|
||||||
}
|
}
|
||||||
|
|
||||||
shared_ptr<SSL_CTX> ClusterComponent::GetSSLContext(void) const
|
shared_ptr<SSL_CTX> ClusterListener::GetSSLContext(void) const
|
||||||
{
|
{
|
||||||
ObjectLock olock(this);
|
ObjectLock olock(this);
|
||||||
|
|
||||||
return m_SSLContext;
|
return m_SSLContext;
|
||||||
}
|
}
|
||||||
|
|
||||||
String ClusterComponent::GetIdentity(void) const
|
String ClusterListener::GetIdentity(void) const
|
||||||
{
|
{
|
||||||
ObjectLock olock(this);
|
ObjectLock olock(this);
|
||||||
|
|
||||||
@ -180,7 +180,7 @@ String ClusterComponent::GetIdentity(void) const
|
|||||||
*
|
*
|
||||||
* @param service The port to listen on.
|
* @param service The port to listen on.
|
||||||
*/
|
*/
|
||||||
void ClusterComponent::AddListener(const String& service)
|
void ClusterListener::AddListener(const String& service)
|
||||||
{
|
{
|
||||||
ObjectLock olock(this);
|
ObjectLock olock(this);
|
||||||
|
|
||||||
@ -196,13 +196,13 @@ void ClusterComponent::AddListener(const String& service)
|
|||||||
TcpSocket::Ptr server = boost::make_shared<TcpSocket>();
|
TcpSocket::Ptr server = boost::make_shared<TcpSocket>();
|
||||||
server->Bind(service, AF_INET6);
|
server->Bind(service, AF_INET6);
|
||||||
|
|
||||||
boost::thread thread(boost::bind(&ClusterComponent::ListenerThreadProc, this, server));
|
boost::thread thread(boost::bind(&ClusterListener::ListenerThreadProc, this, server));
|
||||||
thread.detach();
|
thread.detach();
|
||||||
|
|
||||||
m_Servers.insert(server);
|
m_Servers.insert(server);
|
||||||
}
|
}
|
||||||
|
|
||||||
void ClusterComponent::ListenerThreadProc(const Socket::Ptr& server)
|
void ClusterListener::ListenerThreadProc(const Socket::Ptr& server)
|
||||||
{
|
{
|
||||||
Utility::SetThreadName("Cluster Listener");
|
Utility::SetThreadName("Cluster Listener");
|
||||||
|
|
||||||
@ -211,7 +211,7 @@ void ClusterComponent::ListenerThreadProc(const Socket::Ptr& server)
|
|||||||
for (;;) {
|
for (;;) {
|
||||||
Socket::Ptr client = server->Accept();
|
Socket::Ptr client = server->Accept();
|
||||||
|
|
||||||
Utility::QueueAsyncCallback(boost::bind(&ClusterComponent::NewClientHandler, this, client, TlsRoleServer));
|
Utility::QueueAsyncCallback(boost::bind(&ClusterListener::NewClientHandler, this, client, TlsRoleServer));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -221,7 +221,7 @@ void ClusterComponent::ListenerThreadProc(const Socket::Ptr& server)
|
|||||||
* @param node The remote host.
|
* @param node The remote host.
|
||||||
* @param service The remote port.
|
* @param service The remote port.
|
||||||
*/
|
*/
|
||||||
void ClusterComponent::AddConnection(const String& node, const String& service) {
|
void ClusterListener::AddConnection(const String& node, const String& service) {
|
||||||
{
|
{
|
||||||
ObjectLock olock(this);
|
ObjectLock olock(this);
|
||||||
|
|
||||||
@ -234,15 +234,15 @@ void ClusterComponent::AddConnection(const String& node, const String& service)
|
|||||||
TcpSocket::Ptr client = boost::make_shared<TcpSocket>();
|
TcpSocket::Ptr client = boost::make_shared<TcpSocket>();
|
||||||
|
|
||||||
client->Connect(node, service);
|
client->Connect(node, service);
|
||||||
Utility::QueueAsyncCallback(boost::bind(&ClusterComponent::NewClientHandler, this, client, TlsRoleClient));
|
Utility::QueueAsyncCallback(boost::bind(&ClusterListener::NewClientHandler, this, client, TlsRoleClient));
|
||||||
}
|
}
|
||||||
|
|
||||||
void ClusterComponent::AsyncRelayMessage(const Endpoint::Ptr& source, const Dictionary::Ptr& message, bool persistent)
|
void ClusterListener::AsyncRelayMessage(const Endpoint::Ptr& source, const Dictionary::Ptr& message, bool persistent)
|
||||||
{
|
{
|
||||||
m_RelayQueue.Enqueue(boost::bind(&ClusterComponent::RelayMessage, this, source, message, persistent));
|
m_RelayQueue.Enqueue(boost::bind(&ClusterListener::RelayMessage, this, source, message, persistent));
|
||||||
}
|
}
|
||||||
|
|
||||||
void ClusterComponent::PersistMessage(const Endpoint::Ptr& source, const Dictionary::Ptr& message)
|
void ClusterListener::PersistMessage(const Endpoint::Ptr& source, const Dictionary::Ptr& message)
|
||||||
{
|
{
|
||||||
double ts = message->Get("ts");
|
double ts = message->Get("ts");
|
||||||
|
|
||||||
@ -272,13 +272,13 @@ void ClusterComponent::PersistMessage(const Endpoint::Ptr& source, const Diction
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void ClusterComponent::RelayMessage(const Endpoint::Ptr& source, const Dictionary::Ptr& message, bool persistent)
|
void ClusterListener::RelayMessage(const Endpoint::Ptr& source, const Dictionary::Ptr& message, bool persistent)
|
||||||
{
|
{
|
||||||
double ts = Utility::GetTime();
|
double ts = Utility::GetTime();
|
||||||
message->Set("ts", ts);
|
message->Set("ts", ts);
|
||||||
|
|
||||||
if (persistent)
|
if (persistent)
|
||||||
m_LogQueue.Enqueue(boost::bind(&ClusterComponent::PersistMessage, this, source, message));
|
m_LogQueue.Enqueue(boost::bind(&ClusterListener::PersistMessage, this, source, message));
|
||||||
|
|
||||||
Dictionary::Ptr security = message->Get("security");
|
Dictionary::Ptr security = message->Get("security");
|
||||||
DynamicObject::Ptr secobj;
|
DynamicObject::Ptr secobj;
|
||||||
@ -328,12 +328,12 @@ void ClusterComponent::RelayMessage(const Endpoint::Ptr& source, const Dictionar
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
String ClusterComponent::GetClusterDir(void) const
|
String ClusterListener::GetClusterDir(void) const
|
||||||
{
|
{
|
||||||
return Application::GetLocalStateDir() + "/lib/icinga2/cluster/";
|
return Application::GetLocalStateDir() + "/lib/icinga2/cluster/";
|
||||||
}
|
}
|
||||||
|
|
||||||
void ClusterComponent::OpenLogFile(void)
|
void ClusterListener::OpenLogFile(void)
|
||||||
{
|
{
|
||||||
ASSERT(OwnsLock());
|
ASSERT(OwnsLock());
|
||||||
|
|
||||||
@ -352,7 +352,7 @@ void ClusterComponent::OpenLogFile(void)
|
|||||||
m_LogMessageTimestamp = 0;
|
m_LogMessageTimestamp = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
void ClusterComponent::CloseLogFile(void)
|
void ClusterListener::CloseLogFile(void)
|
||||||
{
|
{
|
||||||
ASSERT(OwnsLock());
|
ASSERT(OwnsLock());
|
||||||
|
|
||||||
@ -364,7 +364,7 @@ void ClusterComponent::CloseLogFile(void)
|
|||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void ClusterComponent::RotateLogFile(void)
|
void ClusterListener::RotateLogFile(void)
|
||||||
{
|
{
|
||||||
ASSERT(OwnsLock());
|
ASSERT(OwnsLock());
|
||||||
|
|
||||||
@ -378,7 +378,7 @@ void ClusterComponent::RotateLogFile(void)
|
|||||||
(void) rename(oldpath.CStr(), newpath.CStr());
|
(void) rename(oldpath.CStr(), newpath.CStr());
|
||||||
}
|
}
|
||||||
|
|
||||||
void ClusterComponent::LogGlobHandler(std::vector<int>& files, const String& file)
|
void ClusterListener::LogGlobHandler(std::vector<int>& files, const String& file)
|
||||||
{
|
{
|
||||||
String name = Utility::BaseName(file);
|
String name = Utility::BaseName(file);
|
||||||
|
|
||||||
@ -393,7 +393,7 @@ void ClusterComponent::LogGlobHandler(std::vector<int>& files, const String& fil
|
|||||||
files.push_back(ts);
|
files.push_back(ts);
|
||||||
}
|
}
|
||||||
|
|
||||||
void ClusterComponent::ReplayLog(const Endpoint::Ptr& endpoint, const Stream::Ptr& stream)
|
void ClusterListener::ReplayLog(const Endpoint::Ptr& endpoint, const Stream::Ptr& stream)
|
||||||
{
|
{
|
||||||
int count = -1;
|
int count = -1;
|
||||||
double peer_ts = endpoint->GetLocalLogPosition();
|
double peer_ts = endpoint->GetLocalLogPosition();
|
||||||
@ -417,7 +417,7 @@ void ClusterComponent::ReplayLog(const Endpoint::Ptr& endpoint, const Stream::Pt
|
|||||||
count = 0;
|
count = 0;
|
||||||
|
|
||||||
std::vector<int> files;
|
std::vector<int> files;
|
||||||
Utility::Glob(GetClusterDir() + "log/*", boost::bind(&ClusterComponent::LogGlobHandler, boost::ref(files), _1));
|
Utility::Glob(GetClusterDir() + "log/*", boost::bind(&ClusterListener::LogGlobHandler, boost::ref(files), _1));
|
||||||
std::sort(files.begin(), files.end());
|
std::sort(files.begin(), files.end());
|
||||||
|
|
||||||
BOOST_FOREACH(int ts, files) {
|
BOOST_FOREACH(int ts, files) {
|
||||||
@ -505,7 +505,7 @@ void ClusterComponent::ReplayLog(const Endpoint::Ptr& endpoint, const Stream::Pt
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void ClusterComponent::ConfigGlobHandler(const Dictionary::Ptr& config, const String& file, bool basename)
|
void ClusterListener::ConfigGlobHandler(const Dictionary::Ptr& config, const String& file, bool basename)
|
||||||
{
|
{
|
||||||
Dictionary::Ptr elem = boost::make_shared<Dictionary>();
|
Dictionary::Ptr elem = boost::make_shared<Dictionary>();
|
||||||
|
|
||||||
@ -524,7 +524,7 @@ void ClusterComponent::ConfigGlobHandler(const Dictionary::Ptr& config, const St
|
|||||||
*
|
*
|
||||||
* @param client The new client.
|
* @param client The new client.
|
||||||
*/
|
*/
|
||||||
void ClusterComponent::NewClientHandler(const Socket::Ptr& client, TlsRole role)
|
void ClusterListener::NewClientHandler(const Socket::Ptr& client, TlsRole role)
|
||||||
{
|
{
|
||||||
NetworkStream::Ptr netStream = boost::make_shared<NetworkStream>(client);
|
NetworkStream::Ptr netStream = boost::make_shared<NetworkStream>(client);
|
||||||
|
|
||||||
@ -562,7 +562,7 @@ void ClusterComponent::NewClientHandler(const Socket::Ptr& client, TlsRole role)
|
|||||||
if (configFiles) {
|
if (configFiles) {
|
||||||
ObjectLock olock(configFiles);
|
ObjectLock olock(configFiles);
|
||||||
BOOST_FOREACH(const String& pattern, configFiles) {
|
BOOST_FOREACH(const String& pattern, configFiles) {
|
||||||
Utility::Glob(pattern, boost::bind(&ClusterComponent::ConfigGlobHandler, boost::cref(config), _1, false));
|
Utility::Glob(pattern, boost::bind(&ClusterListener::ConfigGlobHandler, boost::cref(config), _1, false));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -581,7 +581,7 @@ void ClusterComponent::NewClientHandler(const Socket::Ptr& client, TlsRole role)
|
|||||||
ReplayLog(endpoint, tlsStream);
|
ReplayLog(endpoint, tlsStream);
|
||||||
}
|
}
|
||||||
|
|
||||||
void ClusterComponent::ClusterTimerHandler(void)
|
void ClusterListener::ClusterTimerHandler(void)
|
||||||
{
|
{
|
||||||
/* broadcast a heartbeat message */
|
/* broadcast a heartbeat message */
|
||||||
Dictionary::Ptr params = boost::make_shared<Dictionary>();
|
Dictionary::Ptr params = boost::make_shared<Dictionary>();
|
||||||
@ -620,7 +620,7 @@ void ClusterComponent::ClusterTimerHandler(void)
|
|||||||
}
|
}
|
||||||
|
|
||||||
std::vector<int> files;
|
std::vector<int> files;
|
||||||
Utility::Glob(GetClusterDir() + "log/*", boost::bind(&ClusterComponent::LogGlobHandler, boost::ref(files), _1));
|
Utility::Glob(GetClusterDir() + "log/*", boost::bind(&ClusterListener::LogGlobHandler, boost::ref(files), _1));
|
||||||
std::sort(files.begin(), files.end());
|
std::sort(files.begin(), files.end());
|
||||||
|
|
||||||
BOOST_FOREACH(int ts, files) {
|
BOOST_FOREACH(int ts, files) {
|
||||||
@ -686,7 +686,7 @@ void ClusterComponent::ClusterTimerHandler(void)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void ClusterComponent::SetSecurityInfo(const Dictionary::Ptr& message, const DynamicObject::Ptr& object, int privs)
|
void ClusterListener::SetSecurityInfo(const Dictionary::Ptr& message, const DynamicObject::Ptr& object, int privs)
|
||||||
{
|
{
|
||||||
ASSERT(object);
|
ASSERT(object);
|
||||||
|
|
||||||
@ -698,7 +698,7 @@ void ClusterComponent::SetSecurityInfo(const Dictionary::Ptr& message, const Dyn
|
|||||||
message->Set("security", security);
|
message->Set("security", security);
|
||||||
}
|
}
|
||||||
|
|
||||||
void ClusterComponent::CheckResultHandler(const Service::Ptr& service, const Dictionary::Ptr& cr, const String& authority)
|
void ClusterListener::CheckResultHandler(const Service::Ptr& service, const Dictionary::Ptr& cr, const String& authority)
|
||||||
{
|
{
|
||||||
if (!authority.IsEmpty() && authority != GetIdentity())
|
if (!authority.IsEmpty() && authority != GetIdentity())
|
||||||
return;
|
return;
|
||||||
@ -717,7 +717,7 @@ void ClusterComponent::CheckResultHandler(const Service::Ptr& service, const Dic
|
|||||||
AsyncRelayMessage(Endpoint::Ptr(), message, true);
|
AsyncRelayMessage(Endpoint::Ptr(), message, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
void ClusterComponent::NextCheckChangedHandler(const Service::Ptr& service, double nextCheck, const String& authority)
|
void ClusterListener::NextCheckChangedHandler(const Service::Ptr& service, double nextCheck, const String& authority)
|
||||||
{
|
{
|
||||||
if (!authority.IsEmpty() && authority != GetIdentity())
|
if (!authority.IsEmpty() && authority != GetIdentity())
|
||||||
return;
|
return;
|
||||||
@ -736,7 +736,7 @@ void ClusterComponent::NextCheckChangedHandler(const Service::Ptr& service, doub
|
|||||||
AsyncRelayMessage(Endpoint::Ptr(), message, true);
|
AsyncRelayMessage(Endpoint::Ptr(), message, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
void ClusterComponent::NextNotificationChangedHandler(const Notification::Ptr& notification, double nextNotification, const String& authority)
|
void ClusterListener::NextNotificationChangedHandler(const Notification::Ptr& notification, double nextNotification, const String& authority)
|
||||||
{
|
{
|
||||||
if (!authority.IsEmpty() && authority != GetIdentity())
|
if (!authority.IsEmpty() && authority != GetIdentity())
|
||||||
return;
|
return;
|
||||||
@ -755,7 +755,7 @@ void ClusterComponent::NextNotificationChangedHandler(const Notification::Ptr& n
|
|||||||
AsyncRelayMessage(Endpoint::Ptr(), message, true);
|
AsyncRelayMessage(Endpoint::Ptr(), message, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
void ClusterComponent::ForceNextCheckChangedHandler(const Service::Ptr& service, bool forced, const String& authority)
|
void ClusterListener::ForceNextCheckChangedHandler(const Service::Ptr& service, bool forced, const String& authority)
|
||||||
{
|
{
|
||||||
if (!authority.IsEmpty() && authority != GetIdentity())
|
if (!authority.IsEmpty() && authority != GetIdentity())
|
||||||
return;
|
return;
|
||||||
@ -774,7 +774,7 @@ void ClusterComponent::ForceNextCheckChangedHandler(const Service::Ptr& service,
|
|||||||
AsyncRelayMessage(Endpoint::Ptr(), message, true);
|
AsyncRelayMessage(Endpoint::Ptr(), message, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
void ClusterComponent::ForceNextNotificationChangedHandler(const Service::Ptr& service, bool forced, const String& authority)
|
void ClusterListener::ForceNextNotificationChangedHandler(const Service::Ptr& service, bool forced, const String& authority)
|
||||||
{
|
{
|
||||||
if (!authority.IsEmpty() && authority != GetIdentity())
|
if (!authority.IsEmpty() && authority != GetIdentity())
|
||||||
return;
|
return;
|
||||||
@ -793,7 +793,7 @@ void ClusterComponent::ForceNextNotificationChangedHandler(const Service::Ptr& s
|
|||||||
AsyncRelayMessage(Endpoint::Ptr(), message, true);
|
AsyncRelayMessage(Endpoint::Ptr(), message, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
void ClusterComponent::EnableActiveChecksChangedHandler(const Service::Ptr& service, bool enabled, const String& authority)
|
void ClusterListener::EnableActiveChecksChangedHandler(const Service::Ptr& service, bool enabled, const String& authority)
|
||||||
{
|
{
|
||||||
if (!authority.IsEmpty() && authority != GetIdentity())
|
if (!authority.IsEmpty() && authority != GetIdentity())
|
||||||
return;
|
return;
|
||||||
@ -812,7 +812,7 @@ void ClusterComponent::EnableActiveChecksChangedHandler(const Service::Ptr& serv
|
|||||||
AsyncRelayMessage(Endpoint::Ptr(), message, true);
|
AsyncRelayMessage(Endpoint::Ptr(), message, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
void ClusterComponent::EnablePassiveChecksChangedHandler(const Service::Ptr& service, bool enabled, const String& authority)
|
void ClusterListener::EnablePassiveChecksChangedHandler(const Service::Ptr& service, bool enabled, const String& authority)
|
||||||
{
|
{
|
||||||
if (!authority.IsEmpty() && authority != GetIdentity())
|
if (!authority.IsEmpty() && authority != GetIdentity())
|
||||||
return;
|
return;
|
||||||
@ -831,7 +831,7 @@ void ClusterComponent::EnablePassiveChecksChangedHandler(const Service::Ptr& ser
|
|||||||
AsyncRelayMessage(Endpoint::Ptr(), message, true);
|
AsyncRelayMessage(Endpoint::Ptr(), message, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
void ClusterComponent::EnableNotificationsChangedHandler(const Service::Ptr& service, bool enabled, const String& authority)
|
void ClusterListener::EnableNotificationsChangedHandler(const Service::Ptr& service, bool enabled, const String& authority)
|
||||||
{
|
{
|
||||||
if (!authority.IsEmpty() && authority != GetIdentity())
|
if (!authority.IsEmpty() && authority != GetIdentity())
|
||||||
return;
|
return;
|
||||||
@ -850,7 +850,7 @@ void ClusterComponent::EnableNotificationsChangedHandler(const Service::Ptr& ser
|
|||||||
AsyncRelayMessage(Endpoint::Ptr(), message, true);
|
AsyncRelayMessage(Endpoint::Ptr(), message, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
void ClusterComponent::EnableFlappingChangedHandler(const Service::Ptr& service, bool enabled, const String& authority)
|
void ClusterListener::EnableFlappingChangedHandler(const Service::Ptr& service, bool enabled, const String& authority)
|
||||||
{
|
{
|
||||||
if (!authority.IsEmpty() && authority != GetIdentity())
|
if (!authority.IsEmpty() && authority != GetIdentity())
|
||||||
return;
|
return;
|
||||||
@ -869,7 +869,7 @@ void ClusterComponent::EnableFlappingChangedHandler(const Service::Ptr& service,
|
|||||||
AsyncRelayMessage(Endpoint::Ptr(), message, true);
|
AsyncRelayMessage(Endpoint::Ptr(), message, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
void ClusterComponent::CommentAddedHandler(const Service::Ptr& service, const Dictionary::Ptr& comment, const String& authority)
|
void ClusterListener::CommentAddedHandler(const Service::Ptr& service, const Dictionary::Ptr& comment, const String& authority)
|
||||||
{
|
{
|
||||||
if (!authority.IsEmpty() && authority != GetIdentity())
|
if (!authority.IsEmpty() && authority != GetIdentity())
|
||||||
return;
|
return;
|
||||||
@ -888,7 +888,7 @@ void ClusterComponent::CommentAddedHandler(const Service::Ptr& service, const Di
|
|||||||
AsyncRelayMessage(Endpoint::Ptr(), message, true);
|
AsyncRelayMessage(Endpoint::Ptr(), message, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
void ClusterComponent::CommentRemovedHandler(const Service::Ptr& service, const Dictionary::Ptr& comment, const String& authority)
|
void ClusterListener::CommentRemovedHandler(const Service::Ptr& service, const Dictionary::Ptr& comment, const String& authority)
|
||||||
{
|
{
|
||||||
if (!authority.IsEmpty() && authority != GetIdentity())
|
if (!authority.IsEmpty() && authority != GetIdentity())
|
||||||
return;
|
return;
|
||||||
@ -907,7 +907,7 @@ void ClusterComponent::CommentRemovedHandler(const Service::Ptr& service, const
|
|||||||
AsyncRelayMessage(Endpoint::Ptr(), message, true);
|
AsyncRelayMessage(Endpoint::Ptr(), message, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
void ClusterComponent::DowntimeAddedHandler(const Service::Ptr& service, const Dictionary::Ptr& downtime, const String& authority)
|
void ClusterListener::DowntimeAddedHandler(const Service::Ptr& service, const Dictionary::Ptr& downtime, const String& authority)
|
||||||
{
|
{
|
||||||
if (!authority.IsEmpty() && authority != GetIdentity())
|
if (!authority.IsEmpty() && authority != GetIdentity())
|
||||||
return;
|
return;
|
||||||
@ -926,7 +926,7 @@ void ClusterComponent::DowntimeAddedHandler(const Service::Ptr& service, const D
|
|||||||
AsyncRelayMessage(Endpoint::Ptr(), message, true);
|
AsyncRelayMessage(Endpoint::Ptr(), message, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
void ClusterComponent::DowntimeRemovedHandler(const Service::Ptr& service, const Dictionary::Ptr& downtime, const String& authority)
|
void ClusterListener::DowntimeRemovedHandler(const Service::Ptr& service, const Dictionary::Ptr& downtime, const String& authority)
|
||||||
{
|
{
|
||||||
if (!authority.IsEmpty() && authority != GetIdentity())
|
if (!authority.IsEmpty() && authority != GetIdentity())
|
||||||
return;
|
return;
|
||||||
@ -945,7 +945,7 @@ void ClusterComponent::DowntimeRemovedHandler(const Service::Ptr& service, const
|
|||||||
AsyncRelayMessage(Endpoint::Ptr(), message, true);
|
AsyncRelayMessage(Endpoint::Ptr(), message, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
void ClusterComponent::AcknowledgementSetHandler(const Service::Ptr& service, const String& author, const String& comment, AcknowledgementType type, double expiry, const String& authority)
|
void ClusterListener::AcknowledgementSetHandler(const Service::Ptr& service, const String& author, const String& comment, AcknowledgementType type, double expiry, const String& authority)
|
||||||
{
|
{
|
||||||
if (!authority.IsEmpty() && authority != GetIdentity())
|
if (!authority.IsEmpty() && authority != GetIdentity())
|
||||||
return;
|
return;
|
||||||
@ -967,7 +967,7 @@ void ClusterComponent::AcknowledgementSetHandler(const Service::Ptr& service, co
|
|||||||
AsyncRelayMessage(Endpoint::Ptr(), message, true);
|
AsyncRelayMessage(Endpoint::Ptr(), message, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
void ClusterComponent::AcknowledgementClearedHandler(const Service::Ptr& service, const String& authority)
|
void ClusterListener::AcknowledgementClearedHandler(const Service::Ptr& service, const String& authority)
|
||||||
{
|
{
|
||||||
if (!authority.IsEmpty() && authority != GetIdentity())
|
if (!authority.IsEmpty() && authority != GetIdentity())
|
||||||
return;
|
return;
|
||||||
@ -985,12 +985,12 @@ void ClusterComponent::AcknowledgementClearedHandler(const Service::Ptr& service
|
|||||||
AsyncRelayMessage(Endpoint::Ptr(), message, true);
|
AsyncRelayMessage(Endpoint::Ptr(), message, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
void ClusterComponent::AsyncMessageHandler(const Endpoint::Ptr& sender, const Dictionary::Ptr& message)
|
void ClusterListener::AsyncMessageHandler(const Endpoint::Ptr& sender, const Dictionary::Ptr& message)
|
||||||
{
|
{
|
||||||
m_MessageQueue.Enqueue(boost::bind(&ClusterComponent::MessageHandler, this, sender, message));
|
m_MessageQueue.Enqueue(boost::bind(&ClusterListener::MessageHandler, this, sender, message));
|
||||||
}
|
}
|
||||||
|
|
||||||
void ClusterComponent::MessageHandler(const Endpoint::Ptr& sender, const Dictionary::Ptr& message)
|
void ClusterListener::MessageHandler(const Endpoint::Ptr& sender, const Dictionary::Ptr& message)
|
||||||
{
|
{
|
||||||
sender->SetSeen(Utility::GetTime());
|
sender->SetSeen(Utility::GetTime());
|
||||||
|
|
||||||
@ -1410,7 +1410,7 @@ void ClusterComponent::MessageHandler(const Endpoint::Ptr& sender, const Diction
|
|||||||
}
|
}
|
||||||
|
|
||||||
Dictionary::Ptr localConfig = boost::make_shared<Dictionary>();
|
Dictionary::Ptr localConfig = boost::make_shared<Dictionary>();
|
||||||
Utility::Glob(dir + "/*", boost::bind(&ClusterComponent::ConfigGlobHandler, boost::cref(localConfig), _1, true));
|
Utility::Glob(dir + "/*", boost::bind(&ClusterListener::ConfigGlobHandler, boost::cref(localConfig), _1, true));
|
||||||
|
|
||||||
bool configChange = false;
|
bool configChange = false;
|
||||||
|
|
||||||
@ -1471,7 +1471,7 @@ void ClusterComponent::MessageHandler(const Endpoint::Ptr& sender, const Diction
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
bool ClusterComponent::IsAuthority(const DynamicObject::Ptr& object, const String& type)
|
bool ClusterListener::IsAuthority(const DynamicObject::Ptr& object, const String& type)
|
||||||
{
|
{
|
||||||
Array::Ptr authorities = object->GetAuthorities();
|
Array::Ptr authorities = object->GetAuthorities();
|
||||||
std::vector<String> endpoints;
|
std::vector<String> endpoints;
|
||||||
@ -1513,7 +1513,7 @@ bool ClusterComponent::IsAuthority(const DynamicObject::Ptr& object, const Strin
|
|||||||
return (endpoints[index] == GetIdentity());
|
return (endpoints[index] == GetIdentity());
|
||||||
}
|
}
|
||||||
|
|
||||||
void ClusterComponent::UpdateAuthority(void)
|
void ClusterListener::UpdateAuthority(void)
|
||||||
{
|
{
|
||||||
Log(LogDebug, "cluster", "Updating authority for objects.");
|
Log(LogDebug, "cluster", "Updating authority for objects.");
|
||||||
|
|
||||||
@ -1525,7 +1525,7 @@ void ClusterComponent::UpdateAuthority(void)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
bool ClusterComponent::SupportsChecks(void)
|
bool ClusterListener::SupportsChecks(void)
|
||||||
{
|
{
|
||||||
DynamicType::Ptr type = DynamicType::GetByName("CheckerComponent");
|
DynamicType::Ptr type = DynamicType::GetByName("CheckerComponent");
|
||||||
|
|
||||||
@ -1535,7 +1535,7 @@ bool ClusterComponent::SupportsChecks(void)
|
|||||||
return !type->GetObjects().empty();
|
return !type->GetObjects().empty();
|
||||||
}
|
}
|
||||||
|
|
||||||
bool ClusterComponent::SupportsNotifications(void)
|
bool ClusterListener::SupportsNotifications(void)
|
||||||
{
|
{
|
||||||
DynamicType::Ptr type = DynamicType::GetByName("NotificationComponent");
|
DynamicType::Ptr type = DynamicType::GetByName("NotificationComponent");
|
||||||
|
|
||||||
@ -1545,7 +1545,7 @@ bool ClusterComponent::SupportsNotifications(void)
|
|||||||
return !type->GetObjects().empty();
|
return !type->GetObjects().empty();
|
||||||
}
|
}
|
||||||
|
|
||||||
void ClusterComponent::InternalSerialize(const Dictionary::Ptr& bag, int attributeTypes) const
|
void ClusterListener::InternalSerialize(const Dictionary::Ptr& bag, int attributeTypes) const
|
||||||
{
|
{
|
||||||
DynamicObject::InternalSerialize(bag, attributeTypes);
|
DynamicObject::InternalSerialize(bag, attributeTypes);
|
||||||
|
|
||||||
@ -1561,7 +1561,7 @@ void ClusterComponent::InternalSerialize(const Dictionary::Ptr& bag, int attribu
|
|||||||
bag->Set("log_message_timestamp", m_LogMessageTimestamp);
|
bag->Set("log_message_timestamp", m_LogMessageTimestamp);
|
||||||
}
|
}
|
||||||
|
|
||||||
void ClusterComponent::InternalDeserialize(const Dictionary::Ptr& bag, int attributeTypes)
|
void ClusterListener::InternalDeserialize(const Dictionary::Ptr& bag, int attributeTypes)
|
||||||
{
|
{
|
||||||
DynamicObject::InternalDeserialize(bag, attributeTypes);
|
DynamicObject::InternalDeserialize(bag, attributeTypes);
|
||||||
|
|
@ -17,8 +17,8 @@
|
|||||||
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. *
|
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. *
|
||||||
******************************************************************************/
|
******************************************************************************/
|
||||||
|
|
||||||
#ifndef CLUSTERCOMPONENT_H
|
#ifndef CLUSTERLISTENER_H
|
||||||
#define CLUSTERCOMPONENT_H
|
#define CLUSTERLISTENER_H
|
||||||
|
|
||||||
#include "base/dynamicobject.h"
|
#include "base/dynamicobject.h"
|
||||||
#include "base/timer.h"
|
#include "base/timer.h"
|
||||||
@ -36,13 +36,13 @@ namespace icinga
|
|||||||
{
|
{
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @ingroup demo
|
* @ingroup cluster
|
||||||
*/
|
*/
|
||||||
class ClusterComponent : public DynamicObject
|
class ClusterListener : public DynamicObject
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
DECLARE_PTR_TYPEDEFS(ClusterComponent);
|
DECLARE_PTR_TYPEDEFS(ClusterListener);
|
||||||
DECLARE_TYPENAME(ClusterComponent);
|
DECLARE_TYPENAME(ClusterListener);
|
||||||
|
|
||||||
virtual void Start(void);
|
virtual void Start(void);
|
||||||
virtual void Stop(void);
|
virtual void Stop(void);
|
||||||
@ -133,4 +133,4 @@ private:
|
|||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#endif /* CLUSTERCOMPONENT_H */
|
#endif /* CLUSTERLISTENER_H */
|
Loading…
x
Reference in New Issue
Block a user