## 📦 Example repository layout
```text
dbt-ge-datahub-duckdb-demo/
├─ README.md
├─ requirements.txt
├─ .env.example
├─ Makefile
├─ scripts/
│  ├─ up.sh
│  ├─ down.sh
│  ├─ seed_duckdb.py
│  └─ run_governance.sh
├─ datahub/
│  └─ datahub_dbt_recipe.yml
├─ dbt_project/
│  ├─ dbt_project.yml
│  ├─ models/
│  │  ├─ stg_sales.sql
│  │  └─ schema.yml
│  └─ .dbt/
│     └─ profiles.yml
└─ ge/                      # Great Expectations (minimal working config)
   ├─ great_expectations.yml
   ├─ checkpoints/
   │  └─ stg_sales_checkpoint.yml
   ├─ expectations/
   │  └─ stg_sales_suite.json
   └─ plugins/
```
## 📝 README.md (abridged)
# dbt + GE + DataHub + DuckDB Demo

This repository demonstrates:
- DuckDB as a local warehouse
- dbt for models and lineage (manifest.json / catalog.json)
- Great Expectations for data-quality checks (including an intentionally failing case)
- DataHub for browsing metadata, lineage, and quality results

## Prerequisites
- Linux / macOS (on Windows, use WSL2)
- Docker & Docker Compose
- Python 3.8+
- Ports: DataHub UI defaults to http://localhost:9002, GMS to http://localhost:8080

## Quick start
```bash
python3 -m venv .venv && source .venv/bin/activate
pip install -r requirements.txt
cp .env.example .env
make up        # start DataHub (official quickstart) and seed the DuckDB sample data
make pipeline  # run dbt + GE, then push dbt metadata to DataHub
# Open http://localhost:9002, search for stg_sales, and check Schema / Lineage / Quality
```

## Shutdown and cleanup
```bash
make down
```

If you already run your own DataHub or use different ports, just edit `DATAHUB_GMS` / `DATAHUB_UI` in `.env`.
---
## 📄 requirements.txt
```txt
acryl-datahub==0.13.3
duckdb==1.0.0
dbt-duckdb==1.8.2
great-expectations==0.18.12
duckdb-engine==0.13.0
```

The pinned versions are examples only; adjust them if you already have these dependencies installed locally.
## ⚙️ .env.example
```bash
# DataHub endpoints (exposed by the official quickstart)
DATAHUB_GMS=http://localhost:8080
DATAHUB_UI=http://localhost:9002

# DuckDB file (created in the repo root by the seed script)
DUCKDB_PATH=./warehouse.duckdb

# dbt profiles directory (a .dbt directory is bundled in this repo)
DBT_PROFILES_DIR=./dbt_project/.dbt
```
## 🛠️ Makefile
```makefile
include .env
export

.PHONY: up down seed dbt-run dbt-test ge-run ingest pipeline

up:
	@bash scripts/up.sh

down:
	@bash scripts/down.sh

seed:
	@python3 scripts/seed_duckdb.py

dbt-run:
	@cd dbt_project && dbt run

dbt-test:
	@cd dbt_project && dbt test || true

ge-run:
	@great_expectations checkpoint run -y -d ge stg_sales_checkpoint || true

ingest:
	@cd dbt_project && dbt docs generate
	@datahub ingest -c datahub/datahub_dbt_recipe.yml

pipeline: seed dbt-run dbt-test ge-run ingest
	@echo "✅ Governance pipeline completed."
```

`dbt test` and the GE checkpoint are deliberately allowed to fail (the targets still return 0), so the pipeline keeps running and the failed quality results are still reported to DataHub.
## 🚀 scripts/up.sh (start DataHub and seed the data in one step)
```bash
#!/usr/bin/env bash
set -euo pipefail

# 1) Start DataHub (official quickstart)
if ! command -v datahub >/dev/null 2>&1; then
  echo "Installing acryl-datahub CLI..."
  pip install acryl-datahub
fi

echo "Starting DataHub quickstart (this may take several minutes on first run)..."
datahub docker quickstart --no-populate-sample-data

# 2) Seed DuckDB with sample data
echo "Seeding DuckDB sample data..."
python3 scripts/seed_duckdb.py

echo "✅ DataHub UI: ${DATAHUB_UI:-http://localhost:9002}"
```

Using `datahub docker quickstart` saves you from hand-writing a long list of Compose services and offers the best compatibility.
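If later steps race ahead of GMS on a slow machine, a small wait loop can be appended to `up.sh`. This is a sketch, not part of the repo; it mirrors the health check already used in the CI workflow further down and assumes `DATAHUB_GMS` is exported (e.g. via `make`):

```bash
# Optional: block until GMS answers /health (up to ~5 minutes)
for i in {1..60}; do
  curl -sSf "${DATAHUB_GMS:-http://localhost:8080}/health" >/dev/null && break || sleep 5
done
```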
## 🧹 scripts/down.sh
```bash
#!/usr/bin/env bash
set -euo pipefail

echo "Stopping DataHub..."
datahub docker nuke --confirm
rm -f "${DUCKDB_PATH:-./warehouse.duckdb}" || true
echo "✅ Down complete."
```
## 🌱 scripts/seed_duckdb.py
```python
import os
import duckdb

db_path = os.getenv("DUCKDB_PATH", "./warehouse.duckdb")
con = duckdb.connect(db_path)

# Recreate the demo table idempotently. The NULL amount on row 2 is intentional,
# so the not_null checks in dbt and GE fail later in the pipeline.
con.execute("""
CREATE TABLE IF NOT EXISTS sales (
    id INTEGER,
    product VARCHAR,
    amount DECIMAL(10,2),
    sale_date DATE
);
DELETE FROM sales;
INSERT INTO sales VALUES
    (1, 'Laptop',   1200.00, DATE '2025-01-10'),
    (2, 'Mouse',    NULL,    DATE '2025-01-11'),
    (3, 'Keyboard', 80.00,   DATE '2025-01-12');
""")
con.close()
print(f"Seeded DuckDB at {db_path}")
```
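To confirm the seed worked before running dbt, a quick one-liner (a sketch, not part of the repo scripts) can read the file back:

```bash
# Expect three rows, one of them with a NULL amount
python3 -c "import duckdb; print(duckdb.connect('./warehouse.duckdb').sql('select * from sales').fetchall())"
```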
## ▶️ scripts/run_governance.sh (run the full chain locally)
```bash
#!/usr/bin/env bash
set -euo pipefail

echo "🔄 dbt run..."
make dbt-run

echo "🧪 dbt test (expect failure on amount not_null)..."
make dbt-test

echo "📊 Great Expectations checkpoint..."
make ge-run

echo "📤 Ingest dbt metadata to DataHub..."
make ingest

echo "✅ Governance pipeline completed."
```
## 🗂️ dbt_project/dbt_project.yml
```yaml
name: 'my_governance_demo'
version: '1.0'
config-version: 2
profile: 'my_governance_demo'
model-paths: ["models"]
target-path: "target"
clean-targets: ["target", "dbt_packages"]
```
### dbt_project/models/stg_sales.sql
```sql
select
    id,
    product,
    amount,
    sale_date
from sales
```
### dbt_project/models/schema.yml
```yaml
version: 2

models:
  - name: stg_sales
    columns:
      - name: id
        tests:
          - unique
          - not_null
      - name: amount
        tests:
          - not_null  # intentionally fails: one row has a NULL amount
```
### dbt_project/.dbt/profiles.yml (bundled in the repo, convenient for CI)
```yaml
my_governance_demo:
  target: dev
  outputs:
    dev:
      type: duckdb
      path: "{{ env_var('DUCKDB_PATH') }}"
      extensions: ['httpfs', 'json']
```

With `DBT_PROFILES_DIR=./dbt_project/.dbt` set, dbt reads this profiles file instead of the default one in `~/.dbt`.
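If you prefer not to rely on the environment variable, the same effect can be achieved by passing the directories explicitly on each invocation. A minimal sketch, run from the repository root:

```bash
# Equivalent to exporting DBT_PROFILES_DIR
dbt run  --project-dir dbt_project --profiles-dir dbt_project/.dbt
dbt test --project-dir dbt_project --profiles-dir dbt_project/.dbt
```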
## 🧩 ge/great_expectations.yml (minimal working config + DataHub action)
```yaml
config_version: 3
plugins_directory: plugins/

expectations_store_name: expectations_store
validations_store_name: validations_store
evaluation_parameter_store_name: evaluation_parameter_store

stores:
  expectations_store:
    class_name: ExpectationsStore
  validations_store:
    class_name: ValidationsStore
  checkpoint_store:
    class_name: CheckpointStore

data_docs_sites: {}  # kept empty for simplicity; enable if you want HTML reports
anonymous_usage_statistics:
  enabled: false

# Report validation results to DataHub
validation_operators:
  action_list_operator:
    class_name: ActionListValidationOperator
    action_list:
      - name: datahub
        action:
          class_name: DataHubValidationAction
          config:
            datahub_api:
              server: ${DATAHUB_GMS}
              timeout_sec: 30
```
### ge/checkpoints/stg_sales_checkpoint.yml
```yaml
name: stg_sales_checkpoint
config_version: 1
class_name: Checkpoint

validations:
  - batch_request:
      datasource_name: duckdb_source
      data_connector_name: runtime_data_connector
      data_asset_name: stg_sales
      runtime_parameters:
        query: "select * from stg_sales"
      batch_identifiers:
        default_identifier_name: default
    expectation_suite_name: stg_sales_suite

action_list:
  - name: datahub
    action:
      class_name: DataHubValidationAction
      config:
        datahub_api:
          server: ${DATAHUB_GMS}
```
### ge/expectations/stg_sales_suite.json (simple not_null / unique checks mirroring the dbt tests)
```json
{
  "expectation_suite_name": "stg_sales_suite",
  "expectations": [
    {"expectation_type": "expect_column_values_to_not_be_null", "kwargs": {"column": "id"}},
    {"expectation_type": "expect_column_values_to_be_unique", "kwargs": {"column": "id"}},
    {"expectation_type": "expect_column_values_to_not_be_null", "kwargs": {"column": "amount"}}
  ],
  "meta": {}
}
```

To avoid registering a full data source, the checkpoint queries `stg_sales` directly through a runtime batch request. For a more standard setup, add a `datasources` block that uses `duckdb-engine` and points at `DUCKDB_PATH`.
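A minimal sketch of such a `datasources` block in `ge/great_expectations.yml`, assuming GE's block-style (V3) configuration and the `duckdb-engine` SQLAlchemy dialect; the connection string hard-codes the default `DUCKDB_PATH`, and the names match the checkpoint above:

```yaml
datasources:
  duckdb_source:
    class_name: Datasource
    execution_engine:
      class_name: SqlAlchemyExecutionEngine
      connection_string: duckdb:///./warehouse.duckdb   # default DUCKDB_PATH
    data_connectors:
      runtime_data_connector:
        class_name: RuntimeDataConnector
        batch_identifiers:
          - default_identifier_name
```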
## 🔗 datahub/datahub_dbt_recipe.yml (push dbt metadata to DataHub)
```yaml
source:
  type: dbt
  config:
    manifest_path: dbt_project/target/manifest.json
    catalog_path: dbt_project/target/catalog.json
    sources_path: dbt_project/target/sources.json
    target_platform: duckdb
    write_semantic_models: false

sink:
  type: datahub-rest
  config:
    server: ${DATAHUB_GMS}
```
## 🧪 GitHub Actions (.github/workflows/governance.yml)
```yaml
name: governance-pipeline

on:
  push:
    branches: [ main ]
  pull_request:

jobs:
  run-pipeline:
    runs-on: ubuntu-latest
    env:
      DATAHUB_GMS: ${{ secrets.DATAHUB_GMS || 'http://localhost:8080' }}
      DATAHUB_UI: ${{ secrets.DATAHUB_UI || 'http://localhost:9002' }}
      DUCKDB_PATH: ./warehouse.duckdb
      DBT_PROFILES_DIR: ./dbt_project/.dbt
    steps:
      - uses: actions/checkout@v4

      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.11'

      - name: Install deps
        run: |
          python -m pip install -U pip
          pip install -r requirements.txt

      - name: Optionally start DataHub via CLI (self-hosted only)
        if: ${{ env.DATAHUB_GMS == 'http://localhost:8080' }}
        run: |
          datahub docker quickstart --no-populate-sample-data
          # Wait until GMS is healthy
          for i in {1..60}; do
            curl -sSf "${DATAHUB_GMS}/health" && break || sleep 5
          done

      - name: Seed DuckDB
        run: python scripts/seed_duckdb.py

      - name: dbt run + test
        working-directory: ./dbt_project
        run: |
          dbt run
          dbt test || true

      - name: GE checkpoint (send result to DataHub)
        run: |
          great_expectations checkpoint run -y -d ge stg_sales_checkpoint || true

      - name: Ingest dbt metadata to DataHub
        run: |
          (cd dbt_project && dbt docs generate)
          datahub ingest -c datahub/datahub_dbt_recipe.yml

      - name: Print UI URL
        run: echo "DataHub UI => ${DATAHUB_UI}"

      - name: Tear down local DataHub (if started here)
        if: ${{ env.DATAHUB_GMS == 'http://localhost:8080' }}
        run: datahub docker nuke --confirm || true
```
Notes:
1) If your CI pushes to a publicly reachable DataHub (recommended), put `DATAHUB_GMS` / `DATAHUB_UI` in the repository Secrets and the workflow pushes directly.
2) Without those Secrets, the workflow temporarily spins up a local DataHub inside CI and tears it down when the run finishes.
3) `dbt test` and GE are allowed to fail so that the failed quality results are still synced to DataHub for the demo.
## ✅ What you get
In the DataHub UI (search `stg_sales` under `/datasets`) you will see:
- column-level schema
- upstream lineage: `sales → stg_sales`
- a quality tab showing the assertions reported by GE (`amount not_null` appears as failed)

Both the local run and CI work end to end; you can also swap DuckDB for Postgres/Snowflake by adjusting only the dbt profile and the GE data source (a sketch follows below).
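For example, a Postgres-flavoured profile might look like the sketch below; the host, database, and credential variable names are placeholders, not part of this repo, and `dbt-postgres` would need to be added to `requirements.txt`:

```yaml
# dbt_project/.dbt/profiles.yml -- hypothetical Postgres variant
my_governance_demo:
  target: dev
  outputs:
    dev:
      type: postgres
      host: "{{ env_var('PG_HOST', 'localhost') }}"
      port: 5432
      user: "{{ env_var('PG_USER') }}"
      password: "{{ env_var('PG_PASSWORD') }}"
      dbname: analytics
      schema: public
      threads: 4
```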
## 🔧 Tips (pitfalls to avoid)
- DataHub ports: the UI defaults to 9002 and GMS to 8080; if they clash, change `.env` and pass the values into the scripts or the CI environment.
- `dbt docs generate` must succeed: if `target/manifest.json` is missing, lineage will not show up (see the guard sketch below).
- GE and dbt tests overlap: keep dbt for basic constraints (primary-key uniqueness / not-null) and use GE for richer checks such as distributions, ranges, and thresholds.
- Windows: use WSL2 to avoid path and Docker compatibility issues.
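As a cheap safeguard, the ingest step can refuse to run when the dbt artifacts are missing. A sketch, not part of the repo scripts:

```bash
# Hypothetical guard before calling datahub ingest (run from the repo root)
if [[ ! -f dbt_project/target/manifest.json || ! -f dbt_project/target/catalog.json ]]; then
  echo "dbt artifacts missing - run 'dbt docs generate' first" >&2
  exit 1
fi
datahub ingest -c datahub/datahub_dbt_recipe.yml
```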
## Appendix: Airflow multi-task DAG & Dagster asset version

Goal: split `scripts/run_governance.sh` into an observable, retryable orchestration flow. Two complete implementations are provided, an Airflow multi-task version and a Dagster software-defined-assets (SDA) version, both scheduled daily at 02:15 in the America/Chicago timezone. Both assume the repository structure and scripts introduced above.

### 🎛 Shared assumptions
- The repository root (`REPO_ROOT`) contains `scripts/run_governance.sh`, `dbt_project/`, `ge/`, `datahub/datahub_dbt_recipe.yml`, and `warehouse.duckdb` (or the latter is created by `scripts/seed_duckdb.py`).
- Environment variables (read by both variants):
  - `DATAHUB_GMS` (default `http://localhost:8080`)
  - `DATAHUB_UI` (default `http://localhost:9002`)
  - `DUCKDB_PATH` (default `${REPO_ROOT}/warehouse.duckdb`)
  - `DBT_PROFILES_DIR` (default `${REPO_ROOT}/dbt_project/.dbt`)
### Option A: Airflow multi-task version (dbt run → dbt test → GE → ingest)

1) DAG file: `airflow/dags/governance_pipeline_dag.py`
```python
# airflow/dags/governance_pipeline_dag.py
import os
import pendulum
from datetime import timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator

# Schedule: daily at 02:15 America/Chicago
TZ = pendulum.timezone("America/Chicago")

# Read from Airflow Variables, falling back to defaults.
# Override in the UI under Admin -> Variables.
try:
    from airflow.models import Variable
    V = lambda k, d: Variable.get(k, default_var=d)
except Exception:  # e.g. the scheduler is not initialised yet
    V = lambda k, d: d

REPO_ROOT = V("repo_root", "/opt/airflow/repo")
DATAHUB_GMS = V("datahub_gms", "http://host.docker.internal:8080")
DATAHUB_UI = V("datahub_ui", "http://host.docker.internal:9002")
DUCKDB_PATH = V("duckdb_path", f"{REPO_ROOT}/warehouse.duckdb")
DBT_PROFILES_DIR = V("dbt_profiles_dir", f"{REPO_ROOT}/dbt_project/.dbt")

DEFAULT_ENV = {
    "DATAHUB_GMS": DATAHUB_GMS,
    "DATAHUB_UI": DATAHUB_UI,
    "DUCKDB_PATH": DUCKDB_PATH,
    "DBT_PROFILES_DIR": DBT_PROFILES_DIR,
    "PYTHONUNBUFFERED": "1",
}


def _bash(task_id: str, cmd: str, retries=1, minutes=5, **kw) -> BashOperator:
    # Every command runs from the repository root so relative paths resolve.
    return BashOperator(
        task_id=task_id,
        bash_command=f"bash -lc 'cd {REPO_ROOT} && {cmd}'",
        env=DEFAULT_ENV,
        retries=retries,
        retry_delay=timedelta(minutes=minutes),
        **kw,
    )


default_args = {
    "owner": "data-governance",
    "depends_on_past": False,
    "email_on_failure": False,
    "email_on_retry": False,
}

with DAG(
    dag_id="governance_pipeline_multi",
    description="dbt + GE + DataHub (multi-task with retries)",
    start_date=pendulum.datetime(2025, 1, 1, tz=TZ),
    schedule="15 2 * * *",
    catchup=False,
    default_args=default_args,
    max_active_runs=1,
    tags=["governance", "dbt", "datahub", "ge", "duckdb"],
) as dag:
    # Optional: seed DuckDB (idempotent)
    seed = _bash(
        task_id="seed_duckdb",
        cmd="python3 scripts/seed_duckdb.py",
        retries=1,
        minutes=2,
    )

    # 1) dbt run -- the critical stage: retry on failure
    dbt_run = _bash(
        task_id="dbt_run",
        cmd="cd dbt_project && dbt run",
        retries=2,
        minutes=3,
    )

    # 2) dbt test -- business assertions may fail without stopping the orchestration.
    #    `|| true` keeps the task green so the downstream GE / ingest tasks still run.
    dbt_test = _bash(
        task_id="dbt_test",
        cmd="cd dbt_project && dbt test || true",
        retries=0,
    )

    # 3) Great Expectations -- pushes quality results to DataHub (the action is configured in GE)
    ge_checkpoint = _bash(
        task_id="ge_checkpoint",
        cmd="great_expectations checkpoint run -y -d ge stg_sales_checkpoint || true",
        retries=1,
        minutes=2,
    )

    # 4) Ingest dbt metadata and lineage -- more sensitive to network hiccups,
    #    so give it more retries with exponential backoff.
    ingest = BashOperator(
        task_id="datahub_ingest",
        bash_command=(
            f"bash -lc 'cd {REPO_ROOT} && "
            "(cd dbt_project && dbt docs generate) && "
            "datahub ingest -c datahub/datahub_dbt_recipe.yml'"
        ),
        env=DEFAULT_ENV,
        retries=3,
        retry_exponential_backoff=True,
        retry_delay=timedelta(minutes=2),
        max_retry_delay=timedelta(minutes=15),
    )

    seed >> dbt_run
    dbt_run >> [dbt_test, ge_checkpoint]
    [dbt_test, ge_checkpoint] >> ingest
```
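Before wiring the DAG into the scheduler, it can be exercised once from the CLI. A sketch, assuming the container defined in the next step with the repo mounted at `/opt/airflow/repo`:

```bash
# Check the DAG parses, then run it once end to end without waiting for the schedule
docker exec airflow-governance airflow dags list | grep governance_pipeline_multi
docker exec airflow-governance airflow dags test governance_pipeline_multi 2025-01-01
```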
2) Minimal runtime container: `docker-compose.airflow.yml`

```yaml
version: "3.8"
services:
  airflow:
    image: apache/airflow:2.9.2
    container_name: airflow-governance
    ports:
      - "8088:8080"   # Airflow UI -> http://localhost:8088
    environment:
      AIRFLOW__CORE__LOAD_EXAMPLES: "False"
      AIRFLOW__WEBSERVER__DEFAULT_UI_TIMEZONE: "America/Chicago"
      _AIRFLOW_WWW_USER_USERNAME: admin
      _AIRFLOW_WWW_USER_PASSWORD: admin
    volumes:
      - ./airflow/dags:/opt/airflow/dags
      - ./:/opt/airflow/repo
    command: >
      bash -lc "
        pip install -r /opt/airflow/repo/requirements.txt &&
        airflow db migrate &&
        airflow users create --role Admin --username admin --password admin --firstname A --lastname F --email admin@example.com || true &&
        airflow scheduler & exec airflow webserver
      "
```

Start it with `docker compose -f docker-compose.airflow.yml up -d`, then open the UI and enable `governance_pipeline_multi`.
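The DAG reads `repo_root`, `datahub_gms`, and friends from Airflow Variables. A quick way to override the defaults, a sketch using the container name above:

```bash
# Optional: override the DAG defaults via Airflow Variables
docker exec airflow-governance airflow variables set repo_root /opt/airflow/repo
docker exec airflow-governance airflow variables set datahub_gms http://host.docker.internal:8080
docker exec airflow-governance airflow variables set datahub_ui http://host.docker.internal:9002
```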
### Option B: Dagster asset version (Software-Defined Assets)

The pipeline is expressed as an asset graph: `seed_duckdb` → `dbt_assets` → (`dbt_tests`, `ge_quality`) → `datahub_ingest`. The dbt assets are loaded natively via `dagster-dbt`, while quality checks and metadata ingestion shell out to the CLIs (simple and transparent).

1) Extra dependencies

Append to `requirements.txt`:
```txt
dagster==1.8.1
dagster-webserver==1.8.1
dagster-dbt==0.23.1
```
These versions are compatible with `dbt-duckdb==1.8.x`; adjust them as needed if you have pinned a specific Python version.
2) Assets and schedule: `orchestration/dagster/assets_defs.py`

```python
# orchestration/dagster/assets_defs.py
import os
import subprocess
from pathlib import Path

from dagster import (
    asset, AssetExecutionContext, Definitions,
    define_asset_job, ScheduleDefinition, AssetSelection
)
from dagster_dbt import DbtCliResource, load_assets_from_dbt_project

REPO_ROOT = Path(os.getenv("REPO_ROOT", Path(__file__).resolve().parents[2]))
DBT_PROJECT_DIR = REPO_ROOT / "dbt_project"
DBT_PROFILES_DIR = Path(os.getenv("DBT_PROFILES_DIR", REPO_ROOT / "dbt_project" / ".dbt"))
DATAHUB_GMS = os.getenv("DATAHUB_GMS", "http://localhost:8080")
DATAHUB_UI = os.getenv("DATAHUB_UI", "http://localhost:9002")
DUCKDB_PATH = Path(os.getenv("DUCKDB_PATH", REPO_ROOT / "warehouse.duckdb"))

ENV = {
    "DATAHUB_GMS": DATAHUB_GMS,
    "DATAHUB_UI": DATAHUB_UI,
    "DUCKDB_PATH": str(DUCKDB_PATH),
    "DBT_PROFILES_DIR": str(DBT_PROFILES_DIR),
    "PYTHONUNBUFFERED": "1",
    **os.environ,
}


# 0) Seed the base table (idempotent)
@asset(description="Seed DuckDB with demo data")
def seed_duckdb(context: AssetExecutionContext):
    script = REPO_ROOT / "scripts" / "seed_duckdb.py"
    script.chmod(script.stat().st_mode | 0o111)
    subprocess.run(["python3", str(script)], env=ENV, check=True)
    context.log.info(f"Seeded {DUCKDB_PATH}")


# 1) Load the dbt assets (models, snapshots, seeds, ...)
dbt_assets = load_assets_from_dbt_project(
    project_dir=str(DBT_PROJECT_DIR),
    profiles_dir=str(DBT_PROFILES_DIR),
    # You can also narrow the selection to specific models, e.g. 'stg_sales'
)


# 2) dbt test (assertion failures are allowed and do not block downstream steps)
@asset(deps=[dbt_assets], description="Run dbt tests; continue on failure")
def dbt_tests(context: AssetExecutionContext):
    cmd = [
        "dbt", "test",
        "--project-dir", str(DBT_PROJECT_DIR),
        "--profiles-dir", str(DBT_PROFILES_DIR),
    ]
    result = subprocess.run(cmd, env=ENV, capture_output=True, text=True)
    context.log.info(result.stdout)
    if result.returncode != 0:
        context.log.warning("dbt tests reported failures (expected in demo); continuing")


# 3) Great Expectations validation, reported to DataHub (the GE config contains the DataHub action)
@asset(deps=[dbt_assets], description="Run GE checkpoint and emit to DataHub")
def ge_quality(context: AssetExecutionContext):
    cmd = [
        "great_expectations", "checkpoint", "run", "-y", "-d", "ge", "stg_sales_checkpoint",
    ]
    result = subprocess.run(cmd, env=ENV, capture_output=True, text=True)
    context.log.info(result.stdout)
    if result.returncode != 0:
        context.log.warning("GE checkpoint returned non-zero (allowed in demo)")


# 4) Push dbt metadata and lineage
@asset(deps=[dbt_assets, ge_quality, dbt_tests], description="Ingest dbt artifacts to DataHub")
def datahub_ingest(context: AssetExecutionContext):
    # Generate the artifacts first
    subprocess.run(["dbt", "docs", "generate"], cwd=str(DBT_PROJECT_DIR), env=ENV, check=True)
    # Then ingest them
    recipe = REPO_ROOT / "datahub" / "datahub_dbt_recipe.yml"
    subprocess.run(["datahub", "ingest", "-c", str(recipe)], env=ENV, check=True)
    context.log.info(f"Ingested to {DATAHUB_GMS}; UI: {DATAHUB_UI}")


# Job and schedule definitions
job = define_asset_job(
    name="governance_job",
    selection=(AssetSelection.assets(seed_duckdb) | AssetSelection.keys(*[a.key for a in dbt_assets]))
    .downstream()  # includes dbt_tests / ge_quality / datahub_ingest
)

schedule = ScheduleDefinition(
    job=job, cron_schedule="15 2 * * *", execution_timezone="America/Chicago"
)

defs = Definitions(
    assets=[seed_duckdb, dbt_tests, ge_quality, datahub_ingest, *dbt_assets],
    schedules=[schedule],
    resources={
        # If you later want DbtCliResource to drive build/test, use it inside an op/asset
        "dbt": DbtCliResource(project_dir=str(DBT_PROJECT_DIR), profiles_dir=str(DBT_PROFILES_DIR)),
    },
)
```
3) Launch Dagster (local mode)

```bash
# 1) Prepare the run directory
export DAGSTER_HOME=$(pwd)/.dagster_home
mkdir -p "$DAGSTER_HOME"

# 2) Minimal instance config (local sqlite + daemon-based scheduling)
cat > "$DAGSTER_HOME/dagster.yaml" <<'YAML'
scheduler:
  module: dagster._core.scheduler
  class: DagsterDaemonScheduler
run_coordinator:
  module: dagster._core.run_coordinator
  class: DefaultRunCoordinator
YAML

# 3) Start the daemon (responsible for firing the schedule)
dagster-daemon run

# 4) Start the web UI (in another terminal)
dagster-webserver -m orchestration.dagster.assets_defs -h 0.0.0.0 -p 3000
```

Open http://localhost:3000, enable the `governance_job` schedule (`15 2 * * *`), or trigger one run manually from the UI to verify the chain.
### 🔎 Comparison and practical advice
- Observability: Airflow gives the most direct view of task-level logs, retries, and the dependency graph; Dagster treats assets as first-class citizens, which fits a governance theme well.
- Failure policy: in the demo, `dbt test` / GE do not block downstream steps (so results still land in DataHub); for strict gating, drop the `|| true` or use `check=True` in the Dagster subprocess calls (see the stricter variant sketched below).
- Extensions:
  - Airflow: split into more tasks, e.g. layered retries with `dbt build --select tag:...`.
  - Dagster: move further into Software-Defined Assets with freshness policies, auto-materialization, and `dbt build/test` event streams.
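A minimal sketch of the stricter Dagster variant; the names (`dbt_assets`, `DBT_PROJECT_DIR`, `DBT_PROFILES_DIR`, `ENV`) come from `assets_defs.py` above, and the asset name is hypothetical:

```python
# Strict variant of the dbt_tests asset: any failing test fails the run.
import subprocess
from dagster import asset, AssetExecutionContext

@asset(deps=[dbt_assets], description="Run dbt tests; fail the asset on any test failure")
def dbt_tests_strict(context: AssetExecutionContext):
    subprocess.run(
        ["dbt", "test",
         "--project-dir", str(DBT_PROJECT_DIR),
         "--profiles-dir", str(DBT_PROFILES_DIR)],
        env=ENV,
        check=True,  # raises CalledProcessError, so Dagster marks the asset as failed
    )
```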