Question about multiple solvers in parallel

Hello,

I am trying to run multiple solvers in parallel using Python's ProcessPoolExecutor. I understand that there is the batch solver, but that workflow is harder for me to implement, so I am trying to instantiate multiple solvers instead.

I have previously created 2 solvers and run them successfully by giving them unique JSON file names and unique model.name values, so the generated C files also get unique names. This build process is sequential, since I initialize one solver after the other. I am now trying to create multiple of these solver groups with the ProcessPoolExecutor.

The issue I am running into is that during the first build process, the build appears to be truncated, or the different workers appear to be fighting over shared resources.

For the installation process I have left ACADOS_WITH_OPENMP = OFF and also did not set ACADOS_NUM_THREADS. I have also tried with ACADOS_WITH_OPENMP = ON and ACADOS_NUM_THREADS=1 but it also did not work. The only way I was able to make it work is by letting it build the c_generated_code one by one before calling them asynchronously.

Any help is appreciated. Thank you

Nico

import argparse

def worker(worker_id, run_indices, model_name, output_dir, data_config, config):
    """Execute the assigned data-collection runs inside a worker process.

    Args:
        worker_id: Integer id of this worker; used in log messages and to
            give the worker its own output subdirectory.
        run_indices: List of run indices this worker is responsible for.
        model_name: Name of the model, forwarded to ``main``.
        output_dir: Root output directory; this worker writes into
            ``output_dir/worker_<worker_id>``.
        data_config: Parsed data-collection configuration (dict).
        config: Optional extra configuration forwarded to ``main``.

    Returns:
        Dict mapping each run's unique timestamp key to the logs returned
        by ``main`` for that run.
    """
    import os

    # Cap OpenMP threading *before* importing `main`: that import pulls in
    # numpy / solver libraries whose OpenMP runtimes typically read
    # OMP_NUM_THREADS at import/initialization time, so setting it after
    # the import (as before) has no effect.
    os.environ["OMP_NUM_THREADS"] = "1"

    from datetime import datetime
    from main import main

    print(f"[Worker {worker_id}] starting with {len(run_indices)} runs")

    worker_output_dir = os.path.join(output_dir, f"worker_{worker_id}")
    os.makedirs(worker_output_dir, exist_ok=True)

    all_logs = {}

    for run_index in run_indices:
        # Microsecond-resolution timestamp plus the run index guarantees a
        # unique dict key even for runs started within the same second.
        run_timestamp = f"{datetime.now().strftime('%Y-%m-%d_%H-%M-%S-%f')}_run{run_index}"

        logs = main(
            model_name,
            data_collection=True,
            output_dir=worker_output_dir,
            timestamp=run_timestamp,
            data_config=data_config,
            config=config,
            worker_id=worker_id,
        )

        all_logs[run_timestamp] = logs
        print(f"[Worker {worker_id}] Finished run {run_index}")

    print(f"[Worker {worker_id}] completed all assigned runs")
    return all_logs


def run_data_collector(model_name, data_config_path="data_collection/data_config.yaml", run_dir=None, config=None):
    """Fan out data-collection runs across a pool of worker processes.

    Reads the ``data_collector`` section of the YAML config, splits the
    requested number of runs round-robin across worker processes, collects
    each worker's logs as it completes, and saves everything to one .npz
    file in the output directory.

    Args:
        model_name: Name of the model, forwarded to each worker.
        data_config_path: Path to the YAML file with a ``data_collector``
            section (must contain ``runs``; ``workers`` is optional,
            default 2).
        run_dir: Optional pre-existing output directory; if ``None`` a
            timestamped directory under ``data/`` is created.
        config: Optional extra configuration forwarded to each worker.
    """
    import multiprocessing as mp
    from concurrent.futures import ProcessPoolExecutor, as_completed
    import yaml
    import time
    import os
    from utils import save_yaml
    from data_collection import save_npz

    # Load config
    with open(data_config_path, "r") as f:
        data_config = yaml.safe_load(f)["data_collector"]

    n_runs = data_config["runs"]
    # Never spawn more processes than CPUs or than there are runs to do;
    # clamping to n_runs avoids idle workers (and empty worker_* dirs)
    # when runs < workers. max(n_runs, 1) keeps at least one worker.
    n_workers = min(mp.cpu_count(), data_config.get("workers", 2), max(n_runs, 1))
    timestamp = time.strftime("%Y-%m-%d_%H-%M-%S")

    # Output dir
    if run_dir is None:
        output_dir = os.path.join("data", f"{timestamp}_{model_name}_data_collection")
    else:
        output_dir = run_dir
    os.makedirs(output_dir, exist_ok=True)

    # Save the exact config used alongside the results for reproducibility.
    save_yaml(data_config, os.path.join(output_dir, "data_config.yaml"))

    # Split n_runs roughly evenly across workers (round-robin).
    run_indices_per_worker = [[] for _ in range(n_workers)]
    for i in range(n_runs):
        run_indices_per_worker[i % n_workers].append(i)

    start_time = time.time()
    all_logs = {}

    print(f"Launching {n_workers} workers for {n_runs} runs")

    with ProcessPoolExecutor(max_workers=n_workers) as executor:
        futures = [
            executor.submit(
                worker,
                worker_id=w_id,
                run_indices=run_indices_per_worker[w_id],
                model_name=model_name,
                output_dir=output_dir,
                data_config=data_config,
                config=config,
            )
            for w_id in range(n_workers)
        ]

        # Merge logs as workers finish; a single failed worker does not
        # abort the whole collection, it is reported and skipped.
        for future in as_completed(futures):
            try:
                logs = future.result()
                all_logs.update(logs)
                print(f"Collected logs. Total runs so far: {len(all_logs)}/{n_runs}")
            except Exception as e:
                print("A worker failed:", e)

    elapsed = time.time() - start_time

    save_npz(f"{timestamp}_{model_name}_logs.npz", data=all_logs, output_dir=output_dir)

    print("\n=== Data collection finished ===")
    print(f"Total elapsed time: {elapsed:.2f} seconds")
    print(f"Saved to: {output_dir}")

if __name__ == "__main__":
    # CLI entry point: one positional argument selects the model to run.
    parser = argparse.ArgumentParser(description="Run a model by name")
    parser.add_argument("model", type=str, help="Name of the model to run")
    cli_args = parser.parse_args()

    print(f"\nStarting data collection for model: {cli_args.model}")
    run_data_collector(cli_args.model)
    print("\nDone.")