Bugfixes for the Process/AsyncTask classes.

This commit is contained in:
Gunnar Beutner 2012-07-14 12:44:37 +02:00
parent b176963c93
commit eb2e4ac205
12 changed files with 94 additions and 47 deletions

View File

@ -30,8 +30,10 @@ public:
typedef shared_ptr<AsyncTask<T> > Ptr;
typedef weak_ptr<AsyncTask<T> > WeakPtr;
AsyncTask(void)
: m_Finished(false)
typedef function<void (const shared_ptr<T>&)> CompletionCallback;
AsyncTask(const CompletionCallback& completionCallback)
: m_Finished(false), m_CompletionCallback(completionCallback)
{ }
~AsyncTask(void)
@ -46,17 +48,24 @@ public:
Run();
}
boost::signal<void (const shared_ptr<T>&)> OnTaskCompleted;
protected:
virtual void Run(void) = 0;
void Finish(void)
{
Event::Post(boost::bind(boost::cref(OnTaskCompleted), static_cast<shared_ptr<T> >(GetSelf())));
Event::Post(boost::bind(&T::ForwardCallback, static_cast<shared_ptr<T> >(GetSelf())));
}
private:
void ForwardCallback(void)
{
m_CompletionCallback(GetSelf());
m_CompletionCallback = CompletionCallback();
m_Finished = true;
}
bool m_Finished;
CompletionCallback m_CompletionCallback;
};
}

View File

@ -191,3 +191,8 @@ ConfigObject::TMap::Range ConfigObject::GetObjects(string type)
{
return GetObjectsByType()->GetRange(type);
}
void ConfigObject::RemoveTag(const string& key)
{
GetTags()->Remove(key);
}

View File

@ -63,6 +63,8 @@ public:
return GetTags()->Get(key, value);
}
void RemoveTag(const string& key);
string GetType(void) const;
string GetName(void) const;

View File

@ -25,26 +25,19 @@
using namespace icinga;
bool Process::m_ThreadsCreated = false;
bool Process::m_ThreadCreated = false;
boost::mutex Process::m_Mutex;
deque<Process::Ptr> Process::m_Tasks;
condition_variable Process::m_TasksCV;
Process::Process(const string& command)
: m_Command(command)
Process::Process(const string& command, const CompletionCallback& completionCallback)
: AsyncTask<Process>(completionCallback), m_Command(command), m_UsePopen(false)
{
if (!m_ThreadsCreated) {
int numThreads = boost::thread::hardware_concurrency();
if (!m_ThreadCreated) {
thread t(&Process::WorkerThreadProc);
t.detach();
if (numThreads < 4)
numThreads = 4;
for (int i = 0; i < numThreads; i++) {
thread t(&Process::WorkerThreadProc);
t.detach();
}
m_ThreadsCreated = true;
m_ThreadCreated = true;
}
}
@ -52,7 +45,7 @@ void Process::Run(void)
{
mutex::scoped_lock lock(m_Mutex);
m_Tasks.push_back(GetSelf());
m_TasksCV.notify_one();
m_TasksCV.notify_all();
}
void Process::WorkerThreadProc(void)
@ -116,6 +109,9 @@ void Process::WorkerThreadProc(void)
while (!m_Tasks.empty() && tasks.size() < MaxTasksPerThread) {
Process::Ptr task = m_Tasks.front();
m_Tasks.pop_front();
lock.unlock();
if (!task->InitTask()) {
task->Finish();
} else {
@ -123,12 +119,16 @@ void Process::WorkerThreadProc(void)
if (fd >= 0)
tasks[fd] = task;
}
lock.lock();
}
}
}
bool Process::InitTask(void)
{
time(&m_ExecutionStart);
#ifdef _MSC_VER
m_FP = _popen(m_Command.c_str(), "r");
#else /* _MSC_VER */
@ -147,6 +147,8 @@ bool Process::InitTask(void)
#endif /* _MSC_VER */
if (m_FP == NULL) {
m_ExitStatus = 128;
m_ExecutionEnd = m_ExecutionStart;
return false;
}
@ -178,6 +180,8 @@ bool Process::RunTask(void)
}
#endif /* _MSC_VER */
time(&m_ExecutionEnd);
#ifndef _MSC_VER
if (WIFEXITED(status)) {
exitcode = WEXITSTATUS(status);
@ -220,3 +224,14 @@ string Process::GetOutput(void) const
{
return m_Output;
}
time_t Process::GetExecutionStart(void) const
{
return m_ExecutionStart;
}
time_t Process::GetExecutionEnd(void) const
{
return m_ExecutionEnd;
}

View File

@ -31,13 +31,16 @@ public:
static const int MaxTasksPerThread = 128;
Process(const string& command);
Process(const string& command, const CompletionCallback& completionCallback);
time_t GetExecutionStart(void) const;
time_t GetExecutionEnd(void) const;
long GetExitStatus(void) const;
string GetOutput(void) const;
private:
static bool m_ThreadsCreated;
static bool m_ThreadCreated;
string m_Command;
@ -48,6 +51,8 @@ private:
void *m_PCloseArg;
#endif /* _MSC_VER */
time_t m_ExecutionStart;
time_t m_ExecutionEnd;
long m_ExitStatus;
string m_Output;

View File

@ -23,8 +23,8 @@ using namespace icinga;
map<string, CheckTaskType> CheckTask::m_Types;
CheckTask::CheckTask(const Service& service)
: m_Service(service)
CheckTask::CheckTask(const Service& service, const CompletionCallback& completionCallback)
: AsyncTask<CheckTask>(completionCallback), m_Service(service)
{ }
Service& CheckTask::GetService(void)
@ -45,7 +45,7 @@ void CheckTask::RegisterType(string type, Factory factory)
m_Types[type] = ctt;
}
CheckTask::Ptr CheckTask::CreateTask(const Service& service)
CheckTask::Ptr CheckTask::CreateTask(const Service& service, const CompletionCallback& completionCallback)
{
map<string, CheckTaskType>::iterator it;
@ -54,5 +54,5 @@ CheckTask::Ptr CheckTask::CreateTask(const Service& service)
if (it == m_Types.end())
throw runtime_error("Invalid check type specified for service '" + service.GetName() + "'");
return it->second.Factory(service);
return it->second.Factory(service, completionCallback);
}

View File

@ -31,18 +31,18 @@ public:
typedef shared_ptr<CheckTask> Ptr;
typedef weak_ptr<CheckTask> WeakPtr;
typedef function<CheckTask::Ptr(const Service&)> Factory;
typedef function<CheckTask::Ptr(const Service&, const CompletionCallback&)> Factory;
Service& GetService(void);
CheckResult& GetResult(void);
static void RegisterType(string type, Factory factory);
static CheckTask::Ptr CreateTask(const Service& service);
static CheckTask::Ptr CreateTask(const Service& service, const CompletionCallback& completionCallback);
static int GetTaskHistogramSlots(void);
protected:
CheckTask(const Service& service);
CheckTask(const Service& service, const CompletionCallback& completionCallback);
virtual void Run(void) = 0;

View File

@ -40,3 +40,8 @@ ConfigObject::Ptr ConfigObjectAdapter::GetConfigObject() const
{
return m_ConfigObject;
}
void ConfigObjectAdapter::RemoveTag(const string& key)
{
m_ConfigObject->RemoveTag(key);
}

View File

@ -55,6 +55,8 @@ public:
return GetConfigObject()->GetTag(key, value);
}
void RemoveTag(const string& key);
private:
ConfigObject::Ptr m_ConfigObject;
};

View File

@ -21,8 +21,8 @@
using namespace icinga;
NagiosCheckTask::NagiosCheckTask(const Service& service)
: CheckTask(service)
NagiosCheckTask::NagiosCheckTask(const Service& service, const CompletionCallback& completionCallback)
: CheckTask(service, completionCallback)
{
string checkCommand = service.GetCheckCommand();
@ -30,8 +30,7 @@ NagiosCheckTask::NagiosCheckTask(const Service& service)
macroDicts.push_back(service.GetMacros());
macroDicts.push_back(service.GetHost().GetMacros());
macroDicts.push_back(IcingaApplication::GetInstance()->GetMacros());
string command = MacroProcessor::ResolveMacros(checkCommand, macroDicts);
m_Process = boost::make_shared<Process>(command);
m_Command = MacroProcessor::ResolveMacros(checkCommand, macroDicts);
}
void NagiosCheckTask::Run(void)
@ -40,22 +39,22 @@ void NagiosCheckTask::Run(void)
time(&now);
GetResult().SetScheduleStart(now);
m_Process->OnTaskCompleted.connect(boost::bind(&NagiosCheckTask::ProcessFinishedHandler, static_cast<NagiosCheckTask::Ptr>(GetSelf())));
m_Process = boost::make_shared<Process>(m_Command, boost::bind(&NagiosCheckTask::ProcessFinishedHandler, static_cast<NagiosCheckTask::Ptr>(GetSelf())));
m_Process->Start();
}
void NagiosCheckTask::ProcessFinishedHandler(void)
{
time_t now;
time(&now);
GetResult().SetExecutionEnd(now);
GetResult().SetExecutionStart(m_Process->GetExecutionStart());
GetResult().SetExecutionEnd(m_Process->GetExecutionEnd());
string output = m_Process->GetOutput();
long exitcode = m_Process->GetExitStatus();
m_Process.reset();
boost::algorithm::trim(output);
ProcessCheckOutput(output);
long exitcode = m_Process->GetExitStatus();
ServiceState state;
switch (exitcode) {
@ -75,6 +74,7 @@ void NagiosCheckTask::ProcessFinishedHandler(void)
GetResult().SetState(state);
time_t now;
time(&now);
GetResult().SetScheduleEnd(now);
@ -114,9 +114,9 @@ void NagiosCheckTask::ProcessCheckOutput(const string& output)
GetResult().SetPerformanceDataRaw(perfdata);
}
CheckTask::Ptr NagiosCheckTask::CreateTask(const Service& service)
CheckTask::Ptr NagiosCheckTask::CreateTask(const Service& service, const CompletionCallback& completionCallback)
{
return boost::make_shared<NagiosCheckTask>(service);
return boost::make_shared<NagiosCheckTask>(service, completionCallback);
}
void NagiosCheckTask::Register(void)

View File

@ -29,13 +29,14 @@ public:
typedef shared_ptr<NagiosCheckTask> Ptr;
typedef weak_ptr<NagiosCheckTask> WeakPtr;
NagiosCheckTask(const Service& service);
NagiosCheckTask(const Service& service, const CompletionCallback& completionCallback);
static CheckTask::Ptr CreateTask(const Service& service);
static CheckTask::Ptr CreateTask(const Service& service, const CompletionCallback& completionCallback);
static void Register(void);
private:
string m_Command;
Process::Ptr m_Process;
virtual void Run(void);

View File

@ -37,7 +37,7 @@ void CheckerComponent::Start(void)
EndpointManager::GetInstance()->RegisterEndpoint(m_Endpoint);
m_CheckTimer = boost::make_shared<Timer>();
m_CheckTimer->SetInterval(5);
m_CheckTimer->SetInterval(1);
m_CheckTimer->OnTimerExpired.connect(boost::bind(&CheckerComponent::CheckTimerHandler, this));
m_CheckTimer->Start();
@ -78,10 +78,11 @@ void CheckerComponent::CheckTimerHandler(void)
m_PendingServices.insert(service.GetConfigObject());
CheckTask::Ptr task = CheckTask::CreateTask(service);
task->OnTaskCompleted.connect(boost::bind(&CheckerComponent::CheckCompletedHandler, this, _1));
CheckTask::Ptr task = CheckTask::CreateTask(service, boost::bind(&CheckerComponent::CheckCompletedHandler, this, _1));
task->Start();
service.SetTag("current_task", task);
tasks++;
}
@ -96,8 +97,10 @@ void CheckerComponent::CheckCompletedHandler(const CheckTask::Ptr& task)
{
Service service = task->GetService();
service.RemoveTag("current_task");
/* if the service isn't in the set of pending services
* it was removed and we need to ignore this check result. */
* it was removed and we need to ignore this check result. */
if (m_PendingServices.find(service.GetConfigObject()) == m_PendingServices.end())
return;