From ddeda8c512a93a889f623556248f036680c7f057 Mon Sep 17 00:00:00 2001 From: Gunnar Beutner Date: Thu, 21 Jun 2012 12:51:50 +0200 Subject: [PATCH] Delegation bugfixes. --- base/utility.cpp | 8 +++ components/checker/checkercomponent.cpp | 58 +++++-------------- components/checker/checkercomponent.h | 2 +- components/delegation/delegationcomponent.cpp | 32 +++++----- components/discovery/discoverycomponent.cpp | 26 ++++++++- icinga/endpointmanager.cpp | 42 ++++---------- icinga/endpointmanager.h | 5 +- icinga/jsonrpcendpoint.cpp | 4 +- icinga/jsonrpcendpoint.h | 2 +- 9 files changed, 77 insertions(+), 102 deletions(-) diff --git a/base/utility.cpp b/base/utility.cpp index d280edd29..e4ce045c1 100644 --- a/base/utility.cpp +++ b/base/utility.cpp @@ -100,6 +100,14 @@ shared_ptr Utility::MakeSSLContext(string pubkey, string privkey, strin if (!SSL_CTX_load_verify_locations(sslContext.get(), cakey.c_str(), NULL)) throw OpenSSLException("Could not load public CA key file", ERR_get_error()); + STACK_OF(X509_NAME) *cert_names; + + cert_names = SSL_load_client_CA_file(cakey.c_str()); + if (cert_names == NULL) + throw OpenSSLException("SSL_load_client_CA_file() failed", ERR_get_error()); + + SSL_CTX_set_client_CA_list(sslContext.get(), cert_names); + return sslContext; } diff --git a/components/checker/checkercomponent.cpp b/components/checker/checkercomponent.cpp index 69bed9640..14364cba5 100644 --- a/components/checker/checkercomponent.cpp +++ b/components/checker/checkercomponent.cpp @@ -31,8 +31,6 @@ void CheckerComponent::Start(void) m_CheckerEndpoint = boost::make_shared(); m_CheckerEndpoint->RegisterTopicHandler("checker::AssignService", boost::bind(&CheckerComponent::AssignServiceRequestHandler, this, _2, _3)); - m_CheckerEndpoint->RegisterTopicHandler("checker::RevokeService", - boost::bind(&CheckerComponent::RevokeServiceRequestHandler, this, _2, _3)); m_CheckerEndpoint->RegisterTopicHandler("checker::ClearServices", boost::bind(&CheckerComponent::ClearServicesRequestHandler, this, _2, _3)); m_CheckerEndpoint->RegisterPublication("checker::CheckResult"); @@ -78,6 +76,8 @@ void CheckerComponent::CheckTimerHandler(void) Application::Log(LogDebug, "checker", "Executing service check for '" + service.GetName() + "'"); + m_PendingServices.insert(service.GetName()); + CheckTask::Ptr task = CheckTask::CreateTask(service); task->Enqueue(); @@ -109,6 +109,11 @@ void CheckerComponent::ResultTimerHandler(void) Service service = task->GetService(); + /* if the service isn't in the set of pending services + * it was removed and we need to ignore this check result. */ + if (m_PendingServices.find(service.GetName()) == m_PendingServices.end()) + continue; + CheckResult result = task->GetResult(); Application::Log(LogDebug, "checker", "Got result for service '" + service.GetName() + "'"); @@ -127,6 +132,7 @@ void CheckerComponent::ResultTimerHandler(void) failed++; service.SetNextCheck(now + service.GetCheckInterval()); + m_PendingServices.erase(service.GetName()); m_Services.push(service); } @@ -157,49 +163,6 @@ void CheckerComponent::AssignServiceRequestHandler(const Endpoint::Ptr& sender, Application::Log(LogDebug, "checker", "Accepted delegation for service '" + service.GetName() + "'"); - /* force a service check */ - m_CheckTimer->Reschedule(0); - - string id; - if (request.GetID(&id)) { - ResponseMessage rm; - rm.SetID(id); - - MessagePart result; - rm.SetResult(result); - GetEndpointManager()->SendUnicastMessage(m_CheckerEndpoint, sender, rm); - } -} - -void CheckerComponent::RevokeServiceRequestHandler(const Endpoint::Ptr& sender, const RequestMessage& request) -{ - MessagePart params; - if (!request.GetParams(¶ms)) - return; - - string name; - if (!params.GetProperty("service", &name)) - return; - - vector services; - - while (!m_Services.empty()) { - Service service = m_Services.top(); - - if (service.GetName() == name) - continue; - - // TODO: take care of services that are currently being checked - - services.push_back(service); - } - - vector::const_iterator it; - for (it = services.begin(); it != services.end(); it++) - m_Services.push(*it); - - Application::Log(LogDebug, "checker", "Revoked delegation for service '" + name + "'"); - string id; if (request.GetID(&id)) { ResponseMessage rm; @@ -214,7 +177,12 @@ void CheckerComponent::RevokeServiceRequestHandler(const Endpoint::Ptr& sender, void CheckerComponent::ClearServicesRequestHandler(const Endpoint::Ptr& sender, const RequestMessage& request) { Application::Log(LogDebug, "checker", "Clearing service delegations."); + + /* clear the services lists */ m_Services = ServiceQueue(); + m_PendingServices.clear(); + + /* TODO: clear checks we've already sent to the thread pool */ string id; if (request.GetID(&id)) { diff --git a/components/checker/checkercomponent.h b/components/checker/checkercomponent.h index 46ad620b7..630f23fb5 100644 --- a/components/checker/checkercomponent.h +++ b/components/checker/checkercomponent.h @@ -51,7 +51,7 @@ private: VirtualEndpoint::Ptr m_CheckerEndpoint; ServiceQueue m_Services; - set m_PendingServices; + set m_PendingServices; Timer::Ptr m_CheckTimer; diff --git a/components/delegation/delegationcomponent.cpp b/components/delegation/delegationcomponent.cpp index 298814757..9fa360018 100644 --- a/components/delegation/delegationcomponent.cpp +++ b/components/delegation/delegationcomponent.cpp @@ -43,7 +43,7 @@ void DelegationComponent::Start(void) m_DelegationEndpoint = boost::make_shared(); m_DelegationEndpoint->RegisterPublication("checker::AssignService"); - m_DelegationEndpoint->RegisterPublication("checker::RevokeService"); + m_DelegationEndpoint->RegisterPublication("checker::ClearServices"); GetEndpointManager()->RegisterEndpoint(m_DelegationEndpoint); } @@ -112,10 +112,6 @@ void DelegationComponent::DelegationTimerHandler(void) for (eit = GetEndpointManager()->Begin(); eit != GetEndpointManager()->End(); eit++) histogram[eit->second] = 0; - /* nothing to do if we have no checkers */ - if (histogram.size() == 0) - return; - vector services; /* build "checker -> service count" histogram */ @@ -152,19 +148,23 @@ void DelegationComponent::DelegationTimerHandler(void) oldEndpoint = GetEndpointManager()->GetEndpointByIdentity(checker); vector candidates = GetCheckerCandidates(service); - std::random_shuffle(candidates.begin(), candidates.end()); - stringstream msgbuf; - msgbuf << "Service: " << service.GetName() << ", candidates: " << candidates.size(); - Application::Log(LogDebug, "delegation", msgbuf.str()); - - int avg_services = 0; + int avg_services = 0, overflow_tolerance = 0; vector::iterator cit; - for (cit = candidates.begin(); cit != candidates.end(); cit++) - avg_services += histogram[*cit]; - avg_services /= candidates.size(); - int overflow_tolerance = candidates.size() * 2; + if (candidates.size() > 0) { + std::random_shuffle(candidates.begin(), candidates.end()); + + stringstream msgbuf; + msgbuf << "Service: " << service.GetName() << ", candidates: " << candidates.size(); + Application::Log(LogDebug, "delegation", msgbuf.str()); + + for (cit = candidates.begin(); cit != candidates.end(); cit++) + avg_services += histogram[*cit]; + + avg_services /= candidates.size(); + overflow_tolerance = candidates.size() * 2; + } /* don't re-assign service if the checker is still valid * and doesn't have too many services */ @@ -194,7 +194,7 @@ void DelegationComponent::DelegationTimerHandler(void) delegated++; } - assert(!service.GetChecker().empty()); + assert(candidates.size() == 0 || !service.GetChecker().empty()); } if (delegated > 0) { diff --git a/components/discovery/discoverycomponent.cpp b/components/discovery/discoverycomponent.cpp index 328268e3d..daf433b2f 100644 --- a/components/discovery/discoverycomponent.cpp +++ b/components/discovery/discoverycomponent.cpp @@ -105,6 +105,10 @@ void DiscoveryComponent::CheckExistingEndpoint(const Endpoint::Ptr& self, const */ void DiscoveryComponent::NewEndpointHandler(const Endpoint::Ptr& endpoint) { + /* ignore local endpoints */ + if (endpoint->IsLocal()) + return; + /* accept discovery::RegisterComponent messages from any endpoint */ endpoint->RegisterPublication("discovery::RegisterComponent"); @@ -354,8 +358,13 @@ void DiscoveryComponent::ProcessDiscoveryMessage(const string& identity, const D time(&(info->LastSeen)); - message.GetNode(&info->Node); - message.GetService(&info->Service); + string node; + if (message.GetNode(&node) && !node.empty()) + info->Node = node; + + string service; + if (message.GetService(&service) && !service.empty()) + info->Service = service; ConfigObject::Ptr endpointConfig = ConfigObject::GetObject("endpoint", identity); Dictionary::Ptr roles; @@ -472,6 +481,10 @@ void DiscoveryComponent::DiscoveryTimerHandler(void) string identity = i->first; ComponentDiscoveryInfo::Ptr info = i->second; + /* there's no need to reconnect to ourself */ + if (identity == GetEndpointManager()->GetIdentity()) + continue; + curr = i; i++; @@ -492,7 +505,14 @@ void DiscoveryComponent::DiscoveryTimerHandler(void) } else { /* TODO: figure out whether we actually want to connect to this component */ /* try and reconnect to this component */ - endpointManager->AddConnection(info->Node, info->Service); + try { + if (!info->Node.empty() && !info->Service.empty()) + endpointManager->AddConnection(info->Node, info->Service); + } catch (const std::exception& ex) { + stringstream msgbuf; + msgbuf << "Exception while trying to reconnect to endpoint '" << endpoint->GetIdentity() << "': " << ex.what();; + Application::Log(LogInformation, "discovery", msgbuf.str()); + } } } } diff --git a/icinga/endpointmanager.cpp b/icinga/endpointmanager.cpp index 03747e1bd..2f03990a4 100644 --- a/icinga/endpointmanager.cpp +++ b/icinga/endpointmanager.cpp @@ -21,6 +21,18 @@ using namespace icinga; +/** + * Constructor for the EndpointManager class. + */ +EndpointManager::EndpointManager(void) + : m_NextMessageID(0) +{ + m_RequestTimer = boost::make_shared(); + m_RequestTimer->OnTimerExpired.connect(boost::bind(&EndpointManager::RequestTimerHandler, this)); + m_RequestTimer->SetInterval(5); + m_RequestTimer->Start(); +} + /** * Sets the identity of the endpoint manager. This identity is used when * connecting to remote peers. @@ -147,9 +159,6 @@ void EndpointManager::UnregisterServer(JsonRpcServer::Ptr server) */ void EndpointManager::RegisterEndpoint(Endpoint::Ptr endpoint) { - if (!endpoint->IsLocal() && endpoint->GetIdentity() != "") - throw invalid_argument("Identity must be empty."); - endpoint->SetEndpointManager(GetSelf()); UnregisterEndpoint(endpoint); @@ -310,7 +319,6 @@ void EndpointManager::SendAPIMessage(const Endpoint::Ptr& sender, const Endpoint pr.Timeout = time(NULL) + timeout; m_Requests[id] = pr; - RescheduleRequestTimer(); if (!recipient) SendAnycastMessage(sender, message); @@ -324,29 +332,6 @@ bool EndpointManager::RequestTimeoutLessComparer(const pair::iterator it; - it = min_element(m_Requests.begin(), m_Requests.end(), - &EndpointManager::RequestTimeoutLessComparer); - - if (!m_RequestTimer) { - m_RequestTimer = boost::make_shared(); - m_RequestTimer->OnTimerExpired.connect(boost::bind(&EndpointManager::RequestTimerHandler, this)); - } - - if (it != m_Requests.end()) { - time_t now; - time(&now); - - time_t next_timeout = (it->second.Timeout < now) ? now : it->second.Timeout; - m_RequestTimer->SetInterval(next_timeout - now); - m_RequestTimer->Start(); - } else { - m_RequestTimer->Stop(); - } -} - void EndpointManager::RequestTimerHandler(void) { map::iterator it; @@ -359,8 +344,6 @@ void EndpointManager::RequestTimerHandler(void) break; } } - - RescheduleRequestTimer(); } void EndpointManager::ProcessResponseMessage(const Endpoint::Ptr& sender, const ResponseMessage& message) @@ -378,7 +361,6 @@ void EndpointManager::ProcessResponseMessage(const Endpoint::Ptr& sender, const it->second.Callback(GetSelf(), sender, it->second.Request, message, false); m_Requests.erase(it); - RescheduleRequestTimer(); } EndpointManager::Iterator EndpointManager::Begin(void) diff --git a/icinga/endpointmanager.h b/icinga/endpointmanager.h index 61f84aa00..663e48af6 100644 --- a/icinga/endpointmanager.h +++ b/icinga/endpointmanager.h @@ -36,9 +36,7 @@ public: typedef map::iterator Iterator; - EndpointManager(void) - : m_NextMessageID(0) - { } + EndpointManager(void); void SetIdentity(string identity); string GetIdentity(void) const; @@ -102,7 +100,6 @@ private: void UnregisterServer(JsonRpcServer::Ptr server); static bool RequestTimeoutLessComparer(const pair& a, const pair& b); - void RescheduleRequestTimer(void); void RequestTimerHandler(void); void NewClientHandler(const TcpClient::Ptr& client); diff --git a/icinga/jsonrpcendpoint.cpp b/icinga/jsonrpcendpoint.cpp index 97fc57a5a..466ce3d8a 100644 --- a/icinga/jsonrpcendpoint.cpp +++ b/icinga/jsonrpcendpoint.cpp @@ -142,9 +142,9 @@ void JsonRpcEndpoint::ClientErrorHandler(const std::exception& ex) Application::Log(LogWarning, "jsonrpc", message.str()); } -void JsonRpcEndpoint::VerifyCertificateHandler(bool& valid, const shared_ptr& certificate) +void JsonRpcEndpoint::VerifyCertificateHandler(bool *valid, const shared_ptr& certificate) { - if (certificate && valid) { + if (certificate && *valid) { string identity = Utility::GetCertificateCN(certificate); if (GetIdentity().empty() && !identity.empty()) { diff --git a/icinga/jsonrpcendpoint.h b/icinga/jsonrpcendpoint.h index b7e880265..cc9f050c7 100644 --- a/icinga/jsonrpcendpoint.h +++ b/icinga/jsonrpcendpoint.h @@ -65,7 +65,7 @@ private: void NewMessageHandler(const MessagePart& message); void ClientClosedHandler(void); void ClientErrorHandler(const std::exception& ex); - void VerifyCertificateHandler(bool& valid, const shared_ptr& certificate); + void VerifyCertificateHandler(bool *valid, const shared_ptr& certificate); }; }