6 changes: 6 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,9 @@
## dbt-databricks 1.12.0 (TBD)

### Features

- Enable notebook-scoped Python package installation

## dbt-databricks 1.11.5 (TBD)

### Fixes
17 changes: 17 additions & 0 deletions dbt/adapters/databricks/python_models/python_config.py
@@ -24,6 +24,14 @@ class Config:
extra = "allow"


class PythonPackagesConfig(BaseModel):
"""Pydantic model for python packages configuration."""

packages: list[str]
notebook_scoped: bool
index_url: Optional[str] = None


class PythonModelConfig(BaseModel):
"""
Pydantic model for a Python model configuration.
@@ -42,6 +50,7 @@ class PythonModelConfig(BaseModel):
cluster_id: Optional[str] = None
http_path: Optional[str] = None
create_notebook: bool = False
notebook_scoped_libraries: bool = False
environment_key: Optional[str] = None
environment_dependencies: list[str] = Field(default_factory=list)

@@ -69,6 +78,14 @@ def validate_notebook_permissions(cls, v: list[dict[str, str]]) -> list[dict[str
)
return v

@property
def python_packages_config(self) -> PythonPackagesConfig:
return PythonPackagesConfig(
packages=self.packages,
index_url=self.index_url,
notebook_scoped=self.notebook_scoped_libraries,
)


class ParsedPythonModel(BaseModel):
"""Pydantic model for a Python model parsed from a dbt manifest"""
93 changes: 82 additions & 11 deletions dbt/adapters/databricks/python_models/python_submissions.py
@@ -9,20 +9,55 @@
from dbt.adapters.databricks.api_client import CommandExecution, DatabricksApiClient, WorkflowJobApi
from dbt.adapters.databricks.credentials import DatabricksCredentials
from dbt.adapters.databricks.logging import logger
from dbt.adapters.databricks.python_models.python_config import ParsedPythonModel
from dbt.adapters.databricks.python_models.python_config import (
ParsedPythonModel,
PythonPackagesConfig,
)
from dbt.adapters.databricks.python_models.run_tracking import PythonRunTracker

DEFAULT_TIMEOUT = 60 * 60 * 24
NOTEBOOK_SEPARATOR = "\n\n# COMMAND ----------\n\n"


class PythonSubmitter(ABC):
"""Interface for submitting Python models to run on Databricks."""

def __init__(self, packages_config: PythonPackagesConfig) -> None:
self.packages_config = packages_config

@abstractmethod
def submit(self, compiled_code: str) -> None:
"""Submit the compiled code to Databricks."""
pass

def _prepare_code_with_notebook_scoped_packages(
self, compiled_code: str, separator: str = NOTEBOOK_SEPARATOR
) -> str:
"""
Prepend notebook-scoped package installation commands to the compiled code.

If the notebook-scoped flag is not set, or there are no packages to install,
the original compiled code is returned unchanged.
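
Example (illustrative): with packages=["pandas"] and notebook_scoped=True,
the returned string starts with a %pip install command for the packages,
followed by dbutils.library.restartPython(), each part separated by
NOTEBOOK_SEPARATOR.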
"""
if not self.packages_config.packages or not self.packages_config.notebook_scoped:
return compiled_code

index_url = (
f"--index-url {self.packages_config.index_url}"
if self.packages_config.index_url
else ""
)
# Build the %pip install command for notebook-scoped packages
packages = " ".join(self.packages_config.packages)
pip_install_cmd = f"%pip install {index_url} -q {packages}"
logger.debug(f"Adding notebook-scoped package installation: {pip_install_cmd}")

# Restart Python so the newly installed packages are picked up (needed on Databricks Runtime 13.0 and above)
restart_cmd = "dbutils.library.restartPython()"

# Prepend the pip install command to the compiled code
return f"{pip_install_cmd}{separator}{restart_cmd}{separator}{compiled_code}"


class BaseDatabricksHelper(PythonJobHelper):
"""Base helper for python models on Databricks."""
@@ -63,16 +98,24 @@ class PythonCommandSubmitter(PythonSubmitter):
"""Submitter for Python models using the Command API."""

def __init__(
self, api_client: DatabricksApiClient, tracker: PythonRunTracker, cluster_id: str
self,
api_client: DatabricksApiClient,
tracker: PythonRunTracker,
cluster_id: str,
parsed_model: ParsedPythonModel,
) -> None:
self.api_client = api_client
self.tracker = tracker
self.cluster_id = cluster_id
super().__init__(parsed_model.config.python_packages_config)

@override
def submit(self, compiled_code: str) -> None:
logger.debug("Submitting Python model using the Command API.")

# Prepare code with notebook-scoped package installation if needed
compiled_code = self._prepare_code_with_notebook_scoped_packages(compiled_code)

context_id = self.api_client.command_contexts.create(self.cluster_id)
command_exec: Optional[CommandExecution] = None
try:
@@ -252,16 +295,24 @@ def get_library_config(
packages: list[str],
index_url: Optional[str],
additional_libraries: list[dict[str, Any]],
notebook_scoped_libraries: bool = False,
) -> dict[str, Any]:
"""Update the job configuration with the required libraries."""
"""
Update the job configuration with the required libraries.

If notebook_scoped_libraries is True, packages are not included in the library config
as they will be installed via %pip install in the notebook itself.
"""

libraries = []

for package in packages:
if index_url:
libraries.append({"pypi": {"package": package, "repo": index_url}})
else:
libraries.append({"pypi": {"package": package}})
# Only add packages to cluster-level libraries if not using notebook-scoped
if not notebook_scoped_libraries:
for package in packages:
if index_url:
[Review comment (Collaborator)]
Let us look to add support for index_url in notebook-scoped packages as well. Since this is supported today, it would be odd if a model silently broke when a user who has set it switched to notebook-scoped packages.

[Review reply (@fedemgp, Author, Feb 11, 2026)]
Already added support for index-url in notebook-scoped packages here.
libraries.append({"pypi": {"package": package, "repo": index_url}})
else:
libraries.append({"pypi": {"package": package}})

for library in additional_libraries:
libraries.append(library)
@@ -286,7 +337,10 @@ def __init__(
packages = parsed_model.config.packages
index_url = parsed_model.config.index_url
additional_libraries = parsed_model.config.additional_libs
library_config = get_library_config(packages, index_url, additional_libraries)
notebook_scoped_libraries = parsed_model.config.notebook_scoped_libraries
library_config = get_library_config(
packages, index_url, additional_libraries, notebook_scoped_libraries
)
self.cluster_spec = {**cluster_spec, **library_config}
self.job_grants = parsed_model.config.python_job_config.grants
self.additional_job_settings = parsed_model.config.python_job_config.dict()
@@ -335,11 +389,14 @@ def __init__(
tracker: PythonRunTracker,
uploader: PythonNotebookUploader,
config_compiler: PythonJobConfigCompiler,
parsed_model: ParsedPythonModel,
) -> None:
self.api_client = api_client
self.tracker = tracker
self.uploader = uploader
self.config_compiler = config_compiler
self.parsed_model = parsed_model
super().__init__(parsed_model.config.python_packages_config)

@staticmethod
def create(
@@ -356,12 +413,17 @@ def create(
parsed_model,
cluster_spec,
)
return PythonNotebookSubmitter(api_client, tracker, notebook_uploader, config_compiler)
return PythonNotebookSubmitter(
api_client, tracker, notebook_uploader, config_compiler, parsed_model
)

@override
def submit(self, compiled_code: str) -> None:
logger.debug("Submitting Python model using the Job Run API.")

# Prepare code with notebook-scoped package installation if needed
compiled_code = self._prepare_code_with_notebook_scoped_packages(compiled_code)

file_path = self.uploader.upload(compiled_code)
job_config = self.config_compiler.compile(file_path)

@@ -444,7 +506,12 @@ def build_submitter(self) -> PythonSubmitter:
{"existing_cluster_id": self.cluster_id},
)
else:
return PythonCommandSubmitter(self.api_client, self.tracker, self.cluster_id or "")
return PythonCommandSubmitter(
self.api_client,
self.tracker,
self.cluster_id or "",
self.parsed_model,
)

@override
def validate_config(self) -> None:
@@ -572,6 +639,7 @@ def __init__(
workflow_creater: PythonWorkflowCreator,
job_grants: dict[str, list[dict[str, str]]],
acls: list[dict[str, str]],
parsed_model: ParsedPythonModel,
) -> None:
self.api_client = api_client
self.tracker = tracker
@@ -581,6 +649,7 @@ def __init__(
self.workflow_creater = workflow_creater
self.job_grants = job_grants
self.acls = acls
super().__init__(parsed_model.config.python_packages_config)

@staticmethod
def create(
@@ -599,6 +668,7 @@ def create(
workflow_creater,
parsed_model.config.python_job_config.grants,
parsed_model.config.access_control_list,
parsed_model,
)

@override
@@ -611,6 +681,7 @@ def submit(self, compiled_code: str) -> None:
logger.debug(
f"[Workflow Debug] Compiled code preview: {compiled_code[:preview_len]}..."
)
compiled_code = self._prepare_code_with_notebook_scoped_packages(compiled_code)

file_path = self.uploader.upload(compiled_code)
logger.debug(f"[Workflow Debug] Uploaded notebook to: {file_path}")
64 changes: 64 additions & 0 deletions docs/workflow-job-submission.md
@@ -70,6 +70,11 @@ models:
runtime_engine: "{{ var('job_cluster_defaults.runtime_engine') }}"
data_security_mode: "{{ var('job_cluster_defaults.data_security_mode') }}"
autoscale: { "min_workers": 1, "max_workers": 4 }

# Python package configuration
packages: ["pandas", "numpy==1.24.0"]
index_url: "https://pypi.org/simple" # Optional custom PyPI index
notebook_scoped_libraries: false # Set to true for notebook-scoped installation
```

### Configuration
@@ -173,6 +178,65 @@ grants:
manage: []
```

#### Python Packages

You can install Python packages for your models using the `packages` configuration. There are two ways to install packages:

##### Cluster-level installation (default)

By default, packages are installed at the cluster level using Databricks libraries. This is the traditional approach where packages are installed when the cluster starts.

```yaml
models:
- name: my_model
config:
packages: ["pandas", "numpy==1.24.0", "scikit-learn>=1.0"]
index_url: "https://pypi.org/simple" # Optional: custom PyPI index
notebook_scoped_libraries: false # Default behavior
```

**Benefits:**
- Packages are available for the entire cluster lifecycle
- Faster model execution (no installation overhead per run)

**Limitations:**
- Requires cluster restart to update packages
- All tasks on the cluster share the same package versions
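
For reference, in this mode each package becomes a PyPI library entry in the job's cluster specification. A sketch of the entries built by `get_library_config` for the configuration above (the surrounding job payload is simplified):

```python
# One PyPI entry per package; index_url, when set, maps to the "repo" field.
[
    {"pypi": {"package": "pandas", "repo": "https://pypi.org/simple"}},
    {"pypi": {"package": "numpy==1.24.0", "repo": "https://pypi.org/simple"}},
    {"pypi": {"package": "scikit-learn>=1.0", "repo": "https://pypi.org/simple"}},
]
```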

##### Notebook-scoped installation

When `notebook_scoped_libraries: true`, packages are installed at the notebook level using `%pip install` magic commands. This prepends installation commands to your compiled code.

```yaml
models:
- name: my_model
config:
packages: ["pandas", "numpy==1.24.0", "scikit-learn>=1.0"]
index_url: "https://pypi.org/simple" # Optional: custom PyPI index
notebook_scoped_libraries: true # Enable notebook-scoped installation
```

**Benefits:**
- Packages are installed per model execution
- No cluster restart required to change packages
- Different models can use different package versions
- Works with serverless compute and all-purpose clusters

**How it works:**
The adapter prepends the following commands to your model code:
```python
%pip install -q pandas numpy==1.24.0 scikit-learn>=1.0
dbutils.library.restartPython()
# Your model code follows...
```
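
If `index_url` is set, it is passed through to the generated command as well (shown with the example configuration above):

```python
%pip install --index-url https://pypi.org/simple -q pandas numpy==1.24.0 scikit-learn>=1.0
dbutils.library.restartPython()
```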

**Supported submission methods:**
- `all_purpose_cluster` (Command API)
- `job_cluster` (Notebook Job Run)
- `workflow_job` (Workflow Job)

**Note:** `dbutils.library.restartPython()` is automatically added after package installation to ensure the packages are properly loaded (this restart is needed on Databricks Runtime 13.0 and above).

#### Post hooks

It is possible to add in python hooks by using the `config.python_job_config.post_hook_tasks`
16 changes: 16 additions & 0 deletions tests/unit/python/test_python_config.py
@@ -85,6 +85,22 @@ def test_parsed_model__valid_model_config(self):
assert config.http_path == "http_path"
assert config.create_notebook is True

def test_parsed_model__valid_python_packages_config(self):
parsed_model = {
"alias": "test",
"config": {
"packages": ["package"],
"index_url": "index_url",
"notebook_scoped_libraries": True,
},
}

model = ParsedPythonModel(**parsed_model)
config = model.config.python_packages_config
assert config.packages == ["package"]
assert config.index_url == "index_url"
assert config.notebook_scoped is True

def test_parsed_model__extra_model_config(self):
parsed_model = {
"alias": "test",
Expand Down