rename for linting

This commit is contained in:
Max Fiedler 2024-03-25 09:10:01 +01:00
parent 7e7da706c2
commit aff7827bcd
4 changed files with 522 additions and 0 deletions

268
dhcp_lease_list.py Normal file
View File

@ -0,0 +1,268 @@
#!/usr/bin/python3
import datetime
import bisect
def parse_timestamp(raw_str):
tokens = raw_str.split()
if len(tokens) == 1:
if tokens[0].lower() == 'never':
return 'never'
else:
raise Exception('Parse error in timestamp')
elif len(tokens) == 3:
return datetime.datetime.strptime(' '.join(tokens[1:]),
'%Y/%m/%d %H:%M:%S')
else:
raise Exception('Parse error in timestamp')
def timestamp_is_ge(t1, t2):
if t1 == 'never':
return True
elif t2 == 'never':
return False
else:
return t1 >= t2
def timestamp_is_lt(t1, t2):
if t1 == 'never':
return False
elif t2 == 'never':
return t1 != 'never'
else:
return t1 < t2
def timestamp_is_between(t, tstart, tend):
return timestamp_is_ge(t, tstart) and timestamp_is_lt(t, tend)
def parse_hardware(raw_str):
tokens = raw_str.split()
if len(tokens) == 2:
return tokens[1]
else:
raise Exception('Parse error in hardware')
def strip_endquotes(raw_str):
return raw_str.strip('"')
def identity(raw_str):
return raw_str
def parse_binding_state(raw_str):
tokens = raw_str.split()
if len(tokens) == 2:
return tokens[1]
else:
raise Exception('Parse error in binding state')
def parse_next_binding_state(raw_str):
tokens = raw_str.split()
if len(tokens) == 3:
return tokens[2]
else:
raise Exception('Parse error in next binding state')
def parse_rewind_binding_state(raw_str):
tokens = raw_str.split()
if len(tokens) == 3:
return tokens[2]
else:
raise Exception('Parse error in next binding state')
def parse_leases_file(leases_file):
valid_keys = {
'starts': parse_timestamp,
'ends': parse_timestamp,
'tstp': parse_timestamp,
'tsfp': parse_timestamp,
'atsfp': parse_timestamp,
'cltt': parse_timestamp,
'hardware': parse_hardware,
'binding': parse_binding_state,
'next': parse_next_binding_state,
'rewind': parse_rewind_binding_state,
'uid': strip_endquotes,
'client-hostname': strip_endquotes,
'option': identity,
'set': identity,
'on': identity,
'abandoned': None,
'bootp': None,
'reserved': None,
}
leases_db = {}
lease_rec = {}
in_lease = False
in_failover = False
for line in leases_file:
if line.lstrip().startswith('#'):
continue
tokens = line.split()
if len(tokens) == 0:
continue
key = tokens[0].lower()
if key == 'lease':
if not in_lease:
ip_address = tokens[1]
lease_rec = {'ip_address': ip_address}
in_lease = True
else:
raise Exception('Parse error in leases file')
elif key == 'failover':
in_failover = True
elif key == '}':
if in_lease:
for k in valid_keys:
if callable(valid_keys[k]):
lease_rec[k] = lease_rec.get(k, '')
else:
lease_rec[k] = False
ip_address = lease_rec['ip_address']
if ip_address in leases_db:
leases_db[ip_address].insert(0, lease_rec)
else:
leases_db[ip_address] = [lease_rec]
lease_rec = {}
in_lease = False
elif in_failover:
in_failover = False
continue
else:
raise Exception('Parse error in leases file')
elif key in valid_keys:
if in_lease:
value = line[(line.index(key) + len(key)):]
value = value.strip().rstrip(';').rstrip()
if callable(valid_keys[key]):
lease_rec[key] = valid_keys[key](value)
else:
lease_rec[key] = True
else:
raise Exception('Parse error in leases file')
else:
if in_lease:
raise Exception('Parse error in leases file')
if in_lease:
raise Exception('Parse error in leases file')
return leases_db
def round_timedelta(tdelta):
return datetime.timedelta(tdelta.days,
tdelta.seconds
+ (0 if tdelta.microseconds < 500000 else 1))
def timestamp_now():
n = datetime.datetime.utcnow()
return datetime.datetime(n.year, n.month, n.day, n.hour, n.minute, n.second
+ (0 if n.microsecond < 500000 else 1))
def lease_is_active(lease_rec, as_of_ts):
return timestamp_is_between(as_of_ts,
lease_rec['starts'],
lease_rec['ends'])
def ipv4_to_int(ipv4_addr):
parts = ipv4_addr.split('.')
return (int(parts[0]) << 24) \
+ (int(parts[1]) << 16) \
+ (int(parts[2]) << 8) \
+ int(parts[3])
def select_active_leases(leases_db, as_of_ts):
retarray = []
sortedarray = []
for ip_address in leases_db:
lease_rec = leases_db[ip_address][0]
if lease_is_active(lease_rec, as_of_ts):
ip_as_int = ipv4_to_int(ip_address)
insertpos = bisect.bisect(sortedarray, ip_as_int)
sortedarray.insert(insertpos, ip_as_int)
retarray.insert(insertpos, lease_rec)
return retarray
##############################################################################
myfile = open('/var/lib/dhcpd/dhcpd.leases', 'r')
leases = parse_leases_file(myfile)
myfile.close()
now = timestamp_now()
report_dataset = select_active_leases(leases, now)
# TODO: UnboundLocalError:
# local variable 'lease_rec' referenced before assignment
print('+---------------------------------------------------------------------')
print('| DHCPD ACTIVE LEASES REPORT')
print('+---------------+-----------------+--------------------+--------------')
print('| IP Address | MAC Address | Expires (days,H:M:S) | Client Hostname ')
print('+---------------+-----------------+--------------------+--------------')
for lease in report_dataset:
print(('| ' + format(lease['ip_address'], '<15') + ' | '
+ format(lease['hardware'], '<17') + ' | '
+ format(str((lease['ends'] - now)
if lease['ends'] != 'never' else 'never'),
'>20') + ' | ' + lease['client-hostname']))
print('+---------------+-----------------+--------------------+--------------')
print(('| Total Active Leases: ' + str(len(report_dataset))))
print(('| Report generated (UTC): ' + str(now)))
print('+---------------------------------------------------------------------')

64
kea_mgmt.py Normal file
View File

@ -0,0 +1,64 @@
#!/usr/bin/env python3
import requests
import json
import re
import pprint
url = "http://localhost:8000/"
headers = {"Content-Type": "application/json"}
# Validate for correct IPv4 address syntax
def is_valid_ipv4(ip_address):
ip_regex = re.compile(r'^((25[0-5]|(2[0-4]|1\d|[1-9]|)\d)(\.(?!$)|$)){4}$')
return bool(ip_regex.match(ip_address))
print("Choose action to execute:")
print("1. list all DHCP4 leases")
print("2. resend specific IP address to DNS server")
choice = int(input("Enter choice: "))
if choice == 1:
data = {
"command": "lease4-get-all",
"service": [
"dhcp4"
]
}
elif choice == 2:
while True:
print("Please enter an IPv4 address:")
ip_address = input()
if is_valid_ipv4(ip_address):
break
else:
print("Invalid IPv4 address. Please try again.")
data = {
"command": "lease4-resend-ddns",
"arguments": {
"ip-address": ip_address
},
"service": [
"dhcp4"
]
}
else:
print("Invalid choice.")
exit()
# Get the json data from the server
response = requests.post(url, headers=headers, data=json.dumps(data))
# Make sure the request was successful
response.raise_for_status()
try:
pprint.pprint(response.json())
except json.JSONDecodeError:
print("Error: The server response is not in a valid JSON format.")

104
login_log.py Normal file
View File

@ -0,0 +1,104 @@
# #!/usr/bin/env python3
"""Write information into a logfile, like the timestamp and the hostname."""
import os
import socket
from datetime import date
from datetime import datetime
# Determine if the OS is MS Windows
if os.name == 'nt':
# Set the location of the file to write to, in the user Documents directory
logfile = os.path.join(os.path.join(os.environ['USERPROFILE']),
r'Documents\login.log')
# For OS which are not MS Windows
else:
logfile = os.path.join(os.path.join(os.environ['HOME']),
r'Documents/login.log')
# Create the file if it does not exist yet
if not os.path.exists(logfile):
with open(logfile, 'w', encoding="utf-8"):
pass
# Determine the current date and time
today = date.today()
now = datetime.now()
current_day = today.strftime("%d.%m.%Y")
current_time = now.strftime("%H:%M:%S")
# Determine the client hostname
CLIENT_HOSTNAME = os.getenv("CLIENTNAME")
# Determine the client hostname via a different method
# in case the first one failed
if CLIENT_HOSTNAME is None:
try:
CLIENT_HOSTNAME = socket.gethostbyaddr(socket.gethostname())[0]
except socket.herror:
print("Socket host error: unable to determine hostname.")
CLIENT_HOSTNAME = "localhost"
# The `class Prepender` is a class that is used to prepend information to the
# top of a log file. It takes the file path as an argument and reads in the
# existing file. It provides methods to write lines to the file in reverse
# order, effectively prepending the lines to the top of the file. Finally,
# when the `Prepender` object is closed or exited, it writes the modified lines
# back to the file.
class Prepender:
'''Class for prepending the information to the log file.'''
def __init__(self,
file_path,
):
# Read in the existing file, so we can write it back later
with open(file_path, mode='r', encoding="utf-8") as file:
self.__write_queue = file.readlines()
self.__open_file = open(file_path, mode='w', encoding="utf-8")
def write_line(self, line):
'''Function for adding a new line to the file.'''
self.__write_queue.insert(0,
"%s\n" % line,
)
def write_lines(self, lines):
'''Function for adding new lines to the file.'''
lines.reverse()
for line in lines:
self.write_line(line)
def close(self):
'''Function for closing the file after use.'''
self.__exit__(None, None, None)
def __enter__(self):
return self
def __exit__(self, type, value, traceback):
if self.__write_queue:
self.__open_file.writelines(self.__write_queue)
self.__open_file.close()
# if isinstance(CLIENT_HOSTNAME, type) is None:
# if CLIENT_HOSTNAME == "None":
# CLIENT_HOSTNAME = "localhost"
# Prepend the determined information to the top of the log file
with Prepender(logfile) as f:
# Must write individual lines in reverse order
f.write_line(current_day+' - '+current_time+' - '+CLIENT_HOSTNAME)
# Or, use write_lines instead - that maintains order.
# with Prepender(logfile) as f:
# f.write_lines(
# ['This will be line 1',
# 'This will be line 2',
# 'This will be line 3',
# ]
# )

86
logoff_log.py Normal file
View File

@ -0,0 +1,86 @@
#!/usr/bin/env python3
import os
import socket
from datetime import date
from datetime import datetime
# Determine if the OS is MS Windows
if os.name == 'nt':
# Set the location of the file to write to, in the user Documents directory
logfile = os.path.join(os.path.join(os.environ['USERPROFILE']),
r'Documents\logoff.log')
# For OS which are not MS Windows
else:
logfile = os.path.join(os.path.join(os.environ['HOME']),
r'Documents/logoff.log')
# Create the file if it does not exist yet
if not os.path.exists(logfile):
with open(logfile, 'w'):
pass
# Determine the current date and time
today = date.today()
now = datetime.now()
current_day = today.strftime("%d.%m.%Y")
current_time = now.strftime("%H:%M:%S")
# Determine the client hostname
client_hostname = os.getenv("CLIENTNAME")
# Determine the client hostname via a different method
# in case the first one failed
if client_hostname is None:
client_hostname = socket.gethostbyaddr(socket.gethostname())[0]
# Class for prepending the date and time to the log file
class Prepender(object):
def __init__(self,
file_path,
):
# Read in the existing file, so we can write it back later
with open(file_path, mode='r') as f:
self.__write_queue = f.readlines()
self.__open_file = open(file_path, mode='w')
def write_line(self, line):
self.__write_queue.insert(0,
"%s\n" % line,
)
def write_lines(self, lines):
lines.reverse()
for line in lines:
self.write_line(line)
def close(self):
self.__exit__(None, None, None)
def __enter__(self):
return self
def __exit__(self, type, value, traceback):
if self.__write_queue:
self.__open_file.writelines(self.__write_queue)
self.__open_file.close()
# Prepend the determined information to the top of the log file
with Prepender(logfile) as f:
# Must write individual lines in reverse order
f.write_line(current_day+' - '+current_time+' - '+client_hostname)
# Or, use write_lines instead - that maintains order.
# with Prepender(logfile) as f:
# f.write_lines(
# ['This will be line 1',
# 'This will be line 2',
# 'This will be line 3',
# ]
# )