diff --git a/base/i2-base.h b/base/i2-base.h index c55a2db28..bdfb75729 100644 --- a/base/i2-base.h +++ b/base/i2-base.h @@ -127,7 +127,6 @@ using boost::function; using boost::thread; using boost::thread_group; using boost::mutex; -using boost::unique_lock; using boost::condition_variable; #if defined(__APPLE__) && defined(__MACH__) diff --git a/base/threadpool.cpp b/base/threadpool.cpp index c7eda9f6c..fb1a3ad72 100644 --- a/base/threadpool.cpp +++ b/base/threadpool.cpp @@ -12,7 +12,7 @@ ThreadPool::ThreadPool(long numThreads) ThreadPool::~ThreadPool(void) { { - unique_lock lock(m_Lock); + mutex::scoped_lock lock(m_Lock); m_Tasks.clear(); @@ -24,24 +24,47 @@ ThreadPool::~ThreadPool(void) m_Threads.join_all(); } -void ThreadPool::EnqueueTasks(const vector& tasks) +void ThreadPool::EnqueueTasks(list& tasks) { - unique_lock lock(m_Lock); + { + mutex::scoped_lock lock(m_Lock); + m_Tasks.splice(m_Tasks.end(), tasks, tasks.begin(), tasks.end()); + } - std::copy(tasks.begin(), tasks.end(), std::back_inserter(m_Tasks)); m_CV.notify_all(); } -void ThreadPool::EnqueueTask(Task task) +void ThreadPool::EnqueueTask(const ThreadPoolTask::Ptr& task) { - unique_lock lock(m_Lock); - m_Tasks.push_back(task); + { + mutex::scoped_lock lock(m_Lock); + m_Tasks.push_back(task); + } + m_CV.notify_one(); } + +ThreadPoolTask::Ptr ThreadPool::DequeueTask(void) +{ + mutex::scoped_lock lock(m_Lock); + + while (m_Tasks.empty()) { + if (!m_Alive) + return ThreadPoolTask::Ptr(); + + m_CV.wait(lock); + } + + ThreadPoolTask::Ptr task = m_Tasks.front(); + m_Tasks.pop_front(); + + return task; +} + void ThreadPool::WaitForTasks(void) { - unique_lock lock(m_Lock); + mutex::scoped_lock lock(m_Lock); /* wait for all pending tasks */ while (!m_Tasks.empty()) @@ -51,23 +74,12 @@ void ThreadPool::WaitForTasks(void) void ThreadPool::WorkerThreadProc(void) { while (true) { - Task task; + ThreadPoolTask::Ptr task = DequeueTask(); - { - unique_lock lock(m_Lock); + if (!task) + break; - while (m_Tasks.empty()) { - if (!m_Alive) - return; - - m_CV.wait(lock); - } - - task = m_Tasks.front(); - m_Tasks.pop_front(); - } - - task(); + task->Execute(); } } @@ -80,3 +92,4 @@ ThreadPool::Ptr ThreadPool::GetDefaultPool(void) return threadPool; } + diff --git a/base/threadpool.h b/base/threadpool.h index 3d8bc2947..7c905b09a 100644 --- a/base/threadpool.h +++ b/base/threadpool.h @@ -4,32 +4,39 @@ namespace icinga { +struct ThreadPoolTask +{ + typedef shared_ptr Ptr; + typedef weak_ptr WeakPtr; + + virtual void Execute(void) = 0; +}; + class I2_BASE_API ThreadPool : public Object { public: typedef shared_ptr Ptr; typedef weak_ptr WeakPtr; - typedef function Task; - - ThreadPool(long numThreads = 16); + ThreadPool(long numThreads = 128); ~ThreadPool(void); static ThreadPool::Ptr GetDefaultPool(void); - void EnqueueTasks(const vector& tasks); - void EnqueueTask(Task task); + void EnqueueTasks(list& tasks); + void EnqueueTask(const ThreadPoolTask::Ptr& task); void WaitForTasks(void); private: - mutex m_Lock; + mutable mutex m_Lock; condition_variable m_CV; - deque m_Tasks; + list m_Tasks; thread_group m_Threads; bool m_Alive; + ThreadPoolTask::Ptr DequeueTask(void); void WorkerThreadProc(void); }; diff --git a/base/timer.cpp b/base/timer.cpp index 90ff47dd0..0f003350e 100644 --- a/base/timer.cpp +++ b/base/timer.cpp @@ -87,7 +87,7 @@ void Timer::CallExpiredTimers(void) if (timer->m_Next <= now) { timer->Call(); - timer->Reschedule(now + timer->GetInterval()); + timer->Reschedule(time(NULL) + timer->GetInterval()); } } } diff --git a/components/checker/checkercomponent.cpp b/components/checker/checkercomponent.cpp index 03b970ba9..c0cfa3ac8 100644 --- a/components/checker/checkercomponent.cpp +++ b/components/checker/checkercomponent.cpp @@ -39,7 +39,7 @@ void CheckerComponent::Start(void) GetEndpointManager()->RegisterEndpoint(m_CheckerEndpoint); m_CheckTimer = boost::make_shared(); - m_CheckTimer->SetInterval(10); + m_CheckTimer->SetInterval(5); m_CheckTimer->OnTimerExpired.connect(boost::bind(&CheckerComponent::CheckTimerHandler, this)); m_CheckTimer->Start(); @@ -88,8 +88,6 @@ void CheckerComponent::CheckTimerHandler(void) CheckTask::FlushQueue(); - AdjustCheckTimer(); - stringstream msgbuf; msgbuf << "CheckTimerHandler: created " << tasks << " tasks"; Application::Log(LogDebug, "checker", msgbuf.str()); @@ -102,7 +100,7 @@ void CheckerComponent::ResultTimerHandler(void) time_t now; time(&now); - long results = 0; + long latency = 0, results = 0; vector finishedTasks = CheckTask::GetFinishedTasks(); @@ -114,6 +112,7 @@ void CheckerComponent::ResultTimerHandler(void) CheckResult result = task->GetResult(); // Application::Log(LogInformation, "checker", "Got result! Plugin output: " + result.Output); + latency += result.EndTime - result.StartTime; results++; service.SetNextCheck(now + service.GetCheckInterval()); @@ -121,20 +120,8 @@ void CheckerComponent::ResultTimerHandler(void) } stringstream msgbuf; - msgbuf << "ResultTimerHandler: " << results << " results"; + msgbuf << "ResultTimerHandler: " << results << " results; avg. latency: " << latency / (results ? results : 1); Application::Log(LogDebug, "checker", msgbuf.str()); - - AdjustCheckTimer(); -} - -void CheckerComponent::AdjustCheckTimer(void) -{ - if (m_Services.empty()) - return; - - /* adjust next call time for the check timer */ - Service service = m_Services.top(); - m_CheckTimer->Reschedule(service.GetNextCheck()); } void CheckerComponent::AssignServiceRequestHandler(const Endpoint::Ptr& sender, const RequestMessage& request) diff --git a/components/delegation/delegationcomponent.cpp b/components/delegation/delegationcomponent.cpp index f2a9de172..f52a48060 100644 --- a/components/delegation/delegationcomponent.cpp +++ b/components/delegation/delegationcomponent.cpp @@ -109,6 +109,8 @@ void DelegationComponent::DelegationTimerHandler(void) AssignService(service); } + + m_DelegationTimer->Stop(); } EXPORT_COMPONENT(delegation, DelegationComponent); diff --git a/icinga/nagioschecktask.cpp b/icinga/nagioschecktask.cpp index c941dc576..1b67bc17e 100644 --- a/icinga/nagioschecktask.cpp +++ b/icinga/nagioschecktask.cpp @@ -2,7 +2,7 @@ using namespace icinga; -vector NagiosCheckTask::m_QueuedTasks; +list NagiosCheckTask::m_QueuedTasks; boost::mutex NagiosCheckTask::m_FinishedTasksMutex; vector NagiosCheckTask::m_FinishedTasks; @@ -16,7 +16,9 @@ NagiosCheckTask::NagiosCheckTask(const Service& service) void NagiosCheckTask::Enqueue(void) { - m_QueuedTasks.push_back(bind(&NagiosCheckTask::Execute, static_cast(GetSelf()))); + time(&m_Result.StartTime); + m_QueuedTasks.push_back(static_pointer_cast(static_cast(GetSelf()))); +// m_QueuedTasks.push_back(new ThreadPool:Task(bind(&NagiosCheckTask::Execute, static_cast(GetSelf())))); } void NagiosCheckTask::FlushQueue(void) @@ -27,7 +29,7 @@ void NagiosCheckTask::FlushQueue(void) void NagiosCheckTask::GetFinishedTasks(vector& tasks) { - unique_lock lock(m_FinishedTasksMutex); + mutex::scoped_lock lock(m_FinishedTasksMutex); std::copy(m_FinishedTasks.begin(), m_FinishedTasks.end(), back_inserter(tasks)); m_FinishedTasks.clear(); } @@ -39,28 +41,25 @@ CheckResult NagiosCheckTask::GetResult(void) void NagiosCheckTask::Execute(void) { - m_Result = RunCheck(); + RunCheck(); { - unique_lock lock(m_FinishedTasksMutex); + mutex::scoped_lock lock(m_FinishedTasksMutex); m_FinishedTasks.push_back(GetSelf()); } } -CheckResult NagiosCheckTask::RunCheck(void) const +void NagiosCheckTask::RunCheck(void) { - CheckResult cr; FILE *fp; - time(&cr.StartTime); - #ifdef _MSC_VER fp = _popen(m_Command.c_str(), "r"); #else /* _MSC_VER */ fp = popen(m_Command.c_str(), "r"); #endif /* _MSC_VER */ - stringstream outputbuf; +// ostringstream outputbuf; while (!feof(fp)) { char buffer[128]; @@ -69,11 +68,11 @@ CheckResult NagiosCheckTask::RunCheck(void) const if (read == 0) break; - outputbuf << string(buffer, buffer + read); +// outputbuf.write(buffer, read); } - cr.Output = outputbuf.str(); - boost::algorithm::trim(cr.Output); +// m_Result.Output = outputbuf.str(); +// boost::algorithm::trim(m_Result.Output); int status, exitcode; #ifdef _MSC_VER @@ -91,28 +90,26 @@ CheckResult NagiosCheckTask::RunCheck(void) const switch (exitcode) { case 0: - cr.State = StateOK; + m_Result.State = StateOK; break; case 1: - cr.State = StateWarning; + m_Result.State = StateWarning; break; case 2: - cr.State = StateCritical; + m_Result.State = StateCritical; break; default: - cr.State = StateUnknown; + m_Result.State = StateUnknown; break; } #ifndef _MSC_VER } else if (WIFSIGNALED(status)) { - cr.Output = "Process was terminated by signal " + WTERMSIG(status); - cr.State = StateUnknown; + m_Result.Output = "Process was terminated by signal " + WTERMSIG(status); + m_Result.State = StateUnknown; } #endif /* _MSC_VER */ - time(&cr.EndTime); - - return cr; + time(&m_Result.EndTime); } CheckTask::Ptr NagiosCheckTask::CreateTask(const Service& service) diff --git a/icinga/nagioschecktask.h b/icinga/nagioschecktask.h index 896716217..e835b505b 100644 --- a/icinga/nagioschecktask.h +++ b/icinga/nagioschecktask.h @@ -4,7 +4,7 @@ namespace icinga { -class I2_ICINGA_API NagiosCheckTask : public CheckTask +class I2_ICINGA_API NagiosCheckTask : public CheckTask, public ThreadPoolTask { public: typedef shared_ptr Ptr; @@ -23,13 +23,13 @@ private: string m_Command; CheckResult m_Result; - static vector m_QueuedTasks; + static list m_QueuedTasks; static boost::mutex m_FinishedTasksMutex; static vector m_FinishedTasks; - void Execute(void); - CheckResult RunCheck(void) const; + virtual void Execute(void); + void RunCheck(void); }; } diff --git a/icinga/service.cpp b/icinga/service.cpp index 8e1394ce1..3e29a7909 100644 --- a/icinga/service.cpp +++ b/icinga/service.cpp @@ -69,42 +69,26 @@ long Service::GetRetryInterval(void) const void Service::SetNextCheck(time_t nextCheck) { - GetConfigObject()->SetTag("next_check", static_cast(nextCheck)); + m_NextCheck = nextCheck; } time_t Service::GetNextCheck(void) { - long value = -1; - GetConfigObject()->GetTag("next_check", &value); + if (m_NextCheck == -1) + m_NextCheck = time(NULL) + rand() % GetCheckInterval(); - if (value == -1) { - value = time(NULL) + rand() % GetCheckInterval(); - SetNextCheck(value); - } - - return value; + return m_NextCheck; } void Service::SetChecker(string checker) { - GetConfigObject()->SetTag("checker", checker); + GetConfigObject()->SetProperty("checker", checker); } string Service::GetChecker(void) const { string value; - GetConfigObject()->GetTag("checker", &value); + GetConfigObject()->GetProperty("checker", &value); return value; } -void Service::SetPendingCheck(bool pending) -{ - GetConfigObject()->SetTag("pendingCheck", pending); -} - -bool Service::HasPendingCheck(void) const -{ - bool value = false; - GetConfigObject()->GetTag("pendingCheck", &value); - return value; -} \ No newline at end of file diff --git a/icinga/service.h b/icinga/service.h index 04f583788..ec063752a 100644 --- a/icinga/service.h +++ b/icinga/service.h @@ -8,7 +8,7 @@ class I2_ICINGA_API Service : public ConfigObjectAdapter { public: Service(const ConfigObject::Ptr& configObject) - : ConfigObjectAdapter(configObject) + : ConfigObjectAdapter(configObject), m_NextCheck(-1) { } string GetDisplayName(void) const; @@ -24,8 +24,9 @@ public: time_t GetNextCheck(void); void SetChecker(string checker); string GetChecker(void) const; - void SetPendingCheck(bool pending); - bool HasPendingCheck(void) const; + +private: + time_t m_NextCheck; }; }