EventQueue: Further changes to how we determine the optimal number of threads.

This commit is contained in:
Gunnar Beutner 2013-03-25 16:12:25 +01:00
parent 167be058f0
commit e739dfd88f
3 changed files with 84 additions and 36 deletions

View File

@ -22,6 +22,7 @@
#include "base/convert.h" #include "base/convert.h"
#include "base/utility.h" #include "base/utility.h"
#include <sstream> #include <sstream>
#include <iostream>
#include <boost/bind.hpp> #include <boost/bind.hpp>
#include <boost/exception/diagnostic_information.hpp> #include <boost/exception/diagnostic_information.hpp>
#include <boost/foreach.hpp> #include <boost/foreach.hpp>
@ -29,7 +30,7 @@
using namespace icinga; using namespace icinga;
EventQueue::EventQueue(void) EventQueue::EventQueue(void)
: m_Stopped(false), m_ThreadDeaths(0), m_Latency(0), m_LatencyCount(0) : m_Stopped(false), m_ThreadDeaths(0), m_WaitTime(0), m_ServiceTime(0), m_TaskCount(0)
{ {
for (int i = 0; i < sizeof(m_ThreadStates) / sizeof(m_ThreadStates[0]); i++) for (int i = 0; i < sizeof(m_ThreadStates) / sizeof(m_ThreadStates[0]); i++)
m_ThreadStates[i] = ThreadDead; m_ThreadStates[i] = ThreadDead;
@ -76,6 +77,9 @@ void EventQueue::QueueThreadProc(int tid)
for (;;) { for (;;) {
EventQueueWorkItem event; EventQueueWorkItem event;
double ws = Utility::GetTime();
double st;
{ {
boost::mutex::scoped_lock lock(m_Mutex); boost::mutex::scoped_lock lock(m_Mutex);
@ -96,19 +100,11 @@ void EventQueue::QueueThreadProc(int tid)
m_Events.pop_front(); m_Events.pop_front();
m_ThreadStates[tid] = ThreadBusy; m_ThreadStates[tid] = ThreadBusy;
st = Utility::GetTime();
double latency = Utility::GetTime() - event.Timestamp; UpdateThreadUtilization(tid, st - ws, 0);
m_Latency += latency;
m_LatencyCount++;
if (latency > m_MaxLatency)
m_MaxLatency = latency;
} }
#ifdef _DEBUG #ifdef _DEBUG
double st = Utility::GetTime();
# ifdef RUSAGE_THREAD # ifdef RUSAGE_THREAD
struct rusage usage_start, usage_end; struct rusage usage_start, usage_end;
@ -128,8 +124,23 @@ void EventQueue::QueueThreadProc(int tid)
Log(LogCritical, "base", "Exception of unknown type thrown in event handler."); Log(LogCritical, "base", "Exception of unknown type thrown in event handler.");
} }
#ifdef _DEBUG
double et = Utility::GetTime(); double et = Utility::GetTime();
double latency = st - event.Timestamp;
{
boost::mutex::scoped_lock lock(m_Mutex);
m_WaitTime += latency;
m_ServiceTime += et - st;
m_TaskCount++;
if (latency > m_MaxLatency)
m_MaxLatency = latency;
UpdateThreadUtilization(tid, et - st, 1);
}
#ifdef _DEBUG
# ifdef RUSAGE_THREAD # ifdef RUSAGE_THREAD
(void) getrusage(RUSAGE_THREAD, &usage_end); (void) getrusage(RUSAGE_THREAD, &usage_end);
@ -190,7 +201,7 @@ void EventQueue::ManagerThreadProc(void)
double now = Utility::GetTime(); double now = Utility::GetTime();
int pending, alive, busy; int pending, alive;
double avg_latency, max_latency; double avg_latency, max_latency;
{ {
@ -198,39 +209,58 @@ void EventQueue::ManagerThreadProc(void)
pending = m_Events.size(); pending = m_Events.size();
alive = 0; alive = 0;
busy = 0;
double util = 0;
int hg = 0;
for (int i = 0; i < sizeof(m_ThreadStates) / sizeof(m_ThreadStates[0]); i++) { for (int i = 0; i < sizeof(m_ThreadStates) / sizeof(m_ThreadStates[0]); i++) {
if (m_ThreadStates[i] != ThreadDead) if (m_ThreadStates[i] != ThreadDead) {
alive++; alive++;
util += m_ThreadUtilization[i] * 100;
if (m_ThreadStates[i] == ThreadBusy) std::cout << (int)(m_ThreadUtilization[i] * 100) << "\t";
busy++; hg++;
if (hg % 25 == 0)
std::cout << std::endl;
}
} }
if (m_LatencyCount > 0) util /= alive;
avg_latency = m_Latency / (m_LatencyCount * 1.0);
std::cout << std::endl;
if (m_TaskCount > 0)
avg_latency = m_WaitTime / (m_TaskCount * 1.0);
else else
avg_latency = 0; avg_latency = 0;
m_Latency = 0; std::cout << "Wait time: " << m_WaitTime << "; Service time: " << m_ServiceTime << "; tasks: " << m_TaskCount << std::endl;
m_LatencyCount = 0; std::cout << "Thread util: " << util << std::endl;
if (util < 60 || util > 80) {
int tthreads = ceil((util * alive) / 80.0) - alive;
if (alive + tthreads < 2)
tthreads = 2 - alive;
std::cout << "Target threads: " << tthreads << "; Alive: " << alive << std::endl;
for (int i = 0; i < -tthreads; i++)
KillWorker();
for (int i = 0; i < tthreads; i++)
SpawnWorker();
}
m_WaitTime = 0;
m_ServiceTime = 0;
m_TaskCount = 0;
max_latency = m_MaxLatency; max_latency = m_MaxLatency;
m_MaxLatency = 0; m_MaxLatency = 0;
if (max_latency > 0.1) {
/* Spawn a few additional workers. */
for (int i = 0; i < 8; i++)
SpawnWorker();
} else if (alive > busy + 2) {
KillWorker();
}
} }
std::ostringstream msgbuf; std::ostringstream msgbuf;
msgbuf << "Pending tasks: " << pending << "; Busy threads: " << busy msgbuf << "Pending tasks: " << pending << "; Average latency: " << (long)(avg_latency * 1000) << "ms"
<< "; Idle threads: " << alive - busy << "; Average latency: " << (long)(avg_latency * 1000) << "ms"
<< "; Max latency: " << (long)(max_latency * 1000) << "ms"; << "; Max latency: " << (long)(max_latency * 1000) << "ms";
Log(LogInformation, "base", msgbuf.str()); Log(LogInformation, "base", msgbuf.str());
} }
@ -246,6 +276,7 @@ void EventQueue::SpawnWorker(void)
Log(LogDebug, "debug", "Spawning worker thread."); Log(LogDebug, "debug", "Spawning worker thread.");
m_ThreadStates[i] = ThreadIdle; m_ThreadStates[i] = ThreadIdle;
m_ThreadUtilization[i] = 0;
boost::thread worker(boost::bind(&EventQueue::QueueThreadProc, this, i)); boost::thread worker(boost::bind(&EventQueue::QueueThreadProc, this, i));
worker.detach(); worker.detach();
@ -263,3 +294,16 @@ void EventQueue::KillWorker(void)
m_ThreadDeaths++; m_ThreadDeaths++;
} }
/**
* Note: Caller must hold m_Mutex.
*/
void EventQueue::UpdateThreadUtilization(int tid, double time, double utilization)
{
const double avg_time = 5.0;
if (time > avg_time)
time = avg_time;
m_ThreadUtilization[tid] = (m_ThreadUtilization[tid] * (avg_time - time) + utilization * time) / avg_time;
}

View File

@ -63,10 +63,12 @@ public:
private: private:
ThreadState m_ThreadStates[512]; ThreadState m_ThreadStates[512];
double m_ThreadUtilization[512];
int m_ThreadDeaths; int m_ThreadDeaths;
double m_Latency; double m_WaitTime;
int m_LatencyCount; double m_ServiceTime;
int m_TaskCount;
double m_MaxLatency; double m_MaxLatency;
@ -81,6 +83,8 @@ private:
void SpawnWorker(void); void SpawnWorker(void);
void KillWorker(void); void KillWorker(void);
void UpdateThreadUtilization(int tid, double time, double utilization);
}; };
} }

View File

@ -120,9 +120,9 @@ void PythonInterpreter::ProcessCall(const ScriptTask::Ptr& task, const String& f
Value vresult = PythonLanguage::MarshalFromPython(result); Value vresult = PythonLanguage::MarshalFromPython(result);
Py_DECREF(result); Py_DECREF(result);
Utility::QueueAsyncCallback(boost::bind(&ScriptTask::FinishResult, task, vresult)); task->FinishResult(vresult);
} catch (...) { } catch (...) {
Utility::QueueAsyncCallback(boost::bind(&ScriptTask::FinishException, task, boost::current_exception())); task->FinishException(boost::current_exception());
} }
m_Language->SetCurrentInterpreter(interp); m_Language->SetCurrentInterpreter(interp);