#!/usr/bin/python3
# -*- coding: utf-8 -*-
"""Nagios plugin to check a systemd service on different properties"""

import argparse
import logging
import collections
import re

try:
    import nagiosplugin
except ImportError:
    print("Please install python3-nagiosplugin")
    raise

try:
    from gi.repository.Gio import DBusProxy, BusType
except ImportError:
    print("Please install python3-gi")
    raise

_log = logging.getLogger('nagiosplugin')


class Systemd:
    """Class-level, lazily initialised access to the systemd manager on D-Bus."""

    # Cached proxy for org.freedesktop.systemd1.Manager (created on first use).
    __dbus = None
    # Cached result of the manager's ListUnits() call (fetched on first use).
    __all_units = None

    @classmethod
    def dbus(cls):
        """Return the shared manager proxy, connecting on the first call."""
        if Systemd.__dbus is None:
            _log.debug('Connecting to systemd DBUS')
            Systemd.__dbus = DBusProxy.new_for_bus_sync(
                BusType.SYSTEM,
                0,
                None,
                'org.freedesktop.systemd1',
                '/org/freedesktop/systemd1',
                'org.freedesktop.systemd1.Manager',
                None)
        return Systemd.__dbus

    @classmethod
    def all_units(cls, filter=None):
        """Return the set of unit names known to the manager.

        The unit listing is fetched once and cached on the class.

        :param filter: optional regular expression; when given, only names
            for which ``re.search`` finds a match are returned.
        :returns: a ``set`` of unit name strings.
        """
        if Systemd.__all_units is None:
            _log.debug('Listing all units')
            Systemd.__all_units = Systemd.dbus().ListUnits()
        # Compile once instead of handing the pattern to re.search per unit.
        pattern = re.compile(filter) if filter is not None else None
        units_set = set()
        for unit in Systemd.__all_units:
            # The unit name is the first field of each ListUnits() entry.
            name = unit[0]
            if pattern is not None and not pattern.search(name):
                continue
            units_set.add(name)
        return units_set


class Systemd_Service_State(object):
    """State of a Systemd Unit"""

    # Numeric codes returned by ``value``; the trailing comment gives the
    # LoadState/ActiveState/SubState combination each one stands for.
    NOT_LOADED = -3        # !loaded/inactive
    NOT_LOADED_ERROR = -1  # !loaded/*
    FAILED = 0             # loaded/failed
    ACTIVE = 1             # loaded/active
    INACTIVE_DEAD = 2      # loaded/inactive/dead
    INACTIVE_OTHER = -2    # loaded/inactive/*
    CHANGING = 3           # loaded/{reloading|activating|deactivating}
    UNKNOWN = -5           # *

    def __init__(self, unit_name):
        """Load ``unit_name`` through systemd and start with an empty cache."""
        self.connect_systemd(unit_name)
        self.__prop = {}

    def connect_systemd(self, unit):
        """Create the D-Bus proxy for ``unit``.

        The manager's ``LoadUnit`` result is used as the object path of the
        ``org.freedesktop.systemd1.Unit`` proxy. Any exception raised by
        ``LoadUnit`` is logged and re-raised.
        """
        try:
            loadedUnit = Systemd.dbus().LoadUnit('(s)', unit)
        except Exception as e:
            _log.error(e)
            raise
        self.__dbus_unit = DBusProxy.new_for_bus_sync(
            BusType.SYSTEM,
            0,
            None,
            'org.freedesktop.systemd1',
            loadedUnit,
            'org.freedesktop.systemd1.Unit',
            None)

    def __get_prop(self, name):
        """Return unit property ``name``, reading it from D-Bus only once."""
        if name not in self.__prop:
            self.__prop[name] = \
                self.__dbus_unit.get_cached_property(name).unpack()
        return self.__prop[name]

    @property
    def id(self):
        """The unit's ``Id`` property."""
        return self.__get_prop('Id')

    @property
    def activestate(self):
        """
        ActiveState contains a state value that reflects whether the unit is
        currently active or not. The following states are currently defined:
        active, reloading, inactive, failed, activating, deactivating.

        active indicates that unit is active (obviously...).
        reloading indicates that the unit is active and currently reloading
        its configuration.
        inactive indicates that it is inactive and the previous run was
        successful or no previous run has taken place yet.
        failed indicates that it is inactive and the previous run was not
        successful (more information about the reason for this is available
        on the unit type specific interfaces, for example for services in the
        Result property).
        activating indicates that the unit has previously been inactive but
        is currently in the process of entering an active state.
        Conversely deactivating indicates that the unit is currently in the
        process of deactivation.
        """
        return self.__get_prop('ActiveState')

    @property
    def substate(self):
        """
        SubState encodes states of the same state machine that ActiveState
        covers, but knows more fine-grained states that are
        unit-type-specific. Where ActiveState only covers six high-level
        states, SubState covers possibly many more low-level
        unit-type-specific states that are mapped to the six high-level
        states. Note that multiple low-level states might map to the same
        high-level state, but not vice versa. Not all high-level states have
        low-level counterparts on all unit types. At this point the low-level
        states are not documented here, and are more likely to be extended
        later on than the common high-level states explained above.
        """
        return self.__get_prop('SubState')

    @property
    def loadstate(self):
        """LoadState of the unit."""
        return self.__get_prop('LoadState')

    @property
    def value(self):
        """Numeric state code used as metric/performance value.

        Returns one of the class constants defined above; anything that is
        not ``loaded`` maps to ``NOT_LOADED``.
        """
        if self.loadstate != "loaded":
            return Systemd_Service_State.NOT_LOADED
        ast = self.activestate
        if ast == "failed":
            return Systemd_Service_State.FAILED
        if ast == "active":
            return Systemd_Service_State.ACTIVE
        if ast == "inactive":
            if self.substate == "dead":
                return Systemd_Service_State.INACTIVE_DEAD
            return Systemd_Service_State.INACTIVE_OTHER
        if ast in ('activating', 'deactivating', 'reloading'):
            return Systemd_Service_State.CHANGING
        return Systemd_Service_State.UNKNOWN

    def str_state(self, metric, context):
        """Return a one-line human readable description of the unit.

        The connecting word is "and" when ``context`` rates ``metric`` as Ok
        and "but" otherwise.
        """
        word = "but"
        if context.nagios_result(metric) == nagiosplugin.Ok:
            word = "and"
        return "{} {} {} {}({})".format(self.id,
                                        self.loadstate,
                                        word,
                                        self.activestate,
                                        self.substate)

    def range(self, metric, context, state_res):
        """Return the threshold reported in performance data for one level.

        :param state_res: the nagiosplugin state the threshold belongs to
            (``warning``/``critical`` pass Warn/Critical).
        :returns: ``self.value`` when the metric currently evaluates to
            ``state_res``, ``self.value + 1`` when it evaluates to Ok, and
            ``None`` in every other case.
        """
        res = context.nagios_result(metric)
        if res == state_res:
            return self.value
        if res == nagiosplugin.Ok:
            return self.value + 1
        return None

    def warning(self, metric, context):
        """Warning threshold for performance data (see ``range``)."""
        return self.range(metric, context, nagiosplugin.Warn)

    def critical(self, metric, context):
        """Critical threshold for performance data (see ``range``)."""
        return self.range(metric, context, nagiosplugin.Critical)


class Systemd_Service(nagiosplugin.Resource):
    """One Systemd Service"""

    def __init__(self, **kwords):
        """Store every keyword argument as an attribute of the same name.

        ``normalize`` and ``name`` read ``self.unit``, so callers are
        expected to pass ``unit=...``.
        """
        for key, value in kwords.items():
            setattr(self, key, value)

    def normalize(self):
        """Append ``.service`` to ``self.unit`` unless it contains a dot."""
        if '.' in self.unit:
            _log.debug("Found '.' in ServiceName %r, so assuming you know "
                       "what youre asking for", self.unit)
        else:
            self.unit = self.unit + '.service'
        _log.debug('Normalized unitname to check for %r', self.unit)

    def connect_systemd(self):
        """Create a D-Bus proxy for ``self.unit`` and keep it in ``self.service``.

        Any exception raised by ``LoadUnit`` is logged and re-raised.
        """
        try:
            loadedUnit = Systemd.dbus().LoadUnit('(s)', self.unit)
        except Exception as e:
            _log.error(e)
            raise
        self.service = DBusProxy.new_for_bus_sync(
            BusType.SYSTEM,
            0,
            None,
            'org.freedesktop.systemd1',
            loadedUnit,
            'org.freedesktop.systemd1.Unit',
            None)

    @property
    def name(self):
        """formatting the Testname (will be formatted as uppercase letters)"""
        return "SYSTEMD SERVICE %s" % (self.unit.split('.service')[0])

    def probe(self):
        """Create check metric for Systemd Service"""
        self.normalize()
        state = Systemd_Service_State(self.unit)
        yield Service_Metric(self.unit, state,
                             context='service_state_explicit')


class Systemd_Services(nagiosplugin.Resource):
    """Several Systemd Services"""

    def __init__(self, **kwords):
        """Store every keyword argument as an attribute of the same name.

        ``services_to_check`` reads ``self.filter``, so callers are expected
        to pass ``filter=...``.
        """
        for key, value in kwords.items():
            setattr(self, key, value)

    @property
    def name(self):
        """formatting the Testname (will be formatted as uppercase letters)"""
        return "SYSTEMD SERVICES"

    def services_to_check(self):
        """List of systemd services to check: all units matching the filter."""
        handled_services = {}
        list_services = []
        for unit in Systemd.all_units(filter=self.filter):
            id_unit = unit
            if id_unit in handled_services:
                _log.info("Skipping unit %s already handled with %s",
                          unit, handled_services[id_unit])
                continue
            handled_services[id_unit] = unit
            list_services.append(unit)
        return list_services

    def probe(self):
        """Create check metrics for Systemd Services.

        Yields a ``checked`` count, one ``Service_Metric`` per unit and
        finally one counter metric per key of the statistics dict.

        :raises nagiosplugin.CheckError: when a unit reports a LoadState
            that is not one of the counted keys.
        """
        services = self.services_to_check()
        services_stat = {
            'loaded': 0,
            'masked': 0,
            'not-found': 0,
            'active': 0,
        }

        yield nagiosplugin.Metric("checked", len(services))
        for unit in services:
            state = Systemd_Service_State(unit)
            loadstate = state.loadstate
            if loadstate not in services_stat:
                raise nagiosplugin.CheckError(
                    "unknown LoadState '{}' for unit '{}'".format(
                        loadstate, unit))
            services_stat[loadstate] += 1
            if loadstate == 'loaded' and state.activestate == 'active':
                services_stat['active'] += 1
            yield Service_Metric(unit, state, context='service_state_auto')
        for kind, count in services_stat.items():
            yield nagiosplugin.Metric(kind, count)


class Service_Metric(nagiosplugin.Metric):
    """Metric whose value is derived from a ``Systemd_Service_State``."""

    def __init__(self, name, value, **kwords):
        """Keep ``value`` (a ``Systemd_Service_State``) on the instance."""
        self.__service_state = value
        super().__init__()

    def replace(self, **attr):
        """Return the parent's replaced copy with the service state attached."""
        obj = super().replace(**attr)
        # Carry the state over explicitly to the object the parent returns.
        obj.__service_state = self.service_state
        return obj

    @property
    def value(self):
        """Numeric state code of the wrapped service state."""
        return self.service_state.value

    @property
    def service_state(self):
        """The wrapped ``Systemd_Service_State`` object."""
        return self.__service_state


class Systemd_Context(nagiosplugin.ScalarContext):
    """Scalar context for the plain counter metrics."""

    @property
    def is_service(self):
        """False: metrics in this context are not per-service states."""
        return False


class Service_Context(Systemd_Context):
    """Abstract class

    Subclasses provide ``resultD``, a mapping from state code
    (``Systemd_Service_State.value``) to a nagiosplugin state.
    """

    @property
    def is_service(self):
        """True: metrics in this context carry a service state."""
        return True

    def nagios_result(self, metric):
        """Map the metric's service state to a nagiosplugin state.

        Possible values are nagiosplugin.Ok, nagiosplugin.Warn,
        nagiosplugin.Critical and nagiosplugin.Unknown.
        """
        state = metric.service_state
        return type(self).resultD[state.value]

    def evaluate(self, metric, resource):
        """Return the result object for ``metric`` using ``nagios_result``."""
        nr = self.nagios_result(metric)
        return self.result_cls(nr, metric=metric)

    def performance(self, metric, resource):
        """Build performance data with thresholds taken from the state."""
        return nagiosplugin.Performance(
            metric.name,
            metric.value,
            metric.uom,
            metric.service_state.warning(metric, self),
            metric.service_state.critical(metric, self),
            metric.min,
            metric.max)


class Service_Context_Auto(Service_Context):
    """Rating used for automatically discovered units.

    Not-loaded and inactive/dead units are rated Ok here; state codes
    missing from the table are rated Unknown.
    """

    resultD = collections.defaultdict(
        lambda: nagiosplugin.Unknown,
        {
            Systemd_Service_State.INACTIVE_OTHER: nagiosplugin.Critical,
            Systemd_Service_State.NOT_LOADED_ERROR: nagiosplugin.Critical,
            Systemd_Service_State.FAILED: nagiosplugin.Critical,
            Systemd_Service_State.NOT_LOADED: nagiosplugin.Ok,
            Systemd_Service_State.ACTIVE: nagiosplugin.Ok,
            Systemd_Service_State.INACTIVE_DEAD: nagiosplugin.Ok,
            Systemd_Service_State.CHANGING: nagiosplugin.Warn,
        })


class Service_Context_Explicit(Service_Context):
    """Rating used for an explicitly requested unit.

    Only an active unit is rated Ok; state codes missing from the table are
    rated Unknown.
    """

    resultD = collections.defaultdict(
        lambda: nagiosplugin.Unknown,
        {
            Systemd_Service_State.INACTIVE_OTHER: nagiosplugin.Critical,
            Systemd_Service_State.NOT_LOADED_ERROR: nagiosplugin.Critical,
            Systemd_Service_State.FAILED: nagiosplugin.Critical,
            Systemd_Service_State.NOT_LOADED: nagiosplugin.Critical,
            Systemd_Service_State.ACTIVE: nagiosplugin.Ok,
            Systemd_Service_State.INACTIVE_DEAD: nagiosplugin.Critical,
            Systemd_Service_State.CHANGING: nagiosplugin.Warn,
        })


class Services_Summary(nagiosplugin.Summary):
    """Status line generation for the multi-service check."""

    def get_stats(self, results):
        """Aggregate ``results`` into a flat statistics dict.

        Service results are counted per state text (``ok``, ``warning``,
        ``critical``, ``unknown``) plus ``all``; the value of every
        non-service metric is added under the metric's own name.

        :raises nagiosplugin.CheckError: on a state text outside the four
            counted ones.
        """
        stats = {
            'ok': 0,
            'warning': 0,
            'critical': 0,
            'unknown': 0,
        }
        gstats = {}
        total = 0
        for r in results:
            if not r.context.is_service:
                gstats[r.metric.name] = r.metric.value
                continue
            t = r.state.text
            if t not in stats:
                raise nagiosplugin.CheckError(
                    "invalid state '{}' in results".format(t))
            stats[t] += 1
            total += 1
        stats['all'] = total
        # Counter metrics are merged last, as in the per-key copy they replace.
        stats.update(gstats)
        return stats

    def ok(self, results):
        """Status text when everything is Ok."""
        if len(results) == 1:
            return '{0}'.format(results[0])
        stats = self.get_stats(results)
        return ("{0} units ok ({1} actives, {2} inactives, {3} masked, "
                "{4} not-found)").format(
            stats['ok'],
            stats['active'],
            stats['loaded'] - stats['active'],
            stats['masked'],
            stats['not-found'])

    def problem(self, results):
        """Status text when at least one result is not Ok.

        Shows the first significant result itself when it is the only one in
        its state, otherwise the number of units in that state.
        """
        stats = self.get_stats(results)
        fs = results.first_significant
        t = fs.state.text
        if stats[t] == 1:
            return '{0}'.format(fs)
        return "{0} {1} units".format(stats[t], t)


@nagiosplugin.guarded
def main():
    """Parse the command line and run the matching check.

    With exactly one unit argument that unit is checked explicitly; with no
    unit argument every unit matching ``--filter`` is checked.

    :raises nagiosplugin.CheckError: when more than one unit is given
        (previously this case ran no check and produced no output).
    """
    argp = argparse.ArgumentParser(
        description=__doc__,
        formatter_class=argparse.RawTextHelpFormatter,
    )
    argp.add_argument('units', help='Check this Unit', nargs='*')
    argp.add_argument('-v', '--verbose', action='count', default=0,
                      help='increase output verbosity (use up to 3 times)')
    argp.add_argument('-t', '--timeout', default=10,
                      help='abort execution after TIMEOUT seconds')
    # Raw string: '\.' is not a valid escape sequence in a normal literal.
    argp.add_argument('-f', '--filter', default=r'^.*\.service$',
                      help='regexp for filtering systemd units')
    args = argp.parse_args()

    def fmt_state(m, c):
        # Shared metric formatter for both service contexts.
        return m.service_state.str_state(m, c)

    if len(args.units) > 1:
        # Neither branch below handles this; fail loudly instead of exiting
        # without having checked anything.
        raise nagiosplugin.CheckError(
            "only one unit can be checked at a time, got {}".format(
                len(args.units)))

    if len(args.units) == 1:
        check = nagiosplugin.Check(
            Systemd_Service(unit=args.units[0], **vars(args)),
            Service_Context_Explicit('service_state_explicit',
                                     fmt_metric=fmt_state),
        )
    else:
        check = nagiosplugin.Check(
            Systemd_Services(**vars(args)),
            Service_Context_Auto('service_state_auto',
                                 fmt_metric=fmt_state),
            Service_Context_Explicit('service_state_explicit',
                                     fmt_metric=fmt_state),
            Systemd_Context('checked'),
            Systemd_Context('masked'),
            Systemd_Context('loaded'),
            Systemd_Context('active'),
            Systemd_Context('not-found'),
            Services_Summary(),
        )
    check.main(args.verbose, args.timeout)


if __name__ == '__main__':
    main()