diff --git a/nifi-docs/src/main/asciidoc/administration-guide.adoc b/nifi-docs/src/main/asciidoc/administration-guide.adoc
index d5e4234bc39f..f7ed3cf57029 100644
--- a/nifi-docs/src/main/asciidoc/administration-guide.adoc
+++ b/nifi-docs/src/main/asciidoc/administration-guide.adoc
@@ -211,11 +211,25 @@ Component logs provide the following MDC named values:
- `processGroupName` contains the name of the Process Group
- `processGroupNamePath` contains of the hierarchy of names for Process Groups with separators
+Components that run inside a Connector-managed flow also carry framework-supplied MDC values that identify the owning
+Connector:
+
+- `connectorId` contains the UUID of the Connector
+- `connectorName` contains the user-visible name of the Connector
+- `connectorComponent` contains the fully qualified class name of the Connector implementation
+- `connectorBundleGroup`, `connectorBundleArtifact`, `connectorBundleVersion` identify the NAR bundle the Connector was
+ loaded from
+
+Additional implementation-specific MDC values may be supplied by the framework
+`ConnectorConfigurationProvider` extension via `getLoggingAttributes(String connectorId)`. The framework consults the
+provider at connector creation time and merges the result with the framework-managed keys above. Keys reserved by the
+framework (those listed above) cannot be overridden; attempts to do so are dropped and logged as a `WARN`.
+
MDC named values can be added to a Logback pattern layout using the `mdc` conversion word.
[source, xml]
----
-%date %level [%thread] %mdc{processGroupId} %logger{40} %msg%n
+%date %level [%thread] %mdc{connectorId} %mdc{processGroupId} %logger{40} %msg%n
----
Logs from classes other than extension components do not have MDC named values. Logs formatted using the pattern layout
diff --git a/nifi-framework-api/src/main/java/org/apache/nifi/components/connector/ConnectorConfigurationProvider.java b/nifi-framework-api/src/main/java/org/apache/nifi/components/connector/ConnectorConfigurationProvider.java
index a0d5314a2f96..efa32f9f420d 100644
--- a/nifi-framework-api/src/main/java/org/apache/nifi/components/connector/ConnectorConfigurationProvider.java
+++ b/nifi-framework-api/src/main/java/org/apache/nifi/components/connector/ConnectorConfigurationProvider.java
@@ -20,6 +20,7 @@
import org.apache.nifi.flow.ScheduledState;
import java.io.InputStream;
+import java.util.Map;
import java.util.Optional;
/**
@@ -174,6 +175,34 @@ default boolean shouldApplyUpdate(final String connectorId) {
return true;
}
+ /**
+ * Returns identity-shaped logging attributes the framework should include in the SLF4J {@code MDC} for
+ * every log line emitted by the Connector with the given identifier and by any component (Processor,
+ * Controller Service, etc.) running inside its managed flow. The same attributes are also snapshotted
+ * onto {@code ProcessGroupStatus} so that status-based reporting tasks (e.g. OpenTelemetry) can attach
+ * them as metric attributes.
+ *
+ *
The framework reserves a set of well-known keys that describe the Connector itself (identity,
+ * bundle coordinate, etc.). Any entry in the returned map whose key collides with a reserved
+ * framework-managed key is dropped by the framework and a {@code WARN} is logged for that entry;
+ * the remaining entries are accepted.
+ *
+ * The framework consults this method once, at connector create time. It is the provider's
+ * responsibility to ensure the returned values are stable for the lifetime of the connector
+ * instance or to publish updates through other means; mid-lifecycle refresh is not currently
+ * defined.
+ *
+ * The default returns {@link Map#of()} so that providers that do not need to publish additional
+ * attributes (and runtimes that have no provider configured) get only the framework-managed identity
+ * keys in MDC.
+ *
+ * @param connectorId the identifier of the connector whose logging attributes are being requested
+ * @return an immutable map of attribute keys to values; never {@code null}
+ */
+ default Map getLoggingAttributes(final String connectorId) {
+ return Map.of();
+ }
+
/**
* Ensures that local asset binaries are up to date with the external store. For each asset
* tracked in the provider's local state, this method compares the external store's current
diff --git a/nifi-framework-api/src/main/java/org/apache/nifi/controller/status/history/StatusHistoryRepository.java b/nifi-framework-api/src/main/java/org/apache/nifi/controller/status/history/StatusHistoryRepository.java
index 2b96feb6192e..b02258552811 100644
--- a/nifi-framework-api/src/main/java/org/apache/nifi/controller/status/history/StatusHistoryRepository.java
+++ b/nifi-framework-api/src/main/java/org/apache/nifi/controller/status/history/StatusHistoryRepository.java
@@ -16,9 +16,11 @@
*/
package org.apache.nifi.controller.status.history;
+import org.apache.nifi.controller.status.ConnectorStatus;
import org.apache.nifi.controller.status.NodeStatus;
import org.apache.nifi.controller.status.ProcessGroupStatus;
+import java.util.Collection;
import java.util.Date;
import java.util.List;
@@ -48,6 +50,25 @@ public interface StatusHistoryRepository {
*/
void capture(NodeStatus nodeStatus, ProcessGroupStatus rootGroupStatus, List garbageCollectionStatus, Date timestamp);
+ /**
+ * Captures the status information provided in the given report, additionally providing the status of each
+ * Connector-managed flow. Connector-managed Process Groups are siblings of the root group and are not reachable
+ * from {@code rootGroupStatus}, so this overload allows implementations to observe them (for example, to export
+ * Connector-managed flow metrics). The default implementation ignores {@code connectorStatuses} and delegates to
+ * {@link #capture(NodeStatus, ProcessGroupStatus, List, Date)} so existing implementations continue to work
+ * unchanged.
+ *
+ * @param nodeStatus status of the node
+ * @param rootGroupStatus status of the root group and its content
+ * @param connectorStatuses status of each registered Connector, each carrying its managed root group status
+ * @param garbageCollectionStatus status of garbage collection
+ * @param timestamp timestamp of capture
+ */
+ default void capture(NodeStatus nodeStatus, ProcessGroupStatus rootGroupStatus, Collection connectorStatuses,
+ List garbageCollectionStatus, Date timestamp) {
+ capture(nodeStatus, rootGroupStatus, garbageCollectionStatus, timestamp);
+ }
+
/**
* @param connectionId the ID of the Connection for which the Status is
* desired
diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
index 3abd69197bf5..568cfa884772 100644
--- a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
+++ b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java
@@ -229,6 +229,7 @@ public final class StandardProcessGroup implements ProcessGroup {
private static final String UNREGISTERED_PATH_SEGMENT = "UNREGISTERED";
private final Map loggingAttributes = new ConcurrentHashMap<>();
+ private volatile Map connectorLoggingAttributes = Map.of();
private volatile String logFileSuffix;
public StandardProcessGroup(final String id, final ControllerServiceProvider serviceProvider, final ProcessScheduler scheduler,
@@ -303,6 +304,14 @@ public ProcessGroup getParent() {
@Override
public void setParent(final ProcessGroup newParent) {
parent.set(newParent);
+ // Inherit connector-supplied MDC attributes from the new parent. Descendants of a connector's
+ // managed flow root must carry the same connectorId/connectorName/etc. so that logs and OTel
+ // metrics emitted by processors inside the connector flow can be attributed to the connector.
+ // This runs each time the PG is re-parented (including initial attach), so new PGs added to
+ // an existing connector flow inherit automatically.
+ if (newParent instanceof StandardProcessGroup standardParent) {
+ this.connectorLoggingAttributes = standardParent.connectorLoggingAttributes;
+ }
setLoggingAttributes();
}
@@ -4705,6 +4714,47 @@ private void setLoggingAttributes() {
final String registeredFlowVersion = currentVersionControl.getVersion();
loggingAttributes.put(LoggingAttribute.REGISTERED_FLOW_VERSION.attribute, registeredFlowVersion);
}
+
+ // Merge connector-supplied attributes last so they are exposed to MDC/OTel snapshots taken
+ // from this PG. PG-level keys are added first to avoid being shadowed in case a connector
+ // ever tried to override them; the reserved-key filter on the connector side also enforces
+ // this separation defensively.
+ loggingAttributes.putAll(connectorLoggingAttributes);
+ }
+
+ /**
+ * Stores the connector-managed MDC attributes for this process group and cascades the same
+ * attributes to all descendant process groups so that components anywhere in the connector's
+ * managed flow log with consistent connectorId/connectorName/etc. context.
+ *
+ * This method is called by {@code StandardConnectorNode} against its managed root process
+ * group whenever the connector's framework keys (e.g. {@code connectorName}) change or when the
+ * connector provides updated custom logging attributes. Newly created descendant groups will
+ * also inherit the attributes lazily via {@link #setParent(ProcessGroup)}.
+ *
+ * @param attributes the merged set of connector logging attributes; an empty or {@code null}
+ * map clears any previously assigned attributes
+ */
+ public void setConnectorLoggingAttributes(final Map attributes) {
+ final Map snapshot = (attributes == null || attributes.isEmpty())
+ ? Map.of()
+ : Map.copyOf(attributes);
+ this.connectorLoggingAttributes = snapshot;
+ setLoggingAttributes();
+
+ for (final ProcessGroup child : getProcessGroups()) {
+ if (child instanceof StandardProcessGroup standardChild) {
+ standardChild.setConnectorLoggingAttributes(snapshot);
+ }
+ }
+ }
+
+ /**
+ * Returns the connector-supplied MDC attributes assigned to this process group, if any. Returns
+ * an empty map when this PG is not inside a connector-managed flow.
+ */
+ public Map getConnectorLoggingAttributes() {
+ return connectorLoggingAttributes;
}
private void setGroupPath() {
diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/test/java/org/apache/nifi/groups/StandardProcessGroupTest.java b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/test/java/org/apache/nifi/groups/StandardProcessGroupTest.java
index 35051a30f9fc..3acbfc58dfe6 100644
--- a/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/test/java/org/apache/nifi/groups/StandardProcessGroupTest.java
+++ b/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/test/java/org/apache/nifi/groups/StandardProcessGroupTest.java
@@ -44,6 +44,7 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.when;
@@ -238,4 +239,110 @@ void testGetLoggingAttributesWithVersionControlInformation() {
assertEquals(expected, loggingAttributes);
}
+
+ @Test
+ void testSetConnectorLoggingAttributesMergesIntoLoggingAttributes() {
+ processGroup.setName(NAME);
+
+ final Map connectorAttributes = Map.of(
+ "connectorId", "connector-1",
+ "connectorName", "My Connector",
+ "connectorComponent", "com.example.MyConnector"
+ );
+
+ processGroup.setConnectorLoggingAttributes(connectorAttributes);
+
+ final Map loggingAttributes = processGroup.getLoggingAttributes();
+ assertEquals("connector-1", loggingAttributes.get("connectorId"));
+ assertEquals("My Connector", loggingAttributes.get("connectorName"));
+ assertEquals("com.example.MyConnector", loggingAttributes.get("connectorComponent"));
+ assertEquals(NAME, loggingAttributes.get(StandardProcessGroup.LoggingAttribute.PROCESS_GROUP_NAME.getAttribute()));
+ assertEquals(ID, loggingAttributes.get(StandardProcessGroup.LoggingAttribute.PROCESS_GROUP_ID.getAttribute()));
+ }
+
+ @Test
+ void testSetConnectorLoggingAttributesCascadesToChildProcessGroups() {
+ processGroup.setName(NAME);
+
+ final StandardProcessGroup child = createStandardProcessGroup("child-id");
+ child.setName("Child");
+ processGroup.addProcessGroup(child);
+
+ final Map connectorAttributes = Map.of(
+ "connectorId", "connector-1",
+ "connectorName", "Postgres CDC"
+ );
+
+ processGroup.setConnectorLoggingAttributes(connectorAttributes);
+
+ assertEquals("connector-1", child.getLoggingAttributes().get("connectorId"));
+ assertEquals("Postgres CDC", child.getLoggingAttributes().get("connectorName"));
+ }
+
+ @Test
+ void testAddProcessGroupInheritsConnectorLoggingAttributesFromParent() {
+ processGroup.setName(NAME);
+ processGroup.setConnectorLoggingAttributes(Map.of(
+ "connectorId", "connector-1",
+ "connectorName", "Postgres CDC"
+ ));
+
+ final StandardProcessGroup lateChild = createStandardProcessGroup("late-child-id");
+ lateChild.setName("Late Child");
+ processGroup.addProcessGroup(lateChild);
+
+ assertEquals("connector-1", lateChild.getLoggingAttributes().get("connectorId"));
+ assertEquals("Postgres CDC", lateChild.getLoggingAttributes().get("connectorName"));
+ assertEquals(Map.copyOf(processGroup.getConnectorLoggingAttributes()), lateChild.getConnectorLoggingAttributes());
+ }
+
+ @Test
+ void testEmptyConnectorLoggingAttributesAddsNothing() {
+ processGroup.setName(NAME);
+ processGroup.setConnectorLoggingAttributes(Map.of());
+
+ final Map loggingAttributes = processGroup.getLoggingAttributes();
+ assertFalse(loggingAttributes.containsKey("connectorId"));
+ assertTrue(processGroup.getConnectorLoggingAttributes().isEmpty());
+ // Verify the PG-level keys are still present.
+ assertEquals(NAME, loggingAttributes.get(StandardProcessGroup.LoggingAttribute.PROCESS_GROUP_NAME.getAttribute()));
+ }
+
+ @Test
+ void testSetConnectorLoggingAttributesReplacesPreviousValues() {
+ processGroup.setName(NAME);
+ processGroup.setConnectorLoggingAttributes(Map.of(
+ "connectorId", "connector-1",
+ "connectorName", "Old Name",
+ "customKey", "customValue"
+ ));
+
+ processGroup.setConnectorLoggingAttributes(Map.of(
+ "connectorId", "connector-1",
+ "connectorName", "New Name"
+ ));
+
+ final Map loggingAttributes = processGroup.getLoggingAttributes();
+ assertEquals("New Name", loggingAttributes.get("connectorName"));
+ assertFalse(loggingAttributes.containsKey("customKey"));
+ }
+
+ private StandardProcessGroup createStandardProcessGroup(final String id) {
+ return new StandardProcessGroup(
+ id,
+ controllerServiceProvider,
+ processScheduler,
+ propertyEncryptor,
+ extensionManager,
+ stateManagerProvider,
+ flowManager,
+ reloadComponent,
+ nodeTypeProvider,
+ properties,
+ statelessGroupNodeFactory,
+ assetManager,
+ null
+ );
+ }
+
}
diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorRepository.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorRepository.java
index d8d06bc79efa..c25962e4e6c7 100644
--- a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorRepository.java
+++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/components/connector/ConnectorRepository.java
@@ -26,6 +26,7 @@
import java.io.IOException;
import java.io.InputStream;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Future;
@@ -211,4 +212,21 @@ void inheritConfiguration(ConnectorNode connector, ListIf no {@link ConnectorConfigurationProvider} is configured for this repository, this method
+ * returns {@link Map#of()}. Otherwise, it delegates to
+ * {@link ConnectorConfigurationProvider#getLoggingAttributes(String)}. Provider implementations
+ * that throw should be handled by the repository to avoid breaking connector creation; the
+ * framework will fall back to an empty map and log a warning.
+ *
+ * @param connectorId the identifier of the connector whose logging attributes are being requested
+ * @return an immutable map of attribute keys to values; never {@code null}
+ */
+ Map getLoggingAttributesForConnector(String connectorId);
}
diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorNode.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorNode.java
index b9659ebb4860..030f973eedac 100644
--- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorNode.java
+++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorNode.java
@@ -50,7 +50,9 @@
import org.apache.nifi.flow.VersionedProcessGroup;
import org.apache.nifi.flow.VersionedProcessor;
import org.apache.nifi.groups.ProcessGroup;
+import org.apache.nifi.groups.StandardProcessGroup;
import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.logging.GroupedComponent;
import org.apache.nifi.nar.ExtensionManager;
import org.apache.nifi.nar.NarCloseable;
import org.apache.nifi.util.StringUtils;
@@ -81,9 +83,16 @@
import java.util.function.Function;
import java.util.stream.Collectors;
-public class StandardConnectorNode implements ConnectorNode {
+public class StandardConnectorNode implements ConnectorNode, GroupedComponent {
private static final Logger logger = LoggerFactory.getLogger(StandardConnectorNode.class);
+ /**
+ * Soft cardinality limit beyond which a WARN is emitted when a connector supplies custom logging
+ * attributes. Exceeding this threshold signals a potential metric/MDC cardinality risk for
+ * downstream observability backends but does not reject the attributes.
+ */
+ private static final int CUSTOM_LOGGING_ATTRIBUTE_CARDINALITY_WARN_THRESHOLD = 10;
+
private final String identifier;
private final FlowManager flowManager;
private final ExtensionManager extensionManager;
@@ -109,6 +118,10 @@ public class StandardConnectorNode implements ConnectorNode {
private volatile String name;
private volatile FrameworkConnectorInitializationContext initializationContext;
+ private final Object loggingAttributesLock = new Object();
+ private volatile Map customLoggingAttributes = Map.of();
+ private volatile Map mergedLoggingAttributes = Map.of();
+
public StandardConnectorNode(final String identifier, final FlowManager flowManager, final ExtensionManager extensionManager,
final Authorizable parentAuthorizable, final ConnectorDetails connectorDetails, final String componentType, final String componentCanonicalClass,
@@ -133,6 +146,8 @@ public StandardConnectorNode(final String identifier, final FlowManager flowMana
final Bundle activeFlowBundle = new Bundle(bundleCoordinate.getGroup(), bundleCoordinate.getId(), bundleCoordinate.getVersion());
this.activeFlowContext = flowContextFactory.createActiveFlowContext(identifier, connectorDetails.getComponentLog(), activeFlowBundle);
+
+ rebuildLoggingAttributes();
}
@Override
@@ -143,6 +158,138 @@ public String getName() {
@Override
public void setName(final String name) {
this.name = name;
+ rebuildLoggingAttributes();
+ }
+
+ /**
+ * Returns the {@link ProcessGroup} that holds the connector-managed flow, satisfying the
+ * {@link GroupedComponent} contract so that {@code StandardLoggingContext} can source MDC
+ * attributes for components running inside the connector's managed flow.
+ */
+ @Override
+ public ProcessGroup getProcessGroup() {
+ final FrameworkFlowContext context = activeFlowContext;
+ return context == null ? null : context.getManagedProcessGroup();
+ }
+
+ /**
+ * Replaces the connector-supplied custom logging attributes. Reserved keys (those used by the
+ * framework, see {@link ConnectorLoggingAttribute}) are filtered out and a WARN is logged for
+ * each dropped entry. A WARN is also logged when the number of accepted custom keys exceeds
+ * {@value #CUSTOM_LOGGING_ATTRIBUTE_CARDINALITY_WARN_THRESHOLD} to surface cardinality risk for
+ * MDC logging and OpenTelemetry metric labels.
+ *
+ * @param attributes the proposed custom attributes; {@code null} or empty clears the current set
+ */
+ public void setCustomLoggingAttributes(final Map attributes) {
+ final Map filtered = filterReservedKeys(attributes);
+
+ if (filtered.size() > CUSTOM_LOGGING_ATTRIBUTE_CARDINALITY_WARN_THRESHOLD) {
+ logger.warn("{} supplied {} custom logging attributes which exceeds the soft threshold of {}; high-cardinality attributes can degrade MDC logging and OpenTelemetry metric backends",
+ this, filtered.size(), CUSTOM_LOGGING_ATTRIBUTE_CARDINALITY_WARN_THRESHOLD);
+ }
+
+ synchronized (loggingAttributesLock) {
+ this.customLoggingAttributes = filtered;
+ }
+ rebuildLoggingAttributes();
+ }
+
+ /**
+ * Returns an immutable snapshot of the merged framework + custom logging attributes currently
+ * advertised by this connector. The framework keys are populated by the framework from the
+ * connector's identifier, name, component type, and bundle coordinate.
+ */
+ public Map getLoggingAttributes() {
+ return mergedLoggingAttributes;
+ }
+
+ private Map filterReservedKeys(final Map attributes) {
+ if (attributes == null || attributes.isEmpty()) {
+ return Map.of();
+ }
+ final Map filtered = new HashMap<>(attributes.size());
+ for (final Map.Entry entry : attributes.entrySet()) {
+ final String key = entry.getKey();
+ if (key == null || key.isEmpty()) {
+ continue;
+ }
+ if (ConnectorLoggingAttribute.isReserved(key)) {
+ logger.warn("{} attempted to set reserved logging attribute [{}]; dropping the entry. Reserved keys are managed by the framework.", this, key);
+ continue;
+ }
+ filtered.put(key, entry.getValue());
+ }
+ return Collections.unmodifiableMap(filtered);
+ }
+
+ private void rebuildLoggingAttributes() {
+ final Map merged;
+ final Map custom;
+ synchronized (loggingAttributesLock) {
+ custom = customLoggingAttributes;
+ merged = new HashMap<>(custom.size() + ConnectorLoggingAttribute.values().length);
+ merged.putAll(custom);
+ // Framework keys are applied last so they always win against any not-yet-filtered overlap.
+ merged.put(ConnectorLoggingAttribute.CONNECTOR_ID.attribute, identifier);
+ merged.put(ConnectorLoggingAttribute.CONNECTOR_NAME.attribute, name == null ? "" : name);
+ merged.put(ConnectorLoggingAttribute.CONNECTOR_COMPONENT.attribute, componentCanonicalClass == null ? "" : componentCanonicalClass);
+ if (bundleCoordinate != null) {
+ merged.put(ConnectorLoggingAttribute.CONNECTOR_BUNDLE_GROUP.attribute, bundleCoordinate.getGroup());
+ merged.put(ConnectorLoggingAttribute.CONNECTOR_BUNDLE_ARTIFACT.attribute, bundleCoordinate.getId());
+ merged.put(ConnectorLoggingAttribute.CONNECTOR_BUNDLE_VERSION.attribute, bundleCoordinate.getVersion());
+ }
+ this.mergedLoggingAttributes = Collections.unmodifiableMap(merged);
+ }
+
+ pushLoggingAttributesToManagedFlow(merged);
+ }
+
+ private void pushLoggingAttributesToManagedFlow(final Map attributes) {
+ final ProcessGroup managedProcessGroup = getProcessGroup();
+ if (managedProcessGroup instanceof StandardProcessGroup standardProcessGroup) {
+ standardProcessGroup.setConnectorLoggingAttributes(attributes);
+ }
+ }
+
+ /**
+ * Framework-managed MDC logging attribute keys for a connector. The framework computes these
+ * automatically and they cannot be overridden by a connector's custom attributes.
+ */
+ public enum ConnectorLoggingAttribute {
+ CONNECTOR_ID("connectorId"),
+ CONNECTOR_NAME("connectorName"),
+ CONNECTOR_COMPONENT("connectorComponent"),
+ CONNECTOR_BUNDLE_GROUP("connectorBundleGroup"),
+ CONNECTOR_BUNDLE_ARTIFACT("connectorBundleArtifact"),
+ CONNECTOR_BUNDLE_VERSION("connectorBundleVersion");
+
+ private static final Set RESERVED_KEYS;
+ static {
+ final Set reserved = new HashSet<>();
+ for (final ConnectorLoggingAttribute attribute : values()) {
+ reserved.add(attribute.attribute);
+ }
+ RESERVED_KEYS = Collections.unmodifiableSet(reserved);
+ }
+
+ private final String attribute;
+
+ ConnectorLoggingAttribute(final String attribute) {
+ this.attribute = attribute;
+ }
+
+ public String getAttribute() {
+ return attribute;
+ }
+
+ public static boolean isReserved(final String key) {
+ return RESERVED_KEYS.contains(key);
+ }
+
+ public static Set getReservedKeys() {
+ return RESERVED_KEYS;
+ }
}
@Override
diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorRepository.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorRepository.java
index bdb527fecc81..4910ed5c528e 100644
--- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorRepository.java
+++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/components/connector/StandardConnectorRepository.java
@@ -722,6 +722,21 @@ public SecretsManager getSecretsManager() {
return secretsManager;
}
+ @Override
+ public Map getLoggingAttributesForConnector(final String connectorId) {
+ if (configurationProvider == null) {
+ return Map.of();
+ }
+ try {
+ final Map attributes = configurationProvider.getLoggingAttributes(connectorId);
+ return attributes == null ? Map.of() : attributes;
+ } catch (final Exception e) {
+ logger.warn("ConnectorConfigurationProvider [{}] threw while computing logging attributes for connector [{}]; using empty map: {}",
+ configurationProvider.getClass().getSimpleName(), connectorId, e.getMessage(), e);
+ return Map.of();
+ }
+ }
+
@Override
public ConnectorStateTransition createStateTransition(final String type, final String id) {
final String componentDescription = "StandardConnectorNode[id=" + id + ", type=" + type + "]";
diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/ExtensionBuilder.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/ExtensionBuilder.java
index 45ef9980d4b2..f73f5249adf1 100644
--- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/ExtensionBuilder.java
+++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/ExtensionBuilder.java
@@ -521,10 +521,11 @@ public Resource getResource() {
}
final String componentType = connector.getClass().getSimpleName();
- final ComponentLog componentLog = new SimpleProcessLogger(identifier, connector, new StandardLoggingContext());
+ final StandardLoggingContext loggingContext = new StandardLoggingContext();
+ final ComponentLog componentLog = new SimpleProcessLogger(identifier, connector, loggingContext);
final ConnectorDetails connectorDetails = new ConnectorDetails(connector, bundleCoordinate, componentLog);
- final ConnectorNode connectorNode = new StandardConnectorNode(
+ final StandardConnectorNode connectorNode = new StandardConnectorNode(
identifier,
flowController.getFlowManager(),
extensionManager,
@@ -538,11 +539,20 @@ public Resource getResource() {
connectorValidationTrigger,
false
);
+ // Late-bind the logging context to the connector node so that MDC attributes assembled by
+ // the node (framework keys + provider-supplied custom keys) flow through the connector's
+ // own ComponentLog as well as the managed flow's nested components.
+ loggingContext.setComponent(connectorNode);
+
+ // Pull provider-sourced logging attributes (e.g. connectorDefinitionId) from the ConnectorRepository
+ // and merge them into the node before any logs are emitted from the connector or its managed flow.
+ final Map providerAttributes = flowController.getConnectorRepository().getLoggingAttributesForConnector(identifier);
+ connectorNode.setCustomLoggingAttributes(providerAttributes);
try {
initializeDefaultValues(connector, connectorNode.getActiveFlowContext());
- final FrameworkConnectorInitializationContext initContext = createConnectorInitializationContext(managedProcessGroup, componentLog);
+ final FrameworkConnectorInitializationContext initContext = createConnectorInitializationContext(managedProcessGroup, componentLog, connectorNode);
connectorNode.initializeConnector(initContext);
} catch (final Exception e) {
logger.error("Could not initialize Connector of type {} from {} for ID {}; creating \"Ghost\" implementation", type, bundleCoordinate, identifier, e);
@@ -565,13 +575,14 @@ private ConnectorNode createGhostConnectorNode(final Authorizable connectorsAuth
final GhostConnector ghostConnector = new GhostConnector(identifier, type, cause);
final String simpleClassName = type.contains(".") ? StringUtils.substringAfterLast(type, ".") : type;
final String componentType = "(Missing) " + simpleClassName;
- final ComponentLog componentLog = new SimpleProcessLogger(identifier, ghostConnector, new StandardLoggingContext());
+ final StandardLoggingContext loggingContext = new StandardLoggingContext();
+ final ComponentLog componentLog = new SimpleProcessLogger(identifier, ghostConnector, loggingContext);
final ConnectorDetails connectorDetails = new ConnectorDetails(ghostConnector, bundleCoordinate, componentLog);
// If an instance class loader has been created for this connector, remove it because it's no longer necessary.
extensionManager.removeInstanceClassLoader(identifier);
- final ConnectorNode connectorNode = new StandardConnectorNode(
+ final StandardConnectorNode connectorNode = new StandardConnectorNode(
identifier,
flowController.getFlowManager(),
extensionManager,
@@ -585,9 +596,15 @@ private ConnectorNode createGhostConnectorNode(final Authorizable connectorsAuth
connectorValidationTrigger,
true
);
+ loggingContext.setComponent(connectorNode);
+
+ // Pull provider-sourced logging attributes for the ghost connector as well so that MDC and
+ // status snapshots are consistent regardless of whether the real Connector implementation loaded.
+ final Map providerAttributes = flowController.getConnectorRepository().getLoggingAttributesForConnector(identifier);
+ connectorNode.setCustomLoggingAttributes(providerAttributes);
// Initialize the ghost connector so that it can be properly configured during flow synchronization
- final FrameworkConnectorInitializationContext initContext = createConnectorInitializationContext(managedProcessGroup, componentLog);
+ final FrameworkConnectorInitializationContext initContext = createConnectorInitializationContext(managedProcessGroup, componentLog, connectorNode);
connectorNode.initializeConnector(initContext);
return connectorNode;
@@ -613,7 +630,8 @@ private void initializeDefaultValues(final Connector connector, final FrameworkF
}
}
- private FrameworkConnectorInitializationContext createConnectorInitializationContext(final ProcessGroup managedProcessGroup, final ComponentLog componentLog) {
+ private FrameworkConnectorInitializationContext createConnectorInitializationContext(final ProcessGroup managedProcessGroup, final ComponentLog componentLog,
+ final StandardConnectorNode connectorNode) {
final String name = type.contains(".") ? StringUtils.substringAfterLast(type, ".") : type;
return connectorInitializationContextBuilder
diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
index 259aed264641..6a2c827c8229 100644
--- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
+++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java
@@ -810,11 +810,12 @@ private FlowController(
}
eventAccess = new StandardEventAccess(flowManager, flowFileEventRepository, processScheduler, authorizer, provenanceRepository,
- auditService, analyticsEngine, flowFileRepository, contentRepository);
+ auditService, analyticsEngine, flowFileRepository, contentRepository, connectorRepository);
timerDrivenEngineRef.get().scheduleWithFixedDelay(() -> {
try {
- statusHistoryRepository.capture(getNodeStatusSnapshot(), eventAccess.getControllerStatus(), getGarbageCollectionStatus(), new Date());
+ statusHistoryRepository.capture(getNodeStatusSnapshot(), eventAccess.getControllerStatus(), eventAccess.getConnectorStatuses(),
+ getGarbageCollectionStatus(), new Date());
} catch (final Exception e) {
LOG.error("Failed to capture component stats for Stats History", e);
}
diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/reporting/StandardEventAccess.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/reporting/StandardEventAccess.java
index ec233778b65a..3e12745328ec 100644
--- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/reporting/StandardEventAccess.java
+++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/reporting/StandardEventAccess.java
@@ -22,6 +22,10 @@
import org.apache.nifi.authorization.RequestAction;
import org.apache.nifi.authorization.resource.Authorizable;
import org.apache.nifi.authorization.user.NiFiUser;
+import org.apache.nifi.components.connector.ConnectorNode;
+import org.apache.nifi.components.connector.ConnectorRepository;
+import org.apache.nifi.components.connector.ConnectorSyncMode;
+import org.apache.nifi.components.connector.FrameworkFlowContext;
import org.apache.nifi.controller.ProcessScheduler;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.flow.FlowManager;
@@ -31,6 +35,7 @@
import org.apache.nifi.controller.repository.FlowFileRepository;
import org.apache.nifi.controller.repository.RepositoryStatusReport;
import org.apache.nifi.controller.repository.metrics.EmptyFlowFileEvent;
+import org.apache.nifi.controller.status.ConnectorStatus;
import org.apache.nifi.controller.status.ProcessGroupStatus;
import org.apache.nifi.controller.status.ProcessorStatus;
import org.apache.nifi.controller.status.analytics.StatusAnalyticsEngine;
@@ -41,6 +46,8 @@
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -55,10 +62,11 @@ public class StandardEventAccess extends AbstractEventAccess implements UserAwar
private final AuditService auditService;
private final FlowFileRepository flowFileRepository;
private final ContentRepository contentRepository;
+ private final ConnectorRepository connectorRepository;
public StandardEventAccess(final FlowManager flowManager, final FlowFileEventRepository flowFileEventRepository, final ProcessScheduler processScheduler,
final Authorizer authorizer, final ProvenanceRepository provenanceRepository, final AuditService auditService, final StatusAnalyticsEngine statusAnalyticsEngine,
- final FlowFileRepository flowFileRepository, final ContentRepository contentRepository) {
+ final FlowFileRepository flowFileRepository, final ContentRepository contentRepository, final ConnectorRepository connectorRepository) {
super(processScheduler, statusAnalyticsEngine, flowManager, flowFileEventRepository);
this.flowFileEventRepository = flowFileEventRepository;
this.flowManager = flowManager;
@@ -67,6 +75,49 @@ public StandardEventAccess(final FlowManager flowManager, final FlowFileEventRep
this.auditService = auditService;
this.flowFileRepository = flowFileRepository;
this.contentRepository = contentRepository;
+ this.connectorRepository = connectorRepository;
+ }
+
+ /**
+ * Returns a {@link ConnectorStatus} for each registered connector, each carrying the connector identity along with
+ * the {@link ProcessGroupStatus} of the connector's managed root process group. Connector-managed flows live
+ * outside the controller's root process group, so they are invisible to {@link #getControllerStatus()} and would
+ * otherwise be skipped by standard reporting tasks.
+ */
+ @Override
+ public Collection getConnectorStatuses() {
+ if (connectorRepository == null) {
+ return Collections.emptyList();
+ }
+
+ final List connectors = connectorRepository.getConnectors(ConnectorSyncMode.LOCAL_ONLY);
+ if (connectors == null || connectors.isEmpty()) {
+ return Collections.emptyList();
+ }
+
+ final RepositoryStatusReport statusReport = generateRepositoryStatusReport();
+ final List statuses = new ArrayList<>(connectors.size());
+ for (final ConnectorNode connector : connectors) {
+ final FrameworkFlowContext context = connector.getActiveFlowContext();
+ if (context == null) {
+ continue;
+ }
+ final ProcessGroup managedGroup = context.getManagedProcessGroup();
+ if (managedGroup == null) {
+ continue;
+ }
+ final ProcessGroupStatus rootGroupStatus = getGroupStatus(managedGroup, statusReport,
+ authorizable -> true, Integer.MAX_VALUE, 1, true);
+ if (rootGroupStatus == null) {
+ continue;
+ }
+ final ConnectorStatus connectorStatus = new ConnectorStatus();
+ connectorStatus.setId(connector.getIdentifier());
+ connectorStatus.setName(connector.getName());
+ connectorStatus.setRootGroupStatus(rootGroupStatus);
+ statuses.add(connectorStatus);
+ }
+ return statuses;
}
@Override
diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/TestStandardConnectorNode.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/TestStandardConnectorNode.java
index 00e136a66358..1c456e702bda 100644
--- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/TestStandardConnectorNode.java
+++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/TestStandardConnectorNode.java
@@ -783,6 +783,88 @@ public void testCancelDrainFlowFilesInterruptsConnector() throws Exception {
assertEquals(ConnectorState.STOPPED, connectorNode.getCurrentState());
}
+ @Test
+ public void testLoggingAttributesIncludeFrameworkKeys() throws Exception {
+ final SleepingConnector connector = new SleepingConnector(Duration.ofMillis(1));
+ final StandardConnectorNode connectorNode = createConnectorNode(connector);
+
+ final Map attributes = connectorNode.getLoggingAttributes();
+ assertEquals("test-connector-id", attributes.get("connectorId"));
+ assertEquals(connector.getClass().getCanonicalName(), attributes.get("connectorComponent"));
+ assertEquals("org.apache.nifi", attributes.get("connectorBundleGroup"));
+ assertEquals("test-standard-connector-node", attributes.get("connectorBundleArtifact"));
+ assertEquals("1.0.0", attributes.get("connectorBundleVersion"));
+ assertEquals(connector.getClass().getSimpleName(), attributes.get("connectorName"));
+ }
+
+ @Test
+ public void testSetNameRefreshesConnectorNameLoggingAttribute() throws Exception {
+ final StandardConnectorNode connectorNode = createConnectorNode();
+ connectorNode.setName("Renamed Connector");
+
+ assertEquals("Renamed Connector", connectorNode.getLoggingAttributes().get("connectorName"));
+ }
+
+ @Test
+ public void testSetCustomLoggingAttributesMergesWithFrameworkKeys() throws Exception {
+ final StandardConnectorNode connectorNode = createConnectorNode();
+ connectorNode.setCustomLoggingAttributes(Map.of(
+ "customA", "valueA",
+ "customB", "valueB"
+ ));
+
+ final Map attributes = connectorNode.getLoggingAttributes();
+ assertEquals("valueA", attributes.get("customA"));
+ assertEquals("valueB", attributes.get("customB"));
+ assertEquals("test-connector-id", attributes.get("connectorId"));
+ }
+
+ @Test
+ public void testSetCustomLoggingAttributesFiltersReservedKeys() throws Exception {
+ final StandardConnectorNode connectorNode = createConnectorNode();
+ connectorNode.setCustomLoggingAttributes(Map.of(
+ "connectorId", "attempted-override",
+ "connectorBundleVersion", "9.9.9",
+ "customA", "valueA"
+ ));
+
+ final Map attributes = connectorNode.getLoggingAttributes();
+ // Reserved keys are owned by the framework: connectorId stays as the framework value.
+ assertEquals("test-connector-id", attributes.get("connectorId"));
+ assertEquals("1.0.0", attributes.get("connectorBundleVersion"));
+ // Non-reserved custom keys are preserved.
+ assertEquals("valueA", attributes.get("customA"));
+ }
+
+ @Test
+ public void testSetCustomLoggingAttributesNullClears() throws Exception {
+ final StandardConnectorNode connectorNode = createConnectorNode();
+ connectorNode.setCustomLoggingAttributes(Map.of("customA", "valueA"));
+ assertTrue(connectorNode.getLoggingAttributes().containsKey("customA"));
+
+ connectorNode.setCustomLoggingAttributes(null);
+ assertFalse(connectorNode.getLoggingAttributes().containsKey("customA"));
+ // Framework keys remain.
+ assertEquals("test-connector-id", connectorNode.getLoggingAttributes().get("connectorId"));
+ }
+
+ @Test
+ public void testReservedKeysExposeFrameworkAttributeNames() {
+ final Set reserved = StandardConnectorNode.ConnectorLoggingAttribute.getReservedKeys();
+ assertTrue(reserved.contains("connectorId"));
+ assertTrue(reserved.contains("connectorName"));
+ assertTrue(reserved.contains("connectorComponent"));
+ assertTrue(reserved.contains("connectorBundleGroup"));
+ assertTrue(reserved.contains("connectorBundleArtifact"));
+ assertTrue(reserved.contains("connectorBundleVersion"));
+ }
+
+ @Test
+ public void testGetProcessGroupReturnsManagedFlowRoot() throws Exception {
+ final StandardConnectorNode connectorNode = createConnectorNode();
+ assertEquals(managedProcessGroup, connectorNode.getProcessGroup());
+ }
+
private StandardConnectorNode createConnectorNode() throws FlowUpdateException {
final SleepingConnector sleepingConnector = new SleepingConnector(Duration.ofMillis(1));
return createConnectorNode(sleepingConnector);
diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/TestStandardConnectorRepository.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/TestStandardConnectorRepository.java
index cba4b12f5741..a8e661a78dfb 100644
--- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/TestStandardConnectorRepository.java
+++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/components/connector/TestStandardConnectorRepository.java
@@ -1364,6 +1364,58 @@ private static ParameterProviderNode mockParameterProvider(final String name, fi
return node;
}
+ // --- getLoggingAttributesForConnector tests ---
+
+ @Test
+ public void testGetLoggingAttributesForConnectorReturnsEmptyMapWhenNoProvider() {
+ final StandardConnectorRepository repository = new StandardConnectorRepository();
+ final ConnectorRepositoryInitializationContext initContext = mock(ConnectorRepositoryInitializationContext.class);
+ when(initContext.getExtensionManager()).thenReturn(mock(ExtensionManager.class));
+ when(initContext.getConnectorConfigurationProvider()).thenReturn(null);
+ repository.initialize(initContext);
+
+ final Map attributes = repository.getLoggingAttributesForConnector("connector-1");
+ assertNotNull(attributes);
+ assertTrue(attributes.isEmpty());
+ }
+
+ @Test
+ public void testGetLoggingAttributesForConnectorDelegatesToProvider() {
+ final ConnectorConfigurationProvider provider = mock(ConnectorConfigurationProvider.class);
+ final Map expected = Map.of("connectorDefinitionId", "snowflake.runtime.postgres-cdc");
+ when(provider.getLoggingAttributes("connector-1")).thenReturn(expected);
+
+ final StandardConnectorRepository repository = createRepositoryWithProvider(provider);
+
+ final Map attributes = repository.getLoggingAttributesForConnector("connector-1");
+ assertEquals(expected, attributes);
+ verify(provider).getLoggingAttributes("connector-1");
+ }
+
+ @Test
+ public void testGetLoggingAttributesForConnectorReturnsEmptyMapWhenProviderReturnsNull() {
+ final ConnectorConfigurationProvider provider = mock(ConnectorConfigurationProvider.class);
+ when(provider.getLoggingAttributes(anyString())).thenReturn(null);
+
+ final StandardConnectorRepository repository = createRepositoryWithProvider(provider);
+
+ final Map attributes = repository.getLoggingAttributesForConnector("connector-1");
+ assertNotNull(attributes);
+ assertTrue(attributes.isEmpty());
+ }
+
+ @Test
+ public void testGetLoggingAttributesForConnectorReturnsEmptyMapWhenProviderThrows() {
+ final ConnectorConfigurationProvider provider = mock(ConnectorConfigurationProvider.class);
+ when(provider.getLoggingAttributes(anyString())).thenThrow(new RuntimeException("provider boom"));
+
+ final StandardConnectorRepository repository = createRepositoryWithProvider(provider);
+
+ final Map attributes = repository.getLoggingAttributesForConnector("connector-1");
+ assertNotNull(attributes);
+ assertTrue(attributes.isEmpty());
+ }
+
// --- Helper Methods ---
private StandardConnectorRepository createRepositoryWithProviderAndAssetManager(