mirror of
https://github.com/Icinga/icinga2.git
synced 2025-07-27 07:34:15 +02:00
parent
fcd9e3666d
commit
844e2bf68c
@ -61,25 +61,7 @@ void RedisWriter::UpdateAllConfigObjects(void)
|
|||||||
AssertOnWorkQueue();
|
AssertOnWorkQueue();
|
||||||
|
|
||||||
//TODO: "Publish" the config dump by adding another event, globally or by object
|
//TODO: "Publish" the config dump by adding another event, globally or by object
|
||||||
redisReply *reply1 = reinterpret_cast<redisReply *>(redisCommand(m_Context, "MULTI"));
|
ExecuteQuery({ "MULTI" });
|
||||||
|
|
||||||
if (!reply1) {
|
|
||||||
redisFree(m_Context);
|
|
||||||
m_Context = NULL;
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (reply1->type == REDIS_REPLY_ERROR) {
|
|
||||||
Log(LogInformation, "RedisWriter")
|
|
||||||
<< "MULTI: " << reply1->str;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (reply1->type == REDIS_REPLY_ERROR) {
|
|
||||||
freeReplyObject(reply1);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
freeReplyObject(reply1);
|
|
||||||
|
|
||||||
for (const Type::Ptr& type : Type::GetAllTypes()) {
|
for (const Type::Ptr& type : Type::GetAllTypes()) {
|
||||||
if (!ConfigObject::TypeInstance->IsAssignableFrom(type))
|
if (!ConfigObject::TypeInstance->IsAssignableFrom(type))
|
||||||
@ -88,25 +70,7 @@ void RedisWriter::UpdateAllConfigObjects(void)
|
|||||||
String typeName = type->GetName();
|
String typeName = type->GetName();
|
||||||
|
|
||||||
/* replace into aka delete insert is faster than a full diff */
|
/* replace into aka delete insert is faster than a full diff */
|
||||||
redisReply *reply2 = reinterpret_cast<redisReply *>(redisCommand(m_Context, "DEL icinga:config:%s icinga:config:%s:checksum, icinga:status:%s", typeName.CStr(), typeName.CStr(), typeName.CStr()));
|
ExecuteQuery({ "DEL", "icinga:config:" + typeName, "icinga:config:" + typeName + ":checksum", "icinga:status:" + typeName });
|
||||||
|
|
||||||
if (!reply2) {
|
|
||||||
redisFree(m_Context);
|
|
||||||
m_Context = NULL;
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (reply2->type == REDIS_REPLY_ERROR) {
|
|
||||||
Log(LogInformation, "RedisWriter")
|
|
||||||
<< "DEL icinga:config:" << typeName << " icinga:status:" << typeName << ": " << reply2->str;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (reply2->type == REDIS_REPLY_ERROR) {
|
|
||||||
freeReplyObject(reply2);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
freeReplyObject(reply2);
|
|
||||||
|
|
||||||
/* fetch all objects and dump them */
|
/* fetch all objects and dump them */
|
||||||
ConfigType *ctype = dynamic_cast<ConfigType *>(type.get());
|
ConfigType *ctype = dynamic_cast<ConfigType *>(type.get());
|
||||||
@ -118,46 +82,10 @@ void RedisWriter::UpdateAllConfigObjects(void)
|
|||||||
}
|
}
|
||||||
|
|
||||||
/* publish config type dump finished */
|
/* publish config type dump finished */
|
||||||
redisReply *reply3 = reinterpret_cast<redisReply *>(redisCommand(m_Context, "PUBLISH icinga:config:dump %s", typeName.CStr()));
|
ExecuteQuery({ "PUBLISH", "icinga:config:dump", typeName });
|
||||||
|
|
||||||
if (!reply3) {
|
|
||||||
redisFree(m_Context);
|
|
||||||
m_Context = NULL;
|
|
||||||
return;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (reply3->type == REDIS_REPLY_ERROR) {
|
ExecuteQuery({ "EXEC" });
|
||||||
Log(LogInformation, "RedisWriter")
|
|
||||||
<< "PUBLISH icinga:dump:config:dump " << typeName << ": " << reply3->str;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (reply3->type == REDIS_REPLY_ERROR) {
|
|
||||||
freeReplyObject(reply3);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
freeReplyObject(reply3);
|
|
||||||
}
|
|
||||||
|
|
||||||
redisReply *reply3 = reinterpret_cast<redisReply *>(redisCommand(m_Context, "EXEC"));
|
|
||||||
|
|
||||||
if (!reply3) {
|
|
||||||
redisFree(m_Context);
|
|
||||||
m_Context = NULL;
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (reply3->type == REDIS_REPLY_ERROR) {
|
|
||||||
Log(LogInformation, "RedisWriter")
|
|
||||||
<< "EXEC: " << reply3->str;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (reply3->type == REDIS_REPLY_ERROR) {
|
|
||||||
freeReplyObject(reply3);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
freeReplyObject(reply3);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void RedisWriter::SendConfigUpdate(const ConfigObject::Ptr& object, const String& typeName, bool runtimeUpdate)
|
void RedisWriter::SendConfigUpdate(const ConfigObject::Ptr& object, const String& typeName, bool runtimeUpdate)
|
||||||
@ -180,26 +108,7 @@ void RedisWriter::SendConfigUpdate(const ConfigObject::Ptr& object, const String
|
|||||||
|
|
||||||
String objectName = object->GetName();
|
String objectName = object->GetName();
|
||||||
|
|
||||||
redisReply *reply1 = reinterpret_cast<redisReply *>(redisCommand(m_Context, "HSET icinga:config:%s %s %s", typeName.CStr(), objectName.CStr(), jsonBody.CStr()));
|
ExecuteQuery({ "HSET", "icinga:config:" + typeName, objectName, jsonBody });
|
||||||
|
|
||||||
if (!reply1) {
|
|
||||||
redisFree(m_Context);
|
|
||||||
m_Context = NULL;
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (reply1->type == REDIS_REPLY_ERROR) {
|
|
||||||
Log(LogInformation, "RedisWriter")
|
|
||||||
<< "HSET icinga:config:" << typeName << " " << objectName << " " << jsonBody << ": " << reply1->str;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (reply1->type == REDIS_REPLY_ERROR) {
|
|
||||||
freeReplyObject(reply1);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
freeReplyObject(reply1);
|
|
||||||
|
|
||||||
|
|
||||||
/* check sums */
|
/* check sums */
|
||||||
/* hset icinga:config:Host:checksums localhost { "name_checksum": "...", "properties_checksum": "...", "groups_checksum": "...", "vars_checksum": null } */
|
/* hset icinga:config:Host:checksums localhost { "name_checksum": "...", "properties_checksum": "...", "groups_checksum": "...", "vars_checksum": null } */
|
||||||
@ -227,25 +136,7 @@ void RedisWriter::SendConfigUpdate(const ConfigObject::Ptr& object, const String
|
|||||||
|
|
||||||
String checkSumBody = JsonEncode(checkSum);
|
String checkSumBody = JsonEncode(checkSum);
|
||||||
|
|
||||||
redisReply *reply2 = reinterpret_cast<redisReply *>(redisCommand(m_Context, "HSET icinga:config:%s:checksum %s %s", typeName.CStr(), objectName.CStr(), checkSumBody.CStr()));
|
ExecuteQuery({ "HSET", "icinga:config:" + typeName + ":checksum", objectName, checkSumBody });
|
||||||
|
|
||||||
if (!reply2) {
|
|
||||||
redisFree(m_Context);
|
|
||||||
m_Context = NULL;
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (reply2->type == REDIS_REPLY_ERROR) {
|
|
||||||
Log(LogInformation, "RedisWriter")
|
|
||||||
<< "HSET icinga:config:" << typeName << " " << objectName << " " << jsonBody << ": " << reply2->str;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (reply2->type == REDIS_REPLY_ERROR) {
|
|
||||||
freeReplyObject(reply2);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
freeReplyObject(reply2);
|
|
||||||
|
|
||||||
/* publish runtime updated objects immediately */
|
/* publish runtime updated objects immediately */
|
||||||
if (!runtimeUpdate)
|
if (!runtimeUpdate)
|
||||||
@ -257,25 +148,7 @@ void RedisWriter::SendConfigUpdate(const ConfigObject::Ptr& object, const String
|
|||||||
PUBLISH "icinga:config:delete" "Host:__name"
|
PUBLISH "icinga:config:delete" "Host:__name"
|
||||||
*/
|
*/
|
||||||
|
|
||||||
redisReply *reply3 = reinterpret_cast<redisReply *>(redisCommand(m_Context, "PUBLISH icinga:config:update %s:%s!%s", typeName.CStr(), objectName.CStr(), checkSumBody.CStr()));
|
ExecuteQuery({ "PUBLISH", "icinga:config:update", typeName + ":" + objectName + "!" + checkSumBody });
|
||||||
|
|
||||||
if (!reply3) {
|
|
||||||
redisFree(m_Context);
|
|
||||||
m_Context = NULL;
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (reply3->type == REDIS_REPLY_ERROR) {
|
|
||||||
Log(LogInformation, "RedisWriter")
|
|
||||||
<< "PUBLISH icinga:config:update " << typeName << ":" << objectName << "!" << checkSumBody << ": " << reply3->str;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (reply3->type == REDIS_REPLY_ERROR) {
|
|
||||||
freeReplyObject(reply3);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
freeReplyObject(reply3);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void RedisWriter::SendConfigDelete(const ConfigObject::Ptr& object, const String& typeName)
|
void RedisWriter::SendConfigDelete(const ConfigObject::Ptr& object, const String& typeName)
|
||||||
|
@ -97,41 +97,13 @@ void RedisWriter::TryToReconnect(void)
|
|||||||
|
|
||||||
String password = GetPassword();
|
String password = GetPassword();
|
||||||
|
|
||||||
if (!password.IsEmpty()) {
|
if (!password.IsEmpty())
|
||||||
redisReply *reply = reinterpret_cast<redisReply *>(redisCommand(m_Context, "AUTH %s", password.CStr()));
|
ExecuteQuery({ "AUTH", password });
|
||||||
|
|
||||||
if (!reply) {
|
|
||||||
redisFree(m_Context);
|
|
||||||
m_Context = NULL;
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (reply->type == REDIS_REPLY_STATUS || reply->type == REDIS_REPLY_ERROR) {
|
|
||||||
Log(LogInformation, "RedisWriter")
|
|
||||||
<< "AUTH: " << reply->str;
|
|
||||||
}
|
|
||||||
|
|
||||||
freeReplyObject(reply);
|
|
||||||
}
|
|
||||||
|
|
||||||
int dbIndex = GetDbIndex();
|
int dbIndex = GetDbIndex();
|
||||||
|
|
||||||
if (dbIndex != 0) {
|
if (dbIndex != 0)
|
||||||
redisReply *reply = reinterpret_cast<redisReply *>(redisCommand(m_Context, "SELECT %d", dbIndex));
|
ExecuteQuery({ "SELECT", Convert::ToString(dbIndex) });
|
||||||
|
|
||||||
if (!reply) {
|
|
||||||
redisFree(m_Context);
|
|
||||||
m_Context = NULL;
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (reply->type == REDIS_REPLY_STATUS || reply->type == REDIS_REPLY_ERROR) {
|
|
||||||
Log(LogInformation, "RedisWriter")
|
|
||||||
<< "SELECT " << dbIndex << ": " << reply->str;
|
|
||||||
}
|
|
||||||
|
|
||||||
freeReplyObject(reply);
|
|
||||||
}
|
|
||||||
|
|
||||||
/* Config dump */
|
/* Config dump */
|
||||||
m_ConfigDumpInProgress = true;
|
m_ConfigDumpInProgress = true;
|
||||||
@ -159,22 +131,7 @@ void RedisWriter::UpdateSubscriptions(void)
|
|||||||
long long cursor = 0;
|
long long cursor = 0;
|
||||||
|
|
||||||
do {
|
do {
|
||||||
redisReply *reply = reinterpret_cast<redisReply *>(redisCommand(m_Context, "SCAN %lld MATCH icinga:subscription:* COUNT 1000", cursor));
|
boost::shared_ptr<redisReply> reply = ExecuteQuery({ "SCAN", Convert::ToString(cursor), "MATCH", "icinga:subscription:*" });
|
||||||
|
|
||||||
if (!reply) {
|
|
||||||
redisFree(m_Context);
|
|
||||||
m_Context = NULL;
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (reply->type == REDIS_REPLY_STATUS || reply->type == REDIS_REPLY_ERROR) {
|
|
||||||
Log(LogInformation, "RedisWriter")
|
|
||||||
<< "SCAN " << cursor << " MATCH icinga:subscription:* COUNT 1000: " << reply->str;
|
|
||||||
|
|
||||||
freeReplyObject(reply);
|
|
||||||
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
VERIFY(reply->type == REDIS_REPLY_ARRAY);
|
VERIFY(reply->type == REDIS_REPLY_ARRAY);
|
||||||
VERIFY(reply->elements % 2 == 0);
|
VERIFY(reply->elements % 2 == 0);
|
||||||
@ -188,30 +145,10 @@ void RedisWriter::UpdateSubscriptions(void)
|
|||||||
redisReply *keyReply = keysReply->element[i];
|
redisReply *keyReply = keysReply->element[i];
|
||||||
VERIFY(keyReply->type == REDIS_REPLY_STRING);
|
VERIFY(keyReply->type == REDIS_REPLY_STRING);
|
||||||
|
|
||||||
redisReply *vreply = reinterpret_cast<redisReply *>(redisCommand(m_Context, "GET %s", keyReply->str));
|
boost::shared_ptr<redisReply> vreply = ExecuteQuery({ "GET", keyReply->str });
|
||||||
|
|
||||||
if (!vreply) {
|
|
||||||
freeReplyObject(reply);
|
|
||||||
redisFree(m_Context);
|
|
||||||
m_Context = NULL;
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (vreply->type == REDIS_REPLY_STATUS || vreply->type == REDIS_REPLY_ERROR) {
|
|
||||||
Log(LogInformation, "RedisWriter")
|
|
||||||
<< "GET " << keyReply->str << ": " << vreply->str;
|
|
||||||
|
|
||||||
freeReplyObject(vreply);
|
|
||||||
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
subscriptions[keyReply->str] = vreply->str;
|
subscriptions[keyReply->str] = vreply->str;
|
||||||
|
|
||||||
freeReplyObject(vreply);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
freeReplyObject(reply);
|
|
||||||
} while (cursor != 0);
|
} while (cursor != 0);
|
||||||
|
|
||||||
m_Subscriptions.clear();
|
m_Subscriptions.clear();
|
||||||
@ -299,29 +236,7 @@ void RedisWriter::HandleEvent(const Dictionary::Ptr& event)
|
|||||||
|
|
||||||
String body = JsonEncode(event);
|
String body = JsonEncode(event);
|
||||||
|
|
||||||
redisReply *reply = reinterpret_cast<redisReply *>(redisCommand(m_Context, "LPUSH icinga:event:%s %s", name.CStr(), body.CStr()));
|
ExecuteQuery({ "LPUSH", "icinga:event:" + name, body });
|
||||||
|
|
||||||
if (!reply) {
|
|
||||||
redisFree(m_Context);
|
|
||||||
m_Context = NULL;
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (reply->type == REDIS_REPLY_STATUS || reply->type == REDIS_REPLY_ERROR) {
|
|
||||||
Log(LogInformation, "RedisWriter")
|
|
||||||
<< "LPUSH icinga:event:" << kv.first << " " << body << ": " << reply->str;
|
|
||||||
|
|
||||||
freeReplyObject(reply);
|
|
||||||
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (reply->type == REDIS_REPLY_ERROR) {
|
|
||||||
freeReplyObject(reply);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
freeReplyObject(reply);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user