Performance improvements.

This commit is contained in:
Gunnar Beutner 2012-06-19 09:38:20 +02:00
parent 1db56a5995
commit 436ad8a26a
9 changed files with 59 additions and 66 deletions

View File

@ -5,26 +5,8 @@ using namespace icinga;
ThreadPool::ThreadPool(long numThreads)
: m_Alive(true)
{
for (long i = 0; i < numThreads; i++) {
thread *thr = m_Threads.create_thread(boost::bind(&ThreadPool::WorkerThreadProc, this));
#ifdef _WIN32
HANDLE handle = thr->native_handle();
SetPriorityClass(handle, BELOW_NORMAL_PRIORITY_CLASS);
#else /* _WIN32 */
pthread_t handle = thr->native_handle();
int policy;
sched_param param;
if (pthread_getschedparam(handle, &policy, &param) < 0)
throw PosixException("pthread_getschedparam failed", errno);
param.sched_priority = 0;
if (pthread_setschedparam(handle, SCHED_IDLE, &param) < 0)
throw PosixException("pthread_setschedparam failed", errno);
#endif /* _WIN32 */
}
for (long i = 0; i < numThreads; i++)
m_Threads.create_thread(boost::bind(&ThreadPool::WorkerThreadProc, this));
}
ThreadPool::~ThreadPool(void)
@ -62,7 +44,7 @@ void ThreadPool::WaitForTasks(void)
unique_lock<mutex> lock(m_Lock);
/* wait for all pending tasks */
while (m_Tasks.size() > 0)
while (!m_Tasks.empty())
m_CV.wait(lock);
}
@ -74,7 +56,7 @@ void ThreadPool::WorkerThreadProc(void)
{
unique_lock<mutex> lock(m_Lock);
while (m_Tasks.size() == 0) {
while (m_Tasks.empty()) {
if (!m_Alive)
return;

View File

@ -12,7 +12,7 @@ public:
typedef function<void()> Task;
ThreadPool(long numThreads = 64);
ThreadPool(long numThreads = 16);
~ThreadPool(void);
static ThreadPool::Ptr GetDefaultPool(void);

View File

@ -43,7 +43,7 @@ void CheckerComponent::Start(void)
m_CheckTimer->OnTimerExpired.connect(boost::bind(&CheckerComponent::CheckTimerHandler, this));
m_CheckTimer->Start();
CheckTask::RegisterType("nagios", NagiosCheckTask::CreateTask, NagiosCheckTask::FlushQueue);
CheckTask::RegisterType("nagios", NagiosCheckTask::CreateTask, NagiosCheckTask::FlushQueue, NagiosCheckTask::GetFinishedTasks);
m_ResultTimer = boost::make_shared<Timer>();
m_ResultTimer->SetInterval(5);
@ -68,25 +68,18 @@ void CheckerComponent::CheckTimerHandler(void)
long tasks = 0;
for (;;) {
if (m_Services.empty())
break;
while (!m_Services.empty()) {
Service service = m_Services.top();
if (service.GetNextCheck() > now || service.HasPendingCheck())
if (service.GetNextCheck() > now)
break;
m_Services.pop();
service.SetPendingCheck(true);
// Application::Log(LogInformation, "checker", "Executing service check for '" + service.GetName() + "'");
CheckTask::Ptr task = CheckTask::CreateTask(service);
task->Enqueue();
m_PendingTasks.push_back(task);
service.SetNextCheck(now + service.GetCheckInterval());
tasks++;
}
@ -104,37 +97,33 @@ void CheckerComponent::CheckTimerHandler(void)
void CheckerComponent::ResultTimerHandler(void)
{
vector<CheckTask::Ptr> unfinishedTasks;
Application::Log(LogDebug, "checker", "ResultTimerHandler entered.");
time_t now;
time(&now);
long results = 0;
for (vector<CheckTask::Ptr>::iterator it = m_PendingTasks.begin(); it != m_PendingTasks.end(); it++) {
vector<CheckTask::Ptr> finishedTasks = CheckTask::GetFinishedTasks();
for (vector<CheckTask::Ptr>::iterator it = finishedTasks.begin(); it != finishedTasks.end(); it++) {
CheckTask::Ptr task = *it;
if (!task->IsFinished()) {
unfinishedTasks.push_back(task);
continue;
}
Service service = task->GetService();
service.SetPendingCheck(false);
CheckResult result = task->GetResult();
// Application::Log(LogInformation, "checker", "Got result! Plugin output: " + result.Output);
results++;
service.SetNextCheck(now + service.GetCheckInterval());
m_Services.push(service);
}
stringstream msgbuf;
msgbuf << "ResultTimerHandler: " << results << " results; " << unfinishedTasks.size() << " unfinished";
msgbuf << "ResultTimerHandler: " << results << " results";
Application::Log(LogDebug, "checker", msgbuf.str());
m_PendingTasks = unfinishedTasks;
AdjustCheckTimer();
}
@ -196,8 +185,7 @@ void CheckerComponent::RevokeServiceRequestHandler(const Endpoint::Ptr& sender,
if (service.GetName() == name)
continue;
if (service.HasPendingCheck()) // TODO: remember services that should be removed once their pending check is done
throw runtime_error("not yet implemented");
// TODO: take care of services that are currently being checked
services.push_back(service);
}

View File

@ -56,7 +56,6 @@ private:
Timer::Ptr m_CheckTimer;
Timer::Ptr m_ResultTimer;
vector<CheckTask::Ptr> m_PendingTasks;
void CheckTimerHandler(void);
void ResultTimerHandler(void);

View File

@ -13,11 +13,12 @@ Service CheckTask::GetService(void) const
return m_Service;
}
void CheckTask::RegisterType(string type, Factory factory, QueueFlusher qflusher)
void CheckTask::RegisterType(string type, Factory factory, QueueFlusher qflusher, FinishedTasksGetter qtasksgetter)
{
CheckTaskType ctt;
ctt.Factory = factory;
ctt.QueueFlusher = qflusher;
ctt.FinishedTasksGetter = qtasksgetter;
m_Types[type] = ctt;
}
@ -42,7 +43,18 @@ void CheckTask::Enqueue(const CheckTask::Ptr& task)
void CheckTask::FlushQueue(void)
{
map<string, CheckTaskType>::iterator it;
for (it = m_Types.begin(); it != m_Types.end(); it++)
it->second.QueueFlusher();
}
vector<CheckTask::Ptr> CheckTask::GetFinishedTasks(void)
{
vector<CheckTask::Ptr> tasks;
map<string, CheckTaskType>::iterator it;
for (it = m_Types.begin(); it != m_Types.end(); it++)
it->second.FinishedTasksGetter(tasks);
return tasks;
}

View File

@ -34,17 +34,18 @@ public:
typedef function<CheckTask::Ptr(const Service&)> Factory;
typedef function<void()> QueueFlusher;
typedef function<void (vector<CheckTask::Ptr>& tasks)> FinishedTasksGetter;
Service GetService(void) const;
virtual void Enqueue(void) = 0;
virtual bool IsFinished(void) const = 0;
virtual CheckResult GetResult(void) = 0;
static void RegisterType(string type, Factory factory, QueueFlusher qflusher);
static void RegisterType(string type, Factory factory, QueueFlusher qflusher, FinishedTasksGetter qtasksgetter);
static CheckTask::Ptr CreateTask(const Service& service);
static void Enqueue(const CheckTask::Ptr& task);
static void FlushQueue(void);
static vector<CheckTask::Ptr> GetFinishedTasks(void);
protected:
CheckTask(const Service& service);
@ -59,6 +60,7 @@ struct CheckTaskType
{
CheckTask::Factory Factory;
CheckTask::QueueFlusher QueueFlusher;
CheckTask::FinishedTasksGetter FinishedTasksGetter;
};
}

View File

@ -210,7 +210,7 @@ void EndpointManager::SendAnycastMessage(Endpoint::Ptr sender,
candidates.push_back(endpoint);
}
if (candidates.size() == 0)
if (candidates.empty())
return;
Endpoint::Ptr recipient = candidates[rand() % candidates.size()];

View File

@ -4,19 +4,19 @@ using namespace icinga;
vector<ThreadPool::Task> NagiosCheckTask::m_QueuedTasks;
boost::mutex NagiosCheckTask::m_FinishedTasksMutex;
vector<CheckTask::Ptr> NagiosCheckTask::m_FinishedTasks;
NagiosCheckTask::NagiosCheckTask(const Service& service)
: CheckTask(service)
{
string checkCommand = service.GetCheckCommand();
m_Command = MacroProcessor::ResolveMacros(checkCommand, service.GetMacros()) + " 2>&1";
m_Task = packaged_task<CheckResult>(boost::bind(&NagiosCheckTask::RunCheck, this));
m_Result = m_Task.get_future();
}
void NagiosCheckTask::Enqueue(void)
{
m_QueuedTasks.push_back(bind(&NagiosCheckTask::Execute, this));
m_QueuedTasks.push_back(bind(&NagiosCheckTask::Execute, static_cast<NagiosCheckTask::Ptr>(GetSelf())));
}
void NagiosCheckTask::FlushQueue(void)
@ -25,19 +25,26 @@ void NagiosCheckTask::FlushQueue(void)
m_QueuedTasks.clear();
}
bool NagiosCheckTask::IsFinished(void) const
void NagiosCheckTask::GetFinishedTasks(vector<CheckTask::Ptr>& tasks)
{
return m_Result.has_value();
unique_lock<mutex> lock(m_FinishedTasksMutex);
std::copy(m_FinishedTasks.begin(), m_FinishedTasks.end(), back_inserter(tasks));
m_FinishedTasks.clear();
}
CheckResult NagiosCheckTask::GetResult(void)
{
return m_Result.get();
return m_Result;
}
void NagiosCheckTask::Execute(void)
{
m_Task();
m_Result = RunCheck();
{
unique_lock<mutex> lock(m_FinishedTasksMutex);
m_FinishedTasks.push_back(GetSelf());
}
}
CheckResult NagiosCheckTask::RunCheck(void) const
@ -110,7 +117,5 @@ CheckResult NagiosCheckTask::RunCheck(void) const
CheckTask::Ptr NagiosCheckTask::CreateTask(const Service& service)
{
assert(service.GetCheckType() == "nagios");
return boost::make_shared<NagiosCheckTask>(service);
}

View File

@ -7,22 +7,27 @@ namespace icinga
class I2_ICINGA_API NagiosCheckTask : public CheckTask
{
public:
typedef shared_ptr<NagiosCheckTask> Ptr;
typedef weak_ptr<NagiosCheckTask> WeakPtr;
NagiosCheckTask(const Service& service);
virtual void Enqueue(void);
virtual bool IsFinished(void) const;
virtual CheckResult GetResult(void);
static CheckTask::Ptr CreateTask(const Service& service);
static void FlushQueue(void);
static void GetFinishedTasks(vector<CheckTask::Ptr>& tasks);
private:
string m_Command;
packaged_task<CheckResult> m_Task;
unique_future<CheckResult> m_Result;
CheckResult m_Result;
static vector<ThreadPool::Task> m_QueuedTasks;
static boost::mutex m_FinishedTasksMutex;
static vector<CheckTask::Ptr> m_FinishedTasks;
void Execute(void);
CheckResult RunCheck(void) const;
};