From f1327f2102e1b2e56991756c71dd2a3225cfbe6a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jos=C3=A9=20Lopes?= Date: Tue, 25 Sep 2018 17:25:48 -0300 Subject: [PATCH] using pynetbox, logging improved --- netbox-scanner/config.py | 7 ++-- netbox-scanner/nbscan.py | 72 ++++++++++++++++++++------------ netbox-scanner/netbox-scanner.py | 34 ++++++++------- requirements.txt | 5 ++- setup.py | 2 +- 5 files changed, 70 insertions(+), 50 deletions(-) diff --git a/netbox-scanner/config.py b/netbox-scanner/config.py index 7cdd9ee..86c09fe 100644 --- a/netbox-scanner/config.py +++ b/netbox-scanner/config.py @@ -3,13 +3,12 @@ NETBOX = { 'ADDRESS': '', 'TOKEN': '', - 'TLS': True, - 'PORT': 443, + 'TLS_VERIFY': True } TAG = 'auto' -UNKNOWN_HOSTNAME = 'UNKNOWN HOST' -DISABLE_TLS_WARNINGS = True # stop displaying TLS/SSL warnings? +UNKNOWN = 'UNKNOWN HOST' +LOG = '.' # path to logfile # These are the networks to be scanned. # Example: ['192.168.40.0/20', '10.2.50.0/24'] diff --git a/netbox-scanner/nbscan.py b/netbox-scanner/nbscan.py index 9b4bd8b..1fcb761 100644 --- a/netbox-scanner/nbscan.py +++ b/netbox-scanner/nbscan.py @@ -1,22 +1,17 @@ import logging -from urllib3 import disable_warnings -from urllib3.exceptions import InsecureRequestWarning from ipaddress import IPv4Network from nmap import PortScanner from cpe import CPE -from netbox import NetBox +from pynetbox import api class NetBoxScanner(object): - def __init__(self, host, tls, token, port, tag, unknown, warnings=True): - self.netbox = NetBox(host=host, use_ssl=tls, auth_token=token, - port=port) + def __init__(self, address, token, tls_verify, tag, unknown): + self.netbox = api(address, token=token, ssl_verify=tls_verify) self.tag = tag self.unknown = unknown - if warnings: - disable_warnings(InsecureRequestWarning) def get_description(self, name, cpe): if name: @@ -25,6 +20,21 @@ class NetBoxScanner(object): c = CPE(cpe[0], CPE.VERSION_2_3) return '{}.{}.{}'.format(c.get_vendor()[0], c.get_product()[0], c.get_version()[0]) + + def nbhandler(self, command, **kwargs): + if command == 'get': + return self.netbox.ipam.ip_addresses.get( + address=kwargs['address']) + elif command == 'create': + self.netbox.ipam.ip_addresses.create(address=kwargs['address'], + tags=kwargs['tag'], description=kwargs['description']) + elif command == 'update': + kwargs['nbhost'].description = kwargs['description'] + kwargs['nbhost'].save() + elif command == 'delete': + kwargs['nbhost'].delete() + else: + raise AttributeError def scan(self, network): '''Scan a network. @@ -54,40 +64,48 @@ class NetBoxScanner(object): :param networks: a list of valid networks, like ['10.0.0.0/8'] :return: nothing will be returned ''' + create = update = delete = undiscovered = duplicate = 0 for net in networks: logging.info('scan: {}'.format(net)) hosts = self.scan(net) for host in hosts: - nbhost = self.netbox.ipam.get_ip_addresses( - address=host['address']) + try: + nbhost = self.nbhandler('get', address=host['address']) + except ValueError: + logging.error('duplicate: {}/32'.format(host['address'])) + duplicate += 1 + continue if nbhost: - if (self.tag in nbhost[0]['tags']) and ( - host['description'] != nbhost[0]['description']): + if (self.tag in nbhost.tags) and ( + host['description'] != nbhost.description): logging.warning('update: {} "{}" -> "{}"'.format( - host['address'], nbhost[0]['description'], + str(nbhost.address), nbhost.description, host['description'])) - self.netbox.ipam.update_ip('{}/32'.format( - host['address']), description=host['description']) + self.nbhandler('update', nbhost=nbhost, + description=host['description']) + update += 1 else: - logging.info('create: {} "{}"'.format(host['address'], + logging.info('create: {}/32 "{}"'.format(host['address'], host['description'])) - self.netbox.ipam.create_ip_address( - '{}/32'.format(host['address']), - tags=[self.tag], description=host['description']) + self.netbox.ipam.ip_addresses.create( + address=host['address'], tags=[self.tag], + description=host['description']) + create += 1 for ipv4 in IPv4Network(net): address = str(ipv4) if not any(h['address'] == address for h in hosts): - nbhost = self.netbox.ipam.get_ip_addresses( - address=address) + nbhost = self.nbhandler('get', address=address) try: - if self.tag in nbhost[0]['tags']: + if self.tag in nbhost.tags: logging.warning('delete: {} "{}"'.format( - nbhost[0]['address'], - nbhost[0]['description'])) - self.netbox.ipam.delete_ip_address(address) + nbhost.address, nbhost.description)) + self.nbhandler('delete', nbhost=nbhost) + delete += 1 else: logging.info('undiscovered: {}'.format( - nbhost[0]['address'])) - except IndexError: + nbhost.address)) + undiscovered += 1 + except AttributeError: pass + return (create, update, delete, undiscovered, duplicate) diff --git a/netbox-scanner/netbox-scanner.py b/netbox-scanner/netbox-scanner.py index 5fa6987..f1647e1 100644 --- a/netbox-scanner/netbox-scanner.py +++ b/netbox-scanner/netbox-scanner.py @@ -4,38 +4,40 @@ import logging from sys import stdout, stderr from argparse import ArgumentParser from datetime import datetime +from urllib3 import disable_warnings +from urllib3.exceptions import InsecureRequestWarning import config from nbscan import NetBoxScanner -logging.basicConfig(filename='netbox-scanner-{}.log'.format( - datetime.now().strftime('%Y%m%dT%H%M%SZ')), - level=logging.INFO, - format='%(asctime)s\tnetbox-scanner\t%(levelname)s\t%(message)s') - argp = ArgumentParser() +argp.add_argument('-l', '--log', help='logfile path', default=config.LOG) argp.add_argument('-a', '--address', help='netbox address', default=config.NETBOX['ADDRESS']) -argp.add_argument('-s', '--tls', help='netbox use tls', - action='store_true', default=config.NETBOX['TLS']) argp.add_argument('-t', '--token', help='netbox access token', default=config.NETBOX['TOKEN']) -argp.add_argument('-p', '--port', help='netbox access port', - default=config.NETBOX['PORT']) +argp.add_argument('-v', '--verify', help='tls verify', + action='store_true', default=config.NETBOX['TLS_VERIFY']) argp.add_argument('-g', '--tag', help='netbox-scanner tag', default=config.TAG) argp.add_argument('-u', '--unknown', help='netbox-scanner unknown host', - default=config.UNKNOWN_HOSTNAME) -argp.add_argument('-w', '--warnings', help='disable tls warnings', - action='store_true', default=config.DISABLE_TLS_WARNINGS) + default=config.UNKNOWN) argp.add_argument('-n', '--networks', nargs='+', help='networks to be scanned', default=config.NETWORKS) args = argp.parse_args() -nbs = NetBoxScanner(args.address, args.tls, args.token, args.port, - args.tag, args.unknown, args.warnings) -nbs.sync(args.networks) -logging.info('finished') +logging.basicConfig(filename='{}/netbox-scanner-{}.log'.format(args.log, + datetime.now().strftime('%Y%m%dT%H%M%SZ')), + level=logging.INFO, + format='%(asctime)s\tnetbox-scanner\t%(levelname)s\t%(message)s') + +disable_warnings(InsecureRequestWarning) + +nbs = NetBoxScanner(args.address, args.token, args.verify, args.tag, args.unknown) +logging.info('started: {} networks'.format(len(args.networks))) +stats = nbs.sync(args.networks) +logging.info('finished: +{} ~{} -{} ?{} !{}'.format(stats[0], stats[1], + stats[2], stats[3], stats[4])) exit(0) diff --git a/requirements.txt b/requirements.txt index fa7499d..f0a2965 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,8 +2,9 @@ certifi==2018.8.24 chardet==3.0.4 cpe==1.2.1 idna==2.7 -ipaddress==1.0.22 -python-netbox==0.0.12 +netaddr==0.7.19 +pynetbox==3.4.6 python-nmap==0.6.1 requests==2.19.1 +six==1.11.0 urllib3==1.23 diff --git a/setup.py b/setup.py index 7451653..eb59208 100644 --- a/setup.py +++ b/setup.py @@ -7,7 +7,7 @@ with open('README.md', 'r') as fh: setuptools.setup( name='netbox-scanner', - version='0.2.1', + version='0.3.0', author='José Lopes de Oliveira Jr.', author_email='jlojunior@gmail.com', description='A scanner util for NetBox',