Skip to content

Controller API

Orchestration helpers for remote runs, Ansible execution, and run catalogs.

Public surface

api

Public controller API surface.

Classes

AnsibleRunnerExecutor

AnsibleRunnerExecutor(private_data_dir=None, runner_fn=None, stream_output=False, output_callback=None, stop_token=None)

Bases: RemoteExecutor

Remote executor implemented with ansible-runner.

Initialize the executor.

Parameters:

Name Type Description Default
private_data_dir Optional[Path]

Directory used by ansible-runner.

None
runner_fn Optional[Callable[..., Any]]

Optional runner callable for testing. Defaults to ansible_runner.run when not provided.

None
stream_output bool

When True, stream Ansible stdout events to the local process (useful for visibility in long-running tasks).

False
output_callback Optional[Callable[[str, str], None]]

Optional callback to handle stdout stream. Signature: (text: str, end: str) -> None

None
Source code in lb_controller/adapters/ansible_runner.py
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
def __init__(
    self,
    private_data_dir: Optional[Path] = None,
    runner_fn: Optional[Callable[..., Any]] = None,
    stream_output: bool = False,
    output_callback: Optional[Callable[[str, str], None]] = None,
    stop_token: StopToken | None = None,
):
    """
    Initialize the executor.

    Args:
        private_data_dir: Directory used by ansible-runner. Defaults to
            ``.ansible_runner`` and is created if missing.
        runner_fn: Optional runner callable for testing. Defaults to
            ansible_runner.run when not provided.
        stream_output: When True, stream Ansible stdout events to the local
            process (useful for visibility in long-running tasks).
        output_callback: Optional callback to handle stdout stream.
                         Signature: (text: str, end: str) -> None
        stop_token: Optional token used to request cancellation of an
            in-flight playbook run.
    """
    self.private_data_dir = private_data_dir or Path(".ansible_runner")
    self.private_data_dir.mkdir(parents=True, exist_ok=True)
    # JSONL event log inside the runner dir; handed to the env builder below.
    self.event_log_path = self.private_data_dir / "lb_events.jsonl"
    self._runner_fn = runner_fn
    self.stream_output = stream_output
    self.stop_token = stop_token
    # Name of the playbook currently executing, if any.
    self._active_label: str | None = None
    # Force Ansible temp into a writable location inside the runner dir to avoid
    # host-level permission issues.
    self.local_tmp = self.private_data_dir / "tmp"
    self.local_tmp.mkdir(parents=True, exist_ok=True)
    if stream_output and output_callback is None:
        # Default to streaming to stdout when caller requests streaming but
        # doesn't provide a handler.
        def _default_cb(text: str, end: str = "") -> None:
            sys.stdout.write(text + end)
            sys.stdout.flush()

        self.output_callback = _default_cb
    else:
        self.output_callback = output_callback
    # Helper collaborators (inventory, env, extravars, command, process).
    # Note: _process_runner must be built after output_callback is resolved
    # above, since it captures the effective callback.
    self._inventory_writer = InventoryWriter(self.private_data_dir)
    self._env_builder = AnsibleEnvBuilder(
        private_data_dir=self.private_data_dir,
        local_tmp=self.local_tmp,
        event_log_path=self.event_log_path,
        ansible_root=ANSIBLE_ROOT,
    )
    self._extravars_writer = ExtravarsWriter(self.private_data_dir)
    self._command_builder = PlaybookCommandBuilder()
    self._process_runner = PlaybookProcessRunner(
        private_data_dir=self.private_data_dir,
        stream_output=self.stream_output,
        output_callback=self.output_callback,
        stop_token=self.stop_token,
    )
Attributes
event_log_path instance-attribute
event_log_path = private_data_dir / 'lb_events.jsonl'
is_running property
is_running

Return True when a playbook is in-flight.

local_tmp instance-attribute
local_tmp = private_data_dir / 'tmp'
output_callback instance-attribute
output_callback = _default_cb
private_data_dir instance-attribute
private_data_dir = private_data_dir or Path('.ansible_runner')
stop_token instance-attribute
stop_token = stop_token
stream_output instance-attribute
stream_output = stream_output
Functions
interrupt
interrupt()

Request interruption of the current playbook execution.

Source code in lb_controller/adapters/ansible_runner.py
177
178
179
180
def interrupt(self) -> None:
    """Ask the in-flight playbook process to stop and clear the active label."""
    runner = self._process_runner
    runner.interrupt()
    # Nothing is considered active once an interrupt has been requested.
    self._active_label = None
run_playbook
run_playbook(playbook_path, inventory, extravars=None, tags=None, limit_hosts=None, *, cancellable=True)

Execute a playbook using ansible-runner.

Source code in lb_controller/adapters/ansible_runner.py
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
def run_playbook(
    self,
    playbook_path: Path,
    inventory: InventorySpec,
    extravars: Optional[Dict[str, Any]] = None,
    tags: Optional[List[str]] = None,
    limit_hosts: Optional[List[str]] = None,
    *,
    cancellable: bool = True,
) -> ExecutionResult:
    """Execute a playbook using ansible-runner.

    Args:
        playbook_path: Path to the playbook file; must exist.
        inventory: Inventory specification describing the target hosts.
        extravars: Optional extra variables merged before execution.
        tags: Optional Ansible tags restricting which tasks run.
        limit_hosts: Optional subset of inventory hosts to target.
        cancellable: When True, a pending stop request may short-circuit
            execution before the playbook starts.

    Returns:
        ExecutionResult describing the playbook outcome.

    Raises:
        FileNotFoundError: If playbook_path does not exist.
    """
    # Drop any stale interrupt flag before consulting the stop token,
    # so a previous run's interrupt cannot leak into this one.
    self._process_runner.clear_interrupt()
    stop_result = self._maybe_stop(cancellable)
    if stop_result is not None:
        return stop_result
    if not playbook_path.exists():
        raise FileNotFoundError(f"Playbook not found: {playbook_path}")

    inventory_path = self._inventory_writer.prepare(inventory)
    # Prefer the injected runner callable (tests); otherwise import the
    # real ansible-runner entry point lazily.
    runner_fn = self._runner_fn or self._import_runner()

    # Ensure playbook path is absolute so runner can find it
    # regardless of private_data_dir location
    abs_playbook_path = playbook_path.resolve()

    label = abs_playbook_path.name
    logger.info(
        "Running playbook %s against %d host(s)", label, len(inventory.hosts)
    )
    self._active_label = label

    merged_extravars = self._merge_extravars(extravars, inventory_path)
    envvars = self._env_builder.build()

    try:
        result = self._execute_playbook(
            runner_fn=runner_fn,
            abs_playbook_path=abs_playbook_path,
            inventory_path=inventory_path,
            extravars=merged_extravars,
            tags=tags,
            envvars=envvars,
            limit_hosts=limit_hosts,
            cancellable=cancellable,
        )
    finally:
        # Always clear the active label, even when execution raises.
        self._active_label = None

    return self._finalize_result(playbook_path, result)

BenchmarkController

BenchmarkController(config, options=None)

Controller coordinating remote benchmark runs.

Source code in lb_controller/engine/controller.py
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
def __init__(
    self,
    config: BenchmarkConfig,
    options: ControllerOptions | None = None,
) -> None:
    """Wire up services, UI, and orchestration for a controller run.

    Args:
        config: Benchmark configuration; playbook defaults and plugin
            assets are applied to it in place.
        options: Optional dependency overrides; defaults to a fresh
            ControllerOptions().
    """
    self.config = config
    # NOTE(review): local imports — presumably deferred to avoid an import
    # cycle at module load time; confirm before hoisting to file level.
    from lb_controller.services.paths import apply_playbook_defaults
    from lb_plugins.api import create_registry, merge_plugin_assets

    # Mutate the config in place before anything else reads it.
    apply_playbook_defaults(self.config)
    merge_plugin_assets(self.config, create_registry())
    self._options = options or ControllerOptions()
    self.output_formatter = self._options.output_formatter
    self.stop_token = self._options.stop_token
    self._stop_timeout_s = self._options.stop_timeout_s
    self.lifecycle = RunLifecycle()
    self.state_machine = self._options.state_machine or ControllerStateMachine()
    self.executor = self._options.build_executor()
    self._use_progress_stream = True
    self._journal_refresh = self._options.journal_refresh

    # Bundle of shared dependencies handed to collaborators below.
    self.services = ControllerServices(
        config=self.config,
        executor=self.executor,
        output_formatter=self.output_formatter,
        stop_token=self.stop_token,
        lifecycle=self.lifecycle,
        journal_refresh=self._journal_refresh,
        use_progress_stream=self._use_progress_stream,
    )

    self._ui = UINotifier(
        output_formatter=self.output_formatter,
        journal_refresh=self._journal_refresh,
    )
    self.workload_runner = WorkloadRunner(
        config=self.config,
        ui_notifier=self._ui,
    )
    self.teardown_service = TeardownService()
    self._current_session: Optional[RunSession] = None
    self._session_builder = RunSessionBuilder(
        config=self.config,
        state_machine=self.state_machine,
        stop_timeout_s=self._stop_timeout_s,
        journal_refresh=self._ui.refresh_journal,
        collector_packages=self._collector_apt_packages,
    )
    self._orchestrator = RunOrchestrator(
        services=self.services,
        workload_runner=self.workload_runner,
        teardown_service=self.teardown_service,
        ui_notifier=self._ui,
    )
Attributes
config instance-attribute
config = config
coordinator property
coordinator
executor instance-attribute
executor = build_executor()
lifecycle instance-attribute
lifecycle = RunLifecycle()
output_formatter instance-attribute
output_formatter = output_formatter
services instance-attribute
services = ControllerServices(config=config, executor=executor, output_formatter=output_formatter, stop_token=stop_token, lifecycle=lifecycle, journal_refresh=_journal_refresh, use_progress_stream=_use_progress_stream)
state_machine instance-attribute
state_machine = state_machine or ControllerStateMachine()
stop_token instance-attribute
stop_token = stop_token
teardown_service instance-attribute
teardown_service = TeardownService()
workload_runner instance-attribute
workload_runner = WorkloadRunner(config=config, ui_notifier=_ui)
Functions
on_event
on_event(event)

Process an event for stop coordination.

Source code in lb_controller/engine/controller.py
87
88
89
90
def on_event(self, event: RunEvent) -> None:
    """Forward an event to the stop coordinator, if one is attached."""
    coordinator = self.coordinator
    if not coordinator:
        # No coordinator: nothing to do for stop coordination.
        return
    coordinator.process_event(event)
run
run(test_types, run_id=None, journal=None, resume=False, journal_path=None)

Execute the configured benchmarks on remote hosts.

Parameters:

Name Type Description Default
test_types List[str]

List of benchmark identifiers to execute.

required
run_id Optional[str]

Optional run identifier. If not provided, a timestamp-based id is generated.

None
journal Optional[RunJournal]

Optional pre-loaded journal used for resume flows.

None
resume bool

When True, reuse the provided journal instead of creating a new one.

False
journal_path Optional[Path]

Optional override for where the journal is persisted.

None
Source code in lb_controller/engine/controller.py
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
def run(
    self,
    test_types: List[str],
    run_id: Optional[str] = None,
    journal: Optional[RunJournal] = None,
    resume: bool = False,
    journal_path: Optional[Path] = None,
) -> RunExecutionSummary:
    """Execute the configured benchmarks on remote hosts.

    Args:
        test_types: Benchmark identifiers to execute.
        run_id: Optional run identifier; a timestamp-based id is generated
            when omitted.
        journal: Optional pre-loaded journal used for resume flows.
        resume: When True, reuse the provided journal instead of creating
            a new one.
        journal_path: Optional override for where the journal is persisted.
    """
    # Validate preconditions before building any session state.
    if not self.config.remote_hosts:
        raise ValueError("At least one remote host must be configured.")
    if resume and journal is None:
        raise ValueError("Resume requested without a journal instance.")

    new_session = self._session_builder.build(
        test_types=test_types,
        run_id=run_id,
        journal=journal,
        journal_path=journal_path,
    )
    self._current_session = new_session
    return self._orchestrator.run(new_session, resume_requested=resume)

ConnectivityReport dataclass

ConnectivityReport(results=list(), timeout_seconds=10)

Aggregated report of connectivity checks for multiple hosts.

Attributes
all_reachable property
all_reachable

Return True if all hosts are reachable.

reachable_count property
reachable_count

Return count of reachable hosts.

results class-attribute instance-attribute
results = field(default_factory=list)
timeout_seconds class-attribute instance-attribute
timeout_seconds = 10
total_count property
total_count

Return total number of hosts checked.

unreachable_hosts property
unreachable_hosts

Return list of unreachable host names.

ConnectivityService

ConnectivityService(timeout_seconds=None)

Service for checking SSH connectivity to remote hosts.

Initialize the connectivity service.

Parameters:

Name Type Description Default
timeout_seconds int | None

Default timeout for connectivity checks. Defaults to 10 seconds.

None
Source code in lb_controller/services/connectivity_service.py
63
64
65
66
67
68
69
70
def __init__(self, timeout_seconds: int | None = None) -> None:
    """Initialize the connectivity service.

    Args:
        timeout_seconds: Default timeout for connectivity checks; falls
            back to DEFAULT_TIMEOUT_SECONDS (10 seconds) when not given.
    """
    self._timeout_seconds = (
        timeout_seconds if timeout_seconds else self.DEFAULT_TIMEOUT_SECONDS
    )
Attributes
DEFAULT_TIMEOUT_SECONDS class-attribute instance-attribute
DEFAULT_TIMEOUT_SECONDS = 10
Functions
check_hosts
check_hosts(hosts, timeout_seconds=None)

Check connectivity to all specified hosts.

Parameters:

Name Type Description Default
hosts list[RemoteHostConfig]

List of remote host configurations to check.

required
timeout_seconds int | None

Optional timeout override for this check.

None

Returns:

Type Description
ConnectivityReport

ConnectivityReport with results for each host.

Source code in lb_controller/services/connectivity_service.py
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
def check_hosts(
    self,
    hosts: list[RemoteHostConfig],
    timeout_seconds: int | None = None,
) -> ConnectivityReport:
    """Check connectivity to all specified hosts.

    Args:
        hosts: Remote host configurations to probe.
        timeout_seconds: Optional per-call timeout override; the service
            default is used when omitted.

    Returns:
        ConnectivityReport aggregating one result per host.
    """
    effective_timeout = timeout_seconds or self._timeout_seconds
    checks = [
        self._check_single_host(host, effective_timeout) for host in hosts
    ]
    return ConnectivityReport(results=checks, timeout_seconds=effective_timeout)

ControllerOptions dataclass

ControllerOptions(executor=None, output_callback=None, output_formatter=None, journal_refresh=None, stop_token=None, stop_timeout_s=30.0, state_machine=None)

Optional dependencies and hooks for BenchmarkController.

Attributes
executor class-attribute instance-attribute
executor = None
journal_refresh class-attribute instance-attribute
journal_refresh = None
output_callback class-attribute instance-attribute
output_callback = None
output_formatter class-attribute instance-attribute
output_formatter = None
state_machine class-attribute instance-attribute
state_machine = None
stop_timeout_s class-attribute instance-attribute
stop_timeout_s = 30.0
stop_token class-attribute instance-attribute
stop_token = None
Functions
build_executor
build_executor()
Source code in lb_controller/models/controller_options.py
26
27
28
29
30
31
32
33
34
def build_executor(self) -> RemoteExecutor:
    """Return the injected executor, or construct a default Ansible one."""
    if self.executor:
        return self.executor
    # Stream output only when a callback is available to receive it.
    return AnsibleRunnerExecutor(
        output_callback=self.output_callback,
        stream_output=self.output_callback is not None,
        stop_token=self.stop_token,
    )

ControllerRunner

ControllerRunner(run_callable, stop_token=None, on_state_change=None, state_machine=None)

Run a BenchmarkController in a dedicated thread with state tracking.

Parameters:

Name Type Description Default
run_callable Callable[[], Any]

A callable that executes the controller and returns a summary.

required
stop_token StopToken | None

Optional stop token to request graceful termination.

None
on_state_change Optional[StateCallback]

Optional callback invoked on every state transition.

None
Source code in lb_controller/adapters/remote_runner.py
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
def __init__(
    self,
    run_callable: Callable[[], Any],
    stop_token: StopToken | None = None,
    on_state_change: Optional[StateCallback] = None,
    state_machine: ControllerStateMachine | None = None,
) -> None:
    """
    Args:
        run_callable: A callable that executes the controller and returns a summary.
        stop_token: Optional stop token to request graceful termination.
        on_state_change: Optional callback invoked on every state transition.
        state_machine: Optional externally supplied state machine; a fresh
            one is created when omitted.
    """
    self._run_callable = run_callable
    self._stop_token = stop_token
    # Completion bookkeeping consumed by wait(): thread, result, error, done.
    self._thread: threading.Thread | None = None
    self._result: Any = None
    self._exception: BaseException | None = None
    self._done = threading.Event()
    machine = state_machine or ControllerStateMachine()
    if on_state_change:
        machine.register_callback(on_state_change)
    self._machine = machine
Attributes
exception property
exception
result property
result
state property
state
Functions
arm_stop
arm_stop(reason=None)

Signal the controller to stop gracefully.

Source code in lb_controller/adapters/remote_runner.py
74
75
76
77
78
79
80
81
82
83
84
85
def arm_stop(self, reason: str | None = None) -> None:
    """Signal the controller to stop gracefully."""
    try:
        self._machine.transition(ControllerState.STOP_ARMED, reason=reason)
    except Exception:
        # Arming is best-effort: an invalid transition must not propagate.
        pass
    token = self._stop_token
    if token:
        try:
            token.request_stop()
        except Exception:
            # Stop requests are advisory; swallow failures from the token.
            pass
start
start()

Start the controller thread.

Source code in lb_controller/adapters/remote_runner.py
54
55
56
57
58
59
60
61
62
63
def start(self) -> None:
    """Launch the worker thread; no-op when one is already running."""
    current = self._thread
    if current and current.is_alive():
        return
    worker = threading.Thread(
        target=self._run,
        name="lb-controller-runner",
        daemon=True,
    )
    self._thread = worker
    worker.start()
wait
wait(timeout=None)

Block until completion or timeout; re-raise exceptions from the worker.

Source code in lb_controller/adapters/remote_runner.py
65
66
67
68
69
70
71
72
def wait(self, timeout: float | None = None) -> Any:
    """Block until completion or timeout; re-raise exceptions from the worker."""
    finished = self._done.wait(timeout=timeout)
    if not finished:
        return None
    if self._exception:
        raise self._exception
    return self._result

ControllerState

Bases: str, Enum

Phase-aware controller lifecycle states.

Attributes
ABORTED class-attribute instance-attribute
ABORTED = 'aborted'
FAILED class-attribute instance-attribute
FAILED = 'failed'
FINISHED class-attribute instance-attribute
FINISHED = 'finished'
INIT class-attribute instance-attribute
INIT = 'init'
RUNNING_GLOBAL_SETUP class-attribute instance-attribute
RUNNING_GLOBAL_SETUP = 'running_global_setup'
RUNNING_GLOBAL_TEARDOWN class-attribute instance-attribute
RUNNING_GLOBAL_TEARDOWN = 'running_global_teardown'
RUNNING_WORKLOADS class-attribute instance-attribute
RUNNING_WORKLOADS = 'running_workloads'
STOPPING_INTERRUPT_SETUP class-attribute instance-attribute
STOPPING_INTERRUPT_SETUP = 'stopping_interrupt_setup'
STOPPING_INTERRUPT_TEARDOWN class-attribute instance-attribute
STOPPING_INTERRUPT_TEARDOWN = 'stopping_interrupt_teardown'
STOPPING_TEARDOWN class-attribute instance-attribute
STOPPING_TEARDOWN = 'stopping_teardown'
STOPPING_WAIT_RUNNERS class-attribute instance-attribute
STOPPING_WAIT_RUNNERS = 'stopping_wait_runners'
STOP_ARMED class-attribute instance-attribute
STOP_ARMED = 'stop_armed'
STOP_FAILED class-attribute instance-attribute
STOP_FAILED = 'stop_failed'

ControllerStateMachine

ControllerStateMachine()

Thread-safe controller state tracker.

Source code in lb_controller/models/state.py
104
105
106
107
108
def __init__(self) -> None:
    """Start in INIT with no reason and no registered callbacks."""
    self._state = ControllerState.INIT
    # Reentrant lock: transition() holds it while invoking callbacks,
    # which may call back into the reader methods of this machine.
    self._lock = threading.RLock()
    # Human-readable reason attached to the most recent transition.
    self._reason: Optional[str] = None
    self._callbacks: list[Callable[[ControllerState, Optional[str]], None]] = []
Attributes
reason property
reason
state property
state
Functions
allows_cleanup
allows_cleanup()
Source code in lb_controller/models/state.py
124
125
126
127
128
129
def allows_cleanup(self) -> bool:
    """Return True when the run ended in a state that permits cleanup."""
    cleanup_states = {
        ControllerState.FINISHED,
        ControllerState.ABORTED,
    }
    with self._lock:
        return self._state in cleanup_states
is_terminal
is_terminal()
Source code in lb_controller/models/state.py
120
121
122
def is_terminal(self) -> bool:
    """Return True once the machine has reached a terminal state."""
    with self._lock:
        current = self._state
    return current in _TERMINAL_STATES
register_callback
register_callback(callback)

Register a callback invoked on every transition.

Source code in lb_controller/models/state.py
131
132
133
134
135
def register_callback(
    self, callback: Callable[[ControllerState, Optional[str]], None]
) -> None:
    """Register a callback invoked on every transition.

    Args:
        callback: Invoked with the new state and the optional reason
            after each successful transition.
    """
    # The class is documented as thread-safe and transition() copies
    # self._callbacks while holding self._lock; append under the same
    # lock so registration cannot race with a concurrent transition.
    with self._lock:
        self._callbacks.append(callback)
snapshot
snapshot()
Source code in lb_controller/models/state.py
158
159
160
def snapshot(self) -> tuple[ControllerState, Optional[str]]:
    """Return the current (state, reason) pair, read atomically."""
    with self._lock:
        return self._state, self._reason
transition
transition(new_state, reason=None)

Attempt a state transition; raise ValueError if invalid.

Source code in lb_controller/models/state.py
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
def transition(
    self, new_state: ControllerState, reason: Optional[str] = None
) -> ControllerState:
    """Attempt a state transition; raise ValueError if invalid.

    Args:
        new_state: Target state.
        reason: Optional human-readable reason recorded with the state.

    Returns:
        The new current state.

    Raises:
        ValueError: If the transition is not allowed from the current
            state and the target is not FAILED/ABORTED/STOP_FAILED.
    """
    with self._lock:
        allowed = _ALLOWED_TRANSITIONS.get(self._state, set())
        # Failure/abort states are reachable from anywhere.
        if new_state not in allowed and new_state not in {
            ControllerState.FAILED,
            ControllerState.ABORTED,
            ControllerState.STOP_FAILED,
        }:
            raise ValueError(f"Invalid transition {self._state} -> {new_state}")
        self._state = new_state
        self._reason = reason
        # Iterate a copy so a callback mutating the registration list
        # cannot break this loop; a misbehaving observer is skipped.
        for cb in list(self._callbacks):
            try:
                cb(self._state, self._reason)
            except Exception:
                continue
        return self._state

DoubleCtrlCStateMachine dataclass

DoubleCtrlCStateMachine(state=RunInterruptState.RUNNING)

Double-press Ctrl+C confirmation state machine.

Policy: the second Ctrl+C confirms stop at any time after the first press, until the run finishes (no timeout window).

Attributes
state class-attribute instance-attribute
state = RUNNING
Functions
mark_finished
mark_finished()

Transition to FINISHED and disable further confirmation handling.

Source code in lb_controller/engine/interrupts.py
60
61
62
def mark_finished(self) -> None:
    """Transition to FINISHED and disable further confirmation handling."""
    # FINISHED is terminal here: on_sigint delegates once it is set.
    self.state = RunInterruptState.FINISHED
on_sigint
on_sigint(*, run_active)

Process a SIGINT and return what the caller should do next.

Source code in lb_controller/engine/interrupts.py
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
def on_sigint(self, *, run_active: bool) -> SigintDecision:
    """Decide how a SIGINT should be handled given the current state."""
    # With no active run, or after the run finished, defer to default handling.
    if not run_active or self.state == RunInterruptState.FINISHED:
        return SigintDecision.DELEGATE

    current = self.state
    if current == RunInterruptState.RUNNING:
        # First press: arm the stop and warn the user.
        self.state = RunInterruptState.STOP_ARMED
        return SigintDecision.WARN_ARM
    if current == RunInterruptState.STOP_ARMED:
        # Second press: confirm and request a graceful stop.
        self.state = RunInterruptState.STOPPING
        return SigintDecision.REQUEST_STOP

    # In STOPPING we swallow further Ctrl+C while the run is active to avoid
    # tearing down the process; the caller can still expose an explicit exit.
    return SigintDecision.IGNORE
reset_arm
reset_arm()

Return to RUNNING from STOP_ARMED after a timeout window.

Source code in lb_controller/engine/interrupts.py
64
65
66
67
def reset_arm(self) -> None:
    """Return to RUNNING from STOP_ARMED after a timeout window."""
    # NOTE(review): the class docstring says confirmation has no timeout
    # window; this reset path appears to exist for callers that do
    # implement one — confirm which policy is current.
    if self.state == RunInterruptState.STOP_ARMED:
        self.state = RunInterruptState.RUNNING

ExecutionResult dataclass

ExecutionResult(rc, status, stats=dict())

Result of a single Ansible playbook execution.

Attributes
rc instance-attribute
rc
stats class-attribute instance-attribute
stats = field(default_factory=dict)
status instance-attribute
status
success property
success

Return True when the playbook completed successfully.

HostConnectivityResult dataclass

HostConnectivityResult(name, address, reachable, latency_ms=None, error_message=None)

Result of a connectivity check for a single host.

Attributes
address instance-attribute
address
error_message class-attribute instance-attribute
error_message = None
latency_ms class-attribute instance-attribute
latency_ms = None
name instance-attribute
name
reachable instance-attribute
reachable

InventorySpec dataclass

InventorySpec(hosts, inventory_path=None)

Inventory specification for Ansible execution.

Attributes
hosts instance-attribute
hosts
inventory_path class-attribute instance-attribute
inventory_path = None

LogSink

LogSink(journal, journal_path, log_file=None)

Persist events and mirror them to the run journal and optional log file.

Source code in lb_controller/services/journal.py
181
182
183
184
185
186
187
188
189
190
def __init__(
    self, journal: RunJournal, journal_path: Path, log_file: Path | None = None
):
    """Bind the sink to a journal and optionally open a mirror log file."""
    self.journal = journal
    self.journal_path = journal_path
    self.log_file = log_file
    if log_file:
        # Ensure the parent directory exists before appending events.
        log_file.parent.mkdir(parents=True, exist_ok=True)
        self._log_handle = log_file.open("a", encoding="utf-8")
    else:
        self._log_handle = None
Attributes
journal instance-attribute
journal = journal
journal_path instance-attribute
journal_path = journal_path
log_file instance-attribute
log_file = log_file
Functions
close
close()
Source code in lb_controller/services/journal.py
201
202
203
204
205
206
207
def close(self) -> None:
    """Release the log file handle, if any; safe to call repeatedly."""
    handle = self._log_handle
    if not handle:
        return
    try:
        handle.close()
    except Exception:
        # Best-effort: a failing close must not surface to callers.
        pass
    self._log_handle = None
emit
emit(event)

Handle a single event.

Source code in lb_controller/services/journal.py
192
193
194
195
def emit(self, event: RunEvent) -> None:
    """Handle a single event.

    Mirrors the event into the run journal, then appends it to the
    optional log file.
    """
    self._update_journal(event)
    self._write_log(event)
emit_many
emit_many(events)
Source code in lb_controller/services/journal.py
197
198
199
def emit_many(self, events: Iterable[RunEvent]) -> None:
    """Emit each event in *events*, preserving order."""
    for event in events:
        self.emit(event)

RemoteExecutor

Bases: Protocol

Protocol for remote execution engines.

Functions
run_playbook
run_playbook(playbook_path, inventory, extravars=None, tags=None, limit_hosts=None, *, cancellable=True)

Execute a playbook and return the result.

Source code in lb_controller/models/types.py
54
55
56
57
58
59
60
61
62
63
64
65
def run_playbook(
    self,
    playbook_path: Path,
    inventory: InventorySpec,
    extravars: Optional[Dict[str, Any]] = None,
    tags: Optional[List[str]] = None,
    limit_hosts: Optional[List[str]] = None,
    *,
    cancellable: bool = True,
) -> ExecutionResult:
    """Execute a playbook and return the result.

    Protocol method: implementations must honor this signature.

    Args:
        playbook_path: Path to the playbook to execute.
        inventory: Inventory specification describing the target hosts.
        extravars: Optional extra variables for the playbook.
        tags: Optional tags restricting which tasks run.
        limit_hosts: Optional subset of hosts to limit execution to.
        cancellable: When True, implementations may honor stop requests
            before or during execution.

    Returns:
        ExecutionResult describing the playbook outcome.
    """
    raise NotImplementedError

RunCatalogService

RunCatalogService(output_dir, report_dir=None, data_export_dir=None)

Discover run directories and their basic metadata.

Source code in lb_controller/services/run_catalog_service.py
20
21
22
23
24
25
26
27
28
def __init__(
    self,
    output_dir: Path,
    report_dir: Optional[Path] = None,
    data_export_dir: Optional[Path] = None,
) -> None:
    """Resolve and store the catalog root directories."""
    self.output_dir = output_dir.resolve()
    self.report_dir = None if not report_dir else report_dir.resolve()
    self.data_export_dir = (
        None if not data_export_dir else data_export_dir.resolve()
    )
Attributes
data_export_dir instance-attribute
data_export_dir = resolve() if data_export_dir else None
output_dir instance-attribute
output_dir = resolve()
report_dir instance-attribute
report_dir = resolve() if report_dir else None
Functions
get_run
get_run(run_id)

Return RunInfo for the given run_id if present.

Source code in lb_controller/services/run_catalog_service.py
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
def get_run(self, run_id: str) -> Optional[RunInfo]:
    """Return RunInfo for the given run_id if present.

    Args:
        run_id: Identifier of the run directory to look up.

    Returns:
        RunInfo populated from the run journal (with filesystem
        fallbacks), or None when no output root exists for run_id.
    """
    output_root = self._resolve_output_root(run_id)
    if output_root is None:
        return None

    report_root = self._resolve_optional_root(self.report_dir, run_id)
    export_root = self._resolve_optional_root(self.data_export_dir, run_id)

    journal_path = output_root / "run_journal.json"
    journal_data = self._load_journal(journal_path)
    created_at = self._extract_created_at(journal_data)
    hosts, workloads = self._extract_hosts_workloads(journal_data)

    # When the journal is missing or incomplete, fall back to inspecting
    # the output directory for hosts, then workloads (which need hosts).
    if not hosts:
        hosts = self._fallback_hosts(output_root)
    if not workloads and hosts:
        workloads = self._fallback_workloads(output_root, hosts)

    return RunInfo(
        run_id=run_id,
        output_root=output_root,
        report_root=self._existing_or_none(report_root),
        data_export_root=self._existing_or_none(export_root),
        hosts=sorted(hosts),
        workloads=sorted(workloads),
        created_at=created_at,
        journal_path=journal_path if journal_path.exists() else None,
    )
list_runs
list_runs()

Return all runs found under output_dir, newest first when possible.

Source code in lb_controller/services/run_catalog_service.py
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
def list_runs(self) -> List[RunInfo]:
    """Return all runs found under output_dir, newest first when possible."""
    if not self.output_dir.exists():
        return []

    discovered = [
        info
        for candidate in self._iter_run_ids()
        if (info := self.get_run(candidate))
    ]
    # Runs without a creation timestamp sort last (epoch 0).
    discovered.sort(
        key=lambda run: run.created_at.timestamp() if run.created_at else 0.0,
        reverse=True,
    )
    return discovered

RunExecutionSummary dataclass

RunExecutionSummary(run_id, per_host_output, phases, success, output_root, report_root, data_export_root, controller_state=None, cleanup_allowed=False)

Summary of a complete controller run.

Attributes
cleanup_allowed class-attribute instance-attribute
cleanup_allowed = False
controller_state class-attribute instance-attribute
controller_state = None
data_export_root instance-attribute
data_export_root
output_root instance-attribute
output_root
per_host_output instance-attribute
per_host_output
phases instance-attribute
phases
report_root instance-attribute
report_root
run_id instance-attribute
run_id
success instance-attribute
success

RunInterruptState

Bases: str, Enum

Explicit run/interrupt lifecycle states.

Attributes
FINISHED class-attribute instance-attribute
FINISHED = 'finished'
RUNNING class-attribute instance-attribute
RUNNING = 'running'
STOPPING class-attribute instance-attribute
STOPPING = 'stopping'
STOP_ARMED class-attribute instance-attribute
STOP_ARMED = 'stop_armed'

RunJournal dataclass

RunJournal(run_id, tasks=dict(), metadata=dict())

Contains the entire execution plan and state.

Attributes
metadata class-attribute instance-attribute
metadata = field(default_factory=dict)
run_id instance-attribute
run_id
tasks class-attribute instance-attribute
tasks = field(default_factory=dict)
Functions
add_task
add_task(task)
Source code in lb_controller/services/journal.py
64
65
def add_task(self, task: TaskState) -> None:
    """Insert or replace *task* in the journal, keyed by task.key."""
    self.tasks[task.key] = task
get_task
get_task(host, workload, rep)

Return a specific task or None when absent.

Source code in lb_controller/services/journal.py
73
74
75
76
def get_task(self, host: str, workload: str, rep: int) -> Optional[TaskState]:
    """Return a specific task or None when absent."""
    key = f"{host}::{workload}::{rep}"
    return self.tasks.get(key)
get_tasks_by_host
get_tasks_by_host(host)
Source code in lb_controller/services/journal.py
67
68
69
70
71
def get_tasks_by_host(self, host: str) -> List[TaskState]:
    return sorted(
        [t for t in self.tasks.values() if t.host == host],
        key=lambda x: x.repetition,
    )
initialize classmethod
initialize(run_id, config, test_types)

Factory to create a new journal based on configuration.

Source code in lb_controller/services/journal.py
54
55
56
57
58
59
60
61
62
@classmethod
def initialize(
    cls, run_id: str, config: Any, test_types: List[str]
) -> "RunJournal":
    """Factory to create a new journal based on configuration."""
    journal = cls(run_id=run_id)
    journal.metadata = _build_metadata(config)
    _populate_tasks(journal, config, test_types)
    return journal
load classmethod
load(path, config=None)

Load journal from disk, optionally validating against a config.

Source code in lb_controller/services/journal.py
150
151
152
153
154
155
156
157
158
159
160
161
162
163
@classmethod
def load(cls, path: Path, config: Any | None = None) -> "RunJournal":
    """Load journal from disk, optionally validating against a config."""
    with open(path, "r") as f:
        data = json.load(f)

    metadata = data.get("metadata", {}) or {}
    _validate_config(metadata, config)
    tasks_data = data.pop("tasks", [])
    journal = cls(**data)
    journal.tasks = _load_tasks(tasks_data)
    if not getattr(journal, "metadata", None):
        journal.metadata = metadata
    return journal
rehydrate_config
rehydrate_config()

Return a BenchmarkConfig reconstructed from the stored config_dump.

Source code in lb_controller/services/journal.py
165
166
167
168
169
170
171
172
173
174
175
def rehydrate_config(self) -> BenchmarkConfig | None:
    """
    Return a BenchmarkConfig reconstructed from the stored config_dump.
    """
    cfg_dump = (self.metadata or {}).get("config_dump")
    if not cfg_dump:
        return None
    try:
        return BenchmarkConfig.model_validate(cfg_dump)
    except Exception:
        return None
save
save(path)

Persist journal to disk.

Source code in lb_controller/services/journal.py
136
137
138
139
140
141
142
143
144
145
146
147
148
def save(self, path: Path) -> None:
    """Persist journal to disk."""
    data = asdict(self)
    # Ensure path exists
    if isinstance(path, str):
        path = Path(path)
    path.parent.mkdir(parents=True, exist_ok=True)

    serialized = data.copy()
    serialized["tasks"] = [asdict(task) for task in self.tasks.values()]

    with open(path, "w") as f:
        json.dump(serialized, f, indent=2, default=str)
should_run
should_run(host, workload, rep, *, allow_skipped=False)

Determines if a task should be executed. Returns True if the task is PENDING or FAILED (and we want to retry). COMPLETED tasks are always skipped; SKIPPED tasks are also skipped unless allow_skipped is True.

Source code in lb_controller/services/journal.py
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
def should_run(
    self,
    host: str,
    workload: str,
    rep: int,
    *,
    allow_skipped: bool = False,
) -> bool:
    """
    Determines if a task should be executed.
    Returns True if task is PENDING or FAILED (and we want to retry).
    For now, we skip COMPLETED tasks.
    """
    task = self.get_task(host, workload, rep)
    if task:
        if allow_skipped:
            return task.status != RunStatus.COMPLETED
        return task.status not in (RunStatus.COMPLETED, RunStatus.SKIPPED)
    # If task not found, it's technically new, so run it (this shouldn't
    # happen if initialized correctly).
    return True
update_task
update_task(host, workload, rep, status, action='', error=None, error_type=None, error_context=None)
Source code in lb_controller/services/journal.py
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
def update_task(
    self,
    host: str,
    workload: str,
    rep: int,
    status: str,
    action: str = "",
    error: Optional[str] = None,
    error_type: Optional[str] = None,
    error_context: Optional[Dict[str, Any]] = None,
) -> None:
    task = self.get_task(host, workload, rep)
    if not task:
        return
    now_ts = datetime.now().timestamp()
    self._update_task_timings(task, status, now_ts)
    task.status = status
    task.timestamp = now_ts
    if action:
        task.current_action = action
    if error:
        task.error = error
    if error_type:
        task.error_type = error_type
    if error_context:
        task.error_context = error_context

RunLifecycle dataclass

RunLifecycle(phase=RunPhase.IDLE, stop_stage=StopStage.IDLE)

Tracks the lifecycle phase and stop intent.

Attributes
phase class-attribute instance-attribute
phase = IDLE
stop_stage class-attribute instance-attribute
stop_stage = IDLE
Functions
arm_stop
arm_stop()
Source code in lb_controller/engine/lifecycle.py
50
51
52
def arm_stop(self) -> None:
    if self.stop_stage == StopStage.IDLE:
        self.stop_stage = StopStage.ARMED
finish
finish()
Source code in lb_controller/engine/lifecycle.py
42
43
44
45
46
47
48
def finish(self) -> None:
    self.phase = RunPhase.FINISHED
    if self.stop_stage not in (StopStage.FAILED, StopStage.STOPPED):
        if self.stop_stage != StopStage.IDLE:
            self.stop_stage = StopStage.STOPPED
        else:
            self.stop_stage = StopStage.IDLE
mark_failed
mark_failed()
Source code in lb_controller/engine/lifecycle.py
66
67
def mark_failed(self) -> None:
    self.stop_stage = StopStage.FAILED
mark_interrupting_setup
mark_interrupting_setup()
Source code in lb_controller/engine/lifecycle.py
54
55
def mark_interrupting_setup(self) -> None:
    self.stop_stage = StopStage.INTERRUPTING_SETUP
mark_interrupting_teardown
mark_interrupting_teardown()
Source code in lb_controller/engine/lifecycle.py
63
64
def mark_interrupting_teardown(self) -> None:
    self.stop_stage = StopStage.INTERRUPTING_TEARDOWN
mark_stopped
mark_stopped()
Source code in lb_controller/engine/lifecycle.py
69
70
def mark_stopped(self) -> None:
    self.stop_stage = StopStage.STOPPED
mark_teardown
mark_teardown()
Source code in lb_controller/engine/lifecycle.py
60
61
def mark_teardown(self) -> None:
    self.stop_stage = StopStage.TEARDOWN
mark_waiting_runners
mark_waiting_runners()
Source code in lb_controller/engine/lifecycle.py
57
58
def mark_waiting_runners(self) -> None:
    self.stop_stage = StopStage.WAITING_FOR_RUNNERS
start_phase
start_phase(phase)
Source code in lb_controller/engine/lifecycle.py
39
40
def start_phase(self, phase: RunPhase) -> None:
    self.phase = phase

RunPhase

Bases: Enum

High-level run phases.

Attributes
FINISHED class-attribute instance-attribute
FINISHED = auto()
GLOBAL_SETUP class-attribute instance-attribute
GLOBAL_SETUP = auto()
GLOBAL_TEARDOWN class-attribute instance-attribute
GLOBAL_TEARDOWN = auto()
IDLE class-attribute instance-attribute
IDLE = auto()
WORKLOADS class-attribute instance-attribute
WORKLOADS = auto()

RunStatus

Attributes
COMPLETED class-attribute instance-attribute
COMPLETED = 'COMPLETED'
FAILED class-attribute instance-attribute
FAILED = 'FAILED'
PENDING class-attribute instance-attribute
PENDING = 'PENDING'
RUNNING class-attribute instance-attribute
RUNNING = 'RUNNING'
SKIPPED class-attribute instance-attribute
SKIPPED = 'SKIPPED'

SigintDecision

Bases: str, Enum

Decision returned by the SIGINT state machine.

Attributes
DELEGATE class-attribute instance-attribute
DELEGATE = 'delegate'
IGNORE class-attribute instance-attribute
IGNORE = 'ignore'
REQUEST_STOP class-attribute instance-attribute
REQUEST_STOP = 'request_stop'
WARN_ARM class-attribute instance-attribute
WARN_ARM = 'warn_arm'

SigintDoublePressHandler

SigintDoublePressHandler(*, state_machine, run_active, on_first_sigint, on_confirmed_sigint)

Bases: AbstractContextManager['SigintDoublePressHandler']

Installs a SIGINT handler that implements double-press confirmation.

Source code in lb_controller/engine/interrupts.py
73
74
75
76
77
78
79
80
81
82
83
84
85
def __init__(
    self,
    *,
    state_machine: DoubleCtrlCStateMachine,
    run_active: Callable[[], bool],
    on_first_sigint: Callable[[], None],
    on_confirmed_sigint: Callable[[], None],
) -> None:
    self._sm = state_machine
    self._run_active = run_active
    self._on_first = on_first_sigint
    self._on_confirmed = on_confirmed_sigint
    self._prev_handler: Any = None

StopCoordinator

StopCoordinator(expected_runners, stop_timeout=30.0, run_id=None)

Parameters:

Name Type Description Default
expected_runners Set[str]

Set of hostnames expected to confirm stop.

required
stop_timeout float

Seconds to wait for confirmations.

30.0
run_id str | None

Run identifier used to correlate stop events.

None
Source code in lb_controller/engine/stops.py
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
def __init__(
    self,
    expected_runners: Set[str],
    stop_timeout: float = 30.0,
    run_id: str | None = None,
):
    """
    Args:
        expected_runners: Set of hostnames expected to confirm stop.
        stop_timeout: Seconds to wait for confirmations.
        run_id: Run identifier used to correlate stop events.
    """
    self.expected_runners = expected_runners
    self.confirmed_runners: Set[str] = set()
    self.state = StopState.IDLE
    self.stop_timeout = stop_timeout
    self.start_time: float | None = None
    self.run_id = run_id
Attributes
confirmed_runners instance-attribute
confirmed_runners = set()
expected_runners instance-attribute
expected_runners = expected_runners
run_id instance-attribute
run_id = run_id
start_time instance-attribute
start_time = None
state instance-attribute
state = IDLE
stop_timeout instance-attribute
stop_timeout = stop_timeout
Functions
can_proceed_to_teardown
can_proceed_to_teardown()
Source code in lb_controller/engine/stops.py
108
109
def can_proceed_to_teardown(self) -> bool:
    return self.state == StopState.TEARDOWN_READY
check_timeout
check_timeout()

Check if the stop protocol has timed out.

Source code in lb_controller/engine/stops.py
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
def check_timeout(self) -> None:
    """Check if the stop protocol has timed out."""
    if self.state != StopState.STOPPING_WORKLOADS:
        return

    if self.start_time and (time.time() - self.start_time > self.stop_timeout):
        missing = self.expected_runners - self.confirmed_runners
        logger.error(
            "Stop protocol timed out. Missing confirmations from: %s",
            missing,
        )
        self.state = StopState.STOP_FAILED
initiate_stop
initiate_stop()

Transition to STOPPING_WORKLOADS.

Source code in lb_controller/engine/stops.py
47
48
49
50
51
52
53
def initiate_stop(self) -> None:
    """Transition to STOPPING_WORKLOADS."""
    if self.state != StopState.IDLE:
        return
    logger.info("Initiating distributed stop protocol.")
    self.state = StopState.STOPPING_WORKLOADS
    self.start_time = time.time()
process_event
process_event(event)

Process incoming events to check for stop confirmation.

We accept 'stopped', 'failed', 'cancelled', or 'done' as confirmation that the runner has ceased execution for the current workload.

Source code in lb_controller/engine/stops.py
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
def process_event(self, event: RunEvent) -> None:
    """
    Process incoming events to check for stop confirmation.

    We accept 'stopped', 'failed', or 'cancelled' as confirmation that
    the runner has ceased execution for the current workload.
    """
    if self.state != StopState.STOPPING_WORKLOADS:
        return

    if (
        self.run_id
        and getattr(event, "run_id", None)
        and event.run_id != self.run_id
    ):
        return

    if event.host not in self.expected_runners:
        return

    # Check for status indicating the workload has stopped/aborted
    # Note: 'failed' is often emitted on interrupt. 'stopped' is ideal.
    if event.status.lower() in ("stopped", "failed", "cancelled", "done"):
        if event.host not in self.confirmed_runners:
            logger.info(
                "Stop confirmed for host: %s (status=%s)",
                event.host,
                event.status,
            )
            self.confirmed_runners.add(event.host)
            self._check_completion()

StopStage

Bases: Enum

Stages of a coordinated stop.

Attributes
ARMED class-attribute instance-attribute
ARMED = auto()
FAILED class-attribute instance-attribute
FAILED = auto()
IDLE class-attribute instance-attribute
IDLE = auto()
INTERRUPTING_SETUP class-attribute instance-attribute
INTERRUPTING_SETUP = auto()
INTERRUPTING_TEARDOWN class-attribute instance-attribute
INTERRUPTING_TEARDOWN = auto()
STOPPED class-attribute instance-attribute
STOPPED = auto()
TEARDOWN class-attribute instance-attribute
TEARDOWN = auto()
WAITING_FOR_RUNNERS class-attribute instance-attribute
WAITING_FOR_RUNNERS = auto()

StopState

Bases: Enum

Attributes
IDLE class-attribute instance-attribute
IDLE = auto()
STOPPING_WORKLOADS class-attribute instance-attribute
STOPPING_WORKLOADS = auto()
STOP_FAILED class-attribute instance-attribute
STOP_FAILED = auto()
TEARDOWN_READY class-attribute instance-attribute
TEARDOWN_READY = auto()

TaskState dataclass

TaskState(host, workload, repetition, status=RunStatus.PENDING, current_action='', timestamp=(lambda: datetime.now().timestamp())(), error=None, error_type=None, error_context=None, started_at=None, finished_at=None, duration_seconds=None)

Represents a single atomic unit of work (Host + Workload + Repetition).

Attributes
current_action class-attribute instance-attribute
current_action = ''
duration_seconds class-attribute instance-attribute
duration_seconds = None
error class-attribute instance-attribute
error = None
error_context class-attribute instance-attribute
error_context = None
error_type class-attribute instance-attribute
error_type = None
finished_at class-attribute instance-attribute
finished_at = None
host instance-attribute
host
key property
key
repetition instance-attribute
repetition
started_at class-attribute instance-attribute
started_at = None
status class-attribute instance-attribute
status = PENDING
timestamp class-attribute instance-attribute
timestamp = field(default_factory=lambda: datetime.now().timestamp())
workload instance-attribute
workload

Functions

apply_playbook_defaults

apply_playbook_defaults(config)

Ensure remote_execution playbook paths point to controller Ansible assets.

Source code in lb_controller/services/paths.py
55
56
57
58
59
60
61
62
63
def apply_playbook_defaults(config: BenchmarkConfig) -> None:
    """Ensure remote_execution playbook paths point to controller Ansible assets."""
    rex = config.remote_execution
    for attr, fname in PLAYBOOK_FILES.items():
        current = getattr(rex, attr)
        if current is not None and Path(current).exists():
            continue
        candidate = ANSIBLE_ROOT / "playbooks" / fname
        setattr(rex, attr, candidate)

backfill_timings_from_results

backfill_timings_from_results(journal, journal_path, hosts, workload, per_host_output, refresh=None)

Backfill per-repetition timing data from all *_results.json artifacts.

Source code in lb_controller/services/journal_sync.py
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
def backfill_timings_from_results(
    journal: RunJournal,
    journal_path: Path,
    hosts: List[RemoteHostConfig],
    workload: str,
    per_host_output: Dict[str, Path],
    refresh: Optional[callable] = None,
) -> None:
    """Backfill per-repetition timing data from all *_results.json artifacts."""
    updated = False
    for host in hosts:
        entries = _collect_results(per_host_output.get(host.name), workload)
        for entry in entries:
            updated |= _apply_result_entry(journal, host.name, workload, entry)
    if updated:
        journal.save(journal_path)
        _refresh_journal(refresh)

pending_exists

pending_exists(journal, tests, hosts, repetitions, *, allow_skipped=False)

Return True if any repetition remains to run.

Source code in lb_controller/models/pending.py
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
def pending_exists(
    journal: RunJournal,
    tests: Iterable[str],
    hosts: Sequence[RemoteHostConfig],
    repetitions: int,
    *,
    allow_skipped: bool = False,
) -> bool:
    """Return True if any repetition remains to run."""
    return any(
        journal.should_run(host.name, test_name, rep, allow_skipped=allow_skipped)
        for host in hosts
        for test_name in tests
        for rep in range(1, repetitions + 1)
    )

prepare_run_dirs

prepare_run_dirs(config, run_id)

Create base directories for a run.

Source code in lb_controller/services/paths.py
26
27
28
29
30
31
32
33
34
35
36
37
38
def prepare_run_dirs(
    config: BenchmarkConfig,
    run_id: str,
) -> tuple[Path, Path, Path]:
    """Create base directories for a run."""
    output_root = (config.output_dir / run_id).resolve()
    report_root = (config.report_dir / run_id).resolve()
    data_export_root = (config.data_export_dir / run_id).resolve()

    # Only create output_root; report/export dirs are created on demand by analytics.
    output_root.mkdir(parents=True, exist_ok=True)

    return output_root, report_root, data_export_root

Benchmark controller

BenchmarkController

BenchmarkController(config, options=None)

Controller coordinating remote benchmark runs.

Source code in lb_controller/engine/controller.py
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
def __init__(
    self,
    config: BenchmarkConfig,
    options: ControllerOptions | None = None,
) -> None:
    self.config = config
    from lb_controller.services.paths import apply_playbook_defaults
    from lb_plugins.api import create_registry, merge_plugin_assets

    apply_playbook_defaults(self.config)
    merge_plugin_assets(self.config, create_registry())
    self._options = options or ControllerOptions()
    self.output_formatter = self._options.output_formatter
    self.stop_token = self._options.stop_token
    self._stop_timeout_s = self._options.stop_timeout_s
    self.lifecycle = RunLifecycle()
    self.state_machine = self._options.state_machine or ControllerStateMachine()
    self.executor = self._options.build_executor()
    self._use_progress_stream = True
    self._journal_refresh = self._options.journal_refresh

    self.services = ControllerServices(
        config=self.config,
        executor=self.executor,
        output_formatter=self.output_formatter,
        stop_token=self.stop_token,
        lifecycle=self.lifecycle,
        journal_refresh=self._journal_refresh,
        use_progress_stream=self._use_progress_stream,
    )

    self._ui = UINotifier(
        output_formatter=self.output_formatter,
        journal_refresh=self._journal_refresh,
    )
    self.workload_runner = WorkloadRunner(
        config=self.config,
        ui_notifier=self._ui,
    )
    self.teardown_service = TeardownService()
    self._current_session: Optional[RunSession] = None
    self._session_builder = RunSessionBuilder(
        config=self.config,
        state_machine=self.state_machine,
        stop_timeout_s=self._stop_timeout_s,
        journal_refresh=self._ui.refresh_journal,
        collector_packages=self._collector_apt_packages,
    )
    self._orchestrator = RunOrchestrator(
        services=self.services,
        workload_runner=self.workload_runner,
        teardown_service=self.teardown_service,
        ui_notifier=self._ui,
    )

Attributes

config instance-attribute

config = config

coordinator property

coordinator

executor instance-attribute

executor = build_executor()

lifecycle instance-attribute

lifecycle = RunLifecycle()

output_formatter instance-attribute

output_formatter = output_formatter

services instance-attribute

services = ControllerServices(config=config, executor=executor, output_formatter=output_formatter, stop_token=stop_token, lifecycle=lifecycle, journal_refresh=_journal_refresh, use_progress_stream=_use_progress_stream)

state_machine instance-attribute

state_machine = state_machine or ControllerStateMachine()

stop_token instance-attribute

stop_token = stop_token

teardown_service instance-attribute

teardown_service = TeardownService()

workload_runner instance-attribute

workload_runner = WorkloadRunner(config=config, ui_notifier=_ui)

Functions

on_event

on_event(event)

Process an event for stop coordination.

Source code in lb_controller/engine/controller.py
87
88
89
90
def on_event(self, event: RunEvent) -> None:
    """Process an event for stop coordination."""
    if self.coordinator:
        self.coordinator.process_event(event)

run

run(test_types, run_id=None, journal=None, resume=False, journal_path=None)

Execute the configured benchmarks on remote hosts.

Parameters:

Name Type Description Default
test_types List[str]

List of benchmark identifiers to execute.

required
run_id Optional[str]

Optional run identifier. If not provided, a timestamp-based id is generated.

None
journal Optional[RunJournal]

Optional pre-loaded journal used for resume flows.

None
resume bool

When True, reuse the provided journal instead of creating a new one.

False
journal_path Optional[Path]

Optional override for where the journal is persisted.

None
Source code in lb_controller/engine/controller.py
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
def run(
    self,
    test_types: List[str],
    run_id: Optional[str] = None,
    journal: Optional[RunJournal] = None,
    resume: bool = False,
    journal_path: Optional[Path] = None,
) -> RunExecutionSummary:
    """
    Execute the configured benchmarks on remote hosts.

    Args:
        test_types: List of benchmark identifiers to execute.
        run_id: Optional run identifier. If not provided, a timestamp-based
            id is generated.
        journal: Optional pre-loaded journal used for resume flows.
        resume: When True, reuse the provided journal instead of creating a new one.
        journal_path: Optional override for where the journal is persisted.
    """
    if not self.config.remote_hosts:
        raise ValueError("At least one remote host must be configured.")
    if resume and journal is None:
        raise ValueError("Resume requested without a journal instance.")

    session = self._session_builder.build(
        test_types=test_types,
        run_id=run_id,
        journal=journal,
        journal_path=journal_path,
    )
    self._current_session = session
    return self._orchestrator.run(session, resume_requested=resume)

Ansible executor

AnsibleRunnerExecutor

AnsibleRunnerExecutor(private_data_dir=None, runner_fn=None, stream_output=False, output_callback=None, stop_token=None)

Bases: RemoteExecutor

Remote executor implemented with ansible-runner.

Initialize the executor.

Parameters:

Name Type Description Default
private_data_dir Optional[Path]

Directory used by ansible-runner.

None
runner_fn Optional[Callable[..., Any]]

Optional runner callable for testing. Defaults to ansible_runner.run when not provided.

None
stream_output bool

When True, stream Ansible stdout events to the local process (useful for visibility in long-running tasks).

False
output_callback Optional[Callable[[str, str], None]]

Optional callback to handle stdout stream. Signature: (text: str, end: str) -> None

None
Source code in lb_controller/adapters/ansible_runner.py
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
def __init__(
    self,
    private_data_dir: Optional[Path] = None,
    runner_fn: Optional[Callable[..., Any]] = None,
    stream_output: bool = False,
    output_callback: Optional[Callable[[str, str], None]] = None,
    stop_token: StopToken | None = None,
):
    """
    Initialize the executor.

    Args:
        private_data_dir: Directory used by ansible-runner.
        runner_fn: Optional runner callable for testing. Defaults to
            ansible_runner.run when not provided.
        stream_output: When True, stream Ansible stdout events to the local
            process (useful for visibility in long-running tasks).
        output_callback: Optional callback to handle stdout stream.
                         Signature: (text: str, end: str) -> None
    """
    self.private_data_dir = private_data_dir or Path(".ansible_runner")
    self.private_data_dir.mkdir(parents=True, exist_ok=True)
    self.event_log_path = self.private_data_dir / "lb_events.jsonl"
    self._runner_fn = runner_fn
    self.stream_output = stream_output
    self.stop_token = stop_token
    self._active_label: str | None = None
    # Force Ansible temp into a writable location inside the runner dir to avoid
    # host-level permission issues.
    self.local_tmp = self.private_data_dir / "tmp"
    self.local_tmp.mkdir(parents=True, exist_ok=True)
    if stream_output and output_callback is None:
        # Default to streaming to stdout when caller requests streaming but
        # doesn't provide a handler.
        def _default_cb(text: str, end: str = "") -> None:
            sys.stdout.write(text + end)
            sys.stdout.flush()

        self.output_callback = _default_cb
    else:
        self.output_callback = output_callback
    self._inventory_writer = InventoryWriter(self.private_data_dir)
    self._env_builder = AnsibleEnvBuilder(
        private_data_dir=self.private_data_dir,
        local_tmp=self.local_tmp,
        event_log_path=self.event_log_path,
        ansible_root=ANSIBLE_ROOT,
    )
    self._extravars_writer = ExtravarsWriter(self.private_data_dir)
    self._command_builder = PlaybookCommandBuilder()
    self._process_runner = PlaybookProcessRunner(
        private_data_dir=self.private_data_dir,
        stream_output=self.stream_output,
        output_callback=self.output_callback,
        stop_token=self.stop_token,
    )

Attributes

event_log_path instance-attribute

event_log_path = private_data_dir / 'lb_events.jsonl'

is_running property

is_running

Return True when a playbook is in-flight.

local_tmp instance-attribute

local_tmp = private_data_dir / 'tmp'

output_callback instance-attribute

output_callback = _default_cb

private_data_dir instance-attribute

private_data_dir = private_data_dir or Path('.ansible_runner')

stop_token instance-attribute

stop_token = stop_token

stream_output instance-attribute

stream_output = stream_output

Functions

interrupt

interrupt()

Request interruption of the current playbook execution.

Source code in lb_controller/adapters/ansible_runner.py
177
178
179
180
def interrupt(self) -> None:
    """Request interruption of the current playbook execution."""
    self._process_runner.interrupt()
    self._active_label = None

run_playbook

run_playbook(playbook_path, inventory, extravars=None, tags=None, limit_hosts=None, *, cancellable=True)

Execute a playbook using ansible-runner.

Source code in lb_controller/adapters/ansible_runner.py
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
def run_playbook(
    self,
    playbook_path: Path,
    inventory: InventorySpec,
    extravars: Optional[Dict[str, Any]] = None,
    tags: Optional[List[str]] = None,
    limit_hosts: Optional[List[str]] = None,
    *,
    cancellable: bool = True,
) -> ExecutionResult:
    """Execute a playbook using ansible-runner."""
    self._process_runner.clear_interrupt()
    stop_result = self._maybe_stop(cancellable)
    if stop_result is not None:
        return stop_result
    if not playbook_path.exists():
        raise FileNotFoundError(f"Playbook not found: {playbook_path}")

    inventory_path = self._inventory_writer.prepare(inventory)
    runner_fn = self._runner_fn or self._import_runner()

    # Ensure playbook path is absolute so runner can find it
    # regardless of private_data_dir location
    abs_playbook_path = playbook_path.resolve()

    label = abs_playbook_path.name
    logger.info(
        "Running playbook %s against %d host(s)", label, len(inventory.hosts)
    )
    self._active_label = label

    merged_extravars = self._merge_extravars(extravars, inventory_path)
    envvars = self._env_builder.build()

    try:
        result = self._execute_playbook(
            runner_fn=runner_fn,
            abs_playbook_path=abs_playbook_path,
            inventory_path=inventory_path,
            extravars=merged_extravars,
            tags=tags,
            envvars=envvars,
            limit_hosts=limit_hosts,
            cancellable=cancellable,
        )
    finally:
        self._active_label = None

    return self._finalize_result(playbook_path, result)

Catalog services

RunCatalogService

RunCatalogService(output_dir, report_dir=None, data_export_dir=None)

Discover run directories and their basic metadata.

Source code in lb_controller/services/run_catalog_service.py
20
21
22
23
24
25
26
27
28
def __init__(
    self,
    output_dir: Path,
    report_dir: Optional[Path] = None,
    data_export_dir: Optional[Path] = None,
) -> None:
    self.output_dir = output_dir.resolve()
    self.report_dir = report_dir.resolve() if report_dir else None
    self.data_export_dir = data_export_dir.resolve() if data_export_dir else None

Attributes

data_export_dir instance-attribute

data_export_dir = resolve() if data_export_dir else None

output_dir instance-attribute

output_dir = resolve()

report_dir instance-attribute

report_dir = resolve() if report_dir else None

Functions

get_run

get_run(run_id)

Return RunInfo for the given run_id if present.

Source code in lb_controller/services/run_catalog_service.py
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
def get_run(self, run_id: str) -> Optional[RunInfo]:
    """Return RunInfo for the given run_id if present."""
    output_root = self._resolve_output_root(run_id)
    if output_root is None:
        return None

    report_root = self._resolve_optional_root(self.report_dir, run_id)
    export_root = self._resolve_optional_root(self.data_export_dir, run_id)

    journal_path = output_root / "run_journal.json"
    journal_data = self._load_journal(journal_path)
    created_at = self._extract_created_at(journal_data)
    hosts, workloads = self._extract_hosts_workloads(journal_data)

    if not hosts:
        hosts = self._fallback_hosts(output_root)
    if not workloads and hosts:
        workloads = self._fallback_workloads(output_root, hosts)

    return RunInfo(
        run_id=run_id,
        output_root=output_root,
        report_root=self._existing_or_none(report_root),
        data_export_root=self._existing_or_none(export_root),
        hosts=sorted(hosts),
        workloads=sorted(workloads),
        created_at=created_at,
        journal_path=journal_path if journal_path.exists() else None,
    )

list_runs

list_runs()

Return all runs found under output_dir, newest first when possible.

Source code in lb_controller/services/run_catalog_service.py
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
def list_runs(self) -> List[RunInfo]:
    """Return all runs found under output_dir, newest first when possible."""
    if not self.output_dir.exists():
        return []

    collected = [
        info
        for info in (self.get_run(rid) for rid in self._iter_run_ids())
        if info
    ]

    # Runs without a creation timestamp sort as epoch 0, i.e. oldest/last.
    collected.sort(
        key=lambda run: run.created_at.timestamp() if run.created_at else 0.0,
        reverse=True,
    )
    return collected

Journals and execution types

RunJournal dataclass

RunJournal(run_id, tasks=dict(), metadata=dict())

Contains the entire execution plan and state.

Attributes

metadata class-attribute instance-attribute

metadata = field(default_factory=dict)

run_id instance-attribute

run_id

tasks class-attribute instance-attribute

tasks = field(default_factory=dict)

Functions

add_task

add_task(task)
Source code in lb_controller/services/journal.py
64
65
def add_task(self, task: TaskState) -> None:
    """Register *task* in the journal, keyed by its composite key."""
    self.tasks.update({task.key: task})

get_task

get_task(host, workload, rep)

Return a specific task or None when absent.

Source code in lb_controller/services/journal.py
73
74
75
76
def get_task(self, host: str, workload: str, rep: int) -> Optional[TaskState]:
    """Return a specific task or None when absent."""
    # Keys use the same "host::workload::rep" convention as task creation.
    return self.tasks.get("::".join((host, workload, str(rep))))

get_tasks_by_host

get_tasks_by_host(host)
Source code in lb_controller/services/journal.py
67
68
69
70
71
def get_tasks_by_host(self, host: str) -> List[TaskState]:
    """Return all tasks belonging to *host*, ordered by repetition number."""
    matching = (task for task in self.tasks.values() if task.host == host)
    return sorted(matching, key=lambda task: task.repetition)

initialize classmethod

initialize(run_id, config, test_types)

Factory to create a new journal based on configuration.

Source code in lb_controller/services/journal.py
54
55
56
57
58
59
60
61
62
@classmethod
def initialize(
    cls, run_id: str, config: Any, test_types: List[str]
) -> "RunJournal":
    """Factory to create a new journal based on configuration."""
    # Build run-level metadata first, then expand the config into tasks.
    new_journal = cls(run_id=run_id)
    new_journal.metadata = _build_metadata(config)
    _populate_tasks(new_journal, config, test_types)
    return new_journal

load classmethod

load(path, config=None)

Load journal from disk, optionally validating against a config.

Source code in lb_controller/services/journal.py
150
151
152
153
154
155
156
157
158
159
160
161
162
163
@classmethod
def load(cls, path: Path, config: Any | None = None) -> "RunJournal":
    """Load journal from disk, optionally validating against a config."""
    with open(path, "r") as fh:
        raw = json.load(fh)

    stored_metadata = raw.get("metadata", {}) or {}
    _validate_config(stored_metadata, config)

    # Tasks are persisted as a list; strip them out before handing the
    # remaining payload to the dataclass constructor.
    raw_tasks = raw.pop("tasks", [])
    journal = cls(**raw)
    journal.tasks = _load_tasks(raw_tasks)

    # Backfill metadata for journals where the constructor did not set it.
    if not getattr(journal, "metadata", None):
        journal.metadata = stored_metadata
    return journal

rehydrate_config

rehydrate_config()

Return a BenchmarkConfig reconstructed from the stored config_dump.

Source code in lb_controller/services/journal.py
165
166
167
168
169
170
171
172
173
174
175
def rehydrate_config(self) -> BenchmarkConfig | None:
    """Return a BenchmarkConfig reconstructed from the stored config_dump."""
    dump = (self.metadata or {}).get("config_dump")
    if not dump:
        return None
    try:
        return BenchmarkConfig.model_validate(dump)
    except Exception:
        # Best effort: an unparseable dump yields None rather than raising.
        return None

save

save(path)

Persist journal to disk.

Source code in lb_controller/services/journal.py
136
137
138
139
140
141
142
143
144
145
146
147
148
def save(self, path: Path) -> None:
    """Persist the journal to *path* as JSON, creating parent directories."""
    payload = asdict(self)
    # Accept plain strings for caller convenience.
    if isinstance(path, str):
        path = Path(path)
    path.parent.mkdir(parents=True, exist_ok=True)

    serialized = dict(payload)
    # Tasks serialize as a list instead of the internal key->task mapping.
    serialized["tasks"] = [asdict(task) for task in self.tasks.values()]

    with open(path, "w") as fh:
        json.dump(serialized, fh, indent=2, default=str)

should_run

should_run(host, workload, rep, *, allow_skipped=False)

Determines if a task should be executed. Returns True if task is PENDING or FAILED (and we want to retry). For now, we skip COMPLETED tasks.

Source code in lb_controller/services/journal.py
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
def should_run(
    self,
    host: str,
    workload: str,
    rep: int,
    *,
    allow_skipped: bool = False,
) -> bool:
    """
    Decide whether the (host, workload, rep) task still needs to execute.

    COMPLETED tasks are never re-run. SKIPPED tasks are re-run only when
    allow_skipped is True. Tasks missing from the journal default to
    runnable.
    """
    task = self.get_task(host, workload, rep)
    if not task:
        # Missing entry means the task is effectively new (shouldn't happen
        # when the journal was initialized correctly) -- run it.
        return True
    terminal = (
        (RunStatus.COMPLETED,)
        if allow_skipped
        else (RunStatus.COMPLETED, RunStatus.SKIPPED)
    )
    return task.status not in terminal

update_task

update_task(host, workload, rep, status, action='', error=None, error_type=None, error_context=None)
Source code in lb_controller/services/journal.py
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
def update_task(
    self,
    host: str,
    workload: str,
    rep: int,
    status: str,
    action: str = "",
    error: Optional[str] = None,
    error_type: Optional[str] = None,
    error_context: Optional[Dict[str, Any]] = None,
) -> None:
    """
    Update status, timing, and error fields of an existing task.

    Silently does nothing when the (host, workload, rep) task is not
    present in the journal.
    """
    task = self.get_task(host, workload, rep)
    if not task:
        return
    now_ts = datetime.now().timestamp()
    # Timing bookkeeping runs before the status flips so the helper can
    # observe the previous state.
    self._update_task_timings(task, status, now_ts)
    task.status = status
    task.timestamp = now_ts
    # Optional fields are only overwritten when a truthy value was given.
    for attr_name, new_value in (
        ("current_action", action),
        ("error", error),
        ("error_type", error_type),
        ("error_context", error_context),
    ):
        if new_value:
            setattr(task, attr_name, new_value)

RunExecutionSummary dataclass

RunExecutionSummary(run_id, per_host_output, phases, success, output_root, report_root, data_export_root, controller_state=None, cleanup_allowed=False)

Summary of a complete controller run.

Attributes

cleanup_allowed class-attribute instance-attribute

cleanup_allowed = False

controller_state class-attribute instance-attribute

controller_state = None

data_export_root instance-attribute

data_export_root

output_root instance-attribute

output_root

per_host_output instance-attribute

per_host_output

phases instance-attribute

phases

report_root instance-attribute

report_root

run_id instance-attribute

run_id

success instance-attribute

success