Merge pull request #9322 from Icinga/perfdata-resume-signal

Perfdata writers: disconnect handlers from signals in Pause()
This commit is contained in:
Julian Brost 2022-04-07 15:52:00 +02:00 committed by GitHub
commit 340b803a8f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 40 additions and 16 deletions

View File

@ -95,16 +95,17 @@ void ElasticsearchWriter::Resume()
m_FlushTimer->Reschedule(0); m_FlushTimer->Reschedule(0);
/* Register for new metrics. */ /* Register for new metrics. */
Checkable::OnNewCheckResult.connect([this](const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, const MessageOrigin::Ptr&) { m_HandleCheckResults = Checkable::OnNewCheckResult.connect([this](const Checkable::Ptr& checkable,
const CheckResult::Ptr& cr, const MessageOrigin::Ptr&) {
CheckResultHandler(checkable, cr); CheckResultHandler(checkable, cr);
}); });
Checkable::OnStateChange.connect([this](const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, StateType type, m_HandleStateChanges = Checkable::OnStateChange.connect([this](const Checkable::Ptr& checkable,
const MessageOrigin::Ptr&) { const CheckResult::Ptr& cr, StateType type, const MessageOrigin::Ptr&) {
StateChangeHandler(checkable, cr, type); StateChangeHandler(checkable, cr, type);
}); });
Checkable::OnNotificationSentToAllUsers.connect([this](const Notification::Ptr& notification, const Checkable::Ptr& checkable, m_HandleNotifications = Checkable::OnNotificationSentToAllUsers.connect([this](const Notification::Ptr& notification,
const std::set<User::Ptr>& users, const NotificationType& type, const CheckResult::Ptr& cr, const String& author, const Checkable::Ptr& checkable, const std::set<User::Ptr>& users, const NotificationType& type,
const String& text, const MessageOrigin::Ptr&) { const CheckResult::Ptr& cr, const String& author, const String& text, const MessageOrigin::Ptr&) {
NotificationSentToAllUsersHandler(notification, checkable, users, type, cr, author, text); NotificationSentToAllUsersHandler(notification, checkable, users, type, cr, author, text);
}); });
} }
@ -112,6 +113,10 @@ void ElasticsearchWriter::Resume()
/* Pause is equivalent to Stop, but with HA capabilities to resume at runtime. */ /* Pause is equivalent to Stop, but with HA capabilities to resume at runtime. */
void ElasticsearchWriter::Pause() void ElasticsearchWriter::Pause()
{ {
m_HandleCheckResults.disconnect();
m_HandleStateChanges.disconnect();
m_HandleNotifications.disconnect();
Flush(); Flush();
m_WorkQueue.Join(); m_WorkQueue.Join();
Flush(); Flush();

View File

@ -31,6 +31,7 @@ protected:
private: private:
String m_EventPrefix; String m_EventPrefix;
WorkQueue m_WorkQueue{10000000, 1}; WorkQueue m_WorkQueue{10000000, 1};
boost::signals2::connection m_HandleCheckResults, m_HandleStateChanges, m_HandleNotifications;
Timer::Ptr m_FlushTimer; Timer::Ptr m_FlushTimer;
std::vector<String> m_DataBuffer; std::vector<String> m_DataBuffer;
std::mutex m_DataBufferMutex; std::mutex m_DataBufferMutex;

View File

@ -90,16 +90,17 @@ void GelfWriter::Resume()
m_ReconnectTimer->Reschedule(0); m_ReconnectTimer->Reschedule(0);
/* Register event handlers. */ /* Register event handlers. */
Checkable::OnNewCheckResult.connect([this](const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, const MessageOrigin::Ptr&) { m_HandleCheckResults = Checkable::OnNewCheckResult.connect([this](const Checkable::Ptr& checkable,
const CheckResult::Ptr& cr, const MessageOrigin::Ptr&) {
CheckResultHandler(checkable, cr); CheckResultHandler(checkable, cr);
}); });
Checkable::OnNotificationSentToUser.connect([this](const Notification::Ptr& notification, const Checkable::Ptr& checkable, m_HandleNotifications = Checkable::OnNotificationSentToUser.connect([this](const Notification::Ptr& notification,
const User::Ptr& user, const NotificationType& type, const CheckResult::Ptr& cr, const String& author, const Checkable::Ptr& checkable, const User::Ptr& user, const NotificationType& type, const CheckResult::Ptr& cr,
const String& commentText, const String& commandName, const MessageOrigin::Ptr&) { const String& author, const String& commentText, const String& commandName, const MessageOrigin::Ptr&) {
NotificationToUserHandler(notification, checkable, user, type, cr, author, commentText, commandName); NotificationToUserHandler(notification, checkable, user, type, cr, author, commentText, commandName);
}); });
Checkable::OnStateChange.connect([this](const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, StateType type, m_HandleStateChanges = Checkable::OnStateChange.connect([this](const Checkable::Ptr& checkable,
const MessageOrigin::Ptr&) { const CheckResult::Ptr& cr, StateType type, const MessageOrigin::Ptr&) {
StateChangeHandler(checkable, cr, type); StateChangeHandler(checkable, cr, type);
}); });
} }
@ -107,6 +108,10 @@ void GelfWriter::Resume()
/* Pause is equivalent to Stop, but with HA capabilities to resume at runtime. */ /* Pause is equivalent to Stop, but with HA capabilities to resume at runtime. */
void GelfWriter::Pause() void GelfWriter::Pause()
{ {
m_HandleCheckResults.disconnect();
m_HandleNotifications.disconnect();
m_HandleStateChanges.disconnect();
m_ReconnectTimer.reset(); m_ReconnectTimer.reset();
try { try {

View File

@ -36,6 +36,7 @@ private:
OptionalTlsStream m_Stream; OptionalTlsStream m_Stream;
WorkQueue m_WorkQueue{10000000, 1}; WorkQueue m_WorkQueue{10000000, 1};
boost::signals2::connection m_HandleCheckResults, m_HandleNotifications, m_HandleStateChanges;
Timer::Ptr m_ReconnectTimer; Timer::Ptr m_ReconnectTimer;
void CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr); void CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr);

View File

@ -95,7 +95,8 @@ void GraphiteWriter::Resume()
m_ReconnectTimer->Reschedule(0); m_ReconnectTimer->Reschedule(0);
/* Register event handlers. */ /* Register event handlers. */
Checkable::OnNewCheckResult.connect([this](const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, const MessageOrigin::Ptr&) { m_HandleCheckResults = Checkable::OnNewCheckResult.connect([this](const Checkable::Ptr& checkable,
const CheckResult::Ptr& cr, const MessageOrigin::Ptr&) {
CheckResultHandler(checkable, cr); CheckResultHandler(checkable, cr);
}); });
} }
@ -105,6 +106,7 @@ void GraphiteWriter::Resume()
*/ */
void GraphiteWriter::Pause() void GraphiteWriter::Pause()
{ {
m_HandleCheckResults.disconnect();
m_ReconnectTimer.reset(); m_ReconnectTimer.reset();
try { try {

View File

@ -41,6 +41,7 @@ private:
std::mutex m_StreamMutex; std::mutex m_StreamMutex;
WorkQueue m_WorkQueue{10000000, 1}; WorkQueue m_WorkQueue{10000000, 1};
boost::signals2::connection m_HandleCheckResults;
Timer::Ptr m_ReconnectTimer; Timer::Ptr m_ReconnectTimer;
void CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr); void CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr);

View File

@ -97,7 +97,8 @@ void InfluxdbCommonWriter::Resume()
m_FlushTimer->Reschedule(0); m_FlushTimer->Reschedule(0);
/* Register for new metrics. */ /* Register for new metrics. */
Checkable::OnNewCheckResult.connect([this](const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, const MessageOrigin::Ptr&) { m_HandleCheckResults = Checkable::OnNewCheckResult.connect([this](const Checkable::Ptr& checkable,
const CheckResult::Ptr& cr, const MessageOrigin::Ptr&) {
CheckResultHandler(checkable, cr); CheckResultHandler(checkable, cr);
}); });
} }
@ -105,6 +106,8 @@ void InfluxdbCommonWriter::Resume()
/* Pause is equivalent to Stop, but with HA capabilities to resume at runtime. */ /* Pause is equivalent to Stop, but with HA capabilities to resume at runtime. */
void InfluxdbCommonWriter::Pause() void InfluxdbCommonWriter::Pause()
{ {
m_HandleCheckResults.disconnect();
/* Force a flush. */ /* Force a flush. */
Log(LogDebug, GetReflectionType()->GetName()) Log(LogDebug, GetReflectionType()->GetName())
<< "Processing pending tasks and flushing data buffers."; << "Processing pending tasks and flushing data buffers.";

View File

@ -47,6 +47,7 @@ protected:
virtual Url::Ptr AssembleUrl() = 0; virtual Url::Ptr AssembleUrl() = 0;
private: private:
boost::signals2::connection m_HandleCheckResults;
Timer::Ptr m_FlushTimer; Timer::Ptr m_FlushTimer;
WorkQueue m_WorkQueue{10000000, 1}; WorkQueue m_WorkQueue{10000000, 1};
std::vector<String> m_DataBuffer; std::vector<String> m_DataBuffer;

View File

@ -81,7 +81,7 @@ void OpenTsdbWriter::Resume()
m_ReconnectTimer->Start(); m_ReconnectTimer->Start();
m_ReconnectTimer->Reschedule(0); m_ReconnectTimer->Reschedule(0);
Service::OnNewCheckResult.connect([this](const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, const MessageOrigin::Ptr&) { m_HandleCheckResults = Service::OnNewCheckResult.connect([this](const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, const MessageOrigin::Ptr&) {
CheckResultHandler(checkable, cr); CheckResultHandler(checkable, cr);
}); });
} }
@ -91,6 +91,7 @@ void OpenTsdbWriter::Resume()
*/ */
void OpenTsdbWriter::Pause() void OpenTsdbWriter::Pause()
{ {
m_HandleCheckResults.disconnect();
m_ReconnectTimer.reset(); m_ReconnectTimer.reset();
Log(LogInformation, "OpentsdbWriter") Log(LogInformation, "OpentsdbWriter")

View File

@ -37,6 +37,7 @@ protected:
private: private:
Shared<AsioTcpStream>::Ptr m_Stream; Shared<AsioTcpStream>::Ptr m_Stream;
boost::signals2::connection m_HandleCheckResults;
Timer::Ptr m_ReconnectTimer; Timer::Ptr m_ReconnectTimer;
Dictionary::Ptr m_ServiceConfigTemplate; Dictionary::Ptr m_ServiceConfigTemplate;

View File

@ -53,7 +53,8 @@ void PerfdataWriter::Resume()
Log(LogInformation, "PerfdataWriter") Log(LogInformation, "PerfdataWriter")
<< "'" << GetName() << "' resumed."; << "'" << GetName() << "' resumed.";
Checkable::OnNewCheckResult.connect([this](const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, const MessageOrigin::Ptr&) { m_HandleCheckResults = Checkable::OnNewCheckResult.connect([this](const Checkable::Ptr& checkable,
const CheckResult::Ptr& cr, const MessageOrigin::Ptr&) {
CheckResultHandler(checkable, cr); CheckResultHandler(checkable, cr);
}); });
@ -68,6 +69,7 @@ void PerfdataWriter::Resume()
void PerfdataWriter::Pause() void PerfdataWriter::Pause()
{ {
m_HandleCheckResults.disconnect();
m_RotationTimer.reset(); m_RotationTimer.reset();
#ifdef I2_DEBUG #ifdef I2_DEBUG

View File

@ -34,6 +34,7 @@ protected:
void Pause() override; void Pause() override;
private: private:
boost::signals2::connection m_HandleCheckResults;
Timer::Ptr m_RotationTimer; Timer::Ptr m_RotationTimer;
std::ofstream m_ServiceOutputFile; std::ofstream m_ServiceOutputFile;
std::ofstream m_HostOutputFile; std::ofstream m_HostOutputFile;