Clean up JSON-RPC library.

This commit is contained in:
Gunnar Beutner 2013-08-26 16:53:17 +02:00
parent e9e55cd8c0
commit 45b8d96461
15 changed files with 240 additions and 1065 deletions

View File

@ -24,6 +24,7 @@
#include "base/logger_fwd.h"
#include "base/utility.h"
#include <boost/smart_ptr/make_shared.hpp>
#include <boost/exception/diagnostic_information.hpp>
#include <boost/foreach.hpp>
using namespace icinga;

View File

@ -25,8 +25,6 @@
#include "icinga/host.h"
#include "icinga/timeperiod.h"
#include "icinga/notification.h"
#include "remoting/requestmessage.h"
#include "remoting/endpoint.h"
#include "base/i2-base.h"
#include "base/dynamicobject.h"
#include "base/array.h"

View File

@ -15,13 +15,7 @@ libremoting_la_SOURCES = \
i2-remoting.h \
jsonrpc.cpp \
jsonrpc.h \
messagepart.cpp \
messagepart.h \
remoting-type.cpp \
requestmessage.cpp \
requestmessage.h \
responsemessage.cpp \
responsemessage.h
remoting-type.cpp
libremoting_la_CPPFLAGS = \
-DI2_REMOTING_BUILD \

View File

@ -33,37 +33,7 @@ using namespace icinga;
REGISTER_TYPE(Endpoint);
boost::signals2::signal<void (const Endpoint::Ptr&)> Endpoint::OnConnected;
/**
* Helper function for creating new endpoint objects.
*
* @param name The name of the new endpoint.
* @param replicated Whether replication is enabled for the endpoint object.
* @param local Whether the new endpoint should be local.
* @returns The new endpoint.
*/
Endpoint::Ptr Endpoint::MakeEndpoint(const String& name, bool replicated, bool local)
{
ConfigItemBuilder::Ptr endpointConfig = boost::make_shared<ConfigItemBuilder>();
endpointConfig->SetType("Endpoint");
endpointConfig->SetName((!replicated && local) ? "local:" + name : name);
//TODO: endpointConfig->SetLocal(!replicated);
endpointConfig->AddExpression("local", OperatorSet, local);
ConfigItem::Ptr item = endpointConfig->Compile();
DynamicObject::Ptr object = item->Commit();
return dynamic_pointer_cast<Endpoint>(object);
}
/**
* Checks whether this is a local endpoint.
*
* @returns true if this is a local endpoint, false otherwise.
*/
bool Endpoint::IsLocalEndpoint(void) const
{
return m_Local;
}
boost::signals2::signal<void (const Endpoint::Ptr&, const Dictionary::Ptr&)> Endpoint::OnMessageReceived;
/**
* Checks whether this endpoint is connected.
@ -72,11 +42,7 @@ bool Endpoint::IsLocalEndpoint(void) const
*/
bool Endpoint::IsConnected(void) const
{
if (IsLocalEndpoint()) {
return true;
} else {
return GetClient();
}
return GetClient();
}
Stream::Ptr Endpoint::GetClient(void) const
@ -100,159 +66,28 @@ void Endpoint::SetClient(const Stream::Ptr& client)
OnConnected(GetSelf());
}
/**
* Registers a topic subscription for this endpoint.
*
* @param topic The name of the topic.
*/
void Endpoint::RegisterSubscription(const String& topic)
{
Dictionary::Ptr subscriptions = GetSubscriptions();
if (!subscriptions)
subscriptions = boost::make_shared<Dictionary>();
if (!subscriptions->Contains(topic)) {
Dictionary::Ptr newSubscriptions = subscriptions->ShallowClone();
newSubscriptions->Set(topic, topic);
ObjectLock olock(this);
SetSubscriptions(newSubscriptions);
}
}
/**
* Removes a topic subscription from this endpoint.
*
* @param topic The name of the topic.
*/
void Endpoint::UnregisterSubscription(const String& topic)
{
Dictionary::Ptr subscriptions = GetSubscriptions();
if (!subscriptions)
return;
if (subscriptions->Contains(topic)) {
Dictionary::Ptr newSubscriptions = subscriptions->ShallowClone();
newSubscriptions->Remove(topic);
SetSubscriptions(newSubscriptions);
}
}
/**
* Checks whether the endpoint has a subscription for the specified topic.
*
* @param topic The name of the topic.
* @returns true if the endpoint is subscribed to the topic, false otherwise.
*/
bool Endpoint::HasSubscription(const String& topic) const
{
Dictionary::Ptr subscriptions = GetSubscriptions();
return (subscriptions && subscriptions->Contains(topic));
}
/**
* Removes all subscriptions for the endpoint.
*/
void Endpoint::ClearSubscriptions(void)
{
m_Subscriptions = Empty;
}
Dictionary::Ptr Endpoint::GetSubscriptions(void) const
{
return m_Subscriptions;
}
void Endpoint::SetSubscriptions(const Dictionary::Ptr& subscriptions)
{
subscriptions->Seal();
m_Subscriptions = subscriptions;
}
void Endpoint::RegisterTopicHandler(const String& topic, const boost::function<Endpoint::Callback>& callback)
{
ObjectLock olock(this);
std::map<String, shared_ptr<boost::signals2::signal<Endpoint::Callback> > >::iterator it;
it = m_TopicHandlers.find(topic);
shared_ptr<boost::signals2::signal<Endpoint::Callback> > sig;
if (it == m_TopicHandlers.end()) {
sig = boost::make_shared<boost::signals2::signal<Endpoint::Callback> >();
m_TopicHandlers.insert(make_pair(topic, sig));
} else {
sig = it->second;
}
sig->connect(callback);
olock.Unlock();
RegisterSubscription(topic);
}
void Endpoint::ProcessRequest(const Endpoint::Ptr& sender, const RequestMessage& request)
void Endpoint::SendMessage(const Dictionary::Ptr& message)
{
if (!IsConnected()) {
// TODO: persist the message
return;
}
if (IsLocalEndpoint()) {
ObjectLock olock(this);
try {
JsonRpc::SendMessage(GetClient(), message);
} catch (const std::exception& ex) {
std::ostringstream msgbuf;
msgbuf << "Error while sending JSON-RPC message for endpoint '" << GetName() << "': " << boost::diagnostic_information(ex);
Log(LogWarning, "remoting", msgbuf.str());
String method;
if (!request.GetMethod(&method))
return;
std::map<String, shared_ptr<boost::signals2::signal<Endpoint::Callback> > >::iterator it;
it = m_TopicHandlers.find(method);
if (it == m_TopicHandlers.end())
return;
Utility::QueueAsyncCallback(boost::bind(boost::ref(*it->second), GetSelf(), sender, request));
} else {
try {
JsonRpc::SendMessage(GetClient(), request);
} catch (const std::exception& ex) {
std::ostringstream msgbuf;
msgbuf << "Error while sending JSON-RPC message for endpoint '" << GetName() << "': " << boost::diagnostic_information(ex);
Log(LogWarning, "remoting", msgbuf.str());
m_Client.reset();
}
}
}
void Endpoint::ProcessResponse(const Endpoint::Ptr& sender, const ResponseMessage& response)
{
if (!IsConnected())
return;
if (IsLocalEndpoint())
EndpointManager::GetInstance()->ProcessResponseMessage(sender, response);
else {
try {
JsonRpc::SendMessage(GetClient(), response);
} catch (const std::exception& ex) {
std::ostringstream msgbuf;
msgbuf << "Error while sending JSON-RPC message for endpoint '" << GetName() << "': " << boost::diagnostic_information(ex);
Log(LogWarning, "remoting", msgbuf.str());
m_Client.reset();
}
m_Client.reset();
}
}
void Endpoint::MessageThreadProc(const Stream::Ptr& stream)
{
for (;;) {
MessagePart message;
Dictionary::Ptr message;
try {
message = JsonRpc::ReadMessage(stream);
@ -262,26 +97,7 @@ void Endpoint::MessageThreadProc(const Stream::Ptr& stream)
m_Client.reset();
}
Endpoint::Ptr sender = GetSelf();
if (ResponseMessage::IsResponseMessage(message)) {
/* rather than routing the message to the right virtual
* endpoint we just process it here right away. */
EndpointManager::GetInstance()->ProcessResponseMessage(sender, message);
return;
}
RequestMessage request = message;
String method;
if (!request.GetMethod(&method))
return;
String id;
if (request.GetID(&id))
EndpointManager::GetInstance()->SendAnycastMessage(sender, request);
else
EndpointManager::GetInstance()->SendMulticastMessage(sender, request);
Utility::QueueAsyncCallback(bind(boost::ref(Endpoint::OnMessageReceived), GetSelf(), message));
}
}
@ -290,11 +106,11 @@ void Endpoint::MessageThreadProc(const Stream::Ptr& stream)
*
* @returns The node address (hostname).
*/
String Endpoint::GetNode(void) const
String Endpoint::GetHost(void) const
{
ObjectLock olock(this);
return m_Node;
return m_Host;
}
/**
@ -302,11 +118,11 @@ String Endpoint::GetNode(void) const
*
* @returns The service name (port).
*/
String Endpoint::GetService(void) const
String Endpoint::GetPort(void) const
{
ObjectLock olock(this);
return m_Service;
return m_Port;
}
void Endpoint::InternalSerialize(const Dictionary::Ptr& bag, int attributeTypes) const
@ -315,11 +131,9 @@ void Endpoint::InternalSerialize(const Dictionary::Ptr& bag, int attributeTypes)
if (attributeTypes & Attribute_Config) {
bag->Set("local", m_Local);
bag->Set("node", m_Node);
bag->Set("service", m_Service);
bag->Set("host", m_Host);
bag->Set("port", m_Port);
}
bag->Set("subscriptions", m_Subscriptions);
}
void Endpoint::InternalDeserialize(const Dictionary::Ptr& bag, int attributeTypes)
@ -328,9 +142,7 @@ void Endpoint::InternalDeserialize(const Dictionary::Ptr& bag, int attributeType
if (attributeTypes & Attribute_Config) {
m_Local = bag->Get("local");
m_Node = bag->Get("node");
m_Service = bag->Get("service");
m_Host = bag->Get("host");
m_Port = bag->Get("port");
}
bag->Set("subscriptions", m_Subscriptions);
}
}

View File

@ -21,8 +21,6 @@
#define ENDPOINT_H
#include "remoting/i2-remoting.h"
#include "remoting/requestmessage.h"
#include "remoting/responsemessage.h"
#include "base/dynamicobject.h"
#include "base/stream.h"
#include <boost/signals2.hpp>
@ -43,34 +41,18 @@ public:
DECLARE_PTR_TYPEDEFS(Endpoint);
DECLARE_TYPENAME(Endpoint);
typedef void (Callback)(const Endpoint::Ptr&, const Endpoint::Ptr&, const RequestMessage&);
static boost::signals2::signal<void (const Endpoint::Ptr&)> OnConnected;
static boost::signals2::signal<void (const Endpoint::Ptr&, const Dictionary::Ptr&)> OnMessageReceived;
Stream::Ptr GetClient(void) const;
void SetClient(const Stream::Ptr& client);
void RegisterSubscription(const String& topic);
void UnregisterSubscription(const String& topic);
bool HasSubscription(const String& topic) const;
Dictionary::Ptr GetSubscriptions(void) const;
void SetSubscriptions(const Dictionary::Ptr& subscriptions);
bool IsLocalEndpoint(void) const;
bool IsConnected(void) const;
void ProcessRequest(const Endpoint::Ptr& sender, const RequestMessage& message);
void ProcessResponse(const Endpoint::Ptr& sender, const ResponseMessage& message);
void SendMessage(const Dictionary::Ptr& request);
void ClearSubscriptions(void);
void RegisterTopicHandler(const String& topic, const boost::function<Callback>& callback);
String GetNode(void) const;
String GetService(void) const;
static Endpoint::Ptr MakeEndpoint(const String& name, bool replicated, bool local = true);
static boost::signals2::signal<void (const Endpoint::Ptr&)> OnConnected;
String GetHost(void) const;
String GetPort(void) const;
protected:
virtual void InternalSerialize(const Dictionary::Ptr& bag, int attributeTypes) const;
@ -79,18 +61,11 @@ protected:
private:
bool m_Local;
Dictionary::Ptr m_Subscriptions;
String m_Node;
String m_Service;
String m_Host;
String m_Port;
Stream::Ptr m_Client;
bool m_ReceivedWelcome; /**< Have we received a welcome message
from this endpoint? */
bool m_SentWelcome; /**< Have we sent a welcome message to this
endpoint? */
std::map<String, shared_ptr<boost::signals2::signal<Callback> > > m_TopicHandlers;
void MessageThreadProc(const Stream::Ptr& stream);
};

View File

@ -34,18 +34,7 @@ using namespace icinga;
* Constructor for the EndpointManager class.
*/
EndpointManager::EndpointManager(void)
: m_NextMessageID(0)
{
m_RequestTimer = boost::make_shared<Timer>();
m_RequestTimer->OnTimerExpired.connect(boost::bind(&EndpointManager::RequestTimerHandler, this));
m_RequestTimer->SetInterval(5);
m_RequestTimer->Start();
m_SubscriptionTimer = boost::make_shared<Timer>();
m_SubscriptionTimer->OnTimerExpired.connect(boost::bind(&EndpointManager::SubscriptionTimerHandler, this));
m_SubscriptionTimer->SetInterval(5);
m_SubscriptionTimer->Start();
m_ReconnectTimer = boost::make_shared<Timer>();
m_ReconnectTimer->OnTimerExpired.connect(boost::bind(&EndpointManager::ReconnectTimerHandler, this));
m_ReconnectTimer->SetInterval(5);
@ -87,16 +76,6 @@ void EndpointManager::SetIdentity(const String& identity)
ObjectLock olock(this);
m_Identity = identity;
if (m_Endpoint)
m_Endpoint->Stop();
Endpoint::Ptr endpoint = DynamicObject::GetObject<Endpoint>(identity);
if (endpoint)
m_Endpoint = endpoint;
else
m_Endpoint = Endpoint::MakeEndpoint(identity, true, true);
}
/**
@ -200,175 +179,173 @@ void EndpointManager::NewClientHandler(const Socket::Ptr& client, TlsRole role)
Endpoint::Ptr endpoint = Endpoint::GetByName(identity);
if (!endpoint)
endpoint = Endpoint::MakeEndpoint(identity, true);
if (!endpoint) {
Log(LogInformation, "remoting", "Closing endpoint '" + identity + "': No configuration available.");
return;
}
endpoint->SetClient(tlsStream);
}
/**
* Sends an anonymous unicast message to the specified recipient.
*
* @param recipient The recipient of the message.
* @param message The message.
*/
void EndpointManager::SendUnicastMessage(const Endpoint::Ptr& recipient,
const MessagePart& message)
{
SendUnicastMessage(Endpoint::Ptr(), recipient, message);
}
///**
// * Sends an anonymous unicast message to the specified recipient.
// *
// * @param recipient The recipient of the message.
// * @param message The message.
// */
//void EndpointManager::SendUnicastMessage(const Endpoint::Ptr& recipient,
// const MessagePart& message)
//{
// SendUnicastMessage(Endpoint::Ptr(), recipient, message);
//}
/**
* Sends a unicast message to the specified recipient.
*
* @param sender The sender of the message.
* @param recipient The recipient of the message.
* @param message The message.
*/
void EndpointManager::SendUnicastMessage(const Endpoint::Ptr& sender,
const Endpoint::Ptr& recipient, const MessagePart& message)
{
/* don't forward messages between non-local endpoints, assume that
* anonymous senders (sender == null) are local */
// if ((sender && !sender->IsLocal()) && !recipient->IsLocal())
///**
// * Sends a unicast message to the specified recipient.
// *
// * @param sender The sender of the message.
// * @param recipient The recipient of the message.
// * @param message The message.
// */
//void EndpointManager::SendUnicastMessage(const Endpoint::Ptr& sender,
// const Endpoint::Ptr& recipient, const MessagePart& message)
//{
// /* don't forward messages between non-local endpoints, assume that
// * anonymous senders (sender == null) are local */
//// if ((sender && !sender->IsLocal()) && !recipient->IsLocal())
//// return;
//
// if (ResponseMessage::IsResponseMessage(message))
// recipient->ProcessResponse(sender, message);
// else
// recipient->ProcessRequest(sender, message);
//}
///**
// * Sends a message to exactly one recipient out of all recipients who have a
// * subscription for the message's topic.
// *
// * @param sender The sender of the message.
// * @param message The message.
// */
//void EndpointManager::SendAnycastMessage(const Endpoint::Ptr& sender,
// const RequestMessage& message)
//{
// String method;
// if (!message.GetMethod(&method))
// BOOST_THROW_EXCEPTION(std::invalid_argument("Message is missing the 'method' property."));
//
// std::vector<Endpoint::Ptr> candidates;
//
// BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects<Endpoint>()) {
// /* don't forward messages between non-local endpoints */
//// if ((sender && !sender->IsLocal()) && !endpoint->IsLocal())
//// continue;
//
// if (endpoint->HasSubscription(method))
// candidates.push_back(endpoint);
// }
//
// if (candidates.empty())
// return;
//
// Endpoint::Ptr recipient = candidates[rand() % candidates.size()];
// SendUnicastMessage(sender, recipient, message);
//}
if (ResponseMessage::IsResponseMessage(message))
recipient->ProcessResponse(sender, message);
else
recipient->ProcessRequest(sender, message);
}
///**
// * Sends an anonymous message to all recipients who have a subscription for the
// * message's topic.
// *
// * @param message The message.
// */
//void EndpointManager::SendMulticastMessage(const RequestMessage& message)
//{
// SendMulticastMessage(Endpoint::Ptr(), message);
//}
/**
* Sends a message to exactly one recipient out of all recipients who have a
* subscription for the message's topic.
*
* @param sender The sender of the message.
* @param message The message.
*/
void EndpointManager::SendAnycastMessage(const Endpoint::Ptr& sender,
const RequestMessage& message)
{
String method;
if (!message.GetMethod(&method))
BOOST_THROW_EXCEPTION(std::invalid_argument("Message is missing the 'method' property."));
std::vector<Endpoint::Ptr> candidates;
BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects<Endpoint>()) {
/* don't forward messages between non-local endpoints */
// if ((sender && !sender->IsLocal()) && !endpoint->IsLocal())
///**
// * Sends a message to all recipients who have a subscription for the
// * message's topic.
// *
// * @param sender The sender of the message.
// * @param message The message.
// */
//void EndpointManager::SendBroadcastMessage(const Endpoint::Ptr& sender,
// const RequestMessage& message)
//{
// String method;
// if (!message.GetMethod(&method))
// BOOST_THROW_EXCEPTION(std::invalid_argument("Message is missing the 'method' property."));
//
// BOOST_FOREACH(const Endpoint::Ptr& recipient, DynamicType::GetObjects<Endpoint>()) {
// /* don't forward messages back to the sender */
// if (sender == recipient)
// continue;
//
// Log(LogDebug, "remoting", "Send multicast message using method " + method);
// if (recipient->HasSubscription(method))
// SendUnicastMessage(sender, recipient, message);
// }
//}
if (endpoint->HasSubscription(method))
candidates.push_back(endpoint);
}
if (candidates.empty())
return;
Endpoint::Ptr recipient = candidates[rand() % candidates.size()];
SendUnicastMessage(sender, recipient, message);
}
/**
* Sends an anonymous message to all recipients who have a subscription for the
* message's topic.
*
* @param message The message.
*/
void EndpointManager::SendMulticastMessage(const RequestMessage& message)
{
SendMulticastMessage(Endpoint::Ptr(), message);
}
/**
* Sends a message to all recipients who have a subscription for the
* message's topic.
*
* @param sender The sender of the message.
* @param message The message.
*/
void EndpointManager::SendMulticastMessage(const Endpoint::Ptr& sender,
const RequestMessage& message)
{
String id;
if (message.GetID(&id))
BOOST_THROW_EXCEPTION(std::invalid_argument("Multicast requests must not have an ID."));
String method;
if (!message.GetMethod(&method))
BOOST_THROW_EXCEPTION(std::invalid_argument("Message is missing the 'method' property."));
BOOST_FOREACH(const Endpoint::Ptr& recipient, DynamicType::GetObjects<Endpoint>()) {
/* don't forward messages back to the sender */
if (sender == recipient)
continue;
Log(LogDebug, "remoting", "Send multicast message using method " + method);
if (recipient->HasSubscription(method))
SendUnicastMessage(sender, recipient, message);
}
}
void EndpointManager::SendAPIMessage(const Endpoint::Ptr& sender, const Endpoint::Ptr& recipient,
RequestMessage& message,
const EndpointManager::APICallback& callback, double timeout)
{
ObjectLock olock(this);
m_NextMessageID++;
String id = Convert::ToString(m_NextMessageID);
message.SetID(id);
PendingRequest pr;
pr.Request = message;
pr.Callback = callback;
pr.Timeout = Utility::GetTime() + timeout;
m_Requests[id] = pr;
if (!recipient)
SendAnycastMessage(sender, message);
else
SendUnicastMessage(sender, recipient, message);
}
bool EndpointManager::RequestTimeoutLessComparer(const std::pair<String, PendingRequest>& a,
const std::pair<String, PendingRequest>& b)
{
return a.second.Timeout < b.second.Timeout;
}
void EndpointManager::SubscriptionTimerHandler(void)
{
Dictionary::Ptr subscriptions = boost::make_shared<Dictionary>();
BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects<Endpoint>()) {
/* don't copy subscriptions from non-local endpoints or the identity endpoint */
// if (!endpoint->IsLocalEndpoint() || endpoint == m_Endpoint)
// continue;
Dictionary::Ptr endpointSubscriptions = endpoint->GetSubscriptions();
if (endpointSubscriptions) {
ObjectLock olock(endpointSubscriptions);
String topic;
BOOST_FOREACH(boost::tie(boost::tuples::ignore, topic), endpointSubscriptions) {
subscriptions->Set(topic, topic);
}
}
}
subscriptions->Seal();
if (m_Endpoint) {
ObjectLock olock(m_Endpoint);
m_Endpoint->SetSubscriptions(subscriptions);
}
}
//void EndpointManager::SendAPIMessage(const Endpoint::Ptr& sender, const Endpoint::Ptr& recipient,
// RequestMessage& message,
// const EndpointManager::APICallback& callback, double timeout)
//{
// ObjectLock olock(this);
//
// m_NextMessageID++;
//
// String id = Convert::ToString(m_NextMessageID);
// message.SetID(id);
//
// PendingRequest pr;
// pr.Request = message;
// pr.Callback = callback;
// pr.Timeout = Utility::GetTime() + timeout;
//
// m_Requests[id] = pr;
//
// if (!recipient)
// SendAnycastMessage(sender, message);
// else
// SendUnicastMessage(sender, recipient, message);
//}
//
//bool EndpointManager::RequestTimeoutLessComparer(const std::pair<String, PendingRequest>& a,
// const std::pair<String, PendingRequest>& b)
//{
// return a.second.Timeout < b.second.Timeout;
//}
//
//void EndpointManager::SubscriptionTimerHandler(void)
//{
// Dictionary::Ptr subscriptions = boost::make_shared<Dictionary>();
//
// BOOST_FOREACH(const Endpoint::Ptr& endpoint, DynamicType::GetObjects<Endpoint>()) {
// /* don't copy subscriptions from non-local endpoints or the identity endpoint */
//// if (!endpoint->IsLocalEndpoint() || endpoint == m_Endpoint)
//// continue;
//
// Dictionary::Ptr endpointSubscriptions = endpoint->GetSubscriptions();
//
// if (endpointSubscriptions) {
// ObjectLock olock(endpointSubscriptions);
//
// String topic;
// BOOST_FOREACH(boost::tie(boost::tuples::ignore, topic), endpointSubscriptions) {
// subscriptions->Set(topic, topic);
// }
// }
// }
//
// subscriptions->Seal();
//
// if (m_Endpoint) {
// ObjectLock olock(m_Endpoint);
// m_Endpoint->SetSubscriptions(subscriptions);
// }
//}
void EndpointManager::ReconnectTimerHandler(void)
{
@ -376,57 +353,57 @@ void EndpointManager::ReconnectTimerHandler(void)
if (endpoint->IsConnected() || endpoint == m_Endpoint)
continue;
String node, service;
node = endpoint->GetNode();
service = endpoint->GetService();
String host, port;
host = endpoint->GetHost();
port = endpoint->GetPort();
if (node.IsEmpty() || service.IsEmpty()) {
if (host.IsEmpty() || port.IsEmpty()) {
Log(LogWarning, "icinga", "Can't reconnect "
"to endpoint '" + endpoint->GetName() + "': No "
"node/service information.");
"host/port information.");
continue;
}
AddConnection(node, service);
AddConnection(host, port);
}
}
void EndpointManager::RequestTimerHandler(void)
{
ObjectLock olock(this);
//void EndpointManager::RequestTimerHandler(void)
//{
// ObjectLock olock(this);
//
// std::map<String, PendingRequest>::iterator it;
// for (it = m_Requests.begin(); it != m_Requests.end(); ++it) {
// if (it->second.HasTimedOut()) {
// it->second.Callback(Endpoint::Ptr(), it->second.Request,
// ResponseMessage(), true);
//
// m_Requests.erase(it);
//
// break;
// }
// }
//}
std::map<String, PendingRequest>::iterator it;
for (it = m_Requests.begin(); it != m_Requests.end(); ++it) {
if (it->second.HasTimedOut()) {
it->second.Callback(Endpoint::Ptr(), it->second.Request,
ResponseMessage(), true);
m_Requests.erase(it);
break;
}
}
}
void EndpointManager::ProcessResponseMessage(const Endpoint::Ptr& sender,
const ResponseMessage& message)
{
ObjectLock olock(this);
String id;
if (!message.GetID(&id))
BOOST_THROW_EXCEPTION(std::invalid_argument("Response message must have a message ID."));
std::map<String, PendingRequest>::iterator it;
it = m_Requests.find(id);
if (it == m_Requests.end())
return;
it->second.Callback(sender, it->second.Request, message, false);
m_Requests.erase(it);
}
//void EndpointManager::ProcessResponseMessage(const Endpoint::Ptr& sender,
// const ResponseMessage& message)
//{
// ObjectLock olock(this);
//
// String id;
// if (!message.GetID(&id))
// BOOST_THROW_EXCEPTION(std::invalid_argument("Response message must have a message ID."));
//
// std::map<String, PendingRequest>::iterator it;
// it = m_Requests.find(id);
//
// if (it == m_Requests.end())
// return;
//
// it->second.Callback(sender, it->second.Request, message, false);
//
// m_Requests.erase(it);
//}
EndpointManager *EndpointManager::GetInstance(void)
{

View File

@ -54,18 +54,11 @@ public:
void AddListener(const String& service);
void AddConnection(const String& node, const String& service);
void SendUnicastMessage(const Endpoint::Ptr& recipient, const MessagePart& message);
void SendUnicastMessage(const Endpoint::Ptr& sender, const Endpoint::Ptr& recipient, const MessagePart& message);
void SendAnycastMessage(const Endpoint::Ptr& sender, const RequestMessage& message);
void SendMulticastMessage(const RequestMessage& message);
void SendMulticastMessage(const Endpoint::Ptr& sender, const RequestMessage& message);
typedef boost::function<void(const Endpoint::Ptr, const RequestMessage&, const ResponseMessage&, bool TimedOut)> APICallback;
void SendAPIMessage(const Endpoint::Ptr& sender, const Endpoint::Ptr& recipient, RequestMessage& message,
const APICallback& callback, double timeout = 30);
void ProcessResponseMessage(const Endpoint::Ptr& sender, const ResponseMessage& message);
//void SendUnicastMessage(const Endpoint::Ptr& recipient, const Dictionary::Ptr& message);
//void SendUnicastMessage(const Endpoint::Ptr& sender, const Endpoint::Ptr& recipient, const MessagePart& message);
//void SendAnycastMessage(const Endpoint::Ptr& sender, const RequestMessage& message);
//void SendMulticastMessage(const RequestMessage& message);
//void SendMulticastMessage(const Endpoint::Ptr& sender, const RequestMessage& message);
boost::signals2::signal<void (const Endpoint::Ptr&)> OnNewEndpoint;
@ -75,38 +68,10 @@ private:
shared_ptr<SSL_CTX> m_SSLContext;
Timer::Ptr m_SubscriptionTimer;
Timer::Ptr m_ReconnectTimer;
std::set<TcpSocket::Ptr> m_Servers;
/**
* Information about a pending API request.
*
* @ingroup remoting
*/
struct I2_REMOTING_API PendingRequest
{
double Timeout;
RequestMessage Request;
boost::function<void(const Endpoint::Ptr, const RequestMessage&, const ResponseMessage&, bool TimedOut)> Callback;
bool HasTimedOut(void) const
{
return Utility::GetTime() > Timeout;
}
};
long m_NextMessageID;
std::map<String, PendingRequest> m_Requests;
Timer::Ptr m_RequestTimer;
static bool RequestTimeoutLessComparer(const std::pair<String, PendingRequest>& a, const std::pair<String, PendingRequest>& b);
void RequestTimerHandler(void);
void SubscriptionTimerHandler(void);
void ReconnectTimerHandler(void);
void NewClientHandler(const Socket::Ptr& client, TlsRole role);

View File

@ -31,15 +31,14 @@ using namespace icinga;
*
* @param message The message.
*/
void JsonRpc::SendMessage(const Stream::Ptr& stream, const MessagePart& message)
void JsonRpc::SendMessage(const Stream::Ptr& stream, const Dictionary::Ptr& message)
{
Value value = message.GetDictionary();
String json = value.Serialize();
String json = Value(message).Serialize();
//std::cerr << ">> " << json << std::endl;
NetString::WriteStringToStream(stream, json);
}
MessagePart JsonRpc::ReadMessage(const Stream::Ptr& stream)
Dictionary::Ptr JsonRpc::ReadMessage(const Stream::Ptr& stream)
{
String jsonString;
if (!NetString::ReadStringFromStream(stream, &jsonString))
@ -53,5 +52,5 @@ MessagePart JsonRpc::ReadMessage(const Stream::Ptr& stream)
" message must be a dictionary."));
}
return MessagePart(value);
return value;
}

View File

@ -21,8 +21,8 @@
#define JSONRPC_H
#include "remoting/i2-remoting.h"
#include "remoting/messagepart.h"
#include "base/stream.h"
#include "base/dictionary.h"
namespace icinga
{
@ -35,8 +35,8 @@ namespace icinga
class I2_REMOTING_API JsonRpc
{
public:
static void SendMessage(const Stream::Ptr& stream, const MessagePart& message);
static MessagePart ReadMessage(const Stream::Ptr& stream);
static void SendMessage(const Stream::Ptr& stream, const Dictionary::Ptr& message);
static Dictionary::Ptr ReadMessage(const Stream::Ptr& stream);
private:
JsonRpc(void);

View File

@ -1,127 +0,0 @@
/******************************************************************************
* Icinga 2 *
* Copyright (C) 2012 Icinga Development Team (http://www.icinga.org/) *
* *
* This program is free software; you can redistribute it and/or *
* modify it under the terms of the GNU General Public License *
* as published by the Free Software Foundation; either version 2 *
* of the License, or (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program; if not, write to the Free Software Foundation *
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. *
******************************************************************************/
#include "remoting/messagepart.h"
#include <boost/smart_ptr/make_shared.hpp>
using namespace icinga;
/**
* Constructor for the MessagePart class.
*/
MessagePart::MessagePart(void)
{
m_Dictionary = boost::make_shared<Dictionary>();
}
/**
* Constructor for the MessagePart class.
*
* @param dictionary The dictionary that this MessagePart object should wrap.
*/
MessagePart::MessagePart(const Dictionary::Ptr& dictionary)
{
m_Dictionary = dictionary;
}
/**
* Copy-constructor for the MessagePart class.
*
* @param message The message that should be copied.
*/
MessagePart::MessagePart(const MessagePart& message)
{
m_Dictionary = message.GetDictionary();
}
/**
* Retrieves the underlying dictionary for this message.
*
* @returns A dictionary.
*/
Dictionary::Ptr MessagePart::GetDictionary(void) const
{
return m_Dictionary;
}
/**
* Retrieves a property's value.
*
* @param key The name of the property.
* @param[out] value The value.
* @returns true if the value was retrieved, false otherwise.
*/
bool MessagePart::Get(String key, MessagePart *value) const
{
Value v;
v = GetDictionary()->Get(key);
if (!v.IsObjectType<Dictionary>())
return false;
Dictionary::Ptr dictionary = v;
MessagePart mp(dictionary);
*value = mp;
return true;
}
/**
* Sets a property's value.
*
* @param key The name of the property.
* @param value The value.
*/
void MessagePart::Set(String key, const MessagePart& value)
{
GetDictionary()->Set(key, value.GetDictionary());
}
/**
* Returns an iterator that points to the first element of the dictionary
* which holds the properties for the message.
*
* @returns An iterator.
*/
Dictionary::Iterator MessagePart::Begin(void)
{
return GetDictionary()->Begin();
}
/**
* Returns an iterator that points past the last element of the dictionary
* which holds the properties for the message.
*
* @returns An iterator.
*/
Dictionary::Iterator MessagePart::End(void)
{
return GetDictionary()->End();
}
/**
* Checks whether the message contains the specified element.
*
* @param key The name of the element.
* @returns true if the message contains the element, false otherwise.
*/
bool MessagePart::Contains(const String& key) const
{
return GetDictionary()->Contains(key);
}

View File

@ -1,88 +0,0 @@
/******************************************************************************
* Icinga 2 *
* Copyright (C) 2012 Icinga Development Team (http://www.icinga.org/) *
* *
* This program is free software; you can redistribute it and/or *
* modify it under the terms of the GNU General Public License *
* as published by the Free Software Foundation; either version 2 *
* of the License, or (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program; if not, write to the Free Software Foundation *
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. *
******************************************************************************/
#ifndef MESSAGEPART_H
#define MESSAGEPART_H
#include "remoting/i2-remoting.h"
#include "base/dictionary.h"
namespace icinga
{
/**
* A part of an RPC message.
*
* @ingroup remoting
*/
class I2_REMOTING_API MessagePart
{
public:
MessagePart(void);
MessagePart(const MessagePart& message);
explicit MessagePart(const Dictionary::Ptr& dictionary);
Dictionary::Ptr GetDictionary(void) const;
/**
* Retrieves a property's value.
*
* @param key The name of the property.
* @param[out] value The value.
* @returns true if the value was retrieved, false otherwise.
*/
template<typename T>
bool Get(String key, T *value) const
{
Value v = GetDictionary()->Get(key);
if (v.IsEmpty())
return false;
*value = static_cast<T>(v);
return true;
}
/**
* Sets a property's value.
*
* @param key The name of the property.
* @param value The value.
*/
template<typename T>
void Set(String key, const T& value)
{
GetDictionary()->Set(key, value);
}
bool Get(String key, MessagePart *value) const;
void Set(String key, const MessagePart& value);
bool Contains(const String& key) const;
Dictionary::Iterator Begin(void);
Dictionary::Iterator End(void);
private:
Dictionary::Ptr m_Dictionary;
};
}
#endif /* MESSAGEPART_H */

View File

@ -1,22 +0,0 @@
/******************************************************************************
* Icinga 2 *
* Copyright (C) 2012 Icinga Development Team (http://www.icinga.org/) *
* *
* This program is free software; you can redistribute it and/or *
* modify it under the terms of the GNU General Public License *
* as published by the Free Software Foundation; either version 2 *
* of the License, or (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program; if not, write to the Free Software Foundation *
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. *
******************************************************************************/
#include "remoting/requestmessage.h"
using namespace icinga;

View File

@ -1,138 +0,0 @@
/******************************************************************************
* Icinga 2 *
* Copyright (C) 2012 Icinga Development Team (http://www.icinga.org/) *
* *
* This program is free software; you can redistribute it and/or *
* modify it under the terms of the GNU General Public License *
* as published by the Free Software Foundation; either version 2 *
* of the License, or (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program; if not, write to the Free Software Foundation *
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. *
******************************************************************************/
#ifndef REQUESTMESSAGE_H
#define REQUESTMESSAGE_H
#include "remoting/i2-remoting.h"
#include "remoting/messagepart.h"
namespace icinga
{
/**
* A JSON-RPC request message.
*
* @ingroup remoting
*/
class I2_REMOTING_API RequestMessage : public MessagePart
{
public:
/**
* Constructor for the RequestMessage class.
*/
RequestMessage(void) : MessagePart() {
SetVersion("2.0");
}
/**
* Copy-constructor for the RequestMessage class.
*
* @param message The message that is to be copied.
*/
RequestMessage(const MessagePart& message) : MessagePart(message) { }
/**
* Retrieves the version of the JSON-RPC protocol.
*
* @param[out] value The value.
* @returns true if the value was retrieved, false otherwise.
*/
inline bool GetVersion(String *value) const
{
return Get("jsonrpc", value);
}
/**
* Sets the version of the JSON-RPC protocol that should be used.
*
* @param value The version.
*/
inline void SetVersion(const String& value)
{
Set("jsonrpc", value);
}
/**
* Retrieves the method of the JSON-RPC call.
*
* @param[out] value The method.
* @returns true if the value was retrieved, false otherwise.
*/
inline bool GetMethod(String *value) const
{
return Get("method", value);
}
/**
* Sets the method for the JSON-RPC call.
*
* @param value The method.
*/
inline void SetMethod(const String& value)
{
Set("method", value);
}
/**
* Retrieves the parameters of the JSON-RPC call.
*
* @param[out] value The parameters.
* @returns true if the value was retrieved, false otherwise.
*/
inline bool GetParams(MessagePart *value) const
{
return Get("params", value);
}
/**
* Sets the parameters for the JSON-RPC call.
*
* @param value The parameters.
*/
inline void SetParams(const MessagePart& value)
{
Set("params", value);
}
/**
* Retrieves the ID of the JSON-RPC call.
*
* @param[out] value The ID.
* @return true if the value was retrieved, false otherwise.
*/
inline bool GetID(String *value) const
{
return Get("id", value);
}
/**
* Sets the ID for the JSON-RPC call.
*
* @param value The ID.
*/
inline void SetID(const String& value)
{
Set("id", value);
}
};
}
#endif /* REQUESTMESSAGE_H */

View File

@ -1,22 +0,0 @@
/******************************************************************************
* Icinga 2 *
* Copyright (C) 2012 Icinga Development Team (http://www.icinga.org/) *
* *
* This program is free software; you can redistribute it and/or *
* modify it under the terms of the GNU General Public License *
* as published by the Free Software Foundation; either version 2 *
* of the License, or (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program; if not, write to the Free Software Foundation *
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. *
******************************************************************************/
#include "remoting/responsemessage.h"
using namespace icinga;

View File

@ -1,149 +0,0 @@
/******************************************************************************
* Icinga 2 *
* Copyright (C) 2012 Icinga Development Team (http://www.icinga.org/) *
* *
* This program is free software; you can redistribute it and/or *
* modify it under the terms of the GNU General Public License *
* as published by the Free Software Foundation; either version 2 *
* of the License, or (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program; if not, write to the Free Software Foundation *
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. *
******************************************************************************/
#ifndef RESPONSEMESSAGE_H
#define RESPONSEMESSAGE_H
#include "remoting/i2-remoting.h"
#include "remoting/messagepart.h"
namespace icinga
{
/**
* A JSON-RPC response message.
*
* @ingroup remoting
*/
class I2_REMOTING_API ResponseMessage : public MessagePart
{
public:
/**
* Constructor for the ResponseMessage class.
*/
ResponseMessage(void) : MessagePart() {
SetVersion("2.0");
}
/**
* Copy-constructor for the ResponseMessage class.
*
* @param message The message that should be copied.
*/
ResponseMessage(const MessagePart& message) : MessagePart(message) { }
/**
* Retrieves the version of the JSON-RPC protocol.
*
* @param[out] value The value.
* @returns true if the value was retrieved, false otherwise.
*/
inline bool GetVersion(String *value) const
{
return Get("jsonrpc", value);
}
/**
* Sets the version of the JSON-RPC protocol that should be used.
*
* @param value The version.
*/
inline void SetVersion(const String& value)
{
Set("jsonrpc", value);
}
/**
* Retrieves the result of the JSON-RPC call.
*
* @param[out] value The result.
* @returns true if the value was retrieved, false otherwise.
*/
bool GetResult(MessagePart *value) const
{
return Get("result", value);
}
/**
* Sets the result for the JSON-RPC call.
*
* @param value The result.
*/
void SetResult(const MessagePart& value)
{
Set("result", value);
}
/**
* Retrieves the error message of the JSON-RPC call.
*
* @param[out] value The error message.
* @returns true if the value was retrieved, false otherwise.
*/
bool GetError(String *value) const
{
return Get("error", value);
}
/**
* Sets the error message for the JSON-RPC call.
*
* @param value The error message.
*/
void SetError(const String& value)
{
Set("error", value);
}
/**
* Retrieves the ID of the JSON-RPC call.
*
* @param[out] value The ID.
* @return true if the value was retrieved, false otherwise.
*/
bool GetID(String *value) const
{
return Get("id", value);
}
/**
* Sets the ID for the JSON-RPC call.
*
* @param value The ID.
*/
void SetID(const String& value)
{
Set("id", value);
}
/**
* Checks whether a message is a response message.
*
* @param message The message.
* @returns true if the message is a response message, false otherwise.
*/
static bool IsResponseMessage(const MessagePart& message)
{
return (message.Contains("result"));
}
};
}
#endif /* RESPONSEMESSAGE_H */