diff --git a/components/livestatus/livestatuslistener.cpp b/components/livestatus/livestatuslistener.cpp index 7ff0be005..5c67c35ce 100644 --- a/components/livestatus/livestatuslistener.cpp +++ b/components/livestatus/livestatuslistener.cpp @@ -134,7 +134,7 @@ void LivestatusListener::ServerThreadProc(const Socket::Ptr& server) try { Socket::Ptr client = server->Accept(); Log(LogNotice, "LivestatusListener", "Client connected"); - Utility::QueueAsyncCallback(boost::bind(&LivestatusListener::ClientHandler, this, client)); + Utility::QueueAsyncCallback(boost::bind(&LivestatusListener::ClientHandler, this, client), LowLatencyScheduler); } catch (std::exception&) { Log(LogCritical, "ListenerListener", "Cannot accept new connection."); } diff --git a/lib/base/threadpool.cpp b/lib/base/threadpool.cpp index 92fe9f4a1..5197bf7bc 100644 --- a/lib/base/threadpool.cpp +++ b/lib/base/threadpool.cpp @@ -193,9 +193,10 @@ void ThreadPool::WorkerThread::ThreadProc(Queue& queue) * Appends a work item to the work queue. Work items will be processed in FIFO order. * * @param callback The callback function for the work item. + * @param policy The scheduling policy * @returns true if the item was queued, false otherwise. */ -bool ThreadPool::Post(const ThreadPool::WorkFunction& callback) +bool ThreadPool::Post(const ThreadPool::WorkFunction& callback, SchedulerPolicy policy) { WorkItem wi; wi.Callback = callback; @@ -209,6 +210,9 @@ bool ThreadPool::Post(const ThreadPool::WorkFunction& callback) if (queue.Stopped) return false; + if (policy == LowLatencyScheduler) + queue.SpawnWorker(m_ThreadGroup); + queue.Items.push_back(wi); queue.CV.notify_one(); } @@ -233,7 +237,7 @@ void ThreadPool::ManagerThreadProc(void) boost::mutex::scoped_lock lock(m_MgmtMutex); if (!m_Stopped) - m_MgmtCV.timed_wait(lock, boost::posix_time::seconds(5)); + m_MgmtCV.timed_wait(lock, boost::posix_time::milliseconds(500)); if (m_Stopped) break; @@ -273,7 +277,7 @@ void ThreadPool::ManagerThreadProc(void) int tthreads = wthreads - alive; /* Make sure there is at least one thread per CPU */ - int ncput = std::max(boost::thread::hardware_concurrency() / QUEUECOUNT, 1U); + int ncput = std::max(boost::thread::hardware_concurrency() / QUEUECOUNT, 4U); if (alive + tthreads < ncput) tthreads = ncput - alive; diff --git a/lib/base/threadpool.hpp b/lib/base/threadpool.hpp index 518d7a14b..c34fd80a8 100644 --- a/lib/base/threadpool.hpp +++ b/lib/base/threadpool.hpp @@ -32,6 +32,12 @@ namespace icinga #define QUEUECOUNT 4 +enum SchedulerPolicy +{ + DefaultScheduler, + LowLatencyScheduler +}; + /** * A thread pool. * @@ -49,7 +55,7 @@ public: void Stop(void); void Join(bool wait_for_stop = false); - bool Post(const WorkFunction& callback); + bool Post(const WorkFunction& callback, SchedulerPolicy policy = DefaultScheduler); private: enum ThreadState diff --git a/lib/base/utility.cpp b/lib/base/utility.cpp index 951389421..fc348bcea 100644 --- a/lib/base/utility.cpp +++ b/lib/base/utility.cpp @@ -684,9 +684,9 @@ void Utility::SetNonBlockingSocket(SOCKET s) #endif /* _WIN32 */ } -void Utility::QueueAsyncCallback(const boost::function& callback) +void Utility::QueueAsyncCallback(const boost::function& callback, SchedulerPolicy policy) { - Application::GetTP().Post(callback); + Application::GetTP().Post(callback, policy); } String Utility::NaturalJoin(const std::vector& tokens) diff --git a/lib/base/utility.hpp b/lib/base/utility.hpp index f9fa9eebb..e6e133a76 100644 --- a/lib/base/utility.hpp +++ b/lib/base/utility.hpp @@ -26,6 +26,7 @@ #include #include #include +#include "base/threadpool.hpp" namespace icinga { @@ -91,7 +92,7 @@ public: static bool MkDir(const String& path, int flags); static bool MkDirP(const String& path, int flags); - static void QueueAsyncCallback(const boost::function& callback); + static void QueueAsyncCallback(const boost::function& callback, SchedulerPolicy policy = DefaultScheduler); static String NaturalJoin(const std::vector& tokens); diff --git a/lib/remote/apilistener.cpp b/lib/remote/apilistener.cpp index 64474390e..d2e19a284 100644 --- a/lib/remote/apilistener.cpp +++ b/lib/remote/apilistener.cpp @@ -204,7 +204,7 @@ void ApiListener::ListenerThreadProc(const Socket::Ptr& server) for (;;) { try { Socket::Ptr client = server->Accept(); - Utility::QueueAsyncCallback(boost::bind(&ApiListener::NewClientHandler, this, client, RoleServer)); + Utility::QueueAsyncCallback(boost::bind(&ApiListener::NewClientHandler, this, client, RoleServer), LowLatencyScheduler); } catch (const std::exception&) { Log(LogCritical, "ApiListener", "Cannot accept new connection."); }