diff --git a/src/praisonai/praisonai/agents_generator.py b/src/praisonai/praisonai/agents_generator.py index 4d086f77b..f11c27bef 100644 --- a/src/praisonai/praisonai/agents_generator.py +++ b/src/praisonai/praisonai/agents_generator.py @@ -26,15 +26,7 @@ # Compatibility imports - now handled by centralized detection # (inbuilt_tools still defines these but they're read-only compatibility) -# Import BaseTool for tools handling -BaseTool = None -try: - from praisonai_tools import BaseTool -except ImportError: - try: - from praisonai.tools import BaseTool - except ImportError: - pass +# BaseTool import is now handled centrally by ToolResolver # Check for additional framework availability using centralized detection from ._framework_availability import is_available @@ -346,9 +338,31 @@ def _merge_cli_config(self, config, cli_config): self.logger.debug(f"CLI override: lsp = {cli_config['lsp']}") # Handle agent-level overrides using unified approach - agent_level_fields = ['tool_timeout', 'planning_tools', 'autonomy'] + agent_level_fields = ['tool_timeout', 'planning_tools', 'autonomy', 'planning', 'web', 'web_fetch'] agent_overrides = {k: v for k, v in cli_config.items() if k in agent_level_fields} + # Handle handoff configuration - convert CLI flags into handoff dict + handoff_fields = ['handoff', 'handoff_policy', 'handoff_timeout', 'handoff_max_depth', 'handoff_max_concurrent', 'handoff_detect_cycles'] + if any(field in cli_config for field in handoff_fields): + handoff_config = {} + if 'handoff' in cli_config: + # Convert comma-separated roles to list + handoff_roles = [role.strip() for role in cli_config['handoff'].split(',') if role.strip()] + handoff_config['to'] = handoff_roles + if 'handoff_policy' in cli_config: + handoff_config['policy'] = cli_config['handoff_policy'] + if 'handoff_timeout' in cli_config: + handoff_config['timeout'] = cli_config['handoff_timeout'] + if 'handoff_max_depth' in cli_config: + handoff_config['max_depth'] = cli_config['handoff_max_depth'] + if 'handoff_max_concurrent' in cli_config: + handoff_config['max_concurrent'] = cli_config['handoff_max_concurrent'] + if 'handoff_detect_cycles' in cli_config: + handoff_config['detect_cycles'] = cli_config['handoff_detect_cycles'].lower() == 'true' + + if handoff_config: + agent_overrides['handoff'] = handoff_config + # Handle approval configuration using unified spec approval_fields = ['trust', 'approval', 'approve_all_tools', 'approval_timeout', 'approve_level'] if any(field in cli_config for field in approval_fields): @@ -399,7 +413,7 @@ def _validate_agents_config(self, config): 'max_execution_time', 'verbose', 'cache', 'system_template', 'prompt_template', 'response_template', 'tool_timeout', 'planning_tools', 'planning', 'autonomy', 'guardrails', 'streaming', 'stream', - 'approval', 'skills', 'cli_backend', 'reflection' + 'approval', 'skills', 'cli_backend', 'reflection', 'handoff', 'web', 'web_fetch' } for section_name in ('agents', 'roles'): @@ -456,24 +470,6 @@ def load_tools_from_module(self, module_path): return {} return {name: obj for name, obj in inspect.getmembers(module, self.is_function_or_decorated)} - def _extract_tool_classes(self, module): - """ - Extract tool classes from a loaded module that inherit from BaseTool - or are part of langchain_community.tools package. - """ - result = {} - for name, obj in inspect.getmembers(module, - lambda x: inspect.isclass(x) and ( - x.__module__.startswith('langchain_community.tools') or - (PRAISONAI_TOOLS_AVAILABLE and BaseTool and issubclass(x, BaseTool)) - ) and x is not BaseTool): - try: - result[name] = obj() - except Exception as e: - self.logger.warning(f"Error instantiating tool class {name}: {e}") - continue - return result - def load_tools_from_module_class(self, module_path): """ Load BaseTool / langchain tool classes from a user-supplied module (gated by PRAISONAI_ALLOW_LOCAL_TOOLS). @@ -482,7 +478,7 @@ def load_tools_from_module_class(self, module_path): module = load_user_module(module_path, name="tools_module") if module is None: return {} - return self._extract_tool_classes(module) + return self.tool_resolver._extract_tool_classes(module) def load_tools_from_package(self, package_path): """ @@ -612,12 +608,17 @@ def generate_crew_and_kickoff(self): except Exception as e: self.logger.warning(f"Error collecting YAML tool references: {e}") - # Add tools from class names + # Add tools from class names - use tool_resolver to check tool validity for tool_class in self.tools: - if isinstance(tool_class, type) and BaseTool and issubclass(tool_class, BaseTool): - tool_name = tool_class.__name__ - tools_dict[tool_name] = tool_class() - self.logger.debug(f"Added tool: {tool_name}") + if isinstance(tool_class, type): + try: + # Try to instantiate the tool to validate it + tool_instance = tool_class() + tool_name = tool_class.__name__ + tools_dict[tool_name] = tool_instance + self.logger.debug(f"Added tool: {tool_name}") + except Exception as e: + self.logger.warning(f"Failed to instantiate tool class {tool_class.__name__}: {e}") root_directory = os.getcwd() tools_py_path = os.path.join(root_directory, 'tools.py') @@ -628,13 +629,7 @@ def generate_crew_and_kickoff(self): if os.path.isfile(tools_py_path): self.logger.debug("tools.py exists in the root directory. Loading tools.py and skipping tools folder.") elif tools_dir_path.is_dir(): - from ._safe_loader import load_user_module - for py_file in tools_dir_path.glob("*.py"): - if py_file.name.startswith("__"): - continue - module = load_user_module(py_file, name=f"tools_{py_file.stem}") - if module is not None: - tools_dict.update(self._extract_tool_classes(module)) + tools_dict.update(self.tool_resolver.get_local_tool_classes_from_dir(tools_dir_path)) if tools_dict: self.logger.debug("tools folder exists in the root directory") diff --git a/src/praisonai/praisonai/cli/main.py b/src/praisonai/praisonai/cli/main.py index 596dc1df1..1d235db65 100644 --- a/src/praisonai/praisonai/cli/main.py +++ b/src/praisonai/praisonai/cli/main.py @@ -4117,6 +4117,16 @@ def _extract_cli_config_for_yaml(self): planning_tools = getattr(self.args, 'planning_tools', None) if planning_tools: cli_config['planning_tools'] = planning_tools + + # Extract --planning flag + if getattr(self.args, 'planning', False): + cli_config['planning'] = True + + # Extract web flags + if getattr(self.args, 'web', False): + cli_config['web'] = True + if getattr(self.args, 'web_fetch', False): + cli_config['web_fetch'] = True # Extract --acp flag if getattr(self.args, 'acp', False): @@ -4154,6 +4164,31 @@ def _extract_cli_config_for_yaml(self): cli_config['stream'] = stream or stream_metrics if stream_metrics: cli_config['stream_metrics'] = True + + # Extract handoff configuration for YAML CLI parity + handoff = getattr(self.args, 'handoff', None) + if handoff: + cli_config['handoff'] = handoff + + handoff_policy = getattr(self.args, 'handoff_policy', None) + if handoff_policy is not None: + cli_config['handoff_policy'] = handoff_policy + + handoff_timeout = getattr(self.args, 'handoff_timeout', None) + if handoff_timeout is not None: + cli_config['handoff_timeout'] = handoff_timeout + + handoff_max_depth = getattr(self.args, 'handoff_max_depth', None) + if handoff_max_depth is not None: + cli_config['handoff_max_depth'] = handoff_max_depth + + handoff_max_concurrent = getattr(self.args, 'handoff_max_concurrent', None) + if handoff_max_concurrent is not None: + cli_config['handoff_max_concurrent'] = handoff_max_concurrent + + handoff_detect_cycles = getattr(self.args, 'handoff_detect_cycles', None) + if handoff_detect_cycles is not None: + cli_config['handoff_detect_cycles'] = handoff_detect_cycles return cli_config diff --git a/src/praisonai/praisonai/scheduler/agent_scheduler.py b/src/praisonai/praisonai/scheduler/agent_scheduler.py index 0abb41c6f..ade4882bc 100644 --- a/src/praisonai/praisonai/scheduler/agent_scheduler.py +++ b/src/praisonai/praisonai/scheduler/agent_scheduler.py @@ -9,6 +9,7 @@ import logging from datetime import datetime from typing import Optional, Dict, Any, Callable +from concurrent.futures import ThreadPoolExecutor, TimeoutError as FuturesTimeout from .base import ScheduleParser, PraisonAgentExecutor from .shared import backoff_delay @@ -73,6 +74,7 @@ def __init__( self._failure_count = 0 self._total_cost = 0.0 self._start_time = None + self._stats_lock = threading.Lock() def start( self, @@ -160,17 +162,18 @@ def get_stats(self) -> Dict[str, Any]: Returns: Dictionary with execution stats including cost """ - runtime = (datetime.now() - self._start_time).total_seconds() if self._start_time else 0 - return { - "is_running": self.is_running, - "total_executions": self._execution_count, - "successful_executions": self._success_count, - "failed_executions": self._failure_count, - "success_rate": (self._success_count / self._execution_count * 100) if self._execution_count > 0 else 0, - "total_cost_usd": round(self._total_cost, 4), - "runtime_seconds": round(runtime, 1), - "cost_per_execution": round(self._total_cost / self._execution_count, 4) if self._execution_count > 0 else 0 - } + with self._stats_lock: + runtime = (datetime.now() - self._start_time).total_seconds() if self._start_time else 0 + return { + "is_running": self.is_running, + "total_executions": self._execution_count, + "successful_executions": self._success_count, + "failed_executions": self._failure_count, + "success_rate": (self._success_count / self._execution_count * 100) if self._execution_count > 0 else 0, + "total_cost_usd": round(self._total_cost, 4), + "runtime_seconds": round(runtime, 1), + "cost_per_execution": round(self._total_cost / self._execution_count, 4) if self._execution_count > 0 else 0 + } def _update_state_if_daemon(self): """Update state file with execution stats if running as daemon.""" @@ -232,7 +235,8 @@ def _run_schedule(self, interval: int, max_retries: int): def _execute_with_retry(self, max_retries: int): """Execute agent with retry logic and timeout.""" - self._execution_count += 1 + with self._stats_lock: + self._execution_count += 1 success = False result = None @@ -242,21 +246,16 @@ def _execute_with_retry(self, max_retries: int): # Execute with timeout if specified if self.timeout: - import signal - - def timeout_handler(signum, frame): - raise TimeoutError(f"Execution exceeded {self.timeout}s timeout") - - # Set timeout alarm (Unix only) + executor = ThreadPoolExecutor(max_workers=1) + future = executor.submit(self._executor.execute, self.task) try: - signal.signal(signal.SIGALRM, timeout_handler) - signal.alarm(self.timeout) - result = self._executor.execute(self.task) - signal.alarm(0) # Cancel alarm - except AttributeError: - # Windows doesn't support SIGALRM, use threading.Timer fallback - logger.warning("Timeout not supported on this platform, executing without timeout") - result = self._executor.execute(self.task) + result = future.result(timeout=self.timeout) + except FuturesTimeout as e: + future.cancel() + executor.shutdown(wait=False, cancel_futures=True) + raise TimeoutError(f"Execution exceeded {self.timeout}s timeout") from e + else: + executor.shutdown(wait=False, cancel_futures=True) else: result = self._executor.execute(self.task) @@ -268,10 +267,10 @@ def timeout_handler(signum, frame): # Estimate cost (rough: ~$0.0001 per execution for gpt-4o-mini) estimated_cost = 0.0001 # Base cost estimate - self._total_cost += estimated_cost + with self._stats_lock: + self._total_cost += estimated_cost + self._success_count += 1 logger.debug(f"Estimated cost this run: ${estimated_cost:.4f}, Total: ${self._total_cost:.4f}") - - self._success_count += 1 success = True if self.on_success: @@ -299,7 +298,8 @@ def timeout_handler(signum, frame): return if not success: - self._failure_count += 1 + with self._stats_lock: + self._failure_count += 1 logger.error(f"Agent execution failed after {max_retries} attempts") if self.on_failure: diff --git a/src/praisonai/praisonai/tool_resolver.py b/src/praisonai/praisonai/tool_resolver.py index 51239b03b..75140148b 100644 --- a/src/praisonai/praisonai/tool_resolver.py +++ b/src/praisonai/praisonai/tool_resolver.py @@ -454,37 +454,66 @@ def get_local_tool_classes(self) -> Dict[str, Any]: module = load_user_module(self._tools_py_path, name="tools_module") if module is None: return {} + return self._extract_tool_classes(module) + except Exception as e: + logger.warning(f"Error loading tool classes from {self._tools_py_path}: {e}") + return {} + + def get_local_tool_classes_from_dir(self, tools_dir: "os.PathLike|str") -> Dict[str, Any]: + """Load BaseTool/langchain classes from every *.py in a tools/ directory. + + Args: + tools_dir: Path to the tools directory - # Import the necessary classes (matching agents_generator.py logic) - BaseTool = None - PRAISONAI_TOOLS_AVAILABLE = False + Returns: + Dictionary mapping class names to instantiated tool objects + """ + from pathlib import Path + from ._safe_loader import load_user_module + + classes: Dict[str, Any] = {} + for py_file in Path(tools_dir).glob("*.py"): + if py_file.name.startswith("__"): + continue try: - from praisonai_tools import BaseTool + module = load_user_module(py_file, name=f"tools_{py_file.stem}") + if module is not None: + classes.update(self._extract_tool_classes(module)) + except Exception as e: + logger.warning(f"Error loading tool classes from file {py_file}: {e}") + return classes + + def _extract_tool_classes(self, module): + """Extract tool classes from a loaded module that inherit from BaseTool + or are part of langchain_community.tools package. + """ + # Import the necessary classes (matching agents_generator.py logic) + BaseTool = None + PRAISONAI_TOOLS_AVAILABLE = False + try: + from praisonai_tools import BaseTool + PRAISONAI_TOOLS_AVAILABLE = True + except ImportError: + try: + from praisonai.tools import BaseTool PRAISONAI_TOOLS_AVAILABLE = True except ImportError: - try: - from praisonai.tools import BaseTool - PRAISONAI_TOOLS_AVAILABLE = True - except ImportError: - pass - - result = {} - for name, obj in inspect.getmembers(module, - lambda x: inspect.isclass(x) and ( - x.__module__.startswith('langchain_community.tools') or - (PRAISONAI_TOOLS_AVAILABLE and BaseTool and issubclass(x, BaseTool)) - ) and x is not BaseTool): - try: - result[name] = obj() - logger.debug(f"Loaded local tool class: {name}") - except Exception as e: - logger.warning(f"Error instantiating tool class {name}: {e}") - continue - - return result - except Exception as e: - logger.warning(f"Error loading tool classes from {self._tools_py_path}: {e}") - return {} + pass + + result = {} + for name, obj in inspect.getmembers(module, + lambda x: inspect.isclass(x) and ( + x.__module__.startswith('langchain_community.tools') or + (PRAISONAI_TOOLS_AVAILABLE and BaseTool and issubclass(x, BaseTool)) + ) and x is not BaseTool): + try: + result[name] = obj() + logger.debug(f"Loaded tool class: {name}") + except Exception as e: + logger.warning(f"Error instantiating tool class {name}: {e}") + continue + + return result # Process-level lazy singleton for performance (matches profiler.py pattern) diff --git a/src/praisonai/tests/unit/scheduler/test_agent_scheduler.py b/src/praisonai/tests/unit/scheduler/test_agent_scheduler.py index 3d3d5bf80..6aac019eb 100644 --- a/src/praisonai/tests/unit/scheduler/test_agent_scheduler.py +++ b/src/praisonai/tests/unit/scheduler/test_agent_scheduler.py @@ -240,6 +240,29 @@ def test_execute_with_retry_stops_immediately_on_stop_event(self): # Should stop after first failure + backoff signal, not complete all retries assert mock_agent.start.call_count == 1 + def test_execute_with_retry_timeout_returns_quickly(self): + """Test timeout handling does not block until worker completion.""" + mock_agent = Mock() + + def slow_start(_task): + delay_event = threading.Event() + delay_event.wait(timeout=1.2) + return "late" + + mock_agent.start = Mock(side_effect=slow_start) + scheduler = AgentScheduler(mock_agent, "Test task", timeout=0.1) + scheduler._stop_event.wait = Mock(return_value=False) + + start = time.time() + scheduler._execute_with_retry(max_retries=1) + duration = time.time() - start + timing_buffer_seconds = 1.0 + max_expected_duration = scheduler.timeout + timing_buffer_seconds + + assert scheduler._failure_count == 1 + assert scheduler._success_count == 0 + assert duration < max_expected_duration + class TestAgentSchedulerCallbacks: """Test AgentScheduler callback functionality.""" diff --git a/src/praisonai/tests/unit/test_cli_features.py b/src/praisonai/tests/unit/test_cli_features.py index 7278ce548..3d0621cd0 100644 --- a/src/praisonai/tests/unit/test_cli_features.py +++ b/src/praisonai/tests/unit/test_cli_features.py @@ -459,6 +459,47 @@ def test_image_argument_parsed(self): with patch.object(sys, 'argv', ['praisonai', '--image', '/path/to/image.png']): PraisonAI() # Just verify no exception is raised + def test_extract_cli_config_includes_new_yaml_parity_flags(self): + """Test YAML CLI parity extraction includes planning/web/handoff flags.""" + from types import SimpleNamespace + from praisonai.cli import PraisonAI + + app = PraisonAI() + app.args = SimpleNamespace( + trust=False, + tool_timeout=None, + planning_tools=None, + planning=True, + web=True, + web_fetch=True, + acp=False, + lsp=False, + autonomy=None, + guardrail=None, + approval=None, + approve_all_tools=None, + approval_timeout=None, + stream=False, + stream_metrics=False, + handoff='writer,reviewer', + handoff_policy='summary', + handoff_timeout=12.0, + handoff_max_depth=4, + handoff_max_concurrent=2, + handoff_detect_cycles='false', + ) + cli_config = app._extract_cli_config_for_yaml() + + assert cli_config['planning'] is True + assert cli_config['web'] is True + assert cli_config['web_fetch'] is True + assert cli_config['handoff'] == 'writer,reviewer' + assert cli_config['handoff_policy'] == 'summary' + assert cli_config['handoff_timeout'] == 12.0 + assert cli_config['handoff_max_depth'] == 4 + assert cli_config['handoff_max_concurrent'] == 2 + assert cli_config['handoff_detect_cycles'] == 'false' + if __name__ == "__main__": pytest.main([__file__, "-v", "--tb=short", "-x"])