Merge branch 'feature/fix-ido-queries-5235' into next

Fixes #5235
This commit is contained in:
Gunnar Beutner 2013-11-28 12:33:25 +01:00
commit 31c70fe5ae
5 changed files with 44 additions and 31 deletions

View File

@ -99,7 +99,7 @@ void IdoMysqlConnection::Disconnect(void)
void IdoMysqlConnection::TxTimerHandler(void) void IdoMysqlConnection::TxTimerHandler(void)
{ {
m_QueryQueue.Enqueue(boost::bind(&IdoMysqlConnection::NewTransaction, this)); m_QueryQueue.Enqueue(boost::bind(&IdoMysqlConnection::NewTransaction, this), true);
} }
void IdoMysqlConnection::NewTransaction(void) void IdoMysqlConnection::NewTransaction(void)
@ -280,7 +280,7 @@ Array::Ptr IdoMysqlConnection::Query(const String& query)
<< errinfo_database_query(query) << errinfo_database_query(query)
); );
MYSQL_RES *result = mysql_store_result(&m_Connection); MYSQL_RES *result = mysql_use_result(&m_Connection);
if (!result) { if (!result) {
if (mysql_field_count(&m_Connection) > 0) if (mysql_field_count(&m_Connection) > 0)
@ -465,7 +465,7 @@ void IdoMysqlConnection::ExecuteQuery(const DbQuery& query)
{ {
ASSERT(query.Category != DbCatInvalid); ASSERT(query.Category != DbCatInvalid);
m_QueryQueue.Enqueue(boost::bind(&IdoMysqlConnection::InternalExecuteQuery, this, query)); m_QueryQueue.Enqueue(boost::bind(&IdoMysqlConnection::InternalExecuteQuery, this, query), true);
} }
void IdoMysqlConnection::InternalExecuteQuery(const DbQuery& query) void IdoMysqlConnection::InternalExecuteQuery(const DbQuery& query)
@ -595,7 +595,7 @@ void IdoMysqlConnection::InternalExecuteQuery(const DbQuery& query)
void IdoMysqlConnection::CleanUpExecuteQuery(const String& table, const String& time_column, double max_age) void IdoMysqlConnection::CleanUpExecuteQuery(const String& table, const String& time_column, double max_age)
{ {
m_QueryQueue.Enqueue(boost::bind(&IdoMysqlConnection::InternalCleanUpExecuteQuery, this, table, time_column, max_age)); m_QueryQueue.Enqueue(boost::bind(&IdoMysqlConnection::InternalCleanUpExecuteQuery, this, table, time_column, max_age), true);
} }
void IdoMysqlConnection::InternalCleanUpExecuteQuery(const String& table, const String& time_column, double max_age) void IdoMysqlConnection::InternalCleanUpExecuteQuery(const String& table, const String& time_column, double max_age)

View File

@ -99,7 +99,7 @@ void IdoPgsqlConnection::Disconnect(void)
void IdoPgsqlConnection::TxTimerHandler(void) void IdoPgsqlConnection::TxTimerHandler(void)
{ {
m_QueryQueue.Enqueue(boost::bind(&IdoPgsqlConnection::NewTransaction, this)); m_QueryQueue.Enqueue(boost::bind(&IdoPgsqlConnection::NewTransaction, this), true);
} }
void IdoPgsqlConnection::NewTransaction(void) void IdoPgsqlConnection::NewTransaction(void)
@ -481,7 +481,7 @@ void IdoPgsqlConnection::ExecuteQuery(const DbQuery& query)
{ {
ASSERT(query.Category != DbCatInvalid); ASSERT(query.Category != DbCatInvalid);
m_QueryQueue.Enqueue(boost::bind(&IdoPgsqlConnection::InternalExecuteQuery, this, query)); m_QueryQueue.Enqueue(boost::bind(&IdoPgsqlConnection::InternalExecuteQuery, this, query), true);
} }
void IdoPgsqlConnection::InternalExecuteQuery(const DbQuery& query) void IdoPgsqlConnection::InternalExecuteQuery(const DbQuery& query)
@ -616,7 +616,7 @@ void IdoPgsqlConnection::InternalExecuteQuery(const DbQuery& query)
void IdoPgsqlConnection::CleanUpExecuteQuery(const String& table, const String& time_column, double max_age) void IdoPgsqlConnection::CleanUpExecuteQuery(const String& table, const String& time_column, double max_age)
{ {
m_QueryQueue.Enqueue(boost::bind(&IdoPgsqlConnection::InternalCleanUpExecuteQuery, this, table, time_column, max_age)); m_QueryQueue.Enqueue(boost::bind(&IdoPgsqlConnection::InternalCleanUpExecuteQuery, this, table, time_column, max_age), true);
} }
void IdoPgsqlConnection::InternalCleanUpExecuteQuery(const String& table, const String& time_column, double max_age) void IdoPgsqlConnection::InternalCleanUpExecuteQuery(const String& table, const String& time_column, double max_age)

View File

@ -21,6 +21,7 @@
#include "base/utility.h" #include "base/utility.h"
#include "base/debug.h" #include "base/debug.h"
#include "base/logger_fwd.h" #include "base/logger_fwd.h"
#include "base/convert.h"
#include <boost/bind.hpp> #include <boost/bind.hpp>
using namespace icinga; using namespace icinga;
@ -29,7 +30,8 @@ int WorkQueue::m_NextID = 1;
WorkQueue::WorkQueue(size_t maxItems) WorkQueue::WorkQueue(size_t maxItems)
: m_ID(m_NextID++), m_MaxItems(maxItems), m_Joined(false), : m_ID(m_NextID++), m_MaxItems(maxItems), m_Joined(false),
m_Stopped(false), m_ExceptionCallback(WorkQueue::DefaultExceptionCallback) m_Stopped(false), m_ExceptionCallback(WorkQueue::DefaultExceptionCallback),
m_LastStatus(0)
{ {
m_Thread = boost::thread(boost::bind(&WorkQueue::WorkerThreadProc, this)); m_Thread = boost::thread(boost::bind(&WorkQueue::WorkerThreadProc, this));
} }
@ -45,7 +47,7 @@ WorkQueue::~WorkQueue(void)
* Enqueues a work item. Work items are guaranteed to be executed in the order * Enqueues a work item. Work items are guaranteed to be executed in the order
* they were enqueued in. * they were enqueued in.
*/ */
void WorkQueue::Enqueue(const WorkCallback& item) void WorkQueue::Enqueue(const WorkCallback& callback, bool allowInterleaved)
{ {
boost::mutex::scoped_lock lock(m_Mutex); boost::mutex::scoped_lock lock(m_Mutex);
@ -58,10 +60,14 @@ void WorkQueue::Enqueue(const WorkCallback& item)
m_CV.wait(lock); m_CV.wait(lock);
} }
WorkItem item;
item.Callback = callback;
item.AllowInterleaved = allowInterleaved;
m_Items.push_back(item); m_Items.push_back(item);
if (wq_thread) if (wq_thread)
ProcessItems(lock); ProcessItems(lock, true);
else else
m_CV.notify_all(); m_CV.notify_all();
} }
@ -93,16 +99,27 @@ void WorkQueue::DefaultExceptionCallback(boost::exception_ptr exp)
throw; throw;
} }
void WorkQueue::ProcessItems(boost::mutex::scoped_lock& lock) void WorkQueue::ProcessItems(boost::mutex::scoped_lock& lock, bool interleaved)
{ {
while (!m_Items.empty()) { while (!m_Items.empty()) {
try { try {
WorkCallback wi = m_Items.front(); WorkItem wi = m_Items.front();
if (interleaved && !wi.AllowInterleaved)
return;
m_Items.pop_front(); m_Items.pop_front();
m_CV.notify_all(); m_CV.notify_all();
double now = Utility::GetTime();
if (m_LastStatus + 10 < now) {
Log(LogInformation, "base", "WQ items: " + Convert::ToString(m_Items.size()));
m_LastStatus = now;
}
lock.unlock(); lock.unlock();
wi(); wi.Callback();
} catch (const std::exception& ex) { } catch (const std::exception& ex) {
lock.lock(); lock.lock();
@ -132,7 +149,7 @@ void WorkQueue::WorkerThreadProc(void)
if (m_Joined) if (m_Joined)
break; break;
ProcessItems(lock); ProcessItems(lock, false);
} }
m_Stopped = true; m_Stopped = true;

View File

@ -31,6 +31,15 @@
namespace icinga namespace icinga
{ {
typedef boost::function<void (void)> WorkCallback;
struct WorkItem
{
WorkCallback Callback;
bool AllowInterleaved;
};
/** /**
* A workqueue. * A workqueue.
* *
@ -39,13 +48,12 @@ namespace icinga
class I2_BASE_API WorkQueue class I2_BASE_API WorkQueue
{ {
public: public:
typedef boost::function<void (void)> WorkCallback;
typedef boost::function<void (boost::exception_ptr)> ExceptionCallback; typedef boost::function<void (boost::exception_ptr)> ExceptionCallback;
WorkQueue(size_t maxItems = 25000); WorkQueue(size_t maxItems = 25000);
~WorkQueue(void); ~WorkQueue(void);
void Enqueue(const WorkCallback& item); void Enqueue(const WorkCallback& callback, bool allowInterleaved = false);
void Join(void); void Join(void);
boost::thread::id GetThreadId(void) const; boost::thread::id GetThreadId(void) const;
@ -62,10 +70,11 @@ private:
size_t m_MaxItems; size_t m_MaxItems;
bool m_Joined; bool m_Joined;
bool m_Stopped; bool m_Stopped;
std::deque<WorkCallback> m_Items; std::deque<WorkItem> m_Items;
ExceptionCallback m_ExceptionCallback; ExceptionCallback m_ExceptionCallback;
double m_LastStatus;
void ProcessItems(boost::mutex::scoped_lock& lock); void ProcessItems(boost::mutex::scoped_lock& lock, bool interleaved);
void WorkerThreadProc(void); void WorkerThreadProc(void);
static void DefaultExceptionCallback(boost::exception_ptr exp); static void DefaultExceptionCallback(boost::exception_ptr exp);

View File

@ -303,19 +303,6 @@ void ServiceDbObject::OnConfigUpdate(void)
/* update comments and downtimes on config change */ /* update comments and downtimes on config change */
AddComments(service); AddComments(service);
AddDowntimes(service); AddDowntimes(service);
/* service host config update */
Host::Ptr host = service->GetHost();
if (host->GetCheckService() != service)
return;
DbObject::Ptr dbobj = GetOrCreateByObject(host);
if (!dbobj)
return;
dbobj->SendConfigUpdate();
} }
void ServiceDbObject::OnStatusUpdate(void) void ServiceDbObject::OnStatusUpdate(void)