mirror of https://github.com/Icinga/icinga2.git
Merge pull request #7660 from Icinga/bugfix/icingadb-subscription
IcingaDB: get rid of icinga:{subscription,event}*
This commit is contained in:
commit
8b6c3bdc92
|
@ -53,11 +53,6 @@ void IcingaDB::Start(bool runtimeCreated)
|
|||
m_ReconnectTimer->Start();
|
||||
m_ReconnectTimer->Reschedule(0);
|
||||
|
||||
m_SubscriptionTimer = new Timer();
|
||||
m_SubscriptionTimer->SetInterval(15);
|
||||
m_SubscriptionTimer->OnTimerExpired.connect([this](const Timer * const&) { UpdateSubscriptionsTimerHandler(); });
|
||||
m_SubscriptionTimer->Start();
|
||||
|
||||
m_StatsTimer = new Timer();
|
||||
m_StatsTimer->SetInterval(1);
|
||||
m_StatsTimer->OnTimerExpired.connect([this](const Timer * const&) { PublishStatsTimerHandler(); });
|
||||
|
@ -95,8 +90,6 @@ void IcingaDB::TryToReconnect()
|
|||
if (!m_Rcon || !m_Rcon->IsConnected())
|
||||
return;
|
||||
|
||||
UpdateSubscriptions();
|
||||
|
||||
if (m_ConfigDumpInProgress || m_ConfigDumpDone)
|
||||
return;
|
||||
|
||||
|
@ -111,85 +104,6 @@ void IcingaDB::TryToReconnect()
|
|||
m_ConfigDumpInProgress = false;
|
||||
}
|
||||
|
||||
void IcingaDB::UpdateSubscriptionsTimerHandler()
|
||||
{
|
||||
m_WorkQueue.Enqueue([this]() { UpdateSubscriptions(); });
|
||||
}
|
||||
|
||||
void IcingaDB::UpdateSubscriptions()
|
||||
{
|
||||
AssertOnWorkQueue();
|
||||
|
||||
Log(LogNotice, "IcingaDB", "Updating Redis subscriptions");
|
||||
|
||||
/* TODO:
|
||||
* Silently return in this case. Usually the RedisConnection checks for connectivity and logs in failure case.
|
||||
* But since we expect and answer here and break Icinga in case of receiving no answer/an unexpected one we opt for
|
||||
* better safe than sorry here. Future implementation needs to include an improved error handling and answer verification.
|
||||
*/
|
||||
if (!m_Rcon || !m_Rcon->IsConnected())
|
||||
return;
|
||||
|
||||
String cursor = "0";
|
||||
String keyPrefix = "icinga:subscription:";
|
||||
|
||||
do {
|
||||
Array::Ptr reply = m_Rcon->GetResultOfQuery({ "SCAN", cursor, "MATCH", keyPrefix + "*", "COUNT", "1000" });
|
||||
VERIFY(reply->GetLength() % 2u == 0u);
|
||||
|
||||
cursor = reply->Get(0);
|
||||
|
||||
Array::Ptr keys = reply->Get(1);
|
||||
ObjectLock oLock (keys);
|
||||
|
||||
for (String key : keys) {
|
||||
if (boost::algorithm::ends_with(key, ":limit"))
|
||||
continue;
|
||||
|
||||
RedisSubscriptionInfo rsi;
|
||||
|
||||
if (!IcingaDB::GetSubscriptionTypes(key, rsi)) {
|
||||
Log(LogInformation, "IcingaDB")
|
||||
<< "Subscription \"" << key << "\" has no types listed.";
|
||||
} else {
|
||||
m_Subscriptions[key.SubStr(keyPrefix.GetLength())] = rsi;
|
||||
}
|
||||
}
|
||||
} while (cursor != "0");
|
||||
|
||||
Log(LogNotice, "IcingaDB")
|
||||
<< "Current Redis event subscriptions: " << m_Subscriptions.size();
|
||||
}
|
||||
|
||||
bool IcingaDB::GetSubscriptionTypes(String key, RedisSubscriptionInfo& rsi)
|
||||
{
|
||||
try {
|
||||
Array::Ptr redisReply = m_Rcon->GetResultOfQuery({ "SMEMBERS", key });
|
||||
|
||||
if (redisReply->GetLength() == 0)
|
||||
return false;
|
||||
|
||||
{
|
||||
ObjectLock oLock (redisReply);
|
||||
|
||||
for (String member : redisReply) {
|
||||
rsi.EventTypes.insert(member);
|
||||
}
|
||||
}
|
||||
|
||||
Log(LogInformation, "IcingaDB")
|
||||
<< "Subscriber Info - Key: " << key << " Value: " << Value(Array::FromSet(rsi.EventTypes));
|
||||
|
||||
} catch (const std::exception& ex) {
|
||||
Log(LogWarning, "IcingaDB")
|
||||
<< "Invalid Redis subscriber info for subscriber '" << key << "': " << DiagnosticInformation(ex);
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
void IcingaDB::PublishStatsTimerHandler(void)
|
||||
{
|
||||
m_WorkQueue.Enqueue([this]() { PublishStats(); });
|
||||
|
@ -217,16 +131,8 @@ void IcingaDB::HandleEvents()
|
|||
|
||||
std::set<String> types;
|
||||
types.insert("CheckResult");
|
||||
types.insert("StateChange");
|
||||
types.insert("Notification");
|
||||
types.insert("AcknowledgementSet");
|
||||
types.insert("AcknowledgementCleared");
|
||||
types.insert("CommentAdded");
|
||||
types.insert("CommentRemoved");
|
||||
types.insert("DowntimeAdded");
|
||||
types.insert("DowntimeRemoved");
|
||||
types.insert("DowntimeStarted");
|
||||
types.insert("DowntimeTriggered");
|
||||
|
||||
queue->SetTypes(types);
|
||||
|
||||
|
@ -245,39 +151,6 @@ void IcingaDB::HandleEvents()
|
|||
EventQueue::UnregisterIfUnused(queueName, queue);
|
||||
}
|
||||
|
||||
void IcingaDB::HandleEvent(const Dictionary::Ptr& event)
|
||||
{
|
||||
AssertOnWorkQueue();
|
||||
|
||||
for (const std::pair<String, RedisSubscriptionInfo>& kv : m_Subscriptions) {
|
||||
const auto& name = kv.first;
|
||||
const auto& rsi = kv.second;
|
||||
|
||||
if (rsi.EventTypes.find(event->Get("type")) == rsi.EventTypes.end())
|
||||
continue;
|
||||
|
||||
String body = JsonEncode(event);
|
||||
|
||||
double maxExists = m_Rcon->GetResultOfQuery({ "EXISTS", "icinga:subscription:" + name + ":limit" });
|
||||
|
||||
long maxEvents = MAX_EVENTS_DEFAULT;
|
||||
if (maxExists != 0) {
|
||||
String redisReply = m_Rcon->GetResultOfQuery({ "GET", "icinga:subscription:" + name + ":limit"});
|
||||
|
||||
Log(LogInformation, "IcingaDB")
|
||||
<< "Got limit " << redisReply << " for " << name;
|
||||
|
||||
maxEvents = Convert::ToLong(redisReply);
|
||||
}
|
||||
|
||||
m_Rcon->FireAndForgetQueries({
|
||||
{ "MULTI" },
|
||||
{ "LPUSH", "icinga:event:" + name, body },
|
||||
{ "LTRIM", "icinga:event:" + name, "0", String(maxEvents - 1)},
|
||||
{ "EXEC" }});
|
||||
}
|
||||
}
|
||||
|
||||
void IcingaDB::SendEvent(const Dictionary::Ptr& event)
|
||||
{
|
||||
AssertOnWorkQueue();
|
||||
|
@ -328,12 +201,6 @@ void IcingaDB::SendEvent(const Dictionary::Ptr& event)
|
|||
event->Set("comment_id", GetObjectIdentifier(AckComment));
|
||||
}
|
||||
}
|
||||
|
||||
String body = JsonEncode(event);
|
||||
|
||||
m_Rcon->FireAndForgetQueries({
|
||||
{ "PUBLISH", "icinga:event:all", body },
|
||||
{ "PUBLISH", "icinga:event:" + event->Get("type"), body }});
|
||||
}
|
||||
|
||||
void IcingaDB::Stop(bool runtimeRemoved)
|
||||
|
|
|
@ -17,11 +17,6 @@
|
|||
namespace icinga
|
||||
{
|
||||
|
||||
struct RedisSubscriptionInfo
|
||||
{
|
||||
std::set<String> EventTypes;
|
||||
};
|
||||
|
||||
/**
|
||||
* @ingroup icingadb
|
||||
*/
|
||||
|
@ -42,12 +37,8 @@ private:
|
|||
void ReconnectTimerHandler();
|
||||
void TryToReconnect();
|
||||
void HandleEvents();
|
||||
void HandleEvent(const Dictionary::Ptr& event);
|
||||
void SendEvent(const Dictionary::Ptr& event);
|
||||
|
||||
void UpdateSubscriptionsTimerHandler();
|
||||
void UpdateSubscriptions();
|
||||
bool GetSubscriptionTypes(String key, RedisSubscriptionInfo& rsi);
|
||||
void PublishStatsTimerHandler();
|
||||
void PublishStats();
|
||||
|
||||
|
@ -123,9 +114,7 @@ private:
|
|||
|
||||
Timer::Ptr m_StatsTimer;
|
||||
Timer::Ptr m_ReconnectTimer;
|
||||
Timer::Ptr m_SubscriptionTimer;
|
||||
WorkQueue m_WorkQueue;
|
||||
std::map<String, RedisSubscriptionInfo> m_Subscriptions;
|
||||
|
||||
String m_PrefixConfigObject;
|
||||
String m_PrefixConfigCheckSum;
|
||||
|
|
Loading…
Reference in New Issue