1 """Thread management and multiprocessing.
2
3 Copyright 2008-2012 Research Foundation State University of New York
4 This file is part of QUB Express.
5
6 QUB Express is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation, either version 3 of the License, or
9 (at your option) any later version.
10
11 QUB Express is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
15
16 You should have received a copy of the GNU General Public License,
17 named LICENSE.txt, in the QUB Express program directory. If not, see
18 <http://www.gnu.org/licenses/>.
19
20 """
21
22 from __future__ import with_statement
23
24 import ctypes
25 import gobject
26 import multiprocessing
27 import threading
28 from math import *
29 import Queue
30 import collections
31 import linecache
32 import time
33 import traceback
34 import sys
35 import weakref
36 import qubx.util_types
37 import qubx.pyenv
38
40 """Maintains a list of running tasks; can interrupt one or all; use global variable C{Tasks}."
41
42 @ivar OnAdd: L{qubx.util_types.WeakEvent}(TaskManager, Task)
43 @ivar OnRemove: L{qubx.util_types.WeakEvent}(TaskManager, Task)
44 @ivar interruptible: list of running, interruptible Tasks
45 @ivar busy: list of running, non-interruptible Tasks
46 @ivar deferred: list of busy tasks to be interrupted when possible
47 """
57 """Add task to the list of running, interruptible tasks."""
58 with self.lock:
59 self.interruptible.append(task)
60 task.OnTerminate += self.__ref(self.__onTerminate)
61 gobject.idle_add(self.OnAdd, self, task)
63 """Remove task from the running list. Task must not be busy (in main_hold)."""
64 with self.lock:
65 self.interruptible.remove(task)
66 try:
67 task.OnTerminate -= self.__ref(self.__onTerminate)
68 except:
69 pass
70 gobject.idle_add(self.OnRemove, self, task)
87 """Raises KeyboardInterrupt in one Task, or in all if tid=None."""
88 if task is None:
89 for task in self.interruptible:
90 task.interrupt()
91 else:
92 busy = False
93 with self.lock:
94 if task.tid in self.busy:
95 busy = True
96 if not (task.tid in self.deferred):
97 self.deferred.append(task.tid)
98 if not busy:
99 throw_in(task.tid, KeyboardInterrupt)
100
101
102
104 """Raises Exception exc in thread tid. Actually it is raised sometime in the next "check interval" instructions."""
105 if exc is None:
106 exc_ptr = ctypes.c_void_p()
107 else:
108 exc_ptr = ctypes.py_object(exc)
109 return ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, exc_ptr)
110
111 Tasks = TaskManager()
112
113
114 -class GTK_Main_Hold(object):
115 """Context manager, for use in a Task thread, which pauses the main thread while it is entered.
116
117 >>> with task.main_hold:
118 ... read_something_volatile()
119
120 Used in combination with L{throw_in}, the exception is sometimes at inconveniently times in __enter__, __call__, or __exit__.
121 This is a possible source of freeze-ups.
122
123 In Linux, the Task thread can work with the GUI while the main thread is paused, but this technique
124 is not compatible with the Windows thread model. Not sure about Mac OS X.
125 """
126 - def __init__(self, task):
127 self.__task = weakref.ref(task)
128 self.in_call = threading.Event()
129 self.done_hold = threading.Event()
130 task = property(lambda self: self.__task())
131 - def __enter__(self):
132 try:
133 Tasks.set_busy(self.task)
134 for i in xrange(2*sys.getcheckinterval()): pass
135 except:
136 Tasks.unset_busy(self.task)
137 raise
138
139 self.in_call.clear()
140 gobject.idle_add(self)
141 self.in_call.wait()
142 - def __call__(self):
143 self.done_hold.clear()
144 self.in_call.set()
145
146 self.done_hold.wait()
147
148 - def __exit__(self, type, value, tback):
149 try:
150 self.done_hold.set()
151 Tasks.unset_busy(self.task)
152 except:
153 self.__exit__(type, value, tback)
154
156 """Context manager, for use in a Task thread, which pauses a Robot (or Robots) thread while it is entered.
157
158 >>> with robot.hold_from_task(task):
159 ... read_something_volatile()
160
161 Used in combination with L{throw_in}, the exception is sometimes at inconveniently times in __enter__, __call__, or __exit__.
162 This is a possible source of freeze-ups.
163
164 In Linux, the Task thread can work with the GUI while the main thread is paused, but this technique
165 is not compatible with the Windows thread model. Not sure about Mac OS X.
166 """
168 self.__robot = weakref.ref(robot)
169 self.__task = weakref.ref(task)
170 self.in_call = threading.Event()
171 self.done_hold = threading.Event()
172 task = property(lambda self: self.__task())
173 robot = property(lambda self: self.__robot())
186 self.done_hold.clear()
187 self.in_call.set()
188 self.done_hold.wait()
189 - def __exit__(self, type, value, tback):
195
196
198 """Returns the result of fn(*args, **kw) called in the gtk main thread; calling thread pauses until fn completes."""
199 event = threading.Event()
200 res_exc = [None, None]
201 def idle_f():
202 try:
203 res_exc[0] = fn(*args, **kw)
204 except:
205 res_exc[1] = sys.exc_info()
206 event.set()
207 gobject.idle_add(idle_f)
208 event.wait()
209 if res_exc[1]:
210 raise res_exc[1][0], res_exc[1][1], res_exc[1][2]
211 return res_exc[0]
212
213
215 """
216 A worker thread; not started until .start().
217
218 From inside the thread, trigger the callback On* by calling send_*().
219 The callback will then fire from the main thread.
220
221 @ivar label: descriptive string
222 @ivar progress: percent complete, between 0.0 and 100.0
223 @ivar status: message string
224 @ivar tid: thread id
225 @ivar main_hold: L{GTK_Main_Hold} to pause the main thread
226 @ivar OnResult: L{qubx.util_types.WeakEvent}(Task, result)
227 @ivar OnException: L{qubx.util_types.WeakEvent}(Task, type, value, traceback)
228 @ivar OnProgress: L{qubx.util_types.WeakEvent}(Task, progress)
229 @ivar OnStatus: L{qubx.util_types.WeakEvent}(Task, status)
230 @ivar OnTerminate: L{qubx.util_types.WeakEvent}(Task)
231 @ivar OnInterrupt: L{qubx.util_types.WeakEvent}(Task, cancel()) if you want to do more/less than KeyboardInterrupt
232 """
247 progress = property(lambda self: self._progress, lambda self, x: self.set_progress(x))
248 status = property(lambda self: self._status, lambda self, x: self.set_status(x))
250 """Updates progress and calls OnProgress in the main thread."""
251 self._progress = x
252 pf = floor(x)
253 if (pf < self._prog_floor) or (pf >= (self._prog_floor + 1)):
254 self._prog_floor = pf
255 self.send_progress(pf)
257 """Updates status and calls OnStatus in the main thread."""
258 self._status = x
259 self.send_status(x)
261 """Calls OnResult in the main thread."""
262 gobject.idle_add(self.OnResult, self, x)
264 """Calls OnException in the main thread."""
265 gobject.idle_add(self.OnException, self, *sys.exc_info())
267 """Calls OnProgress in the main thread."""
268 gobject.idle_add(self.OnProgress, self, x)
270 """Calls OnStatus in the main thread."""
271 gobject.idle_add(self.OnStatus, self, x)
273 """Starts the Task thread."""
274 if self._thread:
275 raise Exception('%s already started' % self.label)
276 self._thread = threading.Thread(target=self.thread_main)
277 self._thread.daemon = True
278 self._stop_flag = False
279 self._thread.start()
281 """Raises KeyboardInterrupt in the Task thread."""
282 throw_it = [True]
283 def cancel():
284 throw_it[0] = False
285 if self.tid:
286 self.OnInterrupt(self, cancel)
287 if throw_it[0]:
288 Tasks.interrupt(self)
290 """Waits for the Task thread to finish."""
291 if self._thread:
292 self._thread.join()
293 - def thread_main(self):
294 """Calls run, then makes sure OnTerminate is called in the main thread."""
295 try:
296 self.tid = threading._get_ident()
297 self.status = 'Running...'
298 self.run()
299 self.status = 'Done.'
300 except KeyboardInterrupt:
301 self.status = 'Interrupted.'
302 except Exception, e:
303 self.send_exception()
304 self.status = 'Error.'
305 gobject.idle_add(self.OnTerminate, self)
307 """Override this method to make the Task thread do something."""
308 pass
310 """Calls fn(*args, **kw) in the gtk main thread, waits for self(*results) to be called, and returns results (a tuple). Intended for long-running requests that return via callback:
311
312 >>> data = task.gui_call_recv(call_with_data, task)"""
313
314 Tasks.set_busy(self)
315 self.gui_event = threading.Event()
316 self.gui_res = None
317 self.gui_exc = None
318 def idle_f():
319 try:
320 fn(*args, **kw)
321 except:
322 traceback.print_exc()
323 self.gui_exc = sys.exc_info()
324 self.gui_event.set()
325 gobject.idle_add(idle_f)
326 self.gui_event.wait()
327 if self.gui_exc:
328 Tasks.unset_busy(self)
329 raise self.gui_exc[0], self.gui_exc[1], self.gui_exc[2]
330 res = self.gui_res
331 del self.gui_res
332 del self.gui_exc
333 del self.gui_event
334 Tasks.unset_busy(self)
335 return res
337 self.gui_res = args
338 self.gui_event.set()
340 """Returns the result of fn(*args, **kw) called in the gtk main thread; calling thread pauses until fn completes."""
341 Tasks.set_busy(self)
342 try:
343 return idle_wait(fn, *args, **kw)
344 finally:
345 Tasks.unset_busy(self)
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
372 """A worker thread which services a queue of callables."""
383
384 on_start_item = property(lambda self: self.__on_start_item, lambda self, x: self.__on_start_item.assign(x))
385 on_finish_item = property(lambda self: self.__on_finish_item, lambda self, x: self.__on_finish_item.assign(x))
386 acting = property(lambda self: self.__acting)
388 """Raises KeyboardInterrupt in the Task thread, only if it's running something."""
389 if self.__acting:
390 Task.interrupt(self)
392 """Causes the Task thread to stop servicing and terminate after finishing the current item, if any."""
393 self.stopping = True
394 self.do(lambda: None)
395 - def do(self, *argv, **kw):
396 """
397 Enqueues argv[0](*argv[1:], **kw). For example:
398
399 >>> myRobot.do(sys.stdout.write, 'Hello world')
400
401 causes this to happen in the Task thread:
402
403 >>> sys.stdout.write('Hello world')
404
405 """
406 self.__todo.put((argv, kw))
408 """Waits for any previous L{do} commands to finish before returning."""
409 qubx.pyenv.call_async_wait(lambda recv: self.do(qubx.pyenv.env.call_later, recv))
411 while not self.stopping:
412 self.run_one()
414 """A function's most recent traceback is kept alive until the function returns (!)
415 When this was part of run(), objects were becoming immortal."""
416 started_item = False
417 try:
418 argv, kw = self.__todo.get(True, 1.0)
419 self.__acting = True
420 try:
421 started_item = True
422 self.on_start_item()
423 except:
424 traceback.print_exc()
425 argv[0](*argv[1:], **kw)
426 except Queue.Empty:
427 pass
428 except KeyboardInterrupt:
429 pass
430 except Exception, exc:
431 self.send_exception()
432 self.__acting = False
433 if started_item:
434 try:
435 self.on_finish_item()
436 except:
437 traceback.print_exc()
438 - def settrace(self, trace=True, logfile=None):
441 if trace:
442 self.trace_log = logfile
443 sys.settrace(self.trace_func)
444 else:
445 self.trace_log = None
446 sys.settrace(None)
448 if event == 'line':
449 self.trace_lineno = frame.f_lineno
450 try:
451 filename = frame.f_globals["__file__"]
452 if (filename.endswith(".pyc") or
453 filename.endswith(".pyo")):
454 filename = filename[:-1]
455 self.trace_filename = filename
456 self.trace_name = frame.f_globals["__name__"]
457 self.trace_line = linecache.getline(filename, self.trace_lineno).rstrip()
458 if self.trace_log:
459 self.trace_log.write("%s:%s: %s\n" % (self.trace_name, self.trace_lineno, self.trace_line))
460 self.trace_log.flush()
461 except:
462 self.trace_log.write("<>:%s\n" % self.trace_lineno)
463 self.trace_log.flush()
464 return self.trace_func
466 """Returns a L{Robot_Hold} context manager."""
467 return Robot_Hold(self, task)
468
469 Homunculus = Robot
470
471
473 """A worker thread which controls a multiprocessing.Pool of worker processes.
474
475 @ivar pool: a multiprocessing.Pool
476 """
480 self.pool = multiprocessing.Pool()
481 try:
482 self.run_with_pool(self.pool)
483 finally:
484 self.status = 'Cleaning up processes...'
485 self.pool.close()
486 self.pool.join()
488 """Terminates and restarts the worker processes. Discards any work already in the pool."""
489 self.pool.terminate()
490 self.pool = multiprocessing.Pool()
492 """Override this method to make the worker thread do something."""
493 pass
494
495
497 """A worker thread with a pool of worker processes, servicing a queue of callables."""
508 on_start_item = property(lambda self: self.__on_start_item, lambda self, x: self.__on_start_item.assign(x))
509 on_finish_item = property(lambda self: self.__on_finish_item, lambda self, x: self.__on_finish_item.assign(x))
514 self.stopping = True
515 self.do(lambda: None)
517 """Waits for any previous L{do} commands to finish before returning."""
518 qubx.pyenv.call_async_wait(lambda recv: self.do(qubx.pyenv.env.call_later, recv))
519 - def do(self, *argv, **kw):
520 self.__todo.put((argv, kw))
522 self.pool = pool
523 try:
524 while not self.stopping:
525 self.run_one_with_pool()
526 finally:
527 self.pool.terminate()
529
530 started_item = False
531 try:
532 argv, kw = self.__todo.get(True, 1.0)
533 self.__acting = True
534 try:
535 started_item = True
536 self.on_start_item()
537 except:
538 traceback.print_exc()
539 argv[0](*argv[1:], **kw)
540 except Queue.Empty:
541 pass
542 except KeyboardInterrupt:
543 pass
544 except Exception, exc:
545 self.send_exception()
546 self.__acting = False
547 if started_item:
548 try:
549 self.on_finish_item()
550 except:
551 traceback.print_exc()
552 - def settrace(self, trace=True, logfile=None):
555 if trace:
556 self.trace_log = logfile
557 sys.settrace(self.trace_func)
558 else:
559 self.trace_log = None
560 sys.settrace(None)
562 if event == 'line':
563 self.trace_lineno = frame.f_lineno
564 filename = frame.f_globals["__file__"]
565 if (filename.endswith(".pyc") or
566 filename.endswith(".pyo")):
567 filename = filename[:-1]
568 self.trace_filename = filename
569 self.trace_name = frame.f_globals["__name__"]
570 self.trace_line = linecache.getline(filename, self.trace_lineno).rstrip()
571 if self.trace_log:
572 self.trace_log.write("%s:%s: %s\n" % (self.trace_name, self.trace_lineno, self.trace_line))
573 self.trace_log.flush()
574 return self.trace_func
576 """Returns a L{Robot_Hold} context manager."""
577 return Robot_Hold(self, task)
578
579 Homunculi = Robots
580