Re-use IDs where possible.

Refs #5565
This commit is contained in:
Gunnar Beutner 2014-01-31 08:28:00 +01:00
parent 3a23f0110b
commit 856f01198d
8 changed files with 158 additions and 45 deletions

View File

@ -159,7 +159,7 @@ void IdoMysqlConnection::Reconnect(void)
if (!mysql_init(&m_Connection)) if (!mysql_init(&m_Connection))
BOOST_THROW_EXCEPTION(std::bad_alloc()); BOOST_THROW_EXCEPTION(std::bad_alloc());
if (!mysql_real_connect(&m_Connection, host, user, passwd, db, port, NULL, 0)) if (!mysql_real_connect(&m_Connection, host, user, passwd, db, port, NULL, CLIENT_FOUND_ROWS))
BOOST_THROW_EXCEPTION(std::runtime_error(mysql_error(&m_Connection))); BOOST_THROW_EXCEPTION(std::runtime_error(mysql_error(&m_Connection)));
m_Connected = true; m_Connected = true;
@ -210,7 +210,7 @@ void IdoMysqlConnection::Reconnect(void)
+ "', '" + (reconnect ? "RECONNECT" : "INITIAL") + "', NOW())"); + "', '" + (reconnect ? "RECONNECT" : "INITIAL") + "', NOW())");
/* clear config tables for the initial config dump */ /* clear config tables for the initial config dump */
ClearConfigTables(); PrepareDatabase();
std::ostringstream q1buf; std::ostringstream q1buf;
q1buf << "SELECT object_id, objecttype_id, name1, name2, is_active FROM " + GetTablePrefix() + "objects WHERE instance_id = " << static_cast<long>(m_InstanceID); q1buf << "SELECT object_id, objecttype_id, name1, name2, is_active FROM " + GetTablePrefix() + "objects WHERE instance_id = " << static_cast<long>(m_InstanceID);
@ -251,6 +251,8 @@ IdoMysqlResult IdoMysqlConnection::Query(const String& query)
<< errinfo_database_query(query) << errinfo_database_query(query)
); );
m_AffectedRows = mysql_affected_rows(&m_Connection);
MYSQL_RES *result = mysql_use_result(&m_Connection); MYSQL_RES *result = mysql_use_result(&m_Connection);
if (!result) { if (!result) {
@ -274,6 +276,13 @@ DbReference IdoMysqlConnection::GetLastInsertID(void)
return DbReference(mysql_insert_id(&m_Connection)); return DbReference(mysql_insert_id(&m_Connection));
} }
int IdoMysqlConnection::GetAffectedRows(void)
{
AssertOnWorkQueue();
return m_AffectedRows;
}
String IdoMysqlConnection::Escape(const String& s) String IdoMysqlConnection::Escape(const String& s)
{ {
AssertOnWorkQueue(); AssertOnWorkQueue();
@ -431,10 +440,10 @@ void IdoMysqlConnection::ExecuteQuery(const DbQuery& query)
{ {
ASSERT(query.Category != DbCatInvalid); ASSERT(query.Category != DbCatInvalid);
m_QueryQueue.Enqueue(boost::bind(&IdoMysqlConnection::InternalExecuteQuery, this, query), true); m_QueryQueue.Enqueue(boost::bind(&IdoMysqlConnection::InternalExecuteQuery, this, query, (DbQueryType *)NULL), true);
} }
void IdoMysqlConnection::InternalExecuteQuery(const DbQuery& query) void IdoMysqlConnection::InternalExecuteQuery(const DbQuery& query, DbQueryType *typeOverride)
{ {
boost::mutex::scoped_lock lock(m_ConnectionMutex); boost::mutex::scoped_lock lock(m_ConnectionMutex);
@ -468,7 +477,11 @@ void IdoMysqlConnection::InternalExecuteQuery(const DbQuery& query)
} }
} }
if ((query.Type & DbQueryInsert) && (query.Type & DbQueryUpdate)) { type = typeOverride ? *typeOverride : query.Type;
bool upsert = false;
if ((type & DbQueryInsert) && (type & DbQueryUpdate)) {
bool hasid = false; bool hasid = false;
ASSERT(query.Object); ASSERT(query.Object);
@ -480,12 +493,11 @@ void IdoMysqlConnection::InternalExecuteQuery(const DbQuery& query)
else else
ASSERT(!"Invalid query flags."); ASSERT(!"Invalid query flags.");
if (hasid) if (!hasid)
type = DbQueryUpdate; upsert = true;
else
type = DbQueryInsert; type = DbQueryUpdate;
} else }
type = query.Type;
switch (type) { switch (type) {
case DbQueryInsert: case DbQueryInsert:
@ -541,6 +553,15 @@ void IdoMysqlConnection::InternalExecuteQuery(const DbQuery& query)
Query(qbuf.str()); Query(qbuf.str());
if (upsert && GetAffectedRows() == 0) {
lock.unlock();
DbQueryType to = DbQueryInsert;
InternalExecuteQuery(query, &to);
return;
}
if (query.Object) { if (query.Object) {
if (query.ConfigUpdate) if (query.ConfigUpdate)
SetConfigUpdate(query.Object, true); SetConfigUpdate(query.Object, true);
@ -573,3 +594,15 @@ void IdoMysqlConnection::InternalCleanUpExecuteQuery(const String& table, const
Convert::ToString(static_cast<long>(m_InstanceID)) + " AND " + time_column + Convert::ToString(static_cast<long>(m_InstanceID)) + " AND " + time_column +
" < FROM_UNIXTIME(" + Convert::ToString(static_cast<long>(max_age)) + ")"); " < FROM_UNIXTIME(" + Convert::ToString(static_cast<long>(max_age)) + ")");
} }
void IdoMysqlConnection::FillIDCache(const DbType::Ptr& type)
{
String query = "SELECT " + type->GetIDColumn() + " AS object_id, " + type->GetTable() + "_id FROM " + GetTablePrefix() + type->GetTable() + "s";
IdoMysqlResult result = Query(query);
Dictionary::Ptr row;
while ((row = FetchRow(result))) {
SetInsertID(type, DbReference(row->Get("object_id")), DbReference(row->Get(type->GetTable() + "_id")));
}
}

View File

@ -49,6 +49,7 @@ protected:
virtual void DeactivateObject(const DbObject::Ptr& dbobj); virtual void DeactivateObject(const DbObject::Ptr& dbobj);
virtual void ExecuteQuery(const DbQuery& query); virtual void ExecuteQuery(const DbQuery& query);
virtual void CleanUpExecuteQuery(const String& table, const String& time_key, double time_value); virtual void CleanUpExecuteQuery(const String& table, const String& time_key, double time_value);
virtual void FillIDCache(const DbType::Ptr& type);
private: private:
DbReference m_InstanceID; DbReference m_InstanceID;
@ -58,12 +59,14 @@ private:
boost::mutex m_ConnectionMutex; boost::mutex m_ConnectionMutex;
bool m_Connected; bool m_Connected;
MYSQL m_Connection; MYSQL m_Connection;
int m_AffectedRows;
Timer::Ptr m_ReconnectTimer; Timer::Ptr m_ReconnectTimer;
Timer::Ptr m_TxTimer; Timer::Ptr m_TxTimer;
IdoMysqlResult Query(const String& query); IdoMysqlResult Query(const String& query);
DbReference GetLastInsertID(void); DbReference GetLastInsertID(void);
int GetAffectedRows(void);
String Escape(const String& s); String Escape(const String& s);
Dictionary::Ptr FetchRow(const IdoMysqlResult& result); Dictionary::Ptr FetchRow(const IdoMysqlResult& result);
void DiscardRows(const IdoMysqlResult& result); void DiscardRows(const IdoMysqlResult& result);
@ -80,7 +83,7 @@ private:
void TxTimerHandler(void); void TxTimerHandler(void);
void ReconnectTimerHandler(void); void ReconnectTimerHandler(void);
void InternalExecuteQuery(const DbQuery& query); void InternalExecuteQuery(const DbQuery& query, DbQueryType *typeOverride = NULL);
void InternalCleanUpExecuteQuery(const String& table, const String& time_key, double time_value); void InternalCleanUpExecuteQuery(const String& table, const String& time_key, double time_value);
virtual void ClearConfigTable(const String& table); virtual void ClearConfigTable(const String& table);

View File

@ -210,7 +210,7 @@ void IdoPgsqlConnection::Reconnect(void)
+ "', '" + (reconnect ? "RECONNECT" : "INITIAL") + "', NOW())"); + "', '" + (reconnect ? "RECONNECT" : "INITIAL") + "', NOW())");
/* clear config tables for the initial config dump */ /* clear config tables for the initial config dump */
ClearConfigTables(); PrepareDatabase();
std::ostringstream q1buf; std::ostringstream q1buf;
q1buf << "SELECT object_id, objecttype_id, name1, name2, is_active FROM " + GetTablePrefix() + "objects WHERE instance_id = " << static_cast<long>(m_InstanceID); q1buf << "SELECT object_id, objecttype_id, name1, name2, is_active FROM " + GetTablePrefix() + "objects WHERE instance_id = " << static_cast<long>(m_InstanceID);
@ -269,6 +269,9 @@ IdoPgsqlResult IdoPgsqlConnection::Query(const String& query)
); );
} }
char *rowCount = PQcmdTuples(result);
m_AffectedRows = atoi(rowCount);
return IdoPgsqlResult(result, std::ptr_fun(PQclear)); return IdoPgsqlResult(result, std::ptr_fun(PQclear));
} }
@ -289,6 +292,13 @@ DbReference IdoPgsqlConnection::GetSequenceValue(const String& table, const Stri
return DbReference(Convert::ToLong(row->Get("id"))); return DbReference(Convert::ToLong(row->Get("id")));
} }
int IdoPgsqlConnection::GetAffectedRows(void)
{
AssertOnWorkQueue();
return m_AffectedRows;
}
String IdoPgsqlConnection::Escape(const String& s) String IdoPgsqlConnection::Escape(const String& s)
{ {
AssertOnWorkQueue(); AssertOnWorkQueue();
@ -434,10 +444,10 @@ void IdoPgsqlConnection::ExecuteQuery(const DbQuery& query)
{ {
ASSERT(query.Category != DbCatInvalid); ASSERT(query.Category != DbCatInvalid);
m_QueryQueue.Enqueue(boost::bind(&IdoPgsqlConnection::InternalExecuteQuery, this, query), true); m_QueryQueue.Enqueue(boost::bind(&IdoPgsqlConnection::InternalExecuteQuery, this, query, (DbQueryType *)NULL), true);
} }
void IdoPgsqlConnection::InternalExecuteQuery(const DbQuery& query) void IdoPgsqlConnection::InternalExecuteQuery(const DbQuery& query, DbQueryType *typeOverride)
{ {
boost::mutex::scoped_lock lock(m_ConnectionMutex); boost::mutex::scoped_lock lock(m_ConnectionMutex);
@ -471,6 +481,10 @@ void IdoPgsqlConnection::InternalExecuteQuery(const DbQuery& query)
} }
} }
type = typeOverride ? *typeOverride : query.Type;
bool upsert = false;
if ((query.Type & DbQueryInsert) && (query.Type & DbQueryUpdate)) { if ((query.Type & DbQueryInsert) && (query.Type & DbQueryUpdate)) {
bool hasid = false; bool hasid = false;
@ -483,12 +497,11 @@ void IdoPgsqlConnection::InternalExecuteQuery(const DbQuery& query)
else else
ASSERT(!"Invalid query flags."); ASSERT(!"Invalid query flags.");
if (hasid) if (!hasid)
type = DbQueryUpdate; upsert = true;
else
type = DbQueryInsert; type = DbQueryUpdate;
} else }
type = query.Type;
switch (type) { switch (type) {
case DbQueryInsert: case DbQueryInsert:
@ -543,6 +556,15 @@ void IdoPgsqlConnection::InternalExecuteQuery(const DbQuery& query)
Query(qbuf.str()); Query(qbuf.str());
if (upsert && GetAffectedRows() == 0) {
lock.unlock();
DbQueryType to = DbQueryInsert;
InternalExecuteQuery(query, &to);
return;
}
if (query.Object) { if (query.Object) {
if (query.ConfigUpdate) if (query.ConfigUpdate)
SetConfigUpdate(query.Object, true); SetConfigUpdate(query.Object, true);
@ -582,3 +604,17 @@ void IdoPgsqlConnection::InternalCleanUpExecuteQuery(const String& table, const
Convert::ToString(static_cast<long>(m_InstanceID)) + " AND " + time_column + Convert::ToString(static_cast<long>(m_InstanceID)) + " AND " + time_column +
" < TO_TIMESTAMP(" + Convert::ToString(static_cast<long>(max_age)) + ")"); " < TO_TIMESTAMP(" + Convert::ToString(static_cast<long>(max_age)) + ")");
} }
void IdoPgsqlConnection::FillIDCache(const DbType::Ptr& type)
{
String query = "SELECT " + type->GetIDColumn() + " AS object_id, " + type->GetTable() + "_id FROM " + GetTablePrefix() + type->GetTable() + "s";
IdoPgsqlResult result = Query(query);
Dictionary::Ptr row;
int index = 0;
while ((row = FetchRow(result, index))) {
index++;
SetInsertID(type, DbReference(row->Get("object_id")), DbReference(row->Get(type->GetTable() + "_id")));
}
}

View File

@ -49,6 +49,7 @@ protected:
virtual void DeactivateObject(const DbObject::Ptr& dbobj); virtual void DeactivateObject(const DbObject::Ptr& dbobj);
virtual void ExecuteQuery(const DbQuery& query); virtual void ExecuteQuery(const DbQuery& query);
virtual void CleanUpExecuteQuery(const String& table, const String& time_key, double time_value); virtual void CleanUpExecuteQuery(const String& table, const String& time_key, double time_value);
virtual void FillIDCache(const DbType::Ptr& type);
private: private:
DbReference m_InstanceID; DbReference m_InstanceID;
@ -57,12 +58,14 @@ private:
boost::mutex m_ConnectionMutex; boost::mutex m_ConnectionMutex;
PGconn *m_Connection; PGconn *m_Connection;
int m_AffectedRows;
Timer::Ptr m_ReconnectTimer; Timer::Ptr m_ReconnectTimer;
Timer::Ptr m_TxTimer; Timer::Ptr m_TxTimer;
IdoPgsqlResult Query(const String& query); IdoPgsqlResult Query(const String& query);
DbReference GetSequenceValue(const String& table, const String& column); DbReference GetSequenceValue(const String& table, const String& column);
int GetAffectedRows(void);
String Escape(const String& s); String Escape(const String& s);
Dictionary::Ptr FetchRow(const IdoPgsqlResult& result, int row); Dictionary::Ptr FetchRow(const IdoPgsqlResult& result, int row);
@ -78,7 +81,7 @@ private:
void TxTimerHandler(void); void TxTimerHandler(void);
void ReconnectTimerHandler(void); void ReconnectTimerHandler(void);
void InternalExecuteQuery(const DbQuery& query); void InternalExecuteQuery(const DbQuery& query, DbQueryType *typeOverride = NULL);
void InternalCleanUpExecuteQuery(const String& table, const String& time_key, double time_value); void InternalCleanUpExecuteQuery(const String& table, const String& time_key, double time_value);
virtual void ClearConfigTable(const String& table); virtual void ClearConfigTable(const String& table);

View File

@ -183,17 +183,33 @@ DbReference DbConnection::GetObjectID(const DbObject::Ptr& dbobj) const
void DbConnection::SetInsertID(const DbObject::Ptr& dbobj, const DbReference& dbref) void DbConnection::SetInsertID(const DbObject::Ptr& dbobj, const DbReference& dbref)
{ {
SetInsertID(dbobj->GetType(), GetObjectID(dbobj), dbref);
}
void DbConnection::SetInsertID(const DbType::Ptr& type, const DbReference& objid, const DbReference& dbref)
{
if (!objid.IsValid())
return;
if (dbref.IsValid()) if (dbref.IsValid())
m_InsertIDs[dbobj] = dbref; m_InsertIDs[std::make_pair(type, objid)] = dbref;
else else
m_InsertIDs.erase(dbobj); m_InsertIDs.erase(std::make_pair(type, objid));
} }
DbReference DbConnection::GetInsertID(const DbObject::Ptr& dbobj) const DbReference DbConnection::GetInsertID(const DbObject::Ptr& dbobj) const
{ {
std::map<DbObject::Ptr, DbReference>::const_iterator it; return GetInsertID(dbobj->GetType(), GetObjectID(dbobj));
}
it = m_InsertIDs.find(dbobj); DbReference DbConnection::GetInsertID(const DbType::Ptr& type, const DbReference& objid) const
{
if (!objid.IsValid())
return DbReference();
std::map<std::pair<DbType::Ptr, DbReference>, DbReference>::const_iterator it;
it = m_InsertIDs.find(std::make_pair(type, objid));
if (it == m_InsertIDs.end()) if (it == m_InsertIDs.end())
return DbReference(); return DbReference();
@ -293,17 +309,17 @@ void DbConnection::UpdateAllObjects(void)
} }
} }
void DbConnection::ClearConfigTables(void) void DbConnection::PrepareDatabase(void)
{ {
/* TODO make hardcoded table names modular */ /* TODO make hardcoded table names modular */
ClearConfigTable("commands"); //ClearConfigTable("commands");
ClearConfigTable("comments"); ClearConfigTable("comments");
ClearConfigTable("contact_addresses"); ClearConfigTable("contact_addresses");
ClearConfigTable("contact_notificationcommands"); ClearConfigTable("contact_notificationcommands");
ClearConfigTable("contactgroup_members"); ClearConfigTable("contactgroup_members");
ClearConfigTable("contactgroups"); //ClearConfigTable("contactgroups");
ClearConfigTable("contacts"); //ClearConfigTable("contacts");
ClearConfigTable("contactstatus"); //ClearConfigTable("contactstatus");
ClearConfigTable("customvariables"); ClearConfigTable("customvariables");
ClearConfigTable("customvariablestatus"); ClearConfigTable("customvariablestatus");
ClearConfigTable("host_contactgroups"); ClearConfigTable("host_contactgroups");
@ -311,18 +327,22 @@ void DbConnection::ClearConfigTables(void)
ClearConfigTable("host_parenthosts"); ClearConfigTable("host_parenthosts");
ClearConfigTable("hostdependencies"); ClearConfigTable("hostdependencies");
ClearConfigTable("hostgroup_members"); ClearConfigTable("hostgroup_members");
ClearConfigTable("hostgroups"); //ClearConfigTable("hostgroups");
ClearConfigTable("hosts"); //ClearConfigTable("hosts");
ClearConfigTable("hoststatus"); //ClearConfigTable("hoststatus");
ClearConfigTable("programstatus"); ClearConfigTable("programstatus");
ClearConfigTable("scheduleddowntime"); ClearConfigTable("scheduleddowntime");
ClearConfigTable("service_contactgroups"); //ClearConfigTable("service_contactgroups");
ClearConfigTable("service_contacts"); //ClearConfigTable("service_contacts");
ClearConfigTable("servicedependencies"); //ClearConfigTable("servicedependencies");
ClearConfigTable("servicegroup_members"); //ClearConfigTable("servicegroup_members");
ClearConfigTable("servicegroups"); //ClearConfigTable("servicegroups");
ClearConfigTable("services"); //ClearConfigTable("services");
ClearConfigTable("servicestatus"); //ClearConfigTable("servicestatus");
ClearConfigTable("timeperiod_timeranges"); //ClearConfigTable("timeperiod_timeranges");
ClearConfigTable("timeperiods"); //ClearConfigTable("timeperiods");
BOOST_FOREACH(const DbType::Ptr& type, DbType::GetAllTypes()) {
FillIDCache(type);
}
} }

View File

@ -45,7 +45,9 @@ public:
DbReference GetObjectID(const DbObject::Ptr& dbobj) const; DbReference GetObjectID(const DbObject::Ptr& dbobj) const;
void SetInsertID(const DbObject::Ptr& dbobj, const DbReference& dbref); void SetInsertID(const DbObject::Ptr& dbobj, const DbReference& dbref);
void SetInsertID(const DbType::Ptr& type, const DbReference& objid, const DbReference& dbref);
DbReference GetInsertID(const DbObject::Ptr& dbobj) const; DbReference GetInsertID(const DbObject::Ptr& dbobj) const;
DbReference GetInsertID(const DbType::Ptr& type, const DbReference& objid) const;
void SetNotificationInsertID(const DynamicObject::Ptr& obj, const DbReference& dbref); void SetNotificationInsertID(const DynamicObject::Ptr& obj, const DbReference& dbref);
DbReference GetNotificationInsertID(const DynamicObject::Ptr& obj) const; DbReference GetNotificationInsertID(const DynamicObject::Ptr& obj) const;
@ -69,14 +71,15 @@ protected:
virtual void DeactivateObject(const DbObject::Ptr& dbobj) = 0; virtual void DeactivateObject(const DbObject::Ptr& dbobj) = 0;
virtual void CleanUpExecuteQuery(const String& table, const String& time_column, double max_age); virtual void CleanUpExecuteQuery(const String& table, const String& time_column, double max_age);
virtual void FillIDCache(const DbType::Ptr& type) = 0;
void UpdateAllObjects(void); void UpdateAllObjects(void);
void ClearConfigTables(void); void PrepareDatabase(void);
private: private:
std::map<DbObject::Ptr, DbReference> m_ObjectIDs; std::map<DbObject::Ptr, DbReference> m_ObjectIDs;
std::map<DbObject::Ptr, DbReference> m_InsertIDs; std::map<std::pair<DbType::Ptr, DbReference>, DbReference> m_InsertIDs;
std::map<DynamicObject::Ptr, DbReference> m_NotificationInsertIDs; std::map<DynamicObject::Ptr, DbReference> m_NotificationInsertIDs;
std::set<DbObject::Ptr> m_ActiveObjects; std::set<DbObject::Ptr> m_ActiveObjects;
std::set<DbObject::Ptr> m_ConfigUpdates; std::set<DbObject::Ptr> m_ConfigUpdates;

View File

@ -112,3 +112,16 @@ DbType::TypeMap& DbType::GetTypes(void)
static DbType::TypeMap tm; static DbType::TypeMap tm;
return tm; return tm;
} }
std::set<DbType::Ptr> DbType::GetAllTypes(void)
{
std::set<DbType::Ptr> result;
boost::mutex::scoped_lock lock(GetStaticMutex());
std::pair<String, DbType::Ptr> kv;
BOOST_FOREACH(kv, GetTypes()) {
result.insert(kv.second);
}
return result;
}

View File

@ -58,6 +58,8 @@ public:
shared_ptr<DbObject> GetOrCreateObjectByName(const String& name1, const String& name2); shared_ptr<DbObject> GetOrCreateObjectByName(const String& name1, const String& name2);
static std::set<DbType::Ptr> GetAllTypes(void);
private: private:
std::vector<String> m_Names; std::vector<String> m_Names;
String m_Table; String m_Table;