feat(storage): rebuild storage(s3/local) dedupe index when switching dedupe status (#1062)

Signed-off-by: Petu Eusebiu <peusebiu@cisco.com>
This commit is contained in:
peusebiu
2023-04-07 19:49:24 +03:00
committed by GitHub
parent 96232bb11c
commit f35ff53146
23 changed files with 2520 additions and 203 deletions
+11 -3
View File
@@ -22,8 +22,9 @@ type BoltDBDriver struct {
}
type BoltDBDriverParameters struct {
RootDir, Name string
UseRelPaths bool
RootDir string
Name string
UseRelPaths bool
}
func NewBoltDBCache(parameters interface{}, log zlog.Logger) Cache {
@@ -206,10 +207,17 @@ func (d *BoltDBDriver) HasBlob(digest godigest.Digest, blob string) bool {
return errors.ErrCacheMiss
}
if origin.Get([]byte(blob)) == nil {
deduped := bucket.Bucket([]byte(constants.DuplicatesBucket))
if deduped == nil {
return errors.ErrCacheMiss
}
if origin.Get([]byte(blob)) == nil {
if deduped.Get([]byte(blob)) == nil {
return errors.ErrCacheMiss
}
}
return nil
}); err != nil {
return false
+86
View File
@@ -16,6 +16,7 @@ import (
"github.com/sigstore/cosign/pkg/oci/remote"
zerr "zotregistry.io/zot/errors"
"zotregistry.io/zot/pkg/scheduler"
storageConstants "zotregistry.io/zot/pkg/storage/constants"
)
@@ -789,3 +790,88 @@ func CheckIsImageSignature(repoName string, manifestBlob []byte, reference strin
return false, "", "", nil
}
/*
DedupeTaskGenerator produces one task per blob digest found in the image store.

Each GenerateTask call asks ImgStore for the next digest that has not been handed out yet,
together with all blob paths sharing that digest, and wraps them in a task. The task passes
the Dedupe value to ImgStore.RunDedupeForDigest, which decides whether the blobs are deduped
or restored to their original (undeduped) state.
*/
type DedupeTaskGenerator struct {
	ImgStore ImageStore
	// storage dedupe value, forwarded unchanged to every generated task
	Dedupe bool
	// digest returned by the most recent GetNextDigestWithBlobPaths call
	digest godigest.Digest
	// blob paths sharing the digest above
	duplicateBlobs []string
	/* digests already handed out as tasks; passed to GetNextDigestWithBlobPaths
	so that each call yields a digest which has not been processed yet */
	lastDigests []godigest.Digest
	// set once GetNextDigestWithBlobPaths returns an empty digest
	done bool
	Log  zerolog.Logger
}
// GenerateTask returns a task for the next digest that has not been handed out yet.
// It returns (nil, nil) and marks the generator as done when the image store reports
// no further digest, and (nil, err) when the image store lookup fails.
func (gen *DedupeTaskGenerator) GenerateTask() (scheduler.Task, error) {
	// ask the image store for the next unprocessed digest and every blob path sharing it
	nextDigest, blobPaths, err := gen.ImgStore.GetNextDigestWithBlobPaths(gen.lastDigests)
	gen.digest, gen.duplicateBlobs = nextDigest, blobPaths

	if err != nil {
		gen.Log.Error().Err(err).Msg("dedupe rebuild: failed to get next digest")

		return nil, err
	}

	// an empty digest means every digest has already been handed out
	if len(gen.digest) == 0 {
		gen.Log.Info().Msg("dedupe rebuild: finished")

		gen.done = true

		return nil, nil
	}

	// remember the digest before its task runs so it is not generated again
	gen.lastDigests = append(gen.lastDigests, gen.digest)

	task := newDedupeTask(gen.ImgStore, gen.digest, gen.Dedupe, gen.duplicateBlobs, gen.Log)

	return task, nil
}
// IsDone reports whether GenerateTask has already found that no digests are left to process.
func (gen *DedupeTaskGenerator) IsDone() bool {
	return gen.done
}
// Reset returns the generator to its initial state: the done flag is cleared and the
// current digest, its blob paths and the list of processed digests are emptied.
func (gen *DedupeTaskGenerator) Reset() {
	gen.done = false
	gen.digest = ""
	gen.duplicateBlobs = make([]string, 0)
	gen.lastDigests = make([]godigest.Digest, 0)
}
// dedupeTask holds everything needed to run RunDedupeForDigest for a single digest.
type dedupeTask struct {
	imgStore ImageStore
	// digest shared by every path in duplicateBlobs
	digest godigest.Digest
	// blob paths having the digest above
	duplicateBlobs []string
	// dedupe value passed through to RunDedupeForDigest
	dedupe bool
	log    zerolog.Logger
}
// newDedupeTask bundles the image store, a digest, the blob paths sharing that digest,
// the dedupe value and a logger into a dedupeTask.
func newDedupeTask(imgStore ImageStore, digest godigest.Digest, dedupe bool,
	duplicateBlobs []string, log zerolog.Logger,
) *dedupeTask {
	task := dedupeTask{
		imgStore:       imgStore,
		digest:         digest,
		duplicateBlobs: duplicateBlobs,
		dedupe:         dedupe,
		log:            log,
	}

	return &task
}
// DoWork runs RunDedupeForDigest for the task's digest and blob paths.
// A failure is logged here and also returned to the caller.
func (dt *dedupeTask) DoWork() error {
	if err := dt.imgStore.RunDedupeForDigest(dt.digest, dt.dedupe, dt.duplicateBlobs); err != nil {
		dt.log.Error().Err(err).Msgf("rebuild dedupe: failed to rebuild digest %s", dt.digest.String())

		return err
	}

	return nil
}
+164
View File
@@ -6,6 +6,7 @@ import (
"errors"
"fmt"
"io"
"io/fs"
"os"
"path"
"path/filepath"
@@ -1776,3 +1777,166 @@ func newGCTask(imgStore *ImageStoreLocal, repo string) *gcTask {
// DoWork delegates to RunGCRepo for the task's repository and returns its error.
func (gcT *gcTask) DoWork() error {
	return gcT.imgStore.RunGCRepo(gcT.repo)
}
// GetNextDigestWithBlobPaths walks the store's root directory under the read lock and picks
// the first file whose name is a valid sha256 digest not present in lastDigests. It returns
// that digest together with the path of every file carrying the same name. An empty digest
// is returned when no such file exists.
func (is *ImageStoreLocal) GetNextDigestWithBlobPaths(lastDigests []godigest.Digest,
) (godigest.Digest, []string, error) {
	var (
		lockLatency time.Time
		nextDigest  godigest.Digest
		blobPaths   []string
	)

	dir := is.rootDir

	is.RLock(&lockLatency)
	defer is.RUnlock(&lockLatency)

	visit := func(entryPath string, entry fs.DirEntry, walkErr error) error {
		switch {
		case walkErr != nil:
			is.log.Warn().Err(walkErr).Msg("unable to walk dir, skipping it")
			// skip files/dirs which can't be walked
			return filepath.SkipDir
		case entry.IsDir():
			return nil
		}

		candidate := godigest.NewDigestFromEncoded("sha256", entry.Name())
		if candidate.Validate() != nil {
			// files whose name is not a valid digest are not blobs
			return nil
		}

		// the first digest which was not processed before becomes the one we collect paths for
		if nextDigest == "" && !common.DContains(lastDigests, candidate) {
			nextDigest = candidate
		}

		if candidate == nextDigest {
			blobPaths = append(blobPaths, entryPath)
		}

		return nil
	}

	walkErr := filepath.WalkDir(dir, visit)

	return nextDigest, blobPaths, walkErr
}
// dedupeBlobs makes every path in duplicateBlobs a hard link of the first path and records
// each path in the cache under digest.
//
// It returns zerr.ErrDedupeRebuild when no cache driver is configured, and otherwise the
// first stat, mkdir, link, rename or cache error encountered.
//
// The function is safe to re-run after a failed or interrupted run: paths which are
// already hard links of the first one are not re-linked, but are still added to the cache
// when missing.
func (is *ImageStoreLocal) dedupeBlobs(digest godigest.Digest, duplicateBlobs []string) error {
	if fmt.Sprintf("%v", is.cache) == fmt.Sprintf("%v", nil) {
		is.log.Error().Err(zerr.ErrDedupeRebuild).Msg("no cache driver found, can not dedupe blobs")

		return zerr.ErrDedupeRebuild
	}

	is.log.Info().Str("digest", digest.String()).Msg("rebuild dedupe: deduping blobs for digest")

	// cacheBlob records blobPath under digest unless the cache already has it
	cacheBlob := func(blobPath string) error {
		if is.cache.HasBlob(digest, blobPath) {
			return nil
		}

		return is.cache.PutBlob(digest, blobPath)
	}

	var (
		originalBlob   string
		originalBlobFi fs.FileInfo
	)

	// rebuild from dedupe false to true
	for _, blobPath := range duplicateBlobs {
		/* for local storage, because we use hard links, we can assume that any blob can be original
		so we skip the first one and hard link the rest of them with the first*/
		if originalBlob == "" {
			info, err := os.Stat(blobPath)
			if err != nil {
				is.log.Error().Err(err).Str("path", blobPath).Msg("rebuild dedupe: failed to stat blob")

				return err
			}

			originalBlob, originalBlobFi = blobPath, info

			if err := cacheBlob(blobPath); err != nil {
				return err
			}

			continue
		}

		binfo, err := os.Stat(blobPath)
		if err != nil {
			is.log.Error().Err(err).Str("path", blobPath).Msg("rebuild dedupe: failed to stat blob")

			return err
		}

		// dedupe blob
		if !os.SameFile(originalBlobFi, binfo) {
			// we should link to a temp file instead of removing blob and then linking
			// to make this more atomic
			uuid, err := guuid.NewV4()
			if err != nil {
				return err
			}

			// put temp blob in <repo>/.uploads dir
			tempLinkBlobDir := path.Join(strings.Replace(blobPath, path.Join("blobs/sha256", binfo.Name()), "", 1),
				storageConstants.BlobUploadDir)

			if err := os.MkdirAll(tempLinkBlobDir, DefaultDirPerms); err != nil {
				is.log.Error().Err(err).Str("dir", tempLinkBlobDir).Msg("rebuild dedupe: unable to mkdir")

				return err
			}

			tempLinkBlobPath := path.Join(tempLinkBlobDir, uuid.String())

			if err := os.Link(originalBlob, tempLinkBlobPath); err != nil {
				is.log.Error().Err(err).Str("src", originalBlob).
					Str("dst", tempLinkBlobPath).Msg("rebuild dedupe: unable to hard link")

				return err
			}

			if err := os.Rename(tempLinkBlobPath, blobPath); err != nil {
				is.log.Error().Err(err).Str("blobPath", blobPath).Msg("rebuild dedupe: unable to rename temp link")

				// best effort: do not leave the temporary hard link behind in the uploads dir
				if rmErr := os.Remove(tempLinkBlobPath); rmErr != nil {
					is.log.Warn().Err(rmErr).Str("path", tempLinkBlobPath).
						Msg("rebuild dedupe: unable to remove temp link")
				}

				return err
			}
		}

		// cache the blob even when it already was a hard link of the original: a previous run
		// may have linked it and then failed (or been interrupted) before it was cached
		if err := cacheBlob(blobPath); err != nil {
			return err
		}
	}

	is.log.Info().Str("digest", digest.String()).Msg("rebuild dedupe: deduping blobs for digest finished successfully")

	return nil
}
// RunDedupeForDigest takes the write lock and, when dedupe is true, dedupes the given
// blob paths for digest. With dedupe false it does nothing and returns nil.
func (is *ImageStoreLocal) RunDedupeForDigest(digest godigest.Digest, dedupe bool, duplicateBlobs []string) error {
	var lockLatency time.Time

	is.Lock(&lockLatency)
	defer is.Unlock(&lockLatency)

	if !dedupe {
		// noop
		return nil
	}

	return is.dedupeBlobs(digest, duplicateBlobs)
}
// RunDedupeBlobs submits a high priority dedupe task generator to the scheduler,
// but only when dedupe is enabled on this store.
func (is *ImageStoreLocal) RunDedupeBlobs(interval time.Duration, sch *scheduler.Scheduler) {
	// for local storage no need to undedupe blobs
	if !is.dedupe {
		return
	}

	sch.SubmitGenerator(&storage.DedupeTaskGenerator{
		ImgStore: is,
		Dedupe:   is.dedupe,
		Log:      is.log,
	}, interval, scheduler.HighPriority)
}
+292 -138
View File
@@ -2,6 +2,7 @@ package local_test
import (
"bytes"
"context"
"crypto/rand"
_ "crypto/sha256"
"encoding/json"
@@ -28,11 +29,13 @@ import (
"zotregistry.io/zot/pkg/common"
"zotregistry.io/zot/pkg/extensions/monitoring"
"zotregistry.io/zot/pkg/log"
"zotregistry.io/zot/pkg/scheduler"
"zotregistry.io/zot/pkg/storage"
"zotregistry.io/zot/pkg/storage/cache"
storageConstants "zotregistry.io/zot/pkg/storage/constants"
"zotregistry.io/zot/pkg/storage/local"
"zotregistry.io/zot/pkg/test"
"zotregistry.io/zot/pkg/test/mocks"
)
const (
@@ -40,6 +43,18 @@ const (
repoName = "test"
)
var errCache = errors.New("new cache error")
// runAndGetScheduler creates a scheduler with a 50ms rate limit, starts it with a
// cancellable context and returns both the scheduler and the cancel function.
func runAndGetScheduler() (*scheduler.Scheduler, context.CancelFunc) {
	ctx, cancel := context.WithCancel(context.Background())

	sch := scheduler.NewScheduler(log.Logger{})
	sch.RateLimit = 50 * time.Millisecond
	sch.RunScheduler(ctx)

	return sch, cancel
}
func TestStorageFSAPIs(t *testing.T) {
dir := t.TempDir()
@@ -1066,155 +1081,98 @@ func FuzzRunGCRepo(f *testing.F) {
}
func TestDedupeLinks(t *testing.T) {
dir := t.TempDir()
testCases := []struct {
dedupe bool
expected bool
}{
{
dedupe: true,
expected: true,
},
{
dedupe: false,
expected: false,
},
}
log := log.Logger{Logger: zerolog.New(os.Stdout)}
metrics := monitoring.NewMetricsServer(false, log)
cacheDriver, _ := storage.Create("boltdb", cache.BoltDBDriverParameters{
RootDir: dir,
Name: "cache",
UseRelPaths: true,
}, log)
imgStore := local.NewImageStore(dir, false, storage.DefaultGCDelay,
true, true, log, metrics, nil, cacheDriver)
Convey("Dedupe", t, func(c C) {
// manifest1
upload, err := imgStore.NewBlobUpload("dedupe1")
So(err, ShouldBeNil)
So(upload, ShouldNotBeEmpty)
for _, testCase := range testCases {
dir := t.TempDir()
content := []byte("test-data3")
buf := bytes.NewBuffer(content)
buflen := buf.Len()
digest := godigest.FromBytes(content)
blob, err := imgStore.PutBlobChunkStreamed("dedupe1", upload, buf)
So(err, ShouldBeNil)
So(blob, ShouldEqual, buflen)
blobDigest1 := strings.Split(digest.String(), ":")[1]
So(blobDigest1, ShouldNotBeEmpty)
cacheDriver, _ := storage.Create("boltdb", cache.BoltDBDriverParameters{
RootDir: dir,
Name: "cache",
UseRelPaths: true,
}, log)
err = imgStore.FinishBlobUpload("dedupe1", upload, buf, digest)
So(err, ShouldBeNil)
So(blob, ShouldEqual, buflen)
imgStore := local.NewImageStore(dir, false, storage.DefaultGCDelay,
testCase.dedupe, true, log, metrics, nil, cacheDriver)
_, _, err = imgStore.CheckBlob("dedupe1", digest)
So(err, ShouldBeNil)
Convey(fmt.Sprintf("Dedupe %t", testCase.dedupe), t, func(c C) {
// manifest1
upload, err := imgStore.NewBlobUpload("dedupe1")
So(err, ShouldBeNil)
So(upload, ShouldNotBeEmpty)
blobrc, _, err := imgStore.GetBlob("dedupe1", digest, "application/vnd.oci.image.layer.v1.tar+gzip")
So(err, ShouldBeNil)
err = blobrc.Close()
So(err, ShouldBeNil)
content := []byte("test-data3")
buf := bytes.NewBuffer(content)
buflen := buf.Len()
digest := godigest.FromBytes(content)
blob, err := imgStore.PutBlobChunkStreamed("dedupe1", upload, buf)
So(err, ShouldBeNil)
So(blob, ShouldEqual, buflen)
blobDigest1 := strings.Split(digest.String(), ":")[1]
So(blobDigest1, ShouldNotBeEmpty)
cblob, cdigest := test.GetRandomImageConfig()
_, clen, err := imgStore.FullBlobUpload("dedupe1", bytes.NewReader(cblob), cdigest)
So(err, ShouldBeNil)
So(clen, ShouldEqual, len(cblob))
hasBlob, _, err := imgStore.CheckBlob("dedupe1", cdigest)
So(err, ShouldBeNil)
So(hasBlob, ShouldEqual, true)
err = imgStore.FinishBlobUpload("dedupe1", upload, buf, digest)
So(err, ShouldBeNil)
So(blob, ShouldEqual, buflen)
manifest := ispec.Manifest{
Config: ispec.Descriptor{
MediaType: "application/vnd.oci.image.config.v1+json",
Digest: cdigest,
Size: int64(len(cblob)),
},
Layers: []ispec.Descriptor{
{
MediaType: "application/vnd.oci.image.layer.v1.tar",
Digest: digest,
Size: int64(buflen),
},
},
}
manifest.SchemaVersion = 2
manifestBuf, err := json.Marshal(manifest)
So(err, ShouldBeNil)
digest = godigest.FromBytes(manifestBuf)
_, err = imgStore.PutImageManifest("dedupe1", digest.String(),
ispec.MediaTypeImageManifest, manifestBuf)
So(err, ShouldBeNil)
_, _, _, err = imgStore.GetImageManifest("dedupe1", digest.String())
So(err, ShouldBeNil)
// manifest2
upload, err = imgStore.NewBlobUpload("dedupe2")
So(err, ShouldBeNil)
So(upload, ShouldNotBeEmpty)
content = []byte("test-data3")
buf = bytes.NewBuffer(content)
buflen = buf.Len()
digest = godigest.FromBytes(content)
blob, err = imgStore.PutBlobChunkStreamed("dedupe2", upload, buf)
So(err, ShouldBeNil)
So(blob, ShouldEqual, buflen)
blobDigest2 := strings.Split(digest.String(), ":")[1]
So(blobDigest2, ShouldNotBeEmpty)
err = imgStore.FinishBlobUpload("dedupe2", upload, buf, digest)
So(err, ShouldBeNil)
So(blob, ShouldEqual, buflen)
_, _, err = imgStore.CheckBlob("dedupe2", digest)
So(err, ShouldBeNil)
blobrc, _, err = imgStore.GetBlob("dedupe2", digest, "application/vnd.oci.image.layer.v1.tar+gzip")
So(err, ShouldBeNil)
err = blobrc.Close()
So(err, ShouldBeNil)
cblob, cdigest = test.GetRandomImageConfig()
_, clen, err = imgStore.FullBlobUpload("dedupe2", bytes.NewReader(cblob), cdigest)
So(err, ShouldBeNil)
So(clen, ShouldEqual, len(cblob))
hasBlob, _, err = imgStore.CheckBlob("dedupe2", cdigest)
So(err, ShouldBeNil)
So(hasBlob, ShouldEqual, true)
manifest = ispec.Manifest{
Config: ispec.Descriptor{
MediaType: "application/vnd.oci.image.config.v1+json",
Digest: cdigest,
Size: int64(len(cblob)),
},
Layers: []ispec.Descriptor{
{
MediaType: "application/vnd.oci.image.layer.v1.tar",
Digest: digest,
Size: int64(buflen),
},
},
}
manifest.SchemaVersion = 2
manifestBuf, err = json.Marshal(manifest)
So(err, ShouldBeNil)
digest = godigest.FromBytes(manifestBuf)
_, err = imgStore.PutImageManifest("dedupe2", "1.0", ispec.MediaTypeImageManifest, manifestBuf)
So(err, ShouldBeNil)
_, _, _, err = imgStore.GetImageManifest("dedupe2", digest.String())
So(err, ShouldBeNil)
// verify that dedupe with hard links happened
fi1, err := os.Stat(path.Join(dir, "dedupe2", "blobs", "sha256", blobDigest1))
So(err, ShouldBeNil)
fi2, err := os.Stat(path.Join(dir, "dedupe2", "blobs", "sha256", blobDigest2))
So(err, ShouldBeNil)
So(os.SameFile(fi1, fi2), ShouldBeTrue)
Convey("storage and cache inconsistency", func() {
// delete blobs
err = os.Remove(path.Join(dir, "dedupe1", "blobs", "sha256", blobDigest1))
_, _, err = imgStore.CheckBlob("dedupe1", digest)
So(err, ShouldBeNil)
err := os.Remove(path.Join(dir, "dedupe2", "blobs", "sha256", blobDigest2))
blobrc, _, err := imgStore.GetBlob("dedupe1", digest, "application/vnd.oci.image.layer.v1.tar+gzip")
So(err, ShouldBeNil)
err = blobrc.Close()
So(err, ShouldBeNil)
// now cache is inconsistent with storage (blobs present in cache but not in storage)
upload, err = imgStore.NewBlobUpload("dedupe3")
cblob, cdigest := test.GetRandomImageConfig()
_, clen, err := imgStore.FullBlobUpload("dedupe1", bytes.NewReader(cblob), cdigest)
So(err, ShouldBeNil)
So(clen, ShouldEqual, len(cblob))
hasBlob, _, err := imgStore.CheckBlob("dedupe1", cdigest)
So(err, ShouldBeNil)
So(hasBlob, ShouldEqual, true)
manifest := ispec.Manifest{
Config: ispec.Descriptor{
MediaType: "application/vnd.oci.image.config.v1+json",
Digest: cdigest,
Size: int64(len(cblob)),
},
Layers: []ispec.Descriptor{
{
MediaType: "application/vnd.oci.image.layer.v1.tar",
Digest: digest,
Size: int64(buflen),
},
},
}
manifest.SchemaVersion = 2
manifestBuf, err := json.Marshal(manifest)
So(err, ShouldBeNil)
digest = godigest.FromBytes(manifestBuf)
_, err = imgStore.PutImageManifest("dedupe1", digest.String(),
ispec.MediaTypeImageManifest, manifestBuf)
So(err, ShouldBeNil)
_, _, _, err = imgStore.GetImageManifest("dedupe1", digest.String())
So(err, ShouldBeNil)
// manifest2
upload, err = imgStore.NewBlobUpload("dedupe2")
So(err, ShouldBeNil)
So(upload, ShouldNotBeEmpty)
@@ -1222,17 +1180,213 @@ func TestDedupeLinks(t *testing.T) {
buf = bytes.NewBuffer(content)
buflen = buf.Len()
digest = godigest.FromBytes(content)
blob, err = imgStore.PutBlobChunkStreamed("dedupe3", upload, buf)
blob, err = imgStore.PutBlobChunkStreamed("dedupe2", upload, buf)
So(err, ShouldBeNil)
So(blob, ShouldEqual, buflen)
blobDigest2 := strings.Split(digest.String(), ":")[1]
So(blobDigest2, ShouldNotBeEmpty)
err = imgStore.FinishBlobUpload("dedupe3", upload, buf, digest)
err = imgStore.FinishBlobUpload("dedupe2", upload, buf, digest)
So(err, ShouldBeNil)
So(blob, ShouldEqual, buflen)
_, _, err = imgStore.CheckBlob("dedupe2", digest)
So(err, ShouldBeNil)
blobrc, _, err = imgStore.GetBlob("dedupe2", digest, "application/vnd.oci.image.layer.v1.tar+gzip")
So(err, ShouldBeNil)
err = blobrc.Close()
So(err, ShouldBeNil)
cblob, cdigest = test.GetRandomImageConfig()
_, clen, err = imgStore.FullBlobUpload("dedupe2", bytes.NewReader(cblob), cdigest)
So(err, ShouldBeNil)
So(clen, ShouldEqual, len(cblob))
hasBlob, _, err = imgStore.CheckBlob("dedupe2", cdigest)
So(err, ShouldBeNil)
So(hasBlob, ShouldEqual, true)
manifest = ispec.Manifest{
Config: ispec.Descriptor{
MediaType: "application/vnd.oci.image.config.v1+json",
Digest: cdigest,
Size: int64(len(cblob)),
},
Layers: []ispec.Descriptor{
{
MediaType: "application/vnd.oci.image.layer.v1.tar",
Digest: digest,
Size: int64(buflen),
},
},
}
manifest.SchemaVersion = 2
manifestBuf, err = json.Marshal(manifest)
So(err, ShouldBeNil)
digest = godigest.FromBytes(manifestBuf)
_, err = imgStore.PutImageManifest("dedupe2", "1.0", ispec.MediaTypeImageManifest, manifestBuf)
So(err, ShouldBeNil)
_, _, _, err = imgStore.GetImageManifest("dedupe2", digest.String())
So(err, ShouldBeNil)
// verify that dedupe with hard links happened
fi1, err := os.Stat(path.Join(dir, "dedupe1", "blobs", "sha256", blobDigest1))
So(err, ShouldBeNil)
fi2, err := os.Stat(path.Join(dir, "dedupe2", "blobs", "sha256", blobDigest2))
So(err, ShouldBeNil)
So(os.SameFile(fi1, fi2), ShouldEqual, testCase.expected)
if !testCase.dedupe {
Convey("Intrerrupt rebuilding and restart, checking idempotency", func() {
for i := 0; i < 10; i++ {
taskScheduler, cancel := runAndGetScheduler()
// rebuild with dedupe true
imgStore := local.NewImageStore(dir, false, storage.DefaultGCDelay,
true, true, log, metrics, nil, cacheDriver)
imgStore.RunDedupeBlobs(time.Duration(0), taskScheduler)
sleepValue := i * 50
time.Sleep(time.Duration(sleepValue) * time.Millisecond)
cancel()
}
taskScheduler, cancel := runAndGetScheduler()
// rebuild with dedupe true
imgStore := local.NewImageStore(dir, false, storage.DefaultGCDelay,
true, true, log, metrics, nil, cacheDriver)
imgStore.RunDedupeBlobs(time.Duration(0), taskScheduler)
// wait until rebuild finishes
time.Sleep(10 * time.Second)
cancel()
fi1, err := os.Stat(path.Join(dir, "dedupe1", "blobs", "sha256", blobDigest1))
So(err, ShouldBeNil)
fi2, err := os.Stat(path.Join(dir, "dedupe2", "blobs", "sha256", blobDigest2))
So(err, ShouldBeNil)
So(os.SameFile(fi1, fi2), ShouldEqual, true)
})
Convey("rebuild dedupe index error cache nil", func() {
// switch dedupe to true from false
taskScheduler, cancel := runAndGetScheduler()
imgStore := local.NewImageStore(dir, false, storage.DefaultGCDelay,
true, true, log, metrics, nil, nil)
// rebuild with dedupe true
imgStore.RunDedupeBlobs(time.Duration(0), taskScheduler)
// wait until rebuild finishes
time.Sleep(3 * time.Second)
cancel()
fi1, err := os.Stat(path.Join(dir, "dedupe1", "blobs", "sha256", blobDigest1))
So(err, ShouldBeNil)
fi2, err := os.Stat(path.Join(dir, "dedupe2", "blobs", "sha256", blobDigest2))
So(err, ShouldBeNil)
So(os.SameFile(fi1, fi2), ShouldEqual, false)
})
Convey("rebuild dedupe index cache error on original blob", func() {
// switch dedupe to true from false
taskScheduler, cancel := runAndGetScheduler()
imgStore := local.NewImageStore(dir, false, storage.DefaultGCDelay,
true, true, log, metrics, nil, &mocks.CacheMock{
HasBlobFn: func(digest godigest.Digest, path string) bool {
return false
},
PutBlobFn: func(digest godigest.Digest, path string) error {
return errCache
},
})
// rebuild with dedupe true, should have samefile blobs
imgStore.RunDedupeBlobs(time.Duration(0), taskScheduler)
// wait until rebuild finishes
time.Sleep(10 * time.Second)
cancel()
fi1, err := os.Stat(path.Join(dir, "dedupe1", "blobs", "sha256", blobDigest1))
So(err, ShouldBeNil)
fi2, err := os.Stat(path.Join(dir, "dedupe2", "blobs", "sha256", blobDigest2))
So(err, ShouldBeNil)
So(os.SameFile(fi1, fi2), ShouldEqual, false)
})
Convey("rebuild dedupe index cache error on duplicate blob", func() {
// switch dedupe to true from false
taskScheduler, cancel := runAndGetScheduler()
imgStore := local.NewImageStore(dir, false, storage.DefaultGCDelay,
true, true, log, metrics, nil, &mocks.CacheMock{
HasBlobFn: func(digest godigest.Digest, path string) bool {
return false
},
PutBlobFn: func(digest godigest.Digest, path string) error {
if strings.Contains(path, "dedupe2") {
return errCache
}
return nil
},
})
// rebuild with dedupe true, should have samefile blobs
imgStore.RunDedupeBlobs(time.Duration(0), taskScheduler)
// wait until rebuild finishes
time.Sleep(15 * time.Second)
cancel()
fi1, err := os.Stat(path.Join(dir, "dedupe1", "blobs", "sha256", blobDigest1))
So(err, ShouldBeNil)
fi2, err := os.Stat(path.Join(dir, "dedupe2", "blobs", "sha256", blobDigest2))
So(err, ShouldBeNil)
// deduped happened, but didn't cached
So(os.SameFile(fi1, fi2), ShouldEqual, true)
})
}
Convey("storage and cache inconsistency", func() {
// delete blobs
err = os.Remove(path.Join(dir, "dedupe1", "blobs", "sha256", blobDigest1))
So(err, ShouldBeNil)
err := os.Remove(path.Join(dir, "dedupe2", "blobs", "sha256", blobDigest2))
So(err, ShouldBeNil)
// now cache is inconsistent with storage (blobs present in cache but not in storage)
upload, err = imgStore.NewBlobUpload("dedupe3")
So(err, ShouldBeNil)
So(upload, ShouldNotBeEmpty)
content = []byte("test-data3")
buf = bytes.NewBuffer(content)
buflen = buf.Len()
digest = godigest.FromBytes(content)
blob, err = imgStore.PutBlobChunkStreamed("dedupe3", upload, buf)
So(err, ShouldBeNil)
So(blob, ShouldEqual, buflen)
blobDigest2 := strings.Split(digest.String(), ":")[1]
So(blobDigest2, ShouldNotBeEmpty)
err = imgStore.FinishBlobUpload("dedupe3", upload, buf, digest)
So(err, ShouldBeNil)
So(blob, ShouldEqual, buflen)
})
})
})
}
}
func TestDedupe(t *testing.T) {
+230 -2
View File
@@ -24,6 +24,7 @@ import (
"github.com/rs/zerolog"
zerr "zotregistry.io/zot/errors"
"zotregistry.io/zot/pkg/common"
"zotregistry.io/zot/pkg/extensions/monitoring"
zlog "zotregistry.io/zot/pkg/log"
zreg "zotregistry.io/zot/pkg/regexp"
@@ -1191,7 +1192,7 @@ func (is *ObjectStorage) GetBlob(repo string, digest godigest.Digest, mediaType
}
// is a 'deduped' blob?
if binfo.Size() == 0 && fmt.Sprintf("%v", is.cache) != fmt.Sprintf("%v", nil) {
if binfo.Size() == 0 {
// Check blobs in cache
dstRecord, err := is.checkCacheBlob(digest)
if err != nil {
@@ -1244,7 +1245,7 @@ func (is *ObjectStorage) GetBlobContent(repo string, digest godigest.Digest) ([]
}
// is a 'deduped' blob?
if binfo.Size() == 0 && fmt.Sprintf("%v", is.cache) != fmt.Sprintf("%v", nil) {
if binfo.Size() == 0 {
// Check blobs in cache
dstRecord, err := is.checkCacheBlob(digest)
if err != nil {
@@ -1395,3 +1396,230 @@ func writeFile(store driver.StorageDriver, filepath string, buf []byte) (int, er
return n, nil
}
// GetNextDigestWithBlobPaths walks the store's root directory under the read lock and picks
// the first file whose base name is a valid sha256 digest not present in lastDigests. It
// returns that digest together with the path of every file carrying the same base name.
// A path-not-found error from the walk is not reported as an error.
func (is *ObjectStorage) GetNextDigestWithBlobPaths(lastDigests []godigest.Digest) (godigest.Digest, []string, error) {
	var (
		lockLatency time.Time
		nextDigest  godigest.Digest
		blobPaths   []string
	)

	dir := is.rootDir

	is.RLock(&lockLatency)
	defer is.RUnlock(&lockLatency)

	visit := func(entry driver.FileInfo) error {
		if entry.IsDir() {
			return nil
		}

		candidate := godigest.NewDigestFromEncoded("sha256", path.Base(entry.Path()))
		if candidate.Validate() != nil {
			// files whose name is not a valid digest are not blobs
			return nil
		}

		// the first digest which was not processed before becomes the one we collect paths for
		if nextDigest == "" && !common.DContains(lastDigests, candidate) {
			nextDigest = candidate
		}

		if candidate == nextDigest {
			blobPaths = append(blobPaths, entry.Path())
		}

		return nil
	}

	walkErr := is.store.Walk(context.Background(), dir, visit)

	// if the root directory is not yet created
	var notFound driver.PathNotFoundError
	if errors.As(walkErr, &notFound) {
		walkErr = nil
	}

	return nextDigest, blobPaths, walkErr
}
// getOriginalBlobFromDisk returns the first path in duplicateBlobs whose size is greater
// than zero. It returns zerr.ErrBlobNotFound when a stat fails or no such path exists.
func (is *ObjectStorage) getOriginalBlobFromDisk(duplicateBlobs []string) (string, error) {
	for idx := range duplicateBlobs {
		candidate := duplicateBlobs[idx]

		info, err := is.store.Stat(context.Background(), candidate)
		if err != nil {
			is.log.Error().Err(err).Str("path", candidate).Msg("rebuild dedupe: failed to stat blob")

			return "", zerr.ErrBlobNotFound
		}

		if info.Size() <= 0 {
			continue
		}

		return candidate, nil
	}

	return "", zerr.ErrBlobNotFound
}
// getOriginalBlob looks up the original blob for digest, first in the cache and, when the
// cache yields no path, among duplicateBlobs in storage. Cache errors other than
// zerr.ErrBlobNotFound and zerr.ErrCacheMiss are returned to the caller.
func (is *ObjectStorage) getOriginalBlob(digest godigest.Digest, duplicateBlobs []string) (string, error) {
	found, err := is.checkCacheBlob(digest)
	if err != nil {
		tolerated := errors.Is(err, zerr.ErrBlobNotFound) || errors.Is(err, zerr.ErrCacheMiss)
		if !tolerated {
			is.log.Error().Err(err).Msg("rebuild dedupe: unable to find blob in cache")

			return found, err
		}
	}

	// if we still don't have, search it
	if found == "" {
		is.log.Warn().Msg("rebuild dedupe: failed to find blob in cache, searching it in s3...")

		// a rebuild dedupe was attempted in the past
		// get original blob, should be found otherwise exit with error
		if found, err = is.getOriginalBlobFromDisk(duplicateBlobs); err != nil {
			return found, err
		}
	}

	is.log.Info().Msgf("rebuild dedupe: found original blob %s", found)

	return found, nil
}
// dedupeBlobs keeps the content of exactly one blob in duplicateBlobs (the original) and
// replaces the content of every other content-bearing blob with an empty object, recording
// each path in the cache under digest.
//
// The original is either the first content-bearing blob met while iterating or, when an
// empty (already deduped) blob is met first, the blob returned by getOriginalBlob. Once
// chosen, the original is never truncated and never replaced by another path; otherwise a
// blob resolved through getOriginalBlob that appears later in duplicateBlobs would be
// emptied, leaving no copy of the content.
//
// It returns zerr.ErrDedupeRebuild when no cache driver is configured or the original blob
// can not be found, and otherwise the first stat, write or cache error encountered.
func (is *ObjectStorage) dedupeBlobs(digest godigest.Digest, duplicateBlobs []string) error {
	if fmt.Sprintf("%v", is.cache) == fmt.Sprintf("%v", nil) {
		is.log.Error().Err(zerr.ErrDedupeRebuild).Msg("no cache driver found, can not dedupe blobs")

		return zerr.ErrDedupeRebuild
	}

	is.log.Info().Str("digest", digest.String()).Msg("rebuild dedupe: deduping blobs for digest")

	// cacheBlob records blobPath under digest unless the cache already has it
	cacheBlob := func(blobPath string) error {
		if is.cache.HasBlob(digest, blobPath) {
			return nil
		}

		return is.cache.PutBlob(digest, blobPath)
	}

	var originalBlob string

	// rebuild from dedupe false to true
	for _, blobPath := range duplicateBlobs {
		binfo, err := is.store.Stat(context.Background(), blobPath)
		if err != nil {
			is.log.Error().Err(err).Str("path", blobPath).Msg("rebuild dedupe: failed to stat blob")

			return err
		}

		if binfo.Size() == 0 {
			is.log.Warn().Msg("rebuild dedupe: found file without content, trying to find the original blob")
			// a rebuild dedupe was attempted in the past
			// get original blob, should be found otherwise exit with error
			if originalBlob == "" {
				originalBlob, err = is.getOriginalBlob(digest, duplicateBlobs)
				if err != nil {
					is.log.Error().Err(err).Msg("rebuild dedupe: unable to find original blob")

					return zerr.ErrDedupeRebuild
				}

				// cache original blob
				if err := cacheBlob(originalBlob); err != nil {
					return err
				}
			}

			// cache dedupe blob
			if err := cacheBlob(blobPath); err != nil {
				return err
			}

			continue
		}

		// blob has content: cache it
		if err := cacheBlob(blobPath); err != nil {
			return err
		}

		switch {
		case originalBlob == "":
			// first blob with content becomes the preserved original
			originalBlob = blobPath
		case blobPath != originalBlob:
			// an original is already preserved elsewhere, so this copy can safely be emptied
			if err := is.store.PutContent(context.Background(), blobPath, []byte{}); err != nil {
				is.log.Error().Err(err).Str("path", blobPath).Msg("rebuild dedupe: unable to dedupe blob")

				return err
			}
		}
	}

	is.log.Info().Str("digest", digest.String()).Msg("rebuild dedupe: deduping blobs for digest finished successfully")

	return nil
}
// restoreDedupedBlobs writes the content of the original blob into every path of
// duplicateBlobs whose size is zero. It returns zerr.ErrDedupeRebuild when the original
// blob can not be found, and otherwise the first stat, read or write error encountered.
func (is *ObjectStorage) restoreDedupedBlobs(digest godigest.Digest, duplicateBlobs []string) error {
	is.log.Info().Str("digest", digest.String()).Msgf("rebuild dedupe: restoring deduped blobs for digest")

	// first we need to find the original blob, either in cache or by checking each blob size
	source, err := is.getOriginalBlob(digest, duplicateBlobs)
	if err != nil {
		is.log.Error().Err(err).Msg("rebuild dedupe: unable to find original blob")

		return zerr.ErrDedupeRebuild
	}

	for _, target := range duplicateBlobs {
		info, err := is.store.Stat(context.Background(), target)
		if err != nil {
			is.log.Error().Err(err).Str("path", target).Msg("rebuild dedupe: failed to stat blob")

			return err
		}

		// only blobs without content are deduped ones which need restoring
		if info.Size() != 0 {
			continue
		}

		// copy content from original blob to deduped one
		content, err := is.store.GetContent(context.Background(), source)
		if err != nil {
			is.log.Error().Err(err).Str("path", source).Msg("rebuild dedupe: failed to get original blob content")

			return err
		}

		if _, err := writeFile(is.store, target, content); err != nil {
			return err
		}
	}

	is.log.Info().Str("digest", digest.String()).
		Msgf("rebuild dedupe: restoring deduped blobs for digest finished successfully")

	return nil
}
// RunDedupeForDigest takes the write lock and either dedupes the given blob paths
// (dedupe true) or restores their content from the original blob (dedupe false).
func (is *ObjectStorage) RunDedupeForDigest(digest godigest.Digest, dedupe bool, duplicateBlobs []string) error {
	var lockLatency time.Time

	is.Lock(&lockLatency)
	defer is.Unlock(&lockLatency)

	if !dedupe {
		return is.restoreDedupedBlobs(digest, duplicateBlobs)
	}

	return is.dedupeBlobs(digest, duplicateBlobs)
}
// RunDedupeBlobs submits a high priority dedupe task generator to the scheduler,
// carrying this store's current dedupe value.
func (is *ObjectStorage) RunDedupeBlobs(interval time.Duration, sch *scheduler.Scheduler) {
	sch.SubmitGenerator(&storage.DedupeTaskGenerator{
		ImgStore: is,
		Dedupe:   is.dedupe,
		Log:      is.log,
	}, interval, scheduler.HighPriority)
}
+1050 -34
View File
File diff suppressed because it is too large Load Diff
+3
View File
@@ -53,4 +53,7 @@ type ImageStore interface { //nolint:interfacebloat
GetOrasReferrers(repo string, digest godigest.Digest, artifactType string) ([]artifactspec.Descriptor, error)
RunGCRepo(repo string) error
RunGCPeriodically(interval time.Duration, sch *scheduler.Scheduler)
RunDedupeBlobs(interval time.Duration, sch *scheduler.Scheduler)
RunDedupeForDigest(digest godigest.Digest, dedupe bool, duplicateBlobs []string) error
GetNextDigestWithBlobPaths(lastDigests []godigest.Digest) (godigest.Digest, []string, error)
}
+1
View File
@@ -84,6 +84,7 @@ func createObjectsStore(rootDir string, cacheDir string) (driver.StorageDriver,
Name: "s3_cache",
UseRelPaths: false,
}, log)
il := s3.NewImageStore(rootDir, cacheDir, false, storage.DefaultGCDelay,
true, false, log, metrics, nil, store, cacheDriver,
)