mirror of
https://github.com/Icinga/icinga2.git
synced 2025-07-27 07:34:15 +02:00
Add limit for subscriptions
This commit is contained in:
parent
f631bf8cb5
commit
c8561676ec
@ -22,11 +22,12 @@
|
|||||||
#include "remote/eventqueue.hpp"
|
#include "remote/eventqueue.hpp"
|
||||||
#include "base/json.hpp"
|
#include "base/json.hpp"
|
||||||
#include "base/statsfunction.hpp"
|
#include "base/statsfunction.hpp"
|
||||||
|
#include <boost/algorithm/string.hpp>
|
||||||
|
|
||||||
using namespace icinga;
|
using namespace icinga;
|
||||||
|
|
||||||
//TODO Make configurable and figure out a sane default
|
//TODO Make configurable and figure out a sane default
|
||||||
#define MAX_EVENTS 5000
|
#define MAX_EVENTS_DEFAULT 5000
|
||||||
|
|
||||||
REGISTER_TYPE(RedisWriter);
|
REGISTER_TYPE(RedisWriter);
|
||||||
|
|
||||||
@ -178,29 +179,20 @@ void RedisWriter::UpdateSubscriptions(void)
|
|||||||
redisReply *keysReply = reply->element[1];
|
redisReply *keysReply = reply->element[1];
|
||||||
|
|
||||||
for (size_t i = 0; i < keysReply->elements; i++) {
|
for (size_t i = 0; i < keysReply->elements; i++) {
|
||||||
|
if (boost::algorithm::ends_with(keysReply->element[i]->str, ":limit"))
|
||||||
|
continue;
|
||||||
redisReply *keyReply = keysReply->element[i];
|
redisReply *keyReply = keysReply->element[i];
|
||||||
VERIFY(keyReply->type == REDIS_REPLY_STRING);
|
VERIFY(keyReply->type == REDIS_REPLY_STRING);
|
||||||
|
|
||||||
String key = keyReply->str;
|
RedisSubscriptionInfo rsi;
|
||||||
|
String key = keysReply->element[i]->str;
|
||||||
try {
|
|
||||||
boost::shared_ptr<redisReply> redisReply = ExecuteQuery({ "SMEMBERS", key });
|
|
||||||
VERIFY(redisReply->type == REDIS_REPLY_ARRAY);
|
|
||||||
|
|
||||||
RedisSubscriptionInfo rsi;
|
|
||||||
|
|
||||||
for (size_t j = 0; j < redisReply->elements; j++) {
|
|
||||||
rsi.EventTypes.insert(redisReply->element[j]->str);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
if (!RedisWriter::GetSubscriptionTypes(key, rsi))
|
||||||
Log(LogInformation, "RedisWriter")
|
Log(LogInformation, "RedisWriter")
|
||||||
<< "Subscriber Info - Key: " << key << " Value: " << Value(Array::FromSet(rsi.EventTypes));
|
<< "Subscription \"" << key<< "\" has no types listed.";
|
||||||
|
else
|
||||||
m_Subscriptions[key.SubStr(keyPrefix.GetLength())] = rsi;
|
m_Subscriptions[key.SubStr(keyPrefix.GetLength())] = rsi;
|
||||||
} catch (const std::exception& ex) {
|
|
||||||
Log(LogWarning, "RedisWriter")
|
|
||||||
<< "Invalid Redis subscriber info for subscriber '" << key << "': " << DiagnosticInformation(ex);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
} while (cursor != 0);
|
} while (cursor != 0);
|
||||||
|
|
||||||
@ -208,6 +200,25 @@ void RedisWriter::UpdateSubscriptions(void)
|
|||||||
<< "Current Redis event subscriptions: " << m_Subscriptions.size();
|
<< "Current Redis event subscriptions: " << m_Subscriptions.size();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int RedisWriter::GetSubscriptionTypes(String key, RedisSubscriptionInfo& rsi)
|
||||||
|
{
|
||||||
|
try {
|
||||||
|
boost::shared_ptr<redisReply> redisReply = ExecuteQuery({ "SMEMBERS", key });
|
||||||
|
VERIFY(redisReply->type == REDIS_REPLY_ARRAY);
|
||||||
|
|
||||||
|
for (size_t j = 0; j < redisReply->elements; j++) {
|
||||||
|
rsi.EventTypes.insert(redisReply->element[j]->str);
|
||||||
|
}
|
||||||
|
|
||||||
|
Log(LogInformation, "RedisWriter")
|
||||||
|
<< "Subscriber Info - Key: " << key << " Value: " << Value(Array::FromSet(rsi.EventTypes));
|
||||||
|
|
||||||
|
} catch (const std::exception& ex) {
|
||||||
|
Log(LogWarning, "RedisWriter")
|
||||||
|
<< "Invalid Redis subscriber info for subscriber '" << key << "': " << DiagnosticInformation(ex);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
void RedisWriter::PublishStatsTimerHandler(void)
|
void RedisWriter::PublishStatsTimerHandler(void)
|
||||||
{
|
{
|
||||||
m_WorkQueue.Enqueue(boost::bind(&RedisWriter::PublishStats, this));
|
m_WorkQueue.Enqueue(boost::bind(&RedisWriter::PublishStats, this));
|
||||||
@ -282,9 +293,19 @@ void RedisWriter::HandleEvent(const Dictionary::Ptr& event)
|
|||||||
|
|
||||||
String body = JsonEncode(event);
|
String body = JsonEncode(event);
|
||||||
|
|
||||||
|
boost::shared_ptr<redisReply> maxExists = ExecuteQuery({ "EXISTS", "icinga:subscription:" + name + ":limit" });
|
||||||
|
long maxEvents = MAX_EVENTS_DEFAULT;
|
||||||
|
if (maxExists->integer) {
|
||||||
|
boost::shared_ptr<redisReply> redisReply = ExecuteQuery({ "GET", "icinga:subscription:" + name + ":limit"});
|
||||||
|
VERIFY(redisReply->type == REDIS_REPLY_STRING);
|
||||||
|
Log(LogInformation, "RedisWriter")
|
||||||
|
<< "Got limit " << redisReply->str << " for " << name;
|
||||||
|
maxEvents = Convert::ToLong(redisReply->str);
|
||||||
|
}
|
||||||
|
|
||||||
ExecuteQuery({ "MULTI" });
|
ExecuteQuery({ "MULTI" });
|
||||||
ExecuteQuery({ "LPUSH", "icinga:event:" + name, body });
|
ExecuteQuery({ "LPUSH", "icinga:event:" + name, body });
|
||||||
ExecuteQuery({ "LTRIM", "icinga:event:" + name, "0", String(MAX_EVENTS - 1)});
|
ExecuteQuery({ "LTRIM", "icinga:event:" + name, "0", String(maxEvents - 1)});
|
||||||
ExecuteQuery({ "EXEC" });
|
ExecuteQuery({ "EXEC" });
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -59,6 +59,7 @@ private:
|
|||||||
|
|
||||||
void UpdateSubscriptionsTimerHandler(void);
|
void UpdateSubscriptionsTimerHandler(void);
|
||||||
void UpdateSubscriptions(void);
|
void UpdateSubscriptions(void);
|
||||||
|
int GetSubscriptionTypes(String key, RedisSubscriptionInfo& rsi);
|
||||||
void PublishStatsTimerHandler(void);
|
void PublishStatsTimerHandler(void);
|
||||||
void PublishStats(void);
|
void PublishStats(void);
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user