mirror of https://github.com/OpenKMIP/PyKMIP.git
176 lines
6.7 KiB
Python
176 lines
6.7 KiB
Python
# Copyright (c) 2018 The Johns Hopkins University/Applied Physics Laboratory
|
|
# All Rights Reserved.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
import logging
|
|
import multiprocessing
|
|
import os
|
|
import signal
|
|
import time
|
|
|
|
from kmip.core import policy as operation_policy
|
|
|
|
|
|
def get_json_files(p):
|
|
"""
|
|
Scan the provided policy directory for all JSON policy files.
|
|
"""
|
|
f = [os.path.join(p, x) for x in os.listdir(p) if x.endswith(".json")]
|
|
return sorted(f)
|
|
|
|
|
|
class PolicyDirectoryMonitor(multiprocessing.Process):
|
|
"""
|
|
A file monitor that tracks modifications made within the policy directory.
|
|
"""
|
|
|
|
def __init__(self, policy_directory, policy_store, live_monitoring=True):
|
|
"""
|
|
Set up the file monitor with the policy directory to track.
|
|
|
|
Args:
|
|
policy_directory (string): The system path of the policy directory
|
|
that should be monitored. Required.
|
|
policy_store (DictProxy): A dictionary proxy created by the server
|
|
multiprocessing resource manager. Used to store and share the
|
|
policy information across server processes and threads.
|
|
Required.
|
|
live_monitoring (boolean): A boolean indicating whether or not
|
|
live monitoring should continue indefinitely. Optional,
|
|
defaults to True.
|
|
"""
|
|
super(PolicyDirectoryMonitor, self).__init__()
|
|
|
|
self.halt_trigger = multiprocessing.Event()
|
|
self.policy_directory = policy_directory
|
|
self.live_monitoring = live_monitoring
|
|
|
|
self.file_timestamps = None
|
|
self.policy_cache = None
|
|
self.policy_files = None
|
|
self.policy_map = None
|
|
self.policy_store = policy_store
|
|
|
|
self.reserved_policies = ['default', 'public']
|
|
|
|
def interrupt_handler(trigger, frame):
|
|
self.stop()
|
|
signal.signal(signal.SIGINT, interrupt_handler)
|
|
signal.signal(signal.SIGTERM, interrupt_handler)
|
|
|
|
self.logger = logging.getLogger("kmip.server.monitor")
|
|
self.initialize_tracking_structures()
|
|
|
|
def stop(self):
|
|
self.halt_trigger.set()
|
|
|
|
def scan_policies(self):
|
|
"""
|
|
Scan the policy directory for policy data.
|
|
"""
|
|
policy_files = get_json_files(self.policy_directory)
|
|
for f in set(policy_files) - set(self.policy_files):
|
|
self.file_timestamps[f] = 0
|
|
for f in set(self.policy_files) - set(policy_files):
|
|
self.logger.info("Removing policies for file: {}".format(f))
|
|
self.file_timestamps.pop(f, None)
|
|
for p in self.policy_cache.keys():
|
|
self.disassociate_policy_and_file(p, f)
|
|
for p in [k for k, v in self.policy_map.items() if v == f]:
|
|
self.restore_or_delete_policy(p)
|
|
self.policy_files = policy_files
|
|
|
|
for f in sorted(self.file_timestamps.keys()):
|
|
t = os.path.getmtime(f)
|
|
if t > self.file_timestamps[f]:
|
|
self.logger.info("Loading policies for file: {}".format(f))
|
|
self.file_timestamps[f] = t
|
|
old_p = [k for k, v in self.policy_map.items() if v == f]
|
|
try:
|
|
new_p = operation_policy.read_policy_from_file(f)
|
|
except ValueError:
|
|
self.logger.error("Failure loading file: {}".format(f))
|
|
self.logger.debug("", exc_info=True)
|
|
continue
|
|
for p in new_p.keys():
|
|
self.logger.info("Loading policy: {}".format(p))
|
|
if p in self.reserved_policies:
|
|
self.logger.warning(
|
|
"Policy '{}' overwrites a reserved policy and "
|
|
"will be thrown out.".format(p)
|
|
)
|
|
continue
|
|
if p in sorted(self.policy_store.keys()):
|
|
self.logger.debug(
|
|
"Policy '{}' overwrites an existing "
|
|
"policy.".format(p)
|
|
)
|
|
if f != self.policy_map.get(p):
|
|
self.policy_cache.get(p).append(
|
|
(
|
|
time.time(),
|
|
self.policy_map.get(p),
|
|
self.policy_store.get(p)
|
|
)
|
|
)
|
|
else:
|
|
self.policy_cache[p] = []
|
|
self.policy_store[p] = new_p.get(p)
|
|
self.policy_map[p] = f
|
|
for p in set(old_p) - set(new_p.keys()):
|
|
self.disassociate_policy_and_file(p, f)
|
|
self.restore_or_delete_policy(p)
|
|
|
|
def run(self):
|
|
"""
|
|
Start monitoring operation policy files.
|
|
"""
|
|
self.initialize_tracking_structures()
|
|
|
|
if self.live_monitoring:
|
|
self.logger.info("Starting up the operation policy file monitor.")
|
|
while not self.halt_trigger.is_set():
|
|
time.sleep(1)
|
|
self.scan_policies()
|
|
self.logger.info("Stopping the operation policy file monitor.")
|
|
else:
|
|
self.scan_policies()
|
|
|
|
def initialize_tracking_structures(self):
|
|
self.file_timestamps = {}
|
|
self.policy_cache = {}
|
|
self.policy_files = []
|
|
self.policy_map = {}
|
|
|
|
for k in self.policy_store.keys():
|
|
if k not in self.reserved_policies:
|
|
self.policy_store.pop(k, None)
|
|
|
|
def disassociate_policy_and_file(self, policy, file_name):
|
|
c = self.policy_cache.get(policy, [])
|
|
for i in [c.index(e) for e in c if e[1] == file_name][::-1]:
|
|
c.pop(i)
|
|
|
|
def restore_or_delete_policy(self, policy):
|
|
c = self.policy_cache.get(policy, [])
|
|
if len(c) == 0:
|
|
self.logger.info("Removing policy: {}".format(policy))
|
|
self.policy_store.pop(policy, None)
|
|
self.policy_map.pop(policy, None)
|
|
self.policy_cache.pop(policy, None)
|
|
else:
|
|
e = c.pop()
|
|
self.policy_store[policy] = e[2]
|
|
self.policy_map[policy] = e[1]
|