diff --git a/components/delegation/delegationcomponent.cpp b/components/delegation/delegationcomponent.cpp index ef228e1a9..2bd2ecdec 100644 --- a/components/delegation/delegationcomponent.cpp +++ b/components/delegation/delegationcomponent.cpp @@ -18,6 +18,7 @@ ******************************************************************************/ #include "i2-delegation.h" +#include using namespace icinga; @@ -97,23 +98,108 @@ void DelegationComponent::RevokeServiceResponseHandler(Service& service, const E { } +vector DelegationComponent::GetCheckerCandidates(const Service& service) const +{ + vector candidates; + + EndpointManager::Iterator it; + for (it = GetEndpointManager()->Begin(); it != GetEndpointManager()->End(); it++) + candidates.push_back(it->second); + + return candidates; +} + void DelegationComponent::DelegationTimerHandler(void) { + map histogram; + + EndpointManager::Iterator eit; + for (eit = GetEndpointManager()->Begin(); eit != GetEndpointManager()->End(); eit++) { + histogram[eit->second] = 0; + } + + /* nothing to do if we have no checkers */ + if (histogram.size() == 0) + return; + + vector services; + + /* build "checker -> service count" histogram */ ConfigObject::Set::Iterator it; - long delegated = 0; for (it = m_AllServices->Begin(); it != m_AllServices->End(); it++) { Service service = *it; + services.push_back(service); + string checker = service.GetChecker(); - if (!checker.empty() && GetEndpointManager()->GetEndpointByIdentity(checker)) + if (checker.empty()) continue; - AssignService(service); - delegated++; + Endpoint::Ptr endpoint = GetEndpointManager()->GetEndpointByIdentity(checker); + if (!endpoint) + continue; + + histogram[endpoint]++; + } + + std::random_shuffle(services.begin(), services.end()); + + long delegated = 0; + + /* re-assign services */ + vector::iterator sit; + for (sit = services.begin(); sit != services.end(); it++) { + Service service = *sit; + + string checker = service.GetChecker(); + + Endpoint::Ptr oldEndpoint; + if (!checker.empty()) + oldEndpoint = GetEndpointManager()->GetEndpointByIdentity(checker); + + vector candidates = GetCheckerCandidates(service); + + long avg_services = 0; + vector::iterator cit; + for (cit = candidates.begin(); cit != candidates.end(); cit++) + avg_services += histogram[*cit]; + + avg_services /= candidates.size(); + long overflow_tolerance = candidates.size() * 2; + + /* don't re-assign service if the checker is still valid + * and doesn't have too many services */ + if (oldEndpoint && find(candidates.begin(), candidates.end(), oldEndpoint) != candidates.end() && + histogram[oldEndpoint] <= avg_services + overflow_tolerance) + continue; + + /* clear the service's current checker */ + if (!checker.empty()) { + service.SetChecker(""); + + if (oldEndpoint) + histogram[oldEndpoint]--; + } + + /* find a new checker for the service */ + for (cit = candidates.begin(); cit != candidates.end(); cit++) { + Endpoint::Ptr newEndpoint = *cit; + + /* does this checker already have too many services */ + if (histogram[newEndpoint] > avg_services) + continue; + + service.SetChecker(newEndpoint->GetIdentity()); + histogram[newEndpoint]++; + + delegated++; + } + + assert(!service.GetChecker().empty()); } stringstream msgbuf; - msgbuf << "Delegated " << delegated << " services"; + msgbuf << "Re-delegated " << delegated << " services"; Application::Log(LogInformation, "delegation", msgbuf.str()); } diff --git a/components/delegation/delegationcomponent.h b/components/delegation/delegationcomponent.h index d2559538b..c4d85e6c5 100644 --- a/components/delegation/delegationcomponent.h +++ b/components/delegation/delegationcomponent.h @@ -46,6 +46,8 @@ private: void DelegationTimerHandler(void); + vector GetCheckerCandidates(const Service& service) const; + void AssignService(const Service& service); void RevokeService(const Service& service); }; diff --git a/icinga/endpointmanager.cpp b/icinga/endpointmanager.cpp index b8821863b..43354ccfc 100644 --- a/icinga/endpointmanager.cpp +++ b/icinga/endpointmanager.cpp @@ -377,3 +377,14 @@ void EndpointManager::ProcessResponseMessage(const Endpoint::Ptr& sender, const m_Requests.erase(it); RescheduleRequestTimer(); } + +EndpointManager::Iterator EndpointManager::Begin(void) +{ + return m_Endpoints.begin(); +} + +EndpointManager::Iterator EndpointManager::End(void) +{ + return m_Endpoints.end(); +} + diff --git a/icinga/endpointmanager.h b/icinga/endpointmanager.h index 016c8ca83..458e46858 100644 --- a/icinga/endpointmanager.h +++ b/icinga/endpointmanager.h @@ -34,6 +34,8 @@ public: typedef shared_ptr Ptr; typedef weak_ptr WeakPtr; + typedef map::iterator Iterator; + EndpointManager(void) : m_NextMessageID(0) { } @@ -60,6 +62,8 @@ public: void ProcessResponseMessage(const Endpoint::Ptr& sender, const ResponseMessage& message); void ForEachEndpoint(function callback); + Iterator Begin(void); + Iterator End(void); Endpoint::Ptr GetEndpointByIdentity(string identity) const;