Commit | Line | Data |
---|---|---|
86530b38 AT |
1 | """Macintosh binhex compression/decompression. |
2 | ||
3 | easy interface: | |
4 | binhex(inputfilename, outputfilename) | |
5 | hexbin(inputfilename, outputfilename) | |
6 | """ | |
7 | ||
8 | # | |
9 | # Jack Jansen, CWI, August 1995. | |
10 | # | |
11 | # The module is supposed to be as compatible as possible. Especially the | |
12 | # easy interface should work "as expected" on any platform. | |
13 | # XXXX Note: currently, textfiles appear in mac-form on all platforms. | |
14 | # We seem to lack a simple character-translate in python. | |
15 | # (we should probably use ISO-Latin-1 on all but the mac platform). | |
16 | # XXXX The simple routines are too simple: they expect to hold the complete | |
17 | # files in-core. Should be fixed. | |
18 | # XXXX It would be nice to handle AppleDouble format on unix | |
19 | # (for servers serving macs). | |
20 | # XXXX I don't understand what happens when you get 0x90 times the same byte on | |
21 | # input. The resulting code (xx 90 90) would appear to be interpreted as an | |
22 | # escaped *value* of 0x90. All coders I've seen appear to ignore this nicety... | |
23 | # | |
24 | import sys | |
25 | import os | |
26 | import struct | |
27 | import binascii | |
28 | ||
29 | __all__ = ["binhex","hexbin","Error"] | |
30 | ||
class Error(Exception):
    """Exception raised by this module for binhex encoding/decoding errors."""
33 | ||
# Progress markers: which part of the binhex stream has been handled so far.
_DID_HEADER = 0
_DID_DATA = 1
_DID_RSRC = 2

# Tunables and format constants.
REASONABLY_LARGE = 32768    # minimal amount we pass the rle-coder
LINELEN = 64                # length of a full line of hqx-encoded output
RUNCHAR = '\x90'            # run-length introducer
41 | ||
42 | # | |
43 | # This code is no longer byte-order dependent | |
44 | ||
45 | # | |
46 | # Workarounds for non-mac machines. | |
if os.name == 'mac':
    import macfs
    import MacOS
    try:
        openrf = MacOS.openrf
    except AttributeError:
        # Backward compatibility
        openrf = open

    def FInfo():
        """Return a new macfs.FInfo object."""
        return macfs.FInfo()

    def getfileinfo(name):
        """Return (filename, finfo, data fork size, resource fork size).

        The sizes are found by seeking to the end of each fork.  Both
        handles are closed again, even if seeking fails.
        """
        finfo = macfs.FSSpec(name).GetFInfo()
        fname = os.path.basename(name)
        # XXXX Get resource/data sizes
        fp = open(name, 'rb')
        try:
            fp.seek(0, 2)
            dlen = fp.tell()
        finally:
            fp.close()
        fp = openrf(name, '*rb')
        try:
            fp.seek(0, 2)
            rlen = fp.tell()
        finally:
            fp.close()
        return fname, finfo, dlen, rlen

    def openrsrc(name, *mode):
        """Open the resource fork of *name*; the mode defaults to 'rb'."""
        if not mode:
            mode = '*rb'
        else:
            # openrf() expects the mode prefixed with '*'.
            mode = '*' + mode[0]
        return openrf(name, mode)

else:
    #
    # Glue code for non-macintosh usage
    #

    class FInfo:
        """Plain stand-in for the Macintosh finder info record."""

        def __init__(self):
            self.Type = '????'
            self.Creator = '????'
            self.Flags = 0

    def getfileinfo(name):
        """Return (filename, finfo, data size, 0) for the file *name*.

        The file is typed 'TEXT' when its first 256 characters contain
        only whitespace and printable 7-bit characters.  The resource
        size is always 0 here, and the first ':' in the filename is
        replaced by '-'.
        """
        finfo = FInfo()
        fp = open(name)
        try:
            # Quick check for textfile.  Reuse the one handle instead of
            # opening the file a second time and leaking that handle.
            data = fp.read(256)
            for c in data:
                if not c.isspace() and (c < ' ' or ord(c) > 0x7f):
                    break
            else:
                finfo.Type = 'TEXT'
            fp.seek(0, 2)
            dsize = fp.tell()
        finally:
            fp.close()
        fname = os.path.basename(name)
        fname = fname.replace(':', '-', 1)
        return fname, finfo, dsize, 0

    class openrsrc:
        """Dummy resource fork: reads return '' and writes are discarded."""

        def __init__(self, *args):
            pass

        def read(self, *args):
            return ''

        def write(self, *args):
            pass

        def close(self):
            pass
118 | ||
class _Hqxcoderengine:
    """Write data to the coder in 3-byte chunks.

    Input is passed to binascii.b2a_hqx only in multiples of three bytes;
    any remainder is held back until more data arrives or close() is called.
    The encoded text is written to the output file in newline-terminated
    lines.
    """

    def __init__(self, ofp):
        self.ofp = ofp
        self.data = ''       # raw input not yet encoded
        self.hqxdata = ''    # encoded text not yet written out
        # The first line is one character shorter than the following ones
        # (presumably to leave room for the ':' the caller already wrote).
        self.linelen = LINELEN - 1

    def write(self, data):
        """Buffer *data* and encode as many complete 3-byte groups as possible."""
        pending = self.data + data
        usable = (len(pending) // 3) * 3
        self.data = pending[usable:]
        if usable == 0:
            return
        self.hqxdata += binascii.b2a_hqx(pending[:usable])
        self._flush(0)

    def _flush(self, force):
        """Write out all complete lines; if *force*, also the tail plus ':'."""
        pos = 0
        while len(self.hqxdata) - pos >= self.linelen:
            end = pos + self.linelen
            self.ofp.write(self.hqxdata[pos:end] + '\n')
            # Every line after the first has the full length.
            self.linelen = LINELEN
            pos = end
        self.hqxdata = self.hqxdata[pos:]
        if force:
            self.ofp.write(self.hqxdata + ':\n')

    def close(self):
        """Encode the held-back bytes, write the terminator and close the file."""
        if self.data:
            self.hqxdata += binascii.b2a_hqx(self.data)
        self._flush(1)
        self.ofp.close()
        del self.ofp
157 | ||
class _Rlecoderengine:
    """Write data to the RLE-coder in suitably large chunks.

    Data is accumulated until at least REASONABLY_LARGE bytes are
    buffered, then run-length coded and passed on to the output object.
    """

    def __init__(self, ofp):
        self.ofp = ofp
        self.data = ''

    def _emit(self):
        # Run-length code the whole buffer and hand it to the next stage.
        self.ofp.write(binascii.rlecode_hqx(self.data))

    def write(self, data):
        """Buffer *data*; encode the buffer once it is large enough."""
        self.data += data
        if len(self.data) >= REASONABLY_LARGE:
            self._emit()
            self.data = ''

    def close(self):
        """Encode whatever is still buffered and close the output object."""
        if self.data:
            self._emit()
        self.ofp.close()
        del self.ofp
179 | ||
class BinHex:
    """Writer for a binhex-encoded file.

    The constructor writes the header.  The data fork is then supplied
    with write() and the resource fork with write_rsrc(); close()
    finishes the file.  A CRC is written after the header and after each
    fork.  The number of bytes written to each fork must match the
    lengths announced to the constructor, otherwise Error is raised.
    """

    def __init__(self, name_finfo_dlen_rlen, ofp):
        """Start a binhex file.

        name_finfo_dlen_rlen -- a (name, finfo, dlen, rlen) sequence as
            returned by getfileinfo(); finfo may be None, in which case
            a default FInfo() is used.
        ofp -- an output filename, or an object with write() and close().
        """
        name, finfo, dlen, rlen = name_finfo_dlen_rlen
        if isinstance(ofp, str):
            ofname = ofp
            ofp = open(ofname, 'w')
            if os.name == 'mac':
                fss = macfs.FSSpec(ofname)
                fss.SetCreatorType('BnHq', 'TEXT')
        ofp.write('(This file must be converted with BinHex 4.0)\n\n:')
        hqxer = _Hqxcoderengine(ofp)
        self.ofp = _Rlecoderengine(hqxer)
        self.crc = 0
        if finfo is None:
            finfo = FInfo()
        # Bytes still expected for each fork; counted down while writing.
        self.dlen = dlen
        self.rlen = rlen
        self._writeinfo(name, finfo)
        self.state = _DID_HEADER

    def _writeinfo(self, name, finfo):
        """Write the header (name, type, creator, flags, fork sizes) and its CRC."""
        nl = len(name)
        if nl > 63:
            raise Error('Filename too long')
        d = chr(nl) + name + '\0'
        d2 = finfo.Type + finfo.Creator

        # Force all structs to be packed with big-endian
        d3 = struct.pack('>h', finfo.Flags)
        d4 = struct.pack('>ii', self.dlen, self.rlen)
        info = d + d2 + d3 + d4
        self._write(info)
        self._writecrc()

    def _write(self, data):
        """Write *data* and include it in the running CRC."""
        self.crc = binascii.crc_hqx(data, self.crc)
        self.ofp.write(data)

    def _writecrc(self):
        """Write the running CRC as two big-endian bytes and reset it."""
        # XXXX Should this be here??
        # self.crc = binascii.crc_hqx('\0\0', self.crc)
        #
        # The CRC is a 16-bit quantity.  Pack it unsigned unless it is
        # negative, so that values above 0x7fff do not overflow the
        # signed 'h' format.
        if self.crc < 0:
            fmt = '>h'
        else:
            fmt = '>H'
        self.ofp.write(struct.pack(fmt, self.crc))
        self.crc = 0

    def write(self, data):
        """Write *data* to the data fork.

        Raises Error when the data fork has already been closed.
        """
        if self.state != _DID_HEADER:
            raise Error('Writing data at the wrong time')
        self.dlen = self.dlen - len(data)
        self._write(data)

    def close_data(self):
        """Finish the data fork and write its CRC.

        Raises Error when the number of bytes written differs from the
        announced data length.
        """
        if self.dlen != 0:
            # Report the data-fork difference (not the resource length).
            raise Error('Incorrect data size, diff=%r' % (self.dlen,))
        self._writecrc()
        self.state = _DID_DATA

    def write_rsrc(self, data):
        """Write *data* to the resource fork, closing the data fork if needed."""
        if self.state < _DID_DATA:
            self.close_data()
        if self.state != _DID_DATA:
            raise Error('Writing resource data at the wrong time')
        self.rlen = self.rlen - len(data)
        self._write(data)

    def close(self):
        """Finish the resource fork, write its CRC and close the output.

        Raises Error when the number of resource bytes written differs
        from the announced resource length.
        """
        if self.state < _DID_DATA:
            self.close_data()
        if self.state != _DID_DATA:
            raise Error('Close at the wrong time')
        if self.rlen != 0:
            raise Error(
                "Incorrect resource-datasize, diff=%r" % (self.rlen,))
        self._writecrc()
        self.ofp.close()
        self.state = None
        del self.ofp
255 | ||
def binhex(inp, out):
    """(infilename, outfilename) - Create binhex-encoded copy of a file

    The data fork is copied first, then the resource fork.  The input
    handles are closed even when encoding fails part-way through.
    """
    info = getfileinfo(inp)
    ofp = BinHex(info, out)

    ifp = open(inp, 'rb')
    try:
        # XXXX Do textfile translation on non-mac systems
        while 1:
            d = ifp.read(128000)
            if not d:
                break
            ofp.write(d)
        ofp.close_data()
    finally:
        ifp.close()

    ifp = openrsrc(inp, 'rb')
    try:
        while 1:
            d = ifp.read(128000)
            if not d:
                break
            ofp.write_rsrc(d)
        ofp.close()
    finally:
        ifp.close()
277 | ||
class _Hqxdecoderengine:
    """Read data via the decoder in 4-byte chunks"""

    def __init__(self, ifp):
        self.ifp = ifp
        self.eof = 0    # set from the flag returned by binascii.a2b_hqx

    def read(self, totalwtd):
        """Read at least totalwtd bytes (or until EOF)"""
        decoded = ''
        remaining = totalwtd
        #
        # The loop here is convoluted, since we don't really know how
        # much to decode: there may be newlines in the incoming data.
        while remaining > 0:
            if self.eof:
                return decoded
            # Ask for four encoded characters per three wanted bytes.
            raw = self.ifp.read(((remaining + 2) // 3) * 4)
            #
            # Next problem: there may not be a complete number of
            # bytes in what we pass to a2b. Solve by yet another
            # loop, adding one character at a time until it decodes.
            #
            while 1:
                try:
                    piece, self.eof = binascii.a2b_hqx(raw)
                except binascii.Incomplete:
                    pass
                else:
                    break
                extra = self.ifp.read(1)
                if not extra:
                    raise Error('Premature EOF on binhex file')
                raw = raw + extra
            decoded = decoded + piece
            remaining = totalwtd - len(decoded)
            if not decoded and not self.eof:
                raise Error('Premature EOF on binhex file')
        return decoded

    def close(self):
        """Close the underlying input file."""
        self.ifp.close()
321 | ||
class _Rledecoderengine:
    """Read data via the RLE-coder"""

    def __init__(self, ifp):
        self.ifp = ifp
        self.pre_buffer = ''     # data read from ifp, not yet RLE-decoded
        self.post_buffer = ''    # decoded data not yet handed to the caller
        self.eof = 0

    def read(self, wtd):
        """Return up to *wtd* decoded bytes, refilling the buffer if needed."""
        shortfall = wtd - len(self.post_buffer)
        if shortfall > 0:
            self._fill(shortfall)
        result = self.post_buffer[:wtd]
        self.post_buffer = self.post_buffer[wtd:]
        return result

    def _fill(self, wtd):
        """Read more input and move as much as is safe into post_buffer."""
        self.pre_buffer = self.pre_buffer + self.ifp.read(wtd + 4)
        if self.ifp.eof:
            # Nothing more will arrive, so everything can be decoded.
            self.post_buffer = self.post_buffer + \
                binascii.rledecode_hqx(self.pre_buffer)
            self.pre_buffer = ''
            return

        #
        # Obfuscated code ahead. We have to take care that we don't
        # end up with an orphaned RUNCHAR later on. So, we keep a couple
        # of bytes in the buffer, depending on what the end of
        # the buffer looks like:
        # '\220\0\220' - Keep 3 bytes: repeated \220 (escaped as \220\0)
        # '?\220' - Keep 2 bytes: repeated something-else
        # '\220\0' - Escaped \220: Keep 2 bytes.
        # '?\220?' - Complete repeat sequence: decode all
        # otherwise: keep 1 byte.
        #
        buf = self.pre_buffer
        if buf[-3:] == RUNCHAR + '\0' + RUNCHAR:
            keep = 3
        elif buf[-1] == RUNCHAR:
            keep = 2
        elif buf[-2:] == RUNCHAR + '\0':
            keep = 2
        elif buf[-2] == RUNCHAR:
            keep = 0    # Decode all
        else:
            keep = 1
        mark = len(buf) - keep

        self.post_buffer = self.post_buffer + \
            binascii.rledecode_hqx(self.pre_buffer[:mark])
        self.pre_buffer = self.pre_buffer[mark:]

    def close(self):
        """Close the underlying decoder."""
        self.ifp.close()
375 | ||
class HexBin:
    """Reader for a binhex-encoded file.

    The constructor skips to the initial ':' and reads the header, which
    sets the attributes FName and FInfo.  The data fork is then read
    with read() and the resource fork with read_rsrc(); close() finishes
    reading.  The CRC that follows the header and each fork is verified,
    and Error is raised on a mismatch.
    """

    def __init__(self, ifp):
        """Open *ifp* (a filename or an object with read/readline/close)."""
        if isinstance(ifp, str):
            ifp = open(ifp)
        #
        # Find initial colon.
        #
        while 1:
            ch = ifp.read(1)
            if not ch:
                raise Error("No binhex data found")
            # Cater for \r\n terminated lines (which show up as \n\r, hence
            # all lines start with \r)
            if ch == '\r':
                continue
            if ch == ':':
                break
            if ch != '\n':
                # Not the start of the data: skip the rest of this line.
                ifp.readline()

        hqxifp = _Hqxdecoderengine(ifp)
        self.ifp = _Rledecoderengine(hqxifp)
        self.crc = 0
        self._readheader()

    def _read(self, len):
        """Read *len* bytes and include them in the running CRC."""
        data = self.ifp.read(len)
        self.crc = binascii.crc_hqx(data, self.crc)
        return data

    def _checkcrc(self):
        """Compare the next two bytes with the running CRC, then reset it.

        Raises Error when the stored and the computed CRC differ.
        """
        filecrc = struct.unpack('>h', self.ifp.read(2))[0] & 0xffff
        #self.crc = binascii.crc_hqx('\0\0', self.crc)
        # XXXX Is this needed??
        self.crc = self.crc & 0xffff
        if filecrc != self.crc:
            raise Error('CRC error, computed %x, read %x'
                        % (self.crc, filecrc))
        self.crc = 0

    def _readheader(self):
        """Read and verify the header; set FName, FInfo, dlen and rlen."""
        namelen = self._read(1)
        fname = self._read(ord(namelen))
        # One byte after the name, then type, creator, flags, dlen, rlen.
        rest = self._read(1 + 4 + 4 + 2 + 4 + 4)
        self._checkcrc()

        ftype = rest[1:5]
        creator = rest[5:9]
        flags = struct.unpack('>h', rest[9:11])[0]
        self.dlen = struct.unpack('>l', rest[11:15])[0]
        self.rlen = struct.unpack('>l', rest[15:19])[0]

        self.FName = fname
        self.FInfo = FInfo()
        self.FInfo.Creator = creator
        self.FInfo.Type = ftype
        self.FInfo.Flags = flags

        self.state = _DID_HEADER

    def read(self, *n):
        """Read up to n bytes of the data fork (all that remains by default).

        Raises Error when called after the data fork was closed, or when
        the input ends before the announced amount of data was read.
        """
        if self.state != _DID_HEADER:
            raise Error('Read data at wrong time')
        if n:
            n = n[0]
            n = min(n, self.dlen)
        else:
            n = self.dlen
        rv = ''
        while len(rv) < n:
            chunk = self._read(n - len(rv))
            if not chunk:
                # The decoder returns nothing once the input is exhausted;
                # without this check the loop would never terminate.
                raise Error('Premature EOF on binhex file')
            rv = rv + chunk
        self.dlen = self.dlen - n
        return rv

    def close_data(self):
        """Skip any unread data fork bytes and verify the data CRC."""
        if self.state != _DID_HEADER:
            raise Error('close_data at wrong time')
        if self.dlen:
            self._read(self.dlen)
        self._checkcrc()
        self.state = _DID_DATA

    def read_rsrc(self, *n):
        """Read up to n bytes of the resource fork (all that remains by default).

        The data fork is closed first if that has not been done yet.
        """
        if self.state == _DID_HEADER:
            self.close_data()
        if self.state != _DID_DATA:
            raise Error('Read resource data at wrong time')
        if n:
            n = n[0]
            n = min(n, self.rlen)
        else:
            n = self.rlen
        self.rlen = self.rlen - n
        return self._read(n)

    def close(self):
        """Skip unread data, verify the resource CRC and close the input."""
        if self.state == _DID_HEADER:
            # The data fork and its CRC precede the resource fork, so they
            # must be consumed before the resource CRC can be checked.
            self.close_data()
        if self.rlen:
            self.read_rsrc(self.rlen)
        self._checkcrc()
        self.state = _DID_RSRC
        self.ifp.close()
477 | ||
def hexbin(inp, out):
    """(infilename, outfilename) - Decode binhexed file

    When *out* is empty, the filename stored in the binhex header is used.
    """
    decoder = HexBin(inp)
    finfo = decoder.FInfo
    if not out:
        out = decoder.FName
    if os.name == 'mac':
        ofss = macfs.FSSpec(out)
        out = ofss.as_pathname()

    # Data fork.
    ofp = open(out, 'wb')
    # XXXX Do translation on non-mac systems
    chunk = decoder.read(128000)
    while chunk:
        ofp.write(chunk)
        chunk = decoder.read(128000)
    ofp.close()
    decoder.close_data()

    # Resource fork: only opened when there is something to write.
    chunk = decoder.read_rsrc(128000)
    if chunk:
        ofp = openrsrc(out, 'wb')
        while chunk:
            ofp.write(chunk)
            chunk = decoder.read_rsrc(128000)
        ofp.close()

    if os.name == 'mac':
        # Copy creator, type and flags onto the new file.
        nfinfo = ofss.GetFInfo()
        nfinfo.Creator = finfo.Creator
        nfinfo.Type = finfo.Type
        nfinfo.Flags = finfo.Flags
        ofss.SetFInfo(nfinfo)

    decoder.close()
515 | ||
def _test():
    """Encode a file to <name>.hqx, decode it to <name>.viahqx, then exit.

    On the mac the file is chosen with a dialog; elsewhere it is taken
    from the first command line argument.
    """
    if os.name != 'mac':
        fname = sys.argv[1]
    else:
        fss, ok = macfs.PromptGetFile('File to convert:')
        if not ok:
            sys.exit(0)
        fname = fss.as_pathname()
    encoded = fname + '.hqx'
    binhex(fname, encoded)
    hexbin(encoded, fname + '.viahqx')
    #hexbin(fname, fname+'.unpacked')
    sys.exit(1)

if __name__ == "__main__":
    _test()