Fix priority ordering for IDO queries

fixes #10829
refs #8714
This commit is contained in:
Michael Friedrich 2015-12-14 11:36:03 +01:00 committed by Gunnar Beutner
parent 02184ad58f
commit 2bc1d32caa
12 changed files with 140 additions and 83 deletions

View File

@ -81,7 +81,7 @@ void WorkQueue::Enqueue(const boost::function<void (void)>& function, WorkQueueP
m_CVFull.wait(lock); m_CVFull.wait(lock);
} }
m_Tasks.push(Task(function, priority)); m_Tasks.push(Task(function, priority, ++m_NextTaskID));
m_CVEmpty.notify_one(); m_CVEmpty.notify_one();
} }

View File

@ -43,20 +43,31 @@ enum WorkQueuePriority
struct Task struct Task
{ {
Task(void) Task(void)
: Priority(PriorityNormal) : Priority(PriorityNormal), ID(-1)
{ } { }
Task(const boost::function<void (void)>& function, WorkQueuePriority priority) Task(const boost::function<void (void)>& function, WorkQueuePriority priority, int id)
: Function(function), Priority(priority) : Function(function), Priority(priority), ID(id)
{ } { }
boost::function<void (void)> Function; boost::function<void (void)> Function;
WorkQueuePriority Priority; WorkQueuePriority Priority;
int ID;
}; };
inline bool operator<(const Task& a, const Task& b) inline bool operator<(const Task& a, const Task& b)
{ {
return a.Priority < b.Priority; if (a.Priority < b.Priority)
return true;
if (a.Priority == b.Priority) {
if (a.ID > b.ID)
return true;
else
return false;
}
return false;
} }
/** /**
@ -101,6 +112,7 @@ private:
bool m_Stopped; bool m_Stopped;
int m_Processing; int m_Processing;
std::priority_queue<Task, std::deque<Task> > m_Tasks; std::priority_queue<Task, std::deque<Task> > m_Tasks;
int m_NextTaskID;
ExceptionCallback m_ExceptionCallback; ExceptionCallback m_ExceptionCallback;
std::vector<boost::exception_ptr> m_Exceptions; std::vector<boost::exception_ptr> m_Exceptions;
Timer::Ptr m_StatusTimer; Timer::Ptr m_StatusTimer;

View File

@ -61,6 +61,7 @@ void DbConnection::Start(bool runtimeCreated)
ObjectImpl<DbConnection>::Start(runtimeCreated); ObjectImpl<DbConnection>::Start(runtimeCreated);
DbObject::OnQuery.connect(boost::bind(&DbConnection::ExecuteQuery, this, _1)); DbObject::OnQuery.connect(boost::bind(&DbConnection::ExecuteQuery, this, _1));
DbObject::OnMultipleQueries.connect(boost::bind(&DbConnection::ExecuteMultipleQueries, this, _1));
ConfigObject::OnActiveChanged.connect(boost::bind(&DbConnection::UpdateObject, this, _1)); ConfigObject::OnActiveChanged.connect(boost::bind(&DbConnection::UpdateObject, this, _1));
} }
@ -131,6 +132,8 @@ void DbConnection::ProgramStatusHandler(void)
Log(LogNotice, "DbConnection") Log(LogNotice, "DbConnection")
<< "Updating programstatus table."; << "Updating programstatus table.";
std::vector<DbQuery> queries;
DbQuery query1; DbQuery query1;
query1.Table = "programstatus"; query1.Table = "programstatus";
query1.Type = DbQueryDelete; query1.Type = DbQueryDelete;
@ -138,7 +141,7 @@ void DbConnection::ProgramStatusHandler(void)
query1.WhereCriteria = new Dictionary(); query1.WhereCriteria = new Dictionary();
query1.WhereCriteria->Set("instance_id", 0); /* DbConnection class fills in real ID */ query1.WhereCriteria->Set("instance_id", 0); /* DbConnection class fills in real ID */
query1.Priority = PriorityHigh; query1.Priority = PriorityHigh;
DbObject::OnQuery(query1); queries.push_back(query1);
DbQuery query2; DbQuery query2;
query2.Table = "programstatus"; query2.Table = "programstatus";
@ -165,7 +168,9 @@ void DbConnection::ProgramStatusHandler(void)
query2.Fields->Set("flap_detection_enabled", (IcingaApplication::GetInstance()->GetEnableFlapping() ? 1 : 0)); query2.Fields->Set("flap_detection_enabled", (IcingaApplication::GetInstance()->GetEnableFlapping() ? 1 : 0));
query2.Fields->Set("process_performance_data", (IcingaApplication::GetInstance()->GetEnablePerfdata() ? 1 : 0)); query2.Fields->Set("process_performance_data", (IcingaApplication::GetInstance()->GetEnablePerfdata() ? 1 : 0));
query2.Priority = PriorityHigh; query2.Priority = PriorityHigh;
DbObject::OnQuery(query2); queries.push_back(query2);
DbObject::OnMultipleQueries(queries);
DbQuery query3; DbQuery query3;
query3.Table = "runtimevariables"; query3.Table = "runtimevariables";
@ -351,11 +356,6 @@ bool DbConnection::GetStatusUpdate(const DbObject::Ptr& dbobj) const
return (m_StatusUpdates.find(dbobj) != m_StatusUpdates.end()); return (m_StatusUpdates.find(dbobj) != m_StatusUpdates.end());
} }
void DbConnection::ExecuteQuery(const DbQuery&)
{
/* Default handler does nothing. */
}
void DbConnection::UpdateObject(const ConfigObject::Ptr& object) void DbConnection::UpdateObject(const ConfigObject::Ptr& object)
{ {
if (!GetConnected()) if (!GetConnected())

View File

@ -83,6 +83,7 @@ protected:
virtual void Pause(void) override; virtual void Pause(void) override;
virtual void ExecuteQuery(const DbQuery& query) = 0; virtual void ExecuteQuery(const DbQuery& query) = 0;
virtual void ExecuteMultipleQueries(const std::vector<DbQuery>&) = 0;
virtual void ActivateObject(const DbObject::Ptr& dbobj) = 0; virtual void ActivateObject(const DbObject::Ptr& dbobj) = 0;
virtual void DeactivateObject(const DbObject::Ptr& dbobj) = 0; virtual void DeactivateObject(const DbObject::Ptr& dbobj) = 0;

View File

@ -45,7 +45,7 @@ void DbEvents::StaticInitialize(void)
/* Status */ /* Status */
Comment::OnCommentAdded.connect(boost::bind(&DbEvents::AddComment, _1)); Comment::OnCommentAdded.connect(boost::bind(&DbEvents::AddComment, _1));
Comment::OnCommentRemoved.connect(boost::bind(&DbEvents::RemoveComment, _1)); Comment::OnCommentRemoved.connect(boost::bind(&DbEvents::RemoveComment, _1));
Downtime::OnDowntimeAdded.connect(boost::bind(&DbEvents::AddDowntime, _1, true)); Downtime::OnDowntimeAdded.connect(boost::bind(&DbEvents::AddDowntime, _1));
Downtime::OnDowntimeRemoved.connect(boost::bind(&DbEvents::RemoveDowntime, _1)); Downtime::OnDowntimeRemoved.connect(boost::bind(&DbEvents::RemoveDowntime, _1));
Downtime::OnDowntimeTriggered.connect(boost::bind(&DbEvents::TriggerDowntime, _1)); Downtime::OnDowntimeTriggered.connect(boost::bind(&DbEvents::TriggerDowntime, _1));
Checkable::OnAcknowledgementSet.connect(boost::bind(&DbEvents::AddAcknowledgement, _1, _4)); Checkable::OnAcknowledgementSet.connect(boost::bind(&DbEvents::AddAcknowledgement, _1, _4));
@ -303,30 +303,42 @@ void DbEvents::AddComments(const Checkable::Ptr& checkable)
{ {
std::set<Comment::Ptr> comments = checkable->GetComments(); std::set<Comment::Ptr> comments = checkable->GetComments();
if (!comments.empty()) if (comments.empty())
RemoveComments(checkable); return;
std::vector<DbQuery> queries;
DbQuery query1;
query1.Table = "comments";
query1.Type = DbQueryDelete;
query1.Category = DbCatComment;
query1.WhereCriteria = new Dictionary();
query1.WhereCriteria->Set("object_id", checkable);
queries.push_back(query1);
BOOST_FOREACH(const Comment::Ptr& comment, comments) { BOOST_FOREACH(const Comment::Ptr& comment, comments) {
AddComment(comment); AddCommentInternal(queries, comment, false);
} }
DbObject::OnMultipleQueries(queries);
} }
void DbEvents::AddComment(const Comment::Ptr& comment) void DbEvents::AddComment(const Comment::Ptr& comment)
{ {
AddCommentInternal(comment, false); std::vector<DbQuery> queries;
RemoveCommentInternal(queries, comment);
AddCommentInternal(queries, comment, false);
DbObject::OnMultipleQueries(queries);
} }
void DbEvents::AddCommentHistory(const Comment::Ptr& comment) void DbEvents::AddCommentHistory(const Comment::Ptr& comment)
{ {
AddCommentInternal(comment, true); std::vector<DbQuery> queries;
AddCommentInternal(queries, comment, true);
DbObject::OnMultipleQueries(queries);
} }
void DbEvents::AddCommentInternal(const Comment::Ptr& comment, bool historical) void DbEvents::AddCommentInternal(std::vector<DbQuery>& queries, const Comment::Ptr& comment, bool historical)
{
AddCommentByType(comment, historical);
}
void DbEvents::AddCommentByType(const Comment::Ptr& comment, bool historical)
{ {
Checkable::Ptr checkable = comment->GetCheckable(); Checkable::Ptr checkable = comment->GetCheckable();
@ -376,24 +388,18 @@ void DbEvents::AddCommentByType(const Comment::Ptr& comment, bool historical)
query1.Type = DbQueryInsert; query1.Type = DbQueryInsert;
query1.Category = DbCatComment; query1.Category = DbCatComment;
query1.Fields = fields1; query1.Fields = fields1;
DbObject::OnQuery(query1);
}
void DbEvents::RemoveComments(const Checkable::Ptr& checkable) queries.push_back(query1);
{
Log(LogDebug, "DbEvents")
<< "removing service comments for '" << checkable->GetName() << "'";
DbQuery query1;
query1.Table = "comments";
query1.Type = DbQueryDelete;
query1.Category = DbCatComment;
query1.WhereCriteria = new Dictionary();
query1.WhereCriteria->Set("object_id", checkable);
DbObject::OnQuery(query1);
} }
void DbEvents::RemoveComment(const Comment::Ptr& comment) void DbEvents::RemoveComment(const Comment::Ptr& comment)
{
std::vector<DbQuery> queries;
RemoveCommentInternal(queries, comment);
DbObject::OnMultipleQueries(queries);
}
void DbEvents::RemoveCommentInternal(std::vector<DbQuery>& queries, const Comment::Ptr& comment)
{ {
Checkable::Ptr checkable = comment->GetCheckable(); Checkable::Ptr checkable = comment->GetCheckable();
@ -425,6 +431,7 @@ void DbEvents::RemoveComment(const Comment::Ptr& comment)
query2.WhereCriteria = new Dictionary(); query2.WhereCriteria = new Dictionary();
query2.WhereCriteria->Set("internal_comment_id", comment->GetLegacyId()); query2.WhereCriteria->Set("internal_comment_id", comment->GetLegacyId());
query2.WhereCriteria->Set("object_id", checkable);
query2.WhereCriteria->Set("comment_time", DbValue::FromTimestamp(entry_time)); query2.WhereCriteria->Set("comment_time", DbValue::FromTimestamp(entry_time));
query2.WhereCriteria->Set("instance_id", 0); /* DbConnection class fills in real ID */ query2.WhereCriteria->Set("instance_id", 0); /* DbConnection class fills in real ID */
@ -436,37 +443,42 @@ void DbEvents::AddDowntimes(const Checkable::Ptr& checkable)
{ {
std::set<Downtime::Ptr> downtimes = checkable->GetDowntimes(); std::set<Downtime::Ptr> downtimes = checkable->GetDowntimes();
if (!downtimes.empty()) if (downtimes.empty())
RemoveDowntimes(checkable); return;
std::vector<DbQuery> queries;
DbQuery query1;
query1.Table = "scheduleddowntime";
query1.Type = DbQueryDelete;
query1.Category = DbCatDowntime;
query1.WhereCriteria = new Dictionary();
query1.WhereCriteria->Set("object_id", checkable);
queries.push_back(query1);
BOOST_FOREACH(const Downtime::Ptr& downtime, downtimes) { BOOST_FOREACH(const Downtime::Ptr& downtime, downtimes) {
AddDowntime(downtime, false); AddDowntimeInternal(queries, downtime, false);
} }
DbObject::OnMultipleQueries(queries);
} }
void DbEvents::AddDowntime(const Downtime::Ptr& downtime, bool remove_existing) void DbEvents::AddDowntime(const Downtime::Ptr& downtime)
{ {
/* std::vector<DbQuery> queries;
* make sure to delete any old downtime to avoid multiple inserts from RemoveDowntimeInternal(queries, downtime);
* configured ScheduledDowntime dumps and CreateNextDowntime() calls AddDowntimeInternal(queries, downtime, false);
*/ DbObject::OnMultipleQueries(queries);
if (remove_existing)
RemoveDowntime(downtime);
AddDowntimeInternal(downtime, false);
} }
void DbEvents::AddDowntimeHistory(const Downtime::Ptr& downtime) void DbEvents::AddDowntimeHistory(const Downtime::Ptr& downtime)
{ {
AddDowntimeInternal(downtime, true); std::vector<DbQuery> queries;
AddDowntimeInternal(queries, downtime, false);
DbObject::OnMultipleQueries(queries);
} }
void DbEvents::AddDowntimeInternal(const Downtime::Ptr& downtime, bool historical) void DbEvents::AddDowntimeInternal(std::vector<DbQuery>& queries, const Downtime::Ptr& downtime, bool historical)
{
AddDowntimeByType(downtime, historical);
}
void DbEvents::AddDowntimeByType(const Downtime::Ptr& downtime, bool historical)
{ {
Checkable::Ptr checkable = downtime->GetCheckable(); Checkable::Ptr checkable = downtime->GetCheckable();
@ -514,21 +526,18 @@ void DbEvents::AddDowntimeByType(const Downtime::Ptr& downtime, bool historical)
query1.Type = DbQueryInsert; query1.Type = DbQueryInsert;
query1.Category = DbCatDowntime; query1.Category = DbCatDowntime;
query1.Fields = fields1; query1.Fields = fields1;
DbObject::OnQuery(query1);
}
void DbEvents::RemoveDowntimes(const Checkable::Ptr& checkable) queries.push_back(query1);
{
DbQuery query1;
query1.Table = "scheduleddowntime";
query1.Type = DbQueryDelete;
query1.Category = DbCatDowntime;
query1.WhereCriteria = new Dictionary();
query1.WhereCriteria->Set("object_id", checkable);
DbObject::OnQuery(query1);
} }
void DbEvents::RemoveDowntime(const Downtime::Ptr& downtime) void DbEvents::RemoveDowntime(const Downtime::Ptr& downtime)
{
std::vector<DbQuery> queries;
RemoveDowntimeInternal(queries, downtime);
DbObject::OnMultipleQueries(queries);
}
void DbEvents::RemoveDowntimeInternal(std::vector<DbQuery>& queries, const Downtime::Ptr& downtime)
{ {
Checkable::Ptr checkable = downtime->GetCheckable(); Checkable::Ptr checkable = downtime->GetCheckable();
@ -541,7 +550,7 @@ void DbEvents::RemoveDowntime(const Downtime::Ptr& downtime)
query1.WhereCriteria->Set("object_id", checkable); query1.WhereCriteria->Set("object_id", checkable);
query1.WhereCriteria->Set("internal_downtime_id", downtime->GetLegacyId()); query1.WhereCriteria->Set("internal_downtime_id", downtime->GetLegacyId());
query1.WhereCriteria->Set("instance_id", 0); /* DbConnection class fills in real ID */ query1.WhereCriteria->Set("instance_id", 0); /* DbConnection class fills in real ID */
DbObject::OnQuery(query1); queries.push_back(query1);
/* History - update actual_end_time, was_cancelled for service (and host in case) */ /* History - update actual_end_time, was_cancelled for service (and host in case) */
double now = Utility::GetTime(); double now = Utility::GetTime();
@ -564,7 +573,7 @@ void DbEvents::RemoveDowntime(const Downtime::Ptr& downtime)
query3.WhereCriteria->Set("entry_time", DbValue::FromTimestamp(downtime->GetEntryTime())); query3.WhereCriteria->Set("entry_time", DbValue::FromTimestamp(downtime->GetEntryTime()));
query3.WhereCriteria->Set("instance_id", 0); /* DbConnection class fills in real ID */ query3.WhereCriteria->Set("instance_id", 0); /* DbConnection class fills in real ID */
DbObject::OnQuery(query3); queries.push_back(query3);
/* host/service status */ /* host/service status */
Host::Ptr host; Host::Ptr host;
@ -577,7 +586,7 @@ void DbEvents::RemoveDowntime(const Downtime::Ptr& downtime)
else else
query4.Table = "hoststatus"; query4.Table = "hoststatus";
query4.Type = DbQueryInsert | DbQueryUpdate; query4.Type = DbQueryUpdate;
query4.Category = DbCatState; query4.Category = DbCatState;
query4.StatusUpdate = true; query4.StatusUpdate = true;
query4.Object = DbObject::GetOrCreateByObject(checkable); query4.Object = DbObject::GetOrCreateByObject(checkable);
@ -595,7 +604,7 @@ void DbEvents::RemoveDowntime(const Downtime::Ptr& downtime)
query4.WhereCriteria->Set("instance_id", 0); /* DbConnection class fills in real ID */ query4.WhereCriteria->Set("instance_id", 0); /* DbConnection class fills in real ID */
DbObject::OnQuery(query4); queries.push_back(query4);
} }
void DbEvents::TriggerDowntime(const Downtime::Ptr& downtime) void DbEvents::TriggerDowntime(const Downtime::Ptr& downtime)
@ -660,7 +669,7 @@ void DbEvents::TriggerDowntime(const Downtime::Ptr& downtime)
else else
query4.Table = "hoststatus"; query4.Table = "hoststatus";
query4.Type = DbQueryInsert | DbQueryUpdate; query4.Type = DbQueryUpdate;
query4.Category = DbCatState; query4.Category = DbCatState;
query4.StatusUpdate = true; query4.StatusUpdate = true;
query4.Object = DbObject::GetOrCreateByObject(checkable); query4.Object = DbObject::GetOrCreateByObject(checkable);
@ -754,7 +763,7 @@ void DbEvents::AddAcknowledgementInternal(const Checkable::Ptr& checkable, Ackno
else else
query1.Table = "hoststatus"; query1.Table = "hoststatus";
query1.Type = DbQueryInsert | DbQueryUpdate; query1.Type = DbQueryUpdate;
query1.Category = DbCatState; query1.Category = DbCatState;
query1.StatusUpdate = true; query1.StatusUpdate = true;
query1.Object = DbObject::GetOrCreateByObject(checkable); query1.Object = DbObject::GetOrCreateByObject(checkable);

View File

@ -61,11 +61,8 @@ class DbEvents
public: public:
static void StaticInitialize(void); static void StaticInitialize(void);
static void AddCommentByType(const Comment::Ptr& comment, bool historical);
static void AddComments(const Checkable::Ptr& checkable); static void AddComments(const Checkable::Ptr& checkable);
static void RemoveComments(const Checkable::Ptr& checkable);
static void AddDowntimeByType(const Downtime::Ptr& downtime, bool historical);
static void AddDowntimes(const Checkable::Ptr& checkable); static void AddDowntimes(const Checkable::Ptr& checkable);
static void RemoveDowntimes(const Checkable::Ptr& checkable); static void RemoveDowntimes(const Checkable::Ptr& checkable);
@ -85,7 +82,7 @@ public:
static void AddComment(const Comment::Ptr& comment); static void AddComment(const Comment::Ptr& comment);
static void RemoveComment(const Comment::Ptr& comment); static void RemoveComment(const Comment::Ptr& comment);
static void AddDowntime(const Downtime::Ptr& downtime, bool remove_existing); static void AddDowntime(const Downtime::Ptr& downtime);
static void RemoveDowntime(const Downtime::Ptr& downtime); static void RemoveDowntime(const Downtime::Ptr& downtime);
static void TriggerDowntime(const Downtime::Ptr& downtime); static void TriggerDowntime(const Downtime::Ptr& downtime);
@ -130,8 +127,10 @@ public:
private: private:
DbEvents(void); DbEvents(void);
static void AddCommentInternal(const Comment::Ptr& comment, bool historical); static void AddCommentInternal(std::vector<DbQuery>& queries, const Comment::Ptr& comment, bool historical);
static void AddDowntimeInternal(const Downtime::Ptr& downtime, bool historical); static void RemoveCommentInternal(std::vector<DbQuery>& queries, const Comment::Ptr& comment);
static void AddDowntimeInternal(std::vector<DbQuery>& queries, const Downtime::Ptr& downtime, bool historical);
static void RemoveDowntimeInternal(std::vector<DbQuery>& queries, const Downtime::Ptr& downtime);
static void EnableChangedHandlerInternal(const Checkable::Ptr& checkable, const String& fieldName, bool enabled); static void EnableChangedHandlerInternal(const Checkable::Ptr& checkable, const String& fieldName, bool enabled);
}; };

View File

@ -37,6 +37,7 @@
using namespace icinga; using namespace icinga;
boost::signals2::signal<void (const DbQuery&)> DbObject::OnQuery; boost::signals2::signal<void (const DbQuery&)> DbObject::OnQuery;
boost::signals2::signal<void (const std::vector<DbQuery>&)> DbObject::OnMultipleQueries;
INITIALIZE_ONCE(&DbObject::StaticInitialize); INITIALIZE_ONCE(&DbObject::StaticInitialize);

View File

@ -79,6 +79,7 @@ public:
static DbObject::Ptr GetOrCreateByObject(const ConfigObject::Ptr& object); static DbObject::Ptr GetOrCreateByObject(const ConfigObject::Ptr& object);
static boost::signals2::signal<void (const DbQuery&)> OnQuery; static boost::signals2::signal<void (const DbQuery&)> OnQuery;
static boost::signals2::signal<void (const std::vector<DbQuery>&)> OnMultipleQueries;
void SendConfigUpdate(void); void SendConfigUpdate(void);
void SendStatusUpdate(void); void SendStatusUpdate(void);

View File

@ -748,9 +748,21 @@ void IdoMysqlConnection::ExecuteQuery(const DbQuery& query)
{ {
ASSERT(query.Category != DbCatInvalid); ASSERT(query.Category != DbCatInvalid);
boost::mutex::scoped_lock lock(m_QueryQueueMutex);
m_QueryQueue.Enqueue(boost::bind(&IdoMysqlConnection::InternalExecuteQuery, this, query, (DbQueryType *)NULL), query.Priority, true); m_QueryQueue.Enqueue(boost::bind(&IdoMysqlConnection::InternalExecuteQuery, this, query, (DbQueryType *)NULL), query.Priority, true);
} }
void IdoMysqlConnection::ExecuteMultipleQueries(const std::vector<DbQuery>& queries)
{
boost::mutex::scoped_lock lock(m_QueryQueueMutex);
BOOST_FOREACH(const DbQuery& query, queries) {
ASSERT(query.Category != DbCatInvalid);
m_QueryQueue.Enqueue(boost::bind(&IdoMysqlConnection::InternalExecuteQuery, this, query, (DbQueryType *)NULL), query.Priority, true);
}
}
void IdoMysqlConnection::InternalExecuteQuery(const DbQuery& query, DbQueryType *typeOverride) void IdoMysqlConnection::InternalExecuteQuery(const DbQuery& query, DbQueryType *typeOverride)
{ {
AssertOnWorkQueue(); AssertOnWorkQueue();
@ -839,8 +851,10 @@ void IdoMysqlConnection::InternalExecuteQuery(const DbQuery& query, DbQueryType
if (kv.second.IsEmpty() && !kv.second.IsString()) if (kv.second.IsEmpty() && !kv.second.IsString())
continue; continue;
if (!FieldToEscapedString(kv.first, kv.second, &value)) if (!FieldToEscapedString(kv.first, kv.second, &value)) {
m_QueryQueue.Enqueue(boost::bind(&IdoMysqlConnection::InternalExecuteQuery, this, query, (DbQueryType *)NULL), query.Priority);
return; return;
}
if (type == DbQueryInsert) { if (type == DbQueryInsert) {
if (!first) { if (!first) {

View File

@ -63,6 +63,7 @@ protected:
virtual void ActivateObject(const DbObject::Ptr& dbobj) override; virtual void ActivateObject(const DbObject::Ptr& dbobj) override;
virtual void DeactivateObject(const DbObject::Ptr& dbobj) override; virtual void DeactivateObject(const DbObject::Ptr& dbobj) override;
virtual void ExecuteQuery(const DbQuery& query) override; virtual void ExecuteQuery(const DbQuery& query) override;
virtual void ExecuteMultipleQueries(const std::vector<DbQuery>& queries);
virtual void CleanUpExecuteQuery(const String& table, const String& time_key, double time_value) override; virtual void CleanUpExecuteQuery(const String& table, const String& time_key, double time_value) override;
virtual void FillIDCache(const DbType::Ptr& type) override; virtual void FillIDCache(const DbType::Ptr& type) override;
virtual void NewTransaction(void) override; virtual void NewTransaction(void) override;
@ -72,6 +73,7 @@ private:
int m_SessionToken; int m_SessionToken;
WorkQueue m_QueryQueue; WorkQueue m_QueryQueue;
boost::mutex m_QueryQueueMutex;
MYSQL m_Connection; MYSQL m_Connection;
int m_AffectedRows; int m_AffectedRows;

View File

@ -630,9 +630,21 @@ void IdoPgsqlConnection::ExecuteQuery(const DbQuery& query)
{ {
ASSERT(query.Category != DbCatInvalid); ASSERT(query.Category != DbCatInvalid);
boost::mutex::scoped_lock lock(m_QueryQueueMutex);
m_QueryQueue.Enqueue(boost::bind(&IdoPgsqlConnection::InternalExecuteQuery, this, query, (DbQueryType *)NULL), query.Priority, true); m_QueryQueue.Enqueue(boost::bind(&IdoPgsqlConnection::InternalExecuteQuery, this, query, (DbQueryType *)NULL), query.Priority, true);
} }
void IdoPgsqlConnection::ExecuteMultipleQueries(const std::vector<DbQuery>& queries)
{
boost::mutex::scoped_lock lock(m_QueryQueueMutex);
BOOST_FOREACH(const DbQuery& query, queries) {
ASSERT(query.Category != DbCatInvalid);
m_QueryQueue.Enqueue(boost::bind(&IdoPgsqlConnection::InternalExecuteQuery, this, query, (DbQueryType *)NULL), query.Priority, true);
}
}
void IdoPgsqlConnection::InternalExecuteQuery(const DbQuery& query, DbQueryType *typeOverride) void IdoPgsqlConnection::InternalExecuteQuery(const DbQuery& query, DbQueryType *typeOverride)
{ {
AssertOnWorkQueue(); AssertOnWorkQueue();
@ -657,8 +669,10 @@ void IdoPgsqlConnection::InternalExecuteQuery(const DbQuery& query, DbQueryType
bool first = true; bool first = true;
BOOST_FOREACH(const Dictionary::Pair& kv, query.WhereCriteria) { BOOST_FOREACH(const Dictionary::Pair& kv, query.WhereCriteria) {
if (!FieldToEscapedString(kv.first, kv.second, &value)) if (!FieldToEscapedString(kv.first, kv.second, &value)) {
m_QueryQueue.Enqueue(boost::bind(&IdoPgsqlConnection::InternalExecuteQuery, this, query, (DbQueryType *)NULL), query.Priority);
return; return;
}
if (!first) if (!first)
where << " AND "; where << " AND ";
@ -718,8 +732,10 @@ void IdoPgsqlConnection::InternalExecuteQuery(const DbQuery& query, DbQueryType
if (kv.second.IsEmpty() && !kv.second.IsString()) if (kv.second.IsEmpty() && !kv.second.IsString())
continue; continue;
if (!FieldToEscapedString(kv.first, kv.second, &value)) if (!FieldToEscapedString(kv.first, kv.second, &value)) {
m_QueryQueue.Enqueue(boost::bind(&IdoPgsqlConnection::InternalExecuteQuery, this, query, (DbQueryType *)NULL), query.Priority);
return; return;
}
if (type == DbQueryInsert) { if (type == DbQueryInsert) {
if (!first) { if (!first) {

View File

@ -55,6 +55,7 @@ protected:
virtual void ActivateObject(const DbObject::Ptr& dbobj) override; virtual void ActivateObject(const DbObject::Ptr& dbobj) override;
virtual void DeactivateObject(const DbObject::Ptr& dbobj) override; virtual void DeactivateObject(const DbObject::Ptr& dbobj) override;
virtual void ExecuteQuery(const DbQuery& query) override; virtual void ExecuteQuery(const DbQuery& query) override;
virtual void ExecuteMultipleQueries(const std::vector<DbQuery>& queries);
virtual void CleanUpExecuteQuery(const String& table, const String& time_key, double time_value) override; virtual void CleanUpExecuteQuery(const String& table, const String& time_key, double time_value) override;
virtual void FillIDCache(const DbType::Ptr& type) override; virtual void FillIDCache(const DbType::Ptr& type) override;
virtual void NewTransaction(void) override; virtual void NewTransaction(void) override;
@ -64,6 +65,7 @@ private:
int m_SessionToken; int m_SessionToken;
WorkQueue m_QueryQueue; WorkQueue m_QueryQueue;
boost::mutex m_QueryQueueMutex;
PGconn *m_Connection; PGconn *m_Connection;
int m_AffectedRows; int m_AffectedRows;