資料遷移工作流程
Data · 8 個節點 · 8 條連接 · data
視覺化
ex-data-migration.osop.yaml
# Data Migration Workflow
# Snapshot source, transform schema, load to target, validate, cutover
#
# Pipeline overview (eight nodes, linear apart from one fallback loop):
#   snapshot_source -> schema_mapping -> review_mappings -> transform_data
#   -> load_target -> validate_migration -> cutover_approval -> execute_cutover
# A failed validation routes back to transform_data through a fallback edge.
osop_version: "2.0"
id: data-migration
# Display name (Traditional Chinese for "Data Migration Workflow").
name: "資料遷移工作流程"
# Workflow steps; the edges section defines the order in which they run.
nodes:
# Step 1: take a point-in-time snapshot of the source database.
# Produces snapshot_id, which later steps use to locate the snapshot.
- id: snapshot_source
  type: db
  purpose: Create consistent point-in-time snapshot of source database
  runtime:
    engine: postgresql
    # Quoted so the embedded colons can never be re-interpreted by a parser.
    connection: "postgresql://source:5432/production"
  outputs: [snapshot_id, snapshot_timestamp, table_count, row_counts]
  # 3600 s = 1 hour budget for the snapshot.
  timeout_sec: 3600
  explain: |
    Uses pg_dump with --serializable-deferrable for consistent snapshot
    without blocking production writes. Snapshot stored in S3 staging bucket.
# Step 2: an LLM agent proposes source-to-target schema mappings and
# reports breaking changes and data-loss warnings for human review.
- id: schema_mapping
  type: agent
  purpose: Generate schema transformation mappings between source and target models
  runtime:
    provider: anthropic
    model: claude-sonnet-4-20250514
    # NOTE(review): `config` is placed under `runtime` here; the original
    # indentation was lost, so confirm this placement against the OSOP schema.
    config:
      system_prompt: |
        Compare source and target database schemas. Generate a complete
        mapping of tables, columns, types, constraints, and default values.
        Flag breaking changes and data loss risks.
  # NOTE(review): only snapshot_id is passed in, yet the prompt asks for a
  # source/target schema comparison - confirm how the agent obtains both schemas.
  inputs: [snapshot_id]
  outputs: [schema_map, breaking_changes, data_loss_warnings]
  timeout_sec: 60
# Step 3: human checkpoint - a data engineer reviews the generated mappings
# and supplies the approved mappings plus any custom transforms.
- id: review_mappings
  type: human
  purpose: Data engineer reviews schema mappings and resolves ambiguous transformations
  role: data_engineer
  inputs: [schema_map, breaking_changes, data_loss_warnings]
  outputs: [approved_mappings, custom_transforms]
  approval_gate:
    required_approvers: 1
    # 480 min = 8 hours to complete the review.
    timeout_min: 480
# Step 4: apply the approved mappings and custom transforms to the snapshot.
# Also the target of the fallback edge taken when validation fails.
- id: transform_data
  type: cli
  purpose: Apply schema transformations and custom business logic to migrated data
  runtime:
    # Literal block scalar: newlines are kept so the trailing backslashes
    # act as shell line continuations.
    # NOTE(review): the command reads approved-mappings.json and
    # custom-transforms.py from disk - presumably materialised from the
    # approved_mappings / custom_transforms inputs; confirm.
    command: |
      python migrate_transform.py \
      --snapshot ${snapshot_id} \
      --mappings approved-mappings.json \
      --custom-transforms custom-transforms.py \
      --output transformed/ \
      --parallel-tables 8
  inputs: [snapshot_id, approved_mappings, custom_transforms]
  outputs: [transformed_tables, transform_errors, row_counts_transformed]
  # 7200 s = 2 hours per attempt.
  timeout_sec: 7200
  retry_policy:
    max_retries: 2
    backoff_sec: 60
# Step 5: bulk-load the transformed tables into the target database.
- id: load_target
  type: db
  purpose: Bulk load transformed data into target database with foreign key validation
  runtime:
    engine: postgresql
    # Quoted so the embedded colons can never be re-interpreted by a parser.
    connection: "postgresql://target:5432/production_v2"
  inputs: [transformed_tables]
  outputs: [load_result, loaded_row_counts, load_errors]
  # 7200 s = 2 hours for the load.
  timeout_sec: 7200
  explain: |
    Disables foreign keys during load for performance, then re-enables
    and validates referential integrity after load completes.
# Step 6: compare the source snapshot with the loaded target and emit a
# validation report. The validation_passed and discrepancies outputs drive
# the conditional edge to cutover_approval.
- id: validate_migration
  type: cli
  purpose: Run comprehensive data validation comparing source snapshot to target
  runtime:
    # NOTE(review): the target DSN is hard-coded in this command as well as in
    # the load_target connection; keep the two in sync when either changes.
    command: |
      python validate_migration.py \
      --source-snapshot ${snapshot_id} \
      --target-db postgresql://target:5432/production_v2 \
      --checks row_counts,checksums,referential_integrity,business_rules \
      --sample-rate 0.1 \
      --output validation-report.json
  inputs: [snapshot_id, loaded_row_counts]
  outputs: [validation_passed, discrepancies, checksum_matches, validation_report]
  # 3600 s = 1 hour for validation.
  timeout_sec: 3600
# Step 7: human checkpoint before the production cutover; two approvals
# are required.
- id: cutover_approval
  type: human
  purpose: Technical lead and product owner approve production cutover
  # NOTE(review): purpose names two approver roles (technical lead and product
  # owner) but only tech_lead is listed - confirm how the second approver is
  # selected.
  role: tech_lead
  inputs: [validation_report, discrepancies, row_counts_transformed, loaded_row_counts]
  outputs: [cutover_approved]
  approval_gate:
    required_approvers: 2
    # 120 min = 2 hours to collect both approvals.
    timeout_min: 120
# Step 8: switch application traffic from the source to the target database.
# The command updates DNS, drains connections and sets the source read-only.
- id: execute_cutover
  type: cli
  purpose: Switch application traffic from source to target database
  runtime:
    # Literal block scalar: newlines are kept so the trailing backslashes
    # act as shell line continuations.
    command: |
      python cutover.py \
      --source-dsn postgresql://source:5432/production \
      --target-dsn postgresql://target:5432/production_v2 \
      --update-dns \
      --drain-connections \
      --set-source-readonly
  inputs: [cutover_approved]
  outputs: [cutover_status, dns_updated, source_readonly]
  # 300 s = 5 minutes for the whole cutover.
  timeout_sec: 300
  security:
    # Names of the credentials this step declares it needs.
    credentials: [DNS_API_KEY, SOURCE_DB_ADMIN, TARGET_DB_ADMIN]
# Execution graph: a linear pipeline with two guarded transitions and one
# fallback loop from validation back to the transform step.
edges:
- from: snapshot_source
  to: schema_mapping
  mode: sequential
- from: schema_mapping
  to: review_mappings
  mode: sequential
- from: review_mappings
  to: transform_data
  mode: sequential
- from: transform_data
  to: load_target
  mode: sequential
- from: load_target
  to: validate_migration
  mode: sequential
# Proceed to the approval step only on a clean validation run.
- from: validate_migration
  to: cutover_approval
  mode: conditional
  condition: "validation_passed == true && discrepancies.length == 0"
# Otherwise loop back so the transforms can be fixed and the data reloaded.
- from: validate_migration
  to: transform_data
  mode: fallback
  label: "Validation failed, fix transforms and reload"
# Gate the cutover on the approval output explicitly: a plain sequential edge
# would not check cutover_approved before switching production traffic.
# NOTE(review): if the engine's approval_gate already blocks on rejection,
# this condition is redundant but harmless - confirm against the OSOP runtime.
- from: cutover_approval
  to: execute_cutover
  mode: conditional
  condition: "cutover_approved == true"