diff --git a/lib/base/eventqueue.cpp b/lib/base/eventqueue.cpp index 08349453e..8913903ed 100644 --- a/lib/base/eventqueue.cpp +++ b/lib/base/eventqueue.cpp @@ -29,23 +29,13 @@ using namespace icinga; EventQueue::EventQueue(void) - : m_Stopped(false) + : m_Stopped(false), m_ThreadDeaths(0), m_Latency(0), m_LatencyCount(0) { - m_ThreadCount = boost::thread::hardware_concurrency(); + for (int i = 0; i < sizeof(m_ThreadStates) / sizeof(m_ThreadStates[0]); i++) + m_ThreadStates[i] = ThreadDead; - if (m_ThreadCount == 0) - m_ThreadCount = 1; - - m_ThreadCount *= 8; - - m_ThreadCount = 128; - - m_States = new ThreadState[m_ThreadCount]; - - for (int i = 0; i < m_ThreadCount; i++) { - m_States[i] = ThreadIdle; - m_Threads.create_thread(boost::bind(&EventQueue::QueueThreadProc, this, i)); - } + for (int i = 0; i < 8; i++) + SpawnWorker(); boost::thread reportThread(boost::bind(&EventQueue::ReportThreadProc, this)); reportThread.detach(); @@ -69,7 +59,13 @@ void EventQueue::Stop(void) */ void EventQueue::Join(void) { - m_Threads.join_all(); + boost::mutex::scoped_lock lock(m_Mutex); + + while (!m_Stopped || !m_Events.empty()) { + lock.unlock(); + Utility::Sleep(0.5); + lock.lock(); + } } /** @@ -83,18 +79,26 @@ void EventQueue::QueueThreadProc(int tid) { boost::mutex::scoped_lock lock(m_Mutex); - m_States[tid] = ThreadIdle; + m_ThreadStates[tid] = ThreadIdle; - while (m_Events.empty() && !m_Stopped) + while (m_Events.empty() && !m_Stopped && m_ThreadDeaths == 0) m_CV.wait(lock); + if (m_ThreadDeaths > 0) { + m_ThreadDeaths--; + break; + } + if (m_Events.empty() && m_Stopped) break; event = m_Events.front(); m_Events.pop_front(); - m_States[tid] = ThreadBusy; + m_ThreadStates[tid] = ThreadBusy; + + m_Latency += Utility::GetTime() - event.Timestamp; + m_LatencyCount++; } #ifdef _DEBUG @@ -150,6 +154,8 @@ void EventQueue::QueueThreadProc(int tid) } #endif /* _DEBUG */ } + + m_ThreadStates[tid] = ThreadDead; } /** @@ -161,6 +167,9 @@ void EventQueue::Post(const EventQueueCallback& callback) { boost::mutex::scoped_lock lock(m_Mutex); + if (m_Stopped) + BOOST_THROW_EXCEPTION(std::runtime_error("EventQueue has been stopped.")); + EventQueueWorkItem event; event.Callback = callback; event.Timestamp = Utility::GetTime(); @@ -171,43 +180,79 @@ void EventQueue::Post(const EventQueueCallback& callback) void EventQueue::ReportThreadProc(void) { + double last_adjustment = 0; + for (;;) { Utility::Sleep(5); double now = Utility::GetTime(); - int pending, busy; - double max_latency, avg_latency; + int pending, alive, busy; + double avg_latency; { boost::mutex::scoped_lock lock(m_Mutex); pending = m_Events.size(); + alive = 0; busy = 0; - for (int i = 0; i < m_ThreadCount; i++) { - if (m_States[i] == ThreadBusy) + for (int i = 0; i < sizeof(m_ThreadStates) / sizeof(m_ThreadStates[0]); i++) { + if (m_ThreadStates[i] != ThreadDead) + alive++; + + if (m_ThreadStates[i] == ThreadBusy) busy++; } - max_latency = 0; - avg_latency = 0; + if (m_LatencyCount > 0) + avg_latency = m_Latency / (m_LatencyCount * 1.0); + else + avg_latency = 0; - BOOST_FOREACH(const EventQueueWorkItem& event, m_Events) { - double latency = now - event.Timestamp; + m_Latency = 0; + m_LatencyCount = 0; - avg_latency += latency; - - if (latency > max_latency) - max_latency = latency; + if (pending > 0) { + /* Spawn a few additional workers. */ + for (int i = 0; i < 2; i++) + SpawnWorker(); + } else if (last_adjustment < now - 30) { + KillWorker(); + last_adjustment = now; } - - avg_latency /= pending; } - Log(LogInformation, "base", "Pending tasks: " + Convert::ToString(pending) + "; Busy threads: " + - Convert::ToString(busy) + "; Idle threads: " + Convert::ToString(m_ThreadCount - busy) + - "; Maximum latency: " + Convert::ToString((long)max_latency * 1000) + "ms" - "; Average latency: " + Convert::ToString((long)avg_latency * 1000) + "ms"); + std::ostringstream msgbuf; + msgbuf << "Pending tasks: " << pending << "; Busy threads: " << busy << "; Idle threads: " << alive - busy << "; Average latency: " << (long)(avg_latency * 1000) << "ms"; + Log(LogInformation, "base", msgbuf.str()); } } + +/** + * Note: Caller must hold m_Mutex + */ +void EventQueue::SpawnWorker(void) +{ + for (int i = 0; i < sizeof(m_ThreadStates) / sizeof(m_ThreadStates[0]); i++) { + if (m_ThreadStates[i] == ThreadDead) { + Log(LogInformation, "debug", "Spawning worker thread."); + + m_ThreadStates[i] = ThreadIdle; + boost::thread worker(boost::bind(&EventQueue::QueueThreadProc, this, i)); + worker.detach(); + + break; + } + } +} + +/** + * Note: Caller must hold m_Mutex. + */ +void EventQueue::KillWorker(void) +{ + Log(LogInformation, "base", "Killing worker thread."); + + m_ThreadDeaths++; +} diff --git a/lib/base/eventqueue.h b/lib/base/eventqueue.h index 2b3187948..f173bdb71 100644 --- a/lib/base/eventqueue.h +++ b/lib/base/eventqueue.h @@ -32,6 +32,7 @@ namespace icinga enum ThreadState { + ThreadDead, ThreadIdle, ThreadBusy }; @@ -61,9 +62,11 @@ public: void Post(const EventQueueCallback& callback); private: - boost::thread_group m_Threads; - ThreadState *m_States; - int m_ThreadCount; + ThreadState m_ThreadStates[512]; + int m_ThreadDeaths; + + double m_Latency; + int m_LatencyCount; boost::mutex m_Mutex; boost::condition_variable m_CV; @@ -73,6 +76,9 @@ private: void QueueThreadProc(int tid); void ReportThreadProc(void); + + void SpawnWorker(void); + void KillWorker(void); }; }