mirror of
https://github.com/lopes/netbox-scanner.git
synced 2025-07-23 13:54:48 +02:00
code refactored
This commit is contained in:
parent
ab3c16201b
commit
ee065396f4
10
README.md
10
README.md
@ -2,9 +2,11 @@
|
||||
A scanner util for NetBox, because certain networks can be updated automagically. ;)
|
||||
|
||||
## Installation
|
||||
`netbox-scanner` is available as a Python package via PyPi, so you can install it using `pip`:
|
||||
|
||||
Requirements:
|
||||
pip3 install netbox-scanner
|
||||
|
||||
* Python 3
|
||||
* Postgres 10.5
|
||||
* Nmap 7.60
|
||||
Note that `netbox-scanner` will require Nmap and an instance of NetBox ready to use.
|
||||
|
||||
## Usage
|
||||
`netbox-scanner` can be used both in your programs or as a script to be used in shell.
|
||||
|
@ -7,14 +7,6 @@ NETBOX = {
|
||||
'PORT': 443,
|
||||
}
|
||||
|
||||
DATABASE = {
|
||||
'NAME': 'nbscan', # database name
|
||||
'USER': 'nbscan', # postgresql user
|
||||
'PASSWORD': '', # postgresql password
|
||||
'HOST': 'localhost', # database server
|
||||
'PORT': '5432', # database port
|
||||
}
|
||||
|
||||
TAGS = ['auto'] # only 1 tag is allowed
|
||||
UNKNOWN_HOSTNAME = 'UNKNOWN HOST'
|
||||
DISABLE_TLS_WARNINGS = True # stop displaying TLS/SSL warnings?
|
||||
|
@ -1,44 +0,0 @@
|
||||
# postgres=# CREATE DATABASE nbscan;
|
||||
# CREATE DATABASE
|
||||
# postgres=# CREATE USER nbscan WITH PASSWORD 'abc123';
|
||||
# CREATE ROLE
|
||||
# postgres=# GRANT ALL PRIVILEGES ON DATABASE nbscan TO nbscan;
|
||||
# GRANT
|
||||
# postgres=# \q
|
||||
##
|
||||
|
||||
|
||||
from sqlalchemy import create_engine, Column, Integer, String, DateTime
|
||||
from sqlalchemy.ext.declarative import declarative_base
|
||||
|
||||
from config import DATABASE
|
||||
|
||||
|
||||
base = declarative_base()
|
||||
|
||||
|
||||
class Host(base):
|
||||
__tablename__ = 'Hosts'
|
||||
id = Column(Integer, primary_key=True)
|
||||
date = Column(DateTime, nullable=False)
|
||||
network = Column(String(120), nullable=False)
|
||||
address = Column(String(100), nullable=False)
|
||||
name = Column(String(255))
|
||||
cpe = Column(String(255))
|
||||
|
||||
def __init__(self, d, net, a, n, c):
|
||||
self.date = d
|
||||
self.network = net
|
||||
self.address = a
|
||||
self.name = n
|
||||
self.cpe = c
|
||||
|
||||
def __repr__(self):
|
||||
return '<Host: address={}, network={}, name={}>'.format(self.address,
|
||||
self.network, self.name)
|
||||
|
||||
|
||||
engine = create_engine('postgresql://{}:{}@{}:{}/{}'.format(DATABASE['USER'],
|
||||
DATABASE['PASSWORD'], DATABASE['HOST'], DATABASE['PORT'],
|
||||
DATABASE['NAME']))
|
||||
base.metadata.create_all(engine)
|
64
netbox-scanner/nbscan.py
Normal file
64
netbox-scanner/nbscan.py
Normal file
@ -0,0 +1,64 @@
|
||||
|
||||
from urllib3 import disable_warnings
|
||||
from urllib3.exceptions import InsecureRequestWarning
|
||||
from ipaddress import IPv4Network
|
||||
|
||||
from nmap import PortScanner
|
||||
from cpe import CPE
|
||||
from netbox import NetBox
|
||||
|
||||
from config import TAGS, UNKNOWN_HOSTNAME
|
||||
|
||||
|
||||
class NetBoxScanner(object):
|
||||
|
||||
def __init__(self, host, tls, token, port, warnings=True):
|
||||
self.netbox = NetBox(host=host, use_ssl=tls, auth_token=token,
|
||||
port=port)
|
||||
if warnings:
|
||||
disable_warnings(InsecureRequestWarning)
|
||||
|
||||
def get_description(self, name, cpe):
|
||||
if name:
|
||||
return name
|
||||
else:
|
||||
c = CPE(cpe[0], CPE.VERSION_2_3)
|
||||
return '{}.{}.{}'.format(c.get_vendor()[0],
|
||||
c.get_product()[0], c.get_version()[0])
|
||||
|
||||
def scan(self, network):
|
||||
''''''
|
||||
hosts = []
|
||||
nm = PortScanner()
|
||||
nm.scan(network, arguments='-T4 -O -F')
|
||||
|
||||
for host in nm.all_hosts():
|
||||
address = nm[host]['addresses']['ipv4']
|
||||
try:
|
||||
description = self.get_description(nm[host]['hostnames'][0]['name'],
|
||||
nm[host]['osmatch'][0]['osclass'][0]['cpe'])
|
||||
except (KeyError, AttributeError):
|
||||
description = UNKNOWN_HOSTNAME
|
||||
hosts.append({'address':address,'description':description})
|
||||
return hosts
|
||||
|
||||
def sync(self, networks):
|
||||
for net in networks:
|
||||
hosts = self.scan(net)
|
||||
for host in hosts:
|
||||
nbhost = self.netbox.ipam.get_ip_addresses(address=host['address'])
|
||||
if nbhost:
|
||||
if (TAGS[0] in nbhost[0]['tags']) and (host['description'] != nbhost[0]['description']):
|
||||
self.netbox.ipam.update_ip('{}/32'.format(host['address']), description=host['description'])
|
||||
else:
|
||||
self.netbox.ipam.create_ip_address('{}/32'.format(host['address']), tags=TAGS, description=host['description'])
|
||||
|
||||
for ipv4 in IPv4Network(net):
|
||||
address = str(ipv4)
|
||||
if not any(h['address'] == address for h in hosts):
|
||||
nbhost = self.netbox.ipam.get_ip_addresses(address=address)
|
||||
try:
|
||||
if TAGS[0] in nbhost[0]['tags']:
|
||||
self.netbox.ipam.delete_ip_address(address)
|
||||
except IndexError:
|
||||
pass
|
@ -1,56 +0,0 @@
|
||||
from urllib3 import disable_warnings
|
||||
from urllib3.exceptions import InsecureRequestWarning
|
||||
from ipaddress import IPv4Network
|
||||
|
||||
from sqlalchemy.orm import sessionmaker
|
||||
from netbox import NetBox
|
||||
from netbox.exceptions import NotFoundException
|
||||
|
||||
from models import Host, base, engine
|
||||
from config import TAGS
|
||||
|
||||
|
||||
class NetBoxSync(object):
|
||||
|
||||
def __init__(self, host, tls, token, port, warnings):
|
||||
self.netbox = NetBox(host=host, use_ssl=tls, auth_token=token,
|
||||
port=port)
|
||||
if warnings:
|
||||
disable_warnings(InsecureRequestWarning)
|
||||
base.metadata.bind = engine
|
||||
dbsession = sessionmaker(bind=engine)
|
||||
self.session = dbsession()
|
||||
|
||||
def get_last_scan_date(self):
|
||||
return self.session.query(Host).order_by(Host.date.desc()).first().date
|
||||
|
||||
def get_last_scan_all(self):
|
||||
return self.session.query(Host).filter(Host.date == self.get_last_scan_date()).all()
|
||||
|
||||
def get_last_scan_networks(self):
|
||||
return self.session.query(Host.network).filter(Host.date == self.get_last_scan_date()).group_by(Host.network).all()
|
||||
|
||||
def in_last_scan(self, address):
|
||||
return self.session.query(Host.address).filter(Host.address == address).all()
|
||||
|
||||
def sync(self):
|
||||
# updating netbox according to last scan
|
||||
for host in self.get_last_scan_all():
|
||||
nbhost = self.netbox.ipam.get_ip_addresses(address=host.address)
|
||||
if nbhost:
|
||||
if (TAGS[0] in nbhost[0]['tags']) and (host.name != nbhost[0]['description']):
|
||||
self.netbox.ipam.update_ip('{}/32'.format(host.address), description=host.name)
|
||||
else:
|
||||
self.netbox.ipam.create_ip_address('{}/32'.format(host.address), tags=TAGS, description=host.name)
|
||||
|
||||
# deleting not found ipv4 hosts
|
||||
for net in self.get_last_scan_networks():
|
||||
for ipv4 in IPv4Network(net[0]):
|
||||
address = str(ipv4)
|
||||
if not self.in_last_scan(address):
|
||||
nbhost = self.netbox.ipam.get_ip_addresses(address=address)
|
||||
try:
|
||||
if TAGS[0] in nbhost[0]['tags']:
|
||||
self.netbox.ipam.delete_ip_address(address)
|
||||
except IndexError:
|
||||
pass
|
@ -1,17 +1,12 @@
|
||||
from netscan import NetworkScanner
|
||||
from nbsync import NetBoxSync
|
||||
from nbscan import NetBoxScanner
|
||||
from config import NETBOX, DISABLE_TLS_WARNINGS, TARGETS
|
||||
|
||||
from datetime import datetime
|
||||
|
||||
print('{} - starting scan'.format(datetime.now()))
|
||||
ns = NetworkScanner()
|
||||
ns.scan(TARGETS)
|
||||
print('starting - {}'.format(datetime.now()))
|
||||
nbs = NetBoxScanner(NETBOX['ADDRESS'], NETBOX['TLS'],
|
||||
NETBOX['TOKEN'], NETBOX['PORT'], DISABLE_TLS_WARNINGS)
|
||||
nbs.sync(TARGETS)
|
||||
print('finishing - {}'.format(datetime.now()))
|
||||
|
||||
print('{} - starting sync'.format(datetime.now()))
|
||||
nbs = NetBoxSync(NETBOX['ADDRESS'], NETBOX['TLS'], NETBOX['TOKEN'],
|
||||
NETBOX['PORT'], DISABLE_TLS_WARNINGS)
|
||||
nbs.sync()
|
||||
|
||||
print('{} - finished'.format(datetime.now()))
|
||||
exit(0)
|
||||
|
@ -1,53 +0,0 @@
|
||||
from datetime import datetime
|
||||
from random import SystemRandom
|
||||
from string import ascii_lowercase, digits
|
||||
|
||||
from nmap import PortScanner
|
||||
from cpe import CPE
|
||||
from sqlalchemy.orm import sessionmaker
|
||||
|
||||
from models import Host, base, engine
|
||||
from config import UNKNOWN_HOSTNAME
|
||||
|
||||
|
||||
class NetworkScanner(object):
|
||||
|
||||
def __init__(self):
|
||||
base.metadata.bind = engine
|
||||
dbsession = sessionmaker(bind=engine)
|
||||
self.session = dbsession()
|
||||
|
||||
def get_name(self, nbcpe):
|
||||
cpe = CPE(nbcpe[0], CPE.VERSION_2_3)
|
||||
return '{}:{}:{}'.format(cpe.get_vendor()[0], cpe.get_product()[0],
|
||||
cpe.get_version()[0])
|
||||
|
||||
def scan(self, targets):
|
||||
''''''
|
||||
date = datetime.now()
|
||||
host_count = 0
|
||||
|
||||
for net in targets:
|
||||
|
||||
nm = PortScanner()
|
||||
nm.scan(net, arguments='-T4 -O -F')
|
||||
|
||||
for host in nm.all_hosts():
|
||||
addr = nm[host]['addresses']['ipv4']
|
||||
name = nm[host]['hostnames'][0]['name']
|
||||
try:
|
||||
cpe = nm[host]['osmatch'][0]['osclass'][0]['cpe']
|
||||
except (KeyError, IndexError):
|
||||
cpe = None
|
||||
if not name:
|
||||
try:
|
||||
name = self.get_name(cpe)
|
||||
except TypeError:
|
||||
name = UNKNOWN_HOSTNAME
|
||||
self.session.add(Host(date, net, addr, name, cpe))
|
||||
|
||||
host_count += 1
|
||||
if host_count >= 300000: # ~ a /14 network
|
||||
self.session.commit()
|
||||
host_count = 0
|
||||
self.session.commit()
|
@ -3,9 +3,7 @@ chardet==3.0.4
|
||||
cpe==1.2.1
|
||||
idna==2.7
|
||||
ipaddress==1.0.22
|
||||
psycopg2==2.7.5
|
||||
python-netbox==0.0.12
|
||||
python-nmap==0.6.1
|
||||
requests==2.19.1
|
||||
SQLAlchemy==1.2.11
|
||||
urllib3==1.23
|
||||
|
Loading…
x
Reference in New Issue
Block a user