Merge pull request #7601 from Icinga/feature/icingadb

Add IcingaDB
This commit is contained in:
Michael Friedrich 2019-11-02 18:39:13 +01:00 committed by GitHub
commit b6e0abc64c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
25 changed files with 3579 additions and 57 deletions

View File

@ -22,6 +22,13 @@ option(ICINGA2_WITH_NOTIFICATION "Build the notification module" ON)
option(ICINGA2_WITH_PERFDATA "Build the perfdata module" ON)
option(ICINGA2_WITH_TESTS "Run unit tests" ON)
# IcingaDB only is supported on modern Linux/Unix master systems
if(NOT WIN32)
option(ICINGA2_WITH_ICINGADB "Build the IcingaDB module" ON)
else()
option(ICINGA2_WITH_ICINGADB "Build the IcingaDB module" OFF)
endif()
option (USE_SYSTEMD
"Configure icinga as native systemd service instead of a SysV initscript" OFF)
@ -203,7 +210,6 @@ if(HAVE_SYSTEMD)
list(APPEND base_DEPS systemd)
endif()
if(EDITLINE_FOUND)
list(APPEND base_DEPS ${EDITLINE_LIBRARIES})
include_directories(${EDITLINE_INCLUDE_DIR})

View File

@ -1379,6 +1379,30 @@ Configuration Attributes:
vars | Dictionary | **Optional.** A dictionary containing custom variables that are available globally.
environment | String | **Optional.** Specify the Icinga environment. This overrides the `Environment` constant specified in the configuration or on the CLI with `--define`. Defaults to empty.
### IcingaDB <a id="objecttype-icingadb"></a>
The IcingaDB object implements the [icingadb feauture](14-features.md#core-backends-icingadb).
Example:
```
object IcingaDB "icingadb" {
//host = "127.0.0.1"
//port = 6379
//password = "xxx"
}
```
Configuration Attributes:
Name | Type | Description
--------------------------|-----------------------|----------------------------------
host | String | **Optional.** Redis host for IcingaDB. Defaults to `127.0.0.1`.
port | Number | **Optional.** Redis port for IcingaDB. Defaults to `6379`.
path | String | **Optional.** Redix unix socket path. Can be used instead of `host` and `port` attributes.
password | String | **Optional.** Redis auth password for IcingaDB.
### IdoMySqlConnection <a id="objecttype-idomysqlconnection"></a>
IDO database adapter for MySQL.

View File

@ -44,6 +44,30 @@ By default, log files will be rotated daily.
The REST API is documented [here](12-icinga2-api.md#icinga2-api) as a core feature.
### Icinga DB <a id="core-backends-icingadb"></a>
Icinga DB provides a new core backend and aims to replace the IDO backend
output. It consists of different components:
* Icinga 2 provides the `icingadb` feature which stores monitoring data in a memory database
* The [IcingaDB service](https://github.com/icinga/icingadb) collects and synchronizes monitoring data into its backend
* Icinga Web reads monitoring data from the new IcingaDB backend
Requirements:
* Local Redis instance
* MySQL/MariaDB server with `icingadb` database, user and schema imports
* Icinga 2's `icingadb` feature enabled
* IcingaDB service requires Redis and MySQL/MariaDB server
* Icinga Web module
> TODO: Detailed instructions.
```
icinga2 feature enable icingadb
```
### IDO Database (DB IDO) <a id="db-ido"></a>
The IDO (Icinga Data Output) feature for Icinga 2 takes care of exporting all

View File

@ -1626,9 +1626,9 @@ it is valid to just sync their zones via the config sync.
The following restores the Zone/Endpoint objects as config objects outside of `zones.d`
in your master/satellite's zones.conf with rendering them as external objects in the Director.
[Example](06-distributed-monitoring.md#three-levels-with-masters-satellites-and-agents)
[Example](06-distributed-monitoring.md#distributed-monitoring-scenarios-master-satellite-agents)
for a 3 level setup with the masters and satellites knowing about the zone hierarchy
outside defined in [zones.conf](#zones-conf):
outside defined in [zones.conf](04-configuration.md#zones-conf):
```
object Endpoint "icinga-master1.localdomain" {

View File

@ -188,7 +188,7 @@ being set but no `zone` defined.
The most convenient way with e.g. managing the objects in `conf.d`
is to move them into the `master` zone. Please continue in the
[troubleshooting docs](#troubleshooting-cluster-command-endpoint-errors-agent-hosts-command-endpoint-zone)
[troubleshooting docs](15-troubleshooting.md#troubleshooting-cluster-command-endpoint-errors-agent-hosts-command-endpoint-zone)
for further instructions.
#### Config Sync <a id="upgrading-to-2-11-cluster-config-sync"></a>

View File

@ -0,0 +1,5 @@
object IcingaDB "icingadb" {
//host = "127.0.0.1"
//port = 6379
//password = "xxx"
}

View File

@ -53,6 +53,10 @@ if(ICINGA2_WITH_PERFDATA)
list(APPEND icinga_app_SOURCES $<TARGET_OBJECTS:perfdata>)
endif()
if(ICINGA2_WITH_ICINGADB)
list(APPEND icinga_app_SOURCES $<TARGET_OBJECTS:icingadb>)
endif()
add_executable(icinga-app
$<TARGET_OBJECTS:icingaloader>
${base_OBJS}

View File

@ -53,4 +53,8 @@ if(ICINGA2_WITH_PERFDATA)
add_subdirectory(perfdata)
endif()
if(ICINGA2_WITH_ICINGADB)
add_subdirectory(icingadb)
endif()
set(CPACK_NSIS_EXTRA_INSTALL_COMMANDS "${CPACK_NSIS_EXTRA_INSTALL_COMMANDS}" PARENT_SCOPE)

View File

@ -300,6 +300,7 @@ void Checkable::ProcessCheckResult(const CheckResult::Ptr& cr, const MessageOrig
if (hardChange || is_volatile) {
SetLastHardStateRaw(new_state);
SetLastHardStateChange(now);
SetLastHardStatesRaw(GetLastHardStatesRaw() / 100u + new_state * 100u);
}
if (!IsStateOK(new_state))

View File

@ -39,23 +39,6 @@ enum CheckableType
CheckableService
};
/**
* Severity Flags
*
* @ingroup icinga
*/
enum SeverityFlag
{
SeverityFlagDowntime = 1,
SeverityFlagAcknowledgement = 2,
SeverityFlagHostDown = 4,
SeverityFlagUnhandled = 8,
SeverityFlagPending = 16,
SeverityFlagWarning = 32,
SeverityFlagUnknown = 64,
SeverityFlagCritical = 128,
};
class CheckCommand;
class EventCommand;
class Dependency;

View File

@ -105,6 +105,9 @@ abstract class Checkable : CustomVarObject
[state, enum, no_user_view, no_user_modify] ServiceState last_hard_state_raw {
default {{{ return ServiceUnknown; }}}
};
[state, no_user_view, no_user_modify] "unsigned short" last_hard_states_raw {
default {{{ return /* current */ 99 * 100 + /* previous */ 99; }}}
};
[state, enum] StateType last_state_type {
default {{{ return StateTypeSoft; }}}
};

View File

@ -249,7 +249,7 @@ std::set<UserGroup::Ptr> CompatUtility::GetCheckableNotificationUserGroups(const
return usergroups;
}
/* Used in DB IDO, StatusDataWriter, Livestatus, CompatLogger, GelfWriter. */
/* Used in DB IDO, StatusDataWriter, Livestatus, CompatLogger, GelfWriter, IcingaDB. */
String CompatUtility::GetCheckResultOutput(const CheckResult::Ptr& cr)
{
if (!cr)
@ -264,7 +264,7 @@ String CompatUtility::GetCheckResultOutput(const CheckResult::Ptr& cr)
return raw_output.SubStr(0, line_end);
}
/* Used in DB IDO, StatusDataWriter and Livestatus. */
/* Used in DB IDO, StatusDataWriter and Livestatus, IcingaDB. */
String CompatUtility::GetCheckResultLongOutput(const CheckResult::Ptr& cr)
{
if (!cr)

View File

@ -164,32 +164,39 @@ HostState Host::GetLastHardState() const
return CalculateState(GetLastHardStateRaw());
}
/* keep in sync with Service::GetSeverity() */
/* keep in sync with Service::GetSeverity()
* One could think it may be smart to use an enum and some bitmask math here.
* But the only thing the consuming icingaweb2 cares about is being able to
* sort by severity. It is therefore easier to keep them seperated here. */
int Host::GetSeverity() const
{
int severity = 0;
ObjectLock olock(this);
ServiceState state = GetStateRaw();
HostState state = GetState();
/* OK/Warning = Up, Critical/Unknownb = Down */
if (!HasBeenChecked())
severity |= SeverityFlagPending;
else if (state == ServiceUnknown)
severity |= SeverityFlagCritical;
else if (state == ServiceCritical)
severity |= SeverityFlagCritical;
if (!HasBeenChecked()) {
severity = 16;
} else if (state == HostUp) {
severity = 0;
} else {
if (IsReachable())
severity = 64;
else
severity = 32;
if (IsInDowntime())
severity |= SeverityFlagDowntime;
else if (IsAcknowledged())
severity |= SeverityFlagAcknowledgement;
else
severity |= SeverityFlagUnhandled;
if (IsAcknowledged())
severity += 512;
else if (IsInDowntime())
severity += 256;
else
severity += 2048;
}
olock.Unlock();
return severity;
}
bool Host::IsStateOK(ServiceState state) const

View File

@ -103,32 +103,50 @@ Host::Ptr Service::GetHost() const
return m_Host;
}
/* keep in sync with Host::GetSeverity() */
/* keep in sync with Host::GetSeverity()
* One could think it may be smart to use an enum and some bitmask math here.
* But the only thing the consuming icingaweb2 cares about is being able to
* sort by severity. It is therefore easier to keep them seperated here. */
int Service::GetSeverity() const
{
int severity = 0;
int severity;
ObjectLock olock(this);
ServiceState state = GetStateRaw();
if (!HasBeenChecked())
severity |= SeverityFlagPending;
else if (state == ServiceWarning)
severity |= SeverityFlagWarning;
else if (state == ServiceUnknown)
severity |= SeverityFlagUnknown;
else if (state == ServiceCritical)
severity |= SeverityFlagCritical;
if (!HasBeenChecked()) {
severity = 16;
} else if (state == ServiceOK) {
severity = 0;
} else {
switch (state) {
case ServiceWarning:
severity = 32;
break;
case ServiceUnknown:
severity = 64;
break;
case ServiceCritical:
severity = 128;
break;
default:
severity = 256;
}
/* TODO: Add host reachability and handled */
if (IsInDowntime())
severity |= SeverityFlagDowntime;
else if (IsAcknowledged())
severity |= SeverityFlagAcknowledgement;
else if (m_Host && m_Host->GetProblem())
severity |= SeverityFlagHostDown;
else
severity |= SeverityFlagUnhandled;
Host::Ptr host = GetHost();
ObjectLock hlock (host);
if (host->GetState() != HostUp || !host->IsReachable()) {
severity += 1024;
} else {
if (IsAcknowledged())
severity += 512;
else if (IsInDowntime())
severity += 256;
else
severity += 2048;
}
hlock.Unlock();
}
olock.Unlock();

View File

@ -0,0 +1,29 @@
# Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+
mkclass_target(icingadb.ti icingadb-ti.cpp icingadb-ti.hpp)
set(icingadb_SOURCES
icingadb.cpp icingadb-objects.cpp icingadb-stats.cpp icingadb-utility.cpp redisconnection.cpp icingadb-ti.hpp
)
if(ICINGA2_UNITY_BUILD)
mkunity_target(icingadb icingadb icingadb_SOURCES)
endif()
add_library(icingadb OBJECT ${icingadb_SOURCES})
include_directories(${icinga2_SOURCE_DIR}/third-party)
add_dependencies(icingadb base config icinga remote)
set_target_properties (
icingadb PROPERTIES
FOLDER Components
)
install_if_not_exists(
${PROJECT_SOURCE_DIR}/etc/icinga2/features-available/icingadb.conf
${CMAKE_INSTALL_SYSCONFDIR}/icinga2/features-available
)
set(CPACK_NSIS_EXTRA_INSTALL_COMMANDS "${CPACK_NSIS_EXTRA_INSTALL_COMMANDS}" PARENT_SCOPE)

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,43 @@
/* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */
#include "icingadb/icingadb.hpp"
#include "base/json.hpp"
#include "base/logger.hpp"
#include "base/serializer.hpp"
#include "base/statsfunction.hpp"
#include "base/convert.hpp"
using namespace icinga;
Dictionary::Ptr IcingaDB::GetStats()
{
Dictionary::Ptr stats = new Dictionary();
//TODO: Figure out if more stats can be useful here.
Namespace::Ptr statsFunctions = ScriptGlobal::Get("StatsFunctions", &Empty);
if (!statsFunctions)
Dictionary::Ptr();
ObjectLock olock(statsFunctions);
for (auto& kv : statsFunctions)
{
Function::Ptr func = kv.second->Get();
if (!func)
BOOST_THROW_EXCEPTION(std::invalid_argument("Invalid status function name."));
Dictionary::Ptr status = new Dictionary();
Array::Ptr perfdata = new Array();
func->Invoke({ status, perfdata });
stats->Set(kv.first, new Dictionary({
{ "status", status },
{ "perfdata", Serialize(perfdata, FAState) }
}));
}
return stats;
}

View File

@ -0,0 +1,245 @@
/* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */
#include "icingadb/icingadb.hpp"
#include "base/configtype.hpp"
#include "base/object-packer.hpp"
#include "base/logger.hpp"
#include "base/serializer.hpp"
#include "base/tlsutility.hpp"
#include "base/initialize.hpp"
#include "base/objectlock.hpp"
#include "base/array.hpp"
#include "base/scriptglobal.hpp"
#include "base/convert.hpp"
#include "base/json.hpp"
#include "icinga/customvarobject.hpp"
#include "icinga/checkcommand.hpp"
#include "icinga/notificationcommand.hpp"
#include "icinga/eventcommand.hpp"
#include "icinga/host.hpp"
#include <boost/algorithm/string.hpp>
#include <map>
#include <utility>
#include <vector>
using namespace icinga;
String IcingaDB::FormatCheckSumBinary(const String& str)
{
char output[20*2+1];
for (int i = 0; i < 20; i++)
sprintf(output + 2 * i, "%02x", str[i]);
return output;
}
String IcingaDB::FormatCommandLine(const Value& commandLine)
{
String result;
if (commandLine.IsObjectType<Array>()) {
Array::Ptr args = commandLine;
bool first = true;
ObjectLock olock(args);
for (const Value& arg : args) {
String token = "'" + Convert::ToString(arg) + "'";
if (first)
first = false;
else
result += String(1, ' ');
result += token;
}
} else if (!commandLine.IsEmpty()) {
result = commandLine;
boost::algorithm::replace_all(result, "\'", "\\'");
result = "'" + result + "'";
}
return result;
}
String IcingaDB::GetEnvironment()
{
return ConfigType::GetObjectsByType<IcingaApplication>()[0]->GetEnvironment();
}
String IcingaDB::GetObjectIdentifier(const ConfigObject::Ptr& object)
{
Type::Ptr type = object->GetReflectionType();
if (type == CheckCommand::TypeInstance || type == NotificationCommand::TypeInstance || type == EventCommand::TypeInstance)
return HashValue((Array::Ptr)new Array({GetEnvironment(), type->GetName(), object->GetName()}));
else
return HashValue((Array::Ptr)new Array({GetEnvironment(), object->GetName()}));
}
String IcingaDB::CalculateCheckSumString(const String& str)
{
return SHA1(str);
}
String IcingaDB::CalculateCheckSumArray(const Array::Ptr& arr)
{
/* Ensure that checksums happen in a defined order. */
Array::Ptr tmpArr = arr->ShallowClone();
tmpArr->Sort();
return SHA1(PackObject(tmpArr));
}
String IcingaDB::CalculateCheckSumProperties(const ConfigObject::Ptr& object, const std::set<String>& propertiesBlacklist)
{
//TODO: consider precision of 6 for double values; use specific config fields for hashing?
return HashValue(object, propertiesBlacklist);
}
static const std::set<String> metadataWhitelist ({"package", "source_location", "templates"});
String IcingaDB::CalculateCheckSumMetadata(const ConfigObject::Ptr& object)
{
return HashValue(object, metadataWhitelist, true);
}
String IcingaDB::CalculateCheckSumVars(const CustomVarObject::Ptr& object)
{
Dictionary::Ptr vars = object->GetVars();
if (!vars)
return HashValue(Empty);
return HashValue(vars);
}
/**
* Prepare object's custom vars for being written to Redis
*
* object.vars = {
* "disks": {
* "disk": {},
* "disk /": {
* "disk_partitions": "/"
* }
* }
* }
*
* return {
* SHA1(PackObject([
* Environment,
* "disks",
* {
* "disk": {},
* "disk /": {
* "disk_partitions": "/"
* }
* }
* ])): {
* "envId": SHA1(Environment),
* "name_checksum": SHA1("disks"),
* "name": "disks",
* "value": {
* "disk": {},
* "disk /": {
* "disk_partitions": "/"
* }
* }
* }
* }
*
* @param object Config object with custom vars
*
* @return JSON-like data structure for Redis
*/
Dictionary::Ptr IcingaDB::SerializeVars(const CustomVarObject::Ptr& object)
{
Dictionary::Ptr vars = object->GetVars();
if (!vars)
return nullptr;
Dictionary::Ptr res = new Dictionary();
auto env (GetEnvironment());
auto envChecksum (SHA1(env));
ObjectLock olock(vars);
for (auto& kv : vars) {
res->Set(
SHA1(PackObject((Array::Ptr)new Array({env, kv.first, kv.second}))),
(Dictionary::Ptr)new Dictionary({
{"environment_id", envChecksum},
{"name_checksum", SHA1(kv.first)},
{"name", kv.first},
{"value", JsonEncode(kv.second)},
})
);
}
return res;
}
static const std::set<String> propertiesBlacklistEmpty;
String IcingaDB::HashValue(const Value& value)
{
return HashValue(value, propertiesBlacklistEmpty);
}
String IcingaDB::HashValue(const Value& value, const std::set<String>& propertiesBlacklist, bool propertiesWhitelist)
{
Value temp;
bool mutabl;
Type::Ptr type = value.GetReflectionType();
if (ConfigObject::TypeInstance->IsAssignableFrom(type)) {
temp = Serialize(value, FAConfig);
mutabl = true;
} else {
temp = value;
mutabl = false;
}
if (propertiesBlacklist.size() && temp.IsObject()) {
Dictionary::Ptr dict = dynamic_pointer_cast<Dictionary>((Object::Ptr)temp);
if (dict) {
if (!mutabl)
dict = dict->ShallowClone();
ObjectLock olock(dict);
if (propertiesWhitelist) {
auto current = dict->Begin();
auto propertiesBlacklistEnd = propertiesBlacklist.end();
while (current != dict->End()) {
if (propertiesBlacklist.find(current->first) == propertiesBlacklistEnd) {
dict->Remove(current++);
} else {
++current;
}
}
} else {
for (auto& property : propertiesBlacklist)
dict->Remove(property);
}
if (!mutabl)
temp = dict;
}
}
return SHA1(PackObject(temp));
}
String IcingaDB::GetLowerCaseTypeNameDB(const ConfigObject::Ptr& obj)
{
return obj->GetReflectionType()->GetName().ToLower();
}
long long IcingaDB::TimestampToMilliseconds(double timestamp) {
return static_cast<long long>(timestamp * 1000);
}

350
lib/icingadb/icingadb.cpp Normal file
View File

@ -0,0 +1,350 @@
/* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */
#include "icingadb/icingadb.hpp"
#include "icingadb/icingadb-ti.cpp"
#include "icingadb/redisconnection.hpp"
#include "remote/eventqueue.hpp"
#include "base/json.hpp"
#include "icinga/checkable.hpp"
#include "icinga/host.hpp"
#include <boost/algorithm/string.hpp>
#include <memory>
#include <utility>
using namespace icinga;
#define MAX_EVENTS_DEFAULT 5000
REGISTER_TYPE(IcingaDB);
IcingaDB::IcingaDB()
: m_Rcon(nullptr)
{
m_Rcon = nullptr;
m_WorkQueue.SetName("IcingaDB");
m_PrefixConfigObject = "icinga:config:";
m_PrefixConfigCheckSum = "icinga:checksum:";
m_PrefixStateObject = "icinga:config:state:";
}
/**
* Starts the component.
*/
void IcingaDB::Start(bool runtimeCreated)
{
ObjectImpl<IcingaDB>::Start(runtimeCreated);
Log(LogInformation, "IcingaDB")
<< "'" << GetName() << "' started.";
m_ConfigDumpInProgress = false;
m_ConfigDumpDone = false;
m_Rcon = new RedisConnection(GetHost(), GetPort(), GetPath(), GetPassword(), GetDbIndex());
m_Rcon->Start();
m_WorkQueue.SetExceptionCallback([this](boost::exception_ptr exp) { ExceptionHandler(std::move(exp)); });
m_ReconnectTimer = new Timer();
m_ReconnectTimer->SetInterval(15);
m_ReconnectTimer->OnTimerExpired.connect([this](const Timer * const&) { ReconnectTimerHandler(); });
m_ReconnectTimer->Start();
m_ReconnectTimer->Reschedule(0);
m_SubscriptionTimer = new Timer();
m_SubscriptionTimer->SetInterval(15);
m_SubscriptionTimer->OnTimerExpired.connect([this](const Timer * const&) { UpdateSubscriptionsTimerHandler(); });
m_SubscriptionTimer->Start();
m_StatsTimer = new Timer();
m_StatsTimer->SetInterval(1);
m_StatsTimer->OnTimerExpired.connect([this](const Timer * const&) { PublishStatsTimerHandler(); });
m_StatsTimer->Start();
m_WorkQueue.SetName("IcingaDB");
boost::thread thread(&IcingaDB::HandleEvents, this);
thread.detach();
}
void IcingaDB::ExceptionHandler(boost::exception_ptr exp)
{
Log(LogCritical, "IcingaDB", "Exception during redis query. Verify that Redis is operational.");
Log(LogDebug, "IcingaDB")
<< "Exception during redis operation: " << DiagnosticInformation(exp);
}
void IcingaDB::ReconnectTimerHandler()
{
m_WorkQueue.Enqueue([this]() { TryToReconnect(); });
}
void IcingaDB::TryToReconnect()
{
AssertOnWorkQueue();
if (m_ConfigDumpDone)
return;
else
m_Rcon->Start();
if (!m_Rcon || !m_Rcon->IsConnected())
return;
UpdateSubscriptions();
if (m_ConfigDumpInProgress || m_ConfigDumpDone)
return;
/* Config dump */
m_ConfigDumpInProgress = true;
PublishStats();
UpdateAllConfigObjects();
m_ConfigDumpDone = true;
m_ConfigDumpInProgress = false;
}
void IcingaDB::UpdateSubscriptionsTimerHandler()
{
m_WorkQueue.Enqueue([this]() { UpdateSubscriptions(); });
}
void IcingaDB::UpdateSubscriptions()
{
AssertOnWorkQueue();
Log(LogNotice, "IcingaDB", "Updating Redis subscriptions");
/* TODO:
* Silently return in this case. Usually the RedisConnection checks for connectivity and logs in failure case.
* But since we expect and answer here and break Icinga in case of receiving no answer/an unexpected one we opt for
* better safe than sorry here. Future implementation needs to include an improved error handling and answer verification.
*/
if (!m_Rcon || !m_Rcon->IsConnected())
return;
String cursor = "0";
String keyPrefix = "icinga:subscription:";
do {
Array::Ptr reply = m_Rcon->GetResultOfQuery({ "SCAN", cursor, "MATCH", keyPrefix + "*", "COUNT", "1000" });
VERIFY(reply->GetLength() % 2u == 0u);
cursor = reply->Get(0);
Array::Ptr keys = reply->Get(1);
ObjectLock oLock (keys);
for (String key : keys) {
if (boost::algorithm::ends_with(key, ":limit"))
continue;
RedisSubscriptionInfo rsi;
if (!IcingaDB::GetSubscriptionTypes(key, rsi)) {
Log(LogInformation, "IcingaDB")
<< "Subscription \"" << key << "\" has no types listed.";
} else {
m_Subscriptions[key.SubStr(keyPrefix.GetLength())] = rsi;
}
}
} while (cursor != "0");
Log(LogNotice, "IcingaDB")
<< "Current Redis event subscriptions: " << m_Subscriptions.size();
}
bool IcingaDB::GetSubscriptionTypes(String key, RedisSubscriptionInfo& rsi)
{
try {
Array::Ptr redisReply = m_Rcon->GetResultOfQuery({ "SMEMBERS", key });
if (redisReply->GetLength() == 0)
return false;
{
ObjectLock oLock (redisReply);
for (String member : redisReply) {
rsi.EventTypes.insert(member);
}
}
Log(LogInformation, "IcingaDB")
<< "Subscriber Info - Key: " << key << " Value: " << Value(Array::FromSet(rsi.EventTypes));
} catch (const std::exception& ex) {
Log(LogWarning, "IcingaDB")
<< "Invalid Redis subscriber info for subscriber '" << key << "': " << DiagnosticInformation(ex);
return false;
}
return true;
}
void IcingaDB::PublishStatsTimerHandler(void)
{
m_WorkQueue.Enqueue([this]() { PublishStats(); });
}
void IcingaDB::PublishStats()
{
AssertOnWorkQueue();
if (!m_Rcon || !m_Rcon->IsConnected())
return;
Dictionary::Ptr status = GetStats();
status->Set("config_dump_in_progress", m_ConfigDumpInProgress);
String jsonStats = JsonEncode(status);
m_Rcon->FireAndForgetQuery({ "PUBLISH", "icinga:stats", jsonStats }, true);
}
void IcingaDB::HandleEvents()
{
String queueName = Utility::NewUniqueID();
EventQueue::Ptr queue = new EventQueue(queueName);
EventQueue::Register(queueName, queue);
std::set<String> types;
types.insert("CheckResult");
types.insert("StateChange");
types.insert("Notification");
types.insert("AcknowledgementSet");
types.insert("AcknowledgementCleared");
types.insert("CommentAdded");
types.insert("CommentRemoved");
types.insert("DowntimeAdded");
types.insert("DowntimeRemoved");
types.insert("DowntimeStarted");
types.insert("DowntimeTriggered");
queue->SetTypes(types);
queue->AddClient(this);
for (;;) {
Dictionary::Ptr event = queue->WaitForEvent(this);
if (!event)
continue;
m_WorkQueue.Enqueue([this, event]() { SendEvent(event); });
}
queue->RemoveClient(this);
EventQueue::UnregisterIfUnused(queueName, queue);
}
void IcingaDB::HandleEvent(const Dictionary::Ptr& event)
{
AssertOnWorkQueue();
for (const std::pair<String, RedisSubscriptionInfo>& kv : m_Subscriptions) {
const auto& name = kv.first;
const auto& rsi = kv.second;
if (rsi.EventTypes.find(event->Get("type")) == rsi.EventTypes.end())
continue;
String body = JsonEncode(event);
double maxExists = m_Rcon->GetResultOfQuery({ "EXISTS", "icinga:subscription:" + name + ":limit" });
long maxEvents = MAX_EVENTS_DEFAULT;
if (maxExists != 0) {
String redisReply = m_Rcon->GetResultOfQuery({ "GET", "icinga:subscription:" + name + ":limit"});
Log(LogInformation, "IcingaDB")
<< "Got limit " << redisReply << " for " << name;
maxEvents = Convert::ToLong(redisReply);
}
m_Rcon->FireAndForgetQueries({
{ "MULTI" },
{ "LPUSH", "icinga:event:" + name, body },
{ "LTRIM", "icinga:event:" + name, "0", String(maxEvents - 1)},
{ "EXEC" }});
}
}
void IcingaDB::SendEvent(const Dictionary::Ptr& event)
{
AssertOnWorkQueue();
if (!m_Rcon || !m_Rcon->IsConnected())
return;
String type = event->Get("type");
if (type == "CheckResult") {
Checkable::Ptr checkable;
if (event->Contains("service")) {
checkable = Service::GetByNamePair(event->Get("host"), event->Get("service"));
} else {
checkable = Host::GetByName(event->Get("host"));
}
// Update State for icingaweb
m_WorkQueue.Enqueue([this, checkable]() { UpdateState(checkable); });
}
if (type.Contains("Acknowledgement")) {
Checkable::Ptr checkable;
if (event->Contains("service")) {
checkable = Service::GetByNamePair(event->Get("host"), event->Get("service"));
event->Set("service_id", GetObjectIdentifier(checkable));
} else {
checkable = Host::GetByName(event->Get("host"));
event->Set("host_id", GetObjectIdentifier(checkable));
}
if (type == "AcknowledgementSet") {
Timestamp entry = 0;
Comment::Ptr AckComment;
for (const Comment::Ptr& c : checkable->GetComments()) {
if (c->GetEntryType() == CommentAcknowledgement) {
if (c->GetEntryTime() > entry) {
entry = c->GetEntryTime();
AckComment = c;
StateChangeHandler(checkable);
}
}
}
event->Set("comment_id", GetObjectIdentifier(AckComment));
}
}
String body = JsonEncode(event);
m_Rcon->FireAndForgetQueries({
{ "PUBLISH", "icinga:event:all", body },
{ "PUBLISH", "icinga:event:" + event->Get("type"), body }});
}
void IcingaDB::Stop(bool runtimeRemoved)
{
Log(LogInformation, "IcingaDB")
<< "'" << GetName() << "' stopped.";
ObjectImpl<IcingaDB>::Stop(runtimeRemoved);
}
void IcingaDB::AssertOnWorkQueue()
{
ASSERT(m_WorkQueue.IsWorkerThread());
}

141
lib/icingadb/icingadb.hpp Normal file
View File

@ -0,0 +1,141 @@
/* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */
#ifndef ICINGADB_H
#define ICINGADB_H
#include "icingadb/icingadb-ti.hpp"
#include "icingadb/redisconnection.hpp"
#include "base/timer.hpp"
#include "base/workqueue.hpp"
#include "icinga/customvarobject.hpp"
#include "icinga/checkable.hpp"
#include "icinga/service.hpp"
#include "icinga/downtime.hpp"
#include "remote/messageorigin.hpp"
#include <memory>
namespace icinga
{
struct RedisSubscriptionInfo
{
std::set<String> EventTypes;
};
/**
* @ingroup icingadb
*/
class IcingaDB : public ObjectImpl<IcingaDB>
{
public:
DECLARE_OBJECT(IcingaDB);
DECLARE_OBJECTNAME(IcingaDB);
IcingaDB();
static void ConfigStaticInitialize();
virtual void Start(bool runtimeCreated) override;
virtual void Stop(bool runtimeRemoved) override;
private:
void ReconnectTimerHandler();
void TryToReconnect();
void HandleEvents();
void HandleEvent(const Dictionary::Ptr& event);
void SendEvent(const Dictionary::Ptr& event);
void UpdateSubscriptionsTimerHandler();
void UpdateSubscriptions();
bool GetSubscriptionTypes(String key, RedisSubscriptionInfo& rsi);
void PublishStatsTimerHandler();
void PublishStats();
/* config & status dump */
void UpdateAllConfigObjects();
std::vector<std::vector<intrusive_ptr<ConfigObject>>> ChunkObjects(std::vector<intrusive_ptr<ConfigObject>> objects, size_t chunkSize);
void DeleteKeys(const std::vector<String>& keys);
std::vector<String> GetTypeObjectKeys(const String& type);
void InsertObjectDependencies(const ConfigObject::Ptr& object, const String typeName, std::map<String, std::vector<String>>& hMSets,
std::map<String, std::vector<String>>& publishes, bool runtimeUpdate);
void UpdateState(const Checkable::Ptr& checkable);
void SendConfigUpdate(const ConfigObject::Ptr& object, bool runtimeUpdate);
void CreateConfigUpdate(const ConfigObject::Ptr& object, const String type, std::map<String, std::vector<String>>& hMSets,
std::map<String, std::vector<String>>& publishes, bool runtimeUpdate);
void SendConfigDelete(const ConfigObject::Ptr& object);
void SendStatusUpdate(const ConfigObject::Ptr& object, const CheckResult::Ptr& cr, StateType type);
void SendSentNotification(
const Notification::Ptr& notification, const Checkable::Ptr& checkable, size_t users,
NotificationType type, const CheckResult::Ptr& cr, const String& author, const String& text
);
void SendStartedDowntime(const Downtime::Ptr& downtime);
void SendRemovedDowntime(const Downtime::Ptr& downtime);
void SendAddedComment(const Comment::Ptr& comment);
void SendRemovedComment(const Comment::Ptr& comment);
void SendFlappingChanged(const Checkable::Ptr& checkable, const Value& value);
std::vector<String> UpdateObjectAttrs(const ConfigObject::Ptr& object, int fieldType, const String& typeNameOverride);
Dictionary::Ptr SerializeState(const Checkable::Ptr& checkable);
/* Stats */
Dictionary::Ptr GetStats();
/* utilities */
static String FormatCheckSumBinary(const String& str);
static String FormatCommandLine(const Value& commandLine);
static long long TimestampToMilliseconds(double timestamp);
static String GetObjectIdentifier(const ConfigObject::Ptr& object);
static String GetEnvironment();
static String CalculateCheckSumString(const String& str);
static String CalculateCheckSumArray(const Array::Ptr& arr);
static String CalculateCheckSumProperties(const ConfigObject::Ptr& object, const std::set<String>& propertiesBlacklist);
static String CalculateCheckSumMetadata(const ConfigObject::Ptr& object);
static String CalculateCheckSumVars(const CustomVarObject::Ptr& object);
static Dictionary::Ptr SerializeVars(const CustomVarObject::Ptr& object);
static String HashValue(const Value& value);
static String HashValue(const Value& value, const std::set<String>& propertiesBlacklist, bool propertiesWhitelist = false);
static String GetLowerCaseTypeNameDB(const ConfigObject::Ptr& obj);
static bool PrepareObject(const ConfigObject::Ptr& object, Dictionary::Ptr& attributes, Dictionary::Ptr& checkSums);
static void StateChangeHandler(const ConfigObject::Ptr& object);
static void StateChangeHandler(const ConfigObject::Ptr& object, const CheckResult::Ptr& cr, StateType type);
static void VersionChangedHandler(const ConfigObject::Ptr& object);
static void DowntimeStartedHandler(const Downtime::Ptr& downtime);
static void DowntimeRemovedHandler(const Downtime::Ptr& downtime);
static void NotificationSentToAllUsersHandler(
const Notification::Ptr& notification, const Checkable::Ptr& checkable, const std::set<User::Ptr>& users,
NotificationType type, const CheckResult::Ptr& cr, const String& author, const String& text
);
static void CommentAddedHandler(const Comment::Ptr& comment);
static void CommentRemovedHandler(const Comment::Ptr& comment);
static void FlappingChangedHandler(const Checkable::Ptr& checkable, const Value& value);
void AssertOnWorkQueue();
void ExceptionHandler(boost::exception_ptr exp);
Timer::Ptr m_StatsTimer;
Timer::Ptr m_ReconnectTimer;
Timer::Ptr m_SubscriptionTimer;
WorkQueue m_WorkQueue;
std::map<String, RedisSubscriptionInfo> m_Subscriptions;
String m_PrefixConfigObject;
String m_PrefixConfigCheckSum;
String m_PrefixStateObject;
bool m_ConfigDumpInProgress;
bool m_ConfigDumpDone;
RedisConnection::Ptr m_Rcon;
};
}
#endif /* ICINGADB_H */

23
lib/icingadb/icingadb.ti Normal file
View File

@ -0,0 +1,23 @@
/* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */
#include "base/configobject.hpp"
library icingadb;
namespace icinga
{
class IcingaDB : ConfigObject
{
[config] String host {
default {{{ return "127.0.0.1"; }}}
};
[config] int port {
default {{{ return 6379; }}}
};
[config] String path;
[config] String password;
[config] int db_index;
};
}

View File

@ -0,0 +1,410 @@
/* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */
#include "icingadb/redisconnection.hpp"
#include "base/array.hpp"
#include "base/convert.hpp"
#include "base/defer.hpp"
#include "base/io-engine.hpp"
#include "base/logger.hpp"
#include "base/objectlock.hpp"
#include "base/string.hpp"
#include "base/tcpsocket.hpp"
#include <boost/asio/post.hpp>
#include <boost/asio/spawn.hpp>
#include <boost/coroutine/exceptions.hpp>
#include <boost/utility/string_view.hpp>
#include <boost/variant/get.hpp>
#include <exception>
#include <iterator>
#include <memory>
#include <utility>
using namespace icinga;
namespace asio = boost::asio;
RedisConnection::RedisConnection(const String& host, const int port, const String& path, const String& password, const int db) :
RedisConnection(IoEngine::Get().GetIoContext(), host, port, path, password, db)
{
}
RedisConnection::RedisConnection(boost::asio::io_context& io, String host, int port, String path, String password, int db)
: m_Host(std::move(host)), m_Port(port), m_Path(std::move(path)), m_Password(std::move(password)), m_DbIndex(db),
m_Connecting(false), m_Connected(false), m_Started(false), m_Strand(io), m_QueuedWrites(io), m_QueuedReads(io)
{
}
void RedisConnection::Start()
{
if (!m_Started.exchange(true)) {
Ptr keepAlive (this);
asio::spawn(m_Strand, [this, keepAlive](asio::yield_context yc) { ReadLoop(yc); });
asio::spawn(m_Strand, [this, keepAlive](asio::yield_context yc) { WriteLoop(yc); });
}
if (!m_Connecting.exchange(true)) {
Ptr keepAlive (this);
asio::spawn(m_Strand, [this, keepAlive](asio::yield_context yc) { Connect(yc); });
}
}
bool RedisConnection::IsConnected() {
return m_Connected.load();
}
static inline
void LogQuery(RedisConnection::Query& query, Log& msg)
{
int i = 0;
for (auto& arg : query) {
if (++i == 8) {
msg << " ...";
break;
}
msg << " '" << arg << '\'';
}
}
void RedisConnection::FireAndForgetQuery(RedisConnection::Query query, bool highPrio)
{
{
Log msg (LogNotice, "IcingaDB", "Firing and forgetting query:");
LogQuery(query, msg);
}
auto item (std::make_shared<decltype(WriteQueueItem().FireAndForgetQuery)::element_type>(std::move(query)));
asio::post(m_Strand, [this, item, highPrio]() {
(highPrio ? &m_Queues.HighPrioWrites : &m_Queues.Writes)->emplace(WriteQueueItem{item, nullptr, nullptr, nullptr});
m_QueuedWrites.Set();
});
}
void RedisConnection::FireAndForgetQueries(RedisConnection::Queries queries, bool highPrio)
{
for (auto& query : queries) {
Log msg (LogNotice, "IcingaDB", "Firing and forgetting query:");
LogQuery(query, msg);
}
auto item (std::make_shared<decltype(WriteQueueItem().FireAndForgetQueries)::element_type>(std::move(queries)));
asio::post(m_Strand, [this, item, highPrio]() {
(highPrio ? &m_Queues.HighPrioWrites : &m_Queues.Writes)->emplace(WriteQueueItem{nullptr, item, nullptr, nullptr});
m_QueuedWrites.Set();
});
}
RedisConnection::Reply RedisConnection::GetResultOfQuery(RedisConnection::Query query, bool highPrio)
{
{
Log msg (LogNotice, "IcingaDB", "Executing query:");
LogQuery(query, msg);
}
std::promise<Reply> promise;
auto future (promise.get_future());
auto item (std::make_shared<decltype(WriteQueueItem().GetResultOfQuery)::element_type>(std::move(query), std::move(promise)));
asio::post(m_Strand, [this, item, highPrio]() {
(highPrio ? &m_Queues.HighPrioWrites : &m_Queues.Writes)->emplace(WriteQueueItem{nullptr, nullptr, item, nullptr});
m_QueuedWrites.Set();
});
item = nullptr;
future.wait();
return future.get();
}
RedisConnection::Replies RedisConnection::GetResultsOfQueries(RedisConnection::Queries queries, bool highPrio)
{
for (auto& query : queries) {
Log msg (LogNotice, "IcingaDB", "Executing query:");
LogQuery(query, msg);
}
std::promise<Replies> promise;
auto future (promise.get_future());
auto item (std::make_shared<decltype(WriteQueueItem().GetResultsOfQueries)::element_type>(std::move(queries), std::move(promise)));
asio::post(m_Strand, [this, item, highPrio]() {
(highPrio ? &m_Queues.HighPrioWrites : &m_Queues.Writes)->emplace(WriteQueueItem{nullptr, nullptr, nullptr, item});
m_QueuedWrites.Set();
});
item = nullptr;
future.wait();
return future.get();
}
void RedisConnection::Connect(asio::yield_context& yc)
{
Defer notConnecting ([this]() { m_Connecting.store(m_Connected.load()); });
try {
if (m_Path.IsEmpty()) {
Log(LogInformation, "IcingaDB")
<< "Trying to connect to Redis server (async) on host '" << m_Host << ":" << m_Port << "'";
decltype(m_TcpConn) conn (new TcpConn(m_Strand.context()));
icinga::Connect(conn->next_layer(), m_Host, Convert::ToString(m_Port), yc);
m_TcpConn = std::move(conn);
} else {
Log(LogInformation, "IcingaDB")
<< "Trying to connect to Redis server (async) on unix socket path '" << m_Path << "'";
decltype(m_UnixConn) conn (new UnixConn(m_Strand.context()));
conn->next_layer().async_connect(Unix::endpoint(m_Path.CStr()), yc);
m_UnixConn = std::move(conn);
}
m_Connected.store(true);
Log(LogInformation, "IcingaDB", "Connected to Redis server");
} catch (const boost::coroutines::detail::forced_unwind&) {
throw;
} catch (const std::exception& ex) {
Log(LogCritical, "IcingaDB")
<< "Cannot connect to " << m_Host << ":" << m_Port << ": " << ex.what();
}
}
void RedisConnection::ReadLoop(asio::yield_context& yc)
{
for (;;) {
m_QueuedReads.Wait(yc);
while (!m_Queues.FutureResponseActions.empty()) {
auto item (std::move(m_Queues.FutureResponseActions.front()));
m_Queues.FutureResponseActions.pop();
switch (item.Action) {
case ResponseAction::Ignore:
try {
for (auto i (item.Amount); i; --i) {
ReadOne(yc);
}
} catch (const boost::coroutines::detail::forced_unwind&) {
throw;
} catch (const std::exception& ex) {
Log(LogCritical, "IcingaDB")
<< "Error during receiving the response to a query which has been fired and forgotten: " << ex.what();
continue;
} catch (...) {
Log(LogCritical, "IcingaDB")
<< "Error during receiving the response to a query which has been fired and forgotten";
continue;
}
break;
case ResponseAction::Deliver:
for (auto i (item.Amount); i; --i) {
auto promise (std::move(m_Queues.ReplyPromises.front()));
m_Queues.ReplyPromises.pop();
Reply reply;
try {
reply = ReadOne(yc);
} catch (const boost::coroutines::detail::forced_unwind&) {
throw;
} catch (...) {
promise.set_exception(std::current_exception());
continue;
}
promise.set_value(std::move(reply));
}
break;
case ResponseAction::DeliverBulk:
{
auto promise (std::move(m_Queues.RepliesPromises.front()));
m_Queues.RepliesPromises.pop();
Replies replies;
replies.reserve(item.Amount);
for (auto i (item.Amount); i; --i) {
try {
replies.emplace_back(ReadOne(yc));
} catch (const boost::coroutines::detail::forced_unwind&) {
throw;
} catch (...) {
promise.set_exception(std::current_exception());
continue;
}
}
promise.set_value(std::move(replies));
}
}
}
m_QueuedReads.Clear();
}
}
void RedisConnection::WriteLoop(asio::yield_context& yc)
{
for (;;) {
m_QueuedWrites.Wait(yc);
for (;;) {
if (m_Queues.HighPrioWrites.empty()) {
if (m_Queues.Writes.empty()) {
break;
} else {
auto next (std::move(m_Queues.Writes.front()));
m_Queues.Writes.pop();
WriteItem(yc, std::move(next));
}
} else {
auto next (std::move(m_Queues.HighPrioWrites.front()));
m_Queues.HighPrioWrites.pop();
WriteItem(yc, std::move(next));
}
}
m_QueuedWrites.Clear();
}
}
void RedisConnection::WriteItem(boost::asio::yield_context& yc, RedisConnection::WriteQueueItem next)
{
if (next.FireAndForgetQuery) {
auto& item (*next.FireAndForgetQuery);
try {
WriteOne(item, yc);
} catch (const boost::coroutines::detail::forced_unwind&) {
throw;
} catch (const std::exception& ex) {
Log msg (LogCritical, "IcingaDB", "Error during sending query");
LogQuery(item, msg);
msg << " which has been fired and forgotten: " << ex.what();
return;
} catch (...) {
Log msg (LogCritical, "IcingaDB", "Error during sending query");
LogQuery(item, msg);
msg << " which has been fired and forgotten";
return;
}
if (m_Queues.FutureResponseActions.empty() || m_Queues.FutureResponseActions.back().Action != ResponseAction::Ignore) {
m_Queues.FutureResponseActions.emplace(FutureResponseAction{1, ResponseAction::Ignore});
} else {
++m_Queues.FutureResponseActions.back().Amount;
}
m_QueuedReads.Set();
}
if (next.FireAndForgetQueries) {
auto& item (*next.FireAndForgetQueries);
size_t i = 0;
try {
for (auto& query : item) {
WriteOne(query, yc);
++i;
}
} catch (const boost::coroutines::detail::forced_unwind&) {
throw;
} catch (const std::exception& ex) {
Log msg (LogCritical, "IcingaDB", "Error during sending query");
LogQuery(item[i], msg);
msg << " which has been fired and forgotten: " << ex.what();
return;
} catch (...) {
Log msg (LogCritical, "IcingaDB", "Error during sending query");
LogQuery(item[i], msg);
msg << " which has been fired and forgotten";
return;
}
if (m_Queues.FutureResponseActions.empty() || m_Queues.FutureResponseActions.back().Action != ResponseAction::Ignore) {
m_Queues.FutureResponseActions.emplace(FutureResponseAction{item.size(), ResponseAction::Ignore});
} else {
m_Queues.FutureResponseActions.back().Amount += item.size();
}
m_QueuedReads.Set();
}
if (next.GetResultOfQuery) {
auto& item (*next.GetResultOfQuery);
try {
WriteOne(item.first, yc);
} catch (const boost::coroutines::detail::forced_unwind&) {
throw;
} catch (...) {
item.second.set_exception(std::current_exception());
return;
}
m_Queues.ReplyPromises.emplace(std::move(item.second));
if (m_Queues.FutureResponseActions.empty() || m_Queues.FutureResponseActions.back().Action != ResponseAction::Deliver) {
m_Queues.FutureResponseActions.emplace(FutureResponseAction{1, ResponseAction::Deliver});
} else {
++m_Queues.FutureResponseActions.back().Amount;
}
m_QueuedReads.Set();
}
if (next.GetResultsOfQueries) {
auto& item (*next.GetResultsOfQueries);
try {
for (auto& query : item.first) {
WriteOne(query, yc);
}
} catch (const boost::coroutines::detail::forced_unwind&) {
throw;
} catch (...) {
item.second.set_exception(std::current_exception());
return;
}
m_Queues.RepliesPromises.emplace(std::move(item.second));
m_Queues.FutureResponseActions.emplace(FutureResponseAction{item.first.size(), ResponseAction::DeliverBulk});
m_QueuedReads.Set();
}
}
RedisConnection::Reply RedisConnection::ReadOne(boost::asio::yield_context& yc)
{
if (m_Path.IsEmpty()) {
return ReadOne(m_TcpConn, yc);
} else {
return ReadOne(m_UnixConn, yc);
}
}
void RedisConnection::WriteOne(RedisConnection::Query& query, asio::yield_context& yc)
{
if (m_Path.IsEmpty()) {
WriteOne(m_TcpConn, query, yc);
} else {
WriteOne(m_UnixConn, query, yc);
}
}

View File

@ -0,0 +1,398 @@
/* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */
#ifndef REDISCONNECTION_H
#define REDISCONNECTION_H
#include "base/array.hpp"
#include "base/atomic.hpp"
#include "base/io-engine.hpp"
#include "base/object.hpp"
#include "base/string.hpp"
#include "base/value.hpp"
#include <boost/asio/spawn.hpp>
#include <boost/asio/buffer.hpp>
#include <boost/asio/buffered_stream.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/asio/io_context_strand.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/local/stream_protocol.hpp>
#include <boost/asio/read.hpp>
#include <boost/asio/read_until.hpp>
#include <boost/asio/streambuf.hpp>
#include <boost/asio/write.hpp>
#include <boost/lexical_cast.hpp>
#include <boost/utility/string_view.hpp>
#include <cstddef>
#include <cstdint>
#include <cstdio>
#include <cstring>
#include <future>
#include <memory>
#include <queue>
#include <stdexcept>
#include <utility>
#include <vector>
namespace icinga
{
/**
* An Async Redis connection.
*
* @ingroup icingadb
*/
class RedisConnection final : public Object
{
public:
DECLARE_PTR_TYPEDEFS(RedisConnection);
typedef std::vector<String> Query;
typedef std::vector<Query> Queries;
typedef Value Reply;
typedef std::vector<Reply> Replies;
RedisConnection(const String& host, const int port, const String& path,
const String& password = "", const int db = 0);
void Start();
bool IsConnected();
void FireAndForgetQuery(Query query, bool highPrio = false);
void FireAndForgetQueries(Queries queries, bool highPrio = false);
Reply GetResultOfQuery(Query query, bool highPrio = false);
Replies GetResultsOfQueries(Queries queries, bool highPrio = false);
private:
enum class ResponseAction : unsigned char
{
Ignore, Deliver, DeliverBulk
};
struct FutureResponseAction
{
size_t Amount;
ResponseAction Action;
};
struct WriteQueueItem
{
std::shared_ptr<Query> FireAndForgetQuery;
std::shared_ptr<Queries> FireAndForgetQueries;
std::shared_ptr<std::pair<Query, std::promise<Reply>>> GetResultOfQuery;
std::shared_ptr<std::pair<Queries, std::promise<Replies>>> GetResultsOfQueries;
};
typedef boost::asio::ip::tcp Tcp;
typedef boost::asio::local::stream_protocol Unix;
typedef boost::asio::buffered_stream<Tcp::socket> TcpConn;
typedef boost::asio::buffered_stream<Unix::socket> UnixConn;
template<class AsyncReadStream>
static Value ReadRESP(AsyncReadStream& stream, boost::asio::yield_context& yc);
template<class AsyncReadStream>
static std::vector<char> ReadLine(AsyncReadStream& stream, boost::asio::yield_context& yc, size_t hint = 0);
template<class AsyncWriteStream>
static void WriteRESP(AsyncWriteStream& stream, const Query& query, boost::asio::yield_context& yc);
template<class AsyncWriteStream>
static void WriteInt(AsyncWriteStream& stream, intmax_t i, boost::asio::yield_context& yc);
RedisConnection(boost::asio::io_context& io, String host, int port, String path, String password, int db);
void Connect(boost::asio::yield_context& yc);
void ReadLoop(boost::asio::yield_context& yc);
void WriteLoop(boost::asio::yield_context& yc);
void WriteItem(boost::asio::yield_context& yc, WriteQueueItem item);
Reply ReadOne(boost::asio::yield_context& yc);
void WriteOne(Query& query, boost::asio::yield_context& yc);
template<class StreamPtr>
Reply ReadOne(StreamPtr& stream, boost::asio::yield_context& yc);
template<class StreamPtr>
void WriteOne(StreamPtr& stream, Query& query, boost::asio::yield_context& yc);
String m_Path;
String m_Host;
int m_Port;
String m_Password;
int m_DbIndex;
boost::asio::io_context::strand m_Strand;
std::shared_ptr<TcpConn> m_TcpConn;
std::shared_ptr<UnixConn> m_UnixConn;
Atomic<bool> m_Connecting, m_Connected, m_Started;
struct {
std::queue<WriteQueueItem> Writes, HighPrioWrites;
std::queue<std::promise<Reply>> ReplyPromises;
std::queue<std::promise<Replies>> RepliesPromises;
std::queue<FutureResponseAction> FutureResponseActions;
} m_Queues;
AsioConditionVariable m_QueuedWrites, m_QueuedReads;
};
class RedisError final : public Object
{
public:
DECLARE_PTR_TYPEDEFS(RedisError);
inline RedisError(String message) : m_Message(std::move(message))
{
}
inline const String& GetMessage()
{
return m_Message;
}
private:
String m_Message;
};
class RedisDisconnected : public std::runtime_error
{
public:
inline RedisDisconnected() : runtime_error("")
{
}
};
class RedisProtocolError : public std::runtime_error
{
protected:
inline RedisProtocolError() : runtime_error("")
{
}
};
class BadRedisType : public RedisProtocolError
{
public:
inline BadRedisType(char type) : m_What{type, 0}
{
}
virtual const char * what() const noexcept override
{
return m_What;
}
private:
char m_What[2];
};
class BadRedisInt : public RedisProtocolError
{
public:
inline BadRedisInt(std::vector<char> intStr) : m_What(std::move(intStr))
{
m_What.emplace_back(0);
}
virtual const char * what() const noexcept override
{
return m_What.data();
}
private:
std::vector<char> m_What;
};
template<class StreamPtr>
RedisConnection::Reply RedisConnection::ReadOne(StreamPtr& stream, boost::asio::yield_context& yc)
{
if (!stream) {
throw RedisDisconnected();
}
auto strm (stream);
try {
return ReadRESP(*strm, yc);
} catch (const boost::coroutines::detail::forced_unwind&) {
throw;
} catch (...) {
if (m_Connecting.exchange(false)) {
m_Connected.store(false);
stream = nullptr;
}
throw;
}
}
template<class StreamPtr>
void RedisConnection::WriteOne(StreamPtr& stream, RedisConnection::Query& query, boost::asio::yield_context& yc)
{
if (!stream) {
throw RedisDisconnected();
}
auto strm (stream);
try {
WriteRESP(*strm, query, yc);
strm->async_flush(yc);
} catch (const boost::coroutines::detail::forced_unwind&) {
throw;
} catch (...) {
if (m_Connecting.exchange(false)) {
m_Connected.store(false);
stream = nullptr;
}
throw;
}
}
template<class AsyncReadStream>
Value RedisConnection::ReadRESP(AsyncReadStream& stream, boost::asio::yield_context& yc)
{
namespace asio = boost::asio;
char type = 0;
asio::async_read(stream, asio::mutable_buffer(&type, 1), yc);
switch (type) {
case '+':
{
auto buf (ReadLine(stream, yc));
return String(buf.begin(), buf.end());
}
case '-':
{
auto buf (ReadLine(stream, yc));
return new RedisError(String(buf.begin(), buf.end()));
}
case ':':
{
auto buf (ReadLine(stream, yc, 21));
intmax_t i = 0;
try {
i = boost::lexical_cast<intmax_t>(boost::string_view(buf.data(), buf.size()));
} catch (...) {
throw BadRedisInt(std::move(buf));
}
return (double)i;
}
case '$':
{
auto buf (ReadLine(stream, yc, 21));
intmax_t i = 0;
try {
i = boost::lexical_cast<intmax_t>(boost::string_view(buf.data(), buf.size()));
} catch (...) {
throw BadRedisInt(std::move(buf));
}
if (i < 0) {
return Value();
}
buf.clear();
buf.insert(buf.end(), i, 0);
asio::async_read(stream, asio::mutable_buffer(buf.data(), buf.size()), yc);
{
char crlf[2];
asio::async_read(stream, asio::mutable_buffer(crlf, 2), yc);
}
return String(buf.begin(), buf.end());
}
case '*':
{
auto buf (ReadLine(stream, yc, 21));
intmax_t i = 0;
try {
i = boost::lexical_cast<intmax_t>(boost::string_view(buf.data(), buf.size()));
} catch (...) {
throw BadRedisInt(std::move(buf));
}
Array::Ptr arr = new Array();
if (i < 0) {
i = 0;
}
arr->Reserve(i);
for (; i; --i) {
arr->Add(ReadRESP(stream, yc));
}
return arr;
}
default:
throw BadRedisType(type);
}
}
template<class AsyncReadStream>
std::vector<char> RedisConnection::ReadLine(AsyncReadStream& stream, boost::asio::yield_context& yc, size_t hint)
{
namespace asio = boost::asio;
std::vector<char> line;
line.reserve(hint);
char next = 0;
asio::mutable_buffer buf (&next, 1);
for (;;) {
asio::async_read(stream, buf, yc);
if (next == '\r') {
asio::async_read(stream, buf, yc);
return std::move(line);
}
line.emplace_back(next);
}
}
template<class AsyncWriteStream>
void RedisConnection::WriteRESP(AsyncWriteStream& stream, const Query& query, boost::asio::yield_context& yc)
{
namespace asio = boost::asio;
asio::async_write(stream, asio::const_buffer("*", 1), yc);
WriteInt(stream, query.size(), yc);
asio::async_write(stream, asio::const_buffer("\r\n", 2), yc);
for (auto& arg : query) {
asio::async_write(stream, asio::const_buffer("$", 1), yc);
WriteInt(stream, arg.GetLength(), yc);
asio::async_write(stream, asio::const_buffer("\r\n", 2), yc);
asio::async_write(stream, asio::const_buffer(arg.CStr(), arg.GetLength()), yc);
asio::async_write(stream, asio::const_buffer("\r\n", 2), yc);
}
}
template<class AsyncWriteStream>
void RedisConnection::WriteInt(AsyncWriteStream& stream, intmax_t i, boost::asio::yield_context& yc)
{
namespace asio = boost::asio;
char buf[21] = {};
sprintf(buf, "%jd", i);
asio::async_write(stream, asio::const_buffer(buf, strlen(buf)), yc);
}
}
#endif //REDISCONNECTION_H

View File

@ -90,6 +90,25 @@ void EventQueue::SetFilter(std::unique_ptr<Expression> filter)
m_Filter.swap(filter);
}
Dictionary::Ptr EventQueue::WaitForEvent(void *client, double timeout)
{
boost::mutex::scoped_lock lock(m_Mutex);
for (;;) {
auto it = m_Events.find(client);
ASSERT(it != m_Events.end());
if (!it->second.empty()) {
Dictionary::Ptr result = *it->second.begin();
it->second.pop_front();
return result;
}
if (!m_CV.timed_wait(lock, boost::posix_time::milliseconds(long(timeout * 1000))))
return nullptr;
}
}
std::vector<EventQueue::Ptr> EventQueue::GetQueuesForType(const String& type)
{
EventQueueRegistry::ItemMap queues = EventQueueRegistry::GetInstance()->GetItems();

View File

@ -36,6 +36,8 @@ public:
void SetTypes(const std::set<String>& types);
void SetFilter(std::unique_ptr<Expression> filter);
Dictionary::Ptr WaitForEvent(void *client, double timeout = 5);
static std::vector<EventQueue::Ptr> GetQueuesForType(const String& type);
static void UnregisterIfUnused(const String& name, const EventQueue::Ptr& queue);