Implemented support for binding methods to the ConnectionManager class

This commit is contained in:
Gunnar Beutner 2012-03-29 20:03:29 +02:00
parent 89ddfdd7e7
commit 1ebda64e75
11 changed files with 133 additions and 91 deletions

View File

@ -42,12 +42,12 @@ void Application::RunEventLoop(void)
int fd = socket->GetFD(); int fd = socket->GetFD();
if (socket->WantsToRead())
FD_SET(fd, &readfds);
if (socket->WantsToWrite()) if (socket->WantsToWrite())
FD_SET(fd, &writefds); FD_SET(fd, &writefds);
if (socket->WantsToRead())
FD_SET(fd, &readfds);
FD_SET(fd, &exceptfds); FD_SET(fd, &exceptfds);
if (fd > nfds) if (fd > nfds)
@ -96,12 +96,12 @@ void Application::RunEventLoop(void)
int fd = socket->GetFD(); int fd = socket->GetFD();
if (FD_ISSET(fd, &readfds))
socket->OnReadable(ea);
if (FD_ISSET(fd, &writefds)) if (FD_ISSET(fd, &writefds))
socket->OnWritable(ea); socket->OnWritable(ea);
if (FD_ISSET(fd, &readfds))
socket->OnReadable(ea);
if (FD_ISSET(fd, &exceptfds)) if (FD_ISSET(fd, &exceptfds))
socket->OnException(ea); socket->OnException(ea);
} }

View File

@ -75,9 +75,10 @@ size_t FIFO::Read(void *buffer, size_t count)
return count; return count;
} }
void *FIFO::GetWriteBuffer(size_t count) void *FIFO::GetWriteBuffer(size_t *count)
{ {
ResizeBuffer(m_Offset + m_DataSize + count); ResizeBuffer(m_Offset + m_DataSize + *count);
*count = m_AllocSize - m_Offset - m_DataSize;
return m_Buffer + m_Offset + m_DataSize; return m_Buffer + m_Offset + m_DataSize;
} }
@ -85,7 +86,8 @@ void *FIFO::GetWriteBuffer(size_t count)
size_t FIFO::Write(const void *buffer, size_t count) size_t FIFO::Write(const void *buffer, size_t count)
{ {
if (buffer != NULL) { if (buffer != NULL) {
void *target_buffer = GetWriteBuffer(count); size_t bufferSize = count;
void *target_buffer = GetWriteBuffer(&bufferSize);
memcpy(target_buffer, buffer, count); memcpy(target_buffer, buffer, count);
} }

View File

@ -27,7 +27,7 @@ public:
size_t GetSize(void) const; size_t GetSize(void) const;
const void *GetReadBuffer(void) const; const void *GetReadBuffer(void) const;
void *GetWriteBuffer(size_t count); void *GetWriteBuffer(size_t *count);
size_t Read(void *buffer, size_t count); size_t Read(void *buffer, size_t count);
size_t Write(const void *buffer, size_t count); size_t Write(const void *buffer, size_t count);

View File

@ -31,35 +31,26 @@ FIFO::RefType TCPClient::GetRecvQueue(void)
int TCPClient::ReadableEventHandler(EventArgs::RefType ea) int TCPClient::ReadableEventHandler(EventArgs::RefType ea)
{ {
int read_total, rc; int rc;
read_total = 0; size_t bufferSize = FIFO::BlockSize / 2;
char *buffer = (char *)m_RecvQueue->GetWriteBuffer(&bufferSize);
while (true) { rc = recv(GetFD(), buffer, bufferSize, 0);
static const size_t BufferSize = FIFO::BlockSize / 2;
char *buffer = (char *)m_RecvQueue->GetWriteBuffer(BufferSize);
rc = recv(GetFD(), buffer, BufferSize, 0);
#ifdef _WIN32 #ifdef _WIN32
if (rc < 0 && WSAGetLastError() == WSAEWOULDBLOCK) if (rc < 0 && WSAGetLastError() == WSAEWOULDBLOCK)
#else /* _WIN32 */ #else /* _WIN32 */
if (rc < 0 && errno == EAGAIN) if (rc < 0 && errno == EAGAIN)
#endif /* _WIN32 */ #endif /* _WIN32 */
break; return 0;
if (rc <= 0) { if (rc <= 0) {
Close(); Close();
return 0; return 0;
}
m_RecvQueue->Write(NULL, rc);
read_total += rc;
/* make sure we don't starve other sockets */
if (read_total > 128 * 1024)
break;
} }
m_RecvQueue->Write(NULL, rc);
EventArgs::RefType dea = new_object<EventArgs>(); EventArgs::RefType dea = new_object<EventArgs>();
dea->Source = shared_from_this(); dea->Source = shared_from_this();
OnDataAvailable(dea); OnDataAvailable(dea);

View File

@ -5,6 +5,7 @@
#include <unistd.h> #include <unistd.h>
#include <sys/types.h> #include <sys/types.h>
#include <sys/socket.h> #include <sys/socket.h>
#include <sys/ioctl.h>
#include <netinet/in.h> #include <netinet/in.h>
#include <arpa/inet.h> #include <arpa/inet.h>

View File

@ -1,6 +1,8 @@
#include "i2-jsonrpc.h" #include "i2-jsonrpc.h"
using namespace icinga; using namespace icinga;
using std::map;
using std::function;
void ConnectionManager::BindServer(JsonRpcServer::RefType server) void ConnectionManager::BindServer(JsonRpcServer::RefType server)
{ {
@ -43,7 +45,41 @@ int ConnectionManager::CloseClientHandler(EventArgs::RefType ea)
int ConnectionManager::NewMessageHandler(NewMessageEventArgs::RefType nmea) int ConnectionManager::NewMessageHandler(NewMessageEventArgs::RefType nmea)
{ {
OnNewMessage(nmea); JsonRpcMessage::RefType request = nmea->Message;
JsonRpcClient::RefType client = static_pointer_cast<JsonRpcClient>(nmea->Source);
map<string, event<NewMessageEventArgs::RefType> >::iterator i;
i = m_Methods.find(request->GetMethod());
if (i == m_Methods.end()) {
JsonRpcMessage::RefType response = new_object<JsonRpcMessage>();
response->SetVersion("2.0");
response->SetError("Unknown method.");
response->SetID(request->GetID());
Netstring::WriteJSONToFIFO(client->GetSendQueue(), response->GetJSON());
return 0;
}
i->second(nmea);
return 0; return 0;
} }
void ConnectionManager::RegisterMethod(string method, function<int (NewMessageEventArgs::RefType)> callback)
{
map<string, event<NewMessageEventArgs::RefType> >::iterator i;
i = m_Methods.find(method);
if (i == m_Methods.end()) {
m_Methods[method] = event<NewMessageEventArgs::RefType>();
i = m_Methods.find(method);
}
i->second.bind(callback);
}
void ConnectionManager::UnregisterMethod(string method, function<int (NewMessageEventArgs::RefType)> function)
{
// TODO: implement
}

View File

@ -4,10 +4,13 @@
namespace icinga namespace icinga
{ {
using std::map;
class ConnectionManager : public Object class ConnectionManager : public Object
{ {
list<JsonRpcServer::RefType> m_Servers; list<JsonRpcServer::RefType> m_Servers;
list<JsonRpcClient::RefType> m_Clients; list<JsonRpcClient::RefType> m_Clients;
map< string, event<NewMessageEventArgs::RefType> > m_Methods;
int NewClientHandler(NewClientEventArgs::RefType ncea); int NewClientHandler(NewClientEventArgs::RefType ncea);
int CloseClientHandler(EventArgs::RefType ea); int CloseClientHandler(EventArgs::RefType ea);
@ -23,7 +26,8 @@ public:
void BindClient(JsonRpcClient::RefType client); void BindClient(JsonRpcClient::RefType client);
void UnbindClient(JsonRpcClient::RefType client); void UnbindClient(JsonRpcClient::RefType client);
event<NewMessageEventArgs::RefType> OnNewMessage; void RegisterMethod(string method, function<int (NewMessageEventArgs::RefType)> function);
void UnregisterMethod(string method, function<int (NewMessageEventArgs::RefType)> function);
}; };
} }

View File

@ -1,6 +1,7 @@
#ifndef I2_JSONRPC_H #ifndef I2_JSONRPC_H
#define I2_JSONRPC_H #define I2_JSONRPC_H
#include <map>
#include <i2-base.h> #include <i2-base.h>
#include "cJSON.h" #include "cJSON.h"

View File

@ -23,27 +23,47 @@ cJSON *JsonRpcMessage::GetJSON(void)
return m_JSON; return m_JSON;
} }
void JsonRpcMessage::SetFieldString(const char *field, const string& value) void JsonRpcMessage::InitJson(void)
{ {
if (m_JSON == NULL) if (m_JSON == NULL)
m_JSON = cJSON_CreateObject(); m_JSON = cJSON_CreateObject();
}
void JsonRpcMessage::SetFieldObject(const char *field, cJSON *object)
{
if (m_JSON == NULL && object == NULL)
return;
InitJson();
cJSON *object = cJSON_CreateString(value.c_str());
cJSON_DeleteItemFromObject(m_JSON, field); cJSON_DeleteItemFromObject(m_JSON, field);
cJSON_AddItemToObject(m_JSON, field, object);
if (object != NULL)
cJSON_AddItemToObject(m_JSON, field, object);
}
cJSON *JsonRpcMessage::GetFieldObject(const char *field)
{
if (m_JSON == NULL)
return NULL;
return cJSON_GetObjectItem(m_JSON, field);
}
void JsonRpcMessage::SetFieldString(const char *field, const string& value)
{
cJSON *object = cJSON_CreateString(value.c_str());
SetFieldObject(field, object);
} }
string JsonRpcMessage::GetFieldString(const char *field) string JsonRpcMessage::GetFieldString(const char *field)
{ {
if (m_JSON == NULL) cJSON *object = GetFieldObject(field);
m_JSON = cJSON_CreateObject();
cJSON *idObject = cJSON_GetObjectItem(m_JSON, field); if (object == NULL || object->type != cJSON_String)
if (idObject == NULL || idObject->type != cJSON_String)
return string(); return string();
return string(idObject->valuestring); return string(object->valuestring);
} }
void JsonRpcMessage::SetVersion(const string& version) void JsonRpcMessage::SetVersion(const string& version)
@ -76,24 +96,38 @@ string JsonRpcMessage::GetMethod(void)
return GetFieldString("method"); return GetFieldString("method");
} }
void JsonRpcMessage::SetParams(const string& params) void JsonRpcMessage::ClearParams(void)
{ {
SetFieldString("params", params); SetFieldObject("params", NULL);
} }
string JsonRpcMessage::GetParams(void) cJSON *JsonRpcMessage::GetParams(void)
{ {
return GetFieldString("params"); cJSON *object = GetFieldObject("params");
if (object == NULL) {
object = cJSON_CreateObject();
cJSON_AddItemToObject(m_JSON, "params", object);
}
return object;
} }
void JsonRpcMessage::SetResult(const string& result) void JsonRpcMessage::ClearResult(void)
{ {
SetFieldString("result", result); SetFieldObject("result", NULL);
} }
string JsonRpcMessage::GetResult(void) cJSON *JsonRpcMessage::GetResult(void)
{ {
return GetFieldString("result"); cJSON *object = GetFieldObject("result");
if (object == NULL) {
object = cJSON_CreateObject();
cJSON_AddItemToObject(m_JSON, "result", object);
}
return object;
} }
void JsonRpcMessage::SetError(const string& error) void JsonRpcMessage::SetError(const string& error)

View File

@ -9,9 +9,15 @@ class JsonRpcMessage : public Object
private: private:
cJSON *m_JSON; cJSON *m_JSON;
void InitJson(void);
void SetFieldString(const char *field, const string& value); void SetFieldString(const char *field, const string& value);
string GetFieldString(const char *field); string GetFieldString(const char *field);
void ClearField(const char *field);
void SetFieldObject(const char *field, cJSON *object);
cJSON *GetFieldObject(const char *field);
public: public:
typedef shared_ptr<JsonRpcMessage> RefType; typedef shared_ptr<JsonRpcMessage> RefType;
typedef weak_ptr<JsonRpcMessage> WeakRefType; typedef weak_ptr<JsonRpcMessage> WeakRefType;
@ -31,11 +37,11 @@ public:
void SetMethod(const string& method); void SetMethod(const string& method);
string GetMethod(void); string GetMethod(void);
void SetParams(const string& params); void ClearParams(void);
string GetParams(void); cJSON *GetParams(void);
void SetResult(const string& result); void ClearResult();
string GetResult(void); cJSON *GetResult(void);
void SetError(const string& error); void SetError(const string& error);
string GetError(void); string GetError(void);

View File

@ -10,37 +10,19 @@ using std::endl;
class MyApplication : public Application class MyApplication : public Application
{ {
private:
int m_Foo;
public: public:
typedef shared_ptr<MyApplication> RefType; typedef shared_ptr<MyApplication> RefType;
typedef weak_ptr<MyApplication> WeakRefType; typedef weak_ptr<MyApplication> WeakRefType;
MyApplication(void)
{
m_Foo = 0;
}
int Main(const vector<string>& args) int Main(const vector<string>& args)
{ {
/*FIFO::RefType f = new_object<FIFO>();
f->Write("12:Hello World!,", 16);
Netstring::RefType ns = new_object<Netstring>();
ns->ReadFromFIFO(f);
Timer::RefType t = new_object<Timer>();
t->SetInterval(2);
t->OnTimerExpired.bind(bind_weak(&MyApplication::TimerCallback, shared_from_this()));
t->Start();*/
JsonRpcServer::RefType ts = new_object<JsonRpcServer>(); JsonRpcServer::RefType ts = new_object<JsonRpcServer>();
ts->MakeSocket(); ts->MakeSocket();
ts->Bind(7777); ts->Bind(7777);
ts->Listen(); ts->Listen();
ConnectionManager::RefType cm = new_object<ConnectionManager>(); ConnectionManager::RefType cm = new_object<ConnectionManager>();
cm->OnNewMessage.bind(bind_weak(&MyApplication::MessageHandler, shared_from_this())); cm->RegisterMethod("HelloWorld", bind_weak(&MyApplication::HelloWorld, shared_from_this()));
cm->BindServer(ts); cm->BindServer(ts);
RunEventLoop(); RunEventLoop();
@ -48,7 +30,7 @@ public:
return 0; return 0;
} }
int MessageHandler(NewMessageEventArgs::RefType nea) int HelloWorld(NewMessageEventArgs::RefType nea)
{ {
JsonRpcClient::RefType client = static_pointer_cast<JsonRpcClient>(nea->Source); JsonRpcClient::RefType client = static_pointer_cast<JsonRpcClient>(nea->Source);
JsonRpcMessage::RefType msg = nea->Message; JsonRpcMessage::RefType msg = nea->Message;
@ -56,27 +38,12 @@ public:
JsonRpcMessage::RefType response = new_object<JsonRpcMessage>(); JsonRpcMessage::RefType response = new_object<JsonRpcMessage>();
response->SetVersion("2.0"); response->SetVersion("2.0");
response->SetID(msg->GetID()); response->SetID(msg->GetID());
response->SetResult("moo"); cJSON *result = response->GetResult();
cJSON_AddStringToObject(result, "greeting", "Hello World!");
client->SendMessage(response); client->SendMessage(response);
return 0; return 0;
} }
int TimerCallback(TimerEventArgs::RefType tda)
{
Timer::RefType t = static_pointer_cast<Timer>(tda->Source);
m_Foo++;
printf("Hello World!\n");
if (m_Foo >= 5) {
t->Stop();
Shutdown();
}
return 0;
}
}; };
SET_START_CLASS(MyApplication); SET_START_CLASS(MyApplication);