cluster: Don't replay log messages twice.

This commit is contained in:
Gunnar Beutner 2013-09-16 09:49:28 +02:00
parent e1b8e1180c
commit 644716fb1f
1 changed files with 10 additions and 4 deletions

View File

@ -325,6 +325,8 @@ void ClusterComponent::LogGlobHandler(std::vector<int>& files, const String& fil
void ClusterComponent::ReplayLog(const Endpoint::Ptr& endpoint, const Stream::Ptr& stream) void ClusterComponent::ReplayLog(const Endpoint::Ptr& endpoint, const Stream::Ptr& stream)
{ {
int count = 0; int count = 0;
double peer_ts = endpoint->GetLocalLogPosition();
bool last_sync = false;
ASSERT(!OwnsLock()); ASSERT(!OwnsLock());
@ -338,15 +340,17 @@ void ClusterComponent::ReplayLog(const Endpoint::Ptr& endpoint, const Stream::Pt
Utility::Glob(GetClusterDir() + "log/*", boost::bind(&ClusterComponent::LogGlobHandler, boost::ref(files), _1)); Utility::Glob(GetClusterDir() + "log/*", boost::bind(&ClusterComponent::LogGlobHandler, boost::ref(files), _1));
std::sort(files.begin(), files.end()); std::sort(files.begin(), files.end());
if (files.size() > 1) { if (count == 0 || count > 50000) {
OpenLogFile(); OpenLogFile();
olock.Unlock(); olock.Unlock();
} else {
last_sync = true;
} }
BOOST_FOREACH(int ts, files) { BOOST_FOREACH(int ts, files) {
String path = GetClusterDir() + "log/" + Convert::ToString(ts); String path = GetClusterDir() + "log/" + Convert::ToString(ts);
if (ts < endpoint->GetLocalLogPosition()) if (ts < peer_ts)
continue; continue;
Log(LogInformation, "cluster", "Replaying log: " + path); Log(LogInformation, "cluster", "Replaying log: " + path);
@ -367,7 +371,7 @@ void ClusterComponent::ReplayLog(const Endpoint::Ptr& endpoint, const Stream::Pt
Dictionary::Ptr pmessage = Value::Deserialize(message); Dictionary::Ptr pmessage = Value::Deserialize(message);
if (pmessage->Get("timestamp") < endpoint->GetLocalLogPosition()) if (pmessage->Get("timestamp") < peer_ts)
continue; continue;
if (pmessage->Get("except") == endpoint->GetName()) if (pmessage->Get("except") == endpoint->GetName())
@ -375,6 +379,8 @@ void ClusterComponent::ReplayLog(const Endpoint::Ptr& endpoint, const Stream::Pt
NetString::WriteStringToStream(stream, pmessage->Get("message")); NetString::WriteStringToStream(stream, pmessage->Get("message"));
count++; count++;
peer_ts = pmessage->Get("timestamp");
} }
lstream->Close(); lstream->Close();
@ -382,7 +388,7 @@ void ClusterComponent::ReplayLog(const Endpoint::Ptr& endpoint, const Stream::Pt
Log(LogInformation, "cluster", "Replayed " + Convert::ToString(count) + " messages."); Log(LogInformation, "cluster", "Replayed " + Convert::ToString(count) + " messages.");
if (files.size() == 1) { if (last_sync) {
ObjectLock olock2(endpoint); ObjectLock olock2(endpoint);
endpoint->SetSyncing(false); endpoint->SetSyncing(false);