Use a work queue for replaying the cluster log

refs #10713
This commit is contained in:
Michael Friedrich 2015-11-25 13:11:41 +01:00 committed by Gunnar Beutner
parent 93834623a0
commit 91e1e9d93e
3 changed files with 33 additions and 27 deletions

View File

@ -48,7 +48,7 @@ REGISTER_STATSFUNCTION(ApiListener, &ApiListener::StatsFunc);
REGISTER_APIFUNCTION(Hello, icinga, &ApiListener::HelloAPIHandler); REGISTER_APIFUNCTION(Hello, icinga, &ApiListener::HelloAPIHandler);
ApiListener::ApiListener(void) ApiListener::ApiListener(void)
: m_LogMessageCount(0) : m_SyncQueue(0, 4), m_LogMessageCount(0)
{ } { }
void ApiListener::OnConfigLoaded(void) void ApiListener::OnConfigLoaded(void)
@ -373,26 +373,8 @@ void ApiListener::NewClientHandlerInternal(const Socket::Ptr& client, const Stri
if (endpoint) { if (endpoint) {
endpoint->AddClient(aclient); endpoint->AddClient(aclient);
if (need_sync) { if (need_sync)
{ m_SyncQueue.Enqueue(boost::bind(&ApiListener::SyncClient, this, aclient, endpoint));
ObjectLock olock(endpoint);
endpoint->SetSyncing(true);
}
Log(LogInformation, "ApiListener")
<< "Sending updates for endpoint '" << endpoint->GetName() << "'.";
/* sync zone file config */
SendConfigUpdate(aclient);
/* sync runtime config */
SendRuntimeConfigObjects(aclient);
Log(LogInformation, "ApiListener")
<< "Finished sending updates for endpoint '" << endpoint->GetName() << "'.";
ReplayLog(aclient);
}
} else } else
AddAnonymousClient(aclient); AddAnonymousClient(aclient);
} else { } else {
@ -404,6 +386,33 @@ void ApiListener::NewClientHandlerInternal(const Socket::Ptr& client, const Stri
} }
} }
void ApiListener::SyncClient(const JsonRpcConnection::Ptr& aclient, const Endpoint::Ptr& endpoint)
{
try {
{
ObjectLock olock(endpoint);
endpoint->SetSyncing(true);
}
Log(LogInformation, "ApiListener")
<< "Sending updates for endpoint '" << endpoint->GetName() << "'.";
/* sync zone file config */
SendConfigUpdate(aclient);
/* sync runtime config */
SendRuntimeConfigObjects(aclient);
Log(LogInformation, "ApiListener")
<< "Finished sending updates for endpoint '" << endpoint->GetName() << "'.";
ReplayLog(aclient);
} catch (const std::exception& ex) {
Log(LogCritical, "ApiListener")
<< "Error while syncing endpoint '" << endpoint->GetName() << "': " << DiagnosticInformation(ex);
}
}
void ApiListener::ApiTimerHandler(void) void ApiListener::ApiTimerHandler(void)
{ {
double now = Utility::GetTime(); double now = Utility::GetTime();

View File

@ -113,6 +113,7 @@ private:
void ListenerThreadProc(const Socket::Ptr& server); void ListenerThreadProc(const Socket::Ptr& server);
WorkQueue m_RelayQueue; WorkQueue m_RelayQueue;
WorkQueue m_SyncQueue;
boost::mutex m_LogLock; boost::mutex m_LogLock;
Stream::Ptr m_LogFile; Stream::Ptr m_LogFile;
@ -143,6 +144,8 @@ private:
void DeleteConfigObject(const ConfigObject::Ptr& object, const MessageOrigin::Ptr& origin, void DeleteConfigObject(const ConfigObject::Ptr& object, const MessageOrigin::Ptr& origin,
const JsonRpcConnection::Ptr& client = JsonRpcConnection::Ptr()); const JsonRpcConnection::Ptr& client = JsonRpcConnection::Ptr());
void SendRuntimeConfigObjects(const JsonRpcConnection::Ptr& aclient); void SendRuntimeConfigObjects(const JsonRpcConnection::Ptr& aclient);
void SyncClient(const JsonRpcConnection::Ptr& aclient, const Endpoint::Ptr& endpoint);
}; };
} }

View File

@ -46,12 +46,6 @@ void JsonRpcConnection::HeartbeatTimerHandler(void)
{ {
BOOST_FOREACH(const Endpoint::Ptr& endpoint, ConfigType::GetObjectsByType<Endpoint>()) { BOOST_FOREACH(const Endpoint::Ptr& endpoint, ConfigType::GetObjectsByType<Endpoint>()) {
BOOST_FOREACH(const JsonRpcConnection::Ptr& client, endpoint->GetClients()) { BOOST_FOREACH(const JsonRpcConnection::Ptr& client, endpoint->GetClients()) {
if (endpoint->GetSyncing()) {
Log(LogInformation, "JsonRpcConnection")
<< "Not sending heartbeat for endpoint '" << endpoint->GetName() << "' because we're replaying the log for it.";
continue;
}
if (client->m_NextHeartbeat != 0 && client->m_NextHeartbeat < Utility::GetTime()) { if (client->m_NextHeartbeat != 0 && client->m_NextHeartbeat < Utility::GetTime()) {
Log(LogWarning, "JsonRpcConnection") Log(LogWarning, "JsonRpcConnection")
<< "Client for endpoint '" << endpoint->GetName() << "' has requested " << "Client for endpoint '" << endpoint->GetName() << "' has requested "