diff --git a/lib/icingadb/icingadb-objects.cpp b/lib/icingadb/icingadb-objects.cpp index 9d1405ff8..f9c2623c2 100644 --- a/lib/icingadb/icingadb-objects.cpp +++ b/lib/icingadb/icingadb-objects.cpp @@ -130,7 +130,7 @@ void IcingaDB::UpdateAllConfigObjects() double startTime = Utility::GetTime(); // Use a Workqueue to pack objects in parallel - WorkQueue upq(25000, Configuration::Concurrency); + WorkQueue upq(25000, Configuration::Concurrency, LogNotice); upq.SetName("IcingaDB:ConfigDump"); std::vector types = GetTypes(); @@ -174,7 +174,7 @@ void IcingaDB::UpdateAllConfigObjects() std::vector keys = GetTypeOverwriteKeys(lcType); DeleteKeys(rcon, keys, Prio::Config); - WorkQueue upqObjectType(25000, Configuration::Concurrency); + WorkQueue upqObjectType(25000, Configuration::Concurrency, LogNotice); upqObjectType.SetName("IcingaDB:ConfigDump:" + lcType); std::map redisCheckSums; diff --git a/lib/icingadb/icingadb.cpp b/lib/icingadb/icingadb.cpp index 587544207..7b05364d1 100644 --- a/lib/icingadb/icingadb.cpp +++ b/lib/icingadb/icingadb.cpp @@ -63,7 +63,7 @@ void IcingaDB::Start(bool runtimeCreated) if (!ctype) continue; - RedisConnection::Ptr rCon (new RedisConnection(GetHost(), GetPort(), GetPath(), GetPassword(), GetDbIndex())); + RedisConnection::Ptr rCon (new RedisConnection(GetHost(), GetPort(), GetPath(), GetPassword(), GetDbIndex(), m_Rcon)); rCon->Start(); m_Rcons[ctype] = std::move(rCon); } diff --git a/lib/icingadb/icingadb.hpp b/lib/icingadb/icingadb.hpp index 0ccbdd5b1..964b837b9 100644 --- a/lib/icingadb/icingadb.hpp +++ b/lib/icingadb/icingadb.hpp @@ -154,7 +154,7 @@ private: static std::vector GetTypes(); Timer::Ptr m_StatsTimer; - WorkQueue m_WorkQueue; + WorkQueue m_WorkQueue{0, 1, LogNotice}; String m_PrefixConfigObject; String m_PrefixConfigCheckSum; diff --git a/lib/icingadb/redisconnection.cpp b/lib/icingadb/redisconnection.cpp index a605e7967..7d2f564fd 100644 --- a/lib/icingadb/redisconnection.cpp +++ b/lib/icingadb/redisconnection.cpp @@ -9,6 +9,7 @@ #include "base/objectlock.hpp" #include "base/string.hpp" #include "base/tcpsocket.hpp" +#include "base/utility.hpp" #include #include #include @@ -23,14 +24,17 @@ using namespace icinga; namespace asio = boost::asio; -RedisConnection::RedisConnection(const String& host, const int port, const String& path, const String& password, const int db) : - RedisConnection(IoEngine::Get().GetIoContext(), host, port, path, password, db) +RedisConnection::RedisConnection(const String& host, const int port, const String& path, + const String& password, const int db, const RedisConnection::Ptr& parent) : + RedisConnection(IoEngine::Get().GetIoContext(), host, port, path, password, db, parent) { } -RedisConnection::RedisConnection(boost::asio::io_context& io, String host, int port, String path, String password, int db) +RedisConnection::RedisConnection(boost::asio::io_context& io, String host, int port, String path, + String password, int db, const RedisConnection::Ptr& parent) : m_Host(std::move(host)), m_Port(port), m_Path(std::move(path)), m_Password(std::move(password)), m_DbIndex(db), - m_Connecting(false), m_Connected(false), m_Started(false), m_Strand(io), m_QueuedWrites(io), m_QueuedReads(io) + m_Connecting(false), m_Connected(false), m_Started(false), m_Strand(io), + m_QueuedWrites(io), m_QueuedReads(io), m_LogStatsTimer(io), m_Parent(parent) { } @@ -41,6 +45,10 @@ void RedisConnection::Start() IoEngine::SpawnCoroutine(m_Strand, [this, keepAlive](asio::yield_context yc) { ReadLoop(yc); }); IoEngine::SpawnCoroutine(m_Strand, [this, keepAlive](asio::yield_context yc) { WriteLoop(yc); }); + + if (!m_Parent) { + IoEngine::SpawnCoroutine(m_Strand, [this, keepAlive](asio::yield_context yc) { LogStats(yc); }); + } } if (!m_Connecting.exchange(true)) { @@ -97,6 +105,7 @@ void RedisConnection::FireAndForgetQuery(RedisConnection::Query query, RedisConn asio::post(m_Strand, [this, item, priority]() { m_Queues.Writes[priority].emplace(WriteQueueItem{item, nullptr, nullptr, nullptr}); m_QueuedWrites.Set(); + IncreasePendingQueries(1); }); } @@ -118,6 +127,7 @@ void RedisConnection::FireAndForgetQueries(RedisConnection::Queries queries, Red asio::post(m_Strand, [this, item, priority]() { m_Queues.Writes[priority].emplace(WriteQueueItem{nullptr, item, nullptr, nullptr}); m_QueuedWrites.Set(); + IncreasePendingQueries(item->size()); }); } @@ -143,6 +153,7 @@ RedisConnection::Reply RedisConnection::GetResultOfQuery(RedisConnection::Query asio::post(m_Strand, [this, item, priority]() { m_Queues.Writes[priority].emplace(WriteQueueItem{nullptr, nullptr, item, nullptr}); m_QueuedWrites.Set(); + IncreasePendingQueries(1); }); item = nullptr; @@ -172,6 +183,7 @@ RedisConnection::Replies RedisConnection::GetResultsOfQueries(RedisConnection::Q asio::post(m_Strand, [this, item, priority]() { m_Queues.Writes[priority].emplace(WriteQueueItem{nullptr, nullptr, nullptr, item}); m_QueuedWrites.Set(); + IncreasePendingQueries(item->first.size()); }); item = nullptr; @@ -379,6 +391,41 @@ void RedisConnection::WriteLoop(asio::yield_context& yc) } } +/** + * Periodically log current query performance + */ +void RedisConnection::LogStats(asio::yield_context& yc) +{ + double lastMessage = 0; + + m_LogStatsTimer.expires_from_now(boost::posix_time::seconds(10)); + + for (;;) { + m_LogStatsTimer.async_wait(yc); + m_LogStatsTimer.expires_from_now(boost::posix_time::seconds(10)); + + if (!IsConnected()) + continue; + + auto now (Utility::GetTime()); + bool timeoutReached = now - lastMessage >= 5 * 60; + + if (m_PendingQueries < 1 && !timeoutReached) + continue; + + auto output (round(m_OutputQueries.CalculateRate(now, 10))); + + if (m_PendingQueries < output * 5 && !timeoutReached) + continue; + + Log(LogInformation, "IcingaDB") + << "Pending queries: " << m_PendingQueries << " (Input: " + << round(m_InputQueries.CalculateRate(now, 10)) << "/s; Output: " << output << "/s)"; + + lastMessage = now; + } +} + /** * Send next and schedule receiving the response * @@ -388,6 +435,7 @@ void RedisConnection::WriteItem(boost::asio::yield_context& yc, RedisConnection: { if (next.FireAndForgetQuery) { auto& item (*next.FireAndForgetQuery); + DecreasePendingQueries(1); try { WriteOne(item, yc); @@ -420,6 +468,8 @@ void RedisConnection::WriteItem(boost::asio::yield_context& yc, RedisConnection: auto& item (*next.FireAndForgetQueries); size_t i = 0; + DecreasePendingQueries(item.size()); + try { for (auto& query : item) { WriteOne(query, yc); @@ -452,6 +502,7 @@ void RedisConnection::WriteItem(boost::asio::yield_context& yc, RedisConnection: if (next.GetResultOfQuery) { auto& item (*next.GetResultOfQuery); + DecreasePendingQueries(1); try { WriteOne(item.first, yc); @@ -476,6 +527,7 @@ void RedisConnection::WriteItem(boost::asio::yield_context& yc, RedisConnection: if (next.GetResultsOfQueries) { auto& item (*next.GetResultsOfQueries); + DecreasePendingQueries(item.first.size()); try { for (auto& query : item.first) { @@ -538,3 +590,31 @@ void RedisConnection::WriteOne(RedisConnection::Query& query, asio::yield_contex void RedisConnection::SetConnectedCallback(std::function callback) { m_ConnectedCallback = std::move(callback); } + +void RedisConnection::IncreasePendingQueries(int count) +{ + if (m_Parent) { + auto parent (m_Parent); + + asio::post(parent->m_Strand, [parent, count]() { + parent->IncreasePendingQueries(count); + }); + } else { + m_PendingQueries += count; + m_InputQueries.InsertValue(Utility::GetTime(), count); + } +} + +void RedisConnection::DecreasePendingQueries(int count) +{ + if (m_Parent) { + auto parent (m_Parent); + + asio::post(parent->m_Strand, [parent, count]() { + parent->DecreasePendingQueries(count); + }); + } else { + m_PendingQueries -= count; + m_OutputQueries.InsertValue(Utility::GetTime(), count); + } +} diff --git a/lib/icingadb/redisconnection.hpp b/lib/icingadb/redisconnection.hpp index bf580405b..9c7aaa880 100644 --- a/lib/icingadb/redisconnection.hpp +++ b/lib/icingadb/redisconnection.hpp @@ -7,11 +7,13 @@ #include "base/atomic.hpp" #include "base/io-engine.hpp" #include "base/object.hpp" +#include "base/ringbuffer.hpp" #include "base/shared.hpp" #include "base/string.hpp" #include "base/value.hpp" #include #include +#include #include #include #include @@ -68,7 +70,7 @@ namespace icinga }; RedisConnection(const String& host, const int port, const String& path, - const String& password = "", const int db = 0); + const String& password = "", const int db = 0, const Ptr& parent = nullptr); void Start(); @@ -141,11 +143,13 @@ namespace icinga template static void WriteRESP(AsyncWriteStream& stream, const Query& query, boost::asio::yield_context& yc); - RedisConnection(boost::asio::io_context& io, String host, int port, String path, String password, int db); + RedisConnection(boost::asio::io_context& io, String host, int port, String path, + String password, int db, const Ptr& parent); void Connect(boost::asio::yield_context& yc); void ReadLoop(boost::asio::yield_context& yc); void WriteLoop(boost::asio::yield_context& yc); + void LogStats(boost::asio::yield_context& yc); void WriteItem(boost::asio::yield_context& yc, WriteQueueItem item); Reply ReadOne(boost::asio::yield_context& yc); void WriteOne(Query& query, boost::asio::yield_context& yc); @@ -156,6 +160,9 @@ namespace icinga template void WriteOne(StreamPtr& stream, Query& query, boost::asio::yield_context& yc); + void IncreasePendingQueries(int count); + void DecreasePendingQueries(int count); + String m_Path; String m_Host; int m_Port; @@ -185,6 +192,13 @@ namespace icinga AsioConditionVariable m_QueuedWrites, m_QueuedReads; std::function m_ConnectedCallback; + + // Stats + RingBuffer m_InputQueries{10}; + RingBuffer m_OutputQueries{10}; + int m_PendingQueries{0}; + boost::asio::deadline_timer m_LogStatsTimer; + Ptr m_Parent; }; /**