diff --git a/internal/api/handlers/indexer.go b/internal/api/handlers/indexer.go index 1961af8..4acdc35 100644 --- a/internal/api/handlers/indexer.go +++ b/internal/api/handlers/indexer.go @@ -98,7 +98,7 @@ func StopIndexer(w http.ResponseWriter, r *http.Request) { }) } -// GetIndexerStatus returns the current indexer status +// GetIndexerStatus returns the current indexer status (lightweight, no heavy queries) func GetIndexerStatus(w http.ResponseWriter, r *http.Request) { running := false if indexerController != nil { @@ -107,14 +107,20 @@ func GetIndexerStatus(w http.ResponseWriter, r *http.Request) { enabled, _ := database.GetSetting("indexer_enabled") - // Get indexing stats - stats, _ := database.GetStats() + // Use lightweight count queries instead of full GetStats() + db := database.Get() + + var totalTorrents int64 + db.QueryRow("SELECT COUNT(*) FROM torrents").Scan(&totalTorrents) + + var connectedRelays int64 + db.QueryRow("SELECT COUNT(*) FROM relays WHERE status = 'connected'").Scan(&connectedRelays) respondJSON(w, http.StatusOK, map[string]interface{}{ "running": running, "enabled": enabled == "true", - "total_torrents": stats["total_torrents"], - "connected_relays": stats["connected_relays"], + "total_torrents": totalTorrents, + "connected_relays": connectedRelays, }) } diff --git a/internal/api/handlers/search.go b/internal/api/handlers/search.go index 321736b..fe8b22a 100644 --- a/internal/api/handlers/search.go +++ b/internal/api/handlers/search.go @@ -102,16 +102,22 @@ func Search(w http.ResponseWriter, r *http.Request) { var rows *sql.Rows + // Build trust EXISTS subquery (avoids JOIN + DISTINCT overhead) + trustExistsClause := `EXISTS ( + SELECT 1 FROM torrent_uploads tu + WHERE tu.torrent_id = t.id + AND tu.uploader_npub IN ` + trustPlaceholders + ` + )` + if query != "" { // Full-text search with trust filtering sqlQuery := ` - SELECT DISTINCT t.id, t.info_hash, t.name, t.size, t.category, t.seeders, t.leechers, + SELECT t.id, t.info_hash, t.name, t.size, t.category, t.seeders, t.leechers, t.magnet_uri, t.title, t.year, t.poster_url, t.overview, t.trust_score, t.first_seen_at FROM torrents t JOIN torrents_fts fts ON t.id = fts.rowid - JOIN torrent_uploads tu ON t.id = tu.torrent_id WHERE torrents_fts MATCH ? - AND tu.uploader_npub IN ` + trustPlaceholders + AND ` + trustExistsClause args := []interface{}{query} args = append(args, trustArgs...) @@ -132,32 +138,43 @@ func Search(w http.ResponseWriter, r *http.Request) { args = append(args, limit, offset) rows, err = db.Query(sqlQuery, args...) - } else { - // List all (no search query) with trust filtering + } else if category != "" { + // Category filter: use composite index (category, trust_score, first_seen_at) + // to filter by category AND walk in sort order simultaneously. + // Empty categories return instantly (no matching index entries). + // Populated categories find 50 rows by walking the index in order. sqlQuery := ` - SELECT DISTINCT t.id, t.info_hash, t.name, t.size, t.category, t.seeders, t.leechers, + SELECT t.id, t.info_hash, t.name, t.size, t.category, t.seeders, t.leechers, t.magnet_uri, t.title, t.year, t.poster_url, t.overview, t.trust_score, t.first_seen_at - FROM torrents t - JOIN torrent_uploads tu ON t.id = tu.torrent_id - WHERE tu.uploader_npub IN ` + trustPlaceholders + FROM torrents t INDEXED BY idx_torrents_category_trust_seen + WHERE ` + trustExistsClause args := append([]interface{}{}, trustArgs...) - if category != "" { - if isBaseCategory { - // Match all subcategories within the base category range - sqlQuery += " AND t.category >= ? AND t.category < ?" - args = append(args, categoryNum, categoryNum+1000) - } else { - // Exact match for subcategory - sqlQuery += " AND t.category = ?" - args = append(args, categoryNum) - } + if isBaseCategory { + sqlQuery += " AND t.category >= ? AND t.category < ?" + args = append(args, categoryNum, categoryNum+1000) + } else { + sqlQuery += " AND t.category = ?" + args = append(args, categoryNum) } sqlQuery += " ORDER BY t.trust_score DESC, t.first_seen_at DESC LIMIT ? OFFSET ?" args = append(args, limit, offset) + rows, err = db.Query(sqlQuery, args...) + } else { + // No filters: walk the sort index directly, check trust for each row, stop at LIMIT + sqlQuery := ` + SELECT t.id, t.info_hash, t.name, t.size, t.category, t.seeders, t.leechers, + t.magnet_uri, t.title, t.year, t.poster_url, t.overview, t.trust_score, t.first_seen_at + FROM torrents t INDEXED BY idx_torrents_trust_first_seen + WHERE ` + trustExistsClause + ` + ORDER BY t.trust_score DESC, t.first_seen_at DESC LIMIT ? OFFSET ?` + + args := append([]interface{}{}, trustArgs...) + args = append(args, limit, offset) + rows, err = db.Query(sqlQuery, args...) } @@ -203,11 +220,10 @@ func Search(w http.ResponseWriter, r *http.Request) { var total int64 if query != "" { countQuery := ` - SELECT COUNT(DISTINCT t.id) FROM torrents t + SELECT COUNT(*) FROM torrents t JOIN torrents_fts fts ON t.id = fts.rowid - JOIN torrent_uploads tu ON t.id = tu.torrent_id WHERE torrents_fts MATCH ? - AND tu.uploader_npub IN ` + trustPlaceholders + AND ` + trustExistsClause countArgs := []interface{}{query} countArgs = append(countArgs, trustArgs...) @@ -221,23 +237,25 @@ func Search(w http.ResponseWriter, r *http.Request) { } } db.QueryRow(countQuery, countArgs...).Scan(&total) - } else { - countQuery := ` - SELECT COUNT(DISTINCT t.id) FROM torrents t - JOIN torrent_uploads tu ON t.id = tu.torrent_id - WHERE tu.uploader_npub IN ` + trustPlaceholders - - countArgs := append([]interface{}{}, trustArgs...) - if category != "" { - if isBaseCategory { - countQuery += " AND t.category >= ? AND t.category < ?" - countArgs = append(countArgs, categoryNum, categoryNum+1000) - } else { - countQuery += " AND t.category = ?" - countArgs = append(countArgs, categoryNum) - } + } else if category != "" { + // Category count: use the category index directly. An exact trust-filtered + // count requires a JOIN across both tables (O(n) full scan), so we use an + // approximate count from the category index alone (O(log n) index lookup). + // This may slightly overcount if untrusted torrents exist in the category, + // but the indexer already filters by trusted authors at ingest time, so + // the difference is negligible in practice. + if isBaseCategory { + db.QueryRow(`SELECT COUNT(*) FROM torrents WHERE category >= ? AND category < ?`, + categoryNum, categoryNum+1000).Scan(&total) + } else { + db.QueryRow(`SELECT COUNT(*) FROM torrents WHERE category = ?`, + categoryNum).Scan(&total) } - db.QueryRow(countQuery, countArgs...).Scan(&total) + } else { + // No filters: count distinct torrents from trusted uploaders + countQuery := `SELECT COUNT(DISTINCT torrent_id) FROM torrent_uploads + WHERE uploader_npub IN ` + trustPlaceholders + db.QueryRow(countQuery, trustArgs...).Scan(&total) } respondJSON(w, http.StatusOK, map[string]interface{}{ diff --git a/internal/api/handlers/stats.go b/internal/api/handlers/stats.go index b711cd0..6ce56c1 100644 --- a/internal/api/handlers/stats.go +++ b/internal/api/handlers/stats.go @@ -2,16 +2,36 @@ package handlers import ( "database/sql" + "encoding/json" "net/http" "strconv" + "sync" + "time" "github.com/gmonarque/lighthouse/internal/database" "github.com/gmonarque/lighthouse/internal/nostr" "github.com/gmonarque/lighthouse/internal/trust" ) -// GetStats returns dashboard statistics (filtered by trust) +// statsCache caches the expensive stats response (60s TTL) +var statsCache struct { + mu sync.RWMutex + data []byte + expiry time.Time +} + +// GetStats returns dashboard statistics (filtered by trust), cached for 60s func GetStats(w http.ResponseWriter, r *http.Request) { + // Serve from cache if fresh + statsCache.mu.RLock() + if time.Now().Before(statsCache.expiry) && statsCache.data != nil { + data := statsCache.data + statsCache.mu.RUnlock() + w.Header().Set("Content-Type", "application/json") + w.Write(data) + return + } + statsCache.mu.RUnlock() db := database.Get() // Get trusted uploaders for filtering @@ -41,7 +61,7 @@ func GetStats(w http.ResponseWriter, r *http.Request) { stats["categories"] = make(map[int]int64) stats["recent_torrents"] = []interface{}{} } else { - // Build trust filter subquery + // Build trust EXISTS subquery (avoids JOIN + DISTINCT overhead) trustPlaceholders := "(" trustArgs := make([]interface{}, len(trustedHexPubkeys)) for i, u := range trustedHexPubkeys { @@ -53,32 +73,28 @@ func GetStats(w http.ResponseWriter, r *http.Request) { } trustPlaceholders += ")" - // Total trusted torrents - var totalTorrents int64 - countQuery := `SELECT COUNT(DISTINCT t.id) FROM torrents t - JOIN torrent_uploads tu ON t.id = tu.torrent_id - WHERE tu.uploader_npub IN ` + trustPlaceholders - if err := db.QueryRow(countQuery, trustArgs...).Scan(&totalTorrents); err != nil { - respondError(w, http.StatusInternalServerError, "Failed to get stats") - return - } - stats["total_torrents"] = totalTorrents + // Drive queries from torrent_uploads (much smaller than torrents). + // torrent_uploads is filtered by uploader via index, then JOINed to torrents by PK. + // Use COUNT(DISTINCT) to avoid overcounting torrents with multiple uploads. - // Total size (bytes) for trusted torrents + // Count distinct torrents + total size in a single query + var totalTorrents int64 var totalSize sql.NullInt64 - sizeQuery := `SELECT SUM(t.size) FROM torrents t - JOIN torrent_uploads tu ON t.id = tu.torrent_id + summaryQuery := `SELECT COUNT(DISTINCT tu.torrent_id), COALESCE(SUM(t.size), 0) + FROM torrents t INNER JOIN torrent_uploads tu ON t.id = tu.torrent_id WHERE tu.uploader_npub IN ` + trustPlaceholders - if err := db.QueryRow(sizeQuery, trustArgs...).Scan(&totalSize); err != nil { + if err := db.QueryRow(summaryQuery, trustArgs...).Scan(&totalTorrents, &totalSize); err != nil { respondError(w, http.StatusInternalServerError, "Failed to get stats") return } + stats["total_torrents"] = totalTorrents stats["total_size"] = totalSize.Int64 - // Torrents by category (trusted only) - catQuery := `SELECT t.category, COUNT(DISTINCT t.id) as count FROM torrents t - JOIN torrent_uploads tu ON t.id = tu.torrent_id - WHERE tu.uploader_npub IN ` + trustPlaceholders + ` GROUP BY t.category` + // Categories: same approach, drive from torrent_uploads + catQuery := `SELECT t.category, COUNT(DISTINCT tu.torrent_id) as count + FROM torrents t INNER JOIN torrent_uploads tu ON t.id = tu.torrent_id + WHERE tu.uploader_npub IN ` + trustPlaceholders + ` + GROUP BY t.category` rows, err := db.Query(catQuery, trustArgs...) if err != nil { respondError(w, http.StatusInternalServerError, "Failed to get stats") @@ -97,12 +113,16 @@ func GetStats(w http.ResponseWriter, r *http.Request) { } stats["categories"] = categories - // Recent trusted torrents - recentQuery := `SELECT DISTINCT t.id, t.info_hash, t.name, t.size, t.category, t.seeders, t.leechers, + // Recent 10: EXISTS with first_seen_at index is fast for small LIMIT + trustExistsClause := `EXISTS ( + SELECT 1 FROM torrent_uploads tu + WHERE tu.torrent_id = t.id + AND tu.uploader_npub IN ` + trustPlaceholders + ` + )` + recentQuery := `SELECT t.id, t.info_hash, t.name, t.size, t.category, t.seeders, t.leechers, t.title, t.year, t.poster_url, t.trust_score, t.first_seen_at FROM torrents t - JOIN torrent_uploads tu ON t.id = tu.torrent_id - WHERE tu.uploader_npub IN ` + trustPlaceholders + ` + WHERE ` + trustExistsClause + ` ORDER BY t.first_seen_at DESC LIMIT 10` recentRows, err := db.Query(recentQuery, trustArgs...) if err != nil { @@ -163,29 +183,16 @@ func GetStats(w http.ResponseWriter, r *http.Request) { } stats["blacklist_count"] = blacklistCount - // Unique uploaders should also be filtered by trust - var uniqueUploaders int64 - if len(trustedHexPubkeys) > 0 { - // Build placeholder string for IN clause - uploaderPlaceholders := "(" - uploaderArgs := make([]interface{}, len(trustedHexPubkeys)) - for i, u := range trustedHexPubkeys { - if i > 0 { - uploaderPlaceholders += "," - } - uploaderPlaceholders += "?" - uploaderArgs[i] = u - } - uploaderPlaceholders += ")" + // Unique uploaders (just count trusted pubkeys directly — no DB query needed) + stats["unique_uploaders"] = int64(len(trustedHexPubkeys)) - uploaderQuery := `SELECT COUNT(DISTINCT uploader_npub) FROM torrent_uploads WHERE uploader_npub IN ` + uploaderPlaceholders - if err := db.QueryRow(uploaderQuery, uploaderArgs...).Scan(&uniqueUploaders); err != nil { - uniqueUploaders = 0 - } - } else { - uniqueUploaders = 0 + // Cache the result for 60 seconds + if jsonData, err := json.Marshal(stats); err == nil { + statsCache.mu.Lock() + statsCache.data = jsonData + statsCache.expiry = time.Now().Add(60 * time.Second) + statsCache.mu.Unlock() } - stats["unique_uploaders"] = uniqueUploaders respondJSON(w, http.StatusOK, stats) } @@ -229,7 +236,7 @@ func GetStatsChart(w http.ResponseWriter, r *http.Request) { return } - // Build trust filter + // Build trust EXISTS subquery trustPlaceholders := "(" trustArgs := make([]interface{}, len(trustedHexPubkeys)+1) trustArgs[0] = days @@ -242,9 +249,9 @@ func GetStatsChart(w http.ResponseWriter, r *http.Request) { } trustPlaceholders += ")" - query := `SELECT DATE(t.first_seen_at) as date, COUNT(DISTINCT t.id) as count - FROM torrents t - JOIN torrent_uploads tu ON t.id = tu.torrent_id + // Drive from torrent_uploads, join torrents only for first_seen_at + query := `SELECT DATE(t.first_seen_at) as date, COUNT(*) as count + FROM torrents t INNER JOIN torrent_uploads tu ON t.id = tu.torrent_id WHERE t.first_seen_at >= DATE('now', '-' || ? || ' days') AND tu.uploader_npub IN ` + trustPlaceholders + ` GROUP BY DATE(t.first_seen_at) diff --git a/internal/api/handlers/torznab.go b/internal/api/handlers/torznab.go index 62b2cbb..99e48a0 100644 --- a/internal/api/handlers/torznab.go +++ b/internal/api/handlers/torznab.go @@ -5,17 +5,40 @@ import ( "net/http" "strconv" + "github.com/gmonarque/lighthouse/internal/api/apikeys" + "github.com/gmonarque/lighthouse/internal/api/middleware" "github.com/gmonarque/lighthouse/internal/config" "github.com/gmonarque/lighthouse/internal/torznab" ) // Torznab handles all Torznab API requests func Torznab(w http.ResponseWriter, r *http.Request) { - // Validate API key + // Validate API key: accept legacy config key OR multi-user key with torznab permission cfg := config.Get() apiKey := r.URL.Query().Get("apikey") + if apiKey == "" { + apiKey = r.Header.Get("X-API-Key") + } + + authenticated := false + + // Check legacy single API key + if cfg.Server.APIKey != "" && apiKey == cfg.Server.APIKey { + authenticated = true + } + + // Check multi-user API keys with torznab permission + if !authenticated && apiKey != "" { + storage := middleware.GetAPIKeyStorage() + if key, err := storage.ValidateKey(apiKey); err == nil && key != nil { + if key.HasAnyPermission(apikeys.PermissionTorznab, apikeys.PermissionAdmin) { + authenticated = true + } + } + } - if cfg.Server.APIKey != "" && apiKey != cfg.Server.APIKey { + // If auth is required and not authenticated, reject + if !authenticated && (cfg.Server.APIKey != "" || apiKey != "") { respondTorznabError(w, torznab.ErrorIncorrectUserCreds, "Invalid API key") return } diff --git a/internal/api/middleware/ratelimit.go b/internal/api/middleware/ratelimit.go index b5c5750..c2082cd 100644 --- a/internal/api/middleware/ratelimit.go +++ b/internal/api/middleware/ratelimit.go @@ -89,8 +89,8 @@ func (rl *RateLimiter) cleanupLoop() { // Default rate limiters var ( // IPRateLimiter limits requests per IP address - // 100 requests per minute per IP - IPRateLimiter = NewRateLimiter(100, time.Minute) + // 300 requests per minute per IP (dashboard UI makes ~10 API calls per page) + IPRateLimiter = NewRateLimiter(300, time.Minute) // APIKeyRateLimiter limits requests per API key // 1000 requests per minute per API key diff --git a/internal/config/config.go b/internal/config/config.go index 51480e5..aa1fe95 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -207,7 +207,7 @@ func setDefaults() { } func createDefaultConfig() error { - configPath := "./config.yaml" + configPath := "./config/config.yaml" // Ensure parent directory exists dir := filepath.Dir(configPath) diff --git a/internal/database/schema.sql b/internal/database/schema.sql index 24691d1..358169d 100644 --- a/internal/database/schema.sql +++ b/internal/database/schema.sql @@ -341,6 +341,15 @@ CREATE INDEX IF NOT EXISTS idx_relay_events_created ON relay_events(created_at D CREATE INDEX IF NOT EXISTS idx_activity_log_type ON activity_log(event_type); CREATE INDEX IF NOT EXISTS idx_activity_log_created ON activity_log(created_at DESC); +-- Composite indexes for performance optimization +CREATE INDEX IF NOT EXISTS idx_torrents_trust_first_seen ON torrents(trust_score DESC, first_seen_at DESC); +CREATE INDEX IF NOT EXISTS idx_torrent_uploads_uploader_torrent ON torrent_uploads(uploader_npub, torrent_id); +CREATE INDEX IF NOT EXISTS idx_torrents_imdb ON torrents(imdb_id); +CREATE INDEX IF NOT EXISTS idx_torrents_tmdb ON torrents(tmdb_id); +CREATE INDEX IF NOT EXISTS idx_activity_log_type_created ON activity_log(event_type, created_at DESC); +CREATE INDEX IF NOT EXISTS idx_comments_infohash_created ON torrent_comments(infohash, created_at DESC); +CREATE INDEX IF NOT EXISTS idx_torrents_category_trust_seen ON torrents(category, trust_score DESC, first_seen_at DESC); + -- ===================================================== -- DEFAULT SETTINGS -- ===================================================== diff --git a/internal/database/sqlite.go b/internal/database/sqlite.go index bcfdbfb..905dce1 100644 --- a/internal/database/sqlite.go +++ b/internal/database/sqlite.go @@ -43,6 +43,11 @@ func Init(dbPath string) error { return fmt.Errorf("failed to ping database: %w", err) } + // Performance PRAGMAs for large databases + db.Exec("PRAGMA cache_size = -65536") // 64MB cache (default is 2MB) + db.Exec("PRAGMA mmap_size = 268435456") // 256MB memory-mapped I/O + db.Exec("PRAGMA temp_store = MEMORY") // Temp tables in memory + // Run schema if err := runSchema(); err != nil { return fmt.Errorf("failed to run schema: %w", err) @@ -308,6 +313,22 @@ func GetTorrentsPerDay(days int) ([]map[string]interface{}, error) { return stats, nil } +// GetLatestEventTimestamp returns a unix timestamp to resume historical fetch from. +// Uses MAX(first_seen_at) from torrents (indexed, fast) minus a 1-hour buffer to +// account for clock skew between local time and relay event timestamps, and to +// catch any late-arriving events. This means a small overlap is re-processed on +// restart, which is safe since the deduplicator handles duplicates. +func GetLatestEventTimestamp() (int64, error) { + var ts int64 + err := db.QueryRow(` + SELECT COALESCE(MAX(strftime('%s', first_seen_at)), 0) FROM torrents + `).Scan(&ts) + if ts > 3600 { + ts -= 3600 // 1-hour buffer for clock skew and late arrivals + } + return ts, err +} + // LogActivity logs an activity event func LogActivity(eventType string, details string) error { _, err := db.Exec(` diff --git a/internal/indexer/indexer.go b/internal/indexer/indexer.go index c13b6cf..2f0923b 100644 --- a/internal/indexer/indexer.go +++ b/internal/indexer/indexer.go @@ -24,6 +24,12 @@ type Indexer struct { ctx context.Context cancel context.CancelFunc stats IndexerStats + + // Cached trust data to avoid repeated DB queries per event + trustedSet map[string]bool // hex pubkeys that are trusted + blacklistSet map[string]bool // hex pubkeys that are blacklisted + cacheMu sync.RWMutex + cacheExpiry time.Time } // IndexerStats tracks indexer statistics @@ -92,7 +98,28 @@ func (idx *Indexer) Start(ctx context.Context) error { log.Info().Int("trusted_uploaders", len(trustedPubkeys)).Msg("Subscribing to trusted uploaders") - // Subscribe to torrent events from trusted uploaders only + // Fetch history via paginated queries, resuming from the latest event we already have + go func() { + sinceTimestamp, err := database.GetLatestEventTimestamp() + if err != nil { + log.Warn().Err(err).Msg("Failed to get latest event timestamp, fetching full history") + sinceTimestamp = 0 + } + + if sinceTimestamp > 0 { + log.Info().Int64("since_unix", sinceTimestamp).Msg("Resuming historical fetch from last known event") + } else { + log.Info().Msg("Starting full historical fetch (first run)") + } + + if err := idx.relayManager.FetchAllHistoricalTorrents(idx.ctx, trustedPubkeys, sinceTimestamp, func(event *gonostr.Event, relayURL string) { + idx.processEvent(event, relayURL) + }); err != nil { + log.Error().Err(err).Msg("Historical fetch failed") + } + }() + + // Subscribe to torrent events from trusted uploaders only (real-time + latest batch) err = idx.relayManager.SubscribeTrustedTorrents(idx.ctx, trustedPubkeys, func(event *gonostr.Event, relayURL string) { idx.processEvent(event, relayURL) }) @@ -222,51 +249,84 @@ func (idx *Indexer) processEvent(event *gonostr.Event, relayURL string) { } } -// isBlacklisted checks if a pubkey is blacklisted -func (idx *Indexer) isBlacklisted(pubkey string) bool { - db := database.Get() - var count int - err := db.QueryRow("SELECT COUNT(*) FROM trust_blacklist WHERE npub = ?", pubkey).Scan(&count) - if err != nil { - return false +// refreshTrustCache rebuilds the in-memory trust and blacklist sets from the DB. +// The cache is refreshed at most once per minute. +func (idx *Indexer) refreshTrustCache() { + idx.cacheMu.RLock() + if time.Now().Before(idx.cacheExpiry) { + idx.cacheMu.RUnlock() + return } - return count > 0 -} + idx.cacheMu.RUnlock() -// isTrusted checks if a pubkey is trusted based on whitelist and trust depth -func (idx *Indexer) isTrusted(pubkey string) bool { + idx.cacheMu.Lock() + defer idx.cacheMu.Unlock() + + // Double-check after acquiring write lock + if time.Now().Before(idx.cacheExpiry) { + return + } + + db := database.Get() + + // Build trusted set (store both hex and npub forms for fast lookup) wot := trust.NewWebOfTrust() trustedUploaders, err := wot.GetTrustedUploaders() if err != nil { - log.Error().Err(err).Msg("Failed to get trusted uploaders") - return false + log.Error().Err(err).Msg("Failed to refresh trusted uploaders cache") + return } - // If no trusted uploaders configured, reject all (strict mode) - if len(trustedUploaders) == 0 { - return false + newTrusted := make(map[string]bool, len(trustedUploaders)*2) + for _, npubOrHex := range trustedUploaders { + newTrusted[npubOrHex] = true + if hexPk, err := nostr.NpubToHex(npubOrHex); err == nil { + newTrusted[hexPk] = true + } } - // Convert pubkey to npub for comparison if needed - npub, err := nostr.HexToNpub(pubkey) - if err != nil { - npub = pubkey // Use as-is if conversion fails + // Build blacklist set + newBlacklist := make(map[string]bool) + rows, err := db.Query("SELECT npub FROM trust_blacklist") + if err == nil { + defer rows.Close() + for rows.Next() { + var npub string + if rows.Scan(&npub) == nil { + newBlacklist[npub] = true + if hexPk, err := nostr.NpubToHex(npub); err == nil { + newBlacklist[hexPk] = true + } + } + } } - // Check if pubkey (hex) or npub is in trusted list - for _, trusted := range trustedUploaders { - // Convert trusted npub to hex for comparison - trustedHex, err := nostr.NpubToHex(trusted) - if err != nil { - trustedHex = trusted - } + idx.trustedSet = newTrusted + idx.blacklistSet = newBlacklist + idx.cacheExpiry = time.Now().Add(60 * time.Second) - if pubkey == trustedHex || pubkey == trusted || npub == trusted { - return true - } + log.Debug().Int("trusted", len(trustedUploaders)).Int("blacklisted", len(newBlacklist)/2).Msg("Trust cache refreshed") +} + +// isBlacklisted checks if a pubkey is blacklisted (uses cache) +func (idx *Indexer) isBlacklisted(pubkey string) bool { + idx.refreshTrustCache() + idx.cacheMu.RLock() + defer idx.cacheMu.RUnlock() + return idx.blacklistSet[pubkey] +} + +// isTrusted checks if a pubkey is trusted based on whitelist and trust depth (uses cache) +func (idx *Indexer) isTrusted(pubkey string) bool { + idx.refreshTrustCache() + idx.cacheMu.RLock() + defer idx.cacheMu.RUnlock() + + if len(idx.trustedSet) == 0 { + return false } - return false + return idx.trustedSet[pubkey] } // matchesTagFilter checks if a torrent matches the configured tag filter diff --git a/internal/nostr/relay.go b/internal/nostr/relay.go index defe2bd..831cb95 100644 --- a/internal/nostr/relay.go +++ b/internal/nostr/relay.go @@ -236,6 +236,83 @@ func (rm *RelayManager) SubscribeTrustedTorrents(ctx context.Context, pubkeys [] return rm.SubscribeAll(ctx, filters, handler) } +// FetchAllHistoricalTorrents fetches torrent events from trusted authors by paginating +// through pages using the Until filter (newest-first, decreasing Until per page). +// If sinceTimestamp > 0, only fetches events newer than that unix timestamp, +// allowing resumption from where the last fetch left off. +func (rm *RelayManager) FetchAllHistoricalTorrents(ctx context.Context, pubkeys []string, sinceTimestamp int64, handler func(*nostr.Event, string)) error { + if len(pubkeys) == 0 { + return errors.New("no pubkeys provided") + } + + clients := rm.GetConnectedClients() + if len(clients) == 0 { + return errors.New("no connected relays") + } + + const pageSize = 500 + + for _, client := range clients { + url := client.URL() + log.Info().Str("relay", url).Int64("since", sinceTimestamp).Msg("Fetching historical torrents (paginated)") + + var until *nostr.Timestamp + var since *nostr.Timestamp + if sinceTimestamp > 0 { + s := nostr.Timestamp(sinceTimestamp) + since = &s + } + totalFetched := 0 + page := 0 + + for { + filter := nostr.Filter{ + Kinds: []int{KindTorrent}, + Authors: pubkeys, + Limit: pageSize, + } + if until != nil { + filter.Until = until + } + if since != nil { + filter.Since = since + } + + events, err := client.QueryEvents(ctx, []nostr.Filter{filter}) + if err != nil { + log.Error().Err(err).Str("relay", url).Int("page", page).Msg("Failed to query historical events") + break + } + + if len(events) == 0 { + break + } + + for _, event := range events { + handler(event, url) + } + + totalFetched += len(events) + page++ + log.Info().Str("relay", url).Int("page", page).Int("batch", len(events)).Int("total", totalFetched).Msg("Historical page fetched") + + // Advance Until to the oldest event's timestamp in this batch. + // Using the exact timestamp (not -1) avoids skipping events that + // share the same second. The deduplicator handles any overlap. + oldest := events[len(events)-1].CreatedAt + if until != nil && oldest == *until { + // Same timestamp as last page — we've exhausted this second + oldest-- + } + until = &oldest + } + + log.Info().Str("relay", url).Int("total", totalFetched).Msg("Historical fetch complete") + } + + return nil +} + // FetchContactList fetches contact list from any connected relay func (rm *RelayManager) FetchContactList(ctx context.Context, pubkey string) (*nostr.Event, error) { clients := rm.GetConnectedClients()