theanibalos/StreamCore

StreamCore Framework

Python 3.11+ FastAPI License

StreamCore is a modular, event‑driven framework that lets you build complex Twitch applications without dealing with OAuth boilerplate, EventSub quirks, or manual WebSocket handling.

✨ Core Features

  • 🔌 Plugin System – Drop a Python file in workers/ and it is auto‑registered. No import changes required.
  • 🛡️ Auto‑Auth – The system checks token scopes, halts on missing permissions and prints a one‑click auth URL.
  • 🔄 Token Rotation – Refreshes access tokens automatically.
  • ⚡ Event‑Driven – Redis Pub/Sub decouples event ingestion from processing.
  • 🐳 Docker Ready – docker-compose.yml provides PostgreSQL + Redis.
  • 🚀 uv Powered – Fast dependency management and execution.
  • 🔗 Unified Events System – Workers emit to events:{event_name}. A single WebSocket endpoint /ws/events serves every client type (overlays, robots, ESP32, mobile apps, etc.).

🚀 Quick Start

1. Prerequisites

  • Docker (for DB & Redis)
  • uv – install with:
    curl -LsSf https://astral.sh/uv/install.sh | sh   # mac/linux
    # Windows: irm https://astral.sh/uv/install.ps1 | iex

2. Setup

# Start infrastructure
cd db_infrastructure
docker-compose up -d
cd ..

# Create .env from example and fill in your Twitch credentials
cp .env.example .env
# Edit .env → set TWITCH_CLIENT_ID, TWITCH_CLIENT_SECRET, etc.

3. Run the application

uv run fastapi dev main.py

The API will be available at http://localhost:8000.

4. Start the system (via API)

curl -X POST http://localhost:8000/system/start
  • On first run you will receive an auth_url in the JSON response – open it, authorize the app, and then call /system/start again.
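
The two-call start flow can also be scripted. The sketch below uses only the standard library and assumes that the /system/start response is a JSON object carrying auth_url only while authorization is still pending; the README states nothing else about the response shape. The function names are hypothetical, and how the server signals errors is not covered.

```python
import json
import urllib.request
from typing import Optional

BASE_URL = "http://localhost:8000"  # default address from step 3


def pending_auth_url(response: dict) -> Optional[str]:
    """Return the auth URL if the response asks for authorization, else None."""
    # Assumption: `auth_url` is present only while authorization is needed.
    url = response.get("auth_url")
    return url if isinstance(url, str) and url else None


def start_system(base_url: str = BASE_URL) -> dict:
    """POST /system/start and return the decoded JSON body."""
    request = urllib.request.Request(f"{base_url}/system/start", method="POST")
    with urllib.request.urlopen(request, timeout=10) as reply:
        return json.load(reply)
```

With the server running, `pending_auth_url(start_system())` returns the URL to open on first run and `None` once the app is authorized.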

5. Connect a client

All clients connect to the same endpoint:

const ws = new WebSocket('ws://localhost:8000/ws/events');
ws.onmessage = (e) => console.log('event →', JSON.parse(e.data));

Filter locally by the event field, e.g. event === 'chat'.
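
For a non-browser client, the same local filtering might look like the sketch below. It only shows the filtering step: receiving frames from /ws/events needs a WebSocket library and is left out. The `event` field comes from the text above; the `data` field and the sample frames are assumptions made for illustration.

```python
import json
from typing import Iterable, Iterator


def select_events(raw_messages: Iterable[str], wanted: set[str]) -> Iterator[dict]:
    """Yield decoded messages whose `event` field is in `wanted`."""
    for raw in raw_messages:
        try:
            message = json.loads(raw)
        except json.JSONDecodeError:
            continue  # skip frames that are not valid JSON
        if isinstance(message, dict) and message.get("event") in wanted:
            yield message


# Hypothetical frames as they might arrive from /ws/events.
frames = [
    '{"event": "chat", "data": {"user": "alice", "text": "hi"}}',
    '{"event": "follow", "data": {"user": "bob"}}',
    "not json",
]
chat_only = list(select_events(frames, {"chat"}))
print(chat_only)
```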

🏗️ Architecture Overview

  1. Gateway (gateway/service.py) – Maintains the Twitch EventSub WebSocket, subscribes to all events required by registered workers, and pushes raw events to Redis (eventsub:*).
  2. Redis Bus – Decouples ingestion from processing. Workers publish higher‑level events to events:{event_name}.
  3. Events Manager (events/manager.py) – A generic ConnectionManager that holds WebSocket connections and broadcasts any events:* message to all clients.
  4. Worker Registry (workers/base.py) – Metaclass registers every BaseWorker subclass automatically.
  5. Token Manager – Handles token storage, refresh, and scope verification.
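
To illustrate item 4, here is a minimal sketch of how a metaclass can register subclasses automatically. It is not the code in workers/base.py: `WorkerMeta`, its `registry` attribute, and `AlertWorker` are hypothetical names, and the real registry may store instances or use a different key.

```python
class WorkerMeta(type):
    """Metaclass that records every subclass it creates."""

    registry: dict[str, type] = {}

    def __init__(cls, name, bases, namespace):
        super().__init__(name, bases, namespace)
        # `bases` is empty only for the root class itself, which is skipped.
        if bases:
            WorkerMeta.registry[name] = cls


class BaseWorker(metaclass=WorkerMeta):
    required_scopes: list[str] = []


class GreeterWorker(BaseWorker):
    required_scopes = ["moderator:read:followers"]


class AlertWorker(BaseWorker):
    required_scopes = ["channel:read:subscriptions"]


print(sorted(WorkerMeta.registry))
```

Because registration happens when the class body is executed, importing a worker module is enough to make its workers visible; no explicit registration call is needed.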

⚙️ Configuration (.env)

| Variable | Description |
| --- | --- |
| TWITCH_CLIENT_ID | Your Twitch app client ID |
| TWITCH_CLIENT_SECRET | Your Twitch app secret |
| TWITCH_REDIRECT_URI | Callback URL (default http://localhost:8000/tokens/callback) |
| POSTGRES_* | PostgreSQL credentials |
| REDIS_URL | Redis connection string (default redis://localhost:6379/0) |
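
As a rough illustration of how these variables could be consumed, the sketch below reads them from the process environment and applies the two documented defaults. `Settings` and `load_settings` are hypothetical names, not StreamCore's actual configuration code; the sketch assumes the variables are already exported (parsing the .env file is out of scope), and the POSTGRES_* variables are omitted because their exact names are not listed.

```python
import os
from dataclasses import dataclass
from typing import Mapping


@dataclass(frozen=True)
class Settings:
    twitch_client_id: str
    twitch_client_secret: str
    twitch_redirect_uri: str
    redis_url: str


def load_settings(env: Mapping[str, str] = os.environ) -> Settings:
    """Build Settings from the environment, failing early on missing secrets."""
    required = ("TWITCH_CLIENT_ID", "TWITCH_CLIENT_SECRET")
    missing = [key for key in required if not env.get(key)]
    if missing:
        raise RuntimeError(f"Missing required variables: {', '.join(missing)}")
    return Settings(
        twitch_client_id=env["TWITCH_CLIENT_ID"],
        twitch_client_secret=env["TWITCH_CLIENT_SECRET"],
        # The two fallbacks below are the documented defaults.
        twitch_redirect_uri=env.get(
            "TWITCH_REDIRECT_URI", "http://localhost:8000/tokens/callback"
        ),
        redis_url=env.get("REDIS_URL", "redis://localhost:6379/0"),
    )


settings = load_settings({"TWITCH_CLIENT_ID": "abc", "TWITCH_CLIENT_SECRET": "s3cret"})
print(settings.redis_url)
```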

📡 How Overlays Work (WebSockets)

The system uses Redis Pub/Sub to carry events from workers to the frontend (overlays).

  1. Worker Emits Event: When a worker (e.g., subscription_worker.py) detects something interesting, it calls await self.emit("event_name", data), where data is the event payload.
  2. Redis Pub: The emit method publishes the message to the Redis channel events:{event_name}.
  3. Broadcaster Task: In main.py, the events_broadcaster_task listens to every channel matching the pattern events:*.
  4. WebSocket Push: Upon receiving a Redis message, it immediately forwards it to all clients connected to the /ws/events WebSocket.
  5. Browser Overlay: Your HTML/JS in the overlay receives the JSON and updates the UI.
graph LR
    A[Twitch EventSub] -->|WebSocket| B(Gateway)
    B -->|Redis| C[Worker]
    C -->|Logic| C
    C -->|self.emit()| D[Redis Pub/Sub]
    D -->|events:*| E[FastAPI Broadcaster]
    E -->|WebSocket| F[Browser Overlay 1]
    E -->|WebSocket| G[Browser Overlay 2]
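
The emit → publish → broadcast path can be tried without Redis or FastAPI. The sketch below replaces Redis with a tiny in-memory bus and replaces WebSocket clients with plain lists, so it only demonstrates the fan-out pattern. `InMemoryBus`, the `{"event": ..., "data": ...}` message shape, and the free-standing `emit` function are assumptions for illustration, not StreamCore's implementation.

```python
import asyncio
import json
from fnmatch import fnmatchcase


class InMemoryBus:
    """Tiny stand-in for Redis Pub/Sub with pattern subscriptions."""

    def __init__(self):
        self._subscribers = []  # list of (pattern, queue) pairs

    def psubscribe(self, pattern: str) -> asyncio.Queue:
        queue = asyncio.Queue()
        self._subscribers.append((pattern, queue))
        return queue

    async def publish(self, channel: str, message: str) -> None:
        for pattern, queue in self._subscribers:
            if fnmatchcase(channel, pattern):
                await queue.put((channel, message))


async def emit(bus: InMemoryBus, event_name: str, data: dict) -> None:
    """Steps 1-2: a worker publishes to events:{event_name}."""
    payload = json.dumps({"event": event_name, "data": data})
    await bus.publish(f"events:{event_name}", payload)


async def broadcaster(queue: asyncio.Queue, clients: list, count: int) -> None:
    """Steps 3-4: forward each bus message to every connected client."""
    for _ in range(count):
        _channel, message = await queue.get()
        for client in clients:
            client.append(message)  # a real server would send over a WebSocket


async def demo():
    bus = InMemoryBus()
    queue = bus.psubscribe("events:*")
    overlay_1, overlay_2 = [], []
    await emit(bus, "follow", {"user": "alice"})
    await bus.publish("eventsub:channel.follow", "{}")  # ignored: pattern mismatch
    await broadcaster(queue, [overlay_1, overlay_2], count=1)
    return overlay_1, overlay_2


overlay_1, overlay_2 = asyncio.run(demo())
print(overlay_1 == overlay_2, json.loads(overlay_1[0])["event"])
```

Both overlays receive the same message, while the raw eventsub:* message never reaches them because it does not match the events:* pattern.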

🏭 Production (Docker)

To deploy in a production environment (server, Raspberry Pi, VPS), we recommend the included Docker Compose configuration, which runs the app, PostgreSQL, and Redis together.

  1. Clone and Configure:

    git clone <repo>
    cd StreamCore
    cp .env.example .env
    # Edit .env with your real Twitch credentials
  2. Run with Docker Compose: This will build the application image and start Postgres, Redis, and StreamCore in the background.

    docker-compose -f docker-compose.prod.yaml up -d --build

    The application will be available at http://your-server-ip:8000.

    To view logs:

    docker-compose -f docker-compose.prod.yaml logs -f

🛠️ Creating a Worker

import logging

from workers.base import BaseWorker

logger = logging.getLogger("Greeter")


class GreeterWorker(BaseWorker):
    """Logs new followers and re-emits them as a `follow` event."""

    # Scopes required by this worker
    required_scopes = ["moderator:read:followers", "user:write:chat"]

    def get_subscriptions(self, broadcaster_id, moderator_id):
        # EventSub subscriptions this worker needs
        return [{
            "type": "channel.follow",
            "version": "2",
            "condition": {
                "broadcaster_user_id": broadcaster_id,
                "moderator_user_id": moderator_id,
            },
        }]

    async def handle(self, event_type, payload):
        if event_type == "channel.follow":
            user = payload["event"]["user_name"]
            logger.info("New follower: %s", user)
            # Example: emit a custom event for any client (channel events:follow)
            await self.emit("follow", {"user": user})

Place the file in workers/ and it will be discovered automatically.
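
One common way to get this kind of automatic discovery is to import every module in the package at startup, so that class definitions run and register themselves. The sketch below shows that technique with `pkgutil` and `importlib`; whether StreamCore does exactly this is an assumption, and `discover_modules` and the throwaway `demo_workers` package are hypothetical.

```python
import importlib
import pkgutil
import sys
import tempfile
from pathlib import Path


def discover_modules(package_name: str) -> list[str]:
    """Import every module in a package and return the imported names."""
    package = importlib.import_module(package_name)
    imported = []
    for info in pkgutil.iter_modules(package.__path__):
        full_name = f"{package_name}.{info.name}"
        importlib.import_module(full_name)  # importing runs the class definitions
        imported.append(full_name)
    return sorted(imported)


# Demo with a throwaway package so the sketch runs anywhere.
root = Path(tempfile.mkdtemp())
pkg = root / "demo_workers"
pkg.mkdir()
(pkg / "__init__.py").write_text("")
(pkg / "greeter.py").write_text("LOADED = True\n")
sys.path.insert(0, str(root))
importlib.invalidate_caches()

found = discover_modules("demo_workers")
print(found)
```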

📚 API Reference

  • POST /system/start – Starts Gateway, Redis consumer, and events broadcaster.
  • POST /system/stop – Gracefully shuts everything down.
  • GET /tokens/link – Returns an auth URL for the current required scopes.
  • GET / – Health check (returns {status: "StreamCore Online", running: <bool>}).

❓ Troubleshooting

  • ModuleNotFoundError – Ensure you run the app with uv run. Add missing packages with uv add <pkg>.
  • 401 Unauthorized – No token present. Call /system/start to get the auth_url, authorize, then retry.
  • Missing Scopes – The logs will list required scopes. Re‑authorize using the provided URL.
  • WebSocket disconnects – Check network stability. The server will attempt to reconnect; persistent failures usually indicate token issues.
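
When debugging scope problems, it can help to see what a scope check amounts to. The sketch below compares required and granted scopes as sets and builds a Twitch authorization-code URL for re-authorization. The authorize endpoint and query parameters are Twitch's standard OAuth ones; the function names, the placeholder client ID, and the idea that StreamCore builds its URL this way are assumptions.

```python
from urllib.parse import quote, urlencode


def missing_scopes(required: set[str], granted: set[str]) -> set[str]:
    """Return the scopes that workers need but the token does not have."""
    return required - granted


def build_auth_url(client_id: str, redirect_uri: str, scopes: set[str]) -> str:
    """Build an authorization-code URL requesting every scope in `scopes`."""
    query = urlencode(
        {
            "client_id": client_id,
            "redirect_uri": redirect_uri,
            "response_type": "code",
            "scope": " ".join(sorted(scopes)),  # space-delimited scope list
        },
        quote_via=quote,  # encode spaces as %20 instead of +
    )
    return f"https://id.twitch.tv/oauth2/authorize?{query}"


required = {"moderator:read:followers", "user:write:chat"}
granted = {"user:write:chat"}  # hypothetical scopes on the stored token
missing = missing_scopes(required, granted)
if missing:
    url = build_auth_url("my-client-id", "http://localhost:8000/tokens/callback", required)
    print("Missing:", sorted(missing))
    print(url)
```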

📈 Suggested Improvements

  1. Typed Events Schema – Define a Pydantic model for each event type to give clients static typing and validation.
  2. Authentication Middleware – Protect the /ws/events endpoint with optional JWT so only authorized clients can subscribe.
  3. Rate‑Limiting & Back‑Pressure – Add a small queue/buffer in events/manager.py to avoid flooding slow clients.
  4. Docker Compose for Development – Include a docker-compose.dev.yml that also starts the FastAPI server for one‑command dev environments.
  5. Test Suite – Add unit tests for the BaseWorker.emit method and the events/manager broadcast logic (e.g., using pytest-asyncio).
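
As one possible starting point for suggestion 3, the sketch below gives each client a bounded buffer that drops the oldest event when full, so a slow client cannot grow memory without limit. `ClientBuffer` is a hypothetical class, and drop-oldest is only one policy; disconnecting slow clients or dropping the newest event are equally valid choices.

```python
import asyncio


class ClientBuffer:
    """Bounded per-client buffer that drops the oldest event when full."""

    def __init__(self, max_size: int = 100):
        self._queue: asyncio.Queue = asyncio.Queue(maxsize=max_size)
        self.dropped = 0

    def push(self, message: str) -> None:
        if self._queue.full():
            self._queue.get_nowait()  # discard the oldest message
            self.dropped += 1
        self._queue.put_nowait(message)

    async def next(self) -> str:
        return await self._queue.get()


async def demo():
    buffer = ClientBuffer(max_size=2)
    for i in range(5):
        buffer.push(f"event-{i}")
    return [await buffer.next(), await buffer.next()], buffer.dropped


kept, dropped = asyncio.run(demo())
print(kept, dropped)
```

In a broadcaster, `push` would be called for each incoming event and a per-client sender task would await `next()` before writing to the socket.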

Happy hacking with StreamCore!
