Daily ETL Pipeline
S3 → validate → transform → Snowflake → dashboard refresh, with an alert sent on failure.
6 nodes · 8 edges · PR Ready
airflow · etl · data-engineering · snowflake · dbt
Visualization
Extract Data from S3 (data)
Download the daily CSV exports from s3://data-lake/raw/.
↓ sequential → Validate Schema
↓ error → Slack Failure Alert
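The `source` path in this node's config ends in `{{ds}}`, Airflow's template variable for the run's logical date in `YYYY-MM-DD` form. The plain-Python sketch below shows how one run's partition prefix could be resolved and the CSV exports under it picked out. The helper names and the sample object listing are hypothetical; a real task would list the bucket with an S3 client such as boto3 instead of receiving a list.

```python
from datetime import date
from typing import List

# Path template copied from the extract_s3 node's config.
SOURCE_TEMPLATE = "s3://data-lake/raw/{{ds}}/"


def partition_prefix(run_date: date) -> str:
    """Fill {{ds}} with the run date in YYYY-MM-DD form, matching how Airflow formats `ds`."""
    return SOURCE_TEMPLATE.replace("{{ds}}", run_date.isoformat())


def select_csv_keys(prefix: str, keys: List[str]) -> List[str]:
    """Return the CSV objects under `prefix`, sorted so later steps see a stable order."""
    return sorted(k for k in keys if k.startswith(prefix) and k.endswith(".csv"))


# Hypothetical listing of objects in the bucket.
listing = [
    "s3://data-lake/raw/2024-03-01/orders.csv",
    "s3://data-lake/raw/2024-03-01/customers.csv",
    "s3://data-lake/raw/2024-03-01/_SUCCESS",
    "s3://data-lake/raw/2024-02-29/orders.csv",
]
prefix = partition_prefix(date(2024, 3, 1))
print(prefix)  # s3://data-lake/raw/2024-03-01/
print(select_csv_keys(prefix, listing))
```

Only the two CSV files from 2024-03-01 survive the filter; the marker file and the previous day's export are skipped.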
Validate Schema (cicd)
Check column types, nullability, and row counts against expected values.
↓ sequential → Transform Data with dbt
↓ error → Slack Failure Alert
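The node delegates these checks to the Great Expectations suite `raw_data_suite`, whose contents are not shown. As a rough illustration of the three kinds of checks named above, here is a plain-Python sketch (not the Great Expectations API) against a hypothetical expected schema; the column names, types, and minimum row count are all assumptions.

```python
from typing import Any, Dict, List, Tuple

# Hypothetical stand-in for the expectations in "raw_data_suite":
# column name -> (expected Python type, whether nulls are allowed).
EXPECTED_COLUMNS: Dict[str, Tuple[type, bool]] = {
    "order_id": (int, False),
    "customer_id": (int, False),
    "amount": (float, True),
}
MIN_ROWS = 1  # hypothetical lower bound on the daily row count


def validate(rows: List[Dict[str, Any]]) -> List[str]:
    """Return every failed check; an empty list means the batch passed."""
    failures: List[str] = []
    if len(rows) < MIN_ROWS:
        failures.append(f"row count {len(rows)} is below the minimum of {MIN_ROWS}")
    for i, row in enumerate(rows):
        for column, (expected_type, nullable) in EXPECTED_COLUMNS.items():
            value = row.get(column)  # a missing column is treated the same as a null
            if value is None:
                if not nullable:
                    failures.append(f"row {i}: {column} is null")
            elif not isinstance(value, expected_type):
                failures.append(
                    f"row {i}: {column} should be {expected_type.__name__}, "
                    f"got {type(value).__name__}"
                )
    return failures


good = [{"order_id": 1, "customer_id": 7, "amount": 19.5}]
bad = [{"order_id": None, "customer_id": "7", "amount": None}]
print(validate(good))  # []
print(validate(bad))
```

Collecting every failure instead of stopping at the first one makes the eventual alert more informative; in the pipeline, a non-empty list would presumably be raised as an exception so that the `error` edge to the Slack alert fires.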
Transform Data with dbt (cli)
Run dbt models to clean, deduplicate, and join tables.
↓ sequential → Load into Snowflake
↓ error → Slack Failure Alert
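The actual transformations live in the dbt models selected by `tag:daily`, which this page does not include. To make the "deduplicate" step concrete, the following Python sketch keeps the most recent row per key. In dbt the same idea would be expressed as a SQL model, so this is an illustration of the logic only. The column names `order_id`, `status`, and `updated_at` are hypothetical.

```python
from datetime import datetime
from typing import Any, Dict, List


def dedupe_latest(rows: List[Dict[str, Any]], key: str, updated_col: str) -> List[Dict[str, Any]]:
    """Keep one row per `key`, preferring the row with the newest `updated_col` value."""
    latest: Dict[Any, Dict[str, Any]] = {}
    for row in rows:
        current = latest.get(row[key])
        # Strict ">" means that on a timestamp tie the first row seen is kept.
        if current is None or row[updated_col] > current[updated_col]:
            latest[row[key]] = row
    # Sort by key so the output order is deterministic.
    return [latest[k] for k in sorted(latest)]


# Hypothetical raw rows: order 1 was updated twice during the day.
raw = [
    {"order_id": 1, "status": "pending", "updated_at": datetime(2024, 3, 1, 9)},
    {"order_id": 1, "status": "shipped", "updated_at": datetime(2024, 3, 1, 17)},
    {"order_id": 2, "status": "pending", "updated_at": datetime(2024, 3, 1, 12)},
]
for row in dedupe_latest(raw, key="order_id", updated_col="updated_at"):
    print(row["order_id"], row["status"])
```

Order 1 keeps its 17:00 "shipped" row, and order 2 passes through unchanged.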
Load into Snowflake (db)
Write the transformed data to Snowflake production tables.
↓ sequential → Refresh Metabase Dashboard
↓ error → Slack Failure Alert
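How the load writes into the `snowflake://analytics/prod` target is not specified. One common pattern, assumed here, is to replace a whole day's partition in a single transaction so that rerunning the same daily run does not duplicate rows. The sketch uses Python's built-in `sqlite3` as a stand-in for Snowflake with a hypothetical `fct_orders` table; a real task would go through a Snowflake driver instead.

```python
import sqlite3
from datetime import date
from typing import List, Tuple

# sqlite3 stands in for Snowflake; the table and its columns are hypothetical.
DDL = "CREATE TABLE IF NOT EXISTS fct_orders (ds TEXT, order_id INTEGER, amount REAL)"


def load_partition(conn: sqlite3.Connection, run_date: date, rows: List[Tuple[int, float]]) -> None:
    """Replace one day's rows atomically so a rerun of that day does not duplicate data."""
    ds = run_date.isoformat()
    with conn:  # commits if both statements succeed, rolls back if either raises
        conn.execute("DELETE FROM fct_orders WHERE ds = ?", (ds,))
        conn.executemany(
            "INSERT INTO fct_orders (ds, order_id, amount) VALUES (?, ?, ?)",
            [(ds, order_id, amount) for order_id, amount in rows],
        )


conn = sqlite3.connect(":memory:")
conn.execute(DDL)
day = date(2024, 3, 1)
load_partition(conn, day, [(1, 19.5), (2, 5.0)])
load_partition(conn, day, [(1, 19.5), (2, 5.0)])  # rerun of the same day
print(conn.execute("SELECT COUNT(*) FROM fct_orders").fetchone()[0])  # 2, not 4
```

Because the delete and insert share one transaction, a failure partway through leaves the previous day's data in place rather than a half-loaded partition.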
Refresh Metabase Dashboard (api)
Invalidate the cache for the executive KPI dashboard.
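The config gives only a URL template with a `{{card_id}}` placeholder; the endpoint path is taken from it as-is. The sketch below fills in a hypothetical card id and builds the request with the standard library without sending it. The HTTP method and the empty JSON body are assumptions, since the workflow does not state them, and a real call would also need Metabase authentication, which is omitted.

```python
import urllib.request

# URL template copied from the refresh_dashboard node's config.
URL_TEMPLATE = "https://metabase.internal/api/card/{{card_id}}/refresh"


def build_refresh_request(card_id: int) -> urllib.request.Request:
    """Build (but do not send) the request that asks Metabase to refresh one card."""
    url = URL_TEMPLATE.replace("{{card_id}}", str(card_id))
    # POST with an empty JSON body is an assumption; the workflow file names no method.
    return urllib.request.Request(
        url,
        data=b"{}",
        headers={"Content-Type": "application/json"},
        method="POST",
    )


req = build_refresh_request(42)  # 42 is a hypothetical card id
print(req.get_method(), req.full_url)
```

Passing the request to `urllib.request.urlopen` would actually send it.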
Slack Failure Alert (api)
Post a message to the #data-alerts channel with the error details and a link to the run.
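The alert carries three pieces of run context: which step failed, the error, and a link to the run. A minimal sketch, assuming the message is sent through Slack's `chat.postMessage` Web API method, which takes `channel` and `text` arguments. The function name, the message wording, and the example run URL are hypothetical, and the token and HTTP call are left out. `<url|label>` is Slack's markup for a labelled link.

```python
import json

ALERT_CHANNEL = "#data-alerts"  # from the notify_failure node's config


def build_failure_alert(task_id: str, error: str, run_url: str) -> str:
    """Return a JSON body naming the failed step, the error, and a link to the run."""
    text = (
        f"airflow-etl-pipeline failed at `{task_id}`\n"
        f"Error: {error}\n"
        f"<{run_url}|View run>"  # Slack link markup: <url|label>
    )
    return json.dumps({"channel": ALERT_CHANNEL, "text": text})


body = build_failure_alert(
    task_id="transform_dbt",
    error="dbt exited with a non-zero status",
    run_url="https://airflow.example.com/runs/123",  # hypothetical run link
)
print(body)
```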
ex-airflow-etl-pipeline.osop.yaml
```yaml
# Airflow ETL Pipeline — OSOP Portable Workflow
#
# Classic extract-transform-load: pulls raw data from S3, validates the
# schema, transforms with dbt, loads into Snowflake, refreshes a Metabase
# dashboard, and sends a Slack alert on any failure.
#
# Run with Airflow or validate: osop validate airflow-etl-pipeline.osop.yaml
osop_version: "1.0"
id: "airflow-etl-pipeline"
name: "Daily ETL Pipeline"
description: "S3 → validate → transform → Snowflake → dashboard refresh, with an alert sent on failure."
version: "1.0.0"
tags: [airflow, etl, data-engineering, snowflake, dbt]
nodes:
  - id: "extract_s3"
    type: "data"
    name: "Extract Data from S3"
    description: "Download the daily CSV exports from s3://data-lake/raw/."
    config:
      source: "s3://data-lake/raw/{{ds}}/"
      format: csv
  - id: "validate_schema"
    type: "cicd"
    subtype: "test"
    name: "Validate Schema"
    description: "Check column types, nullability, and row counts against expected values."
    config:
      tool: "great_expectations"
      suite: "raw_data_suite"
  - id: "transform_dbt"
    type: "cli"
    subtype: "script"
    name: "Transform Data with dbt"
    description: "Run dbt models to clean, deduplicate, and join tables."
    config:
      command: "dbt run --select tag:daily"
  - id: "load_warehouse"
    type: "db"
    name: "Load into Snowflake"
    description: "Write the transformed data to Snowflake production tables."
    config:
      target: "snowflake://analytics/prod"
  - id: "refresh_dashboard"
    type: "api"
    subtype: "rest"
    name: "Refresh Metabase Dashboard"
    description: "Invalidate the cache for the executive KPI dashboard."
    config:
      url: "https://metabase.internal/api/card/{{card_id}}/refresh"
  - id: "notify_failure"
    type: "api"
    subtype: "rest"
    name: "Slack Failure Alert"
    description: "Post a message to the #data-alerts channel with the error details and a link to the run."
    config:
      channel: "#data-alerts"
edges:
  - from: "extract_s3"
    to: "validate_schema"
    mode: "sequential"
  - from: "validate_schema"
    to: "transform_dbt"
    mode: "sequential"
  - from: "transform_dbt"
    to: "load_warehouse"
    mode: "sequential"
  - from: "load_warehouse"
    to: "refresh_dashboard"
    mode: "sequential"
  - from: "extract_s3"
    to: "notify_failure"
    mode: "error"
    label: "Any step fails"
  - from: "validate_schema"
    to: "notify_failure"
    mode: "error"
  - from: "transform_dbt"
    to: "notify_failure"
    mode: "error"
  - from: "load_warehouse"
    to: "notify_failure"
    mode: "error"
```
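The eight edges form a linear chain of `sequential` edges plus an `error` edge from each of the first four nodes to `notify_failure`. The sketch below follows that edge list literally: run a node, follow its `sequential` edge on success, and jump to its `error` target on an exception. It is not how Airflow or the `osop` tooling actually schedules the workflow; the edges are transcribed by hand instead of parsed from the file, and the task callables are hypothetical stand-ins.

```python
from typing import Callable, Dict, List, Optional

# (from, to, mode) triples transcribed by hand from the edges section of the YAML.
EDGES = [
    ("extract_s3", "validate_schema", "sequential"),
    ("validate_schema", "transform_dbt", "sequential"),
    ("transform_dbt", "load_warehouse", "sequential"),
    ("load_warehouse", "refresh_dashboard", "sequential"),
    ("extract_s3", "notify_failure", "error"),
    ("validate_schema", "notify_failure", "error"),
    ("transform_dbt", "notify_failure", "error"),
    ("load_warehouse", "notify_failure", "error"),
]


def next_node(node: str, mode: str) -> Optional[str]:
    """Return the target of the first edge leaving `node` with the given mode, or None."""
    for src, dst, edge_mode in EDGES:
        if src == node and edge_mode == mode:
            return dst
    return None


def run(start: str, tasks: Dict[str, Callable[[], None]]) -> List[str]:
    """Execute nodes along sequential edges and return the path actually taken."""
    path: List[str] = []
    node: Optional[str] = start
    while node is not None:
        path.append(node)
        try:
            tasks[node]()
        except Exception:
            handler = next_node(node, "error")
            if handler is None:
                raise  # no error edge from this node, so nothing catches the failure
            path.append(handler)
            tasks[handler]()  # e.g. post the Slack alert
            # A real scheduler would still mark the run as failed; this sketch
            # only records which nodes were visited.
            return path
        node = next_node(node, "sequential")
    return path


def succeed() -> None:
    pass


def fail() -> None:
    raise RuntimeError("dbt exited with a non-zero status")


NODES = ["extract_s3", "validate_schema", "transform_dbt",
         "load_warehouse", "refresh_dashboard", "notify_failure"]

tasks = {name: succeed for name in NODES}
print(run("extract_s3", tasks))  # the five sequential nodes, ending at refresh_dashboard

tasks["transform_dbt"] = fail
print(run("extract_s3", tasks))
# → ['extract_s3', 'validate_schema', 'transform_dbt', 'notify_failure']
```

The YAML has no `error` edge from `refresh_dashboard`, so in this sketch a failure there is re-raised instead of reaching the Slack alert, unlike what the header comment's "on any failure" suggests. If alerting on that step is intended, an edge from `refresh_dashboard` to `notify_failure` with `mode: "error"` would close the gap.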