mirror of https://github.com/Icinga/icinga2.git
parent
939b2dd40f
commit
456cfdc636
|
@ -33,8 +33,11 @@ boost::thread_specific_ptr<WorkQueue *> l_ThreadWorkQueue;
|
||||||
|
|
||||||
WorkQueue::WorkQueue(size_t maxItems, int threadCount)
|
WorkQueue::WorkQueue(size_t maxItems, int threadCount)
|
||||||
: m_ID(m_NextID++), m_ThreadCount(threadCount), m_Spawned(false), m_MaxItems(maxItems), m_Stopped(false),
|
: m_ID(m_NextID++), m_ThreadCount(threadCount), m_Spawned(false), m_MaxItems(maxItems), m_Stopped(false),
|
||||||
m_Processing(0), m_NextTaskID(0)
|
m_Processing(0), m_NextTaskID(0), m_TaskStats(15 * 60), m_PendingTasks(0), m_PendingTasksTimestamp(0)
|
||||||
{
|
{
|
||||||
|
/* Initialize logger. */
|
||||||
|
m_StatusTimerTimeout = Utility::GetTime();
|
||||||
|
|
||||||
m_StatusTimer = new Timer();
|
m_StatusTimer = new Timer();
|
||||||
m_StatusTimer->SetInterval(10);
|
m_StatusTimer->SetInterval(10);
|
||||||
m_StatusTimer->OnTimerExpired.connect(boost::bind(&WorkQueue::StatusTimerHandler, this));
|
m_StatusTimer->OnTimerExpired.connect(boost::bind(&WorkQueue::StatusTimerHandler, this));
|
||||||
|
@ -192,14 +195,41 @@ void WorkQueue::StatusTimerHandler(void)
|
||||||
{
|
{
|
||||||
boost::mutex::scoped_lock lock(m_Mutex);
|
boost::mutex::scoped_lock lock(m_Mutex);
|
||||||
|
|
||||||
Log log(LogNotice, "WorkQueue");
|
ASSERT(!m_Name.IsEmpty());
|
||||||
|
|
||||||
log << "#" << m_ID;
|
int pending = m_Tasks.size();
|
||||||
|
|
||||||
if (!m_Name.IsEmpty())
|
double now = Utility::GetTime();
|
||||||
log << " (" << m_Name << ")";
|
double gradient = (pending - m_PendingTasks) / (now - m_PendingTasksTimestamp);
|
||||||
|
double timeToZero = pending / gradient;
|
||||||
|
|
||||||
log << " tasks: " << m_Tasks.size();
|
String timeInfo;
|
||||||
|
|
||||||
|
if (pending > GetTaskCount(5)) {
|
||||||
|
timeInfo = " empty in ";
|
||||||
|
if (timeToZero < 0)
|
||||||
|
timeInfo += "infinite time, your task handler isn't able to keep up";
|
||||||
|
else
|
||||||
|
timeInfo += Utility::FormatDuration(timeToZero);
|
||||||
|
}
|
||||||
|
|
||||||
|
m_PendingTasks = pending;
|
||||||
|
m_PendingTasksTimestamp = now;
|
||||||
|
|
||||||
|
/* Log if there are pending items, or 5 minute timeout is reached. */
|
||||||
|
if (pending > 0 || m_StatusTimerTimeout < now) {
|
||||||
|
Log(LogInformation, "WorkQueue")
|
||||||
|
<< "#" << m_ID << " (" << m_Name << ") "
|
||||||
|
<< "items: " << pending << ", "
|
||||||
|
<< "rate: " << std::setw(2) << GetTaskCount(60) / 60.0 << "/s "
|
||||||
|
<< "(" << GetTaskCount(60) << "/min " << GetTaskCount(60 * 5) << "/5min " << GetTaskCount(60 * 15) << "/15min);"
|
||||||
|
<< timeInfo;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Reschedule next log entry in 5 minutes. */
|
||||||
|
if (m_StatusTimerTimeout < now) {
|
||||||
|
m_StatusTimerTimeout = now + 60 * 5;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void WorkQueue::WorkerThreadProc(void)
|
void WorkQueue::WorkerThreadProc(void)
|
||||||
|
@ -247,6 +277,8 @@ void WorkQueue::WorkerThreadProc(void)
|
||||||
_before_ we re-acquire the mutex */
|
_before_ we re-acquire the mutex */
|
||||||
task = Task();
|
task = Task();
|
||||||
|
|
||||||
|
IncreaseTaskCount();
|
||||||
|
|
||||||
lock.lock();
|
lock.lock();
|
||||||
|
|
||||||
m_Processing--;
|
m_Processing--;
|
||||||
|
@ -256,3 +288,16 @@ void WorkQueue::WorkerThreadProc(void)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void WorkQueue::IncreaseTaskCount(void)
|
||||||
|
{
|
||||||
|
double now = Utility::GetTime();
|
||||||
|
|
||||||
|
boost::mutex::scoped_lock lock(m_StatsMutex);
|
||||||
|
m_TaskStats.InsertValue(now, 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
int WorkQueue::GetTaskCount(RingBuffer::SizeType span) const
|
||||||
|
{
|
||||||
|
boost::mutex::scoped_lock lock(m_StatsMutex);
|
||||||
|
return m_TaskStats.GetValues(span);
|
||||||
|
}
|
||||||
|
|
|
@ -22,6 +22,7 @@
|
||||||
|
|
||||||
#include "base/i2-base.hpp"
|
#include "base/i2-base.hpp"
|
||||||
#include "base/timer.hpp"
|
#include "base/timer.hpp"
|
||||||
|
#include "base/ringbuffer.hpp"
|
||||||
#include <boost/function.hpp>
|
#include <boost/function.hpp>
|
||||||
#include <boost/thread/thread.hpp>
|
#include <boost/thread/thread.hpp>
|
||||||
#include <boost/thread/mutex.hpp>
|
#include <boost/thread/mutex.hpp>
|
||||||
|
@ -93,6 +94,7 @@ public:
|
||||||
bool IsWorkerThread(void) const;
|
bool IsWorkerThread(void) const;
|
||||||
|
|
||||||
size_t GetLength(void) const;
|
size_t GetLength(void) const;
|
||||||
|
int GetTaskCount(RingBuffer::SizeType span) const;
|
||||||
|
|
||||||
void SetExceptionCallback(const ExceptionCallback& callback);
|
void SetExceptionCallback(const ExceptionCallback& callback);
|
||||||
|
|
||||||
|
@ -100,6 +102,9 @@ public:
|
||||||
std::vector<boost::exception_ptr> GetExceptions(void) const;
|
std::vector<boost::exception_ptr> GetExceptions(void) const;
|
||||||
void ReportExceptions(const String& facility) const;
|
void ReportExceptions(const String& facility) const;
|
||||||
|
|
||||||
|
protected:
|
||||||
|
void IncreaseTaskCount(void);
|
||||||
|
|
||||||
private:
|
private:
|
||||||
int m_ID;
|
int m_ID;
|
||||||
String m_Name;
|
String m_Name;
|
||||||
|
@ -120,6 +125,12 @@ private:
|
||||||
ExceptionCallback m_ExceptionCallback;
|
ExceptionCallback m_ExceptionCallback;
|
||||||
std::vector<boost::exception_ptr> m_Exceptions;
|
std::vector<boost::exception_ptr> m_Exceptions;
|
||||||
Timer::Ptr m_StatusTimer;
|
Timer::Ptr m_StatusTimer;
|
||||||
|
double m_StatusTimerTimeout;
|
||||||
|
|
||||||
|
mutable boost::mutex m_StatsMutex;
|
||||||
|
RingBuffer m_TaskStats;
|
||||||
|
int m_PendingTasks;
|
||||||
|
double m_PendingTasksTimestamp;
|
||||||
|
|
||||||
void WorkerThreadProc(void);
|
void WorkerThreadProc(void);
|
||||||
void StatusTimerHandler(void);
|
void StatusTimerHandler(void);
|
||||||
|
|
Loading…
Reference in New Issue