diff --git a/lib/db_ido/dbconnection.cpp b/lib/db_ido/dbconnection.cpp index 0ae9a80ca..40e6b618f 100644 --- a/lib/db_ido/dbconnection.cpp +++ b/lib/db_ido/dbconnection.cpp @@ -76,6 +76,11 @@ void DbConnection::Resume() m_CleanUpTimer->SetInterval(60); m_CleanUpTimer->OnTimerExpired.connect(std::bind(&DbConnection::CleanUpHandler, this)); m_CleanUpTimer->Start(); + + m_LogStatsTimer = new Timer(); + m_LogStatsTimer->SetInterval(10); + m_LogStatsTimer->OnTimerExpired.connect([this](const Timer * const&) { LogStatsHandler(); }); + m_LogStatsTimer->Start(); } void DbConnection::Pause() @@ -236,6 +241,36 @@ void DbConnection::CleanUpHandler() } +void DbConnection::LogStatsHandler() +{ + auto pending = m_PendingQueries.load(); + + if (pending == 0u) { + return; + } + + auto now = Utility::GetTime(); + auto input = m_InputQueries.CalculateRate(now, 60); + auto output = m_OutputQueries.CalculateRate(now, 60); + String timeInfo; + + { + auto rate = output - input; + + if (pending < rate * 5) { + timeInfo = " empty in "; + if (rate <= 0) + timeInfo += "infinite time, your task handler isn't able to keep up"; + else + timeInfo += Utility::FormatDuration(pending / rate); + } + } + + Log(LogInformation, GetReflectionType()->GetName()) + << "Pending queries: " << pending << " (Input: " << input + << "/s; Output: " << output << "/s)" << timeInfo; +} + void DbConnection::CleanUpExecuteQuery(const String&, const String&, double) { /* Default handler does nothing. */ @@ -507,3 +542,18 @@ int DbConnection::GetSessionToken() { return Application::GetStartTime(); } + +void DbConnection::IncreasePendingQueries(int count) +{ + m_PendingQueries.fetch_add(count); + m_InputQueries.InsertValue(Utility::GetTime(), count); +} + +void DbConnection::DecreasePendingQueries(int count, bool increaseOutputRate) +{ + m_PendingQueries.fetch_sub(count); + + if (increaseOutputRate) { + m_OutputQueries.InsertValue(Utility::GetTime(), count); + } +} diff --git a/lib/db_ido/dbconnection.hpp b/lib/db_ido/dbconnection.hpp index 3cb049f64..9f1c74f6f 100644 --- a/lib/db_ido/dbconnection.hpp +++ b/lib/db_ido/dbconnection.hpp @@ -92,6 +92,9 @@ protected: static int GetSessionToken(); + void IncreasePendingQueries(int count); + void DecreasePendingQueries(int count, bool increaseOutputRate = true); + private: bool m_IDCacheValid{false}; std::map, String> m_ConfigHashes; @@ -101,8 +104,10 @@ private: std::set m_ConfigUpdates; std::set m_StatusUpdates; Timer::Ptr m_CleanUpTimer; + Timer::Ptr m_LogStatsTimer; void CleanUpHandler(); + void LogStatsHandler(); static Timer::Ptr m_ProgramStatusTimer; static boost::once_flag m_OnceFlag; @@ -112,6 +117,10 @@ private: mutable boost::mutex m_StatsMutex; RingBuffer m_QueryStats{15 * 60}; bool m_ActiveChangedHandler{false}; + + RingBuffer m_InputQueries{60}; + RingBuffer m_OutputQueries{60}; + Atomic m_PendingQueries{0}; }; struct database_error : virtual std::exception, virtual boost::exception { }; diff --git a/lib/db_ido_mysql/idomysqlconnection.cpp b/lib/db_ido_mysql/idomysqlconnection.cpp index 5dbaba071..4c449a19a 100644 --- a/lib/db_ido_mysql/idomysqlconnection.cpp +++ b/lib/db_ido_mysql/idomysqlconnection.cpp @@ -13,6 +13,7 @@ #include "base/configtype.hpp" #include "base/exception.hpp" #include "base/statsfunction.hpp" +#include "base/defer.hpp" #include using namespace icinga; @@ -175,6 +176,8 @@ void IdoMysqlConnection::InternalNewTransaction() if (!GetConnected()) return; + IncreasePendingQueries(2); + AsyncQuery("COMMIT"); AsyncQuery("BEGIN"); } @@ -524,12 +527,28 @@ void IdoMysqlConnection::FinishAsyncQueries() std::vector::size_type offset = 0; + // This will be executed if there is a problem with executing the queries, + // at which point this function throws an exception and the queries should + // not be listed as still pending in the queue. + Defer decreaseQueries ([this, &offset, &queries]() { + auto lostQueries = queries.size() - offset; + + if (lostQueries > 0) { + DecreasePendingQueries(lostQueries, false); + } + }); + while (offset < queries.size()) { std::ostringstream querybuf; std::vector::size_type count = 0; size_t num_bytes = 0; + Defer decreaseQueries ([this, &offset, &count]() { + offset += count; + DecreasePendingQueries(count); + }); + for (std::vector::size_type i = offset; i < queries.size(); i++) { const IdoAsyncQuery& aq = queries[i]; @@ -617,6 +636,9 @@ IdoMysqlResult IdoMysqlConnection::Query(const String& query) { AssertOnWorkQueue(); + IncreasePendingQueries(1); + Defer decreaseQueries ([this]() { DecreasePendingQueries(1); }); + /* finish all async queries to maintain the right order for queries */ FinishAsyncQueries(); @@ -770,6 +792,7 @@ void IdoMysqlConnection::InternalActivateObject(const DbObject::Ptr& dbobj) SetObjectID(dbobj, GetLastInsertID()); } else { qbuf << "UPDATE " + GetTablePrefix() + "objects SET is_active = 1 WHERE object_id = " << static_cast(dbref); + IncreasePendingQueries(1); AsyncQuery(qbuf.str()); } } @@ -804,6 +827,7 @@ void IdoMysqlConnection::InternalDeactivateObject(const DbObject::Ptr& dbobj) std::ostringstream qbuf; qbuf << "UPDATE " + GetTablePrefix() + "objects SET is_active = 0 WHERE object_id = " << static_cast(dbref); + IncreasePendingQueries(1); AsyncQuery(qbuf.str()); /* Note that we're _NOT_ clearing the db refs via SetReference/SetConfigUpdate/SetStatusUpdate @@ -893,6 +917,7 @@ void IdoMysqlConnection::ExecuteQuery(const DbQuery& query) << "Scheduling execute query task, type " << query.Type << ", table '" << query.Table << "'."; #endif /* I2_DEBUG */ + IncreasePendingQueries(1); m_QueryQueue.Enqueue(std::bind(&IdoMysqlConnection::InternalExecuteQuery, this, query, -1), query.Priority, true); } @@ -909,6 +934,7 @@ void IdoMysqlConnection::ExecuteMultipleQueries(const std::vector& quer << "Scheduling multiple execute query task, type " << queries[0].Type << ", table '" << queries[0].Table << "'."; #endif /* I2_DEBUG */ + IncreasePendingQueries(queries.size()); m_QueryQueue.Enqueue(std::bind(&IdoMysqlConnection::InternalExecuteMultipleQueries, this, queries), queries[0].Priority, true); } @@ -1066,6 +1092,7 @@ void IdoMysqlConnection::InternalExecuteQuery(const DbQuery& query, int typeOver if ((type & DbQueryInsert) && (type & DbQueryDelete)) { std::ostringstream qdel; qdel << "DELETE FROM " << GetTablePrefix() << query.Table << where.str(); + IncreasePendingQueries(1); AsyncQuery(qdel.str()); type = DbQueryInsert; @@ -1150,6 +1177,7 @@ void IdoMysqlConnection::FinishExecuteQuery(const DbQuery& query, int type, bool << "Rescheduling DELETE/INSERT query: Upsert UPDATE did not affect rows, type " << type << ", table '" << query.Table << "'."; #endif /* I2_DEBUG */ + IncreasePendingQueries(1); m_QueryQueue.Enqueue(std::bind(&IdoMysqlConnection::InternalExecuteQuery, this, query, DbQueryDelete | DbQueryInsert), query.Priority); return; @@ -1178,6 +1206,7 @@ void IdoMysqlConnection::CleanUpExecuteQuery(const String& table, const String& << time_column << "'. max_age is set to '" << max_age << "'."; #endif /* I2_DEBUG */ + IncreasePendingQueries(1); m_QueryQueue.Enqueue(std::bind(&IdoMysqlConnection::InternalCleanUpExecuteQuery, this, table, time_column, max_age), PriorityLow, true); }