From 7d45738e0d2be561e9f7dd94e33ebb92aa10d2d5 Mon Sep 17 00:00:00 2001 From: Fabian Kohn Date: Wed, 9 Jul 2025 16:00:40 +0200 Subject: [PATCH 01/12] first crude shot at writing / loading all alerts upon boot / shutdown --- cmd/alertmanager/main.go | 59 +++++++++++++++++++++++++++++++++++++++- go.mod | 3 ++ go.sum | 3 ++ provider/mem/mem.go | 13 +++++++++ 4 files changed, 77 insertions(+), 1 deletion(-) diff --git a/cmd/alertmanager/main.go b/cmd/alertmanager/main.go index 87cdab8a09..0c255ecca4 100644 --- a/cmd/alertmanager/main.go +++ b/cmd/alertmanager/main.go @@ -32,6 +32,7 @@ import ( "github.com/KimMachineGun/automemlimit/memlimit" "github.com/alecthomas/kingpin/v2" + jsoniter "github.com/json-iterator/go" "github.com/prometheus/client_golang/prometheus" versioncollector "github.com/prometheus/client_golang/prometheus/collectors/version" "github.com/prometheus/client_golang/prometheus/promhttp" @@ -56,6 +57,7 @@ import ( "github.com/prometheus/alertmanager/notify" "github.com/prometheus/alertmanager/provider/mem" "github.com/prometheus/alertmanager/silence" + "github.com/prometheus/alertmanager/store" "github.com/prometheus/alertmanager/template" "github.com/prometheus/alertmanager/timeinterval" "github.com/prometheus/alertmanager/types" @@ -177,6 +179,8 @@ func run() int { allowInsecureAdvertise = kingpin.Flag("cluster.allow-insecure-public-advertise-address-discovery", "[EXPERIMENTAL] Allow alertmanager to discover and listen on a public IP address.").Bool() label = kingpin.Flag("cluster.label", "The cluster label is an optional string to include on each packet and stream. It uniquely identifies the cluster and prevents cross-communication issues when sending gossip messages.").Default("").String() featureFlags = kingpin.Flag("enable-feature", fmt.Sprintf("Comma-separated experimental features to enable. 
Valid options: %s", strings.Join(featurecontrol.AllowedFlags, ", "))).Default("").String() + + alertPersistenceFile = kingpin.Flag("storage.alert-persistence-file", "Alert persistence filename (in the base folder). If set, Alertmanager will persist alerts to this file on shutdown and restore them on startup.").Default("persisted-alerts.json").String() ) promslogflag.AddFlags(kingpin.CommandLine, &promslogConfig) @@ -347,7 +351,32 @@ func run() int { logger.Error("error creating memory provider", "err", err) return 1 } - defer alerts.Close() + + // if alertPersistenceFile is set, we will use it to load persisted alerts + alertPersistenceFilePath := "" + if *alertPersistenceFile != "" { + alertPersistenceFilePath = filepath.Join(*dataDir, *alertPersistenceFile) + } + + defer func() { + // if alertPersistenceFile is set, persist alerts + if alertPersistenceFilePath != "" { + alertsToWrite := alerts.GetAlerts() + if err := persistAlerts(alertPersistenceFilePath, alertsToWrite); err != nil { + logger.Error("error persisting alerts", "file", alertPersistenceFilePath, "err", err) + } + logger.Info("persisted alerts to file", "file", alertPersistenceFilePath) + } + alerts.Close() + }() + + // if alertPersistenceFile is set, we will use it to load persisted alerts + if alertPersistenceFilePath != "" { + if err := loadAlerts(alertPersistenceFilePath, alerts); err != nil { + logger.Error("error loading persisted alerts", "file", alertPersistenceFilePath, "err", err) + } + logger.Info("loaded alerts from file", "file", alertPersistenceFilePath) + } var disp *dispatch.Dispatcher defer func() { @@ -589,6 +618,34 @@ func run() int { } } +func persistAlerts(alertPersistenceFilePath string, alertsToWrite *store.Alerts) error { + data, err := jsoniter.Marshal(alertsToWrite) + if err != nil { + return fmt.Errorf("error marshalling alerts to persistence file %s: %w", alertPersistenceFilePath, err) + } + + if err := os.WriteFile(alertPersistenceFilePath, data, 0o644); err != nil { + 
return fmt.Errorf("error writing alerts to persistence file %s: %w", alertPersistenceFilePath, err) + } + + return nil +} + +func loadAlerts(alertPersistenceFilePath string, alerts *mem.Alerts) error { + data, err := os.ReadFile(alertPersistenceFilePath) + if err != nil { + return fmt.Errorf("error reading alert persistence file %s: %w", alertPersistenceFilePath, err) + } + + readAlerts := new(store.Alerts) + if err := jsoniter.Unmarshal(data, readAlerts); err != nil { + return fmt.Errorf("error unmarshalling alerts from persistence file %s: %w", alertPersistenceFilePath, err) + } + + alerts.SetAlerts(readAlerts) + return nil +} + // clusterWait returns a function that inspects the current peer state and returns // a duration of one base timeout for each peer with a higher ID than ourselves. func clusterWait(p *cluster.Peer, timeout time.Duration) func() time.Duration { diff --git a/go.mod b/go.mod index 5221932e33..150e87f142 100644 --- a/go.mod +++ b/go.mod @@ -76,6 +76,7 @@ require ( github.com/jmespath/go-jmespath v0.4.0 // indirect github.com/josharian/intern v1.0.0 // indirect github.com/jpillora/backoff v1.0.0 // indirect + github.com/json-iterator/go v1.1.12 // indirect github.com/julienschmidt/httprouter v1.3.0 // indirect github.com/klauspost/compress v1.17.9 // indirect github.com/mailru/easyjson v0.7.7 // indirect @@ -83,6 +84,8 @@ require ( github.com/mdlayher/vsock v1.2.1 // indirect github.com/miekg/dns v1.1.41 // indirect github.com/mitchellh/mapstructure v1.5.0 // indirect + github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect + github.com/modern-go/reflect2 v1.0.2 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f // indirect github.com/opentracing/opentracing-go v1.2.0 // indirect diff --git a/go.sum b/go.sum index a633321b63..1a9fcf5788 100644 --- a/go.sum +++ b/go.sum @@ -335,6 +335,7 @@ github.com/json-iterator/go 
v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCV github.com/json-iterator/go v1.1.9/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= github.com/json-iterator/go v1.1.10/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= github.com/json-iterator/go v1.1.11/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= +github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU= github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk= @@ -395,9 +396,11 @@ github.com/mitchellh/mapstructure v1.4.3/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RR github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY= github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo= github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= +github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= diff 
--git a/provider/mem/mem.go b/provider/mem/mem.go index 3948697d84..ebf05d3c1d 100644 --- a/provider/mem/mem.go +++ b/provider/mem/mem.go @@ -110,6 +110,19 @@ func NewAlerts(ctx context.Context, m types.AlertMarker, intervalGC time.Duratio return a, nil } +func (a *Alerts) GetAlerts() *store.Alerts { + a.mtx.Lock() + defer a.mtx.Unlock() + + return a.alerts +} + +func (a *Alerts) SetAlerts(alerts *store.Alerts) { + a.mtx.Lock() + a.alerts = alerts + a.mtx.Unlock() +} + func (a *Alerts) gcLoop(ctx context.Context, interval time.Duration) { t := time.NewTicker(interval) defer t.Stop() From c79024b38c8c812c9d877b04698131c209f80641 Mon Sep 17 00:00:00 2001 From: Fabian Kohn Date: Wed, 9 Jul 2025 16:13:50 +0200 Subject: [PATCH 02/12] make (de)serialization method receiver and limit to actual serializable data --- .gitignore | 1 + cmd/alertmanager/main.go | 35 ++--------------------------------- provider/mem/mem.go | 11 ++++++----- store/store.go | 35 +++++++++++++++++++++++++++++++++++ 4 files changed, 44 insertions(+), 38 deletions(-) diff --git a/.gitignore b/.gitignore index 7ef77d321a..91b4f565d2 100644 --- a/.gitignore +++ b/.gitignore @@ -7,6 +7,7 @@ /.release /.tarballs /vendor +cmd/alertmanager/alertmanager !.golangci.yml !/cli/testdata/*.yml diff --git a/cmd/alertmanager/main.go b/cmd/alertmanager/main.go index 0c255ecca4..32b6eb4442 100644 --- a/cmd/alertmanager/main.go +++ b/cmd/alertmanager/main.go @@ -32,7 +32,6 @@ import ( "github.com/KimMachineGun/automemlimit/memlimit" "github.com/alecthomas/kingpin/v2" - jsoniter "github.com/json-iterator/go" "github.com/prometheus/client_golang/prometheus" versioncollector "github.com/prometheus/client_golang/prometheus/collectors/version" "github.com/prometheus/client_golang/prometheus/promhttp" @@ -57,7 +56,6 @@ import ( "github.com/prometheus/alertmanager/notify" "github.com/prometheus/alertmanager/provider/mem" "github.com/prometheus/alertmanager/silence" - "github.com/prometheus/alertmanager/store" 
"github.com/prometheus/alertmanager/template" "github.com/prometheus/alertmanager/timeinterval" "github.com/prometheus/alertmanager/types" @@ -361,8 +359,7 @@ func run() int { defer func() { // if alertPersistenceFile is set, persist alerts if alertPersistenceFilePath != "" { - alertsToWrite := alerts.GetAlerts() - if err := persistAlerts(alertPersistenceFilePath, alertsToWrite); err != nil { + if err := alerts.PersistAlerts(alertPersistenceFilePath); err != nil { logger.Error("error persisting alerts", "file", alertPersistenceFilePath, "err", err) } logger.Info("persisted alerts to file", "file", alertPersistenceFilePath) @@ -372,7 +369,7 @@ func run() int { // if alertPersistenceFile is set, we will use it to load persisted alerts if alertPersistenceFilePath != "" { - if err := loadAlerts(alertPersistenceFilePath, alerts); err != nil { + if err := alerts.LoadAlerts(alertPersistenceFilePath); err != nil { logger.Error("error loading persisted alerts", "file", alertPersistenceFilePath, "err", err) } logger.Info("loaded alerts from file", "file", alertPersistenceFilePath) @@ -618,34 +615,6 @@ func run() int { } } -func persistAlerts(alertPersistenceFilePath string, alertsToWrite *store.Alerts) error { - data, err := jsoniter.Marshal(alertsToWrite) - if err != nil { - return fmt.Errorf("error marshalling alerts to persistence file %s: %w", alertPersistenceFilePath, err) - } - - if err := os.WriteFile(alertPersistenceFilePath, data, 0o644); err != nil { - return fmt.Errorf("error writing alerts to persistence file %s: %w", alertPersistenceFilePath, err) - } - - return nil -} - -func loadAlerts(alertPersistenceFilePath string, alerts *mem.Alerts) error { - data, err := os.ReadFile(alertPersistenceFilePath) - if err != nil { - return fmt.Errorf("error reading alert persistence file %s: %w", alertPersistenceFilePath, err) - } - - readAlerts := new(store.Alerts) - if err := jsoniter.Unmarshal(data, readAlerts); err != nil { - return fmt.Errorf("error unmarshalling alerts 
from persistence file %s: %w", alertPersistenceFilePath, err) - } - - alerts.SetAlerts(readAlerts) - return nil -} - // clusterWait returns a function that inspects the current peer state and returns // a duration of one base timeout for each peer with a higher ID than ourselves. func clusterWait(p *cluster.Peer, timeout time.Duration) func() time.Duration { diff --git a/provider/mem/mem.go b/provider/mem/mem.go index ebf05d3c1d..61f81761ca 100644 --- a/provider/mem/mem.go +++ b/provider/mem/mem.go @@ -110,17 +110,18 @@ func NewAlerts(ctx context.Context, m types.AlertMarker, intervalGC time.Duratio return a, nil } -func (a *Alerts) GetAlerts() *store.Alerts { +func (a *Alerts) PersistAlerts(alertPersistenceFilePath string) error { a.mtx.Lock() defer a.mtx.Unlock() - return a.alerts + return a.alerts.PersistAlerts(alertPersistenceFilePath) } -func (a *Alerts) SetAlerts(alerts *store.Alerts) { +func (a *Alerts) LoadAlerts(alertPersistenceFilePath string) error { a.mtx.Lock() - a.alerts = alerts - a.mtx.Unlock() + defer a.mtx.Unlock() + + return a.alerts.LoadAlerts(alertPersistenceFilePath) } func (a *Alerts) gcLoop(ctx context.Context, interval time.Duration) { diff --git a/store/store.go b/store/store.go index e4275aa1f5..3476f01198 100644 --- a/store/store.go +++ b/store/store.go @@ -16,9 +16,12 @@ package store import ( "context" "errors" + "fmt" + "os" "sync" "time" + jsoniter "github.com/json-iterator/go" "github.com/prometheus/common/model" "github.com/prometheus/alertmanager/types" @@ -47,6 +50,38 @@ func NewAlerts() *Alerts { return a } +func (a *Alerts) PersistAlerts(alertPersistenceFilePath string) error { + a.Lock() + defer a.Unlock() + + data, err := jsoniter.Marshal(a.c) + if err != nil { + return fmt.Errorf("error marshalling alerts to persistence file %s: %w", alertPersistenceFilePath, err) + } + + if err := os.WriteFile(alertPersistenceFilePath, data, 0o644); err != nil { + return fmt.Errorf("error writing alerts to persistence file %s: %w", 
alertPersistenceFilePath, err) + } + + return nil +} + +func (a *Alerts) LoadAlerts(alertPersistenceFilePath string) error { + a.Lock() + defer a.Unlock() + + data, err := os.ReadFile(alertPersistenceFilePath) + if err != nil { + return fmt.Errorf("error reading alert persistence file %s: %w", alertPersistenceFilePath, err) + } + + if err := jsoniter.Unmarshal(data, &a.c); err != nil { + return fmt.Errorf("error unmarshalling alerts from persistence file %s: %w", alertPersistenceFilePath, err) + } + + return nil +} + // SetGCCallback sets a GC callback to be executed after each GC. func (a *Alerts) SetGCCallback(cb func([]types.Alert)) { a.Lock() From 0456e46a94ecfb6da9be57ae7e6c5c58448eb931 Mon Sep 17 00:00:00 2001 From: Fabian Kohn Date: Fri, 11 Jul 2025 11:39:18 +0200 Subject: [PATCH 03/12] incorporate upstream flushing patch --- dispatch/dispatch.go | 14 -------------- dispatch/dispatch_test.go | 24 +++++++----------------- test/with_api_v2/acceptance/send_test.go | 6 +++--- 3 files changed, 10 insertions(+), 34 deletions(-) diff --git a/dispatch/dispatch.go b/dispatch/dispatch.go index 6883786dca..1c890aa8d1 100644 --- a/dispatch/dispatch.go +++ b/dispatch/dispatch.go @@ -391,9 +391,6 @@ type aggrGroup struct { done chan struct{} next *time.Timer timeout func(time.Duration) time.Duration - - mtx sync.RWMutex - hasFlushed bool } // newAggrGroup returns a new aggregation group. @@ -460,10 +457,7 @@ func (ag *aggrGroup) run(nf notifyFunc) { ctx = notify.WithRouteID(ctx, ag.routeID) // Wait the configured interval before calling flush again. - ag.mtx.Lock() ag.next.Reset(ag.opts.GroupInterval) - ag.hasFlushed = true - ag.mtx.Unlock() ag.flush(func(alerts ...*types.Alert) bool { return nf(ctx, alerts...) @@ -489,14 +483,6 @@ func (ag *aggrGroup) insert(alert *types.Alert) { if err := ag.alerts.Set(alert); err != nil { ag.logger.Error("error on set alert", "err", err) } - - // Immediately trigger a flush if the wait duration for this - // alert is already over. 
- ag.mtx.Lock() - defer ag.mtx.Unlock() - if !ag.hasFlushed && alert.StartsAt.Add(ag.opts.GroupWait).Before(time.Now()) { - ag.next.Reset(0) - } } func (ag *aggrGroup) empty() bool { diff --git a/dispatch/dispatch_test.go b/dispatch/dispatch_test.go index 913032c6ce..c014b5da69 100644 --- a/dispatch/dispatch_test.go +++ b/dispatch/dispatch_test.go @@ -189,28 +189,18 @@ func TestAggrGroup(t *testing.T) { ag.stop() - // Add an alert that started more than group_interval in the past. We expect - // immediate flushing. - // Finally, set all alerts to be resolved. After successful notify the aggregation group - // should empty itself. + // Set all alerts to be resolved. After successful notify the aggregation group ag = newAggrGroup(context.Background(), lset, route, nil, promslog.NewNopLogger()) go ag.run(ntfy) - ag.insert(a1) ag.insert(a2) - // a2 lies way in the past so the initial group_wait should be skipped. - select { - case <-time.After(opts.GroupWait / 2): - t.Fatalf("expected immediate alert but received none") - - case batch := <-alertsCh: - exp := removeEndsAt(types.AlertSlice{a1, a2}) - sort.Sort(batch) + batch := <-alertsCh + exp := removeEndsAt(types.AlertSlice{a1, a2}) + sort.Sort(batch) - if !reflect.DeepEqual(batch, exp) { - t.Fatalf("expected alerts %v but got %v", exp, batch) - } + if !reflect.DeepEqual(batch, exp) { + t.Fatalf("expected alerts %v but got %v", exp, batch) } for i := 0; i < 3; i++ { @@ -241,7 +231,7 @@ func TestAggrGroup(t *testing.T) { a1r := *a1 a1r.EndsAt = time.Now() ag.insert(&a1r) - exp := append(types.AlertSlice{&a1r}, removeEndsAt(types.AlertSlice{a2, a3})...) + exp = append(types.AlertSlice{&a1r}, removeEndsAt(types.AlertSlice{a2, a3})...) 
select { case <-time.After(2 * opts.GroupInterval): diff --git a/test/with_api_v2/acceptance/send_test.go b/test/with_api_v2/acceptance/send_test.go index fa190f44c1..96547e0253 100644 --- a/test/with_api_v2/acceptance/send_test.go +++ b/test/with_api_v2/acceptance/send_test.go @@ -453,9 +453,9 @@ receivers: amc.Push(At(4), Alert("alertname", "test2")) co.Want(Between(2, 2.5), Alert("alertname", "test1").Active(1)) - // Timers are reset on reload regardless, so we count the 6 second group - // interval from 3 onwards. - co.Want(Between(9, 9.5), + // Timers are reset on reload, so if the reload happens at 3 seconds + // then the first flush will happen at reload + group_wait seconds. + co.Want(Between(4, 4.5), Alert("alertname", "test1").Active(1), Alert("alertname", "test2").Active(4), ) From 37d359e9bbd52df4acac2f7c7bb0b32dac37ab1a Mon Sep 17 00:00:00 2001 From: Fabian Kohn Date: Fri, 11 Jul 2025 11:41:23 +0200 Subject: [PATCH 04/12] improve logging --- cmd/alertmanager/main.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/cmd/alertmanager/main.go b/cmd/alertmanager/main.go index 32b6eb4442..06aca432fd 100644 --- a/cmd/alertmanager/main.go +++ b/cmd/alertmanager/main.go @@ -357,22 +357,25 @@ func run() int { } defer func() { + defer alerts.Close() + // if alertPersistenceFile is set, persist alerts if alertPersistenceFilePath != "" { if err := alerts.PersistAlerts(alertPersistenceFilePath); err != nil { logger.Error("error persisting alerts", "file", alertPersistenceFilePath, "err", err) + return } logger.Info("persisted alerts to file", "file", alertPersistenceFilePath) } - alerts.Close() }() // if alertPersistenceFile is set, we will use it to load persisted alerts if alertPersistenceFilePath != "" { if err := alerts.LoadAlerts(alertPersistenceFilePath); err != nil { logger.Error("error loading persisted alerts", "file", alertPersistenceFilePath, "err", err) + } else { + logger.Info("loaded alerts from file", "file", 
alertPersistenceFilePath) } - logger.Info("loaded alerts from file", "file", alertPersistenceFilePath) } var disp *dispatch.Dispatcher From 84d3bb57eb8d491155bc29fcc286c3744bc78e2a Mon Sep 17 00:00:00 2001 From: Fabian Kohn Date: Fri, 11 Jul 2025 11:54:43 +0200 Subject: [PATCH 05/12] add gzip compression for alert persistence and integration tests --- cmd/alertmanager/main.go | 2 +- store/store.go | 39 ++++++- store/store_test.go | 235 +++++++++++++++++++++++++++++++++++++++ 3 files changed, 271 insertions(+), 5 deletions(-) diff --git a/cmd/alertmanager/main.go b/cmd/alertmanager/main.go index 06aca432fd..244835604b 100644 --- a/cmd/alertmanager/main.go +++ b/cmd/alertmanager/main.go @@ -178,7 +178,7 @@ func run() int { label = kingpin.Flag("cluster.label", "The cluster label is an optional string to include on each packet and stream. It uniquely identifies the cluster and prevents cross-communication issues when sending gossip messages.").Default("").String() featureFlags = kingpin.Flag("enable-feature", fmt.Sprintf("Comma-separated experimental features to enable. Valid options: %s", strings.Join(featurecontrol.AllowedFlags, ", "))).Default("").String() - alertPersistenceFile = kingpin.Flag("storage.alert-persistence-file", "Alert persistence filename (in the base folder). If set, Alertmanager will persist alerts to this file on shutdown and restore them on startup.").Default("persisted-alerts.json").String() + alertPersistenceFile = kingpin.Flag("storage.alert-persistence-file", "Alert persistence filename (in the base folder). 
If set, Alertmanager will persist alerts to this file on shutdown and restore them on startup.").Default("persisted-alerts.json.gz").String() ) promslogflag.AddFlags(kingpin.CommandLine, &promslogConfig) diff --git a/store/store.go b/store/store.go index 3476f01198..6d74c41ec0 100644 --- a/store/store.go +++ b/store/store.go @@ -14,9 +14,11 @@ package store import ( + "compress/gzip" "context" "errors" "fmt" + "io" "os" "sync" "time" @@ -59,8 +61,20 @@ func (a *Alerts) PersistAlerts(alertPersistenceFilePath string) error { return fmt.Errorf("error marshalling alerts to persistence file %s: %w", alertPersistenceFilePath, err) } - if err := os.WriteFile(alertPersistenceFilePath, data, 0o644); err != nil { - return fmt.Errorf("error writing alerts to persistence file %s: %w", alertPersistenceFilePath, err) + // Create the file + file, err := os.OpenFile(alertPersistenceFilePath, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0o644) + if err != nil { + return fmt.Errorf("error creating persistence file %s: %w", alertPersistenceFilePath, err) + } + defer file.Close() + + // Write compressed data + gzipWriter := gzip.NewWriter(file) + if _, err := gzipWriter.Write(data); err != nil { + return fmt.Errorf("error writing compressed alerts to persistence file %s: %w", alertPersistenceFilePath, err) + } + if err := gzipWriter.Close(); err != nil { + return fmt.Errorf("error closing gzip writer for persistence file %s: %w", alertPersistenceFilePath, err) } return nil @@ -70,9 +84,26 @@ func (a *Alerts) LoadAlerts(alertPersistenceFilePath string) error { a.Lock() defer a.Unlock() - data, err := os.ReadFile(alertPersistenceFilePath) + // Open the file + file, err := os.Open(alertPersistenceFilePath) if err != nil { - return fmt.Errorf("error reading alert persistence file %s: %w", alertPersistenceFilePath, err) + return fmt.Errorf("error opening alert persistence file %s: %w", alertPersistenceFilePath, err) + } + defer file.Close() + + // Create gzip reader + gzipReader, err := 
gzip.NewReader(file) + if err != nil { + return fmt.Errorf("error creating gzip reader for persistence file %s: %w", alertPersistenceFilePath, err) + } + + // Read decompressed data + data, err := io.ReadAll(gzipReader) + if err != nil { + return fmt.Errorf("error reading decompressed data from persistence file %s: %w", alertPersistenceFilePath, err) + } + if err := gzipReader.Close(); err != nil { + return fmt.Errorf("error closing gzip reader for persistence file %s: %w", alertPersistenceFilePath, err) } if err := jsoniter.Unmarshal(data, &a.c); err != nil { diff --git a/store/store_test.go b/store/store_test.go index fe1cd0a8ae..5a65511ebd 100644 --- a/store/store_test.go +++ b/store/store_test.go @@ -14,7 +14,9 @@ package store import ( + "compress/gzip" "context" + "os" "testing" "time" @@ -195,3 +197,236 @@ func TestGC(t *testing.T) { } require.Len(t, resolved, n) } + +func TestAlerts_PersistAndLoadRoundTrip(t *testing.T) { + // Create a temporary directory for the test + tempDir := t.TempDir() + persistenceFile := tempDir + "/alerts.json.gz" + + // Create test alerts + now := time.Now() + alert1 := &types.Alert{ + Alert: model.Alert{ + Labels: model.LabelSet{ + "alertname": "TestAlert1", + "instance": "localhost:9090", + "job": "prometheus", + "severity": "critical", + }, + Annotations: model.LabelSet{ + "description": "Test alert 1 description", + "summary": "Test alert 1 summary", + }, + StartsAt: now.Add(-10 * time.Minute), + EndsAt: time.Time{}, // Active alert + GeneratorURL: "http://localhost:9090/graph?g0.expr=up%3D%3D0", + }, + UpdatedAt: now, + Timeout: false, + } + + alert2 := &types.Alert{ + Alert: model.Alert{ + Labels: model.LabelSet{ + "alertname": "TestAlert2", + "instance": "localhost:9091", + "job": "node_exporter", + "severity": "warning", + }, + Annotations: model.LabelSet{ + "description": "Test alert 2 description", + "summary": "Test alert 2 summary", + "runbook_url": "https://example.com/runbook", + }, + StartsAt: now.Add(-5 * 
time.Minute), + EndsAt: now.Add(-1 * time.Minute), // Resolved alert + GeneratorURL: "http://localhost:9090/graph?g1.expr=node_load1%3E1", + }, + UpdatedAt: now.Add(-1 * time.Minute), + Timeout: false, + } + + // Create alerts store and add test alerts + originalStore := NewAlerts() + require.NoError(t, originalStore.Set(alert1)) + require.NoError(t, originalStore.Set(alert2)) + + // Verify we have 2 alerts in the original store + originalAlerts := originalStore.List() + require.Len(t, originalAlerts, 2) + + // Persist alerts to disk + require.NoError(t, originalStore.PersistAlerts(persistenceFile)) + + // Verify the file was created and is not empty + fileInfo, err := os.Stat(persistenceFile) + require.NoError(t, err) + require.Greater(t, fileInfo.Size(), int64(0)) + + // Create a new alerts store and load from disk + newStore := NewAlerts() + require.True(t, newStore.Empty()) + + require.NoError(t, newStore.LoadAlerts(persistenceFile)) + + // Verify we have the same number of alerts + loadedAlerts := newStore.List() + require.Len(t, loadedAlerts, 2) + + // Verify alert data consistency + alertMap := make(map[model.Fingerprint]*types.Alert) + for _, alert := range loadedAlerts { + alertMap[alert.Fingerprint()] = alert + } + + // Check alert1 + loadedAlert1, exists := alertMap[alert1.Fingerprint()] + require.True(t, exists, "Alert1 should exist in loaded alerts") + require.Equal(t, alert1.Labels, loadedAlert1.Labels) + require.Equal(t, alert1.Annotations, loadedAlert1.Annotations) + require.True(t, alert1.StartsAt.Equal(loadedAlert1.StartsAt)) + require.True(t, alert1.EndsAt.Equal(loadedAlert1.EndsAt)) + require.Equal(t, alert1.GeneratorURL, loadedAlert1.GeneratorURL) + require.True(t, alert1.UpdatedAt.Equal(loadedAlert1.UpdatedAt)) + require.Equal(t, alert1.Timeout, loadedAlert1.Timeout) + + // Check alert2 + loadedAlert2, exists := alertMap[alert2.Fingerprint()] + require.True(t, exists, "Alert2 should exist in loaded alerts") + require.Equal(t, alert2.Labels, 
loadedAlert2.Labels) + require.Equal(t, alert2.Annotations, loadedAlert2.Annotations) + require.True(t, alert2.StartsAt.Equal(loadedAlert2.StartsAt)) + require.True(t, alert2.EndsAt.Equal(loadedAlert2.EndsAt)) + require.Equal(t, alert2.GeneratorURL, loadedAlert2.GeneratorURL) + require.True(t, alert2.UpdatedAt.Equal(loadedAlert2.UpdatedAt)) + require.Equal(t, alert2.Timeout, loadedAlert2.Timeout) + + // Verify individual alert retrieval works + retrievedAlert1, err := newStore.Get(alert1.Fingerprint()) + require.NoError(t, err) + require.Equal(t, alert1.Labels, retrievedAlert1.Labels) + + retrievedAlert2, err := newStore.Get(alert2.Fingerprint()) + require.NoError(t, err) + require.Equal(t, alert2.Labels, retrievedAlert2.Labels) +} + +func TestAlerts_PersistAndLoadEmptyStore(t *testing.T) { + tempDir := t.TempDir() + persistenceFile := tempDir + "/empty_alerts.json.gz" + + // Create empty alerts store + originalStore := NewAlerts() + require.True(t, originalStore.Empty()) + + // Persist empty store to disk + require.NoError(t, originalStore.PersistAlerts(persistenceFile)) + + // Verify the file was created + fileInfo, err := os.Stat(persistenceFile) + require.NoError(t, err) + require.Greater(t, fileInfo.Size(), int64(0)) // Should still have some data due to gzip overhead + + // Load into new store + newStore := NewAlerts() + require.NoError(t, newStore.LoadAlerts(persistenceFile)) + + // Verify the new store is also empty + require.True(t, newStore.Empty()) + require.Len(t, newStore.List(), 0) +} + +func TestAlerts_LoadNonExistentFile(t *testing.T) { + store := NewAlerts() + err := store.LoadAlerts("/non/existent/file.json.gz") + require.Error(t, err) + require.Contains(t, err.Error(), "error opening alert persistence file") +} + +func TestAlerts_PersistToInvalidPath(t *testing.T) { + store := NewAlerts() + + // Try to persist to a directory that doesn't exist + err := store.PersistAlerts("/non/existent/directory/alerts.json.gz") + require.Error(t, err) + 
require.Contains(t, err.Error(), "error creating persistence file") +} + +func TestAlerts_LoadCorruptedFile(t *testing.T) { + tempDir := t.TempDir() + corruptedFile := tempDir + "/corrupted.json.gz" + + // Create a file with invalid gzip content + require.NoError(t, os.WriteFile(corruptedFile, []byte("this is not gzip data"), 0o644)) + + store := NewAlerts() + err := store.LoadAlerts(corruptedFile) + require.Error(t, err) + require.Contains(t, err.Error(), "error creating gzip reader") +} + +func TestAlerts_LoadInvalidJSON(t *testing.T) { + tempDir := t.TempDir() + invalidJSONFile := tempDir + "/invalid.json.gz" + + // Create a gzipped file with invalid JSON + file, err := os.Create(invalidJSONFile) + require.NoError(t, err) + defer file.Close() + + writer := gzip.NewWriter(file) + _, err = writer.Write([]byte("{ invalid json content")) + require.NoError(t, err) + require.NoError(t, writer.Close()) + + store := NewAlerts() + err = store.LoadAlerts(invalidJSONFile) + require.Error(t, err) + require.Contains(t, err.Error(), "error unmarshalling alerts") +} + +func TestAlerts_MultipleRoundTrips(t *testing.T) { + tempDir := t.TempDir() + persistenceFile := tempDir + "/multi_roundtrip.json.gz" + + // Create initial alert + now := time.Now() + alert := &types.Alert{ + Alert: model.Alert{ + Labels: model.LabelSet{ + "alertname": "MultiRoundTripAlert", + "instance": "localhost:8080", + }, + Annotations: model.LabelSet{ + "description": "Multi round trip test", + }, + StartsAt: now, + EndsAt: time.Time{}, + GeneratorURL: "http://localhost:9090/test", + }, + UpdatedAt: now, + Timeout: false, + } + + // First round trip + store1 := NewAlerts() + require.NoError(t, store1.Set(alert)) + require.NoError(t, store1.PersistAlerts(persistenceFile)) + + // Second round trip + store2 := NewAlerts() + require.NoError(t, store2.LoadAlerts(persistenceFile)) + require.NoError(t, store2.PersistAlerts(persistenceFile)) + + // Third round trip + store3 := NewAlerts() + require.NoError(t, 
store3.LoadAlerts(persistenceFile)) + + // Verify data integrity after multiple round trips + alerts := store3.List() + require.Len(t, alerts, 1) + require.Equal(t, alert.Labels, alerts[0].Labels) + require.Equal(t, alert.Annotations, alerts[0].Annotations) + require.True(t, alert.StartsAt.Equal(alerts[0].StartsAt)) + require.True(t, alert.UpdatedAt.Equal(alerts[0].UpdatedAt)) +} From 462a9692506de4fb90a39f32e50457f9b7c8363a Mon Sep 17 00:00:00 2001 From: Fabian Kohn Date: Fri, 11 Jul 2025 12:32:16 +0200 Subject: [PATCH 06/12] log alerts load time upon success --- cmd/alertmanager/main.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/cmd/alertmanager/main.go b/cmd/alertmanager/main.go index 244835604b..f8c3555b92 100644 --- a/cmd/alertmanager/main.go +++ b/cmd/alertmanager/main.go @@ -371,10 +371,11 @@ func run() int { // if alertPersistenceFile is set, we will use it to load persisted alerts if alertPersistenceFilePath != "" { + loadStart := time.Now() if err := alerts.LoadAlerts(alertPersistenceFilePath); err != nil { logger.Error("error loading persisted alerts", "file", alertPersistenceFilePath, "err", err) } else { - logger.Info("loaded alerts from file", "file", alertPersistenceFilePath) + logger.Info("loaded alerts from file", "file", alertPersistenceFilePath, "duration", time.Since(loadStart)) } } From 1854a831b41d08fed8b8b2d854918c12888f565b Mon Sep 17 00:00:00 2001 From: Fabian Kohn Date: Tue, 15 Jul 2025 09:51:29 +0200 Subject: [PATCH 07/12] store the configured repeat interval in the alert annotations for consecutive deduplication --- notify/notify.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/notify/notify.go b/notify/notify.go index 3973e7876b..a2775b9dc2 100644 --- a/notify/notify.go +++ b/notify/notify.go @@ -936,6 +936,15 @@ func (n SetNotifiesStage) Exec(ctx context.Context, l *slog.Logger, alerts ...*t } expiry := 2 * repeat + // store the configured repeat interval in the alert annotations + // so that it 
can be used by alert-handler to deduplicate alerts / notifications + for _, alert := range alerts { + if alert.Annotations == nil { + alert.Annotations = model.LabelSet{} + } + alert.Annotations["repeat_interval"] = model.LabelValue(repeat.String()) + } + return ctx, alerts, n.nflog.Log(n.recv, gkey, firing, resolved, expiry) } From 8acac7e91bf3de1f3c9f5b6377c99c9ed7e1d43d Mon Sep 17 00:00:00 2001 From: Fabian Kohn Date: Tue, 15 Jul 2025 14:30:56 +0200 Subject: [PATCH 08/12] add logging for alert annotation --- notify/notify.go | 1 + 1 file changed, 1 insertion(+) diff --git a/notify/notify.go b/notify/notify.go index a2775b9dc2..9a6516635b 100644 --- a/notify/notify.go +++ b/notify/notify.go @@ -943,6 +943,7 @@ func (n SetNotifiesStage) Exec(ctx context.Context, l *slog.Logger, alerts ...*t alert.Annotations = model.LabelSet{} } alert.Annotations["repeat_interval"] = model.LabelValue(repeat.String()) + l.Info("Set repeat_interval annotation", "annotations", alert.Annotations) } return ctx, alerts, n.nflog.Log(n.recv, gkey, firing, resolved, expiry) From 22009eae40b233cb37c14481c1ed6bb9439fa2b6 Mon Sep 17 00:00:00 2001 From: Fabian Kohn Date: Thu, 17 Jul 2025 12:57:19 +0200 Subject: [PATCH 09/12] move annotation logic to dispatcher --- dispatch/dispatch.go | 8 ++++++++ notify/notify.go | 10 ---------- 2 files changed, 8 insertions(+), 10 deletions(-) diff --git a/dispatch/dispatch.go b/dispatch/dispatch.go index 1c890aa8d1..d7df960c05 100644 --- a/dispatch/dispatch.go +++ b/dispatch/dispatch.go @@ -342,6 +342,14 @@ func (d *Dispatcher) processAlert(alert *types.Alert, route *Route) { d.aggrGroupsNum++ d.metrics.aggrGroups.Inc() + // store the configured repeat interval in the alert annotations + // so that it can be used by alert-handler to deduplicate alerts / notifications + if alert.Annotations == nil { + alert.Annotations = model.LabelSet{} + } + alert.Annotations["repeat_interval"] = model.LabelValue(route.RouteOpts.RepeatInterval.String()) + 
d.logger.Info("Set repeat_interval annotation", "alert", alert.Name(), "annotations", alert.Annotations) + // Insert the 1st alert in the group before starting the group's run() // function, to make sure that when the run() will be executed the 1st // alert is already there. diff --git a/notify/notify.go b/notify/notify.go index 9a6516635b..3973e7876b 100644 --- a/notify/notify.go +++ b/notify/notify.go @@ -936,16 +936,6 @@ func (n SetNotifiesStage) Exec(ctx context.Context, l *slog.Logger, alerts ...*t } expiry := 2 * repeat - // store the configured repeat interval in the alert annotations - // so that it can be used by alert-handler to deduplicate alerts / notifications - for _, alert := range alerts { - if alert.Annotations == nil { - alert.Annotations = model.LabelSet{} - } - alert.Annotations["repeat_interval"] = model.LabelValue(repeat.String()) - l.Info("Set repeat_interval annotation", "annotations", alert.Annotations) - } - return ctx, alerts, n.nflog.Log(n.recv, gkey, firing, resolved, expiry) } From 828d5af9aaacc0fa3c28299676d73d5630c65bdc Mon Sep 17 00:00:00 2001 From: Fabian Kohn Date: Thu, 24 Jul 2025 09:32:43 +0200 Subject: [PATCH 10/12] remove verbose logging for annotation modifications --- dispatch/dispatch.go | 1 - 1 file changed, 1 deletion(-) diff --git a/dispatch/dispatch.go b/dispatch/dispatch.go index d7df960c05..677c15ab8e 100644 --- a/dispatch/dispatch.go +++ b/dispatch/dispatch.go @@ -348,7 +348,6 @@ func (d *Dispatcher) processAlert(alert *types.Alert, route *Route) { alert.Annotations = model.LabelSet{} } alert.Annotations["repeat_interval"] = model.LabelValue(route.RouteOpts.RepeatInterval.String()) - d.logger.Info("Set repeat_interval annotation", "alert", alert.Name(), "annotations", alert.Annotations) // Insert the 1st alert in the group before starting the group's run() // function, to make sure that when the run() will be executed the 1st From 1ca12c3b8d6a1899c04e36b84ae8dd2636eed272 Mon Sep 17 00:00:00 2001 From: Fabian Kohn
Date: Wed, 30 Jul 2025 15:28:43 +0200 Subject: [PATCH 11/12] ensure alert annotation is added every time and add logging --- dispatch/dispatch.go | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/dispatch/dispatch.go b/dispatch/dispatch.go index 677c15ab8e..6dc599d97d 100644 --- a/dispatch/dispatch.go +++ b/dispatch/dispatch.go @@ -318,6 +318,14 @@ func (d *Dispatcher) processAlert(alert *types.Alert, route *Route) { d.mtx.Lock() defer d.mtx.Unlock() + // store the configured repeat interval in the alert annotations + // so that it can be used by alert-handler to deduplicate alerts / notifications + if alert.Annotations == nil { + alert.Annotations = model.LabelSet{} + } + d.logger.Info("Storing repeat interval in alert annotations", "alert_name", alert.Name(), "repeat_interval", route.RouteOpts.RepeatInterval.String()) + alert.Annotations["repeat_interval"] = model.LabelValue(route.RouteOpts.RepeatInterval.String()) + routeGroups, ok := d.aggrGroupsPerRoute[route] if !ok { routeGroups = map[model.Fingerprint]*aggrGroup{} @@ -342,13 +350,6 @@ func (d *Dispatcher) processAlert(alert *types.Alert, route *Route) { d.aggrGroupsNum++ d.metrics.aggrGroups.Inc() - // store the configured repeat interval in the alert annotations - // so that it can be used by alert-handler to deduplicate alerts / notifications - if alert.Annotations == nil { - alert.Annotations = model.LabelSet{} - } - alert.Annotations["repeat_interval"] = model.LabelValue(route.RouteOpts.RepeatInterval.String()) - // Insert the 1st alert in the group before starting the group's run() // function, to make sure that when the run() will be executed the 1st // alert is already there. 
From 81c0d1fbac57a292188a115430ef48b3ddfea4e6 Mon Sep 17 00:00:00 2001 From: Fabian Kohn Date: Thu, 31 Jul 2025 16:03:52 +0200 Subject: [PATCH 12/12] make logging message about annotation debug level --- dispatch/dispatch.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dispatch/dispatch.go b/dispatch/dispatch.go index 6dc599d97d..b0755cc521 100644 --- a/dispatch/dispatch.go +++ b/dispatch/dispatch.go @@ -323,7 +323,7 @@ func (d *Dispatcher) processAlert(alert *types.Alert, route *Route) { if alert.Annotations == nil { alert.Annotations = model.LabelSet{} } - d.logger.Info("Storing repeat interval in alert annotations", "alert_name", alert.Name(), "repeat_interval", route.RouteOpts.RepeatInterval.String()) + d.logger.Debug("Storing repeat interval in alert annotations", "alert_name", alert.Name(), "repeat_interval", route.RouteOpts.RepeatInterval.String()) alert.Annotations["repeat_interval"] = model.LabelValue(route.RouteOpts.RepeatInterval.String()) routeGroups, ok := d.aggrGroupsPerRoute[route]