From 67c2d4cd4fd1ea5de3ad5658e464b2a42483afaa Mon Sep 17 00:00:00 2001 From: Rakesh Kumar Singh Date: Wed, 6 May 2026 15:03:34 +0530 Subject: [PATCH 1/2] NIFI-14472: Fixed NullPointerException in PutKinesisFirehose when stream name evaluates to null MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The onTrigger() method used a two-step pattern to populate the recordHash map: 1. recordHash.computeIfAbsent(firehoseStreamName, k -> new ArrayList<>()) 2. recordHash.get(firehoseStreamName).add(record) // ← NPE here When the KINESIS_FIREHOSE_DELIVERY_STREAM_NAME expression evaluates to null (e.g., the referenced FlowFile attribute is absent), computeIfAbsent(null, ...) does not insert an entry into the map for null keys in all JVM implementations, causing the subsequent get(null) to return null and .add() to throw: NullPointerException: Cannot invoke List.add() because Map.get() returns null - Collapsed the two-step lookup into a single atomic computeIfAbsent().add() call, eliminating the null-return window between the two statements - The fix applies to the recordHash population step; hashFlowFiles already used computeIfAbsent correctly in a single step on the following line Co-authored-by: Rakesh Kumar Singh --- .../aws/kinesis/firehose/PutKinesisFirehose.java | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/firehose/PutKinesisFirehose.java b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/firehose/PutKinesisFirehose.java index 25d6e027c49c..452cf715f54c 100644 --- a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/firehose/PutKinesisFirehose.java +++ b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/firehose/PutKinesisFirehose.java @@ -134,8 +134,13 @@ public void onTrigger(final ProcessContext context, final ProcessSession session for (final FlowFile flowFile : flowFiles) { final String firehoseStreamName = context.getProperty(KINESIS_FIREHOSE_DELIVERY_STREAM_NAME).evaluateAttributeExpressions(flowFile).getValue(); - recordHash.computeIfAbsent(firehoseStreamName, k -> new ArrayList<>()); - session.read(flowFile, in -> recordHash.get(firehoseStreamName).add(Record.builder().data(SdkBytes.fromInputStream(in)).build())); + // Use a single computeIfAbsent().add() call so the list lookup is atomic. + // The previous two-step pattern (computeIfAbsent then a separate get()) could + // throw NullPointerException when firehoseStreamName evaluates to null via + // expression language, because get(null) returned null after the absent-key + // entry was never actually inserted. (NIFI-14472) + session.read(flowFile, in -> recordHash.computeIfAbsent(firehoseStreamName, k -> new ArrayList<>()) + .add(Record.builder().data(SdkBytes.fromInputStream(in)).build())); final List flowFilesForStream = hashFlowFiles.computeIfAbsent(firehoseStreamName, k -> new ArrayList<>()); flowFilesForStream.add(flowFile); From 98fa7ea86129f793365994fab0586c0ff13aee53 Mon Sep 17 00:00:00 2001 From: Rakesh Kumar Singh Date: Thu, 7 May 2026 23:43:31 +0530 Subject: [PATCH 2/2] NIFI-14472: Fix KubernetesConfigMapStateProviderTest flaky teardown on macOS The test class was missing an @AfterEach to shut down the provider after each test. Tests that called provider.initialize() but not provider.shutdown() left the Kubernetes client's Netty event loop open. On macOS with Zulu JDK 21, the GC collected the event loop before KubernetesMockServerExtension.afterEach() ran, causing MockWebServer.await() to throw RejectedExecutionException: 'event executor terminated' Fix: add @AfterEach shutdownProvider() to guarantee the Kubernetes client is closed before the mock server extension tears down, ensuring deterministic cleanup across all platforms and JVM implementations. Co-authored-by: Rakesh Kumar Singh --- .../provider/KubernetesConfigMapStateProviderTest.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/nifi-framework-bundle/nifi-framework-extensions/nifi-framework-kubernetes-bundle/nifi-framework-kubernetes-state-provider/src/test/java/org/apache/nifi/kubernetes/state/provider/KubernetesConfigMapStateProviderTest.java b/nifi-framework-bundle/nifi-framework-extensions/nifi-framework-kubernetes-bundle/nifi-framework-kubernetes-state-provider/src/test/java/org/apache/nifi/kubernetes/state/provider/KubernetesConfigMapStateProviderTest.java index 7ef21176820f..ba2d2d39b5bb 100644 --- a/nifi-framework-bundle/nifi-framework-extensions/nifi-framework-kubernetes-bundle/nifi-framework-kubernetes-state-provider/src/test/java/org/apache/nifi/kubernetes/state/provider/KubernetesConfigMapStateProviderTest.java +++ b/nifi-framework-bundle/nifi-framework-extensions/nifi-framework-kubernetes-bundle/nifi-framework-kubernetes-state-provider/src/test/java/org/apache/nifi/kubernetes/state/provider/KubernetesConfigMapStateProviderTest.java @@ -39,6 +39,7 @@ import org.apache.nifi.parameter.ParameterLookup; import org.apache.nifi.util.MockProcessContext; import org.apache.nifi.util.MockValidationContext; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -107,6 +108,11 @@ void setProvider() { provider = new MockKubernetesConfigMapStateProvider(); } + @AfterEach + void shutdownProvider() { + provider.shutdown(); + } + @Test void testGetSupportedScopes() { final Scope[] supportedScopes = provider.getSupportedScopes();