mirror of https://github.com/Icinga/icinga2.git
More EventQueue tuning.
This commit is contained in:
parent
8b065b36df
commit
167be058f0
|
@ -37,8 +37,8 @@ EventQueue::EventQueue(void)
|
||||||
for (int i = 0; i < 2; i++)
|
for (int i = 0; i < 2; i++)
|
||||||
SpawnWorker();
|
SpawnWorker();
|
||||||
|
|
||||||
boost::thread reportThread(boost::bind(&EventQueue::ReportThreadProc, this));
|
boost::thread managerThread(boost::bind(&EventQueue::ManagerThreadProc, this));
|
||||||
reportThread.detach();
|
managerThread.detach();
|
||||||
}
|
}
|
||||||
|
|
||||||
EventQueue::~EventQueue(void)
|
EventQueue::~EventQueue(void)
|
||||||
|
@ -97,8 +97,13 @@ void EventQueue::QueueThreadProc(int tid)
|
||||||
|
|
||||||
m_ThreadStates[tid] = ThreadBusy;
|
m_ThreadStates[tid] = ThreadBusy;
|
||||||
|
|
||||||
m_Latency += Utility::GetTime() - event.Timestamp;
|
double latency = Utility::GetTime() - event.Timestamp;
|
||||||
|
|
||||||
|
m_Latency += latency;
|
||||||
m_LatencyCount++;
|
m_LatencyCount++;
|
||||||
|
|
||||||
|
if (latency > m_MaxLatency)
|
||||||
|
m_MaxLatency = latency;
|
||||||
}
|
}
|
||||||
|
|
||||||
#ifdef _DEBUG
|
#ifdef _DEBUG
|
||||||
|
@ -178,7 +183,7 @@ void EventQueue::Post(const EventQueueCallback& callback)
|
||||||
m_CV.notify_one();
|
m_CV.notify_one();
|
||||||
}
|
}
|
||||||
|
|
||||||
void EventQueue::ReportThreadProc(void)
|
void EventQueue::ManagerThreadProc(void)
|
||||||
{
|
{
|
||||||
for (;;) {
|
for (;;) {
|
||||||
Utility::Sleep(5);
|
Utility::Sleep(5);
|
||||||
|
@ -186,7 +191,7 @@ void EventQueue::ReportThreadProc(void)
|
||||||
double now = Utility::GetTime();
|
double now = Utility::GetTime();
|
||||||
|
|
||||||
int pending, alive, busy;
|
int pending, alive, busy;
|
||||||
double avg_latency;
|
double avg_latency, max_latency;
|
||||||
|
|
||||||
{
|
{
|
||||||
boost::mutex::scoped_lock lock(m_Mutex);
|
boost::mutex::scoped_lock lock(m_Mutex);
|
||||||
|
@ -211,7 +216,10 @@ void EventQueue::ReportThreadProc(void)
|
||||||
m_Latency = 0;
|
m_Latency = 0;
|
||||||
m_LatencyCount = 0;
|
m_LatencyCount = 0;
|
||||||
|
|
||||||
if (pending > alive - busy) {
|
max_latency = m_MaxLatency;
|
||||||
|
m_MaxLatency = 0;
|
||||||
|
|
||||||
|
if (max_latency > 0.1) {
|
||||||
/* Spawn a few additional workers. */
|
/* Spawn a few additional workers. */
|
||||||
for (int i = 0; i < 8; i++)
|
for (int i = 0; i < 8; i++)
|
||||||
SpawnWorker();
|
SpawnWorker();
|
||||||
|
@ -221,7 +229,9 @@ void EventQueue::ReportThreadProc(void)
|
||||||
}
|
}
|
||||||
|
|
||||||
std::ostringstream msgbuf;
|
std::ostringstream msgbuf;
|
||||||
msgbuf << "Pending tasks: " << pending << "; Busy threads: " << busy << "; Idle threads: " << alive - busy << "; Average latency: " << (long)(avg_latency * 1000) << "ms";
|
msgbuf << "Pending tasks: " << pending << "; Busy threads: " << busy
|
||||||
|
<< "; Idle threads: " << alive - busy << "; Average latency: " << (long)(avg_latency * 1000) << "ms"
|
||||||
|
<< "; Max latency: " << (long)(max_latency * 1000) << "ms";
|
||||||
Log(LogInformation, "base", msgbuf.str());
|
Log(LogInformation, "base", msgbuf.str());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -233,7 +243,7 @@ void EventQueue::SpawnWorker(void)
|
||||||
{
|
{
|
||||||
for (int i = 0; i < sizeof(m_ThreadStates) / sizeof(m_ThreadStates[0]); i++) {
|
for (int i = 0; i < sizeof(m_ThreadStates) / sizeof(m_ThreadStates[0]); i++) {
|
||||||
if (m_ThreadStates[i] == ThreadDead) {
|
if (m_ThreadStates[i] == ThreadDead) {
|
||||||
Log(LogInformation, "debug", "Spawning worker thread.");
|
Log(LogDebug, "debug", "Spawning worker thread.");
|
||||||
|
|
||||||
m_ThreadStates[i] = ThreadIdle;
|
m_ThreadStates[i] = ThreadIdle;
|
||||||
boost::thread worker(boost::bind(&EventQueue::QueueThreadProc, this, i));
|
boost::thread worker(boost::bind(&EventQueue::QueueThreadProc, this, i));
|
||||||
|
@ -249,7 +259,7 @@ void EventQueue::SpawnWorker(void)
|
||||||
*/
|
*/
|
||||||
void EventQueue::KillWorker(void)
|
void EventQueue::KillWorker(void)
|
||||||
{
|
{
|
||||||
Log(LogInformation, "base", "Killing worker thread.");
|
Log(LogDebug, "base", "Killing worker thread.");
|
||||||
|
|
||||||
m_ThreadDeaths++;
|
m_ThreadDeaths++;
|
||||||
}
|
}
|
||||||
|
|
|
@ -68,6 +68,8 @@ private:
|
||||||
double m_Latency;
|
double m_Latency;
|
||||||
int m_LatencyCount;
|
int m_LatencyCount;
|
||||||
|
|
||||||
|
double m_MaxLatency;
|
||||||
|
|
||||||
boost::mutex m_Mutex;
|
boost::mutex m_Mutex;
|
||||||
boost::condition_variable m_CV;
|
boost::condition_variable m_CV;
|
||||||
|
|
||||||
|
@ -75,7 +77,7 @@ private:
|
||||||
std::deque<EventQueueWorkItem> m_Events;
|
std::deque<EventQueueWorkItem> m_Events;
|
||||||
|
|
||||||
void QueueThreadProc(int tid);
|
void QueueThreadProc(int tid);
|
||||||
void ReportThreadProc(void);
|
void ManagerThreadProc(void);
|
||||||
|
|
||||||
void SpawnWorker(void);
|
void SpawnWorker(void);
|
||||||
void KillWorker(void);
|
void KillWorker(void);
|
||||||
|
|
Loading…
Reference in New Issue