Package qubx :: Module spectrum
[hide private]
[frames] | no frames]

Source Code for Module qubx.spectrum

  1  #/* Copyright 2008-2012 Research Foundation State University of New York   */ 
  2   
  3  #/* This file is part of QUB Express.                                      */ 
  4   
  5  #/* QUB Express is free software; you can redistribute it and/or modify    */ 
  6  #/* it under the terms of the GNU General Public License as published by   */ 
  7  #/* the Free Software Foundation, either version 3 of the License, or      */ 
  8  #/* (at your option) any later version.                                    */ 
  9   
 10  #/* QUB Express is distributed in the hope that it will be useful,         */ 
 11  #/* but WITHOUT ANY WARRANTY; without even the implied warranty of         */ 
 12  #/* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the          */ 
 13  #/* GNU General Public License for more details.                           */ 
 14   
 15  #/* You should have received a copy of the GNU General Public License,     */ 
 16  #/* named LICENSE.txt, in the QUB Express program directory.  If not, see  */ 
 17  #/* <http://www.gnu.org/licenses/>.                                        */ 
 18   
 19  from __future__ import with_statement 
 20   
 21  from cStringIO import StringIO 
 22  import qubx.pyenv 
 23  import numpy 
 24  import re 
 25  from qubx.hist import * 
 26  from math import * 
 27   
 28  # necessary for py2exe/scipy 
 29  import scipy.misc 
 30  scipy.factorial = scipy.misc.factorial 
 31  import scipy.signal 
32 33 -def acceptPow2(x):
34 return int(2**max(2, round(log(int(x)) / log(2))))
35
36 -def acceptObjectName(x):
37 eval(x, qubx.pyenv.env.globals) 38 return x
39 40 npb_rules = [(re.compile(r"hann", re.I), 1.5), 41 (re.compile(r"hamm", re.I), 1.36), 42 (re.compile(r"blackman", re.I), 1.7), 43 (re.compile(r"flattop", re.I), 3.77)]
44 -def guess_noise_power_bandwidth(expr):
45 for regexp, npb in npb_rules: 46 if regexp.search(expr): 47 return npb 48 return 1.0 # e.g. None, uniform
49 npb = memoize(guess_noise_power_bandwidth) 50 51 52 @Propertied(Property('fft_bins', 256, 'number of FFT bins'), 53 Property('fft_window', 'numpy.hanning', 'func(N) -> array(shape=(N,)) e.g. numpy.hanning'), 54 Property('noise_power_bandwidth', 1.5, 'depends on the window function; common ones are autodetected') 55 )
56 -class SpectrumFace(DataHistFace):
57 __explore_featured = ['controls', 'copy_to_clipboard', 'copy_image_to_clipboard', 'save_to']
58 - def __init__(self, name='Spectrum', global_name=""):
59 super(SpectrumFace, self).__init__(name, global_name) 60 self.__ref = Reffer() 61 self.propertied_connect_settings('Spectrum') 62 self.plot_count = 1 63 self.path = None 64 65 self.controls = qubx.faces.Face("Spectrum", global_name='QubX.About.Spectrum') 66 self.controls.show() 67 line = pack_item(gtk.HBox(True), self.controls) 68 pack_label('Bins:', line) 69 self.txtBins = pack_item(qubx.GTK.NumEntry(self.fft_bins, acceptPow2), line) 70 self.propertied_connect_NumEntry('fft_bins', self.txtBins) 71 pack_label('Window:', self.controls) 72 self.mnuWindow = pack_item(qubx.GTK.SuggestiveComboBox(self.fft_window, acceptObjectName), self.controls) 73 self.mnuWindow.OnChange += self.__ref(self.__onChangeWindow) 74 self.mnuWindow.OnPopup += self.__ref(self.__onPopupWindow) 75 self.mnuWindow.OnSuggest += self.__ref(self.__onSuggestWindow) 76 pack_label('Noise power bandwith:', self.controls).set_tooltip_text("Auto-detected for common window names") 77 self.txtNPB = pack_item(qubx.GTK.NumEntry(self.noise_power_bandwidth, acceptFloatGreaterThan(0.0)), self.controls) 78 self.propertied_connect_NumEntry('noise_power_bandwidth', self.txtNPB) 79 line = pack_item(gtk.HBox(True), self.controls) 80 self.btnSave = pack_button('Save...', line, on_click=self.__onClickSave) 81 self.btnCopy = pack_button('Copy', line, on_click=self.__onClickCopy) 82 self.btnCopyImage = pack_button('Image...', line, on_click=self.__onClickCopyImage)
83 - def propertied_set(self, value, name):
84 super(SpectrumFace, self).propertied_set(value, name) 85 if name == 'fft_window': 86 self.mnuWindow.value = value 87 self.noise_power_bandwidth = npb(value) 88 self.update()
89 - def __onChangeWindow(self, txt, val):
90 self.propertied_on_user_set('fft_window', val)
91 - def __onPopupWindow(self, txt, suggest):
92 suggest("numpy.hanning") 93 suggest("numpy.hamming") 94 suggest("numpy.blackman") 95 suggest("scipy.signal.flattop") 96 suggest("None")
97 - def __onSuggestWindow(self, txt, val):
98 self.fft_window = val
99 - def __onClickSave(self, btn):
100 dlg = gtk.FileChooserDialog('Save as...', None, gtk.FILE_CHOOSER_ACTION_SAVE, 101 buttons=(gtk.STOCK_CANCEL,gtk.RESPONSE_CANCEL,gtk.STOCK_OPEN,gtk.RESPONSE_OK)) 102 dlg.set_default_response(gtk.RESPONSE_OK) 103 filter = gtk.FileFilter() 104 filter.set_name("text files") 105 filter.add_pattern("*.txt") 106 dlg.add_filter(filter) 107 if self.path: dlg.set_current_folder(self.path) 108 dlg.set_filename('Spectrum ') 109 response = dlg.run() 110 if response == gtk.RESPONSE_OK: 111 try: 112 fname = add_ext_if_none(dlg.get_filename(), '.txt') 113 if self.global_name: 114 qubx.pyenv.env.scriptable_if_matching('%s.save_to(open(%s, "w"))' % (self.global_name, repr(fname)), 115 [(self.global_name, self)]) 116 self.save_to(open(fname, 'w')) 117 except Exception, e: 118 print "Saving %s: %s" % (fname, e) 119 self.path = dlg.get_current_folder() 120 dlg.destroy()
121 - def __onClickCopy(self, btn):
122 if self.global_name: 123 qubx.pyenv.env.scriptable_if_matching('%s.copy_to_clipboard()' % self.global_name, 124 [(self.global_name, self)]) 125 self.copy_to_clipboard()
126 - def copy_to_clipboard(self):
127 buf = cStringIO.StringIO() 128 self.save_to(buf) 129 gtk.clipboard_get('CLIPBOARD').set_text(buf.getvalue())
130 - def __onClickCopyImage(self, btn):
131 if gtk.RESPONSE_ACCEPT == TheSpectrumCopyDialog.run(self.plots[0]): 132 w = TheSpectrumCopyDialog.copy_width 133 h = TheSpectrumCopyDialog.copy_height 134 if self.global_name: 135 qubx.pyenv.env.scriptable_if_matching('%s.copy_image_to_clipboard(%s, %s)' % 136 (self.global_name, repr(w), repr(h)), 137 [(self.global_name, self)]) 138 self.copy_image_to_clipboard(w, h)
139 - def copy_image_to_clipboard(self, w, h):
140 pixmap = gdk.Pixmap(self.window, w, h, -1) 141 ctx = pixmap.cairo_create() 142 ctx.set_source_rgb(1,1,1) 143 ctx.paint() 144 self.plots[0].draw_to_context(ctx, w, h) 145 del ctx 146 pixbuf = gdk.Pixbuf(gdk.COLORSPACE_RGB, False, 8, w, h) 147 pixbuf.get_from_drawable(pixmap, gdk.colormap_get_system(), 0, 0, 0, 0, -1, -1) 148 clipboard = gtk.clipboard_get('CLIPBOARD') 149 clipboard.set_image(pixbuf) 150 del pixmap 151 del pixbuf 152 qubx.pyenv.env.gc_collect_on_idle() 153 self.plots[0].redraw_canvas() # re-calculate geometry for mouse
154 - def onShow(self, showing):
155 DataHistFace.onShow(self, showing) 156 if showing: 157 self.QubX.About.append_face(self.controls) 158 self.QubX.About.show_face('Spectrum') 159 else: 160 self.QubX.About.remove_face( self.QubX.About.index('Spectrum') )
161 - def save_to(self, f):
162 """Writes the histogram as tab-separated text to a.file-like object""" 163 f.write(self.plots[0].nbChart.to_txt())
164 - def on_change_data(self):
165 """Determines if update() is needed""" 166 if self.QubX.Data.view: 167 self.update()
168 # self.robot.interrupt()
169 - def robot_update(self):
170 """Calculates a new spectrum.""" 171 with self.robot.main_hold: 172 segs = self.dataSource.get_segmentation() 173 nbin = self.fft_bins 174 window_f = eval(self.fft_window, qubx.pyenv.env.globals) 175 window = window_f(nbin).astype('float32') if window_f else numpy.ones(dtype='float32', shape=(nbin,)) 176 npb = self.noise_power_bandwidth 177 n = sum(seg.n for seg in segs) 178 if n == 0: return 179 units = segs[0].file.signals[segs[0].signal, 'Units'] 180 bins = numpy.fft.fftfreq(nbin, segs[0].sampling)[:nbin/2] # discard negative freqs 181 182 finished = 0 183 spec = MovingWindowSpectrum(window, npb, overlap=2, delta_f=bins[1]) 184 for seg in segs: 185 for frame in generate_sub_samples(nbin/2, self.dataSource.gen_samples(seg.chunks, self.robot.main_hold)): 186 if frame is None: # excluded region -- treating it like segment break 187 spec.flush() 188 else: 189 spec.add_frame(frame) 190 finished += len(frame) 191 self.robot.progress = finished * 99.0 / n 192 spec.flush() 193 bars = spec.finish_average() 194 self.robot.progress = 100.0 195 196 with self.robot.main_hold: 197 self.histrec[0][:] = [HistRec('Power Spectrum Distribution', 198 'Spectrum', bins, bars, nb_headers=['Hz', '%s^2 rms / Hz' % units], 199 nb_xlabel='Hz', nb_ylabel='PSD')] 200 gobject.idle_add(self.update_plots)
201
202 -def generate_sub_samples(sub_size, chunks):
203 # DataSource.gen_samples has to pause the main thread each time, 204 # so it's faster to cache a lot and subdivide it here in-thread 205 for chunk in chunks: 206 if chunk.included: 207 samples = chunk.samples 208 n = chunk.n 209 i = 0 210 while i < n: 211 k = min(sub_size, n-i) 212 yield samples[i:i+k] 213 i += k 214 else: 215 yield None
216
217 -class MovingWindowSpectrum(object):
218 - def __init__(self, window, noise_power_bandwidth, overlap, delta_f):
219 self.window = window 220 self.npb_correction = 1.0 / (noise_power_bandwidth * delta_f) 221 self.overlap = overlap 222 self.nbin = len(window) 223 self.frame = zeros(shape=(len(window)/2,), dtype='float64') 224 self.total_occupancy = 0.0 # like frame count, except first frame is only (1/overlap) full, so contributes that fraction to total_occupancy 225 self.data_window = zeros(shape=window.shape, dtype='float32') 226 self.piece_size = self.nbin / overlap 227 self.data_insert = self.nbin - self.piece_size 228 self.data_present = self.nbin
229 - def add_frame(self, frame, occupied=True):
230 n_kept = min(self.data_insert, self.nbin - self.data_present) 231 n_added = len(frame) 232 if n_kept: 233 self.data_window[self.data_insert-n_kept:self.data_insert] = self.data_window[self.nbin-n_kept:self.nbin] 234 if n_added: 235 self.data_window[self.data_insert:self.data_insert+n_added] = frame 236 if n_added < self.piece_size: 237 self.data_window[self.nbin - (self.piece_size - n_added):] = 0 238 self.data_present = self.data_insert - n_kept 239 occupancy = (n_kept + n_added) * 1.0 / self.nbin 240 self.total_occupancy += occupancy 241 if occupancy: 242 psd = numpy.abs(numpy.fft.rfft(self.window * self.data_window))**2 243 psd *= occupancy * self.npb_correction / self.nbin 244 self.frame += psd[:len(self.frame)]
245 - def flush(self):
246 for i in xrange(self.overlap-1): 247 self.add_frame([])
248 - def finish_average(self):
249 if self.total_occupancy: 250 self.frame /= self.total_occupancy 251 self.total_occupancy = None 252 return self.frame
253
254 255 -class SpectrumCopyDialog(qubx.GTK.CopyDialog):
256 - def __init__(self, parent=None):
257 qubx.GTK.CopyDialog.__init__(self, 'Copy histogram image', parent)
258 - def run(self, view):
259 response = qubx.GTK.CopyDialog.run(self, view) 260 return response
261 262 TheSpectrumCopyDialog = SpectrumCopyDialog() 263