icinga2/test/jenkins/eventhandler.test

278 lines
9.7 KiB
Python
Executable File

#!/usr/bin/env python
from __future__ import unicode_literals
import os
import sys
import time
import socket
import utils
# Path of the Livestatus UNIX socket that the script connects to.
LIVESTATUS_PATH = '/var/run/icinga2/cmd/livestatus'

# Livestatus host columns, ordered like the '|'-separated fields of the event
# handler output: main() compares LS_HOST_COLUMNS[n] with event_output[n].
# A None entry marks an output field that is not compared against Livestatus.
# Index 0 is checked separately in main() (host name repeated twice).
LS_HOST_COLUMNS = [
    'name', 'name',
    'display_name', 'display_name',
    None,
    'state', 'state_type', 'current_attempt', 'max_check_attempts',
    None,
    'last_state',
    None,
    'last_state_change',
    None,
    'latency', 'execution_time', 'plugin_output',
    None,
    'last_check', 'address', 'address6',
]
# Livestatus service columns. main() compares LS_SVC_COLUMNS[n] with the event
# handler output field at index len(LS_HOST_COLUMNS) + n; None entries mark
# output fields that are not compared against Livestatus.
LS_SVC_COLUMNS = [
    'description',
    'display_name', 'display_name',
    None,
    'state', 'state_type', 'current_attempt', 'max_check_attempts',
    None,
    'last_state',
    None,
    'last_state_change',
    None,
    'latency', 'execution_time', 'plugin_output', 'perf_data',
    'last_check',
    'host_num_services',
    'host_num_services_ok',
    'host_num_services_warn',
    'host_num_services_unknown',
    'host_num_services_crit',
]
# Numeric equivalents of the textual state types; convert_output() uses this
# to translate 'SOFT'/'HARD' from the event handler output into integers.
STATE_MAP = {'SOFT': 0, 'HARD': 1}
def send_command(command):
    """Submit an external command through Livestatus.

    The command is sent as ``COMMAND [<current unix time>] <command>`` via
    send_query().

    Returns whatever send_query() returns. If utils.LiveStatusError is raised
    the error is written to stderr and None is returned, so a failed command
    never aborts the test run by itself.
    """
    try:
        return send_query('COMMAND [{0}] {1}'.format(int(time.time()), command))
    # "except ... as" is valid from Python 2.6 on, unlike the comma form,
    # which is a syntax error on Python 3.
    except utils.LiveStatusError as error:
        sys.stderr.write('Failed to execute command: {0}\n\n{1}\n'
                         ''.format(command, error))
        return None
def send_query(query):
    """Run a Livestatus query and return its rows as dictionaries.

    ``ColumnHeaders: on`` is appended to the query and the first row of the
    response is used as the list of column names for all following rows.

    Returns a list with one ``{column: value}`` dict per data row, or an
    empty list when the response is empty. The response object handed back
    by LIVESTATUS.query() is left unmodified.
    """
    response = LIVESTATUS.query(query + '\nColumnHeaders: on')
    if not response:
        return []
    header, rows = response[0], response[1:]
    return [dict(zip(header, row)) for row in rows]
def get_one(query):
    """Return the first row that send_query() yields for *query*.

    An empty dict is returned when the query produces no rows.
    """
    for row in send_query(query):
        return row
    return {}
def get_event_output():
    """Read and consume the output file written by the test event handler.

    Returns the content of /tmp/test_event.out, stripped of trailing
    whitespace and split at '|', or None if the file cannot be opened or
    read. After a successful read the file is removed (via sudo) so that a
    later call only sees output produced in the meantime.
    """
    try:
        with open('/tmp/test_event.out') as f:
            content = f.read()
    except (IOError, OSError):
        # No (readable) output file: the event handler did not produce output
        return None

    os.system('sudo rm -f /tmp/test_event.out')
    return content.rstrip().split('|')
def convert_output(value):
    """Convert a macro value from the event handler output for comparison.

    The value is converted to int if possible, otherwise to float. If neither
    works it is looked up in STATE_MAP ('SOFT'/'HARD' become integers) and
    returned unchanged when it is not a key of that map. Values that are not
    strings or numbers (e.g. None) also take this last path instead of
    raising TypeError.
    """
    for cast in (int, float):
        try:
            return cast(value)
        except (TypeError, ValueError):
            continue
    return STATE_MAP.get(value, value)
def validate_time_format(inputstr, formatstr):
    """Tell whether *inputstr* can be parsed with the strptime *formatstr*.

    Returns True if time.strptime() accepts the input, False if it rejects
    it. Input that is not a string at all (e.g. None) is reported as False
    as well instead of raising TypeError.
    """
    try:
        time.strptime(inputstr, formatstr)
    except (TypeError, ValueError):
        return False
    return True
# Date/time macros checked by main(): (macro name, index of the field in the
# event handler output, strptime format the value must match).
_TIME_MACROS = (
    ('LONGDATETIME', -5, '%Y-%m-%d %H:%M:%S +0000'),
    ('SHORTDATETIME', -4, '%Y-%m-%d %H:%M:%S'),
    ('DATE', -3, '%Y-%m-%d'),
    ('TIME', -2, '%H:%M:%S +0000'),
)


def _check_macros(columns, info, event_output, offset):
    """Compare event handler output fields with Livestatus column values.

    ``columns[n]`` is compared with the field ``event_output[offset + n]``
    (a ``NAME=value`` string); None entries in *columns* are skipped. A column
    missing from *info* counts as a mismatch. Every comparison is logged.

    Returns True if all compared macros matched, False otherwise.
    """
    success = True
    for i, column in enumerate(columns, offset):
        if column is None:
            continue

        macro_name, _, macro_value = event_output[i].partition('=')
        output_value = convert_output(macro_value)
        expected_value = info.get(column)
        if output_value != expected_value:
            success = False
            utils.Logger.fail('Macro "{0}" returns an incorrect value. '
                              'Expected "{2}" but got "{1}"\n'
                              ''.format(macro_name, output_value,
                                        expected_value))
        else:
            utils.Logger.ok('Macro "{0}" returns the correct value\n'
                            ''.format(macro_name))
    return success


def _check_time_macro(name, field, formatstr):
    """Log and return whether the value of a ``NAME=value`` field matches
    the strptime format *formatstr*."""
    value = field.partition('=')[2]
    if validate_time_format(value, formatstr):
        utils.Logger.ok('Macro "{0}" returns the correct value\n'
                        ''.format(name))
        return True

    utils.Logger.fail('Macro "{0}" returns an incorrect value. '
                      'Expected value of format "{1}" but got "{2}"\n'
                      ''.format(name, formatstr, value))
    return False


def main():
    """Run the event handler test against host "localhost".

    Steps: assign the eventcommand "test_event", submit a failing check
    result to trigger it, compare the macros it wrote with the values
    reported by Livestatus, verify that a disabled event handler is not
    triggered, re-enable it and finally remove the eventcommand again.

    Returns 0 if every check succeeded and 1 otherwise.
    """
    send_command('CHANGE_HOST_EVENT_HANDLER;localhost;test_event')
    host_info = get_one('GET hosts\nFilter: name = localhost'
                        '\nColumns: event_handler')
    if host_info.get('event_handler') != 'test_event':
        utils.Logger.fail('Could not assign eventcommand "test_event"'
                          ' to host "localhost"\n')
        return 1
    utils.Logger.ok('Successfully assigned an eventcommand'
                    ' to host "localhost"\n')

    send_command('PROCESS_HOST_CHECK_RESULT;localhost;1;A negative result to'
                 ' trigger an eventhandler|some interesting perfdata!')
    event_output = get_event_output()
    if not event_output:
        send_command('CHANGE_HOST_EVENT_HANDLER;localhost;')
        utils.Logger.fail('Could not trigger the eventcommand\n')
        return 1
    utils.Logger.ok('Successfully triggered the eventcommand\n')

    # An empty output file yields [''], which is truthy. Make sure every
    # index used below exists instead of failing with an IndexError.
    required_fields = len(LS_HOST_COLUMNS) + len(LS_SVC_COLUMNS)
    if len(event_output) < required_fields:
        send_command('CHANGE_HOST_EVENT_HANDLER;localhost;')
        utils.Logger.fail('The eventcommand returned {0} values but at least'
                          ' {1} are required to check all macros\n'
                          ''.format(len(event_output), required_fields))
        return 1

    failure = False

    utils.Logger.info('Checking host macros...\n')
    host_info = get_one('GET hosts\nFilter: name = localhost\nColumns: {0}'
                        ''.format(' '.join(c for c in LS_HOST_COLUMNS if c)))
    # The first output field is expected to hold the host name twice
    host_name = host_info.get('name')
    escaped_name = host_name * 2 if host_name is not None else None
    if event_output[0] != escaped_name:
        failure = True
        utils.Logger.fail('Escaping environment variables '
                          'seems not to properly working\n')
        utils.Logger.fail(' Expected: {0!r} Got: {1!r}\n'
                          ''.format(escaped_name, event_output[0]))
    else:
        utils.Logger.ok('Escaped environment variables'
                        ' are properly processed\n')

    if not _check_macros(LS_HOST_COLUMNS[1:], host_info, event_output, 1):
        failure = True

    utils.Logger.info('Checking service macros...\n')
    svc_info = get_one('GET services\nFilter: description = ping4\nColumns: {0}'
                       ''.format(' '.join(c for c in LS_SVC_COLUMNS if c)))
    if not _check_macros(LS_SVC_COLUMNS, svc_info, event_output,
                         len(LS_HOST_COLUMNS)):
        failure = True

    utils.Logger.info('Checking global macros...\n')
    timet = convert_output(event_output[-6].partition('=')[2])
    if not isinstance(timet, int):
        failure = True
        utils.Logger.fail('Macro "TIMET" does not return a timestamp. '
                          'Expected int but got: {0!r}\n'.format(timet))
    else:
        utils.Logger.ok('Macro "TIMET" returns the correct value\n')

    for macro_name, index, formatstr in _TIME_MACROS:
        if not _check_time_macro(macro_name, event_output[index], formatstr):
            failure = True

    utils.Logger.info('Checking command macros...\n')
    if convert_output(event_output[-1].partition('=')[2]) != 1337:
        failure = True
        utils.Logger.fail('The command macro "custom_macro"'
                          ' is not being substituted\n')
    else:
        utils.Logger.ok('The command macro "custom_macro"'
                        ' is correctly substituted\n')

    send_command('DISABLE_HOST_EVENT_HANDLER;localhost')
    send_command('PROCESS_HOST_CHECK_RESULT;localhost;0;A positive result that'
                 ' should not trigger an eventhandler')
    if get_event_output():
        failure = True
        utils.Logger.fail('Could not disable the eventcommand\n')
    else:
        utils.Logger.ok('Successfully disabled the eventcommand\n')

    send_command('ENABLE_HOST_EVENT_HANDLER;localhost')
    host_info = get_one('GET hosts\nFilter: name = localhost'
                        '\nColumns: event_handler_enabled')
    # A missing column (empty query result) counts as a failure
    if host_info.get('event_handler_enabled') != 1:
        failure = True
        utils.Logger.fail('Could not re-enable the eventcommand\n')
    else:
        utils.Logger.ok('Successfully re-enabled the eventcommand\n')

    send_command('CHANGE_HOST_EVENT_HANDLER;localhost;')
    host_info = get_one('GET hosts\nFilter: name = localhost'
                        '\nColumns: event_handler')
    # Only an explicitly empty value proves that the eventcommand is gone
    if 'event_handler' not in host_info or host_info['event_handler']:
        failure = True
        utils.Logger.fail('Could not remove eventcommand "test_event"'
                          ' assigned to host "localhost"\n')
    else:
        utils.Logger.ok('Successfully removed the eventcommand'
                        ' assigned to host "localhost"\n')

    return 1 if failure else 0
# Script entry point: open the Livestatus socket, run the test and use its
# result as the exit status. LIVESTATUS is bound at module level here because
# send_query() reads it as a global.
if __name__ == '__main__':
    try:
        with utils.LiveStatusSocket(LIVESTATUS_PATH) as LIVESTATUS:
            sys.exit(main())
    # "except ... as" is valid from Python 2.6 on; the comma form is not
    # accepted by Python 3.
    except (OSError, IOError, socket.error) as e:
        utils.Logger.error('Could not connect to Livestatus socket: {0} ({1})'
                           '\n'.format(LIVESTATUS_PATH, e))
        # A test that could not even reach Livestatus must not exit with 0
        sys.exit(1)