base: Limit work queue size.

This commit is contained in:
Gunnar Beutner 2013-09-21 17:53:14 +02:00
parent 3c3101336a
commit 137c726920
2 changed files with 11 additions and 0 deletions

View File

@ -23,6 +23,10 @@
using namespace icinga; using namespace icinga;
WorkQueue::WorkQueue(size_t maxItems)
: m_MaxItems(maxItems)
{ }
WorkQueue::~WorkQueue(void) WorkQueue::~WorkQueue(void)
{ {
Join(); Join();
@ -35,6 +39,10 @@ WorkQueue::~WorkQueue(void)
void WorkQueue::Enqueue(const WorkCallback& item) void WorkQueue::Enqueue(const WorkCallback& item)
{ {
boost::mutex::scoped_lock lock(m_Mutex); boost::mutex::scoped_lock lock(m_Mutex);
while (m_Items.size() >= m_MaxItems)
m_CV.wait(lock);
m_Items.push_back(item); m_Items.push_back(item);
m_CV.notify_all(); m_CV.notify_all();
@ -66,6 +74,7 @@ void WorkQueue::ExecuteItem(void)
try { try {
WorkCallback wi = m_Items.front(); WorkCallback wi = m_Items.front();
m_Items.pop_front(); m_Items.pop_front();
m_CV.notify_all();
lock.unlock(); lock.unlock();
wi(); wi();

View File

@ -39,6 +39,7 @@ class I2_BASE_API WorkQueue
public: public:
typedef boost::function<void (void)> WorkCallback; typedef boost::function<void (void)> WorkCallback;
WorkQueue(size_t maxItems = 25000);
~WorkQueue(void); ~WorkQueue(void);
void Enqueue(const WorkCallback& item); void Enqueue(const WorkCallback& item);
@ -48,6 +49,7 @@ public:
private: private:
boost::mutex m_Mutex; boost::mutex m_Mutex;
boost::condition_variable m_CV; boost::condition_variable m_CV;
size_t m_MaxItems;
bool m_Executing; bool m_Executing;
std::deque<WorkCallback> m_Items; std::deque<WorkCallback> m_Items;