Implement event handlers for config and status updates

fixes #5072
fixes #5073
refs #4991
This commit is contained in:
Gunnar Beutner 2017-03-16 09:44:09 +01:00
parent df05e24d52
commit 04be7a1f1e
2 changed files with 112 additions and 10 deletions

View File

@ -18,9 +18,11 @@
******************************************************************************/ ******************************************************************************/
#include "redis/rediswriter.hpp" #include "redis/rediswriter.hpp"
#include "icinga/customvarobject.hpp"
#include "base/json.hpp" #include "base/json.hpp"
#include "base/logger.hpp" #include "base/logger.hpp"
#include "base/serializer.hpp" #include "base/serializer.hpp"
#include "base/initialize.hpp"
using namespace icinga; using namespace icinga;
@ -38,6 +40,18 @@ key: sha1 checksum(name)
value: JsonEncode(Serialize(object, FAState)) value: JsonEncode(Serialize(object, FAState))
*/ */
INITIALIZE_ONCE(&RedisWriter::ConfigStaticInitialize);
void RedisWriter::ConfigStaticInitialize(void)
{
/* triggered in ProcessCheckResult(), requires UpdateNextCheck() to be called before */
ConfigObject::OnStateChanged.connect(boost::bind(&RedisWriter::StateChangedHandler, _1));
CustomVarObject::OnVarsChanged.connect(boost::bind(&RedisWriter::VarsChangedHandler, _1));
/* triggered on create, update and delete objects */
ConfigObject::OnVersionChanged.connect(boost::bind(&RedisWriter::VersionChangedHandler, _1));
}
//TODO: OnActiveChanged handling. //TODO: OnActiveChanged handling.
void RedisWriter::UpdateAllConfigObjects(void) void RedisWriter::UpdateAllConfigObjects(void)
{ {
@ -52,38 +66,62 @@ void RedisWriter::UpdateAllConfigObjects(void)
Log(LogInformation, "RedisWriter") Log(LogInformation, "RedisWriter")
<< "Flushing icinga:config:" << typeName << " before config dump."; << "Flushing icinga:config:" << typeName << " before config dump.";
redisReply *reply = reinterpret_cast<redisReply *>(redisCommand(m_Context, "DEL icinga:config:%s", typeName.CStr())); redisReply *reply1 = reinterpret_cast<redisReply *>(redisCommand(m_Context, "DEL icinga:config:%s", typeName.CStr()));
if (!reply) { if (!reply1) {
redisFree(m_Context); redisFree(m_Context);
m_Context = NULL; m_Context = NULL;
return; return;
} }
if (reply->type == REDIS_REPLY_STATUS || reply->type == REDIS_REPLY_ERROR) { if (reply1->type == REDIS_REPLY_STATUS || reply1->type == REDIS_REPLY_ERROR) {
Log(LogInformation, "RedisWriter") Log(LogInformation, "RedisWriter")
<< "DEL icinga:config:" << typeName << ": " << reply->str; << "DEL icinga:config:" << typeName << ": " << reply1->str;
} }
if (reply->type == REDIS_REPLY_ERROR) { if (reply1->type == REDIS_REPLY_ERROR) {
freeReplyObject(reply); freeReplyObject(reply1);
return; return;
} }
freeReplyObject(reply); freeReplyObject(reply1);
Log(LogInformation, "RedisWriter")
<< "Flushing icinga:status:" << typeName << " before config dump.";
redisReply *reply2 = reinterpret_cast<redisReply *>(redisCommand(m_Context, "DEL icinga:status:%s", typeName.CStr()));
if (!reply2) {
redisFree(m_Context);
m_Context = NULL;
return;
}
if (reply2->type == REDIS_REPLY_STATUS || reply2->type == REDIS_REPLY_ERROR) {
Log(LogInformation, "RedisWriter")
<< "DEL icinga:status:" << typeName << ": " << reply2->str;
}
if (reply2->type == REDIS_REPLY_ERROR) {
freeReplyObject(reply2);
return;
}
freeReplyObject(reply2);
/* fetch all objects and dump them */ /* fetch all objects and dump them */
ConfigType *ctype = dynamic_cast<ConfigType *>(type.get()); ConfigType *ctype = dynamic_cast<ConfigType *>(type.get());
if (ctype) { if (ctype) {
for (const ConfigObject::Ptr& object : ctype->GetObjects()) { for (const ConfigObject::Ptr& object : ctype->GetObjects()) {
DumpConfigObject(object, typeName); SendConfigUpdate(object, typeName);
SendStatusUpdate(object, typeName);
} }
} }
} }
} }
void RedisWriter::DumpConfigObject(const ConfigObject::Ptr& object, const String& typeName) void RedisWriter::SendConfigUpdate(const ConfigObject::Ptr& object, const String& typeName)
{ {
/* Serialize config object attributes */ /* Serialize config object attributes */
Dictionary::Ptr objectAttrs = SerializeObjectAttrs(object, FAConfig); Dictionary::Ptr objectAttrs = SerializeObjectAttrs(object, FAConfig);
@ -114,6 +152,37 @@ void RedisWriter::DumpConfigObject(const ConfigObject::Ptr& object, const String
freeReplyObject(reply); freeReplyObject(reply);
} }
void RedisWriter::SendStatusUpdate(const ConfigObject::Ptr& object, const String& typeName)
{
/* Serialize config object attributes */
Dictionary::Ptr objectAttrs = SerializeObjectAttrs(object, FAState);
String jsonBody = JsonEncode(objectAttrs);
//TODO: checksum
String objectName = object->GetName();
redisReply *reply = reinterpret_cast<redisReply *>(redisCommand(m_Context, "HSET icinga:status:%s %s %s", typeName.CStr(), objectName.CStr(), jsonBody.CStr()));
if (!reply) {
redisFree(m_Context);
m_Context = NULL;
return;
}
if (reply->type == REDIS_REPLY_STATUS || reply->type == REDIS_REPLY_ERROR) {
Log(LogInformation, "RedisWriter")
<< "HSET icinga:status:" << typeName << " " << objectName << " " << jsonBody << ": " << reply->str;
}
if (reply->type == REDIS_REPLY_ERROR) {
freeReplyObject(reply);
return;
}
freeReplyObject(reply);
}
Dictionary::Ptr RedisWriter::SerializeObjectAttrs(const Object::Ptr& object, int fieldType) Dictionary::Ptr RedisWriter::SerializeObjectAttrs(const Object::Ptr& object, int fieldType)
{ {
Type::Ptr type = object->GetReflectionType(); Type::Ptr type = object->GetReflectionType();
@ -150,3 +219,29 @@ Dictionary::Ptr RedisWriter::SerializeObjectAttrs(const Object::Ptr& object, int
return resultAttrs; return resultAttrs;
} }
void RedisWriter::StateChangedHandler(const ConfigObject::Ptr& object)
{
Type::Ptr type = object->GetReflectionType();
for (const RedisWriter::Ptr& rw : ConfigType::GetObjectsByType<RedisWriter>()) {
rw->SendStatusUpdate(object, type->GetName());
}
}
void RedisWriter::VarsChangedHandler(const ConfigObject::Ptr& object)
{
Type::Ptr type = object->GetReflectionType();
for (const RedisWriter::Ptr& rw : ConfigType::GetObjectsByType<RedisWriter>()) {
rw->SendConfigUpdate(object, type->GetName());
}
}
void RedisWriter::VersionChangedHandler(const ConfigObject::Ptr& object)
{
Type::Ptr type = object->GetReflectionType();
for (const RedisWriter::Ptr& rw : ConfigType::GetObjectsByType<RedisWriter>()) {
rw->SendConfigUpdate(object, type->GetName());
}
}

View File

@ -45,6 +45,8 @@ public:
RedisWriter(void); RedisWriter(void);
static void ConfigStaticInitialize(void);
virtual void Start(bool runtimeCreated) override; virtual void Start(bool runtimeCreated) override;
virtual void Stop(bool runtimeRemoved) override; virtual void Stop(bool runtimeRemoved) override;
@ -59,9 +61,14 @@ private:
/* config dump */ /* config dump */
void UpdateAllConfigObjects(void); void UpdateAllConfigObjects(void);
void DumpConfigObject(const ConfigObject::Ptr& object, const String& typeName); void SendConfigUpdate(const ConfigObject::Ptr& object, const String& typeName);
void SendStatusUpdate(const ConfigObject::Ptr& object, const String& typeName);
static Dictionary::Ptr SerializeObjectAttrs(const Object::Ptr& object, int fieldType); static Dictionary::Ptr SerializeObjectAttrs(const Object::Ptr& object, int fieldType);
static void StateChangedHandler(const ConfigObject::Ptr& object);
static void VarsChangedHandler(const ConfigObject::Ptr& object);
static void VersionChangedHandler(const ConfigObject::Ptr& object);
Timer::Ptr m_ReconnectTimer; Timer::Ptr m_ReconnectTimer;
Timer::Ptr m_SubscriptionTimer; Timer::Ptr m_SubscriptionTimer;
WorkQueue m_WorkQueue; WorkQueue m_WorkQueue;