fix(meta): handle cases when substores are nested (#3598)

fix(meta): handle cases where repositories when substores are nested

Note this is a case of bad configuration: having multiple stores
in the same tree structure. Guard against it in parse.go.

Fix getAllRepos to prevent duplicate repositories in metaDB when substore
directories are nested under the default store root directory.

The fix processes substores first, then the default store, using a
map-based deduplication approach to skip repositories that have already
been added. This ensures that when both the default store and substores
contain repositories with the same name (e.g., when a substore is nested
within the default store), only one instance is added to the repository
list.

Add test TestNoDuplicateReposWithSubstoresAndNestedRepoNames to verify
the deduplication logic works correctly with nested substores.

Also update the other tests to avoid these issues in the future
this is not a vali configuration.

This is not the intended use case for substores, and it may have caused:
https://github.com/project-zot/zot/actions/runs/19665302669/job/56320640980

Signed-off-by: Andrei Aaron <andreifdaaron@gmail.com>
This commit is contained in:
Andrei Aaron
2025-11-27 20:11:52 +02:00
committed by GitHub
parent 69c3a0b99b
commit cc34a6f4ef
4 changed files with 101 additions and 22 deletions
+7 -5
View File
@@ -1138,6 +1138,7 @@ func TestRetentionCheckWithSubpaths(t *testing.T) {
port := GetFreePort()
testDir := t.TempDir()
storageDir := path.Join(testDir, "storage")
subpathStoreDir := path.Join(testDir, "storage2")
configFile := path.Join(testDir, "zot-config.json")
logFile := path.Join(testDir, "retention-check.log")
@@ -1165,7 +1166,7 @@ func TestRetentionCheckWithSubpaths(t *testing.T) {
},
"subPaths": {
"/a": {
"rootDirectory": "%s/a",
"rootDirectory": "%s",
"gc": true,
"gcDelay": %q,
"gcInterval": "1m",
@@ -1195,7 +1196,7 @@ func TestRetentionCheckWithSubpaths(t *testing.T) {
"level": "debug"
}
}
`, storageDir, testGCDelay, storageDir, testGCDelay, port)
`, storageDir, testGCDelay, subpathStoreDir, testGCDelay, port)
err := os.WriteFile(configFile, content, 0o600)
So(err, ShouldBeNil)
@@ -1208,7 +1209,7 @@ func TestRetentionCheckWithSubpaths(t *testing.T) {
metricsServer := monitoring.NewMetricsServer(false, zlog.NewLogger("info", ""))
imgStore := local.NewImageStore(storageDir, false, false, zlog.NewLogger("info", ""), metricsServer,
nil, nil, nil, nil)
subpathStore := local.NewImageStore(path.Join(storageDir, "a"), false, false,
subpathStore := local.NewImageStore(subpathStoreDir, false, false,
zlog.NewLogger("info", ""), metricsServer, nil, nil, nil, nil)
params := boltdb.DBParameters{
RootDir: storageDir,
@@ -1438,6 +1439,7 @@ func TestRetentionCheckWithGCIntervalOverride(t *testing.T) {
Convey("config with gc-interval override", t, func(c C) {
testDir := t.TempDir()
storageDir := path.Join(testDir, "storage")
subpathStoreDir := path.Join(testDir, "storage2")
configFile := path.Join(testDir, "zot-config.json")
logFile := path.Join(testDir, "retention-check.log")
port := GetFreePort()
@@ -1451,7 +1453,7 @@ func TestRetentionCheckWithGCIntervalOverride(t *testing.T) {
"gcInterval": "1m",
"subPaths": {
"/a": {
"rootDirectory": "%s/a",
"rootDirectory": "%s",
"gc": true,
"gcDelay": %q,
"gcInterval": "1m"
@@ -1466,7 +1468,7 @@ func TestRetentionCheckWithGCIntervalOverride(t *testing.T) {
"level": "debug"
}
}
`, storageDir, testGCDelay, storageDir, testGCDelay, port)
`, storageDir, testGCDelay, subpathStoreDir, testGCDelay, port)
err := os.WriteFile(configFile, content, 0o600)
So(err, ShouldBeNil)
+3 -9
View File
@@ -3260,9 +3260,7 @@ func TestGlobalSearch(t *testing.T) { //nolint: gocyclo
subpath := "/a"
dir := t.TempDir()
subDir := t.TempDir()
subRootDir := path.Join(subDir, subpath)
subRootDir := t.TempDir()
port := GetFreePort()
baseURL := GetBaseURL(port)
@@ -3592,9 +3590,7 @@ func TestGlobalSearch(t *testing.T) { //nolint: gocyclo
subpath := "/a"
dir := t.TempDir()
subDir := t.TempDir()
subRootDir := path.Join(subDir, subpath)
subRootDir := t.TempDir()
port := GetFreePort()
baseURL := GetBaseURL(port)
@@ -5060,9 +5056,7 @@ func TestMetaDBWhenSigningImages(t *testing.T) {
subpath := "/a"
dir := t.TempDir()
subDir := t.TempDir()
subRootDir := path.Join(subDir, subpath)
subRootDir := t.TempDir()
port := GetFreePort()
baseURL := GetBaseURL(port)
+25 -8
View File
@@ -181,14 +181,10 @@ func ParseRepo(repo string, metaDB mTypes.MetaDB, storeController stypes.StoreCo
}
func getAllRepos(storeController stypes.StoreController, log log.Logger) ([]string, error) {
allRepos, err := storeController.GetDefaultImageStore().GetRepositories()
if err != nil {
log.Error().Err(err).Str("rootDir", storeController.GetDefaultImageStore().RootDir()).
Msg("failed to get all repo names present under rootDir")
return nil, err
}
allRepos := make([]string, 0)
repoSet := make(map[string]struct{})
// Process substores first
if storeController.GetImageSubStores() != nil {
for _, store := range storeController.GetImageSubStores() {
substoreRepos, err := store.GetRepositories()
@@ -199,7 +195,28 @@ func getAllRepos(storeController stypes.StoreController, log log.Logger) ([]stri
return nil, err
}
allRepos = append(allRepos, substoreRepos...)
for _, repo := range substoreRepos {
if _, exists := repoSet[repo]; !exists {
allRepos = append(allRepos, repo)
repoSet[repo] = struct{}{}
}
}
}
}
// Process default store, skipping repos already in the set
defaultRepos, err := storeController.GetDefaultImageStore().GetRepositories()
if err != nil {
log.Error().Err(err).Str("rootDir", storeController.GetDefaultImageStore().RootDir()).
Msg("failed to get all repo names present under rootDir")
return nil, err
}
for _, repo := range defaultRepos {
if _, exists := repoSet[repo]; !exists {
allRepos = append(allRepos, repo)
repoSet[repo] = struct{}{}
}
}
+66
View File
@@ -721,6 +721,72 @@ func RunParseStorageTests(rootDir string, metaDB mTypes.MetaDB, log log.Logger)
So(repoMeta.Tags, ShouldContainKey, tag)
So(repoMeta.Tags, ShouldNotContainKey, tag2)
})
Convey("Test no duplicate repos with substores and nested repo names", func() {
// Create nested directories - substore is a subdirectory of default store
defaultStoreDir := rootDir
substoreDir := filepath.Join(rootDir, "a")
defaultStore := local.NewImageStore(defaultStoreDir, false, false,
log, monitoring.NewMetricsServer(false, log), nil, nil, nil, nil)
substore := local.NewImageStore(substoreDir, false, false,
log, monitoring.NewMetricsServer(false, log), nil, nil, nil, nil)
storeController := storage.StoreController{
DefaultStore: defaultStore,
SubStore: map[string]storageTypes.ImageStore{
"/a": substore,
},
}
// Create a repo in default store (regular repo name, no route prefix)
defaultRepo := "repo-in-default"
image1 := CreateRandomImage()
err := WriteImageToFileSystem(image1, defaultRepo, "tag1", storeController)
So(err, ShouldBeNil)
// Create repos in substore (these will be returned by substore.GetRepositories())
// Repos in substore should have the "a" prefix to match the substore route
substoreRepo1 := "a/repo-in-substore-1"
substoreRepo2 := "a/repo-in-substore-2"
image2 := CreateRandomImage()
err = WriteImageToFileSystem(image2, substoreRepo1, "tag1", storeController)
So(err, ShouldBeNil)
image3 := CreateRandomImage()
err = WriteImageToFileSystem(image3, substoreRepo2, "tag1", storeController)
So(err, ShouldBeNil)
// Parse storage
err = meta.ParseStorage(metaDB, storeController, log)
So(err, ShouldBeNil)
// Get all repos from metaDB
repoMetaList, err := metaDB.SearchRepos(ctx, "")
So(err, ShouldBeNil)
// Collect all repo names and count occurrences
repoNames := make(map[string]int)
for _, repoMeta := range repoMetaList {
repoNames[repoMeta.Name]++
}
// Verify expected repos are present
// Substore repos are processed first (with "a/" prefix), then default store repos
expectedRepos := []string{substoreRepo1, substoreRepo2, defaultRepo}
for _, expectedRepo := range expectedRepos {
So(repoNames, ShouldContainKey, expectedRepo)
}
// Verify no duplicates - each repo should appear exactly once
for _, count := range repoNames {
So(count, ShouldEqual, 1)
}
// Verify total count - should be 3 repos:
// - substoreRepo1, substoreRepo2 (from substore with "a/" prefix)
// - defaultRepo (from default store, no prefix)
So(len(repoMetaList), ShouldEqual, 3)
})
}
func TestGetSignatureLayersInfo(t *testing.T) {