Package qubx :: Module data_acquire
[hide private]
[frames | no frames]

Source Code for Module qubx.data_acquire

 1  """Reads .acquire data files. 
 2   
 3  Copyright 2008-2011 Research Foundation State University of New York  
 4  This file is part of QUB Express.                                           
 5   
 6  QUB Express is free software; you can redistribute it and/or modify           
 7  it under the terms of the GNU General Public License as published by  
 8  the Free Software Foundation, either version 3 of the License, or     
 9  (at your option) any later version.                                   
10   
11  QUB Express is distributed in the hope that it will be useful,                
12  but WITHOUT ANY WARRANTY; without even the implied warranty of        
13  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the         
14  GNU General Public License for more details.                          
15   
16  You should have received a copy of the GNU General Public License,    
17  named LICENSE.txt, in the QUB Express program directory.  If not, see         
18  <http://www.gnu.org/licenses/>.                                       
19   
20  """ 
21   
22  from qubx.data_types import * 
23  from qubx.acquirefile import * 
24   
25   
class QubData_ACQ_Analog(QubData_Analog):
    """Analog reader for a single channel of an open .acquire file.

    @ivar data: the owning L{QubData_ACQ}
    @ivar channel: channel index handed to the file reader and to the
        signals table lookups
    """

    def __init__(self, data, channel):
        """
        @param data: L{QubData_ACQ}
        @param channel: index of the channel this reader serves
        """
        QubData_Analog.__init__(self)
        self.data = data
        self.channel = channel

    def read(self, first, last, latency=0, skip=1):
        """Reads the samples first..last, which must share one segment.

        The segment index and that segment's two bounds are looked up and
        the actual work is delegated to C{read_with_latency}, with
        L{read_in_seg} as the per-segment callback.

        @param first: first sample index, looked up in the segmentation
        @param last: last sample index, looked up in the segmentation
        @param latency: forwarded unchanged to C{read_with_latency}
        @param skip: forwarded unchanged to C{read_with_latency}
        @return: whatever C{read_with_latency} returns
        @raise Exception: if first and last fall in different segments
        """
        segmentation = self.data.segmentation
        seg_index = segmentation.index_at(first)
        if seg_index != segmentation.index_at(last):
            raise Exception("Acquirefile: can't read() from multiple segments at once")
        # Indexed rather than unpacked: only elements 0 and 1 are needed here.
        bounds = segmentation.segments[seg_index]
        seg_start = bounds[0]
        seg_end = bounds[1]
        return read_with_latency(self.read_in_seg, first, last, latency, skip,
                                 seg_index, seg_start, seg_end)

    def read_in_seg(self, iseg, first, last, skip):
        """Reads samples from one segment and applies the channel's scale/offset.

        @param iseg: segment index; also selects the (series, sweep) pair
        @param first: first sample to read, passed to the file reader
        @param last: last sample to read; last-first+1 is passed as the count
        @param skip: accepted for the callback signature; not used here
        @return: the samples from C{read_data_float32}, scaled and offset
        """
        owner = self.data
        series, sweep = owner.ser_sweep_of_seg[iseg]
        count = last - first + 1
        samples = owner.file.read_data_float32(series, sweep, iseg,
                                               self.channel, first, count)

        # Skip the arithmetic when scale/offset are the identity values.
        scale = owner.signals.get(self.channel, 'Scale')
        if scale != 1.0:
            samples *= scale
        offset = owner.signals.get(self.channel, 'Offset')
        if offset != 0.0:
            samples += offset
        return samples
51 52
class QubData_ACQ(QubData):
    """One open Acquire data file.

    @ivar file: L{qubx.acquirefile.AcquireFile}
    @ivar path: the path given to the constructor
    @ivar sampling: copied from the file's C{sampling_sec}
    @ivar ser_sweep_of_seg: list of (series index, sweep index) tuples,
        one entry per segment, in segmentation order
    """

    def __init__(self, path, progressf):
        """Opens the file, then sets up signals, segmentation and session.

        @param path: file name of the .acquire file; the session file is
            looked up next to it with the extension replaced by '.qsf'
        @param progressf: passed through unchanged to C{read_session}
        """
        # Imported explicitly: this module only has star-imports at the top,
        # so these names would otherwise depend on what those modules leak.
        import os
        import qubx.tree

        QubData.__init__(self)
        self.path = path
        self.file = AcquireFile(path)
        self.sampling = self.file.sampling_sec

        # One signal row and one analog reader per analog input channel.
        for c in xrange(self.file.header.analog_in_channel_count):
            channel = self.file.ai_channels[c]
            self.signals.append({'Name': channel.label,
                                 'Units': channel.units})
            self.set_analog(c, QubData_ACQ_Analog(self, c))

        # Lay all segments of all sweeps of all series end to end, giving
        # each a contiguous [first, last] range of sample indices.
        last = -1
        self.ser_sweep_of_seg = []
        for r, series in enumerate(self.file.series):
            for w, sweep in enumerate(series.sweeps):
                for seg in sweep.sweep.segments:
                    # Segment length is taken from the blocks of channel 0.
                    seglen = sum(block.data_count for block in seg.channels[0])
                    first = last + 1
                    last = first + seglen - 1
                    self.ser_sweep_of_seg.append((r, w))
                    # Third argument: first * sampling * 1e3 (presumably the
                    # segment start in milliseconds -- confirm with add_seg).
                    self.segmentation.add_seg(first, last, first*1e3*self.sampling)

        base, ext = os.path.splitext(path)
        sess = qubx.tree.Open(base+'.qsf', True)
        sess.close() # release file, keep in mem

        # override QuB's integer scaling values
        for chan in qubx.tree.children(sess.find('DataChannels'), 'Channel'):
            chan['Scaling'].data = 1.0
            chan['Offset'].data = 0.0

        self.read_session(sess, progressf)
# Presumably registers QubData_ACQ as the reader for '.acquire' files; SetReader is defined outside this module -- confirm.
91 92 93 SetReader('.acquire', 'Acquire files', QubData_ACQ) 94