Commit | Line | Data |
---|---|---|
920dae64 AT |
1 | from test.test_support import verify, vereq, TESTFN |
2 | import mmap | |
3 | import os, re | |
4 | ||
5 | PAGESIZE = mmap.PAGESIZE | |
6 | ||
7 | def test_both(): | |
8 | "Test mmap module on Unix systems and Windows" | |
9 | ||
10 | # Create a file to be mmap'ed. | |
11 | if os.path.exists(TESTFN): | |
12 | os.unlink(TESTFN) | |
13 | f = open(TESTFN, 'w+') | |
14 | ||
15 | try: # unlink TESTFN no matter what | |
16 | # Write 2 pages worth of data to the file | |
17 | f.write('\0'* PAGESIZE) | |
18 | f.write('foo') | |
19 | f.write('\0'* (PAGESIZE-3) ) | |
20 | f.flush() | |
21 | m = mmap.mmap(f.fileno(), 2 * PAGESIZE) | |
22 | f.close() | |
23 | ||
24 | # Simple sanity checks | |
25 | ||
26 | print type(m) # SF bug 128713: segfaulted on Linux | |
27 | print ' Position of foo:', m.find('foo') / float(PAGESIZE), 'pages' | |
28 | vereq(m.find('foo'), PAGESIZE) | |
29 | ||
30 | print ' Length of file:', len(m) / float(PAGESIZE), 'pages' | |
31 | vereq(len(m), 2*PAGESIZE) | |
32 | ||
33 | print ' Contents of byte 0:', repr(m[0]) | |
34 | vereq(m[0], '\0') | |
35 | print ' Contents of first 3 bytes:', repr(m[0:3]) | |
36 | vereq(m[0:3], '\0\0\0') | |
37 | ||
38 | # Modify the file's content | |
39 | print "\n Modifying file's content..." | |
40 | m[0] = '3' | |
41 | m[PAGESIZE +3: PAGESIZE +3+3] = 'bar' | |
42 | ||
43 | # Check that the modification worked | |
44 | print ' Contents of byte 0:', repr(m[0]) | |
45 | vereq(m[0], '3') | |
46 | print ' Contents of first 3 bytes:', repr(m[0:3]) | |
47 | vereq(m[0:3], '3\0\0') | |
48 | print ' Contents of second page:', repr(m[PAGESIZE-1 : PAGESIZE + 7]) | |
49 | vereq(m[PAGESIZE-1 : PAGESIZE + 7], '\0foobar\0') | |
50 | ||
51 | m.flush() | |
52 | ||
53 | # Test doing a regular expression match in an mmap'ed file | |
54 | match = re.search('[A-Za-z]+', m) | |
55 | if match is None: | |
56 | print ' ERROR: regex match on mmap failed!' | |
57 | else: | |
58 | start, end = match.span(0) | |
59 | length = end - start | |
60 | ||
61 | print ' Regex match on mmap (page start, length of match):', | |
62 | print start / float(PAGESIZE), length | |
63 | ||
64 | vereq(start, PAGESIZE) | |
65 | vereq(end, PAGESIZE + 6) | |
66 | ||
67 | # test seeking around (try to overflow the seek implementation) | |
68 | m.seek(0,0) | |
69 | print ' Seek to zeroth byte' | |
70 | vereq(m.tell(), 0) | |
71 | m.seek(42,1) | |
72 | print ' Seek to 42nd byte' | |
73 | vereq(m.tell(), 42) | |
74 | m.seek(0,2) | |
75 | print ' Seek to last byte' | |
76 | vereq(m.tell(), len(m)) | |
77 | ||
78 | print ' Try to seek to negative position...' | |
79 | try: | |
80 | m.seek(-1) | |
81 | except ValueError: | |
82 | pass | |
83 | else: | |
84 | verify(0, 'expected a ValueError but did not get it') | |
85 | ||
86 | print ' Try to seek beyond end of mmap...' | |
87 | try: | |
88 | m.seek(1,2) | |
89 | except ValueError: | |
90 | pass | |
91 | else: | |
92 | verify(0, 'expected a ValueError but did not get it') | |
93 | ||
94 | print ' Try to seek to negative position...' | |
95 | try: | |
96 | m.seek(-len(m)-1,2) | |
97 | except ValueError: | |
98 | pass | |
99 | else: | |
100 | verify(0, 'expected a ValueError but did not get it') | |
101 | ||
102 | # Try resizing map | |
103 | print ' Attempting resize()' | |
104 | try: | |
105 | m.resize(512) | |
106 | except SystemError: | |
107 | # resize() not supported | |
108 | # No messages are printed, since the output of this test suite | |
109 | # would then be different across platforms. | |
110 | pass | |
111 | else: | |
112 | # resize() is supported | |
113 | verify(len(m) == 512, | |
114 | "len(m) is %d, but expecting 512" % (len(m),) ) | |
115 | # Check that we can no longer seek beyond the new size. | |
116 | try: | |
117 | m.seek(513,0) | |
118 | except ValueError: | |
119 | pass | |
120 | else: | |
121 | verify(0, 'Could seek beyond the new size') | |
122 | ||
123 | # Check that the underlying file is truncated too | |
124 | # (bug #728515) | |
125 | f = open(TESTFN) | |
126 | f.seek(0, 2) | |
127 | verify(f.tell() == 512, 'Underlying file not truncated') | |
128 | f.close() | |
129 | verify(m.size() == 512, 'New size not reflected in file') | |
130 | ||
131 | m.close() | |
132 | ||
133 | finally: | |
134 | try: | |
135 | f.close() | |
136 | except OSError: | |
137 | pass | |
138 | try: | |
139 | os.unlink(TESTFN) | |
140 | except OSError: | |
141 | pass | |
142 | ||
143 | # Test for "access" keyword parameter | |
144 | try: | |
145 | mapsize = 10 | |
146 | print " Creating", mapsize, "byte test data file." | |
147 | open(TESTFN, "wb").write("a"*mapsize) | |
148 | print " Opening mmap with access=ACCESS_READ" | |
149 | f = open(TESTFN, "rb") | |
150 | m = mmap.mmap(f.fileno(), mapsize, access=mmap.ACCESS_READ) | |
151 | verify(m[:] == 'a'*mapsize, "Readonly memory map data incorrect.") | |
152 | ||
153 | print " Ensuring that readonly mmap can't be slice assigned." | |
154 | try: | |
155 | m[:] = 'b'*mapsize | |
156 | except TypeError: | |
157 | pass | |
158 | else: | |
159 | verify(0, "Able to write to readonly memory map") | |
160 | ||
161 | print " Ensuring that readonly mmap can't be item assigned." | |
162 | try: | |
163 | m[0] = 'b' | |
164 | except TypeError: | |
165 | pass | |
166 | else: | |
167 | verify(0, "Able to write to readonly memory map") | |
168 | ||
169 | print " Ensuring that readonly mmap can't be write() to." | |
170 | try: | |
171 | m.seek(0,0) | |
172 | m.write('abc') | |
173 | except TypeError: | |
174 | pass | |
175 | else: | |
176 | verify(0, "Able to write to readonly memory map") | |
177 | ||
178 | print " Ensuring that readonly mmap can't be write_byte() to." | |
179 | try: | |
180 | m.seek(0,0) | |
181 | m.write_byte('d') | |
182 | except TypeError: | |
183 | pass | |
184 | else: | |
185 | verify(0, "Able to write to readonly memory map") | |
186 | ||
187 | print " Ensuring that readonly mmap can't be resized." | |
188 | try: | |
189 | m.resize(2*mapsize) | |
190 | except SystemError: # resize is not universally supported | |
191 | pass | |
192 | except TypeError: | |
193 | pass | |
194 | else: | |
195 | verify(0, "Able to resize readonly memory map") | |
196 | del m, f | |
197 | verify(open(TESTFN, "rb").read() == 'a'*mapsize, | |
198 | "Readonly memory map data file was modified") | |
199 | ||
200 | print " Opening mmap with size too big" | |
201 | import sys | |
202 | f = open(TESTFN, "r+b") | |
203 | try: | |
204 | m = mmap.mmap(f.fileno(), mapsize+1) | |
205 | except ValueError: | |
206 | # we do not expect a ValueError on Windows | |
207 | # CAUTION: This also changes the size of the file on disk, and | |
208 | # later tests assume that the length hasn't changed. We need to | |
209 | # repair that. | |
210 | if sys.platform.startswith('win'): | |
211 | verify(0, "Opening mmap with size+1 should work on Windows.") | |
212 | else: | |
213 | # we expect a ValueError on Unix, but not on Windows | |
214 | if not sys.platform.startswith('win'): | |
215 | verify(0, "Opening mmap with size+1 should raise ValueError.") | |
216 | m.close() | |
217 | f.close() | |
218 | if sys.platform.startswith('win'): | |
219 | # Repair damage from the resizing test. | |
220 | f = open(TESTFN, 'r+b') | |
221 | f.truncate(mapsize) | |
222 | f.close() | |
223 | ||
224 | print " Opening mmap with access=ACCESS_WRITE" | |
225 | f = open(TESTFN, "r+b") | |
226 | m = mmap.mmap(f.fileno(), mapsize, access=mmap.ACCESS_WRITE) | |
227 | print " Modifying write-through memory map." | |
228 | m[:] = 'c'*mapsize | |
229 | verify(m[:] == 'c'*mapsize, | |
230 | "Write-through memory map memory not updated properly.") | |
231 | m.flush() | |
232 | m.close() | |
233 | f.close() | |
234 | f = open(TESTFN, 'rb') | |
235 | stuff = f.read() | |
236 | f.close() | |
237 | verify(stuff == 'c'*mapsize, | |
238 | "Write-through memory map data file not updated properly.") | |
239 | ||
240 | print " Opening mmap with access=ACCESS_COPY" | |
241 | f = open(TESTFN, "r+b") | |
242 | m = mmap.mmap(f.fileno(), mapsize, access=mmap.ACCESS_COPY) | |
243 | print " Modifying copy-on-write memory map." | |
244 | m[:] = 'd'*mapsize | |
245 | verify(m[:] == 'd' * mapsize, | |
246 | "Copy-on-write memory map data not written correctly.") | |
247 | m.flush() | |
248 | verify(open(TESTFN, "rb").read() == 'c'*mapsize, | |
249 | "Copy-on-write test data file should not be modified.") | |
250 | try: | |
251 | print " Ensuring copy-on-write maps cannot be resized." | |
252 | m.resize(2*mapsize) | |
253 | except TypeError: | |
254 | pass | |
255 | else: | |
256 | verify(0, "Copy-on-write mmap resize did not raise exception.") | |
257 | del m, f | |
258 | try: | |
259 | print " Ensuring invalid access parameter raises exception." | |
260 | f = open(TESTFN, "r+b") | |
261 | m = mmap.mmap(f.fileno(), mapsize, access=4) | |
262 | except ValueError: | |
263 | pass | |
264 | else: | |
265 | verify(0, "Invalid access code should have raised exception.") | |
266 | ||
267 | if os.name == "posix": | |
268 | # Try incompatible flags, prot and access parameters. | |
269 | f = open(TESTFN, "r+b") | |
270 | try: | |
271 | m = mmap.mmap(f.fileno(), mapsize, flags=mmap.MAP_PRIVATE, | |
272 | prot=mmap.PROT_READ, access=mmap.ACCESS_WRITE) | |
273 | except ValueError: | |
274 | pass | |
275 | else: | |
276 | verify(0, "Incompatible parameters should raise ValueError.") | |
277 | f.close() | |
278 | finally: | |
279 | try: | |
280 | os.unlink(TESTFN) | |
281 | except OSError: | |
282 | pass | |
283 | ||
284 | # Do a tougher .find() test. SF bug 515943 pointed out that, in 2.2, | |
285 | # searching for data with embedded \0 bytes didn't work. | |
286 | f = open(TESTFN, 'w+') | |
287 | ||
288 | try: # unlink TESTFN no matter what | |
289 | data = 'aabaac\x00deef\x00\x00aa\x00' | |
290 | n = len(data) | |
291 | f.write(data) | |
292 | f.flush() | |
293 | m = mmap.mmap(f.fileno(), n) | |
294 | f.close() | |
295 | ||
296 | for start in range(n+1): | |
297 | for finish in range(start, n+1): | |
298 | slice = data[start : finish] | |
299 | vereq(m.find(slice), data.find(slice)) | |
300 | vereq(m.find(slice + 'x'), -1) | |
301 | m.close() | |
302 | ||
303 | finally: | |
304 | os.unlink(TESTFN) | |
305 | ||
306 | # make sure a double close doesn't crash on Solaris (Bug# 665913) | |
307 | f = open(TESTFN, 'w+') | |
308 | ||
309 | try: # unlink TESTFN no matter what | |
310 | f.write(2**16 * 'a') # Arbitrary character | |
311 | f.close() | |
312 | ||
313 | f = open(TESTFN) | |
314 | mf = mmap.mmap(f.fileno(), 2**16, access=mmap.ACCESS_READ) | |
315 | mf.close() | |
316 | mf.close() | |
317 | f.close() | |
318 | ||
319 | finally: | |
320 | os.unlink(TESTFN) | |
321 | ||
322 | ||
323 | print ' Test passed' | |
324 | ||
325 | test_both() |