Implement config dump/update/delete publishing; fix runtime created object creation

refs #4991
This commit is contained in:
Michael Friedrich 2017-03-21 16:11:35 +01:00
parent 75bfd775b9
commit fcd9e3666d
3 changed files with 142 additions and 6 deletions

View File

@ -52,6 +52,7 @@ void RedisWriter::ConfigStaticInitialize(void)
CustomVarObject::OnVarsChanged.connect(boost::bind(&RedisWriter::VarsChangedHandler, _1));
/* triggered on create, update and delete objects */
ConfigObject::OnActiveChanged.connect(boost::bind(&RedisWriter::VersionChangedHandler, _1));
ConfigObject::OnVersionChanged.connect(boost::bind(&RedisWriter::VersionChangedHandler, _1));
}
@ -115,6 +116,27 @@ void RedisWriter::UpdateAllConfigObjects(void)
SendConfigUpdate(object, typeName);
SendStatusUpdate(object, typeName);
}
/* publish config type dump finished */
redisReply *reply3 = reinterpret_cast<redisReply *>(redisCommand(m_Context, "PUBLISH icinga:config:dump %s", typeName.CStr()));
if (!reply3) {
redisFree(m_Context);
m_Context = NULL;
return;
}
if (reply3->type == REDIS_REPLY_ERROR) {
Log(LogInformation, "RedisWriter")
<< "PUBLISH icinga:dump:config:dump " << typeName << ": " << reply3->str;
}
if (reply3->type == REDIS_REPLY_ERROR) {
freeReplyObject(reply3);
return;
}
freeReplyObject(reply3);
}
redisReply *reply3 = reinterpret_cast<redisReply *>(redisCommand(m_Context, "EXEC"));
@ -138,10 +160,19 @@ void RedisWriter::UpdateAllConfigObjects(void)
freeReplyObject(reply3);
}
void RedisWriter::SendConfigUpdate(const ConfigObject::Ptr& object, const String& typeName)
void RedisWriter::SendConfigUpdate(const ConfigObject::Ptr& object, const String& typeName, bool runtimeUpdate)
{
AssertOnWorkQueue();
/* during startup we might send duplicated object config, ignore them without any connection */
if (!m_Context)
return;
/* TODO: This isn't essentially correct as we don't keep track of config objects ourselves. This would avoid duplicated config updates at startup.
if (!runtimeUpdate && m_ConfigDumpInProgress)
return;
*/
/* Serialize config object attributes */
Dictionary::Ptr objectAttrs = SerializeObjectAttrs(object, FAConfig);
@ -215,6 +246,96 @@ void RedisWriter::SendConfigUpdate(const ConfigObject::Ptr& object, const String
}
freeReplyObject(reply2);
/* publish runtime updated objects immediately */
if (!runtimeUpdate)
return;
/*
PUBLISH "icinga:config:dump" "Host"
PUBLISH "icinga:config:update" "Host:__name!checksumBody"
PUBLISH "icinga:config:delete" "Host:__name"
*/
redisReply *reply3 = reinterpret_cast<redisReply *>(redisCommand(m_Context, "PUBLISH icinga:config:update %s:%s!%s", typeName.CStr(), objectName.CStr(), checkSumBody.CStr()));
if (!reply3) {
redisFree(m_Context);
m_Context = NULL;
return;
}
if (reply3->type == REDIS_REPLY_ERROR) {
Log(LogInformation, "RedisWriter")
<< "PUBLISH icinga:config:update " << typeName << ":" << objectName << "!" << checkSumBody << ": " << reply3->str;
}
if (reply3->type == REDIS_REPLY_ERROR) {
freeReplyObject(reply3);
return;
}
freeReplyObject(reply3);
}
void RedisWriter::SendConfigDelete(const ConfigObject::Ptr& object, const String& typeName)
{
AssertOnWorkQueue();
/* during startup we might send duplicated object config, ignore them without any connection */
if (!m_Context)
return;
String objectName = object->GetName();
redisReply *reply1 = reinterpret_cast<redisReply *>(redisCommand(m_Context, "DEL icinga:config:%s %s icinga:config:%s:checksum:%s icinga:status:%s %s",
typeName.CStr(), objectName.CStr(), typeName.CStr(), objectName.CStr(), typeName.CStr(), objectName.CStr()));
if (!reply1) {
redisFree(m_Context);
m_Context = NULL;
return;
}
if (reply1->type == REDIS_REPLY_ERROR) {
Log(LogInformation, "RedisWriter")
<< "DEL icinga:config:" << typeName << " " << objectName
<< " icinga:config:" << typeName << ":checksum:" << objectName
<< " icinga:status:" << typeName << " " << objectName << ": " << reply1->str;
}
if (reply1->type == REDIS_REPLY_ERROR) {
freeReplyObject(reply1);
return;
}
freeReplyObject(reply1);
/*
PUBLISH "icinga:config:dump" "Host"
PUBLISH "icinga:config:update" "Host:__name!checksumBody"
PUBLISH "icinga:config:delete" "Host:__name"
*/
redisReply *reply2 = reinterpret_cast<redisReply *>(redisCommand(m_Context, "PUBLISH icinga:config:delete %s:%s", typeName.CStr(), objectName.CStr()));
if (!reply2) {
redisFree(m_Context);
m_Context = NULL;
return;
}
if (reply2->type == REDIS_REPLY_ERROR) {
Log(LogInformation, "RedisWriter")
<< "PUBLISH icinga:config:delete " << typeName << ":" << objectName << ": " << reply2->str;
}
if (reply2->type == REDIS_REPLY_ERROR) {
freeReplyObject(reply2);
return;
}
freeReplyObject(reply2);
}
void RedisWriter::SendStatusUpdate(const ConfigObject::Ptr& object, const String& typeName)
@ -226,7 +347,6 @@ void RedisWriter::SendStatusUpdate(const ConfigObject::Ptr& object, const String
String jsonBody = JsonEncode(objectAttrs);
//TODO: checksum
String objectName = object->GetName();
redisReply *reply = reinterpret_cast<redisReply *>(redisCommand(m_Context, "HSET icinga:status:%s %s %s", typeName.CStr(), objectName.CStr(), jsonBody.CStr()));
@ -264,7 +384,7 @@ void RedisWriter::VarsChangedHandler(const ConfigObject::Ptr& object)
Type::Ptr type = object->GetReflectionType();
for (const RedisWriter::Ptr& rw : ConfigType::GetObjectsByType<RedisWriter>()) {
rw->m_WorkQueue.Enqueue(boost::bind(&RedisWriter::SendConfigUpdate, rw, object, type->GetName()));
rw->m_WorkQueue.Enqueue(boost::bind(&RedisWriter::SendConfigUpdate, rw, object, type->GetName(), true));
}
}
@ -272,7 +392,15 @@ void RedisWriter::VersionChangedHandler(const ConfigObject::Ptr& object)
{
Type::Ptr type = object->GetReflectionType();
for (const RedisWriter::Ptr& rw : ConfigType::GetObjectsByType<RedisWriter>()) {
rw->m_WorkQueue.Enqueue(boost::bind(&RedisWriter::SendConfigUpdate, rw.get(), object, type->GetName()));
if (object->IsActive()) {
/* Create or update the object config */
for (const RedisWriter::Ptr& rw : ConfigType::GetObjectsByType<RedisWriter>()) {
rw->m_WorkQueue.Enqueue(boost::bind(&RedisWriter::SendConfigUpdate, rw.get(), object, type->GetName(), true));
}
} else {
/* Delete object config */
for (const RedisWriter::Ptr& rw : ConfigType::GetObjectsByType<RedisWriter>()) {
rw->m_WorkQueue.Enqueue(boost::bind(&RedisWriter::SendConfigDelete, rw.get(), object, type->GetName()));
}
}
}

View File

@ -40,6 +40,8 @@ void RedisWriter::Start(bool runtimeCreated)
Log(LogInformation, "RedisWriter")
<< "'" << GetName() << "' started.";
m_ConfigDumpInProgress = false;
m_ReconnectTimer = new Timer();
m_ReconnectTimer->SetInterval(15);
m_ReconnectTimer->OnTimerExpired.connect(boost::bind(&RedisWriter::ReconnectTimerHandler, this));
@ -132,7 +134,11 @@ void RedisWriter::TryToReconnect(void)
}
/* Config dump */
m_ConfigDumpInProgress = true;
UpdateAllConfigObjects();
m_ConfigDumpInProgress = false;
}
void RedisWriter::UpdateSubscriptionsTimerHandler(void)

View File

@ -61,7 +61,8 @@ private:
/* config & status dump */
void UpdateAllConfigObjects(void);
void SendConfigUpdate(const ConfigObject::Ptr& object, const String& typeName);
void SendConfigUpdate(const ConfigObject::Ptr& object, const String& typeName, bool runtimeUpdate = false);
void SendConfigDelete(const ConfigObject::Ptr& object, const String& typeName);
void SendStatusUpdate(const ConfigObject::Ptr& object, const String& typeName);
/* utilities */
@ -88,6 +89,7 @@ private:
WorkQueue m_WorkQueue;
redisContext *m_Context;
std::map<String, RedisSubscriptionInfo> m_Subscriptions;
bool m_ConfigDumpInProgress;
};
struct redis_error : virtual std::exception, virtual boost::exception { };