Add initial API and tests for stream-based resource reading

For now the stream-based API is a simple BytesIO wrapper around
data/data_raw, but it will be optimized in the future.
This commit is contained in:
dgelessus
2020-07-21 14:12:09 +02:00
parent 0f6018e4bf
commit 61247ec783
3 changed files with 78 additions and 1 deletions
+39
View File
@@ -129,7 +129,36 @@ class ResourceFileReadTests(unittest.TestCase):
self.assertEqual(actual_res.name, None)
self.assertEqual(actual_res.attributes, rsrcfork.ResourceAttrs(0))
self.assertEqual(actual_res.data, expected_data)
with actual_res.open() as f:
self.assertEqual(f.read(10), expected_data[:10])
self.assertEqual(f.read(5), expected_data[10:15])
self.assertEqual(f.read(), expected_data[15:])
f.seek(0)
self.assertEqual(f.read(), expected_data)
self.assertEqual(actual_res.compressed_info, None)
actual_res_1 = rf[b"TEXT"][256]
expected_data_1 = TEXTCLIPPING_RESOURCES[b"TEXT"][256]
actual_res_2 = rf[b"utxt"][256]
expected_data_2 = TEXTCLIPPING_RESOURCES[b"utxt"][256]
with self.subTest(stream_test="multiple streams for the same resource"):
with actual_res_1.open() as f1, actual_res_1.open() as f2:
f1.seek(5)
f2.seek(10)
self.assertEqual(f1.read(10), expected_data_1[5:15])
self.assertEqual(f2.read(10), expected_data_1[10:20])
self.assertEqual(f1.read(), expected_data_1[15:])
self.assertEqual(f2.read(), expected_data_1[20:])
with self.subTest(stream_test="multiple streams for different resources"):
with actual_res_1.open() as f1, actual_res_2.open() as f2:
f1.seek(5)
f2.seek(10)
self.assertEqual(f1.read(10), expected_data_1[5:15])
self.assertEqual(f2.read(10), expected_data_2[10:20])
self.assertEqual(f1.read(), expected_data_1[15:])
self.assertEqual(f2.read(), expected_data_2[20:])
def test_textclipping_seekable_stream(self) -> None:
with TEXTCLIPPING_RSRC_FILE.open("rb") as f:
@@ -239,6 +268,8 @@ class ResourceFileReadTests(unittest.TestCase):
self.assertEqual(actual_res.name, expected_name)
self.assertEqual(actual_res.attributes, expected_attrs)
self.assertEqual(actual_res.data, expected_data)
with actual_res.open() as f:
self.assertEqual(f.read(), expected_data)
self.assertEqual(actual_res.compressed_info, None)
def test_compress_compare(self) -> None:
@@ -274,6 +305,14 @@ class ResourceFileReadTests(unittest.TestCase):
# The compressed resource's (automatically decompressed) data must match the uncompressed data.
self.assertEqual(compressed_res.data, uncompressed_res.data)
self.assertEqual(compressed_res.length, uncompressed_res.length)
with compressed_res.open() as compressed_f, uncompressed_res.open() as uncompressed_f:
compressed_f.seek(15)
uncompressed_f.seek(15)
self.assertEqual(compressed_f.read(10), uncompressed_f.read(10))
self.assertEqual(compressed_f.read(), uncompressed_f.read())
compressed_f.seek(0)
uncompressed_f.seek(0)
self.assertEqual(compressed_f.read(), uncompressed_f.read())
if rsrcfork.ResourceAttrs.resCompressed in compressed_res.attributes:
# Resources with the compressed attribute must expose correct compression metadata.