icinga2/lib/base/stream.cpp

150 lines
3.2 KiB
C++
Raw Normal View History

/* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */
2014-05-25 16:23:35 +02:00
#include "base/stream.hpp"
2013-03-15 18:21:29 +01:00
#include <boost/algorithm/string/trim.hpp>
2021-02-02 10:16:04 +01:00
#include <chrono>
2012-03-28 13:24:49 +02:00
using namespace icinga;
/**
 * Connects a callback to the stream's OnDataAvailable signal.
 *
 * @param handler Callable that is connected to OnDataAvailable.
 * @throws std::runtime_error if SupportsWaiting() returns false.
 */
void Stream::RegisterDataHandler(const std::function<void(const Stream::Ptr&)>& handler)
{
	/* Reject the registration up front when this stream cannot be waited on. */
	if (!SupportsWaiting()) {
		BOOST_THROW_EXCEPTION(std::runtime_error("Stream does not support waiting."));
	}

	OnDataAvailable.connect(handler);
}
/**
 * Reports whether callers may wait for data on this stream.
 *
 * @returns Always false in this implementation.
 */
bool Stream::SupportsWaiting() const
{
	return false;
}
/**
 * Reports whether data is currently available on this stream.
 *
 * @returns Always false in this implementation.
 */
bool Stream::IsDataAvailable() const
{
	return false;
}
void Stream::Shutdown()
2015-06-22 11:11:21 +02:00
{
BOOST_THROW_EXCEPTION(std::runtime_error("Stream does not support Shutdown()."));
}
/**
 * Peeks at data on the stream.
 *
 * This implementation does not support the operation: none of the
 * arguments are inspected and the call always throws.
 *
 * @param buffer Unused in this implementation.
 * @param count Unused in this implementation.
 * @param allow_partial Unused in this implementation.
 * @throws std::runtime_error unconditionally.
 */
size_t Stream::Peek(void *buffer, size_t count, bool allow_partial)
{
	BOOST_THROW_EXCEPTION(std::runtime_error("Stream does not support Peek()."));
}
void Stream::SignalDataAvailable()
{
OnDataAvailable(this);
{
2021-02-02 10:16:04 +01:00
std::unique_lock<std::mutex> lock(m_Mutex);
m_CV.notify_all();
}
}
2018-02-13 17:29:48 +01:00
bool Stream::WaitForData()
{
if (!SupportsWaiting())
BOOST_THROW_EXCEPTION(std::runtime_error("Stream does not support waiting."));
2013-03-10 05:10:51 +01:00
2021-02-02 10:16:04 +01:00
std::unique_lock<std::mutex> lock(m_Mutex);
2015-06-22 11:11:21 +02:00
while (!IsDataAvailable() && !IsEof())
2018-02-13 17:29:48 +01:00
m_CV.wait(lock);
return IsDataAvailable() || IsEof();
}
bool Stream::WaitForData(int timeout)
{
2021-02-02 10:16:04 +01:00
namespace ch = std::chrono;
2018-02-13 17:29:48 +01:00
if (!SupportsWaiting())
BOOST_THROW_EXCEPTION(std::runtime_error("Stream does not support waiting."));
if (timeout < 0)
BOOST_THROW_EXCEPTION(std::runtime_error("Timeout can't be negative"));
2021-02-02 10:16:04 +01:00
std::unique_lock<std::mutex> lock(m_Mutex);
2018-02-13 17:29:48 +01:00
2021-02-02 10:16:04 +01:00
return m_CV.wait_for(lock, ch::duration<int>(timeout), [this]() { return IsDataAvailable() || IsEof(); });
}
2013-03-10 05:10:51 +01:00
/**
 * Closes the stream by detaching every handler from OnDataAvailable.
 */
void Stream::Close()
{
	OnDataAvailable.disconnect_all_slots();

	/* signals2 does not drop disconnected slots right away. Connecting a
	 * throw-away no-op slot forces it to remove them, see
	 * https://stackoverflow.com/questions/2049291/force-deletion-of-slot-in-boostsignals2
	 * for details. */
	const auto noop = [](const Stream::Ptr&) { };
	OnDataAvailable.connect(noop);
}
/**
 * Extracts the next line from the stream, using context as the read buffer.
 *
 * A line ends at '\n'; trailing whitespace is stripped with
 * boost::algorithm::trim_right() before the line is handed out.
 *
 * @param line Receives the extracted line when StatusNewItem is returned.
 * @param context Buffer and state (Eof, MustRead) carried between calls.
 * @param may_wait Passed on to StreamReadContext::FillFromStream().
 * @returns StatusEof if context.Eof was already set; StatusNewItem if a line
 *          was stored in *line; StatusNeedData if the buffer holds no
 *          complete line yet.
 */
StreamReadStatus Stream::ReadLine(String *line, StreamReadContext& context, bool may_wait)
{
	if (context.Eof)
		return StatusEof;

	if (context.MustRead && !context.FillFromStream(this, may_wait)) {
		/* Nothing more could be read: emit whatever is still buffered as the
		 * final line and remember that the end was reached. */
		context.Eof = true;

		*line = String(context.Buffer, context.Buffer + context.Size);
		boost::algorithm::trim_right(*line);

		return StatusNewItem;
	}

	for (size_t pos = 0; pos < context.Size; ++pos) {
		if (context.Buffer[pos] != '\n')
			continue;

		*line = String(context.Buffer, context.Buffer + pos);
		boost::algorithm::trim_right(*line);

		/* Consume the line together with its terminating newline. */
		context.DropData(pos + 1u);

		/* Only go back to the stream once the buffer has been drained. */
		context.MustRead = (context.Size == 0);

		return StatusNewItem;
	}

	/* No newline buffered yet; the next call has to read more data first. */
	context.MustRead = true;

	return StatusNeedData;
}
/**
 * Appends data read from the stream to this context's buffer.
 *
 * The buffer is grown in 4096-byte steps. Reading continues while the stream
 * reports available data and fewer than 64 KiB have been read by this call.
 *
 * @param stream The stream to read from.
 * @param may_wait If true and the stream supports waiting, WaitForData() is
 *                 called before reading.
 * @returns false if nothing was read and the stream is at EOF, true otherwise.
 * @throws std::bad_alloc if the buffer cannot be enlarged. Buffer and Size
 *         keep their previous values in that case.
 */
bool StreamReadContext::FillFromStream(const Stream::Ptr& stream, bool may_wait)
{
	constexpr size_t chunkSize = 4096;
	constexpr size_t maxBytesPerCall = 64 * 1024;

	if (may_wait && stream->SupportsWaiting())
		stream->WaitForData();

	size_t count = 0;

	do {
		/* realloc() leaves the old block untouched on failure. Go through a
		 * temporary so that Buffer is not overwritten with a null pointer,
		 * which would lose the old block and leave Size out of sync. */
		char *grown = static_cast<char *>(realloc(Buffer, Size + chunkSize));

		if (!grown)
			throw std::bad_alloc();

		Buffer = grown;

		if (stream->IsEof())
			break;

		size_t rc = stream->Read(Buffer + Size, chunkSize, true);

		Size += rc;
		count += rc;
	} while (count < maxBytesPerCall && stream->IsDataAvailable());

	return !(count == 0 && stream->IsEof());
}
void StreamReadContext::DropData(size_t count)
{
2015-06-22 11:11:21 +02:00
ASSERT(count <= Size);
memmove(Buffer, Buffer + count, Size - count);
Size -= count;
}