mirror of
				https://github.com/Icinga/icinga2.git
				synced 2025-10-26 08:43:51 +01:00 
			
		
		
		
	
		
			
				
	
	
		
			539 lines
		
	
	
		
			16 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
			
		
		
	
	
			539 lines
		
	
	
		
			16 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
| /* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */
 | |
| 
 | |
| #include "perfdata/gelfwriter.hpp"
 | |
| #include "perfdata/gelfwriter-ti.cpp"
 | |
| #include "icinga/service.hpp"
 | |
| #include "icinga/notification.hpp"
 | |
| #include "icinga/checkcommand.hpp"
 | |
| #include "icinga/macroprocessor.hpp"
 | |
| #include "icinga/compatutility.hpp"
 | |
| #include "base/tcpsocket.hpp"
 | |
| #include "base/configtype.hpp"
 | |
| #include "base/objectlock.hpp"
 | |
| #include "base/logger.hpp"
 | |
| #include "base/utility.hpp"
 | |
| #include "base/perfdatavalue.hpp"
 | |
| #include "base/application.hpp"
 | |
| #include "base/stream.hpp"
 | |
| #include "base/networkstream.hpp"
 | |
| #include "base/context.hpp"
 | |
| #include "base/exception.hpp"
 | |
| #include "base/json.hpp"
 | |
| #include "base/statsfunction.hpp"
 | |
| #include <boost/algorithm/string/replace.hpp>
 | |
| #include <utility>
 | |
| #include "base/io-engine.hpp"
 | |
| #include <boost/asio/write.hpp>
 | |
| #include <boost/asio/buffer.hpp>
 | |
| #include <boost/system/error_code.hpp>
 | |
| #include <boost/asio/error.hpp>
 | |
| 
 | |
| using namespace icinga;
 | |
| 
 | |
| REGISTER_TYPE(GelfWriter);
 | |
| 
 | |
| REGISTER_STATSFUNCTION(GelfWriter, &GelfWriter::StatsFunc);
 | |
| 
 | |
| void GelfWriter::OnConfigLoaded()
 | |
| {
 | |
| 	ObjectImpl<GelfWriter>::OnConfigLoaded();
 | |
| 
 | |
| 	m_WorkQueue.SetName("GelfWriter, " + GetName());
 | |
| 
 | |
| 	if (!GetEnableHa()) {
 | |
| 		Log(LogDebug, "GelfWriter")
 | |
| 			<< "HA functionality disabled. Won't pause connection: " << GetName();
 | |
| 
 | |
| 		SetHAMode(HARunEverywhere);
 | |
| 	} else {
 | |
| 		SetHAMode(HARunOnce);
 | |
| 	}
 | |
| }
 | |
| 
 | |
| void GelfWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata)
 | |
| {
 | |
| 	DictionaryData nodes;
 | |
| 
 | |
| 	for (const GelfWriter::Ptr& gelfwriter : ConfigType::GetObjectsByType<GelfWriter>()) {
 | |
| 		size_t workQueueItems = gelfwriter->m_WorkQueue.GetLength();
 | |
| 		double workQueueItemRate = gelfwriter->m_WorkQueue.GetTaskCount(60) / 60.0;
 | |
| 
 | |
| 		nodes.emplace_back(gelfwriter->GetName(), new Dictionary({
 | |
| 			{ "work_queue_items", workQueueItems },
 | |
| 			{ "work_queue_item_rate", workQueueItemRate },
 | |
| 			{ "connected", gelfwriter->GetConnected() },
 | |
| 			{ "source", gelfwriter->GetSource() }
 | |
| 		}));
 | |
| 
 | |
| 		perfdata->Add(new PerfdataValue("gelfwriter_" + gelfwriter->GetName() + "_work_queue_items", workQueueItems));
 | |
| 		perfdata->Add(new PerfdataValue("gelfwriter_" + gelfwriter->GetName() + "_work_queue_item_rate", workQueueItemRate));
 | |
| 	}
 | |
| 
 | |
| 	status->Set("gelfwriter", new Dictionary(std::move(nodes)));
 | |
| }
 | |
| 
 | |
| void GelfWriter::Resume()
 | |
| {
 | |
| 	ObjectImpl<GelfWriter>::Resume();
 | |
| 
 | |
| 	Log(LogInformation, "GelfWriter")
 | |
| 		<< "'" << GetName() << "' resumed.";
 | |
| 
 | |
| 	/* Register exception handler for WQ tasks. */
 | |
| 	m_WorkQueue.SetExceptionCallback([this](boost::exception_ptr exp) { ExceptionHandler(std::move(exp)); });
 | |
| 
 | |
| 	/* Timer for reconnecting */
 | |
| 	m_ReconnectTimer = new Timer();
 | |
| 	m_ReconnectTimer->SetInterval(10);
 | |
| 	m_ReconnectTimer->OnTimerExpired.connect([this](const Timer * const&) { ReconnectTimerHandler(); });
 | |
| 	m_ReconnectTimer->Start();
 | |
| 	m_ReconnectTimer->Reschedule(0);
 | |
| 
 | |
| 	/* Register event handlers. */
 | |
| 	m_HandleCheckResults = Checkable::OnNewCheckResult.connect([this](const Checkable::Ptr& checkable,
 | |
| 		const CheckResult::Ptr& cr, const MessageOrigin::Ptr&) {
 | |
| 		CheckResultHandler(checkable, cr);
 | |
| 	});
 | |
| 	m_HandleNotifications = Checkable::OnNotificationSentToUser.connect([this](const Notification::Ptr& notification,
 | |
| 		const Checkable::Ptr& checkable, const User::Ptr& user, const NotificationType& type, const CheckResult::Ptr& cr,
 | |
| 		const String& author, const String& commentText, const String& commandName, const MessageOrigin::Ptr&) {
 | |
| 		NotificationToUserHandler(notification, checkable, user, type, cr, author, commentText, commandName);
 | |
| 	});
 | |
| 	m_HandleStateChanges = Checkable::OnStateChange.connect([this](const Checkable::Ptr& checkable,
 | |
| 		const CheckResult::Ptr& cr, StateType type, const MessageOrigin::Ptr&) {
 | |
| 		StateChangeHandler(checkable, cr, type);
 | |
| 	});
 | |
| }
 | |
| 
 | |
| /* Pause is equivalent to Stop, but with HA capabilities to resume at runtime. */
 | |
| void GelfWriter::Pause()
 | |
| {
 | |
| 	m_HandleCheckResults.disconnect();
 | |
| 	m_HandleNotifications.disconnect();
 | |
| 	m_HandleStateChanges.disconnect();
 | |
| 
 | |
| 	m_ReconnectTimer.reset();
 | |
| 
 | |
| 	try {
 | |
| 		ReconnectInternal();
 | |
| 	} catch (const std::exception&) {
 | |
| 		Log(LogInformation, "GelfWriter")
 | |
| 			<< "'" << GetName() << "' paused. Unable to connect, not flushing buffers. Data may be lost on reload.";
 | |
| 
 | |
| 		ObjectImpl<GelfWriter>::Pause();
 | |
| 		return;
 | |
| 	}
 | |
| 
 | |
| 	m_WorkQueue.Join();
 | |
| 	DisconnectInternal();
 | |
| 
 | |
| 	Log(LogInformation, "GelfWriter")
 | |
| 		<< "'" << GetName() << "' paused.";
 | |
| 
 | |
| 	ObjectImpl<GelfWriter>::Pause();
 | |
| }
 | |
| 
 | |
| void GelfWriter::AssertOnWorkQueue()
 | |
| {
 | |
| 	ASSERT(m_WorkQueue.IsWorkerThread());
 | |
| }
 | |
| 
 | |
| void GelfWriter::ExceptionHandler(boost::exception_ptr exp)
 | |
| {
 | |
| 	Log(LogCritical, "GelfWriter") << "Exception during Graylog Gelf operation: " << DiagnosticInformation(exp, false);
 | |
| 	Log(LogDebug, "GelfWriter") << "Exception during Graylog Gelf operation: " << DiagnosticInformation(exp, true);
 | |
| 
 | |
| 	DisconnectInternal();
 | |
| }
 | |
| 
 | |
| void GelfWriter::Reconnect()
 | |
| {
 | |
| 	AssertOnWorkQueue();
 | |
| 
 | |
| 	if (IsPaused()) {
 | |
| 		SetConnected(false);
 | |
| 		return;
 | |
| 	}
 | |
| 
 | |
| 	ReconnectInternal();
 | |
| }
 | |
| 
 | |
| void GelfWriter::ReconnectInternal()
 | |
| {
 | |
| 	double startTime = Utility::GetTime();
 | |
| 
 | |
| 	CONTEXT("Reconnecting to Graylog Gelf '" + GetName() + "'");
 | |
| 
 | |
| 	SetShouldConnect(true);
 | |
| 
 | |
| 	if (GetConnected())
 | |
| 		return;
 | |
| 
 | |
| 	Log(LogNotice, "GelfWriter")
 | |
| 		<< "Reconnecting to Graylog Gelf on host '" << GetHost() << "' port '" << GetPort() << "'.";
 | |
| 
 | |
| 	bool ssl = GetEnableTls();
 | |
| 
 | |
| 	if (ssl) {
 | |
| 		Shared<boost::asio::ssl::context>::Ptr sslContext;
 | |
| 
 | |
| 		try {
 | |
| 			sslContext = MakeAsioSslContext(GetCertPath(), GetKeyPath(), GetCaPath());
 | |
| 		} catch (const std::exception& ex) {
 | |
| 			Log(LogWarning, "GelfWriter")
 | |
| 				<< "Unable to create SSL context.";
 | |
| 			throw;
 | |
| 		}
 | |
| 
 | |
| 		m_Stream.first = Shared<AsioTlsStream>::Make(IoEngine::Get().GetIoContext(), *sslContext, GetHost());
 | |
| 
 | |
| 	} else {
 | |
| 		m_Stream.second = Shared<AsioTcpStream>::Make(IoEngine::Get().GetIoContext());
 | |
| 	}
 | |
| 
 | |
| 	try {
 | |
| 		icinga::Connect(ssl ? m_Stream.first->lowest_layer() : m_Stream.second->lowest_layer(), GetHost(), GetPort());
 | |
| 	} catch (const std::exception& ex) {
 | |
| 		Log(LogWarning, "GelfWriter")
 | |
| 			<< "Can't connect to Graylog Gelf on host '" << GetHost() << "' port '" << GetPort() << ".'";
 | |
| 		throw;
 | |
| 	}
 | |
| 
 | |
| 	if (ssl) {
 | |
| 		auto& tlsStream (m_Stream.first->next_layer());
 | |
| 
 | |
| 		try {
 | |
| 			tlsStream.handshake(tlsStream.client);
 | |
| 		} catch (const std::exception& ex) {
 | |
| 			Log(LogWarning, "GelfWriter")
 | |
| 				<< "TLS handshake with host '" << GetHost() << " failed.'";
 | |
| 			throw;
 | |
| 		}
 | |
| 
 | |
| 		if (!GetInsecureNoverify()) {
 | |
| 			if (!tlsStream.GetPeerCertificate()) {
 | |
| 				BOOST_THROW_EXCEPTION(std::runtime_error("Graylog Gelf didn't present any TLS certificate."));
 | |
| 			}
 | |
| 
 | |
| 			if (!tlsStream.IsVerifyOK()) {
 | |
| 				BOOST_THROW_EXCEPTION(std::runtime_error(
 | |
| 					"TLS certificate validation failed: " + std::string(tlsStream.GetVerifyError())
 | |
| 				));
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	SetConnected(true);
 | |
| 
 | |
| 	Log(LogInformation, "GelfWriter")
 | |
| 		<< "Finished reconnecting to Graylog Gelf in " << std::setw(2) << Utility::GetTime() - startTime << " second(s).";
 | |
| }
 | |
| 
 | |
| void GelfWriter::ReconnectTimerHandler()
 | |
| {
 | |
| 	m_WorkQueue.Enqueue([this]() { Reconnect(); }, PriorityNormal);
 | |
| }
 | |
| 
 | |
| void GelfWriter::Disconnect()
 | |
| {
 | |
| 	AssertOnWorkQueue();
 | |
| 
 | |
| 	DisconnectInternal();
 | |
| }
 | |
| 
 | |
| void GelfWriter::DisconnectInternal()
 | |
| {
 | |
| 	if (!GetConnected())
 | |
| 		return;
 | |
| 
 | |
| 	if (m_Stream.first) {
 | |
| 		boost::system::error_code ec;
 | |
| 		m_Stream.first->next_layer().shutdown(ec);
 | |
| 
 | |
| 		// https://stackoverflow.com/a/25703699
 | |
| 		// As long as the error code's category is not an SSL category, then the protocol was securely shutdown
 | |
| 		if (ec.category() == boost::asio::error::get_ssl_category()) {
 | |
| 			Log(LogCritical, "GelfWriter")
 | |
| 				<< "TLS shutdown with host '" << GetHost() << "' could not be done securely.";
 | |
| 		}
 | |
| 	} else if (m_Stream.second) {
 | |
| 		m_Stream.second->close();
 | |
| 	}
 | |
| 
 | |
| 	SetConnected(false);
 | |
| 
 | |
| }
 | |
| 
 | |
| void GelfWriter::CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
 | |
| {
 | |
| 	if (IsPaused())
 | |
| 		return;
 | |
| 
 | |
| 	m_WorkQueue.Enqueue([this, checkable, cr]() { CheckResultHandlerInternal(checkable, cr); });
 | |
| }
 | |
| 
 | |
| void GelfWriter::CheckResultHandlerInternal(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
 | |
| {
 | |
| 	AssertOnWorkQueue();
 | |
| 
 | |
| 	CONTEXT("GELF Processing check result for '" + checkable->GetName() + "'");
 | |
| 
 | |
| 	Log(LogDebug, "GelfWriter")
 | |
| 		<< "Processing check result for '" << checkable->GetName() << "'";
 | |
| 
 | |
| 	Host::Ptr host;
 | |
| 	Service::Ptr service;
 | |
| 	tie(host, service) = GetHostService(checkable);
 | |
| 
 | |
| 	Dictionary::Ptr fields = new Dictionary();
 | |
| 
 | |
| 	if (service) {
 | |
| 		fields->Set("_service_name", service->GetShortName());
 | |
| 		fields->Set("_service_state", Service::StateToString(service->GetState()));
 | |
| 		fields->Set("_last_state", service->GetLastState());
 | |
| 		fields->Set("_last_hard_state", service->GetLastHardState());
 | |
| 	} else {
 | |
| 		fields->Set("_last_state", host->GetLastState());
 | |
| 		fields->Set("_last_hard_state", host->GetLastHardState());
 | |
| 	}
 | |
| 
 | |
| 	fields->Set("_hostname", host->GetName());
 | |
| 	fields->Set("_type", "CHECK RESULT");
 | |
| 	fields->Set("_state", service ? Service::StateToString(service->GetState()) : Host::StateToString(host->GetState()));
 | |
| 
 | |
| 	fields->Set("_current_check_attempt", checkable->GetCheckAttempt());
 | |
| 	fields->Set("_max_check_attempts", checkable->GetMaxCheckAttempts());
 | |
| 
 | |
| 	fields->Set("_reachable", checkable->IsReachable());
 | |
| 
 | |
| 	CheckCommand::Ptr checkCommand = checkable->GetCheckCommand();
 | |
| 
 | |
| 	if (checkCommand)
 | |
| 		fields->Set("_check_command", checkCommand->GetName());
 | |
| 
 | |
| 	double ts = Utility::GetTime();
 | |
| 
 | |
| 	if (cr) {
 | |
| 		fields->Set("_latency", cr->CalculateLatency());
 | |
| 		fields->Set("_execution_time", cr->CalculateExecutionTime());
 | |
| 		fields->Set("short_message", CompatUtility::GetCheckResultOutput(cr));
 | |
| 		fields->Set("full_message", cr->GetOutput());
 | |
| 		fields->Set("_check_source", cr->GetCheckSource());
 | |
| 		ts = cr->GetExecutionEnd();
 | |
| 	}
 | |
| 
 | |
| 	if (cr && GetEnableSendPerfdata()) {
 | |
| 		Array::Ptr perfdata = cr->GetPerformanceData();
 | |
| 
 | |
| 		if (perfdata) {
 | |
| 			ObjectLock olock(perfdata);
 | |
| 			for (const Value& val : perfdata) {
 | |
| 				PerfdataValue::Ptr pdv;
 | |
| 
 | |
| 				if (val.IsObjectType<PerfdataValue>())
 | |
| 					pdv = val;
 | |
| 				else {
 | |
| 					try {
 | |
| 						pdv = PerfdataValue::Parse(val);
 | |
| 					} catch (const std::exception&) {
 | |
| 						Log(LogWarning, "GelfWriter")
 | |
| 							<< "Ignoring invalid perfdata for checkable '"
 | |
| 							<< checkable->GetName() << "' and command '"
 | |
| 							<< checkCommand->GetName() << "' with value: " << val;
 | |
| 						continue;
 | |
| 					}
 | |
| 				}
 | |
| 
 | |
| 				String escaped_key = pdv->GetLabel();
 | |
| 				boost::replace_all(escaped_key, " ", "_");
 | |
| 				boost::replace_all(escaped_key, ".", "_");
 | |
| 				boost::replace_all(escaped_key, "\\", "_");
 | |
| 				boost::algorithm::replace_all(escaped_key, "::", ".");
 | |
| 
 | |
| 				fields->Set("_" + escaped_key, pdv->GetValue());
 | |
| 
 | |
| 				if (!pdv->GetMin().IsEmpty())
 | |
| 					fields->Set("_" + escaped_key + "_min", pdv->GetMin());
 | |
| 				if (!pdv->GetMax().IsEmpty())
 | |
| 					fields->Set("_" + escaped_key + "_max", pdv->GetMax());
 | |
| 				if (!pdv->GetWarn().IsEmpty())
 | |
| 					fields->Set("_" + escaped_key + "_warn", pdv->GetWarn());
 | |
| 				if (!pdv->GetCrit().IsEmpty())
 | |
| 					fields->Set("_" + escaped_key + "_crit", pdv->GetCrit());
 | |
| 
 | |
| 				if (!pdv->GetUnit().IsEmpty())
 | |
| 					fields->Set("_" + escaped_key + "_unit", pdv->GetUnit());
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	SendLogMessage(checkable, ComposeGelfMessage(fields, GetSource(), ts));
 | |
| }
 | |
| 
 | |
| void GelfWriter::NotificationToUserHandler(const Notification::Ptr& notification, const Checkable::Ptr& checkable,
 | |
| 	const User::Ptr& user, NotificationType notificationType, CheckResult::Ptr const& cr,
 | |
| 	const String& author, const String& commentText, const String& commandName)
 | |
| {
 | |
| 	if (IsPaused())
 | |
| 		return;
 | |
| 
 | |
| 	m_WorkQueue.Enqueue([this, notification, checkable, user, notificationType, cr, author, commentText, commandName]() {
 | |
| 		NotificationToUserHandlerInternal(notification, checkable, user, notificationType, cr, author, commentText, commandName);
 | |
| 	});
 | |
| }
 | |
| 
 | |
| void GelfWriter::NotificationToUserHandlerInternal(const Notification::Ptr& notification, const Checkable::Ptr& checkable,
 | |
| 	const User::Ptr& user, NotificationType notificationType, CheckResult::Ptr const& cr,
 | |
| 	const String& author, const String& commentText, const String& commandName)
 | |
| {
 | |
| 	AssertOnWorkQueue();
 | |
| 
 | |
| 	CONTEXT("GELF Processing notification to all users '" + checkable->GetName() + "'");
 | |
| 
 | |
| 	Log(LogDebug, "GelfWriter")
 | |
| 		<< "Processing notification for '" << checkable->GetName() << "'";
 | |
| 
 | |
| 	Host::Ptr host;
 | |
| 	Service::Ptr service;
 | |
| 	tie(host, service) = GetHostService(checkable);
 | |
| 
 | |
| 	String notificationTypeString = Notification::NotificationTypeToStringCompat(notificationType); //TODO: Change that to our own types.
 | |
| 
 | |
| 	String authorComment = "";
 | |
| 
 | |
| 	if (notificationType == NotificationCustom || notificationType == NotificationAcknowledgement) {
 | |
| 		authorComment = author + ";" + commentText;
 | |
| 	}
 | |
| 
 | |
| 	String output;
 | |
| 	double ts = Utility::GetTime();
 | |
| 
 | |
| 	if (cr) {
 | |
| 		output = CompatUtility::GetCheckResultOutput(cr);
 | |
| 		ts = cr->GetExecutionEnd();
 | |
| 	}
 | |
| 
 | |
| 	Dictionary::Ptr fields = new Dictionary();
 | |
| 
 | |
| 	if (service) {
 | |
| 		fields->Set("_type", "SERVICE NOTIFICATION");
 | |
| 		//TODO: fix this to _service_name
 | |
| 		fields->Set("_service", service->GetShortName());
 | |
| 		fields->Set("short_message", output);
 | |
| 	} else {
 | |
| 		fields->Set("_type", "HOST NOTIFICATION");
 | |
| 		fields->Set("short_message", output);
 | |
| 	}
 | |
| 
 | |
| 	fields->Set("_state", service ? Service::StateToString(service->GetState()) : Host::StateToString(host->GetState()));
 | |
| 
 | |
| 	fields->Set("_hostname", host->GetName());
 | |
| 	fields->Set("_command", commandName);
 | |
| 	fields->Set("_notification_type", notificationTypeString);
 | |
| 	fields->Set("_comment", authorComment);
 | |
| 
 | |
| 	CheckCommand::Ptr commandObj = checkable->GetCheckCommand();
 | |
| 
 | |
| 	if (commandObj)
 | |
| 		fields->Set("_check_command", commandObj->GetName());
 | |
| 
 | |
| 	SendLogMessage(checkable, ComposeGelfMessage(fields, GetSource(), ts));
 | |
| }
 | |
| 
 | |
| void GelfWriter::StateChangeHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, StateType type)
 | |
| {
 | |
| 	if (IsPaused())
 | |
| 		return;
 | |
| 
 | |
| 	m_WorkQueue.Enqueue([this, checkable, cr, type]() { StateChangeHandlerInternal(checkable, cr, type); });
 | |
| }
 | |
| 
 | |
| void GelfWriter::StateChangeHandlerInternal(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, StateType type)
 | |
| {
 | |
| 	AssertOnWorkQueue();
 | |
| 
 | |
| 	CONTEXT("GELF Processing state change '" + checkable->GetName() + "'");
 | |
| 
 | |
| 	Log(LogDebug, "GelfWriter")
 | |
| 		<< "Processing state change for '" << checkable->GetName() << "'";
 | |
| 
 | |
| 	Host::Ptr host;
 | |
| 	Service::Ptr service;
 | |
| 	tie(host, service) = GetHostService(checkable);
 | |
| 
 | |
| 	Dictionary::Ptr fields = new Dictionary();
 | |
| 
 | |
| 	fields->Set("_state", service ? Service::StateToString(service->GetState()) : Host::StateToString(host->GetState()));
 | |
| 	fields->Set("_type", "STATE CHANGE");
 | |
| 	fields->Set("_current_check_attempt", checkable->GetCheckAttempt());
 | |
| 	fields->Set("_max_check_attempts", checkable->GetMaxCheckAttempts());
 | |
| 	fields->Set("_hostname", host->GetName());
 | |
| 
 | |
| 	if (service) {
 | |
| 		fields->Set("_service_name", service->GetShortName());
 | |
| 		fields->Set("_service_state", Service::StateToString(service->GetState()));
 | |
| 		fields->Set("_last_state", service->GetLastState());
 | |
| 		fields->Set("_last_hard_state", service->GetLastHardState());
 | |
| 	} else {
 | |
| 		fields->Set("_last_state", host->GetLastState());
 | |
| 		fields->Set("_last_hard_state", host->GetLastHardState());
 | |
| 	}
 | |
| 
 | |
| 	CheckCommand::Ptr commandObj = checkable->GetCheckCommand();
 | |
| 
 | |
| 	if (commandObj)
 | |
| 		fields->Set("_check_command", commandObj->GetName());
 | |
| 
 | |
| 	double ts = Utility::GetTime();
 | |
| 
 | |
| 	if (cr) {
 | |
| 		fields->Set("short_message", CompatUtility::GetCheckResultOutput(cr));
 | |
| 		fields->Set("full_message", cr->GetOutput());
 | |
| 		fields->Set("_check_source", cr->GetCheckSource());
 | |
| 		ts = cr->GetExecutionEnd();
 | |
| 	}
 | |
| 
 | |
| 	SendLogMessage(checkable, ComposeGelfMessage(fields, GetSource(), ts));
 | |
| }
 | |
| 
 | |
| String GelfWriter::ComposeGelfMessage(const Dictionary::Ptr& fields, const String& source, double ts)
 | |
| {
 | |
| 	fields->Set("version", "1.1");
 | |
| 	fields->Set("host", source);
 | |
| 	fields->Set("timestamp", ts);
 | |
| 
 | |
| 	return JsonEncode(fields);
 | |
| }
 | |
| 
 | |
| void GelfWriter::SendLogMessage(const Checkable::Ptr& checkable, const String& gelfMessage)
 | |
| {
 | |
| 	std::ostringstream msgbuf;
 | |
| 	msgbuf << gelfMessage;
 | |
| 	msgbuf << '\0';
 | |
| 
 | |
| 	String log = msgbuf.str();
 | |
| 
 | |
| 	ObjectLock olock(this);
 | |
| 
 | |
| 	if (!GetConnected())
 | |
| 		return;
 | |
| 
 | |
| 	try {
 | |
| 		Log(LogDebug, "GelfWriter")
 | |
| 			<< "Checkable '" << checkable->GetName() << "' sending message '" << log << "'.";
 | |
| 
 | |
| 		if (m_Stream.first) {
 | |
| 			boost::asio::write(*m_Stream.first, boost::asio::buffer(msgbuf.str()));
 | |
| 			m_Stream.first->flush();
 | |
| 		} else {
 | |
| 			boost::asio::write(*m_Stream.second, boost::asio::buffer(msgbuf.str()));
 | |
| 			m_Stream.second->flush();
 | |
| 		}
 | |
| 	} catch (const std::exception& ex) {
 | |
| 		Log(LogCritical, "GelfWriter")
 | |
| 			<< "Cannot write to TCP socket on host '" << GetHost() << "' port '" << GetPort() << "'.";
 | |
| 
 | |
| 		throw ex;
 | |
| 	}
 | |
| }
 |