diff --git a/lib/icingadb/redisconnection.cpp b/lib/icingadb/redisconnection.cpp index bd94273d9..4b7d316f0 100644 --- a/lib/icingadb/redisconnection.cpp +++ b/lib/icingadb/redisconnection.cpp @@ -53,6 +53,12 @@ bool RedisConnection::IsConnected() { return m_Connected.load(); } +/** + * Append a Redis query to a log message + * + * @param query Redis query + * @param msg Log message + */ static inline void LogQuery(RedisConnection::Query& query, Log& msg) { @@ -68,6 +74,12 @@ void LogQuery(RedisConnection::Query& query, Log& msg) } } +/** + * Queue a Redis query for sending + * + * @param query Redis query + * @param priority The query's priority + */ void RedisConnection::FireAndForgetQuery(RedisConnection::Query query, RedisConnection::QueryPriority priority) { { @@ -83,6 +95,12 @@ void RedisConnection::FireAndForgetQuery(RedisConnection::Query query, RedisConn }); } +/** + * Queue Redis queries for sending + * + * @param queries Redis queries + * @param priority The queries' priority + */ void RedisConnection::FireAndForgetQueries(RedisConnection::Queries queries, RedisConnection::QueryPriority priority) { for (auto& query : queries) { @@ -98,6 +116,14 @@ void RedisConnection::FireAndForgetQueries(RedisConnection::Queries queries, Red }); } +/** + * Queue a Redis query for sending, wait for the response and return (or throw) it + * + * @param query Redis query + * @param priority The query's priority + * + * @return The response + */ RedisConnection::Reply RedisConnection::GetResultOfQuery(RedisConnection::Query query, RedisConnection::QueryPriority priority) { { @@ -119,6 +145,14 @@ RedisConnection::Reply RedisConnection::GetResultOfQuery(RedisConnection::Query return future.get(); } +/** + * Queue Redis queries for sending, wait for the responses and return (or throw) them + * + * @param queries Redis queries + * @param priority The queries' priority + * + * @return The responses + */ RedisConnection::Replies RedisConnection::GetResultsOfQueries(RedisConnection::Queries queries, RedisConnection::QueryPriority priority) { for (auto& query : queries) { @@ -140,6 +174,9 @@ RedisConnection::Replies RedisConnection::GetResultsOfQueries(RedisConnection::Q return future.get(); } +/** + * Try to connect to Redis + */ void RedisConnection::Connect(asio::yield_context& yc) { Defer notConnecting ([this]() { m_Connecting.store(m_Connected.load()); }); @@ -182,6 +219,9 @@ void RedisConnection::Connect(asio::yield_context& yc) } +/** + * Actually receive the responses to the Redis queries send by WriteItem() and handle them + */ void RedisConnection::ReadLoop(asio::yield_context& yc) { for (;;) { @@ -262,6 +302,9 @@ void RedisConnection::ReadLoop(asio::yield_context& yc) } } +/** + * Actually send the Redis queries queued by {FireAndForget,GetResultsOf}{Query,Queries}() + */ void RedisConnection::WriteLoop(asio::yield_context& yc) { for (;;) { @@ -285,6 +328,11 @@ void RedisConnection::WriteLoop(asio::yield_context& yc) } } +/** + * Send next and schedule receiving the response + * + * @param next Redis queries + */ void RedisConnection::WriteItem(boost::asio::yield_context& yc, RedisConnection::WriteQueueItem next) { if (next.FireAndForgetQuery) { @@ -397,6 +445,11 @@ void RedisConnection::WriteItem(boost::asio::yield_context& yc, RedisConnection: } } +/** + * Receive the response to a Redis query + * + * @return The response + */ RedisConnection::Reply RedisConnection::ReadOne(boost::asio::yield_context& yc) { if (m_Path.IsEmpty()) { @@ -406,6 +459,11 @@ RedisConnection::Reply RedisConnection::ReadOne(boost::asio::yield_context& yc) } } +/** + * Send query + * + * @param query Redis query + */ void RedisConnection::WriteOne(RedisConnection::Query& query, asio::yield_context& yc) { if (m_Path.IsEmpty()) { diff --git a/lib/icingadb/redisconnection.hpp b/lib/icingadb/redisconnection.hpp index a9767d408..60a6283d5 100644 --- a/lib/icingadb/redisconnection.hpp +++ b/lib/icingadb/redisconnection.hpp @@ -51,6 +51,11 @@ namespace icinga typedef Value Reply; typedef std::vector Replies; + /** + * Redis query priorities, highest first. + * + * @ingroup icingadb + */ enum class QueryPriority : unsigned char { Heartbeat, @@ -74,17 +79,34 @@ namespace icinga Replies GetResultsOfQueries(Queries queries, QueryPriority priority); private: + /** + * What to do with the responses to Redis queries. + * + * @ingroup icingadb + */ enum class ResponseAction : unsigned char { - Ignore, Deliver, DeliverBulk + Ignore, // discard + Deliver, // submit to the requestor + DeliverBulk // submit multiple responses to the requestor at once }; + /** + * What to do with how many responses to Redis queries. + * + * @ingroup icingadb + */ struct FutureResponseAction { size_t Amount; ResponseAction Action; }; + /** + * Something to be send to Redis. + * + * @ingroup icingadb + */ struct WriteQueueItem { std::shared_ptr FireAndForgetQuery; @@ -138,15 +160,25 @@ namespace icinga Atomic m_Connecting, m_Connected, m_Started; struct { + // Items to be send to Redis std::map> Writes; + // Requestors, each waiting for a single response std::queue> ReplyPromises; + // Requestors, each waiting for multiple responses at once std::queue> RepliesPromises; + // Metadata about all of the above std::queue FutureResponseActions; } m_Queues; + // Indicate that there's something to send/receive AsioConditionVariable m_QueuedWrites, m_QueuedReads; }; +/** + * An error response from the Redis server. + * + * @ingroup icingadb + */ class RedisError final : public Object { public: @@ -165,6 +197,11 @@ private: String m_Message; }; +/** + * Thrown if the connection to the Redis server has already been lost. + * + * @ingroup icingadb + */ class RedisDisconnected : public std::runtime_error { public: @@ -173,6 +210,11 @@ public: } }; +/** + * Thrown on malformed Redis server responses. + * + * @ingroup icingadb + */ class RedisProtocolError : public std::runtime_error { protected: @@ -181,6 +223,11 @@ protected: } }; +/** + * Thrown on malformed types in Redis server responses. + * + * @ingroup icingadb + */ class BadRedisType : public RedisProtocolError { public: @@ -197,6 +244,11 @@ private: char m_What[2]; }; +/** + * Thrown on malformed ints in Redis server responses. + * + * @ingroup icingadb + */ class BadRedisInt : public RedisProtocolError { public: @@ -214,6 +266,13 @@ private: std::vector m_What; }; +/** + * Read a Redis server response from stream + * + * @param stream Redis server connection + * + * @return The response + */ template RedisConnection::Reply RedisConnection::ReadOne(StreamPtr& stream, boost::asio::yield_context& yc) { @@ -245,6 +304,12 @@ RedisConnection::Reply RedisConnection::ReadOne(StreamPtr& stream, boost::asio:: } } +/** + * Write a Redis query to stream + * + * @param stream Redis server connection + * @param query Redis query + */ template void RedisConnection::WriteOne(StreamPtr& stream, RedisConnection::Query& query, boost::asio::yield_context& yc) { @@ -277,6 +342,13 @@ void RedisConnection::WriteOne(StreamPtr& stream, RedisConnection::Query& query, } } +/** + * Read a Redis protocol value from stream + * + * @param stream Redis server connection + * + * @return The value + */ template Value RedisConnection::ReadRESP(AsyncReadStream& stream, boost::asio::yield_context& yc) { @@ -365,6 +437,14 @@ Value RedisConnection::ReadRESP(AsyncReadStream& stream, boost::asio::yield_cont } } +/** + * Read from stream until \r\n + * + * @param stream Redis server connection + * @param hint Expected amount of data + * + * @return Read data ex. \r\n + */ template std::vector RedisConnection::ReadLine(AsyncReadStream& stream, boost::asio::yield_context& yc, size_t hint) { @@ -388,6 +468,12 @@ std::vector RedisConnection::ReadLine(AsyncReadStream& stream, boost::asio } } +/** + * Write a Redis protocol value to stream + * + * @param stream Redis server connection + * @param query Redis protocol value + */ template void RedisConnection::WriteRESP(AsyncWriteStream& stream, const Query& query, boost::asio::yield_context& yc) { @@ -408,6 +494,12 @@ void RedisConnection::WriteRESP(AsyncWriteStream& stream, const Query& query, bo } } +/** + * Write a Redis protocol int to stream + * + * @param stream Redis server connection + * @param i Redis protocol int + */ template void RedisConnection::WriteInt(AsyncWriteStream& stream, intmax_t i, boost::asio::yield_context& yc) {