Quality: Rewrite OpenTSDB to use Boost ASIO and I/O engine

The connection handling and code isn't really good, but not
really actively maintained either.

Besides that, the "telnet" method doesn't allow for TLS,
this needs a general rewrite against their HTTP API.

I've also added function documentation where applicable.
This commit is contained in:
Michael Friedrich 2019-05-27 15:09:26 +02:00
parent c77d6eb869
commit 0466316019
3 changed files with 93 additions and 21 deletions

View File

@ -28,6 +28,9 @@ REGISTER_TYPE(OpenTsdbWriter);
REGISTER_STATSFUNCTION(OpenTsdbWriter, &OpenTsdbWriter::StatsFunc);
/*
* Enable HA capabilities once the config object is loaded.
*/
void OpenTsdbWriter::OnConfigLoaded()
{
ObjectImpl<OpenTsdbWriter>::OnConfigLoaded();
@ -42,17 +45,27 @@ void OpenTsdbWriter::OnConfigLoaded()
}
}
/**
* Feature stats interface
*
* @param status Key value pairs for feature stats
*/
void OpenTsdbWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr&)
{
DictionaryData nodes;
for (const OpenTsdbWriter::Ptr& opentsdbwriter : ConfigType::GetObjectsByType<OpenTsdbWriter>()) {
nodes.emplace_back(opentsdbwriter->GetName(), 1); //add more stats
nodes.emplace_back(opentsdbwriter->GetName(), new Dictionary({
{ "connected", opentsdbwriter->GetConnected() }
}));
}
status->Set("opentsdbwriter", new Dictionary(std::move(nodes)));
}
/**
* Resume is equivalent to Start, but with HA capabilities to resume at runtime.
*/
void OpenTsdbWriter::Resume()
{
ObjectImpl<OpenTsdbWriter>::Resume();
@ -69,7 +82,9 @@ void OpenTsdbWriter::Resume()
Service::OnNewCheckResult.connect(std::bind(&OpenTsdbWriter::CheckResultHandler, this, _1, _2));
}
/* Pause is equivalent to Stop, but with HA capabilities to resume at runtime. */
/**
* Pause is equivalent to Stop, but with HA capabilities to resume at runtime.
*/
void OpenTsdbWriter::Pause()
{
m_ReconnectTimer.reset();
@ -77,33 +92,54 @@ void OpenTsdbWriter::Pause()
Log(LogInformation, "OpentsdbWriter")
<< "'" << GetName() << "' paused.";
m_Stream->close();
SetConnected(false);
ObjectImpl<OpenTsdbWriter>::Pause();
}
/**
* Reconnect handler called by the timer.
* Handles TLS
*/
void OpenTsdbWriter::ReconnectTimerHandler()
{
if (IsPaused())
return;
if (m_Stream)
return;
SetShouldConnect(true);
TcpSocket::Ptr socket = new TcpSocket();
if (GetConnected())
return;
Log(LogNotice, "OpenTsdbWriter")
<< "Reconnect to OpenTSDB TSD on host '" << GetHost() << "' port '" << GetPort() << "'.";
/*
* We're using telnet as input method. Future PRs may change this into using the HTTP API.
* http://opentsdb.net/docs/build/html/user_guide/writing/index.html#telnet
*/
m_Stream = std::make_shared<AsioTcpStream>(IoEngine::Get().GetIoService());
try {
socket->Connect(GetHost(), GetPort());
} catch (std::exception&) {
Log(LogCritical, "OpenTsdbWriter")
<< "Can't connect to OpenTSDB TSD on host '" << GetHost() << "' port '" << GetPort() << "'.";
return;
icinga::Connect(m_Stream->lowest_layer(), GetHost(), GetPort());
} catch (const std::exception& ex) {
Log(LogWarning, "OpenTsdbWriter")
<< "Can't connect to OpenTSDB on host '" << GetHost() << "' port '" << GetPort() << ".'";
}
m_Stream = new NetworkStream(socket);
SetConnected(true);
}
/**
* Registered check result handler processing data.
* Calculates tags from the config.
*
* @param checkable Host/service object
* @param cr Check result
*/
void OpenTsdbWriter::CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
{
if (IsPaused())
@ -165,6 +201,15 @@ void OpenTsdbWriter::CheckResultHandler(const Checkable::Ptr& checkable, const C
SendMetric(checkable, metric + ".execution_time", tags, cr->CalculateExecutionTime(), ts);
}
/**
* Parse and send performance data metrics to OpenTSDB
*
* @param checkable Host/service object
* @param metric Full metric name
* @param tags Tag key pairs
* @param cr Check result containing performance data
* @param ts Timestamp when the check result was received
*/
void OpenTsdbWriter::SendPerfdata(const Checkable::Ptr& checkable, const String& metric,
const std::map<String, String>& tags, const CheckResult::Ptr& cr, double ts)
{
@ -209,6 +254,15 @@ void OpenTsdbWriter::SendPerfdata(const Checkable::Ptr& checkable, const String&
}
}
/**
* Send given metric to OpenTSDB
*
* @param checkable Host/service object
* @param metric Full metric name
* @param tags Tag key pairs
* @param value Floating point metric value
* @param ts Timestamp where the metric was received from the check result
*/
void OpenTsdbWriter::SendMetric(const Checkable::Ptr& checkable, const String& metric,
const std::map<String, String>& tags, double value, double ts)
{
@ -220,7 +274,7 @@ void OpenTsdbWriter::SendMetric(const Checkable::Ptr& checkable, const String& m
std::ostringstream msgbuf;
/*
* must be (http://opentsdb.net/docs/build/html/user_guide/writing.html)
* must be (http://opentsdb.net/docs/build/html/user_guide/query/timeseries.html)
* put <metric> <timestamp> <value> <tagk1=tagv1[ tagk2=tagv2 ...tagkN=tagvN]>
* "tags" must include at least one tag, we use "host=HOSTNAME"
*/
@ -235,21 +289,27 @@ void OpenTsdbWriter::SendMetric(const Checkable::Ptr& checkable, const String& m
ObjectLock olock(this);
if (!m_Stream)
if (!GetConnected())
return;
try {
m_Stream->Write(put.CStr(), put.GetLength());
Log(LogDebug, "OpenTsdbWriter")
<< "Checkable '" << checkable->GetName() << "' sending message '" << put << "'.";
boost::asio::write(*m_Stream, boost::asio::buffer(msgbuf.str()));
m_Stream->flush();
} catch (const std::exception& ex) {
Log(LogCritical, "OpenTsdbWriter")
<< "Cannot write to OpenTSDB TSD on host '" << GetHost() << "' port '" << GetPort() + "'.";
m_Stream.reset();
<< "Cannot write to TCP socket on host '" << GetHost() << "' port '" << GetPort() << "'.";
}
}
/* for metric and tag name rules, see
* http://opentsdb.net/docs/build/html/user_guide/writing.html#metrics-and-tags
/**
* Escape tags for OpenTSDB
* http://opentsdb.net/docs/build/html/user_guide/query/timeseries.html#precisions-on-metrics-and-tags
*
* @param str Tag name
* @return Escaped tag
*/
String OpenTsdbWriter::EscapeTag(const String& str)
{
@ -261,6 +321,13 @@ String OpenTsdbWriter::EscapeTag(const String& str)
return result;
}
/**
* Escape metric name for OpenTSDB
* http://opentsdb.net/docs/build/html/user_guide/query/timeseries.html#precisions-on-metrics-and-tags
*
* @param str Metric name
* @return Escaped metric
*/
String OpenTsdbWriter::EscapeMetric(const String& str)
{
String result = str;
@ -271,4 +338,4 @@ String OpenTsdbWriter::EscapeMetric(const String& str)
boost::replace_all(result, ":", "_");
return result;
}
}

View File

@ -32,7 +32,7 @@ protected:
void Pause() override;
private:
Stream::Ptr m_Stream;
std::shared_ptr<AsioTcpStream> m_Stream;
Timer::Ptr m_ReconnectTimer;

View File

@ -20,6 +20,11 @@ class OpenTsdbWriter : ConfigObject
[config] bool enable_ha {
default {{{ return false; }}}
};
[no_user_modify] bool connected;
[no_user_modify] bool should_connect {
default {{{ return true; }}}
};
};
}