Merge branch 'ent-11756-plugintools-python' into 'develop'

added files modules,agents,transfer,http,general,discovery

See merge request artica/pandorafms!6283
This commit is contained in:
Alejandro Sánchez 2023-08-22 08:08:56 +00:00
commit c664af40b2
12 changed files with 2913 additions and 0 deletions

View File

@ -0,0 +1,84 @@
# Python: module plugintools for PandoraFMS Developers
pandoraPluginTools is a library that aims to help the creation of scripts and their integration in Pandora FMS.
[PluginTools Reference Documentation](https://pandorafms.com/guides/public/books/plugintools)
The package includes the following modules. Each one has different functions that facilitate and automate the data integration in Pandora FMS:
**general**
Module containing general purpose functions, useful in the creation of plugins for PandoraFMS.
**threads**
Module containing threading purpose functions, useful to run parallel functions.
**agents**
Module that contains functions oriented to the creation of Pandora FMS agents
**modules**
Module that contains functions oriented to the creation of Pandora FMS modules.
**transfer**
Module containing functions oriented to file transfer and data sending to Pandora FMS server.
**discovery**
Module containing functions oriented to the creation of Pandora FMS discovery plugins.
**http**
Module containing functions oriented to HTTP API calls.
## Example
``` python
import pandoraPluginTools as ppt
## Define agent
server_name = "WIN-SERV"
agent=ppt.init_agent({
"agent_name" : ppt.generate_md5(server_name),
"agent_alias" : server_name,
"description" : "Default Windows server"
})
## Define modules
modules=[]
data = 10
modules.append({
"name" : "CPU usage",
"type" : "generic_data",
"value": data,
"desc" : "Percentage of CPU utilization",
"unit" : "%"
})
## Generate and transfer XML
xml_content = ppt.print_agent(agent, modules)
xml_file = ppt.write_xml(xml_content, agent["agent_name"])
ppt.transfer_xml(
xml_file,
transfer_mode="tentacle",
tentacle_ip="192.168.1.20",
tentacle_port="41121",
)
```
The package has the following dependencies:
- Hashlib
- datetime.datetime
- hashlib
- json
- os
- print_agent
- print_log_module
- print_module
- queue.Queue
- requests.auth.HTTPBasicAuth
- requests.auth.HTTPDigestAuth
- requests.sessions.Session
- requests_ntlm.HttpNtlmAuth
- shutil
- subprocess.Popen
- sys
- threading.Thread

View File

@ -0,0 +1,10 @@
from .general import *
from .output import *
from .encryption import *
from .threads import *
from .agents import *
from .modules import *
from .transfer import *
from .discovery import *
from .http import *
from .snmp import *

View File

@ -0,0 +1,409 @@
import sys
import os
####
# Define global variables dict, used in functions as default values.
# Its values can be changed.
#########################################################################################
_GLOBAL_VARIABLES = {
'agents_group_name' : '',
'interval' : 300
}
####
# Define some global variables
#########################################################################################
_WINDOWS = os.name == "nt" or os.name == "ce"
_LINUX = sys.platform.startswith("linux")
_MACOS = sys.platform.startswith("darwin")
_OSX = _MACOS # deprecated alias
_FREEBSD = sys.platform.startswith("freebsd")
_OPENBSD = sys.platform.startswith("openbsd")
_NETBSD = sys.platform.startswith("netbsd")
_BSD = _FREEBSD or _OPENBSD or _NETBSD
_SUNOS = sys.platform.startswith(("sunos", "solaris"))
_AIX = sys.platform.startswith("aix")
####
# Internal: Alias for output.print_debug function
#########################################################################################
def _print_debug(
var = "",
print_errors: bool = False
):
"""
Prints any list, dict, string, float or integer as a json
"""
from .output import print_debug
print_debug(var, print_errors)
####
# Set a global variable with the specified name and assigns a value to it.
#########################################################################################
def set_global_variable(
variable_name: str = "",
value = None
)-> None:
"""
Sets the value of a global variable in the '_GLOBAL_VARIABLES' dictionary.
Args:
variable_name (str): Name of the variable to set.
value (any): Value to assign to the variable.
Returns:
None
"""
from .general import set_dict_key_value
set_dict_key_value(_GLOBAL_VARIABLES, variable_name, value)
####
# Get a global variable with the specified name.
#########################################################################################
def get_global_variable(
variable_name: str = ""
)-> None:
"""
Gets the value of a global variable in the '_GLOBAL_VARIABLES' dictionary.
Args:
variable_name (str): Name of the variable to set.
Returns:
None
"""
from .general import get_dict_key_value
get_dict_key_value(_GLOBAL_VARIABLES, variable_name)
####
# Agent class
#########################################################################################
class Agent:
"""
Basic agent class. Requires agent parameters (config {dictionary})
and module definition (modules_def [list of dictionaries])
"""
def __init__(
self,
config: dict = {},
modules_def: list = [],
log_modules_def: list = []
):
self.modules_def = []
self.added_modules = []
self.log_modules_def = []
self.config = init_agent(config)
for module in modules_def:
self.add_module(module)
for log_module in log_modules_def:
self.add_log_module(log_module)
def update_config(
self,
config: dict = {}
)-> None:
'''
Update the configuration settings with new values.
Args:
config (dict): A dictionary containing configuration keys and their new values.
Returns:
None
'''
for key, value in config.items():
if key in self.config:
self.config[key] = value
def get_config(
self
) -> dict:
'''
Retrieve the current configuration settings.
Returns:
dict: A dictionary containing the current configuration settings.
'''
return self.config
def add_module(
self,
module: dict = {}
)-> None:
'''
Add a new module to the list of modules.
Args:
module (dict): A dictionary containing module information.
Returns:
None
'''
from .general import generate_md5
from .modules import init_module
if "name" in module and type(module["name"]) == str and len(module["name"].strip()) > 0:
self.modules_def.append(init_module(module))
self.added_modules.append(generate_md5(module["name"]))
def del_module(
self,
module_name: str = ""
)-> None:
'''
Delete a module based on its name.
Args:
module_name (str): The name of the module to be deleted.
Returns:
None
'''
from .general import generate_md5
if len(module_name.strip()) > 0:
try:
module_id = self.added_modules.index(generate_md5(module_name))
except:
module_id = None
if module_id is not None:
self.added_modules.pop(module_id)
self.modules_def.pop(module_id)
def update_module(
self,
module_name: str = "",
module: dict = {}
)-> None:
'''
Update a module based on its name.
Args:
module_name (str): The name of the module to be updated.
module (dict): A dictionary containing updated module information.
Returns:
None
'''
module_def = self.get_module(module_name)
if module_def:
if "name" not in module:
module["name"] = module_name
module_def.update(module)
self.del_module(module_name)
self.add_module(module_def)
def get_module(
self,
module_name: str = ""
) -> dict:
'''
Retrieve module information based on its name.
Args:
module_name (str): The name of the module to retrieve.
Returns:
dict: A dictionary containing module information if found, otherwise an empty dictionary.
'''
from .general import generate_md5
if len(module_name.strip()) > 0:
try:
module_id = self.added_modules.index(generate_md5(module_name))
except:
module_id = None
if module_id is not None:
return self.modules_def[module_id]
else:
return {}
def get_modules_def(
self
) -> dict:
'''
Retrieve the definitions of all added modules.
Returns:
dict: A dictionary containing the definitions of all added modules.
'''
return self.modules_def
def add_log_module(
self,
log_module: dict = {}
)-> None:
'''
Add a new log module to the list of log modules.
Args:
log_module (dict): A dictionary containing log module information.
Returns:
None
'''
from .modules import init_log_module
if "source" in log_module and type(log_module["source"]) == str and len(log_module["source"].strip()) > 0:
self.log_modules_def.append(init_log_module(log_module))
def get_log_modules_def(
self
) -> dict:
'''
Retrieve the definitions of all added log modules.
Returns:
dict: A dictionary containing the definitions of all added log modules.
'''
return self.log_modules_def
def print_xml(
self,
print_flag: bool = False
) -> str:
'''
Generate and optionally print the XML representation of the agent.
Args:
print_flag (bool): A flag indicating whether to print the XML representation.
Returns:
str: The XML representation of the agent.
'''
return print_agent(self.get_config(), self.get_modules_def(), self.get_log_modules_def(), print_flag)
####
# Gets system OS name
#########################################################################################
def get_os() -> str:
"""
Gets system OS name
Returns:
str: OS name.
"""
os = "Other"
if _WINDOWS:
os = "Windows"
if _LINUX:
os = "Linux"
if _MACOS or _OSX:
os = "MacOS"
if _FREEBSD or _OPENBSD or _NETBSD or _BSD:
os = "BSD"
if _SUNOS:
os = "Solaris"
if _AIX:
os = "AIX"
return os
####
# Init agent template
#########################################################################################
def init_agent(
default_values: dict = {}
) -> dict:
"""
Initialize an agent template with default values.
Args:
default_values (dict): A dictionary containing custom default values for the agent template.
Returns:
dict: A dictionary representing the agent template with default and custom values.
"""
from .general import now
agent = {
"agent_name" : "",
"agent_alias" : "",
"parent_agent_name" : "",
"description" : "",
"version" : "",
"os_name" : "",
"os_version" : "",
"timestamp" : now(),
"address" : "",
"group" : _GLOBAL_VARIABLES['agents_group_name'],
"interval" : _GLOBAL_VARIABLES['interval'],
"agent_mode" : "1"
}
for key, value in default_values.items():
if key in agent:
agent[key] = value
return agent
####
# Prints agent XML. Requires agent conf (dict) and modules (list) as arguments.
#########################################################################################
def print_agent(
agent: dict = None,
modules: list = [],
log_modules: list = [],
print_flag: bool = False
) -> str:
"""
Print the XML representation of an agent.
Args:
agent (dict): A dictionary containing agent configuration.
modules (list): A list of dictionaries representing modules.
log_modules (list): A list of dictionaries representing log modules.
print_flag (bool): A flag indicating whether to print the XML representation.
Returns:
str: The XML representation of the agent.
"""
from .output import print_stdout
from .modules import print_module,print_log_module
xml = ""
data_file = None
if agent is not None:
header = "<?xml version='1.0' encoding='UTF-8'?>\n"
header += "<agent_data"
for dato in agent:
header += " " + str(dato) + "='" + str(agent[dato]) + "'"
header += ">\n"
xml = header
for module in modules:
modules_xml = print_module(module)
xml += str(modules_xml)
for log_module in log_modules:
modules_xml = print_log_module(log_module)
xml += str(modules_xml)
xml += "</agent_data>"
if print_flag:
print_stdout(xml)
return xml

View File

@ -0,0 +1,225 @@
import sys
import json
####
# Define some global variables
#########################################################################################
_ERROR_LEVEL = 0
_SUMMARY = {}
_INFO = ""
_MONITORING_DATA = []
####
# Internal: Alias for output.print_debug function
#########################################################################################
def _print_debug(
var = "",
print_errors: bool = False
):
"""
Print the variable as a JSON-like representation for debugging purposes.
Args:
var (any): The variable to be printed.
print_errors (bool): A flag indicating whether to print errors during debugging.
"""
from .output import print_debug
print_debug(var, print_errors)
####
# Set error level to value
#########################################################################################
def set_disco_error_level(
value: int = 0
)-> None:
"""
Sets the error level to the specified value.
Args:
value (int, optional): The error level value. Default is 0.
"""
global _ERROR_LEVEL
_ERROR_LEVEL = value
####
# Set fixed value to summary dict
#########################################################################################
def set_disco_summary(
data: dict = {}
)-> None:
"""
Sets the disk summary data in the internal summary dictionary.
This function updates the summary dictionary with the provided disk summary data.
Args:
data (dict): A dictionary containing disk summary data.
Returns:
None
"""
global _SUMMARY
_SUMMARY = {}
####
# Set fixed value to summary key
#########################################################################################
def set_disco_summary_value(
key: str = "",
value = None
)-> None:
"""
Sets a fixed value for a key in the '_SUMMARY' dictionary.
Args:
key (str): Key to set the value for.
value (any): Value to assign to the key.
Returns:
None
"""
global _SUMMARY
_SUMMARY[key] = value
####
# Add value to summary key
#########################################################################################
def add_disco_summary_value(
key: str = "",
value = None
)-> None:
"""
Adds a value to a key in the 'SUMMARY' dictionary.
If the key already exists, the value will be incremented. Otherwise, a new key will be created.
Args:
key (str): Key to add the value to.
value (any): Value to add to the key.
Returns:
None
"""
global _SUMMARY
if key in _SUMMARY:
_SUMMARY[key] += value
else:
set_disco_summary_value(key, value)
####
# Set fixed value to info
#########################################################################################
def set_disco_info_value(
value: str = ""
)-> None:
"""
Sets a fixed value to the '_INFO' variable.
Args:
data (str, optional): The value to set in the '_INFO' variable. Default is an empty string.
Returns:
None
"""
global _INFO
_INFO = value
####
# Add data to info
#########################################################################################
def add_disco_info_value(
value: str = ""
)-> None:
"""
Adds data to the '_INFO' variable.
Args:
data (str, optional): The data to add to the '_INFO' variable. Default is an empty string.
Returns:
None
"""
global _INFO
_INFO += value
####
# Set fixed value to monitoring data
#########################################################################################
def set_disco_monitoring_data(
data: list = []
)-> None:
"""
Set the monitoring data for disk usage.
Args:
data (list): A list containing disk monitoring data.
Returns:
None
"""
global _MONITORING_DATA
_MONITORING_DATA = data
####
# Add value to monitoring data
#########################################################################################
def add_disco_monitoring_data(
data: dict = {}
)-> None:
"""
Add disk monitoring data to the global monitoring dataset.
Args:
data (dict): A dictionary containing disk monitoring data.
Returns:
None
"""
global _MONITORING_DATA
_MONITORING_DATA.append(data)
####
# Print JSON output and exit script
#########################################################################################
def disco_output()-> None:
"""
Prints the JSON output and exits the script.
The function uses the global variables '_ERROR_LEVEL', '_SUMMARY', '_INFO' and '_MONITORING_DATA'
to create the JSON output. It then prints the JSON string and exits the script with
the '_ERROR_LEVEL' as the exit code.
Returns:
None
"""
from .output import print_stdout
global _ERROR_LEVEL
global _SUMMARY
global _INFO
global _MONITORING_DATA
output={}
if _SUMMARY:
output["summary"] = _SUMMARY
if _INFO:
output["info"] = _INFO
if _MONITORING_DATA:
output["monitoring_data"] = _MONITORING_DATA
json_string = json.dumps(output)
print_stdout(json_string)
sys.exit(_ERROR_LEVEL)

View File

@ -0,0 +1,116 @@
try:
from Crypto.Cipher import AES
from Crypto.Util.Padding import pad, unpad
except ImportError as e:
import sys
from .output import print_stderr
print_stderr("ModuleNotFoundError: No module named 'pycryptodome'")
sys.exit(1)
import hashlib
import base64
import hmac
from binascii import unhexlify
####
# Define encription internal global variables.
#########################################################################################
_PASSWORD = "default_salt"
####
# Internal: Alias for output.print_debug function
#########################################################################################
def _print_debug(
var = "",
print_errors: bool = False
):
"""
Print the variable as a JSON-like representation for debugging purposes.
Args:
var (any): The variable to be printed.
print_errors (bool): A flag indicating whether to print errors during debugging.
"""
from .output import print_debug
print_debug(var, print_errors)
####
# Internal use only: Get AES cipher
#########################################################################################
def _get_cipher(
password: str = _PASSWORD
) -> AES:
'''
Internal use only: Get AES cipher for encryption and decryption.
Args:
password (str): The password used to derive the encryption key.
Returns:
AES: An AES cipher instance for encryption and decryption.
'''
key = b''
msg = password.encode('utf-8')
hash_obj = hmac.new(key, msg, hashlib.sha256)
hash_result = hash_obj.digest()
hash_base64 = base64.b64encode(hash_result)[:16].decode()
iv = b'0000000000000000'
return AES.new(hash_base64.encode(), AES.MODE_CBC, iv)
####
# Return encrypted string
#########################################################################################
def encrypt_AES(
str_to_encrypt: str = "",
password: str = _PASSWORD
) -> str:
'''
Encrypt a string using AES encryption.
Args:
str_to_encrypt (str): The string to be encrypted.
password (str): The password used to derive the encryption key.
Returns:
str: The encrypted string in base64 encoding.
'''
cipher = _get_cipher(password)
try:
msg_padded = pad(str_to_encrypt.encode(), AES.block_size, style='pkcs7')
cipher_text = cipher.encrypt(msg_padded)
b64str = base64.b64encode(cipher_text).decode()
except:
b64str = ''
return b64str
####
# Return decrypted string
#########################################################################################
def decrypt_AES(
str_to_decrypt: str = "",
password: str = _PASSWORD
) -> str:
'''
Decrypt an encrypted string using AES decryption.
Args:
str_to_decrypt (str): The encrypted string to be decrypted.
password (str): The password used to derive the encryption key.
Returns:
str: The decrypted string.
'''
cipher = _get_cipher(password)
try:
decrypted_str = unpad(cipher.decrypt(base64.b64decode(str_to_decrypt)), AES.block_size, style='pkcs7').decode().strip()
except:
decrypted_str = ''
return decrypted_str

View File

@ -0,0 +1,652 @@
import sys
from datetime import datetime
import hashlib
####
# Define some global variables
#########################################################################################
# Entity to character mapping. Contains a few tweaks to make it backward compatible with the previous safe_input implementation.
_ENT2CHR = {
'#x00': chr(0),
'#x01': chr(1),
'#x02': chr(2),
'#x03': chr(3),
'#x04': chr(4),
'#x05': chr(5),
'#x06': chr(6),
'#x07': chr(7),
'#x08': chr(8),
'#x09': chr(9),
'#x0a': chr(10),
'#x0b': chr(11),
'#x0c': chr(12),
'#x0d': chr(13),
'#x0e': chr(14),
'#x0f': chr(15),
'#x10': chr(16),
'#x11': chr(17),
'#x12': chr(18),
'#x13': chr(19),
'#x14': chr(20),
'#x15': chr(21),
'#x16': chr(22),
'#x17': chr(23),
'#x18': chr(24),
'#x19': chr(25),
'#x1a': chr(26),
'#x1b': chr(27),
'#x1c': chr(28),
'#x1d': chr(29),
'#x1e': chr(30),
'#x1f': chr(31),
'#x20': chr(32),
'quot': chr(34),
'amp': chr(38),
'#039': chr(39),
'#40': chr(40),
'#41': chr(41),
'lt': chr(60),
'gt': chr(62),
'#92': chr(92),
'#x80': chr(128),
'#x81': chr(129),
'#x82': chr(130),
'#x83': chr(131),
'#x84': chr(132),
'#x85': chr(133),
'#x86': chr(134),
'#x87': chr(135),
'#x88': chr(136),
'#x89': chr(137),
'#x8a': chr(138),
'#x8b': chr(139),
'#x8c': chr(140),
'#x8d': chr(141),
'#x8e': chr(142),
'#x8f': chr(143),
'#x90': chr(144),
'#x91': chr(145),
'#x92': chr(146),
'#x93': chr(147),
'#x94': chr(148),
'#x95': chr(149),
'#x96': chr(150),
'#x97': chr(151),
'#x98': chr(152),
'#x99': chr(153),
'#x9a': chr(154),
'#x9b': chr(155),
'#x9c': chr(156),
'#x9d': chr(157),
'#x9e': chr(158),
'#x9f': chr(159),
'#xa0': chr(160),
'#xa1': chr(161),
'#xa2': chr(162),
'#xa3': chr(163),
'#xa4': chr(164),
'#xa5': chr(165),
'#xa6': chr(166),
'#xa7': chr(167),
'#xa8': chr(168),
'#xa9': chr(169),
'#xaa': chr(170),
'#xab': chr(171),
'#xac': chr(172),
'#xad': chr(173),
'#xae': chr(174),
'#xaf': chr(175),
'#xb0': chr(176),
'#xb1': chr(177),
'#xb2': chr(178),
'#xb3': chr(179),
'#xb4': chr(180),
'#xb5': chr(181),
'#xb6': chr(182),
'#xb7': chr(183),
'#xb8': chr(184),
'#xb9': chr(185),
'#xba': chr(186),
'#xbb': chr(187),
'#xbc': chr(188),
'#xbd': chr(189),
'#xbe': chr(190),
'Aacute': chr(193),
'Auml': chr(196),
'Eacute': chr(201),
'Euml': chr(203),
'Iacute': chr(205),
'Iuml': chr(207),
'Ntilde': chr(209),
'Oacute': chr(211),
'Ouml': chr(214),
'Uacute': chr(218),
'Uuml': chr(220),
'aacute': chr(225),
'auml': chr(228),
'eacute': chr(233),
'euml': chr(235),
'iacute': chr(237),
'iuml': chr(239),
'ntilde': chr(241),
'oacute': chr(243),
'ouml': chr(246),
'uacute': chr(250),
'uuml': chr(252),
'OElig': chr(338),
'oelig': chr(339),
'Scaron': chr(352),
'scaron': chr(353),
'Yuml': chr(376),
'fnof': chr(402),
'circ': chr(710),
'tilde': chr(732),
'Alpha': chr(913),
'Beta': chr(914),
'Gamma': chr(915),
'Delta': chr(916),
'Epsilon': chr(917),
'Zeta': chr(918),
'Eta': chr(919),
'Theta': chr(920),
'Iota': chr(921),
'Kappa': chr(922),
'Lambda': chr(923),
'Mu': chr(924),
'Nu': chr(925),
'Xi': chr(926),
'Omicron': chr(927),
'Pi': chr(928),
'Rho': chr(929),
'Sigma': chr(931),
'Tau': chr(932),
'Upsilon': chr(933),
'Phi': chr(934),
'Chi': chr(935),
'Psi': chr(936),
'Omega': chr(937),
'alpha': chr(945),
'beta': chr(946),
'gamma': chr(947),
'delta': chr(948),
'epsilon': chr(949),
'zeta': chr(950),
'eta': chr(951),
'theta': chr(952),
'iota': chr(953),
'kappa': chr(954),
'lambda': chr(955),
'mu': chr(956),
'nu': chr(957),
'xi': chr(958),
'omicron': chr(959),
'pi': chr(960),
'rho': chr(961),
'sigmaf': chr(962),
'sigma': chr(963),
'tau': chr(964),
'upsilon': chr(965),
'phi': chr(966),
'chi': chr(967),
'psi': chr(968),
'omega': chr(969),
'thetasym': chr(977),
'upsih': chr(978),
'piv': chr(982),
'ensp': chr(8194),
'emsp': chr(8195),
'thinsp': chr(8201),
'zwnj': chr(8204),
'zwj': chr(8205),
'lrm': chr(8206),
'rlm': chr(8207),
'ndash': chr(8211),
'mdash': chr(8212),
'lsquo': chr(8216),
'rsquo': chr(8217),
'sbquo': chr(8218),
'ldquo': chr(8220),
'rdquo': chr(8221),
'bdquo': chr(8222),
'dagger': chr(8224),
'Dagger': chr(8225),
'bull': chr(8226),
'hellip': chr(8230),
'permil': chr(8240),
'prime': chr(8242),
'Prime': chr(8243),
'lsaquo': chr(8249),
'rsaquo': chr(8250),
'oline': chr(8254),
'frasl': chr(8260),
'euro': chr(8364),
'image': chr(8465),
'weierp': chr(8472),
'real': chr(8476),
'trade': chr(8482),
'alefsym': chr(8501),
'larr': chr(8592),
'uarr': chr(8593),
'rarr': chr(8594),
'darr': chr(8595),
'harr': chr(8596),
'crarr': chr(8629),
'lArr': chr(8656),
'uArr': chr(8657),
'rArr': chr(8658),
'dArr': chr(8659),
'hArr': chr(8660),
'forall': chr(8704),
'part': chr(8706),
'exist': chr(8707),
'empty': chr(8709),
'nabla': chr(8711),
'isin': chr(8712),
'notin': chr(8713),
'ni': chr(8715),
'prod': chr(8719),
'sum': chr(8721),
'minus': chr(8722),
'lowast': chr(8727),
'radic': chr(8730),
'prop': chr(8733),
'infin': chr(8734),
'ang': chr(8736),
'and': chr(8743),
'or': chr(8744),
'cap': chr(8745),
'cup': chr(8746),
'int': chr(8747),
'there4': chr(8756),
'sim': chr(8764),
'cong': chr(8773),
'asymp': chr(8776),
'ne': chr(8800),
'equiv': chr(8801),
'le': chr(8804),
'ge': chr(8805),
'sub': chr(8834),
'sup': chr(8835),
'nsub': chr(8836),
'sube': chr(8838),
'supe': chr(8839),
'oplus': chr(8853),
'otimes': chr(8855),
'perp': chr(8869),
'sdot': chr(8901),
'lceil': chr(8968),
'rceil': chr(8969),
'lfloor': chr(8970),
'rfloor': chr(8971),
'lang': chr(9001),
'rang': chr(9002),
'loz': chr(9674),
'spades': chr(9824),
'clubs': chr(9827),
'hearts': chr(9829),
'diams': chr(9830),
}
# Construct the character to entity mapping.
_CHR2ENT = {v: "&" + k + ";" for k, v in _ENT2CHR.items()}
####
# Internal: Alias for output.print_debug function
#########################################################################################
def _print_debug(
var = "",
print_errors: bool = False
):
"""
Print the variable as a JSON-like representation for debugging purposes.
Args:
var (any): The variable to be printed.
print_errors (bool): A flag indicating whether to print errors during debugging.
"""
from .output import print_debug
print_debug(var, print_errors)
####
# Convert the input_string encoded in html entity to clear char string.
#########################################################################################
def safe_input(
input_string: str = ""
) -> str:
'''
Convert an input string encoded in HTML entities to a clear character string.
Args:
input_string (str): The input string encoded in HTML entities.
Returns:
str: The decoded clear character string.
'''
if not input_string:
return ""
return "".join(_CHR2ENT.get(char, char) for char in input_string)
####
# Convert the html entities to input_string encoded to rebuild char string.
#########################################################################################
def safe_output(
input_string: str = ""
) -> str:
'''
Convert HTML entities back to their corresponding characters in the input string.
Args:
input_string (str): The input string containing HTML entities.
Returns:
str: The decoded clear character string.
'''
if not input_string:
return ""
for char, entity in _CHR2ENT.items():
input_string = input_string.replace(entity, char)
return input_string
####
# Assign to a key in a dict a given value.
#########################################################################################
def set_dict_key_value(
input_dict: dict = {},
input_key: str = "",
input_value = None
)-> None:
"""
Assign a given value to a specified key in a dictionary.
Args:
input_dict (dict): The dictionary to which the value will be assigned.
input_key (str): The key in the dictionary to which the value will be assigned.
input_value (any): The value to be assigned to the specified key.
Returns:
None
"""
key = input_key.strip()
if len(key) > 0:
input_dict[key] = input_value
####
# Return the value of a key in a given dict.
#########################################################################################
def get_dict_key_value(
input_dict: dict = {},
input_key: str = ""
)-> None:
"""
Return the value associated with a given key in a provided dictionary.
Args:
input_dict (dict): The dictionary to search for the key-value pair.
input_key (str): The key to look up in the dictionary.
Returns:
The value associated with the specified key, or None if the key is not found.
"""
key = input_key.strip()
if key in input_dict:
return input_dict[key]
else:
return None
####
# Return MD5 hash string.
#########################################################################################
def generate_md5(
input_string: str = ""
) -> str:
"""
Generates an MD5 hash for the given input string.
Args:
input_string (str): The string for which the MD5 hash will be generated.
Returns:
str: The MD5 hash of the input string as a hexadecimal string.
"""
try:
md5_hash = hashlib.md5(input_string.encode()).hexdigest()
except:
md5_hash = ""
return md5_hash
####
# Returns or print current time in date format or utimestamp.
#########################################################################################
def now(
utimestamp: bool = False,
print_flag: bool = False
) -> str:
"""
Get the current time in the specified format or as a Unix timestamp.
Args:
utimestamp (bool): Set to True to get the Unix timestamp (epoch time).
print_flag (bool): Set to True to print the time to standard output.
Returns:
str: The current time in the desired format or as a Unix timestamp.
"""
from .output import print_stdout
today = datetime.today()
if utimestamp:
time = datetime.timestamp(today)
else:
time = today.strftime('%Y/%m/%d %H:%M:%S')
if print_flag:
print_stdout(time)
return time
####
# Translate macros in string from a dict.
#########################################################################################
def translate_macros(
macro_dic: dict = {},
data: str = ""
) -> str:
"""
Replace macros in the input string with their corresponding values.
Args:
macro_dic (dict): A dictionary containing macro names and their corresponding values.
data (str): The input string in which macros should be replaced.
Returns:
str: The input string with macros replaced by their values.
"""
for macro_name, macro_value in macro_dic.items():
data = data.replace(macro_name, macro_value)
return data
####
# Parse configuration file line by line based on separator and return dict.
#########################################################################################
def parse_configuration(
file: str = "/etc/pandora/pandora_server.conf",
separator: str = " ",
default_values: dict = {}
) -> dict:
"""
Parse a configuration file and return its data as a dictionary.
Args:
file (str): The path to the configuration file. Defaults to "/etc/pandora/pandora_server.conf".
separator (str, optional): The separator between option and value. Defaults to " ".
default_values (dict, optional): A dictionary of default values. Defaults to an empty dictionary.
Returns:
dict: A dictionary containing all keys and values from the configuration file.
"""
from .output import print_stderr
config = {}
try:
with open (file, "r") as conf:
lines = conf.read().splitlines()
for line in lines:
if line.strip().startswith("#") or len(line.strip()) < 1 :
continue
else:
option, value = line.strip().split(separator, maxsplit=1)
config[option.strip()] = value.strip()
except Exception as e:
print_stderr(f"{type(e).__name__}: {e}")
for option, value in default_values.items():
if option.strip() not in config:
config[option.strip()] = value.strip()
return config
####
# Parse csv file line by line and return list.
#########################################################################################
def parse_csv_file(
file: str = "",
separator: str = ';',
count_parameters: int = 0,
print_errors: bool = False
) -> list:
"""
Parse a CSV configuration file and return its data in a list.
Args:
file (str): The path to the CSV configuration file.
separator (str, optional): The separator between values in the CSV. Defaults to ";".
count_parameters (int, optional): The minimum number of parameters each line should have. Defaults to 0.
print_errors (bool, optional): Set to True to print errors for lines with insufficient parameters. Defaults to False.
Returns:
list: A list containing lists of values for each line in the CSV.
"""
from .output import print_stderr
csv_arr = []
try:
with open (file, "r") as csv:
lines = csv.read().splitlines()
for line in lines:
if line.strip().startswith("#") or len(line.strip()) < 1 :
continue
else:
value = line.strip().split(separator)
if len(value) >= count_parameters:
csv_arr.append(value)
elif print_errors==True:
print_stderr(f'Csv line: {line} does not match minimun parameter defined: {count_parameters}')
except Exception as e:
print_stderr(f"{type(e).__name__}: {e}")
return csv_arr
####
# Parse given variable to integer.
#########################################################################################
def parse_int(
var = None
) -> int:
"""
Parse given variable to integer.
Args:
var (any): The variable to be parsed as an integer.
Returns:
int: The parsed integer value. If parsing fails, returns 0.
"""
try:
return int(var)
except:
return 0
####
# Parse given variable to float.
#########################################################################################
def parse_float(
var = None
) -> float:
"""
Parse given variable to float.
Args:
var (any): The variable to be parsed as an float.
Returns:
float: The parsed float value. If parsing fails, returns 0.
"""
try:
return float(var)
except:
return 0
####
# Parse given variable to string.
#########################################################################################
def parse_str(
var = None
) -> str:
"""
Parse given variable to string.
Args:
var (any): The variable to be parsed as an string.
Returns:
str: The parsed string value. If parsing fails, returns "".
"""
try:
return str(var)
except:
return ""
####
# Parse given variable to bool.
#########################################################################################
def parse_bool(
var = None
) -> bool:
"""
Parse given variable to bool.
Args:
var (any): The variable to be parsed as an bool.
Returns:
bool: The parsed bool value. If parsing fails, returns False.
"""
try:
return bool(var)
except:
return False

View File

@ -0,0 +1,113 @@
import urllib3
import warnings
from requests.sessions import Session
from requests_ntlm import HttpNtlmAuth
from requests.auth import HTTPBasicAuth
from requests.auth import HTTPDigestAuth
####
# Internal: Alias for output.print_debug function
#########################################################################################
def _print_debug(
var = "",
print_errors: bool = False
):
"""
Print the provided variable as JSON, supporting various data types.
Args:
var (any): The variable to be printed as JSON.
print_errors (bool): Set to True to print errors encountered during printing.
"""
from .output import print_debug
print_debug(var, print_errors)
####
# Internal: Auth URL session
#########################################################################################
def _auth_call(
session = None,
authtype: str = "basic",
user: str = "",
passw: str = ""
)-> None:
"""
Perform authentication for URL requests using various authentication types.
Args:
session (object, optional): The request Session() object. Defaults to None.
authtype (str, optional): The authentication type. Supported values: 'ntlm', 'basic', or 'digest'. Defaults to 'basic'.
user (str, optional): The authentication user. Defaults to an empty string.
passw (str, optional): The authentication password. Defaults to an empty string.
Returns:
None
"""
if session is not None:
if authtype == 'ntlm':
session.auth = HttpNtlmAuth(user, passw)
elif authtype == 'basic':
session.auth = HTTPBasicAuth(user, passw)
elif authtype == 'digest':
session.auth = HTTPDigestAuth(user, passw)
####
# Call URL and return output
#########################################################################################
def call_url(
url: str = "",
authtype: str = "basic",
user: str = "",
passw: str = "",
timeout: int = 1,
verify: bool = True,
print_errors: bool = False
) -> str:
"""
Call a URL and return its contents.
Args:
url (str): The URL to call.
authtype (str, optional): The authentication type. Supported values: 'ntlm', 'basic', 'digest'. Defaults to 'basic'.
user (str, optional): The authentication user. Defaults to an empty string.
passw (str, optional): The authentication password. Defaults to an empty string.
timeout (int, optional): The session timeout in seconds. Defaults to 1.
print_errors (bool, optional): Set to True to print errors encountered during the call. Defaults to False.
Returns:
str: The output from the URL call.
"""
from .output import print_stderr
if url == "":
if print_errors:
print_stderr("Error: URL not provided")
return None
else:
# using with so we make sure the session is closed even when exceptions are encountered
with Session() as session:
if authtype is not None:
_auth_call(session, authtype, user, passw)
output = ""
try:
with warnings.catch_warnings():
warnings.filterwarnings("ignore", category=urllib3.exceptions.InsecureRequestWarning)
response = session.get(url, timeout=timeout, verify=verify)
response.raise_for_status() # Raise an exception for non-2xx responses
return response.content
except requests.exceptions.Timeout:
if print_errors:
print_stderr("Error: Request timed out")
except requests.exceptions.RequestException as e:
if print_errors:
print_stderr(f"RequestException:\t{e}")
except ValueError:
if print_errors:
print_stderr("Error: URL format not valid (example http://myserver/page.php)")
return None

View File

@ -0,0 +1,330 @@
####
# Internal: Alias for output.print_debug function
#########################################################################################
def _print_debug(
var = "",
print_errors: bool = False
):
"""
Print the provided variable as JSON format, supporting various data types.
Args:
var (any, optional): The variable to be printed as JSON. Defaults to an empty string.
print_errors (bool, optional): Set to True to print errors encountered during printing. Defaults to False.
"""
from .output import print_debug
print_debug(var, print_errors)
####
# Init module template
#########################################################################################
def init_module(
default_values: dict = {}
) -> dict:
"""
Initializes a module template with default values.
Args:
default_values (dict, optional): Dictionary containing default values to override template values. Defaults to an empty dictionary.
Returns:
dict: Dictionary representing the module template with default values.
"""
module = {
"name" : None,
"type" : "generic_data_string",
"value" : "0",
"desc" : "",
"unit" : "",
"interval" : "",
"tags" : "",
"module_group" : "",
"module_parent" : "",
"min_warning" : "",
"min_warning_forced" : "",
"max_warning" : "",
"max_warning_forced" : "",
"min_critical" : "",
"min_critical_forced" : "",
"max_critical" : "",
"max_critical_forced" : "",
"str_warning" : "",
"str_warning_forced" : "",
"str_critical" : "",
"str_critical_forced" : "",
"critical_inverse" : "",
"warning_inverse" : "",
"max" : "",
"min" : "",
"post_process" : "",
"disabled" : "",
"min_ff_event" : "",
"status" : "",
"timestamp" : "",
"custom_id" : "",
"critical_instructions" : "",
"warning_instructions" : "",
"unknown_instructions" : "",
"quiet" : "",
"module_ff_interval" : "",
"crontab" : "",
"min_ff_event_normal" : "",
"min_ff_event_warning" : "",
"min_ff_event_critical" : "",
"ff_type" : "",
"ff_timeout" : "",
"each_ff" : "",
"module_parent_unlink" : "",
"alert" : []
}
for key, value in default_values.items():
if key in module:
module[key] = value
return module
####
# Returns module in XML format. Accepts only {dict}
#########################################################################################
def print_module(
module: dict = None,
print_flag: bool = False
) -> str:
"""
Returns module in XML format. Accepts only {dict}.
Args:
module (dict, optional): Dictionary containing module data. Defaults to None.
print_flag (bool, optional): Flag to print the module XML to STDOUT. Defaults to False.
Returns:
str: Module data in XML format.
"""
from .output import print_stdout
module_xml = ""
if module is not None:
data = dict(module)
module_xml = ("<module>\n"
"\t<name><![CDATA[" + str(data["name"]) + "]]></name>\n"
"\t<type>" + str(data["type"]) + "</type>\n"
)
if type(data["type"]) is not str and "string" not in data["type"]: #### Strip spaces if module not generic_data_string
data["value"] = data["value"].strip()
if isinstance(data["value"], list): # Checks if value is a list
module_xml += "\t<datalist>\n"
for value in data["value"]:
if type(value) is dict and "value" in value:
module_xml += "\t<data>\n"
module_xml += "\t\t<value><![CDATA[" + str(value["value"]) + "]]></value>\n"
if "timestamp" in value:
module_xml += "\t\t<timestamp><![CDATA[" + str(value["timestamp"]) + "]]></timestamp>\n"
module_xml += "\t</data>\n"
module_xml += "\t</datalist>\n"
else:
module_xml += "\t<data><![CDATA[" + str(data["value"]) + "]]></data>\n"
if "desc" in data and len(str(data["desc"]).strip()) > 0:
module_xml += "\t<description><![CDATA[" + str(data["desc"]) + "]]></description>\n"
if "unit" in data and len(str(data["unit"]).strip()) > 0:
module_xml += "\t<unit><![CDATA[" + str(data["unit"]) + "]]></unit>\n"
if "interval" in data and len(str(data["interval"]).strip()) > 0:
module_xml += "\t<module_interval><![CDATA[" + str(data["interval"]) + "]]></module_interval>\n"
if "tags" in data and len(str(data["tags"]).strip()) > 0:
module_xml += "\t<tags>" + str(data["tags"]) + "</tags>\n"
if "module_group" in data and len(str(data["module_group"]).strip()) > 0:
module_xml += "\t<module_group>" + str(data["module_group"]) + "</module_group>\n"
if "module_parent" in data and len(str(data["module_parent"]).strip()) > 0:
module_xml += "\t<module_parent>" + str(data["module_parent"]) + "</module_parent>\n"
if "min_warning" in data and len(str(data["min_warning"]).strip()) > 0:
module_xml += "\t<min_warning><![CDATA[" + str(data["min_warning"]) + "]]></min_warning>\n"
if "min_warning_forced" in data and len(str(data["min_warning_forced"]).strip()) > 0:
module_xml += "\t<min_warning_forced><![CDATA[" + str(data["min_warning_forced"]) + "]]></min_warning_forced>\n"
if "max_warning" in data and len(str(data["max_warning"]).strip()) > 0:
module_xml += "\t<max_warning><![CDATA[" + str(data["max_warning"]) + "]]></max_warning>\n"
if "max_warning_forced" in data and len(str(data["max_warning_forced"]).strip()) > 0:
module_xml += "\t<max_warning_forced><![CDATA[" + str(data["max_warning_forced"]) + "]]></max_warning_forced>\n"
if "min_critical" in data and len(str(data["min_critical"]).strip()) > 0:
module_xml += "\t<min_critical><![CDATA[" + str(data["min_critical"]) + "]]></min_critical>\n"
if "min_critical_forced" in data and len(str(data["min_critical_forced"]).strip()) > 0:
module_xml += "\t<min_critical_forced><![CDATA[" + str(data["min_critical_forced"]) + "]]></min_critical_forced>\n"
if "max_critical" in data and len(str(data["max_critical"]).strip()) > 0:
module_xml += "\t<max_critical><![CDATA[" + str(data["max_critical"]) + "]]></max_critical>\n"
if "max_critical_forced" in data and len(str(data["max_critical_forced"]).strip()) > 0:
module_xml += "\t<max_critical_forced><![CDATA[" + str(data["max_critical_forced"]) + "]]></max_critical_forced>\n"
if "str_warning" in data and len(str(data["str_warning"]).strip()) > 0:
module_xml += "\t<str_warning><![CDATA[" + str(data["str_warning"]) + "]]></str_warning>\n"
if "str_warning_forced" in data and len(str(data["str_warning_forced"]).strip()) > 0:
module_xml += "\t<str_warning_forced><![CDATA[" + str(data["str_warning_forced"]) + "]]></str_warning_forced>\n"
if "str_critical" in data and len(str(data["str_critical"]).strip()) > 0:
module_xml += "\t<str_critical><![CDATA[" + str(data["str_critical"]) + "]]></str_critical>\n"
if "str_critical_forced" in data and len(str(data["str_critical_forced"]).strip()) > 0:
module_xml += "\t<str_critical_forced><![CDATA[" + str(data["str_critical_forced"]) + "]]></str_critical_forced>\n"
if "critical_inverse" in data and len(str(data["critical_inverse"]).strip()) > 0:
module_xml += "\t<critical_inverse><![CDATA[" + str(data["critical_inverse"]) + "]]></critical_inverse>\n"
if "warning_inverse" in data and len(str(data["warning_inverse"]).strip()) > 0:
module_xml += "\t<warning_inverse><![CDATA[" + str(data["warning_inverse"]) + "]]></warning_inverse>\n"
if "max" in data and len(str(data["max"]).strip()) > 0:
module_xml += "\t<max><![CDATA[" + str(data["max"]) + "]]></max>\n"
if "min" in data and len(str(data["min"]).strip()) > 0:
module_xml += "\t<min><![CDATA[" + str(data["min"]) + "]]></min>\n"
if "post_process" in data and len(str(data["post_process"]).strip()) > 0:
module_xml += "\t<post_process><![CDATA[" + str(data["post_process"]) + "]]></post_process>\n"
if "disabled" in data and len(str(data["disabled"]).strip()) > 0:
module_xml += "\t<disabled><![CDATA[" + str(data["disabled"]) + "]]></disabled>\n"
if "min_ff_event" in data and len(str(data["min_ff_event"]).strip()) > 0:
module_xml += "\t<min_ff_event><![CDATA[" + str(data["min_ff_event"]) + "]]></min_ff_event>\n"
if "status" in data and len(str(data["status"]).strip()) > 0:
module_xml += "\t<status><![CDATA[" + str(data["status"]) + "]]></status>\n"
if "timestamp" in data and len(str(data["timestamp"]).strip()) > 0:
module_xml += "\t<timestamp><![CDATA[" + str(data["timestamp"]) + "]]></timestamp>\n"
if "custom_id" in data and len(str(data["custom_id"]).strip()) > 0:
module_xml += "\t<custom_id><![CDATA[" + str(data["custom_id"]) + "]]></custom_id>\n"
if "critical_instructions" in data and len(str(data["critical_instructions"]).strip()) > 0:
module_xml += "\t<critical_instructions><![CDATA[" + str(data["critical_instructions"]) + "]]></critical_instructions>\n"
if "warning_instructions" in data and len(str(data["warning_instructions"]).strip()) > 0:
module_xml += "\t<warning_instructions><![CDATA[" + str(data["warning_instructions"]) + "]]></warning_instructions>\n"
if "unknown_instructions" in data and len(str(data["unknown_instructions"]).strip()) > 0:
module_xml += "\t<unknown_instructions><![CDATA[" + str(data["unknown_instructions"]) + "]]></unknown_instructions>\n"
if "quiet" in data and len(str(data["quiet"]).strip()) > 0:
module_xml += "\t<quiet><![CDATA[" + str(data["quiet"]) + "]]></quiet>\n"
if "module_ff_interval" in data and len(str(data["module_ff_interval"]).strip()) > 0:
module_xml += "\t<module_ff_interval><![CDATA[" + str(data["module_ff_interval"]) + "]]></module_ff_interval>\n"
if "crontab" in data and len(str(data["crontab"]).strip()) > 0:
module_xml += "\t<crontab><![CDATA[" + str(data["crontab"]) + "]]></crontab>\n"
if "min_ff_event_normal" in data and len(str(data["min_ff_event_normal"]).strip()) > 0:
module_xml += "\t<min_ff_event_normal><![CDATA[" + str(data["min_ff_event_normal"]) + "]]></min_ff_event_normal>\n"
if "min_ff_event_warning" in data and len(str(data["min_ff_event_warning"]).strip()) > 0:
module_xml += "\t<min_ff_event_warning><![CDATA[" + str(data["min_ff_event_warning"]) + "]]></min_ff_event_warning>\n"
if "min_ff_event_critical" in data and len(str(data["min_ff_event_critical"]).strip()) > 0:
module_xml += "\t<min_ff_event_critical><![CDATA[" + str(data["min_ff_event_critical"]) + "]]></min_ff_event_critical>\n"
if "ff_type" in data and len(str(data["ff_type"]).strip()) > 0:
module_xml += "\t<ff_type><![CDATA[" + str(data["ff_type"]) + "]]></ff_type>\n"
if "ff_timeout" in data and len(str(data["ff_timeout"]).strip()) > 0:
module_xml += "\t<ff_timeout><![CDATA[" + str(data["ff_timeout"]) + "]]></ff_timeout>\n"
if "each_ff" in data and len(str(data["each_ff"]).strip()) > 0:
module_xml += "\t<each_ff><![CDATA[" + str(data["each_ff"]) + "]]></each_ff>\n"
if "module_parent_unlink" in data and len(str(data["module_parent_unlink"]).strip()) > 0:
module_xml += "\t<module_parent_unlink><![CDATA[" + str(data["module_parent_unlink"]) + "]]></module_parent_unlink>\n"
if "alert" in data:
for alert in data["alert"]:
if len(str(alert).strip()) > 0:
module_xml += "\t<alert_template><![CDATA[" + str(alert) + "]]></alert_template>\n"
module_xml += "</module>\n"
if print_flag:
print_stdout(module_xml)
return module_xml
####
# Init log module template
#########################################################################################
def init_log_module(
default_values: dict = {}
) -> dict:
"""
Initializes a log module template with default values.
Args:
default_values (dict, optional): Default values to initialize the log module with. Defaults to an empty dictionary.
Returns:
dict: Dictionary representing the log module template with default values.
"""
module = {
"source" : None,
"value" : ""
}
for key, value in default_values.items():
if key in module:
module[key] = value
return module
####
# Returns log module in XML format. Accepts only {dict}
#########################################################################################
def print_log_module(
module: dict = None,
print_flag: bool = False
) -> str:
"""
Returns log module in XML format. Accepts only {dict}.
- Only works with one module at a time: otherwise iteration is needed.
- Module "value" field accepts str type.
- Use not_print_flag to avoid printing the XML (only populates variables).
Args:
module (dict, optional): Dictionary representing the log module. Defaults to None.
print_flag (bool, optional): Flag to indicate whether to print the XML. Defaults to False.
Returns:
str: XML representation of the log module.
"""
from .output import print_stdout
module_xml = ""
if module is not None:
data = dict(module)
module_xml = ("<log_module>\n"
"\t<source><![CDATA[" + str(data["source"]) + "]]></source>\n"
"\t<data>\"" + str(data["value"]) + "\"</data>\n"
)
module_xml += "</log_module>\n"
if print_flag:
print_stdout(module_xml)
return module_xml

View File

@ -0,0 +1,137 @@
import sys
import os
import json
####
# Internal: Alias for output.print_debug function
#########################################################################################
def _print_debug(
var = "",
print_errors: bool = False
):
"""
Prints any list, dict, string, float or integer as a json
Args:
var (any, optional): Variable to be printed. Defaults to "".
print_errors (bool, optional): Flag to indicate whether to print errors. Defaults to False.
"""
print_debug(var, print_errors)
####
# Prints message in stdout
#########################################################################################
def print_stdout(
message: str = ""
)-> None:
"""
Prints message in stdout
Args:
message (str, optional): Message to be printed. Defaults to "".
Returns:
None
"""
print(message)
####
# Prints message in stderr
#########################################################################################
def print_stderr(
message: str = ""
)-> None:
"""
Prints message in stderr
Args:
message (str, optional): Message to be printed. Defaults to "".
Returns:
None
"""
print(message, file=sys.stderr)
####
# Prints dictionary in formatted json string.
#########################################################################################
def print_debug(
var = "",
print_errors: bool = False
)-> None:
"""
Prints any list, dict, string, float or integer as a json
Args:
var: Variable to be printed.
print_errors (bool, optional): Whether to print errors. Defaults to False.
Returns:
None
"""
try:
debug_json = json.dumps(var, indent=4)
print_stdout(debug_json)
except json.JSONDecodeError as e:
if print_errors:
print_stderr(f"debug_dict: Failed to dump. Error: {e}")
except Exception as e:
if print_errors:
print_stderr(f"debug_dict: Unexpected error: {e}")
####
# Add new line to log file
#########################################################################################
def logger(
log_file: str = "",
message: str = "",
log_level: str = "",
add_date: bool = True,
print_errors: bool = False
) -> bool:
'''
Add new line to log file
Args:
log_file (str): Path to the log file.
message (str): Message to be added to the log.
log_level (str): Log level, if applicable. Defaults to an empty string.
add_date (bool): Whether to add the current date and time to the log entry. Defaults to True.
print_errors (bool): Whether to print errors. Defaults to False.
Returns:
bool: True if the log entry was successfully added, False otherwise.
'''
from .general import now
try:
if not os.path.exists(log_file):
with open(log_file, 'w') as file:
pass # Creates an empty file
elif not os.access(log_file, os.W_OK):
if print_errors:
print_stderr(f"Log file '{log_file}' is not writable.")
return False
with open(log_file, 'a') as file:
final_message = ""
if add_date:
final_message += now() + " "
if log_level != "":
final_message += "[" + log_level + "] "
final_message += message + "\n"
file.write(final_message)
return True
except Exception as e:
if print_errors:
print_stderr(f"An error occurred while appending to the log: {e}")
return False

View File

@ -0,0 +1,330 @@
from easysnmp import Session
from pysnmp.hlapi import *
####
# Define global variables dict, used in functions as default values.
# Its values can be changed.
#########################################################################################
_GLOBAL_VARIABLES = {
'hostname' : "",
'version' : 1,
'community' : "public",
'user' : "",
'auth_protocol' : "",
'auth_password' : "",
'privacy_protocol' : "",
'privacy_password' : "",
'security_level' : "noAuthNoPriv",
'timeout' : 2,
'retries' : 1,
'remote_port' : 161,
}
####
# Set a global variable with the specified name and assigns a value to it.
#########################################################################################
def set_global_variable(
variable_name: str = "",
value = None
)-> None:
"""
Sets the value of a global variable in the '_GLOBAL_VARIABLES' dictionary.
Args:
variable_name (str): Name of the variable to set.
value (any): Value to assign to the variable.
Returns:
None
"""
from .general import set_dict_key_value
set_dict_key_value(_GLOBAL_VARIABLES, variable_name, value)
####
# Get a global variable with the specified name.
#########################################################################################
def get_global_variable(
variable_name: str = ""
)-> None:
"""
Gets the value of a global variable in the '_GLOBAL_VARIABLES' dictionary.
Args:
variable_name (str): Name of the variable to set.
Returns:
None
"""
from .general import get_dict_key_value
get_dict_key_value(_GLOBAL_VARIABLES, variable_name)
####
# A class that represents an SNMP target, providing methods for setting up SNMP configuration and performing SNMP operations like GET and WALK.
#########################################################################################
class SNMPTarget:
"""
A class that represents an SNMP target, providing methods for setting up SNMP configuration
and performing SNMP operations like GET and WALK.
"""
def __init__(
self,
host: str = _GLOBAL_VARIABLES['hostname'],
version: int = _GLOBAL_VARIABLES['version'],
community: str = _GLOBAL_VARIABLES['community'],
user: str = _GLOBAL_VARIABLES['user'],
auth_protocol: str = _GLOBAL_VARIABLES['auth_protocol'],
auth_password: str = _GLOBAL_VARIABLES['auth_password'],
privacy_protocol: str = _GLOBAL_VARIABLES['privacy_protocol'],
privacy_password: str = _GLOBAL_VARIABLES['privacy_password'],
security_level: str = _GLOBAL_VARIABLES['security_level'],
timeout: int = _GLOBAL_VARIABLES['timeout'],
retries: int = _GLOBAL_VARIABLES['retries'],
remote_port: int = _GLOBAL_VARIABLES['remote_port']):
self.session = create_snmp_session(
host,
version,
community,
user,
auth_protocol,
auth_password,
privacy_protocol,
privacy_password,
security_level,
timeout,
retries,
remote_port
)
####
# Performs an SNMP GET operation to retrieve the value of a specified OID.
#########################################################################################
def snmp_get(self, oid):
"""
Performs an SNMP GET operation to retrieve the value of a specified OID.
Args:
oid (str): The OID (Object Identifier) for the SNMP GET operation.
Returns:
str: The value retrieved from the specified OID.
"""
return self.session.get(oid).value
####
# Performs an SNMP WALK operation to retrieve a list of values from a subtree of the MIB.
#########################################################################################
def snmp_walk(self, oid):
"""
Performs an SNMP WALK operation to retrieve a list of values from a subtree of the MIB.
Args:
oid (str): The OID (Object Identifier) representing the root of the subtree.
Returns:
list: A list of values retrieved from the specified subtree.
"""
oid_items = self.session.walk(oid)
oid_value_dict = {} # Initialize an empty dictionary
for item in oid_items:
oid_with_index = f"{item.oid}.{item.oid_index}"
oid_value_dict[oid_with_index] = item.value
return oid_value_dict
####
# Creates an SNMP session based on the global configuration variables.
#########################################################################################
def create_snmp_session(
host: str = _GLOBAL_VARIABLES['hostname'],
version: int = _GLOBAL_VARIABLES['version'],
community: str = _GLOBAL_VARIABLES['community'],
user: str = _GLOBAL_VARIABLES['user'],
auth_protocol: str = _GLOBAL_VARIABLES['auth_protocol'],
auth_password: str = _GLOBAL_VARIABLES['auth_password'],
privacy_protocol: str = _GLOBAL_VARIABLES['privacy_protocol'],
privacy_password: str = _GLOBAL_VARIABLES['privacy_password'],
security_level: str = _GLOBAL_VARIABLES['security_level'],
timeout: int = _GLOBAL_VARIABLES['timeout'],
retries: int = _GLOBAL_VARIABLES['retries'],
remote_port: int = _GLOBAL_VARIABLES['remote_port']
) -> Session:
"""
Creates an SNMP session based on the provided configuration or global variables.
Args:
hostname (str): Hostname or IP address of the SNMP agent.
version (int): SNMP version (1, 2, or 3).
community (str): SNMP community string (for version 1 or 2).
user (str): SNMPv3 username (for version 3).
auth_protocol (str): SNMPv3 authentication protocol (e.g., 'MD5' or 'SHA').
auth_password (str): SNMPv3 authentication password.
privacy_protocol (str): SNMPv3 privacy protocol (e.g., 'AES' or 'DES').
privacy_password (str): SNMPv3 privacy password.
security_level (str): SNMPv3 security level ('noAuthNoPriv', 'authNoPriv', 'authPriv').
timeout (int): SNMP request timeout in seconds.
retries (int): Number of SNMP request retries.
remote_port (int): SNMP agent port.
Returns:
Session: An SNMP session configured based on the provided or global variables.
"""
session_kwargs = {
"hostname": host,
"version": version,
"use_numeric": True,
"timeout": timeout,
"retries": retries,
"remote_port": remote_port
}
if version == 1 or version == 2:
session_kwargs["community"] = community
elif version == 3:
session_kwargs["security_username"] = user
if security_level == "authPriv":
session_kwargs.update({
"auth_protocol": auth_protocol,
"auth_password": auth_password,
"privacy_protocol": privacy_protocol,
"privacy_password": privacy_password,
"security_level": "auth_with_privacy"
})
elif security_level == "authNoPriv":
session_kwargs.update({
"auth_protocol": auth_protocol,
"auth_password": auth_password,
"security_level": "auth_without_privacy"
})
elif security_level == "noAuthNoPriv":
session_kwargs["security_level"] = "no_auth_or_privacy"
return Session(**session_kwargs)
####
# Performs an SNMP GET operation to retrieve the value of a specified OID.
#########################################################################################
def snmp_get(
oid: str,
host: str = _GLOBAL_VARIABLES['hostname'],
version: int = _GLOBAL_VARIABLES['version'],
community: str = _GLOBAL_VARIABLES['community'],
user: str = _GLOBAL_VARIABLES['user'],
auth_protocol: str = _GLOBAL_VARIABLES['auth_protocol'],
auth_password: str = _GLOBAL_VARIABLES['auth_password'],
privacy_protocol: str = _GLOBAL_VARIABLES['privacy_protocol'],
privacy_password: str = _GLOBAL_VARIABLES['privacy_password'],
security_level: str = _GLOBAL_VARIABLES['security_level'],
timeout: int = _GLOBAL_VARIABLES['timeout'],
retries: int = _GLOBAL_VARIABLES['retries'],
remote_port: int = _GLOBAL_VARIABLES['remote_port']
) -> str:
"""
Performs an SNMP GET operation to retrieve the value of a specified OID.
Args:
oid (str): The OID (Object Identifier) for the SNMP GET operation.
Returns:
str: The value retrieved from the specified OID.
"""
session = create_snmp_session(host,version,community,user,auth_protocol,auth_password,privacy_protocol,privacy_password,security_level,timeout,retries,remote_port)
return session.get(oid).value
####
# Performs an SNMP WALK operation to retrieve a list of values from a subtree of the MIB.
#########################################################################################
def snmp_walk(
oid: str,
host: str = _GLOBAL_VARIABLES['hostname'],
version: int = _GLOBAL_VARIABLES['version'],
community: str = _GLOBAL_VARIABLES['community'],
user: str = _GLOBAL_VARIABLES['user'],
auth_protocol: str = _GLOBAL_VARIABLES['auth_protocol'],
auth_password: str = _GLOBAL_VARIABLES['auth_password'],
privacy_protocol: str = _GLOBAL_VARIABLES['privacy_protocol'],
privacy_password: str = _GLOBAL_VARIABLES['privacy_password'],
security_level: str = _GLOBAL_VARIABLES['security_level'],
timeout: int = _GLOBAL_VARIABLES['timeout'],
retries: int = _GLOBAL_VARIABLES['retries'],
remote_port: int = _GLOBAL_VARIABLES['remote_port']
) -> dict:
"""
Performs an SNMP WALK operation to retrieve a list of values from a subtree of the MIB.
Args:
oid (str): The OID (Object Identifier) representing the root of the subtree.
Returns:
list: A list of values retrieved from the specified subtree.
"""
session = create_snmp_session(host,version,community,user,auth_protocol,auth_password,privacy_protocol,privacy_password,security_level,timeout,retries,remote_port)
oid_items = session.walk(oid)
oid_value_dict = {}
for item in oid_items:
oid_with_index = f"{item.oid}.{item.oid_index}"
oid_value_dict[oid_with_index] = item.value
return oid_value_dict
####
# Sends an SNMP trap to the specified destination IP using the given OID, value, and community.
#########################################################################################
def snmp_trap(
trap_oid: str,
trap_value: str,
destination_ip: str,
community: str) -> None:
"""
Sends an SNMP trap to the specified destination IP using the given OID, value, and community.
Args:
trap_oid (str): The OID (Object Identifier) for the SNMP trap.
trap_value (str): The value associated with the trap.
destination_ip (str): The IP address of the trap's destination.
community (str): The SNMP community string for authentication.
Returns:
None
"""
trap_object = ObjectIdentity(trap_oid)
trap_value = OctetString(trap_value)
errorIndication, errorStatus, errorIndex, varBinds = next(
sendNotification(
SnmpEngine(),
CommunityData(community),
UdpTransportTarget((destination_ip, 162)),
ContextData(),
'trap',
NotificationType(
ObjectIdentity('SNMPv2-MIB', 'coldStart')
).addVarBinds(
(trap_object, trap_value)
)
)
)
if errorIndication:
print('Error:', errorIndication)
elif errorStatus:
print(
'%s at %s' %
(
errorStatus.prettyPrint(),
errorIndex and varBinds[int(errorIndex) - 1][0] or '?'
)
)
else:
print('SNMP trap sent successfully.')

View File

@ -0,0 +1,249 @@
import sys
from queue import Queue
from threading import Thread
from multiprocessing import Pool, Manager
####
# Define multi-processing internal global variables.
#########################################################################################
_MANAGER = Manager()
_SHARED_DICT = _MANAGER.dict()
_SHARED_DICT_LOCK = _MANAGER.Lock()
####
# Internal: Alias for output.print_debug function
#########################################################################################
def _print_debug(
var = "",
print_errors: bool = False
):
"""
Prints the provided variable in a JSON-like format.
Args:
var: The variable (list, dict, string, float, integer) to be printed.
print_errors (bool): If True, prints any errors that occur during formatting.
"""
from .output import print_debug
print_debug(var, print_errors)
####
# Internal use only: Run a given function in a thread
#########################################################################################
def _single_thread(
q = None,
function: callable = None,
errors: list = []
):
"""
Internal use only: Runs a given function in a thread.
Args:
q: A queue from which to get parameters for the function.
function (callable): The function to be executed in the thread.
errors (list): A list to store any errors encountered during execution.
"""
params=q.get()
q.task_done()
try:
function(params)
except Exception as e:
errors.append("Error while runing single thread: "+str(e))
####
# Run a given function for given items list in a given number of threads
#########################################################################################
def run_threads(
max_threads: int = 1,
function: callable = None,
items: list = [],
print_errors: bool = False
) -> bool:
"""
Run a given function for a list of items in multiple threads.
Args:
max_threads (int): Maximum number of threads to use.
function (callable): The function to be executed in each thread.
items (list): List of items to process.
print_errors (bool): Whether to print errors encountered during execution.
Returns:
bool: True if all threads executed successfully, False otherwise.
"""
from .output import print_stderr
# Assign threads
threads = max_threads
if threads > len(items):
threads = len(items)
if threads < 1:
threads = 1
# Distribute items per thread
items_per_thread = []
thread = 0
for item in items:
if not 0 <= thread < len(items_per_thread):
items_per_thread.append([])
items_per_thread[thread].append(item)
thread += 1
if thread >= threads:
thread=0
# Run threads
try:
q=Queue()
for n_thread in range(threads) :
q.put(items_per_thread[n_thread])
run_threads = []
errors = []
for n_thread in range(threads):
t = Thread(target=_single_thread, args=(q, function, errors))
t.daemon=True
t.start()
run_threads.append(t)
for t in run_threads:
t.join()
q.join()
if print_errors:
for error in errors:
print_stderr(str(error))
if len(errors) > 0:
return False
else:
return True
except Exception as e:
if print_errors:
print_stderr("Error while running threads: "+str(e))
return False
####
# Set a given value to a key in the internal shared dict.
# Used by all parallel processes.
#########################################################################################
def set_shared_dict_value(
key: str = None,
value = None
)-> None:
"""
Set a value for a key in the internal shared dictionary.
This function is used by all parallel processes.
Args:
key (str): The key in the shared dictionary.
value: The value to be assigned to the key.
"""
global _SHARED_DICT
if key is not None:
with _SHARED_DICT_LOCK:
_SHARED_DICT[key] = value
####
# Add a given value to a key in the internal shared dict.
# Used by all parallel processes.
#########################################################################################
def add_shared_dict_value(
key: str = None,
value = None
)-> None:
"""
Add a value to a key in the internal shared dictionary.
This function is used by all parallel processes.
Args:
key (str): The key in the shared dictionary.
value: The value to be added to the key.
"""
global _SHARED_DICT
if key is not None:
with _SHARED_DICT_LOCK:
if key in _SHARED_DICT:
_SHARED_DICT[key] += value
else:
set_shared_dict_value(key, value)
####
# Get the value of a key in the internal shared dict.
# Used by all parallel processes.
#########################################################################################
def get_shared_dict_value(
key: str = None
):
"""
Get the value of a key in the internal shared dictionary.
This function is used by all parallel processes.
Args:
key (str): The key in the shared dictionary.
Returns:
The value associated with the key, or None if the key does not exist.
"""
global _SHARED_DICT
with _SHARED_DICT_LOCK:
if key in _SHARED_DICT and key is not None:
return _SHARED_DICT[key]
else:
return None
####
# Run a given function for given items list in a given number of processes
# Given function receives each item as first parameter
#########################################################################################
def run_processes(
max_processes: int = 1,
function: callable = None,
items: list = [],
print_errors: bool = False
) -> bool:
"""
Run a given function for given items list in a given number of processes
Args:
max_processes (int): The maximum number of processes to run in parallel.
function (callable): The function to be executed for each item.
items (list): List of items to be processed.
print_errors (bool): Whether to print errors.
Returns:
bool: True if all processes completed successfully, False otherwise.
"""
from .output import print_stderr
# Assign processes
processes = max_processes
if processes > len(items):
processes = len(items)
if processes < 1:
processes = 1
# Run processes
with Pool(processes) as pool:
try:
pool.map(function, items)
result = True
except Exception as error:
if print_errors:
print_stderr(str(error))
result = False
return result

View File

@ -0,0 +1,258 @@
from datetime import datetime
from subprocess import *
import shutil
import subprocess
import os
import sys
####
# Define global variables dict, used in functions as default values.
# Its values can be changed.
#########################################################################################
_GLOBAL_VARIABLES = {
'transfer_mode' : 'tentacle',
'temporal' : '/tmp',
'data_dir' : '/var/spool/pandora/data_in/',
'tentacle_client' : 'tentacle_client',
'tentacle_ip' : '127.0.0.1',
'tentacle_port' : 41121,
'tentacle_extra_opts' : '',
'tentacle_retries' : 1
}
####
# Internal: Alias for output.print_debug function
#########################################################################################
def _print_debug(
var = "",
print_errors: bool = False
):
"""
Prints any list, dict, string, float or integer as a json
Args:
var: The variable to be printed.
print_errors (bool): Whether to print errors.
"""
from .output import print_debug
print_debug(var, print_errors)
####
# Set a global variable with the specified name and assigns a value to it.
#########################################################################################
def set_global_variable(
variable_name: str = "",
value = None
)-> None:
"""
Sets the value of a global variable in the '_GLOBAL_VARIABLES' dictionary.
Args:
variable_name (str): Name of the variable to set.
value (any): Value to assign to the variable.
Returns:
None
"""
from .general import set_dict_key_value
set_dict_key_value(_GLOBAL_VARIABLES, variable_name, value)
####
# Get a global variable with the specified name.
#########################################################################################
def get_global_variable(
variable_name: str = ""
)-> None:
"""
Gets the value of a global variable in the '_GLOBAL_VARIABLES' dictionary.
Args:
variable_name (str): Name of the variable to set.
Returns:
None
"""
from .general import get_dict_key_value
get_dict_key_value(_GLOBAL_VARIABLES, variable_name)
####
# Sends file using tentacle protocol
#########################################################################################
def tentacle_xml(
data_file: str = "",
tentacle_ops: dict = {},
tentacle_path: str = _GLOBAL_VARIABLES['tentacle_client'],
retry: bool = False,
debug: int = 0,
print_errors: bool = True
) -> bool:
"""
Sends file using tentacle protocol
Args:
data_file (str): Path to the data file to be sent.
tentacle_ops (dict): Tentacle options as a dictionary (address [password] [port]).
tentacle_path (str): Custom path for the tentacle client executable.
retry (bool): Whether to retry sending the file if it fails.
debug (int): Debug mode flag. If enabled (1), the data file will not be removed after sending.
print_errors (bool): Whether to print error messages.
Returns:
bool: True for success, False for errors.
"""
from .output import print_stderr
if data_file is not None :
if not 'address' in tentacle_ops:
tentacle_ops['address'] = _GLOBAL_VARIABLES['tentacle_ip']
if not 'port' in tentacle_ops:
tentacle_ops['port'] = _GLOBAL_VARIABLES['tentacle_port']
if not 'extra_opts' in tentacle_ops:
tentacle_ops['extra_opts'] = _GLOBAL_VARIABLES['tentacle_extra_opts']
if tentacle_ops['address'] is None :
if print_errors:
print_stderr("Tentacle error: No address defined")
return False
try :
with open(data_file.strip(), 'r') as data:
data.read()
data.close()
except Exception as e :
if print_errors:
print_stderr(f"Tentacle error: {type(e).__name__} {e}")
return False
tentacle_cmd = f"{tentacle_path} -v -a {tentacle_ops['address']} -p {tentacle_ops['port']} {tentacle_ops['extra_opts']} {data_file.strip()}"
tentacle_exe=subprocess.Popen(tentacle_cmd, stdout=subprocess.PIPE,stderr=subprocess.PIPE, shell=True)
rc=tentacle_exe.wait()
result = True
if rc != 0 :
if retry:
tentacle_retries = _GLOBAL_VARIABLES['tentacle_retries']
if tentacle_retries < 1:
tentacle_retries = 1
retry_count = 0
while retry_count < tentacle_retries :
tentacle_exe=subprocess.Popen(tentacle_cmd, stdout=subprocess.PIPE,stderr=subprocess.PIPE, shell=True)
rc=tentacle_exe.wait()
if rc == 0:
break
if print_errors:
stderr = tentacle_exe.stderr.read().decode()
msg = f"Tentacle error (Retry {retry_count + 1}/{tentacle_retries}): {stderr}"
print_stderr(str(datetime.today().strftime('%Y-%m-%d %H:%M')) + msg)
retry_count += 1
if retry_count >= tentacle_retries:
result = False
else:
if print_errors:
stderr = tentacle_exe.stderr.read().decode()
msg="Tentacle error:" + str(stderr)
print_stderr(str(datetime.today().strftime('%Y-%m-%d %H:%M')) + msg)
result = False
if debug == 0 :
os.remove(data_file.strip())
return result
else:
if print_errors:
print_stderr("Tentacle error: file path is required.")
return False
####
# Detect transfer mode and send XML.
#########################################################################################
def transfer_xml(
file: str = "",
transfer_mode: str = _GLOBAL_VARIABLES['transfer_mode'],
tentacle_ip: str = _GLOBAL_VARIABLES['tentacle_ip'],
tentacle_port: int = _GLOBAL_VARIABLES['tentacle_port'],
tentacle_extra_opts: str = _GLOBAL_VARIABLES['tentacle_extra_opts'],
data_dir: str = _GLOBAL_VARIABLES['data_dir']
)-> None:
"""
Detects the transfer mode and calls the agentplugin() function to perform the transfer.
Args:
file (str): Path to file to send.
transfer_mode (str, optional): Transfer mode. Default is _GLOBAL_VARIABLES['transfer_mode'].
tentacle_ip (str, optional): IP address for Tentacle. Default is _GLOBAL_VARIABLES['tentacle_ip'].
tentacle_port (str, optional): Port for Tentacle. Default is _GLOBAL_VARIABLES['tentacle_port'].
data_dir (str, optional): Path to data dir with local transfer mode. Default is _GLOBAL_VARIABLES['data_dir'].
Returns:
None
"""
if file is not None:
if transfer_mode != "local":
tentacle_conf = {
'address' : tentacle_ip,
'port' : tentacle_port,
'extra_opts' : tentacle_extra_opts
}
tentacle_xml(file, tentacle_conf)
else:
shutil.move(file, data_dir)
####
# Creates a agent .data file in the specified data_dir folder
#########################################################################################
def write_xml(
xml: str = "",
agent_name: str = "",
data_dir: str = _GLOBAL_VARIABLES['temporal'],
print_errors: bool = False
) -> str:
"""
Creates an agent .data file in the specified data_dir folder
Args:
xml (str): XML string to be written in the file.
agent_name (str): Agent name for the XML and file name.
data_dir (str): Folder in which the file will be created.
print_errors (bool): Whether to print error messages.
Returns:
str: Path to the created .data file.
"""
from .general import generate_md5
from .output import print_stderr
Utime = datetime.now().strftime('%s')
agent_name_md5 = generate_md5(agent_name)
data_file = "%s/%s.%s.data" %(str(data_dir),agent_name_md5,str(Utime))
try:
with open(data_file, 'x') as data:
data.write(xml)
except OSError as o:
if print_errors:
print_stderr(f"ERROR - Could not write file: {o}, please check directory permissions")
except Exception as e:
if print_errors:
print_stderr(f"{type(e).__name__}: {e}")
return data_file