mirror of
https://github.com/Icinga/icinga2.git
synced 2025-04-08 17:05:25 +02:00
Implement some cluster events.
This commit is contained in:
parent
58a932ab6f
commit
0d24b941f5
@ -52,7 +52,12 @@ void ClusterComponent::Start(void)
|
||||
m_ReconnectTimer->SetInterval(5);
|
||||
m_ReconnectTimer->Start();
|
||||
|
||||
Service::OnNewCheckResult.connect(bind(&ClusterComponent::CheckResultHandler, this, _1, _2));
|
||||
Service::OnNewCheckResult.connect(bind(&ClusterComponent::CheckResultHandler, this, _1, _2, _3));
|
||||
Service::OnNextCheckChanged.connect(bind(&ClusterComponent::NextCheckChangedHandler, this, _1, _2, _3));
|
||||
Service::OnForceNextCheckChanged.connect(bind(&ClusterComponent::ForceNextCheckChangedHandler, this, _1, _2, _3));
|
||||
Service::OnEnableActiveChecksChanged.connect(bind(&ClusterComponent::EnableActiveChecksChangedHandler, this, _1, _2, _3));
|
||||
Service::OnEnablePassiveChecksChanged.connect(bind(&ClusterComponent::EnablePassiveChecksChangedHandler, this, _1, _2, _3));
|
||||
|
||||
Endpoint::OnMessageReceived.connect(bind(&ClusterComponent::MessageHandler, this, _1, _2));
|
||||
}
|
||||
|
||||
@ -244,13 +249,11 @@ void ClusterComponent::ReconnectTimerHandler(void)
|
||||
}
|
||||
}
|
||||
|
||||
void ClusterComponent::CheckResultHandler(const Service::Ptr& service, const Dictionary::Ptr& cr)
|
||||
void ClusterComponent::CheckResultHandler(const Service::Ptr& service, const Dictionary::Ptr& cr, const String& authority)
|
||||
{
|
||||
if (cr->Contains("source") && cr->Get("source") != GetIdentity())
|
||||
if (!authority.IsEmpty() && authority != GetIdentity())
|
||||
return;
|
||||
|
||||
cr->Set("source", GetIdentity());
|
||||
|
||||
Dictionary::Ptr params = boost::make_shared<Dictionary>();
|
||||
params->Set("service", service->GetName());
|
||||
params->Set("check_result", cr);
|
||||
@ -265,19 +268,95 @@ void ClusterComponent::CheckResultHandler(const Service::Ptr& service, const Dic
|
||||
}
|
||||
}
|
||||
|
||||
void ClusterComponent::MessageHandler(const Endpoint::Ptr& endpoint, const Dictionary::Ptr& message)
|
||||
void ClusterComponent::NextCheckChangedHandler(const Service::Ptr& service, double nextCheck, const String& authority)
|
||||
{
|
||||
if (!authority.IsEmpty() && authority != GetIdentity())
|
||||
return;
|
||||
|
||||
Dictionary::Ptr params = boost::make_shared<Dictionary>();
|
||||
params->Set("service", service->GetName());
|
||||
params->Set("next_check", nextCheck);
|
||||
|
||||
Dictionary::Ptr message = boost::make_shared<Dictionary>();
|
||||
message->Set("jsonrpc", "2.0");
|
||||
message->Set("method", "cluster::SetNextCheck");
|
||||
message->Set("params", params);
|
||||
|
||||
BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects<Endpoint>()) {
|
||||
endpoint->SendMessage(message);
|
||||
}
|
||||
}
|
||||
|
||||
void ClusterComponent::ForceNextCheckChangedHandler(const Service::Ptr& service, bool forced, const String& authority)
|
||||
{
|
||||
if (!authority.IsEmpty() && authority != GetIdentity())
|
||||
return;
|
||||
|
||||
Dictionary::Ptr params = boost::make_shared<Dictionary>();
|
||||
params->Set("service", service->GetName());
|
||||
params->Set("forced", forced);
|
||||
|
||||
Dictionary::Ptr message = boost::make_shared<Dictionary>();
|
||||
message->Set("jsonrpc", "2.0");
|
||||
message->Set("method", "cluster::SetForceNextCheck");
|
||||
message->Set("params", params);
|
||||
|
||||
BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects<Endpoint>()) {
|
||||
endpoint->SendMessage(message);
|
||||
}
|
||||
}
|
||||
|
||||
void ClusterComponent::EnableActiveChecksChangedHandler(const Service::Ptr& service, bool enabled, const String& authority)
|
||||
{
|
||||
if (!authority.IsEmpty() && authority != GetIdentity())
|
||||
return;
|
||||
|
||||
Dictionary::Ptr params = boost::make_shared<Dictionary>();
|
||||
params->Set("service", service->GetName());
|
||||
params->Set("enabled", enabled);
|
||||
|
||||
Dictionary::Ptr message = boost::make_shared<Dictionary>();
|
||||
message->Set("jsonrpc", "2.0");
|
||||
message->Set("method", "cluster::SetEnableActiveChecks");
|
||||
message->Set("params", params);
|
||||
|
||||
BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects<Endpoint>()) {
|
||||
endpoint->SendMessage(message);
|
||||
}
|
||||
}
|
||||
|
||||
void ClusterComponent::EnablePassiveChecksChangedHandler(const Service::Ptr& service, bool enabled, const String& authority)
|
||||
{
|
||||
if (!authority.IsEmpty() && authority != GetIdentity())
|
||||
return;
|
||||
|
||||
Dictionary::Ptr params = boost::make_shared<Dictionary>();
|
||||
params->Set("service", service->GetName());
|
||||
params->Set("enabled", enabled);
|
||||
|
||||
Dictionary::Ptr message = boost::make_shared<Dictionary>();
|
||||
message->Set("jsonrpc", "2.0");
|
||||
message->Set("method", "cluster::SetEnablePassiveChecks");
|
||||
message->Set("params", params);
|
||||
|
||||
BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects<Endpoint>()) {
|
||||
endpoint->SendMessage(message);
|
||||
}
|
||||
}
|
||||
|
||||
void ClusterComponent::MessageHandler(const Endpoint::Ptr& sender, const Dictionary::Ptr& message)
|
||||
{
|
||||
BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects<Endpoint>()) {
|
||||
if (sender != endpoint)
|
||||
endpoint->SendMessage(message);
|
||||
}
|
||||
|
||||
Dictionary::Ptr params = message->Get("params");
|
||||
|
||||
if (!params)
|
||||
return;
|
||||
|
||||
if (message->Get("method") == "cluster::CheckResult") {
|
||||
Dictionary::Ptr params = message->Get("params");
|
||||
|
||||
if (!params)
|
||||
return;
|
||||
|
||||
Dictionary::Ptr cr = params->Get("check_result");
|
||||
|
||||
if (!cr)
|
||||
return;
|
||||
|
||||
String svc = params->Get("service");
|
||||
|
||||
Service::Ptr service = Service::GetByName(svc);
|
||||
@ -285,12 +364,56 @@ void ClusterComponent::MessageHandler(const Endpoint::Ptr& endpoint, const Dicti
|
||||
if (!service)
|
||||
return;
|
||||
|
||||
service->ProcessCheckResult(cr);
|
||||
Dictionary::Ptr cr = params->Get("check_result");
|
||||
|
||||
/* Reschedule the next check. The side effect of this is that for as long
|
||||
* as we receive results for a service we won't execute any
|
||||
* active checks. */
|
||||
service->SetNextCheck(Utility::GetTime() + service->GetCheckInterval());
|
||||
if (!cr)
|
||||
return;
|
||||
|
||||
service->ProcessCheckResult(cr, sender->GetName());
|
||||
} else if (message->Get("method") == "cluster::SetNextCheck") {
|
||||
String svc = params->Get("service");
|
||||
|
||||
Service::Ptr service = Service::GetByName(svc);
|
||||
|
||||
if (!service)
|
||||
return;
|
||||
|
||||
double nextCheck = params->Get("next_check");
|
||||
|
||||
service->SetNextCheck(nextCheck, sender->GetName());
|
||||
} else if (message->Get("method") == "cluster::SetForceNextCheck") {
|
||||
String svc = params->Get("service");
|
||||
|
||||
Service::Ptr service = Service::GetByName(svc);
|
||||
|
||||
if (!service)
|
||||
return;
|
||||
|
||||
bool forced = params->Get("forced");
|
||||
|
||||
service->SetForceNextCheck(forced, sender->GetName());
|
||||
} else if (message->Get("method") == "cluster::SetEnableActiveChecks") {
|
||||
String svc = params->Get("service");
|
||||
|
||||
Service::Ptr service = Service::GetByName(svc);
|
||||
|
||||
if (!service)
|
||||
return;
|
||||
|
||||
bool enabled = params->Get("enabled");
|
||||
|
||||
service->SetEnableActiveChecks(enabled, sender->GetName());
|
||||
} else if (message->Get("method") == "cluster::SetEnablePassiveChecks") {
|
||||
String svc = params->Get("service");
|
||||
|
||||
Service::Ptr service = Service::GetByName(svc);
|
||||
|
||||
if (!service)
|
||||
return;
|
||||
|
||||
bool enabled = params->Get("enabled");
|
||||
|
||||
service->SetEnablePassiveChecks(enabled, sender->GetName());
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -79,8 +79,12 @@ private:
|
||||
void NewClientHandler(const Socket::Ptr& client, TlsRole role);
|
||||
void ListenerThreadProc(const Socket::Ptr& server);
|
||||
|
||||
void CheckResultHandler(const Service::Ptr& service, const Dictionary::Ptr& cr);
|
||||
void MessageHandler(const Endpoint::Ptr& endpoint, const Dictionary::Ptr& message);
|
||||
void CheckResultHandler(const Service::Ptr& service, const Dictionary::Ptr& cr, const String& authority);
|
||||
void NextCheckChangedHandler(const Service::Ptr& service, double nextCheck, const String& authority);
|
||||
void ForceNextCheckChangedHandler(const Service::Ptr& service, bool forced, const String& authority);
|
||||
void EnableActiveChecksChangedHandler(const Service::Ptr& service, bool enabled, const String& authority);
|
||||
void EnablePassiveChecksChangedHandler(const Service::Ptr& service, bool enabled, const String& authority);
|
||||
void MessageHandler(const Endpoint::Ptr& sender, const Dictionary::Ptr& message);
|
||||
|
||||
};
|
||||
|
||||
|
@ -27,6 +27,7 @@ icinga2_LDADD = \
|
||||
${top_builddir}/lib/config/libconfig.la \
|
||||
-dlopen ${top_builddir}/lib/icinga/libicinga.la \
|
||||
-dlopen ${top_builddir}/components/checker/libchecker.la \
|
||||
-dlopen ${top_builddir}/components/cluster/libcluster.la \
|
||||
-dlopen ${top_builddir}/components/compat/libcompat.la \
|
||||
-dlopen ${top_builddir}/components/demo/libdemo.la \
|
||||
-dlopen ${top_builddir}/components/livestatus/liblivestatus.la \
|
||||
|
@ -243,6 +243,7 @@ void Host::UpdateSlaveServices(void)
|
||||
|
||||
ConfigItem::Ptr serviceItem = builder->Compile();
|
||||
DynamicObject::Ptr dobj = serviceItem->Commit();
|
||||
dobj->Start();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -37,9 +37,12 @@ const int Service::DefaultMaxCheckAttempts = 3;
|
||||
const double Service::DefaultCheckInterval = 5 * 60;
|
||||
const double Service::CheckIntervalDivisor = 5.0;
|
||||
|
||||
boost::signals2::signal<void (const Service::Ptr&, const Dictionary::Ptr&)> Service::OnNewCheckResult;
|
||||
boost::signals2::signal<void (const Service::Ptr&, const Dictionary::Ptr&, const String&)> Service::OnNewCheckResult;
|
||||
boost::signals2::signal<void (const Service::Ptr&, NotificationType, const Dictionary::Ptr&, const String&, const String&)> Service::OnNotificationsRequested;
|
||||
boost::signals2::signal<void (const Service::Ptr&)> Service::OnNextCheckChanged;
|
||||
boost::signals2::signal<void (const Service::Ptr&, double, const String&)> Service::OnNextCheckChanged;
|
||||
boost::signals2::signal<void (const Service::Ptr&, bool, const String&)> Service::OnForceNextCheckChanged;
|
||||
boost::signals2::signal<void (const Service::Ptr&, bool, const String&)> Service::OnEnableActiveChecksChanged;
|
||||
boost::signals2::signal<void (const Service::Ptr&, bool, const String&)> Service::OnEnablePassiveChecksChanged;
|
||||
boost::signals2::signal<void (const Service::Ptr&, FlappingState)> Service::OnFlappingChanged;
|
||||
|
||||
CheckCommand::Ptr Service::GetCheckCommand(void) const
|
||||
@ -91,11 +94,11 @@ long Service::GetSchedulingOffset(void)
|
||||
return m_SchedulingOffset;
|
||||
}
|
||||
|
||||
void Service::SetNextCheck(double nextCheck)
|
||||
void Service::SetNextCheck(double nextCheck, const String& authority)
|
||||
{
|
||||
m_NextCheck = nextCheck;
|
||||
|
||||
Utility::QueueAsyncCallback(bind(boost::ref(Service::OnNextCheckChanged), GetSelf()));
|
||||
Utility::QueueAsyncCallback(bind(boost::ref(Service::OnNextCheckChanged), GetSelf(), nextCheck, authority));
|
||||
}
|
||||
|
||||
double Service::GetNextCheck(void)
|
||||
@ -401,9 +404,11 @@ bool Service::GetEnableActiveChecks(void) const
|
||||
return m_EnableActiveChecks;
|
||||
}
|
||||
|
||||
void Service::SetEnableActiveChecks(bool enabled)
|
||||
void Service::SetEnableActiveChecks(bool enabled, const String& authority)
|
||||
{
|
||||
m_EnableActiveChecks = enabled ? 1 : 0;
|
||||
|
||||
Utility::QueueAsyncCallback(bind(boost::ref(OnEnableActiveChecksChanged), GetSelf(), enabled, authority));
|
||||
}
|
||||
|
||||
bool Service::GetEnablePassiveChecks(void) const
|
||||
@ -414,9 +419,11 @@ bool Service::GetEnablePassiveChecks(void) const
|
||||
return m_EnablePassiveChecks;
|
||||
}
|
||||
|
||||
void Service::SetEnablePassiveChecks(bool enabled)
|
||||
void Service::SetEnablePassiveChecks(bool enabled, const String& authority)
|
||||
{
|
||||
m_EnablePassiveChecks = enabled ? 1 : 0;
|
||||
|
||||
Utility::QueueAsyncCallback(bind(boost::ref(OnEnablePassiveChecksChanged), GetSelf(), enabled, authority));
|
||||
}
|
||||
|
||||
bool Service::GetForceNextCheck(void) const
|
||||
@ -427,12 +434,14 @@ bool Service::GetForceNextCheck(void) const
|
||||
return static_cast<bool>(m_ForceNextCheck);
|
||||
}
|
||||
|
||||
void Service::SetForceNextCheck(bool forced)
|
||||
void Service::SetForceNextCheck(bool forced, const String& authority)
|
||||
{
|
||||
m_ForceNextCheck = forced ? 1 : 0;
|
||||
|
||||
Utility::QueueAsyncCallback(bind(boost::ref(OnForceNextCheckChanged), GetSelf(), forced, authority));
|
||||
}
|
||||
|
||||
void Service::ProcessCheckResult(const Dictionary::Ptr& cr)
|
||||
void Service::ProcessCheckResult(const Dictionary::Ptr& cr, const String& authority)
|
||||
{
|
||||
double now = Utility::GetTime();
|
||||
|
||||
@ -607,7 +616,7 @@ void Service::ProcessCheckResult(const Dictionary::Ptr& cr)
|
||||
" threshold: " + Convert::ToString(GetFlappingThreshold()) +
|
||||
"% current: " + Convert::ToString(GetFlappingCurrent()) + "%.");
|
||||
|
||||
OnNewCheckResult(GetSelf(), cr);
|
||||
OnNewCheckResult(GetSelf(), cr, authority);
|
||||
OnStateChanged(GetSelf());
|
||||
|
||||
if (call_eventhandler)
|
||||
|
@ -192,7 +192,8 @@ void Service::UpdateSlaveNotifications(void)
|
||||
builder->AddExpressionList(nfc_exprl);
|
||||
|
||||
ConfigItem::Ptr notificationItem = builder->Compile();
|
||||
notificationItem->Commit();
|
||||
DynamicObject::Ptr dobj = notificationItem->Commit();
|
||||
dobj->Start();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -160,7 +160,7 @@ public:
|
||||
long GetSchedulingOffset(void);
|
||||
void SetSchedulingOffset(long offset);
|
||||
|
||||
void SetNextCheck(double nextCheck);
|
||||
void SetNextCheck(double nextCheck, const String& authority = String());
|
||||
double GetNextCheck(void);
|
||||
void UpdateNextCheck(void);
|
||||
|
||||
@ -218,13 +218,13 @@ public:
|
||||
bool GetLastReachable(void) const;
|
||||
|
||||
bool GetEnableActiveChecks(void) const;
|
||||
void SetEnableActiveChecks(bool enabled);
|
||||
void SetEnableActiveChecks(bool enabled, const String& authority = String());
|
||||
|
||||
bool GetEnablePassiveChecks(void) const;
|
||||
void SetEnablePassiveChecks(bool enabled);
|
||||
void SetEnablePassiveChecks(bool enabled, const String& authority = String());
|
||||
|
||||
bool GetForceNextCheck(void) const;
|
||||
void SetForceNextCheck(bool forced);
|
||||
void SetForceNextCheck(bool forced, const String& authority = String());
|
||||
|
||||
double GetAcknowledgementExpiry(void) const;
|
||||
void SetAcknowledgementExpiry(double timestamp);
|
||||
@ -235,7 +235,7 @@ public:
|
||||
void ClearAcknowledgement(void);
|
||||
|
||||
void ExecuteCheck(void);
|
||||
void ProcessCheckResult(const Dictionary::Ptr& cr);
|
||||
void ProcessCheckResult(const Dictionary::Ptr& cr, const String& authority = String());
|
||||
|
||||
static double CalculateExecutionTime(const Dictionary::Ptr& cr);
|
||||
static double CalculateLatency(const Dictionary::Ptr& cr);
|
||||
@ -246,8 +246,11 @@ public:
|
||||
static StateType StateTypeFromString(const String& state);
|
||||
static String StateTypeToString(StateType state);
|
||||
|
||||
static boost::signals2::signal<void (const Service::Ptr&)> OnNextCheckChanged;
|
||||
static boost::signals2::signal<void (const Service::Ptr&, const Dictionary::Ptr&)> OnNewCheckResult;
|
||||
static boost::signals2::signal<void (const Service::Ptr&, double, const String&)> OnNextCheckChanged;
|
||||
static boost::signals2::signal<void (const Service::Ptr&, bool, const String&)> OnForceNextCheckChanged;
|
||||
static boost::signals2::signal<void (const Service::Ptr&, bool, const String&)> OnEnableActiveChecksChanged;
|
||||
static boost::signals2::signal<void (const Service::Ptr&, bool, const String&)> OnEnablePassiveChecksChanged;
|
||||
static boost::signals2::signal<void (const Service::Ptr&, const Dictionary::Ptr&, const String&)> OnNewCheckResult;
|
||||
static boost::signals2::signal<void (const Service::Ptr&, NotificationType, const Dictionary::Ptr&, const String&, const String&)> OnNotificationsRequested;
|
||||
static boost::signals2::signal<void (const Service::Ptr&, const User::Ptr&, const NotificationType&, const Dictionary::Ptr&, const String&, const String&)> OnNotificationSentChanged;
|
||||
static boost::signals2::signal<void (const Service::Ptr&, DowntimeState)> OnDowntimeChanged;
|
||||
|
Loading…
x
Reference in New Issue
Block a user