/* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */ #ifndef REDISCONNECTION_H #define REDISCONNECTION_H #include "base/array.hpp" #include "base/atomic.hpp" #include "base/convert.hpp" #include "base/io-engine.hpp" #include "base/object.hpp" #include "base/ringbuffer.hpp" #include "base/shared.hpp" #include "base/string.hpp" #include "base/tlsstream.hpp" #include "base/value.hpp" #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include namespace icinga { /** * An Async Redis connection. * * @ingroup icingadb */ class RedisConnection final : public Object { public: DECLARE_PTR_TYPEDEFS(RedisConnection); typedef std::vector Query; typedef std::vector Queries; typedef Value Reply; typedef std::vector Replies; /** * Redis query priorities, highest first. * * @ingroup icingadb */ enum class QueryPriority : unsigned char { Heartbeat, Config, State, History, CheckResult, SyncConnection = 255 }; RedisConnection(const String& host, int port, const String& path, const String& password, int db, bool useTls, bool insecure, const String& certPath, const String& keyPath, const String& caPath, const String& crlPath, const String& tlsProtocolmin, const String& cipherList, DebugInfo di, const Ptr& parent = nullptr); void UpdateTLSContext(); void Start(); bool IsConnected(); void FireAndForgetQuery(Query query, QueryPriority priority); void FireAndForgetQueries(Queries queries, QueryPriority priority); Reply GetResultOfQuery(Query query, QueryPriority priority); Replies GetResultsOfQueries(Queries queries, QueryPriority priority); void EnqueueCallback(const std::function& callback, QueryPriority priority); void Sync(); void SuppressQueryKind(QueryPriority kind); void UnsuppressQueryKind(QueryPriority kind); void SetConnectedCallback(std::function callback); private: /** * What to do with the responses to Redis queries. * * @ingroup icingadb */ enum class ResponseAction : unsigned char { Ignore, // discard Deliver, // submit to the requestor DeliverBulk // submit multiple responses to the requestor at once }; /** * What to do with how many responses to Redis queries. * * @ingroup icingadb */ struct FutureResponseAction { size_t Amount; ResponseAction Action; }; /** * Something to be send to Redis. * * @ingroup icingadb */ struct WriteQueueItem { Shared::Ptr FireAndForgetQuery; Shared::Ptr FireAndForgetQueries; Shared>>::Ptr GetResultOfQuery; Shared>>::Ptr GetResultsOfQueries; std::function Callback; }; typedef boost::asio::ip::tcp Tcp; typedef boost::asio::local::stream_protocol Unix; typedef boost::asio::buffered_stream TcpConn; typedef boost::asio::buffered_stream UnixConn; Shared::Ptr m_TLSContext; template static Value ReadRESP(AsyncReadStream& stream, boost::asio::yield_context& yc); template static std::vector ReadLine(AsyncReadStream& stream, boost::asio::yield_context& yc, size_t hint = 0); template static void WriteRESP(AsyncWriteStream& stream, const Query& query, boost::asio::yield_context& yc); static boost::regex m_ErrAuth; RedisConnection(boost::asio::io_context& io, String host, int port, String path, String password, int db, bool useTls, bool insecure, String certPath, String keyPath, String caPath, String crlPath, String tlsProtocolmin, String cipherList, DebugInfo di, const Ptr& parent); void Connect(boost::asio::yield_context& yc); void ReadLoop(boost::asio::yield_context& yc); void WriteLoop(boost::asio::yield_context& yc); void LogStats(boost::asio::yield_context& yc); void WriteItem(boost::asio::yield_context& yc, WriteQueueItem item); Reply ReadOne(boost::asio::yield_context& yc); void WriteOne(Query& query, boost::asio::yield_context& yc); template Reply ReadOne(StreamPtr& stream, boost::asio::yield_context& yc); template void WriteOne(StreamPtr& stream, Query& query, boost::asio::yield_context& yc); void IncreasePendingQueries(int count); void DecreasePendingQueries(int count); template void Handshake(StreamPtr& stream, boost::asio::yield_context& yc); String m_Path; String m_Host; int m_Port; String m_Password; int m_DbIndex; String m_CertPath; String m_KeyPath; bool m_Insecure; String m_CaPath; String m_CrlPath; String m_TlsProtocolmin; String m_CipherList; DebugInfo m_DebugInfo; boost::asio::io_context::strand m_Strand; Shared::Ptr m_TcpConn; Shared::Ptr m_UnixConn; Shared::Ptr m_TlsConn; Atomic m_Connecting, m_Connected, m_Started; struct { // Items to be send to Redis std::map> Writes; // Requestors, each waiting for a single response std::queue> ReplyPromises; // Requestors, each waiting for multiple responses at once std::queue> RepliesPromises; // Metadata about all of the above std::queue FutureResponseActions; } m_Queues; // Kinds of queries not to actually send yet std::set m_SuppressedQueryKinds; // Indicate that there's something to send/receive AsioConditionVariable m_QueuedWrites, m_QueuedReads; std::function m_ConnectedCallback; // Stats RingBuffer m_InputQueries{10}; RingBuffer m_OutputQueries{10}; int m_PendingQueries{0}; boost::asio::deadline_timer m_LogStatsTimer; Ptr m_Parent; }; /** * An error response from the Redis server. * * @ingroup icingadb */ class RedisError final : public Object { public: DECLARE_PTR_TYPEDEFS(RedisError); inline RedisError(String message) : m_Message(std::move(message)) { } inline const String& GetMessage() { return m_Message; } private: String m_Message; }; /** * Thrown if the connection to the Redis server has already been lost. * * @ingroup icingadb */ class RedisDisconnected : public std::runtime_error { public: inline RedisDisconnected() : runtime_error("") { } }; /** * Thrown on malformed Redis server responses. * * @ingroup icingadb */ class RedisProtocolError : public std::runtime_error { protected: inline RedisProtocolError() : runtime_error("") { } }; /** * Thrown on malformed types in Redis server responses. * * @ingroup icingadb */ class BadRedisType : public RedisProtocolError { public: inline BadRedisType(char type) : m_What{type, 0} { } virtual const char * what() const noexcept override { return m_What; } private: char m_What[2]; }; /** * Thrown on malformed ints in Redis server responses. * * @ingroup icingadb */ class BadRedisInt : public RedisProtocolError { public: inline BadRedisInt(std::vector intStr) : m_What(std::move(intStr)) { m_What.emplace_back(0); } virtual const char * what() const noexcept override { return m_What.data(); } private: std::vector m_What; }; /** * Read a Redis server response from stream * * @param stream Redis server connection * * @return The response */ template RedisConnection::Reply RedisConnection::ReadOne(StreamPtr& stream, boost::asio::yield_context& yc) { namespace asio = boost::asio; if (!stream) { throw RedisDisconnected(); } auto strm (stream); try { return ReadRESP(*strm, yc); } catch (const boost::coroutines::detail::forced_unwind&) { throw; } catch (...) { if (m_Connecting.exchange(false)) { m_Connected.store(false); stream = nullptr; if (!m_Connecting.exchange(true)) { Ptr keepAlive (this); IoEngine::SpawnCoroutine(m_Strand, [this, keepAlive](asio::yield_context yc) { Connect(yc); }); } } throw; } } /** * Write a Redis query to stream * * @param stream Redis server connection * @param query Redis query */ template void RedisConnection::WriteOne(StreamPtr& stream, RedisConnection::Query& query, boost::asio::yield_context& yc) { namespace asio = boost::asio; if (!stream) { throw RedisDisconnected(); } auto strm (stream); try { WriteRESP(*strm, query, yc); strm->async_flush(yc); } catch (const boost::coroutines::detail::forced_unwind&) { throw; } catch (...) { if (m_Connecting.exchange(false)) { m_Connected.store(false); stream = nullptr; if (!m_Connecting.exchange(true)) { Ptr keepAlive (this); IoEngine::SpawnCoroutine(m_Strand, [this, keepAlive](asio::yield_context yc) { Connect(yc); }); } } throw; } } /** * Initialize a Redis stream * * @param stream Redis server connection * @param query Redis query */ template void RedisConnection::Handshake(StreamPtr& strm, boost::asio::yield_context& yc) { if (m_Password.IsEmpty() && !m_DbIndex) { // Trigger NOAUTH WriteRESP(*strm, {"PING"}, yc); } else { if (!m_Password.IsEmpty()) { WriteRESP(*strm, {"AUTH", m_Password}, yc); } if (m_DbIndex) { WriteRESP(*strm, {"SELECT", Convert::ToString(m_DbIndex)}, yc); } } strm->async_flush(yc); if (m_Password.IsEmpty() && !m_DbIndex) { Reply pong (ReadRESP(*strm, yc)); if (pong.IsObjectType()) { // Likely NOAUTH BOOST_THROW_EXCEPTION(std::runtime_error(RedisError::Ptr(pong)->GetMessage())); } } else { if (!m_Password.IsEmpty()) { Reply auth (ReadRESP(*strm, yc)); if (auth.IsObjectType()) { auto& authErr (RedisError::Ptr(auth)->GetMessage().GetData()); boost::smatch what; if (boost::regex_search(authErr, what, m_ErrAuth)) { Log(LogWarning, "IcingaDB") << authErr; } else { // Likely WRONGPASS BOOST_THROW_EXCEPTION(std::runtime_error(authErr)); } } } if (m_DbIndex) { Reply select (ReadRESP(*strm, yc)); if (select.IsObjectType()) { // Likely NOAUTH or ERR DB BOOST_THROW_EXCEPTION(std::runtime_error(RedisError::Ptr(select)->GetMessage())); } } } } /** * Read a Redis protocol value from stream * * @param stream Redis server connection * * @return The value */ template Value RedisConnection::ReadRESP(AsyncReadStream& stream, boost::asio::yield_context& yc) { namespace asio = boost::asio; char type = 0; asio::async_read(stream, asio::mutable_buffer(&type, 1), yc); switch (type) { case '+': { auto buf (ReadLine(stream, yc)); return String(buf.begin(), buf.end()); } case '-': { auto buf (ReadLine(stream, yc)); return new RedisError(String(buf.begin(), buf.end())); } case ':': { auto buf (ReadLine(stream, yc, 21)); intmax_t i = 0; try { i = boost::lexical_cast(boost::string_view(buf.data(), buf.size())); } catch (...) { throw BadRedisInt(std::move(buf)); } return (double)i; } case '$': { auto buf (ReadLine(stream, yc, 21)); intmax_t i = 0; try { i = boost::lexical_cast(boost::string_view(buf.data(), buf.size())); } catch (...) { throw BadRedisInt(std::move(buf)); } if (i < 0) { return Value(); } buf.clear(); buf.insert(buf.end(), i, 0); asio::async_read(stream, asio::mutable_buffer(buf.data(), buf.size()), yc); { char crlf[2]; asio::async_read(stream, asio::mutable_buffer(crlf, 2), yc); } return String(buf.begin(), buf.end()); } case '*': { auto buf (ReadLine(stream, yc, 21)); intmax_t i = 0; try { i = boost::lexical_cast(boost::string_view(buf.data(), buf.size())); } catch (...) { throw BadRedisInt(std::move(buf)); } Array::Ptr arr = new Array(); if (i < 0) { i = 0; } arr->Reserve(i); for (; i; --i) { arr->Add(ReadRESP(stream, yc)); } return arr; } default: throw BadRedisType(type); } } /** * Read from stream until \r\n * * @param stream Redis server connection * @param hint Expected amount of data * * @return Read data ex. \r\n */ template std::vector RedisConnection::ReadLine(AsyncReadStream& stream, boost::asio::yield_context& yc, size_t hint) { namespace asio = boost::asio; std::vector line; line.reserve(hint); char next = 0; asio::mutable_buffer buf (&next, 1); for (;;) { asio::async_read(stream, buf, yc); if (next == '\r') { asio::async_read(stream, buf, yc); return std::move(line); } line.emplace_back(next); } } /** * Write a Redis protocol value to stream * * @param stream Redis server connection * @param query Redis protocol value */ template void RedisConnection::WriteRESP(AsyncWriteStream& stream, const Query& query, boost::asio::yield_context& yc) { namespace asio = boost::asio; asio::streambuf writeBuffer; std::ostream msg(&writeBuffer); msg << "*" << query.size() << "\r\n"; for (auto& arg : query) { msg << "$" << arg.GetLength() << "\r\n" << arg << "\r\n"; } asio::async_write(stream, writeBuffer, yc); } } #endif //REDISCONNECTION_H