diff --git a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/firehose/PutKinesisFirehose.java b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/firehose/PutKinesisFirehose.java index 25d6e027c49c..452cf715f54c 100644 --- a/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/firehose/PutKinesisFirehose.java +++ b/nifi-extension-bundles/nifi-aws-bundle/nifi-aws-processors/src/main/java/org/apache/nifi/processors/aws/kinesis/firehose/PutKinesisFirehose.java @@ -134,8 +134,13 @@ public void onTrigger(final ProcessContext context, final ProcessSession session for (final FlowFile flowFile : flowFiles) { final String firehoseStreamName = context.getProperty(KINESIS_FIREHOSE_DELIVERY_STREAM_NAME).evaluateAttributeExpressions(flowFile).getValue(); - recordHash.computeIfAbsent(firehoseStreamName, k -> new ArrayList<>()); - session.read(flowFile, in -> recordHash.get(firehoseStreamName).add(Record.builder().data(SdkBytes.fromInputStream(in)).build())); + // Use a single computeIfAbsent().add() call so the list lookup is atomic. + // The previous two-step pattern (computeIfAbsent then a separate get()) could + // throw NullPointerException when firehoseStreamName evaluates to null via + // expression language, because get(null) returned null after the absent-key + // entry was never actually inserted. (NIFI-14472) + session.read(flowFile, in -> recordHash.computeIfAbsent(firehoseStreamName, k -> new ArrayList<>()) + .add(Record.builder().data(SdkBytes.fromInputStream(in)).build())); final List flowFilesForStream = hashFlowFiles.computeIfAbsent(firehoseStreamName, k -> new ArrayList<>()); flowFilesForStream.add(flowFile); diff --git a/nifi-framework-bundle/nifi-framework-extensions/nifi-framework-kubernetes-bundle/nifi-framework-kubernetes-state-provider/src/test/java/org/apache/nifi/kubernetes/state/provider/KubernetesConfigMapStateProviderTest.java b/nifi-framework-bundle/nifi-framework-extensions/nifi-framework-kubernetes-bundle/nifi-framework-kubernetes-state-provider/src/test/java/org/apache/nifi/kubernetes/state/provider/KubernetesConfigMapStateProviderTest.java index 7ef21176820f..ba2d2d39b5bb 100644 --- a/nifi-framework-bundle/nifi-framework-extensions/nifi-framework-kubernetes-bundle/nifi-framework-kubernetes-state-provider/src/test/java/org/apache/nifi/kubernetes/state/provider/KubernetesConfigMapStateProviderTest.java +++ b/nifi-framework-bundle/nifi-framework-extensions/nifi-framework-kubernetes-bundle/nifi-framework-kubernetes-state-provider/src/test/java/org/apache/nifi/kubernetes/state/provider/KubernetesConfigMapStateProviderTest.java @@ -39,6 +39,7 @@ import org.apache.nifi.parameter.ParameterLookup; import org.apache.nifi.util.MockProcessContext; import org.apache.nifi.util.MockValidationContext; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -107,6 +108,11 @@ void setProvider() { provider = new MockKubernetesConfigMapStateProvider(); } + @AfterEach + void shutdownProvider() { + provider.shutdown(); + } + @Test void testGetSupportedScopes() { final Scope[] supportedScopes = provider.getSupportedScopes();