Livestatus\Connection: switch to new fetch methods

This commit is contained in:
Thomas Gelf 2014-11-16 16:25:47 +01:00
parent efd395e12a
commit fd55ffe47e
1 changed files with 57 additions and 54 deletions

View File

@ -32,6 +32,13 @@ class Connection
const TYPE_UNIX = 1;
const TYPE_TCP = 2;
protected $bytesRead = 0;
protected $responseSize;
protected $status;
protected $headers;
// List of available Livestatus tables. Kept here as we otherwise get no
// useful error message
protected $available_tables = array(
'hosts', // hosts
'services', // services, joined with all data from hosts
@ -110,9 +117,10 @@ class Connection
{
return 100;
$count = clone($query);
$count->count();
// WTF? $count->count();
Benchmark::measure('Sending Livestatus Count Query');
$data = $this->doFetch((string) $count);
$this->execute($query);
$data = $this->fetchRowFromSocket();
Benchmark::measure('Got Livestatus count result');
return $data[0][0];
}
@ -163,21 +171,26 @@ class Connection
public function fetchAll(Query $query)
{
Benchmark::measure('Sending Livestatus Query');
$data = $this->doFetch((string) $query);
$this->execute($query);
Benchmark::measure('Got Livestatus Data');
if ($query->hasColumns()) {
$headers = $query->getColumnAliases();
} else {
// TODO: left this here, find out how to handle it better
die('F*** no data');
$headers = array_shift($data);
}
$result = array();
foreach ($data as $row) {
$result_row = & $result[];
$result_row = (object) array();
foreach ($row as $key => $val) {
$result_row->{$headers[$key]} = $val;
}
$filter = $query->filterIsSupported() ? null : $query->getFilter();
while ($row = $this->fetchRowFromSocket()) {
$r = new ResponseRow($row, $query);
$res = $query->resultRow($row);
if ($filter !== null && ! $filter->matches($res)) continue;
$result[] = $res;
}
if ($query->hasOrder()) {
usort($result, array($query, 'compare'));
}
@ -193,59 +206,49 @@ class Connection
return $result;
}
protected function doFetch($raw_query)
protected function hasBeenExecuted()
{
$conn = $this->getConnection();
$this->writeToSocket($raw_query);
$header = $this->readFromSocket(16);
$status = (int) substr($header, 0, 3);
$length = (int) trim(substr($header, 4));
$body = $this->readFromSocket($length);
if ($status !== 200) {
throw new IcingaException(
'Problem while reading %d bytes from livestatus: %s',
$length,
$body
);
}
$result = json_decode($body);
if ($result === null) {
throw new IcingaException('Got invalid response body from livestatus');
}
return $result;
return $this->status !== null;
}
protected function readFromSocket($length)
protected function execute($query)
{
$offset = 0;
$buffer = '';
// Reset state
$this->status = null;
$this->responseSize = null;
$this->bytesRead = 0;
while ($offset < $length) {
$data = socket_read($this->connection, $length - $offset);
if ($data === false) {
throw new IcingaException(
'Failed to read from livestatus socket: %s',
socket_strerror(socket_last_error($this->connection))
);
}
$size = strlen($data);
$offset += $size;
$buffer .= $data;
$raw = $query->toString();
if ($size === 0) {
break;
}
}
if ($offset !== $length) {
throw new IcingaException(
'Got only %d instead of %d bytes from livestatus socket',
$offset,
$length
Benchmark::measure($raw);
// "debug"
// echo $raw . "\n<br>";
$this->writeToSocket($raw);
$header = $this->readLineFromSocket();
if (! preg_match('~^(\d{3})\s\s*(\d+)$~', $header, $m)) {
$this->disconnect();
throw new Exception(
sprintf('Got invalid header. First 16 Bytes: %s', $header)
);
}
return $buffer;
$this->status = (int) $m[1];
$this->bytesRead = 0;
$this->responseSize = (int) $m[2];
if ($this->status !== 200) {
// "debug"
//die(var_export($raw, 1));
throw new Exception(
sprintf(
'Error %d while querying livestatus: %s %s',
$this->status,
$raw,
$this->readLineFromSocket()
)
);
}
$this->discoverColumnHeaders($query);
}
protected function discoverColumnHeaders($query)