mirror of
https://github.com/project-zot/zot.git
synced 2026-06-18 05:28:07 +08:00
fix(storage/gcs): fix double-prefixed rootdirectory and EOF handling in Walk for GCS (#3903)
* fix(storage): resolve double-prefixing issue for GCS rootdirectory Preserve double-prefixing for S3 to maintain backward compatibility with existing data. For GCS, always use "/" as rootDir to avoid double-prefixing, as GCS rootdirectory usage is a newer feature without legacy data. Signed-off-by: Sebastian Thees <thees@users.noreply.github.com> * fix(gcs): handle io.EOF correctly in Walk method Ensure io.EOF is returned unwrapped to allow proper error handling with errors.Is() upstream. Signed-off-by: Sebastian Thees <thees@users.noreply.github.com> * fix(storage): set sensible default ("/zot") for GCS when storageDriver.rootdirectory is unset or empty or "/" Signed-off-by: Sebastian Thees <thees@users.noreply.github.com> * fix(imagestore): avoid warning logs for expected cache miss scenarios Refine logging to use debug level for expected cache misses, preventing unnecessary warnings. Signed-off-by: Sebastian Thees <thees@users.noreply.github.com> --------- Signed-off-by: Sebastian Thees <thees@users.noreply.github.com>
This commit is contained in:
@@ -4,6 +4,7 @@
|
||||
"dedupe": false,
|
||||
"storageDriver": {
|
||||
"name": "gcs",
|
||||
"rootdirectory": "/zot",
|
||||
"bucket": "zot-storage",
|
||||
"credentialsFile": "/path/to/gcs-credentials.json"
|
||||
}
|
||||
|
||||
@@ -116,7 +116,27 @@ func (driver *Driver) WriteFile(filepath string, content []byte) (int, error) {
|
||||
}
|
||||
|
||||
func (driver *Driver) Walk(path string, f storagedriver.WalkFn) error {
|
||||
return driver.formatErr(driver.store.Walk(context.Background(), path, f), path)
|
||||
err := driver.store.Walk(context.Background(), path, f)
|
||||
// io.EOF is used by callers (e.g. GetNextRepository) as a stop signal, not an error, so return directly.
|
||||
if isEOF(err) {
|
||||
return io.EOF
|
||||
}
|
||||
|
||||
return driver.formatErr(err, path)
|
||||
}
|
||||
|
||||
// isEOF checks whether err is directly io.EOF or if io.EOF is wrapped into storagedriver.Error.Detail.
|
||||
func isEOF(err error) bool {
|
||||
if errors.Is(err, io.EOF) {
|
||||
return true
|
||||
}
|
||||
|
||||
var storageErr storagedriver.Error
|
||||
if errors.As(err, &storageErr) {
|
||||
return errors.Is(storageErr.Detail, io.EOF)
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
func (driver *Driver) List(fullpath string) ([]string, error) {
|
||||
|
||||
@@ -277,11 +277,44 @@ func TestDriver(t *testing.T) {
|
||||
})
|
||||
|
||||
Convey("Walk", func() {
|
||||
storeMock.WalkFn = func(ctx context.Context, path string, f driver.WalkFn, _ ...func(*driver.WalkOptions)) error {
|
||||
return nil
|
||||
}
|
||||
err := gcsDriver.Walk("/test", nil)
|
||||
So(err, ShouldBeNil)
|
||||
Convey("Success", func() {
|
||||
storeMock.WalkFn = func(ctx context.Context, path string, f driver.WalkFn, _ ...func(*driver.WalkOptions)) error {
|
||||
return nil
|
||||
}
|
||||
err := gcsDriver.Walk("/test", nil)
|
||||
So(err, ShouldBeNil)
|
||||
})
|
||||
|
||||
Convey("Direct io.EOF is returned as io.EOF", func() {
|
||||
storeMock.WalkFn = func(ctx context.Context, path string, f driver.WalkFn, _ ...func(*driver.WalkOptions)) error {
|
||||
return io.EOF
|
||||
}
|
||||
err := gcsDriver.Walk("/test", nil)
|
||||
So(errors.Is(err, io.EOF), ShouldBeTrue)
|
||||
})
|
||||
|
||||
Convey("io.EOF wrapped in storagedriver.Error is returned as io.EOF", func() {
|
||||
storeMock.WalkFn = func(ctx context.Context, path string, f driver.WalkFn, _ ...func(*driver.WalkOptions)) error {
|
||||
return driver.Error{
|
||||
DriverName: "gcs",
|
||||
Detail: io.EOF,
|
||||
}
|
||||
}
|
||||
err := gcsDriver.Walk("/test", nil)
|
||||
So(errors.Is(err, io.EOF), ShouldBeTrue)
|
||||
})
|
||||
|
||||
Convey("Non-EOF error is formatted normally", func() {
|
||||
storeMock.WalkFn = func(ctx context.Context, path string, f driver.WalkFn, _ ...func(*driver.WalkOptions)) error {
|
||||
return errTest
|
||||
}
|
||||
err := gcsDriver.Walk("/test", nil)
|
||||
So(err, ShouldNotBeNil)
|
||||
So(errors.Is(err, io.EOF), ShouldBeFalse)
|
||||
|
||||
var storageErr driver.Error
|
||||
So(errors.As(err, &storageErr), ShouldBeTrue)
|
||||
})
|
||||
})
|
||||
|
||||
Convey("List", func() {
|
||||
|
||||
@@ -1449,7 +1449,12 @@ func (is *ImageStore) CheckBlob(repo string, digest godigest.Digest) (bool, int6
|
||||
// Check blobs in cache
|
||||
dstRecord, err := is.checkCacheBlob(digest)
|
||||
if err != nil {
|
||||
is.log.Warn().Err(err).Str("digest", digest.String()).Msg("not found in cache")
|
||||
// Cache miss / not-found is a normal condition when the blob truly doesn't exist.
|
||||
if errors.Is(err, zerr.ErrCacheMiss) || errors.Is(err, zerr.ErrBlobNotFound) {
|
||||
is.log.Debug().Err(err).Str("digest", digest.String()).Msg("cache miss for blob")
|
||||
} else {
|
||||
is.log.Warn().Err(err).Str("digest", digest.String()).Msg("failed to lookup blob in cache")
|
||||
}
|
||||
|
||||
return false, -1, zerr.ErrBlobNotFound
|
||||
}
|
||||
|
||||
+39
-12
@@ -71,6 +71,8 @@ func New(config *config.Config, linter common.Lint, metrics monitoring.MetricSer
|
||||
return storeController, fmt.Errorf("storageDriver '%s' unsupported storage driver: %w", storeName, zerr.ErrBadConfig)
|
||||
}
|
||||
|
||||
NormalizeGCSRootDirectory(storeName, config.Storage.StorageDriver)
|
||||
|
||||
// Init a Storager from connection string.
|
||||
store, err := factory.Create(context.Background(), storeName, config.Storage.StorageDriver)
|
||||
if err != nil {
|
||||
@@ -79,12 +81,7 @@ func New(config *config.Config, linter common.Lint, metrics monitoring.MetricSer
|
||||
return storeController, err
|
||||
}
|
||||
|
||||
/* in the case of s3 config.Storage.RootDirectory is used for caching blobs locally and
|
||||
config.Storage.StorageDriver["rootdirectory"] is the actual rootDir in s3 */
|
||||
rootDir := "/"
|
||||
if config.Storage.StorageDriver["rootdirectory"] != nil {
|
||||
rootDir = fmt.Sprintf("%v", config.Storage.StorageDriver["rootdirectory"])
|
||||
}
|
||||
rootDir := RootDir(storeName, config.Storage.StorageDriver)
|
||||
|
||||
cacheDriver, err := CreateCacheDatabaseDriver(config.Storage.StorageConfig, log)
|
||||
if err != nil {
|
||||
@@ -193,6 +190,8 @@ func getSubStore(cfg *config.Config, subPaths map[string]config.StorageConfig,
|
||||
return nil, fmt.Errorf("storageDriver '%s' unsupported storage driver: %w", storeName, zerr.ErrBadConfig)
|
||||
}
|
||||
|
||||
NormalizeGCSRootDirectory(storeName, storageConfig.StorageDriver)
|
||||
|
||||
// Init a Storager from connection string.
|
||||
store, err := factory.Create(context.Background(), storeName, storageConfig.StorageDriver)
|
||||
if err != nil {
|
||||
@@ -201,12 +200,7 @@ func getSubStore(cfg *config.Config, subPaths map[string]config.StorageConfig,
|
||||
return nil, err
|
||||
}
|
||||
|
||||
/* in the case of s3 c.Config.Storage.RootDirectory is used for caching blobs locally and
|
||||
c.Config.Storage.StorageDriver["rootdirectory"] is the actual rootDir in s3 */
|
||||
rootDir := "/"
|
||||
if cfg.Storage.StorageDriver["rootdirectory"] != nil {
|
||||
rootDir = fmt.Sprintf("%v", cfg.Storage.StorageDriver["rootdirectory"])
|
||||
}
|
||||
rootDir := RootDir(storeName, storageConfig.StorageDriver)
|
||||
|
||||
cacheDriver, err := CreateCacheDatabaseDriver(storageConfig, log)
|
||||
if err != nil {
|
||||
@@ -233,6 +227,39 @@ func getSubStore(cfg *config.Config, subPaths map[string]config.StorageConfig,
|
||||
return subImageStore, nil
|
||||
}
|
||||
|
||||
// NormalizeGCSRootDirectory ensures the GCS storage driver has a sensible default for rootdirectory.
|
||||
// Defaults to "zot" if unset or empty; overrides "/" with "zot" to prevent upstream gcs driver issue.
|
||||
// Must be called before factory.Create so the upstream GCS driver receives the correct prefix.
|
||||
// Non-GCS drivers are not affected.
|
||||
func NormalizeGCSRootDirectory(storeName string, driverParams map[string]any) {
|
||||
if storeName != constants.GCSStorageDriverName {
|
||||
return
|
||||
}
|
||||
|
||||
rd, ok := driverParams["rootdirectory"]
|
||||
if !ok || rd == nil || fmt.Sprintf("%v", rd) == "" || fmt.Sprintf("%v", rd) == "/" {
|
||||
driverParams["rootdirectory"] = "/zot"
|
||||
}
|
||||
}
|
||||
|
||||
// RootDir returns the rootDir to use for a remote (S3/GCS) ImageStore.
|
||||
//
|
||||
// The upstream storage driver receives storageDriver["rootdirectory"] via factory.Create and
|
||||
// prefixes it internally (S3: s3Path, GCS: pathToKey). If ImageStore.rootDir also contains
|
||||
// the same prefix, every path gets double-prefixed (e.g. "zot/zot/repo/...").
|
||||
//
|
||||
// For S3: preserve the old behavior for backward compatibility — existing deployments
|
||||
// have data stored under double-prefixed keys.
|
||||
//
|
||||
// For GCS: use "/" to avoid double-prefixing (new driver in 2.1.15, no legacy data).
|
||||
func RootDir(storeName string, driverParams map[string]any) string {
|
||||
if storeName == constants.S3StorageDriverName && driverParams["rootdirectory"] != nil {
|
||||
return fmt.Sprintf("%v", driverParams["rootdirectory"])
|
||||
}
|
||||
|
||||
return "/"
|
||||
}
|
||||
|
||||
func compareImageStore(root1, root2 string) bool {
|
||||
isSameFile, err := config.SameFile(root1, root2)
|
||||
if err != nil {
|
||||
|
||||
@@ -18,6 +18,7 @@ import (
|
||||
|
||||
"github.com/alicebob/miniredis/v2"
|
||||
"github.com/distribution/distribution/v3/registry/storage/driver/factory"
|
||||
_ "github.com/distribution/distribution/v3/registry/storage/driver/gcs"
|
||||
_ "github.com/distribution/distribution/v3/registry/storage/driver/s3-aws"
|
||||
guuid "github.com/gofrs/uuid"
|
||||
godigest "github.com/opencontainers/go-digest"
|
||||
@@ -1560,6 +1561,64 @@ func TestStorageSubpaths(t *testing.T) {
|
||||
})
|
||||
}
|
||||
|
||||
func TestRootDir(t *testing.T) {
|
||||
Convey("S3 with rootdirectory preserves it for backward compatibility", t, func() {
|
||||
rootDir := storage.RootDir(storageConstants.S3StorageDriverName, map[string]any{
|
||||
"rootdirectory": "/custom-prefix",
|
||||
})
|
||||
So(rootDir, ShouldEqual, "/custom-prefix")
|
||||
})
|
||||
|
||||
Convey("S3 without rootdirectory defaults to /", t, func() {
|
||||
rootDir := storage.RootDir(storageConstants.S3StorageDriverName, map[string]any{})
|
||||
So(rootDir, ShouldEqual, "/")
|
||||
})
|
||||
|
||||
Convey("GCS with rootdirectory uses / to avoid double-prefixing", t, func() {
|
||||
rootDir := storage.RootDir(storageConstants.GCSStorageDriverName, map[string]any{
|
||||
"rootdirectory": "/custom-prefix",
|
||||
})
|
||||
So(rootDir, ShouldEqual, "/")
|
||||
})
|
||||
|
||||
Convey("GCS without rootdirectory defaults to /", t, func() {
|
||||
rootDir := storage.RootDir(storageConstants.GCSStorageDriverName, map[string]any{})
|
||||
So(rootDir, ShouldEqual, "/")
|
||||
})
|
||||
}
|
||||
|
||||
func TestNormalizeGCSRootDirectory(t *testing.T) {
|
||||
Convey("GCS without rootdirectory defaults to zot", t, func() {
|
||||
params := map[string]any{}
|
||||
storage.NormalizeGCSRootDirectory(storageConstants.GCSStorageDriverName, params)
|
||||
So(params["rootdirectory"], ShouldEqual, "/zot")
|
||||
})
|
||||
|
||||
Convey("GCS with rootdirectory / is overwritten to zot", t, func() {
|
||||
params := map[string]any{"rootdirectory": "/"}
|
||||
storage.NormalizeGCSRootDirectory(storageConstants.GCSStorageDriverName, params)
|
||||
So(params["rootdirectory"], ShouldEqual, "/zot")
|
||||
})
|
||||
|
||||
Convey("GCS with empty rootdirectory defaults to zot", t, func() {
|
||||
params := map[string]any{"rootdirectory": ""}
|
||||
storage.NormalizeGCSRootDirectory(storageConstants.GCSStorageDriverName, params)
|
||||
So(params["rootdirectory"], ShouldEqual, "/zot")
|
||||
})
|
||||
|
||||
Convey("GCS with custom rootdirectory is preserved", t, func() {
|
||||
params := map[string]any{"rootdirectory": "my-prefix"}
|
||||
storage.NormalizeGCSRootDirectory(storageConstants.GCSStorageDriverName, params)
|
||||
So(params["rootdirectory"], ShouldEqual, "my-prefix")
|
||||
})
|
||||
|
||||
Convey("S3 params are not affected", t, func() {
|
||||
params := map[string]any{"rootdirectory": "/"}
|
||||
storage.NormalizeGCSRootDirectory(storageConstants.S3StorageDriverName, params)
|
||||
So(params["rootdirectory"], ShouldEqual, "/")
|
||||
})
|
||||
}
|
||||
|
||||
func TestDeleteBlobsInUse(t *testing.T) {
|
||||
for _, testcase := range testCases {
|
||||
t.Run(testcase.testCaseName, func(t *testing.T) {
|
||||
|
||||
Reference in New Issue
Block a user