diff --git a/base/threadpool.cpp b/base/threadpool.cpp index f4911dc6c..c7eda9f6c 100644 --- a/base/threadpool.cpp +++ b/base/threadpool.cpp @@ -5,26 +5,8 @@ using namespace icinga; ThreadPool::ThreadPool(long numThreads) : m_Alive(true) { - for (long i = 0; i < numThreads; i++) { - thread *thr = m_Threads.create_thread(boost::bind(&ThreadPool::WorkerThreadProc, this)); -#ifdef _WIN32 - HANDLE handle = thr->native_handle(); - SetPriorityClass(handle, BELOW_NORMAL_PRIORITY_CLASS); -#else /* _WIN32 */ - pthread_t handle = thr->native_handle(); - - int policy; - sched_param param; - - if (pthread_getschedparam(handle, &policy, ¶m) < 0) - throw PosixException("pthread_getschedparam failed", errno); - - param.sched_priority = 0; - - if (pthread_setschedparam(handle, SCHED_IDLE, ¶m) < 0) - throw PosixException("pthread_setschedparam failed", errno); -#endif /* _WIN32 */ - } + for (long i = 0; i < numThreads; i++) + m_Threads.create_thread(boost::bind(&ThreadPool::WorkerThreadProc, this)); } ThreadPool::~ThreadPool(void) @@ -62,7 +44,7 @@ void ThreadPool::WaitForTasks(void) unique_lock lock(m_Lock); /* wait for all pending tasks */ - while (m_Tasks.size() > 0) + while (!m_Tasks.empty()) m_CV.wait(lock); } @@ -74,7 +56,7 @@ void ThreadPool::WorkerThreadProc(void) { unique_lock lock(m_Lock); - while (m_Tasks.size() == 0) { + while (m_Tasks.empty()) { if (!m_Alive) return; diff --git a/base/threadpool.h b/base/threadpool.h index d36c1bec9..3d8bc2947 100644 --- a/base/threadpool.h +++ b/base/threadpool.h @@ -12,7 +12,7 @@ public: typedef function Task; - ThreadPool(long numThreads = 64); + ThreadPool(long numThreads = 16); ~ThreadPool(void); static ThreadPool::Ptr GetDefaultPool(void); diff --git a/components/checker/checkercomponent.cpp b/components/checker/checkercomponent.cpp index 90981b05d..03b970ba9 100644 --- a/components/checker/checkercomponent.cpp +++ b/components/checker/checkercomponent.cpp @@ -43,7 +43,7 @@ void CheckerComponent::Start(void) m_CheckTimer->OnTimerExpired.connect(boost::bind(&CheckerComponent::CheckTimerHandler, this)); m_CheckTimer->Start(); - CheckTask::RegisterType("nagios", NagiosCheckTask::CreateTask, NagiosCheckTask::FlushQueue); + CheckTask::RegisterType("nagios", NagiosCheckTask::CreateTask, NagiosCheckTask::FlushQueue, NagiosCheckTask::GetFinishedTasks); m_ResultTimer = boost::make_shared(); m_ResultTimer->SetInterval(5); @@ -68,25 +68,18 @@ void CheckerComponent::CheckTimerHandler(void) long tasks = 0; - for (;;) { - if (m_Services.empty()) - break; - + while (!m_Services.empty()) { Service service = m_Services.top(); - if (service.GetNextCheck() > now || service.HasPendingCheck()) + if (service.GetNextCheck() > now) break; m_Services.pop(); - service.SetPendingCheck(true); // Application::Log(LogInformation, "checker", "Executing service check for '" + service.GetName() + "'"); CheckTask::Ptr task = CheckTask::CreateTask(service); task->Enqueue(); - m_PendingTasks.push_back(task); - - service.SetNextCheck(now + service.GetCheckInterval()); tasks++; } @@ -104,37 +97,33 @@ void CheckerComponent::CheckTimerHandler(void) void CheckerComponent::ResultTimerHandler(void) { - vector unfinishedTasks; - Application::Log(LogDebug, "checker", "ResultTimerHandler entered."); + time_t now; + time(&now); + long results = 0; - for (vector::iterator it = m_PendingTasks.begin(); it != m_PendingTasks.end(); it++) { + vector finishedTasks = CheckTask::GetFinishedTasks(); + + for (vector::iterator it = finishedTasks.begin(); it != finishedTasks.end(); it++) { CheckTask::Ptr task = *it; - if (!task->IsFinished()) { - unfinishedTasks.push_back(task); - continue; - } - Service service = task->GetService(); - service.SetPendingCheck(false); CheckResult result = task->GetResult(); // Application::Log(LogInformation, "checker", "Got result! Plugin output: " + result.Output); results++; + service.SetNextCheck(now + service.GetCheckInterval()); m_Services.push(service); } stringstream msgbuf; - msgbuf << "ResultTimerHandler: " << results << " results; " << unfinishedTasks.size() << " unfinished"; + msgbuf << "ResultTimerHandler: " << results << " results"; Application::Log(LogDebug, "checker", msgbuf.str()); - m_PendingTasks = unfinishedTasks; - AdjustCheckTimer(); } @@ -196,8 +185,7 @@ void CheckerComponent::RevokeServiceRequestHandler(const Endpoint::Ptr& sender, if (service.GetName() == name) continue; - if (service.HasPendingCheck()) // TODO: remember services that should be removed once their pending check is done - throw runtime_error("not yet implemented"); + // TODO: take care of services that are currently being checked services.push_back(service); } diff --git a/components/checker/checkercomponent.h b/components/checker/checkercomponent.h index f39f1b46b..46ad620b7 100644 --- a/components/checker/checkercomponent.h +++ b/components/checker/checkercomponent.h @@ -56,7 +56,6 @@ private: Timer::Ptr m_CheckTimer; Timer::Ptr m_ResultTimer; - vector m_PendingTasks; void CheckTimerHandler(void); void ResultTimerHandler(void); diff --git a/icinga/checktask.cpp b/icinga/checktask.cpp index 694d1f2a6..84ac1606e 100644 --- a/icinga/checktask.cpp +++ b/icinga/checktask.cpp @@ -13,11 +13,12 @@ Service CheckTask::GetService(void) const return m_Service; } -void CheckTask::RegisterType(string type, Factory factory, QueueFlusher qflusher) +void CheckTask::RegisterType(string type, Factory factory, QueueFlusher qflusher, FinishedTasksGetter qtasksgetter) { CheckTaskType ctt; ctt.Factory = factory; ctt.QueueFlusher = qflusher; + ctt.FinishedTasksGetter = qtasksgetter; m_Types[type] = ctt; } @@ -42,7 +43,18 @@ void CheckTask::Enqueue(const CheckTask::Ptr& task) void CheckTask::FlushQueue(void) { map::iterator it; - for (it = m_Types.begin(); it != m_Types.end(); it++) it->second.QueueFlusher(); } + +vector CheckTask::GetFinishedTasks(void) +{ + vector tasks; + + map::iterator it; + for (it = m_Types.begin(); it != m_Types.end(); it++) + it->second.FinishedTasksGetter(tasks); + + return tasks; +} + diff --git a/icinga/checktask.h b/icinga/checktask.h index 38c658743..f8de86e2d 100644 --- a/icinga/checktask.h +++ b/icinga/checktask.h @@ -34,17 +34,18 @@ public: typedef function Factory; typedef function QueueFlusher; + typedef function& tasks)> FinishedTasksGetter; Service GetService(void) const; virtual void Enqueue(void) = 0; - virtual bool IsFinished(void) const = 0; virtual CheckResult GetResult(void) = 0; - static void RegisterType(string type, Factory factory, QueueFlusher qflusher); + static void RegisterType(string type, Factory factory, QueueFlusher qflusher, FinishedTasksGetter qtasksgetter); static CheckTask::Ptr CreateTask(const Service& service); static void Enqueue(const CheckTask::Ptr& task); static void FlushQueue(void); + static vector GetFinishedTasks(void); protected: CheckTask(const Service& service); @@ -59,6 +60,7 @@ struct CheckTaskType { CheckTask::Factory Factory; CheckTask::QueueFlusher QueueFlusher; + CheckTask::FinishedTasksGetter FinishedTasksGetter; }; } diff --git a/icinga/endpointmanager.cpp b/icinga/endpointmanager.cpp index b95f88a2b..fdfdf4497 100644 --- a/icinga/endpointmanager.cpp +++ b/icinga/endpointmanager.cpp @@ -210,7 +210,7 @@ void EndpointManager::SendAnycastMessage(Endpoint::Ptr sender, candidates.push_back(endpoint); } - if (candidates.size() == 0) + if (candidates.empty()) return; Endpoint::Ptr recipient = candidates[rand() % candidates.size()]; diff --git a/icinga/nagioschecktask.cpp b/icinga/nagioschecktask.cpp index 66d8e7a8d..c941dc576 100644 --- a/icinga/nagioschecktask.cpp +++ b/icinga/nagioschecktask.cpp @@ -4,19 +4,19 @@ using namespace icinga; vector NagiosCheckTask::m_QueuedTasks; +boost::mutex NagiosCheckTask::m_FinishedTasksMutex; +vector NagiosCheckTask::m_FinishedTasks; + NagiosCheckTask::NagiosCheckTask(const Service& service) : CheckTask(service) { string checkCommand = service.GetCheckCommand(); m_Command = MacroProcessor::ResolveMacros(checkCommand, service.GetMacros()) + " 2>&1"; - - m_Task = packaged_task(boost::bind(&NagiosCheckTask::RunCheck, this)); - m_Result = m_Task.get_future(); } void NagiosCheckTask::Enqueue(void) { - m_QueuedTasks.push_back(bind(&NagiosCheckTask::Execute, this)); + m_QueuedTasks.push_back(bind(&NagiosCheckTask::Execute, static_cast(GetSelf()))); } void NagiosCheckTask::FlushQueue(void) @@ -25,19 +25,26 @@ void NagiosCheckTask::FlushQueue(void) m_QueuedTasks.clear(); } -bool NagiosCheckTask::IsFinished(void) const +void NagiosCheckTask::GetFinishedTasks(vector& tasks) { - return m_Result.has_value(); + unique_lock lock(m_FinishedTasksMutex); + std::copy(m_FinishedTasks.begin(), m_FinishedTasks.end(), back_inserter(tasks)); + m_FinishedTasks.clear(); } CheckResult NagiosCheckTask::GetResult(void) { - return m_Result.get(); + return m_Result; } void NagiosCheckTask::Execute(void) { - m_Task(); + m_Result = RunCheck(); + + { + unique_lock lock(m_FinishedTasksMutex); + m_FinishedTasks.push_back(GetSelf()); + } } CheckResult NagiosCheckTask::RunCheck(void) const @@ -110,7 +117,5 @@ CheckResult NagiosCheckTask::RunCheck(void) const CheckTask::Ptr NagiosCheckTask::CreateTask(const Service& service) { - assert(service.GetCheckType() == "nagios"); - return boost::make_shared(service); } diff --git a/icinga/nagioschecktask.h b/icinga/nagioschecktask.h index b45aa1a58..896716217 100644 --- a/icinga/nagioschecktask.h +++ b/icinga/nagioschecktask.h @@ -7,22 +7,27 @@ namespace icinga class I2_ICINGA_API NagiosCheckTask : public CheckTask { public: + typedef shared_ptr Ptr; + typedef weak_ptr WeakPtr; + NagiosCheckTask(const Service& service); virtual void Enqueue(void); - virtual bool IsFinished(void) const; virtual CheckResult GetResult(void); static CheckTask::Ptr CreateTask(const Service& service); static void FlushQueue(void); + static void GetFinishedTasks(vector& tasks); private: string m_Command; - packaged_task m_Task; - unique_future m_Result; + CheckResult m_Result; static vector m_QueuedTasks; + static boost::mutex m_FinishedTasksMutex; + static vector m_FinishedTasks; + void Execute(void); CheckResult RunCheck(void) const; };