From db4fde95549d1b0bb433482b7259b3fa81375503 Mon Sep 17 00:00:00 2001 From: Gunnar Beutner Date: Thu, 21 Jun 2012 00:10:10 +0200 Subject: [PATCH] Bugfixes for the delegation feature. --- base/timer.cpp | 4 +- components/delegation/delegationcomponent.cpp | 68 ++++++++++++------- components/delegation/delegationcomponent.h | 7 +- icinga/endpointmanager.cpp | 7 +- icinga/endpointmanager.h | 2 +- icinga/nagioschecktask.cpp | 18 ++++- 6 files changed, 68 insertions(+), 38 deletions(-) diff --git a/base/timer.cpp b/base/timer.cpp index 0f003350e..d7646c59d 100644 --- a/base/timer.cpp +++ b/base/timer.cpp @@ -107,10 +107,10 @@ void Timer::Call(void) time_t et; time(&et); - if (et - st > 5) { + if (et - st > 3) { stringstream msgbuf; msgbuf << "Timer call took " << et - st << " seconds."; - Application::Log(LogDebug, "base", msgbuf.str()); + Application::Log(LogInformation, "base", msgbuf.str()); } } diff --git a/components/delegation/delegationcomponent.cpp b/components/delegation/delegationcomponent.cpp index 2bd2ecdec..298814757 100644 --- a/components/delegation/delegationcomponent.cpp +++ b/components/delegation/delegationcomponent.cpp @@ -30,15 +30,16 @@ string DelegationComponent::GetName(void) const void DelegationComponent::Start(void) { m_AllServices = boost::make_shared(ConfigObject::GetAllObjects(), ConfigObject::MakeTypePredicate("service")); - m_AllServices->OnObjectAdded.connect(boost::bind(&DelegationComponent::NewServiceHandler, this, _2)); +/* m_AllServices->OnObjectAdded.connect(boost::bind(&DelegationComponent::NewServiceHandler, this, _2)); m_AllServices->OnObjectCommitted.connect(boost::bind(&DelegationComponent::NewServiceHandler, this, _2)); - m_AllServices->OnObjectRemoved.connect(boost::bind(&DelegationComponent::RemovedServiceHandler, this, _2)); + m_AllServices->OnObjectRemoved.connect(boost::bind(&DelegationComponent::RemovedServiceHandler, this, _2));*/ m_AllServices->Start(); m_DelegationTimer = boost::make_shared(); m_DelegationTimer->SetInterval(30); m_DelegationTimer->OnTimerExpired.connect(boost::bind(&DelegationComponent::DelegationTimerHandler, this)); m_DelegationTimer->Start(); + m_DelegationTimer->Reschedule(0); m_DelegationEndpoint = boost::make_shared(); m_DelegationEndpoint->RegisterPublication("checker::AssignService"); @@ -54,17 +55,7 @@ void DelegationComponent::Stop(void) mgr->UnregisterEndpoint(m_DelegationEndpoint); } -void DelegationComponent::NewServiceHandler(const Service& object) -{ - AssignService(object); -} - -void DelegationComponent::RemovedServiceHandler(const Service& object) -{ - RevokeService(object); -} - -void DelegationComponent::AssignService(const Service& service) +void DelegationComponent::AssignService(const Endpoint::Ptr& checker, const Service& service) { RequestMessage request; request.SetMethod("checker::AssignService"); @@ -75,7 +66,7 @@ void DelegationComponent::AssignService(const Service& service) Application::Log(LogDebug, "delegation", "Trying to delegate service '" + service.GetName() + "'"); - GetEndpointManager()->SendAPIMessage(m_DelegationEndpoint, request, + GetEndpointManager()->SendAPIMessage(m_DelegationEndpoint, checker, request, boost::bind(&DelegationComponent::AssignServiceResponseHandler, this, service, _2, _5)); } @@ -83,13 +74,11 @@ void DelegationComponent::AssignServiceResponseHandler(Service& service, const E { if (timedOut) { Application::Log(LogDebug, "delegation", "Service delegation for service '" + service.GetName() + "' timed out."); - } else { - service.SetChecker(sender->GetIdentity()); - Application::Log(LogDebug, "delegation", "Service delegation for service '" + service.GetName() + "' was successful."); + service.SetChecker(""); } } -void DelegationComponent::RevokeService(const Service& service) +void DelegationComponent::ClearServices(const Endpoint::Ptr& checker) { } @@ -103,8 +92,14 @@ vector DelegationComponent::GetCheckerCandidates(const Service& s vector candidates; EndpointManager::Iterator it; - for (it = GetEndpointManager()->Begin(); it != GetEndpointManager()->End(); it++) - candidates.push_back(it->second); + for (it = GetEndpointManager()->Begin(); it != GetEndpointManager()->End(); it++) { + Endpoint::Ptr endpoint = it->second; + + if (!endpoint->HasSubscription("checker::AssignService")) + continue; + + candidates.push_back(endpoint); + } return candidates; } @@ -114,9 +109,8 @@ void DelegationComponent::DelegationTimerHandler(void) map histogram; EndpointManager::Iterator eit; - for (eit = GetEndpointManager()->Begin(); eit != GetEndpointManager()->End(); eit++) { + for (eit = GetEndpointManager()->Begin(); eit != GetEndpointManager()->End(); eit++) histogram[eit->second] = 0; - } /* nothing to do if we have no checkers */ if (histogram.size() == 0) @@ -144,11 +138,11 @@ void DelegationComponent::DelegationTimerHandler(void) std::random_shuffle(services.begin(), services.end()); - long delegated = 0; + int delegated = 0; /* re-assign services */ vector::iterator sit; - for (sit = services.begin(); sit != services.end(); it++) { + for (sit = services.begin(); sit != services.end(); sit++) { Service service = *sit; string checker = service.GetChecker(); @@ -158,14 +152,19 @@ void DelegationComponent::DelegationTimerHandler(void) oldEndpoint = GetEndpointManager()->GetEndpointByIdentity(checker); vector candidates = GetCheckerCandidates(service); + std::random_shuffle(candidates.begin(), candidates.end()); - long avg_services = 0; + stringstream msgbuf; + msgbuf << "Service: " << service.GetName() << ", candidates: " << candidates.size(); + Application::Log(LogDebug, "delegation", msgbuf.str()); + + int avg_services = 0; vector::iterator cit; for (cit = candidates.begin(); cit != candidates.end(); cit++) avg_services += histogram[*cit]; avg_services /= candidates.size(); - long overflow_tolerance = candidates.size() * 2; + int overflow_tolerance = candidates.size() * 2; /* don't re-assign service if the checker is still valid * and doesn't have too many services */ @@ -198,6 +197,23 @@ void DelegationComponent::DelegationTimerHandler(void) assert(!service.GetChecker().empty()); } + if (delegated > 0) { + map::iterator hit; + for (hit = histogram.begin(); hit != histogram.end(); hit++) { + ClearServices(hit->first); + } + + for (sit = services.begin(); sit != services.end(); sit++) { + string checker = sit->GetChecker(); + Endpoint::Ptr endpoint = GetEndpointManager()->GetEndpointByIdentity(checker); + + if (!endpoint) + continue; + + AssignService(endpoint, *sit); + } + } + stringstream msgbuf; msgbuf << "Re-delegated " << delegated << " services"; Application::Log(LogInformation, "delegation", msgbuf.str()); diff --git a/components/delegation/delegationcomponent.h b/components/delegation/delegationcomponent.h index c4d85e6c5..0ab714737 100644 --- a/components/delegation/delegationcomponent.h +++ b/components/delegation/delegationcomponent.h @@ -38,9 +38,6 @@ private: ConfigObject::Set::Ptr m_AllServices; Timer::Ptr m_DelegationTimer; - void NewServiceHandler(const Service& object); - void RemovedServiceHandler(const Service& object); - void AssignServiceResponseHandler(Service& service, const Endpoint::Ptr& sender, bool timedOut); void RevokeServiceResponseHandler(Service& service, const Endpoint::Ptr& sender, bool timedOut); @@ -48,8 +45,8 @@ private: vector GetCheckerCandidates(const Service& service) const; - void AssignService(const Service& service); - void RevokeService(const Service& service); + void AssignService(const Endpoint::Ptr& checker, const Service& service); + void ClearServices(const Endpoint::Ptr& checker); }; } diff --git a/icinga/endpointmanager.cpp b/icinga/endpointmanager.cpp index 43354ccfc..03747e1bd 100644 --- a/icinga/endpointmanager.cpp +++ b/icinga/endpointmanager.cpp @@ -292,7 +292,7 @@ Endpoint::Ptr EndpointManager::GetEndpointByIdentity(string identity) const return Endpoint::Ptr(); } -void EndpointManager::SendAPIMessage(Endpoint::Ptr sender, +void EndpointManager::SendAPIMessage(const Endpoint::Ptr& sender, const Endpoint::Ptr& recipient, RequestMessage& message, function callback, time_t timeout) { @@ -312,7 +312,10 @@ void EndpointManager::SendAPIMessage(Endpoint::Ptr sender, m_Requests[id] = pr; RescheduleRequestTimer(); - SendAnycastMessage(sender, message); + if (!recipient) + SendAnycastMessage(sender, message); + else + SendUnicastMessage(sender, recipient, message); } bool EndpointManager::RequestTimeoutLessComparer(const pair& a, diff --git a/icinga/endpointmanager.h b/icinga/endpointmanager.h index 458e46858..61f84aa00 100644 --- a/icinga/endpointmanager.h +++ b/icinga/endpointmanager.h @@ -56,7 +56,7 @@ public: void SendAnycastMessage(Endpoint::Ptr sender, const RequestMessage& message); void SendMulticastMessage(Endpoint::Ptr sender, const RequestMessage& message); - void SendAPIMessage(Endpoint::Ptr sender, RequestMessage& message, + void SendAPIMessage(const Endpoint::Ptr& sender, const Endpoint::Ptr& recipient, RequestMessage& message, function callback, time_t timeout = 10); void ProcessResponseMessage(const Endpoint::Ptr& sender, const ResponseMessage& message); diff --git a/icinga/nagioschecktask.cpp b/icinga/nagioschecktask.cpp index 1bf3c44a5..aa0b5c6a8 100644 --- a/icinga/nagioschecktask.cpp +++ b/icinga/nagioschecktask.cpp @@ -57,8 +57,19 @@ void NagiosCheckTask::RunCheck(void) #ifdef _MSC_VER fp = _popen(m_Command.c_str(), "r"); #else /* _MSC_VER */ + bool use_libc_popen = false; + popen_noshell_pass_to_pclose pclose_arg; - fp = popen_noshell_compat(m_Command.c_str(), "r", &pclose_arg); + + if (!use_libc_popen) { + fp = popen_noshell_compat(m_Command.c_str(), "r", &pclose_arg); + + if (fp == NULL) // TODO: add check for valgrind + use_libc_popen = true; + } + + if (use_libc_popen) + fp = popen(m_Command.c_str(), "r"); #endif /* _MSC_VER */ stringstream outputbuf; @@ -80,7 +91,10 @@ void NagiosCheckTask::RunCheck(void) #ifdef _MSC_VER status = _pclose(fp); #else /* _MSC_VER */ - status = pclose_noshell(&pclose_arg); + if (use_libc_popen) + status = pclose(fp); + else + status = pclose_noshell(&pclose_arg); #endif /* _MSC_VER */ #ifndef _MSC_VER