Cleaned up jsonrpc library.

Updated documentation.
This commit is contained in:
Gunnar Beutner 2012-05-15 16:24:04 +02:00
parent 2f6380c8f1
commit 77bec95ec5
30 changed files with 439 additions and 318 deletions

View File

@ -302,26 +302,6 @@ void Application::Log(string message)
cout << "[" << timestamp << "]: " << message << endl; cout << "[" << timestamp << "]: " << message << endl;
} }
/**
* Sets the application's command line arguments.
*
* @param arguments The arguments.
*/
void Application::SetArguments(const vector<string>& arguments)
{
m_Arguments = arguments;
}
/**
* Retrieves the application's command line arguments.
*
* @returns The arguments.
*/
const vector<string>& Application::GetArguments(void) const
{
return m_Arguments;
}
/** /**
* Retrieves the directory the application's binary is contained in. * Retrieves the directory the application's binary is contained in.
* *
@ -469,20 +449,17 @@ int Application::Run(int argc, char **argv)
sigaction(SIGPIPE, &sa, NULL); sigaction(SIGPIPE, &sa, NULL);
#endif /* _WIN32 */ #endif /* _WIN32 */
vector<string> args; m_Arguments.clear();
for (int i = 0; i < argc; i++) for (int i = 0; i < argc; i++)
args.push_back(string(argv[i])); m_Arguments.push_back(string(argv[i]));
SetArguments(args);
if (IsDebugging()) { if (IsDebugging()) {
result = Main(args); result = Main(m_Arguments);
Application::Instance.reset(); Application::Instance.reset();
} else { } else {
try { try {
result = Main(args); result = Main(m_Arguments);
} catch (const Exception& ex) { } catch (const Exception& ex) {
Application::Instance.reset(); Application::Instance.reset();

View File

@ -54,9 +54,6 @@ public:
virtual int Main(const vector<string>& args) = 0; virtual int Main(const vector<string>& args) = 0;
void SetArguments(const vector<string>& arguments);
const vector<string>& GetArguments(void) const;
void Shutdown(void); void Shutdown(void);
static void Log(string message); static void Log(string message);

View File

@ -35,22 +35,22 @@ void ConfigRpcComponent::Start(void)
long configSource; long configSource;
if (GetConfig()->GetPropertyInteger("configSource", &configSource) && configSource != 0) { if (GetConfig()->GetPropertyInteger("configSource", &configSource) && configSource != 0) {
m_ConfigRpcEndpoint->RegisterMethodHandler("config::FetchObjects", m_ConfigRpcEndpoint->RegisterTopicHandler("config::FetchObjects",
bind_weak(&ConfigRpcComponent::FetchObjectsHandler, shared_from_this())); bind_weak(&ConfigRpcComponent::FetchObjectsHandler, shared_from_this()));
configHive->OnObjectCommitted += bind_weak(&ConfigRpcComponent::LocalObjectCommittedHandler, shared_from_this()); configHive->OnObjectCommitted += bind_weak(&ConfigRpcComponent::LocalObjectCommittedHandler, shared_from_this());
configHive->OnObjectRemoved += bind_weak(&ConfigRpcComponent::LocalObjectRemovedHandler, shared_from_this()); configHive->OnObjectRemoved += bind_weak(&ConfigRpcComponent::LocalObjectRemovedHandler, shared_from_this());
m_ConfigRpcEndpoint->RegisterMethodSource("config::ObjectCommitted"); m_ConfigRpcEndpoint->RegisterPublication("config::ObjectCommitted");
m_ConfigRpcEndpoint->RegisterMethodSource("config::ObjectRemoved"); m_ConfigRpcEndpoint->RegisterPublication("config::ObjectRemoved");
} }
endpointManager->OnNewEndpoint += bind_weak(&ConfigRpcComponent::NewEndpointHandler, shared_from_this()); endpointManager->OnNewEndpoint += bind_weak(&ConfigRpcComponent::NewEndpointHandler, shared_from_this());
m_ConfigRpcEndpoint->RegisterMethodSource("config::FetchObjects"); m_ConfigRpcEndpoint->RegisterPublication("config::FetchObjects");
m_ConfigRpcEndpoint->RegisterMethodHandler("config::ObjectCommitted", m_ConfigRpcEndpoint->RegisterTopicHandler("config::ObjectCommitted",
bind_weak(&ConfigRpcComponent::RemoteObjectCommittedHandler, shared_from_this())); bind_weak(&ConfigRpcComponent::RemoteObjectCommittedHandler, shared_from_this()));
m_ConfigRpcEndpoint->RegisterMethodHandler("config::ObjectRemoved", m_ConfigRpcEndpoint->RegisterTopicHandler("config::ObjectRemoved",
bind_weak(&ConfigRpcComponent::RemoteObjectRemovedHandler, shared_from_this())); bind_weak(&ConfigRpcComponent::RemoteObjectRemovedHandler, shared_from_this()));
endpointManager->RegisterEndpoint(m_ConfigRpcEndpoint); endpointManager->RegisterEndpoint(m_ConfigRpcEndpoint);
@ -73,18 +73,18 @@ int ConfigRpcComponent::NewEndpointHandler(const NewEndpointEventArgs& ea)
int ConfigRpcComponent::SessionEstablishedHandler(const EventArgs& ea) int ConfigRpcComponent::SessionEstablishedHandler(const EventArgs& ea)
{ {
JsonRpcRequest request; RpcRequest request;
request.SetMethod("config::FetchObjects"); request.SetMethod("config::FetchObjects");
Endpoint::Ptr endpoint = static_pointer_cast<Endpoint>(ea.Source); Endpoint::Ptr endpoint = static_pointer_cast<Endpoint>(ea.Source);
GetEndpointManager()->SendUnicastRequest(m_ConfigRpcEndpoint, endpoint, request); GetEndpointManager()->SendUnicastMessage(m_ConfigRpcEndpoint, endpoint, request);
return 0; return 0;
} }
JsonRpcRequest ConfigRpcComponent::MakeObjectMessage(const ConfigObject::Ptr& object, string method, bool includeProperties) RpcRequest ConfigRpcComponent::MakeObjectMessage(const ConfigObject::Ptr& object, string method, bool includeProperties)
{ {
JsonRpcRequest msg; RpcRequest msg;
msg.SetMethod(method); msg.SetMethod(method);
Message params; Message params;
@ -121,9 +121,9 @@ int ConfigRpcComponent::FetchObjectsHandler(const NewRequestEventArgs& ea)
if (!ShouldReplicateObject(object)) if (!ShouldReplicateObject(object))
continue; continue;
JsonRpcRequest request = MakeObjectMessage(object, "config::ObjectCreated", true); RpcRequest request = MakeObjectMessage(object, "config::ObjectCreated", true);
GetEndpointManager()->SendUnicastRequest(m_ConfigRpcEndpoint, client, request); GetEndpointManager()->SendUnicastMessage(m_ConfigRpcEndpoint, client, request);
} }
} }
@ -137,7 +137,7 @@ int ConfigRpcComponent::LocalObjectCommittedHandler(const EventArgs& ea)
if (!ShouldReplicateObject(object)) if (!ShouldReplicateObject(object))
return 0; return 0;
GetEndpointManager()->SendMulticastRequest(m_ConfigRpcEndpoint, GetEndpointManager()->SendMulticastMessage(m_ConfigRpcEndpoint,
MakeObjectMessage(object, "config::ObjectCreated", true)); MakeObjectMessage(object, "config::ObjectCreated", true));
return 0; return 0;
@ -150,7 +150,7 @@ int ConfigRpcComponent::LocalObjectRemovedHandler(const EventArgs& ea)
if (!ShouldReplicateObject(object)) if (!ShouldReplicateObject(object))
return 0; return 0;
GetEndpointManager()->SendMulticastRequest(m_ConfigRpcEndpoint, GetEndpointManager()->SendMulticastMessage(m_ConfigRpcEndpoint,
MakeObjectMessage(object, "config::ObjectRemoved", false)); MakeObjectMessage(object, "config::ObjectRemoved", false));
return 0; return 0;
@ -158,7 +158,7 @@ int ConfigRpcComponent::LocalObjectRemovedHandler(const EventArgs& ea)
int ConfigRpcComponent::RemoteObjectCommittedHandler(const NewRequestEventArgs& ea) int ConfigRpcComponent::RemoteObjectCommittedHandler(const NewRequestEventArgs& ea)
{ {
JsonRpcRequest message = ea.Request; RpcRequest message = ea.Request;
bool was_null = false; bool was_null = false;
Message params; Message params;
@ -199,7 +199,7 @@ int ConfigRpcComponent::RemoteObjectCommittedHandler(const NewRequestEventArgs&
int ConfigRpcComponent::RemoteObjectRemovedHandler(const NewRequestEventArgs& ea) int ConfigRpcComponent::RemoteObjectRemovedHandler(const NewRequestEventArgs& ea)
{ {
JsonRpcRequest message = ea.Request; RpcRequest message = ea.Request;
Message params; Message params;
if (!message.GetParams(&params)) if (!message.GetParams(&params))

View File

@ -38,7 +38,7 @@ private:
int RemoteObjectCommittedHandler(const NewRequestEventArgs& ea); int RemoteObjectCommittedHandler(const NewRequestEventArgs& ea);
int RemoteObjectRemovedHandler(const NewRequestEventArgs& ea); int RemoteObjectRemovedHandler(const NewRequestEventArgs& ea);
static JsonRpcRequest MakeObjectMessage(const ConfigObject::Ptr& object, string method, bool includeProperties); static RpcRequest MakeObjectMessage(const ConfigObject::Ptr& object, string method, bool includeProperties);
static bool ShouldReplicateObject(const ConfigObject::Ptr& object); static bool ShouldReplicateObject(const ConfigObject::Ptr& object);
public: public:

View File

@ -37,9 +37,9 @@ string DemoComponent::GetName(void) const
void DemoComponent::Start(void) void DemoComponent::Start(void)
{ {
m_DemoEndpoint = make_shared<VirtualEndpoint>(); m_DemoEndpoint = make_shared<VirtualEndpoint>();
m_DemoEndpoint->RegisterMethodHandler("demo::HelloWorld", m_DemoEndpoint->RegisterTopicHandler("demo::HelloWorld",
bind_weak(&DemoComponent::HelloWorldRequestHandler, shared_from_this())); bind_weak(&DemoComponent::HelloWorldRequestHandler, shared_from_this()));
m_DemoEndpoint->RegisterMethodSource("demo::HelloWorld"); m_DemoEndpoint->RegisterPublication("demo::HelloWorld");
EndpointManager::Ptr endpointManager = GetIcingaApplication()->GetEndpointManager(); EndpointManager::Ptr endpointManager = GetIcingaApplication()->GetEndpointManager();
endpointManager->RegisterEndpoint(m_DemoEndpoint); endpointManager->RegisterEndpoint(m_DemoEndpoint);
@ -73,11 +73,11 @@ int DemoComponent::DemoTimerHandler(const TimerEventArgs&)
{ {
Application::Log("Sending multicast 'hello world' message."); Application::Log("Sending multicast 'hello world' message.");
JsonRpcRequest request; RpcRequest request;
request.SetMethod("demo::HelloWorld"); request.SetMethod("demo::HelloWorld");
EndpointManager::Ptr endpointManager = GetIcingaApplication()->GetEndpointManager(); EndpointManager::Ptr endpointManager = GetIcingaApplication()->GetEndpointManager();
endpointManager->SendMulticastRequest(m_DemoEndpoint, request); endpointManager->SendMulticastMessage(m_DemoEndpoint, request);
return 0; return 0;
} }

View File

@ -38,15 +38,15 @@ void DiscoveryComponent::Start(void)
{ {
m_DiscoveryEndpoint = make_shared<VirtualEndpoint>(); m_DiscoveryEndpoint = make_shared<VirtualEndpoint>();
m_DiscoveryEndpoint->RegisterMethodSource("discovery::RegisterComponent"); m_DiscoveryEndpoint->RegisterPublication("discovery::RegisterComponent");
m_DiscoveryEndpoint->RegisterMethodHandler("discovery::RegisterComponent", m_DiscoveryEndpoint->RegisterTopicHandler("discovery::RegisterComponent",
bind_weak(&DiscoveryComponent::RegisterComponentMessageHandler, shared_from_this())); bind_weak(&DiscoveryComponent::RegisterComponentMessageHandler, shared_from_this()));
m_DiscoveryEndpoint->RegisterMethodSource("discovery::NewComponent"); m_DiscoveryEndpoint->RegisterPublication("discovery::NewComponent");
m_DiscoveryEndpoint->RegisterMethodHandler("discovery::NewComponent", m_DiscoveryEndpoint->RegisterTopicHandler("discovery::NewComponent",
bind_weak(&DiscoveryComponent::NewComponentMessageHandler, shared_from_this())); bind_weak(&DiscoveryComponent::NewComponentMessageHandler, shared_from_this()));
m_DiscoveryEndpoint->RegisterMethodHandler("discovery::Welcome", m_DiscoveryEndpoint->RegisterTopicHandler("discovery::Welcome",
bind_weak(&DiscoveryComponent::WelcomeMessageHandler, shared_from_this())); bind_weak(&DiscoveryComponent::WelcomeMessageHandler, shared_from_this()));
GetEndpointManager()->ForEachEndpoint(bind(&DiscoveryComponent::NewEndpointHandler, this, _1)); GetEndpointManager()->ForEachEndpoint(bind(&DiscoveryComponent::NewEndpointHandler, this, _1));
@ -112,42 +112,16 @@ int DiscoveryComponent::NewEndpointHandler(const NewEndpointEventArgs& neea)
neea.Endpoint->OnIdentityChanged += bind_weak(&DiscoveryComponent::NewIdentityHandler, shared_from_this()); neea.Endpoint->OnIdentityChanged += bind_weak(&DiscoveryComponent::NewIdentityHandler, shared_from_this());
/* accept discovery::RegisterComponent messages from any endpoint */ /* accept discovery::RegisterComponent messages from any endpoint */
neea.Endpoint->RegisterMethodSource("discovery::RegisterComponent"); neea.Endpoint->RegisterPublication("discovery::RegisterComponent");
/* accept discovery::Welcome messages from any endpoint */ /* accept discovery::Welcome messages from any endpoint */
neea.Endpoint->RegisterMethodSource("discovery::Welcome"); neea.Endpoint->RegisterPublication("discovery::Welcome");
return 0; return 0;
} }
/** /**
* Registers a new message sink for a component. * Registers message Subscriptions/sources in the specified component information object.
*
* @param nmea Event args for the new message sink.
* @param info The component registration information.
* @returns 0
*/
int DiscoveryComponent::DiscoverySinkHandler(const NewMethodEventArgs& nmea, ComponentDiscoveryInfo::Ptr info) const
{
info->SubscribedMethods.insert(nmea.Method);
return 0;
}
/**
* Registers a new message source for a component.
*
* @param nmea Event args for the new message source.
* @param info The component registration information.
* @returns 0
*/
int DiscoveryComponent::DiscoverySourceHandler(const NewMethodEventArgs& nmea, ComponentDiscoveryInfo::Ptr info) const
{
info->PublishedMethods.insert(nmea.Method);
return 0;
}
/**
* Registers message sinks/sources in the specified component information object.
* *
* @param neea Event arguments for the endpoint. * @param neea Event arguments for the endpoint.
* @param info Component information object. * @param info Component information object.
@ -155,8 +129,16 @@ int DiscoveryComponent::DiscoverySourceHandler(const NewMethodEventArgs& nmea, C
*/ */
int DiscoveryComponent::DiscoveryEndpointHandler(const NewEndpointEventArgs& neea, ComponentDiscoveryInfo::Ptr info) const int DiscoveryComponent::DiscoveryEndpointHandler(const NewEndpointEventArgs& neea, ComponentDiscoveryInfo::Ptr info) const
{ {
neea.Endpoint->ForEachMethodSink(bind(&DiscoveryComponent::DiscoverySinkHandler, this, _1, info)); Endpoint::ConstTopicIterator i;
neea.Endpoint->ForEachMethodSource(bind(&DiscoveryComponent::DiscoverySourceHandler, this, _1, info));
for (i = neea.Endpoint->BeginSubscriptions(); i != neea.Endpoint->EndSubscriptions(); i++) {
info->Subscriptions.insert(*i);
}
for (i = neea.Endpoint->BeginPublications(); i != neea.Endpoint->EndPublications(); i++) {
info->Publications.insert(*i);
}
return 0; return 0;
} }
@ -216,7 +198,7 @@ int DiscoveryComponent::NewIdentityHandler(const EventArgs& ea)
// we assume the other component _always_ wants // we assume the other component _always_ wants
// discovery::RegisterComponent messages from us // discovery::RegisterComponent messages from us
endpoint->RegisterMethodSink("discovery::RegisterComponent"); endpoint->RegisterSubscription("discovery::RegisterComponent");
// send a discovery::RegisterComponent message, if the // send a discovery::RegisterComponent message, if the
// other component is a broker this makes sure // other component is a broker this makes sure
@ -227,7 +209,7 @@ int DiscoveryComponent::NewIdentityHandler(const EventArgs& ea)
// we assume the other component _always_ wants // we assume the other component _always_ wants
// discovery::NewComponent messages from us // discovery::NewComponent messages from us
endpoint->RegisterMethodSink("discovery::NewComponent"); endpoint->RegisterSubscription("discovery::NewComponent");
// send discovery::NewComponent message for ourselves // send discovery::NewComponent message for ourselves
SendDiscoveryMessage("discovery::NewComponent", GetEndpointManager()->GetIdentity(), endpoint); SendDiscoveryMessage("discovery::NewComponent", GetEndpointManager()->GetIdentity(), endpoint);
@ -248,14 +230,14 @@ int DiscoveryComponent::NewIdentityHandler(const EventArgs& ea)
return 0; return 0;
} }
// register published/subscribed methods for this endpoint // register published/subscribed topics for this endpoint
ComponentDiscoveryInfo::Ptr info = ic->second; ComponentDiscoveryInfo::Ptr info = ic->second;
set<string>::iterator it; set<string>::iterator it;
for (it = info->PublishedMethods.begin(); it != info->PublishedMethods.end(); it++) for (it = info->Publications.begin(); it != info->Publications.end(); it++)
endpoint->RegisterMethodSource(*it); endpoint->RegisterPublication(*it);
for (it = info->SubscribedMethods.begin(); it != info->SubscribedMethods.end(); it++) for (it = info->Subscriptions.begin(); it != info->Subscriptions.end(); it++)
endpoint->RegisterMethodSink(*it); endpoint->RegisterSubscription(*it);
FinishDiscoverySetup(endpoint); FinishDiscoverySetup(endpoint);
@ -288,7 +270,7 @@ int DiscoveryComponent::WelcomeMessageHandler(const NewRequestEventArgs& nrea)
/** /**
* Finishes the welcome handshake for a new component * Finishes the welcome handshake for a new component
* by registering message sinks/sources for the component * by registering message Subscriptions/sources for the component
* and sending a welcome message if necessary. * and sending a welcome message if necessary.
* *
* @param endpoint The endpoint to set up. * @param endpoint The endpoint to set up.
@ -300,10 +282,10 @@ void DiscoveryComponent::FinishDiscoverySetup(Endpoint::Ptr endpoint)
// we assume the other component _always_ wants // we assume the other component _always_ wants
// discovery::Welcome messages from us // discovery::Welcome messages from us
endpoint->RegisterMethodSink("discovery::Welcome"); endpoint->RegisterSubscription("discovery::Welcome");
JsonRpcRequest request; RpcRequest request;
request.SetMethod("discovery::Welcome"); request.SetMethod("discovery::Welcome");
GetEndpointManager()->SendUnicastRequest(m_DiscoveryEndpoint, endpoint, request); GetEndpointManager()->SendUnicastMessage(m_DiscoveryEndpoint, endpoint, request);
endpoint->SetSentWelcome(true); endpoint->SetSentWelcome(true);
@ -324,7 +306,7 @@ void DiscoveryComponent::FinishDiscoverySetup(Endpoint::Ptr endpoint)
*/ */
void DiscoveryComponent::SendDiscoveryMessage(string method, string identity, Endpoint::Ptr recipient) void DiscoveryComponent::SendDiscoveryMessage(string method, string identity, Endpoint::Ptr recipient)
{ {
JsonRpcRequest request; RpcRequest request;
request.SetMethod(method); request.SetMethod(method);
DiscoveryMessage params; DiscoveryMessage params;
@ -333,10 +315,10 @@ void DiscoveryComponent::SendDiscoveryMessage(string method, string identity, En
params.SetIdentity(identity); params.SetIdentity(identity);
Message subscriptions; Message subscriptions;
params.SetSubscribes(subscriptions); params.SetSubscriptions(subscriptions);
Message publications; Message publications;
params.SetProvides(publications); params.SetPublications(publications);
ComponentDiscoveryInfo::Ptr info; ComponentDiscoveryInfo::Ptr info;
@ -349,16 +331,16 @@ void DiscoveryComponent::SendDiscoveryMessage(string method, string identity, En
} }
set<string>::iterator i; set<string>::iterator i;
for (i = info->PublishedMethods.begin(); i != info->PublishedMethods.end(); i++) for (i = info->Publications.begin(); i != info->Publications.end(); i++)
publications.AddUnnamedPropertyString(*i); publications.AddUnnamedPropertyString(*i);
for (i = info->SubscribedMethods.begin(); i != info->SubscribedMethods.end(); i++) for (i = info->Subscriptions.begin(); i != info->Subscriptions.end(); i++)
subscriptions.AddUnnamedPropertyString(*i); subscriptions.AddUnnamedPropertyString(*i);
if (recipient) if (recipient)
GetEndpointManager()->SendUnicastRequest(m_DiscoveryEndpoint, recipient, request); GetEndpointManager()->SendUnicastMessage(m_DiscoveryEndpoint, recipient, request);
else else
GetEndpointManager()->SendMulticastRequest(m_DiscoveryEndpoint, request); GetEndpointManager()->SendMulticastMessage(m_DiscoveryEndpoint, request);
} }
bool DiscoveryComponent::HasMessagePermission(Dictionary::Ptr roles, string messageType, string message) bool DiscoveryComponent::HasMessagePermission(Dictionary::Ptr roles, string messageType, string message)
@ -419,26 +401,26 @@ void DiscoveryComponent::ProcessDiscoveryMessage(string identity, DiscoveryMessa
Endpoint::Ptr endpoint = GetEndpointManager()->GetEndpointByIdentity(identity); Endpoint::Ptr endpoint = GetEndpointManager()->GetEndpointByIdentity(identity);
Message provides; Message publications;
if (message.GetProvides(&provides)) { if (message.GetPublications(&publications)) {
DictionaryIterator i; DictionaryIterator i;
for (i = provides.GetDictionary()->Begin(); i != provides.GetDictionary()->End(); i++) { for (i = publications.GetDictionary()->Begin(); i != publications.GetDictionary()->End(); i++) {
if (trusted || HasMessagePermission(roles, "publish", i->second)) { if (trusted || HasMessagePermission(roles, "publications", i->second)) {
info->PublishedMethods.insert(i->second); info->Publications.insert(i->second);
if (endpoint) if (endpoint)
endpoint->RegisterMethodSource(i->second); endpoint->RegisterPublication(i->second);
} }
} }
} }
Message subscribes; Message subscriptions;
if (message.GetSubscribes(&subscribes)) { if (message.GetSubscriptions(&subscriptions)) {
DictionaryIterator i; DictionaryIterator i;
for (i = subscribes.GetDictionary()->Begin(); i != subscribes.GetDictionary()->End(); i++) { for (i = subscriptions.GetDictionary()->Begin(); i != subscriptions.GetDictionary()->End(); i++) {
if (trusted || HasMessagePermission(roles, "subscribe", i->second)) { if (trusted || HasMessagePermission(roles, "subscriptions", i->second)) {
info->SubscribedMethods.insert(i->second); info->Subscriptions.insert(i->second);
if (endpoint) if (endpoint)
endpoint->RegisterMethodSink(i->second); endpoint->RegisterSubscription(i->second);
} }
} }
} }

View File

@ -32,8 +32,8 @@ public:
string Node; string Node;
string Service; string Service;
set<string> SubscribedMethods; set<string> Subscriptions;
set<string> PublishedMethods; set<string> Publications;
time_t LastSeen; time_t LastSeen;
}; };
@ -60,8 +60,6 @@ private:
int CheckExistingEndpoint(Endpoint::Ptr endpoint, const NewEndpointEventArgs& neea); int CheckExistingEndpoint(Endpoint::Ptr endpoint, const NewEndpointEventArgs& neea);
int DiscoveryEndpointHandler(const NewEndpointEventArgs& neea, ComponentDiscoveryInfo::Ptr info) const; int DiscoveryEndpointHandler(const NewEndpointEventArgs& neea, ComponentDiscoveryInfo::Ptr info) const;
int DiscoverySinkHandler(const NewMethodEventArgs& nmea, ComponentDiscoveryInfo::Ptr info) const;
int DiscoverySourceHandler(const NewMethodEventArgs& nmea, ComponentDiscoveryInfo::Ptr info) const;
int DiscoveryTimerHandler(const TimerEventArgs& tea); int DiscoveryTimerHandler(const TimerEventArgs& tea);

View File

@ -41,24 +41,24 @@ public:
SetPropertyString("service", value); SetPropertyString("service", value);
} }
inline bool GetSubscribes(Message *value) const inline bool GetSubscriptions(Message *value) const
{ {
return GetPropertyMessage("subscribes", value); return GetPropertyMessage("subscriptions", value);
} }
inline void SetSubscribes(Message value) inline void SetSubscriptions(Message value)
{ {
SetPropertyMessage("subscribes", value); SetPropertyMessage("subscriptions", value);
} }
inline bool GetProvides(Message *value) const inline bool GetPublications(Message *value) const
{ {
return GetPropertyMessage("provides", value); return GetPropertyMessage("publications", value);
} }
inline void SetProvides(Message value) inline void SetPublications(Message value)
{ {
SetPropertyMessage("provides", value); SetPropertyMessage("publications", value);
} }
}; };

View File

@ -29,11 +29,11 @@
}, },
"role": { "role": {
"broker": { "broker": {
"publish": [ "discovery::NewComponent" ] "publications": [ "discovery::NewComponent" ]
}, },
"demo": { "demo": {
"publish": [ "demo::*" ], "publications": [ "demo::*" ],
"subscribe": [ "demo::*" ] "subscriptions": [ "demo::*" ]
} }
} }
} }

View File

@ -24,11 +24,11 @@
}, },
"role": { "role": {
"broker": { "broker": {
"publish": [ "discovery::NewComponent" ] "publications": [ "discovery::NewComponent" ]
}, },
"demo": { "demo": {
"publish": [ "demo::*" ], "publications": [ "demo::*" ],
"subscribe": [ "demo::*" ] "subscriptions": [ "demo::*" ]
} }
} }
} }

View File

@ -24,11 +24,11 @@
}, },
"role": { "role": {
"broker": { "broker": {
"publish": [ "discovery::NewComponent" ] "publications": [ "discovery::NewComponent" ]
}, },
"demo": { "demo": {
"publish": [ "demo::*" ], "publications": [ "demo::*" ],
"subscribe": [ "demo::*" ] "subscriptions": [ "demo::*" ]
} }
} }
} }

View File

@ -21,17 +21,30 @@
using namespace icinga; using namespace icinga;
/**
* Constructor for the Endpoint class.
*/
Endpoint::Endpoint(void) Endpoint::Endpoint(void)
{ {
m_ReceivedWelcome = false; m_ReceivedWelcome = false;
m_SentWelcome = false; m_SentWelcome = false;
} }
/**
* Retrieves the identity of this endpoint.
*
* @returns The identity of the endpoint.
*/
string Endpoint::GetIdentity(void) const string Endpoint::GetIdentity(void) const
{ {
return m_Identity; return m_Identity;
} }
/**
* Sets the identity of this endpoint.
*
* @param identity The new identity of the endpoint.
*/
void Endpoint::SetIdentity(string identity) void Endpoint::SetIdentity(string identity)
{ {
m_Identity = identity; m_Identity = identity;
@ -41,126 +54,179 @@ void Endpoint::SetIdentity(string identity)
OnIdentityChanged(ea); OnIdentityChanged(ea);
} }
bool Endpoint::HasIdentity(void) const /**
{ * Retrieves the endpoint manager this endpoint is registered with.
return !m_Identity.empty(); *
} * @returns The EndpointManager object.
*/
EndpointManager::Ptr Endpoint::GetEndpointManager(void) const EndpointManager::Ptr Endpoint::GetEndpointManager(void) const
{ {
return m_EndpointManager.lock(); return m_EndpointManager.lock();
} }
/**
* Sets the endpoint manager this endpoint is registered with.
*
* @param manager The EndpointManager object.
*/
void Endpoint::SetEndpointManager(EndpointManager::WeakPtr manager) void Endpoint::SetEndpointManager(EndpointManager::WeakPtr manager)
{ {
m_EndpointManager = manager; m_EndpointManager = manager;
} }
void Endpoint::RegisterMethodSink(string method) /**
* Registers a topic subscription for this endpoint.
*
* @param topic The name of the topic.
*/
void Endpoint::RegisterSubscription(string topic)
{ {
m_MethodSinks.insert(method); m_Subscriptions.insert(topic);
} }
void Endpoint::UnregisterMethodSink(string method) /**
* Removes a topic subscription from this endpoint.
*
* @param topic The name of the topic.
*/
void Endpoint::UnregisterSubscription(string topic)
{ {
m_MethodSinks.erase(method); m_Subscriptions.erase(topic);
} }
bool Endpoint::IsMethodSink(string method) const /**
* Checks whether the endpoint has a subscription for the specified topic.
*
* @param topic The name of the topic.
* @returns true if the endpoint is subscribed to the topic, false otherwise.
*/
bool Endpoint::HasSubscription(string topic) const
{ {
return (m_MethodSinks.find(method) != m_MethodSinks.end()); return (m_Subscriptions.find(topic) != m_Subscriptions.end());
} }
void Endpoint::ForEachMethodSink(function<int (const NewMethodEventArgs&)> callback) /**
* Registers a topic publication for this endpoint.
*
* @param topic The name of the topic.
*/
void Endpoint::RegisterPublication(string topic)
{ {
for (set<string>::iterator i = m_MethodSinks.begin(); i != m_MethodSinks.end(); i++) { m_Publications.insert(topic);
NewMethodEventArgs nmea;
nmea.Source = shared_from_this();
nmea.Method = *i;
callback(nmea);
}
} }
void Endpoint::RegisterMethodSource(string method) /**
* Removes a topic publication from this endpoint.
*
* @param topic The name of the topic.
*/
void Endpoint::UnregisterPublication(string topic)
{ {
m_MethodSources.insert(method); m_Publications.erase(topic);
} }
void Endpoint::UnregisterMethodSource(string method) /**
* Checks whether the endpoint has a publication for the specified topic.
*
* @param topic The name of the topic.
* @returns true if the endpoint is publishing this topic, false otherwise.
*/
bool Endpoint::HasPublication(string topic) const
{ {
m_MethodSources.erase(method); return (m_Publications.find(topic) != m_Publications.end());
} }
bool Endpoint::IsMethodSource(string method) const /**
* Removes all subscriptions for the endpoint.
*/
void Endpoint::ClearSubscriptions(void)
{ {
return (m_MethodSources.find(method) != m_MethodSources.end()); m_Subscriptions.clear();
} }
void Endpoint::ForEachMethodSource(function<int (const NewMethodEventArgs&)> callback) /**
* Removes all publications for the endpoint.
*/
void Endpoint::ClearPublications(void)
{ {
for (set<string>::iterator i = m_MethodSources.begin(); i != m_MethodSources.end(); i++) { m_Publications.clear();
NewMethodEventArgs nmea;
nmea.Source = shared_from_this();
nmea.Method = *i;
callback(nmea);
}
} }
void Endpoint::ClearMethodSinks(void) /**
* Returns the beginning of the subscriptions list.
*
* @returns An iterator that points to the first subscription.
*/
Endpoint::ConstTopicIterator Endpoint::BeginSubscriptions(void) const
{ {
m_MethodSinks.clear(); return m_Subscriptions.begin();
} }
void Endpoint::ClearMethodSources(void) /**
* Returns the end of the subscriptions list.
*
* @returns An iterator that points past the last subscription.
*/
Endpoint::ConstTopicIterator Endpoint::EndSubscriptions(void) const
{ {
m_MethodSources.clear(); return m_Subscriptions.end();
} }
int Endpoint::CountMethodSinks(void) const /**
* Returns the beginning of the publications list.
*
* @returns An iterator that points to the first publication.
*/
Endpoint::ConstTopicIterator Endpoint::BeginPublications(void) const
{ {
return m_MethodSinks.size(); return m_Publications.begin();
} }
int Endpoint::CountMethodSources(void) const /**
* Returns the end of the publications list.
*
* @returns An iterator that points past the last publication.
*/
Endpoint::ConstTopicIterator Endpoint::EndPublications(void) const
{ {
return m_MethodSources.size(); return m_Publications.end();
}
set<string>::const_iterator Endpoint::BeginSinks(void) const
{
return m_MethodSinks.begin();
}
set<string>::const_iterator Endpoint::EndSinks(void) const
{
return m_MethodSinks.end();
}
set<string>::const_iterator Endpoint::BeginSources(void) const
{
return m_MethodSources.begin();
}
set<string>::const_iterator Endpoint::EndSources(void) const
{
return m_MethodSources.end();
} }
/**
* Sets whether a welcome message has been received from this endpoint.
*
* @param value Whether we've received a welcome message.
*/
void Endpoint::SetReceivedWelcome(bool value) void Endpoint::SetReceivedWelcome(bool value)
{ {
m_ReceivedWelcome = value; m_ReceivedWelcome = value;
} }
/**
* Retrieves whether a welcome message has been received from this endpoint.
*
* @returns Whether we've received a welcome message.
*/
bool Endpoint::GetReceivedWelcome(void) const bool Endpoint::GetReceivedWelcome(void) const
{ {
return m_ReceivedWelcome; return m_ReceivedWelcome;
} }
/**
* Sets whether a welcome message has been sent to this endpoint.
*
* @param value Whether we've sent a welcome message.
*/
void Endpoint::SetSentWelcome(bool value) void Endpoint::SetSentWelcome(bool value)
{ {
m_SentWelcome = value; m_SentWelcome = value;
} }
/**
* Retrieves whether a welcome message has been sent to this endpoint.
*
* @returns Whether we've sent a welcome message.
*/
bool Endpoint::GetSentWelcome(void) const bool Endpoint::GetSentWelcome(void) const
{ {
return m_SentWelcome; return m_SentWelcome;

View File

@ -25,33 +25,38 @@ namespace icinga
class EndpointManager; class EndpointManager;
struct I2_ICINGA_API NewMethodEventArgs : public EventArgs /**
{ * An endpoint that can be used to send and receive messages.
string Method; */
};
class I2_ICINGA_API Endpoint : public Object class I2_ICINGA_API Endpoint : public Object
{ {
private: private:
string m_Identity; string m_Identity; /**< The identity of this endpoint. */
set<string> m_MethodSinks; set<string> m_Subscriptions; /**< The topics this endpoint is
set<string> m_MethodSources; subscribed to. */
bool m_ReceivedWelcome; set<string> m_Publications; /**< The topics this endpoint is
bool m_SentWelcome; publishing. */
bool m_ReceivedWelcome; /**< Have we received a welcome message
from this endpoint? */
bool m_SentWelcome; /**< Have we sent a welcome message to this
endpoint? */
weak_ptr<EndpointManager> m_EndpointManager; weak_ptr<EndpointManager> m_EndpointManager; /**< The endpoint manager
this endpoint is
registered with. */
public: public:
typedef shared_ptr<Endpoint> Ptr; typedef shared_ptr<Endpoint> Ptr;
typedef weak_ptr<Endpoint> WeakPtr; typedef weak_ptr<Endpoint> WeakPtr;
typedef set<string>::const_iterator ConstTopicIterator;
Endpoint(void); Endpoint(void);
virtual string GetAddress(void) const = 0; virtual string GetAddress(void) const = 0;
string GetIdentity(void) const; string GetIdentity(void) const;
void SetIdentity(string identity); void SetIdentity(string identity);
bool HasIdentity(void) const;
void SetReceivedWelcome(bool value); void SetReceivedWelcome(bool value);
bool GetReceivedWelcome(void) const; bool GetReceivedWelcome(void) const;
@ -62,39 +67,30 @@ public:
shared_ptr<EndpointManager> GetEndpointManager(void) const; shared_ptr<EndpointManager> GetEndpointManager(void) const;
void SetEndpointManager(weak_ptr<EndpointManager> manager); void SetEndpointManager(weak_ptr<EndpointManager> manager);
void RegisterMethodSink(string method); void RegisterSubscription(string topic);
void UnregisterMethodSink(string method); void UnregisterSubscription(string topic);
bool IsMethodSink(string method) const; bool HasSubscription(string topic) const;
void RegisterMethodSource(string method); void RegisterPublication(string topic);
void UnregisterMethodSource(string method); void UnregisterPublication(string topic);
bool IsMethodSource(string method) const; bool HasPublication(string topic) const;
virtual bool IsLocal(void) const = 0; virtual bool IsLocal(void) const = 0;
virtual bool IsConnected(void) const = 0; virtual bool IsConnected(void) const = 0;
virtual void ProcessRequest(Endpoint::Ptr sender, const JsonRpcRequest& message) = 0; virtual void ProcessRequest(Endpoint::Ptr sender, const RpcRequest& message) = 0;
virtual void ProcessResponse(Endpoint::Ptr sender, const JsonRpcResponse& message) = 0; virtual void ProcessResponse(Endpoint::Ptr sender, const RpcResponse& message) = 0;
virtual void Stop(void) = 0; virtual void Stop(void) = 0;
Event<NewMethodEventArgs> OnNewMethodSink; void ClearSubscriptions(void);
Event<NewMethodEventArgs> OnNewMethodSource; void ClearPublications(void);
void ForEachMethodSink(function<int (const NewMethodEventArgs&)> callback); ConstTopicIterator BeginSubscriptions(void) const;
void ForEachMethodSource(function<int (const NewMethodEventArgs&)> callback); ConstTopicIterator EndSubscriptions(void) const;
void ClearMethodSinks(void); ConstTopicIterator BeginPublications(void) const;
void ClearMethodSources(void); ConstTopicIterator EndPublications(void) const;
int CountMethodSinks(void) const;
int CountMethodSources(void) const;
set<string>::const_iterator BeginSinks(void) const;
set<string>::const_iterator EndSinks(void) const;
set<string>::const_iterator BeginSources(void) const;
set<string>::const_iterator EndSources(void) const;
Event<EventArgs> OnIdentityChanged; Event<EventArgs> OnIdentityChanged;
Event<EventArgs> OnSessionEstablished; Event<EventArgs> OnSessionEstablished;

View File

@ -21,26 +21,52 @@
using namespace icinga; using namespace icinga;
/**
* Sets the identity of the endpoint manager. This identity is used when
* connecting to remote peers.
*
* @param identity The new identity.
*/
void EndpointManager::SetIdentity(string identity) void EndpointManager::SetIdentity(string identity)
{ {
m_Identity = identity; m_Identity = identity;
} }
/**
* Retrieves the identity for the endpoint manager.
*
* @returns The identity.
*/
string EndpointManager::GetIdentity(void) const string EndpointManager::GetIdentity(void) const
{ {
return m_Identity; return m_Identity;
} }
/**
* Sets the SSL context that is used for remote connections.
*
* @param sslContext The new SSL context.
*/
void EndpointManager::SetSSLContext(shared_ptr<SSL_CTX> sslContext) void EndpointManager::SetSSLContext(shared_ptr<SSL_CTX> sslContext)
{ {
m_SSLContext = sslContext; m_SSLContext = sslContext;
} }
/**
* Retrieves the SSL context that is used for remote connections.
*
* @returns The SSL context.
*/
shared_ptr<SSL_CTX> EndpointManager::GetSSLContext(void) const shared_ptr<SSL_CTX> EndpointManager::GetSSLContext(void) const
{ {
return m_SSLContext; return m_SSLContext;
} }
/**
* Creates a new JSON-RPC listener on the specified port.
*
* @param service The name of the service to listen on (@see getaddrinfo).
*/
void EndpointManager::AddListener(string service) void EndpointManager::AddListener(string service)
{ {
if (!GetSSLContext()) if (!GetSSLContext())
@ -58,6 +84,12 @@ void EndpointManager::AddListener(string service)
server->Start(); server->Start();
} }
/**
* Creates a new JSON-RPC client and connects to the specified host and port.
*
* @param node The remote host (@see getaddrinfo).
* @param service The remote port (@see getaddrinfo).
*/
void EndpointManager::AddConnection(string node, string service) void EndpointManager::AddConnection(string node, string service)
{ {
stringstream s; stringstream s;
@ -69,12 +101,22 @@ void EndpointManager::AddConnection(string node, string service)
endpoint->Connect(node, service, m_SSLContext); endpoint->Connect(node, service, m_SSLContext);
} }
/**
* Registers a new JSON-RPC server with this endpoint manager.
*
* @param server The JSON-RPC server.
*/
void EndpointManager::RegisterServer(JsonRpcServer::Ptr server) void EndpointManager::RegisterServer(JsonRpcServer::Ptr server)
{ {
m_Servers.push_back(server); m_Servers.push_back(server);
server->OnNewClient += bind_weak(&EndpointManager::NewClientHandler, shared_from_this()); server->OnNewClient += bind_weak(&EndpointManager::NewClientHandler, shared_from_this());
} }
/**
* Processes a new client connection.
*
* @param ncea Event arguments.
*/
int EndpointManager::NewClientHandler(const NewClientEventArgs& ncea) int EndpointManager::NewClientHandler(const NewClientEventArgs& ncea)
{ {
string address = ncea.Client->GetPeerAddress(); string address = ncea.Client->GetPeerAddress();
@ -87,6 +129,11 @@ int EndpointManager::NewClientHandler(const NewClientEventArgs& ncea)
return 0; return 0;
} }
/**
* Unregisters a JSON-RPC server.
*
* @param server The JSON-RPC server.
*/
void EndpointManager::UnregisterServer(JsonRpcServer::Ptr server) void EndpointManager::UnregisterServer(JsonRpcServer::Ptr server)
{ {
m_Servers.erase( m_Servers.erase(
@ -95,6 +142,11 @@ void EndpointManager::UnregisterServer(JsonRpcServer::Ptr server)
// TODO: unbind event // TODO: unbind event
} }
/**
* Registers a new endpoint with this endpoint manager.
*
* @param endpoint The new endpoint.
*/
void EndpointManager::RegisterEndpoint(Endpoint::Ptr endpoint) void EndpointManager::RegisterEndpoint(Endpoint::Ptr endpoint)
{ {
if (!endpoint->IsLocal() && endpoint->GetIdentity() != "") if (!endpoint->IsLocal() && endpoint->GetIdentity() != "")
@ -109,6 +161,11 @@ void EndpointManager::RegisterEndpoint(Endpoint::Ptr endpoint)
OnNewEndpoint(neea); OnNewEndpoint(neea);
} }
/**
* Unregisters an endpoint.
*
* @param endpoint The endpoint.
*/
void EndpointManager::UnregisterEndpoint(Endpoint::Ptr endpoint) void EndpointManager::UnregisterEndpoint(Endpoint::Ptr endpoint)
{ {
m_Endpoints.erase( m_Endpoints.erase(
@ -116,48 +173,68 @@ void EndpointManager::UnregisterEndpoint(Endpoint::Ptr endpoint)
m_Endpoints.end()); m_Endpoints.end());
} }
void EndpointManager::SendUnicastRequest(Endpoint::Ptr sender, Endpoint::Ptr recipient, const JsonRpcRequest& request, bool fromLocal) /**
* Sends a unicast message to the specified recipient.
*
* @param sender The sender of the message.
* @param recipient The recipient of the message.
* @param message The request.
*/
void EndpointManager::SendUnicastMessage(Endpoint::Ptr sender, Endpoint::Ptr recipient, const Message& message)
{ {
/* don't forward messages back to the sender */
if (sender == recipient) if (sender == recipient)
return; return;
/* don't forward messages between non-local endpoints */ /* don't forward messages between non-local endpoints */
if (!fromLocal && !recipient->IsLocal()) if (!sender->IsLocal() && !recipient->IsLocal())
return; return;
string method; recipient->ProcessRequest(sender, message);
if (!request.GetMethod(&method))
throw InvalidArgumentException("Missing 'method' parameter.");
if (recipient->IsMethodSink(method)) {
//Application::Log(sender->GetAddress() + " -> " + recipient->GetAddress() + ": " + method);
recipient->ProcessRequest(sender, request);
}
} }
void EndpointManager::SendAnycastRequest(Endpoint::Ptr sender, const JsonRpcRequest& request, bool fromLocal) /**
* Sends a message to exactly one recipient out of all recipients who have a
* subscription for the message's topic.
*
* @param sender The sender of the message.
* @param message The message.
*/
void EndpointManager::SendAnycastMessage(Endpoint::Ptr sender, const RpcRequest& message)
{ {
throw NotImplementedException(); throw NotImplementedException();
} }
void EndpointManager::SendMulticastRequest(Endpoint::Ptr sender, const JsonRpcRequest& request, bool fromLocal) /**
* Sends a message to all recipients who have a subscription for the
* message's topic.
*
* @param sender The sender of the message.
* @param message The message.
*/
void EndpointManager::SendMulticastMessage(Endpoint::Ptr sender, const RpcRequest& message)
{ {
#ifdef _DEBUG
string id; string id;
if (request.GetID(&id)) if (message.GetID(&id))
throw InvalidArgumentException("Multicast requests must not have an ID."); throw InvalidArgumentException("Multicast requests must not have an ID.");
#endif /* _DEBUG */
string method; string method;
if (!request.GetMethod(&method)) if (!message.GetMethod(&method))
throw InvalidArgumentException("Message is missing the 'method' property."); throw InvalidArgumentException("Message is missing the 'method' property.");
for (vector<Endpoint::Ptr>::iterator i = m_Endpoints.begin(); i != m_Endpoints.end(); i++) for (vector<Endpoint::Ptr>::iterator i = m_Endpoints.begin(); i != m_Endpoints.end(); i++)
{ {
SendUnicastRequest(sender, *i, request, fromLocal); Endpoint::Ptr recipient = *i;
if (recipient->HasSubscription(method))
SendUnicastMessage(sender, recipient, message);
} }
} }
/**
* Calls the specified callback function for each registered endpoint.
*
* @param callback The callback function.
*/
void EndpointManager::ForEachEndpoint(function<int (const NewEndpointEventArgs&)> callback) void EndpointManager::ForEachEndpoint(function<int (const NewEndpointEventArgs&)> callback)
{ {
NewEndpointEventArgs neea; NewEndpointEventArgs neea;
@ -173,6 +250,11 @@ void EndpointManager::ForEachEndpoint(function<int (const NewEndpointEventArgs&)
} }
} }
/**
* Retrieves an endpoint that has the specified identity.
*
* @param identity The identity of the endpoint.
*/
Endpoint::Ptr EndpointManager::GetEndpointByIdentity(string identity) const Endpoint::Ptr EndpointManager::GetEndpointByIdentity(string identity) const
{ {
vector<Endpoint::Ptr>::const_iterator i; vector<Endpoint::Ptr>::const_iterator i;

View File

@ -23,11 +23,17 @@
namespace icinga namespace icinga
{ {
/**
* Event arguments for the "new endpoint registered" event.
*/
struct I2_ICINGA_API NewEndpointEventArgs : public EventArgs struct I2_ICINGA_API NewEndpointEventArgs : public EventArgs
{ {
icinga::Endpoint::Ptr Endpoint; icinga::Endpoint::Ptr Endpoint; /**< The new endpoint. */
}; };
/**
* Forwards messages between endpoints.
*/
class I2_ICINGA_API EndpointManager : public Object class I2_ICINGA_API EndpointManager : public Object
{ {
string m_Identity; string m_Identity;
@ -57,9 +63,9 @@ public:
void RegisterEndpoint(Endpoint::Ptr endpoint); void RegisterEndpoint(Endpoint::Ptr endpoint);
void UnregisterEndpoint(Endpoint::Ptr endpoint); void UnregisterEndpoint(Endpoint::Ptr endpoint);
void SendUnicastRequest(Endpoint::Ptr sender, Endpoint::Ptr recipient, const JsonRpcRequest& request, bool fromLocal = true); void SendUnicastMessage(Endpoint::Ptr sender, Endpoint::Ptr recipient, const Message& message);
void SendAnycastRequest(Endpoint::Ptr sender, const JsonRpcRequest& request, bool fromLocal = true); void SendAnycastMessage(Endpoint::Ptr sender, const RpcRequest& message);
void SendMulticastRequest(Endpoint::Ptr sender, const JsonRpcRequest& request, bool fromLocal = true); void SendMulticastMessage(Endpoint::Ptr sender, const RpcRequest& message);
void ForEachEndpoint(function<int (const NewEndpointEventArgs&)> callback); void ForEachEndpoint(function<int (const NewEndpointEventArgs&)> callback);

View File

@ -28,6 +28,12 @@
using namespace icinga; using namespace icinga;
/**
* The entry point for the Icinga application.
*
* @param args Command-line arguments.
* @returns An exit status.
*/
int IcingaApplication::Main(const vector<string>& args) int IcingaApplication::Main(const vector<string>& args)
{ {
#ifdef _WIN32 #ifdef _WIN32
@ -37,7 +43,7 @@ int IcingaApplication::Main(const vector<string>& args)
#endif /* _WIN32 */ #endif /* _WIN32 */
if (args.size() < 2) { if (args.size() < 2) {
PrintUsage(args[0]); cout << "Syntax: " << args[0] << " <config-file>" << endl;
return EXIT_FAILURE; return EXIT_FAILURE;
} }
@ -87,11 +93,11 @@ int IcingaApplication::Main(const vector<string>& args)
return EXIT_SUCCESS; return EXIT_SUCCESS;
} }
void IcingaApplication::PrintUsage(const string& programPath) /**
{ * Retrieves Icinga's endpoint manager.
cout << "Syntax: " << programPath << " <config-file>" << endl; *
} * @returns The endpoint manager.
*/
EndpointManager::Ptr IcingaApplication::GetEndpointManager(void) EndpointManager::Ptr IcingaApplication::GetEndpointManager(void)
{ {
return m_EndpointManager; return m_EndpointManager;

View File

@ -23,6 +23,9 @@
namespace icinga namespace icinga
{ {
/**
* The Icinga application.
*/
class I2_ICINGA_API IcingaApplication : public Application class I2_ICINGA_API IcingaApplication : public Application
{ {
private: private:
@ -54,8 +57,6 @@ public:
int Main(const vector<string>& args); int Main(const vector<string>& args);
void PrintUsage(const string& programPath);
EndpointManager::Ptr GetEndpointManager(void); EndpointManager::Ptr GetEndpointManager(void);
void SetPrivateKeyFile(string privkey); void SetPrivateKeyFile(string privkey);

View File

@ -23,6 +23,9 @@
namespace icinga namespace icinga
{ {
/**
* A component that can be loaded into the Icinga application at run-time.
*/
class I2_ICINGA_API IcingaComponent : public Component class I2_ICINGA_API IcingaComponent : public Component
{ {
protected: protected:

View File

@ -61,7 +61,7 @@ bool JsonRpcEndpoint::IsConnected(void) const
return (bool)m_Client; return (bool)m_Client;
} }
void JsonRpcEndpoint::ProcessRequest(Endpoint::Ptr sender, const JsonRpcRequest& message) void JsonRpcEndpoint::ProcessRequest(Endpoint::Ptr sender, const RpcRequest& message)
{ {
if (IsConnected()) { if (IsConnected()) {
string id; string id;
@ -75,7 +75,7 @@ void JsonRpcEndpoint::ProcessRequest(Endpoint::Ptr sender, const JsonRpcRequest&
} }
} }
void JsonRpcEndpoint::ProcessResponse(Endpoint::Ptr sender, const JsonRpcResponse& message) void JsonRpcEndpoint::ProcessResponse(Endpoint::Ptr sender, const RpcResponse& message)
{ {
if (IsConnected()) if (IsConnected())
m_Client->SendMessage(message); m_Client->SendMessage(message);
@ -88,18 +88,18 @@ int JsonRpcEndpoint::NewMessageHandler(const NewMessageEventArgs& nmea)
string method; string method;
if (message.GetPropertyString("method", &method)) { if (message.GetPropertyString("method", &method)) {
if (!IsMethodSource(method)) if (!HasPublication(method))
return 0; return 0;
JsonRpcRequest request = message; RpcRequest request = message;
string id; string id;
if (request.GetID(&id)) if (request.GetID(&id))
GetEndpointManager()->SendAnycastRequest(sender, request, false); GetEndpointManager()->SendAnycastMessage(sender, request);
else else
GetEndpointManager()->SendMulticastRequest(sender, request, false); GetEndpointManager()->SendMulticastMessage(sender, request);
} else { } else {
JsonRpcResponse response = message; RpcResponse response = message;
// TODO: deal with response messages // TODO: deal with response messages
throw NotImplementedException(); throw NotImplementedException();
@ -114,12 +114,13 @@ int JsonRpcEndpoint::ClientClosedHandler(const EventArgs&)
m_PendingCalls.clear(); m_PendingCalls.clear();
// TODO: _only_ clear non-persistent method sources/sinks // TODO: _only_ clear non-persistent publications/subscriptions
// unregister ourselves if no persistent sources/sinks are left (use a timer for that, once we have a TTL property for the methods) // unregister ourselves if no persistent publications/subscriptions are left (use a timer for that, once we have a TTL property for the topics)
ClearMethodSinks(); ClearSubscriptions();
ClearMethodSources(); ClearPublications();
if (CountMethodSinks() == 0) // remove the endpoint if there are no more subscriptions */
if (BeginSubscriptions() == EndSubscriptions())
GetEndpointManager()->UnregisterEndpoint(static_pointer_cast<Endpoint>(shared_from_this())); GetEndpointManager()->UnregisterEndpoint(static_pointer_cast<Endpoint>(shared_from_this()));
m_Client.reset(); m_Client.reset();

View File

@ -23,6 +23,9 @@
namespace icinga namespace icinga
{ {
/**
* A JSON-RPC endpoint that can be used to communicate with a remote
* Icinga instance. */
class I2_ICINGA_API JsonRpcEndpoint : public Endpoint class I2_ICINGA_API JsonRpcEndpoint : public Endpoint
{ {
private: private:
@ -52,8 +55,8 @@ public:
virtual bool IsLocal(void) const; virtual bool IsLocal(void) const;
virtual bool IsConnected(void) const; virtual bool IsConnected(void) const;
virtual void ProcessRequest(Endpoint::Ptr sender, const JsonRpcRequest& message); virtual void ProcessRequest(Endpoint::Ptr sender, const RpcRequest& message);
virtual void ProcessResponse(Endpoint::Ptr sender, const JsonRpcResponse& message); virtual void ProcessResponse(Endpoint::Ptr sender, const RpcResponse& message);
virtual void Stop(void); virtual void Stop(void);
}; };

View File

@ -38,31 +38,31 @@ bool VirtualEndpoint::IsConnected(void) const
return true; return true;
} }
void VirtualEndpoint::RegisterMethodHandler(string method, function<int (const NewRequestEventArgs&)> callback) void VirtualEndpoint::RegisterTopicHandler(string topic, function<int (const NewRequestEventArgs&)> callback)
{ {
m_MethodHandlers[method] += callback; m_TopicHandlers[topic] += callback;
RegisterMethodSink(method); RegisterSubscription(topic);
} }
void VirtualEndpoint::UnregisterMethodHandler(string method, function<int (const NewRequestEventArgs&)> callback) void VirtualEndpoint::UnregisterTopicHandler(string topic, function<int (const NewRequestEventArgs&)> callback)
{ {
// TODO: implement // TODO: implement
//m_MethodHandlers[method] -= callback; //m_TopicHandlers[method] -= callback;
//UnregisterMethodSink(method); //UnregisterMethodSubscription(method);
throw NotImplementedException(); throw NotImplementedException();
} }
void VirtualEndpoint::ProcessRequest(Endpoint::Ptr sender, const JsonRpcRequest& request) void VirtualEndpoint::ProcessRequest(Endpoint::Ptr sender, const RpcRequest& request)
{ {
string method; string method;
if (!request.GetMethod(&method)) if (!request.GetMethod(&method))
return; return;
map<string, Event<NewRequestEventArgs> >::iterator i = m_MethodHandlers.find(method); map<string, Event<NewRequestEventArgs> >::iterator i = m_TopicHandlers.find(method);
if (i == m_MethodHandlers.end()) if (i == m_TopicHandlers.end())
return; return;
NewRequestEventArgs nrea; NewRequestEventArgs nrea;
@ -72,7 +72,7 @@ void VirtualEndpoint::ProcessRequest(Endpoint::Ptr sender, const JsonRpcRequest&
i->second(nrea); i->second(nrea);
} }
void VirtualEndpoint::ProcessResponse(Endpoint::Ptr sender, const JsonRpcResponse& response) void VirtualEndpoint::ProcessResponse(Endpoint::Ptr sender, const RpcResponse& response)
{ {
// TODO: figure out which request this response belongs to and notify the caller // TODO: figure out which request this response belongs to and notify the caller
throw NotImplementedException(); throw NotImplementedException();

View File

@ -29,28 +29,31 @@ struct I2_ICINGA_API NewRequestEventArgs : public EventArgs
typedef weak_ptr<NewRequestEventArgs> WeakPtr; typedef weak_ptr<NewRequestEventArgs> WeakPtr;
Endpoint::Ptr Sender; Endpoint::Ptr Sender;
JsonRpcRequest Request; RpcRequest Request;
}; };
/**
* A local endpoint.
*/
class I2_ICINGA_API VirtualEndpoint : public Endpoint class I2_ICINGA_API VirtualEndpoint : public Endpoint
{ {
private: private:
map< string, Event<NewRequestEventArgs> > m_MethodHandlers; map< string, Event<NewRequestEventArgs> > m_TopicHandlers;
public: public:
typedef shared_ptr<VirtualEndpoint> Ptr; typedef shared_ptr<VirtualEndpoint> Ptr;
typedef weak_ptr<VirtualEndpoint> WeakPtr; typedef weak_ptr<VirtualEndpoint> WeakPtr;
void RegisterMethodHandler(string method, function<int (const NewRequestEventArgs&)> callback); void RegisterTopicHandler(string topic, function<int (const NewRequestEventArgs&)> callback);
void UnregisterMethodHandler(string method, function<int (const NewRequestEventArgs&)> callback); void UnregisterTopicHandler(string topic, function<int (const NewRequestEventArgs&)> callback);
virtual string GetAddress(void) const; virtual string GetAddress(void) const;
virtual bool IsLocal(void) const; virtual bool IsLocal(void) const;
virtual bool IsConnected(void) const; virtual bool IsConnected(void) const;
virtual void ProcessRequest(Endpoint::Ptr sender, const JsonRpcRequest& message); virtual void ProcessRequest(Endpoint::Ptr sender, const RpcRequest& message);
virtual void ProcessResponse(Endpoint::Ptr sender, const JsonRpcResponse& message); virtual void ProcessResponse(Endpoint::Ptr sender, const RpcResponse& message);
virtual void Stop(void); virtual void Stop(void);
}; };

View File

@ -8,16 +8,16 @@ libjsonrpc_la_SOURCES = \
i2-jsonrpc.h \ i2-jsonrpc.h \
jsonrpcclient.cpp \ jsonrpcclient.cpp \
jsonrpcclient.h \ jsonrpcclient.h \
jsonrpcrequest.cpp \
jsonrpcrequest.h \
jsonrpcresponse.cpp \
jsonrpcresponse.h \
jsonrpcserver.cpp \ jsonrpcserver.cpp \
jsonrpcserver.h \ jsonrpcserver.h \
message.cpp \ message.cpp \
message.h \ message.h \
netstring.cpp \ netstring.cpp \
netstring.h netstring.h \
rpcrequest.cpp \
rpcrequest.h \
rpcresponse.cpp \
rpcresponse.h
libjsonrpc_la_CXXFLAGS = \ libjsonrpc_la_CXXFLAGS = \
-DI2_JSONRPC_BUILD \ -DI2_JSONRPC_BUILD \

View File

@ -32,9 +32,9 @@
#include "variant.h" #include "variant.h"
#include "dictionary.h" #include "dictionary.h"
#include "message.h" #include "message.h"
#include "rpcrequest.h"
#include "rpcresponse.h"
#include "netstring.h" #include "netstring.h"
#include "jsonrpcrequest.h"
#include "jsonrpcresponse.h"
#include "jsonrpcclient.h" #include "jsonrpcclient.h"
#include "jsonrpcserver.h" #include "jsonrpcserver.h"

View File

@ -13,16 +13,16 @@
<ItemGroup> <ItemGroup>
<ClInclude Include="i2-jsonrpc.h" /> <ClInclude Include="i2-jsonrpc.h" />
<ClInclude Include="jsonrpcclient.h" /> <ClInclude Include="jsonrpcclient.h" />
<ClInclude Include="jsonrpcrequest.h" /> <ClInclude Include="rpcrequest.h" />
<ClInclude Include="jsonrpcresponse.h" /> <ClInclude Include="rpcresponse.h" />
<ClInclude Include="jsonrpcserver.h" /> <ClInclude Include="jsonrpcserver.h" />
<ClInclude Include="message.h" /> <ClInclude Include="message.h" />
<ClInclude Include="netstring.h" /> <ClInclude Include="netstring.h" />
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>
<ClCompile Include="jsonrpcclient.cpp" /> <ClCompile Include="jsonrpcclient.cpp" />
<ClCompile Include="jsonrpcrequest.cpp" /> <ClCompile Include="rpcrequest.cpp" />
<ClCompile Include="jsonrpcresponse.cpp" /> <ClCompile Include="rpcresponse.cpp" />
<ClCompile Include="jsonrpcserver.cpp" /> <ClCompile Include="jsonrpcserver.cpp" />
<ClCompile Include="message.cpp" /> <ClCompile Include="message.cpp" />
<ClCompile Include="netstring.cpp" /> <ClCompile Include="netstring.cpp" />

View File

@ -5,8 +5,8 @@
<ClCompile Include="jsonrpcserver.cpp" /> <ClCompile Include="jsonrpcserver.cpp" />
<ClCompile Include="message.cpp" /> <ClCompile Include="message.cpp" />
<ClCompile Include="netstring.cpp" /> <ClCompile Include="netstring.cpp" />
<ClCompile Include="jsonrpcrequest.cpp" /> <ClCompile Include="rpcrequest.cpp" />
<ClCompile Include="jsonrpcresponse.cpp" /> <ClCompile Include="rpcresponse.cpp" />
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>
<ClInclude Include="i2-jsonrpc.h" /> <ClInclude Include="i2-jsonrpc.h" />
@ -14,7 +14,7 @@
<ClInclude Include="jsonrpcserver.h" /> <ClInclude Include="jsonrpcserver.h" />
<ClInclude Include="message.h" /> <ClInclude Include="message.h" />
<ClInclude Include="netstring.h" /> <ClInclude Include="netstring.h" />
<ClInclude Include="jsonrpcrequest.h" /> <ClInclude Include="rpcrequest.h" />
<ClInclude Include="jsonrpcresponse.h" /> <ClInclude Include="rpcresponse.h" />
</ItemGroup> </ItemGroup>
</Project> </Project>

View File

@ -17,21 +17,21 @@
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. * * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. *
******************************************************************************/ ******************************************************************************/
#ifndef JSONRPCREQUEST_H #ifndef RpcRequest_H
#define JSONRPCREQUEST_H #define RpcRequest_H
namespace icinga namespace icinga
{ {
class I2_JSONRPC_API JsonRpcRequest : public Message class I2_JSONRPC_API RpcRequest : public Message
{ {
public: public:
JsonRpcRequest(void) : Message() { RpcRequest(void) : Message() {
SetVersion("2.0"); SetVersion("2.0");
} }
JsonRpcRequest(const Message& message) : Message(message) { } RpcRequest(const Message& message) : Message(message) { }
inline bool GetVersion(string *value) const inline bool GetVersion(string *value) const
{ {
@ -76,4 +76,4 @@ public:
} }
#endif /* JSONRPCREQUEST_H */ #endif /* RpcRequest_H */

View File

@ -17,20 +17,20 @@
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. * * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. *
******************************************************************************/ ******************************************************************************/
#ifndef JSONRPCRESPONSE_H #ifndef RpcResponse_H
#define JSONRPCRESPONSE_H #define RpcResponse_H
namespace icinga namespace icinga
{ {
class I2_JSONRPC_API JsonRpcResponse : public Message class I2_JSONRPC_API RpcResponse : public Message
{ {
public: public:
JsonRpcResponse(void) : Message() { RpcResponse(void) : Message() {
SetVersion("2.0"); SetVersion("2.0");
} }
JsonRpcResponse(const Message& message) : Message(message) { } RpcResponse(const Message& message) : Message(message) { }
inline bool GetVersion(string *value) const inline bool GetVersion(string *value) const
{ {
@ -75,4 +75,4 @@ public:
} }
#endif /* JSONRPCRESPONSE_H */ #endif /* RpcResponse_H */