first commit

This commit is contained in:
José Lopes de Oliveira Jr 2018-09-12 10:59:10 -03:00
parent 1bbf2e6c3e
commit e7c53e0236
5 changed files with 216 additions and 1 deletions

View File

@ -1,2 +1,27 @@
# netbox-scanner
A scanner util for NetBox
A scanner util for NetBox, because certain networks can be updated automagically. ;)
## Installation
Requirements:
- Python 3
- Postgres 10
Python modules:
- sqlalchemy
- psycopg2
- python-netbox
- python-nmap
netbox-scanner/
- README.md
- requirements.txt
- setup.py
- Makefile
- netbox-scanner/
--- nbs.py
--- db.py
--- config.py
--- __init__.py

View File

27
netbox-scanner/config.py Normal file
View File

@ -0,0 +1,27 @@
# netbox-scanner configuration file.
NETBOX = {
'ADDRESS': '',
'TOKEN': '',
'TLS': True,
'PORT': 443,
}
DATABASE = {
'NAME': 'nbscanner', # database name
'USER': 'nbscanner', # postgresql user
'PASSWORD': 'abc123', # postgresql password
'HOST': 'localhost', # database server
'PORT': '5432', # database port
}
SESSION_LENGTH = 16 # length of scan's session token
DISABLE_TLS_WARNINGS = True # should urllib stop displaying TLS/SSL warnings?
# These are the targets to be scanned. It could be:
# - single hosts: 10.2.50.7
# - single networks: 10.2.50.0/24
# - some hosts: 10.2.50.1-7
# The syntax is just the same as used in Nmap.
# Example: ['10.2.50.7', '10.2.50.0/24']
TARGETS = []

53
netbox-scanner/db.py Normal file
View File

@ -0,0 +1,53 @@
# postgres=# CREATE DATABASE nbscanner;
# CREATE DATABASE
# postgres=# CREATE USER nbscanner WITH PASSWORD 'abc123';
# CREATE ROLE
# postgres=# GRANT ALL PRIVILEGES ON DATABASE nbscanner TO nbscanner;
# GRANT
# postgres=# \q
#
#
#
# https://www.pythoncentral.io/introductory-tutorial-python-sqlalchemy/
# http://www.rmunn.com/sqlalchemy-tutorial/tutorial.html
from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.ext.declarative import declarative_base
class Host(db.Model):
__tablename__ = 'Hosts'
id = self.con.Column(self.con.Integer, primary_key=True)
date = self.con.Column(self.con.DateTime, nullable=False)
session = self.con.Column(self.con.String(self.session_length), nullable=False)
address = self.con.Column(self.con.String(100), nullable=False)
name = self.con.Column(self.con.String(255))
mac = self.con.Column(self.con.String(17))
vendor = self.con.Column(self.con.String(100))
osvendor = self.con.Column(self.con.String(100))
osfamily = self.con.Column(self.con.String(100))
cpe = self.con.Column(self.con.String(255))
description = self.con.Column(self.con.String(100))
def __init__(d, s, a, n, m, v, ov, of, c, desc):
self.date = d
self.session = s
self.address = a
self.name = n
self.mac = m
self.vendor = v
self.osvendor = ov
self.osfamily = of
self.cpe = c
self.description = desc
def __repr__(self):
return 'Host(addr={}, date={}, name={})'.format(self.address,
self.date, self.name)
base = declarative_base()
engine = create_engine('postgresql://{}:{}@{}:{}/{}'.format(pguser,pgpass,pghost,pgport,pgdb))
base.metadata.create_all(engine)

View File

@ -0,0 +1,110 @@
from random import SystemRandom
from string import ascii_lowercase, digits
from urllib3 import disable_warnings
from urllib3.exceptions import InsecureRequestWarning
from nmap import PortScanner
from netbox import NetBox
from netbox.exceptions import NotFoundException
from config import NETBOX, DATABASE, SESSION_LENGTH, DISABLE_TLS_WARNINGS, TARGETS
class NetBoxScanner(object):
def __init__(self):
self.netbox = NetBox(host=NETBOX['address'],
use_ssl=NETBOX['TLS'], auth_token=NETBOX['TOKEN'],
port=NETBOX['PORT'])
self.networks = TARGETS
self.results = None
self.session = ''.join(SystemRandom().choice(ascii_lowercase + digits)
for _ in range(SESSION_LENGTH))
if DISABLE_TLS_WARNINGS:
disable_warnings(InsecureRequestWarning)
def scan(self):
'''
Return pattern:
{
'networks': [
{
'network': '10.2.50.0/25',
'hosts': [
{
'address': '10.2.50.7',
'mac': 'ff:ff:ff:ff:ff:ff',
'vendor': 'Dell',
'name': 'hostname',
'osvendor': 'Microsoft',
'osfamily': 'Windows',
'cpe': []
}
]
}
]
}
'''
self.results = {'networks':[]}
for net in self.networks:
nm = PortScanner()
nm.scan(net, arguments='-T4 -O -F')
hosts = []
for host in nm.all_hosts():
ipv4 = nm[host]['addresses']['ipv4']
name = nm[host]['hostnames'][0]['name']
try:
mac = nm[host]['addresses']['mac']
vendor = nm[host]['addresses']['vendor'][mac]
except KeyError:
mac = vendor = '-'
try:
osvendor = nm[host]['osmatch'][0]['osclass'][0]['vendor']
osfamily = nm[host]['osmatch'][0]['osclass'][0]['osfamily']
cpe = nm[host]['osmatch'][0]['osclass'][0]['cpe']
except (KeyError, IndexError):
osvendor = osfamily = cpe = '-'
hosts.append({'address':ipv4,'mac':mac,'vendor':vendor,
'name':name,'osvendor':osvendor,'osfamily':osfamily,
'cpe':cpe})
self.results['networks'].append({'network':net,'hosts':hosts})
return self.results
def nbquery(self, address):
addr = self.netbox.ipam.get_ip_address(address)
print(addr); return
try:
print(addr[0]['address'])
print(addr[0]['description'])
print(addr[0]['tags'])
print(addr[0]['status']['label'])
print(addr[0]['last_updated'])
except IndexError:
return None
def nbdelete(self, address):
try:
self.netbox.ipam.delete_ip_address(address)
except NotFoundException:
return None
return address
def nbcreate(self, address, **kwargs):
'''nbs.nbcreate('10.2.50.77', tags=["auto"], description="Desktop")
'''
self.netbox.ipam.create_ip_address(address, **kwargs)
def nbupdate(self, address, **kwargs):
self.netbox.ipam.update_ip(address, **kwargs)
def sync(self):
pass
nbs = NetBoxScanner()
#print(nbs.session)
#nbs.nbquery('10.2.50.99')
#nbs.nbdelete('10.2.50.99')
#nbs.nbcreate('10.2.50.99', tags=["auto"], description="Desktop")
#nbs.nbupdate('10.2.50.99', description="Server")