From eb2e4ac205100f885d8967457e99178e9a68eb7f Mon Sep 17 00:00:00 2001 From: Gunnar Beutner Date: Sat, 14 Jul 2012 12:44:37 +0200 Subject: [PATCH] Bugfixes for the Process/AsyncTask classes. --- base/asynctask.h | 19 ++++++++--- base/configobject.cpp | 5 +++ base/configobject.h | 2 ++ base/process.cpp | 45 ++++++++++++++++--------- base/process.h | 9 +++-- cib/checktask.cpp | 8 ++--- cib/checktask.h | 6 ++-- cib/configobjectadapter.cpp | 5 +++ cib/configobjectadapter.h | 2 ++ cib/nagioschecktask.cpp | 24 ++++++------- cib/nagioschecktask.h | 5 +-- components/checker/checkercomponent.cpp | 11 +++--- 12 files changed, 94 insertions(+), 47 deletions(-) diff --git a/base/asynctask.h b/base/asynctask.h index 75fe0283b..9e25e7f32 100644 --- a/base/asynctask.h +++ b/base/asynctask.h @@ -30,8 +30,10 @@ public: typedef shared_ptr > Ptr; typedef weak_ptr > WeakPtr; - AsyncTask(void) - : m_Finished(false) + typedef function&)> CompletionCallback; + + AsyncTask(const CompletionCallback& completionCallback) + : m_Finished(false), m_CompletionCallback(completionCallback) { } ~AsyncTask(void) @@ -46,17 +48,24 @@ public: Run(); } - boost::signal&)> OnTaskCompleted; - protected: virtual void Run(void) = 0; void Finish(void) { - Event::Post(boost::bind(boost::cref(OnTaskCompleted), static_cast >(GetSelf()))); + Event::Post(boost::bind(&T::ForwardCallback, static_cast >(GetSelf()))); + } + +private: + void ForwardCallback(void) + { + m_CompletionCallback(GetSelf()); + m_CompletionCallback = CompletionCallback(); + m_Finished = true; } bool m_Finished; + CompletionCallback m_CompletionCallback; }; } diff --git a/base/configobject.cpp b/base/configobject.cpp index 13b6f7d5c..c80c09c08 100644 --- a/base/configobject.cpp +++ b/base/configobject.cpp @@ -191,3 +191,8 @@ ConfigObject::TMap::Range ConfigObject::GetObjects(string type) { return GetObjectsByType()->GetRange(type); } + +void ConfigObject::RemoveTag(const string& key) +{ + GetTags()->Remove(key); +} diff --git a/base/configobject.h b/base/configobject.h index 2a5bd31b0..6fbb1427c 100644 --- a/base/configobject.h +++ b/base/configobject.h @@ -63,6 +63,8 @@ public: return GetTags()->Get(key, value); } + void RemoveTag(const string& key); + string GetType(void) const; string GetName(void) const; diff --git a/base/process.cpp b/base/process.cpp index 2d1d8292b..50c4feabe 100644 --- a/base/process.cpp +++ b/base/process.cpp @@ -25,26 +25,19 @@ using namespace icinga; -bool Process::m_ThreadsCreated = false; +bool Process::m_ThreadCreated = false; boost::mutex Process::m_Mutex; deque Process::m_Tasks; condition_variable Process::m_TasksCV; -Process::Process(const string& command) - : m_Command(command) +Process::Process(const string& command, const CompletionCallback& completionCallback) + : AsyncTask(completionCallback), m_Command(command), m_UsePopen(false) { - if (!m_ThreadsCreated) { - int numThreads = boost::thread::hardware_concurrency(); + if (!m_ThreadCreated) { + thread t(&Process::WorkerThreadProc); + t.detach(); - if (numThreads < 4) - numThreads = 4; - - for (int i = 0; i < numThreads; i++) { - thread t(&Process::WorkerThreadProc); - t.detach(); - } - - m_ThreadsCreated = true; + m_ThreadCreated = true; } } @@ -52,7 +45,7 @@ void Process::Run(void) { mutex::scoped_lock lock(m_Mutex); m_Tasks.push_back(GetSelf()); - m_TasksCV.notify_one(); + m_TasksCV.notify_all(); } void Process::WorkerThreadProc(void) @@ -116,6 +109,9 @@ void Process::WorkerThreadProc(void) while (!m_Tasks.empty() && tasks.size() < MaxTasksPerThread) { Process::Ptr task = m_Tasks.front(); m_Tasks.pop_front(); + + lock.unlock(); + if (!task->InitTask()) { task->Finish(); } else { @@ -123,12 +119,16 @@ void Process::WorkerThreadProc(void) if (fd >= 0) tasks[fd] = task; } + + lock.lock(); } } } bool Process::InitTask(void) { + time(&m_ExecutionStart); + #ifdef _MSC_VER m_FP = _popen(m_Command.c_str(), "r"); #else /* _MSC_VER */ @@ -147,6 +147,8 @@ bool Process::InitTask(void) #endif /* _MSC_VER */ if (m_FP == NULL) { + m_ExitStatus = 128; + m_ExecutionEnd = m_ExecutionStart; return false; } @@ -178,6 +180,8 @@ bool Process::RunTask(void) } #endif /* _MSC_VER */ + time(&m_ExecutionEnd); + #ifndef _MSC_VER if (WIFEXITED(status)) { exitcode = WEXITSTATUS(status); @@ -220,3 +224,14 @@ string Process::GetOutput(void) const { return m_Output; } + +time_t Process::GetExecutionStart(void) const +{ + return m_ExecutionStart; +} + +time_t Process::GetExecutionEnd(void) const +{ + return m_ExecutionEnd; +} + diff --git a/base/process.h b/base/process.h index 4aa0e7f77..a5d11b85a 100644 --- a/base/process.h +++ b/base/process.h @@ -31,13 +31,16 @@ public: static const int MaxTasksPerThread = 128; - Process(const string& command); + Process(const string& command, const CompletionCallback& completionCallback); + + time_t GetExecutionStart(void) const; + time_t GetExecutionEnd(void) const; long GetExitStatus(void) const; string GetOutput(void) const; private: - static bool m_ThreadsCreated; + static bool m_ThreadCreated; string m_Command; @@ -48,6 +51,8 @@ private: void *m_PCloseArg; #endif /* _MSC_VER */ + time_t m_ExecutionStart; + time_t m_ExecutionEnd; long m_ExitStatus; string m_Output; diff --git a/cib/checktask.cpp b/cib/checktask.cpp index 91afd23be..e75f3799d 100644 --- a/cib/checktask.cpp +++ b/cib/checktask.cpp @@ -23,8 +23,8 @@ using namespace icinga; map CheckTask::m_Types; -CheckTask::CheckTask(const Service& service) - : m_Service(service) +CheckTask::CheckTask(const Service& service, const CompletionCallback& completionCallback) + : AsyncTask(completionCallback), m_Service(service) { } Service& CheckTask::GetService(void) @@ -45,7 +45,7 @@ void CheckTask::RegisterType(string type, Factory factory) m_Types[type] = ctt; } -CheckTask::Ptr CheckTask::CreateTask(const Service& service) +CheckTask::Ptr CheckTask::CreateTask(const Service& service, const CompletionCallback& completionCallback) { map::iterator it; @@ -54,5 +54,5 @@ CheckTask::Ptr CheckTask::CreateTask(const Service& service) if (it == m_Types.end()) throw runtime_error("Invalid check type specified for service '" + service.GetName() + "'"); - return it->second.Factory(service); + return it->second.Factory(service, completionCallback); } diff --git a/cib/checktask.h b/cib/checktask.h index 321300158..db20668c7 100644 --- a/cib/checktask.h +++ b/cib/checktask.h @@ -31,18 +31,18 @@ public: typedef shared_ptr Ptr; typedef weak_ptr WeakPtr; - typedef function Factory; + typedef function Factory; Service& GetService(void); CheckResult& GetResult(void); static void RegisterType(string type, Factory factory); - static CheckTask::Ptr CreateTask(const Service& service); + static CheckTask::Ptr CreateTask(const Service& service, const CompletionCallback& completionCallback); static int GetTaskHistogramSlots(void); protected: - CheckTask(const Service& service); + CheckTask(const Service& service, const CompletionCallback& completionCallback); virtual void Run(void) = 0; diff --git a/cib/configobjectadapter.cpp b/cib/configobjectadapter.cpp index fc7279c65..4185552b8 100644 --- a/cib/configobjectadapter.cpp +++ b/cib/configobjectadapter.cpp @@ -40,3 +40,8 @@ ConfigObject::Ptr ConfigObjectAdapter::GetConfigObject() const { return m_ConfigObject; } + +void ConfigObjectAdapter::RemoveTag(const string& key) +{ + m_ConfigObject->RemoveTag(key); +} diff --git a/cib/configobjectadapter.h b/cib/configobjectadapter.h index 4677b3f85..6036a4464 100644 --- a/cib/configobjectadapter.h +++ b/cib/configobjectadapter.h @@ -55,6 +55,8 @@ public: return GetConfigObject()->GetTag(key, value); } + void RemoveTag(const string& key); + private: ConfigObject::Ptr m_ConfigObject; }; diff --git a/cib/nagioschecktask.cpp b/cib/nagioschecktask.cpp index 5b6dc23fc..f2077cacf 100644 --- a/cib/nagioschecktask.cpp +++ b/cib/nagioschecktask.cpp @@ -21,8 +21,8 @@ using namespace icinga; -NagiosCheckTask::NagiosCheckTask(const Service& service) - : CheckTask(service) +NagiosCheckTask::NagiosCheckTask(const Service& service, const CompletionCallback& completionCallback) + : CheckTask(service, completionCallback) { string checkCommand = service.GetCheckCommand(); @@ -30,8 +30,7 @@ NagiosCheckTask::NagiosCheckTask(const Service& service) macroDicts.push_back(service.GetMacros()); macroDicts.push_back(service.GetHost().GetMacros()); macroDicts.push_back(IcingaApplication::GetInstance()->GetMacros()); - string command = MacroProcessor::ResolveMacros(checkCommand, macroDicts); - m_Process = boost::make_shared(command); + m_Command = MacroProcessor::ResolveMacros(checkCommand, macroDicts); } void NagiosCheckTask::Run(void) @@ -40,22 +39,22 @@ void NagiosCheckTask::Run(void) time(&now); GetResult().SetScheduleStart(now); - m_Process->OnTaskCompleted.connect(boost::bind(&NagiosCheckTask::ProcessFinishedHandler, static_cast(GetSelf()))); + m_Process = boost::make_shared(m_Command, boost::bind(&NagiosCheckTask::ProcessFinishedHandler, static_cast(GetSelf()))); m_Process->Start(); } void NagiosCheckTask::ProcessFinishedHandler(void) { - time_t now; - time(&now); - GetResult().SetExecutionEnd(now); + GetResult().SetExecutionStart(m_Process->GetExecutionStart()); + GetResult().SetExecutionEnd(m_Process->GetExecutionEnd()); string output = m_Process->GetOutput(); + long exitcode = m_Process->GetExitStatus(); + m_Process.reset(); + boost::algorithm::trim(output); ProcessCheckOutput(output); - long exitcode = m_Process->GetExitStatus(); - ServiceState state; switch (exitcode) { @@ -75,6 +74,7 @@ void NagiosCheckTask::ProcessFinishedHandler(void) GetResult().SetState(state); + time_t now; time(&now); GetResult().SetScheduleEnd(now); @@ -114,9 +114,9 @@ void NagiosCheckTask::ProcessCheckOutput(const string& output) GetResult().SetPerformanceDataRaw(perfdata); } -CheckTask::Ptr NagiosCheckTask::CreateTask(const Service& service) +CheckTask::Ptr NagiosCheckTask::CreateTask(const Service& service, const CompletionCallback& completionCallback) { - return boost::make_shared(service); + return boost::make_shared(service, completionCallback); } void NagiosCheckTask::Register(void) diff --git a/cib/nagioschecktask.h b/cib/nagioschecktask.h index 0375324d0..1a3e4a581 100644 --- a/cib/nagioschecktask.h +++ b/cib/nagioschecktask.h @@ -29,13 +29,14 @@ public: typedef shared_ptr Ptr; typedef weak_ptr WeakPtr; - NagiosCheckTask(const Service& service); + NagiosCheckTask(const Service& service, const CompletionCallback& completionCallback); - static CheckTask::Ptr CreateTask(const Service& service); + static CheckTask::Ptr CreateTask(const Service& service, const CompletionCallback& completionCallback); static void Register(void); private: + string m_Command; Process::Ptr m_Process; virtual void Run(void); diff --git a/components/checker/checkercomponent.cpp b/components/checker/checkercomponent.cpp index 89ae89d35..6c7d5416e 100644 --- a/components/checker/checkercomponent.cpp +++ b/components/checker/checkercomponent.cpp @@ -37,7 +37,7 @@ void CheckerComponent::Start(void) EndpointManager::GetInstance()->RegisterEndpoint(m_Endpoint); m_CheckTimer = boost::make_shared(); - m_CheckTimer->SetInterval(5); + m_CheckTimer->SetInterval(1); m_CheckTimer->OnTimerExpired.connect(boost::bind(&CheckerComponent::CheckTimerHandler, this)); m_CheckTimer->Start(); @@ -78,10 +78,11 @@ void CheckerComponent::CheckTimerHandler(void) m_PendingServices.insert(service.GetConfigObject()); - CheckTask::Ptr task = CheckTask::CreateTask(service); - task->OnTaskCompleted.connect(boost::bind(&CheckerComponent::CheckCompletedHandler, this, _1)); + CheckTask::Ptr task = CheckTask::CreateTask(service, boost::bind(&CheckerComponent::CheckCompletedHandler, this, _1)); task->Start(); + service.SetTag("current_task", task); + tasks++; } @@ -96,8 +97,10 @@ void CheckerComponent::CheckCompletedHandler(const CheckTask::Ptr& task) { Service service = task->GetService(); + service.RemoveTag("current_task"); + /* if the service isn't in the set of pending services - * it was removed and we need to ignore this check result. */ + * it was removed and we need to ignore this check result. */ if (m_PendingServices.find(service.GetConfigObject()) == m_PendingServices.end()) return;