Bugfixes for the delegation feature.

This commit is contained in:
Gunnar Beutner 2012-06-21 00:10:10 +02:00
parent 0cffb4ed72
commit db4fde9554
6 changed files with 68 additions and 38 deletions

View File

@ -107,10 +107,10 @@ void Timer::Call(void)
time_t et; time_t et;
time(&et); time(&et);
if (et - st > 5) { if (et - st > 3) {
stringstream msgbuf; stringstream msgbuf;
msgbuf << "Timer call took " << et - st << " seconds."; msgbuf << "Timer call took " << et - st << " seconds.";
Application::Log(LogDebug, "base", msgbuf.str()); Application::Log(LogInformation, "base", msgbuf.str());
} }
} }

View File

@ -30,15 +30,16 @@ string DelegationComponent::GetName(void) const
void DelegationComponent::Start(void) void DelegationComponent::Start(void)
{ {
m_AllServices = boost::make_shared<ConfigObject::Set>(ConfigObject::GetAllObjects(), ConfigObject::MakeTypePredicate("service")); m_AllServices = boost::make_shared<ConfigObject::Set>(ConfigObject::GetAllObjects(), ConfigObject::MakeTypePredicate("service"));
m_AllServices->OnObjectAdded.connect(boost::bind(&DelegationComponent::NewServiceHandler, this, _2)); /* m_AllServices->OnObjectAdded.connect(boost::bind(&DelegationComponent::NewServiceHandler, this, _2));
m_AllServices->OnObjectCommitted.connect(boost::bind(&DelegationComponent::NewServiceHandler, this, _2)); m_AllServices->OnObjectCommitted.connect(boost::bind(&DelegationComponent::NewServiceHandler, this, _2));
m_AllServices->OnObjectRemoved.connect(boost::bind(&DelegationComponent::RemovedServiceHandler, this, _2)); m_AllServices->OnObjectRemoved.connect(boost::bind(&DelegationComponent::RemovedServiceHandler, this, _2));*/
m_AllServices->Start(); m_AllServices->Start();
m_DelegationTimer = boost::make_shared<Timer>(); m_DelegationTimer = boost::make_shared<Timer>();
m_DelegationTimer->SetInterval(30); m_DelegationTimer->SetInterval(30);
m_DelegationTimer->OnTimerExpired.connect(boost::bind(&DelegationComponent::DelegationTimerHandler, this)); m_DelegationTimer->OnTimerExpired.connect(boost::bind(&DelegationComponent::DelegationTimerHandler, this));
m_DelegationTimer->Start(); m_DelegationTimer->Start();
m_DelegationTimer->Reschedule(0);
m_DelegationEndpoint = boost::make_shared<VirtualEndpoint>(); m_DelegationEndpoint = boost::make_shared<VirtualEndpoint>();
m_DelegationEndpoint->RegisterPublication("checker::AssignService"); m_DelegationEndpoint->RegisterPublication("checker::AssignService");
@ -54,17 +55,7 @@ void DelegationComponent::Stop(void)
mgr->UnregisterEndpoint(m_DelegationEndpoint); mgr->UnregisterEndpoint(m_DelegationEndpoint);
} }
void DelegationComponent::NewServiceHandler(const Service& object) void DelegationComponent::AssignService(const Endpoint::Ptr& checker, const Service& service)
{
AssignService(object);
}
void DelegationComponent::RemovedServiceHandler(const Service& object)
{
RevokeService(object);
}
void DelegationComponent::AssignService(const Service& service)
{ {
RequestMessage request; RequestMessage request;
request.SetMethod("checker::AssignService"); request.SetMethod("checker::AssignService");
@ -75,7 +66,7 @@ void DelegationComponent::AssignService(const Service& service)
Application::Log(LogDebug, "delegation", "Trying to delegate service '" + service.GetName() + "'"); Application::Log(LogDebug, "delegation", "Trying to delegate service '" + service.GetName() + "'");
GetEndpointManager()->SendAPIMessage(m_DelegationEndpoint, request, GetEndpointManager()->SendAPIMessage(m_DelegationEndpoint, checker, request,
boost::bind(&DelegationComponent::AssignServiceResponseHandler, this, service, _2, _5)); boost::bind(&DelegationComponent::AssignServiceResponseHandler, this, service, _2, _5));
} }
@ -83,13 +74,11 @@ void DelegationComponent::AssignServiceResponseHandler(Service& service, const E
{ {
if (timedOut) { if (timedOut) {
Application::Log(LogDebug, "delegation", "Service delegation for service '" + service.GetName() + "' timed out."); Application::Log(LogDebug, "delegation", "Service delegation for service '" + service.GetName() + "' timed out.");
} else { service.SetChecker("");
service.SetChecker(sender->GetIdentity());
Application::Log(LogDebug, "delegation", "Service delegation for service '" + service.GetName() + "' was successful.");
} }
} }
void DelegationComponent::RevokeService(const Service& service) void DelegationComponent::ClearServices(const Endpoint::Ptr& checker)
{ {
} }
@ -103,8 +92,14 @@ vector<Endpoint::Ptr> DelegationComponent::GetCheckerCandidates(const Service& s
vector<Endpoint::Ptr> candidates; vector<Endpoint::Ptr> candidates;
EndpointManager::Iterator it; EndpointManager::Iterator it;
for (it = GetEndpointManager()->Begin(); it != GetEndpointManager()->End(); it++) for (it = GetEndpointManager()->Begin(); it != GetEndpointManager()->End(); it++) {
candidates.push_back(it->second); Endpoint::Ptr endpoint = it->second;
if (!endpoint->HasSubscription("checker::AssignService"))
continue;
candidates.push_back(endpoint);
}
return candidates; return candidates;
} }
@ -114,9 +109,8 @@ void DelegationComponent::DelegationTimerHandler(void)
map<Endpoint::Ptr, int> histogram; map<Endpoint::Ptr, int> histogram;
EndpointManager::Iterator eit; EndpointManager::Iterator eit;
for (eit = GetEndpointManager()->Begin(); eit != GetEndpointManager()->End(); eit++) { for (eit = GetEndpointManager()->Begin(); eit != GetEndpointManager()->End(); eit++)
histogram[eit->second] = 0; histogram[eit->second] = 0;
}
/* nothing to do if we have no checkers */ /* nothing to do if we have no checkers */
if (histogram.size() == 0) if (histogram.size() == 0)
@ -144,11 +138,11 @@ void DelegationComponent::DelegationTimerHandler(void)
std::random_shuffle(services.begin(), services.end()); std::random_shuffle(services.begin(), services.end());
long delegated = 0; int delegated = 0;
/* re-assign services */ /* re-assign services */
vector<Service>::iterator sit; vector<Service>::iterator sit;
for (sit = services.begin(); sit != services.end(); it++) { for (sit = services.begin(); sit != services.end(); sit++) {
Service service = *sit; Service service = *sit;
string checker = service.GetChecker(); string checker = service.GetChecker();
@ -158,14 +152,19 @@ void DelegationComponent::DelegationTimerHandler(void)
oldEndpoint = GetEndpointManager()->GetEndpointByIdentity(checker); oldEndpoint = GetEndpointManager()->GetEndpointByIdentity(checker);
vector<Endpoint::Ptr> candidates = GetCheckerCandidates(service); vector<Endpoint::Ptr> candidates = GetCheckerCandidates(service);
std::random_shuffle(candidates.begin(), candidates.end());
long avg_services = 0; stringstream msgbuf;
msgbuf << "Service: " << service.GetName() << ", candidates: " << candidates.size();
Application::Log(LogDebug, "delegation", msgbuf.str());
int avg_services = 0;
vector<Endpoint::Ptr>::iterator cit; vector<Endpoint::Ptr>::iterator cit;
for (cit = candidates.begin(); cit != candidates.end(); cit++) for (cit = candidates.begin(); cit != candidates.end(); cit++)
avg_services += histogram[*cit]; avg_services += histogram[*cit];
avg_services /= candidates.size(); avg_services /= candidates.size();
long overflow_tolerance = candidates.size() * 2; int overflow_tolerance = candidates.size() * 2;
/* don't re-assign service if the checker is still valid /* don't re-assign service if the checker is still valid
* and doesn't have too many services */ * and doesn't have too many services */
@ -198,6 +197,23 @@ void DelegationComponent::DelegationTimerHandler(void)
assert(!service.GetChecker().empty()); assert(!service.GetChecker().empty());
} }
if (delegated > 0) {
map<Endpoint::Ptr, int>::iterator hit;
for (hit = histogram.begin(); hit != histogram.end(); hit++) {
ClearServices(hit->first);
}
for (sit = services.begin(); sit != services.end(); sit++) {
string checker = sit->GetChecker();
Endpoint::Ptr endpoint = GetEndpointManager()->GetEndpointByIdentity(checker);
if (!endpoint)
continue;
AssignService(endpoint, *sit);
}
}
stringstream msgbuf; stringstream msgbuf;
msgbuf << "Re-delegated " << delegated << " services"; msgbuf << "Re-delegated " << delegated << " services";
Application::Log(LogInformation, "delegation", msgbuf.str()); Application::Log(LogInformation, "delegation", msgbuf.str());

View File

@ -38,9 +38,6 @@ private:
ConfigObject::Set::Ptr m_AllServices; ConfigObject::Set::Ptr m_AllServices;
Timer::Ptr m_DelegationTimer; Timer::Ptr m_DelegationTimer;
void NewServiceHandler(const Service& object);
void RemovedServiceHandler(const Service& object);
void AssignServiceResponseHandler(Service& service, const Endpoint::Ptr& sender, bool timedOut); void AssignServiceResponseHandler(Service& service, const Endpoint::Ptr& sender, bool timedOut);
void RevokeServiceResponseHandler(Service& service, const Endpoint::Ptr& sender, bool timedOut); void RevokeServiceResponseHandler(Service& service, const Endpoint::Ptr& sender, bool timedOut);
@ -48,8 +45,8 @@ private:
vector<Endpoint::Ptr> GetCheckerCandidates(const Service& service) const; vector<Endpoint::Ptr> GetCheckerCandidates(const Service& service) const;
void AssignService(const Service& service); void AssignService(const Endpoint::Ptr& checker, const Service& service);
void RevokeService(const Service& service); void ClearServices(const Endpoint::Ptr& checker);
}; };
} }

View File

@ -292,7 +292,7 @@ Endpoint::Ptr EndpointManager::GetEndpointByIdentity(string identity) const
return Endpoint::Ptr(); return Endpoint::Ptr();
} }
void EndpointManager::SendAPIMessage(Endpoint::Ptr sender, void EndpointManager::SendAPIMessage(const Endpoint::Ptr& sender, const Endpoint::Ptr& recipient,
RequestMessage& message, RequestMessage& message,
function<void(const EndpointManager::Ptr&, const Endpoint::Ptr, const RequestMessage&, const ResponseMessage&, bool TimedOut)> callback, time_t timeout) function<void(const EndpointManager::Ptr&, const Endpoint::Ptr, const RequestMessage&, const ResponseMessage&, bool TimedOut)> callback, time_t timeout)
{ {
@ -312,7 +312,10 @@ void EndpointManager::SendAPIMessage(Endpoint::Ptr sender,
m_Requests[id] = pr; m_Requests[id] = pr;
RescheduleRequestTimer(); RescheduleRequestTimer();
SendAnycastMessage(sender, message); if (!recipient)
SendAnycastMessage(sender, message);
else
SendUnicastMessage(sender, recipient, message);
} }
bool EndpointManager::RequestTimeoutLessComparer(const pair<string, PendingRequest>& a, bool EndpointManager::RequestTimeoutLessComparer(const pair<string, PendingRequest>& a,

View File

@ -56,7 +56,7 @@ public:
void SendAnycastMessage(Endpoint::Ptr sender, const RequestMessage& message); void SendAnycastMessage(Endpoint::Ptr sender, const RequestMessage& message);
void SendMulticastMessage(Endpoint::Ptr sender, const RequestMessage& message); void SendMulticastMessage(Endpoint::Ptr sender, const RequestMessage& message);
void SendAPIMessage(Endpoint::Ptr sender, RequestMessage& message, void SendAPIMessage(const Endpoint::Ptr& sender, const Endpoint::Ptr& recipient, RequestMessage& message,
function<void(const EndpointManager::Ptr&, const Endpoint::Ptr, const RequestMessage&, const ResponseMessage&, bool TimedOut)> callback, time_t timeout = 10); function<void(const EndpointManager::Ptr&, const Endpoint::Ptr, const RequestMessage&, const ResponseMessage&, bool TimedOut)> callback, time_t timeout = 10);
void ProcessResponseMessage(const Endpoint::Ptr& sender, const ResponseMessage& message); void ProcessResponseMessage(const Endpoint::Ptr& sender, const ResponseMessage& message);

View File

@ -57,8 +57,19 @@ void NagiosCheckTask::RunCheck(void)
#ifdef _MSC_VER #ifdef _MSC_VER
fp = _popen(m_Command.c_str(), "r"); fp = _popen(m_Command.c_str(), "r");
#else /* _MSC_VER */ #else /* _MSC_VER */
bool use_libc_popen = false;
popen_noshell_pass_to_pclose pclose_arg; popen_noshell_pass_to_pclose pclose_arg;
fp = popen_noshell_compat(m_Command.c_str(), "r", &pclose_arg);
if (!use_libc_popen) {
fp = popen_noshell_compat(m_Command.c_str(), "r", &pclose_arg);
if (fp == NULL) // TODO: add check for valgrind
use_libc_popen = true;
}
if (use_libc_popen)
fp = popen(m_Command.c_str(), "r");
#endif /* _MSC_VER */ #endif /* _MSC_VER */
stringstream outputbuf; stringstream outputbuf;
@ -80,7 +91,10 @@ void NagiosCheckTask::RunCheck(void)
#ifdef _MSC_VER #ifdef _MSC_VER
status = _pclose(fp); status = _pclose(fp);
#else /* _MSC_VER */ #else /* _MSC_VER */
status = pclose_noshell(&pclose_arg); if (use_libc_popen)
status = pclose(fp);
else
status = pclose_noshell(&pclose_arg);
#endif /* _MSC_VER */ #endif /* _MSC_VER */
#ifndef _MSC_VER #ifndef _MSC_VER