netbox-scanner/nbs/__init__.py
2020-05-28 17:22:14 -03:00

91 lines
2.8 KiB
Python

import logging
from pynetbox import api
class NetBoxScanner(object):
def __init__(self, address, token, tls_verify, tag, cleanup):
self.netbox = api(address, token, ssl_verify=tls_verify)
self.tag = tag
self.cleanup = cleanup
self.stats = {
'unchanged': 0,
'created': 0,
'updated': 0,
'deleted': 0,
'errors': 0
}
def sync_host(self, host):
'''Syncs a single host to NetBox
host: a tuple like ('10.0.0.1','Gateway')
returns: True if syncing is good or False for errors
'''
try:
nbhost = self.netbox.ipam.ip_addresses.get(address=host[0])
except ValueError:
logging.error(f'duplicated: {host[0]}/32')
self.stats['errors'] += 1
return False
if nbhost:
if (self.tag in nbhost.tags):
if (host[1] != nbhost.description):
aux = nbhost.description
nbhost.description = host[1]
nbhost.save()
logging.info(f'updated: {host[0]}/32 "{aux}" -> "{host[1]}"')
self.stats['updated'] += 1
else:
logging.info(f'unchanged: {host[0]}/32 "{host[1]}"')
self.stats['unchanged'] += 1
else:
logging.info(f'unchanged: {host[0]}/32 "{host[1]}"')
self.stats['unchanged'] += 1
else:
self.netbox.ipam.ip_addresses.create(
address=host[0],
tags=[self.tag],
description=host[1]
)
logging.info(f'created: {host[0]}/32 "{host[1]}"')
self.stats['created'] += 1
return True
def garbage_collector(self, hosts):
'''Removes records from NetBox not found in last sync'''
nbhosts = self.netbox.ipam.ip_addresses.filter(tag=self.tag)
for nbhost in nbhosts:
nbh = str(nbhost).split('/')[0]
if not any(nbh == addr[0] for addr in hosts):
nbhost.delete()
logging.info(f'deleted: {nbhost}')
self.stats['deleted'] += 1
def sync(self, hosts):
'''Syncs hosts to NetBox
hosts: list of tuples like [(addr,description),...]
'''
for s in self.stats:
self.stats[s] = 0
logging.info('started: {} hosts'.format(len(hosts)))
for host in hosts:
self.sync_host(host)
if self.cleanup:
self.garbage_collector(hosts)
logging.info('finished: .{} +{} ~{} -{} !{}'.format(
self.stats['unchanged'],
self.stats['created'],
self.stats['updated'],
self.stats['deleted'],
self.stats['errors']
))
return True