improved code organization and documentation

This commit is contained in:
José Lopes 2018-10-03 13:32:48 -03:00
parent 44b2cd7a10
commit 84f4cdf67f
5 changed files with 101 additions and 75 deletions

View File

@ -1,12 +1,13 @@
# netbox-scanner
A scanner util for NetBox, because certain networks can be updated automagically. ;)
A scanner util for [NetBox](https://netbox.readthedocs.io/en/stable/), because certain networks can be updated automagically. ;)
## Installation
`netbox-scanner` is available as a Python package via PyPi, so you can install it using `pip`:
`netbox-scanner` is available as a Python package via [PyPi](https://pypi.org/project/netbox-scanner/), so you can install it using `pip`:
$ pip3 install netbox-scanner
Another way is to download from GitHub:
You can also download from GitHub:
$ wget https://github.com/forkd/netbox-scanner/archive/master.zip
$ unzip netbox-scanner-master.zip -d netbox-scanner
@ -15,7 +16,8 @@ Another way is to download from GitHub:
$ vi netbox-scanner/config.py # edit this file, save and exit
$ python netbox-scanner/netbox-scanner.py
Note that `netbox-scanner` will require Nmap and an instance of NetBox ready to use.
Note that `netbox-scanner` will require [Nmap](https://nmap.org/) and an instance of NetBox ready to use.
## Usage
`netbox-scanner` can be used both in your Python programs or as a script. To use `netbox-scanner` as a script, edit `netbox-scanner/config.py` with your setup, and run the command below:
@ -26,11 +28,11 @@ Note that `netbox-scanner` will require Nmap and an instance of NetBox ready to
1. It will scan all networks defined in `netbox-scanner/config.py` or via parameters.
2. For each discovered host it will:
1. If host is in NetBox, description is different, and `tag` is set as defined in `netbox-scanner/config.py/TAG`, it'll be updated.
1. If host is in NetBox, description is different, and `tag` is equal to `netbox-scanner/config.py/TAG`, it's description will be updated.
2. If host is not in NetBox, it'll be created.
3. It will iterate through each network to find and delete any hosts registered in NetBox that did not respond to scan, and have the tag `netbox-scanner/config.py/TAG`.
This way, if some hosts in your monitored networks are eventually down, but you don't want to delete them, just make sure that they doesn't have the tag defined in `netbox-scanner/config.py/TAG`.
This way, if some hosts in your monitored networks are eventually down, but you don't want `netbox-scanner` to manage them, just make sure that they don't have the tag defined in `netbox-scanner/config.py/TAG`.
To see a list of all available parameters in `netbox-scanner.py`, simple use the `-h` option --please note that all parameters are optional, because all of them can be set using `netbox-scanner/config.py` file:
@ -38,5 +40,12 @@ To see a list of all available parameters in `netbox-scanner.py`, simple use the
Of course, you can use `cron` to automatically run `netbox-scanner`.
## Configuration File
`netbox-scanner` have a configuration file (`netbox-scanner/netbox-scanner/config.py`) with all parameters needed to scan networks and synchronize them to NetBox. Before using `netbox-scanner/netbox-scannner/netbox-scanner.py` you should read that file and fill all variables according to your environment.
It is strongly recommended that you use this file instead of passing parameters via command line, because it's easier and avoid common mistakes in multiple executions. You should use command line parameters occasionally, in single scans.
## License
`netbox-scanner` is licensed under a MIT license --read `LICENSE` file for more information.

View File

@ -11,11 +11,13 @@ NETBOX = {
}
DEVICE_AUTH = {
'CISCO': {
'USER': 'netbox',
'PASSWORD': '',
'COMMAND': 'show run | inc hostname'
}
# 'CISCO': {
# 'USER': 'netbox',
# 'PASSWORD': '',
# 'COMMAND': 'show run | inc hostname',
# 'REGEX': r'hostname ([A-Z|a-z|0-9|\-|_]+)',
# 'REGROUP': 1
# }
}
# These are the networks to be scanned.

View File

@ -1,16 +1,18 @@
import re
import logging
from ipaddress import IPv4Network
from nmap import PortScanner
from cpe import CPE
from pynetbox import api
from paramiko import SSHClient, AutoAddPolicy
from paramiko.ssh_exception import AuthenticationException, SSHException, NoValidConnectionsError
from paramiko.ssh_exception import AuthenticationException
from paramiko.ssh_exception import SSHException
from paramiko.ssh_exception import NoValidConnectionsError
# paramiko is too noisy
logging.getLogger('paramiko').setLevel(logging.CRITICAL)
logging.getLogger('paramiko').setLevel(logging.CRITICAL) # paramiko is noisy
class NetBoxScanner(object):
@ -20,6 +22,8 @@ class NetBoxScanner(object):
self.devs = devs_auth
self.tag = tag
self.unknown = unknown
self.stats = {'created':0, 'updated':0, 'deleted':0,
'undiscovered':0, 'duplicated':0}
def get_description(self, address, name, cpe):
'''Define a description based on hostname and CPE'''
@ -35,37 +39,20 @@ class NetBoxScanner(object):
client.connect(address, username=self.devs[vendor]['USER'],
password=self.devs[vendor]['PASSWORD'])
stdin, stdout, stderr = client.exec_command(self.devs[vendor]['COMMAND'])
return '{}: {}'.format(vendor.lower(),
re.search(r'hostname ([A-Z|a-z|0-9|\-|_]+)',
str(stdout.read().decode('utf-8'))).group(1))
return '{}:{}'.format(vendor.lower(),
re.search(self.devs[vendor]['REGEX'],
str(stdout.read().decode('utf-8'))).group(self.devs[vendor]['REGROUP']))
except (AuthenticationException, SSHException,
NoValidConnectionsError, TimeoutError):
pass
return '{}.{}.{}'.format(c.get_vendor()[0], c.get_product()[0],
c.get_version()[0])
def nbhandler(self, command, **kwargs):
'''Handles NetBox integration'''
if command == 'get':
return self.netbox.ipam.ip_addresses.get(
address=kwargs['address'])
elif command == 'create':
self.netbox.ipam.ip_addresses.create(address=kwargs['address'],
tags=kwargs['tag'], description=kwargs['description'])
elif command == 'update':
kwargs['nbhost'].description = kwargs['description']
kwargs['nbhost'].save()
elif command == 'delete':
kwargs['nbhost'].delete()
else:
raise AttributeError
def scan(self, network):
'''Scan a network.
:param network: a valid network, like 10.0.0.0/8
:return: a list with dictionaries of responsive
hosts (address and description)
:return: a list of tuples like [('10.0.0.1','Gateway'),...].
'''
hosts = []
nm = PortScanner()
@ -79,57 +66,85 @@ class NetBoxScanner(object):
nm[host]['osmatch'][0]['osclass'][0]['cpe'])
except (KeyError, AttributeError, IndexError):
description = self.unknown
hosts.append({'address':address,'description':description})
hosts.append((address, description))
return hosts
def logger(self, logtype, **kwargs):
'''Logs and updates stats for NetBox interactions.'''
if logtype == 'created':
logging.info('created: {}/32 "{}"'.format(kwargs['address'],
kwargs['description']))
self.stats['created'] += 1
elif logtype == 'updated':
logging.warning('updated: {}/32 "{}" -> "{}"'.format(
kwargs['address'], kwargs['description_old'],
kwargs['description_new']))
self.stats['updated'] += 1
elif logtype == 'deleted':
logging.warning('deleted: {} "{}"'.format(kwargs['address'],
kwargs['description']))
self.stats['deleted'] += 1
elif logtype == 'undiscovered':
logging.warning('undiscovered: {} "{}"'.format(kwargs['address'],
kwargs['description']))
self.stats['undiscovered'] += 1
elif logtype == 'duplicated':
logging.error('duplicated: {}/32'.format(kwargs['address']))
self.stats['duplicated'] += 1
def sync_host(self, host):
'''Syncs a single host to NetBox.
:param host: a tuple like ('10.0.0.1','Gateway')
:return: True if syncing is ok or False in other case.
'''
try:
nbhost = self.netbox.ipam.ip_addresses.get(address=host[0])
except ValueError:
self.logger('duplicated', address=host[0])
return False
if nbhost:
if (self.tag in nbhost.tags) and (host[1] != nbhost.description):
aux = nbhost.description
nbhost.description = host[1]
nbhost.save()
self.logger('updated', address=host[0], description_old=aux,
description_new=host[1])
else:
self.netbox.ipam.ip_addresses.create(address=host[0],
tags=[self.tag], description=host[0])
self.logger('created', address=host[0], description=host[1])
return True
def sync(self, networks):
'''Scan some networks and sync them to NetBox.
:param networks: a list of valid networks, like ['10.0.0.0/8']
:return: synching statistics are returned as a tuple
'''
create = update = delete = undiscovered = duplicate = 0
for s in self.stats:
self.stats[s] = 0
for net in networks:
hosts = self.scan(net)
logging.info('scan: {} ({} hosts discovered)'.format(net, len(hosts)))
logging.info('scan: {} ({} hosts discovered)'.format(net,
len(hosts)))
for host in hosts:
try:
nbhost = self.nbhandler('get', address=host['address'])
except ValueError:
logging.error('duplicate: {}/32'.format(host['address']))
duplicate += 1
continue
if nbhost:
if (self.tag in nbhost.tags) and (
host['description'] != nbhost.description):
logging.warning('update: {} "{}" -> "{}"'.format(
str(nbhost.address), nbhost.description,
host['description']))
self.nbhandler('update', nbhost=nbhost,
description=host['description'])
update += 1
else:
logging.info('create: {}/32 "{}"'.format(host['address'],
host['description']))
self.netbox.ipam.ip_addresses.create(
address=host['address'], tags=[self.tag],
description=host['description'])
create += 1
self.sync_host(host)
for ipv4 in IPv4Network(net):
for ipv4 in IPv4Network(net): # cleanup
address = str(ipv4)
if not any(h['address'] == address for h in hosts):
if not any(h[0]==address for h in hosts):
try:
nbhost = self.nbhandler('get', address=address)
nbhost = self.netbox.ipam.ip_addresses.get(address=address)
if self.tag in nbhost.tags:
logging.warning('delete: {} "{}"'.format(
nbhost.address, nbhost.description))
self.nbhandler('delete', nbhost=nbhost)
delete += 1
nbhost.delete()
self.logger('deleted', address=nbhost.address,
description=nbhost.description)
else:
logging.warning('undiscovered: {} "{}"'.format(
nbhost.address, nbhost.description))
undiscovered += 1
self.logger('undiscovered', address=nbhost.address,
description=nbhost.description)
except (AttributeError, ValueError):
pass
return (create, update, delete, undiscovered, duplicate)
return (self.stats['created'], self.stats['updated'],
self.stats['deleted'], self.stats['undiscovered'],
self.stats['duplicated'])

View File

@ -1,7 +1,6 @@
#!/usr/bin/env python3
import logging
from sys import stdout, stderr
from argparse import ArgumentParser
from datetime import datetime
from urllib3 import disable_warnings
@ -36,7 +35,8 @@ logging.basicConfig(filename='{}/netbox-scanner-{}.log'.format(args.log,
disable_warnings(InsecureRequestWarning)
nbs = NetBoxScanner(args.address, args.token, args.verify, args.devices, args.tag, args.unknown)
nbs = NetBoxScanner(args.address, args.token, args.verify, args.devices,
args.tag, args.unknown)
logging.info('started: {} networks'.format(len(args.networks)))
stats = nbs.sync(args.networks)
logging.info('finished: +{} ~{} -{} ?{} !{}'.format(stats[0], stats[1],

View File

@ -7,7 +7,7 @@ with open('README.md', 'r') as fh:
setuptools.setup(
name='netbox-scanner',
version='0.4.2',
version='0.5.0',
author='José Lopes de Oliveira Jr.',
author_email='jlojunior@gmail.com',
description='A scanner util for NetBox',