From b49cc66ad8fde7cfd4c4a2c50c2535b512d21a21 Mon Sep 17 00:00:00 2001
From: Kevin Doran
Date: Wed, 27 May 2026 14:31:35 -0400
Subject: [PATCH 1/3] NIFI-15979 Wire Connector MDC and reporting-task
visibility
- StandardConnectorNode implements GroupedComponent and assembles
framework MDC keys (connectorId/Name/Component/BundleGroup/Artifact/Version)
plus connector-supplied custom attributes
- StandardProcessGroup merges connector attributes into getLoggingAttributes,
cascades to descendants, and inherits from the new parent on setParent.
- ConnectorInitializationContext.setLoggingAttributes is forwarded into the
connector node via a new builder consumer; ExtensionBuilder late-binds
StandardLoggingContext to the connector.
- AbstractEventAccess snapshots loggingAttributes onto ProcessGroupStatus;
StandardEventAccess implements EventAccess.getConnectorStatuses via
ConnectorRepository so reporting tasks see connector-managed flows.
Includes unit tests and Administration Guide updates.
---
.../main/asciidoc/administration-guide.adoc | 15 +-
.../nifi/groups/StandardProcessGroup.java | 50 ++++++
.../nifi/reporting/AbstractEventAccess.java | 5 +
.../nifi/groups/StandardProcessGroupTest.java | 107 +++++++++++++
...ConnectorInitializationContextBuilder.java | 12 ++
...tandardConnectorInitializationContext.java | 18 +++
.../connector/StandardConnectorNode.java | 149 +++++++++++++++++-
.../nifi/controller/ExtensionBuilder.java | 23 ++-
.../nifi/controller/FlowController.java | 2 +-
.../nifi/reporting/StandardEventAccess.java | 46 +++++-
...tandardConnectorInitializationContext.java | 38 +++++
.../connector/TestStandardConnectorNode.java | 82 ++++++++++
.../reporting/AbstractEventAccessTest.java | 21 +++
13 files changed, 557 insertions(+), 11 deletions(-)
diff --git a/nifi-docs/src/main/asciidoc/administration-guide.adoc b/nifi-docs/src/main/asciidoc/administration-guide.adoc
index d5e4234bc39f..a865f5219d5c 100644
--- a/nifi-docs/src/main/asciidoc/administration-guide.adoc
+++ b/nifi-docs/src/main/asciidoc/administration-guide.adoc
@@ -211,11 +211,24 @@ Component logs provide the following MDC named values:
- `processGroupName` contains the name of the Process Group
- `processGroupNamePath` contains of the hierarchy of names for Process Groups with separators
+Components that run inside a Connector-managed flow also carry framework-supplied MDC values that identify the owning
+Connector:
+
+- `connectorId` contains the UUID of the Connector
+- `connectorName` contains the user-visible name of the Connector
+- `connectorComponent` contains the fully qualified class name of the Connector implementation
+- `connectorBundleGroup`, `connectorBundleArtifact`, `connectorBundleVersion` identify the NAR bundle the Connector was
+ loaded from
+
+A Connector may also supply additional, implementation-specific MDC values via
+`ConnectorInitializationContext.setLoggingAttributes(Map)`. Keys reserved by the framework (those listed above) cannot
+be overridden; attempts to do so are dropped and logged as a `WARN`.
+
MDC named values can be added to a Logback pattern layout using the `mdc` conversion word.
[source, xml]
----
-%date %level [%thread] %mdc{processGroupId} %logger{40} %msg%n
+%date %level [%thread] %mdc{connectorId} %mdc{processGroupId} %logger{40} %msg%n
----
Logs from classes other than extension components do not have MDC named values. Logs formatted using the pattern layout
diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
index 3abd69197bf5..568cfa884772 100644
--- a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
+++ b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
@@ -229,6 +229,7 @@ public final class StandardProcessGroup implements ProcessGroup {
private static final String UNREGISTERED_PATH_SEGMENT = "UNREGISTERED";
private final Map loggingAttributes = new ConcurrentHashMap<>();
+ private volatile Map connectorLoggingAttributes = Map.of();
private volatile String logFileSuffix;
public StandardProcessGroup(final String id, final ControllerServiceProvider serviceProvider, final ProcessScheduler scheduler,
@@ -303,6 +304,14 @@ public ProcessGroup getParent() {
@Override
public void setParent(final ProcessGroup newParent) {
parent.set(newParent);
+ // Inherit connector-supplied MDC attributes from the new parent. Descendants of a connector's
+ // managed flow root must carry the same connectorId/connectorName/etc. so that logs and OTel
+ // metrics emitted by processors inside the connector flow can be attributed to the connector.
+ // This runs each time the PG is re-parented (including initial attach), so new PGs added to
+ // an existing connector flow inherit automatically.
+ if (newParent instanceof StandardProcessGroup standardParent) {
+ this.connectorLoggingAttributes = standardParent.connectorLoggingAttributes;
+ }
setLoggingAttributes();
}
@@ -4705,6 +4714,47 @@ private void setLoggingAttributes() {
final String registeredFlowVersion = currentVersionControl.getVersion();
loggingAttributes.put(LoggingAttribute.REGISTERED_FLOW_VERSION.attribute, registeredFlowVersion);
}
+
+ // Merge connector-supplied attributes last so they are exposed to MDC/OTel snapshots taken
+ // from this PG. PG-level keys are added first to avoid being shadowed in case a connector
+ // ever tried to override them; the reserved-key filter on the connector side also enforces
+ // this separation defensively.
+ loggingAttributes.putAll(connectorLoggingAttributes);
+ }
+
+ /**
+ * Stores the connector-managed MDC attributes for this process group and cascades the same
+ * attributes to all descendant process groups so that components anywhere in the connector's
+ * managed flow log with consistent connectorId/connectorName/etc. context.
+ *
+ * This method is called by {@code StandardConnectorNode} against its managed root process
+ * group whenever the connector's framework keys (e.g. {@code connectorName}) change or when the
+ * connector provides updated custom logging attributes. Newly created descendant groups will
+ * also inherit the attributes lazily via {@link #setParent(ProcessGroup)}.
+ *
+ * @param attributes the merged set of connector logging attributes; an empty or {@code null}
+ * map clears any previously assigned attributes
+ */
+ public void setConnectorLoggingAttributes(final Map attributes) {
+ final Map snapshot = (attributes == null || attributes.isEmpty())
+ ? Map.of()
+ : Map.copyOf(attributes);
+ this.connectorLoggingAttributes = snapshot;
+ setLoggingAttributes();
+
+ for (final ProcessGroup child : getProcessGroups()) {
+ if (child instanceof StandardProcessGroup standardChild) {
+ standardChild.setConnectorLoggingAttributes(snapshot);
+ }
+ }
+ }
+
+ /**
+ * Returns the connector-supplied MDC attributes assigned to this process group, if any. Returns
+ * an empty map when this PG is not inside a connector-managed flow.
+ */
+ public Map getConnectorLoggingAttributes() {
+ return connectorLoggingAttributes;
}
private void setGroupPath() {
diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/reporting/AbstractEventAccess.java b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/reporting/AbstractEventAccess.java
index 4c1a10686375..de962cefa77e 100644
--- a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/reporting/AbstractEventAccess.java
+++ b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/reporting/AbstractEventAccess.java
@@ -537,6 +537,11 @@ ProcessGroupStatus getGroupStatus(final ProcessGroup group, final RepositoryStat
status.setBytesTransferred(bytesTransferred);
status.setProcessingNanos(processingNanos);
status.setProcessingPerformanceStatus(performanceStatus);
+ // Snapshot the PG's MDC logging attributes onto the status DTO so reporting tasks (e.g. the
+ // OpenTelemetry reporting task) can attribute metrics emitted for this PG to its owning
+ // connector. For non-connector PGs this is just the existing processGroupId/Name/path keys;
+ // for PGs inside a connector-managed flow it also carries connectorId/connectorName/etc.
+ status.setLoggingAttributes(group.getLoggingAttributes());
final VersionControlInformation vci = group.getVersionControlInformation();
if (vci != null) {
diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/test/java/org/apache/nifi/groups/StandardProcessGroupTest.java b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/test/java/org/apache/nifi/groups/StandardProcessGroupTest.java
index 35051a30f9fc..3acbfc58dfe6 100644
--- a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/test/java/org/apache/nifi/groups/StandardProcessGroupTest.java
+++ b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/test/java/org/apache/nifi/groups/StandardProcessGroupTest.java
@@ -44,6 +44,7 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.when;
@@ -238,4 +239,110 @@ void testGetLoggingAttributesWithVersionControlInformation() {
assertEquals(expected, loggingAttributes);
}
+
+ @Test
+ void testSetConnectorLoggingAttributesMergesIntoLoggingAttributes() {
+ processGroup.setName(NAME);
+
+ final Map connectorAttributes = Map.of(
+ "connectorId", "connector-1",
+ "connectorName", "My Connector",
+ "connectorComponent", "com.example.MyConnector"
+ );
+
+ processGroup.setConnectorLoggingAttributes(connectorAttributes);
+
+ final Map loggingAttributes = processGroup.getLoggingAttributes();
+ assertEquals("connector-1", loggingAttributes.get("connectorId"));
+ assertEquals("My Connector", loggingAttributes.get("connectorName"));
+ assertEquals("com.example.MyConnector", loggingAttributes.get("connectorComponent"));
+ assertEquals(NAME, loggingAttributes.get(StandardProcessGroup.LoggingAttribute.PROCESS_GROUP_NAME.getAttribute()));
+ assertEquals(ID, loggingAttributes.get(StandardProcessGroup.LoggingAttribute.PROCESS_GROUP_ID.getAttribute()));
+ }
+
+ @Test
+ void testSetConnectorLoggingAttributesCascadesToChildProcessGroups() {
+ processGroup.setName(NAME);
+
+ final StandardProcessGroup child = createStandardProcessGroup("child-id");
+ child.setName("Child");
+ processGroup.addProcessGroup(child);
+
+ final Map connectorAttributes = Map.of(
+ "connectorId", "connector-1",
+ "connectorName", "Postgres CDC"
+ );
+
+ processGroup.setConnectorLoggingAttributes(connectorAttributes);
+
+ assertEquals("connector-1", child.getLoggingAttributes().get("connectorId"));
+ assertEquals("Postgres CDC", child.getLoggingAttributes().get("connectorName"));
+ }
+
+ @Test
+ void testAddProcessGroupInheritsConnectorLoggingAttributesFromParent() {
+ processGroup.setName(NAME);
+ processGroup.setConnectorLoggingAttributes(Map.of(
+ "connectorId", "connector-1",
+ "connectorName", "Postgres CDC"
+ ));
+
+ final StandardProcessGroup lateChild = createStandardProcessGroup("late-child-id");
+ lateChild.setName("Late Child");
+ processGroup.addProcessGroup(lateChild);
+
+ assertEquals("connector-1", lateChild.getLoggingAttributes().get("connectorId"));
+ assertEquals("Postgres CDC", lateChild.getLoggingAttributes().get("connectorName"));
+ assertEquals(Map.copyOf(processGroup.getConnectorLoggingAttributes()), lateChild.getConnectorLoggingAttributes());
+ }
+
+ @Test
+ void testEmptyConnectorLoggingAttributesAddsNothing() {
+ processGroup.setName(NAME);
+ processGroup.setConnectorLoggingAttributes(Map.of());
+
+ final Map loggingAttributes = processGroup.getLoggingAttributes();
+ assertFalse(loggingAttributes.containsKey("connectorId"));
+ assertTrue(processGroup.getConnectorLoggingAttributes().isEmpty());
+ // Verify the PG-level keys are still present.
+ assertEquals(NAME, loggingAttributes.get(StandardProcessGroup.LoggingAttribute.PROCESS_GROUP_NAME.getAttribute()));
+ }
+
+ @Test
+ void testSetConnectorLoggingAttributesReplacesPreviousValues() {
+ processGroup.setName(NAME);
+ processGroup.setConnectorLoggingAttributes(Map.of(
+ "connectorId", "connector-1",
+ "connectorName", "Old Name",
+ "customKey", "customValue"
+ ));
+
+ processGroup.setConnectorLoggingAttributes(Map.of(
+ "connectorId", "connector-1",
+ "connectorName", "New Name"
+ ));
+
+ final Map loggingAttributes = processGroup.getLoggingAttributes();
+ assertEquals("New Name", loggingAttributes.get("connectorName"));
+ assertFalse(loggingAttributes.containsKey("customKey"));
+ }
+
+ private StandardProcessGroup createStandardProcessGroup(final String id) {
+ return new StandardProcessGroup(
+ id,
+ controllerServiceProvider,
+ processScheduler,
+ propertyEncryptor,
+ extensionManager,
+ stateManagerProvider,
+ flowManager,
+ reloadComponent,
+ nodeTypeProvider,
+ properties,
+ statelessGroupNodeFactory,
+ assetManager,
+ null
+ );
+ }
+
}
diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/FrameworkConnectorInitializationContextBuilder.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/FrameworkConnectorInitializationContextBuilder.java
index 9ef92941a8f6..76857093426f 100644
--- a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/FrameworkConnectorInitializationContextBuilder.java
+++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/FrameworkConnectorInitializationContextBuilder.java
@@ -21,6 +21,9 @@
import org.apache.nifi.components.connector.secrets.SecretsManager;
import org.apache.nifi.logging.ComponentLog;
+import java.util.Map;
+import java.util.function.Consumer;
+
public interface FrameworkConnectorInitializationContextBuilder {
FrameworkConnectorInitializationContextBuilder identifier(String identifier);
@@ -35,5 +38,14 @@ public interface FrameworkConnectorInitializationContextBuilder {
FrameworkConnectorInitializationContextBuilder componentBundleLookup(ComponentBundleLookup bundleLookup);
+ /**
+ * Registers a callback invoked when the connector calls
+ * {@link ConnectorInitializationContext#setLoggingAttributes(Map)}. The framework uses this to
+ * forward custom MDC logging attributes into the owning {@code StandardConnectorNode} so they
+ * are merged with the framework-managed connector keys and propagated to the connector's
+ * managed flow.
+ */
+ FrameworkConnectorInitializationContextBuilder loggingAttributesConsumer(Consumer
+ *
+ * @param connectorId the identifier of the connector whose logging attributes are being requested
+ * @return an immutable map of attribute keys to values; never {@code null}
+ */
+ Map getLoggingAttributesForConnector(String connectorId);
}
diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/FrameworkConnectorInitializationContextBuilder.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/FrameworkConnectorInitializationContextBuilder.java
index 76857093426f..9ef92941a8f6 100644
--- a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/FrameworkConnectorInitializationContextBuilder.java
+++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/FrameworkConnectorInitializationContextBuilder.java
@@ -21,9 +21,6 @@
import org.apache.nifi.components.connector.secrets.SecretsManager;
import org.apache.nifi.logging.ComponentLog;
-import java.util.Map;
-import java.util.function.Consumer;
-
public interface FrameworkConnectorInitializationContextBuilder {
FrameworkConnectorInitializationContextBuilder identifier(String identifier);
@@ -38,14 +35,5 @@ public interface FrameworkConnectorInitializationContextBuilder {
FrameworkConnectorInitializationContextBuilder componentBundleLookup(ComponentBundleLookup bundleLookup);
- /**
- * Registers a callback invoked when the connector calls
- * {@link ConnectorInitializationContext#setLoggingAttributes(Map)}. The framework uses this to
- * forward custom MDC logging attributes into the owning {@code StandardConnectorNode} so they
- * are merged with the framework-managed connector keys and propagated to the connector's
- * managed flow.
- */
- FrameworkConnectorInitializationContextBuilder loggingAttributesConsumer(Consumer> loggingAttributesConsumer);
-
FrameworkConnectorInitializationContext build();
}
diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorInitializationContext.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorInitializationContext.java
index 2081988260b7..002f2a8aa2f8 100644
--- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorInitializationContext.java
+++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorInitializationContext.java
@@ -28,7 +28,6 @@
import org.apache.nifi.logging.ComponentLog;
import java.util.List;
-import java.util.Map;
import java.util.function.Consumer;
public class StandardConnectorInitializationContext implements FrameworkConnectorInitializationContext {
@@ -38,7 +37,6 @@ public class StandardConnectorInitializationContext implements FrameworkConnecto
private final SecretsManager secretsManager;
private final AssetManager assetManager;
private final ComponentBundleLookup componentBundleLookup;
- private final Consumer> loggingAttributesConsumer;
protected StandardConnectorInitializationContext(final Builder builder) {
@@ -48,7 +46,6 @@ protected StandardConnectorInitializationContext(final Builder builder) {
this.secretsManager = builder.secretsManager;
this.assetManager = builder.assetManager;
this.componentBundleLookup = builder.componentBundleLookup;
- this.loggingAttributesConsumer = builder.loggingAttributesConsumer;
}
@Override
@@ -81,14 +78,6 @@ public AssetManager getAssetManager() {
return assetManager;
}
- @Override
- public void setLoggingAttributes(final Map attributes) {
- if (loggingAttributesConsumer == null) {
- throw new UnsupportedOperationException("setLoggingAttributes is not supported by this initialization context");
- }
- loggingAttributesConsumer.accept(attributes);
- }
-
@Override
public void updateFlow(final FlowContext flowContext, final VersionedExternalFlow versionedExternalFlow,
final BundleCompatibility bundleCompatability) throws FlowUpdateException {
@@ -171,7 +160,6 @@ public static class Builder implements FrameworkConnectorInitializationContextBu
private SecretsManager secretsManager;
private AssetManager assetManager;
private ComponentBundleLookup componentBundleLookup;
- private Consumer> loggingAttributesConsumer;
@Override
public Builder identifier(final String identifier) {
@@ -209,12 +197,6 @@ public Builder componentBundleLookup(final ComponentBundleLookup bundleLookup) {
return this;
}
- @Override
- public Builder loggingAttributesConsumer(final Consumer> loggingAttributesConsumer) {
- this.loggingAttributesConsumer = loggingAttributesConsumer;
- return this;
- }
-
@Override
public StandardConnectorInitializationContext build() {
return new StandardConnectorInitializationContext(this);
diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorRepository.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorRepository.java
index bdb527fecc81..4910ed5c528e 100644
--- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorRepository.java
+++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorRepository.java
@@ -722,6 +722,21 @@ public SecretsManager getSecretsManager() {
return secretsManager;
}
+ @Override
+ public Map getLoggingAttributesForConnector(final String connectorId) {
+ if (configurationProvider == null) {
+ return Map.of();
+ }
+ try {
+ final Map attributes = configurationProvider.getLoggingAttributes(connectorId);
+ return attributes == null ? Map.of() : attributes;
+ } catch (final Exception e) {
+ logger.warn("ConnectorConfigurationProvider [{}] threw while computing logging attributes for connector [{}]; using empty map: {}",
+ configurationProvider.getClass().getSimpleName(), connectorId, e.getMessage(), e);
+ return Map.of();
+ }
+ }
+
@Override
public ConnectorStateTransition createStateTransition(final String type, final String id) {
final String componentDescription = "StandardConnectorNode[id=" + id + ", type=" + type + "]";
diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/ExtensionBuilder.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/ExtensionBuilder.java
index 9091a433189e..f73f5249adf1 100644
--- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/ExtensionBuilder.java
+++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/ExtensionBuilder.java
@@ -540,10 +540,15 @@ public Resource getResource() {
false
);
// Late-bind the logging context to the connector node so that MDC attributes assembled by
- // the node (framework keys + connector-supplied custom keys) flow through the connector's
+ // the node (framework keys + provider-supplied custom keys) flow through the connector's
// own ComponentLog as well as the managed flow's nested components.
loggingContext.setComponent(connectorNode);
+ // Pull provider-sourced logging attributes (e.g. connectorDefinitionId) from the ConnectorRepository
+ // and merge them into the node before any logs are emitted from the connector or its managed flow.
+ final Map providerAttributes = flowController.getConnectorRepository().getLoggingAttributesForConnector(identifier);
+ connectorNode.setCustomLoggingAttributes(providerAttributes);
+
try {
initializeDefaultValues(connector, connectorNode.getActiveFlowContext());
@@ -593,6 +598,11 @@ private ConnectorNode createGhostConnectorNode(final Authorizable connectorsAuth
);
loggingContext.setComponent(connectorNode);
+ // Pull provider-sourced logging attributes for the ghost connector as well so that MDC and
+ // status snapshots are consistent regardless of whether the real Connector implementation loaded.
+ final Map providerAttributes = flowController.getConnectorRepository().getLoggingAttributesForConnector(identifier);
+ connectorNode.setCustomLoggingAttributes(providerAttributes);
+
// Initialize the ghost connector so that it can be properly configured during flow synchronization
final FrameworkConnectorInitializationContext initContext = createConnectorInitializationContext(managedProcessGroup, componentLog, connectorNode);
connectorNode.initializeConnector(initContext);
@@ -631,7 +641,6 @@ private FrameworkConnectorInitializationContext createConnectorInitializationCon
.secretsManager(flowController.getConnectorRepository().getSecretsManager())
.assetManager(flowController.getConnectorAssetManager())
.componentBundleLookup(componentBundleLookup)
- .loggingAttributesConsumer(connectorNode::setCustomLoggingAttributes)
.build();
}
diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/reporting/StandardEventAccess.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/reporting/StandardEventAccess.java
index c8355ae97427..ebc9b7304e06 100644
--- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/reporting/StandardEventAccess.java
+++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/reporting/StandardEventAccess.java
@@ -34,6 +34,7 @@
import org.apache.nifi.controller.repository.FlowFileRepository;
import org.apache.nifi.controller.repository.RepositoryStatusReport;
import org.apache.nifi.controller.repository.metrics.EmptyFlowFileEvent;
+import org.apache.nifi.controller.status.ConnectorStatus;
import org.apache.nifi.controller.status.ProcessGroupStatus;
import org.apache.nifi.controller.status.ProcessorStatus;
import org.apache.nifi.controller.status.analytics.StatusAnalyticsEngine;
@@ -77,13 +78,13 @@ public StandardEventAccess(final FlowManager flowManager, final FlowFileEventRep
}
/**
- * Returns a {@link ProcessGroupStatus} for each connector's managed root process group so that
- * reporting tasks can observe metrics for flows running inside connectors. Connector-managed
- * flows live outside the controller's root process group, so they are invisible to
- * {@link #getControllerStatus()} and would otherwise be skipped by standard reporting tasks.
+ * Returns a {@link ConnectorStatus} for each registered connector, each carrying the connector identity along with
+ * the {@link ProcessGroupStatus} of the connector's managed root process group. Connector-managed flows live
+ * outside the controller's root process group, so they are invisible to {@link #getControllerStatus()} and would
+ * otherwise be skipped by standard reporting tasks.
*/
@Override
- public Collection getConnectorStatuses() {
+ public Collection getConnectorStatuses() {
if (connectorRepository == null) {
return Collections.emptyList();
}
@@ -94,7 +95,7 @@ public Collection getConnectorStatuses() {
}
final RepositoryStatusReport statusReport = generateRepositoryStatusReport();
- final List statuses = new ArrayList<>(connectors.size());
+ final List statuses = new ArrayList<>(connectors.size());
for (final ConnectorNode connector : connectors) {
final FrameworkFlowContext context = connector.getActiveFlowContext();
if (context == null) {
@@ -104,11 +105,16 @@ public Collection getConnectorStatuses() {
if (managedGroup == null) {
continue;
}
- final ProcessGroupStatus status = getGroupStatus(managedGroup, statusReport,
+ final ProcessGroupStatus rootGroupStatus = getGroupStatus(managedGroup, statusReport,
authorizable -> true, Integer.MAX_VALUE, 1, true);
- if (status != null) {
- statuses.add(status);
+ if (rootGroupStatus == null) {
+ continue;
}
+ final ConnectorStatus connectorStatus = new ConnectorStatus();
+ connectorStatus.setId(connector.getIdentifier());
+ connectorStatus.setName(connector.getName());
+ connectorStatus.setRootGroupStatus(rootGroupStatus);
+ statuses.add(connectorStatus);
}
return statuses;
}
diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/TestStandardConnectorInitializationContext.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/TestStandardConnectorInitializationContext.java
index c10fa522b9ac..565ddb05f9c7 100644
--- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/TestStandardConnectorInitializationContext.java
+++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/TestStandardConnectorInitializationContext.java
@@ -39,12 +39,8 @@
import java.util.Optional;
import java.util.Set;
-import java.util.concurrent.atomic.AtomicReference;
-
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -388,38 +384,4 @@ private StandardConnectorInitializationContext createContext(final ComponentBund
.componentBundleLookup(bundleLookup)
.build();
}
-
- @Test
- public void testSetLoggingAttributesForwardsToConsumer() {
- final AtomicReference> received = new AtomicReference<>();
- final StandardConnectorInitializationContext context = new StandardConnectorInitializationContext.Builder()
- .identifier("test-connector")
- .name("Test Connector")
- .componentLog(mock(ComponentLog.class))
- .secretsManager(mock(SecretsManager.class))
- .assetManager(mock(AssetManager.class))
- .componentBundleLookup(mock(ComponentBundleLookup.class))
- .loggingAttributesConsumer(received::set)
- .build();
-
- final Map attributes = Map.of("region", "us-east-1", "tenant", "acme");
- context.setLoggingAttributes(attributes);
-
- assertNotNull(received.get());
- assertEquals(attributes, received.get());
- }
-
- @Test
- public void testSetLoggingAttributesThrowsWhenNoConsumerRegistered() {
- final StandardConnectorInitializationContext context = new StandardConnectorInitializationContext.Builder()
- .identifier("test-connector")
- .name("Test Connector")
- .componentLog(mock(ComponentLog.class))
- .secretsManager(mock(SecretsManager.class))
- .assetManager(mock(AssetManager.class))
- .componentBundleLookup(mock(ComponentBundleLookup.class))
- .build();
-
- assertThrows(UnsupportedOperationException.class, () -> context.setLoggingAttributes(Map.of("k", "v")));
- }
}
diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/TestStandardConnectorRepository.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/TestStandardConnectorRepository.java
index cba4b12f5741..a8e661a78dfb 100644
--- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/TestStandardConnectorRepository.java
+++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/TestStandardConnectorRepository.java
@@ -1364,6 +1364,58 @@ private static ParameterProviderNode mockParameterProvider(final String name, fi
return node;
}
+ // --- getLoggingAttributesForConnector tests ---
+
+ @Test
+ public void testGetLoggingAttributesForConnectorReturnsEmptyMapWhenNoProvider() {
+ final StandardConnectorRepository repository = new StandardConnectorRepository();
+ final ConnectorRepositoryInitializationContext initContext = mock(ConnectorRepositoryInitializationContext.class);
+ when(initContext.getExtensionManager()).thenReturn(mock(ExtensionManager.class));
+ when(initContext.getConnectorConfigurationProvider()).thenReturn(null);
+ repository.initialize(initContext);
+
+ final Map attributes = repository.getLoggingAttributesForConnector("connector-1");
+ assertNotNull(attributes);
+ assertTrue(attributes.isEmpty());
+ }
+
+ @Test
+ public void testGetLoggingAttributesForConnectorDelegatesToProvider() {
+ final ConnectorConfigurationProvider provider = mock(ConnectorConfigurationProvider.class);
+ final Map expected = Map.of("connectorDefinitionId", "snowflake.runtime.postgres-cdc");
+ when(provider.getLoggingAttributes("connector-1")).thenReturn(expected);
+
+ final StandardConnectorRepository repository = createRepositoryWithProvider(provider);
+
+ final Map attributes = repository.getLoggingAttributesForConnector("connector-1");
+ assertEquals(expected, attributes);
+ verify(provider).getLoggingAttributes("connector-1");
+ }
+
+ @Test
+ public void testGetLoggingAttributesForConnectorReturnsEmptyMapWhenProviderReturnsNull() {
+ final ConnectorConfigurationProvider provider = mock(ConnectorConfigurationProvider.class);
+ when(provider.getLoggingAttributes(anyString())).thenReturn(null);
+
+ final StandardConnectorRepository repository = createRepositoryWithProvider(provider);
+
+ final Map attributes = repository.getLoggingAttributesForConnector("connector-1");
+ assertNotNull(attributes);
+ assertTrue(attributes.isEmpty());
+ }
+
+ @Test
+ public void testGetLoggingAttributesForConnectorReturnsEmptyMapWhenProviderThrows() {
+ final ConnectorConfigurationProvider provider = mock(ConnectorConfigurationProvider.class);
+ when(provider.getLoggingAttributes(anyString())).thenThrow(new RuntimeException("provider boom"));
+
+ final StandardConnectorRepository repository = createRepositoryWithProvider(provider);
+
+ final Map attributes = repository.getLoggingAttributesForConnector("connector-1");
+ assertNotNull(attributes);
+ assertTrue(attributes.isEmpty());
+ }
+
// --- Helper Methods ---
private StandardConnectorRepository createRepositoryWithProviderAndAssetManager(
From 4c2ff6967244e9fbe78b20c03f07cacabb11b883 Mon Sep 17 00:00:00 2001
From: Kevin Doran
Date: Tue, 2 Jun 2026 12:15:19 -0400
Subject: [PATCH 3/3] NIFI-15979 Align Connector status capture with merged
nifi-api
- Drop the ProcessGroupStatus.loggingAttributes snapshot from AbstractEventAccess
(the field was removed from nifi-api); update the affected test and doc comment.
- Add a StatusHistoryRepository.capture overload carrying Collection,
defaulting to the existing 4-arg capture so current implementations are unaffected.
- Invoke the new overload from FlowController using EventAccess.getConnectorStatuses(),
so Connector-managed flows reach the status history repository.
- Use getConnectors(ConnectorSyncMode.LOCAL_ONLY) in StandardEventAccess.
Co-authored-by: Cursor
---
.../history/StatusHistoryRepository.java | 21 +++++++++++++++++++
.../nifi/reporting/AbstractEventAccess.java | 5 -----
.../connector/ConnectorRepository.java | 3 +--
.../nifi/controller/FlowController.java | 3 ++-
.../nifi/reporting/StandardEventAccess.java | 3 ++-
.../reporting/AbstractEventAccessTest.java | 21 -------------------
6 files changed, 26 insertions(+), 30 deletions(-)
diff --git a/nifi-framework-api/src/main/java/org/apache/nifi/controller/status/history/StatusHistoryRepository.java b/nifi-framework-api/src/main/java/org/apache/nifi/controller/status/history/StatusHistoryRepository.java
index 2b96feb6192e..b02258552811 100644
--- a/nifi-framework-api/src/main/java/org/apache/nifi/controller/status/history/StatusHistoryRepository.java
+++ b/nifi-framework-api/src/main/java/org/apache/nifi/controller/status/history/StatusHistoryRepository.java
@@ -16,9 +16,11 @@
*/
package org.apache.nifi.controller.status.history;
+import org.apache.nifi.controller.status.ConnectorStatus;
import org.apache.nifi.controller.status.NodeStatus;
import org.apache.nifi.controller.status.ProcessGroupStatus;
+import java.util.Collection;
import java.util.Date;
import java.util.List;
@@ -48,6 +50,25 @@ public interface StatusHistoryRepository {
*/
void capture(NodeStatus nodeStatus, ProcessGroupStatus rootGroupStatus, List garbageCollectionStatus, Date timestamp);
+ /**
+ * Captures the status information provided in the given report, additionally providing the status of each
+ * Connector-managed flow. Connector-managed Process Groups are siblings of the root group and are not reachable
+ * from {@code rootGroupStatus}, so this overload allows implementations to observe them (for example, to export
+ * Connector-managed flow metrics). The default implementation ignores {@code connectorStatuses} and delegates to
+ * {@link #capture(NodeStatus, ProcessGroupStatus, List, Date)} so existing implementations continue to work
+ * unchanged.
+ *
+ * @param nodeStatus status of the node
+ * @param rootGroupStatus status of the root group and its content
+ * @param connectorStatuses status of each registered Connector, each carrying its managed root group status
+ * @param garbageCollectionStatus status of garbage collection
+ * @param timestamp timestamp of capture
+ */
+ default void capture(NodeStatus nodeStatus, ProcessGroupStatus rootGroupStatus, Collection connectorStatuses,
+ List garbageCollectionStatus, Date timestamp) {
+ capture(nodeStatus, rootGroupStatus, garbageCollectionStatus, timestamp);
+ }
+
/**
* @param connectionId the ID of the Connection for which the Status is
* desired
diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/reporting/AbstractEventAccess.java b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/reporting/AbstractEventAccess.java
index de962cefa77e..4c1a10686375 100644
--- a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/reporting/AbstractEventAccess.java
+++ b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/reporting/AbstractEventAccess.java
@@ -537,11 +537,6 @@ ProcessGroupStatus getGroupStatus(final ProcessGroup group, final RepositoryStat
status.setBytesTransferred(bytesTransferred);
status.setProcessingNanos(processingNanos);
status.setProcessingPerformanceStatus(performanceStatus);
- // Snapshot the PG's MDC logging attributes onto the status DTO so reporting tasks (e.g. the
- // OpenTelemetry reporting task) can attribute metrics emitted for this PG to its owning
- // connector. For non-connector PGs this is just the existing processGroupId/Name/path keys;
- // for PGs inside a connector-managed flow it also carries connectorId/connectorName/etc.
- status.setLoggingAttributes(group.getLoggingAttributes());
final VersionControlInformation vci = group.getVersionControlInformation();
if (vci != null) {
diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorRepository.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorRepository.java
index 3a163a2da4d2..c25962e4e6c7 100644
--- a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorRepository.java
+++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorRepository.java
@@ -217,8 +217,7 @@ void inheritConfiguration(ConnectorNode connector, ListIf no {@link ConnectorConfigurationProvider} is configured for this repository, this method
* returns {@link Map#of()}. Otherwise, it delegates to
diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index 55fa7c77593f..6a2c827c8229 100644
--- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -814,7 +814,8 @@ private FlowController(
timerDrivenEngineRef.get().scheduleWithFixedDelay(() -> {
try {
- statusHistoryRepository.capture(getNodeStatusSnapshot(), eventAccess.getControllerStatus(), getGarbageCollectionStatus(), new Date());
+ statusHistoryRepository.capture(getNodeStatusSnapshot(), eventAccess.getControllerStatus(), eventAccess.getConnectorStatuses(),
+ getGarbageCollectionStatus(), new Date());
} catch (final Exception e) {
LOG.error("Failed to capture component stats for Stats History", e);
}
diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/reporting/StandardEventAccess.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/reporting/StandardEventAccess.java
index ebc9b7304e06..3e12745328ec 100644
--- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/reporting/StandardEventAccess.java
+++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/reporting/StandardEventAccess.java
@@ -24,6 +24,7 @@
import org.apache.nifi.authorization.user.NiFiUser;
import org.apache.nifi.components.connector.ConnectorNode;
import org.apache.nifi.components.connector.ConnectorRepository;
+import org.apache.nifi.components.connector.ConnectorSyncMode;
import org.apache.nifi.components.connector.FrameworkFlowContext;
import org.apache.nifi.controller.ProcessScheduler;
import org.apache.nifi.controller.ProcessorNode;
@@ -89,7 +90,7 @@ public Collection getConnectorStatuses() {
return Collections.emptyList();
}
- final List connectors = connectorRepository.getConnectors();
+ final List connectors = connectorRepository.getConnectors(ConnectorSyncMode.LOCAL_ONLY);
if (connectors == null || connectors.isEmpty()) {
return Collections.emptyList();
}
diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/reporting/AbstractEventAccessTest.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/reporting/AbstractEventAccessTest.java
index 3f10a068e752..7661d1996c78 100644
--- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/reporting/AbstractEventAccessTest.java
+++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/reporting/AbstractEventAccessTest.java
@@ -109,27 +109,6 @@ void testGetGroupStatusAuthorized() {
assertEquals(1, authorizables.size());
}
- @Test
- void testGetGroupStatusSnapshotsLoggingAttributes() {
- final Map loggingAttributes = Map.of(
- "processGroupId", PROCESS_GROUP_ID,
- "processGroupName", PROCESS_GROUP_NAME,
- "connectorId", "connector-1",
- "connectorName", "Postgres CDC"
- );
-
- when(processGroup.getName()).thenReturn(PROCESS_GROUP_NAME);
- when(processGroup.getIdentifier()).thenReturn(PROCESS_GROUP_ID);
- when(processGroup.getLoggingAttributes()).thenReturn(loggingAttributes);
-
- final ProcessGroupStatus groupStatus = eventAccess.getGroupStatus(
- processGroup, new StandardRepositoryStatusReport(), authorizable -> true,
- SINGLE_DEPTH, SINGLE_DEPTH, INCLUDE_CONNECTION_DETAILS);
-
- assertNotNull(groupStatus);
- assertEquals(loggingAttributes, groupStatus.getLoggingAttributes());
- }
-
@Test
void testGetGroupStatusProcessorAuthorizedNotIncluded() {
final List authorizables = new ArrayList<>();