1 """Gives outside processes access to the Python interpreter.
2
3 Chris Nicolai 2010-2012
4
5 A dedicated thread listens on a TCP port (by default, on the loopback interface '127.0.0.1' only).
6 Client and server alternately send messages, which consist of a verb, e.g. 'EVAL', and depending
7 on the verb, sdata (string data) such as '2**5', or bytes (binary array data).
8
9 Usage (server):
10
11 >>> server = RemoteServer(host='127.0.0.1', port=55384, seek_ports=10, sockid_path=None)
12
13 Listens on the first available port in 55384..(55384+10). Writes the actual port number
14 to a text file at sockid_path, if provided. Enqueues requests for processing, but as most
15 applications require everything to run on the same thread, it requires you to call
16
17 >>> server.process_one()
18
at frequent intervals, such as on a timer. To stop the server:
22
23 >>> server.stop()
24
25
26 Usage (client):
27
28 >>> client = RemoteProxy(host='127.0.0.1', port=55384, sockid_path=None)
29
30 Connects to the server. Uses the port number in sockid_path, if it exists.
31
32 >>> import numpy
33 >>> arr = numpy.array([1,2,3,4,5,6,7,8,9,10]).reshape((2,5))
34 >>> print arr
35 [[ 1 2 3 4 5]
36 [ 6 7 8 9 10]]
37 >>> client.set_array(arr, 'arr')
38 >>> client.reval("arr[1,2]")
39 '8'
40 >>> client.rexec(\"""import numpy
41 arr2 = numpy.array(arr, copy=True)
42 arr2 *= 2
43 arr2 -= 1\""")
44 >>> local_arr2 = client.get_array('arr2')
45 >>> print local_arr2
46 [[ 1 3 5 7 9]
47 [11 13 15 17 19]]
48 >>> client.stop()
49
50
51 Protocol:
52
53 Strictly alternating messages, starting with the client. Certain messages are always multi-line, and terminated with a period on its own line. This transcript has all the possible interactions (>: client; <: server)
54
55 >: VERSION<tab>1.0
56 <: VERSION<tab>1.0
57 >: EXEC
58 >: import numpy
59 >: table = newTable()
60 >: .
61 <: DONE
62 >: EXEC
63 >: i fnot (table is None):
>:     table.setWindowLabel("tutu")
>: .
65 <: ERROR
66 <: Traceback (most recent call last):
67 <: [...]
68 <: File "<string>", line 1:
69 <: i fnot (table is None):
70 <: ^
71 <: Syntax error: invalid syntax
72 <: .
73 >: EVAL<tab>t
74 <: VAL
75 <: '<qti.Table object at 0x04cc2687>'
76 <: .
77 >: EVAL<tab>nonsense
78 <: ERROR
79 <: Traceback (most recent call last):
80 <: [...more traceback...]
81 <: .
82 >: ARRAY<tab>3<tab>4<tab>float32<tab>remote_label
83 >: [3*4*sizeof(float32) bytes of array data]
84 >: .
85 <: SET
86 >: EVAL<tab>remote_label[1,2]
87 <: VAL
88 <: 34.6
89 <: .
90 >: GET ARRAY<tab>remote_label
91 <: ARRAY<tab>3<tab>4<tab>float32<tab>remote_label
92 <: [3*4*sizeof(float32) bytes of array data]
93 <: .
94
95 Copyright 2008-2011 Research Foundation State University of New York
96 This file is part of QUB Express.
97
98 QUB Express is free software; you can redistribute it and/or modify
99 it under the terms of the GNU General Public License as published by
100 the Free Software Foundation, either version 3 of the License, or
101 (at your option) any later version.
102
103 QUB Express is distributed in the hope that it will be useful,
104 but WITHOUT ANY WARRANTY; without even the implied warranty of
105 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
106 GNU General Public License for more details.
107
108 You should have received a copy of the GNU General Public License,
109 named LICENSE.txt, in the QUB Express program directory. If not, see
110 <http://www.gnu.org/licenses/>.
111
112 """
113
114 import __main__
115 import cStringIO
116 import numpy
117 import Queue
118 import re
119 import socket
120 import select
121 import struct
122 import sys
123 from threading import *
124 import time
125 import traceback
126 import qubx.sock
127
128
129 DEFAULT_HOST = '127.0.0.1'
130 DEFAULT_PORT = 55384
131
132
134 """Represents one message with associated data.
135
136 @ivar verb: one of ['VERSION', 'EVAL', 'VAL', 'EXEC', 'DONE', 'ERROR', 'ARRAY', 'STRING', 'SET', 'GET ARRAY']
@ivar sdata: payload for VERSION, EVAL, VAL, EXEC, ERROR, GET ARRAY; metadata for ARRAY and STRING
@ivar bytes: payload for ARRAY and STRING
139 """
def __init__(self, verb=None, sdata=None, bytes=None):
    """Stores the three parts of a message; each of them defaults to None.

    @param verb: the message type, e.g. 'EVAL'
    @param sdata: string data that goes with the verb
    @param bytes: binary data that goes with the verb
    """
    self.verb, self.sdata, self.bytes = verb, sdata, bytes
144
146 """Returns a L{RemoteMsg} containing arr and its label.
147
148 verb = 'ARRAY'
149 sdata = "%(rows)i\t%(cols)i\t%(dtype)s\t%(label)s"
150 bytes = arr.tostring()
151
152 @arr: a numpy.array, or something legal for numpy.array(arr)
153 @label: remote python identifier for the array
154 """
155 try:
156 arr.dtype
157 except AttributeError:
158 arr = numpy.array(arr)
159 if len(arr.shape) == 1:
160 return RemoteMsg('ARRAY', '%i\t%i\t%s\t%s' % (arr.shape[0], 1, str(arr.dtype), label), arr.tostring())
161 elif len(arr.shape) == 2:
162 return RemoteMsg('ARRAY', '%i\t%i\t%s\t%s' % (arr.shape[0], arr.shape[1], str(arr.dtype), label), arr.tostring())
163 else:
164 raise Exception('Unsupported array dimension: %s (need 1D or 2D)' % arr.shape)
165
167 """Returns (label, arr) contained in a L{RemoteArray} message."""
168 dims = re.match(r"([01-9]+)\t([01-9]+)\t([^\t]+)\t?(.*)", msg.sdata)
169 if dims:
170 rows, cols, dtype, label = [dims.group(i) for i in (1,2,3,4)]
171 rows, cols = int(rows), int(cols)
172 return label, numpy.frombuffer(msg.bytes, dtype, rows*cols).reshape((rows, cols))
173 else:
174 raise Exception('bad array format: %s' % msg.sdata)
175
176
178 """Returns a L{RemoteMsg} containing s and its label.
179
verb = 'STRING'
181 sdata = "%(len(s))s\t%(label)s"
182 bytes = s
183
184 @s: a string
@label: remote python identifier for the string
186 """
187 return RemoteMsg('STRING', '%i\t%s' % (len(s), label), s)
188
190 dims = re.match(r"([01-9]+)\t(.*)", msg.sdata)
191 if dims:
192 return dims.group(2), msg.bytes
193 else:
194 raise Exception('bad array format: %s' % msg.sdata)
195
196
198 """Reads and writes L{RemoteMsg}s."""
204 return msg.verb is None
206 """Sends a L{RemoteMsg}. Does not catch any potential socket.error s."""
207 if msg.verb in ('EXEC', 'VAL', 'ERROR', 'SET', 'DONE'):
208 self.sendBlock('%s\n' % msg.verb)
209 else:
210 self.sendBlock('%s\t%s\n' % (msg.verb, msg.sdata))
211 if msg.verb in ('EXEC', 'VAL', 'ERROR'):
212 self.sendBlock('%s\n.\n' % msg.sdata)
213 elif (msg.verb == 'STRING') or msg.bytes:
214 self.sendBlock('%s\n.\n' % msg.bytes)
216 """Don't call this directly; use L{Socket.recvMsg}."""
217 while True:
218 try:
219 line = self.readLine()
220 fields = re.match(r"([^\t]+)\t?(.*)", line)
221 if not fields: continue
222 verb = fields.group(1).strip()
223 sdata = fields.group(2).replace('\r', '')
224 msg = RemoteMsg(verb, sdata)
225 if verb in ('EXEC', 'VAL', 'ERROR'):
226 script = cStringIO.StringIO()
227 while True:
228 line = self.readLine().replace('\r', '')
229 if line.strip() == '.':
230 break
231 script.write(line)
232 msg = RemoteMsg(verb, script.getvalue()[:-1])
233 elif verb == 'STRING':
234 dims = re.match(r"([0-9]+)\t(.*)", sdata)
235 if dims:
236 n = int(dims.group(1))
237 bytes = self.readBlock(n) if n else ""
238 msg = RemoteMsg(verb, sdata, bytes)
239 while self.readLine().strip() != '.':
240 pass
241 elif verb == 'ARRAY':
242 dims = re.match(r"([0-9]+)\t([0-9]+)\t([^\t]+)\t?(.*)", sdata)
243 if dims:
244 rows, cols, dtype, label = [dims.group(i) for i in (1,2,3,4)]
245 rows, cols = int(rows), int(cols)
246 bytes = self.readBlock(rows*cols*numpy.dtype(dtype).itemsize)
247 msg = RemoteMsg(verb, sdata, bytes)
248 while self.readLine().strip() != '.':
249 pass
250 return msg
251 except socket.error:
252 return RemoteMsg()
253 except KeyboardInterrupt:
254 raise
255 except:
256 return RemoteMsg('ERROR', traceback.format_exc())
257
258
260 """Handles the server side of one connection.
261 Enqueues L{RemoteMsg}s for processing on its own thread;
262 you call session.process_one() frequently on the appropriate thread.
263 """
264 - def __init__(self, sock, async=True, main_globals=None):
265 RemoteConnect.__init__(self, sock, async)
266 self.globals = __main__.__dict__ if (main_globals is None) else main_globals
268 try:
269 exec(msg.sdata, self.globals, self.globals)
270 self.sendMsg(RemoteMsg('DONE'))
271 except:
272 self.sendMsg(RemoteMsg('ERROR', traceback.format_exc()))
287 try:
288 arr = self.globals[msg.sdata]
289 self.sendMsg(RemoteArray(arr, msg.sdata))
290 except:
291 self.sendMsg(RemoteMsg('ERROR', traceback.format_exc()))
def process_one(self, block=False, run_on_thread=lambda f: f()):
    """Handles at most one pending message and answers it.

    @param block: passed to recvMsg as its block argument
    @param run_on_thread: called with a zero-argument function that does the work for
        EXEC, EVAL, ARRAY, STRING and GET ARRAY; the default simply calls it right away
    @return: False if recvMsg raised Queue.Empty (nothing was waiting), otherwise True
    """
    try:
        msg = self.recvMsg(block=block)
    except Queue.Empty:
        return False

    # verb -> name of the method that answers it; the method itself is only
    # looked up when the job runs, exactly as a direct self.do_xxx(msg) call would
    handler_names = {
        'EXEC': 'do_exec',
        'EVAL': 'do_eval',
        'ARRAY': 'do_array',
        'STRING': 'do_string',
        'GET ARRAY': 'do_get_array',
    }
    try:
        verb = msg.verb
        if verb in handler_names:
            run_on_thread(lambda: getattr(self, handler_names[verb])(msg))
        elif verb == 'ERROR':
            # ERROR messages are sent back unchanged
            self.sendMsg(msg)
        elif verb == 'VERSION':
            self.sendMsg(RemoteMsg('VERSION', '1.1'))
        else:
            self.sendMsg(RemoteMsg('ERROR', 'unknown verb: %s'%msg.verb))
    except socket.error:
        # the connection is gone; shut this session down
        self.stop()
    except KeyboardInterrupt:
        raise
    except:
        self.sendMsg(RemoteMsg('ERROR', traceback.format_exc()))
    return True
323
324
326 """Manages servant sockets, wraps with RemoteSessions; you call .process_one() to get their work done.
327
328 To clean up (close remaining servants): .stop()
329
330 @ivar process_one:
331 """
def __init__(self, blocking=False, run_on_thread=lambda f: f(), main_globals=None):
    """Starts out with no connections and remembers how their jobs should be run.

    @param blocking: kept as self.blocking
    @param run_on_thread: kept as self.run_on_thread
    @param main_globals: kept as self.main_globals
    """
    self.main_globals = main_globals
    self.run_on_thread = run_on_thread
    self.blocking = blocking
    self.conns = []
    # process_one is the bound .next of the scheduling generator,
    # so each call advances that generator to its next yield
    self.process_one = self.__process().next
342 sess = RemoteSession(client, main_globals=self.main_globals)
343 self.conns.append(sess)
344 return sess
346 """Generator function to iterate over active connections; yielding after one job, or a full circle."""
347 since_yield = 0
348 while True:
349 if not self.conns:
350 yield False
351 for i in reversed(xrange(len(self.conns))):
352 if (since_yield == len(self.conns)) or self.conns[i].process_one(block=self.blocking, run_on_thread=self.run_on_thread):
353 yield True
354 since_yield = 0
355 else:
356 since_yield += 1
357 if self.conns[i].done():
358 del self.conns[i]
360 """Cleans up clients."""
361 for conn in self.conns:
362 conn.stop()
363
364
366 """Listens on the first available port in port..(port+seek_ports). Writes the actual port number
367 to a text file at sockid_path, if provided. One L{RemoteSession} per client enqueues requests for processing,
368 but as most applications require everything to run on the same thread, you must call
369
370 >>> server.process_one()
371
372 at frequent intervals, such as on a timer. To stop the server:
373
374 >>> server.stop()
375
376 @ivar process_one:
377 """
def __init__(self, host='127.0.0.1', port=55384, seek_ports=10, sockid_path=None,
             blocking=False, run_on_thread=lambda f: f(), main_globals=None):
    """Starts the TCP server and, if asked, records the port it ended up on.

    Raises socket.error if no ports available.

    @param host: passed to qubx.sock.TCPServer.__init__
    @param port: passed to qubx.sock.TCPServer.__init__
    @param seek_ports: passed to qubx.sock.TCPServer.__init__
    @param sockid_path: if given, self.port is written to this text file, followed by a newline
    @param blocking: passed to RemoteRunner
    @param run_on_thread: passed to RemoteRunner
    @param main_globals: passed to RemoteRunner
    """
    self.runner = RemoteRunner(blocking, run_on_thread, main_globals)
    qubx.sock.TCPServer.__init__(self, host, port,
                                 lambda client, addr: self.runner.manage(client),
                                 seek_ports)
    if sockid_path:
        # Best effort: a failure to write the file is reported but does not stop the server.
        try:
            # 'with' closes the file even if write() fails
            with open(sockid_path, 'w') as sockid_file:
                sockid_file.write("%i\n" % self.port)
        except Exception:
            traceback.print_exc()
395 """Stops the listening thread and closes the socket."""
396 qubx.sock.TCPServer.stop(self)
397 self.runner.stop()
398 print 'Remote control stopped.'
399
400
401
403 """Contains the traceback from an exception on the remote side."""
405 Exception.__init__(self, 'Remote error: %s' % traceback)
406
407
413 - def reval(self, expr, block=True, timeout=None):
def rexec(self, buf, block=True, timeout=None):
    """Sends the Python script in buf as an EXEC message, then checks the reply. Can raise RemoteError.

    @param buf: script text to execute on the remote side
    @param block: passed on to the reply check
    @param timeout: passed on to the reply check
    """
    request = RemoteMsg('EXEC', buf)
    self.conn.sendMsg(request)
    self.__check_error(block, timeout)
def set_array(self, arr, label, block=True, timeout=None):
    """Ships arr to the remote side as an ARRAY message named label, then checks the reply.

    @param arr: a numpy.array (or whatever RemoteArray accepts)
    @param label: remote identifier the array is assigned to
    @param block: passed on to the reply check
    @param timeout: passed on to the reply check
    """
    request = RemoteArray(arr, label)
    self.conn.sendMsg(request)
    self.__check_error(block, timeout)
def set_string(self, s, label, block=True, timeout=None):
    """Ships the string s to the remote side as a STRING message named label, then checks the reply.

    @param s: the string to send
    @param label: remote identifier the string is assigned to
    @param block: passed on to the reply check
    @param timeout: passed on to the reply check
    """
    request = RemoteString(s, label)
    self.conn.sendMsg(request)
    self.__check_error(block, timeout)
def get_array(self, label, block=True, timeout=None):
    """Asks the remote side for the array named label and returns it. Can raise RemoteError.

    @param label: remote identifier of the array
    @param block: passed on to the reply check
    @param timeout: passed on to the reply check
    @return: the array decoded from the reply by RemoteToArray
    """
    request = RemoteMsg('GET ARRAY', label)
    self.conn.sendMsg(request)
    reply = self.__check_error(block, timeout)
    # RemoteToArray gives back (label, arr); only the array is wanted here
    decoded = RemoteToArray(reply)
    return decoded[1]
441 """Closes the connection to L{RemoteServer}."""
442 self.conn.stop()
443
def RemoteProxy(host='127.0.0.1', port=55384, sockid_path=None):
    """Connects to the server and returns a RemoteController wrapping the connection.

    @param host: server address, passed to qubx.sock.connect_as_client
    @param port: port to use when sockid_path is not given or cannot be read
    @param sockid_path: optional text file whose first line holds the server's port number;
        it takes precedence over port when it can be read and parsed
    """
    rport = port
    if sockid_path:
        try:
            # 'with' makes sure the file is closed again, whatever happens while reading
            with open(sockid_path, 'r') as sockid_file:
                rport = int(sockid_file.readline().strip())
        except Exception:
            # Missing or malformed file: report it and carry on with the port argument.
            traceback.print_exc()
            print('No/bad sockid file (%s); falling back on port %i' % (sockid_path, rport))
    return RemoteController(qubx.sock.connect_as_client(host, rport))
454
455
457 """Listens on the first available port in port..(port+seek_ports). Writes the actual port number
458 to a text file at sockid_path, if provided. Wraps clients with RemoteController, given via callback on_connect
459
460 @ivar process_one:
461
462 TODO: default host='' is not compatible with ipv6
463 """
def __init__(self, on_connect=lambda controller: None,
             host='', port=25254, seek_ports=10, sockid_path=None,
             blocking=False, run_on_thread=lambda f: f(), main_globals=None):
    """Starts the TCP server and, if asked, records the port it ended up on.

    Raises socket.error if no ports available.

    @param on_connect: callback kept for use when a client connects
    @param host: passed to qubx.sock.TCPServer.__init__
    @param port: passed to qubx.sock.TCPServer.__init__
    @param seek_ports: passed to qubx.sock.TCPServer.__init__
    @param sockid_path: if given, self.port is written to this text file, followed by a newline
    @param blocking: kept as self.blocking
    @param run_on_thread: kept as self.run_on_thread
    @param main_globals: accepted but not used by this constructor
    """
    qubx.sock.TCPServer.__init__(self, host, port, self.__connected, seek_ports)
    self.__on_connect = on_connect
    self.blocking = blocking
    self.run_on_thread = run_on_thread
    if sockid_path:
        # Best effort: a failure to write the file is reported but does not stop the server.
        try:
            # 'with' closes the file even if write() fails
            with open(sockid_path, 'w') as sockid_file:
                sockid_file.write("%i\n" % self.port)
        except Exception:
            traceback.print_exc()
483 """Stops the listening thread and closes the socket."""
484 qubx.sock.TCPServer.stop(self)
485 print 'Remote command stopped.'
486
487
if __name__ == "__main__":
    # Stand-alone server. An optional first command-line argument names the one port
    # to listen on; without it, DEFAULT_PORT and the 10 ports after it are tried.
    import os
    import tempfile

    SERVER_TIMER_SEC = 0.2
    SOCKID_NAME = 'remote_ctl.sockid'

    sockid_path = os.path.join(tempfile.gettempdir(), SOCKID_NAME)
    server = RemoteServer(port=(int(sys.argv[1]) if len(sys.argv) > 1 else DEFAULT_PORT),
                          seek_ports=(0 if len(sys.argv) > 1 else 10),
                          sockid_path=sockid_path)
    try:
        # poll for work on this thread until interrupted
        while True:
            server.process_one()
            time.sleep(SERVER_TIMER_SEC)
    except KeyboardInterrupt:
        pass
    except:
        traceback.print_exc()

    server.stop()
511