RedisConnection: log actual query performance

This commit is contained in:
Alexander A. Klimov 2021-02-10 16:54:07 +01:00
parent 9f43c143d7
commit 82c3827b66
2 changed files with 72 additions and 1 deletions

View File

@ -9,6 +9,7 @@
#include "base/objectlock.hpp"
#include "base/string.hpp"
#include "base/tcpsocket.hpp"
#include "base/utility.hpp"
#include <boost/asio.hpp>
#include <boost/coroutine/exceptions.hpp>
#include <boost/date_time/posix_time/posix_time_duration.hpp>
@ -30,7 +31,8 @@ RedisConnection::RedisConnection(const String& host, const int port, const Strin
RedisConnection::RedisConnection(boost::asio::io_context& io, String host, int port, String path, String password, int db)
: m_Host(std::move(host)), m_Port(port), m_Path(std::move(path)), m_Password(std::move(password)), m_DbIndex(db),
m_Connecting(false), m_Connected(false), m_Started(false), m_Strand(io), m_QueuedWrites(io), m_QueuedReads(io)
m_Connecting(false), m_Connected(false), m_Started(false), m_Strand(io),
m_QueuedWrites(io), m_QueuedReads(io), m_LogStatsTimer(io)
{
}
@ -41,6 +43,7 @@ void RedisConnection::Start()
IoEngine::SpawnCoroutine(m_Strand, [this, keepAlive](asio::yield_context yc) { ReadLoop(yc); });
IoEngine::SpawnCoroutine(m_Strand, [this, keepAlive](asio::yield_context yc) { WriteLoop(yc); });
IoEngine::SpawnCoroutine(m_Strand, [this, keepAlive](asio::yield_context yc) { LogStats(yc); });
}
if (!m_Connecting.exchange(true)) {
@ -97,6 +100,7 @@ void RedisConnection::FireAndForgetQuery(RedisConnection::Query query, RedisConn
asio::post(m_Strand, [this, item, priority]() {
m_Queues.Writes[priority].emplace(WriteQueueItem{item, nullptr, nullptr, nullptr});
m_QueuedWrites.Set();
IncreasePendingQueries(1);
});
}
@ -118,6 +122,7 @@ void RedisConnection::FireAndForgetQueries(RedisConnection::Queries queries, Red
asio::post(m_Strand, [this, item, priority]() {
m_Queues.Writes[priority].emplace(WriteQueueItem{nullptr, item, nullptr, nullptr});
m_QueuedWrites.Set();
IncreasePendingQueries(item->size());
});
}
@ -143,6 +148,7 @@ RedisConnection::Reply RedisConnection::GetResultOfQuery(RedisConnection::Query
asio::post(m_Strand, [this, item, priority]() {
m_Queues.Writes[priority].emplace(WriteQueueItem{nullptr, nullptr, item, nullptr});
m_QueuedWrites.Set();
IncreasePendingQueries(1);
});
item = nullptr;
@ -172,6 +178,7 @@ RedisConnection::Replies RedisConnection::GetResultsOfQueries(RedisConnection::Q
asio::post(m_Strand, [this, item, priority]() {
m_Queues.Writes[priority].emplace(WriteQueueItem{nullptr, nullptr, nullptr, item});
m_QueuedWrites.Set();
IncreasePendingQueries(item->first.size());
});
item = nullptr;
@ -379,6 +386,41 @@ void RedisConnection::WriteLoop(asio::yield_context& yc)
}
}
/**
* Periodically log current query performance
*/
void RedisConnection::LogStats(asio::yield_context& yc)
{
double lastMessage = 0;
m_LogStatsTimer.expires_from_now(boost::posix_time::seconds(10));
for (;;) {
m_LogStatsTimer.async_wait(yc);
m_LogStatsTimer.expires_from_now(boost::posix_time::seconds(10));
if (!IsConnected())
continue;
auto now (Utility::GetTime());
bool timeoutReached = now - lastMessage >= 5 * 60;
if (m_PendingQueries < 1 && !timeoutReached)
continue;
auto output (round(m_OutputQueries.CalculateRate(now, 10)));
if (m_PendingQueries < output * 5 && !timeoutReached)
continue;
Log(LogInformation, "IcingaDB")
<< "Pending queries: " << m_PendingQueries << " (Input: "
<< round(m_InputQueries.CalculateRate(now, 10)) << "/s; Output: " << output << "/s)";
lastMessage = now;
}
}
/**
* Send next and schedule receiving the response
*
@ -388,6 +430,7 @@ void RedisConnection::WriteItem(boost::asio::yield_context& yc, RedisConnection:
{
if (next.FireAndForgetQuery) {
auto& item (*next.FireAndForgetQuery);
DecreasePendingQueries(1);
try {
WriteOne(item, yc);
@ -420,6 +463,8 @@ void RedisConnection::WriteItem(boost::asio::yield_context& yc, RedisConnection:
auto& item (*next.FireAndForgetQueries);
size_t i = 0;
DecreasePendingQueries(item.size());
try {
for (auto& query : item) {
WriteOne(query, yc);
@ -452,6 +497,7 @@ void RedisConnection::WriteItem(boost::asio::yield_context& yc, RedisConnection:
if (next.GetResultOfQuery) {
auto& item (*next.GetResultOfQuery);
DecreasePendingQueries(1);
try {
WriteOne(item.first, yc);
@ -476,6 +522,7 @@ void RedisConnection::WriteItem(boost::asio::yield_context& yc, RedisConnection:
if (next.GetResultsOfQueries) {
auto& item (*next.GetResultsOfQueries);
DecreasePendingQueries(item.first.size());
try {
for (auto& query : item.first) {
@ -538,3 +585,15 @@ void RedisConnection::WriteOne(RedisConnection::Query& query, asio::yield_contex
void RedisConnection::SetConnectedCallback(std::function<void(asio::yield_context& yc)> callback) {
m_ConnectedCallback = std::move(callback);
}
void RedisConnection::IncreasePendingQueries(int count)
{
m_PendingQueries += count;
m_InputQueries.InsertValue(Utility::GetTime(), count);
}
void RedisConnection::DecreasePendingQueries(int count)
{
m_PendingQueries -= count;
m_OutputQueries.InsertValue(Utility::GetTime(), count);
}

View File

@ -7,11 +7,13 @@
#include "base/atomic.hpp"
#include "base/io-engine.hpp"
#include "base/object.hpp"
#include "base/ringbuffer.hpp"
#include "base/shared.hpp"
#include "base/string.hpp"
#include "base/value.hpp"
#include <boost/asio/buffer.hpp>
#include <boost/asio/buffered_stream.hpp>
#include <boost/asio/deadline_timer.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/asio/io_context_strand.hpp>
#include <boost/asio/ip/tcp.hpp>
@ -146,6 +148,7 @@ namespace icinga
void Connect(boost::asio::yield_context& yc);
void ReadLoop(boost::asio::yield_context& yc);
void WriteLoop(boost::asio::yield_context& yc);
void LogStats(boost::asio::yield_context& yc);
void WriteItem(boost::asio::yield_context& yc, WriteQueueItem item);
Reply ReadOne(boost::asio::yield_context& yc);
void WriteOne(Query& query, boost::asio::yield_context& yc);
@ -156,6 +159,9 @@ namespace icinga
template<class StreamPtr>
void WriteOne(StreamPtr& stream, Query& query, boost::asio::yield_context& yc);
void IncreasePendingQueries(int count);
void DecreasePendingQueries(int count);
String m_Path;
String m_Host;
int m_Port;
@ -185,6 +191,12 @@ namespace icinga
AsioConditionVariable m_QueuedWrites, m_QueuedReads;
std::function<void(boost::asio::yield_context& yc)> m_ConnectedCallback;
// Stats
RingBuffer m_InputQueries{10};
RingBuffer m_OutputQueries{10};
int m_PendingQueries{0};
boost::asio::deadline_timer m_LogStatsTimer;
};
/**