From 03df8dbea5f8eedfaad0fa7a598b949397f40539 Mon Sep 17 00:00:00 2001 From: LIU ZHE YOU Date: Thu, 18 Jun 2026 18:46:45 +0900 Subject: [PATCH 1/4] Propagate resolved task log level to language SDK runtimes A language-SDK runtime (Java, Go) launched by a coordinator cannot read Airflow's configuration, so it could only guess the task log level from inherited environment variables. The subprocess starts before the StartupDetails handshake arrives, so carrying the level in that message would reach it too late. Passing the supervisor-resolved level in the environment at launch makes it available before the runtime configures logging. --- task-sdk/src/airflow/sdk/coordinators/_subprocess.py | 8 ++++++++ task-sdk/tests/task_sdk/coordinators/test_subprocess.py | 8 ++++++++ 2 files changed, 16 insertions(+) diff --git a/task-sdk/src/airflow/sdk/coordinators/_subprocess.py b/task-sdk/src/airflow/sdk/coordinators/_subprocess.py index bac9f828b78d8..b1f92f3ea89f1 100644 --- a/task-sdk/src/airflow/sdk/coordinators/_subprocess.py +++ b/task-sdk/src/airflow/sdk/coordinators/_subprocess.py @@ -41,6 +41,7 @@ import psutil import structlog +from airflow.sdk.configuration import conf from airflow.sdk.execution_time.coordinator import BaseCoordinator from airflow.sdk.execution_time.supervisor import ActivitySubprocess, NeverRaised, ProcessTracker @@ -301,6 +302,13 @@ def start( # type: ignore[override] ], stdout=stdout_w.fileno(), stderr=stderr_w.fileno(), + # A language SDK runtime cannot read Airflow's config, so propagate the + # resolved task log level via the environment at launch. StartupDetails + # arrives too late, the logs might already be produced by then. 
+ env={ + **os.environ, + "AIRFLOW__LOGGING__LOGGING_LEVEL": conf.get("logging", "logging_level", fallback="INFO"), + }, ) tracker.track(proc) for soc in tracker.untrack(stdout_w, stderr_w): diff --git a/task-sdk/tests/task_sdk/coordinators/test_subprocess.py b/task-sdk/tests/task_sdk/coordinators/test_subprocess.py index 8bdd905c6b059..db3b889e6e4b4 100644 --- a/task-sdk/tests/task_sdk/coordinators/test_subprocess.py +++ b/task-sdk/tests/task_sdk/coordinators/test_subprocess.py @@ -45,6 +45,7 @@ from airflow.sdk.execution_time.coordinator import BaseCoordinator from airflow.sdk.execution_time.supervisor import ActivitySubprocess +from tests_common.test_utils.config import conf_vars from tests_common.test_utils.version_compat import AIRFLOW_V_3_3_PLUS if not AIRFLOW_V_3_3_PLUS: @@ -763,6 +764,13 @@ def test_on_child_started_called(self, mock_client): assert kwargs["ti"] is ti assert kwargs["dag_rel_path"] == "bundle" + @conf_vars({("logging", "logging_level"): "DEBUG"}) + def test_resolved_log_level_passed_to_subprocess_env(self, mock_client): + """A language SDK runtime gets the resolved task log level via the environment at launch.""" + _, popen_mock, _ = self._start_with_mocks(mock_client, command=["/bin/true"]) + env = popen_mock.call_args.kwargs["env"] + assert env["AIRFLOW__LOGGING__LOGGING_LEVEL"] == "DEBUG" + def test_register_pipe_readers_called_with_four_sockets(self, mock_client): """Both socketpair read-ends and both TCP sockets must be registered, with a data kwarg.""" with ( From b5e450b4edae06cdddcfcdd28442d73dbb1c3ab7 Mon Sep 17 00:00:00 2001 From: LIU ZHE YOU Date: Fri, 19 Jun 2026 13:30:10 +0900 Subject: [PATCH 2/4] Propagate per-logger namespace levels to language SDK runtimes The root log level alone does not reflect a user's per-logger overrides (`[logging] namespace_levels`), so a runtime would still emit logs the worker is configured to suppress. 
Passing the levels alongside the root level keeps the runtime's filtering consistent with the rest of Airflow. Each language SDK reads and applies these on its own, so the contract is documented for SDK authors. --- .../language-sdks/index.rst | 5 +++++ .../airflow/sdk/coordinators/_subprocess.py | 18 +++++++++++------- .../task_sdk/coordinators/test_subprocess.py | 14 ++++++++++++++ 3 files changed, 30 insertions(+), 7 deletions(-) diff --git a/airflow-core/docs/authoring-and-scheduling/language-sdks/index.rst b/airflow-core/docs/authoring-and-scheduling/language-sdks/index.rst index e32a70d0ffc6d..596a2dee482f8 100644 --- a/airflow-core/docs/authoring-and-scheduling/language-sdks/index.rst +++ b/airflow-core/docs/authoring-and-scheduling/language-sdks/index.rst @@ -77,6 +77,11 @@ The execution model has three moving parts. worker to identify the workload, executes the task, and communicates through the coordinator as a proxy back to the worker process. + A runtime cannot read Airflow's configuration, so the coordinator passes the resolved logging + configuration through the environment at launch: ``AIRFLOW__LOGGING__LOGGING_LEVEL`` (the root level) + and, when set, ``AIRFLOW__LOGGING__NAMESPACE_LEVELS`` (per-logger ``=`` pairs, separated + by whitespace or commas). Each SDK is responsible for reading these and filtering its own logs. + .. _language-sdks/stub-tasks: Stub tasks diff --git a/task-sdk/src/airflow/sdk/coordinators/_subprocess.py b/task-sdk/src/airflow/sdk/coordinators/_subprocess.py index b1f92f3ea89f1..b0412333c1763 100644 --- a/task-sdk/src/airflow/sdk/coordinators/_subprocess.py +++ b/task-sdk/src/airflow/sdk/coordinators/_subprocess.py @@ -294,6 +294,16 @@ def start( # type: ignore[override] stdout_r, stdout_w = tracker.track(*socket.socketpair()) stderr_r, stderr_w = tracker.track(*socket.socketpair()) + # A language SDK runtime cannot read Airflow's config, so propagate the + # resolved log levels via the environment at launch. 
StartupDetails + # arrives too late, the logs might already be produced by then. + env = { + **os.environ, + "AIRFLOW__LOGGING__LOGGING_LEVEL": conf.get("logging", "logging_level", fallback="INFO"), + } + if namespace_levels := conf.get("logging", "namespace_levels", fallback=None): + env["AIRFLOW__LOGGING__NAMESPACE_LEVELS"] = namespace_levels + proc = subprocess.Popen( [ *command, @@ -302,13 +312,7 @@ def start( # type: ignore[override] ], stdout=stdout_w.fileno(), stderr=stderr_w.fileno(), - # A language SDK runtime cannot read Airflow's config, so propagate the - # resolved task log level via the environment at launch. StartupDetails - # arrives too late, the logs might already be produced by then. - env={ - **os.environ, - "AIRFLOW__LOGGING__LOGGING_LEVEL": conf.get("logging", "logging_level", fallback="INFO"), - }, + env=env, ) tracker.track(proc) for soc in tracker.untrack(stdout_w, stderr_w): diff --git a/task-sdk/tests/task_sdk/coordinators/test_subprocess.py b/task-sdk/tests/task_sdk/coordinators/test_subprocess.py index db3b889e6e4b4..163bee5948115 100644 --- a/task-sdk/tests/task_sdk/coordinators/test_subprocess.py +++ b/task-sdk/tests/task_sdk/coordinators/test_subprocess.py @@ -771,6 +771,20 @@ def test_resolved_log_level_passed_to_subprocess_env(self, mock_client): env = popen_mock.call_args.kwargs["env"] assert env["AIRFLOW__LOGGING__LOGGING_LEVEL"] == "DEBUG" + @conf_vars({("logging", "namespace_levels"): "sqlalchemy=INFO, botocore=WARNING"}) + def test_namespace_levels_passed_to_subprocess_env(self, mock_client): + """Per-logger levels are propagated verbatim for the runtime to parse.""" + _, popen_mock, _ = self._start_with_mocks(mock_client, command=["/bin/true"]) + env = popen_mock.call_args.kwargs["env"] + assert env["AIRFLOW__LOGGING__NAMESPACE_LEVELS"] == "sqlalchemy=INFO, botocore=WARNING" + + @conf_vars({("logging", "namespace_levels"): ""}) + def test_namespace_levels_omitted_when_unset(self, mock_client): + """An empty value has no pairs 
to parse, so the variable is left out.""" + _, popen_mock, _ = self._start_with_mocks(mock_client, command=["/bin/true"]) + env = popen_mock.call_args.kwargs["env"] + assert "AIRFLOW__LOGGING__NAMESPACE_LEVELS" not in env + def test_register_pipe_readers_called_with_four_sockets(self, mock_client): """Both socketpair read-ends and both TCP sockets must be registered, with a data kwarg.""" with ( From c0f6ddf2c2b8560c3f9c763462657ad655a6eeb0 Mon Sep 17 00:00:00 2001 From: Tzu-ping Chung Date: Mon, 22 Jun 2026 05:35:21 +0800 Subject: [PATCH 3/4] Remove logging mention in user-facing docs This should simply work, so we don't need to get into them. --- .../docs/authoring-and-scheduling/language-sdks/index.rst | 5 ----- 1 file changed, 5 deletions(-) diff --git a/airflow-core/docs/authoring-and-scheduling/language-sdks/index.rst b/airflow-core/docs/authoring-and-scheduling/language-sdks/index.rst index 596a2dee482f8..e32a70d0ffc6d 100644 --- a/airflow-core/docs/authoring-and-scheduling/language-sdks/index.rst +++ b/airflow-core/docs/authoring-and-scheduling/language-sdks/index.rst @@ -77,11 +77,6 @@ The execution model has three moving parts. worker to identify the workload, executes the task, and communicates through the coordinator as a proxy back to the worker process. - A runtime cannot read Airflow's configuration, so the coordinator passes the resolved logging - configuration through the environment at launch: ``AIRFLOW__LOGGING__LOGGING_LEVEL`` (the root level) - and, when set, ``AIRFLOW__LOGGING__NAMESPACE_LEVELS`` (per-logger ``=`` pairs, separated - by whitespace or commas). Each SDK is responsible for reading these and filtering its own logs. - .. _language-sdks/stub-tasks: Stub tasks From 7b2f48319b17b32efbd18faa26dff0782a11257a Mon Sep 17 00:00:00 2001 From: Tzu-ping Chung Date: Mon, 22 Jun 2026 05:35:56 +0800 Subject: [PATCH 4/4] Always set AIRFLOW__LOGGING__NAMESPACE_LEVELS Even when the configuration is empty, we can still set the environ without harm. 
This would actually be easier to handle in the runtime. --- task-sdk/src/airflow/sdk/coordinators/_subprocess.py | 3 +-- task-sdk/tests/task_sdk/coordinators/test_subprocess.py | 2 +- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/task-sdk/src/airflow/sdk/coordinators/_subprocess.py b/task-sdk/src/airflow/sdk/coordinators/_subprocess.py index b0412333c1763..9550fdd0bc438 100644 --- a/task-sdk/src/airflow/sdk/coordinators/_subprocess.py +++ b/task-sdk/src/airflow/sdk/coordinators/_subprocess.py @@ -300,9 +300,8 @@ def start( # type: ignore[override] env = { **os.environ, "AIRFLOW__LOGGING__LOGGING_LEVEL": conf.get("logging", "logging_level", fallback="INFO"), + "AIRFLOW__LOGGING__NAMESPACE_LEVELS": conf.get("logging", "namespace_levels", fallback=""), } - if namespace_levels := conf.get("logging", "namespace_levels", fallback=None): - env["AIRFLOW__LOGGING__NAMESPACE_LEVELS"] = namespace_levels proc = subprocess.Popen( [ diff --git a/task-sdk/tests/task_sdk/coordinators/test_subprocess.py b/task-sdk/tests/task_sdk/coordinators/test_subprocess.py index 163bee5948115..5a89c73e780d4 100644 --- a/task-sdk/tests/task_sdk/coordinators/test_subprocess.py +++ b/task-sdk/tests/task_sdk/coordinators/test_subprocess.py @@ -783,7 +783,7 @@ def test_namespace_levels_omitted_when_unset(self, mock_client): """An empty value has no pairs to parse, so the variable is left out.""" _, popen_mock, _ = self._start_with_mocks(mock_client, command=["/bin/true"]) env = popen_mock.call_args.kwargs["env"] - assert "AIRFLOW__LOGGING__NAMESPACE_LEVELS" not in env + assert env["AIRFLOW__LOGGING__NAMESPACE_LEVELS"] == "" def test_register_pipe_readers_called_with_four_sockets(self, mock_client): """Both socketpair read-ends and both TCP sockets must be registered, with a data kwarg."""