RedisConnection: introduce high-priority queue

refs #57
This commit is contained in:
Alexander A. Klimov 2019-10-31 14:28:27 +01:00 committed by Michael Friedrich
parent 91ecfc35cf
commit 67909210a6
2 changed files with 144 additions and 127 deletions

View File

@ -85,7 +85,7 @@ void LogQuery(RedisConnection::Query& query, Log& msg)
}
}
void RedisConnection::FireAndForgetQuery(RedisConnection::Query query)
void RedisConnection::FireAndForgetQuery(RedisConnection::Query query, bool highPrio)
{
{
Log msg (LogNotice, "IcingaDB", "Firing and forgetting query:");
@ -94,13 +94,13 @@ void RedisConnection::FireAndForgetQuery(RedisConnection::Query query)
auto item (std::make_shared<decltype(WriteQueueItem().FireAndForgetQuery)::element_type>(std::move(query)));
asio::post(m_Strand, [this, item]() {
m_Queues.Writes.emplace(WriteQueueItem{item, nullptr, nullptr, nullptr});
asio::post(m_Strand, [this, item, highPrio]() {
(highPrio ? &m_Queues.HighPrioWrites : &m_Queues.Writes)->emplace(WriteQueueItem{item, nullptr, nullptr, nullptr});
m_QueuedWrites.Set();
});
}
void RedisConnection::FireAndForgetQueries(RedisConnection::Queries queries)
void RedisConnection::FireAndForgetQueries(RedisConnection::Queries queries, bool highPrio)
{
for (auto& query : queries) {
Log msg (LogNotice, "IcingaDB", "Firing and forgetting query:");
@ -109,13 +109,13 @@ void RedisConnection::FireAndForgetQueries(RedisConnection::Queries queries)
auto item (std::make_shared<decltype(WriteQueueItem().FireAndForgetQueries)::element_type>(std::move(queries)));
asio::post(m_Strand, [this, item]() {
m_Queues.Writes.emplace(WriteQueueItem{nullptr, item, nullptr, nullptr});
asio::post(m_Strand, [this, item, highPrio]() {
(highPrio ? &m_Queues.HighPrioWrites : &m_Queues.Writes)->emplace(WriteQueueItem{nullptr, item, nullptr, nullptr});
m_QueuedWrites.Set();
});
}
RedisConnection::Reply RedisConnection::GetResultOfQuery(RedisConnection::Query query)
RedisConnection::Reply RedisConnection::GetResultOfQuery(RedisConnection::Query query, bool highPrio)
{
{
Log msg (LogNotice, "IcingaDB", "Executing query:");
@ -126,8 +126,8 @@ RedisConnection::Reply RedisConnection::GetResultOfQuery(RedisConnection::Query
auto future (promise.get_future());
auto item (std::make_shared<decltype(WriteQueueItem().GetResultOfQuery)::element_type>(std::move(query), std::move(promise)));
asio::post(m_Strand, [this, item]() {
m_Queues.Writes.emplace(WriteQueueItem{nullptr, nullptr, item, nullptr});
asio::post(m_Strand, [this, item, highPrio]() {
(highPrio ? &m_Queues.HighPrioWrites : &m_Queues.Writes)->emplace(WriteQueueItem{nullptr, nullptr, item, nullptr});
m_QueuedWrites.Set();
});
@ -136,7 +136,7 @@ RedisConnection::Reply RedisConnection::GetResultOfQuery(RedisConnection::Query
return future.get();
}
RedisConnection::Replies RedisConnection::GetResultsOfQueries(RedisConnection::Queries queries)
RedisConnection::Replies RedisConnection::GetResultsOfQueries(RedisConnection::Queries queries, bool highPrio)
{
for (auto& query : queries) {
Log msg (LogNotice, "IcingaDB", "Executing query:");
@ -147,8 +147,8 @@ RedisConnection::Replies RedisConnection::GetResultsOfQueries(RedisConnection::Q
auto future (promise.get_future());
auto item (std::make_shared<decltype(WriteQueueItem().GetResultsOfQueries)::element_type>(std::move(queries), std::move(promise)));
asio::post(m_Strand, [this, item]() {
m_Queues.Writes.emplace(WriteQueueItem{nullptr, nullptr, nullptr, item});
asio::post(m_Strand, [this, item, highPrio]() {
(highPrio ? &m_Queues.HighPrioWrites : &m_Queues.Writes)->emplace(WriteQueueItem{nullptr, nullptr, nullptr, item});
m_QueuedWrites.Set();
});
@ -264,111 +264,21 @@ void RedisConnection::WriteLoop(asio::yield_context& yc)
for (;;) {
m_QueuedWrites.Wait(yc);
while (!m_Queues.Writes.empty()) {
auto next (std::move(m_Queues.Writes.front()));
m_Queues.Writes.pop();
if (next.FireAndForgetQuery) {
auto& item (*next.FireAndForgetQuery);
try {
WriteOne(item, yc);
} catch (const boost::coroutines::detail::forced_unwind&) {
throw;
} catch (const std::exception& ex) {
Log msg (LogCritical, "IcingaDB", "Error during sending query");
LogQuery(item, msg);
msg << " which has been fired and forgotten: " << ex.what();
continue;
} catch (...) {
Log msg (LogCritical, "IcingaDB", "Error during sending query");
LogQuery(item, msg);
msg << " which has been fired and forgotten";
continue;
}
if (m_Queues.FutureResponseActions.empty() || m_Queues.FutureResponseActions.back().Action != ResponseAction::Ignore) {
m_Queues.FutureResponseActions.emplace(FutureResponseAction{1, ResponseAction::Ignore});
for (;;) {
if (m_Queues.HighPrioWrites.empty()) {
if (m_Queues.Writes.empty()) {
break;
} else {
++m_Queues.FutureResponseActions.back().Amount;
auto next (std::move(m_Queues.Writes.front()));
m_Queues.Writes.pop();
WriteItem(yc, std::move(next));
}
} else {
auto next (std::move(m_Queues.HighPrioWrites.front()));
m_Queues.HighPrioWrites.pop();
m_QueuedReads.Set();
}
if (next.FireAndForgetQueries) {
auto& item (*next.FireAndForgetQueries);
size_t i = 0;
try {
for (auto& query : item) {
WriteOne(query, yc);
++i;
}
} catch (const boost::coroutines::detail::forced_unwind&) {
throw;
} catch (const std::exception& ex) {
Log msg (LogCritical, "IcingaDB", "Error during sending query");
LogQuery(item[i], msg);
msg << " which has been fired and forgotten: " << ex.what();
continue;
} catch (...) {
Log msg (LogCritical, "IcingaDB", "Error during sending query");
LogQuery(item[i], msg);
msg << " which has been fired and forgotten";
continue;
}
if (m_Queues.FutureResponseActions.empty() || m_Queues.FutureResponseActions.back().Action != ResponseAction::Ignore) {
m_Queues.FutureResponseActions.emplace(FutureResponseAction{item.size(), ResponseAction::Ignore});
} else {
m_Queues.FutureResponseActions.back().Amount += item.size();
}
m_QueuedReads.Set();
}
if (next.GetResultOfQuery) {
auto& item (*next.GetResultOfQuery);
try {
WriteOne(item.first, yc);
} catch (const boost::coroutines::detail::forced_unwind&) {
throw;
} catch (...) {
item.second.set_exception(std::current_exception());
continue;
}
m_Queues.ReplyPromises.emplace(std::move(item.second));
if (m_Queues.FutureResponseActions.empty() || m_Queues.FutureResponseActions.back().Action != ResponseAction::Deliver) {
m_Queues.FutureResponseActions.emplace(FutureResponseAction{1, ResponseAction::Deliver});
} else {
++m_Queues.FutureResponseActions.back().Amount;
}
m_QueuedReads.Set();
}
if (next.GetResultsOfQueries) {
auto& item (*next.GetResultsOfQueries);
try {
for (auto& query : item.first) {
WriteOne(query, yc);
}
} catch (const boost::coroutines::detail::forced_unwind&) {
throw;
} catch (...) {
item.second.set_exception(std::current_exception());
continue;
}
m_Queues.RepliesPromises.emplace(std::move(item.second));
m_Queues.FutureResponseActions.emplace(FutureResponseAction{item.first.size(), ResponseAction::DeliverBulk});
m_QueuedReads.Set();
WriteItem(yc, std::move(next));
}
}
@ -376,6 +286,112 @@ void RedisConnection::WriteLoop(asio::yield_context& yc)
}
}
void RedisConnection::WriteItem(boost::asio::yield_context& yc, RedisConnection::WriteQueueItem next)
{
if (next.FireAndForgetQuery) {
auto& item (*next.FireAndForgetQuery);
try {
WriteOne(item, yc);
} catch (const boost::coroutines::detail::forced_unwind&) {
throw;
} catch (const std::exception& ex) {
Log msg (LogCritical, "IcingaDB", "Error during sending query");
LogQuery(item, msg);
msg << " which has been fired and forgotten: " << ex.what();
return;
} catch (...) {
Log msg (LogCritical, "IcingaDB", "Error during sending query");
LogQuery(item, msg);
msg << " which has been fired and forgotten";
return;
}
if (m_Queues.FutureResponseActions.empty() || m_Queues.FutureResponseActions.back().Action != ResponseAction::Ignore) {
m_Queues.FutureResponseActions.emplace(FutureResponseAction{1, ResponseAction::Ignore});
} else {
++m_Queues.FutureResponseActions.back().Amount;
}
m_QueuedReads.Set();
}
if (next.FireAndForgetQueries) {
auto& item (*next.FireAndForgetQueries);
size_t i = 0;
try {
for (auto& query : item) {
WriteOne(query, yc);
++i;
}
} catch (const boost::coroutines::detail::forced_unwind&) {
throw;
} catch (const std::exception& ex) {
Log msg (LogCritical, "IcingaDB", "Error during sending query");
LogQuery(item[i], msg);
msg << " which has been fired and forgotten: " << ex.what();
return;
} catch (...) {
Log msg (LogCritical, "IcingaDB", "Error during sending query");
LogQuery(item[i], msg);
msg << " which has been fired and forgotten";
return;
}
if (m_Queues.FutureResponseActions.empty() || m_Queues.FutureResponseActions.back().Action != ResponseAction::Ignore) {
m_Queues.FutureResponseActions.emplace(FutureResponseAction{item.size(), ResponseAction::Ignore});
} else {
m_Queues.FutureResponseActions.back().Amount += item.size();
}
m_QueuedReads.Set();
}
if (next.GetResultOfQuery) {
auto& item (*next.GetResultOfQuery);
try {
WriteOne(item.first, yc);
} catch (const boost::coroutines::detail::forced_unwind&) {
throw;
} catch (...) {
item.second.set_exception(std::current_exception());
return;
}
m_Queues.ReplyPromises.emplace(std::move(item.second));
if (m_Queues.FutureResponseActions.empty() || m_Queues.FutureResponseActions.back().Action != ResponseAction::Deliver) {
m_Queues.FutureResponseActions.emplace(FutureResponseAction{1, ResponseAction::Deliver});
} else {
++m_Queues.FutureResponseActions.back().Amount;
}
m_QueuedReads.Set();
}
if (next.GetResultsOfQueries) {
auto& item (*next.GetResultsOfQueries);
try {
for (auto& query : item.first) {
WriteOne(query, yc);
}
} catch (const boost::coroutines::detail::forced_unwind&) {
throw;
} catch (...) {
item.second.set_exception(std::current_exception());
return;
}
m_Queues.RepliesPromises.emplace(std::move(item.second));
m_Queues.FutureResponseActions.emplace(FutureResponseAction{item.first.size(), ResponseAction::DeliverBulk});
m_QueuedReads.Set();
}
}
RedisConnection::Reply RedisConnection::ReadOne(boost::asio::yield_context& yc)
{
if (m_Path.IsEmpty()) {

View File

@ -73,11 +73,11 @@ namespace icinga
bool IsConnected();
void FireAndForgetQuery(Query query);
void FireAndForgetQueries(Queries queries);
void FireAndForgetQuery(Query query, bool highPrio = false);
void FireAndForgetQueries(Queries queries, bool highPrio = false);
Reply GetResultOfQuery(Query query);
Replies GetResultsOfQueries(Queries queries);
Reply GetResultOfQuery(Query query, bool highPrio = false);
Replies GetResultsOfQueries(Queries queries, bool highPrio = false);
private:
enum class ResponseAction : unsigned char
@ -91,6 +91,14 @@ namespace icinga
ResponseAction Action;
};
struct WriteQueueItem
{
std::shared_ptr<Query> FireAndForgetQuery;
std::shared_ptr<Queries> FireAndForgetQueries;
std::shared_ptr<std::pair<Query, std::promise<Reply>>> GetResultOfQuery;
std::shared_ptr<std::pair<Queries, std::promise<Replies>>> GetResultsOfQueries;
};
typedef boost::asio::ip::tcp Tcp;
typedef boost::asio::local::stream_protocol Unix;
@ -114,6 +122,7 @@ namespace icinga
void Connect(boost::asio::yield_context& yc);
void ReadLoop(boost::asio::yield_context& yc);
void WriteLoop(boost::asio::yield_context& yc);
void WriteItem(boost::asio::yield_context& yc, WriteQueueItem item);
Reply ReadOne(boost::asio::yield_context& yc);
void WriteOne(Query& query, boost::asio::yield_context& yc);
@ -134,16 +143,8 @@ namespace icinga
std::shared_ptr<UnixConn> m_UnixConn;
Atomic<bool> m_Connecting, m_Connected, m_Started;
struct WriteQueueItem
{
std::shared_ptr<Query> FireAndForgetQuery;
std::shared_ptr<Queries> FireAndForgetQueries;
std::shared_ptr<std::pair<Query, std::promise<Reply>>> GetResultOfQuery;
std::shared_ptr<std::pair<Queries, std::promise<Replies>>> GetResultsOfQueries;
};
struct {
std::queue<WriteQueueItem> Writes;
std::queue<WriteQueueItem> Writes, HighPrioWrites;
std::queue<std::promise<Reply>> ReplyPromises;
std::queue<std::promise<Replies>> RepliesPromises;
std::queue<FutureResponseAction> FutureResponseActions;