CoreApi: allow to stream raw events

fixes #744
This commit is contained in:
Thomas Gelf 2017-01-24 11:12:28 +01:00
parent ddd711e5e8
commit e1b420d4c4
2 changed files with 37 additions and 18 deletions

View File

@ -48,9 +48,9 @@ class CoreApi implements DeploymentApiInterface
)->getResult('name');
}
public function onEvent($callback)
public function onEvent($callback, $raw = false)
{
$this->client->onEvent($callback);
$this->client->onEvent($callback, $raw);
return $this;
}

View File

@ -24,6 +24,8 @@ class RestApiClient
protected $onEvent;
protected $onEventWantsRaw;
public function __construct($peer, $port = 5665, $cn = null)
{
$this->peer = $peer;
@ -40,8 +42,9 @@ class RestApiClient
return $this;
}
public function onEvent($callback)
public function onEvent($callback, $raw = false)
{
$this->onEventWantsRaw = $raw;
$this->onEvent = $callback;
return $this;
}
@ -162,6 +165,7 @@ class RestApiClient
if ($stream) {
$opts[CURLOPT_WRITEFUNCTION] = array($this, 'readPart');
$opts[CURLOPT_TCP_NODELAY] = 1;
}
curl_setopt_array($curl, $opts);
@ -193,7 +197,12 @@ class RestApiClient
}
}
protected function readPart($curl, $data)
/**
* @param resource $curl
* @param $data
* @return int
*/
protected function readPart($curl, & $data)
{
$length = strlen($data);
$this->readBuffer .= $data;
@ -248,23 +257,11 @@ class RestApiClient
$offset = 0;
while (false !== ($pos = strpos($this->readBuffer, "\n", $offset))) {
if ($pos === $offset) {
echo "Got empty line $offset / $pos\n";
// echo "Got empty line $offset / $pos\n";
$offset = $pos + 1;
continue;
}
$str = substr($this->readBuffer, $offset, $pos);
$decoded = json_decode($str);
if ($decoded === false) {
throw new Exception('Got invalid JSON: ' . $str);
}
// printf("Processing %s bytes\n", strlen($str));
// print_r($decoded);
if ($this->onEvent !== null) {
$func = $this->onEvent;
$func($decoded);
}
$this->processReadBuffer($offset, $pos);
$offset = $pos + 1;
}
@ -276,6 +273,28 @@ class RestApiClient
// echo "REMAINING: " . strlen($this->readBuffer) . "\n";
}
protected function processReadBuffer($offset, $pos)
{
if ($this->onEvent === null) {
return;
}
$func = $this->onEvent;
$str = substr($this->readBuffer, $offset, $pos);
// printf("Processing %s bytes\n", strlen($str));
if ($this->onEventWantsRaw) {
$func($str);
} else {
$decoded = json_decode($str);
if ($decoded === false) {
throw new Exception('Got invalid JSON: ' . $str);
}
$func($decoded);
}
}
public function __destruct()
{
if ($this->curl !== null && is_resource($this->curl)) {