From 82ea7040912c0db5cb153013290a756ae82ea0f9 Mon Sep 17 00:00:00 2001 From: Joongi Kim Date: Wed, 21 Jan 2026 20:25:31 +0900 Subject: [PATCH 1/3] feat(zeromq): Allow external zmq.asyncio.Context injection via transport_opts Add support for injecting an external zmq.asyncio.Context through transport_opts["zctx"]. This enables applications to share a single context across multiple Peer instances throughout the application lifecycle, reducing resource overhead. When an external context is provided, it will not be destroyed when the transport is closed - the application is responsible for managing its lifecycle. --- src/callosum/lower/zeromq.py | 8 ++- tests/test_rpc.py | 94 ++++++++++++++++++++++++++++++++++++ 2 files changed, 100 insertions(+), 2 deletions(-) diff --git a/src/callosum/lower/zeromq.py b/src/callosum/lower/zeromq.py index 51a6662..19bfc8f 100644 --- a/src/callosum/lower/zeromq.py +++ b/src/callosum/lower/zeromq.py @@ -483,6 +483,7 @@ class ZeroMQBaseTransport(BaseTransport): __slots__ = BaseTransport.__slots__ + ( "_zctx", + "_external_zctx", "_zsock_opts", "_zap_server", "_zap_task", @@ -502,7 +503,9 @@ def __init__( super().__init__(authenticator, **kwargs) self._zap_server = None self._zap_task = None - self._zctx = zmq.asyncio.Context() + # Support external context injection via transport_opts["zctx"] + self._external_zctx = self.transport_opts.get("zctx") is not None + self._zctx = self.transport_opts.get("zctx") or zmq.asyncio.Context() match self.authenticator: case AbstractServerAuthenticator() as auth: self._zap_server = ZAPServer(self._zctx, auth) @@ -525,7 +528,8 @@ async def close(self) -> None: self._zap_task.cancel() with contextlib.suppress(asyncio.CancelledError): await self._zap_task - if self._zctx is not None: + # Do not destroy externally injected context + if self._zctx is not None and not self._external_zctx: self._zctx.destroy(linger=50) diff --git a/tests/test_rpc.py b/tests/test_rpc.py index 7e60012..06b6f0d 100644 --- a/tests/test_rpc.py +++ b/tests/test_rpc.py @@ -295,3 +295,97 @@ async def _do_request(idx: int) -> int: assert e.args[0] == "ZeroDivisionError" else: assert call_results[idx] == idx + + +@pytest.mark.asyncio +async def test_external_context_injection() -> None: + """Test that an externally injected zmq context is used and not destroyed on close.""" + import zmq.asyncio + + # Create an external context + external_zctx = zmq.asyncio.Context() + + async def func(request: RPCMessage) -> str: + return "ok" + + # Create server with external context + server = Peer( + bind=ZeroMQAddress("tcp://127.0.0.1:5021"), + transport=ZeroMQRPCTransport, + transport_opts={"zctx": external_zctx}, + scheduler=ExitOrderedAsyncScheduler(), + serializer=lambda o: json.dumps(o).encode("utf8"), + deserializer=lambda b: json.loads(b), + ) + server.handle_function("func", func) + + # Create client with external context + client = Peer( + connect=ZeroMQAddress("tcp://localhost:5021"), + transport=ZeroMQRPCTransport, + transport_opts={"zctx": external_zctx}, + serializer=lambda o: json.dumps(o).encode("utf8"), + deserializer=lambda b: json.loads(b), + ) + + # Verify the external context is used + server_transport = cast(ZeroMQRPCTransport, server._transport) + client_transport = cast(ZeroMQRPCTransport, client._transport) + assert server_transport._zctx is external_zctx + assert server_transport._external_zctx is True + assert client_transport._zctx is external_zctx + assert client_transport._external_zctx is True + + async with server: + async with client: + result = await client.invoke("func", {}) + assert result == "ok" + + # Verify context is NOT destroyed after transport close + assert not external_zctx.closed + + # Clean up + external_zctx.destroy(linger=0) + + +@pytest.mark.asyncio +async def test_internal_context_destroyed_on_close() -> None: + """Test that internally created zmq context is destroyed on close.""" + + async def func(request: RPCMessage) -> str: + return "ok" + + # Create server without external context (uses internal) + server = Peer( + bind=ZeroMQAddress("tcp://127.0.0.1:5022"), + transport=ZeroMQRPCTransport, + scheduler=ExitOrderedAsyncScheduler(), + serializer=lambda o: json.dumps(o).encode("utf8"), + deserializer=lambda b: json.loads(b), + ) + server.handle_function("func", func) + + # Create client without external context (uses internal) + client = Peer( + connect=ZeroMQAddress("tcp://localhost:5022"), + transport=ZeroMQRPCTransport, + serializer=lambda o: json.dumps(o).encode("utf8"), + deserializer=lambda b: json.loads(b), + ) + + # Verify internal context is created + server_transport = cast(ZeroMQRPCTransport, server._transport) + client_transport = cast(ZeroMQRPCTransport, client._transport) + assert server_transport._external_zctx is False + assert client_transport._external_zctx is False + server_zctx = server_transport._zctx + client_zctx = client_transport._zctx + + async with server: + async with client: + result = await client.invoke("func", {}) + assert result == "ok" + + # Verify context IS destroyed after transport close + assert server_zctx.closed + assert client_zctx.closed From 9ef1366c8d015b82c4e55d2551c52dc0873730d0 Mon Sep 17 00:00:00 2001 From: Joongi Kim Date: Wed, 21 Jan 2026 20:27:44 +0900 Subject: [PATCH 2/3] doc: Add CLAUDE.md for development environment guide --- CLAUDE.md | 72 +++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 72 insertions(+) create mode 100644 CLAUDE.md diff --git a/CLAUDE.md b/CLAUDE.md new file mode 100644 index 0000000..182e05f --- /dev/null +++ b/CLAUDE.md @@ -0,0 +1,72 @@ +# Claude Code Guide for Callosum + +## Project Overview + +Callosum is an asyncio-based RPC library for Python that supports multiple transport backends (ZeroMQ, Redis, Thrift). + +## Development Environment + +This project uses **uv** as the package manager. + +### Setup + +```bash +uv sync --extra zeromq --extra redis --extra thrift +``` + +### Running Tests + +```bash +# Run all tests +uv run pytest + +# Run specific test file +uv run pytest tests/test_rpc.py -v + +# Run specific test +uv run pytest tests/test_rpc.py::test_external_context_injection -v +``` + +### Type Checking + +```bash +uv run mypy src/callosum +``` + +### Linting + +```bash +uv run ruff check src/ +uv run ruff format src/ +``` + +## Project Structure + +``` +src/callosum/ +├── lower/ # Transport layer (zeromq, redis) +│ ├── __init__.py # BaseTransport, AbstractBinder, AbstractConnector +│ ├── zeromq.py # ZeroMQ transport implementation +│ └── redis.py # Redis transport implementation +├── rpc/ # RPC layer +│ ├── channel.py # Peer class (main RPC interface) +│ ├── message.py # RPC message types +│ └── exceptions.py +├── auth.py # Authentication (CURVE for ZeroMQ) +├── ordering.py # Async schedulers +└── serial.py # Serialization utilities +``` + +## Key Classes + +- `Peer` (rpc/channel.py): Main RPC interface for both client and server +- `ZeroMQBaseTransport` (lower/zeromq.py): ZeroMQ transport with context management +- `ZeroMQRPCTransport`: RPC-specific transport using ROUTER/DEALER sockets + +## Optional Dependencies + +Defined in `pyproject.toml` under `[project.optional-dependencies]`: +- `zeromq`: pyzmq for ZeroMQ transport +- `redis`: redis-py for Redis transport +- `thrift`: thriftpy2 for Thrift serialization +- `snappy`: python-snappy for compression From ea5ba682e6b749128de30e82dd70507126668b1e Mon Sep 17 00:00:00 2001 From: Joongi Kim Date: Wed, 21 Jan 2026 20:28:35 +0900 Subject: [PATCH 3/3] doc: Add changelog fragment for PR #43 --- changes/43.feature.md | 1 + 1 file changed, 1 insertion(+) create mode 100644 changes/43.feature.md diff --git a/changes/43.feature.md b/changes/43.feature.md new file mode 100644 index 0000000..f732411 --- /dev/null +++ b/changes/43.feature.md @@ -0,0 +1 @@ +lower.zeromq: Add support for external `zmq.asyncio.Context` injection via `transport_opts["zctx"]` to allow sharing a single context across multiple Peer instances