fix(storage): release global write lock during blob restore I/O (#4089)

* fix(storage): release global write lock during blob restore I/O

Restore-deduped-blob reads/writes no longer hold the global storage write
lock for the duration of slow S3 GETs, only for the final write. Adds a
restore-complete marker so subsequent dedupe=false startups can skip the
restore scan, and hardens DedupeTaskGenerator's completion tracking against
races between in-flight restore tasks and Reset().

Signed-off-by: shcherbak <ju.shcherbak@gmail.com>

* test(storage): reuse ThreadSafeLogBuffer instead of duplicate syncBuffer

---------

Signed-off-by: shcherbak <ju.shcherbak@gmail.com>
This commit is contained in:
shcherbak
2026-06-15 09:48:56 +03:00
committed by GitHub
parent c18a4a975d
commit 89f7e24d20
5 changed files with 787 additions and 17 deletions
+121 -9
View File
@@ -10,6 +10,8 @@ import (
"path"
"slices"
"strings"
"sync"
"sync/atomic"
"time"
dockerList "github.com/distribution/distribution/v3/manifest/manifestlist"
@@ -898,6 +900,28 @@ func IsEmptyLayersError(err error) bool {
return len(validationErr.Causes) == 1 && strings.Contains(validationErr.Error(), manifestWithEmptyLayersErrMsg)
}
// restoreRunState is the completion bookkeeping for exactly one restore run of a
// DedupeTaskGenerator.
//
// Reset() never mutates a run in place. Once the current run has no in-flight tasks it
// installs a brand new instance in DedupeTaskGenerator.run, so onDone closures captured
// during an earlier run keep working on their own isolated state and cannot touch the
// counters or the completion callback of a later run. While tasks are still in-flight,
// Reset() leaves the instance where it is so that those onDone callbacks can still
// drive checkCompletion for this run.
type restoreRunState struct {
	// pendingTaskCount is the number of restore tasks generated for this run that have
	// not completed successfully yet. Generating a task increments it; only onDone,
	// which a task invokes on success (see dedupeTask.DoWork), decrements it. A failed
	// task is never decremented, so after a failure the count deliberately stays above
	// zero for this run and checkCompletion cannot fire OnRestoreComplete.
	pendingTaskCount atomic.Int64
	// completeOnce guarantees OnRestoreComplete is invoked at most once for this run.
	completeOnce sync.Once
	// done becomes true once every task of this run has been generated. It is atomic
	// because it is read by IsDone() on the scheduler goroutine and by checkCompletion()
	// from onDone callbacks running on task executor goroutines.
	done atomic.Bool
}
// DedupeTaskGenerator takes all blob paths found in the storage.imagestore and groups them by digest.
// For each digest, and based on the dedupe value, it will dedupe or restore deduped blobs
// to their original (undeduped) state by creating a task for each digest and pushing it to the task scheduler.
@@ -911,18 +935,65 @@ type DedupeTaskGenerator struct {
/* store processed digest, used for iterating duplicateBlobs one by one
and generating a task for each unprocessed one*/
lastDigests []godigest.Digest
done bool
repos []string // list of repos on which we run dedupe
Log zlog.Logger
// OnRestoreComplete is called at most once per restore run, and only after ALL restore
// tasks of that run have executed successfully. Used to write the restore-complete marker
// so the next startup with dedupe=false can skip the expensive per-digest scan.
OnRestoreComplete func()
// run holds the completion-tracking state for the current run. onDone closures
// capture the *restoreRunState pointer directly and operate on it independently
// of any later Reset(). It is an atomic.Pointer because checkCompletion() compares
// a captured run against the current one from task executor goroutines, while
// Next()/Reset() update it from the scheduler goroutine.
run atomic.Pointer[restoreRunState]
}
func (gen *DedupeTaskGenerator) Name() string {
return "DedupeTaskGenerator"
}
// getRun returns the bookkeeping state of the current run. The state is created lazily:
// the first caller to observe a nil pointer tries to install a fresh restoreRunState
// with a compare-and-swap, and a caller that loses that swap adopts whatever instance
// is stored instead of its own.
func (gen *DedupeTaskGenerator) getRun() *restoreRunState {
	current := gen.run.Load()
	if current != nil {
		return current
	}

	fresh := &restoreRunState{}
	if gen.run.CompareAndSwap(nil, fresh) {
		return fresh
	}

	// Lost the swap: another goroutine installed a run first, use that one.
	return gen.run.Load()
}
// checkCompletion fires OnRestoreComplete for the given run when a callback is
// configured, all restore tasks of that run have been generated (run.done) AND none of
// them is still pending (run.pendingTaskCount == 0). run.completeOnce limits the
// callback to a single invocation per run.
func (gen *DedupeTaskGenerator) checkCompletion(run *restoreRunState) {
	if gen.OnRestoreComplete == nil {
		return
	}

	if !run.done.Load() || run.pendingTaskCount.Load() != 0 {
		return
	}

	// notify runs the callback and turns a panic inside it into an error log entry.
	notify := func() {
		defer func() {
			if recovered := recover(); recovered != nil {
				gen.Log.Error().Interface("panic", recovered).Str("component", "dedupe").
					Msg("panic in OnRestoreComplete")
			}
		}()

		gen.OnRestoreComplete()
	}

	// Dispatch asynchronously so the executor goroutine that triggered completion
	// isn't blocked on the marker write, which may be a slow S3 PUT.
	run.completeOnce.Do(func() {
		go notify()
	})
}
func (gen *DedupeTaskGenerator) Next() (scheduler.Task, error) {
var err error
run := gen.getRun()
/* at first run get from storage currently found repositories so that we skip the ones that get synced/uploaded
while this generator runs; they are deduped/restored inline, no need to run dedupe/restore again */
if len(gen.repos) == 0 {
@@ -939,7 +1010,8 @@ func (gen *DedupeTaskGenerator) Next() (scheduler.Task, error) {
gen.Log.Info().Str("component", "dedupe").Msg("no repositories found in storage, finished.")
// no repositories in storage, no need to continue
gen.done = true
run.done.Store(true)
gen.checkCompletion(run)
return nil, nil //nolint:nilnil
}
@@ -957,7 +1029,8 @@ func (gen *DedupeTaskGenerator) Next() (scheduler.Task, error) {
if gen.digest == "" {
gen.Log.Info().Str("component", "dedupe").Msg("no digests left, finished")
gen.done = true
run.done.Store(true)
gen.checkCompletion(run)
return nil, nil //nolint:nilnil
}
@@ -965,12 +1038,25 @@ func (gen *DedupeTaskGenerator) Next() (scheduler.Task, error) {
// mark digest as processed before running its task
gen.lastDigests = append(gen.lastDigests, gen.digest)
// For restore passes, track each task so the marker is only written after all succeed.
var onDone func()
if !gen.Dedupe && gen.OnRestoreComplete != nil {
run.pendingTaskCount.Add(1)
onDone = func() {
if run.pendingTaskCount.Add(-1) == 0 {
gen.checkCompletion(run)
}
}
}
// generate rebuild dedupe task for this digest
return newDedupeTask(gen.ImgStore, gen.digest, gen.Dedupe, gen.duplicateBlobs, gen.Log), nil
return newDedupeTask(gen.ImgStore, gen.digest, gen.Dedupe, gen.duplicateBlobs, gen.Log, onDone), nil
}
func (gen *DedupeTaskGenerator) IsDone() bool {
return gen.done
return gen.getRun().done.Load()
}
func (gen *DedupeTaskGenerator) IsReady() bool {
@@ -978,11 +1064,21 @@ func (gen *DedupeTaskGenerator) IsReady() bool {
}
func (gen *DedupeTaskGenerator) Reset() {
run := gen.getRun()
// Only start a fresh run if the current one has no in-flight tasks (or completion
// isn't tracked at all). If tasks are still executing, keep the same run state so
// their onDone callbacks can still drive checkCompletion to fire OnRestoreComplete
// once they finish; replacing gen.run here would otherwise make that run's
// completion unobservable forever.
if gen.OnRestoreComplete == nil || (run.done.Load() && run.pendingTaskCount.Load() == 0) {
gen.run.Store(&restoreRunState{})
}
gen.lastDigests = []godigest.Digest{}
gen.duplicateBlobs = []string{}
gen.repos = []string{}
gen.digest = ""
gen.done = false
}
type dedupeTask struct {
@@ -993,12 +1089,21 @@ type dedupeTask struct {
duplicateBlobs []string
dedupe bool
log zlog.Logger
// onDone is called when this restore task succeeds. nil for dedupe (non-restore) tasks.
onDone func()
}
func newDedupeTask(imgStore storageTypes.ImageStore, digest godigest.Digest, dedupe bool,
duplicateBlobs []string, log zlog.Logger,
duplicateBlobs []string, log zlog.Logger, onDone func(),
) *dedupeTask {
return &dedupeTask{imgStore, digest, duplicateBlobs, dedupe, log}
return &dedupeTask{
imgStore: imgStore,
digest: digest,
duplicateBlobs: duplicateBlobs,
dedupe: dedupe,
log: log,
onDone: onDone,
}
}
func (dt *dedupeTask) DoWork(ctx context.Context) error {
@@ -1008,9 +1113,16 @@ func (dt *dedupeTask) DoWork(ctx context.Context) error {
// log it
dt.log.Error().Err(err).Str("digest", dt.digest.String()).Str("component", "dedupe").
Msg("failed to rebuild digest")
return err
}
return err
// Signal successful completion so the generator can track when all restores are done.
if dt.onDone != nil {
dt.onDone()
}
return nil
}
func (dt *dedupeTask) String() string {
+259
View File
@@ -2,10 +2,14 @@ package storage_test
import (
"bytes"
"context"
"encoding/json"
"errors"
"os"
"strings"
"sync"
"testing"
"time"
godigest "github.com/opencontainers/go-digest"
ispec "github.com/opencontainers/image-spec/specs-go/v1"
@@ -19,6 +23,7 @@ import (
common "zotregistry.dev/zot/v2/pkg/storage/common"
"zotregistry.dev/zot/v2/pkg/storage/imagestore"
"zotregistry.dev/zot/v2/pkg/storage/local"
tcommon "zotregistry.dev/zot/v2/pkg/test/common"
. "zotregistry.dev/zot/v2/pkg/test/image-utils"
"zotregistry.dev/zot/v2/pkg/test/mocks"
)
@@ -611,6 +616,260 @@ func TestDedupeGeneratorErrors(t *testing.T) {
})
}
// TestDedupeTaskGeneratorRestoreComplete covers the restore-completion tracking of
// common.DedupeTaskGenerator: the callback firing once after a successful run, Reset()
// keeping the run alive while a task is in-flight, panic recovery around the callback,
// and concurrent first use of the lazily created run state.
func TestDedupeTaskGeneratorRestoreComplete(t *testing.T) {
	testLog := log.NewTestLogger()

	// singleDigestStore returns a mocked image store with one repository whose first
	// digest lookup yields (digest, blobPaths) and whose later lookups yield nothing.
	// Running dedupe/restore for any digest always succeeds.
	singleDigestStore := func(digest godigest.Digest, blobPaths []string) *mocks.MockedImageStore {
		lookups := 0

		return &mocks.MockedImageStore{
			GetRepositoriesFn: func() ([]string, error) {
				return []string{"repo1"}, nil
			},
			GetNextDigestWithBlobPathsFn: func(repos []string, lastDigests []godigest.Digest) (
				godigest.Digest, []string, error,
			) {
				lookups++
				if lookups != 1 {
					return "", nil, nil
				}

				return digest, blobPaths, nil
			},
			RunDedupeForDigestFn: func(ctx context.Context, digest godigest.Digest, dedupe bool,
				duplicateBlobs []string,
			) error {
				return nil
			},
		}
	}

	// countingCallback returns an OnRestoreComplete callback that counts its invocations
	// and closes the returned channel, together with a getter for the current count.
	countingCallback := func() (func(), chan struct{}, func() int) {
		var (
			mu    sync.Mutex
			count int
		)

		signal := make(chan struct{})

		onComplete := func() {
			mu.Lock()
			count++
			mu.Unlock()
			close(signal)
		}

		calls := func() int {
			mu.Lock()
			defer mu.Unlock()

			return count
		}

		return onComplete, signal, calls
	}

	// waitForCompletion blocks until signal is closed and fails the test after 5 seconds.
	waitForCompletion := func(signal <-chan struct{}) {
		select {
		case <-signal:
		case <-time.After(5 * time.Second):
			t.Fatal("timed out waiting for OnRestoreComplete")
		}
	}

	Convey("OnRestoreComplete fires once after all restore tasks finish successfully", t, func(c C) {
		store := singleDigestStore(godigest.FromString("blob1"), []string{"/repo/blob1-a", "/repo/blob1-b"})
		onComplete, completed, calls := countingCallback()

		generator := &common.DedupeTaskGenerator{
			ImgStore:          store,
			Dedupe:            false,
			Log:               testLog,
			OnRestoreComplete: onComplete,
		}

		// first Next(): generates the restore task, pendingTaskCount becomes 1
		restoreTask, err := generator.Next()
		So(err, ShouldBeNil)
		So(restoreTask, ShouldNotBeNil)
		So(generator.IsDone(), ShouldBeFalse)

		// running the task triggers onDone(), bringing pendingTaskCount back to 0;
		// the run isn't marked done yet so this is a no-op for checkCompletion
		err = restoreTask.DoWork(context.Background())
		So(err, ShouldBeNil)

		// second Next(): no more digests, marks the run as done and triggers checkCompletion
		restoreTask, err = generator.Next()
		So(err, ShouldBeNil)
		So(restoreTask, ShouldBeNil)
		So(generator.IsDone(), ShouldBeTrue)

		// OnRestoreComplete is dispatched asynchronously
		waitForCompletion(completed)
		So(calls(), ShouldEqual, 1)

		// Reset() starts a fresh run since this one completed with no pending tasks
		generator.Reset()
		So(generator.IsDone(), ShouldBeFalse)
	})

	Convey("Reset keeps the same run while a restore task is in-flight", t, func(c C) {
		store := singleDigestStore(godigest.FromString("blob2"), []string{"/repo/blob2-a"})
		onComplete, completed, calls := countingCallback()

		generator := &common.DedupeTaskGenerator{
			ImgStore:          store,
			Dedupe:            false,
			Log:               testLog,
			OnRestoreComplete: onComplete,
		}

		// first Next(): generates the restore task, pendingTaskCount becomes 1, task not run yet
		inFlightTask, err := generator.Next()
		So(err, ShouldBeNil)
		So(inFlightTask, ShouldNotBeNil)

		// second Next(): no more digests, marks the run as done; checkCompletion is a no-op
		// because the task above hasn't finished yet (pendingTaskCount == 1)
		_, err = generator.Next()
		So(err, ShouldBeNil)
		So(generator.IsDone(), ShouldBeTrue)

		// Reset() must keep the same run state, since OnRestoreComplete hasn't fired yet
		generator.Reset()
		So(generator.IsDone(), ShouldBeTrue)

		// finishing the in-flight task now drives checkCompletion to fire OnRestoreComplete
		err = inFlightTask.DoWork(context.Background())
		So(err, ShouldBeNil)

		waitForCompletion(completed)
		So(calls(), ShouldEqual, 1)
	})

	Convey("checkCompletion recovers if OnRestoreComplete panics", t, func(c C) {
		const wantLog = "panic in OnRestoreComplete"

		store := singleDigestStore(godigest.FromString("blob3"), []string{"/repo/blob3-a"})
		logBuf := tcommon.NewThreadSafeLogBuffer()
		panicLog := log.NewLoggerWithWriter("debug", logBuf)
		completed := make(chan struct{})

		generator := &common.DedupeTaskGenerator{
			ImgStore: store,
			Dedupe:   false,
			Log:      panicLog,
			OnRestoreComplete: func() {
				defer close(completed)

				panic("boom")
			},
		}

		restoreTask, err := generator.Next()
		So(err, ShouldBeNil)
		So(restoreTask, ShouldNotBeNil)

		err = restoreTask.DoWork(context.Background())
		So(err, ShouldBeNil)

		_, err = generator.Next()
		So(err, ShouldBeNil)
		So(generator.IsDone(), ShouldBeTrue)

		waitForCompletion(completed)

		// The panic is logged after the callback's own defer has closed the channel,
		// so poll the log buffer for a short while before asserting on it.
		for attempt := 0; attempt < 100; attempt++ {
			if strings.Contains(logBuf.String(), wantLog) {
				break
			}

			time.Sleep(10 * time.Millisecond)
		}

		So(logBuf.String(), ShouldContainSubstring, wantLog)
	})

	Convey("getRun is safe when the first call races across goroutines", t, func(c C) {
		const numGoroutines = 50

		generator := &common.DedupeTaskGenerator{
			Log: log.NewTestLogger(),
		}

		var workers sync.WaitGroup

		// All goroutines block on release so that their first IsDone() calls overlap.
		release := make(chan struct{})

		for worker := 0; worker < numGoroutines; worker++ {
			workers.Go(func() {
				<-release
				generator.IsDone()
			})
		}

		close(release)
		workers.Wait()

		So(generator.IsDone(), ShouldBeFalse)
	})
}
func TestPruneImageManifestsFromIndexMissingIndex(t *testing.T) {
log := log.NewTestLogger()
+11
View File
@@ -26,4 +26,15 @@ const (
S3StorageDriverName = "s3"
GCSStorageDriverName = "gcs"
LocalStorageDriverName = "local"
// DedupeRestoreCompleteMarker is written at the image store root when a full dedupe-restore
// pass has completed. Its presence means no deduped blobs remain, so subsequent startups
// with dedupe=false can skip the expensive per-digest restore scan. The marker is deleted
// whenever dedupe is re-enabled, so that the next dedupe→false transition reruns restore.
DedupeRestoreCompleteMarker = "_restore_complete"
// DedupeRestoreMarkerComplete is the content of DedupeRestoreCompleteMarker when a restore
// pass has completed successfully.
DedupeRestoreMarkerComplete = "1"
// DedupeRestoreMarkerInvalid is the content written to DedupeRestoreCompleteMarker to
// invalidate a previous completion, forcing the restore scan to run again.
DedupeRestoreMarkerInvalid = "0"
)
+94 -7
View File
@@ -2268,7 +2268,11 @@ func (is *ImageStore) restoreDedupedBlobs(ctx context.Context, digest godigest.D
// if we find a deduped blob, then copy original blob content to deduped one
if binfo.Size() == 0 {
// move content from original blob to deduped one
// Read the original blob content without holding the write lock - this can be a
// large S3 GET and must not stall concurrent push operations.
// Note: this buffers the whole blob in memory, which can spike memory usage for
// large layers when many restore tasks run concurrently. Consider streaming the
// copy instead in a follow-up; that refactor is heavier and riskier than this fix.
buf, err := is.storeDriver.ReadFile(originalBlob)
if err != nil {
is.log.Error().Err(err).Str("path", originalBlob).Str("component", "dedupe").
@@ -2277,7 +2281,33 @@ func (is *ImageStore) restoreDedupedBlobs(ctx context.Context, digest godigest.D
return err
}
_, err = is.storeDriver.WriteFile(blobPath, buf)
// Hold the write lock only for the actual blob write so that concurrent
// CheckBlob (read lock) and FinishBlobUpload (write lock) are not starved
// by the preceding slow S3 reads.
err = func() error {
var lockLatency time.Time
is.Lock(&lockLatency)
defer is.Unlock(&lockLatency)
// Re-check size inside the lock: another goroutine may have already
// restored or uploaded this blob between our Stat and Lock above.
recheck, serr := is.storeDriver.Stat(blobPath)
if serr == nil {
if recheck.Size() > 0 {
return nil
}
} else {
var pathNotFound driver.PathNotFoundError
if !errors.As(serr, &pathNotFound) {
return serr
}
}
_, err := is.storeDriver.WriteFile(blobPath, buf)
return err
}()
if err != nil {
return err
}
@@ -2293,12 +2323,12 @@ func (is *ImageStore) restoreDedupedBlobs(ctx context.Context, digest godigest.D
func (is *ImageStore) RunDedupeForDigest(ctx context.Context, digest godigest.Digest, dedupe bool,
duplicateBlobs []string,
) error {
var lockLatency time.Time
is.Lock(&lockLatency)
defer is.Unlock(&lockLatency)
if dedupe {
var lockLatency time.Time
is.Lock(&lockLatency)
defer is.Unlock(&lockLatency)
return is.dedupeBlobs(ctx, digest, duplicateBlobs)
}
@@ -2306,12 +2336,69 @@ func (is *ImageStore) RunDedupeForDigest(ctx context.Context, digest godigest.Di
}
// RunDedupeBlobs maintains the restore-complete marker at the image store root and
// submits a DedupeTaskGenerator to the scheduler.
//
// With dedupe enabled the marker is removed (or, if removal fails, overwritten with the
// invalid content) so a later dedupe=false startup runs restore again. With dedupe
// disabled a marker holding the "complete" content makes this method return without
// submitting a generator; otherwise the generator is submitted with a callback that
// writes the marker once the restore pass has completed.
func (is *ImageStore) RunDedupeBlobs(interval time.Duration, sch *scheduler.Scheduler) {
	markerPath := path.Join(is.rootDir, storageConstants.DedupeRestoreCompleteMarker)

	var pathNotFound driver.PathNotFoundError

	if is.dedupe {
		// Dedupe is active: remove the restore-complete marker so that a future dedupe→false
		// transition knows it must run restore again. A missing marker is not an error.
		err := is.storeDriver.Delete(markerPath)
		if err != nil && !errors.As(err, &pathNotFound) {
			is.log.Warn().Err(err).Str("component", "dedupe").
				Msg("failed to remove restore-complete marker")

			// Overwrite with invalid content so future dedupe=false startups won't skip restore.
			_, writeErr := is.storeDriver.WriteFile(markerPath,
				[]byte(storageConstants.DedupeRestoreMarkerInvalid))
			if writeErr != nil {
				is.log.Error().Err(writeErr).Str("component", "dedupe").
					Msg("failed to invalidate restore-complete marker; stale marker may cause incorrect skip on next startup")
			}
		}
	} else {
		// Dedupe is disabled: skip the restore scan if a previous pass already completed.
		// The marker is absent on first run or after dedupe was re-enabled, in which case
		// we must run restore to handle any zero-size blobs left by prior deduplication.
		data, err := is.storeDriver.ReadFile(markerPath)

		switch {
		case err == nil:
			content := strings.TrimSpace(string(data))
			if content == storageConstants.DedupeRestoreMarkerComplete {
				is.log.Info().Str("component", "dedupe").
					Msg("restore-complete marker present, skipping dedupe restore scan")

				return
			}

			is.log.Debug().Str("component", "dedupe").Str("content", content).
				Msg("restore-complete marker present but not complete, continuing with dedupe restore scan")
		case !errors.As(err, &pathNotFound):
			is.log.Warn().Err(err).Str("component", "dedupe").
				Msg("failed to check restore-complete marker; continuing with dedupe restore scan")
		}
	}

	generator := &common.DedupeTaskGenerator{
		ImgStore: is,
		Dedupe:   is.dedupe,
		Log:      is.log,
	}

	if !is.dedupe {
		// Record a finished restore pass so later startups can skip the scan.
		generator.OnRestoreComplete = func() {
			_, err := is.storeDriver.WriteFile(markerPath,
				[]byte(storageConstants.DedupeRestoreMarkerComplete))
			if err != nil {
				is.log.Error().Err(err).Str("component", "dedupe").
					Msg("failed to write restore-complete marker")

				return
			}

			is.log.Info().Str("component", "dedupe").
				Msg("restore-complete marker written; future startups will skip the restore scan")
		}
	}

	sch.SubmitGenerator(generator, interval, scheduler.MediumPriority)
}
+302 -1
View File
@@ -14,10 +14,12 @@ import (
"os"
"path"
"strings"
"sync"
"syscall"
"testing"
"time"
storagedriver "github.com/distribution/distribution/v3/registry/storage/driver"
godigest "github.com/opencontainers/go-digest"
imeta "github.com/opencontainers/image-spec/specs-go"
ispec "github.com/opencontainers/image-spec/specs-go/v1"
@@ -33,6 +35,7 @@ import (
"zotregistry.dev/zot/v2/pkg/storage/cache"
storageConstants "zotregistry.dev/zot/v2/pkg/storage/constants"
"zotregistry.dev/zot/v2/pkg/storage/gc"
"zotregistry.dev/zot/v2/pkg/storage/imagestore"
"zotregistry.dev/zot/v2/pkg/storage/local"
storageTypes "zotregistry.dev/zot/v2/pkg/storage/types"
test "zotregistry.dev/zot/v2/pkg/test/common"
@@ -59,7 +62,12 @@ var DeleteReferrers = config.ImageRetention{ //nolint: gochecknoglobals
},
}
var errCache = errors.New("new cache error")
var (
errCache = errors.New("new cache error")
errRecheckStatFailed = errors.New("recheck stat failed")
errMarkerDeleteFailed = errors.New("delete failed")
errMarkerWriteFailed = errors.New("write failed")
)
func runAndGetScheduler() *scheduler.Scheduler {
log := zlog.NewTestLogger()
@@ -1560,6 +1568,299 @@ func TestDedupeLinks(t *testing.T) {
}
}
func TestDedupeRestoreCompleteMarker(t *testing.T) {
waitForMarker := func(t *testing.T, markerPath, expected string) {
t.Helper()
for range 100 {
data, err := os.ReadFile(markerPath)
if err == nil && strings.TrimSpace(string(data)) == expected {
return
}
time.Sleep(50 * time.Millisecond)
}
t.Fatalf("timed out waiting for marker %q at %s", expected, markerPath)
}
Convey("Restore-complete marker lifecycle", t, func(c C) {
dir := t.TempDir()
logBuf := test.NewThreadSafeLogBuffer()
log := zlog.NewLoggerWithWriter("debug", logBuf)
metrics := monitoring.NewMetricsServer(false, log)
markerPath := path.Join(dir, storageConstants.DedupeRestoreCompleteMarker)
Convey("dedupe=false on an empty store writes the restore-complete marker", func() {
imgStore := local.NewImageStore(dir, false, true, log, metrics, nil, nil, nil, nil)
taskScheduler := runAndGetScheduler()
defer taskScheduler.Shutdown()
imgStore.RunDedupeBlobs(time.Duration(0), taskScheduler)
waitForMarker(t, markerPath, storageConstants.DedupeRestoreMarkerComplete)
taskScheduler.Shutdown()
So(logBuf.String(), ShouldContainSubstring, "restore-complete marker written")
})
Convey("dedupe=false with a complete marker skips the restore scan", func() {
err := os.WriteFile(markerPath, []byte(storageConstants.DedupeRestoreMarkerComplete),
storageConstants.DefaultFilePerms)
So(err, ShouldBeNil)
imgStore := local.NewImageStore(dir, false, true, log, metrics, nil, nil, nil, nil)
taskScheduler := runAndGetScheduler()
defer taskScheduler.Shutdown()
imgStore.RunDedupeBlobs(time.Duration(0), taskScheduler)
taskScheduler.Shutdown()
So(logBuf.String(), ShouldContainSubstring, "skipping dedupe restore scan")
data, err := os.ReadFile(markerPath)
So(err, ShouldBeNil)
So(strings.TrimSpace(string(data)), ShouldEqual, storageConstants.DedupeRestoreMarkerComplete)
})
Convey("dedupe=false with an incomplete marker continues the restore scan", func() {
err := os.WriteFile(markerPath, []byte(storageConstants.DedupeRestoreMarkerInvalid),
storageConstants.DefaultFilePerms)
So(err, ShouldBeNil)
imgStore := local.NewImageStore(dir, false, true, log, metrics, nil, nil, nil, nil)
taskScheduler := runAndGetScheduler()
defer taskScheduler.Shutdown()
imgStore.RunDedupeBlobs(time.Duration(0), taskScheduler)
waitForMarker(t, markerPath, storageConstants.DedupeRestoreMarkerComplete)
taskScheduler.Shutdown()
So(logBuf.String(), ShouldContainSubstring, "restore-complete marker present but not complete")
})
Convey("dedupe=true removes an existing restore-complete marker", func() {
err := os.WriteFile(markerPath, []byte(storageConstants.DedupeRestoreMarkerComplete),
storageConstants.DefaultFilePerms)
So(err, ShouldBeNil)
imgStore := local.NewImageStore(dir, true, true, log, metrics, nil, nil, nil, nil)
taskScheduler := runAndGetScheduler()
defer taskScheduler.Shutdown()
imgStore.RunDedupeBlobs(time.Duration(0), taskScheduler)
taskScheduler.Shutdown()
_, err = os.Stat(markerPath)
So(errors.Is(err, fs.ErrNotExist), ShouldBeTrue)
})
})
}
// recheckDriver is a test double around local.Driver. For one chosen path it answers
// the second Stat call with a canned result, which lets a test dictate what
// restoreDedupedBlobs observes when it re-checks the blob under the write lock.
type recheckDriver struct {
	*local.Driver
	// targetPath is the only path whose Stat calls are counted and intercepted.
	targetPath string
	// mu guards statCalls.
	mu sync.Mutex
	// statCalls is the number of Stat calls seen so far for targetPath.
	statCalls int
	// recheckSize is the size reported by the intercepted call when recheckErr is nil.
	recheckSize int64
	// recheckErr, when non-nil, is returned by the intercepted call instead of a FileInfo.
	recheckErr error
}
// Stat delegates to the wrapped driver, except for the second call made for
// d.targetPath: that call returns d.recheckErr if set, otherwise a synthetic FileInfo
// for the path with size d.recheckSize.
func (d *recheckDriver) Stat(path string) (storagedriver.FileInfo, error) {
	if path != d.targetPath {
		return d.Driver.Stat(path)
	}

	d.mu.Lock()
	d.statCalls++
	isRecheck := d.statCalls == 2
	d.mu.Unlock()

	if !isRecheck {
		return d.Driver.Stat(path)
	}

	if d.recheckErr != nil {
		return nil, d.recheckErr
	}

	return storagedriver.FileInfoInternal{
		FileInfoFields: storagedriver.FileInfoFields{Path: path, Size: d.recheckSize},
	}, nil
}
func TestRestoreDedupedBlobsRecheck(t *testing.T) {
Convey("restoreDedupedBlobs re-checks blob size under the write lock", t, func() {
log := zlog.NewTestLogger()
metrics := monitoring.NewMetricsServer(false, log)
Convey("a blob restored by a concurrent task is left untouched", func() {
dir := t.TempDir()
blobsDir := path.Join(dir, "test", "blobs", "sha256")
err := os.MkdirAll(blobsDir, storageConstants.DefaultDirPerms)
So(err, ShouldBeNil)
content := []byte("original content")
digest := godigest.FromBytes(content)
originalPath := path.Join(blobsDir, digest.Encoded())
err = os.WriteFile(originalPath, content, storageConstants.DefaultFilePerms)
So(err, ShouldBeNil)
dupPath := path.Join(blobsDir, "duplicate")
err = os.WriteFile(dupPath, []byte{}, storageConstants.DefaultFilePerms)
So(err, ShouldBeNil)
testDriver := &recheckDriver{
Driver: local.New(true),
targetPath: dupPath,
recheckSize: int64(len(content)),
}
imgStore := imagestore.NewImageStore(dir, dir, false, true, log, metrics, nil, testDriver, nil, nil, nil)
err = imgStore.RunDedupeForDigest(context.Background(), digest, false, []string{originalPath, dupPath})
So(err, ShouldBeNil)
data, err := os.ReadFile(dupPath)
So(err, ShouldBeNil)
So(data, ShouldBeEmpty)
})
Convey("a stat error on the re-check is propagated", func() {
dir := t.TempDir()
blobsDir := path.Join(dir, "test", "blobs", "sha256")
err := os.MkdirAll(blobsDir, storageConstants.DefaultDirPerms)
So(err, ShouldBeNil)
content := []byte("original content")
digest := godigest.FromBytes(content)
originalPath := path.Join(blobsDir, digest.Encoded())
err = os.WriteFile(originalPath, content, storageConstants.DefaultFilePerms)
So(err, ShouldBeNil)
dupPath := path.Join(blobsDir, "duplicate")
err = os.WriteFile(dupPath, []byte{}, storageConstants.DefaultFilePerms)
So(err, ShouldBeNil)
testDriver := &recheckDriver{
Driver: local.New(true),
targetPath: dupPath,
recheckErr: errRecheckStatFailed,
}
imgStore := imagestore.NewImageStore(dir, dir, false, true, log, metrics, nil, testDriver, nil, nil, nil)
err = imgStore.RunDedupeForDigest(context.Background(), digest, false, []string{originalPath, dupPath})
So(err, ShouldEqual, errRecheckStatFailed)
})
})
}
// errMarkerDriver is a test double around local.Driver that can fail Delete and
// WriteFile calls aimed at the dedupe restore-complete marker file.
type errMarkerDriver struct {
	*local.Driver
	// deleteErr, when non-nil, is returned by Delete for marker paths.
	deleteErr error
	// writeErr, when non-nil, is returned by WriteFile for marker paths.
	writeErr error
}
// Delete returns d.deleteErr when it is set and path contains the restore-complete
// marker name; every other call is forwarded to the wrapped driver.
func (d *errMarkerDriver) Delete(path string) error {
	if d.deleteErr == nil || !strings.Contains(path, storageConstants.DedupeRestoreCompleteMarker) {
		return d.Driver.Delete(path)
	}

	return d.deleteErr
}
// WriteFile returns (0, d.writeErr) when d.writeErr is set and filepath contains the
// restore-complete marker name; every other call is forwarded to the wrapped driver.
func (d *errMarkerDriver) WriteFile(filepath string, content []byte) (int, error) {
	if d.writeErr == nil || !strings.Contains(filepath, storageConstants.DedupeRestoreCompleteMarker) {
		return d.Driver.WriteFile(filepath, content)
	}

	return 0, d.writeErr
}
// TestRunDedupeBlobsMarkerDeleteError checks how RunDedupeBlobs (dedupe enabled) copes
// with a storage driver that cannot delete the restore-complete marker: it must fall
// back to overwriting the marker with the invalid content, and log when that fails too.
func TestRunDedupeBlobsMarkerDeleteError(t *testing.T) {
	Convey("RunDedupeBlobs handles errors removing the restore-complete marker", t, func() {
		dir := t.TempDir()
		markerPath := path.Join(dir, storageConstants.DedupeRestoreCompleteMarker)

		// Start from a store that carries a "complete" marker.
		err := os.WriteFile(markerPath, []byte(storageConstants.DedupeRestoreMarkerComplete),
			storageConstants.DefaultFilePerms)
		So(err, ShouldBeNil)

		Convey("falls back to writing an invalid marker when delete fails", func() {
			failingDriver := &errMarkerDriver{
				Driver:    local.New(true),
				deleteErr: errMarkerDeleteFailed,
			}

			logBuf := test.NewThreadSafeLogBuffer()
			log := zlog.NewLoggerWithWriter("debug", logBuf)
			metrics := monitoring.NewMetricsServer(false, log)

			imgStore := imagestore.NewImageStore(dir, dir, true, true, log, metrics, nil, failingDriver, nil, nil, nil)

			taskScheduler := runAndGetScheduler()
			defer taskScheduler.Shutdown()

			imgStore.RunDedupeBlobs(time.Duration(0), taskScheduler)

			So(logBuf.String(), ShouldContainSubstring, "failed to remove restore-complete marker")

			// The marker still exists but now holds the invalid content.
			data, err := os.ReadFile(markerPath)
			So(err, ShouldBeNil)
			So(strings.TrimSpace(string(data)), ShouldEqual, storageConstants.DedupeRestoreMarkerInvalid)
		})

		Convey("logs when even the invalid-marker write fails", func() {
			failingDriver := &errMarkerDriver{
				Driver:    local.New(true),
				deleteErr: errMarkerDeleteFailed,
				writeErr:  errMarkerWriteFailed,
			}

			logBuf := test.NewThreadSafeLogBuffer()
			log := zlog.NewLoggerWithWriter("debug", logBuf)
			metrics := monitoring.NewMetricsServer(false, log)

			imgStore := imagestore.NewImageStore(dir, dir, true, true, log, metrics, nil, failingDriver, nil, nil, nil)

			taskScheduler := runAndGetScheduler()
			defer taskScheduler.Shutdown()

			imgStore.RunDedupeBlobs(time.Duration(0), taskScheduler)

			output := logBuf.String()
			So(output, ShouldContainSubstring, "failed to remove restore-complete marker")
			So(logBuf.String(), ShouldContainSubstring, "failed to invalidate restore-complete marker")
		})
	})
}
func TestDedupe(t *testing.T) {
Convey("Dedupe", t, func(c C) {
Convey("Nil ImageStore", func() {