Combine all Redis connections' logs

This commit is contained in:
Alexander A. Klimov 2021-07-16 18:50:38 +02:00
parent e6a9631a02
commit 67c4ebedd3
3 changed files with 35 additions and 12 deletions

View File

@ -63,7 +63,7 @@ void IcingaDB::Start(bool runtimeCreated)
if (!ctype) if (!ctype)
continue; continue;
RedisConnection::Ptr rCon (new RedisConnection(GetHost(), GetPort(), GetPath(), GetPassword(), GetDbIndex())); RedisConnection::Ptr rCon (new RedisConnection(GetHost(), GetPort(), GetPath(), GetPassword(), GetDbIndex(), m_Rcon));
rCon->Start(); rCon->Start();
m_Rcons[ctype] = std::move(rCon); m_Rcons[ctype] = std::move(rCon);
} }

View File

@ -24,15 +24,17 @@
using namespace icinga; using namespace icinga;
namespace asio = boost::asio; namespace asio = boost::asio;
RedisConnection::RedisConnection(const String& host, const int port, const String& path, const String& password, const int db) : RedisConnection::RedisConnection(const String& host, const int port, const String& path,
RedisConnection(IoEngine::Get().GetIoContext(), host, port, path, password, db) const String& password, const int db, const RedisConnection::Ptr& parent) :
RedisConnection(IoEngine::Get().GetIoContext(), host, port, path, password, db, parent)
{ {
} }
RedisConnection::RedisConnection(boost::asio::io_context& io, String host, int port, String path, String password, int db) RedisConnection::RedisConnection(boost::asio::io_context& io, String host, int port, String path,
String password, int db, const RedisConnection::Ptr& parent)
: m_Host(std::move(host)), m_Port(port), m_Path(std::move(path)), m_Password(std::move(password)), m_DbIndex(db), : m_Host(std::move(host)), m_Port(port), m_Path(std::move(path)), m_Password(std::move(password)), m_DbIndex(db),
m_Connecting(false), m_Connected(false), m_Started(false), m_Strand(io), m_Connecting(false), m_Connected(false), m_Started(false), m_Strand(io),
m_QueuedWrites(io), m_QueuedReads(io), m_LogStatsTimer(io) m_QueuedWrites(io), m_QueuedReads(io), m_LogStatsTimer(io), m_Parent(parent)
{ {
} }
@ -43,8 +45,11 @@ void RedisConnection::Start()
IoEngine::SpawnCoroutine(m_Strand, [this, keepAlive](asio::yield_context yc) { ReadLoop(yc); }); IoEngine::SpawnCoroutine(m_Strand, [this, keepAlive](asio::yield_context yc) { ReadLoop(yc); });
IoEngine::SpawnCoroutine(m_Strand, [this, keepAlive](asio::yield_context yc) { WriteLoop(yc); }); IoEngine::SpawnCoroutine(m_Strand, [this, keepAlive](asio::yield_context yc) { WriteLoop(yc); });
if (!m_Parent) {
IoEngine::SpawnCoroutine(m_Strand, [this, keepAlive](asio::yield_context yc) { LogStats(yc); }); IoEngine::SpawnCoroutine(m_Strand, [this, keepAlive](asio::yield_context yc) { LogStats(yc); });
} }
}
if (!m_Connecting.exchange(true)) { if (!m_Connecting.exchange(true)) {
Ptr keepAlive (this); Ptr keepAlive (this);
@ -588,12 +593,28 @@ void RedisConnection::SetConnectedCallback(std::function<void(asio::yield_contex
void RedisConnection::IncreasePendingQueries(int count) void RedisConnection::IncreasePendingQueries(int count)
{ {
if (m_Parent) {
auto parent (m_Parent);
asio::post(parent->m_Strand, [parent, count]() {
parent->IncreasePendingQueries(count);
});
} else {
m_PendingQueries += count; m_PendingQueries += count;
m_InputQueries.InsertValue(Utility::GetTime(), count); m_InputQueries.InsertValue(Utility::GetTime(), count);
}
} }
void RedisConnection::DecreasePendingQueries(int count) void RedisConnection::DecreasePendingQueries(int count)
{ {
if (m_Parent) {
auto parent (m_Parent);
asio::post(parent->m_Strand, [parent, count]() {
parent->DecreasePendingQueries(count);
});
} else {
m_PendingQueries -= count; m_PendingQueries -= count;
m_OutputQueries.InsertValue(Utility::GetTime(), count); m_OutputQueries.InsertValue(Utility::GetTime(), count);
}
} }

View File

@ -70,7 +70,7 @@ namespace icinga
}; };
RedisConnection(const String& host, const int port, const String& path, RedisConnection(const String& host, const int port, const String& path,
const String& password = "", const int db = 0); const String& password = "", const int db = 0, const Ptr& parent = nullptr);
void Start(); void Start();
@ -143,7 +143,8 @@ namespace icinga
template<class AsyncWriteStream> template<class AsyncWriteStream>
static void WriteRESP(AsyncWriteStream& stream, const Query& query, boost::asio::yield_context& yc); static void WriteRESP(AsyncWriteStream& stream, const Query& query, boost::asio::yield_context& yc);
RedisConnection(boost::asio::io_context& io, String host, int port, String path, String password, int db); RedisConnection(boost::asio::io_context& io, String host, int port, String path,
String password, int db, const Ptr& parent);
void Connect(boost::asio::yield_context& yc); void Connect(boost::asio::yield_context& yc);
void ReadLoop(boost::asio::yield_context& yc); void ReadLoop(boost::asio::yield_context& yc);
@ -197,6 +198,7 @@ namespace icinga
RingBuffer m_OutputQueries{10}; RingBuffer m_OutputQueries{10};
int m_PendingQueries{0}; int m_PendingQueries{0};
boost::asio::deadline_timer m_LogStatsTimer; boost::asio::deadline_timer m_LogStatsTimer;
Ptr m_Parent;
}; };
/** /**