diff --git a/pkg/storage/cache.go b/pkg/storage/cache.go index 60a6e7ad..39c3fc3f 100644 --- a/pkg/storage/cache.go +++ b/pkg/storage/cache.go @@ -12,11 +12,15 @@ import ( ) const ( - BlobsCache = "blobs" + // global bucket. + BlobsCache = "blobs" + // bucket where we store all blobs from storage(deduped blobs + original blob). + DedupedBucket = "deduped" + /* bucket where we store only the original/source blob (used by s3 to know which is the blob with content) + it should contain only one blob, this is the only place from which we'll get blobs. */ + OriginBucket = "origin" DBExtensionName = ".db" dbCacheLockCheckTimeout = 10 * time.Second - // always mark the first key inserted in the BlobsCache with a value. - firstKeyValue = "first" ) type Cache struct { @@ -26,6 +30,11 @@ type Cache struct { useRelPaths bool // weather or not to use relative paths, should be true for filesystem and false for s3 } +// Blob is a blob record. +type Blob struct { + Path string +} + func NewCache(rootDir string, name string, useRelPaths bool, log zlog.Logger) *Cache { dbPath := path.Join(rootDir, name+DBExtensionName) dbOpts := &bbolt.Options{ @@ -85,28 +94,46 @@ func (c *Cache) PutBlob(digest, path string) error { return err } - var value string + bucket, err := root.CreateBucketIfNotExists([]byte(digest)) + if err != nil { + // this is a serious failure + c.log.Error().Err(err).Str("bucket", digest).Msg("unable to create a bucket") - bucket := root.Bucket([]byte(digest)) - if bucket == nil { - /* mark first key in bucket - in the context of s3 we need to know which blob is real - and we know that the first one is always the real, so mark them. - */ - value = firstKeyValue - bucket, err = root.CreateBucket([]byte(digest)) + return err + } + + // create nested deduped bucket where we store all the deduped blobs + original blob + deduped, err := bucket.CreateBucketIfNotExists([]byte(DedupedBucket)) + if err != nil { + // this is a serious failure + c.log.Error().Err(err).Str("bucket", DedupedBucket).Msg("unable to create a bucket") + + return err + } + + if err := deduped.Put([]byte(path), nil); err != nil { + c.log.Error().Err(err).Str("bucket", DedupedBucket).Str("value", path).Msg("unable to put record") + + return err + } + + // create origin bucket and insert only the original blob + origin := bucket.Bucket([]byte(OriginBucket)) + if origin == nil { + // if the bucket doesn't exist yet then 'path' is the original blob + origin, err := bucket.CreateBucket([]byte(OriginBucket)) if err != nil { // this is a serious failure - c.log.Error().Err(err).Str("bucket", digest).Msg("unable to create a bucket") + c.log.Error().Err(err).Str("bucket", OriginBucket).Msg("unable to create a bucket") return err } - } - if err := bucket.Put([]byte(path), []byte(value)); err != nil { - c.log.Error().Err(err).Str("bucket", digest).Str("value", path).Msg("unable to put record") + if err := origin.Put([]byte(path), nil); err != nil { + c.log.Error().Err(err).Str("bucket", OriginBucket).Str("value", path).Msg("unable to put record") - return err + return err + } } return nil @@ -130,22 +157,10 @@ func (c *Cache) GetBlob(digest string) (string, error) { return err } - b := root.Bucket([]byte(digest)) - if b != nil { - if err := b.ForEach(func(k, v []byte) error { - // always return the key with 'first' value - if string(v) == firstKeyValue { - blobPath.WriteString(string(k)) - - return nil - } - - return nil - }); err != nil { - c.log.Error().Err(err).Msg("unable to access digest bucket") - - return err - } + bucket := root.Bucket([]byte(digest)) + if bucket != nil { + origin := bucket.Bucket([]byte(OriginBucket)) + blobPath.WriteString(string(c.getOne(origin))) return nil } @@ -169,12 +184,17 @@ func (c *Cache) HasBlob(digest, blob string) bool { return err } - b := root.Bucket([]byte(digest)) - if b == nil { + bucket := root.Bucket([]byte(digest)) + if bucket == nil { return errors.ErrCacheMiss } - if b.Get([]byte(blob)) == nil { + origin := bucket.Bucket([]byte(OriginBucket)) + if origin == nil { + return errors.ErrCacheMiss + } + + if origin.Get([]byte(blob)) == nil { return errors.ErrCacheMiss } @@ -186,6 +206,17 @@ func (c *Cache) HasBlob(digest, blob string) bool { return true } +func (c *Cache) getOne(bucket *bbolt.Bucket) []byte { + if bucket != nil { + cursor := bucket.Cursor() + k, _ := cursor.First() + + return k + } + + return nil +} + func (c *Cache) DeleteBlob(digest, path string) error { // use only relative (to rootDir) paths on blobs var err error @@ -211,28 +242,45 @@ func (c *Cache) DeleteBlob(digest, path string) error { return errors.ErrCacheMiss } - value := bucket.Get([]byte(path)) + deduped := bucket.Bucket([]byte(DedupedBucket)) + if deduped == nil { + return errors.ErrCacheMiss + } - if err := bucket.Delete([]byte(path)); err != nil { - c.log.Error().Err(err).Str("digest", digest).Str("path", path).Msg("unable to delete") + if err := deduped.Delete([]byte(path)); err != nil { + c.log.Error().Err(err).Str("digest", digest).Str("bucket", DedupedBucket).Str("path", path).Msg("unable to delete") return err } - cur := bucket.Cursor() + origin := bucket.Bucket([]byte(OriginBucket)) + if origin != nil { + originBlob := c.getOne(origin) + if originBlob != nil { + if err := origin.Delete([]byte(path)); err != nil { + c.log.Error().Err(err).Str("digest", digest).Str("bucket", OriginBucket).Str("path", path).Msg("unable to delete") - key, _ := cur.First() - if key == nil { + return err + } + + // move next candidate to origin bucket, next GetKey will return this one and storage will move the content here + dedupedBlob := c.getOne(deduped) + if dedupedBlob != nil { + if err := origin.Put(dedupedBlob, nil); err != nil { + c.log.Error().Err(err).Str("digest", digest).Str("bucket", OriginBucket).Str("path", path).Msg("unable to put") + + return err + } + } + } + } + + // if no key in origin bucket then digest bucket is empty, remove it + k := c.getOne(origin) + if k == nil { c.log.Debug().Str("digest", digest).Str("path", path).Msg("deleting empty bucket") if err := root.DeleteBucket([]byte(digest)); err != nil { - c.log.Error().Err(err).Str("digest", digest).Str("path", path).Msg("unable to delete") - - return err - } - // if deleted key has value 'first' then move this value to the next key - } else if string(value) == firstKeyValue { - if err := bucket.Put(key, value); err != nil { - c.log.Error().Err(err).Str("bucket", digest).Str("value", path).Msg("unable to put record") + c.log.Error().Err(err).Str("digest", digest).Str("bucket", digest).Str("path", path).Msg("unable to delete") return err } diff --git a/pkg/storage/local_test.go b/pkg/storage/local_test.go index 252a2286..d510a17b 100644 --- a/pkg/storage/local_test.go +++ b/pkg/storage/local_test.go @@ -1056,6 +1056,34 @@ func TestDedupeLinks(t *testing.T) { fi2, err := os.Stat(path.Join(dir, "dedupe2", "blobs", "sha256", blobDigest2)) So(err, ShouldBeNil) So(os.SameFile(fi1, fi2), ShouldBeTrue) + + Convey("storage and cache inconsistency", func() { + // delete blobs + err = os.Remove(path.Join(dir, "dedupe1", "blobs", "sha256", blobDigest1)) + So(err, ShouldBeNil) + + err := os.Remove(path.Join(dir, "dedupe2", "blobs", "sha256", blobDigest2)) + So(err, ShouldBeNil) + + // now cache is inconsistent with storage (blobs present in cache but not in storage) + upload, err = imgStore.NewBlobUpload("dedupe3") + So(err, ShouldBeNil) + So(upload, ShouldNotBeEmpty) + + content = []byte("test-data3") + buf = bytes.NewBuffer(content) + buflen = buf.Len() + digest = godigest.FromBytes(content) + blob, err = imgStore.PutBlobChunkStreamed("dedupe3", upload, buf) + So(err, ShouldBeNil) + So(blob, ShouldEqual, buflen) + blobDigest2 := strings.Split(digest.String(), ":")[1] + So(blobDigest2, ShouldNotBeEmpty) + + err = imgStore.FinishBlobUpload("dedupe3", upload, buf, digest.String()) + So(err, ShouldBeNil) + So(blob, ShouldEqual, buflen) + }) }) }