diff --git a/adapter/admin_grpc.go b/adapter/admin_grpc.go index cbca0b57f..bab449b73 100644 --- a/adapter/admin_grpc.go +++ b/adapter/admin_grpc.go @@ -1,14 +1,17 @@ package adapter import ( + "bytes" "context" "crypto/subtle" "sort" + "strconv" "strings" "sync" "time" "github.com/bootjp/elastickv/internal/raftengine" + "github.com/bootjp/elastickv/keyviz" pb "github.com/bootjp/elastickv/proto" "github.com/cockroachdb/errors" "google.golang.org/grpc" @@ -17,6 +20,19 @@ import ( "google.golang.org/grpc/status" ) +// KeyVizSampler is the read-side abstraction the Admin service needs +// from the keyviz package: a time-bounded matrix snapshot. Defined +// here (not in keyviz) so tests can pass an in-memory fake without +// constructing a full *keyviz.MemSampler. *keyviz.MemSampler +// satisfies this interface. +type KeyVizSampler interface { + // Snapshot returns the matrix columns in [from, to). Either + // bound may be the zero time meaning unbounded on that side. + // Implementations must return rows the caller can mutate freely + // (a deep copy) — see keyviz.MemSampler.Snapshot. + Snapshot(from, to time.Time) []keyviz.MatrixColumn +} + // AdminGroup exposes per-Raft-group state to the Admin service. It is a narrow // subset of raftengine.Engine so tests can supply an in-memory fake without // standing up a real Raft cluster. Configuration is polled on each @@ -56,6 +72,12 @@ type AdminServer struct { // instance cannot contend with concurrent RPCs on another instance. now func() time.Time + // sampler exposes the keyviz heatmap matrix to GetKeyVizMatrix. + // Nil means keyviz is disabled — the RPC returns Unavailable. + // Guarded by groupsMu (same lock as groups/now) so RegisterSampler + // pairs atomically with concurrent RPC reads. + sampler KeyVizSampler + pb.UnimplementedAdminServer } @@ -97,6 +119,16 @@ func (s *AdminServer) RegisterGroup(groupID uint64, g AdminGroup) { s.groupsMu.Unlock() } +// RegisterSampler wires the keyviz sampler used by GetKeyVizMatrix. +// Without this call (or with a nil sampler) the RPC returns +// codes.Unavailable so callers can distinguish "keyviz disabled" +// from "no data yet". +func (s *AdminServer) RegisterSampler(sampler KeyVizSampler) { + s.groupsMu.Lock() + s.sampler = sampler + s.groupsMu.Unlock() +} + // GetClusterOverview returns the local node identity, the current member // list, and per-group leader identity collected from the engines registered // via RegisterGroup. The member list is the union of (a) the bootstrap seed @@ -477,3 +509,166 @@ func AdminTokenAuth(token string) (grpc.UnaryServerInterceptor, grpc.StreamServe // ErrAdminTokenRequired is returned by NewAdminServer helpers when the operator // failed to supply a token and also did not opt into insecure mode. var ErrAdminTokenRequired = errors.New("admin token file required; pass --adminInsecureNoAuth to run without") + +// GetKeyVizMatrix renders the keyviz heatmap matrix for the [from, to) +// range supplied by the request, returning one KeyVizRow per tracked +// route or virtual bucket and a parallel column-timestamp slice. +// +// Series selection (Reads / Writes / ReadBytes / WriteBytes) maps from +// the request's KeyVizSeries enum to the matching keyviz.MatrixRow +// counter; KEYVIZ_SERIES_UNSPECIFIED defaults to Reads. +// +// Returns codes.Unavailable when no sampler is registered (keyviz +// disabled) so callers can distinguish that from "no data yet" +// (which yields a successful empty response). +func (s *AdminServer) GetKeyVizMatrix( + _ context.Context, + req *pb.GetKeyVizMatrixRequest, +) (*pb.GetKeyVizMatrixResponse, error) { + s.groupsMu.RLock() + sampler := s.sampler + s.groupsMu.RUnlock() + if sampler == nil { + return nil, errors.WithStack(status.Error(codes.Unavailable, "keyviz sampler not configured on this node")) + } + from := unixMsToTime(req.GetFromUnixMs()) + to := unixMsToTime(req.GetToUnixMs()) + cols := sampler.Snapshot(from, to) + pickValue := matrixSeriesPicker(req.GetSeries()) + return matrixToProto(cols, pickValue, int(req.GetRows())), nil +} + +// unixMsToTime converts a Unix-millisecond timestamp into a time.Time, +// returning the zero Time when the input is zero so the sampler reads +// an unbounded range on that side. +func unixMsToTime(ms int64) time.Time { + if ms == 0 { + return time.Time{} + } + return time.UnixMilli(ms) +} + +// matrixSeriesPicker returns a callback that extracts the requested +// counter from a MatrixRow. KEYVIZ_SERIES_UNSPECIFIED (and READS) +// fall through to Reads so a default-valued request still returns +// something useful. +func matrixSeriesPicker(series pb.KeyVizSeries) func(keyviz.MatrixRow) uint64 { + switch series { + case pb.KeyVizSeries_KEYVIZ_SERIES_WRITES: + return func(r keyviz.MatrixRow) uint64 { return r.Writes } + case pb.KeyVizSeries_KEYVIZ_SERIES_READ_BYTES: + return func(r keyviz.MatrixRow) uint64 { return r.ReadBytes } + case pb.KeyVizSeries_KEYVIZ_SERIES_WRITE_BYTES: + return func(r keyviz.MatrixRow) uint64 { return r.WriteBytes } + case pb.KeyVizSeries_KEYVIZ_SERIES_UNSPECIFIED, pb.KeyVizSeries_KEYVIZ_SERIES_READS: + return func(r keyviz.MatrixRow) uint64 { return r.Reads } + default: + return func(r keyviz.MatrixRow) uint64 { return r.Reads } + } +} + +// matrixToProto pivots the column-major MatrixColumn slice into the +// row-major proto layout: one KeyVizRow per distinct RouteID with a +// values slice aligned to the column_unix_ms parallel slice. Idle +// routes (zero in every column) are not emitted by the sampler, so +// the row set already reflects observed activity in [from, to). +// +// rowBudget caps how many rows the response carries — passing +// 0 means "no cap." When the budget would be exceeded, rows are +// sorted by total activity across the requested series and the +// top-N retained, so callers asking for a compact matrix do not +// receive a payload that scales with the route count. +func matrixToProto(cols []keyviz.MatrixColumn, pick func(keyviz.MatrixRow) uint64, rowBudget int) *pb.GetKeyVizMatrixResponse { + resp := &pb.GetKeyVizMatrixResponse{ + ColumnUnixMs: make([]int64, len(cols)), + } + rowsByID := make(map[uint64]*pb.KeyVizRow) + order := make([]uint64, 0) + for j, col := range cols { + resp.ColumnUnixMs[j] = col.At.UnixMilli() + for _, mr := range col.Rows { + pr, ok := rowsByID[mr.RouteID] + if !ok { + pr = newKeyVizRowFrom(mr, len(cols)) + rowsByID[mr.RouteID] = pr + order = append(order, mr.RouteID) + } + pr.Values[j] = pick(mr) + } + } + resp.Rows = make([]*pb.KeyVizRow, len(order)) + for i, id := range order { + resp.Rows[i] = rowsByID[id] + } + resp.Rows = applyKeyVizRowBudget(resp.Rows, rowBudget) + sortKeyVizRowsByStart(resp.Rows) + return resp +} + +// applyKeyVizRowBudget caps rows to budget by total activity per row +// (sum of per-column values), preserving the top-N rows. budget <= 0 +// means "no cap." +func applyKeyVizRowBudget(rows []*pb.KeyVizRow, budget int) []*pb.KeyVizRow { + if budget <= 0 || len(rows) <= budget { + return rows + } + sort.Slice(rows, func(i, j int) bool { + return rowActivityTotal(rows[i]) > rowActivityTotal(rows[j]) + }) + return rows[:budget] +} + +func rowActivityTotal(r *pb.KeyVizRow) uint64 { + var sum uint64 + for _, v := range r.Values { + sum += v + } + return sum +} + +// newKeyVizRowFrom seeds a proto row from the first MatrixRow seen +// for a given RouteID. Values is allocated with len == numCols so +// every column gets a deterministic slot (zero-valued by default). +// +// route_count surfaces MemberRoutesTotal (the true number of routes +// folded into the bucket) — not just len(MemberRoutes), which the +// sampler caps at MaxMemberRoutesPerSlot. When the visible list is +// shorter than the total, route_ids_truncated lets consumers know +// to trust route_count for drill-down decisions. +func newKeyVizRowFrom(mr keyviz.MatrixRow, numCols int) *pb.KeyVizRow { + total := mr.MemberRoutesTotal + if !mr.Aggregate && total == 0 { + // Individual slots fall through to RouteCount=1 when the + // sampler predates MemberRoutesTotal or never set it. + total = 1 + } + row := &pb.KeyVizRow{ + BucketId: bucketIDFor(mr), + Start: append([]byte(nil), mr.Start...), + End: append([]byte(nil), mr.End...), + Aggregate: mr.Aggregate, + RouteCount: total, + RouteIdsTruncated: mr.Aggregate && total > uint64(len(mr.MemberRoutes)), + Values: make([]uint64, numCols), + } + if mr.Aggregate { + row.RouteIds = append([]uint64(nil), mr.MemberRoutes...) + } + return row +} + +func bucketIDFor(mr keyviz.MatrixRow) string { + if mr.Aggregate { + return "virtual:" + strconv.FormatUint(mr.RouteID, 10) + } + return "route:" + strconv.FormatUint(mr.RouteID, 10) +} + +func sortKeyVizRowsByStart(rows []*pb.KeyVizRow) { + sort.Slice(rows, func(i, j int) bool { + if c := bytes.Compare(rows[i].Start, rows[j].Start); c != 0 { + return c < 0 + } + return rows[i].BucketId < rows[j].BucketId + }) +} diff --git a/adapter/admin_grpc_keyviz_test.go b/adapter/admin_grpc_keyviz_test.go new file mode 100644 index 000000000..721eadae1 --- /dev/null +++ b/adapter/admin_grpc_keyviz_test.go @@ -0,0 +1,246 @@ +package adapter + +import ( + "context" + "testing" + "time" + + "github.com/bootjp/elastickv/keyviz" + pb "github.com/bootjp/elastickv/proto" + "github.com/stretchr/testify/require" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +// fakeKeyVizSampler is a deterministic in-memory KeyVizSampler so +// AdminServer tests don't need to drive a real keyviz.MemSampler with +// goroutines and time. Snapshot returns a fresh deep copy of the +// configured columns so the test mirrors the real sampler's contract. +type fakeKeyVizSampler struct { + cols []keyviz.MatrixColumn +} + +func (f *fakeKeyVizSampler) Snapshot(_, _ time.Time) []keyviz.MatrixColumn { + out := make([]keyviz.MatrixColumn, len(f.cols)) + for i, c := range f.cols { + rows := make([]keyviz.MatrixRow, len(c.Rows)) + for j, r := range c.Rows { + rows[j] = r + rows[j].Start = append([]byte(nil), r.Start...) + rows[j].End = append([]byte(nil), r.End...) + if len(r.MemberRoutes) > 0 { + rows[j].MemberRoutes = append([]uint64(nil), r.MemberRoutes...) + } + } + out[i] = keyviz.MatrixColumn{At: c.At, Rows: rows} + } + return out +} + +// TestGetKeyVizMatrixReturnsUnavailableWhenSamplerNotRegistered pins +// the failure mode operators should see when keyviz is disabled on +// a node — Unavailable rather than a successful empty response. +func TestGetKeyVizMatrixReturnsUnavailableWhenSamplerNotRegistered(t *testing.T) { + t.Parallel() + srv := NewAdminServer(NodeIdentity{NodeID: "node-a"}, nil) + _, err := srv.GetKeyVizMatrix(context.Background(), &pb.GetKeyVizMatrixRequest{}) + st, ok := status.FromError(err) + if !ok || st.Code() != codes.Unavailable { + t.Fatalf("expected Unavailable, got %v", err) + } +} + +// TestGetKeyVizMatrixPivotsColumnsToRows pins the row-major proto +// layout: one KeyVizRow per RouteID with values aligned to the +// parallel column_unix_ms slice. Drives a fake sampler with two +// columns and two routes (one of which reports zero in column 1). +func TestGetKeyVizMatrixPivotsColumnsToRows(t *testing.T) { + t.Parallel() + t0 := time.Unix(1_700_000_000, 0) + t1 := t0.Add(time.Minute) + srv := newAdminServerWithFakeSampler(t, twoColumnTwoRouteCols(t0, t1)) + + resp, err := srv.GetKeyVizMatrix(context.Background(), &pb.GetKeyVizMatrixRequest{ + Series: pb.KeyVizSeries_KEYVIZ_SERIES_READS, + }) + require.NoError(t, err) + require.Equal(t, []int64{t0.UnixMilli(), t1.UnixMilli()}, resp.ColumnUnixMs) + require.Len(t, resp.Rows, 2) + // Sorted by Start: route 1 ("a") then route 2 ("m"). + r1, r2 := resp.Rows[0], resp.Rows[1] + require.Equal(t, "route:1", r1.BucketId) + require.Equal(t, "route:2", r2.BucketId) + require.Equal(t, []byte("a"), r1.Start) + require.Equal(t, []byte("m"), r1.End) + require.False(t, r1.Aggregate) + require.False(t, r2.Aggregate) + require.Equal(t, []uint64{4, 9}, r1.Values) + // Route 2 is absent in column 1 — zero by default. + require.Equal(t, []uint64{7, 0}, r2.Values) +} + +func twoColumnTwoRouteCols(t0, t1 time.Time) []keyviz.MatrixColumn { + return []keyviz.MatrixColumn{ + { + At: t0, + Rows: []keyviz.MatrixRow{ + {RouteID: 1, Start: []byte("a"), End: []byte("m"), Reads: 4, Writes: 1}, + {RouteID: 2, Start: []byte("m"), End: []byte("z"), Reads: 7, Writes: 0}, + }, + }, + { + At: t1, + Rows: []keyviz.MatrixRow{ + {RouteID: 1, Start: []byte("a"), End: []byte("m"), Reads: 9, Writes: 3}, + }, + }, + } +} + +func newAdminServerWithFakeSampler(t *testing.T, cols []keyviz.MatrixColumn) *AdminServer { + t.Helper() + srv := NewAdminServer(NodeIdentity{NodeID: "node-a"}, nil) + srv.RegisterSampler(&fakeKeyVizSampler{cols: cols}) + return srv +} + +// TestGetKeyVizMatrixSeriesSelection pins the request.Series → +// MatrixRow counter mapping including the UNSPECIFIED → Reads default. +func TestGetKeyVizMatrixSeriesSelection(t *testing.T) { + t.Parallel() + row := keyviz.MatrixRow{ + RouteID: 1, + Start: []byte("a"), + End: []byte("z"), + Reads: 11, + Writes: 22, + ReadBytes: 333, + WriteBytes: 4444, + } + srv := newAdminServerWithFakeSampler(t, []keyviz.MatrixColumn{ + {At: time.Unix(1_700_000_000, 0), Rows: []keyviz.MatrixRow{row}}, + }) + + for _, tc := range []struct { + name string + series pb.KeyVizSeries + want uint64 + }{ + {"unspecified defaults to reads", pb.KeyVizSeries_KEYVIZ_SERIES_UNSPECIFIED, 11}, + {"reads", pb.KeyVizSeries_KEYVIZ_SERIES_READS, 11}, + {"writes", pb.KeyVizSeries_KEYVIZ_SERIES_WRITES, 22}, + {"read_bytes", pb.KeyVizSeries_KEYVIZ_SERIES_READ_BYTES, 333}, + {"write_bytes", pb.KeyVizSeries_KEYVIZ_SERIES_WRITE_BYTES, 4444}, + } { + t.Run(tc.name, func(t *testing.T) { + resp, err := srv.GetKeyVizMatrix(context.Background(), &pb.GetKeyVizMatrixRequest{Series: tc.series}) + require.NoError(t, err) + require.Len(t, resp.Rows, 1) + require.Equal(t, []uint64{tc.want}, resp.Rows[0].Values) + }) + } +} + +// TestGetKeyVizMatrixEncodesAggregateBucket pins the proto layout +// for virtual buckets: bucket_id prefixed "virtual:", aggregate=true, +// route_ids carries the visible MemberRoutes list, and route_count +// reports the TRUE total (MemberRoutesTotal) — including past-cap +// folded routes — so consumers always know how many routes +// contributed. +func TestGetKeyVizMatrixEncodesAggregateBucket(t *testing.T) { + t.Parallel() + srv := newAdminServerWithFakeSampler(t, []keyviz.MatrixColumn{ + { + At: time.Unix(1_700_000_000, 0), + Rows: []keyviz.MatrixRow{ + { + RouteID: ^uint64(0), // synthetic virtual-bucket ID + Start: []byte("c"), + End: []byte("d"), + Aggregate: true, + MemberRoutes: []uint64{2, 3, 4}, + MemberRoutesTotal: 3, + Reads: 50, + }, + }, + }, + }) + + resp, err := srv.GetKeyVizMatrix(context.Background(), &pb.GetKeyVizMatrixRequest{ + Series: pb.KeyVizSeries_KEYVIZ_SERIES_READS, + }) + require.NoError(t, err) + require.Len(t, resp.Rows, 1) + r := resp.Rows[0] + require.True(t, r.Aggregate) + require.Equal(t, "virtual:18446744073709551615", r.BucketId) + require.Equal(t, uint64(3), r.RouteCount) + require.False(t, r.RouteIdsTruncated) + require.Equal(t, []uint64{2, 3, 4}, r.RouteIds) +} + +// TestGetKeyVizMatrixSurfacesRouteCountTruncation pins Codex round-1 +// P2 on PR #646: when the sampler caps MemberRoutes at +// MaxMemberRoutesPerSlot, route_count must still report the TRUE +// total (MemberRoutesTotal) and route_ids_truncated must flip true so +// consumers know the visible list is a prefix. +func TestGetKeyVizMatrixSurfacesRouteCountTruncation(t *testing.T) { + t.Parallel() + srv := newAdminServerWithFakeSampler(t, []keyviz.MatrixColumn{ + { + At: time.Unix(1_700_000_000, 0), + Rows: []keyviz.MatrixRow{ + { + RouteID: ^uint64(0), + Start: []byte("c"), + End: []byte("d"), + Aggregate: true, + MemberRoutes: []uint64{2, 3}, // visible cap=2 + MemberRoutesTotal: 9, // 7 more folded past the cap + Reads: 100, + }, + }, + }, + }) + + resp, err := srv.GetKeyVizMatrix(context.Background(), &pb.GetKeyVizMatrixRequest{ + Series: pb.KeyVizSeries_KEYVIZ_SERIES_READS, + }) + require.NoError(t, err) + require.Len(t, resp.Rows, 1) + r := resp.Rows[0] + require.Equal(t, uint64(9), r.RouteCount, "route_count must reflect MemberRoutesTotal") + require.True(t, r.RouteIdsTruncated, "route_ids_truncated must signal capped membership") + require.Equal(t, []uint64{2, 3}, r.RouteIds) +} + +// TestGetKeyVizMatrixHonorsRowsBudget pins Codex round-1 P1 on +// PR #646: a request with rows=N must return at most N rows. We +// stage 4 routes with distinct activity totals and request rows=2; +// the response must contain only the two highest-activity routes, +// sorted by Start. +func TestGetKeyVizMatrixHonorsRowsBudget(t *testing.T) { + t.Parallel() + srv := newAdminServerWithFakeSampler(t, []keyviz.MatrixColumn{ + { + At: time.Unix(1_700_000_000, 0), + Rows: []keyviz.MatrixRow{ + {RouteID: 1, Start: []byte("a"), End: []byte("b"), Reads: 1}, + {RouteID: 2, Start: []byte("b"), End: []byte("c"), Reads: 100}, + {RouteID: 3, Start: []byte("c"), End: []byte("d"), Reads: 5}, + {RouteID: 4, Start: []byte("d"), End: []byte("e"), Reads: 50}, + }, + }, + }) + + resp, err := srv.GetKeyVizMatrix(context.Background(), &pb.GetKeyVizMatrixRequest{ + Series: pb.KeyVizSeries_KEYVIZ_SERIES_READS, + Rows: 2, + }) + require.NoError(t, err) + require.Len(t, resp.Rows, 2, "rows budget must cap response size") + // Top 2 by activity = routes 2 (100) and 4 (50); sorted by Start + // gives "b" then "d". + require.Equal(t, "route:2", resp.Rows[0].BucketId) + require.Equal(t, "route:4", resp.Rows[1].BucketId) +} diff --git a/keyviz/sampler.go b/keyviz/sampler.go index 0d816c08f..9a8561cd4 100644 --- a/keyviz/sampler.go +++ b/keyviz/sampler.go @@ -58,11 +58,19 @@ const ( // Sampler is the narrow interface the coordinator depends on. The // nil-safe contract is documented per-method so a coordinator wired // without a sampler compiles to a no-op call. +// +// Implementations MUST be nil-receiver-safe: a typed-nil +// implementation passed through this interface (e.g. +// `var s Sampler = (*MemSampler)(nil)`) must not panic when its +// methods are called. The coordinator stores the interface value as +// supplied and dispatches through it on the hot path; a guard at the +// call site only checks for an interface-nil, not a typed-nil. type Sampler interface { // Observe records a single request against a route. Op identifies // the counter family. keyLen and valueLen are summed into the // matching *Bytes counter; pass 0 for read-only ops where the - // payload size is irrelevant. + // payload size is irrelevant. Implementations must no-op (not + // panic) when invoked on a typed-nil receiver. Observe(routeID uint64, op Op, keyLen, valueLen int) } @@ -226,6 +234,12 @@ type routeSlot struct { // routes together (Snapshot surfaces this in MatrixRow). Aggregate bool MemberRoutes []uint64 + // MemberRoutesTotal counts every distinct routeID that has folded + // into this bucket, including ones beyond MaxMemberRoutesPerSlot + // (which still contribute to the counters but are not appended to + // MemberRoutes). Always equals len(MemberRoutes) for individual + // (non-Aggregate) slots. + MemberRoutesTotal uint64 reads atomic.Uint64 writes atomic.Uint64 @@ -238,7 +252,7 @@ type routeSlot struct { // Start/End/MemberRoutes with the live slot (which a later // RegisterRoute may extend, and which the snapshot API exports to // external consumers that may mutate the bounds). -func (s *routeSlot) snapshotMeta() (start, end []byte, aggregate bool, members []uint64) { +func (s *routeSlot) snapshotMeta() (start, end []byte, aggregate bool, members []uint64, membersTotal uint64) { s.metaMu.RLock() defer s.metaMu.RUnlock() start = cloneBytes(s.Start) @@ -247,6 +261,7 @@ func (s *routeSlot) snapshotMeta() (start, end []byte, aggregate bool, members [ if len(s.MemberRoutes) > 0 { members = append([]uint64(nil), s.MemberRoutes...) } + membersTotal = s.MemberRoutesTotal return } @@ -263,6 +278,12 @@ type MatrixRow struct { Start, End []byte Aggregate bool MemberRoutes []uint64 + // MemberRoutesTotal is how many distinct route IDs contributed to + // this row's counters, including ones that exceeded + // MaxMemberRoutesPerSlot and so are NOT listed in MemberRoutes. + // Snapshot consumers should treat MemberRoutes as the visible + // prefix of this list when MemberRoutesTotal > len(MemberRoutes). + MemberRoutesTotal uint64 Reads uint64 Writes uint64 @@ -379,9 +400,10 @@ func (s *MemSampler) RegisterRoute(routeID uint64, start, end []byte) bool { slot := s.reclaimRetiredSlot(routeID) if slot == nil { slot = &routeSlot{ - RouteID: routeID, - Start: cloneBytes(start), - End: cloneBytes(end), + RouteID: routeID, + Start: cloneBytes(start), + End: cloneBytes(end), + MemberRoutesTotal: 1, } } else { // Re-registering the same routeID inside the grace window: @@ -394,6 +416,7 @@ func (s *MemSampler) RegisterRoute(routeID uint64, start, end []byte) bool { slot.metaMu.Lock() slot.Start = cloneBytes(start) slot.End = cloneBytes(end) + slot.MemberRoutesTotal = 1 slot.metaMu.Unlock() } next.slots[routeID] = slot @@ -410,11 +433,12 @@ func (s *MemSampler) RegisterRoute(routeID uint64, start, end []byte) bool { bucket := findVirtualBucket(next.sortedSlots, start) if bucket == nil { bucket = &routeSlot{ - RouteID: s.nextVirtualBucketID(), - Start: cloneBytes(start), - End: cloneBytes(end), - Aggregate: true, - MemberRoutes: []uint64{routeID}, + RouteID: s.nextVirtualBucketID(), + Start: cloneBytes(start), + End: cloneBytes(end), + Aggregate: true, + MemberRoutes: []uint64{routeID}, + MemberRoutesTotal: 1, } next.sortedSlots = appendSorted(next.sortedSlots, bucket) } else { @@ -443,9 +467,11 @@ func (s *MemSampler) RegisterRoute(routeID uint64, start, end []byte) bool { // the routeID is not added to the visible member list. func (s *MemSampler) foldIntoBucket(next *routeTable, bucket *routeSlot, routeID uint64, start, end []byte) { bucket.metaMu.Lock() - if !memberRoutesContains(bucket.MemberRoutes, routeID) && - len(bucket.MemberRoutes) < s.opts.MaxMemberRoutesPerSlot { - bucket.MemberRoutes = append(bucket.MemberRoutes, routeID) + if !memberRoutesContains(bucket.MemberRoutes, routeID) { + bucket.MemberRoutesTotal++ + if len(bucket.MemberRoutes) < s.opts.MaxMemberRoutesPerSlot { + bucket.MemberRoutes = append(bucket.MemberRoutes, routeID) + } } if len(end) == 0 || (len(bucket.End) != 0 && bytesGT(end, bucket.End)) { bucket.End = cloneBytes(end) @@ -703,17 +729,26 @@ func bucketStillReferenced(virtualForRoute map[uint64]*routeSlot, bucket *routeS // pruneMemberRoute removes routeID from bucket.MemberRoutes under the // bucket's metaMu so a concurrent snapshotMeta reader sees a -// consistent view. +// consistent view. MemberRoutesTotal is decremented when the routeID +// was visible in MemberRoutes (the only case we can confidently +// account for) — routes pruned past the visible cap stay in the +// total because we don't track individual past-cap members. func pruneMemberRoute(bucket *routeSlot, routeID uint64) { bucket.metaMu.Lock() defer bucket.metaMu.Unlock() filtered := bucket.MemberRoutes[:0] + removed := false for _, m := range bucket.MemberRoutes { - if m != routeID { - filtered = append(filtered, m) + if m == routeID { + removed = true + continue } + filtered = append(filtered, m) } bucket.MemberRoutes = filtered + if removed && bucket.MemberRoutesTotal > 0 { + bucket.MemberRoutesTotal-- + } } // Step returns the configured flush interval after applying default @@ -739,17 +774,18 @@ func appendDrainedRow(rows []MatrixRow, slot *routeSlot) []MatrixRow { if reads == 0 && writes == 0 && readBytes == 0 && writeBytes == 0 { return rows } - start, end, aggregate, members := slot.snapshotMeta() + start, end, aggregate, members, membersTotal := slot.snapshotMeta() return append(rows, MatrixRow{ - RouteID: slot.RouteID, - Start: start, - End: end, - Aggregate: aggregate, - MemberRoutes: members, - Reads: reads, - Writes: writes, - ReadBytes: readBytes, - WriteBytes: writeBytes, + RouteID: slot.RouteID, + Start: start, + End: end, + Aggregate: aggregate, + MemberRoutes: members, + MemberRoutesTotal: membersTotal, + Reads: reads, + Writes: writes, + ReadBytes: readBytes, + WriteBytes: writeBytes, }) } diff --git a/kv/sharded_coordinator.go b/kv/sharded_coordinator.go index de8c02fd8..0127899fd 100644 --- a/kv/sharded_coordinator.go +++ b/kv/sharded_coordinator.go @@ -13,6 +13,7 @@ import ( "github.com/bootjp/elastickv/distribution" "github.com/bootjp/elastickv/internal/monoclock" "github.com/bootjp/elastickv/internal/raftengine" + "github.com/bootjp/elastickv/keyviz" pb "github.com/bootjp/elastickv/proto" "github.com/bootjp/elastickv/store" "github.com/cockroachdb/errors" @@ -135,6 +136,12 @@ type ShardedCoordinator struct { // leaseObserver records lease-read hit/miss for every shard the // coordinator owns. Nil-safe; see Coordinate.leaseObserver. leaseObserver LeaseReadObserver + // sampler counts requests per RouteID for the key visualizer + // heatmap. Nil-safe at the call site; the implementation + // (keyviz.MemSampler) also tolerates a typed-nil receiver, so a + // disabled keyviz wires through to a no-op without branching on + // the hot path. + sampler keyviz.Sampler } // WithLeaseReadObserver wires a LeaseReadObserver onto a @@ -148,6 +155,22 @@ func (c *ShardedCoordinator) WithLeaseReadObserver(observer LeaseReadObserver) * return c } +// WithSampler wires a keyviz.Sampler onto a ShardedCoordinator. The +// coordinator calls sampler.Observe at dispatch entry — once per +// resolved (RouteID, mutation key) pair — to feed the key visualizer +// heatmap (design doc §5.1). Applied after construction for the same +// reason as WithLeaseReadObserver: NewShardedCoordinator is already +// heavily overloaded. +// +// Passing a nil interface value is supported and disables sampling +// (the call site guards against it). Passing a typed-nil +// *keyviz.MemSampler also works because Observe is nil-safe by +// contract. +func (c *ShardedCoordinator) WithSampler(s keyviz.Sampler) *ShardedCoordinator { + c.sampler = s + return c +} + // NewShardedCoordinator builds a coordinator for the provided shard groups. // The defaultGroup is used for non-keyed leader checks. func NewShardedCoordinator(engine *distribution.Engine, groups map[uint64]*ShardGroup, defaultGroup uint64, clock *HLC, st store.MVCCStore) *ShardedCoordinator { @@ -952,6 +975,15 @@ func (c *ShardedCoordinator) txnLogs(reqs *OperationGroup[OP]) ([]*pb.Request, e return buildTxnLogs(reqs.StartTS, commitTS, grouped, gids) } +// observeMutation: reads never reach this path; the early return +// keeps the disabled-keyviz hot path allocation-free. +func (c *ShardedCoordinator) observeMutation(routeID uint64, mut *pb.Mutation) { + if c.sampler == nil { + return + } + c.sampler.Observe(routeID, keyviz.OpWrite, len(mut.Key), len(mut.Value)) +} + func (c *ShardedCoordinator) groupMutations(reqs []*Elem[OP]) (map[uint64][]*pb.Mutation, []uint64, error) { grouped := make(map[uint64][]*pb.Mutation) for _, req := range reqs { @@ -963,6 +995,7 @@ func (c *ShardedCoordinator) groupMutations(reqs []*Elem[OP]) (map[uint64][]*pb. if !ok { return nil, nil, errors.Wrapf(ErrInvalidRequest, "no route for key %q", mut.Key) } + c.observeMutation(route.RouteID, mut) grouped[route.GroupID] = append(grouped[route.GroupID], mut) } gids := make([]uint64, 0, len(grouped)) diff --git a/kv/sharded_coordinator_sampler_test.go b/kv/sharded_coordinator_sampler_test.go new file mode 100644 index 000000000..6eb793582 --- /dev/null +++ b/kv/sharded_coordinator_sampler_test.go @@ -0,0 +1,152 @@ +package kv + +import ( + "context" + "sync" + "testing" + + "github.com/bootjp/elastickv/distribution" + "github.com/bootjp/elastickv/keyviz" + "github.com/bootjp/elastickv/store" + "github.com/stretchr/testify/require" +) + +// recordingSampler is a keyviz.Sampler that records every Observe +// call so tests can assert dispatch wiring fires once per resolved +// (RouteID, mutation key) pair. +type recordingSampler struct { + mu sync.Mutex + calls []sampleCall +} + +type sampleCall struct { + routeID uint64 + op keyviz.Op + keyLen int + valueLen int +} + +func (r *recordingSampler) Observe(routeID uint64, op keyviz.Op, keyLen, valueLen int) { + r.mu.Lock() + defer r.mu.Unlock() + r.calls = append(r.calls, sampleCall{routeID: routeID, op: op, keyLen: keyLen, valueLen: valueLen}) +} + +func (r *recordingSampler) snapshot() []sampleCall { + r.mu.Lock() + defer r.mu.Unlock() + out := make([]sampleCall, len(r.calls)) + copy(out, r.calls) + return out +} + +// TestShardedCoordinatorObservesEveryDispatchedMutation pins the +// keyviz wiring contract: every successfully-routed mutation in a +// non-txn dispatch produces exactly one Observe call carrying the +// resolved RouteID, OpWrite, and the mutation's key/value lengths. +func TestShardedCoordinatorObservesEveryDispatchedMutation(t *testing.T) { + t.Parallel() + ctx := context.Background() + + engine := distribution.NewEngine() + engine.UpdateRoute([]byte("a"), []byte("m"), 1) + engine.UpdateRoute([]byte("m"), nil, 2) + + s1 := store.NewMVCCStore() + r1, stop1 := newSingleRaft(t, "kv-sampler-g1", NewKvFSMWithHLC(s1, NewHLC())) + t.Cleanup(stop1) + s2 := store.NewMVCCStore() + r2, stop2 := newSingleRaft(t, "kv-sampler-g2", NewKvFSMWithHLC(s2, NewHLC())) + t.Cleanup(stop2) + + groups := map[uint64]*ShardGroup{ + 1: {Engine: r1, Store: s1, Txn: NewLeaderProxyWithEngine(r1)}, + 2: {Engine: r2, Store: s2, Txn: NewLeaderProxyWithEngine(r2)}, + } + shardStore := NewShardStore(engine, groups) + + rec := &recordingSampler{} + coord := NewShardedCoordinator(engine, groups, 1, NewHLC(), shardStore).WithSampler(rec) + + // Cross-shard non-txn dispatch: "b" → group 1, "x" → group 2. + ops := &OperationGroup[OP]{ + Elems: []*Elem[OP]{ + {Op: Put, Key: []byte("b"), Value: []byte("val-b")}, + {Op: Put, Key: []byte("x"), Value: []byte("val-x-longer")}, + }, + } + _, err := coord.Dispatch(ctx, ops) + require.NoError(t, err) + + calls := rec.snapshot() + require.Len(t, calls, 2, "expected one Observe per mutation") + + // groupMutations iterates reqs in order, so call[i] matches + // elem[i]. Verify each: OpWrite, exact key/value lengths, and + // the RouteID the engine resolved for that key. + for i, elem := range ops.Elems { + route, ok := engine.GetRoute(elem.Key) + require.True(t, ok) + require.Equal(t, sampleCall{ + routeID: route.RouteID, + op: keyviz.OpWrite, + keyLen: len(elem.Key), + valueLen: len(elem.Value), + }, calls[i], "Observe call %d for key %q", i, elem.Key) + } +} + +// TestShardedCoordinatorWithoutSamplerStaysSafe pins the nil-safe +// contract: a coordinator without WithSampler (interface-nil +// c.sampler) and one wired with a typed-nil *MemSampler must both +// dispatch successfully without observing anything. The "no +// WithSampler" subcase additionally asserts c.sampler stays the +// zero interface value so a future refactor that silently +// initialises the field would fail this guard. +func TestShardedCoordinatorWithoutSamplerStaysSafe(t *testing.T) { + t.Parallel() + ctx := context.Background() + + for _, tc := range []struct { + name string + opt func(*ShardedCoordinator) *ShardedCoordinator + wantNilField bool + }{ + { + name: "no WithSampler call", + opt: func(c *ShardedCoordinator) *ShardedCoordinator { return c }, + wantNilField: true, + }, + { + name: "typed-nil *MemSampler", + opt: func(c *ShardedCoordinator) *ShardedCoordinator { + return c.WithSampler((*keyviz.MemSampler)(nil)) + }, + wantNilField: false, + }, + } { + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + engine := distribution.NewEngine() + engine.UpdateRoute([]byte("a"), nil, 1) + + s1 := store.NewMVCCStore() + r1, stop1 := newSingleRaft(t, "kv-sampler-nilsafe-"+tc.name, NewKvFSMWithHLC(s1, NewHLC())) + t.Cleanup(stop1) + groups := map[uint64]*ShardGroup{ + 1: {Engine: r1, Store: s1, Txn: NewLeaderProxyWithEngine(r1)}, + } + coord := tc.opt(NewShardedCoordinator(engine, groups, 1, NewHLC(), NewShardStore(engine, groups))) + + if tc.wantNilField { + require.Nil(t, coord.sampler, "expected sampler field to be unset when WithSampler is never called") + } + + ops := &OperationGroup[OP]{ + Elems: []*Elem[OP]{{Op: Put, Key: []byte("b"), Value: []byte("v")}}, + } + _, err := coord.Dispatch(ctx, ops) + require.NoError(t, err) + }) + } +} diff --git a/main.go b/main.go index f6e38cc89..c0e60d68a 100644 --- a/main.go +++ b/main.go @@ -23,6 +23,7 @@ import ( internalraftadmin "github.com/bootjp/elastickv/internal/raftadmin" "github.com/bootjp/elastickv/internal/raftengine" etcdraftengine "github.com/bootjp/elastickv/internal/raftengine/etcd" + "github.com/bootjp/elastickv/keyviz" "github.com/bootjp/elastickv/kv" "github.com/bootjp/elastickv/monitoring" pb "github.com/bootjp/elastickv/proto" @@ -124,6 +125,15 @@ var ( adminSessionSigningKeyPreviousFile = flag.String("adminSessionSigningKeyPreviousFile", "", "Path to a file containing the base64-encoded previous admin HS256 key used for rotation") adminReadOnlyAccessKeys = flag.String("adminReadOnlyAccessKeys", "", "Comma-separated SigV4 access keys granted read-only admin access") adminFullAccessKeys = flag.String("adminFullAccessKeys", "", "Comma-separated SigV4 access keys granted full-access admin role") + + // Key visualizer sampler flags. The sampler runs entirely in-memory + // on each node, feeds AdminServer.GetKeyVizMatrix, and is disabled + // by default — opt in with --keyvizEnabled. The other flags are + // no-ops when the sampler is disabled. + keyvizEnabled = flag.Bool("keyvizEnabled", false, "Enable the in-memory key visualizer sampler that feeds AdminServer.GetKeyVizMatrix") + keyvizStep = flag.Duration("keyvizStep", keyviz.DefaultStep, "Flush interval / matrix-column resolution for the keyviz sampler") + keyvizMaxTrackedRoutes = flag.Int("keyvizMaxTrackedRoutes", keyviz.DefaultMaxTrackedRoutes, "Maximum routes tracked individually before excess routes coarsen into virtual buckets") + keyvizMaxMemberRoutesPerSlot = flag.Int("keyvizMaxMemberRoutesPerSlot", keyviz.DefaultMaxMemberRoutesPerSlot, "Maximum members listed on a virtual bucket; excess routes still drive the bucket counters") ) const adminTokenMaxBytes = 4 << 10 @@ -278,16 +288,26 @@ func run() error { cleanup.Add(cancel) lockResolver := kv.NewLockResolver(shardStore, shardGroups, nil) cleanup.Add(func() { lockResolver.Close() }) + sampler := buildKeyVizSampler() coordinate := kv.NewShardedCoordinator(cfg.engine, shardGroups, cfg.defaultGroup, clock, shardStore). - WithLeaseReadObserver(metricsRegistry.LeaseReadObserver()) + WithLeaseReadObserver(metricsRegistry.LeaseReadObserver()). + WithSampler(keyVizSamplerForCoordinator(sampler)) distCatalog, err := setupDistributionCatalog(ctx, runtimes, cfg.engine) if err != nil { return err } + // Seed AFTER setupDistributionCatalog so the sampler picks up the + // catalog-assigned RouteIDs. EnsureCatalogSnapshot inside + // setupDistributionCatalog applies a snapshot back into the engine + // with durable non-zero RouteIDs; seeding earlier would register + // the placeholder zero IDs from buildEngine and Observe would miss + // every dispatched mutation. + seedKeyVizRoutes(sampler, cfg.engine) eg, runCtx := errgroup.WithContext(ctx) eg.Go(func() error { return runDistributionCatalogWatcher(runCtx, distCatalog, cfg.engine) }) + startKeyVizFlusher(runCtx, eg, sampler) startMemoryWatchdog(runCtx, eg, cancel) distServer := adapter.NewDistributionServer( cfg.engine, @@ -314,6 +334,7 @@ func run() error { shardStore: shardStore, coordinate: coordinate, distServer: distServer, readTracker: readTracker, metricsRegistry: metricsRegistry, cfg: cfg, + keyvizSampler: sampler, }); err != nil { return err } @@ -643,13 +664,19 @@ type serversInput struct { readTracker *kv.ActiveTimestampTracker metricsRegistry *monitoring.Registry cfg runtimeConfig + // keyvizSampler is the in-memory key visualizer sampler, or nil + // when --keyvizEnabled is false. Threaded into setupAdminService + // so AdminServer.GetKeyVizMatrix can serve snapshots; the + // coordinator already has its own copy from + // `WithSampler(...)` higher up in run(). + keyvizSampler *keyviz.MemSampler } // startServers wires up the AdminServer, builds the runtime runner, and // kicks off both the per-group raft listeners and the admin HTTP listener. // Extracted from run() to keep cyclomatic complexity within budget. func startServers(in serversInput) error { - adminServer, adminGRPCOpts, err := setupAdminService(*raftId, *myAddr, in.runtimes, in.bootstrapServers) + adminServer, adminGRPCOpts, err := setupAdminService(*raftId, *myAddr, in.runtimes, in.bootstrapServers, in.keyvizSampler) if err != nil { return err } @@ -703,6 +730,7 @@ func setupAdminService( nodeID, grpcAddress string, runtimes []*raftGroupRuntime, bootstrapServers []raftengine.Server, + keyvizSampler *keyviz.MemSampler, ) (*adapter.AdminServer, adminGRPCInterceptors, error) { members := adminMembersFromBootstrap(nodeID, bootstrapServers) // In multi-group mode the process does not listen on *myAddr — each group @@ -726,6 +754,13 @@ func setupAdminService( for _, rt := range runtimes { srv.RegisterGroup(rt.spec.id, rt.engine) } + // Only register a real sampler. Passing a typed-nil *MemSampler + // would store a non-nil interface and make GetKeyVizMatrix + // return a successful empty response instead of Unavailable — + // operators want the explicit "keyviz disabled" signal. + if keyvizSampler != nil { + srv.RegisterSampler(keyvizSampler) + } if *adminInsecureNoAuth { log.Printf("WARNING: --adminInsecureNoAuth is set; Admin gRPC service exposed without authentication") } @@ -1262,3 +1297,59 @@ func (r *runtimeServerRunner) start() error { } return nil } + +// buildKeyVizSampler constructs the in-memory keyviz sampler from +// flag-supplied options, or returns nil when --keyvizEnabled is +// false. The coordinator's WithSampler and AdminServer's +// RegisterSampler both treat a nil receiver as "keyviz disabled," so +// this is the single decision point. +func buildKeyVizSampler() *keyviz.MemSampler { + if !*keyvizEnabled { + return nil + } + return keyviz.NewMemSampler(keyviz.MemSamplerOptions{ + Step: *keyvizStep, + MaxTrackedRoutes: *keyvizMaxTrackedRoutes, + MaxMemberRoutesPerSlot: *keyvizMaxMemberRoutesPerSlot, + }) +} + +// keyVizSamplerForCoordinator wraps a *MemSampler in the +// keyviz.Sampler interface understood by ShardedCoordinator. A nil +// sampler returns a typed-nil interface value, so the coordinator's +// `if c.sampler == nil` guard fires and the dispatch hot path skips +// Observe with a single branch. +func keyVizSamplerForCoordinator(s *keyviz.MemSampler) keyviz.Sampler { + if s == nil { + return nil + } + return s +} + +// seedKeyVizRoutes copies the engine's current route catalogue into +// the sampler so the first matrix snapshots have non-empty metadata. +// No-op when the sampler is disabled. The coordinator's +// distribution.Engine handles route mutations after this point; +// route-watch propagation into the sampler is a follow-up (the +// design's Phase 3 persistence work). +func seedKeyVizRoutes(s *keyviz.MemSampler, engine *distribution.Engine) { + if s == nil || engine == nil { + return + } + for _, r := range engine.Stats() { + s.RegisterRoute(r.RouteID, r.Start, r.End) + } +} + +// startKeyVizFlusher launches RunFlusher in the supplied errgroup +// and harvests the in-progress step with a final Flush after the +// goroutine returns, so a graceful shutdown does not lose the most +// recent partial column. Nil-safe: a disabled sampler reduces the +// goroutine to ctx-wait + a no-op Flush. +func startKeyVizFlusher(ctx context.Context, eg *errgroup.Group, s *keyviz.MemSampler) { + eg.Go(func() error { + keyviz.RunFlusher(ctx, s, s.Step()) + s.Flush() + return nil + }) +} diff --git a/main_keyviz_test.go b/main_keyviz_test.go new file mode 100644 index 000000000..cda98514c --- /dev/null +++ b/main_keyviz_test.go @@ -0,0 +1,117 @@ +package main + +import ( + "context" + "testing" + "time" + + "github.com/bootjp/elastickv/distribution" + "github.com/bootjp/elastickv/keyviz" + "github.com/stretchr/testify/require" + "golang.org/x/sync/errgroup" +) + +// TestBuildKeyVizSamplerHonorsEnabledFlag pins the on/off contract: +// --keyvizEnabled=false returns nil (so coordinator/admin server take +// the disabled paths), and --keyvizEnabled=true with explicit options +// returns a configured sampler. +func TestBuildKeyVizSamplerHonorsEnabledFlag(t *testing.T) { + t.Parallel() + withFlags(t, false, time.Second, 5, 7, func() { + require.Nil(t, buildKeyVizSampler()) + }) + withFlags(t, true, 250*time.Millisecond, 5, 7, func() { + s := buildKeyVizSampler() + require.NotNil(t, s) + require.Equal(t, 250*time.Millisecond, s.Step()) + }) +} + +// TestSeedKeyVizRoutesCopiesEngineCatalogue pins that the startup +// seed registers each route the engine reports, so subsequent +// Observe(routeID, ...) calls find a slot. Uses a single route via +// UpdateRoute (which leaves RouteID=0) — the deeper invariant +// (one slot per distinct RouteID) is covered by the keyviz package's +// own unit tests. +func TestSeedKeyVizRoutesCopiesEngineCatalogue(t *testing.T) { + t.Parallel() + engine := distribution.NewEngine() + engine.UpdateRoute([]byte("a"), []byte("m"), 1) + + s := keyviz.NewMemSampler(keyviz.MemSamplerOptions{Step: time.Second, HistoryColumns: 4}) + seedKeyVizRoutes(s, engine) + + for _, r := range engine.Stats() { + s.Observe(r.RouteID, keyviz.OpRead, 1, 1) + } + s.Flush() + cols := s.Snapshot(time.Time{}, time.Time{}) + require.Len(t, cols, 1) + require.Len(t, cols[0].Rows, 1) + require.Equal(t, []byte("a"), cols[0].Rows[0].Start) +} + +// TestSeedKeyVizRoutesNoOpOnNilSampler pins that a disabled sampler +// is safe to seed — the function returns without panicking. +func TestSeedKeyVizRoutesNoOpOnNilSampler(t *testing.T) { + t.Parallel() + engine := distribution.NewEngine() + engine.UpdateRoute([]byte("a"), nil, 1) + require.NotPanics(t, func() { + seedKeyVizRoutes(nil, engine) + }) +} + +// TestStartKeyVizFlusherReturnsAfterCancel pins the goroutine +// lifecycle: when ctx fires the RunFlusher returns and the errgroup +// closure exits cleanly. Also verifies that a final Flush is called +// so the in-progress step is harvested. +func TestStartKeyVizFlusherReturnsAfterCancel(t *testing.T) { + t.Parallel() + s := keyviz.NewMemSampler(keyviz.MemSamplerOptions{Step: time.Millisecond, HistoryColumns: 4}) + require.True(t, s.RegisterRoute(1, []byte("a"), []byte("b"))) + s.Observe(1, keyviz.OpRead, 0, 0) + + ctx, cancel := context.WithCancel(context.Background()) + eg, _ := errgroup.WithContext(ctx) + startKeyVizFlusher(ctx, eg, s) + cancel() + require.NoError(t, eg.Wait()) + // After cancel, the final Flush should have harvested the + // pre-cancel Observe into the ring buffer. + cols := s.Snapshot(time.Time{}, time.Time{}) + saw := false + for _, c := range cols { + for _, r := range c.Rows { + if r.RouteID == 1 && r.Reads > 0 { + saw = true + } + } + } + require.True(t, saw, "post-cancel Flush did not harvest pending Observe") +} + +func withFlags( + t *testing.T, + enabled bool, + step time.Duration, + maxTracked, maxMembers int, + fn func(), +) { + t.Helper() + prevEnabled := *keyvizEnabled + prevStep := *keyvizStep + prevMaxTracked := *keyvizMaxTrackedRoutes + prevMaxMembers := *keyvizMaxMemberRoutesPerSlot + *keyvizEnabled = enabled + *keyvizStep = step + *keyvizMaxTrackedRoutes = maxTracked + *keyvizMaxMemberRoutesPerSlot = maxMembers + defer func() { + *keyvizEnabled = prevEnabled + *keyvizStep = prevStep + *keyvizMaxTrackedRoutes = prevMaxTracked + *keyvizMaxMemberRoutesPerSlot = prevMaxMembers + }() + fn() +}