diff --git a/examples/README.md b/examples/README.md new file mode 100644 index 0000000..c0ed7b9 --- /dev/null +++ b/examples/README.md @@ -0,0 +1,259 @@ +# DurableMCP Examples + +Examples demonstrating idempotency patterns and durable storage +primitives for Model Context Protocol servers. + +## Concepts + +### Idempotency Guards + +**`at_least_once(alias, context, callable, type)`** + +Operation completes at least once. Caches result on success. Retries +on all failures. + +```python +user_id = await at_least_once( + "create_user", + context, + create_user, + type=str, +) +``` + +**`at_most_once(alias, context, callable, type, retryable_exceptions)`** + +Operation executes at most once. Only retries on specified exceptions. +Raises `AtMostOnceFailedBeforeCompleting` on subsequent calls after +non-retryable failure. + +```python +result = await at_most_once( + "payment", + context, + make_payment, + type=dict, + retryable_exceptions=[NetworkError], +) +``` + +### Storage Primitives + +**SortedMap** + +Larger-than-memory key-value store with lexicographic ordering. +Supports batch operations and range queries. + +```python +map = SortedMap.ref("name") +await map.insert(context, entries={"key": b"value"}) +response = await map.get(context, key="key") +response = await map.range(context, start_key="a", limit=100) +response = await map.reverse_range(context, limit=100) +await map.remove(context, keys=["key"]) +``` + +When calling methods on the same named SortedMap multiple times within +the same context, use `.idempotently()` with unique aliases: + +```python +map = SortedMap.ref("results") +await map.idempotently("store_step1").insert(context, entries={...}) +await map.idempotently("store_step2").insert(context, entries={...}) +``` + +Different named maps don't require idempotency guards. + +**UUIDv7** + +Time-ordered UUID with embedded timestamp. Sorts chronologically in +SortedMap. + +```python +from uuid7 import create as uuid7 + +key = str(uuid7()) # Embeds current timestamp. +await map.insert(context, entries={key: data}) +response = await map.reverse_range(context, limit=10) # Most recent. +``` + +### Tool Lifecycle + +Each `@mcp.tool()` invocation has its own idempotency manager. Guards +only deduplicate within a single tool call, not across multiple calls. + +## Examples + +### audit + +Audit logging with `@audit()` decorator. Stores tool invocations in +SortedMap with UUIDv7 keys for chronological access. + +**Demonstrates**: Decorator pattern, time-range queries, `reverse_range` +for recent entries. + +### steps + +Multi-step operations where each step is independently idempotent. If +tool is retried after step 1 succeeds but before step 2 completes, +step 1 returns cached result. + +**Demonstrates**: Multiple `at_least_once` guards with separate aliases, +sequential dependencies. + +### processing + +Payment processing with `at_most_once` to prevent duplicate charges. +Distinguishes retryable (network errors) from non-retryable (payment +rejected) failures. + +**Demonstrates**: `retryable_exceptions` parameter, +`AtMostOnceFailedBeforeCompleting` exception, error classification. + +### document + +Document processing pipeline combining `at_least_once` (idempotent +reads/writes) and `at_most_once` (external API calls) in a single +workflow. + +**Demonstrates**: Mixed patterns, OCR and translation APIs, multi-step +error handling. + +### define + +Technical glossary demonstrating all SortedMap CRUD operations. 
+Maintains dual indexes: alphabetical (by term) and chronological +(by UUIDv7). + +**Demonstrates**: `insert`, `get`, `range`, `reverse_range`, `remove`, +prefix search, dual indexing. + +## Running Examples + +### Interactive Harness (Recommended) + +The interactive harness runs examples end-to-end with client +demonstrations: + +```bash +cd examples +python run.py +``` + +**What it does:** + +1. Shows menu of available examples +2. Starts selected server on port 9991 +3. Waits for server to be ready +4. Runs corresponding client script +5. Shows full client output with examples +6. Cleans up server process on exit + +**Exit:** Press `q` at the menu or `Ctrl-C` to exit. + +### Client Pattern + +All example clients follow this pattern: + +```python +from reboot.mcp.client import connect + +URL = "http://localhost:9991" + +async def main(): + async with connect(URL + "/mcp") as ( + session, + session_id, + protocol_version, + ): + # List tools. + tools = await session.list_tools() + + # Call tools. + result = await session.call_tool("tool_name", {"arg": "value"}) +``` + +### Running Servers Directly + +To run servers standalone without the harness: + +```bash +cd examples/ +uv run python example.py +``` + +Each example is a standalone MCP server exposing tools via the Model +Context Protocol on `http://localhost:9991/mcp`. + +## Patterns + +### Idempotent Multi-Step Operations + +```python +# Step 1: Cached on success. +step1_result = await at_least_once( + "step1", + context, + do_step1, + type=dict, +) + +# Step 2: Uses result from step 1. +step2_result = await at_least_once( + "step2", + context, + do_step2, + type=dict, +) +``` + +### External API with Retry Policy + +```python +try: + result = await at_most_once( + "api_call", + context, + call_api, + type=dict, + retryable_exceptions=[NetworkError], + ) +except NetworkError: + # Retries exhausted. + return {"error": "service unavailable"} +except AtMostOnceFailedBeforeCompleting: + # Previous attempt failed with non-retryable error. + return {"error": "operation failed previously"} +``` + +### Recent Items with UUIDv7 + +```python +# Store with time-ordered keys. +key = str(uuid7()) +await map.insert(context, entries={key: data}) + +# Query most recent. +response = await map.reverse_range(context, limit=20) +``` + +### Prefix Search + +```python +# Find all keys starting with "api". +start_key = "api" +end_key = "apj" # Increment last character. +response = await map.range( + context, + start_key=start_key, + end_key=end_key, + limit=100, +) +``` + +## Notes + +- Idempotency guards are per-tool-invocation, not per-server. +- SortedMap operations are not atomic across multiple maps. +- UUIDv7 provides millisecond precision for time ordering. +- All storage is persistent and survives server restarts. diff --git a/examples/audit/README.md b/examples/audit/README.md new file mode 100644 index 0000000..f0b1d0e --- /dev/null +++ b/examples/audit/README.md @@ -0,0 +1,225 @@ +# Audit Logging + +Durable audit trails for MCP tool invocations using SortedMap and +UUIDv7. + +## Overview + +Store audit entries with time-ordered UUIDv7 keys for chronological +access. Provides both decorator and explicit logging patterns. 
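+
+The write path is small enough to sketch in full. A minimal sketch,
+assuming a `context` provided by the enclosing tool, as in the
+examples below:
+
+```python
+import json
+import time
+
+from reboot.std.collections.v1.sorted_map import SortedMap
+from uuid7 import create as uuid7
+
+audit_map = SortedMap.ref("audit:user_operations")
+
+# UUIDv7 places a 48-bit millisecond timestamp in the leading bits, so
+# lexicographic key order in the map matches insertion order.
+key = str(uuid7())
+entry = {"timestamp": int(time.time() * 1000), "tool": "create_user"}
+await audit_map.insert(
+    context,
+    entries={key: json.dumps(entry).encode("utf-8")},
+)
+```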
+ +## Features + +- Decorator pattern for automatic tool auditing +- Explicit logging with custom data +- Time-range queries via UUIDv7 boundaries +- Non-blocking (audit failures don't break tools) + +## Usage + +### Decorator Pattern + +Automatically log tool invocations: + +```python +@mcp.tool() +@audit("user_operations") +async def create_user( + name: str, + email: str, + context: DurableContext = None, +) -> dict: + """Create a new user.""" + user_id = f"user_{hash(name) % 10000}" + return {"status": "success", "user_id": user_id} +``` + +Logged data: + +```json +{ + "timestamp": 1699123456789, + "tool": "create_user", + "inputs": {"name": "Alice", "email": "alice@example.com"}, + "outputs": {"status": "success", "user_id": "1234"}, + "success": true, + "duration_seconds": 0.123 +} +``` + +### Explicit Logging + +Add custom audit entries: + +```python +@mcp.tool() +async def delete_user( + user_id: str, + reason: str = None, + context: DurableContext = None, +) -> dict: + """Delete a user.""" + # Perform deletion. + # ... + + # Log with custom fields. + await audit("user_operations", context, { + "action": "delete_user", + "user_id": user_id, + "reason": reason or "no reason provided", + "severity": "high", + }) + + return {"status": "success"} +``` + +### Querying Audit Logs + +```python +@mcp.tool() +async def get_audit_log( + log_name: str, + begin: int = None, + end: int = None, + limit: int = 100, + context: DurableContext = None, +) -> dict: + """Query audit logs by time range.""" + audit_map = SortedMap.ref(f"audit:{log_name}") + + if begin and end: + # Range query with UUIDv7 boundaries. + response = await audit_map.range( + context, + start_key=str(timestamp_to_uuidv7(begin)), + end_key=str(timestamp_to_uuidv7(end)), + limit=limit, + ) + else: + # Get most recent entries. + response = await audit_map.reverse_range(context, limit=limit) + + # Parse and return entries. + # ... +``` + +## How It Works + +### UUIDv7 Keys + +UUIDv7 embeds timestamp in first 48 bits, providing natural +chronological sorting: + +```python +from uuid7 import create as uuid7 + +key = str(uuid7()) # "018b8c5a-3f7e-7abc-9012-3456789abcdef" +``` + +Later keys sort after earlier keys lexicographically. + +### Storage Structure + +Audit entries stored in SortedMap `audit:{log_name}`: + +``` +audit:user_operations +├─ 018b8c5a-3f7e-7abc-9012-... → {"tool": "create_user", ...} +├─ 018b8c5b-1234-7abc-9012-... → {"tool": "delete_user", ...} +└─ 018b8c5c-5678-7abc-9012-... → {"tool": "update_user", ...} +``` + +### Timestamp to UUIDv7 Conversion + +Convert timestamps to UUIDv7 for range boundaries: + +```python +begin_uuid = timestamp_to_uuidv7(1699000000000) +end_uuid = timestamp_to_uuidv7(1699100000000) + +response = await audit_map.range( + context, + start_key=str(begin_uuid), + end_key=str(end_uuid), + limit=100, +) +``` + +## Examples + +```python +# Get last 50 entries. +get_audit_log("user_operations", limit=50) + +# Get entries from last hour. +import time +one_hour_ago = int((time.time() - 3600) * 1000) +get_audit_log("user_operations", begin=one_hour_ago) + +# Get entries in specific time range. +get_audit_log( + "user_operations", + begin=1699000000000, + end=1699100000000, +) +``` + +## API Reference + +### `audit(log_name, context=None, data=None)` + +Dual-purpose function for audit logging. + +**As decorator:** + +```python +@audit("log_name") +async def my_tool(arg: str, context: DurableContext = None): + ... 
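+    # Inputs, outputs, success, and duration are recorded
+    # automatically; `context` is excluded from the logged inputs.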
+``` + +**As function:** + +```python +await audit("log_name", context, { + "action": "example", + "status": "success", +}) +``` + +### `timestamp_to_uuidv7(timestamp_ms)` + +Convert Unix timestamp (milliseconds) to UUIDv7 for range queries. + +## Best Practices + +Choose meaningful log names: + +```python +await audit("user_operations", ...) +await audit("security_events", ...) +await audit("api_calls", ...) +``` + +Use decorator for standard logging, explicit for custom context: + +```python +@mcp.tool() +@audit("user_operations") +async def promote_user(user_id: str, context: DurableContext = None): + # Decorator logs invocation. + + # Also log security event. + await audit("security_events", context, { + "action": "privilege_escalation", + "user_id": user_id, + "severity": "critical", + }) +``` + +## Running + +```bash +cd examples/audit +uv run python example.py +``` diff --git a/examples/audit/audit.py b/examples/audit/audit.py new file mode 100644 index 0000000..78e6033 --- /dev/null +++ b/examples/audit/audit.py @@ -0,0 +1,185 @@ +""" +Audit logging to SortedMap for DurableMCP tools. + +Provides both decorator and explicit logging for storing audit data in a +durable, chronologically-ordered audit trail using UUIDv7. +""" + +import functools +import time +from typing import Any, Callable, Dict, Optional, Union +from uuid import UUID + +from pydantic import BaseModel, Field + +from reboot.mcp.server import DurableContext +from reboot.std.collections.v1.sorted_map import SortedMap +from uuid7 import create as uuid7 # type: ignore[import-untyped] + + +# Pydantic model for audit log entries. +class AuditEntry(BaseModel): + """Audit log entry with structured data.""" + + timestamp: int + tool: Optional[str] = None + inputs: Optional[Dict[str, Any]] = None + outputs: Optional[Any] = None + success: Optional[bool] = None + duration_seconds: Optional[float] = None + error: Optional[str] = None + + # Allow extra fields for custom audit data. + model_config = {"extra": "allow"} + + +def timestamp_to_uuidv7(timestamp_ms: int) -> UUID: + """ + Create a UUIDv7 from a Unix timestamp in milliseconds. + + This is useful for creating range boundaries when querying audit logs. + The UUIDv7 will have the timestamp embedded and minimal random bits. + + Args: + timestamp_ms: Unix timestamp in milliseconds. + + Returns: + UUIDv7 with the given timestamp. + """ + # UUIDv7 format (128 bits): + # - 48 bits: Unix timestamp in milliseconds + # - 4 bits: version (0111 = 7) + # - 12 bits: random + # - 2 bits: variant (10) + # - 62 bits: random + + # Create minimal `UUIDv7` with timestamp and zeros for random bits. + timestamp_48 = timestamp_ms & 0xFFFFFFFFFFFF # 48 bits. + + # Build the 128-bit `UUID`. + uuid_int = (timestamp_48 << 80) | (0x7 << 76) # Timestamp + version. + uuid_int |= (0x2 << 62) # Variant bits. + + return UUID(int=uuid_int) + + +async def _write_audit( + log_name: str, + context: DurableContext, + data: Dict[str, Any], +) -> None: + """ + Internal function to write audit entry to SortedMap. + + Args: + log_name: Name of the audit log. + context: The durable context. + data: Dictionary of audit data to store. + """ + timestamp = int(time.time() * 1000) + # Use `UUIDv7` for time-ordered, unique keys. + key = str(uuid7()) + + # Create Pydantic model with timestamp and provided data. 
+ audit_entry = AuditEntry(timestamp=timestamp, **data) + + try: + audit_map = SortedMap.ref(f"audit:{log_name}") + await audit_map.insert( + context, + entries={key: audit_entry.model_dump_json().encode("utf-8")}, + ) + except Exception: + # Don't fail the original operation if audit fails. + pass + + +def audit( + log_name: str, + context: Optional[DurableContext] = None, + data: Optional[Dict[str, Any]] = None, +) -> Union[Callable, None]: + """ + Audit logging - works as both decorator and explicit function. + + As a decorator: + @mcp.tool() + @audit("user_operations") + async def my_tool(name: str, context: DurableContext = None): + return {"status": "success"} + + As an explicit function: + await audit("user_operations", context, { + "action": "custom_event", + "user": "alice", + "result": "success", + }) + + Args: + log_name: Name of the audit log (creates SortedMap "audit:{log_name}"). + context: The durable context (required for explicit logging). + data: Freeform dictionary (required for explicit logging). + + Returns: + Decorator function if used as decorator, coroutine if used explicitly. + + Storage: + Audit entries are stored in SortedMap with UUIDv7 keys for + time-ordered, unique identification. Query with reverse_range() + for chronological order (newest first). + + Decorator behavior: + - Captures all function arguments (except context) + - Records function return value or error + - Measures execution duration + - Automatically adds: tool, inputs, outputs, success, duration_seconds + """ + # Explicit logging: `audit(log_name, context, data)`. + if context is not None and data is not None: + return _write_audit(log_name, context, data) + + # Decorator mode: `@audit(log_name)`. + def decorator(func: Callable) -> Callable: + @functools.wraps(func) + async def wrapper(*args, **kwargs) -> Any: + # Assume `context` is in `kwargs`. + ctx = kwargs.get("context") + + # Capture inputs (exclude `context`). + inputs = {k: v for k, v in kwargs.items() if k != "context"} + + # Call function and capture timing. + start_time = time.time() + success = False + result = None + error = None + + try: + result = await func(*args, **kwargs) + success = True + except Exception as e: + error = f"{type(e).__name__}: {str(e)}" + raise + finally: + # Log if we have `context`. + if ctx: + duration = time.time() - start_time + audit_data = { + "tool": func.__name__, + "inputs": inputs, + "success": success, + "duration_seconds": round(duration, 3), + } + + if success: + audit_data["outputs"] = result + else: + audit_data["error"] = error + + await _write_audit(log_name, ctx, audit_data) + + return result + + return wrapper + + return decorator diff --git a/examples/audit/client.py b/examples/audit/client.py new file mode 100644 index 0000000..27ed198 --- /dev/null +++ b/examples/audit/client.py @@ -0,0 +1,108 @@ +""" +Example client for audit logging demonstration. +""" + +import asyncio +import time +from reboot.mcp.client import connect + +URL = "http://localhost:9991" + + +async def main(): + """Run audit logging example client.""" + async with connect(URL + "/mcp") as ( + session, + session_id, + protocol_version, + ): + print("Connected to audit example server") + print(f"Session ID: {session_id}") + print() + + # List available tools. + tools = await session.list_tools() + print(f"Available tools: {len(tools.tools)}") + for tool in tools.tools: + print(f" - {tool.name}") + # Print description with proper indentation. 
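+            # Tool descriptions come from each tool's docstring and
+            # may span multiple lines.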
+ if tool.description: + for line in tool.description.split("\n"): + print(f" {line}") + print() + + # Example 1: Create users (decorator auditing). + print("=" * 60) + print("Example 1: Creating users (decorator auditing)") + print("=" * 60) + + users = [ + ("alice", "alice@example.com"), + ("bob", "bob@example.com"), + ("carol", "carol@example.com"), + ] + + for name, email in users: + result = await session.call_tool( + "create_user", + {"name": name, "email": email}, + ) + print(f"Created: {result.content[0].text}") + + print() + + # Example 2: Delete user (explicit auditing). + print("=" * 60) + print("Example 2: Deleting user (explicit auditing)") + print("=" * 60) + + result = await session.call_tool( + "delete_user", + { + "user_id": "user_1234", + "reason": "Account inactive for 2 years", + }, + ) + print(f"Deleted: {result.content[0].text}") + print() + + # Example 3: Query recent audit logs. + print("=" * 60) + print("Example 3: Query recent audit logs") + print("=" * 60) + + result = await session.call_tool( + "get_audit_log", + {"log_name": "user_operations", "limit": 10}, + ) + print(f"Audit log: {result.content[0].text}") + print() + + # Example 4: Update user (mixed pattern). + print("=" * 60) + print("Example 4: Update user role (mixed pattern)") + print("=" * 60) + + result = await session.call_tool( + "update_user", + { + "user_id": "user_5678", + "updates": {"role": "admin", "verified": True}, + }, + ) + print(f"Updated: {result.content[0].text}") + print() + + # Query security events log. + result = await session.call_tool( + "get_audit_log", + {"log_name": "security_events", "limit": 5}, + ) + print(f"Security events: {result.content[0].text}") + print() + + print("Audit example completed successfully!") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/examples/audit/example.py b/examples/audit/example.py new file mode 100644 index 0000000..96d7f10 --- /dev/null +++ b/examples/audit/example.py @@ -0,0 +1,212 @@ +""" +Example DurableMCP server with audit logging. + +Demonstrates both decorator and explicit audit logging patterns. +""" + +import asyncio +import sys +from pathlib import Path +from typing import Any, Dict, Optional + +# Add api/ to Python path for generated proto code. +api_path = Path(__file__).parent.parent.parent / "api" +if api_path.exists(): + sys.path.insert(0, str(api_path)) + +from audit import AuditEntry, audit, timestamp_to_uuidv7 +from reboot.mcp.server import DurableMCP, DurableContext +from reboot.std.collections.v1.sorted_map import SortedMap + +# Initialize MCP server. +mcp = DurableMCP(path="/mcp") + + +# Example 1: Using @audit decorator +@mcp.tool() +@audit("user_operations") +async def create_user( + name: str, + email: str, + context: DurableContext = None, +) -> Dict[str, Any]: + """ + Create a new user. + + This tool is decorated with @audit, so all invocations are automatically + logged with inputs, outputs, duration, and success/failure. + """ + # Simulate user creation. + user_id = f"user_{hash(name) % 10000}" + + return { + "status": "success", + "user_id": user_id, + "name": name, + "email": email, + } + + +# Example 2: Using explicit audit logging +@mcp.tool() +async def delete_user( + user_id: str, + reason: Optional[str] = None, + context: DurableContext = None, +) -> Dict[str, Any]: + """ + Delete a user. + + This tool uses explicit audit logging to record additional context + beyond what the decorator would capture. + """ + # Perform deletion logic here. + # ... + + # Explicit audit with custom fields. 
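+    # `delete_user` is not decorated with `@audit`, so this explicit
+    # call is its only audit record.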
+ await audit( + "user_operations", + context, + { + "action": "delete_user", + "user_id": user_id, + "reason": reason or "no reason provided", + "severity": "high", + "status": "success", + }, + ) + + return {"status": "success", "user_id": user_id} + + +# Example 3: Tool to query audit logs +@mcp.tool() +async def get_audit_log( + log_name: str, + begin: Optional[int] = None, + end: Optional[int] = None, + limit: int = 100, + context: DurableContext = None, +) -> Dict[str, Any]: + """ + Query audit log entries by time range. + + Args: + log_name: Name of the audit log to query. + begin: Start timestamp in milliseconds (Unix epoch). Optional. + end: End timestamp in milliseconds (Unix epoch). Optional. + limit: Maximum number of entries to return (default 100). + context: The durable context. + + Returns: + Dictionary with count and entries list. + + Examples: + # Get last 50 entries. + get_audit_log("user_operations", limit=50) + + # Get entries from last hour. + get_audit_log("user_operations", begin=time.time()*1000 - 3600000) + + # Get entries in specific range. + get_audit_log("user_operations", begin=1699000000000, end=1699100000000) + """ + audit_map = SortedMap.ref(f"audit:{log_name}") + + # Build range query using `UUIDv7` boundaries. + if begin is not None and end is not None: + # Range query: begin to end. + begin_key = str(timestamp_to_uuidv7(begin)) + end_key = str(timestamp_to_uuidv7(end)) + + response = await audit_map.range( + context, + start_key=begin_key, + end_key=end_key, + limit=limit, + ) + elif begin is not None: + # Query from begin onwards. + begin_key = str(timestamp_to_uuidv7(begin)) + + response = await audit_map.range( + context, + start_key=begin_key, + limit=limit, + ) + elif end is not None: + # Query up to end (newest first, then filter). + end_key = str(timestamp_to_uuidv7(end)) + + response = await audit_map.reverse_range( + context, + end_key=end_key, + limit=limit, + ) + else: + # No time bounds - get most recent entries. + response = await audit_map.reverse_range( + context, + limit=limit, + ) + + # Parse entries using Pydantic. + entries = [] + for entry in response.entries: + audit_entry = AuditEntry.model_validate_json(entry.value) + entries.append(audit_entry.model_dump()) + + return { + "log_name": log_name, + "count": len(entries), + "entries": entries, + } + + +# Example 4: Mixed pattern - decorator + explicit logging +@mcp.tool() +@audit("user_operations") +async def update_user( + user_id: str, + updates: Dict[str, Any], + context: DurableContext = None, +) -> Dict[str, Any]: + """ + Update user information. + + Uses both decorator (for automatic capture) and explicit logging + (for additional context). + """ + # Perform update. + # ... + + # The decorator will log this automatically, but we can add + # additional entries for specific events. + if "role" in updates: + await audit( + "security_events", + context, + { + "action": "role_change", + "user_id": user_id, + "old_role": "user", + "new_role": updates["role"], + "severity": "medium", + }, + ) + + return { + "status": "success", + "user_id": user_id, + "updated_fields": list(updates.keys()), + } + + +async def main(): + """Start the example audit server.""" + # Reboot application that runs everything necessary for `DurableMCP`. 
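+    # Serves the tools over the Model Context Protocol at the `/mcp`
+    # path configured above.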
+ await mcp.application().run() + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/examples/define/README.md b/examples/define/README.md new file mode 100644 index 0000000..0ceef10 --- /dev/null +++ b/examples/define/README.md @@ -0,0 +1,244 @@ +# Technical Glossary + +Complete OrderedMap CRUD reference using a technical glossary with Pydantic models. + +## Overview + +Demonstrates all OrderedMap operations with type-safe Pydantic models through a practical use case: maintaining a technical glossary with both alphabetical and chronological indexes. + +## OrderedMap + Pydantic Pattern + +This example shows the **OrderedMap with protobuf Values** pattern: +- Uses `OrderedMap` for durable key-value storage +- Pydantic models for type safety and validation +- `from_model()` / `as_model()` helpers for serialization + +## OrderedMap Operations + +| Operation | Method | Use Case | +|-----------|--------|----------| +| Insert | `insert(context, key="...", value=...)` | Add terms | +| Search | `search(context, key="...")` | Look up term | +| Range | `range(context, start_key=..., limit=...)` | Browse alphabetically | +| Reverse Range | `reverse_range(context, limit=...)` | Recent additions | +| Remove | `remove(context, key="...")` | Delete terms | + +## Architecture + +### Pydantic Model + +```python +from pydantic import BaseModel + +class TermEntry(BaseModel): + """Type-safe term entry.""" + term: str + definition: str + category: str = "general" + examples: List[str] = [] + timestamp: int +``` + +### Two OrderedMaps for Different Access Patterns + +```python +# Map 1: Alphabetical index (keyed by term name). +terms_map = OrderedMap.ref("terms") +# Key: "api" -> Value: `protobuf.Value(TermEntry)`. + +# Map 2: Chronological index (keyed by `UUIDv7`). +recent_map = OrderedMap.ref("recent") +# Key: "018c1234-..." -> Value: `protobuf.Value(TermEntry)`. +``` + +## Usage Examples + +### Insert with Pydantic + +Add terms with type validation: + +```python +from rebootdev.protobuf import from_model + +@mcp.tool() +async def add_term( + term: str, + definition: str, + category: str = "general", + examples: List[str] = None, + context: DurableContext = None, +) -> Dict[str, Any]: + """Add a technical term to the glossary.""" + timestamp = int(time.time() * 1000) + + # Create Pydantic model instance. + term_entry = TermEntry( + term=term, + definition=definition, + category=category, + examples=examples or [], + timestamp=timestamp, + ) + + # Insert into alphabetical map using `from_model()`. + await terms_map.insert( + context, + key=term.lower(), + value=from_model(term_entry), + ) + + # Insert into chronological map. + recent_key = str(uuid7()) + await recent_map.insert( + context, + key=recent_key, + value=from_model(term_entry), + ) + + return {"status": "success", "term": term} +``` + +### Search with Pydantic + +Point lookup with type-safe deserialization: + +```python +from rebootdev.protobuf import as_model + +@mcp.tool() +async def define( + term: str, + context: DurableContext = None, +) -> Dict[str, Any]: + """Look up a term's definition.""" + response = await terms_map.search(context, key=term.lower()) + + if not response.found: + return {"status": "error", "message": "Term not found"} + + # Convert protobuf `Value` to Pydantic model. 
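+    # `as_model` validates the stored payload against the `TermEntry`
+    # schema as it deserializes.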
+ term_entry = as_model(response.value, TermEntry) + + return {"status": "success", "term": term_entry.model_dump()} +``` + +### Range Query + +Browse terms alphabetically: + +```python +@mcp.tool() +async def list_terms( + start_with: str = "", + limit: int = 50, + context: DurableContext = None, +) -> Dict[str, Any]: + """List terms alphabetically.""" + response = await terms_map.range( + context, + start_key=start_with.lower() if start_with else None, + limit=limit, + ) + + terms = [] + for entry in response.entries: + # Deserialize each entry using `as_model()`. + term_entry = as_model(entry.value, TermEntry) + terms.append({ + "term": term_entry.term, + "definition": term_entry.definition[:100] + "..." + if len(term_entry.definition) > 100 + else term_entry.definition, + "category": term_entry.category, + }) + + return {"status": "success", "count": len(terms), "terms": terms} +``` + +### Reverse Range + +Get recently added terms: + +```python +@mcp.tool() +async def recent_terms( + limit: int = 20, + context: DurableContext = None, +) -> Dict[str, Any]: + """Get recently added terms (newest first).""" + response = await recent_map.reverse_range( + context, + limit=limit, + ) + + terms = [] + for entry in response.entries: + term_entry = as_model(entry.value, TermEntry) + terms.append({ + "term": term_entry.term, + "definition": term_entry.definition[:100] + "...", + "category": term_entry.category, + "added_at": term_entry.timestamp, + }) + + return {"status": "success", "count": len(terms), "recent_terms": terms} +``` + +### Remove + +Delete a term: + +```python +@mcp.tool() +async def remove_term( + term: str, + context: DurableContext = None, +) -> Dict[str, Any]: + """Remove a term from the glossary.""" + # Check if term exists. + response = await terms_map.search(context, key=term.lower()) + + if not response.found: + return {"status": "error", "message": "Term not found"} + + # Remove from alphabetical map. + await terms_map.remove(context, key=term.lower()) + + return {"status": "success", "message": f"Removed '{term}'"} +``` + +## Registering OrderedMap Servicers + +**Important**: OrderedMap requires servicer registration: + +```python +from reboot.std.collections.ordered_map.v1.ordered_map import ( + OrderedMap, + servicers as ordered_map_servicers, +) + +async def main(): + await mcp.application(servicers=ordered_map_servicers()).run() +``` + +## Benefits of OrderedMap + Pydantic + +- Type Safety: Pydantic validates all data structures +- Clean API: `from_model()` / `as_model()` are explicit and readable +- IDE Support: Full autocomplete with `term_entry.field` +- Protobuf Integration: Works seamlessly with protobuf Values +- Validation: Catch errors at serialization boundaries + +## When to Use OrderedMap vs SortedMap + +**Use OrderedMap when:** +- You want protobuf Value integration +- Type safety with Pydantic is important +- You need the `from_model` / `as_model` pattern + +**Use SortedMap when:** +- You prefer working with raw bytes +- You need batch operations (`entries={...}`) +- Simplicity is preferred over type safety + +See other examples for SortedMap + Pydantic pattern (audit, steps, processing). diff --git a/examples/define/client.py b/examples/define/client.py new file mode 100644 index 0000000..7233d02 --- /dev/null +++ b/examples/define/client.py @@ -0,0 +1,177 @@ +""" +Example client for technical glossary demonstration. 
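+Exercises the add, lookup, listing, prefix search, recent-additions,
+and removal tools.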
+""" + +import asyncio +from reboot.mcp.client import connect + +URL = "http://localhost:9991" + + +async def main(): + """Run technical glossary example client.""" + async with connect(URL + "/mcp") as ( + session, + session_id, + protocol_version, + ): + print("Connected to define example server") + print(f"Session ID: {session_id}") + print() + + # List available tools. + tools = await session.list_tools() + print(f"Available tools: {len(tools.tools)}") + for tool in tools.tools: + print(f" - {tool.name}") + # Print description with proper indentation. + if tool.description: + for line in tool.description.split("\n"): + print(f" {line}") + print() + + # Example 1: Add technical terms. + print("=" * 60) + print("Example 1: Add technical terms to glossary") + print("=" * 60) + + terms = [ + ( + "API", + "Application Programming Interface - A set of protocols " + "for building software", + "architecture", + ["REST API", "GraphQL API"], + ), + ( + "REST", + "Representational State Transfer - Architectural style " + "for distributed systems", + "architecture", + ["RESTful services", "HTTP methods"], + ), + ( + "gRPC", + "Google Remote Procedure Call - High-performance RPC " + "framework", + "networking", + ["Protocol Buffers", "HTTP/2"], + ), + ( + "Docker", + "Platform for developing and running containerized " + "applications", + "devops", + ["containers", "images", "Dockerfile"], + ), + ( + "Kubernetes", + "Container orchestration platform for automating " + "deployment and scaling", + "devops", + ["K8s", "pods", "deployments"], + ), + ] + + for term, definition, category, examples in terms: + result = await session.call_tool( + "add_term", + { + "term": term, + "definition": definition, + "category": category, + "examples": examples, + }, + ) + print(f"Added: {result.content[0].text}") + + print() + + # Example 2: Look up term definitions. + print("=" * 60) + print("Example 2: Look up term definitions") + print("=" * 60) + + for term in ["API", "gRPC", "Docker"]: + result = await session.call_tool( + "define", + {"term": term}, + ) + print(f"Definition of {term}: {result.content[0].text}") + print() + + # Example 3: Case-insensitive lookup. + print("=" * 60) + print("Example 3: Case-insensitive lookup") + print("=" * 60) + + # Demonstrate that lookups work regardless of case. + for query in ["grpc", "GRPC", "GrPc"]: + result = await session.call_tool( + "define", + {"term": query}, + ) + print(f"Lookup '{query}': {result.content[0].text}") + print() + + # Example 4: List terms alphabetically. + print("=" * 60) + print("Example 4: List terms alphabetically") + print("=" * 60) + + result = await session.call_tool( + "list_terms", + {"start_with": "", "limit": 10}, + ) + print(f"Terms (all): {result.content[0].text}") + print() + + # Example 5: Search by prefix. + print("=" * 60) + print("Example 5: Search terms by prefix") + print("=" * 60) + + result = await session.call_tool( + "search_terms", + {"prefix": "gR", "limit": 5}, + ) + print(f"Terms starting with 'gR': {result.content[0].text}") + print() + + # Example 6: Get recently added terms. + print("=" * 60) + print("Example 6: Get recently added terms") + print("=" * 60) + + result = await session.call_tool( + "recent_terms", + {"limit": 3}, + ) + print(f"Recent terms: {result.content[0].text}") + print() + + # Example 7: Remove a term. 
+ print("=" * 60) + print("Example 7: Remove a term") + print("=" * 60) + + result = await session.call_tool( + "remove_term", + {"term": "Docker"}, + ) + print(f"Removed: {result.content[0].text}") + print() + + # Verify removal. + result = await session.call_tool( + "define", + {"term": "Docker"}, + ) + print(f"Lookup after removal: {result.content[0].text}") + print() + + print("Define example completed successfully!") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/examples/define/example.py b/examples/define/example.py new file mode 100644 index 0000000..cbf5668 --- /dev/null +++ b/examples/define/example.py @@ -0,0 +1,386 @@ +""" +Technical Glossary with OrderedMap CRUD Operations. + +Demonstrates OrderedMap operations using Pydantic models and +from_model/as_model helpers: Insert, Search, Range, ReverseRange, and +Remove, using a technical terms glossary as an example. +""" + +import asyncio +import sys +import time +from pathlib import Path +from typing import Any, Dict, List + +from pydantic import BaseModel + +# Add api/ to Python path for generated proto code. +api_path = Path(__file__).parent.parent.parent / "api" +if api_path.exists(): + sys.path.insert(0, str(api_path)) + +from reboot.mcp.server import DurableMCP, DurableContext +from reboot.std.collections.ordered_map.v1.ordered_map import ( + OrderedMap, + servicers as ordered_map_servicers, +) +from rebootdev.protobuf import from_model, as_model +from uuid7 import create as uuid7 # type: ignore[import-untyped] + +# Initialize MCP server. +mcp = DurableMCP(path="/mcp") + + +# Pydantic model for term entries. +class TermEntry(BaseModel): + term: str + definition: str + category: str = "general" + examples: List[str] = [] + timestamp: int + + +@mcp.tool() +async def add_term( + term: str, + definition: str, + category: str = "general", + examples: List[str] = None, + context: DurableContext = None, +) -> Dict[str, Any]: + """ + Add a technical term to the glossary. + + Stores the term in two OrderedMaps: + - Alphabetically by term name (for lookup and browsing). + - Chronologically by UUIDv7 (for recent additions). + + Args: + term: The technical term to define. + definition: Definition of the term. + category: Category (e.g., "programming", "architecture"). + examples: Optional list of usage examples. + context: The durable context. + + Returns: + Confirmation with the term and timestamp. + """ + terms_map = OrderedMap.ref("terms") + recent_map = OrderedMap.ref("recent") + + timestamp = int(time.time() * 1000) + + # Create Pydantic model instance. + term_entry = TermEntry( + term=term, + definition=definition, + category=category, + examples=examples or [], + timestamp=timestamp, + ) + + # Insert into alphabetical map (keyed by `term`). + await terms_map.insert( + context, + key=term.lower(), + value=from_model(term_entry), + ) + + # Insert into chronological map (keyed by `UUIDv7`). + recent_key = str(uuid7()) + await recent_map.insert( + context, + key=recent_key, + value=from_model(term_entry), + ) + + return { + "status": "success", + "term": term, + "timestamp": timestamp, + } + + +@mcp.tool() +async def define( + term: str, + context: DurableContext = None, +) -> Dict[str, Any]: + """ + Look up a term's definition. + + Uses the `search` method for point lookup. + + Args: + term: The term to look up. + context: The durable context. + + Returns: + Term definition or error if not found. 
+ """ + terms_map = OrderedMap.ref("terms") + + response = await terms_map.search(context, key=term.lower()) + + if not response.found: + return { + "status": "error", + "message": f"Term '{term}' not found in glossary", + } + + term_entry = as_model(response.value, TermEntry) + + return { + "status": "success", + "term": term_entry.model_dump(), + } + + +@mcp.tool() +async def remove_term( + term: str, + context: DurableContext = None, +) -> Dict[str, Any]: + """ + Remove a term from the glossary. + + Demonstrates the `remove` method. Note: Only removes from alphabetical + map. The recent map entry remains (showing historical additions). + + Args: + term: The term to remove. + context: The durable context. + + Returns: + Confirmation of removal. + """ + terms_map = OrderedMap.ref("terms") + + # Check if term exists first. + response = await terms_map.search(context, key=term.lower()) + + if not response.found: + return { + "status": "error", + "message": f"Term '{term}' not found", + } + + # Remove from alphabetical map. + await terms_map.remove( + context, + key=term.lower(), + ) + + return { + "status": "success", + "message": f"Removed '{term}' from glossary", + } + + +@mcp.tool() +async def list_terms( + start_with: str = "", + limit: int = 50, + context: DurableContext = None, +) -> Dict[str, Any]: + """ + List terms alphabetically. + + Demonstrates the `range` method with optional start key. + + Args: + start_with: Optional prefix to start listing from. + limit: Maximum number of terms to return. + context: The durable context. + + Returns: + List of terms in alphabetical order. + """ + terms_map = OrderedMap.ref("terms") + + if start_with: + # Range starting from prefix. + response = await terms_map.range( + context, + start_key=start_with.lower(), + limit=limit, + ) + else: + # Range from beginning. + response = await terms_map.range( + context, + limit=limit, + ) + + terms = [] + for entry in response.entries: + term_entry = as_model(entry.value, TermEntry) + terms.append({ + "term": term_entry.term, + "definition": term_entry.definition[:100] + "..." + if len(term_entry.definition) > 100 + else term_entry.definition, + "category": term_entry.category, + }) + + return { + "status": "success", + "count": len(terms), + "terms": terms, + } + + +@mcp.tool() +async def browse_category( + category: str, + limit: int = 50, + context: DurableContext = None, +) -> Dict[str, Any]: + """ + Browse terms by category. + + Uses prefix-based range query on category-prefixed keys. + + Args: + category: Category to browse. + limit: Maximum number of terms. + context: The durable context. + + Returns: + Terms in the specified category. + """ + terms_map = OrderedMap.ref("terms") + + # Get all terms and filter by category. + # Note: In production, you'd use a separate category-indexed map. + response = await terms_map.range( + context, + limit=1000, # Fetch more to filter. + ) + + terms = [] + for entry in response.entries: + term_entry = as_model(entry.value, TermEntry) + if term_entry.category == category: + terms.append({ + "term": term_entry.term, + "definition": term_entry.definition, + "examples": term_entry.examples, + }) + if len(terms) >= limit: + break + + return { + "status": "success", + "category": category, + "count": len(terms), + "terms": terms, + } + + +@mcp.tool() +async def recent_terms( + limit: int = 20, + context: DurableContext = None, +) -> Dict[str, Any]: + """ + Get recently added terms. 
+ + Demonstrates `reverse_range` on UUIDv7-keyed map to get chronological + order (newest first). + + Args: + limit: Maximum number of recent terms. + context: The durable context. + + Returns: + Recently added terms in reverse chronological order. + """ + recent_map = OrderedMap.ref("recent") + + response = await recent_map.reverse_range( + context, + limit=limit, + ) + + terms = [] + for entry in response.entries: + term_entry = as_model(entry.value, TermEntry) + terms.append({ + "term": term_entry.term, + "definition": term_entry.definition[:100] + "..." + if len(term_entry.definition) > 100 + else term_entry.definition, + "category": term_entry.category, + "added_at": term_entry.timestamp, + }) + + return { + "status": "success", + "count": len(terms), + "recent_terms": terms, + } + + +@mcp.tool() +async def search_terms( + prefix: str, + limit: int = 20, + context: DurableContext = None, +) -> Dict[str, Any]: + """ + Search for terms by prefix. + + Demonstrates range query with client-side prefix filtering. + Note: OrderedMap.range() only supports start_key, not end_key. + + Args: + prefix: Search prefix. + limit: Maximum results. + context: The durable context. + + Returns: + Terms matching the prefix. + """ + terms_map = OrderedMap.ref("terms") + + start_key = prefix.lower() + + # Fetch more than limit to account for client-side filtering. + response = await terms_map.range( + context, + start_key=start_key, + limit=limit * 2, + ) + + terms = [] + for entry in response.entries: + # Check if key still matches prefix. + if not entry.key.startswith(start_key): + break + + term_entry = as_model(entry.value, TermEntry) + terms.append({ + "term": term_entry.term, + "definition": term_entry.definition, + "category": term_entry.category, + }) + + if len(terms) >= limit: + break + + return { + "status": "success", + "prefix": prefix, + "count": len(terms), + "terms": terms, + } + + +async def main(): + """Start the technical glossary server.""" + await mcp.application(servicers=ordered_map_servicers()).run() + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/examples/document/README.md b/examples/document/README.md new file mode 100644 index 0000000..269a780 --- /dev/null +++ b/examples/document/README.md @@ -0,0 +1,241 @@ +# Document Processing + +Document processing pipeline combining `at_least_once` and `at_most_once` patterns with OrderedMap and Pydantic models. + +## Overview + +Real-world workflows often need both idempotency patterns. Use `at_least_once` for idempotent operations (reads, storage) and `at_most_once` for operations with side effects (external APIs). 
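+
+In miniature, the split looks like this (`read_record` and
+`call_charge_api` are hypothetical stand-ins for the pipeline steps
+detailed below):
+
+```python
+# Safe to repeat: the result is cached on success and the call is
+# retried until it completes.
+record = await at_least_once("read", context, read_record, type=dict)
+
+# Has side effects: executes at most once, retried only on NetworkError.
+receipt = await at_most_once(
+    "charge",
+    context,
+    call_charge_api,
+    type=dict,
+    retryable_exceptions=[NetworkError],
+)
+```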
+ +This example demonstrates: +- OrderedMap for durable storage +- Pydantic models for type-safe data structures +- `from_model()` / `as_model()` for serialization +- Mixed idempotency patterns in a single workflow + +## Workflow + +``` +Upload File -> Process Document + |- Step 1: Get file metadata (at_least_once) + |- Step 2: OCR extraction (at_most_once) + |- Step 3: Translation (at_most_once) + |- Step 4: Store result (at_least_once) +``` + +## Pydantic Models + +```python +from pydantic import BaseModel + +class FileData(BaseModel): + """File data model.""" + file_id: str + content: str + metadata: Dict[str, str] = {} + +class OCRResult(BaseModel): + """OCR processing result.""" + job_id: str + step: str + text: str + +class TranslationResult(BaseModel): + """Translation processing result.""" + job_id: str + step: str + text: str + language: str + +class JobResult(BaseModel): + """Final job result.""" + job_id: str + file_id: str + target_language: str + status: str + result: str +``` + +## Pattern with OrderedMap + Pydantic + +```python +from rebootdev.protobuf import from_model, as_model +from reboot.std.collections.ordered_map.v1.ordered_map import OrderedMap + +@mcp.tool() +async def process_document( + file_id: str, + target_language: str = "en", + context: DurableContext = None, +) -> dict: + """Process document through OCR and translation pipeline.""" + + files_map = OrderedMap.ref("files") + results_map = OrderedMap.ref("results") + jobs_map = OrderedMap.ref("jobs") + + # Step 1: Idempotent file lookup. + async def get_file_metadata(): + response = await files_map.search(context, key=file_id) + if not response.found: + raise ValueError(f"File {file_id} not found") + + # Deserialize with Pydantic. + file_data = as_model(response.value, FileData) + return file_data.model_dump() + + file_metadata = await at_least_once( + f"get_file_{file_id}", + context, + get_file_metadata, + type=dict, + ) + + # Step 2: OCR (external API, at most once). + async def perform_ocr(): + extracted_text = await simulate_ocr_api(file_metadata["content"]) + + # Create Pydantic model and store with from_model(). + ocr_result = OCRResult( + job_id=job_id, + step="ocr", + text=extracted_text, + ) + await results_map.idempotently(f"store_ocr_{job_id}").insert( + context, + key=f"{job_id}_ocr", + value=from_model(ocr_result), + ) + return extracted_text + + try: + ocr_text = await at_most_once( + f"ocr_{job_id}", + context, + perform_ocr, + type=str, + retryable_exceptions=[NetworkError], + ) + except NetworkError: + return {"status": "error", "step": "ocr", "error": "..."} + except AtMostOnceFailedBeforeCompleting: + return {"status": "error", "step": "ocr", "error": "..."} + except InvalidDocumentError as e: + return {"status": "error", "step": "ocr", "error": str(e)} + + # Step 3: Translation (external API, at most once). + async def perform_translation(): + translated_text = await simulate_translation_api( + ocr_text, + target_language, + ) + + # Create Pydantic model and store. 
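+        # `.idempotently()` keeps this insert exactly-once even if the
+        # enclosing `at_most_once` guard retries on `NetworkError`.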
+ translation_result = TranslationResult( + job_id=job_id, + step="translation", + text=translated_text, + language=target_language, + ) + await results_map.idempotently(f"store_translation_{job_id}").insert( + context, + key=f"{job_id}_translation", + value=from_model(translation_result), + ) + return translated_text + + try: + translated_text = await at_most_once( + f"translate_{job_id}", + context, + perform_translation, + type=str, + retryable_exceptions=[NetworkError], + ) + except NetworkError: + return {"status": "error", "step": "translation", "error": "..."} + except AtMostOnceFailedBeforeCompleting: + return {"status": "error", "step": "translation", "error": "..."} + except QuotaExceededError as e: + return {"status": "error", "step": "translation", "error": str(e)} + + # Step 4: Store final result (idempotent write). + async def store_job_result(): + job_result = JobResult( + job_id=job_id, + file_id=file_id, + target_language=target_language, + status="completed", + result=translated_text, + ) + await jobs_map.insert( + context, + key=job_id, + value=from_model(job_result), + ) + return job_id + + final_job_id = await at_least_once( + f"store_job_{job_id}", + context, + store_job_result, + type=str, + ) + + return {"status": "success", "job_id": final_job_id} +``` + +## Error Handling + +### Retryable Errors +- `NetworkError`: Temporary network issues (API timeouts) +- Handled by `at_most_once` with automatic retry + +### Non-Retryable Errors +- `InvalidDocumentError`: Document format not supported +- `QuotaExceededError`: API quota exceeded +- Caught and returned as error responses + +## Idempotency Guards + +### at_least_once +- Used for: File reads, result storage +- Behavior: Caches return value, retries until success +- Ideal for: Idempotent operations that should eventually complete + +### at_most_once +- Used for: External API calls (OCR, translation) +- Behavior: Executes at most once, even if retried +- Ideal for: Operations with side effects (charges, state changes) + +### Combined Pattern +The `.idempotently()` modifier on `insert()` ensures intermediate results are stored exactly once, even if the enclosing `at_most_once` block retries. + +## Registering Servicers + +OrderedMap requires servicer registration: + +```python +from reboot.std.collections.ordered_map.v1.ordered_map import ( + OrderedMap, + servicers as ordered_map_servicers, +) + +async def main(): + await mcp.application(servicers=ordered_map_servicers()).run() +``` + +## Benefits + +- Type Safety: Pydantic validates all intermediate results +- Clear Errors: Each step has specific error types +- Resumable: If translation fails, OCR doesn't re-run +- Protobuf Integration: OrderedMap with `from_model` / `as_model` +- Audit Trail: Intermediate results stored for debugging + +## Use Case + +Perfect for workflows that: +- Call external APIs with charges or side effects +- Need to resume after partial failures +- Require validation of intermediate results +- Want type-safe data structures throughout diff --git a/examples/document/client.py b/examples/document/client.py new file mode 100644 index 0000000..dc8f354 --- /dev/null +++ b/examples/document/client.py @@ -0,0 +1,124 @@ +""" +Example client for document processing demonstration. 
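+Uploads files, runs the OCR and translation pipeline, and checks job
+status.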
+""" + +import asyncio +import json +from reboot.mcp.client import connect + +URL = "http://localhost:9991" + + +async def main(): + """Run document processing example client.""" + async with connect(URL + "/mcp") as ( + session, + session_id, + protocol_version, + ): + print("Connected to document example server") + print(f"Session ID: {session_id}") + print() + + # List available tools. + tools = await session.list_tools() + print(f"Available tools: {len(tools.tools)}") + for tool in tools.tools: + print(f" - {tool.name}") + # Print description with proper indentation. + if tool.description: + for line in tool.description.split("\n"): + print(f" {line}") + print() + + # Example 1: Upload files. + print("=" * 60) + print("Example 1: Upload files for processing") + print("=" * 60) + + files = [ + ("file_001", "invoice.pdf", "application/pdf"), + ("file_002", "contract.pdf", "application/pdf"), + ("file_003", "report.docx", "application/vnd.openxmlformats"), + ] + + file_ids = [] + for file_id, filename, content_type in files: + result = await session.call_tool( + "upload_file", + { + "file_id": file_id, + "content": f"Binary content of {filename}", + "metadata": { + "filename": filename, + "content_type": content_type, + }, + }, + ) + print(f"Uploaded: {result.content[0].text}") + + # Extract file_id from result for later use. + try: + data = json.loads(result.content[0].text) + if data.get("status") == "success": + file_ids.append(data["file_id"]) + except json.JSONDecodeError: + print(f" Error parsing response: {result.content[0].text}") + + print() + + # Example 2: Process documents. + print("=" * 60) + print("Example 2: Process documents (OCR + Translation)") + print("=" * 60) + + job_ids = [] + for file_id in file_ids: + result = await session.call_tool( + "process_document", + {"file_id": file_id, "target_language": "es"}, + ) + print(f"Processing result: {result.content[0].text}") + + # Extract job_id if successful. + try: + data = json.loads(result.content[0].text) + if data.get("status") == "success": + job_ids.append(data.get("job_id")) + except json.JSONDecodeError: + pass + + print() + + # Example 3: Check job status. + print("=" * 60) + print("Example 3: Check processing job status") + print("=" * 60) + + # Use the job_id from first successful processing. + if job_ids and job_ids[0]: + result = await session.call_tool( + "get_job_status", + {"job_id": job_ids[0]}, + ) + print(f"Job status: {result.content[0].text}") + print() + + # Example 4: Process with different language. + print("=" * 60) + print("Example 4: Process document to French") + print("=" * 60) + + if file_ids: + result = await session.call_tool( + "process_document", + {"file_id": file_ids[0], "target_language": "fr"}, + ) + print(f"Processing result: {result.content[0].text}") + print() + + print("Document example completed successfully!") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/examples/document/example.py b/examples/document/example.py new file mode 100644 index 0000000..d688cd8 --- /dev/null +++ b/examples/document/example.py @@ -0,0 +1,393 @@ +""" +Document Processing with Mixed Idempotency Patterns. + +Demonstrates combining `at_least_once` and `at_most_once` in a single workflow +for document processing with external API calls and multi-step operations. +""" + +import asyncio +import hashlib +import random +import sys +from pathlib import Path +from typing import Any, Dict + +from pydantic import BaseModel + +# Add api/ to Python path for generated proto code. 
+api_path = Path(__file__).parent.parent.parent / "api" +if api_path.exists(): + sys.path.insert(0, str(api_path)) + +from reboot.aio.workflows import ( + at_least_once, + at_most_once, + AtMostOnceFailedBeforeCompleting, +) +from reboot.mcp.server import DurableMCP, DurableContext +from reboot.std.collections.ordered_map.v1.ordered_map import ( + OrderedMap, + servicers as ordered_map_servicers, +) +from rebootdev.protobuf import from_model, as_model + +# Initialize MCP server. +mcp = DurableMCP(path="/mcp") + + +# Pydantic models for document processing. +class FileData(BaseModel): + """File data model.""" + + file_id: str + content: str + metadata: Dict[str, str] = {} + + +class OCRResult(BaseModel): + """OCR processing result.""" + + job_id: str + step: str + text: str + + +class TranslationResult(BaseModel): + """Translation processing result.""" + + job_id: str + step: str + text: str + language: str + + +class JobResult(BaseModel): + """Final job result.""" + + job_id: str + file_id: str + target_language: str + status: str + result: str + + +class NetworkError(Exception): + """Temporary network error (retryable).""" + + pass + + +class InvalidDocumentError(Exception): + """Document format is invalid (not retryable).""" + + pass + + +class QuotaExceededError(Exception): + """API quota exceeded (not retryable).""" + + pass + + +async def simulate_ocr_api(content: str) -> str: + """ + Simulate external OCR API call. + + May raise `NetworkError` (retryable) or `InvalidDocumentError` (not + retryable). + """ + # Simulate network issues. + if random.random() < 0.15: + raise NetworkError("OCR service timeout") + + # Simulate invalid documents. + if random.random() < 0.05: + raise InvalidDocumentError("Unsupported document format") + + # Return simulated OCR text. + return f"Extracted text from document: {content[:50]}..." + + +async def simulate_translation_api(text: str, target_lang: str) -> str: + """ + Simulate external translation API call. + + May raise `NetworkError` (retryable) or `QuotaExceededError` (not + retryable). + """ + # Simulate network issues. + if random.random() < 0.1: + raise NetworkError("Translation service timeout") + + # Simulate quota exceeded. + if random.random() < 0.03: + raise QuotaExceededError("Daily translation quota exceeded") + + # Return simulated translation. + return f"[{target_lang}] {text}" + + +@mcp.tool() +async def process_document( + file_id: str, + target_language: str = "en", + context: DurableContext = None, +) -> Dict[str, Any]: + """ + Process a document through OCR and translation pipeline. + + This demonstrates a complex workflow combining: + - `at_least_once` for idempotent steps (file lookup, result storage) + - `at_most_once` for external API calls with side effects + + Args: + file_id: The file ID to process. + target_language: Target language code for translation. + context: The durable context. + + Returns: + Processing result with job_id and status. + """ + files_map = OrderedMap.ref("files") + results_map = OrderedMap.ref("results") + jobs_map = OrderedMap.ref("jobs") + + # Generate job ID. + job_id = f"job_{hashlib.md5(f'{file_id}_{target_language}'.encode()).hexdigest()[:12]}" + + # Step 1: Retrieve file metadata (idempotent read). + async def get_file_metadata(): + response = await files_map.search(context, key=file_id) + + if not response.found: + raise ValueError(f"File {file_id} not found") + + file_data = as_model(response.value, FileData) + return file_data.model_dump() + + # Use `at_least_once` for idempotent file lookup. 
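+    # On a retry of this tool, the cached metadata is returned rather
+    # than re-running the lookup.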
+ file_metadata = await at_least_once( + f"get_file_{file_id}", + context, + get_file_metadata, + type=dict, + ) + + # Step 2: Perform OCR (external API, at most once). + async def perform_ocr(): + # Call OCR API. + extracted_text = await simulate_ocr_api(file_metadata["content"]) + + # Create Pydantic model and store OCR result. + ocr_result_key = f"{job_id}_ocr" + ocr_result = OCRResult( + job_id=job_id, + step="ocr", + text=extracted_text, + ) + await results_map.idempotently(f"store_ocr_{job_id}").insert( + context, + key=ocr_result_key, + value=from_model(ocr_result), + ) + + return extracted_text + + try: + # Use `at_most_once` to ensure OCR is called at most once. + # Retry only on network errors. + ocr_text = await at_most_once( + f"ocr_{job_id}", + context, + perform_ocr, + type=str, + retryable_exceptions=[NetworkError], + ) + + except NetworkError: + return { + "status": "error", + "job_id": job_id, + "step": "ocr", + "error": "OCR service unavailable", + "retryable": True, + } + + except AtMostOnceFailedBeforeCompleting: + return { + "status": "error", + "job_id": job_id, + "step": "ocr", + "error": "OCR failed on previous attempt", + "retryable": False, + } + + except InvalidDocumentError as e: + return { + "status": "error", + "job_id": job_id, + "step": "ocr", + "error": str(e), + "retryable": False, + } + + # Step 3: Translate text (external API, at most once). + async def perform_translation(): + # Call translation API. + translated_text = await simulate_translation_api( + ocr_text, target_language + ) + + # Create Pydantic model and store translation result. + translation_result_key = f"{job_id}_translation" + translation_result = TranslationResult( + job_id=job_id, + step="translation", + text=translated_text, + language=target_language, + ) + await results_map.idempotently(f"store_translation_{job_id}").insert( + context, + key=translation_result_key, + value=from_model(translation_result), + ) + + return translated_text + + try: + # Use `at_most_once` for translation. + translated_text = await at_most_once( + f"translate_{job_id}", + context, + perform_translation, + type=str, + retryable_exceptions=[NetworkError], + ) + + except NetworkError: + return { + "status": "error", + "job_id": job_id, + "step": "translation", + "error": "Translation service unavailable", + "retryable": True, + } + + except AtMostOnceFailedBeforeCompleting: + return { + "status": "error", + "job_id": job_id, + "step": "translation", + "error": "Translation failed on previous attempt", + "retryable": False, + } + + except QuotaExceededError as e: + return { + "status": "error", + "job_id": job_id, + "step": "translation", + "error": str(e), + "retryable": False, + } + + # Step 4: Store final job result (idempotent write). + async def store_job_result(): + job_result = JobResult( + job_id=job_id, + file_id=file_id, + target_language=target_language, + status="completed", + result=translated_text, + ) + await jobs_map.insert( + context, + key=job_id, + value=from_model(job_result), + ) + return job_id + + # Use `at_least_once` for final storage. + final_job_id = await at_least_once( + f"store_job_{job_id}", + context, + store_job_result, + type=str, + ) + + return { + "status": "success", + "job_id": final_job_id, + "result": translated_text, + } + + +@mcp.tool() +async def upload_file( + file_id: str, + content: str, + metadata: Dict[str, str] = None, + context: DurableContext = None, +) -> Dict[str, Any]: + """ + Upload a file for processing. + + Args: + file_id: Unique file identifier. 
+ content: File content. + metadata: Optional metadata dictionary. + context: The durable context. + + Returns: + Upload confirmation. + """ + files_map = OrderedMap.ref("files") + + file_data = FileData( + file_id=file_id, + content=content, + metadata=metadata or {}, + ) + await files_map.insert( + context, + key=file_id, + value=from_model(file_data), + ) + + return {"status": "success", "file_id": file_id} + + +@mcp.tool() +async def get_job_status( + job_id: str, + context: DurableContext = None, +) -> Dict[str, Any]: + """ + Get job processing status and result. + + Args: + job_id: The job ID to query. + context: The durable context. + + Returns: + Job status and result if completed. + """ + jobs_map = OrderedMap.ref("jobs") + + response = await jobs_map.search(context, key=job_id) + + if not response.found: + return {"status": "error", "message": "Job not found"} + + job_result = as_model(response.value, JobResult) + + return {"status": "success", "job": job_result.model_dump()} + + +async def main(): + """Start the document processing example server.""" + await mcp.application(servicers=ordered_map_servicers()).run() + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/examples/processing/README.md b/examples/processing/README.md new file mode 100644 index 0000000..fe20e41 --- /dev/null +++ b/examples/processing/README.md @@ -0,0 +1,198 @@ +# Payment Processing + +Payment processing with `at_most_once` to prevent duplicate charges. + +## Overview + +Use `at_most_once` with `retryable_exceptions` to distinguish between +temporary failures (network errors) and permanent failures (payment +rejected). This prevents duplicate payments while allowing retries on +transient errors. + +## Pattern + +```python +@mcp.tool() +async def process_payment( + amount: float, + currency: str = "USD", + context: DurableContext = None, +) -> dict: + """Process payment via external API.""" + + async def make_payment(): + # Call external payment API. + result = await simulate_payment_api(amount, currency) + + # Store payment record. + await payments_map.insert(context, entries={...}) + + return result + + try: + # Retry only on network errors. + result = await at_most_once( + f"payment_{amount}_{currency}", + context, + make_payment, + type=dict, + retryable_exceptions=[NetworkError], + ) + + return {"status": "success", "payment": result} + + except NetworkError: + # Network error after retries exhausted. + return {"status": "error", "message": "Service unavailable"} + + except AtMostOnceFailedBeforeCompleting: + # Previous attempt failed with non-retryable error. + return {"status": "error", "message": "Payment failed previously"} + + except (PaymentRejectedError, InsufficientFundsError) as e: + # First attempt with non-retryable error. + return {"status": "error", "message": str(e)} +``` + +## How It Works + +### Retryable vs Non-Retryable + +Define clear exception classes: + +```python +# Retryable: Temporary failures. +class NetworkError(Exception): + pass + +# Non-retryable: Permanent failures. +class PaymentRejectedError(Exception): + pass + +class InsufficientFundsError(Exception): + pass +``` + +Specify which exceptions should trigger retry: + +```python +result = await at_most_once( + "operation", + context, + operation_func, + type=dict, + retryable_exceptions=[NetworkError], # Only retry these. +) +``` + +### Error Scenarios + +**Network Error (Retryable)** + +1. Initial attempt: `NetworkError` raised +2. `at_most_once` retries the operation +3. Second attempt succeeds +4. 
Result cached and returned + +**Payment Rejected (Non-Retryable)** + +1. Initial attempt: `PaymentRejectedError` raised +2. Exception propagates (not in `retryable_exceptions`) +3. Tool returns error response + +**Tool Retry After Success** + +1. Initial call: Steps succeed, network issue prevents response +2. Tool called again by MCP framework +3. `at_most_once` returns cached result + +**Tool Retry After Rejection** + +1. Initial call: `PaymentRejectedError` raised +2. Tool returns error response +3. Tool called again +4. `at_most_once` raises `AtMostOnceFailedBeforeCompleting` + +### Three Exception Handlers + +```python +try: + result = await at_most_once(...) + return {"status": "success", ...} + +except NetworkError: + # Retryable error after exhausting retries. + return {"status": "error", "retryable": True} + +except AtMostOnceFailedBeforeCompleting: + # Previous attempt failed with non-retryable error. + return {"status": "error", "retryable": False} + +except (PaymentRejectedError, InsufficientFundsError) as e: + # First attempt with non-retryable error. + return {"status": "error", "message": str(e)} +``` + +## AtMostOnceFailedBeforeCompleting + +Raised when: + +1. Previous attempt failed with non-retryable exception +2. Tool is called again (retry by MCP framework) +3. `at_most_once` detects the previous failure + +Purpose: Prevent re-executing operations that already failed +permanently. + +## Best Practices + +Be specific with `retryable_exceptions`: + +```python +# Good: Explicit list of retryable exceptions. +retryable_exceptions=[NetworkError, TimeoutError] + +# Bad: Too broad (might retry unintended exceptions). +retryable_exceptions=[Exception] +``` + +Handle all exception cases: + +```python +try: + result = await at_most_once(...) +except RetryableError: + pass # Handle exhausted retries. +except AtMostOnceFailedBeforeCompleting: + pass # Handle previous failure. +except PermanentError: + pass # Handle first-time permanent failure. +``` + +Use descriptive aliases: + +```python +# Include identifying information in alias. +await at_most_once( + f"payment_{user_id}_{amount}_{timestamp}", + context, + make_payment, + type=dict, +) +``` + +## Comparison: at_least_once vs at_most_once + +| Feature | at_least_once | at_most_once | +|---------|---------------|--------------| +| Guarantee | Completes at least once | Executes at most once | +| Retry | Always retries on failure | Only on `retryable_exceptions` | +| Use case | Idempotent operations | Operations with side effects | +| Exception | None | `AtMostOnceFailedBeforeCompleting` | + +## Running + +```bash +cd examples/processing +uv run python example.py +``` diff --git a/examples/processing/client.py b/examples/processing/client.py new file mode 100644 index 0000000..9b90ec6 --- /dev/null +++ b/examples/processing/client.py @@ -0,0 +1,125 @@ +""" +Example client for payment processing demonstration. +""" + +import asyncio +import json +from reboot.mcp.client import connect + +URL = "http://localhost:9991" + + +async def main(): + """Run payment processing example client.""" + async with connect(URL + "/mcp") as ( + session, + session_id, + protocol_version, + ): + print("Connected to processing example server") + print(f"Session ID: {session_id}") + print() + + # List available tools. + tools = await session.list_tools() + print(f"Available tools: {len(tools.tools)}") + for tool in tools.tools: + print(f" - {tool.name}") + # Print description with proper indentation. 
+            if tool.description:
+                for line in tool.description.split("\n"):
+                    print(f"    {line}")
+        print()
+
+        # Example 1: Successful payment.
+        print("=" * 60)
+        print("Example 1: Successful payment")
+        print("=" * 60)
+
+        result = await session.call_tool(
+            "process_payment",
+            {"amount": 99.99, "currency": "USD"},
+        )
+        print(f"Payment result: {result.content[0].text}")
+
+        # Extract transaction ID from result.
+        payment1_data = json.loads(result.content[0].text)
+        txn1_id = payment1_data.get("payment", {}).get("transaction_id")
+        print()
+
+        # Example 2: Another successful payment.
+        print("=" * 60)
+        print("Example 2: Another successful payment")
+        print("=" * 60)
+
+        result = await session.call_tool(
+            "process_payment",
+            {"amount": 49.99, "currency": "USD"},
+        )
+        print(f"Payment result: {result.content[0].text}")
+
+        # Extract transaction ID from result.
+        payment2_data = json.loads(result.content[0].text)
+        txn2_id = payment2_data.get("payment", {}).get("transaction_id")
+        print()
+
+        # Example 3: Retryable network error (will retry and succeed).
+        print("=" * 60)
+        print("Example 3: Retryable network error (retries and succeeds)")
+        print("=" * 60)
+
+        result = await session.call_tool(
+            "process_payment",
+            {"amount": 75.01, "currency": "USD"},
+        )
+        print(f"Payment result: {result.content[0].text}")
+        print()
+
+        # Example 4: Retrieve payment records.
+        print("=" * 60)
+        print("Example 4: Retrieve payment records")
+        print("=" * 60)
+
+        # Try first payment.
+        if txn1_id:
+            result = await session.call_tool(
+                "get_payment",
+                {"transaction_id": txn1_id},
+            )
+            print(f"Payment record 1: {result.content[0].text}")
+            print()
+
+        # Try second payment.
+        if txn2_id:
+            result = await session.call_tool(
+                "get_payment",
+                {"transaction_id": txn2_id},
+            )
+            print(f"Payment record 2: {result.content[0].text}")
+            print()
+
+        # Example 5: Payment that will fail (insufficient funds).
+        print("=" * 60)
+        print("Example 5: Failed payment (insufficient funds)")
+        print("=" * 60)
+
+        result = await session.call_tool(
+            "process_payment",
+            {"amount": 999999.99, "currency": "USD"},
+        )
+        print(f"Payment result: {result.content[0].text}")
+        print()
+
+        # Example 6: Payment that will fail (invalid currency).
+        print("=" * 60)
+        print("Example 6: Failed payment (invalid currency)")
+        print("=" * 60)
+
+        result = await session.call_tool(
+            "process_payment",
+            {"amount": 50.00, "currency": "INVALID"},
+        )
+        print(f"Payment result: {result.content[0].text}")
+        print()
+
+        print("Processing example completed successfully!")
+
+
+if __name__ == "__main__":
+    asyncio.run(main())
diff --git a/examples/processing/example.py b/examples/processing/example.py
new file mode 100644
index 0000000..dc4279e
--- /dev/null
+++ b/examples/processing/example.py
@@ -0,0 +1,295 @@
+"""
+Payment Processing with at_most_once.
+
+Demonstrates using at_most_once for operations that call external APIs where
+retrying after certain errors could cause unintended side effects.
+"""
+
+import asyncio
+import random
+import sys
+from pathlib import Path
+from typing import Any, Dict
+
+from pydantic import BaseModel
+
+# Add api/ to Python path for generated proto code.
+api_path = Path(__file__).parent.parent.parent / "api"
+if api_path.exists():
+    sys.path.insert(0, str(api_path))
+
+from reboot.aio.workflows import at_most_once, AtMostOnceFailedBeforeCompleting
+from reboot.mcp.server import DurableMCP, DurableContext
+from reboot.std.collections.v1.sorted_map import SortedMap
+
+# Initialize MCP server.
+mcp = DurableMCP(path="/mcp")
+
+
+# Pydantic model for payment records.
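+# Records are serialized with `model_dump_json()` and stored as UTF-8
+# bytes in a SortedMap keyed by transaction ID.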
+class PaymentRecord(BaseModel):
+    """Payment record model."""
+
+    transaction_id: str
+    amount: float
+    currency: str
+    description: str = ""
+    status: str
+
+
+class NetworkError(Exception):
+    """Temporary network error (retryable)."""
+
+    pass
+
+
+class PaymentRejectedError(Exception):
+    """Payment was rejected by payment processor (not retryable)."""
+
+    pass
+
+
+class InsufficientFundsError(Exception):
+    """Insufficient funds (not retryable)."""
+
+    pass
+
+
+async def simulate_payment_api(
+    amount: float,
+    currency: str,
+    context: DurableContext,
+) -> Dict[str, Any]:
+    """
+    Simulate external payment API call.
+
+    Raises different exception types to demonstrate retry behavior.
+    """
+    # Check for invalid currency (non-retryable error).
+    valid_currencies = ["USD", "EUR", "GBP", "JPY"]
+    if currency not in valid_currencies:
+        raise PaymentRejectedError(f"Invalid currency: {currency}")
+
+    # Check for insufficient funds (non-retryable error).
+    if amount > 100000:
+        raise InsufficientFundsError(
+            f"Amount {amount} exceeds available balance"
+        )
+
+    # For amounts ending in .01, simulate retryable network errors using
+    # a SortedMap to track attempts. Fail first attempt, succeed on retry.
+    # Round before comparing: a direct `amount % 1 == 0.01` float equality
+    # check is false for values like 75.01.
+    if round(amount % 1, 2) == 0.01:
+        retry_map = SortedMap.ref("retry_attempts")
+        attempt_key = f"payment_{amount}_{currency}"
+
+        # Get current attempt count.
+        response = await retry_map.get(context, key=attempt_key)
+
+        if response.HasField("value"):
+            attempts = int(response.value.decode("utf-8"))
+        else:
+            attempts = 0
+
+        # Increment attempt count.
+        await retry_map.insert(
+            context,
+            entries={attempt_key: str(attempts + 1).encode("utf-8")},
+        )
+
+        # Fail first attempt to demonstrate retry.
+        if attempts == 0:
+            raise NetworkError("Simulated network timeout for demo (will retry)")
+
+    # Success: Return payment confirmation.
+    return {
+        "transaction_id": f"txn_{random.randint(100000, 999999)}",
+        "amount": amount,
+        "currency": currency,
+        "status": "completed",
+    }
+
+
+@mcp.tool()
+async def process_payment(
+    amount: float,
+    currency: str = "USD",
+    description: str = "",
+    context: DurableContext = None,
+) -> Dict[str, Any]:
+    """
+    Process a payment via external payment API.
+
+    Uses at_most_once to ensure the payment is executed at most once, even
+    if the tool is retried. Network errors are retryable, but payment
+    rejections are not.
+
+    Args:
+        amount: Payment amount.
+        currency: Currency code (default: USD).
+        description: Payment description.
+        context: The durable context.
+
+    Returns:
+        Payment result or error information.
+    """
+    payments_map = SortedMap.ref("payments")
+
+    async def make_payment():
+        # Call external payment API.
+        result = await simulate_payment_api(amount, currency, context)
+
+        # Create Pydantic model and store payment record.
+        payment_id = result["transaction_id"]
+        payment_record = PaymentRecord(
+            transaction_id=payment_id,
+            amount=amount,
+            currency=currency,
+            description=description,
+            status=result["status"],
+        )
+        await payments_map.insert(
+            context,
+            entries={
+                payment_id: payment_record.model_dump_json().encode("utf-8")
+            },
+        )
+
+        return result
+
+    try:
+        # Use at_most_once to ensure payment is attempted at most once.
+        # Only retry on network errors - payment rejections are final.
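+        # Note: Python salts `hash()` for strings per process, so the
+        # `hash(description)` component of the alias below can change
+        # across restarts. A stable digest (e.g.
+        # `hashlib.sha256(description.encode()).hexdigest()[:12]`) keeps
+        # the alias deterministic.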
+ result = await at_most_once( + f"payment_{amount}_{currency}_{hash(description)}", + context, + make_payment, + type=dict, + retryable_exceptions=[NetworkError], + ) + + return { + "status": "success", + "payment": result, + } + + except NetworkError: + # Network error after retries exhausted. + return { + "status": "error", + "error_type": "network_error", + "message": "Payment service temporarily unavailable", + "retryable": True, + } + + except AtMostOnceFailedBeforeCompleting: + # Previous attempt failed with non-retryable error. + # This means payment was rejected or funds were insufficient. + return { + "status": "error", + "error_type": "payment_failed", + "message": "Payment failed on previous attempt (not retryable)", + "retryable": False, + } + + except (PaymentRejectedError, InsufficientFundsError) as e: + # First attempt with non-retryable error. + return { + "status": "error", + "error_type": type(e).__name__, + "message": str(e), + "retryable": False, + } + + +@mcp.tool() +async def get_payment( + transaction_id: str, + context: DurableContext = None, +) -> Dict[str, Any]: + """ + Retrieve payment record. + + Args: + transaction_id: The transaction ID to retrieve. + context: The durable context. + + Returns: + Payment data or error if not found. + """ + payments_map = SortedMap.ref("payments") + + response = await payments_map.get(context, key=transaction_id) + + if not response.HasField("value"): + return {"status": "error", "message": "Payment not found"} + + payment_record = PaymentRecord.model_validate_json(response.value) + + return {"status": "success", "payment": payment_record.model_dump()} + + +@mcp.tool() +async def fetch_exchange_rate( + from_currency: str, + to_currency: str, + context: DurableContext = None, +) -> Dict[str, Any]: + """ + Fetch exchange rate from external API. + + Demonstrates at_most_once for read-only API calls where retries are safe + but we want to avoid redundant network calls. + + Args: + from_currency: Source currency code. + to_currency: Target currency code. + context: The durable context. + + Returns: + Exchange rate or error. + """ + + async def fetch_rate(): + # Simulate API call with occasional network errors. + if random.random() < 0.1: + raise NetworkError("API timeout") + + # Return simulated exchange rate. + return { + "from": from_currency, + "to": to_currency, + "rate": round(random.uniform(0.5, 2.0), 4), + } + + try: + # Retry on network errors only. + result = await at_most_once( + f"exchange_rate_{from_currency}_{to_currency}", + context, + fetch_rate, + type=dict, + retryable_exceptions=[NetworkError], + ) + + return {"status": "success", "data": result} + + except NetworkError: + return { + "status": "error", + "message": "Exchange rate service unavailable", + } + + except AtMostOnceFailedBeforeCompleting: + return { + "status": "error", + "message": "Previous fetch attempt failed", + } + + +async def main(): + """Start the payment processing example server.""" + await mcp.application().run() + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/examples/run.py b/examples/run.py new file mode 100755 index 0000000..a2d85aa --- /dev/null +++ b/examples/run.py @@ -0,0 +1,235 @@ +#!/usr/bin/env python3 +""" +Harness for running DurableMCP examples. + +Lets user select an example, starts the server, and runs the client. 
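+
+Assumes the `uv` and `rbt` CLIs are on PATH; servers listen on
+http://localhost:9991/mcp.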
+""" + +import asyncio +import os +import signal +import socket +import subprocess +import sys +import time +from pathlib import Path + +import aiohttp + + +EXAMPLES = { + "1": { + "name": "audit", + "description": "Audit logging with decorator and explicit patterns", + }, + "2": { + "name": "steps", + "description": "Multi-step operations with independent idempotency", + }, + "3": { + "name": "processing", + "description": "Payment processing with at_most_once", + }, + "4": { + "name": "document", + "description": "Document pipeline with OrderedMap + Pydantic", + }, + "5": { + "name": "define", + "description": "Technical glossary with OrderedMap CRUD", + }, +} + + +def print_menu(): + """Print the example selection menu.""" + print("\nDurableMCP Examples") + print("=" * 60) + for key, example in sorted(EXAMPLES.items()): + print(f"{key}. {example['name']:12} - {example['description']}") + print("=" * 60) + + +def get_selection(): + """Get user's example selection.""" + while True: + choice = input("\nSelect example (1-5, or 'q' to quit): ").strip() + if choice.lower() == "q": + return None + if choice in EXAMPLES: + return EXAMPLES[choice]["name"] + print(f"Invalid selection: {choice}") + + +def check_port_in_use(port: int) -> bool: + """Check if a port is already in use.""" + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + return s.connect_ex(("localhost", port)) == 0 + + +async def wait_for_server(url: str, timeout: int = 30): + """Wait for server to be ready.""" + start = time.time() + port = 9991 + + # First, wait for port to be open. + while time.time() - start < timeout: + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + if s.connect_ex(("localhost", port)) == 0: + print(f"Port {port} is open, checking MCP endpoint...") + break + await asyncio.sleep(0.5) + else: + print(f"Timeout: Port {port} never opened") + return False + + # Port is open, now wait for MCP to respond. + # Give it a moment to fully initialize. + await asyncio.sleep(2) + + while time.time() - start < timeout: + try: + async with aiohttp.ClientSession() as session: + # Try to list tools via MCP protocol. + async with session.post( + url, + json={ + "jsonrpc": "2.0", + "id": 1, + "method": "tools/list", + }, + ) as resp: + # Any response means server is up. + print(f"MCP responded with status {resp.status}") + if resp.status in (200, 400, 405, 406): + return True + except (aiohttp.ClientError, ConnectionError) as e: + print(f"MCP check failed: {e.__class__.__name__}") + await asyncio.sleep(0.5) + except Exception as e: + print(f"Unexpected error: {e}") + await asyncio.sleep(0.5) + + print("Timeout: MCP endpoint never responded") + return False + + +async def run_example(example_name: str): + """Run the selected example.""" + example_dir = Path(__file__).parent / example_name + server_path = example_dir / "example.py" + client_path = example_dir / "client.py" + + if not server_path.exists(): + print(f"Error: Server not found at {server_path}") + return + + if not client_path.exists(): + print(f"Error: Client not found at {client_path}") + return + + print(f"\nStarting {example_name} example...") + print(f"Server: {server_path}") + print(f"Client: {client_path}") + + # Check if port 9991 is already in use. + if check_port_in_use(9991): + print("\nWarning: Port 9991 is already in use!") + print("Please stop the existing server before continuing.") + return + + # Start the server with rbt dev run in a new process group. 
+ print("\nStarting server (output below)...") + print("-" * 60) + server_process = subprocess.Popen( + [ + "uv", + "run", + "rbt", + "dev", + "run", + "--python", + f"--application={server_path.name}", + "--working-directory=.", + "--no-generate-watch", + ], + cwd=example_dir, + stdin=subprocess.PIPE, # Provide a pipe for stdin. + preexec_fn=os.setsid, # Create new process group. + ) + + try: + # Wait for server to be ready. + print("\nWaiting for server to be ready on port 9991...") + if not await wait_for_server("http://localhost:9991/mcp"): + print("Error: Server did not start in time") + print("Check server output above for errors") + return + + print("Server ready!") + print("\n" + "=" * 60) + + # Run the client. + client_process = await asyncio.create_subprocess_exec( + sys.executable, + str(client_path), + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + ) + + stdout, stderr = await client_process.communicate() + + print(stdout.decode()) + if stderr: + print("Errors:", stderr.decode(), file=sys.stderr) + + print("=" * 60) + + finally: + # Clean up - send SIGINT (Ctrl-C) to allow rbt to cleanup properly. + print("\nShutting down server...") + try: + # Send SIGINT (like Ctrl-C) to the entire process group. + # This gives rbt a chance to cleanup docker containers. + os.killpg(os.getpgid(server_process.pid), signal.SIGINT) + # Wait longer for graceful shutdown (docker cleanup takes time). + try: + server_process.wait(timeout=10) + print("Server stopped cleanly") + except subprocess.TimeoutExpired: + print("Server didn't stop in time, forcing shutdown...") + # Force kill if necessary. + os.killpg(os.getpgid(server_process.pid), signal.SIGKILL) + server_process.wait() + except ProcessLookupError: + # Process already terminated. + pass + + +async def main(): + """Main harness loop.""" + while True: + print_menu() + example = get_selection() + + if example is None: + print("\nExiting...") + break + + try: + await run_example(example) + except KeyboardInterrupt: + print("\n\nInterrupted by user") + break + except Exception as e: + print(f"\nError running example: {e}", file=sys.stderr) + + input("\nPress Enter to continue...") + + +if __name__ == "__main__": + try: + asyncio.run(main()) + except KeyboardInterrupt: + print("\n\nExiting...") + sys.exit(0) diff --git a/examples/steps/README.md b/examples/steps/README.md new file mode 100644 index 0000000..b88eab4 --- /dev/null +++ b/examples/steps/README.md @@ -0,0 +1,189 @@ +# Multi-Step Operations + +Multi-step operations with independent idempotency guards using +`at_least_once`. + +## Overview + +When a tool performs multiple sequential operations, each step should +be independently idempotent. If the tool is retried after step 1 +succeeds but before step 2 completes, step 1 returns its cached result +and only step 2 retries. + +## Pattern + +```python +@mcp.tool() +async def create_user_with_profile( + username: str, + email: str, + bio: str = "", + context: DurableContext = None, +) -> dict: + """Create user and profile in separate steps.""" + + # Step 1: Create user (cached on success). + async def create_user(): + user_id = f"user_{hash(username) % 100000}" + await users_map.insert( + context, + entries={user_id: json.dumps({...}).encode("utf-8")}, + ) + return user_id + + user_id = await at_least_once( + f"create_user_{username}", + context, + create_user, + type=str, + ) + + # Step 2: Create profile (cached independently). 
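+    # The alias for this step embeds `user_id` from step 1, so the profile
+    # guard is keyed to the user it belongs to. (Caveat: `hash(username)`
+    # above is salted per Python process; a stable digest such as
+    # `hashlib.sha256` is safer for IDs that must survive restarts.)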
+ async def create_profile(): + profile_id = f"profile_{user_id}" + await profiles_map.insert( + context, + entries={profile_id: json.dumps({...}).encode("utf-8")}, + ) + return profile_id + + profile_id = await at_least_once( + f"create_profile_{user_id}", + context, + create_profile, + type=str, + ) + + return {"user_id": user_id, "profile_id": profile_id} +``` + +## How It Works + +Each step has its own `at_least_once` guard with a unique alias. + +### Retry After Step 1 Success + +1. Initial call: Step 1 creates user, succeeds +2. Crash before step 2 +3. Retry: Step 1 returns cached `user_id`, step 2 executes + +Result: User created once, profile created on retry. + +### Retry After Both Steps + +1. Initial call: Step 1 succeeds, step 2 succeeds +2. Network error prevents response delivery +3. Retry: Both steps return cached results + +Result: Both operations return immediately without re-execution. + +### Retry After Step 1 Failure + +1. Initial call: Step 1 raises exception +2. Retry: Step 1 executes again + +Result: Step 1 retries until success, then step 2 executes. + +## Key Concepts + +### Independent Guards + +Each step has its own guard with distinct alias: + +```python +# Step 1 alias. +await at_least_once(f"create_user_{username}", ...) + +# Step 2 alias. +await at_least_once(f"create_profile_{user_id}", ...) +``` + +This ensures each step caches independently. + +### Sequential Dependencies + +Later steps can depend on earlier step results: + +```python +# Step 1: Get data. +user_id = await at_least_once("create_user", context, ...) + +# Step 2: Use result from step 1. +profile_id = await at_least_once( + f"create_profile_{user_id}", # Uses `user_id`. + context, + ..., +) +``` + +### Function References + +Pass function references directly (no lambda): + +```python +# Correct. +async def create_user(): + return await create_user_record(...) + +user_id = await at_least_once( + "create_user", + context, + create_user, # Function reference. + type=str, +) + +# Wrong (don't use lambda unless needed for arguments). +user_id = await at_least_once( + "create_user", + context, + lambda: create_user(), + type=str, +) +``` + +## Best Practices + +Use distinct aliases: + +```python +# Good: Different aliases. +await at_least_once(f"create_user_{username}", ...) +await at_least_once(f"create_profile_{user_id}", ...) + +# Bad: Same alias. +await at_least_once("create", ...) +await at_least_once("create", ...) +``` + +Make each step atomic: + +```python +# Good: Each step is complete operation. +user_id = await at_least_once("create_user", ...) +profile_id = await at_least_once("create_profile", ...) + +# Bad: Steps too granular. +await at_least_once("validate_username", ...) +await at_least_once("hash_password", ...) +await at_least_once("insert_database", ...) +``` + +Let exceptions propagate: + +```python +# Good: `at_least_once` handles retries. +user_id = await at_least_once("create_user", context, create_user, ...) + +# Bad: Catching exceptions defeats retry. +try: + user_id = await at_least_once("create_user", ...) +except Exception: + return {"error": "failed"} # Operation won't retry. +``` + +## Running + +```bash +cd examples/steps +uv run python example.py +``` diff --git a/examples/steps/client.py b/examples/steps/client.py new file mode 100644 index 0000000..b91fef8 --- /dev/null +++ b/examples/steps/client.py @@ -0,0 +1,92 @@ +""" +Example client for multi-step operations demonstration. 
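+
+Connects to the steps server at http://localhost:9991/mcp and exercises
+the create_user_with_profile, get_user, and get_profile tools.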
+""" + +import asyncio +import json +from reboot.mcp.client import connect + +URL = "http://localhost:9991" + + +async def main(): + """Run multi-step operations example client.""" + async with connect(URL + "/mcp") as ( + session, + session_id, + protocol_version, + ): + print("Connected to steps example server") + print(f"Session ID: {session_id}") + print() + + # List available tools. + tools = await session.list_tools() + print(f"Available tools: {len(tools.tools)}") + for tool in tools.tools: + print(f" - {tool.name}") + # Print description with proper indentation. + if tool.description: + for line in tool.description.split("\n"): + print(f" {line}") + print() + + # Example 1: Create user with profile. + print("=" * 60) + print("Example 1: Create user with profile (two-step)") + print("=" * 60) + + result = await session.call_tool( + "create_user_with_profile", + { + "username": "alice", + "email": "alice@example.com", + "bio": "Software engineer interested in distributed systems", + }, + ) + print(f"Created: {result.content[0].text}") + + # Parse the result to get actual user_id and profile_id. + alice_data = json.loads(result.content[0].text) + alice_user_id = alice_data.get("user_id") + alice_profile_id = alice_data.get("profile_id") + print() + + # Example 2: Create another user. + result = await session.call_tool( + "create_user_with_profile", + { + "username": "bob", + "email": "bob@example.com", + "bio": "Data scientist working on ML infrastructure", + }, + ) + print(f"Created: {result.content[0].text}") + print() + + # Example 3: Retrieve user data. + print("=" * 60) + print("Example 2: Retrieve user and profile data") + print("=" * 60) + + if alice_user_id: + result = await session.call_tool( + "get_user", + {"user_id": alice_user_id}, + ) + print(f"User data: {result.content[0].text}") + print() + + if alice_profile_id: + result = await session.call_tool( + "get_profile", + {"profile_id": alice_profile_id}, + ) + print(f"Profile data: {result.content[0].text}") + print() + + print("Steps example completed successfully!") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/examples/steps/example.py b/examples/steps/example.py new file mode 100644 index 0000000..dd6cdcd --- /dev/null +++ b/examples/steps/example.py @@ -0,0 +1,186 @@ +""" +Multi-Step Operations with Partial Failure Recovery. + +Demonstrates using at_least_once for operations with multiple steps where +each step should be idempotent and cached independently. +""" + +import asyncio +import sys +from pathlib import Path +from typing import Any, Dict + +from pydantic import BaseModel + +# Add api/ to Python path for generated proto code. +api_path = Path(__file__).parent.parent.parent / "api" +if api_path.exists(): + sys.path.insert(0, str(api_path)) + +from reboot.aio.workflows import at_least_once +from reboot.mcp.server import DurableMCP, DurableContext +from reboot.std.collections.v1.sorted_map import SortedMap + +# Initialize MCP server. +mcp = DurableMCP(path="/mcp") + + +# Pydantic models for user and profile data. +class UserData(BaseModel): + """User data model.""" + + username: str + email: str + + +class ProfileData(BaseModel): + """Profile data model.""" + + user_id: str + bio: str = "" + avatar_url: str = "" + + +@mcp.tool() +async def create_user_with_profile( + username: str, + email: str, + bio: str = "", + avatar_url: str = "", + context: DurableContext = None, +) -> Dict[str, Any]: + """ + Create a user and their profile in separate steps. 
+ + This demonstrates multi-step operations where each step is independently + idempotent. If the tool is retried after the user is created but before + the profile is created, the user creation will return the cached result + and only the profile creation will retry. + + Args: + username: Unique username for the user. + email: User's email address. + bio: Optional user bio. + avatar_url: Optional avatar URL. + context: The durable context. + + Returns: + Dictionary with user_id and profile_id. + """ + + # Step 1: Create user (idempotent). + async def create_user(): + users_map = SortedMap.ref("users") + user_id = f"user_{hash(username) % 100000}" + + # Create Pydantic model and store user data. + user_data = UserData(username=username, email=email) + await users_map.insert( + context, + entries={user_id: user_data.model_dump_json().encode("utf-8")}, + ) + + return user_id + + # If this tool is retried after user creation succeeds, this will + # return the cached user_id without re-creating the user. + user_id = await at_least_once( + f"create_user_{username}", + context, + create_user, + type=str, + ) + + # Step 2: Create profile (idempotent, separate guard). + async def create_profile(): + profiles_map = SortedMap.ref("profiles") + profile_id = f"profile_{user_id}" + + # Create Pydantic model and store profile data. + profile_data = ProfileData( + user_id=user_id, + bio=bio, + avatar_url=avatar_url, + ) + await profiles_map.insert( + context, + entries={profile_id: profile_data.model_dump_json().encode("utf-8")}, + ) + + return profile_id + + # If this tool is retried after step 1 but before step 2 completes, + # only this step will execute (step 1 returns cached result). + profile_id = await at_least_once( + f"create_profile_{user_id}", + context, + create_profile, + type=str, + ) + + return { + "status": "success", + "user_id": user_id, + "profile_id": profile_id, + } + + +@mcp.tool() +async def get_user( + user_id: str, + context: DurableContext = None, +) -> Dict[str, Any]: + """ + Retrieve user data. + + Args: + user_id: The user ID to retrieve. + context: The durable context. + + Returns: + User data or error if not found. + """ + users_map = SortedMap.ref("users") + response = await users_map.get(context, key=user_id) + + if not response.HasField("value"): + return {"status": "error", "message": "User not found"} + + user_data = UserData.model_validate_json(response.value) + + return {"status": "success", "user": user_data.model_dump()} + + +@mcp.tool() +async def get_profile( + profile_id: str, + context: DurableContext = None, +) -> Dict[str, Any]: + """ + Retrieve profile data. + + Args: + profile_id: The profile ID to retrieve. + context: The durable context. + + Returns: + Profile data or error if not found. + """ + profiles_map = SortedMap.ref("profiles") + response = await profiles_map.get(context, key=profile_id) + + if not response.HasField("value"): + return {"status": "error", "message": "Profile not found"} + + profile_data = ProfileData.model_validate_json(response.value) + + return {"status": "success", "profile": profile_data.model_dump()} + + +async def main(): + """Start the multi-step example server.""" + await mcp.application().run() + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/uv.lock b/uv.lock index cedb75c..0ab49f8 100644 --- a/uv.lock +++ b/uv.lock @@ -284,7 +284,7 @@ wheels = [ [[package]] name = "durable-mcp" -version = "0.5.0" +version = "0.6.0" source = { editable = "." 
} dependencies = [ { name = "mcp" }, @@ -307,7 +307,7 @@ requires-dist = [ { name = "mcp", specifier = "==1.21.0" }, { name = "mypy", marker = "extra == 'dev'", specifier = "==1.2.0" }, { name = "pytest", marker = "extra == 'dev'", specifier = "==8.4.1" }, - { name = "reboot", specifier = "==0.39.3" }, + { name = "reboot", specifier = "==0.40.1" }, { name = "twine", marker = "extra == 'dev'" }, { name = "types-protobuf", marker = "extra == 'dev'", specifier = ">=4.24.0.20240129" }, { name = "uuid7-standard", specifier = ">=1.1.0" }, @@ -1418,7 +1418,7 @@ wheels = [ [[package]] name = "reboot" -version = "0.39.3" +version = "0.40.1" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "aiofiles" }, @@ -1462,9 +1462,9 @@ dependencies = [ { name = "websockets" }, ] wheels = [ - { url = "https://files.pythonhosted.org/packages/a5/89/f1e03159918f72388176d00af2ee21d06f10655d2f1ecf73ffa126c3049d/reboot-0.39.3-py3-none-macosx_13_0_arm64.whl", hash = "sha256:fcacb37fc7a43e0444120180ffb0b68f44e985cab5214913e141b9b9bdb943a5", size = 18290389, upload-time = "2025-11-08T21:31:45.597Z" }, - { url = "https://files.pythonhosted.org/packages/fd/c4/7234ea8ac6fb80f1dd6f18020ed376bd5ce45578541f8ff557a7b10a210e/reboot-0.39.3-py3-none-manylinux_2_35_aarch64.whl", hash = "sha256:862528e8e4073844a30b6bce9a472bbfd132ddf9bbfe545efcb3978a39ff8936", size = 22041443, upload-time = "2025-11-08T21:19:12.786Z" }, - { url = "https://files.pythonhosted.org/packages/54/e7/3f73e7d60c1e6c375a1c8c207ec672e99d9203dab7595c94863d6bc4df72/reboot-0.39.3-py3-none-manylinux_2_35_x86_64.whl", hash = "sha256:85a029aa7545d0634737a97d473194567190122f0d2e0ffddd3dbaa2c0bcf481", size = 22159062, upload-time = "2025-11-08T21:18:30.383Z" }, + { url = "https://files.pythonhosted.org/packages/0b/b8/41dac97ad72cbe3051429f13b8ad5e56599f2abad2126796ad178966b94f/reboot-0.40.1-py3-none-macosx_13_0_arm64.whl", hash = "sha256:d6a5f85ddac802ffb902e7e64a8f7151186fc044e6efcac0392ef8174ab99710", size = 18318771, upload-time = "2025-11-13T18:14:14.174Z" }, + { url = "https://files.pythonhosted.org/packages/cc/3e/3043d1b69f8a1dd677c52aebe75541c381c9ab76a5529054c5c3ad588b1b/reboot-0.40.1-py3-none-manylinux_2_35_aarch64.whl", hash = "sha256:e4277bed694aba535cdb771d572ab3a9c882a37693c54f86165e5b8c3b9f7ac4", size = 22069832, upload-time = "2025-11-13T18:01:46.694Z" }, + { url = "https://files.pythonhosted.org/packages/39/ac/2541d1143a98a0390bef57a9407b64778224d506713bb917c927fe429300/reboot-0.40.1-py3-none-manylinux_2_35_x86_64.whl", hash = "sha256:11b0360d597974a41d390f6566bb2080ff6267afa2564cc1c36f057088178d6d", size = 22187636, upload-time = "2025-11-13T18:01:42.228Z" }, ] [[package]]