Implemented the StdioStream and UnixSocket classes.

This commit is contained in:
Gunnar Beutner 2012-11-23 11:02:34 +01:00
parent 1bf945f367
commit 334bfe388a
12 changed files with 288 additions and 29 deletions

View File

@ -41,6 +41,8 @@ libbase_la_SOURCES = \
scripttask.h \ scripttask.h \
socket.cpp \ socket.cpp \
socket.h \ socket.h \
stdiostream.cpp \
stdiostream.h \
stream.cpp \ stream.cpp \
stream.h \ stream.h \
stream_bio.cpp \ stream_bio.cpp \
@ -56,6 +58,8 @@ libbase_la_SOURCES = \
tlsstream.cpp \ tlsstream.cpp \
tlsstream.h \ tlsstream.h \
unix.h \ unix.h \
unixsocket.cpp \
unixsocket.h \
utility.cpp \ utility.cpp \
utility.h \ utility.h \
value.cpp \ value.cpp \

View File

@ -42,6 +42,7 @@
<ClCompile Include="scriptfunction.cpp" /> <ClCompile Include="scriptfunction.cpp" />
<ClCompile Include="scripttask.cpp" /> <ClCompile Include="scripttask.cpp" />
<ClCompile Include="socket.cpp" /> <ClCompile Include="socket.cpp" />
<ClCompile Include="stdiostream.cpp" />
<ClCompile Include="stream.cpp" /> <ClCompile Include="stream.cpp" />
<ClCompile Include="streamlogger.cpp" /> <ClCompile Include="streamlogger.cpp" />
<ClCompile Include="stream_bio.cpp" /> <ClCompile Include="stream_bio.cpp" />
@ -49,6 +50,7 @@
<ClCompile Include="tcpsocket.cpp" /> <ClCompile Include="tcpsocket.cpp" />
<ClCompile Include="timer.cpp" /> <ClCompile Include="timer.cpp" />
<ClCompile Include="tlsstream.cpp" /> <ClCompile Include="tlsstream.cpp" />
<ClCompile Include="unixsocket.cpp" />
<ClCompile Include="utility.cpp" /> <ClCompile Include="utility.cpp" />
<ClCompile Include="value.cpp" /> <ClCompile Include="value.cpp" />
</ItemGroup> </ItemGroup>
@ -61,6 +63,7 @@
<ClInclude Include="dictionary.h" /> <ClInclude Include="dictionary.h" />
<ClInclude Include="event.h" /> <ClInclude Include="event.h" />
<ClInclude Include="fifo.h" /> <ClInclude Include="fifo.h" />
<ClInclude Include="stdiostream.h" />
<ClInclude Include="stream.h" /> <ClInclude Include="stream.h" />
<ClInclude Include="netstring.h" /> <ClInclude Include="netstring.h" />
<ClInclude Include="qstring.h" /> <ClInclude Include="qstring.h" />
@ -80,6 +83,7 @@
<ClInclude Include="timer.h" /> <ClInclude Include="timer.h" />
<ClInclude Include="tlsstream.h" /> <ClInclude Include="tlsstream.h" />
<ClInclude Include="unix.h" /> <ClInclude Include="unix.h" />
<ClInclude Include="unixsocket.h" />
<ClInclude Include="utility.h" /> <ClInclude Include="utility.h" />
<ClInclude Include="value.h" /> <ClInclude Include="value.h" />
<ClInclude Include="win32.h" /> <ClInclude Include="win32.h" />

View File

@ -82,6 +82,12 @@
<ClCompile Include="connection.cpp"> <ClCompile Include="connection.cpp">
<Filter>Quelldateien</Filter> <Filter>Quelldateien</Filter>
</ClCompile> </ClCompile>
<ClCompile Include="unixsocket.cpp">
<Filter>Quelldateien</Filter>
</ClCompile>
<ClCompile Include="stdiostream.cpp">
<Filter>Quelldateien</Filter>
</ClCompile>
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>
<ClInclude Include="application.h"> <ClInclude Include="application.h">
@ -174,6 +180,12 @@
<ClInclude Include="connection.h"> <ClInclude Include="connection.h">
<Filter>Headerdateien</Filter> <Filter>Headerdateien</Filter>
</ClInclude> </ClInclude>
<ClInclude Include="unixsocket.h">
<Filter>Headerdateien</Filter>
</ClInclude>
<ClInclude Include="stdiostream.h">
<Filter>Headerdateien</Filter>
</ClInclude>
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>
<Filter Include="Quelldateien"> <Filter Include="Quelldateien">

View File

@ -361,14 +361,14 @@ void DynamicObject::DumpObjects(const String& filename)
String tempFilename = filename + ".tmp"; String tempFilename = filename + ".tmp";
ofstream fp; fstream fp;
fp.open(tempFilename.CStr()); fp.open(tempFilename.CStr(), std::ios_base::out);
if (!fp) if (!fp)
throw_exception(runtime_error("Could not open '" + filename + "' file")); throw_exception(runtime_error("Could not open '" + filename + "' file"));
FIFO::Ptr fifo = boost::make_shared<FIFO>(); StdioStream::Ptr sfp = boost::make_shared<StdioStream>(&fp, false);
fifo->Start(); sfp->Start();
DynamicObject::TypeMap::iterator tt; DynamicObject::TypeMap::iterator tt;
for (tt = GetAllObjects().begin(); tt != GetAllObjects().end(); tt++) { for (tt = GetAllObjects().begin(); tt != GetAllObjects().end(); tt++) {
@ -402,22 +402,22 @@ void DynamicObject::DumpObjects(const String& filename)
String json = value.Serialize(); String json = value.Serialize();
/* This is quite ugly, unfortunatelly NetString requires an IOQueue object */ /* This is quite ugly, unfortunatelly NetString requires an IOQueue object */
NetString::WriteStringToStream(fifo, json); NetString::WriteStringToStream(sfp, json);
size_t count; size_t count;
while ((count = fifo->GetAvailableBytes()) > 0) { while ((count = sfp->GetAvailableBytes()) > 0) {
char buffer[1024]; char buffer[1024];
if (count > sizeof(buffer)) if (count > sizeof(buffer))
count = sizeof(buffer); count = sizeof(buffer);
fifo->Read(buffer, count); sfp->Read(buffer, count);
fp.write(buffer, count); fp.write(buffer, count);
} }
} }
} }
fifo->Close(); sfp->Close();
fp.close(); fp.close();
@ -433,23 +433,14 @@ void DynamicObject::RestoreObjects(const String& filename)
{ {
Logger::Write(LogInformation, "base", "Restoring program state from file '" + filename + "'"); Logger::Write(LogInformation, "base", "Restoring program state from file '" + filename + "'");
std::ifstream fp; std::fstream fp;
fp.open(filename.CStr()); fp.open(filename.CStr(), std::ios_base::in);
/* TODO: Fix this horrible mess by implementing a class that provides StdioStream::Ptr sfp = boost::make_shared<StdioStream>(&fp, false);
* IOQueue functionality for files. */ sfp->Start();
FIFO::Ptr fifo = boost::make_shared<FIFO>();
fifo->Start();
while (fp) {
char buffer[1024];
fp.read(buffer, sizeof(buffer));
fifo->Write(buffer, fp.gcount());
}
String message; String message;
while (NetString::ReadStringFromStream(fifo, &message)) { while (NetString::ReadStringFromStream(sfp, &message)) {
Dictionary::Ptr persistentObject = Value::Deserialize(message); Dictionary::Ptr persistentObject = Value::Deserialize(message);
String type = persistentObject->Get("type"); String type = persistentObject->Get("type");
@ -468,7 +459,7 @@ void DynamicObject::RestoreObjects(const String& filename)
} }
} }
fifo->Close(); sfp->Close();
} }
void DynamicObject::DeactivateObjects(void) void DynamicObject::DeactivateObjects(void)

View File

@ -103,8 +103,10 @@ using std::make_pair;
using std::stringstream; using std::stringstream;
using std::istream; using std::istream;
using std::ostream; using std::ostream;
using std::fstream;
using std::ifstream; using std::ifstream;
using std::ofstream; using std::ofstream;
using std::iostream;
using std::exception; using std::exception;
using std::bad_alloc; using std::bad_alloc;
@ -180,8 +182,10 @@ namespace tuples = boost::tuples;
#include "connection.h" #include "connection.h"
#include "netstring.h" #include "netstring.h"
#include "fifo.h" #include "fifo.h"
#include "stdiostream.h"
#include "socket.h" #include "socket.h"
#include "tcpsocket.h" #include "tcpsocket.h"
#include "unixsocket.h"
#include "tlsstream.h" #include "tlsstream.h"
#include "asynctask.h" #include "asynctask.h"
#include "process.h" #include "process.h"

View File

@ -47,7 +47,12 @@ bool NetString::ReadStringFromStream(const Stream::Ptr& stream, String *str)
if (buffer == NULL && buffer_length > 0) if (buffer == NULL && buffer_length > 0)
throw_exception(bad_alloc()); throw_exception(bad_alloc());
stream->Peek(buffer, buffer_length); buffer_length = stream->Peek(buffer, buffer_length);
if (buffer_length < 3) {
free(buffer);
return false;
}
/* no leading zeros allowed */ /* no leading zeros allowed */
if (buffer[0] == '0' && isdigit(buffer[1])) { if (buffer[0] == '0' && isdigit(buffer[1])) {

View File

@ -44,10 +44,10 @@ public:
bool IsConnected(void) const; bool IsConnected(void) const;
size_t GetAvailableBytes(void) const; virtual size_t GetAvailableBytes(void) const;
size_t Read(void *buffer, size_t size); virtual size_t Read(void *buffer, size_t size);
size_t Peek(void *buffer, size_t size); virtual size_t Peek(void *buffer, size_t size);
void Write(const void *buffer, size_t size); virtual void Write(const void *buffer, size_t size);
void Listen(void); void Listen(void);

89
lib/base/stdiostream.cpp Normal file
View File

@ -0,0 +1,89 @@
/******************************************************************************
* Icinga 2 *
* Copyright (C) 2012 Icinga Development Team (http://www.icinga.org/) *
* *
* This program is free software; you can redistribute it and/or *
* modify it under the terms of the GNU General Public License *
* as published by the Free Software Foundation; either version 2 *
* of the License, or (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program; if not, write to the Free Software Foundation *
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. *
******************************************************************************/
#include "i2-base.h"
using namespace icinga;
StdioStream::StdioStream(iostream *innerStream, bool ownsStream)
: m_InnerStream(innerStream), m_OwnsStream(ownsStream),
m_ReadAheadBuffer(boost::make_shared<FIFO>())
{
m_ReadAheadBuffer->Start();
}
StdioStream::~StdioStream(void)
{
m_ReadAheadBuffer->Close();
}
void StdioStream::Start(void)
{
SetConnected(true);
Stream::Start();
}
size_t StdioStream::GetAvailableBytes(void) const
{
if (m_InnerStream->eof() && m_ReadAheadBuffer->GetAvailableBytes() == 0)
return 0;
else
return 1024; /* doesn't have to be accurate */
}
size_t StdioStream::Read(void *buffer, size_t size)
{
size_t peek_len, read_len;
peek_len = m_ReadAheadBuffer->GetAvailableBytes();
peek_len = m_ReadAheadBuffer->Read(buffer, peek_len);
m_InnerStream->read(static_cast<char *>(buffer) + peek_len, size - peek_len);
read_len = m_InnerStream->gcount();
return peek_len + read_len;
}
size_t StdioStream::Peek(void *buffer, size_t size)
{
size_t peek_len, read_len;
peek_len = m_ReadAheadBuffer->GetAvailableBytes();
peek_len = m_ReadAheadBuffer->Peek(buffer, peek_len);
m_InnerStream->read(static_cast<char *>(buffer) + peek_len, size - peek_len);
read_len = m_InnerStream->gcount();
m_ReadAheadBuffer->Write(static_cast<char *>(buffer) + peek_len, read_len);
return peek_len + read_len;
}
void StdioStream::Write(const void *buffer, size_t size)
{
m_InnerStream->write(static_cast<const char *>(buffer), size);
}
void StdioStream::Close(void)
{
if (m_OwnsStream)
delete *m_InnerStream;
Stream::Close();
}

51
lib/base/stdiostream.h Normal file
View File

@ -0,0 +1,51 @@
/******************************************************************************
* Icinga 2 *
* Copyright (C) 2012 Icinga Development Team (http://www.icinga.org/) *
* *
* This program is free software; you can redistribute it and/or *
* modify it under the terms of the GNU General Public License *
* as published by the Free Software Foundation; either version 2 *
* of the License, or (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program; if not, write to the Free Software Foundation *
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. *
******************************************************************************/
#ifndef STDIOSTREAM_H
#define STDIOSTREAM_H
namespace icinga {
class StdioStream : public Stream
{
public:
typedef shared_ptr<StdioStream> Ptr;
typedef weak_ptr<StdioStream> WeakPtr;
StdioStream(iostream *innerStream, bool ownsStream);
~StdioStream(void);
virtual void Start(void);
virtual size_t GetAvailableBytes(void) const;
virtual size_t Read(void *buffer, size_t size);
virtual size_t Peek(void *buffer, size_t size);
virtual void Write(const void *buffer, size_t size);
virtual void Close(void);
private:
iostream *m_InnerStream;
bool m_OwnsStream;
FIFO::Ptr m_ReadAheadBuffer;
};
}
#endif /* STDIOSTREAM_H */

View File

@ -51,7 +51,7 @@ void StreamLogger::OpenFile(const String& filename)
ofstream *stream = new ofstream(); ofstream *stream = new ofstream();
try { try {
stream->open(filename.CStr(), ofstream::out | ofstream::trunc); stream->open(filename.CStr(), fstream::out | fstream::trunc);
if (!stream->good()) if (!stream->good())
throw_exception(runtime_error("Could not open logfile '" + filename + "'")); throw_exception(runtime_error("Could not open logfile '" + filename + "'"));

58
lib/base/unixsocket.cpp Normal file
View File

@ -0,0 +1,58 @@
/******************************************************************************
* Icinga 2 *
* Copyright (C) 2012 Icinga Development Team (http://www.icinga.org/) *
* *
* This program is free software; you can redistribute it and/or *
* modify it under the terms of the GNU General Public License *
* as published by the Free Software Foundation; either version 2 *
* of the License, or (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program; if not, write to the Free Software Foundation *
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. *
******************************************************************************/
#include "i2-base.h"
#ifndef _WIN32
using namespace icinga;
UnixSocket::UnixSocket(void)
{
int fd = socket(AF_UNIX, SOCK_STREAM, PF_UNIX);
if (fd < 0)
throw_exception(PosixException("socket() failed", errno));
SetFD(fd);
}
void UnixSocket::Bind(const String& path)
{
unlink(path.CStr());
sockaddr_un sun = {};
sun.sun_family = AF_UNIX;
strncpy(sun.sun_path, path.CStr(), sizeof(sun.sun_path));
sun.sun_path[sizeof(sun.sun_path) - 1] = '\0';
if (bind(GetFD(), (sockaddr *)&sun, SUN_LEN(sun)) < 0)
throw_exception(PosixException("bind() failed", errno);
}
void UnixSocket::Connect(const String& path)
{
sockaddr_un sun = {};
sun.sun_family = AF_UNIX;
strncpy(sun.sun_path, path.CStr(), sizeof(sun.sun_path));
sun.sun_path[sizeof(sun.sun_path) - 1] = '\0';
if (connect(GetFD(), (sockaddr *)&sun, SUN_LEN(sun)) < 0 && errno != EINPROGRESS)
throw_exception(PosixException("connect() failed", errno);
}
#endif /* _WIN32 */

41
lib/base/unixsocket.h Normal file
View File

@ -0,0 +1,41 @@
/******************************************************************************
* Icinga 2 *
* Copyright (C) 2012 Icinga Development Team (http://www.icinga.org/) *
* *
* This program is free software; you can redistribute it and/or *
* modify it under the terms of the GNU General Public License *
* as published by the Free Software Foundation; either version 2 *
* of the License, or (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program; if not, write to the Free Software Foundation *
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. *
******************************************************************************/
#ifndef UNIXSOCKET_H
#define UNIXSOCKET_H
#ifndef _WIN32
namespace icinga
{
class UnixSocket : public Socket
{
public:
typedef shared_ptr<UnixSocket> Ptr;
typedef weak_ptr<UnixSocket> WeakPtr;
void Bind(const String& path);
void Connect(const String& path);
};
}
#endif /* _WIN32 */
#endif /* UNIXSOCKET_H */