diff --git a/.rat-excludes b/.rat-excludes index 01f7a3acb8b43..f0cd2910f5b27 100644 --- a/.rat-excludes +++ b/.rat-excludes @@ -328,9 +328,10 @@ www-hash.txt /src/airflow/providers/google/_vendor/* # Java SDK build outputs +/java-sdk/bin/* /java-sdk/build/* -/java-sdk/sdk/build/* -/java-sdk/example/build/* +/java-sdk/*/bin/* +/java-sdk/*/build/* # Git ignore file .gitignore diff --git a/airflow-core/docs/authoring-and-scheduling/language-sdks/java.rst b/airflow-core/docs/authoring-and-scheduling/language-sdks/java.rst index 380ec763733fc..25dac2e2e1043 100644 --- a/airflow-core/docs/authoring-and-scheduling/language-sdks/java.rst +++ b/airflow-core/docs/authoring-and-scheduling/language-sdks/java.rst @@ -230,6 +230,159 @@ See the Java SDK's published JavaDoc for more details. .. TODO: (AIP-108) Put a link here once we publish the JavaDoc. +.. _java-sdk/logging: + +Logging +------- + +Task code can emit log records through any common Java logging framework. The SDK ships optional +integration libraries that forward those records to Airflow's task log store, where they appear +alongside the standard task output in the Airflow UI. + +Declare a logger as a static field on the task class, using the class's own type as the name. This +is the conventional pattern regardless of which logging framework you choose: + +.. code-block:: java + + private static final System.Logger log = + System.getLogger(SalesPipeline.class.getName()); + + @Builder.Task(id = "extract") + public long extract(Client client) { + log.log(System.Logger.Level.INFO, "Starting extraction"); + return recordCount; + } + +The Gradle snippets below show the dependency declarations; all Airflow artifact versions are managed +by ``airflow-sdk-bom``. Maven users apply the same artifact IDs following the pattern in +:ref:`java-sdk/build/maven`. + +.. _java-sdk/logging/jpl: + +``System.Logger`` (Java Platform Logging) +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Java 9's new logging facade ``java.lang.System.Logger`` (JEP 264), commonly abbreviated *JPL*, can be +used by libraries without pulling in any third-party API. The ``airflow-sdk-jpl`` artifact registers an +``AirflowSystemLoggerFinder`` via ``ServiceLoader``, which routes all ``System.Logger`` calls directly +to Airflow's task log store. + +.. code-block:: groovy + + implementation("org.apache.airflow:airflow-sdk-jpl:${version}") + +No configuration file or startup call is required. The ``ServiceLoader`` mechanism discovers the +provider automatically as long as the JAR is on the classpath. + +.. note:: + + Do not add a second ``System.LoggerFinder`` implementation alongside + ``airflow-sdk-jpl``. The JVM selects one finder via ``ServiceLoader``; having + multiple providers on the classpath leads to unpredictable behaviour. + +.. _java-sdk/logging/slf4j: + +SLF4J 2.x +~~~~~~~~~ + +The SLF4J binding is discovered automatically via ``ServiceLoader``; no configuration file or +startup call is required. + +.. code-block:: groovy + + implementation("org.apache.airflow:airflow-sdk-slf4j:${version}") + +The above automatically pulls in the SLF4J API, so you don't need to add ``slf4j-api`` yourself. + +.. note:: + + Do not add a second SLF4J binding (such as ``logback-classic`` or ``slf4j-simple``) alongside + ``airflow-sdk-slf4j``. SLF4J 2.x warns about multiple bindings and selects one unpredictably. + +.. _java-sdk/logging/log4j2: + +Log4j 2 +~~~~~~~ + +``airflow-sdk-log4j2`` declares ``log4j-api`` as a transitive dependency, so you do not need to add the latter +separately. You must also place ``log4j-core`` on the runtime classpath to host the plugin loader that +discovers the custom ``AirflowAppender`` supplied by ``airflow-sdk-log4j2`` at startup: + +.. code-block:: groovy + + implementation("org.apache.airflow:airflow-sdk-log4j2:${version}") + runtimeOnly("org.apache.logging.log4j:log4j-core:${log4jVersion}") + +Declare ``AirflowAppender`` in your ``log4j2.xml``: + +.. code-block:: xml + + + + + + + + + + + + + +.. _java-sdk/logging/jul: + +``java.util.logging`` +~~~~~~~~~~~~~~~~~~~~~ + +Add the artifact: + +.. code-block:: groovy + + implementation("org.apache.airflow:airflow-sdk-jul:${version}") + +and call ``AirflowJulHandler.install()`` on startup to attach the handler to the +JUL root logger before any task runs: + +.. code-block:: java + + public static void main(String[] args) { + AirflowJulHandler.install(); + Server.create(args).serve(new MyBundle()); + } + +Alternatively, declare the handler in a ``logging.properties`` file and point JUL at it with the +``java.util.logging.config.file`` system property (set via ``jvm_args`` in the coordinator +configuration): + +.. code-block:: properties + + handlers = org.apache.airflow.sdk.jul.AirflowJulHandler + +.. code-block:: ini + + [sdk] + coordinators = { + "java-jdk17": { + "classpath": "airflow.sdk.coordinators.java.JavaCoordinator", + "kwargs": { + "jars_root": ["/opt/airflow/jars"], + "jvm_args": ["-Djava.util.logging.config.file=/opt/airflow/logging.properties"] + } + } + } + +.. _java-sdk/logging/other: + +Other frameworks +~~~~~~~~~~~~~~~~ + +Several commonly used logging APIs are covered without a dedicated Airflow artifact: + +* **Logback** is itself an SLF4J binding. Replace ``logback-classic`` with ``airflow-sdk-slf4j`` + and no changes are needed in your task code. +* **Apache Commons Logging (JCL)** can be bridged to SLF4J via ``org.slf4j:jcl-over-slf4j`` or + to Log4j 2 via ``org.apache.logging.log4j:log4j-jcl``. + .. _java-sdk/types: XCom type mapping diff --git a/airflow-e2e-tests/tests/airflow_e2e_tests/java_sdk_tests/test_java_sdk_dag.py b/airflow-e2e-tests/tests/airflow_e2e_tests/java_sdk_tests/test_java_sdk_dag.py index b1e04fb2dde5f..709987cb2c744 100644 --- a/airflow-e2e-tests/tests/airflow_e2e_tests/java_sdk_tests/test_java_sdk_dag.py +++ b/airflow-e2e-tests/tests/airflow_e2e_tests/java_sdk_tests/test_java_sdk_dag.py @@ -54,14 +54,21 @@ from __future__ import annotations +import time from datetime import datetime, timezone +from typing import TYPE_CHECKING import pytest from airflow_e2e_tests.e2e_test_utils.clients import AirflowClient +if TYPE_CHECKING: + from collections.abc import Callable + # The Java extract task sleeps 6 s + coordinator startup; allow plenty of room. _JAVA_TASK_TIMEOUT = 600 +# Logs can lag slightly behind the task reaching a terminal state. +_LOG_FETCH_TIMEOUT = 60 class TestJavaSDKAnnotationExample: @@ -166,3 +173,71 @@ def test_load_retried_then_succeeded(self): f"Java 'load' task should have run twice (fail then retry); " f"try_number={load_ti.get('try_number')!r}, ti: {load_ti}" ) + + def _wait_for_transform_log_record( + self, run_id: str, try_number: int, match: Callable[[dict], bool] + ) -> tuple[dict | None, list[dict]]: + """Poll the ``transform`` task logs until a record matching *match* appears. + + Logs can lag behind the terminal task state, and earlier records (e.g. the + first transform line) arrive before the one under test, so returning on any + record would race. Keep polling until the target record shows up or the + deadline passes. Returns the matching record (or ``None``) and the last + batch of records seen for diagnostics. + """ + deadline = time.monotonic() + _LOG_FETCH_TIMEOUT + records: list[dict] = [] + while True: + resp = self.airflow_client.get_task_logs( + dag_id="java_annotation_example", run_id=run_id, task_id="transform", try_number=try_number + ) + records = [entry for entry in resp.get("content", []) if isinstance(entry, dict)] + record = next((r for r in records if match(r)), None) + if record is not None or time.monotonic() > deadline: + return record, records + time.sleep(3) + + def test_application_logs_preserve_their_level(self): + """A Java task's SLF4J ``logger.info`` must reach the UI as INFO, not ERROR. + + Without the SDK's SLF4J binding the application's logs fall through to + stderr and the supervisor tags every line ERROR. The binding routes them + over the logs socket carrying the real level instead. + """ + resp = self.airflow_client.trigger_dag( + "java_annotation_example", + json={"logical_date": datetime.now(timezone.utc).isoformat()}, + ) + run_id = resp["dag_run_id"] + dag_state = self.airflow_client.wait_for_dag_run( + dag_id="java_annotation_example", + run_id=run_id, + timeout=_JAVA_TASK_TIMEOUT, + ) + + # The log under test is emitted only if transform actually ran; assert it + # succeeded and fetch the attempt that produced the logs (transform does + # not retry, but read try_number rather than assuming attempt 1). + ti_resp = self.airflow_client.get_task_instances(dag_id="java_annotation_example", run_id=run_id) + ti_map = {ti["task_id"]: ti for ti in ti_resp.get("task_instances", [])} + transform_ti = ti_map.get("transform", {}) + assert transform_ti.get("state") == "success", ( + f"Java 'transform' task must succeed to emit the log under test.\n" + f" task state : {transform_ti.get('state')!r}\n" + f" dag state : {dag_state!r}\n" + f" all tasks : { {k: v.get('state') for k, v in ti_map.items()} }" + ) + + # transform logs `logger.info("Got variable {}", variable)` -> "Got variable 123". + record, records = self._wait_for_transform_log_record( + run_id, + transform_ti.get("try_number", 1), + lambda r: str(r.get("event", "")).startswith("Got variable"), + ) + assert record is not None, ( + f"transform should emit a 'Got variable' INFO record; " + f"events seen: {[r.get('event') for r in records]}" + ) + assert str(record.get("level", "")).lower() == "info", ( + f"application INFO log should keep its level, got {record.get('level')!r}; record: {record}" + ) diff --git a/java-sdk/bom/build.gradle.kts b/java-sdk/bom/build.gradle.kts index 479cbdde27fd0..1656dc3ca9729 100644 --- a/java-sdk/bom/build.gradle.kts +++ b/java-sdk/bom/build.gradle.kts @@ -28,7 +28,10 @@ val airflowSupervisorSchemaVersion: String by project dependencies { constraints { api("org.apache.airflow:airflow-sdk:$projectVersion") - api("org.apache.airflow:airflow-sdk-processor:$projectVersion") + api("org.apache.airflow:airflow-sdk-jul:$projectVersion") + api("org.apache.airflow:airflow-sdk-log4j2:$projectVersion") + api("org.apache.airflow:airflow-sdk-processor:${projectVersion}") + api("org.apache.airflow:airflow-sdk-slf4j:$projectVersion") } } diff --git a/java-sdk/example/build.gradle b/java-sdk/example/build.gradle index 497ea8f5103a6..5275b7fae545c 100644 --- a/java-sdk/example/build.gradle +++ b/java-sdk/example/build.gradle @@ -29,7 +29,7 @@ repositories { dependencies { annotationProcessor("org.apache.airflow:airflow-sdk-processor:${projectVersion}") implementation("org.apache.airflow:airflow-sdk:${projectVersion}") - implementation("org.slf4j:slf4j-simple:2.0.17") + implementation("org.apache.airflow:airflow-sdk-jpl:${projectVersion}") } java { diff --git a/java-sdk/example/src/java/org/apache/airflow/example/AnnotationExample.java b/java-sdk/example/src/java/org/apache/airflow/example/AnnotationExample.java index 3915d8e5c384f..85c4162f8caeb 100644 --- a/java-sdk/example/src/java/org/apache/airflow/example/AnnotationExample.java +++ b/java-sdk/example/src/java/org/apache/airflow/example/AnnotationExample.java @@ -19,43 +19,43 @@ package org.apache.airflow.example; +import static java.lang.System.Logger.Level.INFO; + import java.util.Date; import org.apache.airflow.sdk.*; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; @SuppressWarnings("DuplicatedCode") @Builder.Dag(id = "java_annotation_example") public class AnnotationExample { - private static final Logger logger = LoggerFactory.getLogger(AnnotationExample.class); + private static final System.Logger log = System.getLogger(AnnotationExample.class.getName()); @Builder.Task(id = "extract") public long extractValue(Client client) throws InterruptedException { - logger.info("Hello from task"); + log.log(INFO, "Hello from task"); var pythonXcom = client.getXCom("python_task_1"); - logger.info("Got XCom from Python Task 'python_task_1' {}", pythonXcom); + log.log(INFO, "Got XCom from python_task_1: {0}", pythonXcom); var connection = client.getConnection("test_http"); - logger.info("Got con {}", connection); + log.log(INFO, "Got connection: {0}", connection); for (var i = 0; i < 3; i++) { - logger.info("Beep {}, next time will be {}", i, new Date()); + log.log(INFO, "Beep {0}, next time will be {1}", i, new Date()); Thread.sleep(2 * 1000); } - logger.info("Goodbye from task"); + log.log(INFO, "Goodbye from task"); return new Date().getTime(); } @Builder.Task(id = "transform") public long transformValue(Client client, @Builder.XCom(task = "extract") long extracted) { - logger.info("Got XCom from 'extract' {}", extracted); + log.log(INFO, "Got XCom from extract: {0}", extracted); var variable = client.getVariable("my_variable"); - logger.info("Got variable {}", variable); + log.log(INFO, "Got variable: {0}", variable); - logger.info("Push XCom to python task 2"); + log.log(INFO, "Push XCom to python task 2"); return new Date().getTime(); } @@ -66,10 +66,10 @@ public long transformValue(Client client, @Builder.XCom(task = "extract") long e // set. The retry then runs this task again and it returns normally. @Builder.Task public void load(Context context, @Builder.XCom(task = "transform") long transformed) { - logger.info("Got XCom from 'transform' {}", transformed); + log.log(INFO, "Got XCom from transform: {0}", transformed); if (context.ti.tryNumber == 1) { throw new RuntimeException("I failed"); } - logger.info("Recovered on retry, try number {}", context.ti.tryNumber); + log.log(INFO, "Recovered on retry, try number {0}", context.ti.tryNumber); } } diff --git a/java-sdk/example/src/java/org/apache/airflow/example/InterfaceExampleBuilder.java b/java-sdk/example/src/java/org/apache/airflow/example/InterfaceExampleBuilder.java index 7853927b66a50..1c536c3cbf2c7 100644 --- a/java-sdk/example/src/java/org/apache/airflow/example/InterfaceExampleBuilder.java +++ b/java-sdk/example/src/java/org/apache/airflow/example/InterfaceExampleBuilder.java @@ -19,45 +19,46 @@ package org.apache.airflow.example; +import static java.lang.System.Logger.Level.INFO; + import java.util.Date; import org.apache.airflow.sdk.*; import org.jetbrains.annotations.NotNull; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; @SuppressWarnings("DuplicatedCode") public class InterfaceExampleBuilder { - private static final Logger logger = LoggerFactory.getLogger(InterfaceExampleBuilder.class); + private static final System.Logger log = + System.getLogger(InterfaceExampleBuilder.class.getName()); public static class Extract implements Task { public void execute(@NotNull Context context, Client client) throws Exception { - logger.info("Hello from task"); + log.log(INFO, "Hello from task"); var pythonInput = client.getXCom("python_task_1"); - logger.info("Got XCom from Python Task 'python_task_1' {}", pythonInput); + log.log(INFO, "Got XCom from python_task_1: {0}", pythonInput); var connection = client.getConnection("test_http"); - logger.info("Got con {}", connection); + log.log(INFO, "Got connection: {0}", connection); for (var i = 0; i < 3; i++) { - logger.info("Beep {}, next time will be {}", i, new Date()); + log.log(INFO, "Beep {0}, next time will be {1}", i, new Date()); Thread.sleep(2 * 1000); } client.setXCom(new Date().getTime()); - logger.info("Goodbye from task"); + log.log(INFO, "Goodbye from task"); } } public static class Transform implements Task { public void execute(@NotNull Context context, Client client) { var extracted = client.getXCom("extract"); - logger.info("Got XCom from 'extract' {}", extracted); + log.log(INFO, "Got XCom from extract: {0}", extracted); var variable = client.getVariable("my_variable"); - logger.info("Got variable {}", variable); + log.log(INFO, "Got variable: {0}", variable); - logger.info("Push XCom to python task 2"); + log.log(INFO, "Push XCom to python task 2"); client.setXCom(new Date().getTime()); } } @@ -65,7 +66,7 @@ public void execute(@NotNull Context context, Client client) { public static class Load implements Task { public void execute(@NotNull Context context, Client client) { var transformed = client.getXCom("transform"); - logger.info("Got XCom from 'transform' {}", transformed); + log.log(INFO, "Got XCom from transform: {0}", transformed); throw new RuntimeException("I failed"); } } diff --git a/java-sdk/gradle.properties b/java-sdk/gradle.properties index 47099646bfbb5..9438ba6435532 100644 --- a/java-sdk/gradle.properties +++ b/java-sdk/gradle.properties @@ -20,3 +20,5 @@ org.gradle.configuration-cache=true airflowSupervisorSchemaVersion=2026-06-16 projectVersion=1.0.0-SNAPSHOT + +mockkVersion=1.13.12 diff --git a/java-sdk/jpl/build.gradle.kts b/java-sdk/jpl/build.gradle.kts new file mode 100644 index 0000000000000..ecc351beefd59 --- /dev/null +++ b/java-sdk/jpl/build.gradle.kts @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +plugins { + `java-library` + id("airflow-jvm-conventions") + id("airflow-publish") +} + +dependencies { + implementation(project(":sdk")) + testImplementation(kotlin("test")) + testImplementation(testFixtures(project(":sdk"))) +} + +java { + withSourcesJar() // Required by Maven Central. +} + +tasks.withType { + useJUnitPlatform() +} + +publishing { + publications { + create("mavenJava") { + artifactId = "airflow-sdk-jpl" + from(components["java"]) + pom { + name = "Apache Airflow Java SDK Java Platform Logging Provider" + description = + "Java Platform Logging (System.Logger) provider for the Apache Airflow Java SDK. " + + "Routes java.lang.System.Logger calls from task code through the SDK " + + "to Airflow's task log store." + } + } + } +} diff --git a/java-sdk/jpl/src/main/kotlin/org/apache/airflow/sdk/jpl/AirflowSystemLoggerFinder.kt b/java-sdk/jpl/src/main/kotlin/org/apache/airflow/sdk/jpl/AirflowSystemLoggerFinder.kt new file mode 100644 index 0000000000000..74069982961c5 --- /dev/null +++ b/java-sdk/jpl/src/main/kotlin/org/apache/airflow/sdk/jpl/AirflowSystemLoggerFinder.kt @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.airflow.sdk.jpl + +import org.apache.airflow.sdk.execution.Level +import org.apache.airflow.sdk.execution.Log +import java.text.MessageFormat +import java.util.MissingResourceException +import java.util.ResourceBundle + +/** + * [System.LoggerFinder] that routes [System.Logger] calls to the Airflow task log store. + * + * Registered via [META-INF/services/java.lang.System$LoggerFinder][System.LoggerFinder]. + * All loggers share a single implementation regardless of the requesting module. + */ +class AirflowSystemLoggerFinder : System.LoggerFinder() { + override fun getLogger( + name: String, + module: Module, + ): System.Logger = AirflowSystemLogger(name) +} + +internal class AirflowSystemLogger( + private val name: String, +) : System.Logger { + override fun getName(): String = name + + override fun isLoggable(level: System.Logger.Level): Boolean = + level != System.Logger.Level.OFF && Log.isEnabledForLevel(level.convert(), name) + + override fun log( + level: System.Logger.Level, + bundle: ResourceBundle?, + msg: String?, + thrown: Throwable?, + ) { + if (!isLoggable(level)) return + Log.send(level.convert(), name, bundle.resolve(msg)) { + thrown?.let { put("exception", it.stackTraceToString()) } + } + } + + override fun log( + level: System.Logger.Level, + bundle: ResourceBundle?, + format: String?, + params: Array?, + ) { + if (!isLoggable(level)) return + + var message = bundle.resolve(format) + + fun renderEvent(): Pair> { + if (params.isNullOrEmpty()) return message to emptyMap() + val arguments = + buildMap { + message = + try { + MessageFormat.format(message, *params) + } catch (e: IllegalArgumentException) { + params.forEachIndexed { i, v -> put(i.toString(), v) } + put("exception", e.stackTraceToString()) + message + } + } + return message to arguments + } + + val (event, arguments) = renderEvent() + Log.send(level.convert(), name, event, arguments) + } +} + +private fun System.Logger.Level.convert(): Level = + when (this) { + System.Logger.Level.OFF, System.Logger.Level.ERROR -> Level.ERROR + System.Logger.Level.WARNING -> Level.WARNING + System.Logger.Level.INFO -> Level.INFO + System.Logger.Level.DEBUG -> Level.DEBUG + System.Logger.Level.TRACE, System.Logger.Level.ALL -> Level.NOTSET + } + +private fun ResourceBundle?.resolve(key: String?): String { + if (key == null) return "" + if (this == null) return key + return try { + getString(key) + } catch (_: MissingResourceException) { + key + } +} diff --git a/java-sdk/jpl/src/main/resources/META-INF/services/java.lang.System$LoggerFinder b/java-sdk/jpl/src/main/resources/META-INF/services/java.lang.System$LoggerFinder new file mode 100644 index 0000000000000..fdae8c246f75b --- /dev/null +++ b/java-sdk/jpl/src/main/resources/META-INF/services/java.lang.System$LoggerFinder @@ -0,0 +1,18 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +org.apache.airflow.sdk.jpl.AirflowSystemLoggerFinder diff --git a/java-sdk/jpl/src/test/kotlin/org/apache/airflow/sdk/jpl/AirflowSystemLoggerTest.kt b/java-sdk/jpl/src/test/kotlin/org/apache/airflow/sdk/jpl/AirflowSystemLoggerTest.kt new file mode 100644 index 0000000000000..7e36d50f90f80 --- /dev/null +++ b/java-sdk/jpl/src/test/kotlin/org/apache/airflow/sdk/jpl/AirflowSystemLoggerTest.kt @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.airflow.sdk.jpl + +import org.apache.airflow.sdk.execution.Level +import org.apache.airflow.sdk.execution.LogCapture +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Assertions.assertTrue +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Test +import java.util.ResourceBundle + +class AirflowSystemLoggerTest { + private lateinit var logger: AirflowSystemLogger + + @BeforeEach + fun setUp() { + logger = AirflowSystemLogger("com.example.Task") + LogCapture.resetThresholds() + LogCapture.drain() + } + + @Test + fun `level conversions`() { + val cases = + listOf( + System.Logger.Level.TRACE to Level.NOTSET, + System.Logger.Level.DEBUG to Level.DEBUG, + System.Logger.Level.INFO to Level.INFO, + System.Logger.Level.WARNING to Level.WARNING, + System.Logger.Level.ERROR to Level.ERROR, + ) + cases.forEach { (sysLevel, expected) -> + LogCapture.drain() + logger.log(sysLevel, null as ResourceBundle?, "m", null as Array?) + val messages = LogCapture.drain().filter { it.logger == "com.example.Task" } + assertEquals(1, messages.size, "Expected exactly one message for System.Logger $sysLevel") + assertEquals(expected, messages.single().level, "System.Logger $sysLevel should map to SDK $expected") + } + } + + @Test + fun `OFF level is not forwarded`() { + logger.log(System.Logger.Level.OFF, null as ResourceBundle?, "should not appear", null as Array?) + assertTrue(LogCapture.drain().none { it.logger == "com.example.Task" }) + } + + @Test + fun `message and logger name are forwarded`() { + logger.log(System.Logger.Level.INFO, null as ResourceBundle?, "hello", null as Array?) + val msg = LogCapture.drain().single { it.logger == "com.example.Task" } + assertEquals(Level.INFO, msg.level) + assertEquals("com.example.Task", msg.logger) + assertEquals("hello", msg.event) + } + + @Test + fun `null params array is tolerated (matches JDK default method delegation)`() { + logger.log(System.Logger.Level.INFO, null as ResourceBundle?, "hello", null as Array?) + val msg = LogCapture.drain().single { it.logger == "com.example.Task" } + assertEquals("hello", msg.event) + assertEquals(emptyMap(), msg.arguments) + } + + @Test + fun `null throwable is tolerated (matches JDK contract)`() { + logger.log(System.Logger.Level.INFO, null as ResourceBundle?, "hello", null as Throwable?) + val msg = LogCapture.drain().single { it.logger == "com.example.Task" } + assertEquals("hello", msg.event) + assertEquals(emptyMap(), msg.arguments) + } + + @Test + fun `message parameters are rendered into the message`() { + logger.log(System.Logger.Level.INFO, null as ResourceBundle?, "{0} {1}", arrayOf("alpha", 42)) + val msg = LogCapture.drain().single { it.logger == "com.example.Task" } + assertEquals("alpha 42", msg.event) + assertEquals(emptyMap(), msg.arguments) + } + + @Test + fun `malformed pattern keeps the template and preserves parameters as metadata`() { + // "{0" is an unterminated MessageFormat element, so rendering throws and we fall back. + logger.log(System.Logger.Level.INFO, null as ResourceBundle?, "{0", arrayOf("alpha", 42)) + val msg = LogCapture.drain().single { it.logger == "com.example.Task" } + assertEquals("{0", msg.event) + assertEquals("alpha", msg.arguments["0"]) + assertEquals(42, msg.arguments["1"]) + } + + @Test + fun `throwable is stored under the exception key`() { + val ex = RuntimeException("boom") + logger.log(System.Logger.Level.ERROR, null as ResourceBundle?, "oops", ex) + val msg = LogCapture.drain().single { it.logger == "com.example.Task" } + assertTrue(msg.arguments["exception"].toString().contains("boom")) + } + + @Test + fun `bundle key is resolved to localised string`() { + val bundle = + object : ResourceBundle() { + override fun handleGetObject(key: String) = if (key == "greeting") "hello" else null + + override fun getKeys() = java.util.Collections.enumeration(listOf("greeting")) + } + logger.log(System.Logger.Level.INFO, bundle, "greeting", null as Array?) + val msg = LogCapture.drain().single { it.logger == "com.example.Task" } + assertEquals("hello", msg.event) + } + + @Test + fun `missing bundle key falls back to the key itself`() { + val bundle = + object : ResourceBundle() { + override fun handleGetObject(key: String) = null + + override fun getKeys() = java.util.Collections.emptyEnumeration() + } + logger.log(System.Logger.Level.INFO, bundle, "unknown.key", null as Array?) + val msg = LogCapture.drain().single { it.logger == "com.example.Task" } + assertEquals("unknown.key", msg.event) + } +} diff --git a/java-sdk/jul/build.gradle.kts b/java-sdk/jul/build.gradle.kts new file mode 100644 index 0000000000000..5dcc056976ead --- /dev/null +++ b/java-sdk/jul/build.gradle.kts @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +plugins { + `java-library` + id("airflow-jvm-conventions") + id("airflow-publish") +} + +val mockkVersion: String by project + +dependencies { + implementation(project(":sdk")) + testImplementation(kotlin("test")) + testImplementation("io.mockk:mockk:$mockkVersion") + testImplementation("io.mockk:mockk-agent:$mockkVersion") +} + +java { + withSourcesJar() // Required by Maven Central. +} + +tasks.withType { + useJUnitPlatform() + jvmArgs("-Djdk.attach.allowAttachSelf=true") +} + +publishing { + publications { + create("mavenJava") { + artifactId = "airflow-sdk-jul" + from(components["java"]) + pom { + name = "Apache Airflow Java SDK java.util.Logging Handler" + description = "Routes java.util.Logging calls from task code through the SDK to Airflow's task log store." + } + } + } +} diff --git a/java-sdk/jul/src/main/kotlin/org/apache/airflow/sdk/jul/AirflowJulHandler.kt b/java-sdk/jul/src/main/kotlin/org/apache/airflow/sdk/jul/AirflowJulHandler.kt new file mode 100644 index 0000000000000..46605c9b0f2ae --- /dev/null +++ b/java-sdk/jul/src/main/kotlin/org/apache/airflow/sdk/jul/AirflowJulHandler.kt @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.airflow.sdk.jul + +import org.apache.airflow.sdk.execution.Level +import org.apache.airflow.sdk.execution.Log +import java.util.logging.Handler +import java.util.logging.LogRecord +import java.util.logging.Logger +import java.util.logging.SimpleFormatter +import java.util.logging.Level as JLevel + +/** + * Convert a JUL Level to an SDK Level. + * + * JUL levels are VASTLY different from SDK levels. The `>` and `>=` criteria + * are chosen intentionally (but also arbitrarily) to fit JUL level regions + * more equally, while still keeping the predefined levels match. + */ +private fun JLevel.convert() = + intValue().let { + if (it > JLevel.SEVERE.intValue()) { + Level.CRITICAL + } else if (it > JLevel.WARNING.intValue()) { + Level.ERROR + } else if (it > JLevel.INFO.intValue()) { + Level.WARNING + } else if (it >= JLevel.CONFIG.intValue()) { + Level.INFO + } else if (it >= JLevel.FINER.intValue()) { + Level.DEBUG + } else { + Level.NOTSET + } + } + +/** + * A [Handler] that routes java.util.logging records through the Airflow Java SDK's + * log pipeline to Airflow's task log store. + */ +class AirflowJulHandler : Handler() { + // Used only for [java.util.logging.Formatter.formatMessage]: it localizes + // and substitutes parameters but, unlike format(), never appends the + // throwable's stack trace, which we send separately instead, to the text. + private val formatter = SimpleFormatter() + + override fun publish(record: LogRecord) { + if (!isLoggable(record)) return + val level = record.level.convert() + val logger = record.loggerName + if (!Log.isEnabledForLevel(level, logger)) return + Log.send(level, logger ?: "", formatter.formatMessage(record)) { + record.thrown?.run { put("exception", stackTraceToString()) } + } + } + + override fun flush() = Unit + + override fun close() = Unit + + companion object { + /** + * Install an [AirflowJulHandler] on the root logger. + * + * This is a convenience method to install the handler on all loggers if + * you choose to do this programmatically (rather than with a properties + * file). You should typically do it in the Dag bundle's `main` method + * before you create the [org.apache.airflow.sdk.Bundle] object. + */ + @JvmStatic + fun install() { + val root = Logger.getLogger("") + if (root.handlers.none { it is AirflowJulHandler }) { + root.addHandler(AirflowJulHandler()) + } + } + } +} diff --git a/java-sdk/jul/src/test/kotlin/org/apache/airflow/sdk/jul/AirflowJulHandlerTest.kt b/java-sdk/jul/src/test/kotlin/org/apache/airflow/sdk/jul/AirflowJulHandlerTest.kt new file mode 100644 index 0000000000000..d0ad2fe19030b --- /dev/null +++ b/java-sdk/jul/src/test/kotlin/org/apache/airflow/sdk/jul/AirflowJulHandlerTest.kt @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.airflow.sdk.jul + +import io.mockk.every +import io.mockk.just +import io.mockk.mockkObject +import io.mockk.runs +import io.mockk.slot +import io.mockk.unmockkAll +import io.mockk.verify +import org.apache.airflow.sdk.execution.Level +import org.apache.airflow.sdk.execution.Log +import org.junit.jupiter.api.AfterEach +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Assertions.assertTrue +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Test +import java.util.logging.LogRecord +import java.util.logging.Logger +import java.util.logging.Level as JLevel + +class AirflowJulHandlerTest { + private lateinit var handler: AirflowJulHandler + + @BeforeEach + fun setUp() { + handler = AirflowJulHandler() + mockkObject(Log) + every { Log.isEnabledForLevel(any(), any()) } returns true + every { Log.send(any(), any(), any(), any.() -> Unit>()) } just runs + } + + @AfterEach + fun tearDown() { + unmockkAll() + // Remove any handlers installed by install() tests so they don't leak between tests. + val root = Logger.getLogger("") + root.handlers.filterIsInstance().forEach { root.removeHandler(it) } + } + + // Mapping: + // > 1000 -> CRITICAL + // > 900 -> ERROR (SEVERE = 1000) + // > 800 -> WARNING (WARNING = 900) + // >= 700 -> INFO (INFO = 800, CONFIG = 700) + // >= 400 -> DEBUG (FINE = 500, FINER = 400) + // else -> NOTSET (FINEST = 300, ALL) + @Test + fun `level conversions`() { + // Custom level above SEVERE to hit the CRITICAL branch. + val aboveSevere = object : JLevel("ABOVE_SEVERE", 1001) {} + + val cases = + listOf( + aboveSevere to Level.CRITICAL, + JLevel.SEVERE to Level.ERROR, + JLevel.WARNING to Level.WARNING, + JLevel.INFO to Level.INFO, + JLevel.CONFIG to Level.INFO, + JLevel.FINE to Level.DEBUG, + JLevel.FINER to Level.DEBUG, + JLevel.FINEST to Level.NOTSET, + ) + cases.forEach { (julLevel, expected) -> + val capturedLevel = slot() + every { Log.send(capture(capturedLevel), any(), any(), any.() -> Unit>()) } just runs + handler.publish(record("msg", julLevel)) + assertEquals(expected, capturedLevel.captured, "JUL $julLevel (${julLevel.intValue()}) should map to SDK $expected") + } + } + + @Test + fun `message and logger name are forwarded`() { + handler.publish(record("hello world", JLevel.INFO, loggerName = "com.example.Task")) + verify { Log.send(Level.INFO, "com.example.Task", "hello world", any.() -> Unit>()) } + } + + @Test + fun `message parameters are rendered into the message`() { + val rec = + record("msg {0} {1}", JLevel.INFO).also { + it.parameters = arrayOf("alpha", 42) + } + handler.publish(rec) + verify { Log.send(Level.INFO, "test.Logger", "msg alpha 42", any.() -> Unit>()) } + } + + @Test + fun `thrown is stored under the exception key`() { + val lambdaSlot = slot.() -> Unit>() + every { Log.send(any(), any(), any(), capture(lambdaSlot)) } just runs + val rec = + LogRecord(JLevel.SEVERE, "failure").also { + it.thrown = RuntimeException("kaboom") + } + handler.publish(rec) + val args = mutableMapOf().also { lambdaSlot.captured.invoke(it) } + assertTrue(args["exception"].toString().contains("kaboom")) + } + + private fun record( + message: String, + level: JLevel, + loggerName: String = "test.Logger", + ) = LogRecord(level, message).also { it.loggerName = loggerName } +} diff --git a/java-sdk/log4j2/build.gradle.kts b/java-sdk/log4j2/build.gradle.kts new file mode 100644 index 0000000000000..9149377167ec9 --- /dev/null +++ b/java-sdk/log4j2/build.gradle.kts @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +plugins { + `java-library` + id("airflow-jvm-conventions") + id("airflow-publish") +} + +val mockkVersion: String by project + +// We are intentionally separating compileOnly and testImplementation for +// "org.apache.logging.log4j:log4j-core:2.26.0" so it's not pulled at +// runtime when it's unnecessary. +@Suppress("GradleDependencyAddedMultipleTimes") +dependencies { + annotationProcessor("org.apache.logging.log4j:log4j-core:2.26.0") + api("org.apache.logging.log4j:log4j-api:2.26.0") + compileOnly("org.apache.logging.log4j:log4j-core:2.26.0") + implementation(project(":sdk")) + + testImplementation(kotlin("test")) + testImplementation("io.mockk:mockk:$mockkVersion") + testImplementation("io.mockk:mockk-agent:$mockkVersion") + testImplementation("org.apache.logging.log4j:log4j-core:2.26.0") +} + +java { + withSourcesJar() // Required by Maven Central. +} + +tasks.withType { + options.compilerArgs.addAll( + listOf( + "-Alog4j.graalvm.groupId=org.apache.airflow", + "-Alog4j.graalvm.artifactId=airflow-sdk-log4j2", + ), + ) +} + +tasks.withType { + useJUnitPlatform() + jvmArgs("-Djdk.attach.allowAttachSelf=true") +} + +publishing { + publications { + create("mavenJava") { + artifactId = "airflow-sdk-log4j2" + from(components["java"]) + pom { + name = "Apache Airflow Java SDK Log4j 2 Appender" + description = "Routes Log4j 2 log calls from task code through the SDK to Airflow's task log store." + } + } + } +} diff --git a/java-sdk/log4j2/src/main/java/org/apache/airflow/sdk/log4j/AirflowLog4jAppender.java b/java-sdk/log4j2/src/main/java/org/apache/airflow/sdk/log4j/AirflowLog4jAppender.java new file mode 100644 index 0000000000000..1c27cd1dd796c --- /dev/null +++ b/java-sdk/log4j2/src/main/java/org/apache/airflow/sdk/log4j/AirflowLog4jAppender.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.airflow.sdk.log4j; + +import java.io.PrintWriter; +import java.io.StringWriter; +import java.util.HashMap; +import java.util.Map; +import org.apache.airflow.sdk.execution.Level; +import org.apache.airflow.sdk.execution.Log; +import org.apache.logging.log4j.core.Appender; +import org.apache.logging.log4j.core.Core; +import org.apache.logging.log4j.core.Filter; +import org.apache.logging.log4j.core.LogEvent; +import org.apache.logging.log4j.core.appender.AbstractAppender; +import org.apache.logging.log4j.core.config.Property; +import org.apache.logging.log4j.core.config.plugins.Plugin; +import org.apache.logging.log4j.core.config.plugins.PluginAttribute; +import org.apache.logging.log4j.core.config.plugins.PluginElement; +import org.apache.logging.log4j.core.config.plugins.PluginFactory; +import org.apache.logging.log4j.spi.StandardLevel; + +/** + * A Log4j {@link Appender} to route logs to Airflow. + * + *

This class is not called explicitly. An annotation processor reads the + * class (since it's annotated with {@link Plugin}) and generates information + * needed by Log4J. + */ +@Plugin( + name = "AirflowAppender", + category = Core.CATEGORY_NAME, + elementType = Appender.ELEMENT_TYPE) +public final class AirflowLog4jAppender extends AbstractAppender { + + private AirflowLog4jAppender(String name, Filter filter) { + super(name, filter, null, true, Property.EMPTY_ARRAY); + } + + @PluginFactory + public static AirflowLog4jAppender createAppender( + @PluginAttribute("name") String name, @PluginElement("Filter") Filter filter) { + return new AirflowLog4jAppender(name == null ? "AirflowAppender" : name, filter); + } + + @Override + public void append(LogEvent event) { + var level = convert(event.getLevel()); + var logger = event.getLoggerName(); + if (!Log.INSTANCE.isEnabledForLevel(level, logger)) return; + // Log4J does not really provide a good way to access the underlying unformatted data + // since it allows vastly different logging mechanisms. We pre-format the message here + // and send the exception and marker separately as structured metadata. + String message = event.getMessage().getFormattedMessage(); + Map args = new HashMap<>(); + var thrown = event.getThrown(); + if (thrown != null) { + args.put("exception", stackTrace(thrown)); + } + var marker = event.getMarker(); + if (marker != null) { + args.put("marker", marker.getName()); + } + Log.INSTANCE.send(level, logger, message, args); + } + + private static Level convert(org.apache.logging.log4j.Level level) { + var v = level.intLevel(); + if (v < StandardLevel.ERROR.intLevel()) return Level.CRITICAL; + if (v < StandardLevel.WARN.intLevel()) return Level.ERROR; + if (v < StandardLevel.INFO.intLevel()) return Level.WARNING; + if (v < StandardLevel.DEBUG.intLevel()) return Level.INFO; + if (v < StandardLevel.TRACE.intLevel()) return Level.DEBUG; + return Level.NOTSET; + } + + private static String stackTrace(Throwable t) { + var sw = new StringWriter(); + t.printStackTrace(new PrintWriter(sw)); + return sw.toString(); + } +} diff --git a/java-sdk/log4j2/src/test/kotlin/org/apache/airflow/sdk/log4j/AirflowLog4jAppenderTest.kt b/java-sdk/log4j2/src/test/kotlin/org/apache/airflow/sdk/log4j/AirflowLog4jAppenderTest.kt new file mode 100644 index 0000000000000..f8284f5c5b7e8 --- /dev/null +++ b/java-sdk/log4j2/src/test/kotlin/org/apache/airflow/sdk/log4j/AirflowLog4jAppenderTest.kt @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.airflow.sdk.log4j + +import io.mockk.every +import io.mockk.just +import io.mockk.mockkObject +import io.mockk.runs +import io.mockk.slot +import io.mockk.unmockkAll +import io.mockk.verify +import org.apache.airflow.sdk.execution.Level +import org.apache.airflow.sdk.execution.Log +import org.apache.logging.log4j.MarkerManager +import org.apache.logging.log4j.core.impl.Log4jLogEvent +import org.apache.logging.log4j.message.SimpleMessage +import org.junit.jupiter.api.AfterEach +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Assertions.assertTrue +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Test +import org.apache.logging.log4j.Level as L4jLevel + +class AirflowLog4jAppenderTest { + private lateinit var appender: AirflowLog4jAppender + + @BeforeEach + fun setUp() { + appender = AirflowLog4jAppender.createAppender("AirflowAppender", null) + mockkObject(Log) + every { Log.isEnabledForLevel(any(), any()) } returns true + every { Log.send(any(), any(), any(), any>()) } just runs + } + + @AfterEach + fun tearDown() { + unmockkAll() + } + + @Test + fun `factory honors the configured name and defaults when absent`() { + assertEquals("Airflow", AirflowLog4jAppender.createAppender("Airflow", null).name) + assertEquals("AirflowAppender", AirflowLog4jAppender.createAppender(null, null).name) + } + + @Test + fun `level conversions`() { + val cases = + listOf( + L4jLevel.FATAL to Level.CRITICAL, + L4jLevel.ERROR to Level.ERROR, + L4jLevel.WARN to Level.WARNING, + L4jLevel.INFO to Level.INFO, + L4jLevel.DEBUG to Level.DEBUG, + L4jLevel.TRACE to Level.NOTSET, + L4jLevel.ALL to Level.NOTSET, + ) + cases.forEach { (l4jLevel, expected) -> + val capturedLevel = slot() + every { Log.send(capture(capturedLevel), any(), any(), any>()) } just runs + appender.append(event("msg", l4jLevel)) + assertEquals(expected, capturedLevel.captured, "Log4j $l4jLevel should map to SDK $expected") + } + } + + @Test + fun `formatted message and logger name are forwarded`() { + val capturedArgs = slot>() + appender.append(event("hello world", L4jLevel.INFO, loggerName = "com.example.Task")) + verify { Log.send(Level.INFO, "com.example.Task", "hello world", capture(capturedArgs)) } + assertTrue(capturedArgs.captured.isEmpty()) + } + + @Test + fun `thrown is stored under the exception key`() { + val capturedArgs = slot>() + every { Log.send(any(), any(), any(), capture(capturedArgs)) } just runs + appender.append(event("failure", L4jLevel.ERROR, thrown = RuntimeException("kaboom"))) + assertTrue(capturedArgs.captured["exception"].toString().contains("kaboom")) + } + + @Test + fun `marker name is stored under the marker key`() { + val capturedArgs = slot>() + every { Log.send(any(), any(), any(), capture(capturedArgs)) } just runs + appender.append(event("hi", L4jLevel.INFO, marker = MarkerManager.getMarker("AUDIT"))) + assertEquals("AUDIT", capturedArgs.captured["marker"]) + } + + private fun event( + message: String, + level: L4jLevel, + loggerName: String = "test.Logger", + thrown: Throwable? = null, + marker: org.apache.logging.log4j.Marker? = null, + ) = Log4jLogEvent + .newBuilder() + .setLoggerName(loggerName) + .setLevel(level) + .setMessage(SimpleMessage(message)) + .setThrown(thrown) + .setMarker(marker) + .build() +} diff --git a/java-sdk/sdk/build.gradle.kts b/java-sdk/sdk/build.gradle.kts index 216ef76513123..1fcd07292a4da 100644 --- a/java-sdk/sdk/build.gradle.kts +++ b/java-sdk/sdk/build.gradle.kts @@ -21,6 +21,7 @@ val airflowSupervisorSchemaVersion: String by project plugins { `java-library` + `java-test-fixtures` id("airflow-jvm-conventions") id("airflow-publish") id("org.jetbrains.dokka") version "2.2.0" @@ -251,6 +252,9 @@ publishing { create("mavenJava") { artifactId = "airflow-sdk" from(components["java"]) + // test-fixtures are not published to Maven Central. + suppressPomMetadataWarningsFor("testFixturesApiElements") + suppressPomMetadataWarningsFor("testFixturesRuntimeElements") artifact(javadocJar) pom { name = "Apache Airflow Java SDK" diff --git a/java-sdk/sdk/src/main/kotlin/org/apache/airflow/sdk/Server.kt b/java-sdk/sdk/src/main/kotlin/org/apache/airflow/sdk/Server.kt index 2bab74bf23d2b..6f82ca3c4c3bd 100644 --- a/java-sdk/sdk/src/main/kotlin/org/apache/airflow/sdk/Server.kt +++ b/java-sdk/sdk/src/main/kotlin/org/apache/airflow/sdk/Server.kt @@ -25,6 +25,7 @@ import io.ktor.network.sockets.InetSocketAddress import io.ktor.network.sockets.aSocket import io.ktor.network.sockets.openReadChannel import io.ktor.network.sockets.openWriteChannel +import kotlinx.coroutines.CompletableDeferred import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.launch @@ -139,20 +140,27 @@ class Server( */ suspend fun serveAsync(bundle: Bundle) = coroutineScope { + val deferral = CompletableDeferred() + launch { - aSocket(SelectorManager(Dispatchers.IO)).tcp().connect(comm).use { socket -> - logger.debug("Connected comm", mapOf("addr" to comm)) - CoordinatorComm( - bundle, - socket.openReadChannel(), - socket.openWriteChannel(autoFlush = true), - ).startProcessing() + try { + aSocket(SelectorManager(Dispatchers.IO)).tcp().connect(comm).use { socket -> + logger.debug("Connected comm", mapOf("addr" to comm)) + CoordinatorComm( + bundle, + socket.openReadChannel(), + socket.openWriteChannel(autoFlush = true), + ).startProcessing() + } + } finally { + deferral.complete(Unit) } } launch { aSocket(SelectorManager(Dispatchers.IO)).tcp().connect(logs).use { socket -> logger.debug("Connected logs", mapOf("addr" to logs)) LogSender.configure(socket.openWriteChannel(autoFlush = true)) + deferral.await() } } } diff --git a/java-sdk/sdk/src/main/kotlin/org/apache/airflow/sdk/execution/Logger.kt b/java-sdk/sdk/src/main/kotlin/org/apache/airflow/sdk/execution/Logger.kt index 27005f92082cb..b778e9aa79f9a 100644 --- a/java-sdk/sdk/src/main/kotlin/org/apache/airflow/sdk/execution/Logger.kt +++ b/java-sdk/sdk/src/main/kotlin/org/apache/airflow/sdk/execution/Logger.kt @@ -34,52 +34,188 @@ import java.util.concurrent.ConcurrentLinkedDeque import kotlin.reflect.KClass import kotlin.time.Clock -enum class Level { ERROR, DEBUG, } +// Adapted from Python logging. +enum class Level( + val value: Short, +) { + CRITICAL(50), + ERROR(40), + WARNING(30), + INFO(20), + DEBUG(10), + NOTSET(0), + ; + + internal companion object { + val accepted = entries.associateBy { it.name } + + fun parse(s: String?): Level? = s?.let { Level.accepted[it.trim().uppercase()] } + } +} + +/** + * Parser for the namespace levels configuration. + * + * The value is a series of `=` pairs separated by whitespaces and/or commas. + * Each `` must name one of [Level]s (case-insensitive). When the same logger appears + * more than once, the last value wins. + */ +internal object NamespaceLevels { + const val ENV_VAR = "AIRFLOW__LOGGING__NAMESPACE_LEVELS" + private const val LOGGER_NAME = "org.apache.airflow.sdk.execution.NamespaceLevels" + + /** + * Parse [raw] into per-logger [Level] overrides. + * + * Invalid entries are skipped — the affected logger then falls back to the global level — and + * reported once at [Level.ERROR], so a misconfiguration never takes down logging nor discards the + * valid overrides. A `null`, empty, or whitespace/comma-only value yields no overrides. + * Surrounding and repeated separators are ignored. + */ + fun parse(raw: String?): Map { + if (raw.isNullOrBlank()) return emptyMap() + + val levels = mutableMapOf() + val errors = mutableListOf() + raw + .split(Regex("""[\s,]+""")) + .filter { it.isNotEmpty() } + .forEach { entry -> parseEntry(entry, levels)?.let { errors += it } } + + if (errors.isNotEmpty()) { + LogSender.send( + LogMessage( + event = "Ignoring invalid $ENV_VAR entries: ${errors.joinToString("; ")}", + arguments = emptyMap(), + logger = LOGGER_NAME, + level = Level.ERROR, + ), + ) + } + return levels + } + + /** Parse a single [entry] into [levels], or return a description of why it was skipped. */ + private fun parseEntry( + entry: String, + levels: MutableMap, + ): String? { + val separator = entry.indexOf('=') + if (separator < 0) { + return "malformed entry \"$entry\", expected \"=\"" + } + + val logger = entry.substring(0, separator).trim() + if (logger.isEmpty()) { + return "malformed entry \"$entry\", logger name is empty" + } + + val levelName = entry.substring(separator + 1).trim() + val level = + Level.parse(levelName) + ?: return "invalid level \"$levelName\" for logger \"$logger\", " + + "expected one of: ${Level.entries.joinToString(", ") { it.name }}" + + levels[logger] = level + return null + } +} + +/** + * Public entry point into Airflow's log pipeline. + * + * This is useful for Java-side logging providers such as [java.util.logging] + * and SLF4J to integrate logs they receive into Airflow. + * + * Not intended for use by task code. + */ +object Log { + private const val LOGGING_LEVEL_ENV = "AIRFLOW__LOGGING__LOGGING_LEVEL" + + internal var globalThreshold = Level.parse(System.getenv(LOGGING_LEVEL_ENV)) ?: Level.INFO + internal var namedThresholds = NamespaceLevels.parse(System.getenv(NamespaceLevels.ENV_VAR)) + + /** + * Whether a [level] message from the logger called [name] should be emitted. + * + * Thresholds cascade down the dotted-name hierarchy, mirroring Java logging + * convention. A logger inherits the level of its nearest configured + * ancestor, so with `foo=WARNING` configured, loggers named e.g. `foo.bar` + * also resolves to `WARNING` without additional configuration. A logger with + * no configured ancestor falls back to [globalThreshold]. + */ + fun isEnabledForLevel( + level: Level, + name: String?, + ): Boolean { + var threshold = name + while (threshold != null) { + namedThresholds[threshold]?.let { return level.value >= it.value } + threshold = threshold.substringBeforeLast('.', missingDelimiterValue = "").ifEmpty { null } + } + return level.value >= globalThreshold.value + } + + fun send( + level: Level, + logger: String, + event: String, + arguments: Map = emptyMap(), + ) { + if (!isEnabledForLevel(level, logger)) return + LogSender.send(LogMessage(event, arguments, logger, level)) + } + + fun send( + level: Level, + logger: String, + event: String, + buildArguments: MutableMap.() -> Unit, + ) = send(level, logger, event, buildMap(buildArguments)) +} internal data class LogMessage( val event: String, - val arguments: Map, - val logger: Logger, + val arguments: Map, + val logger: String, val level: Level, val timestamp: LocalDateTime = Clock.System.now().toLocalDateTime(TimeZone.currentSystemDefault()), ) +/** + * Logger used by task scaffolding. + * + * This is a thin wrapper around [LogSender] that our own code can + * use instead of needing to go through a "real" logging provider. + */ internal class Logger( - cls: KClass<*>, + val name: String, ) { - val name: String? = cls.java.typeName - - // TODO: Actually implement level filtering. - @Suppress("UNUSED_PARAMETER") - fun isEnabledForLevel(level: Level): Boolean = true + constructor(cls: KClass<*>) : this(cls.java.typeName) fun debug( message: String, arguments: Map = emptyMap(), - ) { - log(Level.DEBUG, message, arguments) - } + ) = log(Level.DEBUG, message, arguments) fun error( message: String, arguments: Map = emptyMap(), - ) { - log(Level.ERROR, message, arguments) - } + ) = log(Level.ERROR, message, arguments) private fun log( level: Level, event: String, arguments: Map, ) { - if (!isEnabledForLevel(level)) return - LogSender.send(LogMessage(event, arguments, this, level)) + if (!Log.isEnabledForLevel(level, name)) return + LogSender.send(LogMessage(event, arguments, name, level)) } } internal object LogSender { private var writer: ByteWriteChannel? = null - private val messages: ConcurrentLinkedDeque = ConcurrentLinkedDeque() + internal val messages: ConcurrentLinkedDeque = ConcurrentLinkedDeque() fun configure(channel: ByteWriteChannel) { writer = channel @@ -106,7 +242,7 @@ internal object LogSender { val map = message.arguments.toMutableMap() map["event"] = message.event map["level"] = message.level.name.lowercase() - map["logger"] = message.logger.name ?: "(java)" + map["logger"] = message.logger map["timestamp"] = message.timestamp // TODO: Can this be done asynchronously instead? runBlocking { writer.writeString("${map.toJsonElement()}\n") } diff --git a/java-sdk/sdk/src/test/kotlin/org/apache/airflow/sdk/execution/LogTest.kt b/java-sdk/sdk/src/test/kotlin/org/apache/airflow/sdk/execution/LogTest.kt new file mode 100644 index 0000000000000..e1d7ebf0f63b3 --- /dev/null +++ b/java-sdk/sdk/src/test/kotlin/org/apache/airflow/sdk/execution/LogTest.kt @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.airflow.sdk.execution + +import org.junit.jupiter.api.AfterEach +import org.junit.jupiter.api.Assertions.assertFalse +import org.junit.jupiter.api.Assertions.assertTrue +import org.junit.jupiter.api.DisplayName +import org.junit.jupiter.api.Test + +class LogTest { + @AfterEach + fun tearDown() { + LogCapture.resetThresholds() + } + + @Test + @DisplayName("Without overrides, a logger is gated by the global threshold") + fun shouldUseGlobalThreshold() { + Log.globalThreshold = Level.INFO + Log.namedThresholds = emptyMap() + + assertTrue(Log.isEnabledForLevel(Level.WARNING, "any")) + assertTrue(Log.isEnabledForLevel(Level.INFO, "any")) + assertFalse(Log.isEnabledForLevel(Level.DEBUG, "any")) + } + + @Test + @DisplayName("An exact namespace override applies to its logger; unrelated loggers use global") + fun shouldApplyNamespaceOverride() { + Log.globalThreshold = Level.INFO + Log.namedThresholds = mapOf("chatty" to Level.DEBUG) + + assertTrue(Log.isEnabledForLevel(Level.DEBUG, "chatty")) + assertFalse(Log.isEnabledForLevel(Level.DEBUG, "other")) + assertTrue(Log.isEnabledForLevel(Level.INFO, "other")) + } + + @Test + @DisplayName("A nested logger inherits the level of its nearest configured ancestor") + fun shouldInheritFromAncestor() { + Log.globalThreshold = Level.INFO + Log.namedThresholds = mapOf("foo" to Level.WARNING) + + // foo.bar and foo.bar.rex have no exact entry, so they inherit foo=WARNING. + assertFalse(Log.isEnabledForLevel(Level.INFO, "foo.bar")) + assertFalse(Log.isEnabledForLevel(Level.INFO, "foo.bar.rex")) + assertTrue(Log.isEnabledForLevel(Level.WARNING, "foo.bar.rex")) + } + + @Test + @DisplayName("The most specific configured ancestor wins") + fun shouldPreferMostSpecificAncestor() { + Log.globalThreshold = Level.INFO + Log.namedThresholds = mapOf("foo" to Level.WARNING, "foo.bar" to Level.DEBUG) + + // foo.bar.rex's nearest ancestor is foo.bar=DEBUG, not foo=WARNING. + assertTrue(Log.isEnabledForLevel(Level.DEBUG, "foo.bar.rex")) + // A sibling under foo (but not under foo.bar) still resolves to foo=WARNING. + assertFalse(Log.isEnabledForLevel(Level.INFO, "foo.baz")) + } + + @Test + @DisplayName("Inheritance respects dotted-segment boundaries, not raw string prefixes") + fun shouldNotMatchPartialSegmentPrefixes() { + Log.globalThreshold = Level.INFO + Log.namedThresholds = mapOf("foo" to Level.WARNING) + + // "foobar" is not a child of "foo"; it must fall back to the global level. + assertTrue(Log.isEnabledForLevel(Level.INFO, "foobar")) + } +} diff --git a/java-sdk/sdk/src/test/kotlin/org/apache/airflow/sdk/execution/NamespaceLevelsTest.kt b/java-sdk/sdk/src/test/kotlin/org/apache/airflow/sdk/execution/NamespaceLevelsTest.kt new file mode 100644 index 0000000000000..e2a4cb306e568 --- /dev/null +++ b/java-sdk/sdk/src/test/kotlin/org/apache/airflow/sdk/execution/NamespaceLevelsTest.kt @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.airflow.sdk.execution + +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Assertions.assertTrue +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.DisplayName +import org.junit.jupiter.api.Test + +class NamespaceLevelsTest { + @BeforeEach + fun setUp() { + LogCapture.drain() // discard anything buffered by earlier tests + } + + @Test + @DisplayName("null, empty and separator-only input parse to no overrides and log nothing") + fun shouldParseBlankToEmptyMap() { + for (raw in listOf(null, "", " ", " , , ")) { + assertEquals(emptyMap(), NamespaceLevels.parse(raw), "levels for <$raw>") + } + assertTrue(LogCapture.drain().isEmpty(), "no log should be emitted for blank input") + } + + @Test + @DisplayName("Should parse the documented example without logging") + fun shouldParseDocumentedExample() { + assertEquals( + mapOf("sqlalchemy" to Level.INFO, "sqlalchemy.engine" to Level.DEBUG), + NamespaceLevels.parse("sqlalchemy=INFO sqlalchemy.engine=DEBUG"), + ) + assertTrue(LogCapture.drain().isEmpty(), "no log should be emitted for valid input") + } + + @Test + @DisplayName("Should accept whitespace, commas and a mix as separators") + fun shouldAcceptVariousSeparators() { + val expected = mapOf("a" to Level.INFO, "b" to Level.DEBUG, "c" to Level.ERROR) + assertEquals(expected, NamespaceLevels.parse("a=INFO b=DEBUG c=ERROR")) + assertEquals(expected, NamespaceLevels.parse("a=INFO,b=DEBUG,c=ERROR")) + assertEquals(expected, NamespaceLevels.parse(" a=INFO ,, b=DEBUG\t c=ERROR ")) + } + + @Test + @DisplayName("Level names are case-insensitive") + fun shouldParseLevelCaseInsensitively() { + assertEquals( + mapOf("a" to Level.INFO, "b" to Level.CRITICAL), + NamespaceLevels.parse("a=info b=Critical"), + ) + } + + @Test + @DisplayName("When a logger is repeated the last value wins") + fun shouldLetLastValueWin() { + assertEquals(mapOf("a" to Level.ERROR), NamespaceLevels.parse("a=INFO a=ERROR")) + } + + @Test + @DisplayName("An entry without '=' is skipped, valid entries kept, and an error is logged") + fun shouldSkipEntryWithoutEquals() { + // "sqlalchemy" keeps its override; only the malformed "botocore" falls back to the global level. + assertEquals(mapOf("sqlalchemy" to Level.INFO), NamespaceLevels.parse("sqlalchemy=INFO botocore")) + + val error = LogCapture.drain().single() + assertEquals(Level.ERROR, error.level) + assertEquals("org.apache.airflow.sdk.execution.NamespaceLevels", error.logger) + assertTrue(error.event.contains("AIRFLOW__LOGGING__NAMESPACE_LEVELS"), error.event) + assertTrue(error.event.contains("botocore"), error.event) + assertTrue(error.event.contains("="), error.event) + } + + @Test + @DisplayName("An entry with an empty logger name is skipped while valid entries are kept") + fun shouldSkipEmptyLoggerName() { + assertEquals(mapOf("a" to Level.DEBUG), NamespaceLevels.parse("=INFO a=DEBUG")) + assertTrue( + LogCapture + .drain() + .single() + .event + .contains("logger name is empty"), + ) + } + + @Test + @DisplayName("An unknown level is skipped and reported with the valid levels") + fun shouldSkipUnknownLevel() { + assertEquals(emptyMap(), NamespaceLevels.parse("sqlalchemy=VERBOSE")) + + val event = LogCapture.drain().single().event + assertTrue(event.contains("VERBOSE"), event) + assertTrue(event.contains("sqlalchemy"), event) + assertTrue(event.contains("INFO"), event) + } + + @Test + @DisplayName("An entry with an empty level is skipped") + fun shouldSkipEmptyLevel() { + assertEquals(emptyMap(), NamespaceLevels.parse("sqlalchemy=")) + assertEquals(1, LogCapture.drain().size) + } + + @Test + @DisplayName("Every invalid entry is reported in a single log, valid ones kept") + fun shouldReportEveryInvalidEntry() { + assertEquals( + mapOf("a" to Level.INFO, "c" to Level.DEBUG), + NamespaceLevels.parse("a=INFO botocore b=NOPE c=DEBUG"), + ) + + val event = LogCapture.drain().single().event + assertTrue(event.contains("botocore"), event) + assertTrue(event.contains("NOPE"), event) + } +} diff --git a/java-sdk/sdk/src/testFixtures/kotlin/org/apache/airflow/sdk/execution/LogCapture.kt b/java-sdk/sdk/src/testFixtures/kotlin/org/apache/airflow/sdk/execution/LogCapture.kt new file mode 100644 index 0000000000000..5c72d36cbbc38 --- /dev/null +++ b/java-sdk/sdk/src/testFixtures/kotlin/org/apache/airflow/sdk/execution/LogCapture.kt @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.airflow.sdk.execution + +data class CapturedLogMessage( + val level: Level, + val logger: String, + val event: String, + val arguments: Map, +) + +object LogCapture { + fun drain(): List = + buildList { + var msg = LogSender.messages.poll() + while (msg != null) { + add(CapturedLogMessage(msg.level, msg.logger, msg.event, msg.arguments)) + msg = LogSender.messages.poll() + } + } + + fun resetThresholds() { + configureThresholds(Level.NOTSET, emptyMap()) + } + + fun configureThresholds( + global: Level, + named: Map = emptyMap(), + ) { + Log.globalThreshold = global + Log.namedThresholds = named + } +} diff --git a/java-sdk/settings.gradle.kts b/java-sdk/settings.gradle.kts index fd4f4ac001c68..d8792763718da 100644 --- a/java-sdk/settings.gradle.kts +++ b/java-sdk/settings.gradle.kts @@ -22,4 +22,4 @@ plugins { } rootProject.name = "airflow-java-sdk" -include("bom", "plugin", "processor", "sdk") +include("bom", "jpl", "jul", "log4j2", "plugin", "processor", "sdk", "slf4j") diff --git a/java-sdk/slf4j/build.gradle.kts b/java-sdk/slf4j/build.gradle.kts new file mode 100644 index 0000000000000..5ed02fa64f7e7 --- /dev/null +++ b/java-sdk/slf4j/build.gradle.kts @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +plugins { + `java-library` + id("airflow-jvm-conventions") + id("airflow-publish") +} + +val slf4jVersion: String by project + +abstract class GenerateConstants : DefaultTask() { + @get:Input + abstract val requestedSlf4jApiVersion: Property + + @get:OutputDirectory + abstract val outputDir: DirectoryProperty + + @TaskAction + fun generate() = + outputDir.get().file("org/apache/airflow/sdk/slf4j/Constants.kt").asFile.apply { + parentFile.mkdirs() + writeText( + """ + // Generated by Gradle — do not edit. + package org.apache.airflow.sdk.slf4j + + internal const val REQUESTED_SLF4J_API_VERSION = "${requestedSlf4jApiVersion.get()}" + """.trimIndent() + "\n", + ) + } +} + +val generateSlf4jApiVersion by tasks.registering(GenerateConstants::class) { + description = "Generate source for constants pulled from Gradle properties." + requestedSlf4jApiVersion = slf4jVersion + outputDir = layout.buildDirectory.dir("generated/sources/slf4jApiVersion/kotlin/main") +} + +kotlin.sourceSets.main { + kotlin.srcDir(generateSlf4jApiVersion.map { it.outputs.files.singleFile }) +} + +dependencies { + api("org.slf4j:slf4j-api:$slf4jVersion") + implementation(project(":sdk")) + testImplementation(kotlin("test")) + testImplementation(testFixtures(project(":sdk"))) +} + +java { + withSourcesJar() // Required by Maven Central. +} + +tasks.withType { + useJUnitPlatform() +} + +publishing { + publications { + create("mavenJava") { + artifactId = "airflow-sdk-slf4j" + from(components["java"]) + pom { + name = "Apache Airflow Java SDK SLF4J Provider" + description = "SLF4J logging provider for the Apache Airflow Java SDK. " + + "Routes SLF4J log calls from task code through the SDK to Airflow's task log store." + } + } + } +} diff --git a/java-sdk/slf4j/gradle.properties b/java-sdk/slf4j/gradle.properties new file mode 100644 index 0000000000000..15392279b8306 --- /dev/null +++ b/java-sdk/slf4j/gradle.properties @@ -0,0 +1,18 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +slf4jVersion=2.0.17 diff --git a/java-sdk/slf4j/src/main/kotlin/org/apache/airflow/sdk/slf4j/AirflowSlf4jProvider.kt b/java-sdk/slf4j/src/main/kotlin/org/apache/airflow/sdk/slf4j/AirflowSlf4jProvider.kt new file mode 100644 index 0000000000000..f353204a1eef6 --- /dev/null +++ b/java-sdk/slf4j/src/main/kotlin/org/apache/airflow/sdk/slf4j/AirflowSlf4jProvider.kt @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.airflow.sdk.slf4j + +import org.apache.airflow.sdk.execution.Level +import org.apache.airflow.sdk.execution.Log +import org.slf4j.ILoggerFactory +import org.slf4j.IMarkerFactory +import org.slf4j.Logger +import org.slf4j.Marker +import org.slf4j.helpers.AbstractLogger +import org.slf4j.helpers.BasicMarkerFactory +import org.slf4j.helpers.MessageFormatter +import org.slf4j.helpers.NOPMDCAdapter +import org.slf4j.spi.MDCAdapter +import org.slf4j.spi.SLF4JServiceProvider +import java.util.concurrent.ConcurrentHashMap +import org.slf4j.event.Level as SLevel + +private fun SLevel.convert(): Level = + when (this) { + SLevel.TRACE -> Level.NOTSET + SLevel.DEBUG -> Level.DEBUG + SLevel.INFO -> Level.INFO + SLevel.WARN -> Level.WARNING + SLevel.ERROR -> Level.ERROR + } + +internal class AirflowSlf4jLogger( + name: String, +) : AbstractLogger() { + init { + this.name = name + } + + override fun getFullyQualifiedCallerName(): String? = null + + override fun handleNormalizedLoggingCall( + level: SLevel, + marker: Marker?, + messagePattern: String?, + arguments: Array?, + throwable: Throwable?, + ) { + val event = MessageFormatter.basicArrayFormat(messagePattern ?: "", arguments) + Log.send(level.convert(), name, event) { + throwable?.run { put("exception", stackTraceToString()) } + marker?.let { put("marker", it.name) } + } + } + + override fun isTraceEnabled(marker: Marker?) = Log.isEnabledForLevel(Level.NOTSET, name) + + override fun isTraceEnabled() = Log.isEnabledForLevel(Level.NOTSET, name) + + override fun isDebugEnabled(marker: Marker?) = Log.isEnabledForLevel(Level.DEBUG, name) + + override fun isDebugEnabled() = Log.isEnabledForLevel(Level.DEBUG, name) + + override fun isInfoEnabled(marker: Marker?) = Log.isEnabledForLevel(Level.INFO, name) + + override fun isInfoEnabled() = Log.isEnabledForLevel(Level.INFO, name) + + override fun isWarnEnabled(marker: Marker?) = Log.isEnabledForLevel(Level.WARNING, name) + + override fun isWarnEnabled() = Log.isEnabledForLevel(Level.WARNING, name) + + override fun isErrorEnabled(marker: Marker?) = Log.isEnabledForLevel(Level.ERROR, name) + + override fun isErrorEnabled() = Log.isEnabledForLevel(Level.ERROR, name) +} + +internal class AirflowLoggerFactory : ILoggerFactory { + private val loggers = ConcurrentHashMap() + + override fun getLogger(name: String): Logger = loggers.computeIfAbsent(name) { AirflowSlf4jLogger(it) } +} + +class AirflowSlf4jProvider : SLF4JServiceProvider { + private lateinit var factory: AirflowLoggerFactory + + override fun getLoggerFactory(): ILoggerFactory = factory + + override fun getMarkerFactory(): IMarkerFactory = BasicMarkerFactory() + + override fun getMDCAdapter(): MDCAdapter = NOPMDCAdapter() + + override fun getRequestedApiVersion() = REQUESTED_SLF4J_API_VERSION + + override fun initialize() { + factory = AirflowLoggerFactory() + } +} diff --git a/java-sdk/slf4j/src/main/resources/META-INF/services/org.slf4j.spi.SLF4JServiceProvider b/java-sdk/slf4j/src/main/resources/META-INF/services/org.slf4j.spi.SLF4JServiceProvider new file mode 100644 index 0000000000000..1cec01dbd96c1 --- /dev/null +++ b/java-sdk/slf4j/src/main/resources/META-INF/services/org.slf4j.spi.SLF4JServiceProvider @@ -0,0 +1,18 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +org.apache.airflow.sdk.slf4j.AirflowSlf4jProvider diff --git a/java-sdk/slf4j/src/test/kotlin/org/apache/airflow/sdk/slf4j/AirflowSlf4jLoggerTest.kt b/java-sdk/slf4j/src/test/kotlin/org/apache/airflow/sdk/slf4j/AirflowSlf4jLoggerTest.kt new file mode 100644 index 0000000000000..b2f366cf86350 --- /dev/null +++ b/java-sdk/slf4j/src/test/kotlin/org/apache/airflow/sdk/slf4j/AirflowSlf4jLoggerTest.kt @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.airflow.sdk.slf4j + +import org.apache.airflow.sdk.execution.Level +import org.apache.airflow.sdk.execution.LogCapture +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Assertions.assertFalse +import org.junit.jupiter.api.Assertions.assertTrue +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.Test +import org.slf4j.MarkerFactory +import org.slf4j.event.Level as SLevel + +class AirflowSlf4jLoggerTest { + private lateinit var logger: AirflowSlf4jLogger + + @BeforeEach + fun setUp() { + logger = AirflowSlf4jLogger("com.example.Task") + LogCapture.resetThresholds() + LogCapture.drain() // discard any messages buffered before this test + } + + @Test + fun `level conversions`() { + val cases = + listOf( + SLevel.TRACE to Level.NOTSET, + SLevel.DEBUG to Level.DEBUG, + SLevel.INFO to Level.INFO, + SLevel.WARN to Level.WARNING, + SLevel.ERROR to Level.ERROR, + ) + cases.forEach { (slf4jLevel, expected) -> + LogCapture.drain() + when (slf4jLevel) { + SLevel.TRACE -> logger.trace("m") + SLevel.DEBUG -> logger.debug("m") + SLevel.INFO -> logger.info("m") + SLevel.WARN -> logger.warn("m") + SLevel.ERROR -> logger.error("m") + } + val messages = LogCapture.drain().filter { it.logger == "com.example.Task" } + assertEquals(1, messages.size, "Expected exactly one message for SLF4J $slf4jLevel") + assertEquals(expected, messages.single().level, "SLF4J $slf4jLevel should map to SDK $expected") + } + } + + @Test + fun `message and logger name are forwarded`() { + logger.info("hello") + val msg = LogCapture.drain().single { it.logger == "com.example.Task" } + assertEquals(Level.INFO, msg.level) + assertEquals("com.example.Task", msg.logger) + assertEquals("hello", msg.event) + } + + @Test + fun `message parameters are rendered into the message`() { + logger.info("{} {}", "alpha", 42 as Any) + val msg = LogCapture.drain().single { it.logger == "com.example.Task" } + assertEquals("alpha 42", msg.event) + assertFalse(msg.arguments.containsKey("0")) + } + + @Test + fun `throwable is stored under the exception key`() { + val ex = RuntimeException("boom") + logger.error("oops", ex) + val msg = LogCapture.drain().single { it.logger == "com.example.Task" } + assertTrue(msg.arguments["exception"].toString().contains("boom")) + } + + @Test + fun `marker name is stored under the marker key`() { + logger.info(MarkerFactory.getMarker("AUDIT"), "hello") + val msg = LogCapture.drain().single { it.logger == "com.example.Task" } + assertEquals("AUDIT", msg.arguments["marker"]) + } + + @Test + fun `no marker key is added when none is supplied`() { + logger.info("hello") + val msg = LogCapture.drain().single { it.logger == "com.example.Task" } + assertFalse(msg.arguments.containsKey("marker")) + } + + @Test + fun `named DEBUG override enables debug while the global level stays INFO`() { + LogCapture.configureThresholds(Level.INFO, mapOf("com.example.Task" to Level.DEBUG)) + + // AbstractLogger gates debug() on isDebugEnabled(); the per-logger override must let it through. + assertTrue(logger.isDebugEnabled) + logger.debug("hello") + + val messages = LogCapture.drain().filter { it.logger == "com.example.Task" } + assertEquals(1, messages.size, "named DEBUG override should let debug through") + assertEquals(Level.DEBUG, messages.single().level) + + // A logger without an override still follows the global INFO threshold. + assertFalse(AirflowSlf4jLogger("com.other.Task").isDebugEnabled) + } +}