Use new I/O engine in ElasticsearchWriter

This commit is contained in:
Alexander A. Klimov 2019-04-23 13:15:38 +02:00
parent 856877d1fe
commit 3f0066e33b
2 changed files with 89 additions and 64 deletions

View File

@ -8,7 +8,9 @@
#include "icinga/compatutility.hpp" #include "icinga/compatutility.hpp"
#include "icinga/service.hpp" #include "icinga/service.hpp"
#include "icinga/checkcommand.hpp" #include "icinga/checkcommand.hpp"
#include "base/application.hpp"
#include "base/defer.hpp" #include "base/defer.hpp"
#include "base/io-engine.hpp"
#include "base/tcpsocket.hpp" #include "base/tcpsocket.hpp"
#include "base/stream.hpp" #include "base/stream.hpp"
#include "base/base64.hpp" #include "base/base64.hpp"
@ -19,7 +21,19 @@
#include "base/exception.hpp" #include "base/exception.hpp"
#include "base/statsfunction.hpp" #include "base/statsfunction.hpp"
#include <boost/algorithm/string.hpp> #include <boost/algorithm/string.hpp>
#include <boost/asio/ssl/context.hpp>
#include <boost/beast/core/flat_buffer.hpp>
#include <boost/beast/http/field.hpp>
#include <boost/beast/http/message.hpp>
#include <boost/beast/http/parser.hpp>
#include <boost/beast/http/read.hpp>
#include <boost/beast/http/status.hpp>
#include <boost/beast/http/string_body.hpp>
#include <boost/beast/http/verb.hpp>
#include <boost/beast/http/write.hpp>
#include <boost/scoped_array.hpp> #include <boost/scoped_array.hpp>
#include <memory>
#include <string>
#include <utility> #include <utility>
using namespace icinga; using namespace icinga;
@ -418,6 +432,9 @@ void ElasticsearchWriter::Flush()
void ElasticsearchWriter::SendRequest(const String& body) void ElasticsearchWriter::SendRequest(const String& body)
{ {
namespace beast = boost::beast;
namespace http = beast::http;
Url::Ptr url = new Url(); Url::Ptr url = new Url();
url->SetScheme(GetEnableTls() ? "https" : "http"); url->SetScheme(GetEnableTls() ? "https" : "http");
@ -441,7 +458,7 @@ void ElasticsearchWriter::SendRequest(const String& body)
url->SetPath(path); url->SetPath(path);
Stream::Ptr stream; OptionalTlsStream stream;
try { try {
stream = Connect(); stream = Connect();
@ -451,67 +468,74 @@ void ElasticsearchWriter::SendRequest(const String& body)
return; return;
} }
if (!stream) Defer s ([&stream]() {
return; if (stream.first) {
stream.first->next_layer().shutdown();
}
});
Defer close ([&stream]() { stream->Close(); }); http::request<http::string_body> request (http::verb::post, std::string(url->Format(true)), 10);
HttpRequest req(stream); request.set(http::field::user_agent, "Icinga/" + Application::GetAppVersion());
request.set(http::field::host, url->GetHost() + ":" + url->GetPort());
/* Specify required headers by Elasticsearch. */ /* Specify required headers by Elasticsearch. */
req.AddHeader("Accept", "application/json"); request.set(http::field::accept, "application/json");
/* Use application/x-ndjson for bulk streams. While ES /* Use application/x-ndjson for bulk streams. While ES
* is able to handle application/json, the newline separator * is able to handle application/json, the newline separator
* causes problems with Logstash (#6609). * causes problems with Logstash (#6609).
*/ */
req.AddHeader("Content-Type", "application/x-ndjson"); request.set(http::field::content_type, "application/x-ndjson");
/* Send authentication if configured. */ /* Send authentication if configured. */
String username = GetUsername(); String username = GetUsername();
String password = GetPassword(); String password = GetPassword();
if (!username.IsEmpty() && !password.IsEmpty()) if (!username.IsEmpty() && !password.IsEmpty())
req.AddHeader("Authorization", "Basic " + Base64::Encode(username + ":" + password)); request.set(http::field::authorization, "Basic " + Base64::Encode(username + ":" + password));
req.RequestMethod = "POST"; request.body() = body;
req.RequestUrl = url; request.set(http::field::content_length, request.body().size());
/* Don't log the request body to debug log, this is already done above. */ /* Don't log the request body to debug log, this is already done above. */
Log(LogDebug, "ElasticsearchWriter") Log(LogDebug, "ElasticsearchWriter")
<< "Sending " << req.RequestMethod << " request" << ((!username.IsEmpty() && !password.IsEmpty()) ? " with basic auth" : "" ) << "Sending " << request.method_string() << " request" << ((!username.IsEmpty() && !password.IsEmpty()) ? " with basic auth" : "" )
<< " to '" << url->Format() << "'."; << " to '" << url->Format() << "'.";
try { try {
req.WriteBody(body.CStr(), body.GetLength()); if (stream.first) {
req.Finish(); http::write(*stream.first, request);
} catch (const std::exception& ex) { stream.first->flush();
} else {
http::write(*stream.second, request);
stream.second->flush();
}
} catch (const std::exception&) {
Log(LogWarning, "ElasticsearchWriter") Log(LogWarning, "ElasticsearchWriter")
<< "Cannot write to HTTP API on host '" << GetHost() << "' port '" << GetPort() << "'."; << "Cannot write to HTTP API on host '" << GetHost() << "' port '" << GetPort() << "'.";
throw ex; throw;
} }
HttpResponse resp(stream, req); http::parser<false, http::string_body> parser;
StreamReadContext context; beast::flat_buffer buf;
try { try {
resp.Parse(context, true); if (stream.first) {
while (resp.Parse(context, true) && !resp.Complete) http::read(*stream.first, buf, parser);
; /* Do nothing */ } else {
http::read(*stream.second, buf, parser);
}
} catch (const std::exception& ex) { } catch (const std::exception& ex) {
Log(LogWarning, "ElasticsearchWriter") Log(LogWarning, "ElasticsearchWriter")
<< "Failed to parse HTTP response from host '" << GetHost() << "' port '" << GetPort() << "': " << DiagnosticInformation(ex, false); << "Failed to parse HTTP response from host '" << GetHost() << "' port '" << GetPort() << "': " << DiagnosticInformation(ex, false);
throw ex; throw;
} }
if (!resp.Complete) { auto& response (parser.get());
Log(LogWarning, "ElasticsearchWriter")
<< "Failed to read a complete HTTP response from the Elasticsearch server.";
return;
}
if (resp.StatusCode > 299) { if (response.result_int() > 299) {
if (resp.StatusCode == 401) { if (response.result() == http::status::unauthorized) {
/* More verbose error logging with Elasticsearch is hidden behind a proxy. */ /* More verbose error logging with Elasticsearch is hidden behind a proxy. */
if (!username.IsEmpty() && !password.IsEmpty()) { if (!username.IsEmpty() && !password.IsEmpty()) {
Log(LogCritical, "ElasticsearchWriter") Log(LogCritical, "ElasticsearchWriter")
@ -526,30 +550,27 @@ void ElasticsearchWriter::SendRequest(const String& body)
} }
std::ostringstream msgbuf; std::ostringstream msgbuf;
msgbuf << "Unexpected response code " << resp.StatusCode << " from URL '" << req.RequestUrl->Format() << "'"; msgbuf << "Unexpected response code " << response.result_int() << " from URL '" << url->Format() << "'";
String contentType = resp.Headers->Get("content-type"); auto& contentType (response[http::field::content_type]);
if (contentType != "application/json" && contentType != "application/json; charset=utf-8") { if (contentType != "application/json" && contentType != "application/json; charset=utf-8") {
msgbuf << "; Unexpected Content-Type: '" << contentType << "'"; msgbuf << "; Unexpected Content-Type: '" << contentType << "'";
} }
size_t responseSize = resp.GetBodySize(); auto& body (response.body());
boost::scoped_array<char> buffer(new char[responseSize + 1]);
resp.ReadBody(buffer.get(), responseSize);
buffer.get()[responseSize] = '\0';
#ifdef I2_DEBUG #ifdef I2_DEBUG
msgbuf << "; Response body: '" << buffer.get() << "'"; msgbuf << "; Response body: '" << body << "'";
#endif /* I2_DEBUG */ #endif /* I2_DEBUG */
/* {"statusCode":404,"error":"Not Found","message":"Not Found"} */
Dictionary::Ptr jsonResponse; Dictionary::Ptr jsonResponse;
try { try {
jsonResponse = JsonDecode(buffer.get()); jsonResponse = JsonDecode(body);
} catch (...) { } catch (...) {
Log(LogWarning, "ElasticsearchWriter") Log(LogWarning, "ElasticsearchWriter")
<< "Unable to parse JSON response:\n" << buffer.get(); << "Unable to parse JSON response:\n" << body;
return; return;
} }
@ -557,51 +578,54 @@ void ElasticsearchWriter::SendRequest(const String& body)
Log(LogCritical, "ElasticsearchWriter") Log(LogCritical, "ElasticsearchWriter")
<< "Error: '" << error << "'. " << msgbuf.str(); << "Error: '" << error << "'. " << msgbuf.str();
return;
} }
} }
Stream::Ptr ElasticsearchWriter::Connect() OptionalTlsStream ElasticsearchWriter::Connect()
{ {
TcpSocket::Ptr socket = new TcpSocket();
Log(LogNotice, "ElasticsearchWriter") Log(LogNotice, "ElasticsearchWriter")
<< "Connecting to Elasticsearch on host '" << GetHost() << "' port '" << GetPort() << "'."; << "Connecting to Elasticsearch on host '" << GetHost() << "' port '" << GetPort() << "'.";
try { OptionalTlsStream stream;
socket->Connect(GetHost(), GetPort()); bool tls = GetEnableTls();
} catch (const std::exception& ex) {
Log(LogWarning, "ElasticsearchWriter")
<< "Can't connect to Elasticsearch on host '" << GetHost() << "' port '" << GetPort() << "'.";
throw ex;
}
if (GetEnableTls()) { if (tls) {
std::shared_ptr<SSL_CTX> sslContext; std::shared_ptr<boost::asio::ssl::context> sslContext;
try { try {
sslContext = MakeSSLContext(GetCertPath(), GetKeyPath(), GetCaPath()); sslContext = MakeAsioSslContext(GetCertPath(), GetKeyPath(), GetCaPath());
} catch (const std::exception& ex) { } catch (const std::exception&) {
Log(LogWarning, "ElasticsearchWriter") Log(LogWarning, "ElasticsearchWriter")
<< "Unable to create SSL context."; << "Unable to create SSL context.";
throw ex; throw;
} }
TlsStream::Ptr tlsStream = new TlsStream(socket, GetHost(), RoleClient, sslContext); stream.first = std::make_shared<AsioTlsStream>(IoEngine::Get().GetIoService(), *sslContext, GetHost());
} else {
stream.second = std::make_shared<AsioTcpStream>(IoEngine::Get().GetIoService());
}
try {
icinga::Connect(tls ? stream.first->lowest_layer() : stream.second->lowest_layer(), GetHost(), GetPort());
} catch (const std::exception&) {
Log(LogWarning, "ElasticsearchWriter")
<< "Can't connect to Elasticsearch on host '" << GetHost() << "' port '" << GetPort() << "'.";
throw;
}
if (tls) {
auto& tlsStream (stream.first->next_layer());
try { try {
tlsStream->Handshake(); tlsStream.handshake(tlsStream.client);
} catch (const std::exception& ex) { } catch (const std::exception&) {
Log(LogWarning, "ElasticsearchWriter") Log(LogWarning, "ElasticsearchWriter")
<< "TLS handshake with host '" << GetHost() << "' on port " << GetPort() << " failed."; << "TLS handshake with host '" << GetHost() << "' on port " << GetPort() << " failed.";
throw ex; throw;
} }
return tlsStream;
} else {
return new NetworkStream(socket);
} }
return std::move(stream);
} }
void ElasticsearchWriter::AssertOnWorkQueue() void ElasticsearchWriter::AssertOnWorkQueue()

View File

@ -8,6 +8,7 @@
#include "base/configobject.hpp" #include "base/configobject.hpp"
#include "base/workqueue.hpp" #include "base/workqueue.hpp"
#include "base/timer.hpp" #include "base/timer.hpp"
#include "base/tlsstream.hpp"
namespace icinga namespace icinga
{ {
@ -50,7 +51,7 @@ private:
void Enqueue(const Checkable::Ptr& checkable, const String& type, void Enqueue(const Checkable::Ptr& checkable, const String& type,
const Dictionary::Ptr& fields, double ts); const Dictionary::Ptr& fields, double ts);
Stream::Ptr Connect(); OptionalTlsStream Connect();
void AssertOnWorkQueue(); void AssertOnWorkQueue();
void ExceptionHandler(boost::exception_ptr exp); void ExceptionHandler(boost::exception_ptr exp);
void FlushTimeout(); void FlushTimeout();