mirror of https://github.com/Icinga/icinga2.git
264 lines
8.2 KiB
Python
264 lines
8.2 KiB
Python
from __future__ import unicode_literals
|
|
|
|
import os
|
|
import sys
|
|
import time
|
|
import json
|
|
import socket
|
|
import subprocess
|
|
|
|
__all__ = ['parse_statusdata', 'run_mysql_query', 'run_pgsql_query',
|
|
'LiveStatusSocket']
|
|
|
|
|
|
MYSQL_PARAMS = b"-t -D icinga -u icinga --password=icinga -e".split()
|
|
MYSQL_SEPARATOR = '|'
|
|
|
|
PGSQL_PARAMS = b"-nq -U icinga -d icinga -c".split()
|
|
PGSQL_SEPARATOR = '|'
|
|
PGSQL_ENVIRONMENT = {
|
|
b'PGPASSWORD': b'icinga'
|
|
}
|
|
|
|
|
|
def parse_statusdata(data, intelligent_cast=True):
|
|
parsed_data, data_type, type_data = {}, '', {}
|
|
for line in (l for l in data.split(os.linesep)
|
|
if l and not l.startswith('#')):
|
|
if '{' in line:
|
|
data_type = line.partition('{')[0].strip()
|
|
elif '}' in line:
|
|
parsed_data.setdefault(data_type, []).append(type_data)
|
|
else:
|
|
key, _, value = line.partition('=')
|
|
|
|
if intelligent_cast:
|
|
value = _cast_status_value(value)
|
|
|
|
type_data[key.strip()] = value
|
|
|
|
return parsed_data
|
|
|
|
|
|
def _cast_status_value(value):
|
|
try:
|
|
return int(value)
|
|
except ValueError:
|
|
try:
|
|
return float(value)
|
|
except ValueError:
|
|
return value
|
|
|
|
|
|
def run_mysql_query(query, path):
|
|
p = subprocess.Popen([path] + MYSQL_PARAMS + [query.encode('utf-8')],
|
|
stdout=subprocess.PIPE)
|
|
Logger.debug('Sent MYSQL query: {0!r}\n'.format(query))
|
|
resultset = [l.decode('utf-8') for l in p.stdout.readlines()]
|
|
Logger.debug('Received MYSQL resultset: {0!r}\n'
|
|
''.format(''.join(resultset)), True)
|
|
return _parse_mysql_result(resultset)
|
|
|
|
|
|
def _parse_mysql_result(resultset):
|
|
result, header = [], None
|
|
for line in (l for l in resultset if MYSQL_SEPARATOR in l):
|
|
columns = [c.strip() for c in line[1:-3].split(MYSQL_SEPARATOR)]
|
|
if header is None:
|
|
header = columns
|
|
else:
|
|
result.append(dict((header[i], v if v != 'NULL' else None)
|
|
for i, v in enumerate(columns)))
|
|
return result
|
|
|
|
|
|
def run_pgsql_query(query, path):
|
|
p = subprocess.Popen([path] + PGSQL_PARAMS + [query.encode('utf-8')],
|
|
stdout=subprocess.PIPE, env=PGSQL_ENVIRONMENT)
|
|
Logger.debug('Sent PostgreSQL query: {0!r}\n'.format(query))
|
|
resultset = [l.decode('utf-8') for l in p.stdout.readlines()]
|
|
Logger.debug('Received PostgreSQL resultset: {0!r}\n'
|
|
''.format(''.join(resultset)), True)
|
|
return _parse_pgsql_result(resultset)
|
|
|
|
|
|
def _parse_pgsql_result(resultset):
|
|
result, header = [], None
|
|
for line in (l for l in resultset if PGSQL_SEPARATOR in l):
|
|
columns = [c.strip() for c in line.split(PGSQL_SEPARATOR)]
|
|
if header is None:
|
|
header = columns
|
|
else:
|
|
result.append(dict((header[i], v) for i, v in enumerate(columns)))
|
|
return result
|
|
|
|
|
|
class LiveStatusError(Exception):
|
|
pass
|
|
|
|
|
|
class LiveStatusSocket(object):
|
|
options = [
|
|
'KeepAlive: on',
|
|
'OutputFormat: json',
|
|
'ResponseHeader: fixed16'
|
|
]
|
|
|
|
def __init__(self, path):
|
|
self.path = path
|
|
|
|
self._connected = False
|
|
|
|
def __enter__(self):
|
|
self.connect()
|
|
return self
|
|
|
|
def __exit__(self, exc_type, exc_value, tb):
|
|
self.close()
|
|
|
|
def connect(self):
|
|
self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
|
|
Logger.debug('Opened UNIX stream socket\n', True)
|
|
self.sock.connect(self.path)
|
|
Logger.debug('Connected to Livestatus socket: {0}\n'.format(self.path),
|
|
True)
|
|
self._connected = True
|
|
|
|
def reconnect(self, timeout=30):
|
|
Logger.debug('Reconnecting to Livestatus socket\n', True)
|
|
start = time.time()
|
|
while not self._connected and time.time() - start < timeout:
|
|
try:
|
|
self.connect()
|
|
except socket.error, error:
|
|
Logger.debug('Could not connect: {0}\n'.format(error), True)
|
|
# Icinga2 does some "magic" with the socket during startup
|
|
# which causes random errors being raised (EACCES, ENOENT, ..)
|
|
# so we just ignore them until the timeout is reached
|
|
time.sleep(1)
|
|
if not self._connected:
|
|
# Raise the very last exception once the timeout is reached
|
|
raise
|
|
|
|
def close(self):
|
|
if self._connected:
|
|
self.sock.shutdown(socket.SHUT_RDWR)
|
|
Logger.debug('Shutted down Livestatus connection\n', True)
|
|
self.sock.close()
|
|
Logger.debug('Closed Livestatus socket\n', True)
|
|
self._connected = False
|
|
|
|
def query(self, command):
|
|
self.send(command)
|
|
statuscode, response = self.recv()
|
|
|
|
if statuscode != 200:
|
|
raise LiveStatusError(statuscode, response)
|
|
|
|
return response
|
|
|
|
def send(self, query):
|
|
if not self._connected:
|
|
raise RuntimeError('Tried to write to closed socket')
|
|
|
|
full_query = '\n'.join([query] + self.options)
|
|
self.sock.sendall((full_query + '\n\n').encode('utf-8'))
|
|
Logger.debug('Sent Livestatus query: {0!r}\n'.format(full_query))
|
|
|
|
def recv(self):
|
|
if not self._connected:
|
|
raise RuntimeError('Tried to read from closed socket')
|
|
|
|
response = b''
|
|
response_header = self.sock.recv(16)
|
|
response_code = int(response_header[:3])
|
|
response_length = int(response_header[3:].strip())
|
|
|
|
if response_length > 0:
|
|
while len(response) < response_length:
|
|
response += self.sock.recv(response_length - len(response))
|
|
|
|
response = response.decode('utf-8')
|
|
|
|
try:
|
|
response = json.loads(response)
|
|
except ValueError:
|
|
pass
|
|
|
|
Logger.debug('Received Livestatus response: {0!r} (Header was: {1!r})'
|
|
'\n'.format(response, response_header), True)
|
|
return response_code, response
|
|
|
|
|
|
class Logger(object):
|
|
INFO = 1
|
|
OK = 2
|
|
FAIL = 3
|
|
ERROR = 4
|
|
DEBUG_STD = 5
|
|
DEBUG_EXT = 6
|
|
|
|
VERBOSITY = None
|
|
|
|
@classmethod
|
|
def permitted(cls, severity):
|
|
if cls.VERBOSITY is None:
|
|
cls.VERBOSITY = next((int(sys.argv.pop(i).partition('=')[2])
|
|
for i, a in enumerate(sys.argv)
|
|
if a.startswith('--verbosity=')), 1)
|
|
|
|
return (severity == cls.INFO and cls.VERBOSITY >= 1) or \
|
|
(severity == cls.OK and cls.VERBOSITY >= 1) or \
|
|
(severity == cls.FAIL and cls.VERBOSITY >= 1) or \
|
|
(severity == cls.ERROR and cls.VERBOSITY >= 1) or \
|
|
(severity == cls.DEBUG_STD and cls.VERBOSITY >= 2) or \
|
|
(severity == cls.DEBUG_EXT and cls.VERBOSITY >= 3)
|
|
|
|
@staticmethod
|
|
def write(text, stderr=False):
|
|
if stderr:
|
|
sys.stderr.write(text)
|
|
sys.stderr.flush()
|
|
else:
|
|
sys.stdout.write(text)
|
|
sys.stdout.flush()
|
|
|
|
@classmethod
|
|
def log(cls, severity, text):
|
|
if severity == cls.INFO and cls.permitted(cls.INFO):
|
|
cls.write('\x00[INFO] {0}'.format(text))
|
|
elif severity == cls.ERROR and cls.permitted(cls.ERROR):
|
|
cls.write('\x00[ERROR] {0}'.format(text))
|
|
elif severity == cls.FAIL and cls.permitted(cls.FAIL):
|
|
cls.write('\x00[FAIL] {0}'.format(text))
|
|
elif severity == cls.OK and cls.permitted(cls.OK):
|
|
cls.write('\x00[OK] {0}'.format(text))
|
|
elif severity == cls.DEBUG_STD and cls.permitted(cls.DEBUG_STD):
|
|
cls.write('\x00\x00[DEBUG] {0}'.format(text))
|
|
elif severity == cls.DEBUG_EXT and cls.permitted(cls.DEBUG_EXT):
|
|
cls.write('\x00\x00\x00\x00[DEBUG] {0}'.format(text))
|
|
else:
|
|
return False
|
|
return True
|
|
|
|
@classmethod
|
|
def info(cls, text):
|
|
return cls.log(cls.INFO, text)
|
|
|
|
@classmethod
|
|
def error(cls, text):
|
|
return cls.log(cls.ERROR, text)
|
|
|
|
@classmethod
|
|
def fail(cls, text):
|
|
return cls.log(cls.FAIL, text)
|
|
|
|
@classmethod
|
|
def ok(cls, text):
|
|
return cls.log(cls.OK, text)
|
|
|
|
@classmethod
|
|
def debug(cls, text, extended=False):
|
|
return cls.log(cls.DEBUG_EXT if extended else cls.DEBUG_STD, text)
|
|
|