mirror of
https://github.com/Icinga/icinga2.git
synced 2025-09-24 10:17:59 +02:00
WIP
This commit is contained in:
parent
2483f7570b
commit
078ddd9e8e
@ -76,6 +76,11 @@ void DbConnection::Resume()
|
|||||||
m_CleanUpTimer->SetInterval(60);
|
m_CleanUpTimer->SetInterval(60);
|
||||||
m_CleanUpTimer->OnTimerExpired.connect(std::bind(&DbConnection::CleanUpHandler, this));
|
m_CleanUpTimer->OnTimerExpired.connect(std::bind(&DbConnection::CleanUpHandler, this));
|
||||||
m_CleanUpTimer->Start();
|
m_CleanUpTimer->Start();
|
||||||
|
|
||||||
|
m_LogStatsTimer = new Timer();
|
||||||
|
m_LogStatsTimer->SetInterval(10);
|
||||||
|
m_LogStatsTimer->OnTimerExpired.connect([this](const Timer * const&) { LogStatsHandler(); });
|
||||||
|
m_LogStatsTimer->Start();
|
||||||
}
|
}
|
||||||
|
|
||||||
void DbConnection::Pause()
|
void DbConnection::Pause()
|
||||||
@ -236,6 +241,36 @@ void DbConnection::CleanUpHandler()
|
|||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void DbConnection::LogStatsHandler()
|
||||||
|
{
|
||||||
|
auto pending = m_PendingQueries.load();
|
||||||
|
|
||||||
|
if (pending == 0u) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
auto now = Utility::GetTime();
|
||||||
|
auto input = m_InputQueries.CalculateRate(now, 60);
|
||||||
|
auto output = m_OutputQueries.CalculateRate(now, 60);
|
||||||
|
String timeInfo;
|
||||||
|
|
||||||
|
{
|
||||||
|
auto rate = output - input;
|
||||||
|
|
||||||
|
if (pending < rate * 5) {
|
||||||
|
timeInfo = " empty in ";
|
||||||
|
if (rate <= 0)
|
||||||
|
timeInfo += "infinite time, your task handler isn't able to keep up";
|
||||||
|
else
|
||||||
|
timeInfo += Utility::FormatDuration(pending / rate);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Log(LogInformation, GetReflectionType()->GetName())
|
||||||
|
<< "Pending queries: " << pending << " (Input: " << input
|
||||||
|
<< "/s; Output: " << output << "/s)" << timeInfo;
|
||||||
|
}
|
||||||
|
|
||||||
void DbConnection::CleanUpExecuteQuery(const String&, const String&, double)
|
void DbConnection::CleanUpExecuteQuery(const String&, const String&, double)
|
||||||
{
|
{
|
||||||
/* Default handler does nothing. */
|
/* Default handler does nothing. */
|
||||||
@ -507,3 +542,18 @@ int DbConnection::GetSessionToken()
|
|||||||
{
|
{
|
||||||
return Application::GetStartTime();
|
return Application::GetStartTime();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void DbConnection::IncreasePendingQueries(int count)
|
||||||
|
{
|
||||||
|
m_PendingQueries.fetch_add(count);
|
||||||
|
m_InputQueries.InsertValue(Utility::GetTime(), count);
|
||||||
|
}
|
||||||
|
|
||||||
|
void DbConnection::DecreasePendingQueries(int count, bool increaseOutputRate)
|
||||||
|
{
|
||||||
|
m_PendingQueries.fetch_sub(count);
|
||||||
|
|
||||||
|
if (increaseOutputRate) {
|
||||||
|
m_OutputQueries.InsertValue(Utility::GetTime(), count);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -92,6 +92,9 @@ protected:
|
|||||||
|
|
||||||
static int GetSessionToken();
|
static int GetSessionToken();
|
||||||
|
|
||||||
|
void IncreasePendingQueries(int count);
|
||||||
|
void DecreasePendingQueries(int count, bool increaseOutputRate = true);
|
||||||
|
|
||||||
private:
|
private:
|
||||||
bool m_IDCacheValid{false};
|
bool m_IDCacheValid{false};
|
||||||
std::map<std::pair<DbType::Ptr, DbReference>, String> m_ConfigHashes;
|
std::map<std::pair<DbType::Ptr, DbReference>, String> m_ConfigHashes;
|
||||||
@ -101,8 +104,10 @@ private:
|
|||||||
std::set<DbObject::Ptr> m_ConfigUpdates;
|
std::set<DbObject::Ptr> m_ConfigUpdates;
|
||||||
std::set<DbObject::Ptr> m_StatusUpdates;
|
std::set<DbObject::Ptr> m_StatusUpdates;
|
||||||
Timer::Ptr m_CleanUpTimer;
|
Timer::Ptr m_CleanUpTimer;
|
||||||
|
Timer::Ptr m_LogStatsTimer;
|
||||||
|
|
||||||
void CleanUpHandler();
|
void CleanUpHandler();
|
||||||
|
void LogStatsHandler();
|
||||||
|
|
||||||
static Timer::Ptr m_ProgramStatusTimer;
|
static Timer::Ptr m_ProgramStatusTimer;
|
||||||
static boost::once_flag m_OnceFlag;
|
static boost::once_flag m_OnceFlag;
|
||||||
@ -112,6 +117,10 @@ private:
|
|||||||
mutable boost::mutex m_StatsMutex;
|
mutable boost::mutex m_StatsMutex;
|
||||||
RingBuffer m_QueryStats{15 * 60};
|
RingBuffer m_QueryStats{15 * 60};
|
||||||
bool m_ActiveChangedHandler{false};
|
bool m_ActiveChangedHandler{false};
|
||||||
|
|
||||||
|
RingBuffer m_InputQueries{60};
|
||||||
|
RingBuffer m_OutputQueries{60};
|
||||||
|
Atomic<uint_fast64_t> m_PendingQueries{0};
|
||||||
};
|
};
|
||||||
|
|
||||||
struct database_error : virtual std::exception, virtual boost::exception { };
|
struct database_error : virtual std::exception, virtual boost::exception { };
|
||||||
|
@ -13,6 +13,7 @@
|
|||||||
#include "base/configtype.hpp"
|
#include "base/configtype.hpp"
|
||||||
#include "base/exception.hpp"
|
#include "base/exception.hpp"
|
||||||
#include "base/statsfunction.hpp"
|
#include "base/statsfunction.hpp"
|
||||||
|
#include "base/defer.hpp"
|
||||||
#include <utility>
|
#include <utility>
|
||||||
|
|
||||||
using namespace icinga;
|
using namespace icinga;
|
||||||
@ -175,6 +176,8 @@ void IdoMysqlConnection::InternalNewTransaction()
|
|||||||
if (!GetConnected())
|
if (!GetConnected())
|
||||||
return;
|
return;
|
||||||
|
|
||||||
|
IncreasePendingQueries(2);
|
||||||
|
|
||||||
AsyncQuery("COMMIT");
|
AsyncQuery("COMMIT");
|
||||||
AsyncQuery("BEGIN");
|
AsyncQuery("BEGIN");
|
||||||
}
|
}
|
||||||
@ -524,12 +527,28 @@ void IdoMysqlConnection::FinishAsyncQueries()
|
|||||||
|
|
||||||
std::vector<IdoAsyncQuery>::size_type offset = 0;
|
std::vector<IdoAsyncQuery>::size_type offset = 0;
|
||||||
|
|
||||||
|
// This will be executed if there is a problem with executing the queries,
|
||||||
|
// at which point this function throws an exception and the queries should
|
||||||
|
// not be listed as still pending in the queue.
|
||||||
|
Defer decreaseQueries ([this, &offset, &queries]() {
|
||||||
|
auto lostQueries = queries.size() - offset;
|
||||||
|
|
||||||
|
if (lostQueries > 0) {
|
||||||
|
DecreasePendingQueries(lostQueries, false);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
while (offset < queries.size()) {
|
while (offset < queries.size()) {
|
||||||
std::ostringstream querybuf;
|
std::ostringstream querybuf;
|
||||||
|
|
||||||
std::vector<IdoAsyncQuery>::size_type count = 0;
|
std::vector<IdoAsyncQuery>::size_type count = 0;
|
||||||
size_t num_bytes = 0;
|
size_t num_bytes = 0;
|
||||||
|
|
||||||
|
Defer decreaseQueries ([this, &offset, &count]() {
|
||||||
|
offset += count;
|
||||||
|
DecreasePendingQueries(count);
|
||||||
|
});
|
||||||
|
|
||||||
for (std::vector<IdoAsyncQuery>::size_type i = offset; i < queries.size(); i++) {
|
for (std::vector<IdoAsyncQuery>::size_type i = offset; i < queries.size(); i++) {
|
||||||
const IdoAsyncQuery& aq = queries[i];
|
const IdoAsyncQuery& aq = queries[i];
|
||||||
|
|
||||||
@ -617,6 +636,9 @@ IdoMysqlResult IdoMysqlConnection::Query(const String& query)
|
|||||||
{
|
{
|
||||||
AssertOnWorkQueue();
|
AssertOnWorkQueue();
|
||||||
|
|
||||||
|
IncreasePendingQueries(1);
|
||||||
|
Defer decreaseQueries ([this]() { DecreasePendingQueries(1); });
|
||||||
|
|
||||||
/* finish all async queries to maintain the right order for queries */
|
/* finish all async queries to maintain the right order for queries */
|
||||||
FinishAsyncQueries();
|
FinishAsyncQueries();
|
||||||
|
|
||||||
@ -770,6 +792,7 @@ void IdoMysqlConnection::InternalActivateObject(const DbObject::Ptr& dbobj)
|
|||||||
SetObjectID(dbobj, GetLastInsertID());
|
SetObjectID(dbobj, GetLastInsertID());
|
||||||
} else {
|
} else {
|
||||||
qbuf << "UPDATE " + GetTablePrefix() + "objects SET is_active = 1 WHERE object_id = " << static_cast<long>(dbref);
|
qbuf << "UPDATE " + GetTablePrefix() + "objects SET is_active = 1 WHERE object_id = " << static_cast<long>(dbref);
|
||||||
|
IncreasePendingQueries(1);
|
||||||
AsyncQuery(qbuf.str());
|
AsyncQuery(qbuf.str());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -804,6 +827,7 @@ void IdoMysqlConnection::InternalDeactivateObject(const DbObject::Ptr& dbobj)
|
|||||||
|
|
||||||
std::ostringstream qbuf;
|
std::ostringstream qbuf;
|
||||||
qbuf << "UPDATE " + GetTablePrefix() + "objects SET is_active = 0 WHERE object_id = " << static_cast<long>(dbref);
|
qbuf << "UPDATE " + GetTablePrefix() + "objects SET is_active = 0 WHERE object_id = " << static_cast<long>(dbref);
|
||||||
|
IncreasePendingQueries(1);
|
||||||
AsyncQuery(qbuf.str());
|
AsyncQuery(qbuf.str());
|
||||||
|
|
||||||
/* Note that we're _NOT_ clearing the db refs via SetReference/SetConfigUpdate/SetStatusUpdate
|
/* Note that we're _NOT_ clearing the db refs via SetReference/SetConfigUpdate/SetStatusUpdate
|
||||||
@ -893,6 +917,7 @@ void IdoMysqlConnection::ExecuteQuery(const DbQuery& query)
|
|||||||
<< "Scheduling execute query task, type " << query.Type << ", table '" << query.Table << "'.";
|
<< "Scheduling execute query task, type " << query.Type << ", table '" << query.Table << "'.";
|
||||||
#endif /* I2_DEBUG */
|
#endif /* I2_DEBUG */
|
||||||
|
|
||||||
|
IncreasePendingQueries(1);
|
||||||
m_QueryQueue.Enqueue(std::bind(&IdoMysqlConnection::InternalExecuteQuery, this, query, -1), query.Priority, true);
|
m_QueryQueue.Enqueue(std::bind(&IdoMysqlConnection::InternalExecuteQuery, this, query, -1), query.Priority, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -909,6 +934,7 @@ void IdoMysqlConnection::ExecuteMultipleQueries(const std::vector<DbQuery>& quer
|
|||||||
<< "Scheduling multiple execute query task, type " << queries[0].Type << ", table '" << queries[0].Table << "'.";
|
<< "Scheduling multiple execute query task, type " << queries[0].Type << ", table '" << queries[0].Table << "'.";
|
||||||
#endif /* I2_DEBUG */
|
#endif /* I2_DEBUG */
|
||||||
|
|
||||||
|
IncreasePendingQueries(queries.size());
|
||||||
m_QueryQueue.Enqueue(std::bind(&IdoMysqlConnection::InternalExecuteMultipleQueries, this, queries), queries[0].Priority, true);
|
m_QueryQueue.Enqueue(std::bind(&IdoMysqlConnection::InternalExecuteMultipleQueries, this, queries), queries[0].Priority, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1066,6 +1092,7 @@ void IdoMysqlConnection::InternalExecuteQuery(const DbQuery& query, int typeOver
|
|||||||
if ((type & DbQueryInsert) && (type & DbQueryDelete)) {
|
if ((type & DbQueryInsert) && (type & DbQueryDelete)) {
|
||||||
std::ostringstream qdel;
|
std::ostringstream qdel;
|
||||||
qdel << "DELETE FROM " << GetTablePrefix() << query.Table << where.str();
|
qdel << "DELETE FROM " << GetTablePrefix() << query.Table << where.str();
|
||||||
|
IncreasePendingQueries(1);
|
||||||
AsyncQuery(qdel.str());
|
AsyncQuery(qdel.str());
|
||||||
|
|
||||||
type = DbQueryInsert;
|
type = DbQueryInsert;
|
||||||
@ -1150,6 +1177,7 @@ void IdoMysqlConnection::FinishExecuteQuery(const DbQuery& query, int type, bool
|
|||||||
<< "Rescheduling DELETE/INSERT query: Upsert UPDATE did not affect rows, type " << type << ", table '" << query.Table << "'.";
|
<< "Rescheduling DELETE/INSERT query: Upsert UPDATE did not affect rows, type " << type << ", table '" << query.Table << "'.";
|
||||||
#endif /* I2_DEBUG */
|
#endif /* I2_DEBUG */
|
||||||
|
|
||||||
|
IncreasePendingQueries(1);
|
||||||
m_QueryQueue.Enqueue(std::bind(&IdoMysqlConnection::InternalExecuteQuery, this, query, DbQueryDelete | DbQueryInsert), query.Priority);
|
m_QueryQueue.Enqueue(std::bind(&IdoMysqlConnection::InternalExecuteQuery, this, query, DbQueryDelete | DbQueryInsert), query.Priority);
|
||||||
|
|
||||||
return;
|
return;
|
||||||
@ -1178,6 +1206,7 @@ void IdoMysqlConnection::CleanUpExecuteQuery(const String& table, const String&
|
|||||||
<< time_column << "'. max_age is set to '" << max_age << "'.";
|
<< time_column << "'. max_age is set to '" << max_age << "'.";
|
||||||
#endif /* I2_DEBUG */
|
#endif /* I2_DEBUG */
|
||||||
|
|
||||||
|
IncreasePendingQueries(1);
|
||||||
m_QueryQueue.Enqueue(std::bind(&IdoMysqlConnection::InternalCleanUpExecuteQuery, this, table, time_column, max_age), PriorityLow, true);
|
m_QueryQueue.Enqueue(std::bind(&IdoMysqlConnection::InternalCleanUpExecuteQuery, this, table, time_column, max_age), PriorityLow, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user