mirror of
https://github.com/Icinga/icinga2.git
synced 2025-07-29 16:44:29 +02:00
parent
1cab2d7b16
commit
96b3c7d90b
@ -248,121 +248,34 @@ void RedisWriter::HandleEvent(const Dictionary::Ptr& event)
|
|||||||
if (!m_Context)
|
if (!m_Context)
|
||||||
return;
|
return;
|
||||||
|
|
||||||
String type = event->Get("type");
|
|
||||||
bool atLeastOneSubscriber = false;
|
|
||||||
|
|
||||||
for (const std::pair<String, RedisSubscriptionInfo>& kv : m_Subscriptions) {
|
|
||||||
const auto& rsi = kv.second;
|
|
||||||
|
|
||||||
if (rsi.EventTypes.find(type) == rsi.EventTypes.end())
|
|
||||||
continue;
|
|
||||||
|
|
||||||
atLeastOneSubscriber = true;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!atLeastOneSubscriber)
|
|
||||||
return;
|
|
||||||
|
|
||||||
Log(LogInformation, "RedisWriter")
|
|
||||||
<< "Pushing event to Redis: '" << Value(event) << "'.";
|
|
||||||
|
|
||||||
redisReply *reply1 = reinterpret_cast<redisReply *>(redisCommand(m_Context, "INCR icinga:event.idx"));
|
|
||||||
|
|
||||||
if (!reply1) {
|
|
||||||
redisFree(m_Context);
|
|
||||||
m_Context = NULL;
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
Log(LogInformation, "RedisWriter")
|
|
||||||
<< "Called INCR in HandleEvent";
|
|
||||||
|
|
||||||
if (reply1->type == REDIS_REPLY_STATUS || reply1->type == REDIS_REPLY_ERROR) {
|
|
||||||
Log(LogInformation, "RedisWriter")
|
|
||||||
<< "INCR icinga:event.idx: " << reply1->str;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (reply1->type == REDIS_REPLY_ERROR) {
|
|
||||||
freeReplyObject(reply1);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
//TODO
|
|
||||||
VERIFY(reply1->type == REDIS_REPLY_INTEGER);
|
|
||||||
|
|
||||||
long long index = reply1->integer;
|
|
||||||
|
|
||||||
freeReplyObject(reply1);
|
|
||||||
|
|
||||||
String body = JsonEncode(event);
|
|
||||||
|
|
||||||
//TODO: Verify that %lld is supported
|
|
||||||
redisReply *reply2 = reinterpret_cast<redisReply *>(redisCommand(m_Context, "SET icinga:event.%d %s", (int)index, body.CStr()));
|
|
||||||
|
|
||||||
if (!reply2) {
|
|
||||||
redisFree(m_Context);
|
|
||||||
m_Context = NULL;
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (reply2->type == REDIS_REPLY_STATUS || reply2->type == REDIS_REPLY_ERROR) {
|
|
||||||
Log(LogInformation, "RedisWriter")
|
|
||||||
<< "SET icinga:event." << index << ": " << reply2->str;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (reply2->type == REDIS_REPLY_ERROR) {
|
|
||||||
freeReplyObject(reply2);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
freeReplyObject(reply2);
|
|
||||||
|
|
||||||
redisReply *reply3 = reinterpret_cast<redisReply *>(redisCommand(m_Context, "EXPIRE icinga:event.%d 3600", (int)index, body.CStr()));
|
|
||||||
|
|
||||||
if (!reply3) {
|
|
||||||
redisFree(m_Context);
|
|
||||||
m_Context = NULL;
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (reply3->type == REDIS_REPLY_STATUS || reply3->type == REDIS_REPLY_ERROR) {
|
|
||||||
Log(LogInformation, "RedisWriter")
|
|
||||||
<< "EXPIRE icinga:event." << index << ": " << reply3->str;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (reply3->type == REDIS_REPLY_ERROR) {
|
|
||||||
freeReplyObject(reply3);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
freeReplyObject(reply3);
|
|
||||||
|
|
||||||
for (const std::pair<String, RedisSubscriptionInfo>& kv : m_Subscriptions) {
|
for (const std::pair<String, RedisSubscriptionInfo>& kv : m_Subscriptions) {
|
||||||
const auto& name = kv.first;
|
const auto& name = kv.first;
|
||||||
const auto& rsi = kv.second;
|
const auto& rsi = kv.second;
|
||||||
|
|
||||||
if (rsi.EventTypes.find(type) == rsi.EventTypes.end())
|
if (rsi.EventTypes.find(event->Get("type")) == rsi.EventTypes.end())
|
||||||
continue;
|
continue;
|
||||||
|
|
||||||
redisReply *reply4 = reinterpret_cast<redisReply *>(redisCommand(m_Context, "LPUSH icinga:event:%s %d", name.CStr(), (int)index));
|
String body = JsonEncode(event);
|
||||||
|
|
||||||
if (!reply4) {
|
redisReply *reply = reinterpret_cast<redisReply *>(redisCommand(m_Context, "LPUSH icinga:event:%s %s", name.CStr(), body.CStr()));
|
||||||
|
|
||||||
|
if (!reply) {
|
||||||
redisFree(m_Context);
|
redisFree(m_Context);
|
||||||
m_Context = NULL;
|
m_Context = NULL;
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (reply4->type == REDIS_REPLY_STATUS || reply4->type == REDIS_REPLY_ERROR) {
|
if (reply->type == REDIS_REPLY_STATUS || reply->type == REDIS_REPLY_ERROR) {
|
||||||
Log(LogInformation, "RedisWriter")
|
Log(LogInformation, "RedisWriter")
|
||||||
<< "LPUSH icinga:event:" << kv.first << " " << index << ": " << reply4->str;
|
<< "LPUSH icinga:event:" << kv.first << " " << body << ": " << reply->str;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (reply4->type == REDIS_REPLY_ERROR) {
|
if (reply->type == REDIS_REPLY_ERROR) {
|
||||||
freeReplyObject(reply4);
|
freeReplyObject(reply);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
freeReplyObject(reply4);
|
freeReplyObject(reply);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user