Improve WorkQueue performance

Refs #5327
This commit is contained in:
Gunnar Beutner 2013-12-12 06:30:11 +01:00 committed by Gunnar Beutner
parent b4f2f06b88
commit f8d7f7799e
9 changed files with 92 additions and 96 deletions

View File

@ -23,6 +23,7 @@
#include "base/logger_fwd.h"
#include "base/convert.h"
#include <boost/bind.hpp>
#include <boost/foreach.hpp>
using namespace icinga;
@ -30,7 +31,7 @@ int WorkQueue::m_NextID = 1;
WorkQueue::WorkQueue(size_t maxItems)
: m_ID(m_NextID++), m_MaxItems(maxItems), m_Stopped(false),
m_ExceptionCallback(WorkQueue::DefaultExceptionCallback)
m_Processing(false), m_ExceptionCallback(WorkQueue::DefaultExceptionCallback)
{
m_Thread = boost::thread(boost::bind(&WorkQueue::WorkerThreadProc, this));
@ -47,16 +48,24 @@ WorkQueue::~WorkQueue(void)
/**
* Enqueues a work item. Work items are guaranteed to be executed in the order
* they were enqueued in.
* they were enqueued in except when allowInterleaved is true in which case
* the new work item might be run immediately if it's being enqueued from
* within the WorkQueue thread.
*/
void WorkQueue::Enqueue(const WorkCallback& callback, bool allowInterleaved)
{
bool wq_thread = (boost::this_thread::get_id() == GetThreadId());
if (wq_thread && allowInterleaved) {
callback();
return;
}
WorkItem item;
item.Callback = callback;
item.AllowInterleaved = allowInterleaved;
bool wq_thread = (boost::this_thread::get_id() == GetThreadId());
boost::mutex::scoped_lock lock(m_Mutex);
if (!wq_thread) {
@ -66,9 +75,7 @@ void WorkQueue::Enqueue(const WorkCallback& callback, bool allowInterleaved)
m_Items.push_back(item);
if (wq_thread)
ProcessItems(lock, true);
else if (m_Items.size() == 1)
if (m_Items.size() == 1)
m_CVEmpty.notify_all();
}
@ -76,7 +83,7 @@ void WorkQueue::Join(bool stop)
{
boost::mutex::scoped_lock lock(m_Mutex);
while (!m_Items.empty())
while (m_Processing || !m_Items.empty())
m_CVStarved.wait(lock);
if (stop) {
@ -112,47 +119,14 @@ void WorkQueue::StatusTimerHandler(void)
Log(LogInformation, "base", "WQ #" + Convert::ToString(m_ID) + " items: " + Convert::ToString(m_Items.size()));
}
void WorkQueue::ProcessItems(boost::mutex::scoped_lock& lock, bool interleaved)
{
while (!m_Items.empty()) {
WorkItem wi = m_Items.front();
if (interleaved && !wi.AllowInterleaved)
return;
lock.unlock();
try {
wi.Callback();
} catch (const std::exception&) {
lock.lock();
ExceptionCallback callback = m_ExceptionCallback;
lock.unlock();
callback(boost::current_exception());
}
lock.lock();
m_Items.pop_front();
if (m_Items.size() + 1 == m_MaxItems)
m_CVFull.notify_one();
}
m_CVStarved.notify_all();
}
void WorkQueue::WorkerThreadProc(void)
{
boost::mutex::scoped_lock lock(m_Mutex);
std::ostringstream idbuf;
idbuf << "WQ #" << m_ID;
Utility::SetThreadName(idbuf.str());
boost::mutex::scoped_lock lock(m_Mutex);
for (;;) {
while (m_Items.empty() && !m_Stopped)
m_CVEmpty.wait(lock);
@ -160,11 +134,39 @@ void WorkQueue::WorkerThreadProc(void)
if (m_Stopped)
break;
ProcessItems(lock, false);
std::deque<WorkItem> items;
m_Items.swap(items);
if (items.size() >= m_MaxItems)
m_CVFull.notify_all();
m_Processing = true;
lock.unlock();
BOOST_FOREACH(WorkItem& wi, items) {
try {
wi.Callback();
}
catch (const std::exception&) {
lock.lock();
ExceptionCallback callback = m_ExceptionCallback;
lock.unlock();
callback(boost::current_exception());
}
}
lock.lock();
m_Processing = false;
m_CVStarved.notify_all();
}
}
ParallelWorkQueue::ParallelWorkQueue(void)
: m_QueueCount(boost::thread::hardware_concurrency()),
m_Queues(new WorkQueue[m_QueueCount]),

View File

@ -36,7 +36,6 @@ typedef boost::function<void (void)> WorkCallback;
struct WorkItem
{
WorkCallback Callback;
bool AllowInterleaved;
};
@ -72,11 +71,11 @@ private:
boost::thread m_Thread;
size_t m_MaxItems;
bool m_Stopped;
bool m_Processing;
std::deque<WorkItem> m_Items;
ExceptionCallback m_ExceptionCallback;
Timer::Ptr m_StatusTimer;
void ProcessItems(boost::mutex::scoped_lock& lock, bool interleaved);
void WorkerThreadProc(void);
void StatusTimerHandler(void);

View File

@ -104,7 +104,7 @@ ExpressionList::Ptr ConfigItem::GetExpressionList(void) const
void ConfigItem::Link(void)
{
ObjectLock olock(this);
ASSERT(OwnsLock());
if (m_LinkedExpressionList)
return;
@ -132,6 +132,8 @@ void ConfigItem::Link(void)
ExpressionList::Ptr ConfigItem::GetLinkedExpressionList(void)
{
ASSERT(OwnsLock());
if (!m_LinkedExpressionList)
Link();
@ -140,6 +142,10 @@ ExpressionList::Ptr ConfigItem::GetLinkedExpressionList(void)
Dictionary::Ptr ConfigItem::GetProperties(void)
{
ASSERT(!OwnsLock());
ObjectLock olock(this);
if (!m_Properties) {
m_Properties = make_shared<Dictionary>();
GetLinkedExpressionList()->Execute(m_Properties);
@ -184,19 +190,17 @@ DynamicObject::Ptr ConfigItem::Commit(void)
return dobj;
}
DynamicObject::Ptr ConfigItem::GetObject(void) const
{
return m_Object;
}
/**
* Registers the configuration item.
*/
void ConfigItem::Register(void)
{
std::pair<String, String> key = std::make_pair(m_Type, m_Name);
ConfigItem::Ptr self = GetSelf();
boost::mutex::scoped_lock lock(m_Mutex);
m_Items[std::make_pair(m_Type, m_Name)] = GetSelf();
m_Items[key] = self;
}
/**
@ -208,11 +212,14 @@ void ConfigItem::Register(void)
*/
ConfigItem::Ptr ConfigItem::GetObject(const String& type, const String& name)
{
boost::mutex::scoped_lock lock(m_Mutex);
std::pair<String, String> key = std::make_pair(type, name);
ConfigItem::ItemMap::iterator it;
it = m_Items.find(std::make_pair(type, name));
{
boost::mutex::scoped_lock lock(m_Mutex);
it = m_Items.find(key);
}
if (it != m_Items.end())
return it->second;
@ -222,11 +229,14 @@ ConfigItem::Ptr ConfigItem::GetObject(const String& type, const String& name)
bool ConfigItem::HasObject(const String& type, const String& name)
{
boost::mutex::scoped_lock lock(m_Mutex);
std::pair<String, String> key = std::make_pair(type, name);
ConfigItem::ItemMap::iterator it;
it = m_Items.find(std::make_pair(type, name));
{
boost::mutex::scoped_lock lock(m_Mutex);
it = m_Items.find(key);
}
return (it != m_Items.end());
}
@ -280,7 +290,7 @@ bool ConfigItem::ActivateItems(bool validateOnly)
std::vector<DynamicObject::Ptr> objects;
BOOST_FOREACH(const ItemMap::value_type& kv, m_Items) {
DynamicObject::Ptr object = kv.second->GetObject();
DynamicObject::Ptr object = kv.second->m_Object;
if (object)
objects.push_back(object);
@ -342,5 +352,7 @@ bool ConfigItem::ActivateItems(bool validateOnly)
void ConfigItem::DiscardItems(void)
{
boost::mutex::scoped_lock lock(m_Mutex);
m_Items.clear();
}

View File

@ -47,7 +47,6 @@ public:
std::vector<ConfigItem::Ptr> GetParents(void) const;
void Link(void);
ExpressionList::Ptr GetLinkedExpressionList(void);
Dictionary::Ptr GetProperties(void);
@ -61,13 +60,12 @@ public:
static bool HasObject(const String& type, const String& name);
void ValidateItem(void);
DynamicObject::Ptr GetObject(void) const;
static bool ActivateItems(bool validateOnly);
static void DiscardItems(void);
private:
void Link(void);
ExpressionList::Ptr GetExpressionList(void) const;
String m_Type; /**< The object type. */

View File

@ -138,17 +138,13 @@ void Host::UpdateSlaveServices(void)
{
ASSERT(!OwnsLock());
ConfigItem::Ptr item = ConfigItem::GetObject("Host", GetName());
/* Don't create slave services unless we own this object */
if (!item)
return;
Dictionary::Ptr service_descriptions = GetServiceDescriptions();
if (!service_descriptions)
if (!service_descriptions ||service_descriptions->GetLength() == 0)
return;
ConfigItem::Ptr item = ConfigItem::GetObject("Host", GetName());
ObjectLock olock(service_descriptions);
BOOST_FOREACH(const Dictionary::Pair& kv, service_descriptions) {
std::ostringstream namebuf;

View File

@ -221,12 +221,10 @@ Downtime::Ptr Service::GetDowntimeByID(const String& id)
void Service::StartDowntimesExpiredTimer(void)
{
if (!l_DowntimesExpireTimer) {
l_DowntimesExpireTimer = make_shared<Timer>();
l_DowntimesExpireTimer->SetInterval(60);
l_DowntimesExpireTimer->OnTimerExpired.connect(boost::bind(&Service::DowntimesExpireTimerHandler));
l_DowntimesExpireTimer->Start();
}
l_DowntimesExpireTimer = make_shared<Timer>();
l_DowntimesExpireTimer->SetInterval(60);
l_DowntimesExpireTimer->OnTimerExpired.connect(boost::bind(&Service::DowntimesExpireTimerHandler));
l_DowntimesExpireTimer->Start();
}
void Service::AddDowntimesToCache(void)
@ -318,18 +316,14 @@ int Service::GetDowntimeDepth(void) const
void Service::UpdateSlaveScheduledDowntimes(void)
{
ConfigItem::Ptr item = ConfigItem::GetObject("Service", GetName());
/* Don't create slave scheduled downtimes unless we own this object */
if (!item)
return;
/* Service scheduled downtime descs */
Dictionary::Ptr descs = GetScheduledDowntimeDescriptions();
if (!descs)
if (!descs || descs->GetLength() == 0)
return;
ConfigItem::Ptr item = ConfigItem::GetObject("Service", GetName());
ObjectLock olock(descs);
BOOST_FOREACH(const Dictionary::Pair& kv, descs) {

View File

@ -95,18 +95,14 @@ void Service::RemoveNotification(const Notification::Ptr& notification)
void Service::UpdateSlaveNotifications(void)
{
ConfigItem::Ptr item = ConfigItem::GetObject("Service", GetName());
/* Don't create slave notifications unless we own this object */
if (!item)
return;
/* Service notification descs */
Dictionary::Ptr descs = GetNotificationDescriptions();
if (!descs)
if (!descs || descs->GetLength() == 0)
return;
ConfigItem::Ptr item = ConfigItem::GetObject("Service", GetName());
ObjectLock olock(descs);
BOOST_FOREACH(const Dictionary::Pair& kv, descs) {

View File

@ -28,6 +28,7 @@
#include "base/objectlock.h"
#include "base/convert.h"
#include "base/utility.h"
#include "base/initialize.h"
#include <boost/foreach.hpp>
#include <boost/bind/apply.hpp>
@ -38,16 +39,14 @@ REGISTER_TYPE(Service);
boost::signals2::signal<void (const Service::Ptr&, const String&, const String&, AcknowledgementType, double, const String&)> Service::OnAcknowledgementSet;
boost::signals2::signal<void (const Service::Ptr&, const String&)> Service::OnAcknowledgementCleared;
INITIALIZE_ONCE(&Service::StartDowntimesExpiredTimer);
Service::Service(void)
: m_CheckRunning(false)
{ }
void Service::Start(void)
{
VERIFY(GetHost());
StartDowntimesExpiredTimer();
double now = Utility::GetTime();
if (GetNextCheck() < now + 300)

View File

@ -204,7 +204,7 @@ public:
static Service::Ptr GetOwnerByDowntimeID(const String& id);
static Downtime::Ptr GetDowntimeByID(const String& id);
void StartDowntimesExpiredTimer(void);
static void StartDowntimesExpiredTimer(void);
bool IsInDowntime(void) const;
bool IsAcknowledged(void);