cluster: Fix how replaying old messages works.

This commit is contained in:
Gunnar Beutner 2013-09-03 10:08:02 +02:00
parent 782ff8b13c
commit b844ea1bcb
5 changed files with 168 additions and 76 deletions

View File

@ -29,7 +29,7 @@ install-data-local:
$(MKDIR_P) $(DESTDIR)${localstatedir}/log/${PACKAGE}/compat/archives
$(MKDIR_P) $(DESTDIR)${localstatedir}/cache/${PACKAGE}
$(MKDIR_P) $(DESTDIR)${localstatedir}/spool/${PACKAGE}/perfdata
$(MKDIR_P) $(DESTDIR)${localstatedir}/spool/${PACKAGE}/cluster
$(MKDIR_P) $(DESTDIR)${localstatedir}/lib/${PACKAGE}/cluster
$(MKDIR_P) $(DESTDIR)${localstatedir}/lib/${PACKAGE}
$(MKDIR_P) $(DESTDIR)${localstatedir}/run/${PACKAGE}

View File

@ -19,11 +19,15 @@
#include "cluster/clustercomponent.h"
#include "cluster/endpoint.h"
#include "base/netstring.h"
#include "base/dynamictype.h"
#include "base/logger_fwd.h"
#include "base/objectlock.h"
#include "base/networkstream.h"
#include "base/application.h"
#include "base/convert.h"
#include <boost/smart_ptr/make_shared.hpp>
#include <fstream>
using namespace icinga;
@ -36,6 +40,8 @@ void ClusterComponent::Start(void)
{
DynamicObject::Start();
OpenLogFile();
/* set up SSL context */
shared_ptr<X509> cert = GetX509Certificate(GetCertificateFile());
m_Identity = GetCertificateCN(cert);
@ -76,7 +82,7 @@ void ClusterComponent::Start(void)
*/
void ClusterComponent::Stop(void)
{
/* Nothing to do here. */
CloseLogFile();
}
String ClusterComponent::GetCertificateFile(void) const
@ -192,6 +198,24 @@ void ClusterComponent::AddConnection(const String& node, const String& service)
void ClusterComponent::RelayMessage(const Endpoint::Ptr& except, const Dictionary::Ptr& message, bool persistent)
{
message->Set("ts", Utility::GetTime());
if (persistent) {
Dictionary::Ptr pmessage = boost::make_shared<Dictionary>();
pmessage->Set("timestamp", Utility::GetTime());
pmessage->Set("message", message);
ObjectLock olock(this);
String json = Value(pmessage).Serialize();
NetString::WriteStringToStream(m_LogFile, json);
m_LogMessageCount++;
if (m_LogMessageCount > 250000) {
CloseLogFile();
OpenLogFile();
}
}
BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects<Endpoint>()) {
if (!persistent && !endpoint->IsConnected())
continue;
@ -206,6 +230,83 @@ void ClusterComponent::RelayMessage(const Endpoint::Ptr& except, const Dictionar
}
}
String ClusterComponent::GetClusterDir(void) const
{
return Application::GetLocalStateDir() + "/lib/icinga2/cluster/";
}
void ClusterComponent::OpenLogFile(void)
{
std::ostringstream msgbuf;
msgbuf << GetClusterDir() << static_cast<long>(Utility::GetTime());
String path = msgbuf.str();
std::fstream *fp = new std::fstream(path.CStr(), std::fstream::out | std::ofstream::app);
if (!fp->good()) {
Log(LogWarning, "cluster", "Could not open spool file: " + path);
return;
}
m_LogFile = boost::make_shared<StdioStream>(fp, true);
m_LogMessageCount = 0;
}
void ClusterComponent::CloseLogFile(void)
{
m_LogFile->Close();
m_LogFile.reset();
}
void ClusterComponent::LogGlobHandler(std::vector<int>& files, const String& file)
{
String name = Utility::BaseName(file);
int ts = Convert::ToLong(name);
files.push_back(ts);
}
void ClusterComponent::ReplayLog(const Endpoint::Ptr& endpoint, const Stream::Ptr& stream)
{
int count = 0;
ASSERT(OwnsLock());
CloseLogFile();
std::vector<int> files;
Utility::Glob(GetClusterDir() + "*", boost::bind(&ClusterComponent::LogGlobHandler, boost::ref(files), _1));
std::sort(files.begin(), files.end());
BOOST_FOREACH(int ts, files) {
std::ostringstream msgbuf;
msgbuf << GetClusterDir() << ts;
String path = msgbuf.str();
Log(LogInformation, "cluster", "Replaying log: " + path);
std::fstream *fp = new std::fstream(path.CStr(), std::fstream::in);
StdioStream::Ptr lstream = boost::make_shared<StdioStream>(fp, true);
String message;
while (NetString::ReadStringFromStream(lstream, &message)) {
Dictionary::Ptr pmessage = Value::Deserialize(message);
if (pmessage->Get("timestamp") < endpoint->GetLocalLogPosition())
continue;
String json = Value(pmessage->Get("message")).Serialize();
NetString::WriteStringToStream(stream, json);
count++;
}
lstream->Close();
}
Log(LogInformation, "cluster", "Replayed " + Convert::ToString(count) + " messages.");
OpenLogFile();
}
/**
* Processes a new client connection.
*
@ -231,6 +332,11 @@ void ClusterComponent::NewClientHandler(const Socket::Ptr& client, TlsRole role)
return;
}
{
ObjectLock olock(this);
ReplayLog(endpoint, tlsStream);
}
endpoint->SetClient(tlsStream);
}
@ -540,6 +646,18 @@ void ClusterComponent::MessageHandler(const Endpoint::Ptr& sender, const Diction
return;
}
if (sender->GetRemoteLogPosition() + 10 < message->Get("ts")) {
Dictionary::Ptr lparams = boost::make_shared<Dictionary>();
lparams->Set("log_position", message->Get("ts"));
Dictionary::Ptr lmessage = boost::make_shared<Dictionary>();
lmessage->Set("jsonrpc", "2.0");
lmessage->Set("method", "cluster::SetLogPosition");
lmessage->Set("params", lparams);
sender->SendMessage(lmessage);
}
Dictionary::Ptr params = message->Get("params");
if (!params)
@ -720,6 +838,8 @@ void ClusterComponent::MessageHandler(const Endpoint::Ptr& sender, const Diction
ObjectLock olock(service);
service->ClearAcknowledgement(sender->GetName());
} else if (message->Get("method") == "cluster::SetLogPosition") {
sender->SetRemoteLogPosition(params->Get("log_position"));
}
}

View File

@ -27,6 +27,7 @@
#include "base/tlsstream.h"
#include "base/utility.h"
#include "base/tlsutility.h"
#include "base/stdiostream.h"
#include "icinga/service.h"
#include "cluster/endpoint.h"
@ -50,6 +51,7 @@ public:
String GetBindHost(void) const;
String GetBindPort(void) const;
Array::Ptr GetPeers(void) const;
String GetClusterDir(void) const;
shared_ptr<SSL_CTX> GetSSLContext(void) const;
String GetIdentity(void) const;
@ -81,6 +83,14 @@ private:
void RelayMessage(const Endpoint::Ptr& except, const Dictionary::Ptr& message, bool persistent);
void OpenLogFile(void);
void CloseLogFile(void);
static void LogGlobHandler(std::vector<int>& files, const String& file);
void ReplayLog(const Endpoint::Ptr& endpoint, const Stream::Ptr& stream);
StdioStream::Ptr m_LogFile;
size_t m_LogMessageCount;
void CheckResultHandler(const Service::Ptr& service, const Dictionary::Ptr& cr, const String& authority);
void NextCheckChangedHandler(const Service::Ptr& service, double nextCheck, const String& authority);
void NextNotificationChangedHandler(const Notification::Ptr& notification, double nextCheck, const String& authority);
@ -97,7 +107,6 @@ private:
void AcknowledgementSetHandler(const Service::Ptr& service, const String& author, const String& comment, AcknowledgementType type, double expiry, const String& authority);
void AcknowledgementClearedHandler(const Service::Ptr& service, const String& authority);
void MessageHandler(const Endpoint::Ptr& sender, const Dictionary::Ptr& message);
};
}

View File

@ -26,7 +26,6 @@
#include "base/logger_fwd.h"
#include "config/configitembuilder.h"
#include <boost/smart_ptr/make_shared.hpp>
#include <fstream>
using namespace icinga;
@ -35,41 +34,6 @@ REGISTER_TYPE(Endpoint);
boost::signals2::signal<void (const Endpoint::Ptr&)> Endpoint::OnConnected;
boost::signals2::signal<void (const Endpoint::Ptr&, const Dictionary::Ptr&)> Endpoint::OnMessageReceived;
void Endpoint::OnConfigLoaded(void)
{
ObjectLock olock(this);
OpenSpoolFile();
}
void Endpoint::Stop(void)
{
ObjectLock olock(this);
CloseSpoolFile();
}
void Endpoint::OpenSpoolFile(void)
{
ASSERT(OwnsLock());
String path = GetSpoolPath();
std::fstream *fp = new std::fstream(path.CStr(), std::fstream::out | std::ofstream::app);
if (!fp->good())
Log(LogWarning, "cluster", "Could not open spool file: " + path);
m_SpoolFile = boost::make_shared<StdioStream>(fp, true);
}
void Endpoint::CloseSpoolFile(void)
{
ASSERT(OwnsLock());
m_SpoolFile->Close();
m_SpoolFile.reset();
}
/**
* Checks whether this endpoint is connected.
*
@ -92,24 +56,6 @@ void Endpoint::SetClient(const Stream::Ptr& client)
{
ObjectLock olock(this);
CloseSpoolFile();
String path = GetSpoolPath();
std::ifstream *fp = new std::ifstream(path.CStr(), std::ifstream::in);
while (fp && !fp->eof()) {
char data[1024];
fp->read(data, sizeof(data));
client->Write(data, fp->gcount());
}
fp->close();
delete fp;
(void) unlink(path.CStr());
OpenSpoolFile();
m_Client = client;
}
@ -121,15 +67,13 @@ void Endpoint::SetClient(const Stream::Ptr& client)
void Endpoint::SendMessage(const Dictionary::Ptr& message)
{
Stream::Ptr destination;
Stream::Ptr client = GetClient();
if (!IsConnected())
destination = m_SpoolFile;
else
destination = GetClient();
if (!client)
return;
try {
JsonRpc::SendMessage(destination, message);
JsonRpc::SendMessage(client, message);
} catch (const std::exception& ex) {
std::ostringstream msgbuf;
msgbuf << "Error while sending JSON-RPC message for endpoint '" << GetName() << "': " << boost::diagnostic_information(ex);
@ -192,9 +136,24 @@ void Endpoint::SetSeen(double ts)
m_Seen = ts;
}
String Endpoint::GetSpoolPath(void) const
double Endpoint::GetLocalLogPosition(void) const
{
return Application::GetLocalStateDir() + "/spool/icinga2/cluster/" + GetName() + ".ns";
return m_LocalLogPosition;
}
void Endpoint::SetLocalLogPosition(double ts)
{
m_LocalLogPosition = ts;
}
double Endpoint::GetRemoteLogPosition(void) const
{
return m_RemoteLogPosition;
}
void Endpoint::SetRemoteLogPosition(double ts)
{
m_RemoteLogPosition = ts;
}
void Endpoint::InternalSerialize(const Dictionary::Ptr& bag, int attributeTypes) const
@ -206,8 +165,11 @@ void Endpoint::InternalSerialize(const Dictionary::Ptr& bag, int attributeTypes)
bag->Set("port", m_Port);
}
if (attributeTypes & Attribute_State)
if (attributeTypes & Attribute_State) {
bag->Set("seen", m_Seen);
bag->Set("local_log_position", m_LocalLogPosition);
bag->Set("remote_log_position", m_RemoteLogPosition);
}
}
void Endpoint::InternalDeserialize(const Dictionary::Ptr& bag, int attributeTypes)
@ -219,6 +181,9 @@ void Endpoint::InternalDeserialize(const Dictionary::Ptr& bag, int attributeType
m_Port = bag->Get("port");
}
if (attributeTypes & Attribute_State)
if (attributeTypes & Attribute_State) {
m_Seen = bag->Get("seen");
m_LocalLogPosition = bag->Get("local_log_position");
m_RemoteLogPosition = bag->Get("remote_log_position");
}
}

View File

@ -22,7 +22,6 @@
#include "base/dynamicobject.h"
#include "base/stream.h"
#include "base/stdiostream.h"
#include <boost/signals2.hpp>
namespace icinga
@ -57,15 +56,16 @@ public:
double GetSeen(void) const;
void SetSeen(double ts);
String GetSpoolPath(void) const;
double GetLocalLogPosition(void) const;
void SetLocalLogPosition(double ts);
double GetRemoteLogPosition(void) const;
void SetRemoteLogPosition(double ts);
protected:
virtual void InternalSerialize(const Dictionary::Ptr& bag, int attributeTypes) const;
virtual void InternalDeserialize(const Dictionary::Ptr& bag, int attributeTypes);
virtual void OnConfigLoaded(void);
virtual void Stop(void);
private:
Dictionary::Ptr m_Subscriptions;
String m_Host;
@ -73,12 +73,10 @@ private:
Stream::Ptr m_Client;
double m_Seen;
StdioStream::Ptr m_SpoolFile;
double m_LocalLogPosition;
double m_RemoteLogPosition;
void MessageThreadProc(const Stream::Ptr& stream);
void OpenSpoolFile(void);
void CloseSpoolFile(void);
};
}