diff --git a/lib/redis/redisconnection.cpp b/lib/redis/redisconnection.cpp index 22c3f43da..c710920d1 100644 --- a/lib/redis/redisconnection.cpp +++ b/lib/redis/redisconnection.cpp @@ -46,12 +46,19 @@ RedisConnection::RedisConnection(const String host, const int port, const String RedisConnection::RedisConnection(boost::asio::io_context& io, String host, int port, String path, String password, int db) : m_Host(std::move(host)), m_Port(port), m_Path(std::move(path)), m_Password(std::move(password)), m_DbIndex(db), - m_Connecting(false), m_Connected(false), m_Strand(io), m_QueuedWrites(io), m_QueuedReads(io) + m_Connecting(false), m_Connected(false), m_Started(false), m_Strand(io), m_QueuedWrites(io), m_QueuedReads(io) { } void RedisConnection::Start() { + if (!m_Started.exchange(true)) { + Ptr keepAlive (this); + + asio::spawn(m_Strand, [this, keepAlive](asio::yield_context yc) { ReadLoop(yc); }); + asio::spawn(m_Strand, [this, keepAlive](asio::yield_context yc) { WriteLoop(yc); }); + } + if (!m_Connecting.exchange(true)) { Ptr keepAlive (this); @@ -117,28 +124,19 @@ RedisConnection::Replies RedisConnection::GetResultsOfQueries(RedisConnection::Q void RedisConnection::Connect(asio::yield_context& yc) { - Defer notConnecting ([this]() { - if (!m_Connected.load()) { - m_Connecting.store(false); - } - }); + Defer notConnecting ([this]() { m_Connecting.store(m_Connected.load()); }); Log(LogInformation, "RedisWriter", "Trying to connect to Redis server (async)"); try { if (m_Path.IsEmpty()) { - m_TcpConn = decltype(m_TcpConn)(new TcpConn(m_Strand.context())); - icinga::Connect(m_TcpConn->next_layer(), m_Host, Convert::ToString(m_Port), yc); + decltype(m_TcpConn) conn (new TcpConn(m_Strand.context())); + icinga::Connect(conn->next_layer(), m_Host, Convert::ToString(m_Port), yc); + m_TcpConn = std::move(conn); } else { - m_UnixConn = decltype(m_UnixConn)(new UnixConn(m_Strand.context())); - m_UnixConn->next_layer().async_connect(Unix::endpoint(m_Path.CStr()), yc); - } - - { - Ptr keepAlive (this); - - asio::spawn(m_Strand, [this, keepAlive](asio::yield_context yc) { ReadLoop(yc); }); - asio::spawn(m_Strand, [this, keepAlive](asio::yield_context yc) { WriteLoop(yc); }); + decltype(m_UnixConn) conn (new UnixConn(m_Strand.context())); + conn->next_layer().async_connect(Unix::endpoint(m_Path.CStr()), yc); + m_UnixConn = std::move(conn); } m_Connected.store(true); @@ -155,14 +153,20 @@ void RedisConnection::ReadLoop(asio::yield_context& yc) for (;;) { m_QueuedReads.Wait(yc); - do { + while (!m_Queues.FutureResponseActions.empty()) { auto item (std::move(m_Queues.FutureResponseActions.front())); m_Queues.FutureResponseActions.pop(); switch (item.Action) { case ResponseAction::Ignore: - for (auto i (item.Amount); i; --i) { - ReadOne(yc); + try { + for (auto i (item.Amount); i; --i) { + ReadOne(yc); + } + } catch (const boost::coroutines::detail::forced_unwind&) { + throw; + } catch (...) { + continue; } break; case ResponseAction::Deliver: @@ -178,7 +182,7 @@ void RedisConnection::ReadLoop(asio::yield_context& yc) throw; } catch (...) { promise.set_exception(std::current_exception()); - throw; + continue; } promise.set_value(std::move(reply)); @@ -199,14 +203,14 @@ void RedisConnection::ReadLoop(asio::yield_context& yc) throw; } catch (...) { promise.set_exception(std::current_exception()); - throw; + continue; } } promise.set_value(std::move(replies)); } } - } while (!m_Queues.FutureResponseActions.empty()); + } m_QueuedReads.Clear(); } @@ -225,7 +229,14 @@ void RedisConnection::WriteLoop(asio::yield_context& yc) if (!m_Queues.FireAndForgetQuery.empty()) { auto item (std::move(m_Queues.FireAndForgetQuery.front())); m_Queues.FireAndForgetQuery.pop(); - WriteOne(item, yc); + + try { + WriteOne(item, yc); + } catch (const boost::coroutines::detail::forced_unwind&) { + throw; + } catch (...) { + continue; + } if (m_Queues.FutureResponseActions.empty() || m_Queues.FutureResponseActions.back().Action != ResponseAction::Ignore) { m_Queues.FutureResponseActions.emplace(FutureResponseAction{1, ResponseAction::Ignore}); @@ -241,8 +252,14 @@ void RedisConnection::WriteLoop(asio::yield_context& yc) auto item (std::move(m_Queues.FireAndForgetQueries.front())); m_Queues.FireAndForgetQueries.pop(); - for (auto& query : item) { - WriteOne(query, yc); + try { + for (auto& query : item) { + WriteOne(query, yc); + } + } catch (const boost::coroutines::detail::forced_unwind&) { + throw; + } catch (...) { + continue; } if (m_Queues.FutureResponseActions.empty() || m_Queues.FutureResponseActions.back().Action != ResponseAction::Ignore) { @@ -265,7 +282,7 @@ void RedisConnection::WriteLoop(asio::yield_context& yc) throw; } catch (...) { item.second.set_exception(std::current_exception()); - throw; + continue; } m_Queues.ReplyPromises.emplace(std::move(item.second)); @@ -292,7 +309,7 @@ void RedisConnection::WriteLoop(asio::yield_context& yc) throw; } catch (...) { item.second.set_exception(std::current_exception()); - throw; + continue; } m_Queues.RepliesPromises.emplace(std::move(item.second)); @@ -310,19 +327,17 @@ void RedisConnection::WriteLoop(asio::yield_context& yc) RedisConnection::Reply RedisConnection::ReadOne(boost::asio::yield_context& yc) { if (m_Path.IsEmpty()) { - return ReadRESP(*m_TcpConn, yc); + return ReadOne(m_TcpConn, yc); } else { - return ReadRESP(*m_UnixConn, yc); + return ReadOne(m_UnixConn, yc); } } void RedisConnection::WriteOne(RedisConnection::Query& query, asio::yield_context& yc) { if (m_Path.IsEmpty()) { - WriteRESP(*m_TcpConn, query, yc); - m_TcpConn->async_flush(yc); + WriteOne(m_TcpConn, query, yc); } else { - WriteRESP(*m_UnixConn, query, yc); - m_UnixConn->async_flush(yc); + WriteOne(m_UnixConn, query, yc); } } diff --git a/lib/redis/redisconnection.hpp b/lib/redis/redisconnection.hpp index b6b5c005d..a39c152e6 100644 --- a/lib/redis/redisconnection.hpp +++ b/lib/redis/redisconnection.hpp @@ -117,6 +117,12 @@ namespace icinga Reply ReadOne(boost::asio::yield_context& yc); void WriteOne(Query& query, boost::asio::yield_context& yc); + template + Reply ReadOne(StreamPtr& stream, boost::asio::yield_context& yc); + + template + void WriteOne(StreamPtr& stream, Query& query, boost::asio::yield_context& yc); + String m_Path; String m_Host; int m_Port; @@ -124,9 +130,9 @@ namespace icinga int m_DbIndex; boost::asio::io_context::strand m_Strand; - std::unique_ptr m_TcpConn; - std::unique_ptr m_UnixConn; - Atomic m_Connecting, m_Connected; + std::shared_ptr m_TcpConn; + std::shared_ptr m_UnixConn; + Atomic m_Connecting, m_Connected, m_Started; struct { std::queue FireAndForgetQuery; @@ -159,6 +165,14 @@ private: String m_Message; }; +class RedisDisconnected : public std::runtime_error +{ +public: + inline RedisDisconnected() : runtime_error("") + { + } +}; + class RedisProtocolError : public std::runtime_error { protected: @@ -200,6 +214,51 @@ private: std::vector m_What; }; +template +RedisConnection::Reply RedisConnection::ReadOne(StreamPtr& stream, boost::asio::yield_context& yc) +{ + if (!stream) { + throw RedisDisconnected(); + } + + auto strm (stream); + + try { + return ReadRESP(*strm, yc); + } catch (const boost::coroutines::detail::forced_unwind&) { + throw; + } catch (...) { + if (m_Connecting.exchange(false)) { + m_Connected.store(false); + stream = nullptr; + } + throw; + } +} + +template +void RedisConnection::WriteOne(StreamPtr& stream, RedisConnection::Query& query, boost::asio::yield_context& yc) +{ + if (!stream) { + throw RedisDisconnected(); + } + + auto strm (stream); + + try { + WriteRESP(*strm, query, yc); + strm->async_flush(yc); + } catch (const boost::coroutines::detail::forced_unwind&) { + throw; + } catch (...) { + if (m_Connecting.exchange(false)) { + m_Connected.store(false); + stream = nullptr; + } + throw; + } +} + template Value RedisConnection::ReadRESP(AsyncReadStream& stream, boost::asio::yield_context& yc) {