From d3e3159e6bb83322563035967325b6a208418bc9 Mon Sep 17 00:00:00 2001 From: Michael Friedrich Date: Thu, 2 Mar 2017 15:42:25 +0100 Subject: [PATCH] Implement event subscriptions refs #4991 --- lib/redis/rediswriter.cpp | 255 ++++++++++++++++++++++++++++++-------- lib/redis/rediswriter.hpp | 19 ++- 2 files changed, 220 insertions(+), 54 deletions(-) diff --git a/lib/redis/rediswriter.cpp b/lib/redis/rediswriter.cpp index 722d2a323..c720aa65e 100644 --- a/lib/redis/rediswriter.cpp +++ b/lib/redis/rediswriter.cpp @@ -26,6 +26,10 @@ using namespace icinga; REGISTER_TYPE(RedisWriter); +RedisWriter::RedisWriter(void) + : m_Context(NULL) +{ } + /** * Starts the component. */ @@ -36,12 +40,31 @@ void RedisWriter::Start(bool runtimeCreated) Log(LogInformation, "RedisWriter") << "'" << GetName() << "' started."; - boost::thread thread(boost::bind(&RedisWriter::ConnectionThreadProc, this)); + m_ReconnectTimer = new Timer(); + m_ReconnectTimer->SetInterval(15); + m_ReconnectTimer->OnTimerExpired.connect(boost::bind(&RedisWriter::TryToReconnect, this)); + m_ReconnectTimer->Start(); + m_ReconnectTimer->Reschedule(0); + + m_SubscriptionTimer = new Timer(); + m_SubscriptionTimer->SetInterval(15); + m_SubscriptionTimer->OnTimerExpired.connect(boost::bind(&RedisWriter::UpdateSubscriptions, this)); + m_SubscriptionTimer->Start(); + + boost::thread thread(boost::bind(&RedisWriter::HandleEvents, this)); thread.detach(); } -void RedisWriter::ConnectionThreadProc(void) +void RedisWriter::ReconnectTimerHandler(void) { + m_WorkQueue.Enqueue(boost::bind(&RedisWriter::TryToReconnect, this)); +} + +void RedisWriter::TryToReconnect(void) +{ + if (m_Context) + return; + String path = GetPath(); String host = GetHost(); @@ -59,41 +82,95 @@ void RedisWriter::ConnectionThreadProc(void) Log(LogWarning, "RedisWriter", "Connection error: ") << m_Context->errstr; } - } - for (;;) { - String password = GetPassword(); - - if (!password.IsEmpty()) { - redisReply *reply = reinterpret_cast(redisCommand(m_Context, "AUTH %s", password.CStr())); - - //TODO: Verify if we can continue here. - if (!reply) - continue; - - if (reply->type == REDIS_REPLY_STATUS || reply->type == REDIS_REPLY_ERROR) { - Log(LogInformation, "RedisWriter") - << "AUTH: " << reply->str; - } - - freeReplyObject(reply); + if (m_Context) { + redisFree(m_Context); + m_Context = NULL; } - HandleEvents(); - - for (;;) { - Log(LogInformation, "RedisWriter", "Trying to reconnect to redis server"); - - if (redisReconnect(m_Context) == REDIS_OK) { - Log(LogInformation, "RedisWriter", "Connection to redis server was reestablished"); - break; - } - - Log(LogInformation, "RedisWriter", "Unable to reconnect to redis server: Waiting for next attempt"); - - Utility::Sleep(15); - } + return; } + + String password = GetPassword(); + + if (!password.IsEmpty()) { + redisReply *reply = reinterpret_cast(redisCommand(m_Context, "AUTH %s", password.CStr())); + + if (!reply) { + redisFree(m_Context); + return; + } + + if (reply->type == REDIS_REPLY_STATUS || reply->type == REDIS_REPLY_ERROR) { + Log(LogInformation, "RedisWriter") + << "AUTH: " << reply->str; + } + + freeReplyObject(reply); + } +} + +void RedisWriter::UpdateSubscriptionsTimerHandler(void) +{ + m_WorkQueue.Enqueue(boost::bind(&RedisWriter::UpdateSubscriptions, this)); +} + +void RedisWriter::UpdateSubscriptions(void) +{ + if (m_Context == NULL) + return; + + Log(LogInformation, "RedisWriter", "Updating Redis subscriptions"); + + redisReply *reply = reinterpret_cast(redisCommand(m_Context, "HGETALL icinga:subscription")); + + if (!reply) { + redisFree(m_Context); + return; + } + + if (reply->type == REDIS_REPLY_STATUS || reply->type == REDIS_REPLY_ERROR) { + Log(LogInformation, "RedisWriter") + << "HGETALL icinga:subscription: " << reply->str; + } + + m_Subscriptions.clear(); + + //TODO + VERIFY(reply->type == REDIS_REPLY_ARRAY); + VERIFY(reply->elements % 2 == 0); + + for (size_t i = 0; i < reply->elements; i += 2) { + redisReply *keyReply = reply->element[i]; + VERIFY(keyReply->type == REDIS_REPLY_STRING); + + redisReply *valueReply = reply->element[i + 1]; + VERIFY(valueReply->type == REDIS_REPLY_STRING); + + try { + Dictionary::Ptr subscriptionInfo = JsonDecode(valueReply->str); + + Log(LogInformation, "RedisWriter") + << "Subscriber Info - Key: " << keyReply->str << " Value: " << Value(subscriptionInfo); + + RedisSubscriptionInfo rsi; + + Array::Ptr types = subscriptionInfo->Get("types"); + + if (types) + rsi.EventTypes = types->ToSet(); + + m_Subscriptions[keyReply->str] = rsi; + } catch (const std::exception& ex) { + Log(LogWarning, "RedisWriter") + << "Invalid Redis subscriber info for subscriber '" << keyReply->str << "': " << DiagnosticInformation(ex); + + continue; + } + //TODO + } + + freeReplyObject(reply); } void RedisWriter::HandleEvents(void) @@ -120,35 +197,107 @@ void RedisWriter::HandleEvents(void) queue->AddClient(this); for (;;) { - Dictionary::Ptr result = queue->WaitForEvent(this); + Dictionary::Ptr event = queue->WaitForEvent(this); - if (!result) + if (!event) continue; - String body = JsonEncode(result); - - redisReply *reply = reinterpret_cast(redisCommand(m_Context, "LPUSH icinga:events %s", body.CStr())); - - if (!reply) - break; - - if (reply->type == REDIS_REPLY_STATUS || reply->type == REDIS_REPLY_ERROR) { - Log(LogInformation, "RedisWriter") - << "LPUSH icinga:events: " << reply->str; - } - - if (reply->type == REDIS_REPLY_ERROR) { - freeReplyObject(reply); - break; - } - - freeReplyObject(reply); + m_WorkQueue.Enqueue(boost::bind(&RedisWriter::HandleEvent, this, event)); } queue->RemoveClient(this); EventQueue::UnregisterIfUnused(queueName, queue); } +void RedisWriter::HandleEvent(const Dictionary::Ptr& event) +{ + redisReply *reply1 = reinterpret_cast(redisCommand(m_Context, "INCR icinga:event.idx")); + + if (!reply1) + return; + + if (reply1->type == REDIS_REPLY_STATUS || reply1->type == REDIS_REPLY_ERROR) { + Log(LogInformation, "RedisWriter") + << "INCR icinga:event.idx: " << reply1->str; + } + + if (reply1->type == REDIS_REPLY_ERROR) { + freeReplyObject(reply1); + return; + } + + //TODO + VERIFY(reply1->type == REDIS_REPLY_INTEGER); + + long long index = reply1->integer; + + freeReplyObject(reply1); + + String body = JsonEncode(event); + + //TODO: Verify that %lld is supported + redisReply *reply2 = reinterpret_cast(redisCommand(m_Context, "SET icinga:event.%lld %s", index, body.CStr())); + + if (!reply2) + return; + + if (reply2->type == REDIS_REPLY_STATUS || reply2->type == REDIS_REPLY_ERROR) { + Log(LogInformation, "RedisWriter") + << "SET icinga:event." << index << ": " << reply2->str; + } + + if (reply2->type == REDIS_REPLY_ERROR) { + freeReplyObject(reply2); + return; + } + + freeReplyObject(reply2); + + redisReply *reply3 = reinterpret_cast(redisCommand(m_Context, "EXPIRE icinga:event.%lld 3600", index, body.CStr())); + + if (!reply3) + return; + + if (reply3->type == REDIS_REPLY_STATUS || reply3->type == REDIS_REPLY_ERROR) { + Log(LogInformation, "RedisWriter") + << "EXPIRE icinga:event." << index << ": " << reply3->str; + } + + if (reply3->type == REDIS_REPLY_ERROR) { + freeReplyObject(reply3); + return; + } + + freeReplyObject(reply3); + + String type = event->Get("type"); + + for (const std::pair& kv : m_Subscriptions) { + const auto& name = kv.first; + const auto& rsi = kv.second; + + if (rsi.EventTypes.find(type) == rsi.EventTypes.end()) + continue; + + redisReply *reply4 = reinterpret_cast(redisCommand(m_Context, "LPUSH icinga:event:%s %lld", name.CStr(), index)); + + if (!reply4) + return; + + if (reply4->type == REDIS_REPLY_STATUS || reply4->type == REDIS_REPLY_ERROR) { + Log(LogInformation, "RedisWriter") + << "LPUSH icinga:event:" << kv.first << " " << index << ": " << reply4->str; + } + + if (reply4->type == REDIS_REPLY_ERROR) { + freeReplyObject(reply4); + return; + } + + freeReplyObject(reply4); + } +} + void RedisWriter::Stop(bool runtimeRemoved) { Log(LogInformation, "RedisWriter") diff --git a/lib/redis/rediswriter.hpp b/lib/redis/rediswriter.hpp index 44fe6b4ae..51d9b7c43 100644 --- a/lib/redis/rediswriter.hpp +++ b/lib/redis/rediswriter.hpp @@ -23,11 +23,17 @@ #include "redis/rediswriter.thpp" #include "remote/messageorigin.hpp" #include "base/timer.hpp" +#include "base/workqueue.hpp" #include namespace icinga { +struct RedisSubscriptionInfo +{ + std::set EventTypes; +}; + /** * @ingroup redis */ @@ -37,14 +43,25 @@ public: DECLARE_OBJECT(RedisWriter); DECLARE_OBJECTNAME(RedisWriter); + RedisWriter(void); + virtual void Start(bool runtimeCreated) override; virtual void Stop(bool runtimeRemoved) override; private: - void ConnectionThreadProc(void); + void ReconnectTimerHandler(void); + void TryToReconnect(void); void HandleEvents(void); + void HandleEvent(const Dictionary::Ptr& event); + void UpdateSubscriptionsTimerHandler(void); + void UpdateSubscriptions(void); + + Timer::Ptr m_ReconnectTimer; + Timer::Ptr m_SubscriptionTimer; + WorkQueue m_WorkQueue; redisContext *m_Context; + std::map m_Subscriptions; }; }