Quality: Use Boost ASIO/IO engine in Graphite feature

This commit changes the reconnect priority to high.

Also add function docs.
This commit is contained in:
Michael Friedrich 2019-05-27 16:49:51 +02:00
parent 0466316019
commit efd4e8ad40
2 changed files with 128 additions and 17 deletions

View File

@ -28,6 +28,9 @@ REGISTER_TYPE(GraphiteWriter);
REGISTER_STATSFUNCTION(GraphiteWriter, &GraphiteWriter::StatsFunc); REGISTER_STATSFUNCTION(GraphiteWriter, &GraphiteWriter::StatsFunc);
/*
* Enable HA capabilities once the config object is loaded.
*/
void GraphiteWriter::OnConfigLoaded() void GraphiteWriter::OnConfigLoaded()
{ {
ObjectImpl<GraphiteWriter>::OnConfigLoaded(); ObjectImpl<GraphiteWriter>::OnConfigLoaded();
@ -44,6 +47,12 @@ void GraphiteWriter::OnConfigLoaded()
} }
} }
/**
* Feature stats interface
*
* @param status Key value pairs for feature stats
* @param perfdata Array of PerfdataValue objects
*/
void GraphiteWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata) void GraphiteWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata)
{ {
DictionaryData nodes; DictionaryData nodes;
@ -65,6 +74,9 @@ void GraphiteWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr&
status->Set("graphitewriter", new Dictionary(std::move(nodes))); status->Set("graphitewriter", new Dictionary(std::move(nodes)));
} }
/**
* Resume is equivalent to Start, but with HA capabilities to resume at runtime.
*/
void GraphiteWriter::Resume() void GraphiteWriter::Resume()
{ {
ObjectImpl<GraphiteWriter>::Resume(); ObjectImpl<GraphiteWriter>::Resume();
@ -86,7 +98,9 @@ void GraphiteWriter::Resume()
Checkable::OnNewCheckResult.connect(std::bind(&GraphiteWriter::CheckResultHandler, this, _1, _2)); Checkable::OnNewCheckResult.connect(std::bind(&GraphiteWriter::CheckResultHandler, this, _1, _2));
} }
/* Pause is equivalent to Stop, but with HA capabilities to resume at runtime. */ /**
* Pause is equivalent to Stop, but with HA capabilities to resume at runtime.
*/
void GraphiteWriter::Pause() void GraphiteWriter::Pause()
{ {
m_ReconnectTimer.reset(); m_ReconnectTimer.reset();
@ -110,11 +124,21 @@ void GraphiteWriter::Pause()
ObjectImpl<GraphiteWriter>::Pause(); ObjectImpl<GraphiteWriter>::Pause();
} }
/**
* Check if method is called inside the WQ thread.
*/
void GraphiteWriter::AssertOnWorkQueue() void GraphiteWriter::AssertOnWorkQueue()
{ {
ASSERT(m_WorkQueue.IsWorkerThread()); ASSERT(m_WorkQueue.IsWorkerThread());
} }
/**
* Exception handler for the WQ.
*
* Closes the connection if connected.
*
* @param exp Exception pointer
*/
void GraphiteWriter::ExceptionHandler(boost::exception_ptr exp) void GraphiteWriter::ExceptionHandler(boost::exception_ptr exp)
{ {
Log(LogCritical, "GraphiteWriter", "Exception during Graphite operation: Verify that your backend is operational!"); Log(LogCritical, "GraphiteWriter", "Exception during Graphite operation: Verify that your backend is operational!");
@ -123,12 +147,17 @@ void GraphiteWriter::ExceptionHandler(boost::exception_ptr exp)
<< "Exception during Graphite operation: " << DiagnosticInformation(std::move(exp)); << "Exception during Graphite operation: " << DiagnosticInformation(std::move(exp));
if (GetConnected()) { if (GetConnected()) {
m_Stream->Close(); m_Stream->close();
SetConnected(false); SetConnected(false);
} }
} }
/**
* Reconnect method, stops when the feature is paused in HA zones.
*
* Called inside the WQ.
*/
void GraphiteWriter::Reconnect() void GraphiteWriter::Reconnect()
{ {
AssertOnWorkQueue(); AssertOnWorkQueue();
@ -141,6 +170,9 @@ void GraphiteWriter::Reconnect()
ReconnectInternal(); ReconnectInternal();
} }
/**
* Reconnect method, connects to a TCP Stream
*/
void GraphiteWriter::ReconnectInternal() void GraphiteWriter::ReconnectInternal()
{ {
double startTime = Utility::GetTime(); double startTime = Utility::GetTime();
@ -152,20 +184,17 @@ void GraphiteWriter::ReconnectInternal()
if (GetConnected()) if (GetConnected())
return; return;
TcpSocket::Ptr socket = new TcpSocket();
Log(LogNotice, "GraphiteWriter") Log(LogNotice, "GraphiteWriter")
<< "Reconnecting to Graphite on host '" << GetHost() << "' port '" << GetPort() << "'."; << "Reconnecting to Graphite on host '" << GetHost() << "' port '" << GetPort() << "'.";
try { m_Stream = std::make_shared<AsioTcpStream>(IoEngine::Get().GetIoService());
socket->Connect(GetHost(), GetPort());
} catch (const std::exception& ex) {
Log(LogCritical, "GraphiteWriter")
<< "Can't connect to Graphite on host '" << GetHost() << "' port '" << GetPort() << "'.";
throw ex;
}
m_Stream = new NetworkStream(socket); try {
icinga::Connect(m_Stream->lowest_layer(), GetHost(), GetPort());
} catch (const std::exception& ex) {
Log(LogWarning, "GraphiteWriter")
<< "Can't connect to Graphite on host '" << GetHost() << "' port '" << GetPort() << ".'";
}
SetConnected(true); SetConnected(true);
@ -173,14 +202,24 @@ void GraphiteWriter::ReconnectInternal()
<< "Finished reconnecting to Graphite in " << std::setw(2) << Utility::GetTime() - startTime << " second(s)."; << "Finished reconnecting to Graphite in " << std::setw(2) << Utility::GetTime() - startTime << " second(s).";
} }
/**
* Reconnect handler called by the timer.
*
* Enqueues a reconnect task into the WQ.
*/
void GraphiteWriter::ReconnectTimerHandler() void GraphiteWriter::ReconnectTimerHandler()
{ {
if (IsPaused()) if (IsPaused())
return; return;
m_WorkQueue.Enqueue(std::bind(&GraphiteWriter::Reconnect, this), PriorityNormal); m_WorkQueue.Enqueue(std::bind(&GraphiteWriter::Reconnect, this), PriorityHigh);
} }
/**
* Disconnect the stream.
*
* Called inside the WQ.
*/
void GraphiteWriter::Disconnect() void GraphiteWriter::Disconnect()
{ {
AssertOnWorkQueue(); AssertOnWorkQueue();
@ -188,16 +227,27 @@ void GraphiteWriter::Disconnect()
DisconnectInternal(); DisconnectInternal();
} }
/**
* Disconnect the stream.
*
* Called outside the WQ.
*/
void GraphiteWriter::DisconnectInternal() void GraphiteWriter::DisconnectInternal()
{ {
if (!GetConnected()) if (!GetConnected())
return; return;
m_Stream->Close(); m_Stream->close();
SetConnected(false); SetConnected(false);
} }
/**
* Check result event handler, checks whether feature is not paused in HA setups.
*
* @param checkable Host/Service object
* @param cr Check result including performance data
*/
void GraphiteWriter::CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr) void GraphiteWriter::CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
{ {
if (IsPaused()) if (IsPaused())
@ -206,6 +256,14 @@ void GraphiteWriter::CheckResultHandler(const Checkable::Ptr& checkable, const C
m_WorkQueue.Enqueue(std::bind(&GraphiteWriter::CheckResultHandlerInternal, this, checkable, cr)); m_WorkQueue.Enqueue(std::bind(&GraphiteWriter::CheckResultHandlerInternal, this, checkable, cr));
} }
/**
* Check result event handler, prepares metadata and perfdata values and calls Send*()
*
* Called inside the WQ.
*
* @param checkable Host/Service object
* @param cr Check result including performance data
*/
void GraphiteWriter::CheckResultHandlerInternal(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr) void GraphiteWriter::CheckResultHandlerInternal(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
{ {
AssertOnWorkQueue(); AssertOnWorkQueue();
@ -262,6 +320,14 @@ void GraphiteWriter::CheckResultHandlerInternal(const Checkable::Ptr& checkable,
SendPerfdata(checkable, prefixPerfdata, cr, ts); SendPerfdata(checkable, prefixPerfdata, cr, ts);
} }
/**
* Parse performance data from check result and call SendMetric()
*
* @param checkable Host/service object
* @param prefix Metric prefix string
* @param cr Check result including performance data
* @param ts Timestamp when the check result was created
*/
void GraphiteWriter::SendPerfdata(const Checkable::Ptr& checkable, const String& prefix, const CheckResult::Ptr& cr, double ts) void GraphiteWriter::SendPerfdata(const Checkable::Ptr& checkable, const String& prefix, const CheckResult::Ptr& cr, double ts)
{ {
Array::Ptr perfdata = cr->GetPerformanceData(); Array::Ptr perfdata = cr->GetPerformanceData();
@ -306,8 +372,19 @@ void GraphiteWriter::SendPerfdata(const Checkable::Ptr& checkable, const String&
} }
} }
/**
* Computes metric data and sends to Graphite
*
* @param checkable Host/service object
* @param prefix Computed metric prefix string
* @param name Metric name
* @param value Metric value
* @param ts Timestamp when the check result was created
*/
void GraphiteWriter::SendMetric(const Checkable::Ptr& checkable, const String& prefix, const String& name, double value, double ts) void GraphiteWriter::SendMetric(const Checkable::Ptr& checkable, const String& prefix, const String& name, double value, double ts)
{ {
namespace asio = boost::asio;
std::ostringstream msgbuf; std::ostringstream msgbuf;
msgbuf << prefix << "." << name << " " << Convert::ToString(value) << " " << static_cast<long>(ts); msgbuf << prefix << "." << name << " " << Convert::ToString(value) << " " << static_cast<long>(ts);
@ -316,7 +393,6 @@ void GraphiteWriter::SendMetric(const Checkable::Ptr& checkable, const String& p
// do not send \n to debug log // do not send \n to debug log
msgbuf << "\n"; msgbuf << "\n";
String metric = msgbuf.str();
boost::mutex::scoped_lock lock(m_StreamMutex); boost::mutex::scoped_lock lock(m_StreamMutex);
@ -324,7 +400,8 @@ void GraphiteWriter::SendMetric(const Checkable::Ptr& checkable, const String& p
return; return;
try { try {
m_Stream->Write(metric.CStr(), metric.GetLength()); asio::write(*m_Stream, asio::buffer(msgbuf.str()));
m_Stream->flush();
} catch (const std::exception& ex) { } catch (const std::exception& ex) {
Log(LogCritical, "GraphiteWriter") Log(LogCritical, "GraphiteWriter")
<< "Cannot write to TCP socket on host '" << GetHost() << "' port '" << GetPort() << "'."; << "Cannot write to TCP socket on host '" << GetHost() << "' port '" << GetPort() << "'.";
@ -333,6 +410,14 @@ void GraphiteWriter::SendMetric(const Checkable::Ptr& checkable, const String& p
} }
} }
/**
* Escape metric tree elements
*
* Dots are not allowed, e.g. in host names
*
* @param str Metric part name
* @return Escape string
*/
String GraphiteWriter::EscapeMetric(const String& str) String GraphiteWriter::EscapeMetric(const String& str)
{ {
String result = str; String result = str;
@ -346,6 +431,14 @@ String GraphiteWriter::EscapeMetric(const String& str)
return result; return result;
} }
/**
* Escape metric label
*
* Dots are allowed - users can create trees from perfdata labels
*
* @param str Metric label name
* @return Escaped string
*/
String GraphiteWriter::EscapeMetricLabel(const String& str) String GraphiteWriter::EscapeMetricLabel(const String& str)
{ {
String result = str; String result = str;
@ -359,6 +452,12 @@ String GraphiteWriter::EscapeMetricLabel(const String& str)
return result; return result;
} }
/**
* Escape macro metrics found via host/service name templates
*
* @param value Array or string with macro metric names
* @return Escaped string. Arrays are joined with dots.
*/
Value GraphiteWriter::EscapeMacroMetric(const Value& value) Value GraphiteWriter::EscapeMacroMetric(const Value& value)
{ {
if (value.IsObjectType<Array>()) { if (value.IsObjectType<Array>()) {
@ -375,6 +474,12 @@ Value GraphiteWriter::EscapeMacroMetric(const Value& value)
return EscapeMetric(value); return EscapeMetric(value);
} }
/**
* Validate the configuration setting 'host_name_template'
*
* @param lvalue String containing runtime macros.
* @param utils Helper, unused
*/
void GraphiteWriter::ValidateHostNameTemplate(const Lazy<String>& lvalue, const ValidationUtils& utils) void GraphiteWriter::ValidateHostNameTemplate(const Lazy<String>& lvalue, const ValidationUtils& utils)
{ {
ObjectImpl<GraphiteWriter>::ValidateHostNameTemplate(lvalue, utils); ObjectImpl<GraphiteWriter>::ValidateHostNameTemplate(lvalue, utils);
@ -383,6 +488,12 @@ void GraphiteWriter::ValidateHostNameTemplate(const Lazy<String>& lvalue, const
BOOST_THROW_EXCEPTION(ValidationError(this, { "host_name_template" }, "Closing $ not found in macro format string '" + lvalue() + "'.")); BOOST_THROW_EXCEPTION(ValidationError(this, { "host_name_template" }, "Closing $ not found in macro format string '" + lvalue() + "'."));
} }
/**
* Validate the configuration setting 'service_name_template'
*
* @param lvalue String containing runtime macros.
* @param utils Helper, unused
*/
void GraphiteWriter::ValidateServiceNameTemplate(const Lazy<String>& lvalue, const ValidationUtils& utils) void GraphiteWriter::ValidateServiceNameTemplate(const Lazy<String>& lvalue, const ValidationUtils& utils)
{ {
ObjectImpl<GraphiteWriter>::ValidateServiceNameTemplate(lvalue, utils); ObjectImpl<GraphiteWriter>::ValidateServiceNameTemplate(lvalue, utils);

View File

@ -37,7 +37,7 @@ protected:
void Pause() override; void Pause() override;
private: private:
Stream::Ptr m_Stream; std::shared_ptr<AsioTcpStream> m_Stream;
boost::mutex m_StreamMutex; boost::mutex m_StreamMutex;
WorkQueue m_WorkQueue{10000000, 1}; WorkQueue m_WorkQueue{10000000, 1};