Cleaned up TcpClient interface.

This commit is contained in:
Gunnar Beutner 2012-07-16 00:02:31 +02:00
parent 07b30e19a9
commit 802fc15969
11 changed files with 192 additions and 95 deletions

View File

@ -21,6 +21,7 @@ libbase_la_SOURCES = \
fifo.cpp \
fifo.h \
i2-base.h \
ioqueue.h \
logger.cpp \
logger.h \
object.cpp \

View File

@ -69,7 +69,7 @@ public:
try {
Run();
} catch (const exception& ex) {
} catch (const exception&) {
FinishException(boost::current_exception());
}
}

View File

@ -51,6 +51,7 @@
<ClInclude Include="configobject.h" />
<ClInclude Include="dictionary.h" />
<ClInclude Include="event.h" />
<ClInclude Include="ioqueue.h" />
<ClInclude Include="scriptfunction.h" />
<ClInclude Include="scripttask.h" />
<ClInclude Include="logger.h" />

View File

@ -180,6 +180,9 @@
<ClInclude Include="scriptfunction.h">
<Filter>Headerdateien</Filter>
</ClInclude>
<ClInclude Include="ioqueue.h">
<Filter>Headerdateien</Filter>
</ClInclude>
</ItemGroup>
<ItemGroup>
<Filter Include="Quelldateien">

View File

@ -93,11 +93,9 @@ void FIFO::Optimize(void)
}
/**
* Returns the number of bytes that are contained in the FIFO.
*
* @returns The number of bytes.
* Implements IOQueue::GetAvailableBytes().
*/
size_t FIFO::GetSize(void) const
size_t FIFO::GetAvailableBytes(void) const
{
return m_DataSize;
}
@ -107,32 +105,33 @@ size_t FIFO::GetSize(void) const
*
* @returns Pointer to the read buffer.
*/
const void *FIFO::GetReadBuffer(void) const
/*const void *FIFO::GetReadBuffer(void) const
{
return m_Buffer + m_Offset;
}
}*/
/**
* Reads data from the FIFO and places it in the specified buffer.
*
* @param buffer The buffer where the data should be placed (can be NULL if
* the reader is not interested in the data).
* @param count The number of bytes to read.
* @returns The number of bytes read which may be less than what was requested.
* Implements IOQueue::Peek.
*/
size_t FIFO::Read(void *buffer, size_t count)
void FIFO::Peek(void *buffer, size_t count)
{
count = (count <= m_DataSize) ? count : m_DataSize;
assert(m_DataSize >= count);
if (buffer != NULL)
memcpy(buffer, m_Buffer + m_Offset, count);
}
/**
* Implements IOQueue::Read.
*/
void FIFO::Read(void *buffer, size_t count)
{
Peek(buffer, count);
m_DataSize -= count;
m_Offset += count;
Optimize();
return count;
}
/**
@ -142,31 +141,20 @@ size_t FIFO::Read(void *buffer, size_t count)
* contains the actual size of the available buffer which can
* be larger than the requested size.
*/
void *FIFO::GetWriteBuffer(size_t *count)
/*void *FIFO::GetWriteBuffer(size_t *count)
{
ResizeBuffer(m_Offset + m_DataSize + *count);
*count = m_AllocSize - m_Offset - m_DataSize;
return m_Buffer + m_Offset + m_DataSize;
}
}*/
/**
* Writes data to the FIFO.
*
* @param buffer The data that is to be written (can be NULL if the writer has
* already filled the write buffer, e.g. via GetWriteBuffer()).
* @param count The number of bytes to write.
* @returns The number of bytes written
* Implements IOQueue::Write.
*/
size_t FIFO::Write(const void *buffer, size_t count)
void FIFO::Write(const void *buffer, size_t count)
{
if (buffer != NULL) {
size_t bufferSize = count;
void *target_buffer = GetWriteBuffer(&bufferSize);
memcpy(target_buffer, buffer, count);
}
ResizeBuffer(m_Offset + m_DataSize + count);
memcpy(m_Buffer + m_Offset + m_DataSize, buffer, count);
m_DataSize += count;
return count;
}

View File

@ -28,7 +28,7 @@ namespace icinga
*
* @ingroup base
*/
class I2_BASE_API FIFO : public Object
class I2_BASE_API FIFO : public Object, public IOQueue
{
public:
static const size_t BlockSize = 16 * 1024;
@ -39,13 +39,13 @@ public:
FIFO(void);
~FIFO(void);
size_t GetSize(void) const;
/*const void *GetReadBuffer(void) const;
void *GetWriteBuffer(size_t *count);*/
const void *GetReadBuffer(void) const;
void *GetWriteBuffer(size_t *count);
size_t Read(void *buffer, size_t count);
size_t Write(const void *buffer, size_t count);
virtual size_t GetAvailableBytes(void) const;
virtual void Peek(void *buffer, size_t count);
virtual void Read(void *buffer, size_t count);
virtual void Write(const void *buffer, size_t count);
private:
char *m_Buffer;

View File

@ -161,6 +161,7 @@ using boost::system_time;
#include "dictionary.h"
#include "ringbuffer.h"
#include "timer.h"
#include "ioqueue.h"
#include "fifo.h"
#include "socket.h"
#include "tcpsocket.h"

54
base/ioqueue.h Normal file
View File

@ -0,0 +1,54 @@
#ifndef IOQUEUE_H
#define IOQUEUE_H
namespace icinga
{
/**
* An I/O queue.
*/
class IOQueue
{
public:
/**
* Retrieves the number of bytes available for reading.
*
* @returns The number of available bytes.
*/
virtual size_t GetAvailableBytes(void) const = 0;
/**
* Reads data from the queue without advancing the read pointer. Trying
* to read more data than is available in the queue is a programming error.
* Use GetBytesAvailable() to check how much data is available.
*
* @buffer The buffer where data should be stored. May be NULL if you're
* not actually interested in the data.
* @param count The number of bytes to read from the queue.
*/
virtual void Peek(void *buffer, size_t count) = 0;
/**
* Reads data from the queue. Trying to read more data than is
* available in the queue is a programming error. Use GetBytesAvailable()
* to check how much data is available.
*
* @param buffer The buffer where data should be stored. May be NULL if you're
* not actually interested in the data.
* @param count The number of bytes to read from the queue.
*/
virtual void Read(void *buffer, size_t count) = 0;
/**
* Writes data to the queue.
*
* @param buffer The data that is to be written.
* @param count The number of bytes to write.
* @returns The number of bytes written
*/
virtual void Write(const void *buffer, size_t count) = 0;
};
}
#endif /* IOQUEUE_H */

View File

@ -103,46 +103,79 @@ void TcpClient::Connect(const string& node, const string& service)
"Could not create a suitable socket."));
}
/**
* Retrieves the send queue for the socket.
*
* @returns The send queue.
*/
FIFO::Ptr TcpClient::GetSendQueue(void)
{
return m_SendQueue;
}
void TcpClient::HandleWritable(void)
{
int rc;
char data[1024];
size_t count;
rc = send(GetFD(), (const char *)m_SendQueue->GetReadBuffer(), m_SendQueue->GetSize(), 0);
for (;;) {
count = m_SendQueue->GetAvailableBytes();
if (rc <= 0) {
HandleSocketError(SocketException("send() failed", GetError()));
return;
if (count == 0)
break;
if (count > sizeof(data))
count = sizeof(data);
m_SendQueue->Peek(data, count);
rc = send(GetFD(), (const char *)data, count, 0);
if (rc <= 0) {
HandleSocketError(SocketException("send() failed", GetError()));
return;
}
m_SendQueue->Read(NULL, rc);
}
m_SendQueue->Read(NULL, rc);
}
/**
* Retrieves the recv queue for the socket.
*
* @returns The recv queue.
* Implements IOQueue::GetAvailableBytes.
*/
FIFO::Ptr TcpClient::GetRecvQueue(void)
size_t TcpClient::GetAvailableBytes(void) const
{
return m_RecvQueue;
mutex::scoped_lock lock(GetMutex());
return m_RecvQueue->GetAvailableBytes();
}
/**
* Implements IOQueue::Peek.
*/
void TcpClient::Peek(void *buffer, size_t count)
{
mutex::scoped_lock lock(GetMutex());
m_RecvQueue->Peek(buffer, count);
}
/**
* Implements IOQueue::Read.
*/
void TcpClient::Read(void *buffer, size_t count)
{
mutex::scoped_lock lock(GetMutex());
m_RecvQueue->Read(buffer, count);
}
/**
* Implements IOQueue::Write.
*/
void TcpClient::Write(const void *buffer, size_t count)
{
mutex::scoped_lock lock(GetMutex());
m_SendQueue->Write(buffer, count);
}
void TcpClient::HandleReadable(void)
{
for (;;) {
size_t bufferSize = FIFO::BlockSize / 2;
char *buffer = (char *)m_RecvQueue->GetWriteBuffer(&bufferSize);
int rc = recv(GetFD(), buffer, bufferSize, 0);
char data[1024];
int rc = recv(GetFD(), data, sizeof(data), 0);
#ifdef _WIN32
if (rc < 0 && WSAGetLastError() == WSAEWOULDBLOCK)
@ -156,7 +189,7 @@ void TcpClient::HandleReadable(void)
return;
}
m_RecvQueue->Write(NULL, rc);
m_RecvQueue->Write(data, rc);
}
Event::Post(boost::bind(boost::ref(OnDataAvailable), GetSelf()));
@ -179,7 +212,7 @@ bool TcpClient::WantsToRead(void) const
*/
bool TcpClient::WantsToWrite(void) const
{
return (m_SendQueue->GetSize() > 0);
return (m_SendQueue->GetAvailableBytes() > 0);
}
/**

View File

@ -41,7 +41,7 @@ enum TcpClientRole
*
* @ingroup base
*/
class I2_BASE_API TcpClient : public TcpSocket
class I2_BASE_API TcpClient : public TcpSocket, public IOQueue
{
public:
typedef shared_ptr<TcpClient> Ptr;
@ -53,11 +53,13 @@ public:
void Connect(const string& node, const string& service);
FIFO::Ptr GetSendQueue(void);
FIFO::Ptr GetRecvQueue(void);
boost::signal<void (const TcpClient::Ptr&)> OnDataAvailable;
virtual size_t GetAvailableBytes(void) const;
virtual void Peek(void *buffer, size_t count);
virtual void Read(void *buffer, size_t count);
virtual void Write(const void *buffer, size_t count);
protected:
virtual bool WantsToRead(void) const;
virtual bool WantsToWrite(void) const;
@ -65,11 +67,11 @@ protected:
virtual void HandleReadable(void);
virtual void HandleWritable(void);
private:
TcpClientRole m_Role;
FIFO::Ptr m_SendQueue;
FIFO::Ptr m_RecvQueue;
private:
TcpClientRole m_Role;
};
/**

View File

@ -117,9 +117,8 @@ void TlsClient::HandleReadable(void)
result = 0;
for (;;) {
size_t bufferSize = FIFO::BlockSize / 2;
char *buffer = (char *)GetRecvQueue()->GetWriteBuffer(&bufferSize);
int rc = SSL_read(m_SSL.get(), buffer, bufferSize);
char data[1024];
int rc = SSL_read(m_SSL.get(), data, sizeof(data));
if (rc <= 0) {
switch (SSL_get_error(m_SSL.get(), rc)) {
@ -138,7 +137,7 @@ void TlsClient::HandleReadable(void)
}
}
GetRecvQueue()->Write(NULL, rc);
m_RecvQueue->Write(data, rc);
}
post_event:
@ -153,26 +152,41 @@ void TlsClient::HandleWritable(void)
m_BlockRead = false;
m_BlockWrite = false;
int rc = SSL_write(m_SSL.get(), (const char *)GetSendQueue()->GetReadBuffer(), GetSendQueue()->GetSize());
char data[1024];
size_t count;
if (rc <= 0) {
switch (SSL_get_error(m_SSL.get(), rc)) {
case SSL_ERROR_WANT_READ:
m_BlockWrite = true;
/* fall through */
case SSL_ERROR_WANT_WRITE:
return;
case SSL_ERROR_ZERO_RETURN:
CloseInternal(false);
return;
default:
HandleSocketError(OpenSSLException(
"SSL_write failed", ERR_get_error()));
return;
for (;;) {
count = m_SendQueue->GetAvailableBytes();
if (count == 0)
break;
if (count > sizeof(data))
count = sizeof(data);
m_SendQueue->Peek(data, count);
int rc = SSL_write(m_SSL.get(), (const char *)data, count);
if (rc <= 0) {
switch (SSL_get_error(m_SSL.get(), rc)) {
case SSL_ERROR_WANT_READ:
m_BlockWrite = true;
/* fall through */
case SSL_ERROR_WANT_WRITE:
return;
case SSL_ERROR_ZERO_RETURN:
CloseInternal(false);
return;
default:
HandleSocketError(OpenSSLException(
"SSL_write failed", ERR_get_error()));
return;
}
}
}
GetSendQueue()->Read(NULL, rc);
m_SendQueue->Read(NULL, rc);
}
}
/**