Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 56 additions & 14 deletions libs/executors/garf/executors/entrypoints/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,17 @@
# limitations under the License.

"""FastAPI endpoint for executing queries."""
from __future__ import annotations

import pathlib
from typing import Any, Optional, Union

import fastapi
import garf.core
import garf.executors
import garf.io
import pydantic
import smart_open
import typer
import uvicorn
import yaml
Expand Down Expand Up @@ -149,19 +152,21 @@ async def execute_batch_task(
@app.post('/api/execute:workflow')
def execute_workflow(
workflow_file: Optional[fastapi.UploadFile] = fastapi.File(None),
enable_cache: bool = False,
cache_ttl_seconds: int = 3600,
workflow_path: str | pathlib.Path = None,
config_file: Optional[fastapi.UploadFile] = fastapi.File(None),
config_path: str | pathlib.Path = None,
selected_aliases: Optional[list[str]] = None,
skipped_aliases: Optional[list[str]] = None,
) -> list[str]:
"""Runs garf workflow till completion."""
content = workflow_file.file.read()
workflow_data = yaml.safe_load(content.decode('utf-8'))
execution_workflow = workflow.Workflow(**workflow_data)
try:
execution_workflow = _init_workflow(
workflow_file, workflow_path, config_file, config_path
)
except workflow.GarfWorkflowError as e:
raise fastapi.HTTPException(404, detail=str(e))
return tasks.execute_workflow(
execution_workflow=execution_workflow.model_dump(),
enable_cache=enable_cache,
cache_ttl_seconds=cache_ttl_seconds,
selected_aliases=selected_aliases,
skipped_aliases=skipped_aliases,
)
Expand All @@ -172,20 +177,22 @@ def execute_workflow(
)
async def execute_workflow_task(
workflow_file: Optional[fastapi.UploadFile] = fastapi.File(None),
enable_cache: bool = False,
cache_ttl_seconds: int = 3600,
workflow_path: str | pathlib.Path = None,
config_file: Optional[fastapi.UploadFile] = fastapi.File(None),
config_path: str | pathlib.Path = None,
selected_aliases: Optional[list[str]] = None,
skipped_aliases: Optional[list[str]] = None,
) -> dict[str, str]:
"""Creates a single operation for running garf workflow."""
span = trace.get_current_span()
content = await workflow_file.read()
workflow_data = yaml.safe_load(content.decode('utf-8'))
execution_workflow = workflow.Workflow(**workflow_data)
try:
execution_workflow = _init_workflow(
workflow_file, workflow_path, config_file, config_path
)
except workflow.GarfWorkflowError as e:
raise fastapi.HTTPException(404, detail=str(e))
task = tasks.execute_workflow.delay(
execution_workflow=execution_workflow.model_dump(),
enable_cache=enable_cache,
cache_ttl_seconds=cache_ttl_seconds,
selected_aliases=selected_aliases,
skipped_aliases=skipped_aliases,
)
Expand Down Expand Up @@ -214,6 +221,41 @@ def cancel_operation(operation_id: str):
}


def _init_workflow(
workflow_file: Optional[fastapi.UploadFile] = fastapi.File(None),
workflow_path: str | pathlib.Path = None,
config_file: Optional[fastapi.UploadFile] = fastapi.File(None),
config_path: str | pathlib.Path = None,
):
if config_file:
content = config_file.file.read()
config_data = yaml.safe_load(content.decode('utf-8'))
elif config_path:
try:
with smart_open.open(config_path, 'r', encoding='utf-8') as f:
config_data = yaml.safe_load(f)
except FileNotFoundError as e:
raise workflow.GarfWorkflowError('Incorrect config path') from e

else:
config_data = None
if workflow_file:
content = workflow_file.file.read()
workflow_data = yaml.safe_load(content.decode('utf-8'))
elif workflow_path:
try:
with smart_open.open(workflow_path, 'r', encoding='utf-8') as f:
workflow_data = yaml.safe_load(f)
except FileNotFoundError as e:
raise workflow.GarfWorkflowError('Incorrect workflow path') from e
else:
raise workflow.GarfWorkflowError('Neither workflow path nor file provided')
return workflow.Workflow(
steps=workflow_data.get('steps'),
execution_config=config_data,
)


@typer_app.command()
def main(
host: Annotated[
Expand Down
9 changes: 1 addition & 8 deletions libs/executors/garf/executors/entrypoints/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,20 +127,13 @@ def execute_batch(request: ApiExecutorBatchRequest):
@app.task(pydantic=True)
def execute_workflow(
execution_workflow: workflow.Workflow,
enable_cache: bool,
cache_ttl_seconds,
selected_aliases: list[str],
skipped_aliases: list[str],
):
"""Executes a batch of queries."""
return workflow_runner.WorkflowRunner(
execution_workflow=execution_workflow
).run(
enable_cache=enable_cache,
cache_ttl_seconds=cache_ttl_seconds,
selected_aliases=selected_aliases,
skipped_aliases=skipped_aliases,
)
).run(selected_aliases=selected_aliases, skipped_aliases=skipped_aliases)


@app.task(pydantic=True)
Expand Down
Loading