mirror of https://github.com/Icinga/icinga2.git
Merge pull request #8604 from Icinga/bugfix/icingadb-initial-sync-delay
IcingaDB: start initial dump in callback instead of timer
This commit is contained in:
commit
b1719883a3
|
@ -117,6 +117,7 @@ void IcingaDB::ConfigStaticInitialize()
|
||||||
|
|
||||||
void IcingaDB::UpdateAllConfigObjects()
|
void IcingaDB::UpdateAllConfigObjects()
|
||||||
{
|
{
|
||||||
|
Log(LogInformation, "IcingaDB") << "Starting initial config/status dump";
|
||||||
double startTime = Utility::GetTime();
|
double startTime = Utility::GetTime();
|
||||||
|
|
||||||
// Use a Workqueue to pack objects in parallel
|
// Use a Workqueue to pack objects in parallel
|
||||||
|
|
|
@ -63,16 +63,13 @@ void IcingaDB::Start(bool runtimeCreated)
|
||||||
m_ConfigDumpInProgress = false;
|
m_ConfigDumpInProgress = false;
|
||||||
m_ConfigDumpDone = false;
|
m_ConfigDumpDone = false;
|
||||||
|
|
||||||
m_Rcon = new RedisConnection(GetHost(), GetPort(), GetPath(), GetPassword(), GetDbIndex());
|
|
||||||
m_Rcon->Start();
|
|
||||||
|
|
||||||
m_WorkQueue.SetExceptionCallback([this](boost::exception_ptr exp) { ExceptionHandler(std::move(exp)); });
|
m_WorkQueue.SetExceptionCallback([this](boost::exception_ptr exp) { ExceptionHandler(std::move(exp)); });
|
||||||
|
|
||||||
m_ReconnectTimer = new Timer();
|
m_Rcon = new RedisConnection(GetHost(), GetPort(), GetPath(), GetPassword(), GetDbIndex());
|
||||||
m_ReconnectTimer->SetInterval(15);
|
m_Rcon->SetConnectedCallback([this](boost::asio::yield_context& yc) {
|
||||||
m_ReconnectTimer->OnTimerExpired.connect([this](const Timer * const&) { ReconnectTimerHandler(); });
|
m_WorkQueue.Enqueue([this]() { OnConnectedHandler(); });
|
||||||
m_ReconnectTimer->Start();
|
});
|
||||||
m_ReconnectTimer->Reschedule(0);
|
m_Rcon->Start();
|
||||||
|
|
||||||
m_StatsTimer = new Timer();
|
m_StatsTimer = new Timer();
|
||||||
m_StatsTimer->SetInterval(1);
|
m_StatsTimer->SetInterval(1);
|
||||||
|
@ -93,23 +90,10 @@ void IcingaDB::ExceptionHandler(boost::exception_ptr exp)
|
||||||
<< "Exception during redis operation: " << DiagnosticInformation(exp);
|
<< "Exception during redis operation: " << DiagnosticInformation(exp);
|
||||||
}
|
}
|
||||||
|
|
||||||
void IcingaDB::ReconnectTimerHandler()
|
void IcingaDB::OnConnectedHandler()
|
||||||
{
|
|
||||||
m_WorkQueue.Enqueue([this]() { TryToReconnect(); });
|
|
||||||
}
|
|
||||||
|
|
||||||
void IcingaDB::TryToReconnect()
|
|
||||||
{
|
{
|
||||||
AssertOnWorkQueue();
|
AssertOnWorkQueue();
|
||||||
|
|
||||||
if (m_ConfigDumpDone)
|
|
||||||
return;
|
|
||||||
else
|
|
||||||
m_Rcon->Start();
|
|
||||||
|
|
||||||
if (!m_Rcon || !m_Rcon->IsConnected())
|
|
||||||
return;
|
|
||||||
|
|
||||||
if (m_ConfigDumpInProgress || m_ConfigDumpDone)
|
if (m_ConfigDumpInProgress || m_ConfigDumpDone)
|
||||||
return;
|
return;
|
||||||
|
|
||||||
|
|
|
@ -35,8 +35,7 @@ public:
|
||||||
virtual void Stop(bool runtimeRemoved) override;
|
virtual void Stop(bool runtimeRemoved) override;
|
||||||
|
|
||||||
private:
|
private:
|
||||||
void ReconnectTimerHandler();
|
void OnConnectedHandler();
|
||||||
void TryToReconnect();
|
|
||||||
|
|
||||||
void PublishStatsTimerHandler();
|
void PublishStatsTimerHandler();
|
||||||
void PublishStats();
|
void PublishStats();
|
||||||
|
@ -134,7 +133,6 @@ private:
|
||||||
}
|
}
|
||||||
|
|
||||||
Timer::Ptr m_StatsTimer;
|
Timer::Ptr m_StatsTimer;
|
||||||
Timer::Ptr m_ReconnectTimer;
|
|
||||||
WorkQueue m_WorkQueue;
|
WorkQueue m_WorkQueue;
|
||||||
|
|
||||||
String m_PrefixConfigObject;
|
String m_PrefixConfigObject;
|
||||||
|
|
|
@ -241,6 +241,10 @@ void RedisConnection::Connect(asio::yield_context& yc)
|
||||||
|
|
||||||
Log(LogInformation, "IcingaDB", "Connected to Redis server");
|
Log(LogInformation, "IcingaDB", "Connected to Redis server");
|
||||||
|
|
||||||
|
if (m_ConnectedCallback) {
|
||||||
|
m_ConnectedCallback(yc);
|
||||||
|
}
|
||||||
|
|
||||||
break;
|
break;
|
||||||
} catch (const boost::coroutines::detail::forced_unwind&) {
|
} catch (const boost::coroutines::detail::forced_unwind&) {
|
||||||
throw;
|
throw;
|
||||||
|
@ -512,3 +516,14 @@ void RedisConnection::WriteOne(RedisConnection::Query& query, asio::yield_contex
|
||||||
WriteOne(m_UnixConn, query, yc);
|
WriteOne(m_UnixConn, query, yc);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Specify a callback that is run each time a connection is successfully established
|
||||||
|
*
|
||||||
|
* The callback is executed from a Boost.Asio coroutine and should therefore not perform blocking operations.
|
||||||
|
*
|
||||||
|
* @param callback Callback to execute
|
||||||
|
*/
|
||||||
|
void RedisConnection::SetConnectedCallback(std::function<void(asio::yield_context& yc)> callback) {
|
||||||
|
m_ConnectedCallback = std::move(callback);
|
||||||
|
}
|
||||||
|
|
|
@ -84,6 +84,8 @@ namespace icinga
|
||||||
void SuppressQueryKind(QueryPriority kind);
|
void SuppressQueryKind(QueryPriority kind);
|
||||||
void UnsuppressQueryKind(QueryPriority kind);
|
void UnsuppressQueryKind(QueryPriority kind);
|
||||||
|
|
||||||
|
void SetConnectedCallback(std::function<void(boost::asio::yield_context& yc)> callback);
|
||||||
|
|
||||||
private:
|
private:
|
||||||
/**
|
/**
|
||||||
* What to do with the responses to Redis queries.
|
* What to do with the responses to Redis queries.
|
||||||
|
@ -182,6 +184,8 @@ namespace icinga
|
||||||
|
|
||||||
// Indicate that there's something to send/receive
|
// Indicate that there's something to send/receive
|
||||||
AsioConditionVariable m_QueuedWrites, m_QueuedReads;
|
AsioConditionVariable m_QueuedWrites, m_QueuedReads;
|
||||||
|
|
||||||
|
std::function<void(boost::asio::yield_context& yc)> m_ConnectedCallback;
|
||||||
};
|
};
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
Loading…
Reference in New Issue