Document RedisConnection

This commit is contained in:
Alexander A. Klimov 2019-12-17 11:44:00 +01:00
parent cd44c5371a
commit b1cc0cd767
2 changed files with 151 additions and 1 deletions

View File

@ -53,6 +53,12 @@ bool RedisConnection::IsConnected() {
return m_Connected.load();
}
/**
* Append a Redis query to a log message
*
* @param query Redis query
* @param msg Log message
*/
static inline
void LogQuery(RedisConnection::Query& query, Log& msg)
{
@ -68,6 +74,12 @@ void LogQuery(RedisConnection::Query& query, Log& msg)
}
}
/**
* Queue a Redis query for sending
*
* @param query Redis query
* @param priority The query's priority
*/
void RedisConnection::FireAndForgetQuery(RedisConnection::Query query, RedisConnection::QueryPriority priority)
{
{
@ -83,6 +95,12 @@ void RedisConnection::FireAndForgetQuery(RedisConnection::Query query, RedisConn
});
}
/**
* Queue Redis queries for sending
*
* @param queries Redis queries
* @param priority The queries' priority
*/
void RedisConnection::FireAndForgetQueries(RedisConnection::Queries queries, RedisConnection::QueryPriority priority)
{
for (auto& query : queries) {
@ -98,6 +116,14 @@ void RedisConnection::FireAndForgetQueries(RedisConnection::Queries queries, Red
});
}
/**
* Queue a Redis query for sending, wait for the response and return (or throw) it
*
* @param query Redis query
* @param priority The query's priority
*
* @return The response
*/
RedisConnection::Reply RedisConnection::GetResultOfQuery(RedisConnection::Query query, RedisConnection::QueryPriority priority)
{
{
@ -119,6 +145,14 @@ RedisConnection::Reply RedisConnection::GetResultOfQuery(RedisConnection::Query
return future.get();
}
/**
* Queue Redis queries for sending, wait for the responses and return (or throw) them
*
* @param queries Redis queries
* @param priority The queries' priority
*
* @return The responses
*/
RedisConnection::Replies RedisConnection::GetResultsOfQueries(RedisConnection::Queries queries, RedisConnection::QueryPriority priority)
{
for (auto& query : queries) {
@ -140,6 +174,9 @@ RedisConnection::Replies RedisConnection::GetResultsOfQueries(RedisConnection::Q
return future.get();
}
/**
* Try to connect to Redis
*/
void RedisConnection::Connect(asio::yield_context& yc)
{
Defer notConnecting ([this]() { m_Connecting.store(m_Connected.load()); });
@ -182,6 +219,9 @@ void RedisConnection::Connect(asio::yield_context& yc)
}
/**
* Actually receive the responses to the Redis queries send by WriteItem() and handle them
*/
void RedisConnection::ReadLoop(asio::yield_context& yc)
{
for (;;) {
@ -262,6 +302,9 @@ void RedisConnection::ReadLoop(asio::yield_context& yc)
}
}
/**
* Actually send the Redis queries queued by {FireAndForget,GetResultsOf}{Query,Queries}()
*/
void RedisConnection::WriteLoop(asio::yield_context& yc)
{
for (;;) {
@ -285,6 +328,11 @@ void RedisConnection::WriteLoop(asio::yield_context& yc)
}
}
/**
* Send next and schedule receiving the response
*
* @param next Redis queries
*/
void RedisConnection::WriteItem(boost::asio::yield_context& yc, RedisConnection::WriteQueueItem next)
{
if (next.FireAndForgetQuery) {
@ -397,6 +445,11 @@ void RedisConnection::WriteItem(boost::asio::yield_context& yc, RedisConnection:
}
}
/**
* Receive the response to a Redis query
*
* @return The response
*/
RedisConnection::Reply RedisConnection::ReadOne(boost::asio::yield_context& yc)
{
if (m_Path.IsEmpty()) {
@ -406,6 +459,11 @@ RedisConnection::Reply RedisConnection::ReadOne(boost::asio::yield_context& yc)
}
}
/**
* Send query
*
* @param query Redis query
*/
void RedisConnection::WriteOne(RedisConnection::Query& query, asio::yield_context& yc)
{
if (m_Path.IsEmpty()) {

View File

@ -51,6 +51,11 @@ namespace icinga
typedef Value Reply;
typedef std::vector<Reply> Replies;
/**
* Redis query priorities, highest first.
*
* @ingroup icingadb
*/
enum class QueryPriority : unsigned char
{
Heartbeat,
@ -74,17 +79,34 @@ namespace icinga
Replies GetResultsOfQueries(Queries queries, QueryPriority priority);
private:
/**
* What to do with the responses to Redis queries.
*
* @ingroup icingadb
*/
enum class ResponseAction : unsigned char
{
Ignore, Deliver, DeliverBulk
Ignore, // discard
Deliver, // submit to the requestor
DeliverBulk // submit multiple responses to the requestor at once
};
/**
* What to do with how many responses to Redis queries.
*
* @ingroup icingadb
*/
struct FutureResponseAction
{
size_t Amount;
ResponseAction Action;
};
/**
* Something to be send to Redis.
*
* @ingroup icingadb
*/
struct WriteQueueItem
{
std::shared_ptr<Query> FireAndForgetQuery;
@ -138,15 +160,25 @@ namespace icinga
Atomic<bool> m_Connecting, m_Connected, m_Started;
struct {
// Items to be send to Redis
std::map<QueryPriority, std::queue<WriteQueueItem>> Writes;
// Requestors, each waiting for a single response
std::queue<std::promise<Reply>> ReplyPromises;
// Requestors, each waiting for multiple responses at once
std::queue<std::promise<Replies>> RepliesPromises;
// Metadata about all of the above
std::queue<FutureResponseAction> FutureResponseActions;
} m_Queues;
// Indicate that there's something to send/receive
AsioConditionVariable m_QueuedWrites, m_QueuedReads;
};
/**
* An error response from the Redis server.
*
* @ingroup icingadb
*/
class RedisError final : public Object
{
public:
@ -165,6 +197,11 @@ private:
String m_Message;
};
/**
* Thrown if the connection to the Redis server has already been lost.
*
* @ingroup icingadb
*/
class RedisDisconnected : public std::runtime_error
{
public:
@ -173,6 +210,11 @@ public:
}
};
/**
* Thrown on malformed Redis server responses.
*
* @ingroup icingadb
*/
class RedisProtocolError : public std::runtime_error
{
protected:
@ -181,6 +223,11 @@ protected:
}
};
/**
* Thrown on malformed types in Redis server responses.
*
* @ingroup icingadb
*/
class BadRedisType : public RedisProtocolError
{
public:
@ -197,6 +244,11 @@ private:
char m_What[2];
};
/**
* Thrown on malformed ints in Redis server responses.
*
* @ingroup icingadb
*/
class BadRedisInt : public RedisProtocolError
{
public:
@ -214,6 +266,13 @@ private:
std::vector<char> m_What;
};
/**
* Read a Redis server response from stream
*
* @param stream Redis server connection
*
* @return The response
*/
template<class StreamPtr>
RedisConnection::Reply RedisConnection::ReadOne(StreamPtr& stream, boost::asio::yield_context& yc)
{
@ -245,6 +304,12 @@ RedisConnection::Reply RedisConnection::ReadOne(StreamPtr& stream, boost::asio::
}
}
/**
* Write a Redis query to stream
*
* @param stream Redis server connection
* @param query Redis query
*/
template<class StreamPtr>
void RedisConnection::WriteOne(StreamPtr& stream, RedisConnection::Query& query, boost::asio::yield_context& yc)
{
@ -277,6 +342,13 @@ void RedisConnection::WriteOne(StreamPtr& stream, RedisConnection::Query& query,
}
}
/**
* Read a Redis protocol value from stream
*
* @param stream Redis server connection
*
* @return The value
*/
template<class AsyncReadStream>
Value RedisConnection::ReadRESP(AsyncReadStream& stream, boost::asio::yield_context& yc)
{
@ -365,6 +437,14 @@ Value RedisConnection::ReadRESP(AsyncReadStream& stream, boost::asio::yield_cont
}
}
/**
* Read from stream until \r\n
*
* @param stream Redis server connection
* @param hint Expected amount of data
*
* @return Read data ex. \r\n
*/
template<class AsyncReadStream>
std::vector<char> RedisConnection::ReadLine(AsyncReadStream& stream, boost::asio::yield_context& yc, size_t hint)
{
@ -388,6 +468,12 @@ std::vector<char> RedisConnection::ReadLine(AsyncReadStream& stream, boost::asio
}
}
/**
* Write a Redis protocol value to stream
*
* @param stream Redis server connection
* @param query Redis protocol value
*/
template<class AsyncWriteStream>
void RedisConnection::WriteRESP(AsyncWriteStream& stream, const Query& query, boost::asio::yield_context& yc)
{
@ -408,6 +494,12 @@ void RedisConnection::WriteRESP(AsyncWriteStream& stream, const Query& query, bo
}
}
/**
* Write a Redis protocol int to stream
*
* @param stream Redis server connection
* @param i Redis protocol int
*/
template<class AsyncWriteStream>
void RedisConnection::WriteInt(AsyncWriteStream& stream, intmax_t i, boost::asio::yield_context& yc)
{