
Data Engineering

Building scalable data pipelines, ETL processes, and analytics infrastructure

Data engineering focuses on building systems and infrastructure for collecting, storing, and analyzing data at scale.

Core Concepts

ETL vs ELT

ETL (Extract, Transform, Load)

// Traditional ETL: Transform before loading
async function etlPipeline() {
  // 1. Extract
  const rawData = await extractFromSource()
  
  // 2. Transform
  const transformedData = transform(rawData)
  
  // 3. Load
  await loadToWarehouse(transformedData)
}

function transform(data: RawData[]): TransformedData[] {
  return data.map(row => ({
    id: row.user_id,
    name: `${row.first_name} ${row.last_name}`,
    email: row.email.toLowerCase(),
    createdAt: new Date(row.created_at),
    // Data cleaning and transformation
    revenue: parseFloat(row.revenue) || 0
  }))
}

ELT (Extract, Load, Transform)

-- Modern ELT: Load raw data, transform in warehouse
-- 1. Extract & Load raw data to warehouse
CREATE TABLE raw_users AS
SELECT * FROM source_database.users;

-- 2. Transform using SQL/dbt
CREATE TABLE transformed_users AS
SELECT
  user_id as id,
  CONCAT(first_name, ' ', last_name) as name,
  LOWER(email) as email,
  CAST(created_at AS TIMESTAMP) as created_at,
  COALESCE(CAST(revenue AS FLOAT), 0) as revenue
FROM raw_users
WHERE created_at >= CURRENT_DATE - INTERVAL '30 days';

Data Pipeline Architecture

Batch Processing

// Batch processing pipeline
import { S3, Glue, Athena } from 'aws-sdk'

class BatchPipeline {
  async processDailyData(date: string) {
    // 1. Extract data from source
    const rawData = await this.extractFromDatabase(date)
    
    // 2. Store in data lake
    await this.storeInS3(rawData, `raw/${date}/data.parquet`)
    
    // 3. Transform with Spark/Glue
    await this.runGlueJob({
      name: 'transform-user-data',
      arguments: {
        '--input': `s3://bucket/raw/${date}/`,
        '--output': `s3://bucket/processed/${date}/`
      }
    })
    
    // 4. Update data catalog
    await this.updateGlueCatalog('processed_users')
    
    // 5. Run analytics queries
    await this.runAthenaQuery(`
      SELECT date, COUNT(*) as user_count
      FROM processed_users
      WHERE date = '${date}'
      GROUP BY date
    `)
  }
}

Stream Processing

// Stream processing with Kafka
import { Kafka } from 'kafkajs'

class StreamPipeline {
  private kafka: Kafka
  
  async processRealTimeEvents() {
    const consumer = this.kafka.consumer({ groupId: 'analytics' })
    
    await consumer.connect()
    await consumer.subscribe({ topic: 'user-events' })
    
    await consumer.run({
      eachMessage: async ({ topic, partition, message }) => {
        if (!message.value) return // kafkajs message values can be null
        const event = JSON.parse(message.value.toString())
        
        // Transform in real-time
        const transformed = await this.transform(event)
        
        // Write to multiple sinks
        await Promise.all([
          this.writeToDataWarehouse(transformed),
          this.updateMetrics(transformed),
          this.triggerAlerts(transformed)
        ])
      }
    })
  }
  
  private async transform(event: RawEvent): Promise<TransformedEvent> {
    return {
      userId: event.user_id,
      eventType: event.event_type,
      timestamp: new Date(event.timestamp),
      properties: this.enrichProperties(event.properties),
      sessionId: await this.getSessionId(event.user_id)
    }
  }
}

Data Modeling

Star Schema

-- Fact table: Events/Transactions
CREATE TABLE fact_orders (
  order_id BIGINT PRIMARY KEY,
  user_id BIGINT,
  product_id BIGINT,
  date_id INT,
  quantity INT,
  revenue DECIMAL(10,2),
  
  -- Foreign keys to dimension tables
  FOREIGN KEY (user_id) REFERENCES dim_users(user_id),
  FOREIGN KEY (product_id) REFERENCES dim_products(product_id),
  FOREIGN KEY (date_id) REFERENCES dim_date(date_id)
);

-- Dimension table: Users
CREATE TABLE dim_users (
  user_id BIGINT PRIMARY KEY,
  email VARCHAR(255),
  name VARCHAR(255),
  country VARCHAR(100),
  subscription_tier VARCHAR(50)
);

-- Dimension table: Products
CREATE TABLE dim_products (
  product_id BIGINT PRIMARY KEY,
  name VARCHAR(255),
  category VARCHAR(100),
  price DECIMAL(10,2)
);

-- Dimension table: Date
CREATE TABLE dim_date (
  date_id INT PRIMARY KEY,
  date DATE,
  day_of_week VARCHAR(20),
  month VARCHAR(20),
  quarter INT,
  year INT,
  is_weekend BOOLEAN
);
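
A typical star-schema query joins the fact table to the dimensions it needs and aggregates the fact measures. A minimal sketch against the tables defined above:

-- Revenue by product category and month for weekday orders
SELECT
  d.year,
  d.month,
  p.category,
  SUM(f.revenue) AS total_revenue,
  COUNT(*) AS order_count
FROM fact_orders f
JOIN dim_products p ON f.product_id = p.product_id
JOIN dim_date d ON f.date_id = d.date_id
WHERE d.is_weekend = FALSE
GROUP BY d.year, d.month, p.category
ORDER BY d.year, d.month, total_revenue DESC;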

Slowly Changing Dimensions (SCD)

-- Type 2 SCD: Track history with effective dates
CREATE TABLE dim_users_scd (
  user_sk BIGINT PRIMARY KEY, -- Surrogate key
  user_id BIGINT, -- Natural key
  email VARCHAR(255),
  name VARCHAR(255),
  subscription_tier VARCHAR(50),
  effective_date DATE,
  end_date DATE,
  is_current BOOLEAN
);

-- Insert new record on change
INSERT INTO dim_users_scd (
  user_sk, user_id, email, name, subscription_tier,
  effective_date, end_date, is_current
)
VALUES (
  2, 123, 'user@example.com', 'John Doe', 'premium',
  '2024-01-15', '9999-12-31', true
);

-- Update previous record
UPDATE dim_users_scd
SET end_date = '2024-01-14', is_current = false
WHERE user_id = 123 AND is_current = true;
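
To read a Type 2 SCD table, filter on is_current for the latest attributes, or on the effective/end dates for a point-in-time view. A small sketch against dim_users_scd above:

-- Current attributes for each user
SELECT user_id, email, name, subscription_tier
FROM dim_users_scd
WHERE is_current = true;

-- Attributes as they were on a given date (point-in-time lookup)
SELECT user_id, subscription_tier
FROM dim_users_scd
WHERE '2024-01-10' BETWEEN effective_date AND end_date;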

Data Quality

Data Validation

import { z } from 'zod'

// Define schema
const UserSchema = z.object({
  id: z.number().positive(),
  email: z.string().email(),
  age: z.number().min(0).max(120),
  createdAt: z.date(),
  revenue: z.number().nonnegative()
})

// Validation pipeline
class DataQualityPipeline {
  async validateBatch(data: unknown[]) {
    const results = {
      valid: [] as any[],
      invalid: [] as any[],
      errors: [] as any[]
    }
    
    for (const row of data) {
      try {
        const validated = UserSchema.parse(row)
        results.valid.push(validated)
      } catch (error) {
        results.invalid.push(row)
        results.errors.push({
          row,
          error: error.message
        })
      }
    }
    
    // Alert if validation rate is too low
    const validationRate = results.valid.length / data.length
    if (validationRate < 0.95) {
      await this.alertDataQualityIssue({
        validationRate,
        errors: results.errors
      })
    }
    
    return results
  }
  
  // Data completeness check
  async checkCompleteness(tableName: string) {
    const query = `
      SELECT 
        COUNT(*) as total_rows,
        COUNT(email) as non_null_emails,
        COUNT(phone) as non_null_phones,
        COUNT(*) - COUNT(email) as missing_emails
      FROM ${tableName}
    `
    
    const result = await this.runQuery(query)
    
    return {
      completeness: {
        email: result.non_null_emails / result.total_rows,
        phone: result.non_null_phones / result.total_rows
      },
      missingData: {
        emails: result.missing_emails
      }
    }
  }
}

Data Tests

-- dbt singular data tests (a test passes when the query returns zero rows)
-- tests/assert_unique_user_id.sql
SELECT user_id, COUNT(*) as count
FROM {{ ref('users') }}
GROUP BY user_id
HAVING COUNT(*) > 1

-- tests/assert_valid_email.sql
SELECT *
FROM {{ ref('users') }}
WHERE email NOT LIKE '%@%.%'

-- tests/assert_revenue_positive.sql
SELECT *
FROM {{ ref('orders') }}
WHERE revenue < 0

-- tests/assert_recent_data.sql
SELECT MAX(created_at) as last_update
FROM {{ ref('users') }}
HAVING DATEDIFF(CURRENT_DATE, MAX(created_at)) > 1

Data Warehousing

Snowflake Example

-- Create warehouse for compute
CREATE WAREHOUSE analytics_wh
  WITH WAREHOUSE_SIZE = 'MEDIUM'
  AUTO_SUSPEND = 300
  AUTO_RESUME = TRUE;

-- Create database and schema
CREATE DATABASE analytics;
CREATE SCHEMA analytics.staging;
CREATE SCHEMA analytics.production;

-- Load data from S3 (prefer a STORAGE INTEGRATION over inline credentials in production)
COPY INTO analytics.staging.users
FROM 's3://my-bucket/users/'
CREDENTIALS = (AWS_KEY_ID='xxx' AWS_SECRET_KEY='xxx')
FILE_FORMAT = (TYPE = PARQUET);

-- Transform and load to production
CREATE OR REPLACE TABLE analytics.production.users AS
SELECT
  user_id,
  email,
  CONCAT(first_name, ' ', last_name) as full_name,
  DATE_TRUNC('day', created_at) as signup_date,
  country,
  subscription_tier
FROM analytics.staging.users
WHERE created_at >= DATEADD(day, -30, CURRENT_DATE);

-- Create materialized view for analytics
CREATE MATERIALIZED VIEW analytics.production.daily_metrics AS
SELECT
  DATE_TRUNC('day', order_date) as date,
  -- COUNT(DISTINCT ...) is not supported in Snowflake materialized views
  APPROX_COUNT_DISTINCT(user_id) as active_users,
  COUNT(*) as total_orders,
  SUM(revenue) as total_revenue,
  AVG(revenue) as avg_order_value
FROM analytics.production.orders
GROUP BY DATE_TRUNC('day', order_date);

Data Pipeline Orchestration

Apache Airflow

from airflow import DAG
from airflow.operators.python import PythonOperator
# SQLExecuteQueryOperator ships with the common-sql provider package
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'data-team',
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
    'email_on_failure': True
}

with DAG(
    'daily_etl_pipeline',
    default_args=default_args,
    description='Daily ETL pipeline',
    schedule_interval='0 2 * * *',  # 2 AM daily
    start_date=datetime(2024, 1, 1),
    catchup=False
) as dag:
    
    # Extract from source
    extract_task = PythonOperator(
        task_id='extract_data',
        python_callable=extract_from_source
    )
    
    # Transform data
    transform_task = PythonOperator(
        task_id='transform_data',
        python_callable=transform_data
    )
    
    # Load to warehouse
    load_task = SQLExecuteQueryOperator(
        task_id='load_to_warehouse',
        conn_id='warehouse',  # Airflow connection ID for the target warehouse (example)
        sql='load_data.sql'
    )
    
    # Run dbt models
    dbt_task = PythonOperator(
        task_id='run_dbt_models',
        python_callable=run_dbt
    )
    
    # Data quality checks
    quality_task = PythonOperator(
        task_id='data_quality_checks',
        python_callable=run_quality_checks
    )
    
    # Update metrics
    metrics_task = PythonOperator(
        task_id='update_metrics',
        python_callable=update_dashboard_metrics
    )
    
    # Define dependencies
    extract_task >> transform_task >> load_task >> dbt_task >> quality_task >> metrics_task

Prefect

from prefect import flow, task
from prefect.tasks import task_input_hash
from datetime import timedelta

@task(cache_key_fn=task_input_hash, cache_expiration=timedelta(hours=1))
def extract_data(date: str):
    data = ...  # Extract logic (e.g. query the source system for this date)
    return data

@task
def transform_data(data):
    transformed = ...  # Transform logic
    return transformed

@task
def load_data(data):
    # Load logic
    pass

@flow(name="etl-pipeline")
def etl_pipeline(date: str):
    raw_data = extract_data(date)
    transformed_data = transform_data(raw_data)
    load_data(transformed_data)

if __name__ == "__main__":
    etl_pipeline("2024-01-15")

Data Processing

PySpark

from pyspark.sql import SparkSession
from pyspark.sql.functions import avg, col, concat, count, date_format, lit, when

spark = SparkSession.builder \
    .appName("DataProcessing") \
    .getOrCreate()

# Read from data lake
df = spark.read.parquet("s3://bucket/raw/users/")

# Transform
transformed = df \
    .filter(col("created_at") >= "2024-01-01") \
    .withColumn("full_name", 
                concat(col("first_name"), lit(" "), col("last_name"))) \
    .withColumn("revenue_category",
                when(col("revenue") > 1000, "high")
                .when(col("revenue") > 100, "medium")
                .otherwise("low")) \
    .select("user_id", "full_name", "email", "created_at", "revenue",
            "revenue_category")  # keep created_at/revenue for the aggregation below

# Aggregate
daily_stats = transformed \
    .groupBy(date_format("created_at", "yyyy-MM-dd").alias("date")) \
    .agg(
        count("*").alias("user_count"),
        avg("revenue").alias("avg_revenue")
    )

# Write to data warehouse
daily_stats.write \
    .mode("overwrite") \
    .partitionBy("date") \
    .parquet("s3://bucket/processed/daily_stats/")

Real-time Analytics

Windowed Aggregation with Kafka

import { Kafka } from 'kafkajs'

class RealTimeAnalytics {
  private kafka: Kafka
  
  async aggregateEvents() {
    const consumer = this.kafka.consumer({ groupId: 'analytics' })
    const producer = this.kafka.producer()
    
    await consumer.connect()
    await producer.connect()
    
    // Windowed aggregation
    const windowSize = 60000 // 1 minute
    const windows = new Map<string, WindowMetrics>()
    
    await consumer.subscribe({ topic: 'user-events' })
    
    await consumer.run({
      eachMessage: async ({ message }) => {
        if (!message.value) return // kafkajs message values can be null
        const event = JSON.parse(message.value.toString())
        const windowKey = this.getWindowKey(event.timestamp, windowSize)
        
        // Update window metrics
        if (!windows.has(windowKey)) {
          windows.set(windowKey, {
            start: new Date(windowKey),
            eventCount: 0,
            uniqueUsers: new Set(),
            revenue: 0
          })
        }
        
        const window = windows.get(windowKey)!
        window.eventCount++
        window.uniqueUsers.add(event.userId)
        window.revenue += event.revenue || 0
        
        // Emit aggregated metrics
        await producer.send({
          topic: 'analytics-metrics',
          messages: [{
            key: windowKey,
            value: JSON.stringify({
              window: windowKey,
              eventCount: window.eventCount,
              uniqueUsers: window.uniqueUsers.size,
              totalRevenue: window.revenue,
              avgRevenue: window.revenue / window.eventCount
            })
          }]
        })
      }
    })
  }
  
  private getWindowKey(timestamp: number, windowSize: number): string {
    const windowStart = Math.floor(timestamp / windowSize) * windowSize
    return new Date(windowStart).toISOString()
  }
}

Data Catalog & Governance

Metadata Management

interface DataCatalog {
  tables: DataTable[]
  datasets: Dataset[]
  pipelines: Pipeline[]
}

interface DataTable {
  name: string
  database: string
  schema: Column[]
  owner: string
  description: string
  tags: string[]
  lineage: {
    upstream: string[]
    downstream: string[]
  }
  quality: {
    completeness: number
    freshness: number
    accuracy: number
  }
}

// Catalog service
class CatalogService {
  async registerTable(table: DataTable) {
    await this.db.tables.create(table)
    await this.indexForSearch(table)
    await this.trackLineage(table)
  }
  
  async searchTables(query: string): Promise<DataTable[]> {
    return this.db.tables.search({
      where: {
        OR: [
          { name: { contains: query } },
          { description: { contains: query } },
          { tags: { has: query } }
        ]
      }
    })
  }
  
  async getLineage(tableName: string) {
    const table = await this.db.tables.findUnique({
      where: { name: tableName },
      include: { lineage: true }
    })
    
    return {
      upstream: await this.getUpstreamTables(table),
      downstream: await this.getDownstreamTables(table)
    }
  }
}

Performance Optimization

Partitioning

-- Time-based partitioning (MySQL-style RANGE COLUMNS syntax)
CREATE TABLE events (
  event_id BIGINT,
  user_id BIGINT,
  event_type VARCHAR(50),
  event_date DATE,
  properties JSON
)
PARTITION BY RANGE COLUMNS (event_date) (
  PARTITION p_2024_01 VALUES LESS THAN ('2024-02-01'),
  PARTITION p_2024_02 VALUES LESS THAN ('2024-03-01'),
  PARTITION p_2024_03 VALUES LESS THAN ('2024-04-01')
);

-- Query with partition pruning
SELECT *
FROM events
WHERE event_date >= '2024-02-01'
  AND event_date < '2024-03-01';
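
Partitioning also makes retention cheap: dropping a whole partition avoids a slow DELETE over millions of rows. A sketch in the same MySQL-style syntax, using the partition names defined above:

-- Enforce retention by dropping expired partitions
ALTER TABLE events DROP PARTITION p_2024_01;

-- Add the next month's partition ahead of time
ALTER TABLE events ADD PARTITION (
  PARTITION p_2024_04 VALUES LESS THAN ('2024-05-01')
);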

Indexing

-- B-tree index for exact matches
CREATE INDEX idx_user_email ON users(email);

-- Hash index for equality
CREATE INDEX idx_user_id_hash ON users USING HASH(user_id);

-- Composite index
CREATE INDEX idx_orders_user_date ON orders(user_id, order_date);

-- Covering index
CREATE INDEX idx_users_covering ON users(user_id) INCLUDE (email, name);
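
To confirm an index is actually used, inspect the query plan. A quick sketch using EXPLAIN against the composite index above (table and column names as defined there):

-- Check that idx_orders_user_date is used for this lookup
EXPLAIN
SELECT user_id, order_date
FROM orders
WHERE user_id = 123
  AND order_date >= '2024-01-01';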

Data Pipeline Monitoring

class PipelineMonitor {
  async trackPipelineRun(pipeline: string, run: PipelineRun) {
    // Record metrics
    await this.recordMetrics({
      pipeline,
      duration: run.duration,
      rowsProcessed: run.rowsProcessed,
      status: run.status,
      timestamp: run.timestamp
    })
    
    // Check SLAs
    if (run.duration > this.getSLA(pipeline).maxDuration) {
      await this.alertSlaBreach(pipeline, run)
    }
    
    // Data quality alerts
    if (run.qualityScore < 0.95) {
      await this.alertQualityIssue(pipeline, run)
    }
    
    // Freshness check
    const lastRun = await this.getLastSuccessfulRun(pipeline)
    const timeSinceLastRun = Date.now() - lastRun.timestamp
    
    if (timeSinceLastRun > 3600000) { // 1 hour
      await this.alertStaleness(pipeline)
    }
  }
}

Best Practices

Do

✅ Version control your data transformations
✅ Implement data quality checks
✅ Document data lineage
✅ Use idempotent pipelines (see the incremental load sketch below)
✅ Partition large tables
✅ Monitor pipeline performance
✅ Implement proper error handling
✅ Use incremental processing when possible
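
An idempotent incremental load can be re-run for the same day without duplicating rows, for example by replacing the affected date range atomically. A minimal sketch, assuming staging and production order tables (staging.orders, production.orders) keyed by order_date:

-- Re-runnable daily load: delete the day, then reinsert it
BEGIN;

DELETE FROM production.orders
WHERE order_date = '2024-01-15';

INSERT INTO production.orders
SELECT *
FROM staging.orders
WHERE order_date = '2024-01-15';

COMMIT;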

Don't

❌ Hardcode credentials
❌ Skip data validation
❌ Ignore data quality issues
❌ Process everything in one pipeline
❌ Forget about data retention
❌ Ignore pipeline failures
❌ Skip documentation
❌ Overload single nodes

Tools & Technologies

Storage

  • Data Lakes: S3, Azure Data Lake, Google Cloud Storage
  • Data Warehouses: Snowflake, BigQuery, Redshift
  • Databases: PostgreSQL, MySQL, MongoDB

Processing

  • Batch: Apache Spark, dbt, Apache Beam
  • Streaming: Kafka, Flink, Kinesis

Orchestration

  • Workflow: Apache Airflow, Prefect, Dagster
  • Serverless: AWS Step Functions, Azure Data Factory

Quality & Governance

  • Testing: Great Expectations, dbt tests
  • Catalog: Apache Atlas, DataHub, Amundsen
  • Lineage: OpenLineage, Marquez

Monitoring

  • Observability: DataDog, Monte Carlo, Soda
  • Alerting: PagerDuty, Slack, Email