ido_mysql: Automatically reconnect when necessary.

This commit is contained in:
Gunnar Beutner 2013-07-19 10:18:47 +02:00
parent f42f4fbdb2
commit 8a99019146
4 changed files with 80 additions and 50 deletions

View File

@ -23,7 +23,7 @@ libido_mysql_la_CPPFLAGS = \
libido_mysql_la_LDFLAGS = \ libido_mysql_la_LDFLAGS = \
$(BOOST_LDFLAGS) \ $(BOOST_LDFLAGS) \
$(MYSQL_LDFLAGS) \ $(MYSQLR_LDFLAGS) \
-module \ -module \
-no-undefined \ -no-undefined \
@RELEASE_INFO@ \ @RELEASE_INFO@ \

View File

@ -31,7 +31,7 @@ using namespace icinga;
REGISTER_TYPE(MysqlDbConnection); REGISTER_TYPE(MysqlDbConnection);
MysqlDbConnection::MysqlDbConnection(const Dictionary::Ptr& serializedUpdate) MysqlDbConnection::MysqlDbConnection(const Dictionary::Ptr& serializedUpdate)
: DbConnection(serializedUpdate) : DbConnection(serializedUpdate), m_Connected(false)
{ {
RegisterAttribute("host", Attribute_Config, &m_Host); RegisterAttribute("host", Attribute_Config, &m_Host);
RegisterAttribute("port", Attribute_Config, &m_Port); RegisterAttribute("port", Attribute_Config, &m_Port);
@ -47,7 +47,44 @@ MysqlDbConnection::MysqlDbConnection(const Dictionary::Ptr& serializedUpdate)
m_TxTimer->OnTimerExpired.connect(boost::bind(&MysqlDbConnection::TxTimerHandler, this)); m_TxTimer->OnTimerExpired.connect(boost::bind(&MysqlDbConnection::TxTimerHandler, this));
m_TxTimer->Start(); m_TxTimer->Start();
/* TODO: move this to a timer so we can periodically check if we're still connected - and reconnect if necessary */ m_ReconnectTimer = boost::make_shared<Timer>();
m_ReconnectTimer->SetInterval(10);
m_ReconnectTimer->OnTimerExpired.connect(boost::bind(&MysqlDbConnection::ReconnectTimerHandler, this));
m_ReconnectTimer->Start();
ASSERT(mysql_thread_safe());
}
void MysqlDbConnection::Stop(void)
{
boost::mutex::scoped_lock lock(m_ConnectionMutex);
mysql_close(&m_Connection);
}
void MysqlDbConnection::TxTimerHandler(void)
{
boost::mutex::scoped_lock lock(m_ConnectionMutex);
if (!m_Connected)
return;
Query("COMMIT");
Query("BEGIN");
}
void MysqlDbConnection::ReconnectTimerHandler(void)
{
boost::mutex::scoped_lock lock(m_ConnectionMutex);
if (m_Connected) {
/* Check if we're really still connected */
if (mysql_ping(&m_Connection) == 0)
return;
mysql_close(&m_Connection);
m_Connected = false;
}
String ihost, iuser, ipasswd, idb; String ihost, iuser, ipasswd, idb;
const char *host, *user , *passwd, *db; const char *host, *user , *passwd, *db;
long port; long port;
@ -63,50 +100,48 @@ MysqlDbConnection::MysqlDbConnection(const Dictionary::Ptr& serializedUpdate)
passwd = (!ipasswd.IsEmpty()) ? ipasswd.CStr() : NULL; passwd = (!ipasswd.IsEmpty()) ? ipasswd.CStr() : NULL;
db = (!idb.IsEmpty()) ? idb.CStr() : NULL; db = (!idb.IsEmpty()) ? idb.CStr() : NULL;
{ if (!mysql_init(&m_Connection))
boost::mutex::scoped_lock lock(m_ConnectionMutex); BOOST_THROW_EXCEPTION(std::bad_alloc());
if (!mysql_init(&m_Connection)) if (!mysql_real_connect(&m_Connection, host, user, passwd, db, port, NULL, 0))
BOOST_THROW_EXCEPTION(std::bad_alloc()); BOOST_THROW_EXCEPTION(std::runtime_error(mysql_error(&m_Connection)));
if (!mysql_real_connect(&m_Connection, host, user, passwd, db, port, NULL, 0)) m_Connected = true;
BOOST_THROW_EXCEPTION(std::runtime_error(mysql_error(&m_Connection)));
String instanceName = "default"; String instanceName = "default";
if (!m_InstanceName.IsEmpty()) if (!m_InstanceName.IsEmpty())
instanceName = m_InstanceName; instanceName = m_InstanceName;
Array::Ptr rows = Query("SELECT instance_id FROM icinga_instances WHERE instance_name = '" + Escape(instanceName) + "'"); Array::Ptr rows = Query("SELECT instance_id FROM icinga_instances WHERE instance_name = '" + Escape(instanceName) + "'");
if (rows->GetLength() == 0) { if (rows->GetLength() == 0) {
Query("INSERT INTO icinga_instances (instance_name, instance_description) VALUES ('" + Escape(instanceName) + "', '" + m_InstanceDescription + "')"); Query("INSERT INTO icinga_instances (instance_name, instance_description) VALUES ('" + Escape(instanceName) + "', '" + m_InstanceDescription + "')");
m_InstanceID = GetInsertID(); m_InstanceID = GetInsertID();
} else { } else {
Dictionary::Ptr row = rows->Get(0); Dictionary::Ptr row = rows->Get(0);
m_InstanceID = DbReference(row->Get("instance_id")); m_InstanceID = DbReference(row->Get("instance_id"));
} }
std::ostringstream msgbuf; std::ostringstream msgbuf;
msgbuf << "MySQL IDO instance id: " << static_cast<long>(m_InstanceID); msgbuf << "MySQL IDO instance id: " << static_cast<long>(m_InstanceID);
Log(LogInformation, "ido_mysql", msgbuf.str()); Log(LogInformation, "ido_mysql", msgbuf.str());
Query("UPDATE icinga_objects SET is_active = 0"); Query("UPDATE icinga_objects SET is_active = 0");
std::ostringstream qbuf; std::ostringstream qbuf;
qbuf << "SELECT object_id, objecttype_id, name1, name2 FROM icinga_objects WHERE instance_id = " << static_cast<long>(m_InstanceID); qbuf << "SELECT object_id, objecttype_id, name1, name2 FROM icinga_objects WHERE instance_id = " << static_cast<long>(m_InstanceID);
rows = Query(qbuf.str()); rows = Query(qbuf.str());
ObjectLock olock(rows); ObjectLock olock(rows);
BOOST_FOREACH(const Dictionary::Ptr& row, rows) { BOOST_FOREACH(const Dictionary::Ptr& row, rows) {
DbType::Ptr dbtype = DbType::GetByID(row->Get("objecttype_id")); DbType::Ptr dbtype = DbType::GetByID(row->Get("objecttype_id"));
if (!dbtype) if (!dbtype)
continue; continue;
DbObject::Ptr dbobj = dbtype->GetOrCreateObjectByName(row->Get("name1"), row->Get("name2")); DbObject::Ptr dbobj = dbtype->GetOrCreateObjectByName(row->Get("name1"), row->Get("name2"));
SetReference(dbobj, DbReference(row->Get("object_id"))); SetReference(dbobj, DbReference(row->Get("object_id")));
}
} }
Query("BEGIN"); Query("BEGIN");
@ -114,19 +149,6 @@ MysqlDbConnection::MysqlDbConnection(const Dictionary::Ptr& serializedUpdate)
UpdateAllObjects(); UpdateAllObjects();
} }
void MysqlDbConnection::Stop(void)
{
boost::mutex::scoped_lock lock(m_ConnectionMutex);
mysql_close(&m_Connection);
}
void MysqlDbConnection::TxTimerHandler(void)
{
boost::mutex::scoped_lock lock(m_ConnectionMutex);
Query("COMMIT");
Query("BEGIN");
}
Array::Ptr MysqlDbConnection::Query(const String& query) Array::Ptr MysqlDbConnection::Query(const String& query)
{ {
Log(LogDebug, "ido_mysql", "Query: " + query); Log(LogDebug, "ido_mysql", "Query: " + query);
@ -209,6 +231,10 @@ Dictionary::Ptr MysqlDbConnection::FetchRow(MYSQL_RES *result)
void MysqlDbConnection::UpdateObject(const DbObject::Ptr& dbobj, DbUpdateType kind) { void MysqlDbConnection::UpdateObject(const DbObject::Ptr& dbobj, DbUpdateType kind) {
boost::mutex::scoped_lock lock(m_ConnectionMutex); boost::mutex::scoped_lock lock(m_ConnectionMutex);
/* Check if we can handle updates right now */
if (!m_Connected)
return;
DbReference dbref = GetReference(dbobj); DbReference dbref = GetReference(dbobj);
if (kind == DbObjectRemoved) { if (kind == DbObjectRemoved) {
@ -234,8 +260,6 @@ void MysqlDbConnection::UpdateObject(const DbObject::Ptr& dbobj, DbUpdateType ki
Query(q1buf.str()); Query(q1buf.str());
} }
//Dictionary::Ptr cols = boost::make_shared<Dictionary>();
Dictionary::Ptr fields = dbobj->GetFields(); Dictionary::Ptr fields = dbobj->GetFields();
if (!fields) if (!fields)

View File

@ -57,8 +57,10 @@ private:
DbReference m_InstanceID; DbReference m_InstanceID;
boost::mutex m_ConnectionMutex; boost::mutex m_ConnectionMutex;
bool m_Connected;
MYSQL m_Connection; MYSQL m_Connection;
Timer::Ptr m_ReconnectTimer;
Timer::Ptr m_TxTimer; Timer::Ptr m_TxTimer;
Array::Ptr Query(const String& query); Array::Ptr Query(const String& query);
@ -67,6 +69,7 @@ private:
Dictionary::Ptr FetchRow(MYSQL_RES *result); Dictionary::Ptr FetchRow(MYSQL_RES *result);
void TxTimerHandler(void); void TxTimerHandler(void);
void ReconnectTimerHandler(void);
}; };
} }

View File

@ -28,6 +28,7 @@
# #
# AC_SUBST(MYSQL_CFLAGS) # AC_SUBST(MYSQL_CFLAGS)
# AC_SUBST(MYSQL_LDFLAGS) # AC_SUBST(MYSQL_LDFLAGS)
# AC_SUBST(MYSQLR_LDFLAGS)
# AC_SUBST(MYSQL_VERSION) # AC_SUBST(MYSQL_VERSION)
# #
# And sets: # And sets:
@ -82,6 +83,7 @@ AC_DEFUN([AX_LIB_MYSQL],
if test "$MYSQL_CONFIG" != "no"; then if test "$MYSQL_CONFIG" != "no"; then
MYSQL_CFLAGS="`$MYSQL_CONFIG --cflags`" MYSQL_CFLAGS="`$MYSQL_CONFIG --cflags`"
MYSQL_LDFLAGS="`$MYSQL_CONFIG --libs`" MYSQL_LDFLAGS="`$MYSQL_CONFIG --libs`"
MYSQLR_LDFLAGS="`$MYSQL_CONFIG --libs_r`"
MYSQL_VERSION=`$MYSQL_CONFIG --version` MYSQL_VERSION=`$MYSQL_CONFIG --version`
@ -144,4 +146,5 @@ AC_DEFUN([AX_LIB_MYSQL],
AC_SUBST([MYSQL_VERSION]) AC_SUBST([MYSQL_VERSION])
AC_SUBST([MYSQL_CFLAGS]) AC_SUBST([MYSQL_CFLAGS])
AC_SUBST([MYSQL_LDFLAGS]) AC_SUBST([MYSQL_LDFLAGS])
AC_SUBST([MYSQLR_LDFLAGS])
]) ])