/****************************************************************************** * Icinga 2 * * Copyright (C) 2012-2018 Icinga Development Team (https://www.icinga.com/) * * * * This program is free software; you can redistribute it and/or * * modify it under the terms of the GNU General Public License * * as published by the Free Software Foundation; either version 2 * * of the License, or (at your option) any later version. * * * * This program is distributed in the hope that it will be useful, * * but WITHOUT ANY WARRANTY; without even the implied warranty of * * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * * GNU General Public License for more details. * * * * You should have received a copy of the GNU General Public License * * along with this program; if not, write to the Free Software Foundation * * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. * ******************************************************************************/ #ifndef REDISCONNECTION_H #define REDISCONNECTION_H #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "base/array.hpp" #include "base/atomic.hpp" #include "base/io-engine.hpp" #include "base/object.hpp" #include "base/string.hpp" #include "base/value.hpp" namespace icinga { /** * An Async Redis connection. * * @ingroup icingadb */ class RedisConnection final : public Object { public: DECLARE_PTR_TYPEDEFS(RedisConnection); typedef std::vector Query; typedef std::vector Queries; typedef Value Reply; typedef std::vector Replies; RedisConnection(const String host, const int port, const String path, const String password = "", const int db = 0); void Start(); bool IsConnected(); void FireAndForgetQuery(Query query); void FireAndForgetQueries(Queries queries); Reply GetResultOfQuery(Query query); Replies GetResultsOfQueries(Queries queries); private: enum class ResponseAction : unsigned char { Ignore, Deliver, DeliverBulk }; struct FutureResponseAction { size_t Amount; ResponseAction Action; }; typedef boost::asio::ip::tcp Tcp; typedef boost::asio::local::stream_protocol Unix; typedef boost::asio::buffered_stream TcpConn; typedef boost::asio::buffered_stream UnixConn; template static Value ReadRESP(AsyncReadStream& stream, boost::asio::yield_context& yc); template static std::vector ReadLine(AsyncReadStream& stream, boost::asio::yield_context& yc, size_t hint = 0); template static void WriteRESP(AsyncWriteStream& stream, const Query& query, boost::asio::yield_context& yc); template static void WriteInt(AsyncWriteStream& stream, intmax_t i, boost::asio::yield_context& yc); RedisConnection(boost::asio::io_context& io, String host, int port, String path, String password, int db); void Connect(boost::asio::yield_context& yc); void ReadLoop(boost::asio::yield_context& yc); void WriteLoop(boost::asio::yield_context& yc); Reply ReadOne(boost::asio::yield_context& yc); void WriteOne(Query& query, boost::asio::yield_context& yc); template Reply ReadOne(StreamPtr& stream, boost::asio::yield_context& yc); template void WriteOne(StreamPtr& stream, Query& query, boost::asio::yield_context& yc); String m_Path; String m_Host; int m_Port; String m_Password; int m_DbIndex; boost::asio::io_context::strand m_Strand; std::shared_ptr m_TcpConn; std::shared_ptr m_UnixConn; Atomic m_Connecting, m_Connected, m_Started; struct WriteQueueItem { std::shared_ptr FireAndForgetQuery; std::shared_ptr FireAndForgetQueries; std::shared_ptr>> GetResultOfQuery; std::shared_ptr>> GetResultsOfQueries; }; struct { std::queue Writes; std::queue> ReplyPromises; std::queue> RepliesPromises; std::queue FutureResponseActions; } m_Queues; AsioConditionVariable m_QueuedWrites, m_QueuedReads; }; class RedisError final : public Object { public: DECLARE_PTR_TYPEDEFS(RedisError); inline RedisError(String message) : m_Message(std::move(message)) { } inline const String& GetMessage() { return m_Message; } private: String m_Message; }; class RedisDisconnected : public std::runtime_error { public: inline RedisDisconnected() : runtime_error("") { } }; class RedisProtocolError : public std::runtime_error { protected: inline RedisProtocolError() : runtime_error("") { } }; class BadRedisType : public RedisProtocolError { public: inline BadRedisType(char type) : m_What{type, 0} { } virtual const char * what() const noexcept override { return m_What; } private: char m_What[2]; }; class BadRedisInt : public RedisProtocolError { public: inline BadRedisInt(std::vector intStr) : m_What(std::move(intStr)) { m_What.emplace_back(0); } virtual const char * what() const noexcept override { return m_What.data(); } private: std::vector m_What; }; template RedisConnection::Reply RedisConnection::ReadOne(StreamPtr& stream, boost::asio::yield_context& yc) { if (!stream) { throw RedisDisconnected(); } auto strm (stream); try { return ReadRESP(*strm, yc); } catch (const boost::coroutines::detail::forced_unwind&) { throw; } catch (...) { if (m_Connecting.exchange(false)) { m_Connected.store(false); stream = nullptr; } throw; } } template void RedisConnection::WriteOne(StreamPtr& stream, RedisConnection::Query& query, boost::asio::yield_context& yc) { if (!stream) { throw RedisDisconnected(); } auto strm (stream); try { WriteRESP(*strm, query, yc); strm->async_flush(yc); } catch (const boost::coroutines::detail::forced_unwind&) { throw; } catch (...) { if (m_Connecting.exchange(false)) { m_Connected.store(false); stream = nullptr; } throw; } } template Value RedisConnection::ReadRESP(AsyncReadStream& stream, boost::asio::yield_context& yc) { namespace asio = boost::asio; char type = 0; asio::async_read(stream, asio::mutable_buffer(&type, 1), yc); switch (type) { case '+': { auto buf (ReadLine(stream, yc)); return String(buf.begin(), buf.end()); } case '-': { auto buf (ReadLine(stream, yc)); return new RedisError(String(buf.begin(), buf.end())); } case ':': { auto buf (ReadLine(stream, yc, 21)); intmax_t i = 0; try { i = boost::lexical_cast(boost::string_view(buf.data(), buf.size())); } catch (...) { throw BadRedisInt(std::move(buf)); } return (double)i; } case '$': { auto buf (ReadLine(stream, yc, 21)); intmax_t i = 0; try { i = boost::lexical_cast(boost::string_view(buf.data(), buf.size())); } catch (...) { throw BadRedisInt(std::move(buf)); } if (i < 0) { return Value(); } buf.clear(); buf.insert(buf.end(), i, 0); asio::async_read(stream, asio::mutable_buffer(buf.data(), buf.size()), yc); { char crlf[2]; asio::async_read(stream, asio::mutable_buffer(crlf, 2), yc); } return String(buf.begin(), buf.end()); } case '*': { auto buf (ReadLine(stream, yc, 21)); intmax_t i = 0; try { i = boost::lexical_cast(boost::string_view(buf.data(), buf.size())); } catch (...) { throw BadRedisInt(std::move(buf)); } Array::Ptr arr = new Array(); if (i < 0) { i = 0; } arr->Reserve(i); for (; i; --i) { arr->Add(ReadRESP(stream, yc)); } return arr; } default: throw BadRedisType(type); } } template std::vector RedisConnection::ReadLine(AsyncReadStream& stream, boost::asio::yield_context& yc, size_t hint) { namespace asio = boost::asio; std::vector line; line.reserve(hint); char next = 0; asio::mutable_buffer buf (&next, 1); for (;;) { asio::async_read(stream, buf, yc); if (next == '\r') { asio::async_read(stream, buf, yc); return std::move(line); } line.emplace_back(next); } } template void RedisConnection::WriteRESP(AsyncWriteStream& stream, const Query& query, boost::asio::yield_context& yc) { namespace asio = boost::asio; asio::async_write(stream, asio::const_buffer("*", 1), yc); WriteInt(stream, query.size(), yc); asio::async_write(stream, asio::const_buffer("\r\n", 2), yc); for (auto& arg : query) { asio::async_write(stream, asio::const_buffer("$", 1), yc); WriteInt(stream, arg.GetLength(), yc); asio::async_write(stream, asio::const_buffer("\r\n", 2), yc); asio::async_write(stream, asio::const_buffer(arg.CStr(), arg.GetLength()), yc); asio::async_write(stream, asio::const_buffer("\r\n", 2), yc); } } template void RedisConnection::WriteInt(AsyncWriteStream& stream, intmax_t i, boost::asio::yield_context& yc) { namespace asio = boost::asio; char buf[21] = {}; sprintf(buf, "%jd", i); asio::async_write(stream, asio::const_buffer(buf, strlen(buf)), yc); } } #endif //REDISCONNECTION_H