Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1193,23 +1193,19 @@ protected synchronized ThriftTransportPool getTransportPoolImpl(boolean shouldHa
return thriftTransportPool;
}

public MeterRegistry getMeterRegistry() {
ensureOpen();
return micrometer;
}

public void setMeterRegistry(MeterRegistry micrometer) {
public synchronized void setMeterRegistry(MeterRegistry micrometer) {
ensureOpen();
this.micrometer = micrometer;
getCaches();
if (caches != null) {
caches.registerMetrics(micrometer);
}
}

public synchronized Caches getCaches() {
ensureOpen();
if (caches == null) {
caches = Caches.getInstance();
if (micrometer != null
&& getConfiguration().getBoolean(Property.GENERAL_MICROMETER_CACHE_METRICS_ENABLED)) {
if (micrometer != null) {
caches.registerMetrics(micrometer);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -352,9 +352,6 @@ temporary files (for example, when creating a pre-split table). \
PropertyType.TIMEDURATION,
"The maximum amount of time that a Scanner should wait before retrying a failed RPC.",
"1.7.3"),
GENERAL_MICROMETER_CACHE_METRICS_ENABLED("general.micrometer.cache.metrics.enabled", "false",
PropertyType.BOOLEAN, "Enables Caffeine Cache metrics functionality using Micrometer.",
"4.0.0"),
GENERAL_MICROMETER_ENABLED("general.micrometer.enabled", "false", PropertyType.BOOLEAN,
"Enables metrics collection and reporting functionality using Micrometer.", "2.1.0"),
GENERAL_MICROMETER_JVM_METRICS_ENABLED("general.micrometer.jvm.metrics.enabled", "false",
Expand Down Expand Up @@ -1688,8 +1685,7 @@ public static boolean isValidTablePropertyKey(String key) {
GENERAL_IDLE_PROCESS_INTERVAL, GENERAL_MICROMETER_ENABLED,
GENERAL_MICROMETER_JVM_METRICS_ENABLED, GENERAL_MICROMETER_LOG_METRICS,
GENERAL_MICROMETER_FACTORY, GENERAL_SERVER_LOCK_VERIFICATION_INTERVAL,
GENERAL_CACHE_MANAGER_IMPL, GENERAL_MICROMETER_CACHE_METRICS_ENABLED,
GENERAL_LOW_MEM_DETECTOR_INTERVAL,
GENERAL_CACHE_MANAGER_IMPL, GENERAL_LOW_MEM_DETECTOR_INTERVAL,

// MANAGER options
MANAGER_THREADCHECK, MANAGER_FATE_METRICS_MIN_UPDATE_INTERVAL, MANAGER_METADATA_SUSPENDABLE,
Expand Down
44 changes: 26 additions & 18 deletions core/src/main/java/org/apache/accumulo/core/metrics/Metric.java
Original file line number Diff line number Diff line change
Expand Up @@ -83,10 +83,10 @@ public enum Metric {
MetricDocSection.COMPACTION, "Majc In Progress", null, NUMBER),
COMPACTOR_MAJC_STUCK("accumulo.compaction.majc.stuck", MetricType.LONG_TASK_TIMER,
"Number and duration of stuck major compactions.", MetricDocSection.COMPACTION, "Majc Stuck",
null, NUMBER),
null, DURATION),
COMPACTOR_MINC_STUCK("accumulo.compaction.minc.stuck", MetricType.LONG_TASK_TIMER,
"Number and duration of stuck minor compactions.", MetricDocSection.COMPACTION, "Minc Stuck",
null, NUMBER),
null, DURATION),
COMPACTOR_ENTRIES_READ("accumulo.compaction.entries.read", MetricType.FUNCTION_COUNTER,
"Number of entries read by all compactions that have run on this compactor (majc) or tserver (minc).",
MetricDocSection.COMPACTION, "Compaction Entries Read", null, NUMBER),
Expand Down Expand Up @@ -122,7 +122,7 @@ public enum Metric {
MetricDocSection.COMPACTION, "Compaction Queue Avg Job Age", null, NUMBER),
COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_POLL_TIMER("accumulo.compaction.queue.jobs.exit.time",
MetricType.TIMER, "Tracks time a job spent in the queue before exiting the queue.",
MetricDocSection.COMPACTION, "Compaction Queue Job Time Queued", null, NUMBER),
MetricDocSection.COMPACTION, "Compaction Queue Job Time Queued", null, DURATION),

// Fate Metrics
FATE_TYPE_IN_PROGRESS("accumulo.fate.ops.in.progress.by.type", MetricType.GAUGE,
Expand Down Expand Up @@ -241,24 +241,27 @@ public enum Metric {

// Scan Server Metrics
SCAN_RESERVATION_TOTAL_TIMER("accumulo.scan.reservation.total.timer", MetricType.TIMER,
"Time to reserve a tablet's files for scan.", MetricDocSection.SCAN_SERVER,
"Scan Reservation Total Time", null, NUMBER),
"Average time to reserve a tablet's files for scan.", MetricDocSection.SCAN_SERVER,
"Mean Reservation", null, DURATION),
SCAN_RESERVATION_WRITEOUT_TIMER("accumulo.scan.reservation.writeout.timer", MetricType.TIMER,
"Time to write out a tablets file reservations for scan.", MetricDocSection.SCAN_SERVER,
"Scan Reservation Write Time", null, NUMBER),
"Scan Reservation Write Time", null, DURATION),
SCAN_RESERVATION_FILES("accumulo.scan.reservation.files", MetricType.GAUGE,
"The number of files reserved by a scan server.", MetricDocSection.SCAN_SERVER,
"Files Reserved", null, NUMBER),
SCAN_RESERVATION_CONFLICT_COUNTER("accumulo.scan.reservation.conflict.count", MetricType.COUNTER,
"Count of instances where file reservation attempts for scans encountered conflicts.",
MetricDocSection.SCAN_SERVER, "Scan Reservation Conflicts", null, NUMBER),
SCAN_TABLET_METADATA_CACHE("accumulo.scan.tablet.metadata.cache", MetricType.CACHE,
"Scan server tablet cache metrics.", MetricDocSection.SCAN_SERVER, "Scan Server Tablet Cache",
null, NUMBER),

// Scan Metrics
SCAN_BUSY_TIMEOUT_COUNT("accumulo.scan.busy.timeout.count", MetricType.FUNCTION_COUNTER,
"Count of the scans where a busy timeout happened.", MetricDocSection.SCAN, "Scan Busy Count",
null, NUMBER),

// Scan Metrics
SCAN_TIMES("accumulo.scan.times", MetricType.TIMER, "Scan session lifetime (creation to close).",
MetricDocSection.SCAN, "Scan Session Total Time", null, NUMBER),
MetricDocSection.SCAN, "Scan Session Total Time", null, DURATION),
SCAN_OPEN_FILES("accumulo.scan.files.open", MetricType.GAUGE, "Number of files open for scans.",
MetricDocSection.SCAN, "Scan Files Open", null, NUMBER),
SCAN_RESULTS("accumulo.scan.result", MetricType.DISTRIBUTION_SUMMARY, "Results per scan.",
Expand Down Expand Up @@ -307,10 +310,11 @@ public enum Metric {
// Minor Compaction Metrics
MINC_QUEUED("accumulo.compaction.minc.queued", MetricType.TIMER,
"Queued minor compactions time queued.", MetricDocSection.COMPACTION, "Minc Queued", null,
NUMBER),
DURATION),
MINC_RUNNING("accumulo.compaction.minc.running", MetricType.TIMER,
"Minor compactions time active.", MetricDocSection.COMPACTION, "Minc Running", null, NUMBER),
MINC_PAUSED("accumulo.compaction.minc.paused", MetricType.COUNTER,
"Minor compactions time active.", MetricDocSection.COMPACTION, "Minc Running", null,
DURATION),
MINC_PAUSED("accumulo.compaction.minc.paused", MetricType.FUNCTION_COUNTER,
"Number of paused minor compactions.", MetricDocSection.COMPACTION, "Minc Paused", null,
NUMBER),

Expand All @@ -320,19 +324,19 @@ public enum Metric {
MetricDocSection.TABLET_SERVER, "Ingest Errors", null, NUMBER),
UPDATE_LOCK("accumulo.updates.lock", MetricType.TIMER,
"Average time taken for conditional mutation to get a row lock.",
MetricDocSection.TABLET_SERVER, "Conditional Mutation Row Lock Wait Time", null, NUMBER),
MetricDocSection.TABLET_SERVER, "Conditional Mutation Row Lock Wait Time", null, DURATION),
UPDATE_CHECK("accumulo.updates.check", MetricType.TIMER,
"Average time taken for conditional mutation to check conditions.",
MetricDocSection.TABLET_SERVER, "Conditional Mutation Condition Check Time", null, NUMBER),
MetricDocSection.TABLET_SERVER, "Conditional Mutation Condition Check Time", null, DURATION),
UPDATE_COMMIT("accumulo.updates.commit", MetricType.TIMER,
"Average time taken to commit a mutation.", MetricDocSection.TABLET_SERVER,
"Mutation Commit Avg Total Time", null, NUMBER),
"Mutation Commit Avg Total Time", null, DURATION),
UPDATE_COMMIT_PREP("accumulo.updates.commit.prep", MetricType.TIMER,
"Average time taken to prepare to commit a single mutation.", MetricDocSection.TABLET_SERVER,
"Mutation Commit Avg Prep Time", null, NUMBER),
"Mutation Commit Avg Prep Time", null, DURATION),
UPDATE_WALOG_WRITE("accumulo.updates.walog.write", MetricType.TIMER,
"Time taken to write a batch of mutations to WAL.", MetricDocSection.TABLET_SERVER,
"WAL Write Time", null, NUMBER),
"WAL Write Time", null, DURATION),
UPDATE_MUTATION_ARRAY_SIZE("accumulo.updates.mutation.arrays.size",
MetricType.DISTRIBUTION_SUMMARY, "Batch size of mutations from client.",
MetricDocSection.TABLET_SERVER, "Mutation Batch Size", null, NUMBER),
Expand Down Expand Up @@ -399,7 +403,11 @@ public enum Metric {
MetricDocSection.GENERAL_SERVER, "Completed task", null, NUMBER),
EXECUTOR_QUEUED("executor.queued", MetricType.GAUGE,
"Task queued for a thread pool. Each thread pool emits this metric w/ a different tag.",
MetricDocSection.GENERAL_SERVER, "Queued task", null, NUMBER);
MetricDocSection.GENERAL_SERVER, "Queued task", null, NUMBER),

// Cache metrics
CACHE_SIZE("cache.size", MetricType.GAUGE, "The current number of entries a cache has.",
MetricDocSection.GENERAL_SERVER, "Cache size", null, NUMBER);

public enum MonitorCssClass {
BYTES("big-size"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -410,7 +410,8 @@ public MetricResponse getMetrics(TInfo tinfo, TCredentials credentials) throws T
registry.getMeters().forEach(m -> {
if (m.getId().getName().startsWith("accumulo.")
|| m.getId().getName().equals(Metric.EXECUTOR_COMPLETED.getName())
|| m.getId().getName().equals(Metric.EXECUTOR_QUEUED.getName())) {
|| m.getId().getName().equals(Metric.EXECUTOR_QUEUED.getName())
|| m.getId().getName().equals(Metric.CACHE_SIZE.getName())) {
if (!this.monitorMetricExclusions.contains(m.getId().getName())) {
m.match(response::writeMeter, response::writeMeter, response::writeTimer,
response::writeDistributionSummary, response::writeLongTaskTimer,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.accumulo.monitor.next.views;

import java.util.List;
import java.util.Map;

import org.apache.accumulo.core.client.admin.servers.ServerId;
import org.apache.accumulo.core.metrics.Metric;
import org.apache.accumulo.core.metrics.flatbuffers.FMetric;
import org.apache.accumulo.core.metrics.flatbuffers.FTag;
import org.apache.accumulo.core.process.thrift.MetricResponse;
import org.apache.accumulo.monitor.next.SystemInformation;

import com.google.common.base.Preconditions;

public class CacheSizeColumnFactory implements ColumnFactory {

private final String cacheName;
private final TableData.Column column;

CacheSizeColumnFactory(String cacheName, String colHeader, String description) {
this.cacheName = cacheName;
Preconditions.checkState(Metric.CACHE_SIZE.getColumnClasses().length == 1);
this.column = new TableData.Column(Metric.CACHE_SIZE.getName() + "_" + cacheName, colHeader,
description, Metric.CACHE_SIZE.getColumnClasses()[0].getCssClass());
}

@Override
public TableData.Column getColumn() {
return column;
}

@Override
public Object getRowData(ServerId sid, MetricResponse mr,
Map<String,List<FMetric>> serverMetrics) {
var tag = new FTag();
for (var metric : serverMetrics.getOrDefault(Metric.CACHE_SIZE.getName(), List.of())) {
for (int i = 0; i < metric.tagsLength(); i++) {
metric.tags(tag, i);
if ("cache".equals(tag.key()) && cacheName.equals(tag.value())) {
return SystemInformation.getMetricValue(metric);
}
}
}
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@

import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

import org.apache.accumulo.core.client.admin.servers.ServerId;
import org.apache.accumulo.core.metrics.MonitorMeterRegistry;
import org.apache.accumulo.core.metrics.flatbuffers.FMetric;
import org.apache.accumulo.core.metrics.flatbuffers.FTag;
import org.apache.accumulo.core.process.thrift.MetricResponse;
import org.apache.accumulo.monitor.next.SystemInformation;
import org.apache.accumulo.monitor.next.views.TableData.Column;
Expand Down Expand Up @@ -56,10 +58,14 @@ default Number add(Number n1, Number n2) {
}
}

default Number sum(List<FMetric> metrics) {
default Number sum(List<FMetric> metrics, Predicate<String> statPredicate) {
Number sum = null;
FTag tag = new FTag();
for (var metric : metrics) {
sum = add(sum, SystemInformation.getMetricValue(metric));
var statValue = TableDataFactory.extractStatistic(metric, tag);
if (statPredicate.test(statValue)) {
sum = add(sum, SystemInformation.getMetricValue(metric));
}
}
return sum;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.accumulo.core.process.thrift.MetricResponse;
import org.apache.accumulo.monitor.next.SystemInformation;
import org.apache.accumulo.monitor.next.views.TableData.Column;
import org.apache.accumulo.monitor.next.views.TableDataFactory.StatType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -89,6 +90,7 @@ public Object getRowData(ServerId sid, MetricResponse mr,
Number sum = null;

FTag ftag = new FTag();
FTag statTag = new FTag();
for (var metric : metrics) {
boolean foundTag = false;
String tag = null;
Expand All @@ -103,9 +105,9 @@ public Object getRowData(ServerId sid, MetricResponse mr,
}
}

var metricStatistic = TableDataFactory.extractStatistic(metric);
if (foundTag && (metricStatistic == null || metricStatistic.equals("value")
|| metricStatistic.equals("count"))) {
var metricStatistic = TableDataFactory.extractStatistic(metric, statTag);
if (foundTag && (metricStatistic == null || metricStatistic.equals(StatType.VALUE)
|| metricStatistic.equals(StatType.COUNT))) {
var val = SystemInformation.getMetricValue(metric);
log.trace("adding {}+{} for {} {} {} {}", sum, val, metric.name(), tag,
sid.toHostPortString(), sid.getResourceGroup());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,21 @@
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;
import java.util.stream.Collectors;

import org.apache.accumulo.core.client.admin.servers.ServerId;
import org.apache.accumulo.core.metrics.Metric;
import org.apache.accumulo.core.metrics.flatbuffers.FMetric;
import org.apache.accumulo.core.process.thrift.MetricResponse;
import org.apache.accumulo.monitor.next.views.TableData.Column;
import org.apache.accumulo.monitor.next.views.TableDataFactory.StatType;

public class MetricColumnFactory implements ColumnFactory {

private final Column column;
private final boolean computeRate;
private final Predicate<String> statPredicate;

MetricColumnFactory(Metric metric) {
String classes;
Expand All @@ -50,6 +53,14 @@ public class MetricColumnFactory implements ColumnFactory {
}
this.column = new Column(metric.getName(), metric.getColumnHeader(),
metric.getColumnDescription(), classes);

statPredicate = switch (metric.getType()) {
case GAUGE -> sv -> sv.equals(StatType.VALUE);
case COUNTER, FUNCTION_COUNTER -> sv -> sv.equals(StatType.COUNT);
case TIMER, DISTRIBUTION_SUMMARY -> sv -> sv.equals(StatType.AVERAGE);
case LONG_TASK_TIMER -> sv -> sv.equals(StatType.MAX);
case CACHE -> StatType.COUNT_OR_VALUE; // TODO this class does not really support this type
};
}

@Override
Expand All @@ -60,7 +71,7 @@ public Column getColumn() {
@Override
public Object getRowData(ServerId sid, MetricResponse mr,
Map<String,List<FMetric>> serverMetrics) {
var sum = sum(serverMetrics.getOrDefault(column.key(), List.of()));
var sum = sum(serverMetrics.getOrDefault(column.key(), List.of()), statPredicate);
if (computeRate) {
return computeRate(sum);
} else {
Expand Down
Loading