cluster: Implement processing config messages.

This commit is contained in:
Gunnar Beutner 2013-09-05 12:09:09 +02:00
parent 4bf0f6b21f
commit 0fec580e50
2 changed files with 60 additions and 6 deletions

View File

@ -373,7 +373,7 @@ void ClusterComponent::ReplayLog(const Endpoint::Ptr& endpoint, const Stream::Pt
OpenLogFile();
}
void ClusterComponent::ConfigGlobHandler(const Dictionary::Ptr& config, const String& file)
void ClusterComponent::ConfigGlobHandler(const Dictionary::Ptr& config, const String& file, bool basename)
{
Dictionary::Ptr elem = boost::make_shared<Dictionary>();
@ -384,7 +384,7 @@ void ClusterComponent::ConfigGlobHandler(const Dictionary::Ptr& config, const St
String content((std::istreambuf_iterator<char>(fp)), std::istreambuf_iterator<char>());
elem->Set("content", content);
config->Set(file, elem);
config->Set(basename ? Utility::BaseName(file) : file, elem);
}
/**
@ -417,7 +417,7 @@ void ClusterComponent::NewClientHandler(const Socket::Ptr& client, TlsRole role)
if (configFiles) {
BOOST_FOREACH(const String& pattern, configFiles) {
Utility::Glob(pattern, boost::bind(&ClusterComponent::ConfigGlobHandler, boost::cref(config), _1));
Utility::Glob(pattern, boost::bind(&ClusterComponent::ConfigGlobHandler, boost::cref(config), _1, false));
}
}
@ -973,9 +973,9 @@ void ClusterComponent::MessageHandler(const Endpoint::Ptr& sender, const Diction
} else if (message->Get("method") == "cluster::SetLogPosition") {
sender->SetLocalLogPosition(params->Get("log_position"));
} else if (message->Get("method") == "cluster::Config") {
Dictionary::Ptr files = params->Get("config_files");
Dictionary::Ptr remoteConfig = params->Get("config_files");
if (!files)
if (!remoteConfig)
return;
Endpoint::Ptr self = Endpoint::GetByName(GetIdentity());
@ -1006,6 +1006,60 @@ void ClusterComponent::MessageHandler(const Endpoint::Ptr& sender, const Diction
<< boost::errinfo_errno(errno));
}
Dictionary::Ptr localConfig = boost::make_shared<Dictionary>();
Utility::Glob(dir + "/*", boost::bind(&ClusterComponent::ConfigGlobHandler, boost::cref(localConfig), _1, true));
bool configChange = false;
/* figure out whether config files were removed */
if (localConfig->GetLength() != remoteConfig->GetLength())
configChange = true;
String key;
Value value;
BOOST_FOREACH(boost::tie(key, value), remoteConfig) {
Dictionary::Ptr remoteFile = value;
bool writeFile = false;
String hash = SHA256(key);
String path = dir + "/" + hash;
if (!localConfig->Contains(hash))
writeFile = true;
else {
Dictionary::Ptr localFile = localConfig->Get(hash);
String localContent = localFile->Get("content");
String remoteContent = remoteFile->Get("content");
if (localContent != remoteContent)
writeFile = true;
}
if (writeFile) {
configChange = true;
Log(LogInformation, "cluster", "Updating configuration file: " + path);
std::ofstream fp(path.CStr(), std::ofstream::out | std::ostream::trunc);
fp << remoteFile->Get("content");
fp.close();
}
localConfig->Remove(hash);
}
BOOST_FOREACH(boost::tie(key, boost::tuples::ignore), localConfig) {
String path = dir + "/" + key;
Log(LogInformation, "cluster", "Removing obsolete config file: " + path);
(void) unlink(path.CStr());
configChange = true;
}
if (configChange) {
Log(LogInformation, "cluster", "Restarting after configuration change.");
Application::RequestRestart();
}
/* TODO: update files, remove old files, figure out whether we need to restart */
}
}

View File

@ -78,7 +78,7 @@ private:
void AddListener(const String& service);
void AddConnection(const String& node, const String& service);
static void ConfigGlobHandler(const Dictionary::Ptr& config, const String& file);
static void ConfigGlobHandler(const Dictionary::Ptr& config, const String& file, bool basename);
void NewClientHandler(const Socket::Ptr& client, TlsRole role);
void ListenerThreadProc(const Socket::Ptr& server);