CoreApi: clean up the streaming prototype

This commit is contained in:
Thomas Gelf 2016-02-01 15:23:30 +01:00
parent 9786fd5dae
commit e4f0f97d4f
2 changed files with 28 additions and 10 deletions

View File

@ -45,6 +45,12 @@ class CoreApi
)->getResult('name'); )->getResult('name');
} }
public function onEvent($callback)
{
$this->client->onEvent($callback);
return $this;
}
public function getObject($name, $pluraltype, $attrs = array()) public function getObject($name, $pluraltype, $attrs = array())
{ {
$params = (object) array( $params = (object) array(

View File

@ -21,6 +21,8 @@ class RestApiClient
protected $readBuffer = ''; protected $readBuffer = '';
protected $onEvent;
public function __construct($peer, $port = 5665, $cn = null) public function __construct($peer, $port = 5665, $cn = null)
{ {
$this->peer = $peer; $this->peer = $peer;
@ -37,6 +39,12 @@ class RestApiClient
return $this; return $this;
} }
public function onEvent($callback)
{
$this->onEvent = $callback;
return $this;
}
public function getPeerIdentity() public function getPeerIdentity()
{ {
return $this->peer; return $this->peer;
@ -156,7 +164,6 @@ class RestApiClient
if ($res === false) { if ($res === false) {
throw new Exception('CURL ERROR: ' . curl_error($curl)); throw new Exception('CURL ERROR: ' . curl_error($curl));
} }
Benchmark::measure('Rest Api, got response'); Benchmark::measure('Rest Api, got response');
if ($stream) { if ($stream) {
@ -210,12 +217,12 @@ class RestApiClient
{ {
$length = strlen($data); $length = strlen($data);
$this->readBuffer .= $data; $this->readBuffer .= $data;
echo "Got $length bytes\n"; // echo "Got $length bytes\n";
$this->dumpEvents(); $this->processEvents();
return $length; return $length;
} }
protected function dumpEvents() protected function processEvents()
{ {
$offset = 0; $offset = 0;
while (false !== ($pos = strpos($this->readBuffer, "\n", $offset))) { while (false !== ($pos = strpos($this->readBuffer, "\n", $offset))) {
@ -227,11 +234,16 @@ class RestApiClient
$str = substr($this->readBuffer, $offset, $pos); $str = substr($this->readBuffer, $offset, $pos);
$decoded = json_decode($str); $decoded = json_decode($str);
if ($decoded === false) { if ($decoded === false) {
die('No json: ' . $str); throw new Exception('Got invalid JSON: ' . $str);
} }
printf("Processing %s bytes\n", strlen($str));
print_r($decoded); // printf("Processing %s bytes\n", strlen($str));
// print_r($decoded);
if ($this->onEvent !== null) {
$func = $this->onEvent;
$func($decoded);
}
$offset = $pos + 1; $offset = $pos + 1;
} }
@ -240,7 +252,7 @@ print_r($decoded);
$this->readBuffer = substr($this->readBuffer, $offset + 1); $this->readBuffer = substr($this->readBuffer, $offset + 1);
} }
echo "REMAINING: " . strlen($this->readBuffer) . "\n"; // echo "REMAINING: " . strlen($this->readBuffer) . "\n";
} }
public function __destruct() public function __destruct()