From 37179cdf324842091dd21490696c519bede3d4c3 Mon Sep 17 00:00:00 2001 From: Gunnar Beutner <gunnar@beutner.name> Date: Fri, 6 Dec 2013 21:46:50 +0100 Subject: [PATCH] Improve config compiler performance. Refs #5327 --- lib/base/application.cpp | 2 +- lib/base/threadpool.cpp | 237 ++++++++++++++++++++++---------------- lib/base/threadpool.h | 69 ++++++----- lib/base/utility.cpp | 15 ++- lib/base/utility.h | 1 + lib/config/configitem.cpp | 62 +++++++--- lib/config/configitem.h | 4 + 7 files changed, 243 insertions(+), 147 deletions(-) diff --git a/lib/base/application.cpp b/lib/base/application.cpp index a3ee1c532..742384d55 100644 --- a/lib/base/application.cpp +++ b/lib/base/application.cpp @@ -181,7 +181,7 @@ void Application::RunEventLoop(void) const GetTP().Stop(); m_ShuttingDown = false; - GetTP().Join(); + GetTP().Join(true); Timer::Uninitialize(); #endif /* _DEBUG */ diff --git a/lib/base/threadpool.cpp b/lib/base/threadpool.cpp index f15d07c24..e4cf9c0ce 100644 --- a/lib/base/threadpool.cpp +++ b/lib/base/threadpool.cpp @@ -33,79 +33,95 @@ using namespace icinga; int ThreadPool::m_NextID = 1; -ThreadPool::ThreadPool(void) - : m_ID(m_NextID++), m_WaitTime(0), m_ServiceTime(0), - m_TaskCount(0), m_Stopped(false) +ThreadPool::ThreadPool(int max_threads) + : m_ID(m_NextID++), m_Stopped(false), m_MaxThreads(max_threads) { - for (int i = 0; i < 2; i++) - SpawnWorker(); + if (m_MaxThreads != -1 && m_MaxThreads < sizeof(m_Queues) / sizeof(m_Queues[0])) + m_MaxThreads = sizeof(m_Queues) / sizeof(m_Queues[0]); - m_ThreadGroup.create_thread(boost::bind(&ThreadPool::ManagerThreadProc, this)); - m_ThreadGroup.create_thread(boost::bind(&ThreadPool::StatsThreadProc, this)); + Start(); } ThreadPool::~ThreadPool(void) { Stop(); - Join(); + Join(true); +} + +void ThreadPool::Start(void) +{ + for (int i = 0; i < sizeof(m_Queues) / sizeof(m_Queues[0]); i++) + m_Queues[i].SpawnWorker(m_ThreadGroup); + + m_ThreadGroup.create_thread(boost::bind(&ThreadPool::ManagerThreadProc, this)); + m_ThreadGroup.create_thread(boost::bind(&ThreadPool::StatsThreadProc, this)); } void ThreadPool::Stop(void) { - boost::mutex::scoped_lock lock(m_Mutex); + for (int i = 0; i < sizeof(m_Queues) / sizeof(m_Queues[0]); i++) { + boost::mutex::scoped_lock lock(m_Queues[i].Mutex); + m_Queues[i].Stopped = true; + m_Queues[i].CV.notify_all(); + } + + boost::mutex::scoped_lock lock(m_MgmtMutex); m_Stopped = true; - m_WorkCV.notify_all(); m_MgmtCV.notify_all(); } /** * Waits for all worker threads to finish. */ -void ThreadPool::Join(void) +void ThreadPool::Join(bool wait_for_stop) { - { - boost::mutex::scoped_lock lock(m_Mutex); - - while (!m_Stopped || !m_WorkItems.empty()) { - lock.unlock(); - Utility::Sleep(0.5); - lock.lock(); - } + if (wait_for_stop) { + m_ThreadGroup.join_all(); + return; } - m_ThreadGroup.join_all(); + for (int i = 0; i < sizeof(m_Queues) / sizeof(m_Queues[0]); i++) { + boost::mutex::scoped_lock lock(m_Queues[i].Mutex); + + while (!m_Queues[i].Items.empty()) + m_Queues[i].CVStarved.wait(lock); + } } /** * Waits for work items and processes them. */ -void ThreadPool::QueueThreadProc(int tid) +void ThreadPool::WorkerThread::ThreadProc(Queue& queue) { std::ostringstream idbuf; - idbuf << "TP #" << m_ID << " W #" << tid; + idbuf << "Q #" << &queue << " W #" << this; Utility::SetThreadName(idbuf.str()); for (;;) { WorkItem wi; { - boost::mutex::scoped_lock lock(m_Mutex); + boost::mutex::scoped_lock lock(queue.Mutex); - UpdateThreadUtilization(tid, ThreadIdle); + UpdateUtilization(ThreadIdle); - while (m_WorkItems.empty() && !m_Stopped && !m_Threads[tid].Zombie) - m_WorkCV.wait(lock); + while (queue.Items.empty() && !queue.Stopped && !Zombie) { + if (queue.Items.empty()) + queue.CVStarved.notify_all(); - if (m_Threads[tid].Zombie) + queue.CV.wait(lock); + } + + if (Zombie) break; - if (m_WorkItems.empty() && m_Stopped) + if (queue.Items.empty() && queue.Stopped) break; - wi = m_WorkItems.front(); - m_WorkItems.pop_front(); + wi = queue.Items.front(); + queue.Items.pop_front(); - UpdateThreadUtilization(tid, ThreadBusy); + UpdateUtilization(ThreadBusy); } double st = Utility::GetTime();; @@ -134,14 +150,11 @@ void ThreadPool::QueueThreadProc(int tid) double latency = st - wi.Timestamp; { - boost::mutex::scoped_lock lock(m_Mutex); + boost::mutex::scoped_lock lock(queue.Mutex); - m_WaitTime += latency; - m_ServiceTime += et - st; - m_TaskCount++; - - if (latency > m_MaxLatency) - m_MaxLatency = latency; + queue.WaitTime += latency; + queue.ServiceTime += et - st; + queue.TaskCount++; } #ifdef _DEBUG @@ -175,9 +188,9 @@ void ThreadPool::QueueThreadProc(int tid) #endif /* _DEBUG */ } - boost::mutex::scoped_lock lock(m_Mutex); - UpdateThreadUtilization(tid, ThreadDead); - m_Threads[tid].Zombie = false; + boost::mutex::scoped_lock lock(queue.Mutex); + UpdateUtilization(ThreadDead); + Zombie = false; } /** @@ -192,14 +205,16 @@ bool ThreadPool::Post(const ThreadPool::WorkFunction& callback) wi.Callback = callback; wi.Timestamp = Utility::GetTime(); - { - boost::mutex::scoped_lock lock(m_Mutex); + Queue& queue = m_Queues[Utility::Random() % (sizeof(m_Queues) / sizeof(m_Queues[0]))]; - if (m_Stopped) + { + boost::mutex::scoped_lock lock(queue.Mutex); + + if (queue.Stopped) return false; - m_WorkItems.push_back(wi); - m_WorkCV.notify_one(); + queue.Items.push_back(wi); + queue.CV.notify_one(); } return true; @@ -212,34 +227,42 @@ void ThreadPool::ManagerThreadProc(void) Utility::SetThreadName(idbuf.str()); for (;;) { - size_t pending, alive; - double avg_latency, max_latency; - double utilization = 0; + size_t total_pending = 0, total_alive = 0; + double total_avg_latency = 0; + double total_utilization = 0; { - boost::mutex::scoped_lock lock(m_Mutex); + boost::mutex::scoped_lock lock(m_MgmtMutex); if (!m_Stopped) m_MgmtCV.timed_wait(lock, boost::posix_time::seconds(5)); if (m_Stopped) break; + } - pending = m_WorkItems.size(); + for (int i = 0; i < sizeof(m_Queues) / sizeof(m_Queues[0]); i++) { + size_t pending, alive = 0; + double avg_latency; + double utilization = 0; - alive = 0; + Queue& queue = m_Queues[i]; - for (size_t i = 0; i < sizeof(m_Threads) / sizeof(m_Threads[0]); i++) { - if (m_Threads[i].State != ThreadDead && !m_Threads[i].Zombie) { + boost::mutex::scoped_lock lock(queue.Mutex); + + pending = queue.Items.size(); + + for (size_t i = 0; i < sizeof(queue.Threads) / sizeof(queue.Threads[0]); i++) { + if (queue.Threads[i].State != ThreadDead && !queue.Threads[i].Zombie) { alive++; - utilization += m_Threads[i].Utilization * 100; + utilization += queue.Threads[i].Utilization * 100; } } utilization /= alive; - if (m_TaskCount > 0) - avg_latency = m_WaitTime / (m_TaskCount * 1.0); + if (queue.TaskCount > 0) + avg_latency = queue.WaitTime / (queue.TaskCount * 1.0); else avg_latency = 0; @@ -248,9 +271,9 @@ void ThreadPool::ManagerThreadProc(void) int tthreads = wthreads - alive; - /* Don't ever kill the last 8 threads. */ - if (alive + tthreads < 8) - tthreads = 8 - alive; + /* Don't ever kill the last threads. */ + if (alive + tthreads < 2) + tthreads = 2 - alive; /* Don't kill more than 8 threads at once. */ if (tthreads < -8) @@ -258,33 +281,37 @@ void ThreadPool::ManagerThreadProc(void) /* Spawn more workers if there are outstanding work items. */ if (tthreads > 0 && pending > 0) - tthreads = (Utility::GetTime() - Application::GetStartTime() < 300) ? 128 : 8; + tthreads = 8; + + if (m_MaxThreads != -1 && (alive + tthreads) * (sizeof(m_Queues) / sizeof(m_Queues[0])) > m_MaxThreads) + tthreads = m_MaxThreads / (sizeof(m_Queues) / sizeof(m_Queues[0])) - alive; std::ostringstream msgbuf; msgbuf << "Thread pool; current: " << alive << "; adjustment: " << tthreads; Log(LogDebug, "base", msgbuf.str()); for (int i = 0; i < -tthreads; i++) - KillWorker(); + queue.KillWorker(m_ThreadGroup); for (int i = 0; i < tthreads; i++) - SpawnWorker(); + queue.SpawnWorker(m_ThreadGroup); } - m_WaitTime = 0; - m_ServiceTime = 0; - m_TaskCount = 0; + queue.WaitTime = 0; + queue.ServiceTime = 0; + queue.TaskCount = 0; - max_latency = m_MaxLatency; - m_MaxLatency = 0; + total_pending += pending; + total_alive += alive; + total_avg_latency += avg_latency; + total_utilization += utilization; } std::ostringstream msgbuf; - msgbuf << "Pool #" << m_ID << ": Pending tasks: " << pending << "; Average latency: " - << (long)(avg_latency * 1000) << "ms" - << "; Max latency: " << (long)(max_latency * 1000) << "ms" - << "; Threads: " << alive - << "; Pool utilization: " << utilization << "%"; + msgbuf << "Pool #" << m_ID << ": Pending tasks: " << total_pending << "; Average latency: " + << (long)(total_avg_latency * 1000 / (sizeof(m_Queues) / sizeof(m_Queues[0]))) << "ms" + << "; Threads: " << total_alive + << "; Pool utilization: " << (total_utilization / (sizeof(m_Queues) / sizeof(m_Queues[0]))) << "%"; Log(LogInformation, "base", msgbuf.str()); } } @@ -292,14 +319,14 @@ void ThreadPool::ManagerThreadProc(void) /** * Note: Caller must hold m_Mutex */ -void ThreadPool::SpawnWorker(void) +void ThreadPool::Queue::SpawnWorker(boost::thread_group& group) { - for (size_t i = 0; i < sizeof(m_Threads) / sizeof(m_Threads[0]); i++) { - if (m_Threads[i].State == ThreadDead) { + for (size_t i = 0; i < sizeof(Threads) / sizeof(Threads[0]); i++) { + if (Threads[i].State == ThreadDead) { Log(LogDebug, "debug", "Spawning worker thread."); - m_Threads[i] = WorkerThread(ThreadIdle); - m_Threads[i].Thread = m_ThreadGroup.create_thread(boost::bind(&ThreadPool::QueueThreadProc, this, i)); + Threads[i] = WorkerThread(ThreadIdle); + Threads[i].Thread = group.create_thread(boost::bind(&ThreadPool::WorkerThread::ThreadProc, boost::ref(Threads[i]), boost::ref(*this))); break; } @@ -307,20 +334,20 @@ void ThreadPool::SpawnWorker(void) } /** - * Note: Caller must hold m_Mutex. + * Note: Caller must hold Mutex. */ -void ThreadPool::KillWorker(void) +void ThreadPool::Queue::KillWorker(boost::thread_group& group) { - for (size_t i = 0; i < sizeof(m_Threads) / sizeof(m_Threads[0]); i++) { - if (m_Threads[i].State == ThreadIdle && !m_Threads[i].Zombie) { + for (size_t i = 0; i < sizeof(Threads) / sizeof(Threads[0]); i++) { + if (Threads[i].State == ThreadIdle && !Threads[i].Zombie) { Log(LogDebug, "base", "Killing worker thread."); - m_ThreadGroup.remove_thread(m_Threads[i].Thread); - m_Threads[i].Thread->detach(); - delete m_Threads[i].Thread; + group.remove_thread(Threads[i].Thread); + Threads[i].Thread->detach(); + delete Threads[i].Thread; - m_Threads[i].Zombie = true; - m_WorkCV.notify_all(); + Threads[i].Zombie = true; + CV.notify_all(); break; } @@ -334,27 +361,35 @@ void ThreadPool::StatsThreadProc(void) Utility::SetThreadName(idbuf.str()); for (;;) { - boost::mutex::scoped_lock lock(m_Mutex); + { + boost::mutex::scoped_lock lock(m_MgmtMutex); - if (!m_Stopped) - m_MgmtCV.timed_wait(lock, boost::posix_time::milliseconds(250)); + if (!m_Stopped) + m_MgmtCV.timed_wait(lock, boost::posix_time::milliseconds(250)); - if (m_Stopped) - break; + if (m_Stopped) + break; + } - for (size_t i = 0; i < sizeof(m_Threads) / sizeof(m_Threads[0]); i++) - UpdateThreadUtilization(i); + for (int i = 0; i < sizeof(m_Queues) / sizeof(m_Queues[0]); i++) { + Queue& queue = m_Queues[i]; + + boost::mutex::scoped_lock lock(queue.Mutex); + + for (size_t i = 0; i < sizeof(queue.Threads) / sizeof(queue.Threads[0]); i++) + queue.Threads[i].UpdateUtilization(); + } } } /** - * Note: Caller must hold m_Mutex. + * Note: Caller must hold queue Mutex. */ -void ThreadPool::UpdateThreadUtilization(int tid, ThreadState state) +void ThreadPool::WorkerThread::UpdateUtilization(ThreadState state) { double utilization; - switch (m_Threads[tid].State) { + switch (State) { case ThreadDead: return; case ThreadIdle: @@ -368,16 +403,16 @@ void ThreadPool::UpdateThreadUtilization(int tid, ThreadState state) } double now = Utility::GetTime(); - double time = now - m_Threads[tid].LastUpdate; + double time = now - LastUpdate; const double avg_time = 5.0; if (time > avg_time) time = avg_time; - m_Threads[tid].Utilization = (m_Threads[tid].Utilization * (avg_time - time) + utilization * time) / avg_time; - m_Threads[tid].LastUpdate = now; + Utilization = (Utilization * (avg_time - time) + utilization * time) / avg_time; + LastUpdate = now; if (state != ThreadUnspecified) - m_Threads[tid].State = state; + State = state; } diff --git a/lib/base/threadpool.h b/lib/base/threadpool.h index 5f7650dcd..848d05a5f 100644 --- a/lib/base/threadpool.h +++ b/lib/base/threadpool.h @@ -40,11 +40,12 @@ class I2_BASE_API ThreadPool public: typedef boost::function<void ()> WorkFunction; - ThreadPool(void); + ThreadPool(int max_threads = -1); ~ThreadPool(void); + void Start(void); void Stop(void); - void Join(void); + void Join(bool wait_for_stop = false); bool Post(const WorkFunction& callback); @@ -57,6 +58,14 @@ private: ThreadBusy }; + struct WorkItem + { + WorkFunction Callback; + double Timestamp; + }; + + struct Queue; + struct WorkerThread { ThreadState State; @@ -68,43 +77,51 @@ private: WorkerThread(ThreadState state = ThreadDead) : State(state), Zombie(false), Utilization(0), LastUpdate(0), Thread(NULL) { } + + void UpdateUtilization(ThreadState state = ThreadUnspecified); + + void ThreadProc(Queue& queue); + }; + + struct Queue + { + boost::mutex Mutex; + boost::condition_variable CV; + boost::condition_variable CVStarved; + + std::deque<WorkItem> Items; + + double WaitTime; + double ServiceTime; + int TaskCount; + + bool Stopped; + + WorkerThread Threads[256]; + + Queue(void) + : WaitTime(0), ServiceTime(0), TaskCount(0), Stopped(false) + { } + + void SpawnWorker(boost::thread_group& group); + void KillWorker(boost::thread_group& group); }; int m_ID; static int m_NextID; + int m_MaxThreads; + boost::thread_group m_ThreadGroup; - WorkerThread m_Threads[4096]; - double m_WaitTime; - double m_ServiceTime; - int m_TaskCount; - - double m_MaxLatency; - - boost::mutex m_Mutex; - boost::condition_variable m_WorkCV; + boost::mutex m_MgmtMutex; boost::condition_variable m_MgmtCV; - bool m_Stopped; - struct WorkItem - { - WorkFunction Callback; - double Timestamp; - }; + Queue m_Queues[16]; - - std::deque<WorkItem> m_WorkItems; - - void QueueThreadProc(int tid); void ManagerThreadProc(void); void StatsThreadProc(void); - - void SpawnWorker(void); - void KillWorker(void); - - void UpdateThreadUtilization(int tid, ThreadState state = ThreadUnspecified); }; } diff --git a/lib/base/utility.cpp b/lib/base/utility.cpp index 9ae1af459..f60e67718 100644 --- a/lib/base/utility.cpp +++ b/lib/base/utility.cpp @@ -40,6 +40,7 @@ using namespace icinga; boost::thread_specific_ptr<String> Utility::m_ThreadName; +boost::thread_specific_ptr<unsigned int> Utility::m_RandSeed; /** * Demangles a symbol name. @@ -739,10 +740,18 @@ int Utility::CompareVersion(const String& v1, const String& v2) int Utility::Random(void) { - static boost::mutex mtx; - boost::mutex::scoped_lock lock(mtx); - +#ifdef _WIN32 return rand(); +#else /* _WIN32 */ + unsigned int *seed = m_RandSeed.get(); + + if (!seed) { + seed = new unsigned int(Utility::GetTime()); + m_RandSeed.reset(seed); + } + + return rand_r(seed); +#endif /* _WIN32 */ } tm Utility::LocalTime(time_t ts) diff --git a/lib/base/utility.h b/lib/base/utility.h index 07e56edec..6fef1d608 100644 --- a/lib/base/utility.h +++ b/lib/base/utility.h @@ -114,6 +114,7 @@ private: Utility(void); static boost::thread_specific_ptr<String> m_ThreadName; + static boost::thread_specific_ptr<unsigned int> m_RandSeed; }; } diff --git a/lib/config/configitem.cpp b/lib/config/configitem.cpp index 26720749d..4dc6eae7e 100644 --- a/lib/config/configitem.cpp +++ b/lib/config/configitem.cpp @@ -178,21 +178,24 @@ DynamicObject::Ptr ConfigItem::Commit(void) DynamicObject::Ptr dobj = dtype->CreateObject(properties); dobj->Register(); + m_Object = dobj; + return dobj; } +DynamicObject::Ptr ConfigItem::GetObject(void) const +{ + return m_Object; +} + /** * Registers the configuration item. */ void ConfigItem::Register(void) { - ASSERT(!OwnsLock()); + boost::mutex::scoped_lock lock(m_Mutex); - { - ObjectLock olock(this); - - m_Items[std::make_pair(m_Type, m_Name)] = GetSelf(); - } + m_Items[std::make_pair(m_Type, m_Name)] = GetSelf(); } /** @@ -244,34 +247,49 @@ bool ConfigItem::ActivateItems(bool validateOnly) Log(LogInformation, "config", "Validating config items (step 1)..."); + ThreadPool tp(32); + BOOST_FOREACH(const ItemMap::value_type& kv, m_Items) { - kv.second->ValidateItem(); + tp.Post(boost::bind(&ConfigItem::ValidateItem, kv.second)); } + + tp.Join(); if (ConfigCompilerContext::GetInstance()->HasErrors()) return false; - Log(LogInformation, "config", "Activating config items"); - - std::vector<DynamicObject::Ptr> objects; + Log(LogInformation, "config", "Comitting config items"); BOOST_FOREACH(const ItemMap::value_type& kv, m_Items) { - DynamicObject::Ptr object = kv.second->Commit(); + tp.Post(boost::bind(&ConfigItem::Commit, kv.second)); + } + + tp.Join(); + + std::vector<DynamicObject::Ptr> objects; + BOOST_FOREACH(const ItemMap::value_type& kv, m_Items) { + DynamicObject::Ptr object = kv.second->GetObject(); if (object) objects.push_back(object); } - + + Log(LogInformation, "config", "Triggering OnConfigLoaded signal for config items"); + BOOST_FOREACH(const DynamicObject::Ptr& object, objects) { - object->OnConfigLoaded(); + tp.Post(boost::bind(&DynamicObject::OnConfigLoaded, object)); } + + tp.Join(); Log(LogInformation, "config", "Validating config items (step 2)..."); BOOST_FOREACH(const ItemMap::value_type& kv, m_Items) { - kv.second->ValidateItem(); + tp.Post(boost::bind(&ConfigItem::ValidateItem, kv.second)); } + tp.Join(); + if (ConfigCompilerContext::GetInstance()->HasErrors()) return false; @@ -281,6 +299,8 @@ bool ConfigItem::ActivateItems(bool validateOnly) /* restore the previous program state */ DynamicObject::RestoreObjects(Application::GetStatePath()); + Log(LogInformation, "config", "Triggering Start signal for config items"); + BOOST_FOREACH(const DynamicType::Ptr& type, DynamicType::GetTypes()) { BOOST_FOREACH(const DynamicObject::Ptr& object, type->GetObjects()) { if (object->IsActive()) @@ -289,11 +309,21 @@ bool ConfigItem::ActivateItems(bool validateOnly) #ifdef _DEBUG Log(LogDebug, "config", "Activating object '" + object->GetName() + "' of type '" + object->GetType()->GetName() + "'"); #endif /* _DEBUG */ - object->Start(); - + tp.Post(boost::bind(&DynamicObject::Start, object)); + } + } + + tp.Join(); + +#ifdef _DEBUG + BOOST_FOREACH(const DynamicType::Ptr& type, DynamicType::GetTypes()) { + BOOST_FOREACH(const DynamicObject::Ptr& object, type->GetObjects()) { ASSERT(object->IsActive()); } } +#endif /* _DEBUG */ + + Log(LogInformation, "config", "Activated all objects."); return true; } diff --git a/lib/config/configitem.h b/lib/config/configitem.h index 98a65fdf2..b16353f67 100644 --- a/lib/config/configitem.h +++ b/lib/config/configitem.h @@ -61,6 +61,8 @@ public: void ValidateItem(void); + DynamicObject::Ptr GetObject(void) const; + static bool ActivateItems(bool validateOnly); static void DiscardItems(void); @@ -79,6 +81,8 @@ private: ExpressionList::Ptr m_LinkedExpressionList; Dictionary::Ptr m_Properties; + + DynamicObject::Ptr m_Object; static boost::mutex m_Mutex;