diff --git a/components/cluster/clustercomponent.cpp b/components/cluster/clustercomponent.cpp index 518aba893..41150e4eb 100644 --- a/components/cluster/clustercomponent.cpp +++ b/components/cluster/clustercomponent.cpp @@ -325,6 +325,8 @@ void ClusterComponent::LogGlobHandler(std::vector& files, const String& fil void ClusterComponent::ReplayLog(const Endpoint::Ptr& endpoint, const Stream::Ptr& stream) { int count = 0; + double peer_ts = endpoint->GetLocalLogPosition(); + bool last_sync = false; ASSERT(!OwnsLock()); @@ -338,15 +340,17 @@ void ClusterComponent::ReplayLog(const Endpoint::Ptr& endpoint, const Stream::Pt Utility::Glob(GetClusterDir() + "log/*", boost::bind(&ClusterComponent::LogGlobHandler, boost::ref(files), _1)); std::sort(files.begin(), files.end()); - if (files.size() > 1) { + if (count == 0 || count > 50000) { OpenLogFile(); olock.Unlock(); + } else { + last_sync = true; } BOOST_FOREACH(int ts, files) { String path = GetClusterDir() + "log/" + Convert::ToString(ts); - if (ts < endpoint->GetLocalLogPosition()) + if (ts < peer_ts) continue; Log(LogInformation, "cluster", "Replaying log: " + path); @@ -367,7 +371,7 @@ void ClusterComponent::ReplayLog(const Endpoint::Ptr& endpoint, const Stream::Pt Dictionary::Ptr pmessage = Value::Deserialize(message); - if (pmessage->Get("timestamp") < endpoint->GetLocalLogPosition()) + if (pmessage->Get("timestamp") < peer_ts) continue; if (pmessage->Get("except") == endpoint->GetName()) @@ -375,6 +379,8 @@ void ClusterComponent::ReplayLog(const Endpoint::Ptr& endpoint, const Stream::Pt NetString::WriteStringToStream(stream, pmessage->Get("message")); count++; + + peer_ts = pmessage->Get("timestamp"); } lstream->Close(); @@ -382,7 +388,7 @@ void ClusterComponent::ReplayLog(const Endpoint::Ptr& endpoint, const Stream::Pt Log(LogInformation, "cluster", "Replayed " + Convert::ToString(count) + " messages."); - if (files.size() == 1) { + if (last_sync) { ObjectLock olock2(endpoint); endpoint->SetSyncing(false);