mirror of https://github.com/Icinga/icinga2.git
parent
eafdb48ffe
commit
2617521ce3
|
@ -30,7 +30,12 @@ if(MYSQL_FOUND)
|
|||
set_target_properties (
|
||||
db_ido_mysql PROPERTIES
|
||||
INSTALL_RPATH ${CMAKE_INSTALL_FULL_LIBDIR}/icinga2
|
||||
FOLDER Components
|
||||
)
|
||||
|
||||
install(TARGETS db_ido_mysql RUNTIME DESTINATION ${CMAKE_INSTALL_SBINDIR} LIBRARY DESTINATION ${CMAKE_INSTALL_FULL_LIBDIR}/icinga2)
|
||||
install(
|
||||
TARGETS db_ido_mysql
|
||||
RUNTIME DESTINATION ${CMAKE_INSTALL_SBINDIR}
|
||||
LIBRARY DESTINATION ${CMAKE_INSTALL_FULL_LIBDIR}/icinga2
|
||||
)
|
||||
endif()
|
||||
|
|
|
@ -23,6 +23,7 @@
|
|||
#include "base/utility.h"
|
||||
#include "base/application.h"
|
||||
#include "base/dynamictype.h"
|
||||
#include "base/exception.h"
|
||||
#include "db_ido/dbtype.h"
|
||||
#include "db_ido/dbvalue.h"
|
||||
#include "db_ido_mysql/idomysqlconnection.h"
|
||||
|
@ -43,6 +44,8 @@ void IdoMysqlConnection::Start(void)
|
|||
|
||||
m_Connected = false;
|
||||
|
||||
m_QueryQueue.SetExceptionCallback(&IdoMysqlConnection::ExceptionHandler);
|
||||
|
||||
m_TxTimer = boost::make_shared<Timer>();
|
||||
m_TxTimer->SetInterval(5);
|
||||
m_TxTimer->OnTimerExpired.connect(boost::bind(&IdoMysqlConnection::TxTimerHandler, this));
|
||||
|
@ -63,6 +66,19 @@ void IdoMysqlConnection::Stop(void)
|
|||
m_QueryQueue.Join();
|
||||
}
|
||||
|
||||
void IdoMysqlConnection::ExceptionHandler(boost::exception_ptr exp)
|
||||
{
|
||||
Log(LogCritical, "db_ido_mysql", "Exception during database operation: " + boost::diagnostic_information(exp));
|
||||
|
||||
boost::mutex::scoped_lock lock(m_ConnectionMutex);
|
||||
|
||||
if (m_Connected) {
|
||||
mysql_close(&m_Connection);
|
||||
|
||||
m_Connected = false;
|
||||
}
|
||||
}
|
||||
|
||||
void IdoMysqlConnection::AssertOnWorkQueue(void)
|
||||
{
|
||||
ASSERT(boost::this_thread::get_id() == m_QueryQueue.GetThreadId());
|
||||
|
@ -250,13 +266,21 @@ Array::Ptr IdoMysqlConnection::Query(const String& query)
|
|||
Log(LogDebug, "db_ido_mysql", "Query: " + query);
|
||||
|
||||
if (mysql_query(&m_Connection, query.CStr()) != 0)
|
||||
BOOST_THROW_EXCEPTION(std::runtime_error(mysql_error(&m_Connection)));
|
||||
BOOST_THROW_EXCEPTION(
|
||||
database_error()
|
||||
<< errinfo_message(mysql_error(&m_Connection))
|
||||
<< errinfo_database_query(query)
|
||||
);
|
||||
|
||||
MYSQL_RES *result = mysql_store_result(&m_Connection);
|
||||
|
||||
if (!result) {
|
||||
if (mysql_field_count(&m_Connection) > 0)
|
||||
BOOST_THROW_EXCEPTION(std::runtime_error(mysql_error(&m_Connection)));
|
||||
BOOST_THROW_EXCEPTION(
|
||||
database_error()
|
||||
<< errinfo_message(mysql_error(&m_Connection))
|
||||
<< errinfo_database_query(query)
|
||||
);
|
||||
|
||||
return Array::Ptr();
|
||||
}
|
||||
|
@ -437,6 +461,8 @@ bool IdoMysqlConnection::FieldToEscapedString(const String& key, const Value& va
|
|||
|
||||
void IdoMysqlConnection::ExecuteQuery(const DbQuery& query)
|
||||
{
|
||||
ASSERT(query.Category != DbCatInvalid);
|
||||
|
||||
m_QueryQueue.Enqueue(boost::bind(&IdoMysqlConnection::InternalExecuteQuery, this, query));
|
||||
}
|
||||
|
||||
|
@ -444,8 +470,6 @@ void IdoMysqlConnection::InternalExecuteQuery(const DbQuery& query)
|
|||
{
|
||||
boost::mutex::scoped_lock lock(m_ConnectionMutex);
|
||||
|
||||
ASSERT(query.Category != DbCatInvalid);
|
||||
|
||||
if ((query.Category & GetCategories()) == 0)
|
||||
return;
|
||||
|
||||
|
|
|
@ -39,8 +39,6 @@ class IdoMysqlConnection : public ObjectImpl<IdoMysqlConnection>
|
|||
public:
|
||||
DECLARE_PTR_TYPEDEFS(IdoMysqlConnection);
|
||||
|
||||
//virtual void UpdateObject(const DbObject::Ptr& dbobj, DbUpdateType kind);
|
||||
|
||||
protected:
|
||||
virtual void Start(void);
|
||||
virtual void Stop(void);
|
||||
|
@ -48,11 +46,11 @@ protected:
|
|||
virtual void ActivateObject(const DbObject::Ptr& dbobj);
|
||||
virtual void DeactivateObject(const DbObject::Ptr& dbobj);
|
||||
virtual void ExecuteQuery(const DbQuery& query);
|
||||
virtual void CleanUpExecuteQuery(const String& table, const String& time_key, double time_value);
|
||||
virtual void CleanUpExecuteQuery(const String& table, const String& time_key, double time_value);
|
||||
|
||||
private:
|
||||
DbReference m_InstanceID;
|
||||
DbReference m_LastNotificationID;
|
||||
DbReference m_LastNotificationID;
|
||||
|
||||
WorkQueue m_QueryQueue;
|
||||
|
||||
|
@ -81,10 +79,12 @@ private:
|
|||
void ReconnectTimerHandler(void);
|
||||
|
||||
void InternalExecuteQuery(const DbQuery& query);
|
||||
void InternalCleanUpExecuteQuery(const String& table, const String& time_key, double time_value);
|
||||
void InternalCleanUpExecuteQuery(const String& table, const String& time_key, double time_value);
|
||||
|
||||
void ClearConfigTables(void);
|
||||
void ClearConfigTable(const String& table);
|
||||
|
||||
void ExceptionHandler(boost::exception_ptr exp);
|
||||
};
|
||||
|
||||
}
|
||||
|
|
|
@ -30,6 +30,7 @@ if(PostgreSQL_FOUND)
|
|||
set_target_properties (
|
||||
db_ido_pgsql PROPERTIES
|
||||
INSTALL_RPATH ${CMAKE_INSTALL_FULL_LIBDIR}/icinga2
|
||||
FOLDER Components
|
||||
)
|
||||
|
||||
install(
|
||||
|
|
|
@ -23,6 +23,7 @@
|
|||
#include "base/utility.h"
|
||||
#include "base/application.h"
|
||||
#include "base/dynamictype.h"
|
||||
#include "base/exception.h"
|
||||
#include "db_ido/dbtype.h"
|
||||
#include "db_ido/dbvalue.h"
|
||||
#include "db_ido_pgsql/idopgsqlconnection.h"
|
||||
|
@ -43,6 +44,8 @@ void IdoPgsqlConnection::Start(void)
|
|||
|
||||
m_Connection = NULL;
|
||||
|
||||
m_QueryQueue.SetExceptionCallback(&IdoPgsqlConnection::ExceptionHandler);
|
||||
|
||||
m_TxTimer = boost::make_shared<Timer>();
|
||||
m_TxTimer->SetInterval(5);
|
||||
m_TxTimer->OnTimerExpired.connect(boost::bind(&IdoPgsqlConnection::TxTimerHandler, this));
|
||||
|
@ -63,6 +66,18 @@ void IdoPgsqlConnection::Stop(void)
|
|||
m_QueryQueue.Join();
|
||||
}
|
||||
|
||||
void IdoPgsqlConnection::ExceptionHandler(boost::exception_ptr exp)
|
||||
{
|
||||
Log(LogCritical, "db_ido_pgsql", "Exception during database operation: " + boost::diagnostic_information(exp));
|
||||
|
||||
boost::mutex::scoped_lock lock(m_ConnectionMutex);
|
||||
|
||||
if (m_Connection) {
|
||||
PQfinish(m_Connection);
|
||||
m_Connection = NULL;
|
||||
}
|
||||
}
|
||||
|
||||
void IdoPgsqlConnection::AssertOnWorkQueue(void)
|
||||
{
|
||||
ASSERT(boost::this_thread::get_id() == m_QueryQueue.GetThreadId());
|
||||
|
@ -256,7 +271,10 @@ Array::Ptr IdoPgsqlConnection::Query(const String& query)
|
|||
PGresult *result = PQexec(m_Connection, query.CStr());
|
||||
|
||||
if (!result)
|
||||
BOOST_THROW_EXCEPTION(std::runtime_error("unknown error during pgSQL query"));
|
||||
BOOST_THROW_EXCEPTION(
|
||||
database_error()
|
||||
<< errinfo_database_query(query)
|
||||
);
|
||||
|
||||
if (PQresultStatus(result) == PGRES_COMMAND_OK)
|
||||
return Array::Ptr();
|
||||
|
@ -265,7 +283,11 @@ Array::Ptr IdoPgsqlConnection::Query(const String& query)
|
|||
String message = PQresultErrorMessage(result);
|
||||
PQclear(result);
|
||||
|
||||
BOOST_THROW_EXCEPTION(std::runtime_error(message));
|
||||
BOOST_THROW_EXCEPTION(
|
||||
database_error()
|
||||
<< errinfo_message(message)
|
||||
<< errinfo_database_query(query)
|
||||
);
|
||||
}
|
||||
|
||||
Array::Ptr rows = boost::make_shared<Array>();
|
||||
|
@ -298,6 +320,10 @@ DbReference IdoPgsqlConnection::GetSequenceValue(const String& table, const Stri
|
|||
|
||||
Dictionary::Ptr row = rows->Get(0);
|
||||
|
||||
std::ostringstream msgbuf;
|
||||
msgbuf << "Sequence Value: " << row->Get("id");
|
||||
Log(LogDebug, "db_ido_pgsql", msgbuf.str());
|
||||
|
||||
return DbReference(Convert::ToLong(row->Get("id")));
|
||||
}
|
||||
|
||||
|
@ -305,7 +331,7 @@ String IdoPgsqlConnection::Escape(const String& s)
|
|||
{
|
||||
AssertOnWorkQueue();
|
||||
|
||||
ssize_t length = s.GetLength();
|
||||
size_t length = s.GetLength();
|
||||
char *to = new char[s.GetLength() * 2 + 1];
|
||||
|
||||
PQescapeStringConn(m_Connection, to, s.CStr(), length, NULL);
|
||||
|
@ -444,6 +470,8 @@ bool IdoPgsqlConnection::FieldToEscapedString(const String& key, const Value& va
|
|||
|
||||
void IdoPgsqlConnection::ExecuteQuery(const DbQuery& query)
|
||||
{
|
||||
ASSERT(query.Category != DbCatInvalid);
|
||||
|
||||
m_QueryQueue.Enqueue(boost::bind(&IdoPgsqlConnection::InternalExecuteQuery, this, query));
|
||||
}
|
||||
|
||||
|
@ -451,8 +479,6 @@ void IdoPgsqlConnection::InternalExecuteQuery(const DbQuery& query)
|
|||
{
|
||||
boost::mutex::scoped_lock lock(m_ConnectionMutex);
|
||||
|
||||
ASSERT(query.Category != DbCatInvalid);
|
||||
|
||||
if ((query.Category & GetCategories()) == 0)
|
||||
return;
|
||||
|
||||
|
|
|
@ -39,8 +39,6 @@ class IdoPgsqlConnection : public ObjectImpl<IdoPgsqlConnection>
|
|||
public:
|
||||
DECLARE_PTR_TYPEDEFS(IdoPgsqlConnection);
|
||||
|
||||
//virtual void UpdateObject(const DbObject::Ptr& dbobj, DbUpdateType kind);
|
||||
|
||||
protected:
|
||||
virtual void Start(void);
|
||||
virtual void Stop(void);
|
||||
|
@ -52,7 +50,7 @@ protected:
|
|||
|
||||
private:
|
||||
DbReference m_InstanceID;
|
||||
DbReference m_LastNotificationID;
|
||||
DbReference m_LastNotificationID;
|
||||
|
||||
WorkQueue m_QueryQueue;
|
||||
|
||||
|
@ -80,10 +78,12 @@ private:
|
|||
void ReconnectTimerHandler(void);
|
||||
|
||||
void InternalExecuteQuery(const DbQuery& query);
|
||||
void InternalCleanUpExecuteQuery(const String& table, const String& time_key, double time_value);
|
||||
void InternalCleanUpExecuteQuery(const String& table, const String& time_key, double time_value);
|
||||
|
||||
void ClearConfigTables(void);
|
||||
void ClearConfigTable(const String& table);
|
||||
|
||||
void ExceptionHandler(boost::exception_ptr exp);
|
||||
};
|
||||
|
||||
}
|
||||
|
|
|
@ -94,6 +94,9 @@ inline std::string to_string(const errinfo_getaddrinfo_error& e)
|
|||
return gai_strerror(e.value());
|
||||
}
|
||||
|
||||
struct errinfo_message_;
|
||||
typedef boost::error_info<struct errinfo_message_, std::string> errinfo_message;
|
||||
|
||||
}
|
||||
|
||||
#endif /* EXCEPTION_H */
|
||||
|
|
|
@ -29,7 +29,8 @@ using namespace icinga;
|
|||
int WorkQueue::m_NextID = 1;
|
||||
|
||||
WorkQueue::WorkQueue(size_t maxItems)
|
||||
: m_ID(m_NextID++), m_MaxItems(maxItems), m_Joined(false), m_Stopped(false)
|
||||
: m_ID(m_NextID++), m_MaxItems(maxItems), m_Joined(false),
|
||||
m_Stopped(false), m_ExceptionCallback(WorkQueue::DefaultExceptionCallback)
|
||||
{
|
||||
m_Thread = boost::thread(boost::bind(&WorkQueue::WorkerThreadProc, this));
|
||||
}
|
||||
|
@ -73,6 +74,18 @@ boost::thread::id WorkQueue::GetThreadId(void) const
|
|||
return m_Thread.get_id();
|
||||
}
|
||||
|
||||
void WorkQueue::SetExceptionCallback(const ExceptionCallback& callback)
|
||||
{
|
||||
boost::mutex::scoped_lock lock(m_Mutex);
|
||||
|
||||
m_ExceptionCallback = callback;
|
||||
}
|
||||
|
||||
void WorkQueue::DefaultExceptionCallback(boost::exception_ptr exp)
|
||||
{
|
||||
throw;
|
||||
}
|
||||
|
||||
void WorkQueue::WorkerThreadProc(void)
|
||||
{
|
||||
boost::mutex::scoped_lock lock(m_Mutex);
|
||||
|
@ -96,13 +109,13 @@ void WorkQueue::WorkerThreadProc(void)
|
|||
lock.unlock();
|
||||
wi();
|
||||
} catch (const std::exception& ex) {
|
||||
std::ostringstream msgbuf;
|
||||
msgbuf << "Exception thrown in workqueue handler: " << std::endl
|
||||
<< boost::diagnostic_information(ex);
|
||||
lock.lock();
|
||||
|
||||
Log(LogCritical, "base", msgbuf.str());
|
||||
} catch (...) {
|
||||
Log(LogCritical, "base", "Exception of unknown type thrown in workqueue handler.");
|
||||
ExceptionCallback callback = m_ExceptionCallback;
|
||||
|
||||
lock.unlock();
|
||||
|
||||
callback(boost::current_exception());
|
||||
}
|
||||
|
||||
lock.lock();
|
||||
|
|
|
@ -26,6 +26,7 @@
|
|||
#include <boost/thread/thread.hpp>
|
||||
#include <boost/thread/mutex.hpp>
|
||||
#include <boost/thread/condition_variable.hpp>
|
||||
#include <boost/exception_ptr.hpp>
|
||||
|
||||
namespace icinga
|
||||
{
|
||||
|
@ -39,6 +40,7 @@ class I2_BASE_API WorkQueue
|
|||
{
|
||||
public:
|
||||
typedef boost::function<void (void)> WorkCallback;
|
||||
typedef boost::function<void (boost::exception_ptr)> ExceptionCallback;
|
||||
|
||||
WorkQueue(size_t maxItems = 25000);
|
||||
~WorkQueue(void);
|
||||
|
@ -48,6 +50,8 @@ public:
|
|||
|
||||
boost::thread::id GetThreadId(void) const;
|
||||
|
||||
void SetExceptionCallback(const ExceptionCallback& callback);
|
||||
|
||||
private:
|
||||
int m_ID;
|
||||
static int m_NextID;
|
||||
|
@ -59,8 +63,11 @@ private:
|
|||
bool m_Joined;
|
||||
bool m_Stopped;
|
||||
std::deque<WorkCallback> m_Items;
|
||||
ExceptionCallback m_ExceptionCallback;
|
||||
|
||||
void WorkerThreadProc(void);
|
||||
|
||||
static void DefaultExceptionCallback(boost::exception_ptr exp);
|
||||
};
|
||||
|
||||
}
|
||||
|
|
|
@ -79,6 +79,11 @@ private:
|
|||
static void ProgramStatusHandler(void);
|
||||
};
|
||||
|
||||
struct database_error : virtual std::exception, virtual boost::exception { };
|
||||
|
||||
struct errinfo_database_query_;
|
||||
typedef boost::error_info<struct errinfo_database_query_, std::string> errinfo_database_query;
|
||||
|
||||
}
|
||||
|
||||
#endif /* DBCONNECTION_H */
|
||||
|
|
|
@ -78,7 +78,7 @@ void DbObject::SendConfigUpdate(void)
|
|||
DbQuery query;
|
||||
query.Table = GetType()->GetTable() + "s";
|
||||
query.Type = DbQueryInsert | DbQueryUpdate;
|
||||
query.Type = DbCatConfig;
|
||||
query.Category = DbCatConfig;
|
||||
query.Fields = fields;
|
||||
query.Fields->Set(GetType()->GetIDColumn(), GetObject());
|
||||
query.Fields->Set("instance_id", 0); /* DbConnection class fills in real ID */
|
||||
|
|
Loading…
Reference in New Issue