mirror of
https://github.com/Icinga/icinga2.git
synced 2025-09-25 10:48:20 +02:00
WIP
This commit is contained in:
parent
92c886a153
commit
a00f2f5d15
@ -299,6 +299,24 @@ void IcingadbCheckTask::ScriptFunc(const Checkable::Ptr& checkable, const CheckR
|
||||
perfdata->Add(new PerfdataValue("dump_took", dumpTook, false, "seconds", dumpTookThresholds.Warning, dumpTookThresholds.Critical, 0));
|
||||
}
|
||||
|
||||
struct {
|
||||
const char * Name;
|
||||
int (RedisConnection::* Getter)(RingBuffer::SizeType span, RingBuffer::SizeType tv);
|
||||
} const icingaWriteSubjects[] = {
|
||||
{"dump_config", &RedisConnection::GetWrittenConfigFor},
|
||||
{"dump_state", &RedisConnection::GetWrittenStateFor},
|
||||
{"dump_history", &RedisConnection::GetWrittenHistoryFor}
|
||||
};
|
||||
|
||||
for (auto subject : icingaWriteSubjects) {
|
||||
auto perMin ((redis->*subject.Getter)(60));
|
||||
|
||||
perfdata->Add(new PerfdataValue(subject.Name, perMin / 60.0, false, "", Empty, Empty, 0));
|
||||
perfdata->Add(new PerfdataValue(String(subject.Name) + "_1min", perMin, false, "", Empty, Empty, 0));
|
||||
perfdata->Add(new PerfdataValue(String(subject.Name) + "_5mins", (redis->*subject.Getter)(5 * 60), false, "", Empty, Empty, 0));
|
||||
perfdata->Add(new PerfdataValue(String(subject.Name) + "_15mins", (redis->*subject.Getter)(15 * 60), false, "", Empty, Empty, 0));
|
||||
}
|
||||
|
||||
msgbuf << "\n\nIcinga DB daemon\n----------------\n"
|
||||
<< "\n* Version: " << version;
|
||||
|
||||
@ -342,9 +360,9 @@ void IcingadbCheckTask::ScriptFunc(const Checkable::Ptr& checkable, const CheckR
|
||||
}
|
||||
|
||||
perfdata->Add(new PerfdataValue("queries", qps, false, "", Empty, Empty, 0));
|
||||
perfdata->Add(new PerfdataValue("queries_1min", redis->GetQueryCount(60), Empty, Empty, 0));
|
||||
perfdata->Add(new PerfdataValue("queries_5mins", redis->GetQueryCount(5 * 60), Empty, Empty, 0));
|
||||
perfdata->Add(new PerfdataValue("queries_15mins", redis->GetQueryCount(15 * 60), Empty, Empty, 0));
|
||||
perfdata->Add(new PerfdataValue("queries_1min", redis->GetQueryCount(60), false, "", Empty, Empty, 0));
|
||||
perfdata->Add(new PerfdataValue("queries_5mins", redis->GetQueryCount(5 * 60), false, "", Empty, Empty, 0));
|
||||
perfdata->Add(new PerfdataValue("queries_15mins", redis->GetQueryCount(15 * 60), false, "", Empty, Empty, 0));
|
||||
perfdata->Add(new PerfdataValue("pending_queries", pendingQueries, false, "", pendingQueriesThresholds.Warning, pendingQueriesThresholds.Critical, 0));
|
||||
perfdata->Add(new PerfdataValue("down_for", downFor, false, "seconds", downForThresholds.Warning, downForThresholds.Critical, 0));
|
||||
perfdata->Add(new PerfdataValue("heartbeat_lag", heartbeatLag, false, "seconds", heartbeatThresholds.Warning, heartbeatThresholds.Critical));
|
||||
|
@ -113,7 +113,7 @@ void LogQuery(RedisConnection::Query& query, Log& msg)
|
||||
* @param query Redis query
|
||||
* @param priority The query's priority
|
||||
*/
|
||||
void RedisConnection::FireAndForgetQuery(RedisConnection::Query query, RedisConnection::QueryPriority priority)
|
||||
void RedisConnection::FireAndForgetQuery(RedisConnection::Query query, RedisConnection::QueryPriority priority, QueryAffects affects)
|
||||
{
|
||||
{
|
||||
Log msg (LogDebug, "IcingaDB", "Firing and forgetting query:");
|
||||
@ -122,8 +122,8 @@ void RedisConnection::FireAndForgetQuery(RedisConnection::Query query, RedisConn
|
||||
|
||||
auto item (Shared<Query>::Make(std::move(query)));
|
||||
|
||||
asio::post(m_Strand, [this, item, priority]() {
|
||||
m_Queues.Writes[priority].emplace(WriteQueueItem{item, nullptr, nullptr, nullptr});
|
||||
asio::post(m_Strand, [this, item, priority, affects]() {
|
||||
m_Queues.Writes[priority].emplace(WriteQueueItem{item, nullptr, nullptr, nullptr, nullptr, affects});
|
||||
m_QueuedWrites.Set();
|
||||
IncreasePendingQueries(1);
|
||||
});
|
||||
@ -135,7 +135,7 @@ void RedisConnection::FireAndForgetQuery(RedisConnection::Query query, RedisConn
|
||||
* @param queries Redis queries
|
||||
* @param priority The queries' priority
|
||||
*/
|
||||
void RedisConnection::FireAndForgetQueries(RedisConnection::Queries queries, RedisConnection::QueryPriority priority)
|
||||
void RedisConnection::FireAndForgetQueries(RedisConnection::Queries queries, RedisConnection::QueryPriority priority, QueryAffects affects)
|
||||
{
|
||||
for (auto& query : queries) {
|
||||
Log msg (LogDebug, "IcingaDB", "Firing and forgetting query:");
|
||||
@ -144,8 +144,8 @@ void RedisConnection::FireAndForgetQueries(RedisConnection::Queries queries, Red
|
||||
|
||||
auto item (Shared<Queries>::Make(std::move(queries)));
|
||||
|
||||
asio::post(m_Strand, [this, item, priority]() {
|
||||
m_Queues.Writes[priority].emplace(WriteQueueItem{nullptr, item, nullptr, nullptr});
|
||||
asio::post(m_Strand, [this, item, priority, affects]() {
|
||||
m_Queues.Writes[priority].emplace(WriteQueueItem{nullptr, item, nullptr, nullptr, nullptr, affects});
|
||||
m_QueuedWrites.Set();
|
||||
IncreasePendingQueries(item->size());
|
||||
});
|
||||
@ -159,7 +159,7 @@ void RedisConnection::FireAndForgetQueries(RedisConnection::Queries queries, Red
|
||||
*
|
||||
* @return The response
|
||||
*/
|
||||
RedisConnection::Reply RedisConnection::GetResultOfQuery(RedisConnection::Query query, RedisConnection::QueryPriority priority)
|
||||
RedisConnection::Reply RedisConnection::GetResultOfQuery(RedisConnection::Query query, RedisConnection::QueryPriority priority, QueryAffects affects)
|
||||
{
|
||||
{
|
||||
Log msg (LogDebug, "IcingaDB", "Executing query:");
|
||||
@ -170,8 +170,8 @@ RedisConnection::Reply RedisConnection::GetResultOfQuery(RedisConnection::Query
|
||||
auto future (promise.get_future());
|
||||
auto item (Shared<std::pair<Query, std::promise<Reply>>>::Make(std::move(query), std::move(promise)));
|
||||
|
||||
asio::post(m_Strand, [this, item, priority]() {
|
||||
m_Queues.Writes[priority].emplace(WriteQueueItem{nullptr, nullptr, item, nullptr});
|
||||
asio::post(m_Strand, [this, item, priority, affects]() {
|
||||
m_Queues.Writes[priority].emplace(WriteQueueItem{nullptr, nullptr, item, nullptr, nullptr, affects});
|
||||
m_QueuedWrites.Set();
|
||||
IncreasePendingQueries(1);
|
||||
});
|
||||
@ -189,7 +189,7 @@ RedisConnection::Reply RedisConnection::GetResultOfQuery(RedisConnection::Query
|
||||
*
|
||||
* @return The responses
|
||||
*/
|
||||
RedisConnection::Replies RedisConnection::GetResultsOfQueries(RedisConnection::Queries queries, RedisConnection::QueryPriority priority)
|
||||
RedisConnection::Replies RedisConnection::GetResultsOfQueries(RedisConnection::Queries queries, RedisConnection::QueryPriority priority, QueryAffects affects)
|
||||
{
|
||||
for (auto& query : queries) {
|
||||
Log msg (LogDebug, "IcingaDB", "Executing query:");
|
||||
@ -200,8 +200,8 @@ RedisConnection::Replies RedisConnection::GetResultsOfQueries(RedisConnection::Q
|
||||
auto future (promise.get_future());
|
||||
auto item (Shared<std::pair<Queries, std::promise<Replies>>>::Make(std::move(queries), std::move(promise)));
|
||||
|
||||
asio::post(m_Strand, [this, item, priority]() {
|
||||
m_Queues.Writes[priority].emplace(WriteQueueItem{nullptr, nullptr, nullptr, item});
|
||||
asio::post(m_Strand, [this, item, priority, affects]() {
|
||||
m_Queues.Writes[priority].emplace(WriteQueueItem{nullptr, nullptr, nullptr, item, nullptr, affects});
|
||||
m_QueuedWrites.Set();
|
||||
IncreasePendingQueries(item->first.size());
|
||||
});
|
||||
@ -625,6 +625,8 @@ void RedisConnection::WriteItem(boost::asio::yield_context& yc, RedisConnection:
|
||||
if (next.Callback) {
|
||||
next.Callback(yc);
|
||||
}
|
||||
|
||||
RecordAffected(next.Affects, Utility::GetTime());
|
||||
}
|
||||
|
||||
/**
|
||||
@ -706,3 +708,26 @@ void RedisConnection::DecreasePendingQueries(int count)
|
||||
m_OutputQueries.InsertValue(Utility::GetTime(), count);
|
||||
}
|
||||
}
|
||||
|
||||
void RedisConnection::RecordAffected(RedisConnection::QueryAffects affected, double when)
|
||||
{
|
||||
if (m_Parent) {
|
||||
auto parent (m_Parent);
|
||||
|
||||
asio::post(parent->m_Strand, [parent, affected, when]() {
|
||||
parent->RecordAffected(affected, when);
|
||||
});
|
||||
} else {
|
||||
if (affected.Config) {
|
||||
m_WrittenConfig.InsertValue(when, affected.Config);
|
||||
}
|
||||
|
||||
if (affected.State) {
|
||||
m_WrittenState.InsertValue(when, affected.State);
|
||||
}
|
||||
|
||||
if (affected.History) {
|
||||
m_WrittenHistory.InsertValue(when, affected.History);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -74,6 +74,13 @@ namespace icinga
|
||||
SyncConnection = 255
|
||||
};
|
||||
|
||||
struct QueryAffects
|
||||
{
|
||||
size_t Config = 0;
|
||||
size_t State = 0;
|
||||
size_t History = 0;
|
||||
};
|
||||
|
||||
RedisConnection(const String& host, int port, const String& path, const String& password, int db,
|
||||
bool useTls, bool insecure, const String& certPath, const String& keyPath, const String& caPath, const String& crlPath,
|
||||
const String& tlsProtocolmin, const String& cipherList, double connectTimeout, DebugInfo di, const Ptr& parent = nullptr);
|
||||
@ -84,11 +91,11 @@ namespace icinga
|
||||
|
||||
bool IsConnected();
|
||||
|
||||
void FireAndForgetQuery(Query query, QueryPriority priority);
|
||||
void FireAndForgetQueries(Queries queries, QueryPriority priority);
|
||||
void FireAndForgetQuery(Query query, QueryPriority priority, QueryAffects affects = {});
|
||||
void FireAndForgetQueries(Queries queries, QueryPriority priority, QueryAffects affects = {});
|
||||
|
||||
Reply GetResultOfQuery(Query query, QueryPriority priority);
|
||||
Replies GetResultsOfQueries(Queries queries, QueryPriority priority);
|
||||
Reply GetResultOfQuery(Query query, QueryPriority priority, QueryAffects affects = {});
|
||||
Replies GetResultsOfQueries(Queries queries, QueryPriority priority, QueryAffects affects = {});
|
||||
|
||||
void EnqueueCallback(const std::function<void(boost::asio::yield_context&)>& callback, QueryPriority priority);
|
||||
void Sync();
|
||||
@ -110,6 +117,21 @@ namespace icinga
|
||||
return m_PendingQueries;
|
||||
}
|
||||
|
||||
inline int GetWrittenConfigFor(RingBuffer::SizeType span, RingBuffer::SizeType tv = Utility::GetTime())
|
||||
{
|
||||
return m_WrittenConfig.UpdateAndGetValues(tv, span);
|
||||
}
|
||||
|
||||
inline int GetWrittenStateFor(RingBuffer::SizeType span, RingBuffer::SizeType tv = Utility::GetTime())
|
||||
{
|
||||
return m_WrittenState.UpdateAndGetValues(tv, span);
|
||||
}
|
||||
|
||||
inline int GetWrittenHistoryFor(RingBuffer::SizeType span, RingBuffer::SizeType tv = Utility::GetTime())
|
||||
{
|
||||
return m_WrittenHistory.UpdateAndGetValues(tv, span);
|
||||
}
|
||||
|
||||
private:
|
||||
/**
|
||||
* What to do with the responses to Redis queries.
|
||||
@ -146,6 +168,8 @@ namespace icinga
|
||||
Shared<std::pair<Query, std::promise<Reply>>>::Ptr GetResultOfQuery;
|
||||
Shared<std::pair<Queries, std::promise<Replies>>>::Ptr GetResultsOfQueries;
|
||||
std::function<void(boost::asio::yield_context&)> Callback;
|
||||
|
||||
QueryAffects Affects;
|
||||
};
|
||||
|
||||
typedef boost::asio::ip::tcp Tcp;
|
||||
@ -187,6 +211,7 @@ namespace icinga
|
||||
|
||||
void IncreasePendingQueries(int count);
|
||||
void DecreasePendingQueries(int count);
|
||||
void RecordAffected(QueryAffects affected, double when);
|
||||
|
||||
template<class StreamPtr>
|
||||
void Handshake(StreamPtr& stream, boost::asio::yield_context& yc);
|
||||
@ -238,6 +263,9 @@ namespace icinga
|
||||
// Stats
|
||||
RingBuffer m_InputQueries{10};
|
||||
RingBuffer m_OutputQueries{15 * 60};
|
||||
RingBuffer m_WrittenConfig{15 * 60};
|
||||
RingBuffer m_WrittenState{15 * 60};
|
||||
RingBuffer m_WrittenHistory{15 * 60};
|
||||
int m_PendingQueries{0};
|
||||
boost::asio::deadline_timer m_LogStatsTimer;
|
||||
Ptr m_Parent;
|
||||
|
Loading…
x
Reference in New Issue
Block a user