Plotsim as a pipeline test fixture factory¶
Three properties make plotsim well-suited to data-pipeline testing:
- Deterministic — same config + same seed → byte-identical output, every run, every machine.
- Documented ground truth — the manifest names every entity's archetype, every event's firing period, and every quality injection's clean values.
- Configurable shape — a one-line config change re-targets row counts, schemas, and behavior. No giant CSVs to re-curate.
This notebook walks through the four checks worth automating: determinism, FK closure, manifest agreement, and aggregate-vs-archetype shape. Then it shows schema evolution between two config versions.
import numpy as np
import pandas as pd
from pathlib import Path
from plotsim import (
create, generate_tables, generate_tables_with_state,
build_manifest, write_tables,
)
def make_config(extra_metrics=None):
metrics = [
{"name": "engagement", "type": "score", "polarity": "positive"},
{"name": "spend", "type": "amount", "polarity": "positive",
"range": [10, 500]},
]
if extra_metrics:
metrics += extra_metrics
return create(
about="Pipeline test fixture",
unit="account",
window=("2024-01", "2024-12", "monthly"),
metrics=metrics,
segments=[
{"name": "growth_seg", "count": 30, "archetype": "growth",
"attributes": {"shape": "growth"}},
{"name": "decline_seg", "count": 30, "archetype": "decline",
"attributes": {"shape": "decline"}},
],
)
cfg = make_config()
print(f"seed={cfg.seed}, segments={len(cfg.entities)} expanded entities")
Determinism — same seed, same bytes¶
def gen(cfg):
return generate_tables(cfg, np.random.default_rng(cfg.seed))
a = gen(cfg)
b = gen(cfg)
for name in a:
assert a[name].equals(b[name]), f"{name} drifted between runs"
print(f"All {len(a)} tables byte-identical across two runs at seed {cfg.seed}.")
CSV and Parquet from the same dataset¶
The engine path is identical; only the on-disk encoding differs. To switch a config to Parquet, copy the config and override the output.format field.
tables, state = generate_tables_with_state(
cfg, np.random.default_rng(cfg.seed),
)
manifest = build_manifest(
cfg, state.trajectories, tables,
scd_state=state.scd, bridge_state=state.bridges,
)
write_tables(tables, cfg, output_dir="./out_csv", manifest=manifest)
cfg_pq = cfg.model_copy(update={
"output": cfg.output.model_copy(update={"format": "parquet"}),
})
write_tables(tables, cfg_pq, output_dir="./out_parquet", manifest=manifest)
csv_size = sum(p.stat().st_size for p in Path("./out_csv").glob("*.csv"))
pq_size = sum(p.stat().st_size for p in Path("./out_parquet").glob("*.parquet"))
print(f"CSV total: {csv_size:>8,} bytes")
print(f"Parquet total: {pq_size:>8,} bytes ({csv_size / pq_size:.1f}x smaller)")
FK closure — every fact's foreign key resolves¶
The first check any pipeline test should run: every value in a fact table's foreign-key column must appear in the parent dimension's primary key.
def check_fk_closure(child_df, child_col, parent_df, parent_col):
orphans = set(child_df[child_col]) - set(parent_df[parent_col])
return len(orphans), orphans
orphans_account, _ = check_fk_closure(
tables["fct_account"], "account_id",
tables["dim_account"], "account_id",
)
orphans_date, _ = check_fk_closure(
tables["fct_account"], "date_key",
tables["dim_date"], "date_key",
)
print(f"fct_account.account_id orphans: {orphans_account}")
print(f"fct_account.date_key orphans: {orphans_date}")
assert orphans_account == 0 and orphans_date == 0, "FK closure failed"
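Hard-coding two relationships is fine for a demo; a real suite should table-drive the check. The registry below is hand-maintained for illustration; derive it from your own schema docs, or from the manifest if your plotsim config records relationships there.
# One registry entry per (fact table, fk column, dim table, pk column).
FK_PAIRS = [
    ("fct_account", "account_id", "dim_account", "account_id"),
    ("fct_account", "date_key", "dim_date", "date_key"),
]
for child, child_col, parent, parent_col in FK_PAIRS:
    n, orphans = check_fk_closure(
        tables[child], child_col, tables[parent], parent_col,
    )
    assert n == 0, f"{child}.{child_col}: {n} orphans, e.g. {sorted(orphans)[:5]}"
print(f"FK closure holds for all {len(FK_PAIRS)} relationships.")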
Manifest as ground truth — row counts and archetype mix¶
import json
mf = json.loads(Path("./out_csv/manifest.json").read_text(encoding="utf-8"))
# Each entity's configured archetype is named in archetype_assignments.
archetype_counts = (
pd.DataFrame(mf["archetype_assignments"])["archetype"].value_counts()
)
print("Manifest archetype assignments:")
print(archetype_counts)
# These should match the segment counts we configured.
expected = {"growth_seg": 30, "decline_seg": 30}
print(f"\nExpected from config: {expected}")
assert dict(archetype_counts) == expected, 'manifest disagrees with config'
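Row counts are equally predictable. Assuming the engine emits one fact row per entity per period (this holds for the dense monthly window here; event-driven tables can be sparser), the fact table size follows directly from the config.
n_entities = len(cfg.entities)
n_periods = tables["dim_date"]["date_key"].nunique()
expected_rows = n_entities * n_periods  # 60 entities x 12 monthly periods
actual_rows = len(tables["fct_account"])
print(f"expected {expected_rows} fact rows, got {actual_rows}")
assert actual_rows == expected_rows, "fact row count drifted from config"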
Aggregate-vs-archetype — shape recovery¶
The growth segment should show an upward-trending mean over time. The decline segment should show the opposite. This is a coarse check — the engine guarantees the trajectory shape, not exact values — but it catches gross regressions.
fct = (
tables["fct_account"]
.merge(tables["dim_date"][["date_key", "period_index"]], on="date_key")
.merge(tables["dim_account"][["account_id", "shape"]], on="account_id")
)
trend = (fct.groupby(["shape", "period_index"])["engagement"]
.mean().reset_index())
def endpoint(shape, period):
    mask = (trend["shape"] == shape) & (trend["period_index"] == period)
    return trend.loc[mask, "engagement"].iloc[0]
growth_first, growth_last = endpoint("growth", 0), endpoint("growth", 11)
decline_first, decline_last = endpoint("decline", 0), endpoint("decline", 11)
print(f"growth period 0: {growth_first:.3f} → period 11: {growth_last:.3f}")
print(f"decline period 0: {decline_first:.3f} → period 11: {decline_last:.3f}")
assert growth_last > growth_first, "growth segment should trend up"
assert decline_last < decline_first, "decline segment should trend down"
print("Shape recovery checks passed.")
Schema evolution — two config versions side by side¶
Add a new metric, regenerate, and confirm the schema changed in exactly the expected way. This is the smoke test you want before letting a config change land in production.
cfg_v1 = cfg
cfg_v2 = make_config(extra_metrics=[
{"name": "tickets", "type": "count", "polarity": "negative"},
])
t_v1 = gen(cfg_v1)
t_v2 = gen(cfg_v2)
v1_cols = set(t_v1["fct_account"].columns)
v2_cols = set(t_v2["fct_account"].columns)
print(f"v1 fact columns: {sorted(v1_cols)}")
print(f"v2 fact columns: {sorted(v2_cols)}")
print(f"\nAdded: {sorted(v2_cols - v1_cols)}")
print(f"Removed: {sorted(v1_cols - v2_cols)}")
assert v2_cols - v1_cols == {"tickets"}, "unexpected schema diff"
print("\nSchema evolution looks clean.")
Where to next¶
- Data quality — data_quality.ipynb layers in the corruption types your pipeline tests should be robust against.
- DE use cases — de_use_cases.ipynb puts these checks together into a fixture pattern you can copy into a real test suite.
- Manifest schema — from plotsim import ManifestSchema for the full pydantic-typed shape of manifest.json.