Use new I/O engine in GelfWriter

This commit is contained in:
Michael Insel 2019-05-16 19:39:06 +02:00
parent 7963881715
commit bc0ab93e44
2 changed files with 53 additions and 30 deletions

View File

@ -22,6 +22,11 @@
#include "base/statsfunction.hpp" #include "base/statsfunction.hpp"
#include <boost/algorithm/string/replace.hpp> #include <boost/algorithm/string/replace.hpp>
#include <utility> #include <utility>
#include "base/io-engine.hpp"
#include <boost/asio/write.hpp>
#include <boost/asio/buffer.hpp>
#include <boost/system/error_code.hpp>
#include <boost/asio/error.hpp>
using namespace icinga; using namespace icinga;
@ -126,11 +131,7 @@ void GelfWriter::ExceptionHandler(boost::exception_ptr exp)
Log(LogDebug, "GelfWriter") Log(LogDebug, "GelfWriter")
<< "Exception during Graylog Gelf operation: " << DiagnosticInformation(std::move(exp)); << "Exception during Graylog Gelf operation: " << DiagnosticInformation(std::move(exp));
if (GetConnected()) { DisconnectInternal();
m_Stream->Close();
SetConnected(false);
}
} }
void GelfWriter::Reconnect() void GelfWriter::Reconnect()
@ -156,43 +157,46 @@ void GelfWriter::ReconnectInternal()
if (GetConnected()) if (GetConnected())
return; return;
TcpSocket::Ptr socket = new TcpSocket();
Log(LogNotice, "GelfWriter") Log(LogNotice, "GelfWriter")
<< "Reconnecting to Graylog Gelf on host '" << GetHost() << "' port '" << GetPort() << "'."; << "Reconnecting to Graylog Gelf on host '" << GetHost() << "' port '" << GetPort() << "'.";
try { bool ssl = GetEnableTls();
socket->Connect(GetHost(), GetPort());
} catch (const std::exception& ex) {
Log(LogCritical, "GelfWriter")
<< "Can't connect to Graylog Gelf on host '" << GetHost() << "' port '" << GetPort() << "'.";
throw ex;
}
if (GetEnableTls()) { if (ssl) {
std::shared_ptr<SSL_CTX> sslContext; std::shared_ptr<boost::asio::ssl::context> sslContext;
try { try {
sslContext = MakeSSLContext(GetCertPath(), GetKeyPath(), GetCaPath()); sslContext = MakeAsioSslContext(GetCertPath(), GetKeyPath(), GetCaPath());
} catch (const std::exception& ex) { } catch (const std::exception& ex) {
Log(LogWarning, "GelfWriter") Log(LogWarning, "GelfWriter")
<< "Unable to create SSL context."; << "Unable to create SSL context.";
throw ex; throw;
} }
TlsStream::Ptr tlsStream = new TlsStream(socket, GetHost(), RoleClient, sslContext); m_Stream.first = std::make_shared<AsioTlsStream>(IoEngine::Get().GetIoService(), *sslContext, GetHost());
} else {
m_Stream.second = std::make_shared<AsioTcpStream>(IoEngine::Get().GetIoService());
}
try { try {
tlsStream->Handshake(); icinga::Connect(ssl ? m_Stream.first->lowest_layer() : m_Stream.second->lowest_layer(), GetHost(), GetPort());
} catch (const std::exception& ex) { } catch (const std::exception& ex) {
Log(LogWarning, "GelfWriter") Log(LogWarning, "GelfWriter")
<< "TLS handshake with host'" << GetHost() << "' on port '" << GetPort() << "' failed.'"; << "Can't connect to Graylog Gelf on host '" << GetHost() << "' port '" << GetPort() << ".'";
throw ex; throw;
} }
m_Stream = tlsStream; if (ssl) {
} else auto& tlsStream (m_Stream.first->next_layer());
m_Stream = new NetworkStream(socket);
try {
tlsStream.handshake(tlsStream.client);
} catch (const std::exception& ex) {
Log(LogWarning, "GelfWriter")
<< "TLS handshake with host '" << GetHost() << " failed.'";
throw;
}
}
SetConnected(true); SetConnected(true);
@ -217,9 +221,22 @@ void GelfWriter::DisconnectInternal()
if (!GetConnected()) if (!GetConnected())
return; return;
m_Stream->Close(); if (m_Stream.first) {
boost::system::error_code ec;
m_Stream.first->next_layer().shutdown(ec);
// https://stackoverflow.com/a/25703699
// As long as the error code's category is not an SSL category, then the protocol was securely shutdown
if (ec.category() == boost::asio::error::get_ssl_category()) {
Log(LogCritical, "GelfWriter")
<< "TLS shutdown with host '" << GetHost() << "' could not be done securely.";
}
} else if (m_Stream.second) {
m_Stream.second->close();
}
SetConnected(false); SetConnected(false);
} }
void GelfWriter::CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr) void GelfWriter::CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
@ -479,7 +496,13 @@ void GelfWriter::SendLogMessage(const Checkable::Ptr& checkable, const String& g
Log(LogDebug, "GelfWriter") Log(LogDebug, "GelfWriter")
<< "Checkable '" << checkable->GetName() << "' sending message '" << log << "'."; << "Checkable '" << checkable->GetName() << "' sending message '" << log << "'.";
m_Stream->Write(log.CStr(), log.GetLength()); if (m_Stream.first) {
boost::asio::write(*m_Stream.first, boost::asio::buffer(msgbuf.str()));
m_Stream.first->flush();
} else {
boost::asio::write(*m_Stream.second, boost::asio::buffer(msgbuf.str()));
m_Stream.second->flush();
}
} catch (const std::exception& ex) { } catch (const std::exception& ex) {
Log(LogCritical, "GelfWriter") Log(LogCritical, "GelfWriter")
<< "Cannot write to TCP socket on host '" << GetHost() << "' port '" << GetPort() << "'."; << "Cannot write to TCP socket on host '" << GetHost() << "' port '" << GetPort() << "'.";

View File

@ -33,7 +33,7 @@ protected:
void Pause() override; void Pause() override;
private: private:
Stream::Ptr m_Stream; OptionalTlsStream m_Stream;
WorkQueue m_WorkQueue{10000000, 1}; WorkQueue m_WorkQueue{10000000, 1};
Timer::Ptr m_ReconnectTimer; Timer::Ptr m_ReconnectTimer;