diff --git a/lib/base/workqueue.cpp b/lib/base/workqueue.cpp index cacd17b76..4593b34ba 100644 --- a/lib/base/workqueue.cpp +++ b/lib/base/workqueue.cpp @@ -14,9 +14,9 @@ using namespace icinga; std::atomic WorkQueue::m_NextID(1); boost::thread_specific_ptr l_ThreadWorkQueue; -WorkQueue::WorkQueue(size_t maxItems, int threadCount) +WorkQueue::WorkQueue(size_t maxItems, int threadCount, LogSeverity statsLogLevel) : m_ID(m_NextID++), m_ThreadCount(threadCount), m_MaxItems(maxItems), - m_TaskStats(15 * 60) + m_TaskStats(15 * 60), m_StatsLogLevel(statsLogLevel) { /* Initialize logger. */ m_StatusTimerTimeout = Utility::GetTime(); @@ -216,7 +216,7 @@ void WorkQueue::StatusTimerHandler() /* Log if there are pending items, or 5 minute timeout is reached. */ if (pending > 0 || m_StatusTimerTimeout < now) { - Log(LogInformation, "WorkQueue") + Log(m_StatsLogLevel, "WorkQueue") << "#" << m_ID << " (" << m_Name << ") " << "items: " << pending << ", " << "rate: " << std::setw(2) << GetTaskCount(60) / 60.0 << "/s " diff --git a/lib/base/workqueue.hpp b/lib/base/workqueue.hpp index bc84d9176..1c3df57b2 100644 --- a/lib/base/workqueue.hpp +++ b/lib/base/workqueue.hpp @@ -6,6 +6,7 @@ #include "base/i2-base.hpp" #include "base/timer.hpp" #include "base/ringbuffer.hpp" +#include "base/logger.hpp" #include #include #include @@ -52,7 +53,7 @@ class WorkQueue public: typedef std::function ExceptionCallback; - WorkQueue(size_t maxItems = 0, int threadCount = 1); + WorkQueue(size_t maxItems = 0, int threadCount = 1, LogSeverity statsLogLevel = LogInformation); ~WorkQueue(); void SetName(const String& name); @@ -129,6 +130,7 @@ private: std::vector m_Exceptions; Timer::Ptr m_StatusTimer; double m_StatusTimerTimeout; + LogSeverity m_StatsLogLevel; RingBuffer m_TaskStats; size_t m_PendingTasks{0}; diff --git a/lib/db_ido/dbconnection.cpp b/lib/db_ido/dbconnection.cpp index 0ae9a80ca..6ab9b4e10 100644 --- a/lib/db_ido/dbconnection.cpp +++ b/lib/db_ido/dbconnection.cpp @@ -76,6 +76,11 @@ void DbConnection::Resume() m_CleanUpTimer->SetInterval(60); m_CleanUpTimer->OnTimerExpired.connect(std::bind(&DbConnection::CleanUpHandler, this)); m_CleanUpTimer->Start(); + + m_LogStatsTimer = new Timer(); + m_LogStatsTimer->SetInterval(10); + m_LogStatsTimer->OnTimerExpired.connect([this](const Timer * const&) { LogStatsHandler(); }); + m_LogStatsTimer->Start(); } void DbConnection::Pause() @@ -236,6 +241,39 @@ void DbConnection::CleanUpHandler() } +void DbConnection::LogStatsHandler() +{ + auto pending = m_PendingQueries.load(); + + if (pending == 0u) { + return; + } + + auto now = Utility::GetTime(); + auto output = round(m_OutputQueries.CalculateRate(now, 10)); + + if (pending < output * 2) { + return; + } + + auto input = round(m_InputQueries.CalculateRate(now, 10)); + + String timeInfo = " empty in "; + + { + auto rate = output - input; + + if (rate <= 0) + timeInfo += "infinite time, your task handler isn't able to keep up"; + else + timeInfo += Utility::FormatDuration(pending / rate); + } + + Log(LogInformation, GetReflectionType()->GetName()) + << "Pending queries: " << pending << " (Input: " << input + << "/s; Output: " << output << "/s)" << timeInfo; +} + void DbConnection::CleanUpExecuteQuery(const String&, const String&, double) { /* Default handler does nothing. */ @@ -507,3 +545,15 @@ int DbConnection::GetSessionToken() { return Application::GetStartTime(); } + +void DbConnection::IncreasePendingQueries(int count) +{ + m_PendingQueries.fetch_add(count); + m_InputQueries.InsertValue(Utility::GetTime(), count); +} + +void DbConnection::DecreasePendingQueries(int count) +{ + m_PendingQueries.fetch_sub(count); + m_OutputQueries.InsertValue(Utility::GetTime(), count); +} diff --git a/lib/db_ido/dbconnection.hpp b/lib/db_ido/dbconnection.hpp index 3cb049f64..84e28d4c6 100644 --- a/lib/db_ido/dbconnection.hpp +++ b/lib/db_ido/dbconnection.hpp @@ -92,6 +92,9 @@ protected: static int GetSessionToken(); + void IncreasePendingQueries(int count); + void DecreasePendingQueries(int count); + private: bool m_IDCacheValid{false}; std::map, String> m_ConfigHashes; @@ -101,8 +104,10 @@ private: std::set m_ConfigUpdates; std::set m_StatusUpdates; Timer::Ptr m_CleanUpTimer; + Timer::Ptr m_LogStatsTimer; void CleanUpHandler(); + void LogStatsHandler(); static Timer::Ptr m_ProgramStatusTimer; static boost::once_flag m_OnceFlag; @@ -112,6 +117,10 @@ private: mutable boost::mutex m_StatsMutex; RingBuffer m_QueryStats{15 * 60}; bool m_ActiveChangedHandler{false}; + + RingBuffer m_InputQueries{10}; + RingBuffer m_OutputQueries{10}; + Atomic m_PendingQueries{0}; }; struct database_error : virtual std::exception, virtual boost::exception { }; diff --git a/lib/db_ido_mysql/idomysqlconnection.cpp b/lib/db_ido_mysql/idomysqlconnection.cpp index 5dbaba071..915a7b3b8 100644 --- a/lib/db_ido_mysql/idomysqlconnection.cpp +++ b/lib/db_ido_mysql/idomysqlconnection.cpp @@ -13,6 +13,7 @@ #include "base/configtype.hpp" #include "base/exception.hpp" #include "base/statsfunction.hpp" +#include "base/defer.hpp" #include using namespace icinga; @@ -175,6 +176,8 @@ void IdoMysqlConnection::InternalNewTransaction() if (!GetConnected()) return; + IncreasePendingQueries(2); + AsyncQuery("COMMIT"); AsyncQuery("BEGIN"); } @@ -524,12 +527,28 @@ void IdoMysqlConnection::FinishAsyncQueries() std::vector::size_type offset = 0; + // This will be executed if there is a problem with executing the queries, + // at which point this function throws an exception and the queries should + // not be listed as still pending in the queue. + Defer decreaseQueries ([this, &offset, &queries]() { + auto lostQueries = queries.size() - offset; + + if (lostQueries > 0) { + DecreasePendingQueries(lostQueries); + } + }); + while (offset < queries.size()) { std::ostringstream querybuf; std::vector::size_type count = 0; size_t num_bytes = 0; + Defer decreaseQueries ([this, &offset, &count]() { + offset += count; + DecreasePendingQueries(count); + }); + for (std::vector::size_type i = offset; i < queries.size(); i++) { const IdoAsyncQuery& aq = queries[i]; @@ -608,8 +627,6 @@ void IdoMysqlConnection::FinishAsyncQueries() ); } } - - offset += count; } } @@ -617,6 +634,9 @@ IdoMysqlResult IdoMysqlConnection::Query(const String& query) { AssertOnWorkQueue(); + IncreasePendingQueries(1); + Defer decreaseQueries ([this]() { DecreasePendingQueries(1); }); + /* finish all async queries to maintain the right order for queries */ FinishAsyncQueries(); @@ -770,6 +790,7 @@ void IdoMysqlConnection::InternalActivateObject(const DbObject::Ptr& dbobj) SetObjectID(dbobj, GetLastInsertID()); } else { qbuf << "UPDATE " + GetTablePrefix() + "objects SET is_active = 1 WHERE object_id = " << static_cast(dbref); + IncreasePendingQueries(1); AsyncQuery(qbuf.str()); } } @@ -804,6 +825,7 @@ void IdoMysqlConnection::InternalDeactivateObject(const DbObject::Ptr& dbobj) std::ostringstream qbuf; qbuf << "UPDATE " + GetTablePrefix() + "objects SET is_active = 0 WHERE object_id = " << static_cast(dbref); + IncreasePendingQueries(1); AsyncQuery(qbuf.str()); /* Note that we're _NOT_ clearing the db refs via SetReference/SetConfigUpdate/SetStatusUpdate @@ -893,6 +915,7 @@ void IdoMysqlConnection::ExecuteQuery(const DbQuery& query) << "Scheduling execute query task, type " << query.Type << ", table '" << query.Table << "'."; #endif /* I2_DEBUG */ + IncreasePendingQueries(1); m_QueryQueue.Enqueue(std::bind(&IdoMysqlConnection::InternalExecuteQuery, this, query, -1), query.Priority, true); } @@ -909,6 +932,7 @@ void IdoMysqlConnection::ExecuteMultipleQueries(const std::vector& quer << "Scheduling multiple execute query task, type " << queries[0].Type << ", table '" << queries[0].Table << "'."; #endif /* I2_DEBUG */ + IncreasePendingQueries(queries.size()); m_QueryQueue.Enqueue(std::bind(&IdoMysqlConnection::InternalExecuteMultipleQueries, this, queries), queries[0].Priority, true); } @@ -948,11 +972,16 @@ void IdoMysqlConnection::InternalExecuteMultipleQueries(const std::vectorGetObject()->GetExtension("agent_check").ToBool()) + if (query.Object && query.Object->GetObject()->GetExtension("agent_check").ToBool()) { + DecreasePendingQueries(1); return; + } /* check if there are missing object/insert ids and re-enqueue the query */ if (!CanExecuteQuery(query)) { @@ -1066,6 +1104,7 @@ void IdoMysqlConnection::InternalExecuteQuery(const DbQuery& query, int typeOver if ((type & DbQueryInsert) && (type & DbQueryDelete)) { std::ostringstream qdel; qdel << "DELETE FROM " << GetTablePrefix() << query.Table << where.str(); + IncreasePendingQueries(1); AsyncQuery(qdel.str()); type = DbQueryInsert; @@ -1150,6 +1189,7 @@ void IdoMysqlConnection::FinishExecuteQuery(const DbQuery& query, int type, bool << "Rescheduling DELETE/INSERT query: Upsert UPDATE did not affect rows, type " << type << ", table '" << query.Table << "'."; #endif /* I2_DEBUG */ + IncreasePendingQueries(1); m_QueryQueue.Enqueue(std::bind(&IdoMysqlConnection::InternalExecuteQuery, this, query, DbQueryDelete | DbQueryInsert), query.Priority); return; @@ -1178,6 +1218,7 @@ void IdoMysqlConnection::CleanUpExecuteQuery(const String& table, const String& << time_column << "'. max_age is set to '" << max_age << "'."; #endif /* I2_DEBUG */ + IncreasePendingQueries(1); m_QueryQueue.Enqueue(std::bind(&IdoMysqlConnection::InternalCleanUpExecuteQuery, this, table, time_column, max_age), PriorityLow, true); } @@ -1185,11 +1226,15 @@ void IdoMysqlConnection::InternalCleanUpExecuteQuery(const String& table, const { AssertOnWorkQueue(); - if (IsPaused()) + if (IsPaused()) { + DecreasePendingQueries(1); return; + } - if (!GetConnected()) + if (!GetConnected()) { + DecreasePendingQueries(1); return; + } AsyncQuery("DELETE FROM " + GetTablePrefix() + table + " WHERE instance_id = " + Convert::ToString(static_cast(m_InstanceID)) + " AND " + time_column + diff --git a/lib/db_ido_mysql/idomysqlconnection.hpp b/lib/db_ido_mysql/idomysqlconnection.hpp index 748243a4c..3c1bf916b 100644 --- a/lib/db_ido_mysql/idomysqlconnection.hpp +++ b/lib/db_ido_mysql/idomysqlconnection.hpp @@ -54,7 +54,7 @@ protected: private: DbReference m_InstanceID; - WorkQueue m_QueryQueue{10000000}; + WorkQueue m_QueryQueue{10000000, 1, LogNotice}; Library m_Library; std::unique_ptr m_Mysql; diff --git a/lib/db_ido_pgsql/idopgsqlconnection.cpp b/lib/db_ido_pgsql/idopgsqlconnection.cpp index e458e0d63..a64da281f 100644 --- a/lib/db_ido_pgsql/idopgsqlconnection.cpp +++ b/lib/db_ido_pgsql/idopgsqlconnection.cpp @@ -14,6 +14,7 @@ #include "base/exception.hpp" #include "base/context.hpp" #include "base/statsfunction.hpp" +#include "base/defer.hpp" #include using namespace icinga; @@ -137,6 +138,7 @@ void IdoPgsqlConnection::Disconnect() if (!GetConnected()) return; + IncreasePendingQueries(1); Query("COMMIT"); m_Pgsql->finish(m_Connection); @@ -166,6 +168,7 @@ void IdoPgsqlConnection::InternalNewTransaction() if (!GetConnected()) return; + IncreasePendingQueries(2); Query("COMMIT"); Query("BEGIN"); } @@ -191,6 +194,7 @@ void IdoPgsqlConnection::Reconnect() if (GetConnected()) { /* Check if we're really still connected */ try { + IncreasePendingQueries(1); Query("SELECT 1"); return; } catch (const std::exception&) { @@ -260,10 +264,13 @@ void IdoPgsqlConnection::Reconnect() /* explicitely require legacy mode for string escaping in PostgreSQL >= 9.1 * changing standard_conforming_strings to on by default */ - if (m_Pgsql->serverVersion(m_Connection) >= 90100) + if (m_Pgsql->serverVersion(m_Connection) >= 90100) { + IncreasePendingQueries(1); result = Query("SET standard_conforming_strings TO off"); + } String dbVersionName = "idoutils"; + IncreasePendingQueries(1); result = Query("SELECT version FROM " + GetTablePrefix() + "dbversion WHERE name=E'" + Escape(dbVersionName) + "'"); Dictionary::Ptr row = FetchRow(result, 0); @@ -295,10 +302,12 @@ void IdoPgsqlConnection::Reconnect() String instanceName = GetInstanceName(); + IncreasePendingQueries(1); result = Query("SELECT instance_id FROM " + GetTablePrefix() + "instances WHERE instance_name = E'" + Escape(instanceName) + "'"); row = FetchRow(result, 0); if (!row) { + IncreasePendingQueries(1); Query("INSERT INTO " + GetTablePrefix() + "instances (instance_name, instance_description) VALUES (E'" + Escape(instanceName) + "', E'" + Escape(GetInstanceDescription()) + "')"); m_InstanceID = GetSequenceValue(GetTablePrefix() + "instances", "instance_id"); } else { @@ -310,6 +319,7 @@ void IdoPgsqlConnection::Reconnect() /* we have an endpoint in a cluster setup, so decide if we can proceed here */ if (my_endpoint && GetHAMode() == HARunOnce) { /* get the current endpoint writing to programstatus table */ + IncreasePendingQueries(1); result = Query("SELECT UNIX_TIMESTAMP(status_update_time) AS status_update_time, endpoint_name FROM " + GetTablePrefix() + "programstatus WHERE instance_id = " + Convert::ToString(m_InstanceID)); row = FetchRow(result, 0); @@ -372,12 +382,14 @@ void IdoPgsqlConnection::Reconnect() << "PGSQL IDO instance id: " << static_cast(m_InstanceID) << " (schema version: '" + version + "')" << (!sslMode.IsEmpty() ? ", sslmode='" + sslMode + "'" : ""); + IncreasePendingQueries(1); Query("BEGIN"); /* update programstatus table */ UpdateProgramStatus(); /* record connection */ + IncreasePendingQueries(1); Query("INSERT INTO " + GetTablePrefix() + "conninfo " + "(instance_id, connect_time, last_checkin_time, agent_name, agent_version, connect_type, data_start_time) VALUES (" + Convert::ToString(static_cast(m_InstanceID)) + ", NOW(), NOW(), E'icinga2 db_ido_pgsql', E'" + Escape(Application::GetAppVersion()) @@ -388,6 +400,7 @@ void IdoPgsqlConnection::Reconnect() std::ostringstream q1buf; q1buf << "SELECT object_id, objecttype_id, name1, name2, is_active FROM " + GetTablePrefix() + "objects WHERE instance_id = " << static_cast(m_InstanceID); + IncreasePendingQueries(1); result = Query(q1buf.str()); std::vector activeDbObjs; @@ -442,6 +455,7 @@ void IdoPgsqlConnection::FinishConnect(double startTime) << "Finished reconnecting to '" << GetName() << "' database '" << GetDatabase() << "' in " << std::setw(2) << Utility::GetTime() - startTime << " second(s)."; + IncreasePendingQueries(2); Query("COMMIT"); Query("BEGIN"); } @@ -455,6 +469,7 @@ void IdoPgsqlConnection::ClearTablesBySession() void IdoPgsqlConnection::ClearTableBySession(const String& table) { + IncreasePendingQueries(1); Query("DELETE FROM " + GetTablePrefix() + table + " WHERE instance_id = " + Convert::ToString(static_cast(m_InstanceID)) + " AND session_token <> " + Convert::ToString(GetSessionToken())); @@ -464,6 +479,8 @@ IdoPgsqlResult IdoPgsqlConnection::Query(const String& query) { AssertOnWorkQueue(); + Defer decreaseQueries ([this]() { DecreasePendingQueries(1); }); + Log(LogDebug, "IdoPgsqlConnection") << "Query: " << query; @@ -512,6 +529,7 @@ DbReference IdoPgsqlConnection::GetSequenceValue(const String& table, const Stri { AssertOnWorkQueue(); + IncreasePendingQueries(1); IdoPgsqlResult result = Query("SELECT CURRVAL(pg_get_serial_sequence(E'" + Escape(table) + "', E'" + Escape(column) + "')) AS id"); Dictionary::Ptr row = FetchRow(result, 0); @@ -601,10 +619,12 @@ void IdoPgsqlConnection::InternalActivateObject(const DbObject::Ptr& dbobj) << "E'" << Escape(dbobj->GetName1()) << "', 1)"; } + IncreasePendingQueries(1); Query(qbuf.str()); SetObjectID(dbobj, GetSequenceValue(GetTablePrefix() + "objects", "object_id")); } else { qbuf << "UPDATE " + GetTablePrefix() + "objects SET is_active = 1 WHERE object_id = " << static_cast(dbref); + IncreasePendingQueries(1); Query(qbuf.str()); } } @@ -631,6 +651,7 @@ void IdoPgsqlConnection::InternalDeactivateObject(const DbObject::Ptr& dbobj) std::ostringstream qbuf; qbuf << "UPDATE " + GetTablePrefix() + "objects SET is_active = 0 WHERE object_id = " << static_cast(dbref); + IncreasePendingQueries(1); Query(qbuf.str()); /* Note that we're _NOT_ clearing the db refs via SetReference/SetConfigUpdate/SetStatusUpdate @@ -715,6 +736,7 @@ void IdoPgsqlConnection::ExecuteQuery(const DbQuery& query) ASSERT(query.Category != DbCatInvalid); + IncreasePendingQueries(1); m_QueryQueue.Enqueue(std::bind(&IdoPgsqlConnection::InternalExecuteQuery, this, query, -1), query.Priority, true); } @@ -726,6 +748,7 @@ void IdoPgsqlConnection::ExecuteMultipleQueries(const std::vector& quer if (queries.empty()) return; + IncreasePendingQueries(queries.size()); m_QueryQueue.Enqueue(std::bind(&IdoPgsqlConnection::InternalExecuteMultipleQueries, this, queries), queries[0].Priority, true); } @@ -765,11 +788,15 @@ void IdoPgsqlConnection::InternalExecuteMultipleQueries(const std::vectorGetObject()->GetExtension("agent_check").ToBool()) + if (query.Object && query.Object->GetObject()->GetExtension("agent_check").ToBool()) { + DecreasePendingQueries(1); return; + } /* check if there are missing object/insert ids and re-enqueue the query */ if (!CanExecuteQuery(query)) { @@ -862,6 +898,7 @@ void IdoPgsqlConnection::InternalExecuteQuery(const DbQuery& query, int typeOver if ((type & DbQueryInsert) && (type & DbQueryDelete)) { std::ostringstream qdel; qdel << "DELETE FROM " << GetTablePrefix() << query.Table << where.str(); + IncreasePendingQueries(1); Query(qdel.str()); type = DbQueryInsert; @@ -929,6 +966,7 @@ void IdoPgsqlConnection::InternalExecuteQuery(const DbQuery& query, int typeOver Query(qbuf.str()); if (upsert && GetAffectedRows() == 0) { + IncreasePendingQueries(1); InternalExecuteQuery(query, DbQueryDelete | DbQueryInsert); return; @@ -959,6 +997,7 @@ void IdoPgsqlConnection::CleanUpExecuteQuery(const String& table, const String& if (IsPaused()) return; + IncreasePendingQueries(1); m_QueryQueue.Enqueue(std::bind(&IdoPgsqlConnection::InternalCleanUpExecuteQuery, this, table, time_column, max_age), PriorityLow, true); } @@ -966,8 +1005,10 @@ void IdoPgsqlConnection::InternalCleanUpExecuteQuery(const String& table, const { AssertOnWorkQueue(); - if (!GetConnected()) + if (!GetConnected()) { + DecreasePendingQueries(1); return; + } Query("DELETE FROM " + GetTablePrefix() + table + " WHERE instance_id = " + Convert::ToString(static_cast(m_InstanceID)) + " AND " + time_column + @@ -977,6 +1018,7 @@ void IdoPgsqlConnection::InternalCleanUpExecuteQuery(const String& table, const void IdoPgsqlConnection::FillIDCache(const DbType::Ptr& type) { String query = "SELECT " + type->GetIDColumn() + " AS object_id, " + type->GetTable() + "_id, config_hash FROM " + GetTablePrefix() + type->GetTable() + "s"; + IncreasePendingQueries(1); IdoPgsqlResult result = Query(query); Dictionary::Ptr row; diff --git a/lib/db_ido_pgsql/idopgsqlconnection.hpp b/lib/db_ido_pgsql/idopgsqlconnection.hpp index 83e4d3f9f..e355116d4 100644 --- a/lib/db_ido_pgsql/idopgsqlconnection.hpp +++ b/lib/db_ido_pgsql/idopgsqlconnection.hpp @@ -48,7 +48,7 @@ protected: private: DbReference m_InstanceID; - WorkQueue m_QueryQueue{1000000}; + WorkQueue m_QueryQueue{1000000, 1, LogNotice}; Library m_Library; std::unique_ptr m_Pgsql;