Source code for velocity_optimization.opt_postproc.vis.VisBenchmarkLogs

import csv
import json
import numpy as np
import datetime
import time
from matplotlib import pyplot as plt
import linecache
from velocity_optimization.src.VelQP import VelQP
from velocity_optimization.opt_postproc.src.VOptIPOPT import VOptIPOPT
from velocity_optimization.opt_postproc.src.VOptQPOASES import VOpt_qpOASES
from velocity_optimization.opt_postproc.src.online_qp_postproc import online_qp_postproc
from velocity_optimization.opt_postproc.src.CalcObjective import CalcObjective
from velocity_optimization.opt_postproc.vis.VisGUI import VisVP_Logs_GUI
from velocity_optimization.opt_postproc.vis.VisObjectiveStatus import VisVP_ObjStatus
from velocity_optimization.opt_postproc.vis.VisRuntime import VisVP_Runtime
from velocity_optimization.opt_postproc.vis.VisGlobalVals import VisVP_GlobalVals
try:
    import tikzplotlib
except ImportError:
    print('Warning: No module tikzplotlib found. Not necessary on car but for development.')

# Font sizes
SMALL_SIZE = 14
MEDIUM_SIZE = 18
BIGGER_SIZE = 22

# Line Width
LW = 2


[docs]class VisVP_Logs: __slots__ = ('csv_name', 'csv_name_ltpl', 'vis_options', 'sol_options', 'sol_dict', 'm', 'sid', 'log_lines', 'row_count', 'calc_objective', 'runtimes', 'vis_gui', 'velqp', 'velqp_bench', 'vp_ipopt', 'vp_qpOASES', 'b_vis_triggered', 'dt_ipopt_arr', 'dt_sqp_qpoases_arr', 'P_max') def __init__(self, csv_name: str, csv_name_ltpl: str, params_path: str, input_path: str, vis_options: dict, sol_options: dict, m: int, sid: str = 'PerfSQP', log_lines: int = 4): """ Python version: 3.5 Created by: Thomas Herrmann (thomas.herrmann@tum.de) Created on: 01.02.2020 Modified by Tobias Klotz Documentation: Class doing the main handling of the visualization of the SQP velocity planner logging data. Inputs: csv_name: log data path & file csv_name_ltpl: raw lof file of ltpl module params_path: absolute path to folder containing config file .ini input_path: absolute path to folder containing variable vehicle and track information vis_options: user specified visualization options of the debugging tool m: number of velocity optimization variables sid: ID of the velocity planner 'PerfSQP' or 'EmergSQP' log_lines: number of lines in log files spanning one cohesive data block """ self.csv_name = csv_name self.csv_name_ltpl = csv_name_ltpl self.vis_options = vis_options self.sol_options = sol_options self.m = m self.sid = sid self.log_lines = log_lines with open(csv_name) as csvfile: self.row_count = sum(1 for row in csv.reader(csvfile, delimiter=';', lineterminator='\n')) # --- Create objective calculation object self.calc_objective = CalcObjective(csv_name=csv_name, log_lines=log_lines, sid=sid, params_path=params_path) self.calc_objective.calc_objective() # --- Create runtime histogram object self.runtimes = VisVP_Runtime() # --- Create visualization GUI object self.vis_gui = VisVP_Logs_GUI(vis_handler=self, m=m, vis_options=vis_options, params_path=params_path, sol_options=sol_options) # --- Create OSQP solver object self.velqp = VelQP(m=m, sid=sid, params_path=params_path, input_path=input_path) self.velqp_bench = VelQP(m=m, sid=sid, params_path=params_path, input_path=input_path) # --- Create Solver Objects for key, value in self.sol_options.items(): # --- Create IPOPT solver object if self.sol_options[key]['Solver'] == "IPOPT": self.sol_options[key].update({'Create_Solver': VOptIPOPT(m=m, sid=sid, params_path=params_path, b_warm=False, vis_options=vis_options, sol_dict=sol_options, key=key)}) # --- Create qpOASES solver object if self.sol_options[key]['Solver'] == "qpOASES": self.sol_options[key].update({'Create_Solver': VOpt_qpOASES(Hm=self.velqp_bench.J_Hess[1:, 1:], Am=self.velqp_bench.Am)}) # For movie tool self.b_vis_triggered = False # Runtime arrays self.dt_sqp_qpoases_arr = [] # Runtime and solver status arrays for key, value in self.sol_options.items(): self.sol_options[key]['Time'] = [] self.sol_options[key]['SolStatus'] = [] # Store max. power values self.P_max = None # --- Do some plots extra to main debug window self.pre_debug_plots()
[docs] def pre_debug_plots(self): """ Python version: 3.5 Created by: Thomas Herrmann Created on: 01.02.2020 Documentation: Does extra plots before main debug window opens. These plots contain QP status, objective term values, global values from entirely logged race session. """ # --- Visualize objective and SQP status VisVP_ObjStatus(calc_objective=self.calc_objective, vis_options=self.vis_options) # --- Visualize SQP (OSQP) runtime if self.vis_options['b_calc_time_plot']: self.runtimes.plot_hist(name_solver='OSQP', dt_solver=self.calc_objective.dt_sqp_arr, vis_options=self.vis_options) # --- Visualization of global values (energy, power) if self.vis_options['b_global_plot']: self.glob_val_vis = VisVP_GlobalVals(vis_main=self, csv_name=self.csv_name, csv_name_ltpl=self.csv_name_ltpl, log_lines=self.log_lines, row_count=self.row_count, glob_lim=self.vis_options['glob_lim'])
[docs] def vis_log(self, idx: int): """ Python version: 3.5 Created by: Thomas Herrmann Created on: 01.02.2020 Documentation: Reads data from logging files and updates the main debug window. Inputs: idx: the starting line-number of the data block that should be read from the log-file """ # Modify user's idx to have multiple of log_lines print(idx) if idx != 0 and np.mod(idx, self.log_lines) != 0: idx = int(idx + (self.log_lines - np.mod(idx, self.log_lines))) elif idx == self.row_count: pass # don't extract entries corresponding to index out of bounds if idx >= self.row_count: idx -= self.log_lines # Choose your own starting Index for a single plot if self.vis_options['b_idx'] > 0: number = self.vis_options['b_idx'] idx = 4 * number # --- Read specific lines in logs row_lc1 = self.read_log_file(self.csv_name, idx) row_lc2 = self.read_log_file(self.csv_name, idx + 1) row_lc3 = self.read_log_file(self.csv_name, idx + 2) row_lc4 = self.read_log_file(self.csv_name, idx + 3) if (row_lc1 != []) and (row_lc2 != []) and (row_lc3 != []) and (row_lc4 != []): # Timestamp ts = row_lc1[0] # datetime.datetime(2012,4,1,0,0).timestamp() # --- Get data from file-name: year, month, day ts_prefix = self.csv_name.rsplit('/')[-1].rsplit('_')[2] + '-' \ + self.csv_name.rsplit('/')[-1].rsplit('_')[3] + '-' \ + self.csv_name.rsplit('/')[-1].rsplit('_')[4] + '-' ts = ts_prefix + ts # Global s coordinate [m] s_glob = json.loads(row_lc1[1]) # Initial velocity guess [m/s] x0_v = np.array(json.loads(row_lc1[2])) # Initial slack variable value guess [-] x0_s_t = np.array(json.loads(row_lc1[3])) # kappa profile of path [rad/m] kappa = np.array(json.loads(row_lc1[4])) # Step size of given path [m] delta_s = np.array(json.loads(row_lc1[5])) # Initial velocity hard constraint [m/s] v_ini = json.loads(row_lc1[6]) # Max. allowed velocity in optimization horizon [m/s] v_max = np.array(json.loads(row_lc1[7])) # End velocity in optimization horizon [m/s] v_end = json.loads(row_lc1[8]) # Initial force constraint [kN] F_ini = json.loads(row_lc1[9]) # Max. allowed power [kW] P_max = np.array(json.loads(row_lc1[10])) self.P_max = P_max # Iteration number of last QP in velocity SQP [-] qp_iter = json.loads(row_lc1[11]) # Status of last QP in velocity SQP [-] qp_status = json.loads(row_lc1[12]) # Total optimization time of velocity SQP [ms] dt_sqp = json.loads(row_lc1[13]) # By OSQP optimized velocity profile [m/s] v_op_osqp = json.loads(row_lc2[0]) # By OSQP optimized slack variable values [-] s_t_op_osqp = json.loads(row_lc3[0]) # Max. allowed longitudinal acceleration [m/s^2] ax_max = json.loads(row_lc4[0]) # Max. allowed lateral acceleration in optimization horizon [m/s^2] ay_max = json.loads(row_lc4[1]) # --- Update constraints in QP if np.size(ax_max) == 1: ax_max_new = None ax_max_new_ipopt = None ay_max_new = None ay_max_new_ipopt = None else: ax_max_new = ax_max ax_max_new_ipopt = ax_max_new[:-1] ay_max_new = ay_max ay_max_new_ipopt = ay_max_new[:-1] # --- Update OSQP expressions self.velqp.osqp_update_online(x0_v=np.array(v_op_osqp), x0_s_t=np.array(s_t_op_osqp), v_ini=v_ini, v_max=v_max, v_end=v_end, F_ini=F_ini, P_max=P_max, kappa=kappa, delta_s=delta_s, ax_max=ax_max_new, ay_max=ay_max_new) ############################################################################################################ # --- Rerun OSQP from log-data ############################################################################################################ if self.vis_options['b_run_OSQP']: # --- Rerun OSQP solver sol = self.velqp.osqp_solve()[0] # --- Transform OSQP solution v_op_rerun = np.insert(sol[0:self.velqp.m - 1], 0, 0) + x0_v # s_t_op_rerun = sol[self.velqp.m - 1:] + x0_s_t print("OSQP rerun v_mps:\n", v_op_rerun) ############################################################################################################ # --- Calculate Solver-solution (Dictionary) ############################################################################################################ for key, value in self.sol_options.items(): ######################################################################################################## # --- Calculate IPOPT-solution ######################################################################################################## if self.sol_options[key]['Solver'] == "IPOPT": sol_ipopt, \ param_vec, \ t_op_ipopt, \ sol_status_ipopt = self.sol_options[key]['Create_Solver'].calc_v_ipopt(v_ini=v_ini, kappa=kappa, delta_s=delta_s, v_max=v_max, F_ini=F_ini, v_end=v_end, P_max=P_max, x0_v=x0_v, x0_s_t=x0_s_t, ax_max=ax_max_new_ipopt, ay_max=ay_max_new_ipopt) v_op_ipopt, \ s_t_op_ipopt, \ F_op_ipopt, \ P_op_ipopt, \ ax_op_ipopt, \ ay_op_ipopt, \ F_xzf_op_ipopt, \ F_yzf_op_ipopt, \ F_xzr_op_ipopt, \ F_yzr_op_ipopt, \ F_xzfl_op_ipopt, \ F_xzfr_op_ipopt, \ F_yzfl_op_ipopt, \ F_yzfr_op_ipopt, \ F_xzrl_op_ipopt, \ F_xzrr_op_ipopt, \ F_yzrl_op_ipopt, \ F_yzrr_op_ipopt, \ = self.sol_options[key]['Create_Solver'].transform_sol(sol=sol_ipopt, param_vec_=param_vec, vis_options=self.vis_options) self.sol_options[key].update({'Velocity': v_op_ipopt}) self.sol_options[key].update({'Slack_Sol': s_t_op_ipopt}) self.sol_options[key].update({'Force': F_op_ipopt}) self.sol_options[key].update({'Power': P_op_ipopt}) self.sol_options[key].update({'Acc_x': ax_op_ipopt}) self.sol_options[key].update({'Acc_y': ay_op_ipopt}) self.sol_options[key].update({'F_xzf': F_xzf_op_ipopt}) self.sol_options[key].update({'F_yzf': F_yzf_op_ipopt}) self.sol_options[key].update({'F_xzr': F_xzr_op_ipopt}) self.sol_options[key].update({'F_yzr': F_yzr_op_ipopt}) self.sol_options[key].update({'F_xzfl': F_xzfl_op_ipopt}) self.sol_options[key].update({'F_xzfr': F_xzfr_op_ipopt}) self.sol_options[key].update({'F_yzfl': F_yzfl_op_ipopt}) self.sol_options[key].update({'F_yzfr': F_yzfr_op_ipopt}) self.sol_options[key].update({'F_xzrl': F_xzrl_op_ipopt}) self.sol_options[key].update({'F_xzrr': F_xzrr_op_ipopt}) self.sol_options[key].update({'F_yzrl': F_yzrl_op_ipopt}) self.sol_options[key].update({'F_yzrr': F_yzrr_op_ipopt}) self.sol_options[key]['Time'].append(t_op_ipopt) self.sol_options[key]['SolStatus'].append(sol_status_ipopt) ######################################################################################################## # --- Calculate qpOASES-solution ######################################################################################################## if self.sol_options[key]['Solver'] == "qpOASES": # qpOASES velocity [m/s] v_op_qpoases = None # qpOASES force [kN] F_op_qpoases = None # qpOASES tire slacks [-] eps_op_qpoases = None t_start = time.perf_counter() v_op_qpoases, \ eps_op_qpoases, \ F_op_qpoases = \ online_qp_postproc(velqp=self.velqp_bench, vp_qpOASES=self.sol_options[key]['Create_Solver'], v_ini=v_ini, kappa=kappa, delta_s=delta_s, x0_v=x0_v, x0_s_t=x0_s_t, v_max=v_max, v_end=v_end, F_ini=F_ini, ax_max=ax_max_new, ay_max=ay_max_new, P_max=P_max) # Overall SQP runtime using qpOASES [ms] dt_sqp_qpoases = (time.perf_counter() - t_start) * 1000 # Store qpOASES runtime print('Overall qpOASES runtime:', dt_sqp_qpoases) P_op_qpoases = F_op_qpoases[0:self.m - 1] * v_op_qpoases[0:self.m - 1] ay_op_qpoases = kappa[0:self.velqp.sqp_stgs['m'] - 1] \ * np.square(v_op_qpoases[0:self.velqp.sqp_stgs['m'] - 1]) ax_op_qpoases = (np.square(v_op_qpoases[1:self.velqp.sqp_stgs['m']]) - np.square( v_op_qpoases[0:self.velqp.sqp_stgs['m'] - 1])) / (2 * np.array(delta_s)) \ + (self.velqp.sym_sc_['c_res_'] * np.square(v_op_qpoases[0:self.velqp.sqp_stgs['m'] - 1])) /\ (1000 * self.velqp.sym_sc_['m_t_']) self.sol_options[key].update({'Velocity': v_op_qpoases}) self.sol_options[key].update({'Slack_Sol': eps_op_qpoases}) self.sol_options[key].update({'Force': F_op_qpoases}) self.sol_options[key].update({'Power': P_op_qpoases}) self.sol_options[key].update({'Acc_x': ax_op_qpoases}) self.sol_options[key].update({'Acc_y': ay_op_qpoases}) self.sol_options[key]['Time'].append(dt_sqp_qpoases) # self.sol_options[key]['SolStatus'].append(sol_status_qpoases) if self.vis_options['b_immediate_plot_update']: # --- SQP Status update self.vis_gui.text_status.\ set_text(r'$s_\mathrm{glob}$: ' + str('%.2f' % s_glob) + ' m' + r' $t$: ' + str(3600 + time.mktime(datetime.datetime.strptime(ts, "%Y-%m-%d-%H:%M:%S.%f").timetuple() )) + ' s' + r' $\mathrm{QP}_\mathrm{status}$: ' + str(qp_status) + r' $\mathrm{QP}_\mathrm{iter}$: ' + str(qp_iter) + r' $\mathrm{SQP}_\mathrm{\Delta t}$: ' + str(dt_sqp) + ' ms') # --- Velocity update self.vis_gui.p1_1.set_ydata(v_op_osqp[0:self.velqp.m]) self.vis_gui.p1_2.set_ydata(v_max) self.vis_gui.p1_3.set_ydata(v_end) self.vis_gui.p1_4.set_ydata(x0_v) # debugging solver updates for key, value in self.sol_options.items(): self.vis_gui.vel_dict[key].set_ydata(self.sol_options[key]['Velocity']) # --- Force update self.vis_gui.p3_1.set_ydata(self.velqp.F_cst + self.velqp.sym_sc_['Fmax_kN_']) self.vis_gui.p3_2.set_ydata([F_ini, F_ini + self.velqp.sym_sc_['Fini_tol_'], F_ini - self.velqp.sym_sc_['Fini_tol_']]) for key, value in self.sol_options.items(): self.vis_gui.F_dict[key].set_ydata(self.sol_options[key]['Force']) # --- Delta Force update self.vis_gui.p4_1.set_ydata(self.velqp.dF_cst + self.velqp.sym_sc_['dF_kN_pos_']) # --- Power if self.velqp.sqp_stgs['b_var_power']: self.vis_gui.p5_1.set_ydata(self.velqp.P_cst + P_max.reshape((self.velqp.m - 1, 1))) self.vis_gui.p5_2.set_ydata(P_max) for key, value in self.sol_options.items(): self.vis_gui.P_dict[key].set_ydata(np.around(self.sol_options[key]['Power'], 2)) else: self.vis_gui.p5_1.set_ydata(self.velqp.P_cst + self.velqp.sym_sc_['Pmax_kW_']) self.vis_gui.p5_2.set_ydata(self.velqp.sym_sc_['Pmax_kW_'] * np.ones((self.m - 1, 1))) for key, value in self.sol_options.items(): self.vis_gui.P_dict[key].set_ydata(np.array(self.sol_options[key]['Power'] * 1.0)) # --- Tire usage update ay_mps2 = kappa[0:self.velqp.sqp_stgs['m'] - 1] * \ np.square(v_op_osqp[0:self.velqp.sqp_stgs['m'] - 1]) ax_mps2 = (np.square(v_op_osqp[1:self.velqp.sqp_stgs['m']]) - np.square( v_op_osqp[0:self.velqp.sqp_stgs['m'] - 1])) / (2 * np.array(delta_s)) + \ (self.velqp.sym_sc_['c_res_'] * np.square(v_op_osqp[0:self.velqp.sqp_stgs['m'] - 1])) / \ (1000 * self.velqp.sym_sc_['m_t_']) self.vis_gui.p6_1.set_xdata(ay_mps2) self.vis_gui.p6_1.set_ydata(ax_mps2) for key, value in self.sol_options.items(): self.vis_gui.a_dict[key].set_xdata(self.sol_options[key]['Acc_y']) self.vis_gui.a_dict[key].set_ydata(self.sol_options[key]['Acc_x']) # Case of variable friction limits if np.size(ax_max) > 1: axmax_max_plot = np.max(ax_max) axmax_min_plot = np.min(ax_max) aymax_max_plot = np.max(ay_max) aymax_min_plot = np.min(ay_max) self.vis_gui.p6_2.set_xdata([aymax_max_plot, 0, - aymax_max_plot, 0, aymax_max_plot]) self.vis_gui.p6_2.set_ydata([0, axmax_max_plot, 0, - axmax_max_plot, 0]) self.vis_gui.p6_3.set_xdata([aymax_min_plot, 0, - aymax_min_plot, 0, aymax_min_plot]) self.vis_gui.p6_3.set_ydata([0, axmax_min_plot, 0, - axmax_min_plot, 0]) # --- Front Axle Tire usage update for key, value in self.sol_options.items(): if self.sol_options[key]['Model'] == "DM" and self.sol_options[key]['Solver'] == "IPOPT": self.vis_gui.F_f_dict[key].set_xdata(self.sol_options[key]['F_yzf']) self.vis_gui.F_f_dict[key].set_ydata(self.sol_options[key]['F_xzf']) elif self.sol_options[key]['Model'] == "FW" and self.sol_options[key]['Solver'] == "IPOPT": self.vis_gui.F_fl_dict[key].set_xdata(self.sol_options[key]['F_yzfl']) self.vis_gui.F_fl_dict[key].set_ydata(self.sol_options[key]['F_xzfl']) self.vis_gui.F_fr_dict[key].set_xdata(self.sol_options[key]['F_yzfr']) self.vis_gui.F_fr_dict[key].set_ydata(self.sol_options[key]['F_xzfr']) # --- Rear Axle Tire usage update for key, value in self.sol_options.items(): if self.sol_options[key]['Model'] == "DM" and self.sol_options[key]['Solver'] == "IPOPT": self.vis_gui.F_r_dict[key].set_xdata(self.sol_options[key]['F_yzr']) self.vis_gui.F_r_dict[key].set_ydata(self.sol_options[key]['F_xzr']) elif self.sol_options[key]['Model'] == "FW" and self.sol_options[key]['Solver'] == "IPOPT": self.vis_gui.F_rl_dict[key].set_xdata(self.sol_options[key]['F_yzrl']) self.vis_gui.F_rl_dict[key].set_ydata(self.sol_options[key]['F_xzrl']) self.vis_gui.F_rr_dict[key].set_xdata(self.sol_options[key]['F_yzrr']) self.vis_gui.F_rr_dict[key].set_ydata(self.sol_options[key]['F_xzrr']) # --- Slack update self.vis_gui.p7_1.set_ydata(s_t_op_osqp) self.vis_gui.p7_2.set_ydata(x0_s_t) self.vis_gui.p7_3.set_ydata([self.velqp.sym_sc_['s_v_t_lim_']] * len(s_t_op_osqp)) for key, value in self.sol_options.items(): if self.sol_options[key]['Slack'] is True: self.vis_gui.slack_dict[key].set_ydata(np.array(self.sol_options[key]['Slack_Sol'])) # self.vis_gui.main_fig.canvas.draw() if self.vis_options['b_save_tikz']: self.vis_gui.main_fig.canvas.draw() tikzplotlib.save('SQP_OSQP_debug.tex') if not self.b_vis_triggered and self.vis_options['b_movie']: # --- Reset trigger once movie is running self.b_vis_triggered = True # --- Allow interactive mode plt.ion() for i in range(0, self.row_count - self.log_lines, self.log_lines): self.vis_gui.slider_vel.set_val(int(i)) print("\n*** Progress: " + str("%.2f" % round((i / self.row_count) * 100, 2)) + " %") # --- Plot runtimes for key, value in self.sol_options.items(): if self.vis_options['b_calc_time_plot']: self.runtimes.plot_hist(name_solver=self.sol_options[key]['Solver'], dt_solver=np.array(self.sol_options[key]['Time']), vis_options=self.vis_options) plt.ioff() # --- Show all plots that have been calculated plt.draw()
[docs] def read_log_file(self, csv_name: str, idx: int) -> list: """ Python version: 3.5 Created by: Thomas Herrmann Created on: 01.02.2020 Documentation: Reads specific line in log file and splits into its variables if several exist in this line. Inputs: csv_name: file name and path to read data from idx: the starting line-number of the data block that should be read from the log-file Outputs: list containing all logged variable values """ row_lc = linecache.getline(csv_name, idx + 1) # --- In case of RAM trouble but calculation speed will drop significantly # linecache.clearcache() # --- Get rid of ending '\n' return row_lc[:-1].rsplit(';')
if __name__ == '__main__': pass