From 04be7a1f1e63854e520e42daae41150a759e1d94 Mon Sep 17 00:00:00 2001 From: Gunnar Beutner Date: Thu, 16 Mar 2017 09:44:09 +0100 Subject: [PATCH] Implement event handlers for config and status updates fixes #5072 fixes #5073 refs #4991 --- lib/redis/rediswriter-config.cpp | 113 ++++++++++++++++++++++++++++--- lib/redis/rediswriter.hpp | 9 ++- 2 files changed, 112 insertions(+), 10 deletions(-) diff --git a/lib/redis/rediswriter-config.cpp b/lib/redis/rediswriter-config.cpp index 4352b3966..695f8de3c 100644 --- a/lib/redis/rediswriter-config.cpp +++ b/lib/redis/rediswriter-config.cpp @@ -18,9 +18,11 @@ ******************************************************************************/ #include "redis/rediswriter.hpp" +#include "icinga/customvarobject.hpp" #include "base/json.hpp" #include "base/logger.hpp" #include "base/serializer.hpp" +#include "base/initialize.hpp" using namespace icinga; @@ -38,6 +40,18 @@ key: sha1 checksum(name) value: JsonEncode(Serialize(object, FAState)) */ +INITIALIZE_ONCE(&RedisWriter::ConfigStaticInitialize); + +void RedisWriter::ConfigStaticInitialize(void) +{ + /* triggered in ProcessCheckResult(), requires UpdateNextCheck() to be called before */ + ConfigObject::OnStateChanged.connect(boost::bind(&RedisWriter::StateChangedHandler, _1)); + CustomVarObject::OnVarsChanged.connect(boost::bind(&RedisWriter::VarsChangedHandler, _1)); + + /* triggered on create, update and delete objects */ + ConfigObject::OnVersionChanged.connect(boost::bind(&RedisWriter::VersionChangedHandler, _1)); +} + //TODO: OnActiveChanged handling. void RedisWriter::UpdateAllConfigObjects(void) { @@ -52,38 +66,62 @@ void RedisWriter::UpdateAllConfigObjects(void) Log(LogInformation, "RedisWriter") << "Flushing icinga:config:" << typeName << " before config dump."; - redisReply *reply = reinterpret_cast(redisCommand(m_Context, "DEL icinga:config:%s", typeName.CStr())); + redisReply *reply1 = reinterpret_cast(redisCommand(m_Context, "DEL icinga:config:%s", typeName.CStr())); - if (!reply) { + if (!reply1) { redisFree(m_Context); m_Context = NULL; return; } - if (reply->type == REDIS_REPLY_STATUS || reply->type == REDIS_REPLY_ERROR) { + if (reply1->type == REDIS_REPLY_STATUS || reply1->type == REDIS_REPLY_ERROR) { Log(LogInformation, "RedisWriter") - << "DEL icinga:config:" << typeName << ": " << reply->str; + << "DEL icinga:config:" << typeName << ": " << reply1->str; } - if (reply->type == REDIS_REPLY_ERROR) { - freeReplyObject(reply); + if (reply1->type == REDIS_REPLY_ERROR) { + freeReplyObject(reply1); return; } - freeReplyObject(reply); + freeReplyObject(reply1); + + Log(LogInformation, "RedisWriter") + << "Flushing icinga:status:" << typeName << " before config dump."; + + redisReply *reply2 = reinterpret_cast(redisCommand(m_Context, "DEL icinga:status:%s", typeName.CStr())); + + if (!reply2) { + redisFree(m_Context); + m_Context = NULL; + return; + } + + if (reply2->type == REDIS_REPLY_STATUS || reply2->type == REDIS_REPLY_ERROR) { + Log(LogInformation, "RedisWriter") + << "DEL icinga:status:" << typeName << ": " << reply2->str; + } + + if (reply2->type == REDIS_REPLY_ERROR) { + freeReplyObject(reply2); + return; + } + + freeReplyObject(reply2); /* fetch all objects and dump them */ ConfigType *ctype = dynamic_cast(type.get()); if (ctype) { for (const ConfigObject::Ptr& object : ctype->GetObjects()) { - DumpConfigObject(object, typeName); + SendConfigUpdate(object, typeName); + SendStatusUpdate(object, typeName); } } } } -void RedisWriter::DumpConfigObject(const ConfigObject::Ptr& object, const String& typeName) +void RedisWriter::SendConfigUpdate(const ConfigObject::Ptr& object, const String& typeName) { /* Serialize config object attributes */ Dictionary::Ptr objectAttrs = SerializeObjectAttrs(object, FAConfig); @@ -114,6 +152,37 @@ void RedisWriter::DumpConfigObject(const ConfigObject::Ptr& object, const String freeReplyObject(reply); } +void RedisWriter::SendStatusUpdate(const ConfigObject::Ptr& object, const String& typeName) +{ + /* Serialize config object attributes */ + Dictionary::Ptr objectAttrs = SerializeObjectAttrs(object, FAState); + + String jsonBody = JsonEncode(objectAttrs); + + //TODO: checksum + String objectName = object->GetName(); + + redisReply *reply = reinterpret_cast(redisCommand(m_Context, "HSET icinga:status:%s %s %s", typeName.CStr(), objectName.CStr(), jsonBody.CStr())); + + if (!reply) { + redisFree(m_Context); + m_Context = NULL; + return; + } + + if (reply->type == REDIS_REPLY_STATUS || reply->type == REDIS_REPLY_ERROR) { + Log(LogInformation, "RedisWriter") + << "HSET icinga:status:" << typeName << " " << objectName << " " << jsonBody << ": " << reply->str; + } + + if (reply->type == REDIS_REPLY_ERROR) { + freeReplyObject(reply); + return; + } + + freeReplyObject(reply); +} + Dictionary::Ptr RedisWriter::SerializeObjectAttrs(const Object::Ptr& object, int fieldType) { Type::Ptr type = object->GetReflectionType(); @@ -150,3 +219,29 @@ Dictionary::Ptr RedisWriter::SerializeObjectAttrs(const Object::Ptr& object, int return resultAttrs; } +void RedisWriter::StateChangedHandler(const ConfigObject::Ptr& object) +{ + Type::Ptr type = object->GetReflectionType(); + + for (const RedisWriter::Ptr& rw : ConfigType::GetObjectsByType()) { + rw->SendStatusUpdate(object, type->GetName()); + } +} + +void RedisWriter::VarsChangedHandler(const ConfigObject::Ptr& object) +{ + Type::Ptr type = object->GetReflectionType(); + + for (const RedisWriter::Ptr& rw : ConfigType::GetObjectsByType()) { + rw->SendConfigUpdate(object, type->GetName()); + } +} + +void RedisWriter::VersionChangedHandler(const ConfigObject::Ptr& object) +{ + Type::Ptr type = object->GetReflectionType(); + + for (const RedisWriter::Ptr& rw : ConfigType::GetObjectsByType()) { + rw->SendConfigUpdate(object, type->GetName()); + } +} diff --git a/lib/redis/rediswriter.hpp b/lib/redis/rediswriter.hpp index e7892ac7e..ad24c4184 100644 --- a/lib/redis/rediswriter.hpp +++ b/lib/redis/rediswriter.hpp @@ -45,6 +45,8 @@ public: RedisWriter(void); + static void ConfigStaticInitialize(void); + virtual void Start(bool runtimeCreated) override; virtual void Stop(bool runtimeRemoved) override; @@ -59,9 +61,14 @@ private: /* config dump */ void UpdateAllConfigObjects(void); - void DumpConfigObject(const ConfigObject::Ptr& object, const String& typeName); + void SendConfigUpdate(const ConfigObject::Ptr& object, const String& typeName); + void SendStatusUpdate(const ConfigObject::Ptr& object, const String& typeName); static Dictionary::Ptr SerializeObjectAttrs(const Object::Ptr& object, int fieldType); + static void StateChangedHandler(const ConfigObject::Ptr& object); + static void VarsChangedHandler(const ConfigObject::Ptr& object); + static void VersionChangedHandler(const ConfigObject::Ptr& object); + Timer::Ptr m_ReconnectTimer; Timer::Ptr m_SubscriptionTimer; WorkQueue m_WorkQueue;