mirror of
https://github.com/project-zot/zot.git
synced 2026-06-18 13:37:57 +08:00
fix(storage): address blobstore cache correctness issues
This commit is contained in:
committed by
GitHub
parent
48d44ce2a5
commit
40512e4a41
Vendored
+2
-2
@@ -223,9 +223,9 @@ func (d *BoltDBDriver) GetAllBlobs(digest godigest.Digest) ([]string, error) {
|
||||
blobPaths = append(blobPaths, duplicateBlob)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
return zerr.ErrCacheMiss
|
||||
|
||||
Vendored
+5
-1
@@ -145,13 +145,17 @@ func TestBoltDBCache(t *testing.T) {
|
||||
err = cacheDriver.PutBlob("digest", "first")
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
blobs, err := cacheDriver.GetAllBlobs("digest")
|
||||
So(err, ShouldBeNil)
|
||||
So(blobs, ShouldResemble, []string{"first"})
|
||||
|
||||
err = cacheDriver.PutBlob("digest", "second")
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
err = cacheDriver.PutBlob("digest", "third")
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
blobs, err := cacheDriver.GetAllBlobs("digest")
|
||||
blobs, err = cacheDriver.GetAllBlobs("digest")
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
// "first" is the original, "second" and "third" are duplicates
|
||||
|
||||
Vendored
+59
-12
@@ -2,6 +2,7 @@ package cache
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"slices"
|
||||
"strings"
|
||||
|
||||
@@ -187,14 +188,44 @@ func (d *DynamoDBDriver) PutBlob(digest godigest.Digest, path string) error {
|
||||
return zerr.ErrEmptyValue
|
||||
}
|
||||
|
||||
originBlob, _ := d.GetBlob(digest)
|
||||
if originBlob == "" {
|
||||
// first entry, so add original blob only
|
||||
if err := d.putOriginBlob(digest, path); err != nil {
|
||||
return err
|
||||
var originBlob string
|
||||
|
||||
for {
|
||||
var err error
|
||||
|
||||
originBlob, err = d.GetBlob(digest)
|
||||
if err != nil {
|
||||
if !errors.Is(err, zerr.ErrCacheMiss) {
|
||||
return err
|
||||
}
|
||||
|
||||
// first entry, so add original blob only
|
||||
if err := d.putOriginBlob(digest, path); err != nil {
|
||||
var conditionalErr *types.ConditionalCheckFailedException
|
||||
if errors.As(err, &conditionalErr) {
|
||||
continue
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
return nil
|
||||
if originBlob == "" {
|
||||
if err := d.putOriginBlob(digest, path); err != nil {
|
||||
var conditionalErr *types.ConditionalCheckFailedException
|
||||
if errors.As(err, &conditionalErr) {
|
||||
continue
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
break
|
||||
}
|
||||
|
||||
// if same as original, this is idempotent
|
||||
@@ -206,7 +237,7 @@ func (d *DynamoDBDriver) PutBlob(digest godigest.Digest, path string) error {
|
||||
expression := "ADD DuplicateBlobPath :i"
|
||||
attrPath := types.AttributeValueMemberSS{Value: []string{path}}
|
||||
|
||||
if err := d.updateItem(digest, expression, map[string]types.AttributeValue{":i": &attrPath}); err != nil {
|
||||
if err := d.updateItem(digest, expression, map[string]types.AttributeValue{":i": &attrPath}, nil); err != nil {
|
||||
d.log.Error().Err(err).Str("digest", digest.String()).Str("path", path).Msg("failed to put blob")
|
||||
|
||||
return err
|
||||
@@ -276,7 +307,7 @@ func (d *DynamoDBDriver) DeleteBlob(digest godigest.Digest, path string) error {
|
||||
expression := "DELETE DuplicateBlobPath :i"
|
||||
attrPath := types.AttributeValueMemberSS{Value: []string{path}}
|
||||
|
||||
if err := d.updateItem(digest, expression, map[string]types.AttributeValue{":i": &attrPath}); err != nil {
|
||||
if err := d.updateItem(digest, expression, map[string]types.AttributeValue{":i": &attrPath}, nil); err != nil {
|
||||
d.log.Error().Err(err).Str("digest", digest.String()).Str("path", path).Msg("failed to delete")
|
||||
|
||||
return err
|
||||
@@ -303,11 +334,21 @@ func (d *DynamoDBDriver) DeleteBlob(digest godigest.Digest, path string) error {
|
||||
}
|
||||
|
||||
// no more duplicates, remove the original
|
||||
conditionExpression := "attribute_not_exists(DuplicateBlobPath) OR size(DuplicateBlobPath) = :zero"
|
||||
zero := types.AttributeValueMemberN{Value: "0"}
|
||||
|
||||
_, err = d.client.DeleteItem(context.TODO(), &dynamodb.DeleteItemInput{
|
||||
Key: marshaledKey,
|
||||
TableName: &d.tableName,
|
||||
Key: marshaledKey,
|
||||
TableName: &d.tableName,
|
||||
ConditionExpression: &conditionExpression,
|
||||
ExpressionAttributeValues: map[string]types.AttributeValue{":zero": &zero},
|
||||
})
|
||||
if err != nil {
|
||||
var conditionalErr *types.ConditionalCheckFailedException
|
||||
if errors.As(err, &conditionalErr) {
|
||||
return nil
|
||||
}
|
||||
|
||||
d.log.Error().Err(err).Str("digest", digest.String()).Str("path", path).Msg("failed to delete")
|
||||
|
||||
return err
|
||||
@@ -363,8 +404,13 @@ func (d *DynamoDBDriver) GetDuplicateBlob(digest godigest.Digest) (string, error
|
||||
func (d *DynamoDBDriver) putOriginBlob(digest godigest.Digest, path string) error {
|
||||
expression := "SET OriginalBlobPath = :s"
|
||||
attrPath := types.AttributeValueMemberS{Value: path}
|
||||
emptyPath := types.AttributeValueMemberS{Value: ""}
|
||||
conditionExpression := "attribute_not_exists(OriginalBlobPath) OR OriginalBlobPath = :empty"
|
||||
|
||||
if err := d.updateItem(digest, expression, map[string]types.AttributeValue{":s": &attrPath}); err != nil {
|
||||
if err := d.updateItem(digest, expression, map[string]types.AttributeValue{
|
||||
":empty": &emptyPath,
|
||||
":s": &attrPath,
|
||||
}, &conditionExpression); err != nil {
|
||||
d.log.Error().Err(err).Str("digest", digest.String()).Str("path", path).Msg("failed to put original blob")
|
||||
|
||||
return err
|
||||
@@ -374,7 +420,7 @@ func (d *DynamoDBDriver) putOriginBlob(digest godigest.Digest, path string) erro
|
||||
}
|
||||
|
||||
func (d *DynamoDBDriver) updateItem(digest godigest.Digest, expression string,
|
||||
expressionAttVals map[string]types.AttributeValue,
|
||||
expressionAttVals map[string]types.AttributeValue, conditionExpression *string,
|
||||
) error {
|
||||
marshaledKey, _ := attributevalue.MarshalMap(map[string]any{"Digest": digest.String()})
|
||||
|
||||
@@ -383,6 +429,7 @@ func (d *DynamoDBDriver) updateItem(digest godigest.Digest, expression string,
|
||||
TableName: &d.tableName,
|
||||
UpdateExpression: &expression,
|
||||
ExpressionAttributeValues: expressionAttVals,
|
||||
ConditionExpression: conditionExpression,
|
||||
})
|
||||
|
||||
return err
|
||||
|
||||
@@ -1456,6 +1456,8 @@ func (is *ImageStore) DedupeBlob(src string, dstDigest godigest.Digest, dstRepo
|
||||
|
||||
blobInfo, err := is.storeDriver.Stat(dstRecord)
|
||||
if err != nil {
|
||||
statErr := err
|
||||
|
||||
is.log.Error().Err(err).Str("blobPath", dstRecord).Str("component", "dedupe").Msg("failed to stat")
|
||||
// the actual blob on disk may have been removed by GC, so sync the cache
|
||||
err := is.cache.DeleteBlob(dstDigest, dstRecord)
|
||||
@@ -1467,6 +1469,21 @@ func (is *ImageStore) DedupeBlob(src string, dstDigest godigest.Digest, dstRepo
|
||||
return err
|
||||
}
|
||||
|
||||
updatedRecord, err := is.cache.GetBlob(dstDigest)
|
||||
if err := inject.Error(err); err != nil && !errors.Is(err, zerr.ErrCacheMiss) {
|
||||
is.log.Error().Err(err).Str("blobPath", dst).Str("component", "dedupe").Msg("failed to lookup blob record")
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
if is.cache.UsesRelativePaths() && !path.IsAbs(updatedRecord) {
|
||||
updatedRecord = path.Join(is.rootDir, updatedRecord)
|
||||
}
|
||||
|
||||
if updatedRecord == dstRecord {
|
||||
return statErr
|
||||
}
|
||||
|
||||
continue
|
||||
}
|
||||
|
||||
|
||||
@@ -766,6 +766,40 @@ func TestStorageCacheErrors(t *testing.T) {
|
||||
_, _, err = imgStore.FullBlobUpload(dedupedRepo, bytes.NewReader(cblob), cdigest)
|
||||
So(err, ShouldNotBeNil)
|
||||
})
|
||||
|
||||
Convey("DedupeBlob returns when the cached global blob is stale", t, func() {
|
||||
log := zlog.NewTestLogger()
|
||||
metrics := monitoring.NewMetricsServer(false, log)
|
||||
|
||||
dir := t.TempDir()
|
||||
|
||||
cacheDriver, err := storage.Create("boltdb", cache.BoltDBDriverParameters{
|
||||
RootDir: dir,
|
||||
Name: "cache",
|
||||
UseRelPaths: true,
|
||||
}, log)
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
imgStore := local.NewImageStore(dir, true, true, log, metrics, nil, cacheDriver, nil, nil)
|
||||
|
||||
cblob, cdigest := GetRandomImageConfig()
|
||||
|
||||
_, _, err = imgStore.FullBlobUpload("dedupe1", bytes.NewReader(cblob), cdigest)
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
_, _, err = imgStore.FullBlobUpload("dedupe2", bytes.NewReader(cblob), cdigest)
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
globalBlobPath := imgStore.BlobPath(storageConstants.GlobalBlobsRepo, cdigest)
|
||||
err = os.Remove(globalBlobPath)
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
err = imgStore.InitRepo("dedupe3")
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
err = imgStore.DedupeBlob("", cdigest, "dedupe3", imgStore.BlobPath("dedupe3", cdigest))
|
||||
So(err, ShouldNotBeNil)
|
||||
})
|
||||
}
|
||||
|
||||
func FuzzDedupeBlob(f *testing.F) {
|
||||
|
||||
Reference in New Issue
Block a user