diff --git a/pkg/storage/common/common.go b/pkg/storage/common/common.go index 1b7e0f79..59611699 100644 --- a/pkg/storage/common/common.go +++ b/pkg/storage/common/common.go @@ -10,6 +10,8 @@ import ( "path" "slices" "strings" + "sync" + "sync/atomic" "time" dockerList "github.com/distribution/distribution/v3/manifest/manifestlist" @@ -898,6 +900,28 @@ func IsEmptyLayersError(err error) bool { return len(validationErr.Causes) == 1 && strings.Contains(validationErr.Error(), manifestWithEmptyLayersErrMsg) } +// restoreRunState holds the completion-tracking state for a single restore run. +// Reset() replaces DedupeTaskGenerator.run with a brand new instance once the current +// run has no in-flight tasks left, rather than mutating it in place, so onDone closures +// captured during a previous run keep operating on their own isolated state and can +// never affect a new run's counters or completion callback. While tasks are still +// in-flight, Reset() keeps the same instance so their onDone callbacks can still drive +// checkCompletion for this run. +type restoreRunState struct { + // pendingTaskCount counts restore tasks generated for this run that have not yet + // completed successfully. It is incremented when a task is generated and decremented + // only by onDone, which a task calls on success (see dedupeTask.DoWork). A failed task + // is therefore never decremented, so the count intentionally never reaches zero again + // for this run, preventing checkCompletion from firing OnRestoreComplete after a failure. + pendingTaskCount atomic.Int64 + // completeOnce ensures OnRestoreComplete is called at most once for this run. + completeOnce sync.Once + // done is set to true once all tasks for this run have been generated. It is read + // both by IsDone() on the scheduler goroutine and by checkCompletion() from onDone + // callbacks running on task executor goroutines, hence atomic. + done atomic.Bool +} + // DedupeTaskGenerator takes all blobs paths found in the storage.imagestore and groups them by digest. // For each digest and based on the dedupe value it will dedupe or restore deduped blobs // to the original state(undeduped) by creating a task for each digest and pushing it to the task scheduler. @@ -911,18 +935,65 @@ type DedupeTaskGenerator struct { /* store processed digest, used for iterating duplicateBlobs one by one and generating a task for each unprocessed one*/ lastDigests []godigest.Digest - done bool repos []string // list of repos on which we run dedupe Log zlog.Logger + // OnRestoreComplete is called exactly once after ALL restore tasks have executed + // successfully. Used to write the restore-complete marker so the next startup with + // dedupe=false can skip the expensive per-digest scan. + OnRestoreComplete func() + // run holds the completion-tracking state for the current run. onDone closures + // capture the *restoreRunState pointer directly and operate on it independently + // of any later Reset(). It is an atomic.Pointer because checkCompletion() compares + // a captured run against the current one from task executor goroutines, while + // Next()/Reset() update it from the scheduler goroutine. + run atomic.Pointer[restoreRunState] } func (gen *DedupeTaskGenerator) Name() string { return "DedupeTaskGenerator" } +// getRun returns the state for the current run, lazily creating it on first use. +func (gen *DedupeTaskGenerator) getRun() *restoreRunState { + if run := gen.run.Load(); run != nil { + return run + } + + run := &restoreRunState{} + if !gen.run.CompareAndSwap(nil, run) { + run = gen.run.Load() + } + + return run +} + +// All restore tasks for this run have been generated AND all of them have completed successfully. +func (gen *DedupeTaskGenerator) checkCompletion(run *restoreRunState) { + if gen.OnRestoreComplete != nil && + run.done.Load() && + run.pendingTaskCount.Load() == 0 { + // Dispatch asynchronously so the executor goroutine that triggered completion + // isn't blocked on the marker write, which may be a slow S3 PUT. + run.completeOnce.Do(func() { + go func() { + defer func() { + if r := recover(); r != nil { + gen.Log.Error().Interface("panic", r).Str("component", "dedupe"). + Msg("panic in OnRestoreComplete") + } + }() + + gen.OnRestoreComplete() + }() + }) + } +} + func (gen *DedupeTaskGenerator) Next() (scheduler.Task, error) { var err error + run := gen.getRun() + /* at first run get from storage currently found repositories so that we skip the ones that gets synced/uploaded while this generator runs, there are deduped/restored inline, no need to run dedupe/restore again */ if len(gen.repos) == 0 { @@ -939,7 +1010,8 @@ func (gen *DedupeTaskGenerator) Next() (scheduler.Task, error) { gen.Log.Info().Str("component", "dedupe").Msg("no repositories found in storage, finished.") // no repositories in storage, no need to continue - gen.done = true + run.done.Store(true) + gen.checkCompletion(run) return nil, nil //nolint:nilnil } @@ -957,7 +1029,8 @@ func (gen *DedupeTaskGenerator) Next() (scheduler.Task, error) { if gen.digest == "" { gen.Log.Info().Str("component", "dedupe").Msg("no digests left, finished") - gen.done = true + run.done.Store(true) + gen.checkCompletion(run) return nil, nil //nolint:nilnil } @@ -965,12 +1038,25 @@ func (gen *DedupeTaskGenerator) Next() (scheduler.Task, error) { // mark digest as processed before running its task gen.lastDigests = append(gen.lastDigests, gen.digest) + // For restore passes, track each task so the marker is only written after all succeed. + var onDone func() + + if !gen.Dedupe && gen.OnRestoreComplete != nil { + run.pendingTaskCount.Add(1) + + onDone = func() { + if run.pendingTaskCount.Add(-1) == 0 { + gen.checkCompletion(run) + } + } + } + // generate rebuild dedupe task for this digest - return newDedupeTask(gen.ImgStore, gen.digest, gen.Dedupe, gen.duplicateBlobs, gen.Log), nil + return newDedupeTask(gen.ImgStore, gen.digest, gen.Dedupe, gen.duplicateBlobs, gen.Log, onDone), nil } func (gen *DedupeTaskGenerator) IsDone() bool { - return gen.done + return gen.getRun().done.Load() } func (gen *DedupeTaskGenerator) IsReady() bool { @@ -978,11 +1064,21 @@ func (gen *DedupeTaskGenerator) IsReady() bool { } func (gen *DedupeTaskGenerator) Reset() { + run := gen.getRun() + + // Only start a fresh run if the current one has no in-flight tasks (or completion + // isn't tracked at all). If tasks are still executing, keep the same run state so + // their onDone callbacks can still drive checkCompletion to fire OnRestoreComplete + // once they finish; replacing gen.run here would otherwise make that run's + // completion unobservable forever. + if gen.OnRestoreComplete == nil || (run.done.Load() && run.pendingTaskCount.Load() == 0) { + gen.run.Store(&restoreRunState{}) + } + gen.lastDigests = []godigest.Digest{} gen.duplicateBlobs = []string{} gen.repos = []string{} gen.digest = "" - gen.done = false } type dedupeTask struct { @@ -993,12 +1089,21 @@ type dedupeTask struct { duplicateBlobs []string dedupe bool log zlog.Logger + // onDone is called when this restore task succeeds. nil for dedupe (non-restore) tasks. + onDone func() } func newDedupeTask(imgStore storageTypes.ImageStore, digest godigest.Digest, dedupe bool, - duplicateBlobs []string, log zlog.Logger, + duplicateBlobs []string, log zlog.Logger, onDone func(), ) *dedupeTask { - return &dedupeTask{imgStore, digest, duplicateBlobs, dedupe, log} + return &dedupeTask{ + imgStore: imgStore, + digest: digest, + duplicateBlobs: duplicateBlobs, + dedupe: dedupe, + log: log, + onDone: onDone, + } } func (dt *dedupeTask) DoWork(ctx context.Context) error { @@ -1008,9 +1113,16 @@ func (dt *dedupeTask) DoWork(ctx context.Context) error { // log it dt.log.Error().Err(err).Str("digest", dt.digest.String()).Str("component", "dedupe"). Msg("failed to rebuild digest") + + return err } - return err + // Signal successful completion so the generator can track when all restores are done. + if dt.onDone != nil { + dt.onDone() + } + + return nil } func (dt *dedupeTask) String() string { diff --git a/pkg/storage/common/common_test.go b/pkg/storage/common/common_test.go index e65c3d1e..a4a6b748 100644 --- a/pkg/storage/common/common_test.go +++ b/pkg/storage/common/common_test.go @@ -2,10 +2,14 @@ package storage_test import ( "bytes" + "context" "encoding/json" "errors" "os" + "strings" + "sync" "testing" + "time" godigest "github.com/opencontainers/go-digest" ispec "github.com/opencontainers/image-spec/specs-go/v1" @@ -19,6 +23,7 @@ import ( common "zotregistry.dev/zot/v2/pkg/storage/common" "zotregistry.dev/zot/v2/pkg/storage/imagestore" "zotregistry.dev/zot/v2/pkg/storage/local" + tcommon "zotregistry.dev/zot/v2/pkg/test/common" . "zotregistry.dev/zot/v2/pkg/test/image-utils" "zotregistry.dev/zot/v2/pkg/test/mocks" ) @@ -611,6 +616,260 @@ func TestDedupeGeneratorErrors(t *testing.T) { }) } +func TestDedupeTaskGeneratorRestoreComplete(t *testing.T) { + testLog := log.NewTestLogger() + + Convey("OnRestoreComplete fires once after all restore tasks finish successfully", t, func(c C) { + digest := godigest.FromString("blob1") + duplicateBlobs := []string{"/repo/blob1-a", "/repo/blob1-b"} + + getNextCalls := 0 + + imgStore := &mocks.MockedImageStore{ + GetRepositoriesFn: func() ([]string, error) { + return []string{"repo1"}, nil + }, + GetNextDigestWithBlobPathsFn: func(repos []string, lastDigests []godigest.Digest) ( + godigest.Digest, []string, error, + ) { + getNextCalls++ + if getNextCalls == 1 { + return digest, duplicateBlobs, nil + } + + return "", nil, nil + }, + RunDedupeForDigestFn: func(ctx context.Context, digest godigest.Digest, dedupe bool, + duplicateBlobs []string, + ) error { + return nil + }, + } + + var ( + callCountMutex sync.Mutex + callCount int + ) + + done := make(chan struct{}) + + generator := &common.DedupeTaskGenerator{ + ImgStore: imgStore, + Dedupe: false, + Log: testLog, + OnRestoreComplete: func() { + callCountMutex.Lock() + callCount++ + callCountMutex.Unlock() + close(done) + }, + } + + // first Next(): generates the restore task, pendingTaskCount becomes 1 + task, err := generator.Next() + So(err, ShouldBeNil) + So(task, ShouldNotBeNil) + So(generator.IsDone(), ShouldBeFalse) + + // running the task triggers onDone(), bringing pendingTaskCount back to 0; + // the run isn't marked done yet so this is a no-op for checkCompletion + err = task.DoWork(context.Background()) + So(err, ShouldBeNil) + + // second Next(): no more digests, marks the run as done and triggers checkCompletion + task, err = generator.Next() + So(err, ShouldBeNil) + So(task, ShouldBeNil) + So(generator.IsDone(), ShouldBeTrue) + + // OnRestoreComplete is dispatched asynchronously + select { + case <-done: + case <-time.After(5 * time.Second): + t.Fatal("timed out waiting for OnRestoreComplete") + } + + callCountMutex.Lock() + So(callCount, ShouldEqual, 1) + callCountMutex.Unlock() + + // Reset() starts a fresh run since this one completed with no pending tasks + generator.Reset() + So(generator.IsDone(), ShouldBeFalse) + }) + + Convey("Reset keeps the same run while a restore task is in-flight", t, func(c C) { + digest := godigest.FromString("blob2") + duplicateBlobs := []string{"/repo/blob2-a"} + + getNextCalls := 0 + + imgStore := &mocks.MockedImageStore{ + GetRepositoriesFn: func() ([]string, error) { + return []string{"repo1"}, nil + }, + GetNextDigestWithBlobPathsFn: func(repos []string, lastDigests []godigest.Digest) ( + godigest.Digest, []string, error, + ) { + getNextCalls++ + if getNextCalls == 1 { + return digest, duplicateBlobs, nil + } + + return "", nil, nil + }, + RunDedupeForDigestFn: func(ctx context.Context, digest godigest.Digest, dedupe bool, + duplicateBlobs []string, + ) error { + return nil + }, + } + + var ( + callCountMutex sync.Mutex + callCount int + ) + + done := make(chan struct{}) + + generator := &common.DedupeTaskGenerator{ + ImgStore: imgStore, + Dedupe: false, + Log: testLog, + OnRestoreComplete: func() { + callCountMutex.Lock() + callCount++ + callCountMutex.Unlock() + close(done) + }, + } + + // first Next(): generates the restore task, pendingTaskCount becomes 1, task not run yet + task, err := generator.Next() + So(err, ShouldBeNil) + So(task, ShouldNotBeNil) + + // second Next(): no more digests, marks the run as done; checkCompletion is a no-op + // because the task above hasn't finished yet (pendingTaskCount == 1) + _, err = generator.Next() + So(err, ShouldBeNil) + So(generator.IsDone(), ShouldBeTrue) + + // Reset() must keep the same run state, since OnRestoreComplete hasn't fired yet + generator.Reset() + So(generator.IsDone(), ShouldBeTrue) + + // finishing the in-flight task now drives checkCompletion to fire OnRestoreComplete + err = task.DoWork(context.Background()) + So(err, ShouldBeNil) + + select { + case <-done: + case <-time.After(5 * time.Second): + t.Fatal("timed out waiting for OnRestoreComplete") + } + + callCountMutex.Lock() + So(callCount, ShouldEqual, 1) + callCountMutex.Unlock() + }) + + Convey("checkCompletion recovers if OnRestoreComplete panics", t, func(c C) { + digest := godigest.FromString("blob3") + duplicateBlobs := []string{"/repo/blob3-a"} + + getNextCalls := 0 + + imgStore := &mocks.MockedImageStore{ + GetRepositoriesFn: func() ([]string, error) { + return []string{"repo1"}, nil + }, + GetNextDigestWithBlobPathsFn: func(repos []string, lastDigests []godigest.Digest) ( + godigest.Digest, []string, error, + ) { + getNextCalls++ + if getNextCalls == 1 { + return digest, duplicateBlobs, nil + } + + return "", nil, nil + }, + RunDedupeForDigestFn: func(ctx context.Context, digest godigest.Digest, dedupe bool, + duplicateBlobs []string, + ) error { + return nil + }, + } + + logBuf := tcommon.NewThreadSafeLogBuffer() + + panicLog := log.NewLoggerWithWriter("debug", logBuf) + + done := make(chan struct{}) + + generator := &common.DedupeTaskGenerator{ + ImgStore: imgStore, + Dedupe: false, + Log: panicLog, + OnRestoreComplete: func() { + defer close(done) + panic("boom") + }, + } + + task, err := generator.Next() + So(err, ShouldBeNil) + So(task, ShouldNotBeNil) + + err = task.DoWork(context.Background()) + So(err, ShouldBeNil) + + _, err = generator.Next() + So(err, ShouldBeNil) + So(generator.IsDone(), ShouldBeTrue) + + select { + case <-done: + case <-time.After(5 * time.Second): + t.Fatal("timed out waiting for OnRestoreComplete") + } + + for range 100 { + if strings.Contains(logBuf.String(), "panic in OnRestoreComplete") { + break + } + + time.Sleep(10 * time.Millisecond) + } + + So(logBuf.String(), ShouldContainSubstring, "panic in OnRestoreComplete") + }) + + Convey("getRun is safe when the first call races across goroutines", t, func(c C) { + generator := &common.DedupeTaskGenerator{ + Log: log.NewTestLogger(), + } + + const numGoroutines = 50 + + var wg sync.WaitGroup + + start := make(chan struct{}) + + for range numGoroutines { + wg.Go(func() { + <-start + generator.IsDone() + }) + } + + close(start) + wg.Wait() + + So(generator.IsDone(), ShouldBeFalse) + }) +} + func TestPruneImageManifestsFromIndexMissingIndex(t *testing.T) { log := log.NewTestLogger() diff --git a/pkg/storage/constants/constants.go b/pkg/storage/constants/constants.go index c2b305dc..416668aa 100644 --- a/pkg/storage/constants/constants.go +++ b/pkg/storage/constants/constants.go @@ -26,4 +26,15 @@ const ( S3StorageDriverName = "s3" GCSStorageDriverName = "gcs" LocalStorageDriverName = "local" + // DedupeRestoreCompleteMarker is written at the image store root when a full dedupe-restore + // pass has completed. Its presence means no deduped blobs remain, so subsequent startups + // with dedupe=false can skip the expensive per-digest restore scan. The marker is deleted + // whenever dedupe is re-enabled, so that the next dedupe→false transition reruns restore. + DedupeRestoreCompleteMarker = "_restore_complete" + // DedupeRestoreMarkerComplete is the content of DedupeRestoreCompleteMarker when a restore + // pass has completed successfully. + DedupeRestoreMarkerComplete = "1" + // DedupeRestoreMarkerInvalid is the content written to DedupeRestoreCompleteMarker to + // invalidate a previous completion, forcing the restore scan to run again. + DedupeRestoreMarkerInvalid = "0" ) diff --git a/pkg/storage/imagestore/imagestore.go b/pkg/storage/imagestore/imagestore.go index 1819b363..981e1952 100644 --- a/pkg/storage/imagestore/imagestore.go +++ b/pkg/storage/imagestore/imagestore.go @@ -2268,7 +2268,11 @@ func (is *ImageStore) restoreDedupedBlobs(ctx context.Context, digest godigest.D // if we find a deduped blob, then copy original blob content to deduped one if binfo.Size() == 0 { - // move content from original blob to deduped one + // Read the original blob content without holding the write lock - this can be a + // large S3 GET and must not stall concurrent push operations. + // Note: this buffers the whole blob in memory, which can spike memory usage for + // large layers when many restore tasks run concurrently. Consider streaming the + // copy instead in a follow-up; that refactor is heavier and riskier than this fix. buf, err := is.storeDriver.ReadFile(originalBlob) if err != nil { is.log.Error().Err(err).Str("path", originalBlob).Str("component", "dedupe"). @@ -2277,7 +2281,33 @@ func (is *ImageStore) restoreDedupedBlobs(ctx context.Context, digest godigest.D return err } - _, err = is.storeDriver.WriteFile(blobPath, buf) + // Hold the write lock only for the actual blob write so that concurrent + // CheckBlob (read lock) and FinishBlobUpload (write lock) are not starved + // by the preceding slow S3 reads. + err = func() error { + var lockLatency time.Time + + is.Lock(&lockLatency) + defer is.Unlock(&lockLatency) + + // Re-check size inside the lock: another goroutine may have already + // restored or uploaded this blob between our Stat and Lock above. + recheck, serr := is.storeDriver.Stat(blobPath) + if serr == nil { + if recheck.Size() > 0 { + return nil + } + } else { + var pathNotFound driver.PathNotFoundError + if !errors.As(serr, &pathNotFound) { + return serr + } + } + + _, err := is.storeDriver.WriteFile(blobPath, buf) + + return err + }() if err != nil { return err } @@ -2293,12 +2323,12 @@ func (is *ImageStore) restoreDedupedBlobs(ctx context.Context, digest godigest.D func (is *ImageStore) RunDedupeForDigest(ctx context.Context, digest godigest.Digest, dedupe bool, duplicateBlobs []string, ) error { - var lockLatency time.Time - - is.Lock(&lockLatency) - defer is.Unlock(&lockLatency) - if dedupe { + var lockLatency time.Time + + is.Lock(&lockLatency) + defer is.Unlock(&lockLatency) + return is.dedupeBlobs(ctx, digest, duplicateBlobs) } @@ -2306,12 +2336,69 @@ func (is *ImageStore) RunDedupeForDigest(ctx context.Context, digest godigest.Di } func (is *ImageStore) RunDedupeBlobs(interval time.Duration, sch *scheduler.Scheduler) { + markerPath := path.Join(is.rootDir, storageConstants.DedupeRestoreCompleteMarker) + + if is.dedupe { + // Dedupe is active: remove the restore-complete marker so that a future dedupe→false + // transition knows it must run restore again. + if err := is.storeDriver.Delete(markerPath); err != nil { + var pathNotFound driver.PathNotFoundError + if !errors.As(err, &pathNotFound) { + is.log.Warn().Err(err).Str("component", "dedupe"). + Msg("failed to remove restore-complete marker") + + // Overwrite with invalid content so future dedupe=false startups won't skip restore. + if _, werr := is.storeDriver.WriteFile(markerPath, + []byte(storageConstants.DedupeRestoreMarkerInvalid)); werr != nil { + is.log.Error().Err(werr).Str("component", "dedupe"). + Msg("failed to invalidate restore-complete marker; stale marker may cause incorrect skip on next startup") + } + } + } + } else { + // Dedupe is disabled: skip the restore scan if a previous pass already completed. + // The marker is absent on first run or after dedupe was re-enabled, in which case + // we must run restore to handle any zero-size blobs left by prior deduplication. + if data, err := is.storeDriver.ReadFile(markerPath); err == nil { + content := strings.TrimSpace(string(data)) + + if content == storageConstants.DedupeRestoreMarkerComplete { + is.log.Info().Str("component", "dedupe"). + Msg("restore-complete marker present, skipping dedupe restore scan") + + return + } + + is.log.Debug().Str("component", "dedupe").Str("content", content). + Msg("restore-complete marker present but not complete, continuing with dedupe restore scan") + } else { + var pathNotFound driver.PathNotFoundError + if !errors.As(err, &pathNotFound) { + is.log.Warn().Err(err).Str("component", "dedupe"). + Msg("failed to check restore-complete marker; continuing with dedupe restore scan") + } + } + } + generator := &common.DedupeTaskGenerator{ ImgStore: is, Dedupe: is.dedupe, Log: is.log, } + if !is.dedupe { + generator.OnRestoreComplete = func() { + if _, err := is.storeDriver.WriteFile(markerPath, + []byte(storageConstants.DedupeRestoreMarkerComplete)); err != nil { + is.log.Error().Err(err).Str("component", "dedupe"). + Msg("failed to write restore-complete marker") + } else { + is.log.Info().Str("component", "dedupe"). + Msg("restore-complete marker written; future startups will skip the restore scan") + } + } + } + sch.SubmitGenerator(generator, interval, scheduler.MediumPriority) } diff --git a/pkg/storage/local/local_test.go b/pkg/storage/local/local_test.go index 78d15d9f..590f5c3c 100644 --- a/pkg/storage/local/local_test.go +++ b/pkg/storage/local/local_test.go @@ -14,10 +14,12 @@ import ( "os" "path" "strings" + "sync" "syscall" "testing" "time" + storagedriver "github.com/distribution/distribution/v3/registry/storage/driver" godigest "github.com/opencontainers/go-digest" imeta "github.com/opencontainers/image-spec/specs-go" ispec "github.com/opencontainers/image-spec/specs-go/v1" @@ -33,6 +35,7 @@ import ( "zotregistry.dev/zot/v2/pkg/storage/cache" storageConstants "zotregistry.dev/zot/v2/pkg/storage/constants" "zotregistry.dev/zot/v2/pkg/storage/gc" + "zotregistry.dev/zot/v2/pkg/storage/imagestore" "zotregistry.dev/zot/v2/pkg/storage/local" storageTypes "zotregistry.dev/zot/v2/pkg/storage/types" test "zotregistry.dev/zot/v2/pkg/test/common" @@ -59,7 +62,12 @@ var DeleteReferrers = config.ImageRetention{ //nolint: gochecknoglobals }, } -var errCache = errors.New("new cache error") +var ( + errCache = errors.New("new cache error") + errRecheckStatFailed = errors.New("recheck stat failed") + errMarkerDeleteFailed = errors.New("delete failed") + errMarkerWriteFailed = errors.New("write failed") +) func runAndGetScheduler() *scheduler.Scheduler { log := zlog.NewTestLogger() @@ -1560,6 +1568,299 @@ func TestDedupeLinks(t *testing.T) { } } +func TestDedupeRestoreCompleteMarker(t *testing.T) { + waitForMarker := func(t *testing.T, markerPath, expected string) { + t.Helper() + + for range 100 { + data, err := os.ReadFile(markerPath) + if err == nil && strings.TrimSpace(string(data)) == expected { + return + } + + time.Sleep(50 * time.Millisecond) + } + + t.Fatalf("timed out waiting for marker %q at %s", expected, markerPath) + } + + Convey("Restore-complete marker lifecycle", t, func(c C) { + dir := t.TempDir() + + logBuf := test.NewThreadSafeLogBuffer() + + log := zlog.NewLoggerWithWriter("debug", logBuf) + metrics := monitoring.NewMetricsServer(false, log) + + markerPath := path.Join(dir, storageConstants.DedupeRestoreCompleteMarker) + + Convey("dedupe=false on an empty store writes the restore-complete marker", func() { + imgStore := local.NewImageStore(dir, false, true, log, metrics, nil, nil, nil, nil) + + taskScheduler := runAndGetScheduler() + defer taskScheduler.Shutdown() + + imgStore.RunDedupeBlobs(time.Duration(0), taskScheduler) + + waitForMarker(t, markerPath, storageConstants.DedupeRestoreMarkerComplete) + + taskScheduler.Shutdown() + + So(logBuf.String(), ShouldContainSubstring, "restore-complete marker written") + }) + + Convey("dedupe=false with a complete marker skips the restore scan", func() { + err := os.WriteFile(markerPath, []byte(storageConstants.DedupeRestoreMarkerComplete), + storageConstants.DefaultFilePerms) + So(err, ShouldBeNil) + + imgStore := local.NewImageStore(dir, false, true, log, metrics, nil, nil, nil, nil) + + taskScheduler := runAndGetScheduler() + defer taskScheduler.Shutdown() + + imgStore.RunDedupeBlobs(time.Duration(0), taskScheduler) + + taskScheduler.Shutdown() + + So(logBuf.String(), ShouldContainSubstring, "skipping dedupe restore scan") + + data, err := os.ReadFile(markerPath) + So(err, ShouldBeNil) + So(strings.TrimSpace(string(data)), ShouldEqual, storageConstants.DedupeRestoreMarkerComplete) + }) + + Convey("dedupe=false with an incomplete marker continues the restore scan", func() { + err := os.WriteFile(markerPath, []byte(storageConstants.DedupeRestoreMarkerInvalid), + storageConstants.DefaultFilePerms) + So(err, ShouldBeNil) + + imgStore := local.NewImageStore(dir, false, true, log, metrics, nil, nil, nil, nil) + + taskScheduler := runAndGetScheduler() + defer taskScheduler.Shutdown() + + imgStore.RunDedupeBlobs(time.Duration(0), taskScheduler) + + waitForMarker(t, markerPath, storageConstants.DedupeRestoreMarkerComplete) + + taskScheduler.Shutdown() + + So(logBuf.String(), ShouldContainSubstring, "restore-complete marker present but not complete") + }) + + Convey("dedupe=true removes an existing restore-complete marker", func() { + err := os.WriteFile(markerPath, []byte(storageConstants.DedupeRestoreMarkerComplete), + storageConstants.DefaultFilePerms) + So(err, ShouldBeNil) + + imgStore := local.NewImageStore(dir, true, true, log, metrics, nil, nil, nil, nil) + + taskScheduler := runAndGetScheduler() + defer taskScheduler.Shutdown() + + imgStore.RunDedupeBlobs(time.Duration(0), taskScheduler) + + taskScheduler.Shutdown() + + _, err = os.Stat(markerPath) + So(errors.Is(err, fs.ErrNotExist), ShouldBeTrue) + }) + }) +} + +// recheckDriver wraps local.Driver and, for a single target path, returns a +// custom result on the second Stat call to simulate the state observed by +// restoreDedupedBlobs' re-check under the write lock. +type recheckDriver struct { + *local.Driver + + targetPath string + + mu sync.Mutex + statCalls int + + recheckSize int64 + recheckErr error +} + +func (d *recheckDriver) Stat(path string) (storagedriver.FileInfo, error) { + if path == d.targetPath { + d.mu.Lock() + d.statCalls++ + call := d.statCalls + d.mu.Unlock() + + if call == 2 { + if d.recheckErr != nil { + return nil, d.recheckErr + } + + return storagedriver.FileInfoInternal{ + FileInfoFields: storagedriver.FileInfoFields{Path: path, Size: d.recheckSize}, + }, nil + } + } + + return d.Driver.Stat(path) +} + +func TestRestoreDedupedBlobsRecheck(t *testing.T) { + Convey("restoreDedupedBlobs re-checks blob size under the write lock", t, func() { + log := zlog.NewTestLogger() + metrics := monitoring.NewMetricsServer(false, log) + + Convey("a blob restored by a concurrent task is left untouched", func() { + dir := t.TempDir() + blobsDir := path.Join(dir, "test", "blobs", "sha256") + + err := os.MkdirAll(blobsDir, storageConstants.DefaultDirPerms) + So(err, ShouldBeNil) + + content := []byte("original content") + digest := godigest.FromBytes(content) + + originalPath := path.Join(blobsDir, digest.Encoded()) + err = os.WriteFile(originalPath, content, storageConstants.DefaultFilePerms) + So(err, ShouldBeNil) + + dupPath := path.Join(blobsDir, "duplicate") + err = os.WriteFile(dupPath, []byte{}, storageConstants.DefaultFilePerms) + So(err, ShouldBeNil) + + testDriver := &recheckDriver{ + Driver: local.New(true), + targetPath: dupPath, + recheckSize: int64(len(content)), + } + + imgStore := imagestore.NewImageStore(dir, dir, false, true, log, metrics, nil, testDriver, nil, nil, nil) + + err = imgStore.RunDedupeForDigest(context.Background(), digest, false, []string{originalPath, dupPath}) + So(err, ShouldBeNil) + + data, err := os.ReadFile(dupPath) + So(err, ShouldBeNil) + So(data, ShouldBeEmpty) + }) + + Convey("a stat error on the re-check is propagated", func() { + dir := t.TempDir() + blobsDir := path.Join(dir, "test", "blobs", "sha256") + + err := os.MkdirAll(blobsDir, storageConstants.DefaultDirPerms) + So(err, ShouldBeNil) + + content := []byte("original content") + digest := godigest.FromBytes(content) + + originalPath := path.Join(blobsDir, digest.Encoded()) + err = os.WriteFile(originalPath, content, storageConstants.DefaultFilePerms) + So(err, ShouldBeNil) + + dupPath := path.Join(blobsDir, "duplicate") + err = os.WriteFile(dupPath, []byte{}, storageConstants.DefaultFilePerms) + So(err, ShouldBeNil) + + testDriver := &recheckDriver{ + Driver: local.New(true), + targetPath: dupPath, + recheckErr: errRecheckStatFailed, + } + + imgStore := imagestore.NewImageStore(dir, dir, false, true, log, metrics, nil, testDriver, nil, nil, nil) + + err = imgStore.RunDedupeForDigest(context.Background(), digest, false, []string{originalPath, dupPath}) + So(err, ShouldEqual, errRecheckStatFailed) + }) + }) +} + +// errMarkerDriver wraps local.Driver and injects errors for operations on the +// dedupe restore-complete marker file. +type errMarkerDriver struct { + *local.Driver + + deleteErr error + writeErr error +} + +func (d *errMarkerDriver) Delete(path string) error { + if d.deleteErr != nil && strings.Contains(path, storageConstants.DedupeRestoreCompleteMarker) { + return d.deleteErr + } + + return d.Driver.Delete(path) +} + +func (d *errMarkerDriver) WriteFile(filepath string, content []byte) (int, error) { + if d.writeErr != nil && strings.Contains(filepath, storageConstants.DedupeRestoreCompleteMarker) { + return 0, d.writeErr + } + + return d.Driver.WriteFile(filepath, content) +} + +func TestRunDedupeBlobsMarkerDeleteError(t *testing.T) { + Convey("RunDedupeBlobs handles errors removing the restore-complete marker", t, func() { + dir := t.TempDir() + + markerPath := path.Join(dir, storageConstants.DedupeRestoreCompleteMarker) + + err := os.WriteFile(markerPath, []byte(storageConstants.DedupeRestoreMarkerComplete), + storageConstants.DefaultFilePerms) + So(err, ShouldBeNil) + + Convey("falls back to writing an invalid marker when delete fails", func() { + logBuf := test.NewThreadSafeLogBuffer() + + log := zlog.NewLoggerWithWriter("debug", logBuf) + metrics := monitoring.NewMetricsServer(false, log) + + testDriver := &errMarkerDriver{ + Driver: local.New(true), + deleteErr: errMarkerDeleteFailed, + } + + imgStore := imagestore.NewImageStore(dir, dir, true, true, log, metrics, nil, testDriver, nil, nil, nil) + + taskScheduler := runAndGetScheduler() + defer taskScheduler.Shutdown() + + imgStore.RunDedupeBlobs(time.Duration(0), taskScheduler) + + So(logBuf.String(), ShouldContainSubstring, "failed to remove restore-complete marker") + + data, err := os.ReadFile(markerPath) + So(err, ShouldBeNil) + So(strings.TrimSpace(string(data)), ShouldEqual, storageConstants.DedupeRestoreMarkerInvalid) + }) + + Convey("logs when even the invalid-marker write fails", func() { + logBuf := test.NewThreadSafeLogBuffer() + + log := zlog.NewLoggerWithWriter("debug", logBuf) + metrics := monitoring.NewMetricsServer(false, log) + + testDriver := &errMarkerDriver{ + Driver: local.New(true), + deleteErr: errMarkerDeleteFailed, + writeErr: errMarkerWriteFailed, + } + + imgStore := imagestore.NewImageStore(dir, dir, true, true, log, metrics, nil, testDriver, nil, nil, nil) + + taskScheduler := runAndGetScheduler() + defer taskScheduler.Shutdown() + + imgStore.RunDedupeBlobs(time.Duration(0), taskScheduler) + + So(logBuf.String(), ShouldContainSubstring, "failed to remove restore-complete marker") + So(logBuf.String(), ShouldContainSubstring, "failed to invalidate restore-complete marker") + }) + }) +} + func TestDedupe(t *testing.T) { Convey("Dedupe", t, func(c C) { Convey("Nil ImageStore", func() {