Skip to content

Commit 00e7086

Browse files
Implement middleware to track uniques per app (#622)
1 parent a82f1ab commit 00e7086

8 files changed

Lines changed: 422 additions & 25 deletions

api/dbv1/models.go

Lines changed: 13 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

api/metrics_middleware.go

Lines changed: 174 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,15 @@ package api
33
import (
44
"context"
55
"runtime"
6+
"sync"
67
"time"
78

89
"api.audius.co/hll"
910
"api.audius.co/utils"
11+
"github.com/axiomhq/hyperloglog"
1012
"github.com/gofiber/fiber/v2"
1113
fiberutils "github.com/gofiber/fiber/v2/utils"
14+
"github.com/jackc/pgx/v5"
1215
"github.com/jackc/pgx/v5/pgxpool"
1316
"github.com/maypok86/otter"
1417
"go.uber.org/zap"
@@ -22,9 +25,11 @@ type MetricsCollector struct {
2225
flushTimer *time.Ticker
2326
stopCh chan struct{}
2427

25-
appMetrics otter.Cache[string, *AppMetricsData]
26-
routeMetrics otter.Cache[string, *RouteMetricsData]
27-
countMetrics *hll.HLL
28+
appMetrics otter.Cache[string, *AppMetricsData]
29+
routeMetrics otter.Cache[string, *RouteMetricsData]
30+
countMetrics *hll.HLL
31+
appUniqueMetrics map[string]*hll.HLL
32+
appUniqueMu sync.RWMutex
2833
}
2934

3035
// AppMetricsData holds request count data for a specific app identifier
@@ -75,13 +80,14 @@ func NewMetricsCollector(logger *zap.Logger, writePool *pgxpool.Pool) *MetricsCo
7580
}
7681

7782
collector := &MetricsCollector{
78-
logger: logger.With(zap.String("component", "MetricsCollector")),
79-
writePool: writePool,
80-
appMetrics: appMetricsCache,
81-
routeMetrics: routeMetricsCache,
82-
countMetrics: countMetricsAggregator,
83-
flushTimer: time.NewTicker(flushTimer),
84-
stopCh: make(chan struct{}),
83+
logger: logger.With(zap.String("component", "MetricsCollector")),
84+
writePool: writePool,
85+
appMetrics: appMetricsCache,
86+
routeMetrics: routeMetricsCache,
87+
countMetrics: countMetricsAggregator,
88+
appUniqueMetrics: make(map[string]*hll.HLL),
89+
flushTimer: time.NewTicker(flushTimer),
90+
stopCh: make(chan struct{}),
8591
}
8692

8793
// Start the flush routine
@@ -97,11 +103,14 @@ func (rmc *MetricsCollector) Middleware() fiber.Handler {
97103

98104
apiKey := c.Query("api_key")
99105
appName := c.Query("app_name")
106+
ipAddress := utils.GetIP(c)
107+
100108
// Only record if we have some identifier
101109
if apiKey != "" || appName != "" {
102110
rmc.recordAppMetric(
103111
fiberutils.CopyString(apiKey),
104112
fiberutils.CopyString(appName),
113+
ipAddress,
105114
)
106115
}
107116

@@ -115,8 +124,7 @@ func (rmc *MetricsCollector) Middleware() fiber.Handler {
115124
)
116125
}
117126

118-
// Extract IP address for unique tracking
119-
ipAddress := utils.GetIP(c)
127+
// Record the IP address for global unique tracking (extracted above)
120128
if ipAddress != "" {
121129
rmc.recordCountMetric(ipAddress)
122130
}
@@ -126,7 +134,7 @@ func (rmc *MetricsCollector) Middleware() fiber.Handler {
126134
}
127135

128136
// Increments the request count for a given app identifier and, when an IP address is provided, records it in that app's HLL sketch
129-
func (rmc *MetricsCollector) recordAppMetric(apiKey, appName string) {
137+
func (rmc *MetricsCollector) recordAppMetric(apiKey, appName, ipAddress string) {
130138
// Prioritize api_key over app_name as identifier
131139
identifier := apiKey
132140
if identifier == "" {
@@ -147,6 +155,26 @@ func (rmc *MetricsCollector) recordAppMetric(apiKey, appName string) {
147155
data.RequestCount++
148156
data.LastSeen = lastSeen
149157
rmc.appMetrics.Set(identifier, data)
158+
159+
// Record IP address to app-specific HLL sketch for unique user tracking
160+
if ipAddress != "" {
161+
rmc.appUniqueMu.Lock()
162+
appHLL, exists := rmc.appUniqueMetrics[identifier]
163+
if !exists {
164+
// Create new HLL instance for this app
165+
var err error
166+
appHLL, err = hll.NewHLL(rmc.logger, rmc.writePool, "api_metrics_apps_unique", 12)
167+
if err != nil {
168+
rmc.logger.Error("Failed to create app unique HLL", zap.Error(err), zap.String("identifier", identifier))
169+
rmc.appUniqueMu.Unlock()
170+
return
171+
}
172+
rmc.appUniqueMetrics[identifier] = appHLL
173+
}
174+
rmc.appUniqueMu.Unlock()
175+
176+
appHLL.Record(ipAddress)
177+
}
150178
}
151179

152180
// Increments the request count for a given route pattern
@@ -210,6 +238,23 @@ func (rmc *MetricsCollector) flushMetrics() {
210238
// Get HLL sketch copy
211239
currentHLL, currentTotalRequests := rmc.countMetrics.GetSketchCopy()
212240

241+
type AppUniqueData struct {
242+
Identifier string
243+
Sketch *hyperloglog.Sketch
244+
TotalCount int64
245+
}
246+
appUniqueData := make(map[string]*AppUniqueData)
247+
rmc.appUniqueMu.Lock()
248+
for identifier, appHLL := range rmc.appUniqueMetrics {
249+
sketchCopy, totalCount := appHLL.GetSketchCopy()
250+
appUniqueData[identifier] = &AppUniqueData{
251+
Identifier: identifier,
252+
Sketch: sketchCopy,
253+
TotalCount: totalCount,
254+
}
255+
}
256+
rmc.appUniqueMu.Unlock()
257+
213258
// Begin transaction
214259
tx, err := rmc.writePool.Begin(ctx)
215260
if err != nil {
@@ -295,6 +340,122 @@ func (rmc *MetricsCollector) flushMetrics() {
295340
}
296341
}
297342

343+
// Flush app unique metrics
344+
if len(appUniqueData) > 0 {
345+
appUniqueUpserted := 0
346+
for _, data := range appUniqueData {
347+
if data.Sketch == nil {
348+
continue
349+
}
350+
351+
// Clone the sketch to avoid modifying the original
352+
newSketch := data.Sketch.Clone()
353+
if newSketch == nil {
354+
continue
355+
}
356+
357+
var existingSketchData []byte
358+
var existingCount int64
359+
query := `
360+
SELECT hll_sketch, total_count
361+
FROM api_metrics_apps_unique
362+
WHERE date = $1 AND app_name = $2
363+
FOR UPDATE`
364+
err = tx.QueryRow(ctx, query, date, data.Identifier).Scan(&existingSketchData, &existingCount)
365+
366+
if err != nil && err != pgx.ErrNoRows {
367+
rmc.logger.Error("Failed to query existing app unique metrics",
368+
zap.Error(err),
369+
zap.String("identifier", data.Identifier))
370+
continue
371+
}
372+
373+
var finalSketchData []byte
374+
var finalTotalCount int64
375+
var finalUniqueCount int64
376+
377+
if err == pgx.ErrNoRows {
378+
// No existing row - use new sketch as-is
379+
var marshalErr error
380+
finalSketchData, marshalErr = newSketch.MarshalBinary()
381+
if marshalErr != nil {
382+
rmc.logger.Error("Failed to marshal new sketch",
383+
zap.Error(marshalErr),
384+
zap.String("identifier", data.Identifier))
385+
continue
386+
}
387+
finalTotalCount = data.TotalCount
388+
finalUniqueCount = int64(newSketch.Estimate())
389+
} else {
390+
// Row exists - merge sketches
391+
if existingSketchData != nil {
392+
// Merge with existing sketch
393+
existingSketch, unmarshalErr := hll.UnmarshalSketch(existingSketchData, 12)
394+
if unmarshalErr != nil {
395+
rmc.logger.Error("Failed to unmarshal existing sketch",
396+
zap.Error(unmarshalErr),
397+
zap.String("identifier", data.Identifier))
398+
continue
399+
}
400+
401+
if mergeErr := existingSketch.Merge(newSketch); mergeErr != nil {
402+
rmc.logger.Error("Failed to merge sketches",
403+
zap.Error(mergeErr),
404+
zap.String("identifier", data.Identifier))
405+
continue
406+
}
407+
408+
var marshalErr error
409+
finalSketchData, marshalErr = existingSketch.MarshalBinary()
410+
if marshalErr != nil {
411+
rmc.logger.Error("Failed to marshal merged sketch",
412+
zap.Error(marshalErr),
413+
zap.String("identifier", data.Identifier))
414+
continue
415+
}
416+
finalUniqueCount = int64(existingSketch.Estimate())
417+
} else {
418+
// Row exists but sketch is NULL - use new sketch
419+
var marshalErr error
420+
finalSketchData, marshalErr = newSketch.MarshalBinary()
421+
if marshalErr != nil {
422+
rmc.logger.Error("Failed to marshal new sketch",
423+
zap.Error(marshalErr),
424+
zap.String("identifier", data.Identifier))
425+
continue
426+
}
427+
finalUniqueCount = int64(newSketch.Estimate())
428+
}
429+
finalTotalCount = existingCount + data.TotalCount
430+
}
431+
432+
// Use INSERT ... ON CONFLICT for atomic upsert (same pattern as api_metrics_apps)
433+
upsertQuery := `
434+
INSERT INTO api_metrics_apps_unique (date, app_name, hll_sketch, total_count, unique_count, created_at, updated_at)
435+
VALUES ($1, $2, $3, $4, $5, NOW(), NOW())
436+
ON CONFLICT (date, app_name)
437+
DO UPDATE SET
438+
hll_sketch = EXCLUDED.hll_sketch,
439+
total_count = EXCLUDED.total_count,
440+
unique_count = EXCLUDED.unique_count,
441+
updated_at = NOW()`
442+
443+
_, err = tx.Exec(ctx, upsertQuery, date, data.Identifier, finalSketchData, finalTotalCount, finalUniqueCount)
444+
if err != nil {
445+
rmc.logger.Error("Failed to upsert app unique metrics",
446+
zap.Error(err),
447+
zap.String("identifier", data.Identifier))
448+
continue
449+
}
450+
451+
appUniqueUpserted++
452+
}
453+
454+
rmc.logger.Debug("Flushed app unique metrics",
455+
zap.Int("upserted", appUniqueUpserted),
456+
zap.Int("total", len(appUniqueData)))
457+
}
458+
298459
// Commit transaction
299460
if err := tx.Commit(ctx); err != nil {
300461
rmc.logger.Error("Failed to commit metrics transaction", zap.Error(err))

api/server.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -524,6 +524,7 @@ func NewApiServer(config config.Config) *ApiServer {
524524
g.Get("/metrics/total_artists", app.v1MetricsTotalArtists)
525525
g.Get("/metrics/total_wallets", app.v1MetricsTotalWallets)
526526
g.Get("/metrics/aggregates/apps/:time_range", app.v1MetricsApps)
527+
g.Get("/metrics/aggregates/apps/:time_range/unique", app.v1MetricsAppsUnique)
527528
g.Get("/metrics/aggregates/routes/:time_range", app.v1MetricsRoutes)
528529
g.Get("/metrics/aggregates/routes/trailing/:time_range", app.v1MetricsRoutesTrailing)
529530

0 commit comments

Comments
 (0)