Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions .rat-excludes
Original file line number Diff line number Diff line change
Expand Up @@ -328,9 +328,10 @@ www-hash.txt
/src/airflow/providers/google/_vendor/*

# Java SDK build outputs
/java-sdk/bin/*
/java-sdk/build/*
/java-sdk/sdk/build/*
/java-sdk/example/build/*
/java-sdk/*/bin/*
/java-sdk/*/build/*

# Git ignore file
.gitignore
Expand Down
153 changes: 153 additions & 0 deletions airflow-core/docs/authoring-and-scheduling/language-sdks/java.rst
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,159 @@ See the Java SDK's published JavaDoc for more details.

.. TODO: (AIP-108) Put a link here once we publish the JavaDoc.

.. _java-sdk/logging:

Logging
-------

Task code can emit log records through any common Java logging framework. The SDK ships optional
integration libraries that forward those records to Airflow's task log store, where they appear
alongside the standard task output in the Airflow UI.

Comment thread
uranusjr marked this conversation as resolved.
Declare a logger as a static field on the task class, using the class's own type as the name. This
is the conventional pattern regardless of which logging framework you choose:

.. code-block:: java

private static final System.Logger log =
System.getLogger(SalesPipeline.class.getName());

@Builder.Task(id = "extract")
public long extract(Client client) {
log.log(System.Logger.Level.INFO, "Starting extraction");
return recordCount;
}

The Gradle snippets below show the dependency declarations; all Airflow artifact versions are managed
by ``airflow-sdk-bom``. Maven users apply the same artifact IDs following the pattern in
:ref:`java-sdk/build/maven`.

.. _java-sdk/logging/jpl:

``System.Logger`` (Java Platform Logging)
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Java 9's new logging facade ``java.lang.System.Logger`` (JEP 264), commonly abbreviated *JPL*, can be
used by libraries without pulling in any third-party API. The ``airflow-sdk-jpl`` artifact registers an
``AirflowSystemLoggerFinder`` via ``ServiceLoader``, which routes all ``System.Logger`` calls directly
to Airflow's task log store.

.. code-block:: groovy

implementation("org.apache.airflow:airflow-sdk-jpl:${version}")

No configuration file or startup call is required. The ``ServiceLoader`` mechanism discovers the
provider automatically as long as the JAR is on the classpath.

.. note::

Do not add a second ``System.LoggerFinder`` implementation alongside
``airflow-sdk-jpl``. The JVM selects one finder via ``ServiceLoader``; having
multiple providers on the classpath leads to unpredictable behaviour.

.. _java-sdk/logging/slf4j:

SLF4J 2.x
~~~~~~~~~

The SLF4J binding is discovered automatically via ``ServiceLoader``; no configuration file or
startup call is required.

.. code-block:: groovy

implementation("org.apache.airflow:airflow-sdk-slf4j:${version}")

The above automatically pulls in the SLF4J API, so you don't need to add ``slf4j-api`` yourself.

.. note::

Do not add a second SLF4J binding (such as ``logback-classic`` or ``slf4j-simple``) alongside
``airflow-sdk-slf4j``. SLF4J 2.x warns about multiple bindings and selects one unpredictably.

.. _java-sdk/logging/log4j2:

Log4j 2
~~~~~~~

``airflow-sdk-log4j2`` declares ``log4j-api`` as a transitive dependency, so you do not need to add the latter
separately. You must also place ``log4j-core`` on the runtime classpath to host the plugin loader that
discovers the custom ``AirflowAppender`` supplied by ``airflow-sdk-log4j2`` at startup:

.. code-block:: groovy

implementation("org.apache.airflow:airflow-sdk-log4j2:${version}")
runtimeOnly("org.apache.logging.log4j:log4j-core:${log4jVersion}")

Declare ``AirflowAppender`` in your ``log4j2.xml``:

.. code-block:: xml

<?xml version="1.0" encoding="UTF-8"?>
<Configuration>
<Appenders>
<AirflowAppender name="Airflow"/>
</Appenders>
<Loggers>
<Root level="info">
<AppenderRef ref="Airflow"/>
</Root>
</Loggers>
</Configuration>

.. _java-sdk/logging/jul:

``java.util.logging``
~~~~~~~~~~~~~~~~~~~~~

Add the artifact:

.. code-block:: groovy

implementation("org.apache.airflow:airflow-sdk-jul:${version}")

and call ``AirflowJulHandler.install()`` on startup to attach the handler to the
JUL root logger before any task runs:

.. code-block:: java

public static void main(String[] args) {
AirflowJulHandler.install();
Server.create(args).serve(new MyBundle());
}

Alternatively, declare the handler in a ``logging.properties`` file and point JUL at it with the
``java.util.logging.config.file`` system property (set via ``jvm_args`` in the coordinator
configuration):

.. code-block:: properties

handlers = org.apache.airflow.sdk.jul.AirflowJulHandler

.. code-block:: ini

[sdk]
coordinators = {
"java-jdk17": {
"classpath": "airflow.sdk.coordinators.java.JavaCoordinator",
"kwargs": {
"jars_root": ["/opt/airflow/jars"],
"jvm_args": ["-Djava.util.logging.config.file=/opt/airflow/logging.properties"]
}
}
}

.. _java-sdk/logging/other:

Other frameworks
~~~~~~~~~~~~~~~~

Several commonly used logging APIs are covered without a dedicated Airflow artifact:

* **Logback** is itself an SLF4J binding. Replace ``logback-classic`` with ``airflow-sdk-slf4j``
and no changes are needed in your task code.
* **Apache Commons Logging (JCL)** can be bridged to SLF4J via ``org.slf4j:jcl-over-slf4j`` or
to Log4j 2 via ``org.apache.logging.log4j:log4j-jcl``.

.. _java-sdk/types:

XCom type mapping
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,14 +54,21 @@

from __future__ import annotations

import time
from datetime import datetime, timezone
from typing import TYPE_CHECKING

import pytest

from airflow_e2e_tests.e2e_test_utils.clients import AirflowClient

if TYPE_CHECKING:
from collections.abc import Callable

# The Java extract task sleeps 6 s + coordinator startup; allow plenty of room.
_JAVA_TASK_TIMEOUT = 600
# Logs can lag slightly behind the task reaching a terminal state.
_LOG_FETCH_TIMEOUT = 60


class TestJavaSDKAnnotationExample:
Expand Down Expand Up @@ -166,3 +173,71 @@ def test_load_retried_then_succeeded(self):
f"Java 'load' task should have run twice (fail then retry); "
f"try_number={load_ti.get('try_number')!r}, ti: {load_ti}"
)

def _wait_for_transform_log_record(
self, run_id: str, try_number: int, match: Callable[[dict], bool]
) -> tuple[dict | None, list[dict]]:
"""Poll the ``transform`` task logs until a record matching *match* appears.

Logs can lag behind the terminal task state, and earlier records (e.g. the
first transform line) arrive before the one under test, so returning on any
record would race. Keep polling until the target record shows up or the
deadline passes. Returns the matching record (or ``None``) and the last
batch of records seen for diagnostics.
"""
deadline = time.monotonic() + _LOG_FETCH_TIMEOUT
records: list[dict] = []
while True:
resp = self.airflow_client.get_task_logs(
dag_id="java_annotation_example", run_id=run_id, task_id="transform", try_number=try_number
)
records = [entry for entry in resp.get("content", []) if isinstance(entry, dict)]
record = next((r for r in records if match(r)), None)
if record is not None or time.monotonic() > deadline:
return record, records
time.sleep(3)

def test_application_logs_preserve_their_level(self):
"""A Java task's SLF4J ``logger.info`` must reach the UI as INFO, not ERROR.

Without the SDK's SLF4J binding the application's logs fall through to
stderr and the supervisor tags every line ERROR. The binding routes them
over the logs socket carrying the real level instead.
"""
resp = self.airflow_client.trigger_dag(
"java_annotation_example",
json={"logical_date": datetime.now(timezone.utc).isoformat()},
)
run_id = resp["dag_run_id"]
dag_state = self.airflow_client.wait_for_dag_run(
dag_id="java_annotation_example",
run_id=run_id,
timeout=_JAVA_TASK_TIMEOUT,
)

# The log under test is emitted only if transform actually ran; assert it
# succeeded and fetch the attempt that produced the logs (transform does
# not retry, but read try_number rather than assuming attempt 1).
ti_resp = self.airflow_client.get_task_instances(dag_id="java_annotation_example", run_id=run_id)
ti_map = {ti["task_id"]: ti for ti in ti_resp.get("task_instances", [])}
transform_ti = ti_map.get("transform", {})
assert transform_ti.get("state") == "success", (
f"Java 'transform' task must succeed to emit the log under test.\n"
f" task state : {transform_ti.get('state')!r}\n"
f" dag state : {dag_state!r}\n"
f" all tasks : { {k: v.get('state') for k, v in ti_map.items()} }"
)

# transform logs `logger.info("Got variable {}", variable)` -> "Got variable 123".
record, records = self._wait_for_transform_log_record(
run_id,
transform_ti.get("try_number", 1),
lambda r: str(r.get("event", "")).startswith("Got variable"),
)
assert record is not None, (
f"transform should emit a 'Got variable' INFO record; "
f"events seen: {[r.get('event') for r in records]}"
)
assert str(record.get("level", "")).lower() == "info", (
f"application INFO log should keep its level, got {record.get('level')!r}; record: {record}"
)
5 changes: 4 additions & 1 deletion java-sdk/bom/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,10 @@ val airflowSupervisorSchemaVersion: String by project
dependencies {
constraints {
api("org.apache.airflow:airflow-sdk:$projectVersion")
api("org.apache.airflow:airflow-sdk-processor:$projectVersion")
api("org.apache.airflow:airflow-sdk-jul:$projectVersion")
api("org.apache.airflow:airflow-sdk-log4j2:$projectVersion")
api("org.apache.airflow:airflow-sdk-processor:${projectVersion}")
api("org.apache.airflow:airflow-sdk-slf4j:$projectVersion")
}
}

Expand Down
2 changes: 1 addition & 1 deletion java-sdk/example/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ repositories {
dependencies {
annotationProcessor("org.apache.airflow:airflow-sdk-processor:${projectVersion}")
implementation("org.apache.airflow:airflow-sdk:${projectVersion}")
implementation("org.slf4j:slf4j-simple:2.0.17")
implementation("org.apache.airflow:airflow-sdk-jpl:${projectVersion}")
}

java {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,43 +19,43 @@

package org.apache.airflow.example;

import static java.lang.System.Logger.Level.INFO;

import java.util.Date;
import org.apache.airflow.sdk.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@SuppressWarnings("DuplicatedCode")
@Builder.Dag(id = "java_annotation_example")
public class AnnotationExample {
private static final Logger logger = LoggerFactory.getLogger(AnnotationExample.class);
private static final System.Logger log = System.getLogger(AnnotationExample.class.getName());

@Builder.Task(id = "extract")
public long extractValue(Client client) throws InterruptedException {
logger.info("Hello from task");
log.log(INFO, "Hello from task");

var pythonXcom = client.getXCom("python_task_1");
logger.info("Got XCom from Python Task 'python_task_1' {}", pythonXcom);
log.log(INFO, "Got XCom from python_task_1: {0}", pythonXcom);

var connection = client.getConnection("test_http");
logger.info("Got con {}", connection);
log.log(INFO, "Got connection: {0}", connection);

for (var i = 0; i < 3; i++) {
logger.info("Beep {}, next time will be {}", i, new Date());
log.log(INFO, "Beep {0}, next time will be {1}", i, new Date());
Thread.sleep(2 * 1000);
}

logger.info("Goodbye from task");
log.log(INFO, "Goodbye from task");
return new Date().getTime();
}

@Builder.Task(id = "transform")
public long transformValue(Client client, @Builder.XCom(task = "extract") long extracted) {
logger.info("Got XCom from 'extract' {}", extracted);
log.log(INFO, "Got XCom from extract: {0}", extracted);

var variable = client.getVariable("my_variable");
logger.info("Got variable {}", variable);
log.log(INFO, "Got variable: {0}", variable);

logger.info("Push XCom to python task 2");
log.log(INFO, "Push XCom to python task 2");
return new Date().getTime();
}

Expand All @@ -66,10 +66,10 @@ public long transformValue(Client client, @Builder.XCom(task = "extract") long e
// set. The retry then runs this task again and it returns normally.
@Builder.Task
public void load(Context context, @Builder.XCom(task = "transform") long transformed) {
logger.info("Got XCom from 'transform' {}", transformed);
log.log(INFO, "Got XCom from transform: {0}", transformed);
if (context.ti.tryNumber == 1) {
throw new RuntimeException("I failed");
}
logger.info("Recovered on retry, try number {}", context.ti.tryNumber);
log.log(INFO, "Recovered on retry, try number {0}", context.ti.tryNumber);
}
}
Loading
Loading