Polars数据分析常见操作

Polars 是一个基于 Apache Arrow 的高性能 DataFrame 库,以 Rust 为核心Python 为接口,融合了 SQL 的声明式风格函数式编程的表达力,专为现代数据分析与 ETL 场景设计。

一、架构与核心哲学

核心组件

组件特性说明
DataFrame即时执行(Eager)类似 Pandas,适合交互式探索
LazyFrame延迟执行(Lazy)构建查询计划,自动优化后一次性执行
Expression Engine无副作用、可组合所有操作通过表达式构建,支持向量化

设计理念

“像写 SQL 一样声明,像写函数一样组合。”
(
    pl.scan_csv("data.csv")
    .filter(pl.col("amount") > 0)
    .group_by("region")
    .agg(pl.sum("amount"))
    .sort("amount", descending=True)
    .collect()
)

优势

  • 自动优化执行计划(如谓词下推、列裁剪)
  • 零拷贝 Arrow 内存模型
  • 多线程并行执行(Rust 原生支持)

二、延迟执行(LazyFrame)深度解析

1. 构建延迟管道

q = (
    pl.scan_parquet("sales.parquet")
    .filter(pl.col("year") == 2024)
    .group_by("category")
    .agg(pl.sum("revenue").alias("total_rev"))
)

2. 执行与调试

方法用途
.explain()打印优化后的查询计划
.collect()执行并返回 DataFrame
.collect(streaming=True)流式执行,适合超大数据集
.fetch(5)快速预览前 5 行(不执行全量)

3. 自动优化策略

优化类型说明
Filter Pushdownfilter 尽早下推至数据源(如 Parquet 谓词下推)
Projection Pruning仅加载最终需要的列
Predicate Folding合并多个布尔条件(如 (a > 1) & (a < 10)a.is_between(1, 10)
Join Optimization自动选择最优连接策略(Hash Join / Broadcast)
Streaming Execution边读边算,避免内存峰值

三、聚合与分组策略

1. 基础聚合

df.group_by("region").agg([
    pl.count(),
    pl.mean("sales").alias("avg_sales"),
    pl.max("sales")
])

2. 多键分组

df.group_by(["region", "year"]).agg(pl.sum("sales"))

3. 时间序列动态分组

df.group_by_dynamic("timestamp", every="1h").agg(pl.sum("value"))

4. 滚动与累积计算

df.select(pl.col("value").rolling_mean(window_size=7))
df.select(pl.col("value").cum_sum())

四、表达式魔法:内建函数族速览

类别常用函数
数学abs(), round(), log(), exp(), clip()
逻辑is_null(), is_in(), when().then().otherwise()
聚合sum(), mean(), count(), n_unique(), std()
时间dt.year(), dt.month(), dt.offset_by("1d")
字符串str.lengths(), str.replace_all(), str.to_datetime()
列操作alias(), cast(), shift(), rank()
列表列arr.first(), arr.explode(), arr.eval()
选择器pl.col("^regex$"), pl.exclude("col"), pl.all_horizontal()

五、窗口函数(Window / Over)

模拟 SQL 的 OVER 子句:

df.select([
    pl.col("region"),
    pl.col("sales"),
    pl.mean("sales").over("region").alias("avg_by_region"),
    pl.rank("sales").over("region").alias("rank_in_region")
])

✅ 支持:

  • 分区:.over("key")
  • 排序:.over(sort_by="date")
  • 累积函数:cum_sum(), cum_max(), cum_count()

六、UDF 使用建议

尽量避免 UDF!优先使用表达式。

安全用法(逐元素)

df.with_columns(
    pl.col("text").map_elements(lambda s: s[::-1]).alias("reversed")
)

危险用法(逐行,性能极差)

df.apply(lambda row: ...)  # ❌ 退化为 Python 循环,单线程
建议:仅用于调试或极小数据集。

七、高级 I/O 支持

格式读取写入
CSVpl.read_csv()df.write_csv()
Parquetpl.read_parquet()df.write_parquet()
Feather (IPC)pl.read_ipc()df.write_ipc()
JSONpl.read_json()df.write_json()
SQLpl.read_database(uri, query)
分区 Parquetpl.scan_parquet("data/year=*/month=*")自动识别分区

延迟读取pl.scan_*() + .collect() 是最佳实践。


八、数据类型与内存控制

Polars 完全兼容 Arrow 类型系统

类型示例
整数Int8, Int64
浮点Float32, Float64
字符串Utf8
日期时间Date, Datetime("ns", time_zone="UTC")
列表List(Int64)
结构体Struct([("x", Int32), ("y", Int32)])
分类Categorical(节省内存)

类型转换

df.with_columns(pl.col("col").cast(pl.Int32))

九、性能调优指南

策略说明
✅ 使用 LazyFrame启用查询优化
✅ 列式思维避免逐行操作
❌ 避免 Python UDF退化为单线程
✅ 启用 Streamingcollect(streaming=True)
✅ 列裁剪早期 select() 减少 I/O
✅ 分区读取利用 Hive 风格目录结构
🔍 查询剖析.explain(), .profile()

十、Polars vs Pandas vs PySpark

维度PolarsPandasPySpark
执行模型延迟 + 向量化即时分布式 DAG
语言Rust(核心)C/PythonScala/JVM
性能⚡ 极高(多线程)中等高(分布式)
内存模型Arrow 列式NumPy 行式JVM 序列化
延迟优化
适用场景本地大数据 / ETL / ML 预处理小数据探索超大规模分布式

十一、常见陷阱与高级技巧

问题解决方案
LazyFrame 未执行必须调用 .collect()
Join 键类型不一致显式 .cast() 对齐
嵌套 JSON 列pl.col("data").struct.field("x")
多列 explodedf.explode(["tags", "values"])
动态时间分组group_by_dynamic(..., closed="left")
可视化转 Pandas:df.to_pandas().plot()

十二、生态扩展

项目用途
connectorx高速数据库导入(PostgreSQL, MySQL 等)
DuckDB + Polars混合 SQL 查询
polars-lazyframe-sql用 SQL 编写 Lazy 管道
Polars-ML与 Scikit-learn 集成
Rust/Node.js 绑定嵌入后端服务

十三、性能对比(Polars vs Pandas vs Dask)

测试环境:i7-12700H, 32GB RAM, Ubuntu 22.04, 1000 万行日志数据
任务Polars (s)Pandas (s)Dask (s)加速比(vs Pandas)
读取 Parquet0.82.11.92.6x
筛选 + 新增列0.31.81.26.0x
分组聚合1.14.53.04.1x
Join(10M + 10K)1.58.25.55.5x
排序2.06.84.03.4x

结论

  • Polars 在所有任务中最快,尤其在表达式计算和 Join 上优势显著
  • Dask 适合 超内存数据,但小数据有调度开销
  • Pandas 适合 交互式小数据探索

✨ Polars:为速度而生,为简洁而写。

用更少的代码,跑更快的数据。

📉 可视化图表(文字描述,可转为 Matplotlib/Plotly)

性能对比(对数刻度,时间越低越好)

任务5: 排序
Pandas  ████████████████████ 6.8s
Dask    ████████████ 4.0s
Polars  ██████ 2.0s

任务3: 分组聚合
Pandas  ████████████████████████████████████ 4.5s
Dask    ████████████████████████ 3.0s
Polars  ████████████ 1.1s

任务2: 筛选+计算
Pandas  ████████████████████████████████████████████ 1.8s
Dask    ████████████████████████ 1.2s
Polars  ████ 0.3s

结论

  • Polars 在所有任务中最快,尤其在表达式计算和 join 上优势显著
  • Dask 适合 超大内存数据(> RAM),但小数据有调度开销
  • Pandas 适合 交互式小数据探索,但性能瓶颈明显

附录一 速查手册

1、基础 & 环境

项目命令 / 用法
安装 Polars(Python)pip install polarspip install polars[all] (Gist)
导入import polars as pl
创建 DataFramepl.DataFrame({"a": [1,2,3], "b": ["x","y","z"]}) (Gist)
从 Pandas 转换pl.from_pandas(df_pandas)
读取 / 写入文件pl.read_csv(...), pl.read_parquet(...), df.write_csv(...), df.write_parquet(...) (docs.pola.rs)
惰性 / 延迟执行使用 pl.scan_csv(...)df.lazy() 构造 LazyFrame,最后 .collect() 执行 (docs.pola.rs)
视图 / 计划输出lazy.explain() / .profile()(如果版本支持) (frosty-8.github.io)

2、核心表达式 & 列操作

Polars 的强大在于表达式(Expressions):你构建表达式,系统会优化执行。

操作示例 / 用法
访问列pl.col("a")
算术运算pl.col("x") + pl.col("y"), pl.col("x") * 2
别名 / 重命名… .alias("new_name")
多列操作(表达式扩展)pl.col("a", "b") * 0.5 → 同时对 a 和 b 应用乘法 (docs.pola.rs)
条件表达式pl.when(pl.col("x") > 5).then("high").otherwise("low") (frosty-8.github.io)
map / UDFpl.col("x").map_elements(lambda v: …)(适用于当内建表达式不够时) (frosty-8.github.io)

3、筛选 / 过滤 / 子集

操作示例
行过滤df.filter(pl.col("a") > 10)
多条件过滤df.filter((pl.col("a") > 10) & (pl.col("b") < 5))
在集合中pl.col("a").is_in([1, 2, 3])
区间过滤pl.col("a").is_between(5, 15)
处理空值 / 缺失pl.col("a").is_null()`df.drop_nulls()`df.fill_null(value=…) (frosty-8.github.io)
去重 / 唯一df.unique()pl.col("a").unique()

4、排序 / 排列 / 采样

操作示例
排序df.sort("col", reverse=True)df.sort(by=["a","b"], reverse=[True, False])
抽样 / 采样df.sample(n=100), df.sample(frac=0.2) (frosty-8.github.io)
取前几 / 后几行df.head(n), df.tail(n)
多列排序df.sort(["a", "b"], reverse=[False, True])

5、聚合 / 分组 / 统计

操作示例
聚合df.select(pl.sum("a"), pl.mean("b"))
group_by + aggdf.groupby("g").agg([pl.sum("a"), pl.mean("b").alias("b_avg")])
多重聚合groupby(...).agg([ … ])
计数 / 唯一数pl.count("col"), pl.n_unique("col")
描述统计df.describe()
窗口 / overpl.col("x").cum_sum().over("group") 或使用 .over(...) 操作符
滚动 / 移动窗口pl.col("x").rolling_mean(window_size=3)

6、连接 / 合并 / 拼接 / 重塑

操作示例
内连接 / 左 / 右 / 全df.join(df2, on="key", how="inner") / how="left" / how="outer" / how="anti" 等 (Gist)
垂直拼接pl.concat([df1, df2], how="vertical")
水平拼接pl.concat([df1, df2], how="horizontal")
对角拼接(补 Null)how="diagonal"
melt / pivotdf.melt(id_vars=…, value_vars=…) / df.pivot(values=..., index=..., columns=...)
explode 列(列表列)pl.col("list_col").explode()

7、日期 / 时间 / 时间序列

操作示例
解析字符串成时间pl.col("date").str.strptime(pl.Date, fmt="%Y-%m-%d")
时间组件提取pl.col("date").dt.year(), .dt.month(), .dt.weekday()
时间加减pl.col("date") + pl.duration(days=1)
shift / lag / leadpl.col("x").shift(1).lag() / .lead()
差分 / 差异pl.col("x") - pl.col("x").shift(1)
动态分组 / 重采样df.group_by_dynamic("timestamp", every="1h").agg(pl.sum("value"))

8、字符串 / 文本操作

操作示例
转换大小写pl.col("txt").str.to_lowercase() / .str.to_uppercase()
包含 / 匹配.str.contains("pattern"), .str.starts_with(...), .str.ends_with(...)
替换.str.replace("old", "new")
长度 / 切割.str.lengths(), .str.slice(...)
拼接 / 连接pl.concat_str(["col1", "col2"], separator="-")

9、空值 / 缺失 / NaN 处理

操作示例
判断 Nullpl.col("x").is_null()
判断 NaNpl.col("x").is_nan()
填充值pl.col("x").fill_null(0) 或策略填充如 .fill_null(strategy="forward")
删除包含 Null 的行df.drop_nulls()df.drop_nulls(["a","b"])

10、性能 / 优化建议

建议说明
尽可能使用 Lazy / scan + collect 模式构建查询计划并统一优化执行 (docs.pola.rs)
投影下推(projection pruning)在早期阶段只选择需要的列
谓词下推(filter pushdown)早期过滤可以减少数据量传输
避免使用 Python 循环 / map_elements 过多优先用内建表达式
使用 .explain() / .profile() 检查执行计划与性能
对于大数据量启用 streaming 模式(若支持)在 collect 时开启或使用流式执行

附录二

1. 基础操作

import polars as pl
df = pl.DataFrame({"a": [1,2,3], "b": ["x","y","z"]})
lazy = pl.scan_parquet("*.parquet")

2. 表达式核心

pl.col("x") + 1
pl.when(pl.col("score") > 90).then("A").otherwise("B")
pl.col("^feat_.*$")  # 正则选列

3. 连接与重塑

df1.join(df2, on="id", how="left")
pl.concat([df1, df2], how="vertical")
df.melt(id_vars="id", value_vars=["v1", "v2"])

4. 时间处理

pl.col("ts").dt.hour()
pl.col("ts") + pl.duration(days=7)
df.group_by_dynamic("ts", every="1d").agg(pl.sum("value"))

5. 性能建议

  • 优先使用 scan_*() + collect()
  • .select() 早裁剪列
  • .explain() 查看优化计划
  • 大数据启用 streaming=True

添加新评论