mirror of
https://github.com/Icinga/icinga2.git
synced 2025-08-26 12:08:12 +02:00
Before postgres 9.1, this setting defaulted to off and icinga2 code was making heavy use of this feature. Since postgres 9.1, this settings defaults to on. During the adoption of postgres >= 9.1, the icinga2 postgres ido code maintained compatibility by setting it to off explicitly. In the mean time, the postgres ido code has been converted to using the `E'...'` escape literal syntax exclusively. The last remaining step is now to no longer force the setting to off because no query is using the feature any longer. Closes github issue #8122.
1031 lines
28 KiB
C++
1031 lines
28 KiB
C++
/* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */
|
|
|
|
#include "db_ido_pgsql/idopgsqlconnection.hpp"
|
|
#include "db_ido_pgsql/idopgsqlconnection-ti.cpp"
|
|
#include "db_ido/dbtype.hpp"
|
|
#include "db_ido/dbvalue.hpp"
|
|
#include "base/logger.hpp"
|
|
#include "base/objectlock.hpp"
|
|
#include "base/convert.hpp"
|
|
#include "base/utility.hpp"
|
|
#include "base/perfdatavalue.hpp"
|
|
#include "base/application.hpp"
|
|
#include "base/configtype.hpp"
|
|
#include "base/exception.hpp"
|
|
#include "base/context.hpp"
|
|
#include "base/statsfunction.hpp"
|
|
#include "base/defer.hpp"
|
|
#include <utility>
|
|
|
|
using namespace icinga;
|
|
|
|
REGISTER_TYPE(IdoPgsqlConnection);
|
|
|
|
REGISTER_STATSFUNCTION(IdoPgsqlConnection, &IdoPgsqlConnection::StatsFunc);
|
|
|
|
IdoPgsqlConnection::IdoPgsqlConnection()
|
|
{
|
|
m_QueryQueue.SetName("IdoPgsqlConnection, " + GetName());
|
|
}
|
|
|
|
void IdoPgsqlConnection::OnConfigLoaded()
|
|
{
|
|
ObjectImpl<IdoPgsqlConnection>::OnConfigLoaded();
|
|
|
|
m_QueryQueue.SetName("IdoPgsqlConnection, " + GetName());
|
|
|
|
Library shimLibrary{"pgsql_shim"};
|
|
|
|
auto create_pgsql_shim = shimLibrary.GetSymbolAddress<create_pgsql_shim_ptr>("create_pgsql_shim");
|
|
|
|
m_Pgsql.reset(create_pgsql_shim());
|
|
|
|
std::swap(m_Library, shimLibrary);
|
|
}
|
|
|
|
void IdoPgsqlConnection::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata)
|
|
{
|
|
DictionaryData nodes;
|
|
|
|
for (const IdoPgsqlConnection::Ptr& idopgsqlconnection : ConfigType::GetObjectsByType<IdoPgsqlConnection>()) {
|
|
size_t queryQueueItems = idopgsqlconnection->m_QueryQueue.GetLength();
|
|
double queryQueueItemRate = idopgsqlconnection->m_QueryQueue.GetTaskCount(60) / 60.0;
|
|
|
|
nodes.emplace_back(idopgsqlconnection->GetName(), new Dictionary({
|
|
{ "version", idopgsqlconnection->GetSchemaVersion() },
|
|
{ "instance_name", idopgsqlconnection->GetInstanceName() },
|
|
{ "connected", idopgsqlconnection->GetConnected() },
|
|
{ "query_queue_items", queryQueueItems },
|
|
{ "query_queue_item_rate", queryQueueItemRate }
|
|
}));
|
|
|
|
perfdata->Add(new PerfdataValue("idopgsqlconnection_" + idopgsqlconnection->GetName() + "_queries_rate", idopgsqlconnection->GetQueryCount(60) / 60.0));
|
|
perfdata->Add(new PerfdataValue("idopgsqlconnection_" + idopgsqlconnection->GetName() + "_queries_1min", idopgsqlconnection->GetQueryCount(60)));
|
|
perfdata->Add(new PerfdataValue("idopgsqlconnection_" + idopgsqlconnection->GetName() + "_queries_5mins", idopgsqlconnection->GetQueryCount(5 * 60)));
|
|
perfdata->Add(new PerfdataValue("idopgsqlconnection_" + idopgsqlconnection->GetName() + "_queries_15mins", idopgsqlconnection->GetQueryCount(15 * 60)));
|
|
perfdata->Add(new PerfdataValue("idopgsqlconnection_" + idopgsqlconnection->GetName() + "_query_queue_items", queryQueueItems));
|
|
perfdata->Add(new PerfdataValue("idopgsqlconnection_" + idopgsqlconnection->GetName() + "_query_queue_item_rate", queryQueueItemRate));
|
|
}
|
|
|
|
status->Set("idopgsqlconnection", new Dictionary(std::move(nodes)));
|
|
}
|
|
|
|
void IdoPgsqlConnection::Resume()
|
|
{
|
|
Log(LogInformation, "IdoPgsqlConnection")
|
|
<< "'" << GetName() << "' resumed.";
|
|
|
|
SetConnected(false);
|
|
|
|
m_QueryQueue.SetExceptionCallback(std::bind(&IdoPgsqlConnection::ExceptionHandler, this, _1));
|
|
|
|
/* Immediately try to connect on Resume() without timer. */
|
|
m_QueryQueue.Enqueue(std::bind(&IdoPgsqlConnection::Reconnect, this), PriorityImmediate);
|
|
|
|
m_TxTimer = new Timer();
|
|
m_TxTimer->SetInterval(1);
|
|
m_TxTimer->OnTimerExpired.connect(std::bind(&IdoPgsqlConnection::TxTimerHandler, this));
|
|
m_TxTimer->Start();
|
|
|
|
m_ReconnectTimer = new Timer();
|
|
m_ReconnectTimer->SetInterval(10);
|
|
m_ReconnectTimer->OnTimerExpired.connect(std::bind(&IdoPgsqlConnection::ReconnectTimerHandler, this));
|
|
m_ReconnectTimer->Start();
|
|
|
|
/* Start with queries after connect. */
|
|
DbConnection::Resume();
|
|
|
|
ASSERT(m_Pgsql->isthreadsafe());
|
|
}
|
|
|
|
void IdoPgsqlConnection::Pause()
|
|
{
|
|
DbConnection::Pause();
|
|
|
|
m_ReconnectTimer.reset();
|
|
|
|
m_QueryQueue.Enqueue(std::bind(&IdoPgsqlConnection::Disconnect, this), PriorityLow);
|
|
|
|
/* Work on remaining tasks but never delete the threads, for HA resuming later. */
|
|
m_QueryQueue.Join();
|
|
|
|
Log(LogInformation, "IdoPgsqlConnection")
|
|
<< "'" << GetName() << "' paused.";
|
|
}
|
|
|
|
void IdoPgsqlConnection::ExceptionHandler(boost::exception_ptr exp)
|
|
{
|
|
Log(LogWarning, "IdoPgsqlConnection", "Exception during database operation: Verify that your database is operational!");
|
|
|
|
Log(LogDebug, "IdoPgsqlConnection")
|
|
<< "Exception during database operation: " << DiagnosticInformation(std::move(exp));
|
|
|
|
if (GetConnected()) {
|
|
m_Pgsql->finish(m_Connection);
|
|
SetConnected(false);
|
|
}
|
|
}
|
|
|
|
void IdoPgsqlConnection::AssertOnWorkQueue()
|
|
{
|
|
ASSERT(m_QueryQueue.IsWorkerThread());
|
|
}
|
|
|
|
void IdoPgsqlConnection::Disconnect()
|
|
{
|
|
AssertOnWorkQueue();
|
|
|
|
if (!GetConnected())
|
|
return;
|
|
|
|
IncreasePendingQueries(1);
|
|
Query("COMMIT");
|
|
|
|
m_Pgsql->finish(m_Connection);
|
|
SetConnected(false);
|
|
|
|
Log(LogInformation, "IdoPgsqlConnection")
|
|
<< "Disconnected from '" << GetName() << "' database '" << GetDatabase() << "'.";
|
|
}
|
|
|
|
void IdoPgsqlConnection::TxTimerHandler()
|
|
{
|
|
NewTransaction();
|
|
}
|
|
|
|
void IdoPgsqlConnection::NewTransaction()
|
|
{
|
|
if (IsPaused())
|
|
return;
|
|
|
|
m_QueryQueue.Enqueue(std::bind(&IdoPgsqlConnection::InternalNewTransaction, this), PriorityNormal, true);
|
|
}
|
|
|
|
void IdoPgsqlConnection::InternalNewTransaction()
|
|
{
|
|
AssertOnWorkQueue();
|
|
|
|
if (!GetConnected())
|
|
return;
|
|
|
|
IncreasePendingQueries(2);
|
|
Query("COMMIT");
|
|
Query("BEGIN");
|
|
}
|
|
|
|
void IdoPgsqlConnection::ReconnectTimerHandler()
|
|
{
|
|
/* Only allow Reconnect events with high priority. */
|
|
m_QueryQueue.Enqueue(std::bind(&IdoPgsqlConnection::Reconnect, this), PriorityHigh);
|
|
}
|
|
|
|
void IdoPgsqlConnection::Reconnect()
|
|
{
|
|
AssertOnWorkQueue();
|
|
|
|
CONTEXT("Reconnecting to PostgreSQL IDO database '" + GetName() + "'");
|
|
|
|
double startTime = Utility::GetTime();
|
|
|
|
SetShouldConnect(true);
|
|
|
|
bool reconnect = false;
|
|
|
|
if (GetConnected()) {
|
|
/* Check if we're really still connected */
|
|
try {
|
|
IncreasePendingQueries(1);
|
|
Query("SELECT 1");
|
|
return;
|
|
} catch (const std::exception&) {
|
|
m_Pgsql->finish(m_Connection);
|
|
SetConnected(false);
|
|
reconnect = true;
|
|
}
|
|
}
|
|
|
|
ClearIDCache();
|
|
|
|
String host = GetHost();
|
|
String port = GetPort();
|
|
String user = GetUser();
|
|
String password = GetPassword();
|
|
String database = GetDatabase();
|
|
|
|
String sslMode = GetSslMode();
|
|
String sslKey = GetSslKey();
|
|
String sslCert = GetSslCert();
|
|
String sslCa = GetSslCa();
|
|
|
|
String conninfo;
|
|
|
|
if (!host.IsEmpty())
|
|
conninfo += " host=" + host;
|
|
if (!port.IsEmpty())
|
|
conninfo += " port=" + port;
|
|
if (!user.IsEmpty())
|
|
conninfo += " user=" + user;
|
|
if (!password.IsEmpty())
|
|
conninfo += " password=" + password;
|
|
if (!database.IsEmpty())
|
|
conninfo += " dbname=" + database;
|
|
|
|
if (!sslMode.IsEmpty())
|
|
conninfo += " sslmode=" + sslMode;
|
|
if (!sslKey.IsEmpty())
|
|
conninfo += " sslkey=" + sslKey;
|
|
if (!sslCert.IsEmpty())
|
|
conninfo += " sslcert=" + sslCert;
|
|
if (!sslCa.IsEmpty())
|
|
conninfo += " sslrootcert=" + sslCa;
|
|
|
|
/* connection */
|
|
m_Connection = m_Pgsql->connectdb(conninfo.CStr());
|
|
|
|
if (!m_Connection)
|
|
return;
|
|
|
|
if (m_Pgsql->status(m_Connection) != CONNECTION_OK) {
|
|
String message = m_Pgsql->errorMessage(m_Connection);
|
|
m_Pgsql->finish(m_Connection);
|
|
SetConnected(false);
|
|
|
|
Log(LogCritical, "IdoPgsqlConnection")
|
|
<< "Connection to database '" << database << "' with user '" << user << "' on '" << host << ":" << port
|
|
<< "' failed: \"" << message << "\"";
|
|
|
|
BOOST_THROW_EXCEPTION(std::runtime_error(message));
|
|
}
|
|
|
|
SetConnected(true);
|
|
|
|
IdoPgsqlResult result;
|
|
|
|
String dbVersionName = "idoutils";
|
|
IncreasePendingQueries(1);
|
|
result = Query("SELECT version FROM " + GetTablePrefix() + "dbversion WHERE name=E'" + Escape(dbVersionName) + "'");
|
|
|
|
Dictionary::Ptr row = FetchRow(result, 0);
|
|
|
|
if (!row) {
|
|
m_Pgsql->finish(m_Connection);
|
|
SetConnected(false);
|
|
|
|
Log(LogCritical, "IdoPgsqlConnection", "Schema does not provide any valid version! Verify your schema installation.");
|
|
|
|
BOOST_THROW_EXCEPTION(std::runtime_error("Invalid schema."));
|
|
}
|
|
|
|
String version = row->Get("version");
|
|
|
|
SetSchemaVersion(version);
|
|
|
|
if (Utility::CompareVersion(IDO_COMPAT_SCHEMA_VERSION, version) < 0) {
|
|
m_Pgsql->finish(m_Connection);
|
|
SetConnected(false);
|
|
|
|
Log(LogCritical, "IdoPgsqlConnection")
|
|
<< "Schema version '" << version << "' does not match the required version '"
|
|
<< IDO_COMPAT_SCHEMA_VERSION << "' (or newer)! Please check the upgrade documentation at "
|
|
<< "https://icinga.com/docs/icinga2/latest/doc/16-upgrading-icinga-2/#upgrading-postgresql-db";
|
|
|
|
BOOST_THROW_EXCEPTION(std::runtime_error("Schema version mismatch."));
|
|
}
|
|
|
|
String instanceName = GetInstanceName();
|
|
|
|
IncreasePendingQueries(1);
|
|
result = Query("SELECT instance_id FROM " + GetTablePrefix() + "instances WHERE instance_name = E'" + Escape(instanceName) + "'");
|
|
row = FetchRow(result, 0);
|
|
|
|
if (!row) {
|
|
IncreasePendingQueries(1);
|
|
Query("INSERT INTO " + GetTablePrefix() + "instances (instance_name, instance_description) VALUES (E'" + Escape(instanceName) + "', E'" + Escape(GetInstanceDescription()) + "')");
|
|
m_InstanceID = GetSequenceValue(GetTablePrefix() + "instances", "instance_id");
|
|
} else {
|
|
m_InstanceID = DbReference(row->Get("instance_id"));
|
|
}
|
|
|
|
Endpoint::Ptr my_endpoint = Endpoint::GetLocalEndpoint();
|
|
|
|
/* we have an endpoint in a cluster setup, so decide if we can proceed here */
|
|
if (my_endpoint && GetHAMode() == HARunOnce) {
|
|
/* get the current endpoint writing to programstatus table */
|
|
IncreasePendingQueries(1);
|
|
result = Query("SELECT UNIX_TIMESTAMP(status_update_time) AS status_update_time, endpoint_name FROM " +
|
|
GetTablePrefix() + "programstatus WHERE instance_id = " + Convert::ToString(m_InstanceID));
|
|
row = FetchRow(result, 0);
|
|
|
|
String endpoint_name;
|
|
|
|
if (row)
|
|
endpoint_name = row->Get("endpoint_name");
|
|
else
|
|
Log(LogNotice, "IdoPgsqlConnection", "Empty program status table");
|
|
|
|
/* if we did not write into the database earlier, another instance is active */
|
|
if (endpoint_name != my_endpoint->GetName()) {
|
|
double status_update_time;
|
|
|
|
if (row)
|
|
status_update_time = row->Get("status_update_time");
|
|
else
|
|
status_update_time = 0;
|
|
|
|
double now = Utility::GetTime();
|
|
|
|
double status_update_age = now - status_update_time;
|
|
double failoverTimeout = GetFailoverTimeout();
|
|
|
|
if (status_update_age < GetFailoverTimeout()) {
|
|
Log(LogInformation, "IdoPgsqlConnection")
|
|
<< "Last update by endpoint '" << endpoint_name << "' was "
|
|
<< status_update_age << "s ago (< failover timeout of " << failoverTimeout << "s). Retrying.";
|
|
|
|
m_Pgsql->finish(m_Connection);
|
|
SetConnected(false);
|
|
SetShouldConnect(false);
|
|
|
|
return;
|
|
}
|
|
|
|
/* activate the IDO only, if we're authoritative in this zone */
|
|
if (IsPaused()) {
|
|
Log(LogNotice, "IdoPgsqlConnection")
|
|
<< "Local endpoint '" << my_endpoint->GetName() << "' is not authoritative, bailing out.";
|
|
|
|
m_Pgsql->finish(m_Connection);
|
|
SetConnected(false);
|
|
|
|
return;
|
|
}
|
|
|
|
SetLastFailover(now);
|
|
|
|
Log(LogInformation, "IdoPgsqlConnection")
|
|
<< "Last update by endpoint '" << endpoint_name << "' was "
|
|
<< status_update_age << "s ago. Taking over '" << GetName() << "' in HA zone '" << Zone::GetLocalZone()->GetName() << "'.";
|
|
}
|
|
|
|
Log(LogNotice, "IdoPgsqlConnection", "Enabling IDO connection.");
|
|
}
|
|
|
|
Log(LogInformation, "IdoPgsqlConnection")
|
|
<< "PGSQL IDO instance id: " << static_cast<long>(m_InstanceID) << " (schema version: '" + version + "')"
|
|
<< (!sslMode.IsEmpty() ? ", sslmode='" + sslMode + "'" : "");
|
|
|
|
IncreasePendingQueries(1);
|
|
Query("BEGIN");
|
|
|
|
/* update programstatus table */
|
|
UpdateProgramStatus();
|
|
|
|
/* record connection */
|
|
IncreasePendingQueries(1);
|
|
Query("INSERT INTO " + GetTablePrefix() + "conninfo " +
|
|
"(instance_id, connect_time, last_checkin_time, agent_name, agent_version, connect_type, data_start_time) VALUES ("
|
|
+ Convert::ToString(static_cast<long>(m_InstanceID)) + ", NOW(), NOW(), E'icinga2 db_ido_pgsql', E'" + Escape(Application::GetAppVersion())
|
|
+ "', E'" + (reconnect ? "RECONNECT" : "INITIAL") + "', NOW())");
|
|
|
|
/* clear config tables for the initial config dump */
|
|
PrepareDatabase();
|
|
|
|
std::ostringstream q1buf;
|
|
q1buf << "SELECT object_id, objecttype_id, name1, name2, is_active FROM " + GetTablePrefix() + "objects WHERE instance_id = " << static_cast<long>(m_InstanceID);
|
|
IncreasePendingQueries(1);
|
|
result = Query(q1buf.str());
|
|
|
|
std::vector<DbObject::Ptr> activeDbObjs;
|
|
|
|
int index = 0;
|
|
while ((row = FetchRow(result, index))) {
|
|
index++;
|
|
|
|
DbType::Ptr dbtype = DbType::GetByID(row->Get("objecttype_id"));
|
|
|
|
if (!dbtype)
|
|
continue;
|
|
|
|
DbObject::Ptr dbobj = dbtype->GetOrCreateObjectByName(row->Get("name1"), row->Get("name2"));
|
|
SetObjectID(dbobj, DbReference(row->Get("object_id")));
|
|
bool active = row->Get("is_active");
|
|
SetObjectActive(dbobj, active);
|
|
|
|
if (active)
|
|
activeDbObjs.push_back(dbobj);
|
|
}
|
|
|
|
SetIDCacheValid(true);
|
|
|
|
EnableActiveChangedHandler();
|
|
|
|
for (const DbObject::Ptr& dbobj : activeDbObjs) {
|
|
if (dbobj->GetObject())
|
|
continue;
|
|
|
|
Log(LogNotice, "IdoPgsqlConnection")
|
|
<< "Deactivate deleted object name1: '" << dbobj->GetName1()
|
|
<< "' name2: '" << dbobj->GetName2() + "'.";
|
|
DeactivateObject(dbobj);
|
|
}
|
|
|
|
UpdateAllObjects();
|
|
|
|
m_QueryQueue.Enqueue(std::bind(&IdoPgsqlConnection::ClearTablesBySession, this), PriorityNormal);
|
|
|
|
m_QueryQueue.Enqueue(std::bind(&IdoPgsqlConnection::FinishConnect, this, startTime), PriorityNormal);
|
|
}
|
|
|
|
void IdoPgsqlConnection::FinishConnect(double startTime)
|
|
{
|
|
AssertOnWorkQueue();
|
|
|
|
if (!GetConnected())
|
|
return;
|
|
|
|
Log(LogInformation, "IdoPgsqlConnection")
|
|
<< "Finished reconnecting to '" << GetName() << "' database '" << GetDatabase() << "' in "
|
|
<< std::setw(2) << Utility::GetTime() - startTime << " second(s).";
|
|
|
|
IncreasePendingQueries(2);
|
|
Query("COMMIT");
|
|
Query("BEGIN");
|
|
}
|
|
|
|
void IdoPgsqlConnection::ClearTablesBySession()
|
|
{
|
|
/* delete all comments and downtimes without current session token */
|
|
ClearTableBySession("comments");
|
|
ClearTableBySession("scheduleddowntime");
|
|
}
|
|
|
|
void IdoPgsqlConnection::ClearTableBySession(const String& table)
|
|
{
|
|
IncreasePendingQueries(1);
|
|
Query("DELETE FROM " + GetTablePrefix() + table + " WHERE instance_id = " +
|
|
Convert::ToString(static_cast<long>(m_InstanceID)) + " AND session_token <> " +
|
|
Convert::ToString(GetSessionToken()));
|
|
}
|
|
|
|
IdoPgsqlResult IdoPgsqlConnection::Query(const String& query)
|
|
{
|
|
AssertOnWorkQueue();
|
|
|
|
Defer decreaseQueries ([this]() { DecreasePendingQueries(1); });
|
|
|
|
Log(LogDebug, "IdoPgsqlConnection")
|
|
<< "Query: " << query;
|
|
|
|
IncreaseQueryCount();
|
|
|
|
PGresult *result = m_Pgsql->exec(m_Connection, query.CStr());
|
|
|
|
if (!result) {
|
|
String message = m_Pgsql->errorMessage(m_Connection);
|
|
Log(LogCritical, "IdoPgsqlConnection")
|
|
<< "Error \"" << message << "\" when executing query \"" << query << "\"";
|
|
|
|
BOOST_THROW_EXCEPTION(
|
|
database_error()
|
|
<< errinfo_message(message)
|
|
<< errinfo_database_query(query)
|
|
);
|
|
}
|
|
|
|
char *rowCount = m_Pgsql->cmdTuples(result);
|
|
m_AffectedRows = atoi(rowCount);
|
|
|
|
if (m_Pgsql->resultStatus(result) == PGRES_COMMAND_OK) {
|
|
m_Pgsql->clear(result);
|
|
return IdoPgsqlResult();
|
|
}
|
|
|
|
if (m_Pgsql->resultStatus(result) != PGRES_TUPLES_OK) {
|
|
String message = m_Pgsql->resultErrorMessage(result);
|
|
m_Pgsql->clear(result);
|
|
|
|
Log(LogCritical, "IdoPgsqlConnection")
|
|
<< "Error \"" << message << "\" when executing query \"" << query << "\"";
|
|
|
|
BOOST_THROW_EXCEPTION(
|
|
database_error()
|
|
<< errinfo_message(message)
|
|
<< errinfo_database_query(query)
|
|
);
|
|
}
|
|
|
|
return IdoPgsqlResult(result, std::bind(&PgsqlInterface::clear, std::cref(m_Pgsql), _1));
|
|
}
|
|
|
|
DbReference IdoPgsqlConnection::GetSequenceValue(const String& table, const String& column)
|
|
{
|
|
AssertOnWorkQueue();
|
|
|
|
IncreasePendingQueries(1);
|
|
IdoPgsqlResult result = Query("SELECT CURRVAL(pg_get_serial_sequence(E'" + Escape(table) + "', E'" + Escape(column) + "')) AS id");
|
|
|
|
Dictionary::Ptr row = FetchRow(result, 0);
|
|
|
|
ASSERT(row);
|
|
|
|
Log(LogDebug, "IdoPgsqlConnection")
|
|
<< "Sequence Value: " << row->Get("id");
|
|
|
|
return {Convert::ToLong(row->Get("id"))};
|
|
}
|
|
|
|
int IdoPgsqlConnection::GetAffectedRows()
|
|
{
|
|
AssertOnWorkQueue();
|
|
|
|
return m_AffectedRows;
|
|
}
|
|
|
|
String IdoPgsqlConnection::Escape(const String& s)
|
|
{
|
|
AssertOnWorkQueue();
|
|
|
|
String utf8s = Utility::ValidateUTF8(s);
|
|
|
|
size_t length = utf8s.GetLength();
|
|
auto *to = new char[utf8s.GetLength() * 2 + 1];
|
|
|
|
m_Pgsql->escapeStringConn(m_Connection, to, utf8s.CStr(), length, nullptr);
|
|
|
|
String result = String(to);
|
|
|
|
delete [] to;
|
|
|
|
return result;
|
|
}
|
|
|
|
Dictionary::Ptr IdoPgsqlConnection::FetchRow(const IdoPgsqlResult& result, int row)
|
|
{
|
|
AssertOnWorkQueue();
|
|
|
|
if (row >= m_Pgsql->ntuples(result.get()))
|
|
return nullptr;
|
|
|
|
int columns = m_Pgsql->nfields(result.get());
|
|
|
|
DictionaryData dict;
|
|
|
|
for (int column = 0; column < columns; column++) {
|
|
Value value;
|
|
|
|
if (!m_Pgsql->getisnull(result.get(), row, column))
|
|
value = m_Pgsql->getvalue(result.get(), row, column);
|
|
|
|
dict.emplace_back(m_Pgsql->fname(result.get(), column), value);
|
|
}
|
|
|
|
return new Dictionary(std::move(dict));
|
|
}
|
|
|
|
void IdoPgsqlConnection::ActivateObject(const DbObject::Ptr& dbobj)
|
|
{
|
|
if (IsPaused())
|
|
return;
|
|
|
|
m_QueryQueue.Enqueue(std::bind(&IdoPgsqlConnection::InternalActivateObject, this, dbobj), PriorityNormal);
|
|
}
|
|
|
|
void IdoPgsqlConnection::InternalActivateObject(const DbObject::Ptr& dbobj)
|
|
{
|
|
AssertOnWorkQueue();
|
|
|
|
if (!GetConnected())
|
|
return;
|
|
|
|
DbReference dbref = GetObjectID(dbobj);
|
|
std::ostringstream qbuf;
|
|
|
|
if (!dbref.IsValid()) {
|
|
if (!dbobj->GetName2().IsEmpty()) {
|
|
qbuf << "INSERT INTO " + GetTablePrefix() + "objects (instance_id, objecttype_id, name1, name2, is_active) VALUES ("
|
|
<< static_cast<long>(m_InstanceID) << ", " << dbobj->GetType()->GetTypeID() << ", "
|
|
<< "E'" << Escape(dbobj->GetName1()) << "', E'" << Escape(dbobj->GetName2()) << "', 1)";
|
|
} else {
|
|
qbuf << "INSERT INTO " + GetTablePrefix() + "objects (instance_id, objecttype_id, name1, is_active) VALUES ("
|
|
<< static_cast<long>(m_InstanceID) << ", " << dbobj->GetType()->GetTypeID() << ", "
|
|
<< "E'" << Escape(dbobj->GetName1()) << "', 1)";
|
|
}
|
|
|
|
IncreasePendingQueries(1);
|
|
Query(qbuf.str());
|
|
SetObjectID(dbobj, GetSequenceValue(GetTablePrefix() + "objects", "object_id"));
|
|
} else {
|
|
qbuf << "UPDATE " + GetTablePrefix() + "objects SET is_active = 1 WHERE object_id = " << static_cast<long>(dbref);
|
|
IncreasePendingQueries(1);
|
|
Query(qbuf.str());
|
|
}
|
|
}
|
|
|
|
void IdoPgsqlConnection::DeactivateObject(const DbObject::Ptr& dbobj)
|
|
{
|
|
if (IsPaused())
|
|
return;
|
|
|
|
m_QueryQueue.Enqueue(std::bind(&IdoPgsqlConnection::InternalDeactivateObject, this, dbobj), PriorityNormal);
|
|
}
|
|
|
|
void IdoPgsqlConnection::InternalDeactivateObject(const DbObject::Ptr& dbobj)
|
|
{
|
|
AssertOnWorkQueue();
|
|
|
|
if (!GetConnected())
|
|
return;
|
|
|
|
DbReference dbref = GetObjectID(dbobj);
|
|
|
|
if (!dbref.IsValid())
|
|
return;
|
|
|
|
std::ostringstream qbuf;
|
|
qbuf << "UPDATE " + GetTablePrefix() + "objects SET is_active = 0 WHERE object_id = " << static_cast<long>(dbref);
|
|
IncreasePendingQueries(1);
|
|
Query(qbuf.str());
|
|
|
|
/* Note that we're _NOT_ clearing the db refs via SetReference/SetConfigUpdate/SetStatusUpdate
|
|
* because the object is still in the database. */
|
|
}
|
|
|
|
bool IdoPgsqlConnection::FieldToEscapedString(const String& key, const Value& value, Value *result)
|
|
{
|
|
if (key == "instance_id") {
|
|
*result = static_cast<long>(m_InstanceID);
|
|
return true;
|
|
} else if (key == "session_token") {
|
|
*result = GetSessionToken();
|
|
return true;
|
|
}
|
|
|
|
Value rawvalue = DbValue::ExtractValue(value);
|
|
|
|
if (rawvalue.IsObjectType<ConfigObject>()) {
|
|
DbObject::Ptr dbobjcol = DbObject::GetOrCreateByObject(rawvalue);
|
|
|
|
if (!dbobjcol) {
|
|
*result = 0;
|
|
return true;
|
|
}
|
|
|
|
if (!IsIDCacheValid())
|
|
return false;
|
|
|
|
DbReference dbrefcol;
|
|
|
|
if (DbValue::IsObjectInsertID(value)) {
|
|
dbrefcol = GetInsertID(dbobjcol);
|
|
|
|
if (!dbrefcol.IsValid())
|
|
return false;
|
|
} else {
|
|
dbrefcol = GetObjectID(dbobjcol);
|
|
|
|
if (!dbrefcol.IsValid()) {
|
|
InternalActivateObject(dbobjcol);
|
|
|
|
dbrefcol = GetObjectID(dbobjcol);
|
|
|
|
if (!dbrefcol.IsValid())
|
|
return false;
|
|
}
|
|
}
|
|
|
|
*result = static_cast<long>(dbrefcol);
|
|
} else if (DbValue::IsTimestamp(value)) {
|
|
long ts = rawvalue;
|
|
std::ostringstream msgbuf;
|
|
msgbuf << "TO_TIMESTAMP(" << ts << ") AT TIME ZONE 'UTC'";
|
|
*result = Value(msgbuf.str());
|
|
} else if (DbValue::IsObjectInsertID(value)) {
|
|
auto id = static_cast<long>(rawvalue);
|
|
|
|
if (id <= 0)
|
|
return false;
|
|
|
|
*result = id;
|
|
return true;
|
|
} else {
|
|
Value fvalue;
|
|
|
|
if (rawvalue.IsBoolean())
|
|
fvalue = Convert::ToLong(rawvalue);
|
|
else
|
|
fvalue = rawvalue;
|
|
|
|
*result = "E'" + Escape(fvalue) + "'";
|
|
}
|
|
|
|
return true;
|
|
}
|
|
|
|
void IdoPgsqlConnection::ExecuteQuery(const DbQuery& query)
|
|
{
|
|
if (IsPaused())
|
|
return;
|
|
|
|
ASSERT(query.Category != DbCatInvalid);
|
|
|
|
IncreasePendingQueries(1);
|
|
m_QueryQueue.Enqueue(std::bind(&IdoPgsqlConnection::InternalExecuteQuery, this, query, -1), query.Priority, true);
|
|
}
|
|
|
|
void IdoPgsqlConnection::ExecuteMultipleQueries(const std::vector<DbQuery>& queries)
|
|
{
|
|
if (IsPaused())
|
|
return;
|
|
|
|
if (queries.empty())
|
|
return;
|
|
|
|
IncreasePendingQueries(queries.size());
|
|
m_QueryQueue.Enqueue(std::bind(&IdoPgsqlConnection::InternalExecuteMultipleQueries, this, queries), queries[0].Priority, true);
|
|
}
|
|
|
|
bool IdoPgsqlConnection::CanExecuteQuery(const DbQuery& query)
|
|
{
|
|
if (query.Object && !IsIDCacheValid())
|
|
return false;
|
|
|
|
if (query.WhereCriteria) {
|
|
ObjectLock olock(query.WhereCriteria);
|
|
Value value;
|
|
|
|
for (const Dictionary::Pair& kv : query.WhereCriteria) {
|
|
if (!FieldToEscapedString(kv.first, kv.second, &value))
|
|
return false;
|
|
}
|
|
}
|
|
|
|
if (query.Fields) {
|
|
ObjectLock olock(query.Fields);
|
|
|
|
for (const Dictionary::Pair& kv : query.Fields) {
|
|
Value value;
|
|
|
|
if (kv.second.IsEmpty() && !kv.second.IsString())
|
|
continue;
|
|
|
|
if (!FieldToEscapedString(kv.first, kv.second, &value))
|
|
return false;
|
|
}
|
|
}
|
|
|
|
return true;
|
|
}
|
|
|
|
void IdoPgsqlConnection::InternalExecuteMultipleQueries(const std::vector<DbQuery>& queries)
|
|
{
|
|
AssertOnWorkQueue();
|
|
|
|
if (IsPaused()) {
|
|
DecreasePendingQueries(queries.size());
|
|
return;
|
|
}
|
|
|
|
if (!GetConnected()) {
|
|
DecreasePendingQueries(queries.size());
|
|
return;
|
|
}
|
|
|
|
for (const DbQuery& query : queries) {
|
|
ASSERT(query.Type == DbQueryNewTransaction || query.Category != DbCatInvalid);
|
|
|
|
if (!CanExecuteQuery(query)) {
|
|
m_QueryQueue.Enqueue(std::bind(&IdoPgsqlConnection::InternalExecuteMultipleQueries, this, queries), query.Priority);
|
|
return;
|
|
}
|
|
}
|
|
|
|
for (const DbQuery& query : queries) {
|
|
InternalExecuteQuery(query);
|
|
}
|
|
}
|
|
|
|
void IdoPgsqlConnection::InternalExecuteQuery(const DbQuery& query, int typeOverride)
|
|
{
|
|
AssertOnWorkQueue();
|
|
|
|
if (IsPaused()) {
|
|
DecreasePendingQueries(1);
|
|
return;
|
|
}
|
|
|
|
if (!GetConnected()) {
|
|
DecreasePendingQueries(1);
|
|
return;
|
|
}
|
|
|
|
if (query.Type == DbQueryNewTransaction) {
|
|
DecreasePendingQueries(1);
|
|
InternalNewTransaction();
|
|
return;
|
|
}
|
|
|
|
/* check whether we're allowed to execute the query first */
|
|
if (GetCategoryFilter() != DbCatEverything && (query.Category & GetCategoryFilter()) == 0) {
|
|
DecreasePendingQueries(1);
|
|
return;
|
|
}
|
|
|
|
if (query.Object && query.Object->GetObject()->GetExtension("agent_check").ToBool()) {
|
|
DecreasePendingQueries(1);
|
|
return;
|
|
}
|
|
|
|
/* check if there are missing object/insert ids and re-enqueue the query */
|
|
if (!CanExecuteQuery(query)) {
|
|
m_QueryQueue.Enqueue(std::bind(&IdoPgsqlConnection::InternalExecuteQuery, this, query, typeOverride), query.Priority);
|
|
return;
|
|
}
|
|
|
|
std::ostringstream qbuf, where;
|
|
int type;
|
|
|
|
if (query.WhereCriteria) {
|
|
where << " WHERE ";
|
|
|
|
ObjectLock olock(query.WhereCriteria);
|
|
Value value;
|
|
bool first = true;
|
|
|
|
for (const Dictionary::Pair& kv : query.WhereCriteria) {
|
|
if (!FieldToEscapedString(kv.first, kv.second, &value)) {
|
|
m_QueryQueue.Enqueue(std::bind(&IdoPgsqlConnection::InternalExecuteQuery, this, query, -1), query.Priority);
|
|
return;
|
|
}
|
|
|
|
if (!first)
|
|
where << " AND ";
|
|
|
|
where << kv.first << " = " << value;
|
|
|
|
if (first)
|
|
first = false;
|
|
}
|
|
}
|
|
|
|
type = (typeOverride != -1) ? typeOverride : query.Type;
|
|
|
|
bool upsert = false;
|
|
|
|
if ((type & DbQueryInsert) && (type & DbQueryUpdate)) {
|
|
bool hasid = false;
|
|
|
|
if (query.Object) {
|
|
if (query.ConfigUpdate)
|
|
hasid = GetConfigUpdate(query.Object);
|
|
else if (query.StatusUpdate)
|
|
hasid = GetStatusUpdate(query.Object);
|
|
}
|
|
|
|
if (!hasid)
|
|
upsert = true;
|
|
|
|
type = DbQueryUpdate;
|
|
}
|
|
|
|
if ((type & DbQueryInsert) && (type & DbQueryDelete)) {
|
|
std::ostringstream qdel;
|
|
qdel << "DELETE FROM " << GetTablePrefix() << query.Table << where.str();
|
|
IncreasePendingQueries(1);
|
|
Query(qdel.str());
|
|
|
|
type = DbQueryInsert;
|
|
}
|
|
|
|
switch (type) {
|
|
case DbQueryInsert:
|
|
qbuf << "INSERT INTO " << GetTablePrefix() << query.Table;
|
|
break;
|
|
case DbQueryUpdate:
|
|
qbuf << "UPDATE " << GetTablePrefix() << query.Table << " SET";
|
|
break;
|
|
case DbQueryDelete:
|
|
qbuf << "DELETE FROM " << GetTablePrefix() << query.Table;
|
|
break;
|
|
default:
|
|
VERIFY(!"Invalid query type.");
|
|
}
|
|
|
|
if (type == DbQueryInsert || type == DbQueryUpdate) {
|
|
std::ostringstream colbuf, valbuf;
|
|
|
|
if (type == DbQueryUpdate && query.Fields->GetLength() == 0)
|
|
return;
|
|
|
|
ObjectLock olock(query.Fields);
|
|
|
|
Value value;
|
|
bool first = true;
|
|
for (const Dictionary::Pair& kv : query.Fields) {
|
|
if (kv.second.IsEmpty() && !kv.second.IsString())
|
|
continue;
|
|
|
|
if (!FieldToEscapedString(kv.first, kv.second, &value)) {
|
|
m_QueryQueue.Enqueue(std::bind(&IdoPgsqlConnection::InternalExecuteQuery, this, query, -1), query.Priority);
|
|
return;
|
|
}
|
|
|
|
if (type == DbQueryInsert) {
|
|
if (!first) {
|
|
colbuf << ", ";
|
|
valbuf << ", ";
|
|
}
|
|
|
|
colbuf << kv.first;
|
|
valbuf << value;
|
|
} else {
|
|
if (!first)
|
|
qbuf << ", ";
|
|
|
|
qbuf << " " << kv.first << " = " << value;
|
|
}
|
|
|
|
if (first)
|
|
first = false;
|
|
}
|
|
|
|
if (type == DbQueryInsert)
|
|
qbuf << " (" << colbuf.str() << ") VALUES (" << valbuf.str() << ")";
|
|
}
|
|
|
|
if (type != DbQueryInsert)
|
|
qbuf << where.str();
|
|
|
|
Query(qbuf.str());
|
|
|
|
if (upsert && GetAffectedRows() == 0) {
|
|
IncreasePendingQueries(1);
|
|
InternalExecuteQuery(query, DbQueryDelete | DbQueryInsert);
|
|
|
|
return;
|
|
}
|
|
|
|
if (type == DbQueryInsert && query.Object) {
|
|
if (query.ConfigUpdate) {
|
|
String idField = query.IdColumn;
|
|
|
|
if (idField.IsEmpty())
|
|
idField = query.Table.SubStr(0, query.Table.GetLength() - 1) + "_id";
|
|
|
|
SetInsertID(query.Object, GetSequenceValue(GetTablePrefix() + query.Table, idField));
|
|
|
|
SetConfigUpdate(query.Object, true);
|
|
} else if (query.StatusUpdate)
|
|
SetStatusUpdate(query.Object, true);
|
|
}
|
|
|
|
if (type == DbQueryInsert && query.Table == "notifications" && query.NotificationInsertID) {
|
|
DbReference seqval = GetSequenceValue(GetTablePrefix() + query.Table, "notification_id");
|
|
query.NotificationInsertID->SetValue(static_cast<long>(seqval));
|
|
}
|
|
}
|
|
|
|
void IdoPgsqlConnection::CleanUpExecuteQuery(const String& table, const String& time_column, double max_age)
|
|
{
|
|
if (IsPaused())
|
|
return;
|
|
|
|
IncreasePendingQueries(1);
|
|
m_QueryQueue.Enqueue(std::bind(&IdoPgsqlConnection::InternalCleanUpExecuteQuery, this, table, time_column, max_age), PriorityLow, true);
|
|
}
|
|
|
|
void IdoPgsqlConnection::InternalCleanUpExecuteQuery(const String& table, const String& time_column, double max_age)
|
|
{
|
|
AssertOnWorkQueue();
|
|
|
|
if (!GetConnected()) {
|
|
DecreasePendingQueries(1);
|
|
return;
|
|
}
|
|
|
|
Query("DELETE FROM " + GetTablePrefix() + table + " WHERE instance_id = " +
|
|
Convert::ToString(static_cast<long>(m_InstanceID)) + " AND " + time_column +
|
|
" < TO_TIMESTAMP(" + Convert::ToString(static_cast<long>(max_age)) + ") AT TIME ZONE 'UTC'");
|
|
}
|
|
|
|
void IdoPgsqlConnection::FillIDCache(const DbType::Ptr& type)
|
|
{
|
|
String query = "SELECT " + type->GetIDColumn() + " AS object_id, " + type->GetTable() + "_id, config_hash FROM " + GetTablePrefix() + type->GetTable() + "s";
|
|
IncreasePendingQueries(1);
|
|
IdoPgsqlResult result = Query(query);
|
|
|
|
Dictionary::Ptr row;
|
|
|
|
int index = 0;
|
|
while ((row = FetchRow(result, index))) {
|
|
index++;
|
|
DbReference dbref(row->Get("object_id"));
|
|
SetInsertID(type, dbref, DbReference(row->Get(type->GetTable() + "_id")));
|
|
SetConfigHash(type, dbref, row->Get("config_hash"));
|
|
}
|
|
}
|
|
|
|
int IdoPgsqlConnection::GetPendingQueryCount() const
|
|
{
|
|
return m_QueryQueue.GetLength();
|
|
}
|