
448 lines
16 KiB
Executable File

# -*- coding: utf-8 -*-
"""Nagios plugin to check a systemd service on different properties"""
import argparse
import logging
import collections
import re
import nagiosplugin
except ImportError as e:
print("Please install python3-nagiosplugin")
raise e
from gi.repository.Gio import DBusProxy, BusType
except ImportError as e:
print("Please install python3-gi")
raise e
_log = logging.getLogger('nagiosplugin')
class Systemd:
    """Systemd access

    Process-wide entry point to the systemd manager on the system D-Bus.
    The manager proxy and the unit listing are both created lazily and
    cached on the class, so all callers share one connection and one
    ``ListUnits`` round-trip.
    """

    # Cached proxy for the systemd manager object; created on first use.
    __dbus = None
    # Cached raw result of ListUnits(); fetched on first use.
    __all_units = None

    @classmethod
    def dbus(cls):
        """Return the shared manager proxy, connecting on the first call."""
        if Systemd.__dbus is None:
            _log.debug('Connecting to systemd DBUS')
            # NOTE(review): only the first argument of this call is present
            # in the damaged copy of the file.  The remaining ones are the
            # standard bus name / object path / interface of the systemd
            # manager, which is the object that provides the ListUnits and
            # LoadUnit methods used elsewhere in this file -- confirm
            # against upstream.
            Systemd.__dbus = DBusProxy.new_for_bus_sync(
                BusType.SYSTEM,
                0,     # no proxy flags
                None,  # no interface info
                'org.freedesktop.systemd1',
                '/org/freedesktop/systemd1',
                'org.freedesktop.systemd1.Manager',
                None)  # no cancellable
        return Systemd.__dbus

    @classmethod
    def all_units(cls, filter=None):
        """Return the set of unit names known to systemd.

        :param filter: optional regular expression; when given, only names
            for which ``re.search(filter, name)`` matches are returned.
        :returns: a ``set`` of unit name strings.
        """
        # ``is None`` (rather than truthiness) so that an empty listing is
        # cached too instead of being re-requested on every call.
        if Systemd.__all_units is None:
            _log.debug('Listing all units')
            Systemd.__all_units = Systemd.dbus().ListUnits()

        # Compile once instead of once per unit.
        matches = re.compile(filter).search if filter is not None else None

        # Every ListUnits entry is a tuple whose first field is the name.
        return {
            entry[0]
            for entry in Systemd.__all_units
            if matches is None or matches(entry[0])
        }
class Systemd_Service_State(object):
    """State of a Systemd Unit

    Wraps a D-Bus proxy for one unit and exposes its ``Id``, ``LoadState``,
    ``ActiveState`` and ``SubState`` properties, plus a numeric summary
    (``value``) used as the Nagios metric value.  Each property is read
    from the proxy at most once per instance.
    """

    # Numeric codes returned by ``value``; the trailing comment names the
    # load/active/sub state combination each one stands for.
    NOT_LOADED = -3        # !loaded/inactive
    NOT_LOADED_ERROR = -1  # !loaded/*
    FAILED = 0             # loaded/failed
    ACTIVE = 1             # loaded/active
    INACTIVE_DEAD = 2      # loaded/inactive/dead
    INACTIVE_OTHER = -2    # loaded/inactive/*
    CHANGING = 3           # loaded/{reloading|activating|deactivating}
    UNKNOWN = -5           # *

    def __init__(self, unit_name):
        """Attach to the unit called *unit_name*."""
        # Per-instance cache of already fetched D-Bus properties.
        self.__prop = {}
        # Nothing else in this file sets up the unit proxy that
        # __get_prop() reads, so the connection is made here.
        self.connect_systemd(unit_name)

    def connect_systemd(self, unit):
        """ initializing systemd dbus connection """
        # LoadUnit hands back the object the unit proxy is bound to.  Any
        # error it raises propagates unchanged (the handler that used to
        # sit here only re-raised).
        unit_path = Systemd.dbus().LoadUnit('(s)', unit)
        # NOTE(review): only the first argument of this call is present in
        # the damaged copy of the file; bus name and interface below are
        # the standard ones for a systemd unit object -- confirm against
        # upstream.
        self.__dbus_unit = DBusProxy.new_for_bus_sync(
            BusType.SYSTEM,
            0,     # no proxy flags
            None,  # no interface info
            'org.freedesktop.systemd1',
            unit_path,
            'org.freedesktop.systemd1.Unit',
            None)  # no cancellable

    def __get_prop(self, name):
        """
        Catching properties from dbus
        """
        if name not in self.__prop:
            self.__prop[name] = \
                self.__dbus_unit.get_cached_property(name).unpack()
        return self.__prop[name]

    @property
    def id(self):
        """Value of the unit's ``Id`` property."""
        return self.__get_prop('Id')

    @property
    def activestate(self):
        """
        ActiveState contains a state value that reflects whether the unit is
        currently active or not. The following states are currently defined:
        active, reloading, inactive, failed, activating, deactivating. active
        indicates that unit is active (obviously...). reloading indicates
        that the unit is active and currently reloading its configuration.
        inactive indicates that it is inactive and the previous run was
        successful or no previous run has taken place yet. failed indicates
        that it is inactive and the previous run was not successful (more
        information about the reason for this is available on the unit type
        specific interfaces, for example for services in the Result property,
        see below). activating indicates that the unit has previously been
        inactive but is currently in the process of entering an active state.
        Conversely deactivating indicates that the unit is currently in the
        process of deactivation.
        """
        return self.__get_prop('ActiveState')

    @property
    def substate(self):
        """
        SubState encodes states of the same state machine that ActiveState
        covers, but knows more fine-grained states that are unit-type-specific.
        Where ActiveState only covers six high-level states, SubState covers
        possibly many more low-level unit-type-specific states that are mapped
        to the six high-level states. Note that multiple low-level states might
        map to the same high-level state, but not vice versa. Not all
        high-level states have low-level counterparts on all unit types. At
        this point the low-level states are not documented here, and are more
        likely to be extended later on than the common high-level states
        explained above.
        """
        return self.__get_prop('SubState')

    @property
    def loadstate(self):
        """
        LoadState of the unit.
        """
        return self.__get_prop('LoadState')

    @property
    def value(self):
        """
        Value for metric/performance

        Maps the load/active/sub state of the unit onto one of the numeric
        class constants.
        """
        cls = Systemd_Service_State
        if self.loadstate != "loaded":
            # NOTE(review): NOT_LOADED_ERROR is defined but no visible code
            # path returns it -- check upstream for a lost distinction.
            return cls.NOT_LOADED

        active = self.activestate
        if active == "inactive":
            if self.substate == "dead":
                return cls.INACTIVE_DEAD
            return cls.INACTIVE_OTHER
        if active in ('activating', 'deactivating', 'reloading'):
            return cls.CHANGING
        return {"failed": cls.FAILED, "active": cls.ACTIVE}.get(
            active, cls.UNKNOWN)

    def str_state(self, metric, context):
        """Describe the unit as ``<id> <load> and|but <active>(<sub>)``.

        "and" is used when *context* rates *metric* as Ok, "but" otherwise.
        """
        word = "but"
        if context.nagios_result(metric) == nagiosplugin.Ok:
            word = "and"
        return "{} {} {} {}({})".format(
            self.id, self.loadstate, word, self.activestate, self.substate)

    def range(self, metric, context, state_res):
        """Threshold to report for the Nagios state *state_res*.

        :returns: the current value when *context* rates *metric* as
            *state_res*; the current value plus one when it rates it Ok;
            ``None`` otherwise.
        """
        res = context.nagios_result(metric)
        if res == state_res:
            return self.value
        if res == nagiosplugin.Ok:
            return self.value + 1
        return None

    def warning(self, metric, context):
        """Threshold for the Warn state (see ``range``)."""
        return self.range(metric, context, nagiosplugin.Warn)

    def critical(self, metric, context):
        """Threshold for the Critical state (see ``range``)."""
        return self.range(metric, context, nagiosplugin.Critical)
class Systemd_Service(nagiosplugin.Resource):
    """One Systemd Service"""

    def __init__(self, **kwords):
        """Store every keyword argument as an instance attribute.

        The other methods read ``self.unit``, so callers are expected to
        pass ``unit=<name>``.
        """
        for key, value in kwords.items():
            setattr(self, key, value)

    def normalize(self):
        """Append ``.service`` to ``self.unit`` unless it contains a dot.

        Calling it again is harmless: a normalized name contains a dot.
        """
        if '.' in self.unit:
            _log.debug('Found \'.\' in ServiceName %r, so assuming you know what youre asking for', self.unit)
        else:
            self.unit = self.unit + '.service'
        _log.debug('Normalized unitname to check for %r', self.unit)

    def connect_systemd(self):
        """ initializing systemd dbus connection """
        # Errors from LoadUnit propagate unchanged (the handler that used
        # to sit here only re-raised).
        unit_path = Systemd.dbus().LoadUnit('(s)', self.unit)
        # NOTE(review): only the first argument of this call is present in
        # the damaged copy of the file; bus name and interface below are
        # the standard ones for a systemd unit object -- confirm against
        # upstream.
        self.service = DBusProxy.new_for_bus_sync(
            BusType.SYSTEM,
            0,     # no proxy flags
            None,  # no interface info
            'org.freedesktop.systemd1',
            unit_path,
            'org.freedesktop.systemd1.Unit',
            None)  # no cancellable

    @property
    def name(self):
        """formatting the Testname (will be formatted as uppercase letters)"""
        return "SYSTEMD SERVICE %s" % (self.unit.split('.service')[0])

    def probe(self):
        """ Create check metric for Systemd Service"""
        # NOTE(review): no call to normalize() is visible in the damaged
        # copy, yet nothing else uses it; it is called here so that a bare
        # name such as "foo" is looked up as "foo.service" -- confirm
        # against upstream.
        self.normalize()
        state = Systemd_Service_State(self.unit)
        yield Service_Metric(self.unit, state, context='service_state_explicit')
class Systemd_Services(nagiosplugin.Resource):
    """Several Systemd Services"""

    def __init__(self, **kwords):
        """Store every keyword argument as an instance attribute.

        ``services_to_check`` reads ``self.filter``, so callers are
        expected to pass ``filter=<regexp>``.
        """
        for key, value in kwords.items():
            setattr(self, key, value)

    @property
    def name(self):
        """formatting the Testname (will be formatted as uppercase letters)"""
        # NOTE(review): no return statement is visible for this method in
        # the damaged copy of the file, so it yields None as written;
        # restore the intended label from upstream.
        return None

    def services_to_check(self):
        """List of systemd services to check. By default, all presents"""
        handled_services = {}
        list_services = []
        for unit in Systemd.all_units(filter=self.filter):
            id_unit = unit
            if id_unit in handled_services:
                _log.info("Skipping unit %s already handled with %s",
                          unit, handled_services[id_unit])
                continue
            handled_services[id_unit] = unit
            list_services.append(unit)
        return list_services

    def probe(self):
        """ Create check metric for Systemd Services

        Yields, in order: a ``checked`` metric with the number of units,
        one ``Service_Metric`` per unit, then one counter metric per key
        of the statistics table.

        :raises nagiosplugin.CheckError: when a unit reports a LoadState
            that has no counter in the statistics table.
        """
        services = self.services_to_check()
        services_stat = {
            'loaded': 0,
            'masked': 0,
            'not-found': 0,
            'active': 0,
        }

        yield nagiosplugin.Metric("checked", len(services))

        for unit in services:
            state = Systemd_Service_State(unit)
            loadstate = state.loadstate
            if loadstate not in services_stat:
                raise nagiosplugin.CheckError(
                    "unknown LoadState '{}' for unit '{}'".format(
                        loadstate, unit))
            services_stat[loadstate] += 1
            # 'active' counts the subset of loaded units currently active.
            if loadstate == 'loaded' and state.activestate == 'active':
                services_stat['active'] += 1
            yield Service_Metric(unit, state, context='service_state_auto')

        for kind, count in services_stat.items():
            yield nagiosplugin.Metric(kind, count)
class Service_Metric(nagiosplugin.Metric):
    """Metric that carries the state object of the unit it describes.

    The state object is passed in the ``value`` position; the numeric
    metric value is then derived from it on demand.
    """

    def __init__(self, name, value, **kwords):
        """Remember *value* (the unit's state object) on the instance."""
        self.__service_state = value

    def replace(self, **attr):
        """Return a copy with *attr* replaced, keeping the state object."""
        obj = super().replace(**attr)
        # The copy is built by the base class, which knows nothing about
        # the extra attribute, so it is carried over by hand.
        obj.__service_state = self.service_state
        return obj

    @property
    def value(self):
        """Numeric value of the attached state object."""
        return self.service_state.value

    @property
    def service_state(self):
        """The state object handed to the constructor."""
        return self.__service_state
class Systemd_Context(nagiosplugin.ScalarContext):
    """Scalar context for metrics that do not describe a single unit."""

    @property
    def is_service(self):
        """Always ``False`` for this context.

        Exposed as a property because the summary code reads it as an
        attribute (``r.context.is_service``) without calling it; as a plain
        method that test would always be truthy.
        """
        return False
class Service_Context(Systemd_Context):
    """Abstract class

    Base for contexts that rate one unit.  Subclasses supply ``resultD``,
    a mapping from a state's numeric value to a Nagios result state.
    """

    @property
    def is_service(self):
        """Always ``True``: metrics in this context describe one unit."""
        return True

    def nagios_result(self, metric):
        """Look up the Nagios state for *metric* in the class's ``resultD``.

        possible Values are:
            nagiosplugin.Ok,
            nagiosplugin.Warn,
            nagiosplugin.Critical,
            nagiosplugin.Unknown
        """
        # The original line indexed with an undefined name ``state``; the
        # metric's state object is what carries the numeric value.
        return type(self).resultD[metric.service_state.value]

    def evaluate(self, metric, resource):
        """Wrap the looked-up Nagios state in this context's result class."""
        nr = self.nagios_result(metric)
        return self.result_cls(nr, metric=metric)

    def performance(self, metric, resource):
        """Build the performance data entry for *metric*.

        Warning and critical thresholds come from the metric's state
        object rather than from fixed ranges.
        """
        state = metric.service_state
        return nagiosplugin.Performance(
            metric.name, metric.value, metric.uom,
            state.warning(metric, self),
            state.critical(metric, self),
            metric.min, metric.max)
class Service_Context_Auto(Service_Context):
    """Rating table in which not-loaded and inactive/dead units count as Ok.

    Presumably used for the automatically discovered units
    ('service_state_auto' metrics) -- confirm where the context is bound.
    """

    # State value -> Nagios state; any value not listed maps to Unknown.
    resultD = collections.defaultdict(
        lambda: nagiosplugin.Unknown,
        {
            Systemd_Service_State.INACTIVE_OTHER: nagiosplugin.Critical,
            Systemd_Service_State.NOT_LOADED_ERROR: nagiosplugin.Critical,
            Systemd_Service_State.FAILED: nagiosplugin.Critical,
            Systemd_Service_State.NOT_LOADED: nagiosplugin.Ok,
            Systemd_Service_State.ACTIVE: nagiosplugin.Ok,
            Systemd_Service_State.INACTIVE_DEAD: nagiosplugin.Ok,
            Systemd_Service_State.CHANGING: nagiosplugin.Warn,
        })
class Service_Context_Explicit(Service_Context):
    """Rating table in which only an active unit counts as Ok.

    Presumably used when a unit is named explicitly
    ('service_state_explicit' metrics) -- confirm where the context is
    bound.
    """

    # State value -> Nagios state; any value not listed maps to Unknown.
    resultD = collections.defaultdict(
        lambda: nagiosplugin.Unknown,
        {
            Systemd_Service_State.INACTIVE_OTHER: nagiosplugin.Critical,
            Systemd_Service_State.NOT_LOADED_ERROR: nagiosplugin.Critical,
            Systemd_Service_State.FAILED: nagiosplugin.Critical,
            Systemd_Service_State.NOT_LOADED: nagiosplugin.Critical,
            Systemd_Service_State.ACTIVE: nagiosplugin.Ok,
            Systemd_Service_State.INACTIVE_DEAD: nagiosplugin.Critical,
            Systemd_Service_State.CHANGING: nagiosplugin.Warn,
        })
class Services_Summary(nagiosplugin.Summary):
    """Status line built from per-unit results and global counters."""

    @staticmethod
    def _is_service(context):
        """Tell whether *context* rates a single unit.

        Accepts ``is_service`` as a property or as a plain method, so the
        check does not silently become always-true when it is a method.
        """
        flag = getattr(context, 'is_service', False)
        return bool(flag() if callable(flag) else flag)

    def get_stats(self, results):
        """Collect counters from *results*.

        :returns: a dict with the number of per-unit results in each Nagios
            state (``ok``, ``warning``, ``critical``, ``unknown``), their
            total under ``all``, and every non-unit metric copied in under
            its own name.
        :raises nagiosplugin.CheckError: when a per-unit result carries a
            state text other than the four above.
        """
        stats = {
            'ok': 0,
            'warning': 0,
            'critical': 0,
            'unknown': 0,
        }
        gstats = {}
        total = 0
        for r in results:
            if not self._is_service(r.context):
                gstats[r.metric.name] = r.metric.value
                # NOTE(review): the damaged copy shows no ``continue``
                # here; without one the global counters would also be
                # tallied as units -- confirm against upstream.
                continue
            t = r.state.text
            if t not in stats:
                raise nagiosplugin.CheckError(
                    "invalid state '{}' in results".format(t))
            stats[t] += 1
            total += 1
        stats['all'] = total
        stats.update(gstats)
        return stats

    def ok(self, results):
        """Status text when every result is Ok.

        A lone result is rendered as-is; otherwise the unit counters are
        summarised.
        """
        if len(results) == 1:
            return '{0}'.format(results[0])
        stats = self.get_stats(results)
        return "{0} units ok ({1} actives, {2} inactives, {3} masked, {4} not-found)".format(
            stats['ok'], stats['active'], stats['loaded'] - stats['active'],
            stats['masked'], stats['not-found'])

    def problem(self, results):
        """Status text when at least one result is not Ok.

        Shows the first significant result itself when it is the only one
        in its state, otherwise how many units share that state.
        """
        stats = self.get_stats(results)
        fs = results.first_significant
        t = fs.state.text
        if stats[t] == 1:
            return '{0}'.format(fs)
        return "{0} {1} units".format(stats[t], t)
# Command-line entry point: parses the arguments, builds a nagiosplugin.Check
# and runs it.
# NOTE(review): this copy of the function looks truncated -- indentation is
# gone and several calls are never closed (ArgumentParser(...) and both
# nagiosplugin.Check(...) calls).  The comments below describe only what is
# visible; restore the missing lines from upstream before running.
def main():
argp = argparse.ArgumentParser(description=__doc__,
# Zero or more unit names (nargs='*').
argp.add_argument('units', help='Check this Unit', nargs='*')
argp.add_argument('-v', '--verbose', action='count', default=0,
help='increase output verbosity (use up to 3 times)')
# NOTE(review): no type= is given, so a value supplied on the command line
# arrives as a string while the default is the int 10.
argp.add_argument('-t', '--timeout', default=10,
help='abort execution after TIMEOUT seconds')
# The default pattern selects unit names ending in ".service".
# NOTE(review): '\.' inside a non-raw string literal is an invalid escape
# sequence; a raw string (r'...') would avoid the warning.
argp.add_argument('-f', '--filter', default='^.*\.service$',
help='regexp for filtering systemd units')
args = argp.parse_args()
# Exactly one unit named on the command line.
# NOTE(review): the Check(...) arguments before and after the fmt_metric
# line are missing in this copy.
if len(args.units) == 1:
check = nagiosplugin.Check(
fmt_metric=lambda m,c: m.service_state.str_state(m, c)),
check.main(args.verbose, args.timeout)
# No unit named on the command line.
# NOTE(review): the Check(...) arguments around the two fmt_metric lines are
# missing in this copy, and no branch for more than one unit is visible.
if len(args.units) == 0:
check = nagiosplugin.Check(
fmt_metric=lambda m,c: m.service_state.str_state(m, c)),
fmt_metric=lambda m,c: m.service_state.str_state(m, c)),
#print (Systemd.all_units())
check.main(args.verbose, args.timeout)
if __name__ == '__main__':