Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions task-sdk/src/airflow/sdk/coordinators/_subprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import psutil
import structlog

from airflow.sdk.configuration import conf
from airflow.sdk.execution_time.coordinator import BaseCoordinator
from airflow.sdk.execution_time.supervisor import ActivitySubprocess, NeverRaised, ProcessTracker

Expand Down Expand Up @@ -293,6 +294,15 @@ def start( # type: ignore[override]
stdout_r, stdout_w = tracker.track(*socket.socketpair())
stderr_r, stderr_w = tracker.track(*socket.socketpair())

# A language SDK runtime cannot read Airflow's config, so propagate the
# resolved log levels via the environment at launch. StartupDetails
# arrives too late, the logs might already be produced by then.
env = {
**os.environ,
"AIRFLOW__LOGGING__LOGGING_LEVEL": conf.get("logging", "logging_level", fallback="INFO"),
"AIRFLOW__LOGGING__NAMESPACE_LEVELS": conf.get("logging", "namespace_levels", fallback=""),
}

proc = subprocess.Popen(
[
*command,
Expand All @@ -301,6 +311,7 @@ def start( # type: ignore[override]
],
stdout=stdout_w.fileno(),
stderr=stderr_w.fileno(),
env=env,
)
tracker.track(proc)
for soc in tracker.untrack(stdout_w, stderr_w):
Expand Down
22 changes: 22 additions & 0 deletions task-sdk/tests/task_sdk/coordinators/test_subprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
from airflow.sdk.execution_time.coordinator import BaseCoordinator
from airflow.sdk.execution_time.supervisor import ActivitySubprocess

from tests_common.test_utils.config import conf_vars
from tests_common.test_utils.version_compat import AIRFLOW_V_3_3_PLUS

if not AIRFLOW_V_3_3_PLUS:
Expand Down Expand Up @@ -763,6 +764,27 @@ def test_on_child_started_called(self, mock_client):
assert kwargs["ti"] is ti
assert kwargs["dag_rel_path"] == "bundle"

@conf_vars({("logging", "logging_level"): "DEBUG"})
def test_resolved_log_level_passed_to_subprocess_env(self, mock_client):
"""A language SDK runtime gets the resolved task log level via the environment at launch."""
_, popen_mock, _ = self._start_with_mocks(mock_client, command=["/bin/true"])
env = popen_mock.call_args.kwargs["env"]
assert env["AIRFLOW__LOGGING__LOGGING_LEVEL"] == "DEBUG"

@conf_vars({("logging", "namespace_levels"): "sqlalchemy=INFO, botocore=WARNING"})
def test_namespace_levels_passed_to_subprocess_env(self, mock_client):
"""Per-logger levels are propagated verbatim for the runtime to parse."""
_, popen_mock, _ = self._start_with_mocks(mock_client, command=["/bin/true"])
env = popen_mock.call_args.kwargs["env"]
assert env["AIRFLOW__LOGGING__NAMESPACE_LEVELS"] == "sqlalchemy=INFO, botocore=WARNING"

@conf_vars({("logging", "namespace_levels"): ""})
def test_namespace_levels_omitted_when_unset(self, mock_client):
"""An empty value has no pairs to parse, so the variable is left out."""
_, popen_mock, _ = self._start_with_mocks(mock_client, command=["/bin/true"])
env = popen_mock.call_args.kwargs["env"]
assert env["AIRFLOW__LOGGING__NAMESPACE_LEVELS"] == ""

def test_register_pipe_readers_called_with_four_sockets(self, mock_client):
"""Both socketpair read-ends and both TCP sockets must be registered, with a data kwarg."""
with (
Expand Down
Loading