Skip to content

App & UI API

Stable interfaces used by the CLI/TUI layer.

App-layer API

api

Stable application-layer API surface.

Attributes

AnalyticsKind module-attribute

AnalyticsKind = Literal['aggregate']

AppClient module-attribute

AppClient = ApplicationClient

Classes

AnalyticsRequest dataclass

AnalyticsRequest(run, kind='aggregate', hosts=None, workloads=None)

Parameters to run analytics on a stored run.

Attributes
hosts class-attribute instance-attribute
hosts = None
kind class-attribute instance-attribute
kind = 'aggregate'
run instance-attribute
run
workloads class-attribute instance-attribute
workloads = None

AnalyticsService

Execute analytics against existing artifacts.

Functions
run
run(request)
Source code in lb_analytics/engine/service.py
38
39
40
41
def run(self, request: AnalyticsRequest) -> List[Path]:
    """Dispatch an analytics request to the handler for its kind.

    Only the ``"aggregate"`` kind is handled; it is forwarded to
    ``self._run_aggregate`` and that call's result is returned unchanged.

    Raises:
        ValueError: If ``request.kind`` is anything other than ``"aggregate"``.
    """
    # Guard clause: reject unknown kinds before doing any work.
    if request.kind != "aggregate":
        raise ValueError(f"Unsupported analytics kind: {request.kind}")
    return self._run_aggregate(request)

AnsibleOutputFormatter

AnsibleOutputFormatter()

Parses raw Ansible output stream and prints user-friendly status updates.

Source code in lb_app/services/run_output.py
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
def __init__(self):
    """Initialise the line patterns, output switches and task-name filters."""
    # The task pattern is intentionally greedy: a task name may itself
    # contain brackets (e.g., role prefix + [run:...]) and must be captured
    # up to the last closing bracket.
    self.task_pattern = re.compile(r"TASK \[(.*)\]")
    self.bench_pattern = re.compile(r"Running benchmark: (.*)")

    # Output state; every switch starts off.
    self.current_phase = "Initializing"  # Default phase
    self.suppress_progress = False
    self.host_label: str = ""
    self.emit_task_timings = False
    self.emit_task_starts = False

    self._task_timer = TaskTimer()
    self._last_timing_line: str | None = None

    self._always_show_tasks = (
        "workload_runner : Build repetitions list",
        "workload_runner : Run benchmark via local runner (per repetition)",
    )

    # Each suppressed task is stored twice: under its bare name and under
    # the "workload_runner : " role-prefixed name.
    bare_names = (
        "Skip polling if already finished",
        "Poll LB_EVENT stream",
        "Streaming indicator",
        "Update finished status",
        "Delay",
        "Initialize polling status",
    )
    role_prefix = "workload_runner : "
    self._suppress_task_names = set(bare_names) | {
        role_prefix + task for task in bare_names
    }
Attributes
bench_pattern instance-attribute
bench_pattern = compile('Running benchmark: (.*)')
current_phase instance-attribute
current_phase = 'Initializing'
emit_task_starts instance-attribute
emit_task_starts = False
emit_task_timings instance-attribute
emit_task_timings = False
host_label instance-attribute
host_label = ''
suppress_progress instance-attribute
suppress_progress = False
task_pattern instance-attribute
task_pattern = compile('TASK \\[(.*)\\]')
Functions
process
process(text, end='', log_sink=None)
Source code in lb_app/services/run_output.py
67
68
69
70
71
72
73
74
75
def process(
    self, text: str, end: str = "", log_sink: Callable[[str], None] | None = None
):
    """Split ``text`` into lines and hand each one to ``self._handle_line``.

    Empty or otherwise falsy ``text`` is ignored. ``end`` is accepted but not
    used by this method; ``log_sink`` is forwarded untouched for every line.
    """
    if text:
        for raw_line in text.splitlines():
            self._handle_line(raw_line, log_sink=log_sink)
process_timing
process_timing(text, end='', log_sink=None)

Emit task duration lines without altering raw output.

Source code in lb_app/services/run_output.py
77
78
79
80
81
82
83
84
85
86
def process_timing(
    self, text: str, end: str = "", log_sink: Callable[[str], None] | None = None
) -> None:
    """Feed every line of ``text`` to ``self._handle_timing_line``.

    Falsy ``text`` is a no-op. ``end`` is accepted but not used here;
    ``log_sink`` is passed through unchanged with each line.
    """
    if text:
        for raw_line in text.splitlines():
            self._handle_timing_line(raw_line, log_sink=log_sink)
set_phase
set_phase(phase)
Source code in lb_app/services/run_output.py
64
65
def set_phase(self, phase: str):
    """Record ``phase`` as the formatter's current phase."""
    self.current_phase = phase

ApplicationClient

ApplicationClient()

Concrete application-layer client.

Source code in lb_app/client.py
38
39
40
41
42
43
44
45
def __init__(self) -> None:
    """Configure logging and build the services this client delegates to."""
    # Logging is configured before any service is constructed.
    configure_logging()
    self._config_service = ConfigService()
    self._run_service = RunService(registry_factory=create_registry)
    # ProvisionService shares the config service created above.
    self._provision_service = ProvisionService(self._config_service)
    # The provisioner is created with caller enforcement switched on and the
    # two allowed caller names listed explicitly.
    self._provisioner = ProvisioningService(
        enforce_ui_caller=True, allowed_callers=("lb_ui", "lb_app")
    )
Functions
get_run_plan
get_run_plan(config, tests, execution_mode='remote')
Source code in lb_app/client.py
58
59
60
61
62
63
64
65
66
67
68
69
70
def get_run_plan(
    self,
    config: BenchmarkConfig,
    tests: Sequence[str],
    execution_mode: str = "remote",
):
    """Return the run plan produced by the run service.

    The platform config is loaded through the config service and passed
    along; ``tests`` is copied into a list before being forwarded.
    """
    platform_cfg, _target, _exists = self._config_service.load_platform_config()
    selected_tests = list(tests)
    return self._run_service.get_run_plan(
        config,
        selected_tests,
        execution_mode=execution_mode,
        platform_config=platform_cfg,
    )
install_loki_grafana
install_loki_grafana(*, mode, config_path, grafana_url, grafana_api_key, grafana_admin_user, grafana_admin_password, grafana_token_name, grafana_org_id, loki_endpoint, configure_assets=True)
Source code in lb_app/client.py
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
def install_loki_grafana(
    self,
    *,
    mode: str,
    config_path: Path | None,
    grafana_url: str | None,
    grafana_api_key: str | None,
    grafana_admin_user: str | None,
    grafana_admin_password: str | None,
    grafana_token_name: str | None,
    grafana_org_id: int | None,
    loki_endpoint: str | None,
    configure_assets: bool = True,
) -> ProvisionConfigSummary | None:
    """Forward every keyword argument to the provision service.

    Returns whatever ``ProvisionService.install_loki_grafana`` returns.
    """
    forwarded = {
        "mode": mode,
        "config_path": config_path,
        "grafana_url": grafana_url,
        "grafana_api_key": grafana_api_key,
        "grafana_admin_user": grafana_admin_user,
        "grafana_admin_password": grafana_admin_password,
        "grafana_token_name": grafana_token_name,
        "grafana_org_id": grafana_org_id,
        "loki_endpoint": loki_endpoint,
        "configure_assets": configure_assets,
    }
    return self._provision_service.install_loki_grafana(**forwarded)
list_runs
list_runs(config)
Source code in lb_app/client.py
55
56
def list_runs(self, config: BenchmarkConfig) -> Iterable[RunJournal]:
    """Return the runs ``RunJournal.list_runs`` reports for ``config.output_dir``."""
    output_dir = config.output_dir
    return RunJournal.list_runs(output_dir)
load_config
load_config(path=None)
Source code in lb_app/client.py
47
48
49
def load_config(self, path: Path | None = None) -> BenchmarkConfig:
    """Load a config through ``ConfigService.load_for_read`` and return it.

    The resolved and stale paths that come back with the config are dropped.
    """
    loaded, _resolved, _stale = self._config_service.load_for_read(path)
    return loaded
remove_loki_grafana
remove_loki_grafana(*, remove_data=False)
Source code in lb_app/client.py
374
375
def remove_loki_grafana(self, *, remove_data: bool = False) -> None:
    """Ask the provision service to remove Loki/Grafana.

    ``remove_data`` is forwarded as given; nothing is returned.
    """
    self._provision_service.remove_loki_grafana(remove_data=remove_data)
save_config
save_config(config, path)
Source code in lb_app/client.py
51
52
53
def save_config(self, config: BenchmarkConfig, path: Path) -> None:
    """Save ``config`` to ``path`` and then record ``path`` as the saved pointer."""
    # The config is written first; the pointer is updated only afterwards.
    config.save(path)
    self._config_service.write_saved_config_path(path)
start_run
start_run(request, hooks)
Source code in lb_app/client.py
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
def start_run(self, request: RunRequest, hooks: UIHooks) -> RunResult | None:
    """Create a session, provision, execute the run and clean up.

    Returns:
        The result of ``RunService.execute``, or ``None`` when the
        connectivity check fails or provisioning raises ``ProvisioningError``.
    """
    cfg = request.config
    # Fall back to every workload in the config when no tests were requested.
    target_tests = list(request.tests or list(cfg.workloads.keys()))
    self._ensure_workloads(cfg, target_tests)

    # The session is created before the connectivity check and provisioning.
    context = self._run_service.create_session(
        self._config_service,
        tests=target_tests,
        config_path=None,
        run_id=request.run_id,
        resume=request.resume,
        repetitions=request.repetitions,
        debug=request.debug,
        intensity=request.intensity,
        ui_adapter=request.ui_adapter,
        setup=request.setup,
        stop_file=request.stop_file,
        execution_mode=request.execution_mode,
        node_count=request.node_count,
        preloaded_config=cfg,
    )

    if not self._connectivity_ok(request, cfg, hooks):
        return None

    # Provision according to execution mode
    prov_result = None
    try:
        cfg, prov_result = self._provision(
            cfg,
            request.execution_mode,
            request.node_count,
            docker_engine=request.docker_engine,
            resume=request.resume,
        )
    except ProvisioningError as exc:
        # Provisioning failures are reported as a warning, not re-raised.
        hooks.on_warning(f"Provisioning failed: {exc}", ttl=5)
        return None
    # Provisioning may return a different config; the session uses that one.
    context.config = cfg

    run_result: RunResult | None = None
    try:
        output_cb = self._make_output_callback(request, hooks)
        run_result = self._run_service.execute(
            context,
            run_id=request.run_id,
            output_callback=output_cb,
            ui_adapter=request.ui_adapter,
        )
        self._emit_controller_state(run_result, hooks)
    finally:
        # Cleanup runs even if execution raised; run_result is then still None.
        self._cleanup_provisioning(prov_result, run_result, hooks)
    return run_result
status_loki_grafana
status_loki_grafana(*, grafana_url, grafana_api_key, grafana_org_id, loki_endpoint)
Source code in lb_app/client.py
377
378
379
380
381
382
383
384
385
386
387
388
389
390
def status_loki_grafana(
    self,
    *,
    grafana_url: str | None,
    grafana_api_key: str | None,
    grafana_org_id: int | None,
    loki_endpoint: str | None,
) -> ProvisionStatus:
    """Forward the four connection settings to the provision service.

    Returns whatever ``ProvisionService.status_loki_grafana`` returns.
    """
    forwarded = {
        "grafana_url": grafana_url,
        "grafana_api_key": grafana_api_key,
        "grafana_org_id": grafana_org_id,
        "loki_endpoint": loki_endpoint,
    }
    return self._provision_service.status_loki_grafana(**forwarded)

ConfigService

ConfigService(config_home=None)

Resolve, load, and mutate BenchmarkConfig files.

Source code in lb_app/services/config_service.py
30
31
def __init__(self, config_home: Optional[Path] = None) -> None:
    """Create the backing ``ConfigRepository`` for ``config_home``."""
    repository = ConfigRepository(config_home)
    self._repo = repository
Functions
add_remote_host
add_remote_host(host, config, enable_remote=True, set_default=False)

Add or replace a remote host definition and persist the config.

Source code in lb_app/services/config_service.py
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
def add_remote_host(
    self,
    host: RemoteHostConfig,
    config: Optional[Path],
    enable_remote: bool = True,
    set_default: bool = False,
) -> Tuple[BenchmarkConfig, Path, Optional[Path]]:
    """Insert ``host`` into the config, replacing any host with the same name.

    The config is loaded for writing (created when missing), the remote
    execution flag is set to ``enable_remote`` and the config is written
    back. With ``set_default`` the saved-config pointer is updated too.

    Returns:
        ``(config, target_path, stale_pointer)``.
    """
    cfg, target, stale, _created = self.load_for_write(config, allow_create=True)

    # Drop any same-named entry, then put the new definition last.
    others = [entry for entry in cfg.remote_hosts if entry.name != host.name]
    cfg.remote_hosts = others
    cfg.remote_hosts.append(host)
    cfg.remote_execution.enabled = enable_remote

    repo = self._repo
    repo.write_benchmark_config(cfg, target)
    if set_default:
        repo.write_saved_config_path(target)
    return cfg, target, stale
add_workload
add_workload(name, config, set_default)

Add a workload to the run config (ensuring plugin settings).

Source code in lb_app/services/config_service.py
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
def add_workload(
    self,
    name: str,
    config: Optional[Path],
    set_default: bool,
) -> Tuple[BenchmarkConfig, Path, Optional[Path]]:
    """Register workload ``name`` in the config and persist it.

    The config is loaded for writing (created when missing). Plugin settings
    are seeded from the plugin's ``config_cls`` when the plugin exposes one
    and no settings exist yet. An existing workload entry is kept as is.

    Returns:
        ``(config, target_path, stale_pointer)``.

    Raises:
        ValueError: If ``name`` is not among the registry's available plugins.
    """
    cfg, target, stale, _created = self.load_for_write(config, allow_create=True)
    registry = create_registry()
    if name not in registry.available():
        raise ValueError(
            f"Plugin '{name}' is not installed. "
            "Use `lb plugin list` to see available plugins."
        )

    needs_settings = name not in cfg.plugin_settings
    if needs_settings:
        plugin = registry.get(name)
        if hasattr(plugin, "config_cls"):
            cfg.plugin_settings[name] = plugin.config_cls()
    apply_plugin_assets(cfg, registry)

    # Reuse the current entry when there is a truthy one, else start fresh.
    current = cfg.workloads.get(name)
    cfg.workloads[name] = current if current else WorkloadConfig(plugin=name, options={})

    repo = self._repo
    repo.write_benchmark_config(cfg, target)
    if set_default:
        repo.write_saved_config_path(target)
    return cfg, target, stale
apply_platform_defaults staticmethod
apply_platform_defaults(cfg, platform_config)

Apply platform defaults without mutating workload selection.

Source code in lb_app/services/config_service.py
87
88
89
90
91
92
@staticmethod
def apply_platform_defaults(
    cfg: BenchmarkConfig, platform_config: PlatformConfig
) -> None:
    """Delegate to the module-level ``apply_platform_defaults`` helper.

    Documented upstream as applying platform defaults without mutating the
    workload selection; the helper itself is not visible from this method.
    """
    apply_platform_defaults(cfg, platform_config)
clear_saved_config_path
clear_saved_config_path()

Remove the saved config path pointer, if present.

Source code in lb_app/services/config_service.py
269
270
271
def clear_saved_config_path(self) -> None:
    """Delegate to the repository to clear the saved config path pointer."""
    self._repo.clear_saved_config_path()
create_default_config
create_default_config()

Create a fresh BenchmarkConfig with no workloads selected.

Source code in lb_app/services/config_service.py
78
79
80
81
82
83
84
85
def create_default_config(self) -> BenchmarkConfig:
    """Build a new ``BenchmarkConfig`` with empty workloads and plugin settings.

    Plugin assets from a freshly created registry and the playbook defaults
    are applied before the config is returned.
    """
    fresh = BenchmarkConfig()
    # Start from a blank selection regardless of the class defaults.
    fresh.workloads = {}
    fresh.plugin_settings = {}
    registry = create_registry()
    apply_plugin_assets(fresh, registry)
    apply_playbook_defaults(fresh)
    return fresh
ensure_home
ensure_home()

Create the config home directory.

Source code in lb_app/services/config_service.py
33
34
35
def ensure_home(self) -> None:
    """Delegate to the repository to ensure the config home directory exists."""
    self._repo.ensure_home()
load_for_read
load_for_read(config_path)

Load a config for read-only scenarios.

Source code in lb_app/services/config_service.py
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
def load_for_read(
    self, config_path: Optional[Path]
) -> Tuple[BenchmarkConfig, Optional[Path], Optional[Path]]:
    """Return a config for read-only use, with platform defaults applied.

    When no config path resolves, a default config is created; otherwise the
    resolved file is read and hydrated.

    Returns:
        ``(config, resolved_path_or_None, stale_pointer)``.
    """
    resolved, stale = self.resolve_config_path(config_path)
    if resolved is None:
        cfg = self.create_default_config()
    else:
        cfg = self._repo.read_benchmark_config(resolved)
        self._hydrate_config(cfg)

    # Both paths finish the same way: overlay the platform defaults.
    platform_cfg, _target, _exists = self.load_platform_config()
    self.apply_platform_defaults(cfg, platform_cfg)
    return cfg, resolved, stale
load_for_write
load_for_write(config_path, allow_create=True)

Load a config for mutation and return (config, target_path, stale_pointer, created_new).

Source code in lb_app/services/config_service.py
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
def load_for_write(
    self,
    config_path: Optional[Path],
    allow_create: bool = True,
) -> Tuple[BenchmarkConfig, Path, Optional[Path], bool]:
    """Return a config that is about to be modified.

    The target is the resolved path, or the repository's default target
    when nothing resolves. An existing target is read and hydrated; a
    missing one yields a default config after its parent is ensured.

    Returns:
        ``(config, target_path, stale_pointer, created_new)``.

    Raises:
        FileNotFoundError: If the target is missing and ``allow_create`` is
            false.
    """
    resolved, stale = self.resolve_config_path(config_path)
    target = resolved if resolved else self._repo.default_target

    if target.exists():
        existing = self._repo.read_benchmark_config(target)
        self._hydrate_config(existing)
        return existing, target, stale, False

    if not allow_create:
        raise FileNotFoundError(f"Config file not found: {target}")

    self._repo.ensure_parent(target)
    return self.create_default_config(), target, stale, True
load_platform_config
load_platform_config()

Load platform config from ~/.config/lb/platform.json (empty if missing).

Source code in lb_app/services/config_service.py
94
95
96
97
98
99
def load_platform_config(self) -> tuple[PlatformConfig, Path, bool]:
    """Read the platform config from the repository.

    Returns:
        ``(config, platform_target, found)``. When the repository has no
        platform config, a new empty ``PlatformConfig`` is returned and
        ``found`` is ``False``.
    """
    stored = self._repo.read_platform_config()
    found = stored is not None
    cfg = stored if found else PlatformConfig()
    return cfg, self._repo.platform_target, found
load_platform_for_write
load_platform_for_write()

Load platform config (create default if missing) for mutation.

Source code in lb_app/services/config_service.py
101
102
103
104
def load_platform_for_write(self) -> tuple[PlatformConfig, Path]:
    """Return ``(platform_config, target_path)`` for a pending modification.

    This is ``load_platform_config`` with the "found" flag dropped.
    """
    platform_cfg, platform_path, _found = self.load_platform_config()
    return platform_cfg, platform_path
open_editor
open_editor(config_path)

Open the resolved config file in the system editor.

Source code in lb_app/services/config_service.py
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
def open_editor(self, config_path: Optional[Path]) -> Path:
    """Open the resolved config file in the editor named by ``$EDITOR``.

    ``$EDITOR`` may be a bare program (``vim``), a path to an executable, or
    a command with arguments (``code --wait``). The config path is appended
    as the final argument. The editor's exit status is not checked.

    Args:
        config_path: Explicit config path, or ``None`` to use the normal
            resolution order of ``resolve_config_path``.

    Returns:
        The resolved config path that was handed to the editor.

    Raises:
        FileNotFoundError: If no config file could be resolved.
        EnvironmentError: If ``$EDITOR`` is unset, empty or only whitespace.
        RuntimeError: If ``$EDITOR`` cannot be parsed or the editor cannot be
            launched.
    """
    # Local imports: only this method needs them.
    import shlex
    import shutil

    resolved, _stale = self.resolve_config_path(config_path)
    if resolved is None:
        raise FileNotFoundError(
            "No config file found to edit. Run `lb config init` first."
        )

    editor = os.environ.get("EDITOR", "")
    if shutil.which(editor):
        # The whole value names an executable (possibly a path containing
        # spaces): run it verbatim, exactly as before.
        command = [editor]
    else:
        # Otherwise treat the value as a command line so that editors
        # configured with arguments, e.g. "code --wait", can be launched.
        try:
            command = shlex.split(editor)
        except ValueError as exc:
            raise RuntimeError(f"Failed to launch editor: {exc}") from exc
    if not command:
        raise EnvironmentError(f"Set $EDITOR or open the file manually: {resolved}")

    try:
        subprocess.run([*command, str(resolved)], check=False)
    except Exception as exc:
        raise RuntimeError(f"Failed to launch editor: {exc}") from exc

    return resolved
read_saved_config_path
read_saved_config_path()

Return (resolved_path, stale_path) from the pointer file, if any.

Source code in lb_app/services/config_service.py
265
266
267
def read_saved_config_path(self) -> Tuple[Optional[Path], Optional[Path]]:
    """Return the repository's ``(resolved_path, stale_path)`` pointer pair."""
    pointer_pair = self._repo.read_saved_config_path()
    return pointer_pair
remove_plugin
remove_plugin(name, config)

Remove a plugin's workload and settings from a config file.

Returns (config, target_path, stale_pointer, removed_flag).

Source code in lb_app/services/config_service.py
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
def remove_plugin(
    self,
    name: str,
    config: Optional[Path],
) -> Tuple[BenchmarkConfig, Path, Optional[Path], bool]:
    """Drop ``name`` from the config's workloads and plugin settings.

    The config must already exist (it is loaded with ``allow_create=False``)
    and is written back whether or not anything was removed.

    Returns:
        ``(config, target_path, stale_pointer, removed_flag)`` where the flag
        is true when ``name`` was present in either mapping.
    """
    cfg, target, stale, _created = self.load_for_write(config, allow_create=False)

    removed = False
    # Workloads first, then plugin settings; either hit counts as a removal.
    for section in (cfg.workloads, cfg.plugin_settings):
        if name in section:
            section.pop(name, None)
            removed = True

    self._repo.write_benchmark_config(cfg, target)
    return cfg, target, stale, removed
remove_remote_host
remove_remote_host(name, config)

Remove a remote host by name from the config.

Returns (config, target_path, stale_pointer, removed_flag).

Source code in lb_app/services/config_service.py
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
def remove_remote_host(
    self,
    name: str,
    config: Optional[Path],
) -> Tuple[BenchmarkConfig, Path, Optional[Path], bool]:
    """Delete every remote host called ``name`` and persist the config.

    The config must already exist (it is loaded with ``allow_create=False``).
    Remote execution is switched off when the host list ends up empty.

    Returns:
        ``(config, target_path, stale_pointer, removed_flag)``.
    """
    cfg, target, stale, _created = self.load_for_write(config, allow_create=False)

    count_before = len(cfg.remote_hosts)
    cfg.remote_hosts = [entry for entry in cfg.remote_hosts if entry.name != name]
    removed = count_before > len(cfg.remote_hosts)

    # With no hosts left there is nothing to execute on remotely.
    if len(cfg.remote_hosts) == 0:
        cfg.remote_execution.enabled = False

    self._repo.write_benchmark_config(cfg, target)
    return cfg, target, stale, removed
resolve_config_path
resolve_config_path(config_path)

Return (resolved_config, stale_pointer_target). Respects explicit path, environment variable LB_CONFIG_PATH, stored pointer, or local benchmark_config.json.

Source code in lb_app/services/config_service.py
58
59
60
61
62
63
64
65
66
def resolve_config_path(
    self, config_path: Optional[Path]
) -> Tuple[Optional[Path], Optional[Path]]:
    """Ask the repository which config file applies.

    Returns:
        ``(resolved_config, stale_pointer_target)``. Resolution is performed
        by the repository and is documented as honouring an explicit path,
        the ``LB_CONFIG_PATH`` environment variable, the stored pointer, or
        a local ``benchmark_config.json``.
    """
    resolution = self._repo.resolve_config_path(config_path)
    return resolution
set_plugin_enabled
set_plugin_enabled(name, enabled)

Enable/disable a plugin in platform config.

Source code in lb_app/services/config_service.py
106
107
108
109
110
111
112
113
114
115
def set_plugin_enabled(
    self,
    name: str,
    enabled: bool,
) -> tuple[PlatformConfig, Path]:
    """Set plugin ``name`` to ``enabled`` in the platform config and save it.

    Returns:
        ``(platform_config, target_path)`` after the write.
    """
    platform_cfg, platform_path = self.load_platform_for_write()
    platform_cfg.plugins[name] = enabled
    self._repo.write_platform_config(platform_cfg, platform_path)
    return platform_cfg, platform_path
set_plugin_selection
set_plugin_selection(selection, registry)

Persist plugin selection to platform config.

Source code in lb_app/services/config_service.py
117
118
119
120
121
122
123
124
125
126
def set_plugin_selection(
    self,
    selection: set[str],
    registry: PluginRegistry,
) -> tuple[PlatformConfig, Path]:
    """Replace the platform config's plugin map with the given selection.

    Every plugin the registry reports as available gets an entry: ``True``
    when its name is in ``selection``, ``False`` otherwise.

    Returns:
        ``(platform_config, target_path)`` after the write.
    """
    platform_cfg, platform_path = self.load_platform_for_write()

    enabled_by_name = {}
    for plugin_name in registry.available():
        enabled_by_name[plugin_name] = plugin_name in selection
    platform_cfg.plugins = enabled_by_name

    self._repo.write_platform_config(platform_cfg, platform_path)
    return platform_cfg, platform_path
write_saved_config_path
write_saved_config_path(path)

Persist a pointer to the preferred config path.

Source code in lb_app/services/config_service.py
261
262
263
def write_saved_config_path(self, path: Path) -> None:
    """Delegate to the repository to store ``path`` as the saved config pointer."""
    self._repo.write_saved_config_path(path)

DashboardHandle

Bases: Protocol

Handle for a live dashboard visualization.

Functions
add_log
add_log(line)

Add a log line to the dashboard.

Source code in lb_app/ui_interfaces.py
15
16
def add_log(self, line: str) -> None:
    """Add a log line to the dashboard.

    Protocol method: no behaviour is defined here, implementers supply it.

    Args:
        line: The log line to add.
    """
live
live()

Return a context manager that keeps the dashboard live.

Source code in lb_app/ui_interfaces.py
12
13
def live(self) -> AbstractContextManager[None]:
    """Return a context manager that keeps the dashboard live.

    Protocol method: no behaviour is defined here, implementers supply it.
    """
mark_event
mark_event(source)

Mark an event occurrence (e.g., visual flash).

Source code in lb_app/ui_interfaces.py
21
22
def mark_event(self, source: str) -> None:
    """Mark an event occurrence (e.g., visual flash).

    Protocol method: no behaviour is defined here, implementers supply it.

    Args:
        source: Identifies where the event came from.
    """
refresh
refresh()

Trigger a refresh of the dashboard.

Source code in lb_app/ui_interfaces.py
18
19
def refresh(self) -> None:
    """Trigger a refresh of the dashboard.

    Protocol method: no behaviour is defined here, implementers supply it.
    """

DashboardLogMetadata dataclass

DashboardLogMetadata(title='Log Stream')
Attributes
title class-attribute instance-attribute
title = 'Log Stream'

DashboardRow dataclass

DashboardRow(host, workload, intensity, status, progress, current_action, last_rep_time)
Attributes
current_action instance-attribute
current_action
host instance-attribute
host
intensity instance-attribute
intensity
last_rep_time instance-attribute
last_rep_time
progress instance-attribute
progress
status instance-attribute
status
workload instance-attribute
workload

DashboardSnapshot dataclass

DashboardSnapshot(run_id, rows, row_count, plan_rows, intensity_map, status_summary, log_metadata)
Attributes
intensity_map instance-attribute
intensity_map
log_metadata instance-attribute
log_metadata
plan_rows instance-attribute
plan_rows
row_count instance-attribute
row_count
rows instance-attribute
rows
run_id instance-attribute
run_id
status_summary instance-attribute
status_summary

DashboardStatusSummary dataclass

DashboardStatusSummary(total, completed, running, failed, skipped, pending)
Attributes
completed instance-attribute
completed
failed instance-attribute
failed
pending instance-attribute
pending
running instance-attribute
running
skipped instance-attribute
skipped
total instance-attribute
total

DashboardViewModel

DashboardViewModel(plan, journal)

Build dashboard snapshots from mutable run state.

Source code in lb_app/viewmodels/dashboard.py
53
54
55
56
57
def __init__(self, plan: list[dict[str, Any]], journal: RunJournal) -> None:
    """Keep the journal and precompute the plan-derived, per-run data.

    The plan rows and the intensity map are computed once here from
    ``plan``; the journal is stored by reference.
    """
    self._journal = journal
    plan_rows = run_viewmodels.plan_rows(plan)
    self._plan_rows = plan_rows
    intensity_map = _build_intensity_map(plan)
    self._intensity_map = intensity_map
    self._log_metadata = DashboardLogMetadata()
Attributes
run_id property
run_id
Functions
snapshot
snapshot()
Source code in lb_app/viewmodels/dashboard.py
63
64
65
66
67
68
69
70
71
72
73
74
def snapshot(self) -> DashboardSnapshot:
    """Build a ``DashboardSnapshot`` from the journal's current contents.

    Rows and the status summary are recomputed on every call; plan rows,
    intensity map and log metadata are the values stored on the instance.
    """
    journal = self._journal
    intensity_map = self._intensity_map
    rows = _build_journal_rows(journal, intensity_map)
    summary = _summarize_statuses(journal.tasks.values())
    return DashboardSnapshot(
        run_id=journal.run_id,
        rows=rows,
        row_count=len(rows),
        plan_rows=self._plan_rows,
        intensity_map=intensity_map,
        status_summary=summary,
        log_metadata=self._log_metadata,
    )

DoctorCheckGroup dataclass

DoctorCheckGroup(title, items, failures)
Attributes
failures instance-attribute
failures
items instance-attribute
items
title instance-attribute
title

DoctorCheckItem dataclass

DoctorCheckItem(label, ok, required)
Attributes
label instance-attribute
label
ok instance-attribute
ok
required instance-attribute
required

DoctorReport dataclass

DoctorReport(groups, info_messages, total_failures)
Attributes
groups instance-attribute
groups
info_messages instance-attribute
info_messages
total_failures instance-attribute
total_failures

DoctorService

DoctorService(config_service=None)

Service to check local prerequisites and environment health.

Source code in lb_app/services/doctor_service.py
22
23
24
25
26
def __init__(
    self,
    config_service: Optional[ConfigService] = None,
):
    """Store the given config service, or create one when none is supplied.

    A falsy ``config_service`` (including ``None``) is replaced by a new
    ``ConfigService()``.
    """
    if not config_service:
        config_service = ConfigService()
    self.config_service = config_service
Attributes
config_service instance-attribute
config_service = config_service or ConfigService()
Functions
check_all
check_all()

Run all checks.

Source code in lb_app/services/doctor_service.py
204
205
206
207
208
209
210
211
212
213
214
def check_all(self) -> DoctorReport:
    """Run the controller, local-tool and Multipass checks as one report.

    The three partial reports are concatenated in that order and their
    failure counts are added up. Remote host checks are not included.
    """
    controller = self.check_controller()
    local_tools = self.check_local_tools()
    multipass = self.check_multipass()

    combined_groups = controller.groups + local_tools.groups + multipass.groups
    combined_info = (
        controller.info_messages
        + local_tools.info_messages
        + multipass.info_messages
    )
    combined_failures = (
        controller.total_failures
        + local_tools.total_failures
        + multipass.total_failures
    )
    return DoctorReport(
        groups=combined_groups,
        info_messages=combined_info,
        total_failures=combined_failures,
    )
check_controller
check_controller()

Check controller-side requirements (Python deps, ansible-runner).

Source code in lb_app/services/doctor_service.py
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
def check_controller(self) -> DoctorReport:
    """Check what the controller side needs: Python modules, tools, config.

    Three groups are produced, in order: "Python Dependencies",
    "Controller Tools" and "Config Resolution". One info message describes
    the running Python interpreter and operating system.
    """
    # Required modules share their label with their import name.
    required_modules = ("psutil", "pandas", "numpy", "matplotlib", "seaborn", "jc")
    py_deps = [
        (module, self._check_import(module), True) for module in required_modules
    ]
    # The InfluxDB client is checked too but marked as not required.
    py_deps.append(
        (
            "influxdb-client (optional)",
            self._check_import("influxdb_client"),
            False,
        )
    )
    python_group = self._build_check_group("Python Dependencies", py_deps)

    controller_tools = [
        ("ansible-runner (python)", self._check_import("ansible_runner"), True),
        ("ansible-playbook", self._check_command("ansible-playbook"), True),
    ]
    tools_group = self._build_check_group("Controller Tools", controller_tools)

    # Neither config item is marked as required.
    resolved, stale = self.config_service.resolve_config_path(None)
    cfg_items = [
        ("Active config", resolved is not None, False),
        ("Stale default path", stale is None, False),
    ]
    config_group = self._build_check_group("Config Resolution", cfg_items)

    groups = [python_group, tools_group, config_group]
    info = (
        f"Python: {platform.python_version()} ({platform.python_implementation()}) "
        f"on {platform.system()} {platform.release()}"
    )

    return DoctorReport(
        groups=groups,
        info_messages=[info],
        total_failures=sum(group.failures for group in groups),
    )
check_local_tools
check_local_tools()

Check local workload tools required by installed plugins.

Source code in lb_app/services/doctor_service.py
90
91
92
93
94
95
def check_local_tools(self) -> DoctorReport:
    """Report on the common tool items plus those derived from the plugin registry."""
    registry = create_registry()
    tool_items = self._common_tool_items()
    plugin_items = self._plugin_tool_items(registry)
    tool_items.extend(plugin_items)
    return self._local_tools_report(tool_items)
check_multipass
check_multipass()

Check if Multipass is installed (used by integration test).

Source code in lb_app/services/doctor_service.py
129
130
131
132
133
134
135
def check_multipass(self) -> DoctorReport:
    """Report whether the ``multipass`` command is available.

    The report holds a single "Multipass" group with one required item and
    no info messages.
    """
    multipass_item = ("multipass", self._check_command("multipass"), True)
    group = self._build_check_group("Multipass", [multipass_item])
    return DoctorReport(
        groups=[group],
        info_messages=[],
        total_failures=group.failures,
    )
check_remote_hosts
check_remote_hosts(config=None, timeout_seconds=10)

Check SSH connectivity to configured remote hosts.

Parameters:

Name Type Description Default
config Optional[BenchmarkConfig]

Benchmark configuration with remote hosts. If None, loads from default config path.

None
timeout_seconds int

Timeout for each host connection check.

10

Returns:

Type Description
DoctorReport

DoctorReport with connectivity results for each host.

Source code in lb_app/services/doctor_service.py
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
def check_remote_hosts(
    self,
    config: Optional[BenchmarkConfig] = None,
    timeout_seconds: int = 10,
) -> DoctorReport:
    """Check connectivity to the remote hosts listed in the config.

    Args:
        config: Benchmark configuration holding the remote hosts. It is
            passed through ``self._resolve_config`` first, so ``None`` is
            accepted.
        timeout_seconds: Timeout handed to the connectivity check.

    Returns:
        A report with one "Remote Host Connectivity" group, or an empty
        report carrying a single info message when no hosts are configured.
    """
    cfg = self._resolve_config(config)
    if not cfg.remote_hosts:
        # Nothing to probe: report that fact instead of an empty group.
        return DoctorReport(
            groups=[],
            info_messages=["No remote hosts configured."],
            total_failures=0,
        )

    connectivity = self._check_connectivity(cfg.remote_hosts, timeout_seconds)
    host_items = self._connectivity_items(connectivity)
    group = self._build_check_group("Remote Host Connectivity", host_items)
    messages = self._connectivity_messages(connectivity, timeout_seconds)

    return DoctorReport(
        groups=[group],
        info_messages=messages,
        total_failures=group.failures,
    )

NoOpDashboardHandle

Bases: DashboardHandle

No-op dashboard handle.

Functions
add_log
add_log(line)
Source code in lb_app/ui_interfaces.py
100
101
102
def add_log(self, line: str) -> None:  # pragma: no cover - trivial
    """Discard ``line``; the no-op handle keeps no log."""
    _ = line
live
live()
Source code in lb_app/ui_interfaces.py
97
98
def live(self) -> AbstractContextManager[None]:
    """Return a ``nullcontext()``: entering and leaving it does nothing."""
    return nullcontext()
mark_event
mark_event(source)
Source code in lb_app/ui_interfaces.py
107
108
109
def mark_event(self, source: str) -> None:  # pragma: no cover - trivial
    """Ignore the event; ``source`` is accepted and dropped."""
    _ = source
refresh
refresh()
Source code in lb_app/ui_interfaces.py
104
105
def refresh(self) -> None:  # pragma: no cover - trivial
    """Do nothing; there is no dashboard to redraw."""
    return None

NoOpProgressHandle

Bases: ProgressHandle

No-op progress handle.

Functions
finish
finish()
Source code in lb_app/ui_interfaces.py
90
91
def finish(self) -> None:  # pragma: no cover - trivial
    """Do nothing; the no-op handle tracks no progress."""
    return None
update
update(completed)
Source code in lb_app/ui_interfaces.py
86
87
88
def update(self, completed: int) -> None:  # pragma: no cover - trivial
    """Accept a completed amount and discard it."""
    del completed  # intentionally unused
    return None

NoOpUIAdapter

Bases: UIAdapter

No-op UI adapter that discards all output.

Functions
create_dashboard
create_dashboard(plan, journal, ui_log_file=None)

Create a run dashboard.

Source code in lb_app/ui_interfaces.py
157
158
159
160
161
162
163
164
165
def create_dashboard(
    self,
    plan: list[dict[str, Any]],
    journal: Any,
    ui_log_file: IO[str] | None = None,
) -> DashboardHandle:
    """Create a run dashboard.

    All arguments are ignored; a fresh ``NoOpDashboardHandle`` is returned.
    """
    del plan, journal, ui_log_file  # intentionally unused
    handle = NoOpDashboardHandle()
    return handle
create_progress
create_progress(description, total)

Create a progress task.

Source code in lb_app/ui_interfaces.py
152
153
154
155
def create_progress(self, description: str, total: int) -> ProgressHandle:
    """Create a progress task.

    Both arguments are ignored; a fresh ``NoOpProgressHandle`` is returned.
    """
    del description, total  # intentionally unused
    handle = NoOpProgressHandle()
    return handle
prompt_multipass_scenario
prompt_multipass_scenario(options, default_level)

Prompt user for multipass scenario selection.

Source code in lb_app/ui_interfaces.py
167
168
169
170
171
172
def prompt_multipass_scenario(
    self, options: list[str], default_level: str
) -> tuple[str, str] | None:  # pragma: no cover - trivial
    """Prompt user for multipass scenario selection."""
    _ = (options, default_level)
    return None
show_error
show_error(message)

Render an error message.

Source code in lb_app/ui_interfaces.py
123
124
125
def show_error(self, message: str) -> None:  # pragma: no cover - trivial
    """Render an error message (discarded by the no-op adapter)."""
    del message  # intentionally unused
show_info
show_info(message)

Render an informational message.

Source code in lb_app/ui_interfaces.py
115
116
117
def show_info(self, message: str) -> None:  # pragma: no cover - trivial
    """Render an informational message (discarded by the no-op adapter)."""
    del message  # intentionally unused
show_panel
show_panel(message, title=None, border_style=None)

Render a block/panel container.

Source code in lb_app/ui_interfaces.py
131
132
133
134
135
def show_panel(
    self, message: str, title: str | None = None, border_style: str | None = None
) -> None:  # pragma: no cover - trivial
    """Render a block/panel container."""
    _ = (message, title, border_style)
show_rule
show_rule(title)

Render a horizontal rule with a title.

Source code in lb_app/ui_interfaces.py
137
138
139
def show_rule(self, title: str) -> None:  # pragma: no cover - trivial
    """Render a horizontal rule with a title (discarded by the no-op adapter)."""
    del title  # intentionally unused
show_success
show_success(message)

Render a success message.

Source code in lb_app/ui_interfaces.py
127
128
129
def show_success(self, message: str) -> None:  # pragma: no cover - trivial
    """Render a success message (discarded by the no-op adapter)."""
    del message  # intentionally unused
show_table
show_table(title, columns, rows)

Render a simple table.

Source code in lb_app/ui_interfaces.py
141
142
143
144
145
def show_table(
    self, title: str, columns: Sequence[str], rows: list[Sequence[str]]
) -> None:  # pragma: no cover - trivial
    """Render a simple table (discarded by the no-op adapter)."""
    del title, columns, rows  # intentionally unused
show_warning
show_warning(message)

Render a warning message.

Source code in lb_app/ui_interfaces.py
119
120
121
def show_warning(self, message: str) -> None:  # pragma: no cover - trivial
    """Render a warning message (discarded by the no-op adapter)."""
    del message  # intentionally unused
status
status(message)

Context manager that shows a status/spinner while work is running.

Source code in lb_app/ui_interfaces.py
147
148
149
150
def status(self, message: str) -> AbstractContextManager[None]:
    """Context manager that shows a status/spinner while work is running.

    The no-op adapter ignores ``message`` and hands back a ``nullcontext``.
    """
    del message  # intentionally unused
    inert_context: AbstractContextManager[None] = nullcontext()
    return inert_context

ProgressHandle

Bases: Protocol

Progress task handle for updating completion.

Functions
finish
finish()

Mark the task as finished and flush any pending output.

Source code in lb_app/ui_interfaces.py
31
32
def finish(self) -> None:
    """Mark the task as finished and flush any pending output.

    Protocol stub: no behavior is defined here; implementations supply it.
    """
update
update(completed)

Update the task with the absolute completed amount.

Source code in lb_app/ui_interfaces.py
28
29
def update(self, completed: int) -> None:
    """Update the task with the absolute completed amount.

    Protocol stub: no behavior is defined here; implementations supply it.

    Args:
        completed: Absolute amount completed so far (not an increment).
    """

ProvisionConfigSummary dataclass

ProvisionConfigSummary(loki_datasource_id, datasources_configured, dashboards_configured, warnings=tuple())

Summary of Grafana configuration.

Attributes
dashboards_configured instance-attribute
dashboards_configured
datasources_configured instance-attribute
datasources_configured
loki_datasource_id instance-attribute
loki_datasource_id
warnings class-attribute instance-attribute
warnings = field(default_factory=tuple)
Functions
from_grafana classmethod
from_grafana(summary, warnings=())
Source code in lb_app/services/provision_service.py
41
42
43
44
45
46
47
48
49
50
51
52
@classmethod
def from_grafana(
    cls,
    summary: GrafanaConfigSummary,
    warnings: tuple[str, ...] = (),
) -> "ProvisionConfigSummary":
    """Build a provisioning summary from a Grafana configuration summary.

    Args:
        summary: Source object; its ``loki_datasource_id``,
            ``datasources_configured`` and ``dashboards_configured``
            attributes are copied.
        warnings: Warning messages to attach to the new instance.

    Returns:
        A new instance of ``cls`` populated from ``summary`` and ``warnings``.
    """
    copied_fields = {
        "loki_datasource_id": summary.loki_datasource_id,
        "datasources_configured": summary.datasources_configured,
        "dashboards_configured": summary.dashboards_configured,
    }
    return cls(**copied_fields, warnings=warnings)

ProvisionService

ProvisionService(config_service)

Provision Loki + Grafana and configure dashboards.

Source code in lb_app/services/provision_service.py
58
59
60
def __init__(self, config_service: ConfigService) -> None:
    """Store the configuration service and create a logger for this module.

    Args:
        config_service: Kept on the instance as ``_config_service``.
    """
    self._config_service = config_service
    self._logger = logging.getLogger(__name__)
Functions
install_loki_grafana
install_loki_grafana(*, mode, config_path, grafana_url, grafana_api_key, grafana_admin_user, grafana_admin_password, grafana_token_name, grafana_org_id, loki_endpoint, configure_assets=True)
Source code in lb_app/services/provision_service.py
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
def install_loki_grafana(
    self,
    *,
    mode: str,
    config_path: Path | None,
    grafana_url: str | None,
    grafana_api_key: str | None,
    grafana_admin_user: str | None,
    grafana_admin_password: str | None,
    grafana_token_name: str | None,
    grafana_org_id: int | None,
    loki_endpoint: str | None,
    configure_assets: bool = True,
) -> ProvisionConfigSummary | None:
    """Install Loki + Grafana, persist the resolved settings, optionally configure Grafana.

    Steps, in order: resolve the platform settings, run the installer,
    persist the resolved values, and (unless ``configure_assets`` is false)
    collect Grafana assets and configure Grafana with them.

    Args:
        mode: Forwarded to the installer as ``mode``.
        config_path: Forwarded to ``_collect_grafana_assets``.
        grafana_url: Input to ``_resolve_platform_settings``.
        grafana_api_key: Input to ``_resolve_platform_settings``.
        grafana_admin_user: Input to ``_resolve_admin_credentials``.
        grafana_admin_password: Input to ``_resolve_admin_credentials``.
        grafana_token_name: Forwarded to ``configure_grafana``.
        grafana_org_id: Input to ``_resolve_platform_settings``.
        loki_endpoint: Input to ``_resolve_platform_settings``.
        configure_assets: When false, stop after installing and persisting.

    Returns:
        ``None`` when ``configure_assets`` is false; otherwise a
        ``ProvisionConfigSummary`` built from the ``configure_grafana``
        result plus the warnings gathered while collecting assets.
    """
    resolved_loki, resolved_grafana, resolved_api_key, resolved_org_id = (
        self._resolve_platform_settings(
            grafana_url=grafana_url,
            grafana_api_key=grafana_api_key,
            grafana_org_id=grafana_org_id,
            loki_endpoint=loki_endpoint,
        )
    )

    # Bare name: inside a method this resolves to the module-level callable,
    # not to this method.
    install_loki_grafana(mode=mode)

    # Persisting happens before the configure_assets check, so it runs even
    # when asset configuration is skipped.
    self._persist_observability_defaults(
        loki_endpoint=resolved_loki,
        grafana_url=resolved_grafana,
        grafana_api_key=resolved_api_key,
        grafana_org_id=resolved_org_id,
    )

    if not configure_assets:
        return None
    # The admin credential parameters are rebound to the resolved values.
    grafana_admin_user, grafana_admin_password = self._resolve_admin_credentials(
        resolved_api_key, grafana_admin_user, grafana_admin_password
    )
    registry = create_registry()
    enabled_map = self._enabled_plugin_map(registry)
    assets, warnings = self._collect_grafana_assets(
        registry, enabled_map, config_path
    )

    summary = configure_grafana(
        grafana_url=resolved_grafana,
        grafana_api_key=resolved_api_key,
        grafana_admin_user=grafana_admin_user,
        grafana_admin_password=grafana_admin_password,
        grafana_token_name=grafana_token_name,
        grafana_org_id=resolved_org_id,
        loki_endpoint=resolved_loki,
        assets=assets,
    )
    # from_grafana declares warnings as a tuple, hence the conversion.
    return ProvisionConfigSummary.from_grafana(summary, warnings=tuple(warnings))
remove_loki_grafana
remove_loki_grafana(*, remove_data=False)
Source code in lb_app/services/provision_service.py
221
222
def remove_loki_grafana(self, *, remove_data: bool = False) -> None:
    """Forward to the module-level ``remove_loki_grafana`` callable.

    Args:
        remove_data: Passed through unchanged as ``remove_data``.
    """
    # Bare name: inside a method this resolves to the module-level callable,
    # not to this method.
    remove_loki_grafana(remove_data=remove_data)
status_loki_grafana
status_loki_grafana(*, grafana_url, grafana_api_key, grafana_org_id, loki_endpoint)
Source code in lb_app/services/provision_service.py
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
def status_loki_grafana(
    self,
    *,
    grafana_url: str | None,
    grafana_api_key: str | None,
    grafana_org_id: int | None,
    loki_endpoint: str | None,
) -> ProvisionStatus:
    """Report whether Loki and Grafana are ready.

    The four inputs are first passed through ``_resolve_platform_settings``;
    the readiness checks then use the resolved values.

    Returns:
        A ``ProvisionStatus`` carrying the resolved Loki and Grafana URLs
        and the result of each readiness check.
    """
    settings = self._resolve_platform_settings(
        grafana_url=grafana_url,
        grafana_api_key=grafana_api_key,
        grafana_org_id=grafana_org_id,
        loki_endpoint=loki_endpoint,
    )
    loki_target, grafana_target, api_key, org_id = settings

    # Loki is probed first, then Grafana.
    loki_is_ready = check_loki_ready(loki_target)
    grafana_is_ready = check_grafana_ready(
        grafana_target, api_key=api_key, org_id=org_id
    )
    return ProvisionStatus(
        loki_url=loki_target,
        grafana_url=grafana_target,
        loki_ready=loki_is_ready,
        grafana_ready=grafana_is_ready,
    )

ProvisionStatus dataclass

ProvisionStatus(loki_url, grafana_url, loki_ready, grafana_ready)

Health status for Loki and Grafana.

Attributes
grafana_ready instance-attribute
grafana_ready
grafana_url instance-attribute
grafana_url
loki_ready instance-attribute
loki_ready
loki_url instance-attribute
loki_url

RunContext dataclass

RunContext(config, target_tests, registry, config_path=None, debug=False, resume_from=None, resume_latest=False, stop_file=None, execution_mode='remote', node_count=None)

Inputs required to execute a run.

Attributes
config instance-attribute
config
config_path class-attribute instance-attribute
config_path = None
debug class-attribute instance-attribute
debug = False
execution_mode class-attribute instance-attribute
execution_mode = 'remote'
node_count class-attribute instance-attribute
node_count = None
registry instance-attribute
registry
resume_from class-attribute instance-attribute
resume_from = None
resume_latest class-attribute instance-attribute
resume_latest = False
stop_file class-attribute instance-attribute
stop_file = None
target_tests instance-attribute
target_tests

RunRequest dataclass

RunRequest(config, tests, run_id=None, resume=None, debug=False, intensity=None, setup=True, stop_file=None, execution_mode='remote', repetitions=None, node_count=1, docker_engine='docker', ui_adapter=None, skip_connectivity_check=False, connectivity_timeout=10)

Inputs required to start a run from the UI.

Attributes
config instance-attribute
config
connectivity_timeout class-attribute instance-attribute
connectivity_timeout = 10
debug class-attribute instance-attribute
debug = False
docker_engine class-attribute instance-attribute
docker_engine = 'docker'
execution_mode class-attribute instance-attribute
execution_mode = 'remote'
intensity class-attribute instance-attribute
intensity = None
node_count class-attribute instance-attribute
node_count = 1
repetitions class-attribute instance-attribute
repetitions = None
resume class-attribute instance-attribute
resume = None
run_id class-attribute instance-attribute
run_id = None
setup class-attribute instance-attribute
setup = True
skip_connectivity_check class-attribute instance-attribute
skip_connectivity_check = False
stop_file class-attribute instance-attribute
stop_file = None
tests instance-attribute
tests
ui_adapter class-attribute instance-attribute
ui_adapter = None

RunResult dataclass

RunResult(context, summary, journal_path=None, log_path=None, ui_log_path=None)

Outcome of a run.

Attributes
context instance-attribute
context
journal_path class-attribute instance-attribute
journal_path = None
log_path class-attribute instance-attribute
log_path = None
summary instance-attribute
summary
ui_log_path class-attribute instance-attribute
ui_log_path = None

RunService

RunService(registry_factory)

Coordinate benchmark execution for CLI commands.

Source code in lb_app/services/run_service.py
42
43
44
45
46
47
48
49
50
51
52
53
54
def __init__(self, registry_factory: Callable[[], PluginRegistry]):
    """Create the collaborators used to build and execute runs.

    Args:
        registry_factory: Zero-argument callable returning a
            ``PluginRegistry``. It is stored on the instance and also
            handed to the session manager and the context builder.
    """
    self._registry_factory = registry_factory
    self._progress_token = "LB_EVENT"
    self._execution_loop = RunExecutionLoop()
    self._session_manager = SessionManager(registry_factory)
    self._context_builder = RunContextBuilder(registry_factory)
    self._log_attachment_service = ControllerLogAttachmentService()
    # Built last: the coordinator receives the collaborators created above.
    self._execution_coordinator = RunExecutionCoordinator(
        session_manager=self._session_manager,
        execution_loop=self._execution_loop,
        progress_token=self._progress_token,
        log_attachment_service=self._log_attachment_service,
    )
Functions
apply_overrides staticmethod
apply_overrides(cfg, intensity, debug)

Apply CLI-driven overrides to the configuration.

Source code in lb_app/services/run_service.py
78
79
80
81
82
83
@staticmethod
def apply_overrides(
    cfg: BenchmarkConfig, intensity: str | None, debug: bool
) -> None:
    """Apply CLI-driven overrides to the configuration.

    Forwards to the module-level ``apply_overrides`` callable (a bare name
    inside a method does not resolve to the class attribute).

    Args:
        cfg: Configuration handed to the underlying function. Nothing is
            returned, so presumably it is modified in place (confirm
            against the callee).
        intensity: Forwarded as ``intensity``.
        debug: Forwarded as ``debug``.
    """
    apply_overrides(cfg, intensity=intensity, debug=debug)
build_context
build_context(cfg, tests, config_path=None, debug=False, resume=None, stop_file=None, execution_mode='remote', node_count=None)

Compute the run context and registry.

Source code in lb_app/services/run_service.py
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
def build_context(
    self,
    cfg: BenchmarkConfig,
    tests: Optional[List[str]],
    config_path: Optional[Path] = None,
    debug: bool = False,
    resume: Optional[str] = None,
    stop_file: Optional[Path] = None,
    execution_mode: str = "remote",
    node_count: int | None = None,
) -> RunContext:
    """Compute the run context and registry.

    Thin wrapper: every argument is forwarded unchanged to
    ``self._context_builder.build_context`` and its result is returned.
    """
    return self._context_builder.build_context(
        cfg,
        tests,
        config_path=config_path,
        debug=debug,
        resume=resume,
        stop_file=stop_file,
        execution_mode=execution_mode,
        node_count=node_count,
    )
create_session
create_session(config_service, tests=None, config_path=None, run_id=None, resume=None, repetitions=None, debug=False, intensity=None, ui_adapter=None, setup=True, stop_file=None, execution_mode='remote', node_count=None, preloaded_config=None)

Orchestrate the creation of a RunContext from raw inputs.

This method consolidates configuration loading, overrides, and context building.

Source code in lb_app/services/run_service.py
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
def create_session(
    self,
    config_service: Any,
    tests: Optional[List[str]] = None,
    config_path: Optional[Path] = None,
    run_id: Optional[str] = None,
    resume: Optional[str] = None,
    repetitions: Optional[int] = None,
    debug: bool = False,
    intensity: Optional[str] = None,
    ui_adapter: UIAdapter | None = None,
    setup: bool = True,
    stop_file: Optional[Path] = None,
    execution_mode: str = "remote",
    node_count: int | None = None,
    preloaded_config: BenchmarkConfig | None = None,
) -> RunContext:
    """
    Orchestrate the creation of a RunContext from raw inputs.

    This method consolidates configuration loading, overrides, and context building.

    The work itself is done by ``self._context_builder.create_session``:
    every argument is forwarded unchanged and its result is returned.
    """
    return self._context_builder.create_session(
        config_service,
        tests=tests,
        config_path=config_path,
        run_id=run_id,
        resume=resume,
        repetitions=repetitions,
        debug=debug,
        intensity=intensity,
        ui_adapter=ui_adapter,
        setup=setup,
        stop_file=stop_file,
        execution_mode=execution_mode,
        node_count=node_count,
        preloaded_config=preloaded_config,
    )
execute
execute(context, run_id, output_callback=None, ui_adapter=None)

Execute benchmarks using remote hosts provisioned upstream.

Source code in lb_app/services/run_service.py
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
def execute(
    self,
    context: RunContext,
    run_id: Optional[str],
    output_callback: Optional[Callable[[str, str], None]] = None,
    ui_adapter: UIAdapter | None = None,
) -> RunResult:
    """Execute benchmarks using remote hosts provisioned upstream.

    Args:
        context: Run inputs; ``config.remote_hosts``, ``stop_file`` and
            ``debug`` are read here.
        run_id: Forwarded to ``_run_remote``.
        output_callback: Sink for output text. When omitted, debug runs
            print the raw text and other runs use the Ansible formatter.
        ui_adapter: Forwarded to ``_run_remote``.

    Returns:
        Whatever ``_run_remote`` returns.

    Raises:
        ValueError: If the configuration has no remote hosts.
    """
    if not context.config.remote_hosts:
        raise ValueError(
            "No remote hosts available. Provision nodes before running benchmarks."
        )

    stop_token = StopToken(stop_file=context.stop_file, enable_signals=False)
    output_formatter = AnsibleOutputFormatter()
    output_formatter.emit_task_timings = False
    output_formatter.emit_task_starts = False

    def _debug_printer(text: str, end: str = ""):
        # In debug mode, print everything raw for troubleshooting
        print(text, end=end, flush=True)

    # An explicit callback wins; otherwise choose by debug flag.
    if output_callback is not None:
        sink = output_callback
    elif context.debug:
        sink = _debug_printer
    else:
        sink = output_formatter.process

    return self._run_remote(
        context,
        run_id,
        sink,
        output_formatter,
        ui_adapter,
        stop_token=stop_token,
        emit_timing=False,
    )
get_run_plan
get_run_plan(cfg, tests, execution_mode='remote', registry=None, platform_config=None)

Build a detailed plan for the workloads to be run.

Returns a list of dictionaries containing status, intensity, details, etc.

Source code in lb_app/services/run_service.py
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
def get_run_plan(
    self,
    cfg: BenchmarkConfig,
    tests: List[str],
    execution_mode: str = "remote",
    registry: PluginRegistry | None = None,
    platform_config: PlatformConfig | None = None,
) -> List[Dict[str, Any]]:
    """
    Build a detailed plan for the workloads to be run.

    Returns a list of dictionaries containing status, intensity, details, etc.

    Args:
        cfg: Benchmark configuration forwarded to ``build_run_plan``.
        tests: Names of the tests to plan.
        execution_mode: Forwarded as ``execution_mode``.
        registry: Registry to use; when ``None`` one is created with the
            factory given at construction time.
        platform_config: Forwarded as ``platform_config``.
    """
    # Test for None explicitly rather than truthiness, so a caller-supplied
    # registry that evaluates as falsy is not silently swapped for a new one.
    if registry is None:
        registry = self._registry_factory()
    return build_run_plan(
        cfg,
        tests,
        execution_mode=execution_mode,
        registry=registry,
        platform_config=platform_config,
    )

TestService

TestService(ui=None, config_service=None)

Service for test configuration and interactions.

Source code in lb_app/services/test_service.py
32
33
34
35
36
37
38
def __init__(
    self,
    ui: UIAdapter | None = None,
    config_service: ConfigService | None = None,
) -> None:
    """Set up the service with optional collaborators.

    Args:
        ui: UI adapter to use; a falsy value is replaced by a new
            ``NoOpUIAdapter()``.
        config_service: Config service to use; a falsy value is replaced
            by a new ``ConfigService()``.
    """
    self.ui: UIAdapter = ui or NoOpUIAdapter()
    self.config_service: ConfigService = config_service or ConfigService()
Attributes
config_service instance-attribute
config_service = config_service or ConfigService()
ui instance-attribute
ui = ui or NoOpUIAdapter()
Functions
build_multipass_scenario
build_multipass_scenario(intensity, selection)

Construct the scenario details for the test plan.

Source code in lb_app/services/test_service.py
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
def build_multipass_scenario(
    self, intensity: Dict[str, Any], selection: str
) -> MultipassScenario:
    """Construct the scenario details for the test plan.

    Args:
        intensity: Intensity parameters; the keys ``stress``, ``dd_count``,
            ``fio_runtime`` and ``fio_size`` are read, and only for the
            workloads the selection actually needs.
        selection: ``"multi"``, one of ``"stress_ng"``/``"dd"``/``"fio"``,
            or any other workload name (rendered with generic defaults).

    Returns:
        A ``MultipassScenario`` describing target, labels, table rows and
        environment variables for the selection.
    """

    # Row builders are closures so intensity keys are only read on demand.
    def _stress_ng_row():
        return (
            "stress_ng",
            f"{intensity['stress']}s",
            "1",
            "0s/0s",
            f"timeout={intensity['stress']}s, cpu_workers=1",
        )

    def _dd_row():
        return (
            "dd",
            f"approx {intensity['dd_count']}MiB",
            "1",
            "0s/0s",
            f"bs=1M, count={intensity['dd_count']}",
        )

    def _fio_row():
        return (
            "fio",
            f"{intensity['fio_runtime']}s",
            "1",
            "0s/0s",
            f"size={intensity['fio_size']}, randrw, bs=4k",
        )

    if selection == "multi":
        combined_duration = (
            f"stress_ng {intensity['stress']}s; "
            f"dd ~{intensity['dd_count']}MiB; "
            f"fio {intensity['fio_runtime']}s/{intensity['fio_size']}"
        )
        return MultipassScenario(
            target="tests/e2e/test_multipass_multi_workloads.py",
            target_label="multi-workloads",
            workload_label="stress_ng, dd, fio",
            duration_label=combined_duration,
            workload_rows=[_stress_ng_row(), _dd_row(), _fio_row()],
            env_vars={},  # Uses its own hardcoded logic or we could migrate it
        )

    # Single-workload scenarios: (duration label factory, row factory).
    known_workloads = {
        "stress_ng": (lambda: f"stress_ng {intensity['stress']}s", _stress_ng_row),
        "dd": (lambda: f"dd ~{intensity['dd_count']}MiB", _dd_row),
        "fio": (lambda: f"fio {intensity['fio_runtime']}s", _fio_row),
    }
    factories = known_workloads.get(selection)
    if factories is not None:
        make_label, make_row = factories
        duration = make_label()
        rows = [make_row()]
    else:
        # Generic fallback for workloads not explicitly detailed in intensity
        duration = "default"
        rows = [(selection, "default", "1", "0s/0s", "default config")]

    return MultipassScenario(
        target="tests/e2e/test_multipass_benchmark.py",
        target_label="benchmark",
        workload_label=selection,
        duration_label=duration,
        workload_rows=rows,
        env_vars={"LB_MULTIPASS_WORKLOADS": selection},
    )
get_multipass_intensity
get_multipass_intensity(force_env=None)

Return intensity parameters based on LB_MULTIPASS_FORCE env var or argument.

Source code in lb_app/services/test_service.py
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
def get_multipass_intensity(
    self, force_env: Optional[str] = None
) -> Dict[str, Any]:
    """
    Return intensity parameters based on LB_MULTIPASS_FORCE env var or argument.

    Args:
        force_env: Requested level. When empty or ``None`` the
            ``LB_MULTIPASS_FORCE`` environment variable is used, and
            ``"medium"`` if that is unset. Matching ignores case and
            surrounding whitespace; ``bassa``/``media``/``alta`` are
            accepted as aliases for low/medium/high.

    Returns:
        A new dict with the keys ``level``, ``stress``, ``dd_count``,
        ``fio_runtime``, ``fio_size``, ``stress_duration`` and
        ``stress_timeout``. Unrecognized levels yield the medium preset.
    """
    requested = force_env or os.environ.get("LB_MULTIPASS_FORCE", "medium")
    # Strip so values such as "high " or "low\n" are not silently treated
    # as unknown and downgraded to medium.
    level = requested.strip().lower()
    aliases = {
        "bassa": "low",
        "low": "low",
        "media": "medium",
        "medium": "medium",
        "alta": "high",
        "high": "high",
    }
    normalized = aliases.get(level, "medium")

    # Per level: (value shared by stress/fio_runtime/stress_duration/
    # stress_timeout, dd_count, fio_size).
    presets = {
        "low": (3, 8, "32M"),
        "medium": (5, 32, "64M"),
        "high": (10, 64, "128M"),
    }
    shared_value, dd_count, fio_size = presets[normalized]
    return {
        "level": normalized,
        "stress": shared_value,
        "dd_count": dd_count,
        "fio_runtime": shared_value,
        "fio_size": fio_size,
        "stress_duration": shared_value,
        "stress_timeout": shared_value,
    }
select_multipass
select_multipass(force_interactive=False, default_level='medium')

Select a Multipass scenario; fall back to defaults when non-interactive.

Source code in lb_app/services/test_service.py
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
def select_multipass(
    self, force_interactive: bool = False, default_level: str = "medium"
) -> tuple[str, str]:
    """Select a Multipass scenario; fall back to defaults when non-interactive.

    Args:
        force_interactive: Prompt even when stdin/stdout are not terminals.
        default_level: Level returned with the fallback scenario and
            offered to the prompt as its default.

    Returns:
        The prompt's answer when one is given; otherwise the first option
        with ``default_level``. With no configured workloads the result is
        ``("stress_ng", default_level)``.
    """
    default_cfg = self.config_service.create_default_config()
    configured = list(default_cfg.workloads.keys())
    if not configured:
        return "stress_ng", default_level

    # dict.fromkeys removes duplicates while keeping the first-seen order.
    options = list(dict.fromkeys([*configured, "multi"]))

    if force_interactive or (sys.stdin.isatty() and sys.stdout.isatty()):
        picked = self.ui.prompt_multipass_scenario(options, default_level)
        if picked is not None:
            return picked

    return options[0], default_level

UIAdapter

Bases: Protocol

Minimal interface for presentation concerns.

Functions
create_dashboard
create_dashboard(plan, journal, ui_log_file=None)

Create a run dashboard.

Source code in lb_app/ui_interfaces.py
69
70
71
72
73
74
75
def create_dashboard(
    self,
    plan: list[dict[str, Any]],
    journal: Any,
    ui_log_file: IO[str] | None = None,
) -> DashboardHandle:
    """Create a run dashboard.

    Protocol stub; implementations supply the behavior.

    Args:
        plan: Plan entries, one dictionary per entry.
        journal: Journal object for the run; its type is not constrained here.
        ui_log_file: Optional text stream.
            NOTE(review): presumably receives UI log output; confirm
            with an implementation.

    Returns:
        A ``DashboardHandle`` for the created dashboard.
    """
create_progress
create_progress(description, total)

Create a progress task.

Source code in lb_app/ui_interfaces.py
66
67
def create_progress(self, description: str, total: int) -> ProgressHandle:
    """Create a progress task.

    Protocol stub; implementations supply the behavior.

    Args:
        description: Text describing the task.
        total: Total amount of work for the task.

    Returns:
        A ``ProgressHandle`` for the created task.
    """
prompt_multipass_scenario
prompt_multipass_scenario(options, default_level)

Prompt user for multipass scenario selection.

Source code in lb_app/ui_interfaces.py
77
78
79
80
def prompt_multipass_scenario(
    self, options: list[str], default_level: str
) -> tuple[str, str] | None:
    """Prompt user for multipass scenario selection.

    Protocol stub; implementations supply the behavior.

    Args:
        options: Scenario names the user can choose from.
        default_level: Level offered as the default.

    Returns:
        A pair of strings, or ``None`` when no choice was made.
        NOTE(review): the pair is presumably (scenario, level); confirm
        with an implementation.
    """
show_error
show_error(message)

Render an error message.

Source code in lb_app/ui_interfaces.py
44
45
def show_error(self, message: str) -> None:
    """Render an error message.

    Protocol stub; implementations supply the behavior.

    Args:
        message: Text of the error to present.
    """
show_info
show_info(message)

Render an informational message.

Source code in lb_app/ui_interfaces.py
38
39
def show_info(self, message: str) -> None:
    """Render an informational message.

    Protocol stub; implementations supply the behavior.

    Args:
        message: Text to present.
    """
show_panel
show_panel(message, title=None, border_style=None)

Render a block/panel container.

Source code in lb_app/ui_interfaces.py
50
51
52
53
def show_panel(
    self, message: str, title: str | None = None, border_style: str | None = None
) -> None:
    """Render a block/panel container.

    Protocol stub; implementations supply the behavior.

    Args:
        message: Body text of the panel.
        title: Optional panel title.
        border_style: Optional style name for the border; accepted values
            depend on the implementation.
    """
show_rule
show_rule(title)

Render a horizontal rule with a title.

Source code in lb_app/ui_interfaces.py
55
56
def show_rule(self, title: str) -> None:
    """Render a horizontal rule with a title.

    Protocol stub; implementations supply the behavior.

    Args:
        title: Text shown with the rule.
    """
show_success
show_success(message)

Render a success message.

Source code in lb_app/ui_interfaces.py
47
48
def show_success(self, message: str) -> None:
    """Render a success message.

    Protocol stub; implementations supply the behavior.

    Args:
        message: Text to present.
    """
show_table
show_table(title, columns, rows)

Render a simple table.

Source code in lb_app/ui_interfaces.py
58
59
60
61
def show_table(
    self, title: str, columns: Sequence[str], rows: list[Sequence[str]]
) -> None:
    """Render a simple table.

    Protocol stub; implementations supply the behavior.

    Args:
        title: Table title.
        columns: Column headers.
        rows: Table rows, each a sequence of cell strings.
    """
show_warning
show_warning(message)

Render a warning message.

Source code in lb_app/ui_interfaces.py
41
42
def show_warning(self, message: str) -> None:
    """Render a warning message.

    Protocol stub; implementations supply the behavior.

    Args:
        message: Text of the warning to present.
    """
status
status(message)

Context manager that shows a status/spinner while work is running.

Source code in lb_app/ui_interfaces.py
63
64
def status(self, message: str) -> AbstractContextManager[None]:
    """Context manager that shows a status/spinner while work is running.

    Protocol stub; implementations supply the behavior.

    Args:
        message: Text shown alongside the status indicator.

    Returns:
        A context manager to wrap the work in.
    """

UIHooks

Bases: Protocol

Callbacks invoked by the application layer to update the UI.

Functions
on_event
on_event(event)
Source code in lb_app/interfaces.py
20
# Protocol hook: receives a RunEvent; implementations decide how to present it.
def on_event(self, event: RunEvent) -> None: ...
on_journal
on_journal(journal)
Source code in lb_app/interfaces.py
21
# Protocol hook: receives a RunJournal; implementations decide how to present it.
def on_journal(self, journal: RunJournal) -> None: ...
on_log
on_log(line)
Source code in lb_app/interfaces.py
17
# Protocol hook: receives one log line as a string.
def on_log(self, line: str) -> None: ...
on_status
on_status(controller_state)
Source code in lb_app/interfaces.py
18
# Protocol hook: receives the controller state as a string.
def on_status(self, controller_state: str) -> None: ...
on_warning
on_warning(message, ttl=10.0)
Source code in lb_app/interfaces.py
19
# Protocol hook: receives a warning message and a ttl (default 10.0).
# NOTE(review): ttl is presumably a display lifetime in seconds; confirm with an implementation.
def on_warning(self, message: str, ttl: float = 10.0) -> None: ...

Functions

build_dashboard_viewmodel

build_dashboard_viewmodel(plan, journal)
Source code in lb_app/viewmodels/dashboard.py
77
78
79
80
def build_dashboard_viewmodel(
    plan: list[dict[str, Any]], journal: RunJournal
) -> DashboardViewModel:
    """Wrap a run plan and its journal in a ``DashboardViewModel``.

    Args:
        plan: Plan entries, one dictionary per entry.
        journal: Journal of the run.

    Returns:
        A new ``DashboardViewModel`` constructed from both arguments.
    """
    viewmodel = DashboardViewModel(plan, journal)
    return viewmodel

event_status_line

event_status_line(event_source, last_event_ts, *, now=None)

Return (status, detail) for event freshness.

Source code in lb_app/viewmodels/dashboard.py
83
84
85
86
87
88
89
90
91
92
def event_status_line(
    event_source: str, last_event_ts: float | None, *, now: float | None = None
) -> tuple[str, str]:
    """Return (status, detail) for event freshness."""
    if last_event_ts is None:
        return "waiting", "waiting"
    now_ts = time.monotonic() if now is None else now
    age = now_ts - last_event_ts
    freshness = "just now" if age < 1.0 else f"{age:.1f}s ago"
    return "live", freshness

generate_run_id

generate_run_id()

Generate a timestamped run id matching the controller's format.

Source code in lb_app/services/run_journal.py
398
399
400
def generate_run_id() -> str:
    """Generate a timestamped run id matching the controller's format."""
    return datetime.now(UTC).strftime("run-%Y%m%d-%H%M%S")

journal_rows

journal_rows(journal)

Return headers and rows summarizing a journal.

Source code in lb_app/viewmodels/run_viewmodels.py
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
def journal_rows(journal: RunJournal) -> tuple[list[str], list[list[str]]]:
    """Return headers and rows summarizing a journal.

    One row is produced per (host, workload) pair, sorted by that pair.
    Each row holds the host, the workload, a two-line "status\\nprogress"
    cell and the latest task's error or current action.

    An empty journal yields the two headers ``Host``/``Workload`` and no
    rows; otherwise the headers also include ``Run`` and ``Last Action``.
    """
    if not journal.tasks:
        return ["Host", "Workload"], []

    target = target_repetitions(journal)

    # Bucket tasks by (host, workload), keyed by repetition inside each
    # bucket; a later task with the same repetition replaces the earlier one.
    buckets: dict = {}
    for entry in journal.tasks.values():
        per_repetition = buckets.setdefault((entry.host, entry.workload), {})
        per_repetition[entry.repetition] = entry

    rows: list[list[str]] = []
    for pair in sorted(buckets):
        host, workload = pair
        per_repetition = buckets[pair]
        newest = max(per_repetition.values(), key=lambda t: t.timestamp)
        last_action = newest.error or newest.current_action or ""
        status, progress = summarize_progress(per_repetition, target)
        rows.append([host, workload, f"{status}\n{progress}", last_action])

    return ["Host", "Workload", "Run", "Last Action"], rows

plan_rows

plan_rows(plan)

Transform plan entries into table rows.

Source code in lb_app/viewmodels/run_viewmodels.py
61
62
63
64
65
66
67
68
69
70
71
72
73
def plan_rows(plan: Iterable[dict]) -> list[list[str]]:
    """Transform plan entries into table rows.

    Each row lists, in order: name, plugin, intensity, details,
    repetitions and status. A missing ``repetitions`` becomes ``""``; any
    other missing key becomes ``None``.
    """
    # (key, fallback) per output column, in display order.
    column_spec = (
        ("name", None),
        ("plugin", None),
        ("intensity", None),
        ("details", None),
        ("repetitions", ""),
        ("status", None),
    )
    rows = []
    for entry in plan:
        rows.append([entry.get(key, fallback) for key, fallback in column_spec])
    return rows

results_exist_for_run

results_exist_for_run(run_root)

Return True when any *_results.json exists under the run root.

Source code in lb_app/services/run_journal.py
348
349
350
351
352
353
354
def results_exist_for_run(run_root: Path) -> bool:
    """Return True when any *_results.json exists under the run root.

    The search is recursive and stops at the first match.
    """
    for _match in run_root.rglob("*_results.json"):
        return True
    return False

summarize_progress

summarize_progress(tasks, target_reps)
Source code in lb_app/viewmodels/run_viewmodels.py
18
19
20
21
22
23
24
25
26
27
28
29
30
def summarize_progress(
    tasks: Dict[int, TaskState], target_reps: int
) -> tuple[str, str]:
    """Summarize a group of tasks as a (status, progress) pair.

    Args:
        tasks: Task states keyed by repetition.
        target_reps: Expected number of repetitions; when zero, the number
            of tasks is used as the total instead.

    Returns:
        The status derived from the progress flags and counts, and the
        progress label built from completed and total.
    """
    expected = target_reps if target_reps else len(tasks)
    # Completed, skipped and failed tasks all count towards "completed".
    finished = len(
        [
            state
            for state in tasks.values()
            if state.status
            in (RunStatus.COMPLETED, RunStatus.SKIPPED, RunStatus.FAILED)
        ]
    )
    flags = _progress_flags(tasks)
    status = _status_from_flags(flags, finished, expected)
    return status, _progress_label(finished, expected)

summarize_system_info

summarize_system_info(path)

Return a one-line summary for a system_info.json file.

Source code in lb_app/services/run_system_info.py
62
63
64
65
66
67
68
69
70
71
72
73
def summarize_system_info(path: Path) -> str | None:
    """Condense a system_info.json file into a single summary line.

    The OS, CPU, memory and disk summaries are computed in that order, empty
    ones are dropped, and the rest are joined with ``" | "``. Returns ``None``
    when the loaded document is not a dict or when every summary is empty.
    """
    payload = load_json(path)
    if not isinstance(payload, dict):
        return None

    formatters = (
        format_os_summary,
        format_cpu_summary,
        format_memory_summary,
        format_disk_summary,
    )
    segments = [text for text in (fmt(payload) for fmt in formatters) if text]
    if not segments:
        return None
    return " | ".join(segments)

target_repetitions

target_repetitions(journal)
Source code in lb_app/viewmodels/run_viewmodels.py
10
11
12
13
14
15
def target_repetitions(journal: RunJournal) -> int:
    """Return the number of repetitions the journal's run is aiming for.

    A positive integer stored under ``repetitions`` in the journal metadata
    takes precedence. Otherwise the highest ``repetition`` value among the
    journal's tasks is used, or 0 when the journal has no tasks.
    """
    declared = journal.metadata.get("repetitions")
    if not isinstance(declared, int) or declared <= 0:
        # No usable metadata value: fall back to what the tasks record.
        return max((task.repetition for task in journal.tasks.values()), default=0)
    return declared

UI composition helpers

UIContext dataclass

UIContext(headless=False, dev_mode=False, _ui=None, _ui_adapter=None, _config_service=None, _doctor_service=None, _test_service=None, _analytics_service=None, _app_client=None)

Container for UI services and state, initialized lazily.

Attributes

analytics_service property writable

analytics_service

app_client property writable

app_client

config_service property writable

config_service

dev_mode class-attribute instance-attribute

dev_mode = False

doctor_service property writable

doctor_service

headless class-attribute instance-attribute

headless = False

test_service property writable

test_service

ui property writable

ui

ui_adapter property writable

ui_adapter

load_dev_mode

load_dev_mode(cli_root)

Return True when dev mode is enabled via marker file or pyproject flag.

Source code in lb_ui/wiring/dependencies.py
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
def load_dev_mode(cli_root: Path) -> bool:
    """Return True when dev mode is enabled via marker file or pyproject flag."""
    marker = cli_root / ".lb_dev_cli"
    if marker.exists():
        return True
    pyproject = cli_root / "pyproject.toml"
    if pyproject.exists():
        try:
            data = tomllib.loads(pyproject.read_text())
            tool_cfg = data.get("tool", {}).get("lb_ui", {}) or {}
            if isinstance(tool_cfg, dict):
                dev_flag = tool_cfg.get("dev_mode")
                if isinstance(dev_flag, bool):
                    return dev_flag
        except Exception:
            pass
    return False