mirror of https://github.com/Icinga/icinga2.git
278 lines
9.8 KiB
Python
Executable File
278 lines
9.8 KiB
Python
Executable File
#!/usr/bin/env python
|
|
from __future__ import unicode_literals
|
|
|
|
|
|
import os
|
|
import re
|
|
import sys
|
|
import json
|
|
import glob
|
|
import subprocess
|
|
from optparse import OptionParser
|
|
from xml.dom.minidom import getDOMImplementation
|
|
|
|
|
|
try:
|
|
from subprocess import DEVNULL
|
|
except ImportError:
|
|
DEVNULL = open(os.devnull, 'w')
|
|
|
|
|
|
class Logger(object):
|
|
INFO = 1
|
|
OK = 2
|
|
FAIL = 3
|
|
ERROR = 4
|
|
DEBUG_STD = 5
|
|
DEBUG_EXT = 6
|
|
|
|
VERBOSITY = 0
|
|
OUTPUT_LENGTH = 1024
|
|
|
|
@staticmethod
|
|
def write(text, stderr=False):
|
|
if stderr:
|
|
sys.stderr.write(text.encode('utf-8'))
|
|
sys.stderr.flush()
|
|
else:
|
|
sys.stdout.write(text.encode('utf-8'))
|
|
sys.stdout.flush()
|
|
|
|
@classmethod
|
|
def set_verbosity(cls, verbosity):
|
|
cls.VERBOSITY = verbosity
|
|
|
|
@classmethod
|
|
def log(cls, severity, text):
|
|
if severity == cls.INFO and cls.VERBOSITY >= 1:
|
|
cls.write('\033[1;94m[INFO]\033[1;0m {0}'.format(text))
|
|
elif severity == cls.ERROR and cls.VERBOSITY >= 1:
|
|
cls.write('\033[1;33m[ERROR]\033[1;0m {0}'.format(text), True)
|
|
elif severity == cls.FAIL and cls.VERBOSITY >= 1:
|
|
cls.write('\033[1;31m[FAIL] {0}\033[1;0m'.format(text))
|
|
elif severity == cls.OK and cls.VERBOSITY >= 1:
|
|
cls.write('\033[1;32m[OK]\033[1;0m {0}'.format(text))
|
|
elif severity == cls.DEBUG_STD and cls.VERBOSITY >= 2:
|
|
cls.write('\033[1;90m[DEBUG]\033[1;0m {0}'.format(text))
|
|
elif severity == cls.DEBUG_EXT and cls.VERBOSITY >= 3:
|
|
if cls.VERBOSITY < 4 and len(text) > cls.OUTPUT_LENGTH:
|
|
suffix = '... (Truncated to {0} bytes)\n' \
|
|
''.format(cls.OUTPUT_LENGTH)
|
|
text = text[:cls.OUTPUT_LENGTH] + suffix
|
|
cls.write('\033[1;90m[DEBUG]\033[1;0m {0}'.format(text))
|
|
else:
|
|
return False
|
|
return True
|
|
|
|
@classmethod
|
|
def info(cls, text):
|
|
return cls.log(cls.INFO, text)
|
|
|
|
@classmethod
|
|
def error(cls, text):
|
|
return cls.log(cls.ERROR, text)
|
|
|
|
@classmethod
|
|
def fail(cls, text):
|
|
return cls.log(cls.FAIL, text)
|
|
|
|
@classmethod
|
|
def ok(cls, text):
|
|
return cls.log(cls.OK, text)
|
|
|
|
@classmethod
|
|
def debug(cls, text, extended=False):
|
|
return cls.log(cls.DEBUG_EXT if extended else cls.DEBUG_STD, text)
|
|
|
|
|
|
class TestSuite(object):
|
|
def __init__(self, configpath):
|
|
self._tests = []
|
|
self._results = {}
|
|
|
|
self.load_config(configpath)
|
|
|
|
def add_test(self, filepath):
|
|
self._tests.append(filepath)
|
|
|
|
def load_config(self, filepath):
|
|
with open(filepath) as f:
|
|
self._config = json.load(f)
|
|
|
|
def get_report(self):
|
|
dom = getDOMImplementation()
|
|
document = dom.createDocument(None, 'testsuite', None)
|
|
xml_root = document.documentElement
|
|
|
|
for name, info in self._results.iteritems():
|
|
testresult = document.createElement('testcase')
|
|
testresult.setAttribute('classname', 'vm')
|
|
testresult.setAttribute('name', name)
|
|
|
|
totaltests = document.createElement('tests')
|
|
totaltests.appendChild(document.createTextNode(str(info['total'])))
|
|
testresult.appendChild(totaltests)
|
|
|
|
failedtests = document.createElement('failures')
|
|
failedtests.appendChild(document.createTextNode(str(info['failures'])))
|
|
testresult.appendChild(failedtests)
|
|
|
|
systemout = document.createElement('system-out')
|
|
systemout.appendChild(document.createTextNode(info['stdout']))
|
|
testresult.appendChild(systemout)
|
|
|
|
systemerr = document.createElement('system-err')
|
|
systemerr.appendChild(document.createTextNode(info['stderr']))
|
|
testresult.appendChild(systemerr)
|
|
|
|
if info['returncode'] != 0:
|
|
failure = document.createElement('failure')
|
|
failure.setAttribute('type', 'returncode')
|
|
failure.appendChild(document.createTextNode(
|
|
'code: {0}'.format(info['returncode'])))
|
|
testresult.appendChild(failure)
|
|
|
|
xml_root.appendChild(testresult)
|
|
|
|
return document.toxml()
|
|
|
|
def run(self):
|
|
for path in self._tests:
|
|
test_name = os.path.basename(path)
|
|
Logger.debug('Copying test "{0}" to remote machine\n'.format(test_name))
|
|
self._copy_test(path)
|
|
self._apply_setup_routines(test_name, 'setup')
|
|
note_printed = Logger.info('Running test "{0}"...\n'.format(test_name))
|
|
result = self._run_test(path)
|
|
Logger.info('Test "{0}" has finished (Total tests: {1}, Failures: {2})\n'
|
|
''.format(test_name, result['total'], result['failures']))
|
|
self._apply_setup_routines(test_name, 'teardown')
|
|
Logger.debug('Removing test "{0}" from remote machine\n'.format(test_name))
|
|
self._remove_test(test_name)
|
|
self._results[test_name] = result
|
|
if note_printed:
|
|
Logger.write('\n')
|
|
|
|
def _apply_setup_routines(self, test_name, context):
|
|
instructions = next((t[1].get(context)
|
|
for t in self._config.get('setups', {}).iteritems()
|
|
if re.match(t[0], test_name)), None)
|
|
if instructions is not None:
|
|
note_printed = Logger.info('Applying {0} routines for test "{1}" .. '
|
|
''.format(context, test_name))
|
|
for instruction in instructions.get('copy', []):
|
|
source, _, destination = instruction.partition('>>')
|
|
self._copy_file(source.strip(), destination.strip())
|
|
for filepath in instructions.get('clean', []):
|
|
self._remove_file(filepath)
|
|
for command in instructions.get('exec', []):
|
|
self._exec_command(command)
|
|
if note_printed:
|
|
Logger.write('Done\n')
|
|
|
|
def _remove_file(self, path):
|
|
command = self._config['commands']['clean'].format(path)
|
|
rc = subprocess.call(command, stdout=DEVNULL, shell=True)
|
|
if rc != 0:
|
|
Logger.error('Cannot remove file "{0}" ({1})\n'.format(path, rc))
|
|
|
|
def _exec_command(self, command):
|
|
command = self._config['commands']['exec'].format(command)
|
|
rc = subprocess.call(command, stdout=DEVNULL, shell=True)
|
|
if rc != 0:
|
|
Logger.error('Command "{0}" exited with exit code "{1}"\n' \
|
|
''.format(command, rc))
|
|
|
|
def _copy_file(self, source, destination):
|
|
command = self._config['commands']['copy'].format(source, destination)
|
|
rc = subprocess.call(command, stdout=DEVNULL, shell=True)
|
|
if rc != 0:
|
|
Logger.error('Cannot copy file "{0}" to "{1}" ({2})\n' \
|
|
''.format(source, destination, rc))
|
|
|
|
def _copy_test(self, path):
|
|
self._copy_file(path, os.path.join(self._config['settings']['test_root'],
|
|
os.path.basename(path)))
|
|
|
|
def _remove_test(self, test_name):
|
|
test_root = self._config['settings']['test_root']
|
|
self._remove_file(os.path.join(test_root, test_name))
|
|
|
|
def _run_test(self, path):
|
|
command = self._config['commands']['exec']
|
|
target = os.path.join(self._config['settings']['test_root'],
|
|
os.path.basename(path))
|
|
options = ['--verbosity={0}'.format(Logger.VERBOSITY)]
|
|
p = subprocess.Popen(command.format(' '.join([target] + options)),
|
|
stdout=subprocess.PIPE, stderr=subprocess.PIPE,
|
|
shell=True)
|
|
output, test_count, failed_tests = self._watch_output(p.stdout)
|
|
return {
|
|
'total': test_count,
|
|
'failures': failed_tests,
|
|
'stdout': output,
|
|
'stderr': p.stderr.read().decode('utf-8'),
|
|
'returncode': p.wait()
|
|
}
|
|
|
|
def _watch_output(self, pipe):
|
|
output, total, failures = '', 0, 0
|
|
while True:
|
|
line = pipe.readline().decode('utf-8')
|
|
if not line:
|
|
break
|
|
|
|
verbosity_level = line.count('\x00')
|
|
line = line[verbosity_level:]
|
|
if line.startswith('[ERROR] '):
|
|
Logger.error(line[8:])
|
|
elif line.startswith('[DEBUG] '):
|
|
Logger.debug(line[8:], verbosity_level == 4)
|
|
elif line.startswith('[FAIL] '):
|
|
Logger.fail(line[7:])
|
|
failures += 1
|
|
total += 1
|
|
elif line.startswith('[OK] '):
|
|
Logger.ok(line[5:])
|
|
total += 1
|
|
else:
|
|
Logger.info(line.replace('[INFO] ', ''))
|
|
|
|
output += line
|
|
return (output, total, failures)
|
|
|
|
|
|
def parse_commandline():
|
|
parser = OptionParser(version='0.5')
|
|
parser.add_option('-C', '--config', default="run_tests.conf",
|
|
help='The path to the config file to use [%default]')
|
|
parser.add_option('-R', '--results',
|
|
help='The file where to store the test results')
|
|
parser.add_option('-v', '--verbose', action='count', default=1,
|
|
help='Be more verbose (Maximum output: -vvv)')
|
|
parser.add_option('-q', '--quiet', action='count', default=0,
|
|
help='Be less verbose')
|
|
return parser.parse_args()
|
|
|
|
|
|
def main():
|
|
options, arguments = parse_commandline()
|
|
suite = TestSuite(options.config)
|
|
|
|
for path in (p for a in arguments for p in glob.glob(a)):
|
|
suite.add_test(path)
|
|
|
|
Logger.set_verbosity(options.verbose - options.quiet)
|
|
suite.run()
|
|
|
|
if options.results is not None:
|
|
with open(options.results, 'w') as f:
|
|
f.write(suite.get_report().encode('utf-8'))
|
|
|
|
return 0
|
|
|
|
|
|
if __name__ == '__main__':
|
|
sys.exit(main())
|
|
|