StreamCore is a modular, event‑driven framework that lets you build complex Twitch applications without dealing with OAuth boilerplate, EventSub quirks, or manual WebSocket handling.
- 🔌 Plugin System – Drop a Python file in
workers/and it is auto‑registered. No import changes required. - 🛡️ Auto‑Auth – The system checks token scopes, halts on missing permissions and prints a one‑click auth URL.
- 🔄 Token Rotation – Refreshes access tokens automatically.
- ⚡ Event‑Driven – Redis Pub/Sub decouples event ingestion from processing.
- 🐳 Docker Ready –
docker-compose.ymlprovides PostgreSQL + Redis. - 🚀 uv Powered – Fast dependency management and execution.
- 🔗 Unified Events System – Workers emit to
events:{event_name}. A single WebSocket endpoint/ws/eventsserves every client type (overlays, robots, ESP32, mobile apps, etc.).
- Docker (for DB & Redis)
uv– install with:curl -LsSf https://astral.sh/uv/install.sh | sh # mac/linux # Windows: irm https://astral.sh/uv/install.ps1 | iex
# Start infrastructure
cd db_infrastructure
docker-compose up -d
cd ..
# Create .env from example and fill in your Twitch credentials
cp .env.example .env
# Edit .env → set TWITCH_CLIENT_ID, TWITCH_CLIENT_SECRET, etc.uv run fastapi dev main.pyThe API will be available at http://localhost:8000.
curl -X POST http://localhost:8000/system/start- On first run you will receive an
auth_urlin the JSON response – open it, authorize the app, and then call/system/startagain.
All clients connect to the same endpoint:
const ws = new WebSocket('ws://localhost:8000/ws/events');
ws.onmessage = (e) => console.log('event →', JSON.parse(e.data));Filter locally by the event field, e.g. event === 'chat'.
- Gateway (
gateway/service.py) – Maintains the Twitch EventSub WebSocket, subscribes to all events required by registered workers, and pushes raw events to Redis (eventsub:*). - Redis Bus – Decouples ingestion from processing. Workers publish higher‑level events to
events:{event_name}. - Events Manager (
events/manager.py) – A genericConnectionManagerthat holds WebSocket connections and broadcasts anyevents:*message to all clients. - Worker Registry (
workers/base.py) – Metaclass registers everyBaseWorkersubclass automatically. - Token Manager – Handles token storage, refresh, and scope verification.
| Variable | Description |
|---|---|
TWITCH_CLIENT_ID |
Your Twitch app client ID |
TWITCH_CLIENT_SECRET |
Your Twitch app secret |
TWITCH_REDIRECT_URI |
Callback URL (default http://localhost:8000/tokens/callback) |
POSTGRES_* |
PostgreSQL credentials |
REDIS_URL |
Redis connection string (default redis://localhost:6379/0) |
The system uses a Pub/Sub architecture via Redis to communicate Workers with the Frontend (Overlays).
- Worker Emits Event: When a worker (e.g.,
subscription_worker.py) detects something interesting, it callsawait self.emit("event_name", {data}). - Redis Pub: This
emitmethod publishes the message to a Redis channel namedevents:event_name. - Broadcaster Task: In
main.py, theevents_broadcaster_tasklistens to all channels starting withevents:*. - WebSocket Push: Upon receiving a Redis message, it immediately forwards it to all clients connected to the
/ws/eventsWebSocket. - Browser Overlay: Your HTML/JS in the overlay receives the JSON and updates the UI.
graph LR
A[Twitch EventSub] -->|Webhook/WS| B(Gateway)
B -->|Redis| C[Worker]
C -->|Logic| C
C -->|self.emit()| D[Redis Pub/Sub]
D -->|events:*| E[FastAPI Broadcaster]
E -->|WebSocket| F[Browser Overlay 1]
E -->|WebSocket| G[Browser Overlay 2]
To deploy in a production environment (Server, Raspberry Pi, VPS), we recommend using the included Docker Compose configuration which orchestrates the App, Database, and Redis together.
-
Clone and Configure:
git clone <repo> cd StreamCore cp .env.example .env # Edit .env with your real Twitch credentials
-
Run with Docker Compose: This will build the application image and start Postgres, Redis, and StreamCore in the background.
docker-compose -f docker-compose.prod.yaml up -d --build
The application will be available at
http://your-server-ip:8000.To view logs:
docker-compose -f docker-compose.prod.yaml logs -f
from workers.base import BaseWorker
import logging
logger = logging.getLogger('Greeter')
class GreeterWorker(BaseWorker):
# Scopes required by this worker
required_scopes = ["moderator:read:followers", "user:write:chat"]
def get_subscriptions(self, broadcaster_id, moderator_id):
return [{
"type": "channel.follow",
"version": "2",
"condition": {
"broadcaster_user_id": broadcaster_id,
"moderator_user_id": moderator_id,
},
}]
async def handle(self, event_type, payload):
if event_type == "channel.follow":
user = payload["event"]["user_name"]
logger.info(f"New follower: {user}")
# Example: emit a custom event for any client
await self.emit("follow", {"user": user})Place the file in workers/ and it will be discovered automatically.
POST /system/start– Starts Gateway, Redis consumer, and events broadcaster.POST /system/stop– Gracefully shuts everything down.GET /tokens/link– Returns an auth URL for the current required scopes.GET /– Health check (returns{status: "StreamCore Online", running: <bool>}).
- ModuleNotFoundError – Ensure you run the app with
uv run. Add missing packages withuv add <pkg>. - 401 Unauthorized – No token present. Call
/system/startto get theauth_url, authorize, then retry. - Missing Scopes – The logs will list required scopes. Re‑authorize using the provided URL.
- WebSocket disconnects – Check network stability. The server will attempt to reconnect; persistent failures usually indicate token issues.
- Typed Events Schema – Define a Pydantic model for each event type to give clients static typing and validation.
- Authentication Middleware – Protect the
/ws/eventsendpoint with optional JWT so only authorized clients can subscribe. - Rate‑Limiting & Back‑Pressure – Add a small queue/buffer in
events/manager.pyto avoid flooding slow clients. - Docker Compose for Development – Include a
docker-compose.dev.ymlthat also starts the FastAPI server for one‑command dev environments. - Test Suite – Add unit tests for the
BaseWorker.emitmethod and theevents/managerbroadcast logic (e.g., usingpytest-asyncio).
Happy hacking with StreamCore!