Merge pull request #7196 from Icinga/feature/network-cleanup

Cleanup old code (HTTP, Cluster)
This commit is contained in:
Michael Friedrich 2019-05-29 14:50:40 +02:00 committed by GitHub
commit 99bb7fa99c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
50 changed files with 289 additions and 2564 deletions

View File

@ -61,7 +61,6 @@ set(base_SOURCES
serializer.cpp serializer.hpp
singleton.hpp
socket.cpp socket.hpp
socketevents.cpp socketevents-epoll.cpp socketevents-poll.cpp socketevents.hpp
stacktrace.cpp stacktrace.hpp
statsfunction.hpp
stdiostream.cpp stdiostream.hpp

View File

@ -11,7 +11,7 @@ namespace icinga
{
/**
* A network stream.
* A network stream. DEPRECATED - Use Boost ASIO instead.
*
* @ingroup base
*/

View File

@ -1,189 +0,0 @@
/* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */
#include "base/socketevents.hpp"
#include "base/exception.hpp"
#include "base/logger.hpp"
#include "base/utility.hpp"
#include <boost/thread/once.hpp>
#include <map>
#ifdef __linux__
# include <sys/epoll.h>
using namespace icinga;
void SocketEventEngineEpoll::InitializeThread(int tid)
{
m_PollFDs[tid] = epoll_create(128);
Utility::SetCloExec(m_PollFDs[tid]);
SocketEventDescriptor sed;
m_Sockets[tid][m_EventFDs[tid][0]] = sed;
m_FDChanged[tid] = true;
epoll_event event;
memset(&event, 0, sizeof(event));
event.data.fd = m_EventFDs[tid][0];
event.events = EPOLLIN;
epoll_ctl(m_PollFDs[tid], EPOLL_CTL_ADD, m_EventFDs[tid][0], &event);
}
int SocketEventEngineEpoll::PollToEpoll(int events)
{
int result = 0;
if (events & POLLIN)
result |= EPOLLIN;
if (events & POLLOUT)
result |= EPOLLOUT;
return events;
}
int SocketEventEngineEpoll::EpollToPoll(int events)
{
int result = 0;
if (events & EPOLLIN)
result |= POLLIN;
if (events & EPOLLOUT)
result |= POLLOUT;
return events;
}
void SocketEventEngineEpoll::ThreadProc(int tid)
{
Utility::SetThreadName("SocketIO");
for (;;) {
{
boost::mutex::scoped_lock lock(m_EventMutex[tid]);
if (m_FDChanged[tid]) {
m_FDChanged[tid] = false;
m_CV[tid].notify_all();
}
}
epoll_event pevents[64];
int ready = epoll_wait(m_PollFDs[tid], pevents, sizeof(pevents) / sizeof(pevents[0]), -1);
std::vector<EventDescription> events;
{
boost::mutex::scoped_lock lock(m_EventMutex[tid]);
if (m_FDChanged[tid]) {
m_FDChanged[tid] = false;
continue;
}
for (int i = 0; i < ready; i++) {
if (pevents[i].data.fd == m_EventFDs[tid][0]) {
char buffer[512];
if (recv(m_EventFDs[tid][0], buffer, sizeof(buffer), 0) < 0)
Log(LogCritical, "SocketEvents", "Read from event FD failed.");
continue;
}
if ((pevents[i].events & (EPOLLIN | EPOLLOUT | EPOLLHUP | EPOLLERR)) == 0)
continue;
EventDescription event;
event.REvents = SocketEventEngineEpoll::EpollToPoll(pevents[i].events);
event.Descriptor = m_Sockets[tid][pevents[i].data.fd];
events.emplace_back(std::move(event));
}
}
for (const EventDescription& event : events) {
try {
event.Descriptor.EventInterface->OnEvent(event.REvents);
} catch (const std::exception& ex) {
Log(LogCritical, "SocketEvents")
<< "Exception thrown in socket I/O handler:\n"
<< DiagnosticInformation(ex);
} catch (...) {
Log(LogCritical, "SocketEvents", "Exception of unknown type thrown in socket I/O handler.");
}
}
}
}
void SocketEventEngineEpoll::Register(SocketEvents *se)
{
int tid = se->m_ID % SOCKET_IOTHREADS;
{
boost::mutex::scoped_lock lock(m_EventMutex[tid]);
VERIFY(se->m_FD != INVALID_SOCKET);
SocketEventDescriptor desc;
desc.EventInterface = se;
VERIFY(m_Sockets[tid].find(se->m_FD) == m_Sockets[tid].end());
m_Sockets[tid][se->m_FD] = desc;
epoll_event event;
memset(&event, 0, sizeof(event));
event.data.fd = se->m_FD;
event.events = 0;
epoll_ctl(m_PollFDs[tid], EPOLL_CTL_ADD, se->m_FD, &event);
se->m_Events = true;
}
}
void SocketEventEngineEpoll::Unregister(SocketEvents *se)
{
int tid = se->m_ID % SOCKET_IOTHREADS;
{
boost::mutex::scoped_lock lock(m_EventMutex[tid]);
if (se->m_FD == INVALID_SOCKET)
return;
m_Sockets[tid].erase(se->m_FD);
m_FDChanged[tid] = true;
epoll_ctl(m_PollFDs[tid], EPOLL_CTL_DEL, se->m_FD, nullptr);
se->m_FD = INVALID_SOCKET;
se->m_Events = false;
}
WakeUpThread(tid, true);
}
void SocketEventEngineEpoll::ChangeEvents(SocketEvents *se, int events)
{
if (se->m_FD == INVALID_SOCKET)
BOOST_THROW_EXCEPTION(std::runtime_error("Tried to read/write from a closed socket."));
int tid = se->m_ID % SOCKET_IOTHREADS;
{
boost::mutex::scoped_lock lock(m_EventMutex[tid]);
auto it = m_Sockets[tid].find(se->m_FD);
if (it == m_Sockets[tid].end())
return;
epoll_event event;
memset(&event, 0, sizeof(event));
event.data.fd = se->m_FD;
event.events = SocketEventEngineEpoll::PollToEpoll(events);
epoll_ctl(m_PollFDs[tid], EPOLL_CTL_MOD, se->m_FD, &event);
}
}
#endif /* __linux__ */

View File

@ -1,190 +0,0 @@
/* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */
#include "base/socketevents.hpp"
#include "base/exception.hpp"
#include "base/logger.hpp"
#include "base/utility.hpp"
#include <boost/thread/once.hpp>
#include <map>
using namespace icinga;
void SocketEventEnginePoll::InitializeThread(int tid)
{
SocketEventDescriptor sed;
sed.Events = POLLIN;
m_Sockets[tid][m_EventFDs[tid][0]] = sed;
m_FDChanged[tid] = true;
}
void SocketEventEnginePoll::ThreadProc(int tid)
{
Utility::SetThreadName("SocketIO");
std::vector<pollfd> pfds;
std::vector<SocketEventDescriptor> descriptors;
for (;;) {
{
boost::mutex::scoped_lock lock(m_EventMutex[tid]);
if (m_FDChanged[tid]) {
pfds.resize(m_Sockets[tid].size());
descriptors.resize(m_Sockets[tid].size());
int i = 0;
typedef std::map<SOCKET, SocketEventDescriptor>::value_type kv_pair;
for (const kv_pair& desc : m_Sockets[tid]) {
if (desc.second.Events == 0)
continue;
int events = desc.second.Events;
if (desc.second.EventInterface) {
desc.second.EventInterface->m_EnginePrivate = &pfds[i];
if (!desc.second.EventInterface->m_Events)
events = 0;
}
pfds[i].fd = desc.first;
pfds[i].events = events;
descriptors[i] = desc.second;
i++;
}
pfds.resize(i);
m_FDChanged[tid] = false;
m_CV[tid].notify_all();
}
}
ASSERT(!pfds.empty());
#ifdef _WIN32
(void) WSAPoll(&pfds[0], pfds.size(), -1);
#else /* _WIN32 */
(void) poll(&pfds[0], pfds.size(), -1);
#endif /* _WIN32 */
std::vector<EventDescription> events;
{
boost::mutex::scoped_lock lock(m_EventMutex[tid]);
if (m_FDChanged[tid])
continue;
for (std::vector<pollfd>::size_type i = 0; i < pfds.size(); i++) {
if ((pfds[i].revents & (POLLIN | POLLOUT | POLLHUP | POLLERR)) == 0)
continue;
if (pfds[i].fd == m_EventFDs[tid][0]) {
char buffer[512];
if (recv(m_EventFDs[tid][0], buffer, sizeof(buffer), 0) < 0)
Log(LogCritical, "SocketEvents", "Read from event FD failed.");
continue;
}
EventDescription event;
event.REvents = pfds[i].revents;
event.Descriptor = descriptors[i];
events.emplace_back(std::move(event));
}
}
for (const EventDescription& event : events) {
try {
event.Descriptor.EventInterface->OnEvent(event.REvents);
} catch (const std::exception& ex) {
Log(LogCritical, "SocketEvents")
<< "Exception thrown in socket I/O handler:\n"
<< DiagnosticInformation(ex);
} catch (...) {
Log(LogCritical, "SocketEvents", "Exception of unknown type thrown in socket I/O handler.");
}
}
}
}
void SocketEventEnginePoll::Register(SocketEvents *se)
{
int tid = se->m_ID % SOCKET_IOTHREADS;
{
boost::mutex::scoped_lock lock(m_EventMutex[tid]);
VERIFY(se->m_FD != INVALID_SOCKET);
SocketEventDescriptor desc;
desc.Events = 0;
desc.EventInterface = se;
VERIFY(m_Sockets[tid].find(se->m_FD) == m_Sockets[tid].end());
m_Sockets[tid][se->m_FD] = desc;
m_FDChanged[tid] = true;
se->m_Events = true;
}
WakeUpThread(tid, true);
}
void SocketEventEnginePoll::Unregister(SocketEvents *se)
{
int tid = se->m_ID % SOCKET_IOTHREADS;
{
boost::mutex::scoped_lock lock(m_EventMutex[tid]);
if (se->m_FD == INVALID_SOCKET)
return;
m_Sockets[tid].erase(se->m_FD);
m_FDChanged[tid] = true;
se->m_FD = INVALID_SOCKET;
se->m_Events = false;
}
WakeUpThread(tid, true);
}
void SocketEventEnginePoll::ChangeEvents(SocketEvents *se, int events)
{
if (se->m_FD == INVALID_SOCKET)
BOOST_THROW_EXCEPTION(std::runtime_error("Tried to read/write from a closed socket."));
int tid = se->m_ID % SOCKET_IOTHREADS;
{
boost::mutex::scoped_lock lock(m_EventMutex[tid]);
auto it = m_Sockets[tid].find(se->m_FD);
if (it == m_Sockets[tid].end())
return;
if (it->second.Events == events)
return;
it->second.Events = events;
if (se->m_EnginePrivate && std::this_thread::get_id() == m_Threads[tid].get_id())
((pollfd *)se->m_EnginePrivate)->events = events;
else
m_FDChanged[tid] = true;
}
WakeUpThread(tid, false);
}

View File

@ -1,142 +0,0 @@
/* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */
#include "base/socketevents.hpp"
#include "base/exception.hpp"
#include "base/logger.hpp"
#include "base/application.hpp"
#include "base/scriptglobal.hpp"
#include "base/utility.hpp"
#include <boost/thread/once.hpp>
#include <map>
#ifdef __linux__
# include <sys/epoll.h>
#endif /* __linux__ */
using namespace icinga;
static boost::once_flag l_SocketIOOnceFlag = BOOST_ONCE_INIT;
static SocketEventEngine *l_SocketIOEngine;
int SocketEvents::m_NextID = 0;
void SocketEventEngine::Start()
{
for (int tid = 0; tid < SOCKET_IOTHREADS; tid++) {
Socket::SocketPair(m_EventFDs[tid]);
Utility::SetNonBlockingSocket(m_EventFDs[tid][0]);
Utility::SetNonBlockingSocket(m_EventFDs[tid][1]);
#ifndef _WIN32
Utility::SetCloExec(m_EventFDs[tid][0]);
Utility::SetCloExec(m_EventFDs[tid][1]);
#endif /* _WIN32 */
InitializeThread(tid);
m_Threads[tid] = std::thread(std::bind(&SocketEventEngine::ThreadProc, this, tid));
}
}
void SocketEventEngine::WakeUpThread(int sid, bool wait)
{
int tid = sid % SOCKET_IOTHREADS;
if (std::this_thread::get_id() == m_Threads[tid].get_id())
return;
if (wait) {
boost::mutex::scoped_lock lock(m_EventMutex[tid]);
m_FDChanged[tid] = true;
while (m_FDChanged[tid]) {
(void) send(m_EventFDs[tid][1], "T", 1, 0);
boost::system_time const timeout = boost::get_system_time() + boost::posix_time::milliseconds(50);
m_CV[tid].timed_wait(lock, timeout);
}
} else {
(void) send(m_EventFDs[tid][1], "T", 1, 0);
}
}
void SocketEvents::InitializeEngine()
{
String eventEngine = Configuration::EventEngine;
if (eventEngine.IsEmpty())
#ifdef __linux__
eventEngine = "epoll";
#else /* __linux__ */
eventEngine = "poll";
#endif /* __linux__ */
if (eventEngine == "poll")
l_SocketIOEngine = new SocketEventEnginePoll();
#ifdef __linux__
else if (eventEngine == "epoll")
l_SocketIOEngine = new SocketEventEngineEpoll();
#endif /* __linux__ */
else {
Log(LogWarning, "SocketEvents")
<< "Invalid event engine selected: " << eventEngine << " - Falling back to 'poll'";
eventEngine = "poll";
l_SocketIOEngine = new SocketEventEnginePoll();
}
l_SocketIOEngine->Start();
Configuration::EventEngine = eventEngine;
}
/**
* Constructor for the SocketEvents class.
*/
SocketEvents::SocketEvents(const Socket::Ptr& socket)
: m_ID(m_NextID++), m_FD(socket->GetFD()), m_EnginePrivate(nullptr)
{
boost::call_once(l_SocketIOOnceFlag, &SocketEvents::InitializeEngine);
Register();
}
SocketEvents::~SocketEvents()
{
VERIFY(m_FD == INVALID_SOCKET);
}
void SocketEvents::Register()
{
l_SocketIOEngine->Register(this);
}
void SocketEvents::Unregister()
{
l_SocketIOEngine->Unregister(this);
}
void SocketEvents::ChangeEvents(int events)
{
l_SocketIOEngine->ChangeEvents(this, events);
}
boost::mutex& SocketEventEngine::GetMutex(int tid)
{
return m_EventMutex[tid];
}
bool SocketEvents::IsHandlingEvents() const
{
int tid = m_ID % SOCKET_IOTHREADS;
boost::mutex::scoped_lock lock(l_SocketIOEngine->GetMutex(tid));
return m_Events;
}
void SocketEvents::OnEvent(int revents)
{
}

View File

@ -1,137 +0,0 @@
/* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */
#ifndef SOCKETEVENTS_H
#define SOCKETEVENTS_H
#include "base/i2-base.hpp"
#include "base/socket.hpp"
#include "base/stream.hpp"
#include <boost/thread/condition_variable.hpp>
#include <thread>
#ifndef _WIN32
# include <poll.h>
#endif /* _WIN32 */
namespace icinga
{
/**
* Socket event interface
*
* @ingroup base
*/
class SocketEvents : public Stream
{
public:
DECLARE_PTR_TYPEDEFS(SocketEvents);
~SocketEvents();
virtual void OnEvent(int revents);
void Unregister();
void ChangeEvents(int events);
bool IsHandlingEvents() const;
void *GetEnginePrivate() const;
void SetEnginePrivate(void *priv);
protected:
SocketEvents(const Socket::Ptr& socket);
private:
int m_ID;
SOCKET m_FD;
bool m_Events;
void *m_EnginePrivate;
static int m_NextID;
static void InitializeEngine();
void WakeUpThread(bool wait = false);
void Register();
friend class SocketEventEnginePoll;
friend class SocketEventEngineEpoll;
};
#define SOCKET_IOTHREADS 8
struct SocketEventDescriptor
{
int Events{POLLIN};
SocketEvents::Ptr EventInterface;
};
struct EventDescription
{
int REvents;
SocketEventDescriptor Descriptor;
};
class SocketEventEngine
{
public:
void Start();
void WakeUpThread(int sid, bool wait);
boost::mutex& GetMutex(int tid);
protected:
virtual void InitializeThread(int tid) = 0;
virtual void ThreadProc(int tid) = 0;
virtual void Register(SocketEvents *se) = 0;
virtual void Unregister(SocketEvents *se) = 0;
virtual void ChangeEvents(SocketEvents *se, int events) = 0;
std::thread m_Threads[SOCKET_IOTHREADS];
SOCKET m_EventFDs[SOCKET_IOTHREADS][2];
bool m_FDChanged[SOCKET_IOTHREADS];
boost::mutex m_EventMutex[SOCKET_IOTHREADS];
boost::condition_variable m_CV[SOCKET_IOTHREADS];
std::map<SOCKET, SocketEventDescriptor> m_Sockets[SOCKET_IOTHREADS];
friend class SocketEvents;
};
class SocketEventEnginePoll final : public SocketEventEngine
{
public:
void Register(SocketEvents *se) override;
void Unregister(SocketEvents *se) override;
void ChangeEvents(SocketEvents *se, int events) override;
protected:
void InitializeThread(int tid) override;
void ThreadProc(int tid) override;
};
#ifdef __linux__
class SocketEventEngineEpoll : public SocketEventEngine
{
public:
virtual void Register(SocketEvents *se);
virtual void Unregister(SocketEvents *se);
virtual void ChangeEvents(SocketEvents *se, int events);
protected:
virtual void InitializeThread(int tid);
virtual void ThreadProc(int tid);
private:
SOCKET m_PollFDs[SOCKET_IOTHREADS];
static int PollToEpoll(int events);
static int EpollToPoll(int events);
};
#endif /* __linux__ */
}
#endif /* SOCKETEVENTS_H */

View File

@ -12,7 +12,7 @@ namespace icinga
{
/**
* A TCP socket.
* A TCP socket. DEPRECATED - Use Boost ASIO instead.
*
* @ingroup base
*/
@ -27,6 +27,11 @@ public:
void Connect(const String& node, const String& service);
};
/**
* TCP Connect based on Boost ASIO.
*
* @ingroup base
*/
template<class Socket>
void Connect(Socket& socket, const String& node, const String& service)
{

View File

@ -1,7 +1,7 @@
/* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */
#include "base/application.hpp"
#include "base/tlsstream.hpp"
#include "base/application.hpp"
#include "base/utility.hpp"
#include "base/exception.hpp"
#include "base/logger.hpp"
@ -16,446 +16,8 @@
#include <openssl/x509.h>
#include <sstream>
#ifndef _WIN32
# include <poll.h>
#endif /* _WIN32 */
#define TLS_TIMEOUT_SECONDS 10
using namespace icinga;
int TlsStream::m_SSLIndex;
bool TlsStream::m_SSLIndexInitialized = false;
/**
* Constructor for the TlsStream class.
*
* @param role The role of the client.
* @param sslContext The SSL context for the client.
*/
TlsStream::TlsStream(const Socket::Ptr& socket, const String& hostname, ConnectionRole role, const std::shared_ptr<SSL_CTX>& sslContext)
: TlsStream(socket, hostname, role, sslContext.get())
{
}
/**
* Constructor for the TlsStream class.
*
* @param role The role of the client.
* @param sslContext The SSL context for the client.
*/
TlsStream::TlsStream(const Socket::Ptr& socket, const String& hostname, ConnectionRole role, const std::shared_ptr<boost::asio::ssl::context>& sslContext)
: TlsStream(socket, hostname, role, sslContext->native_handle())
{
}
/**
* Constructor for the TlsStream class.
*
* @param role The role of the client.
* @param sslContext The SSL context for the client.
*/
TlsStream::TlsStream(const Socket::Ptr& socket, const String& hostname, ConnectionRole role, SSL_CTX* sslContext)
: SocketEvents(socket), m_Eof(false), m_HandshakeOK(false), m_VerifyOK(true), m_ErrorCode(0),
m_ErrorOccurred(false), m_Socket(socket), m_Role(role), m_SendQ(new FIFO()), m_RecvQ(new FIFO()),
m_CurrentAction(TlsActionNone), m_Retry(false), m_Shutdown(false)
{
std::ostringstream msgbuf;
char errbuf[256];
m_SSL = std::shared_ptr<SSL>(SSL_new(sslContext), SSL_free);
if (!m_SSL) {
msgbuf << "SSL_new() failed with code " << ERR_peek_error() << ", \"" << ERR_error_string(ERR_peek_error(), errbuf) << "\"";
Log(LogCritical, "TlsStream", msgbuf.str());
BOOST_THROW_EXCEPTION(openssl_error()
<< boost::errinfo_api_function("SSL_new")
<< errinfo_openssl_error(ERR_peek_error()));
}
if (!m_SSLIndexInitialized) {
m_SSLIndex = SSL_get_ex_new_index(0, const_cast<char *>("TlsStream"), nullptr, nullptr, nullptr);
m_SSLIndexInitialized = true;
}
SSL_set_ex_data(m_SSL.get(), m_SSLIndex, this);
SSL_set_verify(m_SSL.get(), SSL_VERIFY_PEER | SSL_VERIFY_CLIENT_ONCE, &TlsStream::ValidateCertificate);
socket->MakeNonBlocking();
SSL_set_fd(m_SSL.get(), socket->GetFD());
if (m_Role == RoleServer)
SSL_set_accept_state(m_SSL.get());
else {
#ifdef SSL_CTRL_SET_TLSEXT_HOSTNAME
if (!hostname.IsEmpty())
SSL_set_tlsext_host_name(m_SSL.get(), hostname.CStr());
#endif /* SSL_CTRL_SET_TLSEXT_HOSTNAME */
SSL_set_connect_state(m_SSL.get());
}
}
TlsStream::~TlsStream()
{
CloseInternal(true);
}
int TlsStream::ValidateCertificate(int preverify_ok, X509_STORE_CTX *ctx)
{
auto *ssl = static_cast<SSL *>(X509_STORE_CTX_get_ex_data(ctx, SSL_get_ex_data_X509_STORE_CTX_idx()));
auto *stream = static_cast<TlsStream *>(SSL_get_ex_data(ssl, m_SSLIndex));
if (!preverify_ok) {
stream->m_VerifyOK = false;
std::ostringstream msgbuf;
int err = X509_STORE_CTX_get_error(ctx);
msgbuf << "code " << err << ": " << X509_verify_cert_error_string(err);
stream->m_VerifyError = msgbuf.str();
}
return 1;
}
bool TlsStream::IsVerifyOK() const
{
return m_VerifyOK;
}
String TlsStream::GetVerifyError() const
{
return m_VerifyError;
}
/**
* Retrieves the X509 certficate for this client.
*
* @returns The X509 certificate.
*/
std::shared_ptr<X509> TlsStream::GetClientCertificate() const
{
boost::mutex::scoped_lock lock(m_Mutex);
return std::shared_ptr<X509>(SSL_get_certificate(m_SSL.get()), &Utility::NullDeleter);
}
/**
* Retrieves the X509 certficate for the peer.
*
* @returns The X509 certificate.
*/
std::shared_ptr<X509> TlsStream::GetPeerCertificate() const
{
boost::mutex::scoped_lock lock(m_Mutex);
return std::shared_ptr<X509>(SSL_get_peer_certificate(m_SSL.get()), X509_free);
}
void TlsStream::OnEvent(int revents)
{
int rc;
size_t count;
boost::mutex::scoped_lock lock(m_Mutex);
if (!m_SSL)
return;
char buffer[64 * 1024];
if (m_CurrentAction == TlsActionNone) {
if (revents & (POLLIN | POLLERR | POLLHUP))
m_CurrentAction = TlsActionRead;
else if (m_SendQ->GetAvailableBytes() > 0 && (revents & POLLOUT))
m_CurrentAction = TlsActionWrite;
else {
ChangeEvents(POLLIN);
return;
}
}
bool success = false;
/* Clear error queue for this thread before using SSL_{read,write,do_handshake}.
* Otherwise SSL_*_error() does not work reliably.
*/
ERR_clear_error();
size_t readTotal = 0;
switch (m_CurrentAction) {
case TlsActionRead:
do {
rc = SSL_read(m_SSL.get(), buffer, sizeof(buffer));
if (rc > 0) {
m_RecvQ->Write(buffer, rc);
success = true;
readTotal += rc;
}
#ifdef I2_DEBUG /* I2_DEBUG */
Log(LogDebug, "TlsStream")
<< "Read bytes: " << rc << " Total read bytes: " << readTotal;
#endif /* I2_DEBUG */
/* Limit read size. We cannot do this check inside the while loop
* since below should solely check whether OpenSSL has more data
* or not. */
if (readTotal >= 64 * 1024) {
#ifdef I2_DEBUG /* I2_DEBUG */
Log(LogWarning, "TlsStream")
<< "Maximum read bytes exceeded: " << readTotal;
#endif /* I2_DEBUG */
break;
}
/* Use OpenSSL's state machine here to determine whether we need
* to read more data. SSL_has_pending() is available with 1.1.0.
*/
} while (SSL_pending(m_SSL.get()));
if (success)
m_CV.notify_all();
break;
case TlsActionWrite:
count = m_SendQ->Peek(buffer, sizeof(buffer), true);
rc = SSL_write(m_SSL.get(), buffer, count);
if (rc > 0) {
m_SendQ->Read(nullptr, rc, true);
success = true;
}
break;
case TlsActionHandshake:
rc = SSL_do_handshake(m_SSL.get());
if (rc > 0) {
success = true;
m_HandshakeOK = true;
m_CV.notify_all();
}
break;
default:
VERIFY(!"Invalid TlsAction");
}
if (rc <= 0) {
int err = SSL_get_error(m_SSL.get(), rc);
switch (err) {
case SSL_ERROR_WANT_READ:
m_Retry = true;
ChangeEvents(POLLIN);
break;
case SSL_ERROR_WANT_WRITE:
m_Retry = true;
ChangeEvents(POLLOUT);
break;
case SSL_ERROR_ZERO_RETURN:
lock.unlock();
Close();
return;
default:
m_ErrorCode = ERR_peek_error();
m_ErrorOccurred = true;
if (m_ErrorCode != 0) {
char errbuf[256];
Log(LogWarning, "TlsStream")
<< "OpenSSL error: " << ERR_error_string(m_ErrorCode, errbuf);
} else {
Log(LogWarning, "TlsStream", "TLS stream was disconnected.");
}
lock.unlock();
Close();
return;
}
}
if (success) {
m_CurrentAction = TlsActionNone;
if (!m_Eof) {
if (m_SendQ->GetAvailableBytes() > 0)
ChangeEvents(POLLIN|POLLOUT);
else
ChangeEvents(POLLIN);
}
lock.unlock();
while (m_RecvQ->IsDataAvailable() && IsHandlingEvents())
SignalDataAvailable();
}
if (m_Shutdown && !m_SendQ->IsDataAvailable()) {
if (!success)
lock.unlock();
Close();
}
}
void TlsStream::HandleError() const
{
if (m_ErrorOccurred) {
BOOST_THROW_EXCEPTION(openssl_error()
<< boost::errinfo_api_function("TlsStream::OnEvent")
<< errinfo_openssl_error(m_ErrorCode));
}
}
void TlsStream::Handshake()
{
boost::mutex::scoped_lock lock(m_Mutex);
m_CurrentAction = TlsActionHandshake;
ChangeEvents(POLLOUT);
boost::system_time const timeout = boost::get_system_time() + boost::posix_time::milliseconds(long(Configuration::TlsHandshakeTimeout * 1000));
while (!m_HandshakeOK && !m_ErrorOccurred && !m_Eof && timeout > boost::get_system_time())
m_CV.timed_wait(lock, timeout);
if (timeout < boost::get_system_time())
BOOST_THROW_EXCEPTION(std::runtime_error("Timeout was reached (" + Convert::ToString(Configuration::TlsHandshakeTimeout) + ") during TLS handshake."));
if (m_Eof)
BOOST_THROW_EXCEPTION(std::runtime_error("Socket was closed during TLS handshake."));
HandleError();
}
/**
* Processes data for the stream.
*/
size_t TlsStream::Peek(void *buffer, size_t count, bool allow_partial)
{
boost::mutex::scoped_lock lock(m_Mutex);
if (!allow_partial)
while (m_RecvQ->GetAvailableBytes() < count && !m_ErrorOccurred && !m_Eof)
m_CV.wait(lock);
HandleError();
return m_RecvQ->Peek(buffer, count, true);
}
size_t TlsStream::Read(void *buffer, size_t count, bool allow_partial)
{
boost::mutex::scoped_lock lock(m_Mutex);
if (!allow_partial)
while (m_RecvQ->GetAvailableBytes() < count && !m_ErrorOccurred && !m_Eof)
m_CV.wait(lock);
HandleError();
return m_RecvQ->Read(buffer, count, true);
}
void TlsStream::Write(const void *buffer, size_t count)
{
boost::mutex::scoped_lock lock(m_Mutex);
m_SendQ->Write(buffer, count);
ChangeEvents(POLLIN|POLLOUT);
}
void TlsStream::Shutdown()
{
m_Shutdown = true;
ChangeEvents(POLLOUT);
}
/**
* Closes the stream.
*/
void TlsStream::Close()
{
CloseInternal(false);
}
void TlsStream::CloseInternal(bool inDestructor)
{
if (m_Eof)
return;
m_Eof = true;
if (!inDestructor)
SignalDataAvailable();
SocketEvents::Unregister();
Stream::Close();
boost::mutex::scoped_lock lock(m_Mutex);
if (!m_SSL)
return;
/* https://www.openssl.org/docs/manmaster/man3/SSL_shutdown.html
*
* It is recommended to do a bidirectional shutdown by checking
* the return value of SSL_shutdown() and call it again until
* it returns 1 or a fatal error. A maximum of 2x pending + 2x data
* is recommended.
*/
int rc = 0;
for (int i = 0; i < 4; i++) {
if ((rc = SSL_shutdown(m_SSL.get())))
break;
}
m_SSL.reset();
m_Socket->Close();
m_Socket.reset();
m_CV.notify_all();
}
bool TlsStream::IsEof() const
{
return m_Eof && m_RecvQ->GetAvailableBytes() < 1u;
}
bool TlsStream::SupportsWaiting() const
{
return true;
}
bool TlsStream::IsDataAvailable() const
{
boost::mutex::scoped_lock lock(m_Mutex);
return m_RecvQ->GetAvailableBytes() > 0;
}
Socket::Ptr TlsStream::GetSocket() const
{
return m_Socket;
}
bool UnbufferedAsioTlsStream::IsVerifyOK() const
{
return m_VerifyOK;

View File

@ -5,7 +5,6 @@
#include "base/i2-base.hpp"
#include "base/socket.hpp"
#include "base/socketevents.hpp"
#include "base/stream.hpp"
#include "base/tlsutility.hpp"
#include "base/fifo.hpp"
@ -20,86 +19,6 @@
namespace icinga
{
enum TlsAction
{
TlsActionNone,
TlsActionRead,
TlsActionWrite,
TlsActionHandshake
};
/**
* A TLS stream.
*
* @ingroup base
*/
class TlsStream final : public SocketEvents
{
public:
DECLARE_PTR_TYPEDEFS(TlsStream);
TlsStream(const Socket::Ptr& socket, const String& hostname, ConnectionRole role, const std::shared_ptr<SSL_CTX>& sslContext = MakeSSLContext());
TlsStream(const Socket::Ptr& socket, const String& hostname, ConnectionRole role, const std::shared_ptr<boost::asio::ssl::context>& sslContext);
~TlsStream() override;
Socket::Ptr GetSocket() const;
std::shared_ptr<X509> GetClientCertificate() const;
std::shared_ptr<X509> GetPeerCertificate() const;
void Handshake();
void Close() override;
void Shutdown() override;
size_t Peek(void *buffer, size_t count, bool allow_partial = false) override;
size_t Read(void *buffer, size_t count, bool allow_partial = false) override;
void Write(const void *buffer, size_t count) override;
bool IsEof() const override;
bool SupportsWaiting() const override;
bool IsDataAvailable() const override;
bool IsVerifyOK() const;
String GetVerifyError() const;
private:
std::shared_ptr<SSL> m_SSL;
bool m_Eof;
mutable boost::mutex m_Mutex;
mutable boost::condition_variable m_CV;
bool m_HandshakeOK;
bool m_VerifyOK;
String m_VerifyError;
int m_ErrorCode;
bool m_ErrorOccurred;
Socket::Ptr m_Socket;
ConnectionRole m_Role;
FIFO::Ptr m_SendQ;
FIFO::Ptr m_RecvQ;
TlsAction m_CurrentAction;
bool m_Retry;
bool m_Shutdown;
static int m_SSLIndex;
static bool m_SSLIndexInitialized;
TlsStream(const Socket::Ptr& socket, const String& hostname, ConnectionRole role, SSL_CTX* sslContext);
void OnEvent(int revents) override;
void HandleError() const;
static int ValidateCertificate(int preverify_ok, X509_STORE_CTX *ctx);
static void NullCertificateDeleter(X509 *certificate);
void CloseInternal(bool inDestructor);
};
struct UnbufferedAsioTlsStreamParams
{
boost::asio::io_service& IoService;

View File

@ -129,25 +129,6 @@ static void SetupSslContext(SSL_CTX *sslContext, const String& pubkey, const Str
}
}
/**
* Initializes an SSL context using the specified certificates.
*
* @param pubkey The public key.
* @param privkey The matching private key.
* @param cakey CA certificate chain file.
* @returns An SSL context.
*/
std::shared_ptr<SSL_CTX> MakeSSLContext(const String& pubkey, const String& privkey, const String& cakey)
{
InitializeOpenSSL();
std::shared_ptr<SSL_CTX> sslContext = std::shared_ptr<SSL_CTX>(SSL_CTX_new(SSLv23_method()), SSL_CTX_free);
SetupSslContext(sslContext.get(), pubkey, privkey, cakey);
return sslContext;
}
/**
* Initializes an SSL context using the specified certificates.
*

View File

@ -21,25 +21,30 @@ namespace icinga
{
void InitializeOpenSSL();
std::shared_ptr<SSL_CTX> MakeSSLContext(const String& pubkey = String(), const String& privkey = String(), const String& cakey = String());
std::shared_ptr<boost::asio::ssl::context> MakeAsioSslContext(const String& pubkey = String(), const String& privkey = String(), const String& cakey = String());
void AddCRLToSSLContext(const std::shared_ptr<boost::asio::ssl::context>& context, const String& crlPath);
void SetCipherListToSSLContext(const std::shared_ptr<boost::asio::ssl::context>& context, const String& cipherList);
void SetTlsProtocolminToSSLContext(const std::shared_ptr<boost::asio::ssl::context>& context, const String& tlsProtocolmin);
String GetCertificateCN(const std::shared_ptr<X509>& certificate);
std::shared_ptr<X509> GetX509Certificate(const String& pemfile);
int MakeX509CSR(const String& cn, const String& keyfile, const String& csrfile = String(), const String& certfile = String(), bool ca = false);
std::shared_ptr<X509> CreateCert(EVP_PKEY *pubkey, X509_NAME *subject, X509_NAME *issuer, EVP_PKEY *cakey, bool ca);
String GetIcingaCADir();
String CertificateToString(const std::shared_ptr<X509>& cert);
std::shared_ptr<X509> StringToCertificate(const String& cert);
std::shared_ptr<X509> CreateCertIcingaCA(EVP_PKEY *pubkey, X509_NAME *subject);
std::shared_ptr<X509> CreateCertIcingaCA(const std::shared_ptr<X509>& cert);
String PBKDF2_SHA1(const String& password, const String& salt, int iterations);
String PBKDF2_SHA256(const String& password, const String& salt, int iterations);
String SHA1(const String& s, bool binary = false);
String SHA256(const String& s);
String RandomString(int length);
bool VerifyCertificate(const std::shared_ptr<X509>& caCertificate, const std::shared_ptr<X509>& certificate);
class openssl_error : virtual public std::exception, virtual public boost::exception { };

View File

@ -9,6 +9,11 @@
namespace icinga
{
/**
* A TCP socket. DEPRECATED - Use Boost ASIO instead.
*
* @ingroup base
*/
class UnixSocket final : public Socket
{
public:

View File

@ -2,7 +2,6 @@
#include "cli/consolecommand.hpp"
#include "config/configcompiler.hpp"
#include "remote/apiclient.hpp"
#include "remote/consolehandler.hpp"
#include "remote/url.hpp"
#include "base/configwriter.hpp"
@ -721,4 +720,4 @@ Array::Ptr ConsoleCommand::AutoCompleteScript(const String& session, const Strin
}
return suggestions;
}
}

View File

@ -17,7 +17,6 @@
#include "base/context.hpp"
#include "config.h"
#include <boost/program_options.hpp>
#include <boost/tuple/tuple.hpp>
#include <iostream>
#include <fstream>

View File

@ -22,7 +22,6 @@
#include "base/application.hpp"
#include "base/context.hpp"
#include "base/statsfunction.hpp"
#include <boost/tuple/tuple.hpp>
#include <boost/algorithm/string.hpp>
#include <boost/algorithm/string/replace.hpp>
#include <fstream>

View File

@ -13,7 +13,6 @@
#include "base/configtype.hpp"
#include "base/exception.hpp"
#include "base/statsfunction.hpp"
#include <boost/tuple/tuple.hpp>
#include <utility>
using namespace icinga;

View File

@ -14,7 +14,6 @@
#include "base/exception.hpp"
#include "base/context.hpp"
#include "base/statsfunction.hpp"
#include <boost/tuple/tuple.hpp>
#include <utility>
using namespace icinga;

View File

@ -6,7 +6,6 @@
#include "icinga/service.hpp"
#include "base/configtype.hpp"
#include "base/objectlock.hpp"
#include <boost/tuple/tuple.hpp>
using namespace icinga;

View File

@ -8,7 +8,6 @@
#include "base/objectlock.hpp"
#include "base/json.hpp"
#include "base/utility.hpp"
#include <boost/tuple/tuple.hpp>
using namespace icinga;

View File

@ -6,7 +6,6 @@
#include "icinga/service.hpp"
#include "base/configtype.hpp"
#include "base/objectlock.hpp"
#include <boost/tuple/tuple.hpp>
using namespace icinga;

View File

@ -10,7 +10,6 @@
#include "base/objectlock.hpp"
#include "base/convert.hpp"
#include "base/utility.hpp"
#include <boost/tuple/tuple.hpp>
#include <boost/algorithm/string/replace.hpp>
using namespace icinga;

View File

@ -18,7 +18,6 @@
#include "base/json.hpp"
#include "base/convert.hpp"
#include "base/utility.hpp"
#include <boost/tuple/tuple.hpp>
#include <boost/algorithm/string/replace.hpp>
using namespace icinga;

View File

@ -10,7 +10,6 @@
#include "base/utility.hpp"
#include "base/convert.hpp"
#include "base/logger.hpp"
#include <boost/tuple/tuple.hpp>
#include <boost/algorithm/string.hpp>
#include <boost/algorithm/string/replace.hpp>
#include <boost/algorithm/string/predicate.hpp>

View File

@ -19,7 +19,6 @@
#include "base/logger.hpp"
#include "base/application.hpp"
#include "base/objectlock.hpp"
#include <boost/tuple/tuple.hpp>
#include <boost/algorithm/string.hpp>
#include <boost/algorithm/string/replace.hpp>
#include <boost/algorithm/string/predicate.hpp>

View File

@ -20,7 +20,6 @@
#include "base/json.hpp"
#include "base/convert.hpp"
#include "base/utility.hpp"
#include <boost/tuple/tuple.hpp>
#include <boost/algorithm/string/replace.hpp>
using namespace icinga;

View File

@ -19,7 +19,6 @@
#include "base/logger.hpp"
#include "base/application.hpp"
#include "base/objectlock.hpp"
#include <boost/tuple/tuple.hpp>
#include <boost/algorithm/string.hpp>
#include <boost/algorithm/string/replace.hpp>
#include <boost/algorithm/string/predicate.hpp>

View File

@ -20,7 +20,6 @@
#include "base/array.hpp"
#include "base/dictionary.hpp"
#include <boost/algorithm/string/case_conv.hpp>
#include <boost/tuple/tuple.hpp>
using namespace icinga;

View File

@ -3,8 +3,6 @@
#include "perfdata/elasticsearchwriter.hpp"
#include "perfdata/elasticsearchwriter-ti.cpp"
#include "remote/url.hpp"
#include "remote/httprequest.hpp"
#include "remote/httpresponse.hpp"
#include "icinga/compatutility.hpp"
#include "icinga/service.hpp"
#include "icinga/checkcommand.hpp"

View File

@ -28,6 +28,9 @@ REGISTER_TYPE(GraphiteWriter);
REGISTER_STATSFUNCTION(GraphiteWriter, &GraphiteWriter::StatsFunc);
/*
* Enable HA capabilities once the config object is loaded.
*/
void GraphiteWriter::OnConfigLoaded()
{
ObjectImpl<GraphiteWriter>::OnConfigLoaded();
@ -44,6 +47,12 @@ void GraphiteWriter::OnConfigLoaded()
}
}
/**
* Feature stats interface
*
* @param status Key value pairs for feature stats
* @param perfdata Array of PerfdataValue objects
*/
void GraphiteWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata)
{
DictionaryData nodes;
@ -65,6 +74,9 @@ void GraphiteWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr&
status->Set("graphitewriter", new Dictionary(std::move(nodes)));
}
/**
* Resume is equivalent to Start, but with HA capabilities to resume at runtime.
*/
void GraphiteWriter::Resume()
{
ObjectImpl<GraphiteWriter>::Resume();
@ -86,7 +98,9 @@ void GraphiteWriter::Resume()
Checkable::OnNewCheckResult.connect(std::bind(&GraphiteWriter::CheckResultHandler, this, _1, _2));
}
/* Pause is equivalent to Stop, but with HA capabilities to resume at runtime. */
/**
* Pause is equivalent to Stop, but with HA capabilities to resume at runtime.
*/
void GraphiteWriter::Pause()
{
m_ReconnectTimer.reset();
@ -110,11 +124,21 @@ void GraphiteWriter::Pause()
ObjectImpl<GraphiteWriter>::Pause();
}
/**
* Check if method is called inside the WQ thread.
*/
void GraphiteWriter::AssertOnWorkQueue()
{
ASSERT(m_WorkQueue.IsWorkerThread());
}
/**
* Exception handler for the WQ.
*
* Closes the connection if connected.
*
* @param exp Exception pointer
*/
void GraphiteWriter::ExceptionHandler(boost::exception_ptr exp)
{
Log(LogCritical, "GraphiteWriter", "Exception during Graphite operation: Verify that your backend is operational!");
@ -123,12 +147,17 @@ void GraphiteWriter::ExceptionHandler(boost::exception_ptr exp)
<< "Exception during Graphite operation: " << DiagnosticInformation(std::move(exp));
if (GetConnected()) {
m_Stream->Close();
m_Stream->close();
SetConnected(false);
}
}
/**
* Reconnect method, stops when the feature is paused in HA zones.
*
* Called inside the WQ.
*/
void GraphiteWriter::Reconnect()
{
AssertOnWorkQueue();
@ -141,6 +170,9 @@ void GraphiteWriter::Reconnect()
ReconnectInternal();
}
/**
* Reconnect method, connects to a TCP Stream
*/
void GraphiteWriter::ReconnectInternal()
{
double startTime = Utility::GetTime();
@ -152,20 +184,17 @@ void GraphiteWriter::ReconnectInternal()
if (GetConnected())
return;
TcpSocket::Ptr socket = new TcpSocket();
Log(LogNotice, "GraphiteWriter")
<< "Reconnecting to Graphite on host '" << GetHost() << "' port '" << GetPort() << "'.";
try {
socket->Connect(GetHost(), GetPort());
} catch (const std::exception& ex) {
Log(LogCritical, "GraphiteWriter")
<< "Can't connect to Graphite on host '" << GetHost() << "' port '" << GetPort() << "'.";
throw ex;
}
m_Stream = std::make_shared<AsioTcpStream>(IoEngine::Get().GetIoService());
m_Stream = new NetworkStream(socket);
try {
icinga::Connect(m_Stream->lowest_layer(), GetHost(), GetPort());
} catch (const std::exception& ex) {
Log(LogWarning, "GraphiteWriter")
<< "Can't connect to Graphite on host '" << GetHost() << "' port '" << GetPort() << ".'";
}
SetConnected(true);
@ -173,14 +202,24 @@ void GraphiteWriter::ReconnectInternal()
<< "Finished reconnecting to Graphite in " << std::setw(2) << Utility::GetTime() - startTime << " second(s).";
}
/**
* Reconnect handler called by the timer.
*
* Enqueues a reconnect task into the WQ.
*/
void GraphiteWriter::ReconnectTimerHandler()
{
if (IsPaused())
return;
m_WorkQueue.Enqueue(std::bind(&GraphiteWriter::Reconnect, this), PriorityNormal);
m_WorkQueue.Enqueue(std::bind(&GraphiteWriter::Reconnect, this), PriorityHigh);
}
/**
* Disconnect the stream.
*
* Called inside the WQ.
*/
void GraphiteWriter::Disconnect()
{
AssertOnWorkQueue();
@ -188,16 +227,27 @@ void GraphiteWriter::Disconnect()
DisconnectInternal();
}
/**
* Disconnect the stream.
*
* Called outside the WQ.
*/
void GraphiteWriter::DisconnectInternal()
{
if (!GetConnected())
return;
m_Stream->Close();
m_Stream->close();
SetConnected(false);
}
/**
* Check result event handler, checks whether feature is not paused in HA setups.
*
* @param checkable Host/Service object
* @param cr Check result including performance data
*/
void GraphiteWriter::CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
{
if (IsPaused())
@ -206,6 +256,14 @@ void GraphiteWriter::CheckResultHandler(const Checkable::Ptr& checkable, const C
m_WorkQueue.Enqueue(std::bind(&GraphiteWriter::CheckResultHandlerInternal, this, checkable, cr));
}
/**
* Check result event handler, prepares metadata and perfdata values and calls Send*()
*
* Called inside the WQ.
*
* @param checkable Host/Service object
* @param cr Check result including performance data
*/
void GraphiteWriter::CheckResultHandlerInternal(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
{
AssertOnWorkQueue();
@ -262,6 +320,14 @@ void GraphiteWriter::CheckResultHandlerInternal(const Checkable::Ptr& checkable,
SendPerfdata(checkable, prefixPerfdata, cr, ts);
}
/**
* Parse performance data from check result and call SendMetric()
*
* @param checkable Host/service object
* @param prefix Metric prefix string
* @param cr Check result including performance data
* @param ts Timestamp when the check result was created
*/
void GraphiteWriter::SendPerfdata(const Checkable::Ptr& checkable, const String& prefix, const CheckResult::Ptr& cr, double ts)
{
Array::Ptr perfdata = cr->GetPerformanceData();
@ -306,8 +372,19 @@ void GraphiteWriter::SendPerfdata(const Checkable::Ptr& checkable, const String&
}
}
/**
* Computes metric data and sends to Graphite
*
* @param checkable Host/service object
* @param prefix Computed metric prefix string
* @param name Metric name
* @param value Metric value
* @param ts Timestamp when the check result was created
*/
void GraphiteWriter::SendMetric(const Checkable::Ptr& checkable, const String& prefix, const String& name, double value, double ts)
{
namespace asio = boost::asio;
std::ostringstream msgbuf;
msgbuf << prefix << "." << name << " " << Convert::ToString(value) << " " << static_cast<long>(ts);
@ -316,7 +393,6 @@ void GraphiteWriter::SendMetric(const Checkable::Ptr& checkable, const String& p
// do not send \n to debug log
msgbuf << "\n";
String metric = msgbuf.str();
boost::mutex::scoped_lock lock(m_StreamMutex);
@ -324,7 +400,8 @@ void GraphiteWriter::SendMetric(const Checkable::Ptr& checkable, const String& p
return;
try {
m_Stream->Write(metric.CStr(), metric.GetLength());
asio::write(*m_Stream, asio::buffer(msgbuf.str()));
m_Stream->flush();
} catch (const std::exception& ex) {
Log(LogCritical, "GraphiteWriter")
<< "Cannot write to TCP socket on host '" << GetHost() << "' port '" << GetPort() << "'.";
@ -333,6 +410,14 @@ void GraphiteWriter::SendMetric(const Checkable::Ptr& checkable, const String& p
}
}
/**
* Escape metric tree elements
*
* Dots are not allowed, e.g. in host names
*
* @param str Metric part name
* @return Escape string
*/
String GraphiteWriter::EscapeMetric(const String& str)
{
String result = str;
@ -346,6 +431,14 @@ String GraphiteWriter::EscapeMetric(const String& str)
return result;
}
/**
* Escape metric label
*
* Dots are allowed - users can create trees from perfdata labels
*
* @param str Metric label name
* @return Escaped string
*/
String GraphiteWriter::EscapeMetricLabel(const String& str)
{
String result = str;
@ -359,6 +452,12 @@ String GraphiteWriter::EscapeMetricLabel(const String& str)
return result;
}
/**
* Escape macro metrics found via host/service name templates
*
* @param value Array or string with macro metric names
* @return Escaped string. Arrays are joined with dots.
*/
Value GraphiteWriter::EscapeMacroMetric(const Value& value)
{
if (value.IsObjectType<Array>()) {
@ -375,6 +474,12 @@ Value GraphiteWriter::EscapeMacroMetric(const Value& value)
return EscapeMetric(value);
}
/**
* Validate the configuration setting 'host_name_template'
*
* @param lvalue String containing runtime macros.
* @param utils Helper, unused
*/
void GraphiteWriter::ValidateHostNameTemplate(const Lazy<String>& lvalue, const ValidationUtils& utils)
{
ObjectImpl<GraphiteWriter>::ValidateHostNameTemplate(lvalue, utils);
@ -383,6 +488,12 @@ void GraphiteWriter::ValidateHostNameTemplate(const Lazy<String>& lvalue, const
BOOST_THROW_EXCEPTION(ValidationError(this, { "host_name_template" }, "Closing $ not found in macro format string '" + lvalue() + "'."));
}
/**
* Validate the configuration setting 'service_name_template'
*
* @param lvalue String containing runtime macros.
* @param utils Helper, unused
*/
void GraphiteWriter::ValidateServiceNameTemplate(const Lazy<String>& lvalue, const ValidationUtils& utils)
{
ObjectImpl<GraphiteWriter>::ValidateServiceNameTemplate(lvalue, utils);

View File

@ -37,7 +37,7 @@ protected:
void Pause() override;
private:
Stream::Ptr m_Stream;
std::shared_ptr<AsioTcpStream> m_Stream;
boost::mutex m_StreamMutex;
WorkQueue m_WorkQueue{10000000, 1};

View File

@ -3,8 +3,6 @@
#include "perfdata/influxdbwriter.hpp"
#include "perfdata/influxdbwriter-ti.cpp"
#include "remote/url.hpp"
#include "remote/httprequest.hpp"
#include "remote/httpresponse.hpp"
#include "icinga/service.hpp"
#include "icinga/macroprocessor.hpp"
#include "icinga/icingaapplication.hpp"

View File

@ -28,6 +28,9 @@ REGISTER_TYPE(OpenTsdbWriter);
REGISTER_STATSFUNCTION(OpenTsdbWriter, &OpenTsdbWriter::StatsFunc);
/*
* Enable HA capabilities once the config object is loaded.
*/
void OpenTsdbWriter::OnConfigLoaded()
{
ObjectImpl<OpenTsdbWriter>::OnConfigLoaded();
@ -42,17 +45,27 @@ void OpenTsdbWriter::OnConfigLoaded()
}
}
/**
* Feature stats interface
*
* @param status Key value pairs for feature stats
*/
void OpenTsdbWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr&)
{
DictionaryData nodes;
for (const OpenTsdbWriter::Ptr& opentsdbwriter : ConfigType::GetObjectsByType<OpenTsdbWriter>()) {
nodes.emplace_back(opentsdbwriter->GetName(), 1); //add more stats
nodes.emplace_back(opentsdbwriter->GetName(), new Dictionary({
{ "connected", opentsdbwriter->GetConnected() }
}));
}
status->Set("opentsdbwriter", new Dictionary(std::move(nodes)));
}
/**
* Resume is equivalent to Start, but with HA capabilities to resume at runtime.
*/
void OpenTsdbWriter::Resume()
{
ObjectImpl<OpenTsdbWriter>::Resume();
@ -69,7 +82,9 @@ void OpenTsdbWriter::Resume()
Service::OnNewCheckResult.connect(std::bind(&OpenTsdbWriter::CheckResultHandler, this, _1, _2));
}
/* Pause is equivalent to Stop, but with HA capabilities to resume at runtime. */
/**
* Pause is equivalent to Stop, but with HA capabilities to resume at runtime.
*/
void OpenTsdbWriter::Pause()
{
m_ReconnectTimer.reset();
@ -77,33 +92,54 @@ void OpenTsdbWriter::Pause()
Log(LogInformation, "OpentsdbWriter")
<< "'" << GetName() << "' paused.";
m_Stream->close();
SetConnected(false);
ObjectImpl<OpenTsdbWriter>::Pause();
}
/**
* Reconnect handler called by the timer.
* Handles TLS
*/
void OpenTsdbWriter::ReconnectTimerHandler()
{
if (IsPaused())
return;
if (m_Stream)
return;
SetShouldConnect(true);
TcpSocket::Ptr socket = new TcpSocket();
if (GetConnected())
return;
Log(LogNotice, "OpenTsdbWriter")
<< "Reconnect to OpenTSDB TSD on host '" << GetHost() << "' port '" << GetPort() << "'.";
/*
* We're using telnet as input method. Future PRs may change this into using the HTTP API.
* http://opentsdb.net/docs/build/html/user_guide/writing/index.html#telnet
*/
m_Stream = std::make_shared<AsioTcpStream>(IoEngine::Get().GetIoService());
try {
socket->Connect(GetHost(), GetPort());
} catch (std::exception&) {
Log(LogCritical, "OpenTsdbWriter")
<< "Can't connect to OpenTSDB TSD on host '" << GetHost() << "' port '" << GetPort() << "'.";
return;
icinga::Connect(m_Stream->lowest_layer(), GetHost(), GetPort());
} catch (const std::exception& ex) {
Log(LogWarning, "OpenTsdbWriter")
<< "Can't connect to OpenTSDB on host '" << GetHost() << "' port '" << GetPort() << ".'";
}
m_Stream = new NetworkStream(socket);
SetConnected(true);
}
/**
* Registered check result handler processing data.
* Calculates tags from the config.
*
* @param checkable Host/service object
* @param cr Check result
*/
void OpenTsdbWriter::CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
{
if (IsPaused())
@ -165,6 +201,15 @@ void OpenTsdbWriter::CheckResultHandler(const Checkable::Ptr& checkable, const C
SendMetric(checkable, metric + ".execution_time", tags, cr->CalculateExecutionTime(), ts);
}
/**
* Parse and send performance data metrics to OpenTSDB
*
* @param checkable Host/service object
* @param metric Full metric name
* @param tags Tag key pairs
* @param cr Check result containing performance data
* @param ts Timestamp when the check result was received
*/
void OpenTsdbWriter::SendPerfdata(const Checkable::Ptr& checkable, const String& metric,
const std::map<String, String>& tags, const CheckResult::Ptr& cr, double ts)
{
@ -209,6 +254,15 @@ void OpenTsdbWriter::SendPerfdata(const Checkable::Ptr& checkable, const String&
}
}
/**
* Send given metric to OpenTSDB
*
* @param checkable Host/service object
* @param metric Full metric name
* @param tags Tag key pairs
* @param value Floating point metric value
* @param ts Timestamp where the metric was received from the check result
*/
void OpenTsdbWriter::SendMetric(const Checkable::Ptr& checkable, const String& metric,
const std::map<String, String>& tags, double value, double ts)
{
@ -220,7 +274,7 @@ void OpenTsdbWriter::SendMetric(const Checkable::Ptr& checkable, const String& m
std::ostringstream msgbuf;
/*
* must be (http://opentsdb.net/docs/build/html/user_guide/writing.html)
* must be (http://opentsdb.net/docs/build/html/user_guide/query/timeseries.html)
* put <metric> <timestamp> <value> <tagk1=tagv1[ tagk2=tagv2 ...tagkN=tagvN]>
* "tags" must include at least one tag, we use "host=HOSTNAME"
*/
@ -235,21 +289,27 @@ void OpenTsdbWriter::SendMetric(const Checkable::Ptr& checkable, const String& m
ObjectLock olock(this);
if (!m_Stream)
if (!GetConnected())
return;
try {
m_Stream->Write(put.CStr(), put.GetLength());
Log(LogDebug, "OpenTsdbWriter")
<< "Checkable '" << checkable->GetName() << "' sending message '" << put << "'.";
boost::asio::write(*m_Stream, boost::asio::buffer(msgbuf.str()));
m_Stream->flush();
} catch (const std::exception& ex) {
Log(LogCritical, "OpenTsdbWriter")
<< "Cannot write to OpenTSDB TSD on host '" << GetHost() << "' port '" << GetPort() + "'.";
m_Stream.reset();
<< "Cannot write to TCP socket on host '" << GetHost() << "' port '" << GetPort() << "'.";
}
}
/* for metric and tag name rules, see
* http://opentsdb.net/docs/build/html/user_guide/writing.html#metrics-and-tags
/**
* Escape tags for OpenTSDB
* http://opentsdb.net/docs/build/html/user_guide/query/timeseries.html#precisions-on-metrics-and-tags
*
* @param str Tag name
* @return Escaped tag
*/
String OpenTsdbWriter::EscapeTag(const String& str)
{
@ -261,6 +321,13 @@ String OpenTsdbWriter::EscapeTag(const String& str)
return result;
}
/**
* Escape metric name for OpenTSDB
* http://opentsdb.net/docs/build/html/user_guide/query/timeseries.html#precisions-on-metrics-and-tags
*
* @param str Metric name
* @return Escaped metric
*/
String OpenTsdbWriter::EscapeMetric(const String& str)
{
String result = str;
@ -271,4 +338,4 @@ String OpenTsdbWriter::EscapeMetric(const String& str)
boost::replace_all(result, ":", "_");
return result;
}
}

View File

@ -32,7 +32,7 @@ protected:
void Pause() override;
private:
Stream::Ptr m_Stream;
std::shared_ptr<AsioTcpStream> m_Stream;
Timer::Ptr m_ReconnectTimer;

View File

@ -20,6 +20,11 @@ class OpenTsdbWriter : ConfigObject
[config] bool enable_ha {
default {{{ return false; }}}
};
[no_user_modify] bool connected;
[no_user_modify] bool should_connect {
default {{{ return true; }}}
};
};
}

View File

@ -9,7 +9,6 @@ set(remote_SOURCES
i2-remote.hpp
actionshandler.cpp actionshandler.hpp
apiaction.cpp apiaction.hpp
apiclient.cpp apiclient.hpp
apifunction.cpp apifunction.hpp
apilistener.cpp apilistener.hpp apilistener-ti.hpp apilistener-configsync.cpp apilistener-filesync.cpp
apilistener-authority.cpp
@ -26,11 +25,7 @@ set(remote_SOURCES
eventqueue.cpp eventqueue.hpp
eventshandler.cpp eventshandler.hpp
filterutility.cpp filterutility.hpp
httpchunkedencoding.cpp httpchunkedencoding.hpp
httpclientconnection.cpp httpclientconnection.hpp
httphandler.cpp httphandler.hpp
httprequest.cpp httprequest.hpp
httpresponse.cpp httpresponse.hpp
httpserverconnection.cpp httpserverconnection.hpp
httputility.cpp httputility.hpp
infohandler.cpp infohandler.hpp

View File

@ -1,164 +0,0 @@
/* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */
#include "remote/apiclient.hpp"
#include "base/base64.hpp"
#include "base/json.hpp"
#include "base/logger.hpp"
#include "base/exception.hpp"
#include "base/convert.hpp"
using namespace icinga;
ApiClient::ApiClient(const String& host, const String& port,
String user, String password)
: m_Connection(new HttpClientConnection(host, port, true)), m_User(std::move(user)), m_Password(std::move(password))
{
m_Connection->Start();
}
void ApiClient::ExecuteScript(const String& session, const String& command, bool sandboxed,
const ExecuteScriptCompletionCallback& callback) const
{
Url::Ptr url = new Url();
url->SetScheme("https");
url->SetHost(m_Connection->GetHost());
url->SetPort(m_Connection->GetPort());
url->SetPath({ "v1", "console", "execute-script" });
url->SetQuery({
{"session", session},
{"command", command},
{"sandboxed", sandboxed ? "1" : "0"}
});
try {
std::shared_ptr<HttpRequest> req = m_Connection->NewRequest();
req->RequestMethod = "POST";
req->RequestUrl = url;
req->AddHeader("Authorization", "Basic " + Base64::Encode(m_User + ":" + m_Password));
req->AddHeader("Accept", "application/json");
m_Connection->SubmitRequest(req, std::bind(ExecuteScriptHttpCompletionCallback, _1, _2, callback));
} catch (const std::exception&) {
callback(boost::current_exception(), Empty);
}
}
void ApiClient::ExecuteScriptHttpCompletionCallback(HttpRequest& request,
HttpResponse& response, const ExecuteScriptCompletionCallback& callback)
{
Dictionary::Ptr result;
String body;
char buffer[1024];
size_t count;
while ((count = response.ReadBody(buffer, sizeof(buffer))) > 0)
body += String(buffer, buffer + count);
try {
if (response.StatusCode < 200 || response.StatusCode > 299) {
std::string message = "HTTP request failed; Code: " + Convert::ToString(response.StatusCode) + "; Body: " + body;
BOOST_THROW_EXCEPTION(ScriptError(message));
}
result = JsonDecode(body);
Array::Ptr results = result->Get("results");
Value result;
String errorMessage = "Unexpected result from API.";
if (results && results->GetLength() > 0) {
Dictionary::Ptr resultInfo = results->Get(0);
errorMessage = resultInfo->Get("status");
if (resultInfo->Get("code") >= 200 && resultInfo->Get("code") <= 299) {
result = resultInfo->Get("result");
} else {
DebugInfo di;
Dictionary::Ptr debugInfo = resultInfo->Get("debug_info");
if (debugInfo) {
di.Path = debugInfo->Get("path");
di.FirstLine = debugInfo->Get("first_line");
di.FirstColumn = debugInfo->Get("first_column");
di.LastLine = debugInfo->Get("last_line");
di.LastColumn = debugInfo->Get("last_column");
}
bool incompleteExpression = resultInfo->Get("incomplete_expression");
BOOST_THROW_EXCEPTION(ScriptError(errorMessage, di, incompleteExpression));
}
}
callback(boost::exception_ptr(), result);
} catch (const std::exception&) {
callback(boost::current_exception(), Empty);
}
}
void ApiClient::AutocompleteScript(const String& session, const String& command, bool sandboxed,
const AutocompleteScriptCompletionCallback& callback) const
{
Url::Ptr url = new Url();
url->SetScheme("https");
url->SetHost(m_Connection->GetHost());
url->SetPort(m_Connection->GetPort());
url->SetPath({ "v1", "console", "auto-complete-script" });
url->SetQuery({
{"session", session},
{"command", command},
{"sandboxed", sandboxed ? "1" : "0"}
});
try {
std::shared_ptr<HttpRequest> req = m_Connection->NewRequest();
req->RequestMethod = "POST";
req->RequestUrl = url;
req->AddHeader("Authorization", "Basic " + Base64::Encode(m_User + ":" + m_Password));
req->AddHeader("Accept", "application/json");
m_Connection->SubmitRequest(req, std::bind(AutocompleteScriptHttpCompletionCallback, _1, _2, callback));
} catch (const std::exception&) {
callback(boost::current_exception(), nullptr);
}
}
void ApiClient::AutocompleteScriptHttpCompletionCallback(HttpRequest& request,
HttpResponse& response, const AutocompleteScriptCompletionCallback& callback)
{
Dictionary::Ptr result;
String body;
char buffer[1024];
size_t count;
while ((count = response.ReadBody(buffer, sizeof(buffer))) > 0)
body += String(buffer, buffer + count);
try {
if (response.StatusCode < 200 || response.StatusCode > 299) {
std::string message = "HTTP request failed; Code: " + Convert::ToString(response.StatusCode) + "; Body: " + body;
BOOST_THROW_EXCEPTION(ScriptError(message));
}
result = JsonDecode(body);
Array::Ptr results = result->Get("results");
Array::Ptr suggestions;
String errorMessage = "Unexpected result from API.";
if (results && results->GetLength() > 0) {
Dictionary::Ptr resultInfo = results->Get(0);
errorMessage = resultInfo->Get("status");
if (resultInfo->Get("code") >= 200 && resultInfo->Get("code") <= 299)
suggestions = resultInfo->Get("suggestions");
else
BOOST_THROW_EXCEPTION(ScriptError(errorMessage));
}
callback(boost::exception_ptr(), suggestions);
} catch (const std::exception&) {
callback(boost::current_exception(), nullptr);
}
}

View File

@ -1,43 +0,0 @@
/* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */
#ifndef APICLIENT_H
#define APICLIENT_H
#include "remote/httpclientconnection.hpp"
#include "base/value.hpp"
#include "base/exception.hpp"
#include <vector>
namespace icinga
{
class ApiClient : public Object
{
public:
DECLARE_PTR_TYPEDEFS(ApiClient);
ApiClient(const String& host, const String& port,
String user, String password);
typedef std::function<void(boost::exception_ptr, const Value&)> ExecuteScriptCompletionCallback;
void ExecuteScript(const String& session, const String& command, bool sandboxed,
const ExecuteScriptCompletionCallback& callback) const;
typedef std::function<void(boost::exception_ptr, const Array::Ptr&)> AutocompleteScriptCompletionCallback;
void AutocompleteScript(const String& session, const String& command, bool sandboxed,
const AutocompleteScriptCompletionCallback& callback) const;
private:
HttpClientConnection::Ptr m_Connection;
String m_User;
String m_Password;
static void ExecuteScriptHttpCompletionCallback(HttpRequest& request,
HttpResponse& response, const ExecuteScriptCompletionCallback& callback);
static void AutocompleteScriptHttpCompletionCallback(HttpRequest& request,
HttpResponse& response, const AutocompleteScriptCompletionCallback& callback);
};
}
#endif /* APICLIENT_H */

View File

@ -1,66 +0,0 @@
/* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */
#include "remote/httpchunkedencoding.hpp"
#include <sstream>
using namespace icinga;
StreamReadStatus HttpChunkedEncoding::ReadChunkFromStream(const Stream::Ptr& stream,
char **data, size_t *size, ChunkReadContext& context, bool may_wait)
{
if (context.LengthIndicator == -1) {
String line;
StreamReadStatus status = stream->ReadLine(&line, context.StreamContext, may_wait);
may_wait = false;
if (status != StatusNewItem)
return status;
std::stringstream msgbuf;
msgbuf << std::hex << line;
msgbuf >> context.LengthIndicator;
if (context.LengthIndicator < 0)
BOOST_THROW_EXCEPTION(std::invalid_argument("HTTP chunk length must not be negative."));
}
StreamReadContext& scontext = context.StreamContext;
if (scontext.Eof)
return StatusEof;
if (scontext.MustRead) {
if (!scontext.FillFromStream(stream, may_wait)) {
scontext.Eof = true;
return StatusEof;
}
scontext.MustRead = false;
}
size_t NewlineLength = context.LengthIndicator ? 2 : 0;
if (scontext.Size < (size_t)context.LengthIndicator + NewlineLength) {
scontext.MustRead = true;
return StatusNeedData;
}
*data = new char[context.LengthIndicator];
*size = context.LengthIndicator;
memcpy(*data, scontext.Buffer, context.LengthIndicator);
scontext.DropData(context.LengthIndicator + NewlineLength);
context.LengthIndicator = -1;
return StatusNewItem;
}
void HttpChunkedEncoding::WriteChunkToStream(const Stream::Ptr& stream, const char *data, size_t count)
{
std::ostringstream msgbuf;
msgbuf << std::hex << count << "\r\n";
String lengthIndicator = msgbuf.str();
stream->Write(lengthIndicator.CStr(), lengthIndicator.GetLength());
stream->Write(data, count);
if (count > 0)
stream->Write("\r\n", 2);
}

View File

@ -1,37 +0,0 @@
/* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */
#ifndef HTTPCHUNKEDENCODING_H
#define HTTPCHUNKEDENCODING_H
#include "remote/i2-remote.hpp"
#include "base/stream.hpp"
namespace icinga
{
struct ChunkReadContext
{
StreamReadContext& StreamContext;
int LengthIndicator;
ChunkReadContext(StreamReadContext& scontext)
: StreamContext(scontext), LengthIndicator(-1)
{ }
};
/**
* HTTP chunked encoding.
*
* @ingroup remote
*/
struct HttpChunkedEncoding
{
static StreamReadStatus ReadChunkFromStream(const Stream::Ptr& stream,
char **data, size_t *size, ChunkReadContext& ccontext, bool may_wait = false);
static void WriteChunkToStream(const Stream::Ptr& stream, const char *data, size_t count);
};
}
#endif /* HTTPCHUNKEDENCODING_H */

View File

@ -1,159 +0,0 @@
/* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */
#include "remote/httpclientconnection.hpp"
#include "base/configtype.hpp"
#include "base/objectlock.hpp"
#include "base/base64.hpp"
#include "base/utility.hpp"
#include "base/logger.hpp"
#include "base/exception.hpp"
#include "base/convert.hpp"
#include "base/tcpsocket.hpp"
#include "base/tlsstream.hpp"
#include "base/networkstream.hpp"
using namespace icinga;
HttpClientConnection::HttpClientConnection(String host, String port, bool tls)
: m_Host(std::move(host)), m_Port(std::move(port)), m_Tls(tls)
{ }
void HttpClientConnection::Start()
{
/* Nothing to do here atm. */
}
void HttpClientConnection::Reconnect()
{
if (m_Stream)
m_Stream->Close();
m_Context.~StreamReadContext();
new (&m_Context) StreamReadContext();
m_Requests.clear();
m_CurrentResponse.reset();
TcpSocket::Ptr socket = new TcpSocket();
socket->Connect(m_Host, m_Port);
if (m_Tls)
m_Stream = new TlsStream(socket, m_Host, RoleClient);
else
ASSERT(!"Non-TLS HTTP connections not supported.");
/* m_Stream = new NetworkStream(socket);
* -- does not currently work because the NetworkStream class doesn't support async I/O
*/
/* the stream holds an owning reference to this object through the callback we're registering here */
m_Stream->RegisterDataHandler(std::bind(&HttpClientConnection::DataAvailableHandler, HttpClientConnection::Ptr(this), _1));
if (m_Stream->IsDataAvailable())
DataAvailableHandler(m_Stream);
}
Stream::Ptr HttpClientConnection::GetStream() const
{
return m_Stream;
}
String HttpClientConnection::GetHost() const
{
return m_Host;
}
String HttpClientConnection::GetPort() const
{
return m_Port;
}
bool HttpClientConnection::GetTls() const
{
return m_Tls;
}
void HttpClientConnection::Disconnect()
{
Log(LogDebug, "HttpClientConnection", "Http client disconnected");
m_Stream->Shutdown();
}
bool HttpClientConnection::ProcessMessage()
{
bool res;
if (m_Requests.empty()) {
m_Stream->Close();
return false;
}
const std::pair<std::shared_ptr<HttpRequest>, HttpCompletionCallback>& currentRequest = *m_Requests.begin();
HttpRequest& request = *currentRequest.first.get();
const HttpCompletionCallback& callback = currentRequest.second;
if (!m_CurrentResponse)
m_CurrentResponse = std::make_shared<HttpResponse>(m_Stream, request);
std::shared_ptr<HttpResponse> currentResponse = m_CurrentResponse;
HttpResponse& response = *currentResponse.get();
try {
res = response.Parse(m_Context, false);
} catch (const std::exception&) {
callback(request, response);
m_Stream->Shutdown();
return false;
}
if (response.Complete) {
callback(request, response);
m_Requests.pop_front();
m_CurrentResponse.reset();
return true;
}
return res;
}
void HttpClientConnection::DataAvailableHandler(const Stream::Ptr& stream)
{
ASSERT(stream == m_Stream);
bool close = false;
if (!m_Stream->IsEof()) {
boost::mutex::scoped_lock lock(m_DataHandlerMutex);
try {
while (ProcessMessage())
; /* empty loop body */
} catch (const std::exception& ex) {
Log(LogWarning, "HttpClientConnection")
<< "Error while reading Http response: " << DiagnosticInformation(ex);
close = true;
Disconnect();
}
} else
close = true;
if (close)
m_Stream->Close();
}
std::shared_ptr<HttpRequest> HttpClientConnection::NewRequest()
{
Reconnect();
return std::make_shared<HttpRequest>(m_Stream);
}
void HttpClientConnection::SubmitRequest(const std::shared_ptr<HttpRequest>& request,
const HttpCompletionCallback& callback)
{
m_Requests.emplace_back(request, callback);
request->Finish();
}

View File

@ -1,61 +0,0 @@
/* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */
#ifndef HTTPCLIENTCONNECTION_H
#define HTTPCLIENTCONNECTION_H
#include "remote/httprequest.hpp"
#include "remote/httpresponse.hpp"
#include "base/stream.hpp"
#include "base/timer.hpp"
#include <deque>
namespace icinga
{
/**
* An HTTP client connection.
*
* @ingroup remote
*/
class HttpClientConnection final : public Object
{
public:
DECLARE_PTR_TYPEDEFS(HttpClientConnection);
HttpClientConnection(String host, String port, bool tls = true);
void Start();
Stream::Ptr GetStream() const;
String GetHost() const;
String GetPort() const;
bool GetTls() const;
void Disconnect();
std::shared_ptr<HttpRequest> NewRequest();
typedef std::function<void(HttpRequest&, HttpResponse&)> HttpCompletionCallback;
void SubmitRequest(const std::shared_ptr<HttpRequest>& request, const HttpCompletionCallback& callback);
private:
String m_Host;
String m_Port;
bool m_Tls;
Stream::Ptr m_Stream;
std::deque<std::pair<std::shared_ptr<HttpRequest>, HttpCompletionCallback> > m_Requests;
std::shared_ptr<HttpResponse> m_CurrentResponse;
boost::mutex m_DataHandlerMutex;
StreamReadContext m_Context;
void Reconnect();
bool ProcessMessage();
void DataAvailableHandler(const Stream::Ptr& stream);
void ProcessMessageAsync(HttpRequest& request);
};
}
#endif /* HTTPCLIENTCONNECTION_H */

View File

@ -5,7 +5,6 @@
#include "remote/i2-remote.hpp"
#include "remote/url.hpp"
#include "remote/httpresponse.hpp"
#include "remote/httpserverconnection.hpp"
#include "remote/apiuser.hpp"
#include "base/registry.hpp"

View File

@ -1,248 +0,0 @@
/* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */
#include "remote/httprequest.hpp"
#include "base/logger.hpp"
#include "base/application.hpp"
#include "base/convert.hpp"
using namespace icinga;
HttpRequest::HttpRequest(Stream::Ptr stream)
: CompleteHeaders(false),
CompleteHeaderCheck(false),
CompleteBody(false),
ProtocolVersion(HttpVersion11),
Headers(new Dictionary()),
m_Stream(std::move(stream)),
m_State(HttpRequestStart)
{ }
bool HttpRequest::ParseHeaders(StreamReadContext& src, bool may_wait)
{
if (!m_Stream)
return false;
if (m_State != HttpRequestStart && m_State != HttpRequestHeaders)
BOOST_THROW_EXCEPTION(std::runtime_error("Invalid HTTP state"));
String line;
StreamReadStatus srs = m_Stream->ReadLine(&line, src, may_wait);
if (srs != StatusNewItem) {
if (src.Size > 8 * 1024)
BOOST_THROW_EXCEPTION(std::invalid_argument("Line length for HTTP header exceeded"));
return false;
}
if (line.GetLength() > 8 * 1024) {
#ifdef I2_DEBUG /* I2_DEBUG */
Log(LogDebug, "HttpRequest")
<< "Header size: " << line.GetLength() << " content: '" << line << "'.";
#endif /* I2_DEBUG */
BOOST_THROW_EXCEPTION(std::invalid_argument("Line length for HTTP header exceeded"));
}
if (m_State == HttpRequestStart) {
/* ignore trailing new-lines */
if (line == "")
return true;
std::vector<String> tokens = line.Split(" ");
Log(LogDebug, "HttpRequest")
<< "line: " << line << ", tokens: " << tokens.size();
if (tokens.size() != 3)
BOOST_THROW_EXCEPTION(std::invalid_argument("Invalid HTTP request"));
RequestMethod = tokens[0];
RequestUrl = new class Url(tokens[1]);
if (tokens[2] == "HTTP/1.0")
ProtocolVersion = HttpVersion10;
else if (tokens[2] == "HTTP/1.1") {
ProtocolVersion = HttpVersion11;
} else
BOOST_THROW_EXCEPTION(std::invalid_argument("Unsupported HTTP version"));
m_State = HttpRequestHeaders;
return true;
} else { // m_State = HttpRequestHeaders
if (line == "") {
m_State = HttpRequestBody;
CompleteHeaders = true;
return true;
} else {
if (Headers->GetLength() > 128)
BOOST_THROW_EXCEPTION(std::invalid_argument("Maximum number of HTTP request headers exceeded"));
String::SizeType pos = line.FindFirstOf(":");
if (pos == String::NPos)
BOOST_THROW_EXCEPTION(std::invalid_argument("Invalid HTTP request"));
String key = line.SubStr(0, pos).ToLower().Trim();
String value = line.SubStr(pos + 1).Trim();
Headers->Set(key, value);
if (key == "x-http-method-override")
RequestMethod = value;
return true;
}
}
}
bool HttpRequest::ParseBody(StreamReadContext& src, bool may_wait)
{
if (!m_Stream)
return false;
if (m_State != HttpRequestBody)
BOOST_THROW_EXCEPTION(std::runtime_error("Invalid HTTP state"));
/* we're done if the request doesn't contain a message body */
if (!Headers->Contains("content-length") && !Headers->Contains("transfer-encoding")) {
CompleteBody = true;
return true;
} else if (!m_Body)
m_Body = new FIFO();
if (Headers->Get("transfer-encoding") == "chunked") {
if (!m_ChunkContext)
m_ChunkContext = std::make_shared<ChunkReadContext>(std::ref(src));
char *data;
size_t size;
StreamReadStatus srs = HttpChunkedEncoding::ReadChunkFromStream(m_Stream, &data, &size, *m_ChunkContext.get(), may_wait);
if (srs != StatusNewItem)
return false;
m_Body->Write(data, size);
delete [] data;
if (size == 0) {
CompleteBody = true;
}
return true;
}
if (src.Eof)
BOOST_THROW_EXCEPTION(std::invalid_argument("Unexpected EOF in HTTP body"));
if (src.MustRead) {
if (!src.FillFromStream(m_Stream, false)) {
src.Eof = true;
BOOST_THROW_EXCEPTION(std::invalid_argument("Unexpected EOF in HTTP body"));
}
src.MustRead = false;
}
long length_indicator_signed = Convert::ToLong(Headers->Get("content-length"));
if (length_indicator_signed < 0)
BOOST_THROW_EXCEPTION(std::invalid_argument("Content-Length must not be negative."));
size_t length_indicator = length_indicator_signed;
if (src.Size < length_indicator) {
src.MustRead = true;
return false;
}
m_Body->Write(src.Buffer, length_indicator);
src.DropData(length_indicator);
CompleteBody = true;
return true;
}
size_t HttpRequest::ReadBody(char *data, size_t count)
{
if (!m_Body)
return 0;
else
return m_Body->Read(data, count, true);
}
void HttpRequest::AddHeader(const String& key, const String& value)
{
ASSERT(m_State == HttpRequestStart || m_State == HttpRequestHeaders);
Headers->Set(key.ToLower(), value);
}
void HttpRequest::FinishHeaders()
{
if (m_State == HttpRequestStart) {
String rqline = RequestMethod + " " + RequestUrl->Format(true) + " HTTP/1." + (ProtocolVersion == HttpVersion10 ? "0" : "1") + "\r\n";
m_Stream->Write(rqline.CStr(), rqline.GetLength());
m_State = HttpRequestHeaders;
}
if (m_State == HttpRequestHeaders) {
AddHeader("User-Agent", "Icinga/" + Application::GetAppVersion());
if (ProtocolVersion == HttpVersion11) {
AddHeader("Transfer-Encoding", "chunked");
if (!Headers->Contains("Host"))
AddHeader("Host", RequestUrl->GetHost() + ":" + RequestUrl->GetPort());
}
ObjectLock olock(Headers);
for (const Dictionary::Pair& kv : Headers)
{
String header = kv.first + ": " + kv.second + "\r\n";
m_Stream->Write(header.CStr(), header.GetLength());
}
m_Stream->Write("\r\n", 2);
m_State = HttpRequestBody;
}
}
void HttpRequest::WriteBody(const char *data, size_t count)
{
ASSERT(m_State == HttpRequestStart || m_State == HttpRequestHeaders || m_State == HttpRequestBody);
if (ProtocolVersion == HttpVersion10) {
if (!m_Body)
m_Body = new FIFO();
m_Body->Write(data, count);
} else {
FinishHeaders();
HttpChunkedEncoding::WriteChunkToStream(m_Stream, data, count);
}
}
void HttpRequest::Finish()
{
ASSERT(m_State != HttpRequestEnd);
if (ProtocolVersion == HttpVersion10) {
if (m_Body)
AddHeader("Content-Length", Convert::ToString(m_Body->GetAvailableBytes()));
FinishHeaders();
while (m_Body && m_Body->IsDataAvailable()) {
char buffer[1024];
size_t rc = m_Body->Read(buffer, sizeof(buffer), true);
m_Stream->Write(buffer, rc);
}
} else {
if (m_State == HttpRequestStart || m_State == HttpRequestHeaders)
FinishHeaders();
WriteBody(nullptr, 0);
m_Stream->Write("\r\n", 2);
}
m_State = HttpRequestEnd;
}

View File

@ -1,69 +0,0 @@
/* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */
#ifndef HTTPREQUEST_H
#define HTTPREQUEST_H
#include "remote/i2-remote.hpp"
#include "remote/httpchunkedencoding.hpp"
#include "remote/url.hpp"
#include "base/stream.hpp"
#include "base/fifo.hpp"
#include "base/dictionary.hpp"
namespace icinga
{
enum HttpVersion
{
HttpVersion10,
HttpVersion11
};
enum HttpRequestState
{
HttpRequestStart,
HttpRequestHeaders,
HttpRequestBody,
HttpRequestEnd
};
/**
* An HTTP request.
*
* @ingroup remote
*/
struct HttpRequest
{
public:
bool CompleteHeaders;
bool CompleteHeaderCheck;
bool CompleteBody;
String RequestMethod;
Url::Ptr RequestUrl;
HttpVersion ProtocolVersion;
Dictionary::Ptr Headers;
HttpRequest(Stream::Ptr stream);
bool ParseHeaders(StreamReadContext& src, bool may_wait);
bool ParseBody(StreamReadContext& src, bool may_wait);
size_t ReadBody(char *data, size_t count);
void AddHeader(const String& key, const String& value);
void WriteBody(const char *data, size_t count);
void Finish();
private:
Stream::Ptr m_Stream;
std::shared_ptr<ChunkReadContext> m_ChunkContext;
HttpRequestState m_State;
FIFO::Ptr m_Body;
void FinishHeaders();
};
}
#endif /* HTTPREQUEST_H */

View File

@ -1,259 +0,0 @@
/* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */
#include "remote/httpresponse.hpp"
#include "remote/httpchunkedencoding.hpp"
#include "base/logger.hpp"
#include "base/application.hpp"
#include "base/convert.hpp"
using namespace icinga;
HttpResponse::HttpResponse(Stream::Ptr stream, const HttpRequest& request)
: Complete(false), m_State(HttpResponseStart), m_Request(&request), m_Stream(std::move(stream))
{ }
void HttpResponse::SetStatus(int code, const String& message)
{
ASSERT(code >= 100 && code <= 599);
ASSERT(!message.IsEmpty());
if (m_State != HttpResponseStart) {
Log(LogWarning, "HttpResponse", "Tried to set Http response status after headers had already been sent.");
return;
}
String status = "HTTP/";
if (m_Request->ProtocolVersion == HttpVersion10)
status += "1.0";
else
status += "1.1";
status += " " + Convert::ToString(code) + " " + message + "\r\n";
m_Stream->Write(status.CStr(), status.GetLength());
m_State = HttpResponseHeaders;
}
void HttpResponse::AddHeader(const String& key, const String& value)
{
m_Headers.emplace_back(key + ": " + value + "\r\n");
}
void HttpResponse::FinishHeaders()
{
if (m_State == HttpResponseHeaders) {
if (m_Request->ProtocolVersion == HttpVersion11)
AddHeader("Transfer-Encoding", "chunked");
AddHeader("Server", "Icinga/" + Application::GetAppVersion());
for (const String& header : m_Headers)
m_Stream->Write(header.CStr(), header.GetLength());
m_Stream->Write("\r\n", 2);
m_State = HttpResponseBody;
}
}
void HttpResponse::WriteBody(const char *data, size_t count)
{
ASSERT(m_State == HttpResponseHeaders || m_State == HttpResponseBody);
if (m_Request->ProtocolVersion == HttpVersion10) {
if (!m_Body)
m_Body = new FIFO();
m_Body->Write(data, count);
} else {
FinishHeaders();
HttpChunkedEncoding::WriteChunkToStream(m_Stream, data, count);
}
}
void HttpResponse::Finish()
{
ASSERT(m_State != HttpResponseEnd);
if (m_Request->ProtocolVersion == HttpVersion10) {
if (m_Body)
AddHeader("Content-Length", Convert::ToString(m_Body->GetAvailableBytes()));
FinishHeaders();
while (m_Body && m_Body->IsDataAvailable()) {
char buffer[1024];
size_t rc = m_Body->Read(buffer, sizeof(buffer), true);
m_Stream->Write(buffer, rc);
}
} else {
WriteBody(nullptr, 0);
m_Stream->Write("\r\n", 2);
}
m_State = HttpResponseEnd;
/* Close the connection on
* a) HTTP/1.0
* b) Connection: close in the sent header.
*
* Do this here and not in DataAvailableHandler - there might still be incoming data in there.
*/
if (m_Request->ProtocolVersion == HttpVersion10 || m_Request->Headers->Get("connection") == "close")
m_Stream->Shutdown();
}
bool HttpResponse::Parse(StreamReadContext& src, bool may_wait)
{
if (m_State != HttpResponseBody) {
String line;
StreamReadStatus srs = m_Stream->ReadLine(&line, src, may_wait);
if (srs != StatusNewItem)
return false;
if (m_State == HttpResponseStart) {
/* ignore trailing new-lines */
if (line == "")
return true;
std::vector<String> tokens = line.Split(" ");
Log(LogDebug, "HttpRequest")
<< "line: " << line << ", tokens: " << tokens.size();
if (tokens.size() < 2)
BOOST_THROW_EXCEPTION(std::invalid_argument("Invalid HTTP response (Status line)"));
if (tokens[0] == "HTTP/1.0")
ProtocolVersion = HttpVersion10;
else if (tokens[0] == "HTTP/1.1") {
ProtocolVersion = HttpVersion11;
} else
BOOST_THROW_EXCEPTION(std::invalid_argument("Unsupported HTTP version"));
StatusCode = Convert::ToLong(tokens[1]);
if (tokens.size() >= 3)
StatusMessage = tokens[2]; // TODO: Join tokens[2..end]
m_State = HttpResponseHeaders;
} else if (m_State == HttpResponseHeaders) {
if (!Headers)
Headers = new Dictionary();
if (line == "") {
m_State = HttpResponseBody;
m_Body = new FIFO();
return true;
} else {
String::SizeType pos = line.FindFirstOf(":");
if (pos == String::NPos)
BOOST_THROW_EXCEPTION(std::invalid_argument("Invalid HTTP request"));
String key = line.SubStr(0, pos).ToLower().Trim();
String value = line.SubStr(pos + 1).Trim();
Headers->Set(key, value);
}
} else {
VERIFY(!"Invalid HTTP request state.");
}
} else if (m_State == HttpResponseBody) {
if (Headers->Get("transfer-encoding") == "chunked") {
if (!m_ChunkContext)
m_ChunkContext = std::make_shared<ChunkReadContext>(std::ref(src));
char *data;
size_t size;
StreamReadStatus srs = HttpChunkedEncoding::ReadChunkFromStream(m_Stream, &data, &size, *m_ChunkContext.get(), may_wait);
if (srs != StatusNewItem)
return false;
Log(LogNotice, "HttpResponse")
<< "Read " << size << " bytes";
m_Body->Write(data, size);
delete[] data;
if (size == 0) {
Complete = true;
return true;
}
} else {
bool hasLengthIndicator = false;
size_t lengthIndicator = 0;
Value contentLengthHeader;
if (Headers->Get("content-length", &contentLengthHeader)) {
hasLengthIndicator = true;
lengthIndicator = Convert::ToLong(contentLengthHeader);
}
if (!hasLengthIndicator && ProtocolVersion != HttpVersion10 && !Headers->Contains("transfer-encoding")) {
Complete = true;
return true;
}
if (hasLengthIndicator && src.Eof)
BOOST_THROW_EXCEPTION(std::invalid_argument("Unexpected EOF in HTTP body"));
if (src.MustRead) {
if (!src.FillFromStream(m_Stream, may_wait))
src.Eof = true;
src.MustRead = false;
}
if (!hasLengthIndicator)
lengthIndicator = src.Size;
if (src.Size < lengthIndicator) {
src.MustRead = true;
return may_wait;
}
m_Body->Write(src.Buffer, lengthIndicator);
src.DropData(lengthIndicator);
if (!hasLengthIndicator && !src.Eof) {
src.MustRead = true;
return may_wait;
}
Complete = true;
return true;
}
}
return true;
}
size_t HttpResponse::ReadBody(char *data, size_t count)
{
if (!m_Body)
return 0;
else
return m_Body->Read(data, count, true);
}
size_t HttpResponse::GetBodySize() const
{
if (!m_Body)
return 0;
else
return m_Body->GetAvailableBytes();
}
bool HttpResponse::IsPeerConnected() const
{
return !m_Stream->IsEof();
}
void HttpResponse::RebindRequest(const HttpRequest& request)
{
m_Request = &request;
}

View File

@ -1,66 +0,0 @@
/* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */
#ifndef HTTPRESPONSE_H
#define HTTPRESPONSE_H
#include "remote/httprequest.hpp"
#include "base/stream.hpp"
#include "base/fifo.hpp"
#include <vector>
namespace icinga
{
enum HttpResponseState
{
HttpResponseStart,
HttpResponseHeaders,
HttpResponseBody,
HttpResponseEnd
};
/**
* An HTTP response.
*
* @ingroup remote
*/
struct HttpResponse
{
public:
bool Complete;
HttpVersion ProtocolVersion;
int StatusCode;
String StatusMessage;
Dictionary::Ptr Headers;
HttpResponse(Stream::Ptr stream, const HttpRequest& request);
bool Parse(StreamReadContext& src, bool may_wait);
size_t ReadBody(char *data, size_t count);
size_t GetBodySize() const;
void SetStatus(int code, const String& message);
void AddHeader(const String& key, const String& value);
void WriteBody(const char *data, size_t count);
void Finish();
bool IsPeerConnected() const;
void RebindRequest(const HttpRequest& request);
private:
HttpResponseState m_State;
std::shared_ptr<ChunkReadContext> m_ChunkContext;
const HttpRequest *m_Request;
Stream::Ptr m_Stream;
FIFO::Ptr m_Body;
std::vector<String> m_Headers;
void FinishHeaders();
};
}
#endif /* HTTPRESPONSE_H */

View File

@ -37,29 +37,6 @@ Dictionary::Ptr HttpUtility::FetchRequestParameters(const Url::Ptr& url, const s
return result;
}
void HttpUtility::SendJsonBody(HttpResponse& response, const Dictionary::Ptr& params, const Value& val)
{
response.AddHeader("Content-Type", "application/json");
bool prettyPrint = false;
if (params)
prettyPrint = GetLastParameter(params, "pretty");
String body = JsonEncode(val, prettyPrint);
response.WriteBody(body.CStr(), body.GetLength());
}
void HttpUtility::SendJsonBody(boost::beast::http::response<boost::beast::http::string_body>& response, const Dictionary::Ptr& params, const Value& val)
{
namespace http = boost::beast::http;
response.set(http::field::content_type, "application/json");
response.body() = JsonEncode(val, params && GetLastParameter(params, "pretty"));
response.set(http::field::content_length, response.body().size());
}
Value HttpUtility::GetLastParameter(const Dictionary::Ptr& params, const String& key)
{
Value varr = params->Get(key);
@ -75,27 +52,13 @@ Value HttpUtility::GetLastParameter(const Dictionary::Ptr& params, const String&
return arr->Get(arr->GetLength() - 1);
}
void HttpUtility::SendJsonError(HttpResponse& response, const Dictionary::Ptr& params,
int code, const String& info, const String& diagnosticInformation)
void HttpUtility::SendJsonBody(boost::beast::http::response<boost::beast::http::string_body>& response, const Dictionary::Ptr& params, const Value& val)
{
Dictionary::Ptr result = new Dictionary();
response.SetStatus(code, HttpUtility::GetErrorNameByCode(code));
result->Set("error", code);
namespace http = boost::beast::http;
bool verbose = false;
if (params)
verbose = HttpUtility::GetLastParameter(params, "verbose");
if (!info.IsEmpty())
result->Set("status", info);
if (verbose) {
if (!diagnosticInformation.IsEmpty())
result->Set("diagnostic_information", diagnosticInformation);
}
HttpUtility::SendJsonBody(response, params, result);
response.set(http::field::content_type, "application/json");
response.body() = JsonEncode(val, params && GetLastParameter(params, "pretty"));
response.set(http::field::content_length, response.body().size());
}
void HttpUtility::SendJsonError(boost::beast::http::response<boost::beast::http::string_body>& response,
@ -115,32 +78,3 @@ void HttpUtility::SendJsonError(boost::beast::http::response<boost::beast::http:
HttpUtility::SendJsonBody(response, params, result);
}
String HttpUtility::GetErrorNameByCode(const int code)
{
switch(code) {
case 200:
return "OK";
case 201:
return "Created";
case 204:
return "No Content";
case 304:
return "Not Modified";
case 400:
return "Bad Request";
case 401:
return "Unauthorized";
case 403:
return "Forbidden";
case 404:
return "Not Found";
case 409:
return "Conflict";
case 500:
return "Internal Server Error";
default:
return "Unknown Error Code";
}
}

View File

@ -3,12 +3,10 @@
#ifndef HTTPUTILITY_H
#define HTTPUTILITY_H
#include "remote/httprequest.hpp"
#include "remote/httpresponse.hpp"
#include "remote/url.hpp"
#include "base/dictionary.hpp"
#include <string>
#include <boost/beast/http.hpp>
#include <string>
namespace icinga
{
@ -23,17 +21,11 @@ class HttpUtility
public:
static Dictionary::Ptr FetchRequestParameters(const Url::Ptr& url, const std::string& body);
static void SendJsonBody(HttpResponse& response, const Dictionary::Ptr& params, const Value& val);
static void SendJsonBody(boost::beast::http::response<boost::beast::http::string_body>& response, const Dictionary::Ptr& params, const Value& val);
static Value GetLastParameter(const Dictionary::Ptr& params, const String& key);
static void SendJsonError(HttpResponse& response, const Dictionary::Ptr& params, const int code,
const String& verbose = String(), const String& diagnosticInformation = String());
static void SendJsonBody(boost::beast::http::response<boost::beast::http::string_body>& response, const Dictionary::Ptr& params, const Value& val);
static void SendJsonError(boost::beast::http::response<boost::beast::http::string_body>& response, const Dictionary::Ptr& params, const int code,
const String& verbose = String(), const String& diagnosticInformation = String());
private:
static String GetErrorNameByCode(int code);
};
}

View File

@ -15,6 +15,11 @@
using namespace icinga;
#ifdef I2_DEBUG
/**
* Determine whether the developer wants to see raw JSON messages.
*
* @return Internal.DebugJsonRpc boolean
*/
static bool GetDebugJsonRpcCached()
{
static int debugJsonRpc = -1;
@ -40,25 +45,6 @@ static bool GetDebugJsonRpcCached()
}
#endif /* I2_DEBUG */
/**
* Sends a message to the connected peer and returns the bytes sent.
*
* @param message The message.
*
* @return The amount of bytes sent.
*/
size_t JsonRpc::SendMessage(const Stream::Ptr& stream, const Dictionary::Ptr& message)
{
String json = JsonEncode(message);
#ifdef I2_DEBUG
if (GetDebugJsonRpcCached())
std::cerr << ConsoleColorTag(Console_ForegroundBlue) << ">> " << json << ConsoleColorTag(Console_Normal) << "\n";
#endif /* I2_DEBUG */
return NetString::WriteStringToStream(stream, json);
}
/**
* Sends a message to the connected peer and returns the bytes sent.
*
@ -90,13 +76,15 @@ size_t JsonRpc::SendMessage(const std::shared_ptr<AsioTlsStream>& stream, const
return JsonRpc::SendRawMessage(stream, JsonEncode(message), yc);
}
/**
* Sends a message to the connected peer and returns the bytes sent.
*
* @param message The message.
*
* @return The amount of bytes sent.
*/
/**
* Sends a raw message to the connected peer.
*
* @param stream ASIO TLS Stream
* @param json message
* @param yc Yield context required for ASIO
*
* @return bytes sent
*/
size_t JsonRpc::SendRawMessage(const std::shared_ptr<AsioTlsStream>& stream, const String& json, boost::asio::yield_context yc)
{
#ifdef I2_DEBUG
@ -107,23 +95,14 @@ size_t JsonRpc::SendRawMessage(const std::shared_ptr<AsioTlsStream>& stream, con
return NetString::WriteStringToStream(stream, json, yc);
}
StreamReadStatus JsonRpc::ReadMessage(const Stream::Ptr& stream, String *message, StreamReadContext& src, bool may_wait, ssize_t maxMessageLength)
{
String jsonString;
StreamReadStatus srs = NetString::ReadStringFromStream(stream, &jsonString, src, may_wait, maxMessageLength);
if (srs != StatusNewItem)
return srs;
*message = jsonString;
#ifdef I2_DEBUG
if (GetDebugJsonRpcCached())
std::cerr << ConsoleColorTag(Console_ForegroundBlue) << "<< " << jsonString << ConsoleColorTag(Console_Normal) << "\n";
#endif /* I2_DEBUG */
return StatusNewItem;
}
/**
* Reads a message from the connected peer.
*
* @param stream ASIO TLS Stream
* @param maxMessageLength maximum size of bytes read.
*
* @return A JSON string
*/
String JsonRpc::ReadMessage(const std::shared_ptr<AsioTlsStream>& stream, ssize_t maxMessageLength)
{
@ -137,6 +116,15 @@ String JsonRpc::ReadMessage(const std::shared_ptr<AsioTlsStream>& stream, ssize_
return std::move(jsonString);
}
/**
* Reads a message from the connected peer.
*
* @param stream ASIO TLS Stream
* @param yc Yield Context for ASIO
* @param maxMessageLength maximum size of bytes read.
*
* @return A JSON string
*/
String JsonRpc::ReadMessage(const std::shared_ptr<AsioTlsStream>& stream, boost::asio::yield_context yc, ssize_t maxMessageLength)
{
String jsonString = NetString::ReadStringFromStream(stream, yc, maxMessageLength);
@ -149,6 +137,13 @@ String JsonRpc::ReadMessage(const std::shared_ptr<AsioTlsStream>& stream, boost:
return std::move(jsonString);
}
/**
* Decode message, enforce a Dictionary
*
* @param message JSON string
*
* @return Dictionary ptr
*/
Dictionary::Ptr JsonRpc::DecodeMessage(const String& message)
{
Value value = JsonDecode(message);

View File

@ -21,13 +21,13 @@ namespace icinga
class JsonRpc
{
public:
static size_t SendMessage(const Stream::Ptr& stream, const Dictionary::Ptr& message);
static size_t SendMessage(const std::shared_ptr<AsioTlsStream>& stream, const Dictionary::Ptr& message);
static size_t SendMessage(const std::shared_ptr<AsioTlsStream>& stream, const Dictionary::Ptr& message, boost::asio::yield_context yc);
static size_t SendRawMessage(const std::shared_ptr<AsioTlsStream>& stream, const String& json, boost::asio::yield_context yc);
static StreamReadStatus ReadMessage(const Stream::Ptr& stream, String *message, StreamReadContext& src, bool may_wait = false, ssize_t maxMessageLength = -1);
static String ReadMessage(const std::shared_ptr<AsioTlsStream>& stream, ssize_t maxMessageLength = -1);
static String ReadMessage(const std::shared_ptr<AsioTlsStream>& stream, boost::asio::yield_context yc, ssize_t maxMessageLength = -1);
static Dictionary::Ptr DecodeMessage(const String& message);
private: