mirror of
				https://github.com/Icinga/icinga2.git
				synced 2025-10-25 17:24:10 +02:00 
			
		
		
		
	
		
			
				
	
	
		
			1216 lines
		
	
	
		
			64 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
	
	
	
			
		
		
	
	
			1216 lines
		
	
	
		
			64 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
	
	
	
| #!/usr/bin/env python
 | |
| # coding: utf-8
 | |
| from __future__ import unicode_literals
 | |
| 
 | |
| import os
 | |
| import sys
 | |
| import time
 | |
| import random
 | |
| import socket
 | |
| import subprocess
 | |
| 
 | |
| try:
 | |
|     from subprocess import DEVNULL
 | |
| except ImportError:
 | |
|     from os import devnull
 | |
|     DEVNULL = open(devnull, 'w')
 | |
| 
 | |
| import utils
 | |
| 
 | |
| 
 | |
| ICINGA_UID = 'icinga'
 | |
| USERNAME = 'Icinga 2 Admin'
 | |
| MAX_CHECK_ATTEMPTS = 3
 | |
| LIVESTATUS_PATH = '/var/run/icinga2/cmd/livestatus'
 | |
| 
 | |
| 
 | |
| def send_command(command, quiet=False):
 | |
|     try:
 | |
|         return send_query('COMMAND [{0}] {1}'.format(int(time.time()), command))
 | |
|     except utils.LiveStatusError, error:
 | |
|         if not quiet:
 | |
|             sys.stderr.write('Failed to execute command: {0}\n\n{1}'.format(command, error))
 | |
| 
 | |
| 
 | |
| def send_query(query):
 | |
|     response = LIVESTATUS.query(query + '\nColumnHeaders: on')
 | |
|     if response:
 | |
|         header, result = response.pop(0), {}
 | |
|         return [dict((header[i], v) for i, v in enumerate(r)) for r in response]
 | |
|     return []
 | |
| 
 | |
| 
 | |
| def run_query(query, retries=3):
 | |
|     tries = 0
 | |
|     while True:
 | |
|         rs = next(iter(utils.run_mysql_query(query, b'/usr/bin/mysql')), {})
 | |
|         if tries == retries or any(True for v in rs.itervalues() if v is not None):
 | |
|             return rs
 | |
|         else:
 | |
|             tries += 1
 | |
|             time.sleep(1)
 | |
| 
 | |
| 
 | |
| def get_one(query):
 | |
|     return next(iter(send_query(query)), {})
 | |
| 
 | |
| 
 | |
| def restart_icinga():
 | |
|     LIVESTATUS.close()
 | |
|     subprocess.check_call('sudo service icinga2 restart', shell=True, stdout=DEVNULL)
 | |
|     LIVESTATUS.reconnect()
 | |
| 
 | |
| 
 | |
| def success(msg):
 | |
|     utils.Logger.ok(msg + '\n')
 | |
|     return False
 | |
| 
 | |
| 
 | |
| def fail(msg):
 | |
|     utils.Logger.fail(msg + '\n')
 | |
|     return True
 | |
| 
 | |
| 
 | |
| def error(msg):
 | |
|     utils.Logger.error(msg + '\n')
 | |
|     return True
 | |
| 
 | |
| 
 | |
| def test_host_comments(hostname):
 | |
|     comment = 'test{0:.4}'.format(random.random())
 | |
|     add_command = 'ADD_HOST_COMMENT;%s;{persistent};{author};{comment}' % hostname
 | |
|     del_command = 'DEL_HOST_COMMENT;{id}'
 | |
|     comment_query = '\n'.join(['GET comments', 'Filter: type = 1',
 | |
|                                'Filter: comment = {comment}',
 | |
|                                'Columns: id'])
 | |
| 
 | |
|     send_command(add_command.format(persistent=0, author=USERNAME, comment=comment))
 | |
|     comment_info = get_one(comment_query.format(comment=comment))
 | |
|     if comment_info:
 | |
|         success('Successfully added comment to host "{0}"'.format(hostname))
 | |
|         send_command(del_command.format(id=comment_info['id']))
 | |
|         if get_one(comment_query.format(comment=comment)):
 | |
|             return fail('Could not remove comment with id #{0} from host "{1}"'
 | |
|                         ''.format(comment_info['id'], hostname))
 | |
|         return success('Successfully removed comment from host "{0}"'
 | |
|                            ''.format(hostname))
 | |
|     return fail('Could not add comment to host "{0}"'.format(hostname))
 | |
| 
 | |
| 
 | |
| def test_service_comments(hostname, servicename):
 | |
|     comment = 'test{0:.4}'.format(random.random())
 | |
|     add_command = 'ADD_SVC_COMMENT;%s;%s;{persistent};{author}' \
 | |
|                   ';{comment}' % (hostname, servicename)
 | |
|     del_command = 'DEL_SVC_COMMENT;{id}'
 | |
|     comment_query = '\n'.join(['GET comments', 'Filter: type = 2',
 | |
|                                'Filter: comment = {comment}',
 | |
|                                'Columns: id'])
 | |
| 
 | |
|     send_command(add_command.format(persistent=0, author=USERNAME, comment=comment))
 | |
|     comment_info = get_one(comment_query.format(comment=comment))
 | |
|     if comment_info:
 | |
|         success('Successfully added comment to service "{0}" of host "{1}"'
 | |
|                 ''.format(servicename, hostname))
 | |
|         send_command(del_command.format(id=comment_info['id']))
 | |
|         if get_one(comment_query.format(comment=comment)):
 | |
|             return fail('Could not remove comment with id #{0} from service "{1}"'
 | |
|                         ''.format(comment_info['id'], servicename))
 | |
|         return success('Successfully removed comment from service "{0}"'
 | |
|                        ''.format(servicename))
 | |
|     return fail('Could not add comment to service "{0}" of host "{1}"'
 | |
|                 ''.format(servicename, hostname))
 | |
| 
 | |
| 
 | |
| def test_host_downtimes(hostname):
 | |
|     comment = 'test{0:.4}'.format(random.random())
 | |
|     start, end = time.time() + 20, time.time() + 320
 | |
|     add_command = 'SCHEDULE_HOST_DOWNTIME;%s;{start};{end};1;0' \
 | |
|                   ';0;{author};{comment}' % hostname
 | |
|     del_command = 'DEL_HOST_DOWNTIME;{id}'
 | |
|     downtime_query = '\n'.join(['GET downtimes', 'Filter: triggered_by = 0',
 | |
|                                 'Filter: duration = 0', 'Filter: fixed = 1',
 | |
|                                 'Filter: comment = {comment}',
 | |
|                                 'Columns: id'])
 | |
| 
 | |
|     send_command(add_command.format(start=start, end=end, author=USERNAME,
 | |
|                                     comment=comment))
 | |
|     downtime_info = get_one(downtime_query.format(comment=comment))
 | |
|     if downtime_info:
 | |
|         success('Successfully scheduled downtime for host "{0}"'.format(hostname))
 | |
|         send_command(del_command.format(id=downtime_info['id']))
 | |
|         if get_one(downtime_query.format(comment=comment)):
 | |
|             return fail('Could not remove downtime for host "{0}"'.format(hostname))
 | |
|         return success('Successfully removed downtime for host "{0}"'.format(hostname))
 | |
|     return fail('Could not schedule downtime for host "{0}"'.format(hostname))
 | |
| 
 | |
| 
 | |
| def test_service_downtimes(hostname, servicename):
 | |
|     comment = 'test{0:.4}'.format(random.random())
 | |
|     start, end = time.time() + 20, time.time() + 320
 | |
|     add_command = 'SCHEDULE_SVC_DOWNTIME;%s;%s;{start};{end};1;0;0' \
 | |
|                   ';{author};{comment}' % (hostname, servicename)
 | |
|     del_command = 'DEL_SVC_DOWNTIME;{id}'
 | |
|     downtime_query = '\n'.join(['GET downtimes', 'Filter: triggered_by = 0',
 | |
|                                 'Filter: duration = 0', 'Filter: fixed = 1',
 | |
|                                 'Filter: comment = {comment}',
 | |
|                                 'Columns: id'])
 | |
| 
 | |
|     send_command(add_command.format(start=start, end=end, author=USERNAME,
 | |
|                                     comment=comment))
 | |
|     downtime_info = get_one(downtime_query.format(comment=comment))
 | |
|     if downtime_info:
 | |
|         success('Successfully scheduled downtime for service "{0}" of host "{1}"'
 | |
|                 ''.format(servicename, hostname))
 | |
|         send_command(del_command.format(id=downtime_info['id']))
 | |
|         if get_one(downtime_query.format(comment=comment)):
 | |
|             return fail('Could not remove downtime for service "{0}" of host "{1}"'
 | |
|                         ''.format(servicename, hostname))
 | |
|         return success('Successfully removed downtime for service "{0}" of host "{1}"'
 | |
|                        ''.format(servicename, hostname))
 | |
|     return fail('Could not schedule downtime for service "{0}" of host "{1}"'
 | |
|                 ''.format(servicename, hostname))
 | |
| 
 | |
| 
 | |
| def test_host_problem_acknowledgements(hostname):
 | |
|     comment = 'test{0:.4}'.format(random.random())
 | |
|     send_command('PROCESS_HOST_CHECK_RESULT;{0};1;fail'.format(hostname))
 | |
|     send_command('ACKNOWLEDGE_HOST_PROBLEM;{0};0;0;0;{1};{2}'
 | |
|                  ''.format(hostname, USERNAME, comment))
 | |
|     ack_info = get_one('GET comments\nFilter: comment = {0}'
 | |
|                        '\nFilter: entry_type = 4\nColumns: id'.format(comment))
 | |
|     if ack_info:
 | |
|         success('Acknowledgement for host "{0}" has been processed'.format(hostname))
 | |
|         host_info = get_one('GET hosts\nFilter: name = {0}'
 | |
|                             '\nColumns: acknowledged'.format(hostname))
 | |
|         if host_info['acknowledged'] != 1:
 | |
|             return fail('Could not acknowledge problem on host "{0}"'.format(hostname))
 | |
|     else:
 | |
|         return fail('Acknowledgement for host "{0}" has not been processed'
 | |
|                     ''.format(hostname))
 | |
|     success('Successfully acknowledged problem on host "{0}"'.format(hostname))
 | |
|     send_command('PROCESS_HOST_CHECK_RESULT;{0};0;ok'.format(hostname))
 | |
|     if get_one('GET comments\nFilter: id = {0}'.format(ack_info['id'])):
 | |
|         return fail('Non sticky acknowledgements are not removed if their host recovers')
 | |
|     success('Non sticky acknowledgements are removed if their host recovers')
 | |
| 
 | |
|     comment = 'test{0:.4}'.format(random.random())
 | |
|     send_command('PROCESS_HOST_CHECK_RESULT;{0};1;fail'.format(hostname))
 | |
|     send_command('ACKNOWLEDGE_HOST_PROBLEM;{0};1;0;0;{1};{2}'
 | |
|                  ''.format(hostname, USERNAME, comment))
 | |
|     ack_info = get_one('GET comments\nFilter: comment = {0}'
 | |
|                        '\nFilter: entry_type = 4\nColumns: id'.format(comment))
 | |
|     send_command('PROCESS_HOST_CHECK_RESULT;{0};0;ok'.format(hostname))
 | |
|     if not get_one('GET comments\nFilter: id = {0}'.format(ack_info['id'])):
 | |
|         return fail('Sticky acknowledgements are removed if their host recovers')
 | |
|     send_command('DEL_HOST_COMMENT;{0}'.format(ack_info['id']))
 | |
|     success('Sticky acknowledgements need to be manually'
 | |
|             ' removed once their host has recovered')
 | |
| 
 | |
|     host_info = get_one('GET hosts\nFilter: name = {0}\nColumns: acknowledged state'
 | |
|                         ''.format(hostname))
 | |
|     if host_info['state'] == 0 and host_info['acknowledged'] == 1:
 | |
|         return fail('Host "{0}" is still acknowledged though it is not DOWN anymore'
 | |
|                     ''.format(hostname))
 | |
|     return success('Hosts are not acknowledged anymore once their state changes')
 | |
| 
 | |
| 
 | |
| def test_remove_host_acknowledgements(hostname):
 | |
|     comment = 'test{0:.4}'.format(random.random())
 | |
|     send_command('PROCESS_HOST_CHECK_RESULT;{0};1;fail'.format(hostname))
 | |
|     send_command('ACKNOWLEDGE_HOST_PROBLEM;{0};0;0;0;{1};{2}'
 | |
|                  ''.format(hostname, USERNAME, comment))
 | |
|     ack_info = get_one('GET comments\nFilter: comment = {0}\nColumns: id'
 | |
|                        ''.format(comment))
 | |
|     if not ack_info:
 | |
|         return fail('Acknowledgement for host "{0}" has not been processed'
 | |
|                     ''.format(hostname))
 | |
|     send_command('REMOVE_HOST_ACKNOWLEDGEMENT;{0}'.format(hostname))
 | |
|     if get_one('GET comments\nFilter: id = {0}'.format(ack_info['id'])):
 | |
|         return fail('Acknowledgement related comments are not removed'
 | |
|                     ' when manually removing an acknowledgement')
 | |
|     return success('Acknowledgement related comments are removed when'
 | |
|                    ' manually removing an acknowledgement')
 | |
| 
 | |
| 
 | |
| def test_expiring_host_acknowledgements(hostname):
 | |
|     comment, expire_time = 'test{0:.4}'.format(random.random()), time.time() + 5
 | |
|     send_command('PROCESS_HOST_CHECK_RESULT;{0};1;fail'.format(hostname))
 | |
|     send_command('ACKNOWLEDGE_HOST_PROBLEM_EXPIRE;{0};0;0;0;{1};{2};{3}'
 | |
|                  ''.format(hostname, expire_time, USERNAME, comment))
 | |
|     time.sleep(expire_time - time.time())
 | |
|     host_info = get_one('GET hosts\nFilter: name = {0}\nColumns: acknowledged'
 | |
|                         ''.format(hostname))
 | |
|     if host_info['acknowledged'] == 1:
 | |
|         send_command('REMOVE_HOST_ACKNOWLEDGEMENT;{0}'.format(hostname))
 | |
|         return fail('Acknowledgements do not expire as desired')
 | |
|     return success('Acknowledgements do expire as desired')
 | |
| 
 | |
| 
 | |
| def test_change_host_check_command(hostname, command):
 | |
|     old_command = get_one('GET hosts\nFilter: name = {0}\nColumns: check_command'
 | |
|                           ''.format(hostname))['check_command']
 | |
|     send_command('CHANGE_HOST_CHECK_COMMAND;{0};{1}'.format(hostname, command))
 | |
|     host_info = get_one('GET hosts\nFilter: name = {0}\nColumns: check_command'
 | |
|                         ''.format(hostname))
 | |
|     if host_info['check_command'] != command:
 | |
|         return fail('Could not change the check command of host "{0}" to "{1}"'
 | |
|                     ''.format(hostname, command))
 | |
|     send_command('CHANGE_HOST_CHECK_COMMAND;{0};{1}'.format(hostname, old_command))
 | |
|     return success('Successfully changed check command of host "{0}" to "{1}"'
 | |
|                    ''.format(hostname, command))
 | |
| 
 | |
| 
 | |
| def test_change_host_check_timeperiod(hostname, period):
 | |
|     old_period = get_one('GET hosts\nFilter: name = {0}\nColumns: check_period'
 | |
|                          ''.format(hostname))['check_period']
 | |
|     send_command('CHANGE_HOST_CHECK_TIMEPERIOD;{0};{1}'.format(hostname, period))
 | |
|     host_info = get_one('GET hosts\nFilter: name = {0}\nColumns: check_period'
 | |
|                         ''.format(hostname))
 | |
|     if host_info['check_period'] != period:
 | |
|         return fail('Could not change the check period of host "{0}" to "{1}"'
 | |
|                     ''.format(hostname, period))
 | |
|     send_command('CHANGE_HOST_CHECK_TIMEPERIOD;{0};{1}'.format(hostname, period))
 | |
|     return success('Successfully changed check period of host "{0}" to "{1}"'
 | |
|                    ''.format(hostname, period))
 | |
| 
 | |
| 
 | |
| def test_change_host_modified_attributes(hostname):
 | |
|     send_command('CHANGE_MAX_HOST_CHECK_ATTEMPTS;{0};1337'.format(hostname))
 | |
|     restart_icinga()
 | |
|     host_info = get_one('GET hosts\nFilter: name = {0}'
 | |
|                         '\nColumns: max_check_attempts'.format(hostname))
 | |
|     if host_info['max_check_attempts'] != 1337:
 | |
|         return fail('Modified attributes of host "{0}" seem not to be '
 | |
|                     'persisted between application restarts'.format(hostname))
 | |
|     else:
 | |
|         success('Modified attributes are being persisted'
 | |
|                 ' between application restarts')
 | |
|     send_command('CHANGE_HOST_MODATTR;{0};0'.format(hostname))
 | |
|     restart_icinga()
 | |
|     host_info = get_one('GET hosts\nFilter: name = {0}'
 | |
|                         '\nColumns: max_check_attempts'.format(hostname))
 | |
|     if host_info['max_check_attempts'] == 1337:
 | |
|         return fail('Re-setting the "modified attributes" value does not'
 | |
|                     ' seem to invalidate any modified attributes')
 | |
|     return success('Re-setting the "modified attributes" value seems to'
 | |
|                    ' invalidate the modified attributes')
 | |
| 
 | |
| 
 | |
| def test_change_host_max_check_attempts(hostname):
 | |
|     old_value = get_one('GET hosts\nFilter: name = {0}'
 | |
|                         '\nColumns: max_check_attempts'
 | |
|                         ''.format(hostname))['max_check_attempts']
 | |
|     send_command('CHANGE_MAX_HOST_CHECK_ATTEMPTS;{0};1234'.format(hostname))
 | |
|     host_info = get_one('GET hosts\nFilter: name = {0}'
 | |
|                         '\nColumns: max_check_attempts'.format(hostname))
 | |
|     if host_info['max_check_attempts'] != 1234:
 | |
|         return fail('Could not change the maximum number of check'
 | |
|                     ' attempts for host "{0}"'.format(hostname))
 | |
|     send_command('CHANGE_MAX_HOST_CHECK_ATTEMPTS;{0};{1}'.format(hostname,
 | |
|                                                                  old_value))
 | |
|     return success('Successfully changed maximum number of check'
 | |
|                    ' attempts for host "{0}"'.format(hostname))
 | |
| 
 | |
| 
 | |
| def test_change_service_max_check_attempts(hostname, servicename):
 | |
|     old_value = get_one('GET services\nFilter: host_name = {0}'
 | |
|                         '\nFilter: description = {1}'
 | |
|                         '\nColumns: max_check_attempts'
 | |
|                         ''.format(hostname, servicename))['max_check_attempts']
 | |
|     send_command('CHANGE_MAX_SVC_CHECK_ATTEMPTS;{0};{1};1234'
 | |
|                  ''.format(hostname, servicename))
 | |
|     svc_info = get_one('GET services\nFilter: host_name = {0}'
 | |
|                        '\nFilter: description = {1}'
 | |
|                        '\nColumns: max_check_attempts'.format(hostname,
 | |
|                                                               servicename))
 | |
|     if svc_info['max_check_attempts'] != 1234:
 | |
|         return fail('Could not change the maximum number of check attempts for '
 | |
|                     'service "{0}" on host "{1}"'.format(servicename, hostname))
 | |
|     send_command('CHANGE_MAX_SVC_CHECK_ATTEMPTS;{0};{1};{2}'
 | |
|                  ''.format(hostname, servicename, old_value))
 | |
|     return success('Successfully changed maximum number of check attempts '
 | |
|                    'for service "{0}" on host "{1}"'.format(servicename,
 | |
|                                                             hostname))
 | |
| 
 | |
| 
 | |
| def test_change_host_normal_check_interval(hostname):
 | |
|     old_value = get_one('GET hosts\nFilter: name = {0}\nColumns: check_interval'
 | |
|                         ''.format(hostname))['check_interval']
 | |
|     send_command('CHANGE_NORMAL_HOST_CHECK_INTERVAL;{0};7200'.format(hostname))
 | |
|     host_info = get_one('GET hosts\nFilter: name = {0}\nColumns: check_interval'
 | |
|                         ''.format(hostname))
 | |
|     if host_info['check_interval'] != 7200:
 | |
|         return fail('Could not change the check interval of host "{0}"'
 | |
|                     ''.format(hostname))
 | |
|     send_command('CHANGE_NORMAL_HOST_CHECK_INTERVAL;{0};{1}'.format(hostname,
 | |
|                                                                     old_value))
 | |
|     return success('Successfully changed check interval of host "{0}"'
 | |
|                    ''.format(hostname))
 | |
| 
 | |
| 
 | |
| def test_change_service_normal_check_interval(hostname, servicename):
 | |
|     old_value = get_one('GET services\nFilter: host_name = {0}'
 | |
|                         '\nFilter: description = {1}'
 | |
|                         '\nColumns: check_interval'
 | |
|                         ''.format(hostname, servicename))['check_interval']
 | |
|     send_command('CHANGE_NORMAL_SVC_CHECK_INTERVAL;{0};{1};7200'
 | |
|                  ''.format(hostname, servicename))
 | |
|     svc_info = get_one('GET services\nFilter: host_name = {0}'
 | |
|                        '\nFilter: description = {1}'
 | |
|                        '\nColumns: check_interval'
 | |
|                        ''.format(hostname, servicename))
 | |
|     if svc_info['check_interval'] != 7200:
 | |
|         return fail('Could not change the check interval of service "{0}"'
 | |
|                     ' on host "{1}"'.format(servicename, hostname))
 | |
|     send_command('CHANGE_NORMAL_SVC_CHECK_INTERVAL;{0};{1};{2}'
 | |
|                  ''.format(hostname, servicename, old_value))
 | |
|     return success('Successfully changed check interval of service '
 | |
|                    '"{0}" on host "{1}"'.format(servicename, hostname))
 | |
| 
 | |
| 
 | |
| def test_change_host_retry_check_interval(hostname):
 | |
|     old_value = get_one('GET hosts\nFilter: name = {0}\nColumns: retry_interval'
 | |
|                         ''.format(hostname))['retry_interval']
 | |
|     send_command('CHANGE_RETRY_HOST_CHECK_INTERVAL;{0};3600'.format(hostname))
 | |
|     host_info = get_one('GET hosts\nFilter: name = {0}\nColumns: retry_interval'
 | |
|                         ''.format(hostname))
 | |
|     if host_info['retry_interval'] != 3600:
 | |
|         return fail('Could not change the retry interval of host "{0}"'
 | |
|                     ''.format(hostname))
 | |
|     send_command('CHANGE_RETRY_HOST_CHECK_INTERVAL;{0};{1}'.format(hostname,
 | |
|                                                                    old_value))
 | |
|     return success('Successfully changed retry interval of host "{0}"'
 | |
|                    ''.format(hostname))
 | |
| 
 | |
| 
 | |
| def test_change_service_retry_check_interval(hostname, servicename):
 | |
|     old_value = get_one('GET services\nFilter: host_name = {0}'
 | |
|                         '\nFilter: description = {1}'
 | |
|                         '\nColumns: retry_interval'
 | |
|                         ''.format(hostname, servicename))['retry_interval']
 | |
|     send_command('CHANGE_RETRY_SVC_CHECK_INTERVAL;{0};{1};3600'
 | |
|                  ''.format(hostname, servicename))
 | |
|     svc_info = get_one('GET services\nFilter: host_name = {0}'
 | |
|                        '\nFilter: description = {1}'
 | |
|                        '\nColumns: retry_interval'
 | |
|                        ''.format(hostname, servicename))
 | |
|     if svc_info['retry_interval'] != 3600:
 | |
|         return fail('Could not change retry interval of service "{0}"'
 | |
|                     ' on host "{1}"'.format(servicename, hostname))
 | |
|     send_command('CHANGE_RETRY_SVC_CHECK_INTERVAL;{0};{1};{2}'
 | |
|                  ''.format(hostname, servicename, old_value))
 | |
|     return success('Successfully changed retry interval of service "{0}"'
 | |
|                    ' on host "{1}"'.format(servicename, hostname))
 | |
| 
 | |
| 
 | |
| def test_change_service_check_command(hostname, servicename, command):
 | |
|     old_value = get_one('GET services\nFilter: host_name = {0}'
 | |
|                         '\nFilter: description = {1}'
 | |
|                         '\nColumns: check_command'
 | |
|                         ''.format(hostname, servicename))['check_command']
 | |
|     send_command('CHANGE_SVC_CHECK_COMMAND;{0};{1};{2}'
 | |
|                  ''.format(hostname, servicename, command))
 | |
|     svc_info = get_one('GET services\nFilter: host_name = {0}'
 | |
|                        '\nFilter: description = {1}'
 | |
|                        '\nColumns: check_command'
 | |
|                        ''.format(hostname, servicename))
 | |
|     if svc_info['check_command'] != command:
 | |
|         return fail('Could not change the check command of service "{0}" on '
 | |
|                     'host "{1}" to "{2}"'.format(servicename, hostname, command))
 | |
|     send_command('CHANGE_SVC_CHECK_COMMAND;{0};{1};{2}'
 | |
|                  ''.format(hostname, servicename, old_value))
 | |
|     return success('Successfully changed check command of service "{0}" on'
 | |
|                    ' host "{1}"'.format(servicename, hostname))
 | |
| 
 | |
| 
 | |
| def test_change_service_check_timeperiod(hostname, servicename, period):
 | |
|     old_value = get_one('GET services\nFilter: host_name = {0}'
 | |
|                         '\nFilter: description = {1}'
 | |
|                         '\nColumns: check_period'
 | |
|                         ''.format(hostname, servicename))['check_period']
 | |
|     send_command('CHANGE_SVC_CHECK_TIMEPERIOD;{0};{1};{2}'
 | |
|                  ''.format(hostname, servicename, period))
 | |
|     svc_info = get_one('GET services\nFilter: host_name = {0}'
 | |
|                        '\nFilter: description = {1}'
 | |
|                        '\nColumns: check_period'
 | |
|                        ''.format(hostname, servicename))
 | |
|     if svc_info['check_period'] != period:
 | |
|         return fail('Could not change the check period of service "{0}" on host'
 | |
|                     ' "{1}" to "{2}"'.format(servicename, hostname, period))
 | |
|     send_command('CHANGE_SVC_CHECK_TIMEPERIOD;{0};{1};{2}'
 | |
|                  ''.format(hostname, servicename, old_value))
 | |
|     return success('Successfully changed check period of service "{0}"'
 | |
|                    ' on host "{1}"'.format(servicename, hostname))
 | |
| 
 | |
| 
 | |
| def test_delay_host_notification(hostname):
 | |
|     # This test assumes that the notification interval
 | |
|     # for the given host is set to 10 seconds
 | |
|     send_command('DISABLE_HOST_CHECK;{0}'.format(hostname))
 | |
|     for _ in xrange(MAX_CHECK_ATTEMPTS):
 | |
|         send_command('PROCESS_HOST_CHECK_RESULT;{0};1;down'.format(hostname))
 | |
|     state_info = get_one('GET hosts\nFilter: name = {0}'
 | |
|                          '\nColumns: state state_type last_hard_state_change'
 | |
|                          ''.format(hostname))
 | |
|     if state_info['state'] != 1 or state_info['state_type'] != 1:
 | |
|         send_command('ENABLE_HOST_CHECK;{0}'.format(hostname))
 | |
|         return fail('Could not switch state of host "{0}"'
 | |
|                     ' to DOWN (HARD)'.format(hostname))
 | |
|     time.sleep(1)
 | |
|     note_info1 = get_one('GET hosts\nFilter: name = {0}'
 | |
|                          '\nColumns: last_notification'.format(hostname))
 | |
|     if not -1 < (note_info1['last_notification'] - \
 | |
|             state_info['last_hard_state_change']) < 1:
 | |
|         send_command('ENABLE_HOST_CHECK;{0}'.format(hostname))
 | |
|         return fail('Switching host "{0}" to state DOWN (HARD) does not '
 | |
|                     'cause a notification being sent out'.format(hostname))
 | |
|     send_command('DELAY_HOST_NOTIFICATION;{0};{1}'
 | |
|                  ''.format(hostname, note_info1['last_notification'] + 20))
 | |
|     time.sleep(5 + 10 - (time.time() - note_info1['last_notification']))
 | |
|     note_info2 = get_one('GET hosts\nFilter: name = {0}'
 | |
|                          '\nColumns: last_notification'.format(hostname))
 | |
|     if note_info2['last_notification'] != note_info1['last_notification']:
 | |
|         send_command('ENABLE_HOST_CHECK;{0}'.format(hostname))
 | |
|         return fail('There were a notification being sent out for host '
 | |
|                     '"{0}" before the time that it should be delayed to'
 | |
|                     ''.format(hostname))
 | |
|     time.sleep(5 + note_info1['last_notification'] + 20 - time.time())
 | |
|     send_command('ENABLE_HOST_CHECK;{0}'.format(hostname))
 | |
|     note_info3 = get_one('GET hosts\nFilter: name = {0}'
 | |
|                          '\nColumns: last_notification'.format(hostname))
 | |
|     if note_info3['last_notification'] < note_info1['last_notification'] + 20:
 | |
|         return fail('Could not delay notification for'
 | |
|                     ' host "{0}"'.format(hostname))
 | |
|     return success('Successfully delayed notification'
 | |
|                    ' for host "{0}"'.format(hostname))
 | |
| 
 | |
| 
 | |
| def test_delay_service_notification(hostname, servicename):
 | |
|     # This test assumes that the notification interval
 | |
|     # for the given service is set to 10 seconds
 | |
|     send_command('DISABLE_SVC_CHECK;{0};{1}'.format(hostname, servicename))
 | |
|     for _ in xrange(MAX_CHECK_ATTEMPTS):
 | |
|         send_command('PROCESS_SERVICE_CHECK_RESULT;{0};{1};2;critical'
 | |
|                      ''.format(hostname, servicename))
 | |
|     state_info = get_one('GET services\nFilter: host_name = {0}'
 | |
|                          '\nFilter: description = {1}'
 | |
|                          '\nColumns: state state_type last_hard_state_change'
 | |
|                          ''.format(hostname, servicename))
 | |
|     if state_info['state'] != 2 or state_info['state_type'] != 1:
 | |
|         send_command('ENABLE_SVC_CHECK;{0};{1}'.format(hostname, servicename))
 | |
|         return fail('Could not switch state of service "{0}" on host "{1}"'
 | |
|                     ' to CRITICAL (HARD)'.format(servicename, hostname))
 | |
|     time.sleep(1)
 | |
|     note_info1 = get_one('GET services\nFilter: host_name = {0}'
 | |
|                          '\nFilter: description = {1}'
 | |
|                          '\nColumns: last_notification'
 | |
|                          ''.format(hostname, servicename))
 | |
|     if not -1 < (note_info1['last_notification'] - \
 | |
|             state_info['last_hard_state_change']) < 1:
 | |
|         send_command('ENABLE_SVC_CHECK;{0};{1}'.format(hostname, servicename))
 | |
|         return fail('Switching service "{0}" on host "{1}" to state CRITICAL '
 | |
|                     '(HARD) does not cause a notification being sent out'
 | |
|                     ''.format(servicename, hostname))
 | |
|     send_command('DELAY_SVC_NOTIFICATION;{0};{1};{2}'
 | |
|                  ''.format(hostname, servicename,
 | |
|                            note_info1['last_notification'] + 20))
 | |
|     time.sleep(5 + 10 - (time.time() - note_info1['last_notification']))
 | |
|     note_info2 = get_one('GET services\nFilter: host_name = {0}'
 | |
|                          '\nFilter: description = {1}'
 | |
|                          '\nColumns: last_notification'
 | |
|                          ''.format(hostname, servicename))
 | |
|     if note_info2['last_notification'] != note_info1['last_notification']:
 | |
|         send_command('ENABLE_SVC_CHECK;{0};{1}'.format(hostname, servicename))
 | |
|         return fail('There were a notification being sent out for service "{0}" '
 | |
|                     'on host "{1}" before the time that it should be delayed to'
 | |
|                     ''.format(servicename, hostname))
 | |
|     time.sleep(5 + note_info1['last_notification'] + 20 - time.time())
 | |
|     send_command('ENABLE_SVC_CHECK;{0};{1}'.format(hostname, servicename))
 | |
|     note_info3 = get_one('GET services\nFilter: host_name = {0}'
 | |
|                          '\nFilter: description = {1}'
 | |
|                          '\nColumns: last_notification'
 | |
|                          ''.format(hostname, servicename))
 | |
|     if note_info3['last_notification'] < note_info1['last_notification'] + 20:
 | |
|         return fail('Could not delay notification for service "{0}"'
 | |
|                     ' on host "{0}"'.format(servicename, hostname))
 | |
|     return success('Successfully delayed notification for service "{0}"'
 | |
|                    ' on host "{0}"'.format(servicename, hostname))
 | |
| 
 | |
| 
 | |
| def test_disabling_scheduling_host_checks(hostname):
 | |
|     # This test assumes that icinga is writing the hostchecks db table (DbCatCheck)
 | |
|     compare_time = lambda a, b: -2 < a - b < 2
 | |
|     start_time_query = ('SELECT UNIX_TIMESTAMP(MAX(start_time)) as start_time'
 | |
|                         ' FROM icinga_hostchecks AS c'
 | |
|                         ' INNER JOIN icinga_hosts AS h'
 | |
|                         ' ON h.host_object_id = c.host_object_id'
 | |
|                         ' WHERE h.alias = "{0}"'
 | |
|                         '').format(hostname)
 | |
|     # Need to use a sql query here because prior to this test another one ran
 | |
|     # which submitted a passive checkresult and with livestatus it is only
 | |
|     # possible to access the execution of the last result, which in this case
 | |
|     # is the passive one without a start and end time.
 | |
|     execution_time = float(run_query('SELECT MAX(c.execution_time) AS e_time'
 | |
|                                      ' FROM icinga_hostchecks AS c'
 | |
|                                      ' INNER JOIN icinga_hosts AS h'
 | |
|                                      ' ON h.host_object_id = c.host_object_id'
 | |
|                                      ' WHERE h.alias = "{0}"'
 | |
|                                      ''.format(hostname)).get('e_time', -1))
 | |
|     if execution_time == -1:
 | |
|         return fail('Unable to fetch the maximum execution time of'
 | |
|                     ' host "{0}" from the IDO'.format(hostname))
 | |
| 
 | |
|     send_command('DISABLE_HOST_CHECK;{0}'.format(hostname))
 | |
|     scheduled_check = time.time() + 4
 | |
|     send_command('SCHEDULE_HOST_CHECK;{0};{1}'.format(hostname, scheduled_check))
 | |
|     time.sleep(execution_time + 4 + 5) # +5 as this is the transaction interval
 | |
|     last_check = float(run_query(start_time_query).get('start_time') or 0)
 | |
|     if compare_time(last_check, scheduled_check + execution_time):
 | |
|         send_command('ENABLE_HOST_CHECK;{0}'.format(hostname))
 | |
|         return fail('Could not disable active checks on host "{0}"'
 | |
|                     ''.format(hostname))
 | |
|     success('Successfully disabled active checks on host "{0}"'.format(hostname))
 | |
|     scheduled_check = time.time() + 4
 | |
|     send_command('SCHEDULE_FORCED_HOST_CHECK;{0};{1}'
 | |
|                  ''.format(hostname, scheduled_check))
 | |
|     time.sleep(execution_time + 4 + 5) # +5 as this is the transaction interval
 | |
|     last_check = float(run_query(start_time_query).get('start_time') or 0)
 | |
|     if not compare_time(last_check, scheduled_check + execution_time):
 | |
|         send_command('ENABLE_HOST_CHECK;{0}'.format(hostname))
 | |
|         return fail('Could not schedule a forced check on host "{0}" while'
 | |
|                     ' active checks were disabled'.format(hostname))
 | |
|     success('Successfully scheduled a forced check on host "{0}" '
 | |
|             'while active checks were disabled'.format(hostname))
 | |
|     send_command('ENABLE_HOST_CHECK;{0}'.format(hostname))
 | |
|     scheduled_check = time.time() + 4
 | |
|     send_command('SCHEDULE_HOST_CHECK;{0};{1}'
 | |
|                  ''.format(hostname, scheduled_check))
 | |
|     time.sleep(execution_time + 4 + 5) # +5 as this is the transaction interval
 | |
|     last_check = float(run_query(start_time_query).get('start_time') or 0)
 | |
|     if not compare_time(last_check, scheduled_check + execution_time):
 | |
|         return fail('Could not schedule a check after re-enabling '
 | |
|                     'active checks for host "{0}"'.format(hostname))
 | |
|     return success('Successfully scheduled a check on host "{0}" after'
 | |
|                    ' re-enabling active checks'.format(hostname))
 | |
| 
 | |
| 
 | |
| def test_disabling_scheduling_service_checks(hostname, servicename):
 | |
|     # This test assumes that icinga is writing the servicechecks db table (DbCatCheck)
 | |
|     start_time_query = ('SELECT UNIX_TIMESTAMP(MAX(start_time)) as start_time'
 | |
|                         ' FROM icinga_servicechecks AS c'
 | |
|                         ' INNER JOIN icinga_services AS s'
 | |
|                         ' ON s.service_object_id = c.service_object_id'
 | |
|                         ' INNER JOIN icinga_hosts as h'
 | |
|                         ' ON h.host_object_id = s.host_object_id'
 | |
|                         ' WHERE h.alias = "{0}" AND s.display_name = "{1}"'
 | |
|                         '').format(hostname, servicename)
 | |
|     compare_time = lambda a, b: -2 < a - b < 2
 | |
|     execution_time = float(run_query('SELECT MAX(c.execution_time) AS e_time'
 | |
|                                      ' FROM icinga_servicechecks AS c'
 | |
|                                      ' INNER JOIN icinga_services AS s'
 | |
|                                      ' ON s.service_object_id = c.service_object_id'
 | |
|                                      ' INNER JOIN icinga_hosts as h'
 | |
|                                      ' ON h.host_object_id = s.host_object_id'
 | |
|                                      ' WHERE h.alias = "{0}" AND s.display_name = "{1}"'
 | |
|                                      ''.format(hostname, servicename)).get('e_time', -1))
 | |
|     if execution_time == -1:
 | |
|         return fail('Unable to fetch the maximum execution time of service "{0}"'
 | |
|                     ' on host "{1}" from the IDO'.format(hostname, servicename))
 | |
| 
 | |
|     send_command('DISABLE_SVC_CHECK;{0};{1}'.format(hostname, servicename))
 | |
|     scheduled_check = time.time() + 4
 | |
|     send_command('SCHEDULE_SVC_CHECK;{0};{1};{2}'.format(hostname, servicename,
 | |
|                                                          scheduled_check))
 | |
|     time.sleep(execution_time + 4 + 5) # +5 as this is the transaction interval
 | |
|     last_check = float(run_query(start_time_query).get('start_time') or 0)
 | |
|     if compare_time(last_check, scheduled_check + execution_time):
 | |
|         send_command('ENABLE_SVC_CHECK;{0};{1}'.format(hostname, servicename))
 | |
|         return fail('Could not disable active checks for service "{0}" on host'
 | |
|                     ' "{1}"'.format(servicename, hostname))
 | |
|     success('Successfully disabled active checks for service "{0}" on host'
 | |
|             '"{1}"'.format(servicename, hostname))
 | |
|     scheduled_check = time.time() + 4
 | |
|     send_command('SCHEDULE_FORCED_SVC_CHECK;{0};{1};{2}'
 | |
|                  ''.format(hostname, servicename, scheduled_check))
 | |
|     time.sleep(execution_time + 4 + 5) # +5 as this is the transaction interval
 | |
|     last_check = float(run_query(start_time_query).get('start_time') or 0)
 | |
|     if not compare_time(last_check, scheduled_check + execution_time):
 | |
|         send_command('ENABLE_SVC_CHECK;{0};{1}'.format(hostname, servicename))
 | |
|         return fail('Could not schedule a forced check for service "{0}"'
 | |
|                     ' on host "{1}" while active checks were disabled'
 | |
|                     ''.format(servicename, hostname))
 | |
|     success('Successfully scheduled a forced check for service "{0}" '
 | |
|             'on host "{1}" while active checks were disabled'
 | |
|             ''.format(servicename, hostname))
 | |
|     send_command('ENABLE_SVC_CHECK;{0};{1}'.format(hostname, servicename))
 | |
|     scheduled_check = time.time() + 4
 | |
|     send_command('SCHEDULE_SVC_CHECK;{0};{1};{2}'
 | |
|                  ''.format(hostname, servicename, scheduled_check))
 | |
|     time.sleep(execution_time + 4 + 5) # +5 as this is the transaction interval
 | |
|     last_check = float(run_query(start_time_query).get('start_time') or 0)
 | |
|     if not compare_time(last_check, scheduled_check + execution_time):
 | |
|         return fail('Could not schedule a check after re-enabling '
 | |
|                     'active checks for service "{0}" on host "{1}"'
 | |
|                     ''.format(servicename, hostname))
 | |
|     return success('Successfully scheduled a check for service "{0}" '
 | |
|                    'on host "{1}" after re-enabling active checks'
 | |
|                    ''.format(servicename, hostname))
 | |
| 
 | |
| 
 | |
| def test_shutdown_restart_process():
 | |
|     get_pid = lambda: subprocess.Popen("ps ax | grep icinga[2] | awk '{print $1}'",
 | |
|                                        shell=1, stdout=subprocess.PIPE).stdout.read()
 | |
| 
 | |
|     send_command('SHUTDOWN_PROCESS')
 | |
|     time.sleep(2)
 | |
|     if get_pid():
 | |
|         return fail('Could not stop the icinga2 process by using SHUTDOWN_PROCESS')
 | |
|     success('Successfully stopped the icinga2 process with SHUTDOWN_PROCESS')
 | |
|     restart_icinga()
 | |
|     pid_before_restart = int(get_pid())
 | |
|     send_command('RESTART_PROCESS')
 | |
|     LIVESTATUS.close()
 | |
|     time.sleep(6)
 | |
|     pid_after_restart = get_pid()
 | |
|     if not pid_after_restart:
 | |
|         restart_icinga()
 | |
|         return fail('RESTART_PROCESS seems to just stop the process')
 | |
|     if int(pid_after_restart) == pid_before_restart:
 | |
|         return fail('RESTART_PROCESS does not restart the process at all')
 | |
|     LIVESTATUS.reconnect()
 | |
|     return success('Successfully restarted process by using RESTART_PROCESS')
 | |
| 
 | |
| 
 | |
| def test_passive_host_checkresult_processing(hostname):
 | |
|     send_command('PROCESS_HOST_CHECK_RESULT;{0};1;blub'.format(hostname))
 | |
|     host_info = get_one('GET hosts\nFilter: name = {0}'
 | |
|                         '\nColumns: state plugin_output'.format(hostname))
 | |
|     if host_info['state'] != 1:
 | |
|         return fail('Could not submit a passive checkresult for host "{0}". Wrong'
 | |
|                     ' state: {1} != 1'.format(hostname, host_info['state']))
 | |
|     if host_info['plugin_output'] != 'blub':
 | |
|         return fail('Could not submit a passive checkresult for host "{0}". Wrong'
 | |
|                     ' output: "{1}" != "blub"'.format(hostname,
 | |
|                                                       host_info['plugin_output']))
 | |
|     success('Successfully submitted a passive checkresult for host "{0}"'
 | |
|             ''.format(hostname))
 | |
|     send_command('DISABLE_PASSIVE_HOST_CHECKS;{0}'.format(hostname))
 | |
|     send_command('PROCESS_HOST_CHECK_RESULT;{0};0;nope'.format(hostname), True)
 | |
|     host_info = get_one('GET hosts\nFilter: name = {0}'
 | |
|                         '\nColumns: state'.format(hostname))
 | |
|     if host_info['state'] == 0:
 | |
|         send_command('ENABLE_PASSIVE_HOST_CHECKS;{0}'.format(hostname))
 | |
|         return fail('Submitted a passive checkresult for host "{0}" though the'
 | |
|                     ' submission of passive checkresults has been disabled'
 | |
|                     ''.format(hostname))
 | |
|     success('It is not possible to submit passive host checkresults'
 | |
|             ' while their submission has been disabled')
 | |
|     send_command('ENABLE_PASSIVE_HOST_CHECKS;{0}'.format(hostname))
 | |
|     send_command('PROCESS_HOST_CHECK_RESULT;{0};2;nope²'.format(hostname), True)
 | |
|     host_info = get_one('GET hosts\nFilter: name = {0}'
 | |
|                         '\nColumns: plugin_output'.format(hostname))
 | |
|     if host_info['plugin_output'] == 'nope²':
 | |
|         return fail('It is possible to submit a passive host checkresult'
 | |
|                     ' whose status code is 2 (UNREACHABLE)')
 | |
|     return success('It is not possible to submit a passive host checkresult'
 | |
|                    ' whose status code is 2 (UNREACHABLE)')
 | |
| 
 | |
| 
 | |
| def test_passive_svc_checkresult_processing(hostname, servicename):
 | |
|     send_command('PROCESS_SERVICE_CHECK_RESULT;{0};{1};1;blub'
 | |
|                  ''.format(hostname, servicename))
 | |
|     svc_info = get_one('GET services\nFilter: host_name = {0}'
 | |
|                        '\nFilter: description = {1}'
 | |
|                        '\nColumns: state plugin_output'
 | |
|                        ''.format(hostname, servicename))
 | |
|     if svc_info['state'] != 1:
 | |
|         return fail('Could not submit a passive checkresult for service'
 | |
|                     ' "{0}" on host "{1}". Wrong state: {2} != 1'
 | |
|                     ''.format(servicename, hostname, svc_info['state']))
 | |
|     if svc_info['plugin_output'] != 'blub':
 | |
|         return fail('Could not submit a passive checkresult for service'
 | |
|                     ' "{0}" on host "{1}". Wrong output: "{2}" != "blub"'
 | |
|                     ''.format(servicename, hostname, svc_info['plugin_output']))
 | |
|     success('Successfully submitted a passive checkresult for service'
 | |
|             ' "{0}" on host "{1}"'.format(servicename, hostname))
 | |
|     send_command('DISABLE_PASSIVE_SVC_CHECKS;{0};{1}'
 | |
|                  ''.format(hostname, servicename))
 | |
|     send_command('PROCESS_SERVICE_CHECK_RESULT;{0};{1};0;nope'
 | |
|                  ''.format(hostname, servicename), True)
 | |
|     svc_info = get_one('GET services\nFilter: host_name = {0}'
 | |
|                        '\nFilter: description = {1}\nColumns: state'
 | |
|                        ''.format(hostname, servicename))
 | |
|     if svc_info['state'] == 0:
 | |
|         send_command('ENABLE_PASSIVE_SVC_CHECKS;{0};{1}'
 | |
|                      ''.format(hostname, servicename))
 | |
|         return fail('Submitted a passive checkresult for service "{0}" on host '
 | |
|                     '"{1}" though the submission of passive checkresults has '
 | |
|                     'been disabled'.format(servicename, hostname))
 | |
|     success('It is not possible to submit passive service checkresults'
 | |
|             ' while their submission has been disabled')
 | |
|     send_command('ENABLE_PASSIVE_SVC_CHECKS;{0};{1}'
 | |
|                  ''.format(hostname, servicename))
 | |
|     send_command('PROCESS_SERVICE_CHECK_RESULT;{0};{1};0;yäh!'
 | |
|                  ''.format(hostname, servicename), True)
 | |
|     svc_info = get_one('GET services\nFilter: host_name = {0}'
 | |
|                        '\nFilter: description = {1}\nColumns: plugin_output'
 | |
|                        ''.format(hostname, servicename))
 | |
|     if svc_info['plugin_output'] != 'yäh!':
 | |
|         return fail('It is not possible to submit a passive service checkresult'
 | |
|                     ' whose output data contains non-ascii characters')
 | |
|     return success('It is possible to submit a passive service checkresult'
 | |
|                    ' whose output data contains non-ascii characters')
 | |
| 
 | |
| 
 | |
| def test_process_file(hostname):
 | |
|     with open('/tmp/comments.txt', 'w') as f:
 | |
|         f.write("""[{2}] ADD_HOST_COMMENT;{0};0;{1};TheFirst
 | |
| [{3}] ADD_HOST_COMMENT;{0};0;{1};TheSecond
 | |
| [{4}] ADD_HOST_COMMENT;{0};0;{1};TheThird
 | |
| [{5}] ADD_HOST_COMMENT;{0};0;{1};TheFourth
 | |
| [{6}] ADD_HOST_COMMENT;{0};0;{1};TheFifth
 | |
| """.format(hostname, USERNAME, *[time.time() + i for i in xrange(4, 9)]))
 | |
|     os.system('sudo chown {0} /tmp/comments.txt'.format(ICINGA_UID))
 | |
|     send_command('PROCESS_FILE;{0};1'.format('/tmp/comments.txt'))
 | |
| 
 | |
|     try:
 | |
|         with open('/tmp/comments.txt') as f:
 | |
|             pass
 | |
|     except IOError:
 | |
|         success('Files processed by PROCESS_FILE are deleted if'
 | |
|                 ' the delete-option is set to a non-zero value')
 | |
|     else:
 | |
|         os.remove('/tmp/comments.txt')
 | |
|         return fail('Files processed by PROCESS_FILE are not deleted if'
 | |
|                     ' the delete-option is set to an non-zero value')
 | |
| 
 | |
|     comments = send_query('GET comments\nColumns: id\nFilter: comment ~'
 | |
|                           ' ^The(First|Second|Third|Fourth|Fifth)$')
 | |
|     for comment in comments:
 | |
|         send_command('DEL_HOST_COMMENT;{0}'.format(comment['id']))
 | |
|     if len(comments) < 5:
 | |
|         return fail('Not all commands were processed by PROCESS_FILE')
 | |
|     return success('All commands were processed by PROCESS_FILE')
 | |
| 
 | |
| 
 | |
| def test_custom_host_notifications(hostname):
 | |
|     check_time = lambda a, b: -1 < (a - b) < 1
 | |
| 
 | |
|     send_command('DISABLE_HOST_NOTIFICATIONS;{0}'.format(hostname))
 | |
|     sent_at = time.time()
 | |
|     send_command('SEND_CUSTOM_HOST_NOTIFICATION;{0};0;{1};test'
 | |
|                  ''.format(hostname, USERNAME))
 | |
|     last_notification = get_one('GET hosts\nFilter: name = {0}'
 | |
|                                 '\nColumns: last_notification'
 | |
|                                 ''.format(hostname))['last_notification']
 | |
|     if check_time(last_notification, sent_at):
 | |
|         send_command('ENABLE_HOST_NOTIFICATIONS;{0}'.format(hostname))
 | |
|         return fail('Sent a custom host notification for host "{0}" while'
 | |
|                     ' notifications were disabled'.format(hostname))
 | |
|     success('It is not possible to send custom host notifications for host'
 | |
|             ' "{0}" while notifications are disabled'.format(hostname))
 | |
|     sent_at = time.time()
 | |
|     send_command('SEND_CUSTOM_HOST_NOTIFICATION;{0};2;{1};test'
 | |
|                  ''.format(hostname, USERNAME))
 | |
|     last_notification = get_one('GET hosts\nFilter: name = {0}'
 | |
|                                 '\nColumns: last_notification'
 | |
|                                 ''.format(hostname))['last_notification']
 | |
|     if not check_time(last_notification, sent_at):
 | |
|         send_command('ENABLE_HOST_NOTIFICATIONS;{0}'.format(hostname))
 | |
|         return fail('Could not send a forced custom notification for host "{0}"'
 | |
|                     ' while notifications were disabled'.format(hostname))
 | |
|     success('Successfully sent a forced custom notification for host '
 | |
|             '"{0}" while notifications were disabled'.format(hostname))
 | |
|     send_command('ENABLE_HOST_NOTIFICATIONS;{0}'.format(hostname))
 | |
|     sent_at = time.time()
 | |
|     send_command('SEND_CUSTOM_HOST_NOTIFICATION;{0};0;{1};test'
 | |
|                  ''.format(hostname, USERNAME))
 | |
|     last_notification = get_one('GET hosts\nFilter: name = {0}'
 | |
|                                 '\nColumns: last_notification'
 | |
|                                 ''.format(hostname))['last_notification']
 | |
|     if not check_time(last_notification, sent_at):
 | |
|         return fail('Could not send a custom host notification for host "{0}"'
 | |
|                     ' after notifications were re-enabled'.format(hostname))
 | |
|     return success('Successfully sent a a custom host notification for host "{0}"'
 | |
|                    ' after notifications were re-enabled'.format(hostname))
 | |
| 
 | |
| 
 | |
| def test_custom_svc_notifications(hostname, servicename):
 | |
|     check_time = lambda a, b: -1 < (a - b) < 1
 | |
| 
 | |
|     send_command('DISABLE_SVC_NOTIFICATIONS;{0};{1}'.format(hostname, servicename))
 | |
|     sent_at = time.time()
 | |
|     send_command('SEND_CUSTOM_SVC_NOTIFICATION;{0};{1};0;{2};test'
 | |
|                  ''.format(hostname, servicename, USERNAME))
 | |
|     last_notification = get_one('GET services\nFilter: host_name = {0}'
 | |
|                                 '\nFilter: description = {1}'
 | |
|                                 '\nColumns: last_notification'
 | |
|                                 ''.format(hostname, servicename))['last_notification']
 | |
|     if check_time(last_notification, sent_at):
 | |
|         send_command('ENABLE_SVC_NOTIFICATIONS;{0};{1}'.format(hostname, servicename))
 | |
|         return fail('Sent a custom service notification for service "{0}" on host "{1}"'
 | |
|                     ' while notifications were disabled'.format(servicename, hostname))
 | |
|     success('It is not possible to send custom service notifications for service "{0}" on'
 | |
|             ' host "{1}" while notifications are disabled'.format(servicename, hostname))
 | |
|     sent_at = time.time()
 | |
|     send_command('SEND_CUSTOM_SVC_NOTIFICATION;{0};{1};2;{2};test'
 | |
|                  ''.format(hostname, servicename, USERNAME))
 | |
|     last_notification = get_one('GET services\nFilter: host_name = {0}'
 | |
|                                 '\nFilter: description = {1}'
 | |
|                                 '\nColumns: last_notification'
 | |
|                                 ''.format(hostname, servicename))['last_notification']
 | |
|     if not check_time(last_notification, sent_at):
 | |
|         send_command('ENABLE_SVC_NOTIFICATIONS;{0};{1}'.format(hostname, servicename))
 | |
|         return fail('Could not send a forced custom notification for service "{0}" on host '
 | |
|                     '"{1}" while notifications were disabled'.format(servicename, hostname))
 | |
|     success('Successfully sent a forced custom notification for service "{0}" on host'
 | |
|             ' "{0}" while notifications were disabled'.format(servicename, hostname))
 | |
|     send_command('ENABLE_SVC_NOTIFICATIONS;{0};{1}'.format(hostname, servicename))
 | |
|     sent_at = time.time()
 | |
|     send_command('SEND_CUSTOM_SVC_NOTIFICATION;{0};{1};0;{2};test'
 | |
|                  ''.format(hostname, servicename, USERNAME))
 | |
|     last_notification = get_one('GET services\nFilter: host_name = {0}'
 | |
|                                 '\nFilter: description = {1}'
 | |
|                                 '\nColumns: last_notification'
 | |
|                                 ''.format(hostname, servicename))['last_notification']
 | |
|     if not check_time(last_notification, sent_at):
 | |
|         return fail('Could not send a custom service notification for service "{0}" on host '
 | |
|                     '"{1}" after notifications were re-enabled'.format(servicename, hostname))
 | |
|     return success('Successfully sent a custom service notification for service "{0}" on host'
 | |
|                    ' "{1}" after notifications were re-enabled'.format(servicename, hostname))
 | |
| 
 | |
| 
 | |
| def test_hostgroup_commands():
 | |
|     # This test requires the "files/configs/groups.conf" configuration file
 | |
|     failure = False
 | |
|     send_command('CHANGE_HOST_MODATTR;localhost;0')
 | |
|     localhost_services = [d['description']
 | |
|                           for d in send_query('GET services\nColumns: description'
 | |
|                                               '\nFilter: host_name = localhost')]
 | |
|     for svc_desc in localhost_services:
 | |
|         send_command('CHANGE_SVC_MODATTR;localhost;{0};0'.format(svc_desc))
 | |
| 
 | |
|     send_command('DISABLE_HOSTGROUP_HOST_CHECKS;linux-servers')
 | |
|     send_command('DISABLE_HOSTGROUP_PASSIVE_HOST_CHECKS;linux-servers')
 | |
|     send_command('DISABLE_HOSTGROUP_HOST_NOTIFICATIONS;linux-servers')
 | |
|     host_info = get_one('GET hosts\nFilter: name = localhost')
 | |
|     if host_info['active_checks_enabled'] != 0:
 | |
|         fail('Could not disable active checks for host "{0}" that is part'
 | |
|              ' of hostgroup "linux-servers"'.format(host_info['name']))
 | |
|         failure = True
 | |
|     else:
 | |
|         success('Successfully disabled active checks for host "{0}" that is '
 | |
|                 'part of hostgroup "linux-servers"'.format(host_info['name']))
 | |
|     if host_info['accept_passive_checks'] != 0:
 | |
|         fail('Could not disable passive checks for host "{0}" that is part'
 | |
|              ' of hostgroup "linux-servers"'.format(host_info['name']))
 | |
|         failure = True
 | |
|     else:
 | |
|         success('Successfully disabled passive checks for host "{0}" that is '
 | |
|                 'part of hostgroup "linux-servers"'.format(host_info['name']))
 | |
|     if host_info['notifications_enabled'] != 0:
 | |
|         fail('Could not disable notifications for host "{0}" that is part'
 | |
|              ' of hostgroup "linux-servers"'.format(host_info['name']))
 | |
|         failure = True
 | |
|     else:
 | |
|         success('Successfully disabled notifications for host "{0}" that is '
 | |
|                 'part of hostgroup "linux-servers"'.format(host_info['name']))
 | |
| 
 | |
|     send_command('ENABLE_HOSTGROUP_HOST_CHECKS;linux-servers')
 | |
|     send_command('ENABLE_HOSTGROUP_PASSIVE_HOST_CHECKS;linux-servers')
 | |
|     send_command('ENABLE_HOSTGROUP_HOST_NOTIFICATIONS;linux-servers')
 | |
|     host_info = get_one('GET hosts\nFilter: name = localhost')
 | |
|     if host_info['active_checks_enabled'] != 1:
 | |
|         fail('Could not enable active checks for host "{0}" that is part'
 | |
|              ' of hostgroup "linux-servers"'.format(host_info['name']))
 | |
|         failure = True
 | |
|     else:
 | |
|         success('Successfully enabled active checks for host "{0}" that is '
 | |
|                 'part of hostgroup "linux-servers"'.format(host_info['name']))
 | |
|     if host_info['accept_passive_checks'] != 1:
 | |
|         fail('Could not enable passive checks for host "{0}" that is part'
 | |
|              ' of hostgroup "linux-servers"'.format(host_info['name']))
 | |
|         failure = True
 | |
|     else:
 | |
|         success('Successfully enabled passive checks for host "{0}" that is '
 | |
|                 'part of hostgroup "linux-servers"'.format(host_info['name']))
 | |
|     if host_info['notifications_enabled'] != 1:
 | |
|         fail('Could not enable notifications for host "{0}" that is part'
 | |
|              ' of hostgroup "linux-servers"'.format(host_info['name']))
 | |
|         failure = True
 | |
|     else:
 | |
|         success('Successfully enabled notifications for host "{0}" that is '
 | |
|                 'part of hostgroup "linux-servers"'.format(host_info['name']))
 | |
| 
 | |
|     send_command('DISABLE_HOSTGROUP_SVC_CHECKS;linux-servers')
 | |
|     send_command('DISABLE_HOSTGROUP_PASSIVE_SVC_CHECKS;linux-servers')
 | |
|     send_command('DISABLE_HOSTGROUP_SVC_NOTIFICATIONS;linux-servers')
 | |
|     for svc_info in send_query('GET services\nFilter: host_name = localhost'):
 | |
|         if svc_info['active_checks_enabled'] != 0:
 | |
|             fail('Could not disable active checks for service "{0}" on '
 | |
|                  'host "{1}" that is part of hostgroup "linux-servers"'
 | |
|                  ''.format(svc_info['description'], svc_info['host_name']))
 | |
|             failure = True
 | |
|         else:
 | |
|             success('Successfully disabled active checks for service "{0}" '
 | |
|                     'on host "{1}" that is part of hostgroup "linux-servers"'
 | |
|                     ''.format(svc_info['description'], svc_info['host_name']))
 | |
|         if svc_info['accept_passive_checks'] != 0:
 | |
|             fail('Could not disable passive checks for service "{0}" on '
 | |
|                  'host "{1}" that is part of hostgroup "linux-servers"'
 | |
|                  ''.format(svc_info['description'], svc_info['host_name']))
 | |
|             failure = True
 | |
|         else:
 | |
|             success('Successfully disabled passive checks for service "{0}" '
 | |
|                     'on host "{1}" that is part of hostgroup "linux-servers"'
 | |
|                     ''.format(svc_info['description'], svc_info['host_name']))
 | |
|         if svc_info['notifications_enabled'] != 0:
 | |
|             fail('Could not disable notifications for service "{0}" on '
 | |
|                  'host "{1}" that is part of hostgroup "linux-servers"'
 | |
|                  ''.format(svc_info['description'], svc_info['host_name']))
 | |
|             failure = True
 | |
|         else:
 | |
|             success('Successfully disabled notifications for service "{0}" on'
 | |
|                     ' host "{1}" that is part of hostgroup "linux-servers"'
 | |
|                     ''.format(svc_info['description'], svc_info['host_name']))
 | |
| 
 | |
|     send_command('ENABLE_HOSTGROUP_SVC_CHECKS;linux-servers')
 | |
|     send_command('ENABLE_HOSTGROUP_PASSIVE_SVC_CHECKS;linux-servers')
 | |
|     send_command('ENABLE_HOSTGROUP_SVC_NOTIFICATIONS;linux-servers')
 | |
|     for svc_info in send_query('GET services\nFilter: host_name = localhost'):
 | |
|         if svc_info['active_checks_enabled'] != 1:
 | |
|             fail('Could not enable active checks for service "{0}" on '
 | |
|                  'host "{1}" that is part of hostgroup "linux-servers"'
 | |
|                  ''.format(svc_info['description'], svc_info['host_name']))
 | |
|             failure = True
 | |
|         else:
 | |
|             success('Successfully enabled active checks for service "{0}" '
 | |
|                     'on host "{1}" that is part of hostgroup "linux-servers"'
 | |
|                     ''.format(svc_info['description'], svc_info['host_name']))
 | |
|         if svc_info['accept_passive_checks'] != 1:
 | |
|             fail('Could not enable passive checks for service "{0}" on '
 | |
|                  'host "{1}" that is part of hostgroup "linux-servers"'
 | |
|                  ''.format(svc_info['description'], svc_info['host_name']))
 | |
|             failure = True
 | |
|         else:
 | |
|             success('Successfully enabled passive checks for service "{0}" '
 | |
|                     'on host "{1}" that is part of hostgroup "linux-servers"'
 | |
|                     ''.format(svc_info['description'], svc_info['host_name']))
 | |
|         if svc_info['notifications_enabled'] != 1:
 | |
|             fail('Could not enable notifications for service "{0}" on '
 | |
|                  'host "{1}" that is part of hostgroup "linux-servers"'
 | |
|                  ''.format(svc_info['description'], svc_info['host_name']))
 | |
|             failure = True
 | |
|         else:
 | |
|             success('Successfully enabled notifications for service "{0}" on'
 | |
|                     ' host "{1}" that is part of hostgroup "linux-servers"'
 | |
|                     ''.format(svc_info['description'], svc_info['host_name']))
 | |
| 
 | |
|     return failure
 | |
| 
 | |
| 
 | |
| def test_servicegroup_commands():
 | |
|     # This test requires the "files/configs/groups.conf" configuration file
 | |
|     failure = False
 | |
|     send_command('CHANGE_HOST_MODATTR;localhost;0')
 | |
|     aservicegroup_services = send_query('GET services\nColumns: description host_name'
 | |
|                                         '\nFilter: groups >= aservicegroup')
 | |
|     for svc_info in aservicegroup_services:
 | |
|         send_command('CHANGE_SVC_MODATTR;{0};{1};0'.format(svc_info['host_name'],
 | |
|                                                            svc_info['description']))
 | |
| 
 | |
|     send_command('DISABLE_SERVICEGROUP_HOST_CHECKS;aservicegroup')
 | |
|     send_command('DISABLE_SERVICEGROUP_HOST_NOTIFICATIONS;aservicegroup')
 | |
|     send_command('DISABLE_SERVICEGROUP_PASSIVE_HOST_CHECKS;aservicegroup')
 | |
|     aservicegroup_hosts = send_query('GET hosts\nFilter: name = localhost')
 | |
|     for host_info in aservicegroup_hosts:
 | |
|         if host_info['active_checks_enabled'] != 0:
 | |
|             fail('Could not disable active checks for host "{0}" which has'
 | |
|                  ' services that are part of servicegroup "aservicegroup"'
 | |
|                  ''.format(host_info['name']))
 | |
|             failure = True
 | |
|         else:
 | |
|             success('Successfully disabled active checks for host "{0}" which '
 | |
|                     'has services that are part of servicegroup "aservicegroup"'
 | |
|                     ''.format(host_info['name']))
 | |
|         if host_info['accept_passive_checks'] != 0:
 | |
|             fail('Could not disable passive checks for host "{0}" which has'
 | |
|                  ' services that are part of servicegroup "aservicegroup"'
 | |
|                  ''.format(host_info['name']))
 | |
|             failure = True
 | |
|         else:
 | |
|             success('Successfully disabled passive checks for host "{0}" which '
 | |
|                     'has services that are part of servicegroup "aservicegroup"'
 | |
|                     ''.format(host_info['name']))
 | |
|         if host_info['notifications_enabled'] != 0:
 | |
|             fail('Could not disable notifications for host "{0}" which has'
 | |
|                  ' services that are part of servicegroup "aservicegroup"'
 | |
|                  ''.format(host_info['name']))
 | |
|             failure = True
 | |
|         else:
 | |
|             success('Successfully disabled notifications for host "{0}" which '
 | |
|                     'has services that are part of servicegroup "aservicegroup"'
 | |
|                     ''.format(host_info['name']))
 | |
| 
 | |
|     send_command('ENABLE_SERVICEGROUP_HOST_CHECKS;aservicegroup')
 | |
|     send_command('ENABLE_SERVICEGROUP_HOST_NOTIFICATIONS;aservicegroup')
 | |
|     send_command('ENABLE_SERVICEGROUP_PASSIVE_HOST_CHECKS;aservicegroup')
 | |
|     aservicegroup_hosts = send_query('GET hosts\nFilter: name = localhost')
 | |
|     for host_info in aservicegroup_hosts:
 | |
|         if host_info['active_checks_enabled'] != 1:
 | |
|             fail('Could not enable active checks for host "{0}" which has'
 | |
|                  ' services that are part of servicegroup "aservicegroup"'
 | |
|                  ''.format(host_info['name']))
 | |
|             failure = True
 | |
|         else:
 | |
|             success('Successfully enabled active checks for host "{0}" which '
 | |
|                     'has services that are part of servicegroup "aservicegroup"'
 | |
|                     ''.format(host_info['name']))
 | |
|         if host_info['accept_passive_checks'] != 1:
 | |
|             fail('Could not enable passive checks for host "{0}" which has'
 | |
|                  ' services that are part of servicegroup "aservicegroup"'
 | |
|                  ''.format(host_info['name']))
 | |
|             failure = True
 | |
|         else:
 | |
|             success('Successfully enabled passive checks for host "{0}" which '
 | |
|                     'has services that are part of servicegroup "aservicegroup"'
 | |
|                     ''.format(host_info['name']))
 | |
|         if host_info['notifications_enabled'] != 1:
 | |
|             fail('Could not enable notifications for host "{0}" which has'
 | |
|                  ' services that are part of servicegroup "aservicegroup"'
 | |
|                  ''.format(host_info['name']))
 | |
|             failure = True
 | |
|         else:
 | |
|             success('Successfully enabled notifications for host "{0}" which '
 | |
|                     'has services that are part of servicegroup "aservicegroup"'
 | |
|                     ''.format(host_info['name']))
 | |
| 
 | |
|     send_command('DISABLE_SERVICEGROUP_SVC_CHECKS;aservicegroup')
 | |
|     send_command('DISABLE_SERVICEGROUP_SVC_NOTIFICATIONS;aservicegroup')
 | |
|     send_command('DISABLE_SERVICEGROUP_PASSIVE_SVC_CHECKS;aservicegroup')
 | |
|     aservicegroup_services = send_query('GET services\nFilter: groups >= aservicegroup')
 | |
|     for svc_info in aservicegroup_services:
 | |
|         if svc_info['active_checks_enabled'] != 0:
 | |
|             fail('Could not disable active checks for service "{0}"'
 | |
|                  ' that is part of servicegroup "aservicegroup"'
 | |
|                  ''.format(svc_info['description']))
 | |
|             failure = True
 | |
|         else:
 | |
|             success('Successfully disabled active checks for service '
 | |
|                     '"{0}" that is part of servicegroup "aservicegroup"'
 | |
|                     ''.format(svc_info['description']))
 | |
|         if svc_info['accept_passive_checks'] != 0:
 | |
|             fail('Could not disable passive checks for service "{0}"'
 | |
|                  ' that is part of servicegroup "aservicegroup"'
 | |
|                  ''.format(svc_info['description']))
 | |
|             failure = True
 | |
|         else:
 | |
|             success('Successfully disabled passive checks for service '
 | |
|                     '"{0}" that is part of servicegroup "aservicegroup"'
 | |
|                     ''.format(svc_info['description']))
 | |
|         if svc_info['notifications_enabled'] != 0:
 | |
|             fail('Could not disable notifications for service "{0}"'
 | |
|                  ' that is part of servicegroup "aservicegroup"'
 | |
|                  ''.format(svc_info['description']))
 | |
|             failure = True
 | |
|         else:
 | |
|             success('Successfully disabled notifications for service '
 | |
|                     '"{0}" that is part of servicegroup "aservicegroup"'
 | |
|                     ''.format(svc_info['description']))
 | |
| 
 | |
|     send_command('ENABLE_SERVICEGROUP_SVC_CHECKS;aservicegroup')
 | |
|     send_command('ENABLE_SERVICEGROUP_SVC_NOTIFICATIONS;aservicegroup')
 | |
|     send_command('ENABLE_SERVICEGROUP_PASSIVE_SVC_CHECKS;aservicegroup')
 | |
|     aservicegroup_services = send_query('GET services\nFilter: groups >= aservicegroup')
 | |
|     for svc_info in aservicegroup_services:
 | |
|         if svc_info['active_checks_enabled'] != 1:
 | |
|             fail('Could not enable active checks for service "{0}"'
 | |
|                  ' that is part of servicegroup "aservicegroup"'
 | |
|                  ''.format(svc_info['description']))
 | |
|             failure = True
 | |
|         else:
 | |
|             success('Successfully enabled active checks for service '
 | |
|                     '"{0}" that is part of servicegroup "aservicegroup"'
 | |
|                     ''.format(svc_info['description']))
 | |
|         if svc_info['accept_passive_checks'] != 1:
 | |
|             fail('Could not enable passive checks for service "{0}"'
 | |
|                  ' that is part of servicegroup "aservicegroup"'
 | |
|                  ''.format(svc_info['description']))
 | |
|             failure = True
 | |
|         else:
 | |
|             success('Successfully enabled passive checks for service '
 | |
|                     '"{0}" that is part of servicegroup "aservicegroup"'
 | |
|                     ''.format(svc_info['description']))
 | |
|         if svc_info['notifications_enabled'] != 1:
 | |
|             fail('Could not enable notifications for service "{0}"'
 | |
|                  ' that is part of servicegroup "aservicegroup"'
 | |
|                  ''.format(svc_info['description']))
 | |
|             failure = True
 | |
|         else:
 | |
|             success('Successfully enabled notifications for service '
 | |
|                     '"{0}" that is part of servicegroup "aservicegroup"'
 | |
|                     ''.format(svc_info['description']))
 | |
| 
 | |
|     return failure
 | |
| 
 | |
| 
 | |
| def main():
 | |
|     failure = test_host_comments('localhost')
 | |
|     #failure |= test_host_comments('nsca-ng') # Cannot work without a hostcheck!
 | |
| 
 | |
|     failure |= test_service_comments('localhost', 'disk')
 | |
|     failure |= test_service_comments('nsca-ng', 'PassiveService1')
 | |
| 
 | |
|     failure |= test_host_downtimes('localhost')
 | |
|     #failure |= test_host_downtimes('nsca-ng')
 | |
| 
 | |
|     failure |= test_service_downtimes('localhost', 'disk')
 | |
|     failure |= test_service_downtimes('nsca-ng', 'PassiveService1')
 | |
| 
 | |
|     failure |= test_host_problem_acknowledgements('localhost')
 | |
|     failure |= test_remove_host_acknowledgements('localhost')
 | |
|     failure |= test_expiring_host_acknowledgements('localhost')
 | |
| 
 | |
|     failure |= test_change_host_check_command('localhost', 'disk')
 | |
|     failure |= test_change_service_check_command('localhost', 'disk', 'ping4')
 | |
|     failure |= test_change_host_check_timeperiod('localhost', 'none')
 | |
|     failure |= test_change_service_check_timeperiod('localhost', 'disk', 'none')
 | |
|     failure |= test_change_host_max_check_attempts('localhost')
 | |
|     failure |= test_change_service_max_check_attempts('localhost', 'disk')
 | |
|     failure |= test_change_host_normal_check_interval('localhost')
 | |
|     failure |= test_change_service_normal_check_interval('localhost', 'disk')
 | |
|     failure |= test_change_host_retry_check_interval('localhost')
 | |
|     failure |= test_change_service_retry_check_interval('localhost', 'disk')
 | |
|     failure |= test_change_host_modified_attributes('localhost')
 | |
| 
 | |
|     failure |= test_delay_service_notification('localhost', 'disk')
 | |
|     failure |= test_delay_host_notification('localhost')
 | |
| 
 | |
|     failure |= test_disabling_scheduling_host_checks('localhost')
 | |
|     failure |= test_disabling_scheduling_service_checks('localhost', 'disk')
 | |
| 
 | |
|     failure |= test_shutdown_restart_process()
 | |
| 
 | |
|     failure |= test_passive_host_checkresult_processing('localhost')
 | |
|     failure |= test_passive_svc_checkresult_processing('localhost', 'disk')
 | |
| 
 | |
|     failure |= test_process_file('localhost')
 | |
| 
 | |
|     failure |= test_custom_host_notifications('localhost')
 | |
|     failure |= test_custom_svc_notifications('localhost', 'disk')
 | |
| 
 | |
|     failure |= test_hostgroup_commands()
 | |
|     failure |= test_servicegroup_commands()
 | |
| 
 | |
|     return 1 if failure else 0
 | |
| 
 | |
| 
 | |
| if __name__ == '__main__':
 | |
|     try:
 | |
|         with utils.LiveStatusSocket(LIVESTATUS_PATH) as LIVESTATUS:
 | |
|             sys.exit(main())
 | |
|     except (OSError, IOError, socket.error), e:
 | |
|         error('Could not connect to Livestatus socket: {0} ({1})'
 | |
|               ''.format(LIVESTATUS_PATH, unicode(e)))
 | |
| 
 |