Initial commit of OpenSPARC T2 design and verification files.
[OpenSPARC-T2-DV] / tools / src / nas,5.n2.os.2 / lib / python / lib / python2.4 / idlelib / rpc.py
CommitLineData
86530b38
AT
1"""RPC Implemention, originally written for the Python Idle IDE
2
3For security reasons, GvR requested that Idle's Python execution server process
4connect to the Idle process, which listens for the connection. Since Idle has
5has only one client per server, this was not a limitation.
6
7 +---------------------------------+ +-------------+
8 | SocketServer.BaseRequestHandler | | SocketIO |
9 +---------------------------------+ +-------------+
10 ^ | register() |
11 | | unregister()|
12 | +-------------+
13 | ^ ^
14 | | |
15 | + -------------------+ |
16 | | |
17 +-------------------------+ +-----------------+
18 | RPCHandler | | RPCClient |
19 | [attribute of RPCServer]| | |
20 +-------------------------+ +-----------------+
21
22The RPCServer handler class is expected to provide register/unregister methods.
23RPCHandler inherits the mix-in class SocketIO, which provides these methods.
24
25See the Idle run.main() docstring for further information on how this was
26accomplished in Idle.
27
28"""
29
30import sys
31import os
32import socket
33import select
34import SocketServer
35import struct
36import cPickle as pickle
37import threading
38import Queue
39import traceback
40import copy_reg
41import types
42import marshal
43
44
45def unpickle_code(ms):
46 co = marshal.loads(ms)
47 assert isinstance(co, types.CodeType)
48 return co
49
50def pickle_code(co):
51 assert isinstance(co, types.CodeType)
52 ms = marshal.dumps(co)
53 return unpickle_code, (ms,)
54
55# XXX KBK 24Aug02 function pickling capability not used in Idle
56# def unpickle_function(ms):
57# return ms
58
59# def pickle_function(fn):
60# assert isinstance(fn, type.FunctionType)
61# return repr(fn)
62
63copy_reg.pickle(types.CodeType, pickle_code, unpickle_code)
64# copy_reg.pickle(types.FunctionType, pickle_function, unpickle_function)
65
66BUFSIZE = 8*1024
67LOCALHOST = '127.0.0.1'
68
69class RPCServer(SocketServer.TCPServer):
70
71 def __init__(self, addr, handlerclass=None):
72 if handlerclass is None:
73 handlerclass = RPCHandler
74 SocketServer.TCPServer.__init__(self, addr, handlerclass)
75
76 def server_bind(self):
77 "Override TCPServer method, no bind() phase for connecting entity"
78 pass
79
80 def server_activate(self):
81 """Override TCPServer method, connect() instead of listen()
82
83 Due to the reversed connection, self.server_address is actually the
84 address of the Idle Client to which we are connecting.
85
86 """
87 self.socket.connect(self.server_address)
88
89 def get_request(self):
90 "Override TCPServer method, return already connected socket"
91 return self.socket, self.server_address
92
93 def handle_error(self, request, client_address):
94 """Override TCPServer method
95
96 Error message goes to __stderr__. No error message if exiting
97 normally or socket raised EOF. Other exceptions not handled in
98 server code will cause os._exit.
99
100 """
101 try:
102 raise
103 except SystemExit:
104 raise
105 except:
106 erf = sys.__stderr__
107 print>>erf, '\n' + '-'*40
108 print>>erf, 'Unhandled server exception!'
109 print>>erf, 'Thread: %s' % threading.currentThread().getName()
110 print>>erf, 'Client Address: ', client_address
111 print>>erf, 'Request: ', repr(request)
112 traceback.print_exc(file=erf)
113 print>>erf, '\n*** Unrecoverable, server exiting!'
114 print>>erf, '-'*40
115 os._exit(0)
116
117#----------------- end class RPCServer --------------------
118
119objecttable = {}
120request_queue = Queue.Queue(0)
121response_queue = Queue.Queue(0)
122
123
124class SocketIO:
125
126 nextseq = 0
127
128 def __init__(self, sock, objtable=None, debugging=None):
129 self.sockthread = threading.currentThread()
130 if debugging is not None:
131 self.debugging = debugging
132 self.sock = sock
133 if objtable is None:
134 objtable = objecttable
135 self.objtable = objtable
136 self.responses = {}
137 self.cvars = {}
138
139 def close(self):
140 sock = self.sock
141 self.sock = None
142 if sock is not None:
143 sock.close()
144
145 def exithook(self):
146 "override for specific exit action"
147 os._exit()
148
149 def debug(self, *args):
150 if not self.debugging:
151 return
152 s = self.location + " " + str(threading.currentThread().getName())
153 for a in args:
154 s = s + " " + str(a)
155 print>>sys.__stderr__, s
156
157 def register(self, oid, object):
158 self.objtable[oid] = object
159
160 def unregister(self, oid):
161 try:
162 del self.objtable[oid]
163 except KeyError:
164 pass
165
166 def localcall(self, seq, request):
167 self.debug("localcall:", request)
168 try:
169 how, (oid, methodname, args, kwargs) = request
170 except TypeError:
171 return ("ERROR", "Bad request format")
172 if not self.objtable.has_key(oid):
173 return ("ERROR", "Unknown object id: %r" % (oid,))
174 obj = self.objtable[oid]
175 if methodname == "__methods__":
176 methods = {}
177 _getmethods(obj, methods)
178 return ("OK", methods)
179 if methodname == "__attributes__":
180 attributes = {}
181 _getattributes(obj, attributes)
182 return ("OK", attributes)
183 if not hasattr(obj, methodname):
184 return ("ERROR", "Unsupported method name: %r" % (methodname,))
185 method = getattr(obj, methodname)
186 try:
187 if how == 'CALL':
188 ret = method(*args, **kwargs)
189 if isinstance(ret, RemoteObject):
190 ret = remoteref(ret)
191 return ("OK", ret)
192 elif how == 'QUEUE':
193 request_queue.put((seq, (method, args, kwargs)))
194 return("QUEUED", None)
195 else:
196 return ("ERROR", "Unsupported message type: %s" % how)
197 except SystemExit:
198 raise
199 except socket.error:
200 raise
201 except:
202 self.debug("localcall:EXCEPTION")
203 traceback.print_exc(file=sys.__stderr__)
204 return ("EXCEPTION", None)
205
206 def remotecall(self, oid, methodname, args, kwargs):
207 self.debug("remotecall:asynccall: ", oid, methodname)
208 seq = self.asynccall(oid, methodname, args, kwargs)
209 return self.asyncreturn(seq)
210
211 def remotequeue(self, oid, methodname, args, kwargs):
212 self.debug("remotequeue:asyncqueue: ", oid, methodname)
213 seq = self.asyncqueue(oid, methodname, args, kwargs)
214 return self.asyncreturn(seq)
215
216 def asynccall(self, oid, methodname, args, kwargs):
217 request = ("CALL", (oid, methodname, args, kwargs))
218 seq = self.newseq()
219 if threading.currentThread() != self.sockthread:
220 cvar = threading.Condition()
221 self.cvars[seq] = cvar
222 self.debug(("asynccall:%d:" % seq), oid, methodname, args, kwargs)
223 self.putmessage((seq, request))
224 return seq
225
226 def asyncqueue(self, oid, methodname, args, kwargs):
227 request = ("QUEUE", (oid, methodname, args, kwargs))
228 seq = self.newseq()
229 if threading.currentThread() != self.sockthread:
230 cvar = threading.Condition()
231 self.cvars[seq] = cvar
232 self.debug(("asyncqueue:%d:" % seq), oid, methodname, args, kwargs)
233 self.putmessage((seq, request))
234 return seq
235
236 def asyncreturn(self, seq):
237 self.debug("asyncreturn:%d:call getresponse(): " % seq)
238 response = self.getresponse(seq, wait=0.05)
239 self.debug(("asyncreturn:%d:response: " % seq), response)
240 return self.decoderesponse(response)
241
242 def decoderesponse(self, response):
243 how, what = response
244 if how == "OK":
245 return what
246 if how == "QUEUED":
247 return None
248 if how == "EXCEPTION":
249 self.debug("decoderesponse: EXCEPTION")
250 return None
251 if how == "EOF":
252 self.debug("decoderesponse: EOF")
253 self.decode_interrupthook()
254 return None
255 if how == "ERROR":
256 self.debug("decoderesponse: Internal ERROR:", what)
257 raise RuntimeError, what
258 raise SystemError, (how, what)
259
260 def decode_interrupthook(self):
261 ""
262 raise EOFError
263
264 def mainloop(self):
265 """Listen on socket until I/O not ready or EOF
266
267 pollresponse() will loop looking for seq number None, which
268 never comes, and exit on EOFError.
269
270 """
271 try:
272 self.getresponse(myseq=None, wait=0.05)
273 except EOFError:
274 self.debug("mainloop:return")
275 return
276
277 def getresponse(self, myseq, wait):
278 response = self._getresponse(myseq, wait)
279 if response is not None:
280 how, what = response
281 if how == "OK":
282 response = how, self._proxify(what)
283 return response
284
285 def _proxify(self, obj):
286 if isinstance(obj, RemoteProxy):
287 return RPCProxy(self, obj.oid)
288 if isinstance(obj, types.ListType):
289 return map(self._proxify, obj)
290 # XXX Check for other types -- not currently needed
291 return obj
292
293 def _getresponse(self, myseq, wait):
294 self.debug("_getresponse:myseq:", myseq)
295 if threading.currentThread() is self.sockthread:
296 # this thread does all reading of requests or responses
297 while 1:
298 response = self.pollresponse(myseq, wait)
299 if response is not None:
300 return response
301 else:
302 # wait for notification from socket handling thread
303 cvar = self.cvars[myseq]
304 cvar.acquire()
305 while not self.responses.has_key(myseq):
306 cvar.wait()
307 response = self.responses[myseq]
308 self.debug("_getresponse:%s: thread woke up: response: %s" %
309 (myseq, response))
310 del self.responses[myseq]
311 del self.cvars[myseq]
312 cvar.release()
313 return response
314
315 def newseq(self):
316 self.nextseq = seq = self.nextseq + 2
317 return seq
318
319 def putmessage(self, message):
320 self.debug("putmessage:%d:" % message[0])
321 try:
322 s = pickle.dumps(message)
323 except pickle.PicklingError:
324 print >>sys.__stderr__, "Cannot pickle:", repr(message)
325 raise
326 s = struct.pack("<i", len(s)) + s
327 while len(s) > 0:
328 try:
329 r, w, x = select.select([], [self.sock], [])
330 n = self.sock.send(s[:BUFSIZE])
331 except (AttributeError, socket.error):
332 # socket was closed
333 raise IOError
334 else:
335 s = s[n:]
336
337 buffer = ""
338 bufneed = 4
339 bufstate = 0 # meaning: 0 => reading count; 1 => reading data
340
341 def pollpacket(self, wait):
342 self._stage0()
343 if len(self.buffer) < self.bufneed:
344 r, w, x = select.select([self.sock.fileno()], [], [], wait)
345 if len(r) == 0:
346 return None
347 try:
348 s = self.sock.recv(BUFSIZE)
349 except socket.error:
350 raise EOFError
351 if len(s) == 0:
352 raise EOFError
353 self.buffer += s
354 self._stage0()
355 return self._stage1()
356
357 def _stage0(self):
358 if self.bufstate == 0 and len(self.buffer) >= 4:
359 s = self.buffer[:4]
360 self.buffer = self.buffer[4:]
361 self.bufneed = struct.unpack("<i", s)[0]
362 self.bufstate = 1
363
364 def _stage1(self):
365 if self.bufstate == 1 and len(self.buffer) >= self.bufneed:
366 packet = self.buffer[:self.bufneed]
367 self.buffer = self.buffer[self.bufneed:]
368 self.bufneed = 4
369 self.bufstate = 0
370 return packet
371
372 def pollmessage(self, wait):
373 packet = self.pollpacket(wait)
374 if packet is None:
375 return None
376 try:
377 message = pickle.loads(packet)
378 except pickle.UnpicklingError:
379 print >>sys.__stderr__, "-----------------------"
380 print >>sys.__stderr__, "cannot unpickle packet:", repr(packet)
381 traceback.print_stack(file=sys.__stderr__)
382 print >>sys.__stderr__, "-----------------------"
383 raise
384 return message
385
386 def pollresponse(self, myseq, wait):
387 """Handle messages received on the socket.
388
389 Some messages received may be asynchronous 'call' or 'queue' requests,
390 and some may be responses for other threads.
391
392 'call' requests are passed to self.localcall() with the expectation of
393 immediate execution, during which time the socket is not serviced.
394
395 'queue' requests are used for tasks (which may block or hang) to be
396 processed in a different thread. These requests are fed into
397 request_queue by self.localcall(). Responses to queued requests are
398 taken from response_queue and sent across the link with the associated
399 sequence numbers. Messages in the queues are (sequence_number,
400 request/response) tuples and code using this module removing messages
401 from the request_queue is responsible for returning the correct
402 sequence number in the response_queue.
403
404 pollresponse() will loop until a response message with the myseq
405 sequence number is received, and will save other responses in
406 self.responses and notify the owning thread.
407
408 """
409 while 1:
410 # send queued response if there is one available
411 try:
412 qmsg = response_queue.get(0)
413 except Queue.Empty:
414 pass
415 else:
416 seq, response = qmsg
417 message = (seq, ('OK', response))
418 self.putmessage(message)
419 # poll for message on link
420 try:
421 message = self.pollmessage(wait)
422 if message is None: # socket not ready
423 return None
424 except EOFError:
425 self.handle_EOF()
426 return None
427 except AttributeError:
428 return None
429 seq, resq = message
430 how = resq[0]
431 self.debug("pollresponse:%d:myseq:%s" % (seq, myseq))
432 # process or queue a request
433 if how in ("CALL", "QUEUE"):
434 self.debug("pollresponse:%d:localcall:call:" % seq)
435 response = self.localcall(seq, resq)
436 self.debug("pollresponse:%d:localcall:response:%s"
437 % (seq, response))
438 if how == "CALL":
439 self.putmessage((seq, response))
440 elif how == "QUEUE":
441 # don't acknowledge the 'queue' request!
442 pass
443 continue
444 # return if completed message transaction
445 elif seq == myseq:
446 return resq
447 # must be a response for a different thread:
448 else:
449 cv = self.cvars.get(seq, None)
450 # response involving unknown sequence number is discarded,
451 # probably intended for prior incarnation of server
452 if cv is not None:
453 cv.acquire()
454 self.responses[seq] = resq
455 cv.notify()
456 cv.release()
457 continue
458
459 def handle_EOF(self):
460 "action taken upon link being closed by peer"
461 self.EOFhook()
462 self.debug("handle_EOF")
463 for key in self.cvars:
464 cv = self.cvars[key]
465 cv.acquire()
466 self.responses[key] = ('EOF', None)
467 cv.notify()
468 cv.release()
469 # call our (possibly overridden) exit function
470 self.exithook()
471
472 def EOFhook(self):
473 "Classes using rpc client/server can override to augment EOF action"
474 pass
475
476#----------------- end class SocketIO --------------------
477
478class RemoteObject:
479 # Token mix-in class
480 pass
481
482def remoteref(obj):
483 oid = id(obj)
484 objecttable[oid] = obj
485 return RemoteProxy(oid)
486
487class RemoteProxy:
488
489 def __init__(self, oid):
490 self.oid = oid
491
492class RPCHandler(SocketServer.BaseRequestHandler, SocketIO):
493
494 debugging = False
495 location = "#S" # Server
496
497 def __init__(self, sock, addr, svr):
498 svr.current_handler = self ## cgt xxx
499 SocketIO.__init__(self, sock)
500 SocketServer.BaseRequestHandler.__init__(self, sock, addr, svr)
501
502 def handle(self):
503 "handle() method required by SocketServer"
504 self.mainloop()
505
506 def get_remote_proxy(self, oid):
507 return RPCProxy(self, oid)
508
509class RPCClient(SocketIO):
510
511 debugging = False
512 location = "#C" # Client
513
514 nextseq = 1 # Requests coming from the client are odd numbered
515
516 def __init__(self, address, family=socket.AF_INET, type=socket.SOCK_STREAM):
517 self.listening_sock = socket.socket(family, type)
518 self.listening_sock.setsockopt(socket.SOL_SOCKET,
519 socket.SO_REUSEADDR, 1)
520 self.listening_sock.bind(address)
521 self.listening_sock.listen(1)
522
523 def accept(self):
524 working_sock, address = self.listening_sock.accept()
525 if self.debugging:
526 print>>sys.__stderr__, "****** Connection request from ", address
527 if address[0] == LOCALHOST:
528 SocketIO.__init__(self, working_sock)
529 else:
530 print>>sys.__stderr__, "** Invalid host: ", address
531 raise socket.error
532
533 def get_remote_proxy(self, oid):
534 return RPCProxy(self, oid)
535
536class RPCProxy:
537
538 __methods = None
539 __attributes = None
540
541 def __init__(self, sockio, oid):
542 self.sockio = sockio
543 self.oid = oid
544
545 def __getattr__(self, name):
546 if self.__methods is None:
547 self.__getmethods()
548 if self.__methods.get(name):
549 return MethodProxy(self.sockio, self.oid, name)
550 if self.__attributes is None:
551 self.__getattributes()
552 if not self.__attributes.has_key(name):
553 raise AttributeError, name
554
555 def __getattributes(self):
556 self.__attributes = self.sockio.remotecall(self.oid,
557 "__attributes__", (), {})
558
559 def __getmethods(self):
560 self.__methods = self.sockio.remotecall(self.oid,
561 "__methods__", (), {})
562
563def _getmethods(obj, methods):
564 # Helper to get a list of methods from an object
565 # Adds names to dictionary argument 'methods'
566 for name in dir(obj):
567 attr = getattr(obj, name)
568 if callable(attr):
569 methods[name] = 1
570 if type(obj) == types.InstanceType:
571 _getmethods(obj.__class__, methods)
572 if type(obj) == types.ClassType:
573 for super in obj.__bases__:
574 _getmethods(super, methods)
575
576def _getattributes(obj, attributes):
577 for name in dir(obj):
578 attr = getattr(obj, name)
579 if not callable(attr):
580 attributes[name] = 1
581
582class MethodProxy:
583
584 def __init__(self, sockio, oid, name):
585 self.sockio = sockio
586 self.oid = oid
587 self.name = name
588
589 def __call__(self, *args, **kwargs):
590 value = self.sockio.remotecall(self.oid, self.name, args, kwargs)
591 return value
592
593
594# XXX KBK 09Sep03 We need a proper unit test for this module. Previously
595# existing test code was removed at Rev 1.27.