mirror of
https://github.com/lopes/netbox-scanner.git
synced 2025-07-26 23:34:39 +02:00
minor code improvements
This commit is contained in:
parent
1de0ce0340
commit
ff062e72e7
@ -144,6 +144,32 @@ class NetBoxScanner(object):
|
|||||||
self.logger('created', address=host[0], description=host[1])
|
self.logger('created', address=host[0], description=host[1])
|
||||||
return True
|
return True
|
||||||
|
|
||||||
|
def sync_network(self, network):
|
||||||
|
'''Syncs a single network to NetBox.
|
||||||
|
|
||||||
|
:param network: a network with CIDR like '10.0.0.1/24'
|
||||||
|
:return: True if syncing is ok or False in other case.
|
||||||
|
'''
|
||||||
|
hosts = self.scan(network)
|
||||||
|
self.logger('scanned', net=network, hosts=len(hosts))
|
||||||
|
for host in hosts:
|
||||||
|
self.sync_host(host)
|
||||||
|
for ipv4 in IPv4Network(network): # cleanup
|
||||||
|
address = str(ipv4)
|
||||||
|
if not any(h[0]==address for h in hosts):
|
||||||
|
try:
|
||||||
|
nbhost = self.netbox.ipam.ip_addresses.get(address=address)
|
||||||
|
if self.tag in nbhost.tags:
|
||||||
|
nbhost.delete()
|
||||||
|
self.logger('deleted', address=nbhost.address,
|
||||||
|
description=nbhost.description)
|
||||||
|
else:
|
||||||
|
self.logger('undiscovered', address=nbhost.address,
|
||||||
|
description=nbhost.description)
|
||||||
|
except (AttributeError, ValueError):
|
||||||
|
pass
|
||||||
|
return True
|
||||||
|
|
||||||
def sync(self, networks):
|
def sync(self, networks):
|
||||||
'''Scan some networks and sync them to NetBox.
|
'''Scan some networks and sync them to NetBox.
|
||||||
|
|
||||||
@ -152,29 +178,15 @@ class NetBoxScanner(object):
|
|||||||
'''
|
'''
|
||||||
for s in self.stats:
|
for s in self.stats:
|
||||||
self.stats[s] = 0
|
self.stats[s] = 0
|
||||||
|
|
||||||
parsing = self.parser(networks)
|
parsing = self.parser(networks)
|
||||||
if parsing:
|
if parsing:
|
||||||
self.logger('mistyped', badnets=parsing)
|
self.logger('mistyped', badnets=parsing)
|
||||||
return False
|
return False
|
||||||
|
|
||||||
for net in networks:
|
logging.info('started: {} networks'.format(len(networks)))
|
||||||
hosts = self.scan(net)
|
for network in networks:
|
||||||
self.logger('scanned', net=net, hosts=len(hosts))
|
self.sync_network(network)
|
||||||
for host in hosts:
|
logging.info('finished: +{} ~{} -{} ?{} !{}'.format(
|
||||||
self.sync_host(host)
|
self.stats['created'], self.stats['updated'], self.stats['deleted'],
|
||||||
for ipv4 in IPv4Network(net): # cleanup
|
self.stats['undiscovered'], self.stats['duplicated']))
|
||||||
address = str(ipv4)
|
|
||||||
if not any(h[0]==address for h in hosts):
|
|
||||||
try:
|
|
||||||
nbhost = self.netbox.ipam.ip_addresses.get(address=address)
|
|
||||||
if self.tag in nbhost.tags:
|
|
||||||
nbhost.delete()
|
|
||||||
self.logger('deleted', address=nbhost.address,
|
|
||||||
description=nbhost.description)
|
|
||||||
else:
|
|
||||||
self.logger('undiscovered', address=nbhost.address,
|
|
||||||
description=nbhost.description)
|
|
||||||
except (AttributeError, ValueError):
|
|
||||||
pass
|
|
||||||
return True
|
return True
|
||||||
|
@ -5,7 +5,6 @@ import logging
|
|||||||
from configparser import ConfigParser
|
from configparser import ConfigParser
|
||||||
from os import fsync
|
from os import fsync
|
||||||
from os.path import expanduser
|
from os.path import expanduser
|
||||||
from getpass import getpass
|
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
from urllib3 import disable_warnings
|
from urllib3 import disable_warnings
|
||||||
from urllib3.exceptions import InsecureRequestWarning
|
from urllib3.exceptions import InsecureRequestWarning
|
||||||
@ -59,7 +58,6 @@ logfile = '{}/netbox-scanner-{}.log'.format(general_conf['log'],
|
|||||||
datetime.now().strftime('%Y%m%dT%H%M%SZ'))
|
datetime.now().strftime('%Y%m%dT%H%M%SZ'))
|
||||||
logging.basicConfig(filename=logfile, level=logging.INFO,
|
logging.basicConfig(filename=logfile, level=logging.INFO,
|
||||||
format='%(asctime)s\tnetbox-scanner\t%(levelname)s\t%(message)s')
|
format='%(asctime)s\tnetbox-scanner\t%(levelname)s\t%(message)s')
|
||||||
|
|
||||||
disable_warnings(InsecureRequestWarning)
|
disable_warnings(InsecureRequestWarning)
|
||||||
|
|
||||||
|
|
||||||
@ -67,10 +65,5 @@ if __name__ == '__main__':
|
|||||||
nbs = NetBoxScanner(netbox_conf['address'], netbox_conf['token'],
|
nbs = NetBoxScanner(netbox_conf['address'], netbox_conf['token'],
|
||||||
netbox_conf.getboolean('tls_verify'), general_conf['nmap_args'],
|
netbox_conf.getboolean('tls_verify'), general_conf['nmap_args'],
|
||||||
tacacs_conf, general_conf['tag'], general_conf['unknown'])
|
tacacs_conf, general_conf['tag'], general_conf['unknown'])
|
||||||
logging.info('started: {} networks'.format(len(networks)))
|
|
||||||
nbs.sync(networks)
|
nbs.sync(networks)
|
||||||
logging.info('finished: +{} ~{} -{} ?{} !{}'.format(nbs.stats['created'],
|
exit(0)
|
||||||
nbs.stats['updated'], nbs.stats['deleted'], nbs.stats['undiscovered'],
|
|
||||||
nbs.stats['duplicated']))
|
|
||||||
|
|
||||||
exit(0)
|
|
||||||
|
2
setup.py
2
setup.py
@ -7,7 +7,7 @@ with open('README.md', 'r') as fh:
|
|||||||
|
|
||||||
setuptools.setup(
|
setuptools.setup(
|
||||||
name='netbox-scanner',
|
name='netbox-scanner',
|
||||||
version='0.6.0',
|
version='0.6.1',
|
||||||
author='José Lopes de Oliveira Jr.',
|
author='José Lopes de Oliveira Jr.',
|
||||||
author_email='jlojunior@gmail.com',
|
author_email='jlojunior@gmail.com',
|
||||||
description='A scanner util for NetBox',
|
description='A scanner util for NetBox',
|
||||||
|
Loading…
x
Reference in New Issue
Block a user