From b4f2f06b88a485e05a1cb81e0a2f858f7c5f8d08 Mon Sep 17 00:00:00 2001 From: Gunnar Beutner Date: Tue, 10 Dec 2013 21:44:38 +0100 Subject: [PATCH] Implement ParallelWorkQueue class. Refs #5327 --- lib/base/workqueue.cpp | 71 +++++++++++++++++++++++++++------------ lib/base/workqueue.h | 19 +++++++++-- lib/config/configitem.cpp | 23 +++++++------ 3 files changed, 78 insertions(+), 35 deletions(-) diff --git a/lib/base/workqueue.cpp b/lib/base/workqueue.cpp index 1a9a0a6b2..6ed2d6c3a 100644 --- a/lib/base/workqueue.cpp +++ b/lib/base/workqueue.cpp @@ -29,8 +29,8 @@ using namespace icinga; int WorkQueue::m_NextID = 1; WorkQueue::WorkQueue(size_t maxItems) - : m_ID(m_NextID++), m_MaxItems(maxItems), m_Joined(false), - m_Stopped(false), m_ExceptionCallback(WorkQueue::DefaultExceptionCallback) + : m_ID(m_NextID++), m_MaxItems(maxItems), m_Stopped(false), + m_ExceptionCallback(WorkQueue::DefaultExceptionCallback) { m_Thread = boost::thread(boost::bind(&WorkQueue::WorkerThreadProc, this)); @@ -42,9 +42,7 @@ WorkQueue::WorkQueue(size_t maxItems) WorkQueue::~WorkQueue(void) { - Join(); - - ASSERT(m_Stopped); + Join(true); } /** @@ -61,8 +59,6 @@ void WorkQueue::Enqueue(const WorkCallback& callback, bool allowInterleaved) boost::mutex::scoped_lock lock(m_Mutex); - ASSERT(!m_Stopped); - if (!wq_thread) { while (m_Items.size() >= m_MaxItems) m_CVFull.wait(lock); @@ -72,18 +68,24 @@ void WorkQueue::Enqueue(const WorkCallback& callback, bool allowInterleaved) if (wq_thread) ProcessItems(lock, true); - else + else if (m_Items.size() == 1) m_CVEmpty.notify_all(); } -void WorkQueue::Join(void) +void WorkQueue::Join(bool stop) { boost::mutex::scoped_lock lock(m_Mutex); - m_Joined = true; - m_CVEmpty.notify_all(); - while (!m_Stopped) - m_CVFull.wait(lock); + while (!m_Items.empty()) + m_CVStarved.wait(lock); + + if (stop) { + m_Stopped = true; + m_CVEmpty.notify_all(); + lock.unlock(); + + m_Thread.join(); + } } boost::thread::id WorkQueue::GetThreadId(void) const @@ -118,14 +120,11 @@ void WorkQueue::ProcessItems(boost::mutex::scoped_lock& lock, bool interleaved) if (interleaved && !wi.AllowInterleaved) return; - m_Items.pop_front(); - m_CVFull.notify_one(); - lock.unlock(); try { wi.Callback(); - } catch (const std::exception& ex) { + } catch (const std::exception&) { lock.lock(); ExceptionCallback callback = m_ExceptionCallback; @@ -136,7 +135,14 @@ void WorkQueue::ProcessItems(boost::mutex::scoped_lock& lock, bool interleaved) } lock.lock(); + + m_Items.pop_front(); + + if (m_Items.size() + 1 == m_MaxItems) + m_CVFull.notify_one(); } + + m_CVStarved.notify_all(); } void WorkQueue::WorkerThreadProc(void) @@ -148,15 +154,36 @@ void WorkQueue::WorkerThreadProc(void) Utility::SetThreadName(idbuf.str()); for (;;) { - while (m_Items.empty() && !m_Joined) + while (m_Items.empty() && !m_Stopped) m_CVEmpty.wait(lock); - if (m_Joined) + if (m_Stopped) break; ProcessItems(lock, false); } - - m_Stopped = true; - m_CVFull.notify_all(); +} + + +ParallelWorkQueue::ParallelWorkQueue(void) + : m_QueueCount(boost::thread::hardware_concurrency()), + m_Queues(new WorkQueue[m_QueueCount]), + m_Index(0) +{ } + +ParallelWorkQueue::~ParallelWorkQueue(void) +{ + delete[] m_Queues; +} + +void ParallelWorkQueue::Enqueue(const boost::function& callback) +{ + m_Index++; + m_Queues[m_Index % m_QueueCount].Enqueue(callback); +} + +void ParallelWorkQueue::Join(void) +{ + for (unsigned int i = 0; i < m_QueueCount; i++) + m_Queues[i].Join(); } diff --git a/lib/base/workqueue.h b/lib/base/workqueue.h index 56b2c8df6..4e92d2b27 100644 --- a/lib/base/workqueue.h +++ b/lib/base/workqueue.h @@ -55,7 +55,7 @@ public: ~WorkQueue(void); void Enqueue(const WorkCallback& callback, bool allowInterleaved = false); - void Join(void); + void Join(bool stop = false); boost::thread::id GetThreadId(void) const; @@ -68,9 +68,9 @@ private: boost::mutex m_Mutex; boost::condition_variable m_CVEmpty; boost::condition_variable m_CVFull; + boost::condition_variable m_CVStarved; boost::thread m_Thread; size_t m_MaxItems; - bool m_Joined; bool m_Stopped; std::deque m_Items; ExceptionCallback m_ExceptionCallback; @@ -83,6 +83,21 @@ private: static void DefaultExceptionCallback(boost::exception_ptr exp); }; +class I2_BASE_API ParallelWorkQueue +{ +public: + ParallelWorkQueue(void); + ~ParallelWorkQueue(void); + + void Enqueue(const boost::function& callback); + void Join(void); + +private: + unsigned int m_QueueCount; + WorkQueue *m_Queues; + unsigned int m_Index; +}; + } #endif /* WORKQUEUE_H */ diff --git a/lib/config/configitem.cpp b/lib/config/configitem.cpp index e9756141d..7b03b805a 100644 --- a/lib/config/configitem.cpp +++ b/lib/config/configitem.cpp @@ -24,6 +24,7 @@ #include "base/objectlock.h" #include "base/logger_fwd.h" #include "base/debug.h" +#include "base/workqueue.h" #include #include @@ -258,13 +259,13 @@ bool ConfigItem::ActivateItems(bool validateOnly) Log(LogInformation, "config", "Validating config items (step 1)..."); - ThreadPool tp(32); + ParallelWorkQueue upq; BOOST_FOREACH(const ItemMap::value_type& kv, m_Items) { - tp.Post(boost::bind(&ConfigItem::ValidateItem, kv.second)); + upq.Enqueue(boost::bind(&ConfigItem::ValidateItem, kv.second)); } - tp.Join(); + upq.Join(); if (ConfigCompilerContext::GetInstance()->HasErrors()) return false; @@ -272,10 +273,10 @@ bool ConfigItem::ActivateItems(bool validateOnly) Log(LogInformation, "config", "Comitting config items"); BOOST_FOREACH(const ItemMap::value_type& kv, m_Items) { - tp.Post(boost::bind(&ConfigItem::Commit, kv.second)); + upq.Enqueue(boost::bind(&ConfigItem::Commit, kv.second)); } - tp.Join(); + upq.Join(); std::vector objects; BOOST_FOREACH(const ItemMap::value_type& kv, m_Items) { @@ -288,18 +289,18 @@ bool ConfigItem::ActivateItems(bool validateOnly) Log(LogInformation, "config", "Triggering OnConfigLoaded signal for config items"); BOOST_FOREACH(const DynamicObject::Ptr& object, objects) { - tp.Post(boost::bind(&DynamicObject::OnConfigLoaded, object)); + upq.Enqueue(boost::bind(&DynamicObject::OnConfigLoaded, object)); } - tp.Join(); + upq.Join(); Log(LogInformation, "config", "Validating config items (step 2)..."); BOOST_FOREACH(const ItemMap::value_type& kv, m_Items) { - tp.Post(boost::bind(&ConfigItem::ValidateItem, kv.second)); + upq.Enqueue(boost::bind(&ConfigItem::ValidateItem, kv.second)); } - tp.Join(); + upq.Join(); if (ConfigCompilerContext::GetInstance()->HasErrors()) return false; @@ -320,11 +321,11 @@ bool ConfigItem::ActivateItems(bool validateOnly) #ifdef _DEBUG Log(LogDebug, "config", "Activating object '" + object->GetName() + "' of type '" + object->GetType()->GetName() + "'"); #endif /* _DEBUG */ - tp.Post(boost::bind(&DynamicObject::Start, object)); + upq.Enqueue(boost::bind(&DynamicObject::Start, object)); } } - tp.Join(); + upq.Join(); #ifdef _DEBUG BOOST_FOREACH(const DynamicType::Ptr& type, DynamicType::GetTypes()) {