mirror of https://github.com/Icinga/icinga2.git
Use new I/O engine in InfluxdbWriter
This commit is contained in:
parent
c1fa07899c
commit
14fdfff770
|
@ -9,7 +9,9 @@
|
||||||
#include "icinga/macroprocessor.hpp"
|
#include "icinga/macroprocessor.hpp"
|
||||||
#include "icinga/icingaapplication.hpp"
|
#include "icinga/icingaapplication.hpp"
|
||||||
#include "icinga/checkcommand.hpp"
|
#include "icinga/checkcommand.hpp"
|
||||||
|
#include "base/application.hpp"
|
||||||
#include "base/defer.hpp"
|
#include "base/defer.hpp"
|
||||||
|
#include "base/io-engine.hpp"
|
||||||
#include "base/tcpsocket.hpp"
|
#include "base/tcpsocket.hpp"
|
||||||
#include "base/configtype.hpp"
|
#include "base/configtype.hpp"
|
||||||
#include "base/objectlock.hpp"
|
#include "base/objectlock.hpp"
|
||||||
|
@ -25,9 +27,21 @@
|
||||||
#include "base/tlsutility.hpp"
|
#include "base/tlsutility.hpp"
|
||||||
#include <boost/algorithm/string.hpp>
|
#include <boost/algorithm/string.hpp>
|
||||||
#include <boost/algorithm/string/replace.hpp>
|
#include <boost/algorithm/string/replace.hpp>
|
||||||
|
#include <boost/asio/ssl/context.hpp>
|
||||||
|
#include <boost/beast/core/flat_buffer.hpp>
|
||||||
|
#include <boost/beast/http/field.hpp>
|
||||||
|
#include <boost/beast/http/message.hpp>
|
||||||
|
#include <boost/beast/http/parser.hpp>
|
||||||
|
#include <boost/beast/http/read.hpp>
|
||||||
|
#include <boost/beast/http/status.hpp>
|
||||||
|
#include <boost/beast/http/string_body.hpp>
|
||||||
|
#include <boost/beast/http/verb.hpp>
|
||||||
|
#include <boost/beast/http/write.hpp>
|
||||||
#include <boost/math/special_functions/fpclassify.hpp>
|
#include <boost/math/special_functions/fpclassify.hpp>
|
||||||
#include <boost/regex.hpp>
|
#include <boost/regex.hpp>
|
||||||
#include <boost/scoped_array.hpp>
|
#include <boost/scoped_array.hpp>
|
||||||
|
#include <memory>
|
||||||
|
#include <string>
|
||||||
#include <utility>
|
#include <utility>
|
||||||
|
|
||||||
using namespace icinga;
|
using namespace icinga;
|
||||||
|
@ -156,44 +170,51 @@ void InfluxdbWriter::ExceptionHandler(boost::exception_ptr exp)
|
||||||
//TODO: Close the connection, if we keep it open.
|
//TODO: Close the connection, if we keep it open.
|
||||||
}
|
}
|
||||||
|
|
||||||
Stream::Ptr InfluxdbWriter::Connect()
|
OptionalTlsStream InfluxdbWriter::Connect()
|
||||||
{
|
{
|
||||||
TcpSocket::Ptr socket = new TcpSocket();
|
|
||||||
|
|
||||||
Log(LogNotice, "InfluxdbWriter")
|
Log(LogNotice, "InfluxdbWriter")
|
||||||
<< "Reconnecting to InfluxDB on host '" << GetHost() << "' port '" << GetPort() << "'.";
|
<< "Reconnecting to InfluxDB on host '" << GetHost() << "' port '" << GetPort() << "'.";
|
||||||
|
|
||||||
try {
|
OptionalTlsStream stream;
|
||||||
socket->Connect(GetHost(), GetPort());
|
bool ssl = GetSslEnable();
|
||||||
} catch (const std::exception& ex) {
|
|
||||||
Log(LogWarning, "InfluxdbWriter")
|
if (ssl) {
|
||||||
<< "Can't connect to InfluxDB on host '" << GetHost() << "' port '" << GetPort() << "'.";
|
std::shared_ptr<boost::asio::ssl::context> sslContext;
|
||||||
throw ex;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (GetSslEnable()) {
|
|
||||||
std::shared_ptr<SSL_CTX> sslContext;
|
|
||||||
try {
|
try {
|
||||||
sslContext = MakeSSLContext(GetSslCert(), GetSslKey(), GetSslCaCert());
|
sslContext = MakeAsioSslContext(GetSslCert(), GetSslKey(), GetSslCaCert());
|
||||||
} catch (const std::exception& ex) {
|
} catch (const std::exception& ex) {
|
||||||
Log(LogWarning, "InfluxdbWriter")
|
Log(LogWarning, "InfluxdbWriter")
|
||||||
<< "Unable to create SSL context.";
|
<< "Unable to create SSL context.";
|
||||||
throw ex;
|
throw;
|
||||||
|
}
|
||||||
|
|
||||||
|
stream.first = std::make_shared<AsioTlsStream>(IoEngine::Get().GetIoService(), *sslContext, GetHost());
|
||||||
|
} else {
|
||||||
|
stream.second = std::make_shared<AsioTcpStream>(IoEngine::Get().GetIoService());
|
||||||
}
|
}
|
||||||
|
|
||||||
TlsStream::Ptr tlsStream = new TlsStream(socket, GetHost(), RoleClient, sslContext);
|
|
||||||
try {
|
try {
|
||||||
tlsStream->Handshake();
|
icinga::Connect(ssl ? stream.first->lowest_layer() : stream.second->lowest_layer(), GetHost(), GetPort());
|
||||||
|
} catch (const std::exception& ex) {
|
||||||
|
Log(LogWarning, "InfluxdbWriter")
|
||||||
|
<< "Can't connect to InfluxDB on host '" << GetHost() << "' port '" << GetPort() << "'.";
|
||||||
|
throw;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (ssl) {
|
||||||
|
auto& tlsStream (stream.first->next_layer());
|
||||||
|
|
||||||
|
try {
|
||||||
|
tlsStream.handshake(tlsStream.client);
|
||||||
} catch (const std::exception& ex) {
|
} catch (const std::exception& ex) {
|
||||||
Log(LogWarning, "InfluxdbWriter")
|
Log(LogWarning, "InfluxdbWriter")
|
||||||
<< "TLS handshake with host '" << GetHost() << "' failed.";
|
<< "TLS handshake with host '" << GetHost() << "' failed.";
|
||||||
throw ex;
|
throw;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return tlsStream;
|
return std::move(stream);
|
||||||
} else {
|
|
||||||
return new NetworkStream(socket);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void InfluxdbWriter::CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
|
void InfluxdbWriter::CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
|
||||||
|
@ -429,6 +450,9 @@ void InfluxdbWriter::FlushTimeoutWQ()
|
||||||
|
|
||||||
void InfluxdbWriter::Flush()
|
void InfluxdbWriter::Flush()
|
||||||
{
|
{
|
||||||
|
namespace beast = boost::beast;
|
||||||
|
namespace http = beast::http;
|
||||||
|
|
||||||
/* Flush can be called from 1) Timeout 2) Threshold 3) on shutdown/reload. */
|
/* Flush can be called from 1) Timeout 2) Threshold 3) on shutdown/reload. */
|
||||||
if (m_DataBuffer.empty())
|
if (m_DataBuffer.empty())
|
||||||
return;
|
return;
|
||||||
|
@ -439,7 +463,7 @@ void InfluxdbWriter::Flush()
|
||||||
String body = boost::algorithm::join(m_DataBuffer, "\n");
|
String body = boost::algorithm::join(m_DataBuffer, "\n");
|
||||||
m_DataBuffer.clear();
|
m_DataBuffer.clear();
|
||||||
|
|
||||||
Stream::Ptr stream;
|
OptionalTlsStream stream;
|
||||||
|
|
||||||
try {
|
try {
|
||||||
stream = Connect();
|
stream = Connect();
|
||||||
|
@ -449,10 +473,11 @@ void InfluxdbWriter::Flush()
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!stream)
|
Defer s ([&stream]() {
|
||||||
return;
|
if (stream.first) {
|
||||||
|
stream.first->next_layer().shutdown();
|
||||||
Defer close ([&stream]() { stream->Close(); });
|
}
|
||||||
|
});
|
||||||
|
|
||||||
Url::Ptr url = new Url();
|
Url::Ptr url = new Url();
|
||||||
url->SetScheme(GetSslEnable() ? "https" : "http");
|
url->SetScheme(GetSslEnable() ? "https" : "http");
|
||||||
|
@ -470,59 +495,64 @@ void InfluxdbWriter::Flush()
|
||||||
if (!GetPassword().IsEmpty())
|
if (!GetPassword().IsEmpty())
|
||||||
url->AddQueryElement("p", GetPassword());
|
url->AddQueryElement("p", GetPassword());
|
||||||
|
|
||||||
HttpRequest req(stream);
|
http::request<http::string_body> request (http::verb::post, std::string(url->Format(true)), 10);
|
||||||
req.RequestMethod = "POST";
|
|
||||||
req.RequestUrl = url;
|
request.set(http::field::user_agent, "Icinga/" + Application::GetAppVersion());
|
||||||
|
request.set(http::field::host, url->GetHost() + ":" + url->GetPort());
|
||||||
|
|
||||||
|
request.body() = body;
|
||||||
|
request.set(http::field::content_length, request.body().size());
|
||||||
|
|
||||||
try {
|
try {
|
||||||
req.WriteBody(body.CStr(), body.GetLength());
|
if (stream.first) {
|
||||||
req.Finish();
|
http::write(*stream.first, request);
|
||||||
|
stream.first->flush();
|
||||||
|
} else {
|
||||||
|
http::write(*stream.second, request);
|
||||||
|
stream.second->flush();
|
||||||
|
}
|
||||||
} catch (const std::exception& ex) {
|
} catch (const std::exception& ex) {
|
||||||
Log(LogWarning, "InfluxdbWriter")
|
Log(LogWarning, "InfluxdbWriter")
|
||||||
<< "Cannot write to TCP socket on host '" << GetHost() << "' port '" << GetPort() << "'.";
|
<< "Cannot write to TCP socket on host '" << GetHost() << "' port '" << GetPort() << "'.";
|
||||||
throw ex;
|
throw;
|
||||||
}
|
}
|
||||||
|
|
||||||
HttpResponse resp(stream, req);
|
http::parser<false, http::string_body> parser;
|
||||||
StreamReadContext context;
|
beast::flat_buffer buf;
|
||||||
|
|
||||||
try {
|
try {
|
||||||
while (resp.Parse(context, true) && !resp.Complete)
|
if (stream.first) {
|
||||||
; /* Do nothing */
|
http::read(*stream.first, buf, parser);
|
||||||
|
} else {
|
||||||
|
http::read(*stream.second, buf, parser);
|
||||||
|
}
|
||||||
} catch (const std::exception& ex) {
|
} catch (const std::exception& ex) {
|
||||||
Log(LogWarning, "InfluxdbWriter")
|
Log(LogWarning, "InfluxdbWriter")
|
||||||
<< "Failed to parse HTTP response from host '" << GetHost() << "' port '" << GetPort() << "': " << DiagnosticInformation(ex);
|
<< "Failed to parse HTTP response from host '" << GetHost() << "' port '" << GetPort() << "': " << DiagnosticInformation(ex);
|
||||||
throw ex;
|
throw;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!resp.Complete) {
|
auto& response (parser.get());
|
||||||
Log(LogWarning, "InfluxdbWriter")
|
|
||||||
<< "Failed to read a complete HTTP response from the InfluxDB server.";
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (resp.StatusCode != 204) {
|
if (response.result() != http::status::no_content) {
|
||||||
Log(LogWarning, "InfluxdbWriter")
|
Log(LogWarning, "InfluxdbWriter")
|
||||||
<< "Unexpected response code: " << resp.StatusCode;
|
<< "Unexpected response code: " << response.result();
|
||||||
|
|
||||||
String contentType = resp.Headers->Get("content-type");
|
auto& contentType (response[http::field::content_type]);
|
||||||
if (contentType != "application/json") {
|
if (contentType != "application/json") {
|
||||||
Log(LogWarning, "InfluxdbWriter")
|
Log(LogWarning, "InfluxdbWriter")
|
||||||
<< "Unexpected Content-Type: " << contentType;
|
<< "Unexpected Content-Type: " << contentType;
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
size_t responseSize = resp.GetBodySize();
|
|
||||||
boost::scoped_array<char> buffer(new char[responseSize + 1]);
|
|
||||||
resp.ReadBody(buffer.get(), responseSize);
|
|
||||||
buffer.get()[responseSize] = '\0';
|
|
||||||
|
|
||||||
Dictionary::Ptr jsonResponse;
|
Dictionary::Ptr jsonResponse;
|
||||||
|
auto& body (response.body());
|
||||||
|
|
||||||
try {
|
try {
|
||||||
jsonResponse = JsonDecode(buffer.get());
|
jsonResponse = JsonDecode(body);
|
||||||
} catch (...) {
|
} catch (...) {
|
||||||
Log(LogWarning, "InfluxdbWriter")
|
Log(LogWarning, "InfluxdbWriter")
|
||||||
<< "Unable to parse JSON response:\n" << buffer.get();
|
<< "Unable to parse JSON response:\n" << body;
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -530,8 +560,6 @@ void InfluxdbWriter::Flush()
|
||||||
|
|
||||||
Log(LogCritical, "InfluxdbWriter")
|
Log(LogCritical, "InfluxdbWriter")
|
||||||
<< "InfluxDB error message:\n" << error;
|
<< "InfluxDB error message:\n" << error;
|
||||||
|
|
||||||
return;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -8,6 +8,7 @@
|
||||||
#include "base/configobject.hpp"
|
#include "base/configobject.hpp"
|
||||||
#include "base/tcpsocket.hpp"
|
#include "base/tcpsocket.hpp"
|
||||||
#include "base/timer.hpp"
|
#include "base/timer.hpp"
|
||||||
|
#include "base/tlsstream.hpp"
|
||||||
#include "base/workqueue.hpp"
|
#include "base/workqueue.hpp"
|
||||||
#include <fstream>
|
#include <fstream>
|
||||||
|
|
||||||
|
@ -51,7 +52,7 @@ private:
|
||||||
static String EscapeKeyOrTagValue(const String& str);
|
static String EscapeKeyOrTagValue(const String& str);
|
||||||
static String EscapeValue(const Value& value);
|
static String EscapeValue(const Value& value);
|
||||||
|
|
||||||
Stream::Ptr Connect();
|
OptionalTlsStream Connect();
|
||||||
|
|
||||||
void AssertOnWorkQueue();
|
void AssertOnWorkQueue();
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue