diff --git a/doc/17-language-reference.md b/doc/17-language-reference.md index 039e3ad31..775684a1a 100644 --- a/doc/17-language-reference.md +++ b/doc/17-language-reference.md @@ -440,6 +440,7 @@ Constant | Description --------------------|------------------- Vars |**Read-write.** Contains a dictionary with global custom attributes. Not set by default. NodeName |**Read-write.** Contains the cluster node name. Set to the local hostname by default. +ReloadTimeout |**Read-write.** Defines the reload timeout for child processes. Defaults to `300s`. Environment |**Read-write.** The name of the Icinga environment. Included in the SNI host name for outbound connections. Not set by default. RunAsUser |**Read-write.** Defines the user the Icinga 2 daemon is running as. Set in the Icinga 2 sysconfig. RunAsGroup |**Read-write.** Defines the group the Icinga 2 daemon is running as. Set in the Icinga 2 sysconfig. diff --git a/lib/base/application.cpp b/lib/base/application.cpp index bdef02f83..11f256bce 100644 --- a/lib/base/application.cpp +++ b/lib/base/application.cpp @@ -60,6 +60,14 @@ void Application::OnConfigLoaded() ASSERT(m_Instance == nullptr); m_Instance = this; + + String reloadTimeout; + + if (ScriptGlobal::Exists("ReloadTimeout")) + reloadTimeout = ScriptGlobal::Get("ReloadTimeout"); + + if (!reloadTimeout.IsEmpty()) + Configuration::ReloadTimeout = Convert::ToDouble(reloadTimeout); } /** @@ -384,8 +392,6 @@ static void ReloadProcessCallback(const ProcessResult& pr) pid_t Application::StartReloadProcess() { - Log(LogInformation, "Application", "Got reload command: Starting new instance."); - // prepare arguments ArrayData args; args.push_back(GetExePath(m_ArgV[0])); @@ -405,9 +411,14 @@ pid_t Application::StartReloadProcess() #endif /* _WIN32 */ Process::Ptr process = new Process(Process::PrepareCommand(new Array(std::move(args)))); - process->SetTimeout(300); + process->SetTimeout(Configuration::ReloadTimeout); process->Run(&ReloadProcessCallback); + 
Log(LogInformation, "Application") + << "Got reload command: Started new instance with PID '" + << (unsigned long)(process->GetPID()) << "' (timeout is " + << Configuration::ReloadTimeout << "s)."; + return process->GetPID(); } diff --git a/lib/base/configobject.cpp b/lib/base/configobject.cpp index 1969a9e6a..f60c52325 100644 --- a/lib/base/configobject.cpp +++ b/lib/base/configobject.cpp @@ -16,7 +16,6 @@ #include "base/workqueue.hpp" #include "base/context.hpp" #include "base/application.hpp" -#include #include #include #include @@ -601,10 +600,12 @@ void ConfigObject::RestoreObjects(const String& filename, int attributeTypes) void ConfigObject::StopObjects() { - auto types = Type::GetAllTypes(); + std::vector types = Type::GetAllTypes(); std::sort(types.begin(), types.end(), [](const Type::Ptr& a, const Type::Ptr& b) { - return a->GetActivationPriority() > b->GetActivationPriority(); + if (a->GetActivationPriority() > b->GetActivationPriority()) + return true; + return false; }); for (const Type::Ptr& type : types) { diff --git a/lib/base/configuration.cpp b/lib/base/configuration.cpp index d163937e2..4ef31fb1e 100644 --- a/lib/base/configuration.cpp +++ b/lib/base/configuration.cpp @@ -25,6 +25,7 @@ String Configuration::PidPath; String Configuration::PkgDataDir; String Configuration::PrefixDir; String Configuration::ProgramData; +double Configuration::ReloadTimeout{300}; int Configuration::RLimitFiles; int Configuration::RLimitProcesses; int Configuration::RLimitStack; @@ -223,6 +224,16 @@ void Configuration::SetProgramData(const String& val, bool suppress_events, cons HandleUserWrite("ProgramData", &Configuration::ProgramData, val, m_ReadOnly); } +double Configuration::GetReloadTimeout() const +{ + return Configuration::ReloadTimeout; +} + +void Configuration::SetReloadTimeout(double val, bool suppress_events, const Value& cookie) +{ + HandleUserWrite("ReloadTimeout", &Configuration::ReloadTimeout, val, m_ReadOnly); +} + int Configuration::GetRLimitFiles() 
const { return Configuration::RLimitFiles; diff --git a/lib/base/configuration.hpp b/lib/base/configuration.hpp index 560906596..69781454d 100644 --- a/lib/base/configuration.hpp +++ b/lib/base/configuration.hpp @@ -70,6 +70,9 @@ public: String GetProgramData() const override; void SetProgramData(const String& value, bool suppress_events = false, const Value& cookie = Empty) override; + double GetReloadTimeout() const override; + void SetReloadTimeout(double value, bool suppress_events = false, const Value& cookie = Empty) override; + int GetRLimitFiles() const override; void SetRLimitFiles(int value, bool suppress_events = false, const Value& cookie = Empty) override; @@ -130,6 +133,7 @@ public: static String PkgDataDir; static String PrefixDir; static String ProgramData; + static double ReloadTimeout; static int RLimitFiles; static int RLimitProcesses; static int RLimitStack; diff --git a/lib/base/configuration.ti b/lib/base/configuration.ti index 72fa92dcf..8d4c9b3e0 100644 --- a/lib/base/configuration.ti +++ b/lib/base/configuration.ti @@ -94,6 +94,11 @@ abstract class Configuration set; }; + [config, no_storage, virtual] double ReloadTimeout { + get; + set; + }; + [config, no_storage, virtual] int RLimitFiles { get; set; diff --git a/lib/checker/checkercomponent.cpp b/lib/checker/checkercomponent.cpp index 5321b4d39..0ff4f0e02 100644 --- a/lib/checker/checkercomponent.cpp +++ b/lib/checker/checkercomponent.cpp @@ -6,7 +6,6 @@ #include "icinga/cib.hpp" #include "remote/apilistener.hpp" #include "base/configtype.hpp" -#include "base/defer.hpp" #include "base/objectlock.hpp" #include "base/utility.hpp" #include "base/perfdatavalue.hpp" @@ -57,7 +56,6 @@ void CheckerComponent::Start(bool runtimeCreated) Log(LogInformation, "CheckerComponent") << "'" << GetName() << "' started."; - m_RunningChecks.store(0); m_Thread = std::thread(std::bind(&CheckerComponent::CheckThreadProc, this)); @@ -75,13 +73,32 @@ void CheckerComponent::Stop(bool runtimeRemoved) 
m_CV.notify_all(); } + double wait = 0.0; + + while (GetPendingCheckables() > 0) { + Log(LogDebug, "CheckerComponent") + << "Waiting for running checks (" << GetPendingCheckables() + << ") to finish. Waited for " << wait << " seconds now."; + + Utility::Sleep(0.1); + wait += 0.1; + + /* Pick a timeout slightly shorter than the process reload timeout. */ + double waitMax = Configuration::ReloadTimeout - 30; + if (waitMax <= 0) + waitMax = 1; + + if (wait > waitMax) { + Log(LogWarning, "CheckerComponent") + << "Checks running too long for " << wait + << " seconds, hard shutdown before reload timeout: " << Configuration::ReloadTimeout << "."; + break; + } + } + m_ResultTimer->Stop(); m_Thread.join(); - while (m_RunningChecks.load()) { - Utility::Sleep(1.0 / 60.0); - } - Log(LogInformation, "CheckerComponent") << "'" << GetName() << "' stopped."; @@ -196,8 +213,6 @@ void CheckerComponent::CheckThreadProc() Checkable::IncreasePendingChecks(); - m_RunningChecks.fetch_add(1); - Utility::QueueAsyncCallback(std::bind(&CheckerComponent::ExecuteCheckHelper, CheckerComponent::Ptr(this), checkable)); lock.lock(); @@ -206,10 +221,6 @@ void CheckerComponent::CheckThreadProc() void CheckerComponent::ExecuteCheckHelper(const Checkable::Ptr& checkable) { - Defer decrementRunningChecks ([this]{ - m_RunningChecks.fetch_sub(1); - }); - try { checkable->ExecuteCheck(); } catch (const std::exception& ex) { diff --git a/lib/checker/checkercomponent.hpp b/lib/checker/checkercomponent.hpp index bea6ac066..b3589e5fb 100644 --- a/lib/checker/checkercomponent.hpp +++ b/lib/checker/checkercomponent.hpp @@ -8,13 +8,11 @@ #include "base/configobject.hpp" #include "base/timer.hpp" #include "base/utility.hpp" -#include #include #include #include #include #include -#include #include namespace icinga @@ -75,7 +73,6 @@ private: boost::condition_variable m_CV; bool m_Stopped{false}; std::thread m_Thread; - std::atomic m_RunningChecks; CheckableSet m_IdleCheckables; CheckableSet m_PendingCheckables; 
diff --git a/lib/checker/checkercomponent.ti b/lib/checker/checkercomponent.ti index 0bde3dc80..433940313 100644 --- a/lib/checker/checkercomponent.ti +++ b/lib/checker/checkercomponent.ti @@ -9,7 +9,7 @@ namespace icinga class CheckerComponent : ConfigObject { - activation_priority 100; + activation_priority 300; [config, no_storage] int concurrent_checks { get {{{ diff --git a/lib/db_ido/servicedbobject.cpp b/lib/db_ido/servicedbobject.cpp index bbf3e21aa..b31b1fc4c 100644 --- a/lib/db_ido/servicedbobject.cpp +++ b/lib/db_ido/servicedbobject.cpp @@ -183,9 +183,6 @@ void ServiceDbObject::OnConfigUpdateHeavy() DbObject::OnMultipleQueries(queries); /* service dependencies */ - Log(LogDebug, "ServiceDbObject") - << "service dependencies for '" << service->GetName() << "'"; - queries.clear(); DbQuery query2; @@ -233,9 +230,6 @@ void ServiceDbObject::OnConfigUpdateHeavy() DbObject::OnMultipleQueries(queries); /* service contacts, contactgroups */ - Log(LogDebug, "ServiceDbObject") - << "service contacts: " << service->GetName(); - queries.clear(); DbQuery query3; @@ -248,9 +242,6 @@ void ServiceDbObject::OnConfigUpdateHeavy() queries.emplace_back(std::move(query3)); for (const User::Ptr& user : CompatUtility::GetCheckableNotificationUsers(service)) { - Log(LogDebug, "ServiceDbObject") - << "service contacts: " << user->GetName(); - DbQuery query_contact; query_contact.Table = GetType()->GetTable() + "_contacts"; query_contact.Type = DbQueryInsert; @@ -266,9 +257,6 @@ void ServiceDbObject::OnConfigUpdateHeavy() DbObject::OnMultipleQueries(queries); - Log(LogDebug, "ServiceDbObject") - << "service contactgroups: " << service->GetName(); - queries.clear(); DbQuery query4; @@ -281,9 +269,6 @@ void ServiceDbObject::OnConfigUpdateHeavy() queries.emplace_back(std::move(query4)); for (const UserGroup::Ptr& usergroup : CompatUtility::GetCheckableNotificationUserGroups(service)) { - Log(LogDebug, "ServiceDbObject") - << "service contactgroups: " << usergroup->GetName(); - 
DbQuery query_contact; query_contact.Table = GetType()->GetTable() + "_contactgroups"; query_contact.Type = DbQueryInsert; diff --git a/lib/db_ido_mysql/idomysqlconnection.cpp b/lib/db_ido_mysql/idomysqlconnection.cpp index 10c47666e..dcef1618b 100644 --- a/lib/db_ido_mysql/idomysqlconnection.cpp +++ b/lib/db_ido_mysql/idomysqlconnection.cpp @@ -90,9 +90,6 @@ void IdoMysqlConnection::Resume() void IdoMysqlConnection::Pause() { - Log(LogInformation, "IdoMysqlConnection") - << "'" << GetName() << "' paused."; - m_ReconnectTimer.reset(); DbConnection::Pause(); @@ -102,8 +99,12 @@ void IdoMysqlConnection::Pause() << "Rescheduling disconnect task."; #endif /* I2_DEBUG */ - m_QueryQueue.Enqueue(std::bind(&IdoMysqlConnection::Disconnect, this), PriorityHigh); + m_QueryQueue.Enqueue(std::bind(&IdoMysqlConnection::Disconnect, this), PriorityLow); m_QueryQueue.Join(); + + Log(LogInformation, "IdoMysqlConnection") + << "'" << GetName() << "' paused."; + } void IdoMysqlConnection::ExceptionHandler(boost::exception_ptr exp) @@ -175,7 +176,7 @@ void IdoMysqlConnection::ReconnectTimerHandler() << "Scheduling reconnect task."; #endif /* I2_DEBUG */ - m_QueryQueue.Enqueue(std::bind(&IdoMysqlConnection::Reconnect, this), PriorityLow); + m_QueryQueue.Enqueue(std::bind(&IdoMysqlConnection::Reconnect, this), PriorityHigh); } void IdoMysqlConnection::Reconnect() @@ -434,9 +435,9 @@ void IdoMysqlConnection::Reconnect() << "Scheduling session table clear and finish connect task."; #endif /* I2_DEBUG */ - m_QueryQueue.Enqueue(std::bind(&IdoMysqlConnection::ClearTablesBySession, this), PriorityLow); + m_QueryQueue.Enqueue(std::bind(&IdoMysqlConnection::ClearTablesBySession, this), PriorityHigh); - m_QueryQueue.Enqueue(std::bind(&IdoMysqlConnection::FinishConnect, this, startTime), PriorityLow); + m_QueryQueue.Enqueue(std::bind(&IdoMysqlConnection::FinishConnect, this, startTime), PriorityHigh); } void IdoMysqlConnection::FinishConnect(double startTime) @@ -709,7 +710,7 @@ void 
IdoMysqlConnection::ActivateObject(const DbObject::Ptr& dbobj) << "Scheduling object activation task for '" << dbobj->GetName1() << "!" << dbobj->GetName2() << "'."; #endif /* I2_DEBUG */ - m_QueryQueue.Enqueue(std::bind(&IdoMysqlConnection::InternalActivateObject, this, dbobj), PriorityLow); + m_QueryQueue.Enqueue(std::bind(&IdoMysqlConnection::InternalActivateObject, this, dbobj), PriorityHigh); } void IdoMysqlConnection::InternalActivateObject(const DbObject::Ptr& dbobj) @@ -754,7 +755,7 @@ void IdoMysqlConnection::DeactivateObject(const DbObject::Ptr& dbobj) << "Scheduling object deactivation task for '" << dbobj->GetName1() << "!" << dbobj->GetName2() << "'."; #endif /* I2_DEBUG */ - m_QueryQueue.Enqueue(std::bind(&IdoMysqlConnection::InternalDeactivateObject, this, dbobj), PriorityLow); + m_QueryQueue.Enqueue(std::bind(&IdoMysqlConnection::InternalDeactivateObject, this, dbobj), PriorityHigh); } void IdoMysqlConnection::InternalDeactivateObject(const DbObject::Ptr& dbobj) diff --git a/lib/db_ido_pgsql/idopgsqlconnection.cpp b/lib/db_ido_pgsql/idopgsqlconnection.cpp index 81e646ebd..25ce08798 100644 --- a/lib/db_ido_pgsql/idopgsqlconnection.cpp +++ b/lib/db_ido_pgsql/idopgsqlconnection.cpp @@ -97,15 +97,16 @@ void IdoPgsqlConnection::Resume() void IdoPgsqlConnection::Pause() { - Log(LogInformation, "IdoPgsqlConnection") - << "'" << GetName() << "' paused."; - m_ReconnectTimer.reset(); DbConnection::Pause(); - m_QueryQueue.Enqueue(std::bind(&IdoPgsqlConnection::Disconnect, this), PriorityHigh); + m_QueryQueue.Enqueue(std::bind(&IdoPgsqlConnection::Disconnect, this), PriorityLow); m_QueryQueue.Join(); + + Log(LogInformation, "IdoPgsqlConnection") + << "'" << GetName() << "' paused."; + } void IdoPgsqlConnection::ExceptionHandler(boost::exception_ptr exp) @@ -165,7 +166,7 @@ void IdoPgsqlConnection::InternalNewTransaction() void IdoPgsqlConnection::ReconnectTimerHandler() { - m_QueryQueue.Enqueue(std::bind(&IdoPgsqlConnection::Reconnect, this), PriorityLow); + 
m_QueryQueue.Enqueue(std::bind(&IdoPgsqlConnection::Reconnect, this), PriorityHigh); } void IdoPgsqlConnection::Reconnect() @@ -408,9 +409,9 @@ void IdoPgsqlConnection::Reconnect() UpdateAllObjects(); - m_QueryQueue.Enqueue(std::bind(&IdoPgsqlConnection::ClearTablesBySession, this), PriorityLow); + m_QueryQueue.Enqueue(std::bind(&IdoPgsqlConnection::ClearTablesBySession, this), PriorityHigh); - m_QueryQueue.Enqueue(std::bind(&IdoPgsqlConnection::FinishConnect, this, startTime), PriorityLow); + m_QueryQueue.Enqueue(std::bind(&IdoPgsqlConnection::FinishConnect, this, startTime), PriorityHigh); } void IdoPgsqlConnection::FinishConnect(double startTime) @@ -558,7 +559,7 @@ void IdoPgsqlConnection::ActivateObject(const DbObject::Ptr& dbobj) if (IsPaused()) return; - m_QueryQueue.Enqueue(std::bind(&IdoPgsqlConnection::InternalActivateObject, this, dbobj), PriorityLow); + m_QueryQueue.Enqueue(std::bind(&IdoPgsqlConnection::InternalActivateObject, this, dbobj), PriorityHigh); } void IdoPgsqlConnection::InternalActivateObject(const DbObject::Ptr& dbobj) @@ -595,7 +596,7 @@ void IdoPgsqlConnection::DeactivateObject(const DbObject::Ptr& dbobj) if (IsPaused()) return; - m_QueryQueue.Enqueue(std::bind(&IdoPgsqlConnection::InternalDeactivateObject, this, dbobj), PriorityLow); + m_QueryQueue.Enqueue(std::bind(&IdoPgsqlConnection::InternalDeactivateObject, this, dbobj), PriorityHigh); } void IdoPgsqlConnection::InternalDeactivateObject(const DbObject::Ptr& dbobj) diff --git a/lib/icinga/downtime.ti b/lib/icinga/downtime.ti index 1e8ec0a75..a55942fb0 100644 --- a/lib/icinga/downtime.ti +++ b/lib/icinga/downtime.ti @@ -20,6 +20,8 @@ public: class Downtime : ConfigObject < DowntimeNameComposer { + activation_priority -10; + load_after Host; load_after Service; diff --git a/lib/icinga/icingaapplication.ti b/lib/icinga/icingaapplication.ti index 867efd3bd..1cdef7406 100644 --- a/lib/icinga/icingaapplication.ti +++ b/lib/icinga/icingaapplication.ti @@ -9,6 +9,8 @@ namespace icinga class 
IcingaApplication : Application { + activation_priority -50; + [config, no_storage, virtual] String environment { get; set; diff --git a/lib/notification/notificationcomponent.ti b/lib/notification/notificationcomponent.ti index 7c3ff13ff..13af13691 100644 --- a/lib/notification/notificationcomponent.ti +++ b/lib/notification/notificationcomponent.ti @@ -9,7 +9,7 @@ namespace icinga class NotificationComponent : ConfigObject { - activation_priority 100; + activation_priority 200; [config] bool enable_ha (EnableHA) { default {{{ return true; }}} diff --git a/lib/perfdata/elasticsearchwriter.cpp b/lib/perfdata/elasticsearchwriter.cpp index c794d38a3..53ed48f97 100644 --- a/lib/perfdata/elasticsearchwriter.cpp +++ b/lib/perfdata/elasticsearchwriter.cpp @@ -87,13 +87,16 @@ void ElasticsearchWriter::Resume() Checkable::OnNotificationSentToAllUsers.connect(std::bind(&ElasticsearchWriter::NotificationSentToAllUsersHandler, this, _1, _2, _3, _4, _5, _6, _7)); } +/* Pause is equivalent to Stop, but with HA capabilities to resume at runtime. 
*/ void ElasticsearchWriter::Pause() { + Flush(); + m_WorkQueue.Join(); + Flush(); + Log(LogInformation, "ElasticsearchWriter") << "'" << GetName() << "' paused."; - m_WorkQueue.Join(); - ObjectImpl::Pause(); } diff --git a/lib/perfdata/elasticsearchwriter.ti b/lib/perfdata/elasticsearchwriter.ti index c3e7fc046..7cf604fb9 100644 --- a/lib/perfdata/elasticsearchwriter.ti +++ b/lib/perfdata/elasticsearchwriter.ti @@ -7,7 +7,7 @@ namespace icinga class ElasticsearchWriter : ConfigObject { - activation_priority 80; + activation_priority 100; [config, required] String host { default {{{ return "127.0.0.1"; }}} diff --git a/lib/perfdata/gelfwriter.cpp b/lib/perfdata/gelfwriter.cpp index 6299c279c..d028c5dbd 100644 --- a/lib/perfdata/gelfwriter.cpp +++ b/lib/perfdata/gelfwriter.cpp @@ -90,12 +90,26 @@ void GelfWriter::Resume() Checkable::OnStateChange.connect(std::bind(&GelfWriter::StateChangeHandler, this, _1, _2, _3)); } +/* Pause is equivalent to Stop, but with HA capabilities to resume at runtime. */ void GelfWriter::Pause() { - Log(LogInformation, "GelfWriter") - << "'" << GetName() << "' paused."; + m_ReconnectTimer.reset(); + + try { + ReconnectInternal(); + } catch (const std::exception&) { + Log(LogInformation, "GelfWriter") + << "'" << GetName() << "' paused. Unable to connect, not flushing buffers. 
Data may be lost on reload."; + + ObjectImpl::Pause(); + return; + } m_WorkQueue.Join(); + DisconnectInternal(); + + Log(LogInformation, "GelfWriter") + << "'" << GetName() << "' paused."; ObjectImpl::Pause(); } @@ -128,6 +142,11 @@ void GelfWriter::Reconnect() return; } + ReconnectInternal(); +} + +void GelfWriter::ReconnectInternal() +{ double startTime = Utility::GetTime(); CONTEXT("Reconnecting to Graylog Gelf '" + GetName() + "'"); @@ -167,6 +186,11 @@ void GelfWriter::Disconnect() { AssertOnWorkQueue(); + DisconnectInternal(); +} + +void GelfWriter::DisconnectInternal() +{ if (!GetConnected()) return; diff --git a/lib/perfdata/gelfwriter.hpp b/lib/perfdata/gelfwriter.hpp index ce2c72a58..167f523d5 100644 --- a/lib/perfdata/gelfwriter.hpp +++ b/lib/perfdata/gelfwriter.hpp @@ -55,7 +55,9 @@ private: void ReconnectTimerHandler(); void Disconnect(); + void DisconnectInternal(); void Reconnect(); + void ReconnectInternal(); void AssertOnWorkQueue(); diff --git a/lib/perfdata/gelfwriter.ti b/lib/perfdata/gelfwriter.ti index 08994964c..a24c28e1c 100644 --- a/lib/perfdata/gelfwriter.ti +++ b/lib/perfdata/gelfwriter.ti @@ -9,7 +9,7 @@ namespace icinga class GelfWriter : ConfigObject { - activation_priority 80; + activation_priority 100; [config] String host { default {{{ return "127.0.0.1"; }}} diff --git a/lib/perfdata/graphitewriter.cpp b/lib/perfdata/graphitewriter.cpp index fb3c49389..b0b014260 100644 --- a/lib/perfdata/graphitewriter.cpp +++ b/lib/perfdata/graphitewriter.cpp @@ -85,12 +85,26 @@ void GraphiteWriter::Resume() Checkable::OnNewCheckResult.connect(std::bind(&GraphiteWriter::CheckResultHandler, this, _1, _2)); } +/* Pause is equivalent to Stop, but with HA capabilities to resume at runtime. 
*/ void GraphiteWriter::Pause() { - Log(LogInformation, "GraphiteWriter") - << "'" << GetName() << "' paused."; + m_ReconnectTimer.reset(); + + try { + ReconnectInternal(); + } catch (const std::exception&) { + Log(LogInformation, "GraphiteWriter") + << "'" << GetName() << "' paused. Unable to connect, not flushing buffers. Data may be lost on reload."; + + ObjectImpl::Pause(); + return; + } m_WorkQueue.Join(); + DisconnectInternal(); + + Log(LogInformation, "GraphiteWriter") + << "'" << GetName() << "' paused."; ObjectImpl::Pause(); } @@ -123,6 +137,11 @@ void GraphiteWriter::Reconnect() return; } + ReconnectInternal(); +} + +void GraphiteWriter::ReconnectInternal() +{ double startTime = Utility::GetTime(); CONTEXT("Reconnecting to Graphite '" + GetName() + "'"); @@ -155,6 +174,9 @@ void GraphiteWriter::Reconnect() void GraphiteWriter::ReconnectTimerHandler() { + if (IsPaused()) + return; + m_WorkQueue.Enqueue(std::bind(&GraphiteWriter::Reconnect, this), PriorityNormal); } @@ -162,6 +184,11 @@ void GraphiteWriter::Disconnect() { AssertOnWorkQueue(); + DisconnectInternal(); +} + +void GraphiteWriter::DisconnectInternal() +{ if (!GetConnected()) return; @@ -184,6 +211,10 @@ void GraphiteWriter::CheckResultHandlerInternal(const Checkable::Ptr& checkable, CONTEXT("Processing check result for '" + checkable->GetName() + "'"); + /* TODO: Deal with missing connection here. Needs refactoring + * into parsing the actual performance data and then putting it + * into a queue for re-inserting. */ + if (!IcingaApplication::GetInstance()->GetEnablePerfdata() || !checkable->GetEnablePerfdata()) return; @@ -276,7 +307,7 @@ void GraphiteWriter::SendMetric(const String& prefix, const String& name, double msgbuf << prefix << "." 
<< name << " " << Convert::ToString(value) << " " << static_cast(ts); Log(LogDebug, "GraphiteWriter") - << "Add to metric list:'" << msgbuf.str() << "'."; + << "Add to metric list: '" << msgbuf.str() << "'."; // do not send \n to debug log msgbuf << "\n"; diff --git a/lib/perfdata/graphitewriter.hpp b/lib/perfdata/graphitewriter.hpp index 808f4fb1b..8db8e49c7 100644 --- a/lib/perfdata/graphitewriter.hpp +++ b/lib/perfdata/graphitewriter.hpp @@ -54,7 +54,9 @@ private: void ReconnectTimerHandler(); void Disconnect(); + void DisconnectInternal(); void Reconnect(); + void ReconnectInternal(); void AssertOnWorkQueue(); diff --git a/lib/perfdata/graphitewriter.ti b/lib/perfdata/graphitewriter.ti index 6f79e2559..94f1b185c 100644 --- a/lib/perfdata/graphitewriter.ti +++ b/lib/perfdata/graphitewriter.ti @@ -9,7 +9,7 @@ namespace icinga class GraphiteWriter : ConfigObject { - activation_priority 80; + activation_priority 100; [config] String host { default {{{ return "127.0.0.1"; }}} diff --git a/lib/perfdata/influxdbwriter.cpp b/lib/perfdata/influxdbwriter.cpp index c9abbdd45..773f21a2a 100644 --- a/lib/perfdata/influxdbwriter.cpp +++ b/lib/perfdata/influxdbwriter.cpp @@ -113,24 +113,33 @@ void InfluxdbWriter::Resume() Checkable::OnNewCheckResult.connect(std::bind(&InfluxdbWriter::CheckResultHandler, this, _1, _2)); } +/* Pause is equivalent to Stop, but with HA capabilities to resume at runtime. */ void InfluxdbWriter::Pause() { + /* Force a flush. */ + Log(LogDebug, "InfluxdbWriter") + << "Flushing pending data buffers."; + + Flush(); + + /* Work on the missing tasks. TODO: Find a way to cache them on disk. */ + Log(LogDebug, "InfluxdbWriter") + << "Joining existing WQ tasks."; + + m_WorkQueue.Join(); + + /* Flush again after the WQ tasks have filled the data buffer. 
*/ + Log(LogDebug, "InfluxdbWriter") + << "Flushing data buffers from WQ tasks."; + + Flush(); + Log(LogInformation, "InfluxdbWriter") << "'" << GetName() << "' paused."; - m_WorkQueue.Join(); - ObjectImpl::Pause(); } -void InfluxdbWriter::Stop(bool runtimeDeleted) -{ - FlushTimeout(); - m_WorkQueue.Join(); - - ObjectImpl::Stop(runtimeDeleted); -} - void InfluxdbWriter::AssertOnWorkQueue() { ASSERT(m_WorkQueue.IsWorkerThread()); @@ -419,6 +428,9 @@ void InfluxdbWriter::FlushTimeoutWQ() void InfluxdbWriter::Flush() { + Log(LogDebug, "InfluxdbWriter") + << "Flushing data buffer to InfluxDB."; + String body = boost::algorithm::join(m_DataBuffer, "\n"); m_DataBuffer.clear(); diff --git a/lib/perfdata/influxdbwriter.hpp b/lib/perfdata/influxdbwriter.hpp index f2a8e1f7f..fa2c1f02b 100644 --- a/lib/perfdata/influxdbwriter.hpp +++ b/lib/perfdata/influxdbwriter.hpp @@ -34,7 +34,6 @@ protected: void OnConfigLoaded() override; void Resume() override; void Pause() override; - void Stop(bool runtimeDeleted) override; private: WorkQueue m_WorkQueue{10000000, 1}; diff --git a/lib/perfdata/influxdbwriter.ti b/lib/perfdata/influxdbwriter.ti index 6057bd28a..30bc071c6 100644 --- a/lib/perfdata/influxdbwriter.ti +++ b/lib/perfdata/influxdbwriter.ti @@ -9,7 +9,7 @@ namespace icinga class InfluxdbWriter : ConfigObject { - activation_priority 80; + activation_priority 100; [config, required] String host { default {{{ return "127.0.0.1"; }}} diff --git a/lib/perfdata/opentsdbwriter.cpp b/lib/perfdata/opentsdbwriter.cpp index ab66d213e..df4583685 100644 --- a/lib/perfdata/opentsdbwriter.cpp +++ b/lib/perfdata/opentsdbwriter.cpp @@ -68,8 +68,11 @@ void OpenTsdbWriter::Resume() Service::OnNewCheckResult.connect(std::bind(&OpenTsdbWriter::CheckResultHandler, this, _1, _2)); } +/* Pause is equivalent to Stop, but with HA capabilities to resume at runtime. 
*/ void OpenTsdbWriter::Pause() { + m_ReconnectTimer.reset(); + Log(LogInformation, "OpentsdbWriter") << "'" << GetName() << "' paused."; diff --git a/lib/perfdata/opentsdbwriter.ti b/lib/perfdata/opentsdbwriter.ti index 82bf4e193..ac6d132d6 100644 --- a/lib/perfdata/opentsdbwriter.ti +++ b/lib/perfdata/opentsdbwriter.ti @@ -9,7 +9,7 @@ namespace icinga class OpenTsdbWriter : ConfigObject { - activation_priority 80; + activation_priority 100; [config] String host { default {{{ return "127.0.0.1"; }}} diff --git a/lib/perfdata/perfdatawriter.cpp b/lib/perfdata/perfdatawriter.cpp index 1f1ce7536..78d44cff1 100644 --- a/lib/perfdata/perfdatawriter.cpp +++ b/lib/perfdata/perfdatawriter.cpp @@ -66,6 +66,16 @@ void PerfdataWriter::Resume() void PerfdataWriter::Pause() { + m_RotationTimer.reset(); + +#ifdef I2_DEBUG + //m_HostOutputFile << "\n# Pause the feature" << "\n\n"; + //m_ServiceOutputFile << "\n# Pause the feature" << "\n\n"; +#endif /* I2_DEBUG */ + + /* Force a rotation closing the file stream. 
*/ + RotateAllFiles(); + Log(LogInformation, "PerfdataWriter") << "'" << GetName() << "' paused."; @@ -108,7 +118,8 @@ void PerfdataWriter::CheckResultHandler(const Checkable::Ptr& checkable, const C String line = MacroProcessor::ResolveMacros(GetServiceFormatTemplate(), resolvers, cr, nullptr, &PerfdataWriter::EscapeMacroMetric); { - ObjectLock olock(this); + boost::mutex::scoped_lock lock(m_StreamMutex); + if (!m_ServiceOutputFile.good()) return; @@ -118,7 +129,8 @@ void PerfdataWriter::CheckResultHandler(const Checkable::Ptr& checkable, const C String line = MacroProcessor::ResolveMacros(GetHostFormatTemplate(), resolvers, cr, nullptr, &PerfdataWriter::EscapeMacroMetric); { - ObjectLock olock(this); + boost::mutex::scoped_lock lock(m_StreamMutex); + if (!m_HostOutputFile.good()) return; @@ -129,13 +141,20 @@ void PerfdataWriter::CheckResultHandler(const Checkable::Ptr& checkable, const C void PerfdataWriter::RotateFile(std::ofstream& output, const String& temp_path, const String& perfdata_path) { - ObjectLock olock(this); + Log(LogDebug, "PerfdataWriter") + << "Rotating perfdata files."; + + boost::mutex::scoped_lock lock(m_StreamMutex); if (output.good()) { output.close(); if (Utility::PathExists(temp_path)) { String finalFile = perfdata_path + "." + Convert::ToString((long)Utility::GetTime()); + + Log(LogDebug, "PerfdataWriter") + << "Closed output file and renaming into '" << finalFile << "'."; + if (rename(temp_path.CStr(), finalFile.CStr()) < 0) { BOOST_THROW_EXCEPTION(posix_error() << boost::errinfo_api_function("rename") @@ -147,9 +166,10 @@ void PerfdataWriter::RotateFile(std::ofstream& output, const String& temp_path, output.open(temp_path.CStr()); - if (!output.good()) + if (!output.good()) { Log(LogWarning, "PerfdataWriter") << "Could not open perfdata file '" << temp_path << "' for writing. 
Perfdata will be lost."; + } } void PerfdataWriter::RotationTimerHandler() @@ -157,6 +177,11 @@ void PerfdataWriter::RotationTimerHandler() if (IsPaused()) return; + RotateAllFiles(); +} + +void PerfdataWriter::RotateAllFiles() +{ RotateFile(m_ServiceOutputFile, GetServiceTempPath(), GetServicePerfdataPath()); RotateFile(m_HostOutputFile, GetHostTempPath(), GetHostPerfdataPath()); } diff --git a/lib/perfdata/perfdatawriter.hpp b/lib/perfdata/perfdatawriter.hpp index 4f5f1b1ec..5b6e51ea4 100644 --- a/lib/perfdata/perfdatawriter.hpp +++ b/lib/perfdata/perfdatawriter.hpp @@ -34,14 +34,16 @@ protected: void Pause() override; private: + Timer::Ptr m_RotationTimer; + std::ofstream m_ServiceOutputFile; + std::ofstream m_HostOutputFile; + boost::mutex m_StreamMutex; + void CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr); static Value EscapeMacroMetric(const Value& value); - Timer::Ptr m_RotationTimer; void RotationTimerHandler(); - - std::ofstream m_ServiceOutputFile; - std::ofstream m_HostOutputFile; + void RotateAllFiles(); void RotateFile(std::ofstream& output, const String& temp_path, const String& perfdata_path); }; diff --git a/lib/perfdata/perfdatawriter.ti b/lib/perfdata/perfdatawriter.ti index 9b4835a80..b0dbb6498 100644 --- a/lib/perfdata/perfdatawriter.ti +++ b/lib/perfdata/perfdatawriter.ti @@ -10,7 +10,7 @@ namespace icinga class PerfdataWriter : ConfigObject { - activation_priority 80; + activation_priority 100; [config] String host_perfdata_path { default {{{ return Configuration::SpoolDir + "/perfdata/host-perfdata"; }}} diff --git a/lib/remote/configpackageutility.cpp b/lib/remote/configpackageutility.cpp index 373aa4846..c35dd1163 100644 --- a/lib/remote/configpackageutility.cpp +++ b/lib/remote/configpackageutility.cpp @@ -218,7 +218,7 @@ void ConfigPackageUtility::AsyncTryActivateStage(const String& packageName, cons args->Add("ActiveStageOverride=" + packageName + ":" + stageName); Process::Ptr process = new 
Process(Process::PrepareCommand(args)); - process->SetTimeout(300); + process->SetTimeout(Configuration::ReloadTimeout); process->Run(std::bind(&TryActivateStageCallback, _1, packageName, stageName, reload)); }