Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
210 changes: 163 additions & 47 deletions src/mcp/client/stdio/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import logging
import os
import subprocess
import sys
from contextlib import asynccontextmanager
from pathlib import Path
Expand Down Expand Up @@ -48,6 +49,47 @@
PROCESS_TERMINATION_TIMEOUT = 2.0


def _is_jupyter_notebook() -> bool:
"""
Detect if running in a Jupyter notebook or IPython environment.

Returns:
bool: True if running in Jupyter/IPython, False otherwise
"""
try:
from IPython import get_ipython # type: ignore[import-not-found]

ipython = get_ipython() # type: ignore[no-untyped-call]
return ipython is not None and ipython.__class__.__name__ in ("ZMQInteractiveShell", "TerminalInteractiveShell") # type: ignore[union-attr]
except ImportError:
return False


def _print_stderr(line: str, errlog: TextIO) -> None:
"""
Print stderr output, using IPython's display system if in Jupyter notebook.

Args:
line: The line to print
errlog: The fallback TextIO stream (used when not in Jupyter)
"""
if _is_jupyter_notebook():
try:
from IPython.display import HTML, display # type: ignore[import-not-found]

# Use IPython's display system with red color for stderr
# This ensures proper rendering in Jupyter notebooks
display(HTML(f'<pre style="color: red;">{line}</pre>')) # type: ignore[no-untyped-call]
except Exception:
# If IPython display fails, fall back to regular print
# Log the error but continue (non-critical)
logger.debug("Failed to use IPython display for stderr, falling back to print", exc_info=True)
print(line, file=errlog)
else:
# Not in Jupyter, use standard stderr redirection
print(line, file=errlog)


def get_default_environment() -> dict[str, str]:
"""
Returns a default environment object including only environment variables deemed
Expand Down Expand Up @@ -102,11 +144,121 @@ class StdioServerParameters(BaseModel):
"""


async def _stdout_reader(
process: Process | FallbackProcess,
read_stream_writer: MemoryObjectSendStream[SessionMessage | Exception],
encoding: str,
encoding_error_handler: str,
):
"""Read stdout from the process and parse JSONRPC messages."""
assert process.stdout, "Opened process is missing stdout"

try:
async with read_stream_writer:
buffer = ""
async for chunk in TextReceiveStream(
process.stdout,
encoding=encoding,
errors=encoding_error_handler,
):
lines = (buffer + chunk).split("\n")
buffer = lines.pop()

for line in lines:
try:
message = types.JSONRPCMessage.model_validate_json(line)
except Exception as exc: # pragma: no cover
logger.exception("Failed to parse JSONRPC message from server")
await read_stream_writer.send(exc)
continue

session_message = SessionMessage(message)
await read_stream_writer.send(session_message)
except anyio.ClosedResourceError: # pragma: no cover
await anyio.lowlevel.checkpoint()


async def _stdin_writer(
process: Process | FallbackProcess,
write_stream_reader: MemoryObjectReceiveStream[SessionMessage],
encoding: str,
encoding_error_handler: str,
):
"""Write session messages to the process stdin."""
assert process.stdin, "Opened process is missing stdin"

try:
async with write_stream_reader:
async for session_message in write_stream_reader:
json = session_message.message.model_dump_json(by_alias=True, exclude_none=True)
await process.stdin.send(
(json + "\n").encode(
encoding=encoding,
errors=encoding_error_handler,
)
)
except anyio.ClosedResourceError: # pragma: no cover
await anyio.lowlevel.checkpoint()


async def _stderr_reader(
process: Process | FallbackProcess,
errlog: TextIO,
encoding: str,
encoding_error_handler: str,
):
"""Read stderr from the process and display it appropriately."""
if not process.stderr:
return

try:
buffer = ""
async for chunk in TextReceiveStream(
process.stderr,
encoding=encoding,
errors=encoding_error_handler,
):
lines = (buffer + chunk).split("\n")
buffer = lines.pop()

for line in lines:
if line.strip(): # Only print non-empty lines
try:
_print_stderr(line, errlog)
except Exception:
# Log errors but continue (non-critical)
logger.debug("Failed to print stderr line", exc_info=True)

# Print any remaining buffer content
if buffer.strip():
try:
_print_stderr(buffer, errlog)
except Exception:
logger.debug("Failed to print final stderr buffer", exc_info=True)
except anyio.ClosedResourceError: # pragma: no cover
await anyio.lowlevel.checkpoint()
except Exception:
# Log errors but continue (non-critical)
logger.debug("Error reading stderr", exc_info=True)


@asynccontextmanager
async def stdio_client(server: StdioServerParameters, errlog: TextIO = sys.stderr):
"""
Client transport for stdio: this will connect to a server by spawning a
process and communicating with it over stdin/stdout.

This function automatically handles stderr output in a way that is compatible
with Jupyter notebook environments. When running in Jupyter, stderr output
is displayed using IPython's display system with red color formatting.
When not in Jupyter, stderr is redirected to the provided errlog stream
(defaults to sys.stderr).

Args:
server: Parameters for the server process to spawn
errlog: TextIO stream for stderr output when not in Jupyter (defaults to sys.stderr).
This parameter is kept for backward compatibility but may be ignored
when running in Jupyter notebook environments.
"""
read_stream: MemoryObjectReceiveStream[SessionMessage | Exception]
read_stream_writer: MemoryObjectSendStream[SessionMessage | Exception]
Expand Down Expand Up @@ -136,55 +288,14 @@ async def stdio_client(server: StdioServerParameters, errlog: TextIO = sys.stder
await write_stream_reader.aclose()
raise

async def stdout_reader():
assert process.stdout, "Opened process is missing stdout"

try:
async with read_stream_writer:
buffer = ""
async for chunk in TextReceiveStream(
process.stdout,
encoding=server.encoding,
errors=server.encoding_error_handler,
):
lines = (buffer + chunk).split("\n")
buffer = lines.pop()

for line in lines:
try:
message = types.JSONRPCMessage.model_validate_json(line)
except Exception as exc: # pragma: no cover
logger.exception("Failed to parse JSONRPC message from server")
await read_stream_writer.send(exc)
continue

session_message = SessionMessage(message)
await read_stream_writer.send(session_message)
except anyio.ClosedResourceError: # pragma: no cover
await anyio.lowlevel.checkpoint()

async def stdin_writer():
assert process.stdin, "Opened process is missing stdin"

try:
async with write_stream_reader:
async for session_message in write_stream_reader:
json = session_message.message.model_dump_json(by_alias=True, exclude_none=True)
await process.stdin.send(
(json + "\n").encode(
encoding=server.encoding,
errors=server.encoding_error_handler,
)
)
except anyio.ClosedResourceError: # pragma: no cover
await anyio.lowlevel.checkpoint()

async with (
anyio.create_task_group() as tg,
process,
):
tg.start_soon(stdout_reader)
tg.start_soon(stdin_writer)
tg.start_soon(_stdout_reader, process, read_stream_writer, server.encoding, server.encoding_error_handler)
tg.start_soon(_stdin_writer, process, write_stream_reader, server.encoding, server.encoding_error_handler)
if process.stderr:
tg.start_soon(_stderr_reader, process, errlog, server.encoding, server.encoding_error_handler)
try:
yield read_stream, write_stream
finally:
Expand Down Expand Up @@ -244,14 +355,19 @@ async def _create_platform_compatible_process(

Unix: Creates process in a new session/process group for killpg support
Windows: Creates process in a Job Object for reliable child termination

Note: stderr is piped (not redirected) to allow async reading for Jupyter
notebook compatibility. The errlog parameter is kept for backward compatibility
but is only used when not in Jupyter environments.
"""
if sys.platform == "win32": # pragma: no cover
process = await create_windows_process(command, args, env, errlog, cwd)
process = await create_windows_process(command, args, env, errlog, cwd, pipe_stderr=True)
else:
# Pipe stderr instead of redirecting to allow async reading
process = await anyio.open_process(
[command, *args],
env=env,
stderr=errlog,
stderr=subprocess.PIPE,
cwd=cwd,
start_new_session=True,
) # pragma: no cover
Expand Down
41 changes: 31 additions & 10 deletions src/mcp/os/win32/utilities.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ class FallbackProcess:
A fallback process wrapper for Windows to handle async I/O
when using subprocess.Popen, which provides sync-only FileIO objects.

This wraps stdin and stdout into async-compatible
This wraps stdin, stdout, and stderr into async-compatible
streams (FileReadStream, FileWriteStream),
so that MCP clients expecting async streams can work properly.
"""
Expand All @@ -79,10 +79,12 @@ def __init__(self, popen_obj: subprocess.Popen[bytes]):
self.popen: subprocess.Popen[bytes] = popen_obj
self.stdin_raw = popen_obj.stdin # type: ignore[assignment]
self.stdout_raw = popen_obj.stdout # type: ignore[assignment]
self.stderr = popen_obj.stderr # type: ignore[assignment]
self.stderr_raw = popen_obj.stderr # type: ignore[assignment]

self.stdin = FileWriteStream(cast(BinaryIO, self.stdin_raw)) if self.stdin_raw else None
self.stdout = FileReadStream(cast(BinaryIO, self.stdout_raw)) if self.stdout_raw else None
# Wrap stderr in async stream if it's piped (for Jupyter compatibility)
self.stderr = FileReadStream(cast(BinaryIO, self.stderr_raw)) if self.stderr_raw else None

async def __aenter__(self):
"""Support async context manager entry."""
Expand All @@ -103,12 +105,14 @@ async def __aexit__(
await self.stdin.aclose()
if self.stdout:
await self.stdout.aclose()
if self.stderr:
await self.stderr.aclose()
if self.stdin_raw:
self.stdin_raw.close()
if self.stdout_raw:
self.stdout_raw.close()
if self.stderr:
self.stderr.close()
if self.stderr_raw:
self.stderr_raw.close()

async def wait(self):
"""Async wait for process completion."""
Expand Down Expand Up @@ -139,6 +143,7 @@ async def create_windows_process(
env: dict[str, str] | None = None,
errlog: TextIO | None = sys.stderr,
cwd: Path | str | None = None,
pipe_stderr: bool = False,
) -> Process | FallbackProcess:
"""
Creates a subprocess in a Windows-compatible way with Job Object support.
Expand All @@ -155,15 +160,20 @@ async def create_windows_process(
command (str): The executable to run
args (list[str]): List of command line arguments
env (dict[str, str] | None): Environment variables
errlog (TextIO | None): Where to send stderr output (defaults to sys.stderr)
errlog (TextIO | None): Where to send stderr output (defaults to sys.stderr).
Only used when pipe_stderr is False.
cwd (Path | str | None): Working directory for the subprocess
pipe_stderr (bool): If True, pipe stderr instead of redirecting to errlog.
This allows async reading of stderr for Jupyter compatibility.

Returns:
Process | FallbackProcess: Async-compatible subprocess with stdin and stdout streams
"""
job = _create_job_object()
process = None

stderr_target = subprocess.PIPE if pipe_stderr else errlog

try:
# First try using anyio with Windows-specific flags to hide console window
process = await anyio.open_process(
Expand All @@ -173,18 +183,18 @@ async def create_windows_process(
creationflags=subprocess.CREATE_NO_WINDOW # type: ignore
if hasattr(subprocess, "CREATE_NO_WINDOW")
else 0,
stderr=errlog,
stderr=stderr_target,
cwd=cwd,
)
except NotImplementedError:
# If Windows doesn't support async subprocess creation, use fallback
process = await _create_windows_fallback_process(command, args, env, errlog, cwd)
process = await _create_windows_fallback_process(command, args, env, errlog, cwd, pipe_stderr=pipe_stderr)
except Exception:
# Try again without creation flags
process = await anyio.open_process(
[command, *args],
env=env,
stderr=errlog,
stderr=stderr_target,
cwd=cwd,
)

Expand All @@ -198,19 +208,30 @@ async def _create_windows_fallback_process(
env: dict[str, str] | None = None,
errlog: TextIO | None = sys.stderr,
cwd: Path | str | None = None,
pipe_stderr: bool = False,
) -> FallbackProcess:
"""
Create a subprocess using subprocess.Popen as a fallback when anyio fails.

This function wraps the sync subprocess.Popen in an async-compatible interface.

Args:
command: The executable to run
args: List of command line arguments
env: Environment variables
errlog: Where to send stderr output (only used when pipe_stderr is False)
cwd: Working directory for the subprocess
pipe_stderr: If True, pipe stderr instead of redirecting to errlog
"""
stderr_target = subprocess.PIPE if pipe_stderr else errlog

try:
# Try launching with creationflags to avoid opening a new console window
popen_obj = subprocess.Popen(
[command, *args],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=errlog,
stderr=stderr_target,
env=env,
cwd=cwd,
bufsize=0, # Unbuffered output
Expand All @@ -222,7 +243,7 @@ async def _create_windows_fallback_process(
[command, *args],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=errlog,
stderr=stderr_target,
env=env,
cwd=cwd,
bufsize=0,
Expand Down
Loading
Loading