RedisConnection: handle errors as expected

refs #50
This commit is contained in:
Alexander A. Klimov 2019-08-08 17:42:31 +02:00 committed by Michael Friedrich
parent 06d88477bd
commit 430c769371
2 changed files with 111 additions and 37 deletions

View File

@ -46,12 +46,19 @@ RedisConnection::RedisConnection(const String host, const int port, const String
RedisConnection::RedisConnection(boost::asio::io_context& io, String host, int port, String path, String password, int db) RedisConnection::RedisConnection(boost::asio::io_context& io, String host, int port, String path, String password, int db)
: m_Host(std::move(host)), m_Port(port), m_Path(std::move(path)), m_Password(std::move(password)), m_DbIndex(db), : m_Host(std::move(host)), m_Port(port), m_Path(std::move(path)), m_Password(std::move(password)), m_DbIndex(db),
m_Connecting(false), m_Connected(false), m_Strand(io), m_QueuedWrites(io), m_QueuedReads(io) m_Connecting(false), m_Connected(false), m_Started(false), m_Strand(io), m_QueuedWrites(io), m_QueuedReads(io)
{ {
} }
void RedisConnection::Start() void RedisConnection::Start()
{ {
if (!m_Started.exchange(true)) {
Ptr keepAlive (this);
asio::spawn(m_Strand, [this, keepAlive](asio::yield_context yc) { ReadLoop(yc); });
asio::spawn(m_Strand, [this, keepAlive](asio::yield_context yc) { WriteLoop(yc); });
}
if (!m_Connecting.exchange(true)) { if (!m_Connecting.exchange(true)) {
Ptr keepAlive (this); Ptr keepAlive (this);
@ -117,28 +124,19 @@ RedisConnection::Replies RedisConnection::GetResultsOfQueries(RedisConnection::Q
void RedisConnection::Connect(asio::yield_context& yc) void RedisConnection::Connect(asio::yield_context& yc)
{ {
Defer notConnecting ([this]() { Defer notConnecting ([this]() { m_Connecting.store(m_Connected.load()); });
if (!m_Connected.load()) {
m_Connecting.store(false);
}
});
Log(LogInformation, "RedisWriter", "Trying to connect to Redis server (async)"); Log(LogInformation, "RedisWriter", "Trying to connect to Redis server (async)");
try { try {
if (m_Path.IsEmpty()) { if (m_Path.IsEmpty()) {
m_TcpConn = decltype(m_TcpConn)(new TcpConn(m_Strand.context())); decltype(m_TcpConn) conn (new TcpConn(m_Strand.context()));
icinga::Connect(m_TcpConn->next_layer(), m_Host, Convert::ToString(m_Port), yc); icinga::Connect(conn->next_layer(), m_Host, Convert::ToString(m_Port), yc);
m_TcpConn = std::move(conn);
} else { } else {
m_UnixConn = decltype(m_UnixConn)(new UnixConn(m_Strand.context())); decltype(m_UnixConn) conn (new UnixConn(m_Strand.context()));
m_UnixConn->next_layer().async_connect(Unix::endpoint(m_Path.CStr()), yc); conn->next_layer().async_connect(Unix::endpoint(m_Path.CStr()), yc);
} m_UnixConn = std::move(conn);
{
Ptr keepAlive (this);
asio::spawn(m_Strand, [this, keepAlive](asio::yield_context yc) { ReadLoop(yc); });
asio::spawn(m_Strand, [this, keepAlive](asio::yield_context yc) { WriteLoop(yc); });
} }
m_Connected.store(true); m_Connected.store(true);
@ -155,15 +153,21 @@ void RedisConnection::ReadLoop(asio::yield_context& yc)
for (;;) { for (;;) {
m_QueuedReads.Wait(yc); m_QueuedReads.Wait(yc);
do { while (!m_Queues.FutureResponseActions.empty()) {
auto item (std::move(m_Queues.FutureResponseActions.front())); auto item (std::move(m_Queues.FutureResponseActions.front()));
m_Queues.FutureResponseActions.pop(); m_Queues.FutureResponseActions.pop();
switch (item.Action) { switch (item.Action) {
case ResponseAction::Ignore: case ResponseAction::Ignore:
try {
for (auto i (item.Amount); i; --i) { for (auto i (item.Amount); i; --i) {
ReadOne(yc); ReadOne(yc);
} }
} catch (const boost::coroutines::detail::forced_unwind&) {
throw;
} catch (...) {
continue;
}
break; break;
case ResponseAction::Deliver: case ResponseAction::Deliver:
for (auto i (item.Amount); i; --i) { for (auto i (item.Amount); i; --i) {
@ -178,7 +182,7 @@ void RedisConnection::ReadLoop(asio::yield_context& yc)
throw; throw;
} catch (...) { } catch (...) {
promise.set_exception(std::current_exception()); promise.set_exception(std::current_exception());
throw; continue;
} }
promise.set_value(std::move(reply)); promise.set_value(std::move(reply));
@ -199,14 +203,14 @@ void RedisConnection::ReadLoop(asio::yield_context& yc)
throw; throw;
} catch (...) { } catch (...) {
promise.set_exception(std::current_exception()); promise.set_exception(std::current_exception());
throw; continue;
} }
} }
promise.set_value(std::move(replies)); promise.set_value(std::move(replies));
} }
} }
} while (!m_Queues.FutureResponseActions.empty()); }
m_QueuedReads.Clear(); m_QueuedReads.Clear();
} }
@ -225,7 +229,14 @@ void RedisConnection::WriteLoop(asio::yield_context& yc)
if (!m_Queues.FireAndForgetQuery.empty()) { if (!m_Queues.FireAndForgetQuery.empty()) {
auto item (std::move(m_Queues.FireAndForgetQuery.front())); auto item (std::move(m_Queues.FireAndForgetQuery.front()));
m_Queues.FireAndForgetQuery.pop(); m_Queues.FireAndForgetQuery.pop();
try {
WriteOne(item, yc); WriteOne(item, yc);
} catch (const boost::coroutines::detail::forced_unwind&) {
throw;
} catch (...) {
continue;
}
if (m_Queues.FutureResponseActions.empty() || m_Queues.FutureResponseActions.back().Action != ResponseAction::Ignore) { if (m_Queues.FutureResponseActions.empty() || m_Queues.FutureResponseActions.back().Action != ResponseAction::Ignore) {
m_Queues.FutureResponseActions.emplace(FutureResponseAction{1, ResponseAction::Ignore}); m_Queues.FutureResponseActions.emplace(FutureResponseAction{1, ResponseAction::Ignore});
@ -241,9 +252,15 @@ void RedisConnection::WriteLoop(asio::yield_context& yc)
auto item (std::move(m_Queues.FireAndForgetQueries.front())); auto item (std::move(m_Queues.FireAndForgetQueries.front()));
m_Queues.FireAndForgetQueries.pop(); m_Queues.FireAndForgetQueries.pop();
try {
for (auto& query : item) { for (auto& query : item) {
WriteOne(query, yc); WriteOne(query, yc);
} }
} catch (const boost::coroutines::detail::forced_unwind&) {
throw;
} catch (...) {
continue;
}
if (m_Queues.FutureResponseActions.empty() || m_Queues.FutureResponseActions.back().Action != ResponseAction::Ignore) { if (m_Queues.FutureResponseActions.empty() || m_Queues.FutureResponseActions.back().Action != ResponseAction::Ignore) {
m_Queues.FutureResponseActions.emplace(FutureResponseAction{item.size(), ResponseAction::Ignore}); m_Queues.FutureResponseActions.emplace(FutureResponseAction{item.size(), ResponseAction::Ignore});
@ -265,7 +282,7 @@ void RedisConnection::WriteLoop(asio::yield_context& yc)
throw; throw;
} catch (...) { } catch (...) {
item.second.set_exception(std::current_exception()); item.second.set_exception(std::current_exception());
throw; continue;
} }
m_Queues.ReplyPromises.emplace(std::move(item.second)); m_Queues.ReplyPromises.emplace(std::move(item.second));
@ -292,7 +309,7 @@ void RedisConnection::WriteLoop(asio::yield_context& yc)
throw; throw;
} catch (...) { } catch (...) {
item.second.set_exception(std::current_exception()); item.second.set_exception(std::current_exception());
throw; continue;
} }
m_Queues.RepliesPromises.emplace(std::move(item.second)); m_Queues.RepliesPromises.emplace(std::move(item.second));
@ -310,19 +327,17 @@ void RedisConnection::WriteLoop(asio::yield_context& yc)
RedisConnection::Reply RedisConnection::ReadOne(boost::asio::yield_context& yc) RedisConnection::Reply RedisConnection::ReadOne(boost::asio::yield_context& yc)
{ {
if (m_Path.IsEmpty()) { if (m_Path.IsEmpty()) {
return ReadRESP(*m_TcpConn, yc); return ReadOne(m_TcpConn, yc);
} else { } else {
return ReadRESP(*m_UnixConn, yc); return ReadOne(m_UnixConn, yc);
} }
} }
void RedisConnection::WriteOne(RedisConnection::Query& query, asio::yield_context& yc) void RedisConnection::WriteOne(RedisConnection::Query& query, asio::yield_context& yc)
{ {
if (m_Path.IsEmpty()) { if (m_Path.IsEmpty()) {
WriteRESP(*m_TcpConn, query, yc); WriteOne(m_TcpConn, query, yc);
m_TcpConn->async_flush(yc);
} else { } else {
WriteRESP(*m_UnixConn, query, yc); WriteOne(m_UnixConn, query, yc);
m_UnixConn->async_flush(yc);
} }
} }

View File

@ -117,6 +117,12 @@ namespace icinga
Reply ReadOne(boost::asio::yield_context& yc); Reply ReadOne(boost::asio::yield_context& yc);
void WriteOne(Query& query, boost::asio::yield_context& yc); void WriteOne(Query& query, boost::asio::yield_context& yc);
template<class StreamPtr>
Reply ReadOne(StreamPtr& stream, boost::asio::yield_context& yc);
template<class StreamPtr>
void WriteOne(StreamPtr& stream, Query& query, boost::asio::yield_context& yc);
String m_Path; String m_Path;
String m_Host; String m_Host;
int m_Port; int m_Port;
@ -124,9 +130,9 @@ namespace icinga
int m_DbIndex; int m_DbIndex;
boost::asio::io_context::strand m_Strand; boost::asio::io_context::strand m_Strand;
std::unique_ptr<TcpConn> m_TcpConn; std::shared_ptr<TcpConn> m_TcpConn;
std::unique_ptr<UnixConn> m_UnixConn; std::shared_ptr<UnixConn> m_UnixConn;
Atomic<bool> m_Connecting, m_Connected; Atomic<bool> m_Connecting, m_Connected, m_Started;
struct { struct {
std::queue<Query> FireAndForgetQuery; std::queue<Query> FireAndForgetQuery;
@ -159,6 +165,14 @@ private:
String m_Message; String m_Message;
}; };
class RedisDisconnected : public std::runtime_error
{
public:
inline RedisDisconnected() : runtime_error("")
{
}
};
class RedisProtocolError : public std::runtime_error class RedisProtocolError : public std::runtime_error
{ {
protected: protected:
@ -200,6 +214,51 @@ private:
std::vector<char> m_What; std::vector<char> m_What;
}; };
template<class StreamPtr>
RedisConnection::Reply RedisConnection::ReadOne(StreamPtr& stream, boost::asio::yield_context& yc)
{
if (!stream) {
throw RedisDisconnected();
}
auto strm (stream);
try {
return ReadRESP(*strm, yc);
} catch (const boost::coroutines::detail::forced_unwind&) {
throw;
} catch (...) {
if (m_Connecting.exchange(false)) {
m_Connected.store(false);
stream = nullptr;
}
throw;
}
}
template<class StreamPtr>
void RedisConnection::WriteOne(StreamPtr& stream, RedisConnection::Query& query, boost::asio::yield_context& yc)
{
if (!stream) {
throw RedisDisconnected();
}
auto strm (stream);
try {
WriteRESP(*strm, query, yc);
strm->async_flush(yc);
} catch (const boost::coroutines::detail::forced_unwind&) {
throw;
} catch (...) {
if (m_Connecting.exchange(false)) {
m_Connected.store(false);
stream = nullptr;
}
throw;
}
}
template<class AsyncReadStream> template<class AsyncReadStream>
Value RedisConnection::ReadRESP(AsyncReadStream& stream, boost::asio::yield_context& yc) Value RedisConnection::ReadRESP(AsyncReadStream& stream, boost::asio::yield_context& yc)
{ {