Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 76 additions & 0 deletions .cursor/rules/code-style.mdc
Original file line number Diff line number Diff line change
Expand Up @@ -72,3 +72,79 @@ final List<String> result = myList.stream()
when the logic is not simple and straightforward. The stream API is powerful but can be difficult to
read when overused or used in complex scenarios. Functional style is best used when the logic is simple
and chains together no more than 3-4 operations.
16. Always place a blank line after a closing brace (`}`) that ends a control-flow construct (such as an
`if`, `else`, `for`, `while`, `switch`, or `try` / `catch` / `finally` block) when the next line is a
new statement or another control-flow construct at the same indentation level. This also applies to
the line following the closing brace of a nested block within a method. The goal is to clearly
separate logical sections of code. Exceptions:
- No blank line is required immediately before a closing brace of the enclosing block.
- No blank line is required between `} else {`, `} else if (...) {`, `} catch (...) {`, or
`} finally {` on the same chain.

Bad:
```java
for (final Connector connector : connectors) {
if (connector.getCurrentState() != ConnectorState.TROUBLESHOOTING) {
continue;
}
final FrameworkFlowContext flowContext = connector.getActiveFlowContext();
if (flowContext != null && flowContext.getManagedProcessGroup().findFunnel(funnelId) != null) {
return true;
}
}
return false;
```

Good:
```java
for (final Connector connector : connectors) {
if (connector.getCurrentState() != ConnectorState.TROUBLESHOOTING) {
continue;
}

final FrameworkFlowContext flowContext = connector.getActiveFlowContext();
if (flowContext != null && flowContext.getManagedProcessGroup().findFunnel(funnelId) != null) {
return true;
}
}

return false;
```
17. Rule #11 (prefer importing a class rather than using a fully qualified classname inline) is not
optional. Even when a class is referenced only a single time, add an `import` statement rather than
referring to it by fully qualified name inline. The only acceptable use of a fully qualified
classname is when there is an unavoidable naming conflict with another imported class in the same
file. Fully qualified references scattered throughout code make it much harder to read and maintain.
18. Never combine a negative condition such as `if (x != null)`, `if (!collection.isEmpty())`, or
`if (!flag)` with an `else` clause. Negated conditions are harder to read, and pairing them with an
`else` branch forces the reader to mentally invert the predicate for the "happy path". Instead,
rewrite the condition in the positive form (for example, swap the branches so the `if` tests the
positive condition), or use an early `return`/`continue`/`throw` to eliminate the `else` entirely.

Bad:
```java
if (persistedManagedGroup != null) {
restore(persistedManagedGroup);
} else {
logger.warn("No snapshot was persisted; leaving Managed Process Group unchanged");
}
```

Good (positive predicate):
```java
if (persistedManagedGroup == null) {
logger.warn("No snapshot was persisted; leaving Managed Process Group unchanged");
} else {
restore(persistedManagedGroup);
}
```

Or, preferably, use an early action and drop the `else`:
```java
if (persistedManagedGroup == null) {
logger.warn("No snapshot was persisted; leaving Managed Process Group unchanged");
return;
}

restore(persistedManagedGroup);
```
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,11 @@ public VersionedExternalFlow getInitialFlow() {
return VersionedFlowUtils.loadFlowFromResource("flows/Generate_and_Update.json");
}

@Override
public VersionedExternalFlow getActiveFlow(final FlowContext activeFlowContext) {
return getInitialFlow();
}

@Override
public List<ConfigurationStep> getConfigurationSteps() {
return CONFIGURATION_STEPS;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,19 @@ public VersionedExternalFlow getInitialFlow() {
return VersionedFlowUtils.loadFlowFromResource("flows/Cron_Schedule_Connector.json");
}

@Override
public VersionedExternalFlow getActiveFlow(final FlowContext activeFlowContext) {
// The authoritative Active flow is the flow template with the currently configured CRON expression
// applied as the Trigger Schedule parameter value.
final VersionedExternalFlow flow = getInitialFlow();
final String triggerSchedule = activeFlowContext.getConfigurationContext().getProperty(SCHEDULE_STEP_NAME, TRIGGER_SCHEDULE_PARAM).getValue();
if (triggerSchedule != null) {
VersionedFlowUtils.setParameterValue(flow, TRIGGER_SCHEDULE_PARAM, triggerSchedule);
}

return flow;
}

@Override
public List<ConfigurationStep> getConfigurationSteps() {
return List.of(SCHEDULE_STEP);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@ public VersionedExternalFlow getInitialFlow() {
return VersionedFlowUtils.loadFlowFromResource("flows/Generate_and_Update.json");
}

@Override
public VersionedExternalFlow getActiveFlow(final FlowContext activeFlowContext) {
return getInitialFlow();
}

@Override
protected void onStepConfigured(final String stepName, final FlowContext flowContext) {
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,11 @@ public VersionedExternalFlow getInitialFlow() {
return externalFlow;
}

@Override
public VersionedExternalFlow getActiveFlow(final FlowContext activeFlowContext) {
return getInitialFlow();
}

@Override
protected void onStepConfigured(final String stepName, final FlowContext flowContext) {
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,14 @@ public VersionedExternalFlow getInitialFlow() {
return KafkaToS3FlowBuilder.loadInitialFlow();
}

@Override
public VersionedExternalFlow getActiveFlow(final FlowContext activeFlowContext) {
// The authoritative Active flow for this Connector is the flow that is produced by applying the
// current configuration to the Kafka-to-S3 flow template. When the user exits Troubleshooting mode,
// this flow is reinstated to discard any manual edits made to the managed Process Group.
return buildFlow(activeFlowContext.getConfigurationContext());
}

@Override
public void onStepConfigured(final String stepName, final FlowContext workingContext) throws FlowUpdateException {
final VersionedExternalFlow flow = buildFlow(workingContext.getConfigurationContext());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.nifi.components.connector;

import org.apache.nifi.flow.ScheduledState;
import org.apache.nifi.flow.VersionedConnectorState;

import java.io.InputStream;
import java.util.Optional;
Expand Down Expand Up @@ -101,6 +101,25 @@ public interface ConnectorConfigurationProvider {
*/
void verifyCreate(String connectorId);

/**
* Verifies that the provider allows the connector with the given identifier to transition into
* Troubleshooting mode. This is called before the framework transitions a Connector into
* Troubleshooting, giving the provider an opportunity to reject the operation (for example,
* because the provider is currently performing a concurrent operation on the connector or the
* external system does not permit troubleshooting at this time).
*
* <p>If the provider cannot support the operation, it should throw a runtime exception
* (for example, {@link IllegalStateException} or
* {@link ConnectorConfigurationProviderException}) describing why the transition is not
* allowed. The exception message will be surfaced to the caller.</p>
*
* <p>The default implementation is a no-op, allowing the transition to proceed.</p>
*
* @param connectorId the identifier of the connector that is being transitioned into Troubleshooting mode
*/
default void verifyEnterTroubleshooting(final String connectorId) {
}

/**
* Determines how the connector repository should handle synchronization for the given
* connector during flow inheritance (cluster join). The provider examines the external
Expand All @@ -112,7 +131,7 @@ public interface ConnectorConfigurationProvider {
* <ul>
* <li>A {@link ConnectorWorkingConfiguration} with the provider's working config and name
* (overriding the potentially stale values from the versioned flow)</li>
* <li>A {@link ScheduledState} override (correcting stale run intent from the versioned flow)</li>
* <li>A {@link VersionedConnectorState} override (correcting stale run intent from the versioned flow)</li>
* </ul>
*
* <p>This method combines the verify and load operations into a single call to avoid
Expand All @@ -123,10 +142,10 @@ public interface ConnectorConfigurationProvider {
* behavior for Apache NiFi when no provider is configured.</p>
*
* @param connectorId the identifier of the connector to check
* @param proposedScheduledState the ScheduledState from the versioned flow
* @param proposedScheduledState the scheduled state from the versioned flow
* @return a directive indicating how to handle this connector during sync
*/
default ConnectorSyncDirective getSyncDirective(final String connectorId, final ScheduledState proposedScheduledState) {
default ConnectorSyncDirective getSyncDirective(final String connectorId, final VersionedConnectorState proposedScheduledState) {
return ConnectorSyncDirective.allow();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@

package org.apache.nifi.components.connector;

import org.apache.nifi.flow.ScheduledState;
import org.apache.nifi.flow.VersionedConnectorState;

/**
* Directive returned by {@link ConnectorConfigurationProvider#getSyncDirective(String, ScheduledState)}
* Directive returned by {@link ConnectorConfigurationProvider#getSyncDirective(String, VersionedConnectorState)}
* indicating how the connector repository should handle synchronization for a connector during
* flow inheritance.
*/
Expand All @@ -32,7 +32,7 @@ public class ConnectorSyncDirective {
public enum Action {
/**
* Proceed with synchronization. The directive may optionally include a
* {@link ScheduledState} override and/or a {@link ConnectorWorkingConfiguration}
* {@link VersionedConnectorState} override and/or a {@link ConnectorWorkingConfiguration}
* containing the provider's working config and name.
*/
ALLOW,
Expand All @@ -57,10 +57,10 @@ public enum Action {
private static final ConnectorSyncDirective REMOVE_DIRECTIVE = new ConnectorSyncDirective(Action.REMOVE, null, null);

private final Action action;
private final ScheduledState scheduledStateOverride;
private final VersionedConnectorState scheduledStateOverride;
private final ConnectorWorkingConfiguration workingConfiguration;

private ConnectorSyncDirective(final Action action, final ScheduledState scheduledStateOverride,
private ConnectorSyncDirective(final Action action, final VersionedConnectorState scheduledStateOverride,
final ConnectorWorkingConfiguration workingConfiguration) {
this.action = action;
this.scheduledStateOverride = scheduledStateOverride;
Expand All @@ -69,7 +69,7 @@ private ConnectorSyncDirective(final Action action, final ScheduledState schedul

/**
* Returns an ALLOW directive with no overrides. The connector repository will use the
* versioned flow's name, working config, and ScheduledState as-is. This is the default
* versioned flow's name, working config, and scheduled state as-is. This is the default
* behavior when no {@link ConnectorConfigurationProvider} is configured (Apache NiFi).
*/
public static ConnectorSyncDirective allow() {
Expand All @@ -78,7 +78,7 @@ public static ConnectorSyncDirective allow() {

/**
* Returns an ALLOW directive with the provider's working configuration (name + working
* config steps) and no ScheduledState override.
* config steps) and no scheduled state override.
*
* @param workingConfiguration the provider's working configuration including name
*/
Expand All @@ -88,14 +88,14 @@ public static ConnectorSyncDirective allow(final ConnectorWorkingConfiguration w

/**
* Returns an ALLOW directive with the provider's working configuration and a
* ScheduledState override. The override replaces the versioned flow's ScheduledState,
* scheduled state override. The override replaces the versioned flow's scheduled state,
* which may be stale due to in-flight DPS tasks.
*
* @param workingConfiguration the provider's working configuration including name
* @param scheduledStateOverride the ScheduledState to use instead of the versioned flow's value
* @param scheduledStateOverride the scheduled state to use instead of the versioned flow's value
*/
public static ConnectorSyncDirective allow(final ConnectorWorkingConfiguration workingConfiguration,
final ScheduledState scheduledStateOverride) {
final VersionedConnectorState scheduledStateOverride) {
return new ConnectorSyncDirective(Action.ALLOW, scheduledStateOverride, workingConfiguration);
}

Expand All @@ -119,10 +119,10 @@ public Action getAction() {
}

/**
* Returns the ScheduledState override, or {@code null} if the versioned flow's
* ScheduledState should be used.
* Returns the scheduled state override, or {@code null} if the versioned flow's
* scheduled state should be used.
*/
public ScheduledState getScheduledStateOverride() {
public VersionedConnectorState getScheduledStateOverride() {
return scheduledStateOverride;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,16 @@ public class ConnectorEndpointMerger extends AbstractSingleEntityEndpoint<Connec
public static final Pattern CONNECTORS_URI_PATTERN = Pattern.compile("/nifi-api/connectors");
public static final Pattern CONNECTOR_URI_PATTERN = Pattern.compile("/nifi-api/connectors/[a-f0-9\\-]{36}");
public static final Pattern CONNECTOR_RUN_STATUS_URI_PATTERN = Pattern.compile("/nifi-api/connectors/[a-f0-9\\-]{36}/run-status");
public static final Pattern CONNECTOR_TROUBLESHOOTING_URI_PATTERN = Pattern.compile("/nifi-api/connectors/[a-f0-9\\-]{36}/troubleshooting");

@Override
public boolean canHandle(final URI uri, final String method) {
if (("GET".equalsIgnoreCase(method) || "PUT".equalsIgnoreCase(method)) && (CONNECTOR_URI_PATTERN.matcher(uri.getPath()).matches())) {
return true;
} else if ("PUT".equalsIgnoreCase(method) && CONNECTOR_RUN_STATUS_URI_PATTERN.matcher(uri.getPath()).matches()) {
return true;
} else if (("POST".equalsIgnoreCase(method) || "DELETE".equalsIgnoreCase(method)) && CONNECTOR_TROUBLESHOOTING_URI_PATTERN.matcher(uri.getPath()).matches()) {
return true;
} else if ("POST".equalsIgnoreCase(method) && CONNECTORS_URI_PATTERN.matcher(uri.getPath()).matches()) {
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.nifi.authorization.resource.ResourceFactory;
import org.apache.nifi.authorization.resource.ResourceType;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.connector.ConnectorNode;
import org.apache.nifi.components.state.StateManager;
import org.apache.nifi.components.state.StateManagerProvider;
import org.apache.nifi.components.validation.ValidationStatus;
Expand Down Expand Up @@ -346,6 +347,22 @@ public Optional<String> getConnectorIdentifier() {
return Optional.ofNullable(connectorId);
}

@Override
public Optional<ConnectorNode> findOwningConnector() {
ProcessGroup group = this;
while (group != null) {
final Optional<String> owningConnectorId = group.getConnectorIdentifier();
if (owningConnectorId.isPresent()) {
final ConnectorNode connectorNode = flowManager.getConnector(owningConnectorId.get());
return Optional.ofNullable(connectorNode);
}

group = group.getParent();
}

return Optional.empty();
}

@Override
public void setPosition(final Position position) {
this.position.set(position);
Expand Down Expand Up @@ -3898,6 +3915,48 @@ public void updateFlow(final VersionedExternalFlow proposedSnapshot, final Strin
synchronizeFlow(proposedSnapshot, synchronizationOptions, flowMappingOptions);
}

@Override
public void restoreFlowPreservingIdentifiers(final VersionedExternalFlow proposedSnapshot) {
// Use the Instance Identifier captured in the persisted flow as the runtime identifier for every component. This is
// required so that Connection identifiers (and therefore FlowFile queue identifiers) match what was in use before
// the flow was persisted. Without this, queued FlowFiles in the FlowFile Repository cannot be re-associated with
// their Connections upon restore.
final ComponentIdGenerator idGenerator = (proposedId, instanceId, destinationGroupId) -> instanceId;
final VersionedComponentStateLookup stateLookup = VersionedComponentStateLookup.IDENTITY_LOOKUP;
final ComponentScheduler componentScheduler = new DefaultComponentScheduler(controllerServiceProvider, stateLookup);

final FlowSynchronizationOptions synchronizationOptions = new FlowSynchronizationOptions.Builder()
.componentIdGenerator(idGenerator)
.componentComparisonIdLookup(VersionedComponent::getInstanceIdentifier)
.componentScheduler(componentScheduler)
.ignoreLocalModifications(true)
.updateDescendantVersionedFlows(true)
.updateGroupSettings(true)
.updateRpgUrls(false)
.propertyDecryptor(encryptor::decrypt)
.build();

// Sensitive property values in the proposed snapshot were encrypted using the same PropertyEncryptor when the snapshot
// was persisted (for example, when a Connector-managed flow is persisted in Troubleshooting mode). The currently loaded
// flow therefore must also be mapped with an equivalent SensitiveValueEncryptor so the comparison between "current" and
// "proposed" sensitive values operates on matching ciphertext; otherwise every sensitive property appears to differ and
// the decrypted value written back to the live component is the encrypted payload rather than the plaintext (or parameter
// reference) that was originally captured.
final FlowMappingOptions flowMappingOptions = new FlowMappingOptions.Builder()
.mapSensitiveConfiguration(true)
.mapPropertyDescriptors(true)
.stateLookup(stateLookup)
.sensitiveValueEncryptor(encryptor::encrypt)
.componentIdLookup(ComponentIdLookup.VERSIONED_OR_GENERATE)
.mapInstanceIdentifiers(true)
.mapControllerServiceReferencesToVersionedId(true)
.mapFlowRegistryClientId(false)
.mapAssetReferences(false)
.build();

synchronizeFlow(proposedSnapshot, synchronizationOptions, flowMappingOptions);
}

private ProcessContext createProcessContext(final ProcessorNode processorNode) {
final org.apache.nifi.processor.Processor processor = processorNode.getProcessor();
final Class<?> componentClass = processor == null ? null : processor.getClass();
Expand Down
Loading
Loading