diff --git a/lib/redis/redisconnection.cpp b/lib/redis/redisconnection.cpp index 080b64c4c..86187d2fc 100644 --- a/lib/redis/redisconnection.cpp +++ b/lib/redis/redisconnection.cpp @@ -206,6 +206,7 @@ void RedisConnection::WriteLoop(asio::yield_context& yc) if (!m_Queues.FireAndForgetQuery.empty()) { auto item (std::move(m_Queues.FireAndForgetQuery.front())); m_Queues.FireAndForgetQuery.pop(); + WriteOne(item, yc); if (m_Queues.FutureResponseActions.empty() || m_Queues.FutureResponseActions.back().Action != ResponseAction::Ignore) { m_Queues.FutureResponseActions.emplace(FutureResponseAction{1, ResponseAction::Ignore}); @@ -215,14 +216,16 @@ void RedisConnection::WriteLoop(asio::yield_context& yc) m_QueuedReads.Set(); writtenAll = false; - - WriteOne(item, yc); } if (!m_Queues.FireAndForgetQueries.empty()) { auto item (std::move(m_Queues.FireAndForgetQueries.front())); m_Queues.FireAndForgetQueries.pop(); + for (auto& query : item) { + WriteOne(query, yc); + } + if (m_Queues.FutureResponseActions.empty() || m_Queues.FutureResponseActions.back().Action != ResponseAction::Ignore) { m_Queues.FutureResponseActions.emplace(FutureResponseAction{item.size(), ResponseAction::Ignore}); } else { @@ -231,15 +234,13 @@ void RedisConnection::WriteLoop(asio::yield_context& yc) m_QueuedReads.Set(); writtenAll = false; - - for (auto& query : item) { - WriteOne(query, yc); - } } if (!m_Queues.GetResultOfQuery.empty()) { auto item (std::move(m_Queues.GetResultOfQuery.front())); m_Queues.GetResultOfQuery.pop(); + WriteOne(item.first, yc); + m_Queues.ReplyPromises.emplace(std::move(item.second)); if (m_Queues.FutureResponseActions.empty() || m_Queues.FutureResponseActions.back().Action != ResponseAction::Deliver) { @@ -250,33 +251,25 @@ void RedisConnection::WriteLoop(asio::yield_context& yc) m_QueuedReads.Set(); writtenAll = false; - - WriteOne(item.first, yc); } if (!m_Queues.GetResultsOfQueries.empty()) { auto item (std::move(m_Queues.GetResultsOfQueries.front())); m_Queues.GetResultsOfQueries.pop(); + + for (auto& query : item.first) { + WriteOne(query, yc); + } + m_Queues.RepliesPromises.emplace(std::move(item.second)); m_Queues.FutureResponseActions.emplace(FutureResponseAction{item.first.size(), ResponseAction::DeliverBulk}); m_QueuedReads.Set(); writtenAll = false; - - for (auto& query : item.first) { - WriteOne(query, yc); - } } } while (!writtenAll); m_QueuedWrites.Clear(); - - if (m_Path.IsEmpty()) { - m_TcpConn->async_flush(yc); - } else { - m_UnixConn->async_flush(yc); - } - } } @@ -293,7 +286,9 @@ void RedisConnection::WriteOne(RedisConnection::Query& query, asio::yield_contex { if (m_Path.IsEmpty()) { WriteRESP(*m_TcpConn, query, yc); + m_TcpConn->async_flush(yc); } else { WriteRESP(*m_UnixConn, query, yc); + m_UnixConn->async_flush(yc); } }