diff --git a/lib/base/threadpool.cpp b/lib/base/threadpool.cpp index 5fd3c341e..257a67902 100644 --- a/lib/base/threadpool.cpp +++ b/lib/base/threadpool.cpp @@ -41,8 +41,8 @@ ThreadPool::ThreadPool(void) for (int i = 0; i < 2; i++) SpawnWorker(); - m_Threads.create_thread(boost::bind(&ThreadPool::ManagerThreadProc, this)); - m_Threads.create_thread(boost::bind(&ThreadPool::StatsThreadProc, this)); + m_ThreadGroup.create_thread(boost::bind(&ThreadPool::ManagerThreadProc, this)); + m_ThreadGroup.create_thread(boost::bind(&ThreadPool::StatsThreadProc, this)); } ThreadPool::~ThreadPool(void) @@ -74,7 +74,7 @@ void ThreadPool::Join(void) } } - m_Threads.join_all(); + m_ThreadGroup.join_all(); } /** @@ -94,10 +94,10 @@ void ThreadPool::QueueThreadProc(int tid) UpdateThreadUtilization(tid, ThreadIdle); - while (m_WorkItems.empty() && !m_Stopped && !m_ThreadStats[tid].Zombie) + while (m_WorkItems.empty() && !m_Stopped && !m_Threads[tid].Zombie) m_WorkCV.wait(lock); - if (m_ThreadStats[tid].Zombie) + if (m_Threads[tid].Zombie) break; if (m_WorkItems.empty() && m_Stopped) @@ -178,7 +178,7 @@ void ThreadPool::QueueThreadProc(int tid) boost::mutex::scoped_lock lock(m_Mutex); UpdateThreadUtilization(tid, ThreadDead); - m_ThreadStats[tid].Zombie = false; + m_Threads[tid].Zombie = false; } /** @@ -230,10 +230,10 @@ void ThreadPool::ManagerThreadProc(void) alive = 0; - for (size_t i = 0; i < sizeof(m_ThreadStats) / sizeof(m_ThreadStats[0]); i++) { - if (m_ThreadStats[i].State != ThreadDead && !m_ThreadStats[i].Zombie) { + for (size_t i = 0; i < sizeof(m_Threads) / sizeof(m_Threads[0]); i++) { + if (m_Threads[i].State != ThreadDead && !m_Threads[i].Zombie) { alive++; - utilization += m_ThreadStats[i].Utilization * 100; + utilization += m_Threads[i].Utilization * 100; } } @@ -295,12 +295,12 @@ void ThreadPool::ManagerThreadProc(void) */ void ThreadPool::SpawnWorker(void) { - for (size_t i = 0; i < sizeof(m_ThreadStats) / sizeof(m_ThreadStats[0]); i++) { - if (m_ThreadStats[i].State == ThreadDead) { + for (size_t i = 0; i < sizeof(m_Threads) / sizeof(m_Threads[0]); i++) { + if (m_Threads[i].State == ThreadDead) { Log(LogDebug, "debug", "Spawning worker thread."); - m_ThreadStats[i] = ThreadStats(ThreadIdle); - m_Threads.create_thread(boost::bind(&ThreadPool::QueueThreadProc, this, i)); + m_Threads[i] = WorkerThread(ThreadIdle); + m_Threads[i].Thread = m_ThreadGroup.create_thread(boost::bind(&ThreadPool::QueueThreadProc, this, i)); break; } @@ -312,11 +312,15 @@ void ThreadPool::SpawnWorker(void) */ void ThreadPool::KillWorker(void) { - for (size_t i = 0; i < sizeof(m_ThreadStats) / sizeof(m_ThreadStats[0]); i++) { - if (m_ThreadStats[i].State == ThreadIdle && !m_ThreadStats[i].Zombie) { + for (size_t i = 0; i < sizeof(m_Threads) / sizeof(m_Threads[0]); i++) { + if (m_Threads[i].State == ThreadIdle && !m_Threads[i].Zombie) { Log(LogDebug, "base", "Killing worker thread."); - m_ThreadStats[i].Zombie = true; + m_ThreadGroup.remove_thread(m_Threads[i].Thread); + m_Threads[i].Thread->detach(); + delete m_Threads[i].Thread; + + m_Threads[i].Zombie = true; m_WorkCV.notify_all(); break; @@ -339,7 +343,7 @@ void ThreadPool::StatsThreadProc(void) if (m_Stopped) break; - for (size_t i = 0; i < sizeof(m_ThreadStats) / sizeof(m_ThreadStats[0]); i++) + for (size_t i = 0; i < sizeof(m_Threads) / sizeof(m_Threads[0]); i++) UpdateThreadUtilization(i); } } @@ -351,7 +355,7 @@ void ThreadPool::UpdateThreadUtilization(int tid, ThreadState state) { double utilization; - switch (m_ThreadStats[tid].State) { + switch (m_Threads[tid].State) { case ThreadDead: return; case ThreadIdle: @@ -365,16 +369,16 @@ void ThreadPool::UpdateThreadUtilization(int tid, ThreadState state) } double now = Utility::GetTime(); - double time = now - m_ThreadStats[tid].LastUpdate; + double time = now - m_Threads[tid].LastUpdate; const double avg_time = 5.0; if (time > avg_time) time = avg_time; - m_ThreadStats[tid].Utilization = (m_ThreadStats[tid].Utilization * (avg_time - time) + utilization * time) / avg_time; - m_ThreadStats[tid].LastUpdate = now; + m_Threads[tid].Utilization = (m_Threads[tid].Utilization * (avg_time - time) + utilization * time) / avg_time; + m_Threads[tid].LastUpdate = now; if (state != ThreadUnspecified) - m_ThreadStats[tid].State = state; + m_Threads[tid].State = state; } diff --git a/lib/base/threadpool.h b/lib/base/threadpool.h index 657f33b2d..5f7650dcd 100644 --- a/lib/base/threadpool.h +++ b/lib/base/threadpool.h @@ -57,23 +57,24 @@ private: ThreadBusy }; - struct ThreadStats + struct WorkerThread { ThreadState State; bool Zombie; double Utilization; double LastUpdate; + boost::thread *Thread; - ThreadStats(ThreadState state = ThreadDead) - : State(state), Zombie(false), Utilization(0), LastUpdate(0) + WorkerThread(ThreadState state = ThreadDead) + : State(state), Zombie(false), Utilization(0), LastUpdate(0), Thread(NULL) { } }; int m_ID; static int m_NextID; - boost::thread_group m_Threads; - ThreadStats m_ThreadStats[4096]; + boost::thread_group m_ThreadGroup; + WorkerThread m_Threads[4096]; double m_WaitTime; double m_ServiceTime;