Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import static org.apache.accumulo.core.metrics.Metric.COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_REJECTED;
import static org.apache.accumulo.core.metrics.Metric.COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_SIZE;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.io.IOException;
Expand Down Expand Up @@ -107,8 +108,12 @@ public class CompactionPriorityQueueMetricsIT extends SharedMiniClusterBase {
private String rootPath;

public static final String QUEUE1 = "METRICSQ1";
public static final String QUEUE2 = "METRICSQ2";
public static final String QUEUE1_METRIC_LABEL = MetricsUtil.formatString(QUEUE1);
public static final String QUEUE2_METRIC_LABEL = MetricsUtil.formatString(QUEUE2);
public static final String QUEUE1_SERVICE = "Q1";
public static final String QUEUE2_SERVICE = "Q2";
public static final int QUEUE2_SIZE = 6;
Comment thread
ArbaazKhan1 marked this conversation as resolved.
public static final int QUEUE1_SIZE = 10 * 1024;

// Metrics collector Thread
Expand Down Expand Up @@ -207,6 +212,14 @@ public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration conf)
cfg.setProperty(Property.MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_SIZE, "10K");
cfg.getClusterServerConfiguration().addCompactorResourceGroup(QUEUE1, 0);

// Queue 2 with zero compactors
cfg.setProperty(Property.COMPACTION_SERVICE_PREFIX.getKey() + QUEUE2_SERVICE + ".planner",
RatioBasedCompactionPlanner.class.getName());
cfg.setProperty(
Property.COMPACTION_SERVICE_PREFIX.getKey() + QUEUE2_SERVICE + ".planner.opts.groups",
"[{'group':'" + QUEUE2 + "'}]");
cfg.getClusterServerConfiguration().addCompactorResourceGroup(QUEUE2, 0);

// This test waits for dead compactors to be absent in zookeeper. The following setting will
// make entries in ZK related to dead compactor processes expire sooner.
cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT.getKey(), "10");
Expand Down Expand Up @@ -431,6 +444,210 @@ public void testQueueMetrics() throws Exception {
}
}

/**
 * Verifies that compaction job priority queue metrics for two separate compaction queues
 * (QUEUE1 and QUEUE2) are reported independently of each other.
 *
 * The test creates a second table dispatched to QUEUE2_SERVICE, bulk-imports files into both
 * tables with different split layouts (so the two queues see different per-tablet file counts),
 * queues user compactions on both tables while neither queue has compactors, and then checks:
 * per-queue QUEUED/REJECTED/PRIORITY/SIZE metrics are tagged with the correct queue label, the
 * reported priorities are consistent with each table's own file counts, and — once compactors
 * are started for both queues — both queues eventually drain to zero.
 *
 * NOTE(review): relies on the test-harness timeout to bound the polling loops below; none of
 * the while-loops has its own deadline.
 */
@Test
public void testMultipleQueueMetricsIndependence() throws Exception {
  // Second table name; index [0] is assumed to be the class-level tableName for QUEUE1.
  String table2 = getUniqueNames(2)[1];
  long highestFileCountQueue1 = 0L;
  long highestFileCountQueue2 = 0L;
  ServerContext context = getCluster().getServerContext();
  try (AccumuloClient c = Accumulo.newClient().from(getClientProps()).build()) {
    // Separate bulk-import staging directories, one per table/queue.
    String dir1 = getDir("/testBulkFile-Queue1-");
    String dir2 = getDir("/testBulkFile-Queue2-");
    FileSystem fs = getCluster().getFileSystem();
    fs.mkdirs(new Path(dir1));
    fs.mkdirs(new Path(dir2));

    // Route table2's compactions to QUEUE2 via the simple dispatcher and the Q2 service
    // configured in configureMiniCluster.
    Map<String,String> props =
        Map.of("table.compaction.dispatcher", SimpleCompactionDispatcher.class.getName(),
            "table.compaction.dispatcher.opts.service", QUEUE2_SERVICE);
    NewTableConfiguration ntc = new NewTableConfiguration().setProperties(props)
        .withInitialTabletAvailability(TabletAvailability.HOSTED);
    c.tableOperations().create(table2, ntc);
    TableId table2Id = TableId.of(c.tableOperations().tableIdMap().get(table2));

    // Create splits so there are two groupings of tablets with similar file counts for both
    // queues. The split sets intentionally differ so the two tables end up with different
    // maximum per-tablet file counts (asserted below with assertNotEquals).
    String splitString = "500 1000 1500 2000 3750 5500 7250 9000";
    String splitString2 = "500 1000 1500 2000 3750 5500";
    addSplits(c, tableName, splitString);
    addSplits(c, table2, splitString2);

    // Add files to both directories (simulating two different queues). Each file covers a
    // disjoint row range of 100 rows.
    for (int i = 0; i < 100; i++) {
      writeData(dir1 + "/f" + i + ".", aconf, i * 100, (i + 1) * 100 - 1);
      writeData(dir2 + "/f" + i + ".", aconf, i * 100, (i + 1) * 100 - 1);
    }
    c.tableOperations().importDirectory(dir1).to(tableName).load();
    c.tableOperations().importDirectory(dir2).to(table2).load();

    // Set up compaction configurations for two different queues. TestFilter checks it is
    // running in the expected queue and filters rows by the given modulus.
    IteratorSetting iterSettingQueue1 = new IteratorSetting(100, CompactionIT.TestFilter.class);
    iterSettingQueue1.addOption("expectedQ", QUEUE1);
    iterSettingQueue1.addOption("modulus", 3 + "");

    IteratorSetting iterSettingQueue2 = new IteratorSetting(100, CompactionIT.TestFilter.class);
    iterSettingQueue2.addOption("expectedQ", QUEUE2);
    iterSettingQueue2.addOption("modulus", 5 + "");

    // setWait(false): the compactions cannot complete yet because neither queue has a
    // compactor running, so the jobs pile up in the priority queues.
    CompactionConfig configQ1 =
        new CompactionConfig().setIterators(List.of(iterSettingQueue1)).setWait(false);
    CompactionConfig configQ2 =
        new CompactionConfig().setIterators(List.of(iterSettingQueue2)).setWait(false);

    c.tableOperations().compact(tableName, configQ1);
    c.tableOperations().compact(table2, configQ2);
    // Get file sizes for each queue's tablets (actually the per-tablet FILE COUNT, used later
    // to derive an expected minimum job priority).
    try (TabletsMetadata tm = context.getAmple().readTablets().forTable(tableId).build()) {
      for (TabletMetadata tablet : tm) {
        long fileSize = tablet.getFiles().size();
        log.info("Number of files in tablet1 {}: {}", tablet.getExtent().toString(), fileSize);
        highestFileCountQueue1 = Math.max(highestFileCountQueue1, fileSize);
      }
    }

    try (TabletsMetadata tm = context.getAmple().readTablets().forTable(table2Id).build()) {
      for (TabletMetadata tablet : tm) {
        long fileSize = tablet.getFiles().size();
        log.info("Number of files in tablet2 {}: {}", tablet.getExtent().toString(), fileSize);
        highestFileCountQueue2 = Math.max(highestFileCountQueue2, fileSize);
      }
    }

    // The differing split sets above must have produced different max file counts, otherwise
    // the per-queue priority comparisons below would not distinguish the queues.
    assertNotEquals(highestFileCountQueue1, highestFileCountQueue2);
    verifyData(c, tableName, 0, 100 * 100 - 1, false);
    verifyData(c, table2, 0, 100 * 100 - 1, false);
  }

  // Fetch and verify metrics for both queues: wait until BOTH queues have reported a
  // positive JOBS_QUEUED value at least once, each under its own metric label.
  boolean sawMetricsQ1 = false;
  boolean sawMetricsQ2 = false;

  while (!sawMetricsQ1 || !sawMetricsQ2) {
    while (!queueMetrics.isEmpty()) {
      var qm = queueMetrics.take();
      if (qm.getName().contains(COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_QUEUED.getName())
          && qm.getTags().containsValue(QUEUE1_METRIC_LABEL)) {
        if (Integer.parseInt(qm.getValue()) > 0) {
          sawMetricsQ1 = true;
        }
      } else if (qm.getName().contains(COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_QUEUED.getName())
          && qm.getTags().containsValue(QUEUE2_METRIC_LABEL)) {
        if (Integer.parseInt(qm.getValue()) > 0) {
          sawMetricsQ2 = true;
        }
      }
    }
    // If metrics are not found in the queue, sleep until the next poll.
    UtilWaitThread.sleep(TestStatsDRegistryFactory.pollingFrequency.toMillis());
  }

  // Verify metrics independence by comparing their lowest priorities.
  // NOTE(review): despite the "lowest" names, these accumulate via Math.max — presumably the
  // JOBS_PRIORITY metric itself reports the priority of the lowest-priority job held in the
  // queue, and the max across samples is the final steady-state value; confirm against the
  // metric's definition.
  long lowestPriorityQ1 = Short.MIN_VALUE;
  long lowestPriorityQ2 = Short.MIN_VALUE;
  long rejectedCountQ1 = 0L;
  long rejectedCountQ2 = 0L;
  boolean sawQ1Metric = false;
  boolean sawQ2Metric = false;
  boolean sawQs = false;

  // Drain all buffered metric samples, bucketing by queue label.
  while (!queueMetrics.isEmpty()) {
    var metric = queueMetrics.take();
    // Handle QUEUE1 metrics
    if (metric.getTags().containsValue(QUEUE1_METRIC_LABEL)) {
      if (metric.getName().contains(COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_REJECTED.getName())) {
        rejectedCountQ1 = Long.parseLong(metric.getValue());
      } else if (metric.getName()
          .contains(COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_PRIORITY.getName())) {
        lowestPriorityQ1 = Math.max(lowestPriorityQ1, Long.parseLong(metric.getValue()));
      } else if (metric.getName().contains(COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_SIZE.getName())) {
        sawQ1Metric = true;
      }
    } else if (metric.getTags().containsValue(QUEUE2_METRIC_LABEL)) {
      if (metric.getName().contains(COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_REJECTED.getName())) {
        rejectedCountQ2 = Long.parseLong(metric.getValue());
      } else if (metric.getName()
          .contains(COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_PRIORITY.getName())) {
        lowestPriorityQ2 = Math.max(lowestPriorityQ2, Long.parseLong(metric.getValue()));
      } else if (metric.getName().contains(COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_SIZE.getName())) {
        sawQ2Metric = true;
      }
    } else if (metric.getName().contains(COMPACTOR_JOB_PRIORITY_QUEUES.getName())) {
      // Untagged metric counting the total number of priority queues in existence.
      sawQs = true;
    } else {
      log.debug("{}", metric);
    }
  }

  // Confirm metrics were generated and in some cases, validate contents.
  // Both queues should have rejected jobs (each queue's capacity is smaller than the number
  // of jobs submitted), and the rejections must be reported per-queue.
  assertTrue(rejectedCountQ1 > 0L);
  assertTrue(rejectedCountQ2 > 0L);

  // Priority is the file counts + number of compactions for that tablet.
  // The lowestPriority job in the queue should have been
  // at least 1 count higher than the highest file count.
  TableId tid = context.getTableId(tableName);
  TableId tid2 = context.getTableId(table2);

  short highestFileCountPrioQ1 =
      CompactionJobPrioritizer.createPriority(getCluster().getServerContext().getNamespaceId(tid),
          tid, CompactionKind.USER, (int) highestFileCountQueue1, 0,
          context.getTableConfiguration(tid).getCount(Property.TABLE_FILE_MAX));
  assertTrue(lowestPriorityQ1 > highestFileCountPrioQ1,
      lowestPriorityQ1 + " " + highestFileCountQueue1 + " " + highestFileCountPrioQ1);

  short highestFileCountPrioQ2 = CompactionJobPrioritizer.createPriority(
      getCluster().getServerContext().getNamespaceId(tid2), tid2, CompactionKind.USER,
      (int) highestFileCountQueue2, 0,
      context.getTableConfiguration(tid2).getCount(Property.TABLE_FILE_MAX));
  assertTrue(lowestPriorityQ2 > highestFileCountPrioQ2,
      lowestPriorityQ2 + " " + highestFileCountQueue2 + " " + highestFileCountPrioQ2);

  // Multiple Queues have been created
  assertTrue(sawQs);

  assertTrue(sawQ1Metric);
  assertTrue(sawQ2Metric);

  // Start Compactors for both QUEUE1 and QUEUE2 so the queued jobs can actually run and
  // the queues can drain.
  getCluster().getConfig().getClusterServerConfiguration().addCompactorResourceGroup(QUEUE1, 1);
  getCluster().getConfig().getClusterServerConfiguration().addCompactorResourceGroup(QUEUE2, 1);
  getCluster().getClusterControl().start(ServerType.COMPACTOR);

  boolean emptyQueue1 = false;
  boolean emptyQueue2 = false;

  UtilWaitThread.sleep(TestStatsDRegistryFactory.pollingFrequency.toMillis());

  // Continue checking until both queues are empty
  while (!emptyQueue1 || !emptyQueue2) {
    while (!queueMetrics.isEmpty()) {
      var metric = queueMetrics.take();
      // Check metrics for QUEUE1
      if (metric.getName().contains(COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_QUEUED.getName())
          && metric.getTags().containsValue(QUEUE1_METRIC_LABEL)) {
        if (Integer.parseInt(metric.getValue()) == 0) {
          emptyQueue1 = true;
        }
      } else if (metric.getName().contains(COMPACTOR_JOB_PRIORITY_QUEUE_JOBS_QUEUED.getName())
          && metric.getTags().containsValue(QUEUE2_METRIC_LABEL)) {
        if (Integer.parseInt(metric.getValue()) == 0) {
          emptyQueue2 = true;
        }
      }

      // Check if the total number of queues is zero, and ensure no metrics for the queues remain
      // (a queue whose metrics are deregistered entirely also counts as drained).
      if (metric.getName().equals(COMPACTOR_JOB_PRIORITY_QUEUES.getName())) {
        if (Integer.parseInt(metric.getValue()) == 0) {
          emptyQueue1 = true;
          emptyQueue2 = true;
        }
      }
    }
    UtilWaitThread.sleep(TestStatsDRegistryFactory.pollingFrequency.toMillis());
  }

}

/**
* Test that the compaction queue is cleared when compactions no longer need to happen.
*/
Expand Down Expand Up @@ -465,7 +682,7 @@ public void testCompactionQueueClearedWhenNotNeeded() throws Exception {
context.tableOperations().setProperty(tableName, Property.TABLE_FILE_MAX.getKey(), "2000");

// wait for queue to clear
Wait.waitFor(() -> getJobsQueued() == 0, 60_000, sleepMillis,
Wait.waitFor(() -> getJobsQueued() <= 0, 60_000, sleepMillis,
"Expected job queue to be cleared once compactions no longer need to happen");
}

Expand Down