Update PGSQL driver to reflect changes in MySQL driver

Fixes #8989

Signed-off-by: Michael Friedrich <michael.friedrich@netways.de>
This commit is contained in:
Paul Richards 2015-04-04 00:03:39 +01:00 committed by Michael Friedrich
parent 4f767ab06a
commit a3065d8b79
2 changed files with 29 additions and 37 deletions

View File

@ -41,7 +41,7 @@ REGISTER_TYPE(IdoPgsqlConnection);
REGISTER_STATSFUNCTION(IdoPgsqlConnectionStats, &IdoPgsqlConnection::StatsFunc); REGISTER_STATSFUNCTION(IdoPgsqlConnectionStats, &IdoPgsqlConnection::StatsFunc);
IdoPgsqlConnection::IdoPgsqlConnection(void) IdoPgsqlConnection::IdoPgsqlConnection(void)
: m_QueryQueue(500000), m_Connection(NULL) : m_QueryQueue(500000)
{ } { }
void IdoPgsqlConnection::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata) void IdoPgsqlConnection::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata)
@ -69,7 +69,6 @@ void IdoPgsqlConnection::Resume(void)
DbConnection::Resume(); DbConnection::Resume();
SetConnected(false); SetConnected(false);
m_Connection = NULL;
m_QueryQueue.SetExceptionCallback(boost::bind(&IdoPgsqlConnection::ExceptionHandler, this, _1)); m_QueryQueue.SetExceptionCallback(boost::bind(&IdoPgsqlConnection::ExceptionHandler, this, _1));
@ -104,12 +103,9 @@ void IdoPgsqlConnection::ExceptionHandler(boost::exception_ptr exp)
Log(LogDebug, "IdoPgsqlConnection") Log(LogDebug, "IdoPgsqlConnection")
<< "Exception during database operation: " << DiagnosticInformation(exp); << "Exception during database operation: " << DiagnosticInformation(exp);
boost::mutex::scoped_lock lock(m_ConnectionMutex); if (GetConnected()) {
if (m_Connection) {
PQfinish(m_Connection); PQfinish(m_Connection);
SetConnected(false); SetConnected(false);
m_Connection = NULL;
} }
} }
@ -122,16 +118,13 @@ void IdoPgsqlConnection::Disconnect(void)
{ {
AssertOnWorkQueue(); AssertOnWorkQueue();
boost::mutex::scoped_lock lock(m_ConnectionMutex); if (!GetConnected())
if (!m_Connection)
return; return;
Query("COMMIT"); Query("COMMIT");
PQfinish(m_Connection);
PQfinish(m_Connection);
SetConnected(false); SetConnected(false);
m_Connection = NULL;
} }
void IdoPgsqlConnection::TxTimerHandler(void) void IdoPgsqlConnection::TxTimerHandler(void)
@ -146,9 +139,9 @@ void IdoPgsqlConnection::NewTransaction(void)
void IdoPgsqlConnection::InternalNewTransaction(void) void IdoPgsqlConnection::InternalNewTransaction(void)
{ {
boost::mutex::scoped_lock lock(m_ConnectionMutex); AssertOnWorkQueue();
if (!m_Connection) if (!GetConnected())
return; return;
Query("COMMIT"); Query("COMMIT");
@ -166,22 +159,22 @@ void IdoPgsqlConnection::Reconnect(void)
CONTEXT("Reconnecting to PostgreSQL IDO database '" + GetName() + "'"); CONTEXT("Reconnecting to PostgreSQL IDO database '" + GetName() + "'");
SetShouldConnect(true);
std::vector<DbObject::Ptr> active_dbobjs; std::vector<DbObject::Ptr> active_dbobjs;
{ {
boost::mutex::scoped_lock lock(m_ConnectionMutex);
bool reconnect = false; bool reconnect = false;
if (m_Connection) { if (GetConnected()) {
/* Check if we're really still connected */ /* Check if we're really still connected */
try { try {
Query("SELECT 1"); Query("SELECT 1");
return; return;
} catch (const std::exception&) { } catch (const std::exception&) {
SetConnected(false);
PQfinish(m_Connection); PQfinish(m_Connection);
m_Connection = NULL; SetConnected(false);
reconnect = true; reconnect = true;
} }
} }
@ -208,13 +201,10 @@ void IdoPgsqlConnection::Reconnect(void)
if (!m_Connection) if (!m_Connection)
return; return;
SetConnected(true);
if (PQstatus(m_Connection) != CONNECTION_OK) { if (PQstatus(m_Connection) != CONNECTION_OK) {
String message = PQerrorMessage(m_Connection); String message = PQerrorMessage(m_Connection);
PQfinish(m_Connection); PQfinish(m_Connection);
SetConnected(false); SetConnected(false);
m_Connection = NULL;
Log(LogCritical, "IdoPgsqlConnection") Log(LogCritical, "IdoPgsqlConnection")
<< "Connection to database '" << db << "' with user '" << user << "' on '" << host << ":" << port << "Connection to database '" << db << "' with user '" << user << "' on '" << host << ":" << port
@ -223,6 +213,8 @@ void IdoPgsqlConnection::Reconnect(void)
BOOST_THROW_EXCEPTION(std::runtime_error(message)); BOOST_THROW_EXCEPTION(std::runtime_error(message));
} }
SetConnected(true);
String dbVersionName = "idoutils"; String dbVersionName = "idoutils";
IdoPgsqlResult result = Query("SELECT version FROM " + GetTablePrefix() + "dbversion WHERE name=E'" + Escape(dbVersionName) + "'"); IdoPgsqlResult result = Query("SELECT version FROM " + GetTablePrefix() + "dbversion WHERE name=E'" + Escape(dbVersionName) + "'");
@ -231,7 +223,6 @@ void IdoPgsqlConnection::Reconnect(void)
if (!row) { if (!row) {
PQfinish(m_Connection); PQfinish(m_Connection);
SetConnected(false); SetConnected(false);
m_Connection = NULL;
Log(LogCritical, "IdoPgsqlConnection", "Schema does not provide any valid version! Verify your schema installation."); Log(LogCritical, "IdoPgsqlConnection", "Schema does not provide any valid version! Verify your schema installation.");
@ -245,7 +236,6 @@ void IdoPgsqlConnection::Reconnect(void)
if (Utility::CompareVersion(IDO_COMPAT_SCHEMA_VERSION, version) < 0) { if (Utility::CompareVersion(IDO_COMPAT_SCHEMA_VERSION, version) < 0) {
PQfinish(m_Connection); PQfinish(m_Connection);
SetConnected(false); SetConnected(false);
m_Connection = NULL;
Log(LogCritical, "IdoPgsqlConnection") Log(LogCritical, "IdoPgsqlConnection")
<< "Schema version '" << version << "' does not match the required version '" << "Schema version '" << version << "' does not match the required version '"
@ -299,7 +289,7 @@ void IdoPgsqlConnection::Reconnect(void)
if (status_update_age < GetFailoverTimeout()) { if (status_update_age < GetFailoverTimeout()) {
PQfinish(m_Connection); PQfinish(m_Connection);
SetConnected(false); SetConnected(false);
m_Connection = NULL; SetShouldConnect(false);
return; return;
} }
@ -311,7 +301,6 @@ void IdoPgsqlConnection::Reconnect(void)
PQfinish(m_Connection); PQfinish(m_Connection);
SetConnected(false); SetConnected(false);
m_Connection = NULL;
return; return;
} }
@ -486,13 +475,14 @@ Dictionary::Ptr IdoPgsqlConnection::FetchRow(const IdoPgsqlResult& result, int r
void IdoPgsqlConnection::ActivateObject(const DbObject::Ptr& dbobj) void IdoPgsqlConnection::ActivateObject(const DbObject::Ptr& dbobj)
{ {
boost::mutex::scoped_lock lock(m_ConnectionMutex); m_QueryQueue.Enqueue(boost::bind(&IdoPgsqlConnection::InternalActivateObject, this, dbobj));
InternalActivateObject(dbobj);
} }
void IdoPgsqlConnection::InternalActivateObject(const DbObject::Ptr& dbobj) void IdoPgsqlConnection::InternalActivateObject(const DbObject::Ptr& dbobj)
{ {
if (!m_Connection) AssertOnWorkQueue();
if (!GetConnected())
return; return;
DbReference dbref = GetObjectID(dbobj); DbReference dbref = GetObjectID(dbobj);
@ -519,9 +509,14 @@ void IdoPgsqlConnection::InternalActivateObject(const DbObject::Ptr& dbobj)
void IdoPgsqlConnection::DeactivateObject(const DbObject::Ptr& dbobj) void IdoPgsqlConnection::DeactivateObject(const DbObject::Ptr& dbobj)
{ {
boost::mutex::scoped_lock lock(m_ConnectionMutex); m_QueryQueue.Enqueue(boost::bind(&IdoPgsqlConnection::InternalDeactivateObject, this, dbobj));
}
if (!m_Connection) void IdoPgsqlConnection::InternalDeactivateObject(const DbObject::Ptr& dbobj)
{
AssertOnWorkQueue();
if (!GetConnected())
return; return;
DbReference dbref = GetObjectID(dbobj); DbReference dbref = GetObjectID(dbobj);
@ -537,7 +532,6 @@ void IdoPgsqlConnection::DeactivateObject(const DbObject::Ptr& dbobj)
* because the object is still in the database. */ * because the object is still in the database. */
} }
/* caller must hold m_ConnectionMutex */
bool IdoPgsqlConnection::FieldToEscapedString(const String& key, const Value& value, Value *result) bool IdoPgsqlConnection::FieldToEscapedString(const String& key, const Value& value, Value *result)
{ {
if (key == "instance_id") { if (key == "instance_id") {
@ -609,12 +603,12 @@ void IdoPgsqlConnection::ExecuteQuery(const DbQuery& query)
void IdoPgsqlConnection::InternalExecuteQuery(const DbQuery& query, DbQueryType *typeOverride) void IdoPgsqlConnection::InternalExecuteQuery(const DbQuery& query, DbQueryType *typeOverride)
{ {
boost::mutex::scoped_lock lock(m_ConnectionMutex); AssertOnWorkQueue();
if ((query.Category & GetCategories()) == 0) if ((query.Category & GetCategories()) == 0)
return; return;
if (!m_Connection) if (!GetConnected())
return; return;
if (query.Object && query.Object->GetObject()->GetExtension("agent_check").ToBool()) if (query.Object && query.Object->GetObject()->GetExtension("agent_check").ToBool())
@ -721,8 +715,6 @@ void IdoPgsqlConnection::InternalExecuteQuery(const DbQuery& query, DbQueryType
Query(qbuf.str()); Query(qbuf.str());
if (upsert && GetAffectedRows() == 0) { if (upsert && GetAffectedRows() == 0) {
lock.unlock();
DbQueryType to = DbQueryInsert; DbQueryType to = DbQueryInsert;
InternalExecuteQuery(query, &to); InternalExecuteQuery(query, &to);
@ -759,9 +751,9 @@ void IdoPgsqlConnection::CleanUpExecuteQuery(const String& table, const String&
void IdoPgsqlConnection::InternalCleanUpExecuteQuery(const String& table, const String& time_column, double max_age) void IdoPgsqlConnection::InternalCleanUpExecuteQuery(const String& table, const String& time_column, double max_age)
{ {
boost::mutex::scoped_lock lock(m_ConnectionMutex); AssertOnWorkQueue();
if (!m_Connection) if (!GetConnected())
return; return;
Query("DELETE FROM " + GetTablePrefix() + table + " WHERE instance_id = " + Query("DELETE FROM " + GetTablePrefix() + table + " WHERE instance_id = " +

View File

@ -64,7 +64,6 @@ private:
WorkQueue m_QueryQueue; WorkQueue m_QueryQueue;
boost::mutex m_ConnectionMutex;
PGconn *m_Connection; PGconn *m_Connection;
int m_AffectedRows; int m_AffectedRows;
@ -79,6 +78,7 @@ private:
bool FieldToEscapedString(const String& key, const Value& value, Value *result); bool FieldToEscapedString(const String& key, const Value& value, Value *result);
void InternalActivateObject(const DbObject::Ptr& dbobj); void InternalActivateObject(const DbObject::Ptr& dbobj);
void InternalDeactivateObject(const DbObject::Ptr& dbobj);
void Disconnect(void); void Disconnect(void);
void InternalNewTransaction(void); void InternalNewTransaction(void);