2016-04-19 13:54:41 +02:00
/******************************************************************************
* Icinga 2 *
2017-01-10 15:54:22 +01:00
* Copyright ( C ) 2012 - 2017 Icinga Development Team ( https : //www.icinga.com/) *
2016-04-19 13:54:41 +02:00
* *
* This program is free software ; you can redistribute it and / or *
* modify it under the terms of the GNU General Public License *
* as published by the Free Software Foundation ; either version 2 *
* of the License , or ( at your option ) any later version . *
* *
* This program is distributed in the hope that it will be useful , *
* but WITHOUT ANY WARRANTY ; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE . See the *
* GNU General Public License for more details . *
* *
* You should have received a copy of the GNU General Public License *
* along with this program ; if not , write to the Free Software Foundation *
* Inc . , 51 Franklin St , Fifth Floor , Boston , MA 02110 - 1301 , USA . *
* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * */
# include "perfdata/influxdbwriter.hpp"
# include "perfdata/influxdbwriter.tcpp"
# include "remote/url.hpp"
# include "remote/httprequest.hpp"
# include "remote/httpresponse.hpp"
# include "icinga/service.hpp"
# include "icinga/macroprocessor.hpp"
# include "icinga/icingaapplication.hpp"
# include "icinga/checkcommand.hpp"
# include "base/tcpsocket.hpp"
# include "base/configtype.hpp"
# include "base/objectlock.hpp"
# include "base/logger.hpp"
# include "base/convert.hpp"
# include "base/utility.hpp"
2017-05-15 15:51:39 +02:00
# include "base/perfdatavalue.hpp"
2016-04-19 13:54:41 +02:00
# include "base/stream.hpp"
2016-07-28 14:29:37 +02:00
# include "base/json.hpp"
2016-04-19 13:54:41 +02:00
# include "base/networkstream.hpp"
# include "base/exception.hpp"
# include "base/statsfunction.hpp"
# include "base/tlsutility.hpp"
# include <boost/algorithm/string.hpp>
# include <boost/algorithm/string/classification.hpp>
# include <boost/algorithm/string/split.hpp>
# include <boost/algorithm/string/replace.hpp>
2016-06-07 14:35:16 +02:00
# include <boost/regex.hpp>
2016-07-28 14:29:37 +02:00
# include <boost/scoped_array.hpp>
2016-04-19 13:54:41 +02:00
using namespace icinga ;
/* Project registration macros for this class. Presumably REGISTER_TYPE makes
 * InfluxdbWriter known to the config type system - confirm against the macro. */
REGISTER_TYPE ( InfluxdbWriter ) ;
/* Associates InfluxdbWriter::StatsFunc with this class for statistics
 * collection (presumed from the macro name - TODO confirm). */
REGISTER_STATSFUNCTION ( InfluxdbWriter , & InfluxdbWriter : : StatsFunc ) ;
2017-05-04 10:29:49 +02:00
//TODO: Evaluate whether multiple WQ threads and InfluxDB connections are possible. 10 threads will hog InfluxDB in large scale environments.
/**
 * Constructs the writer with its bookkeeping members in their initial state:
 * the work queue is built with the arguments (10000000, 1), the task
 * statistics member with 15 * 60, and the pending-task counters are zeroed.
 *
 * NOTE(review): the work queue arguments are presumably "maximum queued
 * items" and "worker thread count" - confirm against WorkQueue.
 */
InfluxdbWriter::InfluxdbWriter(void)
	: m_WorkQueue(10000000, 1),
	  m_TaskStats(15 * 60),
	  m_PendingTasks(0),
	  m_PendingTasksTimestamp(0)
{
}
void InfluxdbWriter : : OnConfigLoaded ( void )
{
ObjectImpl < InfluxdbWriter > : : OnConfigLoaded ( ) ;
m_WorkQueue . SetName ( " InfluxdbWriter, " + GetName ( ) ) ;
}
2016-04-19 13:54:41 +02:00
void InfluxdbWriter : : StatsFunc ( const Dictionary : : Ptr & status , const Array : : Ptr & )
{
Dictionary : : Ptr nodes = new Dictionary ( ) ;
2016-08-25 06:19:44 +02:00
for ( const InfluxdbWriter : : Ptr & influxdbwriter : ConfigType : : GetObjectsByType < InfluxdbWriter > ( ) ) {
2017-05-04 10:29:49 +02:00
size_t workQueueItems = influxdbwriter - > m_WorkQueue . GetLength ( ) ;
2017-05-11 17:30:20 +02:00
double workQueueItemRate = influxdbwriter - > m_WorkQueue . GetTaskCount ( 60 ) / 60.0 ;
2017-05-04 10:29:49 +02:00
size_t dataBufferItems = influxdbwriter - > m_DataBuffer . size ( ) ;
//TODO: Collect more stats
Dictionary : : Ptr stats = new Dictionary ( ) ;
stats - > Set ( " work_queue_items " , workQueueItems ) ;
2017-05-11 17:30:20 +02:00
stats - > Set ( " work_queue_item_rate " , workQueueItemRate ) ;
2017-05-04 10:29:49 +02:00
stats - > Set ( " data_buffer_items " , dataBufferItems ) ;
nodes - > Set ( influxdbwriter - > GetName ( ) , stats ) ;
2016-04-19 13:54:41 +02:00
}
status - > Set ( " influxdbwriter " , nodes ) ;
}
/**
 * Starts the writer.
 *
 * After the base class Start() this logs a start message, installs
 * ExceptionHandler as the work queue's exception callback, creates and
 * starts the flush timer and the stats logger timer, and finally subscribes
 * CheckResultHandler to Service::OnNewCheckResult.
 *
 * @param runtimeCreated Forwarded unchanged to the base class Start().
 */
void InfluxdbWriter : : Start ( bool runtimeCreated )
{
ObjectImpl < InfluxdbWriter > : : Start ( runtimeCreated ) ;
2017-02-08 14:53:52 +01:00
Log ( LogInformation , " InfluxdbWriter " )
< < " ' " < < GetName ( ) < < " ' started. " ;
2017-05-04 10:29:49 +02:00
/* Register exception handler for WQ tasks. */
m_WorkQueue . SetExceptionCallback ( boost : : bind ( & InfluxdbWriter : : ExceptionHandler , this , _1 ) ) ;
/* Setup timer for periodically flushing m_DataBuffer */
2016-04-19 13:54:41 +02:00
m_FlushTimer = new Timer ( ) ;
/* The interval comes from the object's flush interval attribute. */
m_FlushTimer - > SetInterval ( GetFlushInterval ( ) ) ;
m_FlushTimer - > OnTimerExpired . connect ( boost : : bind ( & InfluxdbWriter : : FlushTimeout , this ) ) ;
m_FlushTimer - > Start ( ) ;
/* presumably triggers the first expiry right away instead of after one full interval - TODO confirm Timer::Reschedule(0) */
m_FlushTimer - > Reschedule ( 0 ) ;
2017-05-04 10:29:49 +02:00
/* Timer for updating and logging work queue stats */
m_StatsLoggerTimer = new Timer ( ) ;
m_StatsLoggerTimer - > SetInterval ( 60 ) ; // don't be too noisy
m_StatsLoggerTimer - > OnTimerExpired . connect ( boost : : bind ( & InfluxdbWriter : : StatsLoggerTimerHandler , this ) ) ;
m_StatsLoggerTimer - > Start ( ) ;
/* Register for new metrics. */
2016-04-19 13:54:41 +02:00
Service : : OnNewCheckResult . connect ( boost : : bind ( & InfluxdbWriter : : CheckResultHandler , this , _1 , _2 ) ) ;
}
2017-02-08 14:53:52 +01:00
/**
 * Stops the writer: logs a stop message, joins the work queue and then
 * delegates to the base class Stop().
 *
 * @param runtimeRemoved Forwarded unchanged to the base class Stop().
 */
void InfluxdbWriter : : Stop ( bool runtimeRemoved )
{
Log ( LogInformation , " InfluxdbWriter " )
< < " ' " < < GetName ( ) < < " ' stopped. " ;
2017-05-04 10:29:49 +02:00
/* presumably blocks until already queued flush tasks have finished - TODO confirm WorkQueue::Join semantics */
m_WorkQueue . Join ( ) ;
2017-02-08 14:53:52 +01:00
ObjectImpl < InfluxdbWriter > : : Stop ( runtimeRemoved ) ;
}
2017-05-04 10:29:49 +02:00
/* Asserts that the calling thread is a worker thread of m_WorkQueue.
 * Called at the top of FlushHandler. */
void InfluxdbWriter : : AssertOnWorkQueue ( void )
{
ASSERT ( m_WorkQueue . IsWorkerThread ( ) ) ;
}
void InfluxdbWriter : : ExceptionHandler ( boost : : exception_ptr exp )
{
Log ( LogCritical , " InfluxdbWriter " , " Exception during InfluxDB operation: Verify that your backend is operational! " ) ;
Log ( LogDebug , " InfluxdbWriter " )
< < " Exception during InfluxDB operation: " < < DiagnosticInformation ( exp ) ;
//TODO: Close the connection, if we keep it open.
}
void InfluxdbWriter : : StatsLoggerTimerHandler ( void )
{
int pending = m_WorkQueue . GetLength ( ) ;
double now = Utility : : GetTime ( ) ;
double gradient = ( pending - m_PendingTasks ) / ( now - m_PendingTasksTimestamp ) ;
double timeToZero = pending / gradient ;
String timeInfo ;
if ( pending > GetTaskCount ( 5 ) ) {
timeInfo = " empty in " ;
if ( timeToZero < 0 )
timeInfo + = " infinite time, your backend isn't able to keep up " ;
else
timeInfo + = Utility : : FormatDuration ( timeToZero ) ;
}
m_PendingTasks = pending ;
m_PendingTasksTimestamp = now ;
Log ( LogInformation , " InfluxdbWriter " )
< < " Work queue items: " < < pending
< < " , rate: " < < std : : setw ( 2 ) < < GetTaskCount ( 60 ) / 60.0 < < " /s "
< < " ( " < < GetTaskCount ( 60 ) < < " /min " < < GetTaskCount ( 60 * 5 ) < < " /5min " < < GetTaskCount ( 60 * 15 ) < < " /15min); "
< < timeInfo ;
}
2017-01-20 16:45:44 +01:00
/**
 * Opens a new connection to the configured InfluxDB host and port.
 *
 * @param socket Output: receives the newly created TcpSocket, also when a
 *               later step fails.
 * @return A NetworkStream on the socket, or - when GetSslEnable() is set -
 *         a TlsStream on which the handshake has already been performed.
 * @throws Any exception raised while connecting, creating the SSL context or
 *         performing the TLS handshake. A warning is logged first and the
 *         exception is then re-thrown unchanged with `throw;`, so its dynamic
 *         type and attached diagnostics reach the caller intact.
 */
Stream::Ptr InfluxdbWriter::Connect(TcpSocket::Ptr& socket)
{
	socket = new TcpSocket();

	Log(LogNotice, " InfluxdbWriter ")
		<< " Reconnecting to InfluxDB on host ' " << GetHost() << " ' port ' " << GetPort() << " '. ";

	try {
		socket->Connect(GetHost(), GetPort());
	} catch (const std::exception&) {
		Log(LogWarning, " InfluxdbWriter ")
			<< " Can't connect to InfluxDB on host ' " << GetHost() << " ' port ' " << GetPort() << " '. ";
		/* Re-throw the original exception object; `throw ex;` would slice it. */
		throw;
	}

	/* Plain TCP: no further setup required. */
	if (!GetSslEnable())
		return new NetworkStream(socket);

	boost::shared_ptr<SSL_CTX> sslContext;

	try {
		sslContext = MakeSSLContext(GetSslCert(), GetSslKey(), GetSslCaCert());
	} catch (const std::exception&) {
		Log(LogWarning, " InfluxdbWriter ")
			<< " Unable to create SSL context. ";
		throw;
	}

	TlsStream::Ptr tlsStream = new TlsStream(socket, GetHost(), RoleClient, sslContext);

	try {
		tlsStream->Handshake();
	} catch (const std::exception&) {
		Log(LogWarning, " InfluxdbWriter ")
			<< " TLS handshake with host ' " << GetHost() << " ' failed. ";
		throw;
	}

	return tlsStream;
}
void InfluxdbWriter : : CheckResultHandler ( const Checkable : : Ptr & checkable , const CheckResult : : Ptr & cr )
{
CONTEXT ( " Processing check result for ' " + checkable - > GetName ( ) + " ' " ) ;
if ( ! IcingaApplication : : GetInstance ( ) - > GetEnablePerfdata ( ) | | ! checkable - > GetEnablePerfdata ( ) )
return ;
Host : : Ptr host ;
Service : : Ptr service ;
boost : : tie ( host , service ) = GetHostService ( checkable ) ;
MacroProcessor : : ResolverList resolvers ;
if ( service )
resolvers . push_back ( std : : make_pair ( " service " , service ) ) ;
resolvers . push_back ( std : : make_pair ( " host " , host ) ) ;
resolvers . push_back ( std : : make_pair ( " icinga " , IcingaApplication : : GetInstance ( ) ) ) ;
String prefix ;
double ts = cr - > GetExecutionEnd ( ) ;
// Clone the template and perform an in-place macro expansion of measurement and tag values
Dictionary : : Ptr tmpl_clean = service ? GetServiceTemplate ( ) : GetHostTemplate ( ) ;
Dictionary : : Ptr tmpl = static_pointer_cast < Dictionary > ( tmpl_clean - > Clone ( ) ) ;
tmpl - > Set ( " measurement " , MacroProcessor : : ResolveMacros ( tmpl - > Get ( " measurement " ) , resolvers , cr ) ) ;
Dictionary : : Ptr tags = tmpl - > Get ( " tags " ) ;
if ( tags ) {
ObjectLock olock ( tags ) ;
2016-08-25 06:19:44 +02:00
for ( const Dictionary : : Pair & pair : tags ) {
2016-04-19 13:54:41 +02:00
// Prevent missing macros from warning; will return an empty value
// which will be filtered out in SendMetric()
String missing_macro ;
2017-05-09 09:01:08 +02:00
tags - > Set ( pair . first , MacroProcessor : : ResolveMacros ( pair . second , resolvers , cr , & missing_macro ) ) ;
2016-04-19 13:54:41 +02:00
}
}
2016-06-08 12:09:21 +02:00
SendPerfdata ( tmpl , checkable , cr , ts ) ;
2016-04-19 13:54:41 +02:00
}
2017-05-09 09:01:08 +02:00
/**
 * Renders an integer as a string with a fixed suffix appended, so that
 * EscapeField() passes it through via its "Handle integers" branch.
 *
 * @param val Value to format.
 * @return Convert::ToString(val) followed by the suffix literal.
 */
String InfluxdbWriter::FormatInteger(int val)
{
	String formatted = Convert::ToString(val);
	formatted += " i ";
	return formatted;
}
2017-05-09 09:01:08 +02:00
/**
 * Converts a boolean into a String by constructing a String from it.
 *
 * NOTE(review): the resulting text depends on which String conversion a
 * bool argument selects - confirm that it is what EscapeField() expects for
 * booleans.
 *
 * @param val Value to format.
 * @return String constructed from val.
 */
String InfluxdbWriter::FormatBoolean(bool val)
{
	String formatted(val);
	return formatted;
}
void InfluxdbWriter : : SendPerfdata ( const Dictionary : : Ptr & tmpl , const Checkable : : Ptr & checkable , const CheckResult : : Ptr & cr , double ts )
2016-04-19 13:54:41 +02:00
{
Array : : Ptr perfdata = cr - > GetPerformanceData ( ) ;
2016-10-06 11:49:00 +02:00
if ( perfdata ) {
ObjectLock olock ( perfdata ) ;
for ( const Value & val : perfdata ) {
PerfdataValue : : Ptr pdv ;
if ( val . IsObjectType < PerfdataValue > ( ) )
pdv = val ;
else {
try {
pdv = PerfdataValue : : Parse ( val ) ;
} catch ( const std : : exception & ) {
Log ( LogWarning , " InfluxdbWriter " )
< < " Ignoring invalid perfdata value: " < < val ;
continue ;
}
}
2016-04-19 13:54:41 +02:00
2016-10-06 11:49:00 +02:00
Dictionary : : Ptr fields = new Dictionary ( ) ;
fields - > Set ( String ( " value " ) , pdv - > GetValue ( ) ) ;
if ( GetEnableSendThresholds ( ) ) {
if ( pdv - > GetCrit ( ) )
fields - > Set ( String ( " crit " ) , pdv - > GetCrit ( ) ) ;
if ( pdv - > GetWarn ( ) )
fields - > Set ( String ( " warn " ) , pdv - > GetWarn ( ) ) ;
if ( pdv - > GetMin ( ) )
fields - > Set ( String ( " min " ) , pdv - > GetMin ( ) ) ;
if ( pdv - > GetMax ( ) )
fields - > Set ( String ( " max " ) , pdv - > GetMax ( ) ) ;
2016-04-19 13:54:41 +02:00
}
2016-10-06 11:49:00 +02:00
SendMetric ( tmpl , pdv - > GetLabel ( ) , fields , ts ) ;
2016-04-19 13:54:41 +02:00
}
2016-10-06 11:49:00 +02:00
}
2016-06-07 14:35:16 +02:00
2016-10-06 11:49:00 +02:00
if ( GetEnableSendMetadata ( ) ) {
Host : : Ptr host ;
Service : : Ptr service ;
boost : : tie ( host , service ) = GetHostService ( checkable ) ;
Dictionary : : Ptr fields = new Dictionary ( ) ;
2016-06-08 12:09:21 +02:00
2016-10-06 11:49:00 +02:00
if ( service )
fields - > Set ( String ( " state " ) , FormatInteger ( service - > GetState ( ) ) ) ;
else
fields - > Set ( String ( " state " ) , FormatInteger ( host - > GetState ( ) ) ) ;
fields - > Set ( String ( " current_attempt " ) , FormatInteger ( checkable - > GetCheckAttempt ( ) ) ) ;
fields - > Set ( String ( " max_check_attempts " ) , FormatInteger ( checkable - > GetMaxCheckAttempts ( ) ) ) ;
fields - > Set ( String ( " state_type " ) , FormatInteger ( checkable - > GetStateType ( ) ) ) ;
fields - > Set ( String ( " reachable " ) , FormatBoolean ( checkable - > IsReachable ( ) ) ) ;
fields - > Set ( String ( " downtime_depth " ) , FormatInteger ( checkable - > GetDowntimeDepth ( ) ) ) ;
fields - > Set ( String ( " acknowledgement " ) , FormatInteger ( checkable - > GetAcknowledgement ( ) ) ) ;
fields - > Set ( String ( " latency " ) , cr - > CalculateLatency ( ) ) ;
fields - > Set ( String ( " execution_time " ) , cr - > CalculateExecutionTime ( ) ) ;
SendMetric ( tmpl , String ( ) , fields , ts ) ;
2016-04-19 13:54:41 +02:00
}
}
2016-06-07 14:35:16 +02:00
/**
 * Escapes a measurement name, tag key, tag value or field key.
 *
 * Works on a copy of the input: the sequences listed in the replace_all()
 * calls below are prefixed with a backslash, and a trailing backslash is
 * replaced (see the comment at that step).
 *
 * An empty input is returned unchanged. The trailing-character check is
 * skipped for it because there is no last character to inspect; without the
 * guard, `length - 1` would wrap around and index out of bounds.
 *
 * @param str Text to escape.
 * @return The escaped copy.
 */
String InfluxdbWriter::EscapeKey(const String& str)
{
	// Escape quotes, equals signs, commas and spaces with a backslash
	String result = str;
	boost::algorithm::replace_all(result, " \" ", " \\ \" ");
	boost::algorithm::replace_all(result, " = ", " \\ = ");
	boost::algorithm::replace_all(result, " , ", " \\ , ");
	boost::algorithm::replace_all(result, " ", " \\ ");

	// InfluxDB 'feature': although backslashes are allowed in keys they also act
	// as escape sequences when followed by ',' or ' '. When your tag is like
	// 'metric=C:\' bad things happen. Backslashes themselves cannot be escaped
	// and through experimentation they also escape '='. To be safe we replace
	// trailing backslashes with an underscore.
	size_t length = result.GetLength();
	if (length > 0 && result[length - 1] == ' \\ ')
		result[length - 1] = ' _ ';

	return result;
}
/**
 * Formats a field value.
 *
 * The input is matched against a series of patterns, in this order:
 * integers and numerics are returned unchanged, the two boolean patterns
 * (matched case-insensitively) are normalised to fixed literals, and
 * everything else is treated as a string - embedded double quotes are
 * escaped and the result is wrapped in double quotes.
 *
 * @param str Field value to format.
 * @return The formatted field value.
 */
String InfluxdbWriter::EscapeField(const String& str)
{
	//TODO: Evaluate whether boost::regex is really needed here.

	// Integers pass through untouched
	boost::regex integerPattern(" -? \\ d+i ");
	if (boost::regex_match(str.GetData(), integerPattern))
		return str;

	// So do other numerics
	boost::regex numericPattern(" -? \\ d+( \\ . \\ d+)?((e|E)[+-]? \\ d+)? ");
	if (boost::regex_match(str.GetData(), numericPattern))
		return str;

	// Booleans are normalised
	boost::regex truePattern(" t|true ", boost::regex::icase);
	if (boost::regex_match(str.GetData(), truePattern))
		return " true ";

	boost::regex falsePattern(" f|false ", boost::regex::icase);
	if (boost::regex_match(str.GetData(), falsePattern))
		return " false ";

	// Anything else is a string: escape quotes and wrap in quotes
	String quoted = str;
	boost::algorithm::replace_all(quoted, " \" ", " \\ \" ");
	return " \" " + quoted + " \" ";
}
/**
 * Serialises one data point and appends it to m_DataBuffer.
 *
 * The text is assembled in this order: escaped measurement, the non-empty
 * tags, the optional metric label, the comma-separated fields and finally
 * the timestamp truncated to an unsigned long. The buffer is flushed
 * right away once it holds at least GetFlushThreshold() entries.
 *
 * @param tmpl Template providing the "measurement" and "tags" entries.
 * @param label Metric label; may be empty (metadata).
 * @param fields Field names and values of the data point.
 * @param ts Timestamp of the data point.
 */
void InfluxdbWriter : : SendMetric ( const Dictionary : : Ptr & tmpl , const String & label , const Dictionary : : Ptr & fields , double ts )
2016-04-19 13:54:41 +02:00
{
std : : ostringstream msgbuf ;
2016-06-07 14:35:16 +02:00
msgbuf < < EscapeKey ( tmpl - > Get ( " measurement " ) ) ;
2016-04-19 13:54:41 +02:00
Dictionary : : Ptr tags = tmpl - > Get ( " tags " ) ;
if ( tags ) {
ObjectLock olock ( tags ) ;
2016-08-25 06:19:44 +02:00
for ( const Dictionary : : Pair & pair : tags ) {
2016-04-19 13:54:41 +02:00
// Empty macro expansion, no tag
2016-06-07 14:35:16 +02:00
if ( ! pair . second . IsEmpty ( ) ) {
msgbuf < < " , " < < EscapeKey ( pair . first ) < < " = " < < EscapeKey ( pair . second ) ;
}
2016-04-19 13:54:41 +02:00
}
}
2016-10-06 11:49:00 +02:00
// The label may be empty in the case of metadata
if ( ! label . IsEmpty ( ) )
msgbuf < < " ,metric= " < < EscapeKey ( label ) ;
msgbuf < < " " ;
2016-06-07 14:35:16 +02:00
2017-05-04 10:29:49 +02:00
{
// Fields are separated by commas; the flag suppresses the separator before the first one
bool first = true ;
ObjectLock fieldLock ( fields ) ;
for ( const Dictionary : : Pair & pair : fields ) {
if ( first )
first = false ;
else
msgbuf < < " , " ;
msgbuf < < EscapeKey ( pair . first ) < < " = " < < EscapeField ( pair . second ) ;
}
2016-06-07 14:35:16 +02:00
}
// The fractional part of the timestamp is dropped by the cast
msgbuf < < " " < < static_cast < unsigned long > ( ts ) ;
2016-04-19 13:54:41 +02:00
Log ( LogDebug , " InfluxdbWriter " )
2017-05-04 10:29:49 +02:00
< < " Add to metric list: ' " < < msgbuf . str ( ) < < " '. " ;
2016-04-19 13:54:41 +02:00
// Atomically buffer the data point
2017-05-04 10:29:49 +02:00
boost : : mutex : : scoped_lock lock ( m_DataBufferMutex ) ;
m_DataBuffer . push_back ( String ( msgbuf . str ( ) ) ) ;
2016-04-19 13:54:41 +02:00
// Flush if we've buffered too much to prevent excessive memory use
// (Flush() is called while m_DataBufferMutex is still held, as it requires)
2017-05-08 08:47:27 +02:00
if ( static_cast < int > ( m_DataBuffer . size ( ) ) > = GetFlushThreshold ( ) ) {
2016-04-19 13:54:41 +02:00
Log ( LogDebug , " InfluxdbWriter " )
2017-05-04 10:29:49 +02:00
< < " Data buffer overflow writing " < < m_DataBuffer . size ( ) < < " data points " ;
2016-04-19 13:54:41 +02:00
Flush ( ) ;
}
}
void InfluxdbWriter : : FlushTimeout ( void )
{
// Prevent new data points from being added to the array, there is a
// race condition where they could disappear
2017-05-04 10:29:49 +02:00
boost : : mutex : : scoped_lock lock ( m_DataBufferMutex ) ;
2016-04-19 13:54:41 +02:00
// Flush if there are any data available
2017-05-04 10:29:49 +02:00
if ( m_DataBuffer . size ( ) > 0 ) {
2016-04-19 13:54:41 +02:00
Log ( LogDebug , " InfluxdbWriter " )
2017-05-04 10:29:49 +02:00
< < " Timer expired writing " < < m_DataBuffer . size ( ) < < " data points " ;
2016-04-19 13:54:41 +02:00
Flush ( ) ;
}
}
void InfluxdbWriter : : Flush ( void )
{
2017-05-04 10:29:49 +02:00
// Ensure you hold a lock against m_DataBuffer so that things
// don't go missing after creating the body and clearing the buffer
String body = boost : : algorithm : : join ( m_DataBuffer , " \n " ) ;
m_DataBuffer . clear ( ) ;
// Asynchronously flush the metric body to InfluxDB
m_WorkQueue . Enqueue ( boost : : bind ( & InfluxdbWriter : : FlushHandler , this , body ) ) ;
}
void InfluxdbWriter : : FlushHandler ( const String & body )
{
AssertOnWorkQueue ( ) ;
2017-01-20 16:45:44 +01:00
TcpSocket : : Ptr socket ;
Stream : : Ptr stream = Connect ( socket ) ;
2016-04-19 13:54:41 +02:00
2017-05-04 10:29:49 +02:00
if ( ! stream )
2016-04-19 13:54:41 +02:00
return ;
2017-05-04 10:29:49 +02:00
IncreaseTaskCount ( ) ;
2016-04-19 13:54:41 +02:00
Url : : Ptr url = new Url ( ) ;
url - > SetScheme ( GetSslEnable ( ) ? " https " : " http " ) ;
url - > SetHost ( GetHost ( ) ) ;
url - > SetPort ( GetPort ( ) ) ;
std : : vector < String > path ;
path . push_back ( " write " ) ;
url - > SetPath ( path ) ;
url - > AddQueryElement ( " db " , GetDatabase ( ) ) ;
url - > AddQueryElement ( " precision " , " s " ) ;
if ( ! GetUsername ( ) . IsEmpty ( ) )
url - > AddQueryElement ( " u " , GetUsername ( ) ) ;
if ( ! GetPassword ( ) . IsEmpty ( ) )
url - > AddQueryElement ( " p " , GetPassword ( ) ) ;
HttpRequest req ( stream ) ;
req . RequestMethod = " POST " ;
req . RequestUrl = url ;
try {
req . WriteBody ( body . CStr ( ) , body . GetLength ( ) ) ;
req . Finish ( ) ;
2017-05-04 10:29:49 +02:00
} catch ( const std : : exception & ex ) {
2016-04-19 13:54:41 +02:00
Log ( LogWarning , " InfluxdbWriter " )
< < " Cannot write to TCP socket on host ' " < < GetHost ( ) < < " ' port ' " < < GetPort ( ) < < " '. " ;
2017-05-04 10:29:49 +02:00
throw ex ;
2016-04-19 13:54:41 +02:00
}
2017-05-04 10:29:49 +02:00
//TODO: Evaluate whether waiting for the result makes sense here. KeepAlive and close are options.
2016-04-19 13:54:41 +02:00
HttpResponse resp ( stream , req ) ;
StreamReadContext context ;
2017-01-20 16:45:44 +01:00
struct timeval timeout = { GetSocketTimeout ( ) , 0 } ;
2017-05-04 10:29:49 +02:00
2017-01-20 16:45:44 +01:00
if ( ! socket - > Poll ( true , false , & timeout ) ) {
Log ( LogWarning , " InfluxdbWriter " )
< < " Response timeout of TCP socket from host ' " < < GetHost ( ) < < " ' port ' " < < GetPort ( ) < < " '. " ;
return ;
}
2016-04-19 13:54:41 +02:00
try {
resp . Parse ( context , true ) ;
2017-05-04 10:29:49 +02:00
} catch ( const std : : exception & ex ) {
2016-04-19 13:54:41 +02:00
Log ( LogWarning , " InfluxdbWriter " )
< < " Cannot read from TCP socket from host ' " < < GetHost ( ) < < " ' port ' " < < GetPort ( ) < < " '. " ;
2017-05-04 10:29:49 +02:00
throw ex ;
2016-04-19 13:54:41 +02:00
}
if ( resp . StatusCode ! = 204 ) {
Log ( LogWarning , " InfluxdbWriter " )
< < " Unexpected response code " < < resp . StatusCode ;
2016-07-28 14:29:37 +02:00
// Finish parsing the headers and body
while ( ! resp . Complete )
resp . Parse ( context , true ) ;
String contentType = resp . Headers - > Get ( " content-type " ) ;
if ( contentType ! = " application/json " ) {
Log ( LogWarning , " InfluxdbWriter " )
< < " Unexpected Content-Type: " < < contentType ;
return ;
}
size_t responseSize = resp . GetBodySize ( ) ;
boost : : scoped_array < char > buffer ( new char [ responseSize + 1 ] ) ;
resp . ReadBody ( buffer . get ( ) , responseSize ) ;
buffer . get ( ) [ responseSize ] = ' \0 ' ;
Dictionary : : Ptr jsonResponse ;
try {
jsonResponse = JsonDecode ( buffer . get ( ) ) ;
} catch ( . . . ) {
Log ( LogWarning , " InfluxdbWriter " )
< < " Unable to parse JSON response: \n " < < buffer . get ( ) ;
return ;
}
String error = jsonResponse - > Get ( " error " ) ;
Log ( LogCritical , " InfluxdbWriter " )
< < " InfluxDB error message: \n " < < error ;
2016-04-19 13:54:41 +02:00
}
}
2017-05-04 10:29:49 +02:00
void InfluxdbWriter : : IncreaseTaskCount ( void )
{
double now = Utility : : GetTime ( ) ;
boost : : mutex : : scoped_lock lock ( m_StatsMutex ) ;
m_TaskStats . InsertValue ( now , 1 ) ;
}
int InfluxdbWriter : : GetTaskCount ( RingBuffer : : SizeType span ) const
{
boost : : mutex : : scoped_lock lock ( m_StatsMutex ) ;
return m_TaskStats . GetValues ( span ) ;
}
2016-04-19 13:54:41 +02:00
void InfluxdbWriter : : ValidateHostTemplate ( const Dictionary : : Ptr & value , const ValidationUtils & utils )
{
ObjectImpl < InfluxdbWriter > : : ValidateHostTemplate ( value , utils ) ;
String measurement = value - > Get ( " measurement " ) ;
if ( ! MacroProcessor : : ValidateMacroString ( measurement ) )
BOOST_THROW_EXCEPTION ( ValidationError ( this , boost : : assign : : list_of ( " host_template " ) ( " measurement " ) , " Closing $ not found in macro format string ' " + measurement + " '. " ) ) ;
Dictionary : : Ptr tags = value - > Get ( " tags " ) ;
if ( tags ) {
ObjectLock olock ( tags ) ;
2016-08-25 06:19:44 +02:00
for ( const Dictionary : : Pair & pair : tags ) {
2016-04-19 13:54:41 +02:00
if ( ! MacroProcessor : : ValidateMacroString ( pair . second ) )
BOOST_THROW_EXCEPTION ( ValidationError ( this , boost : : assign : : list_of < String > ( " host_template " ) ( " tags " ) ( pair . first ) , " Closing $ not found in macro format string ' " + pair . second ) ) ;
}
}
}
void InfluxdbWriter : : ValidateServiceTemplate ( const Dictionary : : Ptr & value , const ValidationUtils & utils )
{
ObjectImpl < InfluxdbWriter > : : ValidateServiceTemplate ( value , utils ) ;
String measurement = value - > Get ( " measurement " ) ;
if ( ! MacroProcessor : : ValidateMacroString ( measurement ) )
BOOST_THROW_EXCEPTION ( ValidationError ( this , boost : : assign : : list_of ( " service_template " ) ( " measurement " ) , " Closing $ not found in macro format string ' " + measurement + " '. " ) ) ;
Dictionary : : Ptr tags = value - > Get ( " tags " ) ;
if ( tags ) {
ObjectLock olock ( tags ) ;
2016-08-25 06:19:44 +02:00
for ( const Dictionary : : Pair & pair : tags ) {
2016-04-19 13:54:41 +02:00
if ( ! MacroProcessor : : ValidateMacroString ( pair . second ) )
BOOST_THROW_EXCEPTION ( ValidationError ( this , boost : : assign : : list_of < String > ( " service_template " ) ( " tags " ) ( pair . first ) , " Closing $ not found in macro format string ' " + pair . second ) ) ;
}
}
}