Niwarthana00/ecommerce-dashboard

🛒 Ecommerce Streaming Pipeline

Real-time CDC pipeline — from raw transactions to analytics-ready data



Overview

This pipeline ingests the Olist Brazilian E-Commerce dataset from a source PostgreSQL database and streams it in real time through Kafka into a clean analytics warehouse — with no manual refresh needed.

```
Source DB
    │
    │  WAL (Write-Ahead Log)
    ▼
Debezium CDC  ──►  Kafka Topics  ──►  Spark Streaming
                                            │
                                            ▼
                                    Silver Tables  (ecommerce_db)
                                            │
                                            ▼
                                    Gold Materialized Views
```

Any insert or update in the source DB propagates to the gold analytics layer within ~10 seconds — automatically.


Stack

| Layer | Tool |
| --- | --- |
| Change Data Capture | Debezium 2.7 + PostgreSQL `pgoutput` |
| Message Broker | Apache Kafka (Confluent) |
| Stream Processing | Apache Spark 3.5 Structured Streaming |
| Storage | PostgreSQL 14+ |
| Orchestration | Docker Compose + Taskfile |

Data Layers

Silver — 7 tables

A cleaned mirror of the source tables, upserted on every Kafka micro-batch.

`customers` · `orders` · `order_items` · `order_payments` · `order_reviews` · `products` · `sellers`
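The per-batch upsert boils down to a PostgreSQL `INSERT ... ON CONFLICT DO UPDATE`. A minimal sketch of building such a statement — the helper below is illustrative only, not the project's actual `scripts/spark_app.py` code:

```python
def build_upsert_sql(table, columns, key_columns):
    """Build a PostgreSQL INSERT ... ON CONFLICT DO UPDATE statement
    for one silver table. Illustrative sketch, not the real pipeline code."""
    cols = ", ".join(columns)
    placeholders = ", ".join(["%s"] * len(columns))
    keys = ", ".join(key_columns)
    updates = ", ".join(
        f"{c} = EXCLUDED.{c}" for c in columns if c not in key_columns
    )
    return (
        f"INSERT INTO {table} ({cols}) VALUES ({placeholders}) "
        f"ON CONFLICT ({keys}) DO UPDATE SET {updates}"
    )

# Hypothetical column subset for silver_orders
sql = build_upsert_sql(
    "silver_orders",
    ["order_id", "customer_id", "order_status"],
    ["order_id"],
)
print(sql)
```

Replaying the same Kafka offsets through a statement like this is idempotent, which is why clearing checkpoints (see Notes) does not create duplicates.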

Gold — 7 materialized views

Aggregated and dashboard-ready. Auto-refreshed whenever orders or payments are updated.

| View | Contents |
| --- | --- |
| `gold_sales_monthly` | Revenue, order volume, avg order value per month |
| `gold_sales_daily` | Day-level sales summary |
| `gold_customer_metrics` | Spending, order history, review scores per customer |
| `gold_product_performance` | Revenue, units sold, ratings per product |
| `gold_seller_performance` | Revenue, ratings, product count per seller |
| `gold_delivery_analytics` | On-time rate, avg delivery days, late deliveries |
| `gold_payment_analytics` | Payment method breakdown, installment patterns |
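A gold refresh is just a `REFRESH MATERIALIZED VIEW` per view. A hedged sketch of generating those statements — the view names come from the table above, but the helper itself is hypothetical (the real trigger lives in the Spark job and in `task refresh-gold`):

```python
GOLD_VIEWS = [
    "gold_sales_monthly", "gold_sales_daily", "gold_customer_metrics",
    "gold_product_performance", "gold_seller_performance",
    "gold_delivery_analytics", "gold_payment_analytics",
]

def refresh_statements(views, concurrently=False):
    """Generate REFRESH MATERIALIZED VIEW statements for the gold layer.
    CONCURRENTLY avoids blocking readers during the refresh, but requires
    a unique index on each view."""
    prefix = "REFRESH MATERIALIZED VIEW"
    if concurrently:
        prefix += " CONCURRENTLY"
    return [f"{prefix} {v};" for v in views]

for stmt in refresh_statements(GOLD_VIEWS):
    print(stmt)
```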

Prerequisites

Install Task (the task runner used by the commands below):

```bash
npm install -g @go-task/cli
```

PostgreSQL must have logical WAL enabled — required for Debezium CDC.

Connect to PostgreSQL and run:

```sql
-- Enable logical replication
ALTER SYSTEM SET wal_level = logical;
ALTER SYSTEM SET max_replication_slots = 4;
ALTER SYSTEM SET max_wal_senders = 4;

-- Grant replication permission to your user
ALTER USER postgres REPLICATION;
```

Then restart PostgreSQL for the changes to take effect:

```bash
# Windows (run as Administrator)
net stop postgresql-x64-17
net start postgresql-x64-17

# Linux / macOS
sudo systemctl restart postgresql
```

Verify:

```sql
SHOW wal_level;  -- should return 'logical'
```

Setup

1. Clone & configure

```bash
git clone https://github.com/your-username/ecommerce-dashboard.git
cd ecommerce-dashboard
cp .env.example .env
```

Edit `.env`:

```ini
# Silver / Gold database
DB_URL=jdbc:postgresql://host.docker.internal:5432/ecommerce_db
DB_HOST=host.docker.internal
DB_NAME=ecommerce_db
DB_USER=postgres
DB_PASS=your_password

# Source database
SOURCE_DB_NAME=source_db
TOPIC_PREFIX=kafka_topic_prefix

# Kafka
KAFKA_BOOTSTRAP_SERVERS=kafka:29092
```

On Linux, replace `host.docker.internal` with `172.17.0.1`.

2. Create the target database

```bash
psql -U postgres -c "CREATE DATABASE ecommerce_db;"
psql -U postgres -d ecommerce_db -f init.sql
psql -U postgres -d ecommerce_db -f gold_layer.sql
```

3. Load source data

Download the dataset from Kaggle, place the CSV files in `csvs/`, then:

```bash
task load
```

This loads all CSVs into the source database.

4. Start the pipeline

```bash
task up
```

That's it. Debezium will snapshot the source DB, push all records through Kafka, and Spark will populate the silver and gold layers automatically.


Task Commands

```bash
task up                # Start all services
task down              # Stop all services
task restart           # Stop then start
task build             # Rebuild Docker images
task load              # Load CSVs into source DB (once)

task status            # Container status
task connector         # Debezium connector status
task logs-spark        # Spark streaming logs
task logs-kafka        # Kafka logs
task logs-debezium     # Debezium connector logs

task db-source         # psql into source DB
task db-silver         # psql into silver DB (ecommerce_db)
task refresh-gold      # Manually refresh all gold views

task clean             # Stop + delete all volumes
task clean-checkpoints # Clear Spark checkpoints
```

Monitoring

| Service | URL |
| --- | --- |
| Spark Master UI | http://localhost:8081 |
| Spark Worker UI | http://localhost:8082 |
| Kafka Connect API | http://localhost:8083/connectors |

Troubleshooting

Gold views are empty

```bash
psql -U postgres -d ecommerce_db -c "SELECT COUNT(*) FROM silver_orders WHERE order_purchase_timestamp IS NOT NULL;"
```

If the count is 0, timestamps failed to parse. Clear the checkpoints and rebuild the streaming app:

```bash
task clean-checkpoints
docker compose up -d --build streaming-app
```

Debezium connector in FAILED state

```bash
task connector   # check status
curl -X POST http://localhost:8083/connectors/postgres-connector/restart
```
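Note that the status endpoint reports a state for the connector *and* for each task — a connector can show `RUNNING` while a task is `FAILED`. A small helper to flatten that out; the sample payload is fabricated, but follows the shape of the Kafka Connect `/connectors/<name>/status` response:

```python
import json

def connector_states(status_json: str):
    """Summarize a Kafka Connect /connectors/<name>/status response:
    the connector's own state followed by each task's state."""
    status = json.loads(status_json)
    states = [status["connector"]["state"]]
    states += [t["state"] for t in status.get("tasks", [])]
    return states

# Fabricated example response for postgres-connector
sample = json.dumps({
    "name": "postgres-connector",
    "connector": {"state": "RUNNING", "worker_id": "connect:8083"},
    "tasks": [{"id": 0, "state": "FAILED", "trace": "..."}],
})
print(connector_states(sample))   # ['RUNNING', 'FAILED']
```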

Kafka broker conflict on restart

```bash
task clean       # wipes volumes
task up
```

Inspect a raw Kafka message

```bash
docker exec ecommerce-dashboard-kafka-1 kafka-console-consumer \
  --bootstrap-server localhost:29092 \
  --topic kafka_topic_prefix.public.orders \
  --from-beginning --max-messages 1
```
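Each message on these topics is a Debezium change envelope: `op` distinguishes snapshot reads (`r`), inserts (`c`), updates (`u`) and deletes (`d`), and `after` carries the new row state (null for deletes). A minimal sketch of pulling the row out of one message — the sample payload below is fabricated for illustration:

```python
import json

def extract_row(message_value: str):
    """Pull the operation code and post-image row out of a Debezium
    change event. With the JSON converter, the event may or may not be
    wrapped in a "payload" field depending on whether schemas are enabled."""
    envelope = json.loads(message_value)
    payload = envelope.get("payload", envelope)
    return payload["op"], payload["after"]

# Fabricated update event on the orders topic
sample = json.dumps({
    "payload": {
        "before": {"order_id": "abc123", "order_status": "shipped"},
        "after": {"order_id": "abc123", "order_status": "delivered"},
        "op": "u",
    }
})
op, row = extract_row(sample)
print(op, row["order_status"])   # u delivered
```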

Project Structure

```
ecommerce-dashboard/
├── csvs/                          # Source CSV files (Kaggle dataset)
├── debezium/
│   └── postgres-connector.json    # Debezium connector config
│
├── dockerfiles/
│   ├── csv_to_db.Dockerfile
│   └── spark_app.Dockerfile
├── scripts/
│   ├── csv_to_db.py               # CSV loader
│   └── spark_app.py               # Spark streaming job
│
├── init.sql                       # Silver layer schema
├── gold_layer.sql                 # Gold materialized views
├── compose.yml                    # Main services
├── load-compose.yml               # CSV loader service
├── Taskfile.yml
└── .env.example
```

Notes

- Debezium sends PostgreSQL timestamps as microseconds since the epoch (`MicroTimestamp`). Spark converts them with `from_unixtime(col / 1000000)`.
- Clearing checkpoints causes Spark to re-read Kafka from the beginning. The upsert logic prevents duplicate data.
- Gold views refresh automatically when `orders` or `order_payments` batches are processed. Use `task refresh-gold` to trigger a refresh manually.
- `host.docker.internal` is a Docker DNS name for the host machine. It is not available on Linux by default — use `172.17.0.1` or add `extra_hosts` to `compose.yml`.
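The `MicroTimestamp` conversion from the first note can be checked in plain Python — the same divide-by-a-million arithmetic the Spark expression performs:

```python
from datetime import datetime, timezone

def micros_to_datetime(micros: int) -> datetime:
    """Convert a Debezium MicroTimestamp (microseconds since the Unix
    epoch, UTC) to a Python datetime."""
    return datetime.fromtimestamp(micros / 1_000_000, tz=timezone.utc)

# Example value in the Olist date range: 2017-10-02 10:56:33 UTC
ts = micros_to_datetime(1506941793000000)
print(ts.isoformat())
```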

About

🛒 Real-time e-commerce analytics pipeline built on the Olist Brazil dataset — PostgreSQL CDC via Debezium, streamed through Kafka, processed with Spark Streaming into Silver/Gold layers, visualized with Python Dash.
