More performance improvements.

This commit is contained in:
Gunnar Beutner 2012-06-19 12:23:52 +02:00
parent 436ad8a26a
commit 877b9516f0
10 changed files with 90 additions and 100 deletions

View File

@ -127,7 +127,6 @@ using boost::function;
using boost::thread; using boost::thread;
using boost::thread_group; using boost::thread_group;
using boost::mutex; using boost::mutex;
using boost::unique_lock;
using boost::condition_variable; using boost::condition_variable;
#if defined(__APPLE__) && defined(__MACH__) #if defined(__APPLE__) && defined(__MACH__)

View File

@ -12,7 +12,7 @@ ThreadPool::ThreadPool(long numThreads)
ThreadPool::~ThreadPool(void) ThreadPool::~ThreadPool(void)
{ {
{ {
unique_lock<mutex> lock(m_Lock); mutex::scoped_lock lock(m_Lock);
m_Tasks.clear(); m_Tasks.clear();
@ -24,24 +24,47 @@ ThreadPool::~ThreadPool(void)
m_Threads.join_all(); m_Threads.join_all();
} }
void ThreadPool::EnqueueTasks(const vector<Task>& tasks) void ThreadPool::EnqueueTasks(list<ThreadPoolTask::Ptr>& tasks)
{ {
unique_lock<mutex> lock(m_Lock); {
mutex::scoped_lock lock(m_Lock);
m_Tasks.splice(m_Tasks.end(), tasks, tasks.begin(), tasks.end());
}
std::copy(tasks.begin(), tasks.end(), std::back_inserter(m_Tasks));
m_CV.notify_all(); m_CV.notify_all();
} }
void ThreadPool::EnqueueTask(Task task) void ThreadPool::EnqueueTask(const ThreadPoolTask::Ptr& task)
{ {
unique_lock<mutex> lock(m_Lock); {
mutex::scoped_lock lock(m_Lock);
m_Tasks.push_back(task); m_Tasks.push_back(task);
}
m_CV.notify_one(); m_CV.notify_one();
} }
ThreadPoolTask::Ptr ThreadPool::DequeueTask(void)
{
mutex::scoped_lock lock(m_Lock);
while (m_Tasks.empty()) {
if (!m_Alive)
return ThreadPoolTask::Ptr();
m_CV.wait(lock);
}
ThreadPoolTask::Ptr task = m_Tasks.front();
m_Tasks.pop_front();
return task;
}
void ThreadPool::WaitForTasks(void) void ThreadPool::WaitForTasks(void)
{ {
unique_lock<mutex> lock(m_Lock); mutex::scoped_lock lock(m_Lock);
/* wait for all pending tasks */ /* wait for all pending tasks */
while (!m_Tasks.empty()) while (!m_Tasks.empty())
@ -51,23 +74,12 @@ void ThreadPool::WaitForTasks(void)
void ThreadPool::WorkerThreadProc(void) void ThreadPool::WorkerThreadProc(void)
{ {
while (true) { while (true) {
Task task; ThreadPoolTask::Ptr task = DequeueTask();
{ if (!task)
unique_lock<mutex> lock(m_Lock); break;
while (m_Tasks.empty()) { task->Execute();
if (!m_Alive)
return;
m_CV.wait(lock);
}
task = m_Tasks.front();
m_Tasks.pop_front();
}
task();
} }
} }
@ -80,3 +92,4 @@ ThreadPool::Ptr ThreadPool::GetDefaultPool(void)
return threadPool; return threadPool;
} }

View File

@ -4,32 +4,39 @@
namespace icinga namespace icinga
{ {
struct ThreadPoolTask
{
typedef shared_ptr<ThreadPoolTask> Ptr;
typedef weak_ptr<ThreadPoolTask> WeakPtr;
virtual void Execute(void) = 0;
};
class I2_BASE_API ThreadPool : public Object class I2_BASE_API ThreadPool : public Object
{ {
public: public:
typedef shared_ptr<ThreadPool> Ptr; typedef shared_ptr<ThreadPool> Ptr;
typedef weak_ptr<ThreadPool> WeakPtr; typedef weak_ptr<ThreadPool> WeakPtr;
typedef function<void()> Task; ThreadPool(long numThreads = 128);
ThreadPool(long numThreads = 16);
~ThreadPool(void); ~ThreadPool(void);
static ThreadPool::Ptr GetDefaultPool(void); static ThreadPool::Ptr GetDefaultPool(void);
void EnqueueTasks(const vector<Task>& tasks); void EnqueueTasks(list<ThreadPoolTask::Ptr>& tasks);
void EnqueueTask(Task task); void EnqueueTask(const ThreadPoolTask::Ptr& task);
void WaitForTasks(void); void WaitForTasks(void);
private: private:
mutex m_Lock; mutable mutex m_Lock;
condition_variable m_CV; condition_variable m_CV;
deque<Task> m_Tasks; list<ThreadPoolTask::Ptr> m_Tasks;
thread_group m_Threads; thread_group m_Threads;
bool m_Alive; bool m_Alive;
ThreadPoolTask::Ptr DequeueTask(void);
void WorkerThreadProc(void); void WorkerThreadProc(void);
}; };

View File

@ -87,7 +87,7 @@ void Timer::CallExpiredTimers(void)
if (timer->m_Next <= now) { if (timer->m_Next <= now) {
timer->Call(); timer->Call();
timer->Reschedule(now + timer->GetInterval()); timer->Reschedule(time(NULL) + timer->GetInterval());
} }
} }
} }

View File

@ -39,7 +39,7 @@ void CheckerComponent::Start(void)
GetEndpointManager()->RegisterEndpoint(m_CheckerEndpoint); GetEndpointManager()->RegisterEndpoint(m_CheckerEndpoint);
m_CheckTimer = boost::make_shared<Timer>(); m_CheckTimer = boost::make_shared<Timer>();
m_CheckTimer->SetInterval(10); m_CheckTimer->SetInterval(5);
m_CheckTimer->OnTimerExpired.connect(boost::bind(&CheckerComponent::CheckTimerHandler, this)); m_CheckTimer->OnTimerExpired.connect(boost::bind(&CheckerComponent::CheckTimerHandler, this));
m_CheckTimer->Start(); m_CheckTimer->Start();
@ -88,8 +88,6 @@ void CheckerComponent::CheckTimerHandler(void)
CheckTask::FlushQueue(); CheckTask::FlushQueue();
AdjustCheckTimer();
stringstream msgbuf; stringstream msgbuf;
msgbuf << "CheckTimerHandler: created " << tasks << " tasks"; msgbuf << "CheckTimerHandler: created " << tasks << " tasks";
Application::Log(LogDebug, "checker", msgbuf.str()); Application::Log(LogDebug, "checker", msgbuf.str());
@ -102,7 +100,7 @@ void CheckerComponent::ResultTimerHandler(void)
time_t now; time_t now;
time(&now); time(&now);
long results = 0; long latency = 0, results = 0;
vector<CheckTask::Ptr> finishedTasks = CheckTask::GetFinishedTasks(); vector<CheckTask::Ptr> finishedTasks = CheckTask::GetFinishedTasks();
@ -114,6 +112,7 @@ void CheckerComponent::ResultTimerHandler(void)
CheckResult result = task->GetResult(); CheckResult result = task->GetResult();
// Application::Log(LogInformation, "checker", "Got result! Plugin output: " + result.Output); // Application::Log(LogInformation, "checker", "Got result! Plugin output: " + result.Output);
latency += result.EndTime - result.StartTime;
results++; results++;
service.SetNextCheck(now + service.GetCheckInterval()); service.SetNextCheck(now + service.GetCheckInterval());
@ -121,20 +120,8 @@ void CheckerComponent::ResultTimerHandler(void)
} }
stringstream msgbuf; stringstream msgbuf;
msgbuf << "ResultTimerHandler: " << results << " results"; msgbuf << "ResultTimerHandler: " << results << " results; avg. latency: " << latency / (results ? results : 1);
Application::Log(LogDebug, "checker", msgbuf.str()); Application::Log(LogDebug, "checker", msgbuf.str());
AdjustCheckTimer();
}
void CheckerComponent::AdjustCheckTimer(void)
{
if (m_Services.empty())
return;
/* adjust next call time for the check timer */
Service service = m_Services.top();
m_CheckTimer->Reschedule(service.GetNextCheck());
} }
void CheckerComponent::AssignServiceRequestHandler(const Endpoint::Ptr& sender, const RequestMessage& request) void CheckerComponent::AssignServiceRequestHandler(const Endpoint::Ptr& sender, const RequestMessage& request)

View File

@ -109,6 +109,8 @@ void DelegationComponent::DelegationTimerHandler(void)
AssignService(service); AssignService(service);
} }
m_DelegationTimer->Stop();
} }
EXPORT_COMPONENT(delegation, DelegationComponent); EXPORT_COMPONENT(delegation, DelegationComponent);

View File

@ -2,7 +2,7 @@
using namespace icinga; using namespace icinga;
vector<ThreadPool::Task> NagiosCheckTask::m_QueuedTasks; list<ThreadPoolTask::Ptr> NagiosCheckTask::m_QueuedTasks;
boost::mutex NagiosCheckTask::m_FinishedTasksMutex; boost::mutex NagiosCheckTask::m_FinishedTasksMutex;
vector<CheckTask::Ptr> NagiosCheckTask::m_FinishedTasks; vector<CheckTask::Ptr> NagiosCheckTask::m_FinishedTasks;
@ -16,7 +16,9 @@ NagiosCheckTask::NagiosCheckTask(const Service& service)
void NagiosCheckTask::Enqueue(void) void NagiosCheckTask::Enqueue(void)
{ {
m_QueuedTasks.push_back(bind(&NagiosCheckTask::Execute, static_cast<NagiosCheckTask::Ptr>(GetSelf()))); time(&m_Result.StartTime);
m_QueuedTasks.push_back(static_pointer_cast<ThreadPoolTask>(static_cast<NagiosCheckTask::Ptr>(GetSelf())));
// m_QueuedTasks.push_back(new ThreadPool:Task(bind(&NagiosCheckTask::Execute, static_cast<NagiosCheckTask::Ptr>(GetSelf()))));
} }
void NagiosCheckTask::FlushQueue(void) void NagiosCheckTask::FlushQueue(void)
@ -27,7 +29,7 @@ void NagiosCheckTask::FlushQueue(void)
void NagiosCheckTask::GetFinishedTasks(vector<CheckTask::Ptr>& tasks) void NagiosCheckTask::GetFinishedTasks(vector<CheckTask::Ptr>& tasks)
{ {
unique_lock<mutex> lock(m_FinishedTasksMutex); mutex::scoped_lock lock(m_FinishedTasksMutex);
std::copy(m_FinishedTasks.begin(), m_FinishedTasks.end(), back_inserter(tasks)); std::copy(m_FinishedTasks.begin(), m_FinishedTasks.end(), back_inserter(tasks));
m_FinishedTasks.clear(); m_FinishedTasks.clear();
} }
@ -39,28 +41,25 @@ CheckResult NagiosCheckTask::GetResult(void)
void NagiosCheckTask::Execute(void) void NagiosCheckTask::Execute(void)
{ {
m_Result = RunCheck(); RunCheck();
{ {
unique_lock<mutex> lock(m_FinishedTasksMutex); mutex::scoped_lock lock(m_FinishedTasksMutex);
m_FinishedTasks.push_back(GetSelf()); m_FinishedTasks.push_back(GetSelf());
} }
} }
CheckResult NagiosCheckTask::RunCheck(void) const void NagiosCheckTask::RunCheck(void)
{ {
CheckResult cr;
FILE *fp; FILE *fp;
time(&cr.StartTime);
#ifdef _MSC_VER #ifdef _MSC_VER
fp = _popen(m_Command.c_str(), "r"); fp = _popen(m_Command.c_str(), "r");
#else /* _MSC_VER */ #else /* _MSC_VER */
fp = popen(m_Command.c_str(), "r"); fp = popen(m_Command.c_str(), "r");
#endif /* _MSC_VER */ #endif /* _MSC_VER */
stringstream outputbuf; // ostringstream outputbuf;
while (!feof(fp)) { while (!feof(fp)) {
char buffer[128]; char buffer[128];
@ -69,11 +68,11 @@ CheckResult NagiosCheckTask::RunCheck(void) const
if (read == 0) if (read == 0)
break; break;
outputbuf << string(buffer, buffer + read); // outputbuf.write(buffer, read);
} }
cr.Output = outputbuf.str(); // m_Result.Output = outputbuf.str();
boost::algorithm::trim(cr.Output); // boost::algorithm::trim(m_Result.Output);
int status, exitcode; int status, exitcode;
#ifdef _MSC_VER #ifdef _MSC_VER
@ -91,28 +90,26 @@ CheckResult NagiosCheckTask::RunCheck(void) const
switch (exitcode) { switch (exitcode) {
case 0: case 0:
cr.State = StateOK; m_Result.State = StateOK;
break; break;
case 1: case 1:
cr.State = StateWarning; m_Result.State = StateWarning;
break; break;
case 2: case 2:
cr.State = StateCritical; m_Result.State = StateCritical;
break; break;
default: default:
cr.State = StateUnknown; m_Result.State = StateUnknown;
break; break;
} }
#ifndef _MSC_VER #ifndef _MSC_VER
} else if (WIFSIGNALED(status)) { } else if (WIFSIGNALED(status)) {
cr.Output = "Process was terminated by signal " + WTERMSIG(status); m_Result.Output = "Process was terminated by signal " + WTERMSIG(status);
cr.State = StateUnknown; m_Result.State = StateUnknown;
} }
#endif /* _MSC_VER */ #endif /* _MSC_VER */
time(&cr.EndTime); time(&m_Result.EndTime);
return cr;
} }
CheckTask::Ptr NagiosCheckTask::CreateTask(const Service& service) CheckTask::Ptr NagiosCheckTask::CreateTask(const Service& service)

View File

@ -4,7 +4,7 @@
namespace icinga namespace icinga
{ {
class I2_ICINGA_API NagiosCheckTask : public CheckTask class I2_ICINGA_API NagiosCheckTask : public CheckTask, public ThreadPoolTask
{ {
public: public:
typedef shared_ptr<NagiosCheckTask> Ptr; typedef shared_ptr<NagiosCheckTask> Ptr;
@ -23,13 +23,13 @@ private:
string m_Command; string m_Command;
CheckResult m_Result; CheckResult m_Result;
static vector<ThreadPool::Task> m_QueuedTasks; static list<ThreadPoolTask::Ptr> m_QueuedTasks;
static boost::mutex m_FinishedTasksMutex; static boost::mutex m_FinishedTasksMutex;
static vector<CheckTask::Ptr> m_FinishedTasks; static vector<CheckTask::Ptr> m_FinishedTasks;
void Execute(void); virtual void Execute(void);
CheckResult RunCheck(void) const; void RunCheck(void);
}; };
} }

View File

@ -69,42 +69,26 @@ long Service::GetRetryInterval(void) const
void Service::SetNextCheck(time_t nextCheck) void Service::SetNextCheck(time_t nextCheck)
{ {
GetConfigObject()->SetTag("next_check", static_cast<long>(nextCheck)); m_NextCheck = nextCheck;
} }
time_t Service::GetNextCheck(void) time_t Service::GetNextCheck(void)
{ {
long value = -1; if (m_NextCheck == -1)
GetConfigObject()->GetTag("next_check", &value); m_NextCheck = time(NULL) + rand() % GetCheckInterval();
if (value == -1) { return m_NextCheck;
value = time(NULL) + rand() % GetCheckInterval();
SetNextCheck(value);
}
return value;
} }
void Service::SetChecker(string checker) void Service::SetChecker(string checker)
{ {
GetConfigObject()->SetTag("checker", checker); GetConfigObject()->SetProperty("checker", checker);
} }
string Service::GetChecker(void) const string Service::GetChecker(void) const
{ {
string value; string value;
GetConfigObject()->GetTag("checker", &value); GetConfigObject()->GetProperty("checker", &value);
return value; return value;
} }
void Service::SetPendingCheck(bool pending)
{
GetConfigObject()->SetTag("pendingCheck", pending);
}
bool Service::HasPendingCheck(void) const
{
bool value = false;
GetConfigObject()->GetTag("pendingCheck", &value);
return value;
}

View File

@ -8,7 +8,7 @@ class I2_ICINGA_API Service : public ConfigObjectAdapter
{ {
public: public:
Service(const ConfigObject::Ptr& configObject) Service(const ConfigObject::Ptr& configObject)
: ConfigObjectAdapter(configObject) : ConfigObjectAdapter(configObject), m_NextCheck(-1)
{ } { }
string GetDisplayName(void) const; string GetDisplayName(void) const;
@ -24,8 +24,9 @@ public:
time_t GetNextCheck(void); time_t GetNextCheck(void);
void SetChecker(string checker); void SetChecker(string checker);
string GetChecker(void) const; string GetChecker(void) const;
void SetPendingCheck(bool pending);
bool HasPendingCheck(void) const; private:
time_t m_NextCheck;
}; };
} }