Package qubx :: Module task
[hide private]
[frames | no frames]

Source Code for Module qubx.task

  1  """Thread management and multiprocessing. 
  2   
  3  Copyright 2008-2012 Research Foundation State University of New York  
  4  This file is part of QUB Express.                                           
  5   
  6  QUB Express is free software; you can redistribute it and/or modify           
  7  it under the terms of the GNU General Public License as published by  
  8  the Free Software Foundation, either version 3 of the License, or     
  9  (at your option) any later version.                                   
 10   
 11  QUB Express is distributed in the hope that it will be useful,                
 12  but WITHOUT ANY WARRANTY; without even the implied warranty of        
 13  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the         
 14  GNU General Public License for more details.                          
 15   
 16  You should have received a copy of the GNU General Public License,    
 17  named LICENSE.txt, in the QUB Express program directory.  If not, see         
 18  <http://www.gnu.org/licenses/>.                                       
 19   
 20  """ 
 21   
 22  from __future__ import with_statement 
 23   
 24  import ctypes 
 25  import gobject 
 26  import multiprocessing 
 27  import threading 
 28  from math import * 
 29  import Queue 
 30  import collections 
 31  import linecache 
 32  import time 
 33  import traceback 
 34  import sys 
 35  import weakref 
 36  import qubx.util_types 
 37  import qubx.pyenv 
 38   
class TaskManager(object):
    """Maintains a list of running tasks; can interrupt one or all; use global variable C{Tasks}.

    @ivar OnAdd: L{qubx.util_types.WeakEvent}(TaskManager, Task)
    @ivar OnRemove: L{qubx.util_types.WeakEvent}(TaskManager, Task)
    @ivar interruptible: list of running, interruptible Tasks
    @ivar busy: list of thread ids (C{task.tid}) of running, non-interruptible Tasks
    @ivar deferred: list of thread ids of busy tasks to be interrupted when possible
    @ivar lock: C{threading.Lock} held while the three lists are modified
    """
    def __init__(self):
        self.lock = threading.Lock()
        self.interruptible = []
        self.busy = []
        self.deferred = []
        self.OnAdd = qubx.util_types.WeakEvent() # (TaskManager, Task)
        self.OnRemove = qubx.util_types.WeakEvent() # (TaskManager, Task)
        # NOTE(review): Reffer presumably keeps the bound handler alive for the
        # WeakEvent subscription -- confirm in qubx.util_types.
        self.__ref = qubx.util_types.Reffer()

    def add_task(self, task):
        """Add task to the list of running, interruptible tasks.

        Subscribes to C{task.OnTerminate} so the task is removed again when it
        finishes, and schedules OnAdd via C{gobject.idle_add}.
        """
        with self.lock:
            self.interruptible.append(task)
            task.OnTerminate += self.__ref(self.__onTerminate)
        gobject.idle_add(self.OnAdd, self, task)

    def remove_task(self, task):
        """Remove task from the running list.  Task must not be busy (in main_hold).

        Raises ValueError if task is not in C{interruptible}.
        Schedules OnRemove via C{gobject.idle_add}.
        """
        with self.lock:
            self.interruptible.remove(task)
        try:
            task.OnTerminate -= self.__ref(self.__onTerminate)
        except Exception:
            # Best effort: the handler may already be gone.  Only ordinary
            # errors are ignored, so KeyboardInterrupt/SystemExit still propagate.
            pass
        gobject.idle_add(self.OnRemove, self, task)

    def __onTerminate(self, task):
        """OnTerminate handler installed by L{add_task}."""
        self.remove_task(task)

    def set_busy(self, task):
        """Moves task from C{interruptible} to C{busy} (recorded by thread id).

        Does nothing if task is not currently interruptible.
        """
        with self.lock:
            if task in self.interruptible:
                self.interruptible.remove(task)
                self.busy.append(task.tid)

    def unset_busy(self, task):
        """Moves task back from C{busy} to C{interruptible}.

        If an interrupt was requested while the task was busy, it is re-issued
        through C{gobject.idle_add}.
        """
        with self.lock:
            if task.tid in self.busy:
                self.busy.remove(task.tid)
                self.interruptible.append(task)
            if task.tid in self.deferred:
                self.deferred.remove(task.tid)
                gobject.idle_add(self.interrupt, task)

    def interrupt(self, task=None):
        """Raises KeyboardInterrupt in one Task, or in all interruptible Tasks if task=None.

        With task=None, each interruptible task's own C{interrupt()} is called.
        With a specific task: if it is busy, its tid is queued in C{deferred}
        (at most once) and nothing is thrown now; otherwise L{throw_in} is
        called immediately.
        """
        if task is None:
            # Iterate a snapshot: task.interrupt() (or another thread calling
            # set_busy/remove_task) may modify self.interruptible, and mutating
            # a list while iterating it skips elements.  The lock is released
            # before calling out because it is not reentrant.
            with self.lock:
                targets = list(self.interruptible)
            for target in targets:
                target.interrupt()
            return

        with self.lock:
            busy = task.tid in self.busy
            if busy and task.tid not in self.deferred:
                self.deferred.append(task.tid)
        if not busy:
            throw_in(task.tid, KeyboardInterrupt)
100 #while 1 > throw_in(task.tid, KeyboardInterrupt): 101 # time.sleep(0.1) 102
def throw_in(tid, exc):
    """Raises Exception exc in thread tid.  Actually it is raised sometime in the next "check interval" instructions.

    @param tid: id of the target thread (e.g. C{Task.tid})
    @param exc: exception class to raise, or None to pass a NULL exception pointer
    @return: the value returned by C{PyThreadState_SetAsyncExc}
        (0 means no thread with that id was found)
    """
    if exc is None:
        exc_ptr = ctypes.c_void_p()
    else:
        exc_ptr = ctypes.py_object(exc)
    # The C function takes a long.  A bare Python int would be marshalled by
    # ctypes as a C int, truncating thread ids on 64-bit platforms so that no
    # thread matches and nothing is raised.
    return ctypes.pythonapi.PyThreadState_SetAsyncExc(ctypes.c_long(tid), exc_ptr)

# Module-level TaskManager instance.  The hold classes and Task methods below
# report busy/idle state and request interrupts through this object.
Tasks = TaskManager()


class GTK_Main_Hold(object):
    """Context manager, for use in a Task thread, which pauses the main thread while it is entered.

    >>> with task.main_hold:
    ...     read_something_volatile()

    Used in combination with L{throw_in}, the exception is sometimes raised at inconvenient times in
    __enter__, __call__, or __exit__.  This is a possible source of freeze-ups.

    In Linux, the Task thread can work with the GUI while the main thread is paused, but this technique
    is not compatible with the Windows thread model.  Not sure about Mac OS X.

    @ivar in_call: C{threading.Event} set by L{__call__} once the idle callback is running
    @ivar done_hold: C{threading.Event} set by L{__exit__} to let L{__call__} return
    """
    def __init__(self, task):
        # Only a weak reference is kept; the task property returns None once
        # the Task has been garbage-collected.
        self.__task = weakref.ref(task)
        self.in_call = threading.Event()
        self.done_hold = threading.Event()
    task = property(lambda self: self.__task())
    def __enter__(self):
        """Marks the task busy, then blocks until the idle callback (L{__call__}) has started."""
        try:
            Tasks.set_busy(self.task)
            # Spin for twice the interpreter check interval.
            # NOTE(review): presumably gives an asynchronous exception that was
            # thrown just before set_busy a chance to surface here -- confirm.
            for i in xrange(2*sys.getcheckinterval()): pass
        except:
            # Undo the busy mark before propagating whatever arrived.
            Tasks.unset_busy(self.task)
            raise

        self.in_call.clear()
        # Schedule self (i.e. __call__) as a gobject idle callback, then wait
        # for it to signal that it is running.
        gobject.idle_add(self)
        self.in_call.wait()
    def __call__(self):
        """Idle callback: signals L{__enter__}, then blocks until L{__exit__} releases it."""
        # done_hold is cleared before in_call is set, so the __exit__ that
        # follows the matching __enter__ cannot have its set() erased.
        self.done_hold.clear()
        self.in_call.set()
        #qubx.pyenv.env.stdout.write('a\n')
        self.done_hold.wait()
        #qubx.pyenv.env.stdout.write('b\n')
    def __exit__(self, type, value, tback):
        """Releases the blocked callback and marks the task interruptible again.

        Returns None, so an exception from the with-body is not suppressed.
        """
        try:
            self.done_hold.set()
            Tasks.unset_busy(self.task)
        except:
            # Any exception here (including an asynchronous one) triggers a
            # retry by recursion, so the release is attempted again.
            self.__exit__(type, value, tback)
154
class Robot_Hold(object):
    """Context manager, for use in a Task thread, which pauses a Robot (or Robots) thread while it is entered.

    >>> with robot.hold_from_task(task):
    ...     read_something_volatile()

    Used in combination with L{throw_in}, the exception is sometimes raised at inconvenient times in
    __enter__, __call__, or __exit__.  This is a possible source of freeze-ups.

    In Linux, the Task thread can work with the GUI while the main thread is paused, but this technique
    is not compatible with the Windows thread model.  Not sure about Mac OS X.
    NOTE(review): the paragraph above reads as if copied from GTK_Main_Hold; this class
    blocks whichever thread runs the robot's queued callable, not the main thread -- confirm.

    @ivar in_call: C{threading.Event} set by L{__call__} once the queued callable is running
    @ivar done_hold: C{threading.Event} set by L{__exit__} to let L{__call__} return
    """
    def __init__(self, robot, task):
        # Weak references only; the robot/task properties return None once
        # the referent has been garbage-collected.
        self.__robot = weakref.ref(robot)
        self.__task = weakref.ref(task)
        self.in_call = threading.Event()
        self.done_hold = threading.Event()
    task = property(lambda self: self.__task())
    robot = property(lambda self: self.__robot())
    def __enter__(self):
        """Marks the task busy, then blocks until the robot has started running L{__call__}."""
        try:
            Tasks.set_busy(self.task)
            # Spin for twice the interpreter check interval.
            # NOTE(review): presumably gives an asynchronous exception that was
            # thrown just before set_busy a chance to surface here -- confirm.
            for i in xrange(2*sys.getcheckinterval()): pass
        except:
            # Undo the busy mark before propagating whatever arrived.
            Tasks.unset_busy(self.task)
            raise

        self.in_call.clear()
        # Enqueue self (i.e. __call__) on the robot via its do() method, then
        # wait for it to signal that it is running.
        self.robot.do(self)
        self.in_call.wait()
    def __call__(self):
        """Queued callable: signals L{__enter__}, then blocks until L{__exit__} releases it."""
        self.done_hold.clear()
        self.in_call.set()
        self.done_hold.wait()
    def __exit__(self, type, value, tback):
        """Releases the blocked callable and marks the task interruptible again.

        Returns None, so an exception from the with-body is not suppressed.
        """
        try:
            self.done_hold.set()
            Tasks.unset_busy(self.task)
        except:
            # Any exception here (including an asynchronous one) triggers a
            # retry by recursion, so the release is attempted again.
            self.__exit__(type, value, tback)
195 196
197 -def idle_wait(fn, *args, **kw):
198 """Returns the result of fn(*args, **kw) called in the gtk main thread; calling thread pauses until fn completes.""" 199 event = threading.Event() 200 res_exc = [None, None] 201 def idle_f(): 202 try: 203 res_exc[0] = fn(*args, **kw) 204 except: 205 res_exc[1] = sys.exc_info() 206 event.set()
207 gobject.idle_add(idle_f) 208 event.wait() 209 if res_exc[1]: 210 raise res_exc[1][0], res_exc[1][1], res_exc[1][2] 211 return res_exc[0] 212 213
214 -class Task(object):
215 """ 216 A worker thread; not started until .start(). 217 218 From inside the thread, trigger the callback On* by calling send_*(). 219 The callback will then fire from the main thread. 220 221 @ivar label: descriptive string 222 @ivar progress: percent complete, between 0.0 and 100.0 223 @ivar status: message string 224 @ivar tid: thread id 225 @ivar main_hold: L{GTK_Main_Hold} to pause the main thread 226 @ivar OnResult: L{qubx.util_types.WeakEvent}(Task, result) 227 @ivar OnException: L{qubx.util_types.WeakEvent}(Task, type, value, traceback) 228 @ivar OnProgress: L{qubx.util_types.WeakEvent}(Task, progress) 229 @ivar OnStatus: L{qubx.util_types.WeakEvent}(Task, status) 230 @ivar OnTerminate: L{qubx.util_types.WeakEvent}(Task) 231 @ivar OnInterrupt: L{qubx.util_types.WeakEvent}(Task, cancel()) if you want to do more/less than KeyboardInterrupt 232 """
233 - def __init__(self, label):
234 self.label = label 235 self._progress = 0.0 236 self._prog_floor = 0.0 237 self._status = '' 238 self._thread = None 239 self.tid = None 240 self.main_hold = GTK_Main_Hold(self) 241 self.OnResult = qubx.util_types.WeakEvent() 242 self.OnException = qubx.util_types.WeakEvent() # (Task, type, value, traceback) 243 self.OnProgress = qubx.util_types.WeakEvent() 244 self.OnStatus = qubx.util_types.WeakEvent() 245 self.OnTerminate = qubx.util_types.WeakEvent() # called by framework no matter what 246 self.OnInterrupt = qubx.util_types.WeakEvent()
247 progress = property(lambda self: self._progress, lambda self, x: self.set_progress(x)) 248 status = property(lambda self: self._status, lambda self, x: self.set_status(x))
249 - def set_progress(self, x):
250 """Updates progress and calls OnProgress in the main thread.""" 251 self._progress = x 252 pf = floor(x) 253 if (pf < self._prog_floor) or (pf >= (self._prog_floor + 1)): 254 self._prog_floor = pf 255 self.send_progress(pf)
256 - def set_status(self, x):
257 """Updates status and calls OnStatus in the main thread.""" 258 self._status = x 259 self.send_status(x)
260 - def send_result(self, x):
261 """Calls OnResult in the main thread.""" 262 gobject.idle_add(self.OnResult, self, x)
263 - def send_exception(self):
264 """Calls OnException in the main thread.""" 265 gobject.idle_add(self.OnException, self, *sys.exc_info())
266 - def send_progress(self, x):
267 """Calls OnProgress in the main thread.""" 268 gobject.idle_add(self.OnProgress, self, x)
269 - def send_status(self, x):
270 """Calls OnStatus in the main thread.""" 271 gobject.idle_add(self.OnStatus, self, x)
272 - def start(self):
273 """Starts the Task thread.""" 274 if self._thread: 275 raise Exception('%s already started' % self.label) 276 self._thread = threading.Thread(target=self.thread_main) 277 self._thread.daemon = True 278 self._stop_flag = False 279 self._thread.start()
280 - def interrupt(self):
281 """Raises KeyboardInterrupt in the Task thread.""" 282 throw_it = [True] 283 def cancel(): 284 throw_it[0] = False
285 if self.tid: 286 self.OnInterrupt(self, cancel) 287 if throw_it[0]: 288 Tasks.interrupt(self)
289 - def join(self):
290 """Waits for the Task thread to finish.""" 291 if self._thread: 292 self._thread.join()
293 - def thread_main(self):
294 """Calls run, then makes sure OnTerminate is called in the main thread.""" 295 try: 296 self.tid = threading._get_ident() 297 self.status = 'Running...' 298 self.run() 299 self.status = 'Done.' 300 except KeyboardInterrupt: 301 self.status = 'Interrupted.' 302 except Exception, e: 303 self.send_exception() 304 self.status = 'Error.' 305 gobject.idle_add(self.OnTerminate, self)
306 - def run(self):
307 """Override this method to make the Task thread do something.""" 308 pass
309 - def gui_call_recv(self, fn, *args, **kw):
310 """Calls fn(*args, **kw) in the gtk main thread, waits for self(*results) to be called, and returns results (a tuple). Intended for long-running requests that return via callback: 311 312 >>> data = task.gui_call_recv(call_with_data, task)""" 313 # supply task object as the receiving callback 314 Tasks.set_busy(self) 315 self.gui_event = threading.Event() 316 self.gui_res = None 317 self.gui_exc = None 318 def idle_f(): 319 try: 320 fn(*args, **kw) 321 except: 322 traceback.print_exc() 323 self.gui_exc = sys.exc_info() 324 self.gui_event.set()
325 gobject.idle_add(idle_f) 326 self.gui_event.wait() 327 if self.gui_exc: 328 Tasks.unset_busy(self) 329 raise self.gui_exc[0], self.gui_exc[1], self.gui_exc[2] 330 res = self.gui_res 331 del self.gui_res 332 del self.gui_exc 333 del self.gui_event 334 Tasks.unset_busy(self) 335 return res
336 - def __call__(self, *args):
337 self.gui_res = args 338 self.gui_event.set()
339 - def idle_wait(self, fn, *args, **kw):
340 """Returns the result of fn(*args, **kw) called in the gtk main thread; calling thread pauses until fn completes.""" 341 Tasks.set_busy(self) 342 try: 343 return idle_wait(fn, *args, **kw) 344 finally: 345 Tasks.unset_busy(self)
346 347 348 #class Homunculus(object): 349 # def __init__(self): 350 # self.stopping = False 351 # self.__todo = Queue.Queue() 352 # self.thread = threading.Thread(target=self.__thread_func) 353 # self.thread.setDaemon(True) 354 # self.thread.start() 355 # def stop(self): 356 # self.stopping = True 357 # self.do(self.stop) # wake up queue 358 # def do(self, *argv, **kw): 359 # self.__todo.put((argv, kw)) 360 # def __thread_func(self): 361 # while not self.stopping: 362 # try: 363 # argv, kw = self.__todo.get(True, 1.0) 364 # argv[0](*argv[1:], **kw) 365 # except Queue.Empty: 366 # pass 367 # except: 368 # traceback.print_exc() 369 370
class Robot(Task):
    """A worker thread which services a queue of callables.

    The thread is started by the constructor.

    @ivar stopping: True once L{stop} has been requested
    @ivar on_start_item: L{qubx.util_types.WeakCall} invoked before each queued item
    @ivar on_finish_item: L{qubx.util_types.WeakCall} invoked after each queued item
    @ivar acting: True while a queued item is being processed
    """
    def __init__(self, label, on_start_item=lambda:None, on_finish_item=lambda:None):
        Task.__init__(self, label)
        self.__on_start_item = qubx.util_types.WeakCall("%s.on_start_item"%self.__class__)
        self.__on_start_item.assign(on_start_item)
        self.__on_finish_item = qubx.util_types.WeakCall("%s.on_finish_item"%self.__class__)
        self.__on_finish_item.assign(on_finish_item)
        self.stopping = False
        self.__todo = Queue.Queue()
        self.__acting = False
        self.start()
        #self.settrace(True, open("qubx-trace-%s.txt" % label, "w"))

    on_start_item = property(lambda self: self.__on_start_item, lambda self, x: self.__on_start_item.assign(x))
    on_finish_item = property(lambda self: self.__on_finish_item, lambda self, x: self.__on_finish_item.assign(x))
    acting = property(lambda self: self.__acting)

    def interrupt(self):
        """Raises KeyboardInterrupt in the Task thread, only if it's running something."""
        if self.__acting:
            Task.interrupt(self)

    def stop(self):
        """Causes the Task thread to stop servicing and terminate after finishing the current item, if any."""
        self.stopping = True
        self.do(lambda: None) # wake up queue

    def do(self, *argv, **kw):
        """
        Enqueues argv[0](*argv[1:], **kw).  For example:

        >>> myRobot.do(sys.stdout.write, 'Hello world')

        causes this to happen in the Task thread:

        >>> sys.stdout.write('Hello world')

        """
        self.__todo.put((argv, kw))

    def sync(self):
        """Waits for any previous L{do} commands to finish before returning."""
        qubx.pyenv.call_async_wait(lambda recv: self.do(qubx.pyenv.env.call_later, recv))

    def run(self):
        """Services the queue, one item per L{run_one} call, until L{stop} is requested."""
        while not self.stopping:
            self.run_one()

    def run_one(self):
        """Waits up to one second for a queued item and runs it.

        A function's most recent traceback is kept alive until the function returns (!)
        When this was part of run(), objects were becoming immortal.
        """
        started_item = False
        try:
            argv, kw = self.__todo.get(True, 1.0)
            self.__acting = True
            try:
                started_item = True
                self.on_start_item()
            except:
                # A failing start hook is reported but does not stop the item.
                traceback.print_exc()
            argv[0](*argv[1:], **kw)
        except Queue.Empty:
            pass
        except KeyboardInterrupt:
            # An interrupt cancels the current item only; the robot keeps running.
            pass
        except Exception:
            self.send_exception()
        self.__acting = False
        if started_item:
            try:
                self.on_finish_item()
            except:
                traceback.print_exc()

    def settrace(self, trace=True, logfile=None):
        """Enqueues a request to turn line tracing on or off in the robot thread.

        @param trace: True to install L{trace_func}, False to remove it
        @param logfile: optional file-like object that receives one line per traced line
        """
        self.do(self.robot_settrace, trace, logfile)

    def robot_settrace(self, trace, logfile):
        """Installs or removes L{trace_func} via C{sys.settrace} in the calling thread."""
        if trace:
            self.trace_log = logfile
            sys.settrace(self.trace_func)
        else:
            self.trace_log = None
            sys.settrace(None)

    def trace_func(self, frame, event, arg):
        """C{sys.settrace} hook: records the most recently executed line in
        trace_lineno / trace_filename / trace_name / trace_line and, when a
        log file was given, appends it there."""
        if event == 'line':
            self.trace_lineno = frame.f_lineno
            try:
                filename = frame.f_globals["__file__"]
                if filename.endswith((".pyc", ".pyo")):
                    filename = filename[:-1]
                self.trace_filename = filename
                self.trace_name = frame.f_globals["__name__"]
                self.trace_line = linecache.getline(filename, self.trace_lineno).rstrip()
                message = "%s:%s: %s\n" % (self.trace_name, self.trace_lineno, self.trace_line)
            except Exception:
                # e.g. a frame whose globals have no __file__
                message = "<>:%s\n" % self.trace_lineno
            # trace_log is None when settrace() was called without a logfile;
            # writing unconditionally would raise from inside the trace hook.
            if self.trace_log:
                self.trace_log.write(message)
                self.trace_log.flush()
        return self.trace_func

    def hold_from_task(self, task):
        """Returns a L{Robot_Hold} context manager."""
        return Robot_Hold(self, task)

# Alias: the name Homunculus (formerly the commented-out class above) now refers to Robot.
Homunculus = Robot


class PoolTask(Task):
    """A Task whose thread creates and controls a multiprocessing.Pool of worker processes.

    @ivar pool: the current multiprocessing.Pool (assigned when L{run} begins)
    """
    def __init__(self, label):
        Task.__init__(self, label)

    def run(self):
        """Creates the pool, hands it to L{run_with_pool}, then closes and joins the pool."""
        workers = multiprocessing.Pool()
        self.pool = workers
        try:
            self.run_with_pool(workers)
        finally:
            self.status = 'Cleaning up processes...'
            # Go through self.pool rather than the local: reset_pool() may
            # have replaced the pool while run_with_pool was executing.
            self.pool.close()
            self.pool.join()

    def reset_pool(self):
        """Terminates and restarts the worker processes.  Discards any work already in the pool."""
        stale = self.pool
        stale.terminate()
        self.pool = multiprocessing.Pool()

    def run_with_pool(self, pool):
        """Override this method to make the worker thread do something."""
        pass
494 495
class Robots(PoolTask):
    """A worker thread with a pool of worker processes, servicing a queue of callables.

    The thread is started by the constructor.

    @ivar stopping: True once L{stop} has been requested
    @ivar on_start_item: L{qubx.util_types.WeakCall} invoked before each queued item
    @ivar on_finish_item: L{qubx.util_types.WeakCall} invoked after each queued item
    """
    def __init__(self, label, on_start_item=lambda:None, on_finish_item=lambda:None):
        PoolTask.__init__(self, label)
        self.stopping = False
        self.__todo = Queue.Queue()
        self.__on_start_item = qubx.util_types.WeakCall("%s.on_start_item"%self.__class__)
        self.__on_start_item.assign(on_start_item)
        self.__on_finish_item = qubx.util_types.WeakCall("%s.on_finish_item"%self.__class__)
        self.__on_finish_item.assign(on_finish_item)
        self.__acting = False
        self.start()

    on_start_item = property(lambda self: self.__on_start_item, lambda self, x: self.__on_start_item.assign(x))
    on_finish_item = property(lambda self: self.__on_finish_item, lambda self, x: self.__on_finish_item.assign(x))

    def interrupt(self):
        """Raises KeyboardInterrupt in the Task thread, only if it's running something."""
        if self.__acting:
            PoolTask.interrupt(self)

    def stop(self):
        """Causes the Task thread to stop servicing and terminate after finishing the current item, if any."""
        self.stopping = True
        self.do(lambda: None) # wake up queue

    def sync(self):
        """Waits for any previous L{do} commands to finish before returning."""
        qubx.pyenv.call_async_wait(lambda recv: self.do(qubx.pyenv.env.call_later, recv))

    def do(self, *argv, **kw):
        """Enqueues argv[0](*argv[1:], **kw) to be called in the Task thread."""
        self.__todo.put((argv, kw))

    def run_with_pool(self, pool):
        """Services the queue until L{stop} is requested, then terminates the pool."""
        self.pool = pool
        try:
            while not self.stopping:
                self.run_one_with_pool()
        finally:
            self.pool.terminate()

    def run_one_with_pool(self):
        """Waits up to one second for a queued item and runs it."""
        # as separate method so exceptions don't get stuck in frame
        started_item = False
        try:
            argv, kw = self.__todo.get(True, 1.0)
            self.__acting = True
            try:
                started_item = True
                self.on_start_item()
            except:
                # A failing start hook is reported but does not stop the item.
                traceback.print_exc()
            argv[0](*argv[1:], **kw)
        except Queue.Empty:
            pass
        except KeyboardInterrupt:
            # An interrupt cancels the current item only; the thread keeps servicing.
            pass
        except Exception:
            self.send_exception()
        self.__acting = False
        if started_item:
            try:
                self.on_finish_item()
            except:
                traceback.print_exc()

    def settrace(self, trace=True, logfile=None):
        """Enqueues a request to turn line tracing on or off in the Task thread.

        @param trace: True to install L{trace_func}, False to remove it
        @param logfile: optional file-like object that receives one line per traced line
        """
        self.do(self.robot_settrace, trace, logfile)

    def robot_settrace(self, trace, logfile):
        """Installs or removes L{trace_func} via C{sys.settrace} in the calling thread."""
        if trace:
            self.trace_log = logfile
            sys.settrace(self.trace_func)
        else:
            self.trace_log = None
            sys.settrace(None)

    def trace_func(self, frame, event, arg):
        """C{sys.settrace} hook: records the most recently executed line in
        trace_lineno / trace_filename / trace_name / trace_line and, when a
        log file was given, appends it there."""
        if event == 'line':
            self.trace_lineno = frame.f_lineno
            # Handled the same way as Robot.trace_func: a frame whose globals
            # lack __file__ must not raise from inside the trace hook.
            try:
                filename = frame.f_globals["__file__"]
                if filename.endswith((".pyc", ".pyo")):
                    filename = filename[:-1]
                self.trace_filename = filename
                self.trace_name = frame.f_globals["__name__"]
                self.trace_line = linecache.getline(filename, self.trace_lineno).rstrip()
                message = "%s:%s: %s\n" % (self.trace_name, self.trace_lineno, self.trace_line)
            except Exception:
                message = "<>:%s\n" % self.trace_lineno
            # trace_log is None when settrace() was called without a logfile.
            if self.trace_log:
                self.trace_log.write(message)
                self.trace_log.flush()
        return self.trace_func

    def hold_from_task(self, task):
        """Returns a L{Robot_Hold} context manager."""
        return Robot_Hold(self, task)

# Alias: Homunculi is another name for the Robots class.
Homunculi = Robots
