Package qubx :: Module remote_ctl
[hide private]
[frames] | no frames]

Source Code for Module qubx.remote_ctl

  1  """Gives outside processes access to the Python interpreter. 
  2   
  3  Chris Nicolai 2010-2012 
  4   
  5  A dedicated thread listens on a TCP port (by default, on the loopback interface '127.0.0.1' only). 
  6  Client and server alternately send messages, which consist of a verb, e.g. 'EVAL', and depending 
  7  on the verb, sdata (string data) such as '2**5', or bytes (binary array data). 
  8   
  9  Usage (server): 
 10   
 11      >>> server = RemoteServer(host='127.0.0.1', port=55384, seek_ports=10, sockid_path=None) 
 12   
 13  Listens on the first available port in 55384..(55384+10).  Writes the actual port number 
 14  to a text file at sockid_path, if provided.  Enqueues requests for processing, but as most 
 15  applications require everything to run on the same thread, it requires you to call 
 16   
 17      >>> server.process_one() 
 18   
 19  at frequent intervals, such as on a timer.  To stop the server: 
 22   
 23      >>> server.stop() 
 24   
 25   
 26  Usage (client): 
 27   
 28      >>> client = RemoteProxy(host='127.0.0.1', port=55384, sockid_path=None) 
 29   
 30  Connects to the server.  Uses the port number in sockid_path, if it exists. 
 31   
 32      >>> import numpy 
 33      >>> arr = numpy.array([1,2,3,4,5,6,7,8,9,10]).reshape((2,5)) 
 34      >>> print arr 
 35      [[ 1  2  3  4  5] 
 36       [ 6  7  8  9 10]] 
 37      >>> client.set_array(arr, 'arr') 
 38      >>> client.reval("arr[1,2]") 
 39      '8' 
 40      >>> client.rexec(\"""import numpy 
 41      arr2 = numpy.array(arr, copy=True) 
 42      arr2 *= 2 
 43      arr2 -= 1\""") 
 44      >>> local_arr2 = client.get_array('arr2') 
 45      >>> print local_arr2 
 46      [[ 1  3  5  7  9] 
 47       [11 13 15 17 19]] 
 48      >>> client.stop() 
 49   
 50   
 51  Protocol: 
 52   
 53  Strictly alternating messages, starting with the client.  Certain messages are always multi-line, and terminated with a period on its own line.  This transcript has all the possible interactions (>: client; <: server) 
 54   
 55      >: VERSION<tab>1.0 
 56      <: VERSION<tab>1.0 
 57      >: EXEC 
 58      >: import numpy 
 59      >: table = newTable() 
 60      >: . 
 61      <: DONE 
 62      >: EXEC 
 63      >: i fnot (table is None): 
 64      >:     table.setWindowLabel("tutu") 
         >: . 
 65      <: ERROR 
 66      <: Traceback (most recent call last): 
 67      <: [...] 
 68      <:   File "<string>", line 1: 
 69      <:     i fnot (table is None): 
 70      <:          ^ 
 71      <: Syntax error: invalid syntax 
 72      <: . 
 73      >: EVAL<tab>t 
 74      <: VAL 
 75      <: '<qti.Table object at 0x04cc2687>' 
 76      <: . 
 77      >: EVAL<tab>nonsense 
 78      <: ERROR 
 79      <: Traceback (most recent call last): 
 80      <: [...more traceback...] 
 81      <: . 
 82      >: ARRAY<tab>3<tab>4<tab>float32<tab>remote_label 
 83      >: [3*4*sizeof(float32) bytes of array data] 
 84      >: . 
 85      <: SET 
 86      >: EVAL<tab>remote_label[1,2] 
 87      <: VAL 
 88      <: 34.6 
 89      <: . 
 90      >: GET ARRAY<tab>remote_label 
 91      <: ARRAY<tab>3<tab>4<tab>float32<tab>remote_label 
 92      <: [3*4*sizeof(float32) bytes of array data] 
 93      <: . 
 94   
 95  Copyright 2008-2011 Research Foundation State University of New York  
 96  This file is part of QUB Express.                                           
 97   
 98  QUB Express is free software; you can redistribute it and/or modify           
 99  it under the terms of the GNU General Public License as published by  
100  the Free Software Foundation, either version 3 of the License, or     
101  (at your option) any later version.                                   
102   
103  QUB Express is distributed in the hope that it will be useful,                
104  but WITHOUT ANY WARRANTY; without even the implied warranty of        
105  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the         
106  GNU General Public License for more details.                          
107   
108  You should have received a copy of the GNU General Public License,    
109  named LICENSE.txt, in the QUB Express program directory.  If not, see         
110  <http://www.gnu.org/licenses/>.                                       
111   
112  """ 
113   
114  import __main__ 
115  import cStringIO 
116  import numpy 
117  import Queue 
118  import re 
119  import socket 
120  import select 
121  import struct 
122  import sys 
123  from threading import * 
124  import time 
125  import traceback 
126  import qubx.sock 
127   
128   
# Loopback address; the same value is the `host` default of RemoteServer and RemoteProxy.
DEFAULT_HOST = '127.0.0.1'
# First TCP port tried by the __main__ server below when no port is given on the command line;
# the same value is the `port` default of RemoteServer and RemoteProxy.
DEFAULT_PORT = 55384
131   
132   
class RemoteMsg(object):
    """Represents one message with associated data.

    @ivar verb: one of ['VERSION', 'EVAL', 'VAL', 'EXEC', 'DONE', 'ERROR', 'ARRAY', 'STRING', 'SET', 'GET ARRAY'];
                None for a null message
    @ivar sdata: payload for VERSION, EVAL, VAL, EXEC, ERROR, GET ARRAY; metadata for ARRAY and STRING
    @ivar bytes: payload for ARRAY and STRING
    """
    def __init__(self, verb=None, sdata=None, bytes=None):
        self.verb = verb
        self.sdata = sdata
        self.bytes = bytes

    def __repr__(self):
        # Show only the payload size: ARRAY/STRING payloads may be large and binary.
        nbytes = None if self.bytes is None else len(self.bytes)
        return 'RemoteMsg(verb=%r, sdata=%r, nbytes=%r)' % (self.verb, self.sdata, nbytes)
144
def RemoteArray(arr, label):
    """Returns a L{RemoteMsg} containing arr and its label.

        verb = 'ARRAY'
        sdata = "rows<tab>cols<tab>dtype<tab>label"
        bytes = the raw array data

    A 1D array of length n is sent as n rows by 1 column.

    @param arr: a numpy.array, or something legal for numpy.array(arr)
    @param label: remote python identifier for the array
    @raise Exception: if arr is neither 1D nor 2D
    """
    try:
        arr.dtype
    except AttributeError:
        arr = numpy.array(arr)
    ndim = len(arr.shape)
    if ndim == 1:
        rows, cols = arr.shape[0], 1
    elif ndim == 2:
        rows, cols = arr.shape
    else:
        # Wrap the shape in a 1-tuple: formatting the bare shape tuple with a single %s
        # raises TypeError instead of producing this message.
        raise Exception('Unsupported array dimension: %s (need 1D or 2D)' % (arr.shape,))
    # ndarray.tostring() is a deprecated alias of tobytes() and is absent from newer numpy;
    # fall back to it only where tobytes() does not exist.
    serialize = getattr(arr, 'tobytes', None) or arr.tostring
    return RemoteMsg('ARRAY', '%i\t%i\t%s\t%s' % (rows, cols, str(arr.dtype), label), serialize())
165
def RemoteToArray(msg):
    """Unpacks an ARRAY message (see L{RemoteArray}) into (label, arr).

    @param msg: message whose sdata is "rows<tab>cols<tab>dtype[<tab>label]" and whose
                bytes hold rows*cols items of that dtype
    @return: (label, arr) with arr shaped (rows, cols); label is '' if none was sent
    @raise Exception: if msg.sdata does not have that form
    """
    header = re.match(r"([01-9]+)\t([01-9]+)\t([^\t]+)\t?(.*)", msg.sdata)
    if not header:
        raise Exception('bad array format: %s' % msg.sdata)
    row_text, col_text, dtype_name, label = header.groups()
    nrows = int(row_text)
    ncols = int(col_text)
    # frombuffer wraps msg.bytes rather than copying it; the module docstring's example
    # copies the array remotely before modifying it.
    flat = numpy.frombuffer(msg.bytes, dtype_name, nrows * ncols)
    return label, flat.reshape((nrows, ncols))
175 176
def RemoteString(s, label):
    """Builds the L{RemoteMsg} that carries the string s and its label.

        verb = 'STRING'
        sdata = "len(s)<tab>label"
        bytes = s

    @param s: a string
    @param label: remote python identifier for the string
    """
    header = '%i\t%s' % (len(s), label)
    return RemoteMsg('STRING', header, s)
188
def RemoteToString(msg):
    """Returns (label, s) contained in a L{RemoteString} message.

    @param msg: message whose sdata is "length<tab>label" and whose bytes hold the string
    @return: (label, msg.bytes); the length field is checked for presence but not otherwise used
    @raise Exception: if msg.sdata does not have that form
    """
    header = re.match(r"([01-9]+)\t(.*)", msg.sdata)
    if not header:
        # This is a STRING header, so say so (the message used to read "bad array format").
        raise Exception('bad string format: %s' % msg.sdata)
    return header.group(2), msg.bytes
195 196
class RemoteConnect(qubx.sock.Socket):
    """Reads and writes L{RemoteMsg}s.

    Wire format, as written by L{sendMsg} and parsed by L{readMsg}:
      - a header line: "verb" alone for EXEC, VAL, ERROR, SET, DONE; otherwise "verb<tab>sdata";
      - for EXEC, VAL, ERROR: the text in sdata, then a line holding only ".";
      - for STRING, or any other message with a non-empty bytes payload (e.g. ARRAY):
        the payload, a newline, then a line holding only ".".
    """
    def __init__(self, sock, async=True):
        # Both arguments are passed straight through to qubx.sock.Socket, which defines their meaning.
        qubx.sock.Socket.__init__(self, sock, async)
    def msgNonNull(self):
        """Returns a message whose verb is not None, i.e. one for which L{msgIsNull} is False."""
        return RemoteMsg('placeholder')
    def msgIsNull(self, msg):
        """Returns True if msg is a null message (its verb is None)."""
        return msg.verb is None
    def sendMsg(self, msg):
        """Sends a L{RemoteMsg}.  Does not catch any potential socket.error s."""
        # Header line: these verbs carry no sdata on the header itself.
        if msg.verb in ('EXEC', 'VAL', 'ERROR', 'SET', 'DONE'):
            self.sendBlock('%s\n' % msg.verb)
        else:
            self.sendBlock('%s\t%s\n' % (msg.verb, msg.sdata))
        # Body: multi-line text for EXEC/VAL/ERROR, raw payload for STRING and for anything
        # with a non-empty bytes payload; both end with a line holding only ".".
        if msg.verb in ('EXEC', 'VAL', 'ERROR'):
            self.sendBlock('%s\n.\n' % msg.sdata)
        elif (msg.verb == 'STRING') or msg.bytes:
            self.sendBlock('%s\n.\n' % msg.bytes)
    def readMsg(self):
        """Don't call this directly; use L{Socket.recvMsg}.

        Reads one complete message from the socket.

        @return: the parsed L{RemoteMsg}; a null message (verb None) on socket.error;
                 an ERROR message holding the local traceback on any other exception
                 except KeyboardInterrupt, which is re-raised
        """
        while True:
            try:
                line = self.readLine()
                fields = re.match(r"([^\t]+)\t?(.*)", line)
                if not fields: continue # big problem?
                verb = fields.group(1).strip()
                sdata = fields.group(2).replace('\r', '')
                # Default result: header only.  Replaced below for verbs that carry a body.
                msg = RemoteMsg(verb, sdata)
                if verb in ('EXEC', 'VAL', 'ERROR'):
                    # Text body: accumulate lines until the "." terminator.
                    script = cStringIO.StringIO()
                    while True:
                        line = self.readLine().replace('\r', '')
                        if line.strip() == '.':
                            break
                        script.write(line)
                    # presumably readLine keeps each line's trailing newline, hence the slice -- confirm in qubx.sock
                    msg = RemoteMsg(verb, script.getvalue()[:-1]) # skip final newline
                elif verb == 'STRING':
                    dims = re.match(r"([0-9]+)\t(.*)", sdata)
                    if dims:
                        n = int(dims.group(1))
                        bytes = self.readBlock(n) if n else ""
                        msg = RemoteMsg(verb, sdata, bytes)
                        # NOTE(review): the source listing lost its indentation; this drain loop is
                        # assumed to run only when the header parsed -- confirm against the original file.
                        while self.readLine().strip() != '.':
                            pass
                elif verb == 'ARRAY':
                    dims = re.match(r"([0-9]+)\t([0-9]+)\t([^\t]+)\t?(.*)", sdata)
                    if dims:
                        rows, cols, dtype, label = [dims.group(i) for i in (1,2,3,4)]
                        rows, cols = int(rows), int(cols)
                        # Payload size in bytes is item count times the dtype's item size.
                        bytes = self.readBlock(rows*cols*numpy.dtype(dtype).itemsize)
                        msg = RemoteMsg(verb, sdata, bytes)
                        # NOTE(review): same indentation assumption as in the STRING branch above.
                        while self.readLine().strip() != '.':
                            pass
                return msg
            except socket.error:
                return RemoteMsg() # null message closes socket
            except KeyboardInterrupt:
                raise
            except:
                # Any other failure is reported as an ERROR message rather than raised.
                return RemoteMsg('ERROR', traceback.format_exc())
257 258
259 -class RemoteSession(RemoteConnect):
260 """Handles the server side of one connection. 261 Enqueues L{RemoteMsg}s for processing on its own thread; 262 you call session.process_one() frequently on the appropriate thread. 263 """
264 - def __init__(self, sock, async=True, main_globals=None):
265 RemoteConnect.__init__(self, sock, async) 266 self.globals = __main__.__dict__ if (main_globals is None) else main_globals
267 - def do_exec(self, msg):
268 try: 269 exec(msg.sdata, self.globals, self.globals) 270 self.sendMsg(RemoteMsg('DONE')) 271 except: 272 self.sendMsg(RemoteMsg('ERROR', traceback.format_exc()))
273 - def do_eval(self, msg):
274 try: 275 self.sendMsg(RemoteMsg('VAL', str(eval(msg.sdata, self.globals, self.globals)))) 276 except: 277 self.sendMsg(RemoteMsg('ERROR', traceback.format_exc()))
278 - def do_array(self, msg):
279 lbl, arr = RemoteToArray(msg) 280 self.globals[lbl] = arr 281 self.sendMsg(RemoteMsg('SET'))
282 - def do_string(self, msg):
283 lbl, s = RemoteToString(msg) 284 self.globals[lbl] = s 285 self.sendMsg(RemoteMsg('SET'))
286 - def do_get_array(self, msg):
287 try: 288 arr = self.globals[msg.sdata] 289 self.sendMsg(RemoteArray(arr, msg.sdata)) 290 except: 291 self.sendMsg(RemoteMsg('ERROR', traceback.format_exc()))
292 - def process_one(self, block=False, run_on_thread=lambda f: f()):
293 """Processes and answers at most one message; returns False if there are none.""" 294 try: 295 msg = self.recvMsg(block=block) 296 except Queue.Empty: 297 return False 298 try: 299 if msg.verb == 'EXEC': 300 run_on_thread(lambda: self.do_exec(msg)) 301 elif msg.verb == 'EVAL': 302 run_on_thread(lambda: self.do_eval(msg)) 303 elif msg.verb == 'ARRAY': 304 run_on_thread(lambda: self.do_array(msg)) 305 elif msg.verb == 'STRING': # v1.1 306 run_on_thread(lambda: self.do_string(msg)) 307 elif msg.verb == 'GET ARRAY': 308 run_on_thread(lambda: self.do_get_array(msg)) 309 elif msg.verb == 'ERROR': # in recv/parse 310 self.sendMsg(msg) 311 elif msg.verb == 'VERSION': 312 # TODO: modify behavior for older versions (oldest is 1.0) 313 self.sendMsg(RemoteMsg('VERSION', '1.1')) 314 else: 315 self.sendMsg(RemoteMsg('ERROR', 'unknown verb: %s'%msg.verb)) 316 except socket.error: 317 self.stop() 318 except KeyboardInterrupt: 319 raise 320 except: 321 self.sendMsg(RemoteMsg('ERROR', traceback.format_exc())) 322 return True
323 324
class RemoteRunner(object):
    """Manages servant sockets, wraps with RemoteSessions; you call .process_one() to get their work done.

    To clean up (close remaining servants): .stop()

    @ivar conns: list of the sessions currently being served
    @ivar process_one: callable taking no arguments.  Returns False when there are no
        connections; otherwise returns True after one session handled a message, or after
        every session has been polled once without finding work.
    """
    def __init__(self, blocking=False, run_on_thread=lambda f: f(), main_globals=None):
        """Sets up an empty runner; no sockets are opened here.

        @param blocking: passed as block= to each session's process_one
        @param run_on_thread: passed to each session's process_one
        @param main_globals: namespace handed to each new L{RemoteSession}
        """
        self.conns = []
        stepper = self.__process()
        # Generators spell "advance" as .next on Python 2 and .__next__ on Python 3.
        self.process_one = getattr(stepper, 'next', None) or stepper.__next__
        self.blocking = blocking
        self.run_on_thread = run_on_thread
        self.main_globals = main_globals

    def manage(self, client):
        """Wraps the connected socket client in a L{RemoteSession}, tracks it and returns it."""
        sess = RemoteSession(client, main_globals=self.main_globals)
        self.conns.append(sess)
        return sess

    def __process(self):
        """Generator function to iterate over active connections; yielding after one job, or a full circle."""
        since_yield = 0
        while True:
            if not self.conns:
                yield False
            # Walk backwards so that deleting a finished session keeps the remaining indices valid.
            for i in reversed(range(len(self.conns))):
                # ">=" rather than "==": a deletion can shrink the list below the idle count,
                # and with "==" this loop would then spin without ever yielding.
                if (since_yield >= len(self.conns)) or self.conns[i].process_one(block=self.blocking, run_on_thread=self.run_on_thread):
                    yield True
                    since_yield = 0
                else:
                    since_yield += 1
                if self.conns[i].done():
                    del self.conns[i]

    def stop(self):
        """Cleans up clients."""
        for conn in self.conns:
            conn.stop()
363 364
class RemoteServer(qubx.sock.TCPServer):
    """Listens on the first available port in port..(port+seek_ports).  Writes the actual port number
    to a text file at sockid_path, if provided.  One L{RemoteSession} per client enqueues requests for processing,
    but as most applications require everything to run on the same thread, you must call

        >>> server.process_one()

    at frequent intervals, such as on a timer.  To stop the server:

        >>> server.stop()

    @ivar runner: the L{RemoteRunner} that owns the client sessions
    """
    def __init__(self, host='127.0.0.1', port=55384, seek_ports=10, sockid_path=None,
                 blocking=False, run_on_thread=lambda f: f(), main_globals=None):
        """Raises socket.error if no ports available.

        @param host: interface to listen on
        @param port: first port to try
        @param seek_ports: how many further ports to try after port
        @param sockid_path: if given, file that receives the actual port number
        @param blocking: passed to the L{RemoteRunner}
        @param run_on_thread: passed to the L{RemoteRunner}
        @param main_globals: passed to the L{RemoteRunner}
        """
        # The runner must exist before the base class is initialised, since the accept
        # callback below uses it.
        self.runner = RemoteRunner(blocking, run_on_thread, main_globals)
        qubx.sock.TCPServer.__init__(self, host, port,
                                     lambda client, addr: self.runner.manage(client),
                                     seek_ports)
        if sockid_path:
            # Best effort: a failure to publish the port is reported but does not stop the server.
            try:
                with open(sockid_path, 'w') as sockid_file:
                    sockid_file.write("%i\n"%self.port)
            except Exception:
                traceback.print_exc()

    def process_one(self):
        """Delegates to the runner's process_one and returns its result."""
        return self.runner.process_one()

    def stop(self):
        """Stops the listening thread and closes the socket."""
        qubx.sock.TCPServer.stop(self)
        self.runner.stop()
        print('Remote control stopped.')
399 400 401
class RemoteError(Exception):
    """Raised locally when the remote side reports an error; the message holds the remote traceback."""
    def __init__(self, traceback):
        # The parameter name shadows the traceback module inside this method only.
        message = 'Remote error: %s' % traceback
        super(RemoteError, self).__init__(message)
406 407
408 -class RemoteController(object):
409 - def __init__(self, sock, async=True):
410 self.conn = RemoteConnect(sock, async) 411 self.conn.sendMsg(RemoteMsg('VERSION', '1.1')) 412 self.__check_error(True, 5.0)
413 - def reval(self, expr, block=True, timeout=None):
414 """Evaluates expr on the remote side, returns the result as a string. Can raise RemoteError.""" 415 self.conn.sendMsg(RemoteMsg('EVAL', expr)) 416 result = self.__check_error(block, timeout) 417 return result.sdata
418 - def rexec(self, buf, block=True, timeout=None):
419 """Executes the Python script in buf on the remote side. Can raise RemoteError.""" 420 self.conn.sendMsg(RemoteMsg('EXEC', buf)) 421 self.__check_error(block, timeout)
422 - def set_array(self, arr, label, block=True, timeout=None):
423 """Sends a numpy.array to the remote side, and assigns it to the remote identifier label.""" 424 self.conn.sendMsg(RemoteArray(arr, label)) 425 self.__check_error(block, timeout)
426 - def set_string(self, s, label, block=True, timeout=None):
427 """Sends a string to the remote side, and assigns it to the remote identifier label.""" 428 self.conn.sendMsg(RemoteString(s, label)) 429 self.__check_error(block, timeout)
430 - def get_array(self, label, block=True, timeout=None):
431 """Loads a numpy.array from the remote side, with the remote name label. Can raise RemoteError.""" 432 self.conn.sendMsg(RemoteMsg('GET ARRAY', label)) 433 result = self.__check_error(block, timeout) 434 return RemoteToArray(result)[1]
435 - def __check_error(self, block=True, timeout=None):
436 result = self.conn.recvMsg(block, timeout) 437 if result.verb == 'ERROR': 438 raise RemoteError(result.sdata) 439 return result
440 - def stop(self):
441 """Closes the connection to L{RemoteServer}.""" 442 self.conn.stop()
443
def RemoteProxy(host='127.0.0.1', port=55384, sockid_path=None):
    """Connects to the server.  Uses the port number in sockid_path, if it exists.

    @param host: server address
    @param port: port to use when sockid_path is not given or cannot be read
    @param sockid_path: optional text file whose first line is the server's port number
    @return: a L{RemoteController} for the new connection
    """
    rport = port
    if sockid_path:
        # Best effort: on a missing or malformed file, report it and keep the port argument.
        try:
            with open(sockid_path, 'r') as sockid_file:
                rport = int(sockid_file.readline().strip())
        except Exception:
            traceback.print_exc()
            print('No/bad sockid file (%s); falling back on port %i' % (sockid_path, rport))
    return RemoteController(qubx.sock.connect_as_client(host, rport))
454 455
class RemoteCommandServer(qubx.sock.TCPServer):
    """Listens on the first available port in port..(port+seek_ports).  Writes the actual port number
    to a text file at sockid_path, if provided.  Wraps clients with RemoteController, given via callback on_connect

    @ivar blocking: the blocking argument, stored as given
    @ivar run_on_thread: the run_on_thread argument, stored as given

    TODO: default host='' is not compatible with ipv6
    """
    def __init__(self, on_connect=lambda controller: None,
                 host='', port=25254, seek_ports=10, sockid_path=None,
                 blocking=False, run_on_thread=lambda f: f(), main_globals=None):
        """Raises socket.error if no ports available.

        @param on_connect: called with a L{RemoteController} for each client that connects
        @param host: interface to listen on
        @param port: first port to try
        @param seek_ports: how many further ports to try after port
        @param sockid_path: if given, file that receives the actual port number
        @param blocking: stored on the instance; not used by this class itself
        @param run_on_thread: stored on the instance; not used by this class itself
        @param main_globals: accepted for signature parity with L{RemoteServer}; unused here
        """
        # Store the callback before the base class is initialised, the same way RemoteServer
        # creates its runner first: presumably the base class can start accepting clients
        # during __init__, and __connected needs the callback.
        self.__on_connect = on_connect
        qubx.sock.TCPServer.__init__(self, host, port, self.__connected, seek_ports)
        self.blocking = blocking
        self.run_on_thread = run_on_thread
        if sockid_path:
            # Best effort: a failure to publish the port is reported but does not stop the server.
            try:
                with open(sockid_path, 'w') as sockid_file:
                    sockid_file.write("%i\n"%self.port)
            except Exception:
                traceback.print_exc()

    def __connected(self, client, addr):
        """Accept callback: wraps the client socket in a L{RemoteController} and hands it to on_connect."""
        rc = RemoteController(client)
        self.__on_connect(rc)

    def stop(self):
        """Stops the listening thread and closes the socket."""
        qubx.sock.TCPServer.stop(self)
        print('Remote command stopped.')
if __name__ == "__main__":
    import os
    import tempfile

    SERVER_TIMER_SEC = 0.2  # pause between polls: at most one message is processed per interval
    SOCKID_NAME = 'remote_ctl.sockid'  # name of the port-number file, created in the temp dir

    sockid_path = os.path.join(tempfile.gettempdir(), SOCKID_NAME)
    if len(sys.argv) <= 1:
        # No port on the command line: try DEFAULT_PORT and up to 10 others.
        server = RemoteServer(port=DEFAULT_PORT, seek_ports=10, sockid_path=sockid_path)
    else:
        # Optional first argument: use exactly this port.
        server = RemoteServer(port=int(sys.argv[1]), seek_ports=0, sockid_path=sockid_path)

    # Poll until interrupted; any other failure is printed, and the server is stopped either way.
    try:
        while True:
            server.process_one()
            time.sleep(SERVER_TIMER_SEC)
    except KeyboardInterrupt:
        pass
    except:
        traceback.print_exc()

    server.stop()