mirror of
				https://github.com/Icinga/icinga2.git
				synced 2025-10-27 01:03:50 +01:00 
			
		
		
		
	before permitting their parent objects' destruction. For the cases where the handlers have raw pointers to these objects.
		
			
				
	
	
		
			515 lines
		
	
	
		
			14 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
			
		
		
	
	
			515 lines
		
	
	
		
			14 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
| /* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */
 | |
| 
 | |
| #include "perfdata/graphitewriter.hpp"
 | |
| #include "perfdata/graphitewriter-ti.cpp"
 | |
| #include "icinga/service.hpp"
 | |
| #include "icinga/checkcommand.hpp"
 | |
| #include "icinga/macroprocessor.hpp"
 | |
| #include "icinga/icingaapplication.hpp"
 | |
| #include "base/tcpsocket.hpp"
 | |
| #include "base/configtype.hpp"
 | |
| #include "base/objectlock.hpp"
 | |
| #include "base/logger.hpp"
 | |
| #include "base/convert.hpp"
 | |
| #include "base/utility.hpp"
 | |
| #include "base/perfdatavalue.hpp"
 | |
| #include "base/application.hpp"
 | |
| #include "base/stream.hpp"
 | |
| #include "base/networkstream.hpp"
 | |
| #include "base/exception.hpp"
 | |
| #include "base/statsfunction.hpp"
 | |
| #include <boost/algorithm/string.hpp>
 | |
| #include <boost/algorithm/string/replace.hpp>
 | |
| #include <utility>
 | |
| 
 | |
| using namespace icinga;
 | |
| 
 | |
| REGISTER_TYPE(GraphiteWriter);
 | |
| 
 | |
| REGISTER_STATSFUNCTION(GraphiteWriter, &GraphiteWriter::StatsFunc);
 | |
| 
 | |
| /*
 | |
|  * Enable HA capabilities once the config object is loaded.
 | |
|  */
 | |
| void GraphiteWriter::OnConfigLoaded()
 | |
| {
 | |
| 	ObjectImpl<GraphiteWriter>::OnConfigLoaded();
 | |
| 
 | |
| 	m_WorkQueue.SetName("GraphiteWriter, " + GetName());
 | |
| 
 | |
| 	if (!GetEnableHa()) {
 | |
| 		Log(LogDebug, "GraphiteWriter")
 | |
| 			<< "HA functionality disabled. Won't pause connection: " << GetName();
 | |
| 
 | |
| 		SetHAMode(HARunEverywhere);
 | |
| 	} else {
 | |
| 		SetHAMode(HARunOnce);
 | |
| 	}
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * Feature stats interface
 | |
|  *
 | |
|  * @param status Key value pairs for feature stats
 | |
|  * @param perfdata Array of PerfdataValue objects
 | |
|  */
 | |
| void GraphiteWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata)
 | |
| {
 | |
| 	DictionaryData nodes;
 | |
| 
 | |
| 	for (const GraphiteWriter::Ptr& graphitewriter : ConfigType::GetObjectsByType<GraphiteWriter>()) {
 | |
| 		size_t workQueueItems = graphitewriter->m_WorkQueue.GetLength();
 | |
| 		double workQueueItemRate = graphitewriter->m_WorkQueue.GetTaskCount(60) / 60.0;
 | |
| 
 | |
| 		nodes.emplace_back(graphitewriter->GetName(), new Dictionary({
 | |
| 			{ "work_queue_items", workQueueItems },
 | |
| 			{ "work_queue_item_rate", workQueueItemRate },
 | |
| 			{ "connected", graphitewriter->GetConnected() }
 | |
| 		}));
 | |
| 
 | |
| 		perfdata->Add(new PerfdataValue("graphitewriter_" + graphitewriter->GetName() + "_work_queue_items", workQueueItems));
 | |
| 		perfdata->Add(new PerfdataValue("graphitewriter_" + graphitewriter->GetName() + "_work_queue_item_rate", workQueueItemRate));
 | |
| 	}
 | |
| 
 | |
| 	status->Set("graphitewriter", new Dictionary(std::move(nodes)));
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * Resume is equivalent to Start, but with HA capabilities to resume at runtime.
 | |
|  */
 | |
| void GraphiteWriter::Resume()
 | |
| {
 | |
| 	ObjectImpl<GraphiteWriter>::Resume();
 | |
| 
 | |
| 	Log(LogInformation, "GraphiteWriter")
 | |
| 		<< "'" << GetName() << "' resumed.";
 | |
| 
 | |
| 	/* Register exception handler for WQ tasks. */
 | |
| 	m_WorkQueue.SetExceptionCallback([this](boost::exception_ptr exp) { ExceptionHandler(std::move(exp)); });
 | |
| 
 | |
| 	/* Timer for reconnecting */
 | |
| 	m_ReconnectTimer = Timer::Create();
 | |
| 	m_ReconnectTimer->SetInterval(10);
 | |
| 	m_ReconnectTimer->OnTimerExpired.connect([this](const Timer * const&) { ReconnectTimerHandler(); });
 | |
| 	m_ReconnectTimer->Start();
 | |
| 	m_ReconnectTimer->Reschedule(0);
 | |
| 
 | |
| 	/* Register event handlers. */
 | |
| 	m_HandleCheckResults = Checkable::OnNewCheckResult.connect([this](const Checkable::Ptr& checkable,
 | |
| 		const CheckResult::Ptr& cr, const MessageOrigin::Ptr&) {
 | |
| 		CheckResultHandler(checkable, cr);
 | |
| 	});
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * Pause is equivalent to Stop, but with HA capabilities to resume at runtime.
 | |
|  */
 | |
| void GraphiteWriter::Pause()
 | |
| {
 | |
| 	m_HandleCheckResults.disconnect();
 | |
| 	m_ReconnectTimer->Stop(true);
 | |
| 
 | |
| 	try {
 | |
| 		ReconnectInternal();
 | |
| 	} catch (const std::exception&) {
 | |
| 		Log(LogInformation, "GraphiteWriter")
 | |
| 			<< "'" << GetName() << "' paused. Unable to connect, not flushing buffers. Data may be lost on reload.";
 | |
| 
 | |
| 		ObjectImpl<GraphiteWriter>::Pause();
 | |
| 		return;
 | |
| 	}
 | |
| 
 | |
| 	m_WorkQueue.Join();
 | |
| 	DisconnectInternal();
 | |
| 
 | |
| 	Log(LogInformation, "GraphiteWriter")
 | |
| 		<< "'" << GetName() << "' paused.";
 | |
| 
 | |
| 	ObjectImpl<GraphiteWriter>::Pause();
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * Check if method is called inside the WQ thread.
 | |
|  */
 | |
| void GraphiteWriter::AssertOnWorkQueue()
 | |
| {
 | |
| 	ASSERT(m_WorkQueue.IsWorkerThread());
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * Exception handler for the WQ.
 | |
|  *
 | |
|  * Closes the connection if connected.
 | |
|  *
 | |
|  * @param exp Exception pointer
 | |
|  */
 | |
| void GraphiteWriter::ExceptionHandler(boost::exception_ptr exp)
 | |
| {
 | |
| 	Log(LogCritical, "GraphiteWriter", "Exception during Graphite operation: Verify that your backend is operational!");
 | |
| 
 | |
| 	Log(LogDebug, "GraphiteWriter")
 | |
| 		<< "Exception during Graphite operation: " << DiagnosticInformation(std::move(exp));
 | |
| 
 | |
| 	if (GetConnected()) {
 | |
| 		m_Stream->close();
 | |
| 
 | |
| 		SetConnected(false);
 | |
| 	}
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * Reconnect method, stops when the feature is paused in HA zones.
 | |
|  *
 | |
|  * Called inside the WQ.
 | |
|  */
 | |
| void GraphiteWriter::Reconnect()
 | |
| {
 | |
| 	AssertOnWorkQueue();
 | |
| 
 | |
| 	if (IsPaused()) {
 | |
| 		SetConnected(false);
 | |
| 		return;
 | |
| 	}
 | |
| 
 | |
| 	ReconnectInternal();
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * Reconnect method, connects to a TCP Stream
 | |
|  */
 | |
| void GraphiteWriter::ReconnectInternal()
 | |
| {
 | |
| 	double startTime = Utility::GetTime();
 | |
| 
 | |
| 	CONTEXT("Reconnecting to Graphite '" << GetName() << "'");
 | |
| 
 | |
| 	SetShouldConnect(true);
 | |
| 
 | |
| 	if (GetConnected())
 | |
| 		return;
 | |
| 
 | |
| 	Log(LogNotice, "GraphiteWriter")
 | |
| 		<< "Reconnecting to Graphite on host '" << GetHost() << "' port '" << GetPort() << "'.";
 | |
| 
 | |
| 	m_Stream = Shared<AsioTcpStream>::Make(IoEngine::Get().GetIoContext());
 | |
| 
 | |
| 	try {
 | |
| 		icinga::Connect(m_Stream->lowest_layer(), GetHost(), GetPort());
 | |
| 	} catch (const std::exception& ex) {
 | |
| 		Log(LogWarning, "GraphiteWriter")
 | |
| 			<< "Can't connect to Graphite on host '" << GetHost() << "' port '" << GetPort() << ".'";
 | |
| 
 | |
| 		SetConnected(false);
 | |
| 
 | |
| 		throw;
 | |
| 	}
 | |
| 
 | |
| 	SetConnected(true);
 | |
| 
 | |
| 	Log(LogInformation, "GraphiteWriter")
 | |
| 		<< "Finished reconnecting to Graphite in " << std::setw(2) << Utility::GetTime() - startTime << " second(s).";
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * Reconnect handler called by the timer.
 | |
|  *
 | |
|  * Enqueues a reconnect task into the WQ.
 | |
|  */
 | |
| void GraphiteWriter::ReconnectTimerHandler()
 | |
| {
 | |
| 	if (IsPaused())
 | |
| 		return;
 | |
| 
 | |
| 	m_WorkQueue.Enqueue([this]() { Reconnect(); }, PriorityHigh);
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * Disconnect the stream.
 | |
|  *
 | |
|  * Called inside the WQ.
 | |
|  */
 | |
| void GraphiteWriter::Disconnect()
 | |
| {
 | |
| 	AssertOnWorkQueue();
 | |
| 
 | |
| 	DisconnectInternal();
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * Disconnect the stream.
 | |
|  *
 | |
|  * Called outside the WQ.
 | |
|  */
 | |
| void GraphiteWriter::DisconnectInternal()
 | |
| {
 | |
| 	if (!GetConnected())
 | |
| 		return;
 | |
| 
 | |
| 	m_Stream->close();
 | |
| 
 | |
| 	SetConnected(false);
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * Check result event handler, checks whether feature is not paused in HA setups.
 | |
|  *
 | |
|  * @param checkable Host/Service object
 | |
|  * @param cr Check result including performance data
 | |
|  */
 | |
| void GraphiteWriter::CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
 | |
| {
 | |
| 	if (IsPaused())
 | |
| 		return;
 | |
| 
 | |
| 	m_WorkQueue.Enqueue([this, checkable, cr]() { CheckResultHandlerInternal(checkable, cr); });
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * Check result event handler, prepares metadata and perfdata values and calls Send*()
 | |
|  *
 | |
|  * Called inside the WQ.
 | |
|  *
 | |
|  * @param checkable Host/Service object
 | |
|  * @param cr Check result including performance data
 | |
|  */
 | |
| void GraphiteWriter::CheckResultHandlerInternal(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
 | |
| {
 | |
| 	AssertOnWorkQueue();
 | |
| 
 | |
| 	CONTEXT("Processing check result for '" << checkable->GetName() << "'");
 | |
| 
 | |
| 	/* TODO: Deal with missing connection here. Needs refactoring
 | |
| 	 * into parsing the actual performance data and then putting it
 | |
| 	 * into a queue for re-inserting. */
 | |
| 
 | |
| 	if (!IcingaApplication::GetInstance()->GetEnablePerfdata() || !checkable->GetEnablePerfdata())
 | |
| 		return;
 | |
| 
 | |
| 	Host::Ptr host;
 | |
| 	Service::Ptr service;
 | |
| 	tie(host, service) = GetHostService(checkable);
 | |
| 
 | |
| 	MacroProcessor::ResolverList resolvers;
 | |
| 	if (service)
 | |
| 		resolvers.emplace_back("service", service);
 | |
| 	resolvers.emplace_back("host", host);
 | |
| 
 | |
| 	String prefix;
 | |
| 
 | |
| 	if (service) {
 | |
| 		prefix = MacroProcessor::ResolveMacros(GetServiceNameTemplate(), resolvers, cr, nullptr, [](const Value& value) -> Value {
 | |
| 			return EscapeMacroMetric(value);
 | |
| 		});
 | |
| 	} else {
 | |
| 		prefix = MacroProcessor::ResolveMacros(GetHostNameTemplate(), resolvers, cr, nullptr, [](const Value& value) -> Value {
 | |
| 			return EscapeMacroMetric(value);
 | |
| 		});
 | |
| 	}
 | |
| 
 | |
| 	String prefixPerfdata = prefix + ".perfdata";
 | |
| 	String prefixMetadata = prefix + ".metadata";
 | |
| 
 | |
| 	double ts = cr->GetExecutionEnd();
 | |
| 
 | |
| 	if (GetEnableSendMetadata()) {
 | |
| 		if (service) {
 | |
| 			SendMetric(checkable, prefixMetadata, "state", service->GetState(), ts);
 | |
| 		} else {
 | |
| 			SendMetric(checkable, prefixMetadata, "state", host->GetState(), ts);
 | |
| 		}
 | |
| 
 | |
| 		SendMetric(checkable, prefixMetadata, "current_attempt", checkable->GetCheckAttempt(), ts);
 | |
| 		SendMetric(checkable, prefixMetadata, "max_check_attempts", checkable->GetMaxCheckAttempts(), ts);
 | |
| 		SendMetric(checkable, prefixMetadata, "state_type", checkable->GetStateType(), ts);
 | |
| 		SendMetric(checkable, prefixMetadata, "reachable", checkable->IsReachable(), ts);
 | |
| 		SendMetric(checkable, prefixMetadata, "downtime_depth", checkable->GetDowntimeDepth(), ts);
 | |
| 		SendMetric(checkable, prefixMetadata, "acknowledgement", checkable->GetAcknowledgement(), ts);
 | |
| 		SendMetric(checkable, prefixMetadata, "latency", cr->CalculateLatency(), ts);
 | |
| 		SendMetric(checkable, prefixMetadata, "execution_time", cr->CalculateExecutionTime(), ts);
 | |
| 	}
 | |
| 
 | |
| 	SendPerfdata(checkable, prefixPerfdata, cr, ts);
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * Parse performance data from check result and call SendMetric()
 | |
|  *
 | |
|  * @param checkable Host/service object
 | |
|  * @param prefix Metric prefix string
 | |
|  * @param cr Check result including performance data
 | |
|  * @param ts Timestamp when the check result was created
 | |
|  */
 | |
| void GraphiteWriter::SendPerfdata(const Checkable::Ptr& checkable, const String& prefix, const CheckResult::Ptr& cr, double ts)
 | |
| {
 | |
| 	Array::Ptr perfdata = cr->GetPerformanceData();
 | |
| 
 | |
| 	if (!perfdata)
 | |
| 		return;
 | |
| 
 | |
| 	CheckCommand::Ptr checkCommand = checkable->GetCheckCommand();
 | |
| 
 | |
| 	ObjectLock olock(perfdata);
 | |
| 	for (const Value& val : perfdata) {
 | |
| 		PerfdataValue::Ptr pdv;
 | |
| 
 | |
| 		if (val.IsObjectType<PerfdataValue>())
 | |
| 			pdv = val;
 | |
| 		else {
 | |
| 			try {
 | |
| 				pdv = PerfdataValue::Parse(val);
 | |
| 			} catch (const std::exception&) {
 | |
| 				Log(LogWarning, "GraphiteWriter")
 | |
| 					<< "Ignoring invalid perfdata for checkable '"
 | |
| 					<< checkable->GetName() << "' and command '"
 | |
| 					<< checkCommand->GetName() << "' with value: " << val;
 | |
| 				continue;
 | |
| 			}
 | |
| 		}
 | |
| 
 | |
| 		String escapedKey = EscapeMetricLabel(pdv->GetLabel());
 | |
| 
 | |
| 		SendMetric(checkable, prefix, escapedKey + ".value", pdv->GetValue(), ts);
 | |
| 
 | |
| 		if (GetEnableSendThresholds()) {
 | |
| 			if (!pdv->GetCrit().IsEmpty())
 | |
| 				SendMetric(checkable, prefix, escapedKey + ".crit", pdv->GetCrit(), ts);
 | |
| 			if (!pdv->GetWarn().IsEmpty())
 | |
| 				SendMetric(checkable, prefix, escapedKey + ".warn", pdv->GetWarn(), ts);
 | |
| 			if (!pdv->GetMin().IsEmpty())
 | |
| 				SendMetric(checkable, prefix, escapedKey + ".min", pdv->GetMin(), ts);
 | |
| 			if (!pdv->GetMax().IsEmpty())
 | |
| 				SendMetric(checkable, prefix, escapedKey + ".max", pdv->GetMax(), ts);
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * Computes metric data and sends to Graphite
 | |
|  *
 | |
|  * @param checkable Host/service object
 | |
|  * @param prefix Computed metric prefix string
 | |
|  * @param name Metric name
 | |
|  * @param value Metric value
 | |
|  * @param ts Timestamp when the check result was created
 | |
|  */
 | |
| void GraphiteWriter::SendMetric(const Checkable::Ptr& checkable, const String& prefix, const String& name, double value, double ts)
 | |
| {
 | |
| 	namespace asio = boost::asio;
 | |
| 
 | |
| 	std::ostringstream msgbuf;
 | |
| 	msgbuf << prefix << "." << name << " " << Convert::ToString(value) << " " << static_cast<long>(ts);
 | |
| 
 | |
| 	Log(LogDebug, "GraphiteWriter")
 | |
| 		<< "Checkable '" << checkable->GetName() << "' adds to metric list: '" << msgbuf.str() << "'.";
 | |
| 
 | |
| 	// do not send \n to debug log
 | |
| 	msgbuf << "\n";
 | |
| 
 | |
| 	std::unique_lock<std::mutex> lock(m_StreamMutex);
 | |
| 
 | |
| 	if (!GetConnected())
 | |
| 		return;
 | |
| 
 | |
| 	try {
 | |
| 		asio::write(*m_Stream, asio::buffer(msgbuf.str()));
 | |
| 		m_Stream->flush();
 | |
| 	} catch (const std::exception& ex) {
 | |
| 		Log(LogCritical, "GraphiteWriter")
 | |
| 			<< "Cannot write to TCP socket on host '" << GetHost() << "' port '" << GetPort() << "'.";
 | |
| 
 | |
| 		throw ex;
 | |
| 	}
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * Escape metric tree elements
 | |
|  *
 | |
|  * Dots are not allowed, e.g. in host names
 | |
|  *
 | |
|  * @param str Metric part name
 | |
|  * @return Escape string
 | |
|  */
 | |
| String GraphiteWriter::EscapeMetric(const String& str)
 | |
| {
 | |
| 	String result = str;
 | |
| 
 | |
| 	//don't allow '.' in metric prefixes
 | |
| 	boost::replace_all(result, " ", "_");
 | |
| 	boost::replace_all(result, ".", "_");
 | |
| 	boost::replace_all(result, "\\", "_");
 | |
| 	boost::replace_all(result, "/", "_");
 | |
| 
 | |
| 	return result;
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * Escape metric label
 | |
|  *
 | |
|  * Dots are allowed - users can create trees from perfdata labels
 | |
|  *
 | |
|  * @param str Metric label name
 | |
|  * @return Escaped string
 | |
|  */
 | |
| String GraphiteWriter::EscapeMetricLabel(const String& str)
 | |
| {
 | |
| 	String result = str;
 | |
| 
 | |
| 	//allow to pass '.' in perfdata labels
 | |
| 	boost::replace_all(result, " ", "_");
 | |
| 	boost::replace_all(result, "\\", "_");
 | |
| 	boost::replace_all(result, "/", "_");
 | |
| 	boost::replace_all(result, "::", ".");
 | |
| 
 | |
| 	return result;
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * Escape macro metrics found via host/service name templates
 | |
|  *
 | |
|  * @param value Array or string with macro metric names
 | |
|  * @return Escaped string. Arrays are joined with dots.
 | |
|  */
 | |
| Value GraphiteWriter::EscapeMacroMetric(const Value& value)
 | |
| {
 | |
| 	if (value.IsObjectType<Array>()) {
 | |
| 		Array::Ptr arr = value;
 | |
| 		ArrayData result;
 | |
| 
 | |
| 		ObjectLock olock(arr);
 | |
| 		for (const Value& arg : arr) {
 | |
| 			result.push_back(EscapeMetric(arg));
 | |
| 		}
 | |
| 
 | |
| 		return Utility::Join(new Array(std::move(result)), '.');
 | |
| 	} else
 | |
| 		return EscapeMetric(value);
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * Validate the configuration setting 'host_name_template'
 | |
|  *
 | |
|  * @param lvalue String containing runtime macros.
 | |
|  * @param utils Helper, unused
 | |
|  */
 | |
| void GraphiteWriter::ValidateHostNameTemplate(const Lazy<String>& lvalue, const ValidationUtils& utils)
 | |
| {
 | |
| 	ObjectImpl<GraphiteWriter>::ValidateHostNameTemplate(lvalue, utils);
 | |
| 
 | |
| 	if (!MacroProcessor::ValidateMacroString(lvalue()))
 | |
| 		BOOST_THROW_EXCEPTION(ValidationError(this, { "host_name_template" }, "Closing $ not found in macro format string '" + lvalue() + "'."));
 | |
| }
 | |
| 
 | |
| /**
 | |
|  * Validate the configuration setting 'service_name_template'
 | |
|  *
 | |
|  * @param lvalue String containing runtime macros.
 | |
|  * @param utils Helper, unused
 | |
|  */
 | |
| void GraphiteWriter::ValidateServiceNameTemplate(const Lazy<String>& lvalue, const ValidationUtils& utils)
 | |
| {
 | |
| 	ObjectImpl<GraphiteWriter>::ValidateServiceNameTemplate(lvalue, utils);
 | |
| 
 | |
| 	if (!MacroProcessor::ValidateMacroString(lvalue()))
 | |
| 		BOOST_THROW_EXCEPTION(ValidationError(this, { "service_name_template" }, "Closing $ not found in macro format string '" + lvalue() + "'."));
 | |
| }
 |