I am implementing an NMPC (SQP_RTI) with acados for a rotary inverted pendulum (Furuta pendulum / ADIP). The goal is setpoint stabilization at the upright position:
x_ref = [0, π, 0, 0] (arm=0, pendulum=upright, velocities=0)
The system runs in real-time on hardware (TwinCAT/Beckhoff PLC, 20ms sample time, N=50, IRK with 4 stages).
Problem
I have the suspicion that the problem is related to my wrapping. My reference for the pendulum is at π, which is exactly where atan2 wraps from +π to -π. When the pendulum approaches the reference and crosses this boundary, I think this is where the problem lies — but I am not sure.
What supports this suspicion: when I hold the pendulum manually at +170° (approaching π from below), the solver does not solutions. When I hold it at -170° (same physical distance from π, but on the other side of the wrapping boundary), the solver fails as well.
When using the following code the set-point stabilization works but the swingup does not work
ocp.cost.cost_type = "LINEAR_LS" # linear cost funkltion J = = (x - x_ref)' * Q * (x - x_ref)+ (u - u_ref)' * R * (u - u_ref)
ocp.cost.cost_type_e = "LINEAR_LS"
ocp.cost.W = np.block([[Q, np.zeros((nx, nu))], # Gewichtung mit Q und R
[np.zeros((nu, nx)), R]])
ocp.cost.W_e = P # terminal penalty
ocp.cost.Vx = np.vstack([np.eye(nx), np.zeros((nu, nx))])
ocp.cost.Vu = np.vstack([np.zeros((nx, nu)), np.eye(nu)])
ocp.cost.Vx_e = np.eye(nx)
ocp.cost.yref = np.concatenate([x_ref, [u_ref]]) # Referenz [x_ref, u_ref]
ocp.cost.yref_e = x_ref
This is my current code. here the swing-up comes close to pi but then i get the error from above
My code # ---------------- ADS / Timing ----------------
AMS_NET_ID = "134.28.49.25.1.1" # deine AMS-NetID
PLC_IP = "134.28.49.25" # IP der Steuerung
AMS_PORT = 851 # Standard-Port für PLC1
# ---------------- Encoder ----------------
ENCODER_CPR_ARM = 8192 # Counts per Revolution
ENCODER_CPR_PENDLE = 4096 # Counts per Revolution
UINT16_MAX = 65536 # 2^16 für UINT16 wrap-around
position = "down"
# ============================================================
# Hilfsfunktionen: Wrap, Encoder delta, etc.
# ============================================================
def center_y_on_ref(ax, y_values, ref_value):
"""
Skaliert die y-Achse so, dass ref_value genau in der Mitte liegt.
"""
if len(y_values) == 0:
return
dev = np.max(np.abs(y_values - ref_value))
if dev == 0:
dev = 1.0 # damit man etwas sieht
dev *= 1.1 # kleiner Sicherheitsfaktor
ax.set_ylim(ref_value - dev, ref_value + dev)
def convert_to_radiant(counts, cpr=8192, offset=0):
angle_rad = ((counts - offset) / cpr) * 2.0 * np.pi
return angle_rad % (2.0 * np.pi) # <- wrap auf [0, 2π)
def handle_encoder_overflow(current, previous, max_value=65536):
diff = current - previous
# Überlauf nach oben (0 -> 65535)
if diff < -max_value / 2:
diff += max_value
# Überlauf nach unten (65535 -> 0)
elif diff > max_value / 2:
diff -= max_value
return diff
def counts_to_rad_delta(diff_counts, cpr=8192):
return (diff_counts / cpr) * 2.0 * np.pi # KEIN modulo!
def convert_to_minus_pi_and_pi(a):
return ca.atan2(ca.sin(a), ca.cos(a))
# --- Für NumPy / echte Zahlen (Runtime) ---
def wrap_angle_np(a):
return (a + np.pi) % (2.0 * np.pi) - np.pi
def bounds_symmetric_around_zero(deg):
rad = np.deg2rad(deg)
theta_min = -rad
theta_max = rad
return theta_min, theta_max
arm_boundary_min, arm_boundary_max = bounds_symmetric_around_zero(10)
def bounds_around_180(half_width_deg):
h = np.deg2rad(half_width_deg)
theta_min_pos = np.pi - h # z.B. 170°
theta_max_pos = np.pi # 180°
theta_min_neg = -np.pi # -180°
theta_max_neg = -np.pi + h # z.B. -170°
return theta_min_pos, theta_max_pos, theta_min_neg, theta_max_neg
pendlum_boundary_min_pos, pendlum_boundary_max_pos, pendlum_boundary_min_neg, pendlum_boundary_max_neg = bounds_around_180(10)
# ─────────────────────────────────────────────────────────────────────────────
# Model parameter
# ─────────────────────────────────────────────────────────────────────────────
m1 = 0.09
m2 = 0.08
mh = 0.10
L_1 = 0.1492
L_2 = 0.1778
g = 9.81
km = 0.0741
# ─────────────────────────────────────────────────────────────────────────────
# NMPC PARAMETER und contrains
# ─────────────────────────────────────────────────────────────────────────────
N = 50
dt = 0.02
T_sim = 4
n_steps = int(T_sim / dt)
U = 5
x_ref = np.array([0.0, np.pi, 0.0, 0.0])
x0 = np.array([0.0, 0.0, 0.0, 0.0])
u_ref = 0.0
Q = np.diag([20, 50, 1, 10])
R = np.diag([10])
B = np.diag([1e8])
# ------------------------------------logging---------------------------
t_history = []
x_history = np.zeros((4, n_steps + 1))
u_history = np.zeros((1, n_steps))
status_history = np.zeros(n_steps)
qp_status_history = np.zeros(n_steps)
u_opt_history = np.zeros((N, n_steps))
fallback_mask = np.zeros(n_steps, dtype=bool) # True = Fallback wurde genutzt
solver_time = []
twincat_time = []
error_history = np.zeros((4, n_steps))
# ------------------------------------logging---------------------------
# ─────────────────────────────────────────────────────────────────────────────
# Terminal Cost (LQR)
# ─────────────────────────────────────────────────────────────────────────────
A_down = np.array([
[0.0, 0.0, 1.0, 0.0],
[0.0, 0.0, 0.0, 1.0],
[-77.7, 42.9, 0.0, 0.0],
[57.5, 139.3, 0.0, 0.0]
])
B_down = np.array([[0.0], [0.0], [27.2], [18.4]])
C_mat = np.eye(4)
D_mat = np.zeros((4, 1))
Ad, Bd, _, _, _ = cont2discrete((A_down, B_down, C_mat, D_mat), dt, method="zoh")
_, P, _ = control.dlqr(Ad, Bd, Q, R)
# ─────────────────────────────────────────────────────────────────────────────
# Acados Modell (CasADi symbolisch, nur für den Solver)
# ─────────────────────────────────────────────────────────────────────────────
def build_acados_model():
model = AcadosModel()
model.name = "adip"
x = ca.MX.sym("x", 4)
u = ca.MX.sym("u", 1)
theta, alpha, theta_dot, alpha_dot = x[0], x[1], x[2], x[3]
Force = u[0]
denom = L_1**2 * (4*ca.cos(alpha)**2 * m2 - m1 - 4*m2 - 4*mh)
theta_ddot = (
-(1 / denom) * 2 * (
2*L_1**2 * ca.cos(alpha)*ca.sin(alpha) * theta_dot**2 * m2
- 2*L_1**2 * ca.cos(alpha)*ca.sin(alpha) * theta_dot * alpha_dot * m2
+ L_2*m2*L_1 * theta_dot**2 * ca.sin(alpha)
+ m2*L_2 * alpha_dot * L_1 * theta_dot * ca.sin(alpha)
+ m2*L_2 * alpha_dot**2 * L_1 * ca.sin(alpha)
+ 2*L_1 * ca.cos(alpha)**2 * ca.sin(theta) * g * m2
+ 2*L_1 * ca.cos(alpha)*ca.cos(theta)*ca.sin(alpha) * g * m2
- m1*g*L_1 * ca.sin(theta)
- 2*m2*g*L_1 * ca.sin(theta)
- 2*mh*g*L_1 * ca.sin(theta)
+ 2*Force
)
)
alpha_ddot = (
(1 / (L_2 * denom)) * 2 * (
4*L_1**2*L_2 * ca.cos(alpha)*ca.sin(alpha) * theta_dot**2 * m2
+ 2*L_1**2*L_2 * ca.cos(alpha)*ca.sin(alpha) * alpha_dot**2 * m2
+ L_1**3 * ca.sin(alpha) * theta_dot**2 * m1
+ 4*L_1**3 * ca.sin(alpha) * theta_dot**2 * m2
+ 4*L_1**3 * ca.sin(alpha) * theta_dot**2 * mh
- L_1**3 * ca.sin(alpha) * theta_dot * alpha_dot * m1
- 4*L_1**3 * ca.sin(alpha) * theta_dot * alpha_dot * m2
- 4*L_1**3 * ca.sin(alpha) * theta_dot * alpha_dot * mh
+ L_1*L_2**2 * ca.sin(alpha) * theta_dot**2 * m2
+ L_2**2*m2*L_1 * alpha_dot * theta_dot * ca.sin(alpha)
+ m2*L_2**2 * alpha_dot**2 * L_1 * ca.sin(alpha)
+ 2*L_1*L_2 * ca.cos(alpha)**2 * ca.sin(theta) * g * m2
+ 2*L_1*L_2 * ca.cos(alpha)*ca.cos(theta)*ca.sin(alpha) * g * m2
- L_1**2 * ca.cos(alpha)*ca.sin(theta) * g * m1
+ L_1**2 * ca.cos(theta)*ca.sin(alpha) * g * m1
+ 4*L_1**2 * ca.cos(theta)*ca.sin(alpha) * g * m2
+ 4*L_1**2 * ca.cos(theta)*ca.sin(alpha) * g * mh
- ca.sin(theta)*g*L_1*m1*L_2
- 2*ca.sin(theta)*g*L_1*m2*L_2
- 2*ca.sin(theta)*g*L_1*mh*L_2
+ 4*Force*L_1*ca.cos(alpha)
+ 2*Force*L_2
)
)
f_expl = ca.vertcat(theta_dot, alpha_dot, theta_ddot, alpha_ddot)
xdot = ca.MX.sym("xdot", 4)
u_prev = ca.MX.sym("u_prev", 1)
model.x = x
model.u = u
model.f_expl_expr = f_expl
model.f_impl_expr = xdot - f_expl
model.xdot = xdot
model.p = u_prev
return model
# ─────────────────────────────────────────────────────────────────────────────
# Acados OCP Setup
# ─────────────────────────────────────────────────────────────────────────────
def create_ocp_solver(x_ref, u_ref):
ocp = AcadosOcp() # create ocp object to formulate the OCP
model = build_acados_model() # set model
ocp.model = model # set model
nx = 4 # 4 states
nu = 1 # 1 input
x = ocp.model.x
u = ocp.model.u
u_prev = ocp.model.p
ocp.dims.N = N
ocp.parameter_values = np.array([0.0])
ocp.cost.cost_type = "EXTERNAL"
ocp.cost.cost_type_e = "EXTERNAL"
error_x = x - x_ref
error_u = u - u_ref
change_of_u = u - u_prev
error_x[0] = convert_to_minus_pi_and_pi(error_x[0])
error_x[1] = convert_to_minus_pi_and_pi(error_x[1])
model.cost_expr_ext_cost = error_x.T @ Q @ error_x + error_u.T @ R @ error_u + change_of_u.T @ B @ change_of_u
model.cost_expr_ext_cost_e = error_x.T @ P @ error_x
ocp.constraints.x0 = x0 # Anfangsbedingung
ocp.constraints.lbu = np.array([-3.5* km])
ocp.constraints.ubu = np.array([3.5* km])
ocp.constraints.idxbu = np.array([0])
ocp.solver_options.tf = N * dt # istdas richitg für mich ?
ocp.solver_options.integrator_type = "IRK" # ERK, IRK, GNSF, DISCRETE # Explicit Runge-Kutta # Implicit Runge-Kutta -> IRK benutzen
ocp.solver_options.num_stages = 4 # 1, 2, 3, 4 (Runge-Kutta Stufen)
ocp.solver_options.num_steps = 1 # 1 default, 2, 3, ... (mehr = genauer, langsamer) Number of steps in the integrator.
ocp.solver_options.qp_solver = "PARTIAL_CONDENSING_HPIPM" # PARTIAL_CONDENSING_HPIPM, FULL_CONDENSING_HPIPM, FULL_CONDENSING_QPOASES
ocp.solver_options.hpipm_mode = "BALANCE" # SPEED_ABS, SPEED, BALANCE, ROBUST
ocp.solver_options.qp_solver_iter_max = 400 # 50, 100, 200, 400, ...
ocp.solver_options.nlp_solver_type = "SQP_RTI" # SQP, SQP_RTI
ocp.solver_options.nlp_solver_max_iter = 1 # 1 (RTI), 50, 100, 200 (SQP)
ocp.solver_options.hessian_approx = "EXACT" # GAUSS_NEWTON, EXACT
ocp.solver_options.regularize_method = "CONVEXIFY" # NO_REGULARIZE, MIRROR, PROJECT, PROJECT_REDUC_HESS, CONVEXIFY, GERSHGORIN_LEVENBERG_MARQUARDT # nochmka nachguckebn
ocp.solver_options.levenberg_marquardt = 0.0 # adds an constant term to the hessain
ocp.solver_options.reg_epsilon = 1e-2 # 1e-3, 1e-4, 1e-6, ...
ocp.solver_options.qp_tol = 1e-2 # 1e-3, 1e-4, 1e-6, 1e-8
ocp.solver_options.tol = 1e-2 # 1e-3, 1e-4, 1e-6, 1e-8
ocp.solver_options.print_level = 0 # 0 (still), 1 (iter), 2 (verbose)
solver = AcadosOcpSolver(ocp, json_file="adip_ocp.json")
return solver
# ─────────────────────────────────────────────────────────────────────────────
# Hilfsfunktionen: Lösung auslesen / Warm-Start setzen
# ─────────────────────────────────────────────────────────────────────────────
def get_solution(solver):
"""Komplette Trajektorie aus dem Solver auslesen – analog zu sol.value(X/U)."""
x_opt = np.array([solver.get(i, "x") for i in range(N + 1)]).T # (4, N+1)
u_opt = np.array([solver.get(i, "u") for i in range(N)]).T # (1, N)
return u_opt, x_opt
def set_warm_start(solver, x_init, u_init):
"""Warm-Start setzen – analog zu opti.set_initial(X, x_init)."""
for i in range(N):
solver.set(i, "x", x_init[:, i])
solver.set(i, "u", u_init[:, i])
solver.set(N, "x", x_init[:, N])
# ----------------Initialisierung ----------------
plc = pyads.Connection(AMS_NET_ID, pyads.PORT_TC3PLC1)
plc.open()
enc = plc.read_by_name("GVL.Enc")
prev_EncCounter_Ch1, prev_EncCounter_Ch2 = enc[0], enc[1]
plc.write_by_name("GVL.Motor_value", 0) # Motor auf 0 setzen
# ─────────────────────────────────────────────────────────────────────────────
# MPC Closed-Loop Simulation
# ─────────────────────────────────────────────────────────────────────────────
print("Starte MPC-Simulation ...")
x_init_warm = None
u_init_warm = None
counter = 0
planned_u = np.zeros((1, N))
planned_x = np.zeros((4, N + 1))
for i in range(N + 1):
planned_x[:, i] = x0
solver = create_ocp_solver(x_ref, u_ref)
try:
while counter < n_steps:
counter = counter + 1
t0 = time.perf_counter()
time_twincat_1 = time.perf_counter()
# Encoder auslesen
enc = plc.read_by_name("GVL.Enc")
EncCounter_Ch1, EncCounter_Ch2 = enc[0], enc[1] # raw value
time_twincat_2 = time.perf_counter()
twincat_diff = time_twincat_2 - time_twincat_1
twincat_time.append(twincat_diff)
# Winkel berechnen
arm_angle = convert_to_radiant(EncCounter_Ch1, ENCODER_CPR_ARM) # encoder Daten zu Radiant
pendle_angle = convert_to_radiant(EncCounter_Ch2, ENCODER_CPR_PENDLE) # encoder Daten zu Radiant
if abs(wrap_angle_np(arm_angle) - np.pi) < np.deg2rad(10):
print("Pendelwinkel = π erreicht. Schleife wird beendet.")
break
arm_angle_pi = wrap_angle_np(arm_angle)
pendle_angle_pi = wrap_angle_np(pendle_angle)
# Anfangsbedingung prüfen (nur im ersten Schritt)
if counter == 1:
ARM_TOL_DEG = 5.0 # ±5° Toleranz für Arm
PENDEL_TOL_DEG = 5.0 # ±5° Toleranz für Pendel
arm_deg = np.rad2deg(arm_angle_pi)
pendel_deg = np.rad2deg(pendle_angle_pi)
if abs(arm_deg) > ARM_TOL_DEG or abs(pendel_deg) > PENDEL_TOL_DEG:
print("Pendelwinkel Fehlerhaft.")
break # finally-Block übernimmt plc.write + plc.close
# Differenz mit Überlauf-Behandlung berechnen
diff_EncCounter_Ch1 = handle_encoder_overflow(EncCounter_Ch1, prev_EncCounter_Ch1, UINT16_MAX)
diff_EncCounter_Ch2 = handle_encoder_overflow(EncCounter_Ch2, prev_EncCounter_Ch2, UINT16_MAX)
# Winkelgeschwindigkeit berechnen
omega1 = counts_to_rad_delta(diff_EncCounter_Ch1, ENCODER_CPR_ARM) / dt
omega2 = counts_to_rad_delta(diff_EncCounter_Ch2, ENCODER_CPR_PENDLE) / dt
# Aktueller Zustand
x_current = np.array([arm_angle_pi, pendle_angle_pi, omega1, omega2])
x_history[:, counter - 1] = x_current
error_x = x_current - x_ref
error_x[0] = convert_to_minus_pi_and_pi(error_x[0])
error_x[1] = convert_to_minus_pi_and_pi(error_x[1])
error_history[:, counter - 1] = error_x
# Anfangsbedingung setzen
solver.set(0, "lbx", x_current)
solver.set(0, "ubx", x_current)
for i in range(N):
solver.set(i, "p", np.array([planned_u[0, i]]))
solver_1 = time.perf_counter()
solver.solve()
solver_2 = time.perf_counter()
solver_time_current = solver_2 - solver_1
solver_time.append(solver_time_current)
status = solver.status
qp_status = solver.get_stats('qp_stat')
if status == 0:
# Erfolgreich: Neue Lösung auslesen und anwenden
u_opt, x_opt = get_solution(solver)
u_apply = np.clip(float(u_opt[0, 0]) / km, -U, U)
input = -1 * u_apply * 555.556
motor_value = int(input)
plc.write_by_name("GVL.Motor_value", motor_value)
# Trajektorie für den NÄCHSTEN Schritt vorbereiten
planned_u = np.hstack([u_opt[:, 1:], np.zeros((1, 1))])
planned_x = np.hstack([x_opt[:, 1:], x_opt[:, -1:]])
# Warm-Start und Logging nur bei Erfolg
u_opt_history[:, counter - 1] = u_opt[0, :] / km
else:
# Fallback: Solver hat keine Lösung gefunden
u_apply = np.clip(float(u_opt[0, 0]) / km, -U, U)
input = -1 * u_apply * 555.556
motor_value = int(input)
plc.write_by_name("GVL.Motor_value", motor_value)
print(f"Schritt {counter}: Solver Status = {status} -> Nutze Fallback! u_apply = {u_apply} " )
# Gespeicherte Trajektorie weiter nach links verschieben
planned_u = np.hstack([planned_u[:, 1:], np.zeros((1, 1))])
planned_x = np.hstack([planned_x[:, 1:], planned_x[:, -1:]])
fallback_mask[counter - 1] = True
u_opt_history[:, counter - 1] = planned_u[0, :]
# Status loggen
status_history[counter - 1] = status
qp_status_history[counter - 1] = qp_status[-1]
u_history[:, counter - 1] = u_apply
# Warm-Start für nächsten Schritt setzen
set_warm_start(solver, planned_x, planned_u)
# ---------- loggen ----------
print(
f"Schritt {counter} | Status={status} | "
f"input = u={float(u_apply):+7.2f}| motorvalue= {motor_value:6d} | "
f"θ1={np.rad2deg(arm_angle_pi):+7.2f} deg, "
f"θ2={np.rad2deg(pendle_angle_pi):+7.2f} deg, "
f"ω1={omega1:+7.2f} rad/s, "
f"ω2={omega2:+7.2f} rad/s | "
f"e_θ={np.rad2deg(error_x[0]):+7.2f}°, "
f"e_α={np.rad2deg(error_x[1]):+7.2f}°, "
)
# Update für nächste Iteration
prev_EncCounter_Ch1 = EncCounter_Ch1
prev_EncCounter_Ch2 = EncCounter_Ch2
loop_dt = time.perf_counter() - t0
print(loop_dt)
t_history.append(loop_dt)
time.sleep(max(0.0, dt - loop_dt))
except KeyboardInterrupt:
print("\nProgramm beendet.")
finally:
plc.write_by_name("GVL.Motor_value", 0) # Motor ausschalten
plc.close()
print("PLC-Verbindung geschlossen.")