The default data is assumed to contain these columns:
- user_id (user ID)
- city (city)
- date (date, YYYY-MM-DD)
- sales (quantity / amount)
If your field names differ, just pass the matching column names to each function's parameters. All charts use Matplotlib with one chart per figure (no subplots), exactly as previously required.
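If you want to dry-run the snippets below without a real file, a small synthetic DataFrame with these four columns is enough; the city names, date range, and random sales here are purely illustrative assumptions:
# --- synthetic sample data (optional) ---
import numpy as np
import pandas as pd

rng = np.random.default_rng(42)
dates = pd.date_range("2024-01-01", "2024-03-31", freq="D")
rows = []
for city in ["Beijing", "Shanghai", "Shenzhen"]:            # hypothetical city set
    for day in dates:
        for uid in rng.choice(50, size=5, replace=False):   # 5 random users per city-day
            rows.append({"user_id": f"u{uid:03d}", "city": city,
                         "date": day.strftime("%Y-%m-%d"),
                         "sales": round(float(rng.gamma(2.0, 50.0)), 2)})
df = pd.DataFrame(rows)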
1) TopN + hierarchical time aggregation: each city's monthly Top 3 users by sales + small multiples
Approach: convert date → month (the timestamp of the first day of each month), aggregate to (city, month, user_id), then take the TopK within each month.
Small multiples: no subplots; one separate image per city-month pair (more files, but each chart stays readable).
# --- dependencies ---
import os
import pandas as pd
import matplotlib.pyplot as plt
def topn_monthly_users(df: pd.DataFrame, k=3,
date_col="date", city_col="city", user_col="user_id", sales_col="sales"):
d = df.copy()
d[date_col] = pd.to_datetime(d[date_col], errors="coerce")
d = d.dropna(subset=[date_col, city_col, user_col, sales_col])
d["month"] = d[date_col].dt.to_period("M").dt.to_timestamp()
agg = (d.groupby([city_col, "month", user_col], observed=True)[sales_col]
.sum().rename("sales_sum").reset_index())
topk = (agg.sort_values([city_col, "month", "sales_sum"], ascending=[True, True, False])
.groupby([city_col, "month"], observed=True).head(k)
.reset_index(drop=True))
return topk
def plot_topn_small_multiples(topk_df: pd.DataFrame,
city_col="city", user_col="user_id", month_col="month", sales_col="sales_sum",
                              max_months_per_city=None,  # None = plot all months; otherwise limit to the most recent N months
out_dir="plots_task1"):
os.makedirs(out_dir, exist_ok=True)
images = []
for city, sub in topk_df.groupby(city_col):
months_sorted = sorted(pd.unique(sub[month_col]))
months_show = months_sorted[-max_months_per_city:] if max_months_per_city else months_sorted
for m in months_show:
cur = sub[sub[month_col] == m].sort_values(sales_col, ascending=False)
plt.figure(figsize=(6, 4))
plt.bar(cur[user_col].astype(str), cur[sales_col].values)
plt.title(f"{city} — {pd.Timestamp(m).strftime('%Y-%m')} Top{len(cur)} Users")
plt.xlabel("user_id"); plt.ylabel("monthly sales")
plt.xticks(rotation=30)
plt.tight_layout()
fname = os.path.join(out_dir, f"{city}_{pd.Timestamp(m).strftime('%Y-%m')}.png")
plt.savefig(fname, dpi=150)
plt.close()
images.append(fname)
return images
# --- usage example ---
# topk = topn_monthly_users(df, k=3)
# images = plot_topn_small_multiples(topk, max_months_per_city=3, out_dir="plots_task1")
# print(images[:5])
To extend this to a Top 5, just pass k=5. To annotate the user names and values on the chart, call Axes.bar_label on the bar container after plotting; a sketch follows.
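A minimal sketch of the labelling step (Axes.bar_label is available in Matplotlib >= 3.4); it assumes the variables inside plot_topn_small_multiples:
# inside plot_topn_small_multiples, replacing the plain plt.bar(...) call
bars = plt.bar(cur[user_col].astype(str), cur[sales_col].values)
plt.gca().bar_label(bars, fmt="%.0f", padding=2)  # value written above each bar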
2) Rolling-threshold anomaly detection: 28-day rolling 99th percentile (per city)
Approach:
- Aggregate the raw records to city × day (resample('D').sum()).
- For each city, compute a rolling 99% quantile threshold over a 28-day window; a day whose value is > the threshold is flagged as an anomaly.
- One chart per city: plot daily sales and the threshold line, and mark anomalies with scatter points.
# --- dependencies ---
import os
import pandas as pd
import matplotlib.pyplot as plt
def rolling_threshold_anomaly(df: pd.DataFrame,
date_col="date", city_col="city", sales_col="sales",
freq="D", window=28, quantile=0.99):
d = df.copy()
d[date_col] = pd.to_datetime(d[date_col], errors="coerce")
d = d.dropna(subset=[date_col, city_col, sales_col])
    # aggregate to daily frequency
daily = (d.set_index(date_col)
.groupby(city_col)[sales_col]
.resample(freq).sum()
.rename("sales").reset_index())
outs = []
for city, sub in daily.groupby(city_col):
sub = sub.sort_values(date_col).copy()
thr = sub["sales"].rolling(window=window, min_periods=7).quantile(quantile)
sub["threshold"] = thr
sub["is_anomaly"] = (sub["sales"] > sub["threshold"]).astype(int)
outs.append(sub)
return pd.concat(outs, ignore_index=True)
def plot_city_anomaly_series(anom_df: pd.DataFrame,
city_col="city", date_col="date",
sales_col="sales", thr_col="threshold",
out_dir="plots_task2"):
os.makedirs(out_dir, exist_ok=True)
images = []
for city, sub in anom_df.groupby(city_col):
plt.figure(figsize=(8, 4))
plt.plot(sub[date_col], sub[sales_col], label="sales")
plt.plot(sub[date_col], sub[thr_col], linestyle="--", label="threshold")
an = sub[sub["is_anomaly"] == 1]
if not an.empty:
plt.scatter(an[date_col], an[sales_col], marker="x", label="anomaly")
plt.title(f"{city} — Daily Sales with Rolling 99th Percentile")
plt.xlabel("date"); plt.ylabel("sales")
plt.legend()
plt.tight_layout()
fname = os.path.join(out_dir, f"{city}_anomaly.png")
plt.savefig(fname, dpi=150)
plt.close()
images.append(fname)
return images
# --- usage example ---
# anom = rolling_threshold_anomaly(df)
# imgs = plot_city_anomaly_series(anom, out_dir="plots_task2")
For more robustness, first do business-aware gap filling / outlier smoothing, or use a time-based window rolling('28D') (which needs a sorted DatetimeIndex; a sketch follows). With an evenly spaced daily series, the positional rolling(28) is simple and efficient.
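A minimal sketch of the time-based variant, assuming the per-city daily frame built inside rolling_threshold_anomaly (same column names):
# time-based 28-day window: needs the date column as a sorted DatetimeIndex
sub = sub.set_index(date_col).sort_index()
sub["threshold"] = sub["sales"].rolling("28D", min_periods=7).quantile(0.99)
sub = sub.reset_index()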
3) Importer with validation: CSV → validate → clean → partitioned output (Parquet/CSV) + log
Goal: write the rules for "what the data should look like" as code, and fail early.
Example rules (add or change as needed):
- Required columns exist: user_id / city / date / sales
- date parses and is not later than "now"
- city falls within an allowed set (optional)
- sales is non-negative
- Duplicate primary keys (user_id, city, date): keep the row with the largest sales
Note: Parquet is written by preference; if pyarrow/fastparquet is not installed in the runtime environment, the code automatically falls back to CSV and records that in the log.
# --- dependencies ---
import os, json
import numpy as np
import pandas as pd
def ingest_validate(csv_path_or_df,
out_dir="out/ingest",
valid_cities=None,
date_col="date", city_col="city", user_col="user_id", sales_col="sales"):
os.makedirs(out_dir, exist_ok=True)
    # accepts either a file path or a DataFrame
if isinstance(csv_path_or_df, str):
raw = pd.read_csv(csv_path_or_df)
else:
raw = csv_path_or_df.copy()
required_cols = [user_col, city_col, date_col, sales_col]
missing_cols = [c for c in required_cols if c not in raw.columns]
errors = []
if missing_cols:
errors.append(f"缺少列: {missing_cols}")
df = raw.copy()
    # string cleanup (nullable "string" dtype keeps missing values as NA instead of the literal "nan")
    if city_col in df.columns:
        df[city_col] = df[city_col].astype("string").str.strip()
        df[city_col] = df[city_col].replace({"": np.nan})
    if user_col in df.columns:
        df[user_col] = df[user_col].astype("string").str.strip()
    # parse dates
if date_col in df.columns:
df[date_col] = pd.to_datetime(df[date_col], errors="coerce")
    # rule checks
now = pd.Timestamp.now(tz="UTC").tz_localize(None)
bad_mask = pd.Series(False, index=df.index)
for col in required_cols:
if col in df.columns:
bad_mask |= df[col].isna()
if date_col in df.columns:
bad_mask |= df[date_col] > now
if valid_cities is not None and city_col in df.columns:
bad_mask |= ~df[city_col].isin(set(valid_cities))
if sales_col in df.columns:
with np.errstate(invalid="ignore"):
bad_mask |= ~(df[sales_col] >= 0)
bad_rows = df[bad_mask].copy()
clean = df[~bad_mask].copy()
    # de-duplicate: for the same (user_id, city, date), keep the row with the largest sales
if not clean.empty:
clean = (clean.sort_values(sales_col)
.drop_duplicates(subset=[user_col, city_col, date_col], keep="last"))
clean[sales_col] = clean[sales_col].astype(float)
    # persist: prefer Parquet, fall back to CSV on failure
storage_format = "parquet"
if not clean.empty:
for city, sub in clean.groupby(city_col, observed=True):
pdir = os.path.join(out_dir, f"city={city}")
os.makedirs(pdir, exist_ok=True)
try:
sub.to_parquet(os.path.join(pdir, "part.parquet"), index=False)
except Exception:
storage_format = "csv"
sub.to_csv(os.path.join(pdir, "part.csv"), index=False, encoding="utf-8")
    # log
log = {
"timestamp": pd.Timestamp.utcnow().isoformat(),
"input_rows": int(len(df)),
"clean_rows": int(len(clean)),
"bad_rows": int(len(bad_rows)),
"missing_cols": missing_cols,
"errors": errors,
"cities_in_clean": (sorted(map(str, clean[city_col].unique())) if not clean.empty else []),
"storage_format": storage_format
}
with open(os.path.join(out_dir, "ingest_log.json"), "w", encoding="utf-8") as f:
json.dump(log, f, ensure_ascii=False, indent=2)
if not bad_rows.empty:
bad_rows.to_csv(os.path.join(out_dir, "bad_rows.csv"), index=False, encoding="utf-8")
return clean, log, bad_rows
# --- usage example ---
# clean, log, bad = ingest_validate("raw.csv", out_dir="out/ingest", valid_cities={"Beijing","Shanghai","Shenzhen"})
# print(log)
Example output layout (partitioned by city):
out/ingest/
├─ city=Beijing/   part.parquet or part.csv
├─ city=Shanghai/  part.parquet or part.csv
├─ city=Shenzhen/  part.parquet or part.csv
├─ ingest_log.json
└─ bad_rows.csv
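To read the partitioned output back into one frame, a minimal sketch (the out/ingest path and the city=*/part.* pattern simply mirror the layout ingest_validate produces above):
# --- read the partitions back ---
import glob, os
import pandas as pd

parts = glob.glob(os.path.join("out/ingest", "city=*", "part.*"))
frames = [pd.read_parquet(p) if p.endswith(".parquet") else pd.read_csv(p) for p in parts]
combined = pd.concat(frames, ignore_index=True) if frames else pd.DataFrame()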
4) Time-travel-safe modeling script: TimeSeriesSplit + Pipeline + GridSearchCV + metadata
Core idea: features use only "past" information (shift first, then rolling, so the window never includes the current observation), and evaluation uses TimeSeriesSplit (train on earlier data, test on later data).
# --- dependencies ---
import json, platform
import sklearn  # only used to record the installed version in the metadata
import pandas as pd
from sklearn.model_selection import TimeSeriesSplit, GridSearchCV
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder, StandardScaler
from sklearn.impute import SimpleImputer
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_absolute_error
import joblib
def build_timeseries_features(daily_city_df: pd.DataFrame,
date_col="date", city_col="city", target_col="sales"):
"""
输入:按 城市×日 聚合后的表(包含列 city, date, sales)
输出:添加了严格“只看过去”的特征:
- dow, month
- lag1, lag7
- ma7(先shift再滚动均值)
"""
d = daily_city_df.copy()
d[date_col] = pd.to_datetime(d[date_col], errors="coerce")
d = d.dropna(subset=[date_col, city_col, target_col])
d = d.sort_values([city_col, date_col])
d["dow"] = d[date_col].dt.dayofweek
d["month"] = d[date_col].dt.month
def add_lags(g: pd.DataFrame):
g = g.copy()
g["lag1"] = g[target_col].shift(1)
g["lag7"] = g[target_col].shift(7)
g["ma7"] = g[target_col].shift(1).rolling(7, min_periods=1).mean()
return g
    # future pandas versions change groupby-apply behaviour; passing group_keys=False explicitly is more robust
d = d.groupby(city_col, observed=True, group_keys=False).apply(add_lags)
d = d.dropna(subset=["lag1", "lag7", "ma7"])
return d
def timeseries_modeling(df: pd.DataFrame,
date_col="date", city_col="city", target_col="sales",
model_out="model.joblib", meta_out="metadata.json"):
"""
- 用 RandomForest 回归做示例(可替换为任意模型)
- 完整 Pipeline:预处理(数值标准化 + 类别OneHot)+ 模型
- GridSearchCV + TimeSeriesSplit(默认3折以加快示例)
- 产出:最优模型 + 元数据json
"""
data = build_timeseries_features(df, date_col=date_col, city_col=city_col, target_col=target_col)
features = ["dow", "month", "lag1", "lag7", "ma7", city_col]
X = data[features]
y = data[target_col]
num_cols = ["dow", "month", "lag1", "lag7", "ma7"]
cat_cols = [city_col]
pre = ColumnTransformer([
("num", Pipeline([("imp", SimpleImputer()), ("sc", StandardScaler())]), num_cols),
("cat", OneHotEncoder(handle_unknown="ignore"), cat_cols)
])
pipe = Pipeline([
("pre", pre),
("model", RandomForestRegressor(random_state=42, n_estimators=100, max_depth=10))
])
    # tiny grid to keep the example fast; extend it for a real project
param_grid = {
"model__n_estimators": [100],
"model__max_depth": [10]
}
tscv = TimeSeriesSplit(n_splits=3)
gs = GridSearchCV(pipe, param_grid, cv=tscv, n_jobs=-1,
scoring="neg_mean_absolute_error", refit=True)
gs.fit(X, y)
best = gs.best_estimator_
joblib.dump(best, model_out)
meta = {
"timestamp": pd.Timestamp.utcnow().isoformat(),
"python": platform.python_version(),
"pandas": pd.__version__,
"sklearn": ">=1.0",
"seed": 42,
"train_rows": int(len(X)),
"features": features,
"cv_splits": 3,
"best_params": gs.best_params_,
"cv_mae": float(-gs.best_score_) # MAE越小越好
}
with open(meta_out, "w", encoding="utf-8") as f:
json.dump(meta, f, ensure_ascii=False, indent=2)
return best, meta
# --- usage example ---
# prepare daily-frequency data (if the raw data is transaction-level detail)
# daily_city = (df.groupby(["city", pd.Grouper(key="date", freq="D")], observed=True)["sales"]
# .sum().reset_index())
# best_model, meta = timeseries_modeling(daily_city,
# model_out="models/model.joblib",
# meta_out="models/metadata.json")
# print(meta)
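To load the persisted model later and score fresh rows, a minimal sketch (paths and variable names follow the usage example above; the features must be rebuilt with build_timeseries_features first):
# --- load the saved model and predict ---
import joblib
best = joblib.load("models/model.joblib")
feats = build_timeseries_features(daily_city)
preds = best.predict(feats[["dow", "month", "lag1", "lag7", "ma7", "city"]])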
Why this is "time-travel safe":
- Every time-series feature is derived by lagging (shift); ma7 is computed with shift(1) followed by rolling(7), so the features at any time t use only observations from t-1 and earlier (a toy check is shown below).
- The split uses TimeSeriesSplit, which preserves the "train on the past, test on the future" order and prevents future information from leaking into training.
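A quick toy check that the shifted rolling mean never sees the current value (purely illustrative numbers):
# --- leakage sanity check on a toy series ---
import pandas as pd

s = pd.Series([10, 20, 30, 40, 50])
ma = s.shift(1).rolling(3, min_periods=1).mean()
# at index 3 the feature is mean(s[0:3]) = 20.0; the current value 40 is excluded
print(ma.tolist())  # [nan, 10.0, 15.0, 20.0, 30.0]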
Mini execution checklist (chaining the four tasks together)
TopN
topk = topn_monthly_users(df, k=3)
plot_topn_small_multiples(topk, max_months_per_city=3, out_dir="plots_task1")
Rolling anomalies
anom = rolling_threshold_anomaly(df)
plot_city_anomaly_series(anom, out_dir="plots_task2")
Importer
clean, log, bad = ingest_validate("raw.csv", out_dir="out/ingest", valid_cities={"Beijing","Shanghai","Shenzhen"})
Modeling
daily_city = (df.groupby(["city", pd.Grouper(key="date", freq="D")], observed=True)["sales"]
              .sum().reset_index())
best_model, meta = timeseries_modeling(daily_city,
                                       model_out="models/model.joblib",
                                       meta_out="models/metadata.json")