edit class snmpmanager, added parameters
This commit is contained in:
parent
340ed2169f
commit
44123eb269
|
@ -6,10 +6,10 @@ from easysnmp import Session, TrapSender
|
||||||
#########################################################################################
|
#########################################################################################
|
||||||
|
|
||||||
_GLOBAL_VARIABLES = {
|
_GLOBAL_VARIABLES = {
|
||||||
'hostname' : '',
|
'hostname' : "",
|
||||||
'version' : 1,
|
'version' : 1,
|
||||||
'community' : 'public',
|
'community' : "public",
|
||||||
'user' : '',
|
'user' : "",
|
||||||
'auth_protocol' : "",
|
'auth_protocol' : "",
|
||||||
'auth_password' : "",
|
'auth_password' : "",
|
||||||
'privacy_protocol' : "",
|
'privacy_protocol' : "",
|
||||||
|
@ -20,6 +20,7 @@ _GLOBAL_VARIABLES = {
|
||||||
'remote_port' : 161,
|
'remote_port' : 161,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
####
|
####
|
||||||
# Set a global variable with the specified name and assigns a value to it.
|
# Set a global variable with the specified name and assigns a value to it.
|
||||||
#########################################################################################
|
#########################################################################################
|
||||||
|
@ -60,23 +61,100 @@ def get_global_variable(
|
||||||
|
|
||||||
get_dict_key_value(_GLOBAL_VARIABLES, variable_name)
|
get_dict_key_value(_GLOBAL_VARIABLES, variable_name)
|
||||||
|
|
||||||
|
####
|
||||||
|
# A class that represents an SNMP target, providing methods for setting up SNMP configuration and performing SNMP operations like GET and WALK.
|
||||||
|
#########################################################################################
|
||||||
|
class SNMPTarget:
|
||||||
|
"""
|
||||||
|
A class that represents an SNMP target, providing methods for setting up SNMP configuration
|
||||||
|
and performing SNMP operations like GET and WALK.
|
||||||
|
"""
|
||||||
|
def __init__(
|
||||||
|
self,
|
||||||
|
host: str = _GLOBAL_VARIABLES['hostname'],
|
||||||
|
version: int = _GLOBAL_VARIABLES['version'],
|
||||||
|
community: str = _GLOBAL_VARIABLES['community'],
|
||||||
|
user: str = _GLOBAL_VARIABLES['user'],
|
||||||
|
auth_protocol: str = _GLOBAL_VARIABLES['auth_protocol'],
|
||||||
|
auth_password: str = _GLOBAL_VARIABLES['auth_password'],
|
||||||
|
privacy_protocol: str = _GLOBAL_VARIABLES['privacy_protocol'],
|
||||||
|
privacy_password: str = _GLOBAL_VARIABLES['privacy_password'],
|
||||||
|
security_level: str = _GLOBAL_VARIABLES['security_level'],
|
||||||
|
timeout: int = _GLOBAL_VARIABLES['timeout'],
|
||||||
|
retries: int = _GLOBAL_VARIABLES['retries'],
|
||||||
|
remote_port: int = _GLOBAL_VARIABLES['remote_port']):
|
||||||
|
|
||||||
|
self.session = create_snmp_session(
|
||||||
|
host,
|
||||||
|
version,
|
||||||
|
community,
|
||||||
|
user,
|
||||||
|
auth_protocol,
|
||||||
|
auth_password,
|
||||||
|
privacy_protocol,
|
||||||
|
privacy_password,
|
||||||
|
security_level,
|
||||||
|
timeout,
|
||||||
|
retries,
|
||||||
|
remote_port
|
||||||
|
)
|
||||||
|
|
||||||
|
####
|
||||||
|
# Performs an SNMP GET operation to retrieve the value of a specified OID.
|
||||||
|
#########################################################################################
|
||||||
|
def snmp_get(self, oid):
|
||||||
|
"""
|
||||||
|
Performs an SNMP GET operation to retrieve the value of a specified OID.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
oid (str): The OID (Object Identifier) for the SNMP GET operation.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
str: The value retrieved from the specified OID.
|
||||||
|
"""
|
||||||
|
return self.session.get(oid)
|
||||||
|
|
||||||
|
####
|
||||||
|
# Performs an SNMP WALK operation to retrieve a list of values from a subtree of the MIB.
|
||||||
|
#########################################################################################
|
||||||
|
def snmp_walk(self, oid):
|
||||||
|
"""
|
||||||
|
Performs an SNMP WALK operation to retrieve a list of values from a subtree of the MIB.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
oid (str): The OID (Object Identifier) representing the root of the subtree.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
list: A list of values retrieved from the specified subtree.
|
||||||
|
"""
|
||||||
|
|
||||||
|
oid_items = self.session.walk(oid)
|
||||||
|
|
||||||
|
oid_value_dict = {} # Initialize an empty dictionary
|
||||||
|
|
||||||
|
for item in oid_items:
|
||||||
|
oid_with_index = f"{item.oid}.{item.oid_index}"
|
||||||
|
oid_value_dict[oid_with_index] = item.value
|
||||||
|
|
||||||
|
return oid_value_dict
|
||||||
|
|
||||||
####
|
####
|
||||||
# Creates an SNMP session based on the global configuration variables.
|
# Creates an SNMP session based on the global configuration variables.
|
||||||
#########################################################################################
|
#########################################################################################
|
||||||
def create_snmp_session(
|
def create_snmp_session(
|
||||||
host=None,
|
host: str = _GLOBAL_VARIABLES['hostname'],
|
||||||
version=None,
|
version: int = _GLOBAL_VARIABLES['version'],
|
||||||
community=None,
|
community: str = _GLOBAL_VARIABLES['community'],
|
||||||
user=None,
|
user: str = _GLOBAL_VARIABLES['user'],
|
||||||
auth_protocol=None,
|
auth_protocol: str = _GLOBAL_VARIABLES['auth_protocol'],
|
||||||
auth_password=None,
|
auth_password: str = _GLOBAL_VARIABLES['auth_password'],
|
||||||
privacy_protocol=None,
|
privacy_protocol: str = _GLOBAL_VARIABLES['privacy_protocol'],
|
||||||
privacy_password=None,
|
privacy_password: str = _GLOBAL_VARIABLES['privacy_password'],
|
||||||
security_level=None,
|
security_level: str = _GLOBAL_VARIABLES['security_level'],
|
||||||
timeout=None,
|
timeout: int = _GLOBAL_VARIABLES['timeout'],
|
||||||
retries=None,
|
retries: int = _GLOBAL_VARIABLES['retries'],
|
||||||
remote_port=None
|
remote_port: int = _GLOBAL_VARIABLES['remote_port']
|
||||||
):
|
) -> Session:
|
||||||
"""
|
"""
|
||||||
Creates an SNMP session based on the provided configuration or global variables.
|
Creates an SNMP session based on the provided configuration or global variables.
|
||||||
|
|
||||||
|
@ -97,18 +175,6 @@ def create_snmp_session(
|
||||||
Returns:
|
Returns:
|
||||||
Session: An SNMP session configured based on the provided or global variables.
|
Session: An SNMP session configured based on the provided or global variables.
|
||||||
"""
|
"""
|
||||||
host = _GLOBAL_VARIABLES['hostname']
|
|
||||||
version = _GLOBAL_VARIABLES['version']
|
|
||||||
community = _GLOBAL_VARIABLES['community']
|
|
||||||
user = _GLOBAL_VARIABLES['user']
|
|
||||||
auth_protocol = _GLOBAL_VARIABLES['auth_protocol']
|
|
||||||
auth_password = _GLOBAL_VARIABLES['auth_password']
|
|
||||||
privacy_protocol = _GLOBAL_VARIABLES['privacy_protocol']
|
|
||||||
privacy_password = _GLOBAL_VARIABLES['privacy_password']
|
|
||||||
security_level = _GLOBAL_VARIABLES['security_level']
|
|
||||||
timeout = _GLOBAL_VARIABLES['timeout']
|
|
||||||
retries = _GLOBAL_VARIABLES['retries']
|
|
||||||
remote_port = _GLOBAL_VARIABLES['remote_port']
|
|
||||||
|
|
||||||
session_kwargs = {
|
session_kwargs = {
|
||||||
"hostname": host,
|
"hostname": host,
|
||||||
|
@ -143,9 +209,23 @@ def create_snmp_session(
|
||||||
|
|
||||||
return Session(**session_kwargs)
|
return Session(**session_kwargs)
|
||||||
|
|
||||||
|
####
|
||||||
|
# Performs an SNMP GET operation to retrieve the value of a specified OID.
|
||||||
|
#########################################################################################
|
||||||
def snmp_get(
|
def snmp_get(
|
||||||
oid: str
|
oid: str,
|
||||||
|
host: str = _GLOBAL_VARIABLES['hostname'],
|
||||||
|
version: int = _GLOBAL_VARIABLES['version'],
|
||||||
|
community: str = _GLOBAL_VARIABLES['community'],
|
||||||
|
user: str = _GLOBAL_VARIABLES['user'],
|
||||||
|
auth_protocol: str = _GLOBAL_VARIABLES['auth_protocol'],
|
||||||
|
auth_password: str = _GLOBAL_VARIABLES['auth_password'],
|
||||||
|
privacy_protocol: str = _GLOBAL_VARIABLES['privacy_protocol'],
|
||||||
|
privacy_password: str = _GLOBAL_VARIABLES['privacy_password'],
|
||||||
|
security_level: str = _GLOBAL_VARIABLES['security_level'],
|
||||||
|
timeout: int = _GLOBAL_VARIABLES['timeout'],
|
||||||
|
retries: int = _GLOBAL_VARIABLES['retries'],
|
||||||
|
remote_port: int = _GLOBAL_VARIABLES['remote_port']
|
||||||
) -> str:
|
) -> str:
|
||||||
"""
|
"""
|
||||||
Performs an SNMP GET operation to retrieve the value of a specified OID.
|
Performs an SNMP GET operation to retrieve the value of a specified OID.
|
||||||
|
@ -156,12 +236,27 @@ def snmp_get(
|
||||||
Returns:
|
Returns:
|
||||||
str: The value retrieved from the specified OID.
|
str: The value retrieved from the specified OID.
|
||||||
"""
|
"""
|
||||||
session = create_snmp_session()
|
session = create_snmp_session(host,version,community,user,auth_protocol,auth_password,privacy_protocol,privacy_password,security_level,timeout,retries,remote_port)
|
||||||
return session.get(oid)
|
return session.get(oid)
|
||||||
|
|
||||||
|
####
|
||||||
|
# Performs an SNMP WALK operation to retrieve a list of values from a subtree of the MIB.
|
||||||
|
#########################################################################################
|
||||||
def snmp_walk(
|
def snmp_walk(
|
||||||
oid: str
|
oid: str,
|
||||||
) -> list:
|
host: str = _GLOBAL_VARIABLES['hostname'],
|
||||||
|
version: int = _GLOBAL_VARIABLES['version'],
|
||||||
|
community: str = _GLOBAL_VARIABLES['community'],
|
||||||
|
user: str = _GLOBAL_VARIABLES['user'],
|
||||||
|
auth_protocol: str = _GLOBAL_VARIABLES['auth_protocol'],
|
||||||
|
auth_password: str = _GLOBAL_VARIABLES['auth_password'],
|
||||||
|
privacy_protocol: str = _GLOBAL_VARIABLES['privacy_protocol'],
|
||||||
|
privacy_password: str = _GLOBAL_VARIABLES['privacy_password'],
|
||||||
|
security_level: str = _GLOBAL_VARIABLES['security_level'],
|
||||||
|
timeout: int = _GLOBAL_VARIABLES['timeout'],
|
||||||
|
retries: int = _GLOBAL_VARIABLES['retries'],
|
||||||
|
remote_port: int = _GLOBAL_VARIABLES['remote_port']
|
||||||
|
) -> dict:
|
||||||
"""
|
"""
|
||||||
Performs an SNMP WALK operation to retrieve a list of values from a subtree of the MIB.
|
Performs an SNMP WALK operation to retrieve a list of values from a subtree of the MIB.
|
||||||
|
|
||||||
|
@ -171,8 +266,16 @@ def snmp_walk(
|
||||||
Returns:
|
Returns:
|
||||||
list: A list of values retrieved from the specified subtree.
|
list: A list of values retrieved from the specified subtree.
|
||||||
"""
|
"""
|
||||||
session = create_snmp_session()
|
session = create_snmp_session(host,version,community,user,auth_protocol,auth_password,privacy_protocol,privacy_password,security_level,timeout,retries,remote_port)
|
||||||
return session.walk(oid)
|
oid_items = session.walk(oid)
|
||||||
|
|
||||||
|
oid_value_dict = {}
|
||||||
|
|
||||||
|
for item in oid_items:
|
||||||
|
oid_with_index = f"{item.oid}.{item.oid_index}"
|
||||||
|
oid_value_dict[oid_with_index] = item.value
|
||||||
|
|
||||||
|
return oid_value_dict
|
||||||
|
|
||||||
####
|
####
|
||||||
# Sends an SNMP trap to the specified destination IP using the given OID, value, and community.
|
# Sends an SNMP trap to the specified destination IP using the given OID, value, and community.
|
||||||
|
@ -202,143 +305,4 @@ def snmp_trap(
|
||||||
trap.destination_ip = destination_ip
|
trap.destination_ip = destination_ip
|
||||||
trap.community = community
|
trap.community = community
|
||||||
|
|
||||||
trap.send_trap()
|
trap.send_trap()
|
||||||
|
|
||||||
class SNMPManager:
|
|
||||||
def __init__(self):
|
|
||||||
self.global_variables = {
|
|
||||||
'hostname': '',
|
|
||||||
'version': 1,
|
|
||||||
'community': 'public',
|
|
||||||
'user': '',
|
|
||||||
'auth_protocol': "",
|
|
||||||
'auth_password': "",
|
|
||||||
'privacy_protocol': "",
|
|
||||||
'privacy_password': "",
|
|
||||||
'security_level': "noAuthNoPriv",
|
|
||||||
'timeout': 2,
|
|
||||||
'retries': 1,
|
|
||||||
'remote_port': 161,
|
|
||||||
}
|
|
||||||
|
|
||||||
def set_global_variable(self, variable_name, value):
|
|
||||||
"""
|
|
||||||
Sets the value of a global variable in the SNMPManager instance.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
variable_name (str): Name of the variable to set.
|
|
||||||
value (any): Value to assign to the variable.
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
None
|
|
||||||
"""
|
|
||||||
self.global_variables[variable_name] = value
|
|
||||||
|
|
||||||
def get_global_variable(self, variable_name):
|
|
||||||
"""
|
|
||||||
Gets the value of a global variable from the SNMPManager instance.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
variable_name (str): Name of the variable to retrieve.
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
any: The value of the specified variable, or None if not found.
|
|
||||||
"""
|
|
||||||
return self.global_variables.get(variable_name)
|
|
||||||
|
|
||||||
def create_snmp_session(self):
|
|
||||||
"""
|
|
||||||
Creates an SNMP session based on the global configuration variables in the SNMPManager instance.
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
None
|
|
||||||
"""
|
|
||||||
host = _GLOBAL_VARIABLES['hostname']
|
|
||||||
version = _GLOBAL_VARIABLES['version']
|
|
||||||
community = _GLOBAL_VARIABLES['community']
|
|
||||||
user = _GLOBAL_VARIABLES['user']
|
|
||||||
auth_protocol = _GLOBAL_VARIABLES['auth_protocol']
|
|
||||||
auth_password = _GLOBAL_VARIABLES['auth_password']
|
|
||||||
privacy_protocol = _GLOBAL_VARIABLES['privacy_protocol']
|
|
||||||
privacy_password = _GLOBAL_VARIABLES['privacy_password']
|
|
||||||
security_level = _GLOBAL_VARIABLES['security_level']
|
|
||||||
timeout = _GLOBAL_VARIABLES['timeout']
|
|
||||||
retries = _GLOBAL_VARIABLES['retries']
|
|
||||||
remote_port = _GLOBAL_VARIABLES['remote_port']
|
|
||||||
|
|
||||||
session_kwargs = {
|
|
||||||
"hostname": host,
|
|
||||||
"version": version,
|
|
||||||
"use_numeric": True,
|
|
||||||
"timeout": timeout,
|
|
||||||
"retries": retries,
|
|
||||||
"remote_port": remote_port
|
|
||||||
}
|
|
||||||
|
|
||||||
if version == 1 or version == 2:
|
|
||||||
session_kwargs["community"] = community
|
|
||||||
elif version == 3:
|
|
||||||
session_kwargs["security_username"] = user
|
|
||||||
|
|
||||||
if security_level == "authPriv":
|
|
||||||
session_kwargs.update({
|
|
||||||
"auth_protocol": auth_protocol,
|
|
||||||
"auth_password": auth_password,
|
|
||||||
"privacy_protocol": privacy_protocol,
|
|
||||||
"privacy_password": privacy_password,
|
|
||||||
"security_level": "auth_with_privacy"
|
|
||||||
})
|
|
||||||
elif security_level == "authNoPriv":
|
|
||||||
session_kwargs.update({
|
|
||||||
"auth_protocol": auth_protocol,
|
|
||||||
"auth_password": auth_password,
|
|
||||||
"security_level": "auth_without_privacy"
|
|
||||||
})
|
|
||||||
elif security_level == "noAuthNoPriv":
|
|
||||||
session_kwargs["security_level"] = "no_auth_or_privacy"
|
|
||||||
|
|
||||||
self.session = Session(**session_kwargs)
|
|
||||||
|
|
||||||
def snmp_get(self, oid):
|
|
||||||
"""
|
|
||||||
Performs an SNMP GET operation to retrieve the value of a specified OID.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
oid (str): The OID (Object Identifier) for the SNMP GET operation.
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
str: The value retrieved from the specified OID.
|
|
||||||
"""
|
|
||||||
return self.session.get(oid)
|
|
||||||
|
|
||||||
def snmp_walk(self, oid):
|
|
||||||
"""
|
|
||||||
Performs an SNMP WALK operation to retrieve a list of values from a subtree of the MIB.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
oid (str): The OID (Object Identifier) representing the root of the subtree.
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
list: A list of values retrieved from the specified subtree.
|
|
||||||
"""
|
|
||||||
return self.session.walk(oid)
|
|
||||||
|
|
||||||
def snmp_trap(self, trap_oid, trap_value, destination_ip, community):
|
|
||||||
"""
|
|
||||||
Sends an SNMP trap to the specified destination IP using the given OID, value, and community.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
trap_oid (str): The OID (Object Identifier) for the SNMP trap.
|
|
||||||
trap_value (str): The value associated with the trap.
|
|
||||||
destination_ip (str): The IP address of the trap's destination.
|
|
||||||
community (str): The SNMP community string for authentication.
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
None
|
|
||||||
"""
|
|
||||||
trap = TrapSender()
|
|
||||||
trap.trap_oid = trap_oid
|
|
||||||
trap.trap_value = trap_value
|
|
||||||
trap.destination_ip = destination_ip
|
|
||||||
trap.community = community
|
|
||||||
trap.send_trap()
|
|
Loading…
Reference in New Issue