Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,12 @@ public class ParameterProviderSecretsManager implements SecretsManager {
private Duration cacheDuration;
private final Map<String, CachedSecret> secretCache = new ConcurrentHashMap<>();

// Per-ParameterProvider-id deduplication for the WARN log emitted whenever a SecretReference
// resolution is skipped because its backing provider is not VALID. An entry is present for any
// provider id we have already surfaced via WARN; the value is the status that was logged so the
// same WARN is not repeated until the status either changes or the provider returns to VALID.
private final Map<String, ValidationStatus> lastLoggedStatus = new ConcurrentHashMap<>();

private record CachedSecret(Secret secret, long timestampNanos) {
}

Expand Down Expand Up @@ -86,10 +92,15 @@ public Set<SecretProvider> getSecretProviders() {
validationStatus = parameterProviderNode.performValidation();
}
if (validationStatus != ValidationStatus.VALID) {
logger.debug("Will not use Parameter Provider {} as a Secret Provider because it is not valid", parameterProviderNode.getName());
logSkippedInvalidProvider(parameterProviderNode, validationStatus);
continue;
}

// Clear the WARN-dedup state so that a future INVALID transition for this provider is logged again.
final String providerId = parameterProviderNode.getIdentifier();
if (providerId != null) {
lastLoggedStatus.remove(providerId);
}
providers.add(new ParameterProviderSecretProvider(parameterProviderNode));
}

Expand Down Expand Up @@ -143,26 +154,23 @@ public Map<SecretReference, Secret> getSecrets(final Set<SecretReference> secret
private Map<SecretReference, Secret> fetchSecretsWithoutCache(final Set<SecretReference> secretReferences) {
final Set<SecretProvider> providers = getSecretProviders();

// Partition secret references by Provider
// Partition secret references by Provider. References whose provider is non-VALID or absent
// are recorded with a null Secret so that callers receive an explicit entry for every input.
final Map<SecretProvider, Set<SecretReference>> referencesByProvider = new HashMap<>();
final Map<SecretReference, Secret> secrets = new HashMap<>();
for (final SecretReference secretReference : secretReferences) {
final SecretProvider provider = findProvider(secretReference, providers);
referencesByProvider.computeIfAbsent(provider, k -> new HashSet<>()).add(secretReference);
if (provider == null) {
secrets.put(secretReference, null);
} else {
referencesByProvider.computeIfAbsent(provider, k -> new HashSet<>()).add(secretReference);
}
}

final Map<SecretReference, Secret> secrets = new HashMap<>();
for (final Map.Entry<SecretProvider, Set<SecretReference>> entry : referencesByProvider.entrySet()) {
final SecretProvider provider = entry.getKey();
final Set<SecretReference> references = entry.getValue();

// If no provider found, be sure to map to a null Secret rather than skipping
if (provider == null) {
for (final SecretReference secretReference : references) {
secrets.put(secretReference, null);
}
continue;
}

final List<String> secretNames = references.stream()
.map(SecretReference::getFullyQualifiedName)
.filter(Objects::nonNull)
Expand Down Expand Up @@ -192,6 +200,7 @@ private Map<SecretReference, Secret> fetchSecretsWithCache(final Set<SecretRefer

// Partition references into cache hits vs. misses that need fetching
final Map<SecretProvider, Set<SecretReference>> uncachedByProvider = new HashMap<>();

for (final SecretReference secretReference : secretReferences) {
final String fqn = secretReference.getFullyQualifiedName();

Expand All @@ -205,21 +214,18 @@ private Map<SecretReference, Secret> fetchSecretsWithCache(final Set<SecretRefer
}

final SecretProvider provider = findProvider(secretReference, providers);
uncachedByProvider.computeIfAbsent(provider, k -> new HashSet<>()).add(secretReference);
if (provider == null) {
results.put(secretReference, null);
} else {
uncachedByProvider.computeIfAbsent(provider, k -> new HashSet<>()).add(secretReference);
}
}

// Batch fetch uncached secrets grouped by provider
for (final Map.Entry<SecretProvider, Set<SecretReference>> entry : uncachedByProvider.entrySet()) {
final SecretProvider provider = entry.getKey();
final Set<SecretReference> references = entry.getValue();

if (provider == null) {
for (final SecretReference secretReference : references) {
results.put(secretReference, null);
}
continue;
}

final List<String> secretNames = references.stream()
.map(SecretReference::getFullyQualifiedName)
.filter(Objects::nonNull)
Expand Down Expand Up @@ -266,6 +272,25 @@ private void cacheSecret(final String fqn, final Secret secret) {
}
}

private void logSkippedInvalidProvider(final ParameterProviderNode parameterProviderNode, final ValidationStatus status) {
final String providerId = parameterProviderNode.getIdentifier();
if (providerId == null) {
return;
}

// Treat a null status (e.g., a mock or transient lookup failure) as INVALID for deduplication
// purposes so the tracked status is always well-defined.
final ValidationStatus effectiveStatus = status == null ? ValidationStatus.INVALID : status;
final ValidationStatus priorStatus = lastLoggedStatus.put(providerId, effectiveStatus);
if (priorStatus == effectiveStatus) {
return;
}

logger.warn("Skipping Parameter Provider [{}] (id={}) as a Secret Provider because its current validation status is {}; "
+ "SecretReferences backed by this provider will resolve to null until it returns to VALID",
parameterProviderNode.getName(), providerId, effectiveStatus);
}

private SecretProvider findProvider(final SecretReference secretReference, final Set<SecretProvider> providers) {
// Search first by Provider ID, if it's provided.
final String providerId = secretReference.getProviderId();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -575,5 +575,76 @@ private ParameterProviderSecretsManager createManagerWithCacheDuration(final Str
manager.initialize(initContext);
return manager;
}

@Test
public void testGetSecretsReturnsNullValueWhenProviderIsInvalid() {
final FlowManager flowManager = mock(FlowManager.class);
final ParameterProviderNode invalidProvider = createMockedParameterProviderNode(PROVIDER_1_ID, PROVIDER_1_NAME, GROUP_1_NAME,
ValidationStatus.INVALID, createParameter(SECRET_1_NAME, SECRET_1_DESCRIPTION, SECRET_1_VALUE));
// performValidation should also return INVALID so the provider is consistently filtered out.
when(invalidProvider.performValidation()).thenReturn(ValidationStatus.INVALID);
when(flowManager.getAllParameterProviders()).thenReturn(Set.of(invalidProvider));

final ParameterProviderSecretsManager manager = new ParameterProviderSecretsManager();
manager.initialize(new StandardSecretsManagerInitializationContext(flowManager));

final SecretReference reference = createSecretReference(PROVIDER_1_ID, PROVIDER_1_NAME, SECRET_1_NAME);

final Map<SecretReference, Secret> results = manager.getSecrets(Set.of(reference));

assertEquals(1, results.size());
assertTrue(results.containsKey(reference));
assertNull(results.get(reference));
}

@Test
public void testGetSecretsResolvesReferenceOnceProviderTransitionsToValid() {
final FlowManager flowManager = mock(FlowManager.class);
final ParameterProviderNode flippingProvider = createMockedParameterProviderNode(PROVIDER_1_ID, PROVIDER_1_NAME, GROUP_1_NAME,
ValidationStatus.INVALID, createParameter(SECRET_1_NAME, SECRET_1_DESCRIPTION, SECRET_1_VALUE));
when(flippingProvider.performValidation()).thenReturn(ValidationStatus.INVALID);
when(flowManager.getAllParameterProviders()).thenReturn(Set.of(flippingProvider));

final ParameterProviderSecretsManager manager = new ParameterProviderSecretsManager();
manager.initialize(new StandardSecretsManagerInitializationContext(flowManager,
Map.of(NiFiProperties.SECRETS_MANAGER_CACHE_DURATION, DEFAULT_CACHE_DURATION)));

final SecretReference reference = createSecretReference(PROVIDER_1_ID, PROVIDER_1_NAME, SECRET_1_NAME);

// While the provider is INVALID the secret is unresolvable.
assertNull(manager.getSecrets(Set.of(reference)).get(reference));

// Once the provider transitions to VALID a follow-up call resolves the value. The cache only
// stores non-null secrets, so the prior unresolved attempt does not block this re-attempt.
when(flippingProvider.getValidationStatus()).thenReturn(ValidationStatus.VALID);
when(flippingProvider.performValidation()).thenReturn(ValidationStatus.VALID);

final Secret resolved = manager.getSecrets(Set.of(reference)).get(reference);
assertNotNull(resolved);
assertEquals(SECRET_1_VALUE, resolved.getValue());
}

@Test
public void testGetSecretProvidersFiltersConsistentlyAcrossValidationStatusTransitions() {
final FlowManager flowManager = mock(FlowManager.class);
final ParameterProviderNode provider = createMockedParameterProviderNode(PROVIDER_1_ID, PROVIDER_1_NAME, GROUP_1_NAME,
ValidationStatus.INVALID, createParameter(SECRET_1_NAME, SECRET_1_DESCRIPTION, SECRET_1_VALUE));
when(provider.performValidation()).thenReturn(ValidationStatus.INVALID);
when(flowManager.getAllParameterProviders()).thenReturn(Set.of(provider));

final ParameterProviderSecretsManager manager = new ParameterProviderSecretsManager();
manager.initialize(new StandardSecretsManagerInitializationContext(flowManager));

assertTrue(manager.getSecretProviders().isEmpty());
assertTrue(manager.getSecretProviders().isEmpty());

when(provider.getValidationStatus()).thenReturn(ValidationStatus.VALID);
when(provider.performValidation()).thenReturn(ValidationStatus.VALID);
assertEquals(1, manager.getSecretProviders().size());

when(provider.getValidationStatus()).thenReturn(ValidationStatus.INVALID);
when(provider.performValidation()).thenReturn(ValidationStatus.INVALID);
assertTrue(manager.getSecretProviders().isEmpty());
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,27 @@ public interface ConnectorNode extends ComponentAuthorizable, VersionedComponent
*/
void setConfiguration(String configurationStepName, StepConfiguration configuration) throws FlowUpdateException;

/**
* Replaces the configuration of the named step on the working flow context with the given configuration.
* Unlike {@link #setConfiguration(String, StepConfiguration)}, which merges the incoming properties with
* any existing properties for the step, this method treats the supplied configuration as the authoritative
* full state for the step: any property not present in {@code configuration} is removed from the step.
*
* <p>If applying the configuration changes either the raw or the resolved property values, the Connector
* is notified via {@link Connector#onConfigurationStepConfigured} so the embedded flow and Parameter
* Context can be brought up to date. If nothing changed, no notification is performed.</p>
*
* <p>Intended for use by the framework when reconciling the working flow context against an external
* {@link ConnectorConfigurationProvider}, whose view is treated as authoritative. This method should
* only be invoked via the ConnectorRepository.</p>
*
* @param configurationStepName the name of the configuration step being replaced
* (must match one of the names returned by {@link Connector#getConfigurationSteps()})
* @param configuration the full configuration for the given configuration step
* @throws FlowUpdateException if unable to apply the configuration changes
*/
void replaceWorkingConfiguration(String configurationStepName, StepConfiguration configuration) throws FlowUpdateException;

void transitionStateForUpdating();

void prepareForUpdate() throws FlowUpdateException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -333,18 +333,30 @@ public void setConfiguration(final String stepName, final StepConfiguration conf
}

private void setConfiguration(final String stepName, final StepConfiguration configuration, final boolean forceOnConfigurationStepConfigured) throws FlowUpdateException {
// Update properties and check if the configuration changed.
final ConfigurationUpdateResult updateResult = workingFlowContext.getConfigurationContext().setProperties(stepName, configuration);
if (updateResult == ConfigurationUpdateResult.NO_CHANGES && !forceOnConfigurationStepConfigured) {
return;
}
notifyStepConfigured(stepName);
}

@Override
public void replaceWorkingConfiguration(final String stepName, final StepConfiguration configuration) throws FlowUpdateException {
// The configuration provider's view is authoritative: any property absent from the provided
// configuration is removed from the step.
final ConfigurationUpdateResult updateResult = workingFlowContext.getConfigurationContext().replaceProperties(stepName, configuration);
if (updateResult == ConfigurationUpdateResult.NO_CHANGES) {
return;
}
notifyStepConfigured(stepName);
}

// If there were changes, trigger Processor to be notified of the change.
private void notifyStepConfigured(final String stepName) throws FlowUpdateException {
final Connector connector = connectorDetails.getConnector();
try (final NarCloseable ignored = NarCloseable.withComponentNarLoader(extensionManager, connector.getClass(), getIdentifier())) {
logger.debug("Notifying {} of configuration change for configuration step {}", this, stepName);
connector.onConfigurationStepConfigured(stepName, workingFlowContext);
logger.debug("Successfully set configuration for step {} on {}", stepName, this);
logger.debug("Successfully notified {} of configuration change for step {}", this, stepName);
} catch (final FlowUpdateException e) {
throw e;
} catch (final Exception e) {
Expand Down Expand Up @@ -863,29 +875,19 @@ private void recreateWorkingFlowContext() {

getComponentLog().info("Working Flow Context has been recreated");

synchronizeWorkingFlowParameters();
}

/**
* Re-triggers {@link Connector#onConfigurationStepConfigured} for every configured step in
* the working flow context. This ensures that flow parameters derived from the configuration
* (e.g., resolved asset paths) are fresh, even when the working context was just recreated
* from the active flow whose parameter values may be stale.
*
* <p>Skipped when {@code initializationContext} is {@code null} because the connector has
* not yet been initialized and there is no flow to update.</p>
*/
private void synchronizeWorkingFlowParameters() {
// Re-fire onConfigurationStepConfigured for every step so flow parameters derived from the
// configuration (e.g., resolved asset paths, secret values) are refreshed against the new
// working context. Step failures are logged so the remaining steps can still be refreshed.
// Skipped before the connector has been initialized because there is no flow to update yet.
if (initializationContext == null) {
return;
}

final ConnectorConfiguration config = workingFlowContext.getConfigurationContext().toConnectorConfiguration();
for (final NamedStepConfiguration stepConfig : config.getNamedStepConfigurations()) {
try {
setConfiguration(stepConfig.stepName(), stepConfig.configuration(), true);
} catch (final Exception e) {
logger.warn("Failed to synchronize working flow parameters for step [{}] of {}",
logger.warn("Failed to refresh resolved configuration for step [{}] of {}",
stepConfig.stepName(), this, e);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -813,15 +813,26 @@ private void syncFromProvider(final ConnectorNode connector) {
}

final List<VersionedConfigurationStep> workingFlowConfiguration = config.getWorkingFlowConfiguration();
if (workingFlowConfiguration != null) {
// Enrich provider-sourced SECRET_REFERENCE values with providerId before they are
// converted into the in-memory ConnectorValueReference graph.
resolveSecretReferencesFromProvider(workingFlowConfiguration);

final MutableConnectorConfigurationContext workingConfigContext = connector.getWorkingFlowContext().getConfigurationContext();
for (final VersionedConfigurationStep step : workingFlowConfiguration) {
final StepConfiguration stepConfiguration = toStepConfiguration(step);
workingConfigContext.replaceProperties(step.getName(), stepConfiguration);

if (workingFlowConfiguration == null) {
return;
}

// Enrich provider-sourced SECRET_REFERENCE values with providerId before they are
// converted into the in-memory ConnectorValueReference graph.
resolveSecretReferencesFromProvider(workingFlowConfiguration);

// Replace each step's working configuration on the connector. Routing through the connector
// (rather than touching the configuration context directly) ensures it is notified via
// onConfigurationStepConfigured when raw or resolved property values changed, so the embedded
// flow's Parameter Context picks up new values (e.g., rotated secrets) without an explicit save.
for (final VersionedConfigurationStep step : workingFlowConfiguration) {
final StepConfiguration stepConfiguration = toStepConfiguration(step);
try {
connector.replaceWorkingConfiguration(step.getName(), stepConfiguration);
} catch (final Exception e) {
logger.warn("Failed to replace working configuration for step [{}] on {} during sync from provider; continuing with remaining steps",
step.getName(), connector, e);
}
}
}
Expand Down
Loading
Loading