Implement ParallelWorkQueue class.

Refs #5327
This commit is contained in:
Gunnar Beutner 2013-12-10 21:44:38 +01:00 committed by Gunnar Beutner
parent b5792a0b81
commit b4f2f06b88
3 changed files with 78 additions and 35 deletions

View File

@ -29,8 +29,8 @@ using namespace icinga;
int WorkQueue::m_NextID = 1; int WorkQueue::m_NextID = 1;
WorkQueue::WorkQueue(size_t maxItems) WorkQueue::WorkQueue(size_t maxItems)
: m_ID(m_NextID++), m_MaxItems(maxItems), m_Joined(false), : m_ID(m_NextID++), m_MaxItems(maxItems), m_Stopped(false),
m_Stopped(false), m_ExceptionCallback(WorkQueue::DefaultExceptionCallback) m_ExceptionCallback(WorkQueue::DefaultExceptionCallback)
{ {
m_Thread = boost::thread(boost::bind(&WorkQueue::WorkerThreadProc, this)); m_Thread = boost::thread(boost::bind(&WorkQueue::WorkerThreadProc, this));
@ -42,9 +42,7 @@ WorkQueue::WorkQueue(size_t maxItems)
WorkQueue::~WorkQueue(void) WorkQueue::~WorkQueue(void)
{ {
Join(); Join(true);
ASSERT(m_Stopped);
} }
/** /**
@ -61,8 +59,6 @@ void WorkQueue::Enqueue(const WorkCallback& callback, bool allowInterleaved)
boost::mutex::scoped_lock lock(m_Mutex); boost::mutex::scoped_lock lock(m_Mutex);
ASSERT(!m_Stopped);
if (!wq_thread) { if (!wq_thread) {
while (m_Items.size() >= m_MaxItems) while (m_Items.size() >= m_MaxItems)
m_CVFull.wait(lock); m_CVFull.wait(lock);
@ -72,18 +68,24 @@ void WorkQueue::Enqueue(const WorkCallback& callback, bool allowInterleaved)
if (wq_thread) if (wq_thread)
ProcessItems(lock, true); ProcessItems(lock, true);
else else if (m_Items.size() == 1)
m_CVEmpty.notify_all(); m_CVEmpty.notify_all();
} }
void WorkQueue::Join(void) void WorkQueue::Join(bool stop)
{ {
boost::mutex::scoped_lock lock(m_Mutex); boost::mutex::scoped_lock lock(m_Mutex);
m_Joined = true;
m_CVEmpty.notify_all();
while (!m_Stopped) while (!m_Items.empty())
m_CVFull.wait(lock); m_CVStarved.wait(lock);
if (stop) {
m_Stopped = true;
m_CVEmpty.notify_all();
lock.unlock();
m_Thread.join();
}
} }
boost::thread::id WorkQueue::GetThreadId(void) const boost::thread::id WorkQueue::GetThreadId(void) const
@ -118,14 +120,11 @@ void WorkQueue::ProcessItems(boost::mutex::scoped_lock& lock, bool interleaved)
if (interleaved && !wi.AllowInterleaved) if (interleaved && !wi.AllowInterleaved)
return; return;
m_Items.pop_front();
m_CVFull.notify_one();
lock.unlock(); lock.unlock();
try { try {
wi.Callback(); wi.Callback();
} catch (const std::exception& ex) { } catch (const std::exception&) {
lock.lock(); lock.lock();
ExceptionCallback callback = m_ExceptionCallback; ExceptionCallback callback = m_ExceptionCallback;
@ -136,7 +135,14 @@ void WorkQueue::ProcessItems(boost::mutex::scoped_lock& lock, bool interleaved)
} }
lock.lock(); lock.lock();
m_Items.pop_front();
if (m_Items.size() + 1 == m_MaxItems)
m_CVFull.notify_one();
} }
m_CVStarved.notify_all();
} }
void WorkQueue::WorkerThreadProc(void) void WorkQueue::WorkerThreadProc(void)
@ -148,15 +154,36 @@ void WorkQueue::WorkerThreadProc(void)
Utility::SetThreadName(idbuf.str()); Utility::SetThreadName(idbuf.str());
for (;;) { for (;;) {
while (m_Items.empty() && !m_Joined) while (m_Items.empty() && !m_Stopped)
m_CVEmpty.wait(lock); m_CVEmpty.wait(lock);
if (m_Joined) if (m_Stopped)
break; break;
ProcessItems(lock, false); ProcessItems(lock, false);
} }
}
m_Stopped = true;
m_CVFull.notify_all();
ParallelWorkQueue::ParallelWorkQueue(void)
: m_QueueCount(boost::thread::hardware_concurrency()),
m_Queues(new WorkQueue[m_QueueCount]),
m_Index(0)
{ }
ParallelWorkQueue::~ParallelWorkQueue(void)
{
delete[] m_Queues;
}
void ParallelWorkQueue::Enqueue(const boost::function<void(void)>& callback)
{
m_Index++;
m_Queues[m_Index % m_QueueCount].Enqueue(callback);
}
void ParallelWorkQueue::Join(void)
{
for (unsigned int i = 0; i < m_QueueCount; i++)
m_Queues[i].Join();
} }

View File

@ -55,7 +55,7 @@ public:
~WorkQueue(void); ~WorkQueue(void);
void Enqueue(const WorkCallback& callback, bool allowInterleaved = false); void Enqueue(const WorkCallback& callback, bool allowInterleaved = false);
void Join(void); void Join(bool stop = false);
boost::thread::id GetThreadId(void) const; boost::thread::id GetThreadId(void) const;
@ -68,9 +68,9 @@ private:
boost::mutex m_Mutex; boost::mutex m_Mutex;
boost::condition_variable m_CVEmpty; boost::condition_variable m_CVEmpty;
boost::condition_variable m_CVFull; boost::condition_variable m_CVFull;
boost::condition_variable m_CVStarved;
boost::thread m_Thread; boost::thread m_Thread;
size_t m_MaxItems; size_t m_MaxItems;
bool m_Joined;
bool m_Stopped; bool m_Stopped;
std::deque<WorkItem> m_Items; std::deque<WorkItem> m_Items;
ExceptionCallback m_ExceptionCallback; ExceptionCallback m_ExceptionCallback;
@ -83,6 +83,21 @@ private:
static void DefaultExceptionCallback(boost::exception_ptr exp); static void DefaultExceptionCallback(boost::exception_ptr exp);
}; };
class I2_BASE_API ParallelWorkQueue
{
public:
ParallelWorkQueue(void);
~ParallelWorkQueue(void);
void Enqueue(const boost::function<void(void)>& callback);
void Join(void);
private:
unsigned int m_QueueCount;
WorkQueue *m_Queues;
unsigned int m_Index;
};
} }
#endif /* WORKQUEUE_H */ #endif /* WORKQUEUE_H */

View File

@ -24,6 +24,7 @@
#include "base/objectlock.h" #include "base/objectlock.h"
#include "base/logger_fwd.h" #include "base/logger_fwd.h"
#include "base/debug.h" #include "base/debug.h"
#include "base/workqueue.h"
#include <sstream> #include <sstream>
#include <boost/foreach.hpp> #include <boost/foreach.hpp>
@ -258,13 +259,13 @@ bool ConfigItem::ActivateItems(bool validateOnly)
Log(LogInformation, "config", "Validating config items (step 1)..."); Log(LogInformation, "config", "Validating config items (step 1)...");
ThreadPool tp(32); ParallelWorkQueue upq;
BOOST_FOREACH(const ItemMap::value_type& kv, m_Items) { BOOST_FOREACH(const ItemMap::value_type& kv, m_Items) {
tp.Post(boost::bind(&ConfigItem::ValidateItem, kv.second)); upq.Enqueue(boost::bind(&ConfigItem::ValidateItem, kv.second));
} }
tp.Join(); upq.Join();
if (ConfigCompilerContext::GetInstance()->HasErrors()) if (ConfigCompilerContext::GetInstance()->HasErrors())
return false; return false;
@ -272,10 +273,10 @@ bool ConfigItem::ActivateItems(bool validateOnly)
Log(LogInformation, "config", "Comitting config items"); Log(LogInformation, "config", "Comitting config items");
BOOST_FOREACH(const ItemMap::value_type& kv, m_Items) { BOOST_FOREACH(const ItemMap::value_type& kv, m_Items) {
tp.Post(boost::bind(&ConfigItem::Commit, kv.second)); upq.Enqueue(boost::bind(&ConfigItem::Commit, kv.second));
} }
tp.Join(); upq.Join();
std::vector<DynamicObject::Ptr> objects; std::vector<DynamicObject::Ptr> objects;
BOOST_FOREACH(const ItemMap::value_type& kv, m_Items) { BOOST_FOREACH(const ItemMap::value_type& kv, m_Items) {
@ -288,18 +289,18 @@ bool ConfigItem::ActivateItems(bool validateOnly)
Log(LogInformation, "config", "Triggering OnConfigLoaded signal for config items"); Log(LogInformation, "config", "Triggering OnConfigLoaded signal for config items");
BOOST_FOREACH(const DynamicObject::Ptr& object, objects) { BOOST_FOREACH(const DynamicObject::Ptr& object, objects) {
tp.Post(boost::bind(&DynamicObject::OnConfigLoaded, object)); upq.Enqueue(boost::bind(&DynamicObject::OnConfigLoaded, object));
} }
tp.Join(); upq.Join();
Log(LogInformation, "config", "Validating config items (step 2)..."); Log(LogInformation, "config", "Validating config items (step 2)...");
BOOST_FOREACH(const ItemMap::value_type& kv, m_Items) { BOOST_FOREACH(const ItemMap::value_type& kv, m_Items) {
tp.Post(boost::bind(&ConfigItem::ValidateItem, kv.second)); upq.Enqueue(boost::bind(&ConfigItem::ValidateItem, kv.second));
} }
tp.Join(); upq.Join();
if (ConfigCompilerContext::GetInstance()->HasErrors()) if (ConfigCompilerContext::GetInstance()->HasErrors())
return false; return false;
@ -320,11 +321,11 @@ bool ConfigItem::ActivateItems(bool validateOnly)
#ifdef _DEBUG #ifdef _DEBUG
Log(LogDebug, "config", "Activating object '" + object->GetName() + "' of type '" + object->GetType()->GetName() + "'"); Log(LogDebug, "config", "Activating object '" + object->GetName() + "' of type '" + object->GetType()->GetName() + "'");
#endif /* _DEBUG */ #endif /* _DEBUG */
tp.Post(boost::bind(&DynamicObject::Start, object)); upq.Enqueue(boost::bind(&DynamicObject::Start, object));
} }
} }
tp.Join(); upq.Join();
#ifdef _DEBUG #ifdef _DEBUG
BOOST_FOREACH(const DynamicType::Ptr& type, DynamicType::GetTypes()) { BOOST_FOREACH(const DynamicType::Ptr& type, DynamicType::GetTypes()) {