From d4a680a6b9e19240f1b58163af074437ddc2f801 Mon Sep 17 00:00:00 2001 From: Thomas Gelf Date: Wed, 13 Jul 2016 10:19:03 +0200 Subject: [PATCH] ImportRun: take over specific logic from Db --- library/Director/Db.php | 119 ---------------------- library/Director/Import/Sync.php | 8 +- library/Director/Objects/ImportRun.php | 85 ++++++++++++++++ library/Director/Objects/ImportSource.php | 23 ++++- 4 files changed, 107 insertions(+), 128 deletions(-) diff --git a/library/Director/Db.php b/library/Director/Db.php index 1b13c5bd..9f5159ff 100644 --- a/library/Director/Db.php +++ b/library/Director/Db.php @@ -402,105 +402,6 @@ class Db extends DbConnection return $objects; } - public function fetchLatestImportedRows($source, $desiredColumns = null) - { - $db = $this->db(); - if ($desiredColumns === null) { - $columns = null; - } else { - $columns = array(); - foreach ($desiredColumns as $column) { - if (false === ($pos = strpos($column, '.'))) { - $columns[$column] = $column; - } else { - $column = substr($column, 0, $pos); - $columns[$column] = $column; - } - } - } - - $lastRun = $db->select()->from( - array('r' => 'import_run'), - array('checksum' => $this->dbHexFunc('r.rowset_checksum')) - ); - - if (is_int($source) || ctype_digit($source)) { - $lastRun->where('r.source_id = ?', $source); - } else { - $lastRun->where('r.source_name = ?', $source); - } - - $lastRun->order('r.start_time DESC')->limit(1); - $checksum = $db->fetchOne($lastRun); - - return $this->fetchImportedRowsetRows($checksum, $columns); - } - - public function fetchImportedRowsetRows($checksum, $columns, $filter = null) - { - $db = $this->db(); - $binchecksum = Util::hex2binary($checksum); - - $query = $db->select()->from( - array('rsr' => 'imported_rowset_row'), - array( - 'object_name' => 'r.object_name', - 'property_name' => 'p.property_name', - 'property_value' => 'p.property_value', - 'format' => 'p.format' - ) - )->join( - array('r' => 'imported_row'), - 'rsr.row_checksum = r.checksum', - array() - )->join( - array('rp' => 'imported_row_property'), - 'r.checksum = rp.row_checksum', - array() - )->join( - array('p' => 'imported_property'), - 'p.checksum = rp.property_checksum', - array() - )->where('rsr.rowset_checksum = ?', $this->quoteBinary($binchecksum))->order('r.object_name'); - - if ($columns === null) { - $columns = $this->listImportedRowsetColumnNames($checksum); - } else { - $query->where('p.property_name IN (?)', $columns); - } - - $result = array(); - $empty = (object) array(); - foreach ($columns as $k => $v) { - $empty->$k = null; - } - - foreach ($db->fetchAll($query) as $row) { - if (! array_key_exists($row->object_name, $result)) { - $result[$row->object_name] = clone($empty); - } - - if ($row->format === 'json') { - $result[$row->object_name]->{$row->property_name} = json_decode($row->property_value); - } else { - $result[$row->object_name]->{$row->property_name} = $row->property_value; - } - } - - if ($filter) { - $filtered = array(); - foreach ($result as $key => $row) { - if ($filter->matches($row)) { - $filtered[$key] = $row; - } - } - - return $filtered; - } - - return $result; - } - public function getLatestImportedChecksum($source) { $db = $this->db(); @@ -564,26 +465,6 @@ class Db extends DbConnection return $result; } - public function listImportedRowsetColumnNames($checksum) - { - $db = $this->db(); - - $query = $db->select()->distinct()->from( - array('p' => 'imported_property'), - 'property_name' - )->join( - array('rp' => 'imported_row_property'), - 'rp.property_checksum = p.checksum', - array() - )->join( - array('rsr' => 'imported_rowset_row'), - 'rsr.row_checksum = rp.row_checksum', - array() - )->where('rsr.rowset_checksum = ?', $this->quoteBinary(Util::hex2binary($checksum))); - - return $db->fetchCol($query); - } - public function enumCommands() { return $this->enumIcingaObjects('command'); diff --git a/library/Director/Import/Sync.php b/library/Director/Import/Sync.php index 87d70983..30434d36 100644 --- a/library/Director/Import/Sync.php +++ b/library/Director/Import/Sync.php @@ -392,10 +392,8 @@ class Sync // Provide an alias column for our key. TODO: double-check this! $key = $source->key_column; $this->sourceColumns[$sourceId][$key] = $key; - $rows = $this->db->fetchLatestImportedRows( - $sourceId, - $this->sourceColumns[$sourceId] - ); + $run = $source->fetchLastRun(true); + $rows = $run->fetchRows($this->sourceColumns[$sourceId]); $this->imported[$sourceId] = array(); foreach ($rows as $row) { @@ -434,6 +432,8 @@ class Sync $this->imported[$sourceId][$row->$key] = $row; } } + + unset($rows); } return $this; diff --git a/library/Director/Objects/ImportRun.php b/library/Director/Objects/ImportRun.php index a82fec18..3994997f 100644 --- a/library/Director/Objects/ImportRun.php +++ b/library/Director/Objects/ImportRun.php @@ -36,4 +36,89 @@ class ImportRun extends DbObject $this->getConnection()->quoteBinary($this->rowset_checksum) ); } + + public function listColumnNames() + { + $db = $this->getDb(); + + $query = $db->select()->distinct()->from( + array('p' => 'imported_property'), + 'property_name' + )->join( + array('rp' => 'imported_row_property'), + 'rp.property_checksum = p.checksum', + array() + )->join( + array('rsr' => 'imported_rowset_row'), + 'rsr.row_checksum = rp.row_checksum', + array() + )->where('rsr.rowset_checksum = ?', $this->getConnection()->quoteBinary($this->rowset_checksum)); + + return $db->fetchCol($query); + } + + public function fetchRows($columns, $filter = null) + { + $db = $this->getDb(); + $binchecksum = $this->rowset_checksum; + + $query = $db->select()->from( + array('rsr' => 'imported_rowset_row'), + array( + 'object_name' => 'r.object_name', + 'property_name' => 'p.property_name', + 'property_value' => 'p.property_value', + 'format' => 'p.format' + ) + )->join( + array('r' => 'imported_row'), + 'rsr.row_checksum = r.checksum', + array() + )->join( + array('rp' => 'imported_row_property'), + 'r.checksum = rp.row_checksum', + array() + )->join( + array('p' => 'imported_property'), + 'p.checksum = rp.property_checksum', + array() + )->where('rsr.rowset_checksum = ?', $this->getConnection()->quoteBinary($binchecksum))->order('r.object_name'); + + if ($columns === null) { + $columns = $this->listColumnNames(); + } else { + $query->where('p.property_name IN (?)', $columns); + } + + $result = array(); + $empty = (object) array(); + foreach ($columns as $k => $v) { + $empty->$k = null; + } + + foreach ($db->fetchAll($query) as $row) { + if (! array_key_exists($row->object_name, $result)) { + $result[$row->object_name] = clone($empty); + } + + if ($row->format === 'json') { + $result[$row->object_name]->{$row->property_name} = json_decode($row->property_value); + } else { + $result[$row->object_name]->{$row->property_name} = $row->property_value; + } + } + + if ($filter) { + $filtered = array(); + foreach ($result as $key => $row) { + if ($filter->matches($row)) { + $filtered[$key] = $row; + } + } + + return $filtered; + } + + return $result; + } } diff --git a/library/Director/Objects/ImportSource.php b/library/Director/Objects/ImportSource.php index 77e23fc0..12a55898 100644 --- a/library/Director/Objects/ImportSource.php +++ b/library/Director/Objects/ImportSource.php @@ -3,6 +3,7 @@ namespace Icinga\Module\Director\Objects; use Icinga\Application\Benchmark; +use Icinga\Exception\NotFoundError; use Icinga\Module\Director\Data\Db\DbObjectWithSettings; use Icinga\Module\Director\Import\Import; use Exception; @@ -29,15 +30,15 @@ class ImportSource extends DbObjectWithSettings protected $settingsRemoteId = 'source_id'; - public function fetchLastRun() + public function fetchLastRun($required = false) { - return $this->fetchLastRunBefore(time()); + return $this->fetchLastRunBefore(time() + 1, $required); } - public function fetchLastRunBefore($timestamp) + public function fetchLastRunBefore($timestamp, $required = false) { if (! $this->hasBeenLoadedFromDb()) { - return null; + return $this->nullUnlessRequired($required); } if ($timestamp === null) { @@ -58,10 +59,22 @@ class ImportSource extends DbObjectWithSettings if ($runId) { return ImportRun::load($runId, $this->getConnection()); } else { - return null; + return $this->nullUnlessRequired($required); } } + protected function nullUnlessRequired($required) + { + if ($required) { + throw new NotFoundError( + 'No data has been imported for "%s" yet', + $this->source_name + ); + } + + return null; + } + public function fetchRowModifiers() { $db = $this->getDb();