Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions changelog/unreleased/issue-26080.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
type = "f"
message = "Fix duplicate metric registration error during concurrent pipeline state builds"

issues = ["26080"]
Original file line number Diff line number Diff line change
Expand Up @@ -475,6 +475,7 @@ public String className() {
public static class State {
private final Logger LOG = LoggerFactory.getLogger(getClass());
protected static final String STAGE_CACHE_METRIC_SUFFIX = "stage-cache";
private static final Object METRIC_REGISTRATION_LOCK = new Object();

private final ImmutableMap<String, Pipeline> currentPipelines;
private final ImmutableSetMultimap<String, Pipeline> streamPipelineConnections;
Expand Down Expand Up @@ -504,9 +505,12 @@ public StageIterator.Configuration load(@Nonnull Set<Pipeline> pipelines) {
}
});

// we have to remove the metrics, because otherwise we leak references to the cache (and the register call will throw)
metricRegistry.removeMatching((name, metric) -> name.startsWith(getStageCacheMetricName()));
MetricUtils.safelyRegisterAll(metricRegistry, new CacheStatsSet(getStageCacheMetricName(), cache));
// Synchronized to prevent concurrent State constructions from racing on remove+register,
// which would cause duplicate metric registration errors. (See #26080)
synchronized (METRIC_REGISTRATION_LOCK) {
metricRegistry.removeMatching((name, metric) -> name.startsWith(getStageCacheMetricName()));
MetricUtils.safelyRegisterAll(metricRegistry, new CacheStatsSet(getStageCacheMetricName(), cache));
}
}

protected String getStageCacheMetricName() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,76 @@
*/
package org.graylog.plugins.pipelineprocessor.processors;

import org.graylog2.plugin.LocalMetricRegistry;
import com.codahale.metrics.MetricRegistry;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.junit.jupiter.api.Test;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;

public class StateTest {

/**
 * Checks that a freshly constructed {@code PipelineInterpreter.State} reports the expected
 * stage cache metric name: the fully qualified {@code PipelineInterpreter} class name followed
 * by {@code .stage-cache}.
 */
@Test
public void testMetricName() {
// NOTE(review): two alternative registry argument lines follow (LocalMetricRegistry and
// MetricRegistry); presumably the removed and added sides of a diff - confirm which one applies.
final PipelineInterpreter.State state = new PipelineInterpreter.State(null, null, null,
new LocalMetricRegistry(), 1, false);
new MetricRegistry(), 1, false);
// The expected name is spelled out literally so a change to the naming scheme is caught here.
assertEquals("org.graylog.plugins.pipelineprocessor.processors.PipelineInterpreter.stage-cache",
state.getStageCacheMetricName());
}

/**
 * Regression test for <a href="https://github.com/Graylog2/graylog2-server/issues/26080">#26080</a>.
 * Verifies that concurrent State construction does not produce duplicate metric registration errors.
 * <p>
 * Several workers each construct a {@code PipelineInterpreter.State} against one shared registry.
 * The registry used here converts an {@link IllegalArgumentException} raised by {@code registerAll}
 * into an {@link AssertionError}, which then surfaces through {@link Future#get(long, TimeUnit)}.
 *
 * @throws Exception if a worker fails or times out, or if the test thread is interrupted
 */
@Test
public void concurrentStateConstructionDoesNotCauseDuplicateMetricErrors() throws Exception {
    final int threadCount = 10;
    final MetricRegistry sharedRegistry = new MetricRegistry() {
        @Override
        public void registerAll(com.codahale.metrics.MetricSet metrics) throws IllegalArgumentException {
            try {
                super.registerAll(metrics);
            } catch (IllegalArgumentException e) {
                // Fail the test if duplicate metric registration is attempted (this would normally only be logged).
                throw new AssertionError("Duplicate metric set registered", e);
            }
        }
    };
    // readyLatch: every worker checks in before parking, so the start gate is only opened once
    // all of them are running. Without it the gate could open before any worker had started,
    // and the constructions might execute with little or no overlap.
    final CountDownLatch readyLatch = new CountDownLatch(threadCount);
    final CountDownLatch startLatch = new CountDownLatch(1);
    final ExecutorService executor = Executors.newFixedThreadPool(threadCount,
            new ThreadFactoryBuilder().setNameFormat("state-test-%d").build());

    try {
        final List<Future<PipelineInterpreter.State>> futures = new ArrayList<>();
        for (int i = 0; i < threadCount; i++) {
            futures.add(executor.submit(() -> {
                readyLatch.countDown();
                startLatch.await();
                return new PipelineInterpreter.State(null, null, null, sharedRegistry, 1, false);
            }));
        }

        // Bounded wait: if a worker never starts, fail here instead of hanging the build.
        assertThat(readyLatch.await(5, TimeUnit.SECONDS)).isTrue();

        // Release all threads simultaneously to maximize contention
        startLatch.countDown();

        for (final Future<PipelineInterpreter.State> future : futures) {
            assertThat(future.get(5, TimeUnit.SECONDS)).isNotNull();
        }

        // CacheStatsSet registers 9 gauge metrics under the stage-cache prefix
        final long stageCacheMetricCount = sharedRegistry.getMetrics().keySet().stream()
                .filter(name -> name.startsWith("org.graylog.plugins.pipelineprocessor.processors.PipelineInterpreter.stage-cache"))
                .count();
        assertThat(stageCacheMetricCount).isEqualTo(9);
    } finally {
        // Interrupts any worker still parked on the start gate (e.g. after an early assertion failure).
        executor.shutdownNow();
    }
}
}
Loading