mirror of
https://github.com/OpenKMIP/PyKMIP.git
synced 2025-06-27 17:24:24 +02:00
This change updates the server, integrating policy file monitoring and restructuring the engine. The top-level server entity now handles loading policy files using the PolicyDirectoryMonitor subprocess. A shared memory dictionary is used to share newly modified policy data across the session threads managed by the server and used by the engine. The legacy policy loading code in the engine has been removed. Unit tests have been added and modified for both the server and engine to verify the functionality of these modifications.
2154 lines
72 KiB
Python
2154 lines
72 KiB
Python
# Copyright (c) 2018 The Johns Hopkins University/Applied Physics Laboratory
|
|
# All Rights Reserved.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
import logging
|
|
import mock
|
|
import multiprocessing
|
|
import os
|
|
import shutil
|
|
import signal
|
|
import tempfile
|
|
import testtools
|
|
|
|
from kmip.core import enums
|
|
from kmip.services.server import monitor
|
|
|
|
|
|
class TestMonitorUtilities(testtools.TestCase):
    """
    Tests for the module-level utility functions of the monitor module.
    """

    def setUp(self):
        super(TestMonitorUtilities, self).setUp()

        # Scratch directory for the test; removed again during cleanup.
        scratch_dir = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, scratch_dir)
        self.tmp_dir = scratch_dir

    def test_get_json_files(self):
        """
        Test that get_json_files returns exactly the files ending in .json
        that are present in a directory.
        """
        # Two JSON files plus one file with a different extension that
        # must not show up in the result.
        fixtures = [
            ("policy_1.json", '{"policy_1": {}}\n'),
            ("policy_2.json", '{"policy_2": {}}\n'),
            ("policy_3.txt", '{"policy_3": {}}\n'),
        ]
        for file_name, text in fixtures:
            with open(os.path.join(self.tmp_dir, file_name), "w") as f:
                f.write(text)

        result = monitor.get_json_files(self.tmp_dir)

        self.assertIsInstance(result, list)
        self.assertEqual(2, len(result))
        for file_name in ("policy_1.json", "policy_2.json"):
            self.assertIn(os.path.join(self.tmp_dir, file_name), result)
|
|
|
|
|
|
# JSON policy documents used as fixture file contents by the tests below.

# One policy (policy_A) with a single group.
POLICY_1 = """
{
    "policy_A": {
        "groups": {
            "group_A": {
                "SYMMETRIC_KEY": {
                    "GET": "ALLOW_ALL",
                    "DESTROY": "ALLOW_ALL"
                }
            }
        }
    }
}
"""

# Two policies (policy_B and policy_C) in one document.
POLICY_2 = """
{
    "policy_B": {
        "groups": {
            "group_B": {
                "SYMMETRIC_KEY": {
                    "GET": "ALLOW_ALL",
                    "LOCATE": "ALLOW_ALL",
                    "DESTROY": "ALLOW_ALL"
                }
            }
        }
    },
    "policy_C": {
        "groups": {
            "group_C": {
                "SYMMETRIC_KEY": {
                    "GET": "ALLOW_ALL",
                    "DESTROY": "DISALLOW_ALL"
                }
            }
        }
    }
}
"""

# Reuses the name policy_B from POLICY_2, with every operation disallowed.
POLICY_3 = """
{
    "policy_B": {
        "groups": {
            "group_B": {
                "SYMMETRIC_KEY": {
                    "GET": "DISALLOW_ALL",
                    "LOCATE": "DISALLOW_ALL",
                    "DESTROY": "DISALLOW_ALL"
                }
            }
        }
    }
}
"""

# Uses the policy name "default".
POLICY_4 = """
{
    "default": {
        "groups": {
            "group_B": {
                "SYMMETRIC_KEY": {
                    "GET": "DISALLOW_ALL",
                    "LOCATE": "DISALLOW_ALL",
                    "DESTROY": "DISALLOW_ALL"
                }
            }
        }
    }
}
"""

# Same policy_B as POLICY_2, but paired with policy_D instead of policy_C.
POLICY_5 = """
{
    "policy_B": {
        "groups": {
            "group_B": {
                "SYMMETRIC_KEY": {
                    "GET": "ALLOW_ALL",
                    "LOCATE": "ALLOW_ALL",
                    "DESTROY": "ALLOW_ALL"
                }
            }
        }
    },
    "policy_D": {
        "groups": {
            "group_D": {
                "SYMMETRIC_KEY": {
                    "GET": "ALLOW_ALL",
                    "DESTROY": "DISALLOW_ALL"
                }
            }
        }
    }
}
"""

# Same policy_A as POLICY_1, plus an additional policy_E.
POLICY_6 = """
{
    "policy_A": {
        "groups": {
            "group_A": {
                "SYMMETRIC_KEY": {
                    "GET": "ALLOW_ALL",
                    "DESTROY": "ALLOW_ALL"
                }
            }
        }
    },
    "policy_E": {
        "groups": {
            "group_E": {
                "SYMMETRIC_KEY": {
                    "GET": "ALLOW_ALL",
                    "CHECK": "ALLOW_OWNER",
                    "DESTROY": "ALLOW_ALL"
                }
            }
        }
    }
}
"""

# Reuses the name policy_D from POLICY_5, with every operation disallowed.
POLICY_7 = """
{
    "policy_D": {
        "groups": {
            "group_D": {
                "SYMMETRIC_KEY": {
                    "GET": "DISALLOW_ALL",
                    "LOCATE": "DISALLOW_ALL",
                    "DESTROY": "DISALLOW_ALL"
                }
            }
        }
    }
}
"""
|
|
|
|
|
|
def write_file(path, file_name, content):
    """
    Write content, followed by a newline, to the file file_name inside the
    directory path. An existing file of that name is overwritten.
    """
    target = os.path.join(path, file_name)
    with open(target, "w") as handle:
        handle.write("{}\n".format(content))
|
|
|
|
|
|
def side_effects(effects):
    """
    Generator turning a mixed sequence of booleans and callables into a
    stream of booleans.

    A boolean entry is yielded unchanged. Any other entry is called with no
    arguments, after which False is yielded in its place. The call happens
    lazily, only when the generator is advanced to that entry.
    """
    for entry in effects:
        if not isinstance(entry, bool):
            entry()
            entry = False
        yield entry
|
|
|
|
|
|
def build_write_effect(path, file_name, content):
    """
    Build a zero-argument callable that, when invoked, writes content to
    file_name under path using write_file.
    """
    def apply_write():
        write_file(path, file_name, content)

    return apply_write
|
|
|
|
|
|
def build_delete_effect(path, file_name):
    """
    Build a zero-argument callable that, when invoked, removes file_name
    from the directory path.
    """
    target = os.path.join(path, file_name)

    def apply_delete():
        os.remove(target)

    return apply_delete
|
|
|
|
|
|
class TestPolicyDirectoryMonitor(testtools.TestCase):
|
|
|
|
def setUp(self):
    """
    Create a scratch policy directory for the test and make sure the
    process-wide signal handlers are put back afterwards.
    """
    super(TestPolicyDirectoryMonitor, self).setUp()

    self.tmp_dir = tempfile.mkdtemp()
    self.addCleanup(shutil.rmtree, self.tmp_dir)

    # Constructing a PolicyDirectoryMonitor installs its own signal
    # handler (test_signal_handler relies on that). Restore whatever was
    # installed before so a handler bound to a discarded monitor does not
    # leak into later tests or swallow Ctrl-C for the test runner.
    for signum in (signal.SIGINT, signal.SIGTERM):
        previous = signal.getsignal(signum)
        # getsignal returns None for handlers not installed from Python;
        # those cannot be passed back to signal.signal.
        if previous is not None:
            self.addCleanup(signal.signal, signum, previous)
|
|
|
|
def test_init(self):
    """
    Test that a PolicyDirectoryMonitor can be constructed and starts out
    with empty tracking structures.
    """
    shared_store = multiprocessing.Manager().dict()
    m = monitor.PolicyDirectoryMonitor(self.tmp_dir, shared_store)

    self.assertIsInstance(
        m.halt_trigger,
        multiprocessing.synchronize.Event
    )
    self.assertEqual(self.tmp_dir, m.policy_directory)

    # Nothing has been loaded or tracked yet.
    self.assertEqual({}, m.file_timestamps)
    self.assertEqual({}, m.policy_cache)
    self.assertEqual([], m.policy_files)
    self.assertEqual({}, m.policy_map)
    self.assertEqual([], m.policy_store.keys())

    self.assertEqual(['default', 'public'], m.reserved_policies)
    self.assertIsInstance(m.logger, logging.Logger)
|
|
|
|
def test_signal_handler(self):
    """
    Test that the signal handler for SIGINT and SIGTERM correctly stops
    the monitor.
    """
    m = monitor.PolicyDirectoryMonitor(
        self.tmp_dir,
        multiprocessing.Manager().dict()
    )
    m.stop = mock.MagicMock()

    # Exercise both signals named above; previously only SIGINT was
    # checked.
    # NOTE(review): assumes the monitor registers a handler for SIGTERM as
    # well as SIGINT, as the docstring states - confirm against the
    # monitor module.
    for signum in (signal.SIGINT, signal.SIGTERM):
        m.stop.reset_mock()
        handler = signal.getsignal(signum)

        m.stop.assert_not_called()
        handler(None, None)
        m.stop.assert_called()
|
|
|
|
def test_stop(self):
    """
    Test that calling stop on the PolicyDirectoryMonitor sets its halt
    trigger.
    """
    shared_store = multiprocessing.Manager().dict()
    m = monitor.PolicyDirectoryMonitor(self.tmp_dir, shared_store)

    # The trigger starts out cleared ...
    self.assertFalse(m.halt_trigger.is_set())

    m.stop()

    # ... and is set once stop has been called.
    self.assertTrue(m.halt_trigger.is_set())
|
|
|
|
def test_run(self):
    """
    Test that the PolicyDirectoryMonitor loads the policy files found in
    its directory and tracks them properly.
    """
    m = monitor.PolicyDirectoryMonitor(
        self.tmp_dir,
        multiprocessing.Manager().dict()
    )
    m.logger = mock.MagicMock(logging.Logger)
    m.halt_trigger = mock.MagicMock(multiprocessing.synchronize.Event)
    # is_set() answers False on its first call and True on its second.
    m.halt_trigger.is_set.side_effect = [False, True]

    policy_1_path = os.path.join(self.tmp_dir, "policy_1.json")
    policy_2_path = os.path.join(self.tmp_dir, "policy_2.json")
    write_file(self.tmp_dir, "policy_1.json", POLICY_1)
    write_file(self.tmp_dir, "policy_2.json", POLICY_2)

    # Nothing is tracked before the monitor runs.
    self.assertEqual({}, m.file_timestamps)
    self.assertEqual({}, m.policy_cache)
    self.assertEqual([], m.policy_files)
    self.assertEqual({}, m.policy_map)
    self.assertEqual([], m.policy_store.keys())

    m.run()

    expected_info = [
        "Starting up the operation policy file monitor.",
        "Loading policies for file: {}".format(policy_1_path),
        "Loading policy: policy_A",
        "Loading policies for file: {}".format(policy_2_path),
        "Loading policy: policy_B",
        "Loading policy: policy_C",
        "Stopping the operation policy file monitor."
    ]
    for message in expected_info:
        m.logger.info.assert_any_call(message)

    # Each file must be tracked with its current modification time and
    # mapped to the policies it defines.
    self.assertEqual(2, len(m.policy_files))
    tracked = [
        (policy_1_path, ["policy_A"]),
        (policy_2_path, ["policy_B", "policy_C"])
    ]
    for path, policy_names in tracked:
        self.assertEqual(
            os.path.getmtime(path),
            m.file_timestamps.get(path, None)
        )
        self.assertIn(path, m.policy_files)
        for policy_name in policy_names:
            self.assertEqual(path, m.policy_map.get(policy_name, None))

    self.assertEqual(
        {
            "policy_A": [],
            "policy_B": [],
            "policy_C": []
        },
        m.policy_cache
    )

    symmetric_key = enums.ObjectType.SYMMETRIC_KEY
    operation = enums.Operation
    allow_all = enums.Policy.ALLOW_ALL
    disallow_all = enums.Policy.DISALLOW_ALL

    self.assertEqual(3, len(m.policy_store.keys()))
    expected_policies = [
        (
            "policy_A",
            "group_A",
            {
                operation.GET: allow_all,
                operation.DESTROY: allow_all
            }
        ),
        (
            "policy_B",
            "group_B",
            {
                operation.GET: allow_all,
                operation.LOCATE: allow_all,
                operation.DESTROY: allow_all
            }
        ),
        (
            "policy_C",
            "group_C",
            {
                operation.GET: allow_all,
                operation.DESTROY: disallow_all
            }
        )
    ]
    for policy_name, group_name, permissions in expected_policies:
        self.assertEqual(
            {"groups": {group_name: {symmetric_key: permissions}}},
            m.policy_store.get(policy_name, None)
        )
|
|
|
|
def test_run_with_policy_overloading(self):
    """
    Test that the PolicyDirectoryMonitor loads and tracks policy files
    properly when a later file defines a policy whose name is already
    taken by a policy from an earlier file.
    """
    m = monitor.PolicyDirectoryMonitor(
        self.tmp_dir,
        multiprocessing.Manager().dict()
    )
    m.logger = mock.MagicMock(logging.Logger)
    m.halt_trigger = mock.MagicMock(multiprocessing.synchronize.Event)
    # is_set() answers False on its first call and True on its second.
    m.halt_trigger.is_set.side_effect = [False, True]

    policy_1_path = os.path.join(self.tmp_dir, "policy_1.json")
    policy_2_path = os.path.join(self.tmp_dir, "policy_2.json")
    policy_3_path = os.path.join(self.tmp_dir, "policy_3.json")
    write_file(self.tmp_dir, "policy_1.json", POLICY_1)
    write_file(self.tmp_dir, "policy_2.json", POLICY_2)
    write_file(self.tmp_dir, "policy_3.json", POLICY_3)

    # Nothing is tracked before the monitor runs.
    self.assertEqual({}, m.file_timestamps)
    self.assertEqual({}, m.policy_cache)
    self.assertEqual([], m.policy_files)
    self.assertEqual({}, m.policy_map)
    self.assertEqual([], m.policy_store.keys())

    m.run()

    expected_info = [
        "Starting up the operation policy file monitor.",
        "Loading policies for file: {}".format(policy_1_path),
        "Loading policy: policy_A",
        "Loading policies for file: {}".format(policy_2_path),
        "Loading policy: policy_B",
        "Loading policy: policy_C",
        "Loading policies for file: {}".format(policy_3_path),
        "Loading policy: policy_B"
    ]
    for message in expected_info:
        m.logger.info.assert_any_call(message)
    m.logger.debug.assert_any_call(
        "Policy 'policy_B' overwrites an existing policy."
    )
    m.logger.info.assert_any_call(
        "Stopping the operation policy file monitor."
    )

    # policy_B is expected to map to policy_3.json, leaving policy_2.json
    # as the source of policy_C only.
    self.assertEqual(3, len(m.policy_files))
    tracked = [
        (policy_1_path, "policy_A"),
        (policy_2_path, "policy_C"),
        (policy_3_path, "policy_B")
    ]
    for path, policy_name in tracked:
        self.assertEqual(
            os.path.getmtime(path),
            m.file_timestamps.get(path, None)
        )
        self.assertIn(path, m.policy_files)
        self.assertEqual(path, m.policy_map.get(policy_name, None))

    symmetric_key = enums.ObjectType.SYMMETRIC_KEY
    operation = enums.Operation
    allow_all = enums.Policy.ALLOW_ALL
    disallow_all = enums.Policy.DISALLOW_ALL

    self.assertEqual(0, len(m.policy_cache.get("policy_A")))

    # The cache for policy_B should hold one entry carrying the
    # policy_2.json path and the policy_B definition from that file.
    overridden = m.policy_cache.get("policy_B")
    self.assertEqual(1, len(overridden))
    self.assertEqual(policy_2_path, overridden[0][1])
    self.assertEqual(
        {
            'groups': {
                'group_B': {
                    symmetric_key: {
                        operation.GET: allow_all,
                        operation.LOCATE: allow_all,
                        operation.DESTROY: allow_all
                    }
                }
            }
        },
        overridden[0][2]
    )

    self.assertEqual(3, len(m.policy_store.keys()))
    expected_policies = [
        (
            "policy_A",
            "group_A",
            {
                operation.GET: allow_all,
                operation.DESTROY: allow_all
            }
        ),
        (
            "policy_B",
            "group_B",
            {
                operation.GET: disallow_all,
                operation.LOCATE: disallow_all,
                operation.DESTROY: disallow_all
            }
        ),
        (
            "policy_C",
            "group_C",
            {
                operation.GET: allow_all,
                operation.DESTROY: disallow_all
            }
        )
    ]
    for policy_name, group_name, permissions in expected_policies:
        self.assertEqual(
            {"groups": {group_name: {symmetric_key: permissions}}},
            m.policy_store.get(policy_name, None)
        )
|
|
|
|
def test_run_with_policy_load_failure(self):
    """
    Test that the PolicyDirectoryMonitor loads and tracks policy files
    properly even when one of the files does not contain valid JSON.
    """
    m = monitor.PolicyDirectoryMonitor(
        self.tmp_dir,
        multiprocessing.Manager().dict()
    )
    m.logger = mock.MagicMock(logging.Logger)
    m.halt_trigger = mock.MagicMock(multiprocessing.synchronize.Event)
    # is_set() answers False on its first call and True on its second.
    m.halt_trigger.is_set.side_effect = [False, True]

    policy_1_path = os.path.join(self.tmp_dir, "policy_1.json")
    policy_2_path = os.path.join(self.tmp_dir, "policy_2.json")
    write_file(self.tmp_dir, "policy_1.json", POLICY_1)
    write_file(self.tmp_dir, "policy_2.json", "not a JSON blob")

    # Nothing is tracked before the monitor runs.
    self.assertEqual({}, m.file_timestamps)
    self.assertEqual({}, m.policy_cache)
    self.assertEqual([], m.policy_files)
    self.assertEqual({}, m.policy_map)
    self.assertEqual([], m.policy_store.keys())

    m.run()

    expected_info = [
        "Starting up the operation policy file monitor.",
        "Loading policies for file: {}".format(policy_1_path),
        "Loading policy: policy_A",
        "Loading policies for file: {}".format(policy_2_path)
    ]
    for message in expected_info:
        m.logger.info.assert_any_call(message)
    m.logger.error.assert_any_call(
        "Failure loading file: {}".format(policy_2_path)
    )
    m.logger.debug.assert_called()
    m.logger.info.assert_any_call(
        "Stopping the operation policy file monitor."
    )

    # Both files are tracked, but only the valid one contributes a policy.
    self.assertEqual(2, len(m.policy_files))

    self.assertEqual(
        os.path.getmtime(policy_1_path),
        m.file_timestamps.get(policy_1_path, None)
    )
    self.assertIn(policy_1_path, m.policy_files)
    self.assertEqual(policy_1_path, m.policy_map.get("policy_A", None))

    self.assertEqual(
        os.path.getmtime(policy_2_path),
        m.file_timestamps.get(policy_2_path, None)
    )
    self.assertIn(policy_2_path, m.policy_files)

    self.assertEqual(
        {
            "policy_A": []
        },
        m.policy_cache
    )

    self.assertEqual(1, len(m.policy_store.keys()))
    expected_policy_a = {
        "groups": {
            "group_A": {
                enums.ObjectType.SYMMETRIC_KEY: {
                    enums.Operation.GET: enums.Policy.ALLOW_ALL,
                    enums.Operation.DESTROY: enums.Policy.ALLOW_ALL
                }
            }
        }
    }
    self.assertEqual(expected_policy_a, m.policy_store.get("policy_A", None))
|
|
|
|
def test_run_with_policy_load_failure_and_fix(self):
    """
    Test that the PolicyDirectoryMonitor loads and tracks policy files
    properly when a file with invalid content appears while tracking is
    active and is subsequently rewritten with valid content.
    """
    m = monitor.PolicyDirectoryMonitor(
        self.tmp_dir,
        multiprocessing.Manager().dict()
    )
    m.logger = mock.MagicMock(logging.Logger)
    m.halt_trigger = mock.MagicMock(multiprocessing.synchronize.Event)

    # Successive is_set() calls answer False until the final True. The
    # second call first writes an invalid policy_2.json and the fourth
    # call first replaces it with valid content.
    break_policy_2 = build_write_effect(
        self.tmp_dir,
        "policy_2.json",
        "invalid JSON"
    )
    fix_policy_2 = build_write_effect(
        self.tmp_dir,
        "policy_2.json",
        POLICY_2
    )
    m.halt_trigger.is_set.side_effect = side_effects(
        [False, break_policy_2, False, fix_policy_2, False, True]
    )

    policy_1_path = os.path.join(self.tmp_dir, "policy_1.json")
    policy_2_path = os.path.join(self.tmp_dir, "policy_2.json")
    write_file(self.tmp_dir, "policy_1.json", POLICY_1)

    # Nothing is tracked before the monitor runs.
    self.assertEqual({}, m.file_timestamps)
    self.assertEqual({}, m.policy_cache)
    self.assertEqual([], m.policy_files)
    self.assertEqual({}, m.policy_map)
    self.assertEqual([], m.policy_store.keys())

    m.run()

    info_before_failure = [
        "Starting up the operation policy file monitor.",
        "Loading policies for file: {}".format(policy_1_path),
        "Loading policy: policy_A",
        "Loading policies for file: {}".format(policy_2_path)
    ]
    for message in info_before_failure:
        m.logger.info.assert_any_call(message)
    m.logger.error.assert_any_call(
        "Failure loading file: {}".format(policy_2_path)
    )
    m.logger.debug.assert_called()
    info_after_fix = [
        "Loading policies for file: {}".format(policy_2_path),
        "Loading policy: policy_B",
        "Loading policy: policy_C",
        "Stopping the operation policy file monitor."
    ]
    for message in info_after_fix:
        m.logger.info.assert_any_call(message)

    self.assertEqual(2, len(m.policy_files))
    tracked = [
        (policy_1_path, ["policy_A"]),
        (policy_2_path, ["policy_B", "policy_C"])
    ]
    for path, policy_names in tracked:
        self.assertEqual(
            os.path.getmtime(path),
            m.file_timestamps.get(path, None)
        )
        self.assertIn(path, m.policy_files)
        for policy_name in policy_names:
            self.assertEqual(path, m.policy_map.get(policy_name, None))

    self.assertEqual(
        {
            "policy_A": [],
            "policy_B": [],
            "policy_C": []
        },
        m.policy_cache
    )

    symmetric_key = enums.ObjectType.SYMMETRIC_KEY
    operation = enums.Operation
    allow_all = enums.Policy.ALLOW_ALL
    disallow_all = enums.Policy.DISALLOW_ALL

    self.assertEqual(3, len(m.policy_store.keys()))
    expected_policies = [
        (
            "policy_A",
            "group_A",
            {
                operation.GET: allow_all,
                operation.DESTROY: allow_all
            }
        ),
        (
            "policy_B",
            "group_B",
            {
                operation.GET: allow_all,
                operation.LOCATE: allow_all,
                operation.DESTROY: allow_all
            }
        ),
        (
            "policy_C",
            "group_C",
            {
                operation.GET: allow_all,
                operation.DESTROY: disallow_all
            }
        )
    ]
    for policy_name, group_name, permissions in expected_policies:
        self.assertEqual(
            {"groups": {group_name: {symmetric_key: permissions}}},
            m.policy_store.get(policy_name, None)
        )
|
|
|
|
def test_run_with_policy_overloading_reserved(self):
    """
    Test that the PolicyDirectoryMonitor loads and tracks policy files
    properly when one file defines a policy named 'default': a warning
    is logged and that policy does not end up in the policy store.
    """
    m = monitor.PolicyDirectoryMonitor(
        self.tmp_dir,
        multiprocessing.Manager().dict()
    )
    m.logger = mock.MagicMock(logging.Logger)
    m.halt_trigger = mock.MagicMock(multiprocessing.synchronize.Event)
    # is_set() answers False on its first call and True on its second.
    m.halt_trigger.is_set.side_effect = [False, True]

    policy_3_path = os.path.join(self.tmp_dir, "policy_3.json")
    policy_4_path = os.path.join(self.tmp_dir, "policy_4.json")
    write_file(self.tmp_dir, "policy_3.json", POLICY_3)
    write_file(self.tmp_dir, "policy_4.json", POLICY_4)

    # Nothing is tracked before the monitor runs.
    self.assertEqual({}, m.file_timestamps)
    self.assertEqual({}, m.policy_cache)
    self.assertEqual([], m.policy_files)
    self.assertEqual({}, m.policy_map)
    self.assertEqual([], m.policy_store.keys())

    m.run()

    expected_info = [
        "Starting up the operation policy file monitor.",
        "Loading policies for file: {}".format(policy_3_path),
        "Loading policy: policy_B",
        "Loading policies for file: {}".format(policy_4_path),
        "Loading policy: default"
    ]
    for message in expected_info:
        m.logger.info.assert_any_call(message)
    m.logger.warning.assert_any_call(
        "Policy 'default' overwrites a reserved policy and will be "
        "thrown out."
    )
    m.logger.info.assert_any_call(
        "Stopping the operation policy file monitor."
    )

    # Both files are tracked, but only policy_3.json contributes a policy.
    self.assertEqual(2, len(m.policy_files))

    self.assertEqual(
        os.path.getmtime(policy_3_path),
        m.file_timestamps.get(policy_3_path, None)
    )
    self.assertIn(policy_3_path, m.policy_files)
    self.assertEqual(policy_3_path, m.policy_map.get("policy_B", None))

    self.assertEqual(
        os.path.getmtime(policy_4_path),
        m.file_timestamps.get(policy_4_path, None)
    )
    self.assertIn(policy_4_path, m.policy_files)

    self.assertEqual(
        {
            "policy_B": []
        },
        m.policy_cache
    )

    self.assertEqual(1, len(m.policy_store.keys()))
    disallow_all = enums.Policy.DISALLOW_ALL
    expected_policy_b = {
        "groups": {
            "group_B": {
                enums.ObjectType.SYMMETRIC_KEY: {
                    enums.Operation.GET: disallow_all,
                    enums.Operation.LOCATE: disallow_all,
                    enums.Operation.DESTROY: disallow_all
                }
            }
        }
    }
    self.assertEqual(expected_policy_b, m.policy_store.get("policy_B", None))
|
|
|
|
def test_run_with_edit_modifying_existing_file(self):
    """
    Test that the PolicyDirectoryMonitor loads and tracks policy files
    properly when the content of an already tracked policy file is
    replaced while tracking is active.
    """
    m = monitor.PolicyDirectoryMonitor(
        self.tmp_dir,
        multiprocessing.Manager().dict()
    )
    m.logger = mock.MagicMock(logging.Logger)
    m.halt_trigger = mock.MagicMock(multiprocessing.synchronize.Event)

    # The second is_set() call rewrites policy_2.json with POLICY_5
    # (policy_B and policy_D) before answering False.
    rewrite_policy_2 = build_write_effect(
        self.tmp_dir,
        "policy_2.json",
        POLICY_5
    )
    m.halt_trigger.is_set.side_effect = side_effects(
        [False, rewrite_policy_2, True]
    )

    policy_1_path = os.path.join(self.tmp_dir, "policy_1.json")
    policy_2_path = os.path.join(self.tmp_dir, "policy_2.json")
    write_file(self.tmp_dir, "policy_1.json", POLICY_1)
    write_file(self.tmp_dir, "policy_2.json", POLICY_2)

    # Nothing is tracked before the monitor runs.
    self.assertEqual({}, m.file_timestamps)
    self.assertEqual({}, m.policy_cache)
    self.assertEqual([], m.policy_files)
    self.assertEqual({}, m.policy_map)
    self.assertEqual([], m.policy_store.keys())

    m.run()

    expected_info = [
        "Starting up the operation policy file monitor.",
        "Loading policies for file: {}".format(policy_1_path),
        "Loading policy: policy_A",
        "Loading policies for file: {}".format(policy_2_path),
        "Loading policy: policy_B",
        "Loading policy: policy_C",
        "Loading policies for file: {}".format(policy_2_path),
        "Loading policy: policy_B",
        "Loading policy: policy_D",
        "Removing policy: policy_C",
        "Stopping the operation policy file monitor."
    ]
    for message in expected_info:
        m.logger.info.assert_any_call(message)

    self.assertEqual(2, len(m.policy_files))
    tracked = [
        (policy_1_path, ["policy_A"]),
        (policy_2_path, ["policy_B", "policy_D"])
    ]
    for path, policy_names in tracked:
        self.assertEqual(
            os.path.getmtime(path),
            m.file_timestamps.get(path, None)
        )
        self.assertIn(path, m.policy_files)
        for policy_name in policy_names:
            self.assertEqual(path, m.policy_map.get(policy_name, None))

    self.assertEqual(
        {
            "policy_A": [],
            "policy_B": [],
            "policy_D": []
        },
        m.policy_cache
    )

    symmetric_key = enums.ObjectType.SYMMETRIC_KEY
    operation = enums.Operation
    allow_all = enums.Policy.ALLOW_ALL
    disallow_all = enums.Policy.DISALLOW_ALL

    self.assertEqual(3, len(m.policy_store.keys()))
    expected_policies = [
        (
            "policy_A",
            "group_A",
            {
                operation.GET: allow_all,
                operation.DESTROY: allow_all
            }
        ),
        (
            "policy_B",
            "group_B",
            {
                operation.GET: allow_all,
                operation.LOCATE: allow_all,
                operation.DESTROY: allow_all
            }
        ),
        (
            "policy_D",
            "group_D",
            {
                operation.GET: allow_all,
                operation.DESTROY: disallow_all
            }
        )
    ]
    for policy_name, group_name, permissions in expected_policies:
        self.assertEqual(
            {"groups": {group_name: {symmetric_key: permissions}}},
            m.policy_store.get(policy_name, None)
        )
|
|
|
|
def test_run_with_edit_adding_to_existing_file(self):
    """
    Test that the PolicyDirectoryMonitor loads and tracks policy files
    properly when an additional policy is added to an already tracked
    policy file while tracking is active.
    """
    m = monitor.PolicyDirectoryMonitor(
        self.tmp_dir,
        multiprocessing.Manager().dict()
    )
    m.logger = mock.MagicMock(logging.Logger)
    m.halt_trigger = mock.MagicMock(multiprocessing.synchronize.Event)

    # The second is_set() call rewrites policy_1.json with POLICY_6
    # (policy_A plus the new policy_E) before answering False.
    extend_policy_1 = build_write_effect(
        self.tmp_dir,
        "policy_1.json",
        POLICY_6
    )
    m.halt_trigger.is_set.side_effect = side_effects(
        [False, extend_policy_1, True]
    )

    policy_1_path = os.path.join(self.tmp_dir, "policy_1.json")
    policy_2_path = os.path.join(self.tmp_dir, "policy_2.json")
    write_file(self.tmp_dir, "policy_1.json", POLICY_1)
    write_file(self.tmp_dir, "policy_2.json", POLICY_2)

    # Nothing is tracked before the monitor runs.
    self.assertEqual({}, m.file_timestamps)
    self.assertEqual({}, m.policy_cache)
    self.assertEqual([], m.policy_files)
    self.assertEqual({}, m.policy_map)
    self.assertEqual([], m.policy_store.keys())

    m.run()

    expected_info = [
        "Starting up the operation policy file monitor.",
        "Loading policies for file: {}".format(policy_1_path),
        "Loading policy: policy_A",
        "Loading policies for file: {}".format(policy_2_path),
        "Loading policy: policy_B",
        "Loading policy: policy_C",
        "Loading policies for file: {}".format(policy_1_path),
        "Loading policy: policy_A",
        "Loading policy: policy_E",
        "Stopping the operation policy file monitor."
    ]
    for message in expected_info:
        m.logger.info.assert_any_call(message)

    self.assertEqual(2, len(m.policy_files))
    tracked = [
        (policy_1_path, ["policy_A", "policy_E"]),
        (policy_2_path, ["policy_B", "policy_C"])
    ]
    for path, policy_names in tracked:
        self.assertEqual(
            os.path.getmtime(path),
            m.file_timestamps.get(path, None)
        )
        self.assertIn(path, m.policy_files)
        for policy_name in policy_names:
            self.assertEqual(path, m.policy_map.get(policy_name, None))

    self.assertEqual(
        {
            "policy_A": [],
            "policy_B": [],
            "policy_C": [],
            "policy_E": []
        },
        m.policy_cache
    )

    symmetric_key = enums.ObjectType.SYMMETRIC_KEY
    operation = enums.Operation
    allow_all = enums.Policy.ALLOW_ALL
    disallow_all = enums.Policy.DISALLOW_ALL

    self.assertEqual(4, len(m.policy_store.keys()))
    expected_policies = [
        (
            "policy_A",
            "group_A",
            {
                operation.GET: allow_all,
                operation.DESTROY: allow_all
            }
        ),
        (
            "policy_B",
            "group_B",
            {
                operation.GET: allow_all,
                operation.LOCATE: allow_all,
                operation.DESTROY: allow_all
            }
        ),
        (
            "policy_C",
            "group_C",
            {
                operation.GET: allow_all,
                operation.DESTROY: disallow_all
            }
        ),
        (
            "policy_E",
            "group_E",
            {
                operation.GET: allow_all,
                operation.CHECK: enums.Policy.ALLOW_OWNER,
                operation.DESTROY: allow_all
            }
        )
    ]
    for policy_name, group_name, permissions in expected_policies:
        self.assertEqual(
            {"groups": {group_name: {symmetric_key: permissions}}},
            m.policy_store.get(policy_name, None)
        )
|
|
|
|
def test_run_with_edit_deleting_from_existing_file(self):
    """
    Test that the PolicyDirectoryMonitor keeps its tracking structures
    consistent when a policy file it already tracks is rewritten with
    one of its policies removed while monitoring is active.
    """
    file_1 = os.path.join(self.tmp_dir, "policy_1.json")
    file_2 = os.path.join(self.tmp_dir, "policy_2.json")

    def policy(group, rules):
        # Expected store entry: one group with symmetric key rules.
        return {
            "groups": {group: {enums.ObjectType.SYMMETRIC_KEY: rules}}
        }

    mon = monitor.PolicyDirectoryMonitor(
        self.tmp_dir,
        multiprocessing.Manager().dict()
    )
    mon.logger = mock.MagicMock(logging.Logger)
    mon.halt_trigger = mock.MagicMock(multiprocessing.synchronize.Event)
    # Scripted answers for the halt trigger. NOTE(review): the middle
    # entry presumably rewrites policy_1.json with POLICY_1 when the
    # trigger is polled; confirm against build_write_effect.
    rewrite_file_1 = build_write_effect(
        self.tmp_dir,
        "policy_1.json",
        POLICY_1
    )
    mon.halt_trigger.is_set.side_effect = side_effects(
        [False, rewrite_file_1, True]
    )

    write_file(self.tmp_dir, "policy_1.json", POLICY_6)
    write_file(self.tmp_dir, "policy_2.json", POLICY_2)

    # Nothing may be tracked before the monitor has run.
    pristine = [
        ({}, mon.file_timestamps),
        ({}, mon.policy_cache),
        ([], mon.policy_files),
        ({}, mon.policy_map),
        ([], mon.policy_store.keys())
    ]
    for expected, actual in pristine:
        self.assertEqual(expected, actual)

    mon.run()

    loading_file = "Loading policies for file: {}".format
    expected_messages = [
        "Starting up the operation policy file monitor.",
        loading_file(file_1),
        "Loading policy: policy_A",
        "Loading policy: policy_E",
        loading_file(file_2),
        "Loading policy: policy_B",
        "Loading policy: policy_C",
        loading_file(file_1),
        "Loading policy: policy_A",
        "Removing policy: policy_E",
        "Stopping the operation policy file monitor."
    ]
    for message in expected_messages:
        mon.logger.info.assert_any_call(message)

    # Each file must be tracked with its current modification time and
    # own the policies listed next to it.
    self.assertEqual(2, len(mon.policy_files))
    tracked = [
        (file_1, ["policy_A"]),
        (file_2, ["policy_B", "policy_C"])
    ]
    for path, names in tracked:
        self.assertEqual(
            os.path.getmtime(path),
            mon.file_timestamps.get(path, None)
        )
        self.assertIn(path, mon.policy_files)
        for name in names:
            self.assertEqual(path, mon.policy_map.get(name, None))

    self.assertEqual(
        {"policy_A": [], "policy_B": [], "policy_C": []},
        mon.policy_cache
    )

    allow = enums.Policy.ALLOW_ALL
    deny = enums.Policy.DISALLOW_ALL
    expected_store = [
        (
            "policy_A",
            policy(
                "group_A",
                {
                    enums.Operation.GET: allow,
                    enums.Operation.DESTROY: allow
                }
            )
        ),
        (
            "policy_B",
            policy(
                "group_B",
                {
                    enums.Operation.GET: allow,
                    enums.Operation.LOCATE: allow,
                    enums.Operation.DESTROY: allow
                }
            )
        ),
        (
            "policy_C",
            policy(
                "group_C",
                {
                    enums.Operation.GET: allow,
                    enums.Operation.DESTROY: deny
                }
            )
        )
    ]
    self.assertEqual(3, len(mon.policy_store.keys()))
    for name, expected in expected_store:
        self.assertEqual(expected, mon.policy_store.get(name, None))
|
|
|
|
def test_run_with_deleting_existing_file(self):
    """
    Test that the PolicyDirectoryMonitor stops tracking a policy file,
    and the policies it provided, when that file disappears while
    monitoring is active.
    """
    file_1 = os.path.join(self.tmp_dir, "policy_1.json")
    file_2 = os.path.join(self.tmp_dir, "policy_2.json")

    def policy(group, rules):
        # Expected store entry: one group with symmetric key rules.
        return {
            "groups": {group: {enums.ObjectType.SYMMETRIC_KEY: rules}}
        }

    mon = monitor.PolicyDirectoryMonitor(
        self.tmp_dir,
        multiprocessing.Manager().dict()
    )
    mon.logger = mock.MagicMock(logging.Logger)
    mon.halt_trigger = mock.MagicMock(multiprocessing.synchronize.Event)
    # Scripted answers for the halt trigger. NOTE(review): the middle
    # entry presumably deletes policy_1.json when the trigger is
    # polled; confirm against build_delete_effect.
    remove_file_1 = build_delete_effect(self.tmp_dir, "policy_1.json")
    mon.halt_trigger.is_set.side_effect = side_effects(
        [False, remove_file_1, True]
    )

    write_file(self.tmp_dir, "policy_1.json", POLICY_6)
    write_file(self.tmp_dir, "policy_2.json", POLICY_2)

    # Nothing may be tracked before the monitor has run.
    pristine = [
        ({}, mon.file_timestamps),
        ({}, mon.policy_cache),
        ([], mon.policy_files),
        ({}, mon.policy_map),
        ([], mon.policy_store.keys())
    ]
    for expected, actual in pristine:
        self.assertEqual(expected, actual)

    mon.run()

    expected_messages = [
        "Starting up the operation policy file monitor.",
        "Loading policies for file: {}".format(file_1),
        "Loading policy: policy_A",
        "Loading policy: policy_E",
        "Loading policies for file: {}".format(file_2),
        "Loading policy: policy_B",
        "Loading policy: policy_C",
        "Removing policies for file: {}".format(file_1),
        "Removing policy: policy_A",
        "Removing policy: policy_E",
        "Stopping the operation policy file monitor."
    ]
    for message in expected_messages:
        mon.logger.info.assert_any_call(message)

    # Only policy_2.json and its two policies should remain tracked.
    self.assertEqual(1, len(mon.policy_files))
    self.assertEqual(
        os.path.getmtime(file_2),
        mon.file_timestamps.get(file_2, None)
    )
    self.assertIn(file_2, mon.policy_files)
    self.assertEqual(2, len(mon.policy_map.keys()))
    for name in ("policy_B", "policy_C"):
        self.assertEqual(file_2, mon.policy_map.get(name, None))

    self.assertEqual(
        {"policy_B": [], "policy_C": []},
        mon.policy_cache
    )

    allow = enums.Policy.ALLOW_ALL
    deny = enums.Policy.DISALLOW_ALL
    expected_store = [
        (
            "policy_B",
            policy(
                "group_B",
                {
                    enums.Operation.GET: allow,
                    enums.Operation.LOCATE: allow,
                    enums.Operation.DESTROY: allow
                }
            )
        ),
        (
            "policy_C",
            policy(
                "group_C",
                {
                    enums.Operation.GET: allow,
                    enums.Operation.DESTROY: deny
                }
            )
        )
    ]
    self.assertEqual(2, len(mon.policy_store.keys()))
    for name, expected in expected_store:
        self.assertEqual(expected, mon.policy_store.get(name, None))
|
|
|
|
def test_run_with_adding_new_file(self):
    """
    Test that the PolicyDirectoryMonitor picks up and tracks a policy
    file that first appears in the directory while monitoring is
    active, without reporting any policy overwrites.
    """
    file_1 = os.path.join(self.tmp_dir, "policy_1.json")
    file_2 = os.path.join(self.tmp_dir, "policy_2.json")

    def policy(group, rules):
        # Expected store entry: one group with symmetric key rules.
        return {
            "groups": {group: {enums.ObjectType.SYMMETRIC_KEY: rules}}
        }

    mon = monitor.PolicyDirectoryMonitor(
        self.tmp_dir,
        multiprocessing.Manager().dict()
    )
    mon.logger = mock.MagicMock(logging.Logger)
    mon.halt_trigger = mock.MagicMock(multiprocessing.synchronize.Event)
    # Scripted answers for the halt trigger. NOTE(review): the middle
    # entry presumably creates policy_2.json with POLICY_2 when the
    # trigger is polled; confirm against build_write_effect.
    add_file_2 = build_write_effect(
        self.tmp_dir,
        "policy_2.json",
        POLICY_2
    )
    mon.halt_trigger.is_set.side_effect = side_effects(
        [False, add_file_2, True]
    )

    write_file(self.tmp_dir, "policy_1.json", POLICY_1)

    # Nothing may be tracked before the monitor has run.
    pristine = [
        ({}, mon.file_timestamps),
        ({}, mon.policy_cache),
        ([], mon.policy_files),
        ({}, mon.policy_map),
        ([], mon.policy_store.keys())
    ]
    for expected, actual in pristine:
        self.assertEqual(expected, actual)

    mon.run()

    loading_file = "Loading policies for file: {}".format
    messages_up_to_new_file = [
        "Starting up the operation policy file monitor.",
        loading_file(file_1),
        "Loading policy: policy_A",
        loading_file(file_2)
    ]
    for message in messages_up_to_new_file:
        mon.logger.info.assert_any_call(message)
    # The debug logger must stay silent for the whole run.
    mon.logger.debug.assert_not_called()
    remaining_messages = [
        "Loading policy: policy_B",
        "Loading policy: policy_C",
        "Stopping the operation policy file monitor."
    ]
    for message in remaining_messages:
        mon.logger.info.assert_any_call(message)

    # Each file must be tracked with its current modification time and
    # own the policies listed next to it.
    self.assertEqual(2, len(mon.policy_files))
    tracked = [
        (file_1, ["policy_A"]),
        (file_2, ["policy_B", "policy_C"])
    ]
    for path, names in tracked:
        self.assertEqual(
            os.path.getmtime(path),
            mon.file_timestamps.get(path, None)
        )
        self.assertIn(path, mon.policy_files)
        for name in names:
            self.assertEqual(path, mon.policy_map.get(name, None))

    self.assertEqual(
        {"policy_A": [], "policy_B": [], "policy_C": []},
        mon.policy_cache
    )

    allow = enums.Policy.ALLOW_ALL
    deny = enums.Policy.DISALLOW_ALL
    expected_store = [
        (
            "policy_A",
            policy(
                "group_A",
                {
                    enums.Operation.GET: allow,
                    enums.Operation.DESTROY: allow
                }
            )
        ),
        (
            "policy_B",
            policy(
                "group_B",
                {
                    enums.Operation.GET: allow,
                    enums.Operation.LOCATE: allow,
                    enums.Operation.DESTROY: allow
                }
            )
        ),
        (
            "policy_C",
            policy(
                "group_C",
                {
                    enums.Operation.GET: allow,
                    enums.Operation.DESTROY: deny
                }
            )
        )
    ]
    self.assertEqual(3, len(mon.policy_store.keys()))
    for name, expected in expected_store:
        self.assertEqual(expected, mon.policy_store.get(name, None))
|
|
|
|
def test_run_with_adding_new_file_overloading(self):
    """
    Test that the PolicyDirectoryMonitor can load policy files and track
    them properly, even when new policy files are added that overwrite
    existing policies while tracking is active. The superseded policy
    definitions are expected to be kept in the policy cache.
    """
    file_1 = os.path.join(self.tmp_dir, "policy_1.json")
    file_2 = os.path.join(self.tmp_dir, "policy_2.json")
    file_3 = os.path.join(self.tmp_dir, "policy_3.json")
    file_4 = os.path.join(self.tmp_dir, "policy_4.json")

    allow = enums.Policy.ALLOW_ALL
    deny = enums.Policy.DISALLOW_ALL

    def policy(group, rules):
        # Expected policy body: one group with symmetric key rules.
        return {
            'groups': {group: {enums.ObjectType.SYMMETRIC_KEY: rules}}
        }

    def group_b(setting):
        # policy_B with the same setting for all three operations.
        return policy(
            'group_B',
            {
                enums.Operation.GET: setting,
                enums.Operation.LOCATE: setting,
                enums.Operation.DESTROY: setting
            }
        )

    def group_c():
        return policy(
            'group_C',
            {
                enums.Operation.GET: allow,
                enums.Operation.DESTROY: deny
            }
        )

    mon = monitor.PolicyDirectoryMonitor(
        self.tmp_dir,
        multiprocessing.Manager().dict()
    )
    mon.logger = mock.MagicMock(logging.Logger)
    mon.halt_trigger = mock.MagicMock(multiprocessing.synchronize.Event)
    # Scripted answers for the halt trigger. NOTE(review): the two
    # write effects presumably create policy_3.json and policy_4.json
    # on successive polls; confirm against build_write_effect.
    add_file_3 = build_write_effect(
        self.tmp_dir,
        "policy_3.json",
        POLICY_2
    )
    add_file_4 = build_write_effect(
        self.tmp_dir,
        "policy_4.json",
        POLICY_3
    )
    mon.halt_trigger.is_set.side_effect = side_effects(
        [False, add_file_3, add_file_4, True]
    )

    write_file(self.tmp_dir, "policy_1.json", POLICY_1)
    write_file(self.tmp_dir, "policy_2.json", POLICY_2)

    # Nothing may be tracked before the monitor has run.
    pristine = [
        ({}, mon.file_timestamps),
        ({}, mon.policy_cache),
        ([], mon.policy_files),
        ({}, mon.policy_map),
        ([], mon.policy_store.keys())
    ]
    for expected, actual in pristine:
        self.assertEqual(expected, actual)

    mon.run()

    info = mon.logger.info
    debug = mon.logger.debug
    loading_file = "Loading policies for file: {}".format
    expected_log = [
        (info, "Starting up the operation policy file monitor."),
        (info, loading_file(file_1)),
        (info, "Loading policy: policy_A"),
        (info, loading_file(file_2)),
        (info, "Loading policy: policy_B"),
        (info, "Loading policy: policy_C"),
        (info, loading_file(file_3)),
        (info, "Loading policy: policy_B"),
        (debug, "Policy 'policy_B' overwrites an existing policy."),
        (info, "Loading policy: policy_C"),
        (debug, "Policy 'policy_C' overwrites an existing policy."),
        (info, loading_file(file_4)),
        (info, "Loading policy: policy_B"),
        (debug, "Policy 'policy_B' overwrites an existing policy."),
        (info, "Stopping the operation policy file monitor.")
    ]
    for log_method, message in expected_log:
        log_method.assert_any_call(message)

    # All four files are tracked; the policy map is only checked for
    # the policies listed next to each file.
    self.assertEqual(4, len(mon.policy_files))
    tracked = [
        (file_1, ["policy_A"]),
        (file_2, []),
        (file_3, ["policy_C"]),
        (file_4, ["policy_B"])
    ]
    for path, names in tracked:
        self.assertEqual(
            os.path.getmtime(path),
            mon.file_timestamps.get(path, None)
        )
        self.assertIn(path, mon.policy_files)
        for name in names:
            self.assertEqual(path, mon.policy_map.get(name, None))

    # Cache entries are indexed as (?, file path, policy); index 0 is
    # never inspected here.
    self.assertEqual([], mon.policy_cache.get("policy_A"))

    cached = mon.policy_cache.get("policy_B")
    self.assertEqual(2, len(cached))
    for slot, origin in enumerate([file_2, file_3]):
        self.assertEqual(origin, cached[slot][1])
        self.assertEqual(group_b(allow), cached[slot][2])

    cached = mon.policy_cache.get("policy_C")
    self.assertEqual(1, len(cached))
    self.assertEqual(file_2, cached[0][1])
    self.assertEqual(group_c(), cached[0][2])

    expected_store = [
        (
            "policy_A",
            policy(
                "group_A",
                {
                    enums.Operation.GET: allow,
                    enums.Operation.DESTROY: allow
                }
            )
        ),
        ("policy_B", group_b(deny)),
        ("policy_C", group_c())
    ]
    self.assertEqual(3, len(mon.policy_store.keys()))
    for name, expected in expected_store:
        self.assertEqual(expected, mon.policy_store.get(name, None))
|
|
|
|
def test_run_with_adding_new_file_editing_overloading(self):
    """
    Test that the PolicyDirectoryMonitor can load policy files and track
    them properly when, while tracking is active, new policy files are
    added that overwrite existing policies, an original policy file is
    then removed, and one of the new files is rewritten with different
    content.
    """
    file_1 = os.path.join(self.tmp_dir, "policy_1.json")
    file_2 = os.path.join(self.tmp_dir, "policy_2.json")
    file_3 = os.path.join(self.tmp_dir, "policy_3.json")
    file_4 = os.path.join(self.tmp_dir, "policy_4.json")

    allow = enums.Policy.ALLOW_ALL
    deny = enums.Policy.DISALLOW_ALL

    def policy(group, rules):
        # Expected store entry: one group with symmetric key rules.
        return {
            "groups": {group: {enums.ObjectType.SYMMETRIC_KEY: rules}}
        }

    def uniform(group, setting):
        # Same setting for the GET, LOCATE and DESTROY operations.
        return policy(
            group,
            {
                enums.Operation.GET: setting,
                enums.Operation.LOCATE: setting,
                enums.Operation.DESTROY: setting
            }
        )

    mon = monitor.PolicyDirectoryMonitor(
        self.tmp_dir,
        multiprocessing.Manager().dict()
    )
    mon.logger = mock.MagicMock(logging.Logger)
    mon.halt_trigger = mock.MagicMock(multiprocessing.synchronize.Event)
    # Scripted answers for the halt trigger. NOTE(review): the effects
    # presumably run one per poll, in this order: add policy_3.json,
    # add policy_4.json, delete policy_2.json, rewrite policy_4.json.
    # Confirm against build_write_effect/build_delete_effect.
    scripted_polls = [
        False,
        build_write_effect(self.tmp_dir, "policy_3.json", POLICY_2),
        build_write_effect(self.tmp_dir, "policy_4.json", POLICY_3),
        build_delete_effect(self.tmp_dir, "policy_2.json"),
        build_write_effect(self.tmp_dir, "policy_4.json", POLICY_7),
        True
    ]
    mon.halt_trigger.is_set.side_effect = side_effects(scripted_polls)

    write_file(self.tmp_dir, "policy_1.json", POLICY_1)
    write_file(self.tmp_dir, "policy_2.json", POLICY_2)

    # Nothing may be tracked before the monitor has run.
    pristine = [
        ({}, mon.file_timestamps),
        ({}, mon.policy_cache),
        ([], mon.policy_files),
        ({}, mon.policy_map),
        ([], mon.policy_store.keys())
    ]
    for expected, actual in pristine:
        self.assertEqual(expected, actual)

    mon.run()

    info = mon.logger.info
    debug = mon.logger.debug
    loading_file = "Loading policies for file: {}".format
    expected_log = [
        (info, "Starting up the operation policy file monitor."),
        (info, loading_file(file_1)),
        (info, "Loading policy: policy_A"),
        (info, loading_file(file_2)),
        (info, "Loading policy: policy_B"),
        (info, "Loading policy: policy_C"),
        (info, loading_file(file_3)),
        (info, "Loading policy: policy_B"),
        (debug, "Policy 'policy_B' overwrites an existing policy."),
        (info, "Loading policy: policy_C"),
        (debug, "Policy 'policy_C' overwrites an existing policy."),
        (info, loading_file(file_4)),
        (info, "Loading policy: policy_B"),
        (debug, "Policy 'policy_B' overwrites an existing policy."),
        (info, "Removing policies for file: {}".format(file_2)),
        (info, loading_file(file_4)),
        (info, "Loading policy: policy_D"),
        (info, "Stopping the operation policy file monitor.")
    ]
    for log_method, message in expected_log:
        log_method.assert_any_call(message)

    # policy_2.json is gone; the three remaining files must be tracked
    # with their current modification times and policy ownership.
    self.assertEqual(3, len(mon.policy_files))
    tracked = [
        (file_1, ["policy_A"]),
        (file_3, ["policy_B", "policy_C"]),
        (file_4, ["policy_D"])
    ]
    for path, names in tracked:
        self.assertEqual(
            os.path.getmtime(path),
            mon.file_timestamps.get(path, None)
        )
        self.assertIn(path, mon.policy_files)
        for name in names:
            self.assertEqual(path, mon.policy_map.get(name, None))

    # No superseded definitions may be left in the cache.
    for name in ("policy_A", "policy_B", "policy_C"):
        self.assertEqual([], mon.policy_cache.get(name))

    expected_store = [
        (
            "policy_A",
            policy(
                "group_A",
                {
                    enums.Operation.GET: allow,
                    enums.Operation.DESTROY: allow
                }
            )
        ),
        ("policy_B", uniform("group_B", allow)),
        (
            "policy_C",
            policy(
                "group_C",
                {
                    enums.Operation.GET: allow,
                    enums.Operation.DESTROY: deny
                }
            )
        ),
        ("policy_D", uniform("group_D", deny))
    ]
    self.assertEqual(4, len(mon.policy_store.keys()))
    for name, expected in expected_store:
        self.assertEqual(expected, mon.policy_store.get(name, None))
|
|
|
|
def test_run_without_live_monitoring(self):
    """
    Test that the PolicyDirectoryMonitor loads and tracks the policy
    files present in the directory when it is created with
    live_monitoring=False (one-shot scanning mode).
    """
    file_1 = os.path.join(self.tmp_dir, "policy_1.json")
    file_2 = os.path.join(self.tmp_dir, "policy_2.json")

    def policy(group, rules):
        # Expected store entry: one group with symmetric key rules.
        return {
            "groups": {group: {enums.ObjectType.SYMMETRIC_KEY: rules}}
        }

    mon = monitor.PolicyDirectoryMonitor(
        self.tmp_dir,
        multiprocessing.Manager().dict(),
        live_monitoring=False
    )
    mon.logger = mock.MagicMock(logging.Logger)
    mon.halt_trigger = mock.MagicMock(multiprocessing.synchronize.Event)
    mon.halt_trigger.is_set.side_effect = [False, True]

    write_file(self.tmp_dir, "policy_1.json", POLICY_1)
    write_file(self.tmp_dir, "policy_2.json", POLICY_2)

    # Nothing may be tracked before the monitor has run.
    pristine = [
        ({}, mon.file_timestamps),
        ({}, mon.policy_cache),
        ([], mon.policy_files),
        ({}, mon.policy_map),
        ([], mon.policy_store.keys())
    ]
    for expected, actual in pristine:
        self.assertEqual(expected, actual)

    mon.run()

    expected_messages = [
        "Loading policies for file: {}".format(file_1),
        "Loading policy: policy_A",
        "Loading policies for file: {}".format(file_2),
        "Loading policy: policy_B",
        "Loading policy: policy_C"
    ]
    for message in expected_messages:
        mon.logger.info.assert_any_call(message)

    # Each file must be tracked with its current modification time and
    # own the policies listed next to it.
    self.assertEqual(2, len(mon.policy_files))
    tracked = [
        (file_1, ["policy_A"]),
        (file_2, ["policy_B", "policy_C"])
    ]
    for path, names in tracked:
        self.assertEqual(
            os.path.getmtime(path),
            mon.file_timestamps.get(path, None)
        )
        self.assertIn(path, mon.policy_files)
        for name in names:
            self.assertEqual(path, mon.policy_map.get(name, None))

    self.assertEqual(
        {"policy_A": [], "policy_B": [], "policy_C": []},
        mon.policy_cache
    )

    allow = enums.Policy.ALLOW_ALL
    deny = enums.Policy.DISALLOW_ALL
    expected_store = [
        (
            "policy_A",
            policy(
                "group_A",
                {
                    enums.Operation.GET: allow,
                    enums.Operation.DESTROY: allow
                }
            )
        ),
        (
            "policy_B",
            policy(
                "group_B",
                {
                    enums.Operation.GET: allow,
                    enums.Operation.LOCATE: allow,
                    enums.Operation.DESTROY: allow
                }
            )
        ),
        (
            "policy_C",
            policy(
                "group_C",
                {
                    enums.Operation.GET: allow,
                    enums.Operation.DESTROY: deny
                }
            )
        )
    ]
    self.assertEqual(3, len(mon.policy_store.keys()))
    for name, expected in expected_store:
        self.assertEqual(expected, mon.policy_store.get(name, None))
|
|
|
|
def test_initialize_tracking_structures(self):
    """
    Test that initialize_tracking_structures() empties the monitor's
    file and policy bookkeeping while leaving the "default" entry of
    the policy store in place.
    """
    shared_store = multiprocessing.Manager().dict()
    mon = monitor.PolicyDirectoryMonitor(self.tmp_dir, shared_store)

    # Seed every tracking structure with placeholder data.
    mon.file_timestamps["a"] = 1234
    mon.policy_cache["a"] = (123.12, "b", {"c": 2})
    mon.policy_files = ["a", "b"]
    mon.policy_map["a"] = "b"
    mon.policy_store["a"] = {"c": 2}
    mon.policy_store["default"] = {"c": 3}

    mon.initialize_tracking_structures()

    # Only the "default" policy is expected to survive the reset.
    after_reset = [
        ({}, mon.file_timestamps),
        ({}, mon.policy_cache),
        ([], mon.policy_files),
        ({}, mon.policy_map),
        (["default"], mon.policy_store.keys()),
        ({"c": 3}, mon.policy_store.get("default"))
    ]
    for expected, actual in after_reset:
        self.assertEqual(expected, actual)
|
|
|
|
def test_disassociate_policy_and_file(self):
    """
    Test that disassociate_policy_and_file() removes every cache entry
    of the named policy that came from the given file, leaving entries
    from other files and for other policies untouched.
    """
    file_1 = os.path.join(self.tmp_dir, "policy_1.json")
    file_2 = os.path.join(self.tmp_dir, "policy_2.json")
    file_3 = os.path.join(self.tmp_dir, "policy_3.json")

    mon = monitor.PolicyDirectoryMonitor(
        self.tmp_dir,
        multiprocessing.Manager().dict()
    )

    # policy_A has two cached entries from policy_1.json; policy_B has
    # one entry from that same file.
    mon.policy_cache = {
        "policy_A": [
            (1480043060.870089, file_1, {}),
            (1480043062.02171, file_2, {}),
            (1480043062.645776, file_1, {}),
            (1480043063.453713, file_3, {})
        ],
        "policy_B": [
            (1480043123.65311, file_1, {})
        ]
    }

    mon.disassociate_policy_and_file("policy_A", file_1)

    remaining_for_a = [
        (1480043062.02171, file_2, {}),
        (1480043063.453713, file_3, {})
    ]
    self.assertEqual(remaining_for_a, mon.policy_cache.get("policy_A", []))

    # The other policy keeps its entry for the very same file.
    untouched_for_b = [
        (1480043123.65311, file_1, {})
    ]
    self.assertEqual(untouched_for_b, mon.policy_cache.get("policy_B", []))
|
|
|
|
def test_restore_or_delete_policy_restore(self):
    """
    Test that restore_or_delete_policy() reinstates the last cached
    definition of a policy: the final cache entry is consumed and
    becomes the active policy store and policy map entry.
    """
    file_1 = os.path.join(self.tmp_dir, "policy_1.json")
    file_2 = os.path.join(self.tmp_dir, "policy_2.json")
    file_3 = os.path.join(self.tmp_dir, "policy_3.json")
    file_4 = os.path.join(self.tmp_dir, "policy_4.json")

    mon = monitor.PolicyDirectoryMonitor(
        self.tmp_dir,
        multiprocessing.Manager().dict()
    )
    mon.logger = mock.MagicMock(logging.Logger)

    # The policy payloads are opaque placeholder values for this test.
    mon.policy_cache = {
        "policy_A": [
            (1480043060.870089, file_1, {'{"policy_1"}'}),
            (1480043062.02171, file_2, {'{"policy_2"}'}),
            (1480043063.453713, file_3, {'{"policy_3"}'})
        ]
    }
    mon.policy_store["policy_A"] = {'{"policy_4"}'}
    mon.policy_map["policy_A"] = file_4

    mon.restore_or_delete_policy("policy_A")

    # A restore must not log anything at the info level.
    mon.logger.info.assert_not_called()

    still_cached = [
        (1480043060.870089, file_1, {'{"policy_1"}'}),
        (1480043062.02171, file_2, {'{"policy_2"}'})
    ]
    self.assertEqual(still_cached, mon.policy_cache.get("policy_A", []))
    self.assertEqual(
        {'{"policy_3"}'},
        mon.policy_store.get("policy_A", {})
    )
    self.assertEqual(file_3, mon.policy_map.get("policy_A", None))
|
|
|
|
def test_restore_or_delete_policy_delete(self):
    """
    Test that restore_or_delete_policy() removes a policy from the
    cache, the policy store and the policy map when no cached
    definition is left to restore.
    """
    mon = monitor.PolicyDirectoryMonitor(
        self.tmp_dir,
        multiprocessing.Manager().dict()
    )
    mon.logger = mock.MagicMock(logging.Logger)

    # An empty cache list leaves nothing to fall back to.
    mon.policy_cache = {"policy_A": []}
    mon.policy_store["policy_A"] = {'{"policy_4"}'}
    mon.policy_map["policy_A"] = os.path.join(
        self.tmp_dir,
        "policy_4.json"
    )

    mon.restore_or_delete_policy("policy_A")

    mon.logger.info.assert_called_once_with("Removing policy: policy_A")
    trackers = (mon.policy_cache, mon.policy_store, mon.policy_map)
    for tracker in trackers:
        self.assertNotIn("policy_A", tracker.keys())
|