Added security checks plugin to Linux agent

This commit is contained in:
Enrique Martin 2023-10-05 09:38:32 +02:00
parent 0e530576e6
commit f86a117e30
2 changed files with 560 additions and 0 deletions

View File

@ -300,3 +300,7 @@ module_plugin grep_log /var/log/syslog Syslog ssh
#module_exec echo 5
#module_description Postcondition test module
#module_end
# This plugin runs several security checks in a Linux system
#module_plugin pandora_security_check

View File

@ -0,0 +1,556 @@
#!/usr/bin/python3
# -*- coding: utf-8 -*-
__author__ = ["Enrique Martin Garcia"]
__copyright__ = "Copyright 2023, PandoraFMS"
__maintainer__ = "Operations department"
__status__ = "Production"
__version__= '1.0'
import sys,os,signal
# Add lib dir path
lib_dir = os.path.join(os.path.dirname(sys.argv[0]), 'lib')
sys.path.insert(0, lib_dir)
# Define a function to handle the SIGINT signal
def sigint_handler(signal, frame):
print ('\nInterrupted by user', file=sys.stderr)
sys.exit(0)
signal.signal(signal.SIGINT, sigint_handler)
# Define a function to handle the SIGTERM signal
def sigterm_handler(signum, frame):
print("Received SIGTERM signal.", file=sys.stderr)
sys.exit(0)
signal.signal(signal.SIGTERM, sigterm_handler)
##############################################################
## SPECIFIC PLUGIN CODE
##############################################################
import argparse
import subprocess
import crypt
import hashlib
import pandoraPlugintools as ppt
###
# GLOBALS
##################
# Set modules global values
modules_group='Security'
# Set configuration blocks names
b_ports='PORTS'
b_files='FILES'
b_passwords='PASSWORDS'
blocks = [b_ports, b_files, b_passwords]
configuration_block=None
# Default integrity file is next to script
integrity_file='/tmp/' + ppt.generate_md5(os.path.abspath(sys.argv[0])) + '.integrity'
# Enable all checks by default
check_selinux=1
check_ssh_root_access=1
check_ssh_root_keys=1
check_ports=1
check_files=1
check_passwords=1
# Initialize check lists
l_ports=[
80,
22
]
l_files=[
'/etc/shadow',
'/etc/passwd',
'/etc/hosts',
'/etc/resolv',
'/etc/ssh/sshd_config',
'/etc/rsyslog.conf'
]
l_passwords=[
'123456',
'12345678',
'123456789',
'12345',
'1234567',
'password',
'1password',
'abc123',
'qwerty',
'111111',
'1234',
'iloveyou',
'sunshine',
'monkey',
'1234567890',
'123123',
'princess',
'baseball',
'dragon',
'football',
'shadow',
'soccer',
'unknown',
'000000',
'myspace1',
'purple',
'fuckyou',
'superman',
'Tigger',
'buster',
'pepper',
'ginger',
'qwerty123',
'qwerty1',
'peanut',
'summer',
'654321',
'michael1',
'cookie',
'LinkedIn',
'whatever',
'mustang',
'qwertyuiop',
'123456a',
'123abc',
'letmein',
'freedom',
'basketball',
'babygirl',
'hello',
'qwe123',
'fuckyou1',
'love',
'family',
'yellow',
'trustno1',
'jesus1',
'chicken',
'diamond',
'scooter',
'booboo',
'welcome',
'smokey',
'cheese',
'computer',
'butterfly',
'696969',
'midnight',
'princess1',
'orange',
'monkey1',
'killer',
'snoopy ',
'qwerty12 ',
'1qaz2wsx ',
'bandit',
'sparky',
'666666',
'football1',
'master',
'asshole',
'batman',
'sunshine1',
'bubbles',
'friends',
'1q2w3e4r',
'chocolate',
'Yankees',
'Tinkerbell',
'iloveyou1',
'abcd1234',
'flower',
'121212',
'passw0rd',
'pokemon',
'StarWars',
'iloveyou2',
'123qwe',
'Pussy',
'angel1'
]
###
# ARGS PARSER
##################
parser = argparse.ArgumentParser(description='Run several security checks in a Linux system')
parser.add_argument('--check_selinux',
help='Enable/Disable check SElinux module',
choices=[0, 1],
type=int,
default=check_selinux
)
parser.add_argument('--check_ssh_root_access',
help='Enable/Disable check SSH root access module',
choices=[0, 1],
type=int,
default=check_ssh_root_access
)
parser.add_argument('--check_ssh_root_keys',
help='Enable/Disable check SSH root keys module',
choices=[0, 1],
type=int,
default=check_ssh_root_keys
)
parser.add_argument('--check_ports',
help='Enable/Disable check ports module',
choices=[0, 1],
type=int,
default=check_ports
)
parser.add_argument('--check_files',
help='Enable/Disable check files module',
choices=[0, 1],
type=int,
default=check_files
)
parser.add_argument('--check_passwords',
help='Enable/Disable check passwords module',
choices=[0, 1],
type=int,
default=check_passwords
)
parser.add_argument('--integrity_file',
help='Path to integrity check file (Default: '+integrity_file+')',
metavar='<integrity_file>',
type=str,
default=integrity_file
)
parser.add_argument('--conf',
help='Path to plugin configuration file',
metavar='<conf_file>',
type=str
)
###
# FUNCTIONS
##################
# Parse current configuration block
def parse_configuration_block(line: str = ''):
global blocks
global configuration_block
for block in blocks:
if line=='['+block+']':
configuration_block=block
break
# Parse all configuration file
def parse_configuration(file: str = ''):
global configuration_block
try:
lines = open(file, 'r')
for line in lines:
line = line.strip()
# Skip empty lines
if not line:
continue
if line == "\n":
continue
# Set current configuration block
parse_configuration_block(line)
# Skip None block
if configuration_block is None:
continue
# Parse PORTS
elif configuration_block==b_ports:
l_ports.append(line)
# Parse FILES
elif configuration_block==b_files:
l_files.append(line)
# Parse PASSWORDS
elif configuration_block==b_passwords:
l_passwords.append(line)
except Exception as e:
print('Error while reading configuration file: '+str(e))
sys.exit(1)
# Print module XML from STDOUT
def print_xml_module(m_name: str = '', m_type: str = '', m_desc: str = '', m_value: str = ''):
module_values = {
'name' : m_name,
'type' : m_type,
'desc' : m_desc,
'value' : m_value,
'module_group' : modules_group
}
ppt.print_module(ppt.init_module(module_values), True)
###
# MAIN
##################
# Parse arguments
args = parser.parse_args()
check_selinux=args.check_selinux
check_ssh_root_access=args.check_ssh_root_access
check_ssh_root_keys=args.check_ssh_root_keys
check_ports=args.check_ports
check_files=args.check_files
check_passwords=args.check_passwords
integrity_file=args.integrity_file
if getattr(args, 'conf', None) is not None:
parse_configuration(args.conf)
# Check selinux status
if check_selinux==1:
value = 0
desc = 'SELinux is disabled.'
try:
if 'SELinux status: enabled' in subprocess.check_output(['sestatus'], stderr=subprocess.STDOUT, text=True):
value = 1
desc = 'SELinux is enabled.'
except Exception:
value = 0
desc = 'Can not determine if SELinux is enabled.'
print_xml_module('SELinux status', 'generic_proc', desc, ppt.parse_str(value))
# Check if SSH allows root access
if check_ssh_root_access==1:
value = 1
desc = 'SSH does not allow root access.'
try:
with open('/etc/ssh/sshd_config', 'r') as ssh_config_file:
for line in ssh_config_file:
line = line.strip()
# Skip empty and commented lines
if not line:
continue
if line == "\n":
continue
if line[0] == "#":
continue
option, val = line.split(maxsplit=1)
if option == 'PermitRootLogin':
if val.lower() != 'no':
value = 0
desc = 'SSH config allows root access.'
break
except FileNotFoundError:
value = 0
desc = 'Can not read /etc/ssh/sshd_config to check if root access allowed.'
except Exception:
value = 0
desc = 'Can not determine if SSH root access is allowed.'
print_xml_module('SSH root access status', 'generic_proc', desc, ppt.parse_str(value))
# Check if /root has SSH keys
if check_ssh_root_keys==1:
value = 1
desc = 'SSH root keys not found.'
try:
ssh_keys = {'private': [], 'public': []}
for root, dirs, files in os.walk('/root/.ssh'):
for filename in files:
file_path = os.path.join(root, filename)
with open(file_path, 'r') as file:
content = file.read()
if '-----BEGIN RSA PRIVATE KEY-----' in content and '-----END RSA PRIVATE KEY-----' in content:
ssh_keys['private'].append(file_path)
elif 'ssh-rsa' in content and filename != 'knwon_hosts' and filename != 'authorized_keys':
ssh_keys['public'].append(file_path)
if len(ssh_keys['private']) > 0 or len(ssh_keys['public']) > 0:
value = 0
desc = 'SSH root keys found:\n'+'\n'.join(ssh_keys['private'])+'\n'.join(ssh_keys['public'])
except Exception:
value = 0
desc = 'Can not determine if SSH root keys exist.'
print_xml_module('SSH root keys status', 'generic_proc', desc, ppt.parse_str(value))
# Check authorized ports
if check_ports==1:
value = 1
desc = 'No unauthorized ports found.'
# Create unique check ports list
l_ports = list(set(l_ports))
open_ports = []
not_allowed_ports = []
try:
for net_tcp_file in ['/proc/net/tcp', '/proc/net/tcp6']:
with open(net_tcp_file, 'r') as tcp_file:
# Skip the first line (header line)
next(tcp_file)
for line in tcp_file:
parts = line.strip().split()
if len(parts) >= 12:
local_address = parts[1]
local_port = int(local_address.split(':')[1], 16)
state = parts[3]
# Check if the connection is in state 0A (listening)
if state == "0A":
open_ports.append(local_port)
# Create unique ports list
open_ports = list(set(open_ports))
except Exception:
pass
for port in open_ports:
if ppt.parse_int(port) not in l_ports:
not_allowed_ports.append(port)
if len(not_allowed_ports) > 0:
value = 0
desc = 'Unauthorized ports found:\n'+'\n'.join(not_allowed_ports)
print_xml_module('Authorized ports status', 'generic_proc', desc, ppt.parse_str(value))
# Check files integrity
if check_files==1:
value = 1
desc = 'No changed files found.'
# Create unique check files list
l_files = list(set(l_files))
# Check if integrity file can be read and written
can_check_files=False
if os.access(integrity_file, os.R_OK | os.W_OK):
can_check_files=True
else:
# If integrity file doesn't exist, create it
if not os.path.exists(integrity_file):
try:
with open(integrity_file, 'w') as f:
can_check_files=True
except Exception:
value=0
desc='Integrity check file can not be created: '+integrity_file
else:
value=0
desc='Integrity check file can not be read or written: '+integrity_file
if can_check_files:
# Read integrity file content
integrity = {}
with open(integrity_file, "r") as f:
lines = f.read().splitlines()
for line in lines:
if len(line.strip()) < 1:
continue
else:
option, value = line.strip().split('=', maxsplit=1)
integrity[option.strip()] = value.strip()
# Check each file integrity
errored_files = []
no_integrity_files = []
for file in l_files:
file_key = ppt.generate_md5(file)
try:
with open(file, 'rb') as f:
md5 = hashlib.md5()
# Read the file in chunks to avoid loading the entire file into memory
for chunk in iter(lambda: f.read(4096), b''):
md5.update(chunk)
file_md5 = md5.hexdigest()
if file_key in integrity:
if integrity[file_key] != file_md5:
no_integrity_files.append(file)
integrity[file_key] = file_md5
except Exception:
errored_files.append(file)
# Prepare new integrity file content
integrity_content = ''
for key, val in integrity.items():
integrity_content = integrity_content+key+'='+val+'\n'
# Overwrite integrity file content
with open(integrity_file, "w") as f:
f.write(integrity_content)
# Check module status
if len(no_integrity_files) > 0:
value = 0
desc = 'Changed files found:\n'+'\n'.join(no_integrity_files)
if len(errored_files) > 0:
value = 0
desc = desc + '\nUnable to check integrity of some files:\n'+'\n'.join(errored_files)
print_xml_module('Files check status', 'generic_proc', desc, ppt.parse_str(value))
# Check weak passwords
if check_passwords==1:
value = 1
desc = 'No insecure passwords found.'
# Create unique check passwords list
l_passwords = list(set(l_passwords))
insecure_users = []
try:
with open('/etc/shadow', 'r') as shadow_file:
for line in shadow_file:
username, password_hash, *_ = line.strip().split(':')
# Skip users with no password hash or system users
if password_hash != "*" and username not in ["nobody", "root", "daemon"]:
for weak_password in l_passwords:
weak_password_hash = crypt.crypt(weak_password, password_hash[:2])
if weak_password_hash == password_hash:
insecure_users.append(username)
break
except FileNotFoundError:
value = 0
desc = 'Can not read /etc/shadow to check passwords.'
except Exception:
value = 0
desc = 'Can not determine if passwords are strong enough.'
if len(insecure_users) > 0:
value = 0
desc = 'Users with insecure passwords found:\n'+'\n'.join(insecure_users)
print_xml_module('Insecure passwords status', 'generic_proc', desc, ppt.parse_str(value))