# ETL Pipeline Builder

Design and automate Extract, Transform, Load (ETL) data pipelines for data integration and analytics.
## CLAUDE.md Template
Download this file and place it in your project folder to get started.
# ETL Pipeline
Comprehensive workflow for designing and automating Extract, Transform, Load data pipelines.
## Pipeline Architecture
### Core ETL Flow
```
DATA PIPELINE ARCHITECTURE:

┌─────────────────────────────────────────────────────────┐
│                         EXTRACT                         │
├─────────┬─────────┬─────────┬─────────┬─────────────────┤
│ Postgres│  MySQL  │ MongoDB │  APIs   │ Files (CSV/JSON)│
└────┬────┴────┬────┴────┬────┴────┬────┴────────┬────────┘
     │         │         │         │             │
     └─────────┴─────────┴────┬────┴─────────────┘
                              ▼
┌─────────────────────────────────────────────────────────┐
│                        TRANSFORM                        │
│  • Clean & Validate      • Aggregate & Join             │
│  • Normalize             • Calculate Metrics            │
│  • Deduplicate           • Apply Business Rules         │
└────────────────────────┬────────────────────────────────┘
                         ▼
┌─────────────────────────────────────────────────────────┐
│                          LOAD                           │
├─────────────┬─────────────┬─────────────┬───────────────┤
│  BigQuery   │  Snowflake  │  Redshift   │   Data Lake   │
└─────────────┴─────────────┴─────────────┴───────────────┘
```
## Source Connectors
### Database Connections
```yaml
sources:
  postgres:
    type: postgresql
    host: db.example.com
    port: 5432
    database: production
    ssl: true
    extraction:
      method: incremental
      key: updated_at
      batch_size: 10000
  mysql:
    type: mysql
    host: mysql.example.com
    port: 3306
    database: analytics
    extraction:
      method: cdc
      binlog: true
  mongodb:
    type: mongodb
    connection_string: mongodb+srv://...
    database: app_data
    extraction:
      method: change_streams
```
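As a minimal sketch of the `postgres` source's incremental strategy, the loop below pulls rows whose `updated_at` is newer than the previous run's watermark, in batches of 10,000. The connection details mirror the YAML; the table name, watermark value, and credential handling are illustrative assumptions.

```python
import psycopg2

def extract_incremental(conn, table, key="updated_at", watermark=None, batch_size=10000):
    """Yield batches of rows changed since the previous run's watermark."""
    with conn.cursor() as cur:
        cur.execute(f"SELECT * FROM {table} WHERE {key} > %s ORDER BY {key}", (watermark,))
        while True:
            batch = cur.fetchmany(batch_size)
            if not batch:
                break
            yield batch

# Credentials omitted; sslmode="require" matches ssl: true above.
conn = psycopg2.connect(host="db.example.com", port=5432, dbname="production", sslmode="require")
for batch in extract_incremental(conn, "orders", watermark="2024-01-01T00:00:00Z"):
    print(f"extracted {len(batch)} rows")
```

After the final batch, persist the maximum `updated_at` seen so the next run can use it as its watermark.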
### API Sources
```yaml
api_sources:
  stripe:
    type: rest_api
    base_url: https://api.stripe.com/v1
    auth: bearer_token
    endpoints:
      - /charges
      - /customers
      - /subscriptions
    pagination: cursor
    rate_limit: 100/second
  salesforce:
    type: salesforce
    instance_url: https://company.salesforce.com
    auth: oauth2
    objects:
      - Account
      - Opportunity
      - Contact
    bulk_api: true
```
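One way to implement `pagination: cursor` is the sketch below, written against a Stripe-style list endpoint (which returns `data` plus `has_more` and accepts a `starting_after` cursor). The fixed sleep is a crude stand-in for a real rate limiter honoring the 100/second budget.

```python
import time
import requests

def fetch_all(endpoint: str, token: str, base_url: str = "https://api.stripe.com/v1"):
    """Follow Stripe-style cursors until has_more is false, yielding objects."""
    headers = {"Authorization": f"Bearer {token}"}
    params = {"limit": 100}
    while True:
        resp = requests.get(f"{base_url}{endpoint}", headers=headers, params=params, timeout=30)
        resp.raise_for_status()
        page = resp.json()
        yield from page["data"]
        if not page.get("has_more"):
            break
        params["starting_after"] = page["data"][-1]["id"]  # cursor = last object's id
        time.sleep(0.01)  # crude stand-in for the 100/second rate limit

# usage: charges = list(fetch_all("/charges", token="sk_test_..."))
```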
## Transformation Layer
### Common Transformations
```python
# Data-cleaning steps, keyed by step name
transformations = {
    "clean_nulls": {
        "operation": "fill_null",
        "columns": ["email", "phone"],
        "value": "unknown"
    },
    "standardize_dates": {
        "operation": "date_parse",
        "columns": ["created_at", "updated_at"],
        "format": "ISO8601"
    },
    "normalize_currency": {
        "operation": "convert_currency",
        "source_column": "amount",
        "currency_column": "currency",
        "target": "USD"
    },
    "deduplicate": {
        "operation": "distinct",
        "key_columns": ["customer_id", "transaction_id"],
        "keep": "latest"
    }
}
```
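One way to interpret this declarative spec is a small dispatcher over a pandas DataFrame. Only `fill_null` and `distinct` are wired up in this sketch; the other operations are skipped rather than implemented.

```python
import pandas as pd

def apply_transformations(df: pd.DataFrame, spec: dict) -> pd.DataFrame:
    """Apply declarative cleaning steps in order over a DataFrame."""
    for name, step in spec.items():
        op = step["operation"]
        if op == "fill_null":
            df[step["columns"]] = df[step["columns"]].fillna(step["value"])
        elif op == "distinct":
            # keep: "latest" is approximated here as "last occurrence wins"
            df = df.drop_duplicates(subset=step["key_columns"], keep="last")
        else:
            # date_parse / convert_currency are left out of this sketch
            continue
    return df
```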
### Aggregation Rules
```sql
-- Daily Revenue Aggregation
SELECT
    DATE(created_at) AS date,
    product_category,
    COUNT(*) AS transactions,
    SUM(amount) AS total_revenue,
    AVG(amount) AS avg_order_value,
    COUNT(DISTINCT customer_id) AS unique_customers
FROM orders
WHERE created_at >= '${start_date}'
GROUP BY 1, 2
```
### Join Operations
```yaml
joins:
  - name: enrich_orders
    left: orders
    right: customers
    type: left
    on:
      - left: customer_id
        right: id
    select:
      - orders.*
      - customers.email
      - customers.segment
      - customers.lifetime_value
  - name: add_product_details
    left: enriched_orders
    right: products
    type: left
    on:
      - left: product_id
        right: id
```
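The `enrich_orders` join translates directly to a pandas left merge; the column list mirrors the `select` block above.

```python
import pandas as pd

def enrich_orders(orders: pd.DataFrame, customers: pd.DataFrame) -> pd.DataFrame:
    """Left-join customer attributes onto orders, per the enrich_orders spec."""
    cols = customers[["id", "email", "segment", "lifetime_value"]]
    return orders.merge(cols, how="left", left_on="customer_id", right_on="id")
```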
## Load Strategies
### BigQuery Load
```yaml
bigquery_load:
  project: my-project
  dataset: analytics
  table: fact_orders
  schema:
    - name: order_id
      type: STRING
      mode: REQUIRED
    - name: customer_id
      type: STRING
    - name: amount
      type: NUMERIC
    - name: created_at
      type: TIMESTAMP
  load_config:
    write_disposition: WRITE_APPEND
    create_disposition: CREATE_IF_NEEDED
    clustering_fields: [customer_id]
    time_partitioning:
      field: created_at
      type: DAY
```
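With the official `google-cloud-bigquery` client, the same load config can be expressed roughly as below; the locally staged Parquet file is an assumption about how the transform step hands data off.

```python
from google.cloud import bigquery

client = bigquery.Client(project="my-project")
job_config = bigquery.LoadJobConfig(
    source_format=bigquery.SourceFormat.PARQUET,
    write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
    create_disposition=bigquery.CreateDisposition.CREATE_IF_NEEDED,
    clustering_fields=["customer_id"],
    time_partitioning=bigquery.TimePartitioning(
        type_=bigquery.TimePartitioningType.DAY, field="created_at"
    ),
)
with open("fact_orders.parquet", "rb") as f:  # assumed output of the transform step
    job = client.load_table_from_file(
        f, "my-project.analytics.fact_orders", job_config=job_config
    )
job.result()  # blocks until the load job succeeds or raises
```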
### Snowflake Load
```yaml
snowflake_load:
  warehouse: ETL_WH
  database: ANALYTICS
  schema: PUBLIC
  table: FACT_ORDERS
  staging:
    stage: '@MY_STAGE'
    file_format: JSON
  copy_options:
    on_error: CONTINUE
    purge: true
    match_by_column_name: CASE_INSENSITIVE
```
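Using `snowflake-connector-python`, the staged copy might be issued as below; the account and credentials are placeholders, and the COPY options mirror the YAML.

```python
import snowflake.connector

conn = snowflake.connector.connect(
    account="my_account",   # placeholder: real account/credentials required
    user="etl_user",
    password="...",
    warehouse="ETL_WH",
    database="ANALYTICS",
    schema="PUBLIC",
)
conn.cursor().execute(
    """
    COPY INTO FACT_ORDERS
    FROM @MY_STAGE
    FILE_FORMAT = (TYPE = 'JSON')
    MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE
    ON_ERROR = 'CONTINUE'
    PURGE = TRUE
    """
)
```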
## Pipeline Orchestration
### DAG Definition
```yaml
pipeline:
  name: daily_analytics_etl
  schedule: "0 2 * * *"  # 2 AM daily
  tasks:
    - id: extract_orders
      type: extract
      source: postgres
      query: "SELECT * FROM orders WHERE date = '${execution_date}'"
    - id: extract_customers
      type: extract
      source: postgres
      query: "SELECT * FROM customers"
    - id: transform_data
      type: transform
      dependencies: [extract_orders, extract_customers]
      operations:
        - join_customers
        - calculate_metrics
        - apply_business_rules
    - id: load_warehouse
      type: load
      dependencies: [transform_data]
      target: bigquery
      table: fact_orders
    - id: notify_complete
      type: notification
      dependencies: [load_warehouse]
      channel: slack
      message: "Daily ETL completed successfully"
```
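The YAML above is engine-agnostic. If the orchestrator were Airflow, the task graph would map to something like this sketch; the `_stub` callables stand in for the real extract/transform/load logic, and the `schedule=` argument assumes Airflow 2.4+.

```python
import pendulum
from airflow import DAG
from airflow.operators.python import PythonOperator

def _stub(**_):
    """Placeholder for the real extract/transform/load callables."""
    pass

with DAG(
    dag_id="daily_analytics_etl",
    schedule="0 2 * * *",  # 2 AM daily, matching the YAML cron
    start_date=pendulum.datetime(2024, 1, 1, tz="UTC"),
    catchup=False,
) as dag:
    extract_orders = PythonOperator(task_id="extract_orders", python_callable=_stub)
    extract_customers = PythonOperator(task_id="extract_customers", python_callable=_stub)
    transform_data = PythonOperator(task_id="transform_data", python_callable=_stub)
    load_warehouse = PythonOperator(task_id="load_warehouse", python_callable=_stub)
    notify_complete = PythonOperator(task_id="notify_complete", python_callable=_stub)

    # Both extracts fan in to the transform, then load, then notify
    [extract_orders, extract_customers] >> transform_data >> load_warehouse >> notify_complete
```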
### Error Handling
```yaml
error_handling:
  retry:
    max_attempts: 3
    delay_seconds: 300
    exponential_backoff: true
  on_failure:
    - log_error
    - send_alert
    - save_failed_records
  dead_letter:
    enabled: true
    destination: s3://etl-errors/
    retention_days: 30
```
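The retry policy above amounts to a wrapper like this minimal sketch: up to 3 attempts, a 300-second base delay, doubling on each retry when exponential backoff is enabled.

```python
import time

def with_retry(fn, max_attempts=3, delay_seconds=300, exponential_backoff=True):
    """Call fn, retrying on any exception per the error_handling policy."""
    for attempt in range(1, max_attempts + 1):
        try:
            return fn()
        except Exception:
            if attempt == max_attempts:
                raise  # out of attempts: surface the error to the on_failure hooks
            backoff = delay_seconds * (2 ** (attempt - 1) if exponential_backoff else 1)
            time.sleep(backoff)
```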
## Data Quality
### Validation Rules
```yaml
quality_checks:
  - name: null_check
    column: customer_id
    rule: not_null
    severity: error
  - name: range_check
    column: amount
    rule: between
    min: 0
    max: 100000
    severity: warning
  - name: uniqueness
    columns: [order_id]
    rule: unique
    severity: error
  - name: referential_integrity
    column: product_id
    reference_table: products
    reference_column: id
    severity: error
  - name: freshness
    column: updated_at
    rule: max_age_hours
    value: 24
    severity: warning
```
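As an illustration, three of these rules implemented against a pandas DataFrame; each check returns a pass flag plus the number of failing rows, so severity handling can live elsewhere.

```python
import pandas as pd

def null_check(df: pd.DataFrame, column: str):
    """Rule not_null: count rows where the column is missing."""
    failing = int(df[column].isna().sum())
    return failing == 0, failing

def range_check(df: pd.DataFrame, column: str, lo: float, hi: float):
    """Rule between: count rows outside [lo, hi] (NaN counts as failing)."""
    failing = int((~df[column].between(lo, hi)).sum())
    return failing == 0, failing

def uniqueness(df: pd.DataFrame, columns: list):
    """Rule unique: count duplicate rows over the key columns."""
    failing = int(df.duplicated(subset=columns).sum())
    return failing == 0, failing

# usage: ok, n_bad = range_check(orders, "amount", 0, 100_000)
```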
### Quality Metrics Dashboard
```
DATA QUALITY REPORT - ${date}
═══════════════════════════════════════
Total Records Processed: 1,250,000
Passed Validation:       1,247,500 (99.8%)
Failed Validation:           2,500 (0.2%)

ISSUES BY TYPE:
┌─────────────────┬────────┬──────────┐
│ Issue Type      │ Count  │ Severity │
├─────────────────┼────────┼──────────┤
│ Null values     │  1,200 │ Warning  │
│ Invalid format  │    850 │ Error    │
│ Out of range    │    300 │ Warning  │
│ Duplicates      │    150 │ Error    │
└─────────────────┴────────┴──────────┘
```
## Monitoring & Alerting
### Pipeline Metrics
```yaml
metrics:
  - name: pipeline_duration
    type: gauge
    labels: [pipeline_name, status]
  - name: records_processed
    type: counter
    labels: [pipeline_name, source, destination]
  - name: error_count
    type: counter
    labels: [pipeline_name, error_type]
  - name: data_freshness
    type: gauge
    labels: [table_name]
```
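Declared with `prometheus_client`, the same four metrics would look like this sketch; the gauge/counter types and label sets mirror the YAML, while the help strings are illustrative.

```python
from prometheus_client import Counter, Gauge

PIPELINE_DURATION = Gauge(
    "pipeline_duration", "Pipeline run time in seconds", ["pipeline_name", "status"]
)
RECORDS_PROCESSED = Counter(
    "records_processed", "Rows moved end to end", ["pipeline_name", "source", "destination"]
)
ERROR_COUNT = Counter(
    "error_count", "Task errors by type", ["pipeline_name", "error_type"]
)
DATA_FRESHNESS = Gauge(
    "data_freshness", "Hours since last successful load", ["table_name"]
)

# usage: RECORDS_PROCESSED.labels("daily_analytics_etl", "postgres", "bigquery").inc(10000)
```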
### Alert Configuration
```yaml
alerts:
  - name: pipeline_failed
    condition: status == 'failed'
    channels: [pagerduty, slack]
  - name: high_error_rate
    condition: error_rate > 0.05
    channels: [slack]
  - name: slow_pipeline
    condition: duration > 2 * avg_duration
    channels: [slack]
  - name: data_freshness
    condition: freshness_hours > 24
    channels: [email]
```
## Best Practices
1. **Incremental Loading**: Use incremental extraction when possible
2. **Idempotency**: Ensure pipelines can be re-run safely (see the sketch after this list)
3. **Partitioning**: Partition large tables by date
4. **Monitoring**: Track pipeline health metrics
5. **Documentation**: Document all transformations
6. **Testing**: Test with sample data before production
7. **Version Control**: Track pipeline changes in git
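A minimal sketch of practice 2: deleting the target date partition and re-inserting inside one transaction makes a daily load safe to re-run, since a repeated run replaces the partition instead of duplicating it. A psycopg2-style connection is assumed; the table and columns are illustrative.

```python
def load_partition(conn, table, partition_date, rows):
    """Replace one date partition atomically so re-runs don't duplicate rows."""
    with conn:  # psycopg2: commits on success, rolls back on error
        with conn.cursor() as cur:
            cur.execute(f"DELETE FROM {table} WHERE date = %s", (partition_date,))
            cur.executemany(
                f"INSERT INTO {table} (date, order_id, amount) VALUES (%s, %s, %s)",
                rows,
            )
```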
## README.md

### What This Does
Comprehensive workflow for designing and automating Extract, Transform, Load data pipelines.
### Quick Start

**Step 1: Create a Project Folder**

```bash
mkdir -p ~/Documents/EtlPipeline
```

**Step 2: Download the Template**

Click Download above, then:

```bash
mv ~/Downloads/CLAUDE.md ~/Documents/EtlPipeline/
```

**Step 3: Start Working**

```bash
cd ~/Documents/EtlPipeline
claude
```
### Best Practices
- Incremental Loading: Use incremental extraction when possible
- Idempotency: Ensure pipelines can be re-run safely
- Partitioning: Partition large tables by date
- Monitoring: Track pipeline health metrics
- Documentation: Document all transformations
- Testing: Test with sample data before production
- Version Control: Track pipeline changes in git