From fcd9e3666d76034342649058393303a936b8d3c7 Mon Sep 17 00:00:00 2001 From: Michael Friedrich Date: Tue, 21 Mar 2017 16:11:35 +0100 Subject: [PATCH] Implement config dump/update/delete publishing; fix runtime created object creation refs #4991 --- lib/redis/rediswriter-config.cpp | 138 +++++++++++++++++++++++++++++-- lib/redis/rediswriter.cpp | 6 ++ lib/redis/rediswriter.hpp | 4 +- 3 files changed, 142 insertions(+), 6 deletions(-) diff --git a/lib/redis/rediswriter-config.cpp b/lib/redis/rediswriter-config.cpp index c20971899..a01af7a02 100644 --- a/lib/redis/rediswriter-config.cpp +++ b/lib/redis/rediswriter-config.cpp @@ -52,6 +52,7 @@ void RedisWriter::ConfigStaticInitialize(void) CustomVarObject::OnVarsChanged.connect(boost::bind(&RedisWriter::VarsChangedHandler, _1)); /* triggered on create, update and delete objects */ + ConfigObject::OnActiveChanged.connect(boost::bind(&RedisWriter::VersionChangedHandler, _1)); ConfigObject::OnVersionChanged.connect(boost::bind(&RedisWriter::VersionChangedHandler, _1)); } @@ -115,6 +116,27 @@ void RedisWriter::UpdateAllConfigObjects(void) SendConfigUpdate(object, typeName); SendStatusUpdate(object, typeName); } + + /* publish config type dump finished */ + redisReply *reply3 = reinterpret_cast(redisCommand(m_Context, "PUBLISH icinga:config:dump %s", typeName.CStr())); + + if (!reply3) { + redisFree(m_Context); + m_Context = NULL; + return; + } + + if (reply3->type == REDIS_REPLY_ERROR) { + Log(LogInformation, "RedisWriter") + << "PUBLISH icinga:dump:config:dump " << typeName << ": " << reply3->str; + } + + if (reply3->type == REDIS_REPLY_ERROR) { + freeReplyObject(reply3); + return; + } + + freeReplyObject(reply3); } redisReply *reply3 = reinterpret_cast(redisCommand(m_Context, "EXEC")); @@ -138,10 +160,19 @@ void RedisWriter::UpdateAllConfigObjects(void) freeReplyObject(reply3); } -void RedisWriter::SendConfigUpdate(const ConfigObject::Ptr& object, const String& typeName) +void RedisWriter::SendConfigUpdate(const ConfigObject::Ptr& object, const String& typeName, bool runtimeUpdate) { AssertOnWorkQueue(); + /* during startup we might send duplicated object config, ignore them without any connection */ + if (!m_Context) + return; + + /* TODO: This isn't essentially correct as we don't keep track of config objects ourselves. This would avoid duplicated config updates at startup. + if (!runtimeUpdate && m_ConfigDumpInProgress) + return; + */ + /* Serialize config object attributes */ Dictionary::Ptr objectAttrs = SerializeObjectAttrs(object, FAConfig); @@ -215,6 +246,96 @@ void RedisWriter::SendConfigUpdate(const ConfigObject::Ptr& object, const String } freeReplyObject(reply2); + + /* publish runtime updated objects immediately */ + if (!runtimeUpdate) + return; + + /* + PUBLISH "icinga:config:dump" "Host" + PUBLISH "icinga:config:update" "Host:__name!checksumBody" + PUBLISH "icinga:config:delete" "Host:__name" + */ + + redisReply *reply3 = reinterpret_cast(redisCommand(m_Context, "PUBLISH icinga:config:update %s:%s!%s", typeName.CStr(), objectName.CStr(), checkSumBody.CStr())); + + if (!reply3) { + redisFree(m_Context); + m_Context = NULL; + return; + } + + if (reply3->type == REDIS_REPLY_ERROR) { + Log(LogInformation, "RedisWriter") + << "PUBLISH icinga:config:update " << typeName << ":" << objectName << "!" << checkSumBody << ": " << reply3->str; + } + + if (reply3->type == REDIS_REPLY_ERROR) { + freeReplyObject(reply3); + return; + } + + freeReplyObject(reply3); +} + +void RedisWriter::SendConfigDelete(const ConfigObject::Ptr& object, const String& typeName) +{ + AssertOnWorkQueue(); + + /* during startup we might send duplicated object config, ignore them without any connection */ + if (!m_Context) + return; + + String objectName = object->GetName(); + + redisReply *reply1 = reinterpret_cast(redisCommand(m_Context, "DEL icinga:config:%s %s icinga:config:%s:checksum:%s icinga:status:%s %s", + typeName.CStr(), objectName.CStr(), typeName.CStr(), objectName.CStr(), typeName.CStr(), objectName.CStr())); + + if (!reply1) { + redisFree(m_Context); + m_Context = NULL; + return; + } + + if (reply1->type == REDIS_REPLY_ERROR) { + Log(LogInformation, "RedisWriter") + << "DEL icinga:config:" << typeName << " " << objectName + << " icinga:config:" << typeName << ":checksum:" << objectName + << " icinga:status:" << typeName << " " << objectName << ": " << reply1->str; + } + + if (reply1->type == REDIS_REPLY_ERROR) { + freeReplyObject(reply1); + return; + } + + freeReplyObject(reply1); + + /* + PUBLISH "icinga:config:dump" "Host" + PUBLISH "icinga:config:update" "Host:__name!checksumBody" + PUBLISH "icinga:config:delete" "Host:__name" + */ + + redisReply *reply2 = reinterpret_cast(redisCommand(m_Context, "PUBLISH icinga:config:delete %s:%s", typeName.CStr(), objectName.CStr())); + + if (!reply2) { + redisFree(m_Context); + m_Context = NULL; + return; + } + + if (reply2->type == REDIS_REPLY_ERROR) { + Log(LogInformation, "RedisWriter") + << "PUBLISH icinga:config:delete " << typeName << ":" << objectName << ": " << reply2->str; + } + + if (reply2->type == REDIS_REPLY_ERROR) { + freeReplyObject(reply2); + return; + } + + freeReplyObject(reply2); } void RedisWriter::SendStatusUpdate(const ConfigObject::Ptr& object, const String& typeName) @@ -226,7 +347,6 @@ void RedisWriter::SendStatusUpdate(const ConfigObject::Ptr& object, const String String jsonBody = JsonEncode(objectAttrs); - //TODO: checksum String objectName = object->GetName(); redisReply *reply = reinterpret_cast(redisCommand(m_Context, "HSET icinga:status:%s %s %s", typeName.CStr(), objectName.CStr(), jsonBody.CStr())); @@ -264,7 +384,7 @@ void RedisWriter::VarsChangedHandler(const ConfigObject::Ptr& object) Type::Ptr type = object->GetReflectionType(); for (const RedisWriter::Ptr& rw : ConfigType::GetObjectsByType()) { - rw->m_WorkQueue.Enqueue(boost::bind(&RedisWriter::SendConfigUpdate, rw, object, type->GetName())); + rw->m_WorkQueue.Enqueue(boost::bind(&RedisWriter::SendConfigUpdate, rw, object, type->GetName(), true)); } } @@ -272,7 +392,15 @@ void RedisWriter::VersionChangedHandler(const ConfigObject::Ptr& object) { Type::Ptr type = object->GetReflectionType(); - for (const RedisWriter::Ptr& rw : ConfigType::GetObjectsByType()) { - rw->m_WorkQueue.Enqueue(boost::bind(&RedisWriter::SendConfigUpdate, rw.get(), object, type->GetName())); + if (object->IsActive()) { + /* Create or update the object config */ + for (const RedisWriter::Ptr& rw : ConfigType::GetObjectsByType()) { + rw->m_WorkQueue.Enqueue(boost::bind(&RedisWriter::SendConfigUpdate, rw.get(), object, type->GetName(), true)); + } + } else { + /* Delete object config */ + for (const RedisWriter::Ptr& rw : ConfigType::GetObjectsByType()) { + rw->m_WorkQueue.Enqueue(boost::bind(&RedisWriter::SendConfigDelete, rw.get(), object, type->GetName())); + } } } diff --git a/lib/redis/rediswriter.cpp b/lib/redis/rediswriter.cpp index 77d6f2989..28a6173df 100644 --- a/lib/redis/rediswriter.cpp +++ b/lib/redis/rediswriter.cpp @@ -40,6 +40,8 @@ void RedisWriter::Start(bool runtimeCreated) Log(LogInformation, "RedisWriter") << "'" << GetName() << "' started."; + m_ConfigDumpInProgress = false; + m_ReconnectTimer = new Timer(); m_ReconnectTimer->SetInterval(15); m_ReconnectTimer->OnTimerExpired.connect(boost::bind(&RedisWriter::ReconnectTimerHandler, this)); @@ -132,7 +134,11 @@ void RedisWriter::TryToReconnect(void) } /* Config dump */ + m_ConfigDumpInProgress = true; + UpdateAllConfigObjects(); + + m_ConfigDumpInProgress = false; } void RedisWriter::UpdateSubscriptionsTimerHandler(void) diff --git a/lib/redis/rediswriter.hpp b/lib/redis/rediswriter.hpp index 362feced6..a32304f0b 100644 --- a/lib/redis/rediswriter.hpp +++ b/lib/redis/rediswriter.hpp @@ -61,7 +61,8 @@ private: /* config & status dump */ void UpdateAllConfigObjects(void); - void SendConfigUpdate(const ConfigObject::Ptr& object, const String& typeName); + void SendConfigUpdate(const ConfigObject::Ptr& object, const String& typeName, bool runtimeUpdate = false); + void SendConfigDelete(const ConfigObject::Ptr& object, const String& typeName); void SendStatusUpdate(const ConfigObject::Ptr& object, const String& typeName); /* utilities */ @@ -88,6 +89,7 @@ private: WorkQueue m_WorkQueue; redisContext *m_Context; std::map m_Subscriptions; + bool m_ConfigDumpInProgress; }; struct redis_error : virtual std::exception, virtual boost::exception { };