mirror of
				https://github.com/Icinga/icinga2.git
				synced 2025-11-04 13:45:04 +01:00 
			
		
		
		
	
		
			
				
	
	
		
			278 lines
		
	
	
		
			9.8 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
	
	
	
			
		
		
	
	
			278 lines
		
	
	
		
			9.8 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
	
	
	
#!/usr/bin/env python
 | 
						|
from __future__ import unicode_literals
 | 
						|
 | 
						|
 | 
						|
import glob
import json
import os
import re
import subprocess
import sys
import threading
from optparse import OptionParser
from xml.dom.minidom import getDOMImplementation
 | 
						|
 | 
						|
 | 
						|
# Use the interpreter's own DEVNULL marker when the subprocess module offers
# one; otherwise fall back to a writable handle on the null device that stays
# open for the lifetime of the process.
DEVNULL = getattr(subprocess, 'DEVNULL', None)
if DEVNULL is None:
    DEVNULL = open(os.devnull, 'w')
 | 
						|
 | 
						|
 | 
						|
class Logger(object):
    """Severity-filtered console logger that decorates messages with ANSI colours.

    All state lives on the class: ``VERBOSITY`` decides which severities are
    printed, ``OUTPUT_LENGTH`` caps extended debug output.  Messages are
    written verbatim (no newline is appended) and flushed immediately.
    """

    INFO = 1
    OK = 2
    FAIL = 3
    ERROR = 4
    DEBUG_STD = 5
    DEBUG_EXT = 6

    VERBOSITY = 0
    OUTPUT_LENGTH = 1024

    # severity -> (minimum verbosity, message template, write to stderr?)
    _FORMATS = {
        INFO: (1, '\033[1;94m[INFO]\033[1;0m {0}', False),
        ERROR: (1, '\033[1;33m[ERROR]\033[1;0m {0}', True),
        FAIL: (1, '\033[1;31m[FAIL] {0}\033[1;0m', False),
        OK: (1, '\033[1;32m[OK]\033[1;0m {0}', False),
        DEBUG_STD: (2, '\033[1;90m[DEBUG]\033[1;0m {0}', False),
        DEBUG_EXT: (3, '\033[1;90m[DEBUG]\033[1;0m {0}', False),
    }

    @staticmethod
    def write(text, stderr=False):
        """Write ``text`` as UTF-8 to stdout (or stderr) and flush.

        :param text: the text to emit, written as-is
        :param stderr: when true, write to ``sys.stderr`` instead of
                       ``sys.stdout``
        """
        stream = sys.stderr if stderr else sys.stdout
        binary = getattr(stream, 'buffer', None)
        if binary is not None:
            # Python 3 text stream: hand the UTF-8 bytes to the underlying
            # binary buffer so the output does not depend on the locale.
            # Flush first so earlier text-layer writes keep their order.
            stream.flush()
            binary.write(text.encode('utf-8'))
        elif str is bytes:
            # Python 2: file objects accept byte strings directly.
            stream.write(text.encode('utf-8'))
        else:
            # Python 3 stream without a binary layer (e.g. io.StringIO).
            stream.write(text)
        stream.flush()

    @classmethod
    def set_verbosity(cls, verbosity):
        """Set the verbosity level used to filter all subsequent messages."""
        cls.VERBOSITY = verbosity

    @classmethod
    def log(cls, severity, text):
        """Emit ``text`` with the decoration belonging to ``severity``.

        :param severity: one of the severity constants of this class
        :param text: the message to print
        :returns: True if the message was written, False if the severity is
                  unknown or filtered out by the current verbosity
        """
        rule = cls._FORMATS.get(severity)
        if rule is None:
            return False

        required_verbosity, template, to_stderr = rule
        if cls.VERBOSITY < required_verbosity:
            return False

        # Extended debug output is shortened unless verbosity is 4 or higher.
        if (severity == cls.DEBUG_EXT and cls.VERBOSITY < 4
                and len(text) > cls.OUTPUT_LENGTH):
            suffix = '... (Truncated to {0} bytes)\n' \
                     ''.format(cls.OUTPUT_LENGTH)
            text = text[:cls.OUTPUT_LENGTH] + suffix

        cls.write(template.format(text), to_stderr)
        return True

    @classmethod
    def info(cls, text):
        """Log ``text`` with INFO severity; see :meth:`log` for the result."""
        return cls.log(cls.INFO, text)

    @classmethod
    def error(cls, text):
        """Log ``text`` with ERROR severity (goes to stderr)."""
        return cls.log(cls.ERROR, text)

    @classmethod
    def fail(cls, text):
        """Log ``text`` with FAIL severity."""
        return cls.log(cls.FAIL, text)

    @classmethod
    def ok(cls, text):
        """Log ``text`` with OK severity."""
        return cls.log(cls.OK, text)

    @classmethod
    def debug(cls, text, extended=False):
        """Log ``text`` as debug output.

        :param extended: use the extended debug severity, which needs a
                         higher verbosity and may be truncated
        """
        return cls.log(cls.DEBUG_EXT if extended else cls.DEBUG_STD, text)
 | 
						|
 | 
						|
 | 
						|
class TestSuite(object):
    """Copies test files to a target, executes them and collects the results.

    Behaviour is driven by a JSON configuration file providing:

    * ``commands``: shell command templates ``copy`` (source, destination),
      ``exec`` (command line) and ``clean`` (path), filled via ``str.format``
    * ``settings``: must contain ``test_root``, the directory tests are
      copied into
    * ``setups`` (optional): maps a regular expression matched against the
      test's file name to ``setup``/``teardown`` instructions, each of which
      may list ``copy`` (``"source >> destination"``), ``clean`` (paths) and
      ``exec`` (commands) entries

    NOTE(review): the command templates are run with ``shell=True``; the
    configuration file is therefore assumed to be trusted.
    """

    def __init__(self, configpath):
        """Create an empty suite configured by the JSON file at ``configpath``."""
        self._tests = []
        self._results = {}

        self.load_config(configpath)

    def add_test(self, filepath):
        """Queue the test file at ``filepath`` for the next :meth:`run`."""
        self._tests.append(filepath)

    def load_config(self, filepath):
        """Load the JSON configuration from ``filepath``, replacing any previous one."""
        with open(filepath) as f:
            self._config = json.load(f)

    @staticmethod
    def _text_element(document, tag, text):
        """Return a new ``tag`` element containing ``text`` as its only child."""
        element = document.createElement(tag)
        element.appendChild(document.createTextNode(text))
        return element

    def get_report(self):
        """Return the collected results as an XML document string.

        The root element is ``testsuite``; every executed test becomes a
        ``testcase`` child carrying its totals, captured output and, for a
        non-zero exit code, a ``failure`` element.
        """
        dom = getDOMImplementation()
        document = dom.createDocument(None, 'testsuite', None)
        xml_root = document.documentElement

        # items() instead of iteritems() so this works on Python 2 and 3.
        for name, info in self._results.items():
            testresult = document.createElement('testcase')
            testresult.setAttribute('classname', 'vm')
            testresult.setAttribute('name', name)

            testresult.appendChild(
                self._text_element(document, 'tests', str(info['total'])))
            testresult.appendChild(
                self._text_element(document, 'failures', str(info['failures'])))
            testresult.appendChild(
                self._text_element(document, 'system-out', info['stdout']))
            testresult.appendChild(
                self._text_element(document, 'system-err', info['stderr']))

            if info['returncode'] != 0:
                failure = self._text_element(
                    document, 'failure',
                    'code: {0}'.format(info['returncode']))
                failure.setAttribute('type', 'returncode')
                testresult.appendChild(failure)

            xml_root.appendChild(testresult)

        return document.toxml()

    def run(self):
        """Execute every queued test and remember its result by file name.

        For each test: copy it to the test root, apply the ``setup``
        routines, run it, apply the ``teardown`` routines and remove it again.
        """
        for path in self._tests:
            test_name = os.path.basename(path)
            Logger.debug('Copying test "{0}" to remote machine\n'.format(test_name))
            self._copy_test(path)
            self._apply_setup_routines(test_name, 'setup')
            note_printed = Logger.info('Running test "{0}"...\n'.format(test_name))
            result = self._run_test(path)
            Logger.info('Test "{0}" has finished (Total tests: {1}, Failures: {2})\n'
                        ''.format(test_name, result['total'], result['failures']))
            self._apply_setup_routines(test_name, 'teardown')
            Logger.debug('Removing test "{0}" from remote machine\n'.format(test_name))
            self._remove_test(test_name)
            self._results[test_name] = result
            # Separate the output of consecutive tests, but only if anything
            # was printed for this one.
            if note_printed:
                Logger.write('\n')

    def _apply_setup_routines(self, test_name, context):
        """Apply the ``context`` ('setup' or 'teardown') routines of a test.

        Only the first ``setups`` entry whose pattern matches ``test_name``
        is considered; nothing happens if it defines no such context.
        """
        # items() instead of iteritems() so this works on Python 2 and 3.
        instructions = next((routines.get(context)
                             for pattern, routines
                             in self._config.get('setups', {}).items()
                             if re.match(pattern, test_name)), None)
        if instructions is None:
            return

        note_printed = Logger.info('Applying {0} routines for test "{1}" .. '
                                   ''.format(context, test_name))
        for instruction in instructions.get('copy', []):
            source, _, destination = instruction.partition('>>')
            self._copy_file(source.strip(), destination.strip())
        for filepath in instructions.get('clean', []):
            self._remove_file(filepath)
        for command in instructions.get('exec', []):
            self._exec_command(command)
        # Completes the " .. " line started above.
        if note_printed:
            Logger.write('Done\n')

    def _remove_file(self, path):
        """Run the configured ``clean`` command for ``path``; log an error on failure."""
        command = self._config['commands']['clean'].format(path)
        rc = subprocess.call(command, stdout=DEVNULL, shell=True)
        if rc != 0:
            Logger.error('Cannot remove file "{0}" ({1})\n'.format(path, rc))

    def _exec_command(self, command):
        """Run ``command`` through the configured ``exec`` template; log an error on failure."""
        command = self._config['commands']['exec'].format(command)
        rc = subprocess.call(command, stdout=DEVNULL, shell=True)
        if rc != 0:
            Logger.error('Command "{0}" exited with exit code "{1}"\n'
                         ''.format(command, rc))

    def _copy_file(self, source, destination):
        """Run the configured ``copy`` command; log an error on failure."""
        command = self._config['commands']['copy'].format(source, destination)
        rc = subprocess.call(command, stdout=DEVNULL, shell=True)
        if rc != 0:
            Logger.error('Cannot copy file "{0}" to "{1}" ({2})\n'
                         ''.format(source, destination, rc))

    def _copy_test(self, path):
        """Copy the test file at ``path`` into the configured test root."""
        self._copy_file(path, os.path.join(self._config['settings']['test_root'],
                                           os.path.basename(path)))

    def _remove_test(self, test_name):
        """Remove the test named ``test_name`` from the configured test root."""
        test_root = self._config['settings']['test_root']
        self._remove_file(os.path.join(test_root, test_name))

    def _run_test(self, path):
        """Execute the copied test and return its result.

        :returns: a dict with the keys ``total``, ``failures``, ``stdout``,
                  ``stderr`` and ``returncode``
        """
        command = self._config['commands']['exec']
        target = os.path.join(self._config['settings']['test_root'],
                              os.path.basename(path))
        options = ['--verbosity={0}'.format(Logger.VERBOSITY)]
        process = subprocess.Popen(command.format(' '.join([target] + options)),
                                   stdout=subprocess.PIPE,
                                   stderr=subprocess.PIPE,
                                   shell=True)

        # Drain stderr concurrently: reading stdout to EOF first would
        # deadlock as soon as the child fills the stderr pipe buffer.
        stderr_chunks = []
        drain = threading.Thread(
            target=lambda: stderr_chunks.append(process.stderr.read()))
        drain.daemon = True
        drain.start()

        try:
            output, test_count, failed_tests = self._watch_output(process.stdout)
            drain.join()
            returncode = process.wait()
        finally:
            process.stdout.close()

        process.stderr.close()
        return {
            'total': test_count,
            'failures': failed_tests,
            'stdout': output,
            'stderr': b''.join(stderr_chunks).decode('utf-8', 'replace'),
            'returncode': returncode
            }

    def _watch_output(self, pipe):
        """Relay a test's output to the logger while counting its results.

        Each line may start with NUL bytes; their number is treated as the
        line's verbosity level and they are stripped.  ``[OK]`` and ``[FAIL]``
        lines count as one test each.

        :param pipe: binary file object to read lines from until EOF
        :returns: tuple ``(output, total, failures)`` where ``output`` is the
                  text of all lines without their NUL prefixes
        """
        lines, total, failures = [], 0, 0
        while True:
            raw_line = pipe.readline()
            if not raw_line:
                break

            # Undecodable bytes are replaced rather than aborting the run.
            line = raw_line.decode('utf-8', 'replace')

            # Only the NUL prefix encodes the level; NULs elsewhere in the
            # line must not shift the slice.
            stripped = line.lstrip('\x00')
            verbosity_level = len(line) - len(stripped)
            line = stripped

            if line.startswith('[ERROR] '):
                Logger.error(line[8:])
            elif line.startswith('[DEBUG] '):
                Logger.debug(line[8:], verbosity_level == 4)
            elif line.startswith('[FAIL] '):
                Logger.fail(line[7:])
                failures += 1
                total += 1
            elif line.startswith('[OK] '):
                Logger.ok(line[5:])
                total += 1
            else:
                Logger.info(line.replace('[INFO] ', ''))

            lines.append(line)
        return (''.join(lines), total, failures)
 | 
						|
 | 
						|
 | 
						|
def parse_commandline():
    """Parse the process arguments.

    :returns: the ``(options, arguments)`` pair produced by optparse, where
              ``options`` carries ``config``, ``results``, ``verbose`` and
              ``quiet`` and ``arguments`` holds the positional arguments
    """
    option_specs = (
        (('-C', '--config'),
         dict(default="run_tests.conf",
              help='The path to the config file to use [%default]')),
        (('-R', '--results'),
         dict(help='The file where to store the test results')),
        (('-v', '--verbose'),
         dict(action='count', default=1,
              help='Be more verbose (Maximum output: -vvv)')),
        (('-q', '--quiet'),
         dict(action='count', default=0,
              help='Be less verbose')),
    )

    cli = OptionParser(version='0.5')
    for flags, settings in option_specs:
        cli.add_option(*flags, **settings)
    return cli.parse_args()
 | 
						|
 | 
						|
 | 
						|
def main():
    """Run all tests named on the command line and optionally store a report.

    Every positional argument is expanded as a glob pattern; each match is
    added to the suite.  If ``--results`` was given, the XML report is
    written to that file as UTF-8.

    :returns: the process exit code, always 0
    """
    options, arguments = parse_commandline()
    suite = TestSuite(options.config)

    for pattern in arguments:
        for path in glob.glob(pattern):
            suite.add_test(path)

    Logger.set_verbosity(options.verbose - options.quiet)
    suite.run()

    if options.results is not None:
        # Binary mode: the report is encoded explicitly, and a text-mode file
        # rejects bytes on Python 3.
        with open(options.results, 'wb') as f:
            f.write(suite.get_report().encode('utf-8'))

    return 0
 | 
						|
 | 
						|
 | 
						|
# Script entry point: hand main()'s return value to the shell as exit status.
if __name__ == '__main__':
    exit_status = main()
    sys.exit(exit_status)
 | 
						|
 |