mirror of
https://github.com/Icinga/icinga2.git
synced 2025-09-22 01:08:04 +02:00
Perfdata writers: disconnect handlers from signals in Pause()
as they would be re-connected in Resume() (HA). Before they were still connected during pause and connected X+1 times after X split-brains (the same data was written X+1 times).
This commit is contained in:
parent
92e688b94a
commit
1eb274b550
@ -95,14 +95,18 @@ void ElasticsearchWriter::Resume()
|
||||
m_FlushTimer->Reschedule(0);
|
||||
|
||||
/* Register for new metrics. */
|
||||
Checkable::OnNewCheckResult.connect(std::bind(&ElasticsearchWriter::CheckResultHandler, this, _1, _2));
|
||||
Checkable::OnStateChange.connect(std::bind(&ElasticsearchWriter::StateChangeHandler, this, _1, _2, _3));
|
||||
Checkable::OnNotificationSentToAllUsers.connect(std::bind(&ElasticsearchWriter::NotificationSentToAllUsersHandler, this, _1, _2, _3, _4, _5, _6, _7));
|
||||
m_HandleCheckResults = Checkable::OnNewCheckResult.connect(std::bind(&ElasticsearchWriter::CheckResultHandler, this, _1, _2));
|
||||
m_HandleStateChanges = Checkable::OnStateChange.connect(std::bind(&ElasticsearchWriter::StateChangeHandler, this, _1, _2, _3));
|
||||
m_HandleNotifications = Checkable::OnNotificationSentToAllUsers.connect(std::bind(&ElasticsearchWriter::NotificationSentToAllUsersHandler, this, _1, _2, _3, _4, _5, _6, _7));
|
||||
}
|
||||
|
||||
/* Pause is equivalent to Stop, but with HA capabilities to resume at runtime. */
|
||||
void ElasticsearchWriter::Pause()
|
||||
{
|
||||
m_HandleCheckResults.disconnect();
|
||||
m_HandleStateChanges.disconnect();
|
||||
m_HandleNotifications.disconnect();
|
||||
|
||||
Flush();
|
||||
m_WorkQueue.Join();
|
||||
Flush();
|
||||
|
@ -31,6 +31,7 @@ protected:
|
||||
private:
|
||||
String m_EventPrefix;
|
||||
WorkQueue m_WorkQueue{10000000, 1};
|
||||
boost::signals2::connection m_HandleCheckResults, m_HandleStateChanges, m_HandleNotifications;
|
||||
Timer::Ptr m_FlushTimer;
|
||||
std::vector<String> m_DataBuffer;
|
||||
boost::mutex m_DataBufferMutex;
|
||||
|
@ -90,14 +90,18 @@ void GelfWriter::Resume()
|
||||
m_ReconnectTimer->Reschedule(0);
|
||||
|
||||
/* Register event handlers. */
|
||||
Checkable::OnNewCheckResult.connect(std::bind(&GelfWriter::CheckResultHandler, this, _1, _2));
|
||||
Checkable::OnNotificationSentToUser.connect(std::bind(&GelfWriter::NotificationToUserHandler, this, _1, _2, _3, _4, _5, _6, _7, _8));
|
||||
Checkable::OnStateChange.connect(std::bind(&GelfWriter::StateChangeHandler, this, _1, _2, _3));
|
||||
m_HandleCheckResults = Checkable::OnNewCheckResult.connect(std::bind(&GelfWriter::CheckResultHandler, this, _1, _2));
|
||||
m_HandleNotifications = Checkable::OnNotificationSentToUser.connect(std::bind(&GelfWriter::NotificationToUserHandler, this, _1, _2, _3, _4, _5, _6, _7, _8));
|
||||
m_HandleStateChanges = Checkable::OnStateChange.connect(std::bind(&GelfWriter::StateChangeHandler, this, _1, _2, _3));
|
||||
}
|
||||
|
||||
/* Pause is equivalent to Stop, but with HA capabilities to resume at runtime. */
|
||||
void GelfWriter::Pause()
|
||||
{
|
||||
m_HandleCheckResults.disconnect();
|
||||
m_HandleNotifications.disconnect();
|
||||
m_HandleStateChanges.disconnect();
|
||||
|
||||
m_ReconnectTimer.reset();
|
||||
|
||||
try {
|
||||
|
@ -36,6 +36,7 @@ private:
|
||||
OptionalTlsStream m_Stream;
|
||||
WorkQueue m_WorkQueue{10000000, 1};
|
||||
|
||||
boost::signals2::connection m_HandleCheckResults, m_HandleNotifications, m_HandleStateChanges;
|
||||
Timer::Ptr m_ReconnectTimer;
|
||||
|
||||
void CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr);
|
||||
|
@ -95,7 +95,7 @@ void GraphiteWriter::Resume()
|
||||
m_ReconnectTimer->Reschedule(0);
|
||||
|
||||
/* Register event handlers. */
|
||||
Checkable::OnNewCheckResult.connect(std::bind(&GraphiteWriter::CheckResultHandler, this, _1, _2));
|
||||
m_HandleCheckResults = Checkable::OnNewCheckResult.connect(std::bind(&GraphiteWriter::CheckResultHandler, this, _1, _2));
|
||||
}
|
||||
|
||||
/**
|
||||
@ -103,6 +103,7 @@ void GraphiteWriter::Resume()
|
||||
*/
|
||||
void GraphiteWriter::Pause()
|
||||
{
|
||||
m_HandleCheckResults.disconnect();
|
||||
m_ReconnectTimer.reset();
|
||||
|
||||
try {
|
||||
|
@ -41,6 +41,7 @@ private:
|
||||
boost::mutex m_StreamMutex;
|
||||
WorkQueue m_WorkQueue{10000000, 1};
|
||||
|
||||
boost::signals2::connection m_HandleCheckResults;
|
||||
Timer::Ptr m_ReconnectTimer;
|
||||
|
||||
void CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr);
|
||||
|
@ -123,12 +123,14 @@ void InfluxdbWriter::Resume()
|
||||
m_FlushTimer->Reschedule(0);
|
||||
|
||||
/* Register for new metrics. */
|
||||
Checkable::OnNewCheckResult.connect(std::bind(&InfluxdbWriter::CheckResultHandler, this, _1, _2));
|
||||
m_HandleCheckResults = Checkable::OnNewCheckResult.connect(std::bind(&InfluxdbWriter::CheckResultHandler, this, _1, _2));
|
||||
}
|
||||
|
||||
/* Pause is equivalent to Stop, but with HA capabilities to resume at runtime. */
|
||||
void InfluxdbWriter::Pause()
|
||||
{
|
||||
m_HandleCheckResults.disconnect();
|
||||
|
||||
/* Force a flush. */
|
||||
Log(LogDebug, "InfluxdbWriter")
|
||||
<< "Processing pending tasks and flushing data buffers.";
|
||||
|
@ -38,6 +38,7 @@ protected:
|
||||
void Pause() override;
|
||||
|
||||
private:
|
||||
boost::signals2::connection m_HandleCheckResults;
|
||||
Timer::Ptr m_FlushTimer;
|
||||
WorkQueue m_WorkQueue{10000000, 1};
|
||||
std::vector<String> m_DataBuffer;
|
||||
|
@ -81,7 +81,7 @@ void OpenTsdbWriter::Resume()
|
||||
m_ReconnectTimer->Start();
|
||||
m_ReconnectTimer->Reschedule(0);
|
||||
|
||||
Service::OnNewCheckResult.connect(std::bind(&OpenTsdbWriter::CheckResultHandler, this, _1, _2));
|
||||
m_HandleCheckResults = Service::OnNewCheckResult.connect(std::bind(&OpenTsdbWriter::CheckResultHandler, this, _1, _2));
|
||||
}
|
||||
|
||||
/**
|
||||
@ -89,6 +89,7 @@ void OpenTsdbWriter::Resume()
|
||||
*/
|
||||
void OpenTsdbWriter::Pause()
|
||||
{
|
||||
m_HandleCheckResults.disconnect();
|
||||
m_ReconnectTimer.reset();
|
||||
|
||||
Log(LogInformation, "OpentsdbWriter")
|
||||
|
@ -37,6 +37,7 @@ protected:
|
||||
private:
|
||||
Shared<AsioTcpStream>::Ptr m_Stream;
|
||||
|
||||
boost::signals2::connection m_HandleCheckResults;
|
||||
Timer::Ptr m_ReconnectTimer;
|
||||
|
||||
Dictionary::Ptr m_ServiceConfigTemplate;
|
||||
|
@ -53,7 +53,7 @@ void PerfdataWriter::Resume()
|
||||
Log(LogInformation, "PerfdataWriter")
|
||||
<< "'" << GetName() << "' resumed.";
|
||||
|
||||
Checkable::OnNewCheckResult.connect(std::bind(&PerfdataWriter::CheckResultHandler, this, _1, _2));
|
||||
m_HandleCheckResults = Checkable::OnNewCheckResult.connect(std::bind(&PerfdataWriter::CheckResultHandler, this, _1, _2));
|
||||
|
||||
m_RotationTimer = new Timer();
|
||||
m_RotationTimer->OnTimerExpired.connect(std::bind(&PerfdataWriter::RotationTimerHandler, this));
|
||||
@ -66,6 +66,7 @@ void PerfdataWriter::Resume()
|
||||
|
||||
void PerfdataWriter::Pause()
|
||||
{
|
||||
m_HandleCheckResults.disconnect();
|
||||
m_RotationTimer.reset();
|
||||
|
||||
#ifdef I2_DEBUG
|
||||
|
@ -34,6 +34,7 @@ protected:
|
||||
void Pause() override;
|
||||
|
||||
private:
|
||||
boost::signals2::connection m_HandleCheckResults;
|
||||
Timer::Ptr m_RotationTimer;
|
||||
std::ofstream m_ServiceOutputFile;
|
||||
std::ofstream m_HostOutputFile;
|
||||
|
Loading…
x
Reference in New Issue
Block a user