Package qubx :: Package fast :: Module kalman
[hide private]
[frames] | no frames]

Source Code for Module qubx.fast.kalman

  1  """Kalman filter from qubfast/qub_kalman. 
  2   
  3  Copyright 2014 Research Foundation State University of New York  
  4  This file is part of QUB Express.                                           
  5   
  6  QUB Express is free software; you can redistribute it and/or modify           
  7  it under the terms of the GNU General Public License as published by  
  8  the Free Software Foundation, either version 3 of the License, or     
  9  (at your option) any later version.                                   
 10   
 11  QUB Express is distributed in the hope that it will be useful,                
 12  but WITHOUT ANY WARRANTY; without even the implied warranty of        
 13  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the         
 14  GNU General Public License for more details.                          
 15   
 16  You should have received a copy of the GNU General Public License,    
 17  named LICENSE.txt, in the QUB Express program directory.  If not, see         
 18  <http://www.gnu.org/licenses/>.                                       
 19   
 20  """ 
 21  from qubx.fast.fast_utils import * 
 22   
23 -class KalmanConst(Structure):
24 _fields_ = [('storage', c_void_p), 25 ('Nprocess', c_int), 26 ('Nmeasure', c_int), 27 ('A', c_double_pp), 28 ('Q', c_double_pp), 29 ('H', c_double_pp), 30 ('R', c_double_pp)]
31 PKalmanConst = POINTER(KalmanConst) 32 33 # QUBFAST_API qub_kalman_const * qub_kalman_create(int Nprocess, int Nmeasure); 34 qubfast.qub_kalman_create.argtypes = (c_int, c_int) 35 qubfast.qub_kalman_create.restype = PKalmanConst 36 # QUBFAST_API void qub_kalman_free(qub_kalman_const *filter); 37 qubfast.qub_kalman_free.argtypes = (PKalmanConst,) 38 qubfast.qub_kalman_free.restype = None 39
40 -class Constants(object):
41 """Holds the matrices which define the filter; can be shared among L{Filter} instances."""
42 - def __init__(self, Nprocess=1, Nmeasure=1):
43 self.obj = qubfast.qub_kalman_create(Nprocess, Nmeasure) 44 self.A_ = numpy.frombuffer(pybuf(self.obj[0].A[0], Nprocess*Nprocess*sizeof(c_double)), dtype='float64', count=Nprocess*Nprocess).reshape((Nprocess,Nprocess)) 45 self.Q_ = numpy.frombuffer(pybuf(self.obj[0].Q[0], Nprocess*Nprocess*sizeof(c_double)), dtype='float64', count=Nprocess*Nprocess).reshape((Nprocess,Nprocess)) 46 self.H_ = numpy.frombuffer(pybuf(self.obj[0].H[0], Nmeasure*Nprocess*sizeof(c_double)), dtype='float64', count=Nmeasure*Nprocess).reshape((Nmeasure,Nprocess)) 47 self.R_ = numpy.frombuffer(pybuf(self.obj[0].R[0], Nmeasure*Nmeasure*sizeof(c_double)), dtype='float64', count=Nmeasure*Nmeasure).reshape((Nmeasure,Nmeasure))
48 - def __del__(self):
49 qubfast.qub_kalman_free(self.obj)
50 Nprocess = property(lambda self: self.obj[0].Nprocess, doc="length of the process (X) vector") 51 Nmeasure = property(lambda self: self.obj[0].Nmeasure, doc="length of the measurement (Z) vector") 52 A = property(lambda self: self.A_, doc="process matrix (Nprocess x Nprocess); default is identity matrix") 53 Q = property(lambda self: self.Q_, doc="process covariance (Nprocess x Nprocess)") 54 H = property(lambda self: self.H_, doc="process -> measurement matrix (Nmeasure x Nprocess)") 55 R = property(lambda self: self.R_, doc="measurement covariance (Nmeasure x Nmeasure)")
56 57
58 -class KalmanState(Structure):
59 _fields_ = [('storage', c_void_p), 60 ('kalman', PKalmanConst), 61 ('X', c_double_p), 62 ('Z', c_double_p), 63 ('P', c_double_pp)]
64 PKalmanState = POINTER(KalmanState) 65 66 # QUBFAST_API qub_kalman_state * qub_kalman_state_create(qub_kalman_const *kalman); 67 qubfast.qub_kalman_state_create.argtypes = (PKalmanConst,) 68 qubfast.qub_kalman_state_create.restype = PKalmanState 69 # QUBFAST_API void qub_kalman_state_free(qub_kalman_state *state); 70 qubfast.qub_kalman_state_free.argtypes = (PKalmanState,) 71 qubfast.qub_kalman_state_free.restype = None 72 # QUBFAST_API void qub_kalman_state_copy(qub_kalman_state *into, qub_kalman_state *from); 73 qubfast.qub_kalman_state_copy.argtypes = (PKalmanState, PKalmanState) 74 qubfast.qub_kalman_state_copy.restype = None 75 # QUBFAST_API void qub_kalman_state_reset(qub_kalman_state *state); 76 qubfast.qub_kalman_state_reset.argtypes = (PKalmanState,) 77 qubfast.qub_kalman_state_reset.restype = None 78 # QUBFAST_API void qub_kalman_state_next(qub_kalman_state *state, double *z); 79 qubfast.qub_kalman_state_next.argtypes = (PKalmanState, c_double_p) 80 qubfast.qub_kalman_state_next.restype = None 81 # QUBFAST_API void qub_kalman_state_next_into(qub_kalman_state *into, qub_kalman_state *from, double *z); 82 qubfast.qub_kalman_state_next_into.argtypes = (PKalmanState, PKalmanState, c_double_p) 83 qubfast.qub_kalman_state_next_into.restype = None 84 # QUBFAST_API void qub_kalman_filter(qub_kalman_state *state, int N, double *z); // for Nprocess == Nmeasure == 1 85 qubfast.qub_kalman_filter.argtypes = (PKalmanState, c_int, c_double_p, c_double_p) 86 qubfast.qub_kalman_filter.restype = None 87
88 -class Filter(object):
89 - def __init__(self, constants):
90 self.obj = qubfast.qub_kalman_state_create(constants.obj) 91 self.constants = constants 92 Nprocess, Nmeasure = constants.Nprocess, constants.Nmeasure 93 self.X_ = numpy.frombuffer(pybuf(self.obj[0].X, Nprocess*sizeof(c_double)), dtype='float64', count=Nprocess) 94 self.Z_ = numpy.frombuffer(pybuf(self.obj[0].Z, Nmeasure*sizeof(c_double)), dtype='float64', count=Nmeasure) 95 self.P_ = numpy.frombuffer(pybuf(self.obj[0].P[0], Nprocess*Nprocess*sizeof(c_double)), dtype='float64', count=Nprocess*Nprocess).reshape((Nprocess,Nprocess))
96 - def __del__(self):
97 qubfast.qub_kalman_state_free(self.obj)
98 X = property(lambda self: self.X_, doc="posterior process state vector") 99 Z = property(lambda self: self.Z_, doc="posterior measurement vector (output of the filter after each sample)") 100 P = property(lambda self: self.P_, doc="posterior process covariance")
101 - def copy_from(self, other):
102 qubfast.qub_kalman_state_copy(self.obj, other.obj)
103 - def reset(self):
104 qubfast.qub_kalman_state_reset(self.obj)
105 - def next_arr(self, z):
106 """z must be a numpy.array of type double, length constants.Nmeasure.""" 107 qubfast.qub_kalman_state_next(self.obj, cdata(z, c_double_p))
108 - def next_num(self, z):
109 """z must be a python number, and constants.Nmeasure == 1""" 110 z_ = c_double(z) 111 qubfast.qub_kalman_state_next(self.obj, byref(z_))
112 - def next_arr_from(self, other, z):
113 qubfast.qub_kalman_state_next_into(self.obj, other.obj, cdata(z, c_double_p))
114 - def next_num_from(self, other, z):
115 z_ = c_double(z) 116 qubfast.qub_kalman_state_next_into(self.obj, other.obj, byref(z_))
117 - def filter(self, arr, get_std=False):
118 """Optimization for Nmeasure == 1; replaces arr with filtered.""" 119 if get_std: 120 variance = numpy.zeros(shape=arr.shape, dtype='float64') 121 qubfast.qub_kalman_filter(self.obj, len(arr), cdata(arr, c_double_p), cdata(variance, c_double_p)) 122 return numpy.sqrt(variance) 123 else: 124 qubfast.qub_kalman_filter(self.obj, len(arr), cdata(arr, c_double_p), None)
125
126 -def Filter1D(A=1.0, Q=1e-5, H=1.0, R=1e-3, X=0.0):
127 """Returns a L{Filter} object for one-dimensional process and measurement. 128 129 @param A: process scalar; X is multiplied by this at each sample 130 @param Q: process variance 131 @param H: process to measurement scalar 132 @param R: measurement variance 133 @param X: initial process state 134 """ 135 c = Constants(1, 1) 136 c.A[0,0] = A 137 c.Q[0,0] = Q 138 c.H[0,0] = H 139 c.R[0,0] = R 140 f = Filter(c) 141 f.X[0] = X 142 return f
143