Make thread pool utilization calculation more accurate.

This commit is contained in:
Gunnar Beutner 2013-03-28 12:14:39 +00:00
parent 2e051ad9e2
commit b0c8f3f626
2 changed files with 71 additions and 25 deletions

View File

@ -32,14 +32,14 @@ using namespace icinga;
ThreadPool::ThreadPool(void) ThreadPool::ThreadPool(void)
: m_Stopped(false), m_ThreadDeaths(0), m_WaitTime(0), m_ServiceTime(0), m_TaskCount(0) : m_Stopped(false), m_ThreadDeaths(0), m_WaitTime(0), m_ServiceTime(0), m_TaskCount(0)
{ {
for (int i = 0; i < sizeof(m_ThreadStates) / sizeof(m_ThreadStates[0]); i++)
m_ThreadStates[i] = ThreadDead;
for (int i = 0; i < 2; i++) for (int i = 0; i < 2; i++)
SpawnWorker(); SpawnWorker();
boost::thread managerThread(boost::bind(&ThreadPool::ManagerThreadProc, this)); boost::thread managerThread(boost::bind(&ThreadPool::ManagerThreadProc, this));
managerThread.detach(); managerThread.detach();
boost::thread statsThread(boost::bind(&ThreadPool::StatsThreadProc, this));
statsThread.detach();
} }
ThreadPool::~ThreadPool(void) ThreadPool::~ThreadPool(void)
@ -81,13 +81,10 @@ void ThreadPool::QueueThreadProc(int tid)
for (;;) { for (;;) {
WorkItem wi; WorkItem wi;
double ws = Utility::GetTime();
double st;
{ {
boost::mutex::scoped_lock lock(m_Mutex); boost::mutex::scoped_lock lock(m_Mutex);
m_ThreadStates[tid] = ThreadIdle; UpdateThreadUtilization(tid, ThreadIdle);
while (m_WorkItems.empty() && !m_Stopped && m_ThreadDeaths == 0) while (m_WorkItems.empty() && !m_Stopped && m_ThreadDeaths == 0)
m_CV.wait(lock); m_CV.wait(lock);
@ -103,11 +100,11 @@ void ThreadPool::QueueThreadProc(int tid)
wi = m_WorkItems.front(); wi = m_WorkItems.front();
m_WorkItems.pop_front(); m_WorkItems.pop_front();
m_ThreadStates[tid] = ThreadBusy; UpdateThreadUtilization(tid, ThreadBusy);
st = Utility::GetTime();
UpdateThreadUtilization(tid, st - ws, 0);
} }
double st = Utility::GetTime();;
#ifdef _DEBUG #ifdef _DEBUG
# ifdef RUSAGE_THREAD # ifdef RUSAGE_THREAD
struct rusage usage_start, usage_end; struct rusage usage_start, usage_end;
@ -140,8 +137,6 @@ void ThreadPool::QueueThreadProc(int tid)
if (latency > m_MaxLatency) if (latency > m_MaxLatency)
m_MaxLatency = latency; m_MaxLatency = latency;
UpdateThreadUtilization(tid, et - st, 1);
} }
#ifdef _DEBUG #ifdef _DEBUG
@ -175,7 +170,7 @@ void ThreadPool::QueueThreadProc(int tid)
#endif /* _DEBUG */ #endif /* _DEBUG */
} }
m_ThreadStates[tid] = ThreadDead; UpdateThreadUtilization(tid, ThreadDead);
} }
/** /**
@ -219,10 +214,10 @@ void ThreadPool::ManagerThreadProc(void)
alive = 0; alive = 0;
for (int i = 0; i < sizeof(m_ThreadStates) / sizeof(m_ThreadStates[0]); i++) { for (int i = 0; i < sizeof(m_ThreadStats) / sizeof(m_ThreadStats[0]); i++) {
if (m_ThreadStates[i] != ThreadDead) { if (m_ThreadStats[i].State != ThreadDead) {
alive++; alive++;
utilization += m_ThreadUtilization[i] * 100; utilization += m_ThreadStats[i].Utilization * 100;
} }
} }
@ -274,12 +269,11 @@ void ThreadPool::ManagerThreadProc(void)
*/ */
void ThreadPool::SpawnWorker(void) void ThreadPool::SpawnWorker(void)
{ {
for (int i = 0; i < sizeof(m_ThreadStates) / sizeof(m_ThreadStates[0]); i++) { for (int i = 0; i < sizeof(m_ThreadStats) / sizeof(m_ThreadStats[0]); i++) {
if (m_ThreadStates[i] == ThreadDead) { if (m_ThreadStats[i].State == ThreadDead) {
Log(LogDebug, "debug", "Spawning worker thread."); Log(LogDebug, "debug", "Spawning worker thread.");
m_ThreadStates[i] = ThreadIdle; m_ThreadStats[i] = ThreadStats(ThreadIdle);
m_ThreadUtilization[i] = 0;
boost::thread worker(boost::bind(&ThreadPool::QueueThreadProc, this, i)); boost::thread worker(boost::bind(&ThreadPool::QueueThreadProc, this, i));
worker.detach(); worker.detach();
@ -298,15 +292,55 @@ void ThreadPool::KillWorker(void)
m_ThreadDeaths++; m_ThreadDeaths++;
} }
void ThreadPool::StatsThreadProc(void)
{
std::ostringstream idbuf;
idbuf << "TP " << this << " Stats";
Utility::SetThreadName(idbuf.str());
for (;;) {
Utility::Sleep(0.25);
{
boost::mutex::scoped_lock lock(m_Mutex);
for (int i = 0; i < sizeof(m_ThreadStats) / sizeof(m_ThreadStats[0]); i++)
UpdateThreadUtilization(i);
}
}
}
/** /**
* Note: Caller must hold m_Mutex. * Note: Caller must hold m_Mutex.
*/ */
void ThreadPool::UpdateThreadUtilization(int tid, double time, double utilization) void ThreadPool::UpdateThreadUtilization(int tid, ThreadState state)
{ {
double utilization;
switch (m_ThreadStats[tid].State) {
case ThreadDead:
return;
case ThreadIdle:
utilization = 0;
break;
case ThreadBusy:
utilization = 1;
break;
default:
ASSERT(0);
}
double now = Utility::GetTime();
double time = now - m_ThreadStats[tid].LastUpdate;
const double avg_time = 5.0; const double avg_time = 5.0;
if (time > avg_time) if (time > avg_time)
time = avg_time; time = avg_time;
m_ThreadUtilization[tid] = (m_ThreadUtilization[tid] * (avg_time - time) + utilization * time) / avg_time; m_ThreadStats[tid].Utilization = (m_ThreadStats[tid].Utilization * (avg_time - time) + utilization * time) / avg_time;
m_ThreadStats[tid].LastUpdate = now;
if (state != ThreadUnspecified)
m_ThreadStats[tid].State = state;
} }

View File

@ -51,13 +51,24 @@ public:
private: private:
enum ThreadState enum ThreadState
{ {
ThreadUnspecified,
ThreadDead, ThreadDead,
ThreadIdle, ThreadIdle,
ThreadBusy ThreadBusy
}; };
ThreadState m_ThreadStates[512]; struct ThreadStats
double m_ThreadUtilization[512]; {
ThreadState State;
double Utilization;
double LastUpdate;
ThreadStats(ThreadState state = ThreadDead)
: State(state), Utilization(0), LastUpdate(0)
{ }
};
ThreadStats m_ThreadStats[512];
int m_ThreadDeaths; int m_ThreadDeaths;
double m_WaitTime; double m_WaitTime;
@ -82,11 +93,12 @@ private:
void QueueThreadProc(int tid); void QueueThreadProc(int tid);
void ManagerThreadProc(void); void ManagerThreadProc(void);
void StatsThreadProc(void);
void SpawnWorker(void); void SpawnWorker(void);
void KillWorker(void); void KillWorker(void);
void UpdateThreadUtilization(int tid, double time, double utilization); void UpdateThreadUtilization(int tid, ThreadState state = ThreadUnspecified);
}; };
} }