Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@
import org.apache.accumulo.core.util.compaction.RunningCompactionInfo;
import org.apache.accumulo.monitor.Monitor;
import org.apache.accumulo.monitor.next.InformationFetcher.InstanceSummary;
import org.apache.accumulo.monitor.next.SystemInformation.CompactionGroupSummary;
import org.apache.accumulo.monitor.next.SystemInformation.CompactionTableSummary;
import org.apache.accumulo.monitor.next.SystemInformation.TableSummary;
import org.apache.accumulo.monitor.next.SystemInformation.TimeOrderedRunningCompactionSet;
import org.apache.accumulo.monitor.next.deployment.DeploymentOverview;
Expand Down Expand Up @@ -321,6 +323,22 @@ public List<RunningCompactionInfo> getCompactions() {
.collect(Collectors.toList());
}

@GET
@Path("compactions/running/group")
@Produces(MediaType.APPLICATION_JSON)
@Description("Returns number of running major compactions per group")
public List<CompactionGroupSummary> getRunningCompactionsPerGroup() {
return monitor.getInformationFetcher().getSummaryForEndpoint().getRunningCompactionsPerGroup();
}

@GET
@Path("compactions/running/table")
@Produces(MediaType.APPLICATION_JSON)
@Description("Returns number of running major compactions per table")
public List<CompactionTableSummary> getRunningCompactionsPerTable() {
return monitor.getInformationFetcher().getSummaryForEndpoint().getRunningCompactionsPerTable();
}

@GET
@Path("compactions/running/{" + GROUP_PARAM_KEY + "}")
@Produces(MediaType.APPLICATION_JSON)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,15 @@
package org.apache.accumulo.monitor.next;

import static com.google.common.base.Suppliers.memoize;
import static org.apache.accumulo.core.metrics.MetricsInfo.QUEUE_TAG_KEY;

import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
Expand All @@ -43,6 +45,7 @@
import java.util.stream.Stream;

import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.client.admin.TabletAvailability;
import org.apache.accumulo.core.client.admin.TabletInformation;
import org.apache.accumulo.core.client.admin.TabletMergeabilityInfo;
Expand All @@ -57,14 +60,15 @@
import org.apache.accumulo.core.metadata.SystemTables;
import org.apache.accumulo.core.metadata.TabletState;
import org.apache.accumulo.core.metrics.Metric;
import org.apache.accumulo.core.metrics.MetricsInfo;
import org.apache.accumulo.core.metrics.flatbuffers.FMetric;
import org.apache.accumulo.core.metrics.flatbuffers.FTag;
import org.apache.accumulo.core.process.thrift.MetricResponse;
import org.apache.accumulo.core.spi.balancer.TableLoadBalancer;
import org.apache.accumulo.core.util.compaction.RunningCompactionInfo;
import org.apache.accumulo.monitor.next.deployment.DeploymentOverview;
import org.apache.accumulo.monitor.next.views.ServersView;
import org.apache.accumulo.monitor.next.views.ServersView.Column;
import org.apache.accumulo.monitor.next.views.ServersView.ColumnFactory;
import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.server.conf.TableConfiguration;
import org.apache.accumulo.server.metrics.MetricResponseWrapper;
Expand Down Expand Up @@ -352,6 +356,12 @@ public Stream<RunningCompactionInfo> stream() {

}

public record CompactionTableSummary(String tableId, String tableName, long running) {
}

public record CompactionGroupSummary(String groupId, long running) {
}

private static final Logger LOG = LoggerFactory.getLogger(SystemInformation.class);

private final DistributionStatisticConfig DSC =
Expand Down Expand Up @@ -401,6 +411,9 @@ public Stream<RunningCompactionInfo> stream() {
protected final Map<TableId,LongAdder> runningCompactionsPerTable = new ConcurrentHashMap<>();
protected final Map<String,LongAdder> runningCompactionsPerGroup = new ConcurrentHashMap<>();

private final List<CompactionTableSummary> tableCompactions = new ArrayList<>();
private final List<CompactionGroupSummary> groupCompactions = new ArrayList<>();

// Table Information
private final Map<TableId,TableSummary> tables = new ConcurrentHashMap<>();
private final Map<TableId,List<TabletInformation>> tablets = new ConcurrentHashMap<>();
Expand Down Expand Up @@ -449,6 +462,8 @@ public void clear() {
suggestions.clear();
runningCompactionsPerGroup.clear();
runningCompactionsPerTable.clear();
tableCompactions.clear();
groupCompactions.clear();
configuredCompactionResourceGroups.clear();
serverMetricsView.clear();
}
Expand Down Expand Up @@ -490,20 +505,80 @@ private void updateAggregates(final MetricResponse response,

}

private void createCompactionSummary(MetricResponse response) {
if (response.getMetrics() != null) {
for (final ByteBuffer binary : response.getMetrics()) {
FMetric fm = FMetric.getRootAsFMetric(binary);
for (int i = 0; i < fm.tagsLength(); i++) {
FTag t = fm.tags(i);
if (t.key().equals(MetricsInfo.QUEUE_TAG_KEY)) {
queueMetrics
.computeIfAbsent(t.value(), (k) -> Collections.synchronizedList(new ArrayList<>()))
.add(fm);
private ServersView createCompactionQueueSummary(final Set<ServerId> managers) {

final Column COMPACTION_QUEUE_COL =
Comment thread
dlmarion marked this conversation as resolved.
new Column(ServersView.RG_COL_KEY, "Compaction Queue", "Compaction Queue", "");

List<ColumnFactory> cols = ServersView.columnsFor(ServersView.ServerTable.COORDINATOR_QUEUES);

// Remove the column mapping for the resource group and replace it so that
// the column header reads "Compaction Queue" instead of "Resource Group"
int rgIdx = -1;
for (int idx = 0; idx < cols.size(); idx++) {
if (cols.get(idx).getColumn().key().equals(ServersView.RG_COL_KEY)) {
rgIdx = idx;
break;
}
}

if (rgIdx == -1) {
LOG.warn("Did not find Resource Group column to replace column header");
} else {
cols.remove(rgIdx);
cols.set(rgIdx, new ColumnFactory() {

@Override
public Column getColumn() {
return COMPACTION_QUEUE_COL;
}

@Override
public Object getRowData(ServerId sid, MetricResponse mr,
Map<String,List<FMetric>> serverMetrics) {
return sid.getResourceGroup().canonical();
}
});
}

// Construct a Map of MetricResponses by Queue. This method will take
// the provided MetricResponse and construct new ones that contain
// only the metrics with the "queue.id" tag in addition to the common
// server information (address, resource group, etc.).
Map<ServerId,MetricResponse> qm = new HashMap<>();

for (ServerId manager : managers) {
MetricResponse response = allMetrics.getIfPresent(manager);
if (response.getMetrics() != null) {

FMetric fm = new FMetric();
FTag t = new FTag();
for (final ByteBuffer binary : response.getMetrics()) {
fm = FMetric.getRootAsFMetric(binary, fm);
for (int i = 0; i < fm.tagsLength(); i++) {
t = fm.tags(t, i);
if (t.key().equals(QUEUE_TAG_KEY)) {
String queueName = t.value();
// For these MetricResponse objects we are going to put the queueId value
// in the place of the resource group, we'll update the column information
// for the resource group below.
ServerId sid = new ServerId(manager.getType(), ResourceGroupId.of(queueName),
manager.getHost(), manager.getPort());
qm.computeIfAbsent(sid, (k) -> new MetricResponse(response.getServerType(),
response.getServer(), queueName, response.getTimestamp(), new ArrayList<>()))
.addToMetrics(binary);
break;
}
}
}
}
}

if (!qm.isEmpty()) {
// Create a ServersView object from the MetricResponse for each queue
return new ServersView(qm.keySet(), 0, qm, timestamp.get(), cols);
}
return new ServersView(Set.of(), 0, Map.of(), timestamp.get(), cols);
}

public void processResponse(final ServerId server, final MetricResponse response) {
Expand All @@ -527,7 +602,6 @@ public void processResponse(final ServerId server, final MetricResponse response
break;
case MANAGER:
managers.add(server);
createCompactionSummary(response);
break;
case SCAN_SERVER:
sservers.computeIfAbsent(response.getResourceGroup(), (rg) -> ConcurrentHashMap.newKeySet())
Expand Down Expand Up @@ -606,6 +680,7 @@ public void finish() {
+ " group " + balancerRG + ", but there are no TabletServers.");
}
}

for (String rg : getResourceGroups()) {
Set<ServerId> rgCompactors = getCompactorResourceGroupServers(rg);
List<FMetric> metrics = queueMetrics.get(rg);
Expand Down Expand Up @@ -652,6 +727,19 @@ public void finish() {

timestamp.set(System.currentTimeMillis());

for (Entry<TableId,LongAdder> e : runningCompactionsPerTable.entrySet()) {
TableId tid = e.getKey();
try {
tableCompactions.add(new CompactionTableSummary(tid.canonical(),
ctx.getQualifiedTableName(tid), e.getValue().sum()));
} catch (TableNotFoundException e1) {
LOG.warn("Error converting table id {} to table name, caught TableNotFoundException", tid);
}
}

runningCompactionsPerGroup
.forEach((k, v) -> groupCompactions.add(new CompactionGroupSummary(k, v.sum())));

for (final ServerId.Type type : ServerId.Type.values()) {
long problemHostCount =
problemHosts.stream().filter(serverId -> serverId.getType() == type).count();
Expand All @@ -673,6 +761,10 @@ public void finish() {
cacheServerProcessView(ServersView.ServerTable.MANAGER_FATE, servers, problemHostCount);
cacheServerProcessView(ServersView.ServerTable.MANAGER_COMPACTIONS, servers,
problemHostCount);
ServersView coordinatorQueues = createCompactionQueueSummary(servers);
serverMetricsView.put(ServersView.ServerTable.COORDINATOR_QUEUES,
memoize(() -> coordinatorQueues));

break;
case SCAN_SERVER:
sservers.values().forEach(servers::addAll);
Expand Down Expand Up @@ -706,6 +798,14 @@ public ServerId getGarbageCollector() {
return this.gc.get();
}

public List<CompactionGroupSummary> getRunningCompactionsPerGroup() {
return this.groupCompactions;
}

public List<CompactionTableSummary> getRunningCompactionsPerTable() {
return this.tableCompactions;
}

public Set<ServerId> getCompactorResourceGroupServers(String resourceGroup) {
return this.compactors.get(resourceGroup);
}
Expand Down Expand Up @@ -783,7 +883,7 @@ public long getTimestamp() {
private void cacheServerProcessView(ServersView.ServerTable table, Set<ServerId> servers,
long problemHostCount) {
serverMetricsView.put(table, memoize(() -> new ServersView(servers, problemHostCount,
allMetrics, timestamp.get(), ServersView.columnsFor(table))));
allMetrics.asMap(), timestamp.get(), ServersView.columnsFor(table))));
}

public ServersView getServerProcessView(ServersView.ServerTable table) {
Expand Down
Loading