From c1fa07899c44702b00bce71299c2e5477fd369df Mon Sep 17 00:00:00 2001 From: "Alexander A. Klimov" Date: Tue, 23 Apr 2019 11:25:26 +0200 Subject: [PATCH 1/2] Introduce OptionalTlsStream --- lib/base/tlsstream.hpp | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/lib/base/tlsstream.hpp b/lib/base/tlsstream.hpp index 6156a3d2f..e9027e24f 100644 --- a/lib/base/tlsstream.hpp +++ b/lib/base/tlsstream.hpp @@ -9,6 +9,7 @@ #include "base/stream.hpp" #include "base/tlsutility.hpp" #include "base/fifo.hpp" +#include #include #include #include @@ -163,6 +164,9 @@ private: } }; +typedef boost::asio::buffered_stream AsioTcpStream; +typedef std::pair, std::shared_ptr> OptionalTlsStream; + } #endif /* TLSSTREAM_H */ From 14fdfff770abf11f96253de44ff5fc27cf4a00d8 Mon Sep 17 00:00:00 2001 From: "Alexander A. Klimov" Date: Tue, 23 Apr 2019 11:25:52 +0200 Subject: [PATCH 2/2] Use new I/O engine in InfluxdbWriter --- lib/perfdata/influxdbwriter.cpp | 136 +++++++++++++++++++------------- lib/perfdata/influxdbwriter.hpp | 3 +- 2 files changed, 84 insertions(+), 55 deletions(-) diff --git a/lib/perfdata/influxdbwriter.cpp b/lib/perfdata/influxdbwriter.cpp index d732a83f2..c99164e4a 100644 --- a/lib/perfdata/influxdbwriter.cpp +++ b/lib/perfdata/influxdbwriter.cpp @@ -9,7 +9,9 @@ #include "icinga/macroprocessor.hpp" #include "icinga/icingaapplication.hpp" #include "icinga/checkcommand.hpp" +#include "base/application.hpp" #include "base/defer.hpp" +#include "base/io-engine.hpp" #include "base/tcpsocket.hpp" #include "base/configtype.hpp" #include "base/objectlock.hpp" @@ -25,9 +27,21 @@ #include "base/tlsutility.hpp" #include #include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include #include #include #include +#include +#include #include using namespace icinga; @@ -156,44 +170,51 @@ void InfluxdbWriter::ExceptionHandler(boost::exception_ptr exp) //TODO: Close the connection, if we keep it open. } -Stream::Ptr InfluxdbWriter::Connect() +OptionalTlsStream InfluxdbWriter::Connect() { - TcpSocket::Ptr socket = new TcpSocket(); - Log(LogNotice, "InfluxdbWriter") << "Reconnecting to InfluxDB on host '" << GetHost() << "' port '" << GetPort() << "'."; - try { - socket->Connect(GetHost(), GetPort()); - } catch (const std::exception& ex) { - Log(LogWarning, "InfluxdbWriter") - << "Can't connect to InfluxDB on host '" << GetHost() << "' port '" << GetPort() << "'."; - throw ex; - } + OptionalTlsStream stream; + bool ssl = GetSslEnable(); + + if (ssl) { + std::shared_ptr sslContext; - if (GetSslEnable()) { - std::shared_ptr sslContext; try { - sslContext = MakeSSLContext(GetSslCert(), GetSslKey(), GetSslCaCert()); + sslContext = MakeAsioSslContext(GetSslCert(), GetSslKey(), GetSslCaCert()); } catch (const std::exception& ex) { Log(LogWarning, "InfluxdbWriter") << "Unable to create SSL context."; - throw ex; + throw; } - TlsStream::Ptr tlsStream = new TlsStream(socket, GetHost(), RoleClient, sslContext); + stream.first = std::make_shared(IoEngine::Get().GetIoService(), *sslContext, GetHost()); + } else { + stream.second = std::make_shared(IoEngine::Get().GetIoService()); + } + + try { + icinga::Connect(ssl ? stream.first->lowest_layer() : stream.second->lowest_layer(), GetHost(), GetPort()); + } catch (const std::exception& ex) { + Log(LogWarning, "InfluxdbWriter") + << "Can't connect to InfluxDB on host '" << GetHost() << "' port '" << GetPort() << "'."; + throw; + } + + if (ssl) { + auto& tlsStream (stream.first->next_layer()); + try { - tlsStream->Handshake(); + tlsStream.handshake(tlsStream.client); } catch (const std::exception& ex) { Log(LogWarning, "InfluxdbWriter") << "TLS handshake with host '" << GetHost() << "' failed."; - throw ex; + throw; } - - return tlsStream; - } else { - return new NetworkStream(socket); } + + return std::move(stream); } void InfluxdbWriter::CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr) @@ -429,6 +450,9 @@ void InfluxdbWriter::FlushTimeoutWQ() void InfluxdbWriter::Flush() { + namespace beast = boost::beast; + namespace http = beast::http; + /* Flush can be called from 1) Timeout 2) Threshold 3) on shutdown/reload. */ if (m_DataBuffer.empty()) return; @@ -439,7 +463,7 @@ void InfluxdbWriter::Flush() String body = boost::algorithm::join(m_DataBuffer, "\n"); m_DataBuffer.clear(); - Stream::Ptr stream; + OptionalTlsStream stream; try { stream = Connect(); @@ -449,10 +473,11 @@ void InfluxdbWriter::Flush() return; } - if (!stream) - return; - - Defer close ([&stream]() { stream->Close(); }); + Defer s ([&stream]() { + if (stream.first) { + stream.first->next_layer().shutdown(); + } + }); Url::Ptr url = new Url(); url->SetScheme(GetSslEnable() ? "https" : "http"); @@ -470,59 +495,64 @@ void InfluxdbWriter::Flush() if (!GetPassword().IsEmpty()) url->AddQueryElement("p", GetPassword()); - HttpRequest req(stream); - req.RequestMethod = "POST"; - req.RequestUrl = url; + http::request request (http::verb::post, std::string(url->Format(true)), 10); + + request.set(http::field::user_agent, "Icinga/" + Application::GetAppVersion()); + request.set(http::field::host, url->GetHost() + ":" + url->GetPort()); + + request.body() = body; + request.set(http::field::content_length, request.body().size()); try { - req.WriteBody(body.CStr(), body.GetLength()); - req.Finish(); + if (stream.first) { + http::write(*stream.first, request); + stream.first->flush(); + } else { + http::write(*stream.second, request); + stream.second->flush(); + } } catch (const std::exception& ex) { Log(LogWarning, "InfluxdbWriter") << "Cannot write to TCP socket on host '" << GetHost() << "' port '" << GetPort() << "'."; - throw ex; + throw; } - HttpResponse resp(stream, req); - StreamReadContext context; + http::parser parser; + beast::flat_buffer buf; try { - while (resp.Parse(context, true) && !resp.Complete) - ; /* Do nothing */ + if (stream.first) { + http::read(*stream.first, buf, parser); + } else { + http::read(*stream.second, buf, parser); + } } catch (const std::exception& ex) { Log(LogWarning, "InfluxdbWriter") << "Failed to parse HTTP response from host '" << GetHost() << "' port '" << GetPort() << "': " << DiagnosticInformation(ex); - throw ex; + throw; } - if (!resp.Complete) { - Log(LogWarning, "InfluxdbWriter") - << "Failed to read a complete HTTP response from the InfluxDB server."; - return; - } + auto& response (parser.get()); - if (resp.StatusCode != 204) { + if (response.result() != http::status::no_content) { Log(LogWarning, "InfluxdbWriter") - << "Unexpected response code: " << resp.StatusCode; + << "Unexpected response code: " << response.result(); - String contentType = resp.Headers->Get("content-type"); + auto& contentType (response[http::field::content_type]); if (contentType != "application/json") { Log(LogWarning, "InfluxdbWriter") << "Unexpected Content-Type: " << contentType; return; } - size_t responseSize = resp.GetBodySize(); - boost::scoped_array buffer(new char[responseSize + 1]); - resp.ReadBody(buffer.get(), responseSize); - buffer.get()[responseSize] = '\0'; - Dictionary::Ptr jsonResponse; + auto& body (response.body()); + try { - jsonResponse = JsonDecode(buffer.get()); + jsonResponse = JsonDecode(body); } catch (...) { Log(LogWarning, "InfluxdbWriter") - << "Unable to parse JSON response:\n" << buffer.get(); + << "Unable to parse JSON response:\n" << body; return; } @@ -530,8 +560,6 @@ void InfluxdbWriter::Flush() Log(LogCritical, "InfluxdbWriter") << "InfluxDB error message:\n" << error; - - return; } } diff --git a/lib/perfdata/influxdbwriter.hpp b/lib/perfdata/influxdbwriter.hpp index b3d35d365..1f7ab8309 100644 --- a/lib/perfdata/influxdbwriter.hpp +++ b/lib/perfdata/influxdbwriter.hpp @@ -8,6 +8,7 @@ #include "base/configobject.hpp" #include "base/tcpsocket.hpp" #include "base/timer.hpp" +#include "base/tlsstream.hpp" #include "base/workqueue.hpp" #include @@ -51,7 +52,7 @@ private: static String EscapeKeyOrTagValue(const String& str); static String EscapeValue(const Value& value); - Stream::Ptr Connect(); + OptionalTlsStream Connect(); void AssertOnWorkQueue();