-
Notifications
You must be signed in to change notification settings - Fork 191
[metrics] count hits/misses for each cache entry kind separately #472
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -25,32 +25,32 @@ import ( | |
| "github.com/buchgr/bazel-remote/utils/tempfile" | ||
|
|
||
| "github.com/djherbis/atime" | ||
| "github.com/prometheus/client_golang/prometheus" | ||
| "github.com/prometheus/client_golang/prometheus/promauto" | ||
|
|
||
| pb "github.com/buchgr/bazel-remote/genproto/build/bazel/remote/execution/v2" | ||
| "google.golang.org/protobuf/proto" | ||
|
|
||
| "golang.org/x/sync/semaphore" | ||
| ) | ||
|
|
||
| var ( | ||
| cacheHits = promauto.NewCounter(prometheus.CounterOpts{ | ||
| Name: "bazel_remote_disk_cache_hits", | ||
| Help: "The total number of disk backend cache hits", | ||
| }) | ||
| cacheMisses = promauto.NewCounter(prometheus.CounterOpts{ | ||
| Name: "bazel_remote_disk_cache_misses", | ||
| Help: "The total number of disk backend cache misses", | ||
| }) | ||
| ) | ||
|
|
||
| var tfc = tempfile.NewCreator() | ||
|
|
||
| var emptyZstdBlob = []byte{40, 181, 47, 253, 32, 0, 1, 0, 0} | ||
|
|
||
| var hashKeyRegex = regexp.MustCompile("^[a-f0-9]{64}$") | ||
|
|
||
| type Cache interface { | ||
| Get(ctx context.Context, kind cache.EntryKind, hash string, size int64, offset int64) (io.ReadCloser, int64, error) | ||
| GetValidatedActionResult(ctx context.Context, hash string) (*pb.ActionResult, []byte, error) | ||
| GetZstd(ctx context.Context, hash string, size int64, offset int64) (io.ReadCloser, int64, error) | ||
| Put(kind cache.EntryKind, hash string, size int64, r io.Reader) error | ||
| Contains(ctx context.Context, kind cache.EntryKind, hash string, size int64) (bool, int64) | ||
| FindMissingCasBlobs(ctx context.Context, blobs []*pb.Digest) ([]*pb.Digest, error) | ||
|
|
||
| MaxSize() int64 | ||
| Stats() (totalSize int64, reservedSize int64, numItems int, uncompressedSize int64) | ||
| RegisterMetrics() | ||
| } | ||
|
|
||
| // lruItem is the type of the values stored in SizedLRU to keep track of items. | ||
| type lruItem struct { | ||
| // Size of the blob in uncompressed form. | ||
|
|
@@ -67,9 +67,9 @@ type lruItem struct { | |
| legacy bool | ||
| } | ||
|
|
||
| // Cache is a filesystem-based LRU cache, with an optional backend proxy. | ||
| // diskCache is a filesystem-based LRU cache, with an optional backend proxy. | ||
| // It is safe for concurrent use. | ||
| type Cache struct { | ||
| type diskCache struct { | ||
| dir string | ||
| proxy cache.Proxy | ||
| storageMode casblob.CompressionType | ||
|
|
@@ -108,7 +108,7 @@ func badReqErr(format string, a ...interface{}) *cache.Error { | |
|
|
||
| // New returns a new instance of a filesystem-based cache rooted at `dir`, | ||
| // with a maximum size of `maxSizeBytes` bytes and `opts` Options set. | ||
| func New(dir string, maxSizeBytes int64, opts ...Option) (*Cache, error) { | ||
| func New(dir string, maxSizeBytes int64, opts ...Option) (Cache, error) { | ||
|
|
||
| err := os.MkdirAll(dir, os.ModePerm) | ||
| if err != nil { | ||
|
|
@@ -120,7 +120,7 @@ func New(dir string, maxSizeBytes int64, opts ...Option) (*Cache, error) { | |
| return nil, err | ||
| } | ||
|
|
||
| c := &Cache{ | ||
| c := diskCache{ | ||
| dir: dir, | ||
|
|
||
| // Not using config here, to avoid test import cycles. | ||
|
|
@@ -135,9 +135,34 @@ func New(dir string, maxSizeBytes int64, opts ...Option) (*Cache, error) { | |
| fileRemovalSem: semaphore.NewWeighted(5000), | ||
| } | ||
|
|
||
| cc := CacheConfig{diskCache: &c} | ||
|
|
||
| // The eviction callback deletes the file from disk. | ||
| // This function is only called while the lock is held | ||
| // by the current goroutine. | ||
| onEvict := func(key Key, value lruItem) { | ||
| ks := key.(string) | ||
| hash := ks[len(ks)-sha256.Size*2:] | ||
| var kind cache.EntryKind = cache.AC | ||
| if strings.HasPrefix(ks, "cas") { | ||
| kind = cache.CAS | ||
| } else if strings.HasPrefix(ks, "ac") { | ||
| kind = cache.AC | ||
| } else if strings.HasPrefix(ks, "raw") { | ||
| kind = cache.RAW | ||
| } | ||
|
|
||
| f := filepath.Join(dir, c.FileLocation(kind, value.legacy, hash, value.size, value.random)) | ||
|
|
||
| // Run in a goroutine so we can release the lock sooner. | ||
| go c.removeFile(f) | ||
| } | ||
|
|
||
| c.lru = NewSizedLRU(maxSizeBytes, onEvict) | ||
|
|
||
| // Apply options. | ||
| for _, o := range opts { | ||
| err = o(c) | ||
| err = o(&cc) | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
|
|
@@ -163,29 +188,6 @@ func New(dir string, maxSizeBytes int64, opts ...Option) (*Cache, error) { | |
| } | ||
| } | ||
|
|
||
| // The eviction callback deletes the file from disk. | ||
| // This function is only called while the lock is held | ||
| // by the current goroutine. | ||
| onEvict := func(key Key, value lruItem) { | ||
| ks := key.(string) | ||
| hash := ks[len(ks)-sha256.Size*2:] | ||
| var kind cache.EntryKind = cache.AC | ||
| if strings.HasPrefix(ks, "cas") { | ||
| kind = cache.CAS | ||
| } else if strings.HasPrefix(ks, "ac") { | ||
| kind = cache.AC | ||
| } else if strings.HasPrefix(ks, "raw") { | ||
| kind = cache.RAW | ||
| } | ||
|
|
||
| f := filepath.Join(dir, c.FileLocation(kind, value.legacy, hash, value.size, value.random)) | ||
|
|
||
| // Run in a goroutine so we can release the lock sooner. | ||
| go c.removeFile(f) | ||
| } | ||
|
|
||
| c.lru = NewSizedLRU(maxSizeBytes, onEvict) | ||
|
|
||
| err = c.migrateDirectories() | ||
| if err != nil { | ||
| return nil, fmt.Errorf("Attempting to migrate the old directory structure failed: %w", err) | ||
|
|
@@ -195,10 +197,21 @@ func New(dir string, maxSizeBytes int64, opts ...Option) (*Cache, error) { | |
| return nil, fmt.Errorf("Loading of existing cache entries failed due to error: %w", err) | ||
| } | ||
|
|
||
| return c, nil | ||
| if cc.metrics == nil { | ||
| return &c, nil | ||
| } | ||
|
|
||
| cc.metrics.diskCache = &c | ||
|
|
||
| return cc.metrics, nil | ||
|
Comment on lines
+200
to
+206
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Is this knowledge about metrics needed in disk.go?
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. IIRC, I added this to make enabling endpoint metrics an option; then I refactored to use a decorator, and this was left as an internal package detail. This seems like a reasonable tradeoff as long as we don't have many decorators.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. OK! |
||
| } | ||
|
|
||
| // Non-test users must call this to expose metrics. | ||
| func (c *diskCache) RegisterMetrics() { | ||
| c.lru.RegisterMetrics() | ||
| } | ||
|
|
||
| func (c *Cache) removeFile(f string) { | ||
| func (c *diskCache) removeFile(f string) { | ||
| if err := c.fileRemovalSem.Acquire(context.Background(), 1); err != nil { | ||
| log.Printf("ERROR: failed to aquire semaphore: %v, unable to remove %s", err, f) | ||
| return | ||
|
|
@@ -211,7 +224,7 @@ func (c *Cache) removeFile(f string) { | |
| } | ||
| } | ||
|
|
||
| func (c *Cache) FileLocationBase(kind cache.EntryKind, legacy bool, hash string, size int64) string { | ||
| func (c *diskCache) FileLocationBase(kind cache.EntryKind, legacy bool, hash string, size int64) string { | ||
| if kind == cache.RAW { | ||
| return path.Join("raw.v2", hash[:2], hash) | ||
| } | ||
|
|
@@ -227,7 +240,7 @@ func (c *Cache) FileLocationBase(kind cache.EntryKind, legacy bool, hash string, | |
| return fmt.Sprintf("cas.v2/%s/%s-%d", hash[:2], hash, size) | ||
| } | ||
|
|
||
| func (c *Cache) FileLocation(kind cache.EntryKind, legacy bool, hash string, size int64, random string) string { | ||
| func (c *diskCache) FileLocation(kind cache.EntryKind, legacy bool, hash string, size int64, random string) string { | ||
| if kind == cache.RAW { | ||
| return path.Join("raw.v2", hash[:2], hash+"-"+random) | ||
| } | ||
|
|
@@ -243,7 +256,7 @@ func (c *Cache) FileLocation(kind cache.EntryKind, legacy bool, hash string, siz | |
| return fmt.Sprintf("cas.v2/%s/%s-%d-%s", hash[:2], hash, size, random) | ||
| } | ||
|
|
||
| func (c *Cache) migrateDirectories() error { | ||
| func (c *diskCache) migrateDirectories() error { | ||
| err := migrateDirectory(c.dir, cache.AC) | ||
| if err != nil { | ||
| return err | ||
|
|
@@ -415,7 +428,7 @@ func migrateV1Subdir(oldDir string, destDir string, kind cache.EntryKind) error | |
| // loadExistingFiles lists all files in the cache directory, and adds them to the | ||
| // LRU index so that they can be served. Files are sorted by access time first, | ||
| // so that the eviction behavior is preserved across server restarts. | ||
| func (c *Cache) loadExistingFiles() error { | ||
| func (c *diskCache) loadExistingFiles() error { | ||
| log.Printf("Loading existing files in %s.\n", c.dir) | ||
|
|
||
| // compressed CAS items: <hash>-<logical size>-<random digits/ascii letters> | ||
|
|
@@ -521,7 +534,7 @@ func (c *Cache) loadExistingFiles() error { | |
| // If `hash` is not the empty string, and the contents don't match it, | ||
| // a non-nil error is returned. All data will be read from `r` before | ||
| // this function returns. | ||
| func (c *Cache) Put(kind cache.EntryKind, hash string, size int64, r io.Reader) (rErr error) { | ||
| func (c *diskCache) Put(kind cache.EntryKind, hash string, size int64, r io.Reader) (rErr error) { | ||
| defer func() { | ||
| if r != nil { | ||
| _, _ = io.Copy(ioutil.Discard, r) | ||
|
|
@@ -643,7 +656,7 @@ func (c *Cache) Put(kind cache.EntryKind, hash string, size int64, r io.Reader) | |
| return nil | ||
| } | ||
|
|
||
| func (c *Cache) writeAndCloseFile(r io.Reader, kind cache.EntryKind, hash string, size int64, f *os.File) (int64, error) { | ||
| func (c *diskCache) writeAndCloseFile(r io.Reader, kind cache.EntryKind, hash string, size int64, f *os.File) (int64, error) { | ||
| closeFile := true | ||
| defer func() { | ||
| if closeFile { | ||
|
|
@@ -685,7 +698,7 @@ func (c *Cache) writeAndCloseFile(r io.Reader, kind cache.EntryKind, hash string | |
| } | ||
|
|
||
| // This must be called when the lock is not held. | ||
| func (c *Cache) commit(key string, legacy bool, tempfile string, reservedSize int64, logicalSize int64, sizeOnDisk int64, random string) (unreserve bool, removeTempfile bool, err error) { | ||
| func (c *diskCache) commit(key string, legacy bool, tempfile string, reservedSize int64, logicalSize int64, sizeOnDisk int64, random string) (unreserve bool, removeTempfile bool, err error) { | ||
| unreserve = reservedSize > 0 | ||
| removeTempfile = true | ||
|
|
||
|
|
@@ -726,7 +739,7 @@ func (c *Cache) commit(key string, legacy bool, tempfile string, reservedSize in | |
| // but that we can try the proxy backend. | ||
| // | ||
| // This function assumes that only CAS blobs are requested in zstd form. | ||
| func (c *Cache) availableOrTryProxy(kind cache.EntryKind, hash string, size int64, offset int64, zstd bool) (rc io.ReadCloser, foundSize int64, tryProxy bool, err error) { | ||
| func (c *diskCache) availableOrTryProxy(kind cache.EntryKind, hash string, size int64, offset int64, zstd bool) (rc io.ReadCloser, foundSize int64, tryProxy bool, err error) { | ||
| locked := true | ||
| c.mu.Lock() | ||
|
|
||
|
|
@@ -836,27 +849,25 @@ var errOnlyCompressedCAS = &cache.Error{ | |
| // item is not found, the io.ReadCloser will be nil. If some error occurred | ||
| // when processing the request, then it is returned. Callers should provide | ||
| // the `size` of the item to be retrieved, or -1 if unknown. | ||
| func (c *Cache) Get(ctx context.Context, kind cache.EntryKind, hash string, size int64, offset int64) (rc io.ReadCloser, s int64, rErr error) { | ||
| func (c *diskCache) Get(ctx context.Context, kind cache.EntryKind, hash string, size int64, offset int64) (rc io.ReadCloser, s int64, rErr error) { | ||
| return c.get(ctx, kind, hash, size, offset, false) | ||
| } | ||
|
|
||
| // GetZstd is just like Get, except the data available from rc is zstandard | ||
| // compressed. Note that the returned `s` value still refers to the amount | ||
| // of data once it has been decompressed. | ||
| func (c *Cache) GetZstd(ctx context.Context, hash string, size int64, offset int64) (rc io.ReadCloser, s int64, rErr error) { | ||
| func (c *diskCache) GetZstd(ctx context.Context, hash string, size int64, offset int64) (rc io.ReadCloser, s int64, rErr error) { | ||
| return c.get(ctx, cache.CAS, hash, size, offset, true) | ||
| } | ||
|
|
||
| func (c *Cache) get(ctx context.Context, kind cache.EntryKind, hash string, size int64, offset int64, zstd bool) (rc io.ReadCloser, s int64, rErr error) { | ||
| func (c *diskCache) get(ctx context.Context, kind cache.EntryKind, hash string, size int64, offset int64, zstd bool) (rc io.ReadCloser, s int64, rErr error) { | ||
| // The hash format is checked properly in the http/grpc code. | ||
| // Just perform a simple/fast check here, to catch bad tests. | ||
| if len(hash) != sha256HashStrSize { | ||
| return nil, -1, badReqErr("Invalid hash size: %d, expected: %d", len(hash), sha256.Size) | ||
| } | ||
|
|
||
| if kind == cache.CAS && size <= 0 && hash == emptySha256 { | ||
| cacheHits.Inc() | ||
|
|
||
| if zstd { | ||
| return ioutil.NopCloser(bytes.NewReader(emptyZstdBlob)), 0, nil | ||
| } | ||
|
|
@@ -917,12 +928,9 @@ func (c *Cache) get(ctx context.Context, kind cache.EntryKind, hash string, size | |
| unreserve = true | ||
| } | ||
| if f != nil { | ||
| cacheHits.Inc() | ||
| return f, foundSize, nil | ||
| } | ||
|
|
||
| cacheMisses.Inc() | ||
|
|
||
| if !tryProxy { | ||
| return nil, -1, nil | ||
| } | ||
|
|
@@ -1006,8 +1014,7 @@ func (c *Cache) get(ctx context.Context, kind cache.EntryKind, hash string, size | |
| // one) will be checked. | ||
| // | ||
| // Callers should provide the `size` of the item, or -1 if unknown. | ||
| func (c *Cache) Contains(ctx context.Context, kind cache.EntryKind, hash string, size int64) (bool, int64) { | ||
|
|
||
| func (c *diskCache) Contains(ctx context.Context, kind cache.EntryKind, hash string, size int64) (bool, int64) { | ||
| // The hash format is checked properly in the http/grpc code. | ||
| // Just perform a simple/fast check here, to catch bad tests. | ||
| if len(hash) != sha256HashStrSize { | ||
|
|
@@ -1043,14 +1050,14 @@ func (c *Cache) Contains(ctx context.Context, kind cache.EntryKind, hash string, | |
| } | ||
|
|
||
| // MaxSize returns the maximum cache size in bytes. | ||
| func (c *Cache) MaxSize() int64 { | ||
| func (c *diskCache) MaxSize() int64 { | ||
| // The underlying value is never modified, no need to lock. | ||
| return c.lru.MaxSize() | ||
| } | ||
|
|
||
| // Stats returns the current size of the cache in bytes, and the number of | ||
| // items stored in the cache. | ||
| func (c *Cache) Stats() (totalSize int64, reservedSize int64, numItems int, uncompressedSize int64) { | ||
| func (c *diskCache) Stats() (totalSize int64, reservedSize int64, numItems int, uncompressedSize int64) { | ||
| c.mu.Lock() | ||
| defer c.mu.Unlock() | ||
|
|
||
|
|
@@ -1074,8 +1081,7 @@ func ensureDirExists(path string) { | |
| // value from the CAS if it and all its dependencies are also available. If | ||
| // not, nil values are returned. If something unexpected went wrong, return | ||
| // an error. | ||
| func (c *Cache) GetValidatedActionResult(ctx context.Context, hash string) (*pb.ActionResult, []byte, error) { | ||
|
|
||
| func (c *diskCache) GetValidatedActionResult(ctx context.Context, hash string) (*pb.ActionResult, []byte, error) { | ||
| rc, sizeBytes, err := c.Get(ctx, cache.AC, hash, -1, 0) | ||
| if rc != nil { | ||
| defer rc.Close() | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't like this. Is there a better way to register metrics only in non-test code?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't know. I generally use implicit registration via promauto instead of registering explicitly. Does it matter if the metrics are also registered in test code?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The Prometheus library panics with a "duplicate metrics collector registration attempted" error in that case. I think we can live with this new method for now.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK. I agree.