Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 15 additions & 1 deletion nifi-docs/src/main/asciidoc/administration-guide.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -211,11 +211,25 @@ Component logs provide the following MDC named values:
- `processGroupName` contains the name of the Process Group
- `processGroupNamePath` contains of the hierarchy of names for Process Groups with separators

Components that run inside a Connector-managed flow also carry framework-supplied MDC values that identify the owning
Connector:

- `connectorId` contains the UUID of the Connector
- `connectorName` contains the user-visible name of the Connector
- `connectorComponent` contains the fully qualified class name of the Connector implementation
- `connectorBundleGroup`, `connectorBundleArtifact`, `connectorBundleVersion` identify the NAR bundle the Connector was
loaded from

Additional implementation-specific MDC values may be supplied by the framework
`ConnectorConfigurationProvider` extension via `getLoggingAttributes(String connectorId)`. The framework consults the
provider at connector creation time and merges the result with the framework-managed keys above. Keys reserved by the
framework (those listed above) cannot be overridden; attempts to do so are dropped and logged as a `WARN`.

MDC named values can be added to a Logback pattern layout using the `mdc` conversion word.

[source, xml]
----
<pattern>%date %level [%thread] %mdc{processGroupId} %logger{40} %msg%n</pattern>
<pattern>%date %level [%thread] %mdc{connectorId} %mdc{processGroupId} %logger{40} %msg%n</pattern>
----

Logs from classes other than extension components do not have MDC named values. Logs formatted using the pattern layout
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.nifi.flow.ScheduledState;

import java.io.InputStream;
import java.util.Map;
import java.util.Optional;

/**
Expand Down Expand Up @@ -174,6 +175,34 @@ default boolean shouldApplyUpdate(final String connectorId) {
return true;
}

/**
* Returns identity-shaped logging attributes the framework should include in the SLF4J {@code MDC} for
* every log line emitted by the Connector with the given identifier and by any component (Processor,
* Controller Service, etc.) running inside its managed flow. The same attributes are also snapshotted
* onto {@code ProcessGroupStatus} so that status-based reporting tasks (e.g. OpenTelemetry) can attach
* them as metric attributes.
*
* <p>The framework reserves a set of well-known keys that describe the Connector itself (identity,
* bundle coordinate, etc.). Any entry in the returned map whose key collides with a reserved
* framework-managed key is dropped by the framework and a {@code WARN} is logged for that entry;
* the remaining entries are accepted.</p>
*
* <p>The framework consults this method once, at connector create time. It is the provider's
* responsibility to ensure the returned values are stable for the lifetime of the connector
* instance or to publish updates through other means; mid-lifecycle refresh is not currently
* defined.</p>
*
* <p>The default returns {@link Map#of()} so that providers that do not need to publish additional
* attributes (and runtimes that have no provider configured) get only the framework-managed identity
* keys in MDC.</p>
*
* @param connectorId the identifier of the connector whose logging attributes are being requested
* @return an immutable map of attribute keys to values; never {@code null}
*/
default Map<String, String> getLoggingAttributes(final String connectorId) {
return Map.of();
}

/**
* Ensures that local asset binaries are up to date with the external store. For each asset
* tracked in the provider's local state, this method compares the external store's current
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@
*/
package org.apache.nifi.controller.status.history;

import org.apache.nifi.controller.status.ConnectorStatus;
import org.apache.nifi.controller.status.NodeStatus;
import org.apache.nifi.controller.status.ProcessGroupStatus;

import java.util.Collection;
import java.util.Date;
import java.util.List;

Expand Down Expand Up @@ -48,6 +50,25 @@ public interface StatusHistoryRepository {
*/
void capture(NodeStatus nodeStatus, ProcessGroupStatus rootGroupStatus, List<GarbageCollectionStatus> garbageCollectionStatus, Date timestamp);

/**
* Captures the status information provided in the given report, additionally providing the status of each
* Connector-managed flow. Connector-managed Process Groups are siblings of the root group and are not reachable
* from {@code rootGroupStatus}, so this overload allows implementations to observe them (for example, to export
* Connector-managed flow metrics). The default implementation ignores {@code connectorStatuses} and delegates to
* {@link #capture(NodeStatus, ProcessGroupStatus, List, Date)} so existing implementations continue to work
* unchanged.
*
* @param nodeStatus status of the node
* @param rootGroupStatus status of the root group and its content
* @param connectorStatuses status of each registered Connector, each carrying its managed root group status
* @param garbageCollectionStatus status of garbage collection
* @param timestamp timestamp of capture
*/
default void capture(NodeStatus nodeStatus, ProcessGroupStatus rootGroupStatus, Collection<ConnectorStatus> connectorStatuses,
List<GarbageCollectionStatus> garbageCollectionStatus, Date timestamp) {
capture(nodeStatus, rootGroupStatus, garbageCollectionStatus, timestamp);
}

/**
* @param connectionId the ID of the Connection for which the Status is
* desired
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,7 @@ public final class StandardProcessGroup implements ProcessGroup {
private static final String UNREGISTERED_PATH_SEGMENT = "UNREGISTERED";

private final Map<String, String> loggingAttributes = new ConcurrentHashMap<>();
private volatile Map<String, String> connectorLoggingAttributes = Map.of();
private volatile String logFileSuffix;

public StandardProcessGroup(final String id, final ControllerServiceProvider serviceProvider, final ProcessScheduler scheduler,
Expand Down Expand Up @@ -303,6 +304,14 @@ public ProcessGroup getParent() {
@Override
public void setParent(final ProcessGroup newParent) {
parent.set(newParent);
// Inherit connector-supplied MDC attributes from the new parent. Descendants of a connector's
// managed flow root must carry the same connectorId/connectorName/etc. so that logs and OTel
// metrics emitted by processors inside the connector flow can be attributed to the connector.
// This runs each time the PG is re-parented (including initial attach), so new PGs added to
// an existing connector flow inherit automatically.
if (newParent instanceof StandardProcessGroup standardParent) {
this.connectorLoggingAttributes = standardParent.connectorLoggingAttributes;
}
setLoggingAttributes();
}

Expand Down Expand Up @@ -4705,6 +4714,47 @@ private void setLoggingAttributes() {
final String registeredFlowVersion = currentVersionControl.getVersion();
loggingAttributes.put(LoggingAttribute.REGISTERED_FLOW_VERSION.attribute, registeredFlowVersion);
}

// Merge connector-supplied attributes last so they are exposed to MDC/OTel snapshots taken
// from this PG. PG-level keys are added first to avoid being shadowed in case a connector
// ever tried to override them; the reserved-key filter on the connector side also enforces
// this separation defensively.
loggingAttributes.putAll(connectorLoggingAttributes);
}

/**
* Stores the connector-managed MDC attributes for this process group and cascades the same
* attributes to all descendant process groups so that components anywhere in the connector's
* managed flow log with consistent connectorId/connectorName/etc. context.
*
* <p>This method is called by {@code StandardConnectorNode} against its managed root process
* group whenever the connector's framework keys (e.g. {@code connectorName}) change or when the
* connector provides updated custom logging attributes. Newly created descendant groups will
* also inherit the attributes lazily via {@link #setParent(ProcessGroup)}.</p>
*
* @param attributes the merged set of connector logging attributes; an empty or {@code null}
* map clears any previously assigned attributes
*/
public void setConnectorLoggingAttributes(final Map<String, String> attributes) {
final Map<String, String> snapshot = (attributes == null || attributes.isEmpty())
? Map.of()
: Map.copyOf(attributes);
this.connectorLoggingAttributes = snapshot;
setLoggingAttributes();

for (final ProcessGroup child : getProcessGroups()) {
if (child instanceof StandardProcessGroup standardChild) {
standardChild.setConnectorLoggingAttributes(snapshot);
}
}
}

/**
* Returns the connector-supplied MDC attributes assigned to this process group, if any. Returns
* an empty map when this PG is not inside a connector-managed flow.
*/
public Map<String, String> getConnectorLoggingAttributes() {
return connectorLoggingAttributes;
}

private void setGroupPath() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.when;
Expand Down Expand Up @@ -238,4 +239,110 @@ void testGetLoggingAttributesWithVersionControlInformation() {

assertEquals(expected, loggingAttributes);
}

@Test
void testSetConnectorLoggingAttributesMergesIntoLoggingAttributes() {
processGroup.setName(NAME);

final Map<String, String> connectorAttributes = Map.of(
"connectorId", "connector-1",
"connectorName", "My Connector",
"connectorComponent", "com.example.MyConnector"
);

processGroup.setConnectorLoggingAttributes(connectorAttributes);

final Map<String, String> loggingAttributes = processGroup.getLoggingAttributes();
assertEquals("connector-1", loggingAttributes.get("connectorId"));
assertEquals("My Connector", loggingAttributes.get("connectorName"));
assertEquals("com.example.MyConnector", loggingAttributes.get("connectorComponent"));
assertEquals(NAME, loggingAttributes.get(StandardProcessGroup.LoggingAttribute.PROCESS_GROUP_NAME.getAttribute()));
assertEquals(ID, loggingAttributes.get(StandardProcessGroup.LoggingAttribute.PROCESS_GROUP_ID.getAttribute()));
}

@Test
void testSetConnectorLoggingAttributesCascadesToChildProcessGroups() {
processGroup.setName(NAME);

final StandardProcessGroup child = createStandardProcessGroup("child-id");
child.setName("Child");
processGroup.addProcessGroup(child);

final Map<String, String> connectorAttributes = Map.of(
"connectorId", "connector-1",
"connectorName", "Postgres CDC"
);

processGroup.setConnectorLoggingAttributes(connectorAttributes);

assertEquals("connector-1", child.getLoggingAttributes().get("connectorId"));
assertEquals("Postgres CDC", child.getLoggingAttributes().get("connectorName"));
}

@Test
void testAddProcessGroupInheritsConnectorLoggingAttributesFromParent() {
processGroup.setName(NAME);
processGroup.setConnectorLoggingAttributes(Map.of(
"connectorId", "connector-1",
"connectorName", "Postgres CDC"
));

final StandardProcessGroup lateChild = createStandardProcessGroup("late-child-id");
lateChild.setName("Late Child");
processGroup.addProcessGroup(lateChild);

assertEquals("connector-1", lateChild.getLoggingAttributes().get("connectorId"));
assertEquals("Postgres CDC", lateChild.getLoggingAttributes().get("connectorName"));
assertEquals(Map.copyOf(processGroup.getConnectorLoggingAttributes()), lateChild.getConnectorLoggingAttributes());
}

@Test
void testEmptyConnectorLoggingAttributesAddsNothing() {
processGroup.setName(NAME);
processGroup.setConnectorLoggingAttributes(Map.of());

final Map<String, String> loggingAttributes = processGroup.getLoggingAttributes();
assertFalse(loggingAttributes.containsKey("connectorId"));
assertTrue(processGroup.getConnectorLoggingAttributes().isEmpty());
// Verify the PG-level keys are still present.
assertEquals(NAME, loggingAttributes.get(StandardProcessGroup.LoggingAttribute.PROCESS_GROUP_NAME.getAttribute()));
}

@Test
void testSetConnectorLoggingAttributesReplacesPreviousValues() {
processGroup.setName(NAME);
processGroup.setConnectorLoggingAttributes(Map.of(
"connectorId", "connector-1",
"connectorName", "Old Name",
"customKey", "customValue"
));

processGroup.setConnectorLoggingAttributes(Map.of(
"connectorId", "connector-1",
"connectorName", "New Name"
));

final Map<String, String> loggingAttributes = processGroup.getLoggingAttributes();
assertEquals("New Name", loggingAttributes.get("connectorName"));
assertFalse(loggingAttributes.containsKey("customKey"));
}

private StandardProcessGroup createStandardProcessGroup(final String id) {
return new StandardProcessGroup(
id,
controllerServiceProvider,
processScheduler,
propertyEncryptor,
extensionManager,
stateManagerProvider,
flowManager,
reloadComponent,
nodeTypeProvider,
properties,
statelessGroupNodeFactory,
assetManager,
null
);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.io.IOException;
import java.io.InputStream;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Future;

Expand Down Expand Up @@ -211,4 +212,21 @@ void inheritConfiguration(ConnectorNode connector, List<VersionedConfigurationSt
* @param connector the connector whose assets should be synced
*/
void syncAssetsFromProvider(ConnectorNode connector);

/**
* Returns provider-sourced logging attributes for the connector with the given identifier. The
* framework calls this once at connector create time and merges the result into the MDC context
* for the connector's {@link org.apache.nifi.logging.ComponentLog} and the loggers of every
* component running inside its managed flow.
*
* <p>If no {@link ConnectorConfigurationProvider} is configured for this repository, this method
* returns {@link Map#of()}. Otherwise, it delegates to
* {@link ConnectorConfigurationProvider#getLoggingAttributes(String)}. Provider implementations
* that throw should be handled by the repository to avoid breaking connector creation; the
* framework will fall back to an empty map and log a warning.</p>
*
* @param connectorId the identifier of the connector whose logging attributes are being requested
* @return an immutable map of attribute keys to values; never {@code null}
*/
Map<String, String> getLoggingAttributesForConnector(String connectorId);
}
Loading
Loading