RedisConnection: merge write queues

refs #50
This commit is contained in:
Alexander A. Klimov 2019-09-03 15:12:12 +02:00 committed by Michael Friedrich
parent 3b61b3d37a
commit 1d3109458d
2 changed files with 29 additions and 33 deletions

View File

@ -92,10 +92,10 @@ void RedisConnection::FireAndForgetQuery(RedisConnection::Query query)
LogQuery(query, msg);
}
auto item (std::make_shared<decltype(m_Queues.FireAndForgetQuery)::value_type>(std::move(query)));
auto item (std::make_shared<decltype(WriteQueueItem().FireAndForgetQuery)::element_type>(std::move(query)));
asio::post(m_Strand, [this, item]() {
m_Queues.FireAndForgetQuery.emplace(std::move(*item));
m_Queues.Writes.emplace(WriteQueueItem{item, nullptr, nullptr, nullptr});
m_QueuedWrites.Set();
});
}
@ -107,10 +107,10 @@ void RedisConnection::FireAndForgetQueries(RedisConnection::Queries queries)
LogQuery(query, msg);
}
auto item (std::make_shared<decltype(m_Queues.FireAndForgetQueries)::value_type>(std::move(queries)));
auto item (std::make_shared<decltype(WriteQueueItem().FireAndForgetQueries)::element_type>(std::move(queries)));
asio::post(m_Strand, [this, item]() {
m_Queues.FireAndForgetQueries.emplace(std::move(*item));
m_Queues.Writes.emplace(WriteQueueItem{nullptr, item, nullptr, nullptr});
m_QueuedWrites.Set();
});
}
@ -124,10 +124,10 @@ RedisConnection::Reply RedisConnection::GetResultOfQuery(RedisConnection::Query
std::promise<Reply> promise;
auto future (promise.get_future());
auto item (std::make_shared<decltype(m_Queues.GetResultOfQuery)::value_type>(std::move(query), std::move(promise)));
auto item (std::make_shared<decltype(WriteQueueItem().GetResultOfQuery)::element_type>(std::move(query), std::move(promise)));
asio::post(m_Strand, [this, item]() {
m_Queues.GetResultOfQuery.emplace(std::move(*item));
m_Queues.Writes.emplace(WriteQueueItem{nullptr, nullptr, item, nullptr});
m_QueuedWrites.Set();
});
@ -145,10 +145,10 @@ RedisConnection::Replies RedisConnection::GetResultsOfQueries(RedisConnection::Q
std::promise<Replies> promise;
auto future (promise.get_future());
auto item (std::make_shared<decltype(m_Queues.GetResultsOfQueries)::value_type>(std::move(queries), std::move(promise)));
auto item (std::make_shared<decltype(WriteQueueItem().GetResultsOfQueries)::element_type>(std::move(queries), std::move(promise)));
asio::post(m_Strand, [this, item]() {
m_Queues.GetResultsOfQueries.emplace(std::move(*item));
m_Queues.Writes.emplace(WriteQueueItem{nullptr, nullptr, nullptr, item});
m_QueuedWrites.Set();
});
@ -264,14 +264,12 @@ void RedisConnection::WriteLoop(asio::yield_context& yc)
for (;;) {
m_QueuedWrites.Wait(yc);
bool writtenAll = true;
while (!m_Queues.Writes.empty()) {
auto next (std::move(m_Queues.Writes.front()));
m_Queues.Writes.pop();
do {
writtenAll = true;
if (!m_Queues.FireAndForgetQuery.empty()) {
auto item (std::move(m_Queues.FireAndForgetQuery.front()));
m_Queues.FireAndForgetQuery.pop();
if (next.FireAndForgetQuery) {
auto& item (*next.FireAndForgetQuery);
try {
WriteOne(item, yc);
@ -296,12 +294,10 @@ void RedisConnection::WriteLoop(asio::yield_context& yc)
}
m_QueuedReads.Set();
writtenAll = false;
}
if (!m_Queues.FireAndForgetQueries.empty()) {
auto item (std::move(m_Queues.FireAndForgetQueries.front()));
m_Queues.FireAndForgetQueries.pop();
if (next.FireAndForgetQueries) {
auto& item (*next.FireAndForgetQueries);
size_t i = 0;
try {
@ -330,12 +326,10 @@ void RedisConnection::WriteLoop(asio::yield_context& yc)
}
m_QueuedReads.Set();
writtenAll = false;
}
if (!m_Queues.GetResultOfQuery.empty()) {
auto item (std::move(m_Queues.GetResultOfQuery.front()));
m_Queues.GetResultOfQuery.pop();
if (next.GetResultOfQuery) {
auto& item (*next.GetResultOfQuery);
try {
WriteOne(item.first, yc);
@ -355,12 +349,10 @@ void RedisConnection::WriteLoop(asio::yield_context& yc)
}
m_QueuedReads.Set();
writtenAll = false;
}
if (!m_Queues.GetResultsOfQueries.empty()) {
auto item (std::move(m_Queues.GetResultsOfQueries.front()));
m_Queues.GetResultsOfQueries.pop();
if (next.GetResultsOfQueries) {
auto& item (*next.GetResultsOfQueries);
try {
for (auto& query : item.first) {
@ -377,9 +369,8 @@ void RedisConnection::WriteLoop(asio::yield_context& yc)
m_Queues.FutureResponseActions.emplace(FutureResponseAction{item.first.size(), ResponseAction::DeliverBulk});
m_QueuedReads.Set();
writtenAll = false;
}
} while (!writtenAll);
}
m_QueuedWrites.Clear();
}

View File

@ -134,11 +134,16 @@ namespace icinga
std::shared_ptr<UnixConn> m_UnixConn;
Atomic<bool> m_Connecting, m_Connected, m_Started;
struct WriteQueueItem
{
std::shared_ptr<Query> FireAndForgetQuery;
std::shared_ptr<Queries> FireAndForgetQueries;
std::shared_ptr<std::pair<Query, std::promise<Reply>>> GetResultOfQuery;
std::shared_ptr<std::pair<Queries, std::promise<Replies>>> GetResultsOfQueries;
};
struct {
std::queue<Query> FireAndForgetQuery;
std::queue<Queries> FireAndForgetQueries;
std::queue<std::pair<Query, std::promise<Reply>>> GetResultOfQuery;
std::queue<std::pair<Queries, std::promise<Replies>>> GetResultsOfQueries;
std::queue<WriteQueueItem> Writes;
std::queue<std::promise<Reply>> ReplyPromises;
std::queue<std::promise<Replies>> RepliesPromises;
std::queue<FutureResponseAction> FutureResponseActions;