core routines added

This commit is contained in:
José Lopes 2018-09-18 17:41:30 -03:00
parent e7c53e0236
commit c64d9bb369
9 changed files with 204 additions and 190 deletions

View File

@ -5,23 +5,6 @@ A scanner util for NetBox, because certain networks can be updated automagically
Requirements:
- Python 3
- Postgres 10
Python modules:
- sqlalchemy
- psycopg2
- python-netbox
- python-nmap
netbox-scanner/
- README.md
- requirements.txt
- setup.py
- Makefile
- netbox-scanner/
--- nbs.py
--- db.py
--- config.py
--- __init__.py
* Python 3
* Postgres 10.5
* Nmap 7.60

View File

@ -8,20 +8,17 @@ NETBOX = {
}
DATABASE = {
'NAME': 'nbscanner', # database name
'USER': 'nbscanner', # postgresql user
'PASSWORD': 'abc123', # postgresql password
'NAME': 'nbscan', # database name
'USER': 'nbscan', # postgresql user
'PASSWORD': '', # postgresql password
'HOST': 'localhost', # database server
'PORT': '5432', # database port
}
SESSION_LENGTH = 16 # length of scan's session token
DISABLE_TLS_WARNINGS = True # should urllib stop displaying TLS/SSL warnings?
TAGS = ['auto'] # only 1 tag is allowed
UNKNOWN_HOSTNAME = 'UNKNOWN HOST'
DISABLE_TLS_WARNINGS = True # stop displaying TLS/SSL warnings?
# These are the targets to be scanned. It could be:
# - single hosts: 10.2.50.7
# - single networks: 10.2.50.0/24
# - some hosts: 10.2.50.1-7
# The syntax is just the same as used in Nmap.
# Example: ['10.2.50.7', '10.2.50.0/24']
# These are the targets to be scanned.
# Example: ['192.168.40.0/20', '10.2.50.0/24']
TARGETS = []

View File

@ -1,53 +0,0 @@
# postgres=# CREATE DATABASE nbscanner;
# CREATE DATABASE
# postgres=# CREATE USER nbscanner WITH PASSWORD 'abc123';
# CREATE ROLE
# postgres=# GRANT ALL PRIVILEGES ON DATABASE nbscanner TO nbscanner;
# GRANT
# postgres=# \q
#
#
#
# https://www.pythoncentral.io/introductory-tutorial-python-sqlalchemy/
# http://www.rmunn.com/sqlalchemy-tutorial/tutorial.html
from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.ext.declarative import declarative_base
class Host(db.Model):
__tablename__ = 'Hosts'
id = self.con.Column(self.con.Integer, primary_key=True)
date = self.con.Column(self.con.DateTime, nullable=False)
session = self.con.Column(self.con.String(self.session_length), nullable=False)
address = self.con.Column(self.con.String(100), nullable=False)
name = self.con.Column(self.con.String(255))
mac = self.con.Column(self.con.String(17))
vendor = self.con.Column(self.con.String(100))
osvendor = self.con.Column(self.con.String(100))
osfamily = self.con.Column(self.con.String(100))
cpe = self.con.Column(self.con.String(255))
description = self.con.Column(self.con.String(100))
def __init__(d, s, a, n, m, v, ov, of, c, desc):
self.date = d
self.session = s
self.address = a
self.name = n
self.mac = m
self.vendor = v
self.osvendor = ov
self.osfamily = of
self.cpe = c
self.description = desc
def __repr__(self):
return 'Host(addr={}, date={}, name={})'.format(self.address,
self.date, self.name)
base = declarative_base()
engine = create_engine('postgresql://{}:{}@{}:{}/{}'.format(pguser,pgpass,pghost,pgport,pgdb))
base.metadata.create_all(engine)

44
netbox-scanner/models.py Normal file
View File

@ -0,0 +1,44 @@
# postgres=# CREATE DATABASE nbscan;
# CREATE DATABASE
# postgres=# CREATE USER nbscan WITH PASSWORD 'abc123';
# CREATE ROLE
# postgres=# GRANT ALL PRIVILEGES ON DATABASE nbscan TO nbscan;
# GRANT
# postgres=# \q
##
from sqlalchemy import create_engine, Column, Integer, String, DateTime
from sqlalchemy.ext.declarative import declarative_base
from config import DATABASE
base = declarative_base()
class Host(base):
__tablename__ = 'Hosts'
id = Column(Integer, primary_key=True)
date = Column(DateTime, nullable=False)
network = Column(String(120), nullable=False)
address = Column(String(100), nullable=False)
name = Column(String(255))
cpe = Column(String(255))
def __init__(self, d, net, a, n, c):
self.date = d
self.network = net
self.address = a
self.name = n
self.cpe = c
def __repr__(self):
return '<Host: address={}, network={}, name={}>'.format(self.address,
self.network, self.name)
engine = create_engine('postgresql://{}:{}@{}:{}/{}'.format(DATABASE['USER'],
DATABASE['PASSWORD'], DATABASE['HOST'], DATABASE['PORT'],
DATABASE['NAME']))
base.metadata.create_all(engine)

56
netbox-scanner/nbsync.py Normal file
View File

@ -0,0 +1,56 @@
from urllib3 import disable_warnings
from urllib3.exceptions import InsecureRequestWarning
from ipaddress import IPv4Network
from sqlalchemy.orm import sessionmaker
from netbox import NetBox
from netbox.exceptions import NotFoundException
from models import Host, base, engine
from config import TAGS
class NetBoxSync(object):
def __init__(self, host, tls, token, port, warnings):
self.netbox = NetBox(host=host, use_ssl=tls, auth_token=token,
port=port)
if warnings:
disable_warnings(InsecureRequestWarning)
base.metadata.bind = engine
dbsession = sessionmaker(bind=engine)
self.session = dbsession()
def get_last_scan_date(self):
return self.session.query(Host).order_by(Host.date.desc()).first().date
def get_last_scan_all(self):
return self.session.query(Host).filter(Host.date == self.get_last_scan_date()).all()
def get_last_scan_networks(self):
return self.session.query(Host.network).filter(Host.date == self.get_last_scan_date()).group_by(Host.network).all()
def in_last_scan(self, address):
return self.session.query(Host.address).filter(Host.address == address).all()
def sync(self):
# updating netbox according to last scan
for host in self.get_last_scan_all():
nbhost = self.netbox.ipam.get_ip_addresses(address=host.address)
if nbhost:
if (TAGS[0] in nbhost[0]['tags']) and (host.name != nbhost[0]['description']):
self.netbox.ipam.update_ip('{}/32'.format(host.address), description=host.name)
else:
self.netbox.ipam.create_ip_address('{}/32'.format(host.address), tags=TAGS, description=host.name)
# deleting not found ipv4 hosts
for net in self.get_last_scan_networks():
for ipv4 in IPv4Network(net[0]):
address = str(ipv4)
if not self.in_last_scan(address):
nbhost = self.netbox.ipam.get_ip_addresses(address=address)
try:
if TAGS[0] in nbhost[0]['tags']:
self.netbox.ipam.delete_ip_address(address)
except IndexError:
pass

View File

@ -1,110 +1,17 @@
from random import SystemRandom
from string import ascii_lowercase, digits
from urllib3 import disable_warnings
from urllib3.exceptions import InsecureRequestWarning
from netscan import NetworkScanner
from nbsync import NetBoxSync
from config import NETBOX, DISABLE_TLS_WARNINGS, TARGETS
from nmap import PortScanner
from netbox import NetBox
from netbox.exceptions import NotFoundException
from datetime import datetime
from config import NETBOX, DATABASE, SESSION_LENGTH, DISABLE_TLS_WARNINGS, TARGETS
print('{} - starting scan'.format(datetime.now()))
ns = NetworkScanner()
ns.scan(TARGETS)
print('{} - starting sync'.format(datetime.now()))
nbs = NetBoxSync(NETBOX['ADDRESS'], NETBOX['TLS'], NETBOX['TOKEN'],
NETBOX['PORT'], DISABLE_TLS_WARNINGS)
nbs.sync()
class NetBoxScanner(object):
def __init__(self):
self.netbox = NetBox(host=NETBOX['address'],
use_ssl=NETBOX['TLS'], auth_token=NETBOX['TOKEN'],
port=NETBOX['PORT'])
self.networks = TARGETS
self.results = None
self.session = ''.join(SystemRandom().choice(ascii_lowercase + digits)
for _ in range(SESSION_LENGTH))
if DISABLE_TLS_WARNINGS:
disable_warnings(InsecureRequestWarning)
def scan(self):
'''
Return pattern:
{
'networks': [
{
'network': '10.2.50.0/25',
'hosts': [
{
'address': '10.2.50.7',
'mac': 'ff:ff:ff:ff:ff:ff',
'vendor': 'Dell',
'name': 'hostname',
'osvendor': 'Microsoft',
'osfamily': 'Windows',
'cpe': []
}
]
}
]
}
'''
self.results = {'networks':[]}
for net in self.networks:
nm = PortScanner()
nm.scan(net, arguments='-T4 -O -F')
hosts = []
for host in nm.all_hosts():
ipv4 = nm[host]['addresses']['ipv4']
name = nm[host]['hostnames'][0]['name']
try:
mac = nm[host]['addresses']['mac']
vendor = nm[host]['addresses']['vendor'][mac]
except KeyError:
mac = vendor = '-'
try:
osvendor = nm[host]['osmatch'][0]['osclass'][0]['vendor']
osfamily = nm[host]['osmatch'][0]['osclass'][0]['osfamily']
cpe = nm[host]['osmatch'][0]['osclass'][0]['cpe']
except (KeyError, IndexError):
osvendor = osfamily = cpe = '-'
hosts.append({'address':ipv4,'mac':mac,'vendor':vendor,
'name':name,'osvendor':osvendor,'osfamily':osfamily,
'cpe':cpe})
self.results['networks'].append({'network':net,'hosts':hosts})
return self.results
def nbquery(self, address):
addr = self.netbox.ipam.get_ip_address(address)
print(addr); return
try:
print(addr[0]['address'])
print(addr[0]['description'])
print(addr[0]['tags'])
print(addr[0]['status']['label'])
print(addr[0]['last_updated'])
except IndexError:
return None
def nbdelete(self, address):
try:
self.netbox.ipam.delete_ip_address(address)
except NotFoundException:
return None
return address
def nbcreate(self, address, **kwargs):
'''nbs.nbcreate('10.2.50.77', tags=["auto"], description="Desktop")
'''
self.netbox.ipam.create_ip_address(address, **kwargs)
def nbupdate(self, address, **kwargs):
self.netbox.ipam.update_ip(address, **kwargs)
def sync(self):
pass
nbs = NetBoxScanner()
#print(nbs.session)
#nbs.nbquery('10.2.50.99')
#nbs.nbdelete('10.2.50.99')
#nbs.nbcreate('10.2.50.99', tags=["auto"], description="Desktop")
#nbs.nbupdate('10.2.50.99', description="Server")
print('{} - finished'.format(datetime.now()))
exit(0)

53
netbox-scanner/netscan.py Normal file
View File

@ -0,0 +1,53 @@
from datetime import datetime
from random import SystemRandom
from string import ascii_lowercase, digits
from nmap import PortScanner
from cpe import CPE
from sqlalchemy.orm import sessionmaker
from models import Host, base, engine
from config import UNKNOWN_HOSTNAME
class NetworkScanner(object):
def __init__(self):
base.metadata.bind = engine
dbsession = sessionmaker(bind=engine)
self.session = dbsession()
def get_name(self, nbcpe):
cpe = CPE(nbcpe[0], CPE.VERSION_2_3)
return '{}:{}:{}'.format(cpe.get_vendor()[0], cpe.get_product()[0],
cpe.get_version()[0])
def scan(self, targets):
''''''
date = datetime.now()
host_count = 0
for net in targets:
nm = PortScanner()
nm.scan(net, arguments='-T4 -O -F')
for host in nm.all_hosts():
addr = nm[host]['addresses']['ipv4']
name = nm[host]['hostnames'][0]['name']
try:
cpe = nm[host]['osmatch'][0]['osclass'][0]['cpe']
except (KeyError, IndexError):
cpe = None
if not name:
try:
name = self.get_name(cpe)
except TypeError:
name = UNKNOWN_HOSTNAME
self.session.add(Host(date, net, addr, name, cpe))
host_count += 1
if host_count >= 300000: # ~ a /14 network
self.session.commit()
host_count = 0
self.session.commit()

11
requirements.txt Normal file
View File

@ -0,0 +1,11 @@
certifi==2018.8.24
chardet==3.0.4
cpe==1.2.1
idna==2.7
ipaddress==1.0.22
psycopg2==2.7.5
python-netbox==0.0.12
python-nmap==0.6.1
requests==2.19.1
SQLAlchemy==1.2.11
urllib3==1.23

16
setup.py Normal file
View File

@ -0,0 +1,16 @@
#!/usr/bin/env python3
from setuptools import setup
from codecs import open
setup(
name='netbox-scanner',
packages=['netbox-scanner'],
version='0.0.2',
description='',
long_description=open('README.txt',encoding='utf-8').read(),
author='José Lopes de Oliveira Jr.',
license='MIT License',
url='https://github.com/forkd/netbox-scanner',
keywords=['netbox','ipam','network','scanner']
)