Improve thread spawning behavior

fixes #7186
This commit is contained in:
Gunnar Beutner 2014-09-11 11:45:21 +02:00
parent ffe5b960d0
commit ac32d4b382
6 changed files with 20 additions and 9 deletions

View File

@ -134,7 +134,7 @@ void LivestatusListener::ServerThreadProc(const Socket::Ptr& server)
try { try {
Socket::Ptr client = server->Accept(); Socket::Ptr client = server->Accept();
Log(LogNotice, "LivestatusListener", "Client connected"); Log(LogNotice, "LivestatusListener", "Client connected");
Utility::QueueAsyncCallback(boost::bind(&LivestatusListener::ClientHandler, this, client)); Utility::QueueAsyncCallback(boost::bind(&LivestatusListener::ClientHandler, this, client), LowLatencyScheduler);
} catch (std::exception&) { } catch (std::exception&) {
Log(LogCritical, "ListenerListener", "Cannot accept new connection."); Log(LogCritical, "ListenerListener", "Cannot accept new connection.");
} }

View File

@ -193,9 +193,10 @@ void ThreadPool::WorkerThread::ThreadProc(Queue& queue)
* Appends a work item to the work queue. Work items will be processed in FIFO order. * Appends a work item to the work queue. Work items will be processed in FIFO order.
* *
* @param callback The callback function for the work item. * @param callback The callback function for the work item.
* @param policy The scheduling policy
* @returns true if the item was queued, false otherwise. * @returns true if the item was queued, false otherwise.
*/ */
bool ThreadPool::Post(const ThreadPool::WorkFunction& callback) bool ThreadPool::Post(const ThreadPool::WorkFunction& callback, SchedulerPolicy policy)
{ {
WorkItem wi; WorkItem wi;
wi.Callback = callback; wi.Callback = callback;
@ -209,6 +210,9 @@ bool ThreadPool::Post(const ThreadPool::WorkFunction& callback)
if (queue.Stopped) if (queue.Stopped)
return false; return false;
if (policy == LowLatencyScheduler)
queue.SpawnWorker(m_ThreadGroup);
queue.Items.push_back(wi); queue.Items.push_back(wi);
queue.CV.notify_one(); queue.CV.notify_one();
} }
@ -233,7 +237,7 @@ void ThreadPool::ManagerThreadProc(void)
boost::mutex::scoped_lock lock(m_MgmtMutex); boost::mutex::scoped_lock lock(m_MgmtMutex);
if (!m_Stopped) if (!m_Stopped)
m_MgmtCV.timed_wait(lock, boost::posix_time::seconds(5)); m_MgmtCV.timed_wait(lock, boost::posix_time::milliseconds(500));
if (m_Stopped) if (m_Stopped)
break; break;
@ -273,7 +277,7 @@ void ThreadPool::ManagerThreadProc(void)
int tthreads = wthreads - alive; int tthreads = wthreads - alive;
/* Make sure there is at least one thread per CPU */ /* Make sure there is at least one thread per CPU */
int ncput = std::max(boost::thread::hardware_concurrency() / QUEUECOUNT, 1U); int ncput = std::max(boost::thread::hardware_concurrency() / QUEUECOUNT, 4U);
if (alive + tthreads < ncput) if (alive + tthreads < ncput)
tthreads = ncput - alive; tthreads = ncput - alive;

View File

@ -32,6 +32,12 @@ namespace icinga
#define QUEUECOUNT 4 #define QUEUECOUNT 4
enum SchedulerPolicy
{
DefaultScheduler,
LowLatencyScheduler
};
/** /**
* A thread pool. * A thread pool.
* *
@ -49,7 +55,7 @@ public:
void Stop(void); void Stop(void);
void Join(bool wait_for_stop = false); void Join(bool wait_for_stop = false);
bool Post(const WorkFunction& callback); bool Post(const WorkFunction& callback, SchedulerPolicy policy = DefaultScheduler);
private: private:
enum ThreadState enum ThreadState

View File

@ -684,9 +684,9 @@ void Utility::SetNonBlockingSocket(SOCKET s)
#endif /* _WIN32 */ #endif /* _WIN32 */
} }
void Utility::QueueAsyncCallback(const boost::function<void (void)>& callback) void Utility::QueueAsyncCallback(const boost::function<void (void)>& callback, SchedulerPolicy policy)
{ {
Application::GetTP().Post(callback); Application::GetTP().Post(callback, policy);
} }
String Utility::NaturalJoin(const std::vector<String>& tokens) String Utility::NaturalJoin(const std::vector<String>& tokens)

View File

@ -26,6 +26,7 @@
#include <boost/function.hpp> #include <boost/function.hpp>
#include <boost/thread/tss.hpp> #include <boost/thread/tss.hpp>
#include <vector> #include <vector>
#include "base/threadpool.hpp"
namespace icinga namespace icinga
{ {
@ -91,7 +92,7 @@ public:
static bool MkDir(const String& path, int flags); static bool MkDir(const String& path, int flags);
static bool MkDirP(const String& path, int flags); static bool MkDirP(const String& path, int flags);
static void QueueAsyncCallback(const boost::function<void (void)>& callback); static void QueueAsyncCallback(const boost::function<void (void)>& callback, SchedulerPolicy policy = DefaultScheduler);
static String NaturalJoin(const std::vector<String>& tokens); static String NaturalJoin(const std::vector<String>& tokens);

View File

@ -204,7 +204,7 @@ void ApiListener::ListenerThreadProc(const Socket::Ptr& server)
for (;;) { for (;;) {
try { try {
Socket::Ptr client = server->Accept(); Socket::Ptr client = server->Accept();
Utility::QueueAsyncCallback(boost::bind(&ApiListener::NewClientHandler, this, client, RoleServer)); Utility::QueueAsyncCallback(boost::bind(&ApiListener::NewClientHandler, this, client, RoleServer), LowLatencyScheduler);
} catch (const std::exception&) { } catch (const std::exception&) {
Log(LogCritical, "ApiListener", "Cannot accept new connection."); Log(LogCritical, "ApiListener", "Cannot accept new connection.");
} }