monitoring-check-systemd-se.../check-systemd-service

448 lines
16 KiB
Python
Executable File

#!/usr/bin/python3
# -*- coding: utf-8 -*-
"""Nagios plugin to check a systemd service on different properties"""
import argparse
import logging
import collections
import re
try:
import nagiosplugin
except ImportError as e:
print("Please install python3-nagiosplugin")
raise e
try:
from gi.repository.Gio import DBusProxy, BusType
except ImportError as e:
print("Please install python3-gi")
raise e
_log = logging.getLogger('nagiosplugin')
class Systemd:
    """Shared, lazily initialised access to the systemd manager over D-Bus.

    The manager proxy and the unit listing are each fetched at most once and
    then cached on the class, so all callers share the same connection and
    the same snapshot of units.
    """

    # Cached proxy for org.freedesktop.systemd1.Manager; None until first use.
    __dbus = None
    # Cached result of Manager.ListUnits(); None until first use.
    __all_units = None

    @classmethod
    def dbus(cls):
        """Return the cached systemd manager proxy, connecting on first call."""
        if Systemd.__dbus is None:
            _log.debug('Connecting to systemd DBUS')
            Systemd.__dbus = DBusProxy.new_for_bus_sync(
                BusType.SYSTEM,
                0,
                None,
                'org.freedesktop.systemd1',
                '/org/freedesktop/systemd1',
                'org.freedesktop.systemd1.Manager',
                None)
        return Systemd.__dbus

    @classmethod
    def all_units(cls, filter=None):
        """Return the set of unit names known to systemd.

        filter -- optional regular expression (string or compiled pattern);
                  when given, only names for which ``search`` finds a match
                  are returned.

        The listing is requested once and cached.  The cache is compared
        against None so that an empty listing is also remembered instead of
        being requested again on every call.
        """
        if Systemd.__all_units is None:
            _log.debug('Listing all units')
            Systemd.__all_units = Systemd.dbus().ListUnits()
        # The first field of every ListUnits entry is the unit name.
        names = (unit[0] for unit in Systemd.__all_units)
        if filter is None:
            return set(names)
        # Compile once instead of handing the raw pattern to re.search per unit.
        matches = re.compile(filter).search
        return {name for name in names if matches(name)}
class Systemd_Service_State(object):
    """State of a single systemd unit, read through its D-Bus Unit interface.

    Properties are read from the proxy's cached properties on first access
    and memoised per instance, so one instance reflects one snapshot.
    """

    def connect_systemd(self, unit):
        """Load *unit* through the manager and open a proxy on its object.

        Any exception raised by LoadUnit is logged and re-raised unchanged.
        """
        try:
            loadedUnit = Systemd.dbus().LoadUnit('(s)', unit)
        except Exception as e:
            _log.error(e)
            raise
        dbus_service = DBusProxy.new_for_bus_sync(
            BusType.SYSTEM,
            0,
            None,
            'org.freedesktop.systemd1',
            loadedUnit,
            'org.freedesktop.systemd1.Unit',
            None)
        self.__dbus_unit = dbus_service

    def __init__(self, unit_name):
        """Connect to the unit called *unit_name* and start with no cached properties."""
        self.connect_systemd(unit_name)
        self.__prop = {}

    def __get_prop(self, name):
        """Return the unit property *name*, reading it from D-Bus only once."""
        if name not in self.__prop:
            self.__prop[name] = self.__dbus_unit.get_cached_property(name).unpack()
        return self.__prop[name]

    @property
    def id(self):
        """The unit's ``Id`` property."""
        return self.__get_prop('Id')

    @property
    def activestate(self):
        """
        ActiveState contains a state value that reflects whether the unit is
        currently active or not. The following states are currently defined:
        active, reloading, inactive, failed, activating, deactivating. active
        indicates that unit is active (obviously...). reloading indicates
        that the unit is active and currently reloading its configuration.
        inactive indicates that it is inactive and the previous run was
        successful or no previous run has taken place yet. failed indicates
        that it is inactive and the previous run was not successful (more
        information about the reason for this is available on the unit type
        specific interfaces, for example for services in the Result property,
        see below). activating indicates that the unit has previously been
        inactive but is currently in the process of entering an active state.
        Conversely deactivating indicates that the unit is currently in the
        process of deactivation.
        """
        return self.__get_prop('ActiveState')

    @property
    def substate(self):
        """
        SubState encodes states of the same state machine that ActiveState
        covers, but knows more fine-grained states that are unit-type-specific.
        Where ActiveState only covers six high-level states, SubState covers
        possibly many more low-level unit-type-specific states that are mapped
        to the six high-level states. Note that multiple low-level states might
        map to the same high-level state, but not vice versa. Not all
        high-level states have low-level counterparts on all unit types. At
        this point the low-level states are not documented here, and are more
        likely to be extended later on than the common high-level states
        explained above.
        """
        return self.__get_prop('SubState')

    @property
    def loadstate(self):
        """
        LoadState of the unit.
        """
        return self.__get_prop('LoadState')

    # Numeric codes returned by ``value``; trailing comments give the
    # load state / active state / sub state combination each code stands for.
    NOT_LOADED = -3  # !loaded/inactive
    NOT_LOADED_ERROR = -1  # !loaded/*
    FAILED = 0  # loaded/failed
    ACTIVE = 1  # loaded/active
    INACTIVE_DEAD = 2  # loaded/inactive/dead
    INACTIVE_OTHER = -2  # loaded/inactive/*
    CHANGING = 3  # loaded/{reloading|activating|deactivating}
    UNKNOWN = -5  # *

    @property
    def value(self):
        """
        Value for metric/performance: one of the numeric codes defined above.
        """
        # NOTE(review): NOT_LOADED_ERROR is never returned here -- every unit
        # that is not loaded maps to NOT_LOADED regardless of its ActiveState.
        # Confirm whether that is intended.
        if self.loadstate != "loaded":
            return Systemd_Service_State.NOT_LOADED
        ast = self.activestate
        if ast == "failed":
            return Systemd_Service_State.FAILED
        if ast == "active":
            return Systemd_Service_State.ACTIVE
        if ast == "inactive":
            if self.substate == "dead":
                return Systemd_Service_State.INACTIVE_DEAD
            return Systemd_Service_State.INACTIVE_OTHER
        if ast in ['activating', 'deactivating', 'reloading']:
            return Systemd_Service_State.CHANGING
        return Systemd_Service_State.UNKNOWN

    def str_state(self, metric, context):
        """Return a one-line description: id, load state, active state, sub state.

        The two halves are joined with "and" when *context* rates *metric* as
        Ok and with "but" otherwise.
        """
        word = "but"
        if context.nagios_result(metric) == nagiosplugin.Ok:
            word = "and"
        return "{} {} {} {}({})".format(
            self.id, self.loadstate, word, self.activestate, self.substate)

    def range(self, metric, context, state_res):
        """Return the threshold to report for the result class *state_res*.

        Returns the current value when *context* rates *metric* as
        *state_res*, the current value plus one when it rates it as Ok, and
        None for any other rating.
        """
        res = context.nagios_result(metric)
        if res == state_res:
            return self.value
        elif res == nagiosplugin.Ok:
            return self.value + 1
        return None

    def warning(self, metric, context):
        """Threshold reported as the warning range (see ``range``)."""
        return self.range(metric, context, nagiosplugin.Warn)

    def critical(self, metric, context):
        """Threshold reported as the critical range (see ``range``)."""
        return self.range(metric, context, nagiosplugin.Critical)
class Systemd_Service(nagiosplugin.Resource):
    """One Systemd Service"""

    def __init__(self, **kwords):
        """Store every keyword argument as an attribute of the same name.

        ``normalize``, ``name`` and ``probe`` read ``self.unit``, so callers
        are expected to pass ``unit=...``.
        """
        for key, value in kwords.items():
            setattr(self, key, value)

    def normalize(self):
        """Append '.service' to ``self.unit`` unless it already contains a dot."""
        if '.' in self.unit:
            _log.debug("Found '.' in ServiceName %r, so assuming you know what youre asking for", self.unit)
        else:
            self.unit = self.unit + '.service'
        _log.debug('Normalized unitname to check for %r', self.unit)

    def connect_systemd(self):
        """Load ``self.unit`` and store a proxy for it in ``self.service``.

        Any exception raised by LoadUnit is logged and re-raised unchanged.
        Nothing in this class calls this method; ``probe`` builds a
        Systemd_Service_State instead.
        """
        try:
            loadedUnit = Systemd.dbus().LoadUnit('(s)', self.unit)
        except Exception as e:
            _log.error(e)
            raise
        service = DBusProxy.new_for_bus_sync(
            BusType.SYSTEM,
            0,
            None,
            'org.freedesktop.systemd1',
            loadedUnit,
            'org.freedesktop.systemd1.Unit',
            None)
        self.service = service

    @property
    def name(self):
        """formatting the Testname (will be formatted as uppercase letters)"""
        suffix = '.service'
        unit = self.unit
        # Strip only a trailing '.service'.  Splitting on the substring would
        # also cut names such as 'my.services.target' down to 'my'.
        if unit.endswith(suffix):
            unit = unit[:-len(suffix)]
        return "SYSTEMD SERVICE %s" % unit

    def probe(self):
        """ Create check metric for Systemd Service"""
        self.normalize()
        state = Systemd_Service_State(self.unit)
        yield Service_Metric(self.unit, state, context='service_state_explicit')
class Systemd_Services(nagiosplugin.Resource):
    """Several Systemd Services"""

    def __init__(self, **kwords):
        """Store every keyword argument as an attribute of the same name.

        ``services_to_check`` reads ``self.filter``, so callers are expected
        to pass ``filter=...``.
        """
        for key, value in kwords.items():
            setattr(self, key, value)

    @property
    def name(self):
        """formatting the Testname (will be formatted as uppercase letters)"""
        return "SYSTEMD SERVICES"

    def services_to_check(self):
        """List of systemd services to check. By default, all presents.

        The names come from a set, so they are already unique.  They are
        sorted so that metrics are emitted in the same order on every run
        instead of following the set's iteration order.
        """
        return sorted(Systemd.all_units(filter=self.filter))

    def probe(self):
        """ Create check metric for Systemd Services.

        Yields a "checked" metric with the number of units, then one
        Service_Metric per unit, then one counter metric for each of
        'loaded', 'masked', 'not-found' and 'active'.

        Raises nagiosplugin.CheckError when a unit reports a LoadState that
        is not one of the counter keys.
        """
        services = self.services_to_check()
        services_stat = {
            'loaded': 0,
            'masked': 0,
            'not-found': 0,
            'active': 0,
        }
        yield nagiosplugin.Metric("checked", len(services))
        for unit in services:
            state = Systemd_Service_State(unit)
            loadstate = state.loadstate
            if loadstate not in services_stat:
                raise nagiosplugin.CheckError(
                    "unknown LoadState '{}' for unit '{}'".format(
                        loadstate, unit))
            services_stat[loadstate] += 1
            # 'active' is counted in addition to 'loaded', not instead of it.
            if loadstate == 'loaded' and state.activestate == 'active':
                services_stat['active'] += 1
            yield Service_Metric(unit, state, context='service_state_auto')
        for kind, count in services_stat.items():
            yield nagiosplugin.Metric(kind, count)
class Service_Metric(nagiosplugin.Metric):
    """Metric whose value is taken from a Systemd_Service_State object."""

    def __init__(self, name, value, **kwords):
        """Keep *value* (the state object) on the instance.

        *name* and the keyword arguments are accepted but not used in this
        method; the parent initialiser is called without arguments.
        """
        self.__service_state = value
        super().__init__()

    def replace(self, **attr):
        """Return the parent's replaced copy with the state object carried over.

        NOTE(review): presumably the parent builds the copy without running
        ``__init__``, which is why the state is re-attached here -- confirm
        against nagiosplugin.
        """
        clone = super().replace(**attr)
        clone.__service_state = self.service_state
        return clone

    @property
    def value(self):
        """Numeric state code of the wrapped service state."""
        return self.service_state.value

    @property
    def service_state(self):
        """The state object passed to the constructor."""
        return self.__service_state
class Systemd_Context(nagiosplugin.ScalarContext):
    """Scalar context for metrics that do not describe a single unit."""

    @property
    def is_service(self):
        """Always False; Service_Context overrides this with True."""
        return False
class Service_Context(Systemd_Context):
    """Abstract class

    Subclasses provide ``resultD``, a mapping from the numeric state code of
    a service state to a nagiosplugin result state.
    """

    @property
    def is_service(self):
        """Always True: metrics in this context describe one unit."""
        return True

    def nagios_result(self, metric):
        """Look up the result state for *metric* in the subclass's ``resultD``.

        Possible values are nagiosplugin.Ok, nagiosplugin.Warn,
        nagiosplugin.Critical and nagiosplugin.Unknown.
        """
        code = metric.service_state.value
        return type(self).resultD[code]

    def evaluate(self, metric, resource):
        """Wrap the looked-up result state in ``self.result_cls``."""
        return self.result_cls(self.nagios_result(metric), metric=metric)

    def performance(self, metric, resource):
        """Build the performance data; thresholds come from the service state."""
        state = metric.service_state
        return nagiosplugin.Performance(
            metric.name,
            metric.value,
            metric.uom,
            state.warning(metric, self),
            state.critical(metric, self),
            metric.min,
            metric.max,
        )
class Service_Context_Auto(Service_Context):
    """Rating used for the 'service_state_auto' context.

    Units that are not loaded or are inactive/dead are rated Ok here.  Any
    state code missing from the table is rated Unknown.
    """

    resultD = collections.defaultdict(
        lambda: nagiosplugin.Unknown,
        {
            # Ok
            Systemd_Service_State.ACTIVE: nagiosplugin.Ok,
            Systemd_Service_State.NOT_LOADED: nagiosplugin.Ok,
            Systemd_Service_State.INACTIVE_DEAD: nagiosplugin.Ok,
            # Warn
            Systemd_Service_State.CHANGING: nagiosplugin.Warn,
            # Critical
            Systemd_Service_State.FAILED: nagiosplugin.Critical,
            Systemd_Service_State.NOT_LOADED_ERROR: nagiosplugin.Critical,
            Systemd_Service_State.INACTIVE_OTHER: nagiosplugin.Critical,
        },
    )
class Service_Context_Explicit(Service_Context):
    """Rating used for the 'service_state_explicit' context.

    Only an active unit is rated Ok; a changing unit is rated Warn and every
    other listed state Critical.  Any state code missing from the table is
    rated Unknown.
    """

    resultD = collections.defaultdict(
        lambda: nagiosplugin.Unknown,
        {
            # Ok
            Systemd_Service_State.ACTIVE: nagiosplugin.Ok,
            # Warn
            Systemd_Service_State.CHANGING: nagiosplugin.Warn,
            # Critical
            Systemd_Service_State.FAILED: nagiosplugin.Critical,
            Systemd_Service_State.NOT_LOADED: nagiosplugin.Critical,
            Systemd_Service_State.NOT_LOADED_ERROR: nagiosplugin.Critical,
            Systemd_Service_State.INACTIVE_DEAD: nagiosplugin.Critical,
            Systemd_Service_State.INACTIVE_OTHER: nagiosplugin.Critical,
        },
    )
class Services_Summary(nagiosplugin.Summary):
    """Status line for the check that covers many units."""

    def get_stats(self, results):
        """Aggregate *results* into one dictionary.

        Results whose context is a service context are counted per state
        text ('ok', 'warning', 'critical', 'unknown') and in total ('all').
        Every other result contributes its metric value under its metric
        name.

        Raises nagiosplugin.CheckError for a state text outside those four.
        """
        stats = dict.fromkeys(('ok', 'warning', 'critical', 'unknown'), 0)
        counters = {}
        counted = 0
        for result in results:
            if not result.context.is_service:
                counters[result.metric.name] = result.metric.value
                continue
            label = result.state.text
            if label not in stats:
                raise nagiosplugin.CheckError(
                    "invalid state '{}' in results".format(label))
            stats[label] += 1
            counted += 1
        stats['all'] = counted
        # Applied last, so a counter metric with the same key wins.
        stats.update(counters)
        return stats

    def ok(self, results):
        """Status line when every result is ok.

        A lone result is printed as is; otherwise the unit counts are shown.
        """
        if len(results) == 1:
            return '{}'.format(results[0])
        stats = self.get_stats(results)
        inactive = stats['loaded'] - stats['active']
        return "{0} units ok ({1} actives, {2} inactives, {3} masked, {4} not-found)".format(
            stats['ok'], stats['active'], inactive, stats['masked'], stats['not-found'])

    def problem(self, results):
        """Status line when at least one result is not ok.

        Shows the first significant result when it is the only one in its
        state, otherwise the number of units sharing that state.
        """
        stats = self.get_stats(results)
        worst = results.first_significant
        label = worst.state.text
        if stats[label] != 1:
            return "{0} {1} units".format(stats[label], label)
        return '{}'.format(worst)
@nagiosplugin.guarded
def main():
    """Parse the command line and run the matching check.

    With exactly one unit argument that unit is checked explicitly; with no
    unit argument all units matching --filter are checked.

    Raises nagiosplugin.CheckError when more than one unit is given, instead
    of silently running no check at all and exiting successfully.
    """
    argp = argparse.ArgumentParser(
        description=__doc__,
        formatter_class=argparse.RawTextHelpFormatter,
    )
    argp.add_argument('units', help='Check this Unit', nargs='*')
    argp.add_argument('-v', '--verbose', action='count', default=0,
                      help='increase output verbosity (use up to 3 times)')
    argp.add_argument('-t', '--timeout', default=10,
                      help='abort execution after TIMEOUT seconds')
    # Raw string: '\.' is not a valid escape in a normal string literal.
    argp.add_argument('-f', '--filter', default=r'^.*\.service$',
                      help='regexp for filtering systemd units')
    args = argp.parse_args()

    def fmt_state(metric, context):
        # Shared metric formatter for both service contexts.
        return metric.service_state.str_state(metric, context)

    if len(args.units) > 1:
        raise nagiosplugin.CheckError(
            "only one unit can be checked at a time, got {}: {}".format(
                len(args.units), ' '.join(args.units)))

    if len(args.units) == 1:
        check = nagiosplugin.Check(
            Systemd_Service(unit=args.units[0], **vars(args)),
            Service_Context_Explicit('service_state_explicit',
                                     fmt_metric=fmt_state),
        )
    else:
        check = nagiosplugin.Check(
            Systemd_Services(**vars(args)),
            Service_Context_Auto('service_state_auto',
                                 fmt_metric=fmt_state),
            Service_Context_Explicit('service_state_explicit',
                                     fmt_metric=fmt_state),
            Systemd_Context('checked'),
            Systemd_Context('masked'),
            Systemd_Context('loaded'),
            Systemd_Context('active'),
            Systemd_Context('not-found'),
            Services_Summary(),
        )
    check.main(args.verbose, args.timeout)


if __name__ == '__main__':
    main()