Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion cache/disk/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ go_library(
"disk.go",
"findmissing.go",
"lru.go",
"metrics.go",
"options.go",
],
importpath = "github.com/buchgr/bazel-remote/cache/disk",
Expand All @@ -17,7 +18,6 @@ go_library(
"//utils/tempfile:go_default_library",
"@com_github_djherbis_atime//:go_default_library",
"@com_github_prometheus_client_golang//prometheus:go_default_library",
"@com_github_prometheus_client_golang//prometheus/promauto:go_default_library",
"@org_golang_google_grpc//codes:go_default_library",
"@org_golang_google_grpc//status:go_default_library",
"@org_golang_google_protobuf//proto:go_default_library",
Expand All @@ -39,6 +39,8 @@ go_test(
"//cache/httpproxy:go_default_library",
"//genproto/build/bazel/remote/execution/v2:go_default_library",
"//utils:go_default_library",
"@com_github_prometheus_client_golang//prometheus:go_default_library",
"@com_github_prometheus_client_golang//prometheus/testutil:go_default_library",
"@org_golang_google_protobuf//proto:go_default_library",
],
)
136 changes: 71 additions & 65 deletions cache/disk/disk.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,32 +25,32 @@ import (
"github.com/buchgr/bazel-remote/utils/tempfile"

"github.com/djherbis/atime"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"

pb "github.com/buchgr/bazel-remote/genproto/build/bazel/remote/execution/v2"
"google.golang.org/protobuf/proto"

"golang.org/x/sync/semaphore"
)

// Prometheus counters for lookups answered by the disk backend.
// They are created through promauto, which registers them with the default
// Prometheus registry as part of construction, so no explicit registration
// call appears in this file.
var (
// cacheHits is exported as "bazel_remote_disk_cache_hits". get increments
// it when the empty CAS blob is requested and when a local file is found.
cacheHits = promauto.NewCounter(prometheus.CounterOpts{
Name: "bazel_remote_disk_cache_hits",
Help: "The total number of disk backend cache hits",
})
// cacheMisses is exported as "bazel_remote_disk_cache_misses". get
// increments it when no local file is found, before any proxy attempt.
cacheMisses = promauto.NewCounter(prometheus.CounterOpts{
Name: "bazel_remote_disk_cache_misses",
Help: "The total number of disk backend cache misses",
})
)

// tfc is the package-wide tempfile creator.
// NOTE(review): its call sites are outside this view — confirm usage there.
var tfc = tempfile.NewCreator()

// emptyZstdBlob is the byte sequence get returns for the empty CAS blob when
// zstd output is requested. The first four bytes are the zstd frame magic
// number (0xFD2FB528, little-endian).
var emptyZstdBlob = []byte{40, 181, 47, 253, 32, 0, 1, 0, 0}

// hashKeyRegex matches exactly 64 lowercase hexadecimal characters, i.e. the
// hex-encoded form of a SHA-256 digest.
var hashKeyRegex = regexp.MustCompile("^[a-f0-9]{64}$")

type Cache interface {
Get(ctx context.Context, kind cache.EntryKind, hash string, size int64, offset int64) (io.ReadCloser, int64, error)
GetValidatedActionResult(ctx context.Context, hash string) (*pb.ActionResult, []byte, error)
GetZstd(ctx context.Context, hash string, size int64, offset int64) (io.ReadCloser, int64, error)
Put(kind cache.EntryKind, hash string, size int64, r io.Reader) error
Contains(ctx context.Context, kind cache.EntryKind, hash string, size int64) (bool, int64)
FindMissingCasBlobs(ctx context.Context, blobs []*pb.Digest) ([]*pb.Digest, error)

MaxSize() int64
Stats() (totalSize int64, reservedSize int64, numItems int, uncompressedSize int64)
RegisterMetrics()
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't like this. Is there a better way to register metrics only in non-test code?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know. I generally use implicit registration via promauto instead of registering explicitly. Does it matter if the metrics are also registered in test code?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Prometheus library panics with a "duplicate metrics collector registration attempted" error in that case. I think we can live with this new method for now.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK. I agree.

}

// lruItem is the type of the values stored in SizedLRU to keep track of items.
type lruItem struct {
// Size of the blob in uncompressed form.
Expand All @@ -67,9 +67,9 @@ type lruItem struct {
legacy bool
}

// Cache is a filesystem-based LRU cache, with an optional backend proxy.
// diskCache is a filesystem-based LRU cache, with an optional backend proxy.
// It is safe for concurrent use.
type Cache struct {
type diskCache struct {
dir string
proxy cache.Proxy
storageMode casblob.CompressionType
Expand Down Expand Up @@ -108,7 +108,7 @@ func badReqErr(format string, a ...interface{}) *cache.Error {

// New returns a new instance of a filesystem-based cache rooted at `dir`,
// with a maximum size of `maxSizeBytes` bytes and `opts` Options set.
func New(dir string, maxSizeBytes int64, opts ...Option) (*Cache, error) {
func New(dir string, maxSizeBytes int64, opts ...Option) (Cache, error) {

err := os.MkdirAll(dir, os.ModePerm)
if err != nil {
Expand All @@ -120,7 +120,7 @@ func New(dir string, maxSizeBytes int64, opts ...Option) (*Cache, error) {
return nil, err
}

c := &Cache{
c := diskCache{
dir: dir,

// Not using config here, to avoid test import cycles.
Expand All @@ -135,9 +135,34 @@ func New(dir string, maxSizeBytes int64, opts ...Option) (*Cache, error) {
fileRemovalSem: semaphore.NewWeighted(5000),
}

cc := CacheConfig{diskCache: &c}

// The eviction callback deletes the file from disk.
// This function is only called while the lock is held
// by the current goroutine.
onEvict := func(key Key, value lruItem) {
ks := key.(string)
hash := ks[len(ks)-sha256.Size*2:]
var kind cache.EntryKind = cache.AC
if strings.HasPrefix(ks, "cas") {
kind = cache.CAS
} else if strings.HasPrefix(ks, "ac") {
kind = cache.AC
} else if strings.HasPrefix(ks, "raw") {
kind = cache.RAW
}

f := filepath.Join(dir, c.FileLocation(kind, value.legacy, hash, value.size, value.random))

// Run in a goroutine so we can release the lock sooner.
go c.removeFile(f)
}

c.lru = NewSizedLRU(maxSizeBytes, onEvict)

// Apply options.
for _, o := range opts {
err = o(c)
err = o(&cc)
if err != nil {
return nil, err
}
Expand All @@ -163,29 +188,6 @@ func New(dir string, maxSizeBytes int64, opts ...Option) (*Cache, error) {
}
}

// The eviction callback deletes the file from disk.
// This function is only called while the lock is held
// by the current goroutine.
onEvict := func(key Key, value lruItem) {
ks := key.(string)
hash := ks[len(ks)-sha256.Size*2:]
var kind cache.EntryKind = cache.AC
if strings.HasPrefix(ks, "cas") {
kind = cache.CAS
} else if strings.HasPrefix(ks, "ac") {
kind = cache.AC
} else if strings.HasPrefix(ks, "raw") {
kind = cache.RAW
}

f := filepath.Join(dir, c.FileLocation(kind, value.legacy, hash, value.size, value.random))

// Run in a goroutine so we can release the lock sooner.
go c.removeFile(f)
}

c.lru = NewSizedLRU(maxSizeBytes, onEvict)

err = c.migrateDirectories()
if err != nil {
return nil, fmt.Errorf("Attempting to migrate the old directory structure failed: %w", err)
Expand All @@ -195,10 +197,21 @@ func New(dir string, maxSizeBytes int64, opts ...Option) (*Cache, error) {
return nil, fmt.Errorf("Loading of existing cache entries failed due to error: %w", err)
}

return c, nil
if cc.metrics == nil {
return &c, nil
}

cc.metrics.diskCache = &c

return cc.metrics, nil
Comment on lines +200 to +206
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this knowledge about metrics needed in disk.go?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIRC I added this to make enabling endpoint metrics an option, then I refactored to use a decorator and this was left as an internal package detail. This seems like a reasonable tradeoff as long as we don't have many decorators.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK!

}

// RegisterMetrics exposes the metrics of the underlying LRU index by
// delegating to c.lru.RegisterMetrics.
//
// Non-test users must call this to expose metrics.
//
// NOTE(review): the review discussion on this change states that the
// Prometheus library panics on a duplicate collector registration, which is
// presumably why registration is an explicit call rather than part of New —
// confirm before calling this more than once per process.
func (c *diskCache) RegisterMetrics() {
c.lru.RegisterMetrics()
}

func (c *Cache) removeFile(f string) {
func (c *diskCache) removeFile(f string) {
if err := c.fileRemovalSem.Acquire(context.Background(), 1); err != nil {
log.Printf("ERROR: failed to aquire semaphore: %v, unable to remove %s", err, f)
return
Expand All @@ -211,7 +224,7 @@ func (c *Cache) removeFile(f string) {
}
}

func (c *Cache) FileLocationBase(kind cache.EntryKind, legacy bool, hash string, size int64) string {
func (c *diskCache) FileLocationBase(kind cache.EntryKind, legacy bool, hash string, size int64) string {
if kind == cache.RAW {
return path.Join("raw.v2", hash[:2], hash)
}
Expand All @@ -227,7 +240,7 @@ func (c *Cache) FileLocationBase(kind cache.EntryKind, legacy bool, hash string,
return fmt.Sprintf("cas.v2/%s/%s-%d", hash[:2], hash, size)
}

func (c *Cache) FileLocation(kind cache.EntryKind, legacy bool, hash string, size int64, random string) string {
func (c *diskCache) FileLocation(kind cache.EntryKind, legacy bool, hash string, size int64, random string) string {
if kind == cache.RAW {
return path.Join("raw.v2", hash[:2], hash+"-"+random)
}
Expand All @@ -243,7 +256,7 @@ func (c *Cache) FileLocation(kind cache.EntryKind, legacy bool, hash string, siz
return fmt.Sprintf("cas.v2/%s/%s-%d-%s", hash[:2], hash, size, random)
}

func (c *Cache) migrateDirectories() error {
func (c *diskCache) migrateDirectories() error {
err := migrateDirectory(c.dir, cache.AC)
if err != nil {
return err
Expand Down Expand Up @@ -415,7 +428,7 @@ func migrateV1Subdir(oldDir string, destDir string, kind cache.EntryKind) error
// loadExistingFiles lists all files in the cache directory, and adds them to the
// LRU index so that they can be served. Files are sorted by access time first,
// so that the eviction behavior is preserved across server restarts.
func (c *Cache) loadExistingFiles() error {
func (c *diskCache) loadExistingFiles() error {
log.Printf("Loading existing files in %s.\n", c.dir)

// compressed CAS items: <hash>-<logical size>-<random digits/ascii letters>
Expand Down Expand Up @@ -521,7 +534,7 @@ func (c *Cache) loadExistingFiles() error {
// If `hash` is not the empty string, and the contents don't match it,
// a non-nil error is returned. All data will be read from `r` before
// this function returns.
func (c *Cache) Put(kind cache.EntryKind, hash string, size int64, r io.Reader) (rErr error) {
func (c *diskCache) Put(kind cache.EntryKind, hash string, size int64, r io.Reader) (rErr error) {
defer func() {
if r != nil {
_, _ = io.Copy(ioutil.Discard, r)
Expand Down Expand Up @@ -643,7 +656,7 @@ func (c *Cache) Put(kind cache.EntryKind, hash string, size int64, r io.Reader)
return nil
}

func (c *Cache) writeAndCloseFile(r io.Reader, kind cache.EntryKind, hash string, size int64, f *os.File) (int64, error) {
func (c *diskCache) writeAndCloseFile(r io.Reader, kind cache.EntryKind, hash string, size int64, f *os.File) (int64, error) {
closeFile := true
defer func() {
if closeFile {
Expand Down Expand Up @@ -685,7 +698,7 @@ func (c *Cache) writeAndCloseFile(r io.Reader, kind cache.EntryKind, hash string
}

// This must be called when the lock is not held.
func (c *Cache) commit(key string, legacy bool, tempfile string, reservedSize int64, logicalSize int64, sizeOnDisk int64, random string) (unreserve bool, removeTempfile bool, err error) {
func (c *diskCache) commit(key string, legacy bool, tempfile string, reservedSize int64, logicalSize int64, sizeOnDisk int64, random string) (unreserve bool, removeTempfile bool, err error) {
unreserve = reservedSize > 0
removeTempfile = true

Expand Down Expand Up @@ -726,7 +739,7 @@ func (c *Cache) commit(key string, legacy bool, tempfile string, reservedSize in
// but that we can try the proxy backend.
//
// This function assumes that only CAS blobs are requested in zstd form.
func (c *Cache) availableOrTryProxy(kind cache.EntryKind, hash string, size int64, offset int64, zstd bool) (rc io.ReadCloser, foundSize int64, tryProxy bool, err error) {
func (c *diskCache) availableOrTryProxy(kind cache.EntryKind, hash string, size int64, offset int64, zstd bool) (rc io.ReadCloser, foundSize int64, tryProxy bool, err error) {
locked := true
c.mu.Lock()

Expand Down Expand Up @@ -836,27 +849,25 @@ var errOnlyCompressedCAS = &cache.Error{
// item is not found, the io.ReadCloser will be nil. If some error occurred
// when processing the request, then it is returned. Callers should provide
// the `size` of the item to be retrieved, or -1 if unknown.
func (c *Cache) Get(ctx context.Context, kind cache.EntryKind, hash string, size int64, offset int64) (rc io.ReadCloser, s int64, rErr error) {
func (c *diskCache) Get(ctx context.Context, kind cache.EntryKind, hash string, size int64, offset int64) (rc io.ReadCloser, s int64, rErr error) {
return c.get(ctx, kind, hash, size, offset, false)
}

// GetZstd is just like Get, except the data available from rc is zstandard
// compressed. Note that the returned `s` value still refers to the amount
// of data once it has been decompressed.
func (c *Cache) GetZstd(ctx context.Context, hash string, size int64, offset int64) (rc io.ReadCloser, s int64, rErr error) {
func (c *diskCache) GetZstd(ctx context.Context, hash string, size int64, offset int64) (rc io.ReadCloser, s int64, rErr error) {
return c.get(ctx, cache.CAS, hash, size, offset, true)
}

func (c *Cache) get(ctx context.Context, kind cache.EntryKind, hash string, size int64, offset int64, zstd bool) (rc io.ReadCloser, s int64, rErr error) {
func (c *diskCache) get(ctx context.Context, kind cache.EntryKind, hash string, size int64, offset int64, zstd bool) (rc io.ReadCloser, s int64, rErr error) {
// The hash format is checked properly in the http/grpc code.
// Just perform a simple/fast check here, to catch bad tests.
if len(hash) != sha256HashStrSize {
return nil, -1, badReqErr("Invalid hash size: %d, expected: %d", len(hash), sha256.Size)
}

if kind == cache.CAS && size <= 0 && hash == emptySha256 {
cacheHits.Inc()

if zstd {
return ioutil.NopCloser(bytes.NewReader(emptyZstdBlob)), 0, nil
}
Expand Down Expand Up @@ -917,12 +928,9 @@ func (c *Cache) get(ctx context.Context, kind cache.EntryKind, hash string, size
unreserve = true
}
if f != nil {
cacheHits.Inc()
return f, foundSize, nil
}

cacheMisses.Inc()

if !tryProxy {
return nil, -1, nil
}
Expand Down Expand Up @@ -1006,8 +1014,7 @@ func (c *Cache) get(ctx context.Context, kind cache.EntryKind, hash string, size
// one) will be checked.
//
// Callers should provide the `size` of the item, or -1 if unknown.
func (c *Cache) Contains(ctx context.Context, kind cache.EntryKind, hash string, size int64) (bool, int64) {

func (c *diskCache) Contains(ctx context.Context, kind cache.EntryKind, hash string, size int64) (bool, int64) {
// The hash format is checked properly in the http/grpc code.
// Just perform a simple/fast check here, to catch bad tests.
if len(hash) != sha256HashStrSize {
Expand Down Expand Up @@ -1043,14 +1050,14 @@ func (c *Cache) Contains(ctx context.Context, kind cache.EntryKind, hash string,
}

// MaxSize returns the maximum cache size in bytes.
func (c *Cache) MaxSize() int64 {
func (c *diskCache) MaxSize() int64 {
// The underlying value is never modified, no need to lock.
return c.lru.MaxSize()
}

// Stats returns the current size of the cache in bytes, and the number of
// items stored in the cache.
func (c *Cache) Stats() (totalSize int64, reservedSize int64, numItems int, uncompressedSize int64) {
func (c *diskCache) Stats() (totalSize int64, reservedSize int64, numItems int, uncompressedSize int64) {
c.mu.Lock()
defer c.mu.Unlock()

Expand All @@ -1074,8 +1081,7 @@ func ensureDirExists(path string) {
// value from the CAS if it and all its dependencies are also available. If
// not, nil values are returned. If something unexpected went wrong, return
// an error.
func (c *Cache) GetValidatedActionResult(ctx context.Context, hash string) (*pb.ActionResult, []byte, error) {

func (c *diskCache) GetValidatedActionResult(ctx context.Context, hash string) (*pb.ActionResult, []byte, error) {
rc, sizeBytes, err := c.Get(ctx, cache.AC, hash, -1, 0)
if rc != nil {
defer rc.Close()
Expand Down
Loading