From 6d53bd1c8f17f636103128c40ca501d16ccd144e Mon Sep 17 00:00:00 2001 From: Gunnar Beutner Date: Thu, 28 Nov 2013 10:36:43 +0100 Subject: [PATCH 1/4] Make sure IDO reconnect attempts don't recurse. Refs #5235 --- .../db_ido_mysql/idomysqlconnection.cpp | 6 +++--- .../db_ido_pgsql/idopgsqlconnection.cpp | 6 +++--- lib/base/workqueue.cpp | 20 +++++++++++++------ lib/base/workqueue.h | 16 +++++++++++---- 4 files changed, 32 insertions(+), 16 deletions(-) diff --git a/components/db_ido_mysql/idomysqlconnection.cpp b/components/db_ido_mysql/idomysqlconnection.cpp index c99b3f49c..d824d033e 100644 --- a/components/db_ido_mysql/idomysqlconnection.cpp +++ b/components/db_ido_mysql/idomysqlconnection.cpp @@ -99,7 +99,7 @@ void IdoMysqlConnection::Disconnect(void) void IdoMysqlConnection::TxTimerHandler(void) { - m_QueryQueue.Enqueue(boost::bind(&IdoMysqlConnection::NewTransaction, this)); + m_QueryQueue.Enqueue(boost::bind(&IdoMysqlConnection::NewTransaction, this), true); } void IdoMysqlConnection::NewTransaction(void) @@ -465,7 +465,7 @@ void IdoMysqlConnection::ExecuteQuery(const DbQuery& query) { ASSERT(query.Category != DbCatInvalid); - m_QueryQueue.Enqueue(boost::bind(&IdoMysqlConnection::InternalExecuteQuery, this, query)); + m_QueryQueue.Enqueue(boost::bind(&IdoMysqlConnection::InternalExecuteQuery, this, query), true); } void IdoMysqlConnection::InternalExecuteQuery(const DbQuery& query) @@ -595,7 +595,7 @@ void IdoMysqlConnection::InternalExecuteQuery(const DbQuery& query) void IdoMysqlConnection::CleanUpExecuteQuery(const String& table, const String& time_column, double max_age) { - m_QueryQueue.Enqueue(boost::bind(&IdoMysqlConnection::InternalCleanUpExecuteQuery, this, table, time_column, max_age)); + m_QueryQueue.Enqueue(boost::bind(&IdoMysqlConnection::InternalCleanUpExecuteQuery, this, table, time_column, max_age), true); } void IdoMysqlConnection::InternalCleanUpExecuteQuery(const String& table, const String& time_column, double max_age) diff --git a/components/db_ido_pgsql/idopgsqlconnection.cpp b/components/db_ido_pgsql/idopgsqlconnection.cpp index 8bfaff2e1..c512af847 100644 --- a/components/db_ido_pgsql/idopgsqlconnection.cpp +++ b/components/db_ido_pgsql/idopgsqlconnection.cpp @@ -99,7 +99,7 @@ void IdoPgsqlConnection::Disconnect(void) void IdoPgsqlConnection::TxTimerHandler(void) { - m_QueryQueue.Enqueue(boost::bind(&IdoPgsqlConnection::NewTransaction, this)); + m_QueryQueue.Enqueue(boost::bind(&IdoPgsqlConnection::NewTransaction, this), true); } void IdoPgsqlConnection::NewTransaction(void) @@ -481,7 +481,7 @@ void IdoPgsqlConnection::ExecuteQuery(const DbQuery& query) { ASSERT(query.Category != DbCatInvalid); - m_QueryQueue.Enqueue(boost::bind(&IdoPgsqlConnection::InternalExecuteQuery, this, query)); + m_QueryQueue.Enqueue(boost::bind(&IdoPgsqlConnection::InternalExecuteQuery, this, query), true); } void IdoPgsqlConnection::InternalExecuteQuery(const DbQuery& query) @@ -616,7 +616,7 @@ void IdoPgsqlConnection::InternalExecuteQuery(const DbQuery& query) void IdoPgsqlConnection::CleanUpExecuteQuery(const String& table, const String& time_column, double max_age) { - m_QueryQueue.Enqueue(boost::bind(&IdoPgsqlConnection::InternalCleanUpExecuteQuery, this, table, time_column, max_age)); + m_QueryQueue.Enqueue(boost::bind(&IdoPgsqlConnection::InternalCleanUpExecuteQuery, this, table, time_column, max_age), true); } void IdoPgsqlConnection::InternalCleanUpExecuteQuery(const String& table, const String& time_column, double max_age) diff --git a/lib/base/workqueue.cpp b/lib/base/workqueue.cpp index 218c91b36..638264f5f 100644 --- a/lib/base/workqueue.cpp +++ b/lib/base/workqueue.cpp @@ -45,7 +45,7 @@ WorkQueue::~WorkQueue(void) * Enqueues a work item. Work items are guaranteed to be executed in the order * they were enqueued in. */ -void WorkQueue::Enqueue(const WorkCallback& item) +void WorkQueue::Enqueue(const WorkCallback& callback, bool allowInterleaved) { boost::mutex::scoped_lock lock(m_Mutex); @@ -58,10 +58,14 @@ void WorkQueue::Enqueue(const WorkCallback& item) m_CV.wait(lock); } + WorkItem item; + item.Callback = callback; + item.AllowInterleaved = allowInterleaved; + m_Items.push_back(item); if (wq_thread) - ProcessItems(lock); + ProcessItems(lock, true); else m_CV.notify_all(); } @@ -93,16 +97,20 @@ void WorkQueue::DefaultExceptionCallback(boost::exception_ptr exp) throw; } -void WorkQueue::ProcessItems(boost::mutex::scoped_lock& lock) +void WorkQueue::ProcessItems(boost::mutex::scoped_lock& lock, bool interleaved) { while (!m_Items.empty()) { try { - WorkCallback wi = m_Items.front(); + WorkItem wi = m_Items.front(); + + if (interleaved && !wi.AllowInterleaved) + return; + m_Items.pop_front(); m_CV.notify_all(); lock.unlock(); - wi(); + wi.Callback(); } catch (const std::exception& ex) { lock.lock(); @@ -132,7 +140,7 @@ void WorkQueue::WorkerThreadProc(void) if (m_Joined) break; - ProcessItems(lock); + ProcessItems(lock, false); } m_Stopped = true; diff --git a/lib/base/workqueue.h b/lib/base/workqueue.h index 71445b8cb..cccf625c4 100644 --- a/lib/base/workqueue.h +++ b/lib/base/workqueue.h @@ -31,6 +31,15 @@ namespace icinga { +typedef boost::function WorkCallback; + +struct WorkItem +{ + + WorkCallback Callback; + bool AllowInterleaved; +}; + /** * A workqueue. * @@ -39,13 +48,12 @@ namespace icinga class I2_BASE_API WorkQueue { public: - typedef boost::function WorkCallback; typedef boost::function ExceptionCallback; WorkQueue(size_t maxItems = 25000); ~WorkQueue(void); - void Enqueue(const WorkCallback& item); + void Enqueue(const WorkCallback& callback, bool allowInterleaved = false); void Join(void); boost::thread::id GetThreadId(void) const; @@ -62,10 +70,10 @@ private: size_t m_MaxItems; bool m_Joined; bool m_Stopped; - std::deque m_Items; + std::deque m_Items; ExceptionCallback m_ExceptionCallback; - void ProcessItems(boost::mutex::scoped_lock& lock); + void ProcessItems(boost::mutex::scoped_lock& lock, bool interleaved); void WorkerThreadProc(void); static void DefaultExceptionCallback(boost::exception_ptr exp); From 5daedd2dea0fd3a069e890048c80302b3d7012da Mon Sep 17 00:00:00 2001 From: Gunnar Beutner Date: Thu, 28 Nov 2013 10:37:22 +0100 Subject: [PATCH 2/4] Remove unnecessary host config update. Refs #5235 --- lib/db_ido/servicedbobject.cpp | 13 ------------- 1 file changed, 13 deletions(-) diff --git a/lib/db_ido/servicedbobject.cpp b/lib/db_ido/servicedbobject.cpp index 55be48822..497594725 100644 --- a/lib/db_ido/servicedbobject.cpp +++ b/lib/db_ido/servicedbobject.cpp @@ -303,19 +303,6 @@ void ServiceDbObject::OnConfigUpdate(void) /* update comments and downtimes on config change */ AddComments(service); AddDowntimes(service); - - /* service host config update */ - Host::Ptr host = service->GetHost(); - - if (host->GetCheckService() != service) - return; - - DbObject::Ptr dbobj = GetOrCreateByObject(host); - - if (!dbobj) - return; - - dbobj->SendConfigUpdate(); } void ServiceDbObject::OnStatusUpdate(void) From 85fec966b818c1f3b41c595138fc0710f84d73d3 Mon Sep 17 00:00:00 2001 From: Gunnar Beutner Date: Thu, 28 Nov 2013 12:12:10 +0100 Subject: [PATCH 3/4] Improve IDO query performance. Refs #5235 --- components/db_ido_mysql/idomysqlconnection.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/components/db_ido_mysql/idomysqlconnection.cpp b/components/db_ido_mysql/idomysqlconnection.cpp index d824d033e..ba89dad7e 100644 --- a/components/db_ido_mysql/idomysqlconnection.cpp +++ b/components/db_ido_mysql/idomysqlconnection.cpp @@ -280,7 +280,7 @@ Array::Ptr IdoMysqlConnection::Query(const String& query) << errinfo_database_query(query) ); - MYSQL_RES *result = mysql_store_result(&m_Connection); + MYSQL_RES *result = mysql_use_result(&m_Connection); if (!result) { if (mysql_field_count(&m_Connection) > 0) From f9c53ad295086f494f7a924f0535477147c44b7d Mon Sep 17 00:00:00 2001 From: Gunnar Beutner Date: Thu, 28 Nov 2013 12:12:24 +0100 Subject: [PATCH 4/4] Implement workqueue statistics. Refs #5235 --- lib/base/workqueue.cpp | 11 ++++++++++- lib/base/workqueue.h | 1 + 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/lib/base/workqueue.cpp b/lib/base/workqueue.cpp index 638264f5f..67e51c397 100644 --- a/lib/base/workqueue.cpp +++ b/lib/base/workqueue.cpp @@ -21,6 +21,7 @@ #include "base/utility.h" #include "base/debug.h" #include "base/logger_fwd.h" +#include "base/convert.h" #include using namespace icinga; @@ -29,7 +30,8 @@ int WorkQueue::m_NextID = 1; WorkQueue::WorkQueue(size_t maxItems) : m_ID(m_NextID++), m_MaxItems(maxItems), m_Joined(false), - m_Stopped(false), m_ExceptionCallback(WorkQueue::DefaultExceptionCallback) + m_Stopped(false), m_ExceptionCallback(WorkQueue::DefaultExceptionCallback), + m_LastStatus(0) { m_Thread = boost::thread(boost::bind(&WorkQueue::WorkerThreadProc, this)); } @@ -109,6 +111,13 @@ void WorkQueue::ProcessItems(boost::mutex::scoped_lock& lock, bool interleaved) m_Items.pop_front(); m_CV.notify_all(); + double now = Utility::GetTime(); + + if (m_LastStatus + 10 < now) { + Log(LogInformation, "base", "WQ items: " + Convert::ToString(m_Items.size())); + m_LastStatus = now; + } + lock.unlock(); wi.Callback(); } catch (const std::exception& ex) { diff --git a/lib/base/workqueue.h b/lib/base/workqueue.h index cccf625c4..6a35ba728 100644 --- a/lib/base/workqueue.h +++ b/lib/base/workqueue.h @@ -72,6 +72,7 @@ private: bool m_Stopped; std::deque m_Items; ExceptionCallback m_ExceptionCallback; + double m_LastStatus; void ProcessItems(boost::mutex::scoped_lock& lock, bool interleaved); void WorkerThreadProc(void);