From 80081bb012bf4fc9b638f3e0859fb6cabff12401 Mon Sep 17 00:00:00 2001 From: Andrei Aaron Date: Fri, 4 Jul 2025 19:12:18 +0300 Subject: [PATCH] fix: GetNextRepository to use a list already scanned repositories as input (#3230) Using just the last repository is not enough as in the case when it is deleted (either by GC or some other way), GetNextRepository returns empty string causing the generator to be marked completed without any errors. An alternative would have been to start over from the first repository, but this can take hours if multiple repositories need to be deleted, not to mention the processing power and I/O and S3 load this could take. Signed-off-by: Andrei Aaron --- pkg/extensions/extension_scrub.go | 30 +++-- pkg/extensions/monitoring/monitoring_test.go | 13 +- pkg/storage/common/common.go | 37 +++-- pkg/storage/gc/gc.go | 27 ++-- pkg/storage/gc/gc_test.go | 134 +++++++++++++++++++ pkg/storage/imagestore/imagestore.go | 32 ++--- pkg/storage/local/local_test.go | 34 ++++- pkg/storage/s3/s3_test.go | 4 +- pkg/storage/types/types.go | 2 +- pkg/test/mocks/image_store_mock.go | 6 +- 10 files changed, 244 insertions(+), 75 deletions(-) diff --git a/pkg/extensions/extension_scrub.go b/pkg/extensions/extension_scrub.go index 37278261..8ef1127e 100644 --- a/pkg/extensions/extension_scrub.go +++ b/pkg/extensions/extension_scrub.go @@ -28,18 +28,26 @@ func EnableScrubExtension(config *config.Config, log log.Logger, storeController log.Warn().Msg("scrub interval set to too-short interval < 2h, changing scrub duration to 2 hours and continuing.") //nolint:lll // gofumpt conflicts with lll } + processedRepos := make(map[string]struct{}) + generator := &taskGenerator{ - imgStore: storeController.DefaultStore, - log: log, + imgStore: storeController.DefaultStore, + log: log, + processedRepos: processedRepos, } + sch.SubmitGenerator(generator, config.Extensions.Scrub.Interval, scheduler.LowPriority) if config.Storage.SubPaths != nil { for route := range config.Storage.SubPaths { + processedRepos := make(map[string]struct{}) + generator := &taskGenerator{ - imgStore: storeController.SubStore[route], - log: log, + imgStore: storeController.SubStore[route], + log: log, + processedRepos: processedRepos, } + sch.SubmitGenerator(generator, config.Extensions.Scrub.Interval, scheduler.LowPriority) } } @@ -49,10 +57,10 @@ func EnableScrubExtension(config *config.Config, log log.Logger, storeController } type taskGenerator struct { - imgStore storageTypes.ImageStore - log log.Logger - lastRepo string - done bool + imgStore storageTypes.ImageStore + log log.Logger + processedRepos map[string]struct{} + done bool } func (gen *taskGenerator) Name() string { @@ -60,7 +68,7 @@ func (gen *taskGenerator) Name() string { } func (gen *taskGenerator) Next() (scheduler.Task, error) { - repo, err := gen.imgStore.GetNextRepository(gen.lastRepo) + repo, err := gen.imgStore.GetNextRepository(gen.processedRepos) if err != nil { return nil, err } @@ -71,7 +79,7 @@ func (gen *taskGenerator) Next() (scheduler.Task, error) { return nil, nil //nolint:nilnil } - gen.lastRepo = repo + gen.processedRepos[repo] = struct{}{} return scrub.NewTask(gen.imgStore, repo, gen.log), nil } @@ -85,6 +93,6 @@ func (gen *taskGenerator) IsReady() bool { } func (gen *taskGenerator) Reset() { - gen.lastRepo = "" + gen.processedRepos = make(map[string]struct{}) gen.done = false } diff --git a/pkg/extensions/monitoring/monitoring_test.go b/pkg/extensions/monitoring/monitoring_test.go index 64469834..cd68e90f 100644 --- a/pkg/extensions/monitoring/monitoring_test.go +++ b/pkg/extensions/monitoring/monitoring_test.go @@ -467,12 +467,13 @@ func TestPopulateStorageMetrics(t *testing.T) { sch := scheduler.NewScheduler(conf, metrics, ctlr.Log) sch.RunScheduler() - generator := &common.StorageMetricsInitGenerator{ - ImgStore: ctlr.StoreController.DefaultStore, - Metrics: ctlr.Metrics, - Log: ctlr.Log, - MaxDelay: 1, // maximum delay between jobs (each job computes repo's storage size) - } + generator := common.NewStorageMetricsInitGenerator( + ctlr.StoreController.DefaultStore, + ctlr.Metrics, + ctlr.Log, + ) + + generator.MaxDelay = 1 // maximum delay between jobs (each job computes repo's storage size) sch.SubmitGenerator(generator, time.Duration(0), scheduler.LowPriority) diff --git a/pkg/storage/common/common.go b/pkg/storage/common/common.go index 92d4a7bb..18113008 100644 --- a/pkg/storage/common/common.go +++ b/pkg/storage/common/common.go @@ -988,15 +988,28 @@ func (dt *dedupeTask) Name() string { return "DedupeTask" } +func NewStorageMetricsInitGenerator(imgStore storageTypes.ImageStore, metrics monitoring.MetricServer, log zlog.Logger, +) *StorageMetricsInitGenerator { + processedRepos := make(map[string]struct{}) + + return &StorageMetricsInitGenerator{ + ImgStore: imgStore, + Metrics: metrics, + Log: log, + processedRepos: processedRepos, + MaxDelay: 15, //nolint:mnd + } +} + type StorageMetricsInitGenerator struct { - ImgStore storageTypes.ImageStore - done bool - Metrics monitoring.MetricServer - lastRepo string - nextRun time.Time - rand *rand.Rand - Log zlog.Logger - MaxDelay int + ImgStore storageTypes.ImageStore + done bool + Metrics monitoring.MetricServer + processedRepos map[string]struct{} + nextRun time.Time + rand *rand.Rand + Log zlog.Logger + MaxDelay int } func (gen *StorageMetricsInitGenerator) Name() string { @@ -1004,7 +1017,7 @@ func (gen *StorageMetricsInitGenerator) Name() string { } func (gen *StorageMetricsInitGenerator) Next() (scheduler.Task, error) { - if gen.lastRepo == "" && gen.nextRun.IsZero() { + if len(gen.processedRepos) == 0 && gen.nextRun.IsZero() { gen.rand = rand.New(rand.NewSource(time.Now().UTC().UnixNano())) //nolint: gosec } @@ -1012,7 +1025,7 @@ func (gen *StorageMetricsInitGenerator) Next() (scheduler.Task, error) { gen.nextRun = time.Now().Add(time.Duration(delay) * time.Second) - repo, err := gen.ImgStore.GetNextRepository(gen.lastRepo) + repo, err := gen.ImgStore.GetNextRepository(gen.processedRepos) if err != nil { return nil, err } @@ -1025,7 +1038,7 @@ func (gen *StorageMetricsInitGenerator) Next() (scheduler.Task, error) { return nil, nil //nolint:nilnil } - gen.lastRepo = repo + gen.processedRepos[repo] = struct{}{} return NewStorageMetricsTask(gen.ImgStore, gen.Metrics, repo, gen.Log), nil } @@ -1039,7 +1052,7 @@ func (gen *StorageMetricsInitGenerator) IsReady() bool { } func (gen *StorageMetricsInitGenerator) Reset() { - gen.lastRepo = "" + gen.processedRepos = make(map[string]struct{}) gen.done = false gen.nextRun = time.Time{} } diff --git a/pkg/storage/gc/gc.go b/pkg/storage/gc/gc.go index c9638640..c578c766 100644 --- a/pkg/storage/gc/gc.go +++ b/pkg/storage/gc/gc.go @@ -67,9 +67,12 @@ CleanImageStorePeriodically runs a periodic garbage collect on the ImageStore pr given an interval and a Scheduler. */ func (gc GarbageCollect) CleanImageStorePeriodically(interval time.Duration, sch *scheduler.Scheduler) { + processedRepos := make(map[string]struct{}) + generator := &GCTaskGenerator{ - imgStore: gc.imgStore, - gc: gc, + imgStore: gc.imgStore, + gc: gc, + processedRepos: processedRepos, } sch.SubmitGenerator(generator, interval, scheduler.MediumPriority) @@ -798,12 +801,12 @@ and it will execute garbage collection for each repository by creating a task for each repository and pushing it to the task scheduler. */ type GCTaskGenerator struct { - imgStore types.ImageStore - gc GarbageCollect - lastRepo string - nextRun time.Time - done bool - rand *rand.Rand + imgStore types.ImageStore + gc GarbageCollect + processedRepos map[string]struct{} + nextRun time.Time + done bool + rand *rand.Rand } func (gen *GCTaskGenerator) getRandomDelay() int { @@ -817,7 +820,7 @@ func (gen *GCTaskGenerator) Name() string { } func (gen *GCTaskGenerator) Next() (scheduler.Task, error) { - if gen.lastRepo == "" && gen.nextRun.IsZero() { + if len(gen.processedRepos) == 0 && gen.nextRun.IsZero() { gen.rand = rand.New(rand.NewSource(time.Now().UTC().UnixNano())) //nolint: gosec } @@ -825,7 +828,7 @@ func (gen *GCTaskGenerator) Next() (scheduler.Task, error) { gen.nextRun = time.Now().Add(time.Duration(delay) * time.Second) - repo, err := gen.imgStore.GetNextRepository(gen.lastRepo) + repo, err := gen.imgStore.GetNextRepository(gen.processedRepos) if err != nil { return nil, err } @@ -836,7 +839,7 @@ func (gen *GCTaskGenerator) Next() (scheduler.Task, error) { return nil, nil //nolint:nilnil } - gen.lastRepo = repo + gen.processedRepos[repo] = struct{}{} return NewGCTask(gen.imgStore, gen.gc, repo), nil } @@ -850,7 +853,7 @@ func (gen *GCTaskGenerator) IsReady() bool { } func (gen *GCTaskGenerator) Reset() { - gen.lastRepo = "" + gen.processedRepos = make(map[string]struct{}) gen.done = false gen.nextRun = time.Time{} } diff --git a/pkg/storage/gc/gc_test.go b/pkg/storage/gc/gc_test.go index e02e7d31..7394a19e 100644 --- a/pkg/storage/gc/gc_test.go +++ b/pkg/storage/gc/gc_test.go @@ -563,6 +563,73 @@ func TestGarbageCollectAndRetentionMetaDB(t *testing.T) { So(repos, ShouldContain, "retention") }) + Convey("gc all tags, untagged, and afterwards referrers using GetNextRepository()", func() { + gc := gc.NewGarbageCollect(imgStore, metaDB, gc.Options{ + Delay: 1 * time.Millisecond, + ImageRetention: config.ImageRetention{ + Delay: 1 * time.Millisecond, + Policies: []config.RetentionPolicy{ + { + Repositories: []string{"gc-test1", "gc-test3"}, + DeleteReferrers: true, + DeleteUntagged: &trueVal, + KeepTags: []config.KeepTagsPolicy{ + { + Patterns: []string{"v1"}, // should not match any tag + }, + }, + }, + }, + }, + }, audit, log) + + processedRepos := make(map[string]struct{}) + expectedRepos := []string{"gc-test1", "gc-test2", "gc-test3", "gc-test4", "retention"} + + for i := range 10 { + t.Logf("index %d, processed repos %v", i, processedRepos) + + // we need to check if GetNextRepository returns each repository just once, and empty string afterwards + repo, err := imgStore.GetNextRepository(processedRepos) + t.Logf("index %d, repo '%s'", i, repo) + So(err, ShouldBeNil) + + if i >= 5 { + So(repo, ShouldEqual, "") + + continue + } else { + So(repo, ShouldEqual, expectedRepos[i]) + } + + processedRepos[repo] = struct{}{} + + // run cleanRepo, this should not impact the list of calls necessary for + // GetNextRepository to iterate through every repo + err = gc.CleanRepo(ctx, repo) + So(err, ShouldBeNil) + } + + // verify one more time the returned values + So(len(processedRepos), ShouldEqual, len(expectedRepos)) + + for _, repo := range expectedRepos { + So(processedRepos, ShouldContainKey, repo) + } + + _, _, _, err = imgStore.GetImageManifest("gc-test1", gcUntagged1.DigestStr()) + So(err, ShouldNotBeNil) + + // now repos should get gc'ed + repos, err := imgStore.GetRepositories() + So(err, ShouldBeNil) + So(repos, ShouldNotContain, "gc-test1") + So(repos, ShouldContain, "gc-test2") + So(repos, ShouldNotContain, "gc-test3") + So(repos, ShouldContain, "gc-test4") + So(repos, ShouldContain, "retention") + }) + Convey("gc with dry-run all tags, untagged, and afterwards referrers", func() { gc := gc.NewGarbageCollect(imgStore, metaDB, gc.Options{ Delay: 1 * time.Millisecond, @@ -1924,6 +1991,73 @@ func TestGarbageCollectAndRetentionNoMetaDB(t *testing.T) { So(repos, ShouldContain, "retention") }) + Convey("gc all tags, untagged, and afterwards referrers using GetNextRepository()", func() { + gc := gc.NewGarbageCollect(imgStore, metaDB, gc.Options{ + Delay: 1 * time.Millisecond, + ImageRetention: config.ImageRetention{ + Delay: 1 * time.Millisecond, + Policies: []config.RetentionPolicy{ + { + Repositories: []string{"gc-test1", "gc-test3"}, + DeleteReferrers: true, + DeleteUntagged: &trueVal, + KeepTags: []config.KeepTagsPolicy{ + { + Patterns: []string{"v1"}, // should not match any tag + }, + }, + }, + }, + }, + }, audit, log) + + processedRepos := make(map[string]struct{}) + expectedRepos := []string{"gc-test1", "gc-test2", "gc-test3", "gc-test4", "retention"} + + for i := range 10 { + t.Logf("index %d, processed repos %v", i, processedRepos) + + // we need to check if GetNextRepository returns each repository just once, and empty string afterwards + repo, err := imgStore.GetNextRepository(processedRepos) + t.Logf("index %d, repo '%s'", i, repo) + So(err, ShouldBeNil) + + if i >= 5 { + So(repo, ShouldEqual, "") + + continue + } else { + So(repo, ShouldEqual, expectedRepos[i]) + } + + processedRepos[repo] = struct{}{} + + // run cleanRepo, this should not impact the list of calls necessary for + // GetNextRepository to iterate through every repo + err = gc.CleanRepo(ctx, repo) + So(err, ShouldBeNil) + } + + // verify one more time the returned values + So(len(processedRepos), ShouldEqual, len(expectedRepos)) + + for _, repo := range expectedRepos { + So(processedRepos, ShouldContainKey, repo) + } + + _, _, _, err = imgStore.GetImageManifest("gc-test1", gcUntagged1.DigestStr()) + So(err, ShouldNotBeNil) + + // now repos should get gc'ed + repos, err := imgStore.GetRepositories() + So(err, ShouldBeNil) + So(repos, ShouldNotContain, "gc-test1") + So(repos, ShouldContain, "gc-test2") + So(repos, ShouldNotContain, "gc-test3") + So(repos, ShouldContain, "gc-test4") + So(repos, ShouldContain, "retention") + }) + Convey("gc with dry-run all tags, untagged, and afterwards referrers", func() { gc := gc.NewGarbageCollect(imgStore, metaDB, gc.Options{ Delay: 1 * time.Millisecond, diff --git a/pkg/storage/imagestore/imagestore.go b/pkg/storage/imagestore/imagestore.go index a2b93ab5..4ae27ae4 100644 --- a/pkg/storage/imagestore/imagestore.go +++ b/pkg/storage/imagestore/imagestore.go @@ -401,7 +401,7 @@ func (is *ImageStore) GetRepositories() ([]string, error) { } // GetNextRepository returns next repository under this store. -func (is *ImageStore) GetNextRepository(repo string) (string, error) { +func (is *ImageStore) GetNextRepository(processedRepos map[string]struct{}) (string, error) { var lockLatency time.Time dir := is.rootDir @@ -422,7 +422,6 @@ func (is *ImageStore) GetNextRepository(repo string) (string, error) { return "", err } - found := false store := "" err = is.storeDriver.Walk(dir, func(fileInfo driver.FileInfo) error { if !fileInfo.IsDir() { @@ -434,28 +433,18 @@ func (is *ImageStore) GetNextRepository(repo string) (string, error) { return nil //nolint:nilerr // ignore paths not relative to root dir } + if _, ok := processedRepos[rel]; ok { + return nil // repo already processed + } + ok, err := is.ValidateRepo(rel) if !ok || err != nil { return nil //nolint:nilerr // ignore invalid repos } - if repo == "" && ok && err == nil { - store = rel + store = rel - return io.EOF - } - - if found { - store = rel - - return io.EOF - } - - if rel == repo { - found = true - } - - return nil + return io.EOF }) driverErr := &driver.Error{} @@ -2160,12 +2149,7 @@ func (is *ImageStore) RunDedupeBlobs(interval time.Duration, sch *scheduler.Sche } func (is *ImageStore) PopulateStorageMetrics(interval time.Duration, sch *scheduler.Scheduler) { - generator := &common.StorageMetricsInitGenerator{ - ImgStore: is, - Metrics: is.metrics, - Log: is.log, - MaxDelay: 15, //nolint:mnd - } + generator := common.NewStorageMetricsInitGenerator(is, is.metrics, is.log) sch.SubmitGenerator(generator, interval, scheduler.HighPriority) } diff --git a/pkg/storage/local/local_test.go b/pkg/storage/local/local_test.go index 5a02cffc..fb35ee30 100644 --- a/pkg/storage/local/local_test.go +++ b/pkg/storage/local/local_test.go @@ -2929,6 +2929,7 @@ func TestGetNextRepository(t *testing.T) { imgStore := local.NewImageStore(dir, true, true, log, metrics, nil, cacheDriver, nil, nil) firstRepoName := "repo1" secondRepoName := "repo2" + missingRepoName := "repo3" srcStorageCtlr := storage.StoreController{DefaultStore: imgStore} image := CreateDefaultImage() @@ -2946,21 +2947,44 @@ func TestGetNextRepository(t *testing.T) { } Convey("Return first repository", t, func() { - firstRepo, err := imgStore.GetNextRepository("") - So(firstRepo, ShouldEqual, firstRepoName) + processedRepos := make(map[string]struct{}, 0) + repo, err := imgStore.GetNextRepository(processedRepos) + So(repo, ShouldEqual, firstRepoName) So(err, ShouldBeNil) }) Convey("Return second repository", t, func() { - secondRepo, err := imgStore.GetNextRepository(firstRepoName) - So(secondRepo, ShouldEqual, secondRepoName) + processedRepos := make(map[string]struct{}, 0) + processedRepos[firstRepoName] = struct{}{} + repo, err := imgStore.GetNextRepository(processedRepos) + So(repo, ShouldEqual, secondRepoName) + So(err, ShouldBeNil) + }) + + Convey("Return valid repository for missing", t, func() { + processedRepos := make(map[string]struct{}, 0) + processedRepos[missingRepoName] = struct{}{} + processedRepos[firstRepoName] = struct{}{} + repo, err := imgStore.GetNextRepository(processedRepos) + So(repo, ShouldEqual, secondRepoName) + So(err, ShouldBeNil) + }) + + Convey("Return empty repository if all were processed", t, func() { + processedRepos := make(map[string]struct{}, 0) + processedRepos[firstRepoName] = struct{}{} + processedRepos[secondRepoName] = struct{}{} + repo, err := imgStore.GetNextRepository(processedRepos) + So(repo, ShouldEqual, "") So(err, ShouldBeNil) }) Convey("Return error", t, func() { + processedRepos := make(map[string]struct{}, 0) + processedRepos[firstRepoName] = struct{}{} err := os.Chmod(imgStore.RootDir(), 0o000) So(err, ShouldBeNil) - _, err = imgStore.GetNextRepository(firstRepoName) + _, err = imgStore.GetNextRepository(processedRepos) So(err, ShouldNotBeNil) err = os.Chmod(imgStore.RootDir(), 0o755) So(err, ShouldBeNil) diff --git a/pkg/storage/s3/s3_test.go b/pkg/storage/s3/s3_test.go index 185b7ee9..a1d4ba25 100644 --- a/pkg/storage/s3/s3_test.go +++ b/pkg/storage/s3/s3_test.go @@ -2245,7 +2245,9 @@ func TestNextRepositoryMockStoreDriver(t *testing.T) { }, }) - nextRepository, err := imgStore.GetNextRepository("testRepo") + processedRepos := make(map[string]struct{}, 0) + processedRepos["testRepo"] = struct{}{} + nextRepository, err := imgStore.GetNextRepository(processedRepos) So(err, ShouldBeNil) So(nextRepository, ShouldEqual, "") }) diff --git a/pkg/storage/types/types.go b/pkg/storage/types/types.go index edc06707..828a2080 100644 --- a/pkg/storage/types/types.go +++ b/pkg/storage/types/types.go @@ -31,7 +31,7 @@ type ImageStore interface { //nolint:interfacebloat InitRepo(name string) error ValidateRepo(name string) (bool, error) GetRepositories() ([]string, error) - GetNextRepository(repo string) (string, error) + GetNextRepository(processedRepos map[string]struct{}) (string, error) GetNextRepositories(repo string, maxEntries int, fn FilterRepoFunc) ([]string, bool, error) GetImageTags(repo string) ([]string, error) GetImageManifest(repo, reference string) ([]byte, godigest.Digest, string, error) diff --git a/pkg/test/mocks/image_store_mock.go b/pkg/test/mocks/image_store_mock.go index 5d125a93..53ca277d 100644 --- a/pkg/test/mocks/image_store_mock.go +++ b/pkg/test/mocks/image_store_mock.go @@ -19,7 +19,7 @@ type MockedImageStore struct { InitRepoFn func(name string) error ValidateRepoFn func(name string) (bool, error) GetRepositoriesFn func() ([]string, error) - GetNextRepositoryFn func(repo string) (string, error) + GetNextRepositoryFn func(processedRepos map[string]struct{}) (string, error) GetNextRepositoriesFn func(lastRepo string, maxEntries int, fn storageTypes.FilterRepoFunc) ([]string, bool, error) GetImageTagsFn func(repo string) ([]string, error) GetImageManifestFn func(repo string, reference string) ([]byte, godigest.Digest, string, error) @@ -132,9 +132,9 @@ func (is MockedImageStore) GetRepositories() ([]string, error) { return []string{}, nil } -func (is MockedImageStore) GetNextRepository(repo string) (string, error) { +func (is MockedImageStore) GetNextRepository(processedRepos map[string]struct{}) (string, error) { if is.GetNextRepositoryFn != nil { - return is.GetNextRepositoryFn(repo) + return is.GetNextRepositoryFn(processedRepos) } return "", nil