Correct closed loop RTI implementation?

Hi :wave:

I have a problem with the RTI implementation in acados with Python. Here, I am trying to implement a solver that can be either set to SQP or SQP_RTI and measure the cpu solving times in closed loop. Now the problem lies in implementing SQP_RTI. I cannot understand the different implementations with the rti loop (example_sqp_rti_loop.py 路 GitHub) and the simple closed loop implementation (minimal_example_closed_loop.py 路 GitHub).

In my code I used so far the implementation of the rti loop in a closed loop with max_rti_iters and the residuals.

 def solve(
    self, 
    x0: np.ndarray,
    x_guess=None, 
    u_guess=None
):

  simX_traj = np.ndarray((self.nx, self.N_MPC+1))
  simU_traj = np.ndarray((self.nu, self.N_MPC))
  soliters = 0
  soltime = 0.0

  if x_guess is not None:
      for i_x in range(x_guess.shape[1]):
          self.ocp_solver.set(i_x, "x", x_guess[:, i_x])
  if u_guess is not None:
      for i_u in range(u_guess.shape[1]):
          self.ocp_solver.set(i_u, "u", u_guess[:, i_u])

  # Initial bounds
  self.ocp_solver.set(0, "lbx", x0)
  self.ocp_solver.set(0, "ubx", x0)

  # solve for SQP
  if self.ocp.solver_options.nlp_solver_type == 'SQP':
      status = self.ocp_solver.solve()
      soliters = np.sum(self.ocp_solver.get_stats('qp_iter'), dtype=np.int64)
      soltime = self.ocp_solver.get_stats('time_tot')
      
  # solve for RTI
  elif self.ocp.solver_options.nlp_solver_type == 'SQP_RTI':
      for i in range(self.max_rti_iters):
          status = self.ocp_solver.solve()
          residuals = self.ocp_solver.get_residuals()
          soliters += int(self.ocp_solver.get_stats('qp_iter')[1])
          soltime += self.ocp_solver.get_stats('time_tot')
          
          if max(residuals) < self.rti_tol:
              break

  # solver failed print and exception
  if self.solver_verbose and status != 0:
      self.ocp_solver.print_statistics()
  if self.raise_exc_solver_fail and status != 0:
      raise Exception('Solver returned status {} -> {}'.format(status, self.solver_status_meanings[status]))
  
  # get trajectories
  for i in range(self.N_MPC):
      simX_traj[:, i] = self.ocp_solver.get(i, "x")
      simU_traj[:, i] = self.ocp_solver.get(i, "u")
  simX_traj[:, self.N_MPC] = self.ocp_solver.get(self.N_MPC, "x")

  return simX_traj, simU_traj, soltime, soliters



 def get_MPC_trajectory_acados(
        controller: AMPC_class,
        W: np.ndarray=None,
        xinit: np.ndarray=None,
        show_tqdm: bool=True,
        verbose = True
) -> AMPC_data:
  P = controller.P
  MPC_results = AMPC_data(P=P, acados_name=controller.acados_name, acados_options=controller.acados_options)
  x_curr = P.xinit.reshape((-1,)) if xinit is None else xinit

  x_guess = np.repeat(x_curr.reshape((4, 1)), repeats=controller.N_MPC, axis=1)
  u_guess = np.zeros((1, controller.N_MPC))

  for i in trange(P.N_sim, desc = controller.horizon_name, mininterval=0.5, disable = not show_tqdm):
      try:
          x_traj, u_traj, soltime, soliters = controller.solve(x_curr, x_guess=x_guess, u_guess=u_guess)
      except Exception as e:
          msg = e.args[0]
          warnings.warn(f'MPC solver failed on step {i}, reason: {msg}',UserWarning)
          break
      time.sleep(0.001)

      # bring the theta within the [-pi,+pi] range
      if x_traj[1, 0] > 1.25*np.pi:
          x_traj[1, :] -= 2*np.pi
      elif x_traj[1, 0] < -1.25*np.pi:
          x_traj[1, 0] += 2*np.pi
          
      # log iteration
      MPC_results.Time[i] = soltime
      MPC_results.Iterations[i] = soliters
      MPC_results.X[:,i] = x_curr
      MPC_results.U[:,i] = u_traj[:, 0]
      MPC_results.X_traj[i,:,:] = x_traj
      MPC_results.U_traj[i,:,:] = u_traj

      x_curr = x_traj[:, 1]
      x_guess = np.hstack((x_traj[:, 1:], x_traj[:, -1:]))
      u_guess = np.vstack((u_traj[1:], u_traj[-1:]))

      # add disturbance
      if W is not None:
          try:
              x_curr += (np.random.randn(P.nx)*W)
          except ValueError as err:
              print(f'Disturbance variance W passed incorrectly, expected scalar or array of size ({P.nx},), got {W}')
              print(err)
              break

  # calculate cost
  T_traj = P.Ts*(i+1)
  X_cost = MPC_results.X[:,:i]
  U_cost = MPC_results.U[:,:i]
  MPC_results.Cost = (np.sum(X_cost*P.Q.dot(X_cost)) + np.sum(U_cost*P.R.dot(U_cost)))

  # freeze dataclass
  MPC_results.freeze()

  if verbose:
      print(f'Trajectory cost calculation: {i} steps taken, traj. time {T_traj:0.2f} sec, cost = {MPC_results.Cost:0.7f}')

  return MPC_results

The solve function is a function of a class, but the rest of the class doesnt really matter here. Because I didn鈥檛 change the default settings for acados solver except using

  qp_solver='FULL_CONDENSING_HPIPM', 
  integrator_type='DISCRETE', 
  nlp_solver_type='SQP_RTI'  # 'SQP'

The constraints and stuff is all correctly set and my implementation works great, especially for SQP, thanks to your beautiful acados. However getting the correct solving times I can get is crutial for me.

Now my question is: Is that correctly implemented for the RTI algorithm or did I miss something here. The minimal_example_closed_loop.py uses both phases and does not solve the issue iteratiely while breaking when a certain residual tolerance is reached. I assume this implementation would be faster, but I dont know how this would work out with the x and u guesses. I tried my implementation with only one iteration and the results were not great. So I guess this is wrong implemented because I didnt use the two phases and setting the x0 after prep phase?

Thanks for the answere
Best regards
Josua

Hi,

typically, I would recommend not to shift initial guesses when using a real-time algorithm.
Shifting has some problems:

  1. you have no value for the last state and control.
  2. it does not make sense for nonuniform discretization grids.

You can also check our recent work on AS-RTI and its acados implementation

Hi,

thank you for your quick answere. In the updated code, I use as_rti_iter = 1, and as_rti_level = 3. The level seems to be important for this example (inverted pendulum). The cost is always higher for the other 3 options (A, B, C). However I have a question regarding the iterations you have in your earlier mentioned rti loop example with the residual check in every rti loop. Is this number of iterations there the same as the as_rti_iter value, which is then only without the residual check so the iteration is fixed? I now use the algorithms without trajectory shifts and at least sqp works very good. Now, without the rti loops and setting the as_rti_iter = 1 the costs not that good for A and B (Cost around Cost: 125). With C and standard RTI the cost is bad. And with SQP everything is fine and the cost is as low as it can get, as well as for D (Cost: 103). There is one minor drawback, I have to ignore infeasability for all AS-RTI algorithms and continue with the trajectory, which is not really what we want.
Here a quick comparisson


The solver times are averaged over 10 runs.

Now here is the updated code.

  def solve(
        self, 
        x0: np.ndarray,
):
  """
  Solves the OCP with acados. Print statistics, if solver returns status that is not 0.
  status:
      - 0 = ACADOS_SUCCESS
      - 1 = ACADOS_FAILURE
      - 2 = ACADOS_MAXITER
      - 3 = ACADOS_MINSTEP
      - 4 = ACADOS_QP_FAILURE
      - 5 = ACADOS_READY
  
  Parameters
  ----------
  ``x0`` : numpy.ndarray
      An array that is the initial value for the open loop MPC.

  Returns
  -------
  ``simX_traj`` : numpy.ndarray
      An array containing the calculated states for the OCP up to N_MPC. Of shape (nx, N_MPC + 1)
  ``simU_traj`` :  numpy.ndarray
      An array containing the calculated inputs for the OCP up to N_MPC. Of shape (nx, N_MPC)
  ``soltime`` : float
      The solvingtime needed for solving the OCP.
  ``soliters`` : int
      The summed solver iterations solving the OCP.
  """
  simX_traj = np.ndarray((self.nx, self.N_MPC+1))
  simU_traj = np.ndarray((self.nu, self.N_MPC))
  soliters = 0
  soltime = 0.0

  # solve for SQP
  if self.ocp.solver_options.nlp_solver_type == 'SQP':
      # Initial bounds
      self.ocp_solver.set(0, "lbx", x0)
      self.ocp_solver.set(0, "ubx", x0)

      status = self.ocp_solver.solve()
      self._solver_status_error(status)
      soltime = self.ocp_solver.get_stats('time_tot')
      soliters = np.sum(self.ocp_solver.get_stats('qp_iter'), dtype=np.int64)
      
      
  # solve for RTI
  elif self.ocp.solver_options.nlp_solver_type == 'SQP_RTI':
      # preparation phase
      self.ocp_solver.options_set('rti_phase', 1)
      status = self.ocp_solver.solve()
      self._solver_status_error(status)
      prep_time = self.ocp_solver.get_stats('time_tot')
      prep_iters = np.sum(self.ocp_solver.get_stats('qp_iter'), dtype=np.int64)

      # Initial bounds
      self.ocp_solver.set(0, "lbx", x0)
      self.ocp_solver.set(0, "ubx", x0)

      # feedback phase
      self.ocp_solver.options_set('rti_phase', 2)
      status = self.ocp_solver.solve()
      self._solver_status_error(status)
      fb_time = self.ocp_solver.get_stats('time_tot')
      fb_iters = np.sum(self.ocp_solver.get_stats('qp_iter'), dtype=np.int64)

      soltime = prep_time + fb_time
      soliters = prep_iters + fb_iters
  
  # get open loop traj
  for i in range(self.N_MPC):
      simX_traj[:, i] = self.ocp_solver.get(i, "x")
      simU_traj[:, i] = self.ocp_solver.get(i, "u")
  simX_traj[:, self.N_MPC] = self.ocp_solver.get(self.N_MPC, "x")

  return simX_traj, simU_traj, soltime, soliters

def get_MPC_trajectory_acados(
          controller: AMPC_class | NH_AMPC_class,
          W: np.ndarray=None,
          xinit: np.ndarray=None,
          show_tqdm: bool=True,
          verbose = True
  ) -> AMPC_data:
  """
  Generates a trajectory based on the MPC ``controller``.

  Parameters
  ----------
  ``controller`` : MPC_acados | MPC_NN_acados 
      The acados MPC controller class.
  ``W`` : ndarray, optional
      Variance for the additive state disturbance, can be a scalar or array of size X. Default = None
  ``xinit`` : ndarray, optional
      Initial state of the system. Defaults = None.
  ``show_tqdm`` : bool, optional
      Whether to show a tqdm progress bar. Defaults = True.

  Returns
  -------
  ``MPC_results`` : AMPC_data
      A dataclass with the closed and open loop trajectories, solving times, parameters and acados options. 
  """
  P = controller.P
  MPC_results = AMPC_data(P=P, acados_name=controller.acados_name, acados_options=controller.acados_options)
  x_curr = P.xinit.reshape((-1,)) if xinit is None else xinit

  # x_guess = np.repeat(x_curr.reshape((4, 1)), repeats=controller.N_MPC, axis=1)
  # u_guess = np.zeros((1, controller.N_MPC))

  for i in trange(P.N_sim, desc = controller.horizon_name, mininterval=0.5, disable = not show_tqdm):
      try:
          x_traj, u_traj, soltime, soliters = controller.solve(x_curr)
      except Exception as e:
          msg = e.args[0]
          warnings.warn(f'MPC solver failed on step {i}, reason: {msg}',UserWarning)
          break
      time.sleep(0.001)

      # bring the theta within the [-pi,+pi] range
      if x_traj[1, 0] > 1.25*np.pi:
          x_traj[1, :] -= 2*np.pi
      elif x_traj[1, 0] < -1.25*np.pi:
          x_traj[1, 0] += 2*np.pi
          
      # log iteration
      MPC_results.Time[i] = soltime
      MPC_results.Iterations[i] = soliters
      MPC_results.X[:,i] = x_curr
      MPC_results.U[:,i] = u_traj[:, 0]
      MPC_results.X_traj[i,:,:] = x_traj
      MPC_results.U_traj[i,:,:] = u_traj

      x_curr = x_traj[:, 1]
      # x_guess = np.hstack((x_traj[:, 1:], x_traj[:, -1:]))
      # u_guess = np.vstack((u_traj[1:], u_traj[-1:]))

      # add disturbance
      if W is not None:
          try:
              x_curr += (np.random.randn(P.nx)*W)
          except ValueError as err:
              print(f'Disturbance variance W passed incorrectly, expected scalar or array of size ({P.nx},), got {W}')
              print(err)
              break

  # calculate cost
  T_traj = P.Ts*(i+1)
  X_cost = MPC_results.X[:,:i]
  U_cost = MPC_results.U[:,:i]
  MPC_results.Cost = (np.sum(X_cost*P.Q.dot(X_cost)) + np.sum(U_cost*P.R.dot(U_cost)))

  # freeze dataclass
  MPC_results.freeze()

  if verbose:
      print(f'Trajectory cost calculation: {i} steps taken, traj. time {T_traj:0.2f} sec, cost = {MPC_results.Cost:0.7f}')

  return MPC_results

The status I always get with AS-RTI no matter which type is status 5 - ACADOS_READY.
Best,
Josua

NVM, status 5 was returned because I checked it after the preperation phase. Why standard RTI works better in terms of cost with the previous implementation still is beyond me :smiley: