mirror of https://github.com/Icinga/icinga2.git
Merge pull request #8634 from Icinga/feature/icingadb-stats-log
RedisConnection: log actual query performance
This commit is contained in:
commit
2fbaf933bc
|
@ -130,7 +130,7 @@ void IcingaDB::UpdateAllConfigObjects()
|
||||||
double startTime = Utility::GetTime();
|
double startTime = Utility::GetTime();
|
||||||
|
|
||||||
// Use a Workqueue to pack objects in parallel
|
// Use a Workqueue to pack objects in parallel
|
||||||
WorkQueue upq(25000, Configuration::Concurrency);
|
WorkQueue upq(25000, Configuration::Concurrency, LogNotice);
|
||||||
upq.SetName("IcingaDB:ConfigDump");
|
upq.SetName("IcingaDB:ConfigDump");
|
||||||
|
|
||||||
std::vector<Type::Ptr> types = GetTypes();
|
std::vector<Type::Ptr> types = GetTypes();
|
||||||
|
@ -174,7 +174,7 @@ void IcingaDB::UpdateAllConfigObjects()
|
||||||
std::vector<String> keys = GetTypeOverwriteKeys(lcType);
|
std::vector<String> keys = GetTypeOverwriteKeys(lcType);
|
||||||
DeleteKeys(rcon, keys, Prio::Config);
|
DeleteKeys(rcon, keys, Prio::Config);
|
||||||
|
|
||||||
WorkQueue upqObjectType(25000, Configuration::Concurrency);
|
WorkQueue upqObjectType(25000, Configuration::Concurrency, LogNotice);
|
||||||
upqObjectType.SetName("IcingaDB:ConfigDump:" + lcType);
|
upqObjectType.SetName("IcingaDB:ConfigDump:" + lcType);
|
||||||
|
|
||||||
std::map<String, String> redisCheckSums;
|
std::map<String, String> redisCheckSums;
|
||||||
|
|
|
@ -63,7 +63,7 @@ void IcingaDB::Start(bool runtimeCreated)
|
||||||
if (!ctype)
|
if (!ctype)
|
||||||
continue;
|
continue;
|
||||||
|
|
||||||
RedisConnection::Ptr rCon (new RedisConnection(GetHost(), GetPort(), GetPath(), GetPassword(), GetDbIndex()));
|
RedisConnection::Ptr rCon (new RedisConnection(GetHost(), GetPort(), GetPath(), GetPassword(), GetDbIndex(), m_Rcon));
|
||||||
rCon->Start();
|
rCon->Start();
|
||||||
m_Rcons[ctype] = std::move(rCon);
|
m_Rcons[ctype] = std::move(rCon);
|
||||||
}
|
}
|
||||||
|
|
|
@ -154,7 +154,7 @@ private:
|
||||||
static std::vector<Type::Ptr> GetTypes();
|
static std::vector<Type::Ptr> GetTypes();
|
||||||
|
|
||||||
Timer::Ptr m_StatsTimer;
|
Timer::Ptr m_StatsTimer;
|
||||||
WorkQueue m_WorkQueue;
|
WorkQueue m_WorkQueue{0, 1, LogNotice};
|
||||||
|
|
||||||
String m_PrefixConfigObject;
|
String m_PrefixConfigObject;
|
||||||
String m_PrefixConfigCheckSum;
|
String m_PrefixConfigCheckSum;
|
||||||
|
|
|
@ -9,6 +9,7 @@
|
||||||
#include "base/objectlock.hpp"
|
#include "base/objectlock.hpp"
|
||||||
#include "base/string.hpp"
|
#include "base/string.hpp"
|
||||||
#include "base/tcpsocket.hpp"
|
#include "base/tcpsocket.hpp"
|
||||||
|
#include "base/utility.hpp"
|
||||||
#include <boost/asio.hpp>
|
#include <boost/asio.hpp>
|
||||||
#include <boost/coroutine/exceptions.hpp>
|
#include <boost/coroutine/exceptions.hpp>
|
||||||
#include <boost/date_time/posix_time/posix_time_duration.hpp>
|
#include <boost/date_time/posix_time/posix_time_duration.hpp>
|
||||||
|
@ -23,14 +24,17 @@
|
||||||
using namespace icinga;
|
using namespace icinga;
|
||||||
namespace asio = boost::asio;
|
namespace asio = boost::asio;
|
||||||
|
|
||||||
RedisConnection::RedisConnection(const String& host, const int port, const String& path, const String& password, const int db) :
|
RedisConnection::RedisConnection(const String& host, const int port, const String& path,
|
||||||
RedisConnection(IoEngine::Get().GetIoContext(), host, port, path, password, db)
|
const String& password, const int db, const RedisConnection::Ptr& parent) :
|
||||||
|
RedisConnection(IoEngine::Get().GetIoContext(), host, port, path, password, db, parent)
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
RedisConnection::RedisConnection(boost::asio::io_context& io, String host, int port, String path, String password, int db)
|
RedisConnection::RedisConnection(boost::asio::io_context& io, String host, int port, String path,
|
||||||
|
String password, int db, const RedisConnection::Ptr& parent)
|
||||||
: m_Host(std::move(host)), m_Port(port), m_Path(std::move(path)), m_Password(std::move(password)), m_DbIndex(db),
|
: m_Host(std::move(host)), m_Port(port), m_Path(std::move(path)), m_Password(std::move(password)), m_DbIndex(db),
|
||||||
m_Connecting(false), m_Connected(false), m_Started(false), m_Strand(io), m_QueuedWrites(io), m_QueuedReads(io)
|
m_Connecting(false), m_Connected(false), m_Started(false), m_Strand(io),
|
||||||
|
m_QueuedWrites(io), m_QueuedReads(io), m_LogStatsTimer(io), m_Parent(parent)
|
||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -41,6 +45,10 @@ void RedisConnection::Start()
|
||||||
|
|
||||||
IoEngine::SpawnCoroutine(m_Strand, [this, keepAlive](asio::yield_context yc) { ReadLoop(yc); });
|
IoEngine::SpawnCoroutine(m_Strand, [this, keepAlive](asio::yield_context yc) { ReadLoop(yc); });
|
||||||
IoEngine::SpawnCoroutine(m_Strand, [this, keepAlive](asio::yield_context yc) { WriteLoop(yc); });
|
IoEngine::SpawnCoroutine(m_Strand, [this, keepAlive](asio::yield_context yc) { WriteLoop(yc); });
|
||||||
|
|
||||||
|
if (!m_Parent) {
|
||||||
|
IoEngine::SpawnCoroutine(m_Strand, [this, keepAlive](asio::yield_context yc) { LogStats(yc); });
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!m_Connecting.exchange(true)) {
|
if (!m_Connecting.exchange(true)) {
|
||||||
|
@ -97,6 +105,7 @@ void RedisConnection::FireAndForgetQuery(RedisConnection::Query query, RedisConn
|
||||||
asio::post(m_Strand, [this, item, priority]() {
|
asio::post(m_Strand, [this, item, priority]() {
|
||||||
m_Queues.Writes[priority].emplace(WriteQueueItem{item, nullptr, nullptr, nullptr});
|
m_Queues.Writes[priority].emplace(WriteQueueItem{item, nullptr, nullptr, nullptr});
|
||||||
m_QueuedWrites.Set();
|
m_QueuedWrites.Set();
|
||||||
|
IncreasePendingQueries(1);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -118,6 +127,7 @@ void RedisConnection::FireAndForgetQueries(RedisConnection::Queries queries, Red
|
||||||
asio::post(m_Strand, [this, item, priority]() {
|
asio::post(m_Strand, [this, item, priority]() {
|
||||||
m_Queues.Writes[priority].emplace(WriteQueueItem{nullptr, item, nullptr, nullptr});
|
m_Queues.Writes[priority].emplace(WriteQueueItem{nullptr, item, nullptr, nullptr});
|
||||||
m_QueuedWrites.Set();
|
m_QueuedWrites.Set();
|
||||||
|
IncreasePendingQueries(item->size());
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -143,6 +153,7 @@ RedisConnection::Reply RedisConnection::GetResultOfQuery(RedisConnection::Query
|
||||||
asio::post(m_Strand, [this, item, priority]() {
|
asio::post(m_Strand, [this, item, priority]() {
|
||||||
m_Queues.Writes[priority].emplace(WriteQueueItem{nullptr, nullptr, item, nullptr});
|
m_Queues.Writes[priority].emplace(WriteQueueItem{nullptr, nullptr, item, nullptr});
|
||||||
m_QueuedWrites.Set();
|
m_QueuedWrites.Set();
|
||||||
|
IncreasePendingQueries(1);
|
||||||
});
|
});
|
||||||
|
|
||||||
item = nullptr;
|
item = nullptr;
|
||||||
|
@ -172,6 +183,7 @@ RedisConnection::Replies RedisConnection::GetResultsOfQueries(RedisConnection::Q
|
||||||
asio::post(m_Strand, [this, item, priority]() {
|
asio::post(m_Strand, [this, item, priority]() {
|
||||||
m_Queues.Writes[priority].emplace(WriteQueueItem{nullptr, nullptr, nullptr, item});
|
m_Queues.Writes[priority].emplace(WriteQueueItem{nullptr, nullptr, nullptr, item});
|
||||||
m_QueuedWrites.Set();
|
m_QueuedWrites.Set();
|
||||||
|
IncreasePendingQueries(item->first.size());
|
||||||
});
|
});
|
||||||
|
|
||||||
item = nullptr;
|
item = nullptr;
|
||||||
|
@ -379,6 +391,41 @@ void RedisConnection::WriteLoop(asio::yield_context& yc)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Periodically log current query performance
|
||||||
|
*/
|
||||||
|
void RedisConnection::LogStats(asio::yield_context& yc)
|
||||||
|
{
|
||||||
|
double lastMessage = 0;
|
||||||
|
|
||||||
|
m_LogStatsTimer.expires_from_now(boost::posix_time::seconds(10));
|
||||||
|
|
||||||
|
for (;;) {
|
||||||
|
m_LogStatsTimer.async_wait(yc);
|
||||||
|
m_LogStatsTimer.expires_from_now(boost::posix_time::seconds(10));
|
||||||
|
|
||||||
|
if (!IsConnected())
|
||||||
|
continue;
|
||||||
|
|
||||||
|
auto now (Utility::GetTime());
|
||||||
|
bool timeoutReached = now - lastMessage >= 5 * 60;
|
||||||
|
|
||||||
|
if (m_PendingQueries < 1 && !timeoutReached)
|
||||||
|
continue;
|
||||||
|
|
||||||
|
auto output (round(m_OutputQueries.CalculateRate(now, 10)));
|
||||||
|
|
||||||
|
if (m_PendingQueries < output * 5 && !timeoutReached)
|
||||||
|
continue;
|
||||||
|
|
||||||
|
Log(LogInformation, "IcingaDB")
|
||||||
|
<< "Pending queries: " << m_PendingQueries << " (Input: "
|
||||||
|
<< round(m_InputQueries.CalculateRate(now, 10)) << "/s; Output: " << output << "/s)";
|
||||||
|
|
||||||
|
lastMessage = now;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Send next and schedule receiving the response
|
* Send next and schedule receiving the response
|
||||||
*
|
*
|
||||||
|
@ -388,6 +435,7 @@ void RedisConnection::WriteItem(boost::asio::yield_context& yc, RedisConnection:
|
||||||
{
|
{
|
||||||
if (next.FireAndForgetQuery) {
|
if (next.FireAndForgetQuery) {
|
||||||
auto& item (*next.FireAndForgetQuery);
|
auto& item (*next.FireAndForgetQuery);
|
||||||
|
DecreasePendingQueries(1);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
WriteOne(item, yc);
|
WriteOne(item, yc);
|
||||||
|
@ -420,6 +468,8 @@ void RedisConnection::WriteItem(boost::asio::yield_context& yc, RedisConnection:
|
||||||
auto& item (*next.FireAndForgetQueries);
|
auto& item (*next.FireAndForgetQueries);
|
||||||
size_t i = 0;
|
size_t i = 0;
|
||||||
|
|
||||||
|
DecreasePendingQueries(item.size());
|
||||||
|
|
||||||
try {
|
try {
|
||||||
for (auto& query : item) {
|
for (auto& query : item) {
|
||||||
WriteOne(query, yc);
|
WriteOne(query, yc);
|
||||||
|
@ -452,6 +502,7 @@ void RedisConnection::WriteItem(boost::asio::yield_context& yc, RedisConnection:
|
||||||
|
|
||||||
if (next.GetResultOfQuery) {
|
if (next.GetResultOfQuery) {
|
||||||
auto& item (*next.GetResultOfQuery);
|
auto& item (*next.GetResultOfQuery);
|
||||||
|
DecreasePendingQueries(1);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
WriteOne(item.first, yc);
|
WriteOne(item.first, yc);
|
||||||
|
@ -476,6 +527,7 @@ void RedisConnection::WriteItem(boost::asio::yield_context& yc, RedisConnection:
|
||||||
|
|
||||||
if (next.GetResultsOfQueries) {
|
if (next.GetResultsOfQueries) {
|
||||||
auto& item (*next.GetResultsOfQueries);
|
auto& item (*next.GetResultsOfQueries);
|
||||||
|
DecreasePendingQueries(item.first.size());
|
||||||
|
|
||||||
try {
|
try {
|
||||||
for (auto& query : item.first) {
|
for (auto& query : item.first) {
|
||||||
|
@ -538,3 +590,31 @@ void RedisConnection::WriteOne(RedisConnection::Query& query, asio::yield_contex
|
||||||
void RedisConnection::SetConnectedCallback(std::function<void(asio::yield_context& yc)> callback) {
|
void RedisConnection::SetConnectedCallback(std::function<void(asio::yield_context& yc)> callback) {
|
||||||
m_ConnectedCallback = std::move(callback);
|
m_ConnectedCallback = std::move(callback);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void RedisConnection::IncreasePendingQueries(int count)
|
||||||
|
{
|
||||||
|
if (m_Parent) {
|
||||||
|
auto parent (m_Parent);
|
||||||
|
|
||||||
|
asio::post(parent->m_Strand, [parent, count]() {
|
||||||
|
parent->IncreasePendingQueries(count);
|
||||||
|
});
|
||||||
|
} else {
|
||||||
|
m_PendingQueries += count;
|
||||||
|
m_InputQueries.InsertValue(Utility::GetTime(), count);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void RedisConnection::DecreasePendingQueries(int count)
|
||||||
|
{
|
||||||
|
if (m_Parent) {
|
||||||
|
auto parent (m_Parent);
|
||||||
|
|
||||||
|
asio::post(parent->m_Strand, [parent, count]() {
|
||||||
|
parent->DecreasePendingQueries(count);
|
||||||
|
});
|
||||||
|
} else {
|
||||||
|
m_PendingQueries -= count;
|
||||||
|
m_OutputQueries.InsertValue(Utility::GetTime(), count);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -7,11 +7,13 @@
|
||||||
#include "base/atomic.hpp"
|
#include "base/atomic.hpp"
|
||||||
#include "base/io-engine.hpp"
|
#include "base/io-engine.hpp"
|
||||||
#include "base/object.hpp"
|
#include "base/object.hpp"
|
||||||
|
#include "base/ringbuffer.hpp"
|
||||||
#include "base/shared.hpp"
|
#include "base/shared.hpp"
|
||||||
#include "base/string.hpp"
|
#include "base/string.hpp"
|
||||||
#include "base/value.hpp"
|
#include "base/value.hpp"
|
||||||
#include <boost/asio/buffer.hpp>
|
#include <boost/asio/buffer.hpp>
|
||||||
#include <boost/asio/buffered_stream.hpp>
|
#include <boost/asio/buffered_stream.hpp>
|
||||||
|
#include <boost/asio/deadline_timer.hpp>
|
||||||
#include <boost/asio/io_context.hpp>
|
#include <boost/asio/io_context.hpp>
|
||||||
#include <boost/asio/io_context_strand.hpp>
|
#include <boost/asio/io_context_strand.hpp>
|
||||||
#include <boost/asio/ip/tcp.hpp>
|
#include <boost/asio/ip/tcp.hpp>
|
||||||
|
@ -68,7 +70,7 @@ namespace icinga
|
||||||
};
|
};
|
||||||
|
|
||||||
RedisConnection(const String& host, const int port, const String& path,
|
RedisConnection(const String& host, const int port, const String& path,
|
||||||
const String& password = "", const int db = 0);
|
const String& password = "", const int db = 0, const Ptr& parent = nullptr);
|
||||||
|
|
||||||
void Start();
|
void Start();
|
||||||
|
|
||||||
|
@ -141,11 +143,13 @@ namespace icinga
|
||||||
template<class AsyncWriteStream>
|
template<class AsyncWriteStream>
|
||||||
static void WriteRESP(AsyncWriteStream& stream, const Query& query, boost::asio::yield_context& yc);
|
static void WriteRESP(AsyncWriteStream& stream, const Query& query, boost::asio::yield_context& yc);
|
||||||
|
|
||||||
RedisConnection(boost::asio::io_context& io, String host, int port, String path, String password, int db);
|
RedisConnection(boost::asio::io_context& io, String host, int port, String path,
|
||||||
|
String password, int db, const Ptr& parent);
|
||||||
|
|
||||||
void Connect(boost::asio::yield_context& yc);
|
void Connect(boost::asio::yield_context& yc);
|
||||||
void ReadLoop(boost::asio::yield_context& yc);
|
void ReadLoop(boost::asio::yield_context& yc);
|
||||||
void WriteLoop(boost::asio::yield_context& yc);
|
void WriteLoop(boost::asio::yield_context& yc);
|
||||||
|
void LogStats(boost::asio::yield_context& yc);
|
||||||
void WriteItem(boost::asio::yield_context& yc, WriteQueueItem item);
|
void WriteItem(boost::asio::yield_context& yc, WriteQueueItem item);
|
||||||
Reply ReadOne(boost::asio::yield_context& yc);
|
Reply ReadOne(boost::asio::yield_context& yc);
|
||||||
void WriteOne(Query& query, boost::asio::yield_context& yc);
|
void WriteOne(Query& query, boost::asio::yield_context& yc);
|
||||||
|
@ -156,6 +160,9 @@ namespace icinga
|
||||||
template<class StreamPtr>
|
template<class StreamPtr>
|
||||||
void WriteOne(StreamPtr& stream, Query& query, boost::asio::yield_context& yc);
|
void WriteOne(StreamPtr& stream, Query& query, boost::asio::yield_context& yc);
|
||||||
|
|
||||||
|
void IncreasePendingQueries(int count);
|
||||||
|
void DecreasePendingQueries(int count);
|
||||||
|
|
||||||
String m_Path;
|
String m_Path;
|
||||||
String m_Host;
|
String m_Host;
|
||||||
int m_Port;
|
int m_Port;
|
||||||
|
@ -185,6 +192,13 @@ namespace icinga
|
||||||
AsioConditionVariable m_QueuedWrites, m_QueuedReads;
|
AsioConditionVariable m_QueuedWrites, m_QueuedReads;
|
||||||
|
|
||||||
std::function<void(boost::asio::yield_context& yc)> m_ConnectedCallback;
|
std::function<void(boost::asio::yield_context& yc)> m_ConnectedCallback;
|
||||||
|
|
||||||
|
// Stats
|
||||||
|
RingBuffer m_InputQueries{10};
|
||||||
|
RingBuffer m_OutputQueries{10};
|
||||||
|
int m_PendingQueries{0};
|
||||||
|
boost::asio::deadline_timer m_LogStatsTimer;
|
||||||
|
Ptr m_Parent;
|
||||||
};
|
};
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
Loading…
Reference in New Issue