Package qubx_testing :: Module i_over_var
[hide private]
[frames | no frames]

Source Code for Module qubx_testing.i_over_var

  1   
  2  import collections 
  3  import itertools 
  4  import gc 
  5  import os 
  6  import sys 
  7  import time 
  8  import traceback 
  9   
 10  import qubx.dataGTK 
 11  import qubx.fast.data 
 12  import qubx.global_namespace 
 13  import qubx.pyenv 
 14  import qubx.settingsGTK 
 15  from qubx.accept import * 
 16  from qubx.data_types import * 
 17  from qubx.util_types import * 
 18  from qubx.GTK import * 
 19  from qubx.settings import * 
 20  from qubx.task import * 
 21  from numpy import * 
 22  import random 
 23  import cProfile 
 24  import __main__ 
 25   
 26   
 27  IoverVar_Properties = [Property('', None, 'Measures short samples of data, plots Mean v. Std., to help discover total channel count.'), 
 28                         Property('sample_dur', 10.0, 'Duration (ms) of each sample:', accept=acceptFloatGreaterThan(0.0), format='%.3g'), 
 29                         Property('output_name', 'Mean-Std', 'Output List name:', accept=str, format=str)] 
 30   
def IoverVar(wait=True, receiver=lambda: None, **kw):
    """Measure short samples of the data and list/plot their mean vs. std.

    With no keyword settings, the user is prompted for them
    (IoverVar_Properties) and the function re-invokes itself with the
    response; if the prompt yields None, nothing happens and None is returned.

    With wait=True the call is handed to qubx.pyenv.call_async_wait, which
    re-enters this function with wait=False and a receiver callback, and that
    call's result is returned.

    Otherwise, any setting missing from kw is taken from the stored
    'qubx_i_over_var' settings, falling back to the property's default; the
    equivalent scripted call is reported through OnScriptable; and an
    IoverVar_Task is created and started.

    receiver: zero-argument callable handed to the task.
    """
    if not kw:
        answer = qubx.settingsGTK.prompt_propertied(
            'qubx_i_over_var', IoverVar_Properties,
            'Plot Mean v. Std', qubx.global_namespace.QubX)
        if answer is not None:
            return IoverVar(wait, receiver, **answer)
        return

    if wait:
        def reenter(rcv):
            return IoverVar(wait=False, receiver=rcv, **kw)
        return qubx.pyenv.call_async_wait(reenter)

    # fill in any missing keywords: stored setting first, then the default
    stored = dict(SettingsMgr['qubx_i_over_var'].as_pairs())
    for prop in IoverVar_Properties:
        name = prop.name
        if not name or name in kw:
            continue
        kw[name] = stored[name] if name in stored else prop.default

    arg_text = ', '.join('%s=%s' % (key, repr(value)) for key, value in kw.iteritems())
    qubx.pyenv.env.OnScriptable('qubx_testing.i_over_var.IoverVar(%s)' % arg_text)

    task = IoverVar_Task(receiver, **kw)
    task.OnException += task_exception_to_console
    task.start()


# Register a 'Plot Mean v. Std...' entry under 'other' in the data tools.
# The action calls IoverVar with no settings, so it prompts for them, and
# with wait=False.
qubx.dataGTK.Tools.register(
    'other',
    'Plot Mean v. Std...',
    action=lambda menuitem: IoverVar(wait=False))


def task_exception_to_console(task, typ, val, tb):
    """Print an exception's traceback via traceback.print_exception.

    Attached to a task's OnException by IoverVar; `task` is accepted but not
    used.  typ, val, tb are the exception type, value and traceback.
    """
    exc_info = (typ, val, tb)
    traceback.print_exception(*exc_info)
62
class IoverVar_Task(Task):
    """Task that samples short windows of the included data and lists/plots mean vs. std.

    Construction captures the included chunks (with their samples) from the
    current data source and clears the output list.  run() performs the
    measurement and then schedules the receiver callback.
    """

    def __init__(self, receiver, **kw):
        """Capture the data to analyze and prepare the output list.

        receiver: zero-argument callable scheduled with gobject.idle_add
            when run() finishes, whether or not it raised.
        kw: settings forwarded to run_i_over_var (sample_dur, output_name).
        """
        Task.__init__(self, 'IoverVar')
        self.__receiver = receiver
        self.__kw = kw
        QubX = qubx.global_namespace.QubX
        self.dataview = QubX.Data.view
        self.iSignal = QubX.DataSource.signal
        segs = QubX.DataSource.get_segmentation()
        self.chunks = chunks = []
        for seg in segs:
            for chunk in seg.chunks:
                if chunk.included:
                    chunk_w_data = chunk.get_samples()
                    # remember where the chunk came from
                    chunk_w_data.src_seg = seg.index
                    chunk_w_data.off_in_seg = chunk.f - seg.offset
                    chunks.append(chunk_w_data)
        self.table = QubX.Data.file.lists.show_list(kw['output_name'])
        self.table.clear()

    def run(self):
        """Register with the task list, run the analysis, then notify the receiver."""
        try:
            qubx.task.Tasks.add_task(self)
            self.run_i_over_var(**self.__kw)
        finally:
            gobject.idle_add(self.__receiver)
            qubx.task.Tasks.remove_task(self)

    def run_i_over_var(self, sample_dur, output_name):
        """Take short (mean, variance) samples of the data, a few per amplitude level.

        Outline:
          * idealize the data with a resampling delta of 1/50 of the data range
          * within each resulting dwell, pick a random point and measure the
            mean and variance of a window of about sample_dur around it,
            clipped to the chunk
          * keep at most samples_per_level random measurements per level
          * hand the measurements to __show_list via qubx.task.idle_wait

        sample_dur: window duration; divided by samp_ms, so presumably in
            milliseconds (matches the 'Duration (ms)' prompt) -- TODO confirm
            that chunk.sampling is in seconds.
        output_name: accepted because it arrives with the settings; not used
            here (the output list was already opened in __init__).
        """
        chunks = self.chunks
        if not chunks:
            return
        samp_ms = chunks[0].sampling*1e3
        half_dur_samples = int(round(sample_dur / samp_ms) / 2) or 1  # at least 1

        # True extremes of the data.  A running max seeded with a small
        # positive number (1e-10) never updates for data that lies entirely
        # below it, which overstates the range; take the real min and max.
        data_min = min(min(chunk.samples) for chunk in chunks)
        data_max = max(max(chunk.samples) for chunk in chunks)
        resample_delta = (data_max - data_min) / 50

        # NOTE: pick_random returns a bare item rather than a list when k == 1,
        # which the extend() below would flatten; keep this above 1.
        samples_per_level = 8

        idealizer = qubx.fast.data.IdlStim([], True, resample_delta, 0.0, samp_ms)
        tot_n = sum(chunk.n for chunk in chunks)
        read_n = 0
        # feed each included chunk to the idealizer:
        for chunk in chunks:
            idealizer.add(chunk.samples)
            read_n += chunk.n
            self.progress = read_n * 33.0 / tot_n
        idealizer.done_add()
        # result is not used below; call kept in case it has side effects -- TODO confirm
        amps = idealizer.get_amps()
        idlchunks = [idealizer.get_next_dwells(chunk.f, chunk.n) for chunk in chunks]

        by_level = collections.defaultdict(list)
        for chunk, dwells in itertools.izip(chunks, idlchunks):
            # dwells unpacks to parallel sequences: first index, last index, level
            for f, l, c in itertools.izip(*dwells):
                mid = random.randrange(f, l+1)
                sample_f = max(chunk.f, mid-half_dur_samples)
                sample_l = min(chunk.l, sample_f + 2*half_dur_samples)
                data = chunk.samples[sample_f-chunk.f:sample_l-chunk.f+1]
                by_level[c].append((sample_f, sample_l, data.mean(), data.std()**2))
            # read_n keeps counting past tot_n, so progress continues on from 33
            read_n += chunk.n
            self.progress = read_n * 33.0 / tot_n
            del chunk.samples  # done with this chunk's data

        if samples_per_level:
            for c in by_level:
                by_level[c] = pick_random(by_level[c], samples_per_level)
        samples_flmv = []
        for c in by_level:
            samples_flmv.extend(by_level[c])
        qubx.task.idle_wait(self.__show_list, samples_flmv)

    def __show_list(self, samples_flmv):
        """Append each (first, last, mean, variance) sample to the output list and chart it.

        The variance is stored as its square root under the '<signal> Std'
        column, and a 'List' chart of '<signal> Mean' vs. '<signal> Std' is
        added.
        """
        # Original author's note: part of why this is slow may be repeatedly
        # invalidating the panels' models.
        table = self.table
        signal_name = self.dataview.signals[self.iSignal+1, 'Name']
        mean_name = '%s Mean' % signal_name
        std_name = '%s Std' % signal_name
        for f, l, m, v in samples_flmv:
            table.append_selection(f, l, **{mean_name: m, std_name: sqrt(v)})
        QubX = qubx.global_namespace.QubX
        QubX.Figures.Charts.add_two_plot('List', mean_name, std_name, False, False)
        QubX.show_charts()

153
def pick_random(lst, k=1, destructive=True):
    """Pick up to k items from lst at random, without replacement.

    destructive: when true, lst itself is used as scratch space and its
        contents are overwritten to avoid making a copy; when false, a copy
        is used and lst is left untouched.

    Returns a list of the picked items (shorter than k if lst runs out),
    except that k == 1 returns the single picked item itself.
    """
    pool = lst if destructive else lst[:]
    live = len(pool)      # pool[:live] holds the items not yet picked
    wanted = k
    picked = []
    while wanted and live:
        idx = random.randrange(live)
        picked.append(pool[idx])
        # move the last live item into the vacated slot and shrink the range
        live -= 1
        pool[idx] = pool[live]
        wanted -= 1
    return picked[0] if k == 1 else picked
169