diff --git a/components/cluster/clustercomponent.cpp b/components/cluster/clustercomponent.cpp index 2bd7d298d..790e9d340 100644 --- a/components/cluster/clustercomponent.cpp +++ b/components/cluster/clustercomponent.cpp @@ -286,6 +286,11 @@ void ClusterComponent::MessageHandler(const Endpoint::Ptr& endpoint, const Dicti return; service->ProcessCheckResult(cr); + + /* Reschedule the next check. The side effect of this is that for as long + * as we receive results for a service we won't execute any + * active checks. */ + service->SetNextCheck(Utility::GetTime() + service->GetCheckInterval()); } } diff --git a/lib/base/threadpool.cpp b/lib/base/threadpool.cpp index 45455dafa..4ef83f9ea 100644 --- a/lib/base/threadpool.cpp +++ b/lib/base/threadpool.cpp @@ -30,7 +30,7 @@ using namespace icinga; ThreadPool::ThreadPool(void) - : m_ThreadDeaths(0), m_WaitTime(0), m_ServiceTime(0), + : m_WaitTime(0), m_ServiceTime(0), m_TaskCount(0), m_Stopped(false) { for (int i = 0; i < 2; i++) @@ -106,13 +106,11 @@ void ThreadPool::QueueThreadProc(int tid) UpdateThreadUtilization(tid, ThreadIdle); - while (m_WorkItems.empty() && !m_Stopped && m_ThreadDeaths == 0) + while (m_WorkItems.empty() && !m_Stopped && !m_ThreadStats[tid].Zombie) m_WorkCV.wait(lock); - if (m_ThreadDeaths > 0) { - m_ThreadDeaths--; + if (m_ThreadStats[tid].Zombie) break; - } if (m_WorkItems.empty() && m_Stopped) break; @@ -191,6 +189,7 @@ void ThreadPool::QueueThreadProc(int tid) } UpdateThreadUtilization(tid, ThreadDead); + m_ThreadStats[tid].Zombie = false; } /** @@ -240,7 +239,7 @@ void ThreadPool::ManagerThreadProc(void) alive = 0; for (size_t i = 0; i < sizeof(m_ThreadStats) / sizeof(m_ThreadStats[0]); i++) { - if (m_ThreadStats[i].State != ThreadDead) { + if (m_ThreadStats[i].State != ThreadDead && !m_ThreadStats[i].Zombie) { alive++; utilization += m_ThreadStats[i].Utilization * 100; } @@ -254,7 +253,12 @@ void ThreadPool::ManagerThreadProc(void) avg_latency = 0; if (utilization < 60 || utilization > 80 || alive < 2) { - int tthreads = ceil((utilization * alive) / 80.0) - alive; + double wthreads = ceil((utilization * alive) / 80.0); + + if (!finite(wthreads)) + wthreads = 0; + + int tthreads = wthreads - alive; /* Don't ever kill the last 2 threads. */ if (alive + tthreads < 2) @@ -264,6 +268,10 @@ void ThreadPool::ManagerThreadProc(void) if (tthreads > 0 && pending > 0) tthreads = 8; + std::ostringstream msgbuf; + msgbuf << "Thread pool; current: " << alive << "; adjustment: " << tthreads; + Log(LogDebug, "base", msgbuf.str()); + for (int i = 0; i < -tthreads; i++) KillWorker(); @@ -312,9 +320,16 @@ void ThreadPool::SpawnWorker(void) */ void ThreadPool::KillWorker(void) { - Log(LogDebug, "base", "Killing worker thread."); + for (size_t i = 0; i < sizeof(m_ThreadStats) / sizeof(m_ThreadStats[0]); i++) { + if (m_ThreadStats[i].State == ThreadIdle && !m_ThreadStats[i].Zombie) { + Log(LogDebug, "base", "Killing worker thread."); - m_ThreadDeaths++; + m_ThreadStats[i].Zombie = true; + m_WorkCV.notify_all(); + + break; + } + } } void ThreadPool::StatsThreadProc(void) diff --git a/lib/base/threadpool.h b/lib/base/threadpool.h index b6eb76f0f..8caf83c37 100644 --- a/lib/base/threadpool.h +++ b/lib/base/threadpool.h @@ -60,16 +60,16 @@ private: struct ThreadStats { ThreadState State; + bool Zombie; double Utilization; double LastUpdate; ThreadStats(ThreadState state = ThreadDead) - : State(state), Utilization(0), LastUpdate(0) + : State(state), Zombie(false), Utilization(0), LastUpdate(0) { } }; ThreadStats m_ThreadStats[512]; - int m_ThreadDeaths; boost::thread m_ManagerThread; boost::thread m_StatsThread; diff --git a/lib/base/utility.h b/lib/base/utility.h index 7288f61fd..b3f7d8e0b 100644 --- a/lib/base/utility.h +++ b/lib/base/utility.h @@ -93,7 +93,7 @@ private: # include # define ASSERT(expr) assert(expr) #else /* _DEBUG */ -# define ASSERT(expr) +# define ASSERT(expr) __builtin_unreachable() #endif /* _DEBUG */ #endif /* UTILITY_H */