#!/usr/bin/env python from __future__ import unicode_literals import os import sys import time import utils LS_HOST_COLUMNS = [ 'name', 'name', 'display_name', 'display_name', None, 'state', 'state_type', 'current_attempt', 'max_check_attempts', None, 'last_state', None, 'last_state_change', None, 'latency', 'execution_time', 'plugin_output', None, 'last_check', 'address', 'address6' ] LS_SVC_COLUMNS = [ 'description', 'display_name', 'display_name', None, 'state', 'state_type', 'current_attempt', 'max_check_attempts', None, 'last_state', None, 'last_state_change', None, 'latency', 'execution_time', 'plugin_output', 'perf_data', 'last_check', 'host_num_services', 'host_num_services_ok', 'host_num_services_warn', 'host_num_services_unknown', 'host_num_services_crit' ] STATE_MAP = { 'SOFT': 0, 'HARD': 1 } def send_command(command): try: return send_query('COMMAND [{0}] {1}'.format(int(time.time()), command)) except utils.LiveStatusError, error: sys.stderr.write('Failed to execute command: {0}\n\n{1}\n' ''.format(command, error)) def send_query(query): response = LIVESTATUS.query(query + '\nColumnHeaders: on') if response: header, result = response.pop(0), {} return [dict((header[i], v) for i, v in enumerate(r)) for r in response] return [] def get_one(query): return next(iter(send_query(query)), {}) def get_event_output(): try: with open('/tmp/test_event.out') as f: remove = True return f.read().rstrip().split('|') except (IOError, OSError): remove = False finally: if remove: os.system('sudo rm -f /tmp/test_event.out') def convert_output(value): try: return int(value) except ValueError: try: return float(value) except ValueError: return STATE_MAP.get(value, value) def success(msg): print '[OK] {0}'.format(msg).encode('utf-8') def fail(msg): print '[FAIL] {0}'.format(msg).encode('utf-8') def info(msg): print '[INFO] {0}'.format(msg).encode('utf-8') def main(): send_command('CHANGE_HOST_EVENT_HANDLER;localhost;test_event') host_info = get_one('GET hosts\nFilter: name = localhost' '\nColumns: event_handler') if host_info.get('event_handler') != 'test_event': fail('Could not assign eventcommand "test_event" to host "localhost"') return 1 success('Successfully assigned an eventcommand to host "localhost"') send_command('PROCESS_HOST_CHECK_RESULT;localhost;1;A negative result to' ' trigger an eventhandler|some interesting perfdata!') event_output = get_event_output() if not event_output: send_command('CHANGE_HOST_EVENT_HANDLER;localhost;') fail('Could not trigger the eventcommand') return 1 success('Successfully triggered the eventcommand') failure = False info('Checking host macros...') host_info = get_one('GET hosts\nFilter: name = localhost\nColumns: {0}' ''.format(' '.join(c for c in LS_HOST_COLUMNS if c))) if event_output[0] != host_info['name']*2: failure = True fail('Escaping environment variables seems not to properly working') fail(' Expected: {0!r} Got: {1!r}'.format(host_info['name']*2, event_output[0])) for i, column in enumerate(LS_HOST_COLUMNS[1:], 1): if column is not None: output_value = convert_output(event_output[i]) if output_value != host_info[column]: failure = True fail('{0!r} is unequal to {1!r} ({2})'.format(output_value, host_info[column], column)) info('Checking service macros...') svc_info = get_one('GET services\nFilter: description = ping4\nColumns: {0}' ''.format(' '.join(c for c in LS_SVC_COLUMNS if c))) for i, column in enumerate(LS_SVC_COLUMNS, len(LS_HOST_COLUMNS)): if column is not None: output_value = convert_output(event_output[i]) if output_value != svc_info[column]: failure = True fail('{0!r} is unequal to {1!r} ({2})'.format(output_value, svc_info[column], column)) info('Checking global macros...') info('Checking command macros...') if convert_output(event_output[-1]) != 1337: failure = True fail('The command macro "custom_macro" is not being substituted') send_command('DISABLE_HOST_EVENT_HANDLER;localhost') send_command('PROCESS_HOST_CHECK_RESULT;localhost;0;A positive result that' ' should not trigger an eventhandler') if get_event_output(): failure = True fail('Could not disable the eventcommand') else: success('Successfully disabled the eventcommand') send_command('ENABLE_HOST_EVENT_HANDLER;localhost') host_info = get_one('GET hosts\nFilter: name = localhost' '\nColumns: event_handler_enabled') if host_info['event_handler_enabled'] != 1: failure = True fail('Could not re-enable the eventcommand') else: success('Successfully re-enabled the eventcommand') send_command('CHANGE_HOST_EVENT_HANDLER;localhost;') host_info = get_one('GET hosts\nFilter: name = localhost' '\nColumns: event_handler') if host_info['event_handler']: failure = True fail('Could not remove eventcommand "test_event"' ' assigned to host "localhost"') else: success('Successfully removed the eventcommand' ' assigned to host "localhost"') return 1 if failure else 0 if __name__ == '__main__': with utils.LiveStatusSocket('/var/run/icinga2/cmd/livestatus') as LIVESTATUS: sys.exit(main())