使用子工作流程的模組化管線

v1.1 Feature

展示 v1.1 子工作流程功能——以可重用的建構單元組合複雜管線。

6 個節點 · 6 條連接v1.1 features
v1.1sub-workflowcompositiondata-pipeline
視覺化
extract_salessystem
paralleltransform
extract_inventorysystem
paralleltransform
transformsystem
sequentialquality_gate
quality_gatesystem
conditionalload_warehouse
fallbacknotify_team
load_warehousedb
sequentialnotify_team
notify_teamapi
ex-sub-workflow-composition.osop.yaml
osop_version: "1.1"
id: "sub-workflow-composition"
name:"使用子工作流程的模組化管線"
description:"展示 v1.1 子工作流程功能——以可重用的建構單元組合複雜管線。"
tags: [v1.1, sub-workflow, composition, data-pipeline]

imports:
  - "./etl-extract.osop"
  - "./etl-transform.osop"
  - "./quality-check.osop"

timeout_sec: 7200  # 2 hour global deadline

nodes:
  - id: "extract_sales"
    type: "system"
    purpose: "Extract sales data from source systems."
    workflow_ref: "etl-extract"
    workflow_inputs:
      source: "salesforce"
      date_range: "${inputs.date_range}"

  - id: "extract_inventory"
    type: "system"
    purpose: "Extract inventory data from warehouse system."
    workflow_ref: "etl-extract"
    workflow_inputs:
      source: "warehouse_api"
      date_range: "${inputs.date_range}"

  - id: "transform"
    type: "system"
    purpose: "Clean and normalize extracted data."
    workflow_ref: "etl-transform"
    workflow_inputs:
      sales_data: "${outputs.extract_sales.data}"
      inventory_data: "${outputs.extract_inventory.data}"

  - id: "quality_gate"
    type: "system"
    purpose: "Run data quality checks."
    workflow_ref: "quality-check"
    workflow_inputs:
      dataset: "${outputs.transform.result}"
      rules: "strict"

  - id: "load_warehouse"
    type: "db"
    purpose: "Load validated data into analytics warehouse."
    runtime:
      engine: "bigquery"
      table: "analytics.daily_report"

  - id: "notify_team"
    type: "api"
    purpose: "Send completion notification to Slack."
    runtime:
      endpoint: "https://hooks.slack.com/services/..."
      method: "POST"

edges:
  - from: "extract_sales"
    to: "transform"
    mode: "parallel"
  - from: "extract_inventory"
    to: "transform"
    mode: "parallel"
    join_mode: "wait_all"
  - from: "transform"
    to: "quality_gate"
    mode: "sequential"
  - from: "quality_gate"
    to: "load_warehouse"
    mode: "conditional"
    when: "outputs.quality_gate.passed == true"
  - from: "load_warehouse"
    to: "notify_team"
    mode: "sequential"
  - from: "quality_gate"
    to: "notify_team"
    mode: "fallback"
    label: "Alert team on quality failure"