mirror of
https://github.com/project-zot/zot.git
synced 2026-06-17 04:48:26 +08:00
refactor(storage): refactor storage into a single ImageStore (#1656)
unified both local and s3 ImageStore logic into a single ImageStore added a new driver interface for common file/dirs manipulations to be implemented by different storage types refactor(gc): drop umoci dependency, implemented internal gc added retentionDelay config option that specifies the garbage collect delay for images without tags this will also clean manifests which are part of an index image (multiarch) that no longer exist. fix(dedupe): skip blobs under .sync/ directory if startup dedupe is running while also syncing is running ignore blobs under sync's temporary storage fix(storage): do not allow image indexes modifications when deleting a manifest verify that it is not part of a multiarch image and throw a MethodNotAllowed error to the client if it is. we don't want to modify multiarch images Signed-off-by: Petu Eusebiu <peusebiu@cisco.com>
This commit is contained in:
@@ -0,0 +1,115 @@
|
||||
package s3
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
|
||||
// Add s3 support.
|
||||
"github.com/docker/distribution/registry/storage/driver"
|
||||
_ "github.com/docker/distribution/registry/storage/driver/s3-aws"
|
||||
|
||||
storageConstants "zotregistry.io/zot/pkg/storage/constants"
|
||||
)
|
||||
|
||||
type Driver struct {
|
||||
store driver.StorageDriver
|
||||
}
|
||||
|
||||
func New(storeDriver driver.StorageDriver) *Driver {
|
||||
return &Driver{store: storeDriver}
|
||||
}
|
||||
|
||||
func (driver *Driver) Name() string {
|
||||
return storageConstants.S3StorageDriverName
|
||||
}
|
||||
|
||||
func (driver *Driver) EnsureDir(path string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (driver *Driver) DirExists(path string) bool {
|
||||
if fi, err := driver.store.Stat(context.Background(), path); err == nil && fi.IsDir() {
|
||||
return true
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
func (driver *Driver) Reader(path string, offset int64) (io.ReadCloser, error) {
|
||||
return driver.store.Reader(context.Background(), path, offset)
|
||||
}
|
||||
|
||||
func (driver *Driver) ReadFile(path string) ([]byte, error) {
|
||||
return driver.store.GetContent(context.Background(), path)
|
||||
}
|
||||
|
||||
func (driver *Driver) Delete(path string) error {
|
||||
return driver.store.Delete(context.Background(), path)
|
||||
}
|
||||
|
||||
func (driver *Driver) Stat(path string) (driver.FileInfo, error) {
|
||||
return driver.store.Stat(context.Background(), path)
|
||||
}
|
||||
|
||||
func (driver *Driver) Writer(filepath string, append bool) (driver.FileWriter, error) { //nolint:predeclared
|
||||
return driver.store.Writer(context.Background(), filepath, append)
|
||||
}
|
||||
|
||||
func (driver *Driver) WriteFile(filepath string, content []byte) (int, error) {
|
||||
var n int
|
||||
|
||||
if stwr, err := driver.store.Writer(context.Background(), filepath, false); err == nil {
|
||||
defer stwr.Close()
|
||||
|
||||
if n, err = stwr.Write(content); err != nil {
|
||||
return -1, err
|
||||
}
|
||||
|
||||
if err := stwr.Commit(); err != nil {
|
||||
return -1, err
|
||||
}
|
||||
} else {
|
||||
return -1, err
|
||||
}
|
||||
|
||||
return n, nil
|
||||
}
|
||||
|
||||
func (driver *Driver) Walk(path string, f driver.WalkFn) error {
|
||||
return driver.store.Walk(context.Background(), path, f)
|
||||
}
|
||||
|
||||
func (driver *Driver) List(fullpath string) ([]string, error) {
|
||||
return driver.store.List(context.Background(), fullpath)
|
||||
}
|
||||
|
||||
func (driver *Driver) Move(sourcePath string, destPath string) error {
|
||||
return driver.store.Move(context.Background(), sourcePath, destPath)
|
||||
}
|
||||
|
||||
func (driver *Driver) SameFile(path1, path2 string) bool {
|
||||
fi1, _ := driver.store.Stat(context.Background(), path1)
|
||||
|
||||
fi2, _ := driver.store.Stat(context.Background(), path2)
|
||||
|
||||
if fi1 != nil && fi2 != nil {
|
||||
if fi1.IsDir() == fi2.IsDir() &&
|
||||
fi1.ModTime() == fi2.ModTime() &&
|
||||
fi1.Path() == fi2.Path() &&
|
||||
fi1.Size() == fi2.Size() {
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
/*
|
||||
Link put an empty file that will act like a link between the original file and deduped one
|
||||
|
||||
because s3 doesn't support symlinks, wherever the storage will encounter an empty file, it will get the original one
|
||||
from cache.
|
||||
*/
|
||||
func (driver *Driver) Link(src, dest string) error {
|
||||
return driver.store.PutContent(context.Background(), dest, []byte{})
|
||||
}
|
||||
+19
-1707
File diff suppressed because it is too large
Load Diff
+34
-28
@@ -77,15 +77,17 @@ func createMockStorage(rootDir string, cacheDir string, dedupe bool, store drive
|
||||
var cacheDriver cache.Cache
|
||||
|
||||
// from pkg/cli/root.go/applyDefaultValues, s3 magic
|
||||
if _, err := os.Stat(path.Join(cacheDir, "s3_cache.db")); dedupe || (!dedupe && err == nil) {
|
||||
if _, err := os.Stat(path.Join(cacheDir,
|
||||
storageConstants.BoltdbName+storageConstants.DBExtensionName)); dedupe || (!dedupe && err == nil) {
|
||||
cacheDriver, _ = storage.Create("boltdb", cache.BoltDBDriverParameters{
|
||||
RootDir: cacheDir,
|
||||
Name: "s3_cache",
|
||||
Name: "cache",
|
||||
UseRelPaths: false,
|
||||
}, log)
|
||||
}
|
||||
il := s3.NewImageStore(rootDir, cacheDir, false, storageConstants.DefaultGCDelay,
|
||||
dedupe, false, log, metrics, nil, store, cacheDriver,
|
||||
|
||||
il := s3.NewImageStore(rootDir, cacheDir, true, true, storageConstants.DefaultGCDelay,
|
||||
storageConstants.DefaultUntaggedImgeRetentionDelay, dedupe, false, log, metrics, nil, store, cacheDriver,
|
||||
)
|
||||
|
||||
return il
|
||||
@@ -97,8 +99,8 @@ func createMockStorageWithMockCache(rootDir string, dedupe bool, store driver.St
|
||||
log := log.Logger{Logger: zerolog.New(os.Stdout)}
|
||||
metrics := monitoring.NewMetricsServer(false, log)
|
||||
|
||||
il := s3.NewImageStore(rootDir, "", false, storageConstants.DefaultGCDelay,
|
||||
dedupe, false, log, metrics, nil, store, cacheDriver,
|
||||
il := s3.NewImageStore(rootDir, "", true, true, storageConstants.DefaultGCDelay,
|
||||
storageConstants.DefaultUntaggedImgeRetentionDelay, dedupe, false, log, metrics, nil, store, cacheDriver,
|
||||
)
|
||||
|
||||
return il
|
||||
@@ -150,17 +152,17 @@ func createObjectsStore(rootDir string, cacheDir string, dedupe bool) (
|
||||
var err error
|
||||
|
||||
// from pkg/cli/root.go/applyDefaultValues, s3 magic
|
||||
s3CacheDBPath := path.Join(cacheDir, s3.CacheDBName+storageConstants.DBExtensionName)
|
||||
s3CacheDBPath := path.Join(cacheDir, storageConstants.BoltdbName+storageConstants.DBExtensionName)
|
||||
if _, err = os.Stat(s3CacheDBPath); dedupe || (!dedupe && err == nil) {
|
||||
cacheDriver, _ = storage.Create("boltdb", cache.BoltDBDriverParameters{
|
||||
RootDir: cacheDir,
|
||||
Name: "s3_cache",
|
||||
Name: "cache",
|
||||
UseRelPaths: false,
|
||||
}, log)
|
||||
}
|
||||
|
||||
il := s3.NewImageStore(rootDir, cacheDir, false, storageConstants.DefaultGCDelay,
|
||||
dedupe, false, log, metrics, nil, store, cacheDriver)
|
||||
il := s3.NewImageStore(rootDir, cacheDir, true, true, storageConstants.DefaultGCDelay,
|
||||
storageConstants.DefaultUntaggedImgeRetentionDelay, dedupe, false, log, metrics, nil, store, cacheDriver)
|
||||
|
||||
return store, il, err
|
||||
}
|
||||
@@ -194,8 +196,8 @@ func createObjectsStoreDynamo(rootDir string, cacheDir string, dedupe bool, tabl
|
||||
panic(err)
|
||||
}
|
||||
|
||||
il := s3.NewImageStore(rootDir, cacheDir, false, storageConstants.DefaultGCDelay,
|
||||
dedupe, false, log, metrics, nil, store, cacheDriver)
|
||||
il := s3.NewImageStore(rootDir, cacheDir, true, true, storageConstants.DefaultGCDelay,
|
||||
storageConstants.DefaultUntaggedImgeRetentionDelay, dedupe, false, log, metrics, nil, store, cacheDriver)
|
||||
|
||||
return store, il, err
|
||||
}
|
||||
@@ -893,7 +895,7 @@ func TestNegativeCasesObjectsStorage(t *testing.T) {
|
||||
_, _, err = imgStore.CheckBlob(testImage, digest)
|
||||
So(err, ShouldNotBeNil)
|
||||
|
||||
_, _, err = imgStore.StatBlob(testImage, digest)
|
||||
_, _, _, err = imgStore.StatBlob(testImage, digest)
|
||||
So(err, ShouldNotBeNil)
|
||||
})
|
||||
|
||||
@@ -1050,7 +1052,7 @@ func TestNegativeCasesObjectsStorage(t *testing.T) {
|
||||
WriterFn: func(ctx context.Context, path string, isAppend bool) (driver.FileWriter, error) {
|
||||
return &FileWriterMock{WriteFn: func(b []byte) (int, error) {
|
||||
return 0, errS3
|
||||
}}, nil
|
||||
}}, errS3
|
||||
},
|
||||
})
|
||||
_, err := imgStore.PutBlobChunkStreamed(testImage, "uuid", io.NopCloser(strings.NewReader("")))
|
||||
@@ -1091,7 +1093,7 @@ func TestNegativeCasesObjectsStorage(t *testing.T) {
|
||||
WriteFn: func(b []byte) (int, error) {
|
||||
return 0, errS3
|
||||
},
|
||||
}, nil
|
||||
}, errS3
|
||||
},
|
||||
})
|
||||
_, err := imgStore.PutBlobChunk(testImage, "uuid", 12, 100, io.NopCloser(strings.NewReader("")))
|
||||
@@ -1280,7 +1282,7 @@ func TestS3Dedupe(t *testing.T) {
|
||||
So(checkBlobSize1, ShouldBeGreaterThan, 0)
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
ok, checkBlobSize1, err = imgStore.StatBlob("dedupe1", digest)
|
||||
ok, checkBlobSize1, _, err = imgStore.StatBlob("dedupe1", digest)
|
||||
So(ok, ShouldBeTrue)
|
||||
So(checkBlobSize1, ShouldBeGreaterThan, 0)
|
||||
So(err, ShouldBeNil)
|
||||
@@ -1466,12 +1468,12 @@ func TestS3Dedupe(t *testing.T) {
|
||||
Convey("Check backward compatibility - switch dedupe to false", func() {
|
||||
/* copy cache to the new storage with dedupe false (doing this because we
|
||||
already have a cache object holding the lock on cache db file) */
|
||||
input, err := os.ReadFile(path.Join(tdir, s3.CacheDBName+storageConstants.DBExtensionName))
|
||||
input, err := os.ReadFile(path.Join(tdir, storageConstants.BoltdbName+storageConstants.DBExtensionName))
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
tdir = t.TempDir()
|
||||
|
||||
err = os.WriteFile(path.Join(tdir, s3.CacheDBName+storageConstants.DBExtensionName), input, 0o600)
|
||||
err = os.WriteFile(path.Join(tdir, storageConstants.BoltdbName+storageConstants.DBExtensionName), input, 0o600)
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
storeDriver, imgStore, _ := createObjectsStore(testDir, tdir, false)
|
||||
@@ -3306,6 +3308,7 @@ func TestS3ManifestImageIndex(t *testing.T) {
|
||||
|
||||
err = imgStore.DeleteImageManifest("index", "test:index1", false)
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
_, _, _, err = imgStore.GetImageManifest("index", "test:index1")
|
||||
So(err, ShouldNotBeNil)
|
||||
|
||||
@@ -3599,7 +3602,7 @@ func TestS3DedupeErr(t *testing.T) {
|
||||
|
||||
imgStore = createMockStorage(testDir, tdir, true, &StorageDriverMock{})
|
||||
|
||||
err = os.Remove(path.Join(tdir, s3.CacheDBName+storageConstants.DBExtensionName))
|
||||
err = os.Remove(path.Join(tdir, storageConstants.BoltdbName+storageConstants.DBExtensionName))
|
||||
digest := godigest.NewDigestFromEncoded(godigest.SHA256, "digest")
|
||||
|
||||
// trigger unable to insert blob record
|
||||
@@ -3640,8 +3643,9 @@ func TestS3DedupeErr(t *testing.T) {
|
||||
err := imgStore.DedupeBlob("", digest, "dst")
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
// error will be triggered in driver.SameFile()
|
||||
err = imgStore.DedupeBlob("", digest, "dst2")
|
||||
So(err, ShouldNotBeNil)
|
||||
So(err, ShouldBeNil)
|
||||
})
|
||||
|
||||
Convey("Test DedupeBlob - error on store.PutContent()", t, func(c C) {
|
||||
@@ -3776,12 +3780,12 @@ func TestS3DedupeErr(t *testing.T) {
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
// copy cache db to the new imagestore
|
||||
input, err := os.ReadFile(path.Join(tdir, s3.CacheDBName+storageConstants.DBExtensionName))
|
||||
input, err := os.ReadFile(path.Join(tdir, storageConstants.BoltdbName+storageConstants.DBExtensionName))
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
tdir = t.TempDir()
|
||||
|
||||
err = os.WriteFile(path.Join(tdir, s3.CacheDBName+storageConstants.DBExtensionName), input, 0o600)
|
||||
err = os.WriteFile(path.Join(tdir, storageConstants.BoltdbName+storageConstants.DBExtensionName), input, 0o600)
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
imgStore = createMockStorage(testDir, tdir, true, &StorageDriverMock{
|
||||
@@ -3797,12 +3801,14 @@ func TestS3DedupeErr(t *testing.T) {
|
||||
_, _, err = imgStore.GetBlob("repo2", digest, "application/vnd.oci.image.layer.v1.tar+gzip")
|
||||
So(err, ShouldNotBeNil)
|
||||
|
||||
// now it should move content from /repo1/dst1 to /repo2/dst2
|
||||
_, err = imgStore.GetBlobContent("repo2", digest)
|
||||
So(err, ShouldNotBeNil)
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
_, _, err = imgStore.StatBlob("repo2", digest)
|
||||
So(err, ShouldNotBeNil)
|
||||
_, _, _, err = imgStore.StatBlob("repo2", digest)
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
// it errors out because of bad range, as mock store returns a driver.FileInfo with 0 size
|
||||
_, _, _, err = imgStore.GetBlobPartial("repo2", digest, "application/vnd.oci.image.layer.v1.tar+gzip", 0, 1)
|
||||
So(err, ShouldNotBeNil)
|
||||
})
|
||||
@@ -3822,12 +3828,12 @@ func TestS3DedupeErr(t *testing.T) {
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
// copy cache db to the new imagestore
|
||||
input, err := os.ReadFile(path.Join(tdir, s3.CacheDBName+storageConstants.DBExtensionName))
|
||||
input, err := os.ReadFile(path.Join(tdir, storageConstants.BoltdbName+storageConstants.DBExtensionName))
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
tdir = t.TempDir()
|
||||
|
||||
err = os.WriteFile(path.Join(tdir, s3.CacheDBName+storageConstants.DBExtensionName), input, 0o600)
|
||||
err = os.WriteFile(path.Join(tdir, storageConstants.BoltdbName+storageConstants.DBExtensionName), input, 0o600)
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
imgStore = createMockStorage(testDir, tdir, true, &StorageDriverMock{
|
||||
@@ -3887,7 +3893,7 @@ func TestS3DedupeErr(t *testing.T) {
|
||||
_, err = imgStore.GetBlobContent("repo2", digest)
|
||||
So(err, ShouldNotBeNil)
|
||||
|
||||
_, _, err = imgStore.StatBlob("repo2", digest)
|
||||
_, _, _, err = imgStore.StatBlob("repo2", digest)
|
||||
So(err, ShouldNotBeNil)
|
||||
|
||||
_, _, _, err = imgStore.GetBlobPartial("repo2", digest, "application/vnd.oci.image.layer.v1.tar+gzip", 0, 1)
|
||||
|
||||
Reference in New Issue
Block a user