cluster: Allow relaying config messages.

This commit is contained in:
Gunnar Beutner 2013-09-06 08:48:15 +02:00
parent 0fec580e50
commit 2207524333
1 changed files with 9 additions and 4 deletions

View File

@ -211,11 +211,11 @@ void ClusterComponent::AddConnection(const String& node, const String& service)
void ClusterComponent::RelayMessage(const Endpoint::Ptr& except, const Dictionary::Ptr& message, bool persistent) void ClusterComponent::RelayMessage(const Endpoint::Ptr& except, const Dictionary::Ptr& message, bool persistent)
{ {
message->Set("ts", Utility::GetTime()); double ts = Utility::GetTime();
message->Set("ts", ts);
if (persistent) { if (persistent) {
Dictionary::Ptr pmessage = boost::make_shared<Dictionary>(); Dictionary::Ptr pmessage = boost::make_shared<Dictionary>();
double ts = Utility::GetTime();
pmessage->Set("timestamp", ts); pmessage->Set("timestamp", ts);
if (except) if (except)
@ -422,6 +422,7 @@ void ClusterComponent::NewClientHandler(const Socket::Ptr& client, TlsRole role)
} }
Dictionary::Ptr params = boost::make_shared<Dictionary>(); Dictionary::Ptr params = boost::make_shared<Dictionary>();
params->Set("identity", GetIdentity());
params->Set("config_files", config); params->Set("config_files", config);
Dictionary::Ptr message = boost::make_shared<Dictionary>(); Dictionary::Ptr message = boost::make_shared<Dictionary>();
@ -783,6 +784,8 @@ void ClusterComponent::MessageHandler(const Endpoint::Ptr& sender, const Diction
sender->SendMessage(lmessage); sender->SendMessage(lmessage);
sender->SetRemoteLogPosition(message->Get("ts")); sender->SetRemoteLogPosition(message->Get("ts"));
Log(LogInformation, "cluster", "Acknowledging log position: " + Utility::FormatDateTime("%Y/%m/%d %H:%M:%S", message->Get("ts")));
} }
if (message->Get("method") == "cluster::HeartBeat") { if (message->Get("method") == "cluster::HeartBeat") {
@ -993,12 +996,14 @@ void ClusterComponent::MessageHandler(const Endpoint::Ptr& sender, const Diction
} }
} }
String identity = params->Get("identity");
if (!accept) { if (!accept) {
Log(LogWarning, "cluster", "Ignoring cluster::Config message from endpoint '" + sender->GetName() + "'."); Log(LogWarning, "cluster", "Ignoring cluster::Config message from endpoint '" + sender->GetName() + "' for identity '" + identity + "'.");
return; return;
} }
String dir = GetClusterDir() + "config/" + SHA256(sender->GetName()); String dir = GetClusterDir() + "config/" + SHA256(identity);
Log(LogInformation, "cluster", "Creating cluster config directory: " + dir); Log(LogInformation, "cluster", "Creating cluster config directory: " + dir);
if (mkdir(dir.CStr(), 0700) < 0 && errno != EEXIST) { if (mkdir(dir.CStr(), 0700) < 0 && errno != EEXIST) {
BOOST_THROW_EXCEPTION(posix_error() BOOST_THROW_EXCEPTION(posix_error()