RedisConnection: forward I/O errors to async-ly waiting requestors

refs #50
This commit is contained in:
Alexander A. Klimov 2019-08-07 14:15:01 +02:00 committed by Michael Friedrich
parent fa5c9ac985
commit 06d88477bd
1 changed files with 39 additions and 5 deletions

View File

@ -31,6 +31,7 @@
#include <boost/coroutine/exceptions.hpp>
#include <boost/utility/string_view.hpp>
#include <boost/variant/get.hpp>
#include <exception>
#include <iterator>
#include <memory>
#include <utility>
@ -169,7 +170,18 @@ void RedisConnection::ReadLoop(asio::yield_context& yc)
auto promise (std::move(m_Queues.ReplyPromises.front()));
m_Queues.ReplyPromises.pop();
promise.set_value(ReadOne(yc));
Reply reply;
try {
reply = ReadOne(yc);
} catch (const boost::coroutines::detail::forced_unwind&) {
throw;
} catch (...) {
promise.set_exception(std::current_exception());
throw;
}
promise.set_value(std::move(reply));
}
break;
case ResponseAction::DeliverBulk:
@ -181,7 +193,14 @@ void RedisConnection::ReadLoop(asio::yield_context& yc)
replies.reserve(item.Amount);
for (auto i (item.Amount); i; --i) {
replies.emplace_back(ReadOne(yc));
try {
replies.emplace_back(ReadOne(yc));
} catch (const boost::coroutines::detail::forced_unwind&) {
throw;
} catch (...) {
promise.set_exception(std::current_exception());
throw;
}
}
promise.set_value(std::move(replies));
@ -239,7 +258,15 @@ void RedisConnection::WriteLoop(asio::yield_context& yc)
if (!m_Queues.GetResultOfQuery.empty()) {
auto item (std::move(m_Queues.GetResultOfQuery.front()));
m_Queues.GetResultOfQuery.pop();
WriteOne(item.first, yc);
try {
WriteOne(item.first, yc);
} catch (const boost::coroutines::detail::forced_unwind&) {
throw;
} catch (...) {
item.second.set_exception(std::current_exception());
throw;
}
m_Queues.ReplyPromises.emplace(std::move(item.second));
@ -257,8 +284,15 @@ void RedisConnection::WriteLoop(asio::yield_context& yc)
auto item (std::move(m_Queues.GetResultsOfQueries.front()));
m_Queues.GetResultsOfQueries.pop();
for (auto& query : item.first) {
WriteOne(query, yc);
try {
for (auto& query : item.first) {
WriteOne(query, yc);
}
} catch (const boost::coroutines::detail::forced_unwind&) {
throw;
} catch (...) {
item.second.set_exception(std::current_exception());
throw;
}
m_Queues.RepliesPromises.emplace(std::move(item.second));