Data quality — clean vs. dirty by design¶
Real data lies. Some rows are missing values, some rows are duplicated, some columns occasionally hold the wrong type, and late-arriving rows show up days after they should have. plotsim's quality layer lets you reproduce all of these defects on top of a clean dataset, with the manifest recording the ground truth — every corrupted row's index and original value.
That's the loop: generate clean, generate dirty alongside, validate that your downstream pipeline catches the dirt and recovers the clean values from the manifest.
A clean baseline¶
import json
import numpy as np
import pandas as pd
from pathlib import Path
from plotsim import (
create, generate_tables_with_state, build_manifest, write_tables,
)
def base_config(quality=None):
    return create(
        about="Quality demo",
        unit="account",
        window=("2024-01", "2024-06", "monthly"),
        metrics=[
            {"name": "engagement", "type": "score", "polarity": "positive"},
            {"name": "spend", "type": "amount", "polarity": "positive",
             "range": [10, 500]},
        ],
        segments=[
            {"name": "core", "count": 30, "archetype": "growth"},
            {"name": "lapsed", "count": 30, "archetype": "decline"},
        ],
        quality=quality or [],
    )
clean_cfg = base_config()
clean_tables, clean_state = generate_tables_with_state(
clean_cfg, np.random.default_rng(clean_cfg.seed),
)
clean_manifest = build_manifest(
clean_cfg, clean_state.trajectories, clean_tables,
scd_state=clean_state.scd, bridge_state=clean_state.bridges,
)
write_tables(
clean_tables, clean_cfg, output_dir="./out_clean",
manifest=clean_manifest,
)
clean_disk = pd.read_csv("./out_clean/fct_account.csv")
print("Clean fct_account null counts:")
clean_disk.isna().sum()
Five issue types¶
Each entry in `quality` names a table, an issue type, a rate (0..1), and (for column-typed issues) a target column. Multiple issues can corrupt the same table — they apply in sequence with separate seed offsets.

| Issue | Effect | Needs column? |
|---|---|---|
| `null_injection` | replaces values with NULL | yes |
| `duplicate_rows` | appends exact duplicates of a sampled subset | no |
| `type_mismatch` | swaps the value's type (e.g. int → string) | yes |
| `late_arrival` | shuffles `date_key` values within the table | no |
| `schema_drift` | renames the column on disk | yes |
dirty_cfg = base_config(quality=[
{"table": "fct_account", "issue": "null_injection",
"column": "engagement", "rate": 0.05},
{"table": "fct_account", "issue": "type_mismatch",
"column": "spend", "rate": 0.03},
{"table": "fct_account", "issue": "duplicate_rows",
"rate": 0.02},
{"table": "fct_account", "issue": "late_arrival",
"rate": 0.04},
])
dirty_tables, dirty_state = generate_tables_with_state(
dirty_cfg, np.random.default_rng(dirty_cfg.seed),
)
dirty_manifest = build_manifest(
dirty_cfg, dirty_state.trajectories, dirty_tables,
scd_state=dirty_state.scd, bridge_state=dirty_state.bridges,
)
write_tables(
dirty_tables, dirty_cfg, output_dir="./out_dirty",
manifest=dirty_manifest,
)
# The IN-MEMORY tables are still clean — quality injection runs at write
# time. To see the corruption, read the on-disk CSV back.
dirty_disk = pd.read_csv("./out_dirty/fct_account.csv")
print(f"Clean rows: {len(clean_disk)}, dirty rows: {len(dirty_disk)} "
f"(difference is duplicates)")
print(f"\nClean nulls: {int(clean_disk.isna().sum().sum())}, "
f"dirty nulls: {int(dirty_disk.isna().sum().sum())}")
print(f"\nClean dtypes:\n{clean_disk.dtypes}")
print(f"\nDirty dtypes — late_arrival adds an _arrival_period helper column:\n{dirty_disk.dtypes}")
Manifest as ground truth¶
Every quality injection writes a record to `manifest.json` under `quality_injections`. Each record names the issue type, table, column (if any), the affected row indices, and the clean values that were displaced. That's the recovery key for anyone who wants to test their pipeline's data-quality stage.
manifest_path = Path("./out_dirty/manifest.json")
manifest = json.loads(manifest_path.read_text(encoding="utf-8"))
print(f"quality_injections: {len(manifest['quality_injections'])} records")
for record in manifest["quality_injections"][:3]:
    print(f"\n issue_index: {record['issue_index']}")
    print(f" issue_type: {record['issue_type']!r} on "
          f"{record['table']!r}.{record.get('column') or '*'}")
    print(f" rows: first 5 indices = {record['row_indices'][:5]} "
          f"(of {len(record['row_indices'])})")
    if record.get("clean_values"):
        print(f" clean values: first 3 = {record['clean_values'][:3]}")
Severity dial — sweep the rate¶
The `rate` knob is the corruption fraction. Sweeping it shows how the on-disk null count scales with severity, and is a quick sanity check that your downstream tests still pass at the rates you expect to see in production.
rates = [0.0, 0.01, 0.05, 0.10, 0.20]
null_counts = []
for r in rates:
    cfg = base_config(quality=[
        {"table": "fct_account", "issue": "null_injection",
         "column": "engagement", "rate": r},
    ])
    tables, state = generate_tables_with_state(
        cfg, np.random.default_rng(cfg.seed),
    )
    manifest = build_manifest(
        cfg, state.trajectories, tables,
        scd_state=state.scd, bridge_state=state.bridges,
    )
    out = f"./out_sweep_{int(r * 100):02d}"
    write_tables(tables, cfg, output_dir=out, manifest=manifest)
    df = pd.read_csv(f"{out}/fct_account.csv")
    null_counts.append((r, int(df["engagement"].isna().sum()), len(df)))
import matplotlib.pyplot as plt
fig, ax = plt.subplots(figsize=(7, 3.5))
ax.plot([r * 100 for r, _, _ in null_counts],
[n for _, n, _ in null_counts], marker="o")
ax.set_xlabel("null_injection rate (%)")
ax.set_ylabel("Null cells in engagement")
ax.set_title("Severity dial — null count scales with configured rate")
plt.tight_layout(); plt.show()
pd.DataFrame(null_counts, columns=["rate", "null_count", "total_rows"])
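The per-run manifests turn this sweep into an exact regression test rather than an eyeball check. A minimal sketch, assuming the clean `engagement` column has no nulls of its own (compare the clean baseline's counts above) and that each record's `row_indices` length counts the injected cells:

# Regression-style check: on-disk nulls should equal the manifest's count.
# ASSUMPTIONS: no pre-existing nulls in engagement, and len(row_indices)
# equals the number of injected cells per null_injection record.
for r, observed, _total in null_counts:
    out = f"./out_sweep_{int(r * 100):02d}"
    m = json.loads(Path(f"{out}/manifest.json").read_text(encoding="utf-8"))
    injected = sum(
        len(rec["row_indices"])
        for rec in m["quality_injections"]
        if rec["issue_type"] == "null_injection"
    )
    assert observed == injected, (r, observed, injected)
print("On-disk null counts match the manifests' ground truth.")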
Where to next¶
- Pipeline testing — `pipeline_testing.ipynb` shows how to wire the manifest's ground truth into pytest assertions.
- DE use cases — `de_use_cases.ipynb` builds a robustness-testing fixture set on top of the quality layer.
- Config fields — `docs/site/config-reference.md` §quality documents every issue type and its required fields.