Daily ETL Pipeline
PR Ready — S3 → validate → transform → Snowflake → dashboard refresh with failure alerts.
6 nodes · 8 edges
Tags: airflow, etl, data-engineering, snowflake, dbt
Visual
Extract from S3 (data)
Download daily CSV exports from s3://data-lake/raw/.
↓ sequential → Validate Schema
↓ error → Slack Failure Alert

Validate Schema (cicd)
Check column types, nullability, and row count against expectations.
↓ sequential → Transform with dbt
↓ error → Slack Failure Alert

Transform with dbt (cli)
Run dbt models to clean, deduplicate, and join tables.
↓ sequential → Load to Snowflake
↓ error → Slack Failure Alert

Load to Snowflake (db)
Insert transformed data into Snowflake production tables.
↓ sequential → Refresh Metabase Dashboard
↓ error → Slack Failure Alert

Refresh Metabase Dashboard (api)
Trigger cache invalidation on the executive KPI dashboard.

Slack Failure Alert (api)
Post to #data-alerts with error details and run link.
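The Slack Failure Alert node is simple enough to sketch. Below is a minimal example of building the message payload each failed step would post to #data-alerts; the run-link URL and the exact message text are illustrative assumptions, not part of the spec — a real deployment would point at the Airflow task-log URL and post via a Slack incoming webhook.

```python
import json

def build_failure_alert(node_id: str, error: str, run_url: str) -> dict:
    """Build a Slack message payload for the notify_failure node.

    The channel comes from the node's config in the YAML below; the
    run_url format is a placeholder for a real Airflow run link.
    """
    return {
        "channel": "#data-alerts",
        "text": (
            f":rotating_light: ETL step `{node_id}` failed: {error}\n"
            f"Run: {run_url}"
        ),
    }

payload = build_failure_alert(
    "transform_dbt",
    "dbt run exited with code 1",
    "https://airflow.internal/run/1234",  # hypothetical run link
)
print(json.dumps(payload, indent=2))
```

Because every upstream node routes its error edge to this one node, the alert format stays consistent no matter which step fails.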
ex-airflow-etl-pipeline.osop.yaml
# Airflow ETL Pipeline — OSOP Portable Workflow
#
# Classic extract-transform-load: pulls raw data from S3, validates the
# schema, transforms with dbt, loads into Snowflake, refreshes a Metabase
# dashboard, and sends a Slack alert on any failure.
#
# Run with Airflow or validate: osop validate airflow-etl-pipeline.osop.yaml
osop_version: "1.0"
id: "airflow-etl-pipeline"
name: "Daily ETL Pipeline"
description: "S3 → validate → transform → Snowflake → dashboard refresh with failure alerts."
version: "1.0.0"
tags: [airflow, etl, data-engineering, snowflake, dbt]
nodes:
  - id: "extract_s3"
    type: "data"
    name: "Extract from S3"
    description: "Download daily CSV exports from s3://data-lake/raw/."
    config:
      source: "s3://data-lake/raw/{{ds}}/"
      format: csv
  - id: "validate_schema"
    type: "cicd"
    subtype: "test"
    name: "Validate Schema"
    description: "Check column types, nullability, and row count against expectations."
    config:
      tool: "great_expectations"
      suite: "raw_data_suite"
  - id: "transform_dbt"
    type: "cli"
    subtype: "script"
    name: "Transform with dbt"
    description: "Run dbt models to clean, deduplicate, and join tables."
    config:
      command: "dbt run --select tag:daily"
  - id: "load_warehouse"
    type: "db"
    name: "Load to Snowflake"
    description: "Insert transformed data into Snowflake production tables."
    config:
      target: "snowflake://analytics/prod"
  - id: "refresh_dashboard"
    type: "api"
    subtype: "rest"
    name: "Refresh Metabase Dashboard"
    description: "Trigger cache invalidation on the executive KPI dashboard."
    config:
      url: "https://metabase.internal/api/card/{{card_id}}/refresh"
  - id: "notify_failure"
    type: "api"
    subtype: "rest"
    name: "Slack Failure Alert"
    description: "Post to #data-alerts with error details and run link."
    config:
      channel: "#data-alerts"
edges:
  - from: "extract_s3"
    to: "validate_schema"
    mode: "sequential"
  - from: "validate_schema"
    to: "transform_dbt"
    mode: "sequential"
  - from: "transform_dbt"
    to: "load_warehouse"
    mode: "sequential"
  - from: "load_warehouse"
    to: "refresh_dashboard"
    mode: "sequential"
  - from: "extract_s3"
    to: "notify_failure"
    mode: "error"
    label: "Any step fails"
  - from: "validate_schema"
    to: "notify_failure"
    mode: "error"
  - from: "transform_dbt"
    to: "notify_failure"
    mode: "error"
  - from: "load_warehouse"
    to: "notify_failure"
    mode: "error"
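The kind of structural checks `osop validate` would need can be approximated in a few lines. The sketch below is my assumption about the rule set, not the OSOP spec: it confirms every edge references a declared node, the sequential edges are acyclic, and every node with a downstream step also routes errors somewhere. The node IDs and edges are copied from the YAML above so the example is self-contained.

```python
# Node ids and edges copied from airflow-etl-pipeline.osop.yaml.
NODES = {
    "extract_s3", "validate_schema", "transform_dbt",
    "load_warehouse", "refresh_dashboard", "notify_failure",
}
EDGES = [
    ("extract_s3", "validate_schema", "sequential"),
    ("validate_schema", "transform_dbt", "sequential"),
    ("transform_dbt", "load_warehouse", "sequential"),
    ("load_warehouse", "refresh_dashboard", "sequential"),
    ("extract_s3", "notify_failure", "error"),
    ("validate_schema", "notify_failure", "error"),
    ("transform_dbt", "notify_failure", "error"),
    ("load_warehouse", "notify_failure", "error"),
]

def validate(nodes: set, edges: list) -> list:
    """Return a list of problems; an empty list means the graph passes."""
    problems = []
    for src, dst, _mode in edges:
        if src not in nodes or dst not in nodes:
            problems.append(f"edge {src} -> {dst} references an unknown node")
    # Build the sequential-edge adjacency and walk it depth-first for cycles.
    seq = {}
    for src, dst, mode in edges:
        if mode == "sequential":
            seq.setdefault(src, []).append(dst)
    def has_cycle(node, seen):
        if node in seen:
            return True
        return any(has_cycle(n, seen | {node}) for n in seq.get(node, []))
    if any(has_cycle(n, set()) for n in seq):
        problems.append("sequential edges contain a cycle")
    # Every node with a sequential successor should also route errors somewhere.
    error_sources = {src for src, _dst, mode in edges if mode == "error"}
    for src in seq:
        if src not in error_sources:
            problems.append(f"{src} has no error edge")
    return problems

print(validate(NODES, EDGES))  # → []
```

Running the checks against this pipeline reports no problems: the four sequential edges form a straight chain, and each of the four non-terminal nodes has an error edge into `notify_failure`.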