RedisConnection: I/O the Redis protocol by itself (PoC)

This commit is contained in:
Alexander A. Klimov 2019-08-05 13:30:09 +02:00 committed by Michael Friedrich
parent 752c5998df
commit a5971df039
5 changed files with 549 additions and 310 deletions

View File

@ -17,244 +17,283 @@
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. * * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. *
******************************************************************************/ ******************************************************************************/
#include "base/object.hpp"
#include "redis/redisconnection.hpp" #include "redis/redisconnection.hpp"
#include "base/workqueue.hpp" #include "base/array.hpp"
#include "base/logger.hpp"
#include "base/convert.hpp" #include "base/convert.hpp"
#include "base/utility.hpp" #include "base/defer.hpp"
#include "redis/rediswriter.hpp" #include "base/io-engine.hpp"
#include "hiredis/hiredis.h" #include "base/logger.hpp"
#include "base/objectlock.hpp"
#include "base/string.hpp"
#include "base/tcpsocket.hpp"
#include <boost/asio/post.hpp>
#include <boost/asio/spawn.hpp>
#include <boost/coroutine/exceptions.hpp>
#include <boost/utility/string_view.hpp>
#include <boost/variant/get.hpp>
#include <iterator>
#include <memory> #include <memory>
#include <utility> #include <utility>
using namespace icinga; using namespace icinga;
namespace asio = boost::asio;
RedisConnection::RedisConnection(const String host, const int port, const String path, const String password, const int db) : RedisConnection::RedisConnection(const String host, const int port, const String path, const String password, const int db) :
m_Host(host), m_Port(port), m_Path(path), m_Password(password), m_DbIndex(db), m_Context(NULL), m_Connected(false) RedisConnection(IoEngine::Get().GetIoService(), host, port, path, password, db)
{ {
m_RedisConnectionWorkQueue.SetName("RedisConnection");
} }
void RedisConnection::StaticInitialize() RedisConnection::RedisConnection(boost::asio::io_context& io, String host, int port, String path, String password, int db)
: m_Host(std::move(host)), m_Port(port), m_Path(std::move(path)), m_Password(std::move(password)), m_DbIndex(db),
m_Connecting(false), m_Connected(false), m_Strand(io), m_QueuedWrites(io), m_QueuedReads(io)
{ {
} }
void RedisConnection::Start() void RedisConnection::Start()
{ {
RedisConnection::Connect(); if (!m_Connecting.exchange(true)) {
Ptr keepAlive (this);
std::thread thread(&RedisConnection::HandleRW, this); asio::spawn(m_Strand, [this, keepAlive](asio::yield_context yc) { Connect(yc); });
thread.detach();
}
void RedisConnection::AssertOnWorkQueue()
{
ASSERT(m_RedisConnectionWorkQueue.IsWorkerThread());
}
void RedisConnection::HandleRW()
{
Utility::SetThreadName("RedisConnection Handler");
for (;;) {
try {
{
boost::mutex::scoped_lock lock(m_CMutex);
if (!m_Connected)
return;
redisAsyncHandleWrite(m_Context);
redisAsyncHandleRead(m_Context);
}
Utility::Sleep(0.1);
} catch (const std::exception&) {
Log(LogCritical, "RedisWriter", "Internal Redis Error");
}
} }
} }
void RedisConnection::RedisInitialCallback(redisAsyncContext *c, void *r, void *p)
{
auto state = (ConnectionState *) p;
if (state->state != Starting && !r) {
Log(LogCritical, "RedisConnection")
<< "No answer from Redis during initial connection, is the Redis server running?";
return;
} else if (r != nullptr) {
redisReply *rep = (redisReply *) r;
if (rep->type == REDIS_REPLY_ERROR) {
Log(LogCritical, "RedisConnection")
<< "Failed to connect to Redis: " << rep->str;
state->conn->m_Connected = false;
return;
}
}
if (state->state == Starting) {
state->state = Auth;
if (!state->conn->m_Password.IsEmpty()) {
boost::mutex::scoped_lock lock(state->conn->m_CMutex);
redisAsyncCommand(c, &RedisInitialCallback, p, "AUTH %s", state->conn->m_Password.CStr());
return;
}
}
if (state->state == Auth)
{
state->state = DBSelect;
if (state->conn->m_DbIndex != 0) {
boost::mutex::scoped_lock lock(state->conn->m_CMutex);
redisAsyncCommand(c, &RedisInitialCallback, p, "SELECT %d", state->conn->m_DbIndex);
return;
}
}
if (state->state == DBSelect)
state->conn->m_Connected = true;
}
bool RedisConnection::IsConnected() { bool RedisConnection::IsConnected() {
return m_Connected; return m_Connected.load();
} }
void RedisConnection::FireAndForgetQuery(RedisConnection::Query query)
void RedisConnection::Connect()
{ {
if (m_Connected) auto item (std::make_shared<decltype(m_Queues.FireAndForgetQuery)::value_type>(std::move(query)));
return;
Log(LogInformation, "RedisWriter", "Trying to connect to redis server Async"); asio::post(m_Strand, [this, item]() {
{ m_Queues.FireAndForgetQuery.emplace(std::move(*item));
boost::mutex::scoped_lock lock(m_CMutex); m_QueuedWrites.Set();
});
if (m_Path.IsEmpty())
m_Context = redisAsyncConnect(m_Host.CStr(), m_Port);
else
m_Context = redisAsyncConnectUnix(m_Path.CStr());
m_Context->data = (void*) this;
redisAsyncSetConnectCallback(m_Context, &ConnectCallback);
redisAsyncSetDisconnectCallback(m_Context, &DisconnectCallback);
}
m_State = ConnectionState{Starting, this};
RedisInitialCallback(m_Context, nullptr, (void*)&m_State);
} }
void RedisConnection::Disconnect() void RedisConnection::FireAndForgetQueries(RedisConnection::Queries queries)
{ {
redisAsyncDisconnect(m_Context); auto item (std::make_shared<decltype(m_Queues.FireAndForgetQueries)::value_type>(std::move(queries)));
asio::post(m_Strand, [this, item]() {
m_Queues.FireAndForgetQueries.emplace(std::move(*item));
m_QueuedWrites.Set();
});
} }
void RedisConnection::ConnectCallback(const redisAsyncContext *c, int status) RedisConnection::Reply RedisConnection::GetResultOfQuery(RedisConnection::Query query)
{ {
auto *rc = (RedisConnection* ) const_cast<redisAsyncContext*>(c)->data; std::promise<Reply> promise;
if (status != REDIS_OK) { auto future (promise.get_future());
if (c->err != 0) { auto item (std::make_shared<decltype(m_Queues.GetResultOfQuery)::value_type>(std::move(query), std::move(promise)));
Log(LogCritical, "RedisConnection")
<< "Redis connection failure: " << c->errstr; asio::post(m_Strand, [this, item]() {
m_Queues.GetResultOfQuery.emplace(std::move(*item));
m_QueuedWrites.Set();
});
item = nullptr;
future.wait();
return future.get();
}
RedisConnection::Replies RedisConnection::GetResultsOfQueries(RedisConnection::Queries queries)
{
std::promise<Replies> promise;
auto future (promise.get_future());
auto item (std::make_shared<decltype(m_Queues.GetResultsOfQueries)::value_type>(std::move(queries), std::move(promise)));
asio::post(m_Strand, [this, item]() {
m_Queues.GetResultsOfQueries.emplace(std::move(*item));
m_QueuedWrites.Set();
});
item = nullptr;
future.wait();
return future.get();
}
void RedisConnection::Connect(asio::yield_context& yc)
{
Defer notConnecting ([this]() {
if (!m_Connected.load()) {
m_Connecting.store(false);
}
});
Log(LogInformation, "RedisWriter", "Trying to connect to Redis server (async)");
try {
if (m_Path.IsEmpty()) {
m_TcpConn = decltype(m_TcpConn)(new TcpConn(m_Strand.context()));
icinga::Connect(m_TcpConn->next_layer(), m_Host, Convert::ToString(m_Port), yc);
} else { } else {
Log(LogCritical, "RedisConnection") m_UnixConn = decltype(m_UnixConn)(new UnixConn(m_Strand.context()));
<< "Redis connection failure"; m_UnixConn->next_layer().async_connect(Unix::endpoint(m_Path.CStr()), yc);
} }
rc->m_Connected = false;
} else {
Log(LogInformation, "RedisConnection")
<< "Redis Connection: O N L I N E";
}
}
// It's unfortunate we can not pass any user data here. All we get to do is log a message and hope for the best {
void RedisConnection::DisconnectCallback(const redisAsyncContext *c, int status) Ptr keepAlive (this);
{
auto *rc = (RedisConnection* ) const_cast<redisAsyncContext*>(c)->data;
boost::mutex::scoped_lock lock(rc->m_CMutex);
if (status == REDIS_OK)
Log(LogInformation, "RedisConnection") << "Redis disconnected by us";
else {
if (c->err != 0)
Log(LogCritical, "RedisConnection") << "Redis disconnected by server. Reason: " << c->errstr;
else
Log(LogCritical, "RedisConnection") << "Redis disconnected by server";
}
rc->m_Connected = false; asio::spawn(m_Strand, [this, keepAlive](asio::yield_context yc) { ReadLoop(yc); });
} asio::spawn(m_Strand, [this, keepAlive](asio::yield_context yc) { WriteLoop(yc); });
void RedisConnection::ExecuteQuery(std::vector<String> query, redisCallbackFn *fn, void *privdata)
{
auto queryPtr (std::make_shared<decltype(query)>(std::move(query)));
m_RedisConnectionWorkQueue.Enqueue([this, queryPtr, fn, privdata]() {
SendMessageInternal(*queryPtr, fn, privdata);
});
}
void
RedisConnection::ExecuteQueries(std::vector<std::vector<String>> queries, redisCallbackFn *fn, void *privdata)
{
auto queriesPtr (std::make_shared<decltype(queries)>(std::move(queries)));
m_RedisConnectionWorkQueue.Enqueue([this, queriesPtr, fn, privdata]() {
SendMessagesInternal(*queriesPtr, fn, privdata);
});
}
void RedisConnection::SendMessageInternal(const std::vector<String>& query, redisCallbackFn *fn, void *privdata)
{
AssertOnWorkQueue();
{
boost::mutex::scoped_lock lock(m_CMutex);
if (!m_Context || !m_Connected) {
Log(LogCritical, "RedisWriter")
<< "Not connected to Redis";
return;
} }
}
const char **argv; m_Connected.store(true);
size_t *argvlen; } catch (const boost::coroutines::detail::forced_unwind&) {
throw;
argv = new const char *[query.size()]; } catch (const std::exception& ex) {
argvlen = new size_t[query.size()];
String debugstr;
for (std::vector<String>::size_type i = 0; i < query.size(); i++) {
argv[i] = query[i].CStr();
argvlen[i] = query[i].GetLength();
debugstr += argv[i];
debugstr += " ";
}
Log(LogDebug, "RedisWriter, Connection")
<< "Sending Command: " << debugstr;
int r;
{
boost::mutex::scoped_lock lock(m_CMutex);
r = redisAsyncCommandArgv(m_Context, fn, privdata, query.size(), argv, argvlen);
}
delete[] argv;
delete[] argvlen;
if (r == REDIS_REPLY_ERROR) {
Log(LogCritical, "RedisWriter") Log(LogCritical, "RedisWriter")
<< "Redis Async query failed"; << "Cannot connect to " << m_Host << ":" << m_Port << ": " << ex.what();
BOOST_THROW_EXCEPTION(
redis_error()
<< errinfo_redis_query(Utility::Join(Array::FromVector(query), ' ', false))
);
} }
} }
void RedisConnection::SendMessagesInternal(const std::vector<std::vector<String>>& queries, redisCallbackFn *fn, void *privdata) void RedisConnection::ReadLoop(asio::yield_context& yc)
{ {
for (const auto& query : queries) { for (;;) {
SendMessageInternal(query, fn, privdata); m_QueuedReads.Wait(yc);
do {
auto item (std::move(m_Queues.FutureResponseActions.front()));
m_Queues.FutureResponseActions.pop();
switch (item.Action) {
case ResponseAction::Ignore:
for (auto i (item.Amount); i; --i) {
ReadOne(yc);
}
break;
case ResponseAction::Deliver:
for (auto i (item.Amount); i; --i) {
auto promise (std::move(m_Queues.ReplyPromises.front()));
m_Queues.ReplyPromises.pop();
promise.set_value(ReadOne(yc));
}
break;
case ResponseAction::DeliverBulk:
{
auto promise (std::move(m_Queues.RepliesPromises.front()));
m_Queues.RepliesPromises.pop();
Replies replies;
replies.reserve(item.Amount);
for (auto i (item.Amount); i; --i) {
replies.emplace_back(ReadOne(yc));
}
promise.set_value(std::move(replies));
}
}
} while (!m_Queues.FutureResponseActions.empty());
m_QueuedReads.Clear();
}
}
void RedisConnection::WriteLoop(asio::yield_context& yc)
{
for (;;) {
m_QueuedWrites.Wait(yc);
bool writtenAll = true;
do {
writtenAll = true;
if (!m_Queues.FireAndForgetQuery.empty()) {
auto item (std::move(m_Queues.FireAndForgetQuery.front()));
m_Queues.FireAndForgetQuery.pop();
if (m_Queues.FutureResponseActions.empty() || m_Queues.FutureResponseActions.back().Action != ResponseAction::Ignore) {
m_Queues.FutureResponseActions.emplace(FutureResponseAction{1, ResponseAction::Ignore});
} else {
++m_Queues.FutureResponseActions.back().Amount;
}
m_QueuedReads.Set();
writtenAll = false;
WriteOne(item, yc);
}
if (!m_Queues.FireAndForgetQueries.empty()) {
auto item (std::move(m_Queues.FireAndForgetQueries.front()));
m_Queues.FireAndForgetQueries.pop();
if (m_Queues.FutureResponseActions.empty() || m_Queues.FutureResponseActions.back().Action != ResponseAction::Ignore) {
m_Queues.FutureResponseActions.emplace(FutureResponseAction{item.size(), ResponseAction::Ignore});
} else {
m_Queues.FutureResponseActions.back().Amount += item.size();
}
m_QueuedReads.Set();
writtenAll = false;
for (auto& query : item) {
WriteOne(query, yc);
}
}
if (!m_Queues.GetResultOfQuery.empty()) {
auto item (std::move(m_Queues.GetResultOfQuery.front()));
m_Queues.GetResultOfQuery.pop();
m_Queues.ReplyPromises.emplace(std::move(item.second));
if (m_Queues.FutureResponseActions.empty() || m_Queues.FutureResponseActions.back().Action != ResponseAction::Deliver) {
m_Queues.FutureResponseActions.emplace(FutureResponseAction{1, ResponseAction::Deliver});
} else {
++m_Queues.FutureResponseActions.back().Amount;
}
m_QueuedReads.Set();
writtenAll = false;
WriteOne(item.first, yc);
}
if (!m_Queues.GetResultsOfQueries.empty()) {
auto item (std::move(m_Queues.GetResultsOfQueries.front()));
m_Queues.GetResultsOfQueries.pop();
m_Queues.RepliesPromises.emplace(std::move(item.second));
m_Queues.FutureResponseActions.emplace(FutureResponseAction{item.first.size(), ResponseAction::DeliverBulk});
m_QueuedReads.Set();
writtenAll = false;
for (auto& query : item.first) {
WriteOne(query, yc);
}
}
} while (!writtenAll);
m_QueuedWrites.Clear();
if (m_Path.IsEmpty()) {
m_TcpConn->async_flush(yc);
} else {
m_UnixConn->async_flush(yc);
}
}
}
RedisConnection::Reply RedisConnection::ReadOne(boost::asio::yield_context& yc)
{
if (m_Path.IsEmpty()) {
return ReadRESP(*m_TcpConn, yc);
} else {
return ReadRESP(*m_UnixConn, yc);
}
}
void RedisConnection::WriteOne(RedisConnection::Query& query, asio::yield_context& yc)
{
if (m_Path.IsEmpty()) {
WriteRESP(*m_TcpConn, query, yc);
} else {
WriteRESP(*m_UnixConn, query, yc);
} }
} }

View File

@ -20,9 +20,35 @@
#ifndef REDISCONNECTION_H #ifndef REDISCONNECTION_H
#define REDISCONNECTION_H #define REDISCONNECTION_H
#include <third-party/hiredis/async.h> #include <boost/asio/spawn.hpp>
#include <boost/asio/buffer.hpp>
#include <boost/asio/buffered_stream.hpp>
#include <boost/asio/io_context.hpp>
#include <boost/asio/io_context_strand.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/local/stream_protocol.hpp>
#include <boost/asio/read.hpp>
#include <boost/asio/read_until.hpp>
#include <boost/asio/streambuf.hpp>
#include <boost/asio/write.hpp>
#include <boost/lexical_cast.hpp>
#include <boost/utility/string_view.hpp>
#include <cstddef>
#include <cstdint>
#include <cstdio>
#include <cstring>
#include <future>
#include <memory>
#include <queue>
#include <stdexcept>
#include <utility>
#include <vector>
#include "base/array.hpp"
#include "base/atomic.hpp"
#include "base/io-engine.hpp"
#include "base/object.hpp" #include "base/object.hpp"
#include "base/workqueue.hpp" #include "base/string.hpp"
#include "base/value.hpp"
namespace icinga namespace icinga
{ {
@ -31,77 +57,288 @@ namespace icinga
* *
* @ingroup redis * @ingroup redis
*/ */
enum conn_state{
Starting,
Auth,
DBSelect,
Done,
};
class RedisConnection;
struct ConnectionState {
conn_state state;
RedisConnection *conn;
};
class RedisConnection final : public Object class RedisConnection final : public Object
{ {
public: public:
DECLARE_PTR_TYPEDEFS(RedisConnection); DECLARE_PTR_TYPEDEFS(RedisConnection);
typedef std::vector<String> Query;
typedef std::vector<Query> Queries;
typedef Value Reply;
typedef std::vector<Reply> Replies;
RedisConnection(const String host, const int port, const String path, const String password = "", const int db = 0); RedisConnection(const String host, const int port, const String path, const String password = "", const int db = 0);
void Start(); void Start();
void Connect();
void Disconnect();
bool IsConnected(); bool IsConnected();
void ExecuteQuery(std::vector<String> query, redisCallbackFn *fn = NULL, void *privdata = NULL); void FireAndForgetQuery(Query query);
void FireAndForgetQueries(Queries queries);
void ExecuteQueries(std::vector<std::vector<String>> queries, redisCallbackFn *fn = NULL, Reply GetResultOfQuery(Query query);
void *privdata = NULL); Replies GetResultsOfQueries(Queries queries);
private: private:
static void StaticInitialize(); enum class ResponseAction : unsigned char
{
Ignore, Deliver, DeliverBulk
};
void SendMessageInternal(const std::vector<String>& query, redisCallbackFn *fn, void *privdata); struct FutureResponseAction
void SendMessagesInternal(const std::vector<std::vector<String>>& queries, redisCallbackFn *fn, void *privdata); {
size_t Amount;
ResponseAction Action;
};
void AssertOnWorkQueue(); typedef boost::asio::ip::tcp Tcp;
typedef boost::asio::local::stream_protocol Unix;
void HandleRW(); typedef boost::asio::buffered_stream<Tcp::socket> TcpConn;
typedef boost::asio::buffered_stream<Unix::socket> UnixConn;
static void DisconnectCallback(const redisAsyncContext *c, int status); template<class AsyncReadStream>
static void ConnectCallback(const redisAsyncContext *c, int status); static Value ReadRESP(AsyncReadStream& stream, boost::asio::yield_context& yc);
static void RedisInitialCallback(redisAsyncContext *c, void *r, void *p); template<class AsyncReadStream>
static std::vector<char> ReadLine(AsyncReadStream& stream, boost::asio::yield_context& yc, size_t hint = 0);
template<class AsyncWriteStream>
static void WriteRESP(AsyncWriteStream& stream, const Query& query, boost::asio::yield_context& yc);
WorkQueue m_RedisConnectionWorkQueue{100000}; template<class AsyncWriteStream>
Timer::Ptr m_EventLoop; static void WriteInt(AsyncWriteStream& stream, intmax_t i, boost::asio::yield_context& yc);
redisAsyncContext *m_Context; RedisConnection(boost::asio::io_context& io, String host, int port, String path, String password, int db);
void Connect(boost::asio::yield_context& yc);
void ReadLoop(boost::asio::yield_context& yc);
void WriteLoop(boost::asio::yield_context& yc);
Reply ReadOne(boost::asio::yield_context& yc);
void WriteOne(Query& query, boost::asio::yield_context& yc);
String m_Path; String m_Path;
String m_Host; String m_Host;
int m_Port; int m_Port;
String m_Password; String m_Password;
int m_DbIndex; int m_DbIndex;
bool m_Connected;
boost::mutex m_CMutex; boost::asio::io_context::strand m_Strand;
ConnectionState m_State; std::unique_ptr<TcpConn> m_TcpConn;
std::unique_ptr<UnixConn> m_UnixConn;
Atomic<bool> m_Connecting, m_Connected;
struct {
std::queue<Query> FireAndForgetQuery;
std::queue<Queries> FireAndForgetQueries;
std::queue<std::pair<Query, std::promise<Reply>>> GetResultOfQuery;
std::queue<std::pair<Queries, std::promise<Replies>>> GetResultsOfQueries;
std::queue<std::promise<Reply>> ReplyPromises;
std::queue<std::promise<Replies>> RepliesPromises;
std::queue<FutureResponseAction> FutureResponseActions;
} m_Queues;
AsioConditionVariable m_QueuedWrites, m_QueuedReads;
}; };
struct redis_error : virtual std::exception, virtual boost::exception { }; class RedisError final : public Object
{
public:
DECLARE_PTR_TYPEDEFS(RedisError);
struct errinfo_redis_query_; inline RedisError(String message) : m_Message(std::move(message))
typedef boost::error_info<struct errinfo_redis_query_, std::string> errinfo_redis_query; {
}
inline const String& GetMessage()
{
return m_Message;
}
private:
String m_Message;
};
class RedisProtocolError : public std::runtime_error
{
protected:
inline RedisProtocolError() : runtime_error("")
{
}
};
class BadRedisType : public RedisProtocolError
{
public:
inline BadRedisType(char type) : m_What{type, 0}
{
}
virtual const char * what() const noexcept override
{
return m_What;
}
private:
char m_What[2];
};
class BadRedisInt : public RedisProtocolError
{
public:
inline BadRedisInt(std::vector<char> intStr) : m_What(std::move(intStr))
{
m_What.emplace_back(0);
}
virtual const char * what() const noexcept override
{
return m_What.data();
}
private:
std::vector<char> m_What;
};
template<class AsyncReadStream>
Value RedisConnection::ReadRESP(AsyncReadStream& stream, boost::asio::yield_context& yc)
{
namespace asio = boost::asio;
char type = 0;
asio::async_read(stream, asio::mutable_buffer(&type, 1), yc);
switch (type) {
case '+':
{
auto buf (ReadLine(stream, yc));
return String(buf.begin(), buf.end());
}
case '-':
{
auto buf (ReadLine(stream, yc));
return new RedisError(String(buf.begin(), buf.end()));
}
case ':':
{
auto buf (ReadLine(stream, yc, 21));
intmax_t i = 0;
try {
i = boost::lexical_cast<intmax_t>(boost::string_view(buf.data(), buf.size()));
} catch (...) {
throw BadRedisInt(std::move(buf));
}
return (double)i;
}
case '$':
{
auto buf (ReadLine(stream, yc, 21));
intmax_t i = 0;
try {
i = boost::lexical_cast<intmax_t>(boost::string_view(buf.data(), buf.size()));
} catch (...) {
throw BadRedisInt(std::move(buf));
}
if (i < 0) {
return Value();
}
buf.clear();
buf.insert(buf.end(), i, 0);
asio::async_read(stream, asio::mutable_buffer(buf.data(), buf.size()), yc);
{
char crlf[2];
asio::async_read(stream, asio::mutable_buffer(crlf, 2), yc);
}
return String(buf.begin(), buf.end());
}
case '*':
{
auto buf (ReadLine(stream, yc, 21));
intmax_t i = 0;
try {
i = boost::lexical_cast<intmax_t>(boost::string_view(buf.data(), buf.size()));
} catch (...) {
throw BadRedisInt(std::move(buf));
}
Array::Ptr arr = new Array();
if (i < 0) {
i = 0;
}
arr->Reserve(i);
for (; i; --i) {
arr->Add(ReadRESP(stream, yc));
}
return arr;
}
default:
throw BadRedisType(type);
}
}
template<class AsyncReadStream>
std::vector<char> RedisConnection::ReadLine(AsyncReadStream& stream, boost::asio::yield_context& yc, size_t hint)
{
namespace asio = boost::asio;
std::vector<char> line;
line.reserve(hint);
char next = 0;
asio::mutable_buffer buf (&next, 1);
for (;;) {
asio::async_read(stream, buf, yc);
if (next == '\r') {
asio::async_read(stream, buf, yc);
return std::move(line);
}
line.emplace_back(next);
}
}
template<class AsyncWriteStream>
void RedisConnection::WriteRESP(AsyncWriteStream& stream, const Query& query, boost::asio::yield_context& yc)
{
namespace asio = boost::asio;
asio::async_write(stream, asio::const_buffer("*", 1), yc);
WriteInt(stream, query.size(), yc);
asio::async_write(stream, asio::const_buffer("\r\n", 2), yc);
for (auto& arg : query) {
asio::async_write(stream, asio::const_buffer("$", 1), yc);
WriteInt(stream, arg.GetLength(), yc);
asio::async_write(stream, asio::const_buffer("\r\n", 2), yc);
asio::async_write(stream, asio::const_buffer(arg.CStr(), arg.GetLength()), yc);
asio::async_write(stream, asio::const_buffer("\r\n", 2), yc);
}
}
template<class AsyncWriteStream>
void RedisConnection::WriteInt(AsyncWriteStream& stream, intmax_t i, boost::asio::yield_context& yc)
{
namespace asio = boost::asio;
char buf[21] = {};
sprintf(buf, "%jd", i);
asio::async_write(stream, asio::const_buffer(buf, strlen(buf)), yc);
}
} }

View File

@ -166,7 +166,7 @@ void RedisWriter::UpdateAllConfigObjects()
if (transaction.size() > 1) { if (transaction.size() > 1) {
transaction.push_back({"EXEC"}); transaction.push_back({"EXEC"});
m_Rcon->ExecuteQueries(std::move(transaction)); m_Rcon->FireAndForgetQueries(std::move(transaction));
transaction = {{"MULTI"}}; transaction = {{"MULTI"}};
} }
} }
@ -184,10 +184,10 @@ void RedisWriter::UpdateAllConfigObjects()
if (transaction.size() > 1) { if (transaction.size() > 1) {
transaction.push_back({"EXEC"}); transaction.push_back({"EXEC"});
m_Rcon->ExecuteQueries(std::move(transaction)); m_Rcon->FireAndForgetQueries(std::move(transaction));
} }
m_Rcon->ExecuteQuery({"PUBLISH", "icinga:config:dump", lcType}); m_Rcon->FireAndForgetQuery({"PUBLISH", "icinga:config:dump", lcType});
Log(LogNotice, "RedisWriter") Log(LogNotice, "RedisWriter")
<< "Dumped " << bulkCounter << " objects of type " << type.second; << "Dumped " << bulkCounter << " objects of type " << type.second;
@ -249,7 +249,7 @@ void RedisWriter::DeleteKeys(const std::vector<String>& keys) {
query.emplace_back(key); query.emplace_back(key);
} }
m_Rcon->ExecuteQuery(std::move(query)); m_Rcon->FireAndForgetQuery(std::move(query));
} }
std::vector<String> RedisWriter::GetTypeObjectKeys(const String& type) std::vector<String> RedisWriter::GetTypeObjectKeys(const String& type)
@ -649,7 +649,7 @@ void RedisWriter::UpdateState(const Checkable::Ptr& checkable)
{ {
Dictionary::Ptr stateAttrs = SerializeState(checkable); Dictionary::Ptr stateAttrs = SerializeState(checkable);
m_Rcon->ExecuteQuery({"HSET", m_PrefixStateObject + GetLowerCaseTypeNameDB(checkable), GetObjectIdentifier(checkable), JsonEncode(stateAttrs)}); m_Rcon->FireAndForgetQuery({"HSET", m_PrefixStateObject + GetLowerCaseTypeNameDB(checkable), GetObjectIdentifier(checkable), JsonEncode(stateAttrs)});
} }
// Used to update a single object, used for runtime updates // Used to update a single object, used for runtime updates
@ -666,7 +666,7 @@ void RedisWriter::SendConfigUpdate(const ConfigObject::Ptr& object, bool runtime
CreateConfigUpdate(object, typeName, statements, runtimeUpdate); CreateConfigUpdate(object, typeName, statements, runtimeUpdate);
Checkable::Ptr checkable = dynamic_pointer_cast<Checkable>(object); Checkable::Ptr checkable = dynamic_pointer_cast<Checkable>(object);
if (checkable) { if (checkable) {
m_Rcon->ExecuteQuery({"HSET", m_PrefixStateObject + typeName, m_Rcon->FireAndForgetQuery({"HSET", m_PrefixStateObject + typeName,
GetObjectIdentifier(checkable), JsonEncode(SerializeState(checkable))}); GetObjectIdentifier(checkable), JsonEncode(SerializeState(checkable))});
} }
@ -681,7 +681,7 @@ void RedisWriter::SendConfigUpdate(const ConfigObject::Ptr& object, bool runtime
if (transaction.size() > 1) { if (transaction.size() > 1) {
transaction.push_back({"EXEC"}); transaction.push_back({"EXEC"});
m_Rcon->ExecuteQueries(std::move(transaction)); m_Rcon->FireAndForgetQueries(std::move(transaction));
} }
} }
@ -965,7 +965,7 @@ RedisWriter::CreateConfigUpdate(const ConfigObject::Ptr& object, const String ty
/* Send an update event to subscribers. */ /* Send an update event to subscribers. */
if (runtimeUpdate) { if (runtimeUpdate) {
m_Rcon->ExecuteQuery({"PUBLISH", "icinga:config:update", typeName + ":" + objectKey}); m_Rcon->FireAndForgetQuery({"PUBLISH", "icinga:config:update", typeName + ":" + objectKey});
} }
} }
@ -974,7 +974,7 @@ void RedisWriter::SendConfigDelete(const ConfigObject::Ptr& object)
String typeName = object->GetReflectionType()->GetName().ToLower(); String typeName = object->GetReflectionType()->GetName().ToLower();
String objectKey = GetObjectIdentifier(object); String objectKey = GetObjectIdentifier(object);
m_Rcon->ExecuteQueries({ m_Rcon->FireAndForgetQueries({
{"HDEL", m_PrefixConfigObject + typeName, objectKey}, {"HDEL", m_PrefixConfigObject + typeName, objectKey},
{"DEL", m_PrefixStateObject + typeName + ":" + objectKey}, {"DEL", m_PrefixStateObject + typeName + ":" + objectKey},
{"PUBLISH", "icinga:config:delete", typeName + ":" + objectKey} {"PUBLISH", "icinga:config:delete", typeName + ":" + objectKey}
@ -1010,7 +1010,7 @@ void RedisWriter::SendStatusUpdate(const ConfigObject::Ptr& object)
streamadd.emplace_back(kv.second); streamadd.emplace_back(kv.second);
} }
m_Rcon->ExecuteQuery(std::move(streamadd)); m_Rcon->FireAndForgetQuery(std::move(streamadd));
} }
Dictionary::Ptr RedisWriter::SerializeState(const Checkable::Ptr& checkable) Dictionary::Ptr RedisWriter::SerializeState(const Checkable::Ptr& checkable)
@ -1147,7 +1147,7 @@ RedisWriter::UpdateObjectAttrs(const ConfigObject::Ptr& object, int fieldType,
typeName = typeNameOverride.ToLower(); typeName = typeNameOverride.ToLower();
return {GetObjectIdentifier(object), JsonEncode(attrs)}; return {GetObjectIdentifier(object), JsonEncode(attrs)};
//m_Rcon->ExecuteQuery({"HSET", keyPrefix + typeName, GetObjectIdentifier(object), JsonEncode(attrs)}); //m_Rcon->FireAndForgetQuery({"HSET", keyPrefix + typeName, GetObjectIdentifier(object), JsonEncode(attrs)});
} }
void RedisWriter::StateChangeHandler(const ConfigObject::Ptr &object) void RedisWriter::StateChangeHandler(const ConfigObject::Ptr &object)

View File

@ -150,29 +150,23 @@ void RedisWriter::UpdateSubscriptions()
if (!m_Rcon || !m_Rcon->IsConnected()) if (!m_Rcon || !m_Rcon->IsConnected())
return; return;
long long cursor = 0; String cursor = "0";
String keyPrefix = "icinga:subscription:"; String keyPrefix = "icinga:subscription:";
do { do {
auto reply = RedisGet({ "SCAN", Convert::ToString(cursor), "MATCH", keyPrefix + "*", "COUNT", "1000" }); Array::Ptr reply = m_Rcon->GetResultOfQuery({ "SCAN", cursor, "MATCH", keyPrefix + "*", "COUNT", "1000" });
VERIFY(reply->GetLength() % 2u == 0u);
VERIFY(reply->type == REDIS_REPLY_ARRAY); cursor = reply->Get(0);
VERIFY(reply->elements % 2 == 0);
redisReply *cursorReply = reply->element[0]; Array::Ptr keys = reply->Get(1);
cursor = Convert::ToLong(cursorReply->str); ObjectLock oLock (keys);
redisReply *keysReply = reply->element[1]; for (String key : keys) {
if (boost::algorithm::ends_with(key, ":limit"))
for (size_t i = 0; i < keysReply->elements; i++) {
if (boost::algorithm::ends_with(keysReply->element[i]->str, ":limit"))
continue; continue;
redisReply *keyReply = keysReply->element[i];
VERIFY(keyReply->type == REDIS_REPLY_STRING);
RedisSubscriptionInfo rsi; RedisSubscriptionInfo rsi;
String key = keysReply->element[i]->str;
if (!RedisWriter::GetSubscriptionTypes(key, rsi)) { if (!RedisWriter::GetSubscriptionTypes(key, rsi)) {
Log(LogInformation, "RedisWriter") Log(LogInformation, "RedisWriter")
@ -181,7 +175,7 @@ void RedisWriter::UpdateSubscriptions()
m_Subscriptions[key.SubStr(keyPrefix.GetLength())] = rsi; m_Subscriptions[key.SubStr(keyPrefix.GetLength())] = rsi;
} }
} }
} while (cursor != 0); } while (cursor != "0");
Log(LogInformation, "RedisWriter") Log(LogInformation, "RedisWriter")
<< "Current Redis event subscriptions: " << m_Subscriptions.size(); << "Current Redis event subscriptions: " << m_Subscriptions.size();
@ -190,14 +184,17 @@ void RedisWriter::UpdateSubscriptions()
bool RedisWriter::GetSubscriptionTypes(String key, RedisSubscriptionInfo& rsi) bool RedisWriter::GetSubscriptionTypes(String key, RedisSubscriptionInfo& rsi)
{ {
try { try {
auto redisReply = RedisGet({ "SMEMBERS", key }); Array::Ptr redisReply = m_Rcon->GetResultOfQuery({ "SMEMBERS", key });
VERIFY(redisReply->type == REDIS_REPLY_ARRAY);
if (redisReply->elements == 0) if (redisReply->GetLength() == 0)
return false; return false;
for (size_t j = 0; j < redisReply->elements; j++) { {
rsi.EventTypes.insert(redisReply->element[j]->str); ObjectLock oLock (redisReply);
for (String member : redisReply) {
rsi.EventTypes.insert(member);
}
} }
Log(LogInformation, "RedisWriter") Log(LogInformation, "RedisWriter")
@ -229,7 +226,7 @@ void RedisWriter::PublishStats()
status->Set("config_dump_in_progress", m_ConfigDumpInProgress); status->Set("config_dump_in_progress", m_ConfigDumpInProgress);
String jsonStats = JsonEncode(status); String jsonStats = JsonEncode(status);
m_Rcon->ExecuteQuery({ "PUBLISH", "icinga:stats", jsonStats }); m_Rcon->FireAndForgetQuery({ "PUBLISH", "icinga:stats", jsonStats });
} }
void RedisWriter::HandleEvents() void RedisWriter::HandleEvents()
@ -281,20 +278,19 @@ void RedisWriter::HandleEvent(const Dictionary::Ptr& event)
String body = JsonEncode(event); String body = JsonEncode(event);
auto maxExists = RedisGet({ "EXISTS", "icinga:subscription:" + name + ":limit" }); double maxExists = m_Rcon->GetResultOfQuery({ "EXISTS", "icinga:subscription:" + name + ":limit" });
long maxEvents = MAX_EVENTS_DEFAULT; long maxEvents = MAX_EVENTS_DEFAULT;
if (maxExists->integer) { if (maxExists != 0) {
auto redisReply = RedisGet({ "GET", "icinga:subscription:" + name + ":limit"}); String redisReply = m_Rcon->GetResultOfQuery({ "GET", "icinga:subscription:" + name + ":limit"});
VERIFY(redisReply->type == REDIS_REPLY_STRING);
Log(LogInformation, "RedisWriter") Log(LogInformation, "RedisWriter")
<< "Got limit " << redisReply->str << " for " << name; << "Got limit " << redisReply << " for " << name;
maxEvents = Convert::ToLong(redisReply->str); maxEvents = Convert::ToLong(redisReply);
} }
m_Rcon->ExecuteQueries({ m_Rcon->FireAndForgetQueries({
{ "MULTI" }, { "MULTI" },
{ "LPUSH", "icinga:event:" + name, body }, { "LPUSH", "icinga:event:" + name, body },
{ "LTRIM", "icinga:event:" + name, "0", String(maxEvents - 1)}, { "LTRIM", "icinga:event:" + name, "0", String(maxEvents - 1)},
@ -354,7 +350,7 @@ void RedisWriter::SendEvent(const Dictionary::Ptr& event)
// Log(LogInformation, "RedisWriter") // Log(LogInformation, "RedisWriter")
// << "Sending event \"" << body << "\""; // << "Sending event \"" << body << "\"";
m_Rcon->ExecuteQueries({ m_Rcon->FireAndForgetQueries({
{ "PUBLISH", "icinga:event:all", body }, { "PUBLISH", "icinga:event:all", body },
{ "PUBLISH", "icinga:event:" + event->Get("type"), body }}); { "PUBLISH", "icinga:event:" + event->Get("type"), body }});
} }
@ -385,21 +381,6 @@ struct synchronousWait {
redisReply* reply; redisReply* reply;
}; };
void RedisWriter::RedisQueryCallback(redisAsyncContext *c, void *r, void *p) {
auto wait = (struct synchronousWait*) p;
auto rp = reinterpret_cast<redisReply *>(r);
if (r == NULL)
wait->reply = nullptr;
else
wait->reply = RedisWriter::dupReplyObject(rp);
boost::mutex::scoped_lock lock(wait->mtx);
wait->ready = true;
wait->cv.notify_all();
}
struct RedisReplyDeleter struct RedisReplyDeleter
{ {
inline void operator() (redisReply *reply) inline void operator() (redisReply *reply)
@ -407,19 +388,3 @@ struct RedisReplyDeleter
freeReplyObject(reply); freeReplyObject(reply);
} }
}; };
std::shared_ptr<redisReply> RedisWriter::RedisGet(std::vector<String> query) {
auto *wait = new synchronousWait;
wait->ready = false;
m_Rcon->ExecuteQuery(std::move(query), RedisQueryCallback, wait);
boost::mutex::scoped_lock lock(wait->mtx);
while (!wait->ready) {
wait->cv.wait(lock);
if (!wait->ready)
wait->ready = true;
}
return std::shared_ptr<redisReply>(wait->reply, RedisReplyDeleter());
}

View File

@ -115,8 +115,6 @@ private:
void ExceptionHandler(boost::exception_ptr exp); void ExceptionHandler(boost::exception_ptr exp);
//Used to get a reply from the asyncronous connection //Used to get a reply from the asyncronous connection
std::shared_ptr<redisReply> RedisGet(std::vector<String> query);
static void RedisQueryCallback(redisAsyncContext *c, void *r, void *p);
static redisReply* dupReplyObject(redisReply* reply); static redisReply* dupReplyObject(redisReply* reply);