From 15ca9987faf5cc3fcb418a684d52909c1c128322 Mon Sep 17 00:00:00 2001 From: Gunnar Beutner Date: Thu, 10 Dec 2015 16:54:43 +0100 Subject: [PATCH] Implement support for priorities in the WorkQueue class fixes #8714 --- lib/base/configobject.cpp | 6 ++--- lib/base/workqueue.cpp | 13 +++++----- lib/base/workqueue.hpp | 32 ++++++++++++++++++++++--- lib/db_ido/dbconnection.cpp | 5 ++++ lib/db_ido/dbquery.hpp | 3 ++- lib/db_ido_mysql/idomysqlconnection.cpp | 23 ++++++++++-------- lib/db_ido_pgsql/idopgsqlconnection.cpp | 14 +++++------ lib/remote/apilistener.cpp | 2 +- 8 files changed, 67 insertions(+), 31 deletions(-) diff --git a/lib/base/configobject.cpp b/lib/base/configobject.cpp index 00b4af1bb..fbf1d95fd 100644 --- a/lib/base/configobject.cpp +++ b/lib/base/configobject.cpp @@ -415,8 +415,6 @@ void ConfigObject::Deactivate(bool runtimeRemoved) { CONTEXT("Deactivating object '" + GetName() + "' of type '" + GetType()->GetName() + "'"); - SetAuthority(false); - { ObjectLock olock(this); @@ -426,6 +424,8 @@ void ConfigObject::Deactivate(bool runtimeRemoved) SetActive(false, true); } + SetAuthority(false); + Stop(runtimeRemoved); ASSERT(GetStopCalled()); @@ -471,10 +471,10 @@ void ConfigObject::SetAuthority(bool authority) ASSERT(GetResumeCalled()); SetPaused(false); } else if (!authority && !GetPaused()) { + SetPaused(true); SetPauseCalled(false); Pause(); ASSERT(GetPauseCalled()); - SetPaused(true); } } diff --git a/lib/base/workqueue.cpp b/lib/base/workqueue.cpp index 87200656c..36d9a32e2 100644 --- a/lib/base/workqueue.cpp +++ b/lib/base/workqueue.cpp @@ -55,12 +55,13 @@ WorkQueue::~WorkQueue(void) * allowInterleaved is true in which case the new task might be run * immediately if it's being enqueued from within the WorkQueue thread. */ -void WorkQueue::Enqueue(const Task& task, bool allowInterleaved) +void WorkQueue::Enqueue(const boost::function& function, WorkQueuePriority priority, + bool allowInterleaved) { bool wq_thread = IsWorkerThread(); if (wq_thread && allowInterleaved) { - task(); + function(); return; } @@ -80,7 +81,7 @@ void WorkQueue::Enqueue(const Task& task, bool allowInterleaved) m_CVFull.wait(lock); } - m_Tasks.push_back(task); + m_Tasks.push(Task(function, priority)); m_CVEmpty.notify_one(); } @@ -200,15 +201,15 @@ void WorkQueue::WorkerThreadProc(void) if (m_Tasks.size() >= m_MaxItems && m_MaxItems != 0) m_CVFull.notify_all(); - Task task = m_Tasks.front(); - m_Tasks.pop_front(); + Task task = m_Tasks.top(); + m_Tasks.pop(); m_Processing++; lock.unlock(); try { - task(); + task.Function(); } catch (const std::exception&) { lock.lock(); diff --git a/lib/base/workqueue.hpp b/lib/base/workqueue.hpp index c6088b74e..75d93e1c3 100644 --- a/lib/base/workqueue.hpp +++ b/lib/base/workqueue.hpp @@ -27,12 +27,37 @@ #include #include #include +#include #include namespace icinga { -typedef boost::function Task; +enum WorkQueuePriority +{ + PriorityLow, + PriorityNormal, + PriorityHigh +}; + +struct Task +{ + Task(void) + : Priority(PriorityNormal) + { } + + Task(const boost::function& function, WorkQueuePriority priority) + : Function(function), Priority(priority) + { } + + boost::function Function; + WorkQueuePriority Priority; +}; + +inline bool operator<(const Task& a, const Task& b) +{ + return a.Priority < b.Priority; +} /** * A workqueue. @@ -47,7 +72,8 @@ public: WorkQueue(size_t maxItems = 0, int threadCount = 1); ~WorkQueue(void); - void Enqueue(const Task& task, bool allowInterleaved = false); + void Enqueue(const boost::function& function, WorkQueuePriority priority = PriorityNormal, + bool allowInterleaved = false); void Join(bool stop = false); bool IsWorkerThread(void) const; @@ -74,7 +100,7 @@ private: size_t m_MaxItems; bool m_Stopped; int m_Processing; - std::deque m_Tasks; + std::priority_queue > m_Tasks; ExceptionCallback m_ExceptionCallback; std::vector m_Exceptions; Timer::Ptr m_StatusTimer; diff --git a/lib/db_ido/dbconnection.cpp b/lib/db_ido/dbconnection.cpp index 1bbcf71d8..967815b6c 100644 --- a/lib/db_ido/dbconnection.cpp +++ b/lib/db_ido/dbconnection.cpp @@ -97,6 +97,9 @@ void DbConnection::Pause(void) query1.Fields = new Dictionary(); query1.Fields->Set("instance_id", 0); /* DbConnection class fills in real ID */ query1.Fields->Set("program_end_time", DbValue::FromTimestamp(Utility::GetTime())); + + query1.Priority = PriorityHigh; + ExecuteQuery(query1); NewTransaction(); @@ -134,6 +137,7 @@ void DbConnection::ProgramStatusHandler(void) query1.Category = DbCatProgramStatus; query1.WhereCriteria = new Dictionary(); query1.WhereCriteria->Set("instance_id", 0); /* DbConnection class fills in real ID */ + query1.Priority = PriorityHigh; DbObject::OnQuery(query1); DbQuery query2; @@ -160,6 +164,7 @@ void DbConnection::ProgramStatusHandler(void) query2.Fields->Set("event_handlers_enabled", (IcingaApplication::GetInstance()->GetEnableEventHandlers() ? 1 : 0)); query2.Fields->Set("flap_detection_enabled", (IcingaApplication::GetInstance()->GetEnableFlapping() ? 1 : 0)); query2.Fields->Set("process_performance_data", (IcingaApplication::GetInstance()->GetEnablePerfdata() ? 1 : 0)); + query2.Priority = PriorityHigh; DbObject::OnQuery(query2); DbQuery query3; diff --git a/lib/db_ido/dbquery.hpp b/lib/db_ido/dbquery.hpp index dcab8771b..60ce35736 100644 --- a/lib/db_ido/dbquery.hpp +++ b/lib/db_ido/dbquery.hpp @@ -70,11 +70,12 @@ struct I2_DB_IDO_API DbQuery intrusive_ptr NotificationObject; bool ConfigUpdate; bool StatusUpdate; + WorkQueuePriority Priority; static void StaticInitialize(void); DbQuery(void) - : Type(0), Category(DbCatInvalid), ConfigUpdate(false), StatusUpdate(false) + : Type(0), Category(DbCatInvalid), ConfigUpdate(false), StatusUpdate(false), Priority(PriorityLow) { } }; diff --git a/lib/db_ido_mysql/idomysqlconnection.cpp b/lib/db_ido_mysql/idomysqlconnection.cpp index e9ddea685..dca7df260 100644 --- a/lib/db_ido_mysql/idomysqlconnection.cpp +++ b/lib/db_ido_mysql/idomysqlconnection.cpp @@ -95,7 +95,7 @@ void IdoMysqlConnection::Pause(void) DbConnection::Pause(); - m_QueryQueue.Enqueue(boost::bind(&IdoMysqlConnection::Disconnect, this)); + m_QueryQueue.Enqueue(boost::bind(&IdoMysqlConnection::Disconnect, this), PriorityHigh); m_QueryQueue.Join(); } @@ -138,8 +138,8 @@ void IdoMysqlConnection::TxTimerHandler(void) void IdoMysqlConnection::NewTransaction(void) { - m_QueryQueue.Enqueue(boost::bind(&IdoMysqlConnection::InternalNewTransaction, this)); - m_QueryQueue.Enqueue(boost::bind(&IdoMysqlConnection::FinishAsyncQueries, this, true)); + m_QueryQueue.Enqueue(boost::bind(&IdoMysqlConnection::InternalNewTransaction, this), PriorityHigh); + m_QueryQueue.Enqueue(boost::bind(&IdoMysqlConnection::FinishAsyncQueries, this, true), PriorityHigh); } void IdoMysqlConnection::InternalNewTransaction(void) @@ -155,13 +155,16 @@ void IdoMysqlConnection::InternalNewTransaction(void) void IdoMysqlConnection::ReconnectTimerHandler(void) { - m_QueryQueue.Enqueue(boost::bind(&IdoMysqlConnection::Reconnect, this)); + m_QueryQueue.Enqueue(boost::bind(&IdoMysqlConnection::Reconnect, this), PriorityLow); } void IdoMysqlConnection::Reconnect(void) { AssertOnWorkQueue(); + if (!IsActive()) + return; + CONTEXT("Reconnecting to MySQL IDO database '" + GetName() + "'"); m_SessionToken = static_cast(Utility::GetTime()); @@ -406,7 +409,7 @@ void IdoMysqlConnection::AsyncQuery(const String& query, const boost::function 500) FinishAsyncQueries(true); else - m_QueryQueue.Enqueue(boost::bind(&IdoMysqlConnection::FinishAsyncQueries, this, false)); + m_QueryQueue.Enqueue(boost::bind(&IdoMysqlConnection::FinishAsyncQueries, this, false), PriorityLow); } void IdoMysqlConnection::FinishAsyncQueries(bool force) @@ -625,7 +628,7 @@ void IdoMysqlConnection::DiscardRows(const IdoMysqlResult& result) void IdoMysqlConnection::ActivateObject(const DbObject::Ptr& dbobj) { - m_QueryQueue.Enqueue(boost::bind(&IdoMysqlConnection::InternalActivateObject, this, dbobj)); + m_QueryQueue.Enqueue(boost::bind(&IdoMysqlConnection::InternalActivateObject, this, dbobj), PriorityLow); } void IdoMysqlConnection::InternalActivateObject(const DbObject::Ptr& dbobj) @@ -659,7 +662,7 @@ void IdoMysqlConnection::InternalActivateObject(const DbObject::Ptr& dbobj) void IdoMysqlConnection::DeactivateObject(const DbObject::Ptr& dbobj) { - m_QueryQueue.Enqueue(boost::bind(&IdoMysqlConnection::InternalDeactivateObject, this, dbobj)); + m_QueryQueue.Enqueue(boost::bind(&IdoMysqlConnection::InternalDeactivateObject, this, dbobj), PriorityLow); } void IdoMysqlConnection::InternalDeactivateObject(const DbObject::Ptr& dbobj) @@ -753,7 +756,7 @@ void IdoMysqlConnection::ExecuteQuery(const DbQuery& query) { ASSERT(query.Category != DbCatInvalid); - m_QueryQueue.Enqueue(boost::bind(&IdoMysqlConnection::InternalExecuteQuery, this, query, (DbQueryType *)NULL), true); + m_QueryQueue.Enqueue(boost::bind(&IdoMysqlConnection::InternalExecuteQuery, this, query, (DbQueryType *)NULL), query.Priority, true); } void IdoMysqlConnection::InternalExecuteQuery(const DbQuery& query, DbQueryType *typeOverride) @@ -781,7 +784,7 @@ void IdoMysqlConnection::InternalExecuteQuery(const DbQuery& query, DbQueryType BOOST_FOREACH(const Dictionary::Pair& kv, query.WhereCriteria) { if (!FieldToEscapedString(kv.first, kv.second, &value)) { - m_QueryQueue.Enqueue(boost::bind(&IdoMysqlConnection::InternalExecuteQuery, this, query, (DbQueryType *)NULL)); + m_QueryQueue.Enqueue(boost::bind(&IdoMysqlConnection::InternalExecuteQuery, this, query, (DbQueryType *)NULL), query.Priority); return; } @@ -902,7 +905,7 @@ void IdoMysqlConnection::FinishExecuteQuery(const DbQuery& query, int type, bool void IdoMysqlConnection::CleanUpExecuteQuery(const String& table, const String& time_column, double max_age) { - m_QueryQueue.Enqueue(boost::bind(&IdoMysqlConnection::InternalCleanUpExecuteQuery, this, table, time_column, max_age), true); + m_QueryQueue.Enqueue(boost::bind(&IdoMysqlConnection::InternalCleanUpExecuteQuery, this, table, time_column, max_age), PriorityLow, true); } void IdoMysqlConnection::InternalCleanUpExecuteQuery(const String& table, const String& time_column, double max_age) diff --git a/lib/db_ido_pgsql/idopgsqlconnection.cpp b/lib/db_ido_pgsql/idopgsqlconnection.cpp index 1286717ef..c9bd87aad 100644 --- a/lib/db_ido_pgsql/idopgsqlconnection.cpp +++ b/lib/db_ido_pgsql/idopgsqlconnection.cpp @@ -97,7 +97,7 @@ void IdoPgsqlConnection::Pause(void) DbConnection::Pause(); - m_QueryQueue.Enqueue(boost::bind(&IdoPgsqlConnection::Disconnect, this)); + m_QueryQueue.Enqueue(boost::bind(&IdoPgsqlConnection::Disconnect, this), PriorityHigh); m_QueryQueue.Join(); } @@ -139,7 +139,7 @@ void IdoPgsqlConnection::TxTimerHandler(void) void IdoPgsqlConnection::NewTransaction(void) { - m_QueryQueue.Enqueue(boost::bind(&IdoPgsqlConnection::InternalNewTransaction, this), true); + m_QueryQueue.Enqueue(boost::bind(&IdoPgsqlConnection::InternalNewTransaction, this), PriorityHigh, true); } void IdoPgsqlConnection::InternalNewTransaction(void) @@ -155,7 +155,7 @@ void IdoPgsqlConnection::InternalNewTransaction(void) void IdoPgsqlConnection::ReconnectTimerHandler(void) { - m_QueryQueue.Enqueue(boost::bind(&IdoPgsqlConnection::Reconnect, this)); + m_QueryQueue.Enqueue(boost::bind(&IdoPgsqlConnection::Reconnect, this), PriorityLow); } void IdoPgsqlConnection::Reconnect(void) @@ -503,7 +503,7 @@ Dictionary::Ptr IdoPgsqlConnection::FetchRow(const IdoPgsqlResult& result, int r void IdoPgsqlConnection::ActivateObject(const DbObject::Ptr& dbobj) { - m_QueryQueue.Enqueue(boost::bind(&IdoPgsqlConnection::InternalActivateObject, this, dbobj)); + m_QueryQueue.Enqueue(boost::bind(&IdoPgsqlConnection::InternalActivateObject, this, dbobj), PriorityLow); } void IdoPgsqlConnection::InternalActivateObject(const DbObject::Ptr& dbobj) @@ -537,7 +537,7 @@ void IdoPgsqlConnection::InternalActivateObject(const DbObject::Ptr& dbobj) void IdoPgsqlConnection::DeactivateObject(const DbObject::Ptr& dbobj) { - m_QueryQueue.Enqueue(boost::bind(&IdoPgsqlConnection::InternalDeactivateObject, this, dbobj)); + m_QueryQueue.Enqueue(boost::bind(&IdoPgsqlConnection::InternalDeactivateObject, this, dbobj), PriorityLow); } void IdoPgsqlConnection::InternalDeactivateObject(const DbObject::Ptr& dbobj) @@ -630,7 +630,7 @@ void IdoPgsqlConnection::ExecuteQuery(const DbQuery& query) { ASSERT(query.Category != DbCatInvalid); - m_QueryQueue.Enqueue(boost::bind(&IdoPgsqlConnection::InternalExecuteQuery, this, query, (DbQueryType *)NULL), true); + m_QueryQueue.Enqueue(boost::bind(&IdoPgsqlConnection::InternalExecuteQuery, this, query, (DbQueryType *)NULL), query.Priority, true); } void IdoPgsqlConnection::InternalExecuteQuery(const DbQuery& query, DbQueryType *typeOverride) @@ -781,7 +781,7 @@ void IdoPgsqlConnection::InternalExecuteQuery(const DbQuery& query, DbQueryType void IdoPgsqlConnection::CleanUpExecuteQuery(const String& table, const String& time_column, double max_age) { - m_QueryQueue.Enqueue(boost::bind(&IdoPgsqlConnection::InternalCleanUpExecuteQuery, this, table, time_column, max_age), true); + m_QueryQueue.Enqueue(boost::bind(&IdoPgsqlConnection::InternalCleanUpExecuteQuery, this, table, time_column, max_age), PriorityLow, true); } void IdoPgsqlConnection::InternalCleanUpExecuteQuery(const String& table, const String& time_column, double max_age) diff --git a/lib/remote/apilistener.cpp b/lib/remote/apilistener.cpp index d4269d8c9..5bccffba4 100644 --- a/lib/remote/apilistener.cpp +++ b/lib/remote/apilistener.cpp @@ -540,7 +540,7 @@ void ApiListener::ApiTimerHandler(void) void ApiListener::RelayMessage(const MessageOrigin::Ptr& origin, const ConfigObject::Ptr& secobj, const Dictionary::Ptr& message, bool log) { - m_RelayQueue.Enqueue(boost::bind(&ApiListener::SyncRelayMessage, this, origin, secobj, message, log), true); + m_RelayQueue.Enqueue(boost::bind(&ApiListener::SyncRelayMessage, this, origin, secobj, message, log), PriorityNormal, true); } void ApiListener::PersistMessage(const Dictionary::Ptr& message, const ConfigObject::Ptr& secobj)