Commit | Line | Data |
---|---|---|
920dae64 AT |
1 | import sys |
2 | import os | |
3 | import shutil | |
4 | import tempfile | |
5 | ||
6 | import unittest | |
7 | import tarfile | |
8 | ||
9 | from test import test_support | |
10 | ||
# Probe for the optional compression modules.  A module that cannot be
# used is replaced by None so that later code can simply test the name.
try:
    import gzip
    # Importing is not enough: the GzipFile class has to be present too.
    getattr(gzip, "GzipFile")
except (ImportError, AttributeError):
    gzip = None

try:
    import bz2
except ImportError:
    bz2 = None
21 | ||
def path(path):
    """Return whatever test_support.findfile() reports for *path*."""
    located = test_support.findfile(path)
    return located
24 | ||
# Reference archive that the read tests open.
testtar = path("testtar.tar")
# Scratch directory ("testtar<extsep>dir" below the system temp directory).
tempdir = os.path.join(tempfile.gettempdir(),
                       os.extsep.join(("testtar", "dir")))
# Scratch file in which the write tests create their archives.
tempname = test_support.TESTFN
# Number of members the read tests expect to iterate over in testtar.
membercount = 10
29 | ||
def tarname(comp=""):
    """Return the path of the test archive for compression *comp*.

    With an empty *comp* this is the uncompressed reference archive
    itself.  Otherwise it is "<archive base name><extsep><comp>" inside
    the scratch directory, which dirname() creates on demand.
    """
    if not comp:
        return testtar
    # Join only the base name: os.path.join() drops every component that
    # precedes an absolute one, so joining the full testtar path would
    # silently place the compressed copy outside the scratch directory.
    compname = "%s%s%s" % (os.path.basename(testtar), os.extsep, comp)
    return os.path.join(dirname(), compname)
34 | ||
def dirname():
    """Return the scratch directory, creating it first if it is missing."""
    if os.path.exists(tempdir):
        return tempdir
    os.mkdir(tempdir)
    return tempdir
39 | ||
def tmpname():
    """Return the name of the scratch file used for archives being written."""
    scratch = tempname
    return scratch
42 | ||
43 | ||
class BaseTest(unittest.TestCase):
    """Fixture that opens the test archive before each test and closes it after.

    Subclasses adjust the tarfile.open() mode string through three class
    attributes, which are concatenated as mode + sep + comp.
    """
    comp = ''   # compression suffix; also selects the archive via tarname()
    mode = 'r'  # access mode
    sep = ':'   # separator placed between access mode and compression

    def setUp(self):
        openmode = "".join((self.mode, self.sep, self.comp))
        self.tar = tarfile.open(tarname(self.comp), openmode)

    def tearDown(self):
        self.tar.close()
55 | ||
class ReadTest(BaseTest):
    """Read the test archive and exercise the file objects it hands out.

    The tests that need random access return immediately when the archive
    is opened in stream mode (sep == "|").
    """

    def test(self):
        """Test member extraction.
        """
        members = 0
        for tarinfo in self.tar:
            members += 1
            if not tarinfo.isreg():
                continue
            f = self.tar.extractfile(tarinfo)
            try:
                self.assertEqual(len(f.read()), tarinfo.size,
                                 "size read does not match expected size")
            finally:
                f.close()

        self.assertEqual(members, membercount,
                         "could not find all members")

    def test_sparse(self):
        """Test sparse member extraction.
        """
        if self.sep == "|":
            return
        f1 = self.tar.extractfile("S-SPARSE")
        f2 = self.tar.extractfile("S-SPARSE-WITH-NULLS")
        try:
            self.assertEqual(f1.read(), f2.read(),
                             "_FileObject failed on sparse file member")
        finally:
            f1.close()
            f2.close()

    def test_readlines(self):
        """Test readlines() method of _FileObject.
        """
        if self.sep == "|":
            return
        filename = "0-REGTYPE-TEXT"
        self.tar.extract(filename, dirname())

        # Close the extracted file explicitly so no handle stays open on
        # the scratch directory.
        extracted = open(os.path.join(dirname(), filename), "rU")
        try:
            lines1 = extracted.readlines()
        finally:
            extracted.close()

        member = self.tar.extractfile(filename)
        try:
            lines2 = member.readlines()
        finally:
            member.close()

        self.assertEqual(lines1, lines2,
                         "_FileObject.readline() does not work correctly")

    def test_seek(self):
        """Test seek() method of _FileObject, incl. random reading.
        """
        if self.sep == "|":
            return
        filename = "0-REGTYPE"
        self.tar.extract(filename, dirname())
        extracted = open(os.path.join(dirname(), filename), "rb")
        try:
            data = extracted.read()
        finally:
            extracted.close()

        tarinfo = self.tar.getmember(filename)
        fobj = self.tar.extractfile(tarinfo)
        try:
            # Read everything first so that the seek(0) below starts from
            # a position other than the start; the data itself is unused.
            fobj.read()
            fobj.seek(0)
            self.assertEqual(0, fobj.tell(),
                             "seek() to file's start failed")
            fobj.seek(2048, 0)
            self.assertEqual(2048, fobj.tell(),
                             "seek() to absolute position failed")
            fobj.seek(-1024, 1)
            self.assertEqual(1024, fobj.tell(),
                             "seek() to negative relative position failed")
            fobj.seek(1024, 1)
            self.assertEqual(2048, fobj.tell(),
                             "seek() to positive relative position failed")
            s = fobj.read(10)
            self.assertEqual(s, data[2048:2058],
                             "read() after seek failed")
            fobj.seek(0, 2)
            self.assertEqual(tarinfo.size, fobj.tell(),
                             "seek() to file's end failed")
            self.assertEqual(fobj.read(), "",
                             "read() at file's end did not return empty string")
            fobj.seek(-tarinfo.size, 2)
            self.assertEqual(0, fobj.tell(),
                             "relative seek() to file's start failed")
            # Two reads from the same offset must yield the same lines.
            fobj.seek(512)
            s1 = fobj.readlines()
            fobj.seek(512)
            s2 = fobj.readlines()
            self.assertEqual(s1, s2,
                             "readlines() after seek failed")
        finally:
            fobj.close()
136 | ||
class ReadStreamTest(ReadTest):
    """Repeat the ReadTest cases with the archive opened as a stream ("|")."""
    sep = "|"

    def test(self):
        """Test member extraction, and for StreamError when
           seeking backwards.
        """
        ReadTest.test(self)
        # After the full pass above, reading the first member again
        # is expected to raise StreamError.
        tarinfo = self.tar.getmembers()[0]
        f = self.tar.extractfile(tarinfo)
        self.assertRaises(tarfile.StreamError, f.read)

    def test_stream(self):
        """Compare the normal tar and the stream tar.
        """
        stream = self.tar
        tar = tarfile.open(tarname(), 'r')
        try:
            while 1:
                t1 = tar.next()
                t2 = stream.next()
                if t1 is None:
                    break
                self.assert_(t2 is not None, "stream.next() failed.")

                if t2.islnk() or t2.issym():
                    self.assertRaises(tarfile.StreamError,
                                      stream.extractfile, t2)
                    continue
                v1 = tar.extractfile(t1)
                v2 = stream.extractfile(t2)
                if v1 is None:
                    continue
                self.assert_(v2 is not None, "stream.extractfile() failed")
                self.assertEqual(v1.read(), v2.read(),
                                 "stream extraction failed")

            stream.close()
        finally:
            # The comparison archive is opened here, so it is closed here;
            # tearDown() only knows about self.tar.
            tar.close()
173 | ||
class WriteTest(BaseTest):
    """Copy the regular members of the test archive into a new archive.

    setUp() opens the source archive for reading as self.src and the
    scratch file named by tmpname() for writing as self.dst.
    """
    mode = 'w'

    def setUp(self):
        mode = self.mode + self.sep + self.comp
        self.src = tarfile.open(tarname(self.comp), 'r')
        self.dstname = tmpname()
        self.dst = tarfile.open(self.dstname, mode)

    def tearDown(self):
        # Close the destination even if closing the source fails.
        try:
            self.src.close()
        finally:
            self.dst.close()

    def test_posix(self):
        # Copy with the destination's posix flag switched on.
        self.dst.posix = 1
        self._test()

    def test_nonposix(self):
        # Copy with the destination's posix flag switched off.
        self.dst.posix = 0
        self._test()

    def test_small(self):
        # Add one file from this module's directory, close the archive
        # and check that the resulting file is not empty.
        self.dst.add(os.path.join(os.path.dirname(__file__), "cfgparser.1"))
        self.dst.close()
        self.assertNotEqual(os.stat(self.dstname).st_size, 0)

    def _test(self):
        """Add every regular member of self.src to self.dst.

        When self.dst.posix is set, members whose name is longer than
        tarfile.LENGTH_NAME must be rejected with ValueError.
        """
        for tarinfo in self.src:
            if not tarinfo.isreg():
                continue
            f = self.src.extractfile(tarinfo)
            try:
                if self.dst.posix and len(tarinfo.name) > tarfile.LENGTH_NAME:
                    self.assertRaises(ValueError, self.dst.addfile,
                                      tarinfo, f)
                else:
                    self.dst.addfile(tarinfo, f)
            finally:
                f.close()
210 | ||
class WriteSize0Test(BaseTest):
    """Check the size that gettarinfo() reports for several file types.

    Each test creates its own entry in the scratch directory and removes
    it again afterwards, so the tests can be rerun.
    """
    mode = 'w'

    def setUp(self):
        self.tmpdir = dirname()
        self.dstname = tmpname()
        self.dst = tarfile.open(self.dstname, "w")

    def tearDown(self):
        self.dst.close()

    def test_file(self):
        # An empty file must report size 0 and, after three bytes have
        # been written to it, size 3.
        path = os.path.join(self.tmpdir, "file")
        open(path, "w").close()
        try:
            tarinfo = self.dst.gettarinfo(path)
            self.assertEqual(tarinfo.size, 0)

            # Close explicitly so the data is on disk before the size is
            # read, instead of relying on the object being finalised.
            fobj = open(path, "w")
            try:
                fobj.write("aaa")
            finally:
                fobj.close()
            tarinfo = self.dst.gettarinfo(path)
            self.assertEqual(tarinfo.size, 3)
        finally:
            os.remove(path)

    def test_directory(self):
        # A directory must report size 0.
        path = os.path.join(self.tmpdir, "directory")
        os.mkdir(path)
        try:
            tarinfo = self.dst.gettarinfo(path)
            self.assertEqual(tarinfo.size, 0)
        finally:
            os.rmdir(path)

    def test_symlink(self):
        # A symbolic link must report size 0; skipped where os.symlink
        # is not available.
        if not hasattr(os, "symlink"):
            return
        path = os.path.join(self.tmpdir, "symlink")
        os.symlink("link_target", path)
        try:
            tarinfo = self.dst.gettarinfo(path)
            self.assertEqual(tarinfo.size, 0)
        finally:
            os.remove(path)
243 | ||
244 | ||
class WriteStreamTest(WriteTest):
    """Run the WriteTest cases with "|" as the mode separator."""
    sep = "|"
247 | ||
248 | class WriteGNULongTest(unittest.TestCase): | |
249 | """This testcase checks for correct creation of GNU Longname | |
250 | and Longlink extensions. | |
251 | ||
252 | It creates a tarfile and adds empty members with either | |
253 | long names, long linknames or both and compares the size | |
254 | of the tarfile with the expected size. | |
255 | ||
256 | It checks for SF bug #812325 in TarFile._create_gnulong(). | |
257 | ||
258 | While I was writing this testcase, I noticed a second bug | |
259 | in the same method: | |
260 | Long{names,links} weren't null-terminated which lead to | |
261 | bad tarfiles when their length was a multiple of 512. This | |
262 | is tested as well. | |
263 | """ | |
264 | ||
265 | def setUp(self): | |
266 | self.tar = tarfile.open(tmpname(), "w") | |
267 | self.tar.posix = False | |
268 | ||
269 | def tearDown(self): | |
270 | self.tar.close() | |
271 | ||
272 | def _length(self, s): | |
273 | blocks, remainder = divmod(len(s) + 1, 512) | |
274 | if remainder: | |
275 | blocks += 1 | |
276 | return blocks * 512 | |
277 | ||
278 | def _calc_size(self, name, link=None): | |
279 | # initial tar header | |
280 | count = 512 | |
281 | ||
282 | if len(name) > tarfile.LENGTH_NAME: | |
283 | # gnu longname extended header + longname | |
284 | count += 512 | |
285 | count += self._length(name) | |
286 | ||
287 | if link is not None and len(link) > tarfile.LENGTH_LINK: | |
288 | # gnu longlink extended header + longlink | |
289 | count += 512 | |
290 | count += self._length(link) | |
291 | ||
292 | return count | |
293 | ||
294 | def _test(self, name, link=None): | |
295 | tarinfo = tarfile.TarInfo(name) | |
296 | if link: | |
297 | tarinfo.linkname = link | |
298 | tarinfo.type = tarfile.LNKTYPE | |
299 | ||
300 | self.tar.addfile(tarinfo) | |
301 | ||
302 | v1 = self._calc_size(name, link) | |
303 | v2 = self.tar.offset | |
304 | self.assertEqual(v1, v2, "GNU longname/longlink creation failed") | |
305 | ||
306 | def test_longname_1023(self): | |
307 | self._test(("longnam/" * 127) + "longnam") | |
308 | ||
309 | def test_longname_1024(self): | |
310 | self._test(("longnam/" * 127) + "longname") | |
311 | ||
312 | def test_longname_1025(self): | |
313 | self._test(("longnam/" * 127) + "longname_") | |
314 | ||
315 | def test_longlink_1023(self): | |
316 | self._test("name", ("longlnk/" * 127) + "longlnk") | |
317 | ||
318 | def test_longlink_1024(self): | |
319 | self._test("name", ("longlnk/" * 127) + "longlink") | |
320 | ||
321 | def test_longlink_1025(self): | |
322 | self._test("name", ("longlnk/" * 127) + "longlink_") | |
323 | ||
324 | def test_longnamelink_1023(self): | |
325 | self._test(("longnam/" * 127) + "longnam", | |
326 | ("longlnk/" * 127) + "longlnk") | |
327 | ||
328 | def test_longnamelink_1024(self): | |
329 | self._test(("longnam/" * 127) + "longname", | |
330 | ("longlnk/" * 127) + "longlink") | |
331 | ||
332 | def test_longnamelink_1025(self): | |
333 | self._test(("longnam/" * 127) + "longname_", | |
334 | ("longlnk/" * 127) + "longlink_") | |
335 | ||
class ExtractHardlinkTest(BaseTest):
    """Extract a regular file and then a hard link that points to it."""

    def test_hardlink(self):
        """Test hardlink extraction (bug #857297)
        """
        import errno

        # Prevent errors from being caught
        self.tar.errorlevel = 1

        target = dirname()
        self.tar.extract("0-REGTYPE", target)
        try:
            # Extract 1-LNKTYPE which is a hardlink to 0-REGTYPE
            self.tar.extract("1-LNKTYPE", target)
        except EnvironmentError:
            # Only ENOENT counts as a failure; every other
            # EnvironmentError is ignored.
            err = sys.exc_info()[1]
            if err.errno == errno.ENOENT:
                self.fail("hardlink not extracted properly")
352 | ||
353 | ||
354 | # Gzip TestCases | |
class ReadTestGzip(ReadTest):
    """ReadTest with comp set to "gz"."""
    comp = 'gz'
class ReadStreamTestGzip(ReadStreamTest):
    """ReadStreamTest with comp set to "gz"."""
    comp = 'gz'
class WriteTestGzip(WriteTest):
    """WriteTest with comp set to "gz"."""
    comp = 'gz'
class WriteStreamTestGzip(WriteStreamTest):
    """WriteStreamTest with comp set to "gz"."""
    comp = 'gz'
363 | ||
364 | # Filemode test cases | |
365 | ||
366 | class FileModeTest(unittest.TestCase): | |
367 | def test_modes(self): | |
368 | self.assertEqual(tarfile.filemode(0755), '-rwxr-xr-x') | |
369 | self.assertEqual(tarfile.filemode(07111), '---s--s--t') | |
370 | ||
371 | ||
if bz2 is not None:
    # Bzip2 TestCases
    class ReadTestBzip2(ReadTestGzip):
        """ReadTestGzip with comp overridden to "bz2"."""
        comp = 'bz2'

    class ReadStreamTestBzip2(ReadStreamTestGzip):
        """ReadStreamTestGzip with comp overridden to "bz2"."""
        comp = 'bz2'

    class WriteTestBzip2(WriteTest):
        """WriteTest with comp set to "bz2"."""
        comp = 'bz2'

    class WriteStreamTestBzip2(WriteStreamTestGzip):
        """WriteStreamTestGzip with comp overridden to "bz2"."""
        comp = 'bz2'
382 | ||
# If importing gzip failed, discard the Gzip TestCases.
if gzip is None:
    del (ReadTestGzip, ReadStreamTestGzip,
         WriteTestGzip, WriteStreamTestGzip)
389 | ||
def test_main():
    """Create the compressed archives, run all test cases, then clean up.

    A gzip and/or bzip2 copy of the reference archive is written for each
    compression module that could be imported.  The matching test cases
    are added to the run, as is ExtractHardlinkTest where os.link exists.
    The compressed copies, the scratch directory and the scratch file are
    removed afterwards, whether or not the run succeeded.
    """
    tests = [
        FileModeTest,
        ReadTest,
        ReadStreamTest,
        WriteTest,
        WriteSize0Test,
        WriteStreamTest,
        WriteGNULongTest,
    ]

    if hasattr(os, "link"):
        tests.append(ExtractHardlinkTest)

    if gzip:
        tests.extend([
            ReadTestGzip, ReadStreamTestGzip,
            WriteTestGzip, WriteStreamTestGzip
        ])

    if bz2:
        tests.extend([
            ReadTestBzip2, ReadStreamTestBzip2,
            WriteTestBzip2, WriteStreamTestBzip2
        ])

    # The archives are created inside the try block so that a failure
    # half-way through still runs the cleanup below.
    try:
        if gzip or bz2:
            # Make sure the scratch directory exists before writing.
            dirname()
            src = open(tarname(), "rb")
            try:
                data = src.read()
            finally:
                src.close()

        if gzip:
            # create testtar.tar.gz
            dst = gzip.open(tarname("gz"), "wb")
            try:
                dst.write(data)
            finally:
                # Close explicitly so the archive is complete on disk
                # before any test opens it.
                dst.close()

        if bz2:
            # create testtar.tar.bz2
            dst = bz2.BZ2File(tarname("bz2"), "wb")
            try:
                dst.write(data)
            finally:
                dst.close()

        test_support.run_unittest(*tests)
    finally:
        # A compressed copy may be missing if its creation failed, so
        # check before removing.
        for comp, module in (("gz", gzip), ("bz2", bz2)):
            if module and os.path.exists(tarname(comp)):
                os.remove(tarname(comp))
        if os.path.exists(dirname()):
            shutil.rmtree(dirname())
        if os.path.exists(tmpname()):
            os.remove(tmpname())
433 | ||
# Run the whole suite when this file is executed as a script.
if "__main__" == __name__:
    test_main()