PyKMIP/kmip/tests/unit/services/server/test_monitor.py
Peter Hamilton 055483d663 Add a policy directory monitor
This change adds a PolicyDirectoryMonitor subprocess that can be
used by the server to continuously monitor and load operation
policies from the configured operation policy directory. The
monitor tracks policy file modifications, file creation, and file
deletion, restoring legacy policies from existing policy files
should the current file backing a policy get deleted. Changes to
existing policies are detected and updated as soon as the backing
policy file is saved to disk.

An extensive unit test is included to exercise the different
operating conditions the monitor may encounter.
2018-04-10 14:32:27 -04:00

2071 lines
69 KiB
Python

# Copyright (c) 2018 The Johns Hopkins University/Applied Physics Laboratory
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
import mock
import multiprocessing
import os
import shutil
import signal
import tempfile
import testtools
from kmip.core import enums
from kmip.services.server import monitor
class TestMonitorUtilities(testtools.TestCase):
    """
    Tests for the module-level helper functions of the monitor module.
    """

    def setUp(self):
        super(TestMonitorUtilities, self).setUp()

        # Each test works in its own scratch directory, removed on cleanup.
        scratch_dir = tempfile.mkdtemp()
        self.tmp_dir = scratch_dir
        self.addCleanup(shutil.rmtree, scratch_dir)

    def test_get_json_files(self):
        """
        Test that only the files ending in .json are collected from a
        directory.
        """
        fixtures = (
            ("policy_1.json", '{"policy_1": {}}\n'),
            ("policy_2.json", '{"policy_2": {}}\n'),
            ("policy_3.txt", '{"policy_3": {}}\n')
        )
        for file_name, text in fixtures:
            with open(os.path.join(self.tmp_dir, file_name), "w") as f:
                f.write(text)

        result = monitor.get_json_files(self.tmp_dir)

        self.assertIsInstance(result, list)
        self.assertEqual(2, len(result))
        for file_name in ("policy_1.json", "policy_2.json"):
            self.assertIn(os.path.join(self.tmp_dir, file_name), result)
# JSON fixture defining a single policy, "policy_A": for group_A, the
# GET and DESTROY operations on SYMMETRIC_KEY objects map to ALLOW_ALL.
POLICY_1 = """
{
"policy_A": {
"groups": {
"group_A": {
"SYMMETRIC_KEY": {
"GET": "ALLOW_ALL",
"DESTROY": "ALLOW_ALL"
}
}
}
}
}
"""
# JSON fixture defining two policies in one file: "policy_B" (group_B;
# GET, LOCATE and DESTROY all ALLOW_ALL) and "policy_C" (group_C; GET is
# ALLOW_ALL, DESTROY is DISALLOW_ALL).
POLICY_2 = """
{
"policy_B": {
"groups": {
"group_B": {
"SYMMETRIC_KEY": {
"GET": "ALLOW_ALL",
"LOCATE": "ALLOW_ALL",
"DESTROY": "ALLOW_ALL"
}
}
}
},
"policy_C": {
"groups": {
"group_C": {
"SYMMETRIC_KEY": {
"GET": "ALLOW_ALL",
"DESTROY": "DISALLOW_ALL"
}
}
}
}
}
"""
# JSON fixture redefining "policy_B" with GET, LOCATE and DESTROY all set
# to DISALLOW_ALL; it reuses the name defined in POLICY_2 so tests can
# exercise one file overloading a policy from another.
POLICY_3 = """
{
"policy_B": {
"groups": {
"group_B": {
"SYMMETRIC_KEY": {
"GET": "DISALLOW_ALL",
"LOCATE": "DISALLOW_ALL",
"DESTROY": "DISALLOW_ALL"
}
}
}
}
}
"""
# JSON fixture defining a policy named "default", one of the names the
# tests expect in the monitor's reserved_policies list.
POLICY_4 = """
{
"default": {
"groups": {
"group_B": {
"SYMMETRIC_KEY": {
"GET": "DISALLOW_ALL",
"LOCATE": "DISALLOW_ALL",
"DESTROY": "DISALLOW_ALL"
}
}
}
}
}
"""
# JSON fixture matching POLICY_2 except that the second policy is named
# "policy_D" (with group_D) instead of "policy_C"; used as the edited
# version of a file that originally held POLICY_2.
POLICY_5 = """
{
"policy_B": {
"groups": {
"group_B": {
"SYMMETRIC_KEY": {
"GET": "ALLOW_ALL",
"LOCATE": "ALLOW_ALL",
"DESTROY": "ALLOW_ALL"
}
}
}
},
"policy_D": {
"groups": {
"group_D": {
"SYMMETRIC_KEY": {
"GET": "ALLOW_ALL",
"DESTROY": "DISALLOW_ALL"
}
}
}
}
}
"""
# JSON fixture holding the "policy_A" definition from POLICY_1 plus an
# additional "policy_E" (group_E; GET and DESTROY are ALLOW_ALL, CHECK is
# ALLOW_OWNER).
POLICY_6 = """
{
"policy_A": {
"groups": {
"group_A": {
"SYMMETRIC_KEY": {
"GET": "ALLOW_ALL",
"DESTROY": "ALLOW_ALL"
}
}
}
},
"policy_E": {
"groups": {
"group_E": {
"SYMMETRIC_KEY": {
"GET": "ALLOW_ALL",
"CHECK": "ALLOW_OWNER",
"DESTROY": "ALLOW_ALL"
}
}
}
}
}
"""
# JSON fixture defining "policy_D" with GET, LOCATE and DESTROY all set to
# DISALLOW_ALL for group_D (a different body for the name used in
# POLICY_5).
POLICY_7 = """
{
"policy_D": {
"groups": {
"group_D": {
"SYMMETRIC_KEY": {
"GET": "DISALLOW_ALL",
"LOCATE": "DISALLOW_ALL",
"DESTROY": "DISALLOW_ALL"
}
}
}
}
}
"""
def write_file(path, file_name, content):
    """
    Write content, followed by a newline, to file_name inside path.

    An existing file with the same name is overwritten.
    """
    target = os.path.join(path, file_name)
    with open(target, "w") as handle:
        text = "{}\n".format(content)
        handle.write(text)
def side_effects(effects):
    """
    Generate return values from a mixed sequence of booleans and callables.

    Booleans are yielded unchanged. Any other item is called with no
    arguments and False is yielded in its place.
    """
    for item in effects:
        if not isinstance(item, bool):
            # Run the action now, then report False for this step.
            item()
            item = False
        yield item
def build_write_effect(path, file_name, content):
    """
    Return a zero-argument callable that writes content to file_name in
    path (via write_file) each time it is invoked.
    """
    def write_content():
        write_file(path, file_name, content)

    return write_content
def build_delete_effect(path, file_name):
    """
    Return a zero-argument callable that removes file_name from path when
    it is invoked.
    """
    def remove_file():
        target = os.path.join(path, file_name)
        os.remove(target)

    return remove_file
class TestPolicyDirectoryMonitor(testtools.TestCase):
def setUp(self):
    """
    Create a scratch policy directory that is removed after each test.
    """
    super(TestPolicyDirectoryMonitor, self).setUp()

    scratch_dir = tempfile.mkdtemp()
    self.tmp_dir = scratch_dir
    self.addCleanup(shutil.rmtree, scratch_dir)
def test_init(self):
    """
    Test that a PolicyDirectoryMonitor can be constructed without error
    and starts out with empty tracking structures.
    """
    m = monitor.PolicyDirectoryMonitor(
        self.tmp_dir,
        multiprocessing.Manager().dict()
    )

    halt_event_type = multiprocessing.synchronize.Event
    self.assertIsInstance(m.halt_trigger, halt_event_type)
    self.assertEqual(self.tmp_dir, m.policy_directory)

    # No files or policies are tracked before the monitor runs.
    self.assertEqual({}, m.file_timestamps)
    self.assertEqual({}, m.policy_cache)
    self.assertEqual([], m.policy_files)
    self.assertEqual({}, m.policy_map)
    self.assertEqual([], m.policy_store.keys())

    self.assertEqual(['default', 'public'], m.reserved_policies)
    self.assertIsInstance(m.logger, logging.Logger)
def test_signal_handler(self):
    """
    Test that the signal handlers for SIGINT and SIGTERM correctly stop
    the monitor.
    """
    monitored_signals = (signal.SIGINT, signal.SIGTERM)

    # This test fetches its handlers from the process-wide signal table
    # after constructing the monitor, so whatever was installed before
    # must be put back afterwards to avoid leaking into other tests.
    for signum in monitored_signals:
        previous = signal.getsignal(signum)
        # getsignal returns None for handlers not installed from Python;
        # such a value cannot be passed back to signal.signal.
        if previous is not None:
            self.addCleanup(signal.signal, signum, previous)

    m = monitor.PolicyDirectoryMonitor(
        self.tmp_dir,
        multiprocessing.Manager().dict()
    )
    m.stop = mock.MagicMock()
    m.stop.assert_not_called()

    # Each handler invocation must result in exactly one more stop call.
    for expected_calls, signum in enumerate(monitored_signals, 1):
        handler = signal.getsignal(signum)
        handler(None, None)
        self.assertEqual(expected_calls, m.stop.call_count)
def test_stop(self):
    """
    Test that calling stop on the PolicyDirectoryMonitor sets its halt
    trigger.
    """
    policy_store = multiprocessing.Manager().dict()
    m = monitor.PolicyDirectoryMonitor(self.tmp_dir, policy_store)

    was_set = m.halt_trigger.is_set()
    self.assertFalse(was_set)

    m.stop()

    now_set = m.halt_trigger.is_set()
    self.assertTrue(now_set)
def test_run(self):
    """
    Test that the PolicyDirectoryMonitor loads the policy files in its
    directory and tracks them properly.
    """
    path_1 = os.path.join(self.tmp_dir, "policy_1.json")
    path_2 = os.path.join(self.tmp_dir, "policy_2.json")
    symmetric_key = enums.ObjectType.SYMMETRIC_KEY
    operation = enums.Operation
    rule = enums.Policy

    m = monitor.PolicyDirectoryMonitor(
        self.tmp_dir,
        multiprocessing.Manager().dict()
    )
    m.logger = mock.MagicMock(logging.Logger)
    m.halt_trigger = mock.MagicMock(multiprocessing.synchronize.Event)
    # The mocked trigger reports "not set" once and "set" afterwards.
    m.halt_trigger.is_set.side_effect = [False, True]

    write_file(self.tmp_dir, "policy_1.json", POLICY_1)
    write_file(self.tmp_dir, "policy_2.json", POLICY_2)

    # Nothing is tracked until the monitor runs.
    self.assertEqual({}, m.file_timestamps)
    self.assertEqual({}, m.policy_cache)
    self.assertEqual([], m.policy_files)
    self.assertEqual({}, m.policy_map)
    self.assertEqual([], m.policy_store.keys())

    m.run()

    expected_messages = (
        "Starting up the operation policy file monitor.",
        "Loading policies for file: {}".format(path_1),
        "Loading policy: policy_A",
        "Loading policies for file: {}".format(path_2),
        "Loading policy: policy_B",
        "Loading policy: policy_C",
        "Stopping the operation policy file monitor."
    )
    for message in expected_messages:
        m.logger.info.assert_any_call(message)

    self.assertEqual(2, len(m.policy_files))

    # Every file is tracked by modification time and mapped to the
    # policies it defines.
    expected_owners = (
        (path_1, ("policy_A",)),
        (path_2, ("policy_B", "policy_C"))
    )
    for path, policy_names in expected_owners:
        self.assertEqual(
            os.path.getmtime(path),
            m.file_timestamps.get(path, None)
        )
        self.assertIn(path, m.policy_files)
        for policy_name in policy_names:
            self.assertEqual(path, m.policy_map.get(policy_name, None))

    self.assertEqual(
        {"policy_A": [], "policy_B": [], "policy_C": []},
        m.policy_cache
    )

    self.assertEqual(3, len(m.policy_store.keys()))
    expected_policies = (
        ("policy_A", {
            "groups": {
                "group_A": {
                    symmetric_key: {
                        operation.GET: rule.ALLOW_ALL,
                        operation.DESTROY: rule.ALLOW_ALL
                    }
                }
            }
        }),
        ("policy_B", {
            "groups": {
                "group_B": {
                    symmetric_key: {
                        operation.GET: rule.ALLOW_ALL,
                        operation.LOCATE: rule.ALLOW_ALL,
                        operation.DESTROY: rule.ALLOW_ALL
                    }
                }
            }
        }),
        ("policy_C", {
            "groups": {
                "group_C": {
                    symmetric_key: {
                        operation.GET: rule.ALLOW_ALL,
                        operation.DESTROY: rule.DISALLOW_ALL
                    }
                }
            }
        })
    )
    for policy_name, expected_policy in expected_policies:
        self.assertEqual(
            expected_policy,
            m.policy_store.get(policy_name, None)
        )
def test_run_with_policy_overloading(self):
    """
    Test that the PolicyDirectoryMonitor loads policy files and tracks
    them properly when one file overloads a policy that another file
    already defines.
    """
    path_1 = os.path.join(self.tmp_dir, "policy_1.json")
    path_2 = os.path.join(self.tmp_dir, "policy_2.json")
    path_3 = os.path.join(self.tmp_dir, "policy_3.json")
    symmetric_key = enums.ObjectType.SYMMETRIC_KEY
    operation = enums.Operation
    rule = enums.Policy

    m = monitor.PolicyDirectoryMonitor(
        self.tmp_dir,
        multiprocessing.Manager().dict()
    )
    m.logger = mock.MagicMock(logging.Logger)
    m.halt_trigger = mock.MagicMock(multiprocessing.synchronize.Event)
    # The mocked trigger reports "not set" once and "set" afterwards.
    m.halt_trigger.is_set.side_effect = [False, True]

    write_file(self.tmp_dir, "policy_1.json", POLICY_1)
    write_file(self.tmp_dir, "policy_2.json", POLICY_2)
    write_file(self.tmp_dir, "policy_3.json", POLICY_3)

    # Nothing is tracked until the monitor runs.
    self.assertEqual({}, m.file_timestamps)
    self.assertEqual({}, m.policy_cache)
    self.assertEqual([], m.policy_files)
    self.assertEqual({}, m.policy_map)
    self.assertEqual([], m.policy_store.keys())

    m.run()

    info = m.logger.info
    expected_calls = (
        (info, "Starting up the operation policy file monitor."),
        (info, "Loading policies for file: {}".format(path_1)),
        (info, "Loading policy: policy_A"),
        (info, "Loading policies for file: {}".format(path_2)),
        (info, "Loading policy: policy_B"),
        (info, "Loading policy: policy_C"),
        (info, "Loading policies for file: {}".format(path_3)),
        (info, "Loading policy: policy_B"),
        (
            m.logger.debug,
            "Policy 'policy_B' overwrites an existing policy."
        ),
        (info, "Stopping the operation policy file monitor.")
    )
    for log_method, message in expected_calls:
        log_method.assert_any_call(message)

    self.assertEqual(3, len(m.policy_files))

    # policy_B is expected to be mapped to policy_3.json, leaving
    # policy_2.json as the owner of policy_C only.
    expected_owners = (
        (path_1, "policy_A"),
        (path_2, "policy_C"),
        (path_3, "policy_B")
    )
    for path, policy_name in expected_owners:
        self.assertEqual(
            os.path.getmtime(path),
            m.file_timestamps.get(path, None)
        )
        self.assertIn(path, m.policy_files)
        self.assertEqual(path, m.policy_map.get(policy_name, None))

    cached_a = m.policy_cache.get("policy_A")
    self.assertEqual(0, len(cached_a))

    # The overloaded policy_B definition from policy_2.json is expected
    # to be kept in the cache. Element 0 of the cached entry is not
    # compared (presumably a timestamp that differs between runs; confirm
    # against the monitor); only the path and policy body are checked.
    cached_b = m.policy_cache.get("policy_B")
    self.assertEqual(1, len(cached_b))
    cached_entry = cached_b[0]
    self.assertEqual(path_2, cached_entry[1])
    self.assertEqual(
        {
            'groups': {
                'group_B': {
                    symmetric_key: {
                        operation.GET: rule.ALLOW_ALL,
                        operation.LOCATE: rule.ALLOW_ALL,
                        operation.DESTROY: rule.ALLOW_ALL
                    }
                }
            }
        },
        cached_entry[2]
    )

    self.assertEqual(3, len(m.policy_store.keys()))
    expected_policies = (
        ("policy_A", {
            "groups": {
                "group_A": {
                    symmetric_key: {
                        operation.GET: rule.ALLOW_ALL,
                        operation.DESTROY: rule.ALLOW_ALL
                    }
                }
            }
        }),
        ("policy_B", {
            "groups": {
                "group_B": {
                    symmetric_key: {
                        operation.GET: rule.DISALLOW_ALL,
                        operation.LOCATE: rule.DISALLOW_ALL,
                        operation.DESTROY: rule.DISALLOW_ALL
                    }
                }
            }
        }),
        ("policy_C", {
            "groups": {
                "group_C": {
                    symmetric_key: {
                        operation.GET: rule.ALLOW_ALL,
                        operation.DESTROY: rule.DISALLOW_ALL
                    }
                }
            }
        })
    )
    for policy_name, expected_policy in expected_policies:
        self.assertEqual(
            expected_policy,
            m.policy_store.get(policy_name, None)
        )
def test_run_with_policy_load_failure(self):
    """
    Test that the PolicyDirectoryMonitor loads policy files and tracks
    them properly when one of the files cannot be loaded.
    """
    path_1 = os.path.join(self.tmp_dir, "policy_1.json")
    path_2 = os.path.join(self.tmp_dir, "policy_2.json")

    m = monitor.PolicyDirectoryMonitor(
        self.tmp_dir,
        multiprocessing.Manager().dict()
    )
    m.logger = mock.MagicMock(logging.Logger)
    m.halt_trigger = mock.MagicMock(multiprocessing.synchronize.Event)
    # The mocked trigger reports "not set" once and "set" afterwards.
    m.halt_trigger.is_set.side_effect = [False, True]

    write_file(self.tmp_dir, "policy_1.json", POLICY_1)
    # The second file deliberately holds text that is not valid JSON.
    write_file(self.tmp_dir, "policy_2.json", "not a JSON blob")

    # Nothing is tracked until the monitor runs.
    self.assertEqual({}, m.file_timestamps)
    self.assertEqual({}, m.policy_cache)
    self.assertEqual([], m.policy_files)
    self.assertEqual({}, m.policy_map)
    self.assertEqual([], m.policy_store.keys())

    m.run()

    info = m.logger.info
    expected_calls = (
        (info, "Starting up the operation policy file monitor."),
        (info, "Loading policies for file: {}".format(path_1)),
        (info, "Loading policy: policy_A"),
        (info, "Loading policies for file: {}".format(path_2)),
        (m.logger.error, "Failure loading file: {}".format(path_2))
    )
    for log_method, message in expected_calls:
        log_method.assert_any_call(message)
    m.logger.debug.assert_called()
    info.assert_any_call("Stopping the operation policy file monitor.")

    self.assertEqual(2, len(m.policy_files))

    self.assertEqual(
        os.path.getmtime(path_1),
        m.file_timestamps.get(path_1, None)
    )
    self.assertIn(path_1, m.policy_files)
    self.assertEqual(path_1, m.policy_map.get("policy_A", None))

    # The unparsable file is still tracked, but contributes no policies.
    self.assertEqual(
        os.path.getmtime(path_2),
        m.file_timestamps.get(path_2, None)
    )
    self.assertIn(path_2, m.policy_files)

    self.assertEqual({"policy_A": []}, m.policy_cache)

    self.assertEqual(1, len(m.policy_store.keys()))
    expected_policy_a = {
        "groups": {
            "group_A": {
                enums.ObjectType.SYMMETRIC_KEY: {
                    enums.Operation.GET: enums.Policy.ALLOW_ALL,
                    enums.Operation.DESTROY: enums.Policy.ALLOW_ALL
                }
            }
        }
    }
    self.assertEqual(
        expected_policy_a,
        m.policy_store.get("policy_A", None)
    )
def test_run_with_policy_load_failure_and_fix(self):
    """
    Test that the PolicyDirectoryMonitor loads policy files and tracks
    them properly when a file that cannot be loaded appears and is then
    fixed while tracking is active.
    """
    path_1 = os.path.join(self.tmp_dir, "policy_1.json")
    path_2 = os.path.join(self.tmp_dir, "policy_2.json")
    symmetric_key = enums.ObjectType.SYMMETRIC_KEY
    operation = enums.Operation
    rule = enums.Policy

    m = monitor.PolicyDirectoryMonitor(
        self.tmp_dir,
        multiprocessing.Manager().dict()
    )
    m.logger = mock.MagicMock(logging.Logger)
    m.halt_trigger = mock.MagicMock(multiprocessing.synchronize.Event)

    # Each is_set() query consumes one step. Callable steps run their
    # file edit and are reported as False: first a broken policy_2.json
    # is written, later it is replaced with valid content.
    write_broken = build_write_effect(
        self.tmp_dir,
        "policy_2.json",
        "invalid JSON"
    )
    write_fixed = build_write_effect(
        self.tmp_dir,
        "policy_2.json",
        POLICY_2
    )
    m.halt_trigger.is_set.side_effect = side_effects(
        [False, write_broken, False, write_fixed, False, True]
    )

    write_file(self.tmp_dir, "policy_1.json", POLICY_1)

    # Nothing is tracked until the monitor runs.
    self.assertEqual({}, m.file_timestamps)
    self.assertEqual({}, m.policy_cache)
    self.assertEqual([], m.policy_files)
    self.assertEqual({}, m.policy_map)
    self.assertEqual([], m.policy_store.keys())

    m.run()

    info = m.logger.info
    calls_through_failure = (
        (info, "Starting up the operation policy file monitor."),
        (info, "Loading policies for file: {}".format(path_1)),
        (info, "Loading policy: policy_A"),
        (info, "Loading policies for file: {}".format(path_2)),
        (m.logger.error, "Failure loading file: {}".format(path_2))
    )
    for log_method, message in calls_through_failure:
        log_method.assert_any_call(message)
    m.logger.debug.assert_called()

    messages_after_fix = (
        "Loading policies for file: {}".format(path_2),
        "Loading policy: policy_B",
        "Loading policy: policy_C",
        "Stopping the operation policy file monitor."
    )
    for message in messages_after_fix:
        info.assert_any_call(message)

    self.assertEqual(2, len(m.policy_files))

    expected_owners = (
        (path_1, ("policy_A",)),
        (path_2, ("policy_B", "policy_C"))
    )
    for path, policy_names in expected_owners:
        self.assertEqual(
            os.path.getmtime(path),
            m.file_timestamps.get(path, None)
        )
        self.assertIn(path, m.policy_files)
        for policy_name in policy_names:
            self.assertEqual(path, m.policy_map.get(policy_name, None))

    self.assertEqual(
        {"policy_A": [], "policy_B": [], "policy_C": []},
        m.policy_cache
    )

    self.assertEqual(3, len(m.policy_store.keys()))
    expected_policies = (
        ("policy_A", {
            "groups": {
                "group_A": {
                    symmetric_key: {
                        operation.GET: rule.ALLOW_ALL,
                        operation.DESTROY: rule.ALLOW_ALL
                    }
                }
            }
        }),
        ("policy_B", {
            "groups": {
                "group_B": {
                    symmetric_key: {
                        operation.GET: rule.ALLOW_ALL,
                        operation.LOCATE: rule.ALLOW_ALL,
                        operation.DESTROY: rule.ALLOW_ALL
                    }
                }
            }
        }),
        ("policy_C", {
            "groups": {
                "group_C": {
                    symmetric_key: {
                        operation.GET: rule.ALLOW_ALL,
                        operation.DESTROY: rule.DISALLOW_ALL
                    }
                }
            }
        })
    )
    for policy_name, expected_policy in expected_policies:
        self.assertEqual(
            expected_policy,
            m.policy_store.get(policy_name, None)
        )
def test_run_with_policy_overloading_reserved(self):
    """
    Test that the PolicyDirectoryMonitor loads policy files and tracks
    them properly when one file tries to overwrite a reserved policy.
    """
    path_3 = os.path.join(self.tmp_dir, "policy_3.json")
    path_4 = os.path.join(self.tmp_dir, "policy_4.json")

    m = monitor.PolicyDirectoryMonitor(
        self.tmp_dir,
        multiprocessing.Manager().dict()
    )
    m.logger = mock.MagicMock(logging.Logger)
    m.halt_trigger = mock.MagicMock(multiprocessing.synchronize.Event)
    # The mocked trigger reports "not set" once and "set" afterwards.
    m.halt_trigger.is_set.side_effect = [False, True]

    write_file(self.tmp_dir, "policy_3.json", POLICY_3)
    # POLICY_4 defines a policy called "default".
    write_file(self.tmp_dir, "policy_4.json", POLICY_4)

    # Nothing is tracked until the monitor runs.
    self.assertEqual({}, m.file_timestamps)
    self.assertEqual({}, m.policy_cache)
    self.assertEqual([], m.policy_files)
    self.assertEqual({}, m.policy_map)
    self.assertEqual([], m.policy_store.keys())

    m.run()

    info = m.logger.info
    expected_calls = (
        (info, "Starting up the operation policy file monitor."),
        (info, "Loading policies for file: {}".format(path_3)),
        (info, "Loading policy: policy_B"),
        (info, "Loading policies for file: {}".format(path_4)),
        (info, "Loading policy: default"),
        (
            m.logger.warning,
            "Policy 'default' overwrites a reserved policy and will be "
            "thrown out."
        ),
        (info, "Stopping the operation policy file monitor.")
    )
    for log_method, message in expected_calls:
        log_method.assert_any_call(message)

    self.assertEqual(2, len(m.policy_files))

    self.assertEqual(
        os.path.getmtime(path_3),
        m.file_timestamps.get(path_3, None)
    )
    self.assertIn(path_3, m.policy_files)
    self.assertEqual(path_3, m.policy_map.get("policy_B", None))

    # The file holding the reserved name is tracked, but its policy must
    # not show up in the cache or the store.
    self.assertEqual(
        os.path.getmtime(path_4),
        m.file_timestamps.get(path_4, None)
    )
    self.assertIn(path_4, m.policy_files)

    self.assertEqual({"policy_B": []}, m.policy_cache)

    self.assertEqual(1, len(m.policy_store.keys()))
    expected_policy_b = {
        "groups": {
            "group_B": {
                enums.ObjectType.SYMMETRIC_KEY: {
                    enums.Operation.GET: enums.Policy.DISALLOW_ALL,
                    enums.Operation.LOCATE: enums.Policy.DISALLOW_ALL,
                    enums.Operation.DESTROY: enums.Policy.DISALLOW_ALL
                }
            }
        }
    }
    self.assertEqual(
        expected_policy_b,
        m.policy_store.get("policy_B", None)
    )
def test_run_with_edit_modifying_existing_file(self):
    """
    Test that the PolicyDirectoryMonitor loads policy files and tracks
    them properly when an existing policy file is modified while
    tracking is active.
    """
    path_1 = os.path.join(self.tmp_dir, "policy_1.json")
    path_2 = os.path.join(self.tmp_dir, "policy_2.json")
    symmetric_key = enums.ObjectType.SYMMETRIC_KEY
    operation = enums.Operation
    rule = enums.Policy

    m = monitor.PolicyDirectoryMonitor(
        self.tmp_dir,
        multiprocessing.Manager().dict()
    )
    m.logger = mock.MagicMock(logging.Logger)
    m.halt_trigger = mock.MagicMock(multiprocessing.synchronize.Event)

    # On the second is_set() query policy_2.json is rewritten with
    # POLICY_5 (policy_C replaced by policy_D) and False is reported.
    rewrite_policy_2 = build_write_effect(
        self.tmp_dir,
        "policy_2.json",
        POLICY_5
    )
    m.halt_trigger.is_set.side_effect = side_effects(
        [False, rewrite_policy_2, True]
    )

    write_file(self.tmp_dir, "policy_1.json", POLICY_1)
    write_file(self.tmp_dir, "policy_2.json", POLICY_2)

    # Nothing is tracked until the monitor runs.
    self.assertEqual({}, m.file_timestamps)
    self.assertEqual({}, m.policy_cache)
    self.assertEqual([], m.policy_files)
    self.assertEqual({}, m.policy_map)
    self.assertEqual([], m.policy_store.keys())

    m.run()

    expected_messages = (
        "Starting up the operation policy file monitor.",
        "Loading policies for file: {}".format(path_1),
        "Loading policy: policy_A",
        "Loading policies for file: {}".format(path_2),
        "Loading policy: policy_B",
        "Loading policy: policy_C",
        "Loading policies for file: {}".format(path_2),
        "Loading policy: policy_B",
        "Loading policy: policy_D",
        "Removing policy: policy_C",
        "Stopping the operation policy file monitor."
    )
    for message in expected_messages:
        m.logger.info.assert_any_call(message)

    self.assertEqual(2, len(m.policy_files))

    expected_owners = (
        (path_1, ("policy_A",)),
        (path_2, ("policy_B", "policy_D"))
    )
    for path, policy_names in expected_owners:
        self.assertEqual(
            os.path.getmtime(path),
            m.file_timestamps.get(path, None)
        )
        self.assertIn(path, m.policy_files)
        for policy_name in policy_names:
            self.assertEqual(path, m.policy_map.get(policy_name, None))

    self.assertEqual(
        {"policy_A": [], "policy_B": [], "policy_D": []},
        m.policy_cache
    )

    self.assertEqual(3, len(m.policy_store.keys()))
    expected_policies = (
        ("policy_A", {
            "groups": {
                "group_A": {
                    symmetric_key: {
                        operation.GET: rule.ALLOW_ALL,
                        operation.DESTROY: rule.ALLOW_ALL
                    }
                }
            }
        }),
        ("policy_B", {
            "groups": {
                "group_B": {
                    symmetric_key: {
                        operation.GET: rule.ALLOW_ALL,
                        operation.LOCATE: rule.ALLOW_ALL,
                        operation.DESTROY: rule.ALLOW_ALL
                    }
                }
            }
        }),
        ("policy_D", {
            "groups": {
                "group_D": {
                    symmetric_key: {
                        operation.GET: rule.ALLOW_ALL,
                        operation.DESTROY: rule.DISALLOW_ALL
                    }
                }
            }
        })
    )
    for policy_name, expected_policy in expected_policies:
        self.assertEqual(
            expected_policy,
            m.policy_store.get(policy_name, None)
        )
def test_run_with_edit_adding_to_existing_file(self):
    """
    Test that the PolicyDirectoryMonitor loads policy files and tracks
    them properly when a policy is added to an existing policy file
    while tracking is active.
    """
    path_1 = os.path.join(self.tmp_dir, "policy_1.json")
    path_2 = os.path.join(self.tmp_dir, "policy_2.json")
    symmetric_key = enums.ObjectType.SYMMETRIC_KEY
    operation = enums.Operation
    rule = enums.Policy

    m = monitor.PolicyDirectoryMonitor(
        self.tmp_dir,
        multiprocessing.Manager().dict()
    )
    m.logger = mock.MagicMock(logging.Logger)
    m.halt_trigger = mock.MagicMock(multiprocessing.synchronize.Event)

    # On the second is_set() query policy_1.json is rewritten with
    # POLICY_6 (policy_A plus the new policy_E) and False is reported.
    rewrite_policy_1 = build_write_effect(
        self.tmp_dir,
        "policy_1.json",
        POLICY_6
    )
    m.halt_trigger.is_set.side_effect = side_effects(
        [False, rewrite_policy_1, True]
    )

    write_file(self.tmp_dir, "policy_1.json", POLICY_1)
    write_file(self.tmp_dir, "policy_2.json", POLICY_2)

    # Nothing is tracked until the monitor runs.
    self.assertEqual({}, m.file_timestamps)
    self.assertEqual({}, m.policy_cache)
    self.assertEqual([], m.policy_files)
    self.assertEqual({}, m.policy_map)
    self.assertEqual([], m.policy_store.keys())

    m.run()

    expected_messages = (
        "Starting up the operation policy file monitor.",
        "Loading policies for file: {}".format(path_1),
        "Loading policy: policy_A",
        "Loading policies for file: {}".format(path_2),
        "Loading policy: policy_B",
        "Loading policy: policy_C",
        "Loading policies for file: {}".format(path_1),
        "Loading policy: policy_A",
        "Loading policy: policy_E",
        "Stopping the operation policy file monitor."
    )
    for message in expected_messages:
        m.logger.info.assert_any_call(message)

    self.assertEqual(2, len(m.policy_files))

    expected_owners = (
        (path_1, ("policy_A", "policy_E")),
        (path_2, ("policy_B", "policy_C"))
    )
    for path, policy_names in expected_owners:
        self.assertEqual(
            os.path.getmtime(path),
            m.file_timestamps.get(path, None)
        )
        self.assertIn(path, m.policy_files)
        for policy_name in policy_names:
            self.assertEqual(path, m.policy_map.get(policy_name, None))

    self.assertEqual(
        {
            "policy_A": [],
            "policy_B": [],
            "policy_C": [],
            "policy_E": []
        },
        m.policy_cache
    )

    self.assertEqual(4, len(m.policy_store.keys()))
    expected_policies = (
        ("policy_A", {
            "groups": {
                "group_A": {
                    symmetric_key: {
                        operation.GET: rule.ALLOW_ALL,
                        operation.DESTROY: rule.ALLOW_ALL
                    }
                }
            }
        }),
        ("policy_B", {
            "groups": {
                "group_B": {
                    symmetric_key: {
                        operation.GET: rule.ALLOW_ALL,
                        operation.LOCATE: rule.ALLOW_ALL,
                        operation.DESTROY: rule.ALLOW_ALL
                    }
                }
            }
        }),
        ("policy_C", {
            "groups": {
                "group_C": {
                    symmetric_key: {
                        operation.GET: rule.ALLOW_ALL,
                        operation.DESTROY: rule.DISALLOW_ALL
                    }
                }
            }
        }),
        ("policy_E", {
            "groups": {
                "group_E": {
                    symmetric_key: {
                        operation.GET: rule.ALLOW_ALL,
                        operation.CHECK: rule.ALLOW_OWNER,
                        operation.DESTROY: rule.ALLOW_ALL
                    }
                }
            }
        })
    )
    for policy_name, expected_policy in expected_policies:
        self.assertEqual(
            expected_policy,
            m.policy_store.get(policy_name, None)
        )
def test_run_with_edit_deleting_from_existing_file(self):
    """
    Test that the PolicyDirectoryMonitor loads policy files and tracks
    them properly when a policy is removed from an existing policy file
    while tracking is active.
    """
    path_1 = os.path.join(self.tmp_dir, "policy_1.json")
    path_2 = os.path.join(self.tmp_dir, "policy_2.json")
    symmetric_key = enums.ObjectType.SYMMETRIC_KEY
    operation = enums.Operation
    rule = enums.Policy

    m = monitor.PolicyDirectoryMonitor(
        self.tmp_dir,
        multiprocessing.Manager().dict()
    )
    m.logger = mock.MagicMock(logging.Logger)
    m.halt_trigger = mock.MagicMock(multiprocessing.synchronize.Event)

    # On the second is_set() query policy_1.json is rewritten with
    # POLICY_1 (dropping policy_E) and False is reported.
    rewrite_policy_1 = build_write_effect(
        self.tmp_dir,
        "policy_1.json",
        POLICY_1
    )
    m.halt_trigger.is_set.side_effect = side_effects(
        [False, rewrite_policy_1, True]
    )

    write_file(self.tmp_dir, "policy_1.json", POLICY_6)
    write_file(self.tmp_dir, "policy_2.json", POLICY_2)

    # Nothing is tracked until the monitor runs.
    self.assertEqual({}, m.file_timestamps)
    self.assertEqual({}, m.policy_cache)
    self.assertEqual([], m.policy_files)
    self.assertEqual({}, m.policy_map)
    self.assertEqual([], m.policy_store.keys())

    m.run()

    expected_messages = (
        "Starting up the operation policy file monitor.",
        "Loading policies for file: {}".format(path_1),
        "Loading policy: policy_A",
        "Loading policy: policy_E",
        "Loading policies for file: {}".format(path_2),
        "Loading policy: policy_B",
        "Loading policy: policy_C",
        "Loading policies for file: {}".format(path_1),
        "Loading policy: policy_A",
        "Removing policy: policy_E",
        "Stopping the operation policy file monitor."
    )
    for message in expected_messages:
        m.logger.info.assert_any_call(message)

    self.assertEqual(2, len(m.policy_files))

    expected_owners = (
        (path_1, ("policy_A",)),
        (path_2, ("policy_B", "policy_C"))
    )
    for path, policy_names in expected_owners:
        self.assertEqual(
            os.path.getmtime(path),
            m.file_timestamps.get(path, None)
        )
        self.assertIn(path, m.policy_files)
        for policy_name in policy_names:
            self.assertEqual(path, m.policy_map.get(policy_name, None))

    self.assertEqual(
        {"policy_A": [], "policy_B": [], "policy_C": []},
        m.policy_cache
    )

    self.assertEqual(3, len(m.policy_store.keys()))
    expected_policies = (
        ("policy_A", {
            "groups": {
                "group_A": {
                    symmetric_key: {
                        operation.GET: rule.ALLOW_ALL,
                        operation.DESTROY: rule.ALLOW_ALL
                    }
                }
            }
        }),
        ("policy_B", {
            "groups": {
                "group_B": {
                    symmetric_key: {
                        operation.GET: rule.ALLOW_ALL,
                        operation.LOCATE: rule.ALLOW_ALL,
                        operation.DESTROY: rule.ALLOW_ALL
                    }
                }
            }
        }),
        ("policy_C", {
            "groups": {
                "group_C": {
                    symmetric_key: {
                        operation.GET: rule.ALLOW_ALL,
                        operation.DESTROY: rule.DISALLOW_ALL
                    }
                }
            }
        })
    )
    for policy_name, expected_policy in expected_policies:
        self.assertEqual(
            expected_policy,
            m.policy_store.get(policy_name, None)
        )
def test_run_with_deleting_existing_file(self):
    """
    Test that the PolicyDirectoryMonitor loads policy files and tracks
    them properly when an existing policy file is deleted while tracking
    is active.
    """
    path_1 = os.path.join(self.tmp_dir, "policy_1.json")
    path_2 = os.path.join(self.tmp_dir, "policy_2.json")
    symmetric_key = enums.ObjectType.SYMMETRIC_KEY
    operation = enums.Operation
    rule = enums.Policy

    m = monitor.PolicyDirectoryMonitor(
        self.tmp_dir,
        multiprocessing.Manager().dict()
    )
    m.logger = mock.MagicMock(logging.Logger)
    m.halt_trigger = mock.MagicMock(multiprocessing.synchronize.Event)

    # On the second is_set() query policy_1.json is removed from disk
    # and False is reported.
    delete_policy_1 = build_delete_effect(self.tmp_dir, "policy_1.json")
    m.halt_trigger.is_set.side_effect = side_effects(
        [False, delete_policy_1, True]
    )

    write_file(self.tmp_dir, "policy_1.json", POLICY_6)
    write_file(self.tmp_dir, "policy_2.json", POLICY_2)

    # Nothing is tracked until the monitor runs.
    self.assertEqual({}, m.file_timestamps)
    self.assertEqual({}, m.policy_cache)
    self.assertEqual([], m.policy_files)
    self.assertEqual({}, m.policy_map)
    self.assertEqual([], m.policy_store.keys())

    m.run()

    expected_messages = (
        "Starting up the operation policy file monitor.",
        "Loading policies for file: {}".format(path_1),
        "Loading policy: policy_A",
        "Loading policy: policy_E",
        "Loading policies for file: {}".format(path_2),
        "Loading policy: policy_B",
        "Loading policy: policy_C",
        "Removing policies for file: {}".format(path_1),
        "Removing policy: policy_A",
        "Removing policy: policy_E",
        "Stopping the operation policy file monitor."
    )
    for message in expected_messages:
        m.logger.info.assert_any_call(message)

    # Only the surviving file and its policies remain tracked.
    self.assertEqual(1, len(m.policy_files))
    self.assertEqual(
        os.path.getmtime(path_2),
        m.file_timestamps.get(path_2, None)
    )
    self.assertIn(path_2, m.policy_files)

    self.assertEqual(2, len(m.policy_map.keys()))
    for policy_name in ("policy_B", "policy_C"):
        self.assertEqual(path_2, m.policy_map.get(policy_name, None))

    self.assertEqual(
        {"policy_B": [], "policy_C": []},
        m.policy_cache
    )

    self.assertEqual(2, len(m.policy_store.keys()))
    expected_policies = (
        ("policy_B", {
            "groups": {
                "group_B": {
                    symmetric_key: {
                        operation.GET: rule.ALLOW_ALL,
                        operation.LOCATE: rule.ALLOW_ALL,
                        operation.DESTROY: rule.ALLOW_ALL
                    }
                }
            }
        }),
        ("policy_C", {
            "groups": {
                "group_C": {
                    symmetric_key: {
                        operation.GET: rule.ALLOW_ALL,
                        operation.DESTROY: rule.DISALLOW_ALL
                    }
                }
            }
        })
    )
    for policy_name, expected_policy in expected_policies:
        self.assertEqual(
            expected_policy,
            m.policy_store.get(policy_name, None)
        )
def test_run_with_adding_new_file(self):
    """
    Test that the PolicyDirectoryMonitor tracks a policy file that shows
    up in the policy directory after monitoring has already begun, in
    addition to the file that was written before the monitor ran.
    """
    path_1 = os.path.join(self.tmp_dir, "policy_1.json")
    path_2 = os.path.join(self.tmp_dir, "policy_2.json")
    allow = enums.Policy.ALLOW_ALL
    deny = enums.Policy.DISALLOW_ALL

    def expected_policy(group_name, operation_policies):
        # Wrap per-operation settings in the nesting the store is
        # compared against below.
        return {
            "groups": {
                group_name: {
                    enums.ObjectType.SYMMETRIC_KEY: operation_policies
                }
            }
        }

    m = monitor.PolicyDirectoryMonitor(
        self.tmp_dir,
        multiprocessing.Manager().dict()
    )
    m.logger = mock.MagicMock(logging.Logger)
    m.halt_trigger = mock.MagicMock(multiprocessing.synchronize.Event)

    # NOTE(review): side_effects and build_write_effect are module-level
    # helpers defined outside this method; presumably each is_set() call
    # consumes one entry, the write effect creates policy_2.json during
    # the run, and the final True stops the monitor -- confirm there.
    halt_script = [
        False,
        build_write_effect(self.tmp_dir, "policy_2.json", POLICY_2),
        True
    ]
    m.halt_trigger.is_set.side_effect = side_effects(halt_script)

    write_file(self.tmp_dir, "policy_1.json", POLICY_1)

    # Nothing is tracked until the monitor has run.
    for empty_mapping in (m.file_timestamps, m.policy_cache, m.policy_map):
        self.assertEqual({}, empty_mapping)
    self.assertEqual([], m.policy_files)
    self.assertEqual([], m.policy_store.keys())

    m.run()

    # assert_any_call ignores ordering, so the messages are simply
    # listed in the sequence the run is expected to produce them.
    file_message = "Loading policies for file: {}"
    for message in (
        "Starting up the operation policy file monitor.",
        file_message.format(path_1),
        "Loading policy: policy_A",
        file_message.format(path_2),
        "Loading policy: policy_B",
        "Loading policy: policy_C",
        "Stopping the operation policy file monitor."
    ):
        m.logger.info.assert_any_call(message)
    m.logger.debug.assert_not_called()

    # Each file must be tracked with its on-disk modification time and
    # be recorded as the source of the policies listed next to it.
    tracked = (
        (path_1, ("policy_A",)),
        (path_2, ("policy_B", "policy_C"))
    )
    self.assertEqual(2, len(m.policy_files))
    for path, policy_names in tracked:
        self.assertEqual(
            os.path.getmtime(path),
            m.file_timestamps.get(path, None)
        )
        self.assertIn(path, m.policy_files)
        for policy_name in policy_names:
            self.assertEqual(path, m.policy_map.get(policy_name, None))

    self.assertEqual(
        {"policy_A": [], "policy_B": [], "policy_C": []},
        m.policy_cache
    )

    expected_store = (
        (
            "policy_A",
            expected_policy(
                "group_A",
                {
                    enums.Operation.GET: allow,
                    enums.Operation.DESTROY: allow
                }
            )
        ),
        (
            "policy_B",
            expected_policy(
                "group_B",
                {
                    enums.Operation.GET: allow,
                    enums.Operation.LOCATE: allow,
                    enums.Operation.DESTROY: allow
                }
            )
        ),
        (
            "policy_C",
            expected_policy(
                "group_C",
                {
                    enums.Operation.GET: allow,
                    enums.Operation.DESTROY: deny
                }
            )
        )
    )
    self.assertEqual(3, len(m.policy_store.keys()))
    for policy_name, policy in expected_store:
        self.assertEqual(policy, m.policy_store.get(policy_name, None))
def test_run_with_adding_new_file_overloading(self):
    """
    Test that the PolicyDirectoryMonitor tracks policy files correctly
    when files added during monitoring overwrite policies that were
    already loaded from earlier files, and that the overwritten versions
    are kept in the policy cache.
    """
    path_1 = os.path.join(self.tmp_dir, "policy_1.json")
    path_2 = os.path.join(self.tmp_dir, "policy_2.json")
    path_3 = os.path.join(self.tmp_dir, "policy_3.json")
    path_4 = os.path.join(self.tmp_dir, "policy_4.json")
    allow = enums.Policy.ALLOW_ALL
    deny = enums.Policy.DISALLOW_ALL

    def expected_policy(group_name, operation_policies):
        # Wrap per-operation settings in the nesting the cache and store
        # contents are compared against below.
        return {
            "groups": {
                group_name: {
                    enums.ObjectType.SYMMETRIC_KEY: operation_policies
                }
            }
        }

    m = monitor.PolicyDirectoryMonitor(
        self.tmp_dir,
        multiprocessing.Manager().dict()
    )
    m.logger = mock.MagicMock(logging.Logger)
    m.halt_trigger = mock.MagicMock(multiprocessing.synchronize.Event)

    # NOTE(review): side_effects and build_write_effect live elsewhere in
    # this module; presumably the two write effects create policy_3.json
    # and policy_4.json on successive polls before True stops the run.
    halt_script = [
        False,
        build_write_effect(self.tmp_dir, "policy_3.json", POLICY_2),
        build_write_effect(self.tmp_dir, "policy_4.json", POLICY_3),
        True
    ]
    m.halt_trigger.is_set.side_effect = side_effects(halt_script)

    for file_name, contents in (
        ("policy_1.json", POLICY_1),
        ("policy_2.json", POLICY_2)
    ):
        write_file(self.tmp_dir, file_name, contents)

    # Nothing is tracked until the monitor has run.
    for empty_mapping in (m.file_timestamps, m.policy_cache, m.policy_map):
        self.assertEqual({}, empty_mapping)
    self.assertEqual([], m.policy_files)
    self.assertEqual([], m.policy_store.keys())

    m.run()

    # assert_any_call checks neither ordering nor call count, so every
    # distinct expected message is listed exactly once.
    file_message = "Loading policies for file: {}"
    for message in (
        "Starting up the operation policy file monitor.",
        file_message.format(path_1),
        "Loading policy: policy_A",
        file_message.format(path_2),
        "Loading policy: policy_B",
        "Loading policy: policy_C",
        file_message.format(path_3),
        file_message.format(path_4),
        "Stopping the operation policy file monitor."
    ):
        m.logger.info.assert_any_call(message)
    for message in (
        "Policy 'policy_B' overwrites an existing policy.",
        "Policy 'policy_C' overwrites an existing policy."
    ):
        m.logger.debug.assert_any_call(message)

    # Every file is tracked with its on-disk modification time. Only the
    # policy/file links listed here are checked: policy_2.json is not
    # expected to be the source recorded for any of them.
    tracked = (
        (path_1, ("policy_A",)),
        (path_2, ()),
        (path_3, ("policy_C",)),
        (path_4, ("policy_B",))
    )
    self.assertEqual(4, len(m.policy_files))
    for path, policy_names in tracked:
        self.assertEqual(
            os.path.getmtime(path),
            m.file_timestamps.get(path, None)
        )
        self.assertIn(path, m.policy_files)
        for policy_name in policy_names:
            self.assertEqual(path, m.policy_map.get(policy_name, None))

    # policy_A was never overwritten, so it has no cached versions.
    self.assertEqual([], m.policy_cache.get("policy_A"))

    # policy_B has two cached versions, one per file that defined it
    # before policy_4.json; index 1 of an entry is the source path and
    # index 2 the policy contents.
    cached_b = m.policy_cache.get("policy_B")
    self.assertEqual(2, len(cached_b))
    for entry, source in zip(cached_b, (path_2, path_3)):
        self.assertEqual(source, entry[1])
        self.assertEqual(
            expected_policy(
                "group_B",
                {
                    enums.Operation.GET: allow,
                    enums.Operation.LOCATE: allow,
                    enums.Operation.DESTROY: allow
                }
            ),
            entry[2]
        )

    # policy_C has a single cached version, from policy_2.json.
    cached_c = m.policy_cache.get("policy_C")
    self.assertEqual(1, len(cached_c))
    self.assertEqual(path_2, cached_c[0][1])
    self.assertEqual(
        expected_policy(
            "group_C",
            {
                enums.Operation.GET: allow,
                enums.Operation.DESTROY: deny
            }
        ),
        cached_c[0][2]
    )

    # The live store holds the most recently loaded version of each
    # policy; policy_B is the all-disallow variant mapped to
    # policy_4.json above.
    expected_store = (
        (
            "policy_A",
            expected_policy(
                "group_A",
                {
                    enums.Operation.GET: allow,
                    enums.Operation.DESTROY: allow
                }
            )
        ),
        (
            "policy_B",
            expected_policy(
                "group_B",
                {
                    enums.Operation.GET: deny,
                    enums.Operation.LOCATE: deny,
                    enums.Operation.DESTROY: deny
                }
            )
        ),
        (
            "policy_C",
            expected_policy(
                "group_C",
                {
                    enums.Operation.GET: allow,
                    enums.Operation.DESTROY: deny
                }
            )
        )
    )
    self.assertEqual(3, len(m.policy_store.keys()))
    for policy_name, policy in expected_store:
        self.assertEqual(policy, m.policy_store.get(policy_name, None))
def test_run_with_adding_new_file_editing_overloading(self):
    """
    Test that the PolicyDirectoryMonitor tracks policy files correctly
    when, during monitoring, new files overwrite existing policies, one
    of the original files is deleted, and one of the new files is then
    rewritten with different contents.
    """
    path_1 = os.path.join(self.tmp_dir, "policy_1.json")
    path_2 = os.path.join(self.tmp_dir, "policy_2.json")
    path_3 = os.path.join(self.tmp_dir, "policy_3.json")
    path_4 = os.path.join(self.tmp_dir, "policy_4.json")
    allow = enums.Policy.ALLOW_ALL
    deny = enums.Policy.DISALLOW_ALL

    def expected_policy(group_name, operation_policies):
        # Wrap per-operation settings in the nesting the store contents
        # are compared against below.
        return {
            "groups": {
                group_name: {
                    enums.ObjectType.SYMMETRIC_KEY: operation_policies
                }
            }
        }

    m = monitor.PolicyDirectoryMonitor(
        self.tmp_dir,
        multiprocessing.Manager().dict()
    )
    m.logger = mock.MagicMock(logging.Logger)
    m.halt_trigger = mock.MagicMock(multiprocessing.synchronize.Event)

    # NOTE(review): the effect builders are defined elsewhere in this
    # module; presumably successive polls add policy_3.json, add
    # policy_4.json, delete policy_2.json and rewrite policy_4.json
    # before True stops the run -- confirm against the helpers.
    halt_script = [
        False,
        build_write_effect(self.tmp_dir, "policy_3.json", POLICY_2),
        build_write_effect(self.tmp_dir, "policy_4.json", POLICY_3),
        build_delete_effect(self.tmp_dir, "policy_2.json"),
        build_write_effect(self.tmp_dir, "policy_4.json", POLICY_7),
        True
    ]
    m.halt_trigger.is_set.side_effect = side_effects(halt_script)

    for file_name, contents in (
        ("policy_1.json", POLICY_1),
        ("policy_2.json", POLICY_2)
    ):
        write_file(self.tmp_dir, file_name, contents)

    # Nothing is tracked until the monitor has run.
    for empty_mapping in (m.file_timestamps, m.policy_cache, m.policy_map):
        self.assertEqual({}, empty_mapping)
    self.assertEqual([], m.policy_files)
    self.assertEqual([], m.policy_store.keys())

    m.run()

    # assert_any_call checks neither ordering nor call count, so every
    # distinct expected message is listed exactly once.
    file_message = "Loading policies for file: {}"
    for message in (
        "Starting up the operation policy file monitor.",
        file_message.format(path_1),
        "Loading policy: policy_A",
        file_message.format(path_2),
        "Loading policy: policy_B",
        "Loading policy: policy_C",
        file_message.format(path_3),
        file_message.format(path_4),
        "Removing policies for file: {}".format(path_2),
        "Loading policy: policy_D",
        "Stopping the operation policy file monitor."
    ):
        m.logger.info.assert_any_call(message)
    for message in (
        "Policy 'policy_B' overwrites an existing policy.",
        "Policy 'policy_C' overwrites an existing policy."
    ):
        m.logger.debug.assert_any_call(message)

    # Only three files are still tracked; each must carry its on-disk
    # modification time and be the recorded source of the policies
    # listed next to it.
    tracked = (
        (path_1, ("policy_A",)),
        (path_3, ("policy_B", "policy_C")),
        (path_4, ("policy_D",))
    )
    self.assertEqual(3, len(m.policy_files))
    for path, policy_names in tracked:
        self.assertEqual(
            os.path.getmtime(path),
            m.file_timestamps.get(path, None)
        )
        self.assertIn(path, m.policy_files)
        for policy_name in policy_names:
            self.assertEqual(path, m.policy_map.get(policy_name, None))

    # No older versions of these policies are expected to remain cached.
    for policy_name in ("policy_A", "policy_B", "policy_C"):
        self.assertEqual([], m.policy_cache.get(policy_name))

    expected_store = (
        (
            "policy_A",
            expected_policy(
                "group_A",
                {
                    enums.Operation.GET: allow,
                    enums.Operation.DESTROY: allow
                }
            )
        ),
        (
            "policy_B",
            expected_policy(
                "group_B",
                {
                    enums.Operation.GET: allow,
                    enums.Operation.LOCATE: allow,
                    enums.Operation.DESTROY: allow
                }
            )
        ),
        (
            "policy_C",
            expected_policy(
                "group_C",
                {
                    enums.Operation.GET: allow,
                    enums.Operation.DESTROY: deny
                }
            )
        ),
        (
            "policy_D",
            expected_policy(
                "group_D",
                {
                    enums.Operation.GET: deny,
                    enums.Operation.LOCATE: deny,
                    enums.Operation.DESTROY: deny
                }
            )
        )
    )
    self.assertEqual(4, len(m.policy_store.keys()))
    for policy_name, policy in expected_store:
        self.assertEqual(policy, m.policy_store.get(policy_name, None))
def test_initialize_tracking_structures(self):
    """
    Test that initialize_tracking_structures() leaves every tracking
    structure of a PolicyDirectoryMonitor empty, even when each of them
    held data beforehand.
    """
    m = monitor.PolicyDirectoryMonitor(
        self.tmp_dir,
        multiprocessing.Manager().dict()
    )

    # Seed every structure with placeholder data so that the reset has
    # something observable to clear.
    seeds = (
        (m.file_timestamps, 1234),
        (m.policy_cache, (123.12, "b", {"c": 2})),
        (m.policy_map, "b"),
        (m.policy_store, {"c": 2})
    )
    for structure, placeholder in seeds:
        structure["a"] = placeholder
    m.policy_files = ["a", "b"]

    m.initialize_tracking_structures()

    # Read the attributes from the monitor again instead of reusing the
    # references captured above, so the check reflects its current state.
    for mapping in (m.file_timestamps, m.policy_cache, m.policy_map):
        self.assertEqual({}, mapping)
    self.assertEqual([], m.policy_files)
    self.assertEqual([], m.policy_store.keys())
def test_disassociate_policy_and_file(self):
    """
    Test that disassociate_policy_and_file() drops every cache entry that
    links the named policy to the named file, while keeping that policy's
    other entries and leaving other policies untouched.
    """
    def cache_entry(timestamp, file_name):
        # Build a fresh (timestamp, file path, policy) cache tuple.
        return (timestamp, os.path.join(self.tmp_dir, file_name), {})

    m = monitor.PolicyDirectoryMonitor(
        self.tmp_dir,
        multiprocessing.Manager().dict()
    )
    m.policy_cache = {
        "policy_A": [
            cache_entry(1480043060.870089, "policy_1.json"),
            cache_entry(1480043062.02171, "policy_2.json"),
            cache_entry(1480043062.645776, "policy_1.json"),
            cache_entry(1480043063.453713, "policy_3.json")
        ],
        "policy_B": [
            cache_entry(1480043123.65311, "policy_1.json")
        ]
    }

    target_file = os.path.join(self.tmp_dir, "policy_1.json")
    m.disassociate_policy_and_file("policy_A", target_file)

    # Both policy_1.json entries are gone from policy_A ...
    remaining_for_a = [
        cache_entry(1480043062.02171, "policy_2.json"),
        cache_entry(1480043063.453713, "policy_3.json")
    ]
    self.assertEqual(remaining_for_a, m.policy_cache.get("policy_A", []))

    # ... while policy_B keeps its own policy_1.json entry.
    remaining_for_b = [
        cache_entry(1480043123.65311, "policy_1.json")
    ]
    self.assertEqual(remaining_for_b, m.policy_cache.get("policy_B", []))
def test_restore_or_delete_policy_restore(self):
    """
    Test that restore_or_delete_policy() reinstates the last cached
    version of a policy: that entry leaves the cache and becomes the
    policy's contents in the store and its source file in the map.
    """
    def cache_entry(timestamp, file_name, payload):
        # Build a fresh (timestamp, file path, policy) cache tuple; the
        # policy is a one-element set used as an opaque marker.
        return (
            timestamp,
            os.path.join(self.tmp_dir, file_name),
            {payload}
        )

    m = monitor.PolicyDirectoryMonitor(
        self.tmp_dir,
        multiprocessing.Manager().dict()
    )
    m.logger = mock.MagicMock(logging.Logger)
    m.policy_cache = {
        "policy_A": [
            cache_entry(1480043060.870089, "policy_1.json", '{"policy_1"}'),
            cache_entry(1480043062.02171, "policy_2.json", '{"policy_2"}'),
            cache_entry(1480043063.453713, "policy_3.json", '{"policy_3"}')
        ]
    }
    # The live policy currently comes from a fourth file.
    m.policy_map["policy_A"] = os.path.join(self.tmp_dir, "policy_4.json")
    m.policy_store["policy_A"] = {'{"policy_4"}'}

    m.restore_or_delete_policy("policy_A")

    # Restoring is expected to be silent at the info level.
    m.logger.info.assert_not_called()

    # The last cache entry was consumed; the two older ones remain.
    still_cached = [
        cache_entry(1480043060.870089, "policy_1.json", '{"policy_1"}'),
        cache_entry(1480043062.02171, "policy_2.json", '{"policy_2"}')
    ]
    self.assertEqual(still_cached, m.policy_cache.get("policy_A", []))

    # Store and map now reflect the restored policy_3.json version.
    self.assertEqual(
        {'{"policy_3"}'},
        m.policy_store.get("policy_A", {})
    )
    self.assertEqual(
        os.path.join(self.tmp_dir, "policy_3.json"),
        m.policy_map.get("policy_A", None)
    )
def test_restore_or_delete_policy_delete(self):
    """
    Test that restore_or_delete_policy() removes a policy from the cache,
    the store and the map when no cached version is left to restore.
    """
    m = monitor.PolicyDirectoryMonitor(
        self.tmp_dir,
        multiprocessing.Manager().dict()
    )
    m.logger = mock.MagicMock(logging.Logger)

    # An empty cache list means there is nothing to fall back to.
    m.policy_cache = {"policy_A": []}
    m.policy_map["policy_A"] = os.path.join(self.tmp_dir, "policy_4.json")
    m.policy_store["policy_A"] = {'{"policy_4"}'}

    m.restore_or_delete_policy("policy_A")

    m.logger.info.assert_called_once_with("Removing policy: policy_A")
    for structure in (m.policy_cache, m.policy_store, m.policy_map):
        self.assertNotIn("policy_A", structure.keys())