API Reference
PhysLink v0.1: all public symbols can be imported directly from physlink.
    from physlink import (
        DreamerV3Adapter,
        ObservationSpace,
        ActionSpace,
        ComplianceReport,
        register_invariant,
        doctor,
        PhysLinkError,
    )
Adapter
The main adapter class. Wraps your simulator environment and drives the DreamerV3 training loop.
DreamerV3Adapter
Bases: BaseAdapter
DreamerV3 adapter for physical simulation reinforcement learning.
Validates space compatibility at construction time. Training, visualization, and export are deferred to fit() / visualize() / export() respectively. No model weights are loaded and no GPU is required at construction.
| PARAMETER | DESCRIPTION |
|---|---|
| obs_space | Observation space with dims >= 4. |
| act_space | Action space with dims >= 1. |

| RAISES | DESCRIPTION |
|---|---|
| ConfigurationError | If obs_space.dims < 4 or act_space.dims < 1. |
Example
    >>> from physlink import DreamerV3Adapter, ObservationSpace, ActionSpace
    >>> obs = ObservationSpace.from_proprioception(joints=7, include_velocity=True)
    >>> act = ActionSpace.continuous(dims=7, bounds=[(-1.0, 1.0)] * 7)
    >>> adapter = DreamerV3Adapter(obs, act)
    >>> adapter.obs_space.dims
    14
compliance_report() -> ComplianceReport
Return a ComplianceReport summarizing invariant compliance from the last fit().
Reads _invariants and _invariant_residuals stored on the adapter.
Pure computation — no side effects, safe to call multiple times.
| RETURNS | DESCRIPTION |
|---|---|
| ComplianceReport | ComplianceReport with per-invariant summary and violation details. Empty report (no entries) if no invariants are registered. Zero-trajectory report if fit() has not yet been called. |
Example
    >>> register_invariant(adapter, "mass", fn, tolerance=0.01)
    >>> adapter.fit(trajectories, steps=100)
    >>> report = adapter.compliance_report()
    >>> print(report.summary())
    mass: PASS (max_residual=0.0042, threshold=0.0100, violations=0/10)
load_checkpoint(path: str) -> None
Load model weights from a safetensors checkpoint.
Reads checkpoint metadata before loading weights for early detection of version incompatibility or file corruption.
| PARAMETER | DESCRIPTION |
|---|---|
| path | Path to the .safetensors checkpoint file to load. |

| RAISES | DESCRIPTION |
|---|---|
| CheckpointCorruptError | If the file is malformed, unreadable, or missing required metadata. |
| CheckpointVersionError | If physlink_version in the checkpoint metadata is incompatible with the installed version (different major.minor component). |
Example
    >>> adapter = DreamerV3Adapter(obs, act)
    >>> adapter.load_checkpoint("./physlink_checkpoints/checkpoint_step_1000.safetensors")
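The major.minor rule described for CheckpointVersionError can be illustrated in isolation. This is a minimal sketch, not PhysLink's implementation: the helper name versions_compatible and the assumption that versions are plain dotted strings such as "0.1.3" are both hypothetical.

```python
def versions_compatible(checkpoint_version: str, installed_version: str) -> bool:
    """Return True when both versions share the same major.minor component.

    Hypothetical helper; assumes dotted version strings such as "0.1.3".
    """
    def major_minor(version: str) -> tuple:
        parts = version.split(".")
        return int(parts[0]), int(parts[1])

    return major_minor(checkpoint_version) == major_minor(installed_version)


print(versions_compatible("0.1.0", "0.1.7"))  # only the patch differs: True
print(versions_compatible("0.1.0", "0.2.0"))  # the minor differs: False
```

Under this reading, a checkpoint written by a different patch release would still load, while a change in either the major or the minor number would be rejected.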
fit(trajectories: list[dict[str, Any]] | TrajectoryBatch | TrajectoryBuffer, steps: int, checkpoint_interval_steps: int = 1000, debug_hooks: bool = False, checkpoint_dir: str = 'physlink_checkpoints') -> AdaptationRun
Run the DreamerV3 adaptation loop with a live progress bar.
Adapts the DreamerV3 world model to the provided trajectory data over
steps gradient updates. Displays a rich progress bar in Colab output
with step count, ETA, prediction health (OK/ANOMALY), and throughput.
Calling fit() multiple times is safe: each call resets optimizer state and training history for a fresh run (NFR-09 idempotence).
| PARAMETER | DESCRIPTION |
|---|---|
| trajectories | Trajectory dataset. |
| steps | Total gradient steps to run. Must be > 0. |
| checkpoint_interval_steps | Number of steps between checkpoint saves; a checkpoint file is written each time this many steps complete. Must be > 0. |
| debug_hooks | When True, displays a debug panel alongside the progress bar showing pipeline stage statuses (data_loading, world_model_update, actor_update, critic_update). Each stage shows OK or a diagnostic status. Defaults to False (opt-in). |
| checkpoint_dir | Directory where checkpoint files are written. Defaults to "physlink_checkpoints" relative to the current working directory. |

| RETURNS | DESCRIPTION |
|---|---|
| AdaptationRun | AdaptationRun capturing config, step count, checkpoint paths, and elapsed time. |

| RAISES | DESCRIPTION |
|---|---|
| ValidationError | If steps <= 0 or checkpoint_interval_steps <= 0. |
Example
    >>> from physlink import DreamerV3Adapter, ObservationSpace, ActionSpace
    >>> obs = ObservationSpace.from_proprioception(joints=7)
    >>> act = ActionSpace.continuous(dims=7, bounds=[(-1.0, 1.0)] * 7)
    >>> adapter = DreamerV3Adapter(obs, act)
    >>> trajectories = [{"obs": [0.1] * 7, "action": [0.0] * 7}] * 100
    >>> run = adapter.fit(trajectories, steps=10, debug_hooks=True)
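To make checkpoint_interval_steps concrete, the sketch below lists the steps at which a checkpoint would be written, assuming one is saved whenever the step count reaches a multiple of the interval. Whether fit() also saves at the final step is not stated here, so that case is left out. The helper name checkpoint_steps is hypothetical, and the file name pattern is taken from the load_checkpoint example.

```python
from pathlib import Path


def checkpoint_steps(steps: int, checkpoint_interval_steps: int = 1000) -> list:
    """List the step numbers that are multiples of the interval (assumed rule)."""
    if steps <= 0 or checkpoint_interval_steps <= 0:
        raise ValueError("steps and checkpoint_interval_steps must be > 0")
    return list(range(checkpoint_interval_steps, steps + 1, checkpoint_interval_steps))


schedule = checkpoint_steps(steps=2500, checkpoint_interval_steps=1000)
# File names follow the pattern seen in the load_checkpoint example.
paths = [Path("physlink_checkpoints") / f"checkpoint_step_{s}.safetensors" for s in schedule]

print(schedule)  # [1000, 2000]
```

With the default interval of 1000, a short run such as steps=10 would produce no intermediate checkpoint under this assumption.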
explain() -> dict[str, Any]
Return a metadata dict describing this adapter's space configuration.
| RETURNS | DESCRIPTION |
|---|---|
| dict[str, Any] | A JSON-serializable dict with keys: type, obs_space, act_space. |
Example
    >>> adapter = DreamerV3Adapter(obs, act)
    >>> info = adapter.explain()
    >>> info["type"]
    'DreamerV3Adapter'
visualize(trajectories: list[dict[str, Any]] | TrajectoryBatch | TrajectoryBuffer, output_path: str = 'physlink_triptych.gif') -> str
Produce a triptych GIF comparing Imagination, Real, and Difference panels.
Runs a single inference pass through the trained world model to produce reconstructed (Imagination) observations, then renders them alongside the real observations and the absolute difference as a 3-panel GIF.
Prints a "Friday afternoon window" callout comparing elapsed adaptation time to the documented from-scratch baseline.
| PARAMETER | DESCRIPTION |
|---|---|
| trajectories | Trajectory dataset to visualize. Uses the first trajectory for the panel rendering. |
| output_path | File path for the output GIF. Defaults to "physlink_triptych.gif" in the current working directory. |

| RETURNS | DESCRIPTION |
|---|---|
| str | Absolute path to the saved GIF file. |

| RAISES | DESCRIPTION |
|---|---|
| AdapterError | If the model has not been initialized via fit() or load_checkpoint(). |
Example
    >>> adapter = DreamerV3Adapter(obs, act)
    >>> adapter.fit(trajectories, steps=1000)
    >>> path = adapter.visualize(trajectories)
    >>> print(path)  # absolute path to physlink_triptych.gif
export(path: str) -> dict[str, str]
Export a complete artifact bundle to the specified directory.
Copies the triptych GIF, writes a YAML configuration file, and writes a human-readable summary. Calls the share panel to copy the Colab notebook URL to the clipboard (Colab only; graceful fallback elsewhere).
| PARAMETER | DESCRIPTION |
|---|---|
| path | Directory path for the exported artifacts. Created if it does not exist. Existing files in the directory are overwritten. |

| RETURNS | DESCRIPTION |
|---|---|
| dict[str, str] | dict with keys … absolute paths of the respective exported files. |

| RAISES | DESCRIPTION |
|---|---|
| AdapterError | If … |
Example
    >>> adapter.fit(trajectories, steps=1000)
    >>> adapter.visualize(trajectories)
    >>> artifacts = adapter.export("./physlink_export")
    >>> artifacts["config"]  # absolute path to config.yaml
    '/abs/path/physlink_export/config.yaml'
Spaces
Space descriptors passed to DreamerV3Adapter. They validate dimensions and dtypes at construction time — no silent mismatches at runtime.
ObservationSpace
Proprioceptive observation space with immediate dimension validation.
Constructed exclusively via the factory classmethod. Validates inputs at creation time so configuration errors surface before any training.
| PARAMETER | DESCRIPTION |
|---|---|
| dims | Total observation dimension count (joints or joints*2 with velocity). |
| include_velocity | Whether joint velocities are included in observations. |
| _joints | Raw joint count passed to from_proprioception. |
| clip_bounds | Optional (min, max) clipping range applied to observations. |
| normalize | Whether observations are normalized to [0, 1] before passing to the model. |
Example
    >>> obs_space = ObservationSpace.from_proprioception(joints=7, include_velocity=True)
    >>> obs_space.dims
    14
from_proprioception(joints: int, include_velocity: bool = False, clip_bounds: tuple[float, float] | None = None, normalize: bool = False) -> ObservationSpace
classmethod
Construct an ObservationSpace for proprioceptive (joint-based) observations.
| PARAMETER | DESCRIPTION |
|---|---|
| joints | Number of robot joints. Must be a positive integer >= 1. |
| include_velocity | If True, each joint contributes position + velocity to the observation, doubling the dimension count. |
| clip_bounds | Optional (min, max) clipping range for raw observations. None means no clipping applied. |
| normalize | If True, observations will be normalized before passing to the model. Default False. |

| RETURNS | DESCRIPTION |
|---|---|
| ObservationSpace | An ObservationSpace with dims = joints (or joints * 2 if include_velocity). |

| RAISES | DESCRIPTION |
|---|---|
| ValidationError | If joints is not a positive integer (wrong type or value <= 0). |
Example
    >>> obs_space = ObservationSpace.from_proprioception(joints=7, include_velocity=True)
    >>> obs_space.dims
    14
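The dimension rule (dims = joints, or joints * 2 with velocity) can be tried on its own. The function below is a hypothetical stand-in for illustration only: it raises the built-in ValueError rather than PhysLink's ValidationError, and rejecting bool values is an assumption about what "wrong type" covers.

```python
def proprioception_dims(joints: int, include_velocity: bool = False) -> int:
    """Compute the observation dimension count for a joint-based space."""
    # bool is a subclass of int, so it is rejected explicitly (an assumption).
    if isinstance(joints, bool) or not isinstance(joints, int) or joints <= 0:
        raise ValueError(f"joints must be a positive integer, got {joints!r}")
    # Each joint contributes a position, plus a velocity when requested.
    return joints * 2 if include_velocity else joints


print(proprioception_dims(7))                         # 7
print(proprioception_dims(7, include_velocity=True))  # 14
```

Note that a 3-joint space without velocity would have dims = 3, which is below the dims >= 4 minimum that DreamerV3Adapter requires of its observation space.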
explain() -> dict[str, Any]
Return a metadata dict describing this observation space configuration.
| RETURNS | DESCRIPTION |
|---|---|
| dict[str, Any] | A JSON-serializable dict with keys: type (class name, "ObservationSpace"); dims (total observation dimension count); joints (raw joint count passed to from_proprioception); include_velocity (whether joint velocities are included); clip_bounds ([min, max] list if set, else None); normalize (whether normalization is applied). |
Example
    >>> obs_space = ObservationSpace.from_proprioception(joints=7, include_velocity=True)
    >>> info = obs_space.explain()
    >>> info["dims"]
    14
    >>> info["include_velocity"]
    True
ActionSpace
Continuous action space with per-dimension bounds and immediate validation.
Constructed exclusively via the factory classmethod. Validates dims/bounds consistency at creation time so configuration errors surface before training.
| PARAMETER | DESCRIPTION |
|---|---|
| dims | Number of action dimensions. |
| bounds | Per-dimension (min, max) clipping bounds. |
Example
    >>> act_space = ActionSpace.continuous(dims=7, bounds=[(-1.0, 1.0)] * 7)
    >>> act_space.dims
    7
continuous(dims: int, bounds: list[tuple[float, float]]) -> ActionSpace
classmethod
Construct a continuous ActionSpace with per-dimension bounds.
| PARAMETER | DESCRIPTION |
|---|---|
| dims | Number of action dimensions. Must be a positive integer >= 1. |
| bounds | List of (min, max) tuples, one per dimension. Each element must satisfy min <= max. |

| RETURNS | DESCRIPTION |
|---|---|
| ActionSpace | An ActionSpace with the specified dimensions and bounds. |

| RAISES | DESCRIPTION |
|---|---|
| ValidationError | If dims is not a positive integer, bounds is not a list, bounds length does not match dims, or any bound has min > max. |
Example
    >>> act_space = ActionSpace.continuous(dims=7, bounds=[(-1.0, 1.0)] * 7)
    >>> act_space.dims
    7
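The four failure conditions listed for ValidationError can be sketched as a standalone check. This is an illustration under stated assumptions, not the library's code: validate_bounds is a hypothetical name, the error messages are invented, and the built-in ValueError stands in for PhysLink's ValidationError.

```python
def validate_bounds(dims: int, bounds: list) -> None:
    """Check dims/bounds consistency; raise ValueError on the first problem."""
    if isinstance(dims, bool) or not isinstance(dims, int) or dims < 1:
        raise ValueError(f"dims must be a positive integer, got {dims!r}")
    if not isinstance(bounds, list):
        raise ValueError("bounds must be a list of (min, max) tuples")
    if len(bounds) != dims:
        raise ValueError(f"expected {dims} bounds, got {len(bounds)}")
    for index, (low, high) in enumerate(bounds):
        # min == max is allowed; only min > max is rejected.
        if low > high:
            raise ValueError(f"bounds[{index}] has min {low} > max {high}")


validate_bounds(7, [(-1.0, 1.0)] * 7)  # consistent input passes silently

try:
    validate_bounds(2, [(-1.0, 1.0)])  # one bound supplied for two dimensions
except ValueError as error:
    print(error)  # expected 2 bounds, got 1
```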
explain() -> dict[str, Any]
Return a metadata dict describing this action space configuration.
| RETURNS | DESCRIPTION |
|---|---|
| dict[str, Any] | A JSON-serializable dict with keys: type (class name, "ActionSpace"); dims (number of action dimensions); bounds (per-dimension [[min, max], ...] clipping bounds as lists); clipping_behavior (description of how clipping is applied). |
Example
    >>> act_space = ActionSpace.continuous(dims=7, bounds=[(-1.0, 1.0)] * 7)
    >>> info = act_space.explain()
    >>> info["dims"]
    7
    >>> len(info["bounds"])
    7
Compliance
Register physical invariants (energy conservation, mass conservation, joint limits, …) and inspect violations after training.
register_invariant(adapter: _HasInvariants, name: str, fn: Callable[[dict[str, Any]], float], tolerance: float, mode: Literal['hard', 'soft'] = 'soft') -> None
Attach a physical invariant check to an adapter via a plain Python callable.
No subclassing, decorators, or inheritance is required. The function is
validated immediately at registration time — errors are never deferred to
fit().
| PARAMETER | DESCRIPTION |
|---|---|
| adapter | The adapter instance to attach the invariant to. Must expose a … |
| name | Human-readable identifier for this invariant. Used in diagnostic messages and … |
| fn | A callable with signature … |
| tolerance | Maximum acceptable residual. Violations occur when … |
| mode | How violations are handled during … |

| RAISES | DESCRIPTION |
|---|---|
| ValidationError | If … |
| ConfigurationError | If … |
Example
    >>> def mass_conservation(trajectory: dict) -> float:
    ...     return abs(trajectory.get("mass_in", 0.0) - trajectory.get("mass_out", 0.0))
    >>> register_invariant(adapter, name="mass_conservation",
    ...                    fn=mass_conservation, tolerance=0.01, mode="hard")
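What an invariant callable computes can be checked without an adapter. The sketch below applies the mass_conservation function from the example to a few hand-made trajectory dicts and collects the indices whose residual exceeds the tolerance. Treating a violation as residual > tolerance is an assumption, since the exact comparison is not given here, and the mass_in / mass_out values are invented sample data.

```python
from typing import Any


def mass_conservation(trajectory: dict) -> float:
    """Residual of the invariant: absolute mass imbalance for one trajectory."""
    return abs(trajectory.get("mass_in", 0.0) - trajectory.get("mass_out", 0.0))


# Invented sample data: one dict per trajectory.
trajectories: list = [
    {"mass_in": 1.0, "mass_out": 1.0},    # residual 0.0
    {"mass_in": 1.0, "mass_out": 0.995},  # residual about 0.005
    {"mass_in": 1.0, "mass_out": 0.9},    # residual about 0.1
]
tolerance = 0.01

residuals = [mass_conservation(t) for t in trajectories]
# Assumed rule: a trajectory violates the invariant when residual > tolerance.
violations = [i for i, r in enumerate(residuals) if r > tolerance]

print(violations)  # [2]
```

Running the callable on sample data like this before registration is a cheap way to confirm that it returns a float and that the chosen tolerance is on a sensible scale.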
ComplianceReport
Pure data object summarizing invariant compliance across adaptation trajectories.
Constructed by DreamerV3Adapter.compliance_report() after fit() is called.
All methods are deterministic and side-effect-free (NFR-13).
| PARAMETER | DESCRIPTION |
|---|---|
| _stats | Per-invariant summary dicts with keys: … |
| _violation_list | Per-violation dicts with keys: … |
| _residuals_by_invariant | Mapping from invariant name to the full list of residuals (one per trajectory). Used by … |
Example
    >>> report = adapter.compliance_report()
    >>> print(report.summary())
    mass_conservation: PASS (max_residual=0.0042, threshold=0.0100, violations=0/50)
    >>> violations = report.violations()
    >>> violations  # empty list when no violations
    []
summary() -> str
Return a human-readable compliance summary string.
One line per invariant in format:
"name: PASS (max_residual=X.XXXX, threshold=Y.YYYY, violations=Z/N)"
| RETURNS | DESCRIPTION |
|---|---|
| str | Formatted summary string. Empty string if no invariants registered. Multiple invariants produce one line each, joined with newline. |
Example
    >>> report = ComplianceReport(
    ...     _stats=[{"name": "mass", "max_residual": 0.0, "threshold": 0.01,
    ...              "violation_count": 0, "total": 5}],
    ...     _violation_list=[],
    ... )
    >>> report.summary()
    'mass: PASS (max_residual=0.0000, threshold=0.0100, violations=0/5)'
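The summary line format maps naturally onto an f-string with four decimal places. The sketch below is a minimal illustration, assuming the stats keys shown in the example (name, max_residual, threshold, violation_count, total) and assuming that PASS means zero violations; format_summary_line is a hypothetical name, not part of PhysLink.

```python
def format_summary_line(stat: dict) -> str:
    """Format one per-invariant stats dict as a summary line."""
    # Assumed rule: an invariant passes when it has no violations.
    status = "PASS" if stat["violation_count"] == 0 else "FAIL"
    return (
        f"{stat['name']}: {status} "
        f"(max_residual={stat['max_residual']:.4f}, "
        f"threshold={stat['threshold']:.4f}, "
        f"violations={stat['violation_count']}/{stat['total']})"
    )


stats = [{"name": "mass", "max_residual": 0.0, "threshold": 0.01,
          "violation_count": 0, "total": 5}]

# One line per invariant, joined with newlines.
print("\n".join(format_summary_line(s) for s in stats))
# mass: PASS (max_residual=0.0000, threshold=0.0100, violations=0/5)
```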
violations() -> list[dict[str, Any]]
Return a list of all invariant violations detected during fit().
| RETURNS | DESCRIPTION |
|---|---|
| list[dict[str, Any]] | List of violation dicts, each containing: … Sorted by … Empty list when no violations occurred. |
Example
    >>> report = ComplianceReport(_stats=[], _violation_list=[])
    >>> report.violations()
    []
plot(title: str = '', show_threshold: bool = True) -> None
Render a matplotlib histogram of invariant residuals inline.
Imports matplotlib lazily, so users who never call plot() do not hit an ImportError in headless environments.
Each invariant gets its own subplot. If show_threshold=True, a red
dashed vertical line is drawn at the tolerance threshold. Deterministic:
same data produces the same plot (NFR-13).
| PARAMETER | DESCRIPTION |
|---|---|
| title | Overall figure title. Defaults to empty string (no title). |
| show_threshold | If True, draw a labeled vertical threshold line on each subplot. Defaults to True. |

| RAISES | DESCRIPTION |
|---|---|
| ImportError | If matplotlib is not installed. |
Example
    >>> report = adapter.compliance_report()
    >>> report.plot(title="Mass Conservation Check", show_threshold=True)
export(path: str) -> None
Write a JSON compliance report to disk.
Produces a list of per-invariant dicts containing summary statistics and
full violation details. The output is parseable by json.load() with
no custom decoder needed (all values are JSON-native types).
| PARAMETER | DESCRIPTION |
|---|---|
| path | File path for the output JSON file. Parent directory must exist. |

| RAISES | DESCRIPTION |
|---|---|
| OSError | If the file cannot be written (permission error, missing parent directory, disk full, etc.). |
Example
    >>> report = adapter.compliance_report()
    >>> report.export("./compliance_report.json")
    >>> import json
    >>> with open("./compliance_report.json") as f:
    ...     data = json.load(f)
    >>> data[0]["status"]  # "PASS" or "FAIL"
    'PASS'
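The claim that the file needs no custom decoder can be illustrated with a round trip through the standard json module. Apart from the status key, which appears in the example, the record layout below is an assumption made for illustration and not the actual file schema.

```python
import json
import tempfile
from pathlib import Path

# Assumed record layout: a list of per-invariant dicts using only JSON-native types.
records = [
    {"name": "mass", "status": "PASS", "max_residual": 0.0042,
     "threshold": 0.01, "violations": []},
]

with tempfile.TemporaryDirectory() as tmp:
    path = Path(tmp) / "compliance_report.json"
    path.write_text(json.dumps(records, indent=2), encoding="utf-8")
    # Plain json.load is enough because every value is a str, float, or list.
    with path.open(encoding="utf-8") as f:
        loaded = json.load(f)

print(loaded == records)  # True
```

Values such as NumPy scalars or tuples used as dict keys would not survive this round trip unchanged, which is why restricting the output to JSON-native types matters.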
Diagnostics
doctor() -> DiagnosticReport
Run a Go/No-Go environment diagnostic and print a structured report.
Checks Python version, PyTorch presence, CUDA availability, VRAM, and Colab session — all without requiring torch at module import time.
| RETURNS | DESCRIPTION |
|---|---|
| DiagnosticReport | DiagnosticReport with ordered checks, verdict ("GO" or "NO-GO"), and elapsed_seconds. |
Example
    >>> report = physlink.doctor()
    >>> report.verdict
    'GO'
    >>> report.checks[0].status
    'OK'
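Detecting PyTorch without importing it can be done with importlib.util.find_spec from the standard library, which locates a package without executing it. The sketch below is an illustration only: the Check dataclass, the Python 3.9 floor, the "FAIL" status label, and the verdict rule are assumptions and do not describe PhysLink's DiagnosticReport.

```python
import importlib.util
import sys
from dataclasses import dataclass


@dataclass
class Check:
    """Hypothetical stand-in for a single diagnostic check."""
    name: str
    status: str  # "OK" or "FAIL" in this sketch


def run_checks():
    """Return (checks, verdict) without importing torch."""
    checks = [
        # The 3.9 floor is an assumed requirement for illustration.
        Check("python", "OK" if sys.version_info >= (3, 9) else "FAIL"),
        # find_spec returns None when the top-level package is not installed.
        Check("torch", "OK" if importlib.util.find_spec("torch") else "FAIL"),
    ]
    verdict = "GO" if all(c.status == "OK" for c in checks) else "NO-GO"
    return checks, verdict


checks, verdict = run_checks()
print(verdict)
for check in checks:
    print(f"{check.name}: {check.status}")
```

The output depends on the environment it runs in, so no expected result is shown.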
Exceptions
PhysLinkError
Bases: Exception
Base exception for all PhysLink errors.
All PhysLink exceptions inherit from this class. Catch PhysLinkError to handle any PhysLink-specific error at the coarsest granularity.
Example
    >>> try:
    ...     raise PhysLinkError("Got: x\n Expected: y\n Fix: do z")
    ... except PhysLinkError as e:
    ...     print(e)
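Because every PhysLink exception derives from PhysLinkError, a single except clause covers them all. The sketch below uses local stand-in classes so that it runs without the package installed; the class bodies, the check_steps helper, and the message text are illustrative assumptions.

```python
class PhysLinkError(Exception):
    """Local stand-in for physlink.PhysLinkError."""


class ValidationError(PhysLinkError):
    """Local stand-in for a more specific PhysLink exception."""


def check_steps(steps: int) -> None:
    """Hypothetical validation that mirrors the steps > 0 rule of fit()."""
    if steps <= 0:
        raise ValidationError(
            f"Got: steps={steps}\n Expected: steps > 0\n Fix: pass a positive step count"
        )


try:
    check_steps(0)
except PhysLinkError as error:  # also catches ValidationError
    caught = type(error).__name__
    print(caught)  # ValidationError
```

Catching a specific subclass first and PhysLinkError last gives fine-grained handling with a coarse fallback.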