Use fewer threads by default.

Refs #6053
This commit is contained in:
Gunnar Beutner 2014-04-22 19:47:19 +02:00
parent 63f22b055b
commit 951c61ed16
8 changed files with 97 additions and 83 deletions

View File

@ -188,7 +188,7 @@ void CheckerComponent::CheckThreadProc(void)
Log(LogDebug, "checker", "Executing check for '" + checkable->GetName() + "'");
CheckerComponent::Ptr self = GetSelf();
m_Pool.Post(boost::bind(&CheckerComponent::ExecuteCheckHelper, self, checkable));
Utility::QueueAsyncCallback(boost::bind(&CheckerComponent::ExecuteCheckHelper, self, checkable));
lock.lock();
}

View File

@ -98,8 +98,6 @@ private:
void NextCheckChangedHandler(const Checkable::Ptr& checkable);
void RescheduleCheckTimer(void);
ThreadPool m_Pool;
};
}

View File

@ -257,6 +257,9 @@ int Main(void)
("daemonize,d", "detach from the controlling terminal")
("user,u", po::value<std::string>(), "user to run Icinga as")
("group,g", po::value<std::string>(), "group to run Icinga as")
# ifdef RLIMIT_STACK
("no-stack-rlimit", "don't attempt to set RLIMIT_STACK")
# endif /* RLIMIT_STACK */
#else /* _WIN32 */
("scm", "run as a Windows service (must be the first argument if specified)")
("scm-install", "installs Icinga 2 as a Windows service (must be the first argument if specified")

View File

@ -117,8 +117,9 @@ Application::Ptr Application::GetInstance(void)
void Application::SetResourceLimits(void)
{
#ifndef _WIN32
# ifdef RLIMIT_NOFILE
rlimit rl;
# ifdef RLIMIT_NOFILE
rl.rlim_cur = 16 * 1024;
rl.rlim_max = rl.rlim_cur;
@ -137,6 +138,54 @@ void Application::SetResourceLimits(void)
# else /* RLIMIT_NPROC */
Log(LogDebug, "base", "System does not support adjusting the resource limit for number of processes (RLIMIT_NPROC)");
# endif /* RLIMIT_NPROC */
# ifdef RLIMIT_STACK
int argc = Application::GetArgC();
char **argv = Application::GetArgV();
bool set_stack_rlimit = true;
for (int i = 0; i < argc; i++) {
if (strcmp(argv[i], "--no-stack-rlimit") == 0) {
set_stack_rlimit = false;
break;
}
}
if (set_stack_rlimit) {
rl.rlim_cur = 256 * 1024;
rl.rlim_max = rl.rlim_cur;
if (setrlimit(RLIMIT_STACK, &rl) < 0)
Log(LogDebug, "base", "Could not adjust resource limit for stack size (RLIMIT_STACK)");
else {
char **new_argv = static_cast<char **>(malloc(sizeof(char *) * (argc + 2)));
if (!new_argv) {
perror("malloc");
exit(1);
}
for (int i = 0; i < argc; i++)
new_argv[i] = argv[i];
new_argv[argc] = strdup("--no-stack-rlimit");
if (!new_argv[argc]) {
perror("strdup");
exit(1);
}
new_argv[argc + 1] = NULL;
if (execvp(new_argv[0], new_argv) < 0)
perror("execvp");
exit(1);
}
}
# else /* RLIMIT_STACK */
Log(LogDebug, "base", "System does not support adjusting the resource limit for stack size (RLIMIT_STACK)");
# endif /* RLIMIT_STACK */
#endif /* _WIN32 */
}
@ -166,14 +215,30 @@ void Application::SetArgV(char **argv)
*/
void Application::RunEventLoop(void) const
{
/* Start the system time watch thread. */
boost::thread t(&Application::TimeWatchThreadProc);
t.detach();
Timer::Initialize();
while (!m_ShuttingDown && !m_Restarting)
Utility::Sleep(0.5);
double lastLoop = Utility::GetTime();
while (!m_ShuttingDown && !m_Restarting) {
/* Watches for changes to the system time. Adjusts timers if necessary. */
Utility::Sleep(2.5);
double now = Utility::GetTime();
double timeDiff = lastLoop - now;
if (abs(timeDiff) > 15) {
/* We made a significant jump in time. */
std::ostringstream msgbuf;
msgbuf << "We jumped "
<< (timeDiff < 0 ? "forward" : "backward")
<< " in time: " << abs(timeDiff) << " seconds";
Log(LogInformation, "base", msgbuf.str());
Timer::AdjustTimers(-timeDiff);
}
lastLoop = now;
}
Log(LogInformation, "base", "Shutting down Icinga...");
DynamicObject::StopObjects();
@ -194,36 +259,6 @@ void Application::OnShutdown(void)
/* Nothing to do here. */
}
/**
* Watches for changes to the system time. Adjusts timers if necessary.
*/
void Application::TimeWatchThreadProc(void)
{
Utility::SetThreadName("Time Watch");
double lastLoop = Utility::GetTime();
for (;;) {
Utility::Sleep(5);
double now = Utility::GetTime();
double timeDiff = lastLoop - now;
if (abs(timeDiff) > 15) {
/* We made a significant jump in time. */
std::ostringstream msgbuf;
msgbuf << "We jumped "
<< (timeDiff < 0 ? "forward" : "backward")
<< " in time: " << abs(timeDiff) << " seconds";
Log(LogInformation, "base", msgbuf.str());
Timer::AdjustTimers(-timeDiff);
}
lastLoop = now;
}
}
/**
* Signals the application to shut down during the next
* execution of the event loop.

View File

@ -134,9 +134,6 @@ private:
static void SigAbrtHandler(int signum);
static void ExceptionHandler(void);
static void TimeWatchThreadProc(void);
static void NewTxTimerHandler(void);
};
}

View File

@ -48,7 +48,7 @@ extern char **environ;
using namespace icinga;
#define IOTHREADS 8
#define IOTHREADS 2
static boost::mutex l_ProcessMutex[IOTHREADS];
static std::map<Process::ProcessHandle, Process::Ptr> l_Processes[IOTHREADS];

View File

@ -54,7 +54,6 @@ void ThreadPool::Start(void)
m_Queues[i].SpawnWorker(m_ThreadGroup);
m_ThreadGroup.create_thread(boost::bind(&ThreadPool::ManagerThreadProc, this));
m_ThreadGroup.create_thread(boost::bind(&ThreadPool::StatsThreadProc, this));
}
void ThreadPool::Stop(void)
@ -226,6 +225,8 @@ void ThreadPool::ManagerThreadProc(void)
idbuf << "TP #" << m_ID << " Manager";
Utility::SetThreadName(idbuf.str());
double lastStats = 0;
for (;;) {
size_t total_pending = 0, total_alive = 0;
double total_avg_latency = 0;
@ -250,6 +251,9 @@ void ThreadPool::ManagerThreadProc(void)
boost::mutex::scoped_lock lock(queue.Mutex);
for (size_t i = 0; i < sizeof(queue.Threads) / sizeof(queue.Threads[0]); i++)
queue.Threads[i].UpdateUtilization();
pending = queue.Items.size();
for (size_t i = 0; i < sizeof(queue.Threads) / sizeof(queue.Threads[0]); i++) {
@ -271,9 +275,9 @@ void ThreadPool::ManagerThreadProc(void)
int tthreads = wthreads - alive;
/* Don't ever kill the last threads. */
if (alive + tthreads < 2)
tthreads = 2 - alive;
/* Don't ever kill the last thread. */
if (alive + tthreads < 1)
tthreads = 1 - alive;
/* Don't kill more than 8 threads at once. */
if (tthreads < -8)
@ -309,12 +313,18 @@ void ThreadPool::ManagerThreadProc(void)
total_utilization += utilization;
}
std::ostringstream msgbuf;
msgbuf << "Pool #" << m_ID << ": Pending tasks: " << total_pending << "; Average latency: "
<< (long)(total_avg_latency * 1000 / (sizeof(m_Queues) / sizeof(m_Queues[0]))) << "ms"
<< "; Threads: " << total_alive
<< "; Pool utilization: " << (total_utilization / (sizeof(m_Queues) / sizeof(m_Queues[0]))) << "%";
Log(LogInformation, "base", msgbuf.str());
double now = Utility::GetTime();
if (lastStats < now - 15) {
lastStats = now;
std::ostringstream msgbuf;
msgbuf << "Pool #" << m_ID << ": Pending tasks: " << total_pending << "; Average latency: "
<< (long)(total_avg_latency * 1000 / (sizeof(m_Queues) / sizeof(m_Queues[0]))) << "ms"
<< "; Threads: " << total_alive
<< "; Pool utilization: " << (total_utilization / (sizeof(m_Queues) / sizeof(m_Queues[0]))) << "%";
Log(LogInformation, "base", msgbuf.str());
}
}
}
@ -356,34 +366,6 @@ void ThreadPool::Queue::KillWorker(boost::thread_group& group)
}
}
void ThreadPool::StatsThreadProc(void)
{
std::ostringstream idbuf;
idbuf << "TP #" << m_ID << " Stats";
Utility::SetThreadName(idbuf.str());
for (;;) {
{
boost::mutex::scoped_lock lock(m_MgmtMutex);
if (!m_Stopped)
m_MgmtCV.timed_wait(lock, boost::posix_time::milliseconds(250));
if (m_Stopped)
break;
}
for (int i = 0; i < sizeof(m_Queues) / sizeof(m_Queues[0]); i++) {
Queue& queue = m_Queues[i];
boost::mutex::scoped_lock lock(queue.Mutex);
for (size_t i = 0; i < sizeof(queue.Threads) / sizeof(queue.Threads[0]); i++)
queue.Threads[i].UpdateUtilization();
}
}
}
/**
* Note: Caller must hold queue Mutex.
*/

View File

@ -121,7 +121,6 @@ private:
Queue m_Queues[4];
void ManagerThreadProc(void);
void StatsThreadProc(void);
};
}