cluster: Ignore old messages.

This commit is contained in:
Gunnar Beutner 2013-09-06 14:05:50 +02:00
parent a00a28d31b
commit 82afdc92c8
1 changed files with 18 additions and 10 deletions

View File

@ -769,20 +769,28 @@ void ClusterComponent::AcknowledgementClearedHandler(const Service::Ptr& service
void ClusterComponent::MessageHandler(const Endpoint::Ptr& sender, const Dictionary::Ptr& message)
{
if (message->Contains("ts") && sender->GetRemoteLogPosition() + 10 < message->Get("ts")) {
Dictionary::Ptr lparams = boost::make_shared<Dictionary>();
lparams->Set("log_position", message->Get("ts"));
if (message->Contains("ts")) {
double ts = message->Get("ts");
Dictionary::Ptr lmessage = boost::make_shared<Dictionary>();
lmessage->Set("jsonrpc", "2.0");
lmessage->Set("method", "cluster::SetLogPosition");
lmessage->Set("params", lparams);
/* ignore old messages */
if (ts < sender->GetRemoteLogPosition())
return;
sender->SendMessage(lmessage);
if (sender->GetRemoteLogPosition() + 10 < ts) {
Dictionary::Ptr lparams = boost::make_shared<Dictionary>();
lparams->Set("log_position", message->Get("ts"));
sender->SetRemoteLogPosition(message->Get("ts"));
Dictionary::Ptr lmessage = boost::make_shared<Dictionary>();
lmessage->Set("jsonrpc", "2.0");
lmessage->Set("method", "cluster::SetLogPosition");
lmessage->Set("params", lparams);
Log(LogInformation, "cluster", "Acknowledging log position for identity '" + sender->GetName() + "': " + Utility::FormatDateTime("%Y/%m/%d %H:%M:%S", message->Get("ts")));
sender->SendMessage(lmessage);
sender->SetRemoteLogPosition(message->Get("ts"));
Log(LogInformation, "cluster", "Acknowledging log position for identity '" + sender->GetName() + "': " + Utility::FormatDateTime("%Y/%m/%d %H:%M:%S", message->Get("ts")));
}
}
RelayMessage(sender, message, true);