diff --git a/lib/base/workqueue.cpp b/lib/base/workqueue.cpp index 67e51c397..f84b753c8 100644 --- a/lib/base/workqueue.cpp +++ b/lib/base/workqueue.cpp @@ -30,10 +30,14 @@ int WorkQueue::m_NextID = 1; WorkQueue::WorkQueue(size_t maxItems) : m_ID(m_NextID++), m_MaxItems(maxItems), m_Joined(false), - m_Stopped(false), m_ExceptionCallback(WorkQueue::DefaultExceptionCallback), - m_LastStatus(0) + m_Stopped(false), m_ExceptionCallback(WorkQueue::DefaultExceptionCallback) { m_Thread = boost::thread(boost::bind(&WorkQueue::WorkerThreadProc, this)); + + m_StatusTimer = make_shared(); + m_StatusTimer->SetInterval(10); + m_StatusTimer->OnTimerExpired.connect(boost::bind(&WorkQueue::StatusTimerHandler, this)); + m_StatusTimer->Start(); } WorkQueue::~WorkQueue(void) @@ -49,37 +53,37 @@ WorkQueue::~WorkQueue(void) */ void WorkQueue::Enqueue(const WorkCallback& callback, bool allowInterleaved) { + WorkItem item; + item.Callback = callback; + item.AllowInterleaved = allowInterleaved; + + bool wq_thread = (boost::this_thread::get_id() == GetThreadId()); + boost::mutex::scoped_lock lock(m_Mutex); ASSERT(!m_Stopped); - bool wq_thread = (boost::this_thread::get_id() == GetThreadId()); - if (!wq_thread) { while (m_Items.size() >= m_MaxItems) - m_CV.wait(lock); + m_CVFull.wait(lock); } - WorkItem item; - item.Callback = callback; - item.AllowInterleaved = allowInterleaved; - m_Items.push_back(item); if (wq_thread) ProcessItems(lock, true); else - m_CV.notify_all(); + m_CVEmpty.notify_one(); } void WorkQueue::Join(void) { boost::mutex::scoped_lock lock(m_Mutex); m_Joined = true; - m_CV.notify_all(); + m_CVEmpty.notify_all(); while (!m_Stopped) - m_CV.wait(lock); + m_CVFull.wait(lock); } boost::thread::id WorkQueue::GetThreadId(void) const @@ -99,26 +103,27 @@ void WorkQueue::DefaultExceptionCallback(boost::exception_ptr exp) throw; } +void WorkQueue::StatusTimerHandler(void) +{ + boost::mutex::scoped_lock lock(m_Mutex); + + Log(LogInformation, "base", "WQ #" + Convert::ToString(m_ID) + " items: " + Convert::ToString(m_Items.size())); +} + void WorkQueue::ProcessItems(boost::mutex::scoped_lock& lock, bool interleaved) { while (!m_Items.empty()) { + WorkItem wi = m_Items.front(); + + if (interleaved && !wi.AllowInterleaved) + return; + + m_Items.pop_front(); + m_CVFull.notify_all(); + + lock.unlock(); + try { - WorkItem wi = m_Items.front(); - - if (interleaved && !wi.AllowInterleaved) - return; - - m_Items.pop_front(); - m_CV.notify_all(); - - double now = Utility::GetTime(); - - if (m_LastStatus + 10 < now) { - Log(LogInformation, "base", "WQ items: " + Convert::ToString(m_Items.size())); - m_LastStatus = now; - } - - lock.unlock(); wi.Callback(); } catch (const std::exception& ex) { lock.lock(); @@ -144,7 +149,7 @@ void WorkQueue::WorkerThreadProc(void) for (;;) { while (m_Items.empty() && !m_Joined) - m_CV.wait(lock); + m_CVEmpty.wait(lock); if (m_Joined) break; @@ -153,5 +158,5 @@ void WorkQueue::WorkerThreadProc(void) } m_Stopped = true; - m_CV.notify_all(); + m_CVFull.notify_all(); } diff --git a/lib/base/workqueue.h b/lib/base/workqueue.h index 5cfce709f..56b2c8df6 100644 --- a/lib/base/workqueue.h +++ b/lib/base/workqueue.h @@ -21,6 +21,7 @@ #define WORKQUEUE_H #include "base/i2-base.h" +#include "base/timer.h" #include #include #include @@ -50,7 +51,7 @@ class I2_BASE_API WorkQueue public: typedef boost::function ExceptionCallback; - WorkQueue(size_t maxItems = 2500000); + WorkQueue(size_t maxItems = 25000); ~WorkQueue(void); void Enqueue(const WorkCallback& callback, bool allowInterleaved = false); @@ -65,17 +66,19 @@ private: static int m_NextID; boost::mutex m_Mutex; - boost::condition_variable m_CV; + boost::condition_variable m_CVEmpty; + boost::condition_variable m_CVFull; boost::thread m_Thread; size_t m_MaxItems; bool m_Joined; bool m_Stopped; std::deque m_Items; ExceptionCallback m_ExceptionCallback; - double m_LastStatus; + Timer::Ptr m_StatusTimer; void ProcessItems(boost::mutex::scoped_lock& lock, bool interleaved); void WorkerThreadProc(void); + void StatusTimerHandler(void); static void DefaultExceptionCallback(boost::exception_ptr exp); };