diff --git a/lib/db_ido_pgsql/idopgsqlconnection.cpp b/lib/db_ido_pgsql/idopgsqlconnection.cpp index ab9333133..729ff4c1b 100644 --- a/lib/db_ido_pgsql/idopgsqlconnection.cpp +++ b/lib/db_ido_pgsql/idopgsqlconnection.cpp @@ -41,7 +41,7 @@ REGISTER_TYPE(IdoPgsqlConnection); REGISTER_STATSFUNCTION(IdoPgsqlConnectionStats, &IdoPgsqlConnection::StatsFunc); IdoPgsqlConnection::IdoPgsqlConnection(void) - : m_QueryQueue(500000), m_Connection(NULL) + : m_QueryQueue(500000) { } void IdoPgsqlConnection::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata) @@ -69,7 +69,6 @@ void IdoPgsqlConnection::Resume(void) DbConnection::Resume(); SetConnected(false); - m_Connection = NULL; m_QueryQueue.SetExceptionCallback(boost::bind(&IdoPgsqlConnection::ExceptionHandler, this, _1)); @@ -104,12 +103,9 @@ void IdoPgsqlConnection::ExceptionHandler(boost::exception_ptr exp) Log(LogDebug, "IdoPgsqlConnection") << "Exception during database operation: " << DiagnosticInformation(exp); - boost::mutex::scoped_lock lock(m_ConnectionMutex); - - if (m_Connection) { + if (GetConnected()) { PQfinish(m_Connection); SetConnected(false); - m_Connection = NULL; } } @@ -122,16 +118,13 @@ void IdoPgsqlConnection::Disconnect(void) { AssertOnWorkQueue(); - boost::mutex::scoped_lock lock(m_ConnectionMutex); - - if (!m_Connection) + if (!GetConnected()) return; Query("COMMIT"); - PQfinish(m_Connection); + PQfinish(m_Connection); SetConnected(false); - m_Connection = NULL; } void IdoPgsqlConnection::TxTimerHandler(void) @@ -146,9 +139,9 @@ void IdoPgsqlConnection::NewTransaction(void) void IdoPgsqlConnection::InternalNewTransaction(void) { - boost::mutex::scoped_lock lock(m_ConnectionMutex); + AssertOnWorkQueue(); - if (!m_Connection) + if (!GetConnected()) return; Query("COMMIT"); @@ -166,22 +159,22 @@ void IdoPgsqlConnection::Reconnect(void) CONTEXT("Reconnecting to PostgreSQL IDO database '" + GetName() + "'"); + SetShouldConnect(true); + std::vector active_dbobjs; { - boost::mutex::scoped_lock lock(m_ConnectionMutex); bool reconnect = false; - if (m_Connection) { + if (GetConnected()) { /* Check if we're really still connected */ try { Query("SELECT 1"); return; } catch (const std::exception&) { - SetConnected(false); PQfinish(m_Connection); - m_Connection = NULL; + SetConnected(false); reconnect = true; } } @@ -208,13 +201,10 @@ void IdoPgsqlConnection::Reconnect(void) if (!m_Connection) return; - SetConnected(true); - if (PQstatus(m_Connection) != CONNECTION_OK) { String message = PQerrorMessage(m_Connection); PQfinish(m_Connection); SetConnected(false); - m_Connection = NULL; Log(LogCritical, "IdoPgsqlConnection") << "Connection to database '" << db << "' with user '" << user << "' on '" << host << ":" << port @@ -223,6 +213,8 @@ void IdoPgsqlConnection::Reconnect(void) BOOST_THROW_EXCEPTION(std::runtime_error(message)); } + SetConnected(true); + String dbVersionName = "idoutils"; IdoPgsqlResult result = Query("SELECT version FROM " + GetTablePrefix() + "dbversion WHERE name=E'" + Escape(dbVersionName) + "'"); @@ -231,7 +223,6 @@ void IdoPgsqlConnection::Reconnect(void) if (!row) { PQfinish(m_Connection); SetConnected(false); - m_Connection = NULL; Log(LogCritical, "IdoPgsqlConnection", "Schema does not provide any valid version! Verify your schema installation."); @@ -245,7 +236,6 @@ void IdoPgsqlConnection::Reconnect(void) if (Utility::CompareVersion(IDO_COMPAT_SCHEMA_VERSION, version) < 0) { PQfinish(m_Connection); SetConnected(false); - m_Connection = NULL; Log(LogCritical, "IdoPgsqlConnection") << "Schema version '" << version << "' does not match the required version '" @@ -299,7 +289,7 @@ void IdoPgsqlConnection::Reconnect(void) if (status_update_age < GetFailoverTimeout()) { PQfinish(m_Connection); SetConnected(false); - m_Connection = NULL; + SetShouldConnect(false); return; } @@ -311,7 +301,6 @@ void IdoPgsqlConnection::Reconnect(void) PQfinish(m_Connection); SetConnected(false); - m_Connection = NULL; return; } @@ -486,13 +475,14 @@ Dictionary::Ptr IdoPgsqlConnection::FetchRow(const IdoPgsqlResult& result, int r void IdoPgsqlConnection::ActivateObject(const DbObject::Ptr& dbobj) { - boost::mutex::scoped_lock lock(m_ConnectionMutex); - InternalActivateObject(dbobj); + m_QueryQueue.Enqueue(boost::bind(&IdoPgsqlConnection::InternalActivateObject, this, dbobj)); } void IdoPgsqlConnection::InternalActivateObject(const DbObject::Ptr& dbobj) { - if (!m_Connection) + AssertOnWorkQueue(); + + if (!GetConnected()) return; DbReference dbref = GetObjectID(dbobj); @@ -519,9 +509,14 @@ void IdoPgsqlConnection::InternalActivateObject(const DbObject::Ptr& dbobj) void IdoPgsqlConnection::DeactivateObject(const DbObject::Ptr& dbobj) { - boost::mutex::scoped_lock lock(m_ConnectionMutex); + m_QueryQueue.Enqueue(boost::bind(&IdoPgsqlConnection::InternalDeactivateObject, this, dbobj)); +} - if (!m_Connection) +void IdoPgsqlConnection::InternalDeactivateObject(const DbObject::Ptr& dbobj) +{ + AssertOnWorkQueue(); + + if (!GetConnected()) return; DbReference dbref = GetObjectID(dbobj); @@ -537,7 +532,6 @@ void IdoPgsqlConnection::DeactivateObject(const DbObject::Ptr& dbobj) * because the object is still in the database. */ } -/* caller must hold m_ConnectionMutex */ bool IdoPgsqlConnection::FieldToEscapedString(const String& key, const Value& value, Value *result) { if (key == "instance_id") { @@ -609,12 +603,12 @@ void IdoPgsqlConnection::ExecuteQuery(const DbQuery& query) void IdoPgsqlConnection::InternalExecuteQuery(const DbQuery& query, DbQueryType *typeOverride) { - boost::mutex::scoped_lock lock(m_ConnectionMutex); + AssertOnWorkQueue(); if ((query.Category & GetCategories()) == 0) return; - if (!m_Connection) + if (!GetConnected()) return; if (query.Object && query.Object->GetObject()->GetExtension("agent_check").ToBool()) @@ -721,8 +715,6 @@ void IdoPgsqlConnection::InternalExecuteQuery(const DbQuery& query, DbQueryType Query(qbuf.str()); if (upsert && GetAffectedRows() == 0) { - lock.unlock(); - DbQueryType to = DbQueryInsert; InternalExecuteQuery(query, &to); @@ -759,9 +751,9 @@ void IdoPgsqlConnection::CleanUpExecuteQuery(const String& table, const String& void IdoPgsqlConnection::InternalCleanUpExecuteQuery(const String& table, const String& time_column, double max_age) { - boost::mutex::scoped_lock lock(m_ConnectionMutex); + AssertOnWorkQueue(); - if (!m_Connection) + if (!GetConnected()) return; Query("DELETE FROM " + GetTablePrefix() + table + " WHERE instance_id = " + diff --git a/lib/db_ido_pgsql/idopgsqlconnection.hpp b/lib/db_ido_pgsql/idopgsqlconnection.hpp index 8ec15a93e..be009b5b9 100644 --- a/lib/db_ido_pgsql/idopgsqlconnection.hpp +++ b/lib/db_ido_pgsql/idopgsqlconnection.hpp @@ -64,7 +64,6 @@ private: WorkQueue m_QueryQueue; - boost::mutex m_ConnectionMutex; PGconn *m_Connection; int m_AffectedRows; @@ -79,6 +78,7 @@ private: bool FieldToEscapedString(const String& key, const Value& value, Value *result); void InternalActivateObject(const DbObject::Ptr& dbobj); + void InternalDeactivateObject(const DbObject::Ptr& dbobj); void Disconnect(void); void InternalNewTransaction(void);