diff --git a/pkg/storage/cache/boltdb.go b/pkg/storage/cache/boltdb.go index 5e00f1bd..dfbf337d 100644 --- a/pkg/storage/cache/boltdb.go +++ b/pkg/storage/cache/boltdb.go @@ -223,9 +223,9 @@ func (d *BoltDBDriver) GetAllBlobs(digest godigest.Digest) ([]string, error) { blobPaths = append(blobPaths, duplicateBlob) } } - - return nil } + + return nil } return zerr.ErrCacheMiss diff --git a/pkg/storage/cache/boltdb_test.go b/pkg/storage/cache/boltdb_test.go index bd6f4b4c..409f0be7 100644 --- a/pkg/storage/cache/boltdb_test.go +++ b/pkg/storage/cache/boltdb_test.go @@ -145,13 +145,17 @@ func TestBoltDBCache(t *testing.T) { err = cacheDriver.PutBlob("digest", "first") So(err, ShouldBeNil) + blobs, err := cacheDriver.GetAllBlobs("digest") + So(err, ShouldBeNil) + So(blobs, ShouldResemble, []string{"first"}) + err = cacheDriver.PutBlob("digest", "second") So(err, ShouldBeNil) err = cacheDriver.PutBlob("digest", "third") So(err, ShouldBeNil) - blobs, err := cacheDriver.GetAllBlobs("digest") + blobs, err = cacheDriver.GetAllBlobs("digest") So(err, ShouldBeNil) // "first" is the original, "second" and "third" are duplicates diff --git a/pkg/storage/cache/dynamodb.go b/pkg/storage/cache/dynamodb.go index a0221123..43ae71e8 100644 --- a/pkg/storage/cache/dynamodb.go +++ b/pkg/storage/cache/dynamodb.go @@ -2,6 +2,7 @@ package cache import ( "context" + "errors" "slices" "strings" @@ -187,14 +188,44 @@ func (d *DynamoDBDriver) PutBlob(digest godigest.Digest, path string) error { return zerr.ErrEmptyValue } - originBlob, _ := d.GetBlob(digest) - if originBlob == "" { - // first entry, so add original blob only - if err := d.putOriginBlob(digest, path); err != nil { - return err + var originBlob string + + for { + var err error + + originBlob, err = d.GetBlob(digest) + if err != nil { + if !errors.Is(err, zerr.ErrCacheMiss) { + return err + } + + // first entry, so add original blob only + if err := d.putOriginBlob(digest, path); err != nil { + var conditionalErr *types.ConditionalCheckFailedException + if errors.As(err, &conditionalErr) { + continue + } + + return err + } + + return nil } - return nil + if originBlob == "" { + if err := d.putOriginBlob(digest, path); err != nil { + var conditionalErr *types.ConditionalCheckFailedException + if errors.As(err, &conditionalErr) { + continue + } + + return err + } + + return nil + } + + break } // if same as original, this is idempotent @@ -206,7 +237,7 @@ func (d *DynamoDBDriver) PutBlob(digest godigest.Digest, path string) error { expression := "ADD DuplicateBlobPath :i" attrPath := types.AttributeValueMemberSS{Value: []string{path}} - if err := d.updateItem(digest, expression, map[string]types.AttributeValue{":i": &attrPath}); err != nil { + if err := d.updateItem(digest, expression, map[string]types.AttributeValue{":i": &attrPath}, nil); err != nil { d.log.Error().Err(err).Str("digest", digest.String()).Str("path", path).Msg("failed to put blob") return err @@ -276,7 +307,7 @@ func (d *DynamoDBDriver) DeleteBlob(digest godigest.Digest, path string) error { expression := "DELETE DuplicateBlobPath :i" attrPath := types.AttributeValueMemberSS{Value: []string{path}} - if err := d.updateItem(digest, expression, map[string]types.AttributeValue{":i": &attrPath}); err != nil { + if err := d.updateItem(digest, expression, map[string]types.AttributeValue{":i": &attrPath}, nil); err != nil { d.log.Error().Err(err).Str("digest", digest.String()).Str("path", path).Msg("failed to delete") return err @@ -303,11 +334,21 @@ func (d *DynamoDBDriver) DeleteBlob(digest godigest.Digest, path string) error { } // no more duplicates, remove the original + conditionExpression := "attribute_not_exists(DuplicateBlobPath) OR size(DuplicateBlobPath) = :zero" + zero := types.AttributeValueMemberN{Value: "0"} + _, err = d.client.DeleteItem(context.TODO(), &dynamodb.DeleteItemInput{ - Key: marshaledKey, - TableName: &d.tableName, + Key: marshaledKey, + TableName: &d.tableName, + ConditionExpression: &conditionExpression, + ExpressionAttributeValues: map[string]types.AttributeValue{":zero": &zero}, }) if err != nil { + var conditionalErr *types.ConditionalCheckFailedException + if errors.As(err, &conditionalErr) { + return nil + } + d.log.Error().Err(err).Str("digest", digest.String()).Str("path", path).Msg("failed to delete") return err @@ -363,8 +404,13 @@ func (d *DynamoDBDriver) GetDuplicateBlob(digest godigest.Digest) (string, error func (d *DynamoDBDriver) putOriginBlob(digest godigest.Digest, path string) error { expression := "SET OriginalBlobPath = :s" attrPath := types.AttributeValueMemberS{Value: path} + emptyPath := types.AttributeValueMemberS{Value: ""} + conditionExpression := "attribute_not_exists(OriginalBlobPath) OR OriginalBlobPath = :empty" - if err := d.updateItem(digest, expression, map[string]types.AttributeValue{":s": &attrPath}); err != nil { + if err := d.updateItem(digest, expression, map[string]types.AttributeValue{ + ":empty": &emptyPath, + ":s": &attrPath, + }, &conditionExpression); err != nil { d.log.Error().Err(err).Str("digest", digest.String()).Str("path", path).Msg("failed to put original blob") return err @@ -374,7 +420,7 @@ func (d *DynamoDBDriver) putOriginBlob(digest godigest.Digest, path string) erro } func (d *DynamoDBDriver) updateItem(digest godigest.Digest, expression string, - expressionAttVals map[string]types.AttributeValue, + expressionAttVals map[string]types.AttributeValue, conditionExpression *string, ) error { marshaledKey, _ := attributevalue.MarshalMap(map[string]any{"Digest": digest.String()}) @@ -383,6 +429,7 @@ func (d *DynamoDBDriver) updateItem(digest godigest.Digest, expression string, TableName: &d.tableName, UpdateExpression: &expression, ExpressionAttributeValues: expressionAttVals, + ConditionExpression: conditionExpression, }) return err diff --git a/pkg/storage/imagestore/imagestore.go b/pkg/storage/imagestore/imagestore.go index 8ffde2c6..f37d9f2e 100644 --- a/pkg/storage/imagestore/imagestore.go +++ b/pkg/storage/imagestore/imagestore.go @@ -1456,6 +1456,8 @@ func (is *ImageStore) DedupeBlob(src string, dstDigest godigest.Digest, dstRepo blobInfo, err := is.storeDriver.Stat(dstRecord) if err != nil { + statErr := err + is.log.Error().Err(err).Str("blobPath", dstRecord).Str("component", "dedupe").Msg("failed to stat") // the actual blob on disk may have been removed by GC, so sync the cache err := is.cache.DeleteBlob(dstDigest, dstRecord) @@ -1467,6 +1469,21 @@ func (is *ImageStore) DedupeBlob(src string, dstDigest godigest.Digest, dstRepo return err } + updatedRecord, err := is.cache.GetBlob(dstDigest) + if err := inject.Error(err); err != nil && !errors.Is(err, zerr.ErrCacheMiss) { + is.log.Error().Err(err).Str("blobPath", dst).Str("component", "dedupe").Msg("failed to lookup blob record") + + return err + } + + if is.cache.UsesRelativePaths() && !path.IsAbs(updatedRecord) { + updatedRecord = path.Join(is.rootDir, updatedRecord) + } + + if updatedRecord == dstRecord { + return statErr + } + continue } diff --git a/pkg/storage/local/local_test.go b/pkg/storage/local/local_test.go index 573c58c1..08b95923 100644 --- a/pkg/storage/local/local_test.go +++ b/pkg/storage/local/local_test.go @@ -766,6 +766,40 @@ func TestStorageCacheErrors(t *testing.T) { _, _, err = imgStore.FullBlobUpload(dedupedRepo, bytes.NewReader(cblob), cdigest) So(err, ShouldNotBeNil) }) + + Convey("DedupeBlob returns when the cached global blob is stale", t, func() { + log := zlog.NewTestLogger() + metrics := monitoring.NewMetricsServer(false, log) + + dir := t.TempDir() + + cacheDriver, err := storage.Create("boltdb", cache.BoltDBDriverParameters{ + RootDir: dir, + Name: "cache", + UseRelPaths: true, + }, log) + So(err, ShouldBeNil) + + imgStore := local.NewImageStore(dir, true, true, log, metrics, nil, cacheDriver, nil, nil) + + cblob, cdigest := GetRandomImageConfig() + + _, _, err = imgStore.FullBlobUpload("dedupe1", bytes.NewReader(cblob), cdigest) + So(err, ShouldBeNil) + + _, _, err = imgStore.FullBlobUpload("dedupe2", bytes.NewReader(cblob), cdigest) + So(err, ShouldBeNil) + + globalBlobPath := imgStore.BlobPath(storageConstants.GlobalBlobsRepo, cdigest) + err = os.Remove(globalBlobPath) + So(err, ShouldBeNil) + + err = imgStore.InitRepo("dedupe3") + So(err, ShouldBeNil) + + err = imgStore.DedupeBlob("", cdigest, "dedupe3", imgStore.BlobPath("dedupe3", cdigest)) + So(err, ShouldNotBeNil) + }) } func FuzzDedupeBlob(f *testing.F) {