Commit | Line | Data |
---|---|---|
86530b38 AT |
1 | # -*- Mode: Python -*- |
2 | # Id: asyncore.py,v 2.51 2000/09/07 22:29:26 rushing Exp | |
3 | # Author: Sam Rushing <rushing@nightmare.com> | |
4 | ||
5 | # ====================================================================== | |
6 | # Copyright 1996 by Sam Rushing | |
7 | # | |
8 | # All Rights Reserved | |
9 | # | |
10 | # Permission to use, copy, modify, and distribute this software and | |
11 | # its documentation for any purpose and without fee is hereby | |
12 | # granted, provided that the above copyright notice appear in all | |
13 | # copies and that both that copyright notice and this permission | |
14 | # notice appear in supporting documentation, and that the name of Sam | |
15 | # Rushing not be used in advertising or publicity pertaining to | |
16 | # distribution of the software without specific, written prior | |
17 | # permission. | |
18 | # | |
19 | # SAM RUSHING DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE, | |
20 | # INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS, IN | |
21 | # NO EVENT SHALL SAM RUSHING BE LIABLE FOR ANY SPECIAL, INDIRECT OR | |
22 | # CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS | |
23 | # OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, | |
24 | # NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN | |
25 | # CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. | |
26 | # ====================================================================== | |
27 | ||
28 | """Basic infrastructure for asynchronous socket service clients and servers. | |
29 | ||
30 | There are only two ways to have a program on a single processor do "more | |
31 | than one thing at a time". Multi-threaded programming is the simplest and | |
32 | most popular way to do it, but there is another very different technique, | |
33 | that lets you have nearly all the advantages of multi-threading, without | |
34 | actually using multiple threads. it's really only practical if your program | |
35 | is largely I/O bound. If your program is CPU bound, then pre-emptive | |
36 | scheduled threads are probably what you really need. Network servers are | |
37 | rarely CPU-bound, however. | |
38 | ||
39 | If your operating system supports the select() system call in its I/O | |
40 | library (and nearly all do), then you can use it to juggle multiple | |
41 | communication channels at once; doing other work while your I/O is taking | |
42 | place in the "background." Although this strategy can seem strange and | |
43 | complex, especially at first, it is in many ways easier to understand and | |
44 | control than multi-threaded programming. The module documented here solves | |
45 | many of the difficult problems for you, making the task of building | |
46 | sophisticated high-performance network servers and clients a snap. | |
47 | """ | |
48 | ||
49 | import exceptions | |
50 | import select | |
51 | import socket | |
52 | import sys | |
53 | import time | |
54 | ||
55 | import os | |
56 | from errno import EALREADY, EINPROGRESS, EWOULDBLOCK, ECONNRESET, \ | |
57 | ENOTCONN, ESHUTDOWN, EINTR, EISCONN, errorcode | |
58 | ||
59 | try: | |
60 | socket_map | |
61 | except NameError: | |
62 | socket_map = {} | |
63 | ||
64 | class ExitNow(exceptions.Exception): | |
65 | pass | |
66 | ||
67 | def read(obj): | |
68 | try: | |
69 | obj.handle_read_event() | |
70 | except ExitNow: | |
71 | raise | |
72 | except: | |
73 | obj.handle_error() | |
74 | ||
75 | def write(obj): | |
76 | try: | |
77 | obj.handle_write_event() | |
78 | except ExitNow: | |
79 | raise | |
80 | except: | |
81 | obj.handle_error() | |
82 | ||
83 | def _exception (obj): | |
84 | try: | |
85 | obj.handle_expt_event() | |
86 | except ExitNow: | |
87 | raise | |
88 | except: | |
89 | obj.handle_error() | |
90 | ||
91 | def readwrite(obj, flags): | |
92 | try: | |
93 | if flags & (select.POLLIN | select.POLLPRI): | |
94 | obj.handle_read_event() | |
95 | if flags & select.POLLOUT: | |
96 | obj.handle_write_event() | |
97 | if flags & (select.POLLERR | select.POLLHUP | select.POLLNVAL): | |
98 | obj.handle_expt_event() | |
99 | except ExitNow: | |
100 | raise | |
101 | except: | |
102 | obj.handle_error() | |
103 | ||
104 | def poll(timeout=0.0, map=None): | |
105 | if map is None: | |
106 | map = socket_map | |
107 | if map: | |
108 | r = []; w = []; e = [] | |
109 | for fd, obj in map.items(): | |
110 | is_r = obj.readable() | |
111 | is_w = obj.writable() | |
112 | if is_r: | |
113 | r.append(fd) | |
114 | if is_w: | |
115 | w.append(fd) | |
116 | if is_r or is_w: | |
117 | e.append(fd) | |
118 | if [] == r == w == e: | |
119 | time.sleep(timeout) | |
120 | else: | |
121 | try: | |
122 | r, w, e = select.select(r, w, e, timeout) | |
123 | except select.error, err: | |
124 | if err[0] != EINTR: | |
125 | raise | |
126 | else: | |
127 | return | |
128 | ||
129 | for fd in r: | |
130 | obj = map.get(fd) | |
131 | if obj is None: | |
132 | continue | |
133 | read(obj) | |
134 | ||
135 | for fd in w: | |
136 | obj = map.get(fd) | |
137 | if obj is None: | |
138 | continue | |
139 | write(obj) | |
140 | ||
141 | for fd in e: | |
142 | obj = map.get(fd) | |
143 | if obj is None: | |
144 | continue | |
145 | _exception(obj) | |
146 | ||
147 | def poll2(timeout=0.0, map=None): | |
148 | # Use the poll() support added to the select module in Python 2.0 | |
149 | if map is None: | |
150 | map = socket_map | |
151 | if timeout is not None: | |
152 | # timeout is in milliseconds | |
153 | timeout = int(timeout*1000) | |
154 | pollster = select.poll() | |
155 | if map: | |
156 | for fd, obj in map.items(): | |
157 | flags = 0 | |
158 | if obj.readable(): | |
159 | flags |= select.POLLIN | select.POLLPRI | |
160 | if obj.writable(): | |
161 | flags |= select.POLLOUT | |
162 | if flags: | |
163 | # Only check for exceptions if object was either readable | |
164 | # or writable. | |
165 | flags |= select.POLLERR | select.POLLHUP | select.POLLNVAL | |
166 | pollster.register(fd, flags) | |
167 | try: | |
168 | r = pollster.poll(timeout) | |
169 | except select.error, err: | |
170 | if err[0] != EINTR: | |
171 | raise | |
172 | r = [] | |
173 | for fd, flags in r: | |
174 | obj = map.get(fd) | |
175 | if obj is None: | |
176 | continue | |
177 | readwrite(obj, flags) | |
178 | ||
179 | poll3 = poll2 # Alias for backward compatibility | |
180 | ||
181 | def loop(timeout=30.0, use_poll=False, map=None, count=None): | |
182 | if map is None: | |
183 | map = socket_map | |
184 | ||
185 | if use_poll and hasattr(select, 'poll'): | |
186 | poll_fun = poll2 | |
187 | else: | |
188 | poll_fun = poll | |
189 | ||
190 | if count is None: | |
191 | while map: | |
192 | poll_fun(timeout, map) | |
193 | ||
194 | else: | |
195 | while map and count > 0: | |
196 | poll_fun(timeout, map) | |
197 | count = count - 1 | |
198 | ||
199 | class dispatcher: | |
200 | ||
201 | debug = False | |
202 | connected = False | |
203 | accepting = False | |
204 | closing = False | |
205 | addr = None | |
206 | ||
207 | def __init__(self, sock=None, map=None): | |
208 | if map is None: | |
209 | self._map = socket_map | |
210 | else: | |
211 | self._map = map | |
212 | ||
213 | if sock: | |
214 | self.set_socket(sock, map) | |
215 | # I think it should inherit this anyway | |
216 | self.socket.setblocking(0) | |
217 | self.connected = True | |
218 | # XXX Does the constructor require that the socket passed | |
219 | # be connected? | |
220 | try: | |
221 | self.addr = sock.getpeername() | |
222 | except socket.error: | |
223 | # The addr isn't crucial | |
224 | pass | |
225 | else: | |
226 | self.socket = None | |
227 | ||
228 | def __repr__(self): | |
229 | status = [self.__class__.__module__+"."+self.__class__.__name__] | |
230 | if self.accepting and self.addr: | |
231 | status.append('listening') | |
232 | elif self.connected: | |
233 | status.append('connected') | |
234 | if self.addr is not None: | |
235 | try: | |
236 | status.append('%s:%d' % self.addr) | |
237 | except TypeError: | |
238 | status.append(repr(self.addr)) | |
239 | return '<%s at %#x>' % (' '.join(status), id(self)) | |
240 | ||
241 | def add_channel(self, map=None): | |
242 | #self.log_info('adding channel %s' % self) | |
243 | if map is None: | |
244 | map = self._map | |
245 | map[self._fileno] = self | |
246 | ||
247 | def del_channel(self, map=None): | |
248 | fd = self._fileno | |
249 | if map is None: | |
250 | map = self._map | |
251 | if map.has_key(fd): | |
252 | #self.log_info('closing channel %d:%s' % (fd, self)) | |
253 | del map[fd] | |
254 | self._fileno = None | |
255 | ||
256 | def create_socket(self, family, type): | |
257 | self.family_and_type = family, type | |
258 | self.socket = socket.socket(family, type) | |
259 | self.socket.setblocking(0) | |
260 | self._fileno = self.socket.fileno() | |
261 | self.add_channel() | |
262 | ||
263 | def set_socket(self, sock, map=None): | |
264 | self.socket = sock | |
265 | ## self.__dict__['socket'] = sock | |
266 | self._fileno = sock.fileno() | |
267 | self.add_channel(map) | |
268 | ||
269 | def set_reuse_addr(self): | |
270 | # try to re-use a server port if possible | |
271 | try: | |
272 | self.socket.setsockopt( | |
273 | socket.SOL_SOCKET, socket.SO_REUSEADDR, | |
274 | self.socket.getsockopt(socket.SOL_SOCKET, | |
275 | socket.SO_REUSEADDR) | 1 | |
276 | ) | |
277 | except socket.error: | |
278 | pass | |
279 | ||
280 | # ================================================== | |
281 | # predicates for select() | |
282 | # these are used as filters for the lists of sockets | |
283 | # to pass to select(). | |
284 | # ================================================== | |
285 | ||
286 | def readable(self): | |
287 | return True | |
288 | ||
289 | def writable(self): | |
290 | return True | |
291 | ||
292 | # ================================================== | |
293 | # socket object methods. | |
294 | # ================================================== | |
295 | ||
296 | def listen(self, num): | |
297 | self.accepting = True | |
298 | if os.name == 'nt' and num > 5: | |
299 | num = 1 | |
300 | return self.socket.listen(num) | |
301 | ||
302 | def bind(self, addr): | |
303 | self.addr = addr | |
304 | return self.socket.bind(addr) | |
305 | ||
306 | def connect(self, address): | |
307 | self.connected = False | |
308 | err = self.socket.connect_ex(address) | |
309 | # XXX Should interpret Winsock return values | |
310 | if err in (EINPROGRESS, EALREADY, EWOULDBLOCK): | |
311 | return | |
312 | if err in (0, EISCONN): | |
313 | self.addr = address | |
314 | self.connected = True | |
315 | self.handle_connect() | |
316 | else: | |
317 | raise socket.error, (err, errorcode[err]) | |
318 | ||
319 | def accept(self): | |
320 | # XXX can return either an address pair or None | |
321 | try: | |
322 | conn, addr = self.socket.accept() | |
323 | return conn, addr | |
324 | except socket.error, why: | |
325 | if why[0] == EWOULDBLOCK: | |
326 | pass | |
327 | else: | |
328 | raise | |
329 | ||
330 | def send(self, data): | |
331 | try: | |
332 | result = self.socket.send(data) | |
333 | return result | |
334 | except socket.error, why: | |
335 | if why[0] == EWOULDBLOCK: | |
336 | return 0 | |
337 | else: | |
338 | raise | |
339 | return 0 | |
340 | ||
341 | def recv(self, buffer_size): | |
342 | try: | |
343 | data = self.socket.recv(buffer_size) | |
344 | if not data: | |
345 | # a closed connection is indicated by signaling | |
346 | # a read condition, and having recv() return 0. | |
347 | self.handle_close() | |
348 | return '' | |
349 | else: | |
350 | return data | |
351 | except socket.error, why: | |
352 | # winsock sometimes throws ENOTCONN | |
353 | if why[0] in [ECONNRESET, ENOTCONN, ESHUTDOWN]: | |
354 | self.handle_close() | |
355 | return '' | |
356 | else: | |
357 | raise | |
358 | ||
359 | def close(self): | |
360 | self.del_channel() | |
361 | self.socket.close() | |
362 | ||
363 | # cheap inheritance, used to pass all other attribute | |
364 | # references to the underlying socket object. | |
365 | def __getattr__(self, attr): | |
366 | return getattr(self.socket, attr) | |
367 | ||
368 | # log and log_info may be overridden to provide more sophisticated | |
369 | # logging and warning methods. In general, log is for 'hit' logging | |
370 | # and 'log_info' is for informational, warning and error logging. | |
371 | ||
372 | def log(self, message): | |
373 | sys.stderr.write('log: %s\n' % str(message)) | |
374 | ||
375 | def log_info(self, message, type='info'): | |
376 | if __debug__ or type != 'info': | |
377 | print '%s: %s' % (type, message) | |
378 | ||
379 | def handle_read_event(self): | |
380 | if self.accepting: | |
381 | # for an accepting socket, getting a read implies | |
382 | # that we are connected | |
383 | if not self.connected: | |
384 | self.connected = True | |
385 | self.handle_accept() | |
386 | elif not self.connected: | |
387 | self.handle_connect() | |
388 | self.connected = True | |
389 | self.handle_read() | |
390 | else: | |
391 | self.handle_read() | |
392 | ||
393 | def handle_write_event(self): | |
394 | # getting a write implies that we are connected | |
395 | if not self.connected: | |
396 | self.handle_connect() | |
397 | self.connected = True | |
398 | self.handle_write() | |
399 | ||
400 | def handle_expt_event(self): | |
401 | self.handle_expt() | |
402 | ||
403 | def handle_error(self): | |
404 | nil, t, v, tbinfo = compact_traceback() | |
405 | ||
406 | # sometimes a user repr method will crash. | |
407 | try: | |
408 | self_repr = repr(self) | |
409 | except: | |
410 | self_repr = '<__repr__(self) failed for object at %0x>' % id(self) | |
411 | ||
412 | self.log_info( | |
413 | 'uncaptured python exception, closing channel %s (%s:%s %s)' % ( | |
414 | self_repr, | |
415 | t, | |
416 | v, | |
417 | tbinfo | |
418 | ), | |
419 | 'error' | |
420 | ) | |
421 | self.close() | |
422 | ||
423 | def handle_expt(self): | |
424 | self.log_info('unhandled exception', 'warning') | |
425 | ||
426 | def handle_read(self): | |
427 | self.log_info('unhandled read event', 'warning') | |
428 | ||
429 | def handle_write(self): | |
430 | self.log_info('unhandled write event', 'warning') | |
431 | ||
432 | def handle_connect(self): | |
433 | self.log_info('unhandled connect event', 'warning') | |
434 | ||
435 | def handle_accept(self): | |
436 | self.log_info('unhandled accept event', 'warning') | |
437 | ||
438 | def handle_close(self): | |
439 | self.log_info('unhandled close event', 'warning') | |
440 | self.close() | |
441 | ||
442 | # --------------------------------------------------------------------------- | |
443 | # adds simple buffered output capability, useful for simple clients. | |
444 | # [for more sophisticated usage use asynchat.async_chat] | |
445 | # --------------------------------------------------------------------------- | |
446 | ||
447 | class dispatcher_with_send(dispatcher): | |
448 | ||
449 | def __init__(self, sock=None, map=None): | |
450 | dispatcher.__init__(self, sock, map) | |
451 | self.out_buffer = '' | |
452 | ||
453 | def initiate_send(self): | |
454 | num_sent = 0 | |
455 | num_sent = dispatcher.send(self, self.out_buffer[:512]) | |
456 | self.out_buffer = self.out_buffer[num_sent:] | |
457 | ||
458 | def handle_write(self): | |
459 | self.initiate_send() | |
460 | ||
461 | def writable(self): | |
462 | return (not self.connected) or len(self.out_buffer) | |
463 | ||
464 | def send(self, data): | |
465 | if self.debug: | |
466 | self.log_info('sending %s' % repr(data)) | |
467 | self.out_buffer = self.out_buffer + data | |
468 | self.initiate_send() | |
469 | ||
470 | # --------------------------------------------------------------------------- | |
471 | # used for debugging. | |
472 | # --------------------------------------------------------------------------- | |
473 | ||
474 | def compact_traceback(): | |
475 | t, v, tb = sys.exc_info() | |
476 | tbinfo = [] | |
477 | assert tb # Must have a traceback | |
478 | while tb: | |
479 | tbinfo.append(( | |
480 | tb.tb_frame.f_code.co_filename, | |
481 | tb.tb_frame.f_code.co_name, | |
482 | str(tb.tb_lineno) | |
483 | )) | |
484 | tb = tb.tb_next | |
485 | ||
486 | # just to be safe | |
487 | del tb | |
488 | ||
489 | file, function, line = tbinfo[-1] | |
490 | info = ' '.join(['[%s|%s|%s]' % x for x in tbinfo]) | |
491 | return (file, function, line), t, v, info | |
492 | ||
493 | def close_all(map=None): | |
494 | if map is None: | |
495 | map = socket_map | |
496 | for x in map.values(): | |
497 | x.socket.close() | |
498 | map.clear() | |
499 | ||
500 | # Asynchronous File I/O: | |
501 | # | |
502 | # After a little research (reading man pages on various unixen, and | |
503 | # digging through the linux kernel), I've determined that select() | |
504 | # isn't meant for doing asynchronous file i/o. | |
505 | # Heartening, though - reading linux/mm/filemap.c shows that linux | |
506 | # supports asynchronous read-ahead. So _MOST_ of the time, the data | |
507 | # will be sitting in memory for us already when we go to read it. | |
508 | # | |
509 | # What other OS's (besides NT) support async file i/o? [VMS?] | |
510 | # | |
511 | # Regardless, this is useful for pipes, and stdin/stdout... | |
512 | ||
513 | if os.name == 'posix': | |
514 | import fcntl | |
515 | ||
516 | class file_wrapper: | |
517 | # here we override just enough to make a file | |
518 | # look like a socket for the purposes of asyncore. | |
519 | ||
520 | def __init__(self, fd): | |
521 | self.fd = fd | |
522 | ||
523 | def recv(self, *args): | |
524 | return os.read(self.fd, *args) | |
525 | ||
526 | def send(self, *args): | |
527 | return os.write(self.fd, *args) | |
528 | ||
529 | read = recv | |
530 | write = send | |
531 | ||
532 | def close(self): | |
533 | os.close(self.fd) | |
534 | ||
535 | def fileno(self): | |
536 | return self.fd | |
537 | ||
538 | class file_dispatcher(dispatcher): | |
539 | ||
540 | def __init__(self, fd, map=None): | |
541 | dispatcher.__init__(self, None, map) | |
542 | self.connected = True | |
543 | self.set_file(fd) | |
544 | # set it to non-blocking mode | |
545 | flags = fcntl.fcntl(fd, fcntl.F_GETFL, 0) | |
546 | flags = flags | os.O_NONBLOCK | |
547 | fcntl.fcntl(fd, fcntl.F_SETFL, flags) | |
548 | ||
549 | def set_file(self, fd): | |
550 | self._fileno = fd | |
551 | self.socket = file_wrapper(fd) | |
552 | self.add_channel() |