Backend & Infrastructure
Data Engineering
Building scalable data pipelines, ETL processes, and analytics infrastructure
Data engineering focuses on building systems and infrastructure for collecting, storing, and analyzing data at scale.
Core Concepts
ETL vs ELT
ETL transforms data before loading it into the warehouse, while ELT loads raw data first and transforms it inside the warehouse, trading pipeline-side complexity for warehouse compute.
ETL (Extract, Transform, Load)
// Traditional ETL: Transform before loading
async function etlPipeline() {
// 1. Extract
const rawData = await extractFromSource()
// 2. Transform
const transformedData = transform(rawData)
// 3. Load
await loadToWarehouse(transformedData)
}
function transform(data: RawData[]): TransformedData[] {
return data.map(row => ({
id: row.user_id,
name: `${row.first_name} ${row.last_name}`,
email: row.email.toLowerCase(),
createdAt: new Date(row.created_at),
// Data cleaning and transformation
revenue: parseFloat(row.revenue) || 0
}))
}
ELT (Extract, Load, Transform)
-- Modern ELT: Load raw data, transform in warehouse
-- 1. Extract & Load raw data to warehouse
CREATE TABLE raw_users AS
SELECT * FROM source_database.users;
-- 2. Transform using SQL/dbt
CREATE TABLE transformed_users AS
SELECT
user_id as id,
CONCAT(first_name, ' ', last_name) as name,
LOWER(email) as email,
CAST(created_at AS TIMESTAMP) as created_at,
COALESCE(CAST(revenue AS FLOAT), 0) as revenue
FROM raw_users
WHERE created_at >= CURRENT_DATE - INTERVAL '30 days';
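In practice the in-warehouse transform step is often managed with dbt rather than hand-run SQL. A minimal sketch of the same transformation as an incremental dbt model (the model name and the 'raw' source definition are illustrative, not part of the examples above):
-- models/transformed_users.sql (illustrative incremental dbt model)
{{ config(materialized='incremental', unique_key='id') }}
SELECT
user_id AS id,
CONCAT(first_name, ' ', last_name) AS name,
LOWER(email) AS email,
CAST(created_at AS TIMESTAMP) AS created_at,
COALESCE(CAST(revenue AS FLOAT), 0) AS revenue
FROM {{ source('raw', 'users') }}
{% if is_incremental() %}
-- On incremental runs, only pull rows newer than what is already in the table
WHERE created_at > (SELECT MAX(created_at) FROM {{ this }})
{% endif %}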
Data Pipeline Architecture
Batch Processing
// Batch processing pipeline
import { S3, Glue, Athena } from 'aws-sdk'
class BatchPipeline {
async processDailyData(date: string) {
// 1. Extract data from source
const rawData = await this.extractFromDatabase(date)
// 2. Store in data lake
await this.storeInS3(rawData, `raw/${date}/data.parquet`)
// 3. Transform with Spark/Glue
await this.runGlueJob({
name: 'transform-user-data',
arguments: {
'--input': `s3://bucket/raw/${date}/`,
'--output': `s3://bucket/processed/${date}/`
}
})
// 4. Update data catalog
await this.updateGlueCatalog('processed_users')
// 5. Run analytics queries
await this.runAthenaQuery(`
SELECT date, COUNT(*) as user_count
FROM processed_users
WHERE date = '${date}'
GROUP BY date
`)
}
}
Stream Processing
// Stream processing with Kafka
import { Kafka } from 'kafkajs'
class StreamPipeline {
private kafka: Kafka
async processRealTimeEvents() {
const consumer = this.kafka.consumer({ groupId: 'analytics' })
await consumer.connect()
await consumer.subscribe({ topic: 'user-events' })
await consumer.run({
eachMessage: async ({ topic, partition, message }) => {
if (!message.value) return // skip tombstone/empty messages
const event = JSON.parse(message.value.toString())
// Transform in real-time
const transformed = await this.transform(event)
// Write to multiple sinks
await Promise.all([
this.writeToDataWarehouse(transformed),
this.updateMetrics(transformed),
this.triggerAlerts(transformed)
])
}
})
}
private async transform(event: RawEvent): Promise<TransformedEvent> {
return {
userId: event.user_id,
eventType: event.event_type,
timestamp: new Date(event.timestamp),
properties: this.enrichProperties(event.properties),
sessionId: await this.getSessionId(event.user_id)
}
}
}
Data Modeling
Star Schema
-- Fact table: Events/Transactions
CREATE TABLE fact_orders (
order_id BIGINT PRIMARY KEY,
user_id BIGINT,
product_id BIGINT,
date_id INT,
quantity INT,
revenue DECIMAL(10,2),
-- Foreign keys to dimension tables
FOREIGN KEY (user_id) REFERENCES dim_users(user_id),
FOREIGN KEY (product_id) REFERENCES dim_products(product_id),
FOREIGN KEY (date_id) REFERENCES dim_date(date_id)
);
-- Dimension table: Users
CREATE TABLE dim_users (
user_id BIGINT PRIMARY KEY,
email VARCHAR(255),
name VARCHAR(255),
country VARCHAR(100),
subscription_tier VARCHAR(50)
);
-- Dimension table: Products
CREATE TABLE dim_products (
product_id BIGINT PRIMARY KEY,
name VARCHAR(255),
category VARCHAR(100),
price DECIMAL(10,2)
);
-- Dimension table: Date
CREATE TABLE dim_date (
date_id INT PRIMARY KEY,
date DATE,
day_of_week VARCHAR(20),
month VARCHAR(20),
quarter INT,
year INT,
is_weekend BOOLEAN
);
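The payoff of this layout is that analytical questions become straightforward joins from the fact table out to its dimensions. For example, a revenue roll-up by month and product category against the tables above:
-- Revenue by month and product category
SELECT
d.year,
d.month,
p.category,
SUM(f.revenue) AS total_revenue,
COUNT(DISTINCT f.user_id) AS buyers
FROM fact_orders f
JOIN dim_products p ON f.product_id = p.product_id
JOIN dim_date d ON f.date_id = d.date_id
GROUP BY d.year, d.month, p.category
ORDER BY d.year, d.month, total_revenue DESC;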
Slowly Changing Dimensions (SCD)
-- Type 2 SCD: Track history with effective dates
CREATE TABLE dim_users_scd (
user_sk BIGINT PRIMARY KEY, -- Surrogate key
user_id BIGINT, -- Natural key
email VARCHAR(255),
name VARCHAR(255),
subscription_tier VARCHAR(50),
effective_date DATE,
end_date DATE,
is_current BOOLEAN
);
-- Insert new record on change
INSERT INTO dim_users_scd (
user_sk, user_id, email, name, subscription_tier,
effective_date, end_date, is_current
)
VALUES (
2, 123, 'user@example.com', 'John Doe', 'premium',
'2024-01-15', '9999-12-31', true
);
-- Update previous record
UPDATE dim_users_scd
SET end_date = '2024-01-14', is_current = false
WHERE user_id = 123 AND is_current = true;
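The reason to keep history this way is that facts can be joined to the dimension row that was current at the time of the event. A sketch of such a point-in-time join, assuming an orders table with user_id and order_date columns:
-- Point-in-time join: each order sees the subscription tier the user had on the order date
SELECT
o.order_id,
o.order_date,
u.subscription_tier
FROM orders o
JOIN dim_users_scd u
ON u.user_id = o.user_id
AND o.order_date >= u.effective_date
AND o.order_date <= u.end_date;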
Data Quality
Data Validation
import { z } from 'zod'
// Define schema
const UserSchema = z.object({
id: z.number().positive(),
email: z.string().email(),
age: z.number().min(0).max(120),
createdAt: z.date(),
revenue: z.number().nonnegative()
})
// Validation pipeline
class DataQualityPipeline {
async validateBatch(data: unknown[]) {
const results = {
valid: [] as any[],
invalid: [] as any[],
errors: [] as any[]
}
for (const row of data) {
try {
const validated = UserSchema.parse(row)
results.valid.push(validated)
} catch (error) {
results.invalid.push(row)
results.errors.push({
row,
error: error instanceof z.ZodError ? error.issues : String(error)
})
}
}
// Alert if validation rate is too low
const validationRate = results.valid.length / data.length
if (validationRate < 0.95) {
await this.alertDataQualityIssue({
validationRate,
errors: results.errors
})
}
return results
}
// Data completeness check
async checkCompleteness(tableName: string) {
const query = `
SELECT
COUNT(*) as total_rows,
COUNT(email) as non_null_emails,
COUNT(phone) as non_null_phones,
COUNT(*) - COUNT(email) as missing_emails
FROM ${tableName}
`
const result = await this.runQuery(query)
return {
completeness: {
email: result.non_null_emails / result.total_rows,
phone: result.non_null_phones / result.total_rows
},
missingData: {
emails: result.missing_emails
}
}
}
}
Data Tests
-- dbt data tests
-- tests/assert_unique_user_id.sql
SELECT user_id, COUNT(*) as count
FROM {{ ref('users') }}
GROUP BY user_id
HAVING COUNT(*) > 1
-- tests/assert_valid_email.sql
SELECT *
FROM {{ ref('users') }}
WHERE email NOT LIKE '%@%.%'
-- tests/assert_revenue_positive.sql
SELECT *
FROM {{ ref('orders') }}
WHERE revenue < 0
-- tests/assert_recent_data.sql
SELECT MAX(created_at) as last_update
FROM {{ ref('users') }}
HAVING DATEDIFF(CURRENT_DATE, MAX(created_at)) > 1
Data Warehousing
Snowflake Example
-- Create warehouse for compute
CREATE WAREHOUSE analytics_wh
WITH WAREHOUSE_SIZE = 'MEDIUM'
AUTO_SUSPEND = 300
AUTO_RESUME = TRUE;
-- Create database and schema
CREATE DATABASE analytics;
CREATE SCHEMA analytics.staging;
CREATE SCHEMA analytics.production;
-- Load data from S3
COPY INTO analytics.staging.users
FROM 's3://my-bucket/users/'
CREDENTIALS = (AWS_KEY_ID='xxx' AWS_SECRET_KEY='xxx')
FILE_FORMAT = (TYPE = PARQUET);
-- Transform and load to production
CREATE OR REPLACE TABLE analytics.production.users AS
SELECT
user_id,
email,
CONCAT(first_name, ' ', last_name) as full_name,
DATE_TRUNC('day', created_at) as signup_date,
country,
subscription_tier
FROM analytics.staging.users
WHERE created_at >= DATEADD(day, -30, CURRENT_DATE);
-- Create materialized view for analytics
CREATE MATERIALIZED VIEW analytics.production.daily_metrics AS
SELECT
DATE_TRUNC('day', order_date) as date,
APPROX_COUNT_DISTINCT(user_id) as active_users, -- COUNT(DISTINCT) is not supported in materialized views
COUNT(*) as total_orders,
SUM(revenue) as total_revenue,
AVG(revenue) as avg_order_value
FROM analytics.production.orders
GROUP BY DATE_TRUNC('day', order_date);
Data Pipeline Orchestration
Apache Airflow
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator

default_args = {
    'owner': 'data-team',
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
    'email_on_failure': True
}

with DAG(
    'daily_etl_pipeline',
    default_args=default_args,
    description='Daily ETL pipeline',
    schedule_interval='0 2 * * *',  # 2 AM daily
    start_date=datetime(2024, 1, 1),
    catchup=False
) as dag:
    # Extract from source
    extract_task = PythonOperator(
        task_id='extract_data',
        python_callable=extract_from_source
    )

    # Transform data
    transform_task = PythonOperator(
        task_id='transform_data',
        python_callable=transform_data
    )

    # Load to warehouse
    load_task = SQLExecuteQueryOperator(
        task_id='load_to_warehouse',
        conn_id='warehouse',  # replace with your warehouse connection ID
        sql='load_data.sql'
    )

    # Run dbt models
    dbt_task = PythonOperator(
        task_id='run_dbt_models',
        python_callable=run_dbt
    )

    # Data quality checks
    quality_task = PythonOperator(
        task_id='data_quality_checks',
        python_callable=run_quality_checks
    )

    # Update metrics
    metrics_task = PythonOperator(
        task_id='update_metrics',
        python_callable=update_dashboard_metrics
    )

    # Define dependencies
    extract_task >> transform_task >> load_task >> dbt_task >> quality_task >> metrics_task
Prefect
from datetime import timedelta

from prefect import flow, task
from prefect.tasks import task_input_hash

@task(cache_key_fn=task_input_hash, cache_expiration=timedelta(hours=1))
def extract_data(date: str):
    # Extract logic (placeholder)
    return [{"date": date}]

@task
def transform_data(data):
    # Transform logic (placeholder)
    return [{**row, "processed": True} for row in data]

@task
def load_data(data):
    # Load logic (no-op placeholder)
    pass

@flow(name="etl-pipeline")
def etl_pipeline(date: str):
    raw_data = extract_data(date)
    transformed_data = transform_data(raw_data)
    load_data(transformed_data)

if __name__ == "__main__":
    etl_pipeline("2024-01-15")
Data Processing
PySpark
from pyspark.sql import SparkSession
from pyspark.sql.functions import avg, col, concat, count, date_format, lit, when

spark = SparkSession.builder \
    .appName("DataProcessing") \
    .getOrCreate()

# Read from data lake
df = spark.read.parquet("s3://bucket/raw/users/")

# Transform
transformed = df \
    .filter(col("created_at") >= "2024-01-01") \
    .withColumn("full_name",
                concat(col("first_name"), lit(" "), col("last_name"))) \
    .withColumn("revenue_category",
                when(col("revenue") > 1000, "high")
                .when(col("revenue") > 100, "medium")
                .otherwise("low")) \
    .select("user_id", "full_name", "email", "created_at", "revenue", "revenue_category")

# Aggregate (created_at and revenue are kept in the select above so they are available here)
daily_stats = transformed \
    .groupBy(date_format("created_at", "yyyy-MM-dd").alias("date")) \
    .agg(
        count("*").alias("user_count"),
        avg("revenue").alias("avg_revenue")
    )

# Write to data warehouse
daily_stats.write \
    .mode("overwrite") \
    .partitionBy("date") \
    .parquet("s3://bucket/processed/daily_stats/")
Real-time Analytics
Streaming Aggregation with Kafka
import { Kafka } from 'kafkajs'
class RealTimeAnalytics {
private kafka: Kafka
async aggregateEvents() {
const consumer = this.kafka.consumer({ groupId: 'analytics' })
const producer = this.kafka.producer()
// Windowed aggregation
const windowSize = 60000 // 1 minute
const windows = new Map<string, WindowMetrics>()
await consumer.subscribe({ topic: 'user-events' })
await consumer.run({
eachMessage: async ({ message }) => {
if (!message.value) return // skip tombstone/empty messages
const event = JSON.parse(message.value.toString())
const windowKey = this.getWindowKey(event.timestamp, windowSize)
// Update window metrics
if (!windows.has(windowKey)) {
windows.set(windowKey, {
start: new Date(windowKey),
eventCount: 0,
uniqueUsers: new Set(),
revenue: 0
})
}
const window = windows.get(windowKey)!
window.eventCount++
window.uniqueUsers.add(event.userId)
window.revenue += event.revenue || 0
// Emit aggregated metrics
await producer.send({
topic: 'analytics-metrics',
messages: [{
key: windowKey,
value: JSON.stringify({
window: windowKey,
eventCount: window.eventCount,
uniqueUsers: window.uniqueUsers.size,
totalRevenue: window.revenue,
avgRevenue: window.revenue / window.eventCount
})
}]
})
}
})
}
private getWindowKey(timestamp: number, windowSize: number): string {
const windowStart = Math.floor(timestamp / windowSize) * windowSize
return new Date(windowStart).toISOString()
}
}
Data Catalog & Governance
Metadata Management
interface DataCatalog {
tables: DataTable[]
datasets: Dataset[]
pipelines: Pipeline[]
}
interface DataTable {
name: string
database: string
schema: Column[]
owner: string
description: string
tags: string[]
lineage: {
upstream: string[]
downstream: string[]
}
quality: {
completeness: number
freshness: number
accuracy: number
}
}
// Catalog service
class CatalogService {
async registerTable(table: DataTable) {
await this.db.tables.create(table)
await this.indexForSearch(table)
await this.trackLineage(table)
}
async searchTables(query: string): Promise<DataTable[]> {
return this.db.tables.search({
where: {
OR: [
{ name: { contains: query } },
{ description: { contains: query } },
{ tags: { has: query } }
]
}
})
}
async getLineage(tableName: string) {
const table = await this.db.tables.findUnique({
where: { name: tableName },
include: { lineage: true }
})
return {
upstream: await this.getUpstreamTables(table),
downstream: await this.getDownstreamTables(table)
}
}
}
Performance Optimization
Partitioning
-- Time-based partitioning
CREATE TABLE events (
event_id BIGINT,
user_id BIGINT,
event_type VARCHAR(50),
event_date DATE,
properties JSON
)
PARTITION BY RANGE COLUMNS (event_date) (
PARTITION p_2024_01 VALUES LESS THAN ('2024-02-01'),
PARTITION p_2024_02 VALUES LESS THAN ('2024-03-01'),
PARTITION p_2024_03 VALUES LESS THAN ('2024-04-01')
);
-- Query with partition pruning
SELECT *
FROM events
WHERE event_date >= '2024-02-01'
AND event_date < '2024-03-01';
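Partitioned tables also need routine maintenance: future partitions must exist before data for them arrives, and dropping an old partition is a much cheaper way to enforce retention than a bulk DELETE. A sketch in the same MySQL-style syntax as above (PostgreSQL instead attaches and detaches child tables):
-- Add next month's partition ahead of time
ALTER TABLE events
ADD PARTITION (PARTITION p_2024_04 VALUES LESS THAN ('2024-05-01'));
-- Enforce retention by dropping the oldest partition
ALTER TABLE events DROP PARTITION p_2024_01;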
Indexing
-- B-tree index for exact matches
CREATE INDEX idx_user_email ON users(email);
-- Hash index for equality
CREATE INDEX idx_user_id_hash ON users USING HASH(user_id);
-- Composite index
CREATE INDEX idx_orders_user_date ON orders(user_id, order_date);
-- Covering index
CREATE INDEX idx_users_covering ON users(user_id) INCLUDE (email, name);
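An index only pays off if the query planner actually uses it, so it is worth checking plans for the queries you care about. For example, with PostgreSQL's EXPLAIN against the composite index above (assuming an orders table with these columns):
-- Expect an index scan on idx_orders_user_date rather than a sequential scan
EXPLAIN ANALYZE
SELECT order_id, order_date
FROM orders
WHERE user_id = 123
AND order_date >= '2024-01-01';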
Data Pipeline Monitoring
class PipelineMonitor {
async trackPipelineRun(pipeline: string, run: PipelineRun) {
// Record metrics
await this.recordMetrics({
pipeline,
duration: run.duration,
rowsProcessed: run.rowsProcessed,
status: run.status,
timestamp: run.timestamp
})
// Check SLAs
if (run.duration > this.getSLA(pipeline).maxDuration) {
await this.alertSlaBreach(pipeline, run)
}
// Data quality alerts
if (run.qualityScore < 0.95) {
await this.alertQualityIssue(pipeline, run)
}
// Freshness check
const lastRun = await this.getLastSuccessfulRun(pipeline)
const timeSinceLastRun = Date.now() - lastRun.timestamp
if (timeSinceLastRun > 3600000) { // 1 hour
await this.alertStaleness(pipeline)
}
}
}
Best Practices
Do
✅ Version control your data transformations
✅ Implement data quality checks
✅ Document data lineage
✅ Use idempotent pipelines (see the sketch after these lists)
✅ Partition large tables
✅ Monitor pipeline performance
✅ Implement proper error handling
✅ Use incremental processing when possible
Don't
❌ Hardcode credentials
❌ Skip data validation
❌ Ignore data quality issues
❌ Process everything in one pipeline
❌ Forget about data retention
❌ Ignore pipeline failures
❌ Skip documentation
❌ Overload single nodes
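The idempotency and incremental-processing items above combine naturally: process one bounded slice of data at a time and make the write atomic, so re-running a failed job for the same slice overwrites rather than duplicates. A minimal sketch, assuming hypothetical raw_orders and processed_orders tables keyed by order_date:
-- Idempotent incremental load: delete-and-insert one day inside a transaction,
-- so re-running the job for the same day produces the same result
BEGIN;
DELETE FROM processed_orders WHERE order_date = '2024-01-15';
INSERT INTO processed_orders
SELECT order_id, user_id, order_date, revenue
FROM raw_orders
WHERE order_date = '2024-01-15';
COMMIT;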
Tools & Technologies
Storage
- Data Lakes: S3, Azure Data Lake, Google Cloud Storage
- Data Warehouses: Snowflake, BigQuery, Redshift
- Databases: PostgreSQL, MySQL, MongoDB
Processing
- Batch: Apache Spark, dbt, Apache Beam
- Streaming: Kafka, Flink, Kinesis
Orchestration
- Workflow: Apache Airflow, Prefect, Dagster
- Serverless: AWS Step Functions, Azure Data Factory
Quality & Governance
- Testing: Great Expectations, dbt tests
- Catalog: Apache Atlas, DataHub, Amundsen
- Lineage: OpenLineage, Marquez
Monitoring
- Observability: DataDog, Monte Carlo, Soda
- Alerting: PagerDuty, Slack, Email