ImportRun: take over specific logic from Db
This commit is contained in:
parent
f863a9b223
commit
d4a680a6b9
|
@ -402,105 +402,6 @@ class Db extends DbConnection
|
|||
return $objects;
|
||||
}
|
||||
|
||||
public function fetchLatestImportedRows($source, $desiredColumns = null)
|
||||
{
|
||||
$db = $this->db();
|
||||
if ($desiredColumns === null) {
|
||||
$columns = null;
|
||||
} else {
|
||||
$columns = array();
|
||||
foreach ($desiredColumns as $column) {
|
||||
if (false === ($pos = strpos($column, '.'))) {
|
||||
$columns[$column] = $column;
|
||||
} else {
|
||||
$column = substr($column, 0, $pos);
|
||||
$columns[$column] = $column;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
$lastRun = $db->select()->from(
|
||||
array('r' => 'import_run'),
|
||||
array('checksum' => $this->dbHexFunc('r.rowset_checksum'))
|
||||
);
|
||||
|
||||
if (is_int($source) || ctype_digit($source)) {
|
||||
$lastRun->where('r.source_id = ?', $source);
|
||||
} else {
|
||||
$lastRun->where('r.source_name = ?', $source);
|
||||
}
|
||||
|
||||
$lastRun->order('r.start_time DESC')->limit(1);
|
||||
$checksum = $db->fetchOne($lastRun);
|
||||
|
||||
return $this->fetchImportedRowsetRows($checksum, $columns);
|
||||
}
|
||||
|
||||
public function fetchImportedRowsetRows($checksum, $columns, $filter = null)
|
||||
{
|
||||
$db = $this->db();
|
||||
$binchecksum = Util::hex2binary($checksum);
|
||||
|
||||
$query = $db->select()->from(
|
||||
array('rsr' => 'imported_rowset_row'),
|
||||
array(
|
||||
'object_name' => 'r.object_name',
|
||||
'property_name' => 'p.property_name',
|
||||
'property_value' => 'p.property_value',
|
||||
'format' => 'p.format'
|
||||
)
|
||||
)->join(
|
||||
array('r' => 'imported_row'),
|
||||
'rsr.row_checksum = r.checksum',
|
||||
array()
|
||||
)->join(
|
||||
array('rp' => 'imported_row_property'),
|
||||
'r.checksum = rp.row_checksum',
|
||||
array()
|
||||
)->join(
|
||||
array('p' => 'imported_property'),
|
||||
'p.checksum = rp.property_checksum',
|
||||
array()
|
||||
)->where('rsr.rowset_checksum = ?', $this->quoteBinary($binchecksum))->order('r.object_name');
|
||||
|
||||
if ($columns === null) {
|
||||
$columns = $this->listImportedRowsetColumnNames($checksum);
|
||||
} else {
|
||||
$query->where('p.property_name IN (?)', $columns);
|
||||
}
|
||||
|
||||
$result = array();
|
||||
$empty = (object) array();
|
||||
foreach ($columns as $k => $v) {
|
||||
$empty->$k = null;
|
||||
}
|
||||
|
||||
foreach ($db->fetchAll($query) as $row) {
|
||||
if (! array_key_exists($row->object_name, $result)) {
|
||||
$result[$row->object_name] = clone($empty);
|
||||
}
|
||||
|
||||
if ($row->format === 'json') {
|
||||
$result[$row->object_name]->{$row->property_name} = json_decode($row->property_value);
|
||||
} else {
|
||||
$result[$row->object_name]->{$row->property_name} = $row->property_value;
|
||||
}
|
||||
}
|
||||
|
||||
if ($filter) {
|
||||
$filtered = array();
|
||||
foreach ($result as $key => $row) {
|
||||
if ($filter->matches($row)) {
|
||||
$filtered[$key] = $row;
|
||||
}
|
||||
}
|
||||
|
||||
return $filtered;
|
||||
}
|
||||
|
||||
return $result;
|
||||
}
|
||||
|
||||
public function getLatestImportedChecksum($source)
|
||||
{
|
||||
$db = $this->db();
|
||||
|
@ -564,26 +465,6 @@ class Db extends DbConnection
|
|||
return $result;
|
||||
}
|
||||
|
||||
public function listImportedRowsetColumnNames($checksum)
|
||||
{
|
||||
$db = $this->db();
|
||||
|
||||
$query = $db->select()->distinct()->from(
|
||||
array('p' => 'imported_property'),
|
||||
'property_name'
|
||||
)->join(
|
||||
array('rp' => 'imported_row_property'),
|
||||
'rp.property_checksum = p.checksum',
|
||||
array()
|
||||
)->join(
|
||||
array('rsr' => 'imported_rowset_row'),
|
||||
'rsr.row_checksum = rp.row_checksum',
|
||||
array()
|
||||
)->where('rsr.rowset_checksum = ?', $this->quoteBinary(Util::hex2binary($checksum)));
|
||||
|
||||
return $db->fetchCol($query);
|
||||
}
|
||||
|
||||
public function enumCommands()
|
||||
{
|
||||
return $this->enumIcingaObjects('command');
|
||||
|
|
|
@ -392,10 +392,8 @@ class Sync
|
|||
// Provide an alias column for our key. TODO: double-check this!
|
||||
$key = $source->key_column;
|
||||
$this->sourceColumns[$sourceId][$key] = $key;
|
||||
$rows = $this->db->fetchLatestImportedRows(
|
||||
$sourceId,
|
||||
$this->sourceColumns[$sourceId]
|
||||
);
|
||||
$run = $source->fetchLastRun(true);
|
||||
$rows = $run->fetchRows($this->sourceColumns[$sourceId]);
|
||||
|
||||
$this->imported[$sourceId] = array();
|
||||
foreach ($rows as $row) {
|
||||
|
@ -434,6 +432,8 @@ class Sync
|
|||
$this->imported[$sourceId][$row->$key] = $row;
|
||||
}
|
||||
}
|
||||
|
||||
unset($rows);
|
||||
}
|
||||
|
||||
return $this;
|
||||
|
|
|
@ -36,4 +36,89 @@ class ImportRun extends DbObject
|
|||
$this->getConnection()->quoteBinary($this->rowset_checksum)
|
||||
);
|
||||
}
|
||||
|
||||
public function listColumnNames()
|
||||
{
|
||||
$db = $this->getDb();
|
||||
|
||||
$query = $db->select()->distinct()->from(
|
||||
array('p' => 'imported_property'),
|
||||
'property_name'
|
||||
)->join(
|
||||
array('rp' => 'imported_row_property'),
|
||||
'rp.property_checksum = p.checksum',
|
||||
array()
|
||||
)->join(
|
||||
array('rsr' => 'imported_rowset_row'),
|
||||
'rsr.row_checksum = rp.row_checksum',
|
||||
array()
|
||||
)->where('rsr.rowset_checksum = ?', $this->getConnection()->quoteBinary($this->rowset_checksum));
|
||||
|
||||
return $db->fetchCol($query);
|
||||
}
|
||||
|
||||
public function fetchRows($columns, $filter = null)
|
||||
{
|
||||
$db = $this->getDb();
|
||||
$binchecksum = $this->rowset_checksum;
|
||||
|
||||
$query = $db->select()->from(
|
||||
array('rsr' => 'imported_rowset_row'),
|
||||
array(
|
||||
'object_name' => 'r.object_name',
|
||||
'property_name' => 'p.property_name',
|
||||
'property_value' => 'p.property_value',
|
||||
'format' => 'p.format'
|
||||
)
|
||||
)->join(
|
||||
array('r' => 'imported_row'),
|
||||
'rsr.row_checksum = r.checksum',
|
||||
array()
|
||||
)->join(
|
||||
array('rp' => 'imported_row_property'),
|
||||
'r.checksum = rp.row_checksum',
|
||||
array()
|
||||
)->join(
|
||||
array('p' => 'imported_property'),
|
||||
'p.checksum = rp.property_checksum',
|
||||
array()
|
||||
)->where('rsr.rowset_checksum = ?', $this->getConnection()->quoteBinary($binchecksum))->order('r.object_name');
|
||||
|
||||
if ($columns === null) {
|
||||
$columns = $this->listColumnNames();
|
||||
} else {
|
||||
$query->where('p.property_name IN (?)', $columns);
|
||||
}
|
||||
|
||||
$result = array();
|
||||
$empty = (object) array();
|
||||
foreach ($columns as $k => $v) {
|
||||
$empty->$k = null;
|
||||
}
|
||||
|
||||
foreach ($db->fetchAll($query) as $row) {
|
||||
if (! array_key_exists($row->object_name, $result)) {
|
||||
$result[$row->object_name] = clone($empty);
|
||||
}
|
||||
|
||||
if ($row->format === 'json') {
|
||||
$result[$row->object_name]->{$row->property_name} = json_decode($row->property_value);
|
||||
} else {
|
||||
$result[$row->object_name]->{$row->property_name} = $row->property_value;
|
||||
}
|
||||
}
|
||||
|
||||
if ($filter) {
|
||||
$filtered = array();
|
||||
foreach ($result as $key => $row) {
|
||||
if ($filter->matches($row)) {
|
||||
$filtered[$key] = $row;
|
||||
}
|
||||
}
|
||||
|
||||
return $filtered;
|
||||
}
|
||||
|
||||
return $result;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -3,6 +3,7 @@
|
|||
namespace Icinga\Module\Director\Objects;
|
||||
|
||||
use Icinga\Application\Benchmark;
|
||||
use Icinga\Exception\NotFoundError;
|
||||
use Icinga\Module\Director\Data\Db\DbObjectWithSettings;
|
||||
use Icinga\Module\Director\Import\Import;
|
||||
use Exception;
|
||||
|
@ -29,15 +30,15 @@ class ImportSource extends DbObjectWithSettings
|
|||
|
||||
protected $settingsRemoteId = 'source_id';
|
||||
|
||||
public function fetchLastRun()
|
||||
public function fetchLastRun($required = false)
|
||||
{
|
||||
return $this->fetchLastRunBefore(time());
|
||||
return $this->fetchLastRunBefore(time() + 1, $required);
|
||||
}
|
||||
|
||||
public function fetchLastRunBefore($timestamp)
|
||||
public function fetchLastRunBefore($timestamp, $required = false)
|
||||
{
|
||||
if (! $this->hasBeenLoadedFromDb()) {
|
||||
return null;
|
||||
return $this->nullUnlessRequired($required);
|
||||
}
|
||||
|
||||
if ($timestamp === null) {
|
||||
|
@ -58,10 +59,22 @@ class ImportSource extends DbObjectWithSettings
|
|||
if ($runId) {
|
||||
return ImportRun::load($runId, $this->getConnection());
|
||||
} else {
|
||||
return null;
|
||||
return $this->nullUnlessRequired($required);
|
||||
}
|
||||
}
|
||||
|
||||
protected function nullUnlessRequired($required)
|
||||
{
|
||||
if ($required) {
|
||||
throw new NotFoundError(
|
||||
'No data has been imported for "%s" yet',
|
||||
$this->source_name
|
||||
);
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
public function fetchRowModifiers()
|
||||
{
|
||||
$db = $this->getDb();
|
||||
|
|
Loading…
Reference in New Issue