Merge pull request #7715 from Icinga/feature/redisconnection-docs

Document RedisConnection
This commit is contained in:
Michael Friedrich 2019-12-18 13:31:20 +01:00 committed by GitHub
commit b7cde77444
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 151 additions and 1 deletions

View File

@ -53,6 +53,12 @@ bool RedisConnection::IsConnected() {
return m_Connected.load(); return m_Connected.load();
} }
/**
* Append a Redis query to a log message
*
* @param query Redis query
* @param msg Log message
*/
static inline static inline
void LogQuery(RedisConnection::Query& query, Log& msg) void LogQuery(RedisConnection::Query& query, Log& msg)
{ {
@ -68,6 +74,12 @@ void LogQuery(RedisConnection::Query& query, Log& msg)
} }
} }
/**
* Queue a Redis query for sending
*
* @param query Redis query
* @param priority The query's priority
*/
void RedisConnection::FireAndForgetQuery(RedisConnection::Query query, RedisConnection::QueryPriority priority) void RedisConnection::FireAndForgetQuery(RedisConnection::Query query, RedisConnection::QueryPriority priority)
{ {
{ {
@ -83,6 +95,12 @@ void RedisConnection::FireAndForgetQuery(RedisConnection::Query query, RedisConn
}); });
} }
/**
* Queue Redis queries for sending
*
* @param queries Redis queries
* @param priority The queries' priority
*/
void RedisConnection::FireAndForgetQueries(RedisConnection::Queries queries, RedisConnection::QueryPriority priority) void RedisConnection::FireAndForgetQueries(RedisConnection::Queries queries, RedisConnection::QueryPriority priority)
{ {
for (auto& query : queries) { for (auto& query : queries) {
@ -98,6 +116,14 @@ void RedisConnection::FireAndForgetQueries(RedisConnection::Queries queries, Red
}); });
} }
/**
* Queue a Redis query for sending, wait for the response and return (or throw) it
*
* @param query Redis query
* @param priority The query's priority
*
* @return The response
*/
RedisConnection::Reply RedisConnection::GetResultOfQuery(RedisConnection::Query query, RedisConnection::QueryPriority priority) RedisConnection::Reply RedisConnection::GetResultOfQuery(RedisConnection::Query query, RedisConnection::QueryPriority priority)
{ {
{ {
@ -119,6 +145,14 @@ RedisConnection::Reply RedisConnection::GetResultOfQuery(RedisConnection::Query
return future.get(); return future.get();
} }
/**
* Queue Redis queries for sending, wait for the responses and return (or throw) them
*
* @param queries Redis queries
* @param priority The queries' priority
*
* @return The responses
*/
RedisConnection::Replies RedisConnection::GetResultsOfQueries(RedisConnection::Queries queries, RedisConnection::QueryPriority priority) RedisConnection::Replies RedisConnection::GetResultsOfQueries(RedisConnection::Queries queries, RedisConnection::QueryPriority priority)
{ {
for (auto& query : queries) { for (auto& query : queries) {
@ -140,6 +174,9 @@ RedisConnection::Replies RedisConnection::GetResultsOfQueries(RedisConnection::Q
return future.get(); return future.get();
} }
/**
* Try to connect to Redis
*/
void RedisConnection::Connect(asio::yield_context& yc) void RedisConnection::Connect(asio::yield_context& yc)
{ {
Defer notConnecting ([this]() { m_Connecting.store(m_Connected.load()); }); Defer notConnecting ([this]() { m_Connecting.store(m_Connected.load()); });
@ -182,6 +219,9 @@ void RedisConnection::Connect(asio::yield_context& yc)
} }
/**
* Actually receive the responses to the Redis queries send by WriteItem() and handle them
*/
void RedisConnection::ReadLoop(asio::yield_context& yc) void RedisConnection::ReadLoop(asio::yield_context& yc)
{ {
for (;;) { for (;;) {
@ -262,6 +302,9 @@ void RedisConnection::ReadLoop(asio::yield_context& yc)
} }
} }
/**
* Actually send the Redis queries queued by {FireAndForget,GetResultsOf}{Query,Queries}()
*/
void RedisConnection::WriteLoop(asio::yield_context& yc) void RedisConnection::WriteLoop(asio::yield_context& yc)
{ {
for (;;) { for (;;) {
@ -285,6 +328,11 @@ void RedisConnection::WriteLoop(asio::yield_context& yc)
} }
} }
/**
* Send next and schedule receiving the response
*
* @param next Redis queries
*/
void RedisConnection::WriteItem(boost::asio::yield_context& yc, RedisConnection::WriteQueueItem next) void RedisConnection::WriteItem(boost::asio::yield_context& yc, RedisConnection::WriteQueueItem next)
{ {
if (next.FireAndForgetQuery) { if (next.FireAndForgetQuery) {
@ -397,6 +445,11 @@ void RedisConnection::WriteItem(boost::asio::yield_context& yc, RedisConnection:
} }
} }
/**
* Receive the response to a Redis query
*
* @return The response
*/
RedisConnection::Reply RedisConnection::ReadOne(boost::asio::yield_context& yc) RedisConnection::Reply RedisConnection::ReadOne(boost::asio::yield_context& yc)
{ {
if (m_Path.IsEmpty()) { if (m_Path.IsEmpty()) {
@ -406,6 +459,11 @@ RedisConnection::Reply RedisConnection::ReadOne(boost::asio::yield_context& yc)
} }
} }
/**
* Send query
*
* @param query Redis query
*/
void RedisConnection::WriteOne(RedisConnection::Query& query, asio::yield_context& yc) void RedisConnection::WriteOne(RedisConnection::Query& query, asio::yield_context& yc)
{ {
if (m_Path.IsEmpty()) { if (m_Path.IsEmpty()) {

View File

@ -51,6 +51,11 @@ namespace icinga
typedef Value Reply; typedef Value Reply;
typedef std::vector<Reply> Replies; typedef std::vector<Reply> Replies;
/**
* Redis query priorities, highest first.
*
* @ingroup icingadb
*/
enum class QueryPriority : unsigned char enum class QueryPriority : unsigned char
{ {
Heartbeat, Heartbeat,
@ -74,17 +79,34 @@ namespace icinga
Replies GetResultsOfQueries(Queries queries, QueryPriority priority); Replies GetResultsOfQueries(Queries queries, QueryPriority priority);
private: private:
/**
* What to do with the responses to Redis queries.
*
* @ingroup icingadb
*/
enum class ResponseAction : unsigned char enum class ResponseAction : unsigned char
{ {
Ignore, Deliver, DeliverBulk Ignore, // discard
Deliver, // submit to the requestor
DeliverBulk // submit multiple responses to the requestor at once
}; };
/**
* What to do with how many responses to Redis queries.
*
* @ingroup icingadb
*/
struct FutureResponseAction struct FutureResponseAction
{ {
size_t Amount; size_t Amount;
ResponseAction Action; ResponseAction Action;
}; };
/**
* Something to be send to Redis.
*
* @ingroup icingadb
*/
struct WriteQueueItem struct WriteQueueItem
{ {
std::shared_ptr<Query> FireAndForgetQuery; std::shared_ptr<Query> FireAndForgetQuery;
@ -138,15 +160,25 @@ namespace icinga
Atomic<bool> m_Connecting, m_Connected, m_Started; Atomic<bool> m_Connecting, m_Connected, m_Started;
struct { struct {
// Items to be send to Redis
std::map<QueryPriority, std::queue<WriteQueueItem>> Writes; std::map<QueryPriority, std::queue<WriteQueueItem>> Writes;
// Requestors, each waiting for a single response
std::queue<std::promise<Reply>> ReplyPromises; std::queue<std::promise<Reply>> ReplyPromises;
// Requestors, each waiting for multiple responses at once
std::queue<std::promise<Replies>> RepliesPromises; std::queue<std::promise<Replies>> RepliesPromises;
// Metadata about all of the above
std::queue<FutureResponseAction> FutureResponseActions; std::queue<FutureResponseAction> FutureResponseActions;
} m_Queues; } m_Queues;
// Indicate that there's something to send/receive
AsioConditionVariable m_QueuedWrites, m_QueuedReads; AsioConditionVariable m_QueuedWrites, m_QueuedReads;
}; };
/**
* An error response from the Redis server.
*
* @ingroup icingadb
*/
class RedisError final : public Object class RedisError final : public Object
{ {
public: public:
@ -165,6 +197,11 @@ private:
String m_Message; String m_Message;
}; };
/**
* Thrown if the connection to the Redis server has already been lost.
*
* @ingroup icingadb
*/
class RedisDisconnected : public std::runtime_error class RedisDisconnected : public std::runtime_error
{ {
public: public:
@ -173,6 +210,11 @@ public:
} }
}; };
/**
* Thrown on malformed Redis server responses.
*
* @ingroup icingadb
*/
class RedisProtocolError : public std::runtime_error class RedisProtocolError : public std::runtime_error
{ {
protected: protected:
@ -181,6 +223,11 @@ protected:
} }
}; };
/**
* Thrown on malformed types in Redis server responses.
*
* @ingroup icingadb
*/
class BadRedisType : public RedisProtocolError class BadRedisType : public RedisProtocolError
{ {
public: public:
@ -197,6 +244,11 @@ private:
char m_What[2]; char m_What[2];
}; };
/**
* Thrown on malformed ints in Redis server responses.
*
* @ingroup icingadb
*/
class BadRedisInt : public RedisProtocolError class BadRedisInt : public RedisProtocolError
{ {
public: public:
@ -214,6 +266,13 @@ private:
std::vector<char> m_What; std::vector<char> m_What;
}; };
/**
* Read a Redis server response from stream
*
* @param stream Redis server connection
*
* @return The response
*/
template<class StreamPtr> template<class StreamPtr>
RedisConnection::Reply RedisConnection::ReadOne(StreamPtr& stream, boost::asio::yield_context& yc) RedisConnection::Reply RedisConnection::ReadOne(StreamPtr& stream, boost::asio::yield_context& yc)
{ {
@ -245,6 +304,12 @@ RedisConnection::Reply RedisConnection::ReadOne(StreamPtr& stream, boost::asio::
} }
} }
/**
* Write a Redis query to stream
*
* @param stream Redis server connection
* @param query Redis query
*/
template<class StreamPtr> template<class StreamPtr>
void RedisConnection::WriteOne(StreamPtr& stream, RedisConnection::Query& query, boost::asio::yield_context& yc) void RedisConnection::WriteOne(StreamPtr& stream, RedisConnection::Query& query, boost::asio::yield_context& yc)
{ {
@ -277,6 +342,13 @@ void RedisConnection::WriteOne(StreamPtr& stream, RedisConnection::Query& query,
} }
} }
/**
* Read a Redis protocol value from stream
*
* @param stream Redis server connection
*
* @return The value
*/
template<class AsyncReadStream> template<class AsyncReadStream>
Value RedisConnection::ReadRESP(AsyncReadStream& stream, boost::asio::yield_context& yc) Value RedisConnection::ReadRESP(AsyncReadStream& stream, boost::asio::yield_context& yc)
{ {
@ -365,6 +437,14 @@ Value RedisConnection::ReadRESP(AsyncReadStream& stream, boost::asio::yield_cont
} }
} }
/**
* Read from stream until \r\n
*
* @param stream Redis server connection
* @param hint Expected amount of data
*
* @return Read data ex. \r\n
*/
template<class AsyncReadStream> template<class AsyncReadStream>
std::vector<char> RedisConnection::ReadLine(AsyncReadStream& stream, boost::asio::yield_context& yc, size_t hint) std::vector<char> RedisConnection::ReadLine(AsyncReadStream& stream, boost::asio::yield_context& yc, size_t hint)
{ {
@ -388,6 +468,12 @@ std::vector<char> RedisConnection::ReadLine(AsyncReadStream& stream, boost::asio
} }
} }
/**
* Write a Redis protocol value to stream
*
* @param stream Redis server connection
* @param query Redis protocol value
*/
template<class AsyncWriteStream> template<class AsyncWriteStream>
void RedisConnection::WriteRESP(AsyncWriteStream& stream, const Query& query, boost::asio::yield_context& yc) void RedisConnection::WriteRESP(AsyncWriteStream& stream, const Query& query, boost::asio::yield_context& yc)
{ {
@ -408,6 +494,12 @@ void RedisConnection::WriteRESP(AsyncWriteStream& stream, const Query& query, bo
} }
} }
/**
* Write a Redis protocol int to stream
*
* @param stream Redis server connection
* @param i Redis protocol int
*/
template<class AsyncWriteStream> template<class AsyncWriteStream>
void RedisConnection::WriteInt(AsyncWriteStream& stream, intmax_t i, boost::asio::yield_context& yc) void RedisConnection::WriteInt(AsyncWriteStream& stream, intmax_t i, boost::asio::yield_context& yc)
{ {