From 456cfdc63616991da2fc4096584e9a8295027ca9 Mon Sep 17 00:00:00 2001 From: Michael Friedrich Date: Tue, 16 May 2017 12:54:37 +0200 Subject: [PATCH] Implement WorkQueue metric stats and periodic logging refs #5133 --- lib/base/workqueue.cpp | 57 +++++++++++++++++++++++++++++++++++++----- lib/base/workqueue.hpp | 11 ++++++++ 2 files changed, 62 insertions(+), 6 deletions(-) diff --git a/lib/base/workqueue.cpp b/lib/base/workqueue.cpp index 04bf2f539..f3d2a6c32 100644 --- a/lib/base/workqueue.cpp +++ b/lib/base/workqueue.cpp @@ -33,8 +33,11 @@ boost::thread_specific_ptr l_ThreadWorkQueue; WorkQueue::WorkQueue(size_t maxItems, int threadCount) : m_ID(m_NextID++), m_ThreadCount(threadCount), m_Spawned(false), m_MaxItems(maxItems), m_Stopped(false), - m_Processing(0), m_NextTaskID(0) + m_Processing(0), m_NextTaskID(0), m_TaskStats(15 * 60), m_PendingTasks(0), m_PendingTasksTimestamp(0) { + /* Initialize logger. */ + m_StatusTimerTimeout = Utility::GetTime(); + m_StatusTimer = new Timer(); m_StatusTimer->SetInterval(10); m_StatusTimer->OnTimerExpired.connect(boost::bind(&WorkQueue::StatusTimerHandler, this)); @@ -192,14 +195,41 @@ void WorkQueue::StatusTimerHandler(void) { boost::mutex::scoped_lock lock(m_Mutex); - Log log(LogNotice, "WorkQueue"); + ASSERT(!m_Name.IsEmpty()); - log << "#" << m_ID; + int pending = m_Tasks.size(); - if (!m_Name.IsEmpty()) - log << " (" << m_Name << ")"; + double now = Utility::GetTime(); + double gradient = (pending - m_PendingTasks) / (now - m_PendingTasksTimestamp); + double timeToZero = pending / gradient; - log << " tasks: " << m_Tasks.size(); + String timeInfo; + + if (pending > GetTaskCount(5)) { + timeInfo = " empty in "; + if (timeToZero < 0) + timeInfo += "infinite time, your task handler isn't able to keep up"; + else + timeInfo += Utility::FormatDuration(timeToZero); + } + + m_PendingTasks = pending; + m_PendingTasksTimestamp = now; + + /* Log if there are pending items, or 5 minute timeout is reached. */ + if (pending > 0 || m_StatusTimerTimeout < now) { + Log(LogInformation, "WorkQueue") + << "#" << m_ID << " (" << m_Name << ") " + << "items: " << pending << ", " + << "rate: " << std::setw(2) << GetTaskCount(60) / 60.0 << "/s " + << "(" << GetTaskCount(60) << "/min " << GetTaskCount(60 * 5) << "/5min " << GetTaskCount(60 * 15) << "/15min);" + << timeInfo; + } + + /* Reschedule next log entry in 5 minutes. */ + if (m_StatusTimerTimeout < now) { + m_StatusTimerTimeout = now + 60 * 5; + } } void WorkQueue::WorkerThreadProc(void) @@ -247,6 +277,8 @@ void WorkQueue::WorkerThreadProc(void) _before_ we re-acquire the mutex */ task = Task(); + IncreaseTaskCount(); + lock.lock(); m_Processing--; @@ -256,3 +288,16 @@ void WorkQueue::WorkerThreadProc(void) } } +void WorkQueue::IncreaseTaskCount(void) +{ + double now = Utility::GetTime(); + + boost::mutex::scoped_lock lock(m_StatsMutex); + m_TaskStats.InsertValue(now, 1); +} + +int WorkQueue::GetTaskCount(RingBuffer::SizeType span) const +{ + boost::mutex::scoped_lock lock(m_StatsMutex); + return m_TaskStats.GetValues(span); +} diff --git a/lib/base/workqueue.hpp b/lib/base/workqueue.hpp index 5fdb62579..d9ec53bdd 100644 --- a/lib/base/workqueue.hpp +++ b/lib/base/workqueue.hpp @@ -22,6 +22,7 @@ #include "base/i2-base.hpp" #include "base/timer.hpp" +#include "base/ringbuffer.hpp" #include #include #include @@ -93,6 +94,7 @@ public: bool IsWorkerThread(void) const; size_t GetLength(void) const; + int GetTaskCount(RingBuffer::SizeType span) const; void SetExceptionCallback(const ExceptionCallback& callback); @@ -100,6 +102,9 @@ public: std::vector GetExceptions(void) const; void ReportExceptions(const String& facility) const; +protected: + void IncreaseTaskCount(void); + private: int m_ID; String m_Name; @@ -120,6 +125,12 @@ private: ExceptionCallback m_ExceptionCallback; std::vector m_Exceptions; Timer::Ptr m_StatusTimer; + double m_StatusTimerTimeout; + + mutable boost::mutex m_StatsMutex; + RingBuffer m_TaskStats; + int m_PendingTasks; + double m_PendingTasksTimestamp; void WorkerThreadProc(void); void StatusTimerHandler(void);