Local Data Governance Pipeline (dbt + Great Expectations + DataHub + DuckDB)

📦 Example repository layout

dbt-ge-datahub-duckdb-demo/
├─ README.md
├─ requirements.txt
├─ .env.example
├─ Makefile
├─ scripts/
│  ├─ up.sh
│  ├─ down.sh
│  ├─ seed_duckdb.py
│  └─ run_governance.sh
├─ datahub/
│  └─ datahub_dbt_recipe.yml
├─ dbt_project/
│  ├─ dbt_project.yml
│  ├─ models/
│  │  ├─ stg_sales.sql
│  │  └─ schema.yml
│  └─ .dbt/
│     └─ profiles.yml
└─ ge/                               # Great Expectations (minimal working config)
   ├─ great_expectations.yml
   ├─ checkpoints/
   │  └─ stg_sales_checkpoint.yml
   ├─ expectations/
   │  └─ stg_sales_suite.json
   └─ plugins/

📝 README.md (condensed)

# dbt + GE + DataHub + DuckDB Demo

This repo demonstrates:
- DuckDB as the local warehouse
- dbt to build models and lineage (manifest.json / catalog.json)
- Great Expectations for quality checks (including a deliberately failing case)
- DataHub to browse metadata, lineage, and quality results

## Prerequisites
- Linux / macOS (use WSL2 on Windows)
- Docker & Docker Compose
- Python 3.8+
- Ports: DataHub UI defaults to http://localhost:9002, GMS to http://localhost:8080

## Quick start
```bash
python3 -m venv .venv && source .venv/bin/activate
pip install -r requirements.txt
cp .env.example .env
make up              # Start DataHub (official quickstart) and seed the DuckDB sample data
make pipeline        # Run dbt + GE, then push dbt metadata to DataHub
# Browse http://localhost:9002, search for stg_sales, and check Schema / Lineage / Quality
```

## Shutdown and cleanup

```bash
make down
```

If you already run your own DataHub or your ports differ, just change DATAHUB_GMS / DATAHUB_UI in .env.

---

📄 requirements.txt

```txt
acryl-datahub==0.13.3
duckdb==1.0.0
dbt-duckdb==1.8.2
great-expectations==0.18.12
duckdb-engine==0.13.0
```
The pinned versions are examples only; adjust them to match what you already have installed.

⚙️ .env.example

```bash
# DataHub endpoints (as exposed by the official quickstart)
DATAHUB_GMS=http://localhost:8080
DATAHUB_UI=http://localhost:9002

# DuckDB file (the seed script creates it at the repo root)
DUCKDB_PATH=./warehouse.duckdb

# dbt profiles directory (bundled in this repo)
DBT_PROFILES_DIR=./dbt_project/.dbt
```

🛠️ Makefile

```makefile
include .env
export

.PHONY: up down seed dbt-run dbt-test ge-run ingest pipeline

up:
    @bash scripts/up.sh

down:
    @bash scripts/down.sh

seed:
    @python3 scripts/seed_duckdb.py

dbt-run:
    @cd dbt_project && dbt run

dbt-test:
    @cd dbt_project && dbt test || true

ge-run:
    @great_expectations checkpoint run -y -d ge stg_sales_checkpoint || true

ingest:
    @cd dbt_project && dbt docs generate
    @datahub ingest -c datahub/datahub_dbt_recipe.yml

pipeline: seed dbt-run dbt-test ge-run ingest
    @echo "✅ Governance pipeline completed."
dbt testGE 故意允许失败(返回 0),以便流水线继续把“失败的质量结果”上报到 DataHub。

🚀 scripts/up.sh (one-shot DataHub startup + data seeding)

```bash
#!/usr/bin/env bash
set -euo pipefail

# 1) Start DataHub (official quickstart)
if ! command -v datahub >/dev/null 2>&1; then
  echo "Installing acryl-datahub CLI..."
  pip install acryl-datahub
fi

echo "Starting DataHub quickstart (this may take several minutes on first run)..."
datahub docker quickstart --no-populate-sample-data

# 2) Seed the DuckDB sample data
echo "Seeding DuckDB sample data..."
python3 scripts/seed_duckdb.py

echo "✅ DataHub UI: ${DATAHUB_UI:-http://localhost:9002}"
```
Using `datahub docker quickstart` saves you from hand-maintaining a long list of Compose services and gives the best compatibility.

🧹 scripts/down.sh

```bash
#!/usr/bin/env bash
set -euo pipefail
echo "Stopping DataHub..."
datahub docker nuke --confirm
rm -f "${DUCKDB_PATH:-./warehouse.duckdb}" || true
echo "✅ Down complete."
```

🌱 scripts/seed_duckdb.py

```python
import os
import duckdb

db_path = os.getenv("DUCKDB_PATH", "./warehouse.duckdb")
con = duckdb.connect(db_path)
con.execute("""
CREATE TABLE IF NOT EXISTS sales (
    id INTEGER,
    product VARCHAR,
    amount DECIMAL(10,2),
    sale_date DATE
);
DELETE FROM sales;
INSERT INTO sales VALUES
(1, 'Laptop', 1200.00, DATE '2025-01-10'),
(2, 'Mouse', NULL,     DATE '2025-01-11'),
(3, 'Keyboard', 80.00, DATE '2025-01-12');
""")
con.close()
print(f"Seeded DuckDB at {db_path}")
```

▶️ scripts/run_governance.sh (run the full chain locally in one go)

```bash
#!/usr/bin/env bash
set -euo pipefail

echo "🔄 dbt run..."
make dbt-run

echo "🧪 dbt test (expect failure on amount not_null)..."
make dbt-test

echo "📊 Great Expectations checkpoint..."
make ge-run

echo "📤 Ingest dbt metadata to DataHub..."
make ingest

echo "✅ Governance pipeline completed."
```

🗂️ dbt_project/dbt_project.yml

```yaml
name: 'my_governance_demo'
version: '1.0'
config-version: 2

profile: 'my_governance_demo'

model-paths: ["models"]
target-path: "target"
clean-targets: ["target", "dbt_packages"]
```

dbt_project/models/stg_sales.sql

```sql
select
  id,
  product,
  amount,
  sale_date
from sales
```
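The model reads the seeded `sales` table directly by name. A more idiomatic dbt setup would declare it as a source so lineage starts from a declared node; a minimal sketch (assuming DuckDB's default `main` schema, with a hypothetical sources.yml):

```yaml
# dbt_project/models/sources.yml (optional addition)
version: 2
sources:
  - name: main          # DuckDB's default schema
    tables:
      - name: sales
```

The model would then select `from {{ source('main', 'sales') }}` instead of the bare table name.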

dbt_project/models/schema.yml

```yaml
version: 2

models:
  - name: stg_sales
    columns:
      - name: id
        tests:
          - unique
          - not_null
      - name: amount
        tests:
          - not_null   # deliberately fails: one seeded row is NULL
```

dbt_project/.dbt/profiles.yml (checked into the repo to ease CI)

```yaml
my_governance_demo:
  target: dev
  outputs:
    dev:
      type: duckdb
      path: "{{ env_var('DUCKDB_PATH') }}"
      extensions: ['httpfs','json']
```
With DBT_PROFILES_DIR=./dbt_project/.dbt set, dbt reads this profiles file first. Note that dbt resolves a relative path from dbt_project/, while the seed script resolves it from the repo root, so a relative DUCKDB_PATH such as ./warehouse.duckdb would point at two different files; use an absolute path (one option is sketched below).
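One way to sidestep the mismatch is to pin an absolute path in .env, generated once from the repo root (a sketch; any absolute location works):

```bash
# Make dbt (running from dbt_project/) and the seed script (running from the
# repo root) resolve to the same DuckDB file
sed -i.bak "s|^DUCKDB_PATH=.*|DUCKDB_PATH=$(pwd)/warehouse.duckdb|" .env
```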

🧩 ge/great_expectations.yml (minimal working config + DataHub action)

```yaml
config_version: 3

plugins_directory: plugins/
expectations_store_name: expectations_store
validations_store_name: validations_store
evaluation_parameter_store_name: evaluation_parameter_store
checkpoint_store_name: checkpoint_store

stores:
  expectations_store:
    class_name: ExpectationsStore
  validations_store:
    class_name: ValidationsStore
  checkpoint_store:
    class_name: CheckpointStore
  evaluation_parameter_store:
    class_name: EvaluationParameterStore

data_docs_sites: {}  # simplified; enable HTML Data Docs here if needed

anonymous_usage_statistics:
  enabled: false

# Report validation results to DataHub. This legacy validation-operator form is
# kept for reference; the checkpoint below carries the same action in its action_list.
validation_operators:
  action_list_operator:
    class_name: ActionListValidationOperator
    action_list:
      - name: datahub
        action:
          module_name: datahub.integrations.great_expectations.action
          class_name: DataHubValidationAction
          server_url: ${DATAHUB_GMS}
          timeout_sec: 30
```

ge/checkpoints/stg_sales_checkpoint.yml

```yaml
name: stg_sales_checkpoint
config_version: 1
class_name: Checkpoint
validations:
  - batch_request:
      datasource_name: duckdb_source
      data_connector_name: runtime_data_connector
      data_asset_name: stg_sales
      runtime_parameters:
        query: "select * from stg_sales"
      batch_identifiers:
        default_identifier_name: default
    expectation_suite_name: stg_sales_suite
action_list:
  - name: datahub
    action:
      module_name: datahub.integrations.great_expectations.action
      class_name: DataHubValidationAction
      server_url: ${DATAHUB_GMS}
```

ge/expectations/stg_sales_suite.json (simple not_null / unique checks mirroring the dbt tests)

```json
{
  "expectation_suite_name": "stg_sales_suite",
  "expectations": [
    {"expectation_type": "expect_column_values_to_not_be_null", "kwargs": {"column": "id"}},
    {"expectation_type": "expect_column_values_to_be_unique", "kwargs": {"column": "id"}},
    {"expectation_type": "expect_column_values_to_not_be_null", "kwargs": {"column": "amount"}}
  ],
  "meta": {}
}
```
To keep datasource registration light, the checkpoint queries stg_sales through a runtime batch request. The `duckdb_source` datasource and its `runtime_data_connector` referenced by the checkpoint still need to exist in great_expectations.yml, though; a minimal sketch using duckdb-engine pointed at DUCKDB_PATH follows.
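A hedged sketch of that registration (append to ge/great_expectations.yml; the `duckdb:///` connection string comes from duckdb-engine, and the path should match your DUCKDB_PATH):

```yaml
# Registers the datasource / data connector the checkpoint refers to
datasources:
  duckdb_source:
    class_name: Datasource
    execution_engine:
      class_name: SqlAlchemyExecutionEngine
      connection_string: duckdb:///./warehouse.duckdb   # via duckdb-engine
    data_connectors:
      runtime_data_connector:
        class_name: RuntimeDataConnector
        batch_identifiers:
          - default_identifier_name
```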

🔗 datahub/datahub_dbt_recipe.yml (pushing dbt metadata)

```yaml
source:
  type: dbt
  config:
    manifest_path: dbt_project/target/manifest.json
    catalog_path: dbt_project/target/catalog.json
    # sources.json is only produced by `dbt source freshness`, which this demo
    # does not run, so the freshness artifact is omitted here:
    # sources_path: dbt_project/target/sources.json
    target_platform: duckdb
    write_semantic_models: false

sink:
  type: datahub-rest
  config:
    server: ${DATAHUB_GMS}
```
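To also surface dbt test outcomes (not just schema and lineage), the dbt source can read the run_results artifact produced by `dbt test`. A hedged addition to the `config` block above (older acryl-datahub releases call this key `test_results_path`, newer ones use `run_results_paths`, a list; check the docs for your pinned version):

```yaml
    # Optional: ingest dbt test outcomes from the artifact written by `dbt test`
    test_results_path: dbt_project/target/run_results.json
```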

🧪 GitHub Actions (.github/workflows/governance.yml)

```yaml
name: governance-pipeline

on:
  push:
    branches: [ main ]
  pull_request:

jobs:
  run-pipeline:
    runs-on: ubuntu-latest
    env:
      DATAHUB_GMS: ${{ secrets.DATAHUB_GMS || 'http://localhost:8080' }}
      DATAHUB_UI:  ${{ secrets.DATAHUB_UI  || 'http://localhost:9002' }}
      DUCKDB_PATH: ./warehouse.duckdb
      DBT_PROFILES_DIR: ./dbt_project/.dbt

    steps:
      - uses: actions/checkout@v4

      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.11'

      - name: Install deps
        run: |
          python -m pip install -U pip
          pip install -r requirements.txt

      - name: Optionally start DataHub via CLI (self-hosted only)
        if: ${{ env.DATAHUB_GMS == 'http://localhost:8080' }}
        run: |
          datahub docker quickstart --no-populate-sample-data
          # wait for GMS to become ready
          for i in {1..60}; do
            curl -sSf "${DATAHUB_GMS}/health" && break || sleep 5
          done

      - name: Seed DuckDB
        run: python scripts/seed_duckdb.py

      - name: dbt run + test
        working-directory: ./dbt_project
        run: |
          dbt run
          dbt test || true

      - name: GE checkpoint (send result to DataHub)
        run: |
          great_expectations checkpoint run -y -d ge stg_sales_checkpoint || true

      - name: Ingest dbt metadata to DataHub
        run: |
          (cd dbt_project && dbt docs generate)
          datahub ingest -c datahub/datahub_dbt_recipe.yml

      - name: Print UI URL
        run: echo "DataHub UI => ${DATAHUB_UI}"

      - name: Tear down local DataHub (if started here)
        if: ${{ env.DATAHUB_GMS == 'http://localhost:8080' }}
        run: datahub docker nuke --confirm || true
```
Notes:
1) If your CI can reach a hosted DataHub (recommended), set DATAHUB_GMS / DATAHUB_UI as repository Secrets and the workflow pushes straight to it.
2) Without those Secrets, the workflow spins up a throwaway local DataHub inside CI and destroys it after the run.
3) dbt test / GE are allowed to fail so the failing quality results still get synced to DataHub for the demo.

✅ What you get

  • In the DataHub UI (search for stg_sales under /datasets) you will see:

    • Column-level schema
    • Upstream lineage: sales → stg_sales
    • A quality tab (assertions reported by GE; the amount not_null check fails)
  • Everything runs end to end both locally and in CI; you can also swap DuckDB for Postgres/Snowflake by adjusting only the dbt profile and the GE datasource.

🔧 Tips (common pitfalls)
  • DataHub ports: the UI defaults to 9002 and GMS to 8080; on conflicts, change .env and pass the values through to the scripts or the CI environment.
  • dbt docs generate must succeed: a missing target/manifest.json means no lineage in DataHub.
  • GE and dbt tests overlap: keep dbt for basic constraints (unique / not-null keys) and let GE handle richer checks such as distributions, ranges, and thresholds (see the sketch below).
  • Windows: use WSL2 to avoid path and Docker compatibility issues.
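For example, a range check on `amount` that the built-in dbt tests do not cover; a hedged entry (the bounds are hypothetical) for the `expectations` array in ge/expectations/stg_sales_suite.json:

```json
{
  "expectation_type": "expect_column_values_to_be_between",
  "kwargs": {"column": "amount", "min_value": 0, "max_value": 10000}
}
```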

Appendix

Airflow multi-task DAG & Dagster asset version

Goal: split scripts/run_governance.sh into an observable, retryable orchestration flow. Two complete implementations follow: an Airflow multi-task version and a Dagster asset (SDA) version, both defaulting to the America/Chicago timezone with a daily 02:15 run. Both assume the repository structure and scripts introduced above.

🎛 Conventions
  • The repo root (REPO_ROOT) contains: scripts/run_governance.sh, dbt_project/, ge/, datahub/datahub_dbt_recipe.yml, and warehouse.duckdb (or the latter is generated by scripts/seed_duckdb.py).
  • Environment variables (read by both variants):

    • DATAHUB_GMS (default http://localhost:8080)
    • DATAHUB_UI (default http://localhost:9002)
    • DUCKDB_PATH (default ${REPO_ROOT}/warehouse.duckdb)
    • DBT_PROFILES_DIR (default ${REPO_ROOT}/dbt_project/.dbt)

Option A: Airflow multi-task version (dbt run → dbt test → GE → ingest)

1) DAG file: airflow/dags/governance_pipeline_dag.py
```python
# airflow/dags/governance_pipeline_dag.py
import os
import pendulum
from datetime import timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator

# Schedule: every day at 02:15 America/Chicago
TZ = pendulum.timezone("America/Chicago")

# Read from Airflow Variables, falling back to defaults.
# Override in the UI under Admin -> Variables.
try:
    from airflow.models import Variable
    V = lambda k, d: Variable.get(k, default_var=d)
except Exception:  # e.g. the scheduler has not been initialized yet
    V = lambda k, d: d

REPO_ROOT = V("repo_root", "/opt/airflow/repo")
DATAHUB_GMS = V("datahub_gms", "http://host.docker.internal:8080")
DATAHUB_UI  = V("datahub_ui",  "http://host.docker.internal:9002")
DUCKDB_PATH = V("duckdb_path", f"{REPO_ROOT}/warehouse.duckdb")
DBT_PROFILES_DIR = V("dbt_profiles_dir", f"{REPO_ROOT}/dbt_project/.dbt")

DEFAULT_ENV = {
    "DATAHUB_GMS": DATAHUB_GMS,
    "DATAHUB_UI": DATAHUB_UI,
    "DUCKDB_PATH": DUCKDB_PATH,
    "DBT_PROFILES_DIR": DBT_PROFILES_DIR,
    "PYTHONUNBUFFERED": "1",
}

def _bash(task_id: str, cmd: str, retries=1, minutes=5, **kw) -> BashOperator:
    # Run every command from the repo root; append_env layers DEFAULT_ENV on
    # top of the worker's own environment (PATH etc.) instead of replacing it.
    return BashOperator(
        task_id=task_id,
        bash_command=f"bash -lc 'cd {REPO_ROOT} && {cmd}'",
        env=DEFAULT_ENV,
        append_env=True,
        retries=retries,
        retry_delay=timedelta(minutes=minutes),
        **kw,
    )

default_args = {
    "owner": "data-governance",
    "depends_on_past": False,
    "email_on_failure": False,
    "email_on_retry": False,
}

with DAG(
    dag_id="governance_pipeline_multi",
    description="dbt + GE + DataHub (multi-task with retries)",
    start_date=pendulum.datetime(2025, 1, 1, tz=TZ),
    schedule="15 2 * * *",
    catchup=False,
    default_args=default_args,
    max_active_runs=1,
    tags=["governance", "dbt", "datahub", "ge", "duckdb"],
) as dag:

    # Optional: seed DuckDB (idempotent)
    seed = _bash(
        task_id="seed_duckdb",
        cmd="python3 scripts/seed_duckdb.py",
        retries=1,
        minutes=2,
    )

    # 1) dbt run: the critical stage, so retry on failure
    dbt_run = _bash(
        task_id="dbt_run",
        cmd="cd dbt_project && dbt run",
        retries=2,
        minutes=3,
    )

    # 2) dbt test: business assertions may fail without breaking the orchestration.
    # `|| true` keeps the task green so the downstream GE / ingest tasks still run.
    dbt_test = _bash(
        task_id="dbt_test",
        cmd="cd dbt_project && dbt test || true",
        retries=0,
    )

    # 3) Great Expectations: report quality results to DataHub (the action is wired up in the GE config)
    ge_checkpoint = _bash(
        task_id="ge_checkpoint",
        cmd="great_expectations checkpoint run -y -d ge stg_sales_checkpoint || true",
        retries=1,
        minutes=2,
    )

    # 4) Ingest dbt metadata and lineage: sensitive to network flakiness,
    # so give it more retries with exponential backoff. Run the recipe from the
    # repo root so its relative paths resolve.
    ingest = BashOperator(
        task_id="datahub_ingest",
        bash_command=(
            f"bash -lc 'cd {REPO_ROOT} && "
            "(cd dbt_project && dbt docs generate) && "
            "datahub ingest -c datahub/datahub_dbt_recipe.yml'"
        ),
        env=DEFAULT_ENV,
        append_env=True,
        retries=3,
        retry_exponential_backoff=True,
        retry_delay=timedelta(minutes=2),
        max_retry_delay=timedelta(minutes=15),
    )

    seed >> dbt_run
    dbt_run >> [dbt_test, ge_checkpoint]
    [dbt_test, ge_checkpoint] >> ingest
```

2) Minimal runtime container: docker-compose.airflow.yml
version: "3.8"
services:
  airflow:
    image: apache/airflow:2.9.2
    container_name: airflow-governance
    ports:
      - "8088:8080"  # Airflow UI -> http://localhost:8088
    environment:
      AIRFLOW__CORE__LOAD_EXAMPLES: "False"
      AIRFLOW__WEBSERVER__DEFAULT_UI_TIMEZONE: "America/Chicago"
      _AIRFLOW_WWW_USER_USERNAME: admin
      _AIRFLOW_WWW_USER_PASSWORD: admin
    volumes:
      - ./airflow/dags:/opt/airflow/dags
      - ./:/opt/airflow/repo
    command: >
      bash -lc "
      pip install -r /opt/airflow/repo/requirements.txt &&
      airflow db migrate &&
      airflow users create --role Admin --username admin --password admin --firstname A --lastname F --email admin@example.com || true &&
      airflow scheduler & exec airflow webserver
      "
启动:docker compose -f docker-compose.airflow.yml up -d → 打开 UI 启用 governance_pipeline_multi
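Before enabling the schedule, the DAG can be smoke-tested in one shot from inside the container (`airflow dags test` performs a single in-process DAG run without the scheduler):

```bash
# One-off, in-process run of the whole DAG
docker compose -f docker-compose.airflow.yml exec airflow \
  airflow dags test governance_pipeline_multi 2025-01-01
```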

Option B: Dagster asset version (Software-Defined Assets)

The asset graph expresses the flow: seed_duckdb → dbt_assets → (dbt_tests, ge_quality) → datahub_ingest

dbt assets are loaded natively via dagster-dbt; quality checks and metadata pushes shell out to the respective CLIs (simple and transparent).

1) Extra dependencies

Append to requirements.txt:

```txt
dagster==1.8.1
dagster-webserver==1.8.1
dagster-dbt==0.24.1
```
dagster-dbt 0.24.x is the integration line that pairs with dagster 1.8.x, and it is compatible with dbt-duckdb==1.8.x. If you pin a different Python version, adjust as needed.

2) Assets and schedule: orchestration/dagster/assets_defs.py
```python
# orchestration/dagster/assets_defs.py
import os
import subprocess
from pathlib import Path
from dagster import (
    asset, AssetExecutionContext, Definitions,
    define_asset_job, ScheduleDefinition, AssetSelection
)
from dagster_dbt import DbtCliResource, load_assets_from_dbt_project

REPO_ROOT = Path(os.getenv("REPO_ROOT", Path(__file__).resolve().parents[2]))
DBT_PROJECT_DIR = REPO_ROOT / "dbt_project"
DBT_PROFILES_DIR = Path(os.getenv("DBT_PROFILES_DIR", REPO_ROOT / "dbt_project" / ".dbt"))
DATAHUB_GMS = os.getenv("DATAHUB_GMS", "http://localhost:8080")
DATAHUB_UI = os.getenv("DATAHUB_UI", "http://localhost:9002")
DUCKDB_PATH = Path(os.getenv("DUCKDB_PATH", REPO_ROOT / "warehouse.duckdb"))
# Start from the current process environment, then apply our overrides;
# spreading os.environ *last* would silently undo them.
ENV = {
    **os.environ,
    "DATAHUB_GMS": DATAHUB_GMS,
    "DATAHUB_UI": DATAHUB_UI,
    "DUCKDB_PATH": str(DUCKDB_PATH),
    "DBT_PROFILES_DIR": str(DBT_PROFILES_DIR),
    "PYTHONUNBUFFERED": "1",
}

# 0) Seed the base table (idempotent)
@asset(description="Seed DuckDB with demo data")
def seed_duckdb(context: AssetExecutionContext):
    script = REPO_ROOT / "scripts" / "seed_duckdb.py"
    script.chmod(script.stat().st_mode | 0o111)
    subprocess.run(["python3", str(script)], env=ENV, check=True)
    context.log.info(f"Seeded {DUCKDB_PATH}")

# 1) Load the dbt assets (models, snapshots, seeds, ...)
dbt_assets = load_assets_from_dbt_project(
    project_dir=str(DBT_PROJECT_DIR),
    profiles_dir=str(DBT_PROFILES_DIR),
    # a `select` argument can narrow this down to specific models, e.g. "stg_sales"
)

# 2) dbt test (assertions may fail; downstream steps are not blocked)
@asset(deps=[*dbt_assets], description="Run dbt tests; continue on failure")
def dbt_tests(context: AssetExecutionContext):
    cmd = [
        "dbt", "test",
        "--project-dir", str(DBT_PROJECT_DIR),
        "--profiles-dir", str(DBT_PROFILES_DIR),
    ]
    result = subprocess.run(cmd, env=ENV, capture_output=True, text=True)
    context.log.info(result.stdout)
    if result.returncode != 0:
        context.log.warning("dbt tests reported failures (expected in demo); continuing")

# 3) Run the GE checkpoint and report to DataHub (the GE config carries the DataHub action)
@asset(deps=[*dbt_assets], description="Run GE checkpoint and emit to DataHub")
def ge_quality(context: AssetExecutionContext):
    cmd = [
        "great_expectations", "checkpoint", "run", "-y", "-d", "ge", "stg_sales_checkpoint",
    ]
    result = subprocess.run(cmd, env=ENV, capture_output=True, text=True)
    context.log.info(result.stdout)
    if result.returncode != 0:
        context.log.warning("GE checkpoint returned non-zero (allowed in demo)")

# 4) Push dbt metadata and lineage
@asset(deps=[*dbt_assets, ge_quality, dbt_tests], description="Ingest dbt artifacts to DataHub")
def datahub_ingest(context: AssetExecutionContext):
    # Generate the dbt artifacts first
    subprocess.run(["dbt", "docs", "generate"], cwd=str(DBT_PROJECT_DIR), env=ENV, check=True)
    # Then ingest; run from the repo root so the recipe's relative paths resolve
    recipe = REPO_ROOT / "datahub" / "datahub_dbt_recipe.yml"
    subprocess.run(["datahub", "ingest", "-c", str(recipe)], cwd=str(REPO_ROOT), env=ENV, check=True)
    context.log.info(f"Ingested to {DATAHUB_GMS}; UI: {DATAHUB_UI}")

# Define the job and its schedule
job = define_asset_job(
    name="governance_job",
    selection=(AssetSelection.assets(seed_duckdb) | AssetSelection.assets(*dbt_assets))
              .downstream()  # pulls in dbt_tests / ge_quality / datahub_ingest
)

schedule = ScheduleDefinition(
    job=job, cron_schedule="15 2 * * *", execution_timezone="America/Chicago"
)

defs = Definitions(
    assets=[seed_duckdb, dbt_tests, ge_quality, datahub_ingest, *dbt_assets],
    schedules=[schedule],
    resources={
        # keep a DbtCliResource handy if you later drive build/test from ops/assets
        "dbt": DbtCliResource(project_dir=str(DBT_PROJECT_DIR), profiles_dir=str(DBT_PROFILES_DIR)),
    },
)
```

3) Start Dagster (local machine)
```bash
# 1) Prepare the run directory
export DAGSTER_HOME=$(pwd)/.dagster_home
mkdir -p "$DAGSTER_HOME"

# 2) Minimal instance config (local sqlite + daemon-driven scheduler)
cat > "$DAGSTER_HOME/dagster.yaml" <<'YAML'
scheduler:
  module: dagster._core.scheduler
  class: DagsterDaemonScheduler
run_coordinator:
  module: dagster._core.run_coordinator
  class: DefaultRunCoordinator
YAML

# 3) Start the daemon (fires the schedule; point it at the same code location)
dagster-daemon run -m orchestration.dagster.assets_defs

# 4) Start the web UI (in a second terminal)
dagster-webserver -m orchestration.dagster.assets_defs -h 0.0.0.0 -p 3000
```

Open http://localhost:3000, enable governance_job (cron 15 2 * * *), or trigger a run manually from the UI to verify the chain.
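For local development there is a simpler path: `dagster dev` runs the webserver and the daemon together in one process, replacing steps 2 through 4 above:

```bash
# Dev-mode alternative: web UI + schedule daemon in a single process
dagster dev -m orchestration.dagster.assets_defs -p 3000
```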


🔎 Comparison and practical advice

  • Observability: Airflow is more direct for task-level logs, retries, and the dependency grid; Dagster treats assets as first-class citizens, which fits governance work well.
  • Failure policy: in this demo, dbt test / GE do not block downstream steps (so results still reach DataHub); for strict gating, drop the `|| true` or use check=True in the Dagster assets.
  • Extensions:

    • Airflow: split into more tasks (e.g. dbt build --select tag:... for per-layer retries).
    • Dagster: move fully to Software-Defined Assets with Freshness Policies and Auto-materialize, driven by dbt build/test event streams.
