Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,116 +16,51 @@
*/
package org.apache.nifi.diagnostics;

import java.lang.management.LockInfo;
import com.sun.management.HotSpotDiagnosticMXBean;

import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.lang.management.MonitorInfo;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.util.ArrayList;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;

/**
* Captures a textual dump of every thread in the JVM. Uses
* {@link HotSpotDiagnosticMXBean#dumpThreads(String, HotSpotDiagnosticMXBean.ThreadDumpFormat)}
* so that virtual threads are included in the dump alongside platform threads.
*/
public class ThreadDumpTask implements DiagnosticTask {
@Override
public DiagnosticsDumpElement captureDump(boolean verbose) {
final ThreadMXBean mbean = ManagementFactory.getThreadMXBean();

final ThreadInfo[] infos = mbean.dumpAllThreads(true, true);
final long[] deadlockedThreadIds = mbean.findDeadlockedThreads();
final long[] monitorDeadlockThreadIds = mbean.findMonitorDeadlockedThreads();

final List<ThreadInfo> sortedInfos = new ArrayList<>(infos.length);
Collections.addAll(sortedInfos, infos);
sortedInfos.sort(new Comparator<>() {
@Override
public int compare(ThreadInfo o1, ThreadInfo o2) {
return o1.getThreadName().toLowerCase().compareTo(o2.getThreadName().toLowerCase());
}
});

final StringBuilder sb = new StringBuilder();
for (final ThreadInfo info : sortedInfos) {
sb.append("\n");
sb.append("\"").append(info.getThreadName()).append("\" Id=");
sb.append(info.getThreadId()).append(" ");
sb.append(info.getThreadState().toString()).append(" ");

switch (info.getThreadState()) {
case BLOCKED:
case TIMED_WAITING:
case WAITING:
sb.append(" on ");
sb.append(info.getLockInfo());
break;
default:
break;
}

if (info.isSuspended()) {
sb.append(" (suspended)");
}
if (info.isInNative()) {
sb.append(" (in native code)");
}

if (deadlockedThreadIds != null) {
for (final long id : deadlockedThreadIds) {
if (id == info.getThreadId()) {
sb.append(" ** DEADLOCKED THREAD **");
}
}
}

if (monitorDeadlockThreadIds != null) {
for (final long id : monitorDeadlockThreadIds) {
if (id == info.getThreadId()) {
sb.append(" ** MONITOR-DEADLOCKED THREAD **");
}
}
}

final StackTraceElement[] stackTraces = info.getStackTrace();
for (final StackTraceElement element : stackTraces) {
sb.append("\n\tat ").append(element);

final MonitorInfo[] monitors = info.getLockedMonitors();
for (final MonitorInfo monitor : monitors) {
if (Objects.equals(monitor.getLockedStackFrame(), element)) {
sb.append("\n\t- waiting on ").append(monitor);
}
}
@Override
public DiagnosticsDumpElement captureDump(final boolean verbose) {
String threadDump;

Path tempDirectory = null;
try {
final HotSpotDiagnosticMXBean diagnosticMXBean = ManagementFactory.getPlatformMXBean(HotSpotDiagnosticMXBean.class);
// dumpThreads requires that the destination file does not already exist. Creating a private
// temporary directory and writing to a fresh filename inside it avoids a time-of-check to
// time-of-use race that would exist if we created a temp file and then deleted it before the
// dumpThreads call.
tempDirectory = Files.createTempDirectory("nifi-thread-dump-");
final Path tempFile = tempDirectory.resolve("thread-dump.txt");
try {
diagnosticMXBean.dumpThreads(tempFile.toString(), HotSpotDiagnosticMXBean.ThreadDumpFormat.TEXT_PLAIN);
threadDump = Files.readString(tempFile);
} finally {
Files.deleteIfExists(tempFile);
}

final LockInfo[] lockInfos = info.getLockedSynchronizers();
if (lockInfos.length > 0) {
sb.append("\n\t");
sb.append("Number of Locked Synchronizers: ").append(lockInfos.length);
for (final LockInfo lockInfo : lockInfos) {
sb.append("\n\t- ").append(lockInfo.toString());
} catch (final IOException e) {
threadDump = "Failed to capture thread dump: " + e.getMessage();
} finally {
if (tempDirectory != null) {
try {
Files.deleteIfExists(tempDirectory);
} catch (final IOException ignored) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we at least log this at debug so that a recurring failure to delete nifi-thread-dump-* directories is discoverable?

}
}

sb.append("\n");
}

if (deadlockedThreadIds != null && deadlockedThreadIds.length > 0) {
sb.append("\n\nDEADLOCK DETECTED!");
sb.append("\nThe following thread IDs are deadlocked:");
for (final long id : deadlockedThreadIds) {
sb.append("\n").append(id);
}
}

if (monitorDeadlockThreadIds != null && monitorDeadlockThreadIds.length > 0) {
sb.append("\n\nMONITOR DEADLOCK DETECTED!");
sb.append("\nThe following thread IDs are deadlocked:");
for (final long id : monitorDeadlockThreadIds) {
sb.append("\n").append(id);
}
}

return new StandardDiagnosticsDumpElement("Thread Dump", Collections.singletonList(sb.toString()));
return new StandardDiagnosticsDumpElement("Thread Dump", Collections.singletonList(threadDump));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.WeakHashMap;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

Expand All @@ -38,7 +33,6 @@ public class LifecycleState {
private final Object componentId;
private final AtomicInteger activeThreadCount = new AtomicInteger(0);
private final AtomicBoolean scheduled = new AtomicBoolean(false);
private final Set<ScheduledFuture<?>> futures = new HashSet<>();
private final AtomicBoolean mustCallOnStoppedMethods = new AtomicBoolean(false);
private volatile long lastStopTime = -1;
private volatile boolean terminated = false;
Expand Down Expand Up @@ -111,7 +105,16 @@ public synchronized void setScheduled(final boolean scheduled) {
mustCallOnStoppedMethods.set(true);

if (!scheduled) {
lastStopTime = System.currentTimeMillis();
// Force the stop time to strictly advance. System.currentTimeMillis() has millisecond resolution, so a
// second stop that happens within the same millisecond as the previous one would otherwise read the same
// value; that would defeat the race check in VirtualThreadSchedulingAgent, which relies on this field
// changing to signal that a prior scheduling generation has ended.
final long previousStopTime = lastStopTime;
long nextStopTime = System.currentTimeMillis();
if (nextStopTime <= previousStopTime) {
nextStopTime = previousStopTime + 1L;
}
lastStopTime = nextStopTime;
}
}

Expand All @@ -135,25 +138,6 @@ public boolean mustCallOnStoppedMethods() {
return mustCallOnStoppedMethods.getAndSet(false);
}

/**
* Establishes the list of relevant futures for this processor. Replaces any previously held futures.
*
* @param newFutures futures
*/
public synchronized void setFutures(final Collection<ScheduledFuture<?>> newFutures) {
futures.clear();
futures.addAll(newFutures);
}

public synchronized void replaceFuture(final ScheduledFuture<?> oldFuture, final ScheduledFuture<?> newFuture) {
futures.remove(oldFuture);
futures.add(newFuture);
}

public synchronized Set<ScheduledFuture<?>> getFutures() {
return Collections.unmodifiableSet(futures);
}

public synchronized void terminate() {
this.terminated = true;
activeThreadCount.set(0);
Expand Down
Loading
Loading