diff --git a/CHANGELOG.md b/CHANGELOG.md index 985eaa24d..10a90d0bb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,9 @@ +## dbt-databricks 1.12.0 (TBD) + +### Features + +- Enable Notebook scoped python packages installation + ## dbt-databricks 1.11.5 (TBD) ### Fixes diff --git a/dbt/adapters/databricks/python_models/python_config.py b/dbt/adapters/databricks/python_models/python_config.py index f00164ab3..264176b10 100644 --- a/dbt/adapters/databricks/python_models/python_config.py +++ b/dbt/adapters/databricks/python_models/python_config.py @@ -24,6 +24,14 @@ class Config: extra = "allow" +class PythonPackagesConfig(BaseModel): + """Pydantic model for python packages configuration.""" + + packages: list[str] + notebook_scoped: bool + index_url: Optional[str] = None + + class PythonModelConfig(BaseModel): """ Pydantic model for a Python model configuration. @@ -42,6 +50,7 @@ class PythonModelConfig(BaseModel): cluster_id: Optional[str] = None http_path: Optional[str] = None create_notebook: bool = False + notebook_scoped_libraries: bool = False environment_key: Optional[str] = None environment_dependencies: list[str] = Field(default_factory=list) @@ -69,6 +78,14 @@ def validate_notebook_permissions(cls, v: list[dict[str, str]]) -> list[dict[str ) return v + @property + def python_packages_config(self) -> PythonPackagesConfig: + return PythonPackagesConfig( + packages=self.packages, + index_url=self.index_url, + notebook_scoped=self.notebook_scoped_libraries, + ) + class ParsedPythonModel(BaseModel): """Pydantic model for a Python model parsed from a dbt manifest""" diff --git a/dbt/adapters/databricks/python_models/python_submissions.py b/dbt/adapters/databricks/python_models/python_submissions.py index 23c2a27c1..db1742513 100644 --- a/dbt/adapters/databricks/python_models/python_submissions.py +++ b/dbt/adapters/databricks/python_models/python_submissions.py @@ -9,20 +9,55 @@ from dbt.adapters.databricks.api_client import CommandExecution, 
DatabricksApiClient, WorkflowJobApi from dbt.adapters.databricks.credentials import DatabricksCredentials from dbt.adapters.databricks.logging import logger -from dbt.adapters.databricks.python_models.python_config import ParsedPythonModel +from dbt.adapters.databricks.python_models.python_config import ( + ParsedPythonModel, + PythonPackagesConfig, +) from dbt.adapters.databricks.python_models.run_tracking import PythonRunTracker DEFAULT_TIMEOUT = 60 * 60 * 24 +NOTEBOOK_SEPARATOR = "\n\n# COMMAND ----------\n\n" class PythonSubmitter(ABC): """Interface for submitting Python models to run on Databricks.""" + def __init__(self, packages_config: PythonPackagesConfig) -> None: + self.packages_config = packages_config + @abstractmethod def submit(self, compiled_code: str) -> None: """Submit the compiled code to Databricks.""" pass + def _prepare_code_with_notebook_scoped_packages( + self, compiled_code: str, separator: str = NOTEBOOK_SEPARATOR + ) -> str: + """ + Prepend notebook-scoped package installation commands to the compiled code. + + If notebook-scoped flag is not set, or if there are no packages to install, + returns the original compiled code. 
+ """ + if not self.packages_config.packages or not self.packages_config.notebook_scoped: + return compiled_code + + index_url = ( + f"--index-url {self.packages_config.index_url}" + if self.packages_config.index_url + else "" + ) + # Build the %pip install command for notebook-scoped packages + packages = " ".join(self.packages_config.packages) + pip_install_cmd = f"%pip install {index_url} -q {packages}" + logger.debug(f"Adding notebook-scoped package installation: {pip_install_cmd}") + + # Add extra restart python command for Databricks runtimes 13.0 and above + restart_cmd = "dbutils.library.restartPython()" + + # Prepend the pip install command to the compiled code + return f"{pip_install_cmd}{separator}{restart_cmd}{separator}{compiled_code}" + class BaseDatabricksHelper(PythonJobHelper): """Base helper for python models on Databricks.""" @@ -63,16 +98,24 @@ class PythonCommandSubmitter(PythonSubmitter): """Submitter for Python models using the Command API.""" def __init__( - self, api_client: DatabricksApiClient, tracker: PythonRunTracker, cluster_id: str + self, + api_client: DatabricksApiClient, + tracker: PythonRunTracker, + cluster_id: str, + parsed_model: ParsedPythonModel, ) -> None: self.api_client = api_client self.tracker = tracker self.cluster_id = cluster_id + super().__init__(parsed_model.config.python_packages_config) @override def submit(self, compiled_code: str) -> None: logger.debug("Submitting Python model using the Command API.") + # Prepare code with notebook-scoped package installation if needed + compiled_code = self._prepare_code_with_notebook_scoped_packages(compiled_code) + context_id = self.api_client.command_contexts.create(self.cluster_id) command_exec: Optional[CommandExecution] = None try: @@ -252,16 +295,24 @@ def get_library_config( packages: list[str], index_url: Optional[str], additional_libraries: list[dict[str, Any]], + notebook_scoped_libraries: bool = False, ) -> dict[str, Any]: - """Update the job configuration with the 
required libraries.""" + """ + Update the job configuration with the required libraries. + + If notebook_scoped_libraries is True, packages are not included in the library config + as they will be installed via %pip install in the notebook itself. + """ libraries = [] - for package in packages: - if index_url: - libraries.append({"pypi": {"package": package, "repo": index_url}}) - else: - libraries.append({"pypi": {"package": package}}) + # Only add packages to cluster-level libraries if not using notebook-scoped + if not notebook_scoped_libraries: + for package in packages: + if index_url: + libraries.append({"pypi": {"package": package, "repo": index_url}}) + else: + libraries.append({"pypi": {"package": package}}) for library in additional_libraries: libraries.append(library) @@ -286,7 +337,10 @@ def __init__( packages = parsed_model.config.packages index_url = parsed_model.config.index_url additional_libraries = parsed_model.config.additional_libs - library_config = get_library_config(packages, index_url, additional_libraries) + notebook_scoped_libraries = parsed_model.config.notebook_scoped_libraries + library_config = get_library_config( + packages, index_url, additional_libraries, notebook_scoped_libraries + ) self.cluster_spec = {**cluster_spec, **library_config} self.job_grants = parsed_model.config.python_job_config.grants self.additional_job_settings = parsed_model.config.python_job_config.dict() @@ -335,11 +389,14 @@ def __init__( tracker: PythonRunTracker, uploader: PythonNotebookUploader, config_compiler: PythonJobConfigCompiler, + parsed_model: ParsedPythonModel, ) -> None: self.api_client = api_client self.tracker = tracker self.uploader = uploader self.config_compiler = config_compiler + self.parsed_model = parsed_model + super().__init__(parsed_model.config.python_packages_config) @staticmethod def create( @@ -356,12 +413,17 @@ def create( parsed_model, cluster_spec, ) - return PythonNotebookSubmitter(api_client, tracker, notebook_uploader, 
config_compiler) + return PythonNotebookSubmitter( + api_client, tracker, notebook_uploader, config_compiler, parsed_model + ) @override def submit(self, compiled_code: str) -> None: logger.debug("Submitting Python model using the Job Run API.") + # Prepare code with notebook-scoped package installation if needed + compiled_code = self._prepare_code_with_notebook_scoped_packages(compiled_code) + file_path = self.uploader.upload(compiled_code) job_config = self.config_compiler.compile(file_path) @@ -444,7 +506,12 @@ def build_submitter(self) -> PythonSubmitter: {"existing_cluster_id": self.cluster_id}, ) else: - return PythonCommandSubmitter(self.api_client, self.tracker, self.cluster_id or "") + return PythonCommandSubmitter( + self.api_client, + self.tracker, + self.cluster_id or "", + self.parsed_model, + ) @override def validate_config(self) -> None: @@ -572,6 +639,7 @@ def __init__( workflow_creater: PythonWorkflowCreator, job_grants: dict[str, list[dict[str, str]]], acls: list[dict[str, str]], + parsed_model: ParsedPythonModel, ) -> None: self.api_client = api_client self.tracker = tracker @@ -581,6 +649,7 @@ def __init__( self.workflow_creater = workflow_creater self.job_grants = job_grants self.acls = acls + super().__init__(parsed_model.config.python_packages_config) @staticmethod def create( @@ -599,6 +668,7 @@ def create( workflow_creater, parsed_model.config.python_job_config.grants, parsed_model.config.access_control_list, + parsed_model, ) @override @@ -611,6 +681,7 @@ def submit(self, compiled_code: str) -> None: logger.debug( f"[Workflow Debug] Compiled code preview: {compiled_code[:preview_len]}..." 
) + compiled_code = self._prepare_code_with_notebook_scoped_packages(compiled_code) file_path = self.uploader.upload(compiled_code) logger.debug(f"[Workflow Debug] Uploaded notebook to: {file_path}") diff --git a/docs/workflow-job-submission.md b/docs/workflow-job-submission.md index 8e607801f..3cdeafceb 100644 --- a/docs/workflow-job-submission.md +++ b/docs/workflow-job-submission.md @@ -70,6 +70,11 @@ models: runtime_engine: "{{ var('job_cluster_defaults.runtime_engine') }}" data_security_mode: "{{ var('job_cluster_defaults.data_security_mode') }}" autoscale: { "min_workers": 1, "max_workers": 4 } + + # Python package configuration + packages: ["pandas", "numpy==1.24.0"] + index_url: "https://pypi.org/simple" # Optional custom PyPI index + notebook_scoped_libraries: false # Set to true for notebook-scoped installation ``` ### Configuration @@ -173,6 +178,65 @@ grants: manage: [] ``` +#### Python Packages + +You can install Python packages for your models using the `packages` configuration. There are two ways to install packages: + +##### Cluster-level installation (default) + +By default, packages are installed at the cluster level using Databricks libraries. This is the traditional approach where packages are installed when the cluster starts. + +```yaml +models: + - name: my_model + config: + packages: ["pandas", "numpy==1.24.0", "scikit-learn>=1.0"] + index_url: "https://pypi.org/simple" # Optional: custom PyPI index + notebook_scoped_libraries: false # Default behavior +``` + +**Benefits:** +- Packages are available for the entire cluster lifecycle +- Faster model execution (no installation overhead per run) + +**Limitations:** +- Requires cluster restart to update packages +- All tasks on the cluster share the same package versions + +##### Notebook-scoped installation + +When `notebook_scoped_libraries: true`, packages are installed at the notebook level using `%pip install` magic commands. This prepends installation commands to your compiled code. 
+ +```yaml +models: + - name: my_model + config: + packages: ["pandas", "numpy==1.24.0", "scikit-learn>=1.0"] + index_url: "https://pypi.org/simple" # Optional: custom PyPI index + notebook_scoped_libraries: true # Enable notebook-scoped installation +``` + +**Benefits:** +- Packages are installed per model execution +- No cluster restart required to change packages +- Different models can use different package versions +- Works with serverless compute and all-purpose clusters + +**How it works:** +The adapter prepends the following commands to your model code, each separated from the next by a `# COMMAND ----------` notebook cell separator (the `--index-url` option is only included when `index_url` is configured): +```python +%pip install --index-url https://pypi.org/simple -q pandas numpy==1.24.0 scikit-learn>=1.0 +dbutils.library.restartPython() +# Your model code follows... +``` + +**Supported submission methods:** +- `all_purpose_cluster` (Command API) +- `job_cluster` (Notebook Job Run) +- `workflow_job` (Workflow Job) + +**Note:** `dbutils.library.restartPython()` is always added after package installation, regardless of the runtime version, to ensure packages are properly loaded; the restart is intended for Databricks Runtime 13.0 and above. 
+ #### Post hooks It is possible to add in python hooks by using the `config.python_job_config.post_hook_tasks` diff --git a/tests/unit/python/test_python_config.py b/tests/unit/python/test_python_config.py index abda6587a..46840f045 100644 --- a/tests/unit/python/test_python_config.py +++ b/tests/unit/python/test_python_config.py @@ -85,6 +85,22 @@ def test_parsed_model__valid_model_config(self): assert config.http_path == "http_path" assert config.create_notebook is True + def test_parsed_model__valid_python_packages_config(self): + parsed_model = { + "alias": "test", + "config": { + "packages": ["package"], + "index_url": "index_url", + "notebook_scoped_libraries": True, + }, + } + + model = ParsedPythonModel(**parsed_model) + config = model.config.python_packages_config + assert config.packages == ["package"] + assert config.index_url == "index_url" + assert config.notebook_scoped is True + def test_parsed_model__extra_model_config(self): parsed_model = { "alias": "test", diff --git a/tests/unit/python/test_python_job_support.py b/tests/unit/python/test_python_job_support.py index 50536589a..9763ea17b 100644 --- a/tests/unit/python/test_python_job_support.py +++ b/tests/unit/python/test_python_job_support.py @@ -40,6 +40,8 @@ def uploader(self, client, parsed_model, identifier): parsed_model.catalog = "catalog" parsed_model.schema_ = "schema" parsed_model.identifier = identifier + parsed_model.config.notebook_scoped_libraries = False + parsed_model.config.packages = [] return PythonNotebookUploader(client, parsed_model) def test_upload__golden_path(self, uploader, client, compiled_code, workdir, identifier): @@ -220,6 +222,29 @@ def test_get_library_config__packages_libraries(self): ] } + def test_get_library_config__notebook_scoped_packages_excluded(self): + config = python_submissions.get_library_config( + ["package1", "package2"], None, [], notebook_scoped_libraries=True + ) + assert config == {"libraries": []} + + def 
test_get_library_config__notebook_scoped_with_additional_libs(self): + config = python_submissions.get_library_config( + ["package1", "package2"], + None, + [{"jar": "s3://mybucket/myjar.jar"}], + notebook_scoped_libraries=True, + ) + assert config == {"libraries": [{"jar": "s3://mybucket/myjar.jar"}]} + + def test_get_library_config__notebook_scoped_false_includes_packages(self): + config = python_submissions.get_library_config( + ["package1", "package2"], None, [], notebook_scoped_libraries=False + ) + assert config == { + "libraries": [{"pypi": {"package": "package1"}}, {"pypi": {"package": "package2"}}] + } + class TestPythonJobConfigCompiler: @pytest.fixture @@ -232,6 +257,7 @@ def run_name(self, parsed_model): parsed_model.run_name = run_name parsed_model.config.packages = [] parsed_model.config.additional_libs = [] + parsed_model.config.notebook_scoped_libraries = False return run_name @pytest.fixture @@ -384,3 +410,38 @@ def test_compile__user_environments_override_auto_generated( assert details.additional_job_config["environments"][0]["spec"]["dependencies"] == [ "requests" ] + + def test_compile__notebook_scoped_libraries_excludes_packages( + self, client, permission_builder, parsed_model, run_name + ): + parsed_model.config.packages = ["pandas", "numpy"] + parsed_model.config.index_url = None + parsed_model.config.notebook_scoped_libraries = True + parsed_model.config.environment_key = None + parsed_model.config.python_job_config.dict.return_value = {} + + permission_builder.build_job_permissions.return_value = [] + compiler = PythonJobConfigCompiler(client, permission_builder, parsed_model, {}) + details = compiler.compile("path") + + # Libraries should be empty since packages are notebook-scoped + assert details.job_spec["libraries"] == [] + + def test_compile__notebook_scoped_false_includes_packages( + self, client, permission_builder, parsed_model, run_name + ): + parsed_model.config.packages = ["pandas", "numpy"] + parsed_model.config.index_url = 
None + parsed_model.config.notebook_scoped_libraries = False + parsed_model.config.environment_key = None + parsed_model.config.python_job_config.dict.return_value = {} + + permission_builder.build_job_permissions.return_value = [] + compiler = PythonJobConfigCompiler(client, permission_builder, parsed_model, {}) + details = compiler.compile("path") + + # Libraries should include packages + assert details.job_spec["libraries"] == [ + {"pypi": {"package": "pandas"}}, + {"pypi": {"package": "numpy"}}, + ] diff --git a/tests/unit/python/test_python_submitters.py b/tests/unit/python/test_python_submitters.py index 9a47e6af5..e02b79dc4 100644 --- a/tests/unit/python/test_python_submitters.py +++ b/tests/unit/python/test_python_submitters.py @@ -3,6 +3,7 @@ import pytest from dbt_common.exceptions import DbtRuntimeError +from dbt.adapters.databricks.python_models.python_config import ParsedPythonModel, PythonModelConfig from dbt.adapters.databricks.python_models.python_submissions import ( PythonCommandSubmitter, PythonJobConfigCompiler, @@ -44,15 +45,22 @@ def uploader(): return Mock() +@pytest.fixture +def parsed_model(): + return ParsedPythonModel( + catalog="hive_metastore", schema="default", identifier="alias", config=PythonModelConfig() + ) + + class TestPythonCommandSubmitter: @pytest.fixture def cluster_id(self): return "cluster_id" @pytest.fixture - def submitter(self, client, tracker, cluster_id, context_id): + def submitter(self, client, tracker, cluster_id, context_id, parsed_model): client.command_contexts.create.return_value = context_id - return PythonCommandSubmitter(client, tracker, cluster_id) + return PythonCommandSubmitter(client, tracker, cluster_id, parsed_model) @pytest.fixture def context_id(self): @@ -87,11 +95,75 @@ def test_submit__poll_fails__cleans_up( client.command_contexts.destroy.assert_called_once_with(cluster_id, context_id) tracker.remove_command.assert_called_once_with(command_exec) + def test_submit__with_packages(self, client, 
tracker, cluster_id, context_id, compiled_code): + client.command_contexts.create.return_value = context_id + parsed_model = Mock() + parsed_model.config = PythonModelConfig( + packages=["pandas", "numpy==1.24.0", "scikit-learn>=1.0"], + notebook_scoped_libraries=True, + ) + submitter = PythonCommandSubmitter(client, tracker, cluster_id, parsed_model) + + command_exec = client.commands.execute.return_value + submitter.submit(compiled_code) + + # Verify the code includes the pip install command + expected_code = [ + "%pip install -q pandas numpy==1.24.0 scikit-learn>=1.0", + "dbutils.library.restartPython()", + "compiled_code", + ] + expected_code = "\n\n# COMMAND ----------\n\n".join(expected_code) + client.commands.execute.assert_called_once_with(cluster_id, context_id, expected_code) + client.commands.poll_for_completion.assert_called_once_with(command_exec) + tracker.remove_command.assert_called_once_with(command_exec) + + def test_submit__with_packages_and_index_url( + self, client, tracker, cluster_id, context_id, compiled_code + ): + client.command_contexts.create.return_value = context_id + parsed_model = Mock() + parsed_model.config = PythonModelConfig( + packages=["pandas"], + notebook_scoped_libraries=True, + index_url="https://example.com/pypi/simple", + ) + submitter = PythonCommandSubmitter(client, tracker, cluster_id, parsed_model) + + command_exec = client.commands.execute.return_value + submitter.submit(compiled_code) + + # Verify the code includes the pip install command + expected_code = [ + "%pip install --index-url https://example.com/pypi/simple -q pandas", + "dbutils.library.restartPython()", + "compiled_code", + ] + expected_code = "\n\n# COMMAND ----------\n\n".join(expected_code) + client.commands.execute.assert_called_once_with(cluster_id, context_id, expected_code) + client.commands.poll_for_completion.assert_called_once_with(command_exec) + tracker.remove_command.assert_called_once_with(command_exec) + + def 
test_submit__with_empty_packages( + self, client, tracker, cluster_id, context_id, compiled_code + ): + client.command_contexts.create.return_value = context_id + parsed_model = Mock() + parsed_model.config = PythonModelConfig(packages=[], notebook_scoped_libraries=True) + submitter = PythonCommandSubmitter(client, tracker, cluster_id, parsed_model) + + command_exec = client.commands.execute.return_value + submitter.submit(compiled_code) + + # Verify the code is unchanged + client.commands.execute.assert_called_once_with(cluster_id, context_id, compiled_code) + client.commands.poll_for_completion.assert_called_once_with(command_exec) + class TestPythonNotebookSubmitter: @pytest.fixture - def submitter(self, client, tracker, uploader, config_compiler): - return PythonNotebookSubmitter(client, tracker, uploader, config_compiler) + def submitter(self, client, tracker, uploader, config_compiler, parsed_model): + return PythonNotebookSubmitter(client, tracker, uploader, config_compiler, parsed_model) @pytest.fixture def run_id(self, client): @@ -172,6 +244,105 @@ def test_create__golden_path(self, client, tracker): assert isinstance(submitter.uploader, PythonNotebookUploader) assert isinstance(submitter.config_compiler, PythonJobConfigCompiler) + def test_submit__with_packages(self, client, tracker, uploader, config_compiler, compiled_code): + parsed_model = Mock() + parsed_model.config = PythonModelConfig( + packages=["pandas", "numpy==1.24.0", "scikit-learn>=1.0"], + notebook_scoped_libraries=True, + ) + submitter = PythonNotebookSubmitter( + client, tracker, uploader, config_compiler, parsed_model + ) + + job_config = Mock() + job_config.job_spec = {} + job_config.additional_job_config = {} + config_compiler.compile.return_value = job_config + uploader.upload.return_value = "upload_path" + + submitter.submit(compiled_code) + + # Verify the uploader was called with the modified code + expected_code = [ + "%pip install -q pandas numpy==1.24.0 scikit-learn>=1.0", + 
"dbutils.library.restartPython()", + "compiled_code", + ] + expected_code = "\n\n# COMMAND ----------\n\n".join(expected_code) + uploader.upload.assert_called_once_with(expected_code) + + def test_submit__with_packages_and_index_url( + self, client, tracker, uploader, config_compiler, compiled_code + ): + parsed_model = Mock() + parsed_model.config = PythonModelConfig( + packages=["pandas"], + notebook_scoped_libraries=True, + index_url="https://example.com/pypi/simple", + ) + submitter = PythonNotebookSubmitter( + client, tracker, uploader, config_compiler, parsed_model + ) + + job_config = Mock() + job_config.job_spec = {} + job_config.additional_job_config = {} + config_compiler.compile.return_value = job_config + uploader.upload.return_value = "upload_path" + + submitter.submit(compiled_code) + + # Verify the uploader was called with the modified code + expected_code = [ + "%pip install --index-url https://example.com/pypi/simple -q pandas", + "dbutils.library.restartPython()", + "compiled_code", + ] + expected_code = "\n\n# COMMAND ----------\n\n".join(expected_code) + uploader.upload.assert_called_once_with(expected_code) + + def test_submit__with_empty_packages( + self, client, tracker, uploader, config_compiler, compiled_code + ): + parsed_model = Mock() + parsed_model.config = PythonModelConfig(packages=[], notebook_scoped_libraries=True) + submitter = PythonNotebookSubmitter( + client, tracker, uploader, config_compiler, parsed_model + ) + + job_config = Mock() + job_config.job_spec = {} + job_config.additional_job_config = {} + config_compiler.compile.return_value = job_config + uploader.upload.return_value = "upload_path" + + submitter.submit(compiled_code) + + # Verify the code is unchanged + uploader.upload.assert_called_once_with(compiled_code) + + def test_submit__with_packages_not_notebook_scoped( + self, client, tracker, uploader, config_compiler, compiled_code + ): + parsed_model = Mock() + parsed_model.config = PythonModelConfig( + 
packages=["pandas"], notebook_scoped_libraries=False + ) + submitter = PythonNotebookSubmitter( + client, tracker, uploader, config_compiler, parsed_model + ) + + job_config = Mock() + job_config.job_spec = {} + job_config.additional_job_config = {} + config_compiler.compile.return_value = job_config + uploader.upload.return_value = "upload_path" + + submitter.submit(compiled_code) + + # Verify the code is unchanged when notebook_scoped_libraries is False + uploader.upload.assert_called_once_with(compiled_code) + class TestPythonNotebookWorkflowSubmitter: @pytest.fixture @@ -184,10 +355,25 @@ def workflow_creater(self): @pytest.fixture def submitter( - self, client, tracker, uploader, config_compiler, permission_builder, workflow_creater + self, + client, + tracker, + uploader, + config_compiler, + permission_builder, + workflow_creater, + parsed_model, ): return PythonNotebookWorkflowSubmitter( - client, tracker, uploader, config_compiler, permission_builder, workflow_creater, {}, [] + client, + tracker, + uploader, + config_compiler, + permission_builder, + workflow_creater, + {}, + [], + parsed_model, ) def test_submit__golden_path(self, submitter, compiled_code): @@ -225,3 +411,158 @@ def test_create__golden_path(self, client, tracker): assert isinstance(submitter.uploader, PythonNotebookUploader) assert isinstance(submitter.config_compiler, PythonWorkflowConfigCompiler) assert isinstance(submitter.workflow_creater, PythonWorkflowCreator) + + def test_submit__with_packages(self, client, tracker, uploader, compiled_code): + parsed_model = Mock() + parsed_model.config = PythonModelConfig( + packages=["pandas", "numpy==1.24.0", "scikit-learn>=1.0"], + notebook_scoped_libraries=True, + ) + parsed_model.config.python_job_config.grants = {} + parsed_model.config.access_control_list = [] + + config_compiler = Mock() + permission_builder = Mock() + workflow_creater = Mock() + + submitter = PythonNotebookWorkflowSubmitter( + client, + tracker, + uploader, + 
config_compiler, + permission_builder, + workflow_creater, + {}, + [], + parsed_model, + ) + + uploader.upload.return_value = "upload_path" + config_compiler.compile.return_value = ({}, "existing_job_id") + workflow_creater.create_or_update.return_value = "existing_job_id" + permission_builder.build_job_permissions.return_value = [] + client.workflows.run.return_value = "run_id" + + submitter.submit(compiled_code) + + # Verify the uploader was called with the modified code + expected_code = [ + "%pip install -q pandas numpy==1.24.0 scikit-learn>=1.0", + "dbutils.library.restartPython()", + "compiled_code", + ] + expected_code = "\n\n# COMMAND ----------\n\n".join(expected_code) + uploader.upload.assert_called_once_with(expected_code) + + def test_submit__with_packages_and_index_url(self, client, tracker, uploader, compiled_code): + parsed_model = Mock() + parsed_model.config = PythonModelConfig( + packages=["pandas"], + notebook_scoped_libraries=True, + index_url="https://example.com/pypi/simple", + ) + parsed_model.config.python_job_config.grants = {} + parsed_model.config.access_control_list = [] + + config_compiler = Mock() + permission_builder = Mock() + workflow_creater = Mock() + + submitter = PythonNotebookWorkflowSubmitter( + client, + tracker, + uploader, + config_compiler, + permission_builder, + workflow_creater, + {}, + [], + parsed_model, + ) + + uploader.upload.return_value = "upload_path" + config_compiler.compile.return_value = ({}, "existing_job_id") + workflow_creater.create_or_update.return_value = "existing_job_id" + permission_builder.build_job_permissions.return_value = [] + client.workflows.run.return_value = "run_id" + + submitter.submit(compiled_code) + + # Verify the uploader was called with the modified code + expected_code = [ + "%pip install --index-url https://example.com/pypi/simple -q pandas", + "dbutils.library.restartPython()", + "compiled_code", + ] + expected_code = "\n\n# COMMAND ----------\n\n".join(expected_code) + 
uploader.upload.assert_called_once_with(expected_code) + + def test_submit__with_empty_packages(self, client, tracker, uploader, compiled_code): + parsed_model = Mock() + parsed_model.config = PythonModelConfig(packages=[], notebook_scoped_libraries=True) + parsed_model.config.python_job_config.grants = {} + parsed_model.config.access_control_list = [] + + config_compiler = Mock() + permission_builder = Mock() + workflow_creater = Mock() + + submitter = PythonNotebookWorkflowSubmitter( + client, + tracker, + uploader, + config_compiler, + permission_builder, + workflow_creater, + {}, + [], + parsed_model, + ) + + uploader.upload.return_value = "upload_path" + config_compiler.compile.return_value = ({}, "existing_job_id") + workflow_creater.create_or_update.return_value = "existing_job_id" + permission_builder.build_job_permissions.return_value = [] + client.workflows.run.return_value = "run_id" + + submitter.submit(compiled_code) + + # Verify the code is unchanged + uploader.upload.assert_called_once_with(compiled_code) + + def test_submit__with_packages_not_notebook_scoped( + self, client, tracker, uploader, compiled_code + ): + parsed_model = Mock() + parsed_model.config = PythonModelConfig( + packages=["pandas"], notebook_scoped_libraries=False + ) + parsed_model.config.python_job_config.grants = {} + parsed_model.config.access_control_list = [] + + config_compiler = Mock() + permission_builder = Mock() + workflow_creater = Mock() + + submitter = PythonNotebookWorkflowSubmitter( + client, + tracker, + uploader, + config_compiler, + permission_builder, + workflow_creater, + {}, + [], + parsed_model, + ) + + uploader.upload.return_value = "upload_path" + config_compiler.compile.return_value = ({}, "existing_job_id") + workflow_creater.create_or_update.return_value = "existing_job_id" + permission_builder.build_job_permissions.return_value = [] + client.workflows.run.return_value = "run_id" + + submitter.submit(compiled_code) + + # Verify the code is unchanged when 
notebook_scoped_libraries is False + uploader.upload.assert_called_once_with(compiled_code)