Refactor the socket subsystem.

This commit is contained in:
Gunnar Beutner 2013-04-04 16:08:02 +02:00
parent ca4157ea24
commit 09f395a7de
33 changed files with 622 additions and 1341 deletions

View File

@ -22,8 +22,6 @@ liblivestatus_la_SOURCES = \
commentstable.h \ commentstable.h \
component.cpp \ component.cpp \
component.h \ component.h \
connection.cpp \
connection.h \
contactgroupstable.cpp \ contactgroupstable.cpp \
contactgroupstable.h \ contactgroupstable.h \
contactstable.cpp \ contactstable.cpp \

View File

@ -21,6 +21,7 @@
#include "base/dynamictype.h" #include "base/dynamictype.h"
#include "base/logger_fwd.h" #include "base/logger_fwd.h"
#include "base/tcpsocket.h" #include "base/tcpsocket.h"
#include "base/networkstream.h"
#include "base/application.h" #include "base/application.h"
#include <boost/smart_ptr/make_shared.hpp> #include <boost/smart_ptr/make_shared.hpp>
@ -48,10 +49,10 @@ void LivestatusComponent::Start(void)
socket->Bind("6558", AF_INET); socket->Bind("6558", AF_INET);
//#endif /* _WIN32 */ //#endif /* _WIN32 */
socket->OnNewClient.connect(boost::bind(&LivestatusComponent::NewClientHandler, this, _2));
socket->Listen();
socket->Start();
m_Listener = socket; m_Listener = socket;
boost::thread thread(boost::bind(&LivestatusComponent::ServerThreadProc, this, socket));
thread.detach();
} }
String LivestatusComponent::GetSocketPath(void) const String LivestatusComponent::GetSocketPath(void) const
@ -63,21 +64,40 @@ String LivestatusComponent::GetSocketPath(void) const
return socketPath; return socketPath;
} }
void LivestatusComponent::NewClientHandler(const Socket::Ptr& client) void LivestatusComponent::ServerThreadProc(const Socket::Ptr& server)
{ {
Log(LogInformation, "livestatus", "Client connected"); server->Listen();
LivestatusConnection::Ptr lconnection = boost::make_shared<LivestatusConnection>(client); for (;;) {
lconnection->OnClosed.connect(boost::bind(&LivestatusComponent::ClientClosedHandler, this, _1)); Socket::Ptr client = server->Accept();
m_Connections.insert(lconnection); Log(LogInformation, "livestatus", "Client connected");
client->Start();
boost::thread thread(boost::bind(&LivestatusComponent::ClientThreadProc, this, client));
thread.detach();
}
} }
void LivestatusComponent::ClientClosedHandler(const Connection::Ptr& connection) void LivestatusComponent::ClientThreadProc(const Socket::Ptr& client)
{ {
LivestatusConnection::Ptr lconnection = static_pointer_cast<LivestatusConnection>(connection); Stream::Ptr stream = boost::make_shared<NetworkStream>(client);
Log(LogInformation, "livestatus", "Client disconnected"); for (;;) {
m_Connections.erase(lconnection); String line;
bool read_line = false;
std::vector<String> lines;
while (stream->ReadLine(&line)) {
read_line = true;
if (line.GetLength() > 0)
lines.push_back(line);
else
break;
}
Query::Ptr query = boost::make_shared<Query>(lines);
query->Execute(stream);
}
} }

View File

@ -20,7 +20,7 @@
#ifndef LIVESTATUSCOMPONENT_H #ifndef LIVESTATUSCOMPONENT_H
#define LIVESTATUSCOMPONENT_H #define LIVESTATUSCOMPONENT_H
#include "livestatus/connection.h" #include "livestatus/query.h"
#include "base/dynamicobject.h" #include "base/dynamicobject.h"
#include "base/socket.h" #include "base/socket.h"
@ -45,10 +45,9 @@ private:
Attribute<String> m_SocketPath; Attribute<String> m_SocketPath;
Socket::Ptr m_Listener; Socket::Ptr m_Listener;
std::set<LivestatusConnection::Ptr> m_Connections;
void NewClientHandler(const Socket::Ptr& client); void ServerThreadProc(const Socket::Ptr& server);
void ClientClosedHandler(const Connection::Ptr& connection); void ClientThreadProc(const Socket::Ptr& client);
}; };
} }

View File

@ -1,57 +0,0 @@
/******************************************************************************
* Icinga 2 *
* Copyright (C) 2012 Icinga Development Team (http://www.icinga.org/) *
* *
* This program is free software; you can redistribute it and/or *
* modify it under the terms of the GNU General Public License *
* as published by the Free Software Foundation; either version 2 *
* of the License, or (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program; if not, write to the Free Software Foundation *
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. *
******************************************************************************/
#include "livestatus/connection.h"
#include "livestatus/query.h"
#include <boost/smart_ptr/make_shared.hpp>
using namespace icinga;
using namespace livestatus;
LivestatusConnection::LivestatusConnection(const Stream::Ptr& stream)
: Connection(stream)
{ }
void LivestatusConnection::ProcessData(void)
{
String line;
bool read_line = false;
while (GetStream()->ReadLine(&line)) {
read_line = true;
if (line.GetLength() > 0)
m_Lines.push_back(line);
else
break;
}
/* Return if we didn't at least read one line. */
if (!read_line)
return;
/* Return if we haven't found the end of the query. */
if (line.GetLength() > 0 && !GetStream()->IsReadEOF())
return;
Query::Ptr query = boost::make_shared<Query>(m_Lines);
m_Lines.clear();
query->Execute(GetStream());
}

View File

@ -14,8 +14,8 @@ libbase_la_SOURCES = \
array.h \ array.h \
attribute.cpp \ attribute.cpp \
attribute.h \ attribute.h \
connection.cpp \ bufferedstream.cpp \
connection.h \ bufferedstream.h \
convert.cpp \ convert.cpp \
convert.h \ convert.h \
dictionary.cpp \ dictionary.cpp \
@ -34,6 +34,8 @@ libbase_la_SOURCES = \
logger_fwd.h \ logger_fwd.h \
netstring.cpp \ netstring.cpp \
netstring.h \ netstring.h \
networkstream.cpp \
networkstream.h \
object.cpp \ object.cpp \
object.h \ object.h \
objectlock.cpp \ objectlock.cpp \

133
lib/base/bufferedstream.cpp Normal file
View File

@ -0,0 +1,133 @@
/******************************************************************************
* Icinga 2 *
* Copyright (C) 2012 Icinga Development Team (http://www.icinga.org/) *
* *
* This program is free software; you can redistribute it and/or *
* modify it under the terms of the GNU General Public License *
* as published by the Free Software Foundation; either version 2 *
* of the License, or (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program; if not, write to the Free Software Foundation *
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. *
******************************************************************************/
#include "base/bufferedstream.h"
#include "base/objectlock.h"
#include "base/utility.h"
#include "base/logger_fwd.h"
#include <boost/smart_ptr/make_shared.hpp>
#include <sstream>
using namespace icinga;
BufferedStream::BufferedStream(const Stream::Ptr& innerStream)
: m_InnerStream(innerStream), m_RecvQ(boost::make_shared<FIFO>()), m_SendQ(boost::make_shared<FIFO>())
{
boost::thread readThread(boost::bind(&BufferedStream::ReadThreadProc, this));
readThread.detach();
boost::thread writeThread(boost::bind(&BufferedStream::WriteThreadProc, this));
writeThread.detach();
}
void BufferedStream::ReadThreadProc(void)
{
char buffer[512];
try {
for (;;) {
size_t rc = m_InnerStream->Read(buffer, sizeof(buffer));
if (rc == 0)
break;
boost::mutex::scoped_lock lock(m_Mutex);
m_RecvQ->Write(buffer, rc);
m_ReadCV.notify_all();
}
} catch (const std::exception& ex) {
std::ostringstream msgbuf;
msgbuf << "Error for buffered stream (Read): " << boost::diagnostic_information(ex);
Log(LogWarning, "base", msgbuf.str());
Close();
}
}
void BufferedStream::WriteThreadProc(void)
{
char buffer[512];
try {
for (;;) {
size_t rc;
{
boost::mutex::scoped_lock lock(m_Mutex);
while (m_SendQ->GetAvailableBytes() == 0)
m_WriteCV.wait(lock);
rc = m_SendQ->Read(buffer, sizeof(buffer));
}
m_InnerStream->Write(buffer, rc);
}
} catch (const std::exception& ex) {
std::ostringstream msgbuf;
msgbuf << "Error for buffered stream (Write): " << boost::diagnostic_information(ex);
Log(LogWarning, "base", msgbuf.str());
Close();
}
}
void BufferedStream::Close(void)
{
m_InnerStream->Close();
}
/**
* Reads data from the stream.
*
* @param buffer The buffer where data should be stored. May be NULL if you're
* not actually interested in the data.
* @param count The number of bytes to read from the queue.
* @returns The number of bytes actually read.
*/
size_t BufferedStream::Read(void *buffer, size_t count)
{
boost::mutex::scoped_lock lock(m_Mutex);
return m_RecvQ->Read(buffer, count);
}
/**
* Writes data to the stream.
*
* @param buffer The data that is to be written.
* @param count The number of bytes to write.
* @returns The number of bytes written
*/
void BufferedStream::Write(const void *buffer, size_t count)
{
boost::mutex::scoped_lock lock(m_Mutex);
m_SendQ->Write(buffer, count);
m_WriteCV.notify_all();
}
void BufferedStream::WaitReadable(size_t count)
{
boost::mutex::scoped_lock lock(m_Mutex);
while (m_RecvQ->GetAvailableBytes() < count)
m_ReadCV.wait(lock);
}
void BufferedStream::WaitWritable(size_t count)
{ /* Nothing to do here. */ }

View File

@ -17,38 +17,53 @@
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. * * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. *
******************************************************************************/ ******************************************************************************/
#ifndef JSONRPCCONNECTION_H #ifndef BUFFEREDSTREAM_H
#define JSONRPCCONNECTION_H #define BUFFEREDSTREAM_H
#include "remoting/i2-remoting.h" #include "base/i2-base.h"
#include "remoting/messagepart.h" #include "base/stream.h"
#include "base/connection.h" #include "base/fifo.h"
#include <boost/signals2.hpp>
namespace icinga namespace icinga
{ {
/** /**
* A JSON-RPC connection. * A buffered stream.
* *
* @ingroup remoting * @ingroup base
*/ */
class I2_REMOTING_API JsonRpcConnection : public Connection class I2_BASE_API BufferedStream : public Stream
{ {
public: public:
typedef shared_ptr<JsonRpcConnection> Ptr; typedef shared_ptr<BufferedStream> Ptr;
typedef weak_ptr<JsonRpcConnection> WeakPtr; typedef weak_ptr<BufferedStream> WeakPtr;
explicit JsonRpcConnection(const Stream::Ptr& stream); BufferedStream(const Stream::Ptr& innerStream);
void SendMessage(const MessagePart& message); virtual size_t Read(void *buffer, size_t count);
virtual void Write(const void *buffer, size_t count);
boost::signals2::signal<void (const JsonRpcConnection::Ptr&, const MessagePart&)> OnNewMessage; virtual void Close(void);
protected: void WaitReadable(size_t count);
virtual void ProcessData(void); void WaitWritable(size_t count);
private:
Stream::Ptr m_InnerStream;
FIFO::Ptr m_RecvQ;
FIFO::Ptr m_SendQ;
boost::exception_ptr m_Exception;
boost::mutex m_Mutex;
boost::condition_variable m_ReadCV;
boost::condition_variable m_WriteCV;
void ReadThreadProc(void);
void WriteThreadProc(void);
}; };
} }
#endif /* JSONRPCCONNECTION_H */ #endif /* BUFFEREDSTREAM_H */

View File

@ -446,7 +446,6 @@ void DynamicObject::DumpObjects(const String& filename)
BOOST_THROW_EXCEPTION(std::runtime_error("Could not open '" + filename + "' file")); BOOST_THROW_EXCEPTION(std::runtime_error("Could not open '" + filename + "' file"));
StdioStream::Ptr sfp = boost::make_shared<StdioStream>(&fp, false); StdioStream::Ptr sfp = boost::make_shared<StdioStream>(&fp, false);
sfp->Start();
BOOST_FOREACH(const DynamicType::Ptr& type, DynamicType::GetTypes()) { BOOST_FOREACH(const DynamicType::Ptr& type, DynamicType::GetTypes()) {
BOOST_FOREACH(const DynamicObject::Ptr& object, type->GetObjects()) { BOOST_FOREACH(const DynamicObject::Ptr& object, type->GetObjects()) {
@ -503,7 +502,6 @@ void DynamicObject::RestoreObjects(const String& filename)
fp.open(filename.CStr(), std::ios_base::in); fp.open(filename.CStr(), std::ios_base::in);
StdioStream::Ptr sfp = boost::make_shared<StdioStream>(&fp, false); StdioStream::Ptr sfp = boost::make_shared<StdioStream>(&fp, false);
sfp->Start();
unsigned long restored = 0; unsigned long restored = 0;

View File

@ -37,13 +37,6 @@ FIFO::~FIFO(void)
free(m_Buffer); free(m_Buffer);
} }
void FIFO::Start(void)
{
SetConnected(true);
Stream::Start();
}
/** /**
* Resizes the FIFO's buffer so that it is at least newSize bytes long. * Resizes the FIFO's buffer so that it is at least newSize bytes long.
* *
@ -95,46 +88,16 @@ void FIFO::Optimize(void)
} }
/** /**
* Implements IOQueue::GetAvailableBytes(). * Implements IOQueue::Read.
*/ */
size_t FIFO::GetAvailableBytes(void) const size_t FIFO::Read(void *buffer, size_t count)
{ {
return m_DataSize;
}
/**
* Returns a pointer to the start of the read buffer.
*
* @returns Pointer to the read buffer.
*/
/*const void *FIFO::GetReadBuffer(void) const
{
return m_Buffer + m_Offset;
}*/
/**
* Implements IOQueue::Peek.
*/
size_t FIFO::Peek(void *buffer, size_t count)
{
ASSERT(IsConnected());
if (count > m_DataSize) if (count > m_DataSize)
count = m_DataSize; count = m_DataSize;
if (buffer != NULL) if (buffer != NULL)
memcpy(buffer, m_Buffer + m_Offset, count); memcpy(buffer, m_Buffer + m_Offset, count);
return count;
}
/**
* Implements IOQueue::Read.
*/
size_t FIFO::Read(void *buffer, size_t count)
{
count = Peek(buffer, count);
m_DataSize -= count; m_DataSize -= count;
m_Offset += count; m_Offset += count;
@ -148,9 +111,15 @@ size_t FIFO::Read(void *buffer, size_t count)
*/ */
void FIFO::Write(const void *buffer, size_t count) void FIFO::Write(const void *buffer, size_t count)
{ {
ASSERT(IsConnected());
ResizeBuffer(m_Offset + m_DataSize + count); ResizeBuffer(m_Offset + m_DataSize + count);
memcpy(m_Buffer + m_Offset + m_DataSize, buffer, count); memcpy(m_Buffer + m_Offset + m_DataSize, buffer, count);
m_DataSize += count; m_DataSize += count;
} }
void FIFO::Close(void)
{ }
size_t FIFO::GetAvailableBytes(void) const
{
return m_DataSize;
}

View File

@ -42,12 +42,11 @@ public:
FIFO(void); FIFO(void);
~FIFO(void); ~FIFO(void);
void Start(void); virtual size_t Read(void *buffer, size_t count);
virtual void Write(const void *buffer, size_t count);
virtual void Close(void);
size_t GetAvailableBytes(void) const; size_t GetAvailableBytes(void) const;
size_t Peek(void *buffer, size_t count);
size_t Read(void *buffer, size_t count);
void Write(const void *buffer, size_t count);
private: private:
char *m_Buffer; char *m_Buffer;

View File

@ -18,107 +18,117 @@
******************************************************************************/ ******************************************************************************/
#include "base/netstring.h" #include "base/netstring.h"
#include "base/utility.h"
#include <sstream> #include <sstream>
using namespace icinga; using namespace icinga;
/** /**
* Reads data from a stream in netString format. * Reads data from a stream in netstring format.
* *
* @param stream The stream to read from. * @param stream The stream to read from.
* @param[out] str The String that has been read from the IOQueue. * @param[out] str The String that has been read from the IOQueue.
* @returns true if a complete String was read from the IOQueue, false otherwise. * @returns true if a complete String was read from the IOQueue, false otherwise.
* @exception invalid_argument The input stream is invalid. * @exception invalid_argument The input stream is invalid.
* @see https://github.com/PeterScott/netString-c/blob/master/netString.c * @see https://github.com/PeterScott/netstring-c/blob/master/netstring.c
*/ */
bool NetString::ReadStringFromStream(const Stream::Ptr& stream, String *str) bool NetString::ReadStringFromStream(const Stream::Ptr& stream, String *str)
{ {
/* 16 bytes are enough for the header */ /* 16 bytes are enough for the header */
size_t peek_length, buffer_length = 16; const size_t header_length = 16;
char *buffer = static_cast<char *>(malloc(buffer_length)); size_t read_length;
char *header = static_cast<char *>(malloc(header_length));
if (buffer == NULL) if (header == NULL)
BOOST_THROW_EXCEPTION(std::bad_alloc()); BOOST_THROW_EXCEPTION(std::bad_alloc());
peek_length = stream->Peek(buffer, buffer_length); read_length = 0;
/* minimum netString length is 3 */ while (read_length < header_length) {
if (peek_length < 3) { /* Read one byte. */
free(buffer); int rc = stream->Read(header + read_length, 1);
return false;
if (rc == 0) {
if (read_length == 0)
return false;
BOOST_THROW_EXCEPTION(std::runtime_error("Read() failed."));
}
ASSERT(rc == 1);
read_length++;
if (header[read_length - 1] == ':') {
break;
} else if (header_length == read_length) {
free(header);
BOOST_THROW_EXCEPTION(std::invalid_argument("Invalid NetString (missing :)"));
}
}
/* minimum netstring length is 3 */
if (read_length < 3) {
free(header);
BOOST_THROW_EXCEPTION(std::invalid_argument("Invalid NetString (short header)"));
} }
/* no leading zeros allowed */ /* no leading zeros allowed */
if (buffer[0] == '0' && isdigit(buffer[1])) { if (header[0] == '0' && isdigit(header[1])) {
free(buffer); free(header);
BOOST_THROW_EXCEPTION(std::invalid_argument("Invalid netString (leading zero)")); BOOST_THROW_EXCEPTION(std::invalid_argument("Invalid NetString (leading zero)"));
} }
size_t len, i; size_t len, i;
len = 0; len = 0;
for (i = 0; i < peek_length && isdigit(buffer[i]); i++) { for (i = 0; i < read_length && isdigit(header[i]); i++) {
/* length specifier must have at most 9 characters */ /* length specifier must have at most 9 characters */
if (i >= 9) { if (i >= 9) {
free(buffer); free(header);
BOOST_THROW_EXCEPTION(std::invalid_argument("Length specifier must not exceed 9 characters")); BOOST_THROW_EXCEPTION(std::invalid_argument("Length specifier must not exceed 9 characters"));
} }
len = len * 10 + (buffer[i] - '0'); len = len * 10 + (header[i] - '0');
} }
free(header);
/* read the whole message */ /* read the whole message */
buffer_length = i + 1 + len + 1; size_t data_length = len + 1;
char *new_buffer = static_cast<char *>(realloc(buffer, buffer_length)); char *data = static_cast<char *>(malloc(data_length));
if (new_buffer == NULL) { if (data == NULL) {
free(buffer);
BOOST_THROW_EXCEPTION(std::bad_alloc()); BOOST_THROW_EXCEPTION(std::bad_alloc());
} }
buffer = new_buffer; size_t rc = stream->Read(data, data_length);
if (rc != data_length)
BOOST_THROW_EXCEPTION(std::runtime_error("Read() failed."));
peek_length = stream->Peek(buffer, buffer_length); if (data[len] != ',')
if (peek_length < buffer_length)
return false;
/* check for the colon delimiter */
if (buffer[i] != ':') {
free(buffer);
BOOST_THROW_EXCEPTION(std::invalid_argument("Invalid NetString (missing :)"));
}
/* check for the comma delimiter after the String */
if (buffer[i + 1 + len] != ',') {
free(buffer);
BOOST_THROW_EXCEPTION(std::invalid_argument("Invalid NetString (missing ,)")); BOOST_THROW_EXCEPTION(std::invalid_argument("Invalid NetString (missing ,)"));
}
*str = String(&data[0], &data[len]);
*str = String(&buffer[i + 1], &buffer[i + 1 + len]); free(data);
free(buffer);
/* remove the data from the stream */
stream->Read(NULL, peek_length);
return true; return true;
} }
/** /**
* Writes data into a stream using the netString format. * Writes data into a stream using the netstring format.
* *
* @param stream The stream. * @param stream The stream.
* @param str The String that is to be written. * @param str The String that is to be written.
*/ */
void NetString::WriteStringToStream(const Stream::Ptr& stream, const String& str) void NetString::WriteStringToStream(const Stream::Ptr& stream, const String& str)
{ {
std::ostringstream prefixbuf; std::ostringstream msgbuf;
prefixbuf << str.GetLength() << ":"; msgbuf << str.GetLength() << ":" << str << ",";
String prefix = prefixbuf.str(); String msg = msgbuf.str();
stream->Write(prefix.CStr(), prefix.GetLength()); stream->Write(msg.CStr(), msg.GetLength());
stream->Write(str.CStr(), str.GetLength());
stream->Write(",", 1);
} }

View File

@ -17,29 +17,45 @@
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. * * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. *
******************************************************************************/ ******************************************************************************/
#include "base/connection.h" #include "base/networkstream.h"
#include <boost/bind.hpp> #include "base/objectlock.h"
#include "base/utility.h"
#include <boost/algorithm/string/trim.hpp>
using namespace icinga; using namespace icinga;
Connection::Connection(const Stream::Ptr& stream) NetworkStream::NetworkStream(const Socket::Ptr& socket)
: m_Stream(stream) : m_Socket(socket)
{ }
void NetworkStream::Close(void)
{ {
m_Stream->OnDataAvailable.connect(boost::bind(&Connection::ProcessData, this)); m_Socket->Close();
m_Stream->OnClosed.connect(boost::bind(&Connection::ClosedHandler, this));
} }
Stream::Ptr Connection::GetStream(void) const /**
* Reads data from the stream.
*
* @param buffer The buffer where data should be stored. May be NULL if you're
* not actually interested in the data.
* @param count The number of bytes to read from the queue.
* @returns The number of bytes actually read.
*/
size_t NetworkStream::Read(void *buffer, size_t count)
{ {
return m_Stream; return m_Socket->Read(buffer, count);
} }
void Connection::ClosedHandler(void) /**
* Writes data to the stream.
*
* @param buffer The data that is to be written.
* @param count The number of bytes to write.
* @returns The number of bytes written
*/
void NetworkStream::Write(const void *buffer, size_t count)
{ {
OnClosed(GetSelf()); size_t rc = m_Socket->Write(buffer, count);
} if (rc < count)
BOOST_THROW_EXCEPTION(std::runtime_error("Short write for socket."));
void Connection::Close(void)
{
m_Stream->Close();
} }

View File

@ -17,39 +17,38 @@
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. * * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. *
******************************************************************************/ ******************************************************************************/
#ifndef CONNECTION_H #ifndef NETWORKSTREAM_H
#define CONNECTION_H #define NETWORKSTREAM_H
#include "base/i2-base.h" #include "base/i2-base.h"
#include "base/stream.h" #include "base/stream.h"
#include <boost/signals2.hpp> #include "base/socket.h"
namespace icinga namespace icinga
{ {
class I2_BASE_API Connection : public Object /**
* A network stream.
*
* @ingroup base
*/
class I2_BASE_API NetworkStream : public Stream
{ {
public: public:
typedef shared_ptr<Connection> Ptr; typedef shared_ptr<NetworkStream> Ptr;
typedef weak_ptr<Connection> WeakPtr; typedef weak_ptr<NetworkStream> WeakPtr;
explicit Connection(const Stream::Ptr& stream); NetworkStream(const Socket::Ptr& socket);
Stream::Ptr GetStream(void) const; virtual size_t Read(void *buffer, size_t count);
virtual void Write(const void *buffer, size_t count);
void Close(void); virtual void Close(void);
boost::signals2::signal<void (const Connection::Ptr&)> OnClosed;
protected:
virtual void ProcessData(void) = 0;
private: private:
Stream::Ptr m_Stream; Socket::Ptr m_Socket;
void ClosedHandler(void);
}; };
} }
#endif /* CONNECTION_H */ #endif /* NETWORKSTREAM_H */

View File

@ -34,11 +34,16 @@ using namespace icinga;
* Constructor for the Socket class. * Constructor for the Socket class.
*/ */
Socket::Socket(void) Socket::Socket(void)
: m_FD(INVALID_SOCKET), m_Connected(false), m_Listening(false), : m_FD(INVALID_SOCKET)
m_SendQueue(boost::make_shared<FIFO>()), m_RecvQueue(boost::make_shared<FIFO>()) { }
/**
* Constructor for the Socket class.
*/
Socket::Socket(SOCKET fd)
: m_FD(INVALID_SOCKET)
{ {
m_SendQueue->Start(); SetFD(fd);
m_RecvQueue->Start();
} }
/** /**
@ -46,28 +51,7 @@ Socket::Socket(void)
*/ */
Socket::~Socket(void) Socket::~Socket(void)
{ {
m_SendQueue->Close(); Close();
m_RecvQueue->Close();
CloseInternal();
}
/**
* Starts I/O processing for this socket.
*/
void Socket::Start(void)
{
ASSERT(!m_ReadThread.joinable() && !m_WriteThread.joinable());
ASSERT(GetFD() != INVALID_SOCKET);
// TODO: figure out why we're not using "this" here (hint: to keep the object alive until the threads are done)
m_ReadThread = boost::thread(boost::bind(&Socket::ReadThreadProc, static_cast<Socket::Ptr>(GetSelf())));
m_ReadThread.detach();
m_WriteThread = boost::thread(boost::bind(&Socket::WriteThreadProc, static_cast<Socket::Ptr>(GetSelf())));
m_WriteThread.detach();
Stream::Start();
} }
/** /**
@ -77,16 +61,14 @@ void Socket::Start(void)
*/ */
void Socket::SetFD(SOCKET fd) void Socket::SetFD(SOCKET fd)
{ {
ObjectLock olock(this);
/* mark the socket as non-blocking and close-on-exec */
if (fd != INVALID_SOCKET) { if (fd != INVALID_SOCKET) {
Utility::SetNonBlockingSocket(fd); /* mark the socket as close-on-exec */
#ifndef _WIN32 #ifndef _WIN32
Utility::SetCloExec(fd); Utility::SetCloExec(fd);
#endif /* _WIN32 */ #endif /* _WIN32 */
} }
ObjectLock olock(this);
m_FD = fd; m_FD = fd;
} }
@ -105,28 +87,15 @@ SOCKET Socket::GetFD(void) const
/** /**
* Closes the socket. * Closes the socket.
*/ */
void Socket::CloseInternal(void)
{
{
ObjectLock olock(this);
if (m_FD != INVALID_SOCKET) {
closesocket(m_FD);
m_FD = INVALID_SOCKET;
}
}
Stream::Close();
}
/**
* Shuts down the socket.
*/
void Socket::Close(void) void Socket::Close(void)
{ {
SetWriteEOF(true); ObjectLock olock(this);
}
if (m_FD != INVALID_SOCKET) {
closesocket(m_FD);
m_FD = INVALID_SOCKET;
}
}
/** /**
* Retrieves the last error that occured for the socket. * Retrieves the last error that occured for the socket.
@ -146,24 +115,6 @@ int Socket::GetError(void) const
return 0; return 0;
} }
/**
* Processes errors that have occured for the socket.
*/
void Socket::HandleException(void)
{
ObjectLock olock(this);
#ifndef _WIN32
BOOST_THROW_EXCEPTION(socket_error()
<< boost::errinfo_api_function("select")
<< boost::errinfo_errno(GetError()));
#else /* _WIN32 */
BOOST_THROW_EXCEPTION(socket_error()
<< boost::errinfo_api_function("select")
<< errinfo_win32_error(GetError()));
#endif /* _WIN32 */
}
/** /**
* Formats a sockaddr in a human-readable way. * Formats a sockaddr in a human-readable way.
* *
@ -246,235 +197,6 @@ String Socket::GetPeerAddress(void)
return GetAddressFromSockaddr((sockaddr *)&sin, len); return GetAddressFromSockaddr((sockaddr *)&sin, len);
} }
/**
* Read thread procedure for sockets. This function waits until the
* socket is readable and processes inbound data.
*/
void Socket::ReadThreadProc(void)
{
boost::mutex::scoped_lock lock(m_SocketMutex);
for (;;) {
fd_set readfds, exceptfds;
FD_ZERO(&readfds);
FD_ZERO(&exceptfds);
int fd = GetFD();
if (fd == INVALID_SOCKET)
return;
if (WantsToRead())
FD_SET(fd, &readfds);
FD_SET(fd, &exceptfds);
lock.unlock();
timeval tv;
tv.tv_sec = 5;
tv.tv_usec = 0;
int rc = select(fd + 1, &readfds, NULL, &exceptfds, &tv);
lock.lock();
if (GetFD() == INVALID_SOCKET)
return;
try {
if (rc < 0) {
#ifndef _WIN32
BOOST_THROW_EXCEPTION(socket_error()
<< boost::errinfo_api_function("select")
<< boost::errinfo_errno(errno));
#else /* _WIN32 */
BOOST_THROW_EXCEPTION(socket_error()
<< boost::errinfo_api_function("select")
<< errinfo_win32_error(WSAGetLastError()));
#endif /* _WIN32 */
}
if (FD_ISSET(fd, &readfds))
HandleReadable();
if (FD_ISSET(fd, &exceptfds))
HandleException();
} catch (...) {
SetException(boost::current_exception());
CloseInternal();
break;
}
if (WantsToWrite())
m_WriteCV.notify_all(); /* notify Write thread */
}
}
/**
* Write thread procedure for sockets. This function waits until the socket
* is writable and processes outbound data.
*/
void Socket::WriteThreadProc(void)
{
boost::mutex::scoped_lock lock(m_SocketMutex);
for (;;) {
fd_set writefds;
FD_ZERO(&writefds);
while (!WantsToWrite()) {
m_WriteCV.timed_wait(lock, boost::posix_time::seconds(1));
if (GetFD() == INVALID_SOCKET)
return;
}
int fd = GetFD();
if (fd == INVALID_SOCKET)
return;
FD_SET(fd, &writefds);
lock.unlock();
int rc = select(fd + 1, NULL, &writefds, NULL, NULL);
lock.lock();
if (GetFD() == INVALID_SOCKET)
return;
try {
if (rc < 0) {
#ifndef _WIN32
BOOST_THROW_EXCEPTION(socket_error()
<< boost::errinfo_api_function("select")
<< boost::errinfo_errno(errno));
#else /* _WIN32 */
BOOST_THROW_EXCEPTION(socket_error()
<< boost::errinfo_api_function("select")
<< errinfo_win32_error(WSAGetLastError()));
#endif /* _WIN32 */
}
if (FD_ISSET(fd, &writefds))
HandleWritable();
} catch (...) {
SetException(boost::current_exception());
CloseInternal();
break;
}
}
}
/**
* Sets whether the socket is fully connected.
*
* @param connected Whether the socket is connected
*/
void Socket::SetConnected(bool connected)
{
ObjectLock olock(this);
m_Connected = connected;
}
/**
* Checks whether the socket is fully connected.
*
* @returns true if the socket is connected, false otherwise
*/
bool Socket::IsConnected(void) const
{
ObjectLock olock(this);
return m_Connected;
}
/**
* Returns how much data is available for reading.
*
* @returns The number of bytes available.
*/
size_t Socket::GetAvailableBytes(void) const
{
ObjectLock olock(this);
if (m_Listening)
throw new std::logic_error("Socket does not support GetAvailableBytes().");
return m_RecvQueue->GetAvailableBytes();
}
/**
* Reads data from the socket.
*
* @param buffer The buffer where the data should be stored.
* @param size The size of the buffer.
* @returns The number of bytes read.
*/
size_t Socket::Read(void *buffer, size_t size)
{
{
ObjectLock olock(this);
if (m_Listening)
throw new std::logic_error("Socket does not support Read().");
}
if (m_RecvQueue->GetAvailableBytes() == 0)
CheckException();
return m_RecvQueue->Read(buffer, size);
}
/**
* Peeks at data for the socket.
*
* @param buffer The buffer where the data should be stored.
* @param size The size of the buffer.
* @returns The number of bytes read.
*/
size_t Socket::Peek(void *buffer, size_t size)
{
{
ObjectLock olock(this);
if (m_Listening)
throw new std::logic_error("Socket does not support Peek().");
}
if (m_RecvQueue->GetAvailableBytes() == 0)
CheckException();
return m_RecvQueue->Peek(buffer, size);
}
/**
* Writes data to the socket.
*
* @param buffer The buffer that should be sent.
* @param size The size of the buffer.
*/
void Socket::Write(const void *buffer, size_t size)
{
{
ObjectLock olock(this);
if (m_Listening)
throw new std::logic_error("Socket does not support Write().");
}
m_SendQueue->Write(buffer, size);
}
/** /**
* Starts listening for incoming client connections. * Starts listening for incoming client connections.
*/ */
@ -491,134 +213,56 @@ void Socket::Listen(void)
<< errinfo_win32_error(WSAGetLastError())); << errinfo_win32_error(WSAGetLastError()));
#endif /* _WIN32 */ #endif /* _WIN32 */
} }
{
ObjectLock olock(this);
m_Listening = true;
}
}
void Socket::HandleWritable(void)
{
if (m_Listening)
HandleWritableServer();
else
HandleWritableClient();
}
void Socket::HandleReadable(void)
{
if (m_Listening)
HandleReadableServer();
else
HandleReadableClient();
} }
/** /**
* Processes data that is available for this socket. * Processes data that is available for this socket.
*/ */
void Socket::HandleWritableClient(void) size_t Socket::Write(const void *buffer, size_t count)
{ {
int rc; int rc = send(GetFD(), buffer, count, 0);
char data[1024];
size_t count;
if (!IsConnected()) if (rc < 0) {
SetConnected(true);
for (;;) {
count = m_SendQueue->Peek(data, sizeof(data));
if (count == 0) {
if (IsWriteEOF())
CloseInternal();
break;
}
rc = send(GetFD(), data, count, 0);
#ifdef _WIN32
if (rc < 0 && WSAGetLastError() == WSAEWOULDBLOCK)
#else /* _WIN32 */
if (rc < 0 && errno == EAGAIN)
#endif /* _WIN32 */
break;
if (rc <= 0) {
#ifndef _WIN32 #ifndef _WIN32
BOOST_THROW_EXCEPTION(socket_error() BOOST_THROW_EXCEPTION(socket_error()
<< boost::errinfo_api_function("send") << boost::errinfo_api_function("send")
<< boost::errinfo_errno(errno)); << boost::errinfo_errno(errno));
#else /* _WIN32 */ #else /* _WIN32 */
BOOST_THROW_EXCEPTION(socket_error() BOOST_THROW_EXCEPTION(socket_error()
<< boost::errinfo_api_function("send") << boost::errinfo_api_function("send")
<< errinfo_win32_error(WSAGetLastError())); << errinfo_win32_error(WSAGetLastError()));
#endif /* _WIN32 */ #endif /* _WIN32 */
}
m_SendQueue->Read(NULL, rc);
} }
return rc;
} }
/** /**
* Processes data that can be written for this socket. * Processes data that can be written for this socket.
*/ */
void Socket::HandleReadableClient(void) size_t Socket::Read(void *buffer, size_t count)
{ {
if (!IsConnected()) int rc = recv(GetFD(), buffer, count, 0);
SetConnected(true);
bool new_data = false; if (rc < 0) {
for (;;) {
char data[1024];
int rc = recv(GetFD(), data, sizeof(data), 0);
#ifdef _WIN32
if (rc < 0 && WSAGetLastError() == WSAEWOULDBLOCK)
#else /* _WIN32 */
if (rc < 0 && errno == EAGAIN)
#endif /* _WIN32 */
break;
if (rc < 0) {
#ifndef _WIN32 #ifndef _WIN32
BOOST_THROW_EXCEPTION(socket_error() BOOST_THROW_EXCEPTION(socket_error()
<< boost::errinfo_api_function("recv") << boost::errinfo_api_function("recv")
<< boost::errinfo_errno(errno)); << boost::errinfo_errno(errno));
#else /* _WIN32 */ #else /* _WIN32 */
BOOST_THROW_EXCEPTION(socket_error() BOOST_THROW_EXCEPTION(socket_error()
<< boost::errinfo_api_function("recv") << boost::errinfo_api_function("recv")
<< errinfo_win32_error(WSAGetLastError())); << errinfo_win32_error(WSAGetLastError()));
#endif /* _WIN32 */ #endif /* _WIN32 */
}
new_data = true;
if (rc == 0) {
SetReadEOF(true);
break;
}
m_RecvQueue->Write(data, rc);
} }
if (new_data) return rc;
OnDataAvailable(GetSelf());
}
void Socket::HandleWritableServer(void)
{
throw std::logic_error("This should never happen.");
} }
/** /**
* Accepts a new client and creates a new client object for it * Accepts a new client and creates a new client object for it.
* using the client factory function.
*/ */
void Socket::HandleReadableServer(void) Socket::Ptr Socket::Accept(void)
{ {
int fd; int fd;
sockaddr_storage addr; sockaddr_storage addr;
@ -627,87 +271,16 @@ void Socket::HandleReadableServer(void)
fd = accept(GetFD(), (sockaddr *)&addr, &addrlen); fd = accept(GetFD(), (sockaddr *)&addr, &addrlen);
if (fd < 0) { if (fd < 0) {
#ifndef _WIN32 #ifndef _WIN32
BOOST_THROW_EXCEPTION(socket_error() BOOST_THROW_EXCEPTION(socket_error()
<< boost::errinfo_api_function("accept") << boost::errinfo_api_function("accept")
<< boost::errinfo_errno(errno)); << boost::errinfo_errno(errno));
#else /* _WIN32 */ #else /* _WIN32 */
BOOST_THROW_EXCEPTION(socket_error() BOOST_THROW_EXCEPTION(socket_error()
<< boost::errinfo_api_function("accept") << boost::errinfo_api_function("accept")
<< errinfo_win32_error(WSAGetLastError())); << errinfo_win32_error(WSAGetLastError()));
#endif /* _WIN32 */ #endif /* _WIN32 */
} }
Socket::Ptr client = boost::make_shared<Socket>(); return boost::make_shared<Socket>(fd);
client->SetFD(fd);
OnNewClient(GetSelf(), client);
}
/**
* Checks whether data should be written for this socket object.
*
* @returns true if the socket should be registered for writing, false otherwise.
*/
bool Socket::WantsToWrite(void) const
{
if (m_Listening)
return WantsToWriteServer();
else
return WantsToWriteClient();
}
/**
* Checks whether data should be read for this socket object.
*
* @returns true if the socket should be registered for reading, false otherwise.
*/
bool Socket::WantsToRead(void) const
{
if (m_Listening)
return WantsToReadServer();
else
return WantsToReadClient();
}
/**
* Checks whether data should be read for this socket.
*
* @returns true
*/
bool Socket::WantsToReadClient(void) const
{
return !IsReadEOF();
}
/**
* Checks whether data should be written for this socket.
*
* @returns true if data should be written, false otherwise.
*/
bool Socket::WantsToWriteClient(void) const
{
if (m_SendQueue->GetAvailableBytes() > 0)
return true;
return (!IsConnected());
}
/**
* Checks whether the TCP server wants to write.
*
* @returns false
*/
bool Socket::WantsToWriteServer(void) const
{
return false;
}
/**
* Checks whether the TCP server wants to read (i.e. accept new clients).
*
* @returns true
*/
bool Socket::WantsToReadServer(void) const
{
return true;
} }

View File

@ -21,7 +21,7 @@
#define SOCKET_H #define SOCKET_H
#include "base/i2-base.h" #include "base/i2-base.h"
#include "base/fifo.h" #include "base/stream.h"
#include <boost/thread/thread.hpp> #include <boost/thread/thread.hpp>
#include <boost/thread/mutex.hpp> #include <boost/thread/mutex.hpp>
#include <boost/thread/condition_variable.hpp> #include <boost/thread/condition_variable.hpp>
@ -34,32 +34,26 @@ namespace icinga {
* *
* @ingroup base * @ingroup base
*/ */
class I2_BASE_API Socket : public Stream class I2_BASE_API Socket : public Object
{ {
public: public:
typedef shared_ptr<Socket> Ptr; typedef shared_ptr<Socket> Ptr;
typedef weak_ptr<Socket> WeakPtr; typedef weak_ptr<Socket> WeakPtr;
Socket(void); Socket(void);
Socket(SOCKET fd);
~Socket(void); ~Socket(void);
virtual void Start(void); void Close(void);
virtual void Close(void);
String GetClientAddress(void); String GetClientAddress(void);
String GetPeerAddress(void); String GetPeerAddress(void);
bool IsConnected(void) const; size_t Read(void *buffer, size_t size);
size_t Write(const void *buffer, size_t size);
virtual size_t GetAvailableBytes(void) const;
virtual size_t Read(void *buffer, size_t size);
virtual size_t Peek(void *buffer, size_t size);
virtual void Write(const void *buffer, size_t size);
void Listen(void); void Listen(void);
Socket::Ptr Accept(void);
boost::signals2::signal<void (const Socket::Ptr&, const Socket::Ptr&)> OnNewClient;
protected: protected:
void SetFD(SOCKET fd); void SetFD(SOCKET fd);
@ -73,44 +67,8 @@ protected:
private: private:
SOCKET m_FD; /**< The socket descriptor. */ SOCKET m_FD; /**< The socket descriptor. */
bool m_Connected;
bool m_Listening;
boost::thread m_ReadThread;
boost::thread m_WriteThread;
boost::condition_variable m_WriteCV;
void ReadThreadProc(void);
void WriteThreadProc(void);
void ExceptionEventHandler(void);
static String GetAddressFromSockaddr(sockaddr *address, socklen_t len); static String GetAddressFromSockaddr(sockaddr *address, socklen_t len);
void CloseInternal(void);
FIFO::Ptr m_SendQueue;
FIFO::Ptr m_RecvQueue;
void HandleWritableClient(void);
void HandleReadableClient(void);
void HandleWritableServer(void);
void HandleReadableServer(void);
void HandleReadable(void);
void HandleWritable(void);
void HandleException(void);
bool WantsToWriteClient(void) const;
bool WantsToReadClient(void) const;
bool WantsToWriteServer(void) const;
bool WantsToReadServer(void) const;
bool WantsToWrite(void) const;
bool WantsToRead(void) const;
}; };
class socket_error : virtual public std::exception, virtual public boost::exception { }; class socket_error : virtual public std::exception, virtual public boost::exception { };

View File

@ -31,66 +31,15 @@ using namespace icinga;
* the stream's destructor deletes the inner stream. * the stream's destructor deletes the inner stream.
*/ */
StdioStream::StdioStream(std::iostream *innerStream, bool ownsStream) StdioStream::StdioStream(std::iostream *innerStream, bool ownsStream)
: m_InnerStream(innerStream), m_OwnsStream(ownsStream), : m_InnerStream(innerStream), m_OwnsStream(ownsStream)
m_ReadAheadBuffer(boost::make_shared<FIFO>()) { }
{
m_ReadAheadBuffer->Start();
}
/**
* Destructor for the StdioStream class.
*/
StdioStream::~StdioStream(void)
{
m_ReadAheadBuffer->Close();
}
void StdioStream::Start(void)
{
SetConnected(true);
Stream::Start();
}
size_t StdioStream::GetAvailableBytes(void) const
{
ObjectLock olock(this);
if (m_InnerStream->eof() && m_ReadAheadBuffer->GetAvailableBytes() == 0)
return 0;
else
return 1024; /* doesn't have to be accurate */
}
size_t StdioStream::Read(void *buffer, size_t size) size_t StdioStream::Read(void *buffer, size_t size)
{ {
ObjectLock olock(this); ObjectLock olock(this);
size_t peek_len, read_len; m_InnerStream->read(static_cast<char *>(buffer), size);
return m_InnerStream->gcount();
peek_len = m_ReadAheadBuffer->GetAvailableBytes();
peek_len = m_ReadAheadBuffer->Read(buffer, peek_len);
m_InnerStream->read(static_cast<char *>(buffer) + peek_len, size - peek_len);
read_len = m_InnerStream->gcount();
return peek_len + read_len;
}
size_t StdioStream::Peek(void *buffer, size_t size)
{
ObjectLock olock(this);
size_t peek_len, read_len;
peek_len = m_ReadAheadBuffer->GetAvailableBytes();
peek_len = m_ReadAheadBuffer->Peek(buffer, peek_len);
m_InnerStream->read(static_cast<char *>(buffer) + peek_len, size - peek_len);
read_len = m_InnerStream->gcount();
m_ReadAheadBuffer->Write(static_cast<char *>(buffer) + peek_len, read_len);
return peek_len + read_len;
} }
void StdioStream::Write(const void *buffer, size_t size) void StdioStream::Write(const void *buffer, size_t size)
@ -104,6 +53,4 @@ void StdioStream::Close(void)
{ {
if (m_OwnsStream) if (m_OwnsStream)
delete m_InnerStream; delete m_InnerStream;
Stream::Close();
} }

View File

@ -33,13 +33,8 @@ public:
typedef weak_ptr<StdioStream> WeakPtr; typedef weak_ptr<StdioStream> WeakPtr;
StdioStream(std::iostream *innerStream, bool ownsStream); StdioStream(std::iostream *innerStream, bool ownsStream);
~StdioStream(void);
virtual void Start(void);
virtual size_t GetAvailableBytes(void) const;
virtual size_t Read(void *buffer, size_t size); virtual size_t Read(void *buffer, size_t size);
virtual size_t Peek(void *buffer, size_t size);
virtual void Write(const void *buffer, size_t size); virtual void Write(const void *buffer, size_t size);
virtual void Close(void); virtual void Close(void);
@ -47,7 +42,6 @@ public:
private: private:
std::iostream *m_InnerStream; std::iostream *m_InnerStream;
bool m_OwnsStream; bool m_OwnsStream;
FIFO::Ptr m_ReadAheadBuffer;
}; };
} }

View File

@ -24,112 +24,10 @@
using namespace icinga; using namespace icinga;
Stream::Stream(void)
: m_Connected(false), m_ReadEOF(false), m_WriteEOF(false)
{ }
Stream::~Stream(void)
{
ASSERT(!m_Running);
}
bool Stream::IsConnected(void) const
{
ObjectLock olock(this);
return m_Connected;
}
bool Stream::IsReadEOF(void) const
{
ObjectLock olock(this);
return m_ReadEOF;
}
bool Stream::IsWriteEOF(void) const
{
ObjectLock olock(this);
return m_WriteEOF;
}
void Stream::SetConnected(bool connected)
{
bool changed;
{
ObjectLock olock(this);
changed = (m_Connected != connected);
m_Connected = connected;
}
if (changed) {
if (connected)
OnConnected(GetSelf());
else
OnClosed(GetSelf());
}
}
void Stream::SetReadEOF(bool eof)
{
ObjectLock olock(this);
m_ReadEOF = eof;
}
void Stream::SetWriteEOF(bool eof)
{
ObjectLock olock(this);
m_WriteEOF = eof;
}
/**
* Checks whether an exception is available for this stream and re-throws
* the exception if there is one.
*/
void Stream::CheckException(void)
{
ObjectLock olock(this);
if (m_Exception)
rethrow_exception(m_Exception);
}
void Stream::SetException(boost::exception_ptr exception)
{
ObjectLock olock(this);
m_Exception = exception;
}
boost::exception_ptr Stream::GetException(void)
{
return m_Exception;
}
void Stream::Start(void)
{
ObjectLock olock(this);
m_Running = true;
}
void Stream::Close(void)
{
{
ObjectLock olock(this);
m_Running = false;
}
SetConnected(false);
}
bool Stream::ReadLine(String *line, size_t maxLength) bool Stream::ReadLine(String *line, size_t maxLength)
{ {
BOOST_THROW_EXCEPTION(std::runtime_error("Not implemented."));
/*
char *buffer = new char[maxLength]; char *buffer = new char[maxLength];
size_t rc = Peek(buffer, maxLength); size_t rc = Peek(buffer, maxLength);
@ -161,7 +59,7 @@ bool Stream::ReadLine(String *line, size_t maxLength)
return true; return true;
} }
delete buffer; delete buffer;*/
return false; return false;
} }

View File

@ -40,28 +40,6 @@ public:
typedef shared_ptr<Stream> Ptr; typedef shared_ptr<Stream> Ptr;
typedef weak_ptr<Stream> WeakPtr; typedef weak_ptr<Stream> WeakPtr;
Stream(void);
~Stream(void);
virtual void Start(void);
/**
* Retrieves the number of bytes available for reading.
*
* @returns The number of available bytes.
*/
virtual size_t GetAvailableBytes(void) const = 0;
/**
* Reads data from the stream without advancing the read pointer.
*
* @param buffer The buffer where data should be stored. May be NULL if
* you're not actually interested in the data.
* @param count The number of bytes to read from the queue.
* @returns The number of bytes actually read.
*/
virtual size_t Peek(void *buffer, size_t count) = 0;
/** /**
* Reads data from the stream. * Reads data from the stream.
* *
@ -84,34 +62,9 @@ public:
/** /**
* Closes the stream and releases resources. * Closes the stream and releases resources.
*/ */
virtual void Close(void); virtual void Close(void) = 0;
bool IsConnected(void) const;
bool IsReadEOF(void) const;
bool IsWriteEOF(void) const;
bool ReadLine(String *line, size_t maxLength = 4096); bool ReadLine(String *line, size_t maxLength = 4096);
boost::exception_ptr GetException(void);
void CheckException(void);
boost::signals2::signal<void (const Stream::Ptr&)> OnConnected;
boost::signals2::signal<void (const Stream::Ptr&)> OnDataAvailable;
boost::signals2::signal<void (const Stream::Ptr&)> OnClosed;
protected:
void SetConnected(bool connected);
void SetReadEOF(bool eof);
void SetWriteEOF(bool eof);
void SetException(boost::exception_ptr exception);
private:
bool m_Running;
bool m_Connected;
bool m_ReadEOF;
bool m_WriteEOF;
boost::exception_ptr m_Exception;
}; };
} }

View File

@ -123,7 +123,6 @@ static int I2Stream_write(BIO *bi, const char *in, int inl)
{ {
I2Stream_bio_t *bp = (I2Stream_bio_t *)bi->ptr; I2Stream_bio_t *bp = (I2Stream_bio_t *)bi->ptr;
bp->StreamObj->Write(in, inl); bp->StreamObj->Write(in, inl);
return inl; return inl;
} }

View File

@ -31,7 +31,7 @@ using namespace icinga;
* @param service The service. * @param service The service.
* @param family The address family for the socket. * @param family The address family for the socket.
*/ */
void TcpSocket::Bind(String service, int family) void TcpSocket::Bind(const String& service, int family)
{ {
Bind(String(), service, family); Bind(String(), service, family);
} }
@ -43,7 +43,7 @@ void TcpSocket::Bind(String service, int family)
* @param service The service. * @param service The service.
* @param family The address family for the socket. * @param family The address family for the socket.
*/ */
void TcpSocket::Bind(String node, String service, int family) void TcpSocket::Bind(const String& node, const String& service, int family)
{ {
addrinfo hints; addrinfo hints;
addrinfo *result; addrinfo *result;
@ -75,8 +75,6 @@ void TcpSocket::Bind(String node, String service, int family)
if (fd == INVALID_SOCKET) if (fd == INVALID_SOCKET)
continue; continue;
SetFD(fd);
const int optFalse = 0; const int optFalse = 0;
setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, reinterpret_cast<const char *>(&optFalse), sizeof(optFalse)); setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, reinterpret_cast<const char *>(&optFalse), sizeof(optFalse));
@ -87,23 +85,20 @@ void TcpSocket::Bind(String node, String service, int family)
int rc = bind(fd, info->ai_addr, info->ai_addrlen); int rc = bind(fd, info->ai_addr, info->ai_addrlen);
#ifdef _WIN32 if (rc < 0) {
if (rc < 0 && WSAGetLastError() != WSAEWOULDBLOCK) {
#else /* _WIN32 */
if (rc < 0 && errno != EINPROGRESS) {
#endif /* _WIN32 */
closesocket(fd); closesocket(fd);
SetFD(INVALID_SOCKET);
continue; continue;
} }
SetFD(fd);
break; break;
} }
freeaddrinfo(result); freeaddrinfo(result);
if (fd == INVALID_SOCKET) if (GetFD() == INVALID_SOCKET)
BOOST_THROW_EXCEPTION(std::runtime_error("Could not create a suitable socket.")); BOOST_THROW_EXCEPTION(std::runtime_error("Could not create a suitable socket."));
} }
@ -145,31 +140,21 @@ void TcpSocket::Connect(const String& node, const String& service)
if (fd == INVALID_SOCKET) if (fd == INVALID_SOCKET)
continue; continue;
SetFD(fd);
rc = connect(fd, info->ai_addr, info->ai_addrlen); rc = connect(fd, info->ai_addr, info->ai_addrlen);
#ifdef _WIN32 if (rc < 0) {
if (rc < 0 && WSAGetLastError() != WSAEWOULDBLOCK) {
#else /* _WIN32 */
if (rc < 0 && errno != EINPROGRESS) {
#endif /* _WIN32 */
closesocket(fd); closesocket(fd);
SetFD(INVALID_SOCKET);
continue; continue;
} }
if (rc >= 0) { SetFD(fd);
SetConnected(true);
OnConnected(GetSelf());
}
break; break;
} }
freeaddrinfo(result); freeaddrinfo(result);
if (fd == INVALID_SOCKET) if (GetFD() == INVALID_SOCKET)
BOOST_THROW_EXCEPTION(std::runtime_error("Could not create a suitable socket.")); BOOST_THROW_EXCEPTION(std::runtime_error("Could not connect to remote host."));
} }

View File

@ -37,8 +37,8 @@ public:
typedef shared_ptr<TcpSocket> Ptr; typedef shared_ptr<TcpSocket> Ptr;
typedef weak_ptr<TcpSocket> WeakPtr; typedef weak_ptr<TcpSocket> WeakPtr;
void Bind(String service, int family); void Bind(const String& service, int family);
void Bind(String node, String service, int family); void Bind(const String& node, const String& service, int family);
void Connect(const String& node, const String& service); void Connect(const String& node, const String& service);
}; };

View File

@ -37,51 +37,39 @@ bool I2_EXPORT TlsStream::m_SSLIndexInitialized = false;
* @param sslContext The SSL context for the client. * @param sslContext The SSL context for the client.
*/ */
TlsStream::TlsStream(const Stream::Ptr& innerStream, TlsRole role, shared_ptr<SSL_CTX> sslContext) TlsStream::TlsStream(const Stream::Ptr& innerStream, TlsRole role, shared_ptr<SSL_CTX> sslContext)
: m_SSLContext(sslContext), m_SendQueue(boost::make_shared<FIFO>()), m_RecvQueue(boost::make_shared<FIFO>()), : m_SSLContext(sslContext), m_Role(role)
m_InnerStream(innerStream), m_Role(role)
{ {
m_InnerStream->OnDataAvailable.connect(boost::bind(&TlsStream::DataAvailableHandler, this)); m_InnerStream = dynamic_pointer_cast<BufferedStream>(innerStream);
m_InnerStream->OnClosed.connect(boost::bind(&TlsStream::ClosedHandler, this));
m_SendQueue->Start(); if (!m_InnerStream)
m_RecvQueue->Start(); m_InnerStream = boost::make_shared<BufferedStream>(innerStream);
}
m_SSL = shared_ptr<SSL>(SSL_new(m_SSLContext.get()), SSL_free);
void TlsStream::Start(void) m_SSLContext.reset();
{
{
boost::mutex::scoped_lock lock(m_SSLMutex);
m_SSL = shared_ptr<SSL>(SSL_new(m_SSLContext.get()), SSL_free); if (!m_SSL) {
BOOST_THROW_EXCEPTION(openssl_error()
m_SSLContext.reset(); << boost::errinfo_api_function("SSL_new")
<< errinfo_openssl_error(ERR_get_error()));
if (!m_SSL) {
BOOST_THROW_EXCEPTION(openssl_error()
<< boost::errinfo_api_function("SSL_new")
<< errinfo_openssl_error(ERR_get_error()));
}
if (!m_SSLIndexInitialized) {
m_SSLIndex = SSL_get_ex_new_index(0, const_cast<char *>("TlsStream"), NULL, NULL, NULL);
m_SSLIndexInitialized = true;
}
SSL_set_ex_data(m_SSL.get(), m_SSLIndex, this);
SSL_set_verify(m_SSL.get(), SSL_VERIFY_PEER | SSL_VERIFY_FAIL_IF_NO_PEER_CERT, NULL);
m_BIO = BIO_new_I2Stream(m_InnerStream);
SSL_set_bio(m_SSL.get(), m_BIO, m_BIO);
if (m_Role == TlsRoleServer)
SSL_set_accept_state(m_SSL.get());
else
SSL_set_connect_state(m_SSL.get());
} }
Stream::Start(); if (!m_SSLIndexInitialized) {
m_SSLIndex = SSL_get_ex_new_index(0, const_cast<char *>("TlsStream"), NULL, NULL, NULL);
m_SSLIndexInitialized = true;
}
HandleIO(); SSL_set_ex_data(m_SSL.get(), m_SSLIndex, this);
SSL_set_verify(m_SSL.get(), SSL_VERIFY_PEER | SSL_VERIFY_FAIL_IF_NO_PEER_CERT, NULL);
m_BIO = BIO_new_I2Stream(m_InnerStream);
SSL_set_bio(m_SSL.get(), m_BIO, m_BIO);
if (m_Role == TlsRoleServer)
SSL_set_accept_state(m_SSL.get());
else
SSL_set_connect_state(m_SSL.get());
} }
/** /**
@ -91,8 +79,6 @@ void TlsStream::Start(void)
*/ */
shared_ptr<X509> TlsStream::GetClientCertificate(void) const shared_ptr<X509> TlsStream::GetClientCertificate(void) const
{ {
boost::mutex::scoped_lock lock(m_SSLMutex);
return shared_ptr<X509>(SSL_get_certificate(m_SSL.get()), &Utility::NullDeleter); return shared_ptr<X509>(SSL_get_certificate(m_SSL.get()), &Utility::NullDeleter);
} }
@ -103,90 +89,70 @@ shared_ptr<X509> TlsStream::GetClientCertificate(void) const
*/ */
shared_ptr<X509> TlsStream::GetPeerCertificate(void) const shared_ptr<X509> TlsStream::GetPeerCertificate(void) const
{ {
boost::mutex::scoped_lock lock(m_SSLMutex);
return shared_ptr<X509>(SSL_get_peer_certificate(m_SSL.get()), X509_free); return shared_ptr<X509>(SSL_get_peer_certificate(m_SSL.get()), X509_free);
} }
void TlsStream::DataAvailableHandler(void) void TlsStream::Handshake(void)
{ {
try { ASSERT(!OwnsLock());
HandleIO();
} catch (...) {
SetException(boost::current_exception());
Close(); int rc;
}
}
void TlsStream::ClosedHandler(void)
{
ObjectLock olock(this); ObjectLock olock(this);
SetException(m_InnerStream->GetException()); while ((rc = SSL_do_handshake(m_SSL.get())) <= 0) {
Close(); switch (SSL_get_error(m_SSL.get(), rc)) {
case SSL_ERROR_WANT_READ:
olock.Unlock();
m_InnerStream->WaitReadable(1);
olock.Lock();
continue;
case SSL_ERROR_WANT_WRITE:
olock.Unlock();
m_InnerStream->WaitWritable(1);
olock.Lock();
continue;
case SSL_ERROR_ZERO_RETURN:
Close();
return;
default:
I2Stream_check_exception(m_BIO);
BOOST_THROW_EXCEPTION(openssl_error()
<< boost::errinfo_api_function("SSL_read")
<< errinfo_openssl_error(ERR_get_error()));
}
}
} }
/** /**
* Processes data for the stream. * Processes data for the stream.
*/ */
void TlsStream::HandleIO(void) size_t TlsStream::Read(void *buffer, size_t count)
{ {
ASSERT(!OwnsLock()); ASSERT(!OwnsLock());
char data[16 * 1024]; size_t left = count;
int rc;
if (!IsConnected()) { ObjectLock olock(this);
boost::mutex::scoped_lock lock(m_SSLMutex);
rc = SSL_do_handshake(m_SSL.get()); while (left > 0) {
int rc = SSL_read(m_SSL.get(), ((char *)buffer) + (count - left), left);
if (rc == 1) { if (rc <= 0) {
lock.unlock();
SetConnected(true);
} else {
switch (SSL_get_error(m_SSL.get(), rc)) { switch (SSL_get_error(m_SSL.get(), rc)) {
case SSL_ERROR_WANT_WRITE:
/* fall through */
case SSL_ERROR_WANT_READ: case SSL_ERROR_WANT_READ:
return; olock.Unlock();
m_InnerStream->WaitReadable(1);
olock.Lock();
continue;
case SSL_ERROR_WANT_WRITE:
olock.Unlock();
m_InnerStream->WaitWritable(1);
olock.Lock();
continue;
case SSL_ERROR_ZERO_RETURN: case SSL_ERROR_ZERO_RETURN:
Close(); Close();
return; return count - left;
default:
I2Stream_check_exception(m_BIO);
BOOST_THROW_EXCEPTION(openssl_error()
<< boost::errinfo_api_function("SSL_do_handshake")
<< errinfo_openssl_error(ERR_get_error()));
}
}
}
bool new_data = false, read_ok = true;
while (read_ok) {
boost::mutex::scoped_lock lock(m_SSLMutex);
rc = SSL_read(m_SSL.get(), data, sizeof(data));
if (rc > 0) {
lock.unlock();
ObjectLock olock(this);
m_RecvQueue->Write(data, rc);
new_data = true;
} else {
switch (SSL_get_error(m_SSL.get(), rc)) {
case SSL_ERROR_WANT_WRITE:
/* fall through */
case SSL_ERROR_WANT_READ:
read_ok = false;
break;
case SSL_ERROR_ZERO_RETURN:
Close();
return;
default: default:
I2Stream_check_exception(m_BIO); I2Stream_check_exception(m_BIO);
BOOST_THROW_EXCEPTION(openssl_error() BOOST_THROW_EXCEPTION(openssl_error()
@ -194,41 +160,36 @@ void TlsStream::HandleIO(void)
<< errinfo_openssl_error(ERR_get_error())); << errinfo_openssl_error(ERR_get_error()));
} }
} }
left -= rc;
} }
if (new_data) return count;
OnDataAvailable(GetSelf()); }
void TlsStream::Write(const void *buffer, size_t count)
{
ASSERT(!OwnsLock());
size_t left = count;
ObjectLock olock(this); ObjectLock olock(this);
while (m_SendQueue->GetAvailableBytes() > 0) { while (left > 0) {
size_t count = m_SendQueue->GetAvailableBytes(); int rc = SSL_write(m_SSL.get(), ((const char *)buffer) + (count - left), left);
if (count == 0) if (rc <= 0) {
break;
if (count > sizeof(data))
count = sizeof(data);
m_SendQueue->Peek(data, count);
olock.Unlock();
boost::mutex::scoped_lock lock(m_SSLMutex);
rc = SSL_write(m_SSL.get(), (const char *)data, count);
if (rc > 0) {
lock.unlock();
olock.Lock();
m_SendQueue->Read(NULL, rc);
} else {
switch (SSL_get_error(m_SSL.get(), rc)) { switch (SSL_get_error(m_SSL.get(), rc)) {
case SSL_ERROR_WANT_READ: case SSL_ERROR_WANT_READ:
/* fall through */ olock.Unlock();
m_InnerStream->WaitReadable(1);
olock.Lock();
continue;
case SSL_ERROR_WANT_WRITE: case SSL_ERROR_WANT_WRITE:
return; olock.Unlock();
m_InnerStream->WaitWritable(1);
olock.Lock();
continue;
case SSL_ERROR_ZERO_RETURN: case SSL_ERROR_ZERO_RETURN:
Close(); Close();
return; return;
@ -239,6 +200,8 @@ void TlsStream::HandleIO(void)
<< errinfo_openssl_error(ERR_get_error())); << errinfo_openssl_error(ERR_get_error()));
} }
} }
left -= rc;
} }
} }
@ -247,51 +210,5 @@ void TlsStream::HandleIO(void)
*/ */
void TlsStream::Close(void) void TlsStream::Close(void)
{ {
{ m_InnerStream->Close();
boost::mutex::scoped_lock lock(m_SSLMutex);
if (m_SSL)
SSL_shutdown(m_SSL.get());
}
{
ObjectLock olock(this);
m_SendQueue->Close();
m_RecvQueue->Close();
}
Stream::Close();
}
size_t TlsStream::GetAvailableBytes(void) const
{
ObjectLock olock(this);
return m_RecvQueue->GetAvailableBytes();
}
size_t TlsStream::Peek(void *buffer, size_t count)
{
ObjectLock olock(this);
return m_RecvQueue->Peek(buffer, count);
}
size_t TlsStream::Read(void *buffer, size_t count)
{
ObjectLock olock(this);
return m_RecvQueue->Read(buffer, count);
}
void TlsStream::Write(const void *buffer, size_t count)
{
{
ObjectLock olock(this);
m_SendQueue->Write(buffer, count);
}
Utility::QueueAsyncCallback(boost::bind(&TlsStream::HandleIO, this));
} }

View File

@ -21,6 +21,7 @@
#define TLSSTREAM_H #define TLSSTREAM_H
#include "base/i2-base.h" #include "base/i2-base.h"
#include "base/bufferedstream.h"
#include "base/stream.h" #include "base/stream.h"
#include "base/fifo.h" #include "base/fifo.h"
#include "base/tlsutility.h" #include "base/tlsutility.h"
@ -50,34 +51,24 @@ public:
shared_ptr<X509> GetClientCertificate(void) const; shared_ptr<X509> GetClientCertificate(void) const;
shared_ptr<X509> GetPeerCertificate(void) const; shared_ptr<X509> GetPeerCertificate(void) const;
virtual void Start(void); void Handshake(void);
virtual void Close(void); virtual void Close(void);
virtual size_t GetAvailableBytes(void) const;
virtual size_t Peek(void *buffer, size_t count);
virtual size_t Read(void *buffer, size_t count); virtual size_t Read(void *buffer, size_t count);
virtual void Write(const void *buffer, size_t count); virtual void Write(const void *buffer, size_t count);
private: private:
shared_ptr<SSL_CTX> m_SSLContext; shared_ptr<SSL_CTX> m_SSLContext;
shared_ptr<SSL> m_SSL; shared_ptr<SSL> m_SSL;
mutable boost::mutex m_SSLMutex;
BIO *m_BIO; BIO *m_BIO;
FIFO::Ptr m_SendQueue; BufferedStream::Ptr m_InnerStream;
FIFO::Ptr m_RecvQueue;
Stream::Ptr m_InnerStream;
TlsRole m_Role; TlsRole m_Role;
static int m_SSLIndex; static int m_SSLIndex;
static bool m_SSLIndexInitialized; static bool m_SSLIndexInitialized;
void DataAvailableHandler(void);
void ClosedHandler(void);
void HandleIO(void);
static void NullCertificateDeleter(X509 *certificate); static void NullCertificateDeleter(X509 *certificate);
}; };

View File

@ -212,7 +212,7 @@ Value Value::Deserialize(const String& jsonString)
cJSON *json = cJSON_Parse(jsonString.CStr()); cJSON *json = cJSON_Parse(jsonString.CStr());
if (!json) if (!json)
BOOST_THROW_EXCEPTION(std::runtime_error("Invalid JSON String")); BOOST_THROW_EXCEPTION(std::runtime_error("Invalid JSON String: " + jsonString));
Value value = FromJson(json); Value value = FromJson(json);
cJSON_Delete(json); cJSON_Delete(json);

View File

@ -16,8 +16,8 @@ libremoting_la_SOURCES = \
endpointmanager.cpp \ endpointmanager.cpp \
endpointmanager.h \ endpointmanager.h \
i2-remoting.h \ i2-remoting.h \
jsonrpcconnection.cpp \ jsonrpc.cpp \
jsonrpcconnection.h \ jsonrpc.h \
messagepart.cpp \ messagepart.cpp \
messagepart.h \ messagepart.h \
remoting-type.cpp \ remoting-type.cpp \

View File

@ -19,6 +19,7 @@
#include "remoting/endpoint.h" #include "remoting/endpoint.h"
#include "remoting/endpointmanager.h" #include "remoting/endpointmanager.h"
#include "remoting/jsonrpc.h"
#include "base/application.h" #include "base/application.h"
#include "base/dynamictype.h" #include "base/dynamictype.h"
#include "base/objectlock.h" #include "base/objectlock.h"
@ -106,30 +107,28 @@ bool Endpoint::IsConnected(void) const
if (IsLocalEndpoint()) { if (IsLocalEndpoint()) {
return true; return true;
} else { } else {
JsonRpcConnection::Ptr client = GetClient(); return GetClient();
return (client && client->GetStream()->IsConnected());
} }
} }
JsonRpcConnection::Ptr Endpoint::GetClient(void) const Stream::Ptr Endpoint::GetClient(void) const
{ {
ObjectLock olock(this); ObjectLock olock(this);
return m_Client; return m_Client;
} }
void Endpoint::SetClient(const JsonRpcConnection::Ptr& client) void Endpoint::SetClient(const Stream::Ptr& client)
{ {
client->OnNewMessage.connect(boost::bind(&Endpoint::NewMessageHandler, this, _2));
client->OnClosed.connect(boost::bind(&Endpoint::ClientClosedHandler, this));
{ {
ObjectLock olock(this); ObjectLock olock(this);
m_Client = client; m_Client = client;
} }
boost::thread thread(boost::bind(&Endpoint::MessageThreadProc, this, client));
thread.detach();
OnConnected(GetSelf()); OnConnected(GetSelf());
} }
@ -261,7 +260,7 @@ void Endpoint::ProcessRequest(const Endpoint::Ptr& sender, const RequestMessage&
Utility::QueueAsyncCallback(boost::bind(boost::ref(*it->second), GetSelf(), sender, request)); Utility::QueueAsyncCallback(boost::bind(boost::ref(*it->second), GetSelf(), sender, request));
} else { } else {
GetClient()->SendMessage(request); JsonRpc::SendMessage(GetClient(), request);
} }
} }
@ -272,61 +271,53 @@ void Endpoint::ProcessResponse(const Endpoint::Ptr& sender, const ResponseMessag
if (IsLocalEndpoint()) if (IsLocalEndpoint())
EndpointManager::GetInstance()->ProcessResponseMessage(sender, response); EndpointManager::GetInstance()->ProcessResponseMessage(sender, response);
else else {
GetClient()->SendMessage(response); JsonRpc::SendMessage(GetClient(), response);
}
} }
void Endpoint::NewMessageHandler(const MessagePart& message) void Endpoint::MessageThreadProc(const Stream::Ptr& stream)
{ {
Endpoint::Ptr sender = GetSelf(); try {
for (;;) {
MessagePart message = JsonRpc::ReadMessage(stream);
Endpoint::Ptr sender = GetSelf();
if (ResponseMessage::IsResponseMessage(message)) { if (ResponseMessage::IsResponseMessage(message)) {
/* rather than routing the message to the right virtual /* rather than routing the message to the right virtual
* endpoint we just process it here right away. */ * endpoint we just process it here right away. */
EndpointManager::GetInstance()->ProcessResponseMessage(sender, message); EndpointManager::GetInstance()->ProcessResponseMessage(sender, message);
return; return;
}
RequestMessage request = message;
String method;
if (!request.GetMethod(&method))
return;
String id;
if (request.GetID(&id))
EndpointManager::GetInstance()->SendAnycastMessage(sender, request);
else
EndpointManager::GetInstance()->SendMulticastMessage(sender, request);
}
} catch (const std::exception& ex) {
Log(LogWarning, "jsonrpc", "Lost connection to endpoint '" + GetName() + "': " + boost::diagnostic_information(ex));
{
ObjectLock olock(this);
// TODO: _only_ clear non-persistent subscriptions
// unregister ourselves if no persistent subscriptions are left (use a
// timer for that, once we have a TTL property for the topics)
ClearSubscriptions();
m_Client.reset();
}
OnDisconnected(GetSelf());
} }
RequestMessage request = message;
String method;
if (!request.GetMethod(&method))
return;
String id;
if (request.GetID(&id))
EndpointManager::GetInstance()->SendAnycastMessage(sender, request);
else
EndpointManager::GetInstance()->SendMulticastMessage(sender, request);
}
void Endpoint::ClientClosedHandler(void)
{
ASSERT(!OwnsLock());
/*try {
GetClient()->CheckException();
} catch (const exception& ex) {
stringstream message;
message << "Error occured for JSON-RPC socket: Message=" << diagnostic_information(ex);
Log(LogWarning, "jsonrpc", message.str());
}*/
Log(LogWarning, "jsonrpc", "Lost connection to endpoint: identity=" + GetName());
{
ObjectLock olock(this);
// TODO: _only_ clear non-persistent subscriptions
// unregister ourselves if no persistent subscriptions are left (use a
// timer for that, once we have a TTL property for the topics)
ClearSubscriptions();
m_Client.reset();
}
OnDisconnected(GetSelf());
} }
/** /**

View File

@ -23,8 +23,8 @@
#include "remoting/i2-remoting.h" #include "remoting/i2-remoting.h"
#include "remoting/requestmessage.h" #include "remoting/requestmessage.h"
#include "remoting/responsemessage.h" #include "remoting/responsemessage.h"
#include "remoting/jsonrpcconnection.h"
#include "base/dynamicobject.h" #include "base/dynamicobject.h"
#include "base/stream.h"
#include <boost/signals2.hpp> #include <boost/signals2.hpp>
namespace icinga namespace icinga
@ -50,8 +50,8 @@ public:
static Endpoint::Ptr GetByName(const String& name); static Endpoint::Ptr GetByName(const String& name);
JsonRpcConnection::Ptr GetClient(void) const; Stream::Ptr GetClient(void) const;
void SetClient(const JsonRpcConnection::Ptr& client); void SetClient(const Stream::Ptr& client);
void RegisterSubscription(const String& topic); void RegisterSubscription(const String& topic);
void UnregisterSubscription(const String& topic); void UnregisterSubscription(const String& topic);
@ -85,7 +85,7 @@ private:
Attribute<String> m_Node; Attribute<String> m_Node;
Attribute<String> m_Service; Attribute<String> m_Service;
JsonRpcConnection::Ptr m_Client; Stream::Ptr m_Client;
bool m_ReceivedWelcome; /**< Have we received a welcome message bool m_ReceivedWelcome; /**< Have we received a welcome message
from this endpoint? */ from this endpoint? */
@ -94,8 +94,7 @@ private:
std::map<String, shared_ptr<boost::signals2::signal<Callback> > > m_TopicHandlers; std::map<String, shared_ptr<boost::signals2::signal<Callback> > > m_TopicHandlers;
void NewMessageHandler(const MessagePart& message); void MessageThreadProc(const Stream::Ptr& stream);
void ClientClosedHandler(void);
}; };
} }

View File

@ -24,6 +24,7 @@
#include "base/convert.h" #include "base/convert.h"
#include "base/utility.h" #include "base/utility.h"
#include "base/tlsutility.h" #include "base/tlsutility.h"
#include "base/networkstream.h"
#include <boost/tuple/tuple.hpp> #include <boost/tuple/tuple.hpp>
#include <boost/foreach.hpp> #include <boost/foreach.hpp>
@ -129,14 +130,29 @@ void EndpointManager::AddListener(const String& service)
Log(LogInformation, "icinga", s.str()); Log(LogInformation, "icinga", s.str());
TcpSocket::Ptr server = boost::make_shared<TcpSocket>(); TcpSocket::Ptr server = boost::make_shared<TcpSocket>();
server->Bind(service, AF_INET6);
boost::thread thread(boost::bind(&EndpointManager::ListenerThreadProc, this, server));
thread.detach();
m_Servers.insert(server); m_Servers.insert(server);
server->OnNewClient.connect(boost::bind(&EndpointManager::NewClientHandler, }
this, _2, TlsRoleServer));
server->Bind(service, AF_INET6); void EndpointManager::ListenerThreadProc(const Socket::Ptr& server)
{
server->Listen(); server->Listen();
server->Start();
for (;;) {
Socket::Ptr client = server->Accept();
try {
NewClientHandler(client, TlsRoleServer);
} catch (const std::exception& ex) {
std::stringstream message;
message << "Error for new JSON-RPC socket: " << boost::diagnostic_information(ex);
Log(LogInformation, "remoting", message.str());
}
}
} }
/** /**
@ -146,16 +162,23 @@ void EndpointManager::AddListener(const String& service)
* @param service The remote port. * @param service The remote port.
*/ */
void EndpointManager::AddConnection(const String& node, const String& service) { void EndpointManager::AddConnection(const String& node, const String& service) {
ObjectLock olock(this); {
ObjectLock olock(this);
shared_ptr<SSL_CTX> sslContext = m_SSLContext; shared_ptr<SSL_CTX> sslContext = m_SSLContext;
if (!sslContext) if (!sslContext)
BOOST_THROW_EXCEPTION(std::logic_error("SSL context is required for AddConnection()")); BOOST_THROW_EXCEPTION(std::logic_error("SSL context is required for AddConnection()"));
}
TcpSocket::Ptr client = boost::make_shared<TcpSocket>(); TcpSocket::Ptr client = boost::make_shared<TcpSocket>();
client->Connect(node, service);
NewClientHandler(client, TlsRoleClient); try {
client->Connect(node, service);
NewClientHandler(client, TlsRoleClient);
} catch (const std::exception& ex) {
Log(LogInformation, "remoting", "Could not connect to " + node + ":" + service + ": " + ex.what());
}
} }
/** /**
@ -165,25 +188,10 @@ void EndpointManager::AddConnection(const String& node, const String& service) {
*/ */
void EndpointManager::NewClientHandler(const Socket::Ptr& client, TlsRole role) void EndpointManager::NewClientHandler(const Socket::Ptr& client, TlsRole role)
{ {
TlsStream::Ptr tlsStream = boost::make_shared<TlsStream>(client, role, m_SSLContext); NetworkStream::Ptr netStream = boost::make_shared<NetworkStream>(client);
m_PendingClients.insert(tlsStream); TlsStream::Ptr tlsStream = boost::make_shared<TlsStream>(netStream, role, m_SSLContext);
tlsStream->OnConnected.connect(boost::bind(&EndpointManager::ClientConnectedHandler, this, _1)); tlsStream->Handshake();
tlsStream->OnClosed.connect(boost::bind(&EndpointManager::ClientClosedHandler, this, _1));
client->Start();
tlsStream->Start();
}
void EndpointManager::ClientConnectedHandler(const Stream::Ptr& client)
{
TlsStream::Ptr tlsStream = static_pointer_cast<TlsStream>(client);
JsonRpcConnection::Ptr jclient = boost::make_shared<JsonRpcConnection>(tlsStream);
{
ObjectLock olock(this);
m_PendingClients.erase(tlsStream);
}
shared_ptr<X509> cert = tlsStream->GetPeerCertificate(); shared_ptr<X509> cert = tlsStream->GetPeerCertificate();
String identity = GetCertificateCN(cert); String identity = GetCertificateCN(cert);
@ -195,17 +203,7 @@ void EndpointManager::ClientConnectedHandler(const Stream::Ptr& client)
if (!endpoint) if (!endpoint)
endpoint = Endpoint::MakeEndpoint(identity, true); endpoint = Endpoint::MakeEndpoint(identity, true);
endpoint->SetClient(jclient); endpoint->SetClient(tlsStream);
}
void EndpointManager::ClientClosedHandler(const Stream::Ptr& client)
{
TlsStream::Ptr tlsStream = static_pointer_cast<TlsStream>(client);
{
ObjectLock olock(this);
m_PendingClients.erase(tlsStream);
}
} }
/** /**

View File

@ -81,7 +81,6 @@ private:
Timer::Ptr m_ReconnectTimer; Timer::Ptr m_ReconnectTimer;
std::set<TcpSocket::Ptr> m_Servers; std::set<TcpSocket::Ptr> m_Servers;
std::set<TlsStream::Ptr> m_PendingClients;
/** /**
* Information about a pending API request. * Information about a pending API request.
@ -112,8 +111,8 @@ private:
void ReconnectTimerHandler(void); void ReconnectTimerHandler(void);
void NewClientHandler(const Socket::Ptr& client, TlsRole role); void NewClientHandler(const Socket::Ptr& client, TlsRole role);
void ClientConnectedHandler(const Stream::Ptr& client);
void ClientClosedHandler(const Stream::Ptr& client); void ListenerThreadProc(const Socket::Ptr& server);
}; };
} }

View File

@ -17,64 +17,41 @@
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. * * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. *
******************************************************************************/ ******************************************************************************/
#include "remoting/jsonrpcconnection.h" #include "remoting/jsonrpc.h"
#include "base/netstring.h" #include "base/netstring.h"
#include "base/objectlock.h" #include "base/objectlock.h"
#include "base/logger_fwd.h" #include "base/logger_fwd.h"
#include <boost/exception/diagnostic_information.hpp> #include <boost/exception/diagnostic_information.hpp>
#include <iostream>
using namespace icinga; using namespace icinga;
/**
* Constructor for the JsonRpcConnection class.
*
* @param stream The stream.
*/
JsonRpcConnection::JsonRpcConnection(const Stream::Ptr& stream)
: Connection(stream)
{ }
/** /**
* Sends a message to the connected peer. * Sends a message to the connected peer.
* *
* @param message The message. * @param message The message.
*/ */
void JsonRpcConnection::SendMessage(const MessagePart& message) void JsonRpc::SendMessage(const Stream::Ptr& stream, const MessagePart& message)
{ {
ObjectLock olock(this);
Value value = message.GetDictionary(); Value value = message.GetDictionary();
String json = value.Serialize(); String json = value.Serialize();
//std::cerr << ">> " << json << std::endl; //std::cerr << ">> " << json << std::endl;
NetString::WriteStringToStream(GetStream(), json); NetString::WriteStringToStream(stream, json);
} }
/** MessagePart JsonRpc::ReadMessage(const Stream::Ptr& stream)
* Processes inbound data.
*/
void JsonRpcConnection::ProcessData(void)
{ {
ObjectLock olock(this);
String jsonString; String jsonString;
if (!NetString::ReadStringFromStream(stream, &jsonString))
BOOST_THROW_EXCEPTION(std::runtime_error("ReadStringFromStream signalled EOF."));
while (NetString::ReadStringFromStream(GetStream(), &jsonString)) { //std::cerr << "<< " << jsonString << std::endl;
//std::cerr << "<< " << jsonString << std::endl; Value value = Value::Deserialize(jsonString);
try { if (!value.IsObjectType<Dictionary>()) {
Value value = Value::Deserialize(jsonString); BOOST_THROW_EXCEPTION(std::invalid_argument("JSON-RPC"
" message must be a dictionary."));
if (!value.IsObjectType<Dictionary>()) {
BOOST_THROW_EXCEPTION(std::invalid_argument("JSON-RPC"
" message must be a dictionary."));
}
MessagePart mp(value);
OnNewMessage(GetSelf(), mp);
} catch (const std::exception& ex) {
Log(LogCritical, "remoting", "Exception"
" while processing message from JSON-RPC client: " +
boost::diagnostic_information(ex));
}
} }
return MessagePart(value);
} }

View File

@ -17,30 +17,31 @@
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. * * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. *
******************************************************************************/ ******************************************************************************/
#ifndef LIVESTATUSCONNECTION_H #ifndef JSONRPC_H
#define LIVESTATUSCONNECTION_H #define JSONRPC_H
#include "base/connection.h" #include "remoting/i2-remoting.h"
#include "remoting/messagepart.h"
#include "base/stream.h"
using namespace icinga; namespace icinga
namespace livestatus
{ {
class LivestatusConnection : public Connection /**
* A JSON-RPC connection.
*
* @ingroup remoting
*/
class I2_REMOTING_API JsonRpc
{ {
public: public:
typedef shared_ptr<LivestatusConnection> Ptr; static void SendMessage(const Stream::Ptr& stream, const MessagePart& message);
typedef weak_ptr<LivestatusConnection> WeakPtr; static MessagePart ReadMessage(const Stream::Ptr& stream);
LivestatusConnection(const Stream::Ptr& stream); private:
JsonRpc(void);
protected:
std::vector<String> m_Lines;
virtual void ProcessData(void);
}; };
} }
#endif /* LIVESTATUSCONNECTION_H */ #endif /* JSONRPC_H */

View File

@ -39,8 +39,8 @@ class I2_REMOTING_API MessagePart
{ {
public: public:
MessagePart(void); MessagePart(void);
MessagePart(const MessagePart& message);
explicit MessagePart(const Dictionary::Ptr& dictionary); explicit MessagePart(const Dictionary::Ptr& dictionary);
explicit MessagePart(const MessagePart& message);
Dictionary::Ptr GetDictionary(void) const; Dictionary::Ptr GetDictionary(void) const;