1 """Kalman filter from qubfast/qub_kalman.
2
3 Copyright 2014 Research Foundation State University of New York
4 This file is part of QUB Express.
5
6 QUB Express is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation, either version 3 of the License, or
9 (at your option) any later version.
10
11 QUB Express is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
15
16 You should have received a copy of the GNU General Public License,
17 named LICENSE.txt, in the QUB Express program directory. If not, see
18 <http://www.gnu.org/licenses/>.
19
20 """
21 from qubx.fast.fast_utils import *
22
24 _fields_ = [('storage', c_void_p),
25 ('Nprocess', c_int),
26 ('Nmeasure', c_int),
27 ('A', c_double_pp),
28 ('Q', c_double_pp),
29 ('H', c_double_pp),
30 ('R', c_double_pp)]
31 PKalmanConst = POINTER(KalmanConst)
32
33
34 qubfast.qub_kalman_create.argtypes = (c_int, c_int)
35 qubfast.qub_kalman_create.restype = PKalmanConst
36
37 qubfast.qub_kalman_free.argtypes = (PKalmanConst,)
38 qubfast.qub_kalman_free.restype = None
39
41 """Holds the matrices which define the filter; can be shared among L{Filter} instances."""
42 - def __init__(self, Nprocess=1, Nmeasure=1):
43 self.obj = qubfast.qub_kalman_create(Nprocess, Nmeasure)
44 self.A_ = numpy.frombuffer(pybuf(self.obj[0].A[0], Nprocess*Nprocess*sizeof(c_double)), dtype='float64', count=Nprocess*Nprocess).reshape((Nprocess,Nprocess))
45 self.Q_ = numpy.frombuffer(pybuf(self.obj[0].Q[0], Nprocess*Nprocess*sizeof(c_double)), dtype='float64', count=Nprocess*Nprocess).reshape((Nprocess,Nprocess))
46 self.H_ = numpy.frombuffer(pybuf(self.obj[0].H[0], Nmeasure*Nprocess*sizeof(c_double)), dtype='float64', count=Nmeasure*Nprocess).reshape((Nmeasure,Nprocess))
47 self.R_ = numpy.frombuffer(pybuf(self.obj[0].R[0], Nmeasure*Nmeasure*sizeof(c_double)), dtype='float64', count=Nmeasure*Nmeasure).reshape((Nmeasure,Nmeasure))
50 Nprocess = property(lambda self: self.obj[0].Nprocess, doc="length of the process (X) vector")
51 Nmeasure = property(lambda self: self.obj[0].Nmeasure, doc="length of the measurement (Z) vector")
52 A = property(lambda self: self.A_, doc="process matrix (Nprocess x Nprocess); default is identity matrix")
53 Q = property(lambda self: self.Q_, doc="process covariance (Nprocess x Nprocess)")
54 H = property(lambda self: self.H_, doc="process -> measurement matrix (Nmeasure x Nprocess)")
55 R = property(lambda self: self.R_, doc="measurement covariance (Nmeasure x Nmeasure)")
56
57
64 PKalmanState = POINTER(KalmanState)
65
66
67 qubfast.qub_kalman_state_create.argtypes = (PKalmanConst,)
68 qubfast.qub_kalman_state_create.restype = PKalmanState
69
70 qubfast.qub_kalman_state_free.argtypes = (PKalmanState,)
71 qubfast.qub_kalman_state_free.restype = None
72
73 qubfast.qub_kalman_state_copy.argtypes = (PKalmanState, PKalmanState)
74 qubfast.qub_kalman_state_copy.restype = None
75
76 qubfast.qub_kalman_state_reset.argtypes = (PKalmanState,)
77 qubfast.qub_kalman_state_reset.restype = None
78
79 qubfast.qub_kalman_state_next.argtypes = (PKalmanState, c_double_p)
80 qubfast.qub_kalman_state_next.restype = None
81
82 qubfast.qub_kalman_state_next_into.argtypes = (PKalmanState, PKalmanState, c_double_p)
83 qubfast.qub_kalman_state_next_into.restype = None
84
85 qubfast.qub_kalman_filter.argtypes = (PKalmanState, c_int, c_double_p, c_double_p)
86 qubfast.qub_kalman_filter.restype = None
87
90 self.obj = qubfast.qub_kalman_state_create(constants.obj)
91 self.constants = constants
92 Nprocess, Nmeasure = constants.Nprocess, constants.Nmeasure
93 self.X_ = numpy.frombuffer(pybuf(self.obj[0].X, Nprocess*sizeof(c_double)), dtype='float64', count=Nprocess)
94 self.Z_ = numpy.frombuffer(pybuf(self.obj[0].Z, Nmeasure*sizeof(c_double)), dtype='float64', count=Nmeasure)
95 self.P_ = numpy.frombuffer(pybuf(self.obj[0].P[0], Nprocess*Nprocess*sizeof(c_double)), dtype='float64', count=Nprocess*Nprocess).reshape((Nprocess,Nprocess))
98 X = property(lambda self: self.X_, doc="posterior process state vector")
99 Z = property(lambda self: self.Z_, doc="posterior measurement vector (output of the filter after each sample)")
100 P = property(lambda self: self.P_, doc="posterior process covariance")
106 """z must be a numpy.array of type double, length constants.Nmeasure."""
107 qubfast.qub_kalman_state_next(self.obj, cdata(z, c_double_p))
109 """z must be a python number, and constants.Nmeasure == 1"""
110 z_ = c_double(z)
111 qubfast.qub_kalman_state_next(self.obj, byref(z_))
117 - def filter(self, arr, get_std=False):
118 """Optimization for Nmeasure == 1; replaces arr with filtered."""
119 if get_std:
120 variance = numpy.zeros(shape=arr.shape, dtype='float64')
121 qubfast.qub_kalman_filter(self.obj, len(arr), cdata(arr, c_double_p), cdata(variance, c_double_p))
122 return numpy.sqrt(variance)
123 else:
124 qubfast.qub_kalman_filter(self.obj, len(arr), cdata(arr, c_double_p), None)
125
126 -def Filter1D(A=1.0, Q=1e-5, H=1.0, R=1e-3, X=0.0):
127 """Returns a L{Filter} object for one-dimensional process and measurement.
128
129 @param A: process scalar; X is multiplied by this at each sample
130 @param Q: process variance
131 @param H: process to measurement scalar
132 @param R: measurement variance
133 @param X: initial process state
134 """
135 c = Constants(1, 1)
136 c.A[0,0] = A
137 c.Q[0,0] = Q
138 c.H[0,0] = H
139 c.R[0,0] = R
140 f = Filter(c)
141 f.X[0] = X
142 return f
143