From 3f0066e33be0c6ea33f7ce1c7a889c0d1ed1b0db Mon Sep 17 00:00:00 2001 From: "Alexander A. Klimov" Date: Tue, 23 Apr 2019 13:15:38 +0200 Subject: [PATCH] Use new I/O engine in ElasticsearchWriter --- lib/perfdata/elasticsearchwriter.cpp | 150 ++++++++++++++++----------- lib/perfdata/elasticsearchwriter.hpp | 3 +- 2 files changed, 89 insertions(+), 64 deletions(-) diff --git a/lib/perfdata/elasticsearchwriter.cpp b/lib/perfdata/elasticsearchwriter.cpp index a8409f635..8ea4fdef1 100644 --- a/lib/perfdata/elasticsearchwriter.cpp +++ b/lib/perfdata/elasticsearchwriter.cpp @@ -8,7 +8,9 @@ #include "icinga/compatutility.hpp" #include "icinga/service.hpp" #include "icinga/checkcommand.hpp" +#include "base/application.hpp" #include "base/defer.hpp" +#include "base/io-engine.hpp" #include "base/tcpsocket.hpp" #include "base/stream.hpp" #include "base/base64.hpp" @@ -19,7 +21,19 @@ #include "base/exception.hpp" #include "base/statsfunction.hpp" #include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include #include +#include +#include #include using namespace icinga; @@ -418,6 +432,9 @@ void ElasticsearchWriter::Flush() void ElasticsearchWriter::SendRequest(const String& body) { + namespace beast = boost::beast; + namespace http = beast::http; + Url::Ptr url = new Url(); url->SetScheme(GetEnableTls() ? "https" : "http"); @@ -441,7 +458,7 @@ void ElasticsearchWriter::SendRequest(const String& body) url->SetPath(path); - Stream::Ptr stream; + OptionalTlsStream stream; try { stream = Connect(); @@ -451,67 +468,74 @@ void ElasticsearchWriter::SendRequest(const String& body) return; } - if (!stream) - return; + Defer s ([&stream]() { + if (stream.first) { + stream.first->next_layer().shutdown(); + } + }); - Defer close ([&stream]() { stream->Close(); }); + http::request request (http::verb::post, std::string(url->Format(true)), 10); - HttpRequest req(stream); + request.set(http::field::user_agent, "Icinga/" + Application::GetAppVersion()); + request.set(http::field::host, url->GetHost() + ":" + url->GetPort()); /* Specify required headers by Elasticsearch. */ - req.AddHeader("Accept", "application/json"); + request.set(http::field::accept, "application/json"); /* Use application/x-ndjson for bulk streams. While ES * is able to handle application/json, the newline separator * causes problems with Logstash (#6609). */ - req.AddHeader("Content-Type", "application/x-ndjson"); + request.set(http::field::content_type, "application/x-ndjson"); /* Send authentication if configured. */ String username = GetUsername(); String password = GetPassword(); if (!username.IsEmpty() && !password.IsEmpty()) - req.AddHeader("Authorization", "Basic " + Base64::Encode(username + ":" + password)); + request.set(http::field::authorization, "Basic " + Base64::Encode(username + ":" + password)); - req.RequestMethod = "POST"; - req.RequestUrl = url; + request.body() = body; + request.set(http::field::content_length, request.body().size()); /* Don't log the request body to debug log, this is already done above. */ Log(LogDebug, "ElasticsearchWriter") - << "Sending " << req.RequestMethod << " request" << ((!username.IsEmpty() && !password.IsEmpty()) ? " with basic auth" : "" ) + << "Sending " << request.method_string() << " request" << ((!username.IsEmpty() && !password.IsEmpty()) ? " with basic auth" : "" ) << " to '" << url->Format() << "'."; try { - req.WriteBody(body.CStr(), body.GetLength()); - req.Finish(); - } catch (const std::exception& ex) { + if (stream.first) { + http::write(*stream.first, request); + stream.first->flush(); + } else { + http::write(*stream.second, request); + stream.second->flush(); + } + } catch (const std::exception&) { Log(LogWarning, "ElasticsearchWriter") << "Cannot write to HTTP API on host '" << GetHost() << "' port '" << GetPort() << "'."; - throw ex; + throw; } - HttpResponse resp(stream, req); - StreamReadContext context; + http::parser parser; + beast::flat_buffer buf; try { - resp.Parse(context, true); - while (resp.Parse(context, true) && !resp.Complete) - ; /* Do nothing */ + if (stream.first) { + http::read(*stream.first, buf, parser); + } else { + http::read(*stream.second, buf, parser); + } } catch (const std::exception& ex) { Log(LogWarning, "ElasticsearchWriter") << "Failed to parse HTTP response from host '" << GetHost() << "' port '" << GetPort() << "': " << DiagnosticInformation(ex, false); - throw ex; + throw; } - if (!resp.Complete) { - Log(LogWarning, "ElasticsearchWriter") - << "Failed to read a complete HTTP response from the Elasticsearch server."; - return; - } + auto& response (parser.get()); - if (resp.StatusCode > 299) { - if (resp.StatusCode == 401) { + if (response.result_int() > 299) { + if (response.result() == http::status::unauthorized) { /* More verbose error logging with Elasticsearch is hidden behind a proxy. */ if (!username.IsEmpty() && !password.IsEmpty()) { Log(LogCritical, "ElasticsearchWriter") @@ -526,30 +550,27 @@ void ElasticsearchWriter::SendRequest(const String& body) } std::ostringstream msgbuf; - msgbuf << "Unexpected response code " << resp.StatusCode << " from URL '" << req.RequestUrl->Format() << "'"; + msgbuf << "Unexpected response code " << response.result_int() << " from URL '" << url->Format() << "'"; - String contentType = resp.Headers->Get("content-type"); + auto& contentType (response[http::field::content_type]); if (contentType != "application/json" && contentType != "application/json; charset=utf-8") { msgbuf << "; Unexpected Content-Type: '" << contentType << "'"; } - size_t responseSize = resp.GetBodySize(); - boost::scoped_array buffer(new char[responseSize + 1]); - resp.ReadBody(buffer.get(), responseSize); - buffer.get()[responseSize] = '\0'; + auto& body (response.body()); #ifdef I2_DEBUG - msgbuf << "; Response body: '" << buffer.get() << "'"; + msgbuf << "; Response body: '" << body << "'"; #endif /* I2_DEBUG */ - /* {"statusCode":404,"error":"Not Found","message":"Not Found"} */ Dictionary::Ptr jsonResponse; + try { - jsonResponse = JsonDecode(buffer.get()); + jsonResponse = JsonDecode(body); } catch (...) { Log(LogWarning, "ElasticsearchWriter") - << "Unable to parse JSON response:\n" << buffer.get(); + << "Unable to parse JSON response:\n" << body; return; } @@ -557,51 +578,54 @@ void ElasticsearchWriter::SendRequest(const String& body) Log(LogCritical, "ElasticsearchWriter") << "Error: '" << error << "'. " << msgbuf.str(); - - return; } } -Stream::Ptr ElasticsearchWriter::Connect() +OptionalTlsStream ElasticsearchWriter::Connect() { - TcpSocket::Ptr socket = new TcpSocket(); - Log(LogNotice, "ElasticsearchWriter") << "Connecting to Elasticsearch on host '" << GetHost() << "' port '" << GetPort() << "'."; - try { - socket->Connect(GetHost(), GetPort()); - } catch (const std::exception& ex) { - Log(LogWarning, "ElasticsearchWriter") - << "Can't connect to Elasticsearch on host '" << GetHost() << "' port '" << GetPort() << "'."; - throw ex; - } + OptionalTlsStream stream; + bool tls = GetEnableTls(); - if (GetEnableTls()) { - std::shared_ptr sslContext; + if (tls) { + std::shared_ptr sslContext; try { - sslContext = MakeSSLContext(GetCertPath(), GetKeyPath(), GetCaPath()); - } catch (const std::exception& ex) { + sslContext = MakeAsioSslContext(GetCertPath(), GetKeyPath(), GetCaPath()); + } catch (const std::exception&) { Log(LogWarning, "ElasticsearchWriter") << "Unable to create SSL context."; - throw ex; + throw; } - TlsStream::Ptr tlsStream = new TlsStream(socket, GetHost(), RoleClient, sslContext); + stream.first = std::make_shared(IoEngine::Get().GetIoService(), *sslContext, GetHost()); + } else { + stream.second = std::make_shared(IoEngine::Get().GetIoService()); + } + + try { + icinga::Connect(tls ? stream.first->lowest_layer() : stream.second->lowest_layer(), GetHost(), GetPort()); + } catch (const std::exception&) { + Log(LogWarning, "ElasticsearchWriter") + << "Can't connect to Elasticsearch on host '" << GetHost() << "' port '" << GetPort() << "'."; + throw; + } + + if (tls) { + auto& tlsStream (stream.first->next_layer()); try { - tlsStream->Handshake(); - } catch (const std::exception& ex) { + tlsStream.handshake(tlsStream.client); + } catch (const std::exception&) { Log(LogWarning, "ElasticsearchWriter") << "TLS handshake with host '" << GetHost() << "' on port " << GetPort() << " failed."; - throw ex; + throw; } - - return tlsStream; - } else { - return new NetworkStream(socket); } + + return std::move(stream); } void ElasticsearchWriter::AssertOnWorkQueue() diff --git a/lib/perfdata/elasticsearchwriter.hpp b/lib/perfdata/elasticsearchwriter.hpp index cf60044cd..45658f574 100644 --- a/lib/perfdata/elasticsearchwriter.hpp +++ b/lib/perfdata/elasticsearchwriter.hpp @@ -8,6 +8,7 @@ #include "base/configobject.hpp" #include "base/workqueue.hpp" #include "base/timer.hpp" +#include "base/tlsstream.hpp" namespace icinga { @@ -50,7 +51,7 @@ private: void Enqueue(const Checkable::Ptr& checkable, const String& type, const Dictionary::Ptr& fields, double ts); - Stream::Ptr Connect(); + OptionalTlsStream Connect(); void AssertOnWorkQueue(); void ExceptionHandler(boost::exception_ptr exp); void FlushTimeout();