From b49cc66ad8fde7cfd4c4a2c50c2535b512d21a21 Mon Sep 17 00:00:00 2001 From: Kevin Doran Date: Wed, 27 May 2026 14:31:35 -0400 Subject: [PATCH 1/3] NIFI-15979 Wire Connector MDC and reporting-task visibility - StandardConnectorNode implements GroupedComponent and assembles framework MDC keys (connectorId/Name/Component/BundleGroup/Artifact/Version) plus connector-supplied custom attributes - StandardProcessGroup merges connector attributes into getLoggingAttributes, cascades to descendants, and inherits from the new parent on setParent. - ConnectorInitializationContext.setLoggingAttributes is forwarded into the connector node via a new builder consumer; ExtensionBuilder late-binds StandardLoggingContext to the connector. - AbstractEventAccess snapshots loggingAttributes onto ProcessGroupStatus; StandardEventAccess implements EventAccess.getConnectorStatuses via ConnectorRepository so reporting tasks see connector-managed flows. Includes unit tests and Administration Guide updates. --- .../main/asciidoc/administration-guide.adoc | 15 +- .../nifi/groups/StandardProcessGroup.java | 50 ++++++ .../nifi/reporting/AbstractEventAccess.java | 5 + .../nifi/groups/StandardProcessGroupTest.java | 107 +++++++++++++ ...ConnectorInitializationContextBuilder.java | 12 ++ ...tandardConnectorInitializationContext.java | 18 +++ .../connector/StandardConnectorNode.java | 149 +++++++++++++++++- .../nifi/controller/ExtensionBuilder.java | 23 ++- .../nifi/controller/FlowController.java | 2 +- .../nifi/reporting/StandardEventAccess.java | 46 +++++- ...tandardConnectorInitializationContext.java | 38 +++++ .../connector/TestStandardConnectorNode.java | 82 ++++++++++ .../reporting/AbstractEventAccessTest.java | 21 +++ 13 files changed, 557 insertions(+), 11 deletions(-) diff --git a/nifi-docs/src/main/asciidoc/administration-guide.adoc b/nifi-docs/src/main/asciidoc/administration-guide.adoc index d5e4234bc39f..a865f5219d5c 100644 --- a/nifi-docs/src/main/asciidoc/administration-guide.adoc +++ b/nifi-docs/src/main/asciidoc/administration-guide.adoc @@ -211,11 +211,24 @@ Component logs provide the following MDC named values: - `processGroupName` contains the name of the Process Group - `processGroupNamePath` contains of the hierarchy of names for Process Groups with separators +Components that run inside a Connector-managed flow also carry framework-supplied MDC values that identify the owning +Connector: + +- `connectorId` contains the UUID of the Connector +- `connectorName` contains the user-visible name of the Connector +- `connectorComponent` contains the fully qualified class name of the Connector implementation +- `connectorBundleGroup`, `connectorBundleArtifact`, `connectorBundleVersion` identify the NAR bundle the Connector was + loaded from + +A Connector may also supply additional, implementation-specific MDC values via +`ConnectorInitializationContext.setLoggingAttributes(Map)`. Keys reserved by the framework (those listed above) cannot +be overridden; attempts to do so are dropped and logged as a `WARN`. + MDC named values can be added to a Logback pattern layout using the `mdc` conversion word. [source, xml] ---- -%date %level [%thread] %mdc{processGroupId} %logger{40} %msg%n +%date %level [%thread] %mdc{connectorId} %mdc{processGroupId} %logger{40} %msg%n ---- Logs from classes other than extension components do not have MDC named values. Logs formatted using the pattern layout diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java index 3abd69197bf5..568cfa884772 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java @@ -229,6 +229,7 @@ public final class StandardProcessGroup implements ProcessGroup { private static final String UNREGISTERED_PATH_SEGMENT = "UNREGISTERED"; private final Map loggingAttributes = new ConcurrentHashMap<>(); + private volatile Map connectorLoggingAttributes = Map.of(); private volatile String logFileSuffix; public StandardProcessGroup(final String id, final ControllerServiceProvider serviceProvider, final ProcessScheduler scheduler, @@ -303,6 +304,14 @@ public ProcessGroup getParent() { @Override public void setParent(final ProcessGroup newParent) { parent.set(newParent); + // Inherit connector-supplied MDC attributes from the new parent. Descendants of a connector's + // managed flow root must carry the same connectorId/connectorName/etc. so that logs and OTel + // metrics emitted by processors inside the connector flow can be attributed to the connector. + // This runs each time the PG is re-parented (including initial attach), so new PGs added to + // an existing connector flow inherit automatically. + if (newParent instanceof StandardProcessGroup standardParent) { + this.connectorLoggingAttributes = standardParent.connectorLoggingAttributes; + } setLoggingAttributes(); } @@ -4705,6 +4714,47 @@ private void setLoggingAttributes() { final String registeredFlowVersion = currentVersionControl.getVersion(); loggingAttributes.put(LoggingAttribute.REGISTERED_FLOW_VERSION.attribute, registeredFlowVersion); } + + // Merge connector-supplied attributes last so they are exposed to MDC/OTel snapshots taken + // from this PG. PG-level keys are added first to avoid being shadowed in case a connector + // ever tried to override them; the reserved-key filter on the connector side also enforces + // this separation defensively. + loggingAttributes.putAll(connectorLoggingAttributes); + } + + /** + * Stores the connector-managed MDC attributes for this process group and cascades the same + * attributes to all descendant process groups so that components anywhere in the connector's + * managed flow log with consistent connectorId/connectorName/etc. context. + * + *

This method is called by {@code StandardConnectorNode} against its managed root process + * group whenever the connector's framework keys (e.g. {@code connectorName}) change or when the + * connector provides updated custom logging attributes. Newly created descendant groups will + * also inherit the attributes lazily via {@link #setParent(ProcessGroup)}.

+ * + * @param attributes the merged set of connector logging attributes; an empty or {@code null} + * map clears any previously assigned attributes + */ + public void setConnectorLoggingAttributes(final Map attributes) { + final Map snapshot = (attributes == null || attributes.isEmpty()) + ? Map.of() + : Map.copyOf(attributes); + this.connectorLoggingAttributes = snapshot; + setLoggingAttributes(); + + for (final ProcessGroup child : getProcessGroups()) { + if (child instanceof StandardProcessGroup standardChild) { + standardChild.setConnectorLoggingAttributes(snapshot); + } + } + } + + /** + * Returns the connector-supplied MDC attributes assigned to this process group, if any. Returns + * an empty map when this PG is not inside a connector-managed flow. + */ + public Map getConnectorLoggingAttributes() { + return connectorLoggingAttributes; } private void setGroupPath() { diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/reporting/AbstractEventAccess.java b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/reporting/AbstractEventAccess.java index 4c1a10686375..de962cefa77e 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/reporting/AbstractEventAccess.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/reporting/AbstractEventAccess.java @@ -537,6 +537,11 @@ ProcessGroupStatus getGroupStatus(final ProcessGroup group, final RepositoryStat status.setBytesTransferred(bytesTransferred); status.setProcessingNanos(processingNanos); status.setProcessingPerformanceStatus(performanceStatus); + // Snapshot the PG's MDC logging attributes onto the status DTO so reporting tasks (e.g. the + // OpenTelemetry reporting task) can attribute metrics emitted for this PG to its owning + // connector. For non-connector PGs this is just the existing processGroupId/Name/path keys; + // for PGs inside a connector-managed flow it also carries connectorId/connectorName/etc. + status.setLoggingAttributes(group.getLoggingAttributes()); final VersionControlInformation vci = group.getVersionControlInformation(); if (vci != null) { diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/test/java/org/apache/nifi/groups/StandardProcessGroupTest.java b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/test/java/org/apache/nifi/groups/StandardProcessGroupTest.java index 35051a30f9fc..3acbfc58dfe6 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/test/java/org/apache/nifi/groups/StandardProcessGroupTest.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/test/java/org/apache/nifi/groups/StandardProcessGroupTest.java @@ -44,6 +44,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.when; @@ -238,4 +239,110 @@ void testGetLoggingAttributesWithVersionControlInformation() { assertEquals(expected, loggingAttributes); } + + @Test + void testSetConnectorLoggingAttributesMergesIntoLoggingAttributes() { + processGroup.setName(NAME); + + final Map connectorAttributes = Map.of( + "connectorId", "connector-1", + "connectorName", "My Connector", + "connectorComponent", "com.example.MyConnector" + ); + + processGroup.setConnectorLoggingAttributes(connectorAttributes); + + final Map loggingAttributes = processGroup.getLoggingAttributes(); + assertEquals("connector-1", loggingAttributes.get("connectorId")); + assertEquals("My Connector", loggingAttributes.get("connectorName")); + assertEquals("com.example.MyConnector", loggingAttributes.get("connectorComponent")); + assertEquals(NAME, loggingAttributes.get(StandardProcessGroup.LoggingAttribute.PROCESS_GROUP_NAME.getAttribute())); + assertEquals(ID, loggingAttributes.get(StandardProcessGroup.LoggingAttribute.PROCESS_GROUP_ID.getAttribute())); + } + + @Test + void testSetConnectorLoggingAttributesCascadesToChildProcessGroups() { + processGroup.setName(NAME); + + final StandardProcessGroup child = createStandardProcessGroup("child-id"); + child.setName("Child"); + processGroup.addProcessGroup(child); + + final Map connectorAttributes = Map.of( + "connectorId", "connector-1", + "connectorName", "Postgres CDC" + ); + + processGroup.setConnectorLoggingAttributes(connectorAttributes); + + assertEquals("connector-1", child.getLoggingAttributes().get("connectorId")); + assertEquals("Postgres CDC", child.getLoggingAttributes().get("connectorName")); + } + + @Test + void testAddProcessGroupInheritsConnectorLoggingAttributesFromParent() { + processGroup.setName(NAME); + processGroup.setConnectorLoggingAttributes(Map.of( + "connectorId", "connector-1", + "connectorName", "Postgres CDC" + )); + + final StandardProcessGroup lateChild = createStandardProcessGroup("late-child-id"); + lateChild.setName("Late Child"); + processGroup.addProcessGroup(lateChild); + + assertEquals("connector-1", lateChild.getLoggingAttributes().get("connectorId")); + assertEquals("Postgres CDC", lateChild.getLoggingAttributes().get("connectorName")); + assertEquals(Map.copyOf(processGroup.getConnectorLoggingAttributes()), lateChild.getConnectorLoggingAttributes()); + } + + @Test + void testEmptyConnectorLoggingAttributesAddsNothing() { + processGroup.setName(NAME); + processGroup.setConnectorLoggingAttributes(Map.of()); + + final Map loggingAttributes = processGroup.getLoggingAttributes(); + assertFalse(loggingAttributes.containsKey("connectorId")); + assertTrue(processGroup.getConnectorLoggingAttributes().isEmpty()); + // Verify the PG-level keys are still present. + assertEquals(NAME, loggingAttributes.get(StandardProcessGroup.LoggingAttribute.PROCESS_GROUP_NAME.getAttribute())); + } + + @Test + void testSetConnectorLoggingAttributesReplacesPreviousValues() { + processGroup.setName(NAME); + processGroup.setConnectorLoggingAttributes(Map.of( + "connectorId", "connector-1", + "connectorName", "Old Name", + "customKey", "customValue" + )); + + processGroup.setConnectorLoggingAttributes(Map.of( + "connectorId", "connector-1", + "connectorName", "New Name" + )); + + final Map loggingAttributes = processGroup.getLoggingAttributes(); + assertEquals("New Name", loggingAttributes.get("connectorName")); + assertFalse(loggingAttributes.containsKey("customKey")); + } + + private StandardProcessGroup createStandardProcessGroup(final String id) { + return new StandardProcessGroup( + id, + controllerServiceProvider, + processScheduler, + propertyEncryptor, + extensionManager, + stateManagerProvider, + flowManager, + reloadComponent, + nodeTypeProvider, + properties, + statelessGroupNodeFactory, + assetManager, + null + ); + } + } diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/FrameworkConnectorInitializationContextBuilder.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/FrameworkConnectorInitializationContextBuilder.java index 9ef92941a8f6..76857093426f 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/FrameworkConnectorInitializationContextBuilder.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/FrameworkConnectorInitializationContextBuilder.java @@ -21,6 +21,9 @@ import org.apache.nifi.components.connector.secrets.SecretsManager; import org.apache.nifi.logging.ComponentLog; +import java.util.Map; +import java.util.function.Consumer; + public interface FrameworkConnectorInitializationContextBuilder { FrameworkConnectorInitializationContextBuilder identifier(String identifier); @@ -35,5 +38,14 @@ public interface FrameworkConnectorInitializationContextBuilder { FrameworkConnectorInitializationContextBuilder componentBundleLookup(ComponentBundleLookup bundleLookup); + /** + * Registers a callback invoked when the connector calls + * {@link ConnectorInitializationContext#setLoggingAttributes(Map)}. The framework uses this to + * forward custom MDC logging attributes into the owning {@code StandardConnectorNode} so they + * are merged with the framework-managed connector keys and propagated to the connector's + * managed flow. + */ + FrameworkConnectorInitializationContextBuilder loggingAttributesConsumer(Consumer> loggingAttributesConsumer); + FrameworkConnectorInitializationContext build(); } diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorInitializationContext.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorInitializationContext.java index 002f2a8aa2f8..2081988260b7 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorInitializationContext.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorInitializationContext.java @@ -28,6 +28,7 @@ import org.apache.nifi.logging.ComponentLog; import java.util.List; +import java.util.Map; import java.util.function.Consumer; public class StandardConnectorInitializationContext implements FrameworkConnectorInitializationContext { @@ -37,6 +38,7 @@ public class StandardConnectorInitializationContext implements FrameworkConnecto private final SecretsManager secretsManager; private final AssetManager assetManager; private final ComponentBundleLookup componentBundleLookup; + private final Consumer> loggingAttributesConsumer; protected StandardConnectorInitializationContext(final Builder builder) { @@ -46,6 +48,7 @@ protected StandardConnectorInitializationContext(final Builder builder) { this.secretsManager = builder.secretsManager; this.assetManager = builder.assetManager; this.componentBundleLookup = builder.componentBundleLookup; + this.loggingAttributesConsumer = builder.loggingAttributesConsumer; } @Override @@ -78,6 +81,14 @@ public AssetManager getAssetManager() { return assetManager; } + @Override + public void setLoggingAttributes(final Map attributes) { + if (loggingAttributesConsumer == null) { + throw new UnsupportedOperationException("setLoggingAttributes is not supported by this initialization context"); + } + loggingAttributesConsumer.accept(attributes); + } + @Override public void updateFlow(final FlowContext flowContext, final VersionedExternalFlow versionedExternalFlow, final BundleCompatibility bundleCompatability) throws FlowUpdateException { @@ -160,6 +171,7 @@ public static class Builder implements FrameworkConnectorInitializationContextBu private SecretsManager secretsManager; private AssetManager assetManager; private ComponentBundleLookup componentBundleLookup; + private Consumer> loggingAttributesConsumer; @Override public Builder identifier(final String identifier) { @@ -197,6 +209,12 @@ public Builder componentBundleLookup(final ComponentBundleLookup bundleLookup) { return this; } + @Override + public Builder loggingAttributesConsumer(final Consumer> loggingAttributesConsumer) { + this.loggingAttributesConsumer = loggingAttributesConsumer; + return this; + } + @Override public StandardConnectorInitializationContext build() { return new StandardConnectorInitializationContext(this); diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorNode.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorNode.java index b9659ebb4860..030f973eedac 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorNode.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorNode.java @@ -50,7 +50,9 @@ import org.apache.nifi.flow.VersionedProcessGroup; import org.apache.nifi.flow.VersionedProcessor; import org.apache.nifi.groups.ProcessGroup; +import org.apache.nifi.groups.StandardProcessGroup; import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.logging.GroupedComponent; import org.apache.nifi.nar.ExtensionManager; import org.apache.nifi.nar.NarCloseable; import org.apache.nifi.util.StringUtils; @@ -81,9 +83,16 @@ import java.util.function.Function; import java.util.stream.Collectors; -public class StandardConnectorNode implements ConnectorNode { +public class StandardConnectorNode implements ConnectorNode, GroupedComponent { private static final Logger logger = LoggerFactory.getLogger(StandardConnectorNode.class); + /** + * Soft cardinality limit beyond which a WARN is emitted when a connector supplies custom logging + * attributes. Exceeding this threshold signals a potential metric/MDC cardinality risk for + * downstream observability backends but does not reject the attributes. + */ + private static final int CUSTOM_LOGGING_ATTRIBUTE_CARDINALITY_WARN_THRESHOLD = 10; + private final String identifier; private final FlowManager flowManager; private final ExtensionManager extensionManager; @@ -109,6 +118,10 @@ public class StandardConnectorNode implements ConnectorNode { private volatile String name; private volatile FrameworkConnectorInitializationContext initializationContext; + private final Object loggingAttributesLock = new Object(); + private volatile Map customLoggingAttributes = Map.of(); + private volatile Map mergedLoggingAttributes = Map.of(); + public StandardConnectorNode(final String identifier, final FlowManager flowManager, final ExtensionManager extensionManager, final Authorizable parentAuthorizable, final ConnectorDetails connectorDetails, final String componentType, final String componentCanonicalClass, @@ -133,6 +146,8 @@ public StandardConnectorNode(final String identifier, final FlowManager flowMana final Bundle activeFlowBundle = new Bundle(bundleCoordinate.getGroup(), bundleCoordinate.getId(), bundleCoordinate.getVersion()); this.activeFlowContext = flowContextFactory.createActiveFlowContext(identifier, connectorDetails.getComponentLog(), activeFlowBundle); + + rebuildLoggingAttributes(); } @Override @@ -143,6 +158,138 @@ public String getName() { @Override public void setName(final String name) { this.name = name; + rebuildLoggingAttributes(); + } + + /** + * Returns the {@link ProcessGroup} that holds the connector-managed flow, satisfying the + * {@link GroupedComponent} contract so that {@code StandardLoggingContext} can source MDC + * attributes for components running inside the connector's managed flow. + */ + @Override + public ProcessGroup getProcessGroup() { + final FrameworkFlowContext context = activeFlowContext; + return context == null ? null : context.getManagedProcessGroup(); + } + + /** + * Replaces the connector-supplied custom logging attributes. Reserved keys (those used by the + * framework, see {@link ConnectorLoggingAttribute}) are filtered out and a WARN is logged for + * each dropped entry. A WARN is also logged when the number of accepted custom keys exceeds + * {@value #CUSTOM_LOGGING_ATTRIBUTE_CARDINALITY_WARN_THRESHOLD} to surface cardinality risk for + * MDC logging and OpenTelemetry metric labels. + * + * @param attributes the proposed custom attributes; {@code null} or empty clears the current set + */ + public void setCustomLoggingAttributes(final Map attributes) { + final Map filtered = filterReservedKeys(attributes); + + if (filtered.size() > CUSTOM_LOGGING_ATTRIBUTE_CARDINALITY_WARN_THRESHOLD) { + logger.warn("{} supplied {} custom logging attributes which exceeds the soft threshold of {}; high-cardinality attributes can degrade MDC logging and OpenTelemetry metric backends", + this, filtered.size(), CUSTOM_LOGGING_ATTRIBUTE_CARDINALITY_WARN_THRESHOLD); + } + + synchronized (loggingAttributesLock) { + this.customLoggingAttributes = filtered; + } + rebuildLoggingAttributes(); + } + + /** + * Returns an immutable snapshot of the merged framework + custom logging attributes currently + * advertised by this connector. The framework keys are populated by the framework from the + * connector's identifier, name, component type, and bundle coordinate. + */ + public Map getLoggingAttributes() { + return mergedLoggingAttributes; + } + + private Map filterReservedKeys(final Map attributes) { + if (attributes == null || attributes.isEmpty()) { + return Map.of(); + } + final Map filtered = new HashMap<>(attributes.size()); + for (final Map.Entry entry : attributes.entrySet()) { + final String key = entry.getKey(); + if (key == null || key.isEmpty()) { + continue; + } + if (ConnectorLoggingAttribute.isReserved(key)) { + logger.warn("{} attempted to set reserved logging attribute [{}]; dropping the entry. Reserved keys are managed by the framework.", this, key); + continue; + } + filtered.put(key, entry.getValue()); + } + return Collections.unmodifiableMap(filtered); + } + + private void rebuildLoggingAttributes() { + final Map merged; + final Map custom; + synchronized (loggingAttributesLock) { + custom = customLoggingAttributes; + merged = new HashMap<>(custom.size() + ConnectorLoggingAttribute.values().length); + merged.putAll(custom); + // Framework keys are applied last so they always win against any not-yet-filtered overlap. + merged.put(ConnectorLoggingAttribute.CONNECTOR_ID.attribute, identifier); + merged.put(ConnectorLoggingAttribute.CONNECTOR_NAME.attribute, name == null ? "" : name); + merged.put(ConnectorLoggingAttribute.CONNECTOR_COMPONENT.attribute, componentCanonicalClass == null ? "" : componentCanonicalClass); + if (bundleCoordinate != null) { + merged.put(ConnectorLoggingAttribute.CONNECTOR_BUNDLE_GROUP.attribute, bundleCoordinate.getGroup()); + merged.put(ConnectorLoggingAttribute.CONNECTOR_BUNDLE_ARTIFACT.attribute, bundleCoordinate.getId()); + merged.put(ConnectorLoggingAttribute.CONNECTOR_BUNDLE_VERSION.attribute, bundleCoordinate.getVersion()); + } + this.mergedLoggingAttributes = Collections.unmodifiableMap(merged); + } + + pushLoggingAttributesToManagedFlow(merged); + } + + private void pushLoggingAttributesToManagedFlow(final Map attributes) { + final ProcessGroup managedProcessGroup = getProcessGroup(); + if (managedProcessGroup instanceof StandardProcessGroup standardProcessGroup) { + standardProcessGroup.setConnectorLoggingAttributes(attributes); + } + } + + /** + * Framework-managed MDC logging attribute keys for a connector. The framework computes these + * automatically and they cannot be overridden by a connector's custom attributes. + */ + public enum ConnectorLoggingAttribute { + CONNECTOR_ID("connectorId"), + CONNECTOR_NAME("connectorName"), + CONNECTOR_COMPONENT("connectorComponent"), + CONNECTOR_BUNDLE_GROUP("connectorBundleGroup"), + CONNECTOR_BUNDLE_ARTIFACT("connectorBundleArtifact"), + CONNECTOR_BUNDLE_VERSION("connectorBundleVersion"); + + private static final Set RESERVED_KEYS; + static { + final Set reserved = new HashSet<>(); + for (final ConnectorLoggingAttribute attribute : values()) { + reserved.add(attribute.attribute); + } + RESERVED_KEYS = Collections.unmodifiableSet(reserved); + } + + private final String attribute; + + ConnectorLoggingAttribute(final String attribute) { + this.attribute = attribute; + } + + public String getAttribute() { + return attribute; + } + + public static boolean isReserved(final String key) { + return RESERVED_KEYS.contains(key); + } + + public static Set getReservedKeys() { + return RESERVED_KEYS; + } } @Override diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/ExtensionBuilder.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/ExtensionBuilder.java index 45ef9980d4b2..9091a433189e 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/ExtensionBuilder.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/ExtensionBuilder.java @@ -521,10 +521,11 @@ public Resource getResource() { } final String componentType = connector.getClass().getSimpleName(); - final ComponentLog componentLog = new SimpleProcessLogger(identifier, connector, new StandardLoggingContext()); + final StandardLoggingContext loggingContext = new StandardLoggingContext(); + final ComponentLog componentLog = new SimpleProcessLogger(identifier, connector, loggingContext); final ConnectorDetails connectorDetails = new ConnectorDetails(connector, bundleCoordinate, componentLog); - final ConnectorNode connectorNode = new StandardConnectorNode( + final StandardConnectorNode connectorNode = new StandardConnectorNode( identifier, flowController.getFlowManager(), extensionManager, @@ -538,11 +539,15 @@ public Resource getResource() { connectorValidationTrigger, false ); + // Late-bind the logging context to the connector node so that MDC attributes assembled by + // the node (framework keys + connector-supplied custom keys) flow through the connector's + // own ComponentLog as well as the managed flow's nested components. + loggingContext.setComponent(connectorNode); try { initializeDefaultValues(connector, connectorNode.getActiveFlowContext()); - final FrameworkConnectorInitializationContext initContext = createConnectorInitializationContext(managedProcessGroup, componentLog); + final FrameworkConnectorInitializationContext initContext = createConnectorInitializationContext(managedProcessGroup, componentLog, connectorNode); connectorNode.initializeConnector(initContext); } catch (final Exception e) { logger.error("Could not initialize Connector of type {} from {} for ID {}; creating \"Ghost\" implementation", type, bundleCoordinate, identifier, e); @@ -565,13 +570,14 @@ private ConnectorNode createGhostConnectorNode(final Authorizable connectorsAuth final GhostConnector ghostConnector = new GhostConnector(identifier, type, cause); final String simpleClassName = type.contains(".") ? StringUtils.substringAfterLast(type, ".") : type; final String componentType = "(Missing) " + simpleClassName; - final ComponentLog componentLog = new SimpleProcessLogger(identifier, ghostConnector, new StandardLoggingContext()); + final StandardLoggingContext loggingContext = new StandardLoggingContext(); + final ComponentLog componentLog = new SimpleProcessLogger(identifier, ghostConnector, loggingContext); final ConnectorDetails connectorDetails = new ConnectorDetails(ghostConnector, bundleCoordinate, componentLog); // If an instance class loader has been created for this connector, remove it because it's no longer necessary. extensionManager.removeInstanceClassLoader(identifier); - final ConnectorNode connectorNode = new StandardConnectorNode( + final StandardConnectorNode connectorNode = new StandardConnectorNode( identifier, flowController.getFlowManager(), extensionManager, @@ -585,9 +591,10 @@ private ConnectorNode createGhostConnectorNode(final Authorizable connectorsAuth connectorValidationTrigger, true ); + loggingContext.setComponent(connectorNode); // Initialize the ghost connector so that it can be properly configured during flow synchronization - final FrameworkConnectorInitializationContext initContext = createConnectorInitializationContext(managedProcessGroup, componentLog); + final FrameworkConnectorInitializationContext initContext = createConnectorInitializationContext(managedProcessGroup, componentLog, connectorNode); connectorNode.initializeConnector(initContext); return connectorNode; @@ -613,7 +620,8 @@ private void initializeDefaultValues(final Connector connector, final FrameworkF } } - private FrameworkConnectorInitializationContext createConnectorInitializationContext(final ProcessGroup managedProcessGroup, final ComponentLog componentLog) { + private FrameworkConnectorInitializationContext createConnectorInitializationContext(final ProcessGroup managedProcessGroup, final ComponentLog componentLog, + final StandardConnectorNode connectorNode) { final String name = type.contains(".") ? StringUtils.substringAfterLast(type, ".") : type; return connectorInitializationContextBuilder @@ -623,6 +631,7 @@ private FrameworkConnectorInitializationContext createConnectorInitializationCon .secretsManager(flowController.getConnectorRepository().getSecretsManager()) .assetManager(flowController.getConnectorAssetManager()) .componentBundleLookup(componentBundleLookup) + .loggingAttributesConsumer(connectorNode::setCustomLoggingAttributes) .build(); } diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java index 259aed264641..55fa7c77593f 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java @@ -810,7 +810,7 @@ private FlowController( } eventAccess = new StandardEventAccess(flowManager, flowFileEventRepository, processScheduler, authorizer, provenanceRepository, - auditService, analyticsEngine, flowFileRepository, contentRepository); + auditService, analyticsEngine, flowFileRepository, contentRepository, connectorRepository); timerDrivenEngineRef.get().scheduleWithFixedDelay(() -> { try { diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/reporting/StandardEventAccess.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/reporting/StandardEventAccess.java index ec233778b65a..c8355ae97427 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/reporting/StandardEventAccess.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/reporting/StandardEventAccess.java @@ -22,6 +22,9 @@ import org.apache.nifi.authorization.RequestAction; import org.apache.nifi.authorization.resource.Authorizable; import org.apache.nifi.authorization.user.NiFiUser; +import org.apache.nifi.components.connector.ConnectorNode; +import org.apache.nifi.components.connector.ConnectorRepository; +import org.apache.nifi.components.connector.FrameworkFlowContext; import org.apache.nifi.controller.ProcessScheduler; import org.apache.nifi.controller.ProcessorNode; import org.apache.nifi.controller.flow.FlowManager; @@ -41,6 +44,8 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -55,10 +60,11 @@ public class StandardEventAccess extends AbstractEventAccess implements UserAwar private final AuditService auditService; private final FlowFileRepository flowFileRepository; private final ContentRepository contentRepository; + private final ConnectorRepository connectorRepository; public StandardEventAccess(final FlowManager flowManager, final FlowFileEventRepository flowFileEventRepository, final ProcessScheduler processScheduler, final Authorizer authorizer, final ProvenanceRepository provenanceRepository, final AuditService auditService, final StatusAnalyticsEngine statusAnalyticsEngine, - final FlowFileRepository flowFileRepository, final ContentRepository contentRepository) { + final FlowFileRepository flowFileRepository, final ContentRepository contentRepository, final ConnectorRepository connectorRepository) { super(processScheduler, statusAnalyticsEngine, flowManager, flowFileEventRepository); this.flowFileEventRepository = flowFileEventRepository; this.flowManager = flowManager; @@ -67,6 +73,44 @@ public StandardEventAccess(final FlowManager flowManager, final FlowFileEventRep this.auditService = auditService; this.flowFileRepository = flowFileRepository; this.contentRepository = contentRepository; + this.connectorRepository = connectorRepository; + } + + /** + * Returns a {@link ProcessGroupStatus} for each connector's managed root process group so that + * reporting tasks can observe metrics for flows running inside connectors. Connector-managed + * flows live outside the controller's root process group, so they are invisible to + * {@link #getControllerStatus()} and would otherwise be skipped by standard reporting tasks. + */ + @Override + public Collection getConnectorStatuses() { + if (connectorRepository == null) { + return Collections.emptyList(); + } + + final List connectors = connectorRepository.getConnectors(); + if (connectors == null || connectors.isEmpty()) { + return Collections.emptyList(); + } + + final RepositoryStatusReport statusReport = generateRepositoryStatusReport(); + final List statuses = new ArrayList<>(connectors.size()); + for (final ConnectorNode connector : connectors) { + final FrameworkFlowContext context = connector.getActiveFlowContext(); + if (context == null) { + continue; + } + final ProcessGroup managedGroup = context.getManagedProcessGroup(); + if (managedGroup == null) { + continue; + } + final ProcessGroupStatus status = getGroupStatus(managedGroup, statusReport, + authorizable -> true, Integer.MAX_VALUE, 1, true); + if (status != null) { + statuses.add(status); + } + } + return statuses; } @Override diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/TestStandardConnectorInitializationContext.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/TestStandardConnectorInitializationContext.java index 565ddb05f9c7..c10fa522b9ac 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/TestStandardConnectorInitializationContext.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/TestStandardConnectorInitializationContext.java @@ -39,8 +39,12 @@ import java.util.Optional; import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; + import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -384,4 +388,38 @@ private StandardConnectorInitializationContext createContext(final ComponentBund .componentBundleLookup(bundleLookup) .build(); } + + @Test + public void testSetLoggingAttributesForwardsToConsumer() { + final AtomicReference> received = new AtomicReference<>(); + final StandardConnectorInitializationContext context = new StandardConnectorInitializationContext.Builder() + .identifier("test-connector") + .name("Test Connector") + .componentLog(mock(ComponentLog.class)) + .secretsManager(mock(SecretsManager.class)) + .assetManager(mock(AssetManager.class)) + .componentBundleLookup(mock(ComponentBundleLookup.class)) + .loggingAttributesConsumer(received::set) + .build(); + + final Map attributes = Map.of("region", "us-east-1", "tenant", "acme"); + context.setLoggingAttributes(attributes); + + assertNotNull(received.get()); + assertEquals(attributes, received.get()); + } + + @Test + public void testSetLoggingAttributesThrowsWhenNoConsumerRegistered() { + final StandardConnectorInitializationContext context = new StandardConnectorInitializationContext.Builder() + .identifier("test-connector") + .name("Test Connector") + .componentLog(mock(ComponentLog.class)) + .secretsManager(mock(SecretsManager.class)) + .assetManager(mock(AssetManager.class)) + .componentBundleLookup(mock(ComponentBundleLookup.class)) + .build(); + + assertThrows(UnsupportedOperationException.class, () -> context.setLoggingAttributes(Map.of("k", "v"))); + } } diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/TestStandardConnectorNode.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/TestStandardConnectorNode.java index 00e136a66358..1c456e702bda 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/TestStandardConnectorNode.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/TestStandardConnectorNode.java @@ -783,6 +783,88 @@ public void testCancelDrainFlowFilesInterruptsConnector() throws Exception { assertEquals(ConnectorState.STOPPED, connectorNode.getCurrentState()); } + @Test + public void testLoggingAttributesIncludeFrameworkKeys() throws Exception { + final SleepingConnector connector = new SleepingConnector(Duration.ofMillis(1)); + final StandardConnectorNode connectorNode = createConnectorNode(connector); + + final Map attributes = connectorNode.getLoggingAttributes(); + assertEquals("test-connector-id", attributes.get("connectorId")); + assertEquals(connector.getClass().getCanonicalName(), attributes.get("connectorComponent")); + assertEquals("org.apache.nifi", attributes.get("connectorBundleGroup")); + assertEquals("test-standard-connector-node", attributes.get("connectorBundleArtifact")); + assertEquals("1.0.0", attributes.get("connectorBundleVersion")); + assertEquals(connector.getClass().getSimpleName(), attributes.get("connectorName")); + } + + @Test + public void testSetNameRefreshesConnectorNameLoggingAttribute() throws Exception { + final StandardConnectorNode connectorNode = createConnectorNode(); + connectorNode.setName("Renamed Connector"); + + assertEquals("Renamed Connector", connectorNode.getLoggingAttributes().get("connectorName")); + } + + @Test + public void testSetCustomLoggingAttributesMergesWithFrameworkKeys() throws Exception { + final StandardConnectorNode connectorNode = createConnectorNode(); + connectorNode.setCustomLoggingAttributes(Map.of( + "customA", "valueA", + "customB", "valueB" + )); + + final Map attributes = connectorNode.getLoggingAttributes(); + assertEquals("valueA", attributes.get("customA")); + assertEquals("valueB", attributes.get("customB")); + assertEquals("test-connector-id", attributes.get("connectorId")); + } + + @Test + public void testSetCustomLoggingAttributesFiltersReservedKeys() throws Exception { + final StandardConnectorNode connectorNode = createConnectorNode(); + connectorNode.setCustomLoggingAttributes(Map.of( + "connectorId", "attempted-override", + "connectorBundleVersion", "9.9.9", + "customA", "valueA" + )); + + final Map attributes = connectorNode.getLoggingAttributes(); + // Reserved keys are owned by the framework: connectorId stays as the framework value. + assertEquals("test-connector-id", attributes.get("connectorId")); + assertEquals("1.0.0", attributes.get("connectorBundleVersion")); + // Non-reserved custom keys are preserved. + assertEquals("valueA", attributes.get("customA")); + } + + @Test + public void testSetCustomLoggingAttributesNullClears() throws Exception { + final StandardConnectorNode connectorNode = createConnectorNode(); + connectorNode.setCustomLoggingAttributes(Map.of("customA", "valueA")); + assertTrue(connectorNode.getLoggingAttributes().containsKey("customA")); + + connectorNode.setCustomLoggingAttributes(null); + assertFalse(connectorNode.getLoggingAttributes().containsKey("customA")); + // Framework keys remain. + assertEquals("test-connector-id", connectorNode.getLoggingAttributes().get("connectorId")); + } + + @Test + public void testReservedKeysExposeFrameworkAttributeNames() { + final Set reserved = StandardConnectorNode.ConnectorLoggingAttribute.getReservedKeys(); + assertTrue(reserved.contains("connectorId")); + assertTrue(reserved.contains("connectorName")); + assertTrue(reserved.contains("connectorComponent")); + assertTrue(reserved.contains("connectorBundleGroup")); + assertTrue(reserved.contains("connectorBundleArtifact")); + assertTrue(reserved.contains("connectorBundleVersion")); + } + + @Test + public void testGetProcessGroupReturnsManagedFlowRoot() throws Exception { + final StandardConnectorNode connectorNode = createConnectorNode(); + assertEquals(managedProcessGroup, connectorNode.getProcessGroup()); + } + private StandardConnectorNode createConnectorNode() throws FlowUpdateException { final SleepingConnector sleepingConnector = new SleepingConnector(Duration.ofMillis(1)); return createConnectorNode(sleepingConnector); diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/reporting/AbstractEventAccessTest.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/reporting/AbstractEventAccessTest.java index 7661d1996c78..3f10a068e752 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/reporting/AbstractEventAccessTest.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/reporting/AbstractEventAccessTest.java @@ -109,6 +109,27 @@ void testGetGroupStatusAuthorized() { assertEquals(1, authorizables.size()); } + @Test + void testGetGroupStatusSnapshotsLoggingAttributes() { + final Map loggingAttributes = Map.of( + "processGroupId", PROCESS_GROUP_ID, + "processGroupName", PROCESS_GROUP_NAME, + "connectorId", "connector-1", + "connectorName", "Postgres CDC" + ); + + when(processGroup.getName()).thenReturn(PROCESS_GROUP_NAME); + when(processGroup.getIdentifier()).thenReturn(PROCESS_GROUP_ID); + when(processGroup.getLoggingAttributes()).thenReturn(loggingAttributes); + + final ProcessGroupStatus groupStatus = eventAccess.getGroupStatus( + processGroup, new StandardRepositoryStatusReport(), authorizable -> true, + SINGLE_DEPTH, SINGLE_DEPTH, INCLUDE_CONNECTION_DETAILS); + + assertNotNull(groupStatus); + assertEquals(loggingAttributes, groupStatus.getLoggingAttributes()); + } + @Test void testGetGroupStatusProcessorAuthorizedNotIncluded() { final List authorizables = new ArrayList<>(); From 9ebba3d3b3d822bb7311f732ac6e5c35b1da0604 Mon Sep 17 00:00:00 2001 From: Kevin Doran Date: Mon, 1 Jun 2026 18:59:05 -0400 Subject: [PATCH 2/3] WIP --- .../main/asciidoc/administration-guide.adoc | 7 +-- .../ConnectorConfigurationProvider.java | 29 +++++++++++ .../connector/ConnectorRepository.java | 19 +++++++ ...ConnectorInitializationContextBuilder.java | 12 ----- ...tandardConnectorInitializationContext.java | 18 ------- .../StandardConnectorRepository.java | 15 ++++++ .../nifi/controller/ExtensionBuilder.java | 13 ++++- .../nifi/reporting/StandardEventAccess.java | 24 +++++---- ...tandardConnectorInitializationContext.java | 38 -------------- .../TestStandardConnectorRepository.java | 52 +++++++++++++++++++ 10 files changed, 145 insertions(+), 82 deletions(-) diff --git a/nifi-docs/src/main/asciidoc/administration-guide.adoc b/nifi-docs/src/main/asciidoc/administration-guide.adoc index a865f5219d5c..f7ed3cf57029 100644 --- a/nifi-docs/src/main/asciidoc/administration-guide.adoc +++ b/nifi-docs/src/main/asciidoc/administration-guide.adoc @@ -220,9 +220,10 @@ Connector: - `connectorBundleGroup`, `connectorBundleArtifact`, `connectorBundleVersion` identify the NAR bundle the Connector was loaded from -A Connector may also supply additional, implementation-specific MDC values via -`ConnectorInitializationContext.setLoggingAttributes(Map)`. Keys reserved by the framework (those listed above) cannot -be overridden; attempts to do so are dropped and logged as a `WARN`. +Additional implementation-specific MDC values may be supplied by the framework +`ConnectorConfigurationProvider` extension via `getLoggingAttributes(String connectorId)`. The framework consults the +provider at connector creation time and merges the result with the framework-managed keys above. Keys reserved by the +framework (those listed above) cannot be overridden; attempts to do so are dropped and logged as a `WARN`. MDC named values can be added to a Logback pattern layout using the `mdc` conversion word. diff --git a/nifi-framework-api/src/main/java/org/apache/nifi/components/connector/ConnectorConfigurationProvider.java b/nifi-framework-api/src/main/java/org/apache/nifi/components/connector/ConnectorConfigurationProvider.java index a0d5314a2f96..efa32f9f420d 100644 --- a/nifi-framework-api/src/main/java/org/apache/nifi/components/connector/ConnectorConfigurationProvider.java +++ b/nifi-framework-api/src/main/java/org/apache/nifi/components/connector/ConnectorConfigurationProvider.java @@ -20,6 +20,7 @@ import org.apache.nifi.flow.ScheduledState; import java.io.InputStream; +import java.util.Map; import java.util.Optional; /** @@ -174,6 +175,34 @@ default boolean shouldApplyUpdate(final String connectorId) { return true; } + /** + * Returns identity-shaped logging attributes the framework should include in the SLF4J {@code MDC} for + * every log line emitted by the Connector with the given identifier and by any component (Processor, + * Controller Service, etc.) running inside its managed flow. The same attributes are also snapshotted + * onto {@code ProcessGroupStatus} so that status-based reporting tasks (e.g. OpenTelemetry) can attach + * them as metric attributes. + * + *

The framework reserves a set of well-known keys that describe the Connector itself (identity, + * bundle coordinate, etc.). Any entry in the returned map whose key collides with a reserved + * framework-managed key is dropped by the framework and a {@code WARN} is logged for that entry; + * the remaining entries are accepted.

+ * + *

The framework consults this method once, at connector create time. It is the provider's + * responsibility to ensure the returned values are stable for the lifetime of the connector + * instance or to publish updates through other means; mid-lifecycle refresh is not currently + * defined.

+ * + *

The default returns {@link Map#of()} so that providers that do not need to publish additional + * attributes (and runtimes that have no provider configured) get only the framework-managed identity + * keys in MDC.

+ * + * @param connectorId the identifier of the connector whose logging attributes are being requested + * @return an immutable map of attribute keys to values; never {@code null} + */ + default Map getLoggingAttributes(final String connectorId) { + return Map.of(); + } + /** * Ensures that local asset binaries are up to date with the external store. For each asset * tracked in the provider's local state, this method compares the external store's current diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorRepository.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorRepository.java index d8d06bc79efa..3a163a2da4d2 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorRepository.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorRepository.java @@ -26,6 +26,7 @@ import java.io.IOException; import java.io.InputStream; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.concurrent.Future; @@ -211,4 +212,22 @@ void inheritConfiguration(ConnectorNode connector, ListIf no {@link ConnectorConfigurationProvider} is configured for this repository, this method + * returns {@link Map#of()}. Otherwise, it delegates to + * {@link ConnectorConfigurationProvider#getLoggingAttributes(String)}. Provider implementations + * that throw should be handled by the repository to avoid breaking connector creation; the + * framework will fall back to an empty map and log a warning.

+ * + * @param connectorId the identifier of the connector whose logging attributes are being requested + * @return an immutable map of attribute keys to values; never {@code null} + */ + Map getLoggingAttributesForConnector(String connectorId); } diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/FrameworkConnectorInitializationContextBuilder.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/FrameworkConnectorInitializationContextBuilder.java index 76857093426f..9ef92941a8f6 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/FrameworkConnectorInitializationContextBuilder.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/FrameworkConnectorInitializationContextBuilder.java @@ -21,9 +21,6 @@ import org.apache.nifi.components.connector.secrets.SecretsManager; import org.apache.nifi.logging.ComponentLog; -import java.util.Map; -import java.util.function.Consumer; - public interface FrameworkConnectorInitializationContextBuilder { FrameworkConnectorInitializationContextBuilder identifier(String identifier); @@ -38,14 +35,5 @@ public interface FrameworkConnectorInitializationContextBuilder { FrameworkConnectorInitializationContextBuilder componentBundleLookup(ComponentBundleLookup bundleLookup); - /** - * Registers a callback invoked when the connector calls - * {@link ConnectorInitializationContext#setLoggingAttributes(Map)}. The framework uses this to - * forward custom MDC logging attributes into the owning {@code StandardConnectorNode} so they - * are merged with the framework-managed connector keys and propagated to the connector's - * managed flow. - */ - FrameworkConnectorInitializationContextBuilder loggingAttributesConsumer(Consumer> loggingAttributesConsumer); - FrameworkConnectorInitializationContext build(); } diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorInitializationContext.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorInitializationContext.java index 2081988260b7..002f2a8aa2f8 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorInitializationContext.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorInitializationContext.java @@ -28,7 +28,6 @@ import org.apache.nifi.logging.ComponentLog; import java.util.List; -import java.util.Map; import java.util.function.Consumer; public class StandardConnectorInitializationContext implements FrameworkConnectorInitializationContext { @@ -38,7 +37,6 @@ public class StandardConnectorInitializationContext implements FrameworkConnecto private final SecretsManager secretsManager; private final AssetManager assetManager; private final ComponentBundleLookup componentBundleLookup; - private final Consumer> loggingAttributesConsumer; protected StandardConnectorInitializationContext(final Builder builder) { @@ -48,7 +46,6 @@ protected StandardConnectorInitializationContext(final Builder builder) { this.secretsManager = builder.secretsManager; this.assetManager = builder.assetManager; this.componentBundleLookup = builder.componentBundleLookup; - this.loggingAttributesConsumer = builder.loggingAttributesConsumer; } @Override @@ -81,14 +78,6 @@ public AssetManager getAssetManager() { return assetManager; } - @Override - public void setLoggingAttributes(final Map attributes) { - if (loggingAttributesConsumer == null) { - throw new UnsupportedOperationException("setLoggingAttributes is not supported by this initialization context"); - } - loggingAttributesConsumer.accept(attributes); - } - @Override public void updateFlow(final FlowContext flowContext, final VersionedExternalFlow versionedExternalFlow, final BundleCompatibility bundleCompatability) throws FlowUpdateException { @@ -171,7 +160,6 @@ public static class Builder implements FrameworkConnectorInitializationContextBu private SecretsManager secretsManager; private AssetManager assetManager; private ComponentBundleLookup componentBundleLookup; - private Consumer> loggingAttributesConsumer; @Override public Builder identifier(final String identifier) { @@ -209,12 +197,6 @@ public Builder componentBundleLookup(final ComponentBundleLookup bundleLookup) { return this; } - @Override - public Builder loggingAttributesConsumer(final Consumer> loggingAttributesConsumer) { - this.loggingAttributesConsumer = loggingAttributesConsumer; - return this; - } - @Override public StandardConnectorInitializationContext build() { return new StandardConnectorInitializationContext(this); diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorRepository.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorRepository.java index bdb527fecc81..4910ed5c528e 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorRepository.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorRepository.java @@ -722,6 +722,21 @@ public SecretsManager getSecretsManager() { return secretsManager; } + @Override + public Map getLoggingAttributesForConnector(final String connectorId) { + if (configurationProvider == null) { + return Map.of(); + } + try { + final Map attributes = configurationProvider.getLoggingAttributes(connectorId); + return attributes == null ? Map.of() : attributes; + } catch (final Exception e) { + logger.warn("ConnectorConfigurationProvider [{}] threw while computing logging attributes for connector [{}]; using empty map: {}", + configurationProvider.getClass().getSimpleName(), connectorId, e.getMessage(), e); + return Map.of(); + } + } + @Override public ConnectorStateTransition createStateTransition(final String type, final String id) { final String componentDescription = "StandardConnectorNode[id=" + id + ", type=" + type + "]"; diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/ExtensionBuilder.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/ExtensionBuilder.java index 9091a433189e..f73f5249adf1 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/ExtensionBuilder.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/ExtensionBuilder.java @@ -540,10 +540,15 @@ public Resource getResource() { false ); // Late-bind the logging context to the connector node so that MDC attributes assembled by - // the node (framework keys + connector-supplied custom keys) flow through the connector's + // the node (framework keys + provider-supplied custom keys) flow through the connector's // own ComponentLog as well as the managed flow's nested components. loggingContext.setComponent(connectorNode); + // Pull provider-sourced logging attributes (e.g. connectorDefinitionId) from the ConnectorRepository + // and merge them into the node before any logs are emitted from the connector or its managed flow. + final Map providerAttributes = flowController.getConnectorRepository().getLoggingAttributesForConnector(identifier); + connectorNode.setCustomLoggingAttributes(providerAttributes); + try { initializeDefaultValues(connector, connectorNode.getActiveFlowContext()); @@ -593,6 +598,11 @@ private ConnectorNode createGhostConnectorNode(final Authorizable connectorsAuth ); loggingContext.setComponent(connectorNode); + // Pull provider-sourced logging attributes for the ghost connector as well so that MDC and + // status snapshots are consistent regardless of whether the real Connector implementation loaded. + final Map providerAttributes = flowController.getConnectorRepository().getLoggingAttributesForConnector(identifier); + connectorNode.setCustomLoggingAttributes(providerAttributes); + // Initialize the ghost connector so that it can be properly configured during flow synchronization final FrameworkConnectorInitializationContext initContext = createConnectorInitializationContext(managedProcessGroup, componentLog, connectorNode); connectorNode.initializeConnector(initContext); @@ -631,7 +641,6 @@ private FrameworkConnectorInitializationContext createConnectorInitializationCon .secretsManager(flowController.getConnectorRepository().getSecretsManager()) .assetManager(flowController.getConnectorAssetManager()) .componentBundleLookup(componentBundleLookup) - .loggingAttributesConsumer(connectorNode::setCustomLoggingAttributes) .build(); } diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/reporting/StandardEventAccess.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/reporting/StandardEventAccess.java index c8355ae97427..ebc9b7304e06 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/reporting/StandardEventAccess.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/reporting/StandardEventAccess.java @@ -34,6 +34,7 @@ import org.apache.nifi.controller.repository.FlowFileRepository; import org.apache.nifi.controller.repository.RepositoryStatusReport; import org.apache.nifi.controller.repository.metrics.EmptyFlowFileEvent; +import org.apache.nifi.controller.status.ConnectorStatus; import org.apache.nifi.controller.status.ProcessGroupStatus; import org.apache.nifi.controller.status.ProcessorStatus; import org.apache.nifi.controller.status.analytics.StatusAnalyticsEngine; @@ -77,13 +78,13 @@ public StandardEventAccess(final FlowManager flowManager, final FlowFileEventRep } /** - * Returns a {@link ProcessGroupStatus} for each connector's managed root process group so that - * reporting tasks can observe metrics for flows running inside connectors. Connector-managed - * flows live outside the controller's root process group, so they are invisible to - * {@link #getControllerStatus()} and would otherwise be skipped by standard reporting tasks. + * Returns a {@link ConnectorStatus} for each registered connector, each carrying the connector identity along with + * the {@link ProcessGroupStatus} of the connector's managed root process group. Connector-managed flows live + * outside the controller's root process group, so they are invisible to {@link #getControllerStatus()} and would + * otherwise be skipped by standard reporting tasks. */ @Override - public Collection getConnectorStatuses() { + public Collection getConnectorStatuses() { if (connectorRepository == null) { return Collections.emptyList(); } @@ -94,7 +95,7 @@ public Collection getConnectorStatuses() { } final RepositoryStatusReport statusReport = generateRepositoryStatusReport(); - final List statuses = new ArrayList<>(connectors.size()); + final List statuses = new ArrayList<>(connectors.size()); for (final ConnectorNode connector : connectors) { final FrameworkFlowContext context = connector.getActiveFlowContext(); if (context == null) { @@ -104,11 +105,16 @@ public Collection getConnectorStatuses() { if (managedGroup == null) { continue; } - final ProcessGroupStatus status = getGroupStatus(managedGroup, statusReport, + final ProcessGroupStatus rootGroupStatus = getGroupStatus(managedGroup, statusReport, authorizable -> true, Integer.MAX_VALUE, 1, true); - if (status != null) { - statuses.add(status); + if (rootGroupStatus == null) { + continue; } + final ConnectorStatus connectorStatus = new ConnectorStatus(); + connectorStatus.setId(connector.getIdentifier()); + connectorStatus.setName(connector.getName()); + connectorStatus.setRootGroupStatus(rootGroupStatus); + statuses.add(connectorStatus); } return statuses; } diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/TestStandardConnectorInitializationContext.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/TestStandardConnectorInitializationContext.java index c10fa522b9ac..565ddb05f9c7 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/TestStandardConnectorInitializationContext.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/TestStandardConnectorInitializationContext.java @@ -39,12 +39,8 @@ import java.util.Optional; import java.util.Set; -import java.util.concurrent.atomic.AtomicReference; - import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -388,38 +384,4 @@ private StandardConnectorInitializationContext createContext(final ComponentBund .componentBundleLookup(bundleLookup) .build(); } - - @Test - public void testSetLoggingAttributesForwardsToConsumer() { - final AtomicReference> received = new AtomicReference<>(); - final StandardConnectorInitializationContext context = new StandardConnectorInitializationContext.Builder() - .identifier("test-connector") - .name("Test Connector") - .componentLog(mock(ComponentLog.class)) - .secretsManager(mock(SecretsManager.class)) - .assetManager(mock(AssetManager.class)) - .componentBundleLookup(mock(ComponentBundleLookup.class)) - .loggingAttributesConsumer(received::set) - .build(); - - final Map attributes = Map.of("region", "us-east-1", "tenant", "acme"); - context.setLoggingAttributes(attributes); - - assertNotNull(received.get()); - assertEquals(attributes, received.get()); - } - - @Test - public void testSetLoggingAttributesThrowsWhenNoConsumerRegistered() { - final StandardConnectorInitializationContext context = new StandardConnectorInitializationContext.Builder() - .identifier("test-connector") - .name("Test Connector") - .componentLog(mock(ComponentLog.class)) - .secretsManager(mock(SecretsManager.class)) - .assetManager(mock(AssetManager.class)) - .componentBundleLookup(mock(ComponentBundleLookup.class)) - .build(); - - assertThrows(UnsupportedOperationException.class, () -> context.setLoggingAttributes(Map.of("k", "v"))); - } } diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/TestStandardConnectorRepository.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/TestStandardConnectorRepository.java index cba4b12f5741..a8e661a78dfb 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/TestStandardConnectorRepository.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/TestStandardConnectorRepository.java @@ -1364,6 +1364,58 @@ private static ParameterProviderNode mockParameterProvider(final String name, fi return node; } + // --- getLoggingAttributesForConnector tests --- + + @Test + public void testGetLoggingAttributesForConnectorReturnsEmptyMapWhenNoProvider() { + final StandardConnectorRepository repository = new StandardConnectorRepository(); + final ConnectorRepositoryInitializationContext initContext = mock(ConnectorRepositoryInitializationContext.class); + when(initContext.getExtensionManager()).thenReturn(mock(ExtensionManager.class)); + when(initContext.getConnectorConfigurationProvider()).thenReturn(null); + repository.initialize(initContext); + + final Map attributes = repository.getLoggingAttributesForConnector("connector-1"); + assertNotNull(attributes); + assertTrue(attributes.isEmpty()); + } + + @Test + public void testGetLoggingAttributesForConnectorDelegatesToProvider() { + final ConnectorConfigurationProvider provider = mock(ConnectorConfigurationProvider.class); + final Map expected = Map.of("connectorDefinitionId", "snowflake.runtime.postgres-cdc"); + when(provider.getLoggingAttributes("connector-1")).thenReturn(expected); + + final StandardConnectorRepository repository = createRepositoryWithProvider(provider); + + final Map attributes = repository.getLoggingAttributesForConnector("connector-1"); + assertEquals(expected, attributes); + verify(provider).getLoggingAttributes("connector-1"); + } + + @Test + public void testGetLoggingAttributesForConnectorReturnsEmptyMapWhenProviderReturnsNull() { + final ConnectorConfigurationProvider provider = mock(ConnectorConfigurationProvider.class); + when(provider.getLoggingAttributes(anyString())).thenReturn(null); + + final StandardConnectorRepository repository = createRepositoryWithProvider(provider); + + final Map attributes = repository.getLoggingAttributesForConnector("connector-1"); + assertNotNull(attributes); + assertTrue(attributes.isEmpty()); + } + + @Test + public void testGetLoggingAttributesForConnectorReturnsEmptyMapWhenProviderThrows() { + final ConnectorConfigurationProvider provider = mock(ConnectorConfigurationProvider.class); + when(provider.getLoggingAttributes(anyString())).thenThrow(new RuntimeException("provider boom")); + + final StandardConnectorRepository repository = createRepositoryWithProvider(provider); + + final Map attributes = repository.getLoggingAttributesForConnector("connector-1"); + assertNotNull(attributes); + assertTrue(attributes.isEmpty()); + } + // --- Helper Methods --- private StandardConnectorRepository createRepositoryWithProviderAndAssetManager( From 4c2ff6967244e9fbe78b20c03f07cacabb11b883 Mon Sep 17 00:00:00 2001 From: Kevin Doran Date: Tue, 2 Jun 2026 12:15:19 -0400 Subject: [PATCH 3/3] NIFI-15979 Align Connector status capture with merged nifi-api - Drop the ProcessGroupStatus.loggingAttributes snapshot from AbstractEventAccess (the field was removed from nifi-api); update the affected test and doc comment. - Add a StatusHistoryRepository.capture overload carrying Collection, defaulting to the existing 4-arg capture so current implementations are unaffected. - Invoke the new overload from FlowController using EventAccess.getConnectorStatuses(), so Connector-managed flows reach the status history repository. - Use getConnectors(ConnectorSyncMode.LOCAL_ONLY) in StandardEventAccess. Co-authored-by: Cursor --- .../history/StatusHistoryRepository.java | 21 +++++++++++++++++++ .../nifi/reporting/AbstractEventAccess.java | 5 ----- .../connector/ConnectorRepository.java | 3 +-- .../nifi/controller/FlowController.java | 3 ++- .../nifi/reporting/StandardEventAccess.java | 3 ++- .../reporting/AbstractEventAccessTest.java | 21 ------------------- 6 files changed, 26 insertions(+), 30 deletions(-) diff --git a/nifi-framework-api/src/main/java/org/apache/nifi/controller/status/history/StatusHistoryRepository.java b/nifi-framework-api/src/main/java/org/apache/nifi/controller/status/history/StatusHistoryRepository.java index 2b96feb6192e..b02258552811 100644 --- a/nifi-framework-api/src/main/java/org/apache/nifi/controller/status/history/StatusHistoryRepository.java +++ b/nifi-framework-api/src/main/java/org/apache/nifi/controller/status/history/StatusHistoryRepository.java @@ -16,9 +16,11 @@ */ package org.apache.nifi.controller.status.history; +import org.apache.nifi.controller.status.ConnectorStatus; import org.apache.nifi.controller.status.NodeStatus; import org.apache.nifi.controller.status.ProcessGroupStatus; +import java.util.Collection; import java.util.Date; import java.util.List; @@ -48,6 +50,25 @@ public interface StatusHistoryRepository { */ void capture(NodeStatus nodeStatus, ProcessGroupStatus rootGroupStatus, List garbageCollectionStatus, Date timestamp); + /** + * Captures the status information provided in the given report, additionally providing the status of each + * Connector-managed flow. Connector-managed Process Groups are siblings of the root group and are not reachable + * from {@code rootGroupStatus}, so this overload allows implementations to observe them (for example, to export + * Connector-managed flow metrics). The default implementation ignores {@code connectorStatuses} and delegates to + * {@link #capture(NodeStatus, ProcessGroupStatus, List, Date)} so existing implementations continue to work + * unchanged. + * + * @param nodeStatus status of the node + * @param rootGroupStatus status of the root group and its content + * @param connectorStatuses status of each registered Connector, each carrying its managed root group status + * @param garbageCollectionStatus status of garbage collection + * @param timestamp timestamp of capture + */ + default void capture(NodeStatus nodeStatus, ProcessGroupStatus rootGroupStatus, Collection connectorStatuses, + List garbageCollectionStatus, Date timestamp) { + capture(nodeStatus, rootGroupStatus, garbageCollectionStatus, timestamp); + } + /** * @param connectionId the ID of the Connection for which the Status is * desired diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/reporting/AbstractEventAccess.java b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/reporting/AbstractEventAccess.java index de962cefa77e..4c1a10686375 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/reporting/AbstractEventAccess.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/reporting/AbstractEventAccess.java @@ -537,11 +537,6 @@ ProcessGroupStatus getGroupStatus(final ProcessGroup group, final RepositoryStat status.setBytesTransferred(bytesTransferred); status.setProcessingNanos(processingNanos); status.setProcessingPerformanceStatus(performanceStatus); - // Snapshot the PG's MDC logging attributes onto the status DTO so reporting tasks (e.g. the - // OpenTelemetry reporting task) can attribute metrics emitted for this PG to its owning - // connector. For non-connector PGs this is just the existing processGroupId/Name/path keys; - // for PGs inside a connector-managed flow it also carries connectorId/connectorName/etc. - status.setLoggingAttributes(group.getLoggingAttributes()); final VersionControlInformation vci = group.getVersionControlInformation(); if (vci != null) { diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorRepository.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorRepository.java index 3a163a2da4d2..c25962e4e6c7 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorRepository.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorRepository.java @@ -217,8 +217,7 @@ void inheritConfiguration(ConnectorNode connector, ListIf no {@link ConnectorConfigurationProvider} is configured for this repository, this method * returns {@link Map#of()}. Otherwise, it delegates to diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java index 55fa7c77593f..6a2c827c8229 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java @@ -814,7 +814,8 @@ private FlowController( timerDrivenEngineRef.get().scheduleWithFixedDelay(() -> { try { - statusHistoryRepository.capture(getNodeStatusSnapshot(), eventAccess.getControllerStatus(), getGarbageCollectionStatus(), new Date()); + statusHistoryRepository.capture(getNodeStatusSnapshot(), eventAccess.getControllerStatus(), eventAccess.getConnectorStatuses(), + getGarbageCollectionStatus(), new Date()); } catch (final Exception e) { LOG.error("Failed to capture component stats for Stats History", e); } diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/reporting/StandardEventAccess.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/reporting/StandardEventAccess.java index ebc9b7304e06..3e12745328ec 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/reporting/StandardEventAccess.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/reporting/StandardEventAccess.java @@ -24,6 +24,7 @@ import org.apache.nifi.authorization.user.NiFiUser; import org.apache.nifi.components.connector.ConnectorNode; import org.apache.nifi.components.connector.ConnectorRepository; +import org.apache.nifi.components.connector.ConnectorSyncMode; import org.apache.nifi.components.connector.FrameworkFlowContext; import org.apache.nifi.controller.ProcessScheduler; import org.apache.nifi.controller.ProcessorNode; @@ -89,7 +90,7 @@ public Collection getConnectorStatuses() { return Collections.emptyList(); } - final List connectors = connectorRepository.getConnectors(); + final List connectors = connectorRepository.getConnectors(ConnectorSyncMode.LOCAL_ONLY); if (connectors == null || connectors.isEmpty()) { return Collections.emptyList(); } diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/reporting/AbstractEventAccessTest.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/reporting/AbstractEventAccessTest.java index 3f10a068e752..7661d1996c78 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/reporting/AbstractEventAccessTest.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/reporting/AbstractEventAccessTest.java @@ -109,27 +109,6 @@ void testGetGroupStatusAuthorized() { assertEquals(1, authorizables.size()); } - @Test - void testGetGroupStatusSnapshotsLoggingAttributes() { - final Map loggingAttributes = Map.of( - "processGroupId", PROCESS_GROUP_ID, - "processGroupName", PROCESS_GROUP_NAME, - "connectorId", "connector-1", - "connectorName", "Postgres CDC" - ); - - when(processGroup.getName()).thenReturn(PROCESS_GROUP_NAME); - when(processGroup.getIdentifier()).thenReturn(PROCESS_GROUP_ID); - when(processGroup.getLoggingAttributes()).thenReturn(loggingAttributes); - - final ProcessGroupStatus groupStatus = eventAccess.getGroupStatus( - processGroup, new StandardRepositoryStatusReport(), authorizable -> true, - SINGLE_DEPTH, SINGLE_DEPTH, INCLUDE_CONNECTION_DETAILS); - - assertNotNull(groupStatus); - assertEquals(loggingAttributes, groupStatus.getLoggingAttributes()); - } - @Test void testGetGroupStatusProcessorAuthorizedNotIncluded() { final List authorizables = new ArrayList<>();