fix: thread-safe redirection of console outputs in run mode#8353
fix: thread-safe redirection of console outputs in run mode#8353
Conversation
In run mode, multiple session threads share the same process, and our manipulations of file descriptors were not designed to be thread-safe. This was breaking the marimo process. This change adds a proxy in run mode that is installed once as sys.stdout/sys.stderr that dispatches writes to per-thread streams. During cell execution, redirect_streams() auto-detects the proxy (via isinstance check) and registers/clears thread-local streams instead of replacing sys.stdout globally or using fd-level redirection. NB: With this change, in run mode, output written directly to the standard input and error file descriptors will no longer be redirected to the browser. Key changes: - Add _ThreadLocalStreamProxy and helpers in streams.py - Make ThreadSafeStdout/Stderr Watcher optional (skip in run mode) - Make capture.py proxy-aware so capture_stdout/stderr work per-thread - Auto-detect proxy in redirect_streams instead of threading a use_fd_redirect flag through Kernel - Install proxies in KernelManagerImpl for run mode sessions
|
The latest updates on your projects. Learn more about Vercel for GitHub.
|
|
@wlach I think this fixes the bug you reported (thanks so much for finding the root cause). |
for more information, see https://pre-commit.ci
There was a problem hiding this comment.
Pull request overview
This PR makes console output redirection in run mode thread-safe by replacing process-global fd/stdout swapping with a single installed sys.stdout/sys.stderr proxy that dispatches writes to per-thread streams. This addresses deadlocks and cross-session output leakage when multiple run-mode sessions share one process (issue #8306).
Changes:
- Add
ThreadLocalStreamProxyand helpers to install proxies once and register per-thread streams during execution. - Update runtime stream redirection/capture to use thread-local stream registration in run mode instead of global
sys.stdoutreplacement /os.dup2fd redirection. - Make fd-level forwarding optional in
ThreadSafeStdout/ThreadSafeStderrand add tests for the new thread-local proxy behavior.
Reviewed changes
Copilot reviewed 7 out of 7 changed files in this pull request and generated 6 comments.
Show a summary per file
| File | Description |
|---|---|
marimo/_messaging/thread_local_streams.py |
Introduces thread-local stdout/stderr proxy + install/set/clear helpers for run mode. |
marimo/_session/managers/kernel.py |
Installs thread-local proxies once in run mode when console-to-browser redirection is enabled. |
marimo/_runtime/runtime.py |
Disables fd redirection in thread-based run mode; keeps it for subprocess-based modes; clears thread-local streams on exit. |
marimo/_runtime/redirect_streams.py |
Uses thread-local registration when proxies are installed; keeps existing behavior for edit/subprocess mode. |
marimo/_runtime/capture.py |
Updates capture/redirect helpers to be aware of thread-local proxy mode. |
marimo/_messaging/streams.py |
Adds a no-op watcher so stdout/stderr wrappers can be used without fd redirection. |
tests/_messaging/test_thread_local_proxy.py |
Adds unit/integration-style tests for thread-local proxy behavior. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| class ThreadLocalStreamProxy(io.TextIOBase): | ||
| """A proxy that dispatches writes to a per-thread stream. | ||
|
|
||
| When a thread has registered a stream via set_thread_local_streams(), | ||
| writes go there; otherwise they fall through to the original stream | ||
| (real sys.stdout / sys.stderr). | ||
| """ | ||
|
|
||
| def __init__( | ||
| self, original: Union[io.TextIOBase, TextIO], name: str | ||
| ) -> None: | ||
| self._original = original | ||
| self._local = threading.local() | ||
| self._name = name | ||
|
|
||
| # -- per-thread registration ------------------------------------------- | ||
|
|
||
| def _set_stream(self, stream: io.TextIOBase) -> None: | ||
| self._local.stream = stream | ||
|
|
||
| def _clear_stream(self) -> None: | ||
| self._local.stream = None | ||
|
|
||
| def _get_stream(self) -> Union[io.TextIOBase, TextIO]: | ||
| stream = getattr(self._local, "stream", None) | ||
| return stream if stream is not None else self._original | ||
|
|
||
| # -- TextIOBase interface ---------------------------------------------- | ||
|
|
||
| @property | ||
| def name(self) -> str: | ||
| return self._name | ||
|
|
||
| @property | ||
| def encoding(self) -> str: # type: ignore[override] | ||
| return getattr(self._get_stream(), "encoding", "utf-8") | ||
|
|
There was a problem hiding this comment.
ThreadLocalStreamProxy doesn’t expose a .buffer attribute. Some parts of the codebase write bytes to sys.stdout.buffer (e.g., package installation logging), which will raise AttributeError when proxies are installed in run mode. Consider adding a buffer property (likely delegating to the original stream’s buffer) and/or a safe attribute delegation mechanism so sys.stdout.buffer.write(...) continues to work without crashing.
| def install_thread_local_proxies() -> None: | ||
| """Install thread-local proxies as sys.stdout / sys.stderr (idempotent). | ||
|
|
||
| Called once from the main server thread before kernel threads are spawned. | ||
| """ | ||
| global _proxies_installed | ||
| with _install_lock: | ||
| if _proxies_installed: | ||
| return | ||
| sys.stdout = ThreadLocalStreamProxy(sys.stdout, "<stdout>") # type: ignore[assignment] | ||
| sys.stderr = ThreadLocalStreamProxy(sys.stderr, "<stderr>") # type: ignore[assignment] | ||
| _proxies_installed = True | ||
|
|
||
|
|
||
| def uninstall_thread_local_proxies() -> None: | ||
| """Remove thread-local proxies, restoring the original streams.""" | ||
| global _proxies_installed | ||
| with _install_lock: | ||
| if not _proxies_installed: | ||
| return | ||
| stdout = sys.stdout | ||
| stderr = sys.stderr | ||
| if isinstance(stdout, ThreadLocalStreamProxy): | ||
| sys.stdout = stdout._original # type: ignore[assignment] | ||
| if isinstance(stderr, ThreadLocalStreamProxy): | ||
| sys.stderr = stderr._original # type: ignore[assignment] | ||
| _proxies_installed = False |
There was a problem hiding this comment.
uninstall_thread_local_proxies() only restores originals if sys.stdout/sys.stderr are still instances of ThreadLocalStreamProxy. If another piece of code replaces sys.stdout while proxies are installed, uninstall will silently fail to restore the real originals despite the docstring claiming it does. Consider storing the original stdout/stderr in module-level variables at install time and restoring from those during uninstall.
marimo/_runtime/capture.py
Outdated
| proxy = sys.stdout | ||
| if _is_proxy(proxy): | ||
| stream = proxy._get_stream() # type: ignore[union-attr] | ||
| old_write = stream.write | ||
| stream.write = _redirect # type: ignore[assignment] | ||
| try: | ||
| yield | ||
| finally: | ||
| stream.write = old_write # type: ignore[assignment] |
There was a problem hiding this comment.
In the ThreadLocalStreamProxy branch, this patches stream.write on whatever _get_stream() returns. If no thread-local stream is registered, _get_stream() falls back to the original process-wide stdout, so this context manager would globally monkeypatch the real stdout for all threads (and may fail if the underlying stream doesn’t allow attribute assignment). Consider ensuring a per-thread stream is set before patching, or patching via a thread-local wrapper instead of mutating the underlying stream object.
marimo/_runtime/capture.py
Outdated
| proxy = sys.stderr | ||
| if _is_proxy(proxy): | ||
| stream = proxy._get_stream() # type: ignore[union-attr] | ||
| old_write = stream.write | ||
| stream.write = _redirect # type: ignore[assignment] | ||
| try: | ||
| yield | ||
| finally: | ||
| stream.write = old_write # type: ignore[assignment] |
There was a problem hiding this comment.
Same issue as redirect_stdout(): in proxy mode, _get_stream() can return the original process-wide stderr if no thread-local stream is registered, so this would patch the global stderr across threads (and may not be assignable on some stream implementations). Consider guarding against the fallback-to-original case or using a thread-local wrapper instead of mutating the underlying stream object.
marimo/_runtime/redirect_streams.py
Outdated
| stream.cell_id = cell_id_old | ||
| else: | ||
| # In edit mode, we have one process per notebook, so we can safely | ||
| # replace sys.stdout/sys.stder and redirect OS streams |
There was a problem hiding this comment.
Typo in comment: sys.stder should be sys.stderr.
| # replace sys.stdout/sys.stder and redirect OS streams | |
| # replace sys.stdout/sys.stderr and redirect OS streams |
| proxy = sys.stdout | ||
| if _is_proxy(proxy): | ||
| # Temporarily swap the thread-local stream to a StringIO so that | ||
| # writes from this thread are captured while other threads are | ||
| # unaffected. | ||
| buffer = io.StringIO() | ||
| old = proxy._get_stream() # type: ignore[union-attr] | ||
| proxy._set_stream(buffer) # type: ignore[union-attr] | ||
| try: | ||
| yield buffer | ||
| finally: | ||
| proxy._set_stream(old) # type: ignore[union-attr] | ||
| else: | ||
| with contextlib.redirect_stdout(io.StringIO()) as buffer: | ||
| yield buffer |
There was a problem hiding this comment.
The new ThreadLocalStreamProxy-specific branches in capture_stdout/capture_stderr and redirect_stdout/redirect_stderr don’t appear to be exercised by existing tests (current capture tests run with a mocked kernel in EDIT mode). Adding a test that installs the thread-local proxies and sets per-thread streams would help prevent regressions in run-mode behavior.
- Add .buffer property to ThreadLocalStreamProxy so sys.stdout.buffer continues to work (e.g. package installation logging) - Store original stdout/stderr at module level during install so uninstall reliably restores them even if sys.stdout was replaced - Fix redirect_stdout/redirect_stderr in proxy mode to use thread-local stream swap instead of monkeypatching the underlying stream's write method, which could mutate the global original stream - Fix typo sys.stder -> sys.stderr in comment - Add tests for proxy-specific branches in capture/redirect helpers
@akshayka neat! did you try it with my reproduction repo? I tried making a fix using similar premises as yours but it didn't quite work as I expected (it seems like I couldn't make new connections while an existing cell was running in another thread). Anyway inside here there's a good torture test which reproduces the original bug: https://github.com/wlach/marimo-server-deadlock-reproduction |
@wlach sorry I missed this earlier. I tried just now and can confirm that: (1) Without the fix, after enough refreshes the server eventually refuses to start new sessions (kernel not found) Are you okay with the following tradeoff that this change introduces? In run mode
|
Yup! I don't think there's any other reasonable way of doing it that avoids deadlocks. I think actually I would prefer to turn this feature off altogether for our purposes: the main thing we want is to be able to get tracebacks in running code as described in #7984 I actually have a prototype for that in https://github.com/wlach/marimo/tree/marimo-error-run-mode-modal -- if you're aligned with the solution I propose I can file a PR. |
@wlach I tried it out and I'm generally aligned, especially if this makes your experience better. Please open a PR, we will be happy to review it. Thanks! |
…eam#8353) In run mode, multiple session threads share the same process, and our manipulations of file descriptors were not designed to be thread-safe. When `--redirect-console-to-browser` was supplied to `marimo run`, this was breaking the marimo process, noted in marimo-team#8306. This change adds a proxy in run mode that is installed once as `sys.stdout`/`sys.stderr` that dispatches writes to per-thread streams. During cell execution, we register thread-local streams instead of replacing `sys.stdout` globally or using file descriptor-level redirection. This change also fixes a bug/ in which each client would receive the console outputs belonging to all other clients. NB: With this change, in run mode, output written directly to the standard output and standard error file descriptors (for example, by a C-extension) will no longer be redirected to the browser. This tradeoff should be acceptable, since at least we won'thang the marimo process. --------- Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
In run mode, multiple session threads share the same process,
and our manipulations of file descriptors were not designed to be
thread-safe. When
--redirect-console-to-browserwas supplied tomarimo run, this was breakingthe marimo process, noted in #8306.
This change adds a proxy in run mode that is installed once as
sys.stdout/sys.stderrthat dispatches writes to per-thread streams.During cell execution, we register thread-local streams instead of
replacing
sys.stdoutglobally or using file descriptor-level redirection.This change also fixes a bug/ in which each client would receive the
console outputs belonging to all other clients.
NB: With this change, in run mode, output written directly to the
standard output and standard error file descriptors (for example, by a C-extension)
will no longer be redirected to the browser. This tradeoff should be acceptable,
since at least we won'thang the marimo process.