Implement support for priorities in the WorkQueue class

fixes #8714
This commit is contained in:
Gunnar Beutner 2015-12-10 16:54:43 +01:00
parent dc85b1c6fb
commit 15ca9987fa
8 changed files with 67 additions and 31 deletions

View File

@ -415,8 +415,6 @@ void ConfigObject::Deactivate(bool runtimeRemoved)
{ {
CONTEXT("Deactivating object '" + GetName() + "' of type '" + GetType()->GetName() + "'"); CONTEXT("Deactivating object '" + GetName() + "' of type '" + GetType()->GetName() + "'");
SetAuthority(false);
{ {
ObjectLock olock(this); ObjectLock olock(this);
@ -426,6 +424,8 @@ void ConfigObject::Deactivate(bool runtimeRemoved)
SetActive(false, true); SetActive(false, true);
} }
SetAuthority(false);
Stop(runtimeRemoved); Stop(runtimeRemoved);
ASSERT(GetStopCalled()); ASSERT(GetStopCalled());
@ -471,10 +471,10 @@ void ConfigObject::SetAuthority(bool authority)
ASSERT(GetResumeCalled()); ASSERT(GetResumeCalled());
SetPaused(false); SetPaused(false);
} else if (!authority && !GetPaused()) { } else if (!authority && !GetPaused()) {
SetPaused(true);
SetPauseCalled(false); SetPauseCalled(false);
Pause(); Pause();
ASSERT(GetPauseCalled()); ASSERT(GetPauseCalled());
SetPaused(true);
} }
} }

View File

@ -55,12 +55,13 @@ WorkQueue::~WorkQueue(void)
* allowInterleaved is true in which case the new task might be run * allowInterleaved is true in which case the new task might be run
* immediately if it's being enqueued from within the WorkQueue thread. * immediately if it's being enqueued from within the WorkQueue thread.
*/ */
void WorkQueue::Enqueue(const Task& task, bool allowInterleaved) void WorkQueue::Enqueue(const boost::function<void (void)>& function, WorkQueuePriority priority,
bool allowInterleaved)
{ {
bool wq_thread = IsWorkerThread(); bool wq_thread = IsWorkerThread();
if (wq_thread && allowInterleaved) { if (wq_thread && allowInterleaved) {
task(); function();
return; return;
} }
@ -80,7 +81,7 @@ void WorkQueue::Enqueue(const Task& task, bool allowInterleaved)
m_CVFull.wait(lock); m_CVFull.wait(lock);
} }
m_Tasks.push_back(task); m_Tasks.push(Task(function, priority));
m_CVEmpty.notify_one(); m_CVEmpty.notify_one();
} }
@ -200,15 +201,15 @@ void WorkQueue::WorkerThreadProc(void)
if (m_Tasks.size() >= m_MaxItems && m_MaxItems != 0) if (m_Tasks.size() >= m_MaxItems && m_MaxItems != 0)
m_CVFull.notify_all(); m_CVFull.notify_all();
Task task = m_Tasks.front(); Task task = m_Tasks.top();
m_Tasks.pop_front(); m_Tasks.pop();
m_Processing++; m_Processing++;
lock.unlock(); lock.unlock();
try { try {
task(); task.Function();
} catch (const std::exception&) { } catch (const std::exception&) {
lock.lock(); lock.lock();

View File

@ -27,12 +27,37 @@
#include <boost/thread/mutex.hpp> #include <boost/thread/mutex.hpp>
#include <boost/thread/condition_variable.hpp> #include <boost/thread/condition_variable.hpp>
#include <boost/exception_ptr.hpp> #include <boost/exception_ptr.hpp>
#include <queue>
#include <deque> #include <deque>
namespace icinga namespace icinga
{ {
typedef boost::function<void (void)> Task; enum WorkQueuePriority
{
PriorityLow,
PriorityNormal,
PriorityHigh
};
struct Task
{
Task(void)
: Priority(PriorityNormal)
{ }
Task(const boost::function<void (void)>& function, WorkQueuePriority priority)
: Function(function), Priority(priority)
{ }
boost::function<void (void)> Function;
WorkQueuePriority Priority;
};
inline bool operator<(const Task& a, const Task& b)
{
return a.Priority < b.Priority;
}
/** /**
* A workqueue. * A workqueue.
@ -47,7 +72,8 @@ public:
WorkQueue(size_t maxItems = 0, int threadCount = 1); WorkQueue(size_t maxItems = 0, int threadCount = 1);
~WorkQueue(void); ~WorkQueue(void);
void Enqueue(const Task& task, bool allowInterleaved = false); void Enqueue(const boost::function<void (void)>& function, WorkQueuePriority priority = PriorityNormal,
bool allowInterleaved = false);
void Join(bool stop = false); void Join(bool stop = false);
bool IsWorkerThread(void) const; bool IsWorkerThread(void) const;
@ -74,7 +100,7 @@ private:
size_t m_MaxItems; size_t m_MaxItems;
bool m_Stopped; bool m_Stopped;
int m_Processing; int m_Processing;
std::deque<Task> m_Tasks; std::priority_queue<Task, std::deque<Task> > m_Tasks;
ExceptionCallback m_ExceptionCallback; ExceptionCallback m_ExceptionCallback;
std::vector<boost::exception_ptr> m_Exceptions; std::vector<boost::exception_ptr> m_Exceptions;
Timer::Ptr m_StatusTimer; Timer::Ptr m_StatusTimer;

View File

@ -97,6 +97,9 @@ void DbConnection::Pause(void)
query1.Fields = new Dictionary(); query1.Fields = new Dictionary();
query1.Fields->Set("instance_id", 0); /* DbConnection class fills in real ID */ query1.Fields->Set("instance_id", 0); /* DbConnection class fills in real ID */
query1.Fields->Set("program_end_time", DbValue::FromTimestamp(Utility::GetTime())); query1.Fields->Set("program_end_time", DbValue::FromTimestamp(Utility::GetTime()));
query1.Priority = PriorityHigh;
ExecuteQuery(query1); ExecuteQuery(query1);
NewTransaction(); NewTransaction();
@ -134,6 +137,7 @@ void DbConnection::ProgramStatusHandler(void)
query1.Category = DbCatProgramStatus; query1.Category = DbCatProgramStatus;
query1.WhereCriteria = new Dictionary(); query1.WhereCriteria = new Dictionary();
query1.WhereCriteria->Set("instance_id", 0); /* DbConnection class fills in real ID */ query1.WhereCriteria->Set("instance_id", 0); /* DbConnection class fills in real ID */
query1.Priority = PriorityHigh;
DbObject::OnQuery(query1); DbObject::OnQuery(query1);
DbQuery query2; DbQuery query2;
@ -160,6 +164,7 @@ void DbConnection::ProgramStatusHandler(void)
query2.Fields->Set("event_handlers_enabled", (IcingaApplication::GetInstance()->GetEnableEventHandlers() ? 1 : 0)); query2.Fields->Set("event_handlers_enabled", (IcingaApplication::GetInstance()->GetEnableEventHandlers() ? 1 : 0));
query2.Fields->Set("flap_detection_enabled", (IcingaApplication::GetInstance()->GetEnableFlapping() ? 1 : 0)); query2.Fields->Set("flap_detection_enabled", (IcingaApplication::GetInstance()->GetEnableFlapping() ? 1 : 0));
query2.Fields->Set("process_performance_data", (IcingaApplication::GetInstance()->GetEnablePerfdata() ? 1 : 0)); query2.Fields->Set("process_performance_data", (IcingaApplication::GetInstance()->GetEnablePerfdata() ? 1 : 0));
query2.Priority = PriorityHigh;
DbObject::OnQuery(query2); DbObject::OnQuery(query2);
DbQuery query3; DbQuery query3;

View File

@ -70,11 +70,12 @@ struct I2_DB_IDO_API DbQuery
intrusive_ptr<CustomVarObject> NotificationObject; intrusive_ptr<CustomVarObject> NotificationObject;
bool ConfigUpdate; bool ConfigUpdate;
bool StatusUpdate; bool StatusUpdate;
WorkQueuePriority Priority;
static void StaticInitialize(void); static void StaticInitialize(void);
DbQuery(void) DbQuery(void)
: Type(0), Category(DbCatInvalid), ConfigUpdate(false), StatusUpdate(false) : Type(0), Category(DbCatInvalid), ConfigUpdate(false), StatusUpdate(false), Priority(PriorityLow)
{ } { }
}; };

View File

@ -95,7 +95,7 @@ void IdoMysqlConnection::Pause(void)
DbConnection::Pause(); DbConnection::Pause();
m_QueryQueue.Enqueue(boost::bind(&IdoMysqlConnection::Disconnect, this)); m_QueryQueue.Enqueue(boost::bind(&IdoMysqlConnection::Disconnect, this), PriorityHigh);
m_QueryQueue.Join(); m_QueryQueue.Join();
} }
@ -138,8 +138,8 @@ void IdoMysqlConnection::TxTimerHandler(void)
void IdoMysqlConnection::NewTransaction(void) void IdoMysqlConnection::NewTransaction(void)
{ {
m_QueryQueue.Enqueue(boost::bind(&IdoMysqlConnection::InternalNewTransaction, this)); m_QueryQueue.Enqueue(boost::bind(&IdoMysqlConnection::InternalNewTransaction, this), PriorityHigh);
m_QueryQueue.Enqueue(boost::bind(&IdoMysqlConnection::FinishAsyncQueries, this, true)); m_QueryQueue.Enqueue(boost::bind(&IdoMysqlConnection::FinishAsyncQueries, this, true), PriorityHigh);
} }
void IdoMysqlConnection::InternalNewTransaction(void) void IdoMysqlConnection::InternalNewTransaction(void)
@ -155,13 +155,16 @@ void IdoMysqlConnection::InternalNewTransaction(void)
void IdoMysqlConnection::ReconnectTimerHandler(void) void IdoMysqlConnection::ReconnectTimerHandler(void)
{ {
m_QueryQueue.Enqueue(boost::bind(&IdoMysqlConnection::Reconnect, this)); m_QueryQueue.Enqueue(boost::bind(&IdoMysqlConnection::Reconnect, this), PriorityLow);
} }
void IdoMysqlConnection::Reconnect(void) void IdoMysqlConnection::Reconnect(void)
{ {
AssertOnWorkQueue(); AssertOnWorkQueue();
if (!IsActive())
return;
CONTEXT("Reconnecting to MySQL IDO database '" + GetName() + "'"); CONTEXT("Reconnecting to MySQL IDO database '" + GetName() + "'");
m_SessionToken = static_cast<int>(Utility::GetTime()); m_SessionToken = static_cast<int>(Utility::GetTime());
@ -406,7 +409,7 @@ void IdoMysqlConnection::AsyncQuery(const String& query, const boost::function<v
if (m_AsyncQueries.size() > 500) if (m_AsyncQueries.size() > 500)
FinishAsyncQueries(true); FinishAsyncQueries(true);
else else
m_QueryQueue.Enqueue(boost::bind(&IdoMysqlConnection::FinishAsyncQueries, this, false)); m_QueryQueue.Enqueue(boost::bind(&IdoMysqlConnection::FinishAsyncQueries, this, false), PriorityLow);
} }
void IdoMysqlConnection::FinishAsyncQueries(bool force) void IdoMysqlConnection::FinishAsyncQueries(bool force)
@ -625,7 +628,7 @@ void IdoMysqlConnection::DiscardRows(const IdoMysqlResult& result)
void IdoMysqlConnection::ActivateObject(const DbObject::Ptr& dbobj) void IdoMysqlConnection::ActivateObject(const DbObject::Ptr& dbobj)
{ {
m_QueryQueue.Enqueue(boost::bind(&IdoMysqlConnection::InternalActivateObject, this, dbobj)); m_QueryQueue.Enqueue(boost::bind(&IdoMysqlConnection::InternalActivateObject, this, dbobj), PriorityLow);
} }
void IdoMysqlConnection::InternalActivateObject(const DbObject::Ptr& dbobj) void IdoMysqlConnection::InternalActivateObject(const DbObject::Ptr& dbobj)
@ -659,7 +662,7 @@ void IdoMysqlConnection::InternalActivateObject(const DbObject::Ptr& dbobj)
void IdoMysqlConnection::DeactivateObject(const DbObject::Ptr& dbobj) void IdoMysqlConnection::DeactivateObject(const DbObject::Ptr& dbobj)
{ {
m_QueryQueue.Enqueue(boost::bind(&IdoMysqlConnection::InternalDeactivateObject, this, dbobj)); m_QueryQueue.Enqueue(boost::bind(&IdoMysqlConnection::InternalDeactivateObject, this, dbobj), PriorityLow);
} }
void IdoMysqlConnection::InternalDeactivateObject(const DbObject::Ptr& dbobj) void IdoMysqlConnection::InternalDeactivateObject(const DbObject::Ptr& dbobj)
@ -753,7 +756,7 @@ void IdoMysqlConnection::ExecuteQuery(const DbQuery& query)
{ {
ASSERT(query.Category != DbCatInvalid); ASSERT(query.Category != DbCatInvalid);
m_QueryQueue.Enqueue(boost::bind(&IdoMysqlConnection::InternalExecuteQuery, this, query, (DbQueryType *)NULL), true); m_QueryQueue.Enqueue(boost::bind(&IdoMysqlConnection::InternalExecuteQuery, this, query, (DbQueryType *)NULL), query.Priority, true);
} }
void IdoMysqlConnection::InternalExecuteQuery(const DbQuery& query, DbQueryType *typeOverride) void IdoMysqlConnection::InternalExecuteQuery(const DbQuery& query, DbQueryType *typeOverride)
@ -781,7 +784,7 @@ void IdoMysqlConnection::InternalExecuteQuery(const DbQuery& query, DbQueryType
BOOST_FOREACH(const Dictionary::Pair& kv, query.WhereCriteria) { BOOST_FOREACH(const Dictionary::Pair& kv, query.WhereCriteria) {
if (!FieldToEscapedString(kv.first, kv.second, &value)) { if (!FieldToEscapedString(kv.first, kv.second, &value)) {
m_QueryQueue.Enqueue(boost::bind(&IdoMysqlConnection::InternalExecuteQuery, this, query, (DbQueryType *)NULL)); m_QueryQueue.Enqueue(boost::bind(&IdoMysqlConnection::InternalExecuteQuery, this, query, (DbQueryType *)NULL), query.Priority);
return; return;
} }
@ -902,7 +905,7 @@ void IdoMysqlConnection::FinishExecuteQuery(const DbQuery& query, int type, bool
void IdoMysqlConnection::CleanUpExecuteQuery(const String& table, const String& time_column, double max_age) void IdoMysqlConnection::CleanUpExecuteQuery(const String& table, const String& time_column, double max_age)
{ {
m_QueryQueue.Enqueue(boost::bind(&IdoMysqlConnection::InternalCleanUpExecuteQuery, this, table, time_column, max_age), true); m_QueryQueue.Enqueue(boost::bind(&IdoMysqlConnection::InternalCleanUpExecuteQuery, this, table, time_column, max_age), PriorityLow, true);
} }
void IdoMysqlConnection::InternalCleanUpExecuteQuery(const String& table, const String& time_column, double max_age) void IdoMysqlConnection::InternalCleanUpExecuteQuery(const String& table, const String& time_column, double max_age)

View File

@ -97,7 +97,7 @@ void IdoPgsqlConnection::Pause(void)
DbConnection::Pause(); DbConnection::Pause();
m_QueryQueue.Enqueue(boost::bind(&IdoPgsqlConnection::Disconnect, this)); m_QueryQueue.Enqueue(boost::bind(&IdoPgsqlConnection::Disconnect, this), PriorityHigh);
m_QueryQueue.Join(); m_QueryQueue.Join();
} }
@ -139,7 +139,7 @@ void IdoPgsqlConnection::TxTimerHandler(void)
void IdoPgsqlConnection::NewTransaction(void) void IdoPgsqlConnection::NewTransaction(void)
{ {
m_QueryQueue.Enqueue(boost::bind(&IdoPgsqlConnection::InternalNewTransaction, this), true); m_QueryQueue.Enqueue(boost::bind(&IdoPgsqlConnection::InternalNewTransaction, this), PriorityHigh, true);
} }
void IdoPgsqlConnection::InternalNewTransaction(void) void IdoPgsqlConnection::InternalNewTransaction(void)
@ -155,7 +155,7 @@ void IdoPgsqlConnection::InternalNewTransaction(void)
void IdoPgsqlConnection::ReconnectTimerHandler(void) void IdoPgsqlConnection::ReconnectTimerHandler(void)
{ {
m_QueryQueue.Enqueue(boost::bind(&IdoPgsqlConnection::Reconnect, this)); m_QueryQueue.Enqueue(boost::bind(&IdoPgsqlConnection::Reconnect, this), PriorityLow);
} }
void IdoPgsqlConnection::Reconnect(void) void IdoPgsqlConnection::Reconnect(void)
@ -503,7 +503,7 @@ Dictionary::Ptr IdoPgsqlConnection::FetchRow(const IdoPgsqlResult& result, int r
void IdoPgsqlConnection::ActivateObject(const DbObject::Ptr& dbobj) void IdoPgsqlConnection::ActivateObject(const DbObject::Ptr& dbobj)
{ {
m_QueryQueue.Enqueue(boost::bind(&IdoPgsqlConnection::InternalActivateObject, this, dbobj)); m_QueryQueue.Enqueue(boost::bind(&IdoPgsqlConnection::InternalActivateObject, this, dbobj), PriorityLow);
} }
void IdoPgsqlConnection::InternalActivateObject(const DbObject::Ptr& dbobj) void IdoPgsqlConnection::InternalActivateObject(const DbObject::Ptr& dbobj)
@ -537,7 +537,7 @@ void IdoPgsqlConnection::InternalActivateObject(const DbObject::Ptr& dbobj)
void IdoPgsqlConnection::DeactivateObject(const DbObject::Ptr& dbobj) void IdoPgsqlConnection::DeactivateObject(const DbObject::Ptr& dbobj)
{ {
m_QueryQueue.Enqueue(boost::bind(&IdoPgsqlConnection::InternalDeactivateObject, this, dbobj)); m_QueryQueue.Enqueue(boost::bind(&IdoPgsqlConnection::InternalDeactivateObject, this, dbobj), PriorityLow);
} }
void IdoPgsqlConnection::InternalDeactivateObject(const DbObject::Ptr& dbobj) void IdoPgsqlConnection::InternalDeactivateObject(const DbObject::Ptr& dbobj)
@ -630,7 +630,7 @@ void IdoPgsqlConnection::ExecuteQuery(const DbQuery& query)
{ {
ASSERT(query.Category != DbCatInvalid); ASSERT(query.Category != DbCatInvalid);
m_QueryQueue.Enqueue(boost::bind(&IdoPgsqlConnection::InternalExecuteQuery, this, query, (DbQueryType *)NULL), true); m_QueryQueue.Enqueue(boost::bind(&IdoPgsqlConnection::InternalExecuteQuery, this, query, (DbQueryType *)NULL), query.Priority, true);
} }
void IdoPgsqlConnection::InternalExecuteQuery(const DbQuery& query, DbQueryType *typeOverride) void IdoPgsqlConnection::InternalExecuteQuery(const DbQuery& query, DbQueryType *typeOverride)
@ -781,7 +781,7 @@ void IdoPgsqlConnection::InternalExecuteQuery(const DbQuery& query, DbQueryType
void IdoPgsqlConnection::CleanUpExecuteQuery(const String& table, const String& time_column, double max_age) void IdoPgsqlConnection::CleanUpExecuteQuery(const String& table, const String& time_column, double max_age)
{ {
m_QueryQueue.Enqueue(boost::bind(&IdoPgsqlConnection::InternalCleanUpExecuteQuery, this, table, time_column, max_age), true); m_QueryQueue.Enqueue(boost::bind(&IdoPgsqlConnection::InternalCleanUpExecuteQuery, this, table, time_column, max_age), PriorityLow, true);
} }
void IdoPgsqlConnection::InternalCleanUpExecuteQuery(const String& table, const String& time_column, double max_age) void IdoPgsqlConnection::InternalCleanUpExecuteQuery(const String& table, const String& time_column, double max_age)

View File

@ -540,7 +540,7 @@ void ApiListener::ApiTimerHandler(void)
void ApiListener::RelayMessage(const MessageOrigin::Ptr& origin, void ApiListener::RelayMessage(const MessageOrigin::Ptr& origin,
const ConfigObject::Ptr& secobj, const Dictionary::Ptr& message, bool log) const ConfigObject::Ptr& secobj, const Dictionary::Ptr& message, bool log)
{ {
m_RelayQueue.Enqueue(boost::bind(&ApiListener::SyncRelayMessage, this, origin, secobj, message, log), true); m_RelayQueue.Enqueue(boost::bind(&ApiListener::SyncRelayMessage, this, origin, secobj, message, log), PriorityNormal, true);
} }
void ApiListener::PersistMessage(const Dictionary::Ptr& message, const ConfigObject::Ptr& secobj) void ApiListener::PersistMessage(const Dictionary::Ptr& message, const ConfigObject::Ptr& secobj)