Get rid off threadpool for nagios checks.

This commit is contained in:
Gunnar Beutner 2012-06-24 16:30:16 +02:00
parent 34d26d424f
commit 833ede8b3a
5 changed files with 138 additions and 56 deletions

View File

@ -41,7 +41,7 @@ void CheckerComponent::Start(void)
m_CheckTimer->OnTimerExpired.connect(boost::bind(&CheckerComponent::CheckTimerHandler, this)); m_CheckTimer->OnTimerExpired.connect(boost::bind(&CheckerComponent::CheckTimerHandler, this));
m_CheckTimer->Start(); m_CheckTimer->Start();
CheckTask::RegisterType("nagios", NagiosCheckTask::CreateTask, NagiosCheckTask::FlushQueue, NagiosCheckTask::GetFinishedTasks); NagiosCheckTask::Register();
m_ResultTimer = boost::make_shared<Timer>(); m_ResultTimer = boost::make_shared<Timer>();
m_ResultTimer->SetInterval(5); m_ResultTimer->SetInterval(5);

View File

@ -3,6 +3,8 @@
using namespace icinga; using namespace icinga;
map<string, CheckTaskType> CheckTask::m_Types; map<string, CheckTaskType> CheckTask::m_Types;
vector<CheckTask::Ptr> CheckTask::m_FinishedTasks;
mutex CheckTask::m_FinishedTasksMutex;
CheckTask::CheckTask(const Service& service) CheckTask::CheckTask(const Service& service)
: m_Service(service) : m_Service(service)
@ -13,12 +15,11 @@ Service CheckTask::GetService(void) const
return m_Service; return m_Service;
} }
void CheckTask::RegisterType(string type, Factory factory, QueueFlusher qflusher, FinishedTasksGetter qtasksgetter) void CheckTask::RegisterType(string type, Factory factory, QueueFlusher qflusher)
{ {
CheckTaskType ctt; CheckTaskType ctt;
ctt.Factory = factory; ctt.Factory = factory;
ctt.QueueFlusher = qflusher; ctt.QueueFlusher = qflusher;
ctt.FinishedTasksGetter = qtasksgetter;
m_Types[type] = ctt; m_Types[type] = ctt;
} }
@ -49,12 +50,16 @@ void CheckTask::FlushQueue(void)
vector<CheckTask::Ptr> CheckTask::GetFinishedTasks(void) vector<CheckTask::Ptr> CheckTask::GetFinishedTasks(void)
{ {
vector<CheckTask::Ptr> tasks; mutex::scoped_lock lock(m_FinishedTasksMutex);
map<string, CheckTaskType>::iterator it; vector<CheckTask::Ptr> result = m_FinishedTasks;
for (it = m_Types.begin(); it != m_Types.end(); it++) m_FinishedTasks.clear();
it->second.FinishedTasksGetter(tasks);
return tasks; return result;
} }
void CheckTask::FinishTask(const CheckTask::Ptr& task)
{
mutex::scoped_lock lock(m_FinishedTasksMutex);
m_FinishedTasks.push_back(task);
}

View File

@ -34,17 +34,18 @@ public:
typedef function<CheckTask::Ptr(const Service&)> Factory; typedef function<CheckTask::Ptr(const Service&)> Factory;
typedef function<void()> QueueFlusher; typedef function<void()> QueueFlusher;
typedef function<void (vector<CheckTask::Ptr>& tasks)> FinishedTasksGetter;
Service GetService(void) const; Service GetService(void) const;
virtual void Enqueue(void) = 0; virtual void Enqueue(void) = 0;
virtual CheckResult GetResult(void) = 0; virtual CheckResult GetResult(void) = 0;
static void RegisterType(string type, Factory factory, QueueFlusher qflusher, FinishedTasksGetter qtasksgetter); static void RegisterType(string type, Factory factory, QueueFlusher qflusher);
static CheckTask::Ptr CreateTask(const Service& service); static CheckTask::Ptr CreateTask(const Service& service);
static void Enqueue(const CheckTask::Ptr& task); static void Enqueue(const CheckTask::Ptr& task);
static void FlushQueue(void); static void FlushQueue(void);
static void FinishTask(const CheckTask::Ptr& task);
static vector<CheckTask::Ptr> GetFinishedTasks(void); static vector<CheckTask::Ptr> GetFinishedTasks(void);
protected: protected:
@ -54,13 +55,15 @@ private:
Service m_Service; Service m_Service;
static map<string, CheckTaskType> m_Types; static map<string, CheckTaskType> m_Types;
static vector<CheckTask::Ptr> m_FinishedTasks;
static mutex m_FinishedTasksMutex;
}; };
struct CheckTaskType struct CheckTaskType
{ {
CheckTask::Factory Factory; CheckTask::Factory Factory;
CheckTask::QueueFlusher QueueFlusher; CheckTask::QueueFlusher QueueFlusher;
CheckTask::FinishedTasksGetter FinishedTasksGetter;
}; };
} }

View File

@ -5,13 +5,12 @@
using namespace icinga; using namespace icinga;
list<ThreadPoolTask::Ptr> NagiosCheckTask::m_QueuedTasks; boost::mutex NagiosCheckTask::m_Mutex;
deque<NagiosCheckTask::Ptr> NagiosCheckTask::m_Tasks;
boost::mutex NagiosCheckTask::m_FinishedTasksMutex; condition_variable NagiosCheckTask::m_TasksCV;
vector<CheckTask::Ptr> NagiosCheckTask::m_FinishedTasks;
NagiosCheckTask::NagiosCheckTask(const Service& service) NagiosCheckTask::NagiosCheckTask(const Service& service)
: CheckTask(service) : CheckTask(service), m_FP(NULL)
{ {
string checkCommand = service.GetCheckCommand(); string checkCommand = service.GetCheckCommand();
m_Command = MacroProcessor::ResolveMacros(checkCommand, service.GetMacros()); // + " 2>&1"; m_Command = MacroProcessor::ResolveMacros(checkCommand, service.GetMacros()); // + " 2>&1";
@ -20,21 +19,16 @@ NagiosCheckTask::NagiosCheckTask(const Service& service)
void NagiosCheckTask::Enqueue(void) void NagiosCheckTask::Enqueue(void)
{ {
time(&m_Result.StartTime); time(&m_Result.StartTime);
m_QueuedTasks.push_back(static_pointer_cast<ThreadPoolTask>(static_cast<NagiosCheckTask::Ptr>(GetSelf())));
// m_QueuedTasks.push_back(new ThreadPool:Task(bind(&NagiosCheckTask::Execute, static_cast<NagiosCheckTask::Ptr>(GetSelf())))); {
mutex::scoped_lock lock(m_Mutex);
m_Tasks.push_back(GetSelf());
}
} }
void NagiosCheckTask::FlushQueue(void) void NagiosCheckTask::FlushQueue(void)
{ {
ThreadPool::GetDefaultPool()->EnqueueTasks(m_QueuedTasks); m_TasksCV.notify_all();
m_QueuedTasks.clear();
}
void NagiosCheckTask::GetFinishedTasks(vector<CheckTask::Ptr>& tasks)
{
mutex::scoped_lock lock(m_FinishedTasksMutex);
std::copy(m_FinishedTasks.begin(), m_FinishedTasks.end(), back_inserter(tasks));
m_FinishedTasks.clear();
} }
CheckResult NagiosCheckTask::GetResult(void) CheckResult NagiosCheckTask::GetResult(void)
@ -42,56 +36,113 @@ CheckResult NagiosCheckTask::GetResult(void)
return m_Result; return m_Result;
} }
void NagiosCheckTask::Execute(void) void NagiosCheckTask::CheckThreadProc(void)
{ {
RunCheck(); mutex::scoped_lock lock(m_Mutex);
{ map<int, NagiosCheckTask::Ptr> tasks;
mutex::scoped_lock lock(m_FinishedTasksMutex); const int maxTasks = 16;
m_FinishedTasks.push_back(GetSelf());
for (;;) {
while (m_Tasks.empty() || tasks.size() >= maxTasks) {
lock.unlock();
map<int, NagiosCheckTask::Ptr>::iterator it, prev;
#ifndef _MSC_VER
fd_set readfds;
int nfds = 0;
FD_ZERO(&readfds);
for (it = tasks.begin(); it != tasks.end(); it++) {
if (it->first > nfds)
nfds = it->first;
FD_SET(it->first, &readfds);
}
timeval tv;
tv.tv_sec = 1;
tv.tv_usec = 0;
select(nfds + 1, &readfds, NULL, NULL, &tv);
#else /* _MSC_VER */
Sleep(1000);
#endif /* _MSC_VER */
for (it = tasks.begin(); it != tasks.end(); ) {
#ifndef _MSC_VER
if (!FD_ISSET(it->first, &readfds)) {
it++;
continue;
}
#endif /* _MSC_VER */
if (!it->second->RunTask()) {
CheckTask::FinishTask(it->second);
it = tasks.erase(it);
} else {
it++;
}
}
lock.lock();
}
while (!m_Tasks.empty() && tasks.size() < maxTasks) {
NagiosCheckTask::Ptr task = m_Tasks.front();
m_Tasks.pop_front();
if (!task->InitTask()) {
CheckTask::FinishTask(task);
} else {
int fd = task->GetFD();
if (fd >= 0)
tasks[fd] = task;
}
}
} }
} }
void NagiosCheckTask::RunCheck(void) bool NagiosCheckTask::InitTask(void)
{ {
FILE *fp;
#ifdef _MSC_VER #ifdef _MSC_VER
fp = _popen(m_Command.c_str(), "r"); m_FP = _popen(m_Command.c_str(), "r");
#else /* _MSC_VER */ #else /* _MSC_VER */
bool use_libc_popen = false; bool use_libc_popen = false;
popen_noshell_pass_to_pclose pclose_arg; popen_noshell_pass_to_pclose pclose_arg;
if (!use_libc_popen) { if (!use_libc_popen) {
fp = popen_noshell_compat(m_Command.c_str(), "r", &pclose_arg); m_FP = popen_noshell_compat(m_Command.c_str(), "r", &pclose_arg);
if (fp == NULL) // TODO: add check for valgrind if (m_FP == NULL) // TODO: add check for valgrind
use_libc_popen = true; use_libc_popen = true;
} }
if (use_libc_popen) if (use_libc_popen)
fp = popen(m_Command.c_str(), "r"); m_FP = popen(m_Command.c_str(), "r");
#endif /* _MSC_VER */ #endif /* _MSC_VER */
stringstream outputbuf; return (m_FP != NULL);
}
while (!feof(fp)) { bool NagiosCheckTask::RunTask(void)
{
char buffer[512]; char buffer[512];
size_t read = fread(buffer, 1, sizeof(buffer), fp); size_t read = fread(buffer, 1, sizeof(buffer), m_FP);
if (read == 0) if (read > 0)
break; m_OutputStream.write(buffer, read);
outputbuf.write(buffer, read); if (!feof(m_FP))
} return true;
m_Result.Output = outputbuf.str(); m_Result.Output = m_OutputStream.str();
boost::algorithm::trim(m_Result.Output); boost::algorithm::trim(m_Result.Output);
int status, exitcode; int status, exitcode;
#ifdef _MSC_VER #ifdef _MSC_VER
status = _pclose(fp); status = _pclose(m_FP);
#else /* _MSC_VER */ #else /* _MSC_VER */
if (use_libc_popen) if (use_libc_popen)
status = pclose(fp); status = pclose(fp);
@ -128,9 +179,26 @@ void NagiosCheckTask::RunCheck(void)
#endif /* _MSC_VER */ #endif /* _MSC_VER */
time(&m_Result.EndTime); time(&m_Result.EndTime);
return false;
}
int NagiosCheckTask::GetFD(void) const
{
return fileno(m_FP);
} }
CheckTask::Ptr NagiosCheckTask::CreateTask(const Service& service) CheckTask::Ptr NagiosCheckTask::CreateTask(const Service& service)
{ {
return boost::make_shared<NagiosCheckTask>(service); return boost::make_shared<NagiosCheckTask>(service);
} }
void NagiosCheckTask::Register(void)
{
CheckTask::RegisterType("nagios", NagiosCheckTask::CreateTask, NagiosCheckTask::FlushQueue);
for (int i = 0; i < 1; i++) {
thread t(&NagiosCheckTask::CheckThreadProc);
t.detach();
}
}

View File

@ -4,7 +4,7 @@
namespace icinga namespace icinga
{ {
class I2_ICINGA_API NagiosCheckTask : public CheckTask, public ThreadPoolTask class I2_ICINGA_API NagiosCheckTask : public CheckTask
{ {
public: public:
typedef shared_ptr<NagiosCheckTask> Ptr; typedef shared_ptr<NagiosCheckTask> Ptr;
@ -17,19 +17,25 @@ public:
static CheckTask::Ptr CreateTask(const Service& service); static CheckTask::Ptr CreateTask(const Service& service);
static void FlushQueue(void); static void FlushQueue(void);
static void GetFinishedTasks(vector<CheckTask::Ptr>& tasks);
static void Register(void);
private: private:
string m_Command; string m_Command;
CheckResult m_Result; CheckResult m_Result;
static list<ThreadPoolTask::Ptr> m_QueuedTasks; FILE *m_FP;
stringstream m_OutputStream;
static boost::mutex m_FinishedTasksMutex; static boost::mutex m_Mutex;
static vector<CheckTask::Ptr> m_FinishedTasks; static deque<NagiosCheckTask::Ptr> m_Tasks;
static condition_variable m_TasksCV;
virtual void Execute(void); static void CheckThreadProc(void);
void RunCheck(void);
bool InitTask(void);
bool RunTask(void);
int GetFD(void) const;
}; };
} }