cluster: Implement persistent messages.

This commit is contained in:
Gunnar Beutner 2013-09-02 15:12:20 +02:00
parent c81a88057c
commit 754dbfb8ef
5 changed files with 112 additions and 57 deletions

View File

@ -29,6 +29,7 @@ install-data-local:
$(MKDIR_P) $(DESTDIR)${localstatedir}/log/${PACKAGE}/compat/archives
$(MKDIR_P) $(DESTDIR)${localstatedir}/cache/${PACKAGE}
$(MKDIR_P) $(DESTDIR)${localstatedir}/spool/${PACKAGE}/perfdata
$(MKDIR_P) $(DESTDIR)${localstatedir}/spool/${PACKAGE}/cluster
$(MKDIR_P) $(DESTDIR)${localstatedir}/lib/${PACKAGE}
$(MKDIR_P) $(DESTDIR)${localstatedir}/run/${PACKAGE}

View File

@ -190,6 +190,22 @@ void ClusterComponent::AddConnection(const String& node, const String& service)
Utility::QueueAsyncCallback(boost::bind(&ClusterComponent::NewClientHandler, this, client, TlsRoleClient));
}
void ClusterComponent::RelayMessage(const Endpoint::Ptr& except, const Dictionary::Ptr& message, bool persistent)
{
BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects<Endpoint>()) {
if (!persistent && !endpoint->IsConnected())
continue;
if (endpoint == except)
continue;
if (endpoint->GetName() == GetIdentity())
continue;
endpoint->SendMessage(message);
}
}
/**
* Processes a new client connection.
*
@ -225,9 +241,7 @@ void ClusterComponent::ClusterTimerHandler(void)
message->Set("jsonrpc", "2.0");
message->Set("method", "cluster::HeartBeat");
BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects<Endpoint>()) {
endpoint->SendMessage(message);
}
RelayMessage(Endpoint::Ptr(), message, false);
Array::Ptr peers = GetPeers();
@ -274,9 +288,7 @@ void ClusterComponent::CheckResultHandler(const Service::Ptr& service, const Dic
message->Set("method", "cluster::CheckResult");
message->Set("params", params);
BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects<Endpoint>()) {
endpoint->SendMessage(message);
}
RelayMessage(Endpoint::Ptr(), message, true);
}
void ClusterComponent::NextCheckChangedHandler(const Service::Ptr& service, double nextCheck, const String& authority)
@ -293,9 +305,7 @@ void ClusterComponent::NextCheckChangedHandler(const Service::Ptr& service, doub
message->Set("method", "cluster::SetNextCheck");
message->Set("params", params);
BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects<Endpoint>()) {
endpoint->SendMessage(message);
}
RelayMessage(Endpoint::Ptr(), message, true);
}
void ClusterComponent::NextNotificationChangedHandler(const Notification::Ptr& notification, double nextNotification, const String& authority)
@ -312,9 +322,7 @@ void ClusterComponent::NextNotificationChangedHandler(const Notification::Ptr& n
message->Set("method", "cluster::SetNextNotification");
message->Set("params", params);
BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects<Endpoint>()) {
endpoint->SendMessage(message);
}
RelayMessage(Endpoint::Ptr(), message, true);
}
void ClusterComponent::ForceNextCheckChangedHandler(const Service::Ptr& service, bool forced, const String& authority)
@ -331,9 +339,7 @@ void ClusterComponent::ForceNextCheckChangedHandler(const Service::Ptr& service,
message->Set("method", "cluster::SetForceNextCheck");
message->Set("params", params);
BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects<Endpoint>()) {
endpoint->SendMessage(message);
}
RelayMessage(Endpoint::Ptr(), message, true);
}
void ClusterComponent::ForceNextNotificationChangedHandler(const Service::Ptr& service, bool forced, const String& authority)
@ -350,9 +356,7 @@ void ClusterComponent::ForceNextNotificationChangedHandler(const Service::Ptr& s
message->Set("method", "cluster::SetForceNextNotification");
message->Set("params", params);
BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects<Endpoint>()) {
endpoint->SendMessage(message);
}
RelayMessage(Endpoint::Ptr(), message, true);
}
void ClusterComponent::EnableActiveChecksChangedHandler(const Service::Ptr& service, bool enabled, const String& authority)
@ -369,9 +373,7 @@ void ClusterComponent::EnableActiveChecksChangedHandler(const Service::Ptr& serv
message->Set("method", "cluster::SetEnableActiveChecks");
message->Set("params", params);
BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects<Endpoint>()) {
endpoint->SendMessage(message);
}
RelayMessage(Endpoint::Ptr(), message, true);
}
void ClusterComponent::EnablePassiveChecksChangedHandler(const Service::Ptr& service, bool enabled, const String& authority)
@ -388,9 +390,7 @@ void ClusterComponent::EnablePassiveChecksChangedHandler(const Service::Ptr& ser
message->Set("method", "cluster::SetEnablePassiveChecks");
message->Set("params", params);
BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects<Endpoint>()) {
endpoint->SendMessage(message);
}
RelayMessage(Endpoint::Ptr(), message, true);
}
void ClusterComponent::EnableNotificationsChangedHandler(const Service::Ptr& service, bool enabled, const String& authority)
@ -407,9 +407,7 @@ void ClusterComponent::EnableNotificationsChangedHandler(const Service::Ptr& ser
message->Set("method", "cluster::SetEnableNotifications");
message->Set("params", params);
BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects<Endpoint>()) {
endpoint->SendMessage(message);
}
RelayMessage(Endpoint::Ptr(), message, true);
}
void ClusterComponent::EnableFlappingChangedHandler(const Service::Ptr& service, bool enabled, const String& authority)
@ -426,9 +424,7 @@ void ClusterComponent::EnableFlappingChangedHandler(const Service::Ptr& service,
message->Set("method", "cluster::SetEnableFlapping");
message->Set("params", params);
BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects<Endpoint>()) {
endpoint->SendMessage(message);
}
RelayMessage(Endpoint::Ptr(), message, true);
}
void ClusterComponent::CommentAddedHandler(const Service::Ptr& service, const Dictionary::Ptr& comment, const String& authority)
@ -445,9 +441,7 @@ void ClusterComponent::CommentAddedHandler(const Service::Ptr& service, const Di
message->Set("method", "cluster::AddComment");
message->Set("params", params);
BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects<Endpoint>()) {
endpoint->SendMessage(message);
}
RelayMessage(Endpoint::Ptr(), message, true);
}
void ClusterComponent::CommentRemovedHandler(const Service::Ptr& service, const Dictionary::Ptr& comment, const String& authority)
@ -464,9 +458,7 @@ void ClusterComponent::CommentRemovedHandler(const Service::Ptr& service, const
message->Set("method", "cluster::RemoveComment");
message->Set("params", params);
BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects<Endpoint>()) {
endpoint->SendMessage(message);
}
RelayMessage(Endpoint::Ptr(), message, true);
}
void ClusterComponent::DowntimeAddedHandler(const Service::Ptr& service, const Dictionary::Ptr& downtime, const String& authority)
@ -483,9 +475,7 @@ void ClusterComponent::DowntimeAddedHandler(const Service::Ptr& service, const D
message->Set("method", "cluster::AddDowntime");
message->Set("params", params);
BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects<Endpoint>()) {
endpoint->SendMessage(message);
}
RelayMessage(Endpoint::Ptr(), message, true);
}
void ClusterComponent::DowntimeRemovedHandler(const Service::Ptr& service, const Dictionary::Ptr& downtime, const String& authority)
@ -502,9 +492,7 @@ void ClusterComponent::DowntimeRemovedHandler(const Service::Ptr& service, const
message->Set("method", "cluster::RemoveDowntime");
message->Set("params", params);
BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects<Endpoint>()) {
endpoint->SendMessage(message);
}
RelayMessage(Endpoint::Ptr(), message, true);
}
void ClusterComponent::AcknowledgementSetHandler(const Service::Ptr& service, const String& author, const String& comment, AcknowledgementType type, double expiry, const String& authority)
@ -524,9 +512,7 @@ void ClusterComponent::AcknowledgementSetHandler(const Service::Ptr& service, co
message->Set("method", "cluster::SetAcknowledgement");
message->Set("params", params);
BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects<Endpoint>()) {
endpoint->SendMessage(message);
}
RelayMessage(Endpoint::Ptr(), message, true);
}
void ClusterComponent::AcknowledgementClearedHandler(const Service::Ptr& service, const String& authority)
@ -542,17 +528,12 @@ void ClusterComponent::AcknowledgementClearedHandler(const Service::Ptr& service
message->Set("method", "cluster::ClearAcknowledgement");
message->Set("params", params);
BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects<Endpoint>()) {
endpoint->SendMessage(message);
}
RelayMessage(Endpoint::Ptr(), message, true);
}
void ClusterComponent::MessageHandler(const Endpoint::Ptr& sender, const Dictionary::Ptr& message)
{
BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects<Endpoint>()) {
if (sender != endpoint)
endpoint->SendMessage(message);
}
RelayMessage(sender, message, true);
if (message->Get("method") == "cluster::HeartBeat") {
sender->SetSeen(Utility::GetTime());

View File

@ -79,6 +79,8 @@ private:
void NewClientHandler(const Socket::Ptr& client, TlsRole role);
void ListenerThreadProc(const Socket::Ptr& server);
void RelayMessage(const Endpoint::Ptr& except, const Dictionary::Ptr& message, bool persistent);
void CheckResultHandler(const Service::Ptr& service, const Dictionary::Ptr& cr, const String& authority);
void NextCheckChangedHandler(const Service::Ptr& service, double nextCheck, const String& authority);
void NextNotificationChangedHandler(const Notification::Ptr& notification, double nextCheck, const String& authority);

View File

@ -26,6 +26,7 @@
#include "base/logger_fwd.h"
#include "config/configitembuilder.h"
#include <boost/smart_ptr/make_shared.hpp>
#include <fstream>
using namespace icinga;
@ -34,6 +35,41 @@ REGISTER_TYPE(Endpoint);
boost::signals2::signal<void (const Endpoint::Ptr&)> Endpoint::OnConnected;
boost::signals2::signal<void (const Endpoint::Ptr&, const Dictionary::Ptr&)> Endpoint::OnMessageReceived;
void Endpoint::OnConfigLoaded(void)
{
ObjectLock olock(this);
OpenSpoolFile();
}
void Endpoint::Stop(void)
{
ObjectLock olock(this);
CloseSpoolFile();
}
void Endpoint::OpenSpoolFile(void)
{
ASSERT(OwnsLock());
String path = GetSpoolPath();
std::fstream *fp = new std::fstream(path.CStr(), std::fstream::out | std::ofstream::app);
if (!fp->good())
Log(LogWarning, "cluster", "Could not open spool file: " + path);
m_SpoolFile = boost::make_shared<StdioStream>(fp, true);
}
void Endpoint::CloseSpoolFile(void)
{
ASSERT(OwnsLock());
m_SpoolFile->Close();
m_SpoolFile.reset();
}
/**
* Checks whether this endpoint is connected.
*
@ -56,6 +92,24 @@ void Endpoint::SetClient(const Stream::Ptr& client)
{
ObjectLock olock(this);
CloseSpoolFile();
String path = GetSpoolPath();
std::ifstream *fp = new std::ifstream(path.CStr(), std::ifstream::in);
while (fp && !fp->eof()) {
char data[1024];
fp->read(data, sizeof(data));
client->Write(data, fp->gcount());
}
fp->close();
delete fp;
(void) unlink(path.CStr());
OpenSpoolFile();
m_Client = client;
}
@ -67,13 +121,15 @@ void Endpoint::SetClient(const Stream::Ptr& client)
void Endpoint::SendMessage(const Dictionary::Ptr& message)
{
if (!IsConnected()) {
// TODO: persist the message
return;
}
Stream::Ptr destination;
if (!IsConnected())
destination = m_SpoolFile;
else
destination = GetClient();
try {
JsonRpc::SendMessage(GetClient(), message);
JsonRpc::SendMessage(destination, message);
} catch (const std::exception& ex) {
std::ostringstream msgbuf;
msgbuf << "Error while sending JSON-RPC message for endpoint '" << GetName() << "': " << boost::diagnostic_information(ex);
@ -136,6 +192,11 @@ void Endpoint::SetSeen(double ts)
m_Seen = ts;
}
String Endpoint::GetSpoolPath(void) const
{
return Application::GetLocalStateDir() + "/spool/icinga2/cluster/" + GetName() + ".ns";
}
void Endpoint::InternalSerialize(const Dictionary::Ptr& bag, int attributeTypes) const
{
DynamicObject::InternalSerialize(bag, attributeTypes);

View File

@ -22,6 +22,7 @@
#include "base/dynamicobject.h"
#include "base/stream.h"
#include "base/stdiostream.h"
#include <boost/signals2.hpp>
namespace icinga
@ -56,10 +57,15 @@ public:
double GetSeen(void) const;
void SetSeen(double ts);
String GetSpoolPath(void) const;
protected:
virtual void InternalSerialize(const Dictionary::Ptr& bag, int attributeTypes) const;
virtual void InternalDeserialize(const Dictionary::Ptr& bag, int attributeTypes);
virtual void OnConfigLoaded(void);
virtual void Stop(void);
private:
Dictionary::Ptr m_Subscriptions;
String m_Host;
@ -67,8 +73,12 @@ private:
Stream::Ptr m_Client;
double m_Seen;
StdioStream::Ptr m_SpoolFile;
void MessageThreadProc(const Stream::Ptr& stream);
void OpenSpoolFile(void);
void CloseSpoolFile(void);
};
}