🦝 Python practice problems (u1 edition)

The default dataset contains the following columns:

  • user_id (user ID)
  • city (city)
  • date (date, YYYY-MM-DD)
  • sales (sales volume / amount)

If your field names differ, just pass the matching column names through the function parameters (see the short example below). All charts use Matplotlib, one figure per chart (no subplots), exactly as previously required.
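For instance, with a hypothetical dataset whose columns are named order_date, region, buyer and amount (made-up names, purely for illustration), the Task 1 function below would be called like this:

# Hypothetical column names — adjust to match your own schema.
topk = topn_monthly_users(df, k=3,
                          date_col="order_date", city_col="region",
                          user_col="buyer", sales_col="amount")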


1) Top-N + layered time aggregation: Top-3 users by monthly sales per city + small multiples

Approach: first map date → month (month-start timestamp), aggregate to (city, month, user_id), then take the Top-K per month.
Small multiples: no subplots; generate one separate image per city-month pair (more files, but each chart stays clear).

# --- Dependencies ---
import os
import pandas as pd
import matplotlib.pyplot as plt

def topn_monthly_users(df: pd.DataFrame, k=3,
                       date_col="date", city_col="city", user_col="user_id", sales_col="sales"):
    d = df.copy()
    d[date_col] = pd.to_datetime(d[date_col], errors="coerce")
    d = d.dropna(subset=[date_col, city_col, user_col, sales_col])
    d["month"] = d[date_col].dt.to_period("M").dt.to_timestamp()

    agg = (d.groupby([city_col, "month", user_col], observed=True)[sales_col]
             .sum().rename("sales_sum").reset_index())

    topk = (agg.sort_values([city_col, "month", "sales_sum"], ascending=[True, True, False])
               .groupby([city_col, "month"], observed=True).head(k)
               .reset_index(drop=True))
    return topk

def plot_topn_small_multiples(topk_df: pd.DataFrame,
                              city_col="city", user_col="user_id", month_col="month", sales_col="sales_sum",
                              max_months_per_city=None,    # None plots all months; otherwise only the most recent N months
                              out_dir="plots_task1"):
    os.makedirs(out_dir, exist_ok=True)
    images = []
    for city, sub in topk_df.groupby(city_col):
        months_sorted = sorted(pd.unique(sub[month_col]))
        months_show = months_sorted[-max_months_per_city:] if max_months_per_city else months_sorted
        for m in months_show:
            cur = sub[sub[month_col] == m].sort_values(sales_col, ascending=False)
            plt.figure(figsize=(6, 4))
            plt.bar(cur[user_col].astype(str), cur[sales_col].values)
            plt.title(f"{city} — {pd.Timestamp(m).strftime('%Y-%m')} Top{len(cur)} Users")
            plt.xlabel("user_id"); plt.ylabel("monthly sales")
            plt.xticks(rotation=30)
            plt.tight_layout()
            fname = os.path.join(out_dir, f"{city}_{pd.Timestamp(m).strftime('%Y-%m')}.png")
            plt.savefig(fname, dpi=150)
            plt.close()
            images.append(fname)
    return images

# --- Usage example ---
# topk = topn_monthly_users(df, k=3)
# images = plot_topn_small_multiples(topk, max_months_per_city=3, out_dir="plots_task1")
# print(images[:5])

To extend this to a Top-5, just set k=5. To annotate the Top-3 user names and values on the chart, call bar_label on the bar container after plotting (see the sketch below).
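A minimal sketch of that annotation, assuming cur is one city-month slice of the Top-K table returned by topn_monthly_users; plot_topn_with_labels is a hypothetical helper, and bar_label requires Matplotlib 3.4+.

# Hypothetical helper: same bar chart as in plot_topn_small_multiples, plus value labels.
import pandas as pd
import matplotlib.pyplot as plt

def plot_topn_with_labels(cur: pd.DataFrame, user_col="user_id", sales_col="sales_sum", title=""):
    plt.figure(figsize=(6, 4))
    bars = plt.bar(cur[user_col].astype(str), cur[sales_col].values)  # user ids on the x axis
    plt.gca().bar_label(bars, fmt="%.0f", padding=2)                  # sales value above each bar
    plt.title(title)
    plt.xlabel("user_id"); plt.ylabel("monthly sales")
    plt.xticks(rotation=30)
    plt.tight_layout()
    plt.show()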


2) Rolling-threshold anomaly detection: 28-day rolling 99th percentile (per city)

Approach

  1. First aggregate the raw records to city × day (resample('D').sum()).
  2. For each city, compute a rolling 99th-percentile threshold over a 28-day window; a daily value above the threshold is flagged as an anomaly.
  3. One figure per city: plot daily sales and the threshold line, and mark anomalies with scatter points.
# --- Dependencies ---
import os
import pandas as pd
import matplotlib.pyplot as plt

def rolling_threshold_anomaly(df: pd.DataFrame,
                              date_col="date", city_col="city", sales_col="sales",
                              freq="D", window=28, quantile=0.99):
    d = df.copy()
    d[date_col] = pd.to_datetime(d[date_col], errors="coerce")
    d = d.dropna(subset=[date_col, city_col, sales_col])

    # Aggregate to daily frequency
    daily = (d.set_index(date_col)
               .groupby(city_col)[sales_col]
               .resample(freq).sum()
               .rename("sales").reset_index())

    outs = []
    for city, sub in daily.groupby(city_col):
        sub = sub.sort_values(date_col).copy()
        thr = sub["sales"].rolling(window=window, min_periods=7).quantile(quantile)
        sub["threshold"] = thr
        sub["is_anomaly"] = (sub["sales"] > sub["threshold"]).astype(int)
        outs.append(sub)
    return pd.concat(outs, ignore_index=True)

def plot_city_anomaly_series(anom_df: pd.DataFrame,
                             city_col="city", date_col="date",
                             sales_col="sales", thr_col="threshold",
                             out_dir="plots_task2"):
    os.makedirs(out_dir, exist_ok=True)
    images = []
    for city, sub in anom_df.groupby(city_col):
        plt.figure(figsize=(8, 4))
        plt.plot(sub[date_col], sub[sales_col], label="sales")
        plt.plot(sub[date_col], sub[thr_col], linestyle="--", label="threshold")
        an = sub[sub["is_anomaly"] == 1]
        if not an.empty:
            plt.scatter(an[date_col], an[sales_col], marker="x", label="anomaly")
        plt.title(f"{city} — Daily Sales with Rolling 99th Percentile")
        plt.xlabel("date"); plt.ylabel("sales")
        plt.legend()
        plt.tight_layout()
        fname = os.path.join(out_dir, f"{city}_anomaly.png")
        plt.savefig(fname, dpi=150)
        plt.close()
        images.append(fname)
    return images

# --- Usage example ---
# anom = rolling_threshold_anomaly(df)
# imgs = plot_city_anomaly_series(anom, out_dir="plots_task2")

For more robustness: fill business-level gaps or smooth outliers first, or use a time-based window rolling('28D') (which requires a sorted datetime index; see the sketch below). With evenly spaced daily data, a plain rolling(28) is simple and efficient.
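A minimal sketch of the time-based window variant, assuming the per-city daily table built inside rolling_threshold_anomaly (columns city, date, sales); rolling_threshold_time_window is a hypothetical name.

# Hypothetical variant: 28-day window defined by timestamps on a sorted datetime index.
import pandas as pd

def rolling_threshold_time_window(daily: pd.DataFrame, window="28D", quantile=0.99,
                                  city_col="city", date_col="date", sales_col="sales"):
    outs = []
    for _, sub in daily.groupby(city_col):
        sub = sub.sort_values(date_col).set_index(date_col)
        # the window is measured in time, so gaps and irregular spacing are tolerated
        sub["threshold"] = sub[sales_col].rolling(window, min_periods=7).quantile(quantile)
        sub["is_anomaly"] = (sub[sales_col] > sub["threshold"]).astype(int)
        outs.append(sub.reset_index())
    return pd.concat(outs, ignore_index=True)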


3) Ingester with validation: CSV → validate → clean → partitioned output (Parquet/CSV) + log

Goal: turn the rules for "what the data should look like" into code, and fail early.
Example rules (add or modify as needed):

  • Required columns exist: user_id / city / date / sales
  • Dates parse correctly and are not later than "now"
  • city falls within an allowed set (optional)
  • sales is non-negative
  • Duplicate keys (user_id, city, date): keep the row with the largest sales
Note: Parquet is written preferentially; if pyarrow/fastparquet is not installed in the runtime environment, the code automatically falls back to CSV and records it in the log.
# --- Dependencies ---
import os, json
import numpy as np
import pandas as pd

def ingest_validate(csv_path_or_df,
                    out_dir="out/ingest",
                    valid_cities=None,
                    date_col="date", city_col="city", user_col="user_id", sales_col="sales"):
    os.makedirs(out_dir, exist_ok=True)

    # Accepts either a file path or a DataFrame directly
    if isinstance(csv_path_or_df, str):
        raw = pd.read_csv(csv_path_or_df)
    else:
        raw = csv_path_or_df.copy()

    required_cols = [user_col, city_col, date_col, sales_col]
    missing_cols = [c for c in required_cols if c not in raw.columns]
    errors = []
    if missing_cols:
        errors.append(f"Missing columns: {missing_cols}")

    df = raw.copy()

    # String cleanup
    if city_col in df.columns:
        df[city_col] = df[city_col].astype(str).str.strip()
        df[city_col] = df[city_col].replace({"": np.nan})
    if user_col in df.columns:
        df[user_col] = df[user_col].astype(str).str.strip()

    # Parse dates
    if date_col in df.columns:
        df[date_col] = pd.to_datetime(df[date_col], errors="coerce")

    # Coerce sales to numeric so the non-negative check below cannot fail on strings
    if sales_col in df.columns:
        df[sales_col] = pd.to_numeric(df[sales_col], errors="coerce")

    # Rule checks
    now = pd.Timestamp.now(tz="UTC").tz_localize(None)
    bad_mask = pd.Series(False, index=df.index)

    for col in required_cols:
        if col in df.columns:
            bad_mask |= df[col].isna()
    if date_col in df.columns:
        bad_mask |= df[date_col] > now
    if valid_cities is not None and city_col in df.columns:
        bad_mask |= ~df[city_col].isin(set(valid_cities))
    if sales_col in df.columns:
        with np.errstate(invalid="ignore"):
            bad_mask |= ~(df[sales_col] >= 0)

    bad_rows = df[bad_mask].copy()
    clean = df[~bad_mask].copy()

    # Deduplicate (keep the largest sales for each user_id-city-date)
    if not clean.empty:
        clean = (clean.sort_values(sales_col)
                       .drop_duplicates(subset=[user_col, city_col, date_col], keep="last"))
        clean[sales_col] = clean[sales_col].astype(float)

    # Write output: prefer Parquet; fall back to CSV on failure
    storage_format = "parquet"
    if not clean.empty:
        for city, sub in clean.groupby(city_col, observed=True):
            pdir = os.path.join(out_dir, f"city={city}")
            os.makedirs(pdir, exist_ok=True)
            try:
                sub.to_parquet(os.path.join(pdir, "part.parquet"), index=False)
            except Exception:
                storage_format = "csv"
                sub.to_csv(os.path.join(pdir, "part.csv"), index=False, encoding="utf-8")

    # Log
    log = {
        "timestamp": pd.Timestamp.now(tz="UTC").isoformat(),
        "input_rows": int(len(df)),
        "clean_rows": int(len(clean)),
        "bad_rows": int(len(bad_rows)),
        "missing_cols": missing_cols,
        "errors": errors,
        "cities_in_clean": (sorted(map(str, clean[city_col].unique())) if not clean.empty else []),
        "storage_format": storage_format
    }
    with open(os.path.join(out_dir, "ingest_log.json"), "w", encoding="utf-8") as f:
        json.dump(log, f, ensure_ascii=False, indent=2)

    if not bad_rows.empty:
        bad_rows.to_csv(os.path.join(out_dir, "bad_rows.csv"), index=False, encoding="utf-8")

    return clean, log, bad_rows

# --- Usage example ---
# clean, log, bad = ingest_validate("raw.csv", out_dir="out/ingest", valid_cities={"Beijing","Shanghai","Shenzhen"})
# print(log)

Example output directory (partitioned by city):

out/ingest/
  ├─ city=Beijing/  part.parquet or part.csv
  ├─ city=Shanghai/ part.parquet or part.csv
  ├─ city=Shenzhen/ part.parquet or part.csv
  ├─ ingest_log.json
  └─ bad_rows.csv
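If you need the partitioned output back as a single DataFrame, here is a minimal sketch; read_partitioned is a hypothetical helper and assumes exactly the layout shown above.

# Hypothetical reader for the city=... partitions written by ingest_validate.
import glob, os
import pandas as pd

def read_partitioned(out_dir="out/ingest"):
    frames = []
    for pdir in sorted(glob.glob(os.path.join(out_dir, "city=*"))):
        pq  = os.path.join(pdir, "part.parquet")
        csv = os.path.join(pdir, "part.csv")
        part = pd.read_parquet(pq) if os.path.exists(pq) else pd.read_csv(csv)
        frames.append(part)
    return pd.concat(frames, ignore_index=True) if frames else pd.DataFrame()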

4) Time-leakage-safe modeling script: TimeSeriesSplit + Pipeline + GridSearchCV + metadata

Core idea: features use only "past" information (shift + rolling with the current point strictly excluded, i.e. shift first, then roll), and evaluation uses TimeSeriesSplit (train on earlier data, test on later data).

# --- Dependencies ---
import json, platform
import sklearn
import pandas as pd
from sklearn.model_selection import TimeSeriesSplit, GridSearchCV
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder, StandardScaler
from sklearn.impute import SimpleImputer
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_absolute_error
import joblib

def build_timeseries_features(daily_city_df: pd.DataFrame,
                              date_col="date", city_col="city", target_col="sales"):
    """
    输入:按 城市×日 聚合后的表(包含列 city, date, sales)
    输出:添加了严格“只看过去”的特征:
      - dow, month
      - lag1, lag7
      - ma7(先shift再滚动均值)
    """
    d = daily_city_df.copy()
    d[date_col] = pd.to_datetime(d[date_col], errors="coerce")
    d = d.dropna(subset=[date_col, city_col, target_col])
    d = d.sort_values([city_col, date_col])

    d["dow"] = d[date_col].dt.dayofweek
    d["month"] = d[date_col].dt.month

    def add_lags(g: pd.DataFrame):
        g = g.copy()
        g["lag1"] = g[target_col].shift(1)
        g["lag7"] = g[target_col].shift(7)
        g["ma7"]  = g[target_col].shift(1).rolling(7, min_periods=1).mean()
        return g

    # Future pandas versions change groupby-apply behavior; passing group_keys=False explicitly is more robust
    d = d.groupby(city_col, observed=True, group_keys=False).apply(add_lags)
    d = d.dropna(subset=["lag1", "lag7", "ma7"])
    return d

def timeseries_modeling(df: pd.DataFrame,
                        date_col="date", city_col="city", target_col="sales",
                        model_out="model.joblib", meta_out="metadata.json"):
    """
    - 用 RandomForest 回归做示例(可替换为任意模型)
    - 完整 Pipeline:预处理(数值标准化 + 类别OneHot)+ 模型
    - GridSearchCV + TimeSeriesSplit(默认3折以加快示例)
    - 产出:最优模型 + 元数据json
    """
    data = build_timeseries_features(df, date_col=date_col, city_col=city_col, target_col=target_col)
    features = ["dow", "month", "lag1", "lag7", "ma7", city_col]
    X = data[features]
    y = data[target_col]

    num_cols = ["dow", "month", "lag1", "lag7", "ma7"]
    cat_cols = [city_col]

    pre = ColumnTransformer([
        ("num", Pipeline([("imp", SimpleImputer()), ("sc", StandardScaler())]), num_cols),
        ("cat", OneHotEncoder(handle_unknown="ignore"), cat_cols)
    ])

    pipe = Pipeline([
        ("pre", pre),
        ("model", RandomForestRegressor(random_state=42, n_estimators=100, max_depth=10))
    ])

    # The grid is tiny to keep the example fast; extend it for real projects
    param_grid = {
        "model__n_estimators": [100],
        "model__max_depth": [10]
    }

    tscv = TimeSeriesSplit(n_splits=3)
    gs = GridSearchCV(pipe, param_grid, cv=tscv, n_jobs=-1,
                      scoring="neg_mean_absolute_error", refit=True)
    gs.fit(X, y)

    best = gs.best_estimator_
    joblib.dump(best, model_out)

    meta = {
        "timestamp": pd.Timestamp.utcnow().isoformat(),
        "python": platform.python_version(),
        "pandas": pd.__version__,
        "sklearn": ">=1.0",
        "seed": 42,
        "train_rows": int(len(X)),
        "features": features,
        "cv_splits": 3,
        "best_params": gs.best_params_,
        "cv_mae": float(-gs.best_score_)  # MAE越小越好
    }
    with open(meta_out, "w", encoding="utf-8") as f:
        json.dump(meta, f, ensure_ascii=False, indent=2)

    return best, meta

# --- Usage example ---
# Prepare daily-frequency data (if the raw data is record-level)
# daily_city = (df.groupby(["city", pd.Grouper(key="date", freq="D")], observed=True)["sales"]
#                 .sum().reset_index())
# best_model, meta = timeseries_modeling(daily_city,
#                                        model_out="models/model.joblib",
#                                        meta_out="models/metadata.json")
# print(meta)

Why this is "time-leakage safe"

  • All time-series features come from lags (shift); ma7 is also computed as rolling(7) on shift(1), so the features at any time t use only observations from t-1 and earlier.
  • The evaluation uses TimeSeriesSplit, which keeps the train-on-earlier / test-on-later order and prevents future information from leaking into training (see the toy sketch below).
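A minimal toy sketch of the split order (X_toy is made up; in the real script the same splitter runs inside GridSearchCV on the feature table):

# Toy illustration: in every fold, all training indices precede all test indices.
import numpy as np
from sklearn.model_selection import TimeSeriesSplit

X_toy = np.arange(12).reshape(-1, 1)   # 12 time-ordered rows
for i, (tr, te) in enumerate(TimeSeriesSplit(n_splits=3).split(X_toy)):
    print(f"fold {i}: train={tr.min()}..{tr.max()}  test={te.min()}..{te.max()}")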

Mini execution checklist (chaining the four tasks together)

  1. TopN

    topk = topn_monthly_users(df, k=3)
    plot_topn_small_multiples(topk, max_months_per_city=3, out_dir="plots_task1")
  2. Rolling anomalies

    anom = rolling_threshold_anomaly(df) 
    plot_city_anomaly_series(anom, out_dir="plots_task2")
  3. Ingester

    clean, log, bad = ingest_validate("raw.csv", out_dir="out/ingest",
                                      valid_cities={"Beijing","Shanghai","Shenzhen"})
  4. Modeling

    daily_city = (df.groupby(["city", pd.Grouper(key="date", freq="D")], observed=True)["sales"]
                    .sum().reset_index())
    best_model, meta = timeseries_modeling(daily_city,
                                           model_out="models/model.joblib",
                                           meta_out="models/metadata.json")

