mirror of https://github.com/Icinga/icinga2.git
RedisConnection: don't read the response before the request has been written
refs #50
This commit is contained in:
parent
132b2dcb77
commit
fa5c9ac985
|
@ -206,6 +206,7 @@ void RedisConnection::WriteLoop(asio::yield_context& yc)
|
||||||
if (!m_Queues.FireAndForgetQuery.empty()) {
|
if (!m_Queues.FireAndForgetQuery.empty()) {
|
||||||
auto item (std::move(m_Queues.FireAndForgetQuery.front()));
|
auto item (std::move(m_Queues.FireAndForgetQuery.front()));
|
||||||
m_Queues.FireAndForgetQuery.pop();
|
m_Queues.FireAndForgetQuery.pop();
|
||||||
|
WriteOne(item, yc);
|
||||||
|
|
||||||
if (m_Queues.FutureResponseActions.empty() || m_Queues.FutureResponseActions.back().Action != ResponseAction::Ignore) {
|
if (m_Queues.FutureResponseActions.empty() || m_Queues.FutureResponseActions.back().Action != ResponseAction::Ignore) {
|
||||||
m_Queues.FutureResponseActions.emplace(FutureResponseAction{1, ResponseAction::Ignore});
|
m_Queues.FutureResponseActions.emplace(FutureResponseAction{1, ResponseAction::Ignore});
|
||||||
|
@ -215,14 +216,16 @@ void RedisConnection::WriteLoop(asio::yield_context& yc)
|
||||||
|
|
||||||
m_QueuedReads.Set();
|
m_QueuedReads.Set();
|
||||||
writtenAll = false;
|
writtenAll = false;
|
||||||
|
|
||||||
WriteOne(item, yc);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!m_Queues.FireAndForgetQueries.empty()) {
|
if (!m_Queues.FireAndForgetQueries.empty()) {
|
||||||
auto item (std::move(m_Queues.FireAndForgetQueries.front()));
|
auto item (std::move(m_Queues.FireAndForgetQueries.front()));
|
||||||
m_Queues.FireAndForgetQueries.pop();
|
m_Queues.FireAndForgetQueries.pop();
|
||||||
|
|
||||||
|
for (auto& query : item) {
|
||||||
|
WriteOne(query, yc);
|
||||||
|
}
|
||||||
|
|
||||||
if (m_Queues.FutureResponseActions.empty() || m_Queues.FutureResponseActions.back().Action != ResponseAction::Ignore) {
|
if (m_Queues.FutureResponseActions.empty() || m_Queues.FutureResponseActions.back().Action != ResponseAction::Ignore) {
|
||||||
m_Queues.FutureResponseActions.emplace(FutureResponseAction{item.size(), ResponseAction::Ignore});
|
m_Queues.FutureResponseActions.emplace(FutureResponseAction{item.size(), ResponseAction::Ignore});
|
||||||
} else {
|
} else {
|
||||||
|
@ -231,15 +234,13 @@ void RedisConnection::WriteLoop(asio::yield_context& yc)
|
||||||
|
|
||||||
m_QueuedReads.Set();
|
m_QueuedReads.Set();
|
||||||
writtenAll = false;
|
writtenAll = false;
|
||||||
|
|
||||||
for (auto& query : item) {
|
|
||||||
WriteOne(query, yc);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!m_Queues.GetResultOfQuery.empty()) {
|
if (!m_Queues.GetResultOfQuery.empty()) {
|
||||||
auto item (std::move(m_Queues.GetResultOfQuery.front()));
|
auto item (std::move(m_Queues.GetResultOfQuery.front()));
|
||||||
m_Queues.GetResultOfQuery.pop();
|
m_Queues.GetResultOfQuery.pop();
|
||||||
|
WriteOne(item.first, yc);
|
||||||
|
|
||||||
m_Queues.ReplyPromises.emplace(std::move(item.second));
|
m_Queues.ReplyPromises.emplace(std::move(item.second));
|
||||||
|
|
||||||
if (m_Queues.FutureResponseActions.empty() || m_Queues.FutureResponseActions.back().Action != ResponseAction::Deliver) {
|
if (m_Queues.FutureResponseActions.empty() || m_Queues.FutureResponseActions.back().Action != ResponseAction::Deliver) {
|
||||||
|
@ -250,33 +251,25 @@ void RedisConnection::WriteLoop(asio::yield_context& yc)
|
||||||
|
|
||||||
m_QueuedReads.Set();
|
m_QueuedReads.Set();
|
||||||
writtenAll = false;
|
writtenAll = false;
|
||||||
|
|
||||||
WriteOne(item.first, yc);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!m_Queues.GetResultsOfQueries.empty()) {
|
if (!m_Queues.GetResultsOfQueries.empty()) {
|
||||||
auto item (std::move(m_Queues.GetResultsOfQueries.front()));
|
auto item (std::move(m_Queues.GetResultsOfQueries.front()));
|
||||||
m_Queues.GetResultsOfQueries.pop();
|
m_Queues.GetResultsOfQueries.pop();
|
||||||
|
|
||||||
|
for (auto& query : item.first) {
|
||||||
|
WriteOne(query, yc);
|
||||||
|
}
|
||||||
|
|
||||||
m_Queues.RepliesPromises.emplace(std::move(item.second));
|
m_Queues.RepliesPromises.emplace(std::move(item.second));
|
||||||
m_Queues.FutureResponseActions.emplace(FutureResponseAction{item.first.size(), ResponseAction::DeliverBulk});
|
m_Queues.FutureResponseActions.emplace(FutureResponseAction{item.first.size(), ResponseAction::DeliverBulk});
|
||||||
|
|
||||||
m_QueuedReads.Set();
|
m_QueuedReads.Set();
|
||||||
writtenAll = false;
|
writtenAll = false;
|
||||||
|
|
||||||
for (auto& query : item.first) {
|
|
||||||
WriteOne(query, yc);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
} while (!writtenAll);
|
} while (!writtenAll);
|
||||||
|
|
||||||
m_QueuedWrites.Clear();
|
m_QueuedWrites.Clear();
|
||||||
|
|
||||||
if (m_Path.IsEmpty()) {
|
|
||||||
m_TcpConn->async_flush(yc);
|
|
||||||
} else {
|
|
||||||
m_UnixConn->async_flush(yc);
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -293,7 +286,9 @@ void RedisConnection::WriteOne(RedisConnection::Query& query, asio::yield_contex
|
||||||
{
|
{
|
||||||
if (m_Path.IsEmpty()) {
|
if (m_Path.IsEmpty()) {
|
||||||
WriteRESP(*m_TcpConn, query, yc);
|
WriteRESP(*m_TcpConn, query, yc);
|
||||||
|
m_TcpConn->async_flush(yc);
|
||||||
} else {
|
} else {
|
||||||
WriteRESP(*m_UnixConn, query, yc);
|
WriteRESP(*m_UnixConn, query, yc);
|
||||||
|
m_UnixConn->async_flush(yc);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue