import numpy as np
import copy
import time
import datetime
import json
from velocity_optimization.src.VelQP import VelQP
[docs]def online_qp(velqp: VelQP,
v_ini: float,
kappa: np.ndarray,
delta_s: np.ndarray,
P_max: np.array = None,
ax_max: np.array = None,
ay_max: np.array = None,
x0_v: np.ndarray = None,
v_max: np.ndarray = None,
v_end: float = None,
F_ini: float = None,
s_glob: float = None,
v_max_cstr: np.ndarray = None) -> tuple:
"""
Python version: 3.5
Created by: Thomas Herrmann (thomas.herrmann@tum.de)
Created on: 01.11.2019
Documentation: Creates an SQP that optimizes a velocity profile for a given path.
Inputs:
velqp: QP solver object used within the SQP
v_ini: initial velocity hard constraint [m/s]
kappa: kappa profile of given path [rad/m]
delta_s: discretization step length [m]
P_max: max. allowed power [kW]
ax_max: max. allowed longitudinal acceleration [m/s^2]
ay_max: max. allowed ongitudial acceleration [m/s^2]
x0_v: initial guess of optimal velocity [m/s]
v_max: max. should velocity (objective function) [m/s]
v_end: constrained end velocity in optimization horizon [m/s]
F_ini: initial force constraint [kN]
s_glob: global s coordinate of current vehicle position [m]
v_max_cstr: max. must velocity (hard constraint) [m/s]
Outputs:
v_op: optimized velocity using OSQP as QP solver [m/s]
s_t_op: optimized slack values [-]
qp_status: status of last QP within SQP [-]
"""
# --- Steplength reduction parameter for Armijo rule
beta = 2 / 4
# --- Initialization of logging variables
x0_v_log = None
x0_s_t_log = None
kappa_log = None
delta_s_log = None
ax_max_log = None
ay_max_log = None
v_ini_log = None
v_max_log = None
v_end_log = None
F_ini_log = None
####################################################################################################################
# --- Preparing input parameters for SQP
####################################################################################################################
# --- Emergency SQP
# upper bound for last velocity entry depending on current velocity
if velqp.sid == 'EmergSQP':
# --- Upper bound for last velocity point
v_end = 0.4
v_max = np.array([v_ini - (x + 1) * 4 for x in range(velqp.sqp_stgs['m'])])
# set all values below threshold to threshold
v_max[v_max < 0.0] = 0.0
# --- Assume linear velocity decrease to v_end from current velocity
x0_v = np.array(
[v_ini + x * (v_max[-1] - v_ini) / velqp.sqp_stgs['m'] for x in range(velqp.sqp_stgs['m'])])
# Initialize slack variables with zero values
x0_s_t = np.array([0] * velqp.n)
# Overwrite None
F_ini = 0
s_glob = 0
# --- /Emergency SQP
# --- Performance SQP
else:
x0_s_t = np.array([0] * velqp.n)
# --- /Performance SQP
# --- Make sure to always have a numeric velocity > 0.1
if v_ini < velqp.sym_sc_['vmin_mps_']:
v_ini = velqp.sym_sc_['vmin_mps_']
x0_v[0] = v_ini
# --- Initialization of optimization variables
# velocity [m/s]
v_op = np.zeros(velqp.sqp_stgs['m'], )
# slack variable on tires
s_t_op = np.zeros(velqp.n, )
qp_iter = 0
qp_status = 0
# SQP mean-error
err = np.inf
# SQP infinity-error
err_inf = np.inf
# SQP-counter
n_sqp = 0
# SQP-timer
dt = 0
# time limit of SQP loop [s]
dt_lim = velqp.sqp_stgs['t_sqp_max']
# --- Save inputs to SQP here for logging purpose
x0_v_log = copy.deepcopy(x0_v.tolist())
x0_s_t_log = copy.deepcopy(x0_s_t.tolist())
kappa_log = kappa.tolist()
delta_s_log = delta_s.tolist()
v_ini_log = v_ini
try:
v_max_log = copy.deepcopy(v_max.tolist())
except AttributeError:
v_max_log = copy.deepcopy(v_max)
v_end_log = v_end
F_ini_log = F_ini
if isinstance(ax_max, np.ndarray) and isinstance(ay_max, np.ndarray):
ax_max_log = ax_max.tolist()
ay_max_log = ay_max.tolist()
else:
ax_max_log = velqp.sym_sc_['axmax_mps2_']
ay_max_log = velqp.sym_sc_['aymax_mps2_']
if isinstance(P_max, np.ndarray):
Pmax_log = P_max.tolist()
else:
Pmax_log = velqp.sym_sc_['Pmax_kW_']
# --- Start SQP-loop
t_start = time.time()
while (err > velqp.err or err_inf > velqp.err_inf) and n_sqp < velqp.sqp_stgs['n_sqp_max'] and dt < dt_lim:
# --- Update parameters of QP
if len(x0_v) is not int(velqp.sqp_stgs['m']):
print("Error in x0-length in ", velqp.sid)
print(x0_v)
if len(v_max) is not int(velqp.sqp_stgs['m']):
print("Error in v_max-length in ", velqp.sid)
print(v_max)
# --- Update QP matrices
velqp.osqp_update_online(x0_v=x0_v,
x0_s_t=x0_s_t,
v_ini=v_ini,
v_max=v_max,
v_end=v_end,
F_ini=F_ini,
kappa=kappa,
delta_s=delta_s,
P_max=P_max,
ax_max=ax_max,
ay_max=ay_max,
vmax_cstr=v_max_cstr)
# --- Solve the QP
sol, qp_iter, qp_status = velqp.osqp_solve()
# --- Check primal infeasibility of problem and return v = 0
if qp_status == -3:
break
# --- Store solution from previous SQP-iteration
o_old = np.append(v_op, x0_s_t)
# --- Store errors from previous SQP-iteration
err_old = err
err_inf_old = err_inf
try:
############################################################################################################
# --- Armijo: decrease steplength alpha if SQP-error increases between iterations
############################################################################################################
k = 0 # counter for Armijo-loop
while True:
# --- Choose a steplength
alpha = beta ** k # steplength e {1, beta, beta^2, ...}
# --- Restructure solution from QP-solution
# Add leading "0" as ini-velocity must be kept constant as given
v_op = alpha * np.insert(sol[0:velqp.m - 1], 0, 0) + x0_v
s_t_op = alpha * sol[velqp.m - 1:] + x0_s_t
# --- Calculate SQP iteration error
o = np.append(v_op, s_t_op)
err = np.sqrt(np.matmul(o - o_old, o - o_old)) / o.shape[0]
err_inf = np.max(np.abs(o - o_old))
# --- Break Armijo-loop in case a suitable steplength alpha was found
if err < err_old and err_inf < err_inf_old:
break
# --- Increase Armijo-loop's counter and restart loop
else:
k += 1
if velqp.sqp_stgs['b_print_sqp_alpha']:
print(velqp.sid + " | alpha: " + str(alpha))
############################################################################################################
# --- Postprocessing Armijo-loop: Create new operating point for optimization variables
############################################################################################################
# --- Create new operating-point for velocity variables
x0_v = v_op
# --- Create new operating-point for tire slack variables
x0_s_t = s_t_op
except TypeError:
# --- Do different initialization till n_sqp_max is reached
if velqp.sid == 'EmergSQP':
x0_v = (v_ini - 0.05) * np.ones((velqp.m, ))
v_op = np.zeros(velqp.sqp_stgs['m'], )
x0_s_t = np.zeros(velqp.n, )
s_t_op = np.zeros(velqp.n, )
print("No solution for emerg. line found. Retrying with different initialization ...")
# Reset SQP-counter
n_sqp = 0
if not velqp.sqp_stgs['b_online_mode']:
print('Optimized velocity profile: ', v_op[0:velqp.sqp_stgs['m']])
if velqp.sqp_stgs['obj_func'] == 'slacks':
print('Slacks on velocity: ', v_op[velqp.sqp_stgs['m']:2 * velqp.sqp_stgs['m']])
if velqp.sqp_stgs['b_print_sqp_err']:
print(velqp.sid + " | SQP err: " + str(err))
print(velqp.sid + " | SQP inf.-err: " + str(err_inf))
################################################################################################################
# --- Check termination criteria for SQP-loop
################################################################################################################
# increase SQP-iteration counter
n_sqp += 1
if n_sqp >= velqp.sqp_stgs['n_sqp_max']:
print(velqp.sid + " reached max. SQP iteration-number!")
# update timer
dt = time.time() - t_start
if dt >= dt_lim:
print(velqp.sid + " took too long!")
if velqp.sqp_stgs['b_print_SQP_runtime']:
print(velqp.sid + " | SQP time [ms]: " + str(dt * 1000))
# Only write to log-file after SQP-iterations
if velqp.sid == 'PerfSQP' and velqp.logger_perf is not None:
velqp.logger_perf.debug('%s;%s;%s;%s;%s;%s;%s;%s;%s;%s;%s;%s;%s;%s',
str(datetime.datetime.now().time()),
s_glob,
json.dumps(x0_v_log),
json.dumps(x0_s_t_log),
json.dumps(kappa_log),
json.dumps(delta_s_log),
v_ini_log,
json.dumps(v_max_log),
v_end_log,
F_ini_log,
Pmax_log,
json.dumps(qp_iter),
json.dumps(qp_status),
json.dumps(dt * 1000))
velqp.logger_perf.debug('%s', v_op.tolist())
velqp.logger_perf.debug('%s', s_t_op.tolist())
velqp.logger_perf.debug('%s;%s', ax_max_log, ay_max_log)
elif velqp.sid == 'EmergSQP' and velqp.logger_emerg is not None:
velqp.logger_emerg.debug('%s;%s;%s;%s;%s;%s;%s;%s;%s;%s;%s;%s;%s;%s',
str(datetime.datetime.now().time()),
s_glob,
json.dumps(x0_v_log),
json.dumps(x0_s_t_log),
json.dumps(kappa_log),
json.dumps(delta_s_log),
v_ini_log,
json.dumps(v_max_log),
v_end_log,
F_ini_log,
Pmax_log,
json.dumps(qp_iter),
json.dumps(qp_status),
json.dumps(dt * 1000))
velqp.logger_emerg.debug('%s', v_op.tolist())
velqp.logger_emerg.debug('%s', s_t_op.tolist())
velqp.logger_emerg.debug('%s;%s', ax_max_log, ay_max_log)
if velqp.sqp_stgs['b_trajectory_check']:
ax_norm = np.abs(kappa[0:velqp.sqp_stgs['m'] - 1] * v_op[0:velqp.sqp_stgs['m'] - 1] ** 2) / \
velqp.sym_sc_['axmax_mps2_']
ay_norm = np.abs(
(v_op[1:velqp.sqp_stgs['m']] ** 2 - v_op[0:velqp.sqp_stgs['m'] - 1] ** 2) / (2 * np.array(delta_s))
+ (velqp.sym_sc_['c_res_'] * v_op[0:velqp.sqp_stgs['m'] - 1] ** 2) / (1000 * velqp.sym_sc_['m_t_'])) / \
velqp.sym_sc_['aymax_mps2_']
perf_check = (ax_norm + ay_norm) > 1
if perf_check[:-1].any():
print(ax_norm + ay_norm)
print('*** SQP: Trajectory not OK! ', velqp.sid, ' ***')
if velqp.sqp_stgs['b_print_n_sqp']:
print(velqp.sid + ' | nSQP ' + str(n_sqp))
if velqp.sqp_stgs['b_print_s_v_val']:
print(velqp.sid + ' | s_v_tires ' + str(x0_s_t))
if velqp.sqp_stgs['b_print_J']:
print("(v - v_max) ** 2", np.sum((v_op - v_max) ** 2))
print("Tre. slacks", velqp.sym_sc_['s_tre_w_'] * np.sum(s_t_op ** 2))
return v_op, s_t_op, qp_status