Hi
I have a problem with the RTI implementation in acados
with Python
. Here, I am trying to implement a solver that can be either set to SQP or SQP_RTI and measure the cpu solving times in closed loop. Now the problem lies in implementing SQP_RTI. I cannot understand the different implementations with the rti loop (example_sqp_rti_loop.py 路 GitHub) and the simple closed loop implementation (minimal_example_closed_loop.py 路 GitHub).
In my code I used so far the implementation of the rti loop in a closed loop with max_rti_iters and the residuals.
def solve(
self,
x0: np.ndarray,
x_guess=None,
u_guess=None
):
simX_traj = np.ndarray((self.nx, self.N_MPC+1))
simU_traj = np.ndarray((self.nu, self.N_MPC))
soliters = 0
soltime = 0.0
if x_guess is not None:
for i_x in range(x_guess.shape[1]):
self.ocp_solver.set(i_x, "x", x_guess[:, i_x])
if u_guess is not None:
for i_u in range(u_guess.shape[1]):
self.ocp_solver.set(i_u, "u", u_guess[:, i_u])
# Initial bounds
self.ocp_solver.set(0, "lbx", x0)
self.ocp_solver.set(0, "ubx", x0)
# solve for SQP
if self.ocp.solver_options.nlp_solver_type == 'SQP':
status = self.ocp_solver.solve()
soliters = np.sum(self.ocp_solver.get_stats('qp_iter'), dtype=np.int64)
soltime = self.ocp_solver.get_stats('time_tot')
# solve for RTI
elif self.ocp.solver_options.nlp_solver_type == 'SQP_RTI':
for i in range(self.max_rti_iters):
status = self.ocp_solver.solve()
residuals = self.ocp_solver.get_residuals()
soliters += int(self.ocp_solver.get_stats('qp_iter')[1])
soltime += self.ocp_solver.get_stats('time_tot')
if max(residuals) < self.rti_tol:
break
# solver failed print and exception
if self.solver_verbose and status != 0:
self.ocp_solver.print_statistics()
if self.raise_exc_solver_fail and status != 0:
raise Exception('Solver returned status {} -> {}'.format(status, self.solver_status_meanings[status]))
# get trajectories
for i in range(self.N_MPC):
simX_traj[:, i] = self.ocp_solver.get(i, "x")
simU_traj[:, i] = self.ocp_solver.get(i, "u")
simX_traj[:, self.N_MPC] = self.ocp_solver.get(self.N_MPC, "x")
return simX_traj, simU_traj, soltime, soliters
def get_MPC_trajectory_acados(
controller: AMPC_class,
W: np.ndarray=None,
xinit: np.ndarray=None,
show_tqdm: bool=True,
verbose = True
) -> AMPC_data:
P = controller.P
MPC_results = AMPC_data(P=P, acados_name=controller.acados_name, acados_options=controller.acados_options)
x_curr = P.xinit.reshape((-1,)) if xinit is None else xinit
x_guess = np.repeat(x_curr.reshape((4, 1)), repeats=controller.N_MPC, axis=1)
u_guess = np.zeros((1, controller.N_MPC))
for i in trange(P.N_sim, desc = controller.horizon_name, mininterval=0.5, disable = not show_tqdm):
try:
x_traj, u_traj, soltime, soliters = controller.solve(x_curr, x_guess=x_guess, u_guess=u_guess)
except Exception as e:
msg = e.args[0]
warnings.warn(f'MPC solver failed on step {i}, reason: {msg}',UserWarning)
break
time.sleep(0.001)
# bring the theta within the [-pi,+pi] range
if x_traj[1, 0] > 1.25*np.pi:
x_traj[1, :] -= 2*np.pi
elif x_traj[1, 0] < -1.25*np.pi:
x_traj[1, 0] += 2*np.pi
# log iteration
MPC_results.Time[i] = soltime
MPC_results.Iterations[i] = soliters
MPC_results.X[:,i] = x_curr
MPC_results.U[:,i] = u_traj[:, 0]
MPC_results.X_traj[i,:,:] = x_traj
MPC_results.U_traj[i,:,:] = u_traj
x_curr = x_traj[:, 1]
x_guess = np.hstack((x_traj[:, 1:], x_traj[:, -1:]))
u_guess = np.vstack((u_traj[1:], u_traj[-1:]))
# add disturbance
if W is not None:
try:
x_curr += (np.random.randn(P.nx)*W)
except ValueError as err:
print(f'Disturbance variance W passed incorrectly, expected scalar or array of size ({P.nx},), got {W}')
print(err)
break
# calculate cost
T_traj = P.Ts*(i+1)
X_cost = MPC_results.X[:,:i]
U_cost = MPC_results.U[:,:i]
MPC_results.Cost = (np.sum(X_cost*P.Q.dot(X_cost)) + np.sum(U_cost*P.R.dot(U_cost)))
# freeze dataclass
MPC_results.freeze()
if verbose:
print(f'Trajectory cost calculation: {i} steps taken, traj. time {T_traj:0.2f} sec, cost = {MPC_results.Cost:0.7f}')
return MPC_results
The solve function is a function of a class, but the rest of the class doesnt really matter here. Because I didn鈥檛 change the default settings for acados solver except using
qp_solver='FULL_CONDENSING_HPIPM',
integrator_type='DISCRETE',
nlp_solver_type='SQP_RTI' # 'SQP'
The constraints and stuff is all correctly set and my implementation works great, especially for SQP, thanks to your beautiful acados. However getting the correct solving times I can get is crutial for me.
Now my question is: Is that correctly implemented for the RTI algorithm or did I miss something here. The minimal_example_closed_loop.py uses both phases and does not solve the issue iteratiely while breaking when a certain residual tolerance is reached. I assume this implementation would be faster, but I dont know how this would work out with the x and u guesses. I tried my implementation with only one iteration and the results were not great. So I guess this is wrong implemented because I didnt use the two phases and setting the x0 after prep phase?
Thanks for the answere
Best regards
Josua