#!/usr/bin/env python # coding: utf-8 from __future__ import unicode_literals import os import sys import time import random import socket import subprocess try: from subprocess import DEVNULL except ImportError: from os import devnull DEVNULL = open(devnull, 'w') import utils ICINGA_UID = 'icinga' USERNAME = 'Icinga 2 Admin' MAX_CHECK_ATTEMPTS = 3 LIVESTATUS_PATH = '/var/run/icinga2/cmd/livestatus' def send_command(command, quiet=False): try: return send_query('COMMAND [{0}] {1}'.format(int(time.time()), command)) except utils.LiveStatusError, error: if not quiet: sys.stderr.write('Failed to execute command: {0}\n\n{1}'.format(command, error)) def send_query(query): response = LIVESTATUS.query(query + '\nColumnHeaders: on') if response: header, result = response.pop(0), {} return [dict((header[i], v) for i, v in enumerate(r)) for r in response] return [] def run_query(query, retries=3): tries = 0 while True: rs = next(iter(utils.run_mysql_query(query, b'/usr/bin/mysql')), {}) if tries == retries or any(True for v in rs.itervalues() if v is not None): return rs else: tries += 1 time.sleep(1) def get_one(query): return next(iter(send_query(query)), {}) def restart_icinga(): LIVESTATUS.close() subprocess.check_call('sudo service icinga2 restart', shell=True, stdout=DEVNULL) LIVESTATUS.reconnect() def success(msg): print '[OK] {0}'.format(msg).encode('utf-8') sys.stdout.flush() return False def fail(msg): print '[FAIL] {0}'.format(msg).encode('utf-8') sys.stdout.flush() return True def error(msg): print '[ERROR] {0}'.format(msg).encode('utf-8') sys.stdout.flush() return True def test_host_comments(hostname): comment = 'test{0:.4}'.format(random.random()) add_command = 'ADD_HOST_COMMENT;%s;{persistent};{author};{comment}' % hostname del_command = 'DEL_HOST_COMMENT;{id}' comment_query = '\n'.join(['GET comments', 'Filter: type = 1', 'Filter: comment = {comment}', 'Columns: id']) send_command(add_command.format(persistent=0, author=USERNAME, comment=comment)) comment_info = get_one(comment_query.format(comment=comment)) if comment_info: success('Successfully added comment to host "{0}"'.format(hostname)) send_command(del_command.format(id=comment_info['id'])) if get_one(comment_query.format(comment=comment)): return fail('Could not remove comment with id #{0} from host "{1}"' ''.format(comment_info['id'], hostname)) return success('Successfully removed comment from host "{0}"' ''.format(hostname)) return fail('Could not add comment to host "{0}"'.format(hostname)) def test_service_comments(hostname, servicename): comment = 'test{0:.4}'.format(random.random()) add_command = 'ADD_SVC_COMMENT;%s;%s;{persistent};{author}' \ ';{comment}' % (hostname, servicename) del_command = 'DEL_SVC_COMMENT;{id}' comment_query = '\n'.join(['GET comments', 'Filter: type = 2', 'Filter: comment = {comment}', 'Columns: id']) send_command(add_command.format(persistent=0, author=USERNAME, comment=comment)) comment_info = get_one(comment_query.format(comment=comment)) if comment_info: success('Successfully added comment to service "{0}" of host "{1}"' ''.format(servicename, hostname)) send_command(del_command.format(id=comment_info['id'])) if get_one(comment_query.format(comment=comment)): return fail('Could not remove comment with id #{0} from service "{1}"' ''.format(comment_info['id'], servicename)) return success('Successfully removed comment from service "{0}"' ''.format(servicename)) return fail('Could not add comment to service "{0}" of host "{1}"' ''.format(servicename, hostname)) def test_host_downtimes(hostname): comment = 'test{0:.4}'.format(random.random()) start, end = time.time() + 20, time.time() + 320 add_command = 'SCHEDULE_HOST_DOWNTIME;%s;{start};{end};1;0' \ ';0;{author};{comment}' % hostname del_command = 'DEL_HOST_DOWNTIME;{id}' downtime_query = '\n'.join(['GET downtimes', 'Filter: triggered_by = 0', 'Filter: duration = 0', 'Filter: fixed = 1', 'Filter: comment = {comment}', 'Columns: id']) send_command(add_command.format(start=start, end=end, author=USERNAME, comment=comment)) downtime_info = get_one(downtime_query.format(comment=comment)) if downtime_info: success('Successfully scheduled downtime for host "{0}"'.format(hostname)) send_command(del_command.format(id=downtime_info['id'])) if get_one(downtime_query.format(comment=comment)): return fail('Could not remove downtime for host "{0}"'.format(hostname)) return success('Successfully removed downtime for host "{0}"'.format(hostname)) return fail('Could not schedule downtime for host "{0}"'.format(hostname)) def test_service_downtimes(hostname, servicename): comment = 'test{0:.4}'.format(random.random()) start, end = time.time() + 20, time.time() + 320 add_command = 'SCHEDULE_SVC_DOWNTIME;%s;%s;{start};{end};1;0;0' \ ';{author};{comment}' % (hostname, servicename) del_command = 'DEL_SVC_DOWNTIME;{id}' downtime_query = '\n'.join(['GET downtimes', 'Filter: triggered_by = 0', 'Filter: duration = 0', 'Filter: fixed = 1', 'Filter: comment = {comment}', 'Columns: id']) send_command(add_command.format(start=start, end=end, author=USERNAME, comment=comment)) downtime_info = get_one(downtime_query.format(comment=comment)) if downtime_info: success('Successfully scheduled downtime for service "{0}" of host "{1}"' ''.format(servicename, hostname)) send_command(del_command.format(id=downtime_info['id'])) if get_one(downtime_query.format(comment=comment)): return fail('Could not remove downtime for service "{0}" of host "{1}"' ''.format(servicename, hostname)) return success('Successfully removed downtime for service "{0}" of host "{1}"' ''.format(servicename, hostname)) return fail('Could not schedule downtime for service "{0}" of host "{1}"' ''.format(servicename, hostname)) def test_host_problem_acknowledgements(hostname): comment = 'test{0:.4}'.format(random.random()) send_command('PROCESS_HOST_CHECK_RESULT;{0};1;fail'.format(hostname)) send_command('ACKNOWLEDGE_HOST_PROBLEM;{0};0;0;0;{1};{2}' ''.format(hostname, USERNAME, comment)) ack_info = get_one('GET comments\nFilter: comment = {0}' '\nFilter: entry_type = 4\nColumns: id'.format(comment)) if ack_info: success('Acknowledgement for host "{0}" has been processed'.format(hostname)) host_info = get_one('GET hosts\nFilter: name = {0}' '\nColumns: acknowledged'.format(hostname)) if host_info['acknowledged'] != 1: return fail('Could not acknowledge problem on host "{0}"'.format(hostname)) else: return fail('Acknowledgement for host "{0}" has not been processed' ''.format(hostname)) success('Successfully acknowledged problem on host "{0}"'.format(hostname)) send_command('PROCESS_HOST_CHECK_RESULT;{0};0;ok'.format(hostname)) if get_one('GET comments\nFilter: id = {0}'.format(ack_info['id'])): return fail('Non sticky acknowledgements are not removed if their host recovers') success('Non sticky acknowledgements are removed if their host recovers') comment = 'test{0:.4}'.format(random.random()) send_command('PROCESS_HOST_CHECK_RESULT;{0};1;fail'.format(hostname)) send_command('ACKNOWLEDGE_HOST_PROBLEM;{0};1;0;0;{1};{2}' ''.format(hostname, USERNAME, comment)) ack_info = get_one('GET comments\nFilter: comment = {0}' '\nFilter: entry_type = 4\nColumns: id'.format(comment)) send_command('PROCESS_HOST_CHECK_RESULT;{0};0;ok'.format(hostname)) if not get_one('GET comments\nFilter: id = {0}'.format(ack_info['id'])): return fail('Sticky acknowledgements are removed if their host recovers') send_command('DEL_HOST_COMMENT;{0}'.format(ack_info['id'])) success('Sticky acknowledgements need to be manually' ' removed once their host has recovered') host_info = get_one('GET hosts\nFilter: name = {0}\nColumns: acknowledged state' ''.format(hostname)) if host_info['state'] == 0 and host_info['acknowledged'] == 1: return fail('Host "{0}" is still acknowledged though it is not DOWN anymore' ''.format(hostname)) return success('Hosts are not acknowledged anymore once their state changes') def test_remove_host_acknowledgements(hostname): comment = 'test{0:.4}'.format(random.random()) send_command('PROCESS_HOST_CHECK_RESULT;{0};1;fail'.format(hostname)) send_command('ACKNOWLEDGE_HOST_PROBLEM;{0};0;0;0;{1};{2}' ''.format(hostname, USERNAME, comment)) ack_info = get_one('GET comments\nFilter: comment = {0}\nColumns: id' ''.format(comment)) if not ack_info: return fail('Acknowledgement for host "{0}" has not been processed' ''.format(hostname)) send_command('REMOVE_HOST_ACKNOWLEDGEMENT;{0}'.format(hostname)) if get_one('GET comments\nFilter: id = {0}'.format(ack_info['id'])): return fail('Acknowledgement related comments are not removed' ' when manually removing an acknowledgement') return success('Acknowledgement related comments are removed when' ' manually removing an acknowledgement') def test_expiring_host_acknowledgements(hostname): comment, expire_time = 'test{0:.4}'.format(random.random()), time.time() + 5 send_command('PROCESS_HOST_CHECK_RESULT;{0};1;fail'.format(hostname)) send_command('ACKNOWLEDGE_HOST_PROBLEM_EXPIRE;{0};0;0;0;{1};{2};{3}' ''.format(hostname, expire_time, USERNAME, comment)) time.sleep(expire_time - time.time()) host_info = get_one('GET hosts\nFilter: name = {0}\nColumns: acknowledged' ''.format(hostname)) if host_info['acknowledged'] == 1: send_command('REMOVE_HOST_ACKNOWLEDGEMENT;{0}'.format(hostname)) return fail('Acknowledgements do not expire as desired') return success('Acknowledgements do expire as desired') def test_change_host_check_command(hostname, command): old_command = get_one('GET hosts\nFilter: name = {0}\nColumns: check_command' ''.format(hostname))['check_command'] send_command('CHANGE_HOST_CHECK_COMMAND;{0};{1}'.format(hostname, command)) host_info = get_one('GET hosts\nFilter: name = {0}\nColumns: check_command' ''.format(hostname)) if host_info['check_command'] != command: return fail('Could not change the check command of host "{0}" to "{1}"' ''.format(hostname, command)) send_command('CHANGE_HOST_CHECK_COMMAND;{0};{1}'.format(hostname, old_command)) return success('Successfully changed check command of host "{0}" to "{1}"' ''.format(hostname, command)) def test_change_host_check_timeperiod(hostname, period): old_period = get_one('GET hosts\nFilter: name = {0}\nColumns: check_period' ''.format(hostname))['check_period'] send_command('CHANGE_HOST_CHECK_TIMEPERIOD;{0};{1}'.format(hostname, period)) host_info = get_one('GET hosts\nFilter: name = {0}\nColumns: check_period' ''.format(hostname)) if host_info['check_period'] != period: return fail('Could not change the check period of host "{0}" to "{1}"' ''.format(hostname, period)) send_command('CHANGE_HOST_CHECK_TIMEPERIOD;{0};{1}'.format(hostname, period)) return success('Successfully changed check period of host "{0}" to "{1}"' ''.format(hostname, period)) def test_change_host_modified_attributes(hostname): send_command('CHANGE_MAX_HOST_CHECK_ATTEMPTS;{0};1337'.format(hostname)) restart_icinga() host_info = get_one('GET hosts\nFilter: name = {0}' '\nColumns: max_check_attempts'.format(hostname)) if host_info['max_check_attempts'] != 1337: return fail('Modified attributes of host "{0}" seem not to be ' 'persisted between application restarts'.format(hostname)) else: success('Modified attributes are being persisted' ' between application restarts') send_command('CHANGE_HOST_MODATTR;{0};0'.format(hostname)) restart_icinga() host_info = get_one('GET hosts\nFilter: name = {0}' '\nColumns: max_check_attempts'.format(hostname)) if host_info['max_check_attempts'] == 1337: return fail('Re-setting the "modified attributes" value does not' ' seem to invalidate any modified attributes') return success('Re-setting the "modified attributes" value seems to' ' invalidate the modified attributes') def test_change_host_max_check_attempts(hostname): old_value = get_one('GET hosts\nFilter: name = {0}' '\nColumns: max_check_attempts' ''.format(hostname))['max_check_attempts'] send_command('CHANGE_MAX_HOST_CHECK_ATTEMPTS;{0};1234'.format(hostname)) host_info = get_one('GET hosts\nFilter: name = {0}' '\nColumns: max_check_attempts'.format(hostname)) if host_info['max_check_attempts'] != 1234: return fail('Could not change the maximum number of check' ' attempts for host "{0}"'.format(hostname)) send_command('CHANGE_MAX_HOST_CHECK_ATTEMPTS;{0};{1}'.format(hostname, old_value)) return success('Successfully changed maximum number of check' ' attempts for host "{0}"'.format(hostname)) def test_change_service_max_check_attempts(hostname, servicename): old_value = get_one('GET services\nFilter: host_name = {0}' '\nFilter: description = {1}' '\nColumns: max_check_attempts' ''.format(hostname, servicename))['max_check_attempts'] send_command('CHANGE_MAX_SVC_CHECK_ATTEMPTS;{0};{1};1234' ''.format(hostname, servicename)) svc_info = get_one('GET services\nFilter: host_name = {0}' '\nFilter: description = {1}' '\nColumns: max_check_attempts'.format(hostname, servicename)) if svc_info['max_check_attempts'] != 1234: return fail('Could not change the maximum number of check attempts for ' 'service "{0}" on host "{1}"'.format(servicename, hostname)) send_command('CHANGE_MAX_SVC_CHECK_ATTEMPTS;{0};{1};{2}' ''.format(hostname, servicename, old_value)) return success('Successfully changed maximum number of check attempts ' 'for service "{0}" on host "{1}"'.format(servicename, hostname)) def test_change_host_normal_check_interval(hostname): old_value = get_one('GET hosts\nFilter: name = {0}\nColumns: check_interval' ''.format(hostname))['check_interval'] send_command('CHANGE_NORMAL_HOST_CHECK_INTERVAL;{0};7200'.format(hostname)) host_info = get_one('GET hosts\nFilter: name = {0}\nColumns: check_interval' ''.format(hostname)) if host_info['check_interval'] != 7200: return fail('Could not change the check interval of host "{0}"' ''.format(hostname)) send_command('CHANGE_NORMAL_HOST_CHECK_INTERVAL;{0};{1}'.format(hostname, old_value)) return success('Successfully changed check interval of host "{0}"' ''.format(hostname)) def test_change_service_normal_check_interval(hostname, servicename): old_value = get_one('GET services\nFilter: host_name = {0}' '\nFilter: description = {1}' '\nColumns: check_interval' ''.format(hostname, servicename))['check_interval'] send_command('CHANGE_NORMAL_SVC_CHECK_INTERVAL;{0};{1};7200' ''.format(hostname, servicename)) svc_info = get_one('GET services\nFilter: host_name = {0}' '\nFilter: description = {1}' '\nColumns: check_interval' ''.format(hostname, servicename)) if svc_info['check_interval'] != 7200: return fail('Could not change the check interval of service "{0}"' ' on host "{1}"'.format(servicename, hostname)) send_command('CHANGE_NORMAL_SVC_CHECK_INTERVAL;{0};{1};{2}' ''.format(hostname, servicename, old_value)) return success('Successfully changed check interval of service ' '"{0}" on host "{1}"'.format(servicename, hostname)) def test_change_host_retry_check_interval(hostname): old_value = get_one('GET hosts\nFilter: name = {0}\nColumns: retry_interval' ''.format(hostname))['retry_interval'] send_command('CHANGE_RETRY_HOST_CHECK_INTERVAL;{0};3600'.format(hostname)) host_info = get_one('GET hosts\nFilter: name = {0}\nColumns: retry_interval' ''.format(hostname)) if host_info['retry_interval'] != 3600: return fail('Could not change the retry interval of host "{0}"' ''.format(hostname)) send_command('CHANGE_RETRY_HOST_CHECK_INTERVAL;{0};{1}'.format(hostname, old_value)) return success('Successfully changed retry interval of host "{0}"' ''.format(hostname)) def test_change_service_retry_check_interval(hostname, servicename): old_value = get_one('GET services\nFilter: host_name = {0}' '\nFilter: description = {1}' '\nColumns: retry_interval' ''.format(hostname, servicename))['retry_interval'] send_command('CHANGE_RETRY_SVC_CHECK_INTERVAL;{0};{1};3600' ''.format(hostname, servicename)) svc_info = get_one('GET services\nFilter: host_name = {0}' '\nFilter: description = {1}' '\nColumns: retry_interval' ''.format(hostname, servicename)) if svc_info['retry_interval'] != 3600: return fail('Could not change retry interval of service "{0}"' ' on host "{1}"'.format(servicename, hostname)) send_command('CHANGE_RETRY_SVC_CHECK_INTERVAL;{0};{1};{2}' ''.format(hostname, servicename, old_value)) return success('Successfully changed retry interval of service "{0}"' ' on host "{1}"'.format(servicename, hostname)) def test_change_service_check_command(hostname, servicename, command): old_value = get_one('GET services\nFilter: host_name = {0}' '\nFilter: description = {1}' '\nColumns: check_command' ''.format(hostname, servicename))['check_command'] send_command('CHANGE_SVC_CHECK_COMMAND;{0};{1};{2}' ''.format(hostname, servicename, command)) svc_info = get_one('GET services\nFilter: host_name = {0}' '\nFilter: description = {1}' '\nColumns: check_command' ''.format(hostname, servicename)) if svc_info['check_command'] != command: return fail('Could not change the check command of service "{0}" on ' 'host "{1}" to "{2}"'.format(servicename, hostname, command)) send_command('CHANGE_SVC_CHECK_COMMAND;{0};{1};{2}' ''.format(hostname, servicename, old_value)) return success('Successfully changed check command of service "{0}" on' ' host "{1}"'.format(servicename, hostname)) def test_change_service_check_timeperiod(hostname, servicename, period): old_value = get_one('GET services\nFilter: host_name = {0}' '\nFilter: description = {1}' '\nColumns: check_period' ''.format(hostname, servicename))['check_period'] send_command('CHANGE_SVC_CHECK_TIMEPERIOD;{0};{1};{2}' ''.format(hostname, servicename, period)) svc_info = get_one('GET services\nFilter: host_name = {0}' '\nFilter: description = {1}' '\nColumns: check_period' ''.format(hostname, servicename)) if svc_info['check_period'] != period: return fail('Could not change the check period of service "{0}" on host' ' "{1}" to "{2}"'.format(servicename, hostname, period)) send_command('CHANGE_SVC_CHECK_TIMEPERIOD;{0};{1};{2}' ''.format(hostname, servicename, old_value)) return success('Successfully changed check period of service "{0}"' ' on host "{1}"'.format(servicename, hostname)) def test_delay_host_notification(hostname): # This test assumes that the notification interval # for the given host is set to 10 seconds send_command('DISABLE_HOST_CHECK;{0}'.format(hostname)) for _ in xrange(MAX_CHECK_ATTEMPTS): send_command('PROCESS_HOST_CHECK_RESULT;{0};1;down'.format(hostname)) state_info = get_one('GET hosts\nFilter: name = {0}' '\nColumns: state state_type last_hard_state_change' ''.format(hostname)) if state_info['state'] != 1 or state_info['state_type'] != 1: send_command('ENABLE_HOST_CHECK;{0}'.format(hostname)) return fail('Could not switch state of host "{0}"' ' to DOWN (HARD)'.format(hostname)) time.sleep(1) note_info1 = get_one('GET hosts\nFilter: name = {0}' '\nColumns: last_notification'.format(hostname)) if not -1 < (note_info1['last_notification'] - \ state_info['last_hard_state_change']) < 1: send_command('ENABLE_HOST_CHECK;{0}'.format(hostname)) return fail('Switching host "{0}" to state DOWN (HARD) does not ' 'cause a notification being sent out'.format(hostname)) send_command('DELAY_HOST_NOTIFICATION;{0};{1}' ''.format(hostname, note_info1['last_notification'] + 20)) time.sleep(5 + 10 - (time.time() - note_info1['last_notification'])) note_info2 = get_one('GET hosts\nFilter: name = {0}' '\nColumns: last_notification'.format(hostname)) if note_info2['last_notification'] != note_info1['last_notification']: send_command('ENABLE_HOST_CHECK;{0}'.format(hostname)) return fail('There were a notification being sent out for host ' '"{0}" before the time that it should be delayed to' ''.format(hostname)) time.sleep(5 + note_info1['last_notification'] + 20 - time.time()) send_command('ENABLE_HOST_CHECK;{0}'.format(hostname)) note_info3 = get_one('GET hosts\nFilter: name = {0}' '\nColumns: last_notification'.format(hostname)) if note_info3['last_notification'] < note_info1['last_notification'] + 20: return fail('Could not delay notification for' ' host "{0}"'.format(hostname)) return success('Successfully delayed notification' ' for host "{0}"'.format(hostname)) def test_delay_service_notification(hostname, servicename): # This test assumes that the notification interval # for the given service is set to 10 seconds send_command('DISABLE_SVC_CHECK;{0};{1}'.format(hostname, servicename)) for _ in xrange(MAX_CHECK_ATTEMPTS): send_command('PROCESS_SERVICE_CHECK_RESULT;{0};{1};2;critical' ''.format(hostname, servicename)) state_info = get_one('GET services\nFilter: host_name = {0}' '\nFilter: description = {1}' '\nColumns: state state_type last_hard_state_change' ''.format(hostname, servicename)) if state_info['state'] != 2 or state_info['state_type'] != 1: send_command('ENABLE_SVC_CHECK;{0};{1}'.format(hostname, servicename)) return fail('Could not switch state of service "{0}" on host "{1}"' ' to CRITICAL (HARD)'.format(servicename, hostname)) time.sleep(1) note_info1 = get_one('GET services\nFilter: host_name = {0}' '\nFilter: description = {1}' '\nColumns: last_notification' ''.format(hostname, servicename)) if not -1 < (note_info1['last_notification'] - \ state_info['last_hard_state_change']) < 1: send_command('ENABLE_SVC_CHECK;{0};{1}'.format(hostname, servicename)) return fail('Switching service "{0}" on host "{1}" to state CRITICAL ' '(HARD) does not cause a notification being sent out' ''.format(servicename, hostname)) send_command('DELAY_SVC_NOTIFICATION;{0};{1};{2}' ''.format(hostname, servicename, note_info1['last_notification'] + 20)) time.sleep(5 + 10 - (time.time() - note_info1['last_notification'])) note_info2 = get_one('GET services\nFilter: host_name = {0}' '\nFilter: description = {1}' '\nColumns: last_notification' ''.format(hostname, servicename)) if note_info2['last_notification'] != note_info1['last_notification']: send_command('ENABLE_SVC_CHECK;{0};{1}'.format(hostname, servicename)) return fail('There were a notification being sent out for service "{0}" ' 'on host "{1}" before the time that it should be delayed to' ''.format(servicename, hostname)) time.sleep(5 + note_info1['last_notification'] + 20 - time.time()) send_command('ENABLE_SVC_CHECK;{0};{1}'.format(hostname, servicename)) note_info3 = get_one('GET services\nFilter: host_name = {0}' '\nFilter: description = {1}' '\nColumns: last_notification' ''.format(hostname, servicename)) if note_info3['last_notification'] < note_info1['last_notification'] + 20: return fail('Could not delay notification for service "{0}"' ' on host "{0}"'.format(servicename, hostname)) return success('Successfully delayed notification for service "{0}"' ' on host "{0}"'.format(servicename, hostname)) def test_disabling_scheduling_host_checks(hostname): # This test assumes that icinga is writing the hostchecks db table (DbCatCheck) compare_time = lambda a, b: -2 < a - b < 2 start_time_query = ('SELECT UNIX_TIMESTAMP(MAX(start_time)) as start_time' ' FROM icinga_hostchecks AS c' ' INNER JOIN icinga_hosts AS h' ' ON h.host_object_id = c.host_object_id' ' WHERE h.alias = "{0}"' '').format(hostname) # Need to use a sql query here because prior to this test another one ran # which submitted a passive checkresult and with livestatus it is only # possible to access the execution of the last result, which in this case # is the passive one without a start and end time. execution_time = float(run_query('SELECT MAX(c.execution_time) AS e_time' ' FROM icinga_hostchecks AS c' ' INNER JOIN icinga_hosts AS h' ' ON h.host_object_id = c.host_object_id' ' WHERE h.alias = "{0}"' ''.format(hostname)).get('e_time', -1)) if execution_time == -1: return fail('Unable to fetch the maximum execution time of' ' host "{0}" from the IDO'.format(hostname)) send_command('DISABLE_HOST_CHECK;{0}'.format(hostname)) scheduled_check = time.time() + 4 send_command('SCHEDULE_HOST_CHECK;{0};{1}'.format(hostname, scheduled_check)) time.sleep(execution_time + 4 + 5) # +5 as this is the transaction interval last_check = float(run_query(start_time_query).get('start_time') or 0) if compare_time(last_check, scheduled_check + execution_time): send_command('ENABLE_HOST_CHECK;{0}'.format(hostname)) return fail('Could not disable active checks on host "{0}"' ''.format(hostname)) success('Successfully disabled active checks on host "{0}"'.format(hostname)) scheduled_check = time.time() + 4 send_command('SCHEDULE_FORCED_HOST_CHECK;{0};{1}' ''.format(hostname, scheduled_check)) time.sleep(execution_time + 4 + 5) # +5 as this is the transaction interval last_check = float(run_query(start_time_query).get('start_time') or 0) if not compare_time(last_check, scheduled_check + execution_time): send_command('ENABLE_HOST_CHECK;{0}'.format(hostname)) return fail('Could not schedule a forced check on host "{0}" while' ' active checks were disabled'.format(hostname)) success('Successfully scheduled a forced check on host "{0}" ' 'while active checks were disabled'.format(hostname)) send_command('ENABLE_HOST_CHECK;{0}'.format(hostname)) scheduled_check = time.time() + 4 send_command('SCHEDULE_HOST_CHECK;{0};{1}' ''.format(hostname, scheduled_check)) time.sleep(execution_time + 4 + 5) # +5 as this is the transaction interval last_check = float(run_query(start_time_query).get('start_time') or 0) if not compare_time(last_check, scheduled_check + execution_time): return fail('Could not schedule a check after re-enabling ' 'active checks for host "{0}"'.format(hostname)) return success('Successfully scheduled a check on host "{0}" after' ' re-enabling active checks'.format(hostname)) def test_disabling_scheduling_service_checks(hostname, servicename): # This test assumes that icinga is writing the servicechecks db table (DbCatCheck) start_time_query = ('SELECT UNIX_TIMESTAMP(MAX(start_time)) as start_time' ' FROM icinga_servicechecks AS c' ' INNER JOIN icinga_services AS s' ' ON s.service_object_id = c.service_object_id' ' INNER JOIN icinga_hosts as h' ' ON h.host_object_id = s.host_object_id' ' WHERE h.alias = "{0}" AND s.display_name = "{1}"' '').format(hostname, servicename) compare_time = lambda a, b: -2 < a - b < 2 execution_time = float(run_query('SELECT MAX(c.execution_time) AS e_time' ' FROM icinga_servicechecks AS c' ' INNER JOIN icinga_services AS s' ' ON s.service_object_id = c.service_object_id' ' INNER JOIN icinga_hosts as h' ' ON h.host_object_id = s.host_object_id' ' WHERE h.alias = "{0}" AND s.display_name = "{1}"' ''.format(hostname, servicename)).get('e_time', -1)) if execution_time == -1: return fail('Unable to fetch the maximum execution time of service "{0}"' ' on host "{1}" from the IDO'.format(hostname, servicename)) send_command('DISABLE_SVC_CHECK;{0};{1}'.format(hostname, servicename)) scheduled_check = time.time() + 4 send_command('SCHEDULE_SVC_CHECK;{0};{1};{2}'.format(hostname, servicename, scheduled_check)) time.sleep(execution_time + 4 + 5) # +5 as this is the transaction interval last_check = float(run_query(start_time_query).get('start_time') or 0) if compare_time(last_check, scheduled_check + execution_time): send_command('ENABLE_SVC_CHECK;{0};{1}'.format(hostname, servicename)) return fail('Could not disable active checks for service "{0}" on host' ' "{1}"'.format(servicename, hostname)) success('Successfully disabled active checks for service "{0}" on host' '"{1}"'.format(servicename, hostname)) scheduled_check = time.time() + 4 send_command('SCHEDULE_FORCED_SVC_CHECK;{0};{1};{2}' ''.format(hostname, servicename, scheduled_check)) time.sleep(execution_time + 4 + 5) # +5 as this is the transaction interval last_check = float(run_query(start_time_query).get('start_time') or 0) if not compare_time(last_check, scheduled_check + execution_time): send_command('ENABLE_SVC_CHECK;{0};{1}'.format(hostname, servicename)) return fail('Could not schedule a forced check for service "{0}"' ' on host "{1}" while active checks were disabled' ''.format(servicename, hostname)) success('Successfully scheduled a forced check for service "{0}" ' 'on host "{1}" while active checks were disabled' ''.format(servicename, hostname)) send_command('ENABLE_SVC_CHECK;{0};{1}'.format(hostname, servicename)) scheduled_check = time.time() + 4 send_command('SCHEDULE_SVC_CHECK;{0};{1};{2}' ''.format(hostname, servicename, scheduled_check)) time.sleep(execution_time + 4 + 5) # +5 as this is the transaction interval last_check = float(run_query(start_time_query).get('start_time') or 0) if not compare_time(last_check, scheduled_check + execution_time): return fail('Could not schedule a check after re-enabling ' 'active checks for service "{0}" on host "{1}"' ''.format(servicename, hostname)) return success('Successfully scheduled a check for service "{0}" ' 'on host "{1}" after re-enabling active checks' ''.format(servicename, hostname)) def test_shutdown_restart_process(): get_pid = lambda: subprocess.Popen("ps ax | grep icinga[2] | awk '{print $1}'", shell=1, stdout=subprocess.PIPE).stdout.read() send_command('SHUTDOWN_PROCESS') time.sleep(2) if get_pid(): return fail('Could not stop the icinga2 process by using SHUTDOWN_PROCESS') success('Successfully stopped the icinga2 process with SHUTDOWN_PROCESS') restart_icinga() pid_before_restart = int(get_pid()) send_command('RESTART_PROCESS') LIVESTATUS.close() time.sleep(6) pid_after_restart = get_pid() if not pid_after_restart: restart_icinga() return fail('RESTART_PROCESS seems to just stop the process') if int(pid_after_restart) == pid_before_restart: return fail('RESTART_PROCESS does not restart the process at all') LIVESTATUS.reconnect() return success('Successfully restarted process by using RESTART_PROCESS') def test_passive_host_checkresult_processing(hostname): send_command('PROCESS_HOST_CHECK_RESULT;{0};1;blub'.format(hostname)) host_info = get_one('GET hosts\nFilter: name = {0}' '\nColumns: state plugin_output'.format(hostname)) if host_info['state'] != 1: return fail('Could not submit a passive checkresult for host "{0}". Wrong' ' state: {1} != 1'.format(hostname, host_info['state'])) if host_info['plugin_output'] != 'blub': return fail('Could not submit a passive checkresult for host "{0}". Wrong' ' output: "{1}" != "blub"'.format(hostname, host_info['plugin_output'])) success('Successfully submitted a passive checkresult for host "{0}"' ''.format(hostname)) send_command('DISABLE_PASSIVE_HOST_CHECKS;{0}'.format(hostname)) send_command('PROCESS_HOST_CHECK_RESULT;{0};0;nope'.format(hostname), True) host_info = get_one('GET hosts\nFilter: name = {0}' '\nColumns: state'.format(hostname)) if host_info['state'] == 0: send_command('ENABLE_PASSIVE_HOST_CHECKS;{0}'.format(hostname)) return fail('Submitted a passive checkresult for host "{0}" though the' ' submission of passive checkresults has been disabled' ''.format(hostname)) success('It is not possible to submit passive host checkresults' ' while their submission has been disabled') send_command('ENABLE_PASSIVE_HOST_CHECKS;{0}'.format(hostname)) send_command('PROCESS_HOST_CHECK_RESULT;{0};2;nope²'.format(hostname), True) host_info = get_one('GET hosts\nFilter: name = {0}' '\nColumns: plugin_output'.format(hostname)) if host_info['plugin_output'] == 'nope²': return fail('It is possible to submit a passive host checkresult' ' whose status code is 2 (UNREACHABLE)') return success('It is not possible to submit a passive host checkresult' ' whose status code is 2 (UNREACHABLE)') def test_passive_svc_checkresult_processing(hostname, servicename): send_command('PROCESS_SERVICE_CHECK_RESULT;{0};{1};1;blub' ''.format(hostname, servicename)) svc_info = get_one('GET services\nFilter: host_name = {0}' '\nFilter: description = {1}' '\nColumns: state plugin_output' ''.format(hostname, servicename)) if svc_info['state'] != 1: return fail('Could not submit a passive checkresult for service' ' "{0}" on host "{1}". Wrong state: {2} != 1' ''.format(servicename, hostname, svc_info['state'])) if svc_info['plugin_output'] != 'blub': return fail('Could not submit a passive checkresult for service' ' "{0}" on host "{1}". Wrong output: "{2}" != "blub"' ''.format(servicename, hostname, svc_info['plugin_output'])) success('Successfully submitted a passive checkresult for service' ' "{0}" on host "{1}"'.format(servicename, hostname)) send_command('DISABLE_PASSIVE_SVC_CHECKS;{0};{1}' ''.format(hostname, servicename)) send_command('PROCESS_SERVICE_CHECK_RESULT;{0};{1};0;nope' ''.format(hostname, servicename), True) svc_info = get_one('GET services\nFilter: host_name = {0}' '\nFilter: description = {1}\nColumns: state' ''.format(hostname, servicename)) if svc_info['state'] == 0: send_command('ENABLE_PASSIVE_SVC_CHECKS;{0};{1}' ''.format(hostname, servicename)) return fail('Submitted a passive checkresult for service "{0}" on host ' '"{1}" though the submission of passive checkresults has ' 'been disabled'.format(servicename, hostname)) success('It is not possible to submit passive service checkresults' ' while their submission has been disabled') send_command('ENABLE_PASSIVE_SVC_CHECKS;{0};{1}' ''.format(hostname, servicename)) send_command('PROCESS_SERVICE_CHECK_RESULT;{0};{1};0;yäh!' ''.format(hostname, servicename), True) svc_info = get_one('GET services\nFilter: host_name = {0}' '\nFilter: description = {1}\nColumns: plugin_output' ''.format(hostname, servicename)) if svc_info['plugin_output'] != 'yäh!': return fail('It is not possible to submit a passive service checkresult' ' whose output data contains non-ascii characters') return success('It is possible to submit a passive service checkresult' ' whose output data contains non-ascii characters') def test_process_file(hostname): with open('/tmp/comments.txt', 'w') as f: f.write("""[{2}] ADD_HOST_COMMENT;{0};0;{1};TheFirst [{3}] ADD_HOST_COMMENT;{0};0;{1};TheSecond [{4}] ADD_HOST_COMMENT;{0};0;{1};TheThird [{5}] ADD_HOST_COMMENT;{0};0;{1};TheFourth [{6}] ADD_HOST_COMMENT;{0};0;{1};TheFifth """.format(hostname, USERNAME, *[time.time() + i for i in xrange(4, 9)])) os.system('sudo chown {0} /tmp/comments.txt'.format(ICINGA_UID)) send_command('PROCESS_FILE;{0};1'.format('/tmp/comments.txt')) try: with open('/tmp/comments.txt') as f: pass except IOError: success('Files processed by PROCESS_FILE are deleted if' ' the delete-option is set to a non-zero value') else: os.remove('/tmp/comments.txt') return fail('Files processed by PROCESS_FILE are not deleted if' ' the delete-option is set to an non-zero value') comments = send_query('GET comments\nColumns: id\nFilter: comment ~' ' ^The(First|Second|Third|Fourth|Fifth)$') for comment in comments: send_command('DEL_HOST_COMMENT;{0}'.format(comment['id'])) if len(comments) < 5: return fail('Not all commands were processed by PROCESS_FILE') return success('All commands were processed by PROCESS_FILE') def test_custom_host_notifications(hostname): check_time = lambda a, b: -1 < (a - b) < 1 send_command('DISABLE_HOST_NOTIFICATIONS;{0}'.format(hostname)) sent_at = time.time() send_command('SEND_CUSTOM_HOST_NOTIFICATION;{0};0;{1};test' ''.format(hostname, USERNAME)) last_notification = get_one('GET hosts\nFilter: name = {0}' '\nColumns: last_notification' ''.format(hostname))['last_notification'] if check_time(last_notification, sent_at): send_command('ENABLE_HOST_NOTIFICATIONS;{0}'.format(hostname)) return fail('Sent a custom host notification for host "{0}" while' ' notifications were disabled'.format(hostname)) success('It is not possible to send custom host notifications for host' ' "{0}" while notifications are disabled'.format(hostname)) sent_at = time.time() send_command('SEND_CUSTOM_HOST_NOTIFICATION;{0};2;{1};test' ''.format(hostname, USERNAME)) last_notification = get_one('GET hosts\nFilter: name = {0}' '\nColumns: last_notification' ''.format(hostname))['last_notification'] if not check_time(last_notification, sent_at): send_command('ENABLE_HOST_NOTIFICATIONS;{0}'.format(hostname)) return fail('Could not send a forced custom notification for host "{0}"' ' while notifications were disabled'.format(hostname)) success('Successfully sent a forced custom notification for host ' '"{0}" while notifications were disabled'.format(hostname)) send_command('ENABLE_HOST_NOTIFICATIONS;{0}'.format(hostname)) sent_at = time.time() send_command('SEND_CUSTOM_HOST_NOTIFICATION;{0};0;{1};test' ''.format(hostname, USERNAME)) last_notification = get_one('GET hosts\nFilter: name = {0}' '\nColumns: last_notification' ''.format(hostname))['last_notification'] if not check_time(last_notification, sent_at): return fail('Could not send a custom host notification for host "{0}"' ' after notifications were re-enabled'.format(hostname)) return success('Successfully sent a a custom host notification for host "{0}"' ' after notifications were re-enabled'.format(hostname)) def test_custom_svc_notifications(hostname, servicename): check_time = lambda a, b: -1 < (a - b) < 1 send_command('DISABLE_SVC_NOTIFICATIONS;{0};{1}'.format(hostname, servicename)) sent_at = time.time() send_command('SEND_CUSTOM_SVC_NOTIFICATION;{0};{1};0;{2};test' ''.format(hostname, servicename, USERNAME)) last_notification = get_one('GET services\nFilter: host_name = {0}' '\nFilter: description = {1}' '\nColumns: last_notification' ''.format(hostname, servicename))['last_notification'] if check_time(last_notification, sent_at): send_command('ENABLE_SVC_NOTIFICATIONS;{0};{1}'.format(hostname, servicename)) return fail('Sent a custom service notification for service "{0}" on host "{1}"' ' while notifications were disabled'.format(servicename, hostname)) success('It is not possible to send custom service notifications for service "{0}" on' ' host "{1}" while notifications are disabled'.format(servicename, hostname)) sent_at = time.time() send_command('SEND_CUSTOM_SVC_NOTIFICATION;{0};{1};2;{2};test' ''.format(hostname, servicename, USERNAME)) last_notification = get_one('GET services\nFilter: host_name = {0}' '\nFilter: description = {1}' '\nColumns: last_notification' ''.format(hostname, servicename))['last_notification'] if not check_time(last_notification, sent_at): send_command('ENABLE_SVC_NOTIFICATIONS;{0};{1}'.format(hostname, servicename)) return fail('Could not send a forced custom notification for service "{0}" on host ' '"{1}" while notifications were disabled'.format(servicename, hostname)) success('Successfully sent a forced custom notification for service "{0}" on host' ' "{0}" while notifications were disabled'.format(servicename, hostname)) send_command('ENABLE_SVC_NOTIFICATIONS;{0};{1}'.format(hostname, servicename)) sent_at = time.time() send_command('SEND_CUSTOM_SVC_NOTIFICATION;{0};{1};0;{2};test' ''.format(hostname, servicename, USERNAME)) last_notification = get_one('GET services\nFilter: host_name = {0}' '\nFilter: description = {1}' '\nColumns: last_notification' ''.format(hostname, servicename))['last_notification'] if not check_time(last_notification, sent_at): return fail('Could not send a custom service notification for service "{0}" on host ' '"{1}" after notifications were re-enabled'.format(servicename, hostname)) return success('Successfully sent a custom service notification for service "{0}" on host' ' "{1}" after notifications were re-enabled'.format(servicename, hostname)) def test_hostgroup_commands(): # This test requires the "files/configs/groups.conf" configuration file failure = False send_command('CHANGE_HOST_MODATTR;localhost;0') localhost_services = [d['description'] for d in send_query('GET services\nColumns: description' '\nFilter: host_name = localhost')] for svc_desc in localhost_services: send_command('CHANGE_SVC_MODATTR;localhost;{0};0'.format(svc_desc)) send_command('DISABLE_HOSTGROUP_HOST_CHECKS;linux-servers') send_command('DISABLE_HOSTGROUP_PASSIVE_HOST_CHECKS;linux-servers') send_command('DISABLE_HOSTGROUP_HOST_NOTIFICATIONS;linux-servers') host_info = get_one('GET hosts\nFilter: name = localhost') if host_info['active_checks_enabled'] != 0: fail('Could not disable active checks for host "{0}" that is part' ' of hostgroup "linux-servers"'.format(host_info['name'])) failure = True else: success('Successfully disabled active checks for host "{0}" that is ' 'part of hostgroup "linux-servers"'.format(host_info['name'])) if host_info['accept_passive_checks'] != 0: fail('Could not disable passive checks for host "{0}" that is part' ' of hostgroup "linux-servers"'.format(host_info['name'])) failure = True else: success('Successfully disabled passive checks for host "{0}" that is ' 'part of hostgroup "linux-servers"'.format(host_info['name'])) if host_info['notifications_enabled'] != 0: fail('Could not disable notifications for host "{0}" that is part' ' of hostgroup "linux-servers"'.format(host_info['name'])) failure = True else: success('Successfully disabled notifications for host "{0}" that is ' 'part of hostgroup "linux-servers"'.format(host_info['name'])) send_command('ENABLE_HOSTGROUP_HOST_CHECKS;linux-servers') send_command('ENABLE_HOSTGROUP_PASSIVE_HOST_CHECKS;linux-servers') send_command('ENABLE_HOSTGROUP_HOST_NOTIFICATIONS;linux-servers') host_info = get_one('GET hosts\nFilter: name = localhost') if host_info['active_checks_enabled'] != 1: fail('Could not enable active checks for host "{0}" that is part' ' of hostgroup "linux-servers"'.format(host_info['name'])) failure = True else: success('Successfully enabled active checks for host "{0}" that is ' 'part of hostgroup "linux-servers"'.format(host_info['name'])) if host_info['accept_passive_checks'] != 1: fail('Could not enable passive checks for host "{0}" that is part' ' of hostgroup "linux-servers"'.format(host_info['name'])) failure = True else: success('Successfully enabled passive checks for host "{0}" that is ' 'part of hostgroup "linux-servers"'.format(host_info['name'])) if host_info['notifications_enabled'] != 1: fail('Could not enable notifications for host "{0}" that is part' ' of hostgroup "linux-servers"'.format(host_info['name'])) failure = True else: success('Successfully enabled notifications for host "{0}" that is ' 'part of hostgroup "linux-servers"'.format(host_info['name'])) send_command('DISABLE_HOSTGROUP_SVC_CHECKS;linux-servers') send_command('DISABLE_HOSTGROUP_PASSIVE_SVC_CHECKS;linux-servers') send_command('DISABLE_HOSTGROUP_SVC_NOTIFICATIONS;linux-servers') for svc_info in send_query('GET services\nFilter: host_name = localhost'): if svc_info['active_checks_enabled'] != 0: fail('Could not disable active checks for service "{0}" on ' 'host "{1}" that is part of hostgroup "linux-servers"' ''.format(svc_info['description'], svc_info['host_name'])) failure = True else: success('Successfully disabled active checks for service "{0}" ' 'on host "{1}" that is part of hostgroup "linux-servers"' ''.format(svc_info['description'], svc_info['host_name'])) if svc_info['accept_passive_checks'] != 0: fail('Could not disable passive checks for service "{0}" on ' 'host "{1}" that is part of hostgroup "linux-servers"' ''.format(svc_info['description'], svc_info['host_name'])) failure = True else: success('Successfully disabled passive checks for service "{0}" ' 'on host "{1}" that is part of hostgroup "linux-servers"' ''.format(svc_info['description'], svc_info['host_name'])) if svc_info['notifications_enabled'] != 0: fail('Could not disable notifications for service "{0}" on ' 'host "{1}" that is part of hostgroup "linux-servers"' ''.format(svc_info['description'], svc_info['host_name'])) failure = True else: success('Successfully disabled notifications for service "{0}" on' ' host "{1}" that is part of hostgroup "linux-servers"' ''.format(svc_info['description'], svc_info['host_name'])) send_command('ENABLE_HOSTGROUP_SVC_CHECKS;linux-servers') send_command('ENABLE_HOSTGROUP_PASSIVE_SVC_CHECKS;linux-servers') send_command('ENABLE_HOSTGROUP_SVC_NOTIFICATIONS;linux-servers') for svc_info in send_query('GET services\nFilter: host_name = localhost'): if svc_info['active_checks_enabled'] != 1: fail('Could not enable active checks for service "{0}" on ' 'host "{1}" that is part of hostgroup "linux-servers"' ''.format(svc_info['description'], svc_info['host_name'])) failure = True else: success('Successfully enabled active checks for service "{0}" ' 'on host "{1}" that is part of hostgroup "linux-servers"' ''.format(svc_info['description'], svc_info['host_name'])) if svc_info['accept_passive_checks'] != 1: fail('Could not enable passive checks for service "{0}" on ' 'host "{1}" that is part of hostgroup "linux-servers"' ''.format(svc_info['description'], svc_info['host_name'])) failure = True else: success('Successfully enabled passive checks for service "{0}" ' 'on host "{1}" that is part of hostgroup "linux-servers"' ''.format(svc_info['description'], svc_info['host_name'])) if svc_info['notifications_enabled'] != 1: fail('Could not enable notifications for service "{0}" on ' 'host "{1}" that is part of hostgroup "linux-servers"' ''.format(svc_info['description'], svc_info['host_name'])) failure = True else: success('Successfully enabled notifications for service "{0}" on' ' host "{1}" that is part of hostgroup "linux-servers"' ''.format(svc_info['description'], svc_info['host_name'])) return failure def test_servicegroup_commands(): # This test requires the "files/configs/groups.conf" configuration file failure = False send_command('CHANGE_HOST_MODATTR;localhost;0') aservicegroup_services = send_query('GET services\nColumns: description host_name' '\nFilter: groups >= aservicegroup') for svc_info in aservicegroup_services: send_command('CHANGE_SVC_MODATTR;{0};{1};0'.format(svc_info['host_name'], svc_info['description'])) send_command('DISABLE_SERVICEGROUP_HOST_CHECKS;aservicegroup') send_command('DISABLE_SERVICEGROUP_HOST_NOTIFICATIONS;aservicegroup') send_command('DISABLE_SERVICEGROUP_PASSIVE_HOST_CHECKS;aservicegroup') aservicegroup_hosts = send_query('GET hosts\nFilter: name = localhost') for host_info in aservicegroup_hosts: if host_info['active_checks_enabled'] != 0: fail('Could not disable active checks for host "{0}" which has' ' services that are part of servicegroup "aservicegroup"' ''.format(host_info['name'])) failure = True else: success('Successfully disabled active checks for host "{0}" which ' 'has services that are part of servicegroup "aservicegroup"' ''.format(host_info['name'])) if host_info['accept_passive_checks'] != 0: fail('Could not disable passive checks for host "{0}" which has' ' services that are part of servicegroup "aservicegroup"' ''.format(host_info['name'])) failure = True else: success('Successfully disabled passive checks for host "{0}" which ' 'has services that are part of servicegroup "aservicegroup"' ''.format(host_info['name'])) if host_info['notifications_enabled'] != 0: fail('Could not disable notifications for host "{0}" which has' ' services that are part of servicegroup "aservicegroup"' ''.format(host_info['name'])) failure = True else: success('Successfully disabled notifications for host "{0}" which ' 'has services that are part of servicegroup "aservicegroup"' ''.format(host_info['name'])) send_command('ENABLE_SERVICEGROUP_HOST_CHECKS;aservicegroup') send_command('ENABLE_SERVICEGROUP_HOST_NOTIFICATIONS;aservicegroup') send_command('ENABLE_SERVICEGROUP_PASSIVE_HOST_CHECKS;aservicegroup') aservicegroup_hosts = send_query('GET hosts\nFilter: name = localhost') for host_info in aservicegroup_hosts: if host_info['active_checks_enabled'] != 1: fail('Could not enable active checks for host "{0}" which has' ' services that are part of servicegroup "aservicegroup"' ''.format(host_info['name'])) failure = True else: success('Successfully enabled active checks for host "{0}" which ' 'has services that are part of servicegroup "aservicegroup"' ''.format(host_info['name'])) if host_info['accept_passive_checks'] != 1: fail('Could not enable passive checks for host "{0}" which has' ' services that are part of servicegroup "aservicegroup"' ''.format(host_info['name'])) failure = True else: success('Successfully enabled passive checks for host "{0}" which ' 'has services that are part of servicegroup "aservicegroup"' ''.format(host_info['name'])) if host_info['notifications_enabled'] != 1: fail('Could not enable notifications for host "{0}" which has' ' services that are part of servicegroup "aservicegroup"' ''.format(host_info['name'])) failure = True else: success('Successfully enabled notifications for host "{0}" which ' 'has services that are part of servicegroup "aservicegroup"' ''.format(host_info['name'])) send_command('DISABLE_SERVICEGROUP_SVC_CHECKS;aservicegroup') send_command('DISABLE_SERVICEGROUP_SVC_NOTIFICATIONS;aservicegroup') send_command('DISABLE_SERVICEGROUP_PASSIVE_SVC_CHECKS;aservicegroup') aservicegroup_services = send_query('GET services\nFilter: groups >= aservicegroup') for svc_info in aservicegroup_services: if svc_info['active_checks_enabled'] != 0: fail('Could not disable active checks for service "{0}"' ' that is part of servicegroup "aservicegroup"' ''.format(svc_info['description'])) failure = True else: success('Successfully disabled active checks for service ' '"{0}" that is part of servicegroup "aservicegroup"' ''.format(svc_info['description'])) if svc_info['accept_passive_checks'] != 0: fail('Could not disable passive checks for service "{0}"' ' that is part of servicegroup "aservicegroup"' ''.format(svc_info['description'])) failure = True else: success('Successfully disabled passive checks for service ' '"{0}" that is part of servicegroup "aservicegroup"' ''.format(svc_info['description'])) if svc_info['notifications_enabled'] != 0: fail('Could not disable notifications for service "{0}"' ' that is part of servicegroup "aservicegroup"' ''.format(svc_info['description'])) failure = True else: success('Successfully disabled notifications for service ' '"{0}" that is part of servicegroup "aservicegroup"' ''.format(svc_info['description'])) send_command('ENABLE_SERVICEGROUP_SVC_CHECKS;aservicegroup') send_command('ENABLE_SERVICEGROUP_SVC_NOTIFICATIONS;aservicegroup') send_command('ENABLE_SERVICEGROUP_PASSIVE_SVC_CHECKS;aservicegroup') aservicegroup_services = send_query('GET services\nFilter: groups >= aservicegroup') for svc_info in aservicegroup_services: if svc_info['active_checks_enabled'] != 1: fail('Could not enable active checks for service "{0}"' ' that is part of servicegroup "aservicegroup"' ''.format(svc_info['description'])) failure = True else: success('Successfully enabled active checks for service ' '"{0}" that is part of servicegroup "aservicegroup"' ''.format(svc_info['description'])) if svc_info['accept_passive_checks'] != 1: fail('Could not enable passive checks for service "{0}"' ' that is part of servicegroup "aservicegroup"' ''.format(svc_info['description'])) failure = True else: success('Successfully enabled passive checks for service ' '"{0}" that is part of servicegroup "aservicegroup"' ''.format(svc_info['description'])) if svc_info['notifications_enabled'] != 1: fail('Could not enable notifications for service "{0}"' ' that is part of servicegroup "aservicegroup"' ''.format(svc_info['description'])) failure = True else: success('Successfully enabled notifications for service ' '"{0}" that is part of servicegroup "aservicegroup"' ''.format(svc_info['description'])) return failure def main(): failure = test_host_comments('localhost') #failure &= test_host_comments('nsca-ng') # Cannot work without a hostcheck! failure &= test_service_comments('localhost', 'disk') failure &= test_service_comments('nsca-ng', 'PassiveService1') failure &= test_host_downtimes('localhost') #failure &= test_host_downtimes('nsca-ng') failure &= test_service_downtimes('localhost', 'disk') failure &= test_service_downtimes('nsca-ng', 'PassiveService1') failure &= test_host_problem_acknowledgements('localhost') failure &= test_remove_host_acknowledgements('localhost') failure &= test_expiring_host_acknowledgements('localhost') failure &= test_change_host_check_command('localhost', 'disk') failure &= test_change_service_check_command('localhost', 'disk', 'ping4') failure &= test_change_host_check_timeperiod('localhost', 'none') failure &= test_change_service_check_timeperiod('localhost', 'disk', 'none') failure &= test_change_host_max_check_attempts('localhost') failure &= test_change_service_max_check_attempts('localhost', 'disk') failure &= test_change_host_normal_check_interval('localhost') failure &= test_change_service_normal_check_interval('localhost', 'disk') failure &= test_change_host_retry_check_interval('localhost') failure &= test_change_service_retry_check_interval('localhost', 'disk') failure &= test_change_host_modified_attributes('localhost') failure &= test_delay_service_notification('localhost', 'disk') failure &= test_delay_host_notification('localhost') failure &= test_disabling_scheduling_host_checks('localhost') failure &= test_disabling_scheduling_service_checks('localhost', 'disk') failure &= test_shutdown_restart_process() failure &= test_passive_host_checkresult_processing('localhost') failure &= test_passive_svc_checkresult_processing('localhost', 'disk') failure &= test_process_file('localhost') failure &= test_custom_host_notifications('localhost') failure &= test_custom_svc_notifications('localhost', 'disk') failure &= test_hostgroup_commands() failure &= test_servicegroup_commands() return 1 if failure else 0 if __name__ == '__main__': try: with utils.LiveStatusSocket(LIVESTATUS_PATH) as LIVESTATUS: sys.exit(main()) except (OSError, IOError, socket.error), e: error('Could not connect to Livestatus socket: {0} ({1})' ''.format(LIVESTATUS_PATH, unicode(e)))