30 commits
49ee386
add support for on-prem
TarunRavikumar Oct 28, 2025
18f2761
clean up on-prem artifacts
TarunRavikumar Oct 28, 2025
a1177cf
add back comments from initial code
TarunRavikumar Oct 28, 2025
f5e95f4
fix lint
TarunRavikumar Oct 28, 2025
8768e20
use ecr image repo:tag directly
TarunRavikumar Nov 12, 2025
3557681
fix: isort import ordering
TarunRavikumar Dec 11, 2025
81d9773
fix: remove unused infra_config import
TarunRavikumar Dec 11, 2025
6526a9d
fix: mypy type annotation errors
TarunRavikumar Dec 11, 2025
e26c53f
fix: remove type annotation causing mypy no-redef error
TarunRavikumar Dec 12, 2025
0f96e8a
fix: mypy type errors in s3_utils.py and io.py - use botocore.config.…
TarunRavikumar Dec 12, 2025
2918dda
fix: mypy typeddict-item errors - use broad type ignore
TarunRavikumar Dec 12, 2025
42a2809
fix: update test mocks to use get_s3_resource from s3_utils
TarunRavikumar Dec 12, 2025
d727e2f
test: add unit tests for s3_utils, onprem_docker_repository, and onpr…
TarunRavikumar Dec 12, 2025
a9968eb
style: format test files with black
TarunRavikumar Dec 12, 2025
e37c200
refactor: use filesystem_gateway abstraction for S3 operations
TarunRavikumar Dec 15, 2025
5ff9481
fix: deduplicate S3 client config by using centralized s3_utils
TarunRavikumar Dec 15, 2025
80b010d
fix: add pagination to list_objects to handle >1000 objects
TarunRavikumar Dec 15, 2025
ca79edc
fix: make OnPremDockerRepository.get_image_url consistent with ECR/ACR
TarunRavikumar Dec 15, 2025
38dbe9f
refactor: add explicit on-prem branches in dependencies.py for clarity
TarunRavikumar Dec 15, 2025
4eab08b
feat: implement Redis LLEN for queue depth in OnPremQueueEndpointReso…
TarunRavikumar Dec 15, 2025
ffafa1f
fix: replace mutable default argument with None in _get_client
TarunRavikumar Dec 15, 2025
8e27e18
refactor: extract inline import to module-level helper function
TarunRavikumar Dec 15, 2025
c7c16fd
fix: reduce excessive debug logging in s3_utils
TarunRavikumar Dec 15, 2025
47e9dcb
chore: remove unused TYPE_CHECKING import
TarunRavikumar Dec 15, 2025
af297cc
fix: make Dockerfile multi-arch compatible for ARM/AMD64
TarunRavikumar Dec 15, 2025
7750cd9
style: fix black formatting in test_onprem_queue_endpoint_resource_de…
TarunRavikumar Dec 15, 2025
8f8a4da
fix: restore AWS_PROFILE env var fallback in s3_utils
TarunRavikumar Dec 15, 2025
e4f9818
fix: correct isort ordering in s3_filesystem_gateway.py
TarunRavikumar Dec 15, 2025
161778c
fix: use Literal type for s3 addressing_style to satisfy mypy
TarunRavikumar Dec 15, 2025
b0b7066
Onprem Compatibility Change
charlesahn-scale Jan 24, 2026
24 changes: 23 additions & 1 deletion charts/model-engine/templates/_helpers.tpl
@@ -256,6 +256,10 @@ env:
- name: ABS_CONTAINER_NAME
value: {{ .Values.azure.abs_container_name }}
{{- end }}
{{- if .Values.s3EndpointUrl }}
- name: S3_ENDPOINT_URL
value: {{ .Values.s3EndpointUrl | quote }}
{{- end }}
{{- end }}

{{- define "modelEngine.syncForwarderTemplateEnv" -}}
@@ -342,9 +346,27 @@
value: "/workspace/model-engine/model_engine_server/core/configs/config.yaml"
{{- end }}
- name: CELERY_ELASTICACHE_ENABLED
value: "true"
value: {{ .Values.celeryElasticacheEnabled | default true | quote }}
- name: LAUNCH_SERVICE_TEMPLATE_FOLDER
value: "/workspace/model-engine/model_engine_server/infra/gateways/resources/templates"
{{- if .Values.s3EndpointUrl }}
- name: S3_ENDPOINT_URL
value: {{ .Values.s3EndpointUrl | quote }}
{{- end }}
{{- if .Values.redisHost }}
- name: REDIS_HOST
value: {{ .Values.redisHost | quote }}
- name: REDIS_PORT
value: {{ .Values.redisPort | default "6379" | quote }}
{{- end }}
{{- if .Values.celeryBrokerUrl }}
- name: CELERY_BROKER_URL
value: {{ .Values.celeryBrokerUrl | quote }}
{{- end }}
{{- if .Values.celeryResultBackend }}
- name: CELERY_RESULT_BACKEND
value: {{ .Values.celeryResultBackend | quote }}
{{- end }}
{{- if .Values.redis.auth}}
- name: REDIS_AUTH_TOKEN
value: {{ .Values.redis.auth }}
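The chart now injects S3_ENDPOINT_URL, REDIS_HOST/REDIS_PORT, CELERY_BROKER_URL, and CELERY_RESULT_BACKEND only when the corresponding Helm values are set, and CELERY_ELASTICACHE_ENABLED becomes overridable. A minimal sketch of how a service process might pick these up, with fallbacks matching the chart defaults — the helper name is hypothetical and not part of this PR:

import os

def read_onprem_env() -> dict:
    # Hypothetical helper: collect the on-prem overrides injected by _helpers.tpl.
    # Unset variables fall back to the existing cloud defaults.
    return {
        "s3_endpoint_url": os.getenv("S3_ENDPOINT_URL"),  # e.g. a MinIO endpoint
        "redis_host": os.getenv("REDIS_HOST", "redis"),
        "redis_port": int(os.getenv("REDIS_PORT", "6379")),
        "celery_broker_url": os.getenv("CELERY_BROKER_URL"),
        "celery_result_backend": os.getenv("CELERY_RESULT_BACKEND"),
        "celery_elasticache_enabled": os.getenv("CELERY_ELASTICACHE_ENABLED", "true") == "true",
    }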
19 changes: 13 additions & 6 deletions model-engine/Dockerfile
@@ -21,13 +21,20 @@ RUN apt-get update && apt-get install -y \
telnet \
&& rm -rf /var/lib/apt/lists/*

RUN curl -Lo /bin/aws-iam-authenticator https://github.com/kubernetes-sigs/aws-iam-authenticator/releases/download/v0.5.9/aws-iam-authenticator_0.5.9_linux_amd64
RUN chmod +x /bin/aws-iam-authenticator
# Install aws-iam-authenticator (architecture-aware)
RUN ARCH=$(dpkg --print-architecture) && \
if [ "$ARCH" = "arm64" ]; then \
curl -Lo /bin/aws-iam-authenticator https://github.com/kubernetes-sigs/aws-iam-authenticator/releases/download/v0.5.9/aws-iam-authenticator_0.5.9_linux_arm64; \
else \
curl -Lo /bin/aws-iam-authenticator https://github.com/kubernetes-sigs/aws-iam-authenticator/releases/download/v0.5.9/aws-iam-authenticator_0.5.9_linux_amd64; \
fi && \
chmod +x /bin/aws-iam-authenticator

# Install kubectl
RUN curl -LO "https://dl.k8s.io/release/v1.23.13/bin/linux/amd64/kubectl" \
&& chmod +x kubectl \
&& mv kubectl /usr/local/bin/kubectl
# Install kubectl (architecture-aware)
RUN ARCH=$(dpkg --print-architecture) && \
curl -LO "https://dl.k8s.io/release/v1.23.13/bin/linux/${ARCH}/kubectl" && \
chmod +x kubectl && \
mv kubectl /usr/local/bin/kubectl

# Pin pip version
RUN pip install pip==24.2
74 changes: 44 additions & 30 deletions model-engine/model_engine_server/api/dependencies.py
@@ -94,6 +94,9 @@
from model_engine_server.infra.gateways.resources.live_endpoint_resource_gateway import (
LiveEndpointResourceGateway,
)
from model_engine_server.infra.gateways.resources.onprem_queue_endpoint_resource_delegate import (
OnPremQueueEndpointResourceDelegate,
)
from model_engine_server.infra.gateways.resources.queue_endpoint_resource_delegate import (
QueueEndpointResourceDelegate,
)
@@ -114,6 +117,7 @@
FakeDockerRepository,
LiveTokenizerRepository,
LLMFineTuneRepository,
OnPremDockerRepository,
RedisModelEndpointCacheRepository,
S3FileLLMFineTuneEventsRepository,
S3FileLLMFineTuneRepository,
@@ -221,23 +225,30 @@ def _get_external_interfaces(
)

queue_delegate: QueueEndpointResourceDelegate
if CIRCLECI:
if CIRCLECI or infra_config().cloud_provider == "onprem":
# On-prem uses fake queue delegate (no SQS/ServiceBus)
queue_delegate = FakeQueueEndpointResourceDelegate()
elif infra_config().cloud_provider == "azure":
queue_delegate = ASBQueueEndpointResourceDelegate()
elif infra_config().cloud_provider == "onprem":
queue_delegate = OnPremQueueEndpointResourceDelegate()
else:
queue_delegate = SQSQueueEndpointResourceDelegate(
sqs_profile=os.getenv("SQS_PROFILE", hmi_config.sqs_profile)
)

inference_task_queue_gateway: TaskQueueGateway
infra_task_queue_gateway: TaskQueueGateway
if CIRCLECI:
if CIRCLECI or infra_config().cloud_provider == "onprem":
# On-prem uses Redis-based task queues
inference_task_queue_gateway = redis_24h_task_queue_gateway
infra_task_queue_gateway = redis_task_queue_gateway
elif infra_config().cloud_provider == "azure":
inference_task_queue_gateway = servicebus_task_queue_gateway
infra_task_queue_gateway = servicebus_task_queue_gateway
elif infra_config().cloud_provider == "onprem":
inference_task_queue_gateway = redis_task_queue_gateway
infra_task_queue_gateway = redis_task_queue_gateway
elif infra_config().celery_broker_type_redis:
inference_task_queue_gateway = redis_task_queue_gateway
infra_task_queue_gateway = redis_task_queue_gateway
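The explicit on-prem branches above pair with commit 4eab08b, which implements queue depth via Redis LLEN in OnPremQueueEndpointResourceDelegate. The delegate itself isn't shown in this excerpt; a rough sketch of the LLEN idea, assuming a redis-py client and Celery's list-per-queue layout (key naming is an assumption):

import os

import redis


def get_queue_depth(queue_name: str) -> int:
    # Celery's Redis broker stores each queue as a list, so LLEN returns the
    # number of pending messages for that queue.
    client = redis.Redis(
        host=os.getenv("REDIS_HOST", "redis"),
        port=int(os.getenv("REDIS_PORT", "6379")),
    )
    return int(client.llen(queue_name))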
@@ -274,16 +285,17 @@ def _get_external_interfaces(
monitoring_metrics_gateway=monitoring_metrics_gateway,
use_asyncio=(not CIRCLECI),
)
filesystem_gateway = (
ABSFilesystemGateway()
if infra_config().cloud_provider == "azure"
else S3FilesystemGateway()
)
llm_artifact_gateway = (
ABSLLMArtifactGateway()
if infra_config().cloud_provider == "azure"
else S3LLMArtifactGateway()
)
filesystem_gateway: FilesystemGateway
llm_artifact_gateway: LLMArtifactGateway
if infra_config().cloud_provider == "azure":
filesystem_gateway = ABSFilesystemGateway()
llm_artifact_gateway = ABSLLMArtifactGateway()
elif infra_config().cloud_provider == "onprem":
filesystem_gateway = S3FilesystemGateway() # Uses MinIO via s3_utils
llm_artifact_gateway = S3LLMArtifactGateway() # Uses MinIO via s3_utils
else:
filesystem_gateway = S3FilesystemGateway()
llm_artifact_gateway = S3LLMArtifactGateway()
model_endpoints_schema_gateway = LiveModelEndpointsSchemaGateway(
filesystem_gateway=filesystem_gateway
)
@@ -323,23 +335,20 @@ def _get_external_interfaces(
cron_job_gateway = LiveCronJobGateway()

llm_fine_tune_repository: LLMFineTuneRepository
llm_fine_tune_events_repository: LLMFineTuneEventsRepository
file_path = os.getenv(
"CLOUD_FILE_LLM_FINE_TUNE_REPOSITORY",
hmi_config.cloud_file_llm_fine_tune_repository,
)
if infra_config().cloud_provider == "azure":
llm_fine_tune_repository = ABSFileLLMFineTuneRepository(
file_path=file_path,
)
llm_fine_tune_repository = ABSFileLLMFineTuneRepository(file_path=file_path)
llm_fine_tune_events_repository = ABSFileLLMFineTuneEventsRepository()
elif infra_config().cloud_provider == "onprem":
llm_fine_tune_repository = S3FileLLMFineTuneRepository(file_path=file_path) # Uses MinIO
llm_fine_tune_events_repository = S3FileLLMFineTuneEventsRepository() # Uses MinIO
else:
llm_fine_tune_repository = S3FileLLMFineTuneRepository(
file_path=file_path,
)
llm_fine_tune_events_repository = (
ABSFileLLMFineTuneEventsRepository()
if infra_config().cloud_provider == "azure"
else S3FileLLMFineTuneEventsRepository()
)
llm_fine_tune_repository = S3FileLLMFineTuneRepository(file_path=file_path)
llm_fine_tune_events_repository = S3FileLLMFineTuneEventsRepository()
llm_fine_tuning_service = DockerImageBatchJobLLMFineTuningService(
docker_image_batch_job_gateway=docker_image_batch_job_gateway,
docker_image_batch_job_bundle_repo=docker_image_batch_job_bundle_repository,
@@ -350,17 +359,22 @@
docker_image_batch_job_gateway=docker_image_batch_job_gateway
)

file_storage_gateway = (
ABSFileStorageGateway()
if infra_config().cloud_provider == "azure"
else S3FileStorageGateway()
)
file_storage_gateway: FileStorageGateway
if infra_config().cloud_provider == "azure":
file_storage_gateway = ABSFileStorageGateway()
elif infra_config().cloud_provider == "onprem":
file_storage_gateway = S3FileStorageGateway() # Uses MinIO via s3_utils
else:
file_storage_gateway = S3FileStorageGateway()

docker_repository: DockerRepository
if CIRCLECI:
if CIRCLECI or infra_config().cloud_provider == "onprem":
# On-prem uses fake docker repository (no ECR/ACR validation)
docker_repository = FakeDockerRepository()
elif infra_config().docker_repo_prefix.endswith("azurecr.io"):
elif infra_config().cloud_provider == "azure":
docker_repository = ACRDockerRepository()
elif infra_config().cloud_provider == "onprem":
docker_repository = OnPremDockerRepository()
else:
docker_repository = ECRDockerRepository()

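Commits 8768e20 and ca79edc make OnPremDockerRepository use the repo:tag reference directly and keep get_image_url consistent with the ECR/ACR repositories. The class itself isn't included in this excerpt; a hedged sketch of that behavior (the constructor argument and method signatures are assumptions):

class OnPremDockerRepository:
    # Sketch only: the real class lives in model_engine_server.infra.repositories.
    def __init__(self, registry_prefix: str = ""):
        # e.g. "registry.internal:5000"; an empty prefix means the tag already
        # carries the full repo:tag reference.
        self.registry_prefix = registry_prefix

    def image_exists(self, image_tag: str, repository_name: str) -> bool:
        # No ECR/ACR API to query on-prem, so assume the image is present.
        return True

    def get_image_url(self, image_tag: str, repository_name: str) -> str:
        if self.registry_prefix:
            return f"{self.registry_prefix}/{repository_name}:{image_tag}"
        return f"{repository_name}:{image_tag}"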
28 changes: 21 additions & 7 deletions model-engine/model_engine_server/common/config.py
@@ -70,12 +70,13 @@ class HostedModelInferenceServiceConfig:
user_inference_tensorflow_repository: str
docker_image_layer_cache_repository: str
sensitive_log_mode: bool
# Exactly one of the following three must be specified
# Exactly one of the following must be specified for Redis cache
cache_redis_aws_url: Optional[str] = None # also using this to store sync autoscaling metrics
cache_redis_azure_host: Optional[str] = None
cache_redis_aws_secret_name: Optional[str] = (
None # Not an env var because the redis cache info is already here
)
cache_redis_onprem_url: Optional[str] = None # For on-prem Redis (e.g., redis://redis:6379/0)
sglang_repository: Optional[str] = None

@classmethod
Expand All @@ -90,21 +91,34 @@ def from_yaml(cls, yaml_path):

@property
def cache_redis_url(self) -> str:
# On-prem Redis support (explicit URL, no cloud provider dependency)
if self.cache_redis_onprem_url:
return self.cache_redis_onprem_url

cloud_provider = infra_config().cloud_provider

# On-prem: support REDIS_HOST env var fallback
if cloud_provider == "onprem":
if self.cache_redis_aws_url:
logger.info("On-prem deployment using cache_redis_aws_url")
return self.cache_redis_aws_url
redis_host = os.getenv("REDIS_HOST", "redis")
redis_port = getattr(infra_config(), "redis_port", 6379)
return f"redis://{redis_host}:{redis_port}/0"

if self.cache_redis_aws_url:
assert infra_config().cloud_provider == "aws", "cache_redis_aws_url is only for AWS"
assert cloud_provider == "aws", "cache_redis_aws_url is only for AWS"
if self.cache_redis_aws_secret_name:
logger.warning(
"Both cache_redis_aws_url and cache_redis_aws_secret_name are set. Using cache_redis_aws_url"
)
return self.cache_redis_aws_url
elif self.cache_redis_aws_secret_name:
assert (
infra_config().cloud_provider == "aws"
), "cache_redis_aws_secret_name is only for AWS"
creds = get_key_file(self.cache_redis_aws_secret_name) # Use default role
assert cloud_provider == "aws", "cache_redis_aws_secret_name is only for AWS"
creds = get_key_file(self.cache_redis_aws_secret_name)
return creds["cache-url"]

assert self.cache_redis_azure_host and infra_config().cloud_provider == "azure"
assert self.cache_redis_azure_host and cloud_provider == "azure"
username = os.getenv("AZURE_OBJECT_ID")
token = DefaultAzureCredential().get_token("https://redis.azure.com/.default")
password = token.token
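For on-prem, the property above resolves the cache URL in this order: cache_redis_onprem_url if set, then cache_redis_aws_url if present, then a URL built from REDIS_HOST and redis_port. A made-up illustration of the fallback case:

import os

# cloud_provider == "onprem", no explicit cache URLs configured:
os.environ["REDIS_HOST"] = "redis.infra.local"
# hmi_config.cache_redis_url -> "redis://redis.infra.local:6379/0"
# (redis_port falls back to 6379 unless infra_config() defines it)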
10 changes: 4 additions & 6 deletions model-engine/model_engine_server/common/io.py
@@ -3,19 +3,17 @@
import os
from typing import Any

import boto3
import smart_open
from model_engine_server.core.config import infra_config


def open_wrapper(uri: str, mode: str = "rt", **kwargs):
client: Any
cloud_provider: str
# This follows the 5.1.0 smart_open API
try:
cloud_provider = infra_config().cloud_provider
except Exception:
cloud_provider = "aws"

if cloud_provider == "azure":
from azure.identity import DefaultAzureCredential
from azure.storage.blob import BlobServiceClient
@@ -25,9 +23,9 @@ def open_wrapper(uri: str, mode: str = "rt", **kwargs):
DefaultAzureCredential(),
)
else:
profile_name = kwargs.get("aws_profile", os.getenv("AWS_PROFILE"))
session = boto3.Session(profile_name=profile_name)
client = session.client("s3")
from model_engine_server.infra.gateways.s3_utils import get_s3_client

client = get_s3_client(kwargs)

transport_params = {"client": client}
return smart_open.open(uri, mode, transport_params=transport_params)
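open_wrapper now builds its S3 client through get_s3_client from s3_utils, the module this PR uses to centralize client configuration. s3_utils itself isn't shown in this excerpt; a sketch of what it plausibly does, inferred from the commit messages (endpoint override, path-style addressing via botocore Config, AWS_PROFILE fallback) — treat the details as assumptions:

import os
from typing import Any, Dict, Optional

import boto3
from botocore.config import Config


def get_s3_client(kwargs: Optional[Dict[str, Any]] = None):
    # Centralized S3 client factory covering both AWS and MinIO-style endpoints.
    kwargs = kwargs or {}
    profile_name = kwargs.get("aws_profile", os.getenv("AWS_PROFILE"))
    session = boto3.Session(profile_name=profile_name)

    endpoint_url = os.getenv("S3_ENDPOINT_URL")  # set for on-prem/MinIO
    config = Config(s3={"addressing_style": "path"}) if endpoint_url else None
    return session.client("s3", endpoint_url=endpoint_url, config=config)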
11 changes: 10 additions & 1 deletion model-engine/model_engine_server/core/aws/roles.py
@@ -119,12 +119,21 @@ def session(role: Optional[str], session_type: SessionT = Session) -> SessionT:

:param:`session_type` defines the type of session to return. Most users will use
the default boto3 type. Some users required a special type (e.g aioboto3 session).

For on-prem deployments without AWS profiles, pass role=None or role=""
to use default credentials from environment variables (AWS_ACCESS_KEY_ID, etc).
"""
# Do not assume roles in CIRCLECI
if os.getenv("CIRCLECI"):
logger.warning(f"In circleci, not assuming role (ignoring: {role})")
role = None
sesh: SessionT = session_type(profile_name=role)

# Use profile-based auth only if role is specified
# For on-prem with MinIO, role will be None or empty - use env var credentials
if role:
sesh: SessionT = session_type(profile_name=role)
else:
sesh: SessionT = session_type() # Uses default credential chain (env vars)
return sesh


5 changes: 5 additions & 0 deletions model-engine/model_engine_server/core/aws/storage_client.py
@@ -1,3 +1,4 @@
import os
import time
from typing import IO, Callable, Iterable, Optional, Sequence

@@ -20,6 +21,10 @@


def sync_storage_client(**kwargs) -> BaseClient:
# Support for MinIO/on-prem S3-compatible storage
endpoint_url = os.getenv("S3_ENDPOINT_URL")
if endpoint_url and "endpoint_url" not in kwargs:
kwargs["endpoint_url"] = endpoint_url
return session(infra_config().profile_ml_worker).client("s3", **kwargs) # type: ignore


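The endpoint override only applies when the caller hasn't passed endpoint_url explicitly, so existing call sites keep their behavior. A usage sketch, assuming credentials are already configured and with made-up endpoints:

import os

from model_engine_server.core.aws.storage_client import sync_storage_client

os.environ["S3_ENDPOINT_URL"] = "http://minio.infra.local:9000"

# Picks up the MinIO endpoint from the environment:
minio_client = sync_storage_client()

# An explicit keyword argument still wins over the env var:
aws_client = sync_storage_client(endpoint_url="https://s3.us-west-2.amazonaws.com")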
31 changes: 21 additions & 10 deletions model-engine/model_engine_server/core/celery/app.py
@@ -531,17 +531,28 @@ def _get_backend_url_and_conf(
backend_url = get_redis_endpoint(1)
elif backend_protocol == "s3":
backend_url = "s3://"
if aws_role is None:
aws_session = session(infra_config().profile_ml_worker)
if infra_config().cloud_provider == "aws":
if aws_role is None:
aws_session = session(infra_config().profile_ml_worker)
else:
aws_session = session(aws_role)
out_conf_changes.update(
{
"s3_boto3_session": aws_session,
"s3_bucket": s3_bucket,
"s3_base_path": s3_base_path,
}
)
else:
aws_session = session(aws_role)
out_conf_changes.update(
{
"s3_boto3_session": aws_session,
"s3_bucket": s3_bucket,
"s3_base_path": s3_base_path,
}
)
logger.info(
"Non-AWS deployment, using environment variables for S3 backend credentials"
)
out_conf_changes.update(
{
"s3_bucket": s3_bucket,
"s3_base_path": s3_base_path,
}
)
elif backend_protocol == "abs":
backend_url = f"azureblockblob://{os.getenv('ABS_ACCOUNT_NAME')}"
else: