Refactored messaging system.

This commit is contained in:
Gunnar Beutner 2012-04-18 15:22:25 +02:00
parent ab32aa4f71
commit 7474b63dff
50 changed files with 1105 additions and 643 deletions

View File

@ -20,6 +20,11 @@ Application::Application(void)
char *debugging = getenv("_DEBUG");
m_Debugging = (debugging && strtol(debugging, NULL, 10) != 0);
#ifdef _WIN32
if (IsDebuggerPresent())
m_Debugging = true;
#endif /* _WIN32 */
m_ShuttingDown = false;
m_ConfigHive = make_shared<ConfigHive>();
}
@ -48,6 +53,8 @@ void Application::RunEventLoop(void)
fd_set readfds, writefds, exceptfds;
int nfds = -1;
Timer::CallExpiredTimers();
FD_ZERO(&readfds);
FD_ZERO(&writefds);
FD_ZERO(&exceptfds);
@ -72,12 +79,7 @@ void Application::RunEventLoop(void)
nfds = fd;
}
long sleep;
do {
Timer::CallExpiredTimers();
sleep = (long)(Timer::GetNextCall() - time(NULL));
} while (sleep <= 0);
long sleep = (long)(Timer::GetNextCall() - time(NULL));
if (m_ShuttingDown)
break;
@ -99,8 +101,8 @@ void Application::RunEventLoop(void)
else if (ready == 0)
continue;
EventArgs::Ptr ea = make_shared<EventArgs>();
ea->Source = shared_from_this();
EventArgs ea;
ea.Source = shared_from_this();
list<Socket::WeakPtr>::iterator prev, i;
for (i = Socket::Sockets.begin(); i != Socket::Sockets.end(); ) {
@ -206,13 +208,29 @@ Component::Ptr Application::LoadComponent(const string& path, const ConfigObject
throw ComponentLoadException("Loadable module does not contain CreateComponent function");
component = Component::Ptr(pCreateComponent());
component->SetApplication(static_pointer_cast<Application>(shared_from_this()));
component->SetConfig(componentConfig);
RegisterComponent(component);
return component;
}
void Application::RegisterComponent(Component::Ptr component)
{
component->SetApplication(static_pointer_cast<Application>(shared_from_this()));
m_Components[component->GetName()] = component;
component->Start();
}
return component;
void Application::UnregisterComponent(Component::Ptr component)
{
string name = component->GetName();
Log("Unloading component '%s'", name.c_str());
map<string, Component::Ptr>::iterator i = m_Components.find(name);
if (i != m_Components.end()) {
m_Components.erase(i);
component->Stop();
}
}
Component::Ptr Application::GetComponent(const string& name)
@ -225,22 +243,6 @@ Component::Ptr Application::GetComponent(const string& name)
return ci->second;
}
void Application::UnloadComponent(const string& name)
{
map<string, Component::Ptr>::iterator ci = m_Components.find(name);
if (ci == m_Components.end())
return;
Log("Unloading component '%s'", name.c_str());
Component::Ptr component = ci->second;
component->Stop();
m_Components.erase(ci);
// TODO: unload DLL
}
void Application::Log(const char *format, ...)
{
char message[512];
@ -396,7 +398,7 @@ int application_main(int argc, char **argv, Application *instance)
try {
result = Application::Instance->Main(args);
} catch (const Exception& ex) {
cout << "---" << endl;
cerr << "---" << endl;
string klass = typeid(ex).name();
@ -410,8 +412,8 @@ int application_main(int argc, char **argv, Application *instance)
}
#endif /* HAVE_GCC_ABI_DEMANGLE */
cout << "Exception: " << klass << endl;
cout << "Message: " << ex.GetMessage() << endl;
cerr << "Exception: " << klass << endl;
cerr << "Message: " << ex.GetMessage() << endl;
return EXIT_FAILURE;
}

View File

@ -38,7 +38,8 @@ public:
ConfigHive::Ptr GetConfigHive(void);
shared_ptr<Component> LoadComponent(const string& path, const ConfigObject::Ptr& componentConfig);
void UnloadComponent(const string& name);
void RegisterComponent(shared_ptr<Component> component);
void UnregisterComponent(shared_ptr<Component> component);
shared_ptr<Component> GetComponent(const string& name);
void AddComponentSearchDir(const string& componentDirectory);

View File

@ -17,6 +17,7 @@
<ClCompile Include="configcollection.cpp" />
<ClCompile Include="confighive.cpp" />
<ClCompile Include="configobject.cpp" />
<ClCompile Include="dictionary.cpp" />
<ClCompile Include="exception.cpp" />
<ClCompile Include="fifo.cpp" />
<ClCompile Include="memory.cpp" />
@ -29,6 +30,7 @@
<ClCompile Include="thread.cpp" />
<ClCompile Include="timer.cpp" />
<ClCompile Include="unix.cpp" />
<ClCompile Include="variant.cpp" />
<ClCompile Include="win32.cpp" />
</ItemGroup>
<ItemGroup>
@ -40,6 +42,7 @@
<ClInclude Include="configobject.h" />
<ClInclude Include="cxx11-compat.h" />
<ClInclude Include="delegate.h" />
<ClInclude Include="dictionary.h" />
<ClInclude Include="event.h" />
<ClInclude Include="exception.h" />
<ClInclude Include="fifo.h" />
@ -53,6 +56,7 @@
<ClInclude Include="thread.h" />
<ClInclude Include="timer.h" />
<ClInclude Include="unix.h" />
<ClInclude Include="variant.h" />
<ClInclude Include="win32.h" />
</ItemGroup>
<ItemGroup>

View File

@ -16,8 +16,8 @@ void ConfigCollection::AddObject(const ConfigObject::Ptr& object)
{
Objects[object->GetName()] = object;
ConfigObjectEventArgs::Ptr ea = make_shared<ConfigObjectEventArgs>();
ea->Source = object;
ConfigObjectEventArgs ea;
ea.Source = object;
OnObjectCreated(ea);
ConfigHive::Ptr hive = m_Hive.lock();
@ -32,8 +32,8 @@ void ConfigCollection::RemoveObject(const ConfigObject::Ptr& object)
if (oi != Objects.end()) {
Objects.erase(oi);
ConfigObjectEventArgs::Ptr ea = make_shared<ConfigObjectEventArgs>();
ea->Source = object;
ConfigObjectEventArgs ea;
ea.Source = object;
OnObjectRemoved(ea);
ConfigHive::Ptr hive = m_Hive.lock();
@ -52,11 +52,12 @@ ConfigObject::Ptr ConfigCollection::GetObject(const string& name)
return oi->second;
}
void ConfigCollection::ForEachObject(function<int (ConfigObjectEventArgs::Ptr)> callback)
void ConfigCollection::ForEachObject(function<int (const ConfigObjectEventArgs&)> callback)
{
ConfigObjectEventArgs ea;
for (ObjectIterator oi = Objects.begin(); oi != Objects.end(); oi++) {
ConfigObjectEventArgs::Ptr ea = make_shared<ConfigObjectEventArgs>();
ea->Source = oi->second;
ea.Source = oi->second;
callback(ea);
}
}

View File

@ -25,11 +25,11 @@ public:
void RemoveObject(const ConfigObject::Ptr& object);
ConfigObject::Ptr GetObject(const string& name = string());
void ForEachObject(function<int (ConfigObjectEventArgs::Ptr)> callback);
void ForEachObject(function<int (const ConfigObjectEventArgs&)> callback);
Event<ConfigObjectEventArgs::Ptr> OnObjectCreated;
Event<ConfigObjectEventArgs::Ptr> OnObjectRemoved;
Event<ConfigObjectEventArgs::Ptr> OnPropertyChanged;
Event<ConfigObjectEventArgs> OnObjectCreated;
Event<ConfigObjectEventArgs> OnObjectRemoved;
Event<ConfigObjectEventArgs> OnPropertyChanged;
};
}

View File

@ -30,7 +30,7 @@ ConfigCollection::Ptr ConfigHive::GetCollection(const string& collection)
return ci->second;
}
void ConfigHive::ForEachObject(const string& type, function<int (ConfigObjectEventArgs::Ptr)> callback)
void ConfigHive::ForEachObject(const string& type, function<int (const ConfigObjectEventArgs&)> callback)
{
CollectionIterator ci = Collections.find(type);

View File

@ -18,11 +18,11 @@ public:
ConfigObject::Ptr GetObject(const string& collection, const string& name = string());
ConfigCollection::Ptr GetCollection(const string& collection);
void ForEachObject(const string& type, function<int (ConfigObjectEventArgs::Ptr)> callback);
void ForEachObject(const string& type, function<int (const ConfigObjectEventArgs&)> callback);
Event<ConfigObjectEventArgs::Ptr> OnObjectCreated;
Event<ConfigObjectEventArgs::Ptr> OnObjectRemoved;
Event<ConfigObjectEventArgs::Ptr> OnPropertyChanged;
Event<ConfigObjectEventArgs> OnObjectCreated;
Event<ConfigObjectEventArgs> OnObjectRemoved;
Event<ConfigObjectEventArgs> OnPropertyChanged;
};
}

View File

@ -44,13 +44,13 @@ void ConfigObject::SetProperty(const string& name, const string& value)
ConfigHive::Ptr hive = m_Hive.lock();
if (hive) {
ConfigObjectEventArgs::Ptr ea = make_shared<ConfigObjectEventArgs>();
ea->Source = shared_from_this();
ea->Property = name;
ConfigObjectEventArgs ea;
ea.Source = shared_from_this();
ea.Property = name;
string oldValue;
if (GetProperty(name, &oldValue))
ea->OldValue = oldValue;
ea.OldValue = oldValue;
hive->GetCollection(m_Type)->OnPropertyChanged(ea);
hive->OnPropertyChanged(ea);

102
base/dictionary.cpp Normal file
View File

@ -0,0 +1,102 @@
#include "i2-base.h"
using namespace icinga;
bool Dictionary::GetValueVariant(string key, Variant *value)
{
DictionaryIterator i = m_Data.find(key);
if (i == m_Data.end())
return false;
*value = i->second;
return true;
}
void Dictionary::SetValueVariant(string key, const Variant& value)
{
m_Data.erase(key);
m_Data[key] = value;
}
bool Dictionary::GetValueString(string key, string *value)
{
Variant data;
if (!GetValueVariant(key, &data))
return false;
*value = data;
return true;
}
void Dictionary::SetValueString(string key, const string& value)
{
SetValueVariant(key, Variant(value));
}
bool Dictionary::GetValueInteger(string key, long *value)
{
Variant data;
if (!GetValueVariant(key, &data))
return false;
*value = data;
return true;
}
void Dictionary::SetValueInteger(string key, long value)
{
SetValueVariant(key, Variant(value));
}
bool Dictionary::GetValueDictionary(string key, Dictionary::Ptr *value)
{
Dictionary::Ptr dictionary;
Variant data;
if (!GetValueVariant(key, &data))
return false;
dictionary = dynamic_pointer_cast<Dictionary>(data.GetObject());
if (dictionary == NULL)
throw InvalidArgumentException();
*value = dictionary;
return true;
}
void Dictionary::SetValueDictionary(string key, const Dictionary::Ptr& value)
{
SetValueVariant(key, Variant(value));
}
bool Dictionary::GetValueObject(string key, Object::Ptr *value)
{
Variant data;
if (!GetValueVariant(key, &data))
return false;
*value = data;
return true;
}
void Dictionary::SetValueObject(string key, const Object::Ptr& value)
{
SetValueVariant(key, Variant(value));
}
DictionaryIterator Dictionary::Begin(void)
{
return m_Data.begin();
}
DictionaryIterator Dictionary::End(void)
{
return m_Data.end();
}

39
base/dictionary.h Normal file
View File

@ -0,0 +1,39 @@
#ifndef DICTIONARY_H
#define DICTIONARY_H
namespace icinga
{
typedef map<string, Variant>::iterator DictionaryIterator;
class I2_BASE_API Dictionary : public Object
{
private:
map<string, Variant> m_Data;
public:
typedef shared_ptr<Dictionary> Ptr;
typedef weak_ptr<Dictionary> WeakPtr;
bool GetValueVariant(string key, Variant *value);
void SetValueVariant(string key, const Variant& value);
bool GetValueString(string key, string *value);
void SetValueString(string key, const string& value);
bool GetValueInteger(string key, long *value);
void SetValueInteger(string key, long value);
bool GetValueDictionary(string key, Dictionary::Ptr *value);
void SetValueDictionary(string key, const Dictionary::Ptr& value);
bool GetValueObject(string key, Object::Ptr *value);
void SetValueObject(string key, const Object::Ptr& value);
DictionaryIterator Begin(void);
DictionaryIterator End(void);
};
}
#endif /* DICTIONARY_H */

View File

@ -4,11 +4,8 @@
namespace icinga
{
struct I2_BASE_API EventArgs : public Object
struct I2_BASE_API EventArgs
{
typedef shared_ptr<EventArgs> Ptr;
typedef weak_ptr<EventArgs> WeakPtr;
Object::Ptr Source;
};
@ -16,7 +13,7 @@ template<class TArgs>
class Event
{
public:
typedef function<int (TArgs)> DelegateType;
typedef function<int (const TArgs&)> DelegateType;
private:
list<DelegateType> m_Delegates;

View File

@ -70,6 +70,8 @@ using namespace std::tr1::placeholders;
#include "memory.h"
#include "delegate.h"
#include "event.h"
#include "variant.h"
#include "dictionary.h"
#include "timer.h"
#include "fifo.h"
#include "socket.h"

View File

@ -16,6 +16,8 @@ Socket::~Socket(void)
void Socket::Start(void)
{
assert(m_FD != INVALID_SOCKET);
OnException += bind_weak(&Socket::ExceptionEventHandler, shared_from_this());
Sockets.push_front(static_pointer_cast<Socket>(shared_from_this()));
@ -54,8 +56,8 @@ void Socket::Close(bool from_dtor)
/* nobody can possibly have a valid event subscription when the destructor has been called */
if (!from_dtor) {
EventArgs::Ptr ea = make_shared<EventArgs>();
ea->Source = shared_from_this();
EventArgs ea;
ea.Source = shared_from_this();
OnClosed(ea);
}
}
@ -84,7 +86,7 @@ string Socket::FormatErrorCode(int code)
return result;
}
int Socket::ExceptionEventHandler(EventArgs::Ptr ea)
int Socket::ExceptionEventHandler(const EventArgs& ea)
{
int opt;
socklen_t optlen = sizeof(opt);
@ -97,10 +99,10 @@ int Socket::ExceptionEventHandler(EventArgs::Ptr ea)
}
if (opt != 0) {
SocketErrorEventArgs::Ptr ea = make_shared<SocketErrorEventArgs>();
ea->Code = opt;
ea->Message = FormatErrorCode(ea->Code);
OnError(ea);
SocketErrorEventArgs sea;
sea.Code = opt;
sea.Message = FormatErrorCode(sea.Code);
OnError(sea);
Close();
}

View File

@ -17,7 +17,7 @@ class I2_BASE_API Socket : public Object
private:
SOCKET m_FD;
int ExceptionEventHandler(EventArgs::Ptr ea);
int ExceptionEventHandler(const EventArgs& ea);
protected:
string FormatErrorCode(int errorCode);
@ -40,12 +40,12 @@ public:
static void CloseAllSockets(void);
Event<EventArgs::Ptr> OnReadable;
Event<EventArgs::Ptr> OnWritable;
Event<EventArgs::Ptr> OnException;
Event<EventArgs> OnReadable;
Event<EventArgs> OnWritable;
Event<EventArgs> OnException;
Event<SocketErrorEventArgs::Ptr> OnError;
Event<EventArgs::Ptr> OnClosed;
Event<SocketErrorEventArgs> OnError;
Event<EventArgs> OnClosed;
virtual bool WantsToRead(void) const;
virtual bool WantsToWrite(void) const;

View File

@ -41,15 +41,15 @@ void TCPClient::Connect(const string& hostname, unsigned short port)
#else /* _WIN32 */
if (rc < 0 && errno != EINPROGRESS) {
#endif /* _WIN32 */
SocketErrorEventArgs::Ptr ea = make_shared<SocketErrorEventArgs>();
SocketErrorEventArgs sea;
#ifdef _WIN32
ea->Code = WSAGetLastError();
sea.Code = WSAGetLastError();
#else /* _WIN32 */
ea->Code = errno;
sea.Code = errno;
#endif /* _WIN32 */
ea->Message = FormatErrorCode(ea->Code);
sea.Message = FormatErrorCode(sea.Code);
OnError(ea);
OnError(sea);
Close();
}
@ -79,7 +79,7 @@ int TCPClient::GetPeerPort(void)
return m_PeerPort;
}
int TCPClient::ReadableEventHandler(EventArgs::Ptr ea)
int TCPClient::ReadableEventHandler(const EventArgs& ea)
{
int rc;
@ -96,15 +96,15 @@ int TCPClient::ReadableEventHandler(EventArgs::Ptr ea)
if (rc <= 0) {
if (rc < 0) {
SocketErrorEventArgs::Ptr ea = make_shared<SocketErrorEventArgs>();
SocketErrorEventArgs sea;
#ifdef _WIN32
ea->Code = WSAGetLastError();
sea.Code = WSAGetLastError();
#else /* _WIN32 */
ea->Code = errno;
sea.Code = errno;
#endif /* _WIN32 */
ea->Message = FormatErrorCode(ea->Code);
sea.Message = FormatErrorCode(sea.Code);
OnError(ea);
OnError(sea);
}
Close();
@ -113,14 +113,14 @@ int TCPClient::ReadableEventHandler(EventArgs::Ptr ea)
m_RecvQueue->Write(NULL, rc);
EventArgs::Ptr dea = make_shared<EventArgs>();
dea->Source = shared_from_this();
EventArgs dea;
dea.Source = shared_from_this();
OnDataAvailable(dea);
return 0;
}
int TCPClient::WritableEventHandler(EventArgs::Ptr ea)
int TCPClient::WritableEventHandler(const EventArgs& ea)
{
int rc;
@ -128,15 +128,15 @@ int TCPClient::WritableEventHandler(EventArgs::Ptr ea)
if (rc <= 0) {
if (rc < 0) {
SocketErrorEventArgs::Ptr ea = make_shared<SocketErrorEventArgs>();
SocketErrorEventArgs sea;
#ifdef _WIN32
ea->Code = WSAGetLastError();
sea.Code = WSAGetLastError();
#else /* _WIN32 */
ea->Code = errno;
sea.Code = errno;
#endif /* _WIN32 */
ea->Message = FormatErrorCode(ea->Code);
sea.Message = FormatErrorCode(sea.Code);
OnError(ea);
OnError(sea);
}
Close();

View File

@ -13,8 +13,8 @@ private:
FIFO::Ptr m_SendQueue;
FIFO::Ptr m_RecvQueue;
int ReadableEventHandler(EventArgs::Ptr ea);
int WritableEventHandler(EventArgs::Ptr ea);
int ReadableEventHandler(const EventArgs& ea);
int WritableEventHandler(const EventArgs& ea);
public:
typedef shared_ptr<TCPClient> Ptr;
@ -35,7 +35,7 @@ public:
virtual bool WantsToRead(void) const;
virtual bool WantsToWrite(void) const;
Event<EventArgs::Ptr> OnDataAvailable;
Event<EventArgs> OnDataAvailable;
};
}

View File

@ -36,7 +36,7 @@ void TCPServer::Listen(void)
Start();
}
int TCPServer::ReadableEventHandler(EventArgs::Ptr ea)
int TCPServer::ReadableEventHandler(const EventArgs& ea)
{
int fd;
sockaddr_in addr;
@ -44,11 +44,32 @@ int TCPServer::ReadableEventHandler(EventArgs::Ptr ea)
fd = accept(GetFD(), (sockaddr *)&addr, &addrlen);
NewClientEventArgs::Ptr nea = make_shared<NewClientEventArgs>();
nea->Source = shared_from_this();
nea->Client = static_pointer_cast<TCPSocket>(m_ClientFactory());
nea->Client->SetFD(fd);
nea->Client->Start();
if (fd == INVALID_SOCKET) {
#ifdef _WIN32
if (WSAGetLastError() == WSAEWOULDBLOCK)
#else /* _WIN32 */
if (errno == EINPROGRESS)
#endif /* _WIN32 */
return 0;
SocketErrorEventArgs sea;
#ifdef _WIN32
sea.Code = WSAGetLastError();
#else /* _WIN32 */
sea.Code = errno;
#endif /* _WIN32 */
sea.Message = FormatErrorCode(sea.Code);
OnError(sea);
Close();
}
NewClientEventArgs nea;
nea.Source = shared_from_this();
nea.Client = static_pointer_cast<TCPSocket>(m_ClientFactory());
nea.Client->SetFD(fd);
nea.Client->Start();
OnNewClient(nea);
return 0;

View File

@ -15,7 +15,7 @@ struct I2_BASE_API NewClientEventArgs : public EventArgs
class I2_BASE_API TCPServer : public TCPSocket
{
private:
int ReadableEventHandler(EventArgs::Ptr ea);
int ReadableEventHandler(const EventArgs& ea);
factory_function m_ClientFactory;
@ -32,7 +32,7 @@ public:
void Listen(void);
Event<NewClientEventArgs::Ptr> OnNewClient;
Event<NewClientEventArgs> OnNewClient;
virtual bool WantsToRead(void) const;
};

View File

@ -9,14 +9,14 @@ void TCPSocket::MakeSocket(void)
int fd = socket(AF_INET, SOCK_STREAM, 0);
if (fd == INVALID_SOCKET) {
SocketErrorEventArgs::Ptr ea = make_shared<SocketErrorEventArgs>();
SocketErrorEventArgs sea;
#ifdef _WIN32
ea->Code = WSAGetLastError();
sea.Code = WSAGetLastError();
#else /* _WIN32 */
ea->Code = errno;
sea.Code = errno;
#endif /* _WIN32 */
ea->Message = FormatErrorCode(ea->Code);
OnError(ea);
sea.Message = FormatErrorCode(sea.Code);
OnError(sea);
}
SetFD(fd);
@ -39,15 +39,15 @@ void TCPSocket::Bind(const char *hostname, unsigned short port)
int rc = ::bind(GetFD(), (sockaddr *)&sin, sizeof(sin));
if (rc < 0) {
SocketErrorEventArgs::Ptr ea = make_shared<SocketErrorEventArgs>();
SocketErrorEventArgs sea;
#ifdef _WIN32
ea->Code = WSAGetLastError();
sea.Code = WSAGetLastError();
#else /* _WIN32 */
ea->Code = errno;
sea.Code = errno;
#endif /* _WIN32 */
ea->Message = FormatErrorCode(ea->Code);
sea.Message = FormatErrorCode(sea.Code);
OnError(ea);
OnError(sea);
Close();
}

View File

@ -74,10 +74,10 @@ void Timer::StopAllTimers(void)
* the timer that originally invoked the delegate */
void Timer::Call(void)
{
TimerEventArgs::Ptr ea = make_shared<TimerEventArgs>();
ea->Source = shared_from_this();
ea->UserArgs = m_UserArgs;
OnTimerExpired(ea);
TimerEventArgs tea;
tea.Source = shared_from_this();
tea.UserArgs = m_UserArgs;
OnTimerExpired(tea);
}
void Timer::SetInterval(unsigned int interval)
@ -90,13 +90,13 @@ unsigned int Timer::GetInterval(void) const
return m_Interval;
}
void Timer::SetUserArgs(const EventArgs::Ptr& userArgs)
void Timer::SetUserArgs(const EventArgs& userArgs)
{
m_UserArgs = userArgs;
}
EventArgs::Ptr Timer::GetUserArgs(void) const
EventArgs Timer::GetUserArgs(void) const
{
return m_UserArgs;
}

View File

@ -10,13 +10,13 @@ struct I2_BASE_API TimerEventArgs : public EventArgs
typedef shared_ptr<TimerEventArgs> Ptr;
typedef weak_ptr<TimerEventArgs> WeakPtr;
EventArgs::Ptr UserArgs;
EventArgs UserArgs;
};
class I2_BASE_API Timer : public Object
{
private:
EventArgs::Ptr m_UserArgs;
EventArgs m_UserArgs;
unsigned int m_Interval;
time_t m_Next;
@ -37,8 +37,8 @@ public:
void SetInterval(unsigned int interval);
unsigned int GetInterval(void) const;
void SetUserArgs(const EventArgs::Ptr& userArgs);
EventArgs::Ptr GetUserArgs(void) const;
void SetUserArgs(const EventArgs& userArgs);
EventArgs GetUserArgs(void) const;
static time_t GetNextCall(void);
static void CallExpiredTimers(void);
@ -49,7 +49,7 @@ public:
void Reschedule(time_t next);
Event<TimerEventArgs::Ptr> OnTimerExpired;
Event<TimerEventArgs> OnTimerExpired;
};
}

68
base/variant.cpp Normal file
View File

@ -0,0 +1,68 @@
#include "i2-base.h"
using namespace icinga;
Variant::Variant(void) : m_Type(VariantEmpty)
{
}
Variant::Variant(long value) : m_Type(VariantInteger), m_IntegerValue(value)
{
}
Variant::Variant(string value) : m_Type(VariantString), m_StringValue(value)
{
}
Variant::Variant(Object::Ptr value) : m_Type(VariantObject), m_ObjectValue(value)
{
}
void Variant::Convert(VariantType newType) const
{
if (newType == m_Type)
return;
throw NotImplementedException();
}
VariantType Variant::GetType(void) const
{
return m_Type;
}
long Variant::GetInteger(void) const
{
Convert(VariantInteger);
return m_IntegerValue;
}
string Variant::GetString(void) const
{
Convert(VariantString);
return m_StringValue;
}
Object::Ptr Variant::GetObject(void) const
{
Convert(VariantObject);
return m_ObjectValue;
}
Variant::operator long(void) const
{
return GetInteger();
}
Variant::operator string(void) const
{
return GetString();
}
Variant::operator Object::Ptr(void) const
{
return GetObject();
}

45
base/variant.h Normal file
View File

@ -0,0 +1,45 @@
#ifndef VARIANT_H
#define VARIANT_H
namespace icinga
{
enum I2_BASE_API VariantType
{
VariantEmpty,
VariantInteger,
VariantString,
VariantObject
};
class I2_BASE_API Variant
{
private:
mutable long m_IntegerValue;
mutable string m_StringValue;
mutable Object::Ptr m_ObjectValue;
mutable VariantType m_Type;
void Convert(VariantType newType) const;
public:
Variant(void);
Variant(long value);
Variant(string value);
Variant(Object::Ptr value);
VariantType GetType(void) const;
long GetInteger(void) const;
string GetString(void) const;
Object::Ptr GetObject(void) const;
operator long(void) const;
operator string(void) const;
operator Object::Ptr(void) const;
};
}
#endif /* VARIANT_H */

View File

@ -46,114 +46,118 @@ void ConfigRpcComponent::Stop(void)
// TODO: implement
}
JsonRpcMessage::Ptr ConfigRpcComponent::MakeObjectMessage(const ConfigObject::Ptr& object, string method, bool includeProperties)
JsonRpcRequest ConfigRpcComponent::MakeObjectMessage(const ConfigObject::Ptr& object, string method, bool includeProperties)
{
JsonRpcMessage::Ptr msg = make_shared<JsonRpcMessage>();
msg->SetVersion("2.0");
msg->SetMethod(method);
cJSON *params = msg->GetParams();
JsonRpcRequest msg;
msg.SetVersion("2.0");
msg.SetMethod(method);
string name = object->GetName();
cJSON_AddStringToObject(params, "name", name.c_str());
Message params;
msg.SetParams(params);
string type = object->GetType();
cJSON_AddStringToObject(params, "type", type.c_str());
params.GetDictionary()->SetValueString("name", object->GetName());
params.GetDictionary()->SetValueString("type", object->GetType());
if (includeProperties) {
cJSON *properties = cJSON_CreateObject();
cJSON_AddItemToObject(params, "properties", properties);
Message properties;
params.GetDictionary()->SetValueDictionary("properties", properties.GetDictionary());
for (ConfigObject::ParameterIterator pi = object->Properties.begin(); pi != object->Properties.end(); pi++) {
cJSON_AddStringToObject(properties, pi->first.c_str(), pi->second.c_str());
properties.GetDictionary()->SetValueString(pi->first, pi->second);
}
}
return msg;
}
int ConfigRpcComponent::FetchObjectsHandler(NewMessageEventArgs::Ptr ea)
int ConfigRpcComponent::FetchObjectsHandler(const NewRequestEventArgs& ea)
{
JsonRpcClient::Ptr client = static_pointer_cast<JsonRpcClient>(ea->Source);
Endpoint::Ptr client = ea.Sender;
ConfigHive::Ptr configHive = GetIcingaApplication()->GetConfigHive();
for (ConfigHive::CollectionIterator ci = configHive->Collections.begin(); ci != configHive->Collections.end(); ci++) {
ConfigCollection::Ptr collection = ci->second;
for (ConfigCollection::ObjectIterator oi = collection->Objects.begin(); oi != collection->Objects.end(); oi++) {
JsonRpcMessage::Ptr msg = MakeObjectMessage(oi->second, "config::ObjectCreated", true);
client->SendMessage(msg);
client->ProcessRequest(m_ConfigRpcEndpoint, MakeObjectMessage(oi->second, "config::ObjectCreated", true));
}
}
return 0;
}
int ConfigRpcComponent::LocalObjectCreatedHandler(ConfigObjectEventArgs::Ptr ea)
int ConfigRpcComponent::LocalObjectCreatedHandler(const ConfigObjectEventArgs& ea)
{
ConfigObject::Ptr object = static_pointer_cast<ConfigObject>(ea->Source);
ConfigObject::Ptr object = static_pointer_cast<ConfigObject>(ea.Source);
int replicate = 0;
object->GetPropertyInteger("replicate", &replicate);
if (replicate) {
EndpointManager::Ptr mgr = GetIcingaApplication()->GetEndpointManager();
mgr->SendMessage(m_ConfigRpcEndpoint, Endpoint::Ptr(), MakeObjectMessage(object, "config::ObjectCreated", true));
mgr->SendMulticastRequest(m_ConfigRpcEndpoint, MakeObjectMessage(object, "config::ObjectCreated", true));
}
return 0;
}
int ConfigRpcComponent::LocalObjectRemovedHandler(ConfigObjectEventArgs::Ptr ea)
int ConfigRpcComponent::LocalObjectRemovedHandler(const ConfigObjectEventArgs& ea)
{
ConfigObject::Ptr object = static_pointer_cast<ConfigObject>(ea->Source);
ConfigObject::Ptr object = static_pointer_cast<ConfigObject>(ea.Source);
int replicate = 0;
object->GetPropertyInteger("replicate", &replicate);
if (replicate) {
EndpointManager::Ptr mgr = GetIcingaApplication()->GetEndpointManager();
mgr->SendMessage(m_ConfigRpcEndpoint, Endpoint::Ptr(), MakeObjectMessage(object, "config::ObjectRemoved", false));
mgr->SendMulticastRequest(m_ConfigRpcEndpoint, MakeObjectMessage(object, "config::ObjectRemoved", false));
}
return 0;
}
int ConfigRpcComponent::LocalPropertyChangedHandler(ConfigObjectEventArgs::Ptr ea)
int ConfigRpcComponent::LocalPropertyChangedHandler(const ConfigObjectEventArgs& ea)
{
ConfigObject::Ptr object = static_pointer_cast<ConfigObject>(ea->Source);
ConfigObject::Ptr object = static_pointer_cast<ConfigObject>(ea.Source);
int replicate = 0;
object->GetPropertyInteger("replicate", &replicate);
if (replicate) {
JsonRpcMessage::Ptr msg = MakeObjectMessage(object, "config::PropertyChanged", false);
cJSON *params = msg->GetParams();
JsonRpcRequest msg = MakeObjectMessage(object, "config::PropertyChanged", false);
Message params;
msg.SetParams(params);
cJSON *properties = cJSON_CreateObject();
cJSON_AddItemToObject(params, "properties", properties);
Message properties;
params.GetDictionary()->SetValueDictionary("properties", properties.GetDictionary());
string value;
object->GetProperty(ea->Property, &value);
object->GetProperty(ea.Property, &value);
cJSON_AddStringToObject(properties, ea->Property.c_str(), value.c_str());
properties.GetDictionary()->SetValueString(ea.Property, value);
EndpointManager::Ptr mgr = GetIcingaApplication()->GetEndpointManager();
mgr->SendMessage(m_ConfigRpcEndpoint, Endpoint::Ptr(), msg);
mgr->SendMulticastRequest(m_ConfigRpcEndpoint, msg);
}
return 0;
}
int ConfigRpcComponent::RemoteObjectUpdatedHandler(NewMessageEventArgs::Ptr ea)
int ConfigRpcComponent::RemoteObjectUpdatedHandler(const NewRequestEventArgs& ea)
{
JsonRpcMessage::Ptr message = ea->Message;
string name, type, value;
JsonRpcRequest message = ea.Request;
bool was_null = false;
if (!message->GetParamString("name", &name))
Message params;
if (!message.GetParams(&params))
return 0;
if (!message->GetParamString("type", &type))
string name;
if (!params.GetDictionary()->GetValueString("name", &name))
return 0;
string type;
if (!params.GetDictionary()->GetValueString("type", &type))
return 0;
ConfigHive::Ptr configHive = GetIcingaApplication()->GetConfigHive();
@ -164,15 +168,12 @@ int ConfigRpcComponent::RemoteObjectUpdatedHandler(NewMessageEventArgs::Ptr ea)
object = make_shared<ConfigObject>(type, name);
}
cJSON *properties = message->GetParam("properties");
Dictionary::Ptr properties;
if (!params.GetDictionary()->GetValueDictionary("properties", &properties))
return 0;
if (properties != NULL) {
for (cJSON *prop = properties->child; prop != NULL; prop = prop->next) {
if (prop->type != cJSON_String)
continue;
object->SetProperty(prop->string, prop->valuestring);
}
for (DictionaryIterator i = properties->Begin(); i != properties->End(); i++) {
object->SetProperty(i->first, i->second);
}
if (was_null)
@ -181,16 +182,20 @@ int ConfigRpcComponent::RemoteObjectUpdatedHandler(NewMessageEventArgs::Ptr ea)
return 0;
}
int ConfigRpcComponent::RemoteObjectRemovedHandler(NewMessageEventArgs::Ptr ea)
int ConfigRpcComponent::RemoteObjectRemovedHandler(const NewRequestEventArgs& ea)
{
JsonRpcRequest::Ptr message = ea->Message->Cast<JsonRpcRequest>();
Message::Ptr params = message->GetParams();
string name, type;
JsonRpcRequest message = ea.Request;
if (!message->GetParamString("name", &name))
Message params;
if (!message.GetParams(&params))
return 0;
if (!message->GetParamString("type", &type))
string name;
if (!params.GetDictionary()->GetValueString("name", &name))
return 0;
string type;
if (!params.GetDictionary()->GetValueString("type", &type))
return 0;
ConfigHive::Ptr configHive = GetIcingaApplication()->GetConfigHive();

View File

@ -11,16 +11,15 @@ private:
IcingaApplication::Ptr GetIcingaApplication(void);
int FetchObjectsHandler(NewMessageEventArgs::Ptr ea);
int LocalObjectCreatedHandler(const ConfigObjectEventArgs& ea);
int LocalObjectRemovedHandler(const ConfigObjectEventArgs& ea);
int LocalPropertyChangedHandler(const ConfigObjectEventArgs& ea);
int LocalObjectCreatedHandler(ConfigObjectEventArgs::Ptr ea);
int LocalObjectRemovedHandler(ConfigObjectEventArgs::Ptr ea);
int LocalPropertyChangedHandler(ConfigObjectEventArgs::Ptr ea);
int FetchObjectsHandler(const NewRequestEventArgs& ea);
int RemoteObjectUpdatedHandler(const NewRequestEventArgs& ea);
int RemoteObjectRemovedHandler(const NewRequestEventArgs& ea);
int RemoteObjectUpdatedHandler(NewMessageEventArgs::Ptr ea);
int RemoteObjectRemovedHandler(NewMessageEventArgs::Ptr ea);
JsonRpcMessage::Ptr MakeObjectMessage(const ConfigObject::Ptr& object, string method, bool includeProperties);
JsonRpcRequest MakeObjectMessage(const ConfigObject::Ptr& object, string method, bool includeProperties);
public:
virtual string GetName(void);

View File

@ -6,7 +6,6 @@ EndProject
Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "jsonrpc", "jsonrpc\jsonrpc.vcxproj", "{8DD52FAC-ECEE-48C2-B266-E7C47ED485F8}"
ProjectSection(ProjectDependencies) = postProject
{66BED474-C33F-48F9-90BA-BBCFEDC006B8} = {66BED474-C33F-48F9-90BA-BBCFEDC006B8}
{4F00EE82-B829-4872-B8F0-C1A8D86C94B4} = {4F00EE82-B829-4872-B8F0-C1A8D86C94B4}
{9C92DA90-FD53-43A9-A244-90F2E8AF9677} = {9C92DA90-FD53-43A9-A244-90F2E8AF9677}
EndProjectSection
EndProject
@ -33,8 +32,6 @@ Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "configrpc", "components\con
{C1FC77E1-04A4-481B-A78B-2F7AF489C2F8} = {C1FC77E1-04A4-481B-A78B-2F7AF489C2F8}
EndProjectSection
EndProject
Project("{8BC9CEB8-8B4A-11D0-8D11-00A0C91BC942}") = "msgc", "msgc\msgc.vcxproj", "{4F00EE82-B829-4872-B8F0-C1A8D86C94B4}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Win32 = Debug|Win32
@ -69,10 +66,6 @@ Global
{697C6D7E-3109-484C-A7AF-384D28711610}.Debug|Win32.Build.0 = Debug|Win32
{697C6D7E-3109-484C-A7AF-384D28711610}.Release|Win32.ActiveCfg = Release|Win32
{697C6D7E-3109-484C-A7AF-384D28711610}.Release|Win32.Build.0 = Release|Win32
{4F00EE82-B829-4872-B8F0-C1A8D86C94B4}.Debug|Win32.ActiveCfg = Debug|Win32
{4F00EE82-B829-4872-B8F0-C1A8D86C94B4}.Debug|Win32.Build.0 = Debug|Win32
{4F00EE82-B829-4872-B8F0-C1A8D86C94B4}.Release|Win32.ActiveCfg = Release|Win32
{4F00EE82-B829-4872-B8F0-C1A8D86C94B4}.Release|Win32.Build.0 = Release|Win32
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE

View File

@ -2,6 +2,16 @@
using namespace icinga;
EndpointManager::Ptr Endpoint::GetEndpointManager(void) const
{
return m_EndpointManager;
}
void Endpoint::SetEndpointManager(EndpointManager::Ptr manager)
{
m_EndpointManager = manager;
}
void Endpoint::RegisterMethodSink(string method)
{
m_MethodSinks.insert(method);
@ -12,11 +22,21 @@ void Endpoint::UnregisterMethodSink(string method)
m_MethodSinks.erase(method);
}
bool Endpoint::IsMethodSink(string method)
bool Endpoint::IsMethodSink(string method) const
{
return (m_MethodSinks.find(method) != m_MethodSinks.end());
}
void Endpoint::ForeachMethodSink(function<int (const NewMethodEventArgs&)> callback)
{
for (set<string>::iterator i = m_MethodSinks.begin(); i != m_MethodSinks.end(); i++) {
NewMethodEventArgs nmea;
nmea.Source = shared_from_this();
nmea.Method = *i;
callback(nmea);
}
}
void Endpoint::RegisterMethodSource(string method)
{
m_MethodSources.insert(method);
@ -27,7 +47,17 @@ void Endpoint::UnregisterMethodSource(string method)
m_MethodSources.erase(method);
}
bool Endpoint::IsMethodSource(string method)
bool Endpoint::IsMethodSource(string method) const
{
return (m_MethodSources.find(method) != m_MethodSinks.end());
}
void Endpoint::ForeachMethodSource(function<int (const NewMethodEventArgs&)> callback)
{
for (set<string>::iterator i = m_MethodSources.begin(); i != m_MethodSources.end(); i++) {
NewMethodEventArgs nmea;
nmea.Source = shared_from_this();
nmea.Method = *i;
callback(nmea);
}
}

View File

@ -6,26 +6,44 @@ namespace icinga
class EndpointManager;
struct I2_ICINGA_API NewMethodEventArgs : public EventArgs
{
string Method;
};
class I2_ICINGA_API Endpoint : public Object
{
private:
set<string> m_MethodSinks;
set<string> m_MethodSources;
shared_ptr<EndpointManager> m_EndpointManager;
public:
typedef shared_ptr<Endpoint> Ptr;
typedef weak_ptr<Endpoint> WeakPtr;
shared_ptr<EndpointManager> GetEndpointManager(void) const;
void SetEndpointManager(shared_ptr<EndpointManager> manager);
void RegisterMethodSink(string method);
void UnregisterMethodSink(string method);
bool IsMethodSink(string method);
bool IsMethodSink(string method) const;
void RegisterMethodSource(string method);
void UnregisterMethodSource(string method);
bool IsMethodSource(string method);
bool IsMethodSource(string method) const;
virtual void SendRequest(Endpoint::Ptr sender, JsonRpcRequest::Ptr message) = 0;
virtual void SendResponse(Endpoint::Ptr sender, JsonRpcResponse::Ptr message) = 0;
virtual bool IsLocal(void) const = 0;
virtual void ProcessRequest(Endpoint::Ptr sender, const JsonRpcRequest& message) = 0;
virtual void ProcessResponse(Endpoint::Ptr sender, const JsonRpcResponse& message) = 0;
Event<NewMethodEventArgs> OnNewMethodSink;
Event<NewMethodEventArgs> OnNewMethodSource;
void ForeachMethodSink(function<int (const NewMethodEventArgs&)> callback);
void ForeachMethodSource(function<int (const NewMethodEventArgs&)> callback);
};
}

View File

@ -23,14 +23,12 @@ void EndpointManager::AddListener(unsigned short port)
server->Start();
}
void EndpointManager::AddConnection(string host, short port)
void EndpointManager::AddConnection(string host, unsigned short port)
{
JsonRpcClient::Ptr client = make_shared<JsonRpcClient>();
RegisterClient(client);
JsonRpcEndpoint::Ptr endpoint = make_shared<JsonRpcEndpoint>();
RegisterEndpoint(endpoint);
client->MakeSocket();
client->Connect(host, port);
client->Start();
endpoint->Connect(host, port);
}
void EndpointManager::RegisterServer(JsonRpcServer::Ptr server)
@ -39,81 +37,32 @@ void EndpointManager::RegisterServer(JsonRpcServer::Ptr server)
server->OnNewClient += bind_weak(&EndpointManager::NewClientHandler, shared_from_this());
}
int EndpointManager::NewClientHandler(const NewClientEventArgs& ncea)
{
JsonRpcEndpoint::Ptr endpoint = make_shared<JsonRpcEndpoint>();
RegisterEndpoint(endpoint);
endpoint->SetClient(static_pointer_cast<JsonRpcClient>(ncea.Client));
return 0;
}
void EndpointManager::UnregisterServer(JsonRpcServer::Ptr server)
{
m_Servers.remove(server);
// TODO: unbind event
}
void EndpointManager::RegisterClient(JsonRpcClient::Ptr client)
{
m_Clients.push_front(client);
client->OnNewMessage += bind_weak(&EndpointManager::NewMessageHandler, shared_from_this());
client->OnClosed += bind_weak(&EndpointManager::CloseClientHandler, shared_from_this());
client->OnError += bind_weak(&EndpointManager::ErrorClientHandler, shared_from_this());
}
void EndpointManager::UnregisterClient(JsonRpcClient::Ptr client)
{
m_Clients.remove(client);
// TODO: unbind event
}
int EndpointManager::NewClientHandler(NewClientEventArgs::Ptr ncea)
{
JsonRpcClient::Ptr client = static_pointer_cast<JsonRpcClient>(ncea->Client);
RegisterClient(client);
return 0;
}
int EndpointManager::CloseClientHandler(EventArgs::Ptr ea)
{
JsonRpcClient::Ptr client = static_pointer_cast<JsonRpcClient>(ea->Source);
UnregisterClient(client);
if (client->GetPeerHost() != string()) {
Timer::Ptr timer = make_shared<Timer>();
timer->SetInterval(30);
timer->SetUserArgs(ea);
timer->OnTimerExpired += bind_weak(&EndpointManager::ReconnectClientHandler, shared_from_this());
timer->Start();
m_ReconnectTimers.push_front(timer);
}
return 0;
}
int EndpointManager::ErrorClientHandler(SocketErrorEventArgs::Ptr ea)
{
cout << "Error occured for JSON-RPC socket: Code=" << ea->Code << "; Message=" << ea->Message << endl;
return 0;
}
int EndpointManager::ReconnectClientHandler(TimerEventArgs::Ptr ea)
{
JsonRpcClient::Ptr client = static_pointer_cast<JsonRpcClient>(ea->UserArgs->Source);
Timer::Ptr timer = static_pointer_cast<Timer>(ea->Source);
AddConnection(client->GetPeerHost(), client->GetPeerPort());
timer->Stop();
m_ReconnectTimers.remove(timer);
return 0;
}
int EndpointManager::NewMessageHandler(NewMessageEventArgs::Ptr nmea)
{
// TODO: implement
return 0;
}
void EndpointManager::RegisterEndpoint(Endpoint::Ptr endpoint)
{
endpoint->SetEndpointManager(static_pointer_cast<EndpointManager>(shared_from_this()));
m_Endpoints.push_front(endpoint);
endpoint->OnNewMethodSink += bind_weak(&EndpointManager::NewMethodSinkHandler, shared_from_this());
endpoint->ForeachMethodSink(bind(&EndpointManager::NewMethodSinkHandler, this, _1));
endpoint->OnNewMethodSource += bind_weak(&EndpointManager::NewMethodSourceHandler, shared_from_this());
endpoint->ForeachMethodSource(bind(&EndpointManager::NewMethodSourceHandler, this, _1));
}
void EndpointManager::UnregisterEndpoint(Endpoint::Ptr endpoint)
@ -121,21 +70,21 @@ void EndpointManager::UnregisterEndpoint(Endpoint::Ptr endpoint)
m_Endpoints.remove(endpoint);
}
void EndpointManager::SendAnycastRequest(Endpoint::Ptr sender, JsonRpcRequest::Ptr request)
void EndpointManager::SendAnycastRequest(Endpoint::Ptr sender, const JsonRpcRequest& request, bool fromLocal)
{
throw NotImplementedException();
}
void EndpointManager::SendMulticastRequest(Endpoint::Ptr sender, JsonRpcRequest::Ptr request)
void EndpointManager::SendMulticastRequest(Endpoint::Ptr sender, const JsonRpcRequest& request, bool fromLocal)
{
#ifdef _DEBUG
string id;
if (request->GetID(&id))
if (request.GetID(&id))
throw InvalidArgumentException("Multicast requests must not have an ID.");
#endif /* _DEBUG */
string method;
if (!request->GetMethod(&method))
if (!request.GetMethod(&method))
throw InvalidArgumentException();
for (list<Endpoint::Ptr>::iterator i = m_Endpoints.begin(); i != m_Endpoints.end(); i++)
@ -145,7 +94,58 @@ void EndpointManager::SendMulticastRequest(Endpoint::Ptr sender, JsonRpcRequest:
if (endpoint == sender)
continue;
/* send non-local messages to just the local endpoints */
if (!fromLocal && !endpoint->IsLocal())
continue;
if (endpoint->IsMethodSink(method))
endpoint->SendRequest(sender, request);
endpoint->ProcessRequest(sender, request);
}
}
int EndpointManager::NewMethodSinkHandler(const NewMethodEventArgs& ea)
{
Endpoint::Ptr sender = static_pointer_cast<Endpoint>(ea.Source);
if (!sender->IsLocal())
return 0;
JsonRpcRequest request;
request.SetVersion("2.0");
request.SetMethod("message::Subscribe");
Message params;
params.GetDictionary()->SetValueString("method", ea.Method);
request.SetParams(params);
SendMulticastRequest(sender, request);
return 0;
}
int EndpointManager::NewMethodSourceHandler(const NewMethodEventArgs& ea)
{
Endpoint::Ptr sender = static_pointer_cast<Endpoint>(ea.Source);
if (!sender->IsLocal())
return 0;
JsonRpcRequest request;
request.SetVersion("2.0");
request.SetMethod("message::Provide");
Message params;
params.GetDictionary()->SetValueString("method", ea.Method);
request.SetParams(params);
SendMulticastRequest(sender, request);
return 0;
}
void EndpointManager::ForeachEndpoint(function<int (Endpoint::Ptr)> callback)
{
for (list<Endpoint::Ptr>::iterator i = m_Endpoints.begin(); i != m_Endpoints.end(); i++) {
callback(*i);
}
}

View File

@ -7,23 +7,17 @@ namespace icinga
class I2_ICINGA_API EndpointManager : public Object
{
list<JsonRpcServer::Ptr> m_Servers;
list<JsonRpcClient::Ptr> m_Clients;
list<Timer::Ptr> m_ReconnectTimers;
list<Endpoint::Ptr> m_Endpoints;
string m_Identity;
int NewClientHandler(NewClientEventArgs::Ptr ncea);
int CloseClientHandler(EventArgs::Ptr ea);
int ErrorClientHandler(SocketErrorEventArgs::Ptr ea);
int ReconnectClientHandler(TimerEventArgs::Ptr ea);
int NewMessageHandler(NewMessageEventArgs::Ptr nmea);
void RegisterClient(JsonRpcClient::Ptr server);
void UnregisterClient(JsonRpcClient::Ptr server);
void RegisterServer(JsonRpcServer::Ptr server);
void UnregisterServer(JsonRpcServer::Ptr server);
int NewClientHandler(const NewClientEventArgs& ncea);
int NewMethodSinkHandler(const NewMethodEventArgs& ea);
int NewMethodSourceHandler(const NewMethodEventArgs& ea);
public:
typedef shared_ptr<EndpointManager> Ptr;
typedef weak_ptr<EndpointManager> WeakPtr;
@ -32,13 +26,15 @@ public:
string GetIdentity(void) const;
void AddListener(unsigned short port);
void AddConnection(string host, short port);
void AddConnection(string host, unsigned short port);
void RegisterEndpoint(Endpoint::Ptr endpoint);
void UnregisterEndpoint(Endpoint::Ptr endpoint);
void SendAnycastRequest(Endpoint::Ptr sender, JsonRpcRequest::Ptr request);
void SendMulticastRequest(Endpoint::Ptr sender, JsonRpcRequest::Ptr request);
void SendAnycastRequest(Endpoint::Ptr sender, const JsonRpcRequest& request, bool fromLocal = true);
void SendMulticastRequest(Endpoint::Ptr sender, const JsonRpcRequest& request, bool fromLocal = true);
void ForeachEndpoint(function<int (Endpoint::Ptr)> callback);
};
}

View File

@ -32,7 +32,7 @@ int IcingaApplication::Main(const vector<string>& args)
ConfigCollection::Ptr componentCollection = GetConfigHive()->GetCollection("component");
function<int (ConfigObjectEventArgs::Ptr)> NewComponentHandler = bind_weak(&IcingaApplication::NewComponentHandler, shared_from_this());
function<int (const ConfigObjectEventArgs&)> NewComponentHandler = bind_weak(&IcingaApplication::NewComponentHandler, shared_from_this());
componentCollection->OnObjectCreated += NewComponentHandler;
componentCollection->ForEachObject(NewComponentHandler);
@ -40,7 +40,7 @@ int IcingaApplication::Main(const vector<string>& args)
ConfigCollection::Ptr listenerCollection = GetConfigHive()->GetCollection("rpclistener");
function<int (ConfigObjectEventArgs::Ptr)> NewRpcListenerHandler = bind_weak(&IcingaApplication::NewRpcListenerHandler, shared_from_this());
function<int (const ConfigObjectEventArgs&)> NewRpcListenerHandler = bind_weak(&IcingaApplication::NewRpcListenerHandler, shared_from_this());
listenerCollection->OnObjectCreated += NewRpcListenerHandler;
listenerCollection->ForEachObject(NewRpcListenerHandler);
@ -48,7 +48,7 @@ int IcingaApplication::Main(const vector<string>& args)
ConfigCollection::Ptr connectionCollection = GetConfigHive()->GetCollection("rpcconnection");
function<int (ConfigObjectEventArgs::Ptr)> NewRpcConnectionHandler = bind_weak(&IcingaApplication::NewRpcConnectionHandler, shared_from_this());
function<int (const ConfigObjectEventArgs&)> NewRpcConnectionHandler = bind_weak(&IcingaApplication::NewRpcConnectionHandler, shared_from_this());
connectionCollection->OnObjectCreated += NewRpcConnectionHandler;
connectionCollection->ForEachObject(NewRpcConnectionHandler);
@ -61,11 +61,33 @@ int IcingaApplication::Main(const vector<string>& args)
ConfigCollection::Ptr collection = GetConfigHive()->GetCollection("rpclistener");
m_TestEndpoint = make_shared<VirtualEndpoint>();
m_EndpointManager->RegisterEndpoint(m_TestEndpoint);
m_TestEndpoint->RegisterMethodSink("test");
m_TestEndpoint->RegisterMethodSource("test");
m_TestTimer = make_shared<Timer>();
m_TestTimer->SetInterval(5);
m_TestTimer->OnTimerExpired += bind_weak(&IcingaApplication::TestTimerHandler, shared_from_this());
m_TestTimer->Start();
RunEventLoop();
return EXIT_SUCCESS;
}
int IcingaApplication::TestTimerHandler(const TimerEventArgs& tea)
{
cout << "Problem?" << endl;
JsonRpcRequest request;
request.SetVersion("2.0");
request.SetMethod("test");
m_EndpointManager->SendMulticastRequest(m_TestEndpoint, request);
return 0;
}
void IcingaApplication::PrintUsage(const string& programPath)
{
cout << "Syntax: " << programPath << " <config-file>" << endl;
@ -76,10 +98,10 @@ EndpointManager::Ptr IcingaApplication::GetEndpointManager(void)
return m_EndpointManager;
}
int IcingaApplication::NewComponentHandler(ConfigObjectEventArgs::Ptr ea)
int IcingaApplication::NewComponentHandler(const ConfigObjectEventArgs& ea)
{
string path;
ConfigObject::Ptr object = static_pointer_cast<ConfigObject>(ea->Source);
ConfigObject::Ptr object = static_pointer_cast<ConfigObject>(ea.Source);
if (!object->GetProperty("path", &path)) {
#ifdef _WIN32
@ -96,17 +118,18 @@ int IcingaApplication::NewComponentHandler(ConfigObjectEventArgs::Ptr ea)
return 0;
}
int IcingaApplication::DeletedComponentHandler(ConfigObjectEventArgs::Ptr ea)
int IcingaApplication::DeletedComponentHandler(const ConfigObjectEventArgs& ea)
{
ConfigObject::Ptr object = static_pointer_cast<ConfigObject>(ea->Source);
UnloadComponent(object->GetName());
ConfigObject::Ptr object = static_pointer_cast<ConfigObject>(ea.Source);
Component::Ptr component = GetComponent(object->GetName());
UnregisterComponent(component);
return 0;
}
int IcingaApplication::NewRpcListenerHandler(ConfigObjectEventArgs::Ptr ea)
int IcingaApplication::NewRpcListenerHandler(const ConfigObjectEventArgs& ea)
{
ConfigObject::Ptr object = static_pointer_cast<ConfigObject>(ea->Source);
ConfigObject::Ptr object = static_pointer_cast<ConfigObject>(ea.Source);
int port;
if (!object->GetPropertyInteger("port", &port))
@ -119,16 +142,16 @@ int IcingaApplication::NewRpcListenerHandler(ConfigObjectEventArgs::Ptr ea)
return 0;
}
int IcingaApplication::DeletedRpcListenerHandler(ConfigObjectEventArgs::Ptr ea)
int IcingaApplication::DeletedRpcListenerHandler(const ConfigObjectEventArgs& ea)
{
throw Exception("Unsupported operation.");
return 0;
}
int IcingaApplication::NewRpcConnectionHandler(ConfigObjectEventArgs::Ptr ea)
int IcingaApplication::NewRpcConnectionHandler(const ConfigObjectEventArgs& ea)
{
ConfigObject::Ptr object = static_pointer_cast<ConfigObject>(ea->Source);
ConfigObject::Ptr object = static_pointer_cast<ConfigObject>(ea.Source);
string hostname;
int port;
@ -145,7 +168,7 @@ int IcingaApplication::NewRpcConnectionHandler(ConfigObjectEventArgs::Ptr ea)
return 0;
}
int IcingaApplication::DeletedRpcConnectionHandler(ConfigObjectEventArgs::Ptr ea)
int IcingaApplication::DeletedRpcConnectionHandler(const ConfigObjectEventArgs& ea)
{
throw Exception("Unsupported operation.");

View File

@ -8,16 +8,19 @@ class I2_ICINGA_API IcingaApplication : public Application
{
private:
EndpointManager::Ptr m_EndpointManager;
Timer::Ptr m_TestTimer;
VirtualEndpoint::Ptr m_TestEndpoint;
int NewComponentHandler(ConfigObjectEventArgs::Ptr ea);
int DeletedComponentHandler(ConfigObjectEventArgs::Ptr ea);
int NewComponentHandler(const ConfigObjectEventArgs& ea);
int DeletedComponentHandler(const ConfigObjectEventArgs& ea);
int NewRpcListenerHandler(ConfigObjectEventArgs::Ptr ea);
int DeletedRpcListenerHandler(ConfigObjectEventArgs::Ptr ea);
int NewRpcListenerHandler(const ConfigObjectEventArgs& ea);
int DeletedRpcListenerHandler(const ConfigObjectEventArgs& ea);
int NewRpcConnectionHandler(ConfigObjectEventArgs::Ptr ea);
int DeletedRpcConnectionHandler(ConfigObjectEventArgs::Ptr ea);
int NewRpcConnectionHandler(const ConfigObjectEventArgs& ea);
int DeletedRpcConnectionHandler(const ConfigObjectEventArgs& ea);
int TestTimerHandler(const TimerEventArgs& tea);
public:
typedef shared_ptr<IcingaApplication> Ptr;
typedef weak_ptr<IcingaApplication> WeakPtr;

View File

@ -7,9 +7,66 @@ JsonRpcClient::Ptr JsonRpcEndpoint::GetClient(void)
return m_Client;
}
void JsonRpcEndpoint::Connect(string host, unsigned short port)
{
JsonRpcClient::Ptr client = make_shared<JsonRpcClient>();
client->MakeSocket();
client->Connect(host, port);
client->Start();
SetClient(client);
}
int JsonRpcEndpoint::SyncSubscription(string type, const NewMethodEventArgs& nmea)
{
JsonRpcRequest request;
request.SetVersion("2.0");
request.SetMethod(type);
Message params;
params.GetDictionary()->SetValueString("method", nmea.Method);
request.SetParams(params);
m_Client->SendMessage(request);
return 0;
}
int JsonRpcEndpoint::SyncSubscriptions(Endpoint::Ptr endpoint)
{
if (!endpoint->IsLocal())
return 0;
endpoint->ForeachMethodSink(bind(&JsonRpcEndpoint::SyncSubscription, this, "message::Subscribe", _1));
endpoint->ForeachMethodSource(bind(&JsonRpcEndpoint::SyncSubscription, this, "message::Provide", _1));
return 0;
}
void JsonRpcEndpoint::SetClient(JsonRpcClient::Ptr client)
{
m_Client = client;
client->OnNewMessage += bind_weak(&JsonRpcEndpoint::NewMessageHandler, shared_from_this());
client->OnClosed += bind_weak(&JsonRpcEndpoint::ClientClosedHandler, shared_from_this());
client->OnError += bind_weak(&JsonRpcEndpoint::ClientErrorHandler, shared_from_this());
NewMethodEventArgs nmea;
nmea.Source = shared_from_this();
nmea.Method = "message::Subscribe";
SyncSubscription("message::Subscribe", nmea);
SyncSubscription("message::Provide", nmea);
nmea.Method = "message::Provide";
SyncSubscription("message::Subscribe", nmea);
SyncSubscription("message::Provide", nmea);
GetEndpointManager()->ForeachEndpoint(bind(&JsonRpcEndpoint::SyncSubscriptions, this, _1));
}
bool JsonRpcEndpoint::IsLocal(void) const
{
return false;
}
bool JsonRpcEndpoint::IsConnected(void) const
@ -17,14 +74,99 @@ bool JsonRpcEndpoint::IsConnected(void) const
return (m_Client.get() != NULL);
}
void JsonRpcEndpoint::SendRequest(Endpoint::Ptr sender, JsonRpcRequest::Ptr message)
void JsonRpcEndpoint::ProcessRequest(Endpoint::Ptr sender, const JsonRpcRequest& message)
{
if (IsConnected()) {
string id;
if (message.GetID(&id))
// TODO: remove calls after a certain timeout (and notify callers?)
m_PendingCalls[id] = sender;
m_Client->SendMessage(message);
}
}
void JsonRpcEndpoint::ProcessResponse(Endpoint::Ptr sender, const JsonRpcResponse& message)
{
if (IsConnected())
m_Client->SendMessage(message);
}
void JsonRpcEndpoint::SendResponse(Endpoint::Ptr sender, JsonRpcResponse::Ptr message)
int JsonRpcEndpoint::NewMessageHandler(const NewMessageEventArgs& nmea)
{
if (IsConnected())
m_Client->SendMessage(message);
const Message& message = nmea.Message;
Endpoint::Ptr sender = static_pointer_cast<Endpoint>(shared_from_this());
string method;
if (message.GetDictionary()->GetValueString("method", &method)) {
JsonRpcRequest request = message;
Message params;
string method;
if (request.GetMethod(&method) && request.GetParams(&params) &&
(method == "message::Subscribe" || method == "message::Provide")) {
string sub_method;
if (params.GetDictionary()->GetValueString("method", &sub_method)) {
if (method == "message::Subscribe")
RegisterMethodSink(sub_method);
else
RegisterMethodSource(sub_method);
}
return 0;
}
string id;
if (request.GetID(&id))
GetEndpointManager()->SendAnycastRequest(sender, request, false);
else
GetEndpointManager()->SendMulticastRequest(sender, request, false);
} else {
// TODO: deal with response messages
throw NotImplementedException();
}
return 0;
}
int JsonRpcEndpoint::ClientClosedHandler(const EventArgs& ea)
{
m_PendingCalls.clear();
// TODO: clear method sources/sinks
if (m_Client->GetPeerHost() != string()) {
Timer::Ptr timer = make_shared<Timer>();
timer->SetInterval(30);
timer->SetUserArgs(ea);
timer->OnTimerExpired += bind_weak(&JsonRpcEndpoint::ClientReconnectHandler, shared_from_this());
timer->Start();
m_ReconnectTimer = timer;
}
m_Client.reset();
// TODO: persist events, etc., for now we just disable the endpoint
return 0;
}
int JsonRpcEndpoint::ClientErrorHandler(const SocketErrorEventArgs& ea)
{
cerr << "Error occured for JSON-RPC socket: Code=" << ea.Code << "; Message=" << ea.Message << endl;
return 0;
}
int JsonRpcEndpoint::ClientReconnectHandler(const TimerEventArgs& ea)
{
JsonRpcClient::Ptr client = static_pointer_cast<JsonRpcClient>(ea.UserArgs.Source);
Timer::Ptr timer = static_pointer_cast<Timer>(ea.Source);
m_Client = client;
timer->Stop();
m_ReconnectTimer.reset();
return 0;
}

View File

@ -8,17 +8,32 @@ class I2_ICINGA_API JsonRpcEndpoint : public Endpoint
{
private:
JsonRpcClient::Ptr m_Client;
map<string, Endpoint::Ptr> m_PendingCalls;
Timer::Ptr m_ReconnectTimer;
bool IsConnected(void) const;
int NewMessageHandler(const NewMessageEventArgs& nmea);
int ClientClosedHandler(const EventArgs& ea);
int ClientErrorHandler(const SocketErrorEventArgs& ea);
int ClientReconnectHandler(const TimerEventArgs& ea);
int SyncSubscription(string type, const NewMethodEventArgs& nmea);
int SyncSubscriptions(Endpoint::Ptr endpoint);
public:
JsonRpcEndpoint(void);
typedef shared_ptr<JsonRpcEndpoint> Ptr;
typedef weak_ptr<JsonRpcEndpoint> WeakPtr;
void Connect(string host, unsigned short port);
JsonRpcClient::Ptr GetClient(void);
void SetClient(JsonRpcClient::Ptr client);
virtual void SendRequest(Endpoint::Ptr sender, JsonRpcRequest::Ptr message);
virtual void SendResponse(Endpoint::Ptr sender, JsonRpcResponse::Ptr message);
virtual bool IsLocal(void) const;
virtual void ProcessRequest(Endpoint::Ptr sender, const JsonRpcRequest& message);
virtual void ProcessResponse(Endpoint::Ptr sender, const JsonRpcResponse& message);
};
}

View File

@ -2,14 +2,19 @@
using namespace icinga;
void VirtualEndpoint::RegisterMethodHandler(string method, function<int (NewRequestEventArgs::Ptr)> callback)
bool VirtualEndpoint::IsLocal(void) const
{
return true;
}
void VirtualEndpoint::RegisterMethodHandler(string method, function<int (const NewRequestEventArgs&)> callback)
{
m_MethodHandlers[method] += callback;
RegisterMethodSink(method);
}
void VirtualEndpoint::UnregisterMethodHandler(string method, function<int (NewRequestEventArgs::Ptr)> callback)
void VirtualEndpoint::UnregisterMethodHandler(string method, function<int (const NewRequestEventArgs&)> callback)
{
// TODO: implement
//m_MethodHandlers[method] -= callback;
@ -18,25 +23,25 @@ void VirtualEndpoint::UnregisterMethodHandler(string method, function<int (NewRe
throw NotImplementedException();
}
void VirtualEndpoint::SendRequest(Endpoint::Ptr sender, JsonRpcRequest::Ptr request)
void VirtualEndpoint::ProcessRequest(Endpoint::Ptr sender, const JsonRpcRequest& request)
{
string method;
if (!request->GetMethod(&method))
if (!request.GetMethod(&method))
return;
map<string, Event<NewRequestEventArgs::Ptr> >::iterator i = m_MethodHandlers.find(method);
map<string, Event<NewRequestEventArgs> >::iterator i = m_MethodHandlers.find(method);
if (i == m_MethodHandlers.end())
throw InvalidArgumentException();
NewRequestEventArgs::Ptr nrea = make_shared<NewRequestEventArgs>();
nrea->Source = shared_from_this();
nrea->Sender = sender;
nrea->Request = request;
NewRequestEventArgs nrea;
nrea.Source = shared_from_this();
nrea.Sender = sender;
nrea.Request = request;
i->second(nrea);
}
void VirtualEndpoint::SendResponse(Endpoint::Ptr sender, JsonRpcResponse::Ptr response)
void VirtualEndpoint::ProcessResponse(Endpoint::Ptr sender, const JsonRpcResponse& response)
{
// TODO: figure out which request this response belongs to and notify the caller
throw NotImplementedException();

View File

@ -4,32 +4,31 @@
namespace icinga
{
struct I2_JSONRPC_API NewRequestEventArgs : public EventArgs
struct I2_ICINGA_API NewRequestEventArgs : public EventArgs
{
typedef shared_ptr<NewRequestEventArgs> Ptr;
typedef weak_ptr<NewRequestEventArgs> WeakPtr;
Endpoint::Ptr Sender;
JsonRpcRequest::Ptr Request;
JsonRpcRequest Request;
};
class I2_ICINGA_API VirtualEndpoint : public Endpoint
{
private:
map< string, Event<NewRequestEventArgs::Ptr> > m_MethodHandlers;
map< string, Event<NewRequestEventArgs> > m_MethodHandlers;
public:
typedef shared_ptr<VirtualEndpoint> Ptr;
typedef weak_ptr<VirtualEndpoint> WeakPtr;
void RegisterMethodHandler(string method, function<int (NewRequestEventArgs::Ptr)> callback);
void UnregisterMethodHandler(string method, function<int (NewRequestEventArgs::Ptr)> callback);
void RegisterMethodHandler(string method, function<int (const NewRequestEventArgs&)> callback);
void UnregisterMethodHandler(string method, function<int (const NewRequestEventArgs&)> callback);
virtual void RegisterMethodSource(string method);
virtual void UnregisterMethodSource(string method);
virtual bool IsLocal(void) const;
virtual void SendRequest(Endpoint::Ptr sender, JsonRpcRequest::Ptr message);
virtual void SendResponse(Endpoint::Ptr sender, JsonRpcResponse::Ptr message);
virtual void ProcessRequest(Endpoint::Ptr sender, const JsonRpcRequest& message);
virtual void ProcessResponse(Endpoint::Ptr sender, const JsonRpcResponse& message);
};
}

View File

@ -11,6 +11,8 @@
# define I2_JSONRPC_API I2_IMPORT
#endif /* I2_JSONRPC_BUILD */
#include "variant.h"
#include "dictionary.h"
#include "message.h"
#include "netstring.h"
#include "jsonrpcrequest.h"

View File

@ -27,34 +27,6 @@
<ClCompile Include="message.cpp" />
<ClCompile Include="netstring.cpp" />
</ItemGroup>
<ItemGroup>
<CustomBuild Include="jsonrpcrequest.message">
<FileType>Document</FileType>
<Command Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">"$(OutputPath)\msgc" %(Identity)</Command>
<Command Condition="'$(Configuration)|$(Platform)'=='Release|Win32'">"$(OutputPath)\msgc" %(Identity)</Command>
<Message Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">Compiling %(Identity)</Message>
<Message Condition="'$(Configuration)|$(Platform)'=='Release|Win32'">Compiling %(Identity)</Message>
<Outputs Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">%(Filename).cpp %(Filename).h</Outputs>
<Outputs Condition="'$(Configuration)|$(Platform)'=='Release|Win32'">%(Filename).cpp %(Filename).h</Outputs>
<AdditionalInputs Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">
</AdditionalInputs>
<AdditionalInputs Condition="'$(Configuration)|$(Platform)'=='Release|Win32'">
</AdditionalInputs>
</CustomBuild>
<CustomBuild Include="jsonrpcresponse.message">
<FileType>Document</FileType>
<Outputs Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">%(Filename).cpp %(Filename).h</Outputs>
<Outputs Condition="'$(Configuration)|$(Platform)'=='Release|Win32'">%(Filename).cpp %(Filename).h</Outputs>
<Message Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">Compiling %(Identity)</Message>
<Message Condition="'$(Configuration)|$(Platform)'=='Release|Win32'">Compiling %(Identity)</Message>
<Command Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">"$(OutputPath)\msgc" %(Identity)</Command>
<Command Condition="'$(Configuration)|$(Platform)'=='Release|Win32'">"$(OutputPath)\msgc" %(Identity)</Command>
<AdditionalInputs Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">
</AdditionalInputs>
<AdditionalInputs Condition="'$(Configuration)|$(Platform)'=='Release|Win32'">
</AdditionalInputs>
</CustomBuild>
</ItemGroup>
<PropertyGroup Label="Globals">
<ProjectGuid>{8DD52FAC-ECEE-48C2-B266-E7C47ED485F8}</ProjectGuid>
<Keyword>Win32Proj</Keyword>

View File

@ -9,30 +9,32 @@ void JsonRpcClient::Start(void)
OnDataAvailable += bind_weak(&JsonRpcClient::DataAvailableHandler, shared_from_this());
}
void JsonRpcClient::SendMessage(Message::Ptr message)
void JsonRpcClient::SendMessage(const Message& message)
{
Netstring::WriteMessageToFIFO(GetSendQueue(), message);
}
int JsonRpcClient::DataAvailableHandler(EventArgs::Ptr ea)
int JsonRpcClient::DataAvailableHandler(const EventArgs& ea)
{
Message::Ptr message;
Message message;
bool message_read;
while (true) {
try {
message = Netstring::ReadMessageFromFIFO(GetRecvQueue());
} catch (const exception&) {
message_read = Netstring::ReadMessageFromFIFO(GetRecvQueue(), &message);
} catch (const Exception& ex) {
cerr << "Exception while reading from JSON-RPC client: " << ex.GetMessage() << endl;
Close();
return 1;
}
if (message.get() == NULL)
if (!message_read)
break;
NewMessageEventArgs::Ptr nea = make_shared<NewMessageEventArgs>();
nea->Source = shared_from_this();
nea->Message = message;
NewMessageEventArgs nea;
nea.Source = shared_from_this();
nea.Message = message;
OnNewMessage(nea);
}

View File

@ -9,23 +9,23 @@ struct I2_JSONRPC_API NewMessageEventArgs : public EventArgs
typedef shared_ptr<NewMessageEventArgs> Ptr;
typedef weak_ptr<NewMessageEventArgs> WeakPtr;
Message::Ptr Message;
Message Message;
};
class I2_JSONRPC_API JsonRpcClient : public TCPClient
{
private:
int DataAvailableHandler(EventArgs::Ptr ea);
int DataAvailableHandler(const EventArgs& ea);
public:
typedef shared_ptr<JsonRpcClient> Ptr;
typedef weak_ptr<JsonRpcClient> WeakPtr;
void SendMessage(Message::Ptr message);
void SendMessage(const Message& message);
virtual void Start(void);
Event<NewMessageEventArgs::Ptr> OnNewMessage;
Event<NewMessageEventArgs> OnNewMessage;
};
}

View File

@ -0,0 +1,4 @@
#include "i2-jsonrpc.h"
#include "jsonrpcrequest.h"
using namespace icinga;

64
jsonrpc/jsonrpcrequest.h Normal file
View File

@ -0,0 +1,64 @@
#ifndef JSONRPCREQUEST_H
#define JSONRPCREQUEST_H
namespace icinga
{
class I2_JSONRPC_API JsonRpcRequest : public Message
{
public:
JsonRpcRequest(void) : Message() { }
JsonRpcRequest(const Message& message) : Message(message) { }
inline bool GetVersion(string *value) const
{
return GetDictionary()->GetValueString("jsonrpc", value);
}
inline void SetVersion(const string& value)
{
GetDictionary()->SetValueString("jsonrpc", value);
}
inline bool GetMethod(string *value) const
{
return GetDictionary()->GetValueString("method", value);
}
inline void SetMethod(const string& value)
{
GetDictionary()->SetValueString("method", value);
}
inline bool GetParams(Message *value) const
{
Dictionary::Ptr dictionary;
if (!GetDictionary()->GetValueDictionary("params", &dictionary))
return false;
*value = Message(dictionary);
return true;
}
inline void SetParams(const Message& value)
{
GetDictionary()->SetValueDictionary("params", value.GetDictionary());
}
inline bool GetID(string *value) const
{
return GetDictionary()->GetValueString("id", value);
}
inline void SetID(const string& value)
{
GetDictionary()->SetValueString("id", value);
}
};
}
#endif /* JSONRPCREQUEST_H */

View File

@ -0,0 +1,4 @@
#include "i2-jsonrpc.h"
#include "jsonrpcresponse.h"
using namespace icinga;

56
jsonrpc/jsonrpcresponse.h Normal file
View File

@ -0,0 +1,56 @@
#ifndef JSONRPCRESPONSE_H
#define JSONRPCRESPONSE_H
namespace icinga
{
class I2_JSONRPC_API JsonRpcResponse : public Message
{
public:
JsonRpcResponse(void) : Message() { }
JsonRpcResponse(const Message& message) : Message(message) { }
inline bool GetVersion(string *value) const
{
return GetDictionary()->GetValueString("jsonrpc", value);
}
inline void SetJsonRpc(const string& value)
{
GetDictionary()->SetValueString("jsonrpc", value);
}
bool GetResult(string *value) const
{
return GetDictionary()->GetValueString("result", value);
}
void SetResult(const string& value)
{
GetDictionary()->SetValueString("result", value);
}
bool GetError(string *value) const
{
return GetDictionary()->GetValueString("error", value);
}
void SetError(const string& value)
{
GetDictionary()->SetValueString("error", value);
}
bool GetID(string *value) const
{
return GetDictionary()->GetValueString("id", value);
}
void SetID(const string& value)
{
GetDictionary()->SetValueString("id", value);
}
};
}
#endif /* JSONRPCRESPONSE_H */

23
jsonrpc/message.cpp Normal file
View File

@ -0,0 +1,23 @@
#include "i2-jsonrpc.h"
using namespace icinga;
Message::Message(void)
{
m_Dictionary = make_shared<Dictionary>();
}
Message::Message(const Dictionary::Ptr& dictionary)
{
m_Dictionary = dictionary;
}
Message::Message(const Message& message)
{
m_Dictionary = message.GetDictionary();
}
Dictionary::Ptr Message::GetDictionary(void) const
{
return m_Dictionary;
}

22
jsonrpc/message.h Normal file
View File

@ -0,0 +1,22 @@
#ifndef MESSAGE_H
#define MESSAGE_H
namespace icinga
{
class I2_JSONRPC_API Message
{
private:
Dictionary::Ptr m_Dictionary;
public:
Message(void);
Message(const Dictionary::Ptr& dictionary);
Message(const Message& message);
Dictionary::Ptr GetDictionary(void) const;
};
}
#endif /* MESSAGE_H */

View File

@ -3,19 +3,72 @@
using namespace icinga;
Dictionary::Ptr Netstring::GetDictionaryFromJson(cJSON *json)
{
Dictionary::Ptr dictionary = make_shared<Dictionary>();
for (cJSON *i = json->child; i != NULL; i = i->next) {
switch (i->type) {
case cJSON_Number:
dictionary->SetValueInteger(i->string, i->valueint);
break;
case cJSON_String:
dictionary->SetValueString(i->string, i->valuestring);
break;
case cJSON_Object:
dictionary->SetValueDictionary(i->string, GetDictionaryFromJson(i));
break;
default:
break;
}
}
return dictionary;
}
cJSON *Netstring::GetJsonFromDictionary(const Dictionary::Ptr& dictionary)
{
cJSON *json;
string valueString;
Dictionary::Ptr valueDictionary;
json = cJSON_CreateObject();
for (DictionaryIterator i = dictionary->Begin(); i != dictionary->End(); i++) {
switch (i->second.GetType()) {
case VariantInteger:
cJSON_AddNumberToObject(json, i->first.c_str(), i->second.GetInteger());
break;
case VariantString:
valueString = i->second.GetString();
cJSON_AddStringToObject(json, i->first.c_str(), valueString.c_str());
break;
case VariantObject:
valueDictionary = dynamic_pointer_cast<Dictionary>(i->second.GetObject());
if (valueDictionary.get() != NULL)
cJSON_AddItemToObject(json, i->first.c_str(), GetJsonFromDictionary(valueDictionary));
default:
break;
}
}
return json;
}
/* based on https://github.com/PeterScott/netstring-c/blob/master/netstring.c */
Message::Ptr Netstring::ReadMessageFromFIFO(FIFO::Ptr fifo)
bool Netstring::ReadMessageFromFIFO(FIFO::Ptr fifo, Message *message)
{
size_t buffer_length = fifo->GetSize();
char *buffer = (char *)fifo->GetReadBuffer();
/* minimum netstring length is 3 */
if (buffer_length < 3)
return NULL;
return false;
/* no leading zeros allowed */
if (buffer[0] == '0' && isdigit(buffer[1]))
throw exception(/*"Invalid netstring (leading zero)"*/);
throw InvalidArgumentException("Invalid netstring (leading zero)");
size_t len, i;
@ -23,22 +76,22 @@ Message::Ptr Netstring::ReadMessageFromFIFO(FIFO::Ptr fifo)
for (i = 0; i < buffer_length && isdigit(buffer[i]); i++) {
/* length specifier must have at most 9 characters */
if (i >= 9)
return NULL;
return false;
len = len * 10 + (buffer[i] - '0');
}
/* make sure the buffer is large enough */
if (i + len + 1 >= buffer_length)
return NULL;
return false;
/* check for the colon delimiter */
if (buffer[i++] != ':')
throw exception(/*"Invalid Netstring (missing :)"*/);
throw InvalidArgumentException("Invalid Netstring (missing :)");
/* check for the comma delimiter after the string */
if (buffer[i + len] != ',')
throw exception(/*"Invalid Netstring (missing ,)"*/);
throw InvalidArgumentException("Invalid Netstring (missing ,)");
/* nuke the comma delimiter */
buffer[i + len] = '\0';
@ -47,30 +100,34 @@ Message::Ptr Netstring::ReadMessageFromFIFO(FIFO::Ptr fifo)
if (object == NULL) {
/* restore the comma */
buffer[i + len] = ',';
throw exception(/*"Invalid JSON string"*/);
throw InvalidArgumentException("Invalid JSON string");
}
/* remove the data from the fifo */
fifo->Read(NULL, i + len + 1);
return make_shared<Message>(object);
*message = Message(GetDictionaryFromJson(object));
cJSON_Delete(object);
return true;
}
void Netstring::WriteMessageToFIFO(FIFO::Ptr fifo, Message::Ptr message)
void Netstring::WriteMessageToFIFO(FIFO::Ptr fifo, const Message& message)
{
char *json;
shared_ptr<cJSON> object = message->GetJson();
cJSON *object = GetJsonFromDictionary(message.GetDictionary());
size_t len;
#ifdef _DEBUG
json = cJSON_Print(object.get());
json = cJSON_Print(object);
#else /* _DEBUG */
json = cJSON_PrintUnformatted(object.get());
json = cJSON_PrintUnformatted(object);
#endif /* _DEBUG */
cJSON_Delete(object);
len = strlen(json);
char strLength[50];
sprintf(strLength, "%lu", (unsigned long)len);
sprintf(strLength, "%lu:", (unsigned long)len);
fifo->Write(strLength, strlen(strLength));
fifo->Write(json, len);

View File

@ -10,12 +10,15 @@ private:
size_t m_Length;
void *m_Data;
static Dictionary::Ptr Netstring::GetDictionaryFromJson(cJSON *json);
static cJSON *GetJsonFromDictionary(const Dictionary::Ptr& dictionary);
public:
typedef shared_ptr<Netstring> Ptr;
typedef weak_ptr<Netstring> WeakPtr;
static Message::Ptr ReadMessageFromFIFO(FIFO::Ptr fifo);
static void WriteMessageToFIFO(FIFO::Ptr fifo, Message::Ptr message);
static bool ReadMessageFromFIFO(FIFO::Ptr fifo, Message *message);
static void WriteMessageToFIFO(FIFO::Ptr fifo, const Message& message);
};
}

View File

@ -1,177 +0,0 @@
#include <cstdlib>
#include <cctype>
#include <iostream>
#include <algorithm>
#include <fstream>
#include <string>
using namespace std;
void trim(string& str, const char *whitespace = "\r\n\t ")
{
string::size_type pos;
pos = str.find_first_not_of(whitespace);
if (pos != string::npos)
str.erase(0, pos);
pos = str.find_last_not_of(whitespace);
if (pos != string::npos)
str.erase(pos + 1);
}
int main(int argc, char **argv)
{
if (argc < 2) {
cerr << "Syntax: " << argv[0] << " <file.message>" << endl;
return EXIT_FAILURE;
}
char *pos;
pos = strrchr(argv[1], '.');
if (pos == NULL || strcmp(pos, ".message") != 0) {
cerr << "Input filename must have the '.message' extension." << endl;
return EXIT_FAILURE;
}
char *headername, *implname;
headername = strdup(argv[1]);
strcpy(&(headername[pos - argv[1]]), ".h");
implname = strdup(argv[1]);
strcpy(&(implname[pos - argv[1]]), ".cpp");
fstream inputfp, headerfp, implfp;
inputfp.open(argv[1], fstream::in);
headerfp.open(headername, fstream::out | fstream::trunc);
implfp.open(implname, fstream::out | fstream::trunc);
string line;
string klass, klassupper, base;
bool hasclass = false;
while (true) {
getline(inputfp, line);
if (inputfp.fail())
break;
if (!hasclass) {
string::size_type index = line.find(':');
if (index == string::npos) {
cerr << "Must specify class and base name." << endl;
return EXIT_FAILURE;
}
klass = line.substr(0, index);
trim(klass);
klassupper = klass;
transform(klassupper.begin(), klassupper.end(), klassupper.begin(), toupper);
base = line.substr(index + 1);
trim(base);
cout << "Class: '" << klass << "' (inherits from: '" << base << "')" << endl;
headerfp << "#ifndef " << klassupper << "_H" << endl
<< "#define " << klassupper << "_H" << endl
<< endl
<< "namespace icinga" << endl
<< "{" << endl
<< endl
<< "class " << klass << " : public " << base << endl
<< "{" << endl
<< endl
<< "public:" << endl
<< "\ttypedef shared_ptr<" << klass << "> Ptr;" << endl
<< "\ttypedef weak_ptr<" << klass << "> WeakPtr;" << endl
<< endl
<< "\t" << klass << "(void) : " << base << "() { }" << endl
<< "\t" << klass << "(const Message::Ptr& message) : " << base << "(message) { }" << endl
<< endl;
implfp << "#include \"i2-jsonrpc.h\"" << endl
<< "#include \"" << headername << "\"" << endl
<< endl
<< "using namespace icinga;" << endl
<< endl;
hasclass = true;
} else {
string::size_type index = line.find(':');
if (index == string::npos) {
cerr << "Must specify type and property name." << endl;
return EXIT_FAILURE;
}
string prop = line.substr(0, index);
trim(prop);
string type = line.substr(index + 1);
trim(type);
string typeaccessor = type;
typeaccessor[0] = toupper(typeaccessor[0]);
string rawtype = type;
/* assume it's a reference type if we don't know the type */
if (type != "int" && type != "string") {
type = type + "::Ptr";
typeaccessor = "Message";
}
cout << "Property: '" << prop << "' (Type: '" << type << "')" << endl;
headerfp << endl
<< "\tbool Get" << prop << "(" << type << " *value);" << endl
<< "\tvoid Set" << prop << "(const " << type << "& value);" << endl;
implfp << "bool " << klass << "::Get" << prop << "(" << type << " *value)" << endl
<< "{" << endl;
if (typeaccessor == "Message") {
implfp << "\tMessage::Ptr message;" << endl
<< endl
<< "\tif (!GetProperty" << typeaccessor << "(\"" << prop << "\", &message))" << endl
<< "\treturn false;" << endl
<< endl
<< "\t*value = message->Cast<" + rawtype + ">();" << endl
<< "return true;" << endl
<< endl;
} else {
implfp << "\treturn GetProperty" << typeaccessor << "(\"" << prop << "\", value);" << endl;
}
implfp << "}" << endl
<< endl;
implfp << "void " << klass << "::Set" << prop << "(const " << type << "& value)" << endl
<< "{" << endl
<< "\tSetProperty" << typeaccessor << "(\"" << prop << "\", value);" << endl
<< "}" << endl
<< endl;
}
}
headerfp << endl
<< "};" << endl
<< endl
<< "}" << endl
<< endl
<< "#endif /* " << klassupper << "_H */" << endl;
inputfp.close();
headerfp.close();
implfp.close();
free(headername);
free(implname);
return EXIT_SUCCESS;
}

View File

@ -1,82 +0,0 @@
<?xml version="1.0" encoding="utf-8"?>
<Project DefaultTargets="Build" ToolsVersion="4.0" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
<ItemGroup Label="ProjectConfigurations">
<ProjectConfiguration Include="Debug|Win32">
<Configuration>Debug</Configuration>
<Platform>Win32</Platform>
</ProjectConfiguration>
<ProjectConfiguration Include="Release|Win32">
<Configuration>Release</Configuration>
<Platform>Win32</Platform>
</ProjectConfiguration>
</ItemGroup>
<PropertyGroup Label="Globals">
<ProjectGuid>{4F00EE82-B829-4872-B8F0-C1A8D86C94B4}</ProjectGuid>
<Keyword>Win32Proj</Keyword>
<RootNamespace>msgc</RootNamespace>
</PropertyGroup>
<Import Project="$(VCTargetsPath)\Microsoft.Cpp.Default.props" />
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'" Label="Configuration">
<ConfigurationType>Application</ConfigurationType>
<UseDebugLibraries>true</UseDebugLibraries>
<CharacterSet>Unicode</CharacterSet>
</PropertyGroup>
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Release|Win32'" Label="Configuration">
<ConfigurationType>Application</ConfigurationType>
<UseDebugLibraries>false</UseDebugLibraries>
<WholeProgramOptimization>true</WholeProgramOptimization>
<CharacterSet>Unicode</CharacterSet>
</PropertyGroup>
<Import Project="$(VCTargetsPath)\Microsoft.Cpp.props" />
<ImportGroup Label="ExtensionSettings">
</ImportGroup>
<ImportGroup Label="PropertySheets" Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">
<Import Project="$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props" Condition="exists('$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props')" Label="LocalAppDataPlatform" />
</ImportGroup>
<ImportGroup Label="PropertySheets" Condition="'$(Configuration)|$(Platform)'=='Release|Win32'">
<Import Project="$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props" Condition="exists('$(UserRootDir)\Microsoft.Cpp.$(Platform).user.props')" Label="LocalAppDataPlatform" />
</ImportGroup>
<PropertyGroup Label="UserMacros" />
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">
<LinkIncremental>true</LinkIncremental>
</PropertyGroup>
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Release|Win32'">
<LinkIncremental>false</LinkIncremental>
</PropertyGroup>
<ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">
<ClCompile>
<PrecompiledHeader>
</PrecompiledHeader>
<WarningLevel>Level3</WarningLevel>
<Optimization>Disabled</Optimization>
<PreprocessorDefinitions>WIN32;_DEBUG;_CONSOLE;%(PreprocessorDefinitions)</PreprocessorDefinitions>
</ClCompile>
<Link>
<SubSystem>Console</SubSystem>
<GenerateDebugInformation>true</GenerateDebugInformation>
</Link>
</ItemDefinitionGroup>
<ItemDefinitionGroup Condition="'$(Configuration)|$(Platform)'=='Release|Win32'">
<ClCompile>
<WarningLevel>Level3</WarningLevel>
<PrecompiledHeader>
</PrecompiledHeader>
<Optimization>MaxSpeed</Optimization>
<FunctionLevelLinking>true</FunctionLevelLinking>
<IntrinsicFunctions>true</IntrinsicFunctions>
<PreprocessorDefinitions>WIN32;NDEBUG;_CONSOLE;%(PreprocessorDefinitions)</PreprocessorDefinitions>
</ClCompile>
<Link>
<SubSystem>Console</SubSystem>
<GenerateDebugInformation>true</GenerateDebugInformation>
<EnableCOMDATFolding>true</EnableCOMDATFolding>
<OptimizeReferences>true</OptimizeReferences>
</Link>
</ItemDefinitionGroup>
<ItemGroup>
<ClCompile Include="msgc.cpp" />
</ItemGroup>
<Import Project="$(VCTargetsPath)\Microsoft.Cpp.targets" />
<ImportGroup Label="ExtensionTargets">
</ImportGroup>
</Project>