# Airflow 風格資料處理 DAG (Airflow-style data-processing DAG)
# Data: 8 個節點 · 9 條連接 (8 nodes · 9 connections)
# Source file: ex-airflow-dag.osop.yaml
---
# Airflow-Style DAG Workflow
# Sensor-driven pipeline with parallel transforms, quality checks, and notification.
# Workflow document header: schema version plus the workflow's identity.
osop_version: "2.0"  # quoted so parsers keep it a string, not the float 2.0
id: airflow-dag
name: "Airflow 風格資料處理 DAG"  # display name (Chinese: "Airflow-style data-processing DAG")

# Pipeline nodes. Each node declares its runtime and the named data it
# consumes (inputs) and produces (outputs); the edges section wires them up.
nodes:
  # Entry point: sensor that gates the whole pipeline on file arrival.
  - id: file_sensor
    type: system
    purpose: Wait for upstream data file to appear in S3 landing zone
    runtime:
      tool: s3-sensor
    outputs:
      - file_path  # consumed by the extract node
    # Give up after one hour (3600 s) if the file never lands.
    timeout_sec: 3600
    explain: "Polls S3 every 60 seconds until the expected file lands or timeout."

  # Stages the file detected by file_sensor for downstream processing.
  - id: extract
    type: api
    purpose: Download the landed file and stage for processing
    runtime:
      endpoint: /v1/stage
      method: POST
      url: https://data-platform.internal
    inputs:
      - file_path  # from file_sensor
    outputs:
      - staged_data  # fanned out to all three transform nodes

  # Spark transform (user dimension); runs in parallel with the other two
  # transforms per the parallel edges from extract.
  - id: transform_users
    type: data
    purpose: Transform and enrich user dimension data
    runtime:
      engine: spark
      config:
        app_name: transform-users
        master: k8s  # Spark on Kubernetes
    inputs:
      - staged_data
    outputs:
      - users_table  # loaded by load_warehouse

  # Spark transform (event facts); runs in parallel with the other two
  # transforms per the parallel edges from extract.
  - id: transform_events
    type: data
    purpose: Transform and aggregate event fact data
    runtime:
      engine: spark
      config:
        app_name: transform-events
        master: k8s  # Spark on Kubernetes
    inputs:
      - staged_data
    outputs:
      - events_table  # loaded by load_warehouse

  # Spark transform (daily metrics); runs in parallel with the other two
  # transforms per the parallel edges from extract.
  - id: transform_metrics
    type: data
    purpose: Compute daily business metrics from events
    runtime:
      engine: spark
      config:
        app_name: transform-metrics
        master: k8s  # Spark on Kubernetes
    inputs:
      - staged_data
    outputs:
      - metrics_table  # loaded by load_warehouse

  # Fan-in point: takes the output tables of all three transforms and
  # loads them into BigQuery.
  - id: load_warehouse
    type: db
    purpose: Load all transformed tables into the data warehouse
    runtime:
      engine: bigquery
      connection: project.analytics
    inputs:
      - users_table
      - events_table
      - metrics_table
    outputs:
      - load_status  # signals quality_check to run

  # Post-load validation: runs `dbt test` over models tagged daily_quality.
  - id: quality_check
    type: data
    purpose: Run data quality assertions on loaded tables
    runtime:
      engine: dbt
      config:
        command: test
        select: "tag:daily_quality"  # quoted: contains a colon
    inputs:
      - load_status  # from load_warehouse
    outputs:
      - quality_result  # consumed by notify

  # Terminal node: reports pipeline completion once quality checks finish.
  - id: notify
    type: api
    purpose: Send pipeline completion notification to Slack and PagerDuty
    runtime:
      endpoint: /v1/notify
      method: POST
      url: https://alerts.internal
    inputs:
      - quality_result  # from quality_check
    outputs:
      - notification_status

# DAG topology (9 edges):
#   file_sensor -> extract -> {transform_users, transform_events,
#   transform_metrics} -> load_warehouse -> quality_check -> notify
edges:
  # Event-driven trigger: fires when the sensor detects the file.
  - from: file_sensor
    to: extract
    mode: event
    explain: "Triggered when sensor detects the file."

  # Fan-out: the three transforms start concurrently after extract.
  - from: extract
    to: transform_users
    mode: parallel

  - from: extract
    to: transform_events
    mode: parallel

  - from: extract
    to: transform_metrics
    mode: parallel
    explain: "All three transforms run concurrently after extraction."

  # Fan-in: the warehouse load waits on all three transforms.
  - from: transform_users
    to: load_warehouse
    mode: sequential

  - from: transform_events
    to: load_warehouse
    mode: sequential

  - from: transform_metrics
    to: load_warehouse
    mode: sequential

  # Tail of the pipeline: quality gate, then notification.
  - from: load_warehouse
    to: quality_check
    mode: sequential

  - from: quality_check
    to: notify
    mode: sequential