mirror of
https://github.com/lopes/netbox-scanner.git
synced 2025-07-24 22:34:47 +02:00
using pynetbox, logging improved
This commit is contained in:
parent
5d3e6972ac
commit
f1327f2102
@ -3,13 +3,12 @@
|
||||
NETBOX = {
|
||||
'ADDRESS': '',
|
||||
'TOKEN': '',
|
||||
'TLS': True,
|
||||
'PORT': 443,
|
||||
'TLS_VERIFY': True
|
||||
}
|
||||
|
||||
TAG = 'auto'
|
||||
UNKNOWN_HOSTNAME = 'UNKNOWN HOST'
|
||||
DISABLE_TLS_WARNINGS = True # stop displaying TLS/SSL warnings?
|
||||
UNKNOWN = 'UNKNOWN HOST'
|
||||
LOG = '.' # path to logfile
|
||||
|
||||
# These are the networks to be scanned.
|
||||
# Example: ['192.168.40.0/20', '10.2.50.0/24']
|
||||
|
@ -1,22 +1,17 @@
|
||||
import logging
|
||||
from urllib3 import disable_warnings
|
||||
from urllib3.exceptions import InsecureRequestWarning
|
||||
from ipaddress import IPv4Network
|
||||
|
||||
from nmap import PortScanner
|
||||
from cpe import CPE
|
||||
from netbox import NetBox
|
||||
from pynetbox import api
|
||||
|
||||
|
||||
class NetBoxScanner(object):
|
||||
|
||||
def __init__(self, host, tls, token, port, tag, unknown, warnings=True):
|
||||
self.netbox = NetBox(host=host, use_ssl=tls, auth_token=token,
|
||||
port=port)
|
||||
def __init__(self, address, token, tls_verify, tag, unknown):
|
||||
self.netbox = api(address, token=token, ssl_verify=tls_verify)
|
||||
self.tag = tag
|
||||
self.unknown = unknown
|
||||
if warnings:
|
||||
disable_warnings(InsecureRequestWarning)
|
||||
|
||||
def get_description(self, name, cpe):
|
||||
if name:
|
||||
@ -25,6 +20,21 @@ class NetBoxScanner(object):
|
||||
c = CPE(cpe[0], CPE.VERSION_2_3)
|
||||
return '{}.{}.{}'.format(c.get_vendor()[0],
|
||||
c.get_product()[0], c.get_version()[0])
|
||||
|
||||
def nbhandler(self, command, **kwargs):
|
||||
if command == 'get':
|
||||
return self.netbox.ipam.ip_addresses.get(
|
||||
address=kwargs['address'])
|
||||
elif command == 'create':
|
||||
self.netbox.ipam.ip_addresses.create(address=kwargs['address'],
|
||||
tags=kwargs['tag'], description=kwargs['description'])
|
||||
elif command == 'update':
|
||||
kwargs['nbhost'].description = kwargs['description']
|
||||
kwargs['nbhost'].save()
|
||||
elif command == 'delete':
|
||||
kwargs['nbhost'].delete()
|
||||
else:
|
||||
raise AttributeError
|
||||
|
||||
def scan(self, network):
|
||||
'''Scan a network.
|
||||
@ -54,40 +64,48 @@ class NetBoxScanner(object):
|
||||
:param networks: a list of valid networks, like ['10.0.0.0/8']
|
||||
:return: nothing will be returned
|
||||
'''
|
||||
create = update = delete = undiscovered = duplicate = 0
|
||||
for net in networks:
|
||||
logging.info('scan: {}'.format(net))
|
||||
hosts = self.scan(net)
|
||||
for host in hosts:
|
||||
nbhost = self.netbox.ipam.get_ip_addresses(
|
||||
address=host['address'])
|
||||
try:
|
||||
nbhost = self.nbhandler('get', address=host['address'])
|
||||
except ValueError:
|
||||
logging.error('duplicate: {}/32'.format(host['address']))
|
||||
duplicate += 1
|
||||
continue
|
||||
if nbhost:
|
||||
if (self.tag in nbhost[0]['tags']) and (
|
||||
host['description'] != nbhost[0]['description']):
|
||||
if (self.tag in nbhost.tags) and (
|
||||
host['description'] != nbhost.description):
|
||||
logging.warning('update: {} "{}" -> "{}"'.format(
|
||||
host['address'], nbhost[0]['description'],
|
||||
str(nbhost.address), nbhost.description,
|
||||
host['description']))
|
||||
self.netbox.ipam.update_ip('{}/32'.format(
|
||||
host['address']), description=host['description'])
|
||||
self.nbhandler('update', nbhost=nbhost,
|
||||
description=host['description'])
|
||||
update += 1
|
||||
else:
|
||||
logging.info('create: {} "{}"'.format(host['address'],
|
||||
logging.info('create: {}/32 "{}"'.format(host['address'],
|
||||
host['description']))
|
||||
self.netbox.ipam.create_ip_address(
|
||||
'{}/32'.format(host['address']),
|
||||
tags=[self.tag], description=host['description'])
|
||||
self.netbox.ipam.ip_addresses.create(
|
||||
address=host['address'], tags=[self.tag],
|
||||
description=host['description'])
|
||||
create += 1
|
||||
|
||||
for ipv4 in IPv4Network(net):
|
||||
address = str(ipv4)
|
||||
if not any(h['address'] == address for h in hosts):
|
||||
nbhost = self.netbox.ipam.get_ip_addresses(
|
||||
address=address)
|
||||
nbhost = self.nbhandler('get', address=address)
|
||||
try:
|
||||
if self.tag in nbhost[0]['tags']:
|
||||
if self.tag in nbhost.tags:
|
||||
logging.warning('delete: {} "{}"'.format(
|
||||
nbhost[0]['address'],
|
||||
nbhost[0]['description']))
|
||||
self.netbox.ipam.delete_ip_address(address)
|
||||
nbhost.address, nbhost.description))
|
||||
self.nbhandler('delete', nbhost=nbhost)
|
||||
delete += 1
|
||||
else:
|
||||
logging.info('undiscovered: {}'.format(
|
||||
nbhost[0]['address']))
|
||||
except IndexError:
|
||||
nbhost.address))
|
||||
undiscovered += 1
|
||||
except AttributeError:
|
||||
pass
|
||||
return (create, update, delete, undiscovered, duplicate)
|
||||
|
@ -4,38 +4,40 @@ import logging
|
||||
from sys import stdout, stderr
|
||||
from argparse import ArgumentParser
|
||||
from datetime import datetime
|
||||
from urllib3 import disable_warnings
|
||||
from urllib3.exceptions import InsecureRequestWarning
|
||||
|
||||
import config
|
||||
from nbscan import NetBoxScanner
|
||||
|
||||
|
||||
logging.basicConfig(filename='netbox-scanner-{}.log'.format(
|
||||
datetime.now().strftime('%Y%m%dT%H%M%SZ')),
|
||||
level=logging.INFO,
|
||||
format='%(asctime)s\tnetbox-scanner\t%(levelname)s\t%(message)s')
|
||||
|
||||
argp = ArgumentParser()
|
||||
argp.add_argument('-l', '--log', help='logfile path', default=config.LOG)
|
||||
argp.add_argument('-a', '--address', help='netbox address',
|
||||
default=config.NETBOX['ADDRESS'])
|
||||
argp.add_argument('-s', '--tls', help='netbox use tls',
|
||||
action='store_true', default=config.NETBOX['TLS'])
|
||||
argp.add_argument('-t', '--token', help='netbox access token',
|
||||
default=config.NETBOX['TOKEN'])
|
||||
argp.add_argument('-p', '--port', help='netbox access port',
|
||||
default=config.NETBOX['PORT'])
|
||||
argp.add_argument('-v', '--verify', help='tls verify',
|
||||
action='store_true', default=config.NETBOX['TLS_VERIFY'])
|
||||
argp.add_argument('-g', '--tag', help='netbox-scanner tag',
|
||||
default=config.TAG)
|
||||
argp.add_argument('-u', '--unknown', help='netbox-scanner unknown host',
|
||||
default=config.UNKNOWN_HOSTNAME)
|
||||
argp.add_argument('-w', '--warnings', help='disable tls warnings',
|
||||
action='store_true', default=config.DISABLE_TLS_WARNINGS)
|
||||
default=config.UNKNOWN)
|
||||
argp.add_argument('-n', '--networks', nargs='+', help='networks to be scanned',
|
||||
default=config.NETWORKS)
|
||||
args = argp.parse_args()
|
||||
|
||||
nbs = NetBoxScanner(args.address, args.tls, args.token, args.port,
|
||||
args.tag, args.unknown, args.warnings)
|
||||
nbs.sync(args.networks)
|
||||
logging.info('finished')
|
||||
logging.basicConfig(filename='{}/netbox-scanner-{}.log'.format(args.log,
|
||||
datetime.now().strftime('%Y%m%dT%H%M%SZ')),
|
||||
level=logging.INFO,
|
||||
format='%(asctime)s\tnetbox-scanner\t%(levelname)s\t%(message)s')
|
||||
|
||||
disable_warnings(InsecureRequestWarning)
|
||||
|
||||
nbs = NetBoxScanner(args.address, args.token, args.verify, args.tag, args.unknown)
|
||||
logging.info('started: {} networks'.format(len(args.networks)))
|
||||
stats = nbs.sync(args.networks)
|
||||
logging.info('finished: +{} ~{} -{} ?{} !{}'.format(stats[0], stats[1],
|
||||
stats[2], stats[3], stats[4]))
|
||||
|
||||
exit(0)
|
||||
|
@ -2,8 +2,9 @@ certifi==2018.8.24
|
||||
chardet==3.0.4
|
||||
cpe==1.2.1
|
||||
idna==2.7
|
||||
ipaddress==1.0.22
|
||||
python-netbox==0.0.12
|
||||
netaddr==0.7.19
|
||||
pynetbox==3.4.6
|
||||
python-nmap==0.6.1
|
||||
requests==2.19.1
|
||||
six==1.11.0
|
||||
urllib3==1.23
|
||||
|
Loading…
x
Reference in New Issue
Block a user