Don't send heartbeats during log replay

fixes #8461
refs #8485
This commit is contained in:
Gunnar Beutner 2015-02-26 14:59:39 +01:00
parent fd090e057b
commit 4e8c5706e1
4 changed files with 29 additions and 4 deletions

View File

@ -211,6 +211,7 @@ void TlsStream::OnEvent(int revents)
m_SSL.reset(); m_SSL.reset();
m_Socket->Close(); m_Socket->Close();
m_Socket.reset();
m_Eof = true; m_Eof = true;
@ -222,6 +223,7 @@ void TlsStream::OnEvent(int revents)
m_SSL.reset(); m_SSL.reset();
m_Socket->Close(); m_Socket->Close();
m_Socket.reset();
m_ErrorCode = ERR_peek_error(); m_ErrorCode = ERR_peek_error();
m_ErrorOccurred = true; m_ErrorOccurred = true;
@ -284,6 +286,8 @@ void TlsStream::Write(const void *buffer, size_t count)
*/ */
void TlsStream::Close(void) void TlsStream::Close(void)
{ {
SocketEvents::Unregister();
boost::mutex::scoped_lock lock(m_Mutex); boost::mutex::scoped_lock lock(m_Mutex);
if (!m_SSL) if (!m_SSL)
@ -292,9 +296,8 @@ void TlsStream::Close(void)
(void) SSL_shutdown(m_SSL.get()); (void) SSL_shutdown(m_SSL.get());
m_SSL.reset(); m_SSL.reset();
SocketEvents::Unregister();
m_Socket->Close(); m_Socket->Close();
m_Socket.reset();
m_Eof = true; m_Eof = true;
} }

View File

@ -45,6 +45,12 @@ void ApiClient::HeartbeatTimerHandler(void)
{ {
BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjectsByType<Endpoint>()) { BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjectsByType<Endpoint>()) {
BOOST_FOREACH(const ApiClient::Ptr& client, endpoint->GetClients()) { BOOST_FOREACH(const ApiClient::Ptr& client, endpoint->GetClients()) {
if (endpoint->GetSyncing()) {
Log(LogInformation, "ApiClient")
<< "Not sending heartbeat for endpoint '" << endpoint->GetName() << "' because we're replaying the log for it.";
continue;
}
if (client->m_NextHeartbeat != 0 && client->m_NextHeartbeat < Utility::GetTime()) { if (client->m_NextHeartbeat != 0 && client->m_NextHeartbeat < Utility::GetTime()) {
Log(LogWarning, "ApiClient") Log(LogWarning, "ApiClient")
<< "Client for endpoint '" << endpoint->GetName() << "' has requested " << "Client for endpoint '" << endpoint->GetName() << "' has requested "

View File

@ -259,7 +259,7 @@ Value RequestCertificateHandler(const MessageOrigin& origin, const Dictionary::P
void ApiClient::TimeoutTimerHandler(void) void ApiClient::TimeoutTimerHandler(void)
{ {
if (m_Seen < Utility::GetTime() - 60) { if (m_Seen < Utility::GetTime() - 60 && !m_Endpoint->GetSyncing()) {
/* Obtain a strong reference to ourselves because Disconnect otherwise removes the last reference */ /* Obtain a strong reference to ourselves because Disconnect otherwise removes the last reference */
ApiClient::Ptr self = this; ApiClient::Ptr self = this;

View File

@ -20,6 +20,7 @@
#include "remote/apilistener.hpp" #include "remote/apilistener.hpp"
#include "remote/apiclient.hpp" #include "remote/apiclient.hpp"
#include "remote/endpoint.hpp" #include "remote/endpoint.hpp"
#include "remote/jsonrpc.hpp"
#include "base/convert.hpp" #include "base/convert.hpp"
#include "base/netstring.hpp" #include "base/netstring.hpp"
#include "base/json.hpp" #include "base/json.hpp"
@ -659,6 +660,7 @@ void ApiListener::ReplayLog(const ApiClient::Ptr& client)
int count = -1; int count = -1;
double peer_ts = endpoint->GetLocalLogPosition(); double peer_ts = endpoint->GetLocalLogPosition();
double logpos_ts = peer_ts;
bool last_sync = false; bool last_sync = false;
Endpoint::Ptr target_endpoint = client->GetEndpoint(); Endpoint::Ptr target_endpoint = client->GetEndpoint();
@ -697,7 +699,7 @@ void ApiListener::ReplayLog(const ApiClient::Ptr& client)
Log(LogNotice, "ApiListener") Log(LogNotice, "ApiListener")
<< "Replaying log: " << path; << "Replaying log: " << path;
std::fstream *fp = new std::fstream(path.CStr(), std::fstream::in); std::fstream *fp = new std::fstream(path.CStr(), std::fstream::in | std::fstream::binary);
StdioStream::Ptr logStream = new StdioStream(fp, true); StdioStream::Ptr logStream = new StdioStream(fp, true);
String message; String message;
@ -747,6 +749,20 @@ void ApiListener::ReplayLog(const ApiClient::Ptr& client)
count++; count++;
peer_ts = pmessage->Get("timestamp"); peer_ts = pmessage->Get("timestamp");
if (ts > logpos_ts + 10) {
logpos_ts = ts;
Dictionary::Ptr lparams = new Dictionary();
lparams->Set("log_position", logpos_ts);
Dictionary::Ptr lmessage = new Dictionary();
lmessage->Set("jsonrpc", "2.0");
lmessage->Set("method", "log::SetLogPosition");
lmessage->Set("params", lparams);
JsonRpc::SendMessage(client->GetStream(), lmessage);
}
} }
logStream->Close(); logStream->Close();