diff --git a/lib/base/application.cpp b/lib/base/application.cpp index a3ee1c532..742384d55 100644 --- a/lib/base/application.cpp +++ b/lib/base/application.cpp @@ -181,7 +181,7 @@ void Application::RunEventLoop(void) const GetTP().Stop(); m_ShuttingDown = false; - GetTP().Join(); + GetTP().Join(true); Timer::Uninitialize(); #endif /* _DEBUG */ diff --git a/lib/base/serializer.cpp b/lib/base/serializer.cpp index a23450c63..7f5faa900 100644 --- a/lib/base/serializer.cpp +++ b/lib/base/serializer.cpp @@ -161,21 +161,24 @@ static Object::Ptr DeserializeObject(const Object::Ptr& object, const Dictionary if (!instance) instance = type->Instantiate(); - for (int i = 0; i < type->GetFieldCount(); i++) { - Field field = type->GetFieldInfo(i); + BOOST_FOREACH(const Dictionary::Pair& kv, input) { + if (kv.first.IsEmpty()) + continue; + + int fid = type->GetFieldId(kv.first); + + if (fid < 0) + continue; + + Field field = type->GetFieldInfo(fid); if ((field.Attributes & attributeTypes) == 0) continue; - Value value = input->Get(field.Name); - - if (value.IsEmpty()) - continue; - try { - instance->SetField(i, Deserialize(value, attributeTypes)); + instance->SetField(fid, Deserialize(kv.second, attributeTypes)); } catch (const std::exception&) { - instance->SetField(i, Empty); + instance->SetField(fid, Empty); } } diff --git a/lib/base/threadpool.cpp b/lib/base/threadpool.cpp index f15d07c24..e4cf9c0ce 100644 --- a/lib/base/threadpool.cpp +++ b/lib/base/threadpool.cpp @@ -33,79 +33,95 @@ using namespace icinga; int ThreadPool::m_NextID = 1; -ThreadPool::ThreadPool(void) - : m_ID(m_NextID++), m_WaitTime(0), m_ServiceTime(0), - m_TaskCount(0), m_Stopped(false) +ThreadPool::ThreadPool(int max_threads) + : m_ID(m_NextID++), m_Stopped(false), m_MaxThreads(max_threads) { - for (int i = 0; i < 2; i++) - SpawnWorker(); + if (m_MaxThreads != -1 && m_MaxThreads < sizeof(m_Queues) / sizeof(m_Queues[0])) + m_MaxThreads = sizeof(m_Queues) / sizeof(m_Queues[0]); - m_ThreadGroup.create_thread(boost::bind(&ThreadPool::ManagerThreadProc, this)); - m_ThreadGroup.create_thread(boost::bind(&ThreadPool::StatsThreadProc, this)); + Start(); } ThreadPool::~ThreadPool(void) { Stop(); - Join(); + Join(true); +} + +void ThreadPool::Start(void) +{ + for (int i = 0; i < sizeof(m_Queues) / sizeof(m_Queues[0]); i++) + m_Queues[i].SpawnWorker(m_ThreadGroup); + + m_ThreadGroup.create_thread(boost::bind(&ThreadPool::ManagerThreadProc, this)); + m_ThreadGroup.create_thread(boost::bind(&ThreadPool::StatsThreadProc, this)); } void ThreadPool::Stop(void) { - boost::mutex::scoped_lock lock(m_Mutex); + for (int i = 0; i < sizeof(m_Queues) / sizeof(m_Queues[0]); i++) { + boost::mutex::scoped_lock lock(m_Queues[i].Mutex); + m_Queues[i].Stopped = true; + m_Queues[i].CV.notify_all(); + } + + boost::mutex::scoped_lock lock(m_MgmtMutex); m_Stopped = true; - m_WorkCV.notify_all(); m_MgmtCV.notify_all(); } /** * Waits for all worker threads to finish. */ -void ThreadPool::Join(void) +void ThreadPool::Join(bool wait_for_stop) { - { - boost::mutex::scoped_lock lock(m_Mutex); - - while (!m_Stopped || !m_WorkItems.empty()) { - lock.unlock(); - Utility::Sleep(0.5); - lock.lock(); - } + if (wait_for_stop) { + m_ThreadGroup.join_all(); + return; } - m_ThreadGroup.join_all(); + for (int i = 0; i < sizeof(m_Queues) / sizeof(m_Queues[0]); i++) { + boost::mutex::scoped_lock lock(m_Queues[i].Mutex); + + while (!m_Queues[i].Items.empty()) + m_Queues[i].CVStarved.wait(lock); + } } /** * Waits for work items and processes them. */ -void ThreadPool::QueueThreadProc(int tid) +void ThreadPool::WorkerThread::ThreadProc(Queue& queue) { std::ostringstream idbuf; - idbuf << "TP #" << m_ID << " W #" << tid; + idbuf << "Q #" << &queue << " W #" << this; Utility::SetThreadName(idbuf.str()); for (;;) { WorkItem wi; { - boost::mutex::scoped_lock lock(m_Mutex); + boost::mutex::scoped_lock lock(queue.Mutex); - UpdateThreadUtilization(tid, ThreadIdle); + UpdateUtilization(ThreadIdle); - while (m_WorkItems.empty() && !m_Stopped && !m_Threads[tid].Zombie) - m_WorkCV.wait(lock); + while (queue.Items.empty() && !queue.Stopped && !Zombie) { + if (queue.Items.empty()) + queue.CVStarved.notify_all(); - if (m_Threads[tid].Zombie) + queue.CV.wait(lock); + } + + if (Zombie) break; - if (m_WorkItems.empty() && m_Stopped) + if (queue.Items.empty() && queue.Stopped) break; - wi = m_WorkItems.front(); - m_WorkItems.pop_front(); + wi = queue.Items.front(); + queue.Items.pop_front(); - UpdateThreadUtilization(tid, ThreadBusy); + UpdateUtilization(ThreadBusy); } double st = Utility::GetTime();; @@ -134,14 +150,11 @@ void ThreadPool::QueueThreadProc(int tid) double latency = st - wi.Timestamp; { - boost::mutex::scoped_lock lock(m_Mutex); + boost::mutex::scoped_lock lock(queue.Mutex); - m_WaitTime += latency; - m_ServiceTime += et - st; - m_TaskCount++; - - if (latency > m_MaxLatency) - m_MaxLatency = latency; + queue.WaitTime += latency; + queue.ServiceTime += et - st; + queue.TaskCount++; } #ifdef _DEBUG @@ -175,9 +188,9 @@ void ThreadPool::QueueThreadProc(int tid) #endif /* _DEBUG */ } - boost::mutex::scoped_lock lock(m_Mutex); - UpdateThreadUtilization(tid, ThreadDead); - m_Threads[tid].Zombie = false; + boost::mutex::scoped_lock lock(queue.Mutex); + UpdateUtilization(ThreadDead); + Zombie = false; } /** @@ -192,14 +205,16 @@ bool ThreadPool::Post(const ThreadPool::WorkFunction& callback) wi.Callback = callback; wi.Timestamp = Utility::GetTime(); - { - boost::mutex::scoped_lock lock(m_Mutex); + Queue& queue = m_Queues[Utility::Random() % (sizeof(m_Queues) / sizeof(m_Queues[0]))]; - if (m_Stopped) + { + boost::mutex::scoped_lock lock(queue.Mutex); + + if (queue.Stopped) return false; - m_WorkItems.push_back(wi); - m_WorkCV.notify_one(); + queue.Items.push_back(wi); + queue.CV.notify_one(); } return true; @@ -212,34 +227,42 @@ void ThreadPool::ManagerThreadProc(void) Utility::SetThreadName(idbuf.str()); for (;;) { - size_t pending, alive; - double avg_latency, max_latency; - double utilization = 0; + size_t total_pending = 0, total_alive = 0; + double total_avg_latency = 0; + double total_utilization = 0; { - boost::mutex::scoped_lock lock(m_Mutex); + boost::mutex::scoped_lock lock(m_MgmtMutex); if (!m_Stopped) m_MgmtCV.timed_wait(lock, boost::posix_time::seconds(5)); if (m_Stopped) break; + } - pending = m_WorkItems.size(); + for (int i = 0; i < sizeof(m_Queues) / sizeof(m_Queues[0]); i++) { + size_t pending, alive = 0; + double avg_latency; + double utilization = 0; - alive = 0; + Queue& queue = m_Queues[i]; - for (size_t i = 0; i < sizeof(m_Threads) / sizeof(m_Threads[0]); i++) { - if (m_Threads[i].State != ThreadDead && !m_Threads[i].Zombie) { + boost::mutex::scoped_lock lock(queue.Mutex); + + pending = queue.Items.size(); + + for (size_t i = 0; i < sizeof(queue.Threads) / sizeof(queue.Threads[0]); i++) { + if (queue.Threads[i].State != ThreadDead && !queue.Threads[i].Zombie) { alive++; - utilization += m_Threads[i].Utilization * 100; + utilization += queue.Threads[i].Utilization * 100; } } utilization /= alive; - if (m_TaskCount > 0) - avg_latency = m_WaitTime / (m_TaskCount * 1.0); + if (queue.TaskCount > 0) + avg_latency = queue.WaitTime / (queue.TaskCount * 1.0); else avg_latency = 0; @@ -248,9 +271,9 @@ void ThreadPool::ManagerThreadProc(void) int tthreads = wthreads - alive; - /* Don't ever kill the last 8 threads. */ - if (alive + tthreads < 8) - tthreads = 8 - alive; + /* Don't ever kill the last threads. */ + if (alive + tthreads < 2) + tthreads = 2 - alive; /* Don't kill more than 8 threads at once. */ if (tthreads < -8) @@ -258,33 +281,37 @@ void ThreadPool::ManagerThreadProc(void) /* Spawn more workers if there are outstanding work items. */ if (tthreads > 0 && pending > 0) - tthreads = (Utility::GetTime() - Application::GetStartTime() < 300) ? 128 : 8; + tthreads = 8; + + if (m_MaxThreads != -1 && (alive + tthreads) * (sizeof(m_Queues) / sizeof(m_Queues[0])) > m_MaxThreads) + tthreads = m_MaxThreads / (sizeof(m_Queues) / sizeof(m_Queues[0])) - alive; std::ostringstream msgbuf; msgbuf << "Thread pool; current: " << alive << "; adjustment: " << tthreads; Log(LogDebug, "base", msgbuf.str()); for (int i = 0; i < -tthreads; i++) - KillWorker(); + queue.KillWorker(m_ThreadGroup); for (int i = 0; i < tthreads; i++) - SpawnWorker(); + queue.SpawnWorker(m_ThreadGroup); } - m_WaitTime = 0; - m_ServiceTime = 0; - m_TaskCount = 0; + queue.WaitTime = 0; + queue.ServiceTime = 0; + queue.TaskCount = 0; - max_latency = m_MaxLatency; - m_MaxLatency = 0; + total_pending += pending; + total_alive += alive; + total_avg_latency += avg_latency; + total_utilization += utilization; } std::ostringstream msgbuf; - msgbuf << "Pool #" << m_ID << ": Pending tasks: " << pending << "; Average latency: " - << (long)(avg_latency * 1000) << "ms" - << "; Max latency: " << (long)(max_latency * 1000) << "ms" - << "; Threads: " << alive - << "; Pool utilization: " << utilization << "%"; + msgbuf << "Pool #" << m_ID << ": Pending tasks: " << total_pending << "; Average latency: " + << (long)(total_avg_latency * 1000 / (sizeof(m_Queues) / sizeof(m_Queues[0]))) << "ms" + << "; Threads: " << total_alive + << "; Pool utilization: " << (total_utilization / (sizeof(m_Queues) / sizeof(m_Queues[0]))) << "%"; Log(LogInformation, "base", msgbuf.str()); } } @@ -292,14 +319,14 @@ void ThreadPool::ManagerThreadProc(void) /** * Note: Caller must hold m_Mutex */ -void ThreadPool::SpawnWorker(void) +void ThreadPool::Queue::SpawnWorker(boost::thread_group& group) { - for (size_t i = 0; i < sizeof(m_Threads) / sizeof(m_Threads[0]); i++) { - if (m_Threads[i].State == ThreadDead) { + for (size_t i = 0; i < sizeof(Threads) / sizeof(Threads[0]); i++) { + if (Threads[i].State == ThreadDead) { Log(LogDebug, "debug", "Spawning worker thread."); - m_Threads[i] = WorkerThread(ThreadIdle); - m_Threads[i].Thread = m_ThreadGroup.create_thread(boost::bind(&ThreadPool::QueueThreadProc, this, i)); + Threads[i] = WorkerThread(ThreadIdle); + Threads[i].Thread = group.create_thread(boost::bind(&ThreadPool::WorkerThread::ThreadProc, boost::ref(Threads[i]), boost::ref(*this))); break; } @@ -307,20 +334,20 @@ void ThreadPool::SpawnWorker(void) } /** - * Note: Caller must hold m_Mutex. + * Note: Caller must hold Mutex. */ -void ThreadPool::KillWorker(void) +void ThreadPool::Queue::KillWorker(boost::thread_group& group) { - for (size_t i = 0; i < sizeof(m_Threads) / sizeof(m_Threads[0]); i++) { - if (m_Threads[i].State == ThreadIdle && !m_Threads[i].Zombie) { + for (size_t i = 0; i < sizeof(Threads) / sizeof(Threads[0]); i++) { + if (Threads[i].State == ThreadIdle && !Threads[i].Zombie) { Log(LogDebug, "base", "Killing worker thread."); - m_ThreadGroup.remove_thread(m_Threads[i].Thread); - m_Threads[i].Thread->detach(); - delete m_Threads[i].Thread; + group.remove_thread(Threads[i].Thread); + Threads[i].Thread->detach(); + delete Threads[i].Thread; - m_Threads[i].Zombie = true; - m_WorkCV.notify_all(); + Threads[i].Zombie = true; + CV.notify_all(); break; } @@ -334,27 +361,35 @@ void ThreadPool::StatsThreadProc(void) Utility::SetThreadName(idbuf.str()); for (;;) { - boost::mutex::scoped_lock lock(m_Mutex); + { + boost::mutex::scoped_lock lock(m_MgmtMutex); - if (!m_Stopped) - m_MgmtCV.timed_wait(lock, boost::posix_time::milliseconds(250)); + if (!m_Stopped) + m_MgmtCV.timed_wait(lock, boost::posix_time::milliseconds(250)); - if (m_Stopped) - break; + if (m_Stopped) + break; + } - for (size_t i = 0; i < sizeof(m_Threads) / sizeof(m_Threads[0]); i++) - UpdateThreadUtilization(i); + for (int i = 0; i < sizeof(m_Queues) / sizeof(m_Queues[0]); i++) { + Queue& queue = m_Queues[i]; + + boost::mutex::scoped_lock lock(queue.Mutex); + + for (size_t i = 0; i < sizeof(queue.Threads) / sizeof(queue.Threads[0]); i++) + queue.Threads[i].UpdateUtilization(); + } } } /** - * Note: Caller must hold m_Mutex. + * Note: Caller must hold queue Mutex. */ -void ThreadPool::UpdateThreadUtilization(int tid, ThreadState state) +void ThreadPool::WorkerThread::UpdateUtilization(ThreadState state) { double utilization; - switch (m_Threads[tid].State) { + switch (State) { case ThreadDead: return; case ThreadIdle: @@ -368,16 +403,16 @@ void ThreadPool::UpdateThreadUtilization(int tid, ThreadState state) } double now = Utility::GetTime(); - double time = now - m_Threads[tid].LastUpdate; + double time = now - LastUpdate; const double avg_time = 5.0; if (time > avg_time) time = avg_time; - m_Threads[tid].Utilization = (m_Threads[tid].Utilization * (avg_time - time) + utilization * time) / avg_time; - m_Threads[tid].LastUpdate = now; + Utilization = (Utilization * (avg_time - time) + utilization * time) / avg_time; + LastUpdate = now; if (state != ThreadUnspecified) - m_Threads[tid].State = state; + State = state; } diff --git a/lib/base/threadpool.h b/lib/base/threadpool.h index 5f7650dcd..848d05a5f 100644 --- a/lib/base/threadpool.h +++ b/lib/base/threadpool.h @@ -40,11 +40,12 @@ class I2_BASE_API ThreadPool public: typedef boost::function WorkFunction; - ThreadPool(void); + ThreadPool(int max_threads = -1); ~ThreadPool(void); + void Start(void); void Stop(void); - void Join(void); + void Join(bool wait_for_stop = false); bool Post(const WorkFunction& callback); @@ -57,6 +58,14 @@ private: ThreadBusy }; + struct WorkItem + { + WorkFunction Callback; + double Timestamp; + }; + + struct Queue; + struct WorkerThread { ThreadState State; @@ -68,43 +77,51 @@ private: WorkerThread(ThreadState state = ThreadDead) : State(state), Zombie(false), Utilization(0), LastUpdate(0), Thread(NULL) { } + + void UpdateUtilization(ThreadState state = ThreadUnspecified); + + void ThreadProc(Queue& queue); + }; + + struct Queue + { + boost::mutex Mutex; + boost::condition_variable CV; + boost::condition_variable CVStarved; + + std::deque Items; + + double WaitTime; + double ServiceTime; + int TaskCount; + + bool Stopped; + + WorkerThread Threads[256]; + + Queue(void) + : WaitTime(0), ServiceTime(0), TaskCount(0), Stopped(false) + { } + + void SpawnWorker(boost::thread_group& group); + void KillWorker(boost::thread_group& group); }; int m_ID; static int m_NextID; + int m_MaxThreads; + boost::thread_group m_ThreadGroup; - WorkerThread m_Threads[4096]; - double m_WaitTime; - double m_ServiceTime; - int m_TaskCount; - - double m_MaxLatency; - - boost::mutex m_Mutex; - boost::condition_variable m_WorkCV; + boost::mutex m_MgmtMutex; boost::condition_variable m_MgmtCV; - bool m_Stopped; - struct WorkItem - { - WorkFunction Callback; - double Timestamp; - }; + Queue m_Queues[16]; - - std::deque m_WorkItems; - - void QueueThreadProc(int tid); void ManagerThreadProc(void); void StatsThreadProc(void); - - void SpawnWorker(void); - void KillWorker(void); - - void UpdateThreadUtilization(int tid, ThreadState state = ThreadUnspecified); }; } diff --git a/lib/base/utility.cpp b/lib/base/utility.cpp index 9ae1af459..f891ac8b2 100644 --- a/lib/base/utility.cpp +++ b/lib/base/utility.cpp @@ -40,6 +40,7 @@ using namespace icinga; boost::thread_specific_ptr Utility::m_ThreadName; +boost::thread_specific_ptr Utility::m_RandSeed; /** * Demangles a symbol name. @@ -704,12 +705,18 @@ String Utility::GetThreadName(void) return *name; } -unsigned long Utility::SDBM(const String& str) +unsigned long Utility::SDBM(const String& str, size_t len) { unsigned long hash = 0; + size_t current = 0; BOOST_FOREACH(char c, str) { + if (current >= len) + break; + hash = c + (hash << 6) + (hash << 16) - hash; + + current++; } return hash; @@ -739,10 +746,18 @@ int Utility::CompareVersion(const String& v1, const String& v2) int Utility::Random(void) { - static boost::mutex mtx; - boost::mutex::scoped_lock lock(mtx); - +#ifdef _WIN32 return rand(); +#else /* _WIN32 */ + unsigned int *seed = m_RandSeed.get(); + + if (!seed) { + seed = new unsigned int(Utility::GetTime()); + m_RandSeed.reset(seed); + } + + return rand_r(seed); +#endif /* _WIN32 */ } tm Utility::LocalTime(time_t ts) diff --git a/lib/base/utility.h b/lib/base/utility.h index 07e56edec..8ad1b49a3 100644 --- a/lib/base/utility.h +++ b/lib/base/utility.h @@ -102,7 +102,7 @@ public: static void SetThreadName(const String& name, bool os = true); static String GetThreadName(void); - static unsigned long SDBM(const String& str); + static unsigned long SDBM(const String& str, size_t len = String::NPos); static int CompareVersion(const String& v1, const String& v2); @@ -114,6 +114,7 @@ private: Utility(void); static boost::thread_specific_ptr m_ThreadName; + static boost::thread_specific_ptr m_RandSeed; }; } diff --git a/lib/base/workqueue.cpp b/lib/base/workqueue.cpp index 1a9a0a6b2..4e5ef9f79 100644 --- a/lib/base/workqueue.cpp +++ b/lib/base/workqueue.cpp @@ -23,14 +23,15 @@ #include "base/logger_fwd.h" #include "base/convert.h" #include +#include using namespace icinga; int WorkQueue::m_NextID = 1; WorkQueue::WorkQueue(size_t maxItems) - : m_ID(m_NextID++), m_MaxItems(maxItems), m_Joined(false), - m_Stopped(false), m_ExceptionCallback(WorkQueue::DefaultExceptionCallback) + : m_ID(m_NextID++), m_MaxItems(maxItems), m_Stopped(false), + m_Processing(false), m_ExceptionCallback(WorkQueue::DefaultExceptionCallback) { m_Thread = boost::thread(boost::bind(&WorkQueue::WorkerThreadProc, this)); @@ -42,27 +43,31 @@ WorkQueue::WorkQueue(size_t maxItems) WorkQueue::~WorkQueue(void) { - Join(); - - ASSERT(m_Stopped); + Join(true); } /** * Enqueues a work item. Work items are guaranteed to be executed in the order - * they were enqueued in. + * they were enqueued in except when allowInterleaved is true in which case + * the new work item might be run immediately if it's being enqueued from + * within the WorkQueue thread. */ void WorkQueue::Enqueue(const WorkCallback& callback, bool allowInterleaved) { + bool wq_thread = (boost::this_thread::get_id() == GetThreadId()); + + if (wq_thread && allowInterleaved) { + callback(); + + return; + } + WorkItem item; item.Callback = callback; item.AllowInterleaved = allowInterleaved; - bool wq_thread = (boost::this_thread::get_id() == GetThreadId()); - boost::mutex::scoped_lock lock(m_Mutex); - ASSERT(!m_Stopped); - if (!wq_thread) { while (m_Items.size() >= m_MaxItems) m_CVFull.wait(lock); @@ -70,20 +75,24 @@ void WorkQueue::Enqueue(const WorkCallback& callback, bool allowInterleaved) m_Items.push_back(item); - if (wq_thread) - ProcessItems(lock, true); - else + if (m_Items.size() == 1) m_CVEmpty.notify_all(); } -void WorkQueue::Join(void) +void WorkQueue::Join(bool stop) { boost::mutex::scoped_lock lock(m_Mutex); - m_Joined = true; - m_CVEmpty.notify_all(); - while (!m_Stopped) - m_CVFull.wait(lock); + while (m_Processing || !m_Items.empty()) + m_CVStarved.wait(lock); + + if (stop) { + m_Stopped = true; + m_CVEmpty.notify_all(); + lock.unlock(); + + m_Thread.join(); + } } boost::thread::id WorkQueue::GetThreadId(void) const @@ -110,53 +119,73 @@ void WorkQueue::StatusTimerHandler(void) Log(LogInformation, "base", "WQ #" + Convert::ToString(m_ID) + " items: " + Convert::ToString(m_Items.size())); } -void WorkQueue::ProcessItems(boost::mutex::scoped_lock& lock, bool interleaved) -{ - while (!m_Items.empty()) { - WorkItem wi = m_Items.front(); - - if (interleaved && !wi.AllowInterleaved) - return; - - m_Items.pop_front(); - m_CVFull.notify_one(); - - lock.unlock(); - - try { - wi.Callback(); - } catch (const std::exception& ex) { - lock.lock(); - - ExceptionCallback callback = m_ExceptionCallback; - - lock.unlock(); - - callback(boost::current_exception()); - } - - lock.lock(); - } -} - void WorkQueue::WorkerThreadProc(void) { - boost::mutex::scoped_lock lock(m_Mutex); - std::ostringstream idbuf; idbuf << "WQ #" << m_ID; Utility::SetThreadName(idbuf.str()); + boost::mutex::scoped_lock lock(m_Mutex); + for (;;) { - while (m_Items.empty() && !m_Joined) + while (m_Items.empty() && !m_Stopped) m_CVEmpty.wait(lock); - if (m_Joined) + if (m_Stopped) break; - ProcessItems(lock, false); - } + std::deque items; + m_Items.swap(items); - m_Stopped = true; - m_CVFull.notify_all(); + if (items.size() >= m_MaxItems) + m_CVFull.notify_all(); + + m_Processing = true; + + lock.unlock(); + + BOOST_FOREACH(WorkItem& wi, items) { + try { + wi.Callback(); + } + catch (const std::exception&) { + lock.lock(); + + ExceptionCallback callback = m_ExceptionCallback; + + lock.unlock(); + + callback(boost::current_exception()); + } + } + + lock.lock(); + + m_Processing = false; + + m_CVStarved.notify_all(); + } +} + +ParallelWorkQueue::ParallelWorkQueue(void) + : m_QueueCount(boost::thread::hardware_concurrency()), + m_Queues(new WorkQueue[m_QueueCount]), + m_Index(0) +{ } + +ParallelWorkQueue::~ParallelWorkQueue(void) +{ + delete[] m_Queues; +} + +void ParallelWorkQueue::Enqueue(const boost::function& callback) +{ + m_Index++; + m_Queues[m_Index % m_QueueCount].Enqueue(callback); +} + +void ParallelWorkQueue::Join(void) +{ + for (unsigned int i = 0; i < m_QueueCount; i++) + m_Queues[i].Join(); } diff --git a/lib/base/workqueue.h b/lib/base/workqueue.h index 56b2c8df6..10015ff5a 100644 --- a/lib/base/workqueue.h +++ b/lib/base/workqueue.h @@ -36,7 +36,6 @@ typedef boost::function WorkCallback; struct WorkItem { - WorkCallback Callback; bool AllowInterleaved; }; @@ -55,7 +54,7 @@ public: ~WorkQueue(void); void Enqueue(const WorkCallback& callback, bool allowInterleaved = false); - void Join(void); + void Join(bool stop = false); boost::thread::id GetThreadId(void) const; @@ -68,21 +67,36 @@ private: boost::mutex m_Mutex; boost::condition_variable m_CVEmpty; boost::condition_variable m_CVFull; + boost::condition_variable m_CVStarved; boost::thread m_Thread; size_t m_MaxItems; - bool m_Joined; bool m_Stopped; + bool m_Processing; std::deque m_Items; ExceptionCallback m_ExceptionCallback; Timer::Ptr m_StatusTimer; - void ProcessItems(boost::mutex::scoped_lock& lock, bool interleaved); void WorkerThreadProc(void); void StatusTimerHandler(void); static void DefaultExceptionCallback(boost::exception_ptr exp); }; +class I2_BASE_API ParallelWorkQueue +{ +public: + ParallelWorkQueue(void); + ~ParallelWorkQueue(void); + + void Enqueue(const boost::function& callback); + void Join(void); + +private: + unsigned int m_QueueCount; + WorkQueue *m_Queues; + unsigned int m_Index; +}; + } #endif /* WORKQUEUE_H */ diff --git a/lib/config/configitem.cpp b/lib/config/configitem.cpp index 26720749d..6d2a86561 100644 --- a/lib/config/configitem.cpp +++ b/lib/config/configitem.cpp @@ -24,6 +24,7 @@ #include "base/objectlock.h" #include "base/logger_fwd.h" #include "base/debug.h" +#include "base/workqueue.h" #include #include @@ -103,7 +104,7 @@ ExpressionList::Ptr ConfigItem::GetExpressionList(void) const void ConfigItem::Link(void) { - ObjectLock olock(this); + ASSERT(OwnsLock()); if (m_LinkedExpressionList) return; @@ -131,6 +132,8 @@ void ConfigItem::Link(void) ExpressionList::Ptr ConfigItem::GetLinkedExpressionList(void) { + ASSERT(OwnsLock()); + if (!m_LinkedExpressionList) Link(); @@ -139,6 +142,10 @@ ExpressionList::Ptr ConfigItem::GetLinkedExpressionList(void) Dictionary::Ptr ConfigItem::GetProperties(void) { + ASSERT(!OwnsLock()); + + ObjectLock olock(this); + if (!m_Properties) { m_Properties = make_shared(); GetLinkedExpressionList()->Execute(m_Properties); @@ -178,6 +185,8 @@ DynamicObject::Ptr ConfigItem::Commit(void) DynamicObject::Ptr dobj = dtype->CreateObject(properties); dobj->Register(); + m_Object = dobj; + return dobj; } @@ -186,13 +195,12 @@ DynamicObject::Ptr ConfigItem::Commit(void) */ void ConfigItem::Register(void) { - ASSERT(!OwnsLock()); + std::pair key = std::make_pair(m_Type, m_Name); + ConfigItem::Ptr self = GetSelf(); - { - ObjectLock olock(this); + boost::mutex::scoped_lock lock(m_Mutex); - m_Items[std::make_pair(m_Type, m_Name)] = GetSelf(); - } + m_Items[key] = self; } /** @@ -204,11 +212,14 @@ void ConfigItem::Register(void) */ ConfigItem::Ptr ConfigItem::GetObject(const String& type, const String& name) { - boost::mutex::scoped_lock lock(m_Mutex); - + std::pair key = std::make_pair(type, name); ConfigItem::ItemMap::iterator it; - it = m_Items.find(std::make_pair(type, name)); + { + boost::mutex::scoped_lock lock(m_Mutex); + + it = m_Items.find(key); + } if (it != m_Items.end()) return it->second; @@ -216,6 +227,20 @@ ConfigItem::Ptr ConfigItem::GetObject(const String& type, const String& name) return ConfigItem::Ptr(); } +bool ConfigItem::HasObject(const String& type, const String& name) +{ + std::pair key = std::make_pair(type, name); + ConfigItem::ItemMap::iterator it; + + { + boost::mutex::scoped_lock lock(m_Mutex); + + it = m_Items.find(key); + } + + return (it != m_Items.end()); +} + void ConfigItem::ValidateItem(void) { if (m_Validated) @@ -244,34 +269,49 @@ bool ConfigItem::ActivateItems(bool validateOnly) Log(LogInformation, "config", "Validating config items (step 1)..."); + ParallelWorkQueue upq; + BOOST_FOREACH(const ItemMap::value_type& kv, m_Items) { - kv.second->ValidateItem(); + upq.Enqueue(boost::bind(&ConfigItem::ValidateItem, kv.second)); } + + upq.Join(); if (ConfigCompilerContext::GetInstance()->HasErrors()) return false; - Log(LogInformation, "config", "Activating config items"); - - std::vector objects; + Log(LogInformation, "config", "Comitting config items"); BOOST_FOREACH(const ItemMap::value_type& kv, m_Items) { - DynamicObject::Ptr object = kv.second->Commit(); + upq.Enqueue(boost::bind(&ConfigItem::Commit, kv.second)); + } + + upq.Join(); + + std::vector objects; + BOOST_FOREACH(const ItemMap::value_type& kv, m_Items) { + DynamicObject::Ptr object = kv.second->m_Object; if (object) objects.push_back(object); } - + + Log(LogInformation, "config", "Triggering OnConfigLoaded signal for config items"); + BOOST_FOREACH(const DynamicObject::Ptr& object, objects) { - object->OnConfigLoaded(); + upq.Enqueue(boost::bind(&DynamicObject::OnConfigLoaded, object)); } + + upq.Join(); Log(LogInformation, "config", "Validating config items (step 2)..."); BOOST_FOREACH(const ItemMap::value_type& kv, m_Items) { - kv.second->ValidateItem(); + upq.Enqueue(boost::bind(&ConfigItem::ValidateItem, kv.second)); } + upq.Join(); + if (ConfigCompilerContext::GetInstance()->HasErrors()) return false; @@ -281,6 +321,8 @@ bool ConfigItem::ActivateItems(bool validateOnly) /* restore the previous program state */ DynamicObject::RestoreObjects(Application::GetStatePath()); + Log(LogInformation, "config", "Triggering Start signal for config items"); + BOOST_FOREACH(const DynamicType::Ptr& type, DynamicType::GetTypes()) { BOOST_FOREACH(const DynamicObject::Ptr& object, type->GetObjects()) { if (object->IsActive()) @@ -289,16 +331,28 @@ bool ConfigItem::ActivateItems(bool validateOnly) #ifdef _DEBUG Log(LogDebug, "config", "Activating object '" + object->GetName() + "' of type '" + object->GetType()->GetName() + "'"); #endif /* _DEBUG */ - object->Start(); - + upq.Enqueue(boost::bind(&DynamicObject::Start, object)); + } + } + + upq.Join(); + +#ifdef _DEBUG + BOOST_FOREACH(const DynamicType::Ptr& type, DynamicType::GetTypes()) { + BOOST_FOREACH(const DynamicObject::Ptr& object, type->GetObjects()) { ASSERT(object->IsActive()); } } +#endif /* _DEBUG */ + + Log(LogInformation, "config", "Activated all objects."); return true; } void ConfigItem::DiscardItems(void) { + boost::mutex::scoped_lock lock(m_Mutex); + m_Items.clear(); } diff --git a/lib/config/configitem.h b/lib/config/configitem.h index 98a65fdf2..bd69daff4 100644 --- a/lib/config/configitem.h +++ b/lib/config/configitem.h @@ -47,7 +47,6 @@ public: std::vector GetParents(void) const; - void Link(void); ExpressionList::Ptr GetLinkedExpressionList(void); Dictionary::Ptr GetProperties(void); @@ -58,13 +57,15 @@ public: static ConfigItem::Ptr GetObject(const String& type, const String& name); + static bool HasObject(const String& type, const String& name); void ValidateItem(void); - + static bool ActivateItems(bool validateOnly); static void DiscardItems(void); private: + void Link(void); ExpressionList::Ptr GetExpressionList(void) const; String m_Type; /**< The object type. */ @@ -79,6 +80,8 @@ private: ExpressionList::Ptr m_LinkedExpressionList; Dictionary::Ptr m_Properties; + + DynamicObject::Ptr m_Object; static boost::mutex m_Mutex; diff --git a/lib/config/typerule.cpp b/lib/config/typerule.cpp index 0cb58cb97..6f45ef8a1 100644 --- a/lib/config/typerule.cpp +++ b/lib/config/typerule.cpp @@ -45,8 +45,6 @@ bool TypeRule::MatchName(const String& name) const bool TypeRule::MatchValue(const Value& value, String *hint) const { - ConfigItem::Ptr item; - if (value.IsEmpty()) return true; @@ -78,9 +76,7 @@ bool TypeRule::MatchValue(const Value& value, String *hint) const if (!value.IsScalar()) return false; - item = ConfigItem::GetObject(m_NameType, value); - - if (!item) { + if (!ConfigItem::HasObject(m_NameType, value)) { *hint = "Object '" + value + "' of type '" + m_NameType + "' does not exist."; return false; } diff --git a/lib/icinga/host.cpp b/lib/icinga/host.cpp index f38eea9c7..adf476771 100644 --- a/lib/icinga/host.cpp +++ b/lib/icinga/host.cpp @@ -138,17 +138,13 @@ void Host::UpdateSlaveServices(void) { ASSERT(!OwnsLock()); - ConfigItem::Ptr item = ConfigItem::GetObject("Host", GetName()); - - /* Don't create slave services unless we own this object */ - if (!item) - return; - Dictionary::Ptr service_descriptions = GetServiceDescriptions(); - if (!service_descriptions) + if (!service_descriptions ||service_descriptions->GetLength() == 0) return; + ConfigItem::Ptr item = ConfigItem::GetObject("Host", GetName()); + ObjectLock olock(service_descriptions); BOOST_FOREACH(const Dictionary::Pair& kv, service_descriptions) { std::ostringstream namebuf; diff --git a/lib/icinga/service-downtime.cpp b/lib/icinga/service-downtime.cpp index 9596c56a5..793452799 100644 --- a/lib/icinga/service-downtime.cpp +++ b/lib/icinga/service-downtime.cpp @@ -221,12 +221,10 @@ Downtime::Ptr Service::GetDowntimeByID(const String& id) void Service::StartDowntimesExpiredTimer(void) { - if (!l_DowntimesExpireTimer) { - l_DowntimesExpireTimer = make_shared(); - l_DowntimesExpireTimer->SetInterval(60); - l_DowntimesExpireTimer->OnTimerExpired.connect(boost::bind(&Service::DowntimesExpireTimerHandler)); - l_DowntimesExpireTimer->Start(); - } + l_DowntimesExpireTimer = make_shared(); + l_DowntimesExpireTimer->SetInterval(60); + l_DowntimesExpireTimer->OnTimerExpired.connect(boost::bind(&Service::DowntimesExpireTimerHandler)); + l_DowntimesExpireTimer->Start(); } void Service::AddDowntimesToCache(void) @@ -318,18 +316,14 @@ int Service::GetDowntimeDepth(void) const void Service::UpdateSlaveScheduledDowntimes(void) { - ConfigItem::Ptr item = ConfigItem::GetObject("Service", GetName()); - - /* Don't create slave scheduled downtimes unless we own this object */ - if (!item) - return; - /* Service scheduled downtime descs */ Dictionary::Ptr descs = GetScheduledDowntimeDescriptions(); - if (!descs) + if (!descs || descs->GetLength() == 0) return; + ConfigItem::Ptr item = ConfigItem::GetObject("Service", GetName()); + ObjectLock olock(descs); BOOST_FOREACH(const Dictionary::Pair& kv, descs) { diff --git a/lib/icinga/service-notification.cpp b/lib/icinga/service-notification.cpp index ac176fa75..b4808659c 100644 --- a/lib/icinga/service-notification.cpp +++ b/lib/icinga/service-notification.cpp @@ -95,18 +95,14 @@ void Service::RemoveNotification(const Notification::Ptr& notification) void Service::UpdateSlaveNotifications(void) { - ConfigItem::Ptr item = ConfigItem::GetObject("Service", GetName()); - - /* Don't create slave notifications unless we own this object */ - if (!item) - return; - /* Service notification descs */ Dictionary::Ptr descs = GetNotificationDescriptions(); - if (!descs) + if (!descs || descs->GetLength() == 0) return; + ConfigItem::Ptr item = ConfigItem::GetObject("Service", GetName()); + ObjectLock olock(descs); BOOST_FOREACH(const Dictionary::Pair& kv, descs) { diff --git a/lib/icinga/service.cpp b/lib/icinga/service.cpp index 482892758..be50f34f9 100644 --- a/lib/icinga/service.cpp +++ b/lib/icinga/service.cpp @@ -28,6 +28,7 @@ #include "base/objectlock.h" #include "base/convert.h" #include "base/utility.h" +#include "base/initialize.h" #include #include @@ -38,16 +39,14 @@ REGISTER_TYPE(Service); boost::signals2::signal Service::OnAcknowledgementSet; boost::signals2::signal Service::OnAcknowledgementCleared; +INITIALIZE_ONCE(&Service::StartDowntimesExpiredTimer); + Service::Service(void) : m_CheckRunning(false) { } void Service::Start(void) { - VERIFY(GetHost()); - - StartDowntimesExpiredTimer(); - double now = Utility::GetTime(); if (GetNextCheck() < now + 300) diff --git a/lib/icinga/service.h b/lib/icinga/service.h index c6cd6b8c9..883368751 100644 --- a/lib/icinga/service.h +++ b/lib/icinga/service.h @@ -204,7 +204,7 @@ public: static Service::Ptr GetOwnerByDowntimeID(const String& id); static Downtime::Ptr GetDowntimeByID(const String& id); - void StartDowntimesExpiredTimer(void); + static void StartDowntimesExpiredTimer(void); bool IsInDowntime(void) const; bool IsAcknowledged(void); diff --git a/tools/mkclass/classcompiler.cpp b/tools/mkclass/classcompiler.cpp index 28892309e..561ac74c0 100644 --- a/tools/mkclass/classcompiler.cpp +++ b/tools/mkclass/classcompiler.cpp @@ -21,6 +21,8 @@ #include #include #include +#include +#include using namespace icinga; @@ -77,6 +79,27 @@ void ClassCompiler::HandleCode(const std::string& code, const ClassDebugInfo& lo std::cout << code << std::endl; } +unsigned long ClassCompiler::SDBM(const std::string& str, size_t len = std::string::npos) +{ + unsigned long hash = 0; + size_t current = 0; + + std::string::const_iterator it; + + for (it = str.begin(); it != str.end(); it++) { + if (current >= len) + break; + + char c = *it; + + hash = c + (hash << 6) + (hash << 16) - hash; + + current++; + } + + return hash; +} + void ClassCompiler::HandleClass(const Klass& klass, const ClassDebugInfo& locp) { std::vector::const_iterator it; @@ -136,11 +159,47 @@ void ClassCompiler::HandleClass(const Klass& klass, const ClassDebugInfo& locp) std::cout << ";" << std::endl << std::endl; - int num = 0; - for (it = klass.Fields.begin(); it != klass.Fields.end(); it++) { - std::cout << "\t\t" << "if (name == \"" << it->Name << "\")" << std::endl - << "\t\t\t" << "return offset + " << num << ";" << std::endl; - num++; + std::map > > jumptable; + + int hlen = 0, collisions = 0; + + do { + int num = 0; + + hlen++; + jumptable.clear(); + collisions = 0; + + for (it = klass.Fields.begin(); it != klass.Fields.end(); it++) { + int hash = static_cast(SDBM(it->Name, hlen)); + jumptable[hash].push_back(std::make_pair(num, it->Name)); + num++; + + if (jumptable[hash].size() > 1) + collisions++; + } + } while (collisions >= 5 && hlen < 8); + + if (!klass.Fields.empty()) { + std::cout << "\t\tswitch (static_cast(Utility::SDBM(name, " << hlen << "))) {" << std::endl; + + std::map > >::const_iterator itj; + + for (itj = jumptable.begin(); itj != jumptable.end(); itj++) { + std::cout << "\t\t\tcase " << itj->first << ":" << std::endl; + + std::vector >::const_iterator itf; + + for (itf = itj->second.begin(); itf != itj->second.end(); itf++) { + std::cout << "\t\t\t\t" << "if (name == \"" << itf->second << "\")" << std::endl + << "\t\t\t\t\t" << "return offset + " << itf->first << ";" << std::endl; + } + + std::cout << std::endl + << "\t\t\t\tbreak;" << std::endl; + } + + std::cout << "\t\t}" << std::endl; } std::cout << std::endl @@ -178,7 +237,7 @@ void ClassCompiler::HandleClass(const Klass& klass, const ClassDebugInfo& locp) std::cout << ") {" << std::endl; - num = 0; + size_t num = 0; for (it = klass.Fields.begin(); it != klass.Fields.end(); it++) { std::cout << "\t\t\t" << "case " << num << ":" << std::endl << "\t\t\t\t" << "return Field(" << num << ", \"" << it->Name << "\", " << it->Attributes << ");" << std::endl; @@ -259,7 +318,7 @@ void ClassCompiler::HandleClass(const Klass& klass, const ClassDebugInfo& locp) std::cout << ") {" << std::endl; - num = 0; + size_t num = 0; for (it = klass.Fields.begin(); it != klass.Fields.end(); it++) { std::cout << "\t\t\t" << "case " << num << ":" << std::endl << "\t\t\t\t" << "Set" << it->GetFriendlyName() << "("; @@ -417,7 +476,8 @@ void ClassCompiler::CompileStream(const std::string& path, std::istream *stream) << "#include \"base/debug.h\"" << std::endl << "#include \"base/value.h\"" << std::endl << "#include \"base/array.h\"" << std::endl - << "#include \"base/dictionary.h\"" << std::endl << std::endl + << "#include \"base/dictionary.h\"" << std::endl + << "#include \"base/utility.h\"" << std::endl << std::endl << "#ifdef _MSC_VER" << std::endl << "#pragma warning( push )" << std::endl << "#pragma warning( disable : 4244 )" << std::endl diff --git a/tools/mkclass/classcompiler.h b/tools/mkclass/classcompiler.h index c6298d9bc..386d21244 100644 --- a/tools/mkclass/classcompiler.h +++ b/tools/mkclass/classcompiler.h @@ -142,6 +142,8 @@ private: std::string m_Path; std::istream *m_Input; void *m_Scanner; + + static unsigned long SDBM(const std::string& str, size_t len); }; }