Data Migration Workflow

8 nodes · 8 edges
ex-data-migration.osop.yaml
# Data Migration Workflow
# Snapshot source, transform schema, load to target, validate, cutover
osop_version: "2.0"
id: data-migration
name: "Data Migration Workflow"

nodes:
  - id: snapshot_source
    type: db
    purpose: Create consistent point-in-time snapshot of source database
    runtime:
      engine: postgresql
      connection: postgresql://source:5432/production
    outputs: [snapshot_id, snapshot_timestamp, table_count, row_counts]
    timeout_sec: 3600
    explain: |
      Uses pg_dump with --serializable-deferrable to take a consistent
      point-in-time snapshot without blocking production writes. The
      snapshot is stored in the S3 staging bucket.
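    # A minimal sketch of the snapshot step this node is assumed to wrap.
    # The pg_dump flags are real; the staging path and bucket are placeholders:
    #   pg_dump --serializable-deferrable --format=directory --jobs=4 \
    #     --file=/staging/snapshot "postgresql://source:5432/production"
    #   aws s3 sync /staging/snapshot "s3://staging-bucket/snapshots/${snapshot_id}/"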

  - id: schema_mapping
    type: agent
    purpose: Generate schema transformation mappings between source and target models
    runtime:
      provider: anthropic
      model: claude-sonnet-4-20250514
      config:
        system_prompt: |
          Compare source and target database schemas. Generate a complete
          mapping of tables, columns, types, constraints, and default values.
          Flag breaking changes and data loss risks.
    inputs: [snapshot_id]
    outputs: [schema_map, breaking_changes, data_loss_warnings]
    timeout_sec: 60
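    # The shape of schema_map is not formally specified in this example; one
    # plausible entry for a single column mapping, shown for illustration:
    #   - source: users.full_name
    #     target: accounts.display_name
    #     type_change: "varchar(255) -> text"
    #     lossy: false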

  - id: review_mappings
    type: human
    purpose: Data engineer reviews schema mappings and resolves ambiguous transformations
    role: data_engineer
    inputs: [schema_map, breaking_changes, data_loss_warnings]
    outputs: [approved_mappings, custom_transforms]
    approval_gate:
      required_approvers: 1
      timeout_min: 480

  - id: transform_data
    type: cli
    purpose: Apply schema transformations and custom business logic to migrated data
    runtime:
      command: |
        python migrate_transform.py \
          --snapshot ${snapshot_id} \
          --mappings approved-mappings.json \
          --custom-transforms custom-transforms.py \
          --output transformed/ \
          --parallel-tables 8
    inputs: [snapshot_id, approved_mappings, custom_transforms]
    outputs: [transformed_tables, transform_errors, row_counts_transformed]
    timeout_sec: 7200
    retry_policy:
      max_retries: 2
      backoff_sec: 60
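    # custom-transforms.py is referenced but not shown; a hypothetical
    # per-row transform hook (the function contract is an assumption):
    #   def transform(table: str, row: dict) -> dict:
    #       if table == "users":
    #           row["email"] = row["email"].lower()  # normalize before load
    #       return row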

  - id: load_target
    type: db
    purpose: Bulk load transformed data into target database with foreign key validation
    runtime:
      engine: postgresql
      connection: postgresql://target:5432/production_v2
    inputs: [transformed_tables]
    outputs: [load_result, loaded_row_counts, load_errors]
    timeout_sec: 7200
    explain: |
      Disables foreign keys during the load for performance, then re-enables
      them and validates referential integrity once the load completes.
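    # A sketch of the FK handling described above, using real PostgreSQL
    # statements (constraint and table names are placeholders):
    #   ALTER TABLE orders DROP CONSTRAINT orders_user_id_fkey;
    #   -- ... bulk COPY into orders ...
    #   ALTER TABLE orders ADD CONSTRAINT orders_user_id_fkey
    #     FOREIGN KEY (user_id) REFERENCES users (id);  -- re-checks every row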

  - id: validate_migration
    type: cli
    purpose: Run comprehensive data validation comparing source snapshot to target
    runtime:
      command: |
        python validate_migration.py \
          --source-snapshot ${snapshot_id} \
          --target-db postgresql://target:5432/production_v2 \
          --checks row_counts,checksums,referential_integrity,business_rules \
          --sample-rate 0.1 \
          --output validation-report.json
    inputs: [snapshot_id, loaded_row_counts]
    outputs: [validation_passed, discrepancies, checksum_matches, validation_report]
    timeout_sec: 3600
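    # One common way to compute the per-table checksums compared here
    # (illustrative; the ordering column and table name are placeholders):
    #   SELECT md5(string_agg(t::text, '' ORDER BY t.id)) FROM users AS t;
    # Run against both the restored snapshot and the target, then diff.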

  - id: cutover_approval
    type: human
    purpose: Technical lead and product owner approve production cutover
    role: tech_lead
    inputs: [validation_report, discrepancies, row_counts_transformed, loaded_row_counts]
    outputs: [cutover_approved]
    approval_gate:
      required_approvers: 2
      timeout_min: 120

  - id: execute_cutover
    type: cli
    purpose: Switch application traffic from source to target database
    runtime:
      command: |
        python cutover.py \
          --source-dsn postgresql://source:5432/production \
          --target-dsn postgresql://target:5432/production_v2 \
          --update-dns \
          --drain-connections \
          --set-source-readonly
    inputs: [cutover_approved]
    outputs: [cutover_status, dns_updated, source_readonly]
    timeout_sec: 300
    security:
      credentials: [DNS_API_KEY, SOURCE_DB_ADMIN, TARGET_DB_ADMIN]
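    # A sketch of the drain/readonly steps cutover.py is assumed to perform
    # (real PostgreSQL statements; the DNS update depends on the provider API):
    #   ALTER DATABASE production SET default_transaction_read_only = on;
    #   SELECT pg_terminate_backend(pid) FROM pg_stat_activity
    #    WHERE datname = 'production' AND pid <> pg_backend_pid();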

edges:
  - from: snapshot_source
    to: schema_mapping
    mode: sequential

  - from: schema_mapping
    to: review_mappings
    mode: sequential

  - from: review_mappings
    to: transform_data
    mode: sequential

  - from: transform_data
    to: load_target
    mode: sequential

  - from: load_target
    to: validate_migration
    mode: sequential

  - from: validate_migration
    to: cutover_approval
    mode: conditional
    condition: "validation_passed == true && discrepancies.length == 0"

  - from: validate_migration
    to: transform_data
    mode: fallback
    label: "Validation failed, fix transforms and reload"

  - from: cutover_approval
    to: execute_cutover
    mode: sequential