
Data Pipeline Builder

Data pipeline and ETL automation - extract, transform, load workflows for data integration and analytics

10 minutes · By community
Tags: #etl #data-pipeline #integration #analytics #automation

Your data lives in five different systems, and getting it into one place for analysis means a brittle chain of manual exports, Python scripts, and cron jobs that break every other week.

Who it's for: data engineers building integration workflows, analytics teams connecting data sources, startups setting up their first data infrastructure, BI teams automating report pipelines, companies migrating from manual data processes

Example

"Build an ETL pipeline from our Stripe and HubSpot data into PostgreSQL" → Extract, transform, load workflows with scheduling, error handling, data validation, and incremental loading — based on proven n8n workflow patterns

CLAUDE.md Template


# Data Pipeline

Build data pipelines and ETL workflows for data integration, transformation, and analytics automation. Based on n8n's data workflow templates.

## Overview

This workflow covers:
- Data extraction from multiple sources
- Transformation and cleaning
- Loading to destinations
- Scheduling and monitoring
- Error handling and alerts

---

## ETL Patterns

### Basic ETL Flow

```
┌─────────────┐    ┌─────────────┐    ┌─────────────┐
│   EXTRACT   │───▶│  TRANSFORM  │───▶│    LOAD     │
│             │    │             │    │             │
│ • APIs      │    │ • Clean     │    │ • Database  │
│ • Databases │    │ • Map       │    │ • Warehouse │
│ • Files     │    │ • Aggregate │    │ • Files     │
│ • Webhooks  │    │ • Enrich    │    │ • APIs      │
└─────────────┘    └─────────────┘    └─────────────┘
```
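
As a minimal sketch of the flow above, the three stages can be written as plain functions and composed. The names `runPipeline`, `source`, and `destination` are illustrative assumptions, not part of n8n, and the in-memory arrays stand in for real systems.

```javascript
// Minimal ETL sketch: each stage is a plain function so it can be tested alone.
// All names here are hypothetical and only illustrate the diagram above.
function runPipeline({ extract, transform, load }) {
  const raw = extract();          // EXTRACT: pull records from a source
  const clean = transform(raw);   // TRANSFORM: clean, map, aggregate, enrich
  return load(clean);             // LOAD: write to a destination
}

// In-memory stand-ins for a real source and destination.
const source = [{ id: 1, amount: '19.90' }, { id: 2, amount: '5.00' }];
const destination = [];

const loadedCount = runPipeline({
  extract: () => source,
  transform: rows => rows.map(r => ({ id: r.id, amount: parseFloat(r.amount) })),
  load: rows => { destination.push(...rows); return rows.length; },
});
```

Keeping the stages separate makes it easier to swap a source or destination without touching the transform logic.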

### n8n ETL Workflow

```yaml
workflow: "Daily Sales ETL"
schedule: "2am daily"

nodes:
  # EXTRACT
  - name: "Extract from Shopify"
    type: shopify
    action: get_orders
    filter: created_at >= yesterday
    
  - name: "Extract from Stripe"
    type: stripe
    action: get_payments
    filter: created >= yesterday
    
  # TRANSFORM
  - name: "Merge Data"
    type: merge
    mode: combine_by_key
    key: order_id
    
  - name: "Transform"
    type: code
    code: |
      return items.map(item => ({
        date: item.created_at.split('T')[0],
        order_id: item.id,
        customer_email: item.email,
        total: parseFloat(item.total_price),
        currency: item.currency,
        items: item.line_items.length,
        source: item.source_name,
        payment_status: item.payment?.status ?? 'unknown'
      }));
      
  # LOAD
  - name: "Load to BigQuery"
    type: google_bigquery
    action: insert_rows
    table: sales_daily
    
  - name: "Update Google Sheets"
    type: google_sheets
    action: append_rows
    spreadsheet: "Daily Sales Report"
```
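
The "Merge Data" step above joins orders and payments on `order_id`. A rough sketch of that join in plain JavaScript follows. It assumes each Stripe payment stores the Shopify order ID in `metadata.order_id`; `mergeByOrderId` is a hypothetical helper, not an n8n function.

```javascript
// Join Shopify orders with Stripe payments on the order ID.
// Assumption: each payment stores the Shopify order ID in metadata.order_id.
function mergeByOrderId(orders, payments) {
  const paymentsByOrder = new Map(
    payments.map(p => [String(p.metadata?.order_id), p])
  );
  return orders.map(order => ({
    ...order,
    // Orders without a matching payment get payment: null instead of failing.
    payment: paymentsByOrder.get(String(order.id)) ?? null,
  }));
}

const merged = mergeByOrderId(
  [{ id: 1001, email: 'a@example.com' }, { id: 1002, email: 'b@example.com' }],
  [{ id: 'pi_1', status: 'succeeded', metadata: { order_id: '1001' } }]
);
```

Normalizing both IDs to strings avoids silent mismatches when one system returns numbers and the other returns strings.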

---

## Data Sources

### Common Extractors

```yaml
extractors:
  databases:
    - postgresql:
        connection: connection_string
        query: "SELECT * FROM orders WHERE date >= $1"
        
    - mysql:
        connection: connection_string
        query: custom_sql
        
    - mongodb:
        connection: connection_string
        collection: orders
        filter: {date: {$gte: yesterday}}
        
  apis:
    - rest_api:
        url: "https://api.example.com/data"
        method: GET
        headers: {Authorization: "Bearer {token}"}
        pagination: handle_automatically
        
    - graphql:
        url: "https://api.example.com/graphql"
        query: graphql_query
        
  files:
    - csv:
        source: sftp/s3/google_drive
        delimiter: ","
        encoding: utf-8
        
    - excel:
        source: file_path
        sheet: "Sheet1"
        
    - json:
        source: api/file
        path: "data.items"
        
  saas:
    - salesforce: get_objects
    - hubspot: get_contacts/deals
    - stripe: get_charges
    - shopify: get_orders
```
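
The `pagination: handle_automatically` setting above hides a loop that is worth understanding. Here is a minimal sketch of cursor-based pagination, assuming a hypothetical `fetchPage(cursor)` function that resolves to `{ items, nextCursor }`. Real APIs use different response shapes (page numbers, `Link` headers, `has_more` flags), so treat this as a template to adapt.

```javascript
// Collect all records from a cursor-paginated source.
// Assumption: fetchPage(cursor) resolves to { items, nextCursor }, where
// nextCursor is null on the last page. Real APIs differ; adapt the shape.
async function fetchAllPages(fetchPage, maxPages = 1000) {
  const all = [];
  let cursor = null;
  for (let page = 0; page < maxPages; page++) {
    const { items, nextCursor } = await fetchPage(cursor);
    all.push(...items);
    if (nextCursor == null) return all;
    cursor = nextCursor;
  }
  // Guard against an API that never stops returning a cursor.
  throw new Error(`Stopped after ${maxPages} pages; possible pagination loop`);
}

// Fake two-page source for illustration.
const pages = {
  start: { items: [1, 2], nextCursor: 'p2' },
  p2: { items: [3], nextCursor: null },
};
const fakeFetch = async cursor => pages[cursor ?? 'start'];
```

Calling `fetchAllPages(fakeFetch)` resolves to the items of both pages in order.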

---

## Transformations

### Common Transformations

```yaml
transformations:
  cleaning:
    - remove_nulls: drop_or_fill
    - trim_whitespace: all_string_fields
    - deduplicate: by_key
    - validate: against_schema
    
  mapping:
    - rename_fields: {old_name: new_name}
    - convert_types: {date_string: date}
    - map_values: {status_code: status_name}
    
  aggregation:
    - group_by: [date, category]
    - sum: [revenue, quantity]
    - count: orders
    - average: order_value
    
  enrichment:
    - lookup: from_reference_table
    - geocode: from_address
    - calculate: derived_fields
    
  filtering:
    - where: condition
    - limit: n_rows
    - sample: percentage
```
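
Two of the cleaning steps listed above, `trim_whitespace` and `deduplicate`, can be sketched in a few lines. The helper names `trimStrings` and `dedupeByKey` are illustrative, and keeping the last record per key is an assumed policy (you may prefer the first).

```javascript
// Trim every string field on a record (trim_whitespace: all_string_fields).
function trimStrings(record) {
  return Object.fromEntries(
    Object.entries(record).map(([k, v]) => [k, typeof v === 'string' ? v.trim() : v])
  );
}

// Keep the last record seen for each key (deduplicate: by_key).
function dedupeByKey(records, key) {
  const byKey = new Map();
  for (const record of records) byKey.set(record[key], record);
  return [...byKey.values()];
}

const cleaned = dedupeByKey(
  [
    { id: 1, name: '  Ada ' },
    { id: 2, name: 'Grace' },
    { id: 1, name: 'Ada Lovelace ' },
  ].map(trimStrings),
  'id'
);
```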

### Code Transform Examples

```javascript
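// Example status lookup (hypothetical codes; replace with your own)
const statusMap = { 1: 'active', 2: 'pending', 3: 'closed' };

// Clean and normalize data
function transform(items) {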
  return items.map(item => ({
    // Clean strings
    name: item.name?.trim().toLowerCase(),
    
    // Parse dates
    date: new Date(item.created_at).toISOString().split('T')[0],
    
    // Convert types
    amount: parseFloat(item.amount) || 0,
    
    // Map values
    status: statusMap[item.status_code] || 'unknown',
    
    // Calculate fields
    total: item.quantity * item.unit_price,
    
    // Filter nested
    tags: item.tags?.filter(t => t.active).map(t => t.name),
    
    // Default values
    source: item.source || 'direct'
  }));
}

// Aggregate data
function aggregate(items) {
  const grouped = {};
  
  items.forEach(item => {
    const key = `${item.date}_${item.category}`;
    if (!grouped[key]) {
      grouped[key] = {
        date: item.date,
        category: item.category,
        total_revenue: 0,
        order_count: 0
      };
    }
    // Coerce to a number so string amounts are summed, not concatenated
    grouped[key].total_revenue += Number(item.amount) || 0;
    grouped[key].order_count += 1;
  });
  
  return Object.values(grouped);
}
```

---

## Data Destinations

### Common Loaders

```yaml
loaders:
  data_warehouses:
    - bigquery:
        project: project_id
        dataset: analytics
        table: sales
        write_mode: append/truncate
        
    - snowflake:
        account: account_id
        warehouse: compute_wh
        database: analytics
        schema: public
        
    - redshift:
        cluster: cluster_id
        database: analytics
        
  databases:
    - postgresql:
        upsert: on_conflict_update
        
    - mysql:
        batch_insert: 1000_rows
        
  files:
    - s3:
        bucket: data-lake
        path: /processed/{date}/
        format: parquet
        
    - google_cloud_storage:
        bucket: data-bucket
        
  spreadsheets:
    - google_sheets:
        mode: append/overwrite
        
    - airtable:
        base: base_id
        table: table_name
        
  apis:
    - webhook:
        url: destination_url
        batch_size: 100
```
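
Settings such as `batch_insert: 1000_rows` and `batch_size: 100` above imply splitting rows into chunks before loading. A minimal sketch, with `toBatches` as a hypothetical helper name:

```javascript
// Split rows into fixed-size batches (e.g. batch_size: 100 for a webhook).
function toBatches(rows, batchSize) {
  if (!Number.isInteger(batchSize) || batchSize < 1) {
    throw new RangeError('batchSize must be a positive integer');
  }
  const batches = [];
  for (let i = 0; i < rows.length; i += batchSize) {
    batches.push(rows.slice(i, i + batchSize));
  }
  return batches;
}

const rows = Array.from({ length: 250 }, (_, i) => ({ id: i + 1 }));
const batches = toBatches(rows, 100); // three batches: 100, 100, and 50 rows
```

Each batch can then be sent to the destination in its own request or insert statement, which keeps a single failure from discarding the whole load.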

---

## Scheduling & Monitoring

### Pipeline Scheduling

```yaml
scheduling:
  patterns:
    hourly:
      cron: "0 * * * *"
      use_for: real_time_dashboards
      
    daily:
      cron: "0 2 * * *"
      use_for: daily_reports
      
    weekly:
      cron: "0 3 * * 1"
      use_for: weekly_summaries
      
    on_demand:
      trigger: webhook/manual
      use_for: ad_hoc_analysis
      
  dependencies:
    - pipeline_a: must_complete_before pipeline_b
    - wait_for: all_extracts_complete
    
  retries:
    max_attempts: 3
    delay: exponential_backoff
    alert_on: final_failure
```
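
The `retries` settings above (three attempts, exponential backoff, alert on final failure) could be implemented roughly as follows. The names `withRetries`, `backoffDelayMs`, and `onFinalFailure`, and the one-second base delay, are assumptions made for this sketch.

```javascript
// Delay before retry number `attempt` (1-based): base, 2x base, 4x base, ...
function backoffDelayMs(attempt, baseMs = 1000) {
  return baseMs * 2 ** (attempt - 1);
}

// Run an async task up to maxAttempts times, waiting between failures.
// onFinalFailure stands in for the alert_on: final_failure hook.
async function withRetries(task, { maxAttempts = 3, baseMs = 1000, onFinalFailure } = {}) {
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      return await task(attempt);
    } catch (err) {
      if (attempt === maxAttempts) {
        if (onFinalFailure) onFinalFailure(err);
        throw err;
      }
      const delay = backoffDelayMs(attempt, baseMs);
      await new Promise(resolve => setTimeout(resolve, delay));
    }
  }
}
```

In production you would typically add random jitter to the delay so that many pipelines failing at once do not retry in lockstep.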

### Monitoring & Alerts

```yaml
monitoring:
  metrics:
    - rows_processed
    - execution_time
    - error_count
    - data_freshness
    
  alerts:
    pipeline_failed:
      channels: [slack, pagerduty]
      template: |
        🚨 *Pipeline Failed*
        
        Pipeline: {pipeline_name}
        Stage: {failed_stage}
        Error: {error_message}
        
        [View Logs]({logs_url})
        
    data_quality:
      trigger: anomaly_detected
      conditions:
        - row_count: differs_by > 50%
        - null_rate: exceeds_threshold
        - schema: changed_unexpectedly
        
    stale_data:
      trigger: last_update > threshold
      threshold: 2_hours
```
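
Two of the alert conditions above, stale data and a row count that differs by more than 50%, reduce to simple comparisons. The sketch below uses hypothetical helper names and treats "differs by" as relative change against a baseline count, which is one reasonable reading of the config.

```javascript
// True when the last successful update is older than the threshold
// (default: 2 hours, matching threshold: 2_hours above).
function isStale(lastUpdate, now, thresholdMs = 2 * 60 * 60 * 1000) {
  return now.getTime() - lastUpdate.getTime() > thresholdMs;
}

// True when the current row count differs from the baseline by more than
// maxChange (0.5 = 50%). A zero baseline flags any non-zero count.
function rowCountAnomaly(current, baseline, maxChange = 0.5) {
  if (baseline === 0) return current !== 0;
  return Math.abs(current - baseline) / baseline > maxChange;
}
```

For example, `rowCountAnomaly(40, 100)` flags a 60% drop, while `rowCountAnomaly(60, 100)` does not.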

---

## Data Quality

### Quality Checks

```yaml
data_quality:
  schema_validation:
    - required_fields: [id, date, amount]
    - field_types:
        id: integer
        date: date
        amount: number
    - allowed_values:
        status: [active, pending, closed]
        
  statistical_checks:
    - null_rate: < 5%
    - duplicate_rate: < 1%
    - value_range:
        amount: [0, 1000000]
        
  business_rules:
    - total_equals_sum_of_line_items
    - dates_are_not_in_future
    - email_format_valid
    
  trend_analysis:
    - row_count: within_2_std_of_mean
    - total_value: within_expected_range
```
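
A few of the checks above can be sketched as small pure functions. The helper names and option shapes are assumptions for illustration, and `withinStdDev` uses the population standard deviation of a history array that you would supply from past runs.

```javascript
// Return a list of human-readable problems for one record; empty means valid.
function validateRecord(record, { requiredFields, allowedValues = {} }) {
  const problems = [];
  for (const field of requiredFields) {
    if (record[field] == null) problems.push(`missing ${field}`);
  }
  for (const [field, allowed] of Object.entries(allowedValues)) {
    if (record[field] != null && !allowed.includes(record[field])) {
      problems.push(`invalid ${field}: ${record[field]}`);
    }
  }
  return problems;
}

// Fraction of records where the field is null or undefined.
function nullRate(records, field) {
  if (records.length === 0) return 0;
  return records.filter(r => r[field] == null).length / records.length;
}

// True when value lies within k population standard deviations of the
// mean of history (row_count: within_2_std_of_mean uses k = 2).
function withinStdDev(value, history, k = 2) {
  const mean = history.reduce((a, b) => a + b, 0) / history.length;
  const variance = history.reduce((a, b) => a + (b - mean) ** 2, 0) / history.length;
  return Math.abs(value - mean) <= k * Math.sqrt(variance);
}
```

A pipeline might run `validateRecord` on every row, then fail the load if `nullRate` or the trend check crosses its threshold.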

---

## Output Example

**Request**: "Create a daily sales data pipeline"

**Output**:

````markdown
# Daily Sales Data Pipeline

## Pipeline Overview

```
Shopify + Stripe → Transform → BigQuery + Sheets
```

## Schedule
- Runs: 2am daily
- Timezone: UTC
- Retry: 3 attempts

## Extract

### Shopify Orders
```yaml
source: shopify
filter: created_at >= yesterday
fields: [id, email, total_price, line_items, created_at]
```

### Stripe Payments
```yaml
source: stripe
filter: created >= yesterday
fields: [id, amount, status, metadata.order_id]
```

## Transform

```javascript
// Join and clean data: one output row per order and its matched payment
const row = {
  date: order.created_at.split('T')[0],
  order_id: order.id,
  customer: order.email,
  revenue: parseFloat(order.total_price),
  items: order.line_items.length,
  payment_status: payment.status
};
```

## Load

### BigQuery
- Table: `analytics.sales_daily`
- Mode: Append

### Google Sheets
- Sheet: "Daily Sales Dashboard"
- Tab: "Raw Data"

## Quality Checks
- [ ] Row count > 0
- [ ] No null order_ids
- [ ] Revenue sum matches Stripe

## Alerts
- Slack: #data-alerts
- On failure: @data-team
````

---

*Data Pipeline Workflow - Part of Claude Code*
README.md


Quick Start

Step 1: Create a Project Folder

mkdir -p ~/Documents/DataPipeline

Step 2: Download the Template

Click Download above, then:

mv ~/Downloads/CLAUDE.md ~/Documents/DataPipeline/

Step 3: Start Working

cd ~/Documents/DataPipeline
claude
