Hello everyone,
I am implementing Nonlinear Model Predictive Control (NMPC) for real-time trajectory tracking on my underwater vehicle using Acados and its Python interface. While the NMPC performed well in simulations, I am facing significant challenges in real-time experiments. The vehicle fails to track the reference trajectory, and the cost function does not behave as expected, despite extensive parameter tuning.
The core issues in my real-time experiments are:
- Poor trajectory tracking**: The vehicle does not follow the desired reference trajectory effectively, resulting in large steady-state errors.
- Unresponsive cost function**: The cost function shows minimal reaction to deviations from the reference trajectory, raising concerns about its role in optimization.
Key Observations:
*The control inputs vary but fail to generate meaningful corrections to the trajectory error.
- The system does not adapt well to changes in the reference trajectory, which was not an issue in simulations.
- Despite extensive variation of NMPC parameters (e.g., weights, prediction horizon, solver settings), the performance does not improve in real-time.
Code
Snippet
# Define the number of states and controls
n_states = 2 # [z, w]
n_controls = 1 # [tau_z]
# State and control variables
state = ca.MX.sym('state', n_states)
control = ca.MX.sym('control', n_controls)
# Dynamics parameters
z, w = state[0], state[1]
tau_z = control[0]
m = 46.0 # Vehicle mass
d_z = 22.0 # Linear damping coefficient
g = 9.81 # Gravity
F = 15 # Buoyancy force
B = m * g + F # Total buoyancy force
# Coefficients de traînée
d_z1 = 22 # Coefficient linéaire de traînée (kg/s)
d_z2 = 800 # Coefficient quadratique de traînée (kg/m)
# Simplified dynamics
z_dot = w
drag = d_z1 * z_dot + d_z2 * z_dot**2 # Traînée totale
w_dot = (tau_z - drag * w - F) / m
state_dot = ca.vertcat(z_dot, w_dot)
# CasADi dynamics function
f_dynamics = ca.Function('f_dynamics', [state, control], [state_dot])
# Discretization of dynamics
dt = 0.05 ## a corrige
##### NMPC Configuration
ocp = AcadosOcp()
ocp.model.name = 'depth_control'
ocp.model.x = state
ocp.model.u = control
ocp.model.f_expl_expr = state_dot
ocp.dims.N = 2 # Prediction horizon
ocp.solver_options.tf = ocp.dims.N * dt
# Matrices de coût
#Q = np.diag([5000000000000000, 50]) # Poids pour [z, w] _ np.diag([5000000000, 0])
Q = np.diag([100000000000,1000 ]) # Poids pour [z, w] _ np.diag([5000000000, 0])
R = np.array([[0.001]]) # Poids pour tau_z np.array([[0.001]])
W = np.block([[Q, np.zeros((n_states, n_controls))],
[np.zeros((n_controls, n_states)), R]])
# Terminal weight matrix
W_e = Q # Terminal weight matrix
# Set cost weights in Acados OCP
ocp.cost.W = W
ocp.cost.W_e = W_e
ocp.cost.W_0 = W
# Define selection matrices for cost function
Vx = np.eye(n_states) # Select all states
Vu = np.eye(n_controls) # Select all controls
Vx_e = np.eye(n_states) # Terminal state cost selection
# Configurer les références
ocp.cost.cost_type = 'LINEAR_LS' # Type de coût (Least Squares)
ocp.cost.cost_type_e = 'LINEAR_LS' # Coût terminal
# Update cost selection matrices
ocp.cost.Vx = np.vstack([Vx, np.zeros((n_controls, n_states))])
ocp.cost.Vu = np.vstack([np.zeros((n_states, n_controls)), Vu])
ocp.cost.Vx_e = Vx_e
ocp.cost.yref = np.zeros((n_states + n_controls,))
ocp.cost.yref_e = np.zeros((n_states,))
# Constraints
ocp.constraints.lbu = np.array([-50.0]) # Lower bound for tau_z
ocp.constraints.ubu = np.array([50.0]) # Upper bound for tau_z
ocp.constraints.idxbu = np.arange(n_controls)
# Solver options
ocp.solver_options.qp_solver_tol_stat = 1e-6 # QP solver tolerance for stationarity
ocp.solver_options.qp_solver_tol_eq = 1e-6 # QP solver tolerance for equality constraints
ocp.solver_options.qp_solver_tol_ineq = 1e-6 # QP solver tolerance for inequality constraints
ocp.solver_options.qp_solver_tol_comp = 1e-6 # QP solver tolerance for complementarity
ocp.solver_options.nlp_solver_tol_stat = 1e-6 # NLP tolerance for stationarity
ocp.solver_options.nlp_solver_tol_eq = 1e-6 # NLP tolerance for equality constraints
ocp.solver_options.nlp_solver_tol_ineq = 1e-6 # NLP tolerance for inequality constraints
ocp.solver_options.nlp_solver_tol_comp = 1e-6 # NLP tolerance for complementarity
ocp.solver_options.nlp_solver_max_iter = 1000 # Maximum iterations
# Initialize solver
ocp_solver = AcadosOcpSolver(ocp, json_file='acados_ocp.json')
# Warm up the NMPC solver
initial_state = np.zeros((n_states,)) # Initial state [z=0, w=0]
ocp_solver.set(0, 'x', initial_state) # Set the initial state in the solver
for k in range(ocp.dims.N): # For each prediction horizon step
ocp_solver.set(k, 'yref', np.zeros((n_states + n_controls,))) # Set dummy reference
ocp_solver.solve() # Run a warm-up solve to initialize solver's internal states
t_end = 30
time_array = np.arange(0, t_end, dt)
ZINI, ZINTER, ZF, TF, T_STABLE = 0, 0.5, 0.25, 20.0, 10.0
def poly5_trajectory(t, t_start, t_end, z_start, z_end):
"""
Generates a 5th-order polynomial trajectory.
:param t: Current time.
:param t_start: Start time.
:param t_end: End time.
:param z_start: Start position.
:param z_end: End position.
:return: Position z(t).
"""
T = t_end - t_start
if t < t_start:
return z_start, 0.0 # z and its derivative w
elif t > t_end:
return z_end, 0.0 # z and its derivative w
else:
a0 = z_start
a1 = 0
a2 = 0
a3 = 10 * (z_end - z_start) / T**3
a4 = -15 * (z_end - z_start) / T**4
a5 = 6 * (z_end - z_start) / T**5
tau = t - t_start
z_t = a0 + a1 * tau + a2 * tau**2 + a3 * tau**3 + a4 * tau**4 + a5 * tau**5
w_t = a1 + 2*a2 * tau + 3*a3 * tau**2 + 4 * a4 * tau**3 + 5 * a5 * tau**4
return z_t, w_t
# Generate reference trajectories
z_trajectory = []
w_trajectory = []
for t in time_array:
if t < TF:
# Transition from ZINI to ZINTER
z_t, w_t = poly5_trajectory(t, 0, TF, ZINI, ZINTER)
elif t < TF + T_STABLE:
# Steady state at ZINTER
z_t, w_t = ZINTER, 0.01 # Add a small non-zero zdot (w_t)
elif t < 2 * TF + T_STABLE:
# Transition from ZINTER to ZF
z_t, w_t = poly5_trajectory(t, TF + T_STABLE, 2 * TF + T_STABLE, ZINTER, ZF)
else:
# Steady state at ZF
z_t, w_t = ZF, 0.01 # Add a small non-zero zdot (w_t)
z_trajectory.append(z_t)
w_trajectory.append(w_t)
# Generate reference trajectories
z_trajectory = []
w_trajectory = []
for t in time_array:
if t < TF:
z_t, w_t = poly5_trajectory(t, 0, TF, ZINI, ZINTER)
elif t < TF + T_STABLE:
z_t, w_t = ZINTER, 0.0
elif t < 2 * TF + T_STABLE:
z_t, w_t = poly5_trajectory(t, TF + T_STABLE, 2 * TF + T_STABLE, ZINTER, ZF)
else:
z_t, w_t = ZF, 0.0
z_trajectory.append(z_t)
w_trajectory.append(w_t)
# The resulting z_trajectory contains the depth reference, and w_trajectory contains the rate of change (velocity) reference.
# Start the control loop
start_time = time.time()
##### Control Loop
# Update global index based on elapsed time
# Calculate current_time and global_idx
current_time = time.time() - start_time # Elapsed time since the script started
global_idx = int(current_time / dt) # Update global index based on elapsed time
actual_trajectory = []
actual_w_trajectory = []
control_inputs = []
errors = []
# Initialize storage for plots
control_inputs = [] # To store control input values
trajectory_errors = [] # To store trajectory errors
cost_function_values = [] # To store the cost function values
yref_segments_over_time = [] # Initialize a list to store yref_segment values
predicted_trajectories = []
# Initialization Phase
print("Starting initialization phase...")
# Warm up the NMPC solver
initial_state = np.array([z_trajectory[0], 0.0]) # Match initial depth
# Debug: Check the initial state
print(f"Expected initial state: [z = {z_trajectory[0]}, w = 0.0]")
print(f"Setting initial state in solver: {initial_state}")
# Set the initial state in the solver
ocp_solver.set(0, 'x', initial_state)
# Debug: Log warm-up references for all prediction horizon steps
for k in range(ocp.dims.N):
yref_debug = np.zeros((n_states + n_controls,))
print(f"Warm-up reference yref at k={k}: {yref_debug}")
ocp_solver.set(k, 'yref', yref_debug) # Set dummy reference
# Run solver warm-up
print("Running solver warm-up...")
status = ocp_solver.solve() # Run a warm-up solve to initialize solver's internal states
# Debug: Check solver status
if status != 0:
print(f"Solver warm-up failed with status {status}. Exiting...")
sys.exit(1)
else:
print("Solver warm-up completed successfully.")
print("Initialization phase complete.")
# Start the control loop
# Address and ports for communication
DEST_IP = "192.168.0.17"
DEST_PORT = 5816
# Create a UDP socket for receiving robot states
recv_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
recv_socket.bind(("", 5825))
print("Waiting for UDP frames on port 5825...")
# Create a socket for sending control commands
send_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
recv_socket.settimeout(0.1) # Avoid hanging
# Begin Control Loop
# Begin Control Loop
while True:
try:
current_time = time.time() - start_time # Elapsed time since control loop started
# Receive current state
try:
recv_time = time.time() # Moment où l'état est reçu
data, address = recv_socket.recvfrom(1024)
#print(f"[{recv_time:.3f}s] Received state: {data.decode('utf-8')}")
except socket.timeout:
print(f"[{time.time():.3f}s] No state received (timeout). Skipping...")
continue # Skip iteration if no data received
fields = data.decode('utf-8').strip().split(',')
if len(fields) < 22 or fields[0] != 'I' or fields[13].strip() != 'J':
print(f"[{current_time:.2f}s] Malformed frame skipped.")
continue
# Extract values
ROV_z = float(fields[3])
ROV_w = float(fields[9]) / 100
#print(f"[{ROV_w:.2f}s] ")
# Set initial state for solver
initial_state = np.array([ROV_z, ROV_w])
ocp_solver.set(0, 'x', initial_state)
print(f"Actual state: {initial_state}")
yref_segment_list = [] # Temporary list to store segments for current time step
# Update reference trajectory
for k in range(ocp.dims.N):
ref_idx = min(global_idx + k, len(z_trajectory) - 1)
yref_segment = np.array([z_trajectory[ref_idx],w_trajectory[ref_idx], 50])
#yref_segment = np.array([z_trajectory[ref_idx],w_trajectory[ref_idx], 0.0])
ocp_solver.set(k, 'yref', yref_segment)
yref_segment_list.append(yref_segment[0]) # Save the depth reference
# Debugging: Print global_idx, ref_idx, and the corresponding value in z_trajectory
#print(f"global_idx: {global_idx}, ref_idx at k={k}: {ref_idx}, z_trajectory[ref_idx]: {z_trajectory[ref_idx]}")
#print(f"At global_idx={global_idx}, ref_idx={ref_idx}, yref_segment={yref_segment}")
print(f"t={current_time:.2f}, z_ref={z_trajectory[global_idx]:.2f}, zdot_ref={w_trajectory[global_idx]:.2f}")
yref_segments_over_time.append(yref_segment_list) # Store all segments for the time step
# Terminal reference
terminal_idx = min(global_idx + ocp.dims.N, len(z_trajectory) - 1)
yref_terminal = np.array([z_trajectory[terminal_idx], 0.0])
ocp_solver.set(ocp.dims.N, 'yref', yref_terminal)
# Solve OCP
status = ocp_solver.solve()
print(f"Solver status at t={current_time:.2f}: {status}")
if status != 0:
print(f"Solver failed with status {status}")
tau_z_optimal = 0.0 # Default to a safe value if solver fails
else:
# Extract optimal control input
tau_z_optimal = ocp_solver.get(0, 'u')[0]
# **Extract Predicted States**:
predicted_states = []
for k in range(ocp.dims.N):
try:
state_k = ocp_solver.get(k, 'x') # Get predicted state at step k
predicted_states.append(state_k)
except Exception as e:
print(f"Error retrieving predicted state at step {k}: {e}")
predicted_states.append(np.nan)
# Convert to numpy array for easier handling
predicted_states = np.array(predicted_states)
# **Debugging**: Print or store predicted states
print(f"Predicted states at t={current_time:.2f}: {predicted_states}")
# **Add Data Collection Here**
# Store control input
control_inputs.append(tau_z_optimal)
# Compute and store trajectory error
error = z_trajectory[global_idx] - ROV_z
trajectory_errors.append(error)
# Retrieve and store cost function value
cost_value = ocp_solver.get_cost()
cost_function_values.append(cost_value)
# Store actual trajectory for analysis
actual_trajectory.append(ROV_z)
# Store actual zdot (w)
actual_w_trajectory.append(ROV_w)
# Send control command
send_time = time.time() # Moment où la commande est envoyée
response_message = f"S,0.00,0.00,{tau_z_optimal:.2f},0.000,0.000,0.000$\n"
send_socket.sendto(response_message.encode('utf-8'), (DEST_IP, DEST_PORT))
#print(f"[{send_time:.3f}s] Sent control command: {response_message.strip()}")
# Update global index
global_idx += 1
if global_idx >= len(z_trajectory):
print("Trajectory complete. Stopping control loop.")
break
except Exception as e:
print(f"Error: {e}")
break
data:image/s3,"s3://crabby-images/fea7e/fea7ed534c35609f298fe0d21888941b55b16607" alt="figss_page-0001|353x500"