diff --git a/task-sdk/src/airflow/sdk/coordinators/_subprocess.py b/task-sdk/src/airflow/sdk/coordinators/_subprocess.py index bac9f828b78d8..9550fdd0bc438 100644 --- a/task-sdk/src/airflow/sdk/coordinators/_subprocess.py +++ b/task-sdk/src/airflow/sdk/coordinators/_subprocess.py @@ -41,6 +41,7 @@ import psutil import structlog +from airflow.sdk.configuration import conf from airflow.sdk.execution_time.coordinator import BaseCoordinator from airflow.sdk.execution_time.supervisor import ActivitySubprocess, NeverRaised, ProcessTracker @@ -293,6 +294,15 @@ def start( # type: ignore[override] stdout_r, stdout_w = tracker.track(*socket.socketpair()) stderr_r, stderr_w = tracker.track(*socket.socketpair()) + # A language SDK runtime cannot read Airflow's config, so propagate the + # resolved log levels via the environment at launch. StartupDetails + # arrives too late, the logs might already be produced by then. + env = { + **os.environ, + "AIRFLOW__LOGGING__LOGGING_LEVEL": conf.get("logging", "logging_level", fallback="INFO"), + "AIRFLOW__LOGGING__NAMESPACE_LEVELS": conf.get("logging", "namespace_levels", fallback=""), + } + proc = subprocess.Popen( [ *command, @@ -301,6 +311,7 @@ def start( # type: ignore[override] ], stdout=stdout_w.fileno(), stderr=stderr_w.fileno(), + env=env, ) tracker.track(proc) for soc in tracker.untrack(stdout_w, stderr_w): diff --git a/task-sdk/tests/task_sdk/coordinators/test_subprocess.py b/task-sdk/tests/task_sdk/coordinators/test_subprocess.py index 8bdd905c6b059..5a89c73e780d4 100644 --- a/task-sdk/tests/task_sdk/coordinators/test_subprocess.py +++ b/task-sdk/tests/task_sdk/coordinators/test_subprocess.py @@ -45,6 +45,7 @@ from airflow.sdk.execution_time.coordinator import BaseCoordinator from airflow.sdk.execution_time.supervisor import ActivitySubprocess +from tests_common.test_utils.config import conf_vars from tests_common.test_utils.version_compat import AIRFLOW_V_3_3_PLUS if not AIRFLOW_V_3_3_PLUS: @@ -763,6 +764,27 @@ def test_on_child_started_called(self, mock_client): assert kwargs["ti"] is ti assert kwargs["dag_rel_path"] == "bundle" + @conf_vars({("logging", "logging_level"): "DEBUG"}) + def test_resolved_log_level_passed_to_subprocess_env(self, mock_client): + """A language SDK runtime gets the resolved task log level via the environment at launch.""" + _, popen_mock, _ = self._start_with_mocks(mock_client, command=["/bin/true"]) + env = popen_mock.call_args.kwargs["env"] + assert env["AIRFLOW__LOGGING__LOGGING_LEVEL"] == "DEBUG" + + @conf_vars({("logging", "namespace_levels"): "sqlalchemy=INFO, botocore=WARNING"}) + def test_namespace_levels_passed_to_subprocess_env(self, mock_client): + """Per-logger levels are propagated verbatim for the runtime to parse.""" + _, popen_mock, _ = self._start_with_mocks(mock_client, command=["/bin/true"]) + env = popen_mock.call_args.kwargs["env"] + assert env["AIRFLOW__LOGGING__NAMESPACE_LEVELS"] == "sqlalchemy=INFO, botocore=WARNING" + + @conf_vars({("logging", "namespace_levels"): ""}) + def test_namespace_levels_omitted_when_unset(self, mock_client): + """An empty value has no pairs to parse, so the variable is left out.""" + _, popen_mock, _ = self._start_with_mocks(mock_client, command=["/bin/true"]) + env = popen_mock.call_args.kwargs["env"] + assert env["AIRFLOW__LOGGING__NAMESPACE_LEVELS"] == "" + def test_register_pipe_readers_called_with_four_sockets(self, mock_client): """Both socketpair read-ends and both TCP sockets must be registered, with a data kwarg.""" with (