2015-11-04 21:54:59 +01:00
|
|
|
|
# encoding: utf-8
|
|
|
|
|
from __future__ import unicode_literals
|
|
|
|
|
|
2015-10-05 18:56:10 +02:00
|
|
|
|
from compose import utils
|
|
|
|
|
|
|
|
|
|
|
2015-11-16 18:35:26 +01:00
|
|
|
|
class TestJsonSplitter(object):
|
2015-10-05 18:56:10 +02:00
|
|
|
|
|
|
|
|
|
def test_json_splitter_no_object(self):
|
|
|
|
|
data = '{"foo": "bar'
|
2015-11-16 18:35:26 +01:00
|
|
|
|
assert utils.json_splitter(data) is None
|
2015-10-05 18:56:10 +02:00
|
|
|
|
|
|
|
|
|
def test_json_splitter_with_object(self):
|
|
|
|
|
data = '{"foo": "bar"}\n \n{"next": "obj"}'
|
2015-11-16 18:35:26 +01:00
|
|
|
|
assert utils.json_splitter(data) == ({'foo': 'bar'}, '{"next": "obj"}')
|
2015-11-04 21:54:59 +01:00
|
|
|
|
|
|
|
|
|
|
2015-11-16 18:35:26 +01:00
|
|
|
|
class TestStreamAsText(object):
|
2015-11-04 21:54:59 +01:00
|
|
|
|
|
|
|
|
|
def test_stream_with_non_utf_unicode_character(self):
|
|
|
|
|
stream = [b'\xed\xf3\xf3']
|
|
|
|
|
output, = utils.stream_as_text(stream)
|
|
|
|
|
assert output == '<EFBFBD><EFBFBD><EFBFBD>'
|
|
|
|
|
|
|
|
|
|
def test_stream_with_utf_character(self):
|
|
|
|
|
stream = ['ěĝ'.encode('utf-8')]
|
|
|
|
|
output, = utils.stream_as_text(stream)
|
|
|
|
|
assert output == 'ěĝ'
|
2015-11-16 18:35:26 +01:00
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class TestJsonStream(object):
|
|
|
|
|
|
|
|
|
|
def test_with_falsy_entries(self):
|
|
|
|
|
stream = [
|
|
|
|
|
'{"one": "two"}\n{}\n',
|
|
|
|
|
"[1, 2, 3]\n[]\n",
|
|
|
|
|
]
|
|
|
|
|
output = list(utils.json_stream(stream))
|
|
|
|
|
assert output == [
|
|
|
|
|
{'one': 'two'},
|
|
|
|
|
{},
|
|
|
|
|
[1, 2, 3],
|
|
|
|
|
[],
|
|
|
|
|
]
|