chore(asyncio): hardening pass — utilities, deprecated API cleanup, lifespan fix#9552
Conversation
…ifespan fix - Add marimo/_utils/asyncio_utils.py with supervised_task, fire_and_forget, and cancel_and_wait helpers (strong-ref task tracking, exception logging, shutdown dance). - Fix Lifespans._manager swallowing CancelledError, which was breaking shutdown propagation through the ASGI lifespan chain. - Replace the type-confused run_async() helper in session_manager with fire_and_forget at its three callers. - Replace 11 deprecated asyncio.get_event_loop() sites with get_running_loop() (or asyncio.to_thread in tool_manager). - Migrate api/lifespans.py and execution.py hand-rolled set+callback / cancel-and-await patterns to the new helpers. - ConnectionDistributor now pins the loop at start() and reuses it on stop().
|
The latest updates on your projects. Learn more about Vercel for GitHub.
|
There was a problem hiding this comment.
1 issue found across 18 files
Prompt for AI agents (unresolved issues)
Check if these issues are valid — if so, understand the root cause of each and fix them. If appropriate, use sub-agents to investigate and fix each issue separately.
<file name="marimo/_server/api/lifespans.py">
<violation number="1" location="marimo/_server/api/lifespans.py:133">
P2: Handle cancelled MCP task before calling `task.result()`; otherwise `CancelledError` can escape this cleanup block and interrupt lifespan shutdown.</violation>
</file>
Architecture diagram
sequenceDiagram
participant Client as HTTP/WS Client
participant Server as ASGI Server (Uvicorn)
participant LS as Lifespans Manager
participant LL as Lifespan Context (LSP/MCP)
participant SM as Session Manager
participant EB as Event Bus
participant CD as Connection Distributor
participant AM as Async Middleware
participant FW as File Watcher
participant TM as Tool Manager
participant EX as Exporter
participant EU as Asyncio Utils
participant EH as Extensions Heartbeat
participant IH as Interrupt Handler
participant WS as WebSocket Endpoint
Note over EU: NEW: asyncio_utils.py<br/>supervised_task, fire_and_forget,<br/>cancel_and_wait helpers
Note over LS,LL: LIFESPAN FIX: CancelledError propagates<br/>through chain for shutdown propagation
Client->>Server: HTTP/WS Request
alt ASGI Lifespan Startup
Server->>LS: __call__(app)
LS->>LL: enter lifespan contexts
LL->>EU: supervised_task(lsp.start, registry=background_tasks)
LL->>EU: supervised_task(mcp.connect, registry=background_tasks)
EU->>EU: asyncio.create_task(name=name)<br/>add_done_callback(log exception)<br/>registry.add(task)
LL-->>LS: yield
LS-->>Server: lifespan started
end
Server->>SM: create_session()
SM->>EU: fire_and_forget(session.created)
alt No running loop
EU->>EU: asyncio.run(coro)
else Running loop
EU->>EU: supervised_task(name="session.created",<br/>registry=_BACKGROUND_TASKS)
end
SM-->>Server: session
Server->>SM: close_session()
SM->>EU: fire_and_forget(session.closed)
SM-->>Server: closed
Server->>AM: forward request
AM->>AM: get_running_loop().run_in_executor()
Server->>CD: start()
CD->>CD: _loop = get_running_loop()<br/>_loop.add_reader(fileno, on_change)
CD-->>Server: Disposable
Server->>CD: stop()
CD->>CD: _loop.remove_reader(fileno)<br/>close input_connection
CD-->>Server: stopped
Server->>WS: WebSocket connect
WS->>WS: get_running_loop().call_later(ttl, close)
WS-->>Server: connected
Server->>EX: export file
EX->>EX: get_running_loop().run_in_executor(write_file)
EX-->>Server: exported
Server->>TM: call handler
TM->>TM: asyncio.to_thread(handler, arguments)
TM-->>Server: result
Server->>EH: start heartbeat
EH->>EH: get_running_loop().create_task(heartbeat)
EH-->>Server: heartbeat started
Server->>IH: interrupt handler
IH->>IH: get_running_loop()<br/>signal.signal(signal.SIGINT)
IH-->>Server: handler registered
alt ASGI Lifespan Shutdown
Server->>LS: trigger cancellation
LS->>LL: CancelledError propagates
LL->>EU: cancel_and_wait(lsp.task)
EU->>EU: task.cancel()<br/>suppress CancelledError
LL->>EU: cancel_and_wait(mcp.task)
EU->>EU: task.cancel()<br/>suppress CancelledError
alt MCP task completed
LL->>LL: task.result() -> mcp_client<br/>mcp_client.disconnect()
end
LL->>LS: exit_stack cleanup
LS-->>Server: lifespan ended
end
Server->>FW: create(path, callback)
FW->>FW: loop = get_running_loop()<br/>create watchdog or polling watcher with loop
FW-->>Server: FileWatcher instance
alt Error: Unhandled Exception in Task
EU->>EU: _log_task_exception()
EU->>EU: LOGGER.error("Unhandled exception...", exc_info=exc)
end
Reply with feedback, questions, or to request a fix. Tag @cubic-dev-ai to re-run a review.
There was a problem hiding this comment.
Pull request overview
This PR hardens asyncio usage across the server/runtime by introducing centralized task utilities, removing deprecated asyncio.get_event_loop() usage, and fixing ASGI lifespan cancellation propagation to improve graceful shutdown behavior.
Changes:
- Added
marimo/_utils/asyncio_utils.pyprovidingsupervised_task,fire_and_forget, andcancel_and_wait, with accompanying tests. - Fixed
Lifespans._managerto stop swallowingCancelledError, ensuring ASGI lifespan shutdown cancellation propagates correctly. - Migrated multiple call sites to
asyncio.get_running_loop()/asyncio.to_thread()and refactored ad-hoc task tracking/cancel patterns to the new helpers;ConnectionDistributornow pins the loop used foradd_reader/remove_reader.
Reviewed changes
Copilot reviewed 18 out of 18 changed files in this pull request and generated 2 comments.
Show a summary per file
| File | Description |
|---|---|
| tests/_utils/test_lifespans.py | Updates cancellation test to assert CancelledError propagates while teardown still runs. |
| tests/_utils/test_distributor.py | Updates loop mocking to patch asyncio.get_running_loop. |
| tests/_utils/test_asyncio_utils.py | Adds unit tests for the new asyncio utility helpers. |
| tests/_server/api/endpoints/test_ws.py | Updates websocket endpoint tests to patch get_running_loop. |
| marimo/_utils/lifespans.py | Removes CancelledError swallowing from lifespan manager to restore ASGI shutdown semantics. |
| marimo/_utils/file_watcher.py | Captures the running loop at watcher creation for correct cross-thread scheduling. |
| marimo/_utils/distributor.py | Captures event loop at start() and reuses it on stop() for loop-bound reader APIs. |
| marimo/_utils/background_task.py | Refactors stop_sync() loop detection/shutdown behavior. |
| marimo/_utils/asyncio_utils.py | Introduces centralized helpers for supervised tasks, fire-and-forget execution, and cancellation/await patterns. |
| marimo/_session/extensions/extensions.py | Uses get_running_loop() to schedule heartbeat only when a loop is running. |
| marimo/_server/session_manager.py | Replaces run_async() with fire_and_forget() at session event emission call sites and removes the helper. |
| marimo/_server/export/exporter.py | Uses get_running_loop() for executor-based file writes in async context. |
| marimo/_server/api/middleware.py | Uses get_running_loop() to collect request bodies in async context. |
| marimo/_server/api/lifespans.py | Migrates background task creation/cancellation to supervised_task + cancel_and_wait. |
| marimo/_server/api/interrupt.py | Uses get_running_loop() for signal handler loop affinity. |
| marimo/_server/api/endpoints/ws_endpoint.py | Uses get_running_loop().call_later() for session TTL close scheduling. |
| marimo/_server/api/endpoints/execution.py | Replaces manual cancel/suppress/await with cancel_and_wait() for disconnect watcher task cleanup. |
| marimo/_server/ai/tools/tool_manager.py | Replaces run_in_executor usage with asyncio.to_thread for sync handler execution. |
…down - lifespans.mcp: short-circuit on task.cancelled() before reading task.result(); CancelledError is a BaseException on 3.11+ and would otherwise escape the `except Exception` block (cubic, copilot). - background_task.stop_sync: use self.task.get_loop() instead of spinning up a fresh asyncio.run() loop, which would fail with "future attached to a different event loop" the moment we awaited the existing task (copilot).
There was a problem hiding this comment.
1 issue found across 2 files (changes from recent commits).
Prompt for AI agents (unresolved issues)
Check if these issues are valid — if so, understand the root cause of each and fix them. If appropriate, use sub-agents to investigate and fix each issue separately.
<file name="marimo/_utils/background_task.py">
<violation number="1" location="marimo/_utils/background_task.py:108">
P2: `stop_sync()` returns early when the task loop is running, skipping the actual stop/cancel path and ignoring `timeout`.</violation>
</file>
Reply with feedback, questions, or to request a fix. Tag @cubic-dev-ai to re-run a review.
- cancel_and_wait: retrieve exception on already-done tasks so the loop doesn't log "Task exception was never retrieved" warnings; the exception remains queryable via task.exception() (copilot). - stop_sync: honor `timeout` when the task lives on another thread's loop by dispatching the stop coroutine via run_coroutine_threadsafe and blocking on future.result(timeout). Falls back to cooperative shutdown when called from inside the task's own loop thread, where blocking would deadlock (cubic). - Rename test_cancel_and_wait_propagates_real_error to reflect what it actually asserts (copilot). - Add a regression test for stop_sync's cross-thread path.
- fire_and_forget: when falling back to asyncio.run() in sync contexts, call initialize_asyncio() first so the loop policy matches what the rest of marimo expects (Windows selector loop for add_reader). The initializer is moved to _utils/asyncio_utils.py (its natural home); _server/utils.py re-exports it for existing callers (copilot). - Update supervised_task docstring + log message to match its actual behavior: every non-cancellation exception is logged, including for awaited tasks. Recommend plain create_task when the caller awaits and handles errors itself (copilot).
| task = asyncio.create_task(background_connect_mcp_servers()) | ||
| background_tasks.add(task) # Keep a reference to prevent GC | ||
| task.add_done_callback(background_tasks.discard) # Clean up when done | ||
| task = supervised_task( |
There was a problem hiding this comment.
supervised_task will log any exception from background_connect_mcp_servers() via its done-callback, and the caller below also has except Exception: LOGGER.error(...) — connect failures will be logged twice. Either drop the supervisor's logging here (on_exception=lambda _: None) or use plain asyncio.create_task since this caller handles its own errors.
- lifespans.mcp: collapse cleanup branching by calling cancel_and_wait unconditionally; split connect-result vs disconnect-time error handling; opt out of supervisor logging on the awaited mcp.connect task. - asyncio_utils: surface the "don't supervise awaited tasks" rule in the supervised_task docstring with an explicit on_exception escape hatch. - test_asyncio_utils: drop tautological `or task.done()` assertion.
|
🚀 Development release published. You may be able to view the changes at https://marimo.app?v=0.23.7-dev33 |
and cancel_and_wait helpers (strong-ref task tracking, exception logging,
shutdown dance).
shutdown propagation through the ASGI lifespan chain.
fire_and_forget at its three callers.
get_running_loop() (or asyncio.to_thread in tool_manager).
cancel-and-await patterns to the new helpers.