mirror of https://github.com/Icinga/icinga2.git
parent
06211c3ac7
commit
f631bf8cb5
|
@ -32,7 +32,9 @@ REGISTER_TYPE(RedisWriter);
|
|||
|
||||
RedisWriter::RedisWriter(void)
|
||||
: m_Context(NULL)
|
||||
{ }
|
||||
{
|
||||
m_WorkQueue.SetName("RedisWriter");
|
||||
}
|
||||
|
||||
/**
|
||||
* Starts the component.
|
||||
|
@ -134,6 +136,8 @@ void RedisWriter::TryToReconnect(void)
|
|||
if (dbIndex != 0)
|
||||
ExecuteQuery({ "SELECT", Convert::ToString(dbIndex) });
|
||||
|
||||
UpdateSubscriptions();
|
||||
|
||||
/* Config dump */
|
||||
m_ConfigDumpInProgress = true;
|
||||
|
||||
|
@ -151,16 +155,19 @@ void RedisWriter::UpdateSubscriptions(void)
|
|||
{
|
||||
AssertOnWorkQueue();
|
||||
|
||||
Log(LogInformation, "RedisWriter", "Updating Redis subscriptions");
|
||||
|
||||
m_Subscriptions.clear();
|
||||
|
||||
if (!m_Context)
|
||||
return;
|
||||
|
||||
Log(LogInformation, "RedisWriter", "Updating Redis subscriptions");
|
||||
|
||||
Array::Ptr keys = new Array();
|
||||
long long cursor = 0;
|
||||
|
||||
String keyPrefix = "icinga:subscription:";
|
||||
|
||||
do {
|
||||
boost::shared_ptr<redisReply> reply = ExecuteQuery({ "SCAN", Convert::ToString(cursor), "MATCH", "icinga:subscription:*", "COUNT", "1000" });
|
||||
boost::shared_ptr<redisReply> reply = ExecuteQuery({ "SCAN", Convert::ToString(cursor), "MATCH", keyPrefix + "*", "COUNT", "1000" });
|
||||
|
||||
VERIFY(reply->type == REDIS_REPLY_ARRAY);
|
||||
VERIFY(reply->elements % 2 == 0);
|
||||
|
@ -173,36 +180,30 @@ void RedisWriter::UpdateSubscriptions(void)
|
|||
for (size_t i = 0; i < keysReply->elements; i++) {
|
||||
redisReply *keyReply = keysReply->element[i];
|
||||
VERIFY(keyReply->type == REDIS_REPLY_STRING);
|
||||
keys->Add(keyReply->str);
|
||||
|
||||
String key = keyReply->str;
|
||||
|
||||
try {
|
||||
boost::shared_ptr<redisReply> redisReply = ExecuteQuery({ "SMEMBERS", key });
|
||||
VERIFY(redisReply->type == REDIS_REPLY_ARRAY);
|
||||
|
||||
RedisSubscriptionInfo rsi;
|
||||
|
||||
for (size_t j = 0; j < redisReply->elements; j++) {
|
||||
rsi.EventTypes.insert(redisReply->element[j]->str);
|
||||
}
|
||||
|
||||
Log(LogInformation, "RedisWriter")
|
||||
<< "Subscriber Info - Key: " << key << " Value: " << Value(Array::FromSet(rsi.EventTypes));
|
||||
|
||||
m_Subscriptions[key.SubStr(keyPrefix.GetLength())] = rsi;
|
||||
} catch (const std::exception& ex) {
|
||||
Log(LogWarning, "RedisWriter")
|
||||
<< "Invalid Redis subscriber info for subscriber '" << key << "': " << DiagnosticInformation(ex);
|
||||
}
|
||||
}
|
||||
} while (cursor != 0);
|
||||
|
||||
m_Subscriptions.clear();
|
||||
|
||||
ObjectLock olock(keys);
|
||||
for (String key : keys) {
|
||||
try {
|
||||
boost::shared_ptr<redisReply> redisReply = ExecuteQuery({ "LRANGE", key, "0", "-1" });
|
||||
VERIFY(redisReply->type == REDIS_REPLY_ARRAY);
|
||||
|
||||
RedisSubscriptionInfo rsi;
|
||||
Array::Ptr printer = new Array();
|
||||
for (size_t j = 0; j < redisReply->elements; j++) {
|
||||
rsi.EventTypes.insert(redisReply->element[j]->str);
|
||||
printer->Add(redisReply->element[j]->str);
|
||||
}
|
||||
Log(LogInformation, "RedisWriter")
|
||||
<< "Subscriber Info - Key: " << key << " Value: " << printer->ToString();
|
||||
|
||||
m_Subscriptions[key.SubStr(20)] = rsi;
|
||||
|
||||
} catch (const std::exception& ex) {
|
||||
Log(LogWarning, "RedisWriter")
|
||||
<< "Invalid Redis subscriber info for subscriber '" << key << "': " << DiagnosticInformation(ex);
|
||||
|
||||
continue;
|
||||
}
|
||||
}
|
||||
Log(LogInformation, "RedisWriter")
|
||||
<< "Current Redis event subscriptions: " << m_Subscriptions.size();
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue