“读数据→清洗→分析→建模→可视化→导出”
1) 环境与常用包
# 最小依赖
pip install numpy pandas matplotlib scipy scikit-learn pyarrow fastparquet
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
pd.set_option("display.max_columns", 100)
pd.set_option("display.float_format", lambda x: f"{x:,.4f}")
惯例别名:np
/ pd
/ plt
文件格式:CSV/Excel/Parquet 最常用;Parquet 体积小、速度快。
2) NumPy 核心(向量化基石)
a = np.array([1, 2, 3], dtype=np.float64)
b = np.arange(0, 10, 2) # [0,2,4,6,8]
c = np.linspace(0, 1, 5) # 5等分
A = np.random.randn(3, 3)
a.mean(), a.std(), A.T, np.linalg.inv(A @ A.T) # 常用操作
要点:能用向量化就别用 for
;广播规则允许不同形状相加(匹配尾部维度)。
3) Pandas 入门(对象与选择)
df = pd.DataFrame({
"id": [1,2,3,4],
"city": ["Beijing","Shanghai","Beijing","Shenzhen"],
"sales": [100, 200, 140, np.nan],
"date": pd.to_datetime(["2024-01-01","2024-01-02","2024-01-02","2024-01-03"])
})
df.head() # 前5行
df.info() # 列类型与缺失
df.describe() # 数值统计
df["city"].value_counts(dropna=False)
df.loc[df["city"]=="Beijing", ["id","sales"]] # 行列选择
df.query("sales >= 150 and city != 'Beijing'") # 更可读的条件
易错:布尔索引尽量用 loc
,链式赋值(df[df.a>0].b=1
)会触发警告,改用 loc
。
4) 缺失值与类型
df.isna().mean() # 缺失率
df["sales"] = df["sales"].fillna(df["sales"].median())
df["city"] = df["city"].astype("category") # 分类类型省内存
df["date"] = pd.to_datetime(df["date"], errors="coerce")
5) 分组聚合(groupby)
g = df.groupby("city", observed=True) # 分类列建议 observed=True
out = g.agg(
sales_mean=("sales","mean"),
sales_sum=("sales","sum"),
n=("id","count")
).reset_index()
技巧:多指标一次性聚合;transform
回写到原 DataFrame。
df["sales_z"] = (df["sales"] - g["sales"].transform("mean")) / g["sales"].transform("std")
6) 透视表与交叉表
pivot = pd.pivot_table(
df, values="sales", index="date", columns="city", aggfunc="sum", fill_value=0
)
xtab = pd.crosstab(df["city"], df["date"].dt.month, normalize="index")
7) 合并/拼接
# 横向主键合并(SQL: JOIN)
left = df[["id","city"]]
right = df[["id","sales"]]
pd.merge(left, right, on="id", how="left")
# 纵向堆叠
pd.concat([df.iloc[:2], df.iloc[2:]], axis=0, ignore_index=True)
易错:合并前确保键无重复或明确期望;必要时去重 drop_duplicates()
。
8) 时间序列与窗口
# 索引为时间
ts = df.set_index("date").sort_index()
# 频率重采样
ts_day = ts.resample("D")["sales"].sum().fillna(0)
# 滚动窗口
ts["sales_roll7"] = ts["sales"].rolling(window=7, min_periods=1).mean()
# 日期特征
df["dow"] = df["date"].dt.dayofweek
df["week"] = df["date"].dt.isocalendar().week.astype(int)
9) 字符串与正则
s = pd.Series(["A-001","B-002", None, "C-010"])
s.str.contains(r"^\w-\d{3}$", na=False)
s.str.extract(r"(?P<prefix>\w)-(?P<num>\d+)")
df["city_flag"] = df["city"].str.lower().str.contains("bei", na=False)
10) 快速可视化(Matplotlib)
ts_day.plot() # 折线
plt.title("Daily Sales"); plt.xlabel("Date"); plt.ylabel("Sales"); plt.show()
df.boxplot(column="sales", by="city"); plt.suptitle(""); plt.show()
df["sales"].hist(bins=20); plt.show()
习惯:先画→再标注→最后 show()
;分类变量建议先排序再画条形图。
11) I/O(读写数据)
# CSV
pd.read_csv("data.csv", dtype={"id":"int64"}, parse_dates=["date"])
df.to_csv("out.csv", index=False)
# Excel
pd.read_excel("book.xlsx", sheet_name="Sheet1")
df.to_excel("out.xlsx", index=False)
# Parquet(高速 & 小文件)
df.to_parquet("out.parquet", index=False)
# SQL(以 SQLite 为例)
import sqlite3
con = sqlite3.connect("db.sqlite")
pd.read_sql("SELECT * FROM sales WHERE date >= '2024-01-01'", con)
df.to_sql("sales_clean", con, if_exists="replace", index=False)
con.close()
12) 性能与内存小抄
- 类型优化:
category
给高重复字符串;Int8/16/32
给小范围整数;Float32
足够时用它。 - 选择性读取:
usecols=[]
、dtype=dict
、nrows=...
、chunksize=...
流式处理。 - 避免 apply:优先
str
/dt
/where
/np.select
向量化;必要时numba
或cython
(进阶)。 - 去复制:链式操作易产生拷贝;用赋值连写
df.assign(...)
。
# 向量化替代 apply
condlist = [df["sales"]>=200, df["sales"]>=150]
choicelist = ["A","B"]
df["grade"] = np.select(condlist, choicelist, default="C")
13) 统计与检验(SciPy)
from scipy import stats
# 正态假设下均值95%置信区间
mean, sem = df["sales"].mean(), stats.sem(df["sales"], nan_policy="omit")
ci = stats.t.interval(0.95, df=df["sales"].notna().sum()-1, loc=mean, scale=sem)
# 两独立样本 t 检验
grp = df.dropna().groupby("city")["sales"]
stats.ttest_ind(grp.get_group("Beijing"), grp.get_group("Shanghai"), equal_var=False)
14) 机器学习微流程(scikit-learn)
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.preprocessing import OneHotEncoder, StandardScaler
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.linear_model import LinearRegression
from sklearn.metrics import r2_score, mean_absolute_error
X = df[["city","dow","week"]]
y = df["sales"]
num_cols = ["dow","week"]
cat_cols = ["city"]
pre = ColumnTransformer([
("num", StandardScaler(), num_cols),
("cat", OneHotEncoder(handle_unknown="ignore"), cat_cols)
])
pipe = Pipeline([
("pre", pre),
("model", LinearRegression())
])
Xtr, Xte, ytr, yte = train_test_split(X, y, test_size=0.2, random_state=42)
pipe.fit(Xtr, ytr)
pred = pipe.predict(Xte)
print("MAE:", mean_absolute_error(yte, pred), "R2:", r2_score(yte, pred))
scores = cross_val_score(pipe, X, y, cv=5, scoring="neg_mean_absolute_error")
要点:用 Pipeline
把预处理与模型绑在一起,避免数据泄露。
15) EDA 一键模板(拷走就用)
def quick_eda(df: pd.DataFrame, top_k=10):
print("形状:", df.shape)
print("\n--- 类型信息 ---"); print(df.dtypes)
print("\n--- 缺失率 ---"); print(df.isna().mean().sort_values(ascending=False).head(top_k))
print("\n--- 数值描述 ---"); print(df.describe().T)
obj_cols = df.select_dtypes(include=["object","category"]).columns
if len(obj_cols):
print("\n--- 类别基数 ---")
print(df[obj_cols].nunique().sort_values(ascending=False).head(top_k))
16) 项目结构(分析脚手架)
analysis_project/
├─ data/ # 原始/中间/结果
├─ notebooks/ # 探索式分析
├─ src/
│ ├─ io.py # 读写封装
│ ├─ preprocess.py # 清洗与特征
│ ├─ eda.py # 可视化/报告
│ └─ model.py # 训练/评估
├─ reports/ # 导出的图表/文档
└─ requirements.txt
17) 常见坑与排错清单
- 链式赋值警告:改用
loc
明确赋值。 - 时区错位:统一到
UTC
或业务时区;跨日聚合先对齐时区再resample
。 - 字符串空白:先
str.strip()
,再lower()
,再匹配。 - 重复主键:合并前
df.duplicated(subset=[...]).sum()
。 - 缺失的“0” vs 真缺失:销售/数量型字段,缺失可能代表 0,要与业务确认。
18) 速记表(操作→方法)
任务 | 代码 |
---|---|
读 CSV(部分列 & 类型) | pd.read_csv("f.csv", usecols=["a","b"], dtype={"a":"Int32"}) |
多列条件过滤 | df.query("a>0 and b=='x'") |
多指标聚合 | df.groupby("k").agg(x=("v","sum"), y=("v","mean")) |
透视表 | pd.pivot_table(df, values="v", index="i", columns="j", aggfunc="sum") |
滚动平均 | df["ra7"] = df["v"].rolling(7).mean() |
正则抽取 | df["id"].str.extract(r"(\d{3})") |
合并 | pd.merge(a, b, on="key", how="left") |
导出 Parquet | df.to_parquet("x.parquet", index=False) |
继续把这本手册“肌肉拉满”。下面是进阶篇:更强的分组、时间序列、评估与生产化。依旧是“拿来就用”的代码块 + 易错点提醒。
19) 数据质量与验证(让脏数据无处遁形)
# 规则式校验:唯一性、取值范围、外键匹配
assert df["id"].is_unique, "id 存在重复"
assert df["sales"].ge(0).all(), "sales 存在负数"
# 约束列域:枚举、模式
valid_cities = {"Beijing","Shanghai","Shenzhen"}
bad = ~df["city"].isin(valid_cities)
print("非法城市:", df.loc[bad, "city"].unique())
# 统计异常:Z 分数/IQR
x = df["sales"].dropna()
z = (x - x.mean())/x.std()
outliers = x[abs(z) > 3]
要点:把业务规则写成断言,把断言失败当作“早失败”。长久维护时,规则>肉眼。
20) 进阶分组与窗口(SQL 风格“开窗”)
# 分组 TopN(每个 city 取销售额前2)
topn = (df.sort_values(["city","sales"], ascending=[True, False])
.groupby("city", observed=True).head(2))
# 分组排名/百分位
df["rank_in_city"] = df.groupby("city", observed=True)["sales"].rank("dense", ascending=False)
df["pct_in_city"] = df.groupby("city", observed=True)["sales"].rank(pct=True)
# 类窗口:累计和/移动相关
df = df.sort_values("date")
df["cum_sales"] = df.groupby("city", observed=True)["sales"].cumsum()
df["roll_corr"] = (df.set_index("date").groupby("city")["sales"]
.rolling(window=14, min_periods=5).corr(other=df.set_index("date").groupby("city")["dow"])
.reset_index(level=0, drop=True))
21) 表形变:宽↔长、分层列、时序 Grouper
# 宽转长(melt:列名→行值)
long = df.melt(id_vars=["id","date"], value_vars=["sales"], var_name="metric", value_name="value")
# 分层列 pivot(多统计指标列)
agg = (df.groupby(["date","city"], observed=True)
.agg(sales_mean=("sales","mean"), sales_sum=("sales","sum")))
pivot = agg.reset_index().pivot(index="date", columns="city")
pivot.columns = ['_'.join(col).strip() for col in pivot.columns.to_flat_index()] # 展平多层列
# 按时间分组(等价 SQL: GROUP BY DATE_TRUNC)
ts_agg = (df.set_index("date")
.groupby([pd.Grouper(freq="W"), "city"])["sales"].sum().unstack(fill_value=0))
22) 时间序列:分解与基线预测
# 季节分解(加性模型)
from statsmodels.tsa.seasonal import seasonal_decompose
y = df.set_index("date").sort_index()["sales"].asfreq("D").fillna(0)
res = seasonal_decompose(y, model="additive", period=7)
trend, seasonal, resid = res.trend, res.seasonal, res.resid
# 简易 ARIMA(稳妥起步:差分一次)
from statsmodels.tsa.arima.model import ARIMA
model = ARIMA(y, order=(1,1,1)) # p,d,q
fit = model.fit()
pred = fit.get_forecast(steps=14)
fc = pred.predicted_mean
ci = pred.conf_int()
易错:先 asfreq
统一频率;缺失用业务合理方式填补;周/月周期不要生搬硬套 7/30,按数据频次来。
23) 异常检测(统计 + 机器学习)
# 统计法:IQR(稳健)
q1, q3 = x.quantile([0.25, 0.75])
iqr = q3 - q1
out = df[(df["sales"] < q1 - 1.5*iqr) | (df["sales"] > q3 + 1.5*iqr)]
# 基于树的隔离森林
from sklearn.ensemble import IsolationForest
iso = IsolationForest(random_state=42, contamination=0.02)
df["is_outlier"] = iso.fit_predict(df[["sales"]]).astype(int) == -1
提示:异常是“业务上下文事件”,不是永远错。标注并留评注释,方便下游决策。
24) 特征工程(不踩“泄露”坑)
# 离散化:等距/分位
df["sales_bin"] = pd.cut(df["sales"], bins=[0,100,150,1e9], labels=["low","mid","high"])
df["sales_q"] = pd.qcut(df["sales"], 4, labels=False)
# 频次编码(高基数类别的稳妥底线)
freq = df["city"].value_counts(normalize=True)
df["city_freq"] = df["city"].map(freq)
# 交互项/多项式
from sklearn.preprocessing import PolynomialFeatures
poly = PolynomialFeatures(degree=2, include_bias=False)
X_poly = poly.fit_transform(df[["dow","week"]])
数据泄露:任何使用全量数据统计(均值/编码表)都应在训练集内拟合,再对验证/测试 transform
。
25) 分类不平衡:权重、阈值、度量
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import classification_report, roc_auc_score, precision_recall_curve
clf = LogisticRegression(class_weight="balanced", max_iter=1000)
clf.fit(Xtr, ytr)
proba = clf.predict_proba(Xte)[:,1]
# 根据业务设阈(PR 曲线比 ROC 更敏感)
prec, rec, thr = precision_recall_curve(yte, proba)
best_idx = (2*prec*rec/(prec+rec+1e-9)).argmax() # F1 最大点
best_thr = thr[best_idx]
ypred = (proba >= best_thr).astype(int)
print("AUC:", roc_auc_score(yte, proba))
print(classification_report(yte, ypred, digits=4))
要点:报告准确率之前,先报样本占比;对极度稀少类别,精确率/召回率、PR-AUC 更有意义。
26) Pipeline × 搜参:可复现的一键训练
from sklearn.model_selection import GridSearchCV
from sklearn.impute import SimpleImputer
from sklearn.ensemble import RandomForestRegressor
from sklearn.pipeline import Pipeline
num_cols = ["dow","week","sales"] # 示例
pre = ColumnTransformer([
("num", Pipeline([("imp", SimpleImputer()), ("sc", StandardScaler())]), num_cols),
("cat", OneHotEncoder(handle_unknown="ignore"), ["city"])
])
pipe = Pipeline([
("pre", pre),
("model", RandomForestRegressor(random_state=42))
])
param = {
"model__n_estimators": [200, 400],
"model__max_depth": [None, 10, 20]
}
gs = GridSearchCV(pipe, param, cv=3, n_jobs=-1, scoring="neg_mean_absolute_error")
gs.fit(Xtr, ytr)
best = gs.best_estimator_
习惯:所有预处理都进 Pipeline
,只在训练集 fit
,其余 transform
。
27) 解释性:Permutation Importance & 局部解释
from sklearn.inspection import permutation_importance
r = permutation_importance(best, Xte, yte, n_repeats=5, random_state=42)
imp = pd.Series(r.importances_mean, index=best.named_steps["pre"].get_feature_names_out()).sort_values(ascending=False)
# 局部解释:敏捷起步用对比替换法(What-if)
row = Xte.iloc[[0]]
for col in ["dow","week"]:
tmp = row.copy()
tmp[col] = tmp[col] + 1
print(col, best.predict(tmp) - best.predict(row))
说明:Permutation 更稳健但成本高;想要更细颗粒可用 SHAP(需额外安装)。
28) 可视化增强(Matplotlib 不丑的小技巧)
import matplotlib.pyplot as plt
plt.figure(figsize=(8,4))
ts_day.plot()
plt.title("Daily Sales"); plt.xlabel("Date"); plt.ylabel("Sales")
plt.tight_layout(); plt.show()
# 分类条形图:排序 + 注数字
order = df.groupby("city")["sales"].mean().sort_values().index
ax = df.groupby("city")["sales"].mean().loc[order].plot(kind="bar")
for c in ax.containers:
ax.bar_label(c, fmt="%.1f")
plt.xticks(rotation=0); plt.tight_layout(); plt.show()
审美三件套:排序、网格线淡化、数值标注。必要时“小倍数图”(facet)分城市画多张。
29) 海量数据策略(内存省着点用)
# 分块读取 + 流式聚合
tot = 0
for chunk in pd.read_csv("huge.csv", chunksize=1_000_000, usecols=["date","sales"]):
chunk["date"] = pd.to_datetime(chunk["date"])
tot += chunk["sales"].sum()
print("Total sales:", tot)
# 类型降维
df = pd.read_parquet("x.parquet") # 优先 Parquet
df = df.convert_dtypes() # 自动更合理 dtype
df["city"] = df["city"].astype("category")
思路:能“提早聚合”就不要先全读;能“落地中间结果”(Parquet/SQLite)就不要全在内存玩堆叠。
30) I/O 进阶:批量、压缩、分区
from glob import glob
files = sorted(glob("data/2024-*.csv.gz"))
df = pd.concat([pd.read_csv(f, compression="gzip") for f in files], ignore_index=True)
# 分区写 Parquet(按城市)
for k, g in df.groupby("city", observed=True):
g.to_parquet(f"out/city={k}/part.parquet", index=False)
技巧:目录分区(city=Beijing/
)→ 后续按需读取更快;压缩选 gzip/zstd,权衡体积与速度。
31) “方法链”写法:可读、可插拔
res = (
df
.assign(date=lambda d: pd.to_datetime(d["date"]))
.query("sales >= 100")
.pipe(lambda d: d[d["city"].notna()])
.groupby(["city", pd.Grouper(key="date", freq="M")], observed=True)["sales"].sum()
.reset_index()
.sort_values(["city","date"])
)
心法:assign
命名中间变量、pipe
接入任意自定义函数,替代临时变量污染命名空间。
32) 评估与分层拆分(防止时间穿越)
# 时间序列拆分(不要随机切)
from sklearn.model_selection import TimeSeriesSplit
tscv = TimeSeriesSplit(n_splits=5)
for tr, te in tscv.split(y):
# 用 tr 训练,用 te 验证
pass
# 分层抽样(分类任务保持比例)
from sklearn.model_selection import train_test_split
Xtr, Xte, ytr, yte = train_test_split(X, y, test_size=0.2, stratify=y, random_state=42)
结论:时序任务尽量用前训后测;避免把未来信息泄露给过去。
33) 产出物:模型保存、版本与可复现
import joblib, json, platform, numpy as np, pandas as pd
joblib.dump(best, "models/model.joblib")
meta = {
"python": platform.python_version(),
"pandas": pd.__version__,
"numpy": np.__version__,
"seed": 42,
"train_shape": list(Xtr.shape),
"timestamp": pd.Timestamp.utcnow().isoformat()
}
json.dump(meta, open("models/metadata.json","w"), ensure_ascii=False, indent=2)
要点:保存模型 + 版本 + 训练数据形状 + 随机种子。改了数据/特征,即是新版本。
34) 轻量 CLI:一条命令跑全流程
# save as run.py
import argparse, pandas as pd
from joblib import dump, load
def train(in_path, out_model):
df = pd.read_parquet(in_path)
# ... 清洗/特征/建模(略) ...
dump(best, out_model)
def predict(in_path, model_path, out_path):
df = pd.read_parquet(in_path)
m = load(model_path)
df["pred"] = m.predict(df[feat_cols])
df.to_parquet(out_path, index=False)
if __name__ == "__main__":
ap = argparse.ArgumentParser()
sub = ap.add_subparsers(dest="cmd", required=True)
t = sub.add_parser("train"); t.add_argument("--in"); t.add_argument("--model")
p = sub.add_parser("predict"); p.add_argument("--in"); p.add_argument("--model"); p.add_argument("--out")
args = ap.parse_args()
if args.cmd=="train": train(args.__dict__["in"], args.model)
else: predict(args.__dict__["in"], args.model, args.out)
收益:把“手搓脚本”变“可复用命令”,利于自动化/调度。
35) 单元测试与数据不变量(长期可靠)
# tests/test_integrity.py
import pandas as pd
def test_city_domain():
df = pd.read_parquet("data/clean.parquet")
assert set(df["city"].unique()) <= {"Beijing","Shanghai","Shenzhen"}
def test_no_future_dates():
df = pd.read_parquet("data/clean.parquet")
assert (df["date"] <= pd.Timestamp.now(tz="UTC")).all()
原则:把“数据应当如何”的共识写成测试,CI 中跑,数据变坏第一时间报警。
36) “20 条一行流”速写(收藏夹)
# 1. 逆透视:列→行
pd.melt(df, id_vars=["id"], var_name="metric", value_name="value")
# 2. 合并近似时间(asof join)
pd.merge_asof(a.sort_values("t"), b.sort_values("t"), on="t", by="key", direction="nearest", tolerance=pd.Timedelta("5min"))
# 3. 去重保留最大
df.sort_values("sales").drop_duplicates(subset=["id"], keep="last")
# 4. 组内归一化
df["z"] = df.groupby("city")["sales"].transform(lambda s: (s - s.mean())/s.std())
# 5. 复杂条件赋值
df["grade"] = np.select([df.sales>=200, df.sales>=150], ["A","B"], default="C")
# 6. 高频字符串清理
df["sku"] = (df["sku"].str.strip().str.upper().str.replace(r"[^A-Z0-9\-]","", regex=True))
# 7. 读多表 Excel
xlsx = pd.ExcelFile("book.xlsx"); {name: xlsx.parse(name) for name in xlsx.sheet_names}
# 8. 多层索引聚合后展平列
agg.columns = ['_'.join(map(str, c)) for c in agg.columns.to_flat_index()]
# 9. 有序分类排序
df["grade"] = pd.Categorical(df["grade"], ["C","B","A"], ordered=True); df.sort_values("grade")
# 10. 滚动分位
df["q90_14d"] = df.set_index("date")["sales"].rolling("14D").quantile(0.9).reset_index(drop=True)
# 11. 组内 TopK 的其它列
idx = df.groupby("city")["sales"].nlargest(3).reset_index().set_index("level_1").index
df.loc[idx, :]
# 12. 采样可重复
df.sample(n=1000, random_state=42)
# 13. 多条件 merge 键
pd.merge(a, b, left_on=["k1","k2"], right_on=["k1","k2"], how="left")
# 14. 比例标准化到 1
w = df["weight"].clip(lower=0); w = w / w.sum()
# 15. 时间戳到周起始
df["week_start"] = (df["date"] - pd.to_timedelta(df["date"].dt.dayofweek, unit="D")).dt.normalize()
# 16. 多列字符串统一清洗
cols = ["name","city"]; df[cols] = df[cols].apply(lambda s: s.str.strip().str.lower())
# 17. 稀疏矩阵(高维 one-hot)
from sklearn.feature_extraction.text import CountVectorizer
X = CountVectorizer(min_df=5).fit_transform(texts)
# 18. 训练集/测试集时间分割
cut = "2024-09-01"; tr = df[df.date < cut]; te = df[df.date >= cut]
# 19. 合并报错定位
try:
pd.merge(a,b,on="id",validate="one_to_one")
except Exception as e:
print("关系不满足一对一:", e)
# 20. 内存测算
(df.memory_usage(deep=True).sum()/1024**2)
练习挑战(边做边长)
- 复刻“TopN + 分层时间聚合”:每城市每月销量 Top3 的用户,并画小倍数图。
- 做一版“滚动阈值异常检测”:对每城市做 28 天滚动 99 分位,超过即标记。
- 写“带校验的导入器”:读取外部 CSV→校验→清洗→落地 Parquet→产出日志(错误与统计)。
- 一个“时间穿越安全”的建模脚本:
TimeSeriesSplit
+Pipeline
+GridSearchCV
+ 版本元数据。