Rewrite the RedisWriter

There should be more atomic commits but the whole thing was a mess. This
commit changes the synchrounous Redis connection into an asynchronous
one in its own class RedisConnection.
The RedisConnection uses a Workqueue with commands to fire against the
Redis server. When a response is required a callback must be supplied,
refer to RedisWriter::RedisGet().

Known Issues:
- Authentication has no error handling and can break the connection
- Error handling in general is iffy due to the nature of the async redis
connection
- Getting a reply out of RedisConnection is not trivial
- HandleRW... sunt dracones
This commit is contained in:
Jean Flach 2018-10-18 14:39:59 +02:00 committed by Michael Friedrich
parent 64515b81e3
commit 6aaa35a0a6
6 changed files with 447 additions and 410 deletions

View File

@ -20,30 +20,31 @@
#include "base/object.hpp"
#include "redis/redisconnection.hpp"
#include "base/workqueue.hpp"
#include <hiredis/hiredis.h>
#include <base/logger.hpp>
#include "base/logger.hpp"
#include "base/convert.hpp"
#include "base/utility.hpp"
#include "redis/rediswriter.hpp"
#include "hiredis/hiredis.h"
using namespace icinga;
/*
struct redis_error : virtual std::exception, virtual boost::exception { };
struct errinfo_redis_query_;
typedef boost::error_info<struct errinfo_redis_query_, std::string> errinfo_redis_query;
*/
RedisConnection::RedisConnection(const String host, const int port, const String path) :
m_Host(host), m_Port(port), m_Path(path) {
RedisConnection::RedisConnection(const String host, const int port, const String path, const String password, const int db) :
m_Host(host), m_Port(port), m_Path(path), m_Password(password), m_DbIndex(db), m_Context(NULL)
{
m_RedisConnectionWorkQueue.SetName("RedisConnection");
}
void RedisConnection::StaticInitialize()
{
}
void RedisConnection::Start()
{
RedisConnection::Connect();
std::thread thread(std::bind(&RedisConnection::HandleRW, this));
thread.detach();
}
void RedisConnection::AssertOnWorkQueue()
@ -51,11 +52,31 @@ void RedisConnection::AssertOnWorkQueue()
ASSERT(m_RedisConnectionWorkQueue.IsWorkerThread());
}
void RedisConnection::Connect() {
void RedisConnection::HandleRW()
{
Utility::SetThreadName("RedisConnection Handler");
for (;;) {
try {
{
boost::mutex::scoped_lock lock(m_CMutex);
redisAsyncHandleWrite(m_Context);
redisAsyncHandleRead(m_Context);
}
Utility::Sleep(0.1);
} catch (const std::exception&) {
Log(LogCritical, "RedisWriter", "Internal Redis Error");
}
}
}
void RedisConnection::Connect()
{
if (m_Context)
return;
Log(LogInformation, "RedisWriter", "Trying to connect to redis server Async");
boost::mutex::scoped_lock lock(m_CMutex);
if (m_Path.IsEmpty())
m_Context = redisAsyncConnect(m_Host.CStr(), m_Port);
@ -78,7 +99,16 @@ void RedisConnection::Connect() {
redisAsyncSetDisconnectCallback(m_Context, &DisconnectCallback);
//TODO: Authentication, DB selection, error handling
/* TODO: This currently does not work properly:
* In case of error the connection is broken, yet the Context is not set to faulty. May be a bug with hiredis.
* Error case: Password does not match, or even: "Client sent AUTH, but no password is set" which also results in an error.
*/
if (!m_Password.IsEmpty()) {
ExecuteQuery({"AUTH", m_Password});
}
if (m_DbIndex != 0)
ExecuteQuery({"SELECT", Convert::ToString(m_DbIndex)});
}
void RedisConnection::Disconnect()
@ -86,19 +116,31 @@ void RedisConnection::Disconnect()
redisAsyncDisconnect(m_Context);
}
void RedisConnection::DisconnectCallback(const redisAsyncContext *c, int status) {
void RedisConnection::DisconnectCallback(const redisAsyncContext *c, int status)
{
if (status == REDIS_OK)
Log(LogInformation, "RedisWriter") << "Redis disconnected by us";
else
Log(LogCritical, "Rediswriter") << "Redis disconnected for reasons";
else {
if (c->err != 0)
Log(LogCritical, "RedisWriter") << "Redis disconnected by server. Reason: " << c->errstr;
else
Log(LogCritical, "RedisWriter") << "Redis disconnected by server";
}
}
bool RedisConnection::IsConnected()
{
return (REDIS_CONNECTED & m_Context->c.flags) == REDIS_CONNECTED;
}
void RedisConnection::ExecuteQuery(const std::vector<String>& query, redisCallbackFn *fn, void *privdata)
{
m_RedisConnectionWorkQueue.Enqueue(std::bind(&RedisConnection::SendMessageInternal, this, query, fn, privdata));
}
void RedisConnection::ExecuteQueries(const std::vector<std::vector<String> >& queries, redisCallbackFn *fn, void *privdata)
void
RedisConnection::ExecuteQueries(const std::vector<std::vector<String> >& queries, redisCallbackFn *fn, void *privdata)
{
for (const auto& query : queries) {
m_RedisConnectionWorkQueue.Enqueue(std::bind(&RedisConnection::SendMessageInternal, this, query, fn, privdata));
@ -109,6 +151,8 @@ void RedisConnection::SendMessageInternal(const std::vector<String>& query, redi
{
AssertOnWorkQueue();
boost::mutex::scoped_lock lock(m_CMutex);
if (!m_Context) {
Log(LogCritical, "RedisWriter")
<< "Connection lost";
@ -120,16 +164,21 @@ void RedisConnection::SendMessageInternal(const std::vector<String>& query, redi
argv = new const char *[query.size()];
argvlen = new size_t[query.size()];
String debugstr;
for (std::vector<String>::size_type i = 0; i < query.size(); i++) {
argv[i] = query[i].CStr();
argvlen[i] = query[i].GetLength();
debugstr += argv[i];
debugstr += " ";
}
Log(LogDebug, "RedisWriter, Connection")
<< "Sending Command: " << debugstr;
int r = redisAsyncCommandArgv(m_Context, fn, privdata, query.size(), argv, argvlen);
delete [] argv;
delete [] argvlen;
delete[] argv;
delete[] argvlen;
if (r == REDIS_REPLY_ERROR) {
Log(LogCritical, "RedisWriter")
@ -137,8 +186,7 @@ void RedisConnection::SendMessageInternal(const std::vector<String>& query, redi
BOOST_THROW_EXCEPTION(
redis_error()
<< errinfo_redis_query("FUCK")
<< errinfo_redis_query(Utility::Join(Array::FromVector(query), ' ', false))
);
}
}
}

View File

@ -24,43 +24,62 @@
#include "base/object.hpp"
#include "base/workqueue.hpp"
namespace icinga {
namespace icinga
{
/**
* An Async Redis connection.
*
* @ingroup redis
*/
class RedisConnection final : public Object
{
public:
DECLARE_PTR_TYPEDEFS(RedisConnection);
class RedisConnection final : public Object
{
public:
DECLARE_PTR_TYPEDEFS(RedisConnection);
RedisConnection(const String host, const int port, const String path);
RedisConnection(const String host, const int port, const String path, const String password = "", const int db = 0);
void Start();
void Start();
void Connect();
void Disconnect();
void Connect();
void ExecuteQuery(const std::vector<String>& query, redisCallbackFn *fn = NULL, void *privdata = NULL);
void ExecuteQueries(const std::vector<std::vector<String> >& queries, redisCallbackFn *fn = NULL, void *privdata = NULL);
void Disconnect();
bool IsConnected();
private:
static void StaticInitialize();
void SendMessageInternal(const std::vector<String>& query, redisCallbackFn *fn, void *privdata);
void AssertOnWorkQueue();
static void DisconnectCallback(const redisAsyncContext *c, int status);
void ExecuteQuery(const std::vector<String>& query, redisCallbackFn *fn = NULL, void *privdata = NULL);
WorkQueue m_RedisConnectionWorkQueue{100000};
void ExecuteQueries(const std::vector<std::vector<String> >& queries, redisCallbackFn *fn = NULL,
void *privdata = NULL);
redisAsyncContext *m_Context;
private:
static void StaticInitialize();
String m_Path;
String m_Host;
int m_Port;
};
void SendMessageInternal(const std::vector<String>& query, redisCallbackFn *fn, void *privdata);
void AssertOnWorkQueue();
void HandleRW();
static void DisconnectCallback(const redisAsyncContext *c, int status);
WorkQueue m_RedisConnectionWorkQueue{100000};
Timer::Ptr m_EventLoop;
redisAsyncContext *m_Context;
String m_Path;
String m_Host;
int m_Port;
String m_Password;
int m_DbIndex;
boost::mutex m_CMutex;
};
struct redis_error : virtual std::exception, virtual boost::exception { };
struct errinfo_redis_query_;
typedef boost::error_info<struct errinfo_redis_query_, std::string> errinfo_redis_query;
}
#endif //REDISCONNECTION_H

View File

@ -18,6 +18,7 @@
******************************************************************************/
#include "redis/rediswriter.hpp"
#include "redis/redisconnection.hpp"
#include "icinga/command.hpp"
#include "base/configtype.hpp"
#include "base/configobject.hpp"
@ -39,6 +40,7 @@
#include "base/initialize.hpp"
#include "base/convert.hpp"
#include "base/array.hpp"
#include "base/exception.hpp"
#include <map>
#include <set>
@ -56,77 +58,126 @@ void RedisWriter::ConfigStaticInitialize()
ConfigObject::OnVersionChanged.connect(std::bind(&RedisWriter::VersionChangedHandler, _1));
}
void RedisWriter::UpdateAllConfigObjects(void)
void RedisWriter::UpdateAllConfigObjects()
{
AssertOnWorkQueue();
double startTime = Utility::GetTime();
for (const Type::Ptr& type : Type::GetAllTypes()) {
ConfigType *ctype = dynamic_cast<ConfigType *>(type.get());
m_Rcon->ExecuteQuery({"flushall"});
// Use a Workqueue to pack objects in parallel
WorkQueue upq(25000, Configuration::Concurrency);
upq.SetName("RedisWriter:ConfigDump");
typedef std::pair<ConfigType*, String> TypePair;
std::vector<TypePair> types;
for (const Type::Ptr& type : Type::GetAllTypes()) {
ConfigType *ctype = dynamic_cast<ConfigType *>(type.get());
if (!ctype)
continue;
auto lcType(type->GetName().ToLower());
String lcType (type->GetName().ToLower());
types.emplace_back(ctype, lcType);
m_Rcon->ExecuteQuery({"DEL", m_PrefixConfigCheckSum + lcType, m_PrefixConfigObject + lcType, m_PrefixStatusObject + lcType});
}
ExecuteQuery({"MULTI"});
upq.ParallelFor(types, [this](const TypePair& type) {
size_t bulkCounter = 0;
auto attributes = new std::vector<String>();
attributes->emplace_back("HMSET");
attributes->emplace_back(m_PrefixConfigObject + type.second);
auto customVars = new std::vector<String>();
customVars->emplace_back("HMSET");
customVars->emplace_back(m_PrefixConfigCustomVar + type.second);
auto checksums = new std::vector<String>();
checksums->emplace_back("HMSET");
checksums->emplace_back(m_PrefixConfigCheckSum + type.second);
/* Delete obsolete object keys first. */
ExecuteQuery(
{"DEL", m_PrefixConfigCheckSum + lcType, m_PrefixConfigObject + lcType, m_PrefixStatusObject + lcType});
/* fetch all objects and dump them */
for (const ConfigObject::Ptr& object : ctype->GetObjects()) {
SendConfigUpdate(object, false);
SendStatusUpdate(object, false);
for (const ConfigObject::Ptr& object : type.first->GetObjects()) {
CreateConfigUpdate(object, *attributes, *customVars, *checksums, false);
SendStatusUpdate(object);
bulkCounter ++;
if (!bulkCounter % 100) {
if (attributes->size() > 2) {
m_Rcon->ExecuteQuery(*attributes);
attributes->erase(attributes->begin() + 2, attributes->end());
}
if (customVars->size() > 2) {
m_Rcon->ExecuteQuery(*customVars);
customVars->erase(customVars->begin() + 2, customVars->end());
}
if (checksums->size() > 2) {
m_Rcon->ExecuteQuery(*checksums);
checksums->erase(checksums->begin() + 2, checksums->end());
}
}
}
if (attributes->size() > 2)
m_Rcon->ExecuteQuery(*attributes);
if (customVars->size() > 2)
m_Rcon->ExecuteQuery(*customVars);
if (checksums->size() > 2)
m_Rcon->ExecuteQuery(*checksums);
/* publish config type dump finished */
ExecuteQuery({"PUBLISH", "icinga:config:dump", lcType});
Log(LogNotice, "RedisWriter")
<< "Dumped " << bulkCounter << " objects of type " << type.second;
});
ExecuteQuery({"EXEC"});
upq.Join();
if (upq.HasExceptions()) {
for (auto exc : upq.GetExceptions()) {
Log(LogCritical, "RedisWriter")
<< "Exception during ConfigDump: " << exc;
}
}
Log(LogInformation, "RedisWriter")
<< "Initial config/status dump finished in " << Utility::GetTime() - startTime << " seconds.";
}
static ConfigObject::Ptr GetHostGroup(const String& name)
template <typename ConfigType>
static ConfigObject::Ptr GetObjectByName(const String& name)
{
return ConfigObject::GetObject<HostGroup>(name);
return ConfigObject::GetObject<ConfigType>(name);
}
static ConfigObject::Ptr GetServiceGroup(const String& name)
// Used to update a single object, used for runtime updates
void RedisWriter::SendConfigUpdate(const ConfigObject::Ptr& object, bool runtimeUpdate)
{
return ConfigObject::GetObject<ServiceGroup>(name);
String typeName = object->GetReflectionType()->GetName().ToLower();
auto attributes = new std::vector<String>();
attributes->emplace_back("HSET");
attributes->emplace_back(m_PrefixConfigObject +typeName);
auto customVars = new std::vector<String>();
customVars->emplace_back("HSET");
customVars->emplace_back(m_PrefixConfigCustomVar + typeName);
auto checksums = new std::vector<String>();
checksums->emplace_back("HSET");
checksums->emplace_back(m_PrefixConfigCheckSum +typeName);
CreateConfigUpdate(object, *attributes, *customVars, *checksums, runtimeUpdate);
m_Rcon->ExecuteQuery(*attributes);
m_Rcon->ExecuteQuery(*customVars);
m_Rcon->ExecuteQuery(*checksums);
}
static ConfigObject::Ptr GetUserGroup(const String& name)
/* Creates a config update with computed checksums etc.
* Writes attributes, customVars and checksums into the respective supplied vectors. Adds two values to each vector
* (if applicable), first the key then the value. To use in a Redis command the command (e.g. HSET) and the key (e.g.
* icinga:config:object:downtime) need to be prepended. There is nothing to indicate success or failure.
*/
void RedisWriter::CreateConfigUpdate(const ConfigObject::Ptr& object, std::vector<String>& attributes, std::vector<String>& customVars, std::vector<String>& checksums, bool runtimeUpdate)
{
return ConfigObject::GetObject<UserGroup>(name);
}
static ConfigObject::Ptr GetClude(const String& name)
{
return ConfigObject::GetObject<TimePeriod>(name);
}
void RedisWriter::SendConfigUpdate(const ConfigObject::Ptr& object, bool useTransaction, bool runtimeUpdate)
{
AssertOnWorkQueue();
/* during startup we might send duplicated object config, ignore them without any connection */
if (!m_Context)
return;
/* TODO: This isn't essentially correct as we don't keep track of config objects ourselves. This would avoid duplicated config updates at startup.
if (!runtimeUpdate && m_ConfigDumpInProgress)
return;
*/
if (useTransaction)
ExecuteQuery({"MULTI"});
if (m_Rcon == nullptr)
return;
/* Calculate object specific checksums and store them in a different namespace. */
Type::Ptr type = object->GetReflectionType();
@ -158,7 +209,6 @@ void RedisWriter::SendConfigUpdate(const ConfigObject::Ptr& object, bool useTran
}
User::Ptr user = dynamic_pointer_cast<User>(object);
if (user) {
propertiesBlacklist.emplace("groups");
@ -166,7 +216,7 @@ void RedisWriter::SendConfigUpdate(const ConfigObject::Ptr& object, bool useTran
ConfigObject::Ptr (*getGroup)(const String& name);
groups = user->GetGroups();
getGroup = &::GetUserGroup;
getGroup = &::GetObjectByName<UserGroup>;
checkSums->Set("groups_checksum", CalculateCheckSumArray(groups));
@ -188,7 +238,6 @@ void RedisWriter::SendConfigUpdate(const ConfigObject::Ptr& object, bool useTran
}
Notification::Ptr notification = dynamic_pointer_cast<Notification>(object);
if (notification) {
Host::Ptr host;
Service::Ptr service;
@ -236,7 +285,6 @@ void RedisWriter::SendConfigUpdate(const ConfigObject::Ptr& object, bool useTran
/* Calculate checkable checksums. */
Checkable::Ptr checkable = dynamic_pointer_cast<Checkable>(object);
if (checkable) {
/* groups_checksum, group_checksums */
propertiesBlacklist.emplace("groups");
@ -251,13 +299,13 @@ void RedisWriter::SendConfigUpdate(const ConfigObject::Ptr& object, bool useTran
if (service) {
groups = service->GetGroups();
getGroup = &::GetServiceGroup;
getGroup = &::GetObjectByName<ServiceGroup>;
/* Calculate the host_checksum */
checkSums->Set("host_checksum", GetObjectIdentifier(host));
} else {
groups = host->GetGroups();
getGroup = &::GetHostGroup;
getGroup = &::GetObjectByName<HostGroup>;
}
checkSums->Set("groups_checksum", CalculateCheckSumArray(groups));
@ -298,139 +346,137 @@ void RedisWriter::SendConfigUpdate(const ConfigObject::Ptr& object, bool useTran
checkSums->Set("notes_url_checksum", CalculateCheckSumString(notesUrl));
if (!iconImage.IsEmpty())
checkSums->Set("icon_image_checksum", CalculateCheckSumString(iconImage));
} else {
Zone::Ptr zone = dynamic_pointer_cast<Zone>(object);
}
if (zone) {
propertiesBlacklist.emplace("endpoints");
Zone::Ptr zone = dynamic_pointer_cast<Zone>(object);
if (zone) {
propertiesBlacklist.emplace("endpoints");
auto endpointObjects = zone->GetEndpoints();
Array::Ptr endpoints = new Array();
endpoints->Resize(endpointObjects.size());
auto endpointObjects = zone->GetEndpoints();
Array::Ptr endpoints = new Array();
endpoints->Resize(endpointObjects.size());
Array::SizeType i = 0;
for (auto& endpointObject : endpointObjects) {
endpoints->Set(i++, endpointObject->GetName());
Array::SizeType i = 0;
for (auto& endpointObject : endpointObjects) {
endpoints->Set(i++, endpointObject->GetName());
}
checkSums->Set("endpoints_checksum", CalculateCheckSumArray(endpoints));
Array::Ptr parents(new Array);
for (auto& parent : zone->GetAllParentsRaw()) {
parents->Add(GetObjectIdentifier(parent));
}
checkSums->Set("all_parents_checksums", parents);
checkSums->Set("all_parents_checksum", HashValue(zone->GetAllParents()));
}
/* zone_checksum for endpoints already is calculated above. */
auto command(dynamic_pointer_cast<Command>(object));
if (command) {
Dictionary::Ptr arguments = command->GetArguments();
Dictionary::Ptr argumentChecksums = new Dictionary;
if (arguments) {
ObjectLock argumentsLock(arguments);
for (auto& kv : arguments) {
argumentChecksums->Set(kv.first, HashValue(kv.second));
}
}
checkSums->Set("endpoints_checksum", CalculateCheckSumArray(endpoints));
checkSums->Set("arguments_checksum", HashValue(arguments));
checkSums->Set("argument_checksums", argumentChecksums);
propertiesBlacklist.emplace("arguments");
Array::Ptr parents(new Array);
Dictionary::Ptr envvars = command->GetEnv();
Dictionary::Ptr envvarChecksums = new Dictionary;
for (auto& parent : zone->GetAllParentsRaw()) {
parents->Add(GetObjectIdentifier(parent));
if (envvars) {
ObjectLock argumentsLock(envvars);
for (auto& kv : envvars) {
envvarChecksums->Set(kv.first, HashValue(kv.second));
}
}
checkSums->Set("all_parents_checksums", parents);
checkSums->Set("all_parents_checksum", HashValue(zone->GetAllParents()));
checkSums->Set("envvars_checksum", HashValue(envvars));
checkSums->Set("envvar_checksums", envvarChecksums);
propertiesBlacklist.emplace("env");
}
auto timeperiod(dynamic_pointer_cast<TimePeriod>(object));
if (timeperiod) {
Dictionary::Ptr ranges = timeperiod->GetRanges();
checkSums->Set("ranges_checksum", HashValue(ranges));
propertiesBlacklist.emplace("ranges");
// Compute checksums for Includes (like groups)
Array::Ptr includes;
ConfigObject::Ptr (*getInclude)(const String& name);
includes = timeperiod->GetIncludes();
getInclude = &::GetObjectByName<TimePeriod>;
checkSums->Set("includes_checksum", CalculateCheckSumArray(includes));
Array::Ptr includeChecksums = new Array();
ObjectLock includesLock(includes);
ObjectLock includeChecksumsLock(includeChecksums);
for (auto include : includes) {
includeChecksums->Add(GetObjectIdentifier((*getInclude)(include.Get<String>())));
}
checkSums->Set("include_checksums", includeChecksums);
// Compute checksums for Excludes (like groups)
Array::Ptr excludes;
ConfigObject::Ptr (*getExclude)(const String& name);
excludes = timeperiod->GetExcludes();
getExclude = &::GetObjectByName<TimePeriod>;
checkSums->Set("excludes_checksum", CalculateCheckSumArray(excludes));
Array::Ptr excludeChecksums = new Array();
ObjectLock excludesLock(excludes);
ObjectLock excludeChecksumsLock(excludeChecksums);
for (auto exclude : excludes) {
excludeChecksums->Add(GetObjectIdentifier((*getExclude)(exclude.Get<String>())));
}
checkSums->Set("exclude_checksums", excludeChecksums);
}
icinga::Comment::Ptr comment = dynamic_pointer_cast<Comment>(object);
if (comment) {
propertiesBlacklist.emplace("name");
propertiesBlacklist.emplace("host_name");
Host::Ptr host;
Service::Ptr service;
tie(host, service) = GetHostService(comment->GetCheckable());
if (service) {
propertiesBlacklist.emplace("service_name");
checkSums->Set("service_checksum", GetObjectIdentifier(service));
typeName = "servicecomment";
} else {
/* zone_checksum for endpoints already is calculated above. */
auto command(dynamic_pointer_cast<Command>(object));
if (command) {
Dictionary::Ptr arguments = command->GetArguments();
Dictionary::Ptr argumentChecksums = new Dictionary;
if (arguments) {
ObjectLock argumentsLock(arguments);
for (auto& kv : arguments) {
argumentChecksums->Set(kv.first, HashValue(kv.second));
}
}
checkSums->Set("arguments_checksum", HashValue(arguments));
checkSums->Set("argument_checksums", argumentChecksums);
propertiesBlacklist.emplace("arguments");
Dictionary::Ptr envvars = command->GetEnv();
Dictionary::Ptr envvarChecksums = new Dictionary;
if (envvars) {
ObjectLock argumentsLock(envvars);
for (auto& kv : envvars) {
envvarChecksums->Set(kv.first, HashValue(kv.second));
}
}
checkSums->Set("envvars_checksum", HashValue(envvars));
checkSums->Set("envvar_checksums", envvarChecksums);
propertiesBlacklist.emplace("env");
} else {
auto timeperiod(dynamic_pointer_cast<TimePeriod>(object));
if (timeperiod) {
Dictionary::Ptr ranges = timeperiod->GetRanges();
checkSums->Set("ranges_checksum", HashValue(ranges));
propertiesBlacklist.emplace("ranges");
// Compute checksums for Includes (like groups)
Array::Ptr includes;
ConfigObject::Ptr (*getInclude)(const String& name);
includes = timeperiod->GetIncludes();
getInclude = &::GetClude;
checkSums->Set("includes_checksum", CalculateCheckSumArray(includes));
Array::Ptr includeChecksums = new Array();
ObjectLock includesLock(includes);
ObjectLock includeChecksumsLock(includeChecksums);
for (auto include : includes) {
includeChecksums->Add(GetObjectIdentifier((*getInclude)(include.Get<String>())));
}
checkSums->Set("include_checksums", includeChecksums);
// Compute checksums for Excludes (like groups)
Array::Ptr excludes;
ConfigObject::Ptr (*getExclude)(const String& name);
excludes = timeperiod->GetExcludes();
getExclude = &::GetClude;
checkSums->Set("excludes_checksum", CalculateCheckSumArray(excludes));
Array::Ptr excludeChecksums = new Array();
ObjectLock excludesLock(excludes);
ObjectLock excludeChecksumsLock(excludeChecksums);
for (auto exclude : excludes) {
excludeChecksums->Add(GetObjectIdentifier((*getExclude)(exclude.Get<String>())));
}
checkSums->Set("exclude_checksums", excludeChecksums);
} else {
icinga::Comment::Ptr comment = dynamic_pointer_cast<Comment>(object);
if (comment) {
propertiesBlacklist.emplace("name");
propertiesBlacklist.emplace("host_name");
Host::Ptr host;
Service::Ptr service;
tie(host, service) = GetHostService(comment->GetCheckable());
if (service) {
propertiesBlacklist.emplace("service_name");
checkSums->Set("service_checksum", GetObjectIdentifier(service));
typeName = "servicecomment";
} else {
checkSums->Set("host_checksum", GetObjectIdentifier(host));
typeName = "hostcomment";
}
}
}
}
checkSums->Set("host_checksum", GetObjectIdentifier(host));
typeName = "hostcomment";
}
}
/* Send all object attributes to Redis, no extra checksums involved here. */
UpdateObjectAttrs(m_PrefixConfigObject, object, FAConfig, typeName);
auto tempAttrs = (UpdateObjectAttrs(m_PrefixConfigObject, object, FAConfig, typeName));
attributes.insert(attributes.end(), std::begin(tempAttrs), std::end(tempAttrs));
/* Custom var checksums. */
CustomVarObject::Ptr customVarObject = dynamic_pointer_cast<CustomVarObject>(object);
@ -445,10 +491,8 @@ void RedisWriter::SendConfigUpdate(const ConfigObject::Ptr& object, bool useTran
if (vars) {
auto varsJson(JsonEncode(vars));
Log(LogDebug, "RedisWriter")
<< "HSET " << m_PrefixConfigCustomVar + typeName << " " << objectKey << " " << varsJson;
ExecuteQuery({"HSET", m_PrefixConfigCustomVar + typeName, objectKey, varsJson});
customVars.emplace_back(objectKey);
customVars.emplace_back(varsJson);
}
}
@ -457,56 +501,34 @@ void RedisWriter::SendConfigUpdate(const ConfigObject::Ptr& object, bool useTran
String checkSumsBody = JsonEncode(checkSums);
Log(LogDebug, "RedisWriter")
<< "HSET " << m_PrefixConfigCheckSum + typeName << " " << objectKey << " " << checkSumsBody;
ExecuteQuery({"HSET", m_PrefixConfigCheckSum + typeName, objectKey, checkSumsBody});
checksums.emplace_back(objectKey);
checksums.emplace_back(checkSumsBody);
/* Send an update event to subscribers. */
if (runtimeUpdate) {
ExecuteQuery({"PUBLISH", "icinga:config:update", typeName + ":" + objectKey});
m_Rcon->ExecuteQuery({"PUBLISH", "icinga:config:update", typeName + ":" + objectKey});
}
if (useTransaction)
ExecuteQuery({"EXEC"});
}
void RedisWriter::SendConfigDelete(const ConfigObject::Ptr& object)
{
AssertOnWorkQueue();
/* during startup we might send duplicated object config, ignore them without any connection */
if (!m_Context)
return;
String typeName = object->GetReflectionType()->GetName().ToLower();
String objectKey = GetObjectIdentifier(object);
ExecuteQueries({
m_Rcon->ExecuteQueries({
{"HDEL", m_PrefixConfigObject + typeName, objectKey},
{"DEL", m_PrefixStatusObject + typeName + ":" + objectKey},
{"PUBLISH", "icinga:config:delete", typeName + ":" + objectKey}
{"PUBLISH", "icinga:config:delete", typeName + ":" + objectKey}
});
}
void RedisWriter::SendStatusUpdate(const ConfigObject::Ptr& object, bool useTransaction)
void RedisWriter::SendStatusUpdate(const ConfigObject::Ptr& object)
{
AssertOnWorkQueue();
/* during startup we might receive check results, ignore them without any connection */
if (!m_Context)
return;
if (useTransaction)
ExecuteQuery({"MULTI"});
//TODO: Manage type names
UpdateObjectAttrs(m_PrefixStatusObject, object, FAState, "");
//TODO: Figure out what we need when we implement the history and state sync
// UpdateObjectAttrs(m_PrefixStatusObject, object, FAState, "");
if (useTransaction)
ExecuteQuery({"EXEC"});
// /* Serialize config object attributes */
// Dictionary::Ptr objectAttrs = SerializeObjectAttrs(object, FAState);
@ -586,7 +608,7 @@ void RedisWriter::SendStatusUpdate(const ConfigObject::Ptr& object, bool useTran
// }
}
void RedisWriter::UpdateObjectAttrs(const String& keyPrefix, const ConfigObject::Ptr& object, int fieldType,
std::vector<String> RedisWriter::UpdateObjectAttrs(const String& keyPrefix, const ConfigObject::Ptr& object, int fieldType,
const String& typeNameOverride)
{
Type::Ptr type = object->GetReflectionType();
@ -616,7 +638,8 @@ void RedisWriter::UpdateObjectAttrs(const String& keyPrefix, const ConfigObject:
if (!typeNameOverride.IsEmpty())
typeName = typeNameOverride.ToLower();
ExecuteQuery({"HSET", keyPrefix + typeName, GetObjectIdentifier(object), JsonEncode(attrs)});
return {GetObjectIdentifier(object), JsonEncode(attrs)};
//m_Rcon->ExecuteQuery({"HSET", keyPrefix + typeName, GetObjectIdentifier(object), JsonEncode(attrs)});
}
void RedisWriter::StateChangedHandler(const ConfigObject::Ptr& object)
@ -624,7 +647,7 @@ void RedisWriter::StateChangedHandler(const ConfigObject::Ptr& object)
Type::Ptr type = object->GetReflectionType();
for (const RedisWriter::Ptr& rw : ConfigType::GetObjectsByType<RedisWriter>()) {
rw->m_WorkQueue.Enqueue(std::bind(&RedisWriter::SendStatusUpdate, rw, object, true));
rw->m_WorkQueue.Enqueue(std::bind(&RedisWriter::SendStatusUpdate, rw, object));
}
}
@ -633,15 +656,15 @@ void RedisWriter::VersionChangedHandler(const ConfigObject::Ptr& object)
Type::Ptr type = object->GetReflectionType();
if (object->IsActive()) {
/* Create or update the object config */
// Create or update the object config
for (const RedisWriter::Ptr& rw : ConfigType::GetObjectsByType<RedisWriter>()) {
rw->m_WorkQueue.Enqueue(std::bind(&RedisWriter::SendConfigUpdate, rw.get(), object, true, true));
// rw->m_WorkQueue.Enqueue(std::bind(&RedisWriter::SendConfigUpdate, rw, object, true));
}
} else if (!object->IsActive() &&
object->GetExtension("ConfigObjectDeleted")) { /* same as in apilistener-configsync.cpp */
/* Delete object config */
object->GetExtension("ConfigObjectDeleted")) { // same as in apilistener-configsync.cpp
// Delete object config
for (const RedisWriter::Ptr& rw : ConfigType::GetObjectsByType<RedisWriter>()) {
rw->m_WorkQueue.Enqueue(std::bind(&RedisWriter::SendConfigDelete, rw.get(), object));
rw->m_WorkQueue.Enqueue(std::bind(&RedisWriter::SendConfigDelete, rw, object));
}
}
}

View File

@ -336,3 +336,33 @@ String RedisWriter::HashValue(const Value& value, const std::set<String>& proper
return SHA1(PackObject(temp));
}
//Used to duplicate a redisReply, needed as redisReplies are freed when the async callback finishes
redisReply* RedisWriter::dupReplyObject(redisReply* reply)
{
redisReply* r = (redisReply*)calloc(1, sizeof(*r));
memcpy(r, reply, sizeof(*r));
if(REDIS_REPLY_ERROR==reply->type || REDIS_REPLY_STRING==reply->type || REDIS_REPLY_STATUS==reply->type) //copy str
{
r->str = (char*)malloc(reply->len+1);
memcpy(r->str, reply->str, reply->len);
r->str[reply->len] = '\0';
}
else if(REDIS_REPLY_ARRAY==reply->type) //copy array
{
r->element = (redisReply**)calloc(reply->elements, sizeof(redisReply*));
memset(r->element, 0, r->elements*sizeof(redisReply*));
for(uint32_t i=0; i<reply->elements; ++i)
{
if(NULL!=reply->element[i])
{
if( NULL == (r->element[i] = dupReplyObject(reply->element[i])) )
{
//clone child failed, free current reply, and return NULL
freeReplyObject(r);
return NULL;
}
}
}
}
return r;
}

View File

@ -19,8 +19,11 @@
#include "redis/rediswriter.hpp"
#include "redis/rediswriter-ti.cpp"
#include "redis/redisconnection.hpp"
#include "remote/eventqueue.hpp"
#include "base/json.hpp"
#include "rediswriter.hpp"
#include <boost/algorithm/string.hpp>
using namespace icinga;
@ -31,8 +34,10 @@ using namespace icinga;
REGISTER_TYPE(RedisWriter);
RedisWriter::RedisWriter()
: m_Context(NULL)
: m_Rcon(nullptr)
{
m_Rcon = nullptr;
m_WorkQueue.SetName("RedisWriter");
m_PrefixConfigObject = "icinga:config:object:";
@ -52,6 +57,10 @@ void RedisWriter::Start(bool runtimeCreated)
<< "'" << GetName() << "' started.";
m_ConfigDumpInProgress = false;
m_ConfigDumpDone = false;
m_Rcon = new RedisConnection(GetHost(), GetPort(), GetPath(), GetPassword(), GetDbIndex());
m_Rcon->Start();
m_WorkQueue.SetExceptionCallback(std::bind(&RedisWriter::ExceptionHandler, this, _1));
@ -75,6 +84,7 @@ void RedisWriter::Start(bool runtimeCreated)
boost::thread thread(std::bind(&RedisWriter::HandleEvents, this));
thread.detach();
}
void RedisWriter::ExceptionHandler(boost::exception_ptr exp)
@ -83,11 +93,6 @@ void RedisWriter::ExceptionHandler(boost::exception_ptr exp)
Log(LogDebug, "RedisWriter")
<< "Exception during redis operation: " << DiagnosticInformation(exp);
if (m_Context) {
redisFree(m_Context);
m_Context = NULL;
}
}
void RedisWriter::ReconnectTimerHandler()
@ -99,55 +104,23 @@ void RedisWriter::TryToReconnect()
{
AssertOnWorkQueue();
if (m_Context)
if (m_ConfigDumpDone && m_Rcon->IsConnected())
return;
String path = GetPath();
String host = GetHost();
Log(LogInformation, "RedisWriter", "Trying to connect to redis server");
if (path.IsEmpty())
m_Context = redisConnect(host.CStr(), GetPort());
else
m_Context = redisConnectUnix(path.CStr());
if (!m_Context || m_Context->err) {
if (!m_Context) {
Log(LogWarning, "RedisWriter", "Cannot allocate redis context.");
} else {
Log(LogWarning, "RedisWriter", "Connection error: ")
<< m_Context->errstr;
}
if (m_Context) {
redisFree(m_Context);
m_Context = NULL;
}
return;
}
String password = GetPassword();
/* TODO: exception is fired but terminates reconnect silently.
* Error case: Password does not match, or even: "Client sent AUTH, but no password is set" which also results in an error.
*/
if (!password.IsEmpty())
ExecuteQuery({ "AUTH", password });
int dbIndex = GetDbIndex();
if (dbIndex != 0)
ExecuteQuery({ "SELECT", Convert::ToString(dbIndex) });
else if (!m_Rcon->IsConnected())
m_Rcon->Start();
UpdateSubscriptions();
if (m_ConfigDumpInProgress || m_ConfigDumpDone)
return;
/* Config dump */
m_ConfigDumpInProgress = true;
UpdateAllConfigObjects();
m_ConfigDumpDone = true;
m_ConfigDumpInProgress = false;
}
@ -164,15 +137,12 @@ void RedisWriter::UpdateSubscriptions()
m_Subscriptions.clear();
if (!m_Context)
return;
long long cursor = 0;
String keyPrefix = "icinga:subscription:";
do {
std::shared_ptr<redisReply> reply = ExecuteQuery({ "SCAN", Convert::ToString(cursor), "MATCH", keyPrefix + "*", "COUNT", "1000" });
auto reply = RedisGet({ "SCAN", Convert::ToString(cursor), "MATCH", keyPrefix + "*", "COUNT", "1000" });
VERIFY(reply->type == REDIS_REPLY_ARRAY);
VERIFY(reply->elements % 2 == 0);
@ -207,7 +177,7 @@ void RedisWriter::UpdateSubscriptions()
bool RedisWriter::GetSubscriptionTypes(String key, RedisSubscriptionInfo& rsi)
{
try {
std::shared_ptr<redisReply> redisReply = ExecuteQuery({ "SMEMBERS", key });
redisReply *redisReply = RedisGet({ "SMEMBERS", key });
VERIFY(redisReply->type == REDIS_REPLY_ARRAY);
if (redisReply->elements == 0)
@ -239,13 +209,10 @@ void RedisWriter::PublishStats()
{
AssertOnWorkQueue();
if (!m_Context)
return;
Dictionary::Ptr status = GetStats();
String jsonStats = JsonEncode(status);
ExecuteQuery({ "PUBLISH", "icinga:stats", jsonStats });
m_Rcon->ExecuteQuery({ "PUBLISH", "icinga:stats", jsonStats });
}
void RedisWriter::HandleEvents()
@ -288,9 +255,6 @@ void RedisWriter::HandleEvent(const Dictionary::Ptr& event)
{
AssertOnWorkQueue();
if (!m_Context)
return;
for (const std::pair<String, RedisSubscriptionInfo>& kv : m_Subscriptions) {
const auto& name = kv.first;
const auto& rsi = kv.second;
@ -300,10 +264,11 @@ void RedisWriter::HandleEvent(const Dictionary::Ptr& event)
String body = JsonEncode(event);
std::shared_ptr<redisReply> maxExists = ExecuteQuery({ "EXISTS", "icinga:subscription:" + name + ":limit" });
redisReply *maxExists = RedisGet({ "EXISTS", "icinga:subscription:" + name + ":limit" });
long maxEvents = MAX_EVENTS_DEFAULT;
if (maxExists->integer) {
std::shared_ptr<redisReply> redisReply = ExecuteQuery({ "GET", "icinga:subscription:" + name + ":limit"});
redisReply *redisReply =RedisGet({ "GET", "icinga:subscription:" + name + ":limit"});
VERIFY(redisReply->type == REDIS_REPLY_STRING);
Log(LogInformation, "RedisWriter")
@ -312,10 +277,11 @@ void RedisWriter::HandleEvent(const Dictionary::Ptr& event)
maxEvents = Convert::ToLong(redisReply->str);
}
ExecuteQuery({ "MULTI" });
ExecuteQuery({ "LPUSH", "icinga:event:" + name, body });
ExecuteQuery({ "LTRIM", "icinga:event:" + name, "0", String(maxEvents - 1)});
ExecuteQuery({ "EXEC" });
m_Rcon->ExecuteQueries({
{ "MULTI" },
{ "LPUSH", "icinga:event:" + name, body },
{ "LTRIM", "icinga:event:" + name, "0", String(maxEvents - 1)},
{ "EXEC" }});
}
}
@ -323,16 +289,14 @@ void RedisWriter::SendEvent(const Dictionary::Ptr& event)
{
AssertOnWorkQueue();
if (!m_Context)
return;
String body = JsonEncode(event);
// Log(LogInformation, "RedisWriter")
// << "Sending event \"" << body << "\"";
ExecuteQuery({ "PUBLISH", "icinga:event:all", body });
ExecuteQuery({ "PUBLISH", "icinga:event:" + event->Get("type"), body });
m_Rcon->ExecuteQueries({
{ "PUBLISH", "icinga:event:all", body },
{ "PUBLISH", "icinga:event:" + event->Get("type"), body }});
}
void RedisWriter::Stop(bool runtimeRemoved)
@ -348,95 +312,47 @@ void RedisWriter::AssertOnWorkQueue()
ASSERT(m_WorkQueue.IsWorkerThread());
}
std::shared_ptr<redisReply> RedisWriter::ExecuteQuery(const std::vector<String>& query)
{
const char **argv;
size_t *argvlen;
argv = new const char *[query.size()];
argvlen = new size_t[query.size()];
/*
* This whole spiel is required as we mostly use a "fire and forget" approach with the Redis Connection. To wait for a
* reply from Redis we have to wait for the callback to finish, this is done with the help of this struct. ready, cv
* and mtx are used for making sure we have the redisReply when we return.
*/
struct synchronousWait {
bool ready;
boost::condition_variable cv;
boost::mutex mtx;
redisReply* reply;
};
for (std::vector<String>::size_type i = 0; i < query.size(); i++) {
argv[i] = query[i].CStr();
argvlen[i] = query[i].GetLength();
}
void RedisWriter::RedisQueryCallback(redisAsyncContext *c, void *r, void *p) {
auto wait = (struct synchronousWait*) p;
auto rp = reinterpret_cast<redisReply *>(r);
redisReply *reply = reinterpret_cast<redisReply *>(redisCommandArgv(m_Context, query.size(), argv, argvlen));
delete [] argv;
delete [] argvlen;
if (r == NULL)
wait->reply = nullptr;
else
wait->reply = RedisWriter::dupReplyObject(rp);
if (reply->type == REDIS_REPLY_ERROR) {
Log(LogCritical, "RedisWriter")
<< "Redis query failed: " << reply->str;
String msg = reply->str;
freeReplyObject(reply);
BOOST_THROW_EXCEPTION(
redis_error()
<< errinfo_message(msg)
<< errinfo_redis_query(Utility::Join(Array::FromVector(query), ' ', false))
);
}
return std::shared_ptr<redisReply>(reply, freeReplyObject);
boost::mutex::scoped_lock lock(wait->mtx);
wait->ready = true;
wait->cv.notify_all();
}
std::vector<std::shared_ptr<redisReply> > RedisWriter::ExecuteQueries(const std::vector<std::vector<String> >& queries)
{
const char **argv;
size_t *argvlen;
for (const auto& query : queries) {
argv = new const char *[query.size()];
argvlen = new size_t[query.size()];
redisReply* RedisWriter::RedisGet(const std::vector<String>& query) {
auto *wait = new synchronousWait;
wait->ready = false;
for (std::vector<String>::size_type i = 0; i < query.size(); i++) {
argv[i] = query[i].CStr();
argvlen[i] = query[i].GetLength();
}
m_Rcon->ExecuteQuery(query, RedisQueryCallback, wait);
redisAppendCommandArgv(m_Context, query.size(), argv, argvlen);
delete [] argv;
delete [] argvlen;
boost::mutex::scoped_lock lock(wait->mtx);
while (!wait->ready) {
wait->cv.timed_wait(lock, boost::posix_time::milliseconds(long(15 * 1000)));
if (!wait->ready)
wait->ready = true;
}
std::vector<std::shared_ptr<redisReply> > replies;
for (size_t i = 0; i < queries.size(); i++) {
redisReply *rawReply;
if (redisGetReply(m_Context, reinterpret_cast<void **>(&rawReply)) == REDIS_ERR) {
BOOST_THROW_EXCEPTION(
redis_error()
<< errinfo_message("redisGetReply() failed")
);
}
std::shared_ptr<redisReply> reply(rawReply, freeReplyObject);
replies.push_back(reply);
}
for (size_t i = 0; i < queries.size(); i++) {
const auto& query = queries[i];
const auto& reply = replies[i];
if (reply->type == REDIS_REPLY_ERROR) {
Log(LogCritical, "RedisWriter")
<< "Redis query failed: " << reply->str;
String msg = reply->str;
BOOST_THROW_EXCEPTION(
redis_error()
<< errinfo_message(msg)
<< errinfo_redis_query(Utility::Join(Array::FromVector(query), ' ', false))
);
}
}
return replies;
}
return wait->reply;
}

View File

@ -25,6 +25,7 @@
#include "remote/messageorigin.hpp"
#include "base/timer.hpp"
#include "base/workqueue.hpp"
#include "redis/redisconnection.hpp"
#include <hiredis/hiredis.h>
namespace icinga
@ -66,10 +67,11 @@ private:
/* config & status dump */
void UpdateAllConfigObjects();
void SendConfigUpdate(const ConfigObject::Ptr& object, bool useTransaction, bool runtimeUpdate = false);
void SendConfigUpdate(const ConfigObject::Ptr& object, bool runtimeUpdate);
void CreateConfigUpdate(const ConfigObject::Ptr& object, std::vector<String>& attributes, std::vector<String>& customVars, std::vector<String>& checksums, bool runtimeUpdate);
void SendConfigDelete(const ConfigObject::Ptr& object);
void SendStatusUpdate(const ConfigObject::Ptr& object, bool useTransaction);
void UpdateObjectAttrs(const String& keyPrefix, const ConfigObject::Ptr& object, int fieldType, const String& typeNameOverride);
void SendStatusUpdate(const ConfigObject::Ptr& object);
std::vector<String> UpdateObjectAttrs(const String& keyPrefix, const ConfigObject::Ptr& object, int fieldType, const String& typeNameOverride);
/* Stats */
Dictionary::Ptr GetStats();
@ -96,14 +98,16 @@ private:
void ExceptionHandler(boost::exception_ptr exp);
std::shared_ptr<redisReply> ExecuteQuery(const std::vector<String>& query);
std::vector<std::shared_ptr<redisReply> > ExecuteQueries(const std::vector<std::vector<String> >& queries);
//Used to get a reply from the asyncronous connection
redisReply* RedisGet(const std::vector<String>& query);
static void RedisQueryCallback(redisAsyncContext *c, void *r, void *p);
static redisReply* dupReplyObject(redisReply* reply);
Timer::Ptr m_StatsTimer;
Timer::Ptr m_ReconnectTimer;
Timer::Ptr m_SubscriptionTimer;
WorkQueue m_WorkQueue;
redisContext *m_Context;
std::map<String, RedisSubscriptionInfo> m_Subscriptions;
String m_PrefixConfigObject;
@ -112,13 +116,10 @@ private:
String m_PrefixStatusObject;
bool m_ConfigDumpInProgress;
bool m_ConfigDumpDone;
RedisConnection::Ptr m_Rcon;
};
struct redis_error : virtual std::exception, virtual boost::exception { };
struct errinfo_redis_query_;
typedef boost::error_info<struct errinfo_redis_query_, std::string> errinfo_redis_query;
}
#endif /* REDISWRITER_H */