CoreApi: testing streams

This commit is contained in:
Thomas Gelf 2016-01-19 18:01:21 +01:00
parent df89a2b89e
commit 41d9179389
3 changed files with 107 additions and 4 deletions

View File

@ -8,8 +8,18 @@ use Icinga\Module\Director\Cli\Command;
use Icinga\Module\Director\Objects\IcingaHost; use Icinga\Module\Director\Objects\IcingaHost;
use Icinga\Module\Director\Objects\IcingaHostVar; use Icinga\Module\Director\Objects\IcingaHostVar;
use Icinga\Module\Director\Core\RestApiClient;
use Icinga\Module\Director\Core\CoreApi;
class BenchmarkCommand extends Command class BenchmarkCommand extends Command
{ {
public function streamAction()
{
$this->api()->stream();
}
public function filterAction() public function filterAction()
{ {
$flat = array(); $flat = array();
@ -47,4 +57,16 @@ class BenchmarkCommand extends Command
Benchmark::measure('all done'); Benchmark::measure('all done');
} }
protected function api()
{
$apiconfig = $this->Config()->getSection('api');
$client = new RestApiClient($apiconfig->get('address'), $apiconfig->get('port'));
$client->setCredentials($apiconfig->get('username'), $apiconfig->get('password'));
$api = new CoreApi($client);
return $api;
}
} }

View File

@ -354,6 +354,30 @@ constants
))->succeeded(); ))->succeeded();
} }
public function stream()
{
$allTypes = array(
'CheckResult',
'StateChange',
'Notification',
'AcknowledgementSet',
'AcknowledgementCleared',
'CommentAdded',
'CommentRemoved',
'DowntimeAdded',
'DowntimeRemoved',
'DowntimeTriggered'
);
$queue = 'director-rand';
$url = sprintf('events?queue=%s&types=%s', $queue, implode('&types=', $allTypes));
$this->client->request('post', $url, null, false, true);
}
public function dumpConfig(IcingaConfig $config, $db, $moduleName = 'director') public function dumpConfig(IcingaConfig $config, $db, $moduleName = 'director')
{ {
$start = microtime(true); $start = microtime(true);

View File

@ -19,6 +19,8 @@ class RestApiClient
protected $curl; protected $curl;
protected $readBuffer = '';
public function __construct($peer, $port = 5665, $cn = null) public function __construct($peer, $port = 5665, $cn = null)
{ {
$this->peer = $peer; $this->peer = $peer;
@ -45,11 +47,13 @@ class RestApiClient
return sprintf('https://%s:%d/%s/%s', $this->peer, $this->port, $this->version, $url); return sprintf('https://%s:%d/%s/%s', $this->peer, $this->port, $this->version, $url);
} }
protected function request($method, $url, $body = null, $raw = false) // protected function request($method, $url, $body = null, $raw = false)
public function request($method, $url, $body = null, $raw = false, $stream = false)
{ {
if (function_exists('curl_version')) { if (function_exists('curl_version')) {
return $this->curlRequest($method, $url, $body, $raw); return $this->curlRequest($method, $url, $body, $raw, $stream);
} elseif (version_compare(PHP_VERSION, '5.4.0') >= 0) { } elseif (version_compare(PHP_VERSION, '5.4.0') >= 0) {
// TODO: fail if stream
return $this->phpRequest($method, $url, $body, $raw); return $this->phpRequest($method, $url, $body, $raw);
} else { } else {
throw new Exception('No CURL extension detected, this is required for PHP < 5.4'); throw new Exception('No CURL extension detected, this is required for PHP < 5.4');
@ -106,12 +110,12 @@ class RestApiClient
} }
} }
protected function curlRequest($method, $url, $body = null, $raw = false) protected function curlRequest($method, $url, $body = null, $raw = false, $stream = false)
{ {
$auth = sprintf('%s:%s', $this->user, $this->pass); $auth = sprintf('%s:%s', $this->user, $this->pass);
$headers = array( $headers = array(
'Host: ' . $this->getPeerIdentity(), 'Host: ' . $this->getPeerIdentity(),
'Connection: close' // 'Connection: close'
); );
if (! $raw) { if (! $raw) {
@ -141,6 +145,10 @@ class RestApiClient
$opts[CURLOPT_POSTFIELDS] = $body; $opts[CURLOPT_POSTFIELDS] = $body;
} }
if ($stream) {
$opts[CURLOPT_WRITEFUNCTION] = array($this, 'readPart');
}
curl_setopt_array($curl, $opts); curl_setopt_array($curl, $opts);
Benchmark::measure('Rest Api, sending ' . $url); Benchmark::measure('Rest Api, sending ' . $url);
@ -150,6 +158,11 @@ class RestApiClient
} }
Benchmark::measure('Rest Api, got response'); Benchmark::measure('Rest Api, got response');
if ($stream) {
return $this;
}
if ($raw) { if ($raw) {
return $res; return $res;
} else { } else {
@ -192,4 +205,48 @@ class RestApiClient
} }
return $this->curl; return $this->curl;
} }
protected function readPart($curl, $data)
{
$length = strlen($data);
$this->readBuffer .= $data;
echo "Got $length bytes\n";
$this->dumpEvents();
return $length;
}
protected function dumpEvents()
{
$offset = 0;
while (false !== ($pos = strpos($this->readBuffer, "\n", $offset))) {
if ($pos === $offset) {
echo "Got empty line $offset / $pos\n";
$offset = $pos + 1;
continue;
}
$str = substr($this->readBuffer, $offset, $pos);
$decoded = json_decode($str);
if ($decoded === false) {
die('No json: ' . $str);
}
printf("Processing %s bytes\n", strlen($str));
print_r($decoded);
$offset = $pos + 1;
}
if ($offset > 0) {
$this->readBuffer = substr($this->readBuffer, $offset + 1);
}
echo "REMAINING: " . strlen($this->readBuffer) . "\n";
}
public function __destruct()
{
if ($this->curl !== null && is_resource($this->curl)) {
curl_close($this->curl);
}
}
} }