mirror of
https://github.com/Icinga/icinga2.git
synced 2025-07-27 07:34:15 +02:00
Merge pull request #6970 from Icinga/bugfix/perfdata-gaps
Improve reload handling for features (metric & queue flush, activation priority)
This commit is contained in:
commit
e2df11520e
@ -440,6 +440,7 @@ Constant | Description
|
|||||||
--------------------|-------------------
|
--------------------|-------------------
|
||||||
Vars |**Read-write.** Contains a dictionary with global custom attributes. Not set by default.
|
Vars |**Read-write.** Contains a dictionary with global custom attributes. Not set by default.
|
||||||
NodeName |**Read-write.** Contains the cluster node name. Set to the local hostname by default.
|
NodeName |**Read-write.** Contains the cluster node name. Set to the local hostname by default.
|
||||||
|
ReloadTimeout |**Read-write.** Defines the reload timeout for child processes. Defaults to `300s`.
|
||||||
Environment |**Read-write.** The name of the Icinga environment. Included in the SNI host name for outbound connections. Not set by default.
|
Environment |**Read-write.** The name of the Icinga environment. Included in the SNI host name for outbound connections. Not set by default.
|
||||||
RunAsUser |**Read-write.** Defines the user the Icinga 2 daemon is running as. Set in the Icinga 2 sysconfig.
|
RunAsUser |**Read-write.** Defines the user the Icinga 2 daemon is running as. Set in the Icinga 2 sysconfig.
|
||||||
RunAsGroup |**Read-write.** Defines the group the Icinga 2 daemon is running as. Set in the Icinga 2 sysconfig.
|
RunAsGroup |**Read-write.** Defines the group the Icinga 2 daemon is running as. Set in the Icinga 2 sysconfig.
|
||||||
|
@ -60,6 +60,14 @@ void Application::OnConfigLoaded()
|
|||||||
|
|
||||||
ASSERT(m_Instance == nullptr);
|
ASSERT(m_Instance == nullptr);
|
||||||
m_Instance = this;
|
m_Instance = this;
|
||||||
|
|
||||||
|
String reloadTimeout;
|
||||||
|
|
||||||
|
if (ScriptGlobal::Exists("ReloadTimeout"))
|
||||||
|
reloadTimeout = ScriptGlobal::Get("ReloadTimeout");
|
||||||
|
|
||||||
|
if (!reloadTimeout.IsEmpty())
|
||||||
|
Configuration::ReloadTimeout = Convert::ToDouble(reloadTimeout);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -384,8 +392,6 @@ static void ReloadProcessCallback(const ProcessResult& pr)
|
|||||||
|
|
||||||
pid_t Application::StartReloadProcess()
|
pid_t Application::StartReloadProcess()
|
||||||
{
|
{
|
||||||
Log(LogInformation, "Application", "Got reload command: Starting new instance.");
|
|
||||||
|
|
||||||
// prepare arguments
|
// prepare arguments
|
||||||
ArrayData args;
|
ArrayData args;
|
||||||
args.push_back(GetExePath(m_ArgV[0]));
|
args.push_back(GetExePath(m_ArgV[0]));
|
||||||
@ -405,9 +411,14 @@ pid_t Application::StartReloadProcess()
|
|||||||
#endif /* _WIN32 */
|
#endif /* _WIN32 */
|
||||||
|
|
||||||
Process::Ptr process = new Process(Process::PrepareCommand(new Array(std::move(args))));
|
Process::Ptr process = new Process(Process::PrepareCommand(new Array(std::move(args))));
|
||||||
process->SetTimeout(300);
|
process->SetTimeout(Configuration::ReloadTimeout);
|
||||||
process->Run(&ReloadProcessCallback);
|
process->Run(&ReloadProcessCallback);
|
||||||
|
|
||||||
|
Log(LogInformation, "Application")
|
||||||
|
<< "Got reload command: Started new instance with PID '"
|
||||||
|
<< (unsigned long)(process->GetPID()) << "' (timeout is "
|
||||||
|
<< Configuration::ReloadTimeout << "s).";
|
||||||
|
|
||||||
return process->GetPID();
|
return process->GetPID();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -16,7 +16,6 @@
|
|||||||
#include "base/workqueue.hpp"
|
#include "base/workqueue.hpp"
|
||||||
#include "base/context.hpp"
|
#include "base/context.hpp"
|
||||||
#include "base/application.hpp"
|
#include "base/application.hpp"
|
||||||
#include <algorithm>
|
|
||||||
#include <fstream>
|
#include <fstream>
|
||||||
#include <boost/exception/errinfo_api_function.hpp>
|
#include <boost/exception/errinfo_api_function.hpp>
|
||||||
#include <boost/exception/errinfo_errno.hpp>
|
#include <boost/exception/errinfo_errno.hpp>
|
||||||
@ -601,10 +600,12 @@ void ConfigObject::RestoreObjects(const String& filename, int attributeTypes)
|
|||||||
|
|
||||||
void ConfigObject::StopObjects()
|
void ConfigObject::StopObjects()
|
||||||
{
|
{
|
||||||
auto types = Type::GetAllTypes();
|
std::vector<Type::Ptr> types = Type::GetAllTypes();
|
||||||
|
|
||||||
std::sort(types.begin(), types.end(), [](const Type::Ptr& a, const Type::Ptr& b) {
|
std::sort(types.begin(), types.end(), [](const Type::Ptr& a, const Type::Ptr& b) {
|
||||||
return a->GetActivationPriority() > b->GetActivationPriority();
|
if (a->GetActivationPriority() > b->GetActivationPriority())
|
||||||
|
return true;
|
||||||
|
return false;
|
||||||
});
|
});
|
||||||
|
|
||||||
for (const Type::Ptr& type : types) {
|
for (const Type::Ptr& type : types) {
|
||||||
|
@ -25,6 +25,7 @@ String Configuration::PidPath;
|
|||||||
String Configuration::PkgDataDir;
|
String Configuration::PkgDataDir;
|
||||||
String Configuration::PrefixDir;
|
String Configuration::PrefixDir;
|
||||||
String Configuration::ProgramData;
|
String Configuration::ProgramData;
|
||||||
|
double Configuration::ReloadTimeout{300};
|
||||||
int Configuration::RLimitFiles;
|
int Configuration::RLimitFiles;
|
||||||
int Configuration::RLimitProcesses;
|
int Configuration::RLimitProcesses;
|
||||||
int Configuration::RLimitStack;
|
int Configuration::RLimitStack;
|
||||||
@ -223,6 +224,16 @@ void Configuration::SetProgramData(const String& val, bool suppress_events, cons
|
|||||||
HandleUserWrite("ProgramData", &Configuration::ProgramData, val, m_ReadOnly);
|
HandleUserWrite("ProgramData", &Configuration::ProgramData, val, m_ReadOnly);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
double Configuration::GetReloadTimeout() const
|
||||||
|
{
|
||||||
|
return Configuration::ReloadTimeout;
|
||||||
|
}
|
||||||
|
|
||||||
|
void Configuration::SetReloadTimeout(double val, bool suppress_events, const Value& cookie)
|
||||||
|
{
|
||||||
|
HandleUserWrite("ReloadTimeout", &Configuration::ReloadTimeout, val, m_ReadOnly);
|
||||||
|
}
|
||||||
|
|
||||||
int Configuration::GetRLimitFiles() const
|
int Configuration::GetRLimitFiles() const
|
||||||
{
|
{
|
||||||
return Configuration::RLimitFiles;
|
return Configuration::RLimitFiles;
|
||||||
|
@ -70,6 +70,9 @@ public:
|
|||||||
String GetProgramData() const override;
|
String GetProgramData() const override;
|
||||||
void SetProgramData(const String& value, bool suppress_events = false, const Value& cookie = Empty) override;
|
void SetProgramData(const String& value, bool suppress_events = false, const Value& cookie = Empty) override;
|
||||||
|
|
||||||
|
double GetReloadTimeout() const override;
|
||||||
|
void SetReloadTimeout(double value, bool suppress_events = false, const Value& cookie = Empty) override;
|
||||||
|
|
||||||
int GetRLimitFiles() const override;
|
int GetRLimitFiles() const override;
|
||||||
void SetRLimitFiles(int value, bool suppress_events = false, const Value& cookie = Empty) override;
|
void SetRLimitFiles(int value, bool suppress_events = false, const Value& cookie = Empty) override;
|
||||||
|
|
||||||
@ -130,6 +133,7 @@ public:
|
|||||||
static String PkgDataDir;
|
static String PkgDataDir;
|
||||||
static String PrefixDir;
|
static String PrefixDir;
|
||||||
static String ProgramData;
|
static String ProgramData;
|
||||||
|
static double ReloadTimeout;
|
||||||
static int RLimitFiles;
|
static int RLimitFiles;
|
||||||
static int RLimitProcesses;
|
static int RLimitProcesses;
|
||||||
static int RLimitStack;
|
static int RLimitStack;
|
||||||
|
@ -94,6 +94,11 @@ abstract class Configuration
|
|||||||
set;
|
set;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
[config, no_storage, virtual] double ReloadTimeout {
|
||||||
|
get;
|
||||||
|
set;
|
||||||
|
};
|
||||||
|
|
||||||
[config, no_storage, virtual] int RLimitFiles {
|
[config, no_storage, virtual] int RLimitFiles {
|
||||||
get;
|
get;
|
||||||
set;
|
set;
|
||||||
|
@ -6,7 +6,6 @@
|
|||||||
#include "icinga/cib.hpp"
|
#include "icinga/cib.hpp"
|
||||||
#include "remote/apilistener.hpp"
|
#include "remote/apilistener.hpp"
|
||||||
#include "base/configtype.hpp"
|
#include "base/configtype.hpp"
|
||||||
#include "base/defer.hpp"
|
|
||||||
#include "base/objectlock.hpp"
|
#include "base/objectlock.hpp"
|
||||||
#include "base/utility.hpp"
|
#include "base/utility.hpp"
|
||||||
#include "base/perfdatavalue.hpp"
|
#include "base/perfdatavalue.hpp"
|
||||||
@ -57,7 +56,6 @@ void CheckerComponent::Start(bool runtimeCreated)
|
|||||||
Log(LogInformation, "CheckerComponent")
|
Log(LogInformation, "CheckerComponent")
|
||||||
<< "'" << GetName() << "' started.";
|
<< "'" << GetName() << "' started.";
|
||||||
|
|
||||||
m_RunningChecks.store(0);
|
|
||||||
|
|
||||||
m_Thread = std::thread(std::bind(&CheckerComponent::CheckThreadProc, this));
|
m_Thread = std::thread(std::bind(&CheckerComponent::CheckThreadProc, this));
|
||||||
|
|
||||||
@ -75,13 +73,32 @@ void CheckerComponent::Stop(bool runtimeRemoved)
|
|||||||
m_CV.notify_all();
|
m_CV.notify_all();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
double wait = 0.0;
|
||||||
|
|
||||||
|
while (GetPendingCheckables() > 0) {
|
||||||
|
Log(LogDebug, "CheckerComponent")
|
||||||
|
<< "Waiting for running checks (" << GetPendingCheckables()
|
||||||
|
<< ") to finish. Waited for " << wait << " seconds now.";
|
||||||
|
|
||||||
|
Utility::Sleep(0.1);
|
||||||
|
wait += 0.1;
|
||||||
|
|
||||||
|
/* Pick a timeout slightly shorther than the process reload timeout. */
|
||||||
|
double waitMax = Configuration::ReloadTimeout - 30;
|
||||||
|
if (waitMax <= 0)
|
||||||
|
waitMax = 1;
|
||||||
|
|
||||||
|
if (wait > waitMax) {
|
||||||
|
Log(LogWarning, "CheckerComponent")
|
||||||
|
<< "Checks running too long for " << wait
|
||||||
|
<< " seconds, hard shutdown before reload timeout: " << Configuration::ReloadTimeout << ".";
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
m_ResultTimer->Stop();
|
m_ResultTimer->Stop();
|
||||||
m_Thread.join();
|
m_Thread.join();
|
||||||
|
|
||||||
while (m_RunningChecks.load()) {
|
|
||||||
Utility::Sleep(1.0 / 60.0);
|
|
||||||
}
|
|
||||||
|
|
||||||
Log(LogInformation, "CheckerComponent")
|
Log(LogInformation, "CheckerComponent")
|
||||||
<< "'" << GetName() << "' stopped.";
|
<< "'" << GetName() << "' stopped.";
|
||||||
|
|
||||||
@ -196,8 +213,6 @@ void CheckerComponent::CheckThreadProc()
|
|||||||
|
|
||||||
Checkable::IncreasePendingChecks();
|
Checkable::IncreasePendingChecks();
|
||||||
|
|
||||||
m_RunningChecks.fetch_add(1);
|
|
||||||
|
|
||||||
Utility::QueueAsyncCallback(std::bind(&CheckerComponent::ExecuteCheckHelper, CheckerComponent::Ptr(this), checkable));
|
Utility::QueueAsyncCallback(std::bind(&CheckerComponent::ExecuteCheckHelper, CheckerComponent::Ptr(this), checkable));
|
||||||
|
|
||||||
lock.lock();
|
lock.lock();
|
||||||
@ -206,10 +221,6 @@ void CheckerComponent::CheckThreadProc()
|
|||||||
|
|
||||||
void CheckerComponent::ExecuteCheckHelper(const Checkable::Ptr& checkable)
|
void CheckerComponent::ExecuteCheckHelper(const Checkable::Ptr& checkable)
|
||||||
{
|
{
|
||||||
Defer decrementRunningChecks ([this]{
|
|
||||||
m_RunningChecks.fetch_sub(1);
|
|
||||||
});
|
|
||||||
|
|
||||||
try {
|
try {
|
||||||
checkable->ExecuteCheck();
|
checkable->ExecuteCheck();
|
||||||
} catch (const std::exception& ex) {
|
} catch (const std::exception& ex) {
|
||||||
|
@ -8,13 +8,11 @@
|
|||||||
#include "base/configobject.hpp"
|
#include "base/configobject.hpp"
|
||||||
#include "base/timer.hpp"
|
#include "base/timer.hpp"
|
||||||
#include "base/utility.hpp"
|
#include "base/utility.hpp"
|
||||||
#include <atomic>
|
|
||||||
#include <boost/thread/mutex.hpp>
|
#include <boost/thread/mutex.hpp>
|
||||||
#include <boost/thread/condition_variable.hpp>
|
#include <boost/thread/condition_variable.hpp>
|
||||||
#include <boost/multi_index_container.hpp>
|
#include <boost/multi_index_container.hpp>
|
||||||
#include <boost/multi_index/ordered_index.hpp>
|
#include <boost/multi_index/ordered_index.hpp>
|
||||||
#include <boost/multi_index/key_extractors.hpp>
|
#include <boost/multi_index/key_extractors.hpp>
|
||||||
#include <cstdint>
|
|
||||||
#include <thread>
|
#include <thread>
|
||||||
|
|
||||||
namespace icinga
|
namespace icinga
|
||||||
@ -75,7 +73,6 @@ private:
|
|||||||
boost::condition_variable m_CV;
|
boost::condition_variable m_CV;
|
||||||
bool m_Stopped{false};
|
bool m_Stopped{false};
|
||||||
std::thread m_Thread;
|
std::thread m_Thread;
|
||||||
std::atomic<uintmax_t> m_RunningChecks;
|
|
||||||
|
|
||||||
CheckableSet m_IdleCheckables;
|
CheckableSet m_IdleCheckables;
|
||||||
CheckableSet m_PendingCheckables;
|
CheckableSet m_PendingCheckables;
|
||||||
|
@ -9,7 +9,7 @@ namespace icinga
|
|||||||
|
|
||||||
class CheckerComponent : ConfigObject
|
class CheckerComponent : ConfigObject
|
||||||
{
|
{
|
||||||
activation_priority 100;
|
activation_priority 300;
|
||||||
|
|
||||||
[config, no_storage] int concurrent_checks {
|
[config, no_storage] int concurrent_checks {
|
||||||
get {{{
|
get {{{
|
||||||
|
@ -183,9 +183,6 @@ void ServiceDbObject::OnConfigUpdateHeavy()
|
|||||||
DbObject::OnMultipleQueries(queries);
|
DbObject::OnMultipleQueries(queries);
|
||||||
|
|
||||||
/* service dependencies */
|
/* service dependencies */
|
||||||
Log(LogDebug, "ServiceDbObject")
|
|
||||||
<< "service dependencies for '" << service->GetName() << "'";
|
|
||||||
|
|
||||||
queries.clear();
|
queries.clear();
|
||||||
|
|
||||||
DbQuery query2;
|
DbQuery query2;
|
||||||
@ -233,9 +230,6 @@ void ServiceDbObject::OnConfigUpdateHeavy()
|
|||||||
DbObject::OnMultipleQueries(queries);
|
DbObject::OnMultipleQueries(queries);
|
||||||
|
|
||||||
/* service contacts, contactgroups */
|
/* service contacts, contactgroups */
|
||||||
Log(LogDebug, "ServiceDbObject")
|
|
||||||
<< "service contacts: " << service->GetName();
|
|
||||||
|
|
||||||
queries.clear();
|
queries.clear();
|
||||||
|
|
||||||
DbQuery query3;
|
DbQuery query3;
|
||||||
@ -248,9 +242,6 @@ void ServiceDbObject::OnConfigUpdateHeavy()
|
|||||||
queries.emplace_back(std::move(query3));
|
queries.emplace_back(std::move(query3));
|
||||||
|
|
||||||
for (const User::Ptr& user : CompatUtility::GetCheckableNotificationUsers(service)) {
|
for (const User::Ptr& user : CompatUtility::GetCheckableNotificationUsers(service)) {
|
||||||
Log(LogDebug, "ServiceDbObject")
|
|
||||||
<< "service contacts: " << user->GetName();
|
|
||||||
|
|
||||||
DbQuery query_contact;
|
DbQuery query_contact;
|
||||||
query_contact.Table = GetType()->GetTable() + "_contacts";
|
query_contact.Table = GetType()->GetTable() + "_contacts";
|
||||||
query_contact.Type = DbQueryInsert;
|
query_contact.Type = DbQueryInsert;
|
||||||
@ -266,9 +257,6 @@ void ServiceDbObject::OnConfigUpdateHeavy()
|
|||||||
|
|
||||||
DbObject::OnMultipleQueries(queries);
|
DbObject::OnMultipleQueries(queries);
|
||||||
|
|
||||||
Log(LogDebug, "ServiceDbObject")
|
|
||||||
<< "service contactgroups: " << service->GetName();
|
|
||||||
|
|
||||||
queries.clear();
|
queries.clear();
|
||||||
|
|
||||||
DbQuery query4;
|
DbQuery query4;
|
||||||
@ -281,9 +269,6 @@ void ServiceDbObject::OnConfigUpdateHeavy()
|
|||||||
queries.emplace_back(std::move(query4));
|
queries.emplace_back(std::move(query4));
|
||||||
|
|
||||||
for (const UserGroup::Ptr& usergroup : CompatUtility::GetCheckableNotificationUserGroups(service)) {
|
for (const UserGroup::Ptr& usergroup : CompatUtility::GetCheckableNotificationUserGroups(service)) {
|
||||||
Log(LogDebug, "ServiceDbObject")
|
|
||||||
<< "service contactgroups: " << usergroup->GetName();
|
|
||||||
|
|
||||||
DbQuery query_contact;
|
DbQuery query_contact;
|
||||||
query_contact.Table = GetType()->GetTable() + "_contactgroups";
|
query_contact.Table = GetType()->GetTable() + "_contactgroups";
|
||||||
query_contact.Type = DbQueryInsert;
|
query_contact.Type = DbQueryInsert;
|
||||||
|
@ -90,9 +90,6 @@ void IdoMysqlConnection::Resume()
|
|||||||
|
|
||||||
void IdoMysqlConnection::Pause()
|
void IdoMysqlConnection::Pause()
|
||||||
{
|
{
|
||||||
Log(LogInformation, "IdoMysqlConnection")
|
|
||||||
<< "'" << GetName() << "' paused.";
|
|
||||||
|
|
||||||
m_ReconnectTimer.reset();
|
m_ReconnectTimer.reset();
|
||||||
|
|
||||||
DbConnection::Pause();
|
DbConnection::Pause();
|
||||||
@ -102,8 +99,12 @@ void IdoMysqlConnection::Pause()
|
|||||||
<< "Rescheduling disconnect task.";
|
<< "Rescheduling disconnect task.";
|
||||||
#endif /* I2_DEBUG */
|
#endif /* I2_DEBUG */
|
||||||
|
|
||||||
m_QueryQueue.Enqueue(std::bind(&IdoMysqlConnection::Disconnect, this), PriorityHigh);
|
m_QueryQueue.Enqueue(std::bind(&IdoMysqlConnection::Disconnect, this), PriorityLow);
|
||||||
m_QueryQueue.Join();
|
m_QueryQueue.Join();
|
||||||
|
|
||||||
|
Log(LogInformation, "IdoMysqlConnection")
|
||||||
|
<< "'" << GetName() << "' paused.";
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void IdoMysqlConnection::ExceptionHandler(boost::exception_ptr exp)
|
void IdoMysqlConnection::ExceptionHandler(boost::exception_ptr exp)
|
||||||
@ -175,7 +176,7 @@ void IdoMysqlConnection::ReconnectTimerHandler()
|
|||||||
<< "Scheduling reconnect task.";
|
<< "Scheduling reconnect task.";
|
||||||
#endif /* I2_DEBUG */
|
#endif /* I2_DEBUG */
|
||||||
|
|
||||||
m_QueryQueue.Enqueue(std::bind(&IdoMysqlConnection::Reconnect, this), PriorityLow);
|
m_QueryQueue.Enqueue(std::bind(&IdoMysqlConnection::Reconnect, this), PriorityHigh);
|
||||||
}
|
}
|
||||||
|
|
||||||
void IdoMysqlConnection::Reconnect()
|
void IdoMysqlConnection::Reconnect()
|
||||||
@ -434,9 +435,9 @@ void IdoMysqlConnection::Reconnect()
|
|||||||
<< "Scheduling session table clear and finish connect task.";
|
<< "Scheduling session table clear and finish connect task.";
|
||||||
#endif /* I2_DEBUG */
|
#endif /* I2_DEBUG */
|
||||||
|
|
||||||
m_QueryQueue.Enqueue(std::bind(&IdoMysqlConnection::ClearTablesBySession, this), PriorityLow);
|
m_QueryQueue.Enqueue(std::bind(&IdoMysqlConnection::ClearTablesBySession, this), PriorityHigh);
|
||||||
|
|
||||||
m_QueryQueue.Enqueue(std::bind(&IdoMysqlConnection::FinishConnect, this, startTime), PriorityLow);
|
m_QueryQueue.Enqueue(std::bind(&IdoMysqlConnection::FinishConnect, this, startTime), PriorityHigh);
|
||||||
}
|
}
|
||||||
|
|
||||||
void IdoMysqlConnection::FinishConnect(double startTime)
|
void IdoMysqlConnection::FinishConnect(double startTime)
|
||||||
@ -709,7 +710,7 @@ void IdoMysqlConnection::ActivateObject(const DbObject::Ptr& dbobj)
|
|||||||
<< "Scheduling object activation task for '" << dbobj->GetName1() << "!" << dbobj->GetName2() << "'.";
|
<< "Scheduling object activation task for '" << dbobj->GetName1() << "!" << dbobj->GetName2() << "'.";
|
||||||
#endif /* I2_DEBUG */
|
#endif /* I2_DEBUG */
|
||||||
|
|
||||||
m_QueryQueue.Enqueue(std::bind(&IdoMysqlConnection::InternalActivateObject, this, dbobj), PriorityLow);
|
m_QueryQueue.Enqueue(std::bind(&IdoMysqlConnection::InternalActivateObject, this, dbobj), PriorityHigh);
|
||||||
}
|
}
|
||||||
|
|
||||||
void IdoMysqlConnection::InternalActivateObject(const DbObject::Ptr& dbobj)
|
void IdoMysqlConnection::InternalActivateObject(const DbObject::Ptr& dbobj)
|
||||||
@ -754,7 +755,7 @@ void IdoMysqlConnection::DeactivateObject(const DbObject::Ptr& dbobj)
|
|||||||
<< "Scheduling object deactivation task for '" << dbobj->GetName1() << "!" << dbobj->GetName2() << "'.";
|
<< "Scheduling object deactivation task for '" << dbobj->GetName1() << "!" << dbobj->GetName2() << "'.";
|
||||||
#endif /* I2_DEBUG */
|
#endif /* I2_DEBUG */
|
||||||
|
|
||||||
m_QueryQueue.Enqueue(std::bind(&IdoMysqlConnection::InternalDeactivateObject, this, dbobj), PriorityLow);
|
m_QueryQueue.Enqueue(std::bind(&IdoMysqlConnection::InternalDeactivateObject, this, dbobj), PriorityHigh);
|
||||||
}
|
}
|
||||||
|
|
||||||
void IdoMysqlConnection::InternalDeactivateObject(const DbObject::Ptr& dbobj)
|
void IdoMysqlConnection::InternalDeactivateObject(const DbObject::Ptr& dbobj)
|
||||||
|
@ -97,15 +97,16 @@ void IdoPgsqlConnection::Resume()
|
|||||||
|
|
||||||
void IdoPgsqlConnection::Pause()
|
void IdoPgsqlConnection::Pause()
|
||||||
{
|
{
|
||||||
Log(LogInformation, "IdoPgsqlConnection")
|
|
||||||
<< "'" << GetName() << "' paused.";
|
|
||||||
|
|
||||||
m_ReconnectTimer.reset();
|
m_ReconnectTimer.reset();
|
||||||
|
|
||||||
DbConnection::Pause();
|
DbConnection::Pause();
|
||||||
|
|
||||||
m_QueryQueue.Enqueue(std::bind(&IdoPgsqlConnection::Disconnect, this), PriorityHigh);
|
m_QueryQueue.Enqueue(std::bind(&IdoPgsqlConnection::Disconnect, this), PriorityLow);
|
||||||
m_QueryQueue.Join();
|
m_QueryQueue.Join();
|
||||||
|
|
||||||
|
Log(LogInformation, "IdoPgsqlConnection")
|
||||||
|
<< "'" << GetName() << "' paused.";
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void IdoPgsqlConnection::ExceptionHandler(boost::exception_ptr exp)
|
void IdoPgsqlConnection::ExceptionHandler(boost::exception_ptr exp)
|
||||||
@ -165,7 +166,7 @@ void IdoPgsqlConnection::InternalNewTransaction()
|
|||||||
|
|
||||||
void IdoPgsqlConnection::ReconnectTimerHandler()
|
void IdoPgsqlConnection::ReconnectTimerHandler()
|
||||||
{
|
{
|
||||||
m_QueryQueue.Enqueue(std::bind(&IdoPgsqlConnection::Reconnect, this), PriorityLow);
|
m_QueryQueue.Enqueue(std::bind(&IdoPgsqlConnection::Reconnect, this), PriorityHigh);
|
||||||
}
|
}
|
||||||
|
|
||||||
void IdoPgsqlConnection::Reconnect()
|
void IdoPgsqlConnection::Reconnect()
|
||||||
@ -408,9 +409,9 @@ void IdoPgsqlConnection::Reconnect()
|
|||||||
|
|
||||||
UpdateAllObjects();
|
UpdateAllObjects();
|
||||||
|
|
||||||
m_QueryQueue.Enqueue(std::bind(&IdoPgsqlConnection::ClearTablesBySession, this), PriorityLow);
|
m_QueryQueue.Enqueue(std::bind(&IdoPgsqlConnection::ClearTablesBySession, this), PriorityHigh);
|
||||||
|
|
||||||
m_QueryQueue.Enqueue(std::bind(&IdoPgsqlConnection::FinishConnect, this, startTime), PriorityLow);
|
m_QueryQueue.Enqueue(std::bind(&IdoPgsqlConnection::FinishConnect, this, startTime), PriorityHigh);
|
||||||
}
|
}
|
||||||
|
|
||||||
void IdoPgsqlConnection::FinishConnect(double startTime)
|
void IdoPgsqlConnection::FinishConnect(double startTime)
|
||||||
@ -558,7 +559,7 @@ void IdoPgsqlConnection::ActivateObject(const DbObject::Ptr& dbobj)
|
|||||||
if (IsPaused())
|
if (IsPaused())
|
||||||
return;
|
return;
|
||||||
|
|
||||||
m_QueryQueue.Enqueue(std::bind(&IdoPgsqlConnection::InternalActivateObject, this, dbobj), PriorityLow);
|
m_QueryQueue.Enqueue(std::bind(&IdoPgsqlConnection::InternalActivateObject, this, dbobj), PriorityHigh);
|
||||||
}
|
}
|
||||||
|
|
||||||
void IdoPgsqlConnection::InternalActivateObject(const DbObject::Ptr& dbobj)
|
void IdoPgsqlConnection::InternalActivateObject(const DbObject::Ptr& dbobj)
|
||||||
@ -595,7 +596,7 @@ void IdoPgsqlConnection::DeactivateObject(const DbObject::Ptr& dbobj)
|
|||||||
if (IsPaused())
|
if (IsPaused())
|
||||||
return;
|
return;
|
||||||
|
|
||||||
m_QueryQueue.Enqueue(std::bind(&IdoPgsqlConnection::InternalDeactivateObject, this, dbobj), PriorityLow);
|
m_QueryQueue.Enqueue(std::bind(&IdoPgsqlConnection::InternalDeactivateObject, this, dbobj), PriorityHigh);
|
||||||
}
|
}
|
||||||
|
|
||||||
void IdoPgsqlConnection::InternalDeactivateObject(const DbObject::Ptr& dbobj)
|
void IdoPgsqlConnection::InternalDeactivateObject(const DbObject::Ptr& dbobj)
|
||||||
|
@ -20,6 +20,8 @@ public:
|
|||||||
|
|
||||||
class Downtime : ConfigObject < DowntimeNameComposer
|
class Downtime : ConfigObject < DowntimeNameComposer
|
||||||
{
|
{
|
||||||
|
activation_priority -10;
|
||||||
|
|
||||||
load_after Host;
|
load_after Host;
|
||||||
load_after Service;
|
load_after Service;
|
||||||
|
|
||||||
|
@ -9,6 +9,8 @@ namespace icinga
|
|||||||
|
|
||||||
class IcingaApplication : Application
|
class IcingaApplication : Application
|
||||||
{
|
{
|
||||||
|
activation_priority -50;
|
||||||
|
|
||||||
[config, no_storage, virtual] String environment {
|
[config, no_storage, virtual] String environment {
|
||||||
get;
|
get;
|
||||||
set;
|
set;
|
||||||
|
@ -9,7 +9,7 @@ namespace icinga
|
|||||||
|
|
||||||
class NotificationComponent : ConfigObject
|
class NotificationComponent : ConfigObject
|
||||||
{
|
{
|
||||||
activation_priority 100;
|
activation_priority 200;
|
||||||
|
|
||||||
[config] bool enable_ha (EnableHA) {
|
[config] bool enable_ha (EnableHA) {
|
||||||
default {{{ return true; }}}
|
default {{{ return true; }}}
|
||||||
|
@ -87,13 +87,16 @@ void ElasticsearchWriter::Resume()
|
|||||||
Checkable::OnNotificationSentToAllUsers.connect(std::bind(&ElasticsearchWriter::NotificationSentToAllUsersHandler, this, _1, _2, _3, _4, _5, _6, _7));
|
Checkable::OnNotificationSentToAllUsers.connect(std::bind(&ElasticsearchWriter::NotificationSentToAllUsersHandler, this, _1, _2, _3, _4, _5, _6, _7));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* Pause is equivalent to Stop, but with HA capabilities to resume at runtime. */
|
||||||
void ElasticsearchWriter::Pause()
|
void ElasticsearchWriter::Pause()
|
||||||
{
|
{
|
||||||
|
Flush();
|
||||||
|
m_WorkQueue.Join();
|
||||||
|
Flush();
|
||||||
|
|
||||||
Log(LogInformation, "ElasticsearchWriter")
|
Log(LogInformation, "ElasticsearchWriter")
|
||||||
<< "'" << GetName() << "' paused.";
|
<< "'" << GetName() << "' paused.";
|
||||||
|
|
||||||
m_WorkQueue.Join();
|
|
||||||
|
|
||||||
ObjectImpl<ElasticsearchWriter>::Pause();
|
ObjectImpl<ElasticsearchWriter>::Pause();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -7,7 +7,7 @@ namespace icinga
|
|||||||
|
|
||||||
class ElasticsearchWriter : ConfigObject
|
class ElasticsearchWriter : ConfigObject
|
||||||
{
|
{
|
||||||
activation_priority 80;
|
activation_priority 100;
|
||||||
|
|
||||||
[config, required] String host {
|
[config, required] String host {
|
||||||
default {{{ return "127.0.0.1"; }}}
|
default {{{ return "127.0.0.1"; }}}
|
||||||
|
@ -90,12 +90,26 @@ void GelfWriter::Resume()
|
|||||||
Checkable::OnStateChange.connect(std::bind(&GelfWriter::StateChangeHandler, this, _1, _2, _3));
|
Checkable::OnStateChange.connect(std::bind(&GelfWriter::StateChangeHandler, this, _1, _2, _3));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* Pause is equivalent to Stop, but with HA capabilities to resume at runtime. */
|
||||||
void GelfWriter::Pause()
|
void GelfWriter::Pause()
|
||||||
{
|
{
|
||||||
Log(LogInformation, "GelfWriter")
|
m_ReconnectTimer.reset();
|
||||||
<< "'" << GetName() << "' paused.";
|
|
||||||
|
try {
|
||||||
|
ReconnectInternal();
|
||||||
|
} catch (const std::exception&) {
|
||||||
|
Log(LogInformation, "GelfWriter")
|
||||||
|
<< "'" << GetName() << "' paused. Unable to connect, not flushing buffers. Data may be lost on reload.";
|
||||||
|
|
||||||
|
ObjectImpl<GelfWriter>::Pause();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
m_WorkQueue.Join();
|
m_WorkQueue.Join();
|
||||||
|
DisconnectInternal();
|
||||||
|
|
||||||
|
Log(LogInformation, "GraphiteWriter")
|
||||||
|
<< "'" << GetName() << "' paused.";
|
||||||
|
|
||||||
ObjectImpl<GelfWriter>::Pause();
|
ObjectImpl<GelfWriter>::Pause();
|
||||||
}
|
}
|
||||||
@ -128,6 +142,11 @@ void GelfWriter::Reconnect()
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ReconnectInternal();
|
||||||
|
}
|
||||||
|
|
||||||
|
void GelfWriter::ReconnectInternal()
|
||||||
|
{
|
||||||
double startTime = Utility::GetTime();
|
double startTime = Utility::GetTime();
|
||||||
|
|
||||||
CONTEXT("Reconnecting to Graylog Gelf '" + GetName() + "'");
|
CONTEXT("Reconnecting to Graylog Gelf '" + GetName() + "'");
|
||||||
@ -167,6 +186,11 @@ void GelfWriter::Disconnect()
|
|||||||
{
|
{
|
||||||
AssertOnWorkQueue();
|
AssertOnWorkQueue();
|
||||||
|
|
||||||
|
DisconnectInternal();
|
||||||
|
}
|
||||||
|
|
||||||
|
void GelfWriter::DisconnectInternal()
|
||||||
|
{
|
||||||
if (!GetConnected())
|
if (!GetConnected())
|
||||||
return;
|
return;
|
||||||
|
|
||||||
|
@ -55,7 +55,9 @@ private:
|
|||||||
void ReconnectTimerHandler();
|
void ReconnectTimerHandler();
|
||||||
|
|
||||||
void Disconnect();
|
void Disconnect();
|
||||||
|
void DisconnectInternal();
|
||||||
void Reconnect();
|
void Reconnect();
|
||||||
|
void ReconnectInternal();
|
||||||
|
|
||||||
void AssertOnWorkQueue();
|
void AssertOnWorkQueue();
|
||||||
|
|
||||||
|
@ -9,7 +9,7 @@ namespace icinga
|
|||||||
|
|
||||||
class GelfWriter : ConfigObject
|
class GelfWriter : ConfigObject
|
||||||
{
|
{
|
||||||
activation_priority 80;
|
activation_priority 100;
|
||||||
|
|
||||||
[config] String host {
|
[config] String host {
|
||||||
default {{{ return "127.0.0.1"; }}}
|
default {{{ return "127.0.0.1"; }}}
|
||||||
|
@ -85,12 +85,26 @@ void GraphiteWriter::Resume()
|
|||||||
Checkable::OnNewCheckResult.connect(std::bind(&GraphiteWriter::CheckResultHandler, this, _1, _2));
|
Checkable::OnNewCheckResult.connect(std::bind(&GraphiteWriter::CheckResultHandler, this, _1, _2));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* Pause is equivalent to Stop, but with HA capabilities to resume at runtime. */
|
||||||
void GraphiteWriter::Pause()
|
void GraphiteWriter::Pause()
|
||||||
{
|
{
|
||||||
Log(LogInformation, "GraphiteWriter")
|
m_ReconnectTimer.reset();
|
||||||
<< "'" << GetName() << "' paused.";
|
|
||||||
|
try {
|
||||||
|
ReconnectInternal();
|
||||||
|
} catch (const std::exception&) {
|
||||||
|
Log(LogInformation, "GraphiteWriter")
|
||||||
|
<< "'" << GetName() << "' paused. Unable to connect, not flushing buffers. Data may be lost on reload.";
|
||||||
|
|
||||||
|
ObjectImpl<GraphiteWriter>::Pause();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
m_WorkQueue.Join();
|
m_WorkQueue.Join();
|
||||||
|
DisconnectInternal();
|
||||||
|
|
||||||
|
Log(LogInformation, "GraphiteWriter")
|
||||||
|
<< "'" << GetName() << "' paused.";
|
||||||
|
|
||||||
ObjectImpl<GraphiteWriter>::Pause();
|
ObjectImpl<GraphiteWriter>::Pause();
|
||||||
}
|
}
|
||||||
@ -123,6 +137,11 @@ void GraphiteWriter::Reconnect()
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ReconnectInternal();
|
||||||
|
}
|
||||||
|
|
||||||
|
void GraphiteWriter::ReconnectInternal()
|
||||||
|
{
|
||||||
double startTime = Utility::GetTime();
|
double startTime = Utility::GetTime();
|
||||||
|
|
||||||
CONTEXT("Reconnecting to Graphite '" + GetName() + "'");
|
CONTEXT("Reconnecting to Graphite '" + GetName() + "'");
|
||||||
@ -155,6 +174,9 @@ void GraphiteWriter::Reconnect()
|
|||||||
|
|
||||||
void GraphiteWriter::ReconnectTimerHandler()
|
void GraphiteWriter::ReconnectTimerHandler()
|
||||||
{
|
{
|
||||||
|
if (IsPaused())
|
||||||
|
return;
|
||||||
|
|
||||||
m_WorkQueue.Enqueue(std::bind(&GraphiteWriter::Reconnect, this), PriorityNormal);
|
m_WorkQueue.Enqueue(std::bind(&GraphiteWriter::Reconnect, this), PriorityNormal);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -162,6 +184,11 @@ void GraphiteWriter::Disconnect()
|
|||||||
{
|
{
|
||||||
AssertOnWorkQueue();
|
AssertOnWorkQueue();
|
||||||
|
|
||||||
|
DisconnectInternal();
|
||||||
|
}
|
||||||
|
|
||||||
|
void GraphiteWriter::DisconnectInternal()
|
||||||
|
{
|
||||||
if (!GetConnected())
|
if (!GetConnected())
|
||||||
return;
|
return;
|
||||||
|
|
||||||
@ -184,6 +211,10 @@ void GraphiteWriter::CheckResultHandlerInternal(const Checkable::Ptr& checkable,
|
|||||||
|
|
||||||
CONTEXT("Processing check result for '" + checkable->GetName() + "'");
|
CONTEXT("Processing check result for '" + checkable->GetName() + "'");
|
||||||
|
|
||||||
|
/* TODO: Deal with missing connection here. Needs refactoring
|
||||||
|
* into parsing the actual performance data and then putting it
|
||||||
|
* into a queue for re-inserting. */
|
||||||
|
|
||||||
if (!IcingaApplication::GetInstance()->GetEnablePerfdata() || !checkable->GetEnablePerfdata())
|
if (!IcingaApplication::GetInstance()->GetEnablePerfdata() || !checkable->GetEnablePerfdata())
|
||||||
return;
|
return;
|
||||||
|
|
||||||
@ -276,7 +307,7 @@ void GraphiteWriter::SendMetric(const String& prefix, const String& name, double
|
|||||||
msgbuf << prefix << "." << name << " " << Convert::ToString(value) << " " << static_cast<long>(ts);
|
msgbuf << prefix << "." << name << " " << Convert::ToString(value) << " " << static_cast<long>(ts);
|
||||||
|
|
||||||
Log(LogDebug, "GraphiteWriter")
|
Log(LogDebug, "GraphiteWriter")
|
||||||
<< "Add to metric list:'" << msgbuf.str() << "'.";
|
<< "Add to metric list: '" << msgbuf.str() << "'.";
|
||||||
|
|
||||||
// do not send \n to debug log
|
// do not send \n to debug log
|
||||||
msgbuf << "\n";
|
msgbuf << "\n";
|
||||||
|
@ -54,7 +54,9 @@ private:
|
|||||||
void ReconnectTimerHandler();
|
void ReconnectTimerHandler();
|
||||||
|
|
||||||
void Disconnect();
|
void Disconnect();
|
||||||
|
void DisconnectInternal();
|
||||||
void Reconnect();
|
void Reconnect();
|
||||||
|
void ReconnectInternal();
|
||||||
|
|
||||||
void AssertOnWorkQueue();
|
void AssertOnWorkQueue();
|
||||||
|
|
||||||
|
@ -9,7 +9,7 @@ namespace icinga
|
|||||||
|
|
||||||
class GraphiteWriter : ConfigObject
|
class GraphiteWriter : ConfigObject
|
||||||
{
|
{
|
||||||
activation_priority 80;
|
activation_priority 100;
|
||||||
|
|
||||||
[config] String host {
|
[config] String host {
|
||||||
default {{{ return "127.0.0.1"; }}}
|
default {{{ return "127.0.0.1"; }}}
|
||||||
|
@ -113,24 +113,33 @@ void InfluxdbWriter::Resume()
|
|||||||
Checkable::OnNewCheckResult.connect(std::bind(&InfluxdbWriter::CheckResultHandler, this, _1, _2));
|
Checkable::OnNewCheckResult.connect(std::bind(&InfluxdbWriter::CheckResultHandler, this, _1, _2));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* Pause is equivalent to Stop, but with HA capabilities to resume at runtime. */
|
||||||
void InfluxdbWriter::Pause()
|
void InfluxdbWriter::Pause()
|
||||||
{
|
{
|
||||||
|
/* Force a flush. */
|
||||||
|
Log(LogDebug, "InfluxdbWriter")
|
||||||
|
<< "Flushing pending data buffers.";
|
||||||
|
|
||||||
|
Flush();
|
||||||
|
|
||||||
|
/* Work on the missing tasks. TODO: Find a way to cache them on disk. */
|
||||||
|
Log(LogDebug, "InfluxdbWriter")
|
||||||
|
<< "Joining existing WQ tasks.";
|
||||||
|
|
||||||
|
m_WorkQueue.Join();
|
||||||
|
|
||||||
|
/* Flush again after the WQ tasks have filled the data buffer. */
|
||||||
|
Log(LogDebug, "InfluxdbWriter")
|
||||||
|
<< "Flushing data buffers from WQ tasks.";
|
||||||
|
|
||||||
|
Flush();
|
||||||
|
|
||||||
Log(LogInformation, "InfluxdbWriter")
|
Log(LogInformation, "InfluxdbWriter")
|
||||||
<< "'" << GetName() << "' paused.";
|
<< "'" << GetName() << "' paused.";
|
||||||
|
|
||||||
m_WorkQueue.Join();
|
|
||||||
|
|
||||||
ObjectImpl<InfluxdbWriter>::Pause();
|
ObjectImpl<InfluxdbWriter>::Pause();
|
||||||
}
|
}
|
||||||
|
|
||||||
void InfluxdbWriter::Stop(bool runtimeDeleted)
|
|
||||||
{
|
|
||||||
FlushTimeout();
|
|
||||||
m_WorkQueue.Join();
|
|
||||||
|
|
||||||
ObjectImpl<InfluxdbWriter>::Stop(runtimeDeleted);
|
|
||||||
}
|
|
||||||
|
|
||||||
void InfluxdbWriter::AssertOnWorkQueue()
|
void InfluxdbWriter::AssertOnWorkQueue()
|
||||||
{
|
{
|
||||||
ASSERT(m_WorkQueue.IsWorkerThread());
|
ASSERT(m_WorkQueue.IsWorkerThread());
|
||||||
@ -419,6 +428,9 @@ void InfluxdbWriter::FlushTimeoutWQ()
|
|||||||
|
|
||||||
void InfluxdbWriter::Flush()
|
void InfluxdbWriter::Flush()
|
||||||
{
|
{
|
||||||
|
Log(LogDebug, "InfluxdbWriter")
|
||||||
|
<< "Flushing data buffer to InfluxDB.";
|
||||||
|
|
||||||
String body = boost::algorithm::join(m_DataBuffer, "\n");
|
String body = boost::algorithm::join(m_DataBuffer, "\n");
|
||||||
m_DataBuffer.clear();
|
m_DataBuffer.clear();
|
||||||
|
|
||||||
|
@ -34,7 +34,6 @@ protected:
|
|||||||
void OnConfigLoaded() override;
|
void OnConfigLoaded() override;
|
||||||
void Resume() override;
|
void Resume() override;
|
||||||
void Pause() override;
|
void Pause() override;
|
||||||
void Stop(bool runtimeDeleted) override;
|
|
||||||
|
|
||||||
private:
|
private:
|
||||||
WorkQueue m_WorkQueue{10000000, 1};
|
WorkQueue m_WorkQueue{10000000, 1};
|
||||||
|
@ -9,7 +9,7 @@ namespace icinga
|
|||||||
|
|
||||||
class InfluxdbWriter : ConfigObject
|
class InfluxdbWriter : ConfigObject
|
||||||
{
|
{
|
||||||
activation_priority 80;
|
activation_priority 100;
|
||||||
|
|
||||||
[config, required] String host {
|
[config, required] String host {
|
||||||
default {{{ return "127.0.0.1"; }}}
|
default {{{ return "127.0.0.1"; }}}
|
||||||
|
@ -68,8 +68,11 @@ void OpenTsdbWriter::Resume()
|
|||||||
Service::OnNewCheckResult.connect(std::bind(&OpenTsdbWriter::CheckResultHandler, this, _1, _2));
|
Service::OnNewCheckResult.connect(std::bind(&OpenTsdbWriter::CheckResultHandler, this, _1, _2));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* Pause is equivalent to Stop, but with HA capabilities to resume at runtime. */
|
||||||
void OpenTsdbWriter::Pause()
|
void OpenTsdbWriter::Pause()
|
||||||
{
|
{
|
||||||
|
m_ReconnectTimer.reset();
|
||||||
|
|
||||||
Log(LogInformation, "OpentsdbWriter")
|
Log(LogInformation, "OpentsdbWriter")
|
||||||
<< "'" << GetName() << "' paused.";
|
<< "'" << GetName() << "' paused.";
|
||||||
|
|
||||||
|
@ -9,7 +9,7 @@ namespace icinga
|
|||||||
|
|
||||||
class OpenTsdbWriter : ConfigObject
|
class OpenTsdbWriter : ConfigObject
|
||||||
{
|
{
|
||||||
activation_priority 80;
|
activation_priority 100;
|
||||||
|
|
||||||
[config] String host {
|
[config] String host {
|
||||||
default {{{ return "127.0.0.1"; }}}
|
default {{{ return "127.0.0.1"; }}}
|
||||||
|
@ -66,6 +66,16 @@ void PerfdataWriter::Resume()
|
|||||||
|
|
||||||
void PerfdataWriter::Pause()
|
void PerfdataWriter::Pause()
|
||||||
{
|
{
|
||||||
|
m_RotationTimer.reset();
|
||||||
|
|
||||||
|
#ifdef I2_DEBUG
|
||||||
|
//m_HostOutputFile << "\n# Pause the feature" << "\n\n";
|
||||||
|
//m_ServiceOutputFile << "\n# Pause the feature" << "\n\n";
|
||||||
|
#endif /* I2_DEBUG */
|
||||||
|
|
||||||
|
/* Force a rotation closing the file stream. */
|
||||||
|
RotateAllFiles();
|
||||||
|
|
||||||
Log(LogInformation, "PerfdataWriter")
|
Log(LogInformation, "PerfdataWriter")
|
||||||
<< "'" << GetName() << "' paused.";
|
<< "'" << GetName() << "' paused.";
|
||||||
|
|
||||||
@ -108,7 +118,8 @@ void PerfdataWriter::CheckResultHandler(const Checkable::Ptr& checkable, const C
|
|||||||
String line = MacroProcessor::ResolveMacros(GetServiceFormatTemplate(), resolvers, cr, nullptr, &PerfdataWriter::EscapeMacroMetric);
|
String line = MacroProcessor::ResolveMacros(GetServiceFormatTemplate(), resolvers, cr, nullptr, &PerfdataWriter::EscapeMacroMetric);
|
||||||
|
|
||||||
{
|
{
|
||||||
ObjectLock olock(this);
|
boost::mutex::scoped_lock lock(m_StreamMutex);
|
||||||
|
|
||||||
if (!m_ServiceOutputFile.good())
|
if (!m_ServiceOutputFile.good())
|
||||||
return;
|
return;
|
||||||
|
|
||||||
@ -118,7 +129,8 @@ void PerfdataWriter::CheckResultHandler(const Checkable::Ptr& checkable, const C
|
|||||||
String line = MacroProcessor::ResolveMacros(GetHostFormatTemplate(), resolvers, cr, nullptr, &PerfdataWriter::EscapeMacroMetric);
|
String line = MacroProcessor::ResolveMacros(GetHostFormatTemplate(), resolvers, cr, nullptr, &PerfdataWriter::EscapeMacroMetric);
|
||||||
|
|
||||||
{
|
{
|
||||||
ObjectLock olock(this);
|
boost::mutex::scoped_lock lock(m_StreamMutex);
|
||||||
|
|
||||||
if (!m_HostOutputFile.good())
|
if (!m_HostOutputFile.good())
|
||||||
return;
|
return;
|
||||||
|
|
||||||
@ -129,13 +141,20 @@ void PerfdataWriter::CheckResultHandler(const Checkable::Ptr& checkable, const C
|
|||||||
|
|
||||||
void PerfdataWriter::RotateFile(std::ofstream& output, const String& temp_path, const String& perfdata_path)
|
void PerfdataWriter::RotateFile(std::ofstream& output, const String& temp_path, const String& perfdata_path)
|
||||||
{
|
{
|
||||||
ObjectLock olock(this);
|
Log(LogDebug, "PerfdataWriter")
|
||||||
|
<< "Rotating perfdata files.";
|
||||||
|
|
||||||
|
boost::mutex::scoped_lock lock(m_StreamMutex);
|
||||||
|
|
||||||
if (output.good()) {
|
if (output.good()) {
|
||||||
output.close();
|
output.close();
|
||||||
|
|
||||||
if (Utility::PathExists(temp_path)) {
|
if (Utility::PathExists(temp_path)) {
|
||||||
String finalFile = perfdata_path + "." + Convert::ToString((long)Utility::GetTime());
|
String finalFile = perfdata_path + "." + Convert::ToString((long)Utility::GetTime());
|
||||||
|
|
||||||
|
Log(LogDebug, "PerfdataWriter")
|
||||||
|
<< "Closed output file and renaming into '" << finalFile << "'.";
|
||||||
|
|
||||||
if (rename(temp_path.CStr(), finalFile.CStr()) < 0) {
|
if (rename(temp_path.CStr(), finalFile.CStr()) < 0) {
|
||||||
BOOST_THROW_EXCEPTION(posix_error()
|
BOOST_THROW_EXCEPTION(posix_error()
|
||||||
<< boost::errinfo_api_function("rename")
|
<< boost::errinfo_api_function("rename")
|
||||||
@ -147,9 +166,10 @@ void PerfdataWriter::RotateFile(std::ofstream& output, const String& temp_path,
|
|||||||
|
|
||||||
output.open(temp_path.CStr());
|
output.open(temp_path.CStr());
|
||||||
|
|
||||||
if (!output.good())
|
if (!output.good()) {
|
||||||
Log(LogWarning, "PerfdataWriter")
|
Log(LogWarning, "PerfdataWriter")
|
||||||
<< "Could not open perfdata file '" << temp_path << "' for writing. Perfdata will be lost.";
|
<< "Could not open perfdata file '" << temp_path << "' for writing. Perfdata will be lost.";
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void PerfdataWriter::RotationTimerHandler()
|
void PerfdataWriter::RotationTimerHandler()
|
||||||
@ -157,6 +177,11 @@ void PerfdataWriter::RotationTimerHandler()
|
|||||||
if (IsPaused())
|
if (IsPaused())
|
||||||
return;
|
return;
|
||||||
|
|
||||||
|
RotateAllFiles();
|
||||||
|
}
|
||||||
|
|
||||||
|
void PerfdataWriter::RotateAllFiles()
|
||||||
|
{
|
||||||
RotateFile(m_ServiceOutputFile, GetServiceTempPath(), GetServicePerfdataPath());
|
RotateFile(m_ServiceOutputFile, GetServiceTempPath(), GetServicePerfdataPath());
|
||||||
RotateFile(m_HostOutputFile, GetHostTempPath(), GetHostPerfdataPath());
|
RotateFile(m_HostOutputFile, GetHostTempPath(), GetHostPerfdataPath());
|
||||||
}
|
}
|
||||||
|
@ -34,14 +34,16 @@ protected:
|
|||||||
void Pause() override;
|
void Pause() override;
|
||||||
|
|
||||||
private:
|
private:
|
||||||
|
Timer::Ptr m_RotationTimer;
|
||||||
|
std::ofstream m_ServiceOutputFile;
|
||||||
|
std::ofstream m_HostOutputFile;
|
||||||
|
boost::mutex m_StreamMutex;
|
||||||
|
|
||||||
void CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr);
|
void CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr);
|
||||||
static Value EscapeMacroMetric(const Value& value);
|
static Value EscapeMacroMetric(const Value& value);
|
||||||
|
|
||||||
Timer::Ptr m_RotationTimer;
|
|
||||||
void RotationTimerHandler();
|
void RotationTimerHandler();
|
||||||
|
void RotateAllFiles();
|
||||||
std::ofstream m_ServiceOutputFile;
|
|
||||||
std::ofstream m_HostOutputFile;
|
|
||||||
void RotateFile(std::ofstream& output, const String& temp_path, const String& perfdata_path);
|
void RotateFile(std::ofstream& output, const String& temp_path, const String& perfdata_path);
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -10,7 +10,7 @@ namespace icinga
|
|||||||
|
|
||||||
class PerfdataWriter : ConfigObject
|
class PerfdataWriter : ConfigObject
|
||||||
{
|
{
|
||||||
activation_priority 80;
|
activation_priority 100;
|
||||||
|
|
||||||
[config] String host_perfdata_path {
|
[config] String host_perfdata_path {
|
||||||
default {{{ return Configuration::SpoolDir + "/perfdata/host-perfdata"; }}}
|
default {{{ return Configuration::SpoolDir + "/perfdata/host-perfdata"; }}}
|
||||||
|
@ -218,7 +218,7 @@ void ConfigPackageUtility::AsyncTryActivateStage(const String& packageName, cons
|
|||||||
args->Add("ActiveStageOverride=" + packageName + ":" + stageName);
|
args->Add("ActiveStageOverride=" + packageName + ":" + stageName);
|
||||||
|
|
||||||
Process::Ptr process = new Process(Process::PrepareCommand(args));
|
Process::Ptr process = new Process(Process::PrepareCommand(args));
|
||||||
process->SetTimeout(300);
|
process->SetTimeout(Configuration::ReloadTimeout);
|
||||||
process->Run(std::bind(&TryActivateStageCallback, _1, packageName, stageName, reload));
|
process->Run(std::bind(&TryActivateStageCallback, _1, packageName, stageName, reload));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user