from typing import Tuple
import numpy as np
from numba import njit
from CyRK.nb.dop_coefficients import (
A as A_DOP, B as B_DOP, C as C_DOP, E3 as E3_DOP, E5 as E5_DOP, D as D_DOP,
N_STAGES as N_STAGES_DOP, N_STAGES_EXTENDED as N_STAGES_EXTENDED_DOP, ORDER as ORDER_DOP,
ERROR_ESTIMATOR_ORDER as ERROR_ESTIMATOR_ORDER_DOP)
from CyRK.nb.nb_storage import WrapNBRKResult
# Optimizations
EMPTY_ARR = np.empty(0, dtype=np.float64)
EPS = np.finfo(np.float64).eps
EPS_10 = 10. * EPS
EPS_100 = 100. * EPS
INF = np.inf
# Diffeq Solver Settings
# Multiply steps computed from asymptotic behaviour of errors by this.
SAFETY = 0.9
MIN_FACTOR = 0.2 # Minimum allowed decrease in a step size.
MAX_FACTOR = 10. # Maximum allowed increase in a step size.
RK23_order = 3
RK23_error_estimator_order = 2
RK23_n_stages = 3
RK23_C = np.array([0, 1 / 2, 3 / 4], order='C', dtype=np.float64)
RK23_A = np.array(
[
[0, 0, 0],
[1 / 2, 0, 0],
[0, 3 / 4, 0]
], order='C', dtype=np.float64
)
RK23_B = np.array([2 / 9, 1 / 3, 4 / 9], order='C', dtype=np.float64)
RK23_E = np.array([5 / 72, -1 / 12, -1 / 9, 1 / 8], order='C', dtype=np.float64)
RK23_P = np.array(
[[1, -4 / 3, 5 / 9],
[0, 1, -2 / 3],
[0, 4 / 3, -8 / 9],
[0, -1, 1]], order='C', dtype=np.float64
)
RK45_order = 5
RK45_error_estimator_order = 4
RK45_n_stages = 6
RK45_C = np.array([0, 1 / 5, 3 / 10, 4 / 5, 8 / 9, 1], order='C', dtype=np.float64)
RK45_A = np.array(
[
[0, 0, 0, 0, 0],
[1 / 5, 0, 0, 0, 0],
[3 / 40, 9 / 40, 0, 0, 0],
[44 / 45, -56 / 15, 32 / 9, 0, 0],
[19372 / 6561, -25360 / 2187, 64448 / 6561, -212 / 729, 0],
[9017 / 3168, -355 / 33, 46732 / 5247, 49 / 176, -5103 / 18656]
], order='C', dtype=np.float64
)
RK45_B = np.array([35 / 384, 0, 500 / 1113, 125 / 192, -2187 / 6784, 11 / 84], order='C', dtype=np.float64)
RK45_E = np.array(
[-71 / 57600, 0, 71 / 16695, -71 / 1920, 17253 / 339200, -22 / 525,
1 / 40], order='C', dtype=np.float64
)
RK45_P = np.array(
[
[1, -8048581381 / 2820520608, 8663915743 / 2820520608,
-12715105075 / 11282082432],
[0, 0, 0, 0],
[0, 131558114200 / 32700410799, -68118460800 / 10900136933,
87487479700 / 32700410799],
[0, -1754552775 / 470086768, 14199869525 / 1410260304,
-10690763975 / 1880347072],
[0, 127303824393 / 49829197408, -318862633887 / 49829197408,
701980252875 / 199316789632],
[0, -282668133 / 205662961, 2019193451 / 616988883, -1453857185 / 822651844],
[0, 40617522 / 29380423, -110615467 / 29380423, 69997945 / 29380423]], order='C', dtype=np.float64
)
@njit(cache=False)
def _norm(x):
return np.linalg.norm(x) / np.sqrt(x.size)
[docs]
@njit(cache=False, fastmath=False)
def nbsolve_ivp(
diffeq: callable,
t_span: Tuple[float, float],
y0: np.ndarray,
args: tuple = tuple(),
rtol: float = 1.e-3,
atol: float = 1.e-6,
rtols: np.ndarray = EMPTY_ARR,
atols: np.ndarray = EMPTY_ARR,
max_step: float = np.inf,
first_step: float = None,
rk_method: int = 1,
t_eval: np.ndarray = EMPTY_ARR,
capture_extra: bool = False,
interpolate_extra: bool = False,
max_num_steps: int = 0,
warnings: bool = True
):
""" A Numba-safe Runge-Kutta Integrator based on Scipy's solve_ivp RK integrator [1]_, [2]_.
Parameters
----------
diffeq : callable
An njit-compiled function that defines the derivatives of the problem.
t_span : Tuple[float, float]
A tuple of the beginning and end of the integration domain's dependent variables.
y0 : np.ndarray
1D array of the initial values of the problem at t_span[0]
args : tuple = tuple()
Any additional arguments that are passed to dffeq.
rtol : float = 1.e-3
Integration relative tolerance used to determine optimal step size.
atol : float = 1.e-6
Integration absolute tolerance used to determine optimal step size.
rtols : np.ndarray = EMPTY_ARR
Array of relative tolerances (size of y0).
atols : np.ndarray = EMPTY_ARR
Array of absolute tolerances (size of y0).
max_step : float = np.inf
Maximum allowed step size.
first_step : float = None
Initial step size. If `None`, then the function will attempt to determine an appropriate initial step.
rk_method : int = 1
The type of RK method used for integration
0 = RK23
1 = RK45
2 = DOP853
t_eval : np.ndarray = None
If provided, then the function will interpolate the integration results to provide them at the
requested t-steps.
capture_extra : bool = False
If True, then extra output will be captured from the differential equation.
See CyRK's Documentation/Extra Output.md for more information
interpolate_extra : bool = False
If True, then extra output will be interpolated (along with y) at t_eval. Otherwise, y will be interpolated
and then differential equation will be called to find the output at each t in t_eval.
max_num_steps : int = 0
Maximum number of steps integrator is allowed to take.
If set to 0 (the default) then an infinite number of steps are allowed.
warnings : bool = True
If True, then warnings will be raised which can slow down integration.
References
----------
.. [1] E. Hairer, S. P. Norsett G. Wanner, "Solving Ordinary Differential
Equations I: Nonstiff Problems", Sec. II.
.. [2] `Page with original Fortran code of DOP853
<http://www.unige.ch/~hairer/software.html>`_.
Returns
-------
time_domain : np.ndarray
The final time domain. This is equal to t_eval if it was provided.
y_results : np.ndarray
The solution of the differential equation provided for each time_result.
If `capture_extra` was set to True then this will output both y and any extra parameters calculated by the
differential equation. The format of this output will look like::
y_results[0:y_size, :] = ... # Actual y-results calculated by the diffeq solver
y_results[y_size:extra_size, :] = ... # Extra outputs captured alongside y during integration
success : bool
Final integration success flag.
message : str
Any integration messages, useful if success=False.
"""
if warnings:
print("DEPRECATION WARNING! This version of `nbsolve_ivp` will be deprecated in a future release in favor of the current `nbsolve2_ivp` (names will be swapped). These two functions are quite different so please review the documentation at https://cyrk.readthedocs.io/en/latest/Numba.html before upgrading.")
print("IMPORTANT!! Printing this warning is slowing down your integration! Disable it by setting `warnings=False` in `nbsolve_ivp`.")
# Clean up and interpret inputs
t_start = t_span[0]
t_end = t_span[1]
direction = np.sign(t_end - t_start) if t_end != t_start else 1
direction_inf = direction * np.inf
y0 = np.asarray(y0)
y_size = y0.size
y_size_sqrt = np.sqrt(y_size)
dtype = y0.dtype
t_eval_size = t_eval.size
run_interpolation = t_eval_size > 0
store_extras_during_integration = capture_extra
if run_interpolation and not interpolate_extra:
# If y is eventually interpolated but the extra outputs are not being interpolated, then there is
# no point in storing the values during the integration. Turn off this functionality to save
# on computation
store_extras_during_integration = False
# If extra output is true then the output of the diffeq will be larger than the size of y0.
# determine that extra size by calling the diffeq and checking its size.
extra_start = y_size
extra_size = 0
total_size = y_size
if capture_extra:
output_ = np.asarray(diffeq(t_start, y0, *args), dtype=dtype)
total_size = output_.size
extra_size = total_size - y_size
extra_start = y_size
# Extract the extra output from the function output.
y0_plus_extra = np.empty(total_size, dtype=dtype)
for i in range(total_size):
if i < extra_start:
# Pull from y0
y0_plus_extra[i] = y0[i]
else:
# Pull from extra output
y0_plus_extra[i] = output_[i]
if store_extras_during_integration:
y0_to_store = y0_plus_extra
store_loop_size = total_size
else:
y0_to_store = y0
store_loop_size = y_size
else:
y0_to_store = y0
store_loop_size = y_size
extra_result = np.empty(extra_size, dtype=dtype)
if store_extras_during_integration:
y_result_store = np.empty(total_size, dtype=dtype)
else:
y_result_store = np.empty(y_size, dtype=dtype)
# Containers to store results during integration
time_domain_list = [t_start]
y_result_list = [np.copy(y0_to_store)]
# Integrator Status Codes
status = 0
message = "Integration is/was ongoing (perhaps it was interrupted?)."
# Determine RK constants
if rk_method == 0:
# RK23 Method
rk_order = RK23_order
error_order = RK23_error_estimator_order
rk_n_stages = RK23_n_stages
rk_n_stages_plus1 = rk_n_stages + 1
C = RK23_C
A = RK23_A
B = RK23_B
E = np.asarray(RK23_E, dtype=dtype)
# TODO: Used in dense output calculation. Not needed until that is implemented.
# P = RK23_P
# Set these unused variables to E to avoid variable not set check
E3 = E
E5 = E
# Initialize RK-K variable
K = np.empty((rk_n_stages_plus1, y_size), dtype=dtype)
elif rk_method == 1:
# RK45 Method
rk_order = RK45_order
error_order = RK45_error_estimator_order
rk_n_stages = RK45_n_stages
rk_n_stages_plus1 = rk_n_stages + 1
C = RK45_C
A = RK45_A
B = RK45_B
E = np.asarray(RK45_E, dtype=dtype)
# TODO: Used in dense output calculation. Not needed until that is implemented.
# P = RK45_P
# Set these unused variables to E to avoid variable not set check
E3 = E
E5 = E
# Initialize RK-K variable
K = np.empty((rk_n_stages_plus1, y_size), dtype=dtype)
elif rk_method == 2:
# DOP853
rk_order = ORDER_DOP
error_order = ERROR_ESTIMATOR_ORDER_DOP
rk_n_stages = N_STAGES_DOP
rk_n_stages_plus1 = rk_n_stages + 1
A = A_DOP[:rk_n_stages, :rk_n_stages]
B = B_DOP
C = C_DOP[:rk_n_stages]
E3 = np.asarray(E3_DOP, dtype=dtype)
E5 = np.asarray(E5_DOP, dtype=dtype)
# TODO: Used in dense output calculation. Not needed until that is implemented.
# D = np.asarray(D_DOP, dtype=dtype)
# A_EXTRA = np.asarray(A_DOP[rk_n_stages + 1:], dtype=dtype)
# C_EXTRA = np.asarray(C_DOP[rk_n_stages + 1:], dtype=dtype)
# Set these unused variables to E to avoid variable not set check
E = E3
# Initialize RK-K variable
K_extended = np.empty((N_STAGES_EXTENDED_DOP, y_size), dtype=dtype)
K = np.ascontiguousarray(K_extended[:rk_n_stages_plus1, :])
else:
status = -8
message = "Attribute error."
raise AttributeError(
'Unexpected rk_method provided. Currently supported versions are:\n'
'\t0 = RK23\n'
'\t1 = RK34\n'
'\t2 = DOP853')
# Recast some constants into the correct dtype, so they can be used with y0.
A = np.asarray(A, dtype=dtype)
B = np.asarray(B, dtype=dtype)
# Determine some RK-related constants
len_C = len(C)
A_at_10 = A[1, 0]
error_expo = 1. / (error_order + 1.)
# Setup tolerances
rtol_array = np.empty(y_size, dtype=np.float64)
rtol_size = rtols.size
if rtol_size > 0:
if rtol_size != y_size:
raise AttributeError('rtols must be the same size as y0.')
for i in range(y_size):
rtol_ = rtols[i]
if rtol_ < (100 * EPS):
rtol_ = (100 * EPS)
rtol_array[i] = rtol_
else:
if rtol < (100 * EPS):
rtol = (100 * EPS)
for i in range(y_size):
rtol_array[i] = rtol
atol_array = np.empty(y_size, dtype=np.float64)
atol_size = atols.size
if atol_size > 0:
if atol_size != y_size:
raise AttributeError('atols must be the same size as y0.')
for i in range(y_size):
atol_array[i] = atols[i]
else:
for i in range(y_size):
atol_array[i] = atol
# Determine maximum number of steps
if max_num_steps == 0:
use_max_steps = False
elif max_num_steps < 0:
raise AttributeError('Negative number of max steps provided.')
else:
use_max_steps = True
# Initialize variables for start of integration
t_old = t_start
t_new = t_start
y_new = np.empty_like(y0)
y_old = np.empty_like(y0)
dydt_old = np.empty_like(y0)
dydt_new = np.empty_like(y0)
for i in range(y_size):
y0_i = y0[i]
y_new[i] = y0_i
y_old[i] = y0_i
output = np.asarray(diffeq(t_new, y_new, *args), dtype=dtype)
for i in range(y_size):
dydt_old[i] = output[i]
# Find first step size
first_step_found = False
if first_step is not None:
step_size = max_step
if first_step < 0.:
status = -8
message = "Attribute error."
raise AttributeError('Error in user-provided step size: Step size must be a positive number.')
elif first_step > np.abs(t_end - t_start):
status = -8
message = "Attribute error."
raise AttributeError('Error in user-provided step size: Step size can not exceed bounds.')
elif first_step != 0.:
step_size = first_step
first_step_found = True
if not first_step_found:
# Select an initial step size based on the differential equation.
# .. [1] E. Hairer, S. P. Norsett G. Wanner, "Solving Ordinary Differential
# Equations I: Nonstiff Problems", Sec. II.4.
if y_size == 0:
step_size = INF
else:
# Take the norm of d0 and d1
d0 = 0.
d1 = 0.
for i in range(y_size):
scale = atol_array[i] + np.abs(y_old[i]) * rtol_array[i]
d0_abs = np.abs(y_old[i] / scale)
d1_abs = np.abs(dydt_old[i] / scale)
d0 += (d0_abs * d0_abs)
d1 += (d1_abs * d1_abs)
d0 = np.sqrt(d0) / y_size_sqrt
d1 = np.sqrt(d1) / y_size_sqrt
if d0 < 1.e-5 or d1 < 1.e-5:
h0 = 1.e-6
else:
h0 = 0.01 * d0 / d1
y1 = y_old + h0 * direction * dydt_old
t1 = t_old + h0 * direction
# Use the differential equation to estimate the first step size
diffeq_output = np.asarray(diffeq(t1, y1, *args), dtype=dtype)
for i in range(y_size):
dydt_new[i] = diffeq_output[i]
d2 = 0.
for i in range(y_size):
scale = atol_array[i] + np.abs(y_old[i]) * rtol_array[i]
d2_abs = np.abs((dydt_new[i] - dydt_old[i]) / scale)
d2 += (d2_abs * d2_abs)
d2 = np.sqrt(d2) / (h0 * y_size_sqrt)
if d1 <= 1.e-15 and d2 <= 1.e-15:
h1 = max(1.e-6, h0 * 1.e-3)
else:
h1 = (0.01 / max(d1, d2))**error_expo
next_after = 10. * abs(np.nextafter(t_old, direction * np.inf) - t_old)
step_size = max(next_after, min(100. * h0, h1))
# Main integration loop
if y_size == 0:
status = -6
message = "Integration never started: y-size is zero."
# # Time Loop
len_t = 1 # Already one time step due to initial conditions.
while status == 0:
if t_new == t_end:
t_old = t_end
t_new = t_end
status = 1
break
if use_max_steps:
if len_t > max_num_steps:
status = -7
message = "Maximum number of steps (set by user) exceeded during integration."
break
# Run RK integration step
# Determine step size based on previous loop
# Find minimum step size based on the value of t (less floating point numbers between numbers when t is large)
next_after = 10. * abs(np.nextafter(t_old, direction * np.inf) - t_old)
min_step = next_after
# Look for over/undershoots in previous step size
if step_size > max_step:
step_size = max_step
elif step_size < min_step:
step_size = min_step
# Determine new step size
step_accepted = False
step_rejected = False
step_error = False
# # Step Loop
while not step_accepted:
if step_size < min_step:
step_error = True
break
# Move time forward for this particular step size
step = step_size * direction
t_new = t_old + step
# Check that we are not at the end of integration with that move
if direction * (t_new - t_end) > 0:
t_new = t_end
# Correct the step if we were at the end of integration
step = t_new - t_old
step_size = np.abs(step)
# Calculate derivative using RK method
# Dot Product (K, a) * step
for s in range(1, len_C):
time_ = t_old + C[s] * step
# Dot Product (K, a) * step
if s == 1:
for i in range(y_size):
# Set the first column of K
temp_double_numeric = dydt_old[i]
K[0, i] = temp_double_numeric
# Calculate y_new for s==1
y_new[i] = y_old[i] + (temp_double_numeric * A_at_10 * step)
else:
for j in range(s):
A_at_sj = A[s, j] * step
for i in range(y_size):
if j == 0:
# Initialize
y_new[i] = y_old[i]
y_new[i] += K[j, i] * A_at_sj
# Update K with a new result from the differential equationC
diffeq_output = np.asarray(diffeq(time_, y_new, *args), dtype=dtype)
for i in range(y_size):
K[s, i] = diffeq_output[i]
# Dot Product (K, B) * step
for j in range(rk_n_stages):
temp = B[j] * step
# We do not use rk_n_stages_plus1 here because we are chopping off the last row of K to match
# the shape of B.
for i in range(y_size):
if j == 0:
# Initialize
y_new[i] = y_old[i]
y_new[i] += K[j, i] * temp
# Find final dydt for this timestep
diffeq_output = np.asarray(diffeq(t_new, y_new, *args), dtype=dtype)
for i in range(store_loop_size):
if i < extra_start:
# Set diffeq results
dydt_new[i] = diffeq_output[i]
else:
# Set extra results
extra_result[i - extra_start] = diffeq_output[i]
# Estimate error to change the step size for next step
if rk_method == 2:
# DOP853 error estimation
# Dot Product (K, E5) / Scale and (K, E3) / scale
error_norm3 = 0.
error_norm5 = 0.
for i in range(y_size):
# Check how well this step performed
scale = atol_array[i] + np.maximum(np.abs(y_old[i]), np.abs(y_new[i])) * rtol_array[i]
# Set last array of K equal to dydt
K[rk_n_stages, i] = dydt_new[i]
for j in range(rk_n_stages_plus1):
if j == 0:
# Initialize
error_dot_1 = 0.
error_dot_2 = 0.
temp = K[j, i]
error_dot_1 += temp * E3[j]
error_dot_2 += temp * E5[j]
error_norm3_abs = np.abs(error_dot_1) / scale
error_norm5_abs = np.abs(error_dot_2) / scale
error_norm3 += (error_norm3_abs * error_norm3_abs)
error_norm5 += (error_norm5_abs * error_norm5_abs)
# Check if errors are zero
if (error_norm5 == 0.) and (error_norm3 == 0.):
error_norm = 0.
else:
error_denom = error_norm5 + 0.01 * error_norm3
error_norm = step_size * error_norm5 / np.sqrt(error_denom * y_size)
else:
# Calculate Error for RK23 and RK45
error_norm = 0.
# Dot Product (K, E) * step / scale
for i in range(y_size):
# Check how well this step performed.
scale = atol_array[i] + max(np.abs(y_old[i]), np.abs(y_new[i])) * rtol_array[i]
# Set last array of K equal to dydt
K[rk_n_stages, i] = dydt_new[i]
for j in range(rk_n_stages_plus1):
if j == 0:
# Initialize
error_dot_1 = 0.
error_dot_1 += K[j, i] * E[j]
error_norm_abs = np.abs(error_dot_1)
error_norm_abs *= (step / scale)
error_norm += (error_norm_abs * error_norm_abs)
error_norm = np.sqrt(error_norm) / y_size_sqrt
if error_norm < 1.:
# The error is low! Let's update this step for the next time loop
if error_norm == 0.:
step_factor = MAX_FACTOR
else:
step_factor = min(
MAX_FACTOR,
SAFETY * error_norm**-error_expo
)
if step_rejected:
# There were problems with this step size on the previous step loop. Make sure factor does
# not exasperate them.
step_factor = min(step_factor, 1.)
step_size = step_size * step_factor
step_accepted = True
else:
step_size = step_size * max(
MIN_FACTOR,
SAFETY * error_norm**-error_expo
)
step_rejected = True
if step_error:
# Issue with step convergence
status = -1
message = "Error in step size calculation:\n\tRequired step size is less than spacing between numbers."
break
elif not step_accepted:
# Issue with step convergence
status = -7
message = "Error in step size calculation:\n\tError in step size acceptance."
break
# End of step loop. Update the _now variables
t_old = t_new
for i in range(y_size):
y_old[i] = y_new[i]
dydt_old[i] = dydt_new[i]
# Save data
# If there is extra outputs then we need to store those at this timestep as well.
for i in range(store_loop_size):
if i < extra_start:
# Pull from y result
y_result_store[i] = y_new[i]
else:
# Pull from extra
y_result_store[i] = extra_result[i - extra_start]
time_domain_list.append(t_new)
y_result_list.append(np.copy(y_result_store))
len_t += 1
# To match the format that scipy follows, we will take the transpose of y.
time_domain = np.empty(len_t, dtype=np.float64)
y_results = np.empty((store_loop_size, len_t), dtype=dtype)
for t_i in range(len_t):
time_domain[t_i] = time_domain_list[t_i]
y_results_list_at_t = y_result_list[t_i]
for y_i in range(store_loop_size):
y_results[y_i, t_i] = y_results_list_at_t[y_i]
success = False
if status == 1:
success = True
message = "Integration completed without issue."
if t_eval_size > 0:
old_status = status
status = 2
# User only wants data at specific points.
# The current version of this function has not implemented sicpy's dense output, so we must use an interpolation.
t_eval = np.asarray(t_eval, dtype=np.float64)
y_results_reduced = np.empty((total_size, t_eval.size), dtype=dtype)
# np.interp only works on 1D arrays, so we have to loop through each of the variables and call for each y:
for i in range(y_size):
y_results_reduced[i, :] = np.interp(t_eval, time_domain, y_results[i, :])
if capture_extra:
# Right now if there is any extra output then it is stored at each time step used in the RK loop.
# We have to make a choice on what to output do we, like we do with y, interpolate all of those extras?
# or do we use the interpolation on y to find new values.
# The latter method is more computationally expensive (recalls the diffeq for each y) but is more accurate.
if interpolate_extra:
# Continue the interpolation for the extra values.
for i in range(extra_size):
y_results_reduced[extra_start + i, :] = \
np.interp(t_eval, time_domain, y_results[extra_start + i, :])
else:
# Use y and t to recalculate the extra outputs
y_ = np.empty(y_size, dtype=dtype)
for t_i in range(t_eval_size):
t_ = t_eval[t_i]
for y_i in range(y_size):
y_[y_i] = y_results_reduced[y_i, t_i]
diffeq_output = np.asarray(diffeq(t_, y_, *args), dtype=dtype)
for i in range(extra_size):
y_results_reduced[extra_start + i, t_i] = diffeq_output[extra_start + i]
y_results = y_results_reduced
time_domain = t_eval
status = old_status
# Install result into solution named tuple
nbrk_result = WrapNBRKResult(success, message, time_domain.size, total_size, y0.size, status, y_results, time_domain)
return nbrk_result