Incrementally fetch MySQL/PGsql rows.

Fixes #5374
This commit is contained in:
Gunnar Beutner 2014-01-28 11:42:12 +01:00
parent e0596f2d33
commit 08303f9aca
4 changed files with 64 additions and 72 deletions

View File

@ -165,12 +165,15 @@ void IdoMysqlConnection::Reconnect(void)
m_Connected = true; m_Connected = true;
String dbVersionName = "idoutils"; String dbVersionName = "idoutils";
Array::Ptr version_rows = Query("SELECT version FROM " + GetTablePrefix() + "dbversion WHERE name='" + Escape(dbVersionName) + "'"); IdoMysqlResult result = Query("SELECT version FROM " + GetTablePrefix() + "dbversion WHERE name='" + Escape(dbVersionName) + "'");
if (version_rows->GetLength() == 0) Dictionary::Ptr version_row = FetchRow(result);
if (!version_row)
BOOST_THROW_EXCEPTION(std::runtime_error("Schema does not provide any valid version! Verify your schema installation.")); BOOST_THROW_EXCEPTION(std::runtime_error("Schema does not provide any valid version! Verify your schema installation."));
Dictionary::Ptr version_row = version_rows->Get(0); DiscardRows(result);
String version = version_row->Get("version"); String version = version_row->Get("version");
if (Utility::CompareVersion(SCHEMA_VERSION, version) < 0) { if (Utility::CompareVersion(SCHEMA_VERSION, version) < 0) {
@ -180,13 +183,16 @@ void IdoMysqlConnection::Reconnect(void)
String instanceName = GetInstanceName(); String instanceName = GetInstanceName();
Array::Ptr rows = Query("SELECT instance_id FROM " + GetTablePrefix() + "instances WHERE instance_name = '" + Escape(instanceName) + "'"); result = Query("SELECT instance_id FROM " + GetTablePrefix() + "instances WHERE instance_name = '" + Escape(instanceName) + "'");
if (rows->GetLength() == 0) { Dictionary::Ptr row = FetchRow(result);
if (!row) {
Query("INSERT INTO " + GetTablePrefix() + "instances (instance_name, instance_description) VALUES ('" + Escape(instanceName) + "', '" + Escape(GetInstanceDescription()) + "')"); Query("INSERT INTO " + GetTablePrefix() + "instances (instance_name, instance_description) VALUES ('" + Escape(instanceName) + "', '" + Escape(GetInstanceDescription()) + "')");
m_InstanceID = GetLastInsertID(); m_InstanceID = GetLastInsertID();
} else { } else {
Dictionary::Ptr row = rows->Get(0); DiscardRows(result);
m_InstanceID = DbReference(row->Get("instance_id")); m_InstanceID = DbReference(row->Get("instance_id"));
} }
@ -208,10 +214,9 @@ void IdoMysqlConnection::Reconnect(void)
std::ostringstream q1buf; std::ostringstream q1buf;
q1buf << "SELECT object_id, objecttype_id, name1, name2, is_active FROM " + GetTablePrefix() + "objects WHERE instance_id = " << static_cast<long>(m_InstanceID); q1buf << "SELECT object_id, objecttype_id, name1, name2, is_active FROM " + GetTablePrefix() + "objects WHERE instance_id = " << static_cast<long>(m_InstanceID);
rows = Query(q1buf.str()); result = Query(q1buf.str());
ObjectLock olock(rows); while ((row = FetchRow(result))) {
BOOST_FOREACH(const Dictionary::Ptr& row, rows) {
DbType::Ptr dbtype = DbType::GetByID(row->Get("objecttype_id")); DbType::Ptr dbtype = DbType::GetByID(row->Get("objecttype_id"));
if (!dbtype) if (!dbtype)
@ -267,7 +272,7 @@ void IdoMysqlConnection::ClearConfigTable(const String& table)
Query("DELETE FROM " + GetTablePrefix() + table + " WHERE instance_id = " + Convert::ToString(static_cast<long>(m_InstanceID))); Query("DELETE FROM " + GetTablePrefix() + table + " WHERE instance_id = " + Convert::ToString(static_cast<long>(m_InstanceID)));
} }
Array::Ptr IdoMysqlConnection::Query(const String& query) IdoMysqlResult IdoMysqlConnection::Query(const String& query)
{ {
AssertOnWorkQueue(); AssertOnWorkQueue();
@ -290,23 +295,10 @@ Array::Ptr IdoMysqlConnection::Query(const String& query)
<< errinfo_database_query(query) << errinfo_database_query(query)
); );
return Array::Ptr(); return IdoMysqlResult();
} }
Array::Ptr rows = make_shared<Array>(); return IdoMysqlResult(result, std::ptr_fun(mysql_free_result));
for (;;) {
Dictionary::Ptr row = FetchRow(result);
if (!row)
break;
rows->Add(row);
}
mysql_free_result(result);
return rows;
} }
DbReference IdoMysqlConnection::GetLastInsertID(void) DbReference IdoMysqlConnection::GetLastInsertID(void)
@ -332,7 +324,7 @@ String IdoMysqlConnection::Escape(const String& s)
return result; return result;
} }
Dictionary::Ptr IdoMysqlConnection::FetchRow(MYSQL_RES *result) Dictionary::Ptr IdoMysqlConnection::FetchRow(const IdoMysqlResult& result)
{ {
AssertOnWorkQueue(); AssertOnWorkQueue();
@ -340,25 +332,33 @@ Dictionary::Ptr IdoMysqlConnection::FetchRow(MYSQL_RES *result)
MYSQL_FIELD *field; MYSQL_FIELD *field;
unsigned long *lengths, i; unsigned long *lengths, i;
row = mysql_fetch_row(result); row = mysql_fetch_row(result.get());
if (!row) if (!row)
return Dictionary::Ptr(); return Dictionary::Ptr();
lengths = mysql_fetch_lengths(result); lengths = mysql_fetch_lengths(result.get());
if (!lengths) if (!lengths)
return Dictionary::Ptr(); return Dictionary::Ptr();
Dictionary::Ptr dict = make_shared<Dictionary>(); Dictionary::Ptr dict = make_shared<Dictionary>();
mysql_field_seek(result, 0); mysql_field_seek(result.get(), 0);
for (field = mysql_fetch_field(result), i = 0; field; field = mysql_fetch_field(result), i++) for (field = mysql_fetch_field(result.get()), i = 0; field; field = mysql_fetch_field(result.get()), i++)
dict->Set(field->name, String(row[i], row[i] + lengths[i])); dict->Set(field->name, String(row[i], row[i] + lengths[i]));
return dict; return dict;
} }
void IdoMysqlConnection::DiscardRows(const IdoMysqlResult& result)
{
Dictionary::Ptr row;
while ((row = FetchRow(result)))
; /* empty loop body */
}
void IdoMysqlConnection::ActivateObject(const DbObject::Ptr& dbobj) void IdoMysqlConnection::ActivateObject(const DbObject::Ptr& dbobj)
{ {
boost::mutex::scoped_lock lock(m_ConnectionMutex); boost::mutex::scoped_lock lock(m_ConnectionMutex);

View File

@ -29,6 +29,8 @@
namespace icinga namespace icinga
{ {
typedef shared_ptr<MYSQL_RES> IdoMysqlResult;
/** /**
* An IDO MySQL database connection. * An IDO MySQL database connection.
* *
@ -61,10 +63,11 @@ private:
Timer::Ptr m_ReconnectTimer; Timer::Ptr m_ReconnectTimer;
Timer::Ptr m_TxTimer; Timer::Ptr m_TxTimer;
Array::Ptr Query(const String& query); IdoMysqlResult Query(const String& query);
DbReference GetLastInsertID(void); DbReference GetLastInsertID(void);
String Escape(const String& s); String Escape(const String& s);
Dictionary::Ptr FetchRow(MYSQL_RES *result); Dictionary::Ptr FetchRow(const IdoMysqlResult& result);
void DiscardRows(const IdoMysqlResult& result);
bool FieldToEscapedString(const String& key, const Value& value, Value *result); bool FieldToEscapedString(const String& key, const Value& value, Value *result);
void InternalActivateObject(const DbObject::Ptr& dbobj); void InternalActivateObject(const DbObject::Ptr& dbobj);

View File

@ -172,12 +172,13 @@ void IdoPgsqlConnection::Reconnect(void)
} }
String dbVersionName = "idoutils"; String dbVersionName = "idoutils";
Array::Ptr version_rows = Query("SELECT version FROM " + GetTablePrefix() + "dbversion WHERE name='" + Escape(dbVersionName) + "'"); IdoPgsqlResult result = Query("SELECT version FROM " + GetTablePrefix() + "dbversion WHERE name='" + Escape(dbVersionName) + "'");
if (version_rows->GetLength() == 0) Dictionary::Ptr version_row = FetchRow(result, 0);
if (!version_row)
BOOST_THROW_EXCEPTION(std::runtime_error("Schema does not provide any valid version! Verify your schema installation.")); BOOST_THROW_EXCEPTION(std::runtime_error("Schema does not provide any valid version! Verify your schema installation."));
Dictionary::Ptr version_row = version_rows->Get(0);
String version = version_row->Get("version"); String version = version_row->Get("version");
if (Utility::CompareVersion(SCHEMA_VERSION, version) < 0) { if (Utility::CompareVersion(SCHEMA_VERSION, version) < 0) {
@ -187,13 +188,14 @@ void IdoPgsqlConnection::Reconnect(void)
String instanceName = GetInstanceName(); String instanceName = GetInstanceName();
Array::Ptr rows = Query("SELECT instance_id FROM " + GetTablePrefix() + "instances WHERE instance_name = '" + Escape(instanceName) + "'"); result = Query("SELECT instance_id FROM " + GetTablePrefix() + "instances WHERE instance_name = '" + Escape(instanceName) + "'");
if (rows->GetLength() == 0) { Dictionary::Ptr row = FetchRow(result, 0);
if (!row) {
Query("INSERT INTO " + GetTablePrefix() + "instances (instance_name, instance_description) VALUES ('" + Escape(instanceName) + "', '" + Escape(GetInstanceDescription()) + "')"); Query("INSERT INTO " + GetTablePrefix() + "instances (instance_name, instance_description) VALUES ('" + Escape(instanceName) + "', '" + Escape(GetInstanceDescription()) + "')");
m_InstanceID = GetSequenceValue(GetTablePrefix() + "instances", "instance_id"); m_InstanceID = GetSequenceValue(GetTablePrefix() + "instances", "instance_id");
} else { } else {
Dictionary::Ptr row = rows->Get(0);
m_InstanceID = DbReference(row->Get("instance_id")); m_InstanceID = DbReference(row->Get("instance_id"));
} }
@ -212,10 +214,12 @@ void IdoPgsqlConnection::Reconnect(void)
std::ostringstream q1buf; std::ostringstream q1buf;
q1buf << "SELECT object_id, objecttype_id, name1, name2, is_active FROM " + GetTablePrefix() + "objects WHERE instance_id = " << static_cast<long>(m_InstanceID); q1buf << "SELECT object_id, objecttype_id, name1, name2, is_active FROM " + GetTablePrefix() + "objects WHERE instance_id = " << static_cast<long>(m_InstanceID);
rows = Query(q1buf.str()); result = Query(q1buf.str());
int index = 0;
while ((row = FetchRow(result, index))) {
index++;
ObjectLock olock(rows);
BOOST_FOREACH(const Dictionary::Ptr& row, rows) {
DbType::Ptr dbtype = DbType::GetByID(row->Get("objecttype_id")); DbType::Ptr dbtype = DbType::GetByID(row->Get("objecttype_id"));
if (!dbtype) if (!dbtype)
@ -271,7 +275,7 @@ void IdoPgsqlConnection::ClearConfigTable(const String& table)
Query("DELETE FROM " + GetTablePrefix() + table + " WHERE instance_id = " + Convert::ToString(static_cast<long>(m_InstanceID))); Query("DELETE FROM " + GetTablePrefix() + table + " WHERE instance_id = " + Convert::ToString(static_cast<long>(m_InstanceID)));
} }
Array::Ptr IdoPgsqlConnection::Query(const String& query) IdoPgsqlResult IdoPgsqlConnection::Query(const String& query)
{ {
AssertOnWorkQueue(); AssertOnWorkQueue();
@ -286,7 +290,7 @@ Array::Ptr IdoPgsqlConnection::Query(const String& query)
); );
if (PQresultStatus(result) == PGRES_COMMAND_OK) if (PQresultStatus(result) == PGRES_COMMAND_OK)
return Array::Ptr(); return IdoPgsqlResult();
if (PQresultStatus(result) != PGRES_TUPLES_OK) { if (PQresultStatus(result) != PGRES_TUPLES_OK) {
String message = PQresultErrorMessage(result); String message = PQresultErrorMessage(result);
@ -299,35 +303,18 @@ Array::Ptr IdoPgsqlConnection::Query(const String& query)
); );
} }
Array::Ptr rows = make_shared<Array>(); return IdoPgsqlResult(result, std::ptr_fun(PQclear));
int rownum = 0;
for (;;) {
Dictionary::Ptr row = FetchRow(result, rownum);
if (!row)
break;
rows->Add(row);
rownum++;
}
PQclear(result);
return rows;
} }
DbReference IdoPgsqlConnection::GetSequenceValue(const String& table, const String& column) DbReference IdoPgsqlConnection::GetSequenceValue(const String& table, const String& column)
{ {
AssertOnWorkQueue(); AssertOnWorkQueue();
Array::Ptr rows = Query("SELECT CURRVAL(pg_get_serial_sequence('" + Escape(table) + "', '" + Escape(column) + "')) AS id"); IdoPgsqlResult result = Query("SELECT CURRVAL(pg_get_serial_sequence('" + Escape(table) + "', '" + Escape(column) + "')) AS id");
ASSERT(rows->GetLength() == 1); Dictionary::Ptr row = FetchRow(result, 0);
Dictionary::Ptr row = rows->Get(0); ASSERT(row);
std::ostringstream msgbuf; std::ostringstream msgbuf;
msgbuf << "Sequence Value: " << row->Get("id"); msgbuf << "Sequence Value: " << row->Get("id");
@ -352,24 +339,24 @@ String IdoPgsqlConnection::Escape(const String& s)
return result; return result;
} }
Dictionary::Ptr IdoPgsqlConnection::FetchRow(PGresult *result, int row) Dictionary::Ptr IdoPgsqlConnection::FetchRow(const IdoPgsqlResult& result, int row)
{ {
AssertOnWorkQueue(); AssertOnWorkQueue();
if (row >= PQntuples(result)) if (row >= PQntuples(result.get()))
return Dictionary::Ptr(); return Dictionary::Ptr();
int columns = PQnfields(result); int columns = PQnfields(result.get());
Dictionary::Ptr dict = make_shared<Dictionary>(); Dictionary::Ptr dict = make_shared<Dictionary>();
for (int column = 0; column < columns; column++) { for (int column = 0; column < columns; column++) {
Value value; Value value;
if (!PQgetisnull(result, row, column)) if (!PQgetisnull(result.get(), row, column))
value = PQgetvalue(result, row, column); value = PQgetvalue(result.get(), row, column);
dict->Set(PQfname(result, column), value); dict->Set(PQfname(result.get(), column), value);
} }
return dict; return dict;

View File

@ -29,6 +29,8 @@
namespace icinga namespace icinga
{ {
typedef shared_ptr<PGresult> IdoPgsqlResult;
/** /**
* An IDO pgSQL database connection. * An IDO pgSQL database connection.
* *
@ -60,10 +62,10 @@ private:
Timer::Ptr m_ReconnectTimer; Timer::Ptr m_ReconnectTimer;
Timer::Ptr m_TxTimer; Timer::Ptr m_TxTimer;
Array::Ptr Query(const String& query); IdoPgsqlResult Query(const String& query);
DbReference GetSequenceValue(const String& table, const String& column); DbReference GetSequenceValue(const String& table, const String& column);
String Escape(const String& s); String Escape(const String& s);
Dictionary::Ptr FetchRow(PGresult *result, int row); Dictionary::Ptr FetchRow(const IdoPgsqlResult& result, int row);
bool FieldToEscapedString(const String& key, const Value& value, Value *result); bool FieldToEscapedString(const String& key, const Value& value, Value *result);
void InternalActivateObject(const DbObject::Ptr& dbobj); void InternalActivateObject(const DbObject::Ptr& dbobj);