diff --git a/pkg/storage/cache.go b/pkg/storage/cache.go index 0b054a87..60a6e7ad 100644 --- a/pkg/storage/cache.go +++ b/pkg/storage/cache.go @@ -15,6 +15,8 @@ const ( BlobsCache = "blobs" DBExtensionName = ".db" dbCacheLockCheckTimeout = 10 * time.Second + // always mark the first key inserted in the BlobsCache with a value. + firstKeyValue = "first" ) type Cache struct { @@ -24,11 +26,6 @@ type Cache struct { useRelPaths bool // weather or not to use relative paths, should be true for filesystem and false for s3 } -// Blob is a blob record. -type Blob struct { - Path string -} - func NewCache(rootDir string, name string, useRelPaths bool, log zlog.Logger) *Cache { dbPath := path.Join(rootDir, name+DBExtensionName) dbOpts := &bbolt.Options{ @@ -88,15 +85,25 @@ func (c *Cache) PutBlob(digest, path string) error { return err } - bucket, err := root.CreateBucketIfNotExists([]byte(digest)) - if err != nil { - // this is a serious failure - c.log.Error().Err(err).Str("bucket", digest).Msg("unable to create a bucket") + var value string - return err + bucket := root.Bucket([]byte(digest)) + if bucket == nil { + /* mark first key in bucket + in the context of s3 we need to know which blob is real + and we know that the first one is always the real, so mark them. + */ + value = firstKeyValue + bucket, err = root.CreateBucket([]byte(digest)) + if err != nil { + // this is a serious failure + c.log.Error().Err(err).Str("bucket", digest).Msg("unable to create a bucket") + + return err + } } - if err := bucket.Put([]byte(path), nil); err != nil { + if err := bucket.Put([]byte(path), []byte(value)); err != nil { c.log.Error().Err(err).Str("bucket", digest).Str("value", path).Msg("unable to put record") return err @@ -125,10 +132,20 @@ func (c *Cache) GetBlob(digest string) (string, error) { b := root.Bucket([]byte(digest)) if b != nil { - // get first key - c := b.Cursor() - k, _ := c.First() - blobPath.WriteString(string(k)) + if err := b.ForEach(func(k, v []byte) error { + // always return the key with 'first' value + if string(v) == firstKeyValue { + blobPath.WriteString(string(k)) + + return nil + } + + return nil + }); err != nil { + c.log.Error().Err(err).Msg("unable to access digest bucket") + + return err + } return nil } @@ -194,6 +211,8 @@ func (c *Cache) DeleteBlob(digest, path string) error { return errors.ErrCacheMiss } + value := bucket.Get([]byte(path)) + if err := bucket.Delete([]byte(path)); err != nil { c.log.Error().Err(err).Str("digest", digest).Str("path", path).Msg("unable to delete") @@ -202,12 +221,19 @@ func (c *Cache) DeleteBlob(digest, path string) error { cur := bucket.Cursor() - k, _ := cur.First() - if k == nil { + key, _ := cur.First() + if key == nil { c.log.Debug().Str("digest", digest).Str("path", path).Msg("deleting empty bucket") if err := root.DeleteBucket([]byte(digest)); err != nil { c.log.Error().Err(err).Str("digest", digest).Str("path", path).Msg("unable to delete") + return err + } + // if deleted key has value 'first' then move this value to the next key + } else if string(value) == firstKeyValue { + if err := bucket.Put(key, value); err != nil { + c.log.Error().Err(err).Str("bucket", digest).Str("value", path).Msg("unable to put record") + return err } } diff --git a/pkg/storage/s3/s3.go b/pkg/storage/s3/s3.go index 44f290b1..2974f4b3 100644 --- a/pkg/storage/s3/s3.go +++ b/pkg/storage/s3/s3.go @@ -1273,10 +1273,6 @@ retry: return err } - if dstRecord == dst { - is.log.Warn().Msg("FOUND equal dsts") - } - // prevent overwrite original blob if fileInfo == nil && dstRecord != dst { // put empty file so that we are compliant with oci layout, this will act as a deduped blob @@ -1286,8 +1282,12 @@ retry: return err } - } else { - is.log.Warn().Msg("prevent overwrite") + + if err := is.cache.PutBlob(dstDigest.String(), dst); err != nil { + is.log.Error().Err(err).Str("blobPath", dst).Msg("dedupe: unable to insert blob record") + + return err + } } // remove temp blobupload @@ -1445,33 +1445,37 @@ func (is *ObjectStorage) GetBlob(repo, digest, mediaType string) (io.ReadCloser, } // is a 'deduped' blob - if binfo.Size() == 0 && is.cache != nil { + if binfo.Size() == 0 { // Check blobs in cache dstRecord, err := is.checkCacheBlob(digest) - if err == nil { - binfo, err := is.store.Stat(context.Background(), dstRecord) - if err != nil { - is.log.Error().Err(err).Str("blob", dstRecord).Msg("failed to stat blob") + if err != nil { + is.log.Error().Err(err).Str("digest", digest).Msg("cache: not found") - // the actual blob on disk may have been removed by GC, so sync the cache - if err := is.cache.DeleteBlob(digest, dstRecord); err != nil { - is.log.Error().Err(err).Str("dstDigest", digest).Str("dst", dstRecord).Msg("dedupe: unable to delete blob record") + return nil, -1, zerr.ErrBlobNotFound + } - return nil, -1, err - } + binfo, err := is.store.Stat(context.Background(), dstRecord) + if err != nil { + is.log.Error().Err(err).Str("blob", dstRecord).Msg("failed to stat blob") - return nil, -1, zerr.ErrBlobNotFound - } - - blobReadCloser, err := is.store.Reader(context.Background(), dstRecord, 0) - if err != nil { - is.log.Error().Err(err).Str("blob", dstRecord).Msg("failed to open blob") + // the actual blob on disk may have been removed by GC, so sync the cache + if err := is.cache.DeleteBlob(digest, dstRecord); err != nil { + is.log.Error().Err(err).Str("dstDigest", digest).Str("dst", dstRecord).Msg("dedupe: unable to delete blob record") return nil, -1, err } - return blobReadCloser, binfo.Size(), nil + return nil, -1, zerr.ErrBlobNotFound } + + blobReadCloser, err := is.store.Reader(context.Background(), dstRecord, 0) + if err != nil { + is.log.Error().Err(err).Str("blob", dstRecord).Msg("failed to open blob") + + return nil, -1, err + } + + return blobReadCloser, binfo.Size(), nil } // The caller function is responsible for calling Close() diff --git a/pkg/storage/s3/s3_test.go b/pkg/storage/s3/s3_test.go index 041d59b6..8b8894db 100644 --- a/pkg/storage/s3/s3_test.go +++ b/pkg/storage/s3/s3_test.go @@ -1649,6 +1649,22 @@ func TestS3DedupeErr(t *testing.T) { So(err, ShouldNotBeNil) }) + Convey("Test DedupeBlob - error on cache.PutBlob()", t, func(c C) { + tdir := t.TempDir() + imgStore = createMockStorage(testDir, tdir, true, &StorageDriverMock{ + StatFn: func(ctx context.Context, path string) (driver.FileInfo, error) { + return nil, nil + }, + }) + + digest := godigest.NewDigestFromEncoded(godigest.SHA256, "digest") + err := imgStore.DedupeBlob("", digest, "dst") + So(err, ShouldBeNil) + + err = imgStore.DedupeBlob("", digest, "") + So(err, ShouldNotBeNil) + }) + Convey("Test DedupeBlob - error on store.Delete()", t, func(c C) { tdir := t.TempDir() imgStore = createMockStorage(testDir, tdir, true, &StorageDriverMock{ @@ -1812,6 +1828,26 @@ func TestS3DedupeErr(t *testing.T) { So(err, ShouldNotBeNil) }) + Convey("Test GetBlob() - error on checkCacheBlob()", t, func(c C) { + tdir := t.TempDir() + + digest := godigest.NewDigestFromEncoded(godigest.SHA256, + "7173b809ca12ec5dee4506cd86be934c4596dd234ee82c0662eac04a8c2c71dc") + + imgStore = createMockStorage(testDir, tdir, true, &StorageDriverMock{ + StatFn: func(ctx context.Context, path string) (driver.FileInfo, error) { + return &FileInfoMock{ + SizeFn: func() int64 { + return 0 + }, + }, nil + }, + }) + + _, _, err = imgStore.GetBlob("repo2", digest.String(), "application/vnd.oci.image.layer.v1.tar+gzip") + So(err, ShouldNotBeNil) + }) + Convey("Test DeleteBlob() - error on store.Move()", t, func(c C) { tdir := t.TempDir() hash := "7173b809ca12ec5dee4506cd86be934c4596dd234ee82c0662eac04a8c2c71dc"