mirror of
				https://github.com/Icinga/icinga2.git
				synced 2025-10-25 17:24:10 +02:00 
			
		
		
		
	
		
			
				
	
	
		
			264 lines
		
	
	
		
			8.2 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			264 lines
		
	
	
		
			8.2 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| from __future__ import unicode_literals
 | |
| 
 | |
| import os
 | |
| import sys
 | |
| import time
 | |
| import json
 | |
| import socket
 | |
| import subprocess
 | |
| 
 | |
# Names made available by a star-import of this module
__all__ = [
    'parse_statusdata',
    'run_mysql_query',
    'run_pgsql_query',
    'LiveStatusSocket',
]
 | |
| 
 | |
| 
 | |
# Arguments placed between the MySQL client binary and the query text
MYSQL_PARAMS = [
    b'-t',
    b'-D', b'icinga',
    b'-u', b'icinga',
    b'--password=icinga',
    b'-e',
]
# Column delimiter looked for in the MySQL client's output
MYSQL_SEPARATOR = '|'

# Arguments placed between the PostgreSQL client binary and the query text
PGSQL_PARAMS = [
    b'-nq',
    b'-U', b'icinga',
    b'-d', b'icinga',
    b'-c',
]
# Column delimiter looked for in the PostgreSQL client's output
PGSQL_SEPARATOR = '|'
# Environment handed to the PostgreSQL client process
PGSQL_ENVIRONMENT = {b'PGPASSWORD': b'icinga'}
 | |
| 
 | |
| 
 | |
def parse_statusdata(data, intelligent_cast=True):
    """Parse block-structured status text into a dictionary.

    The expected input consists of blocks of the following form::

        <type> {
            key=value
            ...
            }

    Blank lines and lines whose first non-blank character is ``#`` are
    ignored. Every block gets its own dictionary, and blocks of the same
    type are collected in a list in the order they appear.

    :param data: The complete text to parse
    :param intelligent_cast: If true, each value is passed through
                             ``_cast_status_value`` before being stored
    :returns: A dictionary mapping each block type to the list of its
              blocks, each block being a dictionary of its key/value pairs
    """
    parsed_data, data_type, type_data = {}, '', {}
    # splitlines() copes with any line ending, independent of the platform
    for raw_line in data.splitlines():
        line = raw_line.strip()
        if not line or line.startswith('#'):
            continue

        if line == '}':
            parsed_data.setdefault(data_type, []).append(type_data)
            # Start over with a fresh dictionary so that blocks do not
            # share (and overwrite) each other's entries
            type_data = {}
        elif line.endswith('{') and '=' not in line:
            data_type = line[:-1].strip()
            type_data = {}
        else:
            # Anything else is a key/value pair, even if the value itself
            # happens to contain braces
            key, _, value = raw_line.partition('=')

            if intelligent_cast:
                value = _cast_status_value(value)

            type_data[key.strip()] = value

    return parsed_data
 | |
| 
 | |
| 
 | |
| def _cast_status_value(value):
 | |
|     try:
 | |
|         return int(value)
 | |
|     except ValueError:
 | |
|         try:
 | |
|             return float(value)
 | |
|         except ValueError:
 | |
|             return value
 | |
| 
 | |
| 
 | |
def run_mysql_query(query, path):
    """Run *query* through the MySQL client and return the parsed result.

    The client is started with ``MYSQL_PARAMS`` followed by the UTF-8
    encoded query as its last argument. Its standard output is read
    completely, decoded as UTF-8 and handed to ``_parse_mysql_result``.

    :param query: The query text
    :param path: Path of the MySQL client binary to execute
    :returns: Whatever ``_parse_mysql_result`` returns for the output
    """
    p = subprocess.Popen([path] + MYSQL_PARAMS + [query.encode('utf-8')],
                         stdout=subprocess.PIPE)
    try:
        Logger.debug('Sent MYSQL query: {0!r}\n'.format(query))
        resultset = [l.decode('utf-8') for l in p.stdout.readlines()]
    finally:
        # Close our end of the pipe and reap the child, otherwise every
        # call leaves a file descriptor and a zombie process behind
        p.stdout.close()
        p.wait()
    Logger.debug('Received MYSQL resultset: {0!r}\n'
                 ''.format(''.join(resultset)), True)
    return _parse_mysql_result(resultset)
 | |
| 
 | |
| 
 | |
| def _parse_mysql_result(resultset):
 | |
|     result, header = [], None
 | |
|     for line in (l for l in resultset if MYSQL_SEPARATOR in l):
 | |
|         columns = [c.strip() for c in line[1:-3].split(MYSQL_SEPARATOR)]
 | |
|         if header is None:
 | |
|             header = columns
 | |
|         else:
 | |
|             result.append(dict((header[i], v if v != 'NULL' else None)
 | |
|                                for i, v in enumerate(columns)))
 | |
|     return result
 | |
| 
 | |
| 
 | |
def run_pgsql_query(query, path):
    """Run *query* through the PostgreSQL client and return the parsed result.

    The client is started with ``PGSQL_PARAMS`` followed by the UTF-8
    encoded query as its last argument and with ``PGSQL_ENVIRONMENT`` as
    its environment. Its standard output is read completely, decoded as
    UTF-8 and handed to ``_parse_pgsql_result``.

    :param query: The query text
    :param path: Path of the PostgreSQL client binary to execute
    :returns: Whatever ``_parse_pgsql_result`` returns for the output
    """
    p = subprocess.Popen([path] + PGSQL_PARAMS + [query.encode('utf-8')],
                         stdout=subprocess.PIPE, env=PGSQL_ENVIRONMENT)
    try:
        Logger.debug('Sent PostgreSQL query: {0!r}\n'.format(query))
        resultset = [l.decode('utf-8') for l in p.stdout.readlines()]
    finally:
        # Close our end of the pipe and reap the child, otherwise every
        # call leaves a file descriptor and a zombie process behind
        p.stdout.close()
        p.wait()
    Logger.debug('Received PostgreSQL resultset: {0!r}\n'
                 ''.format(''.join(resultset)), True)
    return _parse_pgsql_result(resultset)
 | |
| 
 | |
| 
 | |
| def _parse_pgsql_result(resultset):
 | |
|     result, header = [], None
 | |
|     for line in (l for l in resultset if PGSQL_SEPARATOR in l):
 | |
|         columns = [c.strip() for c in line.split(PGSQL_SEPARATOR)]
 | |
|         if header is None:
 | |
|             header = columns
 | |
|         else:
 | |
|             result.append(dict((header[i], v) for i, v in enumerate(columns)))
 | |
|     return result
 | |
| 
 | |
| 
 | |
class LiveStatusError(Exception):
    """Raised by LiveStatusSocket.query for a status code other than 200.

    The arguments are the status code and the response.
    """
 | |
| 
 | |
| 
 | |
class LiveStatusSocket(object):
    """Client for a Livestatus UNIX stream socket.

    Can be used as a context manager, in which case the connection is
    established on entry and closed on exit.
    """

    # Header lines appended to every query that is sent
    options = [
        'KeepAlive: on',
        'OutputFormat: json',
        'ResponseHeader: fixed16'
        ]

    def __init__(self, path):
        """Remember the socket *path*; no connection is made yet."""
        self.path = path
        self.sock = None

        self._connected = False

    def __enter__(self):
        self.connect()
        return self

    def __exit__(self, exc_type, exc_value, tb):
        self.close()

    def connect(self):
        """Open a new socket and connect it to the configured path.

        :raises socket.error: If the connection cannot be established
        """
        sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        Logger.debug('Opened UNIX stream socket\n', True)
        try:
            sock.connect(self.path)
        except socket.error:
            # Do not leak the descriptor of a socket that never connected;
            # reconnect() may end up here once per second
            sock.close()
            raise
        self.sock = sock
        Logger.debug('Connected to Livestatus socket: {0}\n'.format(self.path),
                     True)
        self._connected = True

    def reconnect(self, timeout=30):
        """Retry connecting once per second until *timeout* seconds passed.

        Does nothing if the socket is currently marked as connected.

        :param timeout: Number of seconds after which to give up
        :raises socket.error: The error of the last failed attempt, or a
                              generic one if no attempt was made at all
        """
        Logger.debug('Reconnecting to Livestatus socket\n', True)
        start = time.time()
        last_error = None
        while not self._connected and time.time() - start < timeout:
            try:
                self.connect()
            except socket.error as error:
                # Keep a reference, the name bound by "as" does not
                # survive the except block on Python 3
                last_error = error
                Logger.debug('Could not connect: {0}\n'.format(error), True)
                # Icinga2 does some "magic" with the socket during startup
                # which causes random errors being raised (EACCES, ENOENT, ..)
                # so we just ignore them until the timeout is reached
                time.sleep(1)
        if not self._connected:
            if last_error is None:
                last_error = socket.error(
                    'Could not connect to Livestatus socket within {0} '
                    'seconds'.format(timeout))
            # Raise the very last exception once the timeout is reached
            raise last_error

    def close(self):
        """Shut down and close the socket if it is connected."""
        if self._connected:
            try:
                self.sock.shutdown(socket.SHUT_RDWR)
                Logger.debug('Shutted down Livestatus connection\n', True)
            except socket.error as error:
                # The peer may already be gone; still release the socket
                Logger.debug('Could not shut down Livestatus connection: '
                             '{0}\n'.format(error), True)
            finally:
                self.sock.close()
                Logger.debug('Closed Livestatus socket\n', True)
                self._connected = False

    def query(self, command):
        """Send *command* and return the response.

        :raises LiveStatusError: If the status code is not 200; the
                                 arguments are the code and the response
        """
        self.send(command)
        statuscode, response = self.recv()

        if statuscode != 200:
            raise LiveStatusError(statuscode, response)

        return response

    def send(self, query):
        """Send *query* followed by the standard option headers.

        :raises RuntimeError: If the socket is not connected
        """
        if not self._connected:
            raise RuntimeError('Tried to write to closed socket')

        full_query = '\n'.join([query] + self.options)
        self.sock.sendall((full_query + '\n\n').encode('utf-8'))
        Logger.debug('Sent Livestatus query: {0!r}\n'.format(full_query))

    def recv(self):
        """Read one response and return ``(status code, response)``.

        The response starts with a 16 byte header of which the first
        three bytes are the status code and the remainder the length of
        the body. A non-empty body is decoded as UTF-8 and, if it is
        valid JSON, replaced by the decoded JSON value.

        :raises RuntimeError: If the socket is not connected
        :raises socket.error: If the peer closes the connection before
                              the complete response was received
        """
        if not self._connected:
            raise RuntimeError('Tried to read from closed socket')

        response = b''
        response_header = self._recv_exact(16)
        response_code = int(response_header[:3])
        response_length = int(response_header[3:].strip())

        if response_length > 0:
            response = self._recv_exact(response_length).decode('utf-8')

            try:
                response = json.loads(response)
            except ValueError:
                # Not JSON, hand the plain text to the caller
                pass

        Logger.debug('Received Livestatus response: {0!r} (Header was: {1!r})'
                     '\n'.format(response, response_header), True)
        return response_code, response

    def _recv_exact(self, length):
        """Read exactly *length* bytes from the socket.

        A single recv() may return fewer bytes than requested, so keep
        reading until everything arrived. An empty chunk means the peer
        closed the connection, in which case waiting any longer would
        loop forever.
        """
        chunks, remaining = [], length
        while remaining > 0:
            chunk = self.sock.recv(remaining)
            if not chunk:
                raise socket.error(
                    'Livestatus socket closed with {0} of {1} bytes '
                    'outstanding'.format(remaining, length))
            chunks.append(chunk)
            remaining -= len(chunk)
        return b''.join(chunks)
 | |
| 
 | |
| 
 | |
class Logger(object):
    """Minimal severity-filtered logger writing to stdout or stderr.

    The verbosity is taken from the first ``--verbosity=<n>`` command
    line argument (which is removed from ``sys.argv``) the first time a
    severity is checked, and defaults to 1.
    """

    # Severities
    INFO = 1
    OK = 2
    FAIL = 3
    ERROR = 4
    DEBUG_STD = 5
    DEBUG_EXT = 6

    # Determined lazily by permitted()
    VERBOSITY = None

    @classmethod
    def permitted(cls, severity):
        """Return whether messages of *severity* pass the verbosity."""
        if cls.VERBOSITY is None:
            level = 1
            for index, argument in enumerate(sys.argv):
                if argument.startswith('--verbosity='):
                    level = int(sys.argv.pop(index).partition('=')[2])
                    break
            cls.VERBOSITY = level

        # Minimum verbosity required by each severity
        thresholds = (
            (cls.INFO, 1),
            (cls.OK, 1),
            (cls.FAIL, 1),
            (cls.ERROR, 1),
            (cls.DEBUG_STD, 2),
            (cls.DEBUG_EXT, 3),
        )
        for known_severity, minimum in thresholds:
            if severity == known_severity and cls.VERBOSITY >= minimum:
                return True
        return False

    @staticmethod
    def write(text, stderr=False):
        """Write *text* to stderr or stdout and flush the stream."""
        stream = sys.stderr if stderr else sys.stdout
        stream.write(text)
        stream.flush()

    @classmethod
    def log(cls, severity, text):
        """Write *text* with the prefix of *severity* if permitted.

        :returns: True if the message was written, False otherwise
        """
        templates = (
            (cls.INFO, '\x00[INFO] {0}'),
            (cls.ERROR, '\x00[ERROR] {0}'),
            (cls.FAIL, '\x00[FAIL] {0}'),
            (cls.OK, '\x00[OK] {0}'),
            (cls.DEBUG_STD, '\x00\x00[DEBUG] {0}'),
            (cls.DEBUG_EXT, '\x00\x00\x00\x00[DEBUG] {0}'),
        )
        for known_severity, template in templates:
            if severity == known_severity and cls.permitted(known_severity):
                cls.write(template.format(text))
                return True
        return False

    @classmethod
    def info(cls, text):
        """Log *text* with severity INFO."""
        return cls.log(cls.INFO, text)

    @classmethod
    def error(cls, text):
        """Log *text* with severity ERROR."""
        return cls.log(cls.ERROR, text)

    @classmethod
    def fail(cls, text):
        """Log *text* with severity FAIL."""
        return cls.log(cls.FAIL, text)

    @classmethod
    def ok(cls, text):
        """Log *text* with severity OK."""
        return cls.log(cls.OK, text)

    @classmethod
    def debug(cls, text, extended=False):
        """Log *text* with severity DEBUG_EXT if *extended*, else DEBUG_STD."""
        severity = cls.DEBUG_EXT if extended else cls.DEBUG_STD
        return cls.log(severity, text)
 | |
| 
 |