Hello,
I am trying to run multiple solvers in parallel using Python's ProcessPoolExecutor. I understand that there is the batch solver, but that workflow is harder for me to implement, so I am trying to instantiate multiple solvers instead.
I have previously created 2 solvers and run them successfully by giving each a unique JSON file name and a unique model.name, so the generated C files also get unique names. This build process is sequential because I initialize the solvers one after the other. I am now trying to create multiple of these solver groups with the ProcessPoolExecutor.
The issue I am running into is that during the first build process, the build appears to be truncated — as if the different workers are fighting over resources (or over the same generated files).
For the installation process I have left ACADOS_WITH_OPENMP = OFF and also did not set ACADOS_NUM_THREADS. I have also tried with ACADOS_WITH_OPENMP = ON and ACADOS_NUM_THREADS=1 but it also did not work. The only way I was able to make it work is by letting it build the c_generated_code one by one before calling them asynchronously.
Any help is appreciated. Thank you
Nico
import argparse
def worker(worker_id, run_indices, model_name, output_dir, data_config, config):
    """Execute a batch of data-collection runs inside one worker process.

    Each run gets a unique timestamp so output artifacts never collide, and
    all of this worker's output goes under its own sub-directory.

    Args:
        worker_id: Integer id of this worker; used for the output sub-dir
            and forwarded to ``main`` (presumably so the solver build files
            get unique names per worker — verify against ``main``).
        run_indices: Iterable of run indices assigned to this worker.
        model_name: Name of the model to run, forwarded to ``main``.
        output_dir: Base output directory shared by all workers.
        data_config: Parsed data-collection configuration dict.
        config: Additional configuration object forwarded to ``main``.

    Returns:
        Dict mapping each run's unique timestamp to the logs returned by
        ``main`` for that run.
    """
    import os

    # BUGFIX: pin OpenMP to one thread BEFORE importing `main` (and, through
    # it, numpy/acados). OpenMP runtimes read OMP_NUM_THREADS when the shared
    # library is loaded, so setting it after the import has no effect.
    os.environ["OMP_NUM_THREADS"] = "1"

    from datetime import datetime
    from main import main

    print(f"[Worker {worker_id}] starting with {len(run_indices)} runs")

    # Keep each worker's artifacts in its own sub-directory.
    worker_output_dir = os.path.join(output_dir, f"worker_{worker_id}")
    os.makedirs(worker_output_dir, exist_ok=True)

    all_logs = {}
    for run_index in run_indices:
        # Unique timestamp per run (microsecond precision + run index)
        # guarantees distinct file names even for back-to-back runs.
        run_timestamp = f"{datetime.now().strftime('%Y-%m-%d_%H-%M-%S-%f')}_run{run_index}"
        logs = main(
            model_name,
            data_collection=True,
            output_dir=worker_output_dir,
            timestamp=run_timestamp,
            data_config=data_config,
            config=config,
            worker_id=worker_id,
        )
        all_logs[run_timestamp] = logs
        print(f"[Worker {worker_id}] Finished run {run_index}")

    print(f"[Worker {worker_id}] completed all assigned runs")
    return all_logs
def run_data_collector(model_name, data_config_path="data_collection/data_config.yaml", run_dir=None, config=None):
    """Fan out `n_runs` data-collection runs across a pool of worker processes.

    Reads the run count and worker count from the YAML config, splits the run
    indices round-robin across workers, launches them with a
    ProcessPoolExecutor, and merges every worker's logs into a single .npz.

    Args:
        model_name: Name of the model; forwarded to each worker.
        data_config_path: Path to the YAML file holding the
            ``data_collector`` section (keys used here: ``runs``, ``workers``).
        run_dir: Optional pre-existing output directory; when None a
            timestamped directory under ``data/`` is created.
        config: Opaque configuration object forwarded to the workers.
    """
    import multiprocessing as mp
    from concurrent.futures import ProcessPoolExecutor, as_completed
    import yaml
    import time
    import os
    from utils import save_yaml
    from data_collection import save_npz

    # Load config
    with open(data_config_path, "r") as f:
        data_config = yaml.safe_load(f)["data_collector"]

    n_runs = data_config["runs"]
    # ROBUSTNESS: also cap at n_runs so we never spawn workers that would
    # receive an empty run list (each process is expensive to start because
    # of the solver build). Floor at 1 so a degenerate config still works.
    n_workers = max(1, min(mp.cpu_count(), data_config.get("workers", 2), n_runs))
    timestamp = time.strftime("%Y-%m-%d_%H-%M-%S")

    # Output dir
    if run_dir is None:
        output_dir = os.path.join("data", f"{timestamp}_{model_name}_data_collection")
    else:
        output_dir = run_dir
    os.makedirs(output_dir, exist_ok=True)

    # Save the config actually used, for reproducibility.
    save_yaml(data_config, os.path.join(output_dir, "data_config.yaml"))

    # Split n_runs roughly evenly across workers (round-robin).
    run_indices_per_worker = [[] for _ in range(n_workers)]
    for i in range(n_runs):
        run_indices_per_worker[i % n_workers].append(i)

    start_time = time.time()
    all_logs = {}
    print(f"Launching {n_workers} workers for {n_runs} runs")

    # NOTE(review): all workers build their solver code concurrently; make
    # sure `worker`/`main` give each worker unique generated-code paths, or
    # the builds will race on the same files.
    with ProcessPoolExecutor(max_workers=n_workers) as executor:
        futures = [
            executor.submit(
                worker,
                worker_id=w_id,
                run_indices=run_indices_per_worker[w_id],
                model_name=model_name,
                output_dir=output_dir,
                data_config=data_config,
                config=config,
            )
            for w_id in range(n_workers)
        ]
        # Merge logs as workers finish; a single failed worker does not
        # abort the whole collection.
        for future in as_completed(futures):
            try:
                logs = future.result()
                all_logs.update(logs)
                print(f"Collected logs. Total runs so far: {len(all_logs)}/{n_runs}")
            except Exception as e:
                print("A worker failed:", e)

    elapsed = time.time() - start_time
    save_npz(f"{timestamp}_{model_name}_logs.npz", data=all_logs, output_dir=output_dir)

    print("\n=== Data collection finished ===")
    print(f"Total elapsed time: {elapsed:.2f} seconds")
    print(f"Saved to: {output_dir}")
if __name__ == "__main__":
    # Command-line entry point: a single positional argument selects the model.
    cli = argparse.ArgumentParser(description="Run a model by name")
    cli.add_argument("model", type=str, help="Name of the model to run")
    cli_args = cli.parse_args()

    print(f"\nStarting data collection for model: {cli_args.model}")
    run_data_collector(cli_args.model)
    print("\nDone.")