fix: GetNextRepository to use a list already scanned repositories as input (#3230)

Using just the last repository is not enough as in the case when it is deleted
(either by GC or some other way), GetNextRepository returns empty string
causing the generator to be marked completed without any errors.

An alternative would have been to start over from the first repository,
but this can take hours if multiple repositories need to be deleted,
not to mention the processing power and I/O and S3 load this could take.

Signed-off-by: Andrei Aaron <aaaron@luxoft.com>
This commit is contained in:
Andrei Aaron
2025-07-04 19:12:18 +03:00
committed by GitHub
parent e8ac21c001
commit 80081bb012
10 changed files with 244 additions and 75 deletions
+25 -12
View File
@@ -988,15 +988,28 @@ func (dt *dedupeTask) Name() string {
return "DedupeTask"
}
func NewStorageMetricsInitGenerator(imgStore storageTypes.ImageStore, metrics monitoring.MetricServer, log zlog.Logger,
) *StorageMetricsInitGenerator {
processedRepos := make(map[string]struct{})
return &StorageMetricsInitGenerator{
ImgStore: imgStore,
Metrics: metrics,
Log: log,
processedRepos: processedRepos,
MaxDelay: 15, //nolint:mnd
}
}
type StorageMetricsInitGenerator struct {
ImgStore storageTypes.ImageStore
done bool
Metrics monitoring.MetricServer
lastRepo string
nextRun time.Time
rand *rand.Rand
Log zlog.Logger
MaxDelay int
ImgStore storageTypes.ImageStore
done bool
Metrics monitoring.MetricServer
processedRepos map[string]struct{}
nextRun time.Time
rand *rand.Rand
Log zlog.Logger
MaxDelay int
}
func (gen *StorageMetricsInitGenerator) Name() string {
@@ -1004,7 +1017,7 @@ func (gen *StorageMetricsInitGenerator) Name() string {
}
func (gen *StorageMetricsInitGenerator) Next() (scheduler.Task, error) {
if gen.lastRepo == "" && gen.nextRun.IsZero() {
if len(gen.processedRepos) == 0 && gen.nextRun.IsZero() {
gen.rand = rand.New(rand.NewSource(time.Now().UTC().UnixNano())) //nolint: gosec
}
@@ -1012,7 +1025,7 @@ func (gen *StorageMetricsInitGenerator) Next() (scheduler.Task, error) {
gen.nextRun = time.Now().Add(time.Duration(delay) * time.Second)
repo, err := gen.ImgStore.GetNextRepository(gen.lastRepo)
repo, err := gen.ImgStore.GetNextRepository(gen.processedRepos)
if err != nil {
return nil, err
}
@@ -1025,7 +1038,7 @@ func (gen *StorageMetricsInitGenerator) Next() (scheduler.Task, error) {
return nil, nil //nolint:nilnil
}
gen.lastRepo = repo
gen.processedRepos[repo] = struct{}{}
return NewStorageMetricsTask(gen.ImgStore, gen.Metrics, repo, gen.Log), nil
}
@@ -1039,7 +1052,7 @@ func (gen *StorageMetricsInitGenerator) IsReady() bool {
}
func (gen *StorageMetricsInitGenerator) Reset() {
gen.lastRepo = ""
gen.processedRepos = make(map[string]struct{})
gen.done = false
gen.nextRun = time.Time{}
}
+15 -12
View File
@@ -67,9 +67,12 @@ CleanImageStorePeriodically runs a periodic garbage collect on the ImageStore pr
given an interval and a Scheduler.
*/
func (gc GarbageCollect) CleanImageStorePeriodically(interval time.Duration, sch *scheduler.Scheduler) {
processedRepos := make(map[string]struct{})
generator := &GCTaskGenerator{
imgStore: gc.imgStore,
gc: gc,
imgStore: gc.imgStore,
gc: gc,
processedRepos: processedRepos,
}
sch.SubmitGenerator(generator, interval, scheduler.MediumPriority)
@@ -798,12 +801,12 @@ and it will execute garbage collection for each repository by creating a task
for each repository and pushing it to the task scheduler.
*/
type GCTaskGenerator struct {
imgStore types.ImageStore
gc GarbageCollect
lastRepo string
nextRun time.Time
done bool
rand *rand.Rand
imgStore types.ImageStore
gc GarbageCollect
processedRepos map[string]struct{}
nextRun time.Time
done bool
rand *rand.Rand
}
func (gen *GCTaskGenerator) getRandomDelay() int {
@@ -817,7 +820,7 @@ func (gen *GCTaskGenerator) Name() string {
}
func (gen *GCTaskGenerator) Next() (scheduler.Task, error) {
if gen.lastRepo == "" && gen.nextRun.IsZero() {
if len(gen.processedRepos) == 0 && gen.nextRun.IsZero() {
gen.rand = rand.New(rand.NewSource(time.Now().UTC().UnixNano())) //nolint: gosec
}
@@ -825,7 +828,7 @@ func (gen *GCTaskGenerator) Next() (scheduler.Task, error) {
gen.nextRun = time.Now().Add(time.Duration(delay) * time.Second)
repo, err := gen.imgStore.GetNextRepository(gen.lastRepo)
repo, err := gen.imgStore.GetNextRepository(gen.processedRepos)
if err != nil {
return nil, err
}
@@ -836,7 +839,7 @@ func (gen *GCTaskGenerator) Next() (scheduler.Task, error) {
return nil, nil //nolint:nilnil
}
gen.lastRepo = repo
gen.processedRepos[repo] = struct{}{}
return NewGCTask(gen.imgStore, gen.gc, repo), nil
}
@@ -850,7 +853,7 @@ func (gen *GCTaskGenerator) IsReady() bool {
}
func (gen *GCTaskGenerator) Reset() {
gen.lastRepo = ""
gen.processedRepos = make(map[string]struct{})
gen.done = false
gen.nextRun = time.Time{}
}
+134
View File
@@ -563,6 +563,73 @@ func TestGarbageCollectAndRetentionMetaDB(t *testing.T) {
So(repos, ShouldContain, "retention")
})
Convey("gc all tags, untagged, and afterwards referrers using GetNextRepository()", func() {
gc := gc.NewGarbageCollect(imgStore, metaDB, gc.Options{
Delay: 1 * time.Millisecond,
ImageRetention: config.ImageRetention{
Delay: 1 * time.Millisecond,
Policies: []config.RetentionPolicy{
{
Repositories: []string{"gc-test1", "gc-test3"},
DeleteReferrers: true,
DeleteUntagged: &trueVal,
KeepTags: []config.KeepTagsPolicy{
{
Patterns: []string{"v1"}, // should not match any tag
},
},
},
},
},
}, audit, log)
processedRepos := make(map[string]struct{})
expectedRepos := []string{"gc-test1", "gc-test2", "gc-test3", "gc-test4", "retention"}
for i := range 10 {
t.Logf("index %d, processed repos %v", i, processedRepos)
// we need to check if GetNextRepository returns each repository just once, and empty string afterwards
repo, err := imgStore.GetNextRepository(processedRepos)
t.Logf("index %d, repo '%s'", i, repo)
So(err, ShouldBeNil)
if i >= 5 {
So(repo, ShouldEqual, "")
continue
} else {
So(repo, ShouldEqual, expectedRepos[i])
}
processedRepos[repo] = struct{}{}
// run cleanRepo, this should not impact the list of calls necessary for
// GetNextRepository to iterate through every repo
err = gc.CleanRepo(ctx, repo)
So(err, ShouldBeNil)
}
// verify one more time the returned values
So(len(processedRepos), ShouldEqual, len(expectedRepos))
for _, repo := range expectedRepos {
So(processedRepos, ShouldContainKey, repo)
}
_, _, _, err = imgStore.GetImageManifest("gc-test1", gcUntagged1.DigestStr())
So(err, ShouldNotBeNil)
// now repos should get gc'ed
repos, err := imgStore.GetRepositories()
So(err, ShouldBeNil)
So(repos, ShouldNotContain, "gc-test1")
So(repos, ShouldContain, "gc-test2")
So(repos, ShouldNotContain, "gc-test3")
So(repos, ShouldContain, "gc-test4")
So(repos, ShouldContain, "retention")
})
Convey("gc with dry-run all tags, untagged, and afterwards referrers", func() {
gc := gc.NewGarbageCollect(imgStore, metaDB, gc.Options{
Delay: 1 * time.Millisecond,
@@ -1924,6 +1991,73 @@ func TestGarbageCollectAndRetentionNoMetaDB(t *testing.T) {
So(repos, ShouldContain, "retention")
})
Convey("gc all tags, untagged, and afterwards referrers using GetNextRepository()", func() {
gc := gc.NewGarbageCollect(imgStore, metaDB, gc.Options{
Delay: 1 * time.Millisecond,
ImageRetention: config.ImageRetention{
Delay: 1 * time.Millisecond,
Policies: []config.RetentionPolicy{
{
Repositories: []string{"gc-test1", "gc-test3"},
DeleteReferrers: true,
DeleteUntagged: &trueVal,
KeepTags: []config.KeepTagsPolicy{
{
Patterns: []string{"v1"}, // should not match any tag
},
},
},
},
},
}, audit, log)
processedRepos := make(map[string]struct{})
expectedRepos := []string{"gc-test1", "gc-test2", "gc-test3", "gc-test4", "retention"}
for i := range 10 {
t.Logf("index %d, processed repos %v", i, processedRepos)
// we need to check if GetNextRepository returns each repository just once, and empty string afterwards
repo, err := imgStore.GetNextRepository(processedRepos)
t.Logf("index %d, repo '%s'", i, repo)
So(err, ShouldBeNil)
if i >= 5 {
So(repo, ShouldEqual, "")
continue
} else {
So(repo, ShouldEqual, expectedRepos[i])
}
processedRepos[repo] = struct{}{}
// run cleanRepo, this should not impact the list of calls necessary for
// GetNextRepository to iterate through every repo
err = gc.CleanRepo(ctx, repo)
So(err, ShouldBeNil)
}
// verify one more time the returned values
So(len(processedRepos), ShouldEqual, len(expectedRepos))
for _, repo := range expectedRepos {
So(processedRepos, ShouldContainKey, repo)
}
_, _, _, err = imgStore.GetImageManifest("gc-test1", gcUntagged1.DigestStr())
So(err, ShouldNotBeNil)
// now repos should get gc'ed
repos, err := imgStore.GetRepositories()
So(err, ShouldBeNil)
So(repos, ShouldNotContain, "gc-test1")
So(repos, ShouldContain, "gc-test2")
So(repos, ShouldNotContain, "gc-test3")
So(repos, ShouldContain, "gc-test4")
So(repos, ShouldContain, "retention")
})
Convey("gc with dry-run all tags, untagged, and afterwards referrers", func() {
gc := gc.NewGarbageCollect(imgStore, metaDB, gc.Options{
Delay: 1 * time.Millisecond,
+8 -24
View File
@@ -401,7 +401,7 @@ func (is *ImageStore) GetRepositories() ([]string, error) {
}
// GetNextRepository returns next repository under this store.
func (is *ImageStore) GetNextRepository(repo string) (string, error) {
func (is *ImageStore) GetNextRepository(processedRepos map[string]struct{}) (string, error) {
var lockLatency time.Time
dir := is.rootDir
@@ -422,7 +422,6 @@ func (is *ImageStore) GetNextRepository(repo string) (string, error) {
return "", err
}
found := false
store := ""
err = is.storeDriver.Walk(dir, func(fileInfo driver.FileInfo) error {
if !fileInfo.IsDir() {
@@ -434,28 +433,18 @@ func (is *ImageStore) GetNextRepository(repo string) (string, error) {
return nil //nolint:nilerr // ignore paths not relative to root dir
}
if _, ok := processedRepos[rel]; ok {
return nil // repo already processed
}
ok, err := is.ValidateRepo(rel)
if !ok || err != nil {
return nil //nolint:nilerr // ignore invalid repos
}
if repo == "" && ok && err == nil {
store = rel
store = rel
return io.EOF
}
if found {
store = rel
return io.EOF
}
if rel == repo {
found = true
}
return nil
return io.EOF
})
driverErr := &driver.Error{}
@@ -2160,12 +2149,7 @@ func (is *ImageStore) RunDedupeBlobs(interval time.Duration, sch *scheduler.Sche
}
func (is *ImageStore) PopulateStorageMetrics(interval time.Duration, sch *scheduler.Scheduler) {
generator := &common.StorageMetricsInitGenerator{
ImgStore: is,
Metrics: is.metrics,
Log: is.log,
MaxDelay: 15, //nolint:mnd
}
generator := common.NewStorageMetricsInitGenerator(is, is.metrics, is.log)
sch.SubmitGenerator(generator, interval, scheduler.HighPriority)
}
+29 -5
View File
@@ -2929,6 +2929,7 @@ func TestGetNextRepository(t *testing.T) {
imgStore := local.NewImageStore(dir, true, true, log, metrics, nil, cacheDriver, nil, nil)
firstRepoName := "repo1"
secondRepoName := "repo2"
missingRepoName := "repo3"
srcStorageCtlr := storage.StoreController{DefaultStore: imgStore}
image := CreateDefaultImage()
@@ -2946,21 +2947,44 @@ func TestGetNextRepository(t *testing.T) {
}
Convey("Return first repository", t, func() {
firstRepo, err := imgStore.GetNextRepository("")
So(firstRepo, ShouldEqual, firstRepoName)
processedRepos := make(map[string]struct{}, 0)
repo, err := imgStore.GetNextRepository(processedRepos)
So(repo, ShouldEqual, firstRepoName)
So(err, ShouldBeNil)
})
Convey("Return second repository", t, func() {
secondRepo, err := imgStore.GetNextRepository(firstRepoName)
So(secondRepo, ShouldEqual, secondRepoName)
processedRepos := make(map[string]struct{}, 0)
processedRepos[firstRepoName] = struct{}{}
repo, err := imgStore.GetNextRepository(processedRepos)
So(repo, ShouldEqual, secondRepoName)
So(err, ShouldBeNil)
})
Convey("Return valid repository for missing", t, func() {
processedRepos := make(map[string]struct{}, 0)
processedRepos[missingRepoName] = struct{}{}
processedRepos[firstRepoName] = struct{}{}
repo, err := imgStore.GetNextRepository(processedRepos)
So(repo, ShouldEqual, secondRepoName)
So(err, ShouldBeNil)
})
Convey("Return empty repository if all were processed", t, func() {
processedRepos := make(map[string]struct{}, 0)
processedRepos[firstRepoName] = struct{}{}
processedRepos[secondRepoName] = struct{}{}
repo, err := imgStore.GetNextRepository(processedRepos)
So(repo, ShouldEqual, "")
So(err, ShouldBeNil)
})
Convey("Return error", t, func() {
processedRepos := make(map[string]struct{}, 0)
processedRepos[firstRepoName] = struct{}{}
err := os.Chmod(imgStore.RootDir(), 0o000)
So(err, ShouldBeNil)
_, err = imgStore.GetNextRepository(firstRepoName)
_, err = imgStore.GetNextRepository(processedRepos)
So(err, ShouldNotBeNil)
err = os.Chmod(imgStore.RootDir(), 0o755)
So(err, ShouldBeNil)
+3 -1
View File
@@ -2245,7 +2245,9 @@ func TestNextRepositoryMockStoreDriver(t *testing.T) {
},
})
nextRepository, err := imgStore.GetNextRepository("testRepo")
processedRepos := make(map[string]struct{}, 0)
processedRepos["testRepo"] = struct{}{}
nextRepository, err := imgStore.GetNextRepository(processedRepos)
So(err, ShouldBeNil)
So(nextRepository, ShouldEqual, "")
})
+1 -1
View File
@@ -31,7 +31,7 @@ type ImageStore interface { //nolint:interfacebloat
InitRepo(name string) error
ValidateRepo(name string) (bool, error)
GetRepositories() ([]string, error)
GetNextRepository(repo string) (string, error)
GetNextRepository(processedRepos map[string]struct{}) (string, error)
GetNextRepositories(repo string, maxEntries int, fn FilterRepoFunc) ([]string, bool, error)
GetImageTags(repo string) ([]string, error)
GetImageManifest(repo, reference string) ([]byte, godigest.Digest, string, error)