Data science use cases¶
- Controlled experiments — two datasets differing in one variable
- Feature engineering validation — features should recover configured behavior
- Train/holdout with known future — compare predictions against configured trajectory
- Entity features as cluster labels — segmentation recovery
- Correlation stress testing — recovered Pearson should track configured target
- Seasonality as a confound — model separation with/without it
- Sample size analysis — measure model stabilization vs. entity count
import json
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from pathlib import Path
from plotsim import (
    create, generate_tables, generate_tables_with_state,
    build_manifest, write_tables,
)
def fixed_seed(cfg, seed=42):
    return cfg.model_copy(update={"seed": seed})
1. Controlled experiment — same config, one variable changes¶
Build two datasets that differ in exactly one knob and look for the expected downstream signal. Here: with vs. without a 3-period causal lag between two metrics. The lag should show up as a time-shifted peak in the cross-correlation.
def controlled(with_lag):
    metrics = [{"name": "engagement", "type": "score", "polarity": "positive"}]
    follower = {"name": "tickets", "type": "score", "polarity": "negative"}
    if with_lag:
        follower.update(follows="engagement", delay=3)
    metrics.append(follower)
    return fixed_seed(create(
        about="Controlled experiment",
        unit="account",
        window=("2023-01", "2024-12", "monthly"),
        metrics=metrics,
        segments=[{"name": "core", "count": 50, "archetype": "spike_then_crash"}],
    ))
def cross_corr(series_a, series_b, max_lag=6):
    # Entry for lag k correlates a(t) with b(t+k): a positive peak lag
    # means series_b follows series_a.
    return [series_a.corr(series_b.shift(-k)) for k in range(-max_lag, max_lag + 1)]
for with_lag, label in ((False, "no lag"), (True, "follows engagement, delay=3")):
    cfg = controlled(with_lag)
    tables = generate_tables(cfg, np.random.default_rng(cfg.seed))
    avg = (tables["fct_account"]
           .merge(tables["dim_date"][["date_key", "period_index"]], on="date_key")
           .groupby("period_index")[["engagement", "tickets"]].mean())
    xc = cross_corr(avg["engagement"], avg["tickets"])
    peak_lag = int(np.argmax(np.abs(xc))) - 6  # index 0 corresponds to lag -6
    print(f" {label:>32}: peak cross-corr at lag {peak_lag:+d} period(s)")
2. Feature engineering validation¶
Construct a small feature panel from the fact table and verify that each feature actually distinguishes the segments you configured. If the mean feature doesn't separate growth from decline, your feature pipeline has a bug.
cfg = fixed_seed(create(
    about="Feature validation",
    unit="account",
    window=("2024-01", "2024-12", "monthly"),
    metrics=[
        {"name": "engagement", "type": "score", "polarity": "positive"},
    ],
    segments=[
        {"name": "growth_seg", "count": 50, "archetype": "growth",
         "attributes": {"shape": "growth"}},
        {"name": "decline_seg", "count": 50, "archetype": "decline",
         "attributes": {"shape": "decline"}},
    ],
))
tables = generate_tables(cfg, np.random.default_rng(cfg.seed))
fct = tables["fct_account"].merge(
    tables["dim_account"][["account_id", "shape"]], on="account_id",
)
features = fct.groupby(["account_id", "shape"])["engagement"].agg(
    mean="mean", first_period="first", last_period="last",
).reset_index()
features["delta"] = features["last_period"] - features["first_period"]
print("Per-shape feature means:")
features.groupby("shape")[["mean", "delta"]].mean()
3. Train/holdout with known future¶
Hold out the last N periods, train on the prefix, and compare predictions to the engine's actual values. Because the trajectory is deterministic, the residuals isolate model error from generation noise.
cfg = fixed_seed(create(
    about="Holdout experiment",
    unit="account",
    window=("2023-01", "2024-12", "monthly"),
    metrics=[{"name": "engagement", "type": "score", "polarity": "positive"}],
    segments=[{"name": "core", "count": 50, "archetype": "growth"}],
    holdout={"target": "engagement", "periods": 4},
))
tables, state = generate_tables_with_state(cfg, np.random.default_rng(cfg.seed))
manifest = build_manifest(
    cfg, state.trajectories, tables,
    scd_state=state.scd, bridge_state=state.bridges,
)
out = Path("./out_ds_holdout")
write_tables(tables, cfg, output_dir=out, manifest=manifest)
train = pd.read_csv(out / "fct_account_train.csv")
holdout = pd.read_csv(out / "fct_account_holdout.csv")
print(f"Train periods: {train['date_key'].nunique()}, "
f"holdout periods: {holdout['date_key'].nunique()}")
from sklearn.linear_model import LinearRegression
date_index = (
    tables["dim_date"][["date_key", "period_index"]]
    .set_index("date_key")["period_index"]
)
errors = []
for aid, grp_tr in train.groupby("account_id"):
    grp_ho = holdout[holdout["account_id"] == aid]
    x_tr = grp_tr["date_key"].map(date_index).to_numpy().reshape(-1, 1)
    x_ho = grp_ho["date_key"].map(date_index).to_numpy().reshape(-1, 1)
    m = LinearRegression().fit(x_tr, grp_tr["engagement"])
    errors.extend((m.predict(x_ho) - grp_ho["engagement"].to_numpy()).tolist())
print(f"Holdout MAE: {np.mean(np.abs(errors)):.4f}")
4. Entity features as cluster labels¶
The manifest's archetype_assignments gives you a labelled ground-truth column to score clustering against. The Adjusted Rand Index measures how well unsupervised clusters recover the configured segments.
from sklearn.cluster import KMeans
from sklearn.metrics import adjusted_rand_score
cfg = fixed_seed(create(
    about="Cluster recovery",
    unit="account",
    window=("2024-01", "2024-12", "monthly"),
    metrics=[
        {"name": "engagement", "type": "score", "polarity": "positive"},
        {"name": "spend", "type": "amount", "polarity": "positive",
         "range": [10, 500]},
    ],
    segments=[
        {"name": "stars", "count": 40, "archetype": "growth"},
        {"name": "fading", "count": 40, "archetype": "decline"},
        {"name": "steady", "count": 40, "archetype": "flat"},
    ],
    entity_features=True,
))
tables, state = generate_tables_with_state(cfg, np.random.default_rng(cfg.seed))
manifest = build_manifest(
    cfg, state.trajectories, tables,
    scd_state=state.scd, bridge_state=state.bridges,
)
out = Path("./out_ds_cluster")
write_tables(tables, cfg, output_dir=out, manifest=manifest)
features = pd.read_csv(out / "_entity_features.csv")
label_col = "archetype" if "archetype" in features.columns else "label"
X = features.drop(columns=["account_id", label_col]).select_dtypes(include="number")
labels_true = features[label_col]
labels_pred = KMeans(n_clusters=labels_true.nunique(), n_init=10,
                     random_state=42).fit_predict(X)
print(f"Adjusted Rand Index: {adjusted_rand_score(labels_true, labels_pred):.3f} "
      "(1.0 = perfect recovery, 0.0 = random)")
5. Correlation stress test¶
Sweep the relationship word from inverts (target −0.75) to mirrors (target +0.75) and confirm that the realized Pearson correlation tracks the configured sign and strength: more strongly positive words should produce more positive correlations, and more strongly negative words more negative ones. Exact target recovery depends on archetype variance and noise; fidelity is best at high entity counts on mixed archetypes. For the connection-vocabulary coefficients, see docs/site/user-guide/metrics-and-connections.md.
def realized_corr(rel_word):
    cfg = fixed_seed(create(
        about=f"Correlation: {rel_word}",
        unit="account",
        window=("2024-01", "2024-12", "monthly"),
        metrics=[
            {"name": "a", "type": "score", "polarity": "positive"},
            {"name": "b", "type": "score", "polarity": "positive"},
        ],
        connections=[f"a {rel_word} b"],
        segments=[
            {"name": "g", "count": 100, "archetype": "growth"},
            {"name": "d", "count": 100, "archetype": "decline"},
        ],
    ))
    tables = generate_tables(cfg, np.random.default_rng(cfg.seed))
    return tables["fct_account"][["a", "b"]].corr().iloc[0, 1]
target = {"inverts": -0.75, "opposes": -0.55, "independent": 0.0,
"driven_by": 0.55, "mirrors": 0.75}
rows = [(word, t, realized_corr(word)) for word, t in target.items()]
df = pd.DataFrame(rows, columns=["word", "target", "realized"])
print(df.round(3))
# Monotonic ordering check — stronger configured words should produce
# stronger realized signals.
print("\nRanks (target vs realized):")
print(pd.DataFrame({
    "target_rank": df["target"].rank().astype(int).tolist(),
    "realized_rank": df["realized"].rank().astype(int).tolist(),
}, index=df["word"]))
6. Seasonality as a confound¶
Add a known seasonal signal and check whether the model can still separate the underlying archetypes. With seasonality on, the same model should keep most of its accuracy; if accuracy collapses, the model was leaning on the seasonal pattern rather than the underlying trend.
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import f1_score
from sklearn.model_selection import train_test_split
def f1_for(seasonality):
    cfg = fixed_seed(create(
        about="Seasonal confound",
        unit="account",
        window=("2023-01", "2024-12", "monthly"),
        metrics=[
            {"name": "engagement", "type": "score", "polarity": "positive"},
            {"name": "spend", "type": "amount", "polarity": "positive",
             "range": [10, 500]},
        ],
        segments=[
            {"name": "stars", "count": 50, "archetype": "growth",
             "attributes": {"shape": "stars"}},
            {"name": "fading", "count": 50, "archetype": "decline",
             "attributes": {"shape": "fading"}},
        ],
        seasonality=seasonality,
        entity_features=True,
    ))
    tables, state = generate_tables_with_state(cfg, np.random.default_rng(cfg.seed))
    manifest = build_manifest(
        cfg, state.trajectories, tables,
        scd_state=state.scd, bridge_state=state.bridges,
    )
    out = Path(f"./out_seasonal_{int(bool(seasonality))}")
    write_tables(tables, cfg, output_dir=out, manifest=manifest)
    feat = pd.read_csv(out / "_entity_features.csv")
    label_col = "archetype" if "archetype" in feat.columns else "label"
    X = feat.drop(columns=["account_id", label_col]).select_dtypes(include="number")
    y = feat[label_col]
    X_tr, X_te, y_tr, y_te = train_test_split(X, y, test_size=0.3,
                                              random_state=0, stratify=y)
    pred = LogisticRegression(max_iter=2000).fit(X_tr, y_tr).predict(X_te)
    return f1_score(y_te, pred, average="macro")
print(f"F1 without seasonality: {f1_for([]):.3f}")
print(f"F1 with seasonality: {f1_for([{'months':[11,12], 'strength': 0.4}]):.3f}")
7. Sample size analysis¶
Vary the entity count and re-fit the same model. The accuracy curve plateaus once the segments are large enough — useful for sizing real-world cohorts.
def f1_at(n):
    cfg = fixed_seed(create(
        about="Sample size",
        unit="account",
        window=("2024-01", "2024-12", "monthly"),
        metrics=[
            {"name": "engagement", "type": "score", "polarity": "positive"},
            {"name": "spend", "type": "amount", "polarity": "positive",
             "range": [10, 500]},
        ],
        segments=[
            {"name": "stars", "count": n, "archetype": "growth"},
            {"name": "fading", "count": n, "archetype": "decline"},
        ],
        entity_features=True,
    ))
    tables, state = generate_tables_with_state(cfg, np.random.default_rng(cfg.seed))
    manifest = build_manifest(
        cfg, state.trajectories, tables,
        scd_state=state.scd, bridge_state=state.bridges,
    )
    out = Path(f"./out_ss_{n}")
    write_tables(tables, cfg, output_dir=out, manifest=manifest)
    feat = pd.read_csv(out / "_entity_features.csv")
    label_col = "archetype" if "archetype" in feat.columns else "label"
    X = feat.drop(columns=["account_id", label_col]).select_dtypes(include="number")
    y = feat[label_col]
    X_tr, X_te, y_tr, y_te = train_test_split(X, y, test_size=0.3,
                                              random_state=0, stratify=y)
    pred = LogisticRegression(max_iter=2000).fit(X_tr, y_tr).predict(X_te)
    return f1_score(y_te, pred, average="macro")
sizes = [10, 25, 50, 100]
results = [(n, f1_at(n)) for n in sizes]
fig, ax = plt.subplots(figsize=(7, 3.5))
ax.plot(sizes, [f for _, f in results], marker="o")
ax.set_xlabel("Entities per segment")
ax.set_ylabel("F1 (macro)")
ax.set_title("Model accuracy plateaus past N≈50 per segment")
plt.tight_layout(); plt.show()
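Alongside the plot, printing the raw curve keeps the plateau visible in headless runs:
# Text view of the same curve (sketch), reusing the results list above.
print(pd.DataFrame(results, columns=["n_per_segment", "f1_macro"]).round(3))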
Where to next¶
- ML readiness — ml_readiness.ipynb walks through the entity-feature matrix and holdout split in more depth.
- Seasonality and correlations — seasonality_and_correlations.ipynb covers the connection vocabulary in full.
- DE use cases — de_use_cases.ipynb is the pipeline-engineering counterpart to this notebook.