mirror of
https://github.com/Icinga/icinga2.git
synced 2025-07-23 21:55:03 +02:00
Merge pull request #7643 from Icinga/bugfix/icingadb-recover
IcingaDB: handle Redis restarts
This commit is contained in:
commit
8a14f0e6f7
@ -9,9 +9,9 @@
|
|||||||
#include "base/objectlock.hpp"
|
#include "base/objectlock.hpp"
|
||||||
#include "base/string.hpp"
|
#include "base/string.hpp"
|
||||||
#include "base/tcpsocket.hpp"
|
#include "base/tcpsocket.hpp"
|
||||||
#include <boost/asio/post.hpp>
|
#include <boost/asio.hpp>
|
||||||
#include <boost/asio/spawn.hpp>
|
|
||||||
#include <boost/coroutine/exceptions.hpp>
|
#include <boost/coroutine/exceptions.hpp>
|
||||||
|
#include <boost/date_time/posix_time/posix_time_duration.hpp>
|
||||||
#include <boost/utility/string_view.hpp>
|
#include <boost/utility/string_view.hpp>
|
||||||
#include <boost/variant/get.hpp>
|
#include <boost/variant/get.hpp>
|
||||||
#include <exception>
|
#include <exception>
|
||||||
@ -144,32 +144,42 @@ void RedisConnection::Connect(asio::yield_context& yc)
|
|||||||
{
|
{
|
||||||
Defer notConnecting ([this]() { m_Connecting.store(m_Connected.load()); });
|
Defer notConnecting ([this]() { m_Connecting.store(m_Connected.load()); });
|
||||||
|
|
||||||
try {
|
boost::asio::deadline_timer timer (m_Strand.context());
|
||||||
if (m_Path.IsEmpty()) {
|
|
||||||
Log(LogInformation, "IcingaDB")
|
|
||||||
<< "Trying to connect to Redis server (async) on host '" << m_Host << ":" << m_Port << "'";
|
|
||||||
|
|
||||||
decltype(m_TcpConn) conn (new TcpConn(m_Strand.context()));
|
for (;;) {
|
||||||
icinga::Connect(conn->next_layer(), m_Host, Convert::ToString(m_Port), yc);
|
try {
|
||||||
m_TcpConn = std::move(conn);
|
if (m_Path.IsEmpty()) {
|
||||||
} else {
|
Log(LogInformation, "IcingaDB")
|
||||||
Log(LogInformation, "IcingaDB")
|
<< "Trying to connect to Redis server (async) on host '" << m_Host << ":" << m_Port << "'";
|
||||||
<< "Trying to connect to Redis server (async) on unix socket path '" << m_Path << "'";
|
|
||||||
|
|
||||||
decltype(m_UnixConn) conn (new UnixConn(m_Strand.context()));
|
decltype(m_TcpConn) conn (new TcpConn(m_Strand.context()));
|
||||||
conn->next_layer().async_connect(Unix::endpoint(m_Path.CStr()), yc);
|
icinga::Connect(conn->next_layer(), m_Host, Convert::ToString(m_Port), yc);
|
||||||
m_UnixConn = std::move(conn);
|
m_TcpConn = std::move(conn);
|
||||||
|
} else {
|
||||||
|
Log(LogInformation, "IcingaDB")
|
||||||
|
<< "Trying to connect to Redis server (async) on unix socket path '" << m_Path << "'";
|
||||||
|
|
||||||
|
decltype(m_UnixConn) conn (new UnixConn(m_Strand.context()));
|
||||||
|
conn->next_layer().async_connect(Unix::endpoint(m_Path.CStr()), yc);
|
||||||
|
m_UnixConn = std::move(conn);
|
||||||
|
}
|
||||||
|
|
||||||
|
m_Connected.store(true);
|
||||||
|
|
||||||
|
Log(LogInformation, "IcingaDB", "Connected to Redis server");
|
||||||
|
|
||||||
|
break;
|
||||||
|
} catch (const boost::coroutines::detail::forced_unwind&) {
|
||||||
|
throw;
|
||||||
|
} catch (const std::exception& ex) {
|
||||||
|
Log(LogCritical, "IcingaDB")
|
||||||
|
<< "Cannot connect to " << m_Host << ":" << m_Port << ": " << ex.what();
|
||||||
}
|
}
|
||||||
|
|
||||||
m_Connected.store(true);
|
timer.expires_from_now(boost::posix_time::seconds(5));
|
||||||
|
timer.async_wait(yc);
|
||||||
Log(LogInformation, "IcingaDB", "Connected to Redis server");
|
|
||||||
} catch (const boost::coroutines::detail::forced_unwind&) {
|
|
||||||
throw;
|
|
||||||
} catch (const std::exception& ex) {
|
|
||||||
Log(LogCritical, "IcingaDB")
|
|
||||||
<< "Cannot connect to " << m_Host << ":" << m_Port << ": " << ex.what();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void RedisConnection::ReadLoop(asio::yield_context& yc)
|
void RedisConnection::ReadLoop(asio::yield_context& yc)
|
||||||
|
@ -207,6 +207,8 @@ private:
|
|||||||
template<class StreamPtr>
|
template<class StreamPtr>
|
||||||
RedisConnection::Reply RedisConnection::ReadOne(StreamPtr& stream, boost::asio::yield_context& yc)
|
RedisConnection::Reply RedisConnection::ReadOne(StreamPtr& stream, boost::asio::yield_context& yc)
|
||||||
{
|
{
|
||||||
|
namespace asio = boost::asio;
|
||||||
|
|
||||||
if (!stream) {
|
if (!stream) {
|
||||||
throw RedisDisconnected();
|
throw RedisDisconnected();
|
||||||
}
|
}
|
||||||
@ -221,6 +223,12 @@ RedisConnection::Reply RedisConnection::ReadOne(StreamPtr& stream, boost::asio::
|
|||||||
if (m_Connecting.exchange(false)) {
|
if (m_Connecting.exchange(false)) {
|
||||||
m_Connected.store(false);
|
m_Connected.store(false);
|
||||||
stream = nullptr;
|
stream = nullptr;
|
||||||
|
|
||||||
|
if (!m_Connecting.exchange(true)) {
|
||||||
|
Ptr keepAlive (this);
|
||||||
|
|
||||||
|
asio::spawn(m_Strand, [this, keepAlive](asio::yield_context yc) { Connect(yc); });
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
throw;
|
throw;
|
||||||
@ -230,6 +238,8 @@ RedisConnection::Reply RedisConnection::ReadOne(StreamPtr& stream, boost::asio::
|
|||||||
template<class StreamPtr>
|
template<class StreamPtr>
|
||||||
void RedisConnection::WriteOne(StreamPtr& stream, RedisConnection::Query& query, boost::asio::yield_context& yc)
|
void RedisConnection::WriteOne(StreamPtr& stream, RedisConnection::Query& query, boost::asio::yield_context& yc)
|
||||||
{
|
{
|
||||||
|
namespace asio = boost::asio;
|
||||||
|
|
||||||
if (!stream) {
|
if (!stream) {
|
||||||
throw RedisDisconnected();
|
throw RedisDisconnected();
|
||||||
}
|
}
|
||||||
@ -245,6 +255,12 @@ void RedisConnection::WriteOne(StreamPtr& stream, RedisConnection::Query& query,
|
|||||||
if (m_Connecting.exchange(false)) {
|
if (m_Connecting.exchange(false)) {
|
||||||
m_Connected.store(false);
|
m_Connected.store(false);
|
||||||
stream = nullptr;
|
stream = nullptr;
|
||||||
|
|
||||||
|
if (!m_Connecting.exchange(true)) {
|
||||||
|
Ptr keepAlive (this);
|
||||||
|
|
||||||
|
asio::spawn(m_Strand, [this, keepAlive](asio::yield_context yc) { Connect(yc); });
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
throw;
|
throw;
|
||||||
|
Loading…
x
Reference in New Issue
Block a user