diff --git a/nifi-framework-api/src/main/java/org/apache/nifi/diagnostics/ThreadDumpTask.java b/nifi-framework-api/src/main/java/org/apache/nifi/diagnostics/ThreadDumpTask.java index 74228a2d0aa6..f6d02bf52294 100644 --- a/nifi-framework-api/src/main/java/org/apache/nifi/diagnostics/ThreadDumpTask.java +++ b/nifi-framework-api/src/main/java/org/apache/nifi/diagnostics/ThreadDumpTask.java @@ -16,116 +16,51 @@ */ package org.apache.nifi.diagnostics; -import java.lang.management.LockInfo; +import com.sun.management.HotSpotDiagnosticMXBean; + +import java.io.IOException; import java.lang.management.ManagementFactory; -import java.lang.management.MonitorInfo; -import java.lang.management.ThreadInfo; -import java.lang.management.ThreadMXBean; -import java.util.ArrayList; +import java.nio.file.Files; +import java.nio.file.Path; import java.util.Collections; -import java.util.Comparator; -import java.util.List; -import java.util.Objects; +/** + * Captures a textual dump of every thread in the JVM. Uses + * {@link HotSpotDiagnosticMXBean#dumpThreads(String, HotSpotDiagnosticMXBean.ThreadDumpFormat)} + * so that virtual threads are included in the dump alongside platform threads. 
+ */ public class ThreadDumpTask implements DiagnosticTask { - @Override - public DiagnosticsDumpElement captureDump(boolean verbose) { - final ThreadMXBean mbean = ManagementFactory.getThreadMXBean(); - - final ThreadInfo[] infos = mbean.dumpAllThreads(true, true); - final long[] deadlockedThreadIds = mbean.findDeadlockedThreads(); - final long[] monitorDeadlockThreadIds = mbean.findMonitorDeadlockedThreads(); - - final List sortedInfos = new ArrayList<>(infos.length); - Collections.addAll(sortedInfos, infos); - sortedInfos.sort(new Comparator<>() { - @Override - public int compare(ThreadInfo o1, ThreadInfo o2) { - return o1.getThreadName().toLowerCase().compareTo(o2.getThreadName().toLowerCase()); - } - }); - - final StringBuilder sb = new StringBuilder(); - for (final ThreadInfo info : sortedInfos) { - sb.append("\n"); - sb.append("\"").append(info.getThreadName()).append("\" Id="); - sb.append(info.getThreadId()).append(" "); - sb.append(info.getThreadState().toString()).append(" "); - - switch (info.getThreadState()) { - case BLOCKED: - case TIMED_WAITING: - case WAITING: - sb.append(" on "); - sb.append(info.getLockInfo()); - break; - default: - break; - } - - if (info.isSuspended()) { - sb.append(" (suspended)"); - } - if (info.isInNative()) { - sb.append(" (in native code)"); - } - - if (deadlockedThreadIds != null) { - for (final long id : deadlockedThreadIds) { - if (id == info.getThreadId()) { - sb.append(" ** DEADLOCKED THREAD **"); - } - } - } - if (monitorDeadlockThreadIds != null) { - for (final long id : monitorDeadlockThreadIds) { - if (id == info.getThreadId()) { - sb.append(" ** MONITOR-DEADLOCKED THREAD **"); - } - } - } - - final StackTraceElement[] stackTraces = info.getStackTrace(); - for (final StackTraceElement element : stackTraces) { - sb.append("\n\tat ").append(element); - - final MonitorInfo[] monitors = info.getLockedMonitors(); - for (final MonitorInfo monitor : monitors) { - if (Objects.equals(monitor.getLockedStackFrame(), 
element)) { - sb.append("\n\t- waiting on ").append(monitor); - } - } + @Override + public DiagnosticsDumpElement captureDump(final boolean verbose) { + String threadDump; + + Path tempDirectory = null; + try { + final HotSpotDiagnosticMXBean diagnosticMXBean = ManagementFactory.getPlatformMXBean(HotSpotDiagnosticMXBean.class); + // dumpThreads requires that the destination file does not already exist. Creating a private + // temporary directory and writing to a fresh filename inside it avoids a time-of-check to + // time-of-use race that would exist if we created a temp file and then deleted it before the + // dumpThreads call. + tempDirectory = Files.createTempDirectory("nifi-thread-dump-"); + final Path tempFile = tempDirectory.resolve("thread-dump.txt"); + try { + diagnosticMXBean.dumpThreads(tempFile.toString(), HotSpotDiagnosticMXBean.ThreadDumpFormat.TEXT_PLAIN); + threadDump = Files.readString(tempFile); + } finally { + Files.deleteIfExists(tempFile); } - - final LockInfo[] lockInfos = info.getLockedSynchronizers(); - if (lockInfos.length > 0) { - sb.append("\n\t"); - sb.append("Number of Locked Synchronizers: ").append(lockInfos.length); - for (final LockInfo lockInfo : lockInfos) { - sb.append("\n\t- ").append(lockInfo.toString()); + } catch (final IOException e) { + threadDump = "Failed to capture thread dump: " + e.getMessage(); + } finally { + if (tempDirectory != null) { + try { + Files.deleteIfExists(tempDirectory); + } catch (final IOException ignored) { } } - - sb.append("\n"); - } - - if (deadlockedThreadIds != null && deadlockedThreadIds.length > 0) { - sb.append("\n\nDEADLOCK DETECTED!"); - sb.append("\nThe following thread IDs are deadlocked:"); - for (final long id : deadlockedThreadIds) { - sb.append("\n").append(id); - } - } - - if (monitorDeadlockThreadIds != null && monitorDeadlockThreadIds.length > 0) { - sb.append("\n\nMONITOR DEADLOCK DETECTED!"); - sb.append("\nThe following thread IDs are deadlocked:"); - for (final long id : 
monitorDeadlockThreadIds) { - sb.append("\n").append(id); - } } - return new StandardDiagnosticsDumpElement("Thread Dump", Collections.singletonList(sb.toString())); + return new StandardDiagnosticsDumpElement("Thread Dump", Collections.singletonList(threadDump)); } } diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/scheduling/LifecycleState.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/scheduling/LifecycleState.java index a3095a86ea1e..08a397fc6ba9 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/scheduling/LifecycleState.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/scheduling/LifecycleState.java @@ -22,13 +22,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Collection; -import java.util.Collections; -import java.util.HashSet; import java.util.Map; -import java.util.Set; import java.util.WeakHashMap; -import java.util.concurrent.ScheduledFuture; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -38,7 +33,6 @@ public class LifecycleState { private final Object componentId; private final AtomicInteger activeThreadCount = new AtomicInteger(0); private final AtomicBoolean scheduled = new AtomicBoolean(false); - private final Set> futures = new HashSet<>(); private final AtomicBoolean mustCallOnStoppedMethods = new AtomicBoolean(false); private volatile long lastStopTime = -1; private volatile boolean terminated = false; @@ -111,7 +105,16 @@ public synchronized void setScheduled(final boolean scheduled) { mustCallOnStoppedMethods.set(true); if (!scheduled) { - lastStopTime = System.currentTimeMillis(); + // Force the stop time to strictly advance. 
System.currentTimeMillis() has millisecond resolution, so a + // second stop that happens within the same millisecond as the previous one would otherwise read the same + // value; that would defeat the race check in VirtualThreadSchedulingAgent, which relies on this field + // changing to signal that a prior scheduling generation has ended. + final long previousStopTime = lastStopTime; + long nextStopTime = System.currentTimeMillis(); + if (nextStopTime <= previousStopTime) { + nextStopTime = previousStopTime + 1L; + } + lastStopTime = nextStopTime; } } @@ -135,25 +138,6 @@ public boolean mustCallOnStoppedMethods() { return mustCallOnStoppedMethods.getAndSet(false); } - /** - * Establishes the list of relevant futures for this processor. Replaces any previously held futures. - * - * @param newFutures futures - */ - public synchronized void setFutures(final Collection> newFutures) { - futures.clear(); - futures.addAll(newFutures); - } - - public synchronized void replaceFuture(final ScheduledFuture oldFuture, final ScheduledFuture newFuture) { - futures.remove(oldFuture); - futures.add(newFuture); - } - - public synchronized Set> getFutures() { - return Collections.unmodifiableSet(futures); - } - public synchronized void terminate() { this.terminated = true; activeThreadCount.set(0); diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java index 8b92b5e411ed..584aa0456e6d 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/FlowController.java @@ -130,12 +130,11 @@ import org.apache.nifi.controller.repository.claim.StandardContentClaim; import 
org.apache.nifi.controller.repository.claim.StandardResourceClaimManager; import org.apache.nifi.controller.repository.io.LimitedInputStream; -import org.apache.nifi.controller.scheduling.CronSchedulingAgent; import org.apache.nifi.controller.scheduling.LifecycleStateManager; import org.apache.nifi.controller.scheduling.RepositoryContextFactory; import org.apache.nifi.controller.scheduling.StandardLifecycleStateManager; import org.apache.nifi.controller.scheduling.StandardProcessScheduler; -import org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent; +import org.apache.nifi.controller.scheduling.VirtualThreadSchedulingAgent; import org.apache.nifi.controller.serialization.FlowSerializationException; import org.apache.nifi.controller.serialization.FlowSerializer; import org.apache.nifi.controller.serialization.FlowSynchronizationException; @@ -292,8 +291,17 @@ public class FlowController implements ReportingTaskProvider, FlowAnalysisRulePr private static final String ZOOKEEPER_STATE_PROVIDER_SERVER_CLASS = "org.apache.nifi.controller.state.providers.zookeeper.server.ZooKeeperStateProviderServer"; + /** + * Fixed size of the thread pool that runs framework tasks (status-history capture, analytics predictions, + * Python extension discovery, and the component start/stop lifecycle). Processor and reporting-task + * invocations are no longer dispatched on this pool -- they run on virtual threads managed by + * {@link VirtualThreadSchedulingAgent} -- so this pool only needs to accommodate the background framework work. 
+ */ + private static final int FRAMEWORK_TASK_POOL_SIZE = 8; + private final AtomicInteger maxTimerDrivenThreads; - private final AtomicReference timerDrivenEngineRef; + private final AtomicReference frameworkTaskEngineRef; + private final VirtualThreadSchedulingAgent virtualThreadSchedulingAgent; private final ContentRepository contentRepository; private final FlowFileRepository flowFileRepository; @@ -551,7 +559,7 @@ private FlowController( stateManagerProvider.enableClusterProvider(); } - timerDrivenEngineRef = new AtomicReference<>(new FlowEngine(maxTimerDrivenThreads.get(), "Timer-Driven Process")); + frameworkTaskEngineRef = new AtomicReference<>(new FlowEngine(FRAMEWORK_TASK_POOL_SIZE, "FrameworkTaskEngine")); final FlowFileRepository flowFileRepo = createFlowFileRepository(nifiProperties, extensionManager, resourceClaimManager); flowFileRepository = flowFileRepo; @@ -598,7 +606,7 @@ private FlowController( lifecycleStateManager = new StandardLifecycleStateManager(); reloadComponent = new StandardReloadComponent(this); - processScheduler = new StandardProcessScheduler(timerDrivenEngineRef.get(), this, stateManagerProvider, this.nifiProperties, lifecycleStateManager); + processScheduler = new StandardProcessScheduler(frameworkTaskEngineRef.get(), this, stateManagerProvider, this.nifiProperties, lifecycleStateManager); parameterContextManager = new StandardParameterContextManager(); final long maxAppendableBytes = getMaxAppendableBytes(); @@ -672,10 +680,9 @@ private FlowController( flowAnalyzer.initialize(controllerServiceProvider); } - final CronSchedulingAgent cronSchedulingAgent = new CronSchedulingAgent(this, timerDrivenEngineRef.get(), repositoryContextFactory); - final TimerDrivenSchedulingAgent timerDrivenAgent = new TimerDrivenSchedulingAgent(this, timerDrivenEngineRef.get(), repositoryContextFactory, this.nifiProperties); - processScheduler.setSchedulingAgent(SchedulingStrategy.TIMER_DRIVEN, timerDrivenAgent); - 
processScheduler.setSchedulingAgent(SchedulingStrategy.CRON_DRIVEN, cronSchedulingAgent); + this.virtualThreadSchedulingAgent = new VirtualThreadSchedulingAgent(this, repositoryContextFactory, this.nifiProperties, maxTimerDrivenThreads.get()); + processScheduler.setSchedulingAgent(SchedulingStrategy.TIMER_DRIVEN, virtualThreadSchedulingAgent); + processScheduler.setSchedulingAgent(SchedulingStrategy.CRON_DRIVEN, virtualThreadSchedulingAgent); startConnectablesAfterInitialization = new HashSet<>(); startRemoteGroupPortsAfterInitialization = new HashSet<>(); @@ -789,7 +796,7 @@ private FlowController( analyticsEngine = new CachingConnectionStatusAnalyticsEngine(flowManager, statusHistoryRepository, statusAnalyticsModelMapFactory, predictionIntervalMillis, queryIntervalMillis, modelScoreName, modelScoreThreshold); - timerDrivenEngineRef.get().scheduleWithFixedDelay(() -> { + frameworkTaskEngineRef.get().scheduleWithFixedDelay(() -> { try { Long startTs = System.currentTimeMillis(); RepositoryStatusReport statusReport = flowFileEventRepository.reportTransferEvents(startTs); @@ -810,7 +817,7 @@ private FlowController( eventAccess = new StandardEventAccess(flowManager, flowFileEventRepository, processScheduler, authorizer, provenanceRepository, auditService, analyticsEngine, flowFileRepository, contentRepository); - timerDrivenEngineRef.get().scheduleWithFixedDelay(() -> { + frameworkTaskEngineRef.get().scheduleWithFixedDelay(() -> { try { statusHistoryRepository.capture(getNodeStatusSnapshot(), eventAccess.getControllerStatus(), getGarbageCollectionStatus(), new Date()); } catch (final Exception e) { @@ -1304,7 +1311,7 @@ public void initializeFlow(final QueueProvider queueProvider) throws IOException notifyComponentsConfigurationRestored(); - timerDrivenEngineRef.get().scheduleWithFixedDelay(() -> { + frameworkTaskEngineRef.get().scheduleWithFixedDelay(() -> { try { updateRemoteProcessGroups(); } catch (final Throwable t) { @@ -1318,7 +1325,7 @@ public void 
initializeFlow(final QueueProvider queueProvider) throws IOException LOG.info("Scheduled Flow Registry synchronization every {}", registrySyncInterval); // Schedule the flow registry synchronization task - timerDrivenEngineRef.get().scheduleWithFixedDelay(() -> { + frameworkTaskEngineRef.get().scheduleWithFixedDelay(() -> { final ProcessGroup rootGroup = flowManager.getRootGroup(); final List allGroups = rootGroup.findAllProcessGroups(); allGroups.add(rootGroup); @@ -1559,7 +1566,7 @@ public void trigger(final ComponentNode component) { scheduleLongRunningTaskMonitor(); final Runnable discoverPythonExtensions = () -> extensionManager.discoverNewPythonExtensions(pythonBundle); - timerDrivenEngineRef.get().scheduleWithFixedDelay(discoverPythonExtensions, 1, 1, TimeUnit.MINUTES); + frameworkTaskEngineRef.get().scheduleWithFixedDelay(discoverPythonExtensions, 1, 1, TimeUnit.MINUTES); } finally { writeLock.unlock("onFlowInitialized"); } @@ -1814,7 +1821,7 @@ public Authorizer getAuthorizer() { public boolean isTerminated() { this.readLock.lock(); try { - return null == this.timerDrivenEngineRef.get() || this.timerDrivenEngineRef.get().isTerminated(); + return null == this.frameworkTaskEngineRef.get() || this.frameworkTaskEngineRef.get().isTerminated(); } finally { this.readLock.unlock("isTerminated"); } @@ -1838,7 +1845,7 @@ public void shutdown(final boolean kill) { readLock.lock(); try { - if (isTerminated() || timerDrivenEngineRef.get().isTerminating()) { + if (isTerminated() || frameworkTaskEngineRef.get().isTerminating()) { throw new IllegalStateException("Controller already stopped or still stopping..."); } @@ -1888,10 +1895,15 @@ public void shutdown(final boolean kill) { } if (kill) { - this.timerDrivenEngineRef.get().shutdownNow(); + this.frameworkTaskEngineRef.get().shutdownNow(); + // frameworkTaskEngineRef.shutdownNow() only interrupts threads in the platform-thread framework pool. 
+ // Processor/reporting-task work runs on virtual threads owned by the VirtualThreadSchedulingAgent, so + // those threads must be interrupted explicitly on the kill path rather than waiting for the final + // processScheduler.shutdown() call below to do it. This is idempotent with the later shutdown() call. + virtualThreadSchedulingAgent.shutdown(); LOG.info("Initiated immediate shutdown of flow controller..."); } else { - this.timerDrivenEngineRef.get().shutdown(); + this.frameworkTaskEngineRef.get().shutdown(); LOG.info("Initiated graceful shutdown of flow controller...waiting up to {} seconds", gracefulShutdownSeconds); } @@ -1899,7 +1911,7 @@ public void shutdown(final boolean kill) { // Give thread pool up to the configured amount of time to finish, but no less than 2 seconds, // in order to allow for a more graceful shutdown. final long millisToWait = Math.max(2000, shutdownEnd - System.currentTimeMillis()); - this.timerDrivenEngineRef.get().awaitTermination(millisToWait, TimeUnit.MILLISECONDS); + this.frameworkTaskEngineRef.get().awaitTermination(millisToWait, TimeUnit.MILLISECONDS); } catch (final InterruptedException ie) { LOG.info("Interrupted while waiting for controller termination."); } @@ -1910,7 +1922,7 @@ public void shutdown(final boolean kill) { LOG.warn("Unable to shut down FlowFileRepository", t); } - if (this.timerDrivenEngineRef.get().isTerminated()) { + if (this.frameworkTaskEngineRef.get().isTerminated()) { LOG.info("Controller has been terminated successfully."); } else { LOG.warn("Controller hasn't terminated properly. 
There exists an uninterruptable thread that " @@ -2133,39 +2145,21 @@ public int getMaxTimerDrivenThreadCount() { } public int getActiveTimerDrivenThreadCount() { - return timerDrivenEngineRef.get().getActiveCount(); + return frameworkTaskEngineRef.get().getActiveCount() + virtualThreadSchedulingAgent.getActiveThreadCount(); } public void setMaxTimerDrivenThreadCount(final int maxThreadCount) { - writeLock.lock(); - try { - setMaxThreadCount(maxThreadCount, "Timer Driven", this.timerDrivenEngineRef.get(), this.maxTimerDrivenThreads); - } finally { - writeLock.unlock("setMaxTimerDrivenThreadCount"); - } - } - - /** - * Updates the number of threads that can be simultaneously used for executing processors. - * This method must be called while holding the write lock! - * - * @param maxThreadCount Requested new thread pool size - * @param poolName Thread Pool Name - * @param engine Flow Engine executor or null when terminated - * @param maxThreads Internal tracker for Maximum Threads - */ - private void setMaxThreadCount(final int maxThreadCount, final String poolName, final FlowEngine engine, final AtomicInteger maxThreads) { if (maxThreadCount < 1) { throw new IllegalArgumentException("Cannot set max number of threads to less than 1"); } - maxThreads.getAndSet(maxThreadCount); - if (engine == null) { - LOG.debug("[{}] Engine not found: Maximum Thread Count not updated", poolName); - } else { - final int previousCorePoolSize = engine.getCorePoolSize(); - engine.setCorePoolSize(maxThreadCount); - LOG.info("[{}] Maximum Thread Count updated [{}] previous [{}]", poolName, maxThreadCount, previousCorePoolSize); + writeLock.lock(); + try { + final int previousMax = maxTimerDrivenThreads.getAndSet(maxThreadCount); + virtualThreadSchedulingAgent.setMaxThreadCount(maxThreadCount); + LOG.info("Maximum Timer-Driven Thread Count updated [{}] previous [{}]", maxThreadCount, previousMax); + } finally { + writeLock.unlock("setMaxTimerDrivenThreadCount"); } } @@ -2805,7 +2799,7 @@ 
public GroupStatusCounts getGroupStatusCounts(final ProcessGroup group) { } public int getActiveThreadCount() { - return timerDrivenEngineRef.get().getActiveCount(); + return getActiveTimerDrivenThreadCount(); } // diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/AbstractSchedulingAgent.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/AbstractSchedulingAgent.java deleted file mode 100644 index 2b051121e066..000000000000 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/AbstractSchedulingAgent.java +++ /dev/null @@ -1,130 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.nifi.controller.scheduling; - -import org.apache.nifi.connectable.Connectable; -import org.apache.nifi.controller.ReportingTaskNode; -import org.apache.nifi.engine.FlowEngine; - -import java.util.concurrent.Callable; -import java.util.concurrent.Future; - -/** - * Base implementation of the {@link SchedulingAgent} which encapsulates the - * updates to the {@link LifecycleState} based on invoked operation and then - * delegates to the corresponding 'do' methods. For example; By invoking - * {@link #schedule(Connectable, LifecycleState)} the - * {@link LifecycleState#setScheduled(boolean)} with value 'true' will be - * invoked. - * - * @see TimerDrivenSchedulingAgent - * @see CronSchedulingAgent - */ -abstract class AbstractSchedulingAgent implements SchedulingAgent { - - protected final FlowEngine flowEngine; - - protected AbstractSchedulingAgent(FlowEngine flowEngine) { - this.flowEngine = flowEngine; - } - - @Override - public void schedule(Connectable connectable, LifecycleState scheduleState) { - scheduleState.setScheduled(true); - this.doSchedule(connectable, scheduleState); - } - - @Override - public void scheduleOnce(Connectable connectable, LifecycleState scheduleState, Callable> stopCallback) { - scheduleState.setScheduled(true); - this.doScheduleOnce(connectable, scheduleState, stopCallback); - } - - @Override - public void unschedule(Connectable connectable, LifecycleState lifeycleState) { - lifeycleState.setScheduled(false); - this.doUnschedule(connectable, lifeycleState); - } - - @Override - public void schedule(ReportingTaskNode taskNode, LifecycleState lifecycleState) { - lifecycleState.setScheduled(true); - this.doSchedule(taskNode, lifecycleState); - } - - @Override - public void unschedule(ReportingTaskNode taskNode, LifecycleState lifecycleState) { - lifecycleState.setScheduled(false); - this.doUnschedule(taskNode, lifecycleState); - } - - /** - * Schedules the provided {@link Connectable}. 
Its {@link LifecycleState} - * will be set to true - * - * @param connectable - * the instance of {@link Connectable} - * @param scheduleState - * the instance of {@link LifecycleState} - */ - protected abstract void doSchedule(Connectable connectable, LifecycleState scheduleState); - - /** - * Schedules the provided {@link Connectable} to run once and then calls the provided stopCallback to stop it. - * Its {@link LifecycleState} will be set to true - * - * @param connectable - * the instance of {@link Connectable} - * @param scheduleState - * the instance of {@link LifecycleState} - * @param stopCallback the callback responsible for stopping connectable after it ran once - */ - protected abstract void doScheduleOnce(Connectable connectable, LifecycleState scheduleState, Callable> stopCallback); - - /** - * Unschedules the provided {@link Connectable}. Its {@link LifecycleState} - * will be set to false - * - * @param connectable - * the instance of {@link Connectable} - * @param scheduleState - * the instance of {@link LifecycleState} - */ - protected abstract void doUnschedule(Connectable connectable, LifecycleState scheduleState); - - /** - * Schedules the provided {@link ReportingTaskNode}. Its - * {@link LifecycleState} will be set to true - * - * @param connectable - * the instance of {@link ReportingTaskNode} - * @param scheduleState - * the instance of {@link LifecycleState} - */ - protected abstract void doSchedule(ReportingTaskNode connectable, LifecycleState scheduleState); - - /** - * Unschedules the provided {@link ReportingTaskNode}. 
Its - * {@link LifecycleState} will be set to false - * - * @param connectable - * the instance of {@link ReportingTaskNode} - * @param scheduleState - * the instance of {@link LifecycleState} - */ - protected abstract void doUnschedule(ReportingTaskNode connectable, LifecycleState scheduleState); -} diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/AbstractTimeBasedSchedulingAgent.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/AbstractTimeBasedSchedulingAgent.java deleted file mode 100644 index 2f3659a8ae27..000000000000 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/AbstractTimeBasedSchedulingAgent.java +++ /dev/null @@ -1,99 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.nifi.controller.scheduling; - -import org.apache.nifi.connectable.Connectable; -import org.apache.nifi.controller.FlowController; -import org.apache.nifi.controller.tasks.ConnectableTask; -import org.apache.nifi.engine.FlowEngine; -import org.apache.nifi.processor.exception.ProcessException; -import org.apache.nifi.util.FormatUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.Callable; -import java.util.concurrent.Future; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; - -public abstract class AbstractTimeBasedSchedulingAgent extends AbstractSchedulingAgent { - protected final Logger logger = LoggerFactory.getLogger(this.getClass()); - - protected final FlowController flowController; - protected final RepositoryContextFactory contextFactory; - - protected volatile String adminYieldDuration = "1 sec"; - - public AbstractTimeBasedSchedulingAgent( - final FlowEngine flowEngine, - final FlowController flowController, - final RepositoryContextFactory contextFactory - ) { - super(flowEngine); - this.flowController = flowController; - this.contextFactory = contextFactory; - } - - @Override - public void doScheduleOnce(final Connectable connectable, final LifecycleState scheduleState, Callable> stopCallback) { - final List> futures = new ArrayList<>(); - final ConnectableTask connectableTask = new ConnectableTask(this, connectable, flowController, contextFactory, scheduleState); - - final Runnable trigger = () -> { - connectableTask.invoke(); - try { - stopCallback.call(); - } catch (Exception e) { - String errorMessage = "Error while stopping " + connectable + " after running once."; - logger.error(errorMessage, e); - throw new ProcessException(errorMessage, e); - } - }; - - final ScheduledFuture future = flowEngine.schedule(trigger, 1, TimeUnit.NANOSECONDS); - futures.add(future); - - 
scheduleState.setFutures(futures); - } - - @Override - public void setAdministrativeYieldDuration(final String yieldDuration) { - this.adminYieldDuration = yieldDuration; - } - - @Override - public String getAdministrativeYieldDuration() { - return adminYieldDuration; - } - - @Override - public long getAdministrativeYieldDuration(final TimeUnit timeUnit) { - return FormatUtils.getTimeDuration(adminYieldDuration, timeUnit); - } - - @Override - public void incrementMaxThreadCount(int toAdd) { - final int corePoolSize = flowEngine.getCorePoolSize(); - if (toAdd < 0 && corePoolSize + toAdd < 1) { - throw new IllegalStateException("Cannot remove " + (-toAdd) + " threads from pool because there are only " + corePoolSize + " threads in the pool"); - } - - flowEngine.setCorePoolSize(corePoolSize + toAdd); - } -} diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/CronSchedulingAgent.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/CronSchedulingAgent.java deleted file mode 100644 index f44f1b813cec..000000000000 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/CronSchedulingAgent.java +++ /dev/null @@ -1,207 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.nifi.controller.scheduling; - -import org.apache.nifi.connectable.Connectable; -import org.apache.nifi.controller.FlowController; -import org.apache.nifi.controller.ReportingTaskNode; -import org.apache.nifi.controller.tasks.ConnectableTask; -import org.apache.nifi.controller.tasks.ReportingTaskWrapper; -import org.apache.nifi.engine.FlowEngine; -import org.apache.nifi.processor.exception.ProcessException; -import org.springframework.scheduling.support.CronExpression; - -import java.time.OffsetDateTime; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; - -public class CronSchedulingAgent extends AbstractTimeBasedSchedulingAgent { - private final Map>> scheduledFutures = new HashMap<>(); - - public CronSchedulingAgent(final FlowController flowController, final FlowEngine flowEngine, final RepositoryContextFactory contextFactory) { - super(flowEngine, flowController, contextFactory); - } - - @Override - public void shutdown() { - scheduledFutures.values().forEach(map -> map.values().forEach(future -> { - if (!future.isCancelled()) { - // stop scheduling to run and interrupt currently running tasks. 
- future.cancel(true); - } - })); - flowEngine.shutdown(); - } - - @Override - public void doSchedule(final ReportingTaskNode taskNode, final LifecycleState scheduleState) { - final Map> componentFuturesMap = scheduledFutures.computeIfAbsent(taskNode, k -> new HashMap<>()); - if (!componentFuturesMap.values().isEmpty()) { - throw new IllegalStateException("Cannot schedule " + taskNode.getReportingTask().getIdentifier() + " because it is already scheduled to run"); - } - - final String cronSchedule = taskNode.getSchedulingPeriod(); - final CronExpression cronExpression; - try { - cronExpression = CronExpression.parse(cronSchedule); - } catch (final Exception pe) { - throw new IllegalStateException("Cannot schedule Reporting Task " + taskNode.getReportingTask().getIdentifier() + " to run because its scheduling period is not valid", pe); - } - - final ReportingTaskWrapper taskWrapper = new ReportingTaskWrapper(taskNode, scheduleState, flowController.getExtensionManager()); - - final OffsetDateTime initialDate = getInitialDate(cronExpression); - final long initialDelay = getInitialDelay(initialDate); - - final Runnable command = new Runnable() { - - private OffsetDateTime nextSchedule = initialDate; - - @Override - public void run() { - taskWrapper.run(); - - nextSchedule = getNextSchedule(nextSchedule, cronExpression); - final long delay = getDelay(nextSchedule); - - logger.debug("Finished running Reporting Task {}; next scheduled time is at {} after a delay of {} milliseconds", taskNode, nextSchedule, delay); - final ScheduledFuture newFuture = flowEngine.schedule(this, delay, TimeUnit.MILLISECONDS); - final ScheduledFuture oldFuture = componentFuturesMap.put(0, newFuture); - scheduleState.replaceFuture(oldFuture, newFuture); - } - }; - - final ScheduledFuture future = flowEngine.schedule(command, initialDelay, TimeUnit.MILLISECONDS); - componentFuturesMap.put(0, future); - - scheduleState.setScheduled(true); - scheduleState.setFutures(componentFuturesMap.values()); 
- logger.info("Scheduled Reporting Task {} to run threads on schedule {}", taskNode, cronSchedule); - } - - @Override - public synchronized void doSchedule(final Connectable connectable, final LifecycleState scheduleState) { - final Map> componentFuturesMap = scheduledFutures.computeIfAbsent(connectable, k -> new HashMap<>()); - if (!componentFuturesMap.values().isEmpty()) { - throw new IllegalStateException("Cannot schedule " + connectable + " because it is already scheduled to run"); - } - - final String cronSchedule = connectable.evaluateParameters(connectable.getSchedulingPeriod()); - - final CronExpression cronExpression; - try { - cronExpression = CronExpression.parse(cronSchedule); - } catch (final Exception pe) { - throw new IllegalStateException("Cannot schedule " + connectable + " to run because its scheduling period is not valid", pe); - } - - for (int i = 0; i < connectable.getMaxConcurrentTasks(); i++) { - final ConnectableTask continuallyRunTask = new ConnectableTask(this, connectable, flowController, contextFactory, scheduleState); - - final AtomicInteger taskNumber = new AtomicInteger(i); - - final OffsetDateTime initialDate = getInitialDate(cronExpression); - final long initialDelay = getInitialDelay(initialDate); - - final Runnable command = new Runnable() { - - private OffsetDateTime nextSchedule = initialDate; - - @Override - public void run() { - try { - continuallyRunTask.invoke(); - } catch (final RuntimeException re) { - throw re; - } catch (final Exception e) { - throw new ProcessException(e); - } - - nextSchedule = getNextSchedule(nextSchedule, cronExpression); - final long delay = getDelay(nextSchedule); - - logger.debug("Finished task for {}; next scheduled time is at {} after a delay of {} milliseconds", connectable, nextSchedule, delay); - final ScheduledFuture newFuture = flowEngine.schedule(this, delay, TimeUnit.MILLISECONDS); - final ScheduledFuture oldFuture = componentFuturesMap.put(taskNumber.get(), newFuture); - 
scheduleState.replaceFuture(oldFuture, newFuture); - } - }; - - final ScheduledFuture future = flowEngine.schedule(command, initialDelay, TimeUnit.MILLISECONDS); - componentFuturesMap.put(taskNumber.get(), future); - } - - scheduleState.setFutures(componentFuturesMap.values()); - logger.info("Scheduled {} to run with {} threads on schedule {}", connectable, connectable.getMaxConcurrentTasks(), cronSchedule); - } - - @Override - public synchronized void doUnschedule(final Connectable connectable, final LifecycleState scheduleState) { - unschedule((Object) connectable, scheduleState); - } - - @Override - public synchronized void doUnschedule(final ReportingTaskNode taskNode, final LifecycleState scheduleState) { - unschedule((Object) taskNode, scheduleState); - } - - private void unschedule(final Object scheduled, final LifecycleState scheduleState) { - scheduledFutures.remove(scheduled); - scheduleState.getFutures().forEach(future -> { - if (!future.isCancelled()) { - // stop scheduling to run but do not interrupt currently running tasks. - future.cancel(false); - } - }); - - scheduleState.setScheduled(false); - logger.info("Stopped scheduling {} to run", scheduled); - } - - @Override - public void onEvent(final Connectable connectable) { - } - - @Override - public void setMaxThreadCount(final int maxThreads) { - } - - private OffsetDateTime getInitialDate(final CronExpression cronExpression) { - final OffsetDateTime now = OffsetDateTime.now(); - final OffsetDateTime initialDate = cronExpression.next(now); - return initialDate == null ? 
now : initialDate; - } - - private long getInitialDelay(final OffsetDateTime initialDate) { - return initialDate.toInstant().toEpochMilli() - System.currentTimeMillis(); - } - - private static OffsetDateTime getNextSchedule(final OffsetDateTime currentSchedule, final CronExpression cronExpression) { - // Since the clock has not a millisecond precision, we have to check that we - // schedule the next time after the time this was supposed to run, otherwise - // we might end up with running the same task twice - final OffsetDateTime now = OffsetDateTime.now(); - return cronExpression.next(now.isAfter(currentSchedule) ? now : currentSchedule); - } - - private static long getDelay(final OffsetDateTime nextSchedule) { - return Math.max(nextSchedule.toInstant().toEpochMilli() - System.currentTimeMillis(), 0L); - } -} diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/DynamicSemaphore.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/DynamicSemaphore.java new file mode 100644 index 000000000000..210db02d7710 --- /dev/null +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/DynamicSemaphore.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.controller.scheduling; + +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; + +/** + * A semaphore wrapper that supports dynamically adjusting the maximum number of permits. + * Uses fair ordering so that any virtual thread that has been waiting the longest is the + * next to acquire a permit; this prevents any particular processor's scheduling loop from + * being starved under heavy contention. + */ +public class DynamicSemaphore { + + private final ResizableSemaphore semaphore; + private volatile int maxPermits; + + public DynamicSemaphore(final int permits) { + if (permits < 1) { + throw new IllegalArgumentException("Permits must be at least 1"); + } + this.maxPermits = permits; + this.semaphore = new ResizableSemaphore(permits); + } + + public void acquire() throws InterruptedException { + semaphore.acquire(); + } + + public boolean tryAcquire(final long timeout, final TimeUnit timeUnit) throws InterruptedException { + return semaphore.tryAcquire(timeout, timeUnit); + } + + public void release() { + semaphore.release(); + } + + /** + * Adjusts the maximum number of permits to the specified count. If the new maximum is + * greater than the current maximum, additional permits are released. If it is less than + * the current maximum, permits are reduced: threads currently holding permits are not + * interrupted, and subsequent releases will not top up available permits until the + * number of permits in circulation falls below the new cap. 
+ * + * @param newMaxPermits the desired maximum number of permits (must be at least 1) + */ + public synchronized void setMaxPermits(final int newMaxPermits) { + if (newMaxPermits < 1) { + throw new IllegalArgumentException("Max permits must be at least 1"); + } + + final int delta = newMaxPermits - this.maxPermits; + this.maxPermits = newMaxPermits; + if (delta > 0) { + semaphore.release(delta); + } else if (delta < 0) { + semaphore.reducePermits(-delta); + } + } + + public int getMaxPermits() { + return maxPermits; + } + + public int availablePermits() { + return semaphore.availablePermits(); + } + + /** + * Returns the number of permits currently in use, computed atomically against any concurrent + * call to {@link #setMaxPermits(int)}. A non-atomic {@code getMaxPermits() - availablePermits()} + * outside this class can observe a transient inconsistency if a resize is in progress between + * the two reads, which is undesirable for metrics that feed cluster heartbeats. + * + * @return the number of permits that have been acquired but not yet released. The returned value + * is a best-effort snapshot because permits may be acquired or released by other threads + * before the caller can act on it, but it is consistent with a single point in time. + */ + public synchronized int getInUsePermits() { + return maxPermits - semaphore.availablePermits(); + } + + /** + * Extends {@link Semaphore} in order to expose the protected {@link #reducePermits(int)} + * method, which is needed in order to dynamically shrink the pool of available permits. 
+ */ + private static class ResizableSemaphore extends Semaphore { + + ResizableSemaphore(final int permits) { + super(permits, true); + } + + @Override + protected void reducePermits(final int reduction) { + super.reducePermits(reduction); + } + } +} diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java index 6c5645b28aa4..4188d410aadd 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/StandardProcessScheduler.java @@ -747,14 +747,12 @@ private synchronized void startConnectable(final Connectable connectable) { } lifecycleState.clearTerminationFlag(); + lifecycleState.setScheduled(true); - // Schedule the component to be triggered, unless the engine is stateless. For stateless engine, we let the stateless - // framework take care of triggering components. + // Stateless components are driven by the stateless framework, so no scheduling agent is involved. 
if (connectable.getProcessGroup().resolveExecutionEngine() != ExecutionEngine.STATELESS) { getSchedulingAgent(connectable).schedule(connectable, lifecycleState); } - - lifecycleState.setScheduled(true); } private synchronized void stopConnectable(final Connectable connectable) { diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/TimerDrivenSchedulingAgent.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/TimerDrivenSchedulingAgent.java deleted file mode 100644 index 69fe3031a6ea..000000000000 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/TimerDrivenSchedulingAgent.java +++ /dev/null @@ -1,191 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.nifi.controller.scheduling; - -import org.apache.nifi.connectable.Connectable; -import org.apache.nifi.controller.FlowController; -import org.apache.nifi.controller.ReportingTaskNode; -import org.apache.nifi.controller.tasks.ConnectableTask; -import org.apache.nifi.controller.tasks.InvocationResult; -import org.apache.nifi.controller.tasks.ReportingTaskWrapper; -import org.apache.nifi.engine.FlowEngine; -import org.apache.nifi.util.FormatUtils; -import org.apache.nifi.util.NiFiProperties; - -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; - -public class TimerDrivenSchedulingAgent extends AbstractTimeBasedSchedulingAgent { - private final long noWorkYieldNanos; - - public TimerDrivenSchedulingAgent(final FlowController flowController, final FlowEngine flowEngine, final RepositoryContextFactory contextFactory, - final NiFiProperties nifiProperties) { - super(flowEngine, flowController, contextFactory); - - final String boredYieldDuration = nifiProperties.getBoredYieldDuration(); - try { - noWorkYieldNanos = FormatUtils.getTimeDuration(boredYieldDuration, TimeUnit.NANOSECONDS); - } catch (final IllegalArgumentException e) { - throw new RuntimeException("Failed to create SchedulingAgent because the " + NiFiProperties.BORED_YIELD_DURATION + " property is set to an invalid time duration: " + boredYieldDuration); - } - } - - @Override - public void shutdown() { - flowEngine.shutdown(); - } - - @Override - public void doSchedule(final ReportingTaskNode taskNode, final LifecycleState scheduleState) { - final Runnable reportingTaskWrapper = new ReportingTaskWrapper(taskNode, scheduleState, flowController.getExtensionManager()); - final long schedulingNanos = taskNode.getSchedulingPeriod(TimeUnit.NANOSECONDS); - - final ScheduledFuture future = flowEngine.scheduleWithFixedDelay(reportingTaskWrapper, 0L, 
schedulingNanos, TimeUnit.NANOSECONDS); - final List> futures = new ArrayList<>(1); - futures.add(future); - scheduleState.setFutures(futures); - - logger.info("{} started.", taskNode.getReportingTask()); - } - - @Override - public void doSchedule(final Connectable connectable, final LifecycleState scheduleState) { - final List> futures = new ArrayList<>(); - final ConnectableTask connectableTask = new ConnectableTask(this, connectable, flowController, contextFactory, scheduleState); - - for (int i = 0; i < connectable.getMaxConcurrentTasks(); i++) { - // Determine the task to run and create it. - final AtomicReference> futureRef = new AtomicReference<>(); - - final Runnable trigger = createTrigger(connectableTask, scheduleState, futureRef); - - // Schedule the task to run - final ScheduledFuture future = flowEngine.scheduleWithFixedDelay(trigger, 0L, - connectable.getSchedulingPeriod(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS); - - // now that we have the future, set the atomic reference so that if the component is yielded we - // are able to then cancel this future. - futureRef.set(future); - - // Keep track of the futures so that we can update the ScheduleState. - futures.add(future); - } - - scheduleState.setFutures(futures); - logger.info("Scheduled {} to run with {} threads", connectable, connectable.getMaxConcurrentTasks()); - } - - private Runnable createTrigger(final ConnectableTask connectableTask, final LifecycleState scheduleState, final AtomicReference> futureRef) { - final Connectable connectable = connectableTask.getConnectable(); - final Runnable yieldDetectionRunnable = new Runnable() { - @Override - public void run() { - // Call the task. It will return a boolean indicating whether or not we should yield - // based on a lack of work for to do for the component. 
- final InvocationResult invocationResult = connectableTask.invoke(); - if (invocationResult.isYield()) { - logger.debug("Yielding {} due to {}", connectable, invocationResult.getYieldExplanation()); - } - - // If the component is yielded, cancel its future and re-submit it to run again - // after the yield has expired. - final long newYieldExpiration = connectable.getYieldExpiration(); - final long now = System.currentTimeMillis(); - if (newYieldExpiration > now) { - final long yieldMillis = newYieldExpiration - now; - final long scheduleMillis = connectable.getSchedulingPeriod(TimeUnit.MILLISECONDS); - final ScheduledFuture scheduledFuture = futureRef.get(); - if (scheduledFuture == null) { - return; - } - - // If we are able to cancel the future, create a new one and update the ScheduleState so that it has - // an accurate accounting of which futures are outstanding; we must then also update the futureRef - // so that we can do this again the next time that the component is yielded. - if (scheduledFuture.cancel(false)) { - final long yieldNanos = Math.max(TimeUnit.MILLISECONDS.toNanos(scheduleMillis), TimeUnit.MILLISECONDS.toNanos(yieldMillis)); - - synchronized (scheduleState) { - if (scheduleState.isScheduled()) { - final long schedulingNanos = connectable.getSchedulingPeriod(TimeUnit.NANOSECONDS); - final ScheduledFuture newFuture = flowEngine.scheduleWithFixedDelay(this, yieldNanos, schedulingNanos, TimeUnit.NANOSECONDS); - - scheduleState.replaceFuture(scheduledFuture, newFuture); - futureRef.set(newFuture); - } - } - } - } else if (noWorkYieldNanos > 0L && invocationResult.isYield()) { - // Component itself didn't yield but there was no work to do, so the framework will choose - // to yield the component automatically for a short period of time. 
- final ScheduledFuture scheduledFuture = futureRef.get(); - if (scheduledFuture == null) { - return; - } - - // If we are able to cancel the future, create a new one and update the ScheduleState so that it has - // an accurate accounting of which futures are outstanding; we must then also update the futureRef - // so that we can do this again the next time that the component is yielded. - if (scheduledFuture.cancel(false)) { - synchronized (scheduleState) { - if (scheduleState.isScheduled()) { - final ScheduledFuture newFuture = flowEngine.scheduleWithFixedDelay(this, noWorkYieldNanos, - connectable.getSchedulingPeriod(TimeUnit.NANOSECONDS), TimeUnit.NANOSECONDS); - - scheduleState.replaceFuture(scheduledFuture, newFuture); - futureRef.set(newFuture); - } - } - } - } - } - }; - - return yieldDetectionRunnable; - } - - @Override - public void doUnschedule(final Connectable connectable, final LifecycleState lifecycleState) { - for (final ScheduledFuture future : lifecycleState.getFutures()) { - // stop scheduling to run but do not interrupt currently running tasks. - future.cancel(false); - } - - logger.info("Stopped scheduling {} to run", connectable); - } - - @Override - public void doUnschedule(final ReportingTaskNode taskNode, final LifecycleState lifecycleState) { - for (final ScheduledFuture future : lifecycleState.getFutures()) { - // stop scheduling to run but do not interrupt currently running tasks. 
- future.cancel(false); - } - - logger.info("Stopped scheduling {} to run", taskNode.getReportingTask()); - } - - @Override - public void onEvent(final Connectable connectable) { - } - - @Override - public void setMaxThreadCount(final int maxThreads) { - } -} diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/VirtualThreadSchedulingAgent.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/VirtualThreadSchedulingAgent.java new file mode 100644 index 000000000000..bbedbfe81b15 --- /dev/null +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/scheduling/VirtualThreadSchedulingAgent.java @@ -0,0 +1,564 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.controller.scheduling; + +import org.apache.nifi.connectable.Connectable; +import org.apache.nifi.controller.FlowController; +import org.apache.nifi.controller.ReportingTaskNode; +import org.apache.nifi.controller.tasks.ConnectableTask; +import org.apache.nifi.controller.tasks.InvocationResult; +import org.apache.nifi.controller.tasks.ReportingTaskWrapper; +import org.apache.nifi.scheduling.SchedulingStrategy; +import org.apache.nifi.util.FormatUtils; +import org.apache.nifi.util.NiFiProperties; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.scheduling.support.CronExpression; + +import java.time.OffsetDateTime; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +/** + * Scheduling agent that runs processors, funnels, ports, and reporting tasks on virtual + * threads rather than on a shared thread pool. A single {@link DynamicSemaphore} bounds + * the number of invocations that may run concurrently across the entire flow. Its permit + * count replaces the old Timer-Driven thread pool size and is adjustable at runtime via + * {@link #setMaxThreadCount(int)}. + *

+ * The agent is used for both {@link SchedulingStrategy#TIMER_DRIVEN} and + * {@link SchedulingStrategy#CRON_DRIVEN} connectables. Each scheduled connectable is given + * one virtual thread per concurrent task; every iteration of the scheduling loop acquires + * a permit, calls {@link ConnectableTask#invoke()}, releases the permit, and then sleeps + * until the next invocation is due. + */ +public class VirtualThreadSchedulingAgent implements SchedulingAgent { + + private static final Logger logger = LoggerFactory.getLogger(VirtualThreadSchedulingAgent.class); + + /** + * Sleeps in the scheduling loop are broken up into chunks of at most this long so that + * a processor that is unscheduled while a scheduling thread is sleeping will exit the + * sleep and observe the state change quickly rather than having to wait for the full + * scheduling period to elapse. + */ + private static final long POLL_INTERVAL_NANOS = TimeUnit.MILLISECONDS.toNanos(25L); + + private final FlowController flowController; + private final RepositoryContextFactory contextFactory; + private final DynamicSemaphore globalSemaphore; + private final long noWorkYieldNanos; + private final Set runningThreads = ConcurrentHashMap.newKeySet(); + private volatile boolean shutdown = false; + private volatile String adminYieldDuration = "1 sec"; + private volatile long adminYieldNanos = TimeUnit.SECONDS.toNanos(1L); + + public VirtualThreadSchedulingAgent(final FlowController flowController, final RepositoryContextFactory contextFactory, + final NiFiProperties nifiProperties, final int maxThreadCount) { + this.flowController = flowController; + this.contextFactory = contextFactory; + this.globalSemaphore = new DynamicSemaphore(maxThreadCount); + + final String boredYieldDuration = nifiProperties.getBoredYieldDuration(); + try { + noWorkYieldNanos = FormatUtils.getTimeDuration(boredYieldDuration, TimeUnit.NANOSECONDS); + } catch (final IllegalArgumentException e) { + throw new RuntimeException("Failed to 
create VirtualThreadSchedulingAgent because the " + + NiFiProperties.BORED_YIELD_DURATION + " property is set to an invalid time duration: " + boredYieldDuration); + } + } + + /** + * Marks the agent as shut down and interrupts every virtual thread that is currently running a scheduling loop, + * a run-once invocation, or a reporting-task loop. After {@code shutdown()} returns, subsequent calls to any of + * the {@code schedule} methods will fail fast with {@link IllegalStateException}. This method is idempotent and + * may be called multiple times (for example, both on the {@code kill} path in {@code FlowController.shutdown} + * and again from {@code StandardProcessScheduler.shutdown}). Interrupts are a best-effort mechanism: processor + * code that does not honor interruption will continue to run until it completes naturally, but the virtual + * threads themselves are daemon threads and will be reaped when the JVM exits. + */ + @Override + public void shutdown() { + shutdown = true; + int interrupted = 0; + for (final Thread thread : runningThreads) { + thread.interrupt(); + interrupted++; + } + if (interrupted > 0) { + logger.info("Shutdown interrupted {} virtual threads managed by the VirtualThreadSchedulingAgent", interrupted); + } + } + + /** + * Schedules the given connectable to run on virtual threads. This method transitions the + * {@link LifecycleState} to scheduled and then spawns one virtual thread per concurrent task. Each scheduling loop + * captures {@link LifecycleState#getLastStopTime()} at startup and will exit as soon as either + * {@link LifecycleState#isScheduled()} becomes {@code false} or the last-stop-time advances, which is how the + * agent guards against a rapid stop/start cycle leaving prior loops running alongside newly-spawned ones. The + * corresponding {@link #unschedule(Connectable, LifecycleState)} call performs the inverse transition. 
Calling + * {@code setScheduled(true)} here is intentionally idempotent: the framework scheduler also marks the state as + * scheduled in some code paths (for example, reporting-task startup), and double-calling is harmless because the + * last-stop-time only advances on transitions to the stopped state. + */ + @Override + public void schedule(final Connectable connectable, final LifecycleState lifecycleState) { + final CronExpression cronExpression; + if (connectable.getSchedulingStrategy() == SchedulingStrategy.CRON_DRIVEN) { + final String cronSchedule = connectable.evaluateParameters(connectable.getSchedulingPeriod()); + cronExpression = parseCronExpression(cronSchedule, connectable); + } else { + cronExpression = null; + } + + // setScheduled(true) is idempotent with the state the framework already set before calling into the agent. + // It is done here too so that the agent is self-consistent even for callers that do not follow the framework + // contract, but it also means that if anything below fails we MUST roll it back: otherwise the component + // would be left flagged as scheduled with zero (or a partial set of) scheduling loops actually running. + // Any loops that did start before the failure will exit on their next isActive() check when the rollback + // flips the flag back to false. 
+ lifecycleState.setScheduled(true); + try { + final long startStopTime = lifecycleState.getLastStopTime(); + final ConnectableTask connectableTask = new ConnectableTask(this, connectable, flowController, contextFactory, lifecycleState); + final int taskCount = connectable.getMaxConcurrentTasks(); + + for (int i = 0; i < taskCount; i++) { + final String threadName = buildThreadName(connectable, i); + startTrackedVirtualThread(threadName, () -> runSchedulingLoop(connectable, connectableTask, lifecycleState, startStopTime, cronExpression)); + } + + logger.info("Scheduled {} to run with {} virtual threads", connectable, taskCount); + } catch (final Throwable t) { + lifecycleState.setScheduled(false); + throw t; + } + } + + @Override + public void scheduleOnce(final Connectable connectable, final LifecycleState lifecycleState, final Callable> stopCallback) { + lifecycleState.setScheduled(true); + try { + final ConnectableTask connectableTask = new ConnectableTask(this, connectable, flowController, contextFactory, lifecycleState); + final String threadName = buildThreadName(connectable, 0); + + startTrackedVirtualThread(threadName, () -> runOnce(connectable, connectableTask, stopCallback)); + } catch (final Throwable t) { + lifecycleState.setScheduled(false); + throw t; + } + } + + @Override + public void unschedule(final Connectable connectable, final LifecycleState lifecycleState) { + lifecycleState.setScheduled(false); + logger.info("Stopped scheduling {} to run", connectable); + } + + @Override + public void schedule(final ReportingTaskNode taskNode, final LifecycleState lifecycleState) { + final boolean cronDriven = taskNode.getSchedulingStrategy() == SchedulingStrategy.CRON_DRIVEN; + final CronExpression cronExpression; + final long schedulingNanos; + if (cronDriven) { + cronExpression = parseCronExpression(taskNode.getSchedulingPeriod(), taskNode); + schedulingNanos = 0L; + } else { + cronExpression = null; + schedulingNanos = 
taskNode.getSchedulingPeriod(TimeUnit.NANOSECONDS); + } + + lifecycleState.setScheduled(true); + try { + final long startStopTime = lifecycleState.getLastStopTime(); + final Runnable reportingTaskWrapper = new ReportingTaskWrapper(taskNode, lifecycleState, flowController.getExtensionManager()); + final String threadName = "Reporting Task: " + taskNode.getName(); + + startTrackedVirtualThread(threadName, + () -> runReportingTaskLoop(taskNode, reportingTaskWrapper, schedulingNanos, cronExpression, lifecycleState, startStopTime)); + + logger.info("{} started on virtual thread", taskNode.getReportingTask()); + } catch (final Throwable t) { + lifecycleState.setScheduled(false); + throw t; + } + } + + @Override + public void unschedule(final ReportingTaskNode taskNode, final LifecycleState lifecycleState) { + lifecycleState.setScheduled(false); + logger.info("Stopped scheduling {} to run", taskNode.getReportingTask()); + } + + /** + * @return {@code true} if {@code lifecycleState} is still scheduled and its last-stop-time has not changed since + * the scheduling loop was spawned (meaning the loop is still running against its original scheduling generation). + * Used by the scheduling loops and their polling helpers to detect both the normal stop path (isScheduled flips to + * false) and the rapid stop/start race (a stop increments lastStopTime even if a quick restart flips isScheduled + * back to true before the old loop has observed the stop). 
+ */ + private boolean isActive(final LifecycleState lifecycleState, final long startStopTime) { + return lifecycleState.isScheduled() && lifecycleState.getLastStopTime() == startStopTime; + } + + private static CronExpression parseCronExpression(final String cronSchedule, final Object component) { + try { + return CronExpression.parse(cronSchedule); + } catch (final RuntimeException e) { + throw new IllegalStateException("Cannot schedule " + component + " to run because its scheduling period is not a valid CRON expression: " + cronSchedule, e); + } + } + + @Override + public void onEvent(final Connectable connectable) { + } + + @Override + public synchronized void setMaxThreadCount(final int maxThreads) { + globalSemaphore.setMaxPermits(maxThreads); + logger.info("Global semaphore permits updated to {}", maxThreads); + } + + @Override + public synchronized void incrementMaxThreadCount(final int toAdd) { + if (toAdd == 0) { + return; + } + + final int currentMax = globalSemaphore.getMaxPermits(); + final int newMax = currentMax + toAdd; + if (newMax < 1) { + throw new IllegalStateException("Cannot remove " + (-toAdd) + " permits from global semaphore because there are only " + currentMax + " permits available"); + } + + globalSemaphore.setMaxPermits(newMax); + } + + @Override + public void setAdministrativeYieldDuration(final String duration) { + this.adminYieldNanos = FormatUtils.getTimeDuration(duration, TimeUnit.NANOSECONDS); + this.adminYieldDuration = duration; + } + + @Override + public String getAdministrativeYieldDuration() { + return adminYieldDuration; + } + + @Override + public long getAdministrativeYieldDuration(final TimeUnit timeUnit) { + return timeUnit.convert(adminYieldNanos, TimeUnit.NANOSECONDS); + } + + DynamicSemaphore getGlobalSemaphore() { + return globalSemaphore; + } + + int getRunningThreadCount() { + return runningThreads.size(); + } + + boolean isShutdown() { + return shutdown; + } + + /** + * @return the number of virtual threads that 
are currently executing a processor or + * reporting-task invocation. A thread counts as active when it is holding a permit on + * the global semaphore, which mirrors the old Timer-Driven engine's active-count + * semantics used by the cluster heartbeat and UI active-thread counter. + */ + public int getActiveThreadCount() { + return globalSemaphore.getInUsePermits(); + } + + /** + * Runs the scheduling loop for a {@link Connectable}. Each iteration acquires a permit from the global semaphore, + * invokes the connectable, releases the permit, and sleeps until the next invocation is due. The entire body of + * the loop is wrapped in a {@code try/catch(Throwable)} so that no exception or error -- including + * {@link Error} subclasses or bugs in the scheduling logic itself -- can cause the virtual thread to terminate + * silently. A processor is expected to continue being triggered as long as it is scheduled, so on any unexpected + * {@link Throwable} the error is logged, an administrative yield is applied to prevent tight-looping on a broken + * task, and the loop continues on its next iteration. 
+ */ + private void runSchedulingLoop(final Connectable connectable, final ConnectableTask connectableTask, + final LifecycleState lifecycleState, final long startStopTime, final CronExpression cronExpression) { + + final boolean cronDriven = cronExpression != null; + + OffsetDateTime nextCronSchedule = null; + if (cronDriven) { + nextCronSchedule = getNextCronSchedule(OffsetDateTime.now(), cronExpression); + if (nextCronSchedule == null) { + logger.warn("CRON expression for {} has no future firings; scheduling loop will exit without invoking the component", connectable); + return; + } + final long initialDelayMillis = Math.max(nextCronSchedule.toInstant().toEpochMilli() - System.currentTimeMillis(), 0L); + if (initialDelayMillis > 0L) { + sleepWithPolling(TimeUnit.MILLISECONDS.toNanos(initialDelayMillis), lifecycleState, startStopTime); + } + } + + while (isActive(lifecycleState, startStopTime)) { + try { + if (!acquirePermitWithPolling(lifecycleState, startStopTime)) { + return; + } + + final InvocationResult invocationResult; + try { + invocationResult = connectableTask.invoke(); + } finally { + globalSemaphore.release(); + } + + if (cronDriven) { + nextCronSchedule = getNextCronSchedule(nextCronSchedule, cronExpression); + if (nextCronSchedule == null) { + logger.warn("CRON expression for {} has no further firings after the current invocation; scheduling loop is exiting", connectable); + return; + } + final long sleepMillis = Math.max(nextCronSchedule.toInstant().toEpochMilli() - System.currentTimeMillis(), 0L); + sleepWithPolling(TimeUnit.MILLISECONDS.toNanos(sleepMillis), lifecycleState, startStopTime); + } else { + sleepForSchedulingPeriod(connectable, lifecycleState, startStopTime, invocationResult); + } + } catch (final Throwable t) { + // Nothing in the loop body is expected to throw (ConnectableTask.invoke() catches Throwable itself, and + // acquirePermitWithPolling handles InterruptedException). 
If anything does escape to here, it must not + // be allowed to kill the scheduling virtual thread: as long as the component remains scheduled we will + // keep triggering it. Log the error, apply an administrative yield to avoid tight-looping on a broken + // invocation, and continue with the next iteration. + try { + connectable.yield(adminYieldNanos, TimeUnit.NANOSECONDS); + } catch (final Throwable yieldError) { + t.addSuppressed(yieldError); + } + + logger.error("Unexpected error in scheduling loop for {}. Will yield for {} and continue.", connectable, adminYieldDuration, t); + } + } + } + + private void runOnce(final Connectable connectable, final ConnectableTask connectableTask, final Callable> stopCallback) { + try { + try { + globalSemaphore.acquire(); + } catch (final InterruptedException e) { + Thread.currentThread().interrupt(); + return; + } + + try { + connectableTask.invoke(); + } finally { + globalSemaphore.release(); + } + } catch (final Throwable t) { + logger.error("Unexpected error running {} once", connectable, t); + } finally { + try { + stopCallback.call(); + } catch (final Throwable t) { + logger.error("Error while stopping {} after running once", connectable, t); + } + } + } + + /** + * Runs the reporting-task scheduling loop. As with {@link #runSchedulingLoop}, the loop body is wrapped in a + * {@code try/catch(Throwable)} so that no unexpected failure can cause the virtual thread to exit. A reporting + * task is expected to continue running until it is unscheduled. When {@code cronExpression} is non-null the loop + * sleeps until the next CRON firing; otherwise it sleeps for {@code schedulingNanos} after each run. 
+ */ + private void runReportingTaskLoop(final ReportingTaskNode taskNode, final Runnable reportingTaskWrapper, final long schedulingNanos, + final CronExpression cronExpression, final LifecycleState lifecycleState, final long startStopTime) { + final boolean cronDriven = cronExpression != null; + + OffsetDateTime nextCronSchedule = null; + if (cronDriven) { + nextCronSchedule = getNextCronSchedule(OffsetDateTime.now(), cronExpression); + if (nextCronSchedule == null) { + logger.warn("CRON expression for {} has no future firings; scheduling loop will exit without invoking the reporting task", + taskNode.getReportingTask()); + return; + } + final long initialDelayMillis = Math.max(nextCronSchedule.toInstant().toEpochMilli() - System.currentTimeMillis(), 0L); + if (initialDelayMillis > 0L) { + sleepWithPolling(TimeUnit.MILLISECONDS.toNanos(initialDelayMillis), lifecycleState, startStopTime); + } + } + + while (isActive(lifecycleState, startStopTime)) { + try { + if (!acquirePermitWithPolling(lifecycleState, startStopTime)) { + return; + } + + try { + reportingTaskWrapper.run(); + } finally { + globalSemaphore.release(); + } + + if (cronDriven) { + nextCronSchedule = getNextCronSchedule(nextCronSchedule, cronExpression); + if (nextCronSchedule == null) { + logger.warn("CRON expression for {} has no further firings after the current invocation; scheduling loop is exiting", + taskNode.getReportingTask()); + return; + } + final long sleepMillis = Math.max(nextCronSchedule.toInstant().toEpochMilli() - System.currentTimeMillis(), 0L); + sleepWithPolling(TimeUnit.MILLISECONDS.toNanos(sleepMillis), lifecycleState, startStopTime); + } else { + sleepWithPolling(schedulingNanos, lifecycleState, startStopTime); + } + } catch (final Throwable t) { + // ReportingTaskWrapper.run() is expected to catch Throwable itself, so reaching this handler indicates + // an unexpected framework-level failure. 
Log and continue so that one bad invocation does not + // permanently kill the scheduling loop for this reporting task. + logger.error("Unexpected error in scheduling loop for {}. Continuing on next scheduled interval.", taskNode.getReportingTask(), t); + } + } + } + + private void sleepForSchedulingPeriod(final Connectable connectable, final LifecycleState lifecycleState, final long startStopTime, + final InvocationResult invocationResult) { + final long sleepNanos; + final long yieldExpiration = connectable.getYieldExpiration(); + final long now = System.currentTimeMillis(); + if (yieldExpiration > now) { + sleepNanos = TimeUnit.MILLISECONDS.toNanos(yieldExpiration - now); + } else if (invocationResult.isYield()) { + sleepNanos = noWorkYieldNanos; + } else { + sleepNanos = connectable.getSchedulingPeriod(TimeUnit.NANOSECONDS); + } + + sleepWithPolling(sleepNanos, lifecycleState, startStopTime); + } + + /** + * Attempts to acquire a permit from the global semaphore, waking up periodically to re-check whether the + * scheduling generation this loop belongs to is still active. This prevents a scheduling thread from blocking + * indefinitely on {@link DynamicSemaphore#acquire()} when the flow is heavily loaded and the component has been + * unscheduled; without this, stopping a processor while all global permits were held elsewhere would have to wait + * for one of those other processors to release a permit before the stop could take effect. 
+ * + * @return {@code true} if a permit was acquired (the caller MUST release it), {@code false} if the scheduling + * generation ended (i.e., the component was stopped) or the thread was interrupted before a permit could be acquired + */ + private boolean acquirePermitWithPolling(final LifecycleState lifecycleState, final long startStopTime) { + while (isActive(lifecycleState, startStopTime)) { + try { + if (globalSemaphore.tryAcquire(POLL_INTERVAL_NANOS, TimeUnit.NANOSECONDS)) { + return true; + } + } catch (final InterruptedException e) { + Thread.currentThread().interrupt(); + return false; + } + } + + return false; + } + + /** + * Sleeps for approximately {@code sleepNanos}, waking up periodically to check whether the scheduling generation + * this loop belongs to is still active. This allows a stop/unschedule request to take effect promptly even when + * the scheduling period is long: rather than calling {@code Thread.sleep(sleepNanos)} and forcing the caller to + * wait out the entire delay, the sleep is broken into chunks of at most {@link #POLL_INTERVAL_NANOS} and the + * generation is re-checked between chunks. Returns immediately if {@code sleepNanos <= 0} so that a zero + * scheduling period (run-as-fast-as-possible) does not incur any artificial delay. 
+ */ + private void sleepWithPolling(final long sleepNanos, final LifecycleState lifecycleState, final long startStopTime) { + if (sleepNanos <= 0L) { + return; + } + + final long sleepExpiration = System.nanoTime() + sleepNanos; + while (isActive(lifecycleState, startStopTime)) { + final long remainingNanos = sleepExpiration - System.nanoTime(); + if (remainingNanos <= 0L) { + return; + } + + final long chunkNanos = Math.min(remainingNanos, POLL_INTERVAL_NANOS); + try { + TimeUnit.NANOSECONDS.sleep(chunkNanos); + } catch (final InterruptedException e) { + Thread.currentThread().interrupt(); + return; + } + } + } + + private static String buildThreadName(final Connectable connectable, final int taskIndex) { + return connectable.getName() + "[type=" + connectable.getComponentType() + ", id=" + connectable.getIdentifier() + + ", group=" + connectable.getProcessGroup().getName() + "] task " + taskIndex; + } + + /** + * Starts a virtual thread running {@code task} under the given thread name and tracks it in {@link #runningThreads} + * so that {@link #shutdown()} can interrupt it if the agent is torn down while the task is still running. The thread + * removes itself from the tracking set in a {@code finally} block when the task completes so the set does not grow + * unbounded as processors and reporting tasks are scheduled and unscheduled over the agent's lifetime. + *

+ * The {@code shutdown} flag is re-checked inside the wrapped runnable because there is a small window between + * {@link Thread#start()} and the first line of the virtual thread's body during which a concurrent call to + * {@link #shutdown()} could miss the newly-started thread. The double-check ensures that a task is not started + * (and therefore cannot acquire the global semaphore) after shutdown has been signaled. + * + * @throws IllegalStateException if {@link #shutdown()} has already been called + */ + private void startTrackedVirtualThread(final String threadName, final Runnable task) { + if (shutdown) { + throw new IllegalStateException("VirtualThreadSchedulingAgent has been shut down and cannot accept new work"); + } + + final Runnable trackedTask = () -> { + final Thread self = Thread.currentThread(); + runningThreads.add(self); + try { + if (shutdown) { + return; + } + task.run(); + } finally { + runningThreads.remove(self); + } + }; + + Thread.ofVirtual().name(threadName).start(trackedTask); + } + + /** + * Returns the next firing time for the given CRON expression after {@code currentSchedule}. + * Callers MUST handle a {@code null} return: {@link CronExpression#next(java.time.temporal.Temporal)} + * returns {@code null} when no future firing is reachable within the expression's search horizon, + * for example for a physically impossible expression such as {@code "0 0 0 30 2 ?"} (February 30). + */ + private static OffsetDateTime getNextCronSchedule(final OffsetDateTime currentSchedule, final CronExpression cronExpression) { + // Clock resolution is not millisecond-precise, so ensure the next scheduled time is strictly after the time + // this invocation was supposed to run, otherwise the same moment could be scheduled twice. + final OffsetDateTime now = OffsetDateTime.now(); + return cronExpression.next(now.isAfter(currentSchedule) ? 
now : currentSchedule); + } +} diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/CronSchedulingAgentTest.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/CronSchedulingAgentTest.java deleted file mode 100644 index 3b46865ae51d..000000000000 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/CronSchedulingAgentTest.java +++ /dev/null @@ -1,117 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.nifi.controller.scheduling; - -import org.apache.nifi.components.state.StateManager; -import org.apache.nifi.components.state.StateManagerProvider; -import org.apache.nifi.connectable.Connectable; -import org.apache.nifi.controller.FlowController; -import org.apache.nifi.controller.GarbageCollectionLog; -import org.apache.nifi.controller.ReportingTaskNode; -import org.apache.nifi.engine.FlowEngine; -import org.apache.nifi.reporting.ReportingTask; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.Mock; -import org.mockito.junit.jupiter.MockitoExtension; - -import java.util.UUID; - -import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - -@ExtendWith(MockitoExtension.class) -class CronSchedulingAgentTest { - - private static final String DEFAULT_CRON_EXPRESSION = "* * * * * ?"; - - private static final String INVALID_CRON_EXPRESSION = "* * * * * * 2023"; - - private static final int CONCURRENT_TASKS = 1; - - @Mock - private FlowController flowController; - - @Mock - private FlowEngine flowEngine; - - @Mock - private RepositoryContextFactory repositoryContextFactory; - - @Mock - private Connectable connectable; - - @Mock - private ReportingTaskNode reportingTaskNode; - - @Mock - private ReportingTask reportingTask; - - @Mock - private StateManagerProvider stateManagerProvider; - - @Mock - private StateManager stateManager; - - private CronSchedulingAgent schedulingAgent; - - @BeforeEach - void setSchedulingAgent() { - schedulingAgent = new CronSchedulingAgent(flowController, flowEngine, repositoryContextFactory); - } - - @Test - void testDoScheduleConnectable() { - final String componentId = UUID.randomUUID().toString(); - final LifecycleState lifecycleState = new LifecycleState(componentId); - - 
when(connectable.evaluateParameters(eq(DEFAULT_CRON_EXPRESSION))).thenReturn(DEFAULT_CRON_EXPRESSION); - when(connectable.getSchedulingPeriod()).thenReturn(DEFAULT_CRON_EXPRESSION); - when(connectable.getMaxConcurrentTasks()).thenReturn(CONCURRENT_TASKS); - when(connectable.getIdentifier()).thenReturn(componentId); - when(flowController.getStateManagerProvider()).thenReturn(stateManagerProvider); - when(stateManagerProvider.getStateManager(eq(componentId))).thenReturn(stateManager); - when(flowController.getGarbageCollectionLog()).thenReturn(mock(GarbageCollectionLog.class)); - - schedulingAgent.doSchedule(connectable, lifecycleState); - } - - @Test - void testDoScheduleReportingTaskNode() { - final String componentId = UUID.randomUUID().toString(); - final LifecycleState lifecycleState = new LifecycleState(componentId); - - when(reportingTaskNode.getSchedulingPeriod()).thenReturn(DEFAULT_CRON_EXPRESSION); - - schedulingAgent.doSchedule(reportingTaskNode, lifecycleState); - } - - @Test - void testDoScheduleReportingTaskNodeCronExpressionInvalid() { - final String componentId = UUID.randomUUID().toString(); - final LifecycleState lifecycleState = new LifecycleState(componentId); - - when(reportingTaskNode.getSchedulingPeriod()).thenReturn(INVALID_CRON_EXPRESSION); - when(reportingTaskNode.getReportingTask()).thenReturn(reportingTask); - when(reportingTask.getIdentifier()).thenReturn(componentId); - - assertThrows(IllegalStateException.class, () -> schedulingAgent.doSchedule(reportingTaskNode, lifecycleState)); - } -} diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/DynamicSemaphoreTest.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/DynamicSemaphoreTest.java new file mode 100644 index 000000000000..d96c3b0bf2ca --- /dev/null +++ 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/DynamicSemaphoreTest.java @@ -0,0 +1,256 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.controller.scheduling; + +import org.junit.jupiter.api.Test; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +class DynamicSemaphoreTest { + + @Test + void testConstructorRejectsZeroPermits() { + assertThrows(IllegalArgumentException.class, () -> new DynamicSemaphore(0)); + } + + @Test + void testConstructorRejectsNegativePermits() { + assertThrows(IllegalArgumentException.class, () -> new DynamicSemaphore(-1)); + } + + @Test + void testInitialPermitCount() { + final DynamicSemaphore semaphore = new DynamicSemaphore(5); + assertEquals(5, semaphore.getMaxPermits()); + assertEquals(5, semaphore.availablePermits()); + } + + @Test + void testAcquireAndRelease() throws InterruptedException { + final DynamicSemaphore semaphore 
= new DynamicSemaphore(3); + semaphore.acquire(); + assertEquals(2, semaphore.availablePermits()); + semaphore.acquire(); + assertEquals(1, semaphore.availablePermits()); + semaphore.release(); + assertEquals(2, semaphore.availablePermits()); + semaphore.release(); + assertEquals(3, semaphore.availablePermits()); + } + + @Test + void testConcurrencyBoundedByPermits() throws InterruptedException { + final int permits = 2; + final int threadCount = 5; + final DynamicSemaphore semaphore = new DynamicSemaphore(permits); + final AtomicInteger concurrentCount = new AtomicInteger(0); + final AtomicInteger maxObservedConcurrency = new AtomicInteger(0); + final CountDownLatch allStarted = new CountDownLatch(threadCount); + final CountDownLatch allDone = new CountDownLatch(threadCount); + + for (int i = 0; i < threadCount; i++) { + Thread.ofVirtual().start(() -> { + try { + allStarted.countDown(); + semaphore.acquire(); + try { + final int current = concurrentCount.incrementAndGet(); + maxObservedConcurrency.accumulateAndGet(current, Math::max); + Thread.sleep(50); + } finally { + concurrentCount.decrementAndGet(); + semaphore.release(); + } + } catch (final InterruptedException e) { + Thread.currentThread().interrupt(); + } finally { + allDone.countDown(); + } + }); + } + + assertTrue(allDone.await(5, TimeUnit.SECONDS)); + assertTrue(maxObservedConcurrency.get() <= permits, + "Max concurrency " + maxObservedConcurrency.get() + " exceeded permit count " + permits); + } + + @Test + void testSetMaxPermitsIncrease() throws InterruptedException { + final DynamicSemaphore semaphore = new DynamicSemaphore(2); + semaphore.acquire(); + semaphore.acquire(); + assertEquals(0, semaphore.availablePermits()); + + semaphore.setMaxPermits(5); + assertEquals(5, semaphore.getMaxPermits()); + assertEquals(3, semaphore.availablePermits()); + } + + @Test + void testSetMaxPermitsDecrease() { + final DynamicSemaphore semaphore = new DynamicSemaphore(5); + assertEquals(5, 
semaphore.availablePermits()); + + semaphore.setMaxPermits(2); + assertEquals(2, semaphore.getMaxPermits()); + assertEquals(2, semaphore.availablePermits()); + } + + @Test + void testSetMaxPermitsDecreaseWhileHeld() throws InterruptedException { + final DynamicSemaphore semaphore = new DynamicSemaphore(5); + semaphore.acquire(); + semaphore.acquire(); + semaphore.acquire(); + assertEquals(2, semaphore.availablePermits()); + + semaphore.setMaxPermits(2); + assertEquals(2, semaphore.getMaxPermits()); + assertTrue(semaphore.availablePermits() <= 0, + "Available permits should be non-positive when more permits are held than the new max"); + + semaphore.release(); + semaphore.release(); + semaphore.release(); + assertEquals(2, semaphore.availablePermits()); + } + + @Test + void testSetMaxPermitsRejectsZero() { + final DynamicSemaphore semaphore = new DynamicSemaphore(5); + assertThrows(IllegalArgumentException.class, () -> semaphore.setMaxPermits(0)); + } + + @Test + void testSetMaxPermitsRejectsNegative() { + final DynamicSemaphore semaphore = new DynamicSemaphore(5); + assertThrows(IllegalArgumentException.class, () -> semaphore.setMaxPermits(-1)); + } + + @Test + void testResizeUnblocksWaitingThreads() throws InterruptedException { + final DynamicSemaphore semaphore = new DynamicSemaphore(1); + semaphore.acquire(); + + final CountDownLatch threadBlocked = new CountDownLatch(1); + final CountDownLatch threadAcquired = new CountDownLatch(1); + + Thread.ofVirtual().start(() -> { + try { + threadBlocked.countDown(); + semaphore.acquire(); + threadAcquired.countDown(); + semaphore.release(); + } catch (final InterruptedException e) { + Thread.currentThread().interrupt(); + } + }); + + assertTrue(threadBlocked.await(1, TimeUnit.SECONDS)); + Thread.sleep(100); + + semaphore.setMaxPermits(2); + assertTrue(threadAcquired.await(2, TimeUnit.SECONDS), "Thread should have been unblocked by resize"); + + semaphore.release(); + } + + @Test + void testSetMaxPermitsNoChangeIsNoOp() { 
+ final DynamicSemaphore semaphore = new DynamicSemaphore(5); + semaphore.setMaxPermits(5); + assertEquals(5, semaphore.getMaxPermits()); + assertEquals(5, semaphore.availablePermits()); + } + + @Test + void testGetInUsePermitsReflectsAcquireAndRelease() throws InterruptedException { + final DynamicSemaphore semaphore = new DynamicSemaphore(4); + assertEquals(0, semaphore.getInUsePermits()); + + semaphore.acquire(); + assertEquals(1, semaphore.getInUsePermits()); + + semaphore.acquire(); + semaphore.acquire(); + assertEquals(3, semaphore.getInUsePermits()); + + semaphore.release(); + assertEquals(2, semaphore.getInUsePermits()); + + semaphore.release(); + semaphore.release(); + assertEquals(0, semaphore.getInUsePermits()); + } + + @Test + void testGetInUsePermitsConsistentDuringConcurrentResize() throws InterruptedException { + final int initialPermits = 4; + final DynamicSemaphore semaphore = new DynamicSemaphore(initialPermits); + for (int i = 0; i < initialPermits; i++) { + semaphore.acquire(); + } + + final int iterations = 1_000; + final AtomicInteger impossibleReads = new AtomicInteger(0); + final CountDownLatch start = new CountDownLatch(1); + final CountDownLatch done = new CountDownLatch(2); + + // Resizer thread: repeatedly flips the max between 4 and 10. If getInUsePermits were non-atomic, the reader + // could see max=10 while availablePermits was still reflecting the old max=4 state (or vice versa), which + // would yield a transient "in use" count outside the feasible [0, currentMax] window. 
+ Thread.ofVirtual().start(() -> { + try { + start.await(); + for (int i = 0; i < iterations; i++) { + semaphore.setMaxPermits(10); + semaphore.setMaxPermits(4); + } + } catch (final InterruptedException e) { + Thread.currentThread().interrupt(); + } finally { + done.countDown(); + } + }); + + Thread.ofVirtual().start(() -> { + try { + start.await(); + for (int i = 0; i < iterations; i++) { + final int inUse = semaphore.getInUsePermits(); + if (inUse < 0 || inUse > 10) { + impossibleReads.incrementAndGet(); + } + } + } catch (final InterruptedException e) { + Thread.currentThread().interrupt(); + } finally { + done.countDown(); + } + }); + + start.countDown(); + assertTrue(done.await(5, TimeUnit.SECONDS)); + assertEquals(0, impossibleReads.get(), "getInUsePermits should never observe an impossible value during a resize"); + } +} diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java index abadd3b0477b..c2a7d4ffe19a 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java +++ b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/TestStandardProcessScheduler.java @@ -243,6 +243,55 @@ public void testReportingTaskDoesntKeepRunningAfterStop() throws InterruptedExce "After unscheduling Reporting Task, task ran an additional " + attemptsAfterStop + " times"); } + /** + * Exercises the full reporting-task startup path with a real {@link VirtualThreadSchedulingAgent} (rather than a + * mock) so that an incorrect contract between {@link StandardProcessScheduler} and the agent is caught at the + * scheduler level. 
An earlier version of the agent threw {@link IllegalStateException} on entry if the lifecycle + * state was already marked scheduled; that condition is always true by the time the scheduler hands control to + * the agent for reporting tasks, which silently broke all reporting-task scheduling in production. This test will + * fail fast if that regression is ever reintroduced. + */ + @Test + @Timeout(30) + public void testReportingTaskRunsWithVirtualThreadSchedulingAgent() throws InterruptedException, InitializationException { + final FlowController flowController = Mockito.mock(FlowController.class); + Mockito.when(flowController.getExtensionManager()).thenReturn(extensionManager); + Mockito.when(flowController.getReloadComponent()).thenReturn(Mockito.mock(ReloadComponent.class)); + + final StandardProcessScheduler realScheduler = new StandardProcessScheduler(new FlowEngine(1, "VT Unit Test", true), extensionManager, + flowController, () -> serviceProvider, Mockito.mock(ReloadComponent.class), stateMgrProvider, nifiProperties, new StandardLifecycleStateManager()); + + final RepositoryContextFactory contextFactory = Mockito.mock(RepositoryContextFactory.class); + final VirtualThreadSchedulingAgent virtualThreadAgent = new VirtualThreadSchedulingAgent(flowController, contextFactory, nifiProperties, 10); + realScheduler.setSchedulingAgent(SchedulingStrategy.TIMER_DRIVEN, virtualThreadAgent); + + // Build a reporting task that will not fail @OnScheduled so that control actually reaches the agent. 
+ final TestReportingTask task = new TestReportingTask(); + task.failOnScheduled.set(false); + final ReportingInitializationContext config = new StandardReportingInitializationContext(UUID.randomUUID().toString(), "VTTest", SchedulingStrategy.TIMER_DRIVEN, + "10 millis", Mockito.mock(ComponentLog.class), null, KerberosConfig.NOT_CONFIGURED, null); + + task.initialize(config); + final LoggableComponent loggableTask = new LoggableComponent<>(task, systemBundle.getBundleDetails().getCoordinate(), Mockito.mock(TerminationAwareLogger.class)); + final ReportingTaskNode taskNode = new StandardReportingTaskNode(loggableTask, UUID.randomUUID().toString(), flowController, realScheduler, + new StandardValidationContextFactory(null), Mockito.mock(ReloadComponent.class), extensionManager, new SynchronousValidationTrigger()); + taskNode.setSchedulingPeriod("10 millis"); + taskNode.performValidation(); + + realScheduler.schedule(taskNode); + try { + final long deadline = System.currentTimeMillis() + 5_000L; + while (task.triggerCount.get() == 0 && System.currentTimeMillis() < deadline) { + Thread.sleep(25L); + } + + assertTrue(task.triggerCount.get() >= 1, + "Expected the VirtualThreadSchedulingAgent to invoke the reporting task at least once via the real scheduler wiring; got " + task.triggerCount.get() + " invocations"); + } finally { + realScheduler.unschedule(taskNode); + } + } + @Test @Timeout(60) public void testDisableControllerServiceWithProcessorTryingToStartUsingIt() throws InterruptedException, ExecutionException { diff --git a/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/VirtualThreadSchedulingAgentTest.java b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/VirtualThreadSchedulingAgentTest.java new file mode 100644 index 000000000000..7126b4151d6c --- /dev/null +++ 
b/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/scheduling/VirtualThreadSchedulingAgentTest.java @@ -0,0 +1,776 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.controller.scheduling; + +import org.apache.nifi.components.state.StateManager; +import org.apache.nifi.components.state.StateManagerProvider; +import org.apache.nifi.connectable.Connectable; +import org.apache.nifi.controller.FlowController; +import org.apache.nifi.controller.GarbageCollectionLog; +import org.apache.nifi.controller.ReportingTaskNode; +import org.apache.nifi.controller.ScheduledState; +import org.apache.nifi.controller.repository.FlowFileEventRepository; +import org.apache.nifi.controller.repository.RepositoryContext; +import org.apache.nifi.groups.ProcessGroup; +import org.apache.nifi.nar.ExtensionManager; +import org.apache.nifi.processor.Processor; +import org.apache.nifi.reporting.ReportingContext; +import org.apache.nifi.reporting.ReportingTask; +import org.apache.nifi.scheduling.SchedulingStrategy; +import org.apache.nifi.util.NiFiProperties; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import 
org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.mockito.junit.jupiter.MockitoSettings; +import org.mockito.quality.Strictness; + +import java.util.Collections; +import java.util.HashSet; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +@MockitoSettings(strictness = Strictness.LENIENT) +class VirtualThreadSchedulingAgentTest { + + private static final int MAX_THREADS = 10; + private static final String COMPONENT_ID = UUID.randomUUID().toString(); + + @Mock + private FlowController flowController; + + @Mock + private RepositoryContextFactory contextFactory; + + @Mock + private NiFiProperties nifiProperties; + + @Mock + private StateManagerProvider stateManagerProvider; + + @Mock + private StateManager stateManager; + + @Mock + private GarbageCollectionLog garbageCollectionLog; + + @Mock + private ExtensionManager extensionManager; + + private VirtualThreadSchedulingAgent agent; + + @BeforeEach + void setUp() { + when(nifiProperties.getBoredYieldDuration()).thenReturn("10 millis"); + agent = new VirtualThreadSchedulingAgent(flowController, contextFactory, 
nifiProperties, MAX_THREADS); + } + + @AfterEach + void tearDown() { + agent.shutdown(); + } + + @Test + void testSetMaxThreadCountAdjustsSemaphore() { + assertEquals(MAX_THREADS, agent.getGlobalSemaphore().getMaxPermits()); + agent.setMaxThreadCount(20); + assertEquals(20, agent.getGlobalSemaphore().getMaxPermits()); + agent.setMaxThreadCount(5); + assertEquals(5, agent.getGlobalSemaphore().getMaxPermits()); + } + + @Test + void testIncrementMaxThreadCountAdjustsSemaphore() { + final int originalPermits = agent.getGlobalSemaphore().getMaxPermits(); + + agent.incrementMaxThreadCount(0); + assertEquals(originalPermits, agent.getGlobalSemaphore().getMaxPermits()); + + agent.incrementMaxThreadCount(5); + assertEquals(originalPermits + 5, agent.getGlobalSemaphore().getMaxPermits()); + + agent.incrementMaxThreadCount(-3); + assertEquals(originalPermits + 2, agent.getGlobalSemaphore().getMaxPermits()); + + assertThrows(IllegalStateException.class, () -> agent.incrementMaxThreadCount(-1000), + "Removing more permits than exist must fail"); + } + + @Test + void testAdministrativeYieldDuration() { + agent.setAdministrativeYieldDuration("5 sec"); + assertEquals("5 sec", agent.getAdministrativeYieldDuration()); + assertEquals(5000L, agent.getAdministrativeYieldDuration(TimeUnit.MILLISECONDS)); + } + + @Test + void testScheduleSpawnsThreadsThatInvoke() throws InterruptedException { + final int concurrentTasks = 3; + final AtomicInteger invocationCount = new AtomicInteger(0); + final CountDownLatch allTasksInvoked = new CountDownLatch(concurrentTasks); + + final Connectable connectable = createMockedConnectable(concurrentTasks, SchedulingStrategy.TIMER_DRIVEN, invocationCount, allTasksInvoked); + final LifecycleState lifecycleState = new LifecycleState(COMPONENT_ID); + + scheduleConnectable(connectable, lifecycleState); + + assertTrue(allTasksInvoked.await(5, TimeUnit.SECONDS), + "Expected " + concurrentTasks + " threads to invoke, but only " + (concurrentTasks - 
allTasksInvoked.getCount()) + " did"); + assertTrue(invocationCount.get() >= concurrentTasks, + "Expected at least " + concurrentTasks + " invocations but got " + invocationCount.get()); + + unscheduleConnectable(connectable, lifecycleState); + Thread.sleep(100); + } + + @Test + void testUnscheduleExitsQuickly() throws InterruptedException { + final Connectable connectable = createMockedConnectable(2, SchedulingStrategy.TIMER_DRIVEN, new AtomicInteger(), new CountDownLatch(0)); + final LifecycleState lifecycleState = new LifecycleState(COMPONENT_ID); + + scheduleConnectable(connectable, lifecycleState); + Thread.sleep(300); + + unscheduleConnectable(connectable, lifecycleState); + Thread.sleep(250); + + assertEquals(0, lifecycleState.getActiveThreadCount(), + "Active threads should be 0 after unschedule settles"); + } + + @Test + void testSemaphoreLimitsConcurrentInvocations() throws InterruptedException { + final int semaphorePermits = 2; + final int totalThreads = 5; + agent.setMaxThreadCount(semaphorePermits); + + final AtomicInteger concurrentCount = new AtomicInteger(0); + final AtomicInteger maxObservedConcurrency = new AtomicInteger(0); + final CountDownLatch allDone = new CountDownLatch(totalThreads); + + for (int i = 0; i < totalThreads; i++) { + Thread.ofVirtual().start(() -> { + try { + agent.getGlobalSemaphore().acquire(); + try { + final int current = concurrentCount.incrementAndGet(); + maxObservedConcurrency.accumulateAndGet(current, Math::max); + Thread.sleep(50); + } finally { + concurrentCount.decrementAndGet(); + agent.getGlobalSemaphore().release(); + } + } catch (final InterruptedException e) { + Thread.currentThread().interrupt(); + } finally { + allDone.countDown(); + } + }); + } + + assertTrue(allDone.await(5, TimeUnit.SECONDS)); + assertTrue(maxObservedConcurrency.get() <= semaphorePermits, + "Max concurrency " + maxObservedConcurrency.get() + " exceeded semaphore permits " + semaphorePermits); + } + + @Test + void 
testScheduleOnceInvokesAndStops() throws InterruptedException { + final Connectable connectable = createMockedConnectable(1, SchedulingStrategy.TIMER_DRIVEN, new AtomicInteger(), new CountDownLatch(0)); + final LifecycleState lifecycleState = new LifecycleState(COMPONENT_ID); + final CountDownLatch stopCallbackInvoked = new CountDownLatch(1); + + lifecycleState.setScheduled(true); + agent.scheduleOnce(connectable, lifecycleState, () -> { + stopCallbackInvoked.countDown(); + return null; + }); + + assertTrue(stopCallbackInvoked.await(5, TimeUnit.SECONDS), + "Stop callback should have been invoked after scheduleOnce"); + } + + @Test + void testActiveThreadCountReflectsSemaphoreUsage() throws InterruptedException { + agent.setMaxThreadCount(4); + assertEquals(0, agent.getActiveThreadCount()); + + final CountDownLatch acquired = new CountDownLatch(2); + final CountDownLatch release = new CountDownLatch(1); + for (int i = 0; i < 2; i++) { + Thread.ofVirtual().start(() -> { + try { + agent.getGlobalSemaphore().acquire(); + acquired.countDown(); + release.await(); + } catch (final InterruptedException e) { + Thread.currentThread().interrupt(); + } finally { + agent.getGlobalSemaphore().release(); + } + }); + } + + assertTrue(acquired.await(5, TimeUnit.SECONDS)); + assertEquals(2, agent.getActiveThreadCount()); + release.countDown(); + } + + @Test + void testUnscheduleExitsWhenSemaphoreFullyContended() throws InterruptedException { + agent.setMaxThreadCount(1); + + final CountDownLatch releaseHeldPermit = new CountDownLatch(1); + final CountDownLatch permitAcquired = new CountDownLatch(1); + final Thread permitHolder = Thread.ofVirtual().start(() -> { + try { + agent.getGlobalSemaphore().acquire(); + permitAcquired.countDown(); + releaseHeldPermit.await(); + } catch (final InterruptedException e) { + Thread.currentThread().interrupt(); + } finally { + agent.getGlobalSemaphore().release(); + } + }); + assertTrue(permitAcquired.await(2, TimeUnit.SECONDS), "Failed to acquire 
permit for test setup"); + + final AtomicInteger invocationCount = new AtomicInteger(0); + final Connectable connectable = createMockedConnectable(1, SchedulingStrategy.TIMER_DRIVEN, invocationCount, new CountDownLatch(0)); + final LifecycleState lifecycleState = new LifecycleState(COMPONENT_ID); + scheduleConnectable(connectable, lifecycleState); + + Thread.sleep(100); + assertEquals(0, invocationCount.get(), "No invocation should have run while the permit was held elsewhere"); + + final long unscheduleStart = System.nanoTime(); + unscheduleConnectable(connectable, lifecycleState); + Thread.sleep(150); + final long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - unscheduleStart); + + releaseHeldPermit.countDown(); + permitHolder.join(1_000L); + Thread.sleep(100); + + assertEquals(0, invocationCount.get(), + "Scheduling loop should have exited after unschedule even though the permit was held; subsequent release must not trigger an invocation"); + assertTrue(elapsedMillis < 1_000L, "Unschedule did not propagate within 1s; measured " + elapsedMillis + "ms"); + } + + @Test + void testConcurrentIncrementMaxThreadCountIsThreadSafe() throws InterruptedException { + agent.setMaxThreadCount(100); + + final int threadCount = 20; + final int incrementsPerThread = 50; + final CountDownLatch start = new CountDownLatch(1); + final CountDownLatch done = new CountDownLatch(threadCount); + + for (int i = 0; i < threadCount; i++) { + Thread.ofVirtual().start(() -> { + try { + start.await(); + for (int j = 0; j < incrementsPerThread; j++) { + agent.incrementMaxThreadCount(1); + } + } catch (final InterruptedException e) { + Thread.currentThread().interrupt(); + } finally { + done.countDown(); + } + }); + } + + start.countDown(); + assertTrue(done.await(5, TimeUnit.SECONDS)); + + assertEquals(100 + threadCount * incrementsPerThread, agent.getGlobalSemaphore().getMaxPermits(), + "Lost increments imply a race condition in incrementMaxThreadCount"); + } + + @Test + 
void testSchedulingLoopSurvivesThrowableEscapingInvoke() throws InterruptedException { + final AtomicInteger invocationCount = new AtomicInteger(0); + final Connectable connectable = createMockedConnectable(1, SchedulingStrategy.TIMER_DRIVEN, invocationCount, new CountDownLatch(0)); + + final AtomicInteger schedulingPeriodCalls = new AtomicInteger(0); + when(connectable.getSchedulingPeriod(TimeUnit.NANOSECONDS)).thenAnswer(invocation -> { + final int call = schedulingPeriodCalls.incrementAndGet(); + if (call <= 2) { + throw new OutOfMemoryError("Simulated Error " + call); + } + return TimeUnit.MILLISECONDS.toNanos(10L); + }); + + final LifecycleState lifecycleState = new LifecycleState(COMPONENT_ID); + scheduleConnectable(connectable, lifecycleState); + + final long deadline = System.currentTimeMillis() + 5_000L; + while (invocationCount.get() < 5 && System.currentTimeMillis() < deadline) { + Thread.sleep(50L); + } + + unscheduleConnectable(connectable, lifecycleState); + Thread.sleep(100L); + + assertTrue(invocationCount.get() >= 5, + "Scheduling loop should have continued past the Errors thrown during scheduling; observed only " + invocationCount.get() + " invocations"); + assertEquals(MAX_THREADS, agent.getGlobalSemaphore().availablePermits(), + "All permits should be returned to the semaphore even after Throwables escape the loop body"); + } + + @Test + void testInvocationExceptionStillReleasesPermit() throws InterruptedException { + agent.setMaxThreadCount(2); + final Connectable connectable = createMockedConnectable(1, SchedulingStrategy.TIMER_DRIVEN, new AtomicInteger(), new CountDownLatch(0)); + final AtomicInteger invocationCount = new AtomicInteger(0); + doAnswer(invocation -> { + final int count = invocationCount.incrementAndGet(); + if (count <= 3) { + throw new RuntimeException("Simulated failure " + count); + } + return null; + }).when(connectable).onTrigger(any(), any()); + + final LifecycleState lifecycleState = new LifecycleState(COMPONENT_ID); + 
scheduleConnectable(connectable, lifecycleState); + + Thread.sleep(500); + assertEquals(2, agent.getGlobalSemaphore().availablePermits(), + "All permits should be returned to the semaphore after invocation exceptions"); + + unscheduleConnectable(connectable, lifecycleState); + Thread.sleep(100); + } + + @Test + void testCronScheduleSpawnsThreadsAndInvokes() throws InterruptedException { + final AtomicInteger invocationCount = new AtomicInteger(0); + final CountDownLatch atLeastOneInvocation = new CountDownLatch(1); + final Connectable connectable = createMockedConnectable(1, SchedulingStrategy.CRON_DRIVEN, invocationCount, atLeastOneInvocation); + when(connectable.getSchedulingPeriod()).thenReturn("* * * * * ?"); + when(connectable.evaluateParameters(eq("* * * * * ?"))).thenReturn("* * * * * ?"); + + final LifecycleState lifecycleState = new LifecycleState(COMPONENT_ID); + scheduleConnectable(connectable, lifecycleState); + + assertTrue(atLeastOneInvocation.await(3, TimeUnit.SECONDS), + "CRON-scheduled connectable should have invoked at least once within 3 seconds"); + + unscheduleConnectable(connectable, lifecycleState); + Thread.sleep(100); + } + + @Test + void testCronDrivenReportingTaskIsScheduled() throws InterruptedException { + final AtomicInteger invocationCount = new AtomicInteger(0); + final ReportingTask reportingTask = mock(ReportingTask.class); + doAnswer(invocation -> { + invocationCount.incrementAndGet(); + return null; + }).when(reportingTask).onTrigger(any()); + + final ReportingTaskNode taskNode = mock(ReportingTaskNode.class); + when(taskNode.getSchedulingStrategy()).thenReturn(SchedulingStrategy.CRON_DRIVEN); + when(taskNode.getSchedulingPeriod()).thenReturn("* * * * * ?"); + when(taskNode.getSchedulingPeriod(TimeUnit.NANOSECONDS)) + .thenThrow(new IllegalArgumentException("CRON expression cannot be parsed as a time duration")); + when(taskNode.getReportingTask()).thenReturn(reportingTask); + 
when(taskNode.getReportingContext()).thenReturn(mock(ReportingContext.class)); + when(taskNode.getIdentifier()).thenReturn(COMPONENT_ID); + when(taskNode.getName()).thenReturn("TestReporter"); + when(flowController.getExtensionManager()).thenReturn(extensionManager); + + final LifecycleState lifecycleState = new LifecycleState(COMPONENT_ID); + // Mirror the real framework flow: the scheduler transitions the lifecycle state to scheduled before the + // agent is invoked. A prior version of the agent threw IllegalStateException here, which silently broke all + // reporting-task scheduling in production because StandardProcessScheduler.schedule(ReportingTaskNode, ...) + // calls setScheduled(true) synchronously before handing control to the agent. + lifecycleState.setScheduled(true); + agent.schedule(taskNode, lifecycleState); + + try { + final long deadline = System.currentTimeMillis() + 3_000L; + while (invocationCount.get() == 0 && System.currentTimeMillis() < deadline) { + Thread.sleep(50L); + } + + assertTrue(invocationCount.get() >= 1, + "CRON-driven reporting task should have been invoked at least once; got " + invocationCount.get()); + } finally { + lifecycleState.setScheduled(false); + agent.unschedule(taskNode, lifecycleState); + Thread.sleep(100L); + } + } + + /** + * Verifies that a CRON expression with no reachable future firings (for example Feb 30, which does not exist) + * causes the scheduling loop to exit cleanly rather than tight-looping on NPEs thrown from + * {@link org.springframework.scheduling.support.CronExpression#next(java.time.temporal.Temporal)}. 
+ */ + @Test + void testCronConnectableExitsCleanlyWhenNoFutureFirings() throws InterruptedException { + final String unreachableCron = "0 0 0 30 2 ?"; + final AtomicInteger invocationCount = new AtomicInteger(0); + final Connectable connectable = createMockedConnectable(1, SchedulingStrategy.CRON_DRIVEN, invocationCount, new CountDownLatch(0)); + when(connectable.getSchedulingPeriod()).thenReturn(unreachableCron); + when(connectable.evaluateParameters(eq(unreachableCron))).thenReturn(unreachableCron); + + final LifecycleState lifecycleState = new LifecycleState(COMPONENT_ID); + scheduleConnectable(connectable, lifecycleState); + + Thread.sleep(500L); + + assertEquals(0, invocationCount.get(), + "An unreachable CRON expression must not trigger any invocations"); + assertEquals(MAX_THREADS, agent.getGlobalSemaphore().availablePermits(), + "Scheduling loop must release any acquired permits before exiting; a tight loop would hold a permit"); + + unscheduleConnectable(connectable, lifecycleState); + } + + @Test + void testCronReportingTaskExitsCleanlyWhenNoFutureFirings() throws InterruptedException { + final String unreachableCron = "0 0 0 30 2 ?"; + final AtomicInteger invocationCount = new AtomicInteger(0); + final ReportingTask reportingTask = mock(ReportingTask.class); + doAnswer(invocation -> { + invocationCount.incrementAndGet(); + return null; + }).when(reportingTask).onTrigger(any()); + + final ReportingTaskNode taskNode = mock(ReportingTaskNode.class); + when(taskNode.getSchedulingStrategy()).thenReturn(SchedulingStrategy.CRON_DRIVEN); + when(taskNode.getSchedulingPeriod()).thenReturn(unreachableCron); + when(taskNode.getSchedulingPeriod(TimeUnit.NANOSECONDS)) + .thenThrow(new IllegalArgumentException("CRON expression cannot be parsed as a time duration")); + when(taskNode.getReportingTask()).thenReturn(reportingTask); + when(taskNode.getReportingContext()).thenReturn(mock(ReportingContext.class)); + when(taskNode.getIdentifier()).thenReturn(COMPONENT_ID); + 
when(taskNode.getName()).thenReturn("TestReporter"); + when(flowController.getExtensionManager()).thenReturn(extensionManager); + + final LifecycleState lifecycleState = new LifecycleState(COMPONENT_ID); + lifecycleState.setScheduled(true); + agent.schedule(taskNode, lifecycleState); + + Thread.sleep(500L); + + assertEquals(0, invocationCount.get(), + "An unreachable CRON expression must not trigger any reporting-task invocations"); + assertEquals(MAX_THREADS, agent.getGlobalSemaphore().availablePermits(), + "Scheduling loop must release any acquired permits before exiting"); + + lifecycleState.setScheduled(false); + agent.unschedule(taskNode, lifecycleState); + } + + @Test + void testAgentAcceptsAlreadyScheduledLifecycleStateForReportingTask() throws InterruptedException { + final AtomicInteger invocationCount = new AtomicInteger(0); + final ReportingTask reportingTask = mock(ReportingTask.class); + doAnswer(invocation -> { + invocationCount.incrementAndGet(); + return null; + }).when(reportingTask).onTrigger(any()); + + final ReportingTaskNode taskNode = mock(ReportingTaskNode.class); + when(taskNode.getSchedulingStrategy()).thenReturn(SchedulingStrategy.TIMER_DRIVEN); + when(taskNode.getSchedulingPeriod(TimeUnit.NANOSECONDS)).thenReturn(TimeUnit.MILLISECONDS.toNanos(50L)); + when(taskNode.getReportingTask()).thenReturn(reportingTask); + when(taskNode.getReportingContext()).thenReturn(mock(ReportingContext.class)); + when(taskNode.getIdentifier()).thenReturn(COMPONENT_ID); + when(taskNode.getName()).thenReturn("TestReporter"); + when(flowController.getExtensionManager()).thenReturn(extensionManager); + + final LifecycleState lifecycleState = new LifecycleState(COMPONENT_ID); + + lifecycleState.setScheduled(true); + agent.schedule(taskNode, lifecycleState); + + try { + final long deadline = System.currentTimeMillis() + 3_000L; + while (invocationCount.get() == 0 && System.currentTimeMillis() < deadline) { + Thread.sleep(25L); + } + assertTrue(invocationCount.get() 
>= 1, + "Agent must accept a LifecycleState that is already marked scheduled (the real scheduler does that before " + + "handing control to the agent); observed " + invocationCount.get() + " invocations"); + } finally { + lifecycleState.setScheduled(false); + agent.unschedule(taskNode, lifecycleState); + Thread.sleep(100L); + } + } + + @Test + void testLastStopTimeAdvancesOnEveryStop() { + final LifecycleState lifecycleState = new LifecycleState(COMPONENT_ID); + lifecycleState.setScheduled(true); + final long initialStopTime = lifecycleState.getLastStopTime(); + + lifecycleState.setScheduled(false); + final long firstStopTime = lifecycleState.getLastStopTime(); + assertTrue(firstStopTime > initialStopTime, "lastStopTime must advance on every stop; was " + initialStopTime + ", now " + firstStopTime); + + lifecycleState.setScheduled(true); + assertEquals(firstStopTime, lifecycleState.getLastStopTime(), "lastStopTime must not change when (re)starting"); + + lifecycleState.setScheduled(false); + final long secondStopTime = lifecycleState.getLastStopTime(); + assertTrue(secondStopTime > firstStopTime, + "lastStopTime must strictly advance even for rapid stop/start/stop; was " + firstStopTime + ", now " + secondStopTime); + } + + @Test + void testRapidStopStartDoesNotLeakSchedulingThreads() throws InterruptedException { + final Set observedThreads = ConcurrentHashMap.newKeySet(); + final AtomicInteger invocationCount = new AtomicInteger(0); + final Connectable connectable = createMockedConnectable(1, SchedulingStrategy.TIMER_DRIVEN, invocationCount, new CountDownLatch(0)); + when(connectable.getSchedulingPeriod(TimeUnit.NANOSECONDS)).thenReturn(TimeUnit.MILLISECONDS.toNanos(500L)); + doAnswer(invocation -> { + observedThreads.add(Thread.currentThread()); + invocationCount.incrementAndGet(); + return null; + }).when(connectable).onTrigger(any(), any()); + + final LifecycleState lifecycleState = new LifecycleState(COMPONENT_ID); + scheduleConnectable(connectable, 
lifecycleState); + + try { + final long deadline = System.currentTimeMillis() + 2_000L; + while (observedThreads.isEmpty() && System.currentTimeMillis() < deadline) { + Thread.sleep(25L); + } + assertFalse(observedThreads.isEmpty(), "First schedule should have produced at least one invocation"); + final Set firstCycleThreads = new HashSet<>(observedThreads); + + unscheduleConnectable(connectable, lifecycleState); + scheduleConnectable(connectable, lifecycleState); + + final long leakDeadline = System.currentTimeMillis() + 2_000L; + while (System.currentTimeMillis() < leakDeadline) { + boolean allExited = true; + for (final Thread original : firstCycleThreads) { + if (original.isAlive()) { + allExited = false; + break; + } + } + if (allExited) { + return; + } + Thread.sleep(50L); + } + + for (final Thread original : firstCycleThreads) { + if (original.isAlive()) { + fail("Scheduling thread " + original + " from the first schedule should have exited after unschedule, " + + "but remained alive after a subsequent schedule. This indicates a leaked scheduling loop."); + } + } + } finally { + unscheduleConnectable(connectable, lifecycleState); + Thread.sleep(200L); + } + } + + /** + * Drives the agent through the same lifecycle contract enforced by {@link StandardProcessScheduler}: the framework + * transitions the {@link LifecycleState} to scheduled before invoking the agent, and the agent is a pure consumer + * of that state. Tests must therefore follow the same contract rather than relying on the agent to mutate + * {@code isScheduled} itself. 
+ */ + private void scheduleConnectable(final Connectable connectable, final LifecycleState lifecycleState) { + lifecycleState.setScheduled(true); + agent.schedule(connectable, lifecycleState); + } + + private void unscheduleConnectable(final Connectable connectable, final LifecycleState lifecycleState) { + lifecycleState.setScheduled(false); + agent.unschedule(connectable, lifecycleState); + } + + private Connectable createMockedConnectable(final int maxConcurrentTasks, final SchedulingStrategy schedulingStrategy, + final AtomicInteger invocationCount, final CountDownLatch invocationLatch) { + final Connectable connectable = mock(Connectable.class); + when(connectable.getIdentifier()).thenReturn(COMPONENT_ID); + when(connectable.getName()).thenReturn("TestProcessor"); + when(connectable.getMaxConcurrentTasks()).thenReturn(maxConcurrentTasks); + when(connectable.getIncomingConnections()).thenReturn(Collections.emptyList()); + when(connectable.getRelationships()).thenReturn(Collections.emptySet()); + when(connectable.getSchedulingPeriod(TimeUnit.MILLISECONDS)).thenReturn(100L); + when(connectable.getSchedulingPeriod(TimeUnit.NANOSECONDS)).thenReturn(TimeUnit.MILLISECONDS.toNanos(100L)); + when(connectable.getYieldExpiration()).thenReturn(0L); + when(connectable.getSchedulingStrategy()).thenReturn(schedulingStrategy); + when(connectable.isTriggerWhenEmpty()).thenReturn(true); + when(connectable.isIsolated()).thenReturn(false); + when(connectable.getRunDuration(TimeUnit.NANOSECONDS)).thenReturn(0L); + when(connectable.isSessionBatchingSupported()).thenReturn(false); + when(connectable.getScheduledState()).thenReturn(ScheduledState.RUNNING); + + final Processor runnableComponent = mock(Processor.class); + when(connectable.getRunnableComponent()).thenReturn(runnableComponent); + + doAnswer(invocation -> { + invocationCount.incrementAndGet(); + invocationLatch.countDown(); + return null; + }).when(connectable).onTrigger(any(), any()); + + final ProcessGroup processGroup 
= mock(ProcessGroup.class); + when(processGroup.getName()).thenReturn("RootGroup"); + when(processGroup.getParent()).thenReturn(null); + when(connectable.getProcessGroup()).thenReturn(processGroup); + + when(flowController.getStateManagerProvider()).thenReturn(stateManagerProvider); + when(stateManagerProvider.getStateManager(eq(COMPONENT_ID))).thenReturn(stateManager); + when(flowController.getGarbageCollectionLog()).thenReturn(garbageCollectionLog); + when(flowController.getPerformanceTrackingPercentage()).thenReturn(0); + when(flowController.getExtensionManager()).thenReturn(extensionManager); + + final RepositoryContext repositoryContext = mock(RepositoryContext.class); + when(repositoryContext.isRelationshipAvailabilitySatisfied(0)).thenReturn(true); + final FlowFileEventRepository flowFileEventRepository = mock(FlowFileEventRepository.class); + when(repositoryContext.getFlowFileEventRepository()).thenReturn(flowFileEventRepository); + when(contextFactory.newProcessContext(eq(connectable), any(AtomicLong.class))).thenReturn(repositoryContext); + + return connectable; + } + + /** + * If a failure occurs after the agent has flipped {@code lifecycleState.setScheduled(true)}, the agent must roll + * the flag back so the component is not left flagged as running with zero (or a partial set of) scheduling loops. + * This covers the three schedule entry points. 
+ */ + @Test + void testScheduleRollsBackScheduledFlagOnFailure() { + final Connectable connectable = createMockedConnectable(2, SchedulingStrategy.TIMER_DRIVEN, new AtomicInteger(), new CountDownLatch(0)); + when(connectable.getMaxConcurrentTasks()).thenThrow(new RuntimeException("Simulated failure during schedule")); + + final LifecycleState lifecycleState = new LifecycleState(COMPONENT_ID); + final RuntimeException thrown = assertThrows(RuntimeException.class, () -> agent.schedule(connectable, lifecycleState)); + assertEquals("Simulated failure during schedule", thrown.getMessage()); + assertFalse(lifecycleState.isScheduled(), + "schedule() must roll setScheduled(true) back to false when the scheduling setup fails"); + assertEquals(0, agent.getRunningThreadCount(), "No virtual threads should remain after a failed schedule"); + } + + @Test + void testScheduleOnceRollsBackScheduledFlagOnFailure() { + final Connectable connectable = createMockedConnectable(1, SchedulingStrategy.TIMER_DRIVEN, new AtomicInteger(), new CountDownLatch(0)); + when(connectable.getProcessGroup()).thenThrow(new RuntimeException("Simulated failure building thread name")); + + final LifecycleState lifecycleState = new LifecycleState(COMPONENT_ID); + assertThrows(RuntimeException.class, () -> agent.scheduleOnce(connectable, lifecycleState, () -> null)); + assertFalse(lifecycleState.isScheduled(), + "scheduleOnce() must roll setScheduled(true) back to false when the scheduling setup fails"); + } + + @Test + void testScheduleReportingTaskRollsBackScheduledFlagOnFailure() { + final ReportingTaskNode taskNode = mock(ReportingTaskNode.class); + when(taskNode.getSchedulingStrategy()).thenReturn(SchedulingStrategy.TIMER_DRIVEN); + when(taskNode.getSchedulingPeriod(TimeUnit.NANOSECONDS)).thenReturn(TimeUnit.MILLISECONDS.toNanos(50L)); + when(taskNode.getName()).thenThrow(new RuntimeException("Simulated failure building thread name")); + 
when(flowController.getExtensionManager()).thenReturn(extensionManager); + + final LifecycleState lifecycleState = new LifecycleState(COMPONENT_ID); + assertThrows(RuntimeException.class, () -> agent.schedule(taskNode, lifecycleState)); + assertFalse(lifecycleState.isScheduled(), "schedule(ReportingTaskNode) must roll setScheduled(true) back to false when the scheduling setup fails"); + } + + /** + * Verifies the shutdown contract: {@code shutdown()} interrupts every virtual thread owned by the agent, those + * threads exit cleanly, and subsequent schedule attempts fail fast. A hanging processor onTrigger is used to + * ensure the virtual thread is actively executing inside the loop body when shutdown happens, rather than just + * sleeping between invocations, because the interesting case is delivering an interrupt to in-flight processor + * work that would otherwise run until JVM exit. + */ + @Test + void testShutdownInterruptsRunningVirtualThreads() throws InterruptedException { + final CountDownLatch invocationStarted = new CountDownLatch(1); + final CountDownLatch releaseInvocation = new CountDownLatch(1); + final Connectable connectable = createMockedConnectable(1, SchedulingStrategy.TIMER_DRIVEN, new AtomicInteger(), new CountDownLatch(0)); + doAnswer(invocation -> { + invocationStarted.countDown(); + try { + releaseInvocation.await(); + } catch (final InterruptedException e) { + Thread.currentThread().interrupt(); + } + + return null; + }).when(connectable).onTrigger(any(), any()); + + final LifecycleState lifecycleState = new LifecycleState(COMPONENT_ID); + scheduleConnectable(connectable, lifecycleState); + + assertTrue(invocationStarted.await(5, TimeUnit.SECONDS), "The virtual thread should have entered onTrigger before shutdown is triggered"); + assertTrue(agent.getRunningThreadCount() >= 1, "Agent should be tracking at least one running virtual thread"); + + agent.shutdown(); + + final long deadline = System.currentTimeMillis() + 5_000L; + while 
(agent.getRunningThreadCount() > 0 && System.currentTimeMillis() < deadline) { + Thread.sleep(25L); + } + + assertEquals(0, agent.getRunningThreadCount(), + "shutdown() must interrupt running virtual threads so they exit; still running: " + agent.getRunningThreadCount()); + assertTrue(agent.isShutdown(), "shutdown flag must remain set after shutdown()"); + } + + @Test + void testScheduleAfterShutdownFailsFast() { + agent.shutdown(); + + final Connectable connectable = createMockedConnectable(1, SchedulingStrategy.TIMER_DRIVEN, new AtomicInteger(), new CountDownLatch(0)); + final LifecycleState lifecycleState = new LifecycleState(COMPONENT_ID); + + assertThrows(IllegalStateException.class, () -> agent.schedule(connectable, lifecycleState), + "schedule() must refuse to start new work after the agent has been shut down"); + assertFalse(lifecycleState.isScheduled(), "A rejected schedule() after shutdown must not leave the lifecycle state flagged as scheduled"); + } + + @Test + void testShutdownIsIdempotent() { + agent.shutdown(); + agent.shutdown(); + assertTrue(agent.isShutdown()); + assertEquals(0, agent.getRunningThreadCount()); + } +} diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/scheduling/VirtualThreadSchedulingIT.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/scheduling/VirtualThreadSchedulingIT.java new file mode 100644 index 000000000000..a54fd40c5a41 --- /dev/null +++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/scheduling/VirtualThreadSchedulingIT.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.tests.system.scheduling; + +import org.apache.nifi.tests.system.NiFiSystemIT; +import org.apache.nifi.toolkit.client.NiFiClientException; +import org.apache.nifi.web.api.dto.ProcessorConfigDTO; +import org.apache.nifi.web.api.entity.ConnectionEntity; +import org.apache.nifi.web.api.entity.ProcessorEntity; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +/** + * Soak test for the {@link org.apache.nifi.controller.scheduling.VirtualThreadSchedulingAgent}. + * Stands up a GenerateFlowFile -> TerminateFlowFile flow configured to emit + * {@value #TARGET_FLOWFILES} FlowFiles total and waits for every FlowFile to flow through + * the system. Having the virtual-thread scheduling loop execute this many iterations without + * deadlocking or leaking permits gives confidence that the agent is stable under sustained load. 
+ */ +public class VirtualThreadSchedulingIT extends NiFiSystemIT { + + private static final Logger logger = LoggerFactory.getLogger(VirtualThreadSchedulingIT.class); + + private static final int TARGET_FLOWFILES = 1_000_000; + private static final int GENERATE_BATCH_SIZE = 10_000; + private static final int GENERATE_CONCURRENT_TASKS = 2; + private static final int TERMINATE_CONCURRENT_TASKS = 8; + private static final int DRAINED_POLLS_REQUIRED = 20; + private static final long POLL_DELAY_MILLIS = 500L; + + @Test + @Timeout(value = 10, unit = TimeUnit.MINUTES) + public void testMillionFlowFilesThroughVirtualThreads() throws NiFiClientException, IOException, InterruptedException { + final ProcessorEntity generate = getClientUtil().createProcessor("GenerateFlowFile"); + final ProcessorEntity terminate = getClientUtil().createProcessor("TerminateFlowFile"); + final ConnectionEntity generateToTerminate = getClientUtil().createConnection(generate, terminate, "success"); + getClientUtil().updateConnectionBackpressure(generateToTerminate, 200_000L, 1_000_000_000L); + + final Map<String, String> generateProperties = new HashMap<>(); + generateProperties.put("File Size", "0 B"); + generateProperties.put("Batch Size", String.valueOf(GENERATE_BATCH_SIZE)); + generateProperties.put("Max FlowFiles", String.valueOf(TARGET_FLOWFILES)); + generateProperties.put("State Scope", "LOCAL"); + getClientUtil().updateProcessorProperties(generate, generateProperties); + + final ProcessorConfigDTO generateConfig = new ProcessorConfigDTO(); + generateConfig.setSchedulingPeriod("0 sec"); + generateConfig.setConcurrentlySchedulableTaskCount(GENERATE_CONCURRENT_TASKS); + final ProcessorEntity configuredGenerate = getClientUtil().updateProcessorConfig(generate, generateConfig); + + final ProcessorConfigDTO terminateConfig = new ProcessorConfigDTO(); + terminateConfig.setSchedulingPeriod("0 sec"); + terminateConfig.setConcurrentlySchedulableTaskCount(TERMINATE_CONCURRENT_TASKS); + final ProcessorEntity 
configuredTerminate = getClientUtil().updateProcessorConfig(terminate, terminateConfig); + + getClientUtil().startProcessor(configuredGenerate); + getClientUtil().startProcessor(configuredTerminate); + + try { + waitForSustainedDrain(generateToTerminate.getId()); + } finally { + getClientUtil().stopProcessor(configuredGenerate); + getClientUtil().stopProcessor(configuredTerminate); + getClientUtil().waitForStoppedProcessor(configuredGenerate.getId()); + getClientUtil().waitForStoppedProcessor(configuredTerminate.getId()); + } + + assertEquals(0, getConnectionQueueSize(generateToTerminate.getId()), + "All generated FlowFiles should have been terminated"); + } + + /** + * First waits for the connection queue to be observed as non-empty (so that we know the + * generator is actually producing FlowFiles), then waits for the connection queue to stay + * empty for {@link #DRAINED_POLLS_REQUIRED} consecutive polls. Because the generator is + * configured with a bounded Max FlowFiles, a sustained empty queue indicates that all + * {@value #TARGET_FLOWFILES} FlowFiles have been emitted and drained. 
+ */ + private void waitForSustainedDrain(final String connectionId) throws InterruptedException { + boolean queueWasObservedNonEmpty = false; + int consecutiveDrainedPolls = 0; + long lastLogTime = 0L; + while (!queueWasObservedNonEmpty || consecutiveDrainedPolls < DRAINED_POLLS_REQUIRED) { + final int queueSize; + try { + queueSize = getConnectionQueueSize(connectionId); + } catch (final Exception e) { + logger.debug("Failed to read queue size; continuing to poll", e); + Thread.sleep(POLL_DELAY_MILLIS); + continue; + } + + if (queueSize > 0) { + queueWasObservedNonEmpty = true; + consecutiveDrainedPolls = 0; + } else if (queueWasObservedNonEmpty) { + consecutiveDrainedPolls++; + } + + final long now = System.currentTimeMillis(); + if (now - lastLogTime >= 2_000L) { + logger.info("Current queue size: {} (drained polls: {}/{}, saw non-empty: {})", + queueSize, consecutiveDrainedPolls, DRAINED_POLLS_REQUIRED, queueWasObservedNonEmpty); + lastLogTime = now; + } + + Thread.sleep(POLL_DELAY_MILLIS); + } + } +} diff --git a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/scheduling/VirtualThreadStartStopCycleIT.java b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/scheduling/VirtualThreadStartStopCycleIT.java new file mode 100644 index 000000000000..65dee6dc1c13 --- /dev/null +++ b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/scheduling/VirtualThreadStartStopCycleIT.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.tests.system.scheduling; + +import org.apache.nifi.tests.system.NiFiSystemIT; +import org.apache.nifi.toolkit.client.NiFiClientException; +import org.apache.nifi.web.api.dto.ProcessorConfigDTO; +import org.apache.nifi.web.api.entity.ConnectionEntity; +import org.apache.nifi.web.api.entity.ProcessorEntity; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +/** + * Exercises the virtual-thread scheduling agent's start/stop behavior across many cycles. Configures a + * {@code GenerateFlowFile} processor to emit exactly one FlowFile per scheduling session and then starts and stops + * the processor {@value #START_STOP_CYCLES} times, verifying the queue size after each cycle. This catches regressions + * in the start/stop lifecycle that a single long-running soak test would not: + * + *

+ * <ul>
+ *   <li>A leaked scheduling loop would cause the queue size to advance by more than one per cycle.</li>
+ *   <li>A scheduling loop that fails to start would cause the queue size to not advance at all.</li>
+ *   <li>Subtle bugs in the {@link org.apache.nifi.controller.scheduling.VirtualThreadSchedulingAgent} lifecycle (for
+ * example, never calling {@code setScheduled(false)} on unschedule and therefore never exiting the loop) would
+ * either produce extra FlowFiles on restart or hang the test.</li>
+ * </ul>
+ * + * Once all cycles have completed and the expected number of FlowFiles is queued, {@code TerminateFlowFile} is started + * and the test waits for the queue to drain back to zero, confirming that the final FlowFile count is correct and the + * flow is in a clean state. + */ +public class VirtualThreadStartStopCycleIT extends NiFiSystemIT { + + private static final int START_STOP_CYCLES = 25; + + @Test + @Timeout(value = 5, unit = TimeUnit.MINUTES) + public void testRepeatedStartStopProducesExpectedQueueCount() throws NiFiClientException, IOException, InterruptedException { + final ProcessorEntity generate = getClientUtil().createProcessor("GenerateFlowFile"); + final ProcessorEntity terminate = getClientUtil().createProcessor("TerminateFlowFile"); + final ConnectionEntity generateToTerminate = getClientUtil().createConnection(generate, terminate, "success"); + + // Raise back-pressure well above the expected queue size so that the generator is never blocked from producing + // its one FlowFile per cycle, regardless of load on the test agent. 
+ getClientUtil().updateConnectionBackpressure(generateToTerminate, 10_000L, 10_000_000L); + + final Map<String, String> generateProperties = new HashMap<>(); + generateProperties.put("File Size", "0 B"); + generateProperties.put("Batch Size", "1"); + generateProperties.put("Max FlowFiles", "1"); + generateProperties.put("State Scope", "LOCAL"); + getClientUtil().updateProcessorProperties(generate, generateProperties); + + final ProcessorConfigDTO generateConfig = new ProcessorConfigDTO(); + generateConfig.setSchedulingPeriod("0 sec"); + generateConfig.setConcurrentlySchedulableTaskCount(1); + final ProcessorEntity configuredGenerate = getClientUtil().updateProcessorConfig(generate, generateConfig); + + final ProcessorConfigDTO terminateConfig = new ProcessorConfigDTO(); + terminateConfig.setSchedulingPeriod("0 sec"); + terminateConfig.setConcurrentlySchedulableTaskCount(1); + final ProcessorEntity configuredTerminate = getClientUtil().updateProcessorConfig(terminate, terminateConfig); + + for (int cycle = 1; cycle <= START_STOP_CYCLES; cycle++) { + getClientUtil().startProcessor(configuredGenerate); + waitForQueueCount(generateToTerminate, cycle); + getClientUtil().stopProcessor(configuredGenerate); + + assertEquals(cycle, getConnectionQueueSize(generateToTerminate.getId()), + "After start/stop cycle " + cycle + " the queue should contain exactly " + cycle + " FlowFiles, " + "which would not be the case if the prior scheduling loop leaked (too many) or if the new " + "scheduling loop failed to run (too few)."); + } + + assertEquals(START_STOP_CYCLES, getConnectionQueueSize(generateToTerminate.getId()), + "Expected exactly " + START_STOP_CYCLES + " FlowFiles queued after " + START_STOP_CYCLES + " start/stop cycles"); + + getClientUtil().startProcessor(configuredTerminate); + try { + waitForQueueCount(generateToTerminate, 0); + } finally { + getClientUtil().stopProcessor(configuredTerminate); + } + + assertEquals(0, getConnectionQueueSize(generateToTerminate.getId()), + "After 
TerminateFlowFile drained the queue it should be empty"); + } +}