diff --git a/lib/redis/redisconnection.cpp b/lib/redis/redisconnection.cpp index 721433339..db5fb2742 100644 --- a/lib/redis/redisconnection.cpp +++ b/lib/redis/redisconnection.cpp @@ -20,30 +20,31 @@ #include "base/object.hpp" #include "redis/redisconnection.hpp" #include "base/workqueue.hpp" -#include -#include +#include "base/logger.hpp" +#include "base/convert.hpp" #include "base/utility.hpp" #include "redis/rediswriter.hpp" +#include "hiredis/hiredis.h" + using namespace icinga; -/* -struct redis_error : virtual std::exception, virtual boost::exception { }; -struct errinfo_redis_query_; -typedef boost::error_info errinfo_redis_query; -*/ -RedisConnection::RedisConnection(const String host, const int port, const String path) : -m_Host(host), m_Port(port), m_Path(path) { + +RedisConnection::RedisConnection(const String host, const int port, const String path, const String password, const int db) : + m_Host(host), m_Port(port), m_Path(path), m_Password(password), m_DbIndex(db), m_Context(NULL) +{ m_RedisConnectionWorkQueue.SetName("RedisConnection"); } void RedisConnection::StaticInitialize() { - } void RedisConnection::Start() { RedisConnection::Connect(); + + std::thread thread(std::bind(&RedisConnection::HandleRW, this)); + thread.detach(); } void RedisConnection::AssertOnWorkQueue() @@ -51,11 +52,31 @@ void RedisConnection::AssertOnWorkQueue() ASSERT(m_RedisConnectionWorkQueue.IsWorkerThread()); } -void RedisConnection::Connect() { +void RedisConnection::HandleRW() +{ + Utility::SetThreadName("RedisConnection Handler"); + + for (;;) { + try { + { + boost::mutex::scoped_lock lock(m_CMutex); + redisAsyncHandleWrite(m_Context); + redisAsyncHandleRead(m_Context); + } + Utility::Sleep(0.1); + } catch (const std::exception&) { + Log(LogCritical, "RedisWriter", "Internal Redis Error"); + } + } +} + +void RedisConnection::Connect() +{ if (m_Context) return; Log(LogInformation, "RedisWriter", "Trying to connect to redis server Async"); + boost::mutex::scoped_lock lock(m_CMutex); if (m_Path.IsEmpty()) m_Context = redisAsyncConnect(m_Host.CStr(), m_Port); @@ -78,7 +99,16 @@ void RedisConnection::Connect() { redisAsyncSetDisconnectCallback(m_Context, &DisconnectCallback); - //TODO: Authentication, DB selection, error handling + /* TODO: This currently does not work properly: + * In case of error the connection is broken, yet the Context is not set to faulty. May be a bug with hiredis. + * Error case: Password does not match, or even: "Client sent AUTH, but no password is set" which also results in an error. + */ + if (!m_Password.IsEmpty()) { + ExecuteQuery({"AUTH", m_Password}); + } + + if (m_DbIndex != 0) + ExecuteQuery({"SELECT", Convert::ToString(m_DbIndex)}); } void RedisConnection::Disconnect() @@ -86,19 +116,31 @@ void RedisConnection::Disconnect() redisAsyncDisconnect(m_Context); } -void RedisConnection::DisconnectCallback(const redisAsyncContext *c, int status) { +void RedisConnection::DisconnectCallback(const redisAsyncContext *c, int status) +{ if (status == REDIS_OK) Log(LogInformation, "RedisWriter") << "Redis disconnected by us"; - else - Log(LogCritical, "Rediswriter") << "Redis disconnected for reasons"; + else { + if (c->err != 0) + Log(LogCritical, "RedisWriter") << "Redis disconnected by server. Reason: " << c->errstr; + else + Log(LogCritical, "RedisWriter") << "Redis disconnected by server"; + } } + +bool RedisConnection::IsConnected() +{ + return (REDIS_CONNECTED & m_Context->c.flags) == REDIS_CONNECTED; +} + void RedisConnection::ExecuteQuery(const std::vector& query, redisCallbackFn *fn, void *privdata) { m_RedisConnectionWorkQueue.Enqueue(std::bind(&RedisConnection::SendMessageInternal, this, query, fn, privdata)); } -void RedisConnection::ExecuteQueries(const std::vector >& queries, redisCallbackFn *fn, void *privdata) +void +RedisConnection::ExecuteQueries(const std::vector >& queries, redisCallbackFn *fn, void *privdata) { for (const auto& query : queries) { m_RedisConnectionWorkQueue.Enqueue(std::bind(&RedisConnection::SendMessageInternal, this, query, fn, privdata)); @@ -109,6 +151,8 @@ void RedisConnection::SendMessageInternal(const std::vector& query, redi { AssertOnWorkQueue(); + boost::mutex::scoped_lock lock(m_CMutex); + if (!m_Context) { Log(LogCritical, "RedisWriter") << "Connection lost"; @@ -120,16 +164,21 @@ void RedisConnection::SendMessageInternal(const std::vector& query, redi argv = new const char *[query.size()]; argvlen = new size_t[query.size()]; + String debugstr; for (std::vector::size_type i = 0; i < query.size(); i++) { argv[i] = query[i].CStr(); argvlen[i] = query[i].GetLength(); + debugstr += argv[i]; + debugstr += " "; } + Log(LogDebug, "RedisWriter, Connection") + << "Sending Command: " << debugstr; int r = redisAsyncCommandArgv(m_Context, fn, privdata, query.size(), argv, argvlen); - delete [] argv; - delete [] argvlen; + delete[] argv; + delete[] argvlen; if (r == REDIS_REPLY_ERROR) { Log(LogCritical, "RedisWriter") @@ -137,8 +186,7 @@ void RedisConnection::SendMessageInternal(const std::vector& query, redi BOOST_THROW_EXCEPTION( redis_error() - << errinfo_redis_query("FUCK") << errinfo_redis_query(Utility::Join(Array::FromVector(query), ' ', false)) ); } -} \ No newline at end of file +} diff --git a/lib/redis/redisconnection.hpp b/lib/redis/redisconnection.hpp index f23753235..c968c15af 100644 --- a/lib/redis/redisconnection.hpp +++ b/lib/redis/redisconnection.hpp @@ -24,43 +24,62 @@ #include "base/object.hpp" #include "base/workqueue.hpp" -namespace icinga { +namespace icinga +{ /** * An Async Redis connection. * * @ingroup redis */ -class RedisConnection final : public Object -{ -public: - DECLARE_PTR_TYPEDEFS(RedisConnection); + class RedisConnection final : public Object + { + public: + DECLARE_PTR_TYPEDEFS(RedisConnection); - RedisConnection(const String host, const int port, const String path); + RedisConnection(const String host, const int port, const String path, const String password = "", const int db = 0); - void Start(); + void Start(); - void Connect(); - void Disconnect(); + void Connect(); - void ExecuteQuery(const std::vector& query, redisCallbackFn *fn = NULL, void *privdata = NULL); - void ExecuteQueries(const std::vector >& queries, redisCallbackFn *fn = NULL, void *privdata = NULL); + void Disconnect(); + bool IsConnected(); -private: - static void StaticInitialize(); - void SendMessageInternal(const std::vector& query, redisCallbackFn *fn, void *privdata); - void AssertOnWorkQueue(); - static void DisconnectCallback(const redisAsyncContext *c, int status); + void ExecuteQuery(const std::vector& query, redisCallbackFn *fn = NULL, void *privdata = NULL); - WorkQueue m_RedisConnectionWorkQueue{100000}; + void ExecuteQueries(const std::vector >& queries, redisCallbackFn *fn = NULL, + void *privdata = NULL); - redisAsyncContext *m_Context; + private: + static void StaticInitialize(); - String m_Path; - String m_Host; - int m_Port; -}; + void SendMessageInternal(const std::vector& query, redisCallbackFn *fn, void *privdata); + void AssertOnWorkQueue(); + + void HandleRW(); + + static void DisconnectCallback(const redisAsyncContext *c, int status); + + WorkQueue m_RedisConnectionWorkQueue{100000}; + Timer::Ptr m_EventLoop; + + redisAsyncContext *m_Context; + + String m_Path; + String m_Host; + int m_Port; + String m_Password; + int m_DbIndex; + + boost::mutex m_CMutex; + }; + + struct redis_error : virtual std::exception, virtual boost::exception { }; + + struct errinfo_redis_query_; + typedef boost::error_info errinfo_redis_query; } #endif //REDISCONNECTION_H diff --git a/lib/redis/rediswriter-objects.cpp b/lib/redis/rediswriter-objects.cpp index 7578bd150..9e36dcbff 100644 --- a/lib/redis/rediswriter-objects.cpp +++ b/lib/redis/rediswriter-objects.cpp @@ -18,6 +18,7 @@ ******************************************************************************/ #include "redis/rediswriter.hpp" +#include "redis/redisconnection.hpp" #include "icinga/command.hpp" #include "base/configtype.hpp" #include "base/configobject.hpp" @@ -39,6 +40,7 @@ #include "base/initialize.hpp" #include "base/convert.hpp" #include "base/array.hpp" +#include "base/exception.hpp" #include #include @@ -56,77 +58,126 @@ void RedisWriter::ConfigStaticInitialize() ConfigObject::OnVersionChanged.connect(std::bind(&RedisWriter::VersionChangedHandler, _1)); } -void RedisWriter::UpdateAllConfigObjects(void) +void RedisWriter::UpdateAllConfigObjects() { - AssertOnWorkQueue(); - double startTime = Utility::GetTime(); - for (const Type::Ptr& type : Type::GetAllTypes()) { - ConfigType *ctype = dynamic_cast(type.get()); + m_Rcon->ExecuteQuery({"flushall"}); + // Use a Workqueue to pack objects in parallel + WorkQueue upq(25000, Configuration::Concurrency); + upq.SetName("RedisWriter:ConfigDump"); + + typedef std::pair TypePair; + std::vector types; + + for (const Type::Ptr& type : Type::GetAllTypes()) { + ConfigType *ctype = dynamic_cast(type.get()); if (!ctype) continue; - auto lcType(type->GetName().ToLower()); + String lcType (type->GetName().ToLower()); + types.emplace_back(ctype, lcType); + m_Rcon->ExecuteQuery({"DEL", m_PrefixConfigCheckSum + lcType, m_PrefixConfigObject + lcType, m_PrefixStatusObject + lcType}); + } - ExecuteQuery({"MULTI"}); + upq.ParallelFor(types, [this](const TypePair& type) { + size_t bulkCounter = 0; + auto attributes = new std::vector(); + attributes->emplace_back("HMSET"); + attributes->emplace_back(m_PrefixConfigObject + type.second); + auto customVars = new std::vector(); + customVars->emplace_back("HMSET"); + customVars->emplace_back(m_PrefixConfigCustomVar + type.second); + auto checksums = new std::vector(); + checksums->emplace_back("HMSET"); + checksums->emplace_back(m_PrefixConfigCheckSum + type.second); - /* Delete obsolete object keys first. */ - ExecuteQuery( - {"DEL", m_PrefixConfigCheckSum + lcType, m_PrefixConfigObject + lcType, m_PrefixStatusObject + lcType}); - /* fetch all objects and dump them */ - for (const ConfigObject::Ptr& object : ctype->GetObjects()) { - SendConfigUpdate(object, false); - SendStatusUpdate(object, false); + for (const ConfigObject::Ptr& object : type.first->GetObjects()) { + CreateConfigUpdate(object, *attributes, *customVars, *checksums, false); + SendStatusUpdate(object); + bulkCounter ++; + if (!bulkCounter % 100) { + if (attributes->size() > 2) { + m_Rcon->ExecuteQuery(*attributes); + attributes->erase(attributes->begin() + 2, attributes->end()); + } + if (customVars->size() > 2) { + m_Rcon->ExecuteQuery(*customVars); + customVars->erase(customVars->begin() + 2, customVars->end()); + } + if (checksums->size() > 2) { + m_Rcon->ExecuteQuery(*checksums); + checksums->erase(checksums->begin() + 2, checksums->end()); + } + } } + if (attributes->size() > 2) + m_Rcon->ExecuteQuery(*attributes); + if (customVars->size() > 2) + m_Rcon->ExecuteQuery(*customVars); + if (checksums->size() > 2) + m_Rcon->ExecuteQuery(*checksums); - /* publish config type dump finished */ - ExecuteQuery({"PUBLISH", "icinga:config:dump", lcType}); + Log(LogNotice, "RedisWriter") + << "Dumped " << bulkCounter << " objects of type " << type.second; + }); - ExecuteQuery({"EXEC"}); + upq.Join(); + + if (upq.HasExceptions()) { + for (auto exc : upq.GetExceptions()) { + Log(LogCritical, "RedisWriter") + << "Exception during ConfigDump: " << exc; + } } Log(LogInformation, "RedisWriter") << "Initial config/status dump finished in " << Utility::GetTime() - startTime << " seconds."; } -static ConfigObject::Ptr GetHostGroup(const String& name) +template +static ConfigObject::Ptr GetObjectByName(const String& name) { - return ConfigObject::GetObject(name); + return ConfigObject::GetObject(name); } -static ConfigObject::Ptr GetServiceGroup(const String& name) +// Used to update a single object, used for runtime updates +void RedisWriter::SendConfigUpdate(const ConfigObject::Ptr& object, bool runtimeUpdate) { - return ConfigObject::GetObject(name); + String typeName = object->GetReflectionType()->GetName().ToLower(); + auto attributes = new std::vector(); + attributes->emplace_back("HSET"); + attributes->emplace_back(m_PrefixConfigObject +typeName); + auto customVars = new std::vector(); + customVars->emplace_back("HSET"); + customVars->emplace_back(m_PrefixConfigCustomVar + typeName); + auto checksums = new std::vector(); + checksums->emplace_back("HSET"); + checksums->emplace_back(m_PrefixConfigCheckSum +typeName); + + CreateConfigUpdate(object, *attributes, *customVars, *checksums, runtimeUpdate); + + m_Rcon->ExecuteQuery(*attributes); + m_Rcon->ExecuteQuery(*customVars); + m_Rcon->ExecuteQuery(*checksums); } -static ConfigObject::Ptr GetUserGroup(const String& name) +/* Creates a config update with computed checksums etc. + * Writes attributes, customVars and checksums into the respective supplied vectors. Adds two values to each vector + * (if applicable), first the key then the value. To use in a Redis command the command (e.g. HSET) and the key (e.g. + * icinga:config:object:downtime) need to be prepended. There is nothing to indicate success or failure. + */ +void RedisWriter::CreateConfigUpdate(const ConfigObject::Ptr& object, std::vector& attributes, std::vector& customVars, std::vector& checksums, bool runtimeUpdate) { - return ConfigObject::GetObject(name); -} - -static ConfigObject::Ptr GetClude(const String& name) -{ - return ConfigObject::GetObject(name); -} - -void RedisWriter::SendConfigUpdate(const ConfigObject::Ptr& object, bool useTransaction, bool runtimeUpdate) -{ - AssertOnWorkQueue(); - - /* during startup we might send duplicated object config, ignore them without any connection */ - if (!m_Context) - return; - /* TODO: This isn't essentially correct as we don't keep track of config objects ourselves. This would avoid duplicated config updates at startup. if (!runtimeUpdate && m_ConfigDumpInProgress) return; */ - if (useTransaction) - ExecuteQuery({"MULTI"}); + if (m_Rcon == nullptr) + return; /* Calculate object specific checksums and store them in a different namespace. */ Type::Ptr type = object->GetReflectionType(); @@ -158,7 +209,6 @@ void RedisWriter::SendConfigUpdate(const ConfigObject::Ptr& object, bool useTran } User::Ptr user = dynamic_pointer_cast(object); - if (user) { propertiesBlacklist.emplace("groups"); @@ -166,7 +216,7 @@ void RedisWriter::SendConfigUpdate(const ConfigObject::Ptr& object, bool useTran ConfigObject::Ptr (*getGroup)(const String& name); groups = user->GetGroups(); - getGroup = &::GetUserGroup; + getGroup = &::GetObjectByName; checkSums->Set("groups_checksum", CalculateCheckSumArray(groups)); @@ -188,7 +238,6 @@ void RedisWriter::SendConfigUpdate(const ConfigObject::Ptr& object, bool useTran } Notification::Ptr notification = dynamic_pointer_cast(object); - if (notification) { Host::Ptr host; Service::Ptr service; @@ -236,7 +285,6 @@ void RedisWriter::SendConfigUpdate(const ConfigObject::Ptr& object, bool useTran /* Calculate checkable checksums. */ Checkable::Ptr checkable = dynamic_pointer_cast(object); - if (checkable) { /* groups_checksum, group_checksums */ propertiesBlacklist.emplace("groups"); @@ -251,13 +299,13 @@ void RedisWriter::SendConfigUpdate(const ConfigObject::Ptr& object, bool useTran if (service) { groups = service->GetGroups(); - getGroup = &::GetServiceGroup; + getGroup = &::GetObjectByName; /* Calculate the host_checksum */ checkSums->Set("host_checksum", GetObjectIdentifier(host)); } else { groups = host->GetGroups(); - getGroup = &::GetHostGroup; + getGroup = &::GetObjectByName; } checkSums->Set("groups_checksum", CalculateCheckSumArray(groups)); @@ -298,139 +346,137 @@ void RedisWriter::SendConfigUpdate(const ConfigObject::Ptr& object, bool useTran checkSums->Set("notes_url_checksum", CalculateCheckSumString(notesUrl)); if (!iconImage.IsEmpty()) checkSums->Set("icon_image_checksum", CalculateCheckSumString(iconImage)); - } else { - Zone::Ptr zone = dynamic_pointer_cast(object); + } - if (zone) { - propertiesBlacklist.emplace("endpoints"); + Zone::Ptr zone = dynamic_pointer_cast(object); + if (zone) { + propertiesBlacklist.emplace("endpoints"); - auto endpointObjects = zone->GetEndpoints(); - Array::Ptr endpoints = new Array(); - endpoints->Resize(endpointObjects.size()); + auto endpointObjects = zone->GetEndpoints(); + Array::Ptr endpoints = new Array(); + endpoints->Resize(endpointObjects.size()); - Array::SizeType i = 0; - for (auto& endpointObject : endpointObjects) { - endpoints->Set(i++, endpointObject->GetName()); + Array::SizeType i = 0; + for (auto& endpointObject : endpointObjects) { + endpoints->Set(i++, endpointObject->GetName()); + } + + checkSums->Set("endpoints_checksum", CalculateCheckSumArray(endpoints)); + + Array::Ptr parents(new Array); + + for (auto& parent : zone->GetAllParentsRaw()) { + parents->Add(GetObjectIdentifier(parent)); + } + + checkSums->Set("all_parents_checksums", parents); + checkSums->Set("all_parents_checksum", HashValue(zone->GetAllParents())); + } + + /* zone_checksum for endpoints already is calculated above. */ + + auto command(dynamic_pointer_cast(object)); + if (command) { + Dictionary::Ptr arguments = command->GetArguments(); + Dictionary::Ptr argumentChecksums = new Dictionary; + + if (arguments) { + ObjectLock argumentsLock(arguments); + + for (auto& kv : arguments) { + argumentChecksums->Set(kv.first, HashValue(kv.second)); } + } - checkSums->Set("endpoints_checksum", CalculateCheckSumArray(endpoints)); + checkSums->Set("arguments_checksum", HashValue(arguments)); + checkSums->Set("argument_checksums", argumentChecksums); + propertiesBlacklist.emplace("arguments"); - Array::Ptr parents(new Array); + Dictionary::Ptr envvars = command->GetEnv(); + Dictionary::Ptr envvarChecksums = new Dictionary; - for (auto& parent : zone->GetAllParentsRaw()) { - parents->Add(GetObjectIdentifier(parent)); + if (envvars) { + ObjectLock argumentsLock(envvars); + + for (auto& kv : envvars) { + envvarChecksums->Set(kv.first, HashValue(kv.second)); } + } - checkSums->Set("all_parents_checksums", parents); - checkSums->Set("all_parents_checksum", HashValue(zone->GetAllParents())); + checkSums->Set("envvars_checksum", HashValue(envvars)); + checkSums->Set("envvar_checksums", envvarChecksums); + propertiesBlacklist.emplace("env"); + } + + auto timeperiod(dynamic_pointer_cast(object)); + if (timeperiod) { + Dictionary::Ptr ranges = timeperiod->GetRanges(); + + checkSums->Set("ranges_checksum", HashValue(ranges)); + propertiesBlacklist.emplace("ranges"); + + // Compute checksums for Includes (like groups) + Array::Ptr includes; + ConfigObject::Ptr (*getInclude)(const String& name); + + includes = timeperiod->GetIncludes(); + getInclude = &::GetObjectByName; + + checkSums->Set("includes_checksum", CalculateCheckSumArray(includes)); + + Array::Ptr includeChecksums = new Array(); + + ObjectLock includesLock(includes); + ObjectLock includeChecksumsLock(includeChecksums); + + for (auto include : includes) { + includeChecksums->Add(GetObjectIdentifier((*getInclude)(include.Get()))); + } + + checkSums->Set("include_checksums", includeChecksums); + + // Compute checksums for Excludes (like groups) + Array::Ptr excludes; + ConfigObject::Ptr (*getExclude)(const String& name); + + excludes = timeperiod->GetExcludes(); + getExclude = &::GetObjectByName; + + checkSums->Set("excludes_checksum", CalculateCheckSumArray(excludes)); + + Array::Ptr excludeChecksums = new Array(); + + ObjectLock excludesLock(excludes); + ObjectLock excludeChecksumsLock(excludeChecksums); + + for (auto exclude : excludes) { + excludeChecksums->Add(GetObjectIdentifier((*getExclude)(exclude.Get()))); + } + + checkSums->Set("exclude_checksums", excludeChecksums); + } + + icinga::Comment::Ptr comment = dynamic_pointer_cast(object); + if (comment) { + propertiesBlacklist.emplace("name"); + propertiesBlacklist.emplace("host_name"); + + Host::Ptr host; + Service::Ptr service; + tie(host, service) = GetHostService(comment->GetCheckable()); + if (service) { + propertiesBlacklist.emplace("service_name"); + checkSums->Set("service_checksum", GetObjectIdentifier(service)); + typeName = "servicecomment"; } else { - /* zone_checksum for endpoints already is calculated above. */ - - auto command(dynamic_pointer_cast(object)); - - if (command) { - Dictionary::Ptr arguments = command->GetArguments(); - Dictionary::Ptr argumentChecksums = new Dictionary; - - if (arguments) { - ObjectLock argumentsLock(arguments); - - for (auto& kv : arguments) { - argumentChecksums->Set(kv.first, HashValue(kv.second)); - } - } - - checkSums->Set("arguments_checksum", HashValue(arguments)); - checkSums->Set("argument_checksums", argumentChecksums); - propertiesBlacklist.emplace("arguments"); - - Dictionary::Ptr envvars = command->GetEnv(); - Dictionary::Ptr envvarChecksums = new Dictionary; - - if (envvars) { - ObjectLock argumentsLock(envvars); - - for (auto& kv : envvars) { - envvarChecksums->Set(kv.first, HashValue(kv.second)); - } - } - - checkSums->Set("envvars_checksum", HashValue(envvars)); - checkSums->Set("envvar_checksums", envvarChecksums); - propertiesBlacklist.emplace("env"); - } else { - auto timeperiod(dynamic_pointer_cast(object)); - - if (timeperiod) { - Dictionary::Ptr ranges = timeperiod->GetRanges(); - - checkSums->Set("ranges_checksum", HashValue(ranges)); - propertiesBlacklist.emplace("ranges"); - - // Compute checksums for Includes (like groups) - Array::Ptr includes; - ConfigObject::Ptr (*getInclude)(const String& name); - - includes = timeperiod->GetIncludes(); - getInclude = &::GetClude; - - checkSums->Set("includes_checksum", CalculateCheckSumArray(includes)); - - Array::Ptr includeChecksums = new Array(); - - ObjectLock includesLock(includes); - ObjectLock includeChecksumsLock(includeChecksums); - - for (auto include : includes) { - includeChecksums->Add(GetObjectIdentifier((*getInclude)(include.Get()))); - } - - checkSums->Set("include_checksums", includeChecksums); - - // Compute checksums for Excludes (like groups) - Array::Ptr excludes; - ConfigObject::Ptr (*getExclude)(const String& name); - - excludes = timeperiod->GetExcludes(); - getExclude = &::GetClude; - - checkSums->Set("excludes_checksum", CalculateCheckSumArray(excludes)); - - Array::Ptr excludeChecksums = new Array(); - - ObjectLock excludesLock(excludes); - ObjectLock excludeChecksumsLock(excludeChecksums); - - for (auto exclude : excludes) { - excludeChecksums->Add(GetObjectIdentifier((*getExclude)(exclude.Get()))); - } - - checkSums->Set("exclude_checksums", excludeChecksums); - } else { - icinga::Comment::Ptr comment = dynamic_pointer_cast(object); - if (comment) { - propertiesBlacklist.emplace("name"); - propertiesBlacklist.emplace("host_name"); - - Host::Ptr host; - Service::Ptr service; - tie(host, service) = GetHostService(comment->GetCheckable()); - if (service) { - propertiesBlacklist.emplace("service_name"); - checkSums->Set("service_checksum", GetObjectIdentifier(service)); - typeName = "servicecomment"; - } else { - checkSums->Set("host_checksum", GetObjectIdentifier(host)); - typeName = "hostcomment"; - } - } - } - } + checkSums->Set("host_checksum", GetObjectIdentifier(host)); + typeName = "hostcomment"; } } /* Send all object attributes to Redis, no extra checksums involved here. */ - UpdateObjectAttrs(m_PrefixConfigObject, object, FAConfig, typeName); + auto tempAttrs = (UpdateObjectAttrs(m_PrefixConfigObject, object, FAConfig, typeName)); + attributes.insert(attributes.end(), std::begin(tempAttrs), std::end(tempAttrs)); /* Custom var checksums. */ CustomVarObject::Ptr customVarObject = dynamic_pointer_cast(object); @@ -445,10 +491,8 @@ void RedisWriter::SendConfigUpdate(const ConfigObject::Ptr& object, bool useTran if (vars) { auto varsJson(JsonEncode(vars)); - Log(LogDebug, "RedisWriter") - << "HSET " << m_PrefixConfigCustomVar + typeName << " " << objectKey << " " << varsJson; - - ExecuteQuery({"HSET", m_PrefixConfigCustomVar + typeName, objectKey, varsJson}); + customVars.emplace_back(objectKey); + customVars.emplace_back(varsJson); } } @@ -457,56 +501,34 @@ void RedisWriter::SendConfigUpdate(const ConfigObject::Ptr& object, bool useTran String checkSumsBody = JsonEncode(checkSums); - Log(LogDebug, "RedisWriter") - << "HSET " << m_PrefixConfigCheckSum + typeName << " " << objectKey << " " << checkSumsBody; - - ExecuteQuery({"HSET", m_PrefixConfigCheckSum + typeName, objectKey, checkSumsBody}); + checksums.emplace_back(objectKey); + checksums.emplace_back(checkSumsBody); /* Send an update event to subscribers. */ if (runtimeUpdate) { - ExecuteQuery({"PUBLISH", "icinga:config:update", typeName + ":" + objectKey}); + m_Rcon->ExecuteQuery({"PUBLISH", "icinga:config:update", typeName + ":" + objectKey}); } - - if (useTransaction) - ExecuteQuery({"EXEC"}); } void RedisWriter::SendConfigDelete(const ConfigObject::Ptr& object) { - AssertOnWorkQueue(); - - /* during startup we might send duplicated object config, ignore them without any connection */ - if (!m_Context) - return; - String typeName = object->GetReflectionType()->GetName().ToLower(); String objectKey = GetObjectIdentifier(object); - ExecuteQueries({ + m_Rcon->ExecuteQueries({ {"HDEL", m_PrefixConfigObject + typeName, objectKey}, {"DEL", m_PrefixStatusObject + typeName + ":" + objectKey}, - {"PUBLISH", "icinga:config:delete", typeName + ":" + objectKey} + {"PUBLISH", "icinga:config:delete", typeName + ":" + objectKey} }); - } -void RedisWriter::SendStatusUpdate(const ConfigObject::Ptr& object, bool useTransaction) +void RedisWriter::SendStatusUpdate(const ConfigObject::Ptr& object) { - AssertOnWorkQueue(); - - /* during startup we might receive check results, ignore them without any connection */ - if (!m_Context) - return; - - if (useTransaction) - ExecuteQuery({"MULTI"}); - //TODO: Manage type names - UpdateObjectAttrs(m_PrefixStatusObject, object, FAState, ""); + //TODO: Figure out what we need when we implement the history and state sync +// UpdateObjectAttrs(m_PrefixStatusObject, object, FAState, ""); - if (useTransaction) - ExecuteQuery({"EXEC"}); // /* Serialize config object attributes */ // Dictionary::Ptr objectAttrs = SerializeObjectAttrs(object, FAState); @@ -586,7 +608,7 @@ void RedisWriter::SendStatusUpdate(const ConfigObject::Ptr& object, bool useTran // } } -void RedisWriter::UpdateObjectAttrs(const String& keyPrefix, const ConfigObject::Ptr& object, int fieldType, +std::vector RedisWriter::UpdateObjectAttrs(const String& keyPrefix, const ConfigObject::Ptr& object, int fieldType, const String& typeNameOverride) { Type::Ptr type = object->GetReflectionType(); @@ -616,7 +638,8 @@ void RedisWriter::UpdateObjectAttrs(const String& keyPrefix, const ConfigObject: if (!typeNameOverride.IsEmpty()) typeName = typeNameOverride.ToLower(); - ExecuteQuery({"HSET", keyPrefix + typeName, GetObjectIdentifier(object), JsonEncode(attrs)}); + return {GetObjectIdentifier(object), JsonEncode(attrs)}; + //m_Rcon->ExecuteQuery({"HSET", keyPrefix + typeName, GetObjectIdentifier(object), JsonEncode(attrs)}); } void RedisWriter::StateChangedHandler(const ConfigObject::Ptr& object) @@ -624,7 +647,7 @@ void RedisWriter::StateChangedHandler(const ConfigObject::Ptr& object) Type::Ptr type = object->GetReflectionType(); for (const RedisWriter::Ptr& rw : ConfigType::GetObjectsByType()) { - rw->m_WorkQueue.Enqueue(std::bind(&RedisWriter::SendStatusUpdate, rw, object, true)); + rw->m_WorkQueue.Enqueue(std::bind(&RedisWriter::SendStatusUpdate, rw, object)); } } @@ -633,15 +656,15 @@ void RedisWriter::VersionChangedHandler(const ConfigObject::Ptr& object) Type::Ptr type = object->GetReflectionType(); if (object->IsActive()) { - /* Create or update the object config */ + // Create or update the object config for (const RedisWriter::Ptr& rw : ConfigType::GetObjectsByType()) { - rw->m_WorkQueue.Enqueue(std::bind(&RedisWriter::SendConfigUpdate, rw.get(), object, true, true)); +// rw->m_WorkQueue.Enqueue(std::bind(&RedisWriter::SendConfigUpdate, rw, object, true)); } } else if (!object->IsActive() && - object->GetExtension("ConfigObjectDeleted")) { /* same as in apilistener-configsync.cpp */ - /* Delete object config */ + object->GetExtension("ConfigObjectDeleted")) { // same as in apilistener-configsync.cpp + // Delete object config for (const RedisWriter::Ptr& rw : ConfigType::GetObjectsByType()) { - rw->m_WorkQueue.Enqueue(std::bind(&RedisWriter::SendConfigDelete, rw.get(), object)); + rw->m_WorkQueue.Enqueue(std::bind(&RedisWriter::SendConfigDelete, rw, object)); } } } diff --git a/lib/redis/rediswriter-utility.cpp b/lib/redis/rediswriter-utility.cpp index 626506f65..2592828a4 100644 --- a/lib/redis/rediswriter-utility.cpp +++ b/lib/redis/rediswriter-utility.cpp @@ -336,3 +336,33 @@ String RedisWriter::HashValue(const Value& value, const std::set& proper return SHA1(PackObject(temp)); } +//Used to duplicate a redisReply, needed as redisReplies are freed when the async callback finishes +redisReply* RedisWriter::dupReplyObject(redisReply* reply) +{ + redisReply* r = (redisReply*)calloc(1, sizeof(*r)); + memcpy(r, reply, sizeof(*r)); + if(REDIS_REPLY_ERROR==reply->type || REDIS_REPLY_STRING==reply->type || REDIS_REPLY_STATUS==reply->type) //copy str + { + r->str = (char*)malloc(reply->len+1); + memcpy(r->str, reply->str, reply->len); + r->str[reply->len] = '\0'; + } + else if(REDIS_REPLY_ARRAY==reply->type) //copy array + { + r->element = (redisReply**)calloc(reply->elements, sizeof(redisReply*)); + memset(r->element, 0, r->elements*sizeof(redisReply*)); + for(uint32_t i=0; ielements; ++i) + { + if(NULL!=reply->element[i]) + { + if( NULL == (r->element[i] = dupReplyObject(reply->element[i])) ) + { + //clone child failed, free current reply, and return NULL + freeReplyObject(r); + return NULL; + } + } + } + } + return r; +} diff --git a/lib/redis/rediswriter.cpp b/lib/redis/rediswriter.cpp index 8c4c74149..d93b602a7 100644 --- a/lib/redis/rediswriter.cpp +++ b/lib/redis/rediswriter.cpp @@ -19,8 +19,11 @@ #include "redis/rediswriter.hpp" #include "redis/rediswriter-ti.cpp" +#include "redis/redisconnection.hpp" #include "remote/eventqueue.hpp" #include "base/json.hpp" +#include "rediswriter.hpp" + #include using namespace icinga; @@ -31,8 +34,10 @@ using namespace icinga; REGISTER_TYPE(RedisWriter); RedisWriter::RedisWriter() - : m_Context(NULL) +: m_Rcon(nullptr) { + m_Rcon = nullptr; + m_WorkQueue.SetName("RedisWriter"); m_PrefixConfigObject = "icinga:config:object:"; @@ -52,6 +57,10 @@ void RedisWriter::Start(bool runtimeCreated) << "'" << GetName() << "' started."; m_ConfigDumpInProgress = false; + m_ConfigDumpDone = false; + + m_Rcon = new RedisConnection(GetHost(), GetPort(), GetPath(), GetPassword(), GetDbIndex()); + m_Rcon->Start(); m_WorkQueue.SetExceptionCallback(std::bind(&RedisWriter::ExceptionHandler, this, _1)); @@ -75,6 +84,7 @@ void RedisWriter::Start(bool runtimeCreated) boost::thread thread(std::bind(&RedisWriter::HandleEvents, this)); thread.detach(); + } void RedisWriter::ExceptionHandler(boost::exception_ptr exp) @@ -83,11 +93,6 @@ void RedisWriter::ExceptionHandler(boost::exception_ptr exp) Log(LogDebug, "RedisWriter") << "Exception during redis operation: " << DiagnosticInformation(exp); - - if (m_Context) { - redisFree(m_Context); - m_Context = NULL; - } } void RedisWriter::ReconnectTimerHandler() @@ -99,55 +104,23 @@ void RedisWriter::TryToReconnect() { AssertOnWorkQueue(); - if (m_Context) + if (m_ConfigDumpDone && m_Rcon->IsConnected()) return; - - String path = GetPath(); - String host = GetHost(); - - Log(LogInformation, "RedisWriter", "Trying to connect to redis server"); - - if (path.IsEmpty()) - m_Context = redisConnect(host.CStr(), GetPort()); - else - m_Context = redisConnectUnix(path.CStr()); - - if (!m_Context || m_Context->err) { - if (!m_Context) { - Log(LogWarning, "RedisWriter", "Cannot allocate redis context."); - } else { - Log(LogWarning, "RedisWriter", "Connection error: ") - << m_Context->errstr; - } - - if (m_Context) { - redisFree(m_Context); - m_Context = NULL; - } - - return; - } - - String password = GetPassword(); - - /* TODO: exception is fired but terminates reconnect silently. - * Error case: Password does not match, or even: "Client sent AUTH, but no password is set" which also results in an error. - */ - if (!password.IsEmpty()) - ExecuteQuery({ "AUTH", password }); - - int dbIndex = GetDbIndex(); - - if (dbIndex != 0) - ExecuteQuery({ "SELECT", Convert::ToString(dbIndex) }); + else if (!m_Rcon->IsConnected()) + m_Rcon->Start(); UpdateSubscriptions(); + if (m_ConfigDumpInProgress || m_ConfigDumpDone) + return; + /* Config dump */ m_ConfigDumpInProgress = true; UpdateAllConfigObjects(); + m_ConfigDumpDone = true; + m_ConfigDumpInProgress = false; } @@ -164,15 +137,12 @@ void RedisWriter::UpdateSubscriptions() m_Subscriptions.clear(); - if (!m_Context) - return; - long long cursor = 0; String keyPrefix = "icinga:subscription:"; do { - std::shared_ptr reply = ExecuteQuery({ "SCAN", Convert::ToString(cursor), "MATCH", keyPrefix + "*", "COUNT", "1000" }); + auto reply = RedisGet({ "SCAN", Convert::ToString(cursor), "MATCH", keyPrefix + "*", "COUNT", "1000" }); VERIFY(reply->type == REDIS_REPLY_ARRAY); VERIFY(reply->elements % 2 == 0); @@ -207,7 +177,7 @@ void RedisWriter::UpdateSubscriptions() bool RedisWriter::GetSubscriptionTypes(String key, RedisSubscriptionInfo& rsi) { try { - std::shared_ptr redisReply = ExecuteQuery({ "SMEMBERS", key }); + redisReply *redisReply = RedisGet({ "SMEMBERS", key }); VERIFY(redisReply->type == REDIS_REPLY_ARRAY); if (redisReply->elements == 0) @@ -239,13 +209,10 @@ void RedisWriter::PublishStats() { AssertOnWorkQueue(); - if (!m_Context) - return; - Dictionary::Ptr status = GetStats(); String jsonStats = JsonEncode(status); - ExecuteQuery({ "PUBLISH", "icinga:stats", jsonStats }); + m_Rcon->ExecuteQuery({ "PUBLISH", "icinga:stats", jsonStats }); } void RedisWriter::HandleEvents() @@ -288,9 +255,6 @@ void RedisWriter::HandleEvent(const Dictionary::Ptr& event) { AssertOnWorkQueue(); - if (!m_Context) - return; - for (const std::pair& kv : m_Subscriptions) { const auto& name = kv.first; const auto& rsi = kv.second; @@ -300,10 +264,11 @@ void RedisWriter::HandleEvent(const Dictionary::Ptr& event) String body = JsonEncode(event); - std::shared_ptr maxExists = ExecuteQuery({ "EXISTS", "icinga:subscription:" + name + ":limit" }); + redisReply *maxExists = RedisGet({ "EXISTS", "icinga:subscription:" + name + ":limit" }); + long maxEvents = MAX_EVENTS_DEFAULT; if (maxExists->integer) { - std::shared_ptr redisReply = ExecuteQuery({ "GET", "icinga:subscription:" + name + ":limit"}); + redisReply *redisReply =RedisGet({ "GET", "icinga:subscription:" + name + ":limit"}); VERIFY(redisReply->type == REDIS_REPLY_STRING); Log(LogInformation, "RedisWriter") @@ -312,10 +277,11 @@ void RedisWriter::HandleEvent(const Dictionary::Ptr& event) maxEvents = Convert::ToLong(redisReply->str); } - ExecuteQuery({ "MULTI" }); - ExecuteQuery({ "LPUSH", "icinga:event:" + name, body }); - ExecuteQuery({ "LTRIM", "icinga:event:" + name, "0", String(maxEvents - 1)}); - ExecuteQuery({ "EXEC" }); + m_Rcon->ExecuteQueries({ + { "MULTI" }, + { "LPUSH", "icinga:event:" + name, body }, + { "LTRIM", "icinga:event:" + name, "0", String(maxEvents - 1)}, + { "EXEC" }}); } } @@ -323,16 +289,14 @@ void RedisWriter::SendEvent(const Dictionary::Ptr& event) { AssertOnWorkQueue(); - if (!m_Context) - return; - String body = JsonEncode(event); // Log(LogInformation, "RedisWriter") // << "Sending event \"" << body << "\""; - ExecuteQuery({ "PUBLISH", "icinga:event:all", body }); - ExecuteQuery({ "PUBLISH", "icinga:event:" + event->Get("type"), body }); + m_Rcon->ExecuteQueries({ + { "PUBLISH", "icinga:event:all", body }, + { "PUBLISH", "icinga:event:" + event->Get("type"), body }}); } void RedisWriter::Stop(bool runtimeRemoved) @@ -348,95 +312,47 @@ void RedisWriter::AssertOnWorkQueue() ASSERT(m_WorkQueue.IsWorkerThread()); } -std::shared_ptr RedisWriter::ExecuteQuery(const std::vector& query) -{ - const char **argv; - size_t *argvlen; - argv = new const char *[query.size()]; - argvlen = new size_t[query.size()]; +/* + * This whole spiel is required as we mostly use a "fire and forget" approach with the Redis Connection. To wait for a + * reply from Redis we have to wait for the callback to finish, this is done with the help of this struct. ready, cv + * and mtx are used for making sure we have the redisReply when we return. + */ +struct synchronousWait { + bool ready; + boost::condition_variable cv; + boost::mutex mtx; + redisReply* reply; +}; - for (std::vector::size_type i = 0; i < query.size(); i++) { - argv[i] = query[i].CStr(); - argvlen[i] = query[i].GetLength(); - } +void RedisWriter::RedisQueryCallback(redisAsyncContext *c, void *r, void *p) { + auto wait = (struct synchronousWait*) p; + auto rp = reinterpret_cast(r); - redisReply *reply = reinterpret_cast(redisCommandArgv(m_Context, query.size(), argv, argvlen)); - delete [] argv; - delete [] argvlen; + if (r == NULL) + wait->reply = nullptr; + else + wait->reply = RedisWriter::dupReplyObject(rp); - if (reply->type == REDIS_REPLY_ERROR) { - Log(LogCritical, "RedisWriter") - << "Redis query failed: " << reply->str; - - String msg = reply->str; - - freeReplyObject(reply); - - BOOST_THROW_EXCEPTION( - redis_error() - << errinfo_message(msg) - << errinfo_redis_query(Utility::Join(Array::FromVector(query), ' ', false)) - ); - } - - return std::shared_ptr(reply, freeReplyObject); + boost::mutex::scoped_lock lock(wait->mtx); + wait->ready = true; + wait->cv.notify_all(); } -std::vector > RedisWriter::ExecuteQueries(const std::vector >& queries) -{ - const char **argv; - size_t *argvlen; - for (const auto& query : queries) { - argv = new const char *[query.size()]; - argvlen = new size_t[query.size()]; +redisReply* RedisWriter::RedisGet(const std::vector& query) { + auto *wait = new synchronousWait; + wait->ready = false; - for (std::vector::size_type i = 0; i < query.size(); i++) { - argv[i] = query[i].CStr(); - argvlen[i] = query[i].GetLength(); - } + m_Rcon->ExecuteQuery(query, RedisQueryCallback, wait); - redisAppendCommandArgv(m_Context, query.size(), argv, argvlen); - - delete [] argv; - delete [] argvlen; + boost::mutex::scoped_lock lock(wait->mtx); + while (!wait->ready) { + wait->cv.timed_wait(lock, boost::posix_time::milliseconds(long(15 * 1000))); + if (!wait->ready) + wait->ready = true; } - std::vector > replies; - - for (size_t i = 0; i < queries.size(); i++) { - redisReply *rawReply; - - if (redisGetReply(m_Context, reinterpret_cast(&rawReply)) == REDIS_ERR) { - BOOST_THROW_EXCEPTION( - redis_error() - << errinfo_message("redisGetReply() failed") - ); - } - - std::shared_ptr reply(rawReply, freeReplyObject); - replies.push_back(reply); - } - - for (size_t i = 0; i < queries.size(); i++) { - const auto& query = queries[i]; - const auto& reply = replies[i]; - - if (reply->type == REDIS_REPLY_ERROR) { - Log(LogCritical, "RedisWriter") - << "Redis query failed: " << reply->str; - - String msg = reply->str; - - BOOST_THROW_EXCEPTION( - redis_error() - << errinfo_message(msg) - << errinfo_redis_query(Utility::Join(Array::FromVector(query), ' ', false)) - ); - } - } - - return replies; -} + return wait->reply; +} \ No newline at end of file diff --git a/lib/redis/rediswriter.hpp b/lib/redis/rediswriter.hpp index a47cfd9ab..34ffbee9d 100644 --- a/lib/redis/rediswriter.hpp +++ b/lib/redis/rediswriter.hpp @@ -25,6 +25,7 @@ #include "remote/messageorigin.hpp" #include "base/timer.hpp" #include "base/workqueue.hpp" +#include "redis/redisconnection.hpp" #include namespace icinga @@ -66,10 +67,11 @@ private: /* config & status dump */ void UpdateAllConfigObjects(); - void SendConfigUpdate(const ConfigObject::Ptr& object, bool useTransaction, bool runtimeUpdate = false); + void SendConfigUpdate(const ConfigObject::Ptr& object, bool runtimeUpdate); + void CreateConfigUpdate(const ConfigObject::Ptr& object, std::vector& attributes, std::vector& customVars, std::vector& checksums, bool runtimeUpdate); void SendConfigDelete(const ConfigObject::Ptr& object); - void SendStatusUpdate(const ConfigObject::Ptr& object, bool useTransaction); - void UpdateObjectAttrs(const String& keyPrefix, const ConfigObject::Ptr& object, int fieldType, const String& typeNameOverride); + void SendStatusUpdate(const ConfigObject::Ptr& object); + std::vector UpdateObjectAttrs(const String& keyPrefix, const ConfigObject::Ptr& object, int fieldType, const String& typeNameOverride); /* Stats */ Dictionary::Ptr GetStats(); @@ -96,14 +98,16 @@ private: void ExceptionHandler(boost::exception_ptr exp); - std::shared_ptr ExecuteQuery(const std::vector& query); - std::vector > ExecuteQueries(const std::vector >& queries); + //Used to get a reply from the asyncronous connection + redisReply* RedisGet(const std::vector& query); + static void RedisQueryCallback(redisAsyncContext *c, void *r, void *p); + static redisReply* dupReplyObject(redisReply* reply); + Timer::Ptr m_StatsTimer; Timer::Ptr m_ReconnectTimer; Timer::Ptr m_SubscriptionTimer; WorkQueue m_WorkQueue; - redisContext *m_Context; std::map m_Subscriptions; String m_PrefixConfigObject; @@ -112,13 +116,10 @@ private: String m_PrefixStatusObject; bool m_ConfigDumpInProgress; + bool m_ConfigDumpDone; + + RedisConnection::Ptr m_Rcon; }; - -struct redis_error : virtual std::exception, virtual boost::exception { }; - -struct errinfo_redis_query_; -typedef boost::error_info errinfo_redis_query; - } #endif /* REDISWRITER_H */