Import: refine API, code cleanup

This commit is contained in:
Thomas Gelf 2015-11-03 12:52:38 +01:00
parent 9bdfb7dc89
commit 39b58d24b7
3 changed files with 258 additions and 102 deletions

View File

@ -13,7 +13,9 @@ class ImportCommand extends Command
public function runAction() public function runAction()
{ {
if ($runId = Import::run($id = ImportSource::load($this->params->shift(), $this->db()))) { $id = $this->params->shift();
$import = new Import(ImportSource::load($id, $this->db()));
if ($runId = $import->run()) {
echo "Triggered new import\n"; echo "Triggered new import\n";
} else { } else {
echo "Nothing changed\n"; echo "Nothing changed\n";

View File

@ -22,7 +22,9 @@ class ImportsourceController extends ActionController
public function runAction() public function runAction()
{ {
if ($runId = Import::run($id = ImportSource::load($this->params->get('id'), $this->db()))) { $id = $this->params->get('id');
$import = new Import(ImportSource::load($id, $this->db()));
if ($runId = $import->run()) {
Notification::success('Import succeeded'); Notification::success('Import succeeded');
$this->redirectNow(Url::fromPath('director/importrun', array('id' => $runId))); $this->redirectNow(Url::fromPath('director/importrun', array('id' => $runId)));
} else { } else {

View File

@ -25,22 +25,135 @@ class Import
*/ */
protected $db; protected $db;
/**
* Raw data that should be imported, array of stdClass objects
*
* @var array
*/
protected $data; protected $data;
protected function __construct(ImportSource $source) /**
* Checksum of the rowset that should be imported
*
* @var string
*/
private $rowsetChecksum;
/**
* Checksum-indexed rows
*
* @var array
*/
private $rows;
/**
* Checksum-indexed row -> property
*
* @var array
*/
private $rowProperties;
/**
* Whether this rowset exists, for caching purposes
*
* @var boolean
*/
private $rowsetExists;
protected $properties = array();
/**
* Checksums of all rows
*/
private $rowChecksums;
public function __construct(ImportSource $source)
{ {
$this->source = $source; $this->source = $source;
$this->connection = $source->getConnection(); $this->connection = $source->getConnection();
$this->db = $connection->getDbAdapter(); $this->db = $this->connection->getDbAdapter();
} }
public static function run(ImportSource $source) /**
* Whether this import provides modified data
*
* @return boolean
*/
public function providesChanges()
{ {
$import = new self($source); return ! $this->rowsetExists()
return $import->importFromSource($source); || ! $this->lastRowsetIs($this->rowsetChecksum());
} }
protected function rawData() /**
* Trigger an import run
*
* @return int Last import run ID
*/
public function run()
{
    // Callers test the return value for truthiness and report "nothing
    // changed" otherwise, so do not record a run when the source still
    // delivers the rowset that was imported last time
    if (! $this->providesChanges()) {
        return false;
    }

    // The rowset may already be stored from an earlier run, even if it
    // is not the most recent one; rows and properties are persisted once
    if (! $this->rowsetExists()) {
        $this->storeRowset();
    }

    // Record this run, pointing to the (new or re-used) rowset
    $this->db->insert(
        'import_run',
        array(
            'source_id'       => $this->source->id,
            'rowset_checksum' => $this->rowsetChecksum(),
            'start_time'      => date('Y-m-d H:i:s'),
            'succeeded'       => 'y'
        )
    );

    // ID of the import_run row inserted above
    return $this->db->lastInsertId();
}
/**
 * Whether the import source delivered no rows at all
 *
 * @return boolean
 */
public function isEmpty()
{
    // checksummedRows() prepares the rows on demand and hands back an array
    return 0 === count($this->checksummedRows());
}
/**
 * Checksum identifying the complete set of rows of this import
 *
 * Calculated on demand: the first call triggers prepareChecksummedRows(),
 * later calls return the remembered value
 *
 * @return string
 */
protected function & rowsetChecksum()
{
    if (null === $this->rowsetChecksum) {
        $this->prepareChecksummedRows();
    }

    return $this->rowsetChecksum;
}
/**
 * All rows of this import, indexed by their row checksum
 *
 * Prepared on demand: the first call triggers prepareChecksummedRows(),
 * later calls return the remembered rows
 *
 * @return array
 */
protected function & checksummedRows()
{
    if (null === $this->rows) {
        $this->prepareChecksummedRows();
    }

    return $this->rows;
}
/**
* Raw data fetched from the import source, an array of stdClass objects
*
* @return array
*/
protected function & rawData()
{ {
if ($this->data === null) { if ($this->data === null) {
$this->data = ImportSourceHook::loadByName( $this->data = ImportSourceHook::loadByName(
@ -49,145 +162,159 @@ class Import
)->fetchData(); )->fetchData();
} }
return & $this->data; return $this->data;
} }
protected function importFromSource() /**
* Prepare and remember an ImportedProperty
*
* @return array
*/
protected function prepareImportedProperty($key, $rawValue)
{ {
$source = $this->source; if (is_array($rawValue)) {
$connection = $source->getConnection(); $value = json_encode($rawValue);
$this->db = $db = $connection->getDbAdapter(); $format = 'json';
} elseif ($rawValue instanceof stdClass) {
$value = json_encode($this->sortObject($rawValue));
$format = 'json';
} else {
$value = $rawValue;
$format = 'string';
}
$keyColumn = $source->key_column; $checksum = sha1(sprintf('%s=(%s)%s', $key, $format, $value), true);
$rows = array();
$props = array(); if (! array_key_exists($checksum, $this->properties)) {
$rowsProps = array(); $this->properties[$checksum] = array(
'checksum' => $checksum,
'property_name' => $key,
'property_value' => $value,
'format' => $format
);
}
return $this->properties[$checksum];
}
/**
* Walk through each row, prepare properties and calculate checksums
*/
protected function prepareChecksummedRows()
{
$keyColumn = $this->source->key_column;
$this->rows = array();
$this->rowProperties = array();
$objects = array();
$rowCount = 0;
foreach ($this->rawData() as $row) { foreach ($this->rawData() as $row) {
// TODO: Check for name collision $rowCount++;
// Key column must be set
if (! isset($row->$keyColumn)) { if (! isset($row->$keyColumn)) {
// TODO: re-enable errors
continue;
throw new IcingaException( throw new IcingaException(
'No key column "%s" in row: %s', 'No key column "%s" in row %d: %s',
$keyColumn, $keyColumn,
$rowCount,
json_encode($row) json_encode($row)
); );
} }
$object_name = $row->$keyColumn; $object_name = $row->$keyColumn;
// Check for name collision
if (array_key_exists($object_name, $objects)) {
throw new IcingaException(
'Duplicate entry: %s',
$object_name
);
}
$rowChecksums = array(); $rowChecksums = array();
$keys = array_keys((array) $row); $keys = array_keys((array) $row);
sort($keys); sort($keys);
foreach ($keys as $key) { foreach ($keys as $key) {
// TODO: Specify how to treat NULL values. Ignoring for now. // TODO: Specify how to treat NULL values. Ignoring for now.
// One option might be to import null (checksum '(null)')
// and to provide a flag at sync time
if ($row->$key === null) { if ($row->$key === null) {
continue; continue;
} }
$pval = $row->$key; $property = $this->prepareImportedProperty($key, $row->$key);
if (is_array($pval)) { $rowChecksums[] = $property['checksum'];
$pval = json_encode($pval);
$format = 'json';
} elseif ($pval instanceof stdClass) {
$pval = json_encode($this->sortObject($pval));
$format = 'json';
} else {
$format = 'string';
}
$checksum = sha1(sprintf('%s=(%s)%s', $key, $format, $pval), true);
if (! array_key_exists($checksum, $props)) {
$props[$checksum] = array(
'checksum' => $checksum,
'property_name' => $key,
'property_value' => $pval,
'format' => $format
);
}
$rowChecksums[] = $checksum;
} }
$checksum = sha1($object_name . ';' . implode(';', $rowChecksums), true); $checksum = sha1($object_name . ';' . implode(';', $rowChecksums), true);
if (array_key_exists($checksum, $rows)) { if (array_key_exists($checksum, $this->rows)) {
die('WTF, collision?'); die('WTF, collision?');
} }
$rows[$checksum] = array( $this->rows[$checksum] = array(
'checksum' => $checksum, 'checksum' => $checksum,
'object_name' => $object_name 'object_name' => $object_name
); );
$rowsProps[$checksum] = $rowChecksums; $this->rowProperties[$checksum] = $rowChecksums;
$objects[$object_name] = $checksum;
} }
$rowSums = array_keys($rows); $this->rowChecksums = array_keys($this->rows);
$rowset = sha1(implode(';', $rowSums), true); $this->rowsetChecksum = sha1(implode(';', $this->rowChecksums), true);
return $this;
}
if ($this->rowSetExists($rowset)) { /**
if ($this->lastRowsetIs($rowset)) { * Store our new rowset
return false; */
} protected function storeRowset()
} {
$db = $this->db;
$rowset = $this->rowsetChecksum();
$rows = $this->checksummedRows();
$db->beginTransaction(); $db->beginTransaction();
if (! $this->rowSetExists($rowset)) {
if (empty($rowSums)) { if ($this->isEmpty()) {
$newRows = array(); $newRows = array();
} else { $newProperties = array();
$newRows = $this->newChecksums('imported_row', $rowSums); } else {
} $newRows = $this->newChecksums('imported_row', $this->rowChecksums);
$newProperties = $this->newChecksums('imported_property', array_keys($this->properties));
}
if (empty($rowSums)) { $db->insert('imported_rowset', array('checksum' => $rowset));
$newProperties = array();
} else {
$newProperties = $this->newChecksums('imported_property', array_keys($props));
}
$db->insert('imported_rowset', array('checksum' => $rowset)); foreach ($newProperties as $checksum) {
$db->insert('imported_property', $this->properties[$checksum]);
}
foreach ($newProperties as $checksum) { foreach ($newRows as $row) {
$db->insert('imported_property', $props[$checksum]); $db->insert('imported_row', $rows[$row]);
} foreach ($this->rowProperties[$row] as $property) {
$db->insert('imported_row_property', array(
foreach ($newRows as $checksum) { 'row_checksum' => $row,
$db->insert('imported_row', $rows[$checksum]); 'property_checksum' => $property
foreach ($rowsProps[$checksum] as $propChecksum) { ));
$db->insert('imported_row_property', array(
'row_checksum' => $checksum,
'property_checksum' => $propChecksum
));
}
}
foreach (array_keys($rows) as $checksum) {
$db->insert(
'imported_rowset_row',
array(
'rowset_checksum' => $rowset,
'row_checksum' => $checksum
)
);
} }
} }
$db->insert(
'import_run', foreach (array_keys($rows) as $row) {
array( $db->insert(
'source_id' => $source->id, 'imported_rowset_row',
'rowset_checksum' => $rowset, array(
'start_time' => date('Y-m-d H:i:s'), 'rowset_checksum' => $rowset,
'succeeded' => 'y' 'row_checksum' => $row
) )
); );
$id = $db->lastInsertId(); }
$db->commit(); $db->commit();
return $id; $this->rowsetExists = true;
} }
/** /**
@ -199,11 +326,36 @@ class Import
=== Util::binary2hex($checksum); === Util::binary2hex($checksum);
} }
protected function rowSetExists($checksum) /**
* Whether our rowset already exists in the database
*
* @return boolean
*/
protected function rowsetExists()
{ {
return count($this->newChecksums('imported_rowset', array($checksum))) === 0; if (null === $this->rowsetExists) {
$this->rowsetExists = 0 === count(
$this->newChecksums(
'imported_rowset',
array($this->rowsetChecksum())
)
);
}
return $this->rowsetExists;
} }
/**
* Find new checksums for a specific table
*
* Accepts an array of checksums and gives you an array with those checksums
* that are missing in the given table
*
* @param string $table Database table name
* @param array $checksums Array with the checksums that should be verified
*
* @return array
*/
protected function newChecksums($table, $checksums) protected function newChecksums($table, $checksums)
{ {
$db = $this->db; $db = $this->db;