# ETL 資料管線
# Data · 6 個節點 · 6 條連接 · data
# 視覺化
# ex-etl-pipeline.osop.yaml
# ETL pipeline workflow definition.
# Stages: extract from an external API, transform with Spark, validate the
# result, then load it into the warehouse.
# The version is quoted so it stays a string rather than being read as a float.
osop_version: '2.0'
id: etl-pipeline
name: 'ETL 資料管線'
# Pipeline steps. Each node names the artifacts it consumes (`inputs`) and
# produces (`outputs`); every input below matches an output of an earlier node.
# NOTE(review): the nesting was reconstructed from a copy whose indentation had
# been stripped. Confirm against the OSOP schema that `security`,
# `retry_policy`, `timeout_sec`, and `explain` belong at node level rather than
# under `runtime`.
nodes:
  # Step 1: fetch the raw export over HTTP GET.
  - id: extract_api
    type: api
    purpose: Extract raw data from third-party analytics API
    runtime:
      endpoint: /v2/export
      method: GET
      url: https://analytics.vendor.com
    security:
      auth: bearer_token
      # A reference name only; the token value itself is not stored here.
      secret_ref: ANALYTICS_API_KEY
    outputs:
      - raw_data
    retry_policy:
      max_retries: 3
      backoff_sec: 10
    timeout_sec: 120

  # Step 2: Spark job that cleans the raw extract.
  - id: clean_data
    type: data
    purpose: Remove duplicates, handle nulls, and fix encoding issues
    runtime:
      engine: spark
      config:
        app_name: etl-clean
        master: yarn
    inputs:
      - raw_data
    outputs:
      - cleaned_data

  # Step 3: Spark job that normalizes the cleaned data.
  - id: normalize
    type: data
    purpose: Normalize fields, convert types, and apply business rules
    runtime:
      engine: spark
      config:
        app_name: etl-normalize
        master: yarn
    inputs:
      - cleaned_data
    outputs:
      - normalized_data

  # Step 4: run the validation checkpoint from the command line.
  # `validation_result` is what the edge conditions test.
  - id: validate_schema
    type: cli
    purpose: Validate transformed data against expected schema
    runtime:
      command: "great_expectations checkpoint run etl_validation"
    inputs:
      - normalized_data
    outputs:
      - validation_result

  # Step 5: write the normalized data to the warehouse table.
  # NOTE(review): `explain` says the pipeline halts on failed validation, but
  # the edges section routes that case back to extract_api. Confirm which
  # behavior is intended.
  - id: load_warehouse
    type: db
    purpose: Load validated data into the analytics warehouse
    runtime:
      engine: bigquery
      connection: project.analytics.fact_events
    inputs:
      - normalized_data
    outputs:
      - load_receipt
    explain: "Only loads if validation passed; otherwise pipeline halts."

  # Step 6: POST to the dashboard service once a load receipt exists.
  - id: update_dashboard
    type: api
    purpose: Trigger dashboard refresh after successful data load
    runtime:
      endpoint: /api/refresh
      method: POST
      url: https://dashboard.internal
    inputs:
      - load_receipt
    outputs:
      - refresh_status
# Control flow between nodes. Main path:
#   extract_api -> clean_data -> normalize -> validate_schema
#   -> load_warehouse -> update_dashboard
# The final edge is the validation-failure path.
edges:
  - from: extract_api
    to: clean_data
    mode: sequential

  - from: clean_data
    to: normalize
    mode: sequential

  - from: normalize
    to: validate_schema
    mode: sequential

  # Only proceed to the load when the validation result reports success.
  - from: validate_schema
    to: load_warehouse
    mode: conditional
    condition: "validation_result.success == true"

  - from: load_warehouse
    to: update_dashboard
    mode: sequential

  # Failure path: route back to the extraction step.
  # NOTE(review): this edge forms a cycle, and nothing visible here bounds how
  # many times it can repeat. It also conflicts with the load_warehouse node's
  # statement that the pipeline halts when validation fails. Confirm intent.
  - from: validate_schema
    to: extract_api
    mode: error
    condition: "validation_result.success == false"
    explain: "On validation failure, re-extract with corrective parameters."