Commit | Line | Data |
---|---|---|
920dae64 AT |
1 | import unittest |
2 | from test import test_support | |
3 | import zlib | |
4 | import random | |
5 | ||
6 | # print test_support.TESTFN | |
7 | ||
def getbuf():
    """Return the text of the test_zlib source, repeated eight times.

    This was in the original.  Avoid non-repeatable sources.
    Left here (unused) in case something wants to be done with it.

    The source is located with imp.find_module('test_zlib'); when that
    fails with ImportError (including the imp module itself being
    unavailable) the file named by __file__ is read instead.
    """
    try:
        import imp
        # find_module() returns (open file, pathname, description)
        source = imp.find_module('test_zlib')[0]
    except ImportError:
        source = open(__file__)
    # try/finally so the handle is released even if read() raises
    try:
        return source.read() * 8
    finally:
        source.close()
20 | ||
21 | ||
22 | ||
23 | class ChecksumTestCase(unittest.TestCase): | |
24 | # checksum test cases | |
25 | def test_crc32start(self): | |
26 | self.assertEqual(zlib.crc32(""), zlib.crc32("", 0)) | |
27 | ||
28 | def test_crc32empty(self): | |
29 | self.assertEqual(zlib.crc32("", 0), 0) | |
30 | self.assertEqual(zlib.crc32("", 1), 1) | |
31 | self.assertEqual(zlib.crc32("", 432), 432) | |
32 | ||
33 | def test_adler32start(self): | |
34 | self.assertEqual(zlib.adler32(""), zlib.adler32("", 1)) | |
35 | ||
36 | def test_adler32empty(self): | |
37 | self.assertEqual(zlib.adler32("", 0), 0) | |
38 | self.assertEqual(zlib.adler32("", 1), 1) | |
39 | self.assertEqual(zlib.adler32("", 432), 432) | |
40 | ||
41 | def assertEqual32(self, seen, expected): | |
42 | # 32-bit values masked -- checksums on 32- vs 64- bit machines | |
43 | # This is important if bit 31 (0x08000000L) is set. | |
44 | self.assertEqual(seen & 0x0FFFFFFFFL, expected & 0x0FFFFFFFFL) | |
45 | ||
46 | def test_penguins(self): | |
47 | self.assertEqual32(zlib.crc32("penguin", 0), 0x0e5c1a120L) | |
48 | self.assertEqual32(zlib.crc32("penguin", 1), 0x43b6aa94) | |
49 | self.assertEqual32(zlib.adler32("penguin", 0), 0x0bcf02f6) | |
50 | self.assertEqual32(zlib.adler32("penguin", 1), 0x0bd602f7) | |
51 | ||
52 | self.assertEqual(zlib.crc32("penguin"), zlib.crc32("penguin", 0)) | |
53 | self.assertEqual(zlib.adler32("penguin"),zlib.adler32("penguin",1)) | |
54 | ||
55 | ||
56 | ||
57 | class ExceptionTestCase(unittest.TestCase): | |
58 | # make sure we generate some expected errors | |
59 | def test_bigbits(self): | |
60 | # specifying total bits too large causes an error | |
61 | self.assertRaises(zlib.error, | |
62 | zlib.compress, 'ERROR', zlib.MAX_WBITS + 1) | |
63 | ||
64 | def test_badcompressobj(self): | |
65 | # verify failure on building compress object with bad params | |
66 | self.assertRaises(ValueError, zlib.compressobj, 1, zlib.DEFLATED, 0) | |
67 | ||
68 | def test_baddecompressobj(self): | |
69 | # verify failure on building decompress object with bad params | |
70 | self.assertRaises(ValueError, zlib.decompressobj, 0) | |
71 | ||
72 | ||
73 | ||
class CompressTestCase(unittest.TestCase):
    # Whole-message round trips through zlib.compress()/zlib.decompress()

    def test_speech(self):
        # a single copy of the sample text survives the round trip
        packed = zlib.compress(HAMLET_SCENE)
        unpacked = zlib.decompress(packed)
        self.assertEqual(unpacked, HAMLET_SCENE)

    def test_speech128(self):
        # same round trip on a larger input: 128 copies of the sample text
        big = HAMLET_SCENE * 128
        self.assertEqual(zlib.decompress(zlib.compress(big)), big)
85 | ||
86 | ||
87 | ||
88 | ||
class CompressObjectTestCase(unittest.TestCase):
    # Test compression and decompression objects

    def _compress_in_steps(self, data, step):
        # Helper (not a test): feed data to a fresh compressobj `step`
        # bytes at a time and return the whole compressed stream,
        # including the final flush, as a single string.
        co = zlib.compressobj()
        pieces = [co.compress(data[i:i+step])
                  for i in range(0, len(data), step)]
        pieces.append(co.flush())
        return ''.join(pieces)

    def test_pair(self):
        # straightforward compress/decompress objects
        data = HAMLET_SCENE * 128
        co = zlib.compressobj()
        x1 = co.compress(data)
        x2 = co.flush()
        self.assertRaises(zlib.error, co.flush) # second flush should not work
        dco = zlib.decompressobj()
        y1 = dco.decompress(x1 + x2)
        y2 = dco.flush()
        self.assertEqual(data, y1 + y2)

    def test_compressoptions(self):
        # specify lots of options to compressobj()
        level = 2
        method = zlib.DEFLATED
        wbits = -12
        memlevel = 9
        strategy = zlib.Z_FILTERED
        co = zlib.compressobj(level, method, wbits, memlevel, strategy)
        x1 = co.compress(HAMLET_SCENE)
        x2 = co.flush()
        # the decompressor is given the same wbits as the compressor
        dco = zlib.decompressobj(wbits)
        y1 = dco.decompress(x1 + x2)
        y2 = dco.flush()
        self.assertEqual(HAMLET_SCENE, y1 + y2)

    def test_compressincremental(self):
        # compress object in steps, decompress object as one-shot
        data = HAMLET_SCENE * 128
        combuf = self._compress_in_steps(data, 256)

        dco = zlib.decompressobj()
        y1 = dco.decompress(combuf)
        y2 = dco.flush()
        self.assertEqual(data, y1 + y2)

    def test_decompinc(self, flush=False, source=None, cx=256, dcx=64):
        # compress object in steps, decompress object in steps
        #   flush  -- finish with dco.flush() instead of draining the
        #             object with empty decompress() calls
        #   source -- text to repeat 128 times (default HAMLET_SCENE)
        #   cx/dcx -- chunk sizes for compressing / decompressing
        source = source or HAMLET_SCENE
        data = source * 128
        combuf = self._compress_in_steps(data, cx)

        # sanity check: the stream is valid for one-shot decompression
        self.assertEqual(data, zlib.decompress(combuf))

        dco = zlib.decompressobj()
        bufs = []
        for i in range(0, len(combuf), dcx):
            bufs.append(dco.decompress(combuf[i:i+dcx]))
            # no max_length was given, so nothing may be left unconsumed
            self.assertEqual('', dco.unconsumed_tail, ########
                             "(A) uct should be '': not %d long" %
                                       len(dco.unconsumed_tail))
        if flush:
            bufs.append(dco.flush())
        else:
            # drain by decompressing empty input until nothing comes back
            while True:
                chunk = dco.decompress('')
                if not chunk:
                    break
                bufs.append(chunk)
        self.assertEqual('', dco.unconsumed_tail, ########
                         "(B) uct should be '': not %d long" %
                                       len(dco.unconsumed_tail))
        self.assertEqual(data, ''.join(bufs))

    def test_decompincflush(self):
        # same as test_decompinc, finishing with flush()
        self.test_decompinc(flush=True)

    def test_decompimax(self, source=None, cx=256, dcx=64):
        # compress in steps, decompress in length-restricted steps:
        # every decompress() call is capped at dcx bytes of output
        source = source or HAMLET_SCENE
        data = source * 128
        combuf = self._compress_in_steps(data, cx)
        self.assertEqual(data, zlib.decompress(combuf),
                         'compressed data failure')

        dco = zlib.decompressobj()
        bufs = []
        cb = combuf
        while cb:
            chunk = dco.decompress(cb, dcx)
            self.failIf(len(chunk) > dcx,
                    'chunk too big (%d>%d)' % (len(chunk), dcx))
            bufs.append(chunk)
            # input not consumed because of the cap is fed back in
            cb = dco.unconsumed_tail
        bufs.append(dco.flush())
        self.assertEqual(data, ''.join(bufs), 'Wrong data retrieved')

    def test_decompressmaxlen(self, flush=False):
        # Check a decompression object with max_length specified; the cap
        # shrinks along with the amount of input still to be consumed.
        data = HAMLET_SCENE * 128
        combuf = self._compress_in_steps(data, 256)
        self.assertEqual(data, zlib.decompress(combuf),
                         'compressed data failure')

        dco = zlib.decompressobj()
        bufs = []
        cb = combuf
        while cb:
            max_length = 1 + len(cb)//10
            chunk = dco.decompress(cb, max_length)
            self.failIf(len(chunk) > max_length,
                        'chunk too big (%d>%d)' % (len(chunk),max_length))
            bufs.append(chunk)
            cb = dco.unconsumed_tail
        if flush:
            bufs.append(dco.flush())
        else:
            # keep asking for output (with the last cap) until none is left
            while chunk:
                chunk = dco.decompress('', max_length)
                self.failIf(len(chunk) > max_length,
                            'chunk too big (%d>%d)' % (len(chunk),max_length))
                bufs.append(chunk)
        self.assertEqual(data, ''.join(bufs), 'Wrong data retrieved')

    def test_decompressmaxlenflush(self):
        # same as test_decompressmaxlen, finishing with flush()
        self.test_decompressmaxlen(flush=True)

    def test_maxlenmisc(self):
        # Misc tests of max_length: a negative value is rejected and
        # leaves unconsumed_tail empty
        dco = zlib.decompressobj()
        self.assertRaises(ValueError, dco.decompress, "", -1)
        self.assertEqual('', dco.unconsumed_tail)

    def test_flushes(self):
        # Test flush() with the various options, using all the
        # different levels in order to provide more variations.
        sync_opt = ['Z_NO_FLUSH', 'Z_SYNC_FLUSH', 'Z_FULL_FLUSH']
        # only exercise the flush modes this zlib build exposes
        sync_opt = [getattr(zlib, opt) for opt in sync_opt
                    if hasattr(zlib, opt)]
        data = HAMLET_SCENE * 8

        for sync in sync_opt:
            for level in range(10):
                obj = zlib.compressobj( level )
                a = obj.compress( data[:3000] )
                b = obj.flush( sync )
                c = obj.compress( data[3000:] )
                d = obj.flush()
                self.assertEqual(zlib.decompress(''.join([a,b,c,d])),
                                 data, ("Decompress failed: flush "
                                        "mode=%i, level=%i") % (sync, level))
                del obj

    def test_odd_flush(self):
        # Test for odd flushing bugs noted in 2.0, and hopefully fixed in 2.1
        import random

        if not hasattr(zlib, 'Z_SYNC_FLUSH'):
            # nothing to test without the sync-flush mode
            return

        # Create compressor and decompressor objects
        co = zlib.compressobj(zlib.Z_BEST_COMPRESSION)
        dco = zlib.decompressobj()

        # Try 17K of "random" data: generate the random data stream
        try:
            # In 2.3 and later, WichmannHill is the RNG of the bug report
            gen = random.WichmannHill()
        except AttributeError:
            try:
                # 2.2 called it Random
                gen = random.Random()
            except AttributeError:
                # others might simply have a single RNG
                gen = random
        gen.seed(1)
        data = genblock(1, 17 * 1024, generator=gen)

        # compress, sync-flush, and decompress
        first = co.compress(data)
        second = co.flush(zlib.Z_SYNC_FLUSH)
        expanded = dco.decompress(first + second)

        # if decompressed data is different from the input data, choke.
        self.assertEqual(expanded, data, "17K random source doesn't match")

    def test_empty_flush(self):
        # Test that calling .flush() on unused objects works.
        # (Bug #1083110 -- calling .flush() on decompress objects
        # caused a core dump.)

        co = zlib.compressobj(zlib.Z_BEST_COMPRESSION)
        self.failUnless(co.flush())  # Returns a zlib header
        dco = zlib.decompressobj()
        self.assertEqual(dco.flush(), "")   # Returns nothing
302 | ||
303 | ||
def genblock(seed, length, step=1024, generator=random):
    """length-byte stream of random data from a seed (in step-byte blocks).

    seed      -- passed to generator.seed() unless it is None
    length    -- number of characters to return; zero or a negative
                 value yields an empty string
    step      -- block size used while generating; replaced by length
                 when it is larger than length or smaller than 2
    generator -- object providing seed() and randint() (default: the
                 random module)

    Whole blocks are generated and the result is cut to length, so the
    generator may be advanced past the characters actually returned.
    """
    if seed is not None:
        generator.seed(seed)
    if length <= 0:
        # A zero length would collapse step to 0 below, and range()
        # refuses a zero step; there is nothing to generate anyway.
        return ''
    randint = generator.randint
    if length < step or step < 2:
        step = length
    blocks = []
    for i in range(0, length, step):
        blocks.append(''.join([chr(randint(0,255))
                               for x in range(step)]))
    return ''.join(blocks)[:length]
316 | ||
317 | ||
318 | ||
def choose_lines(source, number, seed=None, generator=random):
    """Return a list of number lines randomly chosen from the source

    source is split on newlines and generator.choice() is called once
    per requested line, so the same line may be picked more than once.
    The generator is re-seeded first when seed is not None.
    """
    if seed is not None:
        generator.seed(seed)
    candidates = source.split('\n')
    chosen = []
    for _ in range(number):
        chosen.append(generator.choice(candidates))
    return chosen
325 | ||
326 | ||
327 | ||
# Sample text used as input by the CompressTestCase and
# CompressObjectTestCase tests (often repeated to get a larger buffer).
HAMLET_SCENE = """
LAERTES

O, fear me not.
I stay too long: but here my father comes.

Enter POLONIUS

A double blessing is a double grace,
Occasion smiles upon a second leave.

LORD POLONIUS

Yet here, Laertes! aboard, aboard, for shame!
The wind sits in the shoulder of your sail,
And you are stay'd for. There; my blessing with thee!
And these few precepts in thy memory
See thou character. Give thy thoughts no tongue,
Nor any unproportioned thought his act.
Be thou familiar, but by no means vulgar.
Those friends thou hast, and their adoption tried,
Grapple them to thy soul with hoops of steel;
But do not dull thy palm with entertainment
Of each new-hatch'd, unfledged comrade. Beware
Of entrance to a quarrel, but being in,
Bear't that the opposed may beware of thee.
Give every man thy ear, but few thy voice;
Take each man's censure, but reserve thy judgment.
Costly thy habit as thy purse can buy,
But not express'd in fancy; rich, not gaudy;
For the apparel oft proclaims the man,
And they in France of the best rank and station
Are of a most select and generous chief in that.
Neither a borrower nor a lender be;
For loan oft loses both itself and friend,
And borrowing dulls the edge of husbandry.
This above all: to thine ownself be true,
And it must follow, as the night the day,
Thou canst not then be false to any man.
Farewell: my blessing season this in thee!

LAERTES

Most humbly do I take my leave, my lord.

LORD POLONIUS

The time invites you; go; your servants tend.

LAERTES

Farewell, Ophelia; and remember well
What I have said to you.

OPHELIA

'Tis in my memory lock'd,
And you yourself shall keep the key of it.

LAERTES

Farewell.
"""
391 | ||
392 | ||
def test_main():
    # Run every TestCase class defined in this file through
    # test_support.run_unittest(), in the order listed.
    all_cases = [
        ChecksumTestCase,
        ExceptionTestCase,
        CompressTestCase,
        CompressObjectTestCase,
    ]
    test_support.run_unittest(*all_cases)
400 | ||
# Run the full suite when this file is executed as a script.
if __name__ == "__main__":
    test_main()
403 | ||
def test(tests=''):
    # Run a chosen subset of the test classes.  Each letter found in
    # `tests` selects one class:
    #   'k' ChecksumTestCase, 'x' ExceptionTestCase,
    #   'c' CompressTestCase, 'o' CompressObjectTestCase
    # An empty selection means 'o'.  Classes always run in the order
    # k, x, c, o regardless of the order of the letters.
    selection = tests or 'o'
    by_letter = (
        ('k', ChecksumTestCase),
        ('x', ExceptionTestCase),
        ('c', CompressTestCase),
        ('o', CompressObjectTestCase),
    )
    testcases = [case for letter, case in by_letter if letter in selection]
    test_support.run_unittest(*testcases)
412 | ||
# Disabled scratch code: the `if False:` guard means none of this ever runs.
# If enabled it would put a hard-coded directory on sys.path, import
# test_zlib from there and run only CompressTestCase via run_suite().
if False:
    import sys
    sys.path.insert(1, '/Py23Src/python/dist/src/Lib/test')
    import test_zlib as tz
    ts, ut = tz.test_support, tz.unittest
    su = ut.TestSuite()
    su.addTest(ut.makeSuite(tz.CompressTestCase))
    ts.run_suite(su)