Runner API

Entry points for running workloads locally and working with the plugin system.

Public surface

api

Stable runner API surface.

Attributes

DEFAULT_LB_WORKDIR module-attribute

DEFAULT_LB_WORKDIR = "{{ (ansible_user == 'root') | ternary('/root', '/home/' ~ ansible_user) }}/.lb"
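
The default is an Ansible/Jinja2 expression evaluated per host with the ternary filter, so the remote workdir follows the connecting user. As an illustration (alice is a hypothetical non-root user): when ansible_user is root it renders to /root/.lb, and when ansible_user is alice it renders to /home/alice/.lb.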

Classes

BaseCollector

BaseCollector(name, interval_seconds=1.0)

Bases: ABC

Abstract base class for all metric collectors.

Initialize the base collector.

Parameters:

name (str): Name of the collector. Required.
interval_seconds (float): Sampling interval in seconds. Default: 1.0.
Source code in lb_runner/metric_collectors/_base_collector.py
def __init__(self, name: str, interval_seconds: float = 1.0):
    """
    Initialize the base collector.

    Args:
        name: Name of the collector
        interval_seconds: Sampling interval in seconds
    """
    self.name = name
    self.interval_seconds = interval_seconds
    self._is_running = False
    self._thread: Optional[threading.Thread] = None
    self._data: List[Dict[str, Any]] = []
    self._lock = threading.Lock()
    self._start_time: Optional[datetime] = None
    self._stop_time: Optional[datetime] = None
    self._errors: list[MetricCollectionError] = []
Attributes
interval_seconds instance-attribute
interval_seconds = interval_seconds
name instance-attribute
name = name
Functions
clear_data
clear_data()

Clear all collected data.

Source code in lb_runner/metric_collectors/_base_collector.py
def clear_data(self) -> None:
    """Clear all collected data."""
    with self._lock:
        self._data.clear()
get_data
get_data()

Get the collected data.

Returns:

List[Dict[str, Any]]: List of dictionaries containing metric data.

Source code in lb_runner/metric_collectors/_base_collector.py
def get_data(self) -> List[Dict[str, Any]]:
    """
    Get the collected data.

    Returns:
        List of dictionaries containing metric data
    """
    with self._lock:
        return self._data.copy()
get_dataframe
get_dataframe()

Get the collected data as a pandas DataFrame.

Returns:

DataFrame: DataFrame with metrics data.

Source code in lb_runner/metric_collectors/_base_collector.py
def get_dataframe(self) -> pd.DataFrame:
    """
    Get the collected data as a pandas DataFrame.

    Returns:
        DataFrame with metrics data
    """
    data = self.get_data()
    if not data:
        return pd.DataFrame()

    df = pd.DataFrame(data)
    if "timestamp" in df.columns:
        df["timestamp"] = pd.to_datetime(df["timestamp"])
        df.set_index("timestamp", inplace=True)

    return df
get_errors
get_errors()

Return collected metric errors, if any.

Source code in lb_runner/metric_collectors/_base_collector.py
def get_errors(self) -> list[MetricCollectionError]:
    """Return collected metric errors, if any."""
    return list(self._errors)
get_summary_stats
get_summary_stats()

Get summary statistics for all numeric metrics.

Returns:

Dict[str, Dict[str, float]]: Dictionary mapping metric names to their statistics.

Source code in lb_runner/metric_collectors/_base_collector.py
def get_summary_stats(self) -> Dict[str, Dict[str, float]]:
    """
    Get summary statistics for all numeric metrics.

    Returns:
        Dictionary mapping metric names to their statistics
    """
    df = self.get_dataframe()
    if df.empty:
        return {}

    # Select only numeric columns
    numeric_df = df.select_dtypes(include=["number"])

    stats = {}
    for col in numeric_df.columns:
        stats[col] = {
            "mean": numeric_df[col].mean(),
            "std": numeric_df[col].std(),
            "min": numeric_df[col].min(),
            "max": numeric_df[col].max(),
            "median": numeric_df[col].median(),
            "p95": numeric_df[col].quantile(0.95),
            "p99": numeric_df[col].quantile(0.99),
        }

    return stats
save_data
save_data(filepath, format='csv')

Save collected data to file.

Parameters:

filepath (Path): Path to save the data. Required.
format (str): Format to save in ('csv', 'json', 'parquet'). Default: 'csv'.
Source code in lb_runner/metric_collectors/_base_collector.py
def save_data(self, filepath: Path, format: str = "csv") -> None:
    """
    Save collected data to file.

    Args:
        filepath: Path to save the data
        format: Format to save in ('csv', 'json', 'parquet')
    """
    df = self.get_dataframe()

    if format == "csv":
        df.to_csv(filepath)
    elif format == "json":
        df.to_json(filepath, orient="records", date_format="iso")
    elif format == "parquet":
        df.to_parquet(filepath)
    else:
        raise ValueError(f"Unsupported format: {format}")

    logger.info(f"Saved {self.name} data to {filepath}")
start
start()

Start the metric collection in a background thread.

Source code in lb_runner/metric_collectors/_base_collector.py
def start(self) -> None:
    """Start the metric collection in a background thread."""
    if self._is_running:
        logger.warning(f"{self.name} collector is already running")
        return

    if not self._validate_environment():
        raise MetricCollectionError(
            f"{self.name} collector cannot run in this environment",
            context={"collector": self.name},
        )

    self._is_running = True
    self._start_time = datetime.now()
    self._data.clear()
    self._errors.clear()

    self._thread = threading.Thread(target=self._collection_loop, daemon=True)
    self._thread.start()

    logger.info(f"{self.name} collector started")
stop
stop()

Stop the metric collection.

Source code in lb_runner/metric_collectors/_base_collector.py
def stop(self) -> None:
    """Stop the metric collection."""
    if not self._is_running:
        logger.warning(f"{self.name} collector is not running")
        return

    self._is_running = False
    self._stop_time = datetime.now()

    if self._thread:
        self._thread.join(timeout=self.interval_seconds * 2)

    logger.info(f"{self.name} collector stopped")

BenchmarkConfig

Bases: BaseModel

Main configuration for benchmark tests.

Attributes
collect_system_info class-attribute instance-attribute
collect_system_info = Field(default=True, description='Collect system information before running benchmarks')
collectors class-attribute instance-attribute
collectors = Field(default_factory=MetricCollectorConfig, description='Configuration for metric collectors')
cooldown_seconds class-attribute instance-attribute
cooldown_seconds = Field(default=5, ge=0, description='Cooldown period after test finishes')
data_export_dir class-attribute instance-attribute
data_export_dir = Field(default=Path('./data_exports'), description='Directory for raw data exports')
influxdb_bucket class-attribute instance-attribute
influxdb_bucket = Field(default='performance', description='InfluxDB Bucket')
influxdb_enabled class-attribute instance-attribute
influxdb_enabled = Field(default=False, description='Enable InfluxDB integration')
influxdb_org class-attribute instance-attribute
influxdb_org = Field(default='benchmark', description='InfluxDB Organization')
influxdb_token class-attribute instance-attribute
influxdb_token = Field(default='', description='InfluxDB API Token')
influxdb_url class-attribute instance-attribute
influxdb_url = Field(default='http://localhost:8086', description='InfluxDB URL')
loki class-attribute instance-attribute
loki = Field(default_factory=LokiConfig, description='Loki log shipping configuration')
metrics_interval_seconds class-attribute instance-attribute
metrics_interval_seconds = Field(default=1.0, gt=0, description='Interval for metric collection in seconds')
model_config class-attribute instance-attribute
model_config = ConfigDict(extra='ignore')
output_dir class-attribute instance-attribute
output_dir = Field(default=Path('./benchmark_results'), description='Root directory for all benchmark output')
plugin_assets class-attribute instance-attribute
plugin_assets = Field(default_factory=dict, description='Resolved plugin Ansible assets/extravars (from runner registry)')
plugin_settings class-attribute instance-attribute
plugin_settings = Field(default_factory=dict, description='Dictionary of plugin-specific Pydantic config models')
remote_execution class-attribute instance-attribute
remote_execution = Field(default_factory=RemoteExecutionConfig, description='Configuration for remote execution')
remote_hosts class-attribute instance-attribute
remote_hosts = Field(default_factory=list, description='List of remote hosts for benchmarking')
repetitions class-attribute instance-attribute
repetitions = Field(default=3, gt=0, description='Number of repetitions for each test')
report_dir class-attribute instance-attribute
report_dir = Field(default=Path('./reports'), description='Directory for generated reports')
test_duration_seconds class-attribute instance-attribute
test_duration_seconds = Field(default=3600, gt=0, description='Default duration for tests in seconds')
warmup_seconds class-attribute instance-attribute
warmup_seconds = Field(default=5, ge=0, description='Warmup period before metric collection starts')
workloads class-attribute instance-attribute
workloads = Field(default_factory=dict, description='Dictionary of workload definitions')
Functions
ensure_output_dirs
ensure_output_dirs()

Ensures all configured output directories exist.

Source code in lb_runner/models/config.py
def ensure_output_dirs(self) -> None:
    """Ensures all configured output directories exist."""
    for path in (self.output_dir, self.report_dir, self.data_export_dir):
        path.mkdir(parents=True, exist_ok=True)
from_dict classmethod
from_dict(data)
Source code in lb_runner/models/config.py
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "BenchmarkConfig":
    # Use Pydantic's built-in dictionary parsing and validation
    # Pydantic will handle nested models and Path conversions automatically
    return cls.model_validate(data)
from_json classmethod
from_json(json_str)
Source code in lb_runner/models/config.py
@classmethod
def from_json(cls, json_str: str) -> "BenchmarkConfig":
    # Use Pydantic's built-in JSON parsing and validation
    return cls.model_validate_json(json_str)
load classmethod
load(filepath)
Source code in lb_runner/models/config.py
@classmethod
def load(cls, filepath: Path) -> "BenchmarkConfig":
    return cls.model_validate_json(filepath.read_text())
save
save(filepath)
Source code in lb_runner/models/config.py
def save(self, filepath: Path) -> None:
    filepath.write_text(self.model_dump_json(indent=2))
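
A minimal round-trip sketch using the helpers above. The workload name cpu_stress and plugin id stress_ng are illustrative only; the import path follows the Source code location shown for this model:

from pathlib import Path

from lb_runner.models.config import BenchmarkConfig

cfg = BenchmarkConfig.from_dict(
    {
        "repetitions": 2,
        "test_duration_seconds": 60,
        "workloads": {"cpu_stress": {"plugin": "stress_ng"}},  # illustrative names
    }
)
cfg.ensure_output_dirs()                     # creates output/report/export dirs
cfg.save(Path("benchmark_config.json"))      # JSON via model_dump_json
same_cfg = BenchmarkConfig.load(Path("benchmark_config.json"))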

GrafanaPlatformConfig

Bases: BaseModel

Platform-level Grafana connection settings.

Attributes
api_key class-attribute instance-attribute
api_key = Field(default=None, description='Grafana API key (optional)')
model_config class-attribute instance-attribute
model_config = {'extra': 'ignore'}
org_id class-attribute instance-attribute
org_id = Field(default=1, ge=1, description='Grafana organization id')
url class-attribute instance-attribute
url = Field(default='http://localhost:3000', description='Grafana base URL')

LBEventLogHandler

LBEventLogHandler(run_id, host, workload, repetition, total_repetitions)

Bases: Handler

Logging handler that emits logs as structured LB_EVENT JSON lines to stdout, allowing the controller to capture and stream them.

Source code in lb_runner/services/log_handler.py
def __init__(
    self,
    run_id: str,
    host: str,
    workload: str,
    repetition: int,
    total_repetitions: int,
) -> None:
    super().__init__()
    self.run_id = run_id
    self.host = host
    self.workload = workload
    self.repetition = repetition
    self.total_repetitions = total_repetitions
Attributes
host instance-attribute
host = host
repetition instance-attribute
repetition = repetition
run_id instance-attribute
run_id = run_id
total_repetitions instance-attribute
total_repetitions = total_repetitions
workload instance-attribute
workload = workload
Functions
emit
emit(record)
Source code in lb_runner/services/log_handler.py
def emit(self, record: logging.LogRecord) -> None:
    try:
        msg = self.format(record)
        payload = {
            "run_id": self.run_id,
            "host": self.host,
            "workload": self.workload,
            "repetition": self.repetition,
            "total_repetitions": self.total_repetitions,
            "status": "running",
            "type": "log",
            "level": record.levelname,
            "message": msg,
            "timestamp": record.created,
            "logger": record.name,
        }
        # Use direct print to stdout to ensure Ansible captures it
        # We must flush to ensure real-time streaming
        print(f"LB_EVENT {json.dumps(payload)}", file=sys.stdout, flush=True)
    except Exception:
        self.handleError(record)
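
A short sketch of wiring the handler into standard logging so ordinary log calls surface as LB_EVENT lines; the run/host/workload values are illustrative:

import logging

from lb_runner.services.log_handler import LBEventLogHandler

handler = LBEventLogHandler(
    run_id="run-001",
    host="node-1",
    workload="cpu_stress",
    repetition=1,
    total_repetitions=3,
)
handler.setFormatter(logging.Formatter("%(message)s"))

log = logging.getLogger("lb_runner")
log.addHandler(handler)
log.setLevel(logging.INFO)
log.info("benchmark started")   # emitted as: LB_EVENT {"run_id": "run-001", ...}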

LocalRunner

LocalRunner(config, registry, progress_callback=None, host_name=None, stop_token=None, collector_registry=None)

Local agent for executing benchmarks on a single node.

Initialize the local runner.

Parameters:

config (BenchmarkConfig): Benchmark configuration. Required.
Source code in lb_runner/engine/runner.py
def __init__(
    self,
    config: BenchmarkConfig,
    registry: PluginRegistry | RunnerRegistry,
    progress_callback: Optional[Callable[[RunEvent], None]] = None,
    host_name: str | None = None,
    stop_token: StopToken | None = None,
    collector_registry: CollectorRegistry | None = None,
):
    """
    Initialize the local runner.

    Args:
        config: Benchmark configuration
    """
    self.config = config
    self.test_results: List[Dict[str, Any]] = []
    self.plugin_registry = self._resolve_registry(registry, collector_registry)
    workloads = getattr(self.config, "workloads", {})
    repetitions = getattr(self.config, "repetitions", 1)
    plugin_settings = getattr(self.config, "plugin_settings", {})
    self._planner = RunPlanner(
        workloads=workloads,
        repetitions=repetitions,
        logger=logger,
        plugin_settings=plugin_settings,
    )
    self._scope_manager = RunScopeManager(self.config, logger)
    self._result_persister = ResultPersister()
    self._current_run_id: Optional[str] = None
    self._host_name = (
        host_name or os.environ.get("LB_RUN_HOST") or platform.node() or "localhost"
    )
    self._progress = RunProgressEmitter(
        host=self._host_name, callback=progress_callback
    )
    self._stop_token = stop_token
    self._output_manager = RunnerOutputManager(
        config=self.config,
        persister=self._result_persister,
        logger=logger,
    )
    self._log_manager = RunnerLogManager(
        config=self.config,
        host_name=self._host_name,
        logger=logger,
    )
    self._metric_manager = MetricManager(
        registry=self.plugin_registry,
        output_manager=self._output_manager,
        host_name=self._host_name,
    )
Attributes
config instance-attribute
config = config
plugin_registry instance-attribute
plugin_registry = _resolve_registry(registry, collector_registry)
system_info property
system_info
test_results instance-attribute
test_results = []
Functions
collect_system_info
collect_system_info()

Collect detailed information about the system.

Returns:

Dict[str, Any]: Dictionary containing system information.

Source code in lb_runner/engine/runner.py
def collect_system_info(self) -> Dict[str, Any]:
    """
    Collect detailed information about the system.

    Returns:
        Dictionary containing system information
    """
    return self._metric_manager.collect_system_info()
run_all_benchmarks
run_all_benchmarks()

Run all configured benchmark tests.

Source code in lb_runner/engine/runner.py
def run_all_benchmarks(self) -> None:
    """Run all configured benchmark tests."""
    run_id = self._planner.generate_run_id()
    for test_name, workload in self.config.workloads.items():
        if not workload.enabled:
            logger.info("Skipping disabled workload: %s", test_name)
            continue
        try:
            self.run_benchmark(test_name, run_id=run_id)
        except Exception:
            logger.exception("Failed to run %s benchmark", test_name)
run_benchmark
run_benchmark(test_type, repetition_override=None, total_repetitions=None, run_id=None, pending_reps=None)

Run a complete benchmark test.

Parameters:

test_type (str): Name of the workload to run (plugin id). Required.
repetition_override (int | None): When set, run only this repetition index. Default: None.
total_repetitions (int | None): Total repetitions planned (for display purposes). Default: None.
Source code in lb_runner/engine/runner.py
def run_benchmark(
    self,
    test_type: str,
    repetition_override: int | None = None,
    total_repetitions: int | None = None,
    run_id: str | None = None,
    pending_reps: List[int] | None = None,
) -> bool:
    """
    Run a complete benchmark test.

    Args:
        test_type: Name of the workload to run (plugin id)
        repetition_override: When set, run only this repetition index.
        total_repetitions: Total repetitions planned (for display purposes).
    """
    with stop_context(self._stop_token):
        total_reps = total_repetitions or self.config.repetitions
        reps = self._planner.select_repetitions(repetition_override, pending_reps)
        first_rep = reps[0] if reps else 1

        self._prepare_run_scope(
            run_id,
            workload=test_type,
            repetition=first_rep,
            phase="setup",
        )
        logger.info(f"Starting benchmark: {test_type}")

        if self.config.collect_system_info and not self.system_info:
            self.collect_system_info()

        workload_cfg = self._planner.resolve_workload(test_type)
        plugin: WorkloadPlugin = self.plugin_registry.get(workload_cfg.plugin)

        success_overall = True
        for idx, rep in enumerate(reps):
            success = self._run_single_repetition(
                test_type=test_type,
                workload_cfg=workload_cfg,
                plugin=plugin,
                repetition=rep,
                total_reps=total_reps,
            )
            success_overall = success_overall and success

            if idx < len(reps) - 1 and self.config.cooldown_seconds > 0:
                logger.info(
                    "Cooldown period: %s seconds", self.config.cooldown_seconds
                )
                time.sleep(self.config.cooldown_seconds)

            if should_stop(self._stop_token):
                break

        logger.info(f"Completed benchmark: {test_type}")
        return success_overall
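
A minimal local sketch. The workload name cpu_stress and plugin id stress_ng are placeholders for something actually registered; import paths follow the Source code locations shown on this page:

from lb_plugins.registry import PluginRegistry
from lb_runner.engine.runner import LocalRunner
from lb_runner.models.config import BenchmarkConfig

config = BenchmarkConfig.from_dict(
    {"repetitions": 1, "workloads": {"cpu_stress": {"plugin": "stress_ng"}}}
)
registry = PluginRegistry()    # discovers built-in, entry-point, and user plugins

runner = LocalRunner(config=config, registry=registry)
runner.run_all_benchmarks()    # iterates enabled workloads under one run_id
print(runner.test_results)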

LokiConfig

Bases: BaseModel

Configuration for Loki log shipping.

Attributes
backoff_base class-attribute instance-attribute
backoff_base = Field(default=0.5, ge=0, description='Base backoff delay in seconds')
backoff_factor class-attribute instance-attribute
backoff_factor = Field(default=2.0, ge=1.0, description='Backoff multiplier')
batch_size class-attribute instance-attribute
batch_size = Field(default=100, gt=0, description='Logs per batch')
enabled class-attribute instance-attribute
enabled = Field(default=False, description='Enable Loki log push')
endpoint class-attribute instance-attribute
endpoint = Field(default='http://localhost:3100', description='Loki base URL or push endpoint')
flush_interval_ms class-attribute instance-attribute
flush_interval_ms = Field(default=1000, gt=0, description='Flush interval in milliseconds')
labels class-attribute instance-attribute
labels = Field(default_factory=dict, description='Static labels sent with Loki logs')
max_queue_size class-attribute instance-attribute
max_queue_size = Field(default=10000, gt=0, description='Max pending logs in queue')
max_retries class-attribute instance-attribute
max_retries = Field(default=3, ge=0, description='Max retries on failure')
timeout_seconds class-attribute instance-attribute
timeout_seconds = Field(default=5.0, gt=0, description='HTTP timeout for Loki push')

MetricCollectorConfig

Bases: BaseModel

Configuration for metric collectors.

Attributes
cli_commands class-attribute instance-attribute
cli_commands = Field(default_factory=lambda: ['sar -u 1 1', 'vmstat 1 1', 'iostat -d 1 1', 'mpstat 1 1', 'pidstat -h 1 1'], description='List of CLI commands to execute for metric collection')
enable_ebpf class-attribute instance-attribute
enable_ebpf = Field(default=False, description='Enable eBPF-based metric collection')
perf_config class-attribute instance-attribute
perf_config = Field(default_factory=PerfConfig, description='Configuration for perf profiling')
psutil_interval class-attribute instance-attribute
psutil_interval = Field(default=1.0, gt=0, description='Interval for psutil collector in seconds')

PerfConfig

Bases: BaseModel

Configuration for perf profiling.

Attributes
cpu class-attribute instance-attribute
cpu = Field(default=None, ge=0, description='CPU to profile')
events class-attribute instance-attribute
events = Field(default_factory=lambda: ['cpu-cycles', 'instructions', 'cache-references', 'cache-misses', 'branches', 'branch-misses'], description='List of perf events to monitor')
interval_ms class-attribute instance-attribute
interval_ms = Field(default=1000, gt=0, description='Sampling interval in milliseconds')
pid class-attribute instance-attribute
pid = Field(default=None, gt=0, description='Process ID to profile')

PlatformConfig

Bases: BaseModel

Platform-level configuration for defaults and plugin enablement.

Attributes
data_export_dir class-attribute instance-attribute
data_export_dir = Field(default=None, description='Default data export directory')
grafana class-attribute instance-attribute
grafana = Field(default=None, description='Optional Grafana defaults for the platform')
loki class-attribute instance-attribute
loki = Field(default=None, description='Optional Loki defaults for the platform')
model_config class-attribute instance-attribute
model_config = ConfigDict(extra='ignore')
output_dir class-attribute instance-attribute
output_dir = Field(default=None, description='Default benchmark output directory')
plugins class-attribute instance-attribute
plugins = Field(default_factory=dict, description='Plugin enable/disable map (missing entries default to enabled)')
report_dir class-attribute instance-attribute
report_dir = Field(default=None, description='Default report directory')
Functions
is_plugin_enabled
is_plugin_enabled(name)

Return True when the plugin is enabled or not explicitly disabled.

Source code in lb_runner/models/config.py
def is_plugin_enabled(self, name: str) -> bool:
    """Return True when the plugin is enabled or not explicitly disabled."""
    return self.plugins.get(name, True)
load classmethod
load(filepath)
Source code in lb_runner/models/config.py
@classmethod
def load(cls, filepath: Path) -> "PlatformConfig":
    return cls.model_validate_json(filepath.read_text())
save
save(filepath)
Source code in lb_runner/models/config.py
def save(self, filepath: Path) -> None:
    filepath.write_text(self.model_dump_json(indent=2))

RemoteExecutionConfig

Bases: BaseModel

Configuration for remote execution via Ansible.

Attributes
collect_playbook class-attribute instance-attribute
collect_playbook = Field(default=None, description='Path to the Ansible collect playbook')
enabled class-attribute instance-attribute
enabled = Field(default=False, description='Enable remote execution')
inventory_path class-attribute instance-attribute
inventory_path = Field(default=None, description='Path to a custom Ansible inventory file')
lb_workdir class-attribute instance-attribute
lb_workdir = Field(default=DEFAULT_LB_WORKDIR, description='Remote workdir for benchmark install (Ansible-templated)')
model_config class-attribute instance-attribute
model_config = ConfigDict(extra='ignore')
run_collect class-attribute instance-attribute
run_collect = Field(default=True, description='Execute collection playbooks after tests')
run_playbook class-attribute instance-attribute
run_playbook = Field(default=None, description='Path to the Ansible run playbook')
run_setup class-attribute instance-attribute
run_setup = Field(default=True, description='Execute setup playbooks before tests')
run_teardown class-attribute instance-attribute
run_teardown = Field(default=True, description='Execute teardown playbooks after tests')
setup_playbook class-attribute instance-attribute
setup_playbook = Field(default=None, description='Path to the Ansible setup playbook')
teardown_playbook class-attribute instance-attribute
teardown_playbook = Field(default=None, description='Path to the Ansible teardown playbook')
upgrade_pip class-attribute instance-attribute
upgrade_pip = Field(default=False, description='Upgrade pip inside the benchmark virtual environment during setup')
use_container_fallback class-attribute instance-attribute
use_container_fallback = Field(default=False, description='Use container-based fallback for remote execution')

RemoteHostConfig

Bases: BaseModel

Configuration for a remote benchmark host.

Attributes
address class-attribute instance-attribute
address = Field(description='IP address or hostname of the remote host')
become class-attribute instance-attribute
become = Field(default=True, description='Use Ansible become (sudo) for escalated privileges')
become_method class-attribute instance-attribute
become_method = Field(default='sudo', description='Ansible become method')
model_config class-attribute instance-attribute
model_config = ConfigDict(extra='ignore')
name class-attribute instance-attribute
name = Field(description='Unique name for the remote host')
port class-attribute instance-attribute
port = Field(default=22, gt=0, description='SSH port for connection')
user class-attribute instance-attribute
user = Field(default='root', description='SSH user for connection')
vars class-attribute instance-attribute
vars = Field(default_factory=dict, description='Additional Ansible variables for this host')
Functions
ansible_host_line
ansible_host_line()

Render an INI-style inventory line for this host (compat helper).

Source code in lb_runner/models/config.py
def ansible_host_line(self) -> str:
    """Render an INI-style inventory line for this host (compat helper)."""
    parts = [
        self.name,
        f"ansible_host={self.address}",
        f"ansible_port={self.port}",
        f"ansible_user={self.user}",
    ]
    if self.become:
        parts.append("ansible_become=true")
    if self.become_method:
        parts.append(f"ansible_become_method={self.become_method}")
    for key, value in self.vars.items():
        val = str(value)
        if " " in val:
            val = f'"{val}"'
        parts.append(f"{key}={val}")
    return " ".join(parts)
validate_name_not_empty
validate_name_not_empty()
Source code in lb_runner/models/config.py
@model_validator(mode="after")
def validate_name_not_empty(self) -> "RemoteHostConfig":
    if not self.name or not self.name.strip():
        raise ValueError("RemoteHostConfig: 'name' must be non-empty")
    return self

RunEvent dataclass

RunEvent(run_id, host, workload, repetition, total_repetitions, status, message='', timestamp=0.0, type='status', level='INFO', error_type=None, error_context=None)

A structured event emitted during a run.

Attributes
error_context class-attribute instance-attribute
error_context = None
error_type class-attribute instance-attribute
error_type = None
host instance-attribute
host
level class-attribute instance-attribute
level = 'INFO'
message class-attribute instance-attribute
message = ''
repetition instance-attribute
repetition
run_id instance-attribute
run_id
status instance-attribute
status
timestamp class-attribute instance-attribute
timestamp = 0.0
total_repetitions instance-attribute
total_repetitions
type class-attribute instance-attribute
type = 'status'
workload instance-attribute
workload
Functions
to_dict
to_dict()
Source code in lb_runner/models/events.py
def to_dict(self) -> dict[str, Any]:
    return asdict(self)
to_json
to_json()
Source code in lb_runner/models/events.py
def to_json(self) -> str:
    return json.dumps(self.to_dict())

RunnerRegistry

RunnerRegistry(workloads, collectors)

Adapter that exposes workload and collector helpers on a single object.

Source code in lb_runner/registry.py
def __init__(
    self,
    workloads: PluginRegistry,
    collectors: CollectorRegistry,
) -> None:
    self._workloads = workloads
    self._collectors = collectors
Functions
available
available(load_entrypoints=False)
Source code in lb_runner/registry.py
def available(self, load_entrypoints: bool = False) -> Dict[str, Any]:
    return self._workloads.available(load_entrypoints=load_entrypoints)
available_collectors
available_collectors(load_entrypoints=False)
Source code in lb_runner/registry.py
def available_collectors(
    self, load_entrypoints: bool = False
) -> Dict[str, CollectorPlugin]:
    return self._collectors.available(load_entrypoints=load_entrypoints)
create_collectors
create_collectors(config)
Source code in lb_runner/registry.py
def create_collectors(self, config: BenchmarkConfig) -> list[Any]:
    return self._collectors.create_collectors(config)
create_generator
create_generator(plugin_name, options=None)
Source code in lb_runner/registry.py
def create_generator(
    self, plugin_name: str, options: Dict[str, Any] | None = None
) -> Any:
    return self._workloads.create_generator(plugin_name, options)
get
get(name)
Source code in lb_runner/registry.py
def get(self, name: str) -> Any:
    return self._workloads.get(name)

StdoutEmitter

Emit progress markers to stdout for parsing in Ansible streams.

Functions
emit
emit(event)
Source code in lb_runner/models/events.py
def emit(self, event: RunEvent) -> None:
    print(f"LB_EVENT {event.to_json()}", flush=True)

StopToken

StopToken(stop_file=None, enable_signals=True, on_stop=None)

Lightweight cooperative stop controller.

It can be tripped by signals (SIGINT/SIGTERM) or by the presence of a stop file on disk. Consumers should call should_stop() in long-running loops and abort work when it returns True.

Source code in lb_runner/engine/stop_token.py
def __init__(
    self,
    stop_file: Optional[Path] = None,
    enable_signals: bool = True,
    on_stop: Optional[Callable[[], None]] = None,
) -> None:
    self.stop_file = stop_file
    self._on_stop = on_stop
    self._stop_requested = False
    self._prev_handlers: Dict[int, Callable] = {}
    if enable_signals:
        self._install_signal_handlers()
Attributes
stop_file instance-attribute
stop_file = stop_file
Functions
request_stop
request_stop()

Mark the token as stopped and trigger callback once.

Source code in lb_runner/engine/stop_token.py
def request_stop(self) -> None:
    """Mark the token as stopped and trigger callback once."""
    if self._stop_requested:
        return
    self._stop_requested = True
    if self._on_stop:
        try:
            self._on_stop()
        except Exception:
            pass
restore
restore()

Restore original signal handlers.

Source code in lb_runner/engine/stop_token.py
def restore(self) -> None:
    """Restore original signal handlers."""
    for sig, handler in self._prev_handlers.items():
        try:
            signal.signal(sig, handler)  # type: ignore[arg-type]
        except Exception:
            continue
    self._prev_handlers.clear()
should_stop
should_stop()

Return True when stop was requested or the stop file exists.

Source code in lb_runner/engine/stop_token.py
def should_stop(self) -> bool:
    """Return True when stop was requested or the stop file exists."""
    if self._stop_requested:
        return True
    if self.stop_file and self.stop_file.exists():
        self._stop_requested = True
        if self._on_stop:
            try:
                self._on_stop()
            except Exception:
                pass
        return True
    return False
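
A cooperative-stop sketch driven by a stop file (signal handling disabled so the example leaves the process handlers alone); the stop-file path is illustrative:

import time
from pathlib import Path

from lb_runner.engine.stop_token import StopToken

token = StopToken(stop_file=Path("/tmp/lb.stop"), enable_signals=False)

# Loop until someone creates /tmp/lb.stop (e.g. `touch /tmp/lb.stop`).
while not token.should_stop():
    time.sleep(1.0)   # one unit of benchmark work per iteration

token.restore()       # restores signal handlers when they were installed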

WorkloadConfig

Bases: BaseModel

Configuration wrapper for workload plugins.

Attributes
collectors_enabled class-attribute instance-attribute
collectors_enabled = Field(default=True, description='Enable metric collectors for this workload')
enabled class-attribute instance-attribute
enabled = Field(default=True, description='Whether this workload is enabled')
intensity class-attribute instance-attribute
intensity = Field(default='user_defined', description='Pre-defined intensity level (low, medium, high, user_defined)')
model_config class-attribute instance-attribute
model_config = ConfigDict(extra='ignore')
options class-attribute instance-attribute
options = Field(default_factory=dict, description='Plugin-specific options for the workload')
plugin class-attribute instance-attribute
plugin = Field(description='Name of the plugin to use')

Functions

aggregate_cli

aggregate_cli(df)

Aggregate metrics collected by CLICollector.

Parameters:

df (DataFrame | None): DataFrame with CLI metrics. Required.

Returns:

Dict[str, float]: Dictionary of aggregated metrics.

Source code in lb_runner/metric_collectors/aggregators.py
def aggregate_cli(df: pd.DataFrame | None) -> Dict[str, float]:
    """
    Aggregate metrics collected by CLICollector.

    Args:
        df: DataFrame with CLI metrics

    Returns:
        Dictionary of aggregated metrics.
    """
    if df is None or df.empty:
        return {}

    summary: Dict[str, float] = {}
    if "r" in df.columns:
        summary["processes_running_avg"] = df["r"].mean()
    if "b" in df.columns:
        summary["processes_blocked_avg"] = df["b"].mean()
    if "si" in df.columns:
        summary["swap_in_kbps_avg"] = df["si"].mean()
    if "so" in df.columns:
        summary["swap_out_kbps_avg"] = df["so"].mean()

    return summary
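
A quick sketch with a vmstat-style frame whose column names match the ones the function inspects:

import pandas as pd

from lb_runner.metric_collectors.aggregators import aggregate_cli

df = pd.DataFrame(
    {"r": [1, 2, 3], "b": [0, 0, 1], "si": [0.0, 4.0, 0.0], "so": [0.0, 0.0, 8.0]}
)
print(aggregate_cli(df))
# {'processes_running_avg': 2.0, 'processes_blocked_avg': 0.33...,
#  'swap_in_kbps_avg': 1.33..., 'swap_out_kbps_avg': 2.66...}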

collect_metrics

collect_metrics(collectors, workload_dir, rep_dir, test_name, repetition, result)
Source code in lb_runner/services/results.py
def collect_metrics(
    collectors: list[Any],
    workload_dir: Path,
    rep_dir: Path,
    test_name: str,
    repetition: int,
    result: dict[str, Any],
) -> None:
    metric_errors: list[dict[str, Any]] = []
    for collector in collectors:
        name = getattr(collector, "name", "unknown")
        try:
            collector_data = collector.get_data()
            result["metrics"][name] = collector_data
        except Exception as exc:
            logger.exception("Collector %s failed to return metrics", name)
            error = MetricCollectionError(
                "Collector data retrieval failed",
                context={"collector": name, "test_name": test_name},
                cause=exc,
            )
            metric_errors.append(error_to_payload(error))
            continue

        filename = f"{test_name}_rep{repetition}_{name}.csv"
        rep_filepath = rep_dir / filename
        try:
            collector.save_data(rep_filepath)
        except Exception as exc:
            logger.exception("Collector %s failed to save metrics", name)
            error = MetricCollectionError(
                "Collector data persistence failed",
                context={"collector": name, "test_name": test_name},
                cause=exc,
            )
            metric_errors.append(error_to_payload(error))

        get_errors = getattr(collector, "get_errors", None)
        if callable(get_errors):
            errors = get_errors()
            if isinstance(errors, list):
                for err in errors:
                    if isinstance(err, MetricCollectionError):
                        metric_errors.append(error_to_payload(err))

    if metric_errors:
        result["metric_errors"] = metric_errors
        result["success"] = False
        if not result.get("error_type"):
            result.update(
                {
                    "error_type": "MetricCollectionError",
                    "error": "Metric collection reported errors",
                    "error_context": {"errors": metric_errors},
                }
            )
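
A hedged sketch of calling it after a repetition. Here collector stands in for any already started-and-stopped BaseCollector subclass instance, and the directory layout is illustrative:

from pathlib import Path

from lb_runner.services.results import collect_metrics

result = {"metrics": {}, "success": True}
collect_metrics(
    collectors=[collector],   # hypothetical collector instance, already stopped
    workload_dir=Path("./benchmark_results/run-001/cpu_stress"),
    rep_dir=Path("./benchmark_results/run-001/cpu_stress/rep1"),
    test_name="cpu_stress",
    repetition=1,
    result=result,
)
# result["metrics"][<collector name>] now holds the raw samples;
# result["metric_errors"] and result["success"] reflect any failures.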

ensure_run_dirs

ensure_run_dirs(config, run_id)

Create required local output directories for a run.

Source code in lb_runner/services/storage.py
def ensure_run_dirs(config: BenchmarkConfig, run_id: str) -> tuple[Path, Path, Path]:
    """Create required local output directories for a run."""
    config.ensure_output_dirs()

    def _scope_with_run_id(base: Path) -> Path:
        """
        Attach run_id unless the path is already scoped.

        Remote runs pass in an output_dir that already contains run_id and host; avoid
        nesting an extra level in that case so collectors and plugins write
        where the controller expects to fetch from.
        """
        host = os.environ.get("LB_RUN_HOST")
        if run_id in base.parts or (host and host in base.parts):
            return base.resolve()
        return (base / run_id).resolve()

    output_root = _scope_with_run_id(config.output_dir)
    report_root = _scope_with_run_id(config.report_dir)
    data_export_root = _scope_with_run_id(config.data_export_dir)

    # Only create output_root; report/export dirs are created on demand by analytics.
    output_root.mkdir(parents=True, exist_ok=True)

    return output_root, data_export_root, report_root

workload_output_dir

workload_output_dir(output_root, workload, ensure=False)

Return the output directory dedicated to a workload inside a run.

Parameters:

output_root (Path): Base output directory for the run (already scoped by run_id/host). Required.
workload (str): Workload/plugin identifier. Required.
ensure (bool): When True, create the directory. Default: False.
Source code in lb_runner/services/storage.py
def workload_output_dir(output_root: Path, workload: str, ensure: bool = False) -> Path:
    """
    Return the output directory dedicated to a workload inside a run.

    Args:
        output_root: Base output directory for the run (already scoped by run_id/host).
        workload: Workload/plugin identifier.
        ensure: When True, create the directory.
    """
    path = output_root / workload
    if ensure:
        path.mkdir(parents=True, exist_ok=True)
    return path
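
A short sketch combining the two storage helpers; run-001 and cpu_stress are illustrative names:

from lb_runner.models.config import BenchmarkConfig
from lb_runner.services.storage import ensure_run_dirs, workload_output_dir

config = BenchmarkConfig()   # all defaults
output_root, data_export_root, report_root = ensure_run_dirs(config, run_id="run-001")

cpu_dir = workload_output_dir(output_root, "cpu_stress", ensure=True)
print(cpu_dir)               # e.g. <cwd>/benchmark_results/run-001/cpu_stress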

write_outputs

write_outputs(info, json_path, csv_path)

Persist collected info to JSON/CSV if paths are provided.

Source code in lb_runner/services/system_info_io.py
def write_outputs(
    info: SystemInfo, json_path: Path | None, csv_path: Path | None
) -> None:
    """Persist collected info to JSON/CSV if paths are provided."""
    if json_path:
        json_path.parent.mkdir(parents=True, exist_ok=True)
        json_path.write_text(json.dumps(info.to_dict(), indent=2), encoding="utf-8")
    if csv_path:
        csv_path.parent.mkdir(parents=True, exist_ok=True)
        with csv_path.open("w", newline="", encoding="utf-8") as f:
            writer = csv.DictWriter(f, fieldnames=["category", "name", "value"])
            writer.writeheader()
            for row in info.to_csv_rows():
                writer.writerow(row)

Local runner

See the LocalRunner class reference above (under Classes) for the constructor, attributes, and methods.

Plugin registry

PluginRegistry

PluginRegistry(plugins=None)

In-memory registry for built-in, entry-point, and user plugins.

Source code in lb_plugins/registry.py
def __init__(self, plugins: Optional[Iterable[Any]] = None):
    self._workloads: Dict[str, IWorkloadPlugin] = {}
    self._pending_entrypoints: Dict[str, Any] = {}
    if plugins:
        for plugin in plugins:
            self.register(plugin)
    self._discover_entrypoint_plugins()
    self._load_user_plugins()

Functions

available

available(load_entrypoints=False)

Return available workload plugins.

When load_entrypoints is True, pending entry-point plugins are resolved and registered; otherwise only already-registered plugins are returned.

Source code in lb_plugins/registry.py
def available(self, load_entrypoints: bool = False) -> Dict[str, Any]:
    """
    Return available workload plugins.

    When load_entrypoints is True, pending entry-point plugins are resolved and
    registered; otherwise only already-registered plugins are returned.
    """
    if load_entrypoints:
        self._load_pending_entrypoints()
    return dict(self._workloads)

create_generator

create_generator(plugin_name, options=None)
Source code in lb_plugins/registry.py
def create_generator(
    self, plugin_name: str, options: Optional[Dict[str, Any]] = None
) -> BaseGenerator:
    plugin = self.get(plugin_name)

    # New style: we need to handle config instantiation here or in the plugin
    # The interface says `create_generator(config: Any)`.
    # We need to convert dict -> config_obj.

    if options is None:
        options = {}

    # If options is already the config object, pass it
    if isinstance(options, plugin.config_cls):
        return plugin.create_generator(options)

    # Otherwise instantiate from dict
    config_obj = plugin.config_cls(**options)
    return plugin.create_generator(config_obj)

get

get(name)
Source code in lb_plugins/registry.py
def get(self, name: str) -> IWorkloadPlugin:
    if name not in self._workloads and name in self._pending_entrypoints:
        self._load_entrypoint(name)
    if name not in self._workloads:
        raise KeyError(f"Workload Plugin '{name}' not found")
    return self._workloads[name]

register

register(plugin)

Register a new plugin.

Source code in lb_plugins/registry.py
def register(self, plugin: Any) -> None:
    """Register a new plugin."""
    if isinstance(plugin, IWorkloadPlugin):
        self._workloads[plugin.name] = plugin
    else:
        # Try duck typing for IWorkloadPlugin if strict check fails
        # (e.g. different import paths).
        if hasattr(plugin, "name") and hasattr(plugin, "create_generator"):
            self._workloads[plugin.name] = plugin
        else:
            raise TypeError(f"Unknown plugin type: {type(plugin)}")

Plugin interface

WorkloadPlugin

Bases: ABC

Abstract base class for all workload plugins.

A plugin encapsulates the logic for:

1. Configuration (schema)
2. Execution (Generator creation)
3. Metadata (Name, description)
4. Assets (Ansible playbooks)

Attributes

config_cls abstractmethod property

config_cls

The Pydantic model used for configuration. The app config service uses this to deserialize raw JSON.

description abstractmethod property

description

Human-readable description.

name abstractmethod property

name

Unique identifier for the workload (e.g., 'stress_ng').

Functions

create_generator abstractmethod

create_generator(config)

Create a new instance of the workload generator.

Parameters:

config (BasePluginConfig): An instance of self.config_cls. Required.
Source code in lb_plugins/interface.py
@abstractmethod
def create_generator(self, config: BasePluginConfig) -> Any:
    """
    Create a new instance of the workload generator.

    Args:
        config: An instance of self.config_cls
    """
    pass
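
A hedged sketch of implementing the abstract surface (name, description, config_cls, create_generator). The sleep workload and all names are illustrative, and a real plugin would derive its config from BasePluginConfig rather than a bare Pydantic model:

import time
from typing import Any

from pydantic import BaseModel

from lb_plugins.interface import WorkloadPlugin


class SleepConfig(BaseModel):           # real plugins derive from BasePluginConfig
    duration_seconds: float = 10.0


class SleepGenerator:
    def __init__(self, config: SleepConfig) -> None:
        self.config = config

    def run(self) -> dict[str, Any]:
        start = time.time()
        time.sleep(self.config.duration_seconds)
        return {"elapsed_seconds": time.time() - start}


class SleepPlugin(WorkloadPlugin):
    @property
    def name(self) -> str:
        return "sleep"

    @property
    def description(self) -> str:
        return "Sleeps for a configurable duration (demo workload)."

    @property
    def config_cls(self) -> type[SleepConfig]:
        return SleepConfig

    def create_generator(self, config: SleepConfig) -> SleepGenerator:
        return SleepGenerator(config)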

export_results_to_csv

export_results_to_csv(results, output_dir, run_id, test_name)

Normalize plugin-specific results into CSV files stored in output_dir.

Default implementation flattens generator_result and metadata into a single CSV. Plugins with richer report formats can override to write multiple CSVs.

Source code in lb_plugins/interface.py
def export_results_to_csv(
    self,
    results: List[Dict[str, Any]],
    output_dir: Path,
    run_id: str,
    test_name: str,
) -> List[Path]:
    """
    Normalize plugin-specific results into CSV files stored in output_dir.

    Default implementation flattens generator_result and metadata into a single CSV.
    Plugins with richer report formats can override to write multiple CSVs.
    """
    rows = [
        row
        for entry in results
        if (row := _build_result_row(entry, run_id, test_name)) is not None
    ]
    if not rows:
        return []

    df = pd.DataFrame(rows)
    output_dir.mkdir(parents=True, exist_ok=True)
    csv_path = output_dir / f"{test_name}_plugin.csv"
    df.to_csv(csv_path, index=False)
    return [csv_path]

get_ansible_collect_post_extravars

get_ansible_collect_post_extravars()

Return extra vars merged into the plugin collect post-playbook run.

Source code in lb_plugins/interface.py
def get_ansible_collect_post_extravars(self) -> Dict[str, Any]:
    """Return extra vars merged into the plugin collect post-playbook run."""
    return {}

get_ansible_collect_post_path

get_ansible_collect_post_path()

Return the path to the Ansible collect post-playbook. Executed after the collect phase for plugin-specific log collection.

Source code in lb_plugins/interface.py
def get_ansible_collect_post_path(self) -> Optional[Path]:
    """
    Return the path to the Ansible collect post-playbook.
    Executed after the collect phase for plugin-specific log collection.
    """
    return None

get_ansible_collect_pre_extravars

get_ansible_collect_pre_extravars()

Return extra vars merged into the plugin collect pre-playbook run.

Source code in lb_plugins/interface.py
def get_ansible_collect_pre_extravars(self) -> Dict[str, Any]:
    """Return extra vars merged into the plugin collect pre-playbook run."""
    return {}

get_ansible_collect_pre_path

get_ansible_collect_pre_path()

Return the path to the Ansible collect pre-playbook. Executed before the collect phase (e.g., to add dynamic hosts to inventory).

Source code in lb_plugins/interface.py
def get_ansible_collect_pre_path(self) -> Optional[Path]:
    """
    Return the path to the Ansible collect pre-playbook.
    Executed before the collect phase (e.g., to add dynamic hosts to inventory).
    """
    return None

get_ansible_setup_extravars

get_ansible_setup_extravars()

Return extra vars merged into the plugin setup playbook run.

Source code in lb_plugins/interface.py
def get_ansible_setup_extravars(self) -> Dict[str, Any]:
    """Return extra vars merged into the plugin setup playbook run."""
    return {}

get_ansible_setup_path

get_ansible_setup_path()

Return the path to the Ansible setup playbook. Executed before the workload runs on remote hosts.

Source code in lb_plugins/interface.py
def get_ansible_setup_path(self) -> Optional[Path]:
    """
    Return the path to the Ansible setup playbook.
    Executed before the workload runs on remote hosts.
    """
    return None

get_ansible_teardown_extravars

get_ansible_teardown_extravars()

Return extra vars merged into the plugin teardown playbook run.

Source code in lb_plugins/interface.py
def get_ansible_teardown_extravars(self) -> Dict[str, Any]:
    """Return extra vars merged into the plugin teardown playbook run."""
    return {}

get_ansible_teardown_path

get_ansible_teardown_path()

Return the path to the Ansible teardown playbook. Executed after the workload runs (even on failure) on remote hosts.

Source code in lb_plugins/interface.py
def get_ansible_teardown_path(self) -> Optional[Path]:
    """
    Return the path to the Ansible teardown playbook.
    Executed after the workload runs (even on failure) on remote hosts.
    """
    return None

get_grafana_assets

get_grafana_assets()

Return Grafana datasources/dashboards provided by this plugin.

Source code in lb_plugins/interface.py
def get_grafana_assets(self) -> GrafanaAssets | None:
    """Return Grafana datasources/dashboards provided by this plugin."""
    return None

get_preset_config

get_preset_config(level)

Return a configuration object for the specified intensity level. If USER_DEFINED or not implemented, return None.

Source code in lb_plugins/interface.py
def get_preset_config(self, level: WorkloadIntensity) -> Optional[BasePluginConfig]:
    """
    Return a configuration object for the specified intensity level.
    If USER_DEFINED or not implemented, return None.
    """
    return None

get_required_apt_packages

get_required_apt_packages()

Return list of APT packages required by this plugin.

Source code in lb_plugins/interface.py
def get_required_apt_packages(self) -> List[str]:
    """Return list of APT packages required by this plugin."""
    return []

get_required_local_tools

get_required_local_tools()

Return list of command-line tools required by this plugin for local execution. Used by lb doctor to verify the local environment.

Source code in lb_plugins/interface.py, lines 142-147
def get_required_local_tools(self) -> List[str]:
    """
    Return list of command-line tools required by this plugin for local execution.
    Used by `lb doctor` to verify the local environment.
    """
    return []

get_required_pip_packages

get_required_pip_packages()

Return list of Python packages required by this plugin.

Source code in lb_plugins/interface.py, lines 134-136
def get_required_pip_packages(self) -> List[str]:
    """Return list of Python packages required by this plugin."""
    return []

get_required_uv_extras

get_required_uv_extras()

Return list of UV extras required by this plugin.

Source code in lb_plugins/interface.py, lines 138-140
def get_required_uv_extras(self) -> List[str]:
    """Return list of UV extras required by this plugin."""
    return []
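
The four dependency hooks are usually simple constant lists. A sketch (package and tool names are illustrative):

from typing import List

class MyPluginDependencies:  # sketch; a real plugin derives from the plugin interface
    def get_required_apt_packages(self) -> List[str]:
        return ["stress-ng"]          # illustrative APT package

    def get_required_pip_packages(self) -> List[str]:
        return ["psutil>=5.9"]        # illustrative pip requirement

    def get_required_uv_extras(self) -> List[str]:
        return ["my-plugin-extra"]    # illustrative UV extra

    def get_required_local_tools(self) -> List[str]:
        return ["ansible-playbook"]   # verified locally by `lb doctor`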

load_config_from_file

load_config_from_file(config_file_path)

Loads and validates plugin configuration from a YAML file.

The method merges common configuration (from the 'common' section) with plugin-specific configuration (from the 'plugins.<plugin_name>' section). Plugin-specific settings override common settings. Finally, the merged configuration is validated against self.config_cls.

Parameters:

Name Type Description Default
config_file_path Path

The path to the YAML configuration file.

required

Returns:

Type Description
BasePluginConfig

An instance of self.config_cls with the loaded and validated configuration.

Raises:

Type Description
FileNotFoundError

If the config_file_path does not exist.

YAMLError

If the file content is not valid YAML.

ValidationError

If the merged configuration does not conform to self.config_cls schema.

Source code in lb_plugins/interface.py, lines 84-121
def load_config_from_file(self, config_file_path: Path) -> BasePluginConfig:
    """
    Loads and validates plugin configuration from a YAML file.

    The method merges common configuration (from the 'common' section)
    with plugin-specific configuration (from the 'plugins.<plugin_name>' section).
    Plugin-specific settings override common settings.
    Finally, the merged configuration is validated against `self.config_cls`.

    Args:
        config_file_path: The path to the YAML configuration file.

    Returns:
        An instance of `self.config_cls` with the loaded and validated
        configuration.

    Raises:
        FileNotFoundError: If the config_file_path does not exist.
        yaml.YAMLError: If the file content is not valid YAML.
        ValidationError: If the merged configuration does not conform to
            `self.config_cls` schema.
    """
    if not config_file_path.exists():
        raise FileNotFoundError(f"Configuration file not found: {config_file_path}")

    with open(config_file_path, "r") as f:
        full_data = yaml.safe_load(f) or {}

    # Extract common and plugin-specific data
    common_data = full_data.get("common", {})
    plugin_data = full_data.get("plugins", {}).get(self.name, {})

    # Merge data: plugin-specific overrides common
    merged_data = {**common_data, **plugin_data}

    # Validate and instantiate the config class using Pydantic
    # Pydantic will handle default values for missing fields
    return self.config_cls(**merged_data)
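
For instance, with a file that sets max_retries in both the common section and the plugin section, the plugin value wins while untouched common keys are inherited. A sketch (the file contents and the plugin instance are illustrative):

import textwrap
from pathlib import Path

config_yaml = textwrap.dedent("""
    common:
      max_retries: 1
      tags: [baseline]
    plugins:
      my_plugin:
        max_retries: 3
""")

config_path = Path("/tmp/lb_config.yaml")   # illustrative location
config_path.write_text(config_yaml)

# 'plugin' is assumed to be a plugin instance whose name is "my_plugin" and
# whose config_cls is BasePluginConfig (or a subclass of it).
config = plugin.load_config_from_file(config_path)
assert config.max_retries == 3        # plugin section overrides common
assert config.tags == ["baseline"]    # inherited from the common section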

BasePluginConfig

Bases: BaseModel

Base model for common plugin configuration fields.

Attributes

max_retries class-attribute instance-attribute

max_retries = Field(default=0, ge=0, description='Maximum number of retries for the workload')

model_config class-attribute instance-attribute

model_config = {'extra': 'ignore'}

tags class-attribute instance-attribute

tags = Field(default_factory=list, description='Tags associated with the workload')

timeout_buffer class-attribute instance-attribute

timeout_buffer = Field(default=10, description='Safety buffer in seconds added to expected runtime')
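
Plugin-specific configuration models typically extend this base, inheriting max_retries, tags and timeout_buffer. A sketch with illustrative fields (the import path is assumed from the source location above):

from pydantic import Field

from lb_plugins.interface import BasePluginConfig  # import path assumed

class MyPluginConfig(BasePluginConfig):
    # Illustrative plugin-specific fields; the common fields are inherited.
    workers: int = Field(default=4, gt=0, description="Illustrative worker count")
    target_url: str = Field(default="http://localhost:8080", description="Illustrative endpoint under test")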

SimpleWorkloadPlugin

Bases: WorkloadPlugin

Lightweight plugin base that relies on class attributes.

Attributes

COLLECT_POST_PLAYBOOK class-attribute instance-attribute

COLLECT_POST_PLAYBOOK = None

COLLECT_PRE_PLAYBOOK class-attribute instance-attribute

COLLECT_PRE_PLAYBOOK = None

CONFIG_CLS class-attribute instance-attribute

CONFIG_CLS = BasePluginConfig

DESCRIPTION class-attribute instance-attribute

DESCRIPTION = ''

GENERATOR_CLS class-attribute instance-attribute

GENERATOR_CLS = None

GRAFANA_ASSETS class-attribute instance-attribute

GRAFANA_ASSETS = None

NAME class-attribute instance-attribute

NAME = ''

REQUIRED_APT_PACKAGES class-attribute instance-attribute

REQUIRED_APT_PACKAGES = []

REQUIRED_LOCAL_TOOLS class-attribute instance-attribute

REQUIRED_LOCAL_TOOLS = []

REQUIRED_PIP_PACKAGES class-attribute instance-attribute

REQUIRED_PIP_PACKAGES = []

REQUIRED_UV_EXTRAS class-attribute instance-attribute

REQUIRED_UV_EXTRAS = []

SETUP_PLAYBOOK class-attribute instance-attribute

SETUP_PLAYBOOK = None

TEARDOWN_PLAYBOOK class-attribute instance-attribute

TEARDOWN_PLAYBOOK = None

config_cls property

config_cls

description property

description

name property

name

Functions

create_generator

create_generator(config)
Source code in lb_plugins/interface.py, lines 278-281
def create_generator(self, config: BasePluginConfig) -> Any:
    if self.GENERATOR_CLS is None:
        raise NotImplementedError("SimpleWorkloadPlugin.GENERATOR_CLS must be set")
    return self.GENERATOR_CLS(config)

get_ansible_collect_post_path

get_ansible_collect_post_path()
Source code in lb_plugins/interface.py, lines 313-316
def get_ansible_collect_post_path(self) -> Optional[Path]:
    if self.COLLECT_POST_PLAYBOOK and self.COLLECT_POST_PLAYBOOK.exists():
        return self.COLLECT_POST_PLAYBOOK
    return None

get_ansible_collect_pre_path

get_ansible_collect_pre_path()
Source code in lb_plugins/interface.py, lines 308-311
def get_ansible_collect_pre_path(self) -> Optional[Path]:
    if self.COLLECT_PRE_PLAYBOOK and self.COLLECT_PRE_PLAYBOOK.exists():
        return self.COLLECT_PRE_PLAYBOOK
    return None

get_ansible_setup_path

get_ansible_setup_path()
Source code in lb_plugins/interface.py, lines 298-301
def get_ansible_setup_path(self) -> Optional[Path]:
    if self.SETUP_PLAYBOOK and self.SETUP_PLAYBOOK.exists():
        return self.SETUP_PLAYBOOK
    return None

get_ansible_teardown_path

get_ansible_teardown_path()
Source code in lb_plugins/interface.py, lines 303-306
def get_ansible_teardown_path(self) -> Optional[Path]:
    if self.TEARDOWN_PLAYBOOK and self.TEARDOWN_PLAYBOOK.exists():
        return self.TEARDOWN_PLAYBOOK
    return None

get_grafana_assets

get_grafana_assets()
Source code in lb_plugins/interface.py, lines 283-284
def get_grafana_assets(self) -> GrafanaAssets | None:
    return self.GRAFANA_ASSETS

get_required_apt_packages

get_required_apt_packages()
Source code in lb_plugins/interface.py, lines 286-287
def get_required_apt_packages(self) -> List[str]:
    return list(self.REQUIRED_APT_PACKAGES)

get_required_local_tools

get_required_local_tools()
Source code in lb_plugins/interface.py, lines 295-296
def get_required_local_tools(self) -> List[str]:
    return list(self.REQUIRED_LOCAL_TOOLS)

get_required_pip_packages

get_required_pip_packages()
Source code in lb_plugins/interface.py, lines 289-290
def get_required_pip_packages(self) -> List[str]:
    return list(self.REQUIRED_PIP_PACKAGES)

get_required_uv_extras

get_required_uv_extras()
Source code in lb_plugins/interface.py, lines 292-293
def get_required_uv_extras(self) -> List[str]:
    return list(self.REQUIRED_UV_EXTRAS)
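
With the class-attribute hooks above, a concrete plugin can be declared without overriding any methods. A sketch (the generator, config class and playbook paths are illustrative; the import path is assumed from the source location above):

from pathlib import Path

from lb_plugins.interface import SimpleWorkloadPlugin  # import path assumed

class MyWorkloadPlugin(SimpleWorkloadPlugin):
    NAME = "my_plugin"
    DESCRIPTION = "Illustrative workload plugin"
    CONFIG_CLS = MyPluginConfig      # hypothetical BasePluginConfig subclass
    GENERATOR_CLS = MyGenerator      # hypothetical workload generator class

    REQUIRED_APT_PACKAGES = ["stress-ng"]
    REQUIRED_LOCAL_TOOLS = ["ansible-playbook"]

    SETUP_PLAYBOOK = Path(__file__).parent / "playbooks" / "setup.yml"
    TEARDOWN_PLAYBOOK = Path(__file__).parent / "playbooks" / "teardown.yml"
    # COLLECT_PRE_PLAYBOOK, COLLECT_POST_PLAYBOOK and GRAFANA_ASSETS stay None,
    # so the corresponding hooks return None.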

WorkloadIntensity

Bases: str, Enum

Attributes

HIGH class-attribute instance-attribute

HIGH = 'high'

LOW class-attribute instance-attribute

LOW = 'low'

MEDIUM class-attribute instance-attribute

MEDIUM = 'medium'

USER_DEFINED class-attribute instance-attribute

USER_DEFINED = 'user_defined'

Plugin assets

PluginAssetConfig

Bases: BaseModel

Ansible assets and extravars resolved from a workload plugin.

Attributes

collect_post_extravars class-attribute instance-attribute

collect_post_extravars = Field(default_factory=dict, description='Collect post extravars')

collect_post_playbook class-attribute instance-attribute

collect_post_playbook = Field(default=None, description='Plugin collect post-playbook path')

collect_pre_extravars class-attribute instance-attribute

collect_pre_extravars = Field(default_factory=dict, description='Collect pre extravars')

collect_pre_playbook class-attribute instance-attribute

collect_pre_playbook = Field(default=None, description='Plugin collect pre-playbook path')

required_uv_extras class-attribute instance-attribute

required_uv_extras = Field(default_factory=list, description='UV extras required by plugin runtime')

setup_extravars class-attribute instance-attribute

setup_extravars = Field(default_factory=dict, description='Setup extravars')

setup_playbook class-attribute instance-attribute

setup_playbook = Field(default=None, description='Plugin setup playbook path')

teardown_extravars class-attribute instance-attribute

teardown_extravars = Field(default_factory=dict, description='Teardown extravars')

teardown_playbook class-attribute instance-attribute

teardown_playbook = Field(default=None, description='Plugin teardown playbook path')
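
These fields line up one-to-one with the plugin hooks documented above. Roughly, the runner registry resolves them as in the following sketch (illustrative only, not the actual resolver code; 'plugin' is a workload plugin instance, and collect_post_extravars is omitted for brevity):

assets = PluginAssetConfig(
    setup_playbook=plugin.get_ansible_setup_path(),
    setup_extravars=plugin.get_ansible_setup_extravars(),
    teardown_playbook=plugin.get_ansible_teardown_path(),
    teardown_extravars=plugin.get_ansible_teardown_extravars(),
    collect_pre_playbook=plugin.get_ansible_collect_pre_path(),
    collect_pre_extravars=plugin.get_ansible_collect_pre_extravars(),
    collect_post_playbook=plugin.get_ansible_collect_post_path(),
    required_uv_extras=plugin.get_required_uv_extras(),
)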

Configuration models

BenchmarkConfig

Bases: BaseModel

Main configuration for benchmark tests.

Attributes

collect_system_info class-attribute instance-attribute

collect_system_info = Field(default=True, description='Collect system information before running benchmarks')

collectors class-attribute instance-attribute

collectors = Field(default_factory=MetricCollectorConfig, description='Configuration for metric collectors')

cooldown_seconds class-attribute instance-attribute

cooldown_seconds = Field(default=5, ge=0, description='Cooldown period after test finishes')

data_export_dir class-attribute instance-attribute

data_export_dir = Field(default=Path('./data_exports'), description='Directory for raw data exports')

influxdb_bucket class-attribute instance-attribute

influxdb_bucket = Field(default='performance', description='InfluxDB Bucket')

influxdb_enabled class-attribute instance-attribute

influxdb_enabled = Field(default=False, description='Enable InfluxDB integration')

influxdb_org class-attribute instance-attribute

influxdb_org = Field(default='benchmark', description='InfluxDB Organization')

influxdb_token class-attribute instance-attribute

influxdb_token = Field(default='', description='InfluxDB API Token')

influxdb_url class-attribute instance-attribute

influxdb_url = Field(default='http://localhost:8086', description='InfluxDB URL')

loki class-attribute instance-attribute

loki = Field(default_factory=LokiConfig, description='Loki log shipping configuration')

metrics_interval_seconds class-attribute instance-attribute

metrics_interval_seconds = Field(default=1.0, gt=0, description='Interval for metric collection in seconds')

model_config class-attribute instance-attribute

model_config = ConfigDict(extra='ignore')

output_dir class-attribute instance-attribute

output_dir = Field(default=Path('./benchmark_results'), description='Root directory for all benchmark output')

plugin_assets class-attribute instance-attribute

plugin_assets = Field(default_factory=dict, description='Resolved plugin Ansible assets/extravars (from runner registry)')

plugin_settings class-attribute instance-attribute

plugin_settings = Field(default_factory=dict, description='Dictionary of plugin-specific Pydantic config models')

remote_execution class-attribute instance-attribute

remote_execution = Field(default_factory=RemoteExecutionConfig, description='Configuration for remote execution')

remote_hosts class-attribute instance-attribute

remote_hosts = Field(default_factory=list, description='List of remote hosts for benchmarking')

repetitions class-attribute instance-attribute

repetitions = Field(default=3, gt=0, description='Number of repetitions for each test')

report_dir class-attribute instance-attribute

report_dir = Field(default=Path('./reports'), description='Directory for generated reports')

test_duration_seconds class-attribute instance-attribute

test_duration_seconds = Field(default=3600, gt=0, description='Default duration for tests in seconds')

warmup_seconds class-attribute instance-attribute

warmup_seconds = Field(default=5, ge=0, description='Warmup period before metric collection starts')

workloads class-attribute instance-attribute

workloads = Field(default_factory=dict, description='Dictionary of workload definitions')

Functions

ensure_output_dirs

ensure_output_dirs()

Ensures all configured output directories exist.

Source code in lb_runner/models/config.py, lines 311-314
def ensure_output_dirs(self) -> None:
    """Ensures all configured output directories exist."""
    for path in (self.output_dir, self.report_dir, self.data_export_dir):
        path.mkdir(parents=True, exist_ok=True)

from_dict classmethod

from_dict(data)
Source code in lb_runner/models/config.py, lines 334-338
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "BenchmarkConfig":
    # Use Pydantic's built-in dictionary parsing and validation
    # Pydantic will handle nested models and Path conversions automatically
    return cls.model_validate(data)

from_json classmethod

from_json(json_str)
Source code in lb_runner/models/config.py, lines 329-332
@classmethod
def from_json(cls, json_str: str) -> "BenchmarkConfig":
    # Use Pydantic's built-in JSON parsing and validation
    return cls.model_validate_json(json_str)

load classmethod

load(filepath)
Source code in lb_runner/models/config.py, lines 343-345
@classmethod
def load(cls, filepath: Path) -> "BenchmarkConfig":
    return cls.model_validate_json(filepath.read_text())

save

save(filepath)
Source code in lb_runner/models/config.py, lines 340-341
def save(self, filepath: Path) -> None:
    filepath.write_text(self.model_dump_json(indent=2))
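
A quick round trip through the persistence helpers (the path is illustrative; as listed above, every field has a default, so a sparse instance validates; the import path is assumed from the source location above):

from pathlib import Path

from lb_runner.models.config import BenchmarkConfig  # import path assumed

config = BenchmarkConfig(repetitions=5, test_duration_seconds=600)
config.ensure_output_dirs()            # creates output, report and export dirs

path = Path("benchmark_config.json")   # illustrative path
config.save(path)                      # indented JSON via model_dump_json
restored = BenchmarkConfig.load(path)  # re-validated on load
assert restored.repetitions == 5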

WorkloadConfig

Bases: BaseModel

Configuration wrapper for workload plugins.

Attributes

collectors_enabled class-attribute instance-attribute

collectors_enabled = Field(default=True, description='Enable metric collectors for this workload')

enabled class-attribute instance-attribute

enabled = Field(default=True, description='Whether this workload is enabled')

intensity class-attribute instance-attribute

intensity = Field(default='user_defined', description='Pre-defined intensity level (low, medium, high, user_defined)')

model_config class-attribute instance-attribute

model_config = ConfigDict(extra='ignore')

options class-attribute instance-attribute

options = Field(default_factory=dict, description='Plugin-specific options for the workload')

plugin class-attribute instance-attribute

plugin = Field(description='Name of the plugin to use')
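
For example, a workload entry selecting a plugin at medium intensity with a plugin-specific option (values are illustrative; the import path is assumed from the source location above):

from lb_runner.models.config import WorkloadConfig  # import path assumed

workload = WorkloadConfig(
    plugin="my_plugin",          # must name a registered plugin
    intensity="medium",          # low / medium / high / user_defined
    options={"workers": 8},      # plugin-specific options
)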

RemoteExecutionConfig

Bases: BaseModel

Configuration for remote execution via Ansible.

Attributes

collect_playbook class-attribute instance-attribute

collect_playbook = Field(default=None, description='Path to the Ansible collect playbook')

enabled class-attribute instance-attribute

enabled = Field(default=False, description='Enable remote execution')

inventory_path class-attribute instance-attribute

inventory_path = Field(default=None, description='Path to a custom Ansible inventory file')

lb_workdir class-attribute instance-attribute

lb_workdir = Field(default=DEFAULT_LB_WORKDIR, description='Remote workdir for benchmark install (Ansible-templated)')

model_config class-attribute instance-attribute

model_config = ConfigDict(extra='ignore')

run_collect class-attribute instance-attribute

run_collect = Field(default=True, description='Execute collection playbooks after tests')

run_playbook class-attribute instance-attribute

run_playbook = Field(default=None, description='Path to the Ansible run playbook')

run_setup class-attribute instance-attribute

run_setup = Field(default=True, description='Execute setup playbooks before tests')

run_teardown class-attribute instance-attribute

run_teardown = Field(default=True, description='Execute teardown playbooks after tests')

setup_playbook class-attribute instance-attribute

setup_playbook = Field(default=None, description='Path to the Ansible setup playbook')

teardown_playbook class-attribute instance-attribute

teardown_playbook = Field(default=None, description='Path to the Ansible teardown playbook')

upgrade_pip class-attribute instance-attribute

upgrade_pip = Field(default=False, description='Upgrade pip inside the benchmark virtual environment during setup')

use_container_fallback class-attribute instance-attribute

use_container_fallback = Field(default=False, description='Use container-based fallback for remote execution')
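
Remote execution is disabled by default; a minimal sketch of turning it on with a custom inventory (paths are illustrative; the import path is assumed from the source location above):

from pathlib import Path

from lb_runner.models.config import RemoteExecutionConfig  # import path assumed

remote = RemoteExecutionConfig(
    enabled=True,
    inventory_path=Path("inventory/hosts.ini"),  # illustrative custom inventory
    run_setup=True,       # defaults shown explicitly for clarity
    run_teardown=True,
    run_collect=True,
)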

RemoteHostConfig

Bases: BaseModel

Configuration for a remote benchmark host.

Attributes

address class-attribute instance-attribute

address = Field(description='IP address or hostname of the remote host')

become class-attribute instance-attribute

become = Field(default=True, description='Use Ansible become (sudo) for escalated privileges')

become_method class-attribute instance-attribute

become_method = Field(default='sudo', description='Ansible become method')

model_config class-attribute instance-attribute

model_config = ConfigDict(extra='ignore')

name class-attribute instance-attribute

name = Field(description='Unique name for the remote host')

port class-attribute instance-attribute

port = Field(default=22, gt=0, description='SSH port for connection')

user class-attribute instance-attribute

user = Field(default='root', description='SSH user for connection')

vars class-attribute instance-attribute

vars = Field(default_factory=dict, description='Additional Ansible variables for this host')

Functions

ansible_host_line

ansible_host_line()

Render an INI-style inventory line for this host (compat helper).

Source code in lb_runner/models/config.py, lines 143-160
def ansible_host_line(self) -> str:
    """Render an INI-style inventory line for this host (compat helper)."""
    parts = [
        self.name,
        f"ansible_host={self.address}",
        f"ansible_port={self.port}",
        f"ansible_user={self.user}",
    ]
    if self.become:
        parts.append("ansible_become=true")
    if self.become_method:
        parts.append(f"ansible_become_method={self.become_method}")
    for key, value in self.vars.items():
        val = str(value)
        if " " in val:
            val = f'"{val}"'
        parts.append(f"{key}={val}")
    return " ".join(parts)

validate_name_not_empty

validate_name_not_empty()
Source code in lb_runner/models/config.py, lines 137-141
@model_validator(mode="after")
def validate_name_not_empty(self) -> "RemoteHostConfig":
    if not self.name or not self.name.strip():
        raise ValueError("RemoteHostConfig: 'name' must be non-empty")
    return self