Add functional tests for server auth and access control

This change adds a new integration test suite, named 'functional',
that is specifically intended to test third-party authentication
and group-based access control with the PyKMIP server. A new tox
environment is added to handle running these tests separately from
the existing 'integration' test suite. New Travis CI configuration
and setup files have also been added to facilitate running these
tests automatically.
This commit is contained in:
Peter Hamilton 2018-05-08 17:28:01 -04:00
parent 2dacdf7bd4
commit b5e7323845
15 changed files with 464 additions and 1 deletions

View File

@ -18,6 +18,14 @@ matrix:
os: linux
dist: trusty
env: TOXENV=py27 RUN_INTEGRATION_TESTS=1
- python: 2.7
os: linux
dist: precise
env: TOXENV=py27 RUN_INTEGRATION_TESTS=2
- python: 2.7
os: linux
dist: trusty
env: TOXENV=py27 RUN_INTEGRATION_TESTS=2
- python: 3.4
os: linux
dist: precise
@ -34,6 +42,14 @@ matrix:
os: linux
dist: trusty
env: TOXENV=py34 RUN_INTEGRATION_TESTS=1
- python: 3.4
os: linux
dist: precise
env: TOXENV=py34 RUN_INTEGRATION_TESTS=2
- python: 3.4
os: linux
dist: trusty
env: TOXENV=py34 RUN_INTEGRATION_TESTS=2
- python: 3.5
os: linux
dist: precise
@ -50,6 +66,14 @@ matrix:
os: linux
dist: trusty
env: TOXENV=py35 RUN_INTEGRATION_TESTS=1
- python: 3.5
os: linux
dist: precise
env: TOXENV=py35 RUN_INTEGRATION_TESTS=2
- python: 3.5
os: linux
dist: trusty
env: TOXENV=py35 RUN_INTEGRATION_TESTS=2
- python: 3.6
os: linux
dist: precise
@ -66,6 +90,14 @@ matrix:
os: linux
dist: trusty
env: TOXENV=py36 RUN_INTEGRATION_TESTS=1
- python: 3.6
os: linux
dist: precise
env: TOXENV=py36 RUN_INTEGRATION_TESTS=2
- python: 3.6
os: linux
dist: trusty
env: TOXENV=py36 RUN_INTEGRATION_TESTS=2
- python: 2.7
os: linux
dist: precise
@ -91,9 +123,14 @@ matrix:
dist: trusty
env: TOXENV=docs RUN_INTEGRATION_TESTS=0
install:
# Pin six to >= 1.11.0 to avoid setuptools/pip race condition
# For more info, see: https://github.com/OpenKMIP/PyKMIP/issues/435
- pip uninstall -y six
- pip install six>=1.11.0
- pip install tox
- pip install bandit
- pip install codecov
- pip install slugs
- python setup.py install
script:
- ./.travis/run.sh

View File

@ -0,0 +1 @@
Dummy file to ensure ./certs gets copied with the ./pykmip directory.

View File

@ -0,0 +1,51 @@
[john_doe]
host=127.0.0.1
port=5696
certfile=/tmp/pykmip/certs/client_certificate_john_doe.pem
keyfile=/tmp/pykmip/certs/client_key_john_doe.pem
ca_certs=/tmp/pykmip/certs/root_certificate.pem
cert_reqs=CERT_REQUIRED
ssl_version=PROTOCOL_SSLv23
do_handshake_on_connect=True
suppress_ragged_eofs=True
username=John Doe
password=secret1
[jane_doe]
host=127.0.0.1
port=5696
certfile=/tmp/pykmip/certs/client_certificate_jane_doe.pem
keyfile=/tmp/pykmip/certs/client_key_jane_doe.pem
ca_certs=/tmp/pykmip/certs/root_certificate.pem
cert_reqs=CERT_REQUIRED
ssl_version=PROTOCOL_SSLv23
do_handshake_on_connect=True
suppress_ragged_eofs=True
username=Jane Doe
password=secret2
[john_smith]
host=127.0.0.1
port=5696
certfile=/tmp/pykmip/certs/client_certificate_john_smith.pem
keyfile=/tmp/pykmip/certs/client_key_john_smith.pem
ca_certs=/tmp/pykmip/certs/root_certificate.pem
cert_reqs=CERT_REQUIRED
ssl_version=PROTOCOL_SSLv23
do_handshake_on_connect=True
suppress_ragged_eofs=True
username=John Smith
password=secret3
[jane_smith]
host=127.0.0.1
port=5696
certfile=/tmp/pykmip/certs/client_certificate_jane_smith.pem
keyfile=/tmp/pykmip/certs/client_key_jane_smith.pem
ca_certs=/tmp/pykmip/certs/root_certificate.pem
cert_reqs=CERT_REQUIRED
ssl_version=PROTOCOL_SSLv23
do_handshake_on_connect=True
suppress_ragged_eofs=True
username=Jane Smith
password=secret4

View File

@ -0,0 +1,24 @@
{
"policy_1": {
"groups": {
"Group A": {
"SYMMETRIC_KEY": {
"GET": "ALLOW_ALL",
"DESTROY": "ALLOW_ALL"
}
},
"Group B": {
"SYMMETRIC_KEY": {
"GET": "ALLOW_ALL",
"DESTROY": "DISALLOW_ALL"
}
}
},
"default": {
"SYMMETRIC_KEY": {
"GET": "DISALLOW_ALL",
"DESTROY": "DISALLOW_ALL"
}
}
}
}

View File

@ -0,0 +1,19 @@
[server]
hostname=127.0.0.1
port=5696
certificate_path=/tmp/pykmip/certs/server_certificate.pem
key_path=/tmp/pykmip/certs/server_key.pem
ca_path=/tmp/pykmip/certs/root_certificate.pem
auth_suite=Basic
policy_path=/tmp/pykmip/policies
enable_tls_client_auth=True
tls_cipher_suites=
TLS_ECDH_ECDSA_WITH_AES_128_CBC_SHA256
AES128-SHA256
TLS_RSA_WITH_AES_256_CBC_SHA256
AES256-SHA256
logging_level=DEBUG
[auth:slugs]
enabled=True
url=http://127.0.0.1:8080/slugs/

View File

@ -0,0 +1,12 @@
[global]
environment = 'production'
server.socket_host = '127.0.0.1'
server.socket_port = 8080
log.access_file = '/tmp/slugs/access.log'
log.error_file = '/tmp/slugs/error.log'
[data]
user_group_mapping = '/tmp/slugs/user_group_mapping.csv'
[/slugs]
tools.trailing_slash.on = True

View File

@ -0,0 +1,4 @@
John Doe,Group A
Jane Doe,Group A
Jane Doe,Group B
John Smith,Group B
1 John Doe Group A
2 Jane Doe Group A
3 Jane Doe Group B
4 John Smith Group B

View File

@ -16,6 +16,21 @@ if [[ "${RUN_INTEGRATION_TESTS}" == "1" ]]; then
sudo chmod 777 /var/log/pykmip
python ./bin/run_server.py &
tox -e integration -- --config client
elif [[ "${RUN_INTEGRATION_TESTS}" == "2" ]]; then
# Set up the SLUGS instance
cp -r ./.travis/functional/slugs /tmp/
slugs -c /tmp/slugs/slugs.conf &
# Set up the PyKMIP server
cp -r ./.travis/functional/pykmip /tmp/
python ./bin/create_certificates.py
mv *.pem /tmp/pykmip/certs/
sudo mkdir /var/log/pykmip
sudo chmod 777 /var/log/pykmip
pykmip-server -f /tmp/pykmip/server.conf -l /tmp/pykmip/server.log &
# Run the functional tests
tox -e functional -- --config-file /tmp/pykmip/client.conf
else
tox
fi

View File

View File

@ -0,0 +1,29 @@
# Copyright (c) 2018 The Johns Hopkins University/Applied Physics Laboratory
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import pytest
def pytest_addoption(parser):
parser.addoption(
"--config-file",
action="store",
help="Config file path for client configuration settings"
)
@pytest.fixture(scope="class")
def config_file(request):
request.cls.config_file = request.config.getoption("--config-file")

View File

@ -0,0 +1,263 @@
# Copyright (c) 2018 The Johns Hopkins University/Applied Physics Laboratory
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
import pytest
import six
import testtools
import time
from kmip.core import enums
from kmip.pie import client
from kmip.pie import exceptions
from kmip.pie import objects
@pytest.mark.usefixtures("config_file")
class TestSLUGSAuthenticationAndAccessControl(testtools.TestCase):
def setUp(self):
super(TestSLUGSAuthenticationAndAccessControl, self).setUp()
self.client_john_doe = client.ProxyKmipClient(
config='john_doe',
config_file=self.config_file
)
self.client_jane_doe = client.ProxyKmipClient(
config='jane_doe',
config_file=self.config_file
)
self.client_john_smith = client.ProxyKmipClient(
config='john_smith',
config_file=self.config_file
)
self.client_jane_smith = client.ProxyKmipClient(
config='jane_smith',
config_file=self.config_file
)
def tearDown(self):
super(TestSLUGSAuthenticationAndAccessControl, self).tearDown()
def test_group_level_access_control(self):
"""
Test that:
1. a user in Group A can create and retrieve a symmetric key
2. a user in Group B can also retrieve the same symmetric key
3. a user in both Groups can also retrieve the same symmetric key
4. a user in Group B cannot destroy the same symmetric key, and
5. a user in Group A can destroy the same symmetric key.
"""
with self.client_john_doe as c:
uid = c.create(
enums.CryptographicAlgorithm.AES,
256,
operation_policy_name="policy_1"
)
self.assertIsInstance(uid, six.string_types)
key = c.get(uid)
self.assertIsInstance(key, objects.SymmetricKey)
self.assertEqual(
key.cryptographic_algorithm,
enums.CryptographicAlgorithm.AES)
self.assertEqual(key.cryptographic_length, 256)
with self.client_jane_doe as c:
key = c.get(uid)
self.assertIsInstance(key, objects.SymmetricKey)
self.assertEqual(
key.cryptographic_algorithm,
enums.CryptographicAlgorithm.AES)
self.assertEqual(key.cryptographic_length, 256)
with self.client_john_smith as c:
key = c.get(uid)
self.assertIsInstance(key, objects.SymmetricKey)
self.assertEqual(
key.cryptographic_algorithm,
enums.CryptographicAlgorithm.AES)
self.assertEqual(key.cryptographic_length, 256)
self.assertRaises(exceptions.KmipOperationFailure, c.destroy, uid)
with self.client_john_doe as c:
c.destroy(uid)
self.assertRaises(
exceptions.KmipOperationFailure, c.get, uid)
self.assertRaises(
exceptions.KmipOperationFailure, c.destroy, uid)
def test_policy_live_loading(self):
"""
Test that:
1. a user in Group A can create and retrieve a symmetric key
2. a user in Group B can also retrieve the same symmetric key
3. a user in Group B cannot destroy the same symmetric key
4. a policy is uploaded if created after server start up
5. a user in Group A cannot retrieve the same symmetric key, and
6. a user in Group B can destroy the same symmetric key.
"""
with self.client_john_doe as c:
uid = c.create(
enums.CryptographicAlgorithm.AES,
256,
operation_policy_name="policy_1"
)
self.assertIsInstance(uid, six.string_types)
key = c.get(uid)
self.assertIsInstance(key, objects.SymmetricKey)
self.assertEqual(
key.cryptographic_algorithm,
enums.CryptographicAlgorithm.AES)
self.assertEqual(key.cryptographic_length, 256)
with self.client_john_smith as c:
key = c.get(uid)
self.assertIsInstance(key, objects.SymmetricKey)
self.assertEqual(
key.cryptographic_algorithm,
enums.CryptographicAlgorithm.AES)
self.assertEqual(key.cryptographic_length, 256)
self.assertRaises(exceptions.KmipOperationFailure, c.destroy, uid)
with open("/tmp/pykmip/policies/policy_overwrite.json", "w") as f:
f.write('{\n')
f.write(' "policy_1": {\n')
f.write(' "groups": {\n')
f.write(' "Group A": {\n')
f.write(' "SYMMETRIC_KEY": {\n')
f.write(' "GET": "DISALLOW_ALL",\n')
f.write(' "DESTROY": "DISALLOW_ALL"\n')
f.write(' }\n')
f.write(' },\n')
f.write(' "Group B": {\n')
f.write(' "SYMMETRIC_KEY": {\n')
f.write(' "GET": "ALLOW_ALL",\n')
f.write(' "DESTROY": "ALLOW_ALL"\n')
f.write(' }\n')
f.write(' }\n')
f.write(' }\n')
f.write(' }\n')
f.write('}\n')
time.sleep(1)
with self.client_john_doe as c:
self.assertRaises(exceptions.KmipOperationFailure, c.get, uid)
self.assertRaises(exceptions.KmipOperationFailure, c.destroy, uid)
with self.client_john_smith as c:
key = c.get(uid)
self.assertIsInstance(key, objects.SymmetricKey)
self.assertEqual(
key.cryptographic_algorithm,
enums.CryptographicAlgorithm.AES)
self.assertEqual(key.cryptographic_length, 256)
c.destroy(uid)
self.assertRaises(
exceptions.KmipOperationFailure, c.get, uid)
self.assertRaises(
exceptions.KmipOperationFailure, c.destroy, uid)
os.remove("/tmp/pykmip/policies/policy_overwrite.json")
time.sleep(1)
def test_policy_caching(self):
"""
Test that:
1. a user in Group A can create and retrieve a symmetric key
2. a policy is uploaded if created after server start up
3. a user in Group A cannot retrieve or destroy the same symmetric key
4. the original policy is restored after the new policy is removed, and
5. a user in Group A can retrieve and destroy the same symmetric key.
"""
with self.client_john_doe as c:
uid = c.create(
enums.CryptographicAlgorithm.AES,
256,
operation_policy_name="policy_1"
)
self.assertIsInstance(uid, six.string_types)
key = c.get(uid)
self.assertIsInstance(key, objects.SymmetricKey)
self.assertEqual(
key.cryptographic_algorithm,
enums.CryptographicAlgorithm.AES)
self.assertEqual(key.cryptographic_length, 256)
with open("/tmp/pykmip/policies/policy_caching.json", "w") as f:
f.write('{\n')
f.write(' "policy_1": {\n')
f.write(' "groups": {\n')
f.write(' "Group A": {\n')
f.write(' "SYMMETRIC_KEY": {\n')
f.write(' "GET": "DISALLOW_ALL",\n')
f.write(' "DESTROY": "DISALLOW_ALL"\n')
f.write(' }\n')
f.write(' }\n')
f.write(' }\n')
f.write(' }\n')
f.write('}\n')
time.sleep(1)
self.assertRaises(exceptions.KmipOperationFailure, c.get, uid)
self.assertRaises(exceptions.KmipOperationFailure, c.destroy, uid)
os.remove("/tmp/pykmip/policies/policy_caching.json")
time.sleep(1)
key = c.get(uid)
self.assertIsInstance(key, objects.SymmetricKey)
self.assertEqual(
key.cryptographic_algorithm,
enums.CryptographicAlgorithm.AES)
self.assertEqual(key.cryptographic_length, 256)
c.destroy(uid)
self.assertRaises(
exceptions.KmipOperationFailure, c.get, uid)
self.assertRaises(
exceptions.KmipOperationFailure, c.destroy, uid)
def test_authenticating_unrecognized_user(self):
"""
Test that an unrecognized user is blocked from submitting a request.
"""
with open("/tmp/slugs/user_group_mapping.csv", "w") as f:
f.write('Jane Doe,Group A\n')
f.write('Jane Doe,Group B\n')
f.write('John Smith,Group B\n')
time.sleep(1)
args = (enums.CryptographicAlgorithm.AES, 256)
kwargs = {'operation_policy_name': 'policy_1'}
with self.client_john_doe as c:
self.assertRaises(
exceptions.KmipOperationFailure,
c.create,
*args,
**kwargs
)
with open("/tmp/slugs/user_group_mapping.csv", "w") as f:
f.write('John Doe,Group A\n')
f.write('Jane Doe,Group A\n')
f.write('Jane Doe,Group B\n')
f.write('John Smith,Group B\n')
time.sleep(1)

View File

@ -1,5 +1,5 @@
cryptography>=1.3
enum34
requests
six>=1.9.0
six>=1.11.0
sqlalchemy>=1.0

View File

@ -5,6 +5,7 @@ testtools
fixtures
testresources
mock
slugs
testscenarios
testrepository
sphinx

View File

@ -22,6 +22,13 @@ basepython=python2.7
commands =
py.test --strict kmip/tests/integration -m "not ignore" {posargs}
[testenv:functional]
# Note: This requires local access to instances of the PyKMIP server and SLUGS.
deps = {[testenv]deps}
basepython=python2.7
commands =
py.test --strict kmip/tests/functional -m "not ignore" {posargs}
[testenv:bandit]
deps = {[testenv]deps}
commands = bandit -r kmip -n5 -x kmip/tests