# Mirror of https://github.com/lopes/netbox-scanner.git
# Synced 2025-07-04 04:24:36 +02:00 · 136 lines · 4.6 KiB · Python
import urllib.request
|
|
|
|
from urllib.parse import urlencode, urlsplit
|
|
from json import loads
|
|
from socket import timeout
|
|
from ssl import _create_unverified_context
|
|
from base64 import b64encode
|
|
|
|
|
|
# Timeout, in seconds, handed to urlopen for every request made by Request.
TIMEOUT = 20
|
|
|
|
|
|
def gen_auth(username, password):
    '''Build the credential token used in a Basic Authorization header.

    username and password are joined with a colon, encoded as UTF-8 and
    Base64-encoded; the result is returned as a text string.
    '''
    credentials = '{}:{}'.format(username, password)
    token = b64encode(credentials.encode('utf-8'))
    return token.decode()
|
|
|
|
def make_url(base_url, endpoint, resource):
    '''Compose the URL of a resource under an endpoint of the base URL.

    base_url: URL whose path is used as the prefix; its scheme, host,
        query and fragment are kept as they are
    endpoint: path segment appended after the base path
    resource: path segment appended after the endpoint

    Runs of slashes in the resulting path are collapsed to a single slash
    and a trailing slash is dropped. Returns the URL as a string.
    '''
    parts = urlsplit(base_url)
    path = f'{parts.path}/{endpoint}/{resource}'
    # One str.replace pass turns '///' into '//', so keep collapsing until
    # no doubled slash is left (e.g. base ending in '/' plus '/endpoint').
    while '//' in path:
        path = path.replace('//', '/')
    if path.endswith('/'):
        path = path[:-1]
    return parts._replace(path=path).geturl()
|
|
|
|
|
|
class Api(object):
    '''Entry point to the API: holds the connection settings and exposes
    the 'data' and 'op' endpoints as attributes.

    base_url: root URL of the API; one trailing slash is removed
    username, password: turned into a Basic authorization token
    tls_verify: forwarded to the endpoints together with the URL and token
    '''

    def __init__(self, base_url, username, password, tls_verify=True):
        # Strip a single trailing slash so the stored base has none.
        if base_url[-1] == '/':
            base_url = base_url[:-1]
        self.base_url = base_url
        self.auth = gen_auth(username, password)
        self.tls_verify = tls_verify
        # Bundle handed to every Endpoint: (base_url, auth, tls_verify).
        self.credentials = (self.base_url, self.auth, self.tls_verify)

        self.data = Endpoint(self.credentials, 'data')
        self.op = Endpoint(self.credentials, 'op')
|
|
|
|
|
|
class Endpoint(object):
    '''One endpoint of the API (for example 'data' or 'op').

    credentials: sequence of (base_url, auth, tls_verify)
    endpoint: path segment placed between the base URL and the resource
    '''

    def __init__(self, credentials, endpoint):
        self.base_url = credentials[0]
        self.endpoint = endpoint
        self.resource = ''
        self.auth = credentials[1]
        self.tls_verify = credentials[2]

    def read(self, _resource, **filters):
        '''Fetch a resource and return its decoded JSON document.

        _resource: resource name, without the '.json' suffix
        filters: query filters; 'firstResult' defaults to 0 and
            'maxResults' to 1000 when not supplied

        Raises Exception('Operation timedout') when the request or the
        reading of the response body times out; the original timeout is
        attached as the cause.
        '''
        self.resource = f'{_resource}.json'  # will only deal with JSON outputs
        filters.setdefault('firstResult', 0)
        filters.setdefault('maxResults', 1000)
        req = Request(make_url(self.base_url, self.endpoint, self.resource),
                      self.auth, self.tls_verify)
        try:
            # The response keeps a connection open; the with block closes
            # it even when reading or decoding fails.
            with req.get(**filters) as res:
                return loads(res.read())  # test for possible Prime errors
        except timeout as err:
            raise Exception('Operation timedout') from err
|
|
|
|
|
|
class Request(object):
    '''A single authenticated HTTP request against one URL.

    url: target URL, without query string
    auth: Basic authorization token placed in the Authorization header
    tls_verify: when false, an SSL context created by
        _create_unverified_context() is passed to urlopen
    '''

    def __init__(self, url, auth, tls_verify):
        self.url = url
        self.auth = auth
        self.timeout = TIMEOUT
        if tls_verify:
            # No explicit context is handed to urlopen in this case.
            self.context = None
        else:
            self.context = _create_unverified_context()
        self.headers = {
            'Content-Type': 'application/json',
            'Authorization': f'Basic {self.auth}'
        }

    def get(self, **filters):
        '''Issue a GET request and return the object produced by urlopen.

        filters, when given, are appended to the URL as a dotted query
        string built by dotted_filters().
        '''
        target = self.url
        if filters:
            target = f'{self.url}?{self.dotted_filters(**filters)}'
        request = urllib.request.Request(
            target, headers=self.headers, method='GET')
        return urllib.request.urlopen(
            request, timeout=self.timeout, context=self.context)

    def dotted_filters(self, **filters):
        'Prime filters start with a dot'
        if not filters:
            return ''
        # urlencode escapes any '&' inside keys or values, so every '&' in
        # its output separates two parameters.
        pairs = urlencode(filters).split('&')
        return '&'.join(f'.{pair}' for pair in pairs)
|
|
|
|
|
|
class Prime(object):
    '''Collects (address, description) host tuples from Cisco Prime.

    address, username, password, tls_verify: connection settings handed
        to Api
    unknown: stored on the instance; not used by the methods of this class
    '''

    def __init__(self, address, username, password, tls_verify, unknown):
        self.prime = Api(address, username, password, tls_verify)
        self.unknown = unknown
        self.hosts = []

    def run(self, access_points=False):
        '''Extracts devices from Cisco Prime

        access_points: if set to True, will try to get APs data
        Returns False for no errors or True if errors occurred

        Each device adds (ipAddress, deviceName) to self.hosts and each
        access point adds (ipAddress.address, model). An entry missing one
        of those keys is skipped and reported through the return value.
        '''
        errors = False
        for device in self.get_devices('Devices'):
            try:
                dto = device['devicesDTO']
                host = (dto['ipAddress'], dto['deviceName'])
            except KeyError:
                errors = True
            else:
                self.hosts.append(host)

        if access_points:
            for ap in self.get_devices('AccessPoints'):
                try:
                    dto = ap['accessPointsDTO']
                    host = (dto['ipAddress']['address'], dto['model'])
                except KeyError:
                    errors = True
                else:
                    self.hosts.append(host)
        return errors

    def get_devices(self, resource):
        '''This function is used to support run()

        Reads every page of resource and returns the concatenated list of
        entities found under 'queryResponse'.
        '''
        raw = []
        query = self.prime.data.read(resource, full='true')['queryResponse']
        # int() tolerates servers that send these counters as strings --
        # NOTE(review): confirm against the Prime version in use.
        count = int(query['@count'])
        last = int(query['@last'])
        # A response without 'entity' is treated as an empty page.
        raw.extend(query.get('entity', []))
        while last < count - 1:
            query = self.prime.data.read(
                resource,
                full='true',
                firstResult=last + 1
            )['queryResponse']
            raw.extend(query.get('entity', []))
            # Trust the index the server reports instead of assuming a
            # full page of 1000 rows, so short pages do not skip entries.
            reported = int(query.get('@last', last))
            if reported <= last:
                # No progress: stop rather than ask for the same page forever.
                break
            last = reported
        return raw
|
|
|