Refactor IDO query code

refs #5223
This commit is contained in:
Johannes Meyer 2013-12-10 14:55:33 +01:00
parent d5dc65a752
commit 02d24200bb
6 changed files with 101 additions and 71 deletions

View File

@ -5,7 +5,7 @@ import os
import sys
import time
from ido_mysql import run_query
import utils
STATE_OK = 0
@ -32,6 +32,8 @@ output=%(output)s
def main():
run_query = lambda q: utils.run_mysql_query(q, b'/usr/bin/mysql')
# We need to wait a bit first as Icinga processes a
# checkresult only if its newer than the last check
query = 'select unix_timestamp(s.last_check) as last_check ' \

View File

@ -2,7 +2,6 @@ from __future__ import unicode_literals
from datetime import datetime, timedelta
CHECK_INTERVAL = 10 # minutes; The actual interval are 5 minutes but as other
# tests might restart Icinga we need to take any
# rescheduling into account

View File

@ -0,0 +1,80 @@
from __future__ import unicode_literals
import os
import subprocess
__all__ = ['parse_statusdata', 'run_mysql_query', 'run_pgsql_query']
MYSQL_PARAMS = b"-t -D icinga -u icinga --password=icinga -e".split()
MYSQL_SEPARATOR = '|'
PGSQL_PARAMS = b"-nq -U icinga -d icinga -c".split()
PGSQL_SEPARATOR = '|'
PGSQL_ENVIRONMENT = {
b'PGPASSWORD': b'icinga'
}
def parse_statusdata(data, intelligent_cast=True):
parsed_data, data_type, type_data = {}, '', {}
for line in (l for l in data.split(os.linesep)
if l and not l.startswith('#')):
if '{' in line:
data_type = line.partition('{')[0].strip()
elif '}' in line:
parsed_data.setdefault(data_type, []).append(type_data)
else:
key, _, value = line.partition('=')
if intelligent_cast:
value = _cast_status_value(value)
type_data[key.strip()] = value
return parsed_data
def _cast_status_value(value):
try:
return int(value)
except ValueError:
try:
return float(value)
except ValueError:
return value
def run_mysql_query(query, path):
p = subprocess.Popen([path] + MYSQL_PARAMS + [query.encode('utf-8')],
stdout=subprocess.PIPE)
return _parse_mysql_result([l.decode('utf-8') for l in p.stdout.readlines()])
def _parse_mysql_result(resultset):
result, header = [], None
for line in (l for l in resultset if MYSQL_SEPARATOR in l):
columns = [c.strip() for c in line[1:-3].split(MYSQL_SEPARATOR)]
if header is None:
header = columns
else:
result.append(dict((header[i], v) for i, v in enumerate(columns)))
return result
def run_pgsql_query(query, path):
p = subprocess.Popen([path] + PGSQL_PARAMS + [query.encode('utf-8')],
stdout=subprocess.PIPE, env=PGSQL_ENVIRONMENT)
return _parse_pgsql_result([l.decode('utf-8') for l in p.stdout.readlines()])
def _parse_pgsql_result(resultset):
result, header = [], None
for line in (l for l in resultset if PGSQL_SEPARATOR in l):
columns = [c.strip() for c in line.split(PGSQL_SEPARATOR)]
if header is None:
header = columns
else:
result.append(dict((header[i], v) for i, v in enumerate(columns)))
return result

View File

@ -2,38 +2,14 @@
from __future__ import unicode_literals
import sys
import subprocess
try:
import utils
import ido_tests
IDO_TESTS_FOUND = True
except ImportError:
IDO_TESTS_FOUND = False
MYSQL = b"/usr/bin/mysql"
PARAMS = b"-t -D icinga -u icinga --password=icinga -e".split()
SEPARATOR = '|'
def run_query(query):
p = subprocess.Popen([MYSQL] + PARAMS + [query.encode('utf-8')],
stdout=subprocess.PIPE)
return parse_result([l.decode('utf-8') for l in p.stdout.readlines()])
def parse_result(resultset):
result, header = [], None
for line in (l for l in resultset if SEPARATOR in l):
columns = [c.strip() for c in line[1:-3].split(SEPARATOR)]
if header is None:
header = columns
else:
result.append(dict((header[i], v) for i, v in enumerate(columns)))
return result
def main():
run_query = lambda q: utils.run_mysql_query(q, b'/usr/bin/mysql')
if not ido_tests.validate_tables([d['Tables_in_icinga']
for d in run_query('show tables')]):
return 1
@ -87,8 +63,5 @@ def main():
if __name__ == '__main__':
if not IDO_TESTS_FOUND:
raise
sys.exit(main())

View File

@ -2,41 +2,14 @@
from __future__ import unicode_literals
import sys
import subprocess
try:
import utils
import ido_tests
IDO_TESTS_FOUND = True
except ImportError:
IDO_TESTS_FOUND = False
PSQL = b"/usr/bin/psql"
PARAMS = b"-nq -U icinga -d icinga -c".split()
SEPARATOR = '|'
ENVIRONMENT = {
b'PGPASSWORD': b'icinga'
}
def run_query(query):
p = subprocess.Popen([PSQL] + PARAMS + [query.encode('utf-8')],
stdout=subprocess.PIPE, env=ENVIRONMENT)
return parse_result([l.decode('utf-8') for l in p.stdout.readlines()])
def parse_result(resultset):
result, header = [], None
for line in (l for l in resultset if SEPARATOR in l):
columns = [c.strip() for c in line.split(SEPARATOR)]
if header is None:
header = columns
else:
result.append(dict((header[i], v) for i, v in enumerate(columns)))
return result
def main():
run_query = lambda q: utils.run_pgsql_query(q, b'/usr/bin/psql')
if not ido_tests.validate_tables([d['Name'] for d in run_query('\\dt')
if d['Type'] == 'table']):
return 1
@ -92,8 +65,5 @@ def main():
if __name__ == '__main__':
if not IDO_TESTS_FOUND:
raise
sys.exit(main())

View File

@ -10,10 +10,16 @@
"setups": {
"^ido_[a-z]{2}sql.test$": {
"setup": {
"copy": ["files/ido_tests.py >> /tmp/ido_tests.py"]
"copy": [
"files/ido_tests.py >> /tmp/ido_tests.py",
"files/utils.py >> /tmp/utils.py"
]
},
"teardown": {
"clean": ["/tmp/ido_tests.py", "/tmp/ido_tests.pyc"]
"clean": [
"/tmp/ido_tests.py*",
"/tmp/utils.py*"
]
}
},
"checkresult.test": {
@ -21,7 +27,7 @@
"copy": [
"files/configs/checkresult.conf >> /tmp/checkresult.conf",
"files/wait_for_ido.sh >> /tmp/wait_for_ido.sh",
"ido_mysql.test >> /tmp/ido_mysql.py"
"files/utils.py >> /tmp/utils.py"
],
"exec": [
"sudo mv /tmp/checkresult.conf /etc/icinga2/conf.d/",
@ -31,7 +37,7 @@
]
},
"teardown": {
"clean": ["/tmp/ido_mysql.py*"],
"clean": ["/tmp/utils.py*"],
"exec": [
"sudo rm /etc/icinga2/conf.d/checkresult.conf",
"sudo service icinga2 restart",