diff --git a/nifi-docs/src/main/asciidoc/administration-guide.adoc b/nifi-docs/src/main/asciidoc/administration-guide.adoc index d5e4234bc39f..f7ed3cf57029 100644 --- a/nifi-docs/src/main/asciidoc/administration-guide.adoc +++ b/nifi-docs/src/main/asciidoc/administration-guide.adoc @@ -211,11 +211,25 @@ Component logs provide the following MDC named values: - `processGroupName` contains the name of the Process Group - `processGroupNamePath` contains of the hierarchy of names for Process Groups with separators +Components that run inside a Connector-managed flow also carry framework-supplied MDC values that identify the owning +Connector: + +- `connectorId` contains the UUID of the Connector +- `connectorName` contains the user-visible name of the Connector +- `connectorComponent` contains the fully qualified class name of the Connector implementation +- `connectorBundleGroup`, `connectorBundleArtifact`, `connectorBundleVersion` identify the NAR bundle the Connector was + loaded from + +Additional implementation-specific MDC values may be supplied by the framework +`ConnectorConfigurationProvider` extension via `getLoggingAttributes(String connectorId)`. The framework consults the +provider at connector creation time and merges the result with the framework-managed keys above. Keys reserved by the +framework (those listed above) cannot be overridden; attempts to do so are dropped and logged as a `WARN`. + MDC named values can be added to a Logback pattern layout using the `mdc` conversion word. [source, xml] ---- -%date %level [%thread] %mdc{processGroupId} %logger{40} %msg%n +%date %level [%thread] %mdc{connectorId} %mdc{processGroupId} %logger{40} %msg%n ---- Logs from classes other than extension components do not have MDC named values. Logs formatted using the pattern layout diff --git a/nifi-framework-api/src/main/java/org/apache/nifi/components/connector/ConnectorConfigurationProvider.java b/nifi-framework-api/src/main/java/org/apache/nifi/components/connector/ConnectorConfigurationProvider.java index a0d5314a2f96..efa32f9f420d 100644 --- a/nifi-framework-api/src/main/java/org/apache/nifi/components/connector/ConnectorConfigurationProvider.java +++ b/nifi-framework-api/src/main/java/org/apache/nifi/components/connector/ConnectorConfigurationProvider.java @@ -20,6 +20,7 @@ import org.apache.nifi.flow.ScheduledState; import java.io.InputStream; +import java.util.Map; import java.util.Optional; /** @@ -174,6 +175,34 @@ default boolean shouldApplyUpdate(final String connectorId) { return true; } + /** + * Returns identity-shaped logging attributes the framework should include in the SLF4J {@code MDC} for + * every log line emitted by the Connector with the given identifier and by any component (Processor, + * Controller Service, etc.) running inside its managed flow. The same attributes are also snapshotted + * onto {@code ProcessGroupStatus} so that status-based reporting tasks (e.g. OpenTelemetry) can attach + * them as metric attributes. + * + *

The framework reserves a set of well-known keys that describe the Connector itself (identity, + * bundle coordinate, etc.). Any entry in the returned map whose key collides with a reserved + * framework-managed key is dropped by the framework and a {@code WARN} is logged for that entry; + * the remaining entries are accepted.

+ * + *

The framework consults this method once, at connector create time. It is the provider's + * responsibility to ensure the returned values are stable for the lifetime of the connector + * instance or to publish updates through other means; mid-lifecycle refresh is not currently + * defined.

+ * + *

The default returns {@link Map#of()} so that providers that do not need to publish additional + * attributes (and runtimes that have no provider configured) get only the framework-managed identity + * keys in MDC.

+ * + * @param connectorId the identifier of the connector whose logging attributes are being requested + * @return an immutable map of attribute keys to values; never {@code null} + */ + default Map getLoggingAttributes(final String connectorId) { + return Map.of(); + } + /** * Ensures that local asset binaries are up to date with the external store. For each asset * tracked in the provider's local state, this method compares the external store's current diff --git a/nifi-framework-api/src/main/java/org/apache/nifi/controller/status/history/StatusHistoryRepository.java b/nifi-framework-api/src/main/java/org/apache/nifi/controller/status/history/StatusHistoryRepository.java index 2b96feb6192e..b02258552811 100644 --- a/nifi-framework-api/src/main/java/org/apache/nifi/controller/status/history/StatusHistoryRepository.java +++ b/nifi-framework-api/src/main/java/org/apache/nifi/controller/status/history/StatusHistoryRepository.java @@ -16,9 +16,11 @@ */ package org.apache.nifi.controller.status.history; +import org.apache.nifi.controller.status.ConnectorStatus; import org.apache.nifi.controller.status.NodeStatus; import org.apache.nifi.controller.status.ProcessGroupStatus; +import java.util.Collection; import java.util.Date; import java.util.List; @@ -48,6 +50,25 @@ public interface StatusHistoryRepository { */ void capture(NodeStatus nodeStatus, ProcessGroupStatus rootGroupStatus, List garbageCollectionStatus, Date timestamp); + /** + * Captures the status information provided in the given report, additionally providing the status of each + * Connector-managed flow. Connector-managed Process Groups are siblings of the root group and are not reachable + * from {@code rootGroupStatus}, so this overload allows implementations to observe them (for example, to export + * Connector-managed flow metrics). The default implementation ignores {@code connectorStatuses} and delegates to + * {@link #capture(NodeStatus, ProcessGroupStatus, List, Date)} so existing implementations continue to work + * unchanged. + * + * @param nodeStatus status of the node + * @param rootGroupStatus status of the root group and its content + * @param connectorStatuses status of each registered Connector, each carrying its managed root group status + * @param garbageCollectionStatus status of garbage collection + * @param timestamp timestamp of capture + */ + default void capture(NodeStatus nodeStatus, ProcessGroupStatus rootGroupStatus, Collection connectorStatuses, + List garbageCollectionStatus, Date timestamp) { + capture(nodeStatus, rootGroupStatus, garbageCollectionStatus, timestamp); + } + /** * @param connectionId the ID of the Connection for which the Status is * desired diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java index 3abd69197bf5..568cfa884772 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java @@ -229,6 +229,7 @@ public final class StandardProcessGroup implements ProcessGroup { private static final String UNREGISTERED_PATH_SEGMENT = "UNREGISTERED"; private final Map loggingAttributes = new ConcurrentHashMap<>(); + private volatile Map connectorLoggingAttributes = Map.of(); private volatile String logFileSuffix; public StandardProcessGroup(final String id, final ControllerServiceProvider serviceProvider, final ProcessScheduler scheduler, @@ -303,6 +304,14 @@ public ProcessGroup getParent() { @Override public void setParent(final ProcessGroup newParent) { parent.set(newParent); + // Inherit connector-supplied MDC attributes from the new parent. Descendants of a connector's + // managed flow root must carry the same connectorId/connectorName/etc. so that logs and OTel + // metrics emitted by processors inside the connector flow can be attributed to the connector. + // This runs each time the PG is re-parented (including initial attach), so new PGs added to + // an existing connector flow inherit automatically. + if (newParent instanceof StandardProcessGroup standardParent) { + this.connectorLoggingAttributes = standardParent.connectorLoggingAttributes; + } setLoggingAttributes(); } @@ -4705,6 +4714,47 @@ private void setLoggingAttributes() { final String registeredFlowVersion = currentVersionControl.getVersion(); loggingAttributes.put(LoggingAttribute.REGISTERED_FLOW_VERSION.attribute, registeredFlowVersion); } + + // Merge connector-supplied attributes last so they are exposed to MDC/OTel snapshots taken + // from this PG. PG-level keys are added first to avoid being shadowed in case a connector + // ever tried to override them; the reserved-key filter on the connector side also enforces + // this separation defensively. + loggingAttributes.putAll(connectorLoggingAttributes); + } + + /** + * Stores the connector-managed MDC attributes for this process group and cascades the same + * attributes to all descendant process groups so that components anywhere in the connector's + * managed flow log with consistent connectorId/connectorName/etc. context. + * + *

This method is called by {@code StandardConnectorNode} against its managed root process + * group whenever the connector's framework keys (e.g. {@code connectorName}) change or when the + * connector provides updated custom logging attributes. Newly created descendant groups will + * also inherit the attributes lazily via {@link #setParent(ProcessGroup)}.

+ * + * @param attributes the merged set of connector logging attributes; an empty or {@code null} + * map clears any previously assigned attributes + */ + public void setConnectorLoggingAttributes(final Map attributes) { + final Map snapshot = (attributes == null || attributes.isEmpty()) + ? Map.of() + : Map.copyOf(attributes); + this.connectorLoggingAttributes = snapshot; + setLoggingAttributes(); + + for (final ProcessGroup child : getProcessGroups()) { + if (child instanceof StandardProcessGroup standardChild) { + standardChild.setConnectorLoggingAttributes(snapshot); + } + } + } + + /** + * Returns the connector-supplied MDC attributes assigned to this process group, if any. Returns + * an empty map when this PG is not inside a connector-managed flow. + */ + public Map getConnectorLoggingAttributes() { + return connectorLoggingAttributes; } private void setGroupPath() { diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/test/java/org/apache/nifi/groups/StandardProcessGroupTest.java b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/test/java/org/apache/nifi/groups/StandardProcessGroupTest.java index 35051a30f9fc..3acbfc58dfe6 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/test/java/org/apache/nifi/groups/StandardProcessGroupTest.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/test/java/org/apache/nifi/groups/StandardProcessGroupTest.java @@ -44,6 +44,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.when; @@ -238,4 +239,110 @@ void testGetLoggingAttributesWithVersionControlInformation() { assertEquals(expected, loggingAttributes); } + + @Test + void testSetConnectorLoggingAttributesMergesIntoLoggingAttributes() { + processGroup.setName(NAME); + + final Map connectorAttributes = Map.of( + "connectorId", "connector-1", + "connectorName", "My Connector", + "connectorComponent", "com.example.MyConnector" + ); + + processGroup.setConnectorLoggingAttributes(connectorAttributes); + + final Map loggingAttributes = processGroup.getLoggingAttributes(); + assertEquals("connector-1", loggingAttributes.get("connectorId")); + assertEquals("My Connector", loggingAttributes.get("connectorName")); + assertEquals("com.example.MyConnector", loggingAttributes.get("connectorComponent")); + assertEquals(NAME, loggingAttributes.get(StandardProcessGroup.LoggingAttribute.PROCESS_GROUP_NAME.getAttribute())); + assertEquals(ID, loggingAttributes.get(StandardProcessGroup.LoggingAttribute.PROCESS_GROUP_ID.getAttribute())); + } + + @Test + void testSetConnectorLoggingAttributesCascadesToChildProcessGroups() { + processGroup.setName(NAME); + + final StandardProcessGroup child = createStandardProcessGroup("child-id"); + child.setName("Child"); + processGroup.addProcessGroup(child); + + final Map connectorAttributes = Map.of( + "connectorId", "connector-1", + "connectorName", "Postgres CDC" + ); + + processGroup.setConnectorLoggingAttributes(connectorAttributes); + + assertEquals("connector-1", child.getLoggingAttributes().get("connectorId")); + assertEquals("Postgres CDC", child.getLoggingAttributes().get("connectorName")); + } + + @Test + void testAddProcessGroupInheritsConnectorLoggingAttributesFromParent() { + processGroup.setName(NAME); + processGroup.setConnectorLoggingAttributes(Map.of( + "connectorId", "connector-1", + "connectorName", "Postgres CDC" + )); + + final StandardProcessGroup lateChild = createStandardProcessGroup("late-child-id"); + lateChild.setName("Late Child"); + processGroup.addProcessGroup(lateChild); + + assertEquals("connector-1", lateChild.getLoggingAttributes().get("connectorId")); + assertEquals("Postgres CDC", lateChild.getLoggingAttributes().get("connectorName")); + assertEquals(Map.copyOf(processGroup.getConnectorLoggingAttributes()), lateChild.getConnectorLoggingAttributes()); + } + + @Test + void testEmptyConnectorLoggingAttributesAddsNothing() { + processGroup.setName(NAME); + processGroup.setConnectorLoggingAttributes(Map.of()); + + final Map loggingAttributes = processGroup.getLoggingAttributes(); + assertFalse(loggingAttributes.containsKey("connectorId")); + assertTrue(processGroup.getConnectorLoggingAttributes().isEmpty()); + // Verify the PG-level keys are still present. + assertEquals(NAME, loggingAttributes.get(StandardProcessGroup.LoggingAttribute.PROCESS_GROUP_NAME.getAttribute())); + } + + @Test + void testSetConnectorLoggingAttributesReplacesPreviousValues() { + processGroup.setName(NAME); + processGroup.setConnectorLoggingAttributes(Map.of( + "connectorId", "connector-1", + "connectorName", "Old Name", + "customKey", "customValue" + )); + + processGroup.setConnectorLoggingAttributes(Map.of( + "connectorId", "connector-1", + "connectorName", "New Name" + )); + + final Map loggingAttributes = processGroup.getLoggingAttributes(); + assertEquals("New Name", loggingAttributes.get("connectorName")); + assertFalse(loggingAttributes.containsKey("customKey")); + } + + private StandardProcessGroup createStandardProcessGroup(final String id) { + return new StandardProcessGroup( + id, + controllerServiceProvider, + processScheduler, + propertyEncryptor, + extensionManager, + stateManagerProvider, + flowManager, + reloadComponent, + nodeTypeProvider, + properties, + statelessGroupNodeFactory, + assetManager, + null + ); + } + } diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorRepository.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorRepository.java index d8d06bc79efa..c25962e4e6c7 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorRepository.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorRepository.java @@ -26,6 +26,7 @@ import java.io.IOException; import java.io.InputStream; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.concurrent.Future; @@ -211,4 +212,21 @@ void inheritConfiguration(ConnectorNode connector, ListIf no {@link ConnectorConfigurationProvider} is configured for this repository, this method + * returns {@link Map#of()}. Otherwise, it delegates to + * {@link ConnectorConfigurationProvider#getLoggingAttributes(String)}. Provider implementations + * that throw should be handled by the repository to avoid breaking connector creation; the + * framework will fall back to an empty map and log a warning.

+ * + * @param connectorId the identifier of the connector whose logging attributes are being requested + * @return an immutable map of attribute keys to values; never {@code null} + */ + Map getLoggingAttributesForConnector(String connectorId); } diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorNode.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorNode.java index b9659ebb4860..030f973eedac 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorNode.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorNode.java @@ -50,7 +50,9 @@ import org.apache.nifi.flow.VersionedProcessGroup; import org.apache.nifi.flow.VersionedProcessor; import org.apache.nifi.groups.ProcessGroup; +import org.apache.nifi.groups.StandardProcessGroup; import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.logging.GroupedComponent; import org.apache.nifi.nar.ExtensionManager; import org.apache.nifi.nar.NarCloseable; import org.apache.nifi.util.StringUtils; @@ -81,9 +83,16 @@ import java.util.function.Function; import java.util.stream.Collectors; -public class StandardConnectorNode implements ConnectorNode { +public class StandardConnectorNode implements ConnectorNode, GroupedComponent { private static final Logger logger = LoggerFactory.getLogger(StandardConnectorNode.class); + /** + * Soft cardinality limit beyond which a WARN is emitted when a connector supplies custom logging + * attributes. Exceeding this threshold signals a potential metric/MDC cardinality risk for + * downstream observability backends but does not reject the attributes. + */ + private static final int CUSTOM_LOGGING_ATTRIBUTE_CARDINALITY_WARN_THRESHOLD = 10; + private final String identifier; private final FlowManager flowManager; private final ExtensionManager extensionManager; @@ -109,6 +118,10 @@ public class StandardConnectorNode implements ConnectorNode { private volatile String name; private volatile FrameworkConnectorInitializationContext initializationContext; + private final Object loggingAttributesLock = new Object(); + private volatile Map customLoggingAttributes = Map.of(); + private volatile Map mergedLoggingAttributes = Map.of(); + public StandardConnectorNode(final String identifier, final FlowManager flowManager, final ExtensionManager extensionManager, final Authorizable parentAuthorizable, final ConnectorDetails connectorDetails, final String componentType, final String componentCanonicalClass, @@ -133,6 +146,8 @@ public StandardConnectorNode(final String identifier, final FlowManager flowMana final Bundle activeFlowBundle = new Bundle(bundleCoordinate.getGroup(), bundleCoordinate.getId(), bundleCoordinate.getVersion()); this.activeFlowContext = flowContextFactory.createActiveFlowContext(identifier, connectorDetails.getComponentLog(), activeFlowBundle); + + rebuildLoggingAttributes(); } @Override @@ -143,6 +158,138 @@ public String getName() { @Override public void setName(final String name) { this.name = name; + rebuildLoggingAttributes(); + } + + /** + * Returns the {@link ProcessGroup} that holds the connector-managed flow, satisfying the + * {@link GroupedComponent} contract so that {@code StandardLoggingContext} can source MDC + * attributes for components running inside the connector's managed flow. + */ + @Override + public ProcessGroup getProcessGroup() { + final FrameworkFlowContext context = activeFlowContext; + return context == null ? null : context.getManagedProcessGroup(); + } + + /** + * Replaces the connector-supplied custom logging attributes. Reserved keys (those used by the + * framework, see {@link ConnectorLoggingAttribute}) are filtered out and a WARN is logged for + * each dropped entry. A WARN is also logged when the number of accepted custom keys exceeds + * {@value #CUSTOM_LOGGING_ATTRIBUTE_CARDINALITY_WARN_THRESHOLD} to surface cardinality risk for + * MDC logging and OpenTelemetry metric labels. + * + * @param attributes the proposed custom attributes; {@code null} or empty clears the current set + */ + public void setCustomLoggingAttributes(final Map attributes) { + final Map filtered = filterReservedKeys(attributes); + + if (filtered.size() > CUSTOM_LOGGING_ATTRIBUTE_CARDINALITY_WARN_THRESHOLD) { + logger.warn("{} supplied {} custom logging attributes which exceeds the soft threshold of {}; high-cardinality attributes can degrade MDC logging and OpenTelemetry metric backends", + this, filtered.size(), CUSTOM_LOGGING_ATTRIBUTE_CARDINALITY_WARN_THRESHOLD); + } + + synchronized (loggingAttributesLock) { + this.customLoggingAttributes = filtered; + } + rebuildLoggingAttributes(); + } + + /** + * Returns an immutable snapshot of the merged framework + custom logging attributes currently + * advertised by this connector. The framework keys are populated by the framework from the + * connector's identifier, name, component type, and bundle coordinate. + */ + public Map getLoggingAttributes() { + return mergedLoggingAttributes; + } + + private Map filterReservedKeys(final Map attributes) { + if (attributes == null || attributes.isEmpty()) { + return Map.of(); + } + final Map filtered = new HashMap<>(attributes.size()); + for (final Map.Entry entry : attributes.entrySet()) { + final String key = entry.getKey(); + if (key == null || key.isEmpty()) { + continue; + } + if (ConnectorLoggingAttribute.isReserved(key)) { + logger.warn("{} attempted to set reserved logging attribute [{}]; dropping the entry. Reserved keys are managed by the framework.", this, key); + continue; + } + filtered.put(key, entry.getValue()); + } + return Collections.unmodifiableMap(filtered); + } + + private void rebuildLoggingAttributes() { + final Map merged; + final Map custom; + synchronized (loggingAttributesLock) { + custom = customLoggingAttributes; + merged = new HashMap<>(custom.size() + ConnectorLoggingAttribute.values().length); + merged.putAll(custom); + // Framework keys are applied last so they always win against any not-yet-filtered overlap. + merged.put(ConnectorLoggingAttribute.CONNECTOR_ID.attribute, identifier); + merged.put(ConnectorLoggingAttribute.CONNECTOR_NAME.attribute, name == null ? "" : name); + merged.put(ConnectorLoggingAttribute.CONNECTOR_COMPONENT.attribute, componentCanonicalClass == null ? "" : componentCanonicalClass); + if (bundleCoordinate != null) { + merged.put(ConnectorLoggingAttribute.CONNECTOR_BUNDLE_GROUP.attribute, bundleCoordinate.getGroup()); + merged.put(ConnectorLoggingAttribute.CONNECTOR_BUNDLE_ARTIFACT.attribute, bundleCoordinate.getId()); + merged.put(ConnectorLoggingAttribute.CONNECTOR_BUNDLE_VERSION.attribute, bundleCoordinate.getVersion()); + } + this.mergedLoggingAttributes = Collections.unmodifiableMap(merged); + } + + pushLoggingAttributesToManagedFlow(merged); + } + + private void pushLoggingAttributesToManagedFlow(final Map attributes) { + final ProcessGroup managedProcessGroup = getProcessGroup(); + if (managedProcessGroup instanceof StandardProcessGroup standardProcessGroup) { + standardProcessGroup.setConnectorLoggingAttributes(attributes); + } + } + + /** + * Framework-managed MDC logging attribute keys for a connector. The framework computes these + * automatically and they cannot be overridden by a connector's custom attributes. + */ + public enum ConnectorLoggingAttribute { + CONNECTOR_ID("connectorId"), + CONNECTOR_NAME("connectorName"), + CONNECTOR_COMPONENT("connectorComponent"), + CONNECTOR_BUNDLE_GROUP("connectorBundleGroup"), + CONNECTOR_BUNDLE_ARTIFACT("connectorBundleArtifact"), + CONNECTOR_BUNDLE_VERSION("connectorBundleVersion"); + + private static final Set RESERVED_KEYS; + static { + final Set reserved = new HashSet<>(); + for (final ConnectorLoggingAttribute attribute : values()) { + reserved.add(attribute.attribute); + } + RESERVED_KEYS = Collections.unmodifiableSet(reserved); + } + + private final String attribute; + + ConnectorLoggingAttribute(final String attribute) { + this.attribute = attribute; + } + + public String getAttribute() { + return attribute; + } + + public static boolean isReserved(final String key) { + return RESERVED_KEYS.contains(key); + } + + public static Set getReservedKeys() { + return RESERVED_KEYS; + } } @Override diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorRepository.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorRepository.java index bdb527fecc81..4910ed5c528e 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorRepository.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorRepository.java @@ -722,6 +722,21 @@ public SecretsManager getSecretsManager() { return secretsManager; } + @Override + public Map getLoggingAttributesForConnector(final String connectorId) { + if (configurationProvider == null) { + return Map.of(); + } + try { + final Map attributes = configurationProvider.getLoggingAttributes(connectorId); + return attributes == null ? Map.of() : attributes; + } catch (final Exception e) { + logger.warn("ConnectorConfigurationProvider [{}] threw while computing logging attributes for connector [{}]; using empty map: {}", + configurationProvider.getClass().getSimpleName(), connectorId, e.getMessage(), e); + return Map.of(); + } + } + @Override public ConnectorStateTransition createStateTransition(final String type, final String id) { final String componentDescription = "StandardConnectorNode[id=" + id + ", type=" + type + "]"; diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/ExtensionBuilder.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/ExtensionBuilder.java index 45ef9980d4b2..f73f5249adf1 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/ExtensionBuilder.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/ExtensionBuilder.java @@ -521,10 +521,11 @@ public Resource getResource() { } final String componentType = connector.getClass().getSimpleName(); - final ComponentLog componentLog = new SimpleProcessLogger(identifier, connector, new StandardLoggingContext()); + final StandardLoggingContext loggingContext = new StandardLoggingContext(); + final ComponentLog componentLog = new SimpleProcessLogger(identifier, connector, loggingContext); final ConnectorDetails connectorDetails = new ConnectorDetails(connector, bundleCoordinate, componentLog); - final ConnectorNode connectorNode = new StandardConnectorNode( + final StandardConnectorNode connectorNode = new StandardConnectorNode( identifier, flowController.getFlowManager(), extensionManager, @@ -538,11 +539,20 @@ public Resource getResource() { connectorValidationTrigger, false ); + // Late-bind the logging context to the connector node so that MDC attributes assembled by + // the node (framework keys + provider-supplied custom keys) flow through the connector's + // own ComponentLog as well as the managed flow's nested components. + loggingContext.setComponent(connectorNode); + + // Pull provider-sourced logging attributes (e.g. connectorDefinitionId) from the ConnectorRepository + // and merge them into the node before any logs are emitted from the connector or its managed flow. + final Map providerAttributes = flowController.getConnectorRepository().getLoggingAttributesForConnector(identifier); + connectorNode.setCustomLoggingAttributes(providerAttributes); try { initializeDefaultValues(connector, connectorNode.getActiveFlowContext()); - final FrameworkConnectorInitializationContext initContext = createConnectorInitializationContext(managedProcessGroup, componentLog); + final FrameworkConnectorInitializationContext initContext = createConnectorInitializationContext(managedProcessGroup, componentLog, connectorNode); connectorNode.initializeConnector(initContext); } catch (final Exception e) { logger.error("Could not initialize Connector of type {} from {} for ID {}; creating \"Ghost\" implementation", type, bundleCoordinate, identifier, e); @@ -565,13 +575,14 @@ private ConnectorNode createGhostConnectorNode(final Authorizable connectorsAuth final GhostConnector ghostConnector = new GhostConnector(identifier, type, cause); final String simpleClassName = type.contains(".") ? StringUtils.substringAfterLast(type, ".") : type; final String componentType = "(Missing) " + simpleClassName; - final ComponentLog componentLog = new SimpleProcessLogger(identifier, ghostConnector, new StandardLoggingContext()); + final StandardLoggingContext loggingContext = new StandardLoggingContext(); + final ComponentLog componentLog = new SimpleProcessLogger(identifier, ghostConnector, loggingContext); final ConnectorDetails connectorDetails = new ConnectorDetails(ghostConnector, bundleCoordinate, componentLog); // If an instance class loader has been created for this connector, remove it because it's no longer necessary. extensionManager.removeInstanceClassLoader(identifier); - final ConnectorNode connectorNode = new StandardConnectorNode( + final StandardConnectorNode connectorNode = new StandardConnectorNode( identifier, flowController.getFlowManager(), extensionManager, @@ -585,9 +596,15 @@ private ConnectorNode createGhostConnectorNode(final Authorizable connectorsAuth connectorValidationTrigger, true ); + loggingContext.setComponent(connectorNode); + + // Pull provider-sourced logging attributes for the ghost connector as well so that MDC and + // status snapshots are consistent regardless of whether the real Connector implementation loaded. + final Map providerAttributes = flowController.getConnectorRepository().getLoggingAttributesForConnector(identifier); + connectorNode.setCustomLoggingAttributes(providerAttributes); // Initialize the ghost connector so that it can be properly configured during flow synchronization - final FrameworkConnectorInitializationContext initContext = createConnectorInitializationContext(managedProcessGroup, componentLog); + final FrameworkConnectorInitializationContext initContext = createConnectorInitializationContext(managedProcessGroup, componentLog, connectorNode); connectorNode.initializeConnector(initContext); return connectorNode; @@ -613,7 +630,8 @@ private void initializeDefaultValues(final Connector connector, final FrameworkF } } - private FrameworkConnectorInitializationContext createConnectorInitializationContext(final ProcessGroup managedProcessGroup, final ComponentLog componentLog) { + private FrameworkConnectorInitializationContext createConnectorInitializationContext(final ProcessGroup managedProcessGroup, final ComponentLog componentLog, + final StandardConnectorNode connectorNode) { final String name = type.contains(".") ? StringUtils.substringAfterLast(type, ".") : type; return connectorInitializationContextBuilder diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java index 259aed264641..6a2c827c8229 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java @@ -810,11 +810,12 @@ private FlowController( } eventAccess = new StandardEventAccess(flowManager, flowFileEventRepository, processScheduler, authorizer, provenanceRepository, - auditService, analyticsEngine, flowFileRepository, contentRepository); + auditService, analyticsEngine, flowFileRepository, contentRepository, connectorRepository); timerDrivenEngineRef.get().scheduleWithFixedDelay(() -> { try { - statusHistoryRepository.capture(getNodeStatusSnapshot(), eventAccess.getControllerStatus(), getGarbageCollectionStatus(), new Date()); + statusHistoryRepository.capture(getNodeStatusSnapshot(), eventAccess.getControllerStatus(), eventAccess.getConnectorStatuses(), + getGarbageCollectionStatus(), new Date()); } catch (final Exception e) { LOG.error("Failed to capture component stats for Stats History", e); } diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/reporting/StandardEventAccess.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/reporting/StandardEventAccess.java index ec233778b65a..3e12745328ec 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/reporting/StandardEventAccess.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/reporting/StandardEventAccess.java @@ -22,6 +22,10 @@ import org.apache.nifi.authorization.RequestAction; import org.apache.nifi.authorization.resource.Authorizable; import org.apache.nifi.authorization.user.NiFiUser; +import org.apache.nifi.components.connector.ConnectorNode; +import org.apache.nifi.components.connector.ConnectorRepository; +import org.apache.nifi.components.connector.ConnectorSyncMode; +import org.apache.nifi.components.connector.FrameworkFlowContext; import org.apache.nifi.controller.ProcessScheduler; import org.apache.nifi.controller.ProcessorNode; import org.apache.nifi.controller.flow.FlowManager; @@ -31,6 +35,7 @@ import org.apache.nifi.controller.repository.FlowFileRepository; import org.apache.nifi.controller.repository.RepositoryStatusReport; import org.apache.nifi.controller.repository.metrics.EmptyFlowFileEvent; +import org.apache.nifi.controller.status.ConnectorStatus; import org.apache.nifi.controller.status.ProcessGroupStatus; import org.apache.nifi.controller.status.ProcessorStatus; import org.apache.nifi.controller.status.analytics.StatusAnalyticsEngine; @@ -41,6 +46,8 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -55,10 +62,11 @@ public class StandardEventAccess extends AbstractEventAccess implements UserAwar private final AuditService auditService; private final FlowFileRepository flowFileRepository; private final ContentRepository contentRepository; + private final ConnectorRepository connectorRepository; public StandardEventAccess(final FlowManager flowManager, final FlowFileEventRepository flowFileEventRepository, final ProcessScheduler processScheduler, final Authorizer authorizer, final ProvenanceRepository provenanceRepository, final AuditService auditService, final StatusAnalyticsEngine statusAnalyticsEngine, - final FlowFileRepository flowFileRepository, final ContentRepository contentRepository) { + final FlowFileRepository flowFileRepository, final ContentRepository contentRepository, final ConnectorRepository connectorRepository) { super(processScheduler, statusAnalyticsEngine, flowManager, flowFileEventRepository); this.flowFileEventRepository = flowFileEventRepository; this.flowManager = flowManager; @@ -67,6 +75,49 @@ public StandardEventAccess(final FlowManager flowManager, final FlowFileEventRep this.auditService = auditService; this.flowFileRepository = flowFileRepository; this.contentRepository = contentRepository; + this.connectorRepository = connectorRepository; + } + + /** + * Returns a {@link ConnectorStatus} for each registered connector, each carrying the connector identity along with + * the {@link ProcessGroupStatus} of the connector's managed root process group. Connector-managed flows live + * outside the controller's root process group, so they are invisible to {@link #getControllerStatus()} and would + * otherwise be skipped by standard reporting tasks. + */ + @Override + public Collection getConnectorStatuses() { + if (connectorRepository == null) { + return Collections.emptyList(); + } + + final List connectors = connectorRepository.getConnectors(ConnectorSyncMode.LOCAL_ONLY); + if (connectors == null || connectors.isEmpty()) { + return Collections.emptyList(); + } + + final RepositoryStatusReport statusReport = generateRepositoryStatusReport(); + final List statuses = new ArrayList<>(connectors.size()); + for (final ConnectorNode connector : connectors) { + final FrameworkFlowContext context = connector.getActiveFlowContext(); + if (context == null) { + continue; + } + final ProcessGroup managedGroup = context.getManagedProcessGroup(); + if (managedGroup == null) { + continue; + } + final ProcessGroupStatus rootGroupStatus = getGroupStatus(managedGroup, statusReport, + authorizable -> true, Integer.MAX_VALUE, 1, true); + if (rootGroupStatus == null) { + continue; + } + final ConnectorStatus connectorStatus = new ConnectorStatus(); + connectorStatus.setId(connector.getIdentifier()); + connectorStatus.setName(connector.getName()); + connectorStatus.setRootGroupStatus(rootGroupStatus); + statuses.add(connectorStatus); + } + return statuses; } @Override diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/TestStandardConnectorNode.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/TestStandardConnectorNode.java index 00e136a66358..1c456e702bda 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/TestStandardConnectorNode.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/TestStandardConnectorNode.java @@ -783,6 +783,88 @@ public void testCancelDrainFlowFilesInterruptsConnector() throws Exception { assertEquals(ConnectorState.STOPPED, connectorNode.getCurrentState()); } + @Test + public void testLoggingAttributesIncludeFrameworkKeys() throws Exception { + final SleepingConnector connector = new SleepingConnector(Duration.ofMillis(1)); + final StandardConnectorNode connectorNode = createConnectorNode(connector); + + final Map attributes = connectorNode.getLoggingAttributes(); + assertEquals("test-connector-id", attributes.get("connectorId")); + assertEquals(connector.getClass().getCanonicalName(), attributes.get("connectorComponent")); + assertEquals("org.apache.nifi", attributes.get("connectorBundleGroup")); + assertEquals("test-standard-connector-node", attributes.get("connectorBundleArtifact")); + assertEquals("1.0.0", attributes.get("connectorBundleVersion")); + assertEquals(connector.getClass().getSimpleName(), attributes.get("connectorName")); + } + + @Test + public void testSetNameRefreshesConnectorNameLoggingAttribute() throws Exception { + final StandardConnectorNode connectorNode = createConnectorNode(); + connectorNode.setName("Renamed Connector"); + + assertEquals("Renamed Connector", connectorNode.getLoggingAttributes().get("connectorName")); + } + + @Test + public void testSetCustomLoggingAttributesMergesWithFrameworkKeys() throws Exception { + final StandardConnectorNode connectorNode = createConnectorNode(); + connectorNode.setCustomLoggingAttributes(Map.of( + "customA", "valueA", + "customB", "valueB" + )); + + final Map attributes = connectorNode.getLoggingAttributes(); + assertEquals("valueA", attributes.get("customA")); + assertEquals("valueB", attributes.get("customB")); + assertEquals("test-connector-id", attributes.get("connectorId")); + } + + @Test + public void testSetCustomLoggingAttributesFiltersReservedKeys() throws Exception { + final StandardConnectorNode connectorNode = createConnectorNode(); + connectorNode.setCustomLoggingAttributes(Map.of( + "connectorId", "attempted-override", + "connectorBundleVersion", "9.9.9", + "customA", "valueA" + )); + + final Map attributes = connectorNode.getLoggingAttributes(); + // Reserved keys are owned by the framework: connectorId stays as the framework value. + assertEquals("test-connector-id", attributes.get("connectorId")); + assertEquals("1.0.0", attributes.get("connectorBundleVersion")); + // Non-reserved custom keys are preserved. + assertEquals("valueA", attributes.get("customA")); + } + + @Test + public void testSetCustomLoggingAttributesNullClears() throws Exception { + final StandardConnectorNode connectorNode = createConnectorNode(); + connectorNode.setCustomLoggingAttributes(Map.of("customA", "valueA")); + assertTrue(connectorNode.getLoggingAttributes().containsKey("customA")); + + connectorNode.setCustomLoggingAttributes(null); + assertFalse(connectorNode.getLoggingAttributes().containsKey("customA")); + // Framework keys remain. + assertEquals("test-connector-id", connectorNode.getLoggingAttributes().get("connectorId")); + } + + @Test + public void testReservedKeysExposeFrameworkAttributeNames() { + final Set reserved = StandardConnectorNode.ConnectorLoggingAttribute.getReservedKeys(); + assertTrue(reserved.contains("connectorId")); + assertTrue(reserved.contains("connectorName")); + assertTrue(reserved.contains("connectorComponent")); + assertTrue(reserved.contains("connectorBundleGroup")); + assertTrue(reserved.contains("connectorBundleArtifact")); + assertTrue(reserved.contains("connectorBundleVersion")); + } + + @Test + public void testGetProcessGroupReturnsManagedFlowRoot() throws Exception { + final StandardConnectorNode connectorNode = createConnectorNode(); + assertEquals(managedProcessGroup, connectorNode.getProcessGroup()); + } + private StandardConnectorNode createConnectorNode() throws FlowUpdateException { final SleepingConnector sleepingConnector = new SleepingConnector(Duration.ofMillis(1)); return createConnectorNode(sleepingConnector); diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/TestStandardConnectorRepository.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/TestStandardConnectorRepository.java index cba4b12f5741..a8e661a78dfb 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/TestStandardConnectorRepository.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/TestStandardConnectorRepository.java @@ -1364,6 +1364,58 @@ private static ParameterProviderNode mockParameterProvider(final String name, fi return node; } + // --- getLoggingAttributesForConnector tests --- + + @Test + public void testGetLoggingAttributesForConnectorReturnsEmptyMapWhenNoProvider() { + final StandardConnectorRepository repository = new StandardConnectorRepository(); + final ConnectorRepositoryInitializationContext initContext = mock(ConnectorRepositoryInitializationContext.class); + when(initContext.getExtensionManager()).thenReturn(mock(ExtensionManager.class)); + when(initContext.getConnectorConfigurationProvider()).thenReturn(null); + repository.initialize(initContext); + + final Map attributes = repository.getLoggingAttributesForConnector("connector-1"); + assertNotNull(attributes); + assertTrue(attributes.isEmpty()); + } + + @Test + public void testGetLoggingAttributesForConnectorDelegatesToProvider() { + final ConnectorConfigurationProvider provider = mock(ConnectorConfigurationProvider.class); + final Map expected = Map.of("connectorDefinitionId", "snowflake.runtime.postgres-cdc"); + when(provider.getLoggingAttributes("connector-1")).thenReturn(expected); + + final StandardConnectorRepository repository = createRepositoryWithProvider(provider); + + final Map attributes = repository.getLoggingAttributesForConnector("connector-1"); + assertEquals(expected, attributes); + verify(provider).getLoggingAttributes("connector-1"); + } + + @Test + public void testGetLoggingAttributesForConnectorReturnsEmptyMapWhenProviderReturnsNull() { + final ConnectorConfigurationProvider provider = mock(ConnectorConfigurationProvider.class); + when(provider.getLoggingAttributes(anyString())).thenReturn(null); + + final StandardConnectorRepository repository = createRepositoryWithProvider(provider); + + final Map attributes = repository.getLoggingAttributesForConnector("connector-1"); + assertNotNull(attributes); + assertTrue(attributes.isEmpty()); + } + + @Test + public void testGetLoggingAttributesForConnectorReturnsEmptyMapWhenProviderThrows() { + final ConnectorConfigurationProvider provider = mock(ConnectorConfigurationProvider.class); + when(provider.getLoggingAttributes(anyString())).thenThrow(new RuntimeException("provider boom")); + + final StandardConnectorRepository repository = createRepositoryWithProvider(provider); + + final Map attributes = repository.getLoggingAttributesForConnector("connector-1"); + assertNotNull(attributes); + assertTrue(attributes.isEmpty()); + } + // --- Helper Methods --- private StandardConnectorRepository createRepositoryWithProviderAndAssetManager(