mirror of https://github.com/Icinga/icinga2.git
parent
27e9cd71e7
commit
43ab2b69a4
|
@ -41,8 +41,8 @@ ThreadPool::ThreadPool(void)
|
|||
for (int i = 0; i < 2; i++)
|
||||
SpawnWorker();
|
||||
|
||||
m_Threads.create_thread(boost::bind(&ThreadPool::ManagerThreadProc, this));
|
||||
m_Threads.create_thread(boost::bind(&ThreadPool::StatsThreadProc, this));
|
||||
m_ThreadGroup.create_thread(boost::bind(&ThreadPool::ManagerThreadProc, this));
|
||||
m_ThreadGroup.create_thread(boost::bind(&ThreadPool::StatsThreadProc, this));
|
||||
}
|
||||
|
||||
ThreadPool::~ThreadPool(void)
|
||||
|
@ -74,7 +74,7 @@ void ThreadPool::Join(void)
|
|||
}
|
||||
}
|
||||
|
||||
m_Threads.join_all();
|
||||
m_ThreadGroup.join_all();
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -94,10 +94,10 @@ void ThreadPool::QueueThreadProc(int tid)
|
|||
|
||||
UpdateThreadUtilization(tid, ThreadIdle);
|
||||
|
||||
while (m_WorkItems.empty() && !m_Stopped && !m_ThreadStats[tid].Zombie)
|
||||
while (m_WorkItems.empty() && !m_Stopped && !m_Threads[tid].Zombie)
|
||||
m_WorkCV.wait(lock);
|
||||
|
||||
if (m_ThreadStats[tid].Zombie)
|
||||
if (m_Threads[tid].Zombie)
|
||||
break;
|
||||
|
||||
if (m_WorkItems.empty() && m_Stopped)
|
||||
|
@ -178,7 +178,7 @@ void ThreadPool::QueueThreadProc(int tid)
|
|||
|
||||
boost::mutex::scoped_lock lock(m_Mutex);
|
||||
UpdateThreadUtilization(tid, ThreadDead);
|
||||
m_ThreadStats[tid].Zombie = false;
|
||||
m_Threads[tid].Zombie = false;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -230,10 +230,10 @@ void ThreadPool::ManagerThreadProc(void)
|
|||
|
||||
alive = 0;
|
||||
|
||||
for (size_t i = 0; i < sizeof(m_ThreadStats) / sizeof(m_ThreadStats[0]); i++) {
|
||||
if (m_ThreadStats[i].State != ThreadDead && !m_ThreadStats[i].Zombie) {
|
||||
for (size_t i = 0; i < sizeof(m_Threads) / sizeof(m_Threads[0]); i++) {
|
||||
if (m_Threads[i].State != ThreadDead && !m_Threads[i].Zombie) {
|
||||
alive++;
|
||||
utilization += m_ThreadStats[i].Utilization * 100;
|
||||
utilization += m_Threads[i].Utilization * 100;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -295,12 +295,12 @@ void ThreadPool::ManagerThreadProc(void)
|
|||
*/
|
||||
void ThreadPool::SpawnWorker(void)
|
||||
{
|
||||
for (size_t i = 0; i < sizeof(m_ThreadStats) / sizeof(m_ThreadStats[0]); i++) {
|
||||
if (m_ThreadStats[i].State == ThreadDead) {
|
||||
for (size_t i = 0; i < sizeof(m_Threads) / sizeof(m_Threads[0]); i++) {
|
||||
if (m_Threads[i].State == ThreadDead) {
|
||||
Log(LogDebug, "debug", "Spawning worker thread.");
|
||||
|
||||
m_ThreadStats[i] = ThreadStats(ThreadIdle);
|
||||
m_Threads.create_thread(boost::bind(&ThreadPool::QueueThreadProc, this, i));
|
||||
m_Threads[i] = WorkerThread(ThreadIdle);
|
||||
m_Threads[i].Thread = m_ThreadGroup.create_thread(boost::bind(&ThreadPool::QueueThreadProc, this, i));
|
||||
|
||||
break;
|
||||
}
|
||||
|
@ -312,11 +312,15 @@ void ThreadPool::SpawnWorker(void)
|
|||
*/
|
||||
void ThreadPool::KillWorker(void)
|
||||
{
|
||||
for (size_t i = 0; i < sizeof(m_ThreadStats) / sizeof(m_ThreadStats[0]); i++) {
|
||||
if (m_ThreadStats[i].State == ThreadIdle && !m_ThreadStats[i].Zombie) {
|
||||
for (size_t i = 0; i < sizeof(m_Threads) / sizeof(m_Threads[0]); i++) {
|
||||
if (m_Threads[i].State == ThreadIdle && !m_Threads[i].Zombie) {
|
||||
Log(LogDebug, "base", "Killing worker thread.");
|
||||
|
||||
m_ThreadStats[i].Zombie = true;
|
||||
m_ThreadGroup.remove_thread(m_Threads[i].Thread);
|
||||
m_Threads[i].Thread->detach();
|
||||
delete m_Threads[i].Thread;
|
||||
|
||||
m_Threads[i].Zombie = true;
|
||||
m_WorkCV.notify_all();
|
||||
|
||||
break;
|
||||
|
@ -339,7 +343,7 @@ void ThreadPool::StatsThreadProc(void)
|
|||
if (m_Stopped)
|
||||
break;
|
||||
|
||||
for (size_t i = 0; i < sizeof(m_ThreadStats) / sizeof(m_ThreadStats[0]); i++)
|
||||
for (size_t i = 0; i < sizeof(m_Threads) / sizeof(m_Threads[0]); i++)
|
||||
UpdateThreadUtilization(i);
|
||||
}
|
||||
}
|
||||
|
@ -351,7 +355,7 @@ void ThreadPool::UpdateThreadUtilization(int tid, ThreadState state)
|
|||
{
|
||||
double utilization;
|
||||
|
||||
switch (m_ThreadStats[tid].State) {
|
||||
switch (m_Threads[tid].State) {
|
||||
case ThreadDead:
|
||||
return;
|
||||
case ThreadIdle:
|
||||
|
@ -365,16 +369,16 @@ void ThreadPool::UpdateThreadUtilization(int tid, ThreadState state)
|
|||
}
|
||||
|
||||
double now = Utility::GetTime();
|
||||
double time = now - m_ThreadStats[tid].LastUpdate;
|
||||
double time = now - m_Threads[tid].LastUpdate;
|
||||
|
||||
const double avg_time = 5.0;
|
||||
|
||||
if (time > avg_time)
|
||||
time = avg_time;
|
||||
|
||||
m_ThreadStats[tid].Utilization = (m_ThreadStats[tid].Utilization * (avg_time - time) + utilization * time) / avg_time;
|
||||
m_ThreadStats[tid].LastUpdate = now;
|
||||
m_Threads[tid].Utilization = (m_Threads[tid].Utilization * (avg_time - time) + utilization * time) / avg_time;
|
||||
m_Threads[tid].LastUpdate = now;
|
||||
|
||||
if (state != ThreadUnspecified)
|
||||
m_ThreadStats[tid].State = state;
|
||||
m_Threads[tid].State = state;
|
||||
}
|
||||
|
|
|
@ -57,23 +57,24 @@ private:
|
|||
ThreadBusy
|
||||
};
|
||||
|
||||
struct ThreadStats
|
||||
struct WorkerThread
|
||||
{
|
||||
ThreadState State;
|
||||
bool Zombie;
|
||||
double Utilization;
|
||||
double LastUpdate;
|
||||
boost::thread *Thread;
|
||||
|
||||
ThreadStats(ThreadState state = ThreadDead)
|
||||
: State(state), Zombie(false), Utilization(0), LastUpdate(0)
|
||||
WorkerThread(ThreadState state = ThreadDead)
|
||||
: State(state), Zombie(false), Utilization(0), LastUpdate(0), Thread(NULL)
|
||||
{ }
|
||||
};
|
||||
|
||||
int m_ID;
|
||||
static int m_NextID;
|
||||
|
||||
boost::thread_group m_Threads;
|
||||
ThreadStats m_ThreadStats[4096];
|
||||
boost::thread_group m_ThreadGroup;
|
||||
WorkerThread m_Threads[4096];
|
||||
|
||||
double m_WaitTime;
|
||||
double m_ServiceTime;
|
||||
|
|
Loading…
Reference in New Issue