Skip to content

Commit 49fb9e9

Browse files
Add unit tests for weak stochastic integrators
1 parent ed48495 commit 49fb9e9

File tree

1 file changed

+352
-0
lines changed

1 file changed

+352
-0
lines changed
Lines changed: 352 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,352 @@
1+
#
2+
# ISC License
3+
#
4+
# Copyright (c) 2025, Autonomous Vehicle Systems Lab, University of Colorado at Boulder
5+
#
6+
# Permission to use, copy, modify, and/or distribute this software for any
7+
# purpose with or without fee is hereby granted, provided that the above
8+
# copyright notice and this permission notice appear in all copies.
9+
#
10+
# THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11+
# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12+
# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13+
# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14+
# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15+
# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16+
# OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17+
"""
18+
The SRK integrator tested here is described in:
19+
20+
Tang, X., Xiao, A. Efficient weak second-order stochastic Runge-Kutta methods
21+
for Itô stochastic differential equations. Bit Numer Math 57, 241-260 (2017).
22+
https://doi.org/10.1007/s10543-016-0618-9
23+
"""
24+
from __future__ import annotations
25+
26+
from typing import Callable, List
27+
28+
from Basilisk.utilities import macros
29+
from Basilisk.utilities import SimulationBaseClass
30+
from Basilisk.simulation import svIntegrators
31+
from Basilisk.simulation import dynParamManager
32+
33+
try:
34+
from Basilisk.simulation import mujoco
35+
from Basilisk.simulation import StatefulSysModel
36+
couldImportMujoco = True
37+
except:
38+
couldImportMujoco = False
39+
40+
import pytest
41+
import numpy as np
42+
import numpy.typing as npt
43+
44+
# Function of the form y = f(t,x) where x and y are vectors of the same size
45+
DynFunc = Callable[[float, npt.NDArray[np.float64]], npt.NDArray[np.float64]]
46+
47+
import numpy as np
48+
49+
# Coefficient sets (from Tang & Xiao Table 2, W2-Ito1 & W2-Ito2)
50+
W2_Ito1 = {
51+
'alpha': np.array([1/6, 2/3, 1/6]),
52+
'beta0': np.array([-1, 1, 1]),
53+
'beta1': np.array([2, 0, -2]),
54+
55+
'A0': np.array([[0.0, 0.0, 0.0],
56+
[1/2, 0.0, 0.0],
57+
[-1, 2, 0]]),
58+
59+
'B0': np.array([[0.0, 0.0, 0.0],
60+
[(6-np.sqrt(6))/10, 0.0, 0.0],
61+
[(3+2*np.sqrt(6))/5, 0.0, 0.0]]),
62+
63+
'A1': np.array([[0.0, 0.0, 0.0],
64+
[1/4, 0.0, 0.0],
65+
[1/4, 0.0, 0.0]]),
66+
67+
'B1': np.array([[0.0, 0.0, 0.0],
68+
[1/2, 0.0, 0.0],
69+
[-1/2, 0.0, 0.0]]),
70+
71+
'B2': np.array([[0.0, 0.0, 0.0],
72+
[1.0, 0.0, 0.0],
73+
[0.0, 0.0, 0.0]]),
74+
}
75+
76+
W2_Ito2 = {
77+
'alpha': np.array([1/6, 1/3, 1/3, 1/6]),
78+
'beta0': np.array([0, -1, 1, 1]),
79+
'beta1': np.array([0, 2, 0, -2]),
80+
81+
'A0': np.array([[0.0, 0.0, 0.0, 0.0],
82+
[1/2, 0.0, 0.0, 0.0],
83+
[0.0, 1/2, 0.0, 0.0],
84+
[0.0, 0.0, 1.0, 0.0]]),
85+
86+
'B0': np.array([[0.0, 0.0, 0.0, 0.0],
87+
[0.0, 0.0, 0.0, 0.0],
88+
[0.0, 1.0, 0.0, 0.0],
89+
[0.0, 1.0, 0.0, 0.0]]),
90+
91+
'A1': np.array([[0.0, 0.0, 0.0, 0.0],
92+
[1/2, 0.0, 0.0, 0.0],
93+
[1/2, 0.0, 0.0, 0.0],
94+
[1/2, 0.0, 0.0, 0.0]]),
95+
96+
'B1': np.array([[0.0, 0.0, 0.0, 0.0],
97+
[0.0, 0.0, 0.0, 0.0],
98+
[0.0, 1/2, 0.0, 0.0],
99+
[0.0, -1/2, 0.0, 0.0]]),
100+
101+
'B2': np.array([[0.0, 0.0, 0.0, 0.0],
102+
[0.0, 0.0, 0.0, 0.0],
103+
[0.0, 1.0, 0.0, 0.0],
104+
[0.0, 0.0, 0.0, 0.0]]),
105+
}
106+
107+
def srk2_integrate(
108+
f: DynFunc,
109+
g_list: List[DynFunc],
110+
x0: npt.NDArray[np.float64],
111+
dt: float,
112+
tf: float,
113+
rng_seed: int,
114+
alpha: npt.NDArray[np.float64],
115+
beta0: npt.NDArray[np.float64],
116+
beta1: npt.NDArray[np.float64],
117+
A0: npt.NDArray[np.float64],
118+
B0: npt.NDArray[np.float64],
119+
A1: npt.NDArray[np.float64],
120+
B1: npt.NDArray[np.float64],
121+
B2: npt.NDArray[np.float64],
122+
):
123+
"""
124+
Generic s-stage SRK integrator for vector SDE:
125+
dX = f(t,X) dt + sum_k g_list[k](t,X) dW_k
126+
127+
Method described in Tang & Xiao.
128+
"""
129+
130+
def wrapped_f(full_state):
131+
t = full_state[0]
132+
x = full_state[1:]
133+
return np.concatenate([[1], f(t, x)])
134+
135+
wrapped_g_list = []
136+
for g in g_list:
137+
def wrapped_g(full_state, g=g):
138+
t = full_state[0]
139+
x = full_state[1:]
140+
return np.concatenate([[0], g(t, x)])
141+
wrapped_g_list.append(wrapped_g)
142+
143+
s = alpha.size
144+
n = x0.size
145+
m = len(g_list)
146+
147+
nsteps = int(np.floor(tf / dt))
148+
149+
x = np.zeros([nsteps+1, n+1], dtype=float)
150+
x[0,0] = 0
151+
x[0,1:] = x0
152+
153+
rng = svIntegrators.SRKRandomVariableGenerator()
154+
rng.setSeed(rng_seed)
155+
156+
for step in range(nsteps):
157+
t = x[step,0]
158+
X = x[step,:].copy()
159+
160+
# generate random variables
161+
rvs: svIntegrators.SRKRandomVariables = rng.generate(m, dt)
162+
Ik: List[List[float]] = rvs.Ik
163+
Ikl: List[List[float]] = rvs.Ikl
164+
xi: float = rvs.xi
165+
166+
# allocate stage arrays
167+
H0 = [X.copy() for _ in range(s)]
168+
Hk = [[X.copy() for _ in range(s)] for _ in range(m)]
169+
170+
# compute H0 stages
171+
for i in range(s):
172+
sumA = np.zeros(n+1)
173+
sumB = np.zeros(n+1)
174+
for j in range(s):
175+
sumA += A0[i, j] * wrapped_f(H0[j]) * dt
176+
for k in range(m):
177+
sumB += B0[i, j] * wrapped_g_list[k](Hk[k][j]) * Ik[k]
178+
H0[i] = X + sumA + sumB
179+
180+
# compute Hk stages
181+
for k in range(m):
182+
for i in range(s):
183+
sumA = np.zeros(n+1)
184+
sumB1 = np.zeros(n+1)
185+
sumB2 = np.zeros(n+1)
186+
for j in range(s):
187+
sumA += A1[i, j] * wrapped_f(H0[j]) * dt
188+
sumB1 += B1[i, j] * wrapped_g_list[k](Hk[k][j]) * xi
189+
for l in range(m):
190+
if l != k:
191+
sumB2 += B2[i, j] * wrapped_g_list[l](Hk[k][i]) * Ikl[k][l]
192+
Hk[k][i] = X + sumA + sumB1 + sumB2
193+
194+
# combine increments
195+
drift = np.zeros(n+1)
196+
for i in range(s):
197+
drift += alpha[i] * f(t, H0[i]) * dt
198+
199+
diffusion = np.zeros(n+1)
200+
for k in range(m):
201+
for i in range(s):
202+
diffusion += beta0[i] * wrapped_g_list[k](Hk[k][i]) * Ik[k]
203+
diffusion += beta1[i] * wrapped_g_list[k](Hk[k][i]) * Ikl[k][k]
204+
205+
x[step+1,:] = X + drift + diffusion
206+
207+
return x
208+
209+
class ExponentialSystem:
210+
"""A simple system with one state: dx/dt = x*t."""
211+
212+
x0 = np.array([1])
213+
214+
@staticmethod
215+
def f(t: float, x: npt.NDArray[np.float64]):
216+
return np.array([t*x[0]])
217+
218+
g = []
219+
220+
class Example1System:
221+
"""Example 1 dynamical system in:
222+
223+
Tang, X., Xiao, A. Efficient weak second-order stochastic Runge-Kutta methods
224+
for Itô stochastic differential equations. Bit Numer Math 57, 241-260 (2017).
225+
https://doi.org/10.1007/s10543-016-0618-9
226+
"""
227+
x0 = np.array([1, 1])
228+
229+
@staticmethod
230+
def f(t: float, x: npt.NDArray[np.float64]):
231+
y1, y2 = x
232+
return np.array([
233+
-273/512*y1,
234+
-1/160*y1 - (785/512 - np.sqrt(2)/8)*y2
235+
])
236+
237+
@staticmethod
238+
def g1(t: float, x: npt.NDArray[np.float64]):
239+
y1, y2 = x
240+
return np.array([
241+
1/4*y1,
242+
(1 - 2*np.sqrt(2)/8)/4*y2
243+
])
244+
245+
@staticmethod
246+
def g2(t: float, x: npt.NDArray[np.float64]):
247+
y1, y2 = x
248+
return np.array([
249+
1/16*y1,
250+
1/10*y1 + 1/16*y2
251+
])
252+
253+
g = [g1, g2]
254+
255+
def get_basilisk_sim(dt: float, x0: npt.NDArray[np.float64], f: DynFunc, g: List[DynFunc], seed: int):
256+
257+
# Declared inside, since StatefulSysModel may be undefined if not running with mujoco
258+
class GenericStochasticStateModel(StatefulSysModel.StatefulSysModel):
259+
260+
def __init__(self, x0: npt.NDArray[np.float64], f: DynFunc, g: List[DynFunc]):
261+
super().__init__()
262+
self.x0 = x0
263+
self.f = f
264+
self.g = g
265+
266+
def registerStates(self, registerer: StatefulSysModel.DynParamRegisterer):
267+
"""Called once during InitializeSimulation"""
268+
# We get one noise source per diffusion function g:
269+
m = len(self.g)
270+
n = self.x0.size
271+
272+
self.states: List[dynParamManager.StateData] = []
273+
for i in range(n):
274+
self.states.append( registerer.registerState(1, 1, f"y{i+1}") )
275+
self.states[-1].setNumNoiseSources(m)
276+
self.states[-1].setState([[self.x0[i]]])
277+
278+
# We want every noise source to be shared between states
279+
for k in range(m):
280+
registerer.registerSharedNoiseSource([
281+
(state, k)
282+
for state in self.states
283+
])
284+
285+
def UpdateState(self, CurrentSimNanos: int):
286+
"""Called at every integrator step"""
287+
m = len(self.g)
288+
n = self.x0.size
289+
290+
t = macros.NANO2SEC * CurrentSimNanos
291+
292+
# Collect current state into a numpy array
293+
x = np.array([state.getState()[0][0] for state in self.states])
294+
295+
# Compute f and store in the derivative of the states
296+
deriv = self.f(t, x)
297+
for i in range(n):
298+
self.states[i].setDerivative( [[deriv[i]]] )
299+
300+
# Compute g_k and store in the diffusion of the states
301+
for k in range(m):
302+
diff = self.g[k](t, x)
303+
for i in range(n):
304+
self.states[i].setDiffusion( [[diff[i]]], index=k )
305+
306+
# Create sim, process, and task
307+
scSim = SimulationBaseClass.SimBaseClass()
308+
dynProcess = scSim.CreateNewProcess("test")
309+
dynProcess.addTask(scSim.CreateNewTask("test", macros.sec2nano(dt)))
310+
311+
scene = mujoco.MJScene("<mujoco/>") # empty scene, no multi-body dynamics
312+
scSim.AddModelToTask("test", scene)
313+
314+
integratorObject = svIntegrators.svStochIntegratorW2Ito1(scene)
315+
scene.setIntegrator(integratorObject)
316+
317+
stateModel = GenericStochasticStateModel(x0, f, g)
318+
stateModel.ModelTag = "testModel"
319+
320+
scene.AddModelToDynamicsTask(stateModel)
321+
322+
scSim.InitializeSimulation()
323+
324+
return scSim, stateModel, integratorObject
325+
326+
@pytest.mark.skipif(not couldImportMujoco, reason="Compiled Basilisk without --mujoco")
327+
def test_deterministic():
328+
329+
dt = 1
330+
tf = 1
331+
seed = 10
332+
333+
system = ExponentialSystem
334+
335+
scSim, stateModel, integratorObject = get_basilisk_sim(dt, system.x0, system.f, system.g, seed)
336+
scSim.ConfigureStopTime( macros.sec2nano(tf) )
337+
scSim.ExecuteSimulation()
338+
339+
x1Basilisk = stateModel.states[0].getState()[0][0]
340+
341+
xPython = srk2_integrate(system.f, system.g, system.x0, dt, tf, seed, **W2_Ito1)
342+
x1Python = xPython[1,:]
343+
344+
x1expected = np.exp( tf**2 / 2 )
345+
346+
raise NotImplementedError("WIP")
347+
348+
if __name__ == "__main__":
349+
if True:
350+
test_deterministic()
351+
else:
352+
pytest.main([__file__])

0 commit comments

Comments
 (0)