Real-time CDC pipeline: from raw transactions to analytics-ready data

This pipeline ingests the Olist Brazilian E-Commerce dataset from a source PostgreSQL database and streams it in real time through Kafka into a clean analytics warehouse, with no manual refresh needed.
```
Source DB
    │
    │  WAL (Write-Ahead Log)
    ▼
Debezium CDC ──► Kafka Topics ──► Spark Streaming
                                        │
                                        ▼
                            Silver Tables (ecommerce_db)
                                        │
                                        ▼
                             Gold Materialized Views
```
Any insert or update in the source DB propagates to the gold analytics layer within ~10 seconds, automatically.
| Layer | Tool |
|---|---|
| Change Data Capture | Debezium 2.7 + PostgreSQL pgoutput |
| Message Broker | Apache Kafka (Confluent) |
| Stream Processing | Apache Spark 3.5 Structured Streaming |
| Storage | PostgreSQL 14+ |
| Orchestration | Docker Compose + Taskfile |
A cleaned mirror of the source tables. Upserted on every Kafka batch.
customers · orders · order_items · order_payments · order_reviews · products · sellers
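The upsert behavior can be sketched in plain Python. This is a toy model of the semantics, not the actual `spark_app.py` logic: each batch merges rows into the table keyed by primary key, so replaying the same Kafka records is idempotent.

```python
# Toy model of the silver-layer upsert: rows are keyed by primary key,
# so applying the same change twice leaves the table unchanged.
# Illustrative sketch only; not the actual spark_app.py implementation.

def upsert(table: dict, rows: list[dict], key: str) -> dict:
    """Insert new rows and overwrite existing ones by primary key."""
    for row in rows:
        table[row[key]] = row  # insert or replace in place
    return table

orders: dict = {}
batch = [{"order_id": "o1", "status": "created"}]
upsert(orders, batch, "order_id")

# Replaying the same batch (e.g. after clearing checkpoints) is a no-op:
upsert(orders, batch, "order_id")
assert len(orders) == 1

# An update to the same key overwrites rather than duplicates:
upsert(orders, [{"order_id": "o1", "status": "delivered"}], "order_id")
assert orders["o1"]["status"] == "delivered"
```

This is why clearing Spark checkpoints and re-reading Kafka from the beginning does not produce duplicate rows in the silver tables.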
Aggregated and dashboard-ready. Auto-refreshed whenever orders or payments are updated.
| View | Contents |
|---|---|
| `gold_sales_monthly` | Revenue, order volume, avg order value per month |
| `gold_sales_daily` | Day-level sales summary |
| `gold_customer_metrics` | Spending, order history, review scores per customer |
| `gold_product_performance` | Revenue, units sold, ratings per product |
| `gold_seller_performance` | Revenue, ratings, product count per seller |
| `gold_delivery_analytics` | On-time rate, avg delivery days, late deliveries |
| `gold_payment_analytics` | Payment method breakdown, installment patterns |
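A refresh of the gold layer presumably issues one `REFRESH MATERIALIZED VIEW` per view above. The sketch below generates those statements; the view names come from the table, but everything else (including whether the real Taskfile uses `CONCURRENTLY`, which would require a unique index on each view) is an assumption, not confirmed by the repo.

```python
# Generate REFRESH statements for the gold-layer views listed above.
# Sketch only: the actual refresh mechanism lives in gold_layer.sql /
# Taskfile.yml and may differ.

GOLD_VIEWS = [
    "gold_sales_monthly",
    "gold_sales_daily",
    "gold_customer_metrics",
    "gold_product_performance",
    "gold_seller_performance",
    "gold_delivery_analytics",
    "gold_payment_analytics",
]

def refresh_statements(views: list[str]) -> list[str]:
    """Build one REFRESH MATERIALIZED VIEW statement per view name."""
    return [f"REFRESH MATERIALIZED VIEW {view};" for view in views]

for stmt in refresh_statements(GOLD_VIEWS):
    print(stmt)
```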
- Docker Desktop
- PostgreSQL 14+ (running locally)
- Taskfile (task runner)
```shell
npm install -g @go-task/cli
```

PostgreSQL must have logical WAL enabled, which Debezium requires for CDC.
Connect to PostgreSQL and run:
```sql
-- Enable logical replication
ALTER SYSTEM SET wal_level = logical;
ALTER SYSTEM SET max_replication_slots = 4;
ALTER SYSTEM SET max_wal_senders = 4;

-- Grant replication permission to your user
ALTER USER postgres REPLICATION;
```

Then restart PostgreSQL for the changes to take effect:
```shell
# Windows (run as Administrator; adjust the version suffix to your install)
net stop postgresql-x64-17
net start postgresql-x64-17
```

```shell
# Linux / macOS
sudo systemctl restart postgresql
```

Verify:
```sql
SHOW wal_level;  -- should return 'logical'
```

```shell
git clone https://github.com/your-username/ecommerce-dashboard.git
cd ecommerce-dashboard
cp .env.example .env
```

Edit `.env`:
```
# Silver / Gold database
DB_URL=jdbc:postgresql://host.docker.internal:5432/ecommerce_db
DB_HOST=host.docker.internal
DB_NAME=ecommerce_db
DB_USER=postgres
DB_PASS=your_password

# Source database
SOURCE_DB_NAME=source_db
TOPIC_PREFIX=kafka_topic_prefix

# Kafka
KAFKA_BOOTSTRAP_SERVERS=kafka:29092
```

On Linux, replace `host.docker.internal` with `172.17.0.1`.
```shell
psql -U postgres -c "CREATE DATABASE ecommerce_db;"
psql -U postgres -d ecommerce_db -f init.sql
psql -U postgres -d ecommerce_db -f gold_layer.sql
```

Download the dataset from Kaggle, place the CSV files in `csvs/`, then:

```shell
task load
```

This loads all CSVs into the source database.

```shell
task up
```

That's it. Debezium will snapshot the source DB, push all records through Kafka, and Spark will populate the silver and gold layers automatically.
```shell
task up                  # Start all services
task down                # Stop all services
task restart             # Stop then start
task build               # Rebuild Docker images
task load                # Load CSVs into source DB (once)
task status              # Container status
task connector           # Debezium connector status
task logs-spark          # Spark streaming logs
task logs-kafka          # Kafka logs
task logs-debezium       # Debezium connector logs
task db-source           # psql into source DB
task db-silver           # psql into silver DB (ecommerce_db)
task refresh-gold        # Manually refresh all gold views
task clean               # Stop + delete all volumes
task clean-checkpoints   # Clear Spark checkpoints
```

| Service | URL |
|---|---|
| Spark Master UI | http://localhost:8081 |
| Spark Worker UI | http://localhost:8082 |
| Kafka Connect API | http://localhost:8083/connectors |
**Gold views are empty**

```shell
psql -U postgres -d ecommerce_db -c "SELECT COUNT(*) FROM silver_orders WHERE order_purchase_timestamp IS NOT NULL;"
```

If the count is 0, timestamps failed to parse. Clear checkpoints and rebuild:

```shell
task clean-checkpoints
docker compose up -d --build streaming-app
```

**Debezium connector in FAILED state**

```shell
task connector   # check status
curl -X POST http://localhost:8083/connectors/postgres-connector/restart
```

**Kafka broker conflict on restart**

```shell
task clean   # wipes volumes
task up
```

**Inspect a raw Kafka message**

```shell
docker exec ecommerce-dashboard-kafka-1 kafka-console-consumer \
  --bootstrap-server localhost:29092 \
  --topic kafka_topic_prefix.public.orders \
  --from-beginning --max-messages 1
```

```
ecommerce-dashboard/
├── csvs/                         # Source CSV files (Kaggle dataset)
├── debezium/
│   └── postgres-connector.json   # Debezium connector config
│
├── dockerfiles/
│   ├── csv_to_db.Dockerfile
│   └── spark_app.Dockerfile
├── scripts/
│   ├── csv_to_db.py              # CSV loader
│   └── spark_app.py              # Spark streaming job
│
├── init.sql                      # Silver layer schema
├── gold_layer.sql                # Gold materialized views
├── compose.yml                   # Main services
├── load-compose.yml              # CSV loader service
├── Taskfile.yml
└── .env.example
```
- Debezium sends PostgreSQL timestamps as microseconds since epoch (`MicroTimestamp`). Spark converts them with `from_unixtime(col / 1000000)`.
- Clearing checkpoints causes Spark to re-read Kafka from the beginning. Upsert logic prevents duplicate data.
- Gold views refresh automatically when `orders` or `order_payments` batches are processed. Use `task refresh-gold` to trigger a refresh manually.
- `host.docker.internal` is a Docker DNS name for the host machine. It is not available on Linux by default; use `172.17.0.1` or add `extra_hosts` to `compose.yml`.