diff --git a/lib/redis/rediswriter.cpp b/lib/redis/rediswriter.cpp index b89e59bf1..4f7fb8414 100644 --- a/lib/redis/rediswriter.cpp +++ b/lib/redis/rediswriter.cpp @@ -32,7 +32,9 @@ REGISTER_TYPE(RedisWriter); RedisWriter::RedisWriter(void) : m_Context(NULL) -{ } +{ + m_WorkQueue.SetName("RedisWriter"); +} /** * Starts the component. @@ -134,6 +136,8 @@ void RedisWriter::TryToReconnect(void) if (dbIndex != 0) ExecuteQuery({ "SELECT", Convert::ToString(dbIndex) }); + UpdateSubscriptions(); + /* Config dump */ m_ConfigDumpInProgress = true; @@ -151,16 +155,19 @@ void RedisWriter::UpdateSubscriptions(void) { AssertOnWorkQueue(); + Log(LogInformation, "RedisWriter", "Updating Redis subscriptions"); + + m_Subscriptions.clear(); + if (!m_Context) return; - Log(LogInformation, "RedisWriter", "Updating Redis subscriptions"); - - Array::Ptr keys = new Array(); long long cursor = 0; + String keyPrefix = "icinga:subscription:"; + do { - boost::shared_ptr reply = ExecuteQuery({ "SCAN", Convert::ToString(cursor), "MATCH", "icinga:subscription:*", "COUNT", "1000" }); + boost::shared_ptr reply = ExecuteQuery({ "SCAN", Convert::ToString(cursor), "MATCH", keyPrefix + "*", "COUNT", "1000" }); VERIFY(reply->type == REDIS_REPLY_ARRAY); VERIFY(reply->elements % 2 == 0); @@ -173,36 +180,30 @@ void RedisWriter::UpdateSubscriptions(void) for (size_t i = 0; i < keysReply->elements; i++) { redisReply *keyReply = keysReply->element[i]; VERIFY(keyReply->type == REDIS_REPLY_STRING); - keys->Add(keyReply->str); + + String key = keyReply->str; + + try { + boost::shared_ptr redisReply = ExecuteQuery({ "SMEMBERS", key }); + VERIFY(redisReply->type == REDIS_REPLY_ARRAY); + + RedisSubscriptionInfo rsi; + + for (size_t j = 0; j < redisReply->elements; j++) { + rsi.EventTypes.insert(redisReply->element[j]->str); + } + + Log(LogInformation, "RedisWriter") + << "Subscriber Info - Key: " << key << " Value: " << Value(Array::FromSet(rsi.EventTypes)); + + m_Subscriptions[key.SubStr(keyPrefix.GetLength())] = rsi; + } catch (const std::exception& ex) { + Log(LogWarning, "RedisWriter") + << "Invalid Redis subscriber info for subscriber '" << key << "': " << DiagnosticInformation(ex); + } } } while (cursor != 0); - m_Subscriptions.clear(); - - ObjectLock olock(keys); - for (String key : keys) { - try { - boost::shared_ptr redisReply = ExecuteQuery({ "LRANGE", key, "0", "-1" }); - VERIFY(redisReply->type == REDIS_REPLY_ARRAY); - - RedisSubscriptionInfo rsi; - Array::Ptr printer = new Array(); - for (size_t j = 0; j < redisReply->elements; j++) { - rsi.EventTypes.insert(redisReply->element[j]->str); - printer->Add(redisReply->element[j]->str); - } - Log(LogInformation, "RedisWriter") - << "Subscriber Info - Key: " << key << " Value: " << printer->ToString(); - - m_Subscriptions[key.SubStr(20)] = rsi; - - } catch (const std::exception& ex) { - Log(LogWarning, "RedisWriter") - << "Invalid Redis subscriber info for subscriber '" << key << "': " << DiagnosticInformation(ex); - - continue; - } - } Log(LogInformation, "RedisWriter") << "Current Redis event subscriptions: " << m_Subscriptions.size(); }