mirror of
https://github.com/Icinga/icinga2.git
synced 2025-07-26 23:24:09 +02:00
parent
a6ec60a992
commit
50594ec1c8
@ -43,7 +43,7 @@ void RedisConnection::Start()
|
|||||||
{
|
{
|
||||||
RedisConnection::Connect();
|
RedisConnection::Connect();
|
||||||
|
|
||||||
std::thread thread(std::bind(&RedisConnection::HandleRW, this));
|
std::thread thread(&RedisConnection::HandleRW, this);
|
||||||
thread.detach();
|
thread.detach();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -181,14 +181,18 @@ void RedisConnection::DisconnectCallback(const redisAsyncContext *c, int status)
|
|||||||
|
|
||||||
void RedisConnection::ExecuteQuery(const std::vector<String>& query, redisCallbackFn *fn, void *privdata)
|
void RedisConnection::ExecuteQuery(const std::vector<String>& query, redisCallbackFn *fn, void *privdata)
|
||||||
{
|
{
|
||||||
m_RedisConnectionWorkQueue.Enqueue(std::bind(&RedisConnection::SendMessageInternal, this, query, fn, privdata));
|
m_RedisConnectionWorkQueue.Enqueue([this, query, fn, privdata]() {
|
||||||
|
SendMessageInternal(query, fn, privdata);
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
void
|
void
|
||||||
RedisConnection::ExecuteQueries(const std::vector<std::vector<String> >& queries, redisCallbackFn *fn, void *privdata)
|
RedisConnection::ExecuteQueries(const std::vector<std::vector<String> >& queries, redisCallbackFn *fn, void *privdata)
|
||||||
{
|
{
|
||||||
for (const auto& query : queries) {
|
for (const auto& query : queries) {
|
||||||
m_RedisConnectionWorkQueue.Enqueue(std::bind(&RedisConnection::SendMessageInternal, this, query, fn, privdata));
|
m_RedisConnectionWorkQueue.Enqueue([this, query, fn, privdata]() {
|
||||||
|
SendMessageInternal(query, fn, privdata);
|
||||||
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -53,20 +53,29 @@ INITIALIZE_ONCE(&RedisWriter::ConfigStaticInitialize);
|
|||||||
void RedisWriter::ConfigStaticInitialize()
|
void RedisWriter::ConfigStaticInitialize()
|
||||||
{
|
{
|
||||||
/* triggered in ProcessCheckResult(), requires UpdateNextCheck() to be called before */
|
/* triggered in ProcessCheckResult(), requires UpdateNextCheck() to be called before */
|
||||||
Checkable::OnStateChange.connect(std::bind(&RedisWriter::StateChangeHandler, _1));
|
Checkable::OnStateChange.connect([](const Checkable::Ptr& checkable, const CheckResult::Ptr&, StateType, const MessageOrigin::Ptr&) {
|
||||||
|
RedisWriter::StateChangeHandler(checkable);
|
||||||
|
});
|
||||||
|
|
||||||
/* triggered when acknowledged host/service goes back to ok and when the acknowledgement gets deleted */
|
/* triggered when acknowledged host/service goes back to ok and when the acknowledgement gets deleted */
|
||||||
Checkable::OnAcknowledgementCleared.connect(std::bind(&RedisWriter::StateChangeHandler, _1));
|
Checkable::OnAcknowledgementCleared.connect([](const Checkable::Ptr& checkable, const MessageOrigin::Ptr&) {
|
||||||
|
RedisWriter::StateChangeHandler(checkable);
|
||||||
|
});
|
||||||
|
|
||||||
/* triggered on create, update and delete objects */
|
/* triggered on create, update and delete objects */
|
||||||
ConfigObject::OnActiveChanged.connect(std::bind(&RedisWriter::VersionChangedHandler, _1));
|
ConfigObject::OnActiveChanged.connect([](const ConfigObject::Ptr& object, const Value&) {
|
||||||
ConfigObject::OnVersionChanged.connect(std::bind(&RedisWriter::VersionChangedHandler, _1));
|
RedisWriter::VersionChangedHandler(object);
|
||||||
|
});
|
||||||
|
ConfigObject::OnVersionChanged.connect([](const ConfigObject::Ptr& object, const Value&) {
|
||||||
|
RedisWriter::VersionChangedHandler(object);
|
||||||
|
});
|
||||||
|
|
||||||
/* fixed downtime start */
|
/* fixed downtime start */
|
||||||
Downtime::OnDowntimeStarted.connect(std::bind(&RedisWriter::DowntimeChangedHandler, _1));
|
Downtime::OnDowntimeStarted.connect(&RedisWriter::DowntimeChangedHandler);
|
||||||
/* flexible downtime start */
|
/* flexible downtime start */
|
||||||
Downtime::OnDowntimeTriggered.connect(std::bind(&RedisWriter::DowntimeChangedHandler, _1));
|
Downtime::OnDowntimeTriggered.connect(&RedisWriter::DowntimeChangedHandler);
|
||||||
/* fixed/flexible downtime end */
|
/* fixed/flexible downtime end */
|
||||||
Downtime::OnDowntimeRemoved.connect(std::bind(&RedisWriter::DowntimeChangedHandler, _1));
|
Downtime::OnDowntimeRemoved.connect(&RedisWriter::DowntimeChangedHandler);
|
||||||
}
|
}
|
||||||
|
|
||||||
void RedisWriter::UpdateAllConfigObjects()
|
void RedisWriter::UpdateAllConfigObjects()
|
||||||
@ -877,7 +886,7 @@ void RedisWriter::StateChangeHandler(const ConfigObject::Ptr &object)
|
|||||||
Type::Ptr type = object->GetReflectionType();
|
Type::Ptr type = object->GetReflectionType();
|
||||||
|
|
||||||
for (const RedisWriter::Ptr& rw : ConfigType::GetObjectsByType<RedisWriter>()) {
|
for (const RedisWriter::Ptr& rw : ConfigType::GetObjectsByType<RedisWriter>()) {
|
||||||
rw->m_WorkQueue.Enqueue(std::bind(&RedisWriter::SendStatusUpdate, rw, object));
|
rw->m_WorkQueue.Enqueue([rw, object]() { rw->SendStatusUpdate(object); });
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -889,14 +898,14 @@ void RedisWriter::VersionChangedHandler(const ConfigObject::Ptr& object)
|
|||||||
// Create or update the object config
|
// Create or update the object config
|
||||||
for (const RedisWriter::Ptr& rw : ConfigType::GetObjectsByType<RedisWriter>()) {
|
for (const RedisWriter::Ptr& rw : ConfigType::GetObjectsByType<RedisWriter>()) {
|
||||||
if (rw)
|
if (rw)
|
||||||
rw->m_WorkQueue.Enqueue(std::bind(&RedisWriter::SendConfigUpdate, rw, object, true));
|
rw->m_WorkQueue.Enqueue([rw, object]() { rw->SendConfigUpdate(object, true); });
|
||||||
}
|
}
|
||||||
} else if (!object->IsActive() &&
|
} else if (!object->IsActive() &&
|
||||||
object->GetExtension("ConfigObjectDeleted")) { // same as in apilistener-configsync.cpp
|
object->GetExtension("ConfigObjectDeleted")) { // same as in apilistener-configsync.cpp
|
||||||
// Delete object config
|
// Delete object config
|
||||||
for (const RedisWriter::Ptr& rw : ConfigType::GetObjectsByType<RedisWriter>()) {
|
for (const RedisWriter::Ptr& rw : ConfigType::GetObjectsByType<RedisWriter>()) {
|
||||||
if (rw)
|
if (rw)
|
||||||
rw->m_WorkQueue.Enqueue(std::bind(&RedisWriter::SendConfigDelete, rw, object));
|
rw->m_WorkQueue.Enqueue([rw, object]() { rw->SendConfigDelete(object); });
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -26,6 +26,7 @@
|
|||||||
#include "icinga/host.hpp"
|
#include "icinga/host.hpp"
|
||||||
|
|
||||||
#include <boost/algorithm/string.hpp>
|
#include <boost/algorithm/string.hpp>
|
||||||
|
#include <utility>
|
||||||
|
|
||||||
|
|
||||||
using namespace icinga;
|
using namespace icinga;
|
||||||
@ -64,27 +65,27 @@ void RedisWriter::Start(bool runtimeCreated)
|
|||||||
m_Rcon = new RedisConnection(GetHost(), GetPort(), GetPath(), GetPassword(), GetDbIndex());
|
m_Rcon = new RedisConnection(GetHost(), GetPort(), GetPath(), GetPassword(), GetDbIndex());
|
||||||
m_Rcon->Start();
|
m_Rcon->Start();
|
||||||
|
|
||||||
m_WorkQueue.SetExceptionCallback(std::bind(&RedisWriter::ExceptionHandler, this, _1));
|
m_WorkQueue.SetExceptionCallback([this](boost::exception_ptr exp) { ExceptionHandler(std::move(exp)); });
|
||||||
|
|
||||||
m_ReconnectTimer = new Timer();
|
m_ReconnectTimer = new Timer();
|
||||||
m_ReconnectTimer->SetInterval(15);
|
m_ReconnectTimer->SetInterval(15);
|
||||||
m_ReconnectTimer->OnTimerExpired.connect(std::bind(&RedisWriter::ReconnectTimerHandler, this));
|
m_ReconnectTimer->OnTimerExpired.connect([this](const Timer * const&) { ReconnectTimerHandler(); });
|
||||||
m_ReconnectTimer->Start();
|
m_ReconnectTimer->Start();
|
||||||
m_ReconnectTimer->Reschedule(0);
|
m_ReconnectTimer->Reschedule(0);
|
||||||
|
|
||||||
m_SubscriptionTimer = new Timer();
|
m_SubscriptionTimer = new Timer();
|
||||||
m_SubscriptionTimer->SetInterval(15);
|
m_SubscriptionTimer->SetInterval(15);
|
||||||
m_SubscriptionTimer->OnTimerExpired.connect(std::bind(&RedisWriter::UpdateSubscriptionsTimerHandler, this));
|
m_SubscriptionTimer->OnTimerExpired.connect([this](const Timer * const&) { UpdateSubscriptionsTimerHandler(); });
|
||||||
m_SubscriptionTimer->Start();
|
m_SubscriptionTimer->Start();
|
||||||
|
|
||||||
m_StatsTimer = new Timer();
|
m_StatsTimer = new Timer();
|
||||||
m_StatsTimer->SetInterval(1);
|
m_StatsTimer->SetInterval(1);
|
||||||
m_StatsTimer->OnTimerExpired.connect(std::bind(&RedisWriter::PublishStatsTimerHandler, this));
|
m_StatsTimer->OnTimerExpired.connect([this](const Timer * const&) { PublishStatsTimerHandler(); });
|
||||||
m_StatsTimer->Start();
|
m_StatsTimer->Start();
|
||||||
|
|
||||||
m_WorkQueue.SetName("RedisWriter");
|
m_WorkQueue.SetName("RedisWriter");
|
||||||
|
|
||||||
boost::thread thread(std::bind(&RedisWriter::HandleEvents, this));
|
boost::thread thread(&RedisWriter::HandleEvents, this);
|
||||||
thread.detach();
|
thread.detach();
|
||||||
|
|
||||||
}
|
}
|
||||||
@ -99,7 +100,7 @@ void RedisWriter::ExceptionHandler(boost::exception_ptr exp)
|
|||||||
|
|
||||||
void RedisWriter::ReconnectTimerHandler()
|
void RedisWriter::ReconnectTimerHandler()
|
||||||
{
|
{
|
||||||
m_WorkQueue.Enqueue(std::bind(&RedisWriter::TryToReconnect, this));
|
m_WorkQueue.Enqueue([this]() { TryToReconnect(); });
|
||||||
}
|
}
|
||||||
|
|
||||||
void RedisWriter::TryToReconnect()
|
void RedisWriter::TryToReconnect()
|
||||||
@ -132,7 +133,7 @@ void RedisWriter::TryToReconnect()
|
|||||||
|
|
||||||
void RedisWriter::UpdateSubscriptionsTimerHandler()
|
void RedisWriter::UpdateSubscriptionsTimerHandler()
|
||||||
{
|
{
|
||||||
m_WorkQueue.Enqueue(std::bind(&RedisWriter::UpdateSubscriptions, this));
|
m_WorkQueue.Enqueue([this]() { UpdateSubscriptions(); });
|
||||||
}
|
}
|
||||||
|
|
||||||
void RedisWriter::UpdateSubscriptions()
|
void RedisWriter::UpdateSubscriptions()
|
||||||
@ -214,7 +215,7 @@ bool RedisWriter::GetSubscriptionTypes(String key, RedisSubscriptionInfo& rsi)
|
|||||||
|
|
||||||
void RedisWriter::PublishStatsTimerHandler(void)
|
void RedisWriter::PublishStatsTimerHandler(void)
|
||||||
{
|
{
|
||||||
m_WorkQueue.Enqueue(std::bind(&RedisWriter::PublishStats, this));
|
m_WorkQueue.Enqueue([this]() { PublishStats(); });
|
||||||
}
|
}
|
||||||
|
|
||||||
void RedisWriter::PublishStats()
|
void RedisWriter::PublishStats()
|
||||||
@ -260,7 +261,7 @@ void RedisWriter::HandleEvents()
|
|||||||
if (!event)
|
if (!event)
|
||||||
continue;
|
continue;
|
||||||
|
|
||||||
m_WorkQueue.Enqueue(std::bind(&RedisWriter::SendEvent, this, event));
|
m_WorkQueue.Enqueue([this, event]() { SendEvent(event); });
|
||||||
}
|
}
|
||||||
|
|
||||||
queue->RemoveClient(this);
|
queue->RemoveClient(this);
|
||||||
@ -318,7 +319,7 @@ void RedisWriter::SendEvent(const Dictionary::Ptr& event)
|
|||||||
checkable = Host::GetByName(event->Get("host"));
|
checkable = Host::GetByName(event->Get("host"));
|
||||||
}
|
}
|
||||||
// Update State for icingaweb
|
// Update State for icingaweb
|
||||||
m_WorkQueue.Enqueue(std::bind(&RedisWriter::UpdateState, this, checkable));
|
m_WorkQueue.Enqueue([this, checkable]() { UpdateState(checkable); });
|
||||||
}
|
}
|
||||||
|
|
||||||
if (type.Contains("Acknowledgement")) {
|
if (type.Contains("Acknowledgement")) {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user