Bug fix for ThreadPool::KillWorker().

This commit is contained in:
Gunnar Beutner 2013-08-27 15:57:00 +02:00
parent f49bb3d150
commit 42cc9cc9b5
4 changed files with 32 additions and 12 deletions

View File

@ -286,6 +286,11 @@ void ClusterComponent::MessageHandler(const Endpoint::Ptr& endpoint, const Dicti
return; return;
service->ProcessCheckResult(cr); service->ProcessCheckResult(cr);
/* Reschedule the next check. The side effect of this is that for as long
* as we receive results for a service we won't execute any
* active checks. */
service->SetNextCheck(Utility::GetTime() + service->GetCheckInterval());
} }
} }

View File

@ -30,7 +30,7 @@
using namespace icinga; using namespace icinga;
ThreadPool::ThreadPool(void) ThreadPool::ThreadPool(void)
: m_ThreadDeaths(0), m_WaitTime(0), m_ServiceTime(0), : m_WaitTime(0), m_ServiceTime(0),
m_TaskCount(0), m_Stopped(false) m_TaskCount(0), m_Stopped(false)
{ {
for (int i = 0; i < 2; i++) for (int i = 0; i < 2; i++)
@ -106,13 +106,11 @@ void ThreadPool::QueueThreadProc(int tid)
UpdateThreadUtilization(tid, ThreadIdle); UpdateThreadUtilization(tid, ThreadIdle);
while (m_WorkItems.empty() && !m_Stopped && m_ThreadDeaths == 0) while (m_WorkItems.empty() && !m_Stopped && !m_ThreadStats[tid].Zombie)
m_WorkCV.wait(lock); m_WorkCV.wait(lock);
if (m_ThreadDeaths > 0) { if (m_ThreadStats[tid].Zombie)
m_ThreadDeaths--;
break; break;
}
if (m_WorkItems.empty() && m_Stopped) if (m_WorkItems.empty() && m_Stopped)
break; break;
@ -191,6 +189,7 @@ void ThreadPool::QueueThreadProc(int tid)
} }
UpdateThreadUtilization(tid, ThreadDead); UpdateThreadUtilization(tid, ThreadDead);
m_ThreadStats[tid].Zombie = false;
} }
/** /**
@ -240,7 +239,7 @@ void ThreadPool::ManagerThreadProc(void)
alive = 0; alive = 0;
for (size_t i = 0; i < sizeof(m_ThreadStats) / sizeof(m_ThreadStats[0]); i++) { for (size_t i = 0; i < sizeof(m_ThreadStats) / sizeof(m_ThreadStats[0]); i++) {
if (m_ThreadStats[i].State != ThreadDead) { if (m_ThreadStats[i].State != ThreadDead && !m_ThreadStats[i].Zombie) {
alive++; alive++;
utilization += m_ThreadStats[i].Utilization * 100; utilization += m_ThreadStats[i].Utilization * 100;
} }
@ -254,7 +253,12 @@ void ThreadPool::ManagerThreadProc(void)
avg_latency = 0; avg_latency = 0;
if (utilization < 60 || utilization > 80 || alive < 2) { if (utilization < 60 || utilization > 80 || alive < 2) {
int tthreads = ceil((utilization * alive) / 80.0) - alive; double wthreads = ceil((utilization * alive) / 80.0);
if (!finite(wthreads))
wthreads = 0;
int tthreads = wthreads - alive;
/* Don't ever kill the last 2 threads. */ /* Don't ever kill the last 2 threads. */
if (alive + tthreads < 2) if (alive + tthreads < 2)
@ -264,6 +268,10 @@ void ThreadPool::ManagerThreadProc(void)
if (tthreads > 0 && pending > 0) if (tthreads > 0 && pending > 0)
tthreads = 8; tthreads = 8;
std::ostringstream msgbuf;
msgbuf << "Thread pool; current: " << alive << "; adjustment: " << tthreads;
Log(LogDebug, "base", msgbuf.str());
for (int i = 0; i < -tthreads; i++) for (int i = 0; i < -tthreads; i++)
KillWorker(); KillWorker();
@ -312,9 +320,16 @@ void ThreadPool::SpawnWorker(void)
*/ */
void ThreadPool::KillWorker(void) void ThreadPool::KillWorker(void)
{ {
for (size_t i = 0; i < sizeof(m_ThreadStats) / sizeof(m_ThreadStats[0]); i++) {
if (m_ThreadStats[i].State == ThreadIdle && !m_ThreadStats[i].Zombie) {
Log(LogDebug, "base", "Killing worker thread."); Log(LogDebug, "base", "Killing worker thread.");
m_ThreadDeaths++; m_ThreadStats[i].Zombie = true;
m_WorkCV.notify_all();
break;
}
}
} }
void ThreadPool::StatsThreadProc(void) void ThreadPool::StatsThreadProc(void)

View File

@ -60,16 +60,16 @@ private:
struct ThreadStats struct ThreadStats
{ {
ThreadState State; ThreadState State;
bool Zombie;
double Utilization; double Utilization;
double LastUpdate; double LastUpdate;
ThreadStats(ThreadState state = ThreadDead) ThreadStats(ThreadState state = ThreadDead)
: State(state), Utilization(0), LastUpdate(0) : State(state), Zombie(false), Utilization(0), LastUpdate(0)
{ } { }
}; };
ThreadStats m_ThreadStats[512]; ThreadStats m_ThreadStats[512];
int m_ThreadDeaths;
boost::thread m_ManagerThread; boost::thread m_ManagerThread;
boost::thread m_StatsThread; boost::thread m_StatsThread;

View File

@ -93,7 +93,7 @@ private:
# include <cassert> # include <cassert>
# define ASSERT(expr) assert(expr) # define ASSERT(expr) assert(expr)
#else /* _DEBUG */ #else /* _DEBUG */
# define ASSERT(expr) # define ASSERT(expr) __builtin_unreachable()
#endif /* _DEBUG */ #endif /* _DEBUG */
#endif /* UTILITY_H */ #endif /* UTILITY_H */