Import: various fixes for PostgreSQL

refs #11315
This commit is contained in:
Thomas Gelf 2016-03-05 20:01:31 +01:00
parent 5268db6a61
commit 9ede46f839
2 changed files with 33 additions and 18 deletions

View File

@ -306,7 +306,7 @@ class Db extends DbConnection
{ {
$db = $this->db(); $db = $this->db();
$query = $db->select() $query = $db->select()
->from('import_run', 'rowset_checksum') ->from('import_run', $this->dbHexFunc('rowset_checksum'))
->where('id = ?', $id); ->where('id = ?', $id);
return $db->fetchOne($query); return $db->fetchOne($query);
@ -381,7 +381,10 @@ class Db extends DbConnection
} }
} }
$lastRun = $db->select()->from('import_run', array('rowset_checksum')); $lastRun = $db->select()->from(
'import_run',
array('checksum' => $this->dbHexFunc('rowset_checksum'))
);
if (is_int($source) || ctype_digit($source)) { if (is_int($source) || ctype_digit($source)) {
$lastRun->where('source_id = ?', $source); $lastRun->where('source_id = ?', $source);
@ -391,7 +394,6 @@ class Db extends DbConnection
$lastRun->order('start_time DESC')->limit(1); $lastRun->order('start_time DESC')->limit(1);
$checksum = $db->fetchOne($lastRun); $checksum = $db->fetchOne($lastRun);
// TODO: Postgres decoding?
return $this->fetchImportedRowsetRows($checksum, $columns); return $this->fetchImportedRowsetRows($checksum, $columns);
} }
@ -399,6 +401,8 @@ class Db extends DbConnection
public function fetchImportedRowsetRows($checksum, $columns) public function fetchImportedRowsetRows($checksum, $columns)
{ {
$db = $this->db(); $db = $this->db();
$binchecksum = Util::hex2binary($checksum);
$query = $db->select()->from( $query = $db->select()->from(
array('rsr' => 'imported_rowset_row'), array('rsr' => 'imported_rowset_row'),
array( array(
@ -419,7 +423,7 @@ class Db extends DbConnection
array('p' => 'imported_property'), array('p' => 'imported_property'),
'p.checksum = rp.property_checksum', 'p.checksum = rp.property_checksum',
array() array()
)->where('rsr.rowset_checksum = ?', $checksum)->order('r.object_name'); )->where('rsr.rowset_checksum = ?', $this->quoteBinary($binchecksum))->order('r.object_name');
if ($columns === null) { if ($columns === null) {
$columns = $this->listImportedRowsetColumnNames($checksum); $columns = $this->listImportedRowsetColumnNames($checksum);
@ -528,7 +532,7 @@ class Db extends DbConnection
array('rsr' => 'imported_rowset_row'), array('rsr' => 'imported_rowset_row'),
'rsr.row_checksum = rp.row_checksum', 'rsr.row_checksum = rp.row_checksum',
array() array()
)->where('rsr.rowset_checksum = ?', $checksum); )->where('rsr.rowset_checksum = ?', $this->quoteBinary(Util::hex2binary($checksum)));
return $db->fetchCol($query); return $db->fetchCol($query);
} }
@ -701,7 +705,7 @@ class Db extends DbConnection
return $this->getDbType() === 'pgsql'; return $this->getDbType() === 'pgsql';
} }
protected function dbHexFunc($column) public function dbHexFunc($column)
{ {
if ($this->isPgsql()) { if ($this->isPgsql()) {
return sprintf("LOWER(ENCODE(%s, 'hex'))", $column); return sprintf("LOWER(ENCODE(%s, 'hex'))", $column);
@ -710,10 +714,10 @@ class Db extends DbConnection
} }
} }
protected function quoteBinary($binary) public function quoteBinary($binary)
{ {
if ($this->isPgsql()) { if ($this->isPgsql()) {
return new Zend_Db_Expr("\\x" . bin2hex($binary)); return new Zend_Db_Expr("'\\x" . bin2hex($binary) . "'");
} }
return $binary; return $binary;

View File

@ -100,14 +100,17 @@ class Import
'import_run', 'import_run',
array( array(
'source_id' => $this->source->id, 'source_id' => $this->source->id,
'rowset_checksum' => $this->rowsetChecksum(), 'rowset_checksum' => $this->quoteBinary($this->rowsetChecksum()),
'start_time' => date('Y-m-d H:i:s'), 'start_time' => date('Y-m-d H:i:s'),
'succeeded' => 'y' 'succeeded' => 'y'
) )
); );
if ($this->connection->isPgsql()) {
return $this->db->lastInsertId('import_run', 'id');
} else {
return $this->db->lastInsertId(); return $this->db->lastInsertId();
} }
}
/** /**
* Whether there are no rows to be fetched from import source * Whether there are no rows to be fetched from import source
@ -207,7 +210,7 @@ class Import
if (! array_key_exists($checksum, $this->properties)) { if (! array_key_exists($checksum, $this->properties)) {
$this->properties[$checksum] = array( $this->properties[$checksum] = array(
'checksum' => $checksum, 'checksum' => $this->quoteBinary($checksum),
'property_name' => $key, 'property_name' => $key,
'property_value' => $value, 'property_value' => $value,
'format' => $format 'format' => $format
@ -290,7 +293,7 @@ class Import
} }
$this->rows[$checksum] = array( $this->rows[$checksum] = array(
'checksum' => $checksum, 'checksum' => $this->quoteBinary($checksum),
'object_name' => $object_name 'object_name' => $object_name
); );
@ -323,7 +326,7 @@ class Import
$newProperties = $this->newChecksums('imported_property', array_keys($this->properties)); $newProperties = $this->newChecksums('imported_property', array_keys($this->properties));
} }
$db->insert('imported_rowset', array('checksum' => $rowset)); $db->insert('imported_rowset', array('checksum' => $this->quoteBinary($rowset)));
foreach ($newProperties as $checksum) { foreach ($newProperties as $checksum) {
$db->insert('imported_property', $this->properties[$checksum]); $db->insert('imported_property', $this->properties[$checksum]);
@ -333,7 +336,7 @@ class Import
$db->insert('imported_row', $rows[$row]); $db->insert('imported_row', $rows[$row]);
foreach ($this->rowProperties[$row] as $property) { foreach ($this->rowProperties[$row] as $property) {
$db->insert('imported_row_property', array( $db->insert('imported_row_property', array(
'row_checksum' => $row, 'row_checksum' => $this->quoteBinary($row),
'property_checksum' => $property 'property_checksum' => $property
)); ));
} }
@ -343,8 +346,8 @@ class Import
$db->insert( $db->insert(
'imported_rowset_row', 'imported_rowset_row',
array( array(
'rowset_checksum' => $rowset, 'rowset_checksum' => $this->quoteBinary($rowset),
'row_checksum' => $row 'row_checksum' => $this->quoteBinary($row)
) )
); );
} }
@ -414,7 +417,10 @@ class Import
$query = $db $query = $db
->select() ->select()
->from($table, 'checksum') ->from($table, 'checksum')
->where('LOWER(HEX(checksum)) IN (?)', $hexed); ->where(
$this->connection->dbHexFunc('checksum') . ' IN (?)',
$hexed
);
$existing = $db->fetchCol($query); $existing = $db->fetchCol($query);
@ -458,4 +464,9 @@ class Import
$el = $this->sortObject($el); $el = $this->sortObject($el);
} }
} }
protected function quoteBinary($bin)
{
return $this->connection->quoteBinary($bin);
}
} }