Package qubx_testing :: Module calcserv
[hide private]
[frames] | no frames]

Source Code for Module qubx_testing.calcserv

 1  import traceback 
 2  import qubx.faces 
 3  import qubx.pyenv 
 4  import qubx.task 
 5  from qubx.GTK import * 
 6  from qubx.remote_ctl import * 
 7   
 8  PORT = 25254 
 9   
10 -class CalcTask(qubx.task.Task):
11 - def __init__(self, host, port):
12 qubx.task.Task.__init__(self, 'CalcServ') 13 self.runner = RemoteRunner(blocking=True, main_globals=qubx.pyenv.env.globals) 14 self.host = host 15 self.port = port
16 - def run(self):
17 self.client = None 18 try: 19 self.client = self.runner.manage(qubx.sock.connect_as_client(self.host, self.port)) 20 while True: 21 self.runner.process_one() 22 except KeyboardInterrupt: 23 pass 24 except: 25 traceback.print_exc() 26 if self.client: 27 self.client.stop() 28 qubx.task.Tasks.remove_task(self)
29 30
31 -class CalcServ(qubx.faces.Face):
32 - def __init__(self, name, global_name):
33 qubx.faces.Face.__init__(self, name, global_name) 34 h = pack_item(gtk.HBox(), self) 35 self.txtHost = pack_item(NumEntry("localhost"), h, expand=True) 36 self.btnConnect = pack_button('Connect', h, self.__onClickConnect)
37 - def __onClickConnect(self, btn):
38 task = CalcTask(self.txtHost.value, PORT) 39 qubx.task.Tasks.add_task(task) 40 task.start()
41 42
43 -class CalcMaster(object):
44 - def __init__(self):
45 self.sessions = [] 46 self.server = RemoteCommandServer(self.__onConnect)
47 - def __onConnect(self, controller):
48 self.sessions.append(controller)
49 - def reval(self, expr):
50 dead = [] 51 for controller in self.sessions: 52 try: 53 print controller.reval(expr) 54 except: 55 traceback.print_exc() 56 controller.stop() 57 dead.append(controller) 58 for controller in dead: 59 self.sessions.remove(controller)
60