mirror of
https://github.com/Icinga/icinga2.git
synced 2025-07-25 22:54:57 +02:00
parent
430c769371
commit
3b61b3d37a
@ -70,8 +70,28 @@ bool RedisConnection::IsConnected() {
|
|||||||
return m_Connected.load();
|
return m_Connected.load();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static inline
|
||||||
|
void LogQuery(RedisConnection::Query& query, Log& msg)
|
||||||
|
{
|
||||||
|
int i = 0;
|
||||||
|
|
||||||
|
for (auto& arg : query) {
|
||||||
|
if (++i == 8) {
|
||||||
|
msg << " ...";
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
msg << " '" << arg << '\'';
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
void RedisConnection::FireAndForgetQuery(RedisConnection::Query query)
|
void RedisConnection::FireAndForgetQuery(RedisConnection::Query query)
|
||||||
{
|
{
|
||||||
|
{
|
||||||
|
Log msg (LogNotice, "RedisWriter", "Firing and forgetting query:");
|
||||||
|
LogQuery(query, msg);
|
||||||
|
}
|
||||||
|
|
||||||
auto item (std::make_shared<decltype(m_Queues.FireAndForgetQuery)::value_type>(std::move(query)));
|
auto item (std::make_shared<decltype(m_Queues.FireAndForgetQuery)::value_type>(std::move(query)));
|
||||||
|
|
||||||
asio::post(m_Strand, [this, item]() {
|
asio::post(m_Strand, [this, item]() {
|
||||||
@ -82,6 +102,11 @@ void RedisConnection::FireAndForgetQuery(RedisConnection::Query query)
|
|||||||
|
|
||||||
void RedisConnection::FireAndForgetQueries(RedisConnection::Queries queries)
|
void RedisConnection::FireAndForgetQueries(RedisConnection::Queries queries)
|
||||||
{
|
{
|
||||||
|
for (auto& query : queries) {
|
||||||
|
Log msg (LogNotice, "RedisWriter", "Firing and forgetting query:");
|
||||||
|
LogQuery(query, msg);
|
||||||
|
}
|
||||||
|
|
||||||
auto item (std::make_shared<decltype(m_Queues.FireAndForgetQueries)::value_type>(std::move(queries)));
|
auto item (std::make_shared<decltype(m_Queues.FireAndForgetQueries)::value_type>(std::move(queries)));
|
||||||
|
|
||||||
asio::post(m_Strand, [this, item]() {
|
asio::post(m_Strand, [this, item]() {
|
||||||
@ -92,6 +117,11 @@ void RedisConnection::FireAndForgetQueries(RedisConnection::Queries queries)
|
|||||||
|
|
||||||
RedisConnection::Reply RedisConnection::GetResultOfQuery(RedisConnection::Query query)
|
RedisConnection::Reply RedisConnection::GetResultOfQuery(RedisConnection::Query query)
|
||||||
{
|
{
|
||||||
|
{
|
||||||
|
Log msg (LogNotice, "RedisWriter", "Executing query:");
|
||||||
|
LogQuery(query, msg);
|
||||||
|
}
|
||||||
|
|
||||||
std::promise<Reply> promise;
|
std::promise<Reply> promise;
|
||||||
auto future (promise.get_future());
|
auto future (promise.get_future());
|
||||||
auto item (std::make_shared<decltype(m_Queues.GetResultOfQuery)::value_type>(std::move(query), std::move(promise)));
|
auto item (std::make_shared<decltype(m_Queues.GetResultOfQuery)::value_type>(std::move(query), std::move(promise)));
|
||||||
@ -108,6 +138,11 @@ RedisConnection::Reply RedisConnection::GetResultOfQuery(RedisConnection::Query
|
|||||||
|
|
||||||
RedisConnection::Replies RedisConnection::GetResultsOfQueries(RedisConnection::Queries queries)
|
RedisConnection::Replies RedisConnection::GetResultsOfQueries(RedisConnection::Queries queries)
|
||||||
{
|
{
|
||||||
|
for (auto& query : queries) {
|
||||||
|
Log msg (LogNotice, "RedisWriter", "Executing query:");
|
||||||
|
LogQuery(query, msg);
|
||||||
|
}
|
||||||
|
|
||||||
std::promise<Replies> promise;
|
std::promise<Replies> promise;
|
||||||
auto future (promise.get_future());
|
auto future (promise.get_future());
|
||||||
auto item (std::make_shared<decltype(m_Queues.GetResultsOfQueries)::value_type>(std::move(queries), std::move(promise)));
|
auto item (std::make_shared<decltype(m_Queues.GetResultsOfQueries)::value_type>(std::move(queries), std::move(promise)));
|
||||||
@ -140,6 +175,8 @@ void RedisConnection::Connect(asio::yield_context& yc)
|
|||||||
}
|
}
|
||||||
|
|
||||||
m_Connected.store(true);
|
m_Connected.store(true);
|
||||||
|
|
||||||
|
Log(LogInformation, "RedisWriter", "Connected to Redis server");
|
||||||
} catch (const boost::coroutines::detail::forced_unwind&) {
|
} catch (const boost::coroutines::detail::forced_unwind&) {
|
||||||
throw;
|
throw;
|
||||||
} catch (const std::exception& ex) {
|
} catch (const std::exception& ex) {
|
||||||
@ -165,7 +202,13 @@ void RedisConnection::ReadLoop(asio::yield_context& yc)
|
|||||||
}
|
}
|
||||||
} catch (const boost::coroutines::detail::forced_unwind&) {
|
} catch (const boost::coroutines::detail::forced_unwind&) {
|
||||||
throw;
|
throw;
|
||||||
|
} catch (const std::exception& ex) {
|
||||||
|
Log(LogCritical, "RedisWriter")
|
||||||
|
<< "Error during receiving the response to a query which has been fired and forgotten: " << ex.what();
|
||||||
|
continue;
|
||||||
} catch (...) {
|
} catch (...) {
|
||||||
|
Log(LogCritical, "RedisWriter")
|
||||||
|
<< "Error during receiving the response to a query which has been fired and forgotten";
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
@ -234,7 +277,15 @@ void RedisConnection::WriteLoop(asio::yield_context& yc)
|
|||||||
WriteOne(item, yc);
|
WriteOne(item, yc);
|
||||||
} catch (const boost::coroutines::detail::forced_unwind&) {
|
} catch (const boost::coroutines::detail::forced_unwind&) {
|
||||||
throw;
|
throw;
|
||||||
|
} catch (const std::exception& ex) {
|
||||||
|
Log msg (LogCritical, "RedisWriter", "Error during sending query");
|
||||||
|
LogQuery(item, msg);
|
||||||
|
msg << " which has been fired and forgotten: " << ex.what();
|
||||||
|
continue;
|
||||||
} catch (...) {
|
} catch (...) {
|
||||||
|
Log msg (LogCritical, "RedisWriter", "Error during sending query");
|
||||||
|
LogQuery(item, msg);
|
||||||
|
msg << " which has been fired and forgotten";
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -251,14 +302,24 @@ void RedisConnection::WriteLoop(asio::yield_context& yc)
|
|||||||
if (!m_Queues.FireAndForgetQueries.empty()) {
|
if (!m_Queues.FireAndForgetQueries.empty()) {
|
||||||
auto item (std::move(m_Queues.FireAndForgetQueries.front()));
|
auto item (std::move(m_Queues.FireAndForgetQueries.front()));
|
||||||
m_Queues.FireAndForgetQueries.pop();
|
m_Queues.FireAndForgetQueries.pop();
|
||||||
|
size_t i = 0;
|
||||||
|
|
||||||
try {
|
try {
|
||||||
for (auto& query : item) {
|
for (auto& query : item) {
|
||||||
WriteOne(query, yc);
|
WriteOne(query, yc);
|
||||||
|
++i;
|
||||||
}
|
}
|
||||||
} catch (const boost::coroutines::detail::forced_unwind&) {
|
} catch (const boost::coroutines::detail::forced_unwind&) {
|
||||||
throw;
|
throw;
|
||||||
|
} catch (const std::exception& ex) {
|
||||||
|
Log msg (LogCritical, "RedisWriter", "Error during sending query");
|
||||||
|
LogQuery(item[i], msg);
|
||||||
|
msg << " which has been fired and forgotten: " << ex.what();
|
||||||
|
continue;
|
||||||
} catch (...) {
|
} catch (...) {
|
||||||
|
Log msg (LogCritical, "RedisWriter", "Error during sending query");
|
||||||
|
LogQuery(item[i], msg);
|
||||||
|
msg << " which has been fired and forgotten";
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user