Skip to content

chore(asyncio): hardening pass — utilities, deprecated API cleanup, lifespan fix#9552

Merged
mscolnick merged 6 commits into
mainfrom
ms/async-fixes
May 18, 2026
Merged

chore(asyncio): hardening pass — utilities, deprecated API cleanup, lifespan fix#9552
mscolnick merged 6 commits into
mainfrom
ms/async-fixes

Conversation

@mscolnick

Copy link
Copy Markdown
Contributor
  • Add marimo/_utils/asyncio_utils.py with supervised_task, fire_and_forget,
    and cancel_and_wait helpers (strong-ref task tracking, exception logging,
    shutdown dance).
  • Fix Lifespans._manager swallowing CancelledError, which was breaking
    shutdown propagation through the ASGI lifespan chain.
  • Replace the type-confused run_async() helper in session_manager with
    fire_and_forget at its three callers.
  • Replace 11 deprecated asyncio.get_event_loop() sites with
    get_running_loop() (or asyncio.to_thread in tool_manager).
  • Migrate api/lifespans.py and execution.py hand-rolled set+callback /
    cancel-and-await patterns to the new helpers.
  • ConnectionDistributor now pins the loop at start() and reuses it on stop().
…ifespan fix

- Add marimo/_utils/asyncio_utils.py with supervised_task, fire_and_forget,
  and cancel_and_wait helpers (strong-ref task tracking, exception logging,
  shutdown dance).
- Fix Lifespans._manager swallowing CancelledError, which was breaking
  shutdown propagation through the ASGI lifespan chain.
- Replace the type-confused run_async() helper in session_manager with
  fire_and_forget at its three callers.
- Replace 11 deprecated asyncio.get_event_loop() sites with
  get_running_loop() (or asyncio.to_thread in tool_manager).
- Migrate api/lifespans.py and execution.py hand-rolled set+callback /
  cancel-and-await patterns to the new helpers.
- ConnectionDistributor now pins the loop at start() and reuses it on stop().
Copilot AI review requested due to automatic review settings May 14, 2026 17:19
@vercel

vercel Bot commented May 14, 2026

Copy link
Copy Markdown

The latest updates on your projects. Learn more about Vercel for GitHub.

Project Deployment Actions Updated (UTC)
marimo-docs Ready Ready Preview, Comment May 16, 2026 9:47pm

Request Review

@cubic-dev-ai cubic-dev-ai Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

1 issue found across 18 files

Prompt for AI agents (unresolved issues)

Check if these issues are valid — if so, understand the root cause of each and fix them. If appropriate, use sub-agents to investigate and fix each issue separately.


<file name="marimo/_server/api/lifespans.py">

<violation number="1" location="marimo/_server/api/lifespans.py:133">
P2: Handle cancelled MCP task before calling `task.result()`; otherwise `CancelledError` can escape this cleanup block and interrupt lifespan shutdown.</violation>
</file>
Architecture diagram
sequenceDiagram
    participant Client as HTTP/WS Client
    participant Server as ASGI Server (Uvicorn)
    participant LS as Lifespans Manager
    participant LL as Lifespan Context (LSP/MCP)
    participant SM as Session Manager
    participant EB as Event Bus
    participant CD as Connection Distributor
    participant AM as Async Middleware
    participant FW as File Watcher
    participant TM as Tool Manager
    participant EX as Exporter
    participant EU as Asyncio Utils
    participant EH as Extensions Heartbeat
    participant IH as Interrupt Handler
    participant WS as WebSocket Endpoint

    Note over EU: NEW: asyncio_utils.py<br/>supervised_task, fire_and_forget,<br/>cancel_and_wait helpers

    Note over LS,LL: LIFESPAN FIX: CancelledError propagates<br/>through chain for shutdown propagation

    Client->>Server: HTTP/WS Request

    alt ASGI Lifespan Startup
        Server->>LS: __call__(app)
        LS->>LL: enter lifespan contexts
        LL->>EU: supervised_task(lsp.start, registry=background_tasks)
        LL->>EU: supervised_task(mcp.connect, registry=background_tasks)
        EU->>EU: asyncio.create_task(name=name)<br/>add_done_callback(log exception)<br/>registry.add(task)
        LL-->>LS: yield
        LS-->>Server: lifespan started
    end

    Server->>SM: create_session()
    SM->>EU: fire_and_forget(session.created)
    alt No running loop
        EU->>EU: asyncio.run(coro)
    else Running loop
        EU->>EU: supervised_task(name="session.created",<br/>registry=_BACKGROUND_TASKS)
    end
    SM-->>Server: session

    Server->>SM: close_session()
    SM->>EU: fire_and_forget(session.closed)
    SM-->>Server: closed

    Server->>AM: forward request
    AM->>AM: get_running_loop().run_in_executor()

    Server->>CD: start()
    CD->>CD: _loop = get_running_loop()<br/>_loop.add_reader(fileno, on_change)
    CD-->>Server: Disposable

    Server->>CD: stop()
    CD->>CD: _loop.remove_reader(fileno)<br/>close input_connection
    CD-->>Server: stopped

    Server->>WS: WebSocket connect
    WS->>WS: get_running_loop().call_later(ttl, close)
    WS-->>Server: connected

    Server->>EX: export file
    EX->>EX: get_running_loop().run_in_executor(write_file)
    EX-->>Server: exported

    Server->>TM: call handler
    TM->>TM: asyncio.to_thread(handler, arguments)
    TM-->>Server: result

    Server->>EH: start heartbeat
    EH->>EH: get_running_loop().create_task(heartbeat)
    EH-->>Server: heartbeat started

    Server->>IH: interrupt handler
    IH->>IH: get_running_loop()<br/>signal.signal(signal.SIGINT)
    IH-->>Server: handler registered

    alt ASGI Lifespan Shutdown
        Server->>LS: trigger cancellation
        LS->>LL: CancelledError propagates
        LL->>EU: cancel_and_wait(lsp.task)
        EU->>EU: task.cancel()<br/>suppress CancelledError
        LL->>EU: cancel_and_wait(mcp.task)
        EU->>EU: task.cancel()<br/>suppress CancelledError
        alt MCP task completed
            LL->>LL: task.result() -> mcp_client<br/>mcp_client.disconnect()
        end
        LL->>LS: exit_stack cleanup
        LS-->>Server: lifespan ended
    end

    Server->>FW: create(path, callback)
    FW->>FW: loop = get_running_loop()<br/>create watchdog or polling watcher with loop
    FW-->>Server: FileWatcher instance

    alt Error: Unhandled Exception in Task
        EU->>EU: _log_task_exception()
        EU->>EU: LOGGER.error("Unhandled exception...", exc_info=exc)
    end
Loading

Reply with feedback, questions, or to request a fix. Tag @cubic-dev-ai to re-run a review.

Comment thread marimo/_server/api/lifespans.py Outdated

Copilot AI left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR hardens asyncio usage across the server/runtime by introducing centralized task utilities, removing deprecated asyncio.get_event_loop() usage, and fixing ASGI lifespan cancellation propagation to improve graceful shutdown behavior.

Changes:

  • Added marimo/_utils/asyncio_utils.py providing supervised_task, fire_and_forget, and cancel_and_wait, with accompanying tests.
  • Fixed Lifespans._manager to stop swallowing CancelledError, ensuring ASGI lifespan shutdown cancellation propagates correctly.
  • Migrated multiple call sites to asyncio.get_running_loop() / asyncio.to_thread() and refactored ad-hoc task tracking/cancel patterns to the new helpers; ConnectionDistributor now pins the loop used for add_reader/remove_reader.

Reviewed changes

Copilot reviewed 18 out of 18 changed files in this pull request and generated 2 comments.

Show a summary per file
File Description
tests/_utils/test_lifespans.py Updates cancellation test to assert CancelledError propagates while teardown still runs.
tests/_utils/test_distributor.py Updates loop mocking to patch asyncio.get_running_loop.
tests/_utils/test_asyncio_utils.py Adds unit tests for the new asyncio utility helpers.
tests/_server/api/endpoints/test_ws.py Updates websocket endpoint tests to patch get_running_loop.
marimo/_utils/lifespans.py Removes CancelledError swallowing from lifespan manager to restore ASGI shutdown semantics.
marimo/_utils/file_watcher.py Captures the running loop at watcher creation for correct cross-thread scheduling.
marimo/_utils/distributor.py Captures event loop at start() and reuses it on stop() for loop-bound reader APIs.
marimo/_utils/background_task.py Refactors stop_sync() loop detection/shutdown behavior.
marimo/_utils/asyncio_utils.py Introduces centralized helpers for supervised tasks, fire-and-forget execution, and cancellation/await patterns.
marimo/_session/extensions/extensions.py Uses get_running_loop() to schedule heartbeat only when a loop is running.
marimo/_server/session_manager.py Replaces run_async() with fire_and_forget() at session event emission call sites and removes the helper.
marimo/_server/export/exporter.py Uses get_running_loop() for executor-based file writes in async context.
marimo/_server/api/middleware.py Uses get_running_loop() to collect request bodies in async context.
marimo/_server/api/lifespans.py Migrates background task creation/cancellation to supervised_task + cancel_and_wait.
marimo/_server/api/interrupt.py Uses get_running_loop() for signal handler loop affinity.
marimo/_server/api/endpoints/ws_endpoint.py Uses get_running_loop().call_later() for session TTL close scheduling.
marimo/_server/api/endpoints/execution.py Replaces manual cancel/suppress/await with cancel_and_wait() for disconnect watcher task cleanup.
marimo/_server/ai/tools/tool_manager.py Replaces run_in_executor usage with asyncio.to_thread for sync handler execution.
Comment thread marimo/_server/api/lifespans.py Outdated
Comment thread marimo/_utils/background_task.py Outdated
@mscolnick mscolnick added the enhancement New feature or request label May 14, 2026
…down

- lifespans.mcp: short-circuit on task.cancelled() before reading
  task.result(); CancelledError is a BaseException on 3.11+ and would
  otherwise escape the `except Exception` block (cubic, copilot).
- background_task.stop_sync: use self.task.get_loop() instead of
  spinning up a fresh asyncio.run() loop, which would fail with
  "future attached to a different event loop" the moment we awaited
  the existing task (copilot).

@cubic-dev-ai cubic-dev-ai Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

1 issue found across 2 files (changes from recent commits).

Prompt for AI agents (unresolved issues)

Check if these issues are valid — if so, understand the root cause of each and fix them. If appropriate, use sub-agents to investigate and fix each issue separately.


<file name="marimo/_utils/background_task.py">

<violation number="1" location="marimo/_utils/background_task.py:108">
P2: `stop_sync()` returns early when the task loop is running, skipping the actual stop/cancel path and ignoring `timeout`.</violation>
</file>

Reply with feedback, questions, or to request a fix. Tag @cubic-dev-ai to re-run a review.

Comment thread marimo/_utils/background_task.py Outdated

Copilot AI left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 18 out of 18 changed files in this pull request and generated 2 comments.

Comment thread marimo/_utils/asyncio_utils.py Outdated
Comment thread tests/_utils/test_asyncio_utils.py Outdated
- cancel_and_wait: retrieve exception on already-done tasks so the
  loop doesn't log "Task exception was never retrieved" warnings; the
  exception remains queryable via task.exception() (copilot).
- stop_sync: honor `timeout` when the task lives on another thread's
  loop by dispatching the stop coroutine via run_coroutine_threadsafe
  and blocking on future.result(timeout). Falls back to cooperative
  shutdown when called from inside the task's own loop thread, where
  blocking would deadlock (cubic).
- Rename test_cancel_and_wait_propagates_real_error to reflect what
  it actually asserts (copilot).
- Add a regression test for stop_sync's cross-thread path.

Copilot AI left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Copilot reviewed 19 out of 19 changed files in this pull request and generated 2 comments.

Comment thread marimo/_utils/asyncio_utils.py
Comment thread marimo/_utils/asyncio_utils.py
- fire_and_forget: when falling back to asyncio.run() in sync contexts,
  call initialize_asyncio() first so the loop policy matches what the
  rest of marimo expects (Windows selector loop for add_reader). The
  initializer is moved to _utils/asyncio_utils.py (its natural home);
  _server/utils.py re-exports it for existing callers (copilot).
- Update supervised_task docstring + log message to match its actual
  behavior: every non-cancellation exception is logged, including for
  awaited tasks. Recommend plain create_task when the caller awaits
  and handles errors itself (copilot).
task = asyncio.create_task(background_connect_mcp_servers())
background_tasks.add(task) # Keep a reference to prevent GC
task.add_done_callback(background_tasks.discard) # Clean up when done
task = supervised_task(

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

supervised_task will log any exception from background_connect_mcp_servers() via its done-callback, and the caller below also has except Exception: LOGGER.error(...) — connect failures will be logged twice. Either drop the supervisor's logging here (on_exception=lambda _: None) or use plain asyncio.create_task since this caller handles its own errors.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

Comment thread marimo/_server/api/lifespans.py Outdated
Comment thread marimo/_server/api/lifespans.py Outdated
Comment thread marimo/_utils/asyncio_utils.py
Comment thread tests/_utils/test_asyncio_utils.py Outdated
- lifespans.mcp: collapse cleanup branching by calling cancel_and_wait
  unconditionally; split connect-result vs disconnect-time error handling;
  opt out of supervisor logging on the awaited mcp.connect task.
- asyncio_utils: surface the "don't supervise awaited tasks" rule in the
  supervised_task docstring with an explicit on_exception escape hatch.
- test_asyncio_utils: drop tautological `or task.done()` assertion.
@mscolnick mscolnick merged commit 3d9032a into main May 18, 2026
44 checks passed
@mscolnick mscolnick deleted the ms/async-fixes branch May 18, 2026 02:12
@github-actions

Copy link
Copy Markdown
Contributor

🚀 Development release published. You may be able to view the changes at https://marimo.app?v=0.23.7-dev33

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

enhancement New feature or request

3 participants