🐍 Python 数据分析手册

“读数据→清洗→分析→建模→可视化→导出”

1) 环境与常用包

# 最小依赖
pip install numpy pandas matplotlib scipy scikit-learn pyarrow fastparquet
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

pd.set_option("display.max_columns", 100)
pd.set_option("display.float_format", lambda x: f"{x:,.4f}")

惯例别名np / pd / plt
文件格式:CSV/Excel/Parquet 最常用;Parquet 体积小、速度快。


2) NumPy 核心(向量化基石)

a = np.array([1, 2, 3], dtype=np.float64)
b = np.arange(0, 10, 2)            # [0,2,4,6,8]
c = np.linspace(0, 1, 5)           # 5等分
A = np.random.randn(3, 3)

a.mean(), a.std(), A.T, np.linalg.inv(A @ A.T)  # 常用操作

要点:能用向量化就别用 for;广播规则允许不同形状相加(匹配尾部维度)。


3) Pandas 入门(对象与选择)

df = pd.DataFrame({
    "id": [1,2,3,4],
    "city": ["Beijing","Shanghai","Beijing","Shenzhen"],
    "sales": [100, 200, 140, np.nan],
    "date": pd.to_datetime(["2024-01-01","2024-01-02","2024-01-02","2024-01-03"])
})

df.head()           # 前5行
df.info()           # 列类型与缺失
df.describe()       # 数值统计
df["city"].value_counts(dropna=False)
df.loc[df["city"]=="Beijing", ["id","sales"]]   # 行列选择
df.query("sales >= 150 and city != 'Beijing'")  # 更可读的条件

易错:布尔索引尽量用 loc,链式赋值(df[df.a>0].b=1)会触发警告,改用 loc


4) 缺失值与类型

df.isna().mean()                     # 缺失率
df["sales"] = df["sales"].fillna(df["sales"].median())
df["city"] = df["city"].astype("category")      # 分类类型省内存
df["date"] = pd.to_datetime(df["date"], errors="coerce")

5) 分组聚合(groupby)

g = df.groupby("city", observed=True)           # 分类列建议 observed=True
out = g.agg(
    sales_mean=("sales","mean"),
    sales_sum=("sales","sum"),
    n=("id","count")
).reset_index()

技巧:多指标一次性聚合;transform 回写到原 DataFrame。

df["sales_z"] = (df["sales"] - g["sales"].transform("mean")) / g["sales"].transform("std")

6) 透视表与交叉表

pivot = pd.pivot_table(
    df, values="sales", index="date", columns="city", aggfunc="sum", fill_value=0
)
xtab = pd.crosstab(df["city"], df["date"].dt.month, normalize="index")

7) 合并/拼接

# 横向主键合并(SQL: JOIN)
left  = df[["id","city"]]
right = df[["id","sales"]]
pd.merge(left, right, on="id", how="left")

# 纵向堆叠
pd.concat([df.iloc[:2], df.iloc[2:]], axis=0, ignore_index=True)

易错:合并前确保键无重复或明确期望;必要时去重 drop_duplicates()


8) 时间序列与窗口

# 索引为时间
ts = df.set_index("date").sort_index()

# 频率重采样
ts_day = ts.resample("D")["sales"].sum().fillna(0)

# 滚动窗口
ts["sales_roll7"] = ts["sales"].rolling(window=7, min_periods=1).mean()

# 日期特征
df["dow"]  = df["date"].dt.dayofweek
df["week"] = df["date"].dt.isocalendar().week.astype(int)

9) 字符串与正则

s = pd.Series(["A-001","B-002", None, "C-010"])
s.str.contains(r"^\w-\d{3}$", na=False)
s.str.extract(r"(?P<prefix>\w)-(?P<num>\d+)")
df["city_flag"] = df["city"].str.lower().str.contains("bei", na=False)

10) 快速可视化(Matplotlib)

ts_day.plot()                 # 折线
plt.title("Daily Sales"); plt.xlabel("Date"); plt.ylabel("Sales"); plt.show()

df.boxplot(column="sales", by="city"); plt.suptitle(""); plt.show()
df["sales"].hist(bins=20); plt.show()

习惯:先画→再标注→最后 show();分类变量建议先排序再画条形图。


11) I/O(读写数据)

# CSV
pd.read_csv("data.csv", dtype={"id":"int64"}, parse_dates=["date"])
df.to_csv("out.csv", index=False)

# Excel
pd.read_excel("book.xlsx", sheet_name="Sheet1")
df.to_excel("out.xlsx", index=False)

# Parquet(高速 & 小文件)
df.to_parquet("out.parquet", index=False)

# SQL(以 SQLite 为例)
import sqlite3
con = sqlite3.connect("db.sqlite")
pd.read_sql("SELECT * FROM sales WHERE date >= '2024-01-01'", con)
df.to_sql("sales_clean", con, if_exists="replace", index=False)
con.close()

12) 性能与内存小抄

  • 类型优化category 给高重复字符串;Int8/16/32 给小范围整数;Float32 足够时用它。
  • 选择性读取usecols=[]dtype=dictnrows=...chunksize=... 流式处理。
  • 避免 apply:优先 str/dt/where/np.select 向量化;必要时 numbacython(进阶)。
  • 去复制:链式操作易产生拷贝;用赋值连写 df.assign(...)
# 向量化替代 apply
condlist = [df["sales"]>=200, df["sales"]>=150]
choicelist = ["A","B"]
df["grade"] = np.select(condlist, choicelist, default="C")

13) 统计与检验(SciPy)

from scipy import stats

# 正态假设下均值95%置信区间
mean, sem = df["sales"].mean(), stats.sem(df["sales"], nan_policy="omit")
ci = stats.t.interval(0.95, df=df["sales"].notna().sum()-1, loc=mean, scale=sem)

# 两独立样本 t 检验
grp = df.dropna().groupby("city")["sales"]
stats.ttest_ind(grp.get_group("Beijing"), grp.get_group("Shanghai"), equal_var=False)

14) 机器学习微流程(scikit-learn)

from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.preprocessing import OneHotEncoder, StandardScaler
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.linear_model import LinearRegression
from sklearn.metrics import r2_score, mean_absolute_error

X = df[["city","dow","week"]]
y = df["sales"]

num_cols = ["dow","week"]
cat_cols = ["city"]

pre = ColumnTransformer([
    ("num", StandardScaler(), num_cols),
    ("cat", OneHotEncoder(handle_unknown="ignore"), cat_cols)
])

pipe = Pipeline([
    ("pre", pre),
    ("model", LinearRegression())
])

Xtr, Xte, ytr, yte = train_test_split(X, y, test_size=0.2, random_state=42)
pipe.fit(Xtr, ytr)
pred = pipe.predict(Xte)

print("MAE:", mean_absolute_error(yte, pred), "R2:", r2_score(yte, pred))
scores = cross_val_score(pipe, X, y, cv=5, scoring="neg_mean_absolute_error")

要点:用 Pipeline 把预处理与模型绑在一起,避免数据泄露。


15) EDA 一键模板(拷走就用)

def quick_eda(df: pd.DataFrame, top_k=10):
    print("形状:", df.shape)
    print("\n--- 类型信息 ---"); print(df.dtypes)
    print("\n--- 缺失率 ---"); print(df.isna().mean().sort_values(ascending=False).head(top_k))
    print("\n--- 数值描述 ---"); print(df.describe().T)
    obj_cols = df.select_dtypes(include=["object","category"]).columns
    if len(obj_cols):
        print("\n--- 类别基数 ---")
        print(df[obj_cols].nunique().sort_values(ascending=False).head(top_k))

16) 项目结构(分析脚手架)

analysis_project/
  ├─ data/                # 原始/中间/结果
  ├─ notebooks/           # 探索式分析
  ├─ src/
  │   ├─ io.py            # 读写封装
  │   ├─ preprocess.py    # 清洗与特征
  │   ├─ eda.py           # 可视化/报告
  │   └─ model.py         # 训练/评估
  ├─ reports/             # 导出的图表/文档
  └─ requirements.txt

17) 常见坑与排错清单

  • 链式赋值警告:改用 loc 明确赋值。
  • 时区错位:统一到 UTC 或业务时区;跨日聚合先对齐时区再 resample
  • 字符串空白:先 str.strip(),再 lower(),再匹配。
  • 重复主键:合并前 df.duplicated(subset=[...]).sum()
  • 缺失的“0” vs 真缺失:销售/数量型字段,缺失可能代表 0,要与业务确认。

18) 速记表(操作→方法)

任务代码
读 CSV(部分列 & 类型)pd.read_csv("f.csv", usecols=["a","b"], dtype={"a":"Int32"})
多列条件过滤df.query("a>0 and b=='x'")
多指标聚合df.groupby("k").agg(x=("v","sum"), y=("v","mean"))
透视表pd.pivot_table(df, values="v", index="i", columns="j", aggfunc="sum")
滚动平均df["ra7"] = df["v"].rolling(7).mean()
正则抽取df["id"].str.extract(r"(\d{3})")
合并pd.merge(a, b, on="key", how="left")
导出 Parquetdf.to_parquet("x.parquet", index=False)

继续把这本手册“肌肉拉满”。下面是进阶篇:更强的分组、时间序列、评估与生产化。依旧是“拿来就用”的代码块 + 易错点提醒。


19) 数据质量与验证(让脏数据无处遁形)

# 规则式校验:唯一性、取值范围、外键匹配
assert df["id"].is_unique, "id 存在重复"
assert df["sales"].ge(0).all(), "sales 存在负数"

# 约束列域:枚举、模式
valid_cities = {"Beijing","Shanghai","Shenzhen"}
bad = ~df["city"].isin(valid_cities)
print("非法城市:", df.loc[bad, "city"].unique())

# 统计异常:Z 分数/IQR
x = df["sales"].dropna()
z = (x - x.mean())/x.std()
outliers = x[abs(z) > 3]

要点:把业务规则写成断言,把断言失败当作“早失败”。长久维护时,规则>肉眼。


20) 进阶分组与窗口(SQL 风格“开窗”)

# 分组 TopN(每个 city 取销售额前2)
topn = (df.sort_values(["city","sales"], ascending=[True, False])
          .groupby("city", observed=True).head(2))

# 分组排名/百分位
df["rank_in_city"] = df.groupby("city", observed=True)["sales"].rank("dense", ascending=False)
df["pct_in_city"]  = df.groupby("city", observed=True)["sales"].rank(pct=True)

# 类窗口:累计和/移动相关
df = df.sort_values("date")
df["cum_sales"] = df.groupby("city", observed=True)["sales"].cumsum()
df["roll_corr"] = (df.set_index("date").groupby("city")["sales"]
                     .rolling(window=14, min_periods=5).corr(other=df.set_index("date").groupby("city")["dow"])
                     .reset_index(level=0, drop=True))

21) 表形变:宽↔长、分层列、时序 Grouper

# 宽转长(melt:列名→行值)
long = df.melt(id_vars=["id","date"], value_vars=["sales"], var_name="metric", value_name="value")

# 分层列 pivot(多统计指标列)
agg = (df.groupby(["date","city"], observed=True)
         .agg(sales_mean=("sales","mean"), sales_sum=("sales","sum")))
pivot = agg.reset_index().pivot(index="date", columns="city")
pivot.columns = ['_'.join(col).strip() for col in pivot.columns.to_flat_index()]  # 展平多层列

# 按时间分组(等价 SQL: GROUP BY DATE_TRUNC)
ts_agg = (df.set_index("date")
            .groupby([pd.Grouper(freq="W"), "city"])["sales"].sum().unstack(fill_value=0))

22) 时间序列:分解与基线预测

# 季节分解(加性模型)
from statsmodels.tsa.seasonal import seasonal_decompose
y = df.set_index("date").sort_index()["sales"].asfreq("D").fillna(0)
res = seasonal_decompose(y, model="additive", period=7)
trend, seasonal, resid = res.trend, res.seasonal, res.resid

# 简易 ARIMA(稳妥起步:差分一次)
from statsmodels.tsa.arima.model import ARIMA
model = ARIMA(y, order=(1,1,1))     # p,d,q
fit = model.fit()
pred = fit.get_forecast(steps=14)
fc = pred.predicted_mean
ci = pred.conf_int()

易错:先 asfreq 统一频率;缺失用业务合理方式填补;周/月周期不要生搬硬套 7/30,按数据频次来。


23) 异常检测(统计 + 机器学习)

# 统计法:IQR(稳健)
q1, q3 = x.quantile([0.25, 0.75])
iqr = q3 - q1
out = df[(df["sales"] < q1 - 1.5*iqr) | (df["sales"] > q3 + 1.5*iqr)]

# 基于树的隔离森林
from sklearn.ensemble import IsolationForest
iso = IsolationForest(random_state=42, contamination=0.02)
df["is_outlier"] = iso.fit_predict(df[["sales"]]).astype(int) == -1

提示:异常是“业务上下文事件”,不是永远错。标注并留评注释,方便下游决策。


24) 特征工程(不踩“泄露”坑)

# 离散化:等距/分位
df["sales_bin"] = pd.cut(df["sales"], bins=[0,100,150,1e9], labels=["low","mid","high"])
df["sales_q"]   = pd.qcut(df["sales"], 4, labels=False)

# 频次编码(高基数类别的稳妥底线)
freq = df["city"].value_counts(normalize=True)
df["city_freq"] = df["city"].map(freq)

# 交互项/多项式
from sklearn.preprocessing import PolynomialFeatures
poly = PolynomialFeatures(degree=2, include_bias=False)
X_poly = poly.fit_transform(df[["dow","week"]])

数据泄露:任何使用全量数据统计(均值/编码表)都应在训练集内拟合,再对验证/测试 transform


25) 分类不平衡:权重、阈值、度量

from sklearn.linear_model import LogisticRegression
from sklearn.metrics import classification_report, roc_auc_score, precision_recall_curve

clf = LogisticRegression(class_weight="balanced", max_iter=1000)
clf.fit(Xtr, ytr)
proba = clf.predict_proba(Xte)[:,1]

# 根据业务设阈(PR 曲线比 ROC 更敏感)
prec, rec, thr = precision_recall_curve(yte, proba)
best_idx = (2*prec*rec/(prec+rec+1e-9)).argmax()   # F1 最大点
best_thr = thr[best_idx]
ypred = (proba >= best_thr).astype(int)

print("AUC:", roc_auc_score(yte, proba))
print(classification_report(yte, ypred, digits=4))

要点:报告准确率之前,先报样本占比;对极度稀少类别,精确率/召回率、PR-AUC 更有意义。


26) Pipeline × 搜参:可复现的一键训练

from sklearn.model_selection import GridSearchCV
from sklearn.impute import SimpleImputer
from sklearn.ensemble import RandomForestRegressor
from sklearn.pipeline import Pipeline

num_cols = ["dow","week","sales"]    # 示例
pre = ColumnTransformer([
    ("num", Pipeline([("imp", SimpleImputer()), ("sc", StandardScaler())]), num_cols),
    ("cat", OneHotEncoder(handle_unknown="ignore"), ["city"])
])

pipe = Pipeline([
    ("pre", pre),
    ("model", RandomForestRegressor(random_state=42))
])

param = {
    "model__n_estimators": [200, 400],
    "model__max_depth": [None, 10, 20]
}
gs = GridSearchCV(pipe, param, cv=3, n_jobs=-1, scoring="neg_mean_absolute_error")
gs.fit(Xtr, ytr)
best = gs.best_estimator_

习惯:所有预处理都进 Pipeline在训练集 fit,其余 transform


27) 解释性:Permutation Importance & 局部解释

from sklearn.inspection import permutation_importance
r = permutation_importance(best, Xte, yte, n_repeats=5, random_state=42)
imp = pd.Series(r.importances_mean, index=best.named_steps["pre"].get_feature_names_out()).sort_values(ascending=False)

# 局部解释:敏捷起步用对比替换法(What-if)
row = Xte.iloc[[0]]
for col in ["dow","week"]:
    tmp = row.copy()
    tmp[col] = tmp[col] + 1
    print(col, best.predict(tmp) - best.predict(row))

说明:Permutation 更稳健但成本高;想要更细颗粒可用 SHAP(需额外安装)。


28) 可视化增强(Matplotlib 不丑的小技巧)

import matplotlib.pyplot as plt

plt.figure(figsize=(8,4))
ts_day.plot()
plt.title("Daily Sales"); plt.xlabel("Date"); plt.ylabel("Sales")
plt.tight_layout(); plt.show()

# 分类条形图:排序 + 注数字
order = df.groupby("city")["sales"].mean().sort_values().index
ax = df.groupby("city")["sales"].mean().loc[order].plot(kind="bar")
for c in ax.containers:
    ax.bar_label(c, fmt="%.1f")
plt.xticks(rotation=0); plt.tight_layout(); plt.show()

审美三件套:排序、网格线淡化、数值标注。必要时“小倍数图”(facet)分城市画多张。


29) 海量数据策略(内存省着点用)

# 分块读取 + 流式聚合
tot = 0
for chunk in pd.read_csv("huge.csv", chunksize=1_000_000, usecols=["date","sales"]):
    chunk["date"] = pd.to_datetime(chunk["date"])
    tot += chunk["sales"].sum()
print("Total sales:", tot)

# 类型降维
df = pd.read_parquet("x.parquet")          # 优先 Parquet
df = df.convert_dtypes()                   # 自动更合理 dtype
df["city"] = df["city"].astype("category")

思路:能“提早聚合”就不要先全读;能“落地中间结果”(Parquet/SQLite)就不要全在内存玩堆叠。


30) I/O 进阶:批量、压缩、分区

from glob import glob
files = sorted(glob("data/2024-*.csv.gz"))
df = pd.concat([pd.read_csv(f, compression="gzip") for f in files], ignore_index=True)

# 分区写 Parquet(按城市)
for k, g in df.groupby("city", observed=True):
    g.to_parquet(f"out/city={k}/part.parquet", index=False)

技巧:目录分区(city=Beijing/)→ 后续按需读取更快;压缩选 gzip/zstd,权衡体积与速度。


31) “方法链”写法:可读、可插拔

res = (
  df
  .assign(date=lambda d: pd.to_datetime(d["date"]))
  .query("sales >= 100")
  .pipe(lambda d: d[d["city"].notna()])
  .groupby(["city", pd.Grouper(key="date", freq="M")], observed=True)["sales"].sum()
  .reset_index()
  .sort_values(["city","date"])
)

心法assign 命名中间变量、pipe 接入任意自定义函数,替代临时变量污染命名空间。


32) 评估与分层拆分(防止时间穿越)

# 时间序列拆分(不要随机切)
from sklearn.model_selection import TimeSeriesSplit
tscv = TimeSeriesSplit(n_splits=5)
for tr, te in tscv.split(y):
    # 用 tr 训练,用 te 验证
    pass

# 分层抽样(分类任务保持比例)
from sklearn.model_selection import train_test_split
Xtr, Xte, ytr, yte = train_test_split(X, y, test_size=0.2, stratify=y, random_state=42)

结论:时序任务尽量用前训后测;避免把未来信息泄露给过去。


33) 产出物:模型保存、版本与可复现

import joblib, json, platform, numpy as np, pandas as pd
joblib.dump(best, "models/model.joblib")

meta = {
  "python": platform.python_version(),
  "pandas": pd.__version__,
  "numpy": np.__version__,
  "seed": 42,
  "train_shape": list(Xtr.shape),
  "timestamp": pd.Timestamp.utcnow().isoformat()
}
json.dump(meta, open("models/metadata.json","w"), ensure_ascii=False, indent=2)

要点:保存模型 + 版本 + 训练数据形状 + 随机种子。改了数据/特征,即是新版本。


34) 轻量 CLI:一条命令跑全流程

# save as run.py
import argparse, pandas as pd
from joblib import dump, load

def train(in_path, out_model):
    df = pd.read_parquet(in_path)
    # ... 清洗/特征/建模(略) ...
    dump(best, out_model)

def predict(in_path, model_path, out_path):
    df = pd.read_parquet(in_path)
    m = load(model_path)
    df["pred"] = m.predict(df[feat_cols])
    df.to_parquet(out_path, index=False)

if __name__ == "__main__":
    ap = argparse.ArgumentParser()
    sub = ap.add_subparsers(dest="cmd", required=True)
    t = sub.add_parser("train"); t.add_argument("--in"); t.add_argument("--model")
    p = sub.add_parser("predict"); p.add_argument("--in"); p.add_argument("--model"); p.add_argument("--out")
    args = ap.parse_args()
    if args.cmd=="train": train(args.__dict__["in"], args.model)
    else: predict(args.__dict__["in"], args.model, args.out)

收益:把“手搓脚本”变“可复用命令”,利于自动化/调度。


35) 单元测试与数据不变量(长期可靠)

# tests/test_integrity.py
import pandas as pd

def test_city_domain():
    df = pd.read_parquet("data/clean.parquet")
    assert set(df["city"].unique()) <= {"Beijing","Shanghai","Shenzhen"}

def test_no_future_dates():
    df = pd.read_parquet("data/clean.parquet")
    assert (df["date"] <= pd.Timestamp.now(tz="UTC")).all()

原则:把“数据应当如何”的共识写成测试,CI 中跑,数据变坏第一时间报警。


36) “20 条一行流”速写(收藏夹)

# 1. 逆透视:列→行
pd.melt(df, id_vars=["id"], var_name="metric", value_name="value")

# 2. 合并近似时间(asof join)
pd.merge_asof(a.sort_values("t"), b.sort_values("t"), on="t", by="key", direction="nearest", tolerance=pd.Timedelta("5min"))

# 3. 去重保留最大
df.sort_values("sales").drop_duplicates(subset=["id"], keep="last")

# 4. 组内归一化
df["z"] = df.groupby("city")["sales"].transform(lambda s: (s - s.mean())/s.std())

# 5. 复杂条件赋值
df["grade"] = np.select([df.sales>=200, df.sales>=150], ["A","B"], default="C")

# 6. 高频字符串清理
df["sku"] = (df["sku"].str.strip().str.upper().str.replace(r"[^A-Z0-9\-]","", regex=True))

# 7. 读多表 Excel
xlsx = pd.ExcelFile("book.xlsx"); {name: xlsx.parse(name) for name in xlsx.sheet_names}

# 8. 多层索引聚合后展平列
agg.columns = ['_'.join(map(str, c)) for c in agg.columns.to_flat_index()]

# 9. 有序分类排序
df["grade"] = pd.Categorical(df["grade"], ["C","B","A"], ordered=True); df.sort_values("grade")

# 10. 滚动分位
df["q90_14d"] = df.set_index("date")["sales"].rolling("14D").quantile(0.9).reset_index(drop=True)

# 11. 组内 TopK 的其它列
idx = df.groupby("city")["sales"].nlargest(3).reset_index().set_index("level_1").index
df.loc[idx, :]

# 12. 采样可重复
df.sample(n=1000, random_state=42)

# 13. 多条件 merge 键
pd.merge(a, b, left_on=["k1","k2"], right_on=["k1","k2"], how="left")

# 14. 比例标准化到 1
w = df["weight"].clip(lower=0); w = w / w.sum()

# 15. 时间戳到周起始
df["week_start"] = (df["date"] - pd.to_timedelta(df["date"].dt.dayofweek, unit="D")).dt.normalize()

# 16. 多列字符串统一清洗
cols = ["name","city"]; df[cols] = df[cols].apply(lambda s: s.str.strip().str.lower())

# 17. 稀疏矩阵(高维 one-hot)
from sklearn.feature_extraction.text import CountVectorizer
X = CountVectorizer(min_df=5).fit_transform(texts)

# 18. 训练集/测试集时间分割
cut = "2024-09-01"; tr = df[df.date < cut]; te = df[df.date >= cut]

# 19. 合并报错定位
try:
    pd.merge(a,b,on="id",validate="one_to_one")
except Exception as e:
    print("关系不满足一对一:", e)

# 20. 内存测算
(df.memory_usage(deep=True).sum()/1024**2)

练习挑战(边做边长)

  1. 复刻“TopN + 分层时间聚合”:每城市每月销量 Top3 的用户,并画小倍数图。
  2. 做一版“滚动阈值异常检测”:对每城市做 28 天滚动 99 分位,超过即标记。
  3. 写“带校验的导入器”:读取外部 CSV→校验→清洗→落地 Parquet→产出日志(错误与统计)。
  4. 一个“时间穿越安全”的建模脚本:TimeSeriesSplit + Pipeline + GridSearchCV + 版本元数据。

添加新评论