Subscriptions

fixes #5656
This commit is contained in:
Jean Flach 2017-10-11 16:03:43 +02:00 committed by Michael Friedrich
parent 1ada53dd57
commit 7c8412928e
1 changed files with 16 additions and 22 deletions

View File

@ -156,7 +156,7 @@ void RedisWriter::UpdateSubscriptions(void)
Log(LogInformation, "RedisWriter", "Updating Redis subscriptions"); Log(LogInformation, "RedisWriter", "Updating Redis subscriptions");
std::map<String, String> subscriptions; Array::Ptr keys = new Array();
long long cursor = 0; long long cursor = 0;
do { do {
@ -173,42 +173,36 @@ void RedisWriter::UpdateSubscriptions(void)
for (size_t i = 0; i < keysReply->elements; i++) { for (size_t i = 0; i < keysReply->elements; i++) {
redisReply *keyReply = keysReply->element[i]; redisReply *keyReply = keysReply->element[i];
VERIFY(keyReply->type == REDIS_REPLY_STRING); VERIFY(keyReply->type == REDIS_REPLY_STRING);
keys->Add(keyReply->str);
boost::shared_ptr<redisReply> vreply = ExecuteQuery({ "GET", keyReply->str });
subscriptions[keyReply->str] = vreply->str;
} }
} while (cursor != 0); } while (cursor != 0);
m_Subscriptions.clear(); m_Subscriptions.clear();
for (const std::pair<String, String>& kv : subscriptions) { ObjectLock olock(keys);
const String& key = kv.first.SubStr(20); /* removes the "icinga:subscription: prefix */ for (String key : keys) {
const String& value = kv.second;
try { try {
Dictionary::Ptr subscriptionInfo = JsonDecode(value); boost::shared_ptr<redisReply> redisReply = ExecuteQuery({ "LRANGE", key, "0", "-1" });
VERIFY(redisReply->type == REDIS_REPLY_ARRAY);
Log(LogInformation, "RedisWriter")
<< "Subscriber Info - Key: " << key << " Value: " << Value(subscriptionInfo);
RedisSubscriptionInfo rsi; RedisSubscriptionInfo rsi;
Array::Ptr printer = new Array();
for (size_t j = 0; j < redisReply->elements; j++) {
rsi.EventTypes.insert(redisReply->element[j]->str);
printer->Add(redisReply->element[j]->str);
}
Log(LogInformation, "RedisWriter")
<< "Subscriber Info - Key: " << key << " Value: " << printer->ToString();
Array::Ptr types = subscriptionInfo->Get("types"); m_Subscriptions[key.SubStr(20)] = rsi;
if (types)
rsi.EventTypes = types->ToSet<String>();
m_Subscriptions[key] = rsi;
} catch (const std::exception& ex) { } catch (const std::exception& ex) {
Log(LogWarning, "RedisWriter") Log(LogWarning, "RedisWriter")
<< "Invalid Redis subscriber info for subscriber '" << key << "': " << DiagnosticInformation(ex); << "Invalid Redis subscriber info for subscriber '" << key << "': " << DiagnosticInformation(ex);
continue; continue;
} }
//TODO
} }
Log(LogInformation, "RedisWriter") Log(LogInformation, "RedisWriter")
<< "Current Redis event subscriptions: " << m_Subscriptions.size(); << "Current Redis event subscriptions: " << m_Subscriptions.size();
} }
@ -289,7 +283,7 @@ void RedisWriter::HandleEvent(const Dictionary::Ptr& event)
ExecuteQuery({ "MULTI" }); ExecuteQuery({ "MULTI" });
ExecuteQuery({ "LPUSH", "icinga:event:" + name, body }); ExecuteQuery({ "LPUSH", "icinga:event:" + name, body });
ExecuteQuery({ "LTRIM", "icinga:event:" + name, 0, MAX_EVENTS - 1 }); ExecuteQuery({ "LTRIM", "icinga:event:" + name, "0", String(MAX_EVENTS - 1)});
ExecuteQuery({ "EXEC" }); ExecuteQuery({ "EXEC" });
} }
} }