cluster: Send config updates.

This commit is contained in:
Gunnar Beutner 2013-09-04 15:47:15 +02:00
parent 0a80dbc305
commit 7e7a565770
8 changed files with 146 additions and 9 deletions

View File

@ -29,7 +29,8 @@ install-data-local:
$(MKDIR_P) $(DESTDIR)${localstatedir}/log/${PACKAGE}/compat/archives
$(MKDIR_P) $(DESTDIR)${localstatedir}/cache/${PACKAGE}
$(MKDIR_P) $(DESTDIR)${localstatedir}/spool/${PACKAGE}/perfdata
$(MKDIR_P) $(DESTDIR)${localstatedir}/lib/${PACKAGE}/cluster
$(MKDIR_P) $(DESTDIR)${localstatedir}/lib/${PACKAGE}/cluster/config
$(MKDIR_P) $(DESTDIR)${localstatedir}/lib/${PACKAGE}/cluster/log
$(MKDIR_P) $(DESTDIR)${localstatedir}/lib/${PACKAGE}
$(MKDIR_P) $(DESTDIR)${localstatedir}/run/${PACKAGE}

View File

@ -34,5 +34,13 @@ type ClusterComponent {
type Endpoint {
%attribute string "host",
%attribute string "port"
%attribute string "port",
%attribute array "config_files" {
%attribute string "*"
},
%attribute array "accept_config" {
%attribute string "*"
}
}

View File

@ -53,6 +53,11 @@ void ClusterComponent::Start(void)
m_Identity = GetCertificateCN(cert);
Log(LogInformation, "cluster", "My identity: " + m_Identity);
Endpoint::Ptr self = Endpoint::GetByName(GetIdentity());
if (!self)
BOOST_THROW_EXCEPTION(std::invalid_argument("No configuration available for the local endpoint."));
m_SSLContext = MakeSSLContext(GetCertificateFile(), GetCertificateFile(), GetCAFile());
/* create the primary JSON-RPC listener */
@ -256,7 +261,7 @@ void ClusterComponent::OpenLogFile(void)
{
ASSERT(OwnsLock());
String path = GetClusterDir() + "current";
String path = GetClusterDir() + "log/current";
std::fstream *fp = new std::fstream(path.CStr(), std::fstream::out | std::ofstream::app);
@ -292,8 +297,8 @@ void ClusterComponent::RotateLogFile(void)
if (ts == 0)
ts = Utility::GetTime();
String oldpath = GetClusterDir() + "current";
String newpath = GetClusterDir() + Convert::ToString(static_cast<int>(ts) + 1);
String oldpath = GetClusterDir() + "log/current";
String newpath = GetClusterDir() + "log/" + Convert::ToString(static_cast<int>(ts) + 1);
(void) rename(oldpath.CStr(), newpath.CStr());
}
@ -322,11 +327,11 @@ void ClusterComponent::ReplayLog(const Endpoint::Ptr& endpoint, const Stream::Pt
RotateLogFile();
std::vector<int> files;
Utility::Glob(GetClusterDir() + "*", boost::bind(&ClusterComponent::LogGlobHandler, boost::ref(files), _1));
Utility::Glob(GetClusterDir() + "log/*", boost::bind(&ClusterComponent::LogGlobHandler, boost::ref(files), _1));
std::sort(files.begin(), files.end());
BOOST_FOREACH(int ts, files) {
String path = GetClusterDir() + Convert::ToString(ts);
String path = GetClusterDir() + "log/" + Convert::ToString(ts);
if (ts < endpoint->GetLocalLogPosition())
continue;
@ -368,6 +373,20 @@ void ClusterComponent::ReplayLog(const Endpoint::Ptr& endpoint, const Stream::Pt
OpenLogFile();
}
void ClusterComponent::ConfigGlobHandler(const Dictionary::Ptr& config, const String& file)
{
Dictionary::Ptr elem = boost::make_shared<Dictionary>();
std::ifstream fp(file.CStr());
if (!fp)
return;
String content((std::istreambuf_iterator<char>(fp)), std::istreambuf_iterator<char>());
elem->Set("content", content);
config->Set(file, elem);
}
/**
* Processes a new client connection.
*
@ -393,6 +412,26 @@ void ClusterComponent::NewClientHandler(const Socket::Ptr& client, TlsRole role)
return;
}
Dictionary::Ptr config = boost::make_shared<Dictionary>();
Array::Ptr configFiles = endpoint->GetConfigFiles();
if (configFiles) {
BOOST_FOREACH(const String& pattern, configFiles) {
Utility::Glob(pattern, boost::bind(&ClusterComponent::ConfigGlobHandler, boost::cref(config), _1));
}
}
Dictionary::Ptr params = boost::make_shared<Dictionary>();
params->Set("config_files", config);
Dictionary::Ptr message = boost::make_shared<Dictionary>();
message->Set("jsonrpc", "2.0");
message->Set("method", "cluster::Config");
message->Set("params", params);
String json = Value(message).Serialize();
NetString::WriteStringToStream(tlsStream, json);
{
ObjectLock olock(this);
ReplayLog(endpoint, tlsStream);
@ -448,7 +487,7 @@ void ClusterComponent::ClusterTimerHandler(void)
}
std::vector<int> files;
Utility::Glob(GetClusterDir() + "*", boost::bind(&ClusterComponent::LogGlobHandler, boost::ref(files), _1));
Utility::Glob(GetClusterDir() + "log/*", boost::bind(&ClusterComponent::LogGlobHandler, boost::ref(files), _1));
std::sort(files.begin(), files.end());
BOOST_FOREACH(int ts, files) {
@ -464,7 +503,7 @@ void ClusterComponent::ClusterTimerHandler(void)
}
if (!need) {
String path = GetClusterDir() + Convert::ToString(ts);
String path = GetClusterDir() + "log/" + Convert::ToString(ts);
Log(LogInformation, "cluster", "Removing old log file: " + path);
(void) unlink(path.CStr());
}
@ -933,6 +972,41 @@ void ClusterComponent::MessageHandler(const Endpoint::Ptr& sender, const Diction
service->ClearAcknowledgement(sender->GetName());
} else if (message->Get("method") == "cluster::SetLogPosition") {
sender->SetLocalLogPosition(params->Get("log_position"));
} else if (message->Get("method") == "cluster::Config") {
Dictionary::Ptr files = params->Get("config_files");
if (!files)
return;
Endpoint::Ptr self = Endpoint::GetByName(GetIdentity());
Array::Ptr acceptConfig = self->GetAcceptConfig();
bool accept = false;
if (acceptConfig) {
BOOST_FOREACH(const String& pattern, acceptConfig) {
if (Utility::Match(pattern, sender->GetName())) {
accept = true;
break;
}
}
}
if (!accept) {
Log(LogWarning, "cluster", "Ignoring cluster::Config message from endpoint '" + sender->GetName() + "'.");
return;
}
String dir = GetClusterDir() + "config/" + SHA256(sender->GetName());
Log(LogInformation, "cluster", "Creating cluster config directory: " + dir);
if (mkdir(dir.CStr(), 0700) < 0 && errno != EEXIST) {
BOOST_THROW_EXCEPTION(posix_error()
<< boost::errinfo_api_function("localtime")
<< boost::errinfo_errno(errno));
}
/* TODO: update files, remove old files, figure out whether we need to restart */
}
}

View File

@ -78,6 +78,8 @@ private:
void AddListener(const String& service);
void AddConnection(const String& node, const String& service);
static void ConfigGlobHandler(const Dictionary::Ptr& config, const String& file);
void NewClientHandler(const Socket::Ptr& client, TlsRole role);
void ListenerThreadProc(const Socket::Ptr& server);

View File

@ -126,6 +126,16 @@ String Endpoint::GetPort(void) const
return m_Port;
}
Array::Ptr Endpoint::GetConfigFiles(void) const
{
return m_ConfigFiles;
}
Array::Ptr Endpoint::GetAcceptConfig(void) const
{
return m_AcceptConfig;
}
double Endpoint::GetSeen(void) const
{
return m_Seen;
@ -163,6 +173,8 @@ void Endpoint::InternalSerialize(const Dictionary::Ptr& bag, int attributeTypes)
if (attributeTypes & Attribute_Config) {
bag->Set("host", m_Host);
bag->Set("port", m_Port);
bag->Set("config_files", m_ConfigFiles);
bag->Set("accept_config", m_AcceptConfig);
}
if (attributeTypes & Attribute_State) {
@ -179,6 +191,8 @@ void Endpoint::InternalDeserialize(const Dictionary::Ptr& bag, int attributeType
if (attributeTypes & Attribute_Config) {
m_Host = bag->Get("host");
m_Port = bag->Get("port");
m_ConfigFiles = bag->Get("config_files");
m_AcceptConfig = bag->Get("accept_config");
}
if (attributeTypes & Attribute_State) {

View File

@ -22,6 +22,7 @@
#include "base/dynamicobject.h"
#include "base/stream.h"
#include "base/array.h"
#include <boost/signals2.hpp>
namespace icinga
@ -52,6 +53,8 @@ public:
String GetHost(void) const;
String GetPort(void) const;
Array::Ptr GetConfigFiles(void) const;
Array::Ptr GetAcceptConfig(void) const;
double GetSeen(void) const;
void SetSeen(double ts);
@ -70,6 +73,8 @@ private:
Dictionary::Ptr m_Subscriptions;
String m_Host;
String m_Port;
Array::Ptr m_ConfigFiles;
Array::Ptr m_AcceptConfig;
Stream::Ptr m_Client;
double m_Seen;

View File

@ -148,4 +148,35 @@ shared_ptr<X509> GetX509Certificate(const String& pemfile)
return shared_ptr<X509>(cert, X509_free);
}
String SHA256(const String& s)
{
SHA256_CTX context;
unsigned char digest[SHA256_DIGEST_LENGTH];
if (!SHA256_Init(&context)) {
BOOST_THROW_EXCEPTION(openssl_error()
<< boost::errinfo_api_function("SHA256_Init")
<< errinfo_openssl_error(ERR_get_error()));
}
if (!SHA256_Update(&context, (unsigned char*)s.CStr(), s.GetLength())) {
BOOST_THROW_EXCEPTION(openssl_error()
<< boost::errinfo_api_function("SHA256_Update")
<< errinfo_openssl_error(ERR_get_error()));
}
if (!SHA256_Final(digest, &context)) {
BOOST_THROW_EXCEPTION(openssl_error()
<< boost::errinfo_api_function("SHA256_Final")
<< errinfo_openssl_error(ERR_get_error()));
}
int i;
char output[SHA256_DIGEST_LENGTH*2+1];
for (i = 0; i < 32; i++)
sprintf(output + 2 * i, "%02x", digest[i]);
return output;
}
}

View File

@ -28,6 +28,7 @@
#include <openssl/bio.h>
#include <openssl/err.h>
#include <openssl/comp.h>
#include <openssl/sha.h>
namespace icinga
{
@ -35,6 +36,7 @@ namespace icinga
shared_ptr<SSL_CTX> I2_BASE_API MakeSSLContext(const String& pubkey, const String& privkey, const String& cakey);
String I2_BASE_API GetCertificateCN(const shared_ptr<X509>& certificate);
shared_ptr<X509> I2_BASE_API GetX509Certificate(const String& pemfile);
String SHA256(const String& s);
class I2_BASE_API openssl_error : virtual public std::exception, virtual public boost::exception { };