Fix object updates and implement transaction support.

This commit is contained in:
Gunnar Beutner 2013-07-17 14:10:28 +02:00
parent e66c36ec9e
commit c50f6ab82b
9 changed files with 191 additions and 64 deletions

View File

@ -19,6 +19,7 @@
#include "base/logger_fwd.h" #include "base/logger_fwd.h"
#include "base/objectlock.h" #include "base/objectlock.h"
#include "base/convert.h"
#include "ido/dbtype.h" #include "ido/dbtype.h"
#include "ido_mysql/mysqldbconnection.h" #include "ido_mysql/mysqldbconnection.h"
#include <boost/tuple/tuple.hpp> #include <boost/tuple/tuple.hpp>
@ -41,6 +42,11 @@ MysqlDbConnection::MysqlDbConnection(const Dictionary::Ptr& serializedUpdate)
RegisterAttribute("instance_name", Attribute_Config, &m_InstanceName); RegisterAttribute("instance_name", Attribute_Config, &m_InstanceName);
RegisterAttribute("instance_description", Attribute_Config, &m_InstanceDescription); RegisterAttribute("instance_description", Attribute_Config, &m_InstanceDescription);
m_TxTimer = boost::make_shared<Timer>();
m_TxTimer->SetInterval(5);
m_TxTimer->OnTimerExpired.connect(boost::bind(&MysqlDbConnection::TxTimerHandler, this));
m_TxTimer->Start();
/* TODO: move this to a timer so we can periodically check if we're still connected - and reconnect if necessary */ /* TODO: move this to a timer so we can periodically check if we're still connected - and reconnect if necessary */
String ihost, iuser, ipasswd, idb; String ihost, iuser, ipasswd, idb;
const char *host, *user , *passwd, *db; const char *host, *user , *passwd, *db;
@ -75,7 +81,7 @@ MysqlDbConnection::MysqlDbConnection(const Dictionary::Ptr& serializedUpdate)
if (rows->GetLength() == 0) { if (rows->GetLength() == 0) {
Query("INSERT INTO icinga_instances (instance_name, instance_description) VALUES ('" + Escape(instanceName) + "', '" + m_InstanceDescription + "')"); Query("INSERT INTO icinga_instances (instance_name, instance_description) VALUES ('" + Escape(instanceName) + "', '" + m_InstanceDescription + "')");
m_InstanceID = GetInsertId(); m_InstanceID = GetInsertID();
} else { } else {
Dictionary::Ptr row = rows->Get(0); Dictionary::Ptr row = rows->Get(0);
m_InstanceID = DbReference(row->Get("instance_id")); m_InstanceID = DbReference(row->Get("instance_id"));
@ -88,20 +94,23 @@ MysqlDbConnection::MysqlDbConnection(const Dictionary::Ptr& serializedUpdate)
Query("UPDATE icinga_objects SET is_active = 0"); Query("UPDATE icinga_objects SET is_active = 0");
std::ostringstream qbuf; std::ostringstream qbuf;
qbuf << "SELECT objecttype_id, name1, name2 FROM icinga_objects WHERE instance_id = " << static_cast<long>(m_InstanceID); qbuf << "SELECT object_id, objecttype_id, name1, name2 FROM icinga_objects WHERE instance_id = " << static_cast<long>(m_InstanceID);
rows = Query(qbuf.str()); rows = Query(qbuf.str());
ObjectLock olock(rows); ObjectLock olock(rows);
BOOST_FOREACH(const Dictionary::Ptr& row, rows) { BOOST_FOREACH(const Dictionary::Ptr& row, rows) {
DbType::Ptr dbtype = DbType::GetById(row->Get("objecttype_id")); DbType::Ptr dbtype = DbType::GetByID(row->Get("objecttype_id"));
if (!dbtype) if (!dbtype)
continue; continue;
dbtype->GetOrCreateObjectByName(row->Get("name1"), row->Get("name2")); DbObject::Ptr dbobj = dbtype->GetOrCreateObjectByName(row->Get("name1"), row->Get("name2"));
SetReference(dbobj, DbReference(row->Get("object_id")));
} }
} }
Query("BEGIN");
UpdateAllObjects(); UpdateAllObjects();
} }
@ -111,8 +120,17 @@ void MysqlDbConnection::Stop(void)
mysql_close(&m_Connection); mysql_close(&m_Connection);
} }
void MysqlDbConnection::TxTimerHandler(void)
{
boost::mutex::scoped_lock lock(m_ConnectionMutex);
Query("COMMIT");
Query("BEGIN");
}
Array::Ptr MysqlDbConnection::Query(const String& query) Array::Ptr MysqlDbConnection::Query(const String& query)
{ {
Log(LogDebug, "ido_mysql", "Query: " + query);
if (mysql_query(&m_Connection, query.CStr()) != 0) if (mysql_query(&m_Connection, query.CStr()) != 0)
BOOST_THROW_EXCEPTION(std::runtime_error(mysql_error(&m_Connection))); BOOST_THROW_EXCEPTION(std::runtime_error(mysql_error(&m_Connection)));
@ -141,7 +159,7 @@ Array::Ptr MysqlDbConnection::Query(const String& query)
return rows; return rows;
} }
DbReference MysqlDbConnection::GetInsertId(void) DbReference MysqlDbConnection::GetInsertID(void)
{ {
return DbReference(mysql_insert_id(&m_Connection)); return DbReference(mysql_insert_id(&m_Connection));
} }
@ -189,6 +207,8 @@ Dictionary::Ptr MysqlDbConnection::FetchRow(MYSQL_RES *result)
} }
void MysqlDbConnection::UpdateObject(const DbObject::Ptr& dbobj, DbUpdateType kind) { void MysqlDbConnection::UpdateObject(const DbObject::Ptr& dbobj, DbUpdateType kind) {
boost::mutex::scoped_lock lock(m_ConnectionMutex);
DbReference dbref = GetReference(dbobj); DbReference dbref = GetReference(dbobj);
if (kind == DbObjectRemoved) { if (kind == DbObjectRemoved) {
@ -200,56 +220,69 @@ void MysqlDbConnection::UpdateObject(const DbObject::Ptr& dbobj, DbUpdateType ki
Log(LogWarning, "ido_mysql", "Query: " + qbuf.str()); Log(LogWarning, "ido_mysql", "Query: " + qbuf.str());
} }
std::ostringstream q1buf; if (kind == DbObjectCreated) {
std::ostringstream q1buf;
if (!dbref.IsValid()) { if (!dbref.IsValid()) {
q1buf << "INSERT INTO icinga_objects (instance_id, objecttype_id, name1, name2, is_active) VALUES ("; q1buf << "INSERT INTO icinga_objects (instance_id, objecttype_id, name1, name2, is_active) VALUES ("
} else { << static_cast<long>(m_InstanceID) << ", " << dbobj->GetType()->GetTypeID() << ", "
q1buf << "UPDATE icinga_objects SET is_active = 1 WHERE object_id = " << static_cast<long>(dbref); << "'" << Escape(dbobj->GetName1()) << "', '" << Escape(dbobj->GetName2()) << "', 1)";
} Query(q1buf.str());
dbref = GetInsertID();
Dictionary::Ptr cols = boost::make_shared<Dictionary>(); } else if (kind == DbObjectCreated) {
q1buf << "UPDATE icinga_objects SET is_active = 1 WHERE object_id = " << static_cast<long>(dbref);
Dictionary::Ptr fields = dbobj->GetFields(); Query(q1buf.str());
if (!fields)
return;
ObjectLock olock(fields);
String key;
Value value;
BOOST_FOREACH(boost::tie(key, value), fields) {
if (value.IsObjectType<DynamicObject>()) {
DbObject::Ptr dbobjcol = DbObject::GetOrCreateByObject(value);
if (!dbobjcol)
return;
DbReference dbrefcol = GetReference(dbobjcol);
if (!dbrefcol.IsValid()) {
UpdateObject(dbobjcol, DbObjectCreated);
dbrefcol = GetReference(dbobjcol);
if (!dbrefcol.IsValid())
return;
}
value = static_cast<long>(dbrefcol);
} }
cols->Set(key, value); //Dictionary::Ptr cols = boost::make_shared<Dictionary>();
Dictionary::Ptr fields = dbobj->GetFields();
if (!fields)
return;
String cols;
String values;
ObjectLock olock(fields);
String key;
Value value;
BOOST_FOREACH(boost::tie(key, value), fields) {
if (value.IsObjectType<DynamicObject>()) {
DbObject::Ptr dbobjcol = DbObject::GetOrCreateByObject(value);
if (!dbobjcol)
return;
DbReference dbrefcol = GetReference(dbobjcol);
if (!dbrefcol.IsValid()) {
UpdateObject(dbobjcol, DbObjectCreated);
dbrefcol = GetReference(dbobjcol);
if (!dbrefcol.IsValid())
return;
}
value = static_cast<long>(dbrefcol);
}
cols += ", " + key;
values += ", '" + Convert::ToString(value) + "'";
}
std::ostringstream q2buf;
q2buf << "DELETE FROM icinga_" << dbobj->GetType()->GetTable() << "s WHERE " << dbobj->GetType()->GetTable() << "_object_id = " << static_cast<long>(dbref);
Query(q2buf.str());
std::ostringstream q3buf;
q3buf << "INSERT INTO icinga_" << dbobj->GetType()->GetTable()
<< "s (instance_id, " << dbobj->GetType()->GetTable() << "_object_id" << cols << ") VALUES ("
<< static_cast<long>(m_InstanceID) << ", " << static_cast<long>(dbref) << values << ")";
Query(q3buf.str());
} }
std::ostringstream q2buf; // TODO: Object and status updates
if (dbref.IsValid()) {
q2buf << "UPDATE icinga_" << dbobj->GetType()->GetTable() << "s SET xxx WHERE id = " << static_cast<long>(dbref);
} else {
q2buf << "INSERT INTO icinga_" << dbobj->GetType()->GetTable() << "s xxx";
}
Log(LogWarning, "ido_mysql", "Query: " + q2buf.str());
} }

View File

@ -22,6 +22,7 @@
#include "base/array.h" #include "base/array.h"
#include "base/dynamictype.h" #include "base/dynamictype.h"
#include "base/timer.h"
#include "ido/dbconnection.h" #include "ido/dbconnection.h"
#include <mysql/mysql.h> #include <mysql/mysql.h>
@ -58,10 +59,14 @@ private:
boost::mutex m_ConnectionMutex; boost::mutex m_ConnectionMutex;
MYSQL m_Connection; MYSQL m_Connection;
Timer::Ptr m_TxTimer;
Array::Ptr Query(const String& query); Array::Ptr Query(const String& query);
DbReference GetInsertId(void); DbReference GetInsertID(void);
String Escape(const String& s); String Escape(const String& s);
Dictionary::Ptr FetchRow(MYSQL_RES *result); Dictionary::Ptr FetchRow(MYSQL_RES *result);
void TxTimerHandler(void);
}; };
} }

View File

@ -20,7 +20,9 @@ libido_la_SOURCES = \
dbtype.h \ dbtype.h \
hostdbobject.cpp \ hostdbobject.cpp \
hostdbobject.h \ hostdbobject.h \
ido-type.cpp ido-type.cpp \
servicedbobject.cpp \
servicedbobject.h
libido_la_CPPFLAGS = \ libido_la_CPPFLAGS = \
$(LTDLINCL) \ $(LTDLINCL) \

View File

@ -27,7 +27,7 @@ DbConnection::DbConnection(const Dictionary::Ptr& serializedUpdate)
: DynamicObject(serializedUpdate) : DynamicObject(serializedUpdate)
{ } { }
void DbConnection::Initialize(void) void DbConnection::Start(void)
{ {
DbObject::OnObjectUpdated.connect(boost::bind(&DbConnection::InternalUpdateObject, this, _1, _2)); DbObject::OnObjectUpdated.connect(boost::bind(&DbConnection::InternalUpdateObject, this, _1, _2));
} }

View File

@ -42,12 +42,13 @@ public:
DbReference GetReference(const DbObject::Ptr& dbobj) const; DbReference GetReference(const DbObject::Ptr& dbobj) const;
protected: protected:
virtual void Start(void);
virtual void UpdateObject(const DbObject::Ptr& dbobj, DbUpdateType kind) = 0; virtual void UpdateObject(const DbObject::Ptr& dbobj, DbUpdateType kind) = 0;
void UpdateAllObjects(void); void UpdateAllObjects(void);
private: private:
void Initialize(void);
void InternalUpdateObject(const DbObject::Ptr& dbobj, DbUpdateType kind); void InternalUpdateObject(const DbObject::Ptr& dbobj, DbUpdateType kind);
std::map<DbObject::Ptr, DbReference> m_References; std::map<DbObject::Ptr, DbReference> m_References;

View File

@ -28,7 +28,7 @@ using namespace icinga;
boost::mutex DbType::m_StaticMutex; boost::mutex DbType::m_StaticMutex;
DbType::DbType(const String& name, const String& table, long tid, const DbType::ObjectFactory& factory) DbType::DbType(const String& name, const String& table, long tid, const DbType::ObjectFactory& factory)
: m_Name(name), m_Table(table), m_TypeId(tid), m_ObjectFactory(factory) : m_Name(name), m_Table(table), m_TypeID(tid), m_ObjectFactory(factory)
{ {
static boost::once_flag initializeOnce = BOOST_ONCE_INIT; static boost::once_flag initializeOnce = BOOST_ONCE_INIT;
boost::call_once(initializeOnce, &DbObject::StaticInitialize); boost::call_once(initializeOnce, &DbObject::StaticInitialize);
@ -44,9 +44,9 @@ String DbType::GetTable(void) const
return m_Table; return m_Table;
} }
long DbType::GetTypeId(void) const long DbType::GetTypeID(void) const
{ {
return m_TypeId; return m_TypeID;
} }
void DbType::RegisterType(const DbType::Ptr& type) void DbType::RegisterType(const DbType::Ptr& type)
@ -66,12 +66,12 @@ DbType::Ptr DbType::GetByName(const String& name)
return it->second; return it->second;
} }
DbType::Ptr DbType::GetById(long tid) DbType::Ptr DbType::GetByID(long tid)
{ {
String name; String name;
DbType::Ptr type; DbType::Ptr type;
BOOST_FOREACH(boost::tie(name, type), GetTypes()) { BOOST_FOREACH(boost::tie(name, type), GetTypes()) {
if (type->GetTypeId() == tid) if (type->GetTypeID() == tid)
return type; return type;
} }

View File

@ -45,19 +45,19 @@ public:
String GetName(void) const; String GetName(void) const;
String GetTable(void) const; String GetTable(void) const;
long GetTypeId(void) const; long GetTypeID(void) const;
static void RegisterType(const DbType::Ptr& type); static void RegisterType(const DbType::Ptr& type);
static DbType::Ptr GetByName(const String& name); static DbType::Ptr GetByName(const String& name);
static DbType::Ptr GetById(long tid); static DbType::Ptr GetByID(long tid);
DbObject::Ptr GetOrCreateObjectByName(const String& name1, const String& name2); DbObject::Ptr GetOrCreateObjectByName(const String& name1, const String& name2);
private: private:
String m_Name; String m_Name;
String m_Table; String m_Table;
long m_TypeId; long m_TypeID;
ObjectFactory m_ObjectFactory; ObjectFactory m_ObjectFactory;
static boost::mutex m_StaticMutex; static boost::mutex m_StaticMutex;

View File

@ -0,0 +1,40 @@
/******************************************************************************
* Icinga 2 *
* Copyright (C) 2012 Icinga Development Team (http://www.icinga.org/) *
* *
* This program is free software; you can redistribute it and/or *
* modify it under the terms of the GNU General Public License *
* as published by the Free Software Foundation; either version 2 *
* of the License, or (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program; if not, write to the Free Software Foundation *
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. *
******************************************************************************/
#include "ido/servicedbobject.h"
#include "ido/dbtype.h"
#include "icinga/service.h"
using namespace icinga;
REGISTER_DBTYPE("Service", "service", 2, ServiceDbObject);
ServiceDbObject::ServiceDbObject(const String& name1, const String& name2)
: DbObject(DbType::GetByName("Service"), name1, name2)
{ }
Dictionary::Ptr ServiceDbObject::GetFields(void) const
{
Dictionary::Ptr fields = boost::make_shared<Dictionary>();
Service::Ptr service = static_pointer_cast<Service>(GetObject());
fields->Set("display_name", service->GetDisplayName());
return fields;
}

46
lib/ido/servicedbobject.h Normal file
View File

@ -0,0 +1,46 @@
/******************************************************************************
* Icinga 2 *
* Copyright (C) 2012 Icinga Development Team (http://www.icinga.org/) *
* *
* This program is free software; you can redistribute it and/or *
* modify it under the terms of the GNU General Public License *
* as published by the Free Software Foundation; either version 2 *
* of the License, or (at your option) any later version. *
* *
* This program is distributed in the hope that it will be useful, *
* but WITHOUT ANY WARRANTY; without even the implied warranty of *
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the *
* GNU General Public License for more details. *
* *
* You should have received a copy of the GNU General Public License *
* along with this program; if not, write to the Free Software Foundation *
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301, USA. *
******************************************************************************/
#ifndef SERVICEDBOBJECT_H
#define SERVICEDBOBJECT_H
#include "ido/dbobject.h"
#include "base/dynamicobject.h"
namespace icinga
{
/**
* A Service database object.
*
* @ingroup ido
*/
class ServiceDbObject : public DbObject
{
public:
DECLARE_PTR_TYPEDEFS(ServiceDbObject);
ServiceDbObject(const String& name1, const String& name2);
virtual Dictionary::Ptr GetFields(void) const;
};
}
#endif /* SERVICEDBOBJECT_H */