diff --git a/base/Makefile.am b/base/Makefile.am
index 824f5a61d..2e0a786b5 100644
--- a/base/Makefile.am
+++ b/base/Makefile.am
@@ -21,6 +21,7 @@ libbase_la_SOURCES = \
fifo.cpp \
fifo.h \
i2-base.h \
+ ioqueue.h \
logger.cpp \
logger.h \
object.cpp \
diff --git a/base/asynctask.h b/base/asynctask.h
index b674f94cf..47b57dabf 100644
--- a/base/asynctask.h
+++ b/base/asynctask.h
@@ -69,7 +69,7 @@ public:
try {
Run();
- } catch (const exception& ex) {
+ } catch (const exception&) {
FinishException(boost::current_exception());
}
}
diff --git a/base/base.vcxproj b/base/base.vcxproj
index 216a75ff4..72cfeb18c 100644
--- a/base/base.vcxproj
+++ b/base/base.vcxproj
@@ -51,6 +51,7 @@
+
diff --git a/base/base.vcxproj.filters b/base/base.vcxproj.filters
index 4d4526411..fa329bab1 100644
--- a/base/base.vcxproj.filters
+++ b/base/base.vcxproj.filters
@@ -180,6 +180,9 @@
Headerdateien
+
+ Headerdateien
+
diff --git a/base/fifo.cpp b/base/fifo.cpp
index 1a5caa65a..e71c2f4e1 100644
--- a/base/fifo.cpp
+++ b/base/fifo.cpp
@@ -93,11 +93,9 @@ void FIFO::Optimize(void)
}
/**
- * Returns the number of bytes that are contained in the FIFO.
- *
- * @returns The number of bytes.
+ * Implements IOQueue::GetAvailableBytes().
*/
-size_t FIFO::GetSize(void) const
+size_t FIFO::GetAvailableBytes(void) const
{
return m_DataSize;
}
@@ -107,32 +105,33 @@ size_t FIFO::GetSize(void) const
*
* @returns Pointer to the read buffer.
*/
-const void *FIFO::GetReadBuffer(void) const
+/*const void *FIFO::GetReadBuffer(void) const
{
return m_Buffer + m_Offset;
-}
+}*/
/**
- * Reads data from the FIFO and places it in the specified buffer.
- *
- * @param buffer The buffer where the data should be placed (can be NULL if
- * the reader is not interested in the data).
- * @param count The number of bytes to read.
- * @returns The number of bytes read which may be less than what was requested.
+ * Implements IOQueue::Peek.
*/
-size_t FIFO::Read(void *buffer, size_t count)
+void FIFO::Peek(void *buffer, size_t count)
{
- count = (count <= m_DataSize) ? count : m_DataSize;
+ assert(m_DataSize >= count);
if (buffer != NULL)
memcpy(buffer, m_Buffer + m_Offset, count);
+}
+
+/**
+ * Implements IOQueue::Read.
+ */
+void FIFO::Read(void *buffer, size_t count)
+{
+ Peek(buffer, count);
m_DataSize -= count;
m_Offset += count;
Optimize();
-
- return count;
}
/**
@@ -142,31 +141,20 @@ size_t FIFO::Read(void *buffer, size_t count)
* contains the actual size of the available buffer which can
* be larger than the requested size.
*/
-void *FIFO::GetWriteBuffer(size_t *count)
+/*void *FIFO::GetWriteBuffer(size_t *count)
{
ResizeBuffer(m_Offset + m_DataSize + *count);
*count = m_AllocSize - m_Offset - m_DataSize;
return m_Buffer + m_Offset + m_DataSize;
-}
+}*/
/**
- * Writes data to the FIFO.
- *
- * @param buffer The data that is to be written (can be NULL if the writer has
- * already filled the write buffer, e.g. via GetWriteBuffer()).
- * @param count The number of bytes to write.
- * @returns The number of bytes written
+ * Implements IOQueue::Write.
*/
-size_t FIFO::Write(const void *buffer, size_t count)
+void FIFO::Write(const void *buffer, size_t count)
{
- if (buffer != NULL) {
- size_t bufferSize = count;
- void *target_buffer = GetWriteBuffer(&bufferSize);
- memcpy(target_buffer, buffer, count);
- }
-
+ ResizeBuffer(m_Offset + m_DataSize + count);
+ memcpy(m_Buffer + m_Offset + m_DataSize, buffer, count);
m_DataSize += count;
-
- return count;
}
diff --git a/base/fifo.h b/base/fifo.h
index 485eaa2c1..f472833e9 100644
--- a/base/fifo.h
+++ b/base/fifo.h
@@ -28,7 +28,7 @@ namespace icinga
*
* @ingroup base
*/
-class I2_BASE_API FIFO : public Object
+class I2_BASE_API FIFO : public Object, public IOQueue
{
public:
static const size_t BlockSize = 16 * 1024;
@@ -39,13 +39,13 @@ public:
FIFO(void);
~FIFO(void);
- size_t GetSize(void) const;
+ /*const void *GetReadBuffer(void) const;
+ void *GetWriteBuffer(size_t *count);*/
- const void *GetReadBuffer(void) const;
- void *GetWriteBuffer(size_t *count);
-
- size_t Read(void *buffer, size_t count);
- size_t Write(const void *buffer, size_t count);
+ virtual size_t GetAvailableBytes(void) const;
+ virtual void Peek(void *buffer, size_t count);
+ virtual void Read(void *buffer, size_t count);
+ virtual void Write(const void *buffer, size_t count);
private:
char *m_Buffer;
diff --git a/base/i2-base.h b/base/i2-base.h
index 0d46d028c..e953777cf 100644
--- a/base/i2-base.h
+++ b/base/i2-base.h
@@ -161,6 +161,7 @@ using boost::system_time;
#include "dictionary.h"
#include "ringbuffer.h"
#include "timer.h"
+#include "ioqueue.h"
#include "fifo.h"
#include "socket.h"
#include "tcpsocket.h"
diff --git a/base/ioqueue.h b/base/ioqueue.h
new file mode 100644
index 000000000..11fd03035
--- /dev/null
+++ b/base/ioqueue.h
@@ -0,0 +1,54 @@
+#ifndef IOQUEUE_H
+#define IOQUEUE_H
+
+namespace icinga
+{
+
+/**
+ * An I/O queue.
+ */
+class IOQueue
+{
+public:
+ /**
+ * Retrieves the number of bytes available for reading.
+ *
+ * @returns The number of available bytes.
+ */
+ virtual size_t GetAvailableBytes(void) const = 0;
+
+ /**
+ * Reads data from the queue without advancing the read pointer. Trying
+ * to read more data than is available in the queue is a programming error.
+ * Use GetBytesAvailable() to check how much data is available.
+ *
+ * @buffer The buffer where data should be stored. May be NULL if you're
+ * not actually interested in the data.
+ * @param count The number of bytes to read from the queue.
+ */
+ virtual void Peek(void *buffer, size_t count) = 0;
+
+ /**
+ * Reads data from the queue. Trying to read more data than is
+ * available in the queue is a programming error. Use GetBytesAvailable()
+ * to check how much data is available.
+ *
+ * @param buffer The buffer where data should be stored. May be NULL if you're
+ * not actually interested in the data.
+ * @param count The number of bytes to read from the queue.
+ */
+ virtual void Read(void *buffer, size_t count) = 0;
+
+ /**
+ * Writes data to the queue.
+ *
+ * @param buffer The data that is to be written.
+ * @param count The number of bytes to write.
+ * @returns The number of bytes written
+ */
+ virtual void Write(const void *buffer, size_t count) = 0;
+};
+
+}
+
+#endif /* IOQUEUE_H */
\ No newline at end of file
diff --git a/base/tcpclient.cpp b/base/tcpclient.cpp
index 3f95783e3..fcfdb233d 100644
--- a/base/tcpclient.cpp
+++ b/base/tcpclient.cpp
@@ -103,46 +103,79 @@ void TcpClient::Connect(const string& node, const string& service)
"Could not create a suitable socket."));
}
-/**
- * Retrieves the send queue for the socket.
- *
- * @returns The send queue.
- */
-FIFO::Ptr TcpClient::GetSendQueue(void)
-{
- return m_SendQueue;
-}
-
void TcpClient::HandleWritable(void)
{
int rc;
+ char data[1024];
+ size_t count;
- rc = send(GetFD(), (const char *)m_SendQueue->GetReadBuffer(), m_SendQueue->GetSize(), 0);
+ for (;;) {
+ count = m_SendQueue->GetAvailableBytes();
- if (rc <= 0) {
- HandleSocketError(SocketException("send() failed", GetError()));
- return;
+ if (count == 0)
+ break;
+
+ if (count > sizeof(data))
+ count = sizeof(data);
+
+ m_SendQueue->Peek(data, count);
+
+ rc = send(GetFD(), (const char *)data, count, 0);
+
+ if (rc <= 0) {
+ HandleSocketError(SocketException("send() failed", GetError()));
+ return;
+ }
+
+ m_SendQueue->Read(NULL, rc);
}
-
- m_SendQueue->Read(NULL, rc);
}
/**
- * Retrieves the recv queue for the socket.
- *
- * @returns The recv queue.
+ * Implements IOQueue::GetAvailableBytes.
*/
-FIFO::Ptr TcpClient::GetRecvQueue(void)
+size_t TcpClient::GetAvailableBytes(void) const
{
- return m_RecvQueue;
+ mutex::scoped_lock lock(GetMutex());
+
+ return m_RecvQueue->GetAvailableBytes();
+}
+
+/**
+ * Implements IOQueue::Peek.
+ */
+void TcpClient::Peek(void *buffer, size_t count)
+{
+ mutex::scoped_lock lock(GetMutex());
+
+ m_RecvQueue->Peek(buffer, count);
+}
+
+/**
+ * Implements IOQueue::Read.
+ */
+void TcpClient::Read(void *buffer, size_t count)
+{
+ mutex::scoped_lock lock(GetMutex());
+
+ m_RecvQueue->Read(buffer, count);
+}
+
+/**
+ * Implements IOQueue::Write.
+ */
+void TcpClient::Write(const void *buffer, size_t count)
+{
+ mutex::scoped_lock lock(GetMutex());
+
+ m_SendQueue->Write(buffer, count);
}
void TcpClient::HandleReadable(void)
{
for (;;) {
- size_t bufferSize = FIFO::BlockSize / 2;
- char *buffer = (char *)m_RecvQueue->GetWriteBuffer(&bufferSize);
- int rc = recv(GetFD(), buffer, bufferSize, 0);
+ char data[1024];
+ int rc = recv(GetFD(), data, sizeof(data), 0);
#ifdef _WIN32
if (rc < 0 && WSAGetLastError() == WSAEWOULDBLOCK)
@@ -156,7 +189,7 @@ void TcpClient::HandleReadable(void)
return;
}
- m_RecvQueue->Write(NULL, rc);
+ m_RecvQueue->Write(data, rc);
}
Event::Post(boost::bind(boost::ref(OnDataAvailable), GetSelf()));
@@ -179,7 +212,7 @@ bool TcpClient::WantsToRead(void) const
*/
bool TcpClient::WantsToWrite(void) const
{
- return (m_SendQueue->GetSize() > 0);
+ return (m_SendQueue->GetAvailableBytes() > 0);
}
/**
diff --git a/base/tcpclient.h b/base/tcpclient.h
index 479d61734..32e918d25 100644
--- a/base/tcpclient.h
+++ b/base/tcpclient.h
@@ -41,7 +41,7 @@ enum TcpClientRole
*
* @ingroup base
*/
-class I2_BASE_API TcpClient : public TcpSocket
+class I2_BASE_API TcpClient : public TcpSocket, public IOQueue
{
public:
typedef shared_ptr Ptr;
@@ -53,11 +53,13 @@ public:
void Connect(const string& node, const string& service);
- FIFO::Ptr GetSendQueue(void);
- FIFO::Ptr GetRecvQueue(void);
-
boost::signal OnDataAvailable;
+ virtual size_t GetAvailableBytes(void) const;
+ virtual void Peek(void *buffer, size_t count);
+ virtual void Read(void *buffer, size_t count);
+ virtual void Write(const void *buffer, size_t count);
+
protected:
virtual bool WantsToRead(void) const;
virtual bool WantsToWrite(void) const;
@@ -65,11 +67,11 @@ protected:
virtual void HandleReadable(void);
virtual void HandleWritable(void);
-private:
- TcpClientRole m_Role;
-
FIFO::Ptr m_SendQueue;
FIFO::Ptr m_RecvQueue;
+
+private:
+ TcpClientRole m_Role;
};
/**
diff --git a/base/tlsclient.cpp b/base/tlsclient.cpp
index 7ddb76ce0..593bfed85 100644
--- a/base/tlsclient.cpp
+++ b/base/tlsclient.cpp
@@ -117,9 +117,8 @@ void TlsClient::HandleReadable(void)
result = 0;
for (;;) {
- size_t bufferSize = FIFO::BlockSize / 2;
- char *buffer = (char *)GetRecvQueue()->GetWriteBuffer(&bufferSize);
- int rc = SSL_read(m_SSL.get(), buffer, bufferSize);
+ char data[1024];
+ int rc = SSL_read(m_SSL.get(), data, sizeof(data));
if (rc <= 0) {
switch (SSL_get_error(m_SSL.get(), rc)) {
@@ -138,7 +137,7 @@ void TlsClient::HandleReadable(void)
}
}
- GetRecvQueue()->Write(NULL, rc);
+ m_RecvQueue->Write(data, rc);
}
post_event:
@@ -153,26 +152,41 @@ void TlsClient::HandleWritable(void)
m_BlockRead = false;
m_BlockWrite = false;
- int rc = SSL_write(m_SSL.get(), (const char *)GetSendQueue()->GetReadBuffer(), GetSendQueue()->GetSize());
+ char data[1024];
+ size_t count;
- if (rc <= 0) {
- switch (SSL_get_error(m_SSL.get(), rc)) {
- case SSL_ERROR_WANT_READ:
- m_BlockWrite = true;
- /* fall through */
- case SSL_ERROR_WANT_WRITE:
- return;
- case SSL_ERROR_ZERO_RETURN:
- CloseInternal(false);
- return;
- default:
- HandleSocketError(OpenSSLException(
- "SSL_write failed", ERR_get_error()));
- return;
+ for (;;) {
+ count = m_SendQueue->GetAvailableBytes();
+
+ if (count == 0)
+ break;
+
+ if (count > sizeof(data))
+ count = sizeof(data);
+
+ m_SendQueue->Peek(data, count);
+
+ int rc = SSL_write(m_SSL.get(), (const char *)data, count);
+
+ if (rc <= 0) {
+ switch (SSL_get_error(m_SSL.get(), rc)) {
+ case SSL_ERROR_WANT_READ:
+ m_BlockWrite = true;
+ /* fall through */
+ case SSL_ERROR_WANT_WRITE:
+ return;
+ case SSL_ERROR_ZERO_RETURN:
+ CloseInternal(false);
+ return;
+ default:
+ HandleSocketError(OpenSSLException(
+ "SSL_write failed", ERR_get_error()));
+ return;
+ }
}
- }
- GetSendQueue()->Read(NULL, rc);
+ m_SendQueue->Read(NULL, rc);
+ }
}
/**