/* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */

#include "icingadb/redisconnection.hpp"
#include "base/array.hpp"
#include "base/convert.hpp"
#include "base/defer.hpp"
#include "base/exception.hpp"
#include "base/io-engine.hpp"
#include "base/logger.hpp"
#include "base/objectlock.hpp"
#include "base/string.hpp"
#include "base/tcpsocket.hpp"
#include "base/tlsutility.hpp"
#include "base/utility.hpp"
#include <boost/asio.hpp>
#include <boost/coroutine/exceptions.hpp>
#include <boost/date_time/posix_time/posix_time_duration.hpp>
#include <boost/utility/string_view.hpp>
#include <boost/variant/get.hpp>
#include <exception>
#include <future>
#include <iterator>
#include <memory>
#include <openssl/ssl.h>
#include <openssl/x509_vfy.h>
#include <utility>

using namespace icinga;
namespace asio = boost::asio;

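// Matches the beginning of an "ERR AUTH ..." error reply, i.e. the error Redis
// returns when authentication fails. Presumably used elsewhere in this file to
// recognize such replies.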
boost::regex RedisConnection::m_ErrAuth ("\\AERR AUTH ");

RedisConnection::RedisConnection(const String& host, int port, const String& path, const String& password, int db,
	bool useTls, bool insecure, const String& certPath, const String& keyPath, const String& caPath, const String& crlPath,
	const String& tlsProtocolmin, const String& cipherList, double connectTimeout, DebugInfo di, const RedisConnection::Ptr& parent)
	: RedisConnection(IoEngine::Get().GetIoContext(), host, port, path, password, db,
	  useTls, insecure, certPath, keyPath, caPath, crlPath, tlsProtocolmin, cipherList, connectTimeout, std::move(di), parent)
{
}

RedisConnection::RedisConnection(boost::asio::io_context& io, String host, int port, String path, String password,
	int db, bool useTls, bool insecure, String certPath, String keyPath, String caPath, String crlPath,
	String tlsProtocolmin, String cipherList, double connectTimeout, DebugInfo di, const RedisConnection::Ptr& parent)
	: m_Host(std::move(host)), m_Port(port), m_Path(std::move(path)), m_Password(std::move(password)),
	  m_DbIndex(db), m_CertPath(std::move(certPath)), m_KeyPath(std::move(keyPath)), m_Insecure(insecure),
	  m_CaPath(std::move(caPath)), m_CrlPath(std::move(crlPath)), m_TlsProtocolmin(std::move(tlsProtocolmin)),
	  m_CipherList(std::move(cipherList)), m_ConnectTimeout(connectTimeout), m_DebugInfo(std::move(di)), m_Connecting(false), m_Connected(false),
	  m_Started(false), m_Strand(io), m_QueuedWrites(io), m_QueuedReads(io), m_LogStatsTimer(io), m_Parent(parent)
{
	if (useTls && m_Path.IsEmpty()) {
		UpdateTLSContext();
	}
}

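// (Re)creates the TLS context from the configured certificate, key, CA, CRL,
// cipher list and minimum TLS protocol version. The constructor above only
// sets up TLS for TCP connections (empty m_Path) with useTls enabled; UNIX
// socket connections never use TLS.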
void RedisConnection::UpdateTLSContext()
{
	m_TLSContext = SetupSslContext(m_CertPath, m_KeyPath, m_CaPath,
		m_CrlPath, m_CipherList, m_TlsProtocolmin, m_DebugInfo);
}

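// Starts the connection machinery: the read and write loops (and, for
// connections without a parent, the stats logger) are spawned exactly once,
// while a fresh Connect coroutine is spawned whenever no connect attempt is in
// progress. Calling Start() again after a disconnect therefore triggers a
// reconnect.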
void RedisConnection::Start()
{
	if (!m_Started.exchange(true)) {
		Ptr keepAlive (this);

		IoEngine::SpawnCoroutine(m_Strand, [this, keepAlive](asio::yield_context yc) { ReadLoop(yc); });
		IoEngine::SpawnCoroutine(m_Strand, [this, keepAlive](asio::yield_context yc) { WriteLoop(yc); });

		if (!m_Parent) {
			IoEngine::SpawnCoroutine(m_Strand, [this, keepAlive](asio::yield_context yc) { LogStats(yc); });
		}
	}

	if (!m_Connecting.exchange(true)) {
		Ptr keepAlive (this);

		IoEngine::SpawnCoroutine(m_Strand, [this, keepAlive](asio::yield_context yc) { Connect(yc); });
	}
}

bool RedisConnection::IsConnected() {
	return m_Connected.load();
}

/**
 * Append a Redis query to a log message
 *
 * @param query Redis query
 * @param msg Log message
 */
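// Note: to keep log lines short, only the first seven arguments are written out
// and each argument is truncated to 64 characters.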
static inline
void LogQuery(RedisConnection::Query& query, Log& msg)
{
	int i = 0;

	for (auto& arg : query) {
		if (++i == 8) {
			msg << " ...";
			break;
		}

		if (arg.GetLength() > 64) {
			msg << " '" << arg.SubStr(0, 61) << "...'";
		} else {
			msg << " '" << arg << '\'';
		}
	}
}

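// All of the following enqueue operations post into m_Strand: the write queues
// are only ever touched from the connection's I/O strand, so no additional
// locking is needed. A WriteQueueItem carries exactly one payload, either a
// single or bulk fire-and-forget query, a single or bulk query with result, or
// a callback.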
/**
 * Queue a Redis query for sending
 *
 * @param query Redis query
 * @param priority The query's priority
 */
void RedisConnection::FireAndForgetQuery(RedisConnection::Query query, RedisConnection::QueryPriority priority, QueryAffects affects)
{
	if (LogDebug >= Logger::GetMinLogSeverity()) {
		Log msg (LogDebug, "IcingaDB", "Firing and forgetting query:");
		LogQuery(query, msg);
	}

	auto item (Shared<Query>::Make(std::move(query)));
	auto ctime (Utility::GetTime());

	asio::post(m_Strand, [this, item, priority, ctime, affects]() {
		m_Queues.Writes[priority].emplace(WriteQueueItem{item, nullptr, nullptr, nullptr, nullptr, ctime, affects});
		m_QueuedWrites.Set();
		IncreasePendingQueries(1);
	});
}

/**
 * Queue Redis queries for sending
 *
 * @param queries Redis queries
 * @param priority The queries' priority
 */
void RedisConnection::FireAndForgetQueries(RedisConnection::Queries queries, RedisConnection::QueryPriority priority, QueryAffects affects)
{
	if (LogDebug >= Logger::GetMinLogSeverity()) {
		for (auto& query : queries) {
			Log msg(LogDebug, "IcingaDB", "Firing and forgetting query:");
			LogQuery(query, msg);
		}
	}

	auto item (Shared<Queries>::Make(std::move(queries)));
	auto ctime (Utility::GetTime());

	asio::post(m_Strand, [this, item, priority, ctime, affects]() {
		m_Queues.Writes[priority].emplace(WriteQueueItem{nullptr, item, nullptr, nullptr, nullptr, ctime, affects});
		m_QueuedWrites.Set();
		IncreasePendingQueries(item->size());
	});
}

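// The two blocking variants below pair each query (or batch) with a
// std::promise: the write loop sends the query and queues the promise, the
// read loop fulfills it with the reply (or with an exception), and the calling
// thread blocks on the corresponding future until then.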
/**
 * Queue a Redis query for sending, wait for the response and return (or throw) it
 *
 * @param query Redis query
 * @param priority The query's priority
 *
 * @return The response
 */
RedisConnection::Reply RedisConnection::GetResultOfQuery(RedisConnection::Query query, RedisConnection::QueryPriority priority, QueryAffects affects)
{
	if (LogDebug >= Logger::GetMinLogSeverity()) {
		Log msg (LogDebug, "IcingaDB", "Executing query:");
		LogQuery(query, msg);
	}

	std::promise<Reply> promise;
	auto future (promise.get_future());
	auto item (Shared<std::pair<Query, std::promise<Reply>>>::Make(std::move(query), std::move(promise)));
	auto ctime (Utility::GetTime());

	asio::post(m_Strand, [this, item, priority, ctime, affects]() {
		m_Queues.Writes[priority].emplace(WriteQueueItem{nullptr, nullptr, item, nullptr, nullptr, ctime, affects});
		m_QueuedWrites.Set();
		IncreasePendingQueries(1);
	});

	// Drop our reference so only the queued work item keeps the query (and its promise) alive.
	item = nullptr;
	future.wait();
	return future.get();
}

/**
 * Queue Redis queries for sending, wait for the responses and return (or throw) them
 *
 * @param queries Redis queries
 * @param priority The queries' priority
 *
 * @return The responses
 */
RedisConnection::Replies RedisConnection::GetResultsOfQueries(RedisConnection::Queries queries, RedisConnection::QueryPriority priority, QueryAffects affects)
{
	if (LogDebug >= Logger::GetMinLogSeverity()) {
		for (auto& query : queries) {
			Log msg(LogDebug, "IcingaDB", "Executing query:");
			LogQuery(query, msg);
		}
	}

	std::promise<Replies> promise;
	auto future (promise.get_future());
	auto item (Shared<std::pair<Queries, std::promise<Replies>>>::Make(std::move(queries), std::move(promise)));
	auto ctime (Utility::GetTime());

	asio::post(m_Strand, [this, item, priority, ctime, affects]() {
		m_Queues.Writes[priority].emplace(WriteQueueItem{nullptr, nullptr, nullptr, item, nullptr, ctime, affects});
		m_QueuedWrites.Set();
		IncreasePendingQueries(item->first.size());
	});

	item = nullptr;
	future.wait();
	return future.get();
}

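/**
 * Queue a callback to be run from the write loop
 *
 * The callback is executed in the connection's I/O strand and is ordered like a query of the given priority.
 *
 * @param callback Callback to run
 * @param priority The callback's priority
 */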
void RedisConnection::EnqueueCallback(const std::function<void(boost::asio::yield_context&)>& callback, RedisConnection::QueryPriority priority)
{
	auto ctime (Utility::GetTime());

	asio::post(m_Strand, [this, callback, priority, ctime]() {
		m_Queues.Writes[priority].emplace(WriteQueueItem{nullptr, nullptr, nullptr, nullptr, callback, ctime});
		m_QueuedWrites.Set();
	});
}

/**
 * Put a no-op command with a result at the end of the queue and wait for the result,
 * i.e. for everything enqueued before it to be processed by the server.
 *
 * @ingroup icingadb
 */
void RedisConnection::Sync()
{
	GetResultOfQuery({"PING"}, RedisConnection::QueryPriority::SyncConnection);
}

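// Illustrative usage (hypothetical caller, not part of this file): queue a
// fire-and-forget write, then block until the server has processed everything
// queued so far:
//
//     conn->FireAndForgetQuery({"HSET", "icinga:example", "answer", "42"},
//         RedisConnection::QueryPriority::SyncConnection);
//     conn->Sync();
//
// Key, value and priority above were made up for illustration only.
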
/**
 * Get the enqueue time of the oldest still queued Redis query
 *
 * @return *nix timestamp or 0
 */
double RedisConnection::GetOldestPendingQueryTs()
{
	auto promise (Shared<std::promise<double>>::Make());
	auto future (promise->get_future());

	asio::post(m_Strand, [this, promise]() {
		double oldest = 0;

		for (auto& queue : m_Queues.Writes) {
			if (m_SuppressedQueryKinds.find(queue.first) == m_SuppressedQueryKinds.end() && !queue.second.empty()) {
				auto ctime (queue.second.front().CTime);

				if (ctime < oldest || oldest == 0) {
					oldest = ctime;
				}
			}
		}

		promise->set_value(oldest);
	});

	future.wait();
	return future.get();
}

/**
 * Mark a kind of queries as suppressed, i.e. queued, but not to be sent yet
 *
 * @param kind Query kind
 */
void RedisConnection::SuppressQueryKind(RedisConnection::QueryPriority kind)
{
	asio::post(m_Strand, [this, kind]() { m_SuppressedQueryKinds.emplace(kind); });
}

/**
 * Stop suppressing a kind of queries, i.e. allow queued queries of that kind to be sent again
 *
 * @param kind Query kind
 */
void RedisConnection::UnsuppressQueryKind(RedisConnection::QueryPriority kind)
{
	asio::post(m_Strand, [this, kind]() {
		m_SuppressedQueryKinds.erase(kind);
		m_QueuedWrites.Set();
	});
}

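// Connect() below retries every five seconds until a connection has been
// established. Depending on the configuration it uses one of three transports:
// TLS over TCP (with peer certificate verification unless 'insecure' is set),
// plain TCP, or a UNIX socket.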
/**
 * Try to connect to Redis
 */
void RedisConnection::Connect(asio::yield_context& yc)
{
	Defer notConnecting ([this]() { m_Connecting.store(m_Connected.load()); });

	boost::asio::deadline_timer timer (m_Strand.context());

	auto waitForReadLoop ([this, &yc]() {
		while (!m_Queues.FutureResponseActions.empty()) {
			IoEngine::YieldCurrentCoroutine(yc);
		}
	});

	for (;;) {
		try {
			if (m_Path.IsEmpty()) {
				if (m_TLSContext) {
					Log(m_Parent ? LogNotice : LogInformation, "IcingaDB")
						<< "Trying to connect to Redis server (async, TLS) on host '" << m_Host << ":" << m_Port << "'";

					auto conn (Shared<AsioTlsStream>::Make(m_Strand.context(), *m_TLSContext, m_Host));
					auto& tlsConn (conn->next_layer());
					auto connectTimeout (MakeTimeout(conn));
					Defer cancelTimeout ([&connectTimeout]() { connectTimeout->Cancel(); });

					icinga::Connect(conn->lowest_layer(), m_Host, Convert::ToString(m_Port), yc);
					tlsConn.async_handshake(tlsConn.client, yc);

					if (!m_Insecure) {
						std::shared_ptr<X509> cert (tlsConn.GetPeerCertificate());

						if (!cert) {
							BOOST_THROW_EXCEPTION(std::runtime_error(
								"Redis didn't present any TLS certificate."
							));
						}

						if (!tlsConn.IsVerifyOK()) {
							BOOST_THROW_EXCEPTION(std::runtime_error(
								"TLS certificate validation failed: " + std::string(tlsConn.GetVerifyError())
							));
						}
					}

					Handshake(conn, yc);
					waitForReadLoop();
					m_TlsConn = std::move(conn);
				} else {
					Log(m_Parent ? LogNotice : LogInformation, "IcingaDB")
						<< "Trying to connect to Redis server (async) on host '" << m_Host << ":" << m_Port << "'";

					auto conn (Shared<TcpConn>::Make(m_Strand.context()));
					auto connectTimeout (MakeTimeout(conn));
					Defer cancelTimeout ([&connectTimeout]() { connectTimeout->Cancel(); });

					icinga::Connect(conn->next_layer(), m_Host, Convert::ToString(m_Port), yc);
					Handshake(conn, yc);
					waitForReadLoop();
					m_TcpConn = std::move(conn);
				}
			} else {
				Log(LogInformation, "IcingaDB")
					<< "Trying to connect to Redis server (async) on unix socket path '" << m_Path << "'";

				auto conn (Shared<UnixConn>::Make(m_Strand.context()));
				auto connectTimeout (MakeTimeout(conn));
				Defer cancelTimeout ([&connectTimeout]() { connectTimeout->Cancel(); });

				conn->next_layer().async_connect(Unix::endpoint(m_Path.CStr()), yc);
				Handshake(conn, yc);
				waitForReadLoop();
				m_UnixConn = std::move(conn);
			}

			m_Connected.store(true);

			Log(m_Parent ? LogNotice : LogInformation, "IcingaDB", "Connected to Redis server");

			// Operate on a copy so that the callback can set a new callback without destroying itself while running.
			auto callback (m_ConnectedCallback);
			if (callback) {
				callback(yc);
			}

			break;
		} catch (const boost::coroutines::detail::forced_unwind&) {
			throw;
		} catch (const std::exception& ex) {
			Log(LogCritical, "IcingaDB")
				<< "Cannot connect to " << m_Host << ":" << m_Port << ": " << ex.what();
		}

		timer.expires_from_now(boost::posix_time::seconds(5));
		timer.async_wait(yc);
	}

}

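// The read loop below works through m_Queues.FutureResponseActions: for every
// batch of queries sent, the write loop records how many replies to expect and
// what to do with them (Ignore, Deliver each to its own promise, or DeliverBulk
// to a single promise for the whole batch). As Redis answers pipelined commands
// in request order, consuming these actions FIFO matches replies to queries.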
/**
 * Actually receive the responses to the Redis queries sent by WriteItem() and handle them
 */
void RedisConnection::ReadLoop(asio::yield_context& yc)
{
	for (;;) {
		m_QueuedReads.Wait(yc);

		while (!m_Queues.FutureResponseActions.empty()) {
			auto item (std::move(m_Queues.FutureResponseActions.front()));
			m_Queues.FutureResponseActions.pop();

			switch (item.Action) {
				case ResponseAction::Ignore:
					try {
						for (auto i (item.Amount); i; --i) {
							ReadOne(yc);
						}
					} catch (const boost::coroutines::detail::forced_unwind&) {
						throw;
					} catch (const std::exception& ex) {
						Log(LogCritical, "IcingaDB")
							<< "Error during receiving the response to a query which has been fired and forgotten: " << ex.what();

						continue;
					} catch (...) {
						Log(LogCritical, "IcingaDB")
							<< "Error during receiving the response to a query which has been fired and forgotten";

						continue;
					}

					break;
				case ResponseAction::Deliver:
					for (auto i (item.Amount); i; --i) {
						auto promise (std::move(m_Queues.ReplyPromises.front()));
						m_Queues.ReplyPromises.pop();

						Reply reply;

						try {
							reply = ReadOne(yc);
						} catch (const boost::coroutines::detail::forced_unwind&) {
							throw;
						} catch (...) {
							promise.set_exception(std::current_exception());

							continue;
						}

						promise.set_value(std::move(reply));
					}

					break;
				case ResponseAction::DeliverBulk:
					{
						auto promise (std::move(m_Queues.RepliesPromises.front()));
						m_Queues.RepliesPromises.pop();

						Replies replies;
						replies.reserve(item.Amount);

						for (auto i (item.Amount); i; --i) {
							try {
								replies.emplace_back(ReadOne(yc));
							} catch (const boost::coroutines::detail::forced_unwind&) {
								throw;
							} catch (...) {
								promise.set_exception(std::current_exception());
								break;
							}
						}

						try {
							promise.set_value(std::move(replies));
						} catch (const std::future_error&) {
							// Ignore the failure of set_value():
							// promise.set_exception() has already been called in the loop above.
						}
					}
			}
		}

		m_QueuedReads.Clear();
	}
}

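// The write loop below always restarts at the first, i.e. highest-priority,
// queue after sending an item (hence the goto), so non-suppressed
// higher-priority queues are drained before lower-priority ones get a turn.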
/**
 * Actually send the Redis queries queued by {FireAndForget,GetResultsOf}{Query,Queries}()
 */
void RedisConnection::WriteLoop(asio::yield_context& yc)
{
	for (;;) {
		m_QueuedWrites.Wait(yc);

	WriteFirstOfHighestPrio:
		for (auto& queue : m_Queues.Writes) {
			if (m_SuppressedQueryKinds.find(queue.first) != m_SuppressedQueryKinds.end() || queue.second.empty()) {
				continue;
			}

			auto next (std::move(queue.second.front()));
			queue.second.pop();

			WriteItem(yc, std::move(next));

			goto WriteFirstOfHighestPrio;
		}

		m_QueuedWrites.Clear();
	}
}

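// LogStats() below wakes up every ten seconds, but only writes a log line if
// the connection is up and either five minutes have passed since the last
// message or the backlog exceeds roughly five seconds' worth of the current
// output rate.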
/**
 * Periodically log current query performance
 */
void RedisConnection::LogStats(asio::yield_context& yc)
{
	double lastMessage = 0;

	m_LogStatsTimer.expires_from_now(boost::posix_time::seconds(10));

	for (;;) {
		m_LogStatsTimer.async_wait(yc);
		m_LogStatsTimer.expires_from_now(boost::posix_time::seconds(10));

		if (!IsConnected())
			continue;

		auto now (Utility::GetTime());
		bool timeoutReached = now - lastMessage >= 5 * 60;

		if (m_PendingQueries < 1 && !timeoutReached)
			continue;

		auto output (round(m_OutputQueries.CalculateRate(now, 10)));

		if (m_PendingQueries < output * 5 && !timeoutReached)
			continue;

		Log(LogInformation, "IcingaDB")
			<< "Pending queries: " << m_PendingQueries << " (Input: "
			<< round(m_InputQueries.CalculateRate(now, 10)) << "/s; Output: " << output << "/s)";

		lastMessage = now;
	}
}

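// WriteItem() below handles exactly one WriteQueueItem: whichever payload is
// set (single or bulk fire-and-forget query, single or bulk query with result,
// or callback) gets processed. Consecutive Ignore and Deliver response actions
// are coalesced by bumping their amount, while bulk result queries always get
// their own DeliverBulk action.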
/**
 * Send the next queued item and schedule receiving the response(s)
 *
 * @param next Write queue item to send
 */
void RedisConnection::WriteItem(boost::asio::yield_context& yc, RedisConnection::WriteQueueItem next)
{
	if (next.FireAndForgetQuery) {
		auto& item (*next.FireAndForgetQuery);
		DecreasePendingQueries(1);

		try {
			WriteOne(item, yc);
		} catch (const boost::coroutines::detail::forced_unwind&) {
			throw;
		} catch (const std::exception& ex) {
			Log msg (LogCritical, "IcingaDB", "Error during sending query");
			LogQuery(item, msg);
			msg << " which has been fired and forgotten: " << ex.what();

			return;
		} catch (...) {
			Log msg (LogCritical, "IcingaDB", "Error during sending query");
			LogQuery(item, msg);
			msg << " which has been fired and forgotten";

			return;
		}

		if (m_Queues.FutureResponseActions.empty() || m_Queues.FutureResponseActions.back().Action != ResponseAction::Ignore) {
			m_Queues.FutureResponseActions.emplace(FutureResponseAction{1, ResponseAction::Ignore});
		} else {
			++m_Queues.FutureResponseActions.back().Amount;
		}

		m_QueuedReads.Set();
	}

	if (next.FireAndForgetQueries) {
		auto& item (*next.FireAndForgetQueries);
		size_t i = 0;

		DecreasePendingQueries(item.size());

		try {
			for (auto& query : item) {
				WriteOne(query, yc);
				++i;
			}
		} catch (const boost::coroutines::detail::forced_unwind&) {
			throw;
		} catch (const std::exception& ex) {
			Log msg (LogCritical, "IcingaDB", "Error during sending query");
			LogQuery(item[i], msg);
			msg << " which has been fired and forgotten: " << ex.what();

			return;
		} catch (...) {
			Log msg (LogCritical, "IcingaDB", "Error during sending query");
			LogQuery(item[i], msg);
			msg << " which has been fired and forgotten";

			return;
		}

		if (m_Queues.FutureResponseActions.empty() || m_Queues.FutureResponseActions.back().Action != ResponseAction::Ignore) {
			m_Queues.FutureResponseActions.emplace(FutureResponseAction{item.size(), ResponseAction::Ignore});
		} else {
			m_Queues.FutureResponseActions.back().Amount += item.size();
		}

		m_QueuedReads.Set();
	}

	if (next.GetResultOfQuery) {
		auto& item (*next.GetResultOfQuery);
		DecreasePendingQueries(1);

		try {
			WriteOne(item.first, yc);
		} catch (const boost::coroutines::detail::forced_unwind&) {
			throw;
		} catch (...) {
			item.second.set_exception(std::current_exception());

			return;
		}

		m_Queues.ReplyPromises.emplace(std::move(item.second));

		if (m_Queues.FutureResponseActions.empty() || m_Queues.FutureResponseActions.back().Action != ResponseAction::Deliver) {
			m_Queues.FutureResponseActions.emplace(FutureResponseAction{1, ResponseAction::Deliver});
		} else {
			++m_Queues.FutureResponseActions.back().Amount;
		}

		m_QueuedReads.Set();
	}

	if (next.GetResultsOfQueries) {
		auto& item (*next.GetResultsOfQueries);
		DecreasePendingQueries(item.first.size());

		try {
			for (auto& query : item.first) {
				WriteOne(query, yc);
			}
		} catch (const boost::coroutines::detail::forced_unwind&) {
			throw;
		} catch (...) {
			item.second.set_exception(std::current_exception());

			return;
		}

		m_Queues.RepliesPromises.emplace(std::move(item.second));
		m_Queues.FutureResponseActions.emplace(FutureResponseAction{item.first.size(), ResponseAction::DeliverBulk});

		m_QueuedReads.Set();
	}

	if (next.Callback) {
		next.Callback(yc);
	}

	RecordAffected(next.Affects, Utility::GetTime());
}

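// ReadOne()/WriteOne() below merely dispatch to the overload for the transport
// actually in use (TLS, plain TCP or UNIX socket); the per-transport overloads
// are defined elsewhere (presumably in redisconnection.hpp).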
/**
 * Receive the response to a Redis query
 *
 * @return The response
 */
RedisConnection::Reply RedisConnection::ReadOne(boost::asio::yield_context& yc)
{
	if (m_Path.IsEmpty()) {
		if (m_TLSContext) {
			return ReadOne(m_TlsConn, yc);
		} else {
			return ReadOne(m_TcpConn, yc);
		}
	} else {
		return ReadOne(m_UnixConn, yc);
	}
}

/**
 * Send query
 *
 * @param query Redis query
 */
void RedisConnection::WriteOne(RedisConnection::Query& query, asio::yield_context& yc)
{
	if (m_Path.IsEmpty()) {
		if (m_TLSContext) {
			WriteOne(m_TlsConn, query, yc);
		} else {
			WriteOne(m_TcpConn, query, yc);
		}
	} else {
		WriteOne(m_UnixConn, query, yc);
	}
}

/**
 * Specify a callback that is run each time a connection is successfully established
 *
 * The callback is executed from a Boost.Asio coroutine and should therefore not perform blocking operations.
 *
 * @param callback Callback to execute
 */
void RedisConnection::SetConnectedCallback(std::function<void(asio::yield_context& yc)> callback) {
	m_ConnectedCallback = std::move(callback);
}

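/**
 * Get the number of queries sent to the Redis server within the given time span
 *
 * @param span Time span in seconds
 *
 * @return Number of queries
 */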
int RedisConnection::GetQueryCount(RingBuffer::SizeType span)
{
	return m_OutputQueries.UpdateAndGetValues(Utility::GetTime(), span);
}

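// The statistics below are aggregated at the top-level connection: if this
// connection has a parent, the update is forwarded to it (via the parent's
// strand) instead of being recorded locally.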
void RedisConnection::IncreasePendingQueries(int count)
{
	if (m_Parent) {
		auto parent (m_Parent);

		asio::post(parent->m_Strand, [parent, count]() {
			parent->IncreasePendingQueries(count);
		});
	} else {
		m_PendingQueries += count;
		m_InputQueries.InsertValue(Utility::GetTime(), count);
	}
}

void RedisConnection::DecreasePendingQueries(int count)
{
	if (m_Parent) {
		auto parent (m_Parent);

		asio::post(parent->m_Strand, [parent, count]() {
			parent->DecreasePendingQueries(count);
		});
	} else {
		m_PendingQueries -= count;
		m_OutputQueries.InsertValue(Utility::GetTime(), count);
	}
}

void RedisConnection::RecordAffected(RedisConnection::QueryAffects affected, double when)
{
	if (m_Parent) {
		auto parent (m_Parent);

		asio::post(parent->m_Strand, [parent, affected, when]() {
			parent->RecordAffected(affected, when);
		});
	} else {
		if (affected.Config) {
			m_WrittenConfig.InsertValue(when, affected.Config);
		}

		if (affected.State) {
			m_WrittenState.InsertValue(when, affected.State);
		}

		if (affected.History) {
			m_WrittenHistory.InsertValue(when, affected.History);
		}
	}
}