Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
184 changes: 176 additions & 8 deletions src/praisonai/praisonai/auto.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from typing import Any, Dict, List, Optional, Type, TypeVar
import os
import json
import asyncio
import yaml
import threading
from rich import print
Expand Down Expand Up @@ -423,12 +424,13 @@ def __init__(self, config_list: Optional[List[Dict]] = None):
}
]
self._openai_client = None # lazy, per-instance
self._openai_client_lock = threading.Lock()
self._async_openai_client = None # lazy, per-instance async client
self._client_lock = threading.Lock()

def _get_openai_client(self):
"""Get or create the OpenAI client for this instance."""
if self._openai_client is None:
with self._openai_client_lock:
with self._client_lock:
if self._openai_client is None:
try:
from openai import OpenAI
Expand All @@ -442,18 +444,43 @@ def _get_openai_client(self):
return self._openai_client

def close(self):
"""Close the OpenAI client if it exists."""
if not hasattr(self, '_openai_client_lock'):
"""Close the sync OpenAI client if it exists."""
if not hasattr(self, '_client_lock'):
return # Object was never fully initialized
with self._openai_client_lock:
with self._client_lock:
client = getattr(self, '_openai_client', None)
self._openai_client = None
if client is not None:
client.close()

def __del__(self):
"""Best-effort cleanup, but the canonical path is explicit close()."""

async def aclose(self):
    """Close both the sync and async OpenAI clients, if they exist.

    The client references are detached while holding ``_client_lock``; the
    close calls themselves run after the lock is released, so nothing is
    awaited while the synchronous lock is held.

    The async client is closed even when closing the sync client raises. In
    that case the sync client's exception still propagates to the caller
    once the async client has been closed.
    """
    if not hasattr(self, '_client_lock'):
        return  # Object was never fully initialized
    with self._client_lock:
        sync_client = getattr(self, '_openai_client', None)
        self._openai_client = None
        async_client = getattr(self, '_async_openai_client', None)
        self._async_openai_client = None
    try:
        if sync_client is not None:
            # Run the synchronous close() in a worker thread rather than
            # directly on the event loop.
            await asyncio.to_thread(sync_client.close)
    finally:
        # Both references were already cleared above, so skipping this step
        # on error would leave the async client open with no handle left to
        # close it later.
        if async_client is not None:
            await async_client.close()

def __enter__(self):
return self

def __exit__(self, exc_type, exc, tb):
self.close()
return False

async def __aenter__(self):
return self

async def __aexit__(self, exc_type, exc, tb):
await self.aclose()
return False


def _structured_completion(self, response_model: Type[T], messages: List[Dict], **kwargs) -> T:
"""
Expand Down Expand Up @@ -505,6 +532,68 @@ def _structured_completion(self, response_model: Type[T], messages: List[Dict],
"Install with: pip install litellm OR pip install openai"
)

async def _astructured_completion(self, response_model: Type[T], messages: List[Dict], **kwargs) -> T:
"""
Make an async structured LLM completion with provider fallback.

Priority:
1. LiteLLM async (if available) - supports 100+ LLM providers
2. OpenAI AsyncSDK (fallback) - uses beta.chat.completions.parse

Args:
response_model: Pydantic model class for structured output
messages: List of message dicts for the LLM
**kwargs: Additional arguments passed to the LLM

Returns:
Instance of response_model with parsed response

Raises:
ImportError: If neither litellm nor openai is installed
"""
model_name = self.config_list[0]['model']

# Try LiteLLM async first (preferred - supports 100+ providers)
if _check_litellm_available():
litellm = _get_litellm()
response = await litellm.acompletion(
model=model_name,
messages=messages,
response_format=response_model,
**kwargs
)
content = response.choices[0].message.content
return response_model.model_validate_json(content)

# Fallback to OpenAI AsyncSDK (uses beta.chat.completions.parse)
if _check_openai_available():
if self._async_openai_client is None:
with self._client_lock:
if self._async_openai_client is None:
try:
from openai import AsyncOpenAI
except ImportError as e:
raise ImportError("Install with: pip install openai") from e
cfg = self.config_list[0]
self._async_openai_client = AsyncOpenAI(
api_key=cfg.get("api_key") or os.environ.get("OPENAI_API_KEY"),
base_url=cfg.get("base_url"),
)

Comment on lines +569 to +582

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Sync threading.Lock blocks the event loop under contention in async context

self._client_lock is a threading.Lock. Inside an async method, if two coroutines concurrently pass the outer None check and then race to with self._client_lock:, the second coroutine blocks the entire event loop thread until the lock is released. A dedicated asyncio.Lock for the async client would be the idiomatic fix.

response = await self._async_openai_client.beta.chat.completions.parse(
model=model_name,
messages=messages,
response_format=response_model,
**kwargs
)
return response.choices[0].message.parsed

# Neither available - raise helpful error
raise ImportError(
"Structured output requires either litellm or openai. "
"Install with: pip install litellm OR pip install openai"
)

@staticmethod
def get_available_tools() -> List[str]:
"""Return list of available tools for agent assignment."""
Expand Down Expand Up @@ -708,6 +797,36 @@ def generate(self, merge=False):
self.convert_and_save(json_data, merge=merge)
full_path = os.path.abspath(self.agent_file)
return full_path

async def agenerate(self, merge=False):
    """
    Async counterpart of generate(): build a team structure for the topic.

    Requests a ``TeamStructure`` through ``_astructured_completion()``, passes
    the result to ``convert_and_save()`` and returns the absolute path of
    ``self.agent_file``.

    Args:
        merge (bool): Forwarded to ``convert_and_save()``; when true, merge
            with the existing agents.yaml file instead of overwriting it.

    Returns:
        str: The absolute path of ``self.agent_file``.

    Raises:
        Exception: Anything raised by the completion call or by
            ``convert_and_save()`` propagates to the caller.

    Usage:
        async with AutoGenerator(framework="crewai", topic="Create a movie script about Cat in Mars") as gen:
            path = await gen.agenerate()
            print(path)
    """
    conversation = [
        {"role": "system", "content": "You are a helpful assistant designed to output complex team structures."},
        {"role": "user", "content": self.get_user_content()},
    ]
    team = await self._astructured_completion(
        response_model=TeamStructure,
        messages=conversation,
    )
    # Round-trip through JSON so convert_and_save() receives plain
    # dicts/lists rather than the model object.
    team_data = json.loads(team.model_dump_json())
    self.convert_and_save(team_data, merge=merge)
    return os.path.abspath(self.agent_file)

def convert_and_save(self, json_data, merge=False):
"""Converts the provided JSON data into the desired YAML format and saves it to a file.
Expand Down Expand Up @@ -1121,6 +1240,32 @@ def generate(self, pattern: str = "sequential", merge: bool = False) -> str:
return self._save_workflow(self.merge_with_existing_workflow(json_data), pattern)
return self._save_workflow(json_data, pattern)

async def agenerate(self, pattern: str = "sequential", merge: bool = False) -> str:
    """
    Async counterpart of generate(): produce a workflow YAML file.

    Args:
        pattern: Workflow pattern - "sequential", "routing", "parallel", "loop",
            "orchestrator-workers", "evaluator-optimizer"
        merge: If True and ``self.workflow_file`` already exists, the new data
            is merged with the existing workflow before saving

    Returns:
        Whatever ``_save_workflow()`` returns (the path to the generated
        workflow file)
    """
    conversation = [
        {"role": "system", "content": "You are a helpful assistant that designs workflow structures."},
        {"role": "user", "content": self._get_prompt(pattern)},
    ]
    structure = await self._astructured_completion(
        response_model=WorkflowStructure,
        messages=conversation,
    )

    # Round-trip through JSON to obtain plain dict data for merging/saving.
    workflow_data = json.loads(structure.model_dump_json())

    # Only consult the filesystem when a merge was actually requested.
    if merge and os.path.exists(self.workflow_file):
        workflow_data = self.merge_with_existing_workflow(workflow_data)
    return self._save_workflow(workflow_data, pattern)

def merge_with_existing_workflow(self, new_data: Dict) -> Dict:
"""
Merge new workflow data with existing workflow file.
Expand Down Expand Up @@ -1421,6 +1566,29 @@ def generate(self, include_judge: bool = True, include_approve: bool = False) ->

return self._save_workflow(response)

async def agenerate(self, include_judge: bool = True, include_approve: bool = False) -> str:
    """
    Async counterpart of generate(): produce a job workflow YAML file.

    Args:
        include_judge: Include a judge step for quality gating
        include_approve: Include an approve step for human approval

    Returns:
        Whatever ``_save_workflow()`` returns (the path to the generated
        workflow file)
    """
    user_prompt = self._get_prompt(include_judge, include_approve)
    conversation = [
        {"role": "system", "content": "You are a helpful assistant that designs job workflow structures with AI agent steps."},
        {"role": "user", "content": user_prompt},
    ]

    job_workflow = await self._astructured_completion(
        response_model=JobWorkflowStructure,
        messages=conversation,
    )

    # Unlike the other agenerate() variants visible here, the model object is
    # handed to _save_workflow() directly, without a JSON round-trip.
    return self._save_workflow(job_workflow)

def _get_prompt(self, include_judge: bool, include_approve: bool) -> str:
"""Generate the prompt for job workflow generation."""
tools_list = ", ".join(self.get_available_tools())
Expand Down
18 changes: 17 additions & 1 deletion src/praisonai/praisonai/tool_resolver.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,9 @@ def __init__(
tools_py_path: Optional path to tools.py. If None, uses ./tools.py
registry: Optional ToolRegistry to include in resolution chain
"""
self._tools_py_path = tools_py_path or "tools.py"
from pathlib import Path
# Resolve path eagerly in constructor to make binding explicit and inspectable
self._tools_py_path = str(Path(tools_py_path or "tools.py").resolve())
self._local_tools_cache: Mapping[str, Callable] = MappingProxyType({})
self._local_tools_loaded: bool = False
self._praisonai_tools_available: Optional[bool] = None
Expand Down Expand Up @@ -577,3 +579,17 @@ def validate_yaml_tools(yaml_config: Dict[str, Any], resolver: Optional[ToolReso
List of missing tool names
"""
return (resolver or _get_default_resolver()).validate_yaml_tools(yaml_config)


def reset_default_resolver() -> None:
    """Drop the cached process-default resolver.

    Sets the module-level ``_default_resolver`` to ``None`` while holding
    ``_default_resolver_lock``.

    Intended to be called between tenants, after a CWD change, or in test
    setup, so that local tools.py resolution is not influenced by earlier
    calls. Mirrors ``_framework_availability.invalidate()``, which resets
    cached state in the same way.

    NOTE(review): how the default resolver is re-created afterwards is not
    visible here; presumably ``_get_default_resolver()`` rebuilds it on the
    next use - confirm.
    """
    global _default_resolver
    with _default_resolver_lock:
        _default_resolver = None
11 changes: 6 additions & 5 deletions src/praisonai/praisonai/train.py
Original file line number Diff line number Diff line change
Expand Up @@ -549,12 +549,13 @@ def prepare_modelfile_content(self):
"""

def create_and_push_ollama_model(self):
from ._ollama import create_and_push_ollama_model
modelfile_content = self.prepare_modelfile_content()
with open("Modelfile", "w") as file:
file.write(modelfile_content)
subprocess.run(["ollama", "serve"])
subprocess.run(["ollama", "create", f"{self.config['ollama_model']}:{self.config['model_parameters']}", "-f", "Modelfile"])
subprocess.run(["ollama", "push", f"{self.config['ollama_model']}:{self.config['model_parameters']}"])
create_and_push_ollama_model(
self.config['ollama_model'],
self.config['model_parameters'],
modelfile_content
)
Comment on lines 551 to +558

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P0 Relative import breaks when file is executed as script

train.py is invoked by the CLI as a subprocess script (python -u train.py train) via cli/main.py line 730. When run as __main__, relative imports raise ImportError: attempted relative import with no known parent package, making create_and_push_ollama_model() always fail. The Ollama blocking fix is completely unreachable through the primary CLI code path.

The correct approach for a script file is to use an absolute/path-based import, e.g. importlib.util.spec_from_file_location pointed at os.path.join(os.path.dirname(os.path.abspath(__file__)), "train", "_ollama.py"), or to factor the helper call into the train/__init__.py and adjust the CLI to import the module instead of executing it as a script.


def run(self):
self.print_system_info()
Expand Down
97 changes: 97 additions & 0 deletions src/praisonai/praisonai/train/_ollama.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
"""Shared Ollama daemon management utilities.

This module provides utilities to start and check Ollama daemon status,
fixing the blocking subprocess.run(["ollama", "serve"]) issue present
in multiple files.
"""
import contextlib
import shutil
import socket
import subprocess
import time
from typing import Optional


def _ollama_ready(host: str = "127.0.0.1", port: int = 11434, timeout: float = 0.2) -> bool:
"""Check if Ollama daemon is ready to accept connections.

Args:
host: Ollama host (default 127.0.0.1)
port: Ollama port (default 11434)
timeout: Connection timeout in seconds

Returns:
True if Ollama is ready, False otherwise
"""
with contextlib.suppress(OSError):
with socket.create_connection((host, port), timeout):
return True
return False


def ensure_ollama_running(max_wait_seconds: float = 5.0) -> Optional[subprocess.Popen]:
    """Ensure Ollama daemon is running, start it if necessary.

    Args:
        max_wait_seconds: Maximum time to wait for a daemon started by this
            call to become ready

    Returns:
        Process object if we started the daemon, None if it was already running

    Raises:
        RuntimeError: If the ollama CLI is not found, if ``ollama serve``
            exits before becoming ready, or if it does not become ready
            within ``max_wait_seconds``
    """
    # Check if already running
    if _ollama_ready():
        return None

    # Check if ollama CLI is available
    if shutil.which("ollama") is None:
        raise RuntimeError("`ollama` CLI not found; install from https://ollama.com")

    # Start daemon in detached mode
    proc = subprocess.Popen(
        ["ollama", "serve"],
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
        start_new_session=True,  # Detach from parent
    )

    # Poll against a monotonic deadline instead of a precomputed poll count:
    # a count truncates on float error (int(0.3 / 0.1) == 2) and ignores the
    # time each readiness probe itself takes.
    wait_interval = 0.1
    deadline = time.monotonic() + max_wait_seconds

    while True:
        if _ollama_ready():
            return proc

        # Stop early if the child already exited; it can never become ready.
        # poll() also reaps the exited child.
        exit_code = proc.poll()
        if exit_code is not None:
            raise RuntimeError(
                f"ollama serve exited with code {exit_code} before becoming ready"
            )

        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break
        time.sleep(min(wait_interval, remaining))

    # If we get here, daemon didn't become ready in time. Terminate it and
    # wait for it so the child is reaped; escalate to kill() if it ignores
    # the terminate request.
    proc.terminate()
    try:
        proc.wait(timeout=5)
    except subprocess.TimeoutExpired:
        proc.kill()
        proc.wait()
    raise RuntimeError(f"ollama serve did not become ready in {max_wait_seconds} seconds")


def create_and_push_ollama_model(ollama_model: str, model_parameters: str, modelfile_content: str) -> None:
"""Create and push an Ollama model with proper daemon management.

Args:
ollama_model: Name of the Ollama model
model_parameters: Model parameters/tag
modelfile_content: Content for the Modelfile

Raises:
RuntimeError: If ollama operations fail
subprocess.CalledProcessError: If create/push commands fail
"""
# Write Modelfile
with open("Modelfile", "w") as f:
f.write(modelfile_content)
Comment on lines +86 to +88

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Modelfile written to CWD may conflict in concurrent or multi-tenant scenarios.

Writing to a hardcoded "Modelfile" path in the current working directory can cause race conditions if multiple processes invoke this function concurrently. Consider using a temporary file with tempfile.NamedTemporaryFile or a unique filename.

🛠️ Proposed fix using tempfile
+import os
+import tempfile
+
 def create_and_push_ollama_model(ollama_model: str, model_parameters: str, modelfile_content: str) -> None:
     ...
-    # Write Modelfile
-    with open("Modelfile", "w") as f:
-        f.write(modelfile_content)
+    # Write Modelfile to a temporary file
+    with tempfile.NamedTemporaryFile(mode="w", suffix="_Modelfile", delete=False) as f:
+        f.write(modelfile_content)
+        modelfile_path = f.name
     
     # Ensure daemon is running
     ensure_ollama_running()
     
     # Create and push model
     tag = f"{ollama_model}:{model_parameters}"
     
-    subprocess.run(["ollama", "create", tag, "-f", "Modelfile"], check=True)
+    subprocess.run(["ollama", "create", tag, "-f", modelfile_path], check=True)
     subprocess.run(["ollama", "push", tag], check=True)
+    
+    # Cleanup
+    os.unlink(modelfile_path)
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/praisonai/praisonai/train/_ollama.py` around lines 86 - 88, The code
currently writes the Modelfile to a hardcoded "Modelfile" in the CWD (writing
modelfile_content), which can race in concurrent/multi-tenant runs; change this
to create and write to a unique temporary file (e.g., use
tempfile.NamedTemporaryFile(delete=False) or tempfile.mkstemp), write
modelfile_content to that temp path, use that temp filename wherever "Modelfile"
was referenced, and ensure the temp file is cleaned up after use (remove it in a
finally block or context manager); update any references to the literal
"Modelfile" in this module (_ollama.py) to use the temp filename variable.


# Ensure daemon is running
ensure_ollama_running()

# Create and push model
tag = f"{ollama_model}:{model_parameters}"

subprocess.run(["ollama", "create", tag, "-f", "Modelfile"], check=True)
subprocess.run(["ollama", "push", tag], check=True)
Loading