/* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */ #ifndef REDISCONNECTION_H #define REDISCONNECTION_H #include "base/array.hpp" #include "base/atomic.hpp" #include "base/io-engine.hpp" #include "base/object.hpp" #include "base/shared.hpp" #include "base/string.hpp" #include "base/value.hpp" #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include namespace icinga { /** * An Async Redis connection. * * @ingroup icingadb */ class RedisConnection final : public Object { public: DECLARE_PTR_TYPEDEFS(RedisConnection); typedef std::vector Query; typedef std::vector Queries; typedef Value Reply; typedef std::vector Replies; /** * Redis query priorities, highest first. * * @ingroup icingadb */ enum class QueryPriority : unsigned char { Heartbeat, Config, State, History, CheckResult }; RedisConnection(const String& host, const int port, const String& path, const String& password = "", const int db = 0); void Start(); bool IsConnected(); void FireAndForgetQuery(Query query, QueryPriority priority); void FireAndForgetQueries(Queries queries, QueryPriority priority); Reply GetResultOfQuery(Query query, QueryPriority priority); Replies GetResultsOfQueries(Queries queries, QueryPriority priority); void SuppressQueryKind(QueryPriority kind); void UnsuppressQueryKind(QueryPriority kind); private: /** * What to do with the responses to Redis queries. * * @ingroup icingadb */ enum class ResponseAction : unsigned char { Ignore, // discard Deliver, // submit to the requestor DeliverBulk // submit multiple responses to the requestor at once }; /** * What to do with how many responses to Redis queries. * * @ingroup icingadb */ struct FutureResponseAction { size_t Amount; ResponseAction Action; }; /** * Something to be send to Redis. * * @ingroup icingadb */ struct WriteQueueItem { Shared::Ptr FireAndForgetQuery; Shared::Ptr FireAndForgetQueries; Shared>>::Ptr GetResultOfQuery; Shared>>::Ptr GetResultsOfQueries; }; typedef boost::asio::ip::tcp Tcp; typedef boost::asio::local::stream_protocol Unix; typedef boost::asio::buffered_stream TcpConn; typedef boost::asio::buffered_stream UnixConn; template static Value ReadRESP(AsyncReadStream& stream, boost::asio::yield_context& yc); template static std::vector ReadLine(AsyncReadStream& stream, boost::asio::yield_context& yc, size_t hint = 0); template static void WriteRESP(AsyncWriteStream& stream, const Query& query, boost::asio::yield_context& yc); template static void WriteInt(AsyncWriteStream& stream, intmax_t i, boost::asio::yield_context& yc); RedisConnection(boost::asio::io_context& io, String host, int port, String path, String password, int db); void Connect(boost::asio::yield_context& yc); void ReadLoop(boost::asio::yield_context& yc); void WriteLoop(boost::asio::yield_context& yc); void WriteItem(boost::asio::yield_context& yc, WriteQueueItem item); Reply ReadOne(boost::asio::yield_context& yc); void WriteOne(Query& query, boost::asio::yield_context& yc); template Reply ReadOne(StreamPtr& stream, boost::asio::yield_context& yc); template void WriteOne(StreamPtr& stream, Query& query, boost::asio::yield_context& yc); String m_Path; String m_Host; int m_Port; String m_Password; int m_DbIndex; boost::asio::io_context::strand m_Strand; Shared::Ptr m_TcpConn; Shared::Ptr m_UnixConn; Atomic m_Connecting, m_Connected, m_Started; struct { // Items to be send to Redis std::map> Writes; // Requestors, each waiting for a single response std::queue> ReplyPromises; // Requestors, each waiting for multiple responses at once std::queue> RepliesPromises; // Metadata about all of the above std::queue FutureResponseActions; } m_Queues; // Kinds of queries not to actually send yet std::set m_SuppressedQueryKinds; // Indicate that there's something to send/receive AsioConditionVariable m_QueuedWrites, m_QueuedReads; }; /** * An error response from the Redis server. * * @ingroup icingadb */ class RedisError final : public Object { public: DECLARE_PTR_TYPEDEFS(RedisError); inline RedisError(String message) : m_Message(std::move(message)) { } inline const String& GetMessage() { return m_Message; } private: String m_Message; }; /** * Thrown if the connection to the Redis server has already been lost. * * @ingroup icingadb */ class RedisDisconnected : public std::runtime_error { public: inline RedisDisconnected() : runtime_error("") { } }; /** * Thrown on malformed Redis server responses. * * @ingroup icingadb */ class RedisProtocolError : public std::runtime_error { protected: inline RedisProtocolError() : runtime_error("") { } }; /** * Thrown on malformed types in Redis server responses. * * @ingroup icingadb */ class BadRedisType : public RedisProtocolError { public: inline BadRedisType(char type) : m_What{type, 0} { } virtual const char * what() const noexcept override { return m_What; } private: char m_What[2]; }; /** * Thrown on malformed ints in Redis server responses. * * @ingroup icingadb */ class BadRedisInt : public RedisProtocolError { public: inline BadRedisInt(std::vector intStr) : m_What(std::move(intStr)) { m_What.emplace_back(0); } virtual const char * what() const noexcept override { return m_What.data(); } private: std::vector m_What; }; /** * Read a Redis server response from stream * * @param stream Redis server connection * * @return The response */ template RedisConnection::Reply RedisConnection::ReadOne(StreamPtr& stream, boost::asio::yield_context& yc) { namespace asio = boost::asio; if (!stream) { throw RedisDisconnected(); } auto strm (stream); try { return ReadRESP(*strm, yc); } catch (const boost::coroutines::detail::forced_unwind&) { throw; } catch (...) { if (m_Connecting.exchange(false)) { m_Connected.store(false); stream = nullptr; if (!m_Connecting.exchange(true)) { Ptr keepAlive (this); IoEngine::SpawnCoroutine(m_Strand, [this, keepAlive](asio::yield_context yc) { Connect(yc); }); } } throw; } } /** * Write a Redis query to stream * * @param stream Redis server connection * @param query Redis query */ template void RedisConnection::WriteOne(StreamPtr& stream, RedisConnection::Query& query, boost::asio::yield_context& yc) { namespace asio = boost::asio; if (!stream) { throw RedisDisconnected(); } auto strm (stream); try { WriteRESP(*strm, query, yc); strm->async_flush(yc); } catch (const boost::coroutines::detail::forced_unwind&) { throw; } catch (...) { if (m_Connecting.exchange(false)) { m_Connected.store(false); stream = nullptr; if (!m_Connecting.exchange(true)) { Ptr keepAlive (this); IoEngine::SpawnCoroutine(m_Strand, [this, keepAlive](asio::yield_context yc) { Connect(yc); }); } } throw; } } /** * Read a Redis protocol value from stream * * @param stream Redis server connection * * @return The value */ template Value RedisConnection::ReadRESP(AsyncReadStream& stream, boost::asio::yield_context& yc) { namespace asio = boost::asio; char type = 0; asio::async_read(stream, asio::mutable_buffer(&type, 1), yc); switch (type) { case '+': { auto buf (ReadLine(stream, yc)); return String(buf.begin(), buf.end()); } case '-': { auto buf (ReadLine(stream, yc)); return new RedisError(String(buf.begin(), buf.end())); } case ':': { auto buf (ReadLine(stream, yc, 21)); intmax_t i = 0; try { i = boost::lexical_cast(boost::string_view(buf.data(), buf.size())); } catch (...) { throw BadRedisInt(std::move(buf)); } return (double)i; } case '$': { auto buf (ReadLine(stream, yc, 21)); intmax_t i = 0; try { i = boost::lexical_cast(boost::string_view(buf.data(), buf.size())); } catch (...) { throw BadRedisInt(std::move(buf)); } if (i < 0) { return Value(); } buf.clear(); buf.insert(buf.end(), i, 0); asio::async_read(stream, asio::mutable_buffer(buf.data(), buf.size()), yc); { char crlf[2]; asio::async_read(stream, asio::mutable_buffer(crlf, 2), yc); } return String(buf.begin(), buf.end()); } case '*': { auto buf (ReadLine(stream, yc, 21)); intmax_t i = 0; try { i = boost::lexical_cast(boost::string_view(buf.data(), buf.size())); } catch (...) { throw BadRedisInt(std::move(buf)); } Array::Ptr arr = new Array(); if (i < 0) { i = 0; } arr->Reserve(i); for (; i; --i) { arr->Add(ReadRESP(stream, yc)); } return arr; } default: throw BadRedisType(type); } } /** * Read from stream until \r\n * * @param stream Redis server connection * @param hint Expected amount of data * * @return Read data ex. \r\n */ template std::vector RedisConnection::ReadLine(AsyncReadStream& stream, boost::asio::yield_context& yc, size_t hint) { namespace asio = boost::asio; std::vector line; line.reserve(hint); char next = 0; asio::mutable_buffer buf (&next, 1); for (;;) { asio::async_read(stream, buf, yc); if (next == '\r') { asio::async_read(stream, buf, yc); return std::move(line); } line.emplace_back(next); } } /** * Write a Redis protocol value to stream * * @param stream Redis server connection * @param query Redis protocol value */ template void RedisConnection::WriteRESP(AsyncWriteStream& stream, const Query& query, boost::asio::yield_context& yc) { namespace asio = boost::asio; asio::async_write(stream, asio::const_buffer("*", 1), yc); WriteInt(stream, query.size(), yc); asio::async_write(stream, asio::const_buffer("\r\n", 2), yc); for (auto& arg : query) { asio::async_write(stream, asio::const_buffer("$", 1), yc); WriteInt(stream, arg.GetLength(), yc); asio::async_write(stream, asio::const_buffer("\r\n", 2), yc); asio::async_write(stream, asio::const_buffer(arg.CStr(), arg.GetLength()), yc); asio::async_write(stream, asio::const_buffer("\r\n", 2), yc); } } /** * Write a Redis protocol int to stream * * @param stream Redis server connection * @param i Redis protocol int */ template void RedisConnection::WriteInt(AsyncWriteStream& stream, intmax_t i, boost::asio::yield_context& yc) { namespace asio = boost::asio; char buf[21] = {}; sprintf(buf, "%jd", i); asio::async_write(stream, asio::const_buffer(buf, strlen(buf)), yc); } } #endif //REDISCONNECTION_H