Implement support for expiring subscriptions

refs #4991
This commit is contained in:
Gunnar Beutner 2017-03-13 10:37:51 +01:00 committed by Michael Friedrich
parent 3e98e3fb2c
commit 10ddcbe4d3

View File

@ -123,37 +123,67 @@ void RedisWriter::UpdateSubscriptions(void)
Log(LogInformation, "RedisWriter", "Updating Redis subscriptions"); Log(LogInformation, "RedisWriter", "Updating Redis subscriptions");
redisReply *reply = reinterpret_cast<redisReply *>(redisCommand(m_Context, "HGETALL icinga:subscription")); std::map<String, String> subscriptions;
long long cursor = 0;
if (!reply) { do {
redisFree(m_Context); redisReply *reply = reinterpret_cast<redisReply *>(redisCommand(m_Context, "SCAN %lld MATCH icinga:subscription:* COUNT 1000", cursor));
m_Context = NULL;
return;
}
if (reply->type == REDIS_REPLY_STATUS || reply->type == REDIS_REPLY_ERROR) { if (!reply) {
Log(LogInformation, "RedisWriter") redisFree(m_Context);
<< "HGETALL icinga:subscription: " << reply->str; m_Context = NULL;
} return;
}
if (reply->type == REDIS_REPLY_STATUS || reply->type == REDIS_REPLY_ERROR) {
Log(LogInformation, "RedisWriter")
<< "SCAN " << cursor << " MATCH icinga:subscription:* COUNT 1000: " << reply->str;
}
VERIFY(reply->type == REDIS_REPLY_ARRAY);
VERIFY(reply->elements % 2 == 0);
redisReply *cursorReply = reply->element[0];
cursor = cursorReply->integer;
redisReply *keysReply = reply->element[1];
for (size_t i = 0; i < keysReply->elements; i++) {
redisReply *keyReply = keysReply->element[i];
VERIFY(keyReply->type == REDIS_REPLY_STRING);
redisReply *vreply = reinterpret_cast<redisReply *>(redisCommand(m_Context, "GET %s", keyReply->str));
if (!vreply) {
redisFree(m_Context);
m_Context = NULL;
return;
}
if (vreply->type == REDIS_REPLY_STATUS || vreply->type == REDIS_REPLY_ERROR) {
Log(LogInformation, "RedisWriter")
<< "GET " << keyReply->str << ": " << vreply->str;
}
subscriptions[keyReply->str] = vreply->str;
freeReplyObject(vreply);
}
freeReplyObject(reply);
} while (cursor != 0);
m_Subscriptions.clear(); m_Subscriptions.clear();
//TODO for (const std::pair<String, String>& kv : subscriptions) {
VERIFY(reply->type == REDIS_REPLY_ARRAY); const String& key = kv.first.SubStr(20); /* removes the "icinga:subscription: prefix */
VERIFY(reply->elements % 2 == 0); const String& value = kv.second;
for (size_t i = 0; i < reply->elements; i += 2) {
redisReply *keyReply = reply->element[i];
VERIFY(keyReply->type == REDIS_REPLY_STRING);
redisReply *valueReply = reply->element[i + 1];
VERIFY(valueReply->type == REDIS_REPLY_STRING);
try { try {
Dictionary::Ptr subscriptionInfo = JsonDecode(valueReply->str); Dictionary::Ptr subscriptionInfo = JsonDecode(value);
Log(LogInformation, "RedisWriter") Log(LogInformation, "RedisWriter")
<< "Subscriber Info - Key: " << keyReply->str << " Value: " << Value(subscriptionInfo); << "Subscriber Info - Key: " << key << " Value: " << Value(subscriptionInfo);
RedisSubscriptionInfo rsi; RedisSubscriptionInfo rsi;
@ -162,17 +192,15 @@ void RedisWriter::UpdateSubscriptions(void)
if (types) if (types)
rsi.EventTypes = types->ToSet<String>(); rsi.EventTypes = types->ToSet<String>();
m_Subscriptions[keyReply->str] = rsi; m_Subscriptions[key] = rsi;
} catch (const std::exception& ex) { } catch (const std::exception& ex) {
Log(LogWarning, "RedisWriter") Log(LogWarning, "RedisWriter")
<< "Invalid Redis subscriber info for subscriber '" << keyReply->str << "': " << DiagnosticInformation(ex); << "Invalid Redis subscriber info for subscriber '" << key << "': " << DiagnosticInformation(ex);
continue; continue;
} }
//TODO //TODO
} }
freeReplyObject(reply);
} }
void RedisWriter::HandleEvents(void) void RedisWriter::HandleEvents(void)