Data engineering use cases
- Deterministic test-fixture factory
- Schema evolution testing
- Independent FK / grain / PK validation
- Quality injection for pipeline robustness
- CSV vs Parquet format testing
- Scale testing — measure pipeline performance against entity counts
- Manifest as ground truth for pipeline output comparison
import json
import time
from pathlib import Path

import numpy as np
import pandas as pd

from plotsim import (
    create, generate_tables, generate_tables_with_state,
    build_manifest, write_tables,
)


def make_config(seed=42, extra_metrics=None, quality=None, n_per_segment=20):
    metrics = [
        {"name": "engagement", "type": "score", "polarity": "positive"},
        {"name": "spend", "type": "amount", "polarity": "positive",
         "range": [10, 500]},
    ]
    if extra_metrics:
        metrics += extra_metrics
    cfg = create(
        about="DE fixture",
        unit="account",
        window=("2024-01", "2024-12", "monthly"),
        metrics=metrics,
        segments=[
            {"name": "growth_seg", "count": n_per_segment, "archetype": "growth"},
            {"name": "decline_seg", "count": n_per_segment, "archetype": "decline"},
        ],
        quality=quality or [],
    )
    # `create()` randomises the seed; pin it so this fixture is deterministic.
    return cfg.model_copy(update={"seed": seed})
1. Deterministic test-fixture factory
Every config takes a seed; every output is reproducible from (config, seed). That makes plotsim fixtures cheaper than checked-in CSVs — the config is the artifact.
cfg = make_config(seed=42)
a = generate_tables(cfg, np.random.default_rng(cfg.seed))
b = generate_tables(cfg, np.random.default_rng(cfg.seed))
assert all(a[k].equals(b[k]) for k in a)
print(f"Reproducible: {len(a)} tables byte-identical at seed {cfg.seed}.")
2. Schema evolution testing
Add a metric to the config — does the new column appear, do existing columns stay typed, do downstream queries break?
cfg_v1 = make_config()
cfg_v2 = make_config(extra_metrics=[
    {"name": "tickets", "type": "count", "polarity": "negative"},
])
v1 = generate_tables(cfg_v1, np.random.default_rng(cfg_v1.seed))
v2 = generate_tables(cfg_v2, np.random.default_rng(cfg_v2.seed))
added = set(v2["fct_account"].columns) - set(v1["fct_account"].columns)
removed = set(v1["fct_account"].columns) - set(v2["fct_account"].columns)
print(f"Added: {sorted(added)}")
print(f"Removed: {sorted(removed)}")
3. Independent FK / grain / PK validation
Replicate plotsim's validate() checks in your own pipeline tests so you're not coupled to a specific plotsim version.
def fk_orphans(child_df, child_col, parent_df, parent_col):
    return set(child_df[child_col]) - set(parent_df[parent_col])

def pk_unique(df, key_cols):
    return not df.duplicated(subset=key_cols).any()

def grain_one_per(df, group_cols):
    return df.groupby(group_cols).size().eq(1).all()

tables = v1
checks = {
    "fct→dim_account FK closure": len(fk_orphans(
        tables["fct_account"], "account_id",
        tables["dim_account"], "account_id")) == 0,
    "fct→dim_date FK closure": len(fk_orphans(
        tables["fct_account"], "date_key",
        tables["dim_date"], "date_key")) == 0,
    "dim_account PK unique": pk_unique(tables["dim_account"], ["account_id"]),
    "fct grain (account, date)": grain_one_per(
        tables["fct_account"], ["account_id", "date_key"]),
}
for name, passed in checks.items():
    print(f" {'PASS' if passed else 'FAIL'} {name}")
4. Quality injection for pipeline robustness
Inject nulls and duplicate rows into the fact table, then confirm the output written to disk reflects both.
clean_cfg = make_config()
dirty_cfg = make_config(quality=[
    {"table": "fct_account", "issue": "null_injection",
     "column": "engagement", "rate": 0.05},
    {"table": "fct_account", "issue": "duplicate_rows", "rate": 0.02},
])

for cfg, label in ((clean_cfg, "clean"), (dirty_cfg, "dirty")):
    tables, state = generate_tables_with_state(cfg, np.random.default_rng(cfg.seed))
    manifest = build_manifest(
        cfg, state.trajectories, tables,
        scd_state=state.scd, bridge_state=state.bridges,
    )
    write_tables(tables, cfg, output_dir=f"./out_{label}", manifest=manifest)
    on_disk = pd.read_csv(f"./out_{label}/fct_account.csv")
    print(f" {label:>5}: {len(on_disk)} rows, "
          f"{int(on_disk.isna().sum().sum())} nulls")
5. CSV vs Parquet — format-aware tests
Write the same tables in both formats, then compare on-disk size and round-trip fidelity.
tables_, state_ = generate_tables_with_state(
    clean_cfg, np.random.default_rng(clean_cfg.seed),
)
manifest_ = build_manifest(
    clean_cfg, state_.trajectories, tables_,
    scd_state=state_.scd, bridge_state=state_.bridges,
)

csv_cfg = clean_cfg
parquet_cfg = clean_cfg.model_copy(update={
    "output": clean_cfg.output.model_copy(update={"format": "parquet"}),
})
write_tables(tables_, csv_cfg, output_dir="./out_fmt_csv", manifest=manifest_)
write_tables(tables_, parquet_cfg, output_dir="./out_fmt_parquet", manifest=manifest_)

csv_size = sum(p.stat().st_size for p in Path("./out_fmt_csv").glob("*.csv"))
pq_size = sum(p.stat().st_size for p in Path("./out_fmt_parquet").glob("*.parquet"))
print(f"CSV total: {csv_size:>8,} bytes")
print(f"Parquet total: {pq_size:>8,} bytes")

csv_back = pd.read_csv("./out_fmt_csv/fct_account.csv")
pq_back = pd.read_parquet("./out_fmt_parquet/fct_account.parquet")
print(f"\nRow counts equal: {len(csv_back) == len(pq_back)}")
print(f"Column sets equal: {set(csv_back.columns) == set(pq_back.columns)}")
6. Scale testing
Sweep entity counts and measure your pipeline's wall time. The plot is a quick sanity check that linear-in-rows holds.
import matplotlib.pyplot as plt

sizes = [10, 50, 100, 200]
gen_times = []
for n in sizes:
    cfg = make_config(n_per_segment=n)
    t0 = time.perf_counter()
    _ = generate_tables(cfg, np.random.default_rng(cfg.seed))
    gen_times.append(time.perf_counter() - t0)

fig, ax = plt.subplots(figsize=(7, 3.5))
# Each config has two segments, so total entities = 2 * n_per_segment.
ax.plot([2 * n for n in sizes], gen_times, marker="o")
ax.set_xlabel("Total entities"); ax.set_ylabel("generate_tables time (s)")
ax.set_title("Wall-time scales with entity count")
plt.tight_layout(); plt.show()
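To quantify what the plot shows, fit a line and read off the per-entity cost. A minimal sketch; the fit is a sanity check on linearity, not a benchmark:

# Fit time ≈ slope * entities + intercept; a stable slope suggests linear scaling.
entities = np.array([2 * n for n in sizes])
slope, intercept = np.polyfit(entities, np.array(gen_times), 1)
print(f"~{slope * 1e3:.3f} ms per entity (intercept {intercept * 1e3:.1f} ms)")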
7. Manifest as ground truth
The manifest names every entity's archetype, every event firing, and every quality injection. Use it as the ground truth in pipeline-output comparisons — your pipeline's row counts and aggregates should match.
mf = json.loads(Path("./out_clean/manifest.json").read_text(encoding="utf-8"))
print(f"Manifest top-level keys: {sorted(mf)}")

print("\nArchetype assignments — first 3:")
for record in mf["archetype_assignments"][:3]:
    print(f" {record['entity']!r:>10} → {record['archetype']!r}")
Where to next
- Pipeline testing — pipeline_testing.ipynb goes deeper on assertion patterns and schema diffs.
- Data quality — data_quality.ipynb covers all five quality issue types in detail.
- DS use cases — ds_use_cases.ipynb is the ML-flavored counterpart to this notebook.