diff --git a/pyproject.toml b/pyproject.toml index 3270c542d..e2533b312 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "uipath" -version = "2.8.28" +version = "2.8.29" description = "Python SDK and CLI for UiPath Platform, enabling programmatic interaction with automation services, process management, and deployment tools." readme = { file = "README.md", content-type = "text/markdown" } requires-python = ">=3.11" diff --git a/src/uipath/functions/debug.py b/src/uipath/functions/debug.py index 692c61afb..2e20b1d6c 100644 --- a/src/uipath/functions/debug.py +++ b/src/uipath/functions/debug.py @@ -17,6 +17,7 @@ from __future__ import annotations import asyncio +import contextvars import json import logging import os @@ -35,6 +36,7 @@ UiPathRuntimeResult, UiPathStreamOptions, ) +from uipath.runtime.events import UiPathRuntimeStateEvent from uipath.runtime.schema import UiPathRuntimeSchema logger = logging.getLogger(__name__) @@ -53,7 +55,9 @@ def _capture_frame_locals(frame: FrameType) -> dict[str, Any]: if isinstance(value, (bool, int, float, str, type(None))): snapshot[name] = value elif isinstance(value, (dict, list, tuple)): - json.dumps(value, default=str) # serialisability probe + # Strict probe — no default=str, so nested non-serialisable + # objects (code, frame, etc.) correctly fail here. + json.dumps(value) snapshot[name] = value else: snapshot[name] = repr(value) @@ -96,8 +100,18 @@ def __init__( project_dir: str, breakpoints: list[str] | Literal["*"], entrypoint_path: str | None = None, + state_tracked_functions: dict[str, set[str]] | None = None, ) -> None: - """Initialize the controller with project directory, breakpoints, and optional entrypoint path.""" + """Initialize the controller. + + Parameters + ---------- + state_tracked_functions: + If provided, state events are emitted *only* for function calls + whose ``(abs_file_path, func_name)`` appears in this mapping + (``{abs_path: {func_name, …}, …}``). 
When ``None``, no state
+            events are emitted.
+        """
         self._project_dir = project_dir
         self._entrypoint_path = (
             os.path.abspath(entrypoint_path) if entrypoint_path else None
@@ -107,11 +121,17 @@ def __init__(
         if isinstance(breakpoints, list):
             self._parse_breakpoints(breakpoints)
 
+        self._state_tracked: dict[str, set[str]] | None = state_tracked_functions
         self._events: queue.Queue[tuple[str, Any]] = queue.Queue()
         self._resume_event = threading.Event()
         self._stopped = False
         self._thread: threading.Thread | None = None
         self._abspath_cache: dict[str, str] = {}
+        # Track which breakpoints already fired per frame so that
+        # multiline expressions (where the bytecodes bounce back to the
+        # call line after evaluating arguments on deeper lines) don't
+        # trigger the same breakpoint twice within one function call.
+        self._hit_lines: dict[int, set[int]] = {}  # frame-id → {lines}
 
     # Breakpoint management
 
@@ -160,8 +180,19 @@ def _abspath(self, path: str) -> str:
         return result
 
     def _is_project_file(self, abspath: str) -> bool:
-        """Return *True* for files under the project directory that are not vendored."""
-        return abspath.startswith(self._project_dir) and "site-packages" not in abspath
+        """Return *True* for real .py files under the project directory."""
+        return (
+            abspath.endswith(".py")
+            and abspath.startswith(os.path.join(self._project_dir, ""))
+            and "site-packages" not in abspath
+        )
+
+    def _is_tracked_function(self, abspath: str, func_name: str) -> bool:
+        """Return *True* if this function should produce a state event."""
+        if self._state_tracked is None:
+            return False
+        funcs = self._state_tracked.get(abspath)
+        return funcs is not None and func_name in funcs
 
     def _trace_callback(self, frame: FrameType, event: str, arg: Any) -> Any:
         """sys.settrace callback — dispatched for every frame event."""
@@ -169,27 +200,68 @@ def _trace_callback(self, frame: FrameType, event: str, arg: Any) -> Any:
             return None
 
         try:
-            filepath = self._abspath(frame.f_code.co_filename)
+            co_filename = frame.f_code.co_filename
+            # Fast reject: frozen/built-in modules never have a dot-py path
+            if co_filename.startswith("<"):
+                return None
+
+            filepath = self._abspath(co_filename)
 
             if event == "call":
+                is_project = self._is_project_file(filepath)
+
+                # Emit state event only for tracked graph-node functions
+                if is_project and self._is_tracked_function(
+                    filepath, frame.f_code.co_name
+                ):
+                    self._events.put(
+                        (
+                            "state",
+                            {
+                                "file": filepath,
+                                "line": frame.f_lineno,
+                                "function": frame.f_code.co_name,
+                                "locals": _capture_frame_locals(frame),
+                            },
+                        )
+                    )
+
+                # Reset per-frame hit tracking so each call starts fresh.
+                self._hit_lines[id(frame)] = set()
+
                 # Decide whether to trace *into* this function's frame.
                 if self._step_mode:
-                    return (
-                        self._trace_callback
-                        if self._is_project_file(filepath)
-                        else None
-                    )
-                return (
-                    self._trace_callback if filepath in self._file_breakpoints else None
-                )
+                    return self._trace_callback if is_project else None
+                if filepath in self._file_breakpoints:
+                    return self._trace_callback
+                # Also trace project files that contain tracked functions
+                if is_project and filepath in (self._state_tracked or {}):
+                    return self._trace_callback
+                return None
 
             if event == "line":
+                # Skip module-level lines (imports, class/function defs).
+                # These fire during module loading, not user code execution.
+                if frame.f_code.co_name == "<module>":
+                    return self._trace_callback
+
                 lineno = frame.f_lineno
                 should_break = (
                     self._step_mode and self._is_project_file(filepath)
                 ) or (lineno in self._file_breakpoints.get(filepath, ()))
 
                 if should_break:
+                    # Deduplicate: multiline expressions (e.g.
+                    # ``return Foo(arg=bar(...))``) cause the bytecode to
+                    # bounce back to the call-site line after evaluating
+                    # arguments on deeper lines. Without this guard the
+                    # same breakpoint would fire twice per call.
+ frame_hits = self._hit_lines.get(id(frame)) + if frame_hits is not None and lineno in frame_hits: + return self._trace_callback + if frame_hits is not None: + frame_hits.add(lineno) + self._events.put( ( "breakpoint", @@ -208,6 +280,10 @@ def _trace_callback(self, frame: FrameType, event: str, arg: Any) -> Any: if self._stopped: return None + elif event == "return": + # Clean up per-frame tracking when the frame exits. + self._hit_lines.pop(id(frame), None) + return self._trace_callback except Exception: @@ -222,10 +298,16 @@ def start( input: dict[str, Any] | None, options: UiPathExecuteOptions | None, ) -> None: - """Launch delegate.execute() in a traced daemon thread.""" + """Launch delegate.execute() in a traced daemon thread. + + Copies the caller's contextvars (including OTEL span context) so + that ``@traced`` decorators in user code produce spans that are + properly linked to the parent trace. + """ + ctx = contextvars.copy_context() self._thread = threading.Thread( - target=self._run, - args=(delegate, input, options), + target=ctx.run, + args=(self._run, delegate, input, options), daemon=True, ) self._thread.start() @@ -274,12 +356,15 @@ class UiPathDebugFunctionsRuntime: Follows the same composition pattern as UiPathDebugRuntime: wraps a UiPathRuntimeProtocol delegate and intercepts stream() to inject breakpoint behaviour. - When no breakpoints are active every call delegates transparently. When breakpoints **are** present the delegate's execute() runs in a background thread with sys.settrace enabled. The trace callback pauses the thread at matching lines and this runtime yields UiPathBreakpointResult events with captured local variables. + Additionally emits ``UiPathRuntimeStateEvent`` for every function call + that appears in the entrypoint's call graph, so the debug bridge can + visualise execution flow through the graph nodes. 
+ Works for both sync and async user functions — async functions run in a dedicated asyncio event loop on the background thread. @@ -290,16 +375,21 @@ class UiPathDebugFunctionsRuntime: entrypoint_path: Absolute or relative path to the user's entrypoint file. Used to resolve bare line-number breakpoints (e.g. "42"). + function_name: + Name of the entrypoint function. Used together with + *entrypoint_path* to build the call graph for state events. """ def __init__( self, delegate: UiPathRuntimeProtocol, entrypoint_path: str | None = None, + function_name: str | None = None, ) -> None: - """Initialize the debug wrapper with a delegate runtime and optional entrypoint path.""" + """Initialize the debug wrapper.""" self.delegate = delegate self._entrypoint_path = entrypoint_path + self._function_name = function_name self._controller: BreakpointController | None = None async def execute( @@ -317,6 +407,9 @@ async def stream( ) -> AsyncGenerator[UiPathRuntimeEvent, None]: """Stream execution events with line-level breakpoint support. + Emits ``UiPathRuntimeStateEvent`` for every call-graph function + entry so the debug bridge can visualise execution flow. + Breakpoint formats (via options.breakpoints): * "42" — line 42 in the entrypoint file @@ -330,21 +423,28 @@ async def stream( self._controller.update_breakpoints(breakpoints) self._controller.resume() - event_type, data = await asyncio.to_thread(self._controller.wait_for_event) - yield self._to_runtime_event(event_type, data) + async for event in self._drain_events(): + yield event return - # No breakpoints, transparent delegation - if not breakpoints: + # Build the set of tracked functions from the call graph so we + # can emit state events even without breakpoints. + tracked = self._build_tracked_functions() + + # Nothing to trace → transparent delegation. 
The controller + # path runs delegate.execute() in a background thread with a + # new asyncio event loop, so we only use it when there is + # something to observe (breakpoints and/or state tracking). + if not breakpoints and not tracked: async for event in self.delegate.stream(input, options): yield event return - # First execution with breakpoints controller = BreakpointController( project_dir=str(Path.cwd()), - breakpoints=breakpoints, + breakpoints=breakpoints if breakpoints else [], entrypoint_path=self._entrypoint_path, + state_tracked_functions=tracked, ) self._controller = controller @@ -355,8 +455,8 @@ async def stream( ) controller.start(self.delegate, input, delegate_options) - event_type, data = await asyncio.to_thread(controller.wait_for_event) - yield self._to_runtime_event(event_type, data) + async for event in self._drain_events(): + yield event async def get_schema(self) -> UiPathRuntimeSchema: """Pass-through to delegate.""" @@ -369,6 +469,55 @@ async def dispose(self) -> None: self._controller = None await self.delegate.dispose() + def _build_tracked_functions(self) -> dict[str, set[str]] | None: + """Build a mapping of abs_file → {func_names} from the call graph. + + Returns None when the graph cannot be built (missing path / name). 
+ """ + if not self._entrypoint_path or not self._function_name: + return None + + try: + from .graph_builder import build_call_graph + + graph = build_call_graph( + self._entrypoint_path, + self._function_name, + project_dir=str(Path.cwd()), + ) + + tracked: dict[str, set[str]] = {} + for node in graph.nodes: + file_rel = (node.metadata or {}).get("file") + if not file_rel: + continue + abs_path = os.path.abspath(file_rel) + tracked.setdefault(abs_path, set()).add(node.name) + + return tracked if tracked else None + except Exception: + logger.debug("Failed to build call graph for state tracking", exc_info=True) + return None + + async def _drain_events(self) -> AsyncGenerator[UiPathRuntimeEvent, None]: + """Drain events from the controller, yielding state events and stopping at a terminal event.""" + while self._controller is not None: + event_type, data = await asyncio.to_thread(self._controller.wait_for_event) + if event_type == "state": + yield self._to_state_event(data) + else: + yield self._to_runtime_event(event_type, data) + return + + @staticmethod + def _to_state_event(data: dict[str, Any]) -> UiPathRuntimeStateEvent: + """Convert a trace state event into a UiPathRuntimeStateEvent.""" + return UiPathRuntimeStateEvent( + node_name=data["function"], + qualified_node_name=_format_location(data["file"], data["line"]), + payload=data["locals"], + ) + def _to_runtime_event(self, event_type: str, data: Any) -> UiPathRuntimeEvent: """Convert a BreakpointController event into a UiPathRuntimeEvent.""" if event_type == "breakpoint": diff --git a/src/uipath/functions/factory.py b/src/uipath/functions/factory.py index 17f8cf607..12a159986 100644 --- a/src/uipath/functions/factory.py +++ b/src/uipath/functions/factory.py @@ -102,4 +102,5 @@ def _create_runtime(self, entrypoint: str) -> UiPathRuntimeProtocol: return UiPathDebugFunctionsRuntime( delegate=inner, entrypoint_path=str(full_path), + function_name=function_name, ) diff --git 
a/src/uipath/functions/graph_builder.py b/src/uipath/functions/graph_builder.py new file mode 100644 index 000000000..14dceb024 --- /dev/null +++ b/src/uipath/functions/graph_builder.py @@ -0,0 +1,335 @@ +"""AST-based call graph builder for Python function runtimes. + +Parses user code starting from an entrypoint function and builds a +UiPathRuntimeGraph of function call relationships. Only follows calls +into local project files (skips external dependencies). + +Node IDs use the "file:line" format so they can double as breakpoint +locations for the debug runtime. +""" + +from __future__ import annotations + +import ast +import logging +import os +from pathlib import Path + +from uipath.runtime.schema import ( + UiPathRuntimeEdge, + UiPathRuntimeGraph, + UiPathRuntimeNode, +) + +logger = logging.getLogger(__name__) + +DEFAULT_MAX_DEPTH = 3 + + +def build_call_graph( + file_path: str, + function_name: str, + *, + project_dir: str | None = None, + max_depth: int = DEFAULT_MAX_DEPTH, +) -> UiPathRuntimeGraph: + """Build a call graph starting from *function_name* in *file_path*. + + Parameters + ---------- + file_path: + Absolute or relative path to the Python source file containing the + entrypoint function. + function_name: + Name of the entrypoint function inside *file_path*. + project_dir: + Root directory of the project. Only files under this directory are + followed. Defaults to the parent of *file_path*. + max_depth: + Maximum recursion depth for following function calls. + + UiPathRuntimeGraph + A graph with nodes (id="relative/path.py:line") and edges + representing call relationships. 
+    """
+    abs_file = os.path.abspath(file_path)
+    if project_dir is None:
+        project_dir = str(Path(abs_file).parent)
+    project_dir = os.path.abspath(project_dir)
+
+    ctx = _BuildContext(project_dir=project_dir, max_depth=max_depth)
+    ctx.visit_function(abs_file, function_name, depth=0)
+    return UiPathRuntimeGraph(nodes=ctx.nodes, edges=ctx.edges)
+
+
+class _BuildContext:
+    """Accumulates nodes and edges while walking the call graph."""
+
+    def __init__(self, project_dir: str, max_depth: int) -> None:
+        self.project_dir = os.path.join(project_dir, "")  # keeps prefix test exact
+        self.max_depth = max_depth
+        self.nodes: list[UiPathRuntimeNode] = []
+        self.edges: list[UiPathRuntimeEdge] = []
+        self._visited: set[str] = set()  # node IDs already processed
+        self._ast_cache: dict[str, ast.Module] = {}
+
+    def _parse_file(self, abs_path: str) -> ast.Module | None:
+        """Parse a Python file, returning the cached AST or None on failure."""
+        if abs_path in self._ast_cache:
+            return self._ast_cache[abs_path]
+        try:
+            with open(abs_path, encoding="utf-8") as f:
+                tree = ast.parse(f.read(), filename=abs_path)
+            self._ast_cache[abs_path] = tree
+            return tree
+        except Exception:
+            logger.debug("Failed to parse %s", abs_path, exc_info=True)
+            return None
+
+    def _relative_path(self, abs_path: str) -> str:
+        """Return a forward-slash relative path from the project dir."""
+        try:
+            return str(Path(abs_path).relative_to(self.project_dir)).replace("\\", "/")
+        except ValueError:
+            return Path(abs_path).name
+
+    def _node_id(self, abs_path: str, line: int) -> str:
+        return f"{self._relative_path(abs_path)}:{line}"
+
+    def _is_project_file(self, abs_path: str) -> bool:
+        return abs_path.startswith(self.project_dir) and "site-packages" not in abs_path
+
+    def _find_function_def(
+        self, tree: ast.Module, name: str
+    ) -> ast.FunctionDef | ast.AsyncFunctionDef | None:
+        """Find a top-level function definition by name."""
+        for node in ast.iter_child_nodes(tree):
+            if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
+                if
node.name == name: + return node + return None + + def _resolve_imports( + self, tree: ast.Module, abs_file: str + ) -> dict[str, _ImportInfo]: + """Build a map of imported names → their source locations. + + Only resolves imports that point to local project files. + """ + result: dict[str, _ImportInfo] = {} + file_dir = os.path.dirname(abs_file) + + for node in ast.iter_child_nodes(tree): + if isinstance(node, ast.ImportFrom): + module_path = self._resolve_module_path( + node.module, node.level, file_dir + ) + if module_path is None or not self._is_project_file(module_path): + continue + for alias in node.names: + imported_name = alias.asname if alias.asname else alias.name + result[imported_name] = _ImportInfo( + abs_path=module_path, + original_name=alias.name, + ) + elif isinstance(node, ast.Import): + for alias in node.names: + module_path = self._resolve_module_path(alias.name, 0, file_dir) + if module_path is None or not self._is_project_file(module_path): + continue + local_name = alias.asname if alias.asname else alias.name + result[local_name] = _ImportInfo( + abs_path=module_path, + original_name=None, # module-level import + ) + return result + + def _resolve_module_path( + self, module: str | None, level: int, file_dir: str + ) -> str | None: + """Resolve a module name to an absolute file path, or None.""" + if module is None and level == 0: + return None + + if level > 0: + # Relative import: go up (level - 1) directories from file_dir + base = file_dir + for _ in range(level - 1): + base = os.path.dirname(base) + if module: + parts = module.split(".") + candidate = os.path.join(base, *parts) + else: + candidate = base + else: + # Absolute import: try from project dir + parts = module.split(".") # type: ignore[union-attr] + candidate = os.path.join(self.project_dir, *parts) + + # Check file.py then package/__init__.py + as_file = candidate + ".py" + if os.path.isfile(as_file): + return os.path.abspath(as_file) + + as_pkg = os.path.join(candidate, 
"__init__.py") + if os.path.isfile(as_pkg): + return os.path.abspath(as_pkg) + + return None + + def _collect_calls( + self, func_node: ast.FunctionDef | ast.AsyncFunctionDef + ) -> list[_CallSite]: + """Walk the function body and collect all function call sites.""" + calls: list[_CallSite] = [] + for node in ast.walk(func_node): + if not isinstance(node, ast.Call): + continue + info = self._extract_call_info(node) + if info is not None: + calls.append(info) + return calls + + def _extract_call_info(self, call_node: ast.Call) -> _CallSite | None: + """Extract the callable name and line from a Call AST node.""" + func = call_node.func + line = call_node.lineno + + if isinstance(func, ast.Name): + # Simple call: foo() + return _CallSite(name=func.id, attr=None, line=line) + elif isinstance(func, ast.Attribute): + # Attribute call: module.foo() or obj.method() + if isinstance(func.value, ast.Name): + return _CallSite(name=func.value.id, attr=func.attr, line=line) + return None + + @staticmethod + def _first_body_line( + func_def: ast.FunctionDef | ast.AsyncFunctionDef, + ) -> int: + """Return the line number of the first executable statement in the body. + + Skips leading docstrings so the resulting line sits *inside* the + function, not on the ``def`` line. This matters for breakpoints: + a ``def`` line is a module-level statement executed during import, + whereas the first body line only fires when the function is called. + """ + for stmt in func_def.body: + # Skip docstring (Expr wrapping a Constant string) + if ( + isinstance(stmt, ast.Expr) + and isinstance(stmt.value, ast.Constant) + and isinstance(stmt.value.value, str) + ): + continue + return stmt.lineno + # Fallback: function has only a docstring (or is empty) + return func_def.body[0].lineno if func_def.body else func_def.lineno + + def visit_function(self, abs_file: str, func_name: str, depth: int) -> str | None: + """Process a function: create its node and recurse into its calls. 
+ + Returns the node ID if the function was found, otherwise None. + """ + tree = self._parse_file(abs_file) + if tree is None: + return None + + func_def = self._find_function_def(tree, func_name) + if func_def is None: + return None + + node_id = self._node_id(abs_file, self._first_body_line(func_def)) + + # Add node even if already visited (we need the ID for edges) + if node_id in self._visited: + return node_id + + self._visited.add(node_id) + self.nodes.append( + UiPathRuntimeNode( + id=node_id, + name=func_name, + type="function", + metadata={"file": self._relative_path(abs_file)}, + ) + ) + + if depth >= self.max_depth: + return node_id + + # Resolve imports and local definitions + imports = self._resolve_imports(tree, abs_file) + local_funcs = self._collect_local_function_names(tree) + calls = self._collect_calls(func_def) + + for call in calls: + target_id = self._resolve_and_visit_call( + call, abs_file, tree, imports, local_funcs, depth + ) + if target_id is not None: + self.edges.append(UiPathRuntimeEdge(source=node_id, target=target_id)) + + return node_id + + def _collect_local_function_names(self, tree: ast.Module) -> set[str]: + """Collect names of all top-level functions defined in a module.""" + names: set[str] = set() + for node in ast.iter_child_nodes(tree): + if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)): + names.add(node.name) + return names + + def _resolve_and_visit_call( + self, + call: _CallSite, + caller_file: str, + caller_tree: ast.Module, + imports: dict[str, _ImportInfo], + local_funcs: set[str], + depth: int, + ) -> str | None: + """Resolve a call site to a target function and visit it. + + Returns the target node ID, or None if unresolvable / external. 
+ """ + if call.attr is None: + # Simple call: foo() + if call.name in imports: + imp = imports[call.name] + if imp.original_name is not None: + return self.visit_function( + imp.abs_path, imp.original_name, depth + 1 + ) + if call.name in local_funcs: + return self.visit_function(caller_file, call.name, depth + 1) + else: + # Attribute call: module.foo() + if call.name in imports: + imp = imports[call.name] + if imp.original_name is None: + # Module-level import: import module → module.func() + return self.visit_function(imp.abs_path, call.attr, depth + 1) + return None + + +class _ImportInfo: + """Tracks where an imported name comes from.""" + + __slots__ = ("abs_path", "original_name") + + def __init__(self, abs_path: str, original_name: str | None) -> None: + self.abs_path = abs_path + self.original_name = original_name + + +class _CallSite: + """A function call found in the AST.""" + + __slots__ = ("name", "attr", "line") + + def __init__(self, name: str, attr: str | None, line: int) -> None: + self.name = name + self.attr = attr + self.line = line diff --git a/src/uipath/functions/runtime.py b/src/uipath/functions/runtime.py index 2bd91b12c..1d24ddaa9 100644 --- a/src/uipath/functions/runtime.py +++ b/src/uipath/functions/runtime.py @@ -24,6 +24,7 @@ ) from uipath.runtime.schema import UiPathRuntimeSchema, transform_attachments +from .graph_builder import build_call_graph from .schema_gen import get_type_schema from .type_conversion import ( convert_from_class, @@ -180,12 +181,30 @@ async def get_schema(self) -> UiPathRuntimeSchema: # Determine output schema raw_output_schema = get_type_schema(hints.get("return")) output_schema = transform_attachments(raw_output_schema) + + # Build call graph from AST + graph = None + try: + graph = build_call_graph( + str(self.file_path), + self.function_name, + project_dir=str(self.file_path.parent), + ) + except Exception: + logger.debug( + "Failed to build call graph for %s:%s", + self.file_path, + self.function_name, + 
exc_info=True, + ) + return UiPathRuntimeSchema( filePath=self.entrypoint_name, uniqueId=str(uuid.uuid4()), type="agent", input=input_schema, output=output_schema, + graph=graph, ) async def dispose(self) -> None: diff --git a/tests/functions/test_debug_breakpoints.py b/tests/functions/test_debug_breakpoints.py index 0806a5431..1966806fb 100644 --- a/tests/functions/test_debug_breakpoints.py +++ b/tests/functions/test_debug_breakpoints.py @@ -14,6 +14,7 @@ from __future__ import annotations import asyncio +import os from pathlib import Path from typing import Any, Literal @@ -103,7 +104,9 @@ def _build_stack( ) -> tuple[UiPathDebugRuntime, MockDebugBridge]: """Build the full debug stack and return (runtime, bridge).""" inner = UiPathFunctionsRuntime(str(script), func_name, script.name) - debug_fn = UiPathDebugFunctionsRuntime(inner, entrypoint_path=str(script)) + debug_fn = UiPathDebugFunctionsRuntime( + inner, entrypoint_path=str(script), function_name=func_name + ) bridge = MockDebugBridge(breakpoints=breakpoints) runtime = UiPathDebugRuntime(delegate=debug_fn, debug_bridge=bridge) return runtime, bridge @@ -113,6 +116,7 @@ def _build_stack( def script_dir(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) -> Path: """Temp directory used as cwd so BreakpointController treats scripts as project files.""" monkeypatch.chdir(tmp_path) + monkeypatch.syspath_prepend(tmp_path) return tmp_path @@ -457,3 +461,274 @@ def test_is_project_file(self, script_dir: Path): assert ctrl._is_project_file(str(script_dir / "main.py")) assert not ctrl._is_project_file(str(script_dir / "site-packages" / "lib.py")) assert not ctrl._is_project_file("/some/other/path/foo.py") + + def test_is_project_file_rejects_frozen_modules(self, script_dir: Path): + """Frozen/built-in module paths must not pass the project-file check.""" + ctrl = BreakpointController( + project_dir=str(script_dir), + breakpoints=[], + ) + # os.path.abspath("") resolves under cwd + frozen_resolved = os.path.abspath("") + 
assert not ctrl._is_project_file(frozen_resolved) + + +class TestStateEvents: + """State events should fire only for call-graph functions.""" + + async def test_state_events_emitted_for_graph_functions(self, script_dir: Path): + """State events fire for the entrypoint and functions it calls.""" + _write_script( + script_dir, + "helpers.py", + "def helper(n):\n return n * 2\n", + ) + script = _write_script( + script_dir, + "main.py", + "from helpers import helper\n" + "\n" + "def main(input):\n" + ' val = input.get("n", 5)\n' + " result = helper(val)\n" # line 5 + ' return {"result": result}\n', + ) + # Use step mode so the controller path is active and state events fire + runtime, bridge = _build_stack(script, breakpoints="*") + + try: + result = await runtime.execute({"n": 3}) + + assert result.status == UiPathRuntimeStatus.SUCCESSFUL + assert result.output == {"result": 6} + + # State events should include main and helper + state_names = [s.node_name for s in bridge.state_updates] + assert "main" in state_names + assert "helper" in state_names + finally: + await runtime.dispose() + + async def test_state_events_not_emitted_for_external_functions( + self, script_dir: Path + ): + """Functions from external modules (json, os, etc.) 
should NOT produce state events.""" + script = _write_script( + script_dir, + "main.py", + "import json\n" + "\n" + "def main(input):\n" + ' data = json.dumps({"hello": "world"})\n' # line 4 + ' return {"data": data}\n', + ) + # Step mode to activate the controller (and state events) + runtime, bridge = _build_stack(script, breakpoints="*") + + try: + result = await runtime.execute({}) + + assert result.status == UiPathRuntimeStatus.SUCCESSFUL + + state_names = [s.node_name for s in bridge.state_updates] + # Only main — json.dumps is external + assert state_names == ["main"] + finally: + await runtime.dispose() + + async def test_state_events_carry_locals(self, script_dir: Path): + """State event payload should contain the function's arguments.""" + script = _write_script( + script_dir, + "main.py", + "def helper(x, y):\n" + " return x + y\n" + "\n" + "def main(input):\n" + " return helper(1, 2)\n", # line 5 + ) + # Use a breakpoint to activate the controller path + runtime, bridge = _build_stack(script, breakpoints=["5"]) + + try: + await runtime.execute({}) + + helper_states = [s for s in bridge.state_updates if s.node_name == "helper"] + assert len(helper_states) == 1 + # At call time, x and y should be in the payload + assert helper_states[0].payload["x"] == 1 + assert helper_states[0].payload["y"] == 2 + finally: + await runtime.dispose() + + async def test_state_events_with_breakpoints(self, script_dir: Path): + """State events and breakpoints work together.""" + script = _write_script( + script_dir, + "main.py", + "def helper():\n" + " return 42\n" + "\n" + "def main(input):\n" + " x = helper()\n" # line 5 + ' return {"x": x}\n', + ) + runtime, bridge = _build_stack(script, breakpoints=["5"]) + + try: + result = await runtime.execute({}) + + assert result.status == UiPathRuntimeStatus.SUCCESSFUL + assert result.output == {"x": 42} + + # Should have both state updates and breakpoint hits + assert len(bridge.breakpoint_hits) == 1 + state_names = [s.node_name 
for s in bridge.state_updates] + assert "main" in state_names + finally: + await runtime.dispose() + + async def test_no_state_events_without_function_name(self, script_dir: Path): + """When function_name is not provided, no state events fire (even with breakpoints).""" + script = _write_script( + script_dir, + "main.py", + 'def main(input):\n x = 1\n return {"ok": True}\n', + ) + inner = UiPathFunctionsRuntime(str(script), "main", script.name) + # No function_name → no graph → no state events + debug_fn = UiPathDebugFunctionsRuntime(inner, entrypoint_path=str(script)) + bridge = MockDebugBridge(breakpoints=["2"]) + runtime = UiPathDebugRuntime(delegate=debug_fn, debug_bridge=bridge) + + try: + result = await runtime.execute({}) + + assert result.status == UiPathRuntimeStatus.SUCCESSFUL + assert len(bridge.breakpoint_hits) == 1 + assert len(bridge.state_updates) == 0 + finally: + await runtime.dispose() + + async def test_state_events_emitted_without_breakpoints(self, script_dir: Path): + """State events fire even without breakpoints when the call graph exists.""" + script = _write_script( + script_dir, + "main.py", + "def helper():\n" + " return 42\n" + "\n" + "def main(input):\n" + " x = helper()\n" + ' return {"x": x}\n', + ) + runtime, bridge = _build_stack(script, breakpoints=[]) + + try: + result = await runtime.execute({}) + + assert result.status == UiPathRuntimeStatus.SUCCESSFUL + assert result.output == {"x": 42} + + # State events fire even with no breakpoints + state_names = [s.node_name for s in bridge.state_updates] + assert "main" in state_names + assert "helper" in state_names + # No breakpoints should have been hit + assert len(bridge.breakpoint_hits) == 0 + finally: + await runtime.dispose() + + async def test_multiline_expression_breakpoint_hits_once(self, script_dir: Path): + """Breakpoint on a multiline call expression should fire exactly once. 
+ + Python's bytecode bounces back to the call-site line after + evaluating nested arguments on deeper lines, e.g.:: + + return Wrapper( # line 5 — LOAD_GLOBAL + CALL + result=choice( # line 6 + [1, 2, 3] # line 7 + ) # ← CALL choice → back to line 6 + ) # ← CALL Wrapper → back to line 5 + + Without deduplication the breakpoint on line 5 fires twice. + """ + script = _write_script( + script_dir, + "main.py", + "import random\n" # 1 + "\n" # 2 + "async def get_random():\n" # 3 + ' """Get a random value."""\n' # 4 + " return dict(\n" # 5 + " value=random.choice(\n" # 6 + " [1, 2, 3]\n" # 7 + " )\n" # 8 + " )\n" # 9 + "\n" # 10 + "async def main(input):\n" # 11 + " result = await get_random()\n" # 12 + ' return {"result": result}\n', # 13 + ) + # Breakpoint on line 5 — the first body line (multiline return) + runtime, bridge = _build_stack(script, breakpoints=["main.py:5"]) + + try: + result = await runtime.execute({}) + + assert result.status == UiPathRuntimeStatus.SUCCESSFUL + + # The breakpoint should fire exactly ONCE despite bytecode bouncing + bp_nodes = [h.breakpoint_node for h in bridge.breakpoint_hits] + assert len(bridge.breakpoint_hits) == 1, ( + f"Expected 1 breakpoint hit but got {len(bridge.breakpoint_hits)}: {bp_nodes}" + ) + finally: + await runtime.dispose() + + async def test_state_events_through_decorator_wrappers(self, script_dir: Path): + """State events fire for functions wrapped with functools.wraps decorators. + + Simulates @traced-style decorators where the wrapper (sync_wrapper/ + async_wrapper) lives in an external module. The trace callback should + still fire for the ORIGINAL function called from within the wrapper. 
+ """ + script = _write_script( + script_dir, + "main.py", + "from functools import wraps\n" + "\n" + "def my_decorator(func):\n" + " @wraps(func)\n" + " def wrapper(*args, **kwargs):\n" + " return func(*args, **kwargs)\n" + " return wrapper\n" + "\n" + "@my_decorator\n" + "def track_operator(op):\n" + " pass\n" + "\n" + "@my_decorator\n" + "def apply_operator(op, a, b):\n" + " return a + b\n" + "\n" + "def main(input):\n" + ' op = input.get("op", "+")\n' + " track_operator(op)\n" + ' result = apply_operator(op, input.get("a", 1), input.get("b", 2))\n' + ' return {"result": result}\n', + ) + runtime, bridge = _build_stack(script, breakpoints="*") + + try: + result = await runtime.execute({"a": 3, "b": 4}) + + assert result.status == UiPathRuntimeStatus.SUCCESSFUL + assert result.output == {"result": 7} + + state_names = [s.node_name for s in bridge.state_updates] + assert "main" in state_names + assert "track_operator" in state_names + assert "apply_operator" in state_names + finally: + await runtime.dispose() diff --git a/tests/functions/test_graph_builder.py b/tests/functions/test_graph_builder.py new file mode 100644 index 000000000..00878598d --- /dev/null +++ b/tests/functions/test_graph_builder.py @@ -0,0 +1,290 @@ +"""Tests for the AST-based call graph builder.""" + +import textwrap + +import pytest + +from uipath.functions.graph_builder import build_call_graph + + +@pytest.fixture +def project_dir(tmp_path): + """Create a small multi-file project for testing.""" + # main.py — the entrypoint + (tmp_path / "main.py").write_text( + textwrap.dedent("""\ + from helpers import process_data + from utils import format_output + + def main(input): + result = process_data(input) + return format_output(result) + """) + ) + + # helpers.py — calls into a deeper utility + (tmp_path / "helpers.py").write_text( + textwrap.dedent("""\ + from deep import transform + + def process_data(data): + return transform(data) + + def unused_function(): + pass + """) + ) + + # utils.py 
— leaf function + (tmp_path / "utils.py").write_text( + textwrap.dedent("""\ + def format_output(data): + return str(data) + """) + ) + + # deep.py — deeper than default depth + (tmp_path / "deep.py").write_text( + textwrap.dedent("""\ + def transform(data): + return data + """) + ) + + return tmp_path + + +def test_basic_graph_structure(project_dir): + """Entrypoint node and its direct callees are discovered.""" + graph = build_call_graph( + str(project_dir / "main.py"), + "main", + project_dir=str(project_dir), + max_depth=3, + ) + + node_names = {n.name for n in graph.nodes} + + # Should contain main, process_data, format_output, transform + assert "main" in node_names + assert "process_data" in node_names + assert "format_output" in node_names + assert "transform" in node_names + + # unused_function should NOT appear + assert "unused_function" not in node_names + + +def test_node_ids_are_file_line(project_dir): + """Node IDs must follow the file:line format for breakpoints.""" + graph = build_call_graph( + str(project_dir / "main.py"), + "main", + project_dir=str(project_dir), + ) + + for node in graph.nodes: + parts = node.id.rsplit(":", 1) + assert len(parts) == 2, f"Node ID '{node.id}' is not in file:line format" + assert parts[1].isdigit(), f"Line part of '{node.id}' is not a number" + + +def test_edges_connect_caller_to_callee(project_dir): + """Edges should go from caller to callee.""" + graph = build_call_graph( + str(project_dir / "main.py"), + "main", + project_dir=str(project_dir), + ) + + id_to_name = {n.id: n.name for n in graph.nodes} + edge_pairs = {(id_to_name[e.source], id_to_name[e.target]) for e in graph.edges} + + assert ("main", "process_data") in edge_pairs + assert ("main", "format_output") in edge_pairs + assert ("process_data", "transform") in edge_pairs + + +def test_max_depth_limits_recursion(project_dir): + """Setting max_depth=1 should only include the entrypoint and its direct calls.""" + graph = build_call_graph( + str(project_dir 
/ "main.py"), + "main", + project_dir=str(project_dir), + max_depth=1, + ) + + node_names = {n.name for n in graph.nodes} + + assert "main" in node_names + assert "process_data" in node_names + assert "format_output" in node_names + # transform is 2 levels deep, should be excluded + assert "transform" not in node_names + + +def test_no_duplicates_on_repeated_calls(tmp_path): + """A function called multiple times should appear as one node.""" + (tmp_path / "main.py").write_text( + textwrap.dedent("""\ + def helper(): + pass + + def main(): + helper() + helper() + helper() + """) + ) + + graph = build_call_graph( + str(tmp_path / "main.py"), + "main", + project_dir=str(tmp_path), + ) + + names = [n.name for n in graph.nodes] + assert names.count("helper") == 1 + # But there can be multiple edges (one per call site) + + +def test_local_function_calls(tmp_path): + """Functions defined in the same file are resolved.""" + (tmp_path / "main.py").write_text( + textwrap.dedent("""\ + def step_a(): + pass + + def step_b(): + step_a() + + def main(): + step_b() + """) + ) + + graph = build_call_graph( + str(tmp_path / "main.py"), + "main", + project_dir=str(tmp_path), + ) + + node_names = {n.name for n in graph.nodes} + assert node_names == {"main", "step_b", "step_a"} + + +def test_async_functions(tmp_path): + """Async function definitions and await calls are handled.""" + (tmp_path / "main.py").write_text( + textwrap.dedent("""\ + async def helper(): + pass + + async def main(): + await helper() + """) + ) + + graph = build_call_graph( + str(tmp_path / "main.py"), + "main", + project_dir=str(tmp_path), + ) + + node_names = {n.name for n in graph.nodes} + assert node_names == {"main", "helper"} + + +def test_external_calls_ignored(tmp_path): + """Calls to external/unknown functions produce no nodes.""" + (tmp_path / "main.py").write_text( + textwrap.dedent("""\ + import json + + def main(data): + return json.dumps(data) + """) + ) + + graph = build_call_graph( + str(tmp_path 
/ "main.py"), + "main", + project_dir=str(tmp_path), + ) + + assert len(graph.nodes) == 1 + assert graph.nodes[0].name == "main" + assert len(graph.edges) == 0 + + +def test_missing_function_returns_empty_graph(tmp_path): + """If the entrypoint function doesn't exist, return an empty graph.""" + (tmp_path / "main.py").write_text("def other(): pass\n") + + graph = build_call_graph( + str(tmp_path / "main.py"), + "main", + project_dir=str(tmp_path), + ) + + assert len(graph.nodes) == 0 + assert len(graph.edges) == 0 + + +def test_relative_import(tmp_path): + """Relative imports (from .module import func) are resolved.""" + pkg = tmp_path / "pkg" + pkg.mkdir() + (pkg / "__init__.py").write_text("") + + (pkg / "main.py").write_text( + textwrap.dedent("""\ + from .helpers import do_work + + def main(): + do_work() + """) + ) + (pkg / "helpers.py").write_text( + textwrap.dedent("""\ + def do_work(): + pass + """) + ) + + graph = build_call_graph( + str(pkg / "main.py"), + "main", + project_dir=str(tmp_path), + ) + + node_names = {n.name for n in graph.nodes} + assert "main" in node_names + assert "do_work" in node_names + + +def test_module_attribute_call(tmp_path): + """import module; module.func() pattern is resolved.""" + (tmp_path / "mymod.py").write_text( + textwrap.dedent("""\ + def compute(): + pass + """) + ) + (tmp_path / "main.py").write_text( + textwrap.dedent("""\ + import mymod + + def main(): + mymod.compute() + """) + ) + + graph = build_call_graph( + str(tmp_path / "main.py"), + "main", + project_dir=str(tmp_path), + ) + + node_names = {n.name for n in graph.nodes} + assert "compute" in node_names diff --git a/uv.lock b/uv.lock index 4eaac2caa..c8e09215d 100644 --- a/uv.lock +++ b/uv.lock @@ -2531,7 +2531,7 @@ wheels = [ [[package]] name = "uipath" -version = "2.8.28" +version = "2.8.29" source = { editable = "." } dependencies = [ { name = "applicationinsights" },