Skip to content

Commit 63a2623

Browse files
committed
Add tests for _ZlibDecompressor
1 parent 9d60339 commit 63a2623

File tree

1 file changed

+168
-0
lines changed

1 file changed

+168
-0
lines changed

Lib/test/test_zlib.py

Lines changed: 168 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -871,6 +871,7 @@ def test_wbits(self):
871871
)
872872
self.assertEqual(expected, actual)
873873

874+
874875
def choose_lines(source, number, seed=None, generator=random):
875876
"""Return a list of number lines randomly chosen from the source"""
876877
if seed is not None:
@@ -944,6 +945,173 @@ def choose_lines(source, number, seed=None, generator=random):
944945
"""
945946

946947

948+
class ZlibDecompressorTest():
949+
# Test adopted from test_bz2.py
950+
TEXT = HAMLET_SCENE
951+
DATA = zlib.compress(HAMLET_SCENE)
952+
BAD_DATA = b"Not a valid deflate block"
953+
def test_Constructor(self):
954+
self.assertRaises(TypeError, zlib._ZlibDecompressor, 42)
955+
956+
def testDecompress(self):
957+
zlibd = zlib._ZlibDecompressor()
958+
self.assertRaises(TypeError, zlibd.decompress)
959+
text = zlibd.decompress(self.DATA)
960+
self.assertEqual(text, self.TEXT)
961+
962+
def testDecompressChunks10(self):
963+
zlibd = zlib._ZlibDecompressor()
964+
text = b''
965+
n = 0
966+
while True:
967+
str = self.DATA[n*10:(n+1)*10]
968+
if not str:
969+
break
970+
text += zlibd.decompress(str)
971+
n += 1
972+
self.assertEqual(text, self.TEXT)
973+
974+
def testDecompressUnusedData(self):
975+
zlibd = zlib._ZlibDecompressor()
976+
unused_data = b"this is unused data"
977+
text = zlibd.decompress(self.DATA+unused_data)
978+
self.assertEqual(text, self.TEXT)
979+
self.assertEqual(zlibd.unused_data, unused_data)
980+
981+
def testEOFError(self):
982+
zlibd = zlib._ZlibDecompressor()
983+
text = zlibd.decompress(self.DATA)
984+
self.assertRaises(EOFError, zlibd.decompress, b"anything")
985+
self.assertRaises(EOFError, zlibd.decompress, b"")
986+
987+
@support.skip_if_pgo_task
988+
@bigmemtest(size=_4G + 100, memuse=3.3)
989+
def testDecompress4G(self, size):
990+
# "Test zlib._ZlibDecompressor.decompress() with >4GiB input"
991+
blocksize = 10 * 1024 * 1024
992+
block = random.randbytes(blocksize)
993+
try:
994+
data = block * (size // blocksize + 1)
995+
compressed = zlib.compress(data)
996+
zlibd = zlib._ZlibDecompressor()
997+
decompressed = zlibd.decompress(compressed)
998+
self.assertTrue(decompressed == data)
999+
finally:
1000+
data = None
1001+
compressed = None
1002+
decompressed = None
1003+
1004+
def testPickle(self):
1005+
for proto in range(pickle.HIGHEST_PROTOCOL + 1):
1006+
with self.assertRaises(TypeError):
1007+
pickle.dumps(zlib._ZlibDecompressor(), proto)
1008+
1009+
def testDecompressorChunksMaxsize(self):
1010+
zlibd = zlib._ZlibDecompressor()
1011+
max_length = 100
1012+
out = []
1013+
1014+
# Feed some input
1015+
len_ = len(self.BIG_DATA) - 64
1016+
out.append(zlibd.decompress(self.BIG_DATA[:len_],
1017+
max_length=max_length))
1018+
self.assertFalse(zlibd.needs_input)
1019+
self.assertEqual(len(out[-1]), max_length)
1020+
1021+
# Retrieve more data without providing more input
1022+
out.append(zlibd.decompress(b'', max_length=max_length))
1023+
self.assertFalse(zlibd.needs_input)
1024+
self.assertEqual(len(out[-1]), max_length)
1025+
1026+
# Retrieve more data while providing more input
1027+
out.append(zlibd.decompress(self.BIG_DATA[len_:],
1028+
max_length=max_length))
1029+
self.assertLessEqual(len(out[-1]), max_length)
1030+
1031+
# Retrieve remaining uncompressed data
1032+
while not zlibd.eof:
1033+
out.append(zlibd.decompress(b'', max_length=max_length))
1034+
self.assertLessEqual(len(out[-1]), max_length)
1035+
1036+
out = b"".join(out)
1037+
self.assertEqual(out, self.BIG_TEXT)
1038+
self.assertEqual(zlibd.unused_data, b"")
1039+
1040+
def test_decompressor_inputbuf_1(self):
1041+
# Test reusing input buffer after moving existing
1042+
# contents to beginning
1043+
zlibd = zlib._ZlibDecompressor()
1044+
out = []
1045+
1046+
# Create input buffer and fill it
1047+
self.assertEqual(zlibd.decompress(self.DATA[:100],
1048+
max_length=0), b'')
1049+
1050+
# Retrieve some results, freeing capacity at beginning
1051+
# of input buffer
1052+
out.append(zlibd.decompress(b'', 2))
1053+
1054+
# Add more data that fits into input buffer after
1055+
# moving existing data to beginning
1056+
out.append(zlibd.decompress(self.DATA[100:105], 15))
1057+
1058+
# Decompress rest of data
1059+
out.append(zlibd.decompress(self.DATA[105:]))
1060+
self.assertEqual(b''.join(out), self.TEXT)
1061+
1062+
def test_decompressor_inputbuf_2(self):
1063+
# Test reusing input buffer by appending data at the
1064+
# end right away
1065+
zlibd = zlib._ZlibDecompressor()
1066+
out = []
1067+
1068+
# Create input buffer and empty it
1069+
self.assertEqual(zlibd.decompress(self.DATA[:200],
1070+
max_length=0), b'')
1071+
out.append(zlibd.decompress(b''))
1072+
1073+
# Fill buffer with new data
1074+
out.append(zlibd.decompress(self.DATA[200:280], 2))
1075+
1076+
# Append some more data, not enough to require resize
1077+
out.append(zlibd.decompress(self.DATA[280:300], 2))
1078+
1079+
# Decompress rest of data
1080+
out.append(zlibd.decompress(self.DATA[300:]))
1081+
self.assertEqual(b''.join(out), self.TEXT)
1082+
1083+
def test_decompressor_inputbuf_3(self):
1084+
# Test reusing input buffer after extending it
1085+
1086+
zlibd = zlib._ZlibDecompressor()
1087+
out = []
1088+
1089+
# Create almost full input buffer
1090+
out.append(zlibd.decompress(self.DATA[:200], 5))
1091+
1092+
# Add even more data to it, requiring resize
1093+
out.append(zlibd.decompress(self.DATA[200:300], 5))
1094+
1095+
# Decompress rest of data
1096+
out.append(zlibd.decompress(self.DATA[300:]))
1097+
self.assertEqual(b''.join(out), self.TEXT)
1098+
1099+
def test_failure(self):
1100+
zlibd = zlib._ZlibDecompressor()
1101+
self.assertRaises(Exception, zlibd.decompress, self.BAD_DATA * 30)
1102+
# Previously, a second call could crash due to internal inconsistency
1103+
self.assertRaises(Exception, zlibd.decompress, self.BAD_DATA * 30)
1104+
1105+
@support.refcount_test
1106+
def test_refleaks_in___init__(self):
1107+
gettotalrefcount = support.get_attribute(sys, 'gettotalrefcount')
1108+
zlibd = zlib._ZlibDecompressor()
1109+
refs_before = gettotalrefcount()
1110+
for i in range(100):
1111+
zlibd.__init__()
1112+
self.assertAlmostEqual(gettotalrefcount() - refs_before, 0, delta=10)
1113+
1114+
9471115
class CustomInt:
9481116
def __index__(self):
9491117
return 100

0 commit comments

Comments
 (0)