diff --git a/CMakeLists.txt b/CMakeLists.txt index bfc558769..1eefc9938 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -22,6 +22,13 @@ option(ICINGA2_WITH_NOTIFICATION "Build the notification module" ON) option(ICINGA2_WITH_PERFDATA "Build the perfdata module" ON) option(ICINGA2_WITH_TESTS "Run unit tests" ON) +# IcingaDB only is supported on modern Linux/Unix master systems +if(NOT WIN32) + option(ICINGA2_WITH_ICINGADB "Build the IcingaDB module" ON) +else() + option(ICINGA2_WITH_ICINGADB "Build the IcingaDB module" OFF) +endif() + option (USE_SYSTEMD "Configure icinga as native systemd service instead of a SysV initscript" OFF) @@ -203,7 +210,6 @@ if(HAVE_SYSTEMD) list(APPEND base_DEPS systemd) endif() - if(EDITLINE_FOUND) list(APPEND base_DEPS ${EDITLINE_LIBRARIES}) include_directories(${EDITLINE_INCLUDE_DIR}) diff --git a/doc/09-object-types.md b/doc/09-object-types.md index 40f161bc4..efe796e5c 100644 --- a/doc/09-object-types.md +++ b/doc/09-object-types.md @@ -1379,6 +1379,30 @@ Configuration Attributes: vars | Dictionary | **Optional.** A dictionary containing custom variables that are available globally. environment | String | **Optional.** Specify the Icinga environment. This overrides the `Environment` constant specified in the configuration or on the CLI with `--define`. Defaults to empty. + +### IcingaDB + +The IcingaDB object implements the [icingadb feauture](14-features.md#core-backends-icingadb). + +Example: + +``` +object IcingaDB "icingadb" { + //host = "127.0.0.1" + //port = 6379 + //password = "xxx" +} +``` + +Configuration Attributes: + + Name | Type | Description + --------------------------|-----------------------|---------------------------------- + host | String | **Optional.** Redis host for IcingaDB. Defaults to `127.0.0.1`. + port | Number | **Optional.** Redis port for IcingaDB. Defaults to `6379`. + path | String | **Optional.** Redix unix socket path. Can be used instead of `host` and `port` attributes. + password | String | **Optional.** Redis auth password for IcingaDB. + ### IdoMySqlConnection IDO database adapter for MySQL. diff --git a/doc/14-features.md b/doc/14-features.md index c119abd91..4b56737bf 100644 --- a/doc/14-features.md +++ b/doc/14-features.md @@ -44,6 +44,30 @@ By default, log files will be rotated daily. The REST API is documented [here](12-icinga2-api.md#icinga2-api) as a core feature. +### Icinga DB + +Icinga DB provides a new core backend and aims to replace the IDO backend +output. It consists of different components: + +* Icinga 2 provides the `icingadb` feature which stores monitoring data in a memory database +* The [IcingaDB service](https://github.com/icinga/icingadb) collects and synchronizes monitoring data into its backend +* Icinga Web reads monitoring data from the new IcingaDB backend + +Requirements: + +* Local Redis instance +* MySQL/MariaDB server with `icingadb` database, user and schema imports +* Icinga 2's `icingadb` feature enabled +* IcingaDB service requires Redis and MySQL/MariaDB server +* Icinga Web module + +> TODO: Detailed instructions. + +``` +icinga2 feature enable icingadb +``` + + ### IDO Database (DB IDO) The IDO (Icinga Data Output) feature for Icinga 2 takes care of exporting all diff --git a/doc/15-troubleshooting.md b/doc/15-troubleshooting.md index 5e7cc43ae..3ae53ce4f 100644 --- a/doc/15-troubleshooting.md +++ b/doc/15-troubleshooting.md @@ -1626,9 +1626,9 @@ it is valid to just sync their zones via the config sync. The following restores the Zone/Endpoint objects as config objects outside of `zones.d` in your master/satellite's zones.conf with rendering them as external objects in the Director. -[Example](06-distributed-monitoring.md#three-levels-with-masters-satellites-and-agents) +[Example](06-distributed-monitoring.md#distributed-monitoring-scenarios-master-satellite-agents) for a 3 level setup with the masters and satellites knowing about the zone hierarchy -outside defined in [zones.conf](#zones-conf): +outside defined in [zones.conf](04-configuration.md#zones-conf): ``` object Endpoint "icinga-master1.localdomain" { diff --git a/doc/16-upgrading-icinga-2.md b/doc/16-upgrading-icinga-2.md index 7dacfb764..95e08a2d7 100644 --- a/doc/16-upgrading-icinga-2.md +++ b/doc/16-upgrading-icinga-2.md @@ -188,7 +188,7 @@ being set but no `zone` defined. The most convenient way with e.g. managing the objects in `conf.d` is to move them into the `master` zone. Please continue in the -[troubleshooting docs](#troubleshooting-cluster-command-endpoint-errors-agent-hosts-command-endpoint-zone) +[troubleshooting docs](15-troubleshooting.md#troubleshooting-cluster-command-endpoint-errors-agent-hosts-command-endpoint-zone) for further instructions. #### Config Sync diff --git a/etc/icinga2/features-available/icingadb.conf b/etc/icinga2/features-available/icingadb.conf new file mode 100644 index 000000000..9fe55ec8e --- /dev/null +++ b/etc/icinga2/features-available/icingadb.conf @@ -0,0 +1,5 @@ +object IcingaDB "icingadb" { + //host = "127.0.0.1" + //port = 6379 + //password = "xxx" +} diff --git a/icinga-app/CMakeLists.txt b/icinga-app/CMakeLists.txt index ee3443b28..ef71ad999 100644 --- a/icinga-app/CMakeLists.txt +++ b/icinga-app/CMakeLists.txt @@ -53,6 +53,10 @@ if(ICINGA2_WITH_PERFDATA) list(APPEND icinga_app_SOURCES $) endif() +if(ICINGA2_WITH_ICINGADB) + list(APPEND icinga_app_SOURCES $) +endif() + add_executable(icinga-app $ ${base_OBJS} diff --git a/lib/CMakeLists.txt b/lib/CMakeLists.txt index 004fc154c..aadbb39ad 100644 --- a/lib/CMakeLists.txt +++ b/lib/CMakeLists.txt @@ -53,4 +53,8 @@ if(ICINGA2_WITH_PERFDATA) add_subdirectory(perfdata) endif() +if(ICINGA2_WITH_ICINGADB) + add_subdirectory(icingadb) +endif() + set(CPACK_NSIS_EXTRA_INSTALL_COMMANDS "${CPACK_NSIS_EXTRA_INSTALL_COMMANDS}" PARENT_SCOPE) diff --git a/lib/icinga/checkable-check.cpp b/lib/icinga/checkable-check.cpp index 02bc8fc91..94940b29f 100644 --- a/lib/icinga/checkable-check.cpp +++ b/lib/icinga/checkable-check.cpp @@ -300,6 +300,7 @@ void Checkable::ProcessCheckResult(const CheckResult::Ptr& cr, const MessageOrig if (hardChange || is_volatile) { SetLastHardStateRaw(new_state); SetLastHardStateChange(now); + SetLastHardStatesRaw(GetLastHardStatesRaw() / 100u + new_state * 100u); } if (!IsStateOK(new_state)) diff --git a/lib/icinga/checkable.hpp b/lib/icinga/checkable.hpp index 032910adb..498cb1d8f 100644 --- a/lib/icinga/checkable.hpp +++ b/lib/icinga/checkable.hpp @@ -39,23 +39,6 @@ enum CheckableType CheckableService }; -/** - * Severity Flags - * - * @ingroup icinga - */ -enum SeverityFlag -{ - SeverityFlagDowntime = 1, - SeverityFlagAcknowledgement = 2, - SeverityFlagHostDown = 4, - SeverityFlagUnhandled = 8, - SeverityFlagPending = 16, - SeverityFlagWarning = 32, - SeverityFlagUnknown = 64, - SeverityFlagCritical = 128, -}; - class CheckCommand; class EventCommand; class Dependency; diff --git a/lib/icinga/checkable.ti b/lib/icinga/checkable.ti index a920d2589..07a946258 100644 --- a/lib/icinga/checkable.ti +++ b/lib/icinga/checkable.ti @@ -105,6 +105,9 @@ abstract class Checkable : CustomVarObject [state, enum, no_user_view, no_user_modify] ServiceState last_hard_state_raw { default {{{ return ServiceUnknown; }}} }; + [state, no_user_view, no_user_modify] "unsigned short" last_hard_states_raw { + default {{{ return /* current */ 99 * 100 + /* previous */ 99; }}} + }; [state, enum] StateType last_state_type { default {{{ return StateTypeSoft; }}} }; diff --git a/lib/icinga/compatutility.cpp b/lib/icinga/compatutility.cpp index 305d3b247..40c01f397 100644 --- a/lib/icinga/compatutility.cpp +++ b/lib/icinga/compatutility.cpp @@ -249,7 +249,7 @@ std::set CompatUtility::GetCheckableNotificationUserGroups(const return usergroups; } -/* Used in DB IDO, StatusDataWriter, Livestatus, CompatLogger, GelfWriter. */ +/* Used in DB IDO, StatusDataWriter, Livestatus, CompatLogger, GelfWriter, IcingaDB. */ String CompatUtility::GetCheckResultOutput(const CheckResult::Ptr& cr) { if (!cr) @@ -264,7 +264,7 @@ String CompatUtility::GetCheckResultOutput(const CheckResult::Ptr& cr) return raw_output.SubStr(0, line_end); } -/* Used in DB IDO, StatusDataWriter and Livestatus. */ +/* Used in DB IDO, StatusDataWriter and Livestatus, IcingaDB. */ String CompatUtility::GetCheckResultLongOutput(const CheckResult::Ptr& cr) { if (!cr) diff --git a/lib/icinga/host.cpp b/lib/icinga/host.cpp index 9744eed46..7bb1c434d 100644 --- a/lib/icinga/host.cpp +++ b/lib/icinga/host.cpp @@ -164,32 +164,39 @@ HostState Host::GetLastHardState() const return CalculateState(GetLastHardStateRaw()); } -/* keep in sync with Service::GetSeverity() */ +/* keep in sync with Service::GetSeverity() + * One could think it may be smart to use an enum and some bitmask math here. + * But the only thing the consuming icingaweb2 cares about is being able to + * sort by severity. It is therefore easier to keep them seperated here. */ int Host::GetSeverity() const { int severity = 0; ObjectLock olock(this); - ServiceState state = GetStateRaw(); + HostState state = GetState(); - /* OK/Warning = Up, Critical/Unknownb = Down */ - if (!HasBeenChecked()) - severity |= SeverityFlagPending; - else if (state == ServiceUnknown) - severity |= SeverityFlagCritical; - else if (state == ServiceCritical) - severity |= SeverityFlagCritical; + if (!HasBeenChecked()) { + severity = 16; + } else if (state == HostUp) { + severity = 0; + } else { + if (IsReachable()) + severity = 64; + else + severity = 32; - if (IsInDowntime()) - severity |= SeverityFlagDowntime; - else if (IsAcknowledged()) - severity |= SeverityFlagAcknowledgement; - else - severity |= SeverityFlagUnhandled; + if (IsAcknowledged()) + severity += 512; + else if (IsInDowntime()) + severity += 256; + else + severity += 2048; + } olock.Unlock(); return severity; + } bool Host::IsStateOK(ServiceState state) const diff --git a/lib/icinga/service.cpp b/lib/icinga/service.cpp index e420b64c3..ec80aa6dc 100644 --- a/lib/icinga/service.cpp +++ b/lib/icinga/service.cpp @@ -103,32 +103,50 @@ Host::Ptr Service::GetHost() const return m_Host; } -/* keep in sync with Host::GetSeverity() */ +/* keep in sync with Host::GetSeverity() + * One could think it may be smart to use an enum and some bitmask math here. + * But the only thing the consuming icingaweb2 cares about is being able to + * sort by severity. It is therefore easier to keep them seperated here. */ int Service::GetSeverity() const { - int severity = 0; + int severity; ObjectLock olock(this); ServiceState state = GetStateRaw(); - if (!HasBeenChecked()) - severity |= SeverityFlagPending; - else if (state == ServiceWarning) - severity |= SeverityFlagWarning; - else if (state == ServiceUnknown) - severity |= SeverityFlagUnknown; - else if (state == ServiceCritical) - severity |= SeverityFlagCritical; + if (!HasBeenChecked()) { + severity = 16; + } else if (state == ServiceOK) { + severity = 0; + } else { + switch (state) { + case ServiceWarning: + severity = 32; + break; + case ServiceUnknown: + severity = 64; + break; + case ServiceCritical: + severity = 128; + break; + default: + severity = 256; + } - /* TODO: Add host reachability and handled */ - if (IsInDowntime()) - severity |= SeverityFlagDowntime; - else if (IsAcknowledged()) - severity |= SeverityFlagAcknowledgement; - else if (m_Host && m_Host->GetProblem()) - severity |= SeverityFlagHostDown; - else - severity |= SeverityFlagUnhandled; + Host::Ptr host = GetHost(); + ObjectLock hlock (host); + if (host->GetState() != HostUp || !host->IsReachable()) { + severity += 1024; + } else { + if (IsAcknowledged()) + severity += 512; + else if (IsInDowntime()) + severity += 256; + else + severity += 2048; + } + hlock.Unlock(); + } olock.Unlock(); diff --git a/lib/icingadb/CMakeLists.txt b/lib/icingadb/CMakeLists.txt new file mode 100644 index 000000000..71a7c67f2 --- /dev/null +++ b/lib/icingadb/CMakeLists.txt @@ -0,0 +1,29 @@ +# Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ + +mkclass_target(icingadb.ti icingadb-ti.cpp icingadb-ti.hpp) + +set(icingadb_SOURCES + icingadb.cpp icingadb-objects.cpp icingadb-stats.cpp icingadb-utility.cpp redisconnection.cpp icingadb-ti.hpp +) + +if(ICINGA2_UNITY_BUILD) + mkunity_target(icingadb icingadb icingadb_SOURCES) +endif() + +add_library(icingadb OBJECT ${icingadb_SOURCES}) + +include_directories(${icinga2_SOURCE_DIR}/third-party) + +add_dependencies(icingadb base config icinga remote) + +set_target_properties ( + icingadb PROPERTIES + FOLDER Components +) + +install_if_not_exists( + ${PROJECT_SOURCE_DIR}/etc/icinga2/features-available/icingadb.conf + ${CMAKE_INSTALL_SYSCONFDIR}/icinga2/features-available +) + +set(CPACK_NSIS_EXTRA_INSTALL_COMMANDS "${CPACK_NSIS_EXTRA_INSTALL_COMMANDS}" PARENT_SCOPE) diff --git a/lib/icingadb/icingadb-objects.cpp b/lib/icingadb/icingadb-objects.cpp new file mode 100644 index 000000000..0869aeb97 --- /dev/null +++ b/lib/icingadb/icingadb-objects.cpp @@ -0,0 +1,1783 @@ +/* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */ + +#include "icingadb/icingadb.hpp" +#include "icingadb/redisconnection.hpp" +#include "base/configtype.hpp" +#include "base/configobject.hpp" +#include "base/json.hpp" +#include "base/logger.hpp" +#include "base/serializer.hpp" +#include "base/tlsutility.hpp" +#include "base/initialize.hpp" +#include "base/convert.hpp" +#include "base/array.hpp" +#include "base/exception.hpp" +#include "base/utility.hpp" +#include "icinga/command.hpp" +#include "icinga/compatutility.hpp" +#include "icinga/customvarobject.hpp" +#include "icinga/host.hpp" +#include "icinga/service.hpp" +#include "icinga/hostgroup.hpp" +#include "icinga/servicegroup.hpp" +#include "icinga/usergroup.hpp" +#include "icinga/checkcommand.hpp" +#include "icinga/eventcommand.hpp" +#include "icinga/notificationcommand.hpp" +#include "icinga/timeperiod.hpp" +#include "icinga/pluginutility.hpp" +#include "remote/zone.hpp" +#include +#include +#include +#include +#include + +using namespace icinga; + +static const char * const l_LuaResetDump = R"EOF( + +local id = redis.call('XADD', KEYS[1], '*', 'type', '*', 'state', 'wip') + +local xr = redis.call('XRANGE', KEYS[1], '-', '+') +for i = 1, #xr - 1 do + redis.call('XDEL', KEYS[1], xr[i][1]) +end + +return id + +)EOF"; + +INITIALIZE_ONCE(&IcingaDB::ConfigStaticInitialize); + +void IcingaDB::ConfigStaticInitialize() +{ + /* triggered in ProcessCheckResult(), requires UpdateNextCheck() to be called before */ + Checkable::OnStateChange.connect([](const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, StateType type, const MessageOrigin::Ptr&) { + IcingaDB::StateChangeHandler(checkable, cr, type); + }); + + /* triggered when acknowledged host/service goes back to ok and when the acknowledgement gets deleted */ + Checkable::OnAcknowledgementCleared.connect([](const Checkable::Ptr& checkable, const MessageOrigin::Ptr&) { + IcingaDB::StateChangeHandler(checkable); + }); + + /* triggered on create, update and delete objects */ + ConfigObject::OnActiveChanged.connect([](const ConfigObject::Ptr& object, const Value&) { + IcingaDB::VersionChangedHandler(object); + }); + ConfigObject::OnVersionChanged.connect([](const ConfigObject::Ptr& object, const Value&) { + IcingaDB::VersionChangedHandler(object); + }); + + /* fixed downtime start */ + Downtime::OnDowntimeStarted.connect(&IcingaDB::DowntimeStartedHandler); + /* flexible downtime start */ + Downtime::OnDowntimeTriggered.connect(&IcingaDB::DowntimeStartedHandler); + /* fixed/flexible downtime end or remove */ + Downtime::OnDowntimeRemoved.connect(&IcingaDB::DowntimeRemovedHandler); + + Checkable::OnNotificationSentToAllUsers.connect([]( + const Notification::Ptr& notification, const Checkable::Ptr& checkable, const std::set& users, + const NotificationType& type, const CheckResult::Ptr& cr, const String& author, const String& text, + const MessageOrigin::Ptr& + ) { + IcingaDB::NotificationSentToAllUsersHandler(notification, checkable, users, type, cr, author, text); + }); + + Comment::OnCommentAdded.connect(&IcingaDB::CommentAddedHandler); + Comment::OnCommentRemoved.connect(&IcingaDB::CommentRemovedHandler); + + Checkable::OnFlappingChanged.connect(&IcingaDB::FlappingChangedHandler); +} + +static std::pair SplitOutput(String output) +{ + String longOutput; + auto pos (output.Find("\n")); + + if (pos != String::NPos) { + longOutput = output.SubStr(pos + 1u); + output.erase(output.Begin() + pos, output.End()); + } + + return {std::move(output), std::move(longOutput)}; +} + +void IcingaDB::UpdateAllConfigObjects() +{ + double startTime = Utility::GetTime(); + + // Use a Workqueue to pack objects in parallel + WorkQueue upq(25000, Configuration::Concurrency); + upq.SetName("IcingaDB:ConfigDump"); + + typedef std::pair TypePair; + std::vector types; + + for (const Type::Ptr& type : Type::GetAllTypes()) { + ConfigType *ctype = dynamic_cast(type.get()); + if (!ctype) + continue; + + String lcType(type->GetName().ToLower()); + types.emplace_back(ctype, lcType); + } + + m_Rcon->FireAndForgetQuery({"EVAL", l_LuaResetDump, "1", "icinga:dump"}); + + const std::vector globalKeys = { + m_PrefixConfigObject + "customvar", + m_PrefixConfigObject + "action_url", + m_PrefixConfigObject + "notes_url", + m_PrefixConfigObject + "icon_image", + }; + DeleteKeys(globalKeys); + + upq.ParallelFor(types, [this](const TypePair& type) { + String lcType = type.second; + + std::vector keys = GetTypeObjectKeys(lcType); + DeleteKeys(keys); + + auto objectChunks (ChunkObjects(type.first->GetObjects(), 500)); + + WorkQueue upqObjectType(25000, Configuration::Concurrency); + upqObjectType.SetName("IcingaDB:ConfigDump:" + lcType); + + upqObjectType.ParallelFor(objectChunks, [this, &type, &lcType](decltype(objectChunks)::const_reference chunk) { + std::map> hMSets, publishes; + std::vector states = {"HMSET", m_PrefixStateObject + lcType}; + std::vector > transaction = {{"MULTI"}}; + + bool dumpState = (lcType == "host" || lcType == "service"); + + size_t bulkCounter = 0; + for (const ConfigObject::Ptr& object : chunk) { + if (lcType != GetLowerCaseTypeNameDB(object)) + continue; + + CreateConfigUpdate(object, lcType, hMSets, publishes, false); + + // Write out inital state for checkables + if (dumpState) { + states.emplace_back(GetObjectIdentifier(object)); + states.emplace_back(JsonEncode(SerializeState(dynamic_pointer_cast(object)))); + } + + bulkCounter++; + if (!(bulkCounter % 100)) { + for (auto& kv : hMSets) { + if (!kv.second.empty()) { + kv.second.insert(kv.second.begin(), {"HMSET", kv.first}); + transaction.emplace_back(std::move(kv.second)); + } + } + + if (states.size() > 2) { + transaction.emplace_back(std::move(states)); + states = {"HMSET", m_PrefixStateObject + lcType}; + } + + for (auto& kv : publishes) { + for (auto& message : kv.second) { + std::vector publish; + + publish.reserve(3); + publish.emplace_back("PUBLISH"); + publish.emplace_back(kv.first); + publish.emplace_back(std::move(message)); + + transaction.emplace_back(std::move(publish)); + } + } + + hMSets = decltype(hMSets)(); + publishes = decltype(publishes)(); + + if (transaction.size() > 1) { + transaction.push_back({"EXEC"}); + m_Rcon->FireAndForgetQueries(std::move(transaction)); + transaction = {{"MULTI"}}; + } + } + } + + for (auto& kv : hMSets) { + if (!kv.second.empty()) { + kv.second.insert(kv.second.begin(), {"HMSET", kv.first}); + transaction.emplace_back(std::move(kv.second)); + } + } + + if (states.size() > 2) + transaction.emplace_back(std::move(states)); + + for (auto& kv : publishes) { + for (auto& message : kv.second) { + std::vector publish; + + publish.reserve(3); + publish.emplace_back("PUBLISH"); + publish.emplace_back(kv.first); + publish.emplace_back(std::move(message)); + + transaction.emplace_back(std::move(publish)); + } + } + + if (transaction.size() > 1) { + transaction.push_back({"EXEC"}); + m_Rcon->FireAndForgetQueries(std::move(transaction)); + } + + Log(LogNotice, "IcingaDB") + << "Dumped " << bulkCounter << " objects of type " << type.second; + }); + + upqObjectType.Join(); + + if (upqObjectType.HasExceptions()) { + for (boost::exception_ptr exc : upqObjectType.GetExceptions()) { + if (exc) { + boost::rethrow_exception(exc); + } + } + } + + m_Rcon->FireAndForgetQuery({"XADD", "icinga:dump", "*", "type", lcType, "state", "done"}); + }); + + upq.Join(); + + if (upq.HasExceptions()) { + for (boost::exception_ptr exc : upq.GetExceptions()) { + try { + if (exc) { + boost::rethrow_exception(exc); + } + } catch(const std::exception& e) { + Log(LogCritical, "IcingaDB") + << "Exception during ConfigDump: " << e.what(); + } + } + } + + m_Rcon->FireAndForgetQuery({"XADD", "icinga:dump", "*", "type", "*", "state", "done"}); + + Log(LogInformation, "IcingaDB") + << "Initial config/status dump finished in " << Utility::GetTime() - startTime << " seconds."; +} + +std::vector>> IcingaDB::ChunkObjects(std::vector> objects, size_t chunkSize) { + std::vector>> chunks; + auto offset (objects.begin()); + auto end (objects.end()); + + chunks.reserve((std::distance(offset, end) + chunkSize - 1) / chunkSize); + + while (std::distance(offset, end) >= chunkSize) { + auto until (offset + chunkSize); + chunks.emplace_back(offset, until); + offset = until; + } + + if (offset != end) { + chunks.emplace_back(offset, end); + } + + return std::move(chunks); +} + +void IcingaDB::DeleteKeys(const std::vector& keys) { + std::vector query = {"DEL"}; + for (auto& key : keys) { + query.emplace_back(key); + } + + m_Rcon->FireAndForgetQuery(std::move(query)); +} + +std::vector IcingaDB::GetTypeObjectKeys(const String& type) +{ + std::vector keys = { + m_PrefixConfigObject + type, + m_PrefixConfigCheckSum + type, + m_PrefixConfigObject + type + ":customvar", + m_PrefixConfigCheckSum + type + ":customvar", + }; + + if (type == "host" || type == "service" || type == "user") { + keys.emplace_back(m_PrefixConfigObject + type + ":groupmember"); + keys.emplace_back(m_PrefixConfigCheckSum + type + ":groupmember"); + keys.emplace_back(m_PrefixStateObject + type); + } else if (type == "timeperiod") { + keys.emplace_back(m_PrefixConfigObject + type + ":override:include"); + keys.emplace_back(m_PrefixConfigCheckSum + type + ":override:include"); + keys.emplace_back(m_PrefixConfigObject + type + ":override:exclude"); + keys.emplace_back(m_PrefixConfigCheckSum + type + ":override:exclude"); + keys.emplace_back(m_PrefixConfigObject + type + ":range"); + keys.emplace_back(m_PrefixConfigCheckSum + type + ":range"); + } else if (type == "zone") { + keys.emplace_back(m_PrefixConfigObject + type + ":parent"); + keys.emplace_back(m_PrefixConfigCheckSum + type + ":parent"); + } else if (type == "notification") { + keys.emplace_back(m_PrefixConfigObject + type + ":user"); + keys.emplace_back(m_PrefixConfigCheckSum + type + ":user"); + keys.emplace_back(m_PrefixConfigObject + type + ":usergroup"); + keys.emplace_back(m_PrefixConfigCheckSum + type + ":usergroup"); + } else if (type == "checkcommand" || type == "notificationcommand" || type == "eventcommand") { + keys.emplace_back(m_PrefixConfigObject + type + ":envvar"); + keys.emplace_back(m_PrefixConfigCheckSum + type + ":envvar"); + keys.emplace_back(m_PrefixConfigObject + type + ":argument"); + keys.emplace_back(m_PrefixConfigCheckSum + type + ":argument"); + } + + return std::move(keys); +} + +template +static ConfigObject::Ptr GetObjectByName(const String& name) +{ + return ConfigObject::GetObject(name); +} + +void IcingaDB::InsertObjectDependencies(const ConfigObject::Ptr& object, const String typeName, std::map>& hMSets, + std::map>& publishes, bool runtimeUpdate) +{ + String objectKey = GetObjectIdentifier(object); + CustomVarObject::Ptr customVarObject = dynamic_pointer_cast(object); + String envId = CalculateCheckSumString(GetEnvironment()); + auto* configUpdates (runtimeUpdate ? &publishes["icinga:config:update"] : nullptr); + + if (customVarObject) { + auto vars(SerializeVars(customVarObject)); + if (vars) { + auto& typeCvs (hMSets[m_PrefixConfigObject + typeName + ":customvar"]); + auto& allCvs (hMSets[m_PrefixConfigObject + "customvar"]); + auto& cvChksms (hMSets[m_PrefixConfigCheckSum + typeName + ":customvar"]); + + cvChksms.emplace_back(objectKey); + cvChksms.emplace_back(JsonEncode(new Dictionary({{"checksum", CalculateCheckSumVars(customVarObject)}}))); + + ObjectLock varsLock(vars); + Array::Ptr varsArray(new Array); + + varsArray->Reserve(vars->GetLength()); + + for (auto& kv : vars) { + allCvs.emplace_back(kv.first); + allCvs.emplace_back(JsonEncode(kv.second)); + + if (configUpdates) { + configUpdates->emplace_back("customvar:" + kv.first); + } + + String id = CalculateCheckSumArray(new Array({envId, kv.first, objectKey})); + typeCvs.emplace_back(id); + typeCvs.emplace_back(JsonEncode(new Dictionary({{"object_id", objectKey}, {"environment_id", envId}, {"customvar_id", kv.first}}))); + + if (configUpdates) { + configUpdates->emplace_back(typeName + ":customvar:" + id); + } + } + } + } + + Type::Ptr type = object->GetReflectionType(); + if (type == Host::TypeInstance || type == Service::TypeInstance) { + Checkable::Ptr checkable = static_pointer_cast(object); + + String actionUrl = checkable->GetActionUrl(); + String notesUrl = checkable->GetNotesUrl(); + String iconImage = checkable->GetIconImage(); + if (!actionUrl.IsEmpty()) { + auto& actionUrls (hMSets[m_PrefixConfigObject + "action_url"]); + actionUrls.emplace_back(CalculateCheckSumArray(new Array({envId, actionUrl}))); + actionUrls.emplace_back(JsonEncode(new Dictionary({{"environment_id", envId}, {"action_url", actionUrl}}))); + + if (configUpdates) { + configUpdates->emplace_back("action_url:" + actionUrls.at(actionUrls.size() - 2u)); + } + } + if (!notesUrl.IsEmpty()) { + auto& notesUrls (hMSets[m_PrefixConfigObject + "notes_url"]); + notesUrls.emplace_back(CalculateCheckSumArray(new Array({envId, notesUrl}))); + notesUrls.emplace_back(JsonEncode(new Dictionary({{"environment_id", envId}, {"notes_url", notesUrl}}))); + + if (configUpdates) { + configUpdates->emplace_back("notes_url:" + notesUrls.at(notesUrls.size() - 2u)); + } + } + if (!iconImage.IsEmpty()) { + auto& iconImages (hMSets[m_PrefixConfigObject + "icon_image"]); + iconImages.emplace_back(CalculateCheckSumArray(new Array({envId, iconImage}))); + iconImages.emplace_back(JsonEncode(new Dictionary({{"environment_id", envId}, {"icon_image", iconImage}}))); + + if (configUpdates) { + configUpdates->emplace_back("icon_image:" + iconImages.at(iconImages.size() - 2u)); + } + } + + Host::Ptr host; + Service::Ptr service; + tie(host, service) = GetHostService(checkable); + + ConfigObject::Ptr (*getGroup)(const String& name); + Array::Ptr groups; + if (service) { + groups = service->GetGroups(); + getGroup = &::GetObjectByName; + } else { + groups = host->GetGroups(); + getGroup = &::GetObjectByName; + } + + if (groups) { + ObjectLock groupsLock(groups); + Array::Ptr groupIds(new Array); + + groupIds->Reserve(groups->GetLength()); + + auto& members (hMSets[m_PrefixConfigObject + typeName + ":groupmember"]); + auto& memberChksms (hMSets[m_PrefixConfigCheckSum + typeName + ":groupmember"]); + + for (auto& group : groups) { + String groupId = GetObjectIdentifier((*getGroup)(group)); + String id = CalculateCheckSumArray(new Array({envId, groupId, objectKey})); + members.emplace_back(id); + members.emplace_back(JsonEncode(new Dictionary({{"object_id", objectKey}, {"environment_id", envId}, {"group_id", groupId}}))); + + if (configUpdates) { + configUpdates->emplace_back(typeName + ":groupmember:" + id); + } + + groupIds->Add(groupId); + } + + memberChksms.emplace_back(objectKey); + memberChksms.emplace_back(JsonEncode(new Dictionary({{"checksum", CalculateCheckSumArray(groupIds)}}))); + } + + return; + } + + if (type == TimePeriod::TypeInstance) { + TimePeriod::Ptr timeperiod = static_pointer_cast(object); + + Dictionary::Ptr ranges = timeperiod->GetRanges(); + if (ranges) { + ObjectLock rangesLock(ranges); + Array::Ptr rangeIds(new Array); + auto& typeRanges (hMSets[m_PrefixConfigObject + typeName + ":range"]); + auto& rangeChksms (hMSets[m_PrefixConfigCheckSum + typeName + ":range"]); + + rangeIds->Reserve(ranges->GetLength()); + + for (auto& kv : ranges) { + String rangeId = CalculateCheckSumArray(new Array({envId, kv.first, kv.second})); + rangeIds->Add(rangeId); + + String id = CalculateCheckSumArray(new Array({envId, rangeId, objectKey})); + typeRanges.emplace_back(id); + typeRanges.emplace_back(JsonEncode(new Dictionary({{"environment_id", envId}, {"timeperiod_id", objectKey}, {"range_key", kv.first}, {"range_value", kv.second}}))); + + if (configUpdates) { + configUpdates->emplace_back(typeName + ":range:" + id); + } + } + + rangeChksms.emplace_back(objectKey); + rangeChksms.emplace_back(JsonEncode(new Dictionary({{"checksum", CalculateCheckSumArray(rangeIds)}}))); + } + + Array::Ptr includes; + ConfigObject::Ptr (*getInclude)(const String& name); + includes = timeperiod->GetIncludes(); + getInclude = &::GetObjectByName; + + Array::Ptr includeChecksums = new Array(); + + ObjectLock includesLock(includes); + ObjectLock includeChecksumsLock(includeChecksums); + + includeChecksums->Reserve(includes->GetLength()); + + + auto& includs (hMSets[m_PrefixConfigObject + typeName + ":override:include"]); + auto& includeChksms (hMSets[m_PrefixConfigCheckSum + typeName + ":override:include"]); + for (auto include : includes) { + String includeId = GetObjectIdentifier((*getInclude)(include.Get())); + includeChecksums->Add(includeId); + + String id = CalculateCheckSumArray(new Array({envId, includeId, objectKey})); + includs.emplace_back(id); + includs.emplace_back(JsonEncode(new Dictionary({{"environment_id", envId}, {"timeperiod_id", objectKey}, {"include_id", includeId}}))); + + if (configUpdates) { + configUpdates->emplace_back(typeName + ":override:include:" + id); + } + } + + includeChksms.emplace_back(objectKey); + includeChksms.emplace_back(JsonEncode(new Dictionary({{"checksum", CalculateCheckSumArray(includes)}}))); + + Array::Ptr excludes; + ConfigObject::Ptr (*getExclude)(const String& name); + + excludes = timeperiod->GetExcludes(); + getExclude = &::GetObjectByName; + + Array::Ptr excludeChecksums = new Array(); + + ObjectLock excludesLock(excludes); + ObjectLock excludeChecksumsLock(excludeChecksums); + + excludeChecksums->Reserve(excludes->GetLength()); + + auto& excluds (hMSets[m_PrefixConfigObject + typeName + ":override:exclude"]); + auto& excludeChksms (hMSets[m_PrefixConfigCheckSum + typeName + ":override:exclude"]); + + for (auto exclude : excludes) { + String excludeId = GetObjectIdentifier((*getExclude)(exclude.Get())); + excludeChecksums->Add(excludeId); + + String id = CalculateCheckSumArray(new Array({envId, excludeId, objectKey})); + excluds.emplace_back(id); + excluds.emplace_back(JsonEncode(new Dictionary({{"environment_id", envId}, {"timeperiod_id", objectKey}, {"exclude_id", excludeId}}))); + + if (configUpdates) { + configUpdates->emplace_back(typeName + ":override:exclude:" + id); + } + } + + excludeChksms.emplace_back(objectKey); + excludeChksms.emplace_back(JsonEncode(new Dictionary({{"checksum", CalculateCheckSumArray(excludes)}}))); + + return; + } + + if (type == Zone::TypeInstance) { + Zone::Ptr zone = static_pointer_cast(object); + + Array::Ptr parents(new Array); + auto parentsRaw (zone->GetAllParentsRaw()); + + parents->Reserve(parentsRaw.size()); + + auto& parnts (hMSets[m_PrefixConfigObject + typeName + ":parent"]); + auto& parentChksms (hMSets[m_PrefixConfigCheckSum + typeName + ":parent"]); + + for (auto& parent : parentsRaw) { + String parentId = GetObjectIdentifier(parent); + String id = CalculateCheckSumArray(new Array({envId, parentId, objectKey})); + parnts.emplace_back(id); + parnts.emplace_back(JsonEncode(new Dictionary({{"zone_id", objectKey}, {"environment_id", envId}, {"parent_id", parentId}}))); + + if (configUpdates) { + configUpdates->emplace_back(typeName + ":parent:" + id); + } + + parents->Add(GetObjectIdentifier(parent)); + } + + parentChksms.emplace_back(objectKey); + parentChksms.emplace_back(JsonEncode(new Dictionary({{"checksum", HashValue(zone->GetAllParents())}}))); + + return; + } + + if (type == User::TypeInstance) { + User::Ptr user = static_pointer_cast(object); + + Array::Ptr groups; + ConfigObject::Ptr (*getGroup)(const String& name); + + groups = user->GetGroups(); + getGroup = &::GetObjectByName; + + if (groups) { + ObjectLock groupsLock(groups); + Array::Ptr groupIds(new Array); + + groupIds->Reserve(groups->GetLength()); + + auto& members (hMSets[m_PrefixConfigObject + typeName + ":groupmember"]); + auto& memberChksms (hMSets[m_PrefixConfigCheckSum + typeName + ":groupmember"]); + + for (auto& group : groups) { + String groupId = GetObjectIdentifier((*getGroup)(group)); + String id = CalculateCheckSumArray(new Array({envId, groupId, objectKey})); + members.emplace_back(id); + members.emplace_back(JsonEncode(new Dictionary({{"user_id", objectKey}, {"environment_id", envId}, {"group_id", groupId}}))); + + if (configUpdates) { + configUpdates->emplace_back(typeName + ":groupmember:" + id); + } + + groupIds->Add(groupId); + } + + memberChksms.emplace_back(objectKey); + memberChksms.emplace_back(JsonEncode(new Dictionary({{"checksum", CalculateCheckSumArray(groupIds)}}))); + } + + return; + } + + if (type == Notification::TypeInstance) { + Notification::Ptr notification = static_pointer_cast(object); + + std::set users = notification->GetUsers(); + Array::Ptr userIds = new Array(); + + auto usergroups(notification->GetUserGroups()); + Array::Ptr usergroupIds = new Array(); + + userIds->Reserve(users.size()); + + auto& usrs (hMSets[m_PrefixConfigObject + typeName + ":user"]); + auto& userChksms (hMSets[m_PrefixConfigCheckSum + typeName + ":user"]); + + for (auto& user : users) { + String userId = GetObjectIdentifier(user); + String id = CalculateCheckSumArray(new Array({envId, userId, objectKey})); + usrs.emplace_back(id); + usrs.emplace_back(JsonEncode(new Dictionary({{"notification_id", objectKey}, {"environment_id", envId}, {"user_id", userId}}))); + + if (configUpdates) { + configUpdates->emplace_back(typeName + ":user:" + id); + } + + userIds->Add(userId); + } + + userChksms.emplace_back(objectKey); + userChksms.emplace_back(JsonEncode(new Dictionary({{"checksum", CalculateCheckSumArray(userIds)}}))); + + usergroupIds->Reserve(usergroups.size()); + + auto& groups (hMSets[m_PrefixConfigObject + typeName + ":usergroup"]); + auto& groupChksms (hMSets[m_PrefixConfigCheckSum + typeName + ":usergroup"]); + + for (auto& usergroup : usergroups) { + String usergroupId = GetObjectIdentifier(usergroup); + String id = CalculateCheckSumArray(new Array({envId, usergroupId, objectKey})); + groups.emplace_back(id); + groups.emplace_back(JsonEncode(new Dictionary({{"notification_id", objectKey}, {"environment_id", envId}, {"usergroup_id", usergroupId}}))); + + if (configUpdates) { + configUpdates->emplace_back(typeName + ":usergroup:" + id); + } + + usergroupIds->Add(usergroupId); + } + + groupChksms.emplace_back(objectKey); + groupChksms.emplace_back(JsonEncode(new Dictionary({{"checksum", CalculateCheckSumArray(usergroupIds)}}))); + + return; + } + + if (type == CheckCommand::TypeInstance || type == NotificationCommand::TypeInstance || type == EventCommand::TypeInstance) { + Command::Ptr command = static_pointer_cast(object); + + Dictionary::Ptr arguments = command->GetArguments(); + if (arguments) { + ObjectLock argumentsLock(arguments); + auto& typeArgs (hMSets[m_PrefixConfigObject + typeName + ":argument"]); + auto& argChksms (hMSets[m_PrefixConfigCheckSum + typeName + ":argument"]); + + for (auto& kv : arguments) { + Dictionary::Ptr values; + if (kv.second.IsObjectType()) { + values = kv.second; + values = values->ShallowClone(); + } else if (kv.second.IsObjectType()) { + values = new Dictionary({{"value", JsonEncode(kv.second)}}); + } else { + values = new Dictionary({{"value", kv.second}}); + } + + values->Set("value", JsonEncode(values->Get("value"))); + values->Set("command_id", objectKey); + values->Set("argument_key", kv.first); + values->Set("environment_id", envId); + + String id = HashValue(objectKey + kv.first + envId); + + typeArgs.emplace_back(id); + typeArgs.emplace_back(JsonEncode(values)); + + if (configUpdates) { + configUpdates->emplace_back(typeName + ":argument:" + id); + } + + argChksms.emplace_back(id); + argChksms.emplace_back(JsonEncode(new Dictionary({{"checksum", HashValue(kv.second)}}))); + } + } + + Dictionary::Ptr envvars = command->GetEnv(); + if (envvars) { + ObjectLock envvarsLock(envvars); + Array::Ptr envvarIds(new Array); + auto& typeVars (hMSets[m_PrefixConfigObject + typeName + ":envvar"]); + auto& varChksms (hMSets[m_PrefixConfigCheckSum + typeName + ":envvar"]); + + envvarIds->Reserve(envvars->GetLength()); + + for (auto& kv : envvars) { + Dictionary::Ptr values; + if (kv.second.IsObjectType()) { + values = kv.second; + values = values->ShallowClone(); + } else if (kv.second.IsObjectType()) { + values = new Dictionary({{"value", JsonEncode(kv.second)}}); + } else { + values = new Dictionary({{"value", kv.second}}); + } + + values->Set("value", JsonEncode(values->Get("value"))); + values->Set("command_id", objectKey); + values->Set("envvar_key", kv.first); + values->Set("environment_id", envId); + + String id = HashValue(objectKey + kv.first + envId); + + typeVars.emplace_back(id); + typeVars.emplace_back(JsonEncode(values)); + + if (configUpdates) { + configUpdates->emplace_back(typeName + ":envvar:" + id); + } + + varChksms.emplace_back(id); + varChksms.emplace_back(JsonEncode(new Dictionary({{"checksum", HashValue(kv.second)}}))); + } + } + + return; + } +} + +void IcingaDB::UpdateState(const Checkable::Ptr& checkable) +{ + Dictionary::Ptr stateAttrs = SerializeState(checkable); + + m_Rcon->FireAndForgetQuery({"HSET", m_PrefixStateObject + GetLowerCaseTypeNameDB(checkable), GetObjectIdentifier(checkable), JsonEncode(stateAttrs)}); +} + +// Used to update a single object, used for runtime updates +void IcingaDB::SendConfigUpdate(const ConfigObject::Ptr& object, bool runtimeUpdate) +{ + if (!m_Rcon || !m_Rcon->IsConnected()) + return; + + String typeName = GetLowerCaseTypeNameDB(object); + + std::map> hMSets, publishes; + std::vector states = {"HMSET", m_PrefixStateObject + typeName}; + + CreateConfigUpdate(object, typeName, hMSets, publishes, runtimeUpdate); + Checkable::Ptr checkable = dynamic_pointer_cast(object); + if (checkable) { + String objectKey = GetObjectIdentifier(object); + m_Rcon->FireAndForgetQuery({"HSET", m_PrefixStateObject + typeName, objectKey, JsonEncode(SerializeState(checkable))}); + publishes["icinga:config:update"].emplace_back("state:" + typeName + ":" + objectKey); + } + + std::vector > transaction = {{"MULTI"}}; + + for (auto& kv : hMSets) { + if (!kv.second.empty()) { + kv.second.insert(kv.second.begin(), {"HMSET", kv.first}); + transaction.emplace_back(std::move(kv.second)); + } + } + + for (auto& kv : publishes) { + for (auto& message : kv.second) { + std::vector publish; + + publish.reserve(3); + publish.emplace_back("PUBLISH"); + publish.emplace_back(kv.first); + publish.emplace_back(std::move(message)); + + transaction.emplace_back(std::move(publish)); + } + } + + if (transaction.size() > 1) { + transaction.push_back({"EXEC"}); + m_Rcon->FireAndForgetQueries(std::move(transaction)); + } +} + +// Takes object and collects IcingaDB relevant attributes and computes checksums. Returns whether the object is relevant +// for IcingaDB. +bool IcingaDB::PrepareObject(const ConfigObject::Ptr& object, Dictionary::Ptr& attributes, Dictionary::Ptr& checksums) +{ + attributes->Set("name_checksum", CalculateCheckSumString(object->GetName())); + attributes->Set("environment_id", CalculateCheckSumString(GetEnvironment())); + attributes->Set("name", object->GetName()); + + Zone::Ptr ObjectsZone = static_pointer_cast(object->GetZone()); + if (ObjectsZone) { + attributes->Set("zone_id", GetObjectIdentifier(ObjectsZone)); + attributes->Set("zone", ObjectsZone->GetName()); + } + + Type::Ptr type = object->GetReflectionType(); + + if (type == Endpoint::TypeInstance) { + return true; + } + + if (type == Zone::TypeInstance) { + Zone::Ptr zone = static_pointer_cast(object); + + attributes->Set("is_global", zone->GetGlobal()); + + Zone::Ptr parent = zone->GetParent(); + if (parent) { + attributes->Set("parent_id", GetObjectIdentifier(zone)); + } + + auto parentsRaw (zone->GetAllParentsRaw()); + attributes->Set("depth", parentsRaw.size()); + + return true; + } + + if (type == Host::TypeInstance || type == Service::TypeInstance) { + Checkable::Ptr checkable = static_pointer_cast(object); + + attributes->Set("checkcommand", checkable->GetCheckCommand()->GetName()); + attributes->Set("max_check_attempts", checkable->GetMaxCheckAttempts()); + attributes->Set("check_timeout", checkable->GetCheckTimeout()); + attributes->Set("check_interval", checkable->GetCheckInterval()); + attributes->Set("check_retry_interval", checkable->GetRetryInterval()); + attributes->Set("active_checks_enabled", checkable->GetEnableActiveChecks()); + attributes->Set("passive_checks_enabled", checkable->GetEnablePassiveChecks()); + attributes->Set("event_handler_enabled", checkable->GetEnableEventHandler()); + attributes->Set("notifications_enabled", checkable->GetEnableNotifications()); + attributes->Set("flapping_enabled", checkable->GetEnableFlapping()); + attributes->Set("flapping_threshold_low", checkable->GetFlappingThresholdLow()); + attributes->Set("flapping_threshold_high", checkable->GetFlappingThresholdHigh()); + attributes->Set("perfdata_enabled", checkable->GetEnablePerfdata()); + attributes->Set("is_volatile", checkable->GetVolatile()); + attributes->Set("notes", checkable->GetNotes()); + attributes->Set("icon_image_alt", checkable->GetIconImageAlt()); + + attributes->Set("checkcommand_id", GetObjectIdentifier(checkable->GetCheckCommand())); + + Endpoint::Ptr commandEndpoint = checkable->GetCommandEndpoint(); + if (commandEndpoint) { + attributes->Set("command_endpoint_id", GetObjectIdentifier(commandEndpoint)); + attributes->Set("command_endpoint", commandEndpoint->GetName()); + } + + TimePeriod::Ptr timePeriod = checkable->GetCheckPeriod(); + if (timePeriod) { + attributes->Set("check_timeperiod_id", GetObjectIdentifier(timePeriod)); + attributes->Set("check_timeperiod", timePeriod->GetName()); + } + + EventCommand::Ptr eventCommand = checkable->GetEventCommand(); + if (eventCommand) { + attributes->Set("eventcommand_id", GetObjectIdentifier(eventCommand)); + attributes->Set("eventcommand", eventCommand->GetName()); + } + + String actionUrl = checkable->GetActionUrl(); + String notesUrl = checkable->GetNotesUrl(); + String iconImage = checkable->GetIconImage(); + if (!actionUrl.IsEmpty()) + attributes->Set("action_url_id", CalculateCheckSumArray(new Array({CalculateCheckSumString(GetEnvironment()), actionUrl}))); + if (!notesUrl.IsEmpty()) + attributes->Set("notes_url_id", CalculateCheckSumArray(new Array({CalculateCheckSumString(GetEnvironment()), notesUrl}))); + if (!iconImage.IsEmpty()) + attributes->Set("icon_image_id", CalculateCheckSumArray(new Array({CalculateCheckSumString(GetEnvironment()), iconImage}))); + + + Host::Ptr host; + Service::Ptr service; + tie(host, service) = GetHostService(checkable); + + if (service) { + attributes->Set("host_id", GetObjectIdentifier(service->GetHost())); + attributes->Set("display_name", service->GetDisplayName()); + + // Overwrite name here, `object->name` is 'HostName!ServiceName' but we only want the name of the Service + attributes->Set("name", service->GetShortName()); + } else { + attributes->Set("display_name", host->GetDisplayName()); + attributes->Set("address", host->GetAddress()); + attributes->Set("address6", host->GetAddress6()); + } + + return true; + } + + if (type == User::TypeInstance) { + User::Ptr user = static_pointer_cast(object); + + attributes->Set("display_name", user->GetDisplayName()); + attributes->Set("email", user->GetEmail()); + attributes->Set("pager", user->GetPager()); + attributes->Set("notifications_enabled", user->GetEnableNotifications()); + attributes->Set("states", user->GetStates()); + attributes->Set("types", user->GetTypes()); + + if (user->GetPeriod()) + attributes->Set("timeperiod_id", GetObjectIdentifier(user->GetPeriod())); + + return true; + } + + if (type == TimePeriod::TypeInstance) { + TimePeriod::Ptr timeperiod = static_pointer_cast(object); + + attributes->Set("display_name", timeperiod->GetDisplayName()); + attributes->Set("prefer_includes", timeperiod->GetPreferIncludes()); + return true; + } + + if (type == Notification::TypeInstance) { + Notification::Ptr notification = static_pointer_cast(object); + + Host::Ptr host; + Service::Ptr service; + + tie(host, service) = GetHostService(notification->GetCheckable()); + + attributes->Set("host_id", GetObjectIdentifier(host)); + attributes->Set("command_id", GetObjectIdentifier(notification->GetCommand())); + + if (service) + attributes->Set("service_id", GetObjectIdentifier(service)); + + TimePeriod::Ptr timeperiod = notification->GetPeriod(); + if (timeperiod) + attributes->Set("timeperiod_id", GetObjectIdentifier(timeperiod)); + + if (notification->GetTimes()) { + attributes->Set("times_begin", notification->GetTimes()->Get("begin")); + attributes->Set("times_end",notification->GetTimes()->Get("end")); + } + + attributes->Set("notification_interval", notification->GetInterval()); + attributes->Set("states", notification->GetStates()); + attributes->Set("types", notification->GetTypes()); + + return true; + } + + if (type == Comment::TypeInstance) { + Comment::Ptr comment = static_pointer_cast(object); + + attributes->Set("author", comment->GetAuthor()); + attributes->Set("text", comment->GetText()); + attributes->Set("entry_type", comment->GetEntryType()); + attributes->Set("entry_time", TimestampToMilliseconds(comment->GetEntryTime())); + attributes->Set("is_persistent", comment->GetPersistent()); + attributes->Set("is_sticky", comment->GetEntryType() == CommentAcknowledgement && comment->GetCheckable()->GetAcknowledgement() == AcknowledgementSticky); + attributes->Set("expire_time", TimestampToMilliseconds(comment->GetExpireTime())); + + Host::Ptr host; + Service::Ptr service; + tie(host, service) = GetHostService(comment->GetCheckable()); + if (service) { + attributes->Set("object_type", "service"); + attributes->Set("service_id", GetObjectIdentifier(service)); + attributes->Set("host_id", "00000000000000000000000000000000"); + } else { + attributes->Set("object_type", "host"); + attributes->Set("host_id", GetObjectIdentifier(host)); + attributes->Set("service_id", "00000000000000000000000000000000"); + } + + return true; + } + + if (type == Downtime::TypeInstance) { + Downtime::Ptr downtime = static_pointer_cast(object); + + attributes->Set("author", downtime->GetAuthor()); + attributes->Set("comment", downtime->GetComment()); + attributes->Set("entry_time", TimestampToMilliseconds(downtime->GetEntryTime())); + attributes->Set("scheduled_start_time", TimestampToMilliseconds(downtime->GetStartTime())); + attributes->Set("scheduled_end_time", TimestampToMilliseconds(downtime->GetEndTime())); + attributes->Set("flexible_duration", TimestampToMilliseconds(downtime->GetDuration())); + attributes->Set("is_flexible", !downtime->GetFixed()); + attributes->Set("is_in_effect", downtime->IsInEffect()); + if (downtime->IsInEffect()) { + attributes->Set("start_time", TimestampToMilliseconds(downtime->GetTriggerTime())); + attributes->Set("end_time", TimestampToMilliseconds(downtime->GetFixed() ? downtime->GetEndTime() : (downtime->GetTriggerTime() + downtime->GetDuration()))); + } + + Host::Ptr host; + Service::Ptr service; + tie(host, service) = GetHostService(downtime->GetCheckable()); + + if (service) { + attributes->Set("object_type", "service"); + attributes->Set("service_id", GetObjectIdentifier(service)); + attributes->Set("host_id", "00000000000000000000000000000000"); + } else { + attributes->Set("object_type", "host"); + attributes->Set("host_id", GetObjectIdentifier(host)); + attributes->Set("service_id", "00000000000000000000000000000000"); + } + + auto triggeredBy (Downtime::GetByName(downtime->GetTriggeredBy())); + if (triggeredBy) { + attributes->Set("triggered_by_id", GetObjectIdentifier(triggeredBy)); + } + + return true; + } + + if (type == UserGroup::TypeInstance) { + UserGroup::Ptr userGroup = static_pointer_cast(object); + + attributes->Set("display_name", userGroup->GetDisplayName()); + + return true; + } + + if (type == HostGroup::TypeInstance) { + HostGroup::Ptr hostGroup = static_pointer_cast(object); + + attributes->Set("display_name", hostGroup->GetDisplayName()); + + return true; + } + + if (type == ServiceGroup::TypeInstance) { + ServiceGroup::Ptr serviceGroup = static_pointer_cast(object); + + attributes->Set("display_name", serviceGroup->GetDisplayName()); + + return true; + } + + if (type == CheckCommand::TypeInstance || type == NotificationCommand::TypeInstance || type == EventCommand::TypeInstance) { + Command::Ptr command = static_pointer_cast(object); + + attributes->Set("command", JsonEncode(command->GetCommandLine())); + attributes->Set("timeout", command->GetTimeout()); + + return true; + } + + return false; +} + +/* Creates a config update with computed checksums etc. + * Writes attributes, customVars and checksums into the respective supplied vectors. Adds two values to each vector + * (if applicable), first the key then the value. To use in a Redis command the command (e.g. HSET) and the key (e.g. + * icinga:config:object:downtime) need to be prepended. There is nothing to indicate success or failure. + */ +void +IcingaDB::CreateConfigUpdate(const ConfigObject::Ptr& object, const String typeName, std::map>& hMSets, + std::map>& publishes, bool runtimeUpdate) +{ + /* TODO: This isn't essentially correct as we don't keep track of config objects ourselves. This would avoid duplicated config updates at startup. + if (!runtimeUpdate && m_ConfigDumpInProgress) + return; + */ + + if (m_Rcon == nullptr) + return; + + Dictionary::Ptr attr = new Dictionary; + Dictionary::Ptr chksm = new Dictionary; + + if (!PrepareObject(object, attr, chksm)) + return; + + InsertObjectDependencies(object, typeName, hMSets, publishes, runtimeUpdate); + + String objectKey = GetObjectIdentifier(object); + auto& attrs (hMSets[m_PrefixConfigObject + typeName]); + auto& chksms (hMSets[m_PrefixConfigCheckSum + typeName]); + + attrs.emplace_back(objectKey); + attrs.emplace_back(JsonEncode(attr)); + + chksms.emplace_back(objectKey); + chksms.emplace_back(JsonEncode(new Dictionary({{"checksum", HashValue(attr)}}))); + + /* Send an update event to subscribers. */ + if (runtimeUpdate) { + publishes["icinga:config:update"].emplace_back(typeName + ":" + objectKey); + } +} + +void IcingaDB::SendConfigDelete(const ConfigObject::Ptr& object) +{ + String typeName = object->GetReflectionType()->GetName().ToLower(); + String objectKey = GetObjectIdentifier(object); + + m_Rcon->FireAndForgetQueries({ + {"HDEL", m_PrefixConfigObject + typeName, objectKey}, + {"DEL", m_PrefixStateObject + typeName + ":" + objectKey}, + {"PUBLISH", "icinga:config:delete", typeName + ":" + objectKey} + }); +} + +static +unsigned short GetPreviousHardState(const Checkable::Ptr& checkable, const Service::Ptr& service) +{ + auto phs (checkable->GetLastHardStatesRaw() % 100u); + + if (service) { + return phs; + } else { + return phs == 99 ? phs : Host::CalculateState(ServiceState(phs)); + } +} + +void IcingaDB::SendStatusUpdate(const ConfigObject::Ptr& object, const CheckResult::Ptr& cr, StateType type) +{ + if (!m_Rcon || !m_Rcon->IsConnected()) + return; + + Checkable::Ptr checkable = dynamic_pointer_cast(object); + if (!checkable) + return; + + Host::Ptr host; + Service::Ptr service; + + tie(host, service) = GetHostService(checkable); + + String streamname; + if (service) + streamname = "icinga:state:stream:service"; + else + streamname = "icinga:state:stream:host"; + + Dictionary::Ptr objectAttrs = SerializeState(checkable); + + std::vector streamadd({"XADD", streamname, "*"}); + ObjectLock olock(objectAttrs); + for (const Dictionary::Pair& kv : objectAttrs) { + streamadd.emplace_back(kv.first); + streamadd.emplace_back(Utility::ValidateUTF8(kv.second)); + } + + m_Rcon->FireAndForgetQuery(std::move(streamadd)); + + auto output (SplitOutput(cr ? cr->GetOutput() : "")); + + std::vector xAdd ({ + "XADD", "icinga:history:stream:state", "*", + "id", Utility::NewUniqueID(), + "environment_id", SHA1(GetEnvironment()), + "state_type", Convert::ToString(type), + "soft_state", Convert::ToString(cr ? cr->GetState() : 99), + "hard_state", Convert::ToString(service ? service->GetLastHardState() : host->GetLastHardState()), + "attempt", Convert::ToString(checkable->GetCheckAttempt()), + // TODO: last_hard/soft_state should be "previous". + "last_soft_state", Convert::ToString(cr ? cr->GetState() : 99), + "last_hard_state", Convert::ToString(service ? service->GetLastHardState() : host->GetLastHardState()), + "previous_hard_state", Convert::ToString(GetPreviousHardState(checkable, service)), + "output", Utility::ValidateUTF8(std::move(output.first)), + "long_output", Utility::ValidateUTF8(std::move(output.second)), + "check_source", cr->GetCheckSource(), + "max_check_attempts", Convert::ToString(checkable->GetMaxCheckAttempts()), + "event_time", Convert::ToString(TimestampToMilliseconds(cr ? cr->GetExecutionEnd() : Utility::GetTime())), + "event_id", Utility::NewUniqueID(), + "event_type", "state_change" + }); + + if (service) { + xAdd.emplace_back("object_type"); + xAdd.emplace_back("service"); + xAdd.emplace_back("service_id"); + xAdd.emplace_back(GetObjectIdentifier(checkable)); + } else { + xAdd.emplace_back("object_type"); + xAdd.emplace_back("host"); + xAdd.emplace_back("host_id"); + xAdd.emplace_back(GetObjectIdentifier(checkable)); + } + + auto endpoint (Endpoint::GetLocalEndpoint()); + + if (endpoint) { + xAdd.emplace_back("endpoint_id"); + xAdd.emplace_back(GetObjectIdentifier(endpoint)); + } + + m_Rcon->FireAndForgetQuery(std::move(xAdd)); +} + +void IcingaDB::SendSentNotification( + const Notification::Ptr& notification, const Checkable::Ptr& checkable, size_t users, + NotificationType type, const CheckResult::Ptr& cr, const String& author, const String& text +) +{ + if (!m_Rcon || !m_Rcon->IsConnected()) + return; + + auto service (dynamic_pointer_cast(checkable)); + + auto finalText = text; + if (finalText == "" && cr) { + finalText = cr->GetOutput(); + } + + std::vector xAdd ({ + "XADD", "icinga:history:stream:notification", "*", + "id", Utility::NewUniqueID(), + "environment_id", SHA1(GetEnvironment()), + "notification_id", GetObjectIdentifier(notification), + "type", Convert::ToString(type), + "state", Convert::ToString(cr->GetState()), + "previous_hard_state", Convert::ToString(GetPreviousHardState(checkable, service)), + "author", Utility::ValidateUTF8(author), + "text", Utility::ValidateUTF8(finalText), + "users_notified", Convert::ToString(users), + "event_time", Convert::ToString(TimestampToMilliseconds(Utility::GetTime())), + "event_id", Utility::NewUniqueID(), + "event_type", "notification" + }); + + if (service) { + xAdd.emplace_back("object_type"); + xAdd.emplace_back("service"); + xAdd.emplace_back("service_id"); + xAdd.emplace_back(GetObjectIdentifier(checkable)); + } else { + xAdd.emplace_back("object_type"); + xAdd.emplace_back("host"); + xAdd.emplace_back("host_id"); + xAdd.emplace_back(GetObjectIdentifier(checkable)); + } + + auto endpoint (Endpoint::GetLocalEndpoint()); + + if (endpoint) { + xAdd.emplace_back("endpoint_id"); + xAdd.emplace_back(GetObjectIdentifier(endpoint)); + } + + m_Rcon->FireAndForgetQuery(std::move(xAdd)); +} + +void IcingaDB::SendStartedDowntime(const Downtime::Ptr& downtime) +{ + if (!m_Rcon || !m_Rcon->IsConnected()) + return; + + SendConfigUpdate(downtime, true); + + auto checkable (downtime->GetCheckable()); + auto service (dynamic_pointer_cast(checkable)); + auto triggeredBy (Downtime::GetByName(downtime->GetTriggeredBy())); + + std::vector xAdd ({ + "XADD", "icinga:history:stream:downtime", "*", + "downtime_id", GetObjectIdentifier(downtime), + "environment_id", SHA1(GetEnvironment()), + "entry_time", Convert::ToString(TimestampToMilliseconds(downtime->GetEntryTime())), + "author", Utility::ValidateUTF8(downtime->GetAuthor()), + "comment", Utility::ValidateUTF8(downtime->GetComment()), + "is_flexible", Convert::ToString((unsigned short)!downtime->GetFixed()), + "flexible_duration", Convert::ToString(downtime->GetDuration()), + "scheduled_start_time", Convert::ToString(TimestampToMilliseconds(downtime->GetStartTime())), + "scheduled_end_time", Convert::ToString(TimestampToMilliseconds(downtime->GetEndTime())), + "has_been_cancelled", Convert::ToString((unsigned short)downtime->GetWasCancelled()), + "trigger_time", Convert::ToString(TimestampToMilliseconds(downtime->GetTriggerTime())), + "event_id", Utility::NewUniqueID(), + "event_type", "downtime_start" + }); + + if (service) { + xAdd.emplace_back("object_type"); + xAdd.emplace_back("service"); + xAdd.emplace_back("service_id"); + xAdd.emplace_back(GetObjectIdentifier(checkable)); + } else { + xAdd.emplace_back("object_type"); + xAdd.emplace_back("host"); + xAdd.emplace_back("host_id"); + xAdd.emplace_back(GetObjectIdentifier(checkable)); + } + + if (triggeredBy) { + xAdd.emplace_back("triggered_by_id"); + xAdd.emplace_back(GetObjectIdentifier(triggeredBy)); + } + + if (downtime->GetFixed()) { + xAdd.emplace_back("start_time"); + xAdd.emplace_back(Convert::ToString(TimestampToMilliseconds(downtime->GetStartTime()))); + xAdd.emplace_back("end_time"); + xAdd.emplace_back(Convert::ToString(TimestampToMilliseconds(downtime->GetEndTime()))); + } else { + xAdd.emplace_back("start_time"); + xAdd.emplace_back(Convert::ToString(TimestampToMilliseconds(downtime->GetTriggerTime()))); + xAdd.emplace_back("end_time"); + xAdd.emplace_back(Convert::ToString(TimestampToMilliseconds(downtime->GetTriggerTime() + downtime->GetDuration()))); + } + + auto endpoint (Endpoint::GetLocalEndpoint()); + + if (endpoint) { + xAdd.emplace_back("endpoint_id"); + xAdd.emplace_back(GetObjectIdentifier(endpoint)); + } + + m_Rcon->FireAndForgetQuery(std::move(xAdd)); +} + +void IcingaDB::SendRemovedDowntime(const Downtime::Ptr& downtime) +{ + if (!m_Rcon || !m_Rcon->IsConnected()) + return; + + auto checkable (downtime->GetCheckable()); + auto service (dynamic_pointer_cast(checkable)); + auto triggeredBy (Downtime::GetByName(downtime->GetTriggeredBy())); + + // Downtime never got triggered (didn't send "downtime_start") so we don't want to send "downtime_end" + if (downtime->GetTriggerTime() == 0) + return; + + std::vector xAdd ({ + "XADD", "icinga:history:stream:downtime", "*", + "downtime_id", GetObjectIdentifier(downtime), + "environment_id", SHA1(GetEnvironment()), + "entry_time", Convert::ToString(TimestampToMilliseconds(downtime->GetEntryTime())), + "author", Utility::ValidateUTF8(downtime->GetAuthor()), + "comment", Utility::ValidateUTF8(downtime->GetComment()), + "is_flexible", Convert::ToString((unsigned short)!downtime->GetFixed()), + "flexible_duration", Convert::ToString(downtime->GetDuration()), + "scheduled_start_time", Convert::ToString(TimestampToMilliseconds(downtime->GetStartTime())), + "scheduled_end_time", Convert::ToString(TimestampToMilliseconds(downtime->GetEndTime())), + "has_been_cancelled", Convert::ToString((unsigned short)downtime->GetWasCancelled()), + "trigger_time", Convert::ToString(TimestampToMilliseconds(downtime->GetTriggerTime())), + "cancel_time", Convert::ToString(TimestampToMilliseconds(Utility::GetTime())), + "event_id", Utility::NewUniqueID(), + "event_type", "downtime_end" + }); + + if (service) { + xAdd.emplace_back("object_type"); + xAdd.emplace_back("service"); + xAdd.emplace_back("service_id"); + xAdd.emplace_back(GetObjectIdentifier(checkable)); + } else { + xAdd.emplace_back("object_type"); + xAdd.emplace_back("host"); + xAdd.emplace_back("host_id"); + xAdd.emplace_back(GetObjectIdentifier(checkable)); + } + + if (triggeredBy) { + xAdd.emplace_back("triggered_by_id"); + xAdd.emplace_back(GetObjectIdentifier(triggeredBy)); + } + + if (downtime->GetFixed()) { + xAdd.emplace_back("start_time"); + xAdd.emplace_back(Convert::ToString(TimestampToMilliseconds(downtime->GetStartTime()))); + xAdd.emplace_back("end_time"); + xAdd.emplace_back(Convert::ToString(TimestampToMilliseconds(downtime->GetEndTime()))); + } else { + xAdd.emplace_back("start_time"); + xAdd.emplace_back(Convert::ToString(TimestampToMilliseconds(downtime->GetTriggerTime()))); + xAdd.emplace_back("end_time"); + xAdd.emplace_back(Convert::ToString(TimestampToMilliseconds(downtime->GetTriggerTime() + downtime->GetDuration()))); + } + + auto endpoint (Endpoint::GetLocalEndpoint()); + + if (endpoint) { + xAdd.emplace_back("endpoint_id"); + xAdd.emplace_back(GetObjectIdentifier(endpoint)); + } + + m_Rcon->FireAndForgetQuery(std::move(xAdd)); +} + +void IcingaDB::SendAddedComment(const Comment::Ptr& comment) +{ + if (!m_Rcon || !m_Rcon->IsConnected()) + return; + + auto checkable (comment->GetCheckable()); + auto service (dynamic_pointer_cast(checkable)); + + std::vector xAdd ({ + "XADD", "icinga:history:stream:comment", "*", + "comment_id", GetObjectIdentifier(comment), + "environment_id", SHA1(GetEnvironment()), + "entry_time", Convert::ToString(TimestampToMilliseconds(comment->GetEntryTime())), + "author", Utility::ValidateUTF8(comment->GetAuthor()), + "comment", Utility::ValidateUTF8(comment->GetText()), + "entry_type", Convert::ToString(comment->GetEntryType()), + "is_persistent", Convert::ToString((unsigned short)comment->GetPersistent()), + "is_sticky", Convert::ToString((unsigned short)(comment->GetEntryType() == CommentAcknowledgement && comment->GetCheckable()->GetAcknowledgement() == AcknowledgementSticky)), + "expire_time", Convert::ToString(TimestampToMilliseconds(comment->GetExpireTime())), + "event_id", Utility::NewUniqueID(), + "event_type", "comment_add" + }); + + if (service) { + xAdd.emplace_back("object_type"); + xAdd.emplace_back("service"); + xAdd.emplace_back("service_id"); + xAdd.emplace_back(GetObjectIdentifier(checkable)); + } else { + xAdd.emplace_back("object_type"); + xAdd.emplace_back("host"); + xAdd.emplace_back("host_id"); + xAdd.emplace_back(GetObjectIdentifier(checkable)); + } + + auto endpoint (Endpoint::GetLocalEndpoint()); + + if (endpoint) { + xAdd.emplace_back("endpoint_id"); + xAdd.emplace_back(GetObjectIdentifier(endpoint)); + } + + m_Rcon->FireAndForgetQuery(std::move(xAdd)); +} + +void IcingaDB::SendRemovedComment(const Comment::Ptr& comment) +{ + if (!m_Rcon || !m_Rcon->IsConnected()) + return; + + auto checkable (comment->GetCheckable()); + auto service (dynamic_pointer_cast(checkable)); + + std::vector xAdd ({ + "XADD", "icinga:history:stream:comment", "*", + "comment_id", GetObjectIdentifier(comment), + "environment_id", SHA1(GetEnvironment()), + "entry_time", Convert::ToString(TimestampToMilliseconds(comment->GetEntryTime())), + "author", Utility::ValidateUTF8(comment->GetAuthor()), + "comment", Utility::ValidateUTF8(comment->GetText()), + "entry_type", Convert::ToString(comment->GetEntryType()), + "is_persistent", Convert::ToString((unsigned short)comment->GetPersistent()), + "is_sticky", Convert::ToString((unsigned short)(comment->GetEntryType() == CommentAcknowledgement && comment->GetCheckable()->GetAcknowledgement() == AcknowledgementSticky)), + "expire_time", Convert::ToString(TimestampToMilliseconds(comment->GetExpireTime())), + "event_id", Utility::NewUniqueID(), + "event_type", "comment_remove" + }); + + if (service) { + xAdd.emplace_back("object_type"); + xAdd.emplace_back("service"); + xAdd.emplace_back("service_id"); + xAdd.emplace_back(GetObjectIdentifier(checkable)); + } else { + xAdd.emplace_back("object_type"); + xAdd.emplace_back("host"); + xAdd.emplace_back("host_id"); + xAdd.emplace_back(GetObjectIdentifier(checkable)); + } + + auto endpoint (Endpoint::GetLocalEndpoint()); + + if (endpoint) { + xAdd.emplace_back("endpoint_id"); + xAdd.emplace_back(GetObjectIdentifier(endpoint)); + } + + if (comment->GetExpireTime() < Utility::GetTime()) { + xAdd.emplace_back("remove_time"); + xAdd.emplace_back(Convert::ToString(TimestampToMilliseconds(Utility::GetTime()))); + xAdd.emplace_back("has_been_removed"); + xAdd.emplace_back("1"); + } else { + xAdd.emplace_back("has_been_removed"); + xAdd.emplace_back("0"); + } + + m_Rcon->FireAndForgetQuery(std::move(xAdd)); +} + +void IcingaDB::SendFlappingChanged(const Checkable::Ptr& checkable, const Value& value) +{ + if (!m_Rcon || !m_Rcon->IsConnected()) + return; + + auto service (dynamic_pointer_cast(checkable)); + + std::vector xAdd ({ + "XADD", "icinga:history:stream:flapping", "*", + "id", Utility::NewUniqueID(), + "environment_id", SHA1(GetEnvironment()), + "percent_state_change", Convert::ToString(checkable->GetFlappingCurrent()), + "flapping_threshold_low", Convert::ToString(checkable->GetFlappingThresholdLow()), + "flapping_threshold_high", Convert::ToString(checkable->GetFlappingThresholdHigh()), + "event_time", Convert::ToString(TimestampToMilliseconds(Utility::GetTime())), + "event_id", Utility::NewUniqueID(), + "event_type", value.ToBool() ? "flapping_start" : "flapping_end" + }); + + if (service) { + xAdd.emplace_back("object_type"); + xAdd.emplace_back("service"); + xAdd.emplace_back("service_id"); + xAdd.emplace_back(GetObjectIdentifier(checkable)); + } else { + xAdd.emplace_back("object_type"); + xAdd.emplace_back("host"); + xAdd.emplace_back("host_id"); + xAdd.emplace_back(GetObjectIdentifier(checkable)); + } + + auto endpoint (Endpoint::GetLocalEndpoint()); + + if (endpoint) { + xAdd.emplace_back("endpoint_id"); + xAdd.emplace_back(GetObjectIdentifier(endpoint)); + } + + m_Rcon->FireAndForgetQuery(std::move(xAdd)); +} + +Dictionary::Ptr IcingaDB::SerializeState(const Checkable::Ptr& checkable) +{ + Dictionary::Ptr attrs = new Dictionary(); + + Host::Ptr host; + Service::Ptr service; + + tie(host, service) = GetHostService(checkable); + + attrs->Set("id", GetObjectIdentifier(checkable));; + attrs->Set("environment_id", CalculateCheckSumString(GetEnvironment())); + attrs->Set("state_type", checkable->GetStateType()); + + // TODO: last_hard/soft_state should be "previous". + if (service) { + attrs->Set("state", service->GetState()); + attrs->Set("last_soft_state", service->GetState()); + attrs->Set("last_hard_state", service->GetLastHardState()); + attrs->Set("severity", service->GetSeverity()); + } else { + attrs->Set("state", host->GetState()); + attrs->Set("last_soft_state", host->GetState()); + attrs->Set("last_hard_state", host->GetLastHardState()); + attrs->Set("severity", host->GetSeverity()); + } + + attrs->Set("previous_hard_state", GetPreviousHardState(checkable, service)); + attrs->Set("check_attempt", checkable->GetCheckAttempt()); + + attrs->Set("is_active", checkable->IsActive()); + + CheckResult::Ptr cr = checkable->GetLastCheckResult(); + + if (cr) { + String rawOutput = cr->GetOutput(); + if (!rawOutput.IsEmpty()) { + size_t lineBreak = rawOutput.Find("\n"); + String output = rawOutput.SubStr(0, lineBreak); + if (!output.IsEmpty()) + attrs->Set("output", rawOutput.SubStr(0, lineBreak)); + + if (lineBreak > 0 && lineBreak != String::NPos) { + String longOutput = rawOutput.SubStr(lineBreak+1, rawOutput.GetLength()); + if (!longOutput.IsEmpty()) + attrs->Set("long_output", longOutput); + } + } + + String perfData = PluginUtility::FormatPerfdata(cr->GetPerformanceData()); + if (!perfData.IsEmpty()) + attrs->Set("performance_data", perfData); + + if (!cr->GetCommand().IsEmpty()) + attrs->Set("commandline", FormatCommandLine(cr->GetCommand())); + attrs->Set("execution_time", TimestampToMilliseconds(fmax(0.0, cr->CalculateExecutionTime()))); + attrs->Set("latency", TimestampToMilliseconds(cr->CalculateLatency())); + attrs->Set("check_source", cr->GetCheckSource()); + } + + bool isProblem = !checkable->IsStateOK(checkable->GetStateRaw()); + attrs->Set("is_problem", isProblem); + attrs->Set("is_handled", isProblem && (checkable->IsInDowntime() || checkable->IsAcknowledged())); + attrs->Set("is_reachable", checkable->IsReachable()); + attrs->Set("is_flapping", checkable->IsFlapping()); + + attrs->Set("is_acknowledged", checkable->IsAcknowledged()); + if (checkable->IsAcknowledged()) { + Timestamp entry = 0; + Comment::Ptr AckComment; + for (const Comment::Ptr& c : checkable->GetComments()) { + if (c->GetEntryType() == CommentAcknowledgement) { + if (c->GetEntryTime() > entry) { + entry = c->GetEntryTime(); + AckComment = c; + } + } + } + if (AckComment != nullptr) { + attrs->Set("acknowledgement_comment_id", GetObjectIdentifier(AckComment)); + } + } + + attrs->Set("in_downtime", checkable->IsInDowntime()); + + if (checkable->GetCheckTimeout().IsEmpty()) + attrs->Set("check_timeout", TimestampToMilliseconds(checkable->GetCheckCommand()->GetTimeout())); + else + attrs->Set("check_timeout", TimestampToMilliseconds(checkable->GetCheckTimeout())); + + attrs->Set("last_update", TimestampToMilliseconds(Utility::GetTime())); + attrs->Set("last_state_change", TimestampToMilliseconds(checkable->GetLastStateChange())); + attrs->Set("next_check", TimestampToMilliseconds(checkable->GetNextCheck())); + attrs->Set("next_update", TimestampToMilliseconds(checkable->GetNextUpdate())); + + return attrs; +} + +std::vector +IcingaDB::UpdateObjectAttrs(const ConfigObject::Ptr& object, int fieldType, + const String& typeNameOverride) +{ + Type::Ptr type = object->GetReflectionType(); + Dictionary::Ptr attrs(new Dictionary); + + for (int fid = 0; fid < type->GetFieldCount(); fid++) { + Field field = type->GetFieldInfo(fid); + + if ((field.Attributes & fieldType) == 0) + continue; + + Value val = object->GetField(fid); + + /* hide attributes which shouldn't be user-visible */ + if (field.Attributes & FANoUserView) + continue; + + /* hide internal navigation fields */ + if (field.Attributes & FANavigation && !(field.Attributes & (FAConfig | FAState))) + continue; + + attrs->Set(field.Name, Serialize(val)); + } + + /* Downtimes require in_effect, which is not an attribute */ + Downtime::Ptr downtime = dynamic_pointer_cast(object); + if (downtime) { + attrs->Set("in_effect", Serialize(downtime->IsInEffect())); + attrs->Set("trigger_time", Serialize(TimestampToMilliseconds(downtime->GetTriggerTime()))); + } + + + /* Use the name checksum as unique key. */ + String typeName = type->GetName().ToLower(); + if (!typeNameOverride.IsEmpty()) + typeName = typeNameOverride.ToLower(); + + return {GetObjectIdentifier(object), JsonEncode(attrs)}; + //m_Rcon->FireAndForgetQuery({"HSET", keyPrefix + typeName, GetObjectIdentifier(object), JsonEncode(attrs)}); +} + +void IcingaDB::StateChangeHandler(const ConfigObject::Ptr& object) +{ + auto checkable (dynamic_pointer_cast(object)); + + if (checkable) { + IcingaDB::StateChangeHandler(object, checkable->GetLastCheckResult(), checkable->GetStateType()); + } +} + +void IcingaDB::StateChangeHandler(const ConfigObject::Ptr& object, const CheckResult::Ptr& cr, StateType type) +{ + for (const IcingaDB::Ptr& rw : ConfigType::GetObjectsByType()) { + rw->m_WorkQueue.Enqueue([rw, object, cr, type]() { rw->SendStatusUpdate(object, cr, type); }); + } +} + +void IcingaDB::VersionChangedHandler(const ConfigObject::Ptr& object) +{ + Type::Ptr type = object->GetReflectionType(); + + if (object->IsActive()) { + // Create or update the object config + for (const IcingaDB::Ptr& rw : ConfigType::GetObjectsByType()) { + if (rw) + rw->m_WorkQueue.Enqueue([rw, object]() { rw->SendConfigUpdate(object, true); }); + } + } else if (!object->IsActive() && + object->GetExtension("ConfigObjectDeleted")) { // same as in apilistener-configsync.cpp + // Delete object config + for (const IcingaDB::Ptr& rw : ConfigType::GetObjectsByType()) { + if (rw) + rw->m_WorkQueue.Enqueue([rw, object]() { rw->SendConfigDelete(object); }); + } + } +} + +void IcingaDB::DowntimeStartedHandler(const Downtime::Ptr& downtime) +{ + StateChangeHandler(downtime->GetCheckable()); + + for (auto& rw : ConfigType::GetObjectsByType()) { + rw->m_WorkQueue.Enqueue([rw, downtime]() { rw->SendStartedDowntime(downtime); }); + } +} + +void IcingaDB::DowntimeRemovedHandler(const Downtime::Ptr& downtime) +{ + StateChangeHandler(downtime->GetCheckable()); + + for (auto& rw : ConfigType::GetObjectsByType()) { + rw->m_WorkQueue.Enqueue([rw, downtime]() { rw->SendRemovedDowntime(downtime); }); + } +} + +void IcingaDB::NotificationSentToAllUsersHandler( + const Notification::Ptr& notification, const Checkable::Ptr& checkable, const std::set& users, + NotificationType type, const CheckResult::Ptr& cr, const String& author, const String& text +) +{ + auto rws (ConfigType::GetObjectsByType()); + + if (!rws.empty()) { + auto usersAmount (users.size()); + auto authorAndText (std::make_shared>(author, text)); + + for (auto& rw : rws) { + rw->m_WorkQueue.Enqueue([rw, notification, checkable, usersAmount, type, cr, authorAndText]() { + rw->SendSentNotification(notification, checkable, usersAmount, type, cr, authorAndText->first, authorAndText->second); + }); + } + } +} + +void IcingaDB::CommentAddedHandler(const Comment::Ptr& comment) +{ + for (auto& rw : ConfigType::GetObjectsByType()) { + rw->m_WorkQueue.Enqueue([rw, comment]() { rw->SendAddedComment(comment); }); + } +} + +void IcingaDB::CommentRemovedHandler(const Comment::Ptr& comment) +{ + for (auto& rw : ConfigType::GetObjectsByType()) { + rw->m_WorkQueue.Enqueue([rw, comment]() { rw->SendRemovedComment(comment); }); + } +} + +void IcingaDB::FlappingChangedHandler(const Checkable::Ptr& checkable, const Value& value) +{ + for (auto& rw : ConfigType::GetObjectsByType()) { + rw->m_WorkQueue.Enqueue([rw, checkable, value]() { rw->SendFlappingChanged(checkable, value); }); + } +} diff --git a/lib/icingadb/icingadb-stats.cpp b/lib/icingadb/icingadb-stats.cpp new file mode 100644 index 000000000..125b8fd44 --- /dev/null +++ b/lib/icingadb/icingadb-stats.cpp @@ -0,0 +1,43 @@ +/* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */ + +#include "icingadb/icingadb.hpp" +#include "base/json.hpp" +#include "base/logger.hpp" +#include "base/serializer.hpp" +#include "base/statsfunction.hpp" +#include "base/convert.hpp" + +using namespace icinga; + +Dictionary::Ptr IcingaDB::GetStats() +{ + Dictionary::Ptr stats = new Dictionary(); + + //TODO: Figure out if more stats can be useful here. + Namespace::Ptr statsFunctions = ScriptGlobal::Get("StatsFunctions", &Empty); + + if (!statsFunctions) + Dictionary::Ptr(); + + ObjectLock olock(statsFunctions); + + for (auto& kv : statsFunctions) + { + Function::Ptr func = kv.second->Get(); + + if (!func) + BOOST_THROW_EXCEPTION(std::invalid_argument("Invalid status function name.")); + + Dictionary::Ptr status = new Dictionary(); + Array::Ptr perfdata = new Array(); + func->Invoke({ status, perfdata }); + + stats->Set(kv.first, new Dictionary({ + { "status", status }, + { "perfdata", Serialize(perfdata, FAState) } + })); + } + + return stats; +} + diff --git a/lib/icingadb/icingadb-utility.cpp b/lib/icingadb/icingadb-utility.cpp new file mode 100644 index 000000000..7ac1a1fe9 --- /dev/null +++ b/lib/icingadb/icingadb-utility.cpp @@ -0,0 +1,245 @@ +/* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */ + +#include "icingadb/icingadb.hpp" +#include "base/configtype.hpp" +#include "base/object-packer.hpp" +#include "base/logger.hpp" +#include "base/serializer.hpp" +#include "base/tlsutility.hpp" +#include "base/initialize.hpp" +#include "base/objectlock.hpp" +#include "base/array.hpp" +#include "base/scriptglobal.hpp" +#include "base/convert.hpp" +#include "base/json.hpp" +#include "icinga/customvarobject.hpp" +#include "icinga/checkcommand.hpp" +#include "icinga/notificationcommand.hpp" +#include "icinga/eventcommand.hpp" +#include "icinga/host.hpp" +#include +#include +#include +#include + +using namespace icinga; + +String IcingaDB::FormatCheckSumBinary(const String& str) +{ + char output[20*2+1]; + for (int i = 0; i < 20; i++) + sprintf(output + 2 * i, "%02x", str[i]); + + return output; +} + +String IcingaDB::FormatCommandLine(const Value& commandLine) +{ + String result; + if (commandLine.IsObjectType()) { + Array::Ptr args = commandLine; + bool first = true; + + ObjectLock olock(args); + for (const Value& arg : args) { + String token = "'" + Convert::ToString(arg) + "'"; + + if (first) + first = false; + else + result += String(1, ' '); + + result += token; + } + } else if (!commandLine.IsEmpty()) { + result = commandLine; + boost::algorithm::replace_all(result, "\'", "\\'"); + result = "'" + result + "'"; + } + + return result; +} + +String IcingaDB::GetEnvironment() +{ + return ConfigType::GetObjectsByType()[0]->GetEnvironment(); +} + +String IcingaDB::GetObjectIdentifier(const ConfigObject::Ptr& object) +{ + Type::Ptr type = object->GetReflectionType(); + + if (type == CheckCommand::TypeInstance || type == NotificationCommand::TypeInstance || type == EventCommand::TypeInstance) + return HashValue((Array::Ptr)new Array({GetEnvironment(), type->GetName(), object->GetName()})); + else + return HashValue((Array::Ptr)new Array({GetEnvironment(), object->GetName()})); +} + +String IcingaDB::CalculateCheckSumString(const String& str) +{ + return SHA1(str); +} + +String IcingaDB::CalculateCheckSumArray(const Array::Ptr& arr) +{ + /* Ensure that checksums happen in a defined order. */ + Array::Ptr tmpArr = arr->ShallowClone(); + + tmpArr->Sort(); + + return SHA1(PackObject(tmpArr)); +} + +String IcingaDB::CalculateCheckSumProperties(const ConfigObject::Ptr& object, const std::set& propertiesBlacklist) +{ + //TODO: consider precision of 6 for double values; use specific config fields for hashing? + return HashValue(object, propertiesBlacklist); +} + +static const std::set metadataWhitelist ({"package", "source_location", "templates"}); + +String IcingaDB::CalculateCheckSumMetadata(const ConfigObject::Ptr& object) +{ + return HashValue(object, metadataWhitelist, true); +} + +String IcingaDB::CalculateCheckSumVars(const CustomVarObject::Ptr& object) +{ + Dictionary::Ptr vars = object->GetVars(); + + if (!vars) + return HashValue(Empty); + + return HashValue(vars); +} + +/** + * Prepare object's custom vars for being written to Redis + * + * object.vars = { + * "disks": { + * "disk": {}, + * "disk /": { + * "disk_partitions": "/" + * } + * } + * } + * + * return { + * SHA1(PackObject([ + * Environment, + * "disks", + * { + * "disk": {}, + * "disk /": { + * "disk_partitions": "/" + * } + * } + * ])): { + * "envId": SHA1(Environment), + * "name_checksum": SHA1("disks"), + * "name": "disks", + * "value": { + * "disk": {}, + * "disk /": { + * "disk_partitions": "/" + * } + * } + * } + * } + * + * @param object Config object with custom vars + * + * @return JSON-like data structure for Redis + */ +Dictionary::Ptr IcingaDB::SerializeVars(const CustomVarObject::Ptr& object) +{ + Dictionary::Ptr vars = object->GetVars(); + + if (!vars) + return nullptr; + + Dictionary::Ptr res = new Dictionary(); + auto env (GetEnvironment()); + auto envChecksum (SHA1(env)); + + ObjectLock olock(vars); + + for (auto& kv : vars) { + res->Set( + SHA1(PackObject((Array::Ptr)new Array({env, kv.first, kv.second}))), + (Dictionary::Ptr)new Dictionary({ + {"environment_id", envChecksum}, + {"name_checksum", SHA1(kv.first)}, + {"name", kv.first}, + {"value", JsonEncode(kv.second)}, + }) + ); + } + + return res; +} + +static const std::set propertiesBlacklistEmpty; + +String IcingaDB::HashValue(const Value& value) +{ + return HashValue(value, propertiesBlacklistEmpty); +} + +String IcingaDB::HashValue(const Value& value, const std::set& propertiesBlacklist, bool propertiesWhitelist) +{ + Value temp; + bool mutabl; + + Type::Ptr type = value.GetReflectionType(); + + if (ConfigObject::TypeInstance->IsAssignableFrom(type)) { + temp = Serialize(value, FAConfig); + mutabl = true; + } else { + temp = value; + mutabl = false; + } + + if (propertiesBlacklist.size() && temp.IsObject()) { + Dictionary::Ptr dict = dynamic_pointer_cast((Object::Ptr)temp); + + if (dict) { + if (!mutabl) + dict = dict->ShallowClone(); + + ObjectLock olock(dict); + + if (propertiesWhitelist) { + auto current = dict->Begin(); + auto propertiesBlacklistEnd = propertiesBlacklist.end(); + + while (current != dict->End()) { + if (propertiesBlacklist.find(current->first) == propertiesBlacklistEnd) { + dict->Remove(current++); + } else { + ++current; + } + } + } else { + for (auto& property : propertiesBlacklist) + dict->Remove(property); + } + + if (!mutabl) + temp = dict; + } + } + + return SHA1(PackObject(temp)); +} + +String IcingaDB::GetLowerCaseTypeNameDB(const ConfigObject::Ptr& obj) +{ + return obj->GetReflectionType()->GetName().ToLower(); +} + +long long IcingaDB::TimestampToMilliseconds(double timestamp) { + return static_cast(timestamp * 1000); +} diff --git a/lib/icingadb/icingadb.cpp b/lib/icingadb/icingadb.cpp new file mode 100644 index 000000000..7964b414d --- /dev/null +++ b/lib/icingadb/icingadb.cpp @@ -0,0 +1,350 @@ +/* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */ + +#include "icingadb/icingadb.hpp" +#include "icingadb/icingadb-ti.cpp" +#include "icingadb/redisconnection.hpp" +#include "remote/eventqueue.hpp" +#include "base/json.hpp" +#include "icinga/checkable.hpp" +#include "icinga/host.hpp" +#include +#include +#include + +using namespace icinga; + +#define MAX_EVENTS_DEFAULT 5000 + +REGISTER_TYPE(IcingaDB); + +IcingaDB::IcingaDB() + : m_Rcon(nullptr) +{ + m_Rcon = nullptr; + + m_WorkQueue.SetName("IcingaDB"); + + m_PrefixConfigObject = "icinga:config:"; + m_PrefixConfigCheckSum = "icinga:checksum:"; + m_PrefixStateObject = "icinga:config:state:"; +} + +/** + * Starts the component. + */ +void IcingaDB::Start(bool runtimeCreated) +{ + ObjectImpl::Start(runtimeCreated); + + Log(LogInformation, "IcingaDB") + << "'" << GetName() << "' started."; + + m_ConfigDumpInProgress = false; + m_ConfigDumpDone = false; + + m_Rcon = new RedisConnection(GetHost(), GetPort(), GetPath(), GetPassword(), GetDbIndex()); + m_Rcon->Start(); + + m_WorkQueue.SetExceptionCallback([this](boost::exception_ptr exp) { ExceptionHandler(std::move(exp)); }); + + m_ReconnectTimer = new Timer(); + m_ReconnectTimer->SetInterval(15); + m_ReconnectTimer->OnTimerExpired.connect([this](const Timer * const&) { ReconnectTimerHandler(); }); + m_ReconnectTimer->Start(); + m_ReconnectTimer->Reschedule(0); + + m_SubscriptionTimer = new Timer(); + m_SubscriptionTimer->SetInterval(15); + m_SubscriptionTimer->OnTimerExpired.connect([this](const Timer * const&) { UpdateSubscriptionsTimerHandler(); }); + m_SubscriptionTimer->Start(); + + m_StatsTimer = new Timer(); + m_StatsTimer->SetInterval(1); + m_StatsTimer->OnTimerExpired.connect([this](const Timer * const&) { PublishStatsTimerHandler(); }); + m_StatsTimer->Start(); + + m_WorkQueue.SetName("IcingaDB"); + + boost::thread thread(&IcingaDB::HandleEvents, this); + thread.detach(); + +} + +void IcingaDB::ExceptionHandler(boost::exception_ptr exp) +{ + Log(LogCritical, "IcingaDB", "Exception during redis query. Verify that Redis is operational."); + + Log(LogDebug, "IcingaDB") + << "Exception during redis operation: " << DiagnosticInformation(exp); +} + +void IcingaDB::ReconnectTimerHandler() +{ + m_WorkQueue.Enqueue([this]() { TryToReconnect(); }); +} + +void IcingaDB::TryToReconnect() +{ + AssertOnWorkQueue(); + + if (m_ConfigDumpDone) + return; + else + m_Rcon->Start(); + + if (!m_Rcon || !m_Rcon->IsConnected()) + return; + + UpdateSubscriptions(); + + if (m_ConfigDumpInProgress || m_ConfigDumpDone) + return; + + /* Config dump */ + m_ConfigDumpInProgress = true; + PublishStats(); + + UpdateAllConfigObjects(); + + m_ConfigDumpDone = true; + + m_ConfigDumpInProgress = false; +} + +void IcingaDB::UpdateSubscriptionsTimerHandler() +{ + m_WorkQueue.Enqueue([this]() { UpdateSubscriptions(); }); +} + +void IcingaDB::UpdateSubscriptions() +{ + AssertOnWorkQueue(); + + Log(LogNotice, "IcingaDB", "Updating Redis subscriptions"); + + /* TODO: + * Silently return in this case. Usually the RedisConnection checks for connectivity and logs in failure case. + * But since we expect and answer here and break Icinga in case of receiving no answer/an unexpected one we opt for + * better safe than sorry here. Future implementation needs to include an improved error handling and answer verification. + */ + if (!m_Rcon || !m_Rcon->IsConnected()) + return; + + String cursor = "0"; + String keyPrefix = "icinga:subscription:"; + + do { + Array::Ptr reply = m_Rcon->GetResultOfQuery({ "SCAN", cursor, "MATCH", keyPrefix + "*", "COUNT", "1000" }); + VERIFY(reply->GetLength() % 2u == 0u); + + cursor = reply->Get(0); + + Array::Ptr keys = reply->Get(1); + ObjectLock oLock (keys); + + for (String key : keys) { + if (boost::algorithm::ends_with(key, ":limit")) + continue; + + RedisSubscriptionInfo rsi; + + if (!IcingaDB::GetSubscriptionTypes(key, rsi)) { + Log(LogInformation, "IcingaDB") + << "Subscription \"" << key << "\" has no types listed."; + } else { + m_Subscriptions[key.SubStr(keyPrefix.GetLength())] = rsi; + } + } + } while (cursor != "0"); + + Log(LogNotice, "IcingaDB") + << "Current Redis event subscriptions: " << m_Subscriptions.size(); +} + +bool IcingaDB::GetSubscriptionTypes(String key, RedisSubscriptionInfo& rsi) +{ + try { + Array::Ptr redisReply = m_Rcon->GetResultOfQuery({ "SMEMBERS", key }); + + if (redisReply->GetLength() == 0) + return false; + + { + ObjectLock oLock (redisReply); + + for (String member : redisReply) { + rsi.EventTypes.insert(member); + } + } + + Log(LogInformation, "IcingaDB") + << "Subscriber Info - Key: " << key << " Value: " << Value(Array::FromSet(rsi.EventTypes)); + + } catch (const std::exception& ex) { + Log(LogWarning, "IcingaDB") + << "Invalid Redis subscriber info for subscriber '" << key << "': " << DiagnosticInformation(ex); + + return false; + } + + return true; +} + +void IcingaDB::PublishStatsTimerHandler(void) +{ + m_WorkQueue.Enqueue([this]() { PublishStats(); }); +} + +void IcingaDB::PublishStats() +{ + AssertOnWorkQueue(); + + if (!m_Rcon || !m_Rcon->IsConnected()) + return; + + Dictionary::Ptr status = GetStats(); + status->Set("config_dump_in_progress", m_ConfigDumpInProgress); + String jsonStats = JsonEncode(status); + + m_Rcon->FireAndForgetQuery({ "PUBLISH", "icinga:stats", jsonStats }, true); +} + +void IcingaDB::HandleEvents() +{ + String queueName = Utility::NewUniqueID(); + EventQueue::Ptr queue = new EventQueue(queueName); + EventQueue::Register(queueName, queue); + + std::set types; + types.insert("CheckResult"); + types.insert("StateChange"); + types.insert("Notification"); + types.insert("AcknowledgementSet"); + types.insert("AcknowledgementCleared"); + types.insert("CommentAdded"); + types.insert("CommentRemoved"); + types.insert("DowntimeAdded"); + types.insert("DowntimeRemoved"); + types.insert("DowntimeStarted"); + types.insert("DowntimeTriggered"); + + queue->SetTypes(types); + + queue->AddClient(this); + + for (;;) { + Dictionary::Ptr event = queue->WaitForEvent(this); + + if (!event) + continue; + + m_WorkQueue.Enqueue([this, event]() { SendEvent(event); }); + } + + queue->RemoveClient(this); + EventQueue::UnregisterIfUnused(queueName, queue); +} + +void IcingaDB::HandleEvent(const Dictionary::Ptr& event) +{ + AssertOnWorkQueue(); + + for (const std::pair& kv : m_Subscriptions) { + const auto& name = kv.first; + const auto& rsi = kv.second; + + if (rsi.EventTypes.find(event->Get("type")) == rsi.EventTypes.end()) + continue; + + String body = JsonEncode(event); + + double maxExists = m_Rcon->GetResultOfQuery({ "EXISTS", "icinga:subscription:" + name + ":limit" }); + + long maxEvents = MAX_EVENTS_DEFAULT; + if (maxExists != 0) { + String redisReply = m_Rcon->GetResultOfQuery({ "GET", "icinga:subscription:" + name + ":limit"}); + + Log(LogInformation, "IcingaDB") + << "Got limit " << redisReply << " for " << name; + + maxEvents = Convert::ToLong(redisReply); + } + + m_Rcon->FireAndForgetQueries({ + { "MULTI" }, + { "LPUSH", "icinga:event:" + name, body }, + { "LTRIM", "icinga:event:" + name, "0", String(maxEvents - 1)}, + { "EXEC" }}); + } +} + +void IcingaDB::SendEvent(const Dictionary::Ptr& event) +{ + AssertOnWorkQueue(); + + if (!m_Rcon || !m_Rcon->IsConnected()) + return; + + String type = event->Get("type"); + + if (type == "CheckResult") { + Checkable::Ptr checkable; + + if (event->Contains("service")) { + checkable = Service::GetByNamePair(event->Get("host"), event->Get("service")); + } else { + checkable = Host::GetByName(event->Get("host")); + } + + // Update State for icingaweb + m_WorkQueue.Enqueue([this, checkable]() { UpdateState(checkable); }); + } + + if (type.Contains("Acknowledgement")) { + Checkable::Ptr checkable; + + if (event->Contains("service")) { + checkable = Service::GetByNamePair(event->Get("host"), event->Get("service")); + event->Set("service_id", GetObjectIdentifier(checkable)); + } else { + checkable = Host::GetByName(event->Get("host")); + event->Set("host_id", GetObjectIdentifier(checkable)); + } + + if (type == "AcknowledgementSet") { + Timestamp entry = 0; + Comment::Ptr AckComment; + + for (const Comment::Ptr& c : checkable->GetComments()) { + if (c->GetEntryType() == CommentAcknowledgement) { + if (c->GetEntryTime() > entry) { + entry = c->GetEntryTime(); + AckComment = c; + StateChangeHandler(checkable); + } + } + } + + event->Set("comment_id", GetObjectIdentifier(AckComment)); + } + } + + String body = JsonEncode(event); + + m_Rcon->FireAndForgetQueries({ + { "PUBLISH", "icinga:event:all", body }, + { "PUBLISH", "icinga:event:" + event->Get("type"), body }}); +} + +void IcingaDB::Stop(bool runtimeRemoved) +{ + Log(LogInformation, "IcingaDB") + << "'" << GetName() << "' stopped."; + + ObjectImpl::Stop(runtimeRemoved); +} + +void IcingaDB::AssertOnWorkQueue() +{ + ASSERT(m_WorkQueue.IsWorkerThread()); +} diff --git a/lib/icingadb/icingadb.hpp b/lib/icingadb/icingadb.hpp new file mode 100644 index 000000000..4ebe26409 --- /dev/null +++ b/lib/icingadb/icingadb.hpp @@ -0,0 +1,141 @@ +/* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */ + +#ifndef ICINGADB_H +#define ICINGADB_H + +#include "icingadb/icingadb-ti.hpp" +#include "icingadb/redisconnection.hpp" +#include "base/timer.hpp" +#include "base/workqueue.hpp" +#include "icinga/customvarobject.hpp" +#include "icinga/checkable.hpp" +#include "icinga/service.hpp" +#include "icinga/downtime.hpp" +#include "remote/messageorigin.hpp" +#include + +namespace icinga +{ + +struct RedisSubscriptionInfo +{ + std::set EventTypes; +}; + +/** + * @ingroup icingadb + */ +class IcingaDB : public ObjectImpl +{ +public: + DECLARE_OBJECT(IcingaDB); + DECLARE_OBJECTNAME(IcingaDB); + + IcingaDB(); + + static void ConfigStaticInitialize(); + + virtual void Start(bool runtimeCreated) override; + virtual void Stop(bool runtimeRemoved) override; + +private: + void ReconnectTimerHandler(); + void TryToReconnect(); + void HandleEvents(); + void HandleEvent(const Dictionary::Ptr& event); + void SendEvent(const Dictionary::Ptr& event); + + void UpdateSubscriptionsTimerHandler(); + void UpdateSubscriptions(); + bool GetSubscriptionTypes(String key, RedisSubscriptionInfo& rsi); + void PublishStatsTimerHandler(); + void PublishStats(); + + /* config & status dump */ + void UpdateAllConfigObjects(); + std::vector>> ChunkObjects(std::vector> objects, size_t chunkSize); + void DeleteKeys(const std::vector& keys); + std::vector GetTypeObjectKeys(const String& type); + void InsertObjectDependencies(const ConfigObject::Ptr& object, const String typeName, std::map>& hMSets, + std::map>& publishes, bool runtimeUpdate); + void UpdateState(const Checkable::Ptr& checkable); + void SendConfigUpdate(const ConfigObject::Ptr& object, bool runtimeUpdate); + void CreateConfigUpdate(const ConfigObject::Ptr& object, const String type, std::map>& hMSets, + std::map>& publishes, bool runtimeUpdate); + void SendConfigDelete(const ConfigObject::Ptr& object); + void SendStatusUpdate(const ConfigObject::Ptr& object, const CheckResult::Ptr& cr, StateType type); + + void SendSentNotification( + const Notification::Ptr& notification, const Checkable::Ptr& checkable, size_t users, + NotificationType type, const CheckResult::Ptr& cr, const String& author, const String& text + ); + + void SendStartedDowntime(const Downtime::Ptr& downtime); + void SendRemovedDowntime(const Downtime::Ptr& downtime); + void SendAddedComment(const Comment::Ptr& comment); + void SendRemovedComment(const Comment::Ptr& comment); + void SendFlappingChanged(const Checkable::Ptr& checkable, const Value& value); + + std::vector UpdateObjectAttrs(const ConfigObject::Ptr& object, int fieldType, const String& typeNameOverride); + Dictionary::Ptr SerializeState(const Checkable::Ptr& checkable); + + /* Stats */ + Dictionary::Ptr GetStats(); + + /* utilities */ + static String FormatCheckSumBinary(const String& str); + static String FormatCommandLine(const Value& commandLine); + static long long TimestampToMilliseconds(double timestamp); + + static String GetObjectIdentifier(const ConfigObject::Ptr& object); + static String GetEnvironment(); + static String CalculateCheckSumString(const String& str); + static String CalculateCheckSumArray(const Array::Ptr& arr); + static String CalculateCheckSumProperties(const ConfigObject::Ptr& object, const std::set& propertiesBlacklist); + static String CalculateCheckSumMetadata(const ConfigObject::Ptr& object); + static String CalculateCheckSumVars(const CustomVarObject::Ptr& object); + static Dictionary::Ptr SerializeVars(const CustomVarObject::Ptr& object); + + static String HashValue(const Value& value); + static String HashValue(const Value& value, const std::set& propertiesBlacklist, bool propertiesWhitelist = false); + + static String GetLowerCaseTypeNameDB(const ConfigObject::Ptr& obj); + static bool PrepareObject(const ConfigObject::Ptr& object, Dictionary::Ptr& attributes, Dictionary::Ptr& checkSums); + + static void StateChangeHandler(const ConfigObject::Ptr& object); + static void StateChangeHandler(const ConfigObject::Ptr& object, const CheckResult::Ptr& cr, StateType type); + static void VersionChangedHandler(const ConfigObject::Ptr& object); + static void DowntimeStartedHandler(const Downtime::Ptr& downtime); + static void DowntimeRemovedHandler(const Downtime::Ptr& downtime); + + static void NotificationSentToAllUsersHandler( + const Notification::Ptr& notification, const Checkable::Ptr& checkable, const std::set& users, + NotificationType type, const CheckResult::Ptr& cr, const String& author, const String& text + ); + + static void CommentAddedHandler(const Comment::Ptr& comment); + static void CommentRemovedHandler(const Comment::Ptr& comment); + static void FlappingChangedHandler(const Checkable::Ptr& checkable, const Value& value); + + void AssertOnWorkQueue(); + + void ExceptionHandler(boost::exception_ptr exp); + + Timer::Ptr m_StatsTimer; + Timer::Ptr m_ReconnectTimer; + Timer::Ptr m_SubscriptionTimer; + WorkQueue m_WorkQueue; + std::map m_Subscriptions; + + String m_PrefixConfigObject; + String m_PrefixConfigCheckSum; + String m_PrefixStateObject; + + bool m_ConfigDumpInProgress; + bool m_ConfigDumpDone; + + RedisConnection::Ptr m_Rcon; +}; +} + +#endif /* ICINGADB_H */ diff --git a/lib/icingadb/icingadb.ti b/lib/icingadb/icingadb.ti new file mode 100644 index 000000000..7ffc80b09 --- /dev/null +++ b/lib/icingadb/icingadb.ti @@ -0,0 +1,23 @@ +/* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */ + +#include "base/configobject.hpp" + +library icingadb; + +namespace icinga +{ + +class IcingaDB : ConfigObject +{ + [config] String host { + default {{{ return "127.0.0.1"; }}} + }; + [config] int port { + default {{{ return 6379; }}} + }; + [config] String path; + [config] String password; + [config] int db_index; +}; + +} diff --git a/lib/icingadb/redisconnection.cpp b/lib/icingadb/redisconnection.cpp new file mode 100644 index 000000000..243644bf4 --- /dev/null +++ b/lib/icingadb/redisconnection.cpp @@ -0,0 +1,410 @@ +/* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */ + +#include "icingadb/redisconnection.hpp" +#include "base/array.hpp" +#include "base/convert.hpp" +#include "base/defer.hpp" +#include "base/io-engine.hpp" +#include "base/logger.hpp" +#include "base/objectlock.hpp" +#include "base/string.hpp" +#include "base/tcpsocket.hpp" +#include +#include +#include +#include +#include +#include +#include +#include +#include + +using namespace icinga; +namespace asio = boost::asio; + +RedisConnection::RedisConnection(const String& host, const int port, const String& path, const String& password, const int db) : + RedisConnection(IoEngine::Get().GetIoContext(), host, port, path, password, db) +{ +} + +RedisConnection::RedisConnection(boost::asio::io_context& io, String host, int port, String path, String password, int db) + : m_Host(std::move(host)), m_Port(port), m_Path(std::move(path)), m_Password(std::move(password)), m_DbIndex(db), + m_Connecting(false), m_Connected(false), m_Started(false), m_Strand(io), m_QueuedWrites(io), m_QueuedReads(io) +{ +} + +void RedisConnection::Start() +{ + if (!m_Started.exchange(true)) { + Ptr keepAlive (this); + + asio::spawn(m_Strand, [this, keepAlive](asio::yield_context yc) { ReadLoop(yc); }); + asio::spawn(m_Strand, [this, keepAlive](asio::yield_context yc) { WriteLoop(yc); }); + } + + if (!m_Connecting.exchange(true)) { + Ptr keepAlive (this); + + asio::spawn(m_Strand, [this, keepAlive](asio::yield_context yc) { Connect(yc); }); + } +} + +bool RedisConnection::IsConnected() { + return m_Connected.load(); +} + +static inline +void LogQuery(RedisConnection::Query& query, Log& msg) +{ + int i = 0; + + for (auto& arg : query) { + if (++i == 8) { + msg << " ..."; + break; + } + + msg << " '" << arg << '\''; + } +} + +void RedisConnection::FireAndForgetQuery(RedisConnection::Query query, bool highPrio) +{ + { + Log msg (LogNotice, "IcingaDB", "Firing and forgetting query:"); + LogQuery(query, msg); + } + + auto item (std::make_shared(std::move(query))); + + asio::post(m_Strand, [this, item, highPrio]() { + (highPrio ? &m_Queues.HighPrioWrites : &m_Queues.Writes)->emplace(WriteQueueItem{item, nullptr, nullptr, nullptr}); + m_QueuedWrites.Set(); + }); +} + +void RedisConnection::FireAndForgetQueries(RedisConnection::Queries queries, bool highPrio) +{ + for (auto& query : queries) { + Log msg (LogNotice, "IcingaDB", "Firing and forgetting query:"); + LogQuery(query, msg); + } + + auto item (std::make_shared(std::move(queries))); + + asio::post(m_Strand, [this, item, highPrio]() { + (highPrio ? &m_Queues.HighPrioWrites : &m_Queues.Writes)->emplace(WriteQueueItem{nullptr, item, nullptr, nullptr}); + m_QueuedWrites.Set(); + }); +} + +RedisConnection::Reply RedisConnection::GetResultOfQuery(RedisConnection::Query query, bool highPrio) +{ + { + Log msg (LogNotice, "IcingaDB", "Executing query:"); + LogQuery(query, msg); + } + + std::promise promise; + auto future (promise.get_future()); + auto item (std::make_shared(std::move(query), std::move(promise))); + + asio::post(m_Strand, [this, item, highPrio]() { + (highPrio ? &m_Queues.HighPrioWrites : &m_Queues.Writes)->emplace(WriteQueueItem{nullptr, nullptr, item, nullptr}); + m_QueuedWrites.Set(); + }); + + item = nullptr; + future.wait(); + return future.get(); +} + +RedisConnection::Replies RedisConnection::GetResultsOfQueries(RedisConnection::Queries queries, bool highPrio) +{ + for (auto& query : queries) { + Log msg (LogNotice, "IcingaDB", "Executing query:"); + LogQuery(query, msg); + } + + std::promise promise; + auto future (promise.get_future()); + auto item (std::make_shared(std::move(queries), std::move(promise))); + + asio::post(m_Strand, [this, item, highPrio]() { + (highPrio ? &m_Queues.HighPrioWrites : &m_Queues.Writes)->emplace(WriteQueueItem{nullptr, nullptr, nullptr, item}); + m_QueuedWrites.Set(); + }); + + item = nullptr; + future.wait(); + return future.get(); +} + +void RedisConnection::Connect(asio::yield_context& yc) +{ + Defer notConnecting ([this]() { m_Connecting.store(m_Connected.load()); }); + + try { + if (m_Path.IsEmpty()) { + Log(LogInformation, "IcingaDB") + << "Trying to connect to Redis server (async) on host '" << m_Host << ":" << m_Port << "'"; + + decltype(m_TcpConn) conn (new TcpConn(m_Strand.context())); + icinga::Connect(conn->next_layer(), m_Host, Convert::ToString(m_Port), yc); + m_TcpConn = std::move(conn); + } else { + Log(LogInformation, "IcingaDB") + << "Trying to connect to Redis server (async) on unix socket path '" << m_Path << "'"; + + decltype(m_UnixConn) conn (new UnixConn(m_Strand.context())); + conn->next_layer().async_connect(Unix::endpoint(m_Path.CStr()), yc); + m_UnixConn = std::move(conn); + } + + m_Connected.store(true); + + Log(LogInformation, "IcingaDB", "Connected to Redis server"); + } catch (const boost::coroutines::detail::forced_unwind&) { + throw; + } catch (const std::exception& ex) { + Log(LogCritical, "IcingaDB") + << "Cannot connect to " << m_Host << ":" << m_Port << ": " << ex.what(); + } +} + +void RedisConnection::ReadLoop(asio::yield_context& yc) +{ + for (;;) { + m_QueuedReads.Wait(yc); + + while (!m_Queues.FutureResponseActions.empty()) { + auto item (std::move(m_Queues.FutureResponseActions.front())); + m_Queues.FutureResponseActions.pop(); + + switch (item.Action) { + case ResponseAction::Ignore: + try { + for (auto i (item.Amount); i; --i) { + ReadOne(yc); + } + } catch (const boost::coroutines::detail::forced_unwind&) { + throw; + } catch (const std::exception& ex) { + Log(LogCritical, "IcingaDB") + << "Error during receiving the response to a query which has been fired and forgotten: " << ex.what(); + + continue; + } catch (...) { + Log(LogCritical, "IcingaDB") + << "Error during receiving the response to a query which has been fired and forgotten"; + + continue; + } + + break; + case ResponseAction::Deliver: + for (auto i (item.Amount); i; --i) { + auto promise (std::move(m_Queues.ReplyPromises.front())); + m_Queues.ReplyPromises.pop(); + + Reply reply; + + try { + reply = ReadOne(yc); + } catch (const boost::coroutines::detail::forced_unwind&) { + throw; + } catch (...) { + promise.set_exception(std::current_exception()); + + continue; + } + + promise.set_value(std::move(reply)); + } + + break; + case ResponseAction::DeliverBulk: + { + auto promise (std::move(m_Queues.RepliesPromises.front())); + m_Queues.RepliesPromises.pop(); + + Replies replies; + replies.reserve(item.Amount); + + for (auto i (item.Amount); i; --i) { + try { + replies.emplace_back(ReadOne(yc)); + } catch (const boost::coroutines::detail::forced_unwind&) { + throw; + } catch (...) { + promise.set_exception(std::current_exception()); + + continue; + } + } + + promise.set_value(std::move(replies)); + } + } + } + + m_QueuedReads.Clear(); + } +} + +void RedisConnection::WriteLoop(asio::yield_context& yc) +{ + for (;;) { + m_QueuedWrites.Wait(yc); + + for (;;) { + if (m_Queues.HighPrioWrites.empty()) { + if (m_Queues.Writes.empty()) { + break; + } else { + auto next (std::move(m_Queues.Writes.front())); + m_Queues.Writes.pop(); + + WriteItem(yc, std::move(next)); + } + } else { + auto next (std::move(m_Queues.HighPrioWrites.front())); + m_Queues.HighPrioWrites.pop(); + + WriteItem(yc, std::move(next)); + } + } + + m_QueuedWrites.Clear(); + } +} + +void RedisConnection::WriteItem(boost::asio::yield_context& yc, RedisConnection::WriteQueueItem next) +{ + if (next.FireAndForgetQuery) { + auto& item (*next.FireAndForgetQuery); + + try { + WriteOne(item, yc); + } catch (const boost::coroutines::detail::forced_unwind&) { + throw; + } catch (const std::exception& ex) { + Log msg (LogCritical, "IcingaDB", "Error during sending query"); + LogQuery(item, msg); + msg << " which has been fired and forgotten: " << ex.what(); + + return; + } catch (...) { + Log msg (LogCritical, "IcingaDB", "Error during sending query"); + LogQuery(item, msg); + msg << " which has been fired and forgotten"; + + return; + } + + if (m_Queues.FutureResponseActions.empty() || m_Queues.FutureResponseActions.back().Action != ResponseAction::Ignore) { + m_Queues.FutureResponseActions.emplace(FutureResponseAction{1, ResponseAction::Ignore}); + } else { + ++m_Queues.FutureResponseActions.back().Amount; + } + + m_QueuedReads.Set(); + } + + if (next.FireAndForgetQueries) { + auto& item (*next.FireAndForgetQueries); + size_t i = 0; + + try { + for (auto& query : item) { + WriteOne(query, yc); + ++i; + } + } catch (const boost::coroutines::detail::forced_unwind&) { + throw; + } catch (const std::exception& ex) { + Log msg (LogCritical, "IcingaDB", "Error during sending query"); + LogQuery(item[i], msg); + msg << " which has been fired and forgotten: " << ex.what(); + + return; + } catch (...) { + Log msg (LogCritical, "IcingaDB", "Error during sending query"); + LogQuery(item[i], msg); + msg << " which has been fired and forgotten"; + + return; + } + + if (m_Queues.FutureResponseActions.empty() || m_Queues.FutureResponseActions.back().Action != ResponseAction::Ignore) { + m_Queues.FutureResponseActions.emplace(FutureResponseAction{item.size(), ResponseAction::Ignore}); + } else { + m_Queues.FutureResponseActions.back().Amount += item.size(); + } + + m_QueuedReads.Set(); + } + + if (next.GetResultOfQuery) { + auto& item (*next.GetResultOfQuery); + + try { + WriteOne(item.first, yc); + } catch (const boost::coroutines::detail::forced_unwind&) { + throw; + } catch (...) { + item.second.set_exception(std::current_exception()); + + return; + } + + m_Queues.ReplyPromises.emplace(std::move(item.second)); + + if (m_Queues.FutureResponseActions.empty() || m_Queues.FutureResponseActions.back().Action != ResponseAction::Deliver) { + m_Queues.FutureResponseActions.emplace(FutureResponseAction{1, ResponseAction::Deliver}); + } else { + ++m_Queues.FutureResponseActions.back().Amount; + } + + m_QueuedReads.Set(); + } + + if (next.GetResultsOfQueries) { + auto& item (*next.GetResultsOfQueries); + + try { + for (auto& query : item.first) { + WriteOne(query, yc); + } + } catch (const boost::coroutines::detail::forced_unwind&) { + throw; + } catch (...) { + item.second.set_exception(std::current_exception()); + + return; + } + + m_Queues.RepliesPromises.emplace(std::move(item.second)); + m_Queues.FutureResponseActions.emplace(FutureResponseAction{item.first.size(), ResponseAction::DeliverBulk}); + + m_QueuedReads.Set(); + } +} + +RedisConnection::Reply RedisConnection::ReadOne(boost::asio::yield_context& yc) +{ + if (m_Path.IsEmpty()) { + return ReadOne(m_TcpConn, yc); + } else { + return ReadOne(m_UnixConn, yc); + } +} + +void RedisConnection::WriteOne(RedisConnection::Query& query, asio::yield_context& yc) +{ + if (m_Path.IsEmpty()) { + WriteOne(m_TcpConn, query, yc); + } else { + WriteOne(m_UnixConn, query, yc); + } +} diff --git a/lib/icingadb/redisconnection.hpp b/lib/icingadb/redisconnection.hpp new file mode 100644 index 000000000..9f277f591 --- /dev/null +++ b/lib/icingadb/redisconnection.hpp @@ -0,0 +1,398 @@ +/* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */ + +#ifndef REDISCONNECTION_H +#define REDISCONNECTION_H + +#include "base/array.hpp" +#include "base/atomic.hpp" +#include "base/io-engine.hpp" +#include "base/object.hpp" +#include "base/string.hpp" +#include "base/value.hpp" +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace icinga +{ +/** + * An Async Redis connection. + * + * @ingroup icingadb + */ + class RedisConnection final : public Object + { + public: + DECLARE_PTR_TYPEDEFS(RedisConnection); + + typedef std::vector Query; + typedef std::vector Queries; + typedef Value Reply; + typedef std::vector Replies; + + RedisConnection(const String& host, const int port, const String& path, + const String& password = "", const int db = 0); + + void Start(); + + bool IsConnected(); + + void FireAndForgetQuery(Query query, bool highPrio = false); + void FireAndForgetQueries(Queries queries, bool highPrio = false); + + Reply GetResultOfQuery(Query query, bool highPrio = false); + Replies GetResultsOfQueries(Queries queries, bool highPrio = false); + + private: + enum class ResponseAction : unsigned char + { + Ignore, Deliver, DeliverBulk + }; + + struct FutureResponseAction + { + size_t Amount; + ResponseAction Action; + }; + + struct WriteQueueItem + { + std::shared_ptr FireAndForgetQuery; + std::shared_ptr FireAndForgetQueries; + std::shared_ptr>> GetResultOfQuery; + std::shared_ptr>> GetResultsOfQueries; + }; + + typedef boost::asio::ip::tcp Tcp; + typedef boost::asio::local::stream_protocol Unix; + + typedef boost::asio::buffered_stream TcpConn; + typedef boost::asio::buffered_stream UnixConn; + + template + static Value ReadRESP(AsyncReadStream& stream, boost::asio::yield_context& yc); + + template + static std::vector ReadLine(AsyncReadStream& stream, boost::asio::yield_context& yc, size_t hint = 0); + + template + static void WriteRESP(AsyncWriteStream& stream, const Query& query, boost::asio::yield_context& yc); + + template + static void WriteInt(AsyncWriteStream& stream, intmax_t i, boost::asio::yield_context& yc); + + RedisConnection(boost::asio::io_context& io, String host, int port, String path, String password, int db); + + void Connect(boost::asio::yield_context& yc); + void ReadLoop(boost::asio::yield_context& yc); + void WriteLoop(boost::asio::yield_context& yc); + void WriteItem(boost::asio::yield_context& yc, WriteQueueItem item); + Reply ReadOne(boost::asio::yield_context& yc); + void WriteOne(Query& query, boost::asio::yield_context& yc); + + template + Reply ReadOne(StreamPtr& stream, boost::asio::yield_context& yc); + + template + void WriteOne(StreamPtr& stream, Query& query, boost::asio::yield_context& yc); + + String m_Path; + String m_Host; + int m_Port; + String m_Password; + int m_DbIndex; + + boost::asio::io_context::strand m_Strand; + std::shared_ptr m_TcpConn; + std::shared_ptr m_UnixConn; + Atomic m_Connecting, m_Connected, m_Started; + + struct { + std::queue Writes, HighPrioWrites; + std::queue> ReplyPromises; + std::queue> RepliesPromises; + std::queue FutureResponseActions; + } m_Queues; + + AsioConditionVariable m_QueuedWrites, m_QueuedReads; + }; + +class RedisError final : public Object +{ +public: + DECLARE_PTR_TYPEDEFS(RedisError); + + inline RedisError(String message) : m_Message(std::move(message)) + { + } + + inline const String& GetMessage() + { + return m_Message; + } + +private: + String m_Message; +}; + +class RedisDisconnected : public std::runtime_error +{ +public: + inline RedisDisconnected() : runtime_error("") + { + } +}; + +class RedisProtocolError : public std::runtime_error +{ +protected: + inline RedisProtocolError() : runtime_error("") + { + } +}; + +class BadRedisType : public RedisProtocolError +{ +public: + inline BadRedisType(char type) : m_What{type, 0} + { + } + + virtual const char * what() const noexcept override + { + return m_What; + } + +private: + char m_What[2]; +}; + +class BadRedisInt : public RedisProtocolError +{ +public: + inline BadRedisInt(std::vector intStr) : m_What(std::move(intStr)) + { + m_What.emplace_back(0); + } + + virtual const char * what() const noexcept override + { + return m_What.data(); + } + +private: + std::vector m_What; +}; + +template +RedisConnection::Reply RedisConnection::ReadOne(StreamPtr& stream, boost::asio::yield_context& yc) +{ + if (!stream) { + throw RedisDisconnected(); + } + + auto strm (stream); + + try { + return ReadRESP(*strm, yc); + } catch (const boost::coroutines::detail::forced_unwind&) { + throw; + } catch (...) { + if (m_Connecting.exchange(false)) { + m_Connected.store(false); + stream = nullptr; + } + + throw; + } +} + +template +void RedisConnection::WriteOne(StreamPtr& stream, RedisConnection::Query& query, boost::asio::yield_context& yc) +{ + if (!stream) { + throw RedisDisconnected(); + } + + auto strm (stream); + + try { + WriteRESP(*strm, query, yc); + strm->async_flush(yc); + } catch (const boost::coroutines::detail::forced_unwind&) { + throw; + } catch (...) { + if (m_Connecting.exchange(false)) { + m_Connected.store(false); + stream = nullptr; + } + + throw; + } +} + +template +Value RedisConnection::ReadRESP(AsyncReadStream& stream, boost::asio::yield_context& yc) +{ + namespace asio = boost::asio; + + char type = 0; + asio::async_read(stream, asio::mutable_buffer(&type, 1), yc); + + switch (type) { + case '+': + { + auto buf (ReadLine(stream, yc)); + return String(buf.begin(), buf.end()); + } + case '-': + { + auto buf (ReadLine(stream, yc)); + return new RedisError(String(buf.begin(), buf.end())); + } + case ':': + { + auto buf (ReadLine(stream, yc, 21)); + intmax_t i = 0; + + try { + i = boost::lexical_cast(boost::string_view(buf.data(), buf.size())); + } catch (...) { + throw BadRedisInt(std::move(buf)); + } + + return (double)i; + } + case '$': + { + auto buf (ReadLine(stream, yc, 21)); + intmax_t i = 0; + + try { + i = boost::lexical_cast(boost::string_view(buf.data(), buf.size())); + } catch (...) { + throw BadRedisInt(std::move(buf)); + } + + if (i < 0) { + return Value(); + } + + buf.clear(); + buf.insert(buf.end(), i, 0); + asio::async_read(stream, asio::mutable_buffer(buf.data(), buf.size()), yc); + + { + char crlf[2]; + asio::async_read(stream, asio::mutable_buffer(crlf, 2), yc); + } + + return String(buf.begin(), buf.end()); + } + case '*': + { + auto buf (ReadLine(stream, yc, 21)); + intmax_t i = 0; + + try { + i = boost::lexical_cast(boost::string_view(buf.data(), buf.size())); + } catch (...) { + throw BadRedisInt(std::move(buf)); + } + + Array::Ptr arr = new Array(); + + if (i < 0) { + i = 0; + } + + arr->Reserve(i); + + for (; i; --i) { + arr->Add(ReadRESP(stream, yc)); + } + + return arr; + } + default: + throw BadRedisType(type); + } +} + +template +std::vector RedisConnection::ReadLine(AsyncReadStream& stream, boost::asio::yield_context& yc, size_t hint) +{ + namespace asio = boost::asio; + + std::vector line; + line.reserve(hint); + + char next = 0; + asio::mutable_buffer buf (&next, 1); + + for (;;) { + asio::async_read(stream, buf, yc); + + if (next == '\r') { + asio::async_read(stream, buf, yc); + return std::move(line); + } + + line.emplace_back(next); + } +} + +template +void RedisConnection::WriteRESP(AsyncWriteStream& stream, const Query& query, boost::asio::yield_context& yc) +{ + namespace asio = boost::asio; + + asio::async_write(stream, asio::const_buffer("*", 1), yc); + WriteInt(stream, query.size(), yc); + asio::async_write(stream, asio::const_buffer("\r\n", 2), yc); + + for (auto& arg : query) { + asio::async_write(stream, asio::const_buffer("$", 1), yc); + + WriteInt(stream, arg.GetLength(), yc); + + asio::async_write(stream, asio::const_buffer("\r\n", 2), yc); + asio::async_write(stream, asio::const_buffer(arg.CStr(), arg.GetLength()), yc); + asio::async_write(stream, asio::const_buffer("\r\n", 2), yc); + } +} + +template +void RedisConnection::WriteInt(AsyncWriteStream& stream, intmax_t i, boost::asio::yield_context& yc) +{ + namespace asio = boost::asio; + + char buf[21] = {}; + sprintf(buf, "%jd", i); + + asio::async_write(stream, asio::const_buffer(buf, strlen(buf)), yc); +} + +} + +#endif //REDISCONNECTION_H diff --git a/lib/remote/eventqueue.cpp b/lib/remote/eventqueue.cpp index 1125a4543..3c7504e6c 100644 --- a/lib/remote/eventqueue.cpp +++ b/lib/remote/eventqueue.cpp @@ -90,6 +90,25 @@ void EventQueue::SetFilter(std::unique_ptr filter) m_Filter.swap(filter); } +Dictionary::Ptr EventQueue::WaitForEvent(void *client, double timeout) +{ + boost::mutex::scoped_lock lock(m_Mutex); + + for (;;) { + auto it = m_Events.find(client); + ASSERT(it != m_Events.end()); + + if (!it->second.empty()) { + Dictionary::Ptr result = *it->second.begin(); + it->second.pop_front(); + return result; + } + + if (!m_CV.timed_wait(lock, boost::posix_time::milliseconds(long(timeout * 1000)))) + return nullptr; + } +} + std::vector EventQueue::GetQueuesForType(const String& type) { EventQueueRegistry::ItemMap queues = EventQueueRegistry::GetInstance()->GetItems(); diff --git a/lib/remote/eventqueue.hpp b/lib/remote/eventqueue.hpp index c8317cb74..33013836e 100644 --- a/lib/remote/eventqueue.hpp +++ b/lib/remote/eventqueue.hpp @@ -36,6 +36,8 @@ public: void SetTypes(const std::set& types); void SetFilter(std::unique_ptr filter); + Dictionary::Ptr WaitForEvent(void *client, double timeout = 5); + static std::vector GetQueuesForType(const String& type); static void UnregisterIfUnused(const String& name, const EventQueue::Ptr& queue);