From d449057345ab8cb63ef59d702f1c975ea40a8f7b Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Sun, 26 Apr 2026 06:43:21 +0900 Subject: [PATCH 1/3] feat(kv): observe dispatched mutations with keyviz Sampler MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Wires the keyviz.Sampler from PR #639 into ShardedCoordinator (design doc §5.1): - Adds an unexported `sampler keyviz.Sampler` field plus `WithSampler(s keyviz.Sampler) *ShardedCoordinator` post- construction option, mirroring WithLeaseReadObserver. - groupMutations now calls `c.observeMutation(routeID, mut)` once per resolved (RouteID, mutation) pair before grouping by GroupID. Reads are not dispatched through this path; all calls use OpWrite. - observeMutation guards against an interface-nil sampler at the call site; the keyviz contract also tolerates a typed-nil *MemSampler, so disabled keyviz wires through to a no-op without branching deep in the hot path. - DelPrefix is intentionally not observed: dispatchDelPrefixBroadcast doesn't resolve a single RouteID (it broadcasts to every shard), and the design treats per-route attribution there as out of scope. Tests: - TestShardedCoordinatorObservesEveryDispatchedMutation: cross-shard Put batch, verifies one Observe per element with the engine's resolved RouteID, OpWrite, and exact key/value lengths. - TestShardedCoordinatorWithoutSamplerStaysSafe: dispatches successfully both with no WithSampler call (interface-nil) and with a typed-nil *MemSampler. --- kv/sharded_coordinator.go | 38 +++++++ kv/sharded_coordinator_sampler_test.go | 142 +++++++++++++++++++++++++ 2 files changed, 180 insertions(+) create mode 100644 kv/sharded_coordinator_sampler_test.go diff --git a/kv/sharded_coordinator.go b/kv/sharded_coordinator.go index de8c02fd8..14e38bcb0 100644 --- a/kv/sharded_coordinator.go +++ b/kv/sharded_coordinator.go @@ -13,6 +13,7 @@ import ( "github.com/bootjp/elastickv/distribution" "github.com/bootjp/elastickv/internal/monoclock" "github.com/bootjp/elastickv/internal/raftengine" + "github.com/bootjp/elastickv/keyviz" pb "github.com/bootjp/elastickv/proto" "github.com/bootjp/elastickv/store" "github.com/cockroachdb/errors" @@ -135,6 +136,12 @@ type ShardedCoordinator struct { // leaseObserver records lease-read hit/miss for every shard the // coordinator owns. Nil-safe; see Coordinate.leaseObserver. leaseObserver LeaseReadObserver + // sampler counts requests per RouteID for the key visualizer + // heatmap. Nil-safe at the call site; the implementation + // (keyviz.MemSampler) also tolerates a typed-nil receiver, so a + // disabled keyviz wires through to a no-op without branching on + // the hot path. + sampler keyviz.Sampler } // WithLeaseReadObserver wires a LeaseReadObserver onto a @@ -148,6 +155,22 @@ func (c *ShardedCoordinator) WithLeaseReadObserver(observer LeaseReadObserver) * return c } +// WithSampler wires a keyviz.Sampler onto a ShardedCoordinator. The +// coordinator calls sampler.Observe at dispatch entry — once per +// resolved (RouteID, mutation key) pair — to feed the key visualizer +// heatmap (design doc §5.1). Applied after construction for the same +// reason as WithLeaseReadObserver: NewShardedCoordinator is already +// heavily overloaded. +// +// Passing a nil interface value is supported and disables sampling +// (the call site guards against it). Passing a typed-nil +// *keyviz.MemSampler also works because Observe is nil-safe by +// contract. +func (c *ShardedCoordinator) WithSampler(s keyviz.Sampler) *ShardedCoordinator { + c.sampler = s + return c +} + // NewShardedCoordinator builds a coordinator for the provided shard groups. // The defaultGroup is used for non-keyed leader checks. func NewShardedCoordinator(engine *distribution.Engine, groups map[uint64]*ShardGroup, defaultGroup uint64, clock *HLC, st store.MVCCStore) *ShardedCoordinator { @@ -952,6 +975,20 @@ func (c *ShardedCoordinator) txnLogs(reqs *OperationGroup[OP]) ([]*pb.Request, e return buildTxnLogs(reqs.StartTS, commitTS, grouped, gids) } +// observeMutation records a single dispatched mutation with the +// keyviz sampler, if one is wired. All operations reaching this +// point are writes (Put, Del); reads are served outside Dispatch via +// the lease-read / linearizable-read paths. +// +// Nil-safe: a nil-interface c.sampler skips with a single branch, +// keeping the dispatch loop allocation-free when keyviz is disabled. +func (c *ShardedCoordinator) observeMutation(routeID uint64, mut *pb.Mutation) { + if c.sampler == nil { + return + } + c.sampler.Observe(routeID, keyviz.OpWrite, len(mut.Key), len(mut.Value)) +} + func (c *ShardedCoordinator) groupMutations(reqs []*Elem[OP]) (map[uint64][]*pb.Mutation, []uint64, error) { grouped := make(map[uint64][]*pb.Mutation) for _, req := range reqs { @@ -963,6 +1000,7 @@ func (c *ShardedCoordinator) groupMutations(reqs []*Elem[OP]) (map[uint64][]*pb. if !ok { return nil, nil, errors.Wrapf(ErrInvalidRequest, "no route for key %q", mut.Key) } + c.observeMutation(route.RouteID, mut) grouped[route.GroupID] = append(grouped[route.GroupID], mut) } gids := make([]uint64, 0, len(grouped)) diff --git a/kv/sharded_coordinator_sampler_test.go b/kv/sharded_coordinator_sampler_test.go new file mode 100644 index 000000000..99967ed38 --- /dev/null +++ b/kv/sharded_coordinator_sampler_test.go @@ -0,0 +1,142 @@ +package kv + +import ( + "context" + "sync" + "testing" + + "github.com/bootjp/elastickv/distribution" + "github.com/bootjp/elastickv/keyviz" + "github.com/bootjp/elastickv/store" + "github.com/stretchr/testify/require" +) + +// recordingSampler is a keyviz.Sampler that records every Observe +// call so tests can assert dispatch wiring fires once per resolved +// (RouteID, mutation key) pair. +type recordingSampler struct { + mu sync.Mutex + calls []sampleCall +} + +type sampleCall struct { + routeID uint64 + op keyviz.Op + keyLen int + valueLen int +} + +func (r *recordingSampler) Observe(routeID uint64, op keyviz.Op, keyLen, valueLen int) { + r.mu.Lock() + defer r.mu.Unlock() + r.calls = append(r.calls, sampleCall{routeID: routeID, op: op, keyLen: keyLen, valueLen: valueLen}) +} + +func (r *recordingSampler) snapshot() []sampleCall { + r.mu.Lock() + defer r.mu.Unlock() + out := make([]sampleCall, len(r.calls)) + copy(out, r.calls) + return out +} + +// TestShardedCoordinatorObservesEveryDispatchedMutation pins the +// keyviz wiring contract: every successfully-routed mutation in a +// non-txn dispatch produces exactly one Observe call carrying the +// resolved RouteID, OpWrite, and the mutation's key/value lengths. +func TestShardedCoordinatorObservesEveryDispatchedMutation(t *testing.T) { + t.Parallel() + ctx := context.Background() + + engine := distribution.NewEngine() + engine.UpdateRoute([]byte("a"), []byte("m"), 1) + engine.UpdateRoute([]byte("m"), nil, 2) + + s1 := store.NewMVCCStore() + r1, stop1 := newSingleRaft(t, "kv-sampler-g1", NewKvFSMWithHLC(s1, NewHLC())) + t.Cleanup(stop1) + s2 := store.NewMVCCStore() + r2, stop2 := newSingleRaft(t, "kv-sampler-g2", NewKvFSMWithHLC(s2, NewHLC())) + t.Cleanup(stop2) + + groups := map[uint64]*ShardGroup{ + 1: {Engine: r1, Store: s1, Txn: NewLeaderProxyWithEngine(r1)}, + 2: {Engine: r2, Store: s2, Txn: NewLeaderProxyWithEngine(r2)}, + } + shardStore := NewShardStore(engine, groups) + + rec := &recordingSampler{} + coord := NewShardedCoordinator(engine, groups, 1, NewHLC(), shardStore).WithSampler(rec) + + // Cross-shard non-txn dispatch: "b" → group 1, "x" → group 2. + ops := &OperationGroup[OP]{ + Elems: []*Elem[OP]{ + {Op: Put, Key: []byte("b"), Value: []byte("val-b")}, + {Op: Put, Key: []byte("x"), Value: []byte("val-x-longer")}, + }, + } + _, err := coord.Dispatch(ctx, ops) + require.NoError(t, err) + + calls := rec.snapshot() + require.Len(t, calls, 2, "expected one Observe per mutation") + + // groupMutations iterates reqs in order, so call[i] matches + // elem[i]. Verify each: OpWrite, exact key/value lengths, and + // the RouteID the engine resolved for that key. + for i, elem := range ops.Elems { + route, ok := engine.GetRoute(elem.Key) + require.True(t, ok) + require.Equal(t, sampleCall{ + routeID: route.RouteID, + op: keyviz.OpWrite, + keyLen: len(elem.Key), + valueLen: len(elem.Value), + }, calls[i], "Observe call %d for key %q", i, elem.Key) + } +} + +// TestShardedCoordinatorWithoutSamplerStaysSafe pins the nil-safe +// contract: a coordinator without WithSampler (interface-nil +// c.sampler) and one wired with a typed-nil *MemSampler must both +// dispatch successfully without observing anything. +func TestShardedCoordinatorWithoutSamplerStaysSafe(t *testing.T) { + t.Parallel() + ctx := context.Background() + + for _, tc := range []struct { + name string + opt func(*ShardedCoordinator) *ShardedCoordinator + }{ + { + name: "no WithSampler call", + opt: func(c *ShardedCoordinator) *ShardedCoordinator { return c }, + }, + { + name: "typed-nil *MemSampler", + opt: func(c *ShardedCoordinator) *ShardedCoordinator { + return c.WithSampler((*keyviz.MemSampler)(nil)) + }, + }, + } { + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + engine := distribution.NewEngine() + engine.UpdateRoute([]byte("a"), nil, 1) + + s1 := store.NewMVCCStore() + r1, stop1 := newSingleRaft(t, "kv-sampler-nilsafe-"+tc.name, NewKvFSMWithHLC(s1, NewHLC())) + t.Cleanup(stop1) + groups := map[uint64]*ShardGroup{ + 1: {Engine: r1, Store: s1, Txn: NewLeaderProxyWithEngine(r1)}, + } + coord := tc.opt(NewShardedCoordinator(engine, groups, 1, NewHLC(), NewShardStore(engine, groups))) + + ops := &OperationGroup[OP]{ + Elems: []*Elem[OP]{{Op: Put, Key: []byte("b"), Value: []byte("v")}}, + } + _, err := coord.Dispatch(ctx, ops) + require.NoError(t, err) + }) + } +} From 28975463a8b8c49dacf872bd196a9c35901c29d4 Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Sun, 26 Apr 2026 17:50:20 +0900 Subject: [PATCH 2/3] keyviz+kv: tighten Sampler nil-safe contract + slim observe doc Round-1 review fixes for PR #645: - Claude bot Issue 1: WithSampler stores the interface value as supplied; the guard at the call site only checks interface-nil, not typed-nil. Document the nil-receiver requirement on the Sampler interface so a future implementor cannot land a panic-on-nil Observe and silently break dispatch. - Claude bot Issue 2: trim observeMutation comment to the two non-obvious points (reads bypass this path; the early return keeps the disabled hot path branch-only). Per CLAUDE.md, comments for the "what" duplicate the code. - Claude / CodeRabbit nit: TestShardedCoordinatorWithoutSamplerStaysSafe now asserts c.sampler stays the zero interface value for the "no WithSampler call" subcase, so a future refactor that silently initialises the field would fail the test. --- keyviz/sampler.go | 10 +++++++++- kv/sharded_coordinator.go | 9 ++------- kv/sharded_coordinator_sampler_test.go | 20 +++++++++++++++----- 3 files changed, 26 insertions(+), 13 deletions(-) diff --git a/keyviz/sampler.go b/keyviz/sampler.go index 0d816c08f..d74404d09 100644 --- a/keyviz/sampler.go +++ b/keyviz/sampler.go @@ -58,11 +58,19 @@ const ( // Sampler is the narrow interface the coordinator depends on. The // nil-safe contract is documented per-method so a coordinator wired // without a sampler compiles to a no-op call. +// +// Implementations MUST be nil-receiver-safe: a typed-nil +// implementation passed through this interface (e.g. +// `var s Sampler = (*MemSampler)(nil)`) must not panic when its +// methods are called. The coordinator stores the interface value as +// supplied and dispatches through it on the hot path; a guard at the +// call site only checks for an interface-nil, not a typed-nil. type Sampler interface { // Observe records a single request against a route. Op identifies // the counter family. keyLen and valueLen are summed into the // matching *Bytes counter; pass 0 for read-only ops where the - // payload size is irrelevant. + // payload size is irrelevant. Implementations must no-op (not + // panic) when invoked on a typed-nil receiver. Observe(routeID uint64, op Op, keyLen, valueLen int) } diff --git a/kv/sharded_coordinator.go b/kv/sharded_coordinator.go index 14e38bcb0..0127899fd 100644 --- a/kv/sharded_coordinator.go +++ b/kv/sharded_coordinator.go @@ -975,13 +975,8 @@ func (c *ShardedCoordinator) txnLogs(reqs *OperationGroup[OP]) ([]*pb.Request, e return buildTxnLogs(reqs.StartTS, commitTS, grouped, gids) } -// observeMutation records a single dispatched mutation with the -// keyviz sampler, if one is wired. All operations reaching this -// point are writes (Put, Del); reads are served outside Dispatch via -// the lease-read / linearizable-read paths. -// -// Nil-safe: a nil-interface c.sampler skips with a single branch, -// keeping the dispatch loop allocation-free when keyviz is disabled. +// observeMutation: reads never reach this path; the early return +// keeps the disabled-keyviz hot path allocation-free. func (c *ShardedCoordinator) observeMutation(routeID uint64, mut *pb.Mutation) { if c.sampler == nil { return diff --git a/kv/sharded_coordinator_sampler_test.go b/kv/sharded_coordinator_sampler_test.go index 99967ed38..6eb793582 100644 --- a/kv/sharded_coordinator_sampler_test.go +++ b/kv/sharded_coordinator_sampler_test.go @@ -99,24 +99,30 @@ func TestShardedCoordinatorObservesEveryDispatchedMutation(t *testing.T) { // TestShardedCoordinatorWithoutSamplerStaysSafe pins the nil-safe // contract: a coordinator without WithSampler (interface-nil // c.sampler) and one wired with a typed-nil *MemSampler must both -// dispatch successfully without observing anything. +// dispatch successfully without observing anything. The "no +// WithSampler" subcase additionally asserts c.sampler stays the +// zero interface value so a future refactor that silently +// initialises the field would fail this guard. func TestShardedCoordinatorWithoutSamplerStaysSafe(t *testing.T) { t.Parallel() ctx := context.Background() for _, tc := range []struct { - name string - opt func(*ShardedCoordinator) *ShardedCoordinator + name string + opt func(*ShardedCoordinator) *ShardedCoordinator + wantNilField bool }{ { - name: "no WithSampler call", - opt: func(c *ShardedCoordinator) *ShardedCoordinator { return c }, + name: "no WithSampler call", + opt: func(c *ShardedCoordinator) *ShardedCoordinator { return c }, + wantNilField: true, }, { name: "typed-nil *MemSampler", opt: func(c *ShardedCoordinator) *ShardedCoordinator { return c.WithSampler((*keyviz.MemSampler)(nil)) }, + wantNilField: false, }, } { t.Run(tc.name, func(t *testing.T) { @@ -132,6 +138,10 @@ func TestShardedCoordinatorWithoutSamplerStaysSafe(t *testing.T) { } coord := tc.opt(NewShardedCoordinator(engine, groups, 1, NewHLC(), NewShardStore(engine, groups))) + if tc.wantNilField { + require.Nil(t, coord.sampler, "expected sampler field to be unset when WithSampler is never called") + } + ops := &OperationGroup[OP]{ Elems: []*Elem[OP]{{Op: Put, Key: []byte("b"), Value: []byte("v")}}, } From 0c893214aaabed911c5d5b316afd4733599ecc89 Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Sun, 26 Apr 2026 18:00:17 +0900 Subject: [PATCH 3/3] keyviz wiring: pin pre-commit observe semantics + match prod routeKey Round-2 review fixes for PR #645: CodeRabbit nit on observeMutation: spell out the pre-commit ordering intent so a future contributor doesn't "fix" it by moving the call after router.Commit. The heatmap is meant to reflect offered load (useful for planning capacity); counting only committed writes would hide hotspots that show up as repeated proposal failures. CodeRabbit nit on TestShardedCoordinatorObservesEveryDispatchedMutation: production routes via engine.GetRoute(routeKey(mut.Key)) but the test was calling engine.GetRoute(elem.Key) directly. Both inputs ("b", "x") happen to be identity through normalizeRouteKey today, but a future change to internal-prefix handling could silently diverge the test from production. Mirror the production transform. --- kv/sharded_coordinator.go | 5 ++++- kv/sharded_coordinator_sampler_test.go | 7 ++++--- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/kv/sharded_coordinator.go b/kv/sharded_coordinator.go index 0127899fd..6a728e264 100644 --- a/kv/sharded_coordinator.go +++ b/kv/sharded_coordinator.go @@ -976,7 +976,10 @@ func (c *ShardedCoordinator) txnLogs(reqs *OperationGroup[OP]) ([]*pb.Request, e } // observeMutation: reads never reach this path; the early return -// keeps the disabled-keyviz hot path allocation-free. +// keeps the disabled-keyviz hot path allocation-free. Counted +// pre-commit, so a mutation that subsequently fails its Raft +// proposal is still recorded — the heatmap reflects offered load, +// not just committed writes (intentional for traffic visualisation). func (c *ShardedCoordinator) observeMutation(routeID uint64, mut *pb.Mutation) { if c.sampler == nil { return diff --git a/kv/sharded_coordinator_sampler_test.go b/kv/sharded_coordinator_sampler_test.go index 6eb793582..97e9e1068 100644 --- a/kv/sharded_coordinator_sampler_test.go +++ b/kv/sharded_coordinator_sampler_test.go @@ -82,10 +82,11 @@ func TestShardedCoordinatorObservesEveryDispatchedMutation(t *testing.T) { require.Len(t, calls, 2, "expected one Observe per mutation") // groupMutations iterates reqs in order, so call[i] matches - // elem[i]. Verify each: OpWrite, exact key/value lengths, and - // the RouteID the engine resolved for that key. + // elem[i]. Resolve via routeKey(elem.Key) so the test mirrors + // production's routing transform — important for keys that + // normalize through internal-prefix handling. for i, elem := range ops.Elems { - route, ok := engine.GetRoute(elem.Key) + route, ok := engine.GetRoute(routeKey(elem.Key)) require.True(t, ok) require.Equal(t, sampleCall{ routeID: route.RouteID,