diff --git a/examples/README.md b/examples/README.md index cf8a35a8..180566d5 100644 --- a/examples/README.md +++ b/examples/README.md @@ -308,13 +308,15 @@ zot also supports different storage drivers for each subpath. ### Specifying S3 credentials -There are multiple ways to specify S3 credentials: - - Config file: ``` + "storage": { + "rootDirectory": "/tmp/zot", # local path used to store dedupe cache database + "dedupe": true, "storageDriver": { "name": "s3", + "rootdirectory": "/zot", # this is a prefix that is applied to all S3 keys to allow you to segment data in your bucket if necessary. "region": "us-east-2", "bucket": "zot-storage", "secure": true, @@ -324,6 +326,8 @@ There are multiple ways to specify S3 credentials: } ``` +There are multiple ways to specify S3 credentials besides the config file: + - Environment variables: SDK looks for credentials in the following environment variables: diff --git a/examples/config-s3.json b/examples/config-s3.json index aef3273a..8b091159 100644 --- a/examples/config-s3.json +++ b/examples/config-s3.json @@ -1,9 +1,11 @@ { "distSpecVersion": "1.0.1-dev", "storage": { - "rootDirectory": "/zot", + "rootDirectory": "/tmp/zot", + "dedupe": true, "storageDriver": { "name": "s3", + "rootdirectory": "/zot", "region": "us-east-2", "bucket": "zot-storage", "secure": true, @@ -11,9 +13,11 @@ }, "subPaths": { "/a": { - "rootDirectory": "/zot-a", + "rootDirectory": "/tmp/zot1", + "dedupe": false, "storageDriver": { "name": "s3", + "rootdirectory": "/zot-a", "region": "us-east-2", "bucket": "zot-storage", "secure": true, @@ -21,9 +25,11 @@ } }, "/b": { - "rootDirectory": "/zot-b", + "rootDirectory": "/tmp/zot2", + "dedupe": true, "storageDriver": { "name": "s3", + "rootdirectory": "/zot-b", "region": "us-east-2", "bucket": "zot-storage", "secure": true, @@ -31,9 +37,11 @@ } }, "/c": { - "rootDirectory": "/zot-c", + "rootDirectory": "/tmp/zot3", + "dedupe": true, "storageDriver": { "name": "s3", + "rootdirectory": "/zot-c", "region": 
"us-east-2", "bucket": "zot-storage", "secure": false, diff --git a/pkg/api/controller.go b/pkg/api/controller.go index 591b0703..53dcd733 100644 --- a/pkg/api/controller.go +++ b/pkg/api/controller.go @@ -218,7 +218,8 @@ func (c *Controller) InitImageStore(reloadCtx context.Context) error { c.StoreController = storage.StoreController{} if c.Config.Storage.RootDirectory != "" { - if c.Config.Storage.Dedupe { + // no need to validate hard links work on s3 + if c.Config.Storage.Dedupe && c.Config.Storage.StorageDriver == nil { err := storage.ValidateHardLink(c.Config.Storage.RootDirectory) if err != nil { c.Log.Warn().Msg("input storage root directory filesystem does not supports hardlinking," + @@ -229,7 +230,7 @@ func (c *Controller) InitImageStore(reloadCtx context.Context) error { } var defaultStore storage.ImageStore - if len(c.Config.Storage.StorageDriver) == 0 { + if c.Config.Storage.StorageDriver == nil { defaultStore = storage.NewImageStore(c.Config.Storage.RootDirectory, c.Config.Storage.GC, c.Config.Storage.GCDelay, c.Config.Storage.Dedupe, c.Config.Storage.Commit, c.Log, c.Metrics) } else { @@ -246,7 +247,14 @@ func (c *Controller) InitImageStore(reloadCtx context.Context) error { return err } - defaultStore = s3.NewImageStore(c.Config.Storage.RootDirectory, + /* in the case of s3 c.Config.Storage.RootDirectory is used for caching blobs locally and + c.Config.Storage.StorageDriver["rootdirectory"] is the actual rootDir in s3 */ + rootDir := "/" + if c.Config.Storage.StorageDriver["rootdirectory"] != nil { + rootDir = fmt.Sprintf("%v", c.Config.Storage.StorageDriver["rootdirectory"]) + } + + defaultStore = s3.NewImageStore(rootDir, c.Config.Storage.RootDirectory, c.Config.Storage.GC, c.Config.Storage.GCDelay, c.Config.Storage.Dedupe, c.Config.Storage.Commit, c.Log, c.Metrics, store) } @@ -267,7 +275,8 @@ func (c *Controller) InitImageStore(reloadCtx context.Context) error { // creating image store per subpaths for route, storageConfig := range subPaths { - 
if storageConfig.Dedupe { + // no need to validate hard links work on s3 + if storageConfig.Dedupe && storageConfig.StorageDriver == nil { err := storage.ValidateHardLink(storageConfig.RootDirectory) if err != nil { c.Log.Warn().Msg("input storage root directory filesystem does not supports hardlinking, " + @@ -277,7 +286,7 @@ func (c *Controller) InitImageStore(reloadCtx context.Context) error { } } - if len(storageConfig.StorageDriver) == 0 { + if storageConfig.StorageDriver == nil { subImageStore[route] = storage.NewImageStore(storageConfig.RootDirectory, storageConfig.GC, storageConfig.GCDelay, storageConfig.Dedupe, storageConfig.Commit, c.Log, c.Metrics) } else { @@ -294,7 +303,14 @@ func (c *Controller) InitImageStore(reloadCtx context.Context) error { return err } - subImageStore[route] = s3.NewImageStore(storageConfig.RootDirectory, + /* in the case of s3 storageConfig.RootDirectory is used for caching blobs locally and + storageConfig.StorageDriver["rootdirectory"] is the actual rootDir in s3 for this subpath */ + rootDir := "/" + if storageConfig.StorageDriver["rootdirectory"] != nil { + rootDir = fmt.Sprintf("%v", storageConfig.StorageDriver["rootdirectory"]) + } + + subImageStore[route] = s3.NewImageStore(rootDir, storageConfig.RootDirectory, storageConfig.GC, storageConfig.GCDelay, storageConfig.Dedupe, storageConfig.Commit, c.Log, c.Metrics, store) } } diff --git a/pkg/api/controller_test.go b/pkg/api/controller_test.go index c22640d4..0b692d99 100644 --- a/pkg/api/controller_test.go +++ b/pkg/api/controller_test.go @@ -151,8 +151,8 @@ func TestObjectStorageController(t *testing.T) { conf := config.New() conf.HTTP.Port = port storageDriverParams := map[string]interface{}{ - "rootDir": "zot", - "name": storage.S3StorageDriverName, + "rootdirectory": "zot", + "name": storage.S3StorageDriverName, } conf.Storage.StorageDriver = storageDriverParams ctlr := api.NewController(conf) @@ -174,7 +174,7 @@ func TestObjectStorageController(t *testing.T) { endpoint := 
os.Getenv("S3MOCK_ENDPOINT") storageDriverParams := map[string]interface{}{ - "rootDir": "zot", + "rootdirectory": "zot", "name": storage.S3StorageDriverName, "region": "us-east-2", "bucket": bucket, @@ -206,7 +206,7 @@ func TestObjectStorageControllerSubPaths(t *testing.T) { endpoint := os.Getenv("S3MOCK_ENDPOINT") storageDriverParams := map[string]interface{}{ - "rootDir": "zot", + "rootdirectory": "zot", "name": storage.S3StorageDriverName, "region": "us-east-2", "bucket": bucket, diff --git a/pkg/storage/cache.go b/pkg/storage/cache.go index aaa98877..0b054a87 100644 --- a/pkg/storage/cache.go +++ b/pkg/storage/cache.go @@ -13,13 +13,15 @@ import ( const ( BlobsCache = "blobs" + DBExtensionName = ".db" dbCacheLockCheckTimeout = 10 * time.Second ) type Cache struct { - rootDir string - db *bbolt.DB - log zlog.Logger + rootDir string + db *bbolt.DB + log zlog.Logger + useRelPaths bool // whether or not to use relative paths, should be true for filesystem and false for s3 } // Blob is a blob record. 
@@ -27,8 +29,8 @@ type Blob struct { Path string } -func NewCache(rootDir, name string, log zlog.Logger) *Cache { - dbPath := path.Join(rootDir, name+".db") +func NewCache(rootDir string, name string, useRelPaths bool, log zlog.Logger) *Cache { + dbPath := path.Join(rootDir, name+DBExtensionName) dbOpts := &bbolt.Options{ Timeout: dbCacheLockCheckTimeout, FreelistType: bbolt.FreelistArrayType, @@ -57,7 +59,7 @@ func NewCache(rootDir, name string, log zlog.Logger) *Cache { return nil } - return &Cache{rootDir: rootDir, db: cacheDB, log: log} + return &Cache{rootDir: rootDir, db: cacheDB, useRelPaths: useRelPaths, log: log} } func (c *Cache) PutBlob(digest, path string) error { @@ -68,9 +70,12 @@ func (c *Cache) PutBlob(digest, path string) error { } // use only relative (to rootDir) paths on blobs - relp, err := filepath.Rel(c.rootDir, path) - if err != nil { - c.log.Error().Err(err).Str("path", path).Msg("unable to get relative path") + var err error + if c.useRelPaths { + path, err = filepath.Rel(c.rootDir, path) + if err != nil { + c.log.Error().Err(err).Str("path", path).Msg("unable to get relative path") + } } if err := c.db.Update(func(tx *bbolt.Tx) error { @@ -91,8 +96,8 @@ func (c *Cache) PutBlob(digest, path string) error { return err } - if err := bucket.Put([]byte(relp), nil); err != nil { - c.log.Error().Err(err).Str("bucket", digest).Str("value", relp).Msg("unable to put record") + if err := bucket.Put([]byte(path), nil); err != nil { + c.log.Error().Err(err).Str("bucket", digest).Str("value", path).Msg("unable to put record") return err } @@ -166,9 +171,12 @@ func (c *Cache) HasBlob(digest, blob string) bool { func (c *Cache) DeleteBlob(digest, path string) error { // use only relative (to rootDir) paths on blobs - relp, err := filepath.Rel(c.rootDir, path) - if err != nil { - c.log.Error().Err(err).Str("path", path).Msg("unable to get relative path") + var err error + if c.useRelPaths { + path, err = filepath.Rel(c.rootDir, path) + if err != nil { + 
c.log.Error().Err(err).Str("path", path).Msg("unable to get relative path") + } } if err := c.db.Update(func(tx *bbolt.Tx) error { @@ -186,8 +194,8 @@ func (c *Cache) DeleteBlob(digest, path string) error { return errors.ErrCacheMiss } - if err := bucket.Delete([]byte(relp)); err != nil { - c.log.Error().Err(err).Str("digest", digest).Str("path", relp).Msg("unable to delete") + if err := bucket.Delete([]byte(path)); err != nil { + c.log.Error().Err(err).Str("digest", digest).Str("path", path).Msg("unable to delete") return err } @@ -196,9 +204,9 @@ func (c *Cache) DeleteBlob(digest, path string) error { k, _ := cur.First() if k == nil { - c.log.Debug().Str("digest", digest).Str("path", relp).Msg("deleting empty bucket") + c.log.Debug().Str("digest", digest).Str("path", path).Msg("deleting empty bucket") if err := root.DeleteBucket([]byte(digest)); err != nil { - c.log.Error().Err(err).Str("digest", digest).Str("path", relp).Msg("unable to delete") + c.log.Error().Err(err).Str("digest", digest).Str("path", path).Msg("unable to delete") return err } diff --git a/pkg/storage/cache_test.go b/pkg/storage/cache_test.go index c35238fd..6824ec06 100644 --- a/pkg/storage/cache_test.go +++ b/pkg/storage/cache_test.go @@ -17,9 +17,9 @@ func TestCache(t *testing.T) { log := log.NewLogger("debug", "") So(log, ShouldNotBeNil) - So(storage.NewCache("/deadBEEF", "cache_test", log), ShouldBeNil) + So(storage.NewCache("/deadBEEF", "cache_test", true, log), ShouldBeNil) - cache := storage.NewCache(dir, "cache_test", log) + cache := storage.NewCache(dir, "cache_test", true, log) So(cache, ShouldNotBeNil) val, err := cache.GetBlob("key") diff --git a/pkg/storage/s3/s3_test.go b/pkg/storage/s3/s3_test.go index 3c7e2ff9..43e41dca 100644 --- a/pkg/storage/s3/s3_test.go +++ b/pkg/storage/s3/s3_test.go @@ -4,6 +4,7 @@ import ( "bytes" "context" _ "crypto/sha256" + "encoding/json" "errors" "fmt" "io" @@ -19,6 +20,7 @@ import ( _ "github.com/docker/distribution/registry/storage/driver/s3-aws" 
guuid "github.com/gofrs/uuid" godigest "github.com/opencontainers/go-digest" + ispec "github.com/opencontainers/image-spec/specs-go/v1" "github.com/rs/zerolog" . "github.com/smartystreets/goconvey/convey" "gopkg.in/resty.v1" @@ -27,6 +29,7 @@ import ( "zotregistry.io/zot/pkg/log" "zotregistry.io/zot/pkg/storage" "zotregistry.io/zot/pkg/storage/s3" + "zotregistry.io/zot/pkg/test" ) // nolint: gochecknoglobals @@ -50,15 +53,19 @@ func skipIt(t *testing.T) { } } -func createMockStorage(rootDir string, store driver.StorageDriver) storage.ImageStore { +func createMockStorage(rootDir string, cacheDir string, dedupe bool, store driver.StorageDriver) storage.ImageStore { log := log.Logger{Logger: zerolog.New(os.Stdout)} metrics := monitoring.NewMetricsServer(false, log) - il := s3.NewImageStore(rootDir, false, storage.DefaultGCDelay, false, false, log, metrics, store) + il := s3.NewImageStore(rootDir, cacheDir, false, storage.DefaultGCDelay, dedupe, false, log, metrics, store) return il } -func createObjectsStore(rootDir string) (driver.StorageDriver, storage.ImageStore, error) { +func createObjectsStore(rootDir string, cacheDir string, dedupe bool) ( + driver.StorageDriver, + storage.ImageStore, + error, +) { bucket := "zot-storage-test" endpoint := os.Getenv("S3MOCK_ENDPOINT") storageDriverParams := map[string]interface{}{ @@ -67,6 +74,8 @@ func createObjectsStore(rootDir string) (driver.StorageDriver, storage.ImageStor "region": "us-east-2", "bucket": bucket, "regionendpoint": endpoint, + "accesskey": "minioadmin", + "secretkey": "minioadmin", "secure": false, "skipverify": false, } @@ -86,13 +95,14 @@ func createObjectsStore(rootDir string) (driver.StorageDriver, storage.ImageStor log := log.Logger{Logger: zerolog.New(os.Stdout)} metrics := monitoring.NewMetricsServer(false, log) - il := s3.NewImageStore(rootDir, false, storage.DefaultGCDelay, false, false, log, metrics, store) + il := s3.NewImageStore(rootDir, cacheDir, false, storage.DefaultGCDelay, dedupe, false, 
log, metrics, store) return store, il, err } type FileInfoMock struct { isDirFn func() bool + sizeFn func() int64 } func (f *FileInfoMock) Path() string { @@ -100,6 +110,10 @@ func (f *FileInfoMock) Path() string { } func (f *FileInfoMock) Size() int64 { + if f != nil && f.sizeFn != nil { + return f.sizeFn() + } + return int64(fileInfoSize) } @@ -265,7 +279,7 @@ func TestStorageDriverStatFunction(t *testing.T) { testDir := path.Join("/oci-repo-test", uuid.String()) - storeDriver, imgStore, _ := createObjectsStore(testDir) + storeDriver, imgStore, _ := createObjectsStore(testDir, t.TempDir(), true) defer cleanupStorage(storeDriver, testDir) /* There is an issue with storageDriver.Stat() that returns a storageDriver.FileInfo() @@ -345,11 +359,12 @@ func TestNegativeCasesObjectsStorage(t *testing.T) { testDir := path.Join("/oci-repo-test", uuid.String()) - storeDriver, imgStore, _ := createObjectsStore(testDir) + tdir := t.TempDir() + + storeDriver, imgStore, _ := createObjectsStore(testDir, tdir, true) defer cleanupStorage(storeDriver, testDir) Convey("Invalid validate repo", t, func(c C) { - So(imgStore, ShouldNotBeNil) So(imgStore.InitRepo(testImage), ShouldBeNil) objects, err := storeDriver.List(context.Background(), path.Join(imgStore.RootDir(), testImage)) So(err, ShouldBeNil) @@ -365,9 +380,6 @@ func TestNegativeCasesObjectsStorage(t *testing.T) { }) Convey("Invalid get image tags", t, func(c C) { - storeDriver, imgStore, err := createObjectsStore(testDir) - defer cleanupStorage(storeDriver, testDir) - So(err, ShouldBeNil) So(imgStore.InitRepo(testImage), ShouldBeNil) So(storeDriver.Move(context.Background(), path.Join(testDir, testImage, "index.json"), @@ -386,10 +398,6 @@ func TestNegativeCasesObjectsStorage(t *testing.T) { }) Convey("Invalid get image manifest", t, func(c C) { - storeDriver, imgStore, err := createObjectsStore(testDir) - defer cleanupStorage(storeDriver, testDir) - So(err, ShouldBeNil) - So(imgStore, ShouldNotBeNil) 
So(imgStore.InitRepo(testImage), ShouldBeNil) So(storeDriver.Delete(context.Background(), path.Join(testDir, testImage, "index.json")), ShouldBeNil) _, _, _, err = imgStore.GetImageManifest(testImage, "") @@ -402,9 +410,6 @@ func TestNegativeCasesObjectsStorage(t *testing.T) { }) Convey("Invalid validate repo", t, func(c C) { - storeDriver, imgStore, err := createObjectsStore(testDir) - defer cleanupStorage(storeDriver, testDir) - So(err, ShouldBeNil) So(imgStore, ShouldNotBeNil) So(imgStore.InitRepo(testImage), ShouldBeNil) @@ -421,9 +426,6 @@ func TestNegativeCasesObjectsStorage(t *testing.T) { }) Convey("Invalid finish blob upload", t, func(c C) { - storeDriver, imgStore, err := createObjectsStore(testDir) - defer cleanupStorage(storeDriver, testDir) - So(err, ShouldBeNil) So(imgStore, ShouldNotBeNil) So(imgStore.InitRepo(testImage), ShouldBeNil) @@ -455,7 +457,7 @@ func TestNegativeCasesObjectsStorage(t *testing.T) { }) Convey("Test storage driver errors", t, func(c C) { - imgStore = createMockStorage(testDir, &StorageDriverMock{ + imgStore = createMockStorage(testDir, tdir, false, &StorageDriverMock{ listFn: func(ctx context.Context, path string) ([]string, error) { return []string{testImage}, errS3 }, @@ -527,17 +529,32 @@ func TestNegativeCasesObjectsStorage(t *testing.T) { }) Convey("Test ValidateRepo", t, func(c C) { - imgStore = createMockStorage(testDir, &StorageDriverMock{ + tdir := t.TempDir() + + imgStore = createMockStorage(testDir, tdir, false, &StorageDriverMock{ listFn: func(ctx context.Context, path string) ([]string, error) { return []string{testImage, testImage}, errS3 }, }) + _, err := imgStore.ValidateRepo(testImage) So(err, ShouldNotBeNil) + + imgStore = createMockStorage(testDir, tdir, false, &StorageDriverMock{ + listFn: func(ctx context.Context, path string) ([]string, error) { + return []string{testImage, testImage}, nil + }, + statFn: func(ctx context.Context, path string) (driver.FileInfo, error) { + return nil, errS3 + }, + }) + + _, 
err = imgStore.ValidateRepo(testImage) + So(err, ShouldNotBeNil) }) Convey("Test ValidateRepo2", t, func(c C) { - imgStore = createMockStorage(testDir, &StorageDriverMock{ + imgStore = createMockStorage(testDir, tdir, false, &StorageDriverMock{ listFn: func(ctx context.Context, path string) ([]string, error) { return []string{"test/test/oci-layout", "test/test/index.json"}, nil }, @@ -550,7 +567,7 @@ func TestNegativeCasesObjectsStorage(t *testing.T) { }) Convey("Test ValidateRepo3", t, func(c C) { - imgStore = createMockStorage(testDir, &StorageDriverMock{ + imgStore = createMockStorage(testDir, tdir, false, &StorageDriverMock{ listFn: func(ctx context.Context, path string) ([]string, error) { return []string{"test/test/oci-layout", "test/test/index.json"}, nil }, @@ -567,7 +584,7 @@ func TestNegativeCasesObjectsStorage(t *testing.T) { Convey("Test ValidateRepo4", t, func(c C) { ociLayout := []byte(`{"imageLayoutVersion": "9.9.9"}`) - imgStore = createMockStorage(testDir, &StorageDriverMock{ + imgStore = createMockStorage(testDir, tdir, false, &StorageDriverMock{ listFn: func(ctx context.Context, path string) ([]string, error) { return []string{"test/test/oci-layout", "test/test/index.json"}, nil }, @@ -583,7 +600,7 @@ func TestNegativeCasesObjectsStorage(t *testing.T) { }) Convey("Test GetRepositories", t, func(c C) { - imgStore = createMockStorage(testDir, &StorageDriverMock{ + imgStore = createMockStorage(testDir, tdir, false, &StorageDriverMock{ walkFn: func(ctx context.Context, path string, f driver.WalkFn) error { return f(new(FileInfoMock)) }, @@ -594,7 +611,7 @@ func TestNegativeCasesObjectsStorage(t *testing.T) { }) Convey("Test DeleteImageManifest", t, func(c C) { - imgStore = createMockStorage(testDir, &StorageDriverMock{ + imgStore = createMockStorage(testDir, tdir, false, &StorageDriverMock{ getContentFn: func(ctx context.Context, path string) ([]byte, error) { return []byte{}, errS3 }, @@ -604,13 +621,13 @@ func TestNegativeCasesObjectsStorage(t 
*testing.T) { }) Convey("Test DeleteImageManifest2", t, func(c C) { - imgStore = createMockStorage(testDir, &StorageDriverMock{}) + imgStore = createMockStorage(testDir, tdir, false, &StorageDriverMock{}) err := imgStore.DeleteImageManifest(testImage, "1.0") So(err, ShouldNotBeNil) }) Convey("Test NewBlobUpload", t, func(c C) { - imgStore = createMockStorage(testDir, &StorageDriverMock{ + imgStore = createMockStorage(testDir, tdir, false, &StorageDriverMock{ putContentFn: func(ctx context.Context, path string, content []byte) error { return errS3 }, @@ -620,7 +637,7 @@ func TestNegativeCasesObjectsStorage(t *testing.T) { }) Convey("Test GetBlobUpload", t, func(c C) { - imgStore = createMockStorage(testDir, &StorageDriverMock{ + imgStore = createMockStorage(testDir, tdir, false, &StorageDriverMock{ statFn: func(ctx context.Context, path string) (driver.FileInfo, error) { return &FileInfoMock{}, errS3 }, @@ -630,7 +647,7 @@ func TestNegativeCasesObjectsStorage(t *testing.T) { }) Convey("Test PutBlobChunkStreamed", t, func(c C) { - imgStore = createMockStorage(testDir, &StorageDriverMock{ + imgStore = createMockStorage(testDir, tdir, false, &StorageDriverMock{ writerFn: func(ctx context.Context, path string, isAppend bool) (driver.FileWriter, error) { return &FileWriterMock{}, errS3 }, @@ -640,7 +657,7 @@ func TestNegativeCasesObjectsStorage(t *testing.T) { }) Convey("Test PutBlobChunkStreamed2", t, func(c C) { - imgStore = createMockStorage(testDir, &StorageDriverMock{ + imgStore = createMockStorage(testDir, tdir, false, &StorageDriverMock{ writerFn: func(ctx context.Context, path string, isAppend bool) (driver.FileWriter, error) { return &FileWriterMock{writeFn: func(b []byte) (int, error) { return 0, errS3 @@ -652,7 +669,7 @@ func TestNegativeCasesObjectsStorage(t *testing.T) { }) Convey("Test PutBlobChunk", t, func(c C) { - imgStore = createMockStorage(testDir, &StorageDriverMock{ + imgStore = createMockStorage(testDir, tdir, false, &StorageDriverMock{ writerFn: 
func(ctx context.Context, path string, isAppend bool) (driver.FileWriter, error) { return &FileWriterMock{}, errS3 }, @@ -662,7 +679,7 @@ func TestNegativeCasesObjectsStorage(t *testing.T) { }) Convey("Test PutBlobChunk2", t, func(c C) { - imgStore = createMockStorage(testDir, &StorageDriverMock{ + imgStore = createMockStorage(testDir, tdir, false, &StorageDriverMock{ writerFn: func(ctx context.Context, path string, isAppend bool) (driver.FileWriter, error) { return &FileWriterMock{ writeFn: func(b []byte) (int, error) { @@ -679,7 +696,7 @@ func TestNegativeCasesObjectsStorage(t *testing.T) { }) Convey("Test PutBlobChunk3", t, func(c C) { - imgStore = createMockStorage(testDir, &StorageDriverMock{ + imgStore = createMockStorage(testDir, tdir, false, &StorageDriverMock{ writerFn: func(ctx context.Context, path string, isAppend bool) (driver.FileWriter, error) { return &FileWriterMock{ writeFn: func(b []byte) (int, error) { @@ -693,7 +710,7 @@ func TestNegativeCasesObjectsStorage(t *testing.T) { }) Convey("Test FinishBlobUpload", t, func(c C) { - imgStore = createMockStorage(testDir, &StorageDriverMock{ + imgStore = createMockStorage(testDir, tdir, false, &StorageDriverMock{ writerFn: func(ctx context.Context, path string, isAppend bool) (driver.FileWriter, error) { return &FileWriterMock{ commitFn: func() error { @@ -708,7 +725,7 @@ func TestNegativeCasesObjectsStorage(t *testing.T) { }) Convey("Test FinishBlobUpload2", t, func(c C) { - imgStore = createMockStorage(testDir, &StorageDriverMock{ + imgStore = createMockStorage(testDir, tdir, false, &StorageDriverMock{ writerFn: func(ctx context.Context, path string, isAppend bool) (driver.FileWriter, error) { return &FileWriterMock{ closeFn: func() error { @@ -723,7 +740,7 @@ func TestNegativeCasesObjectsStorage(t *testing.T) { }) Convey("Test FinishBlobUpload3", t, func(c C) { - imgStore = createMockStorage(testDir, &StorageDriverMock{ + imgStore = createMockStorage(testDir, tdir, false, &StorageDriverMock{ readerFn: 
func(ctx context.Context, path string, offset int64) (io.ReadCloser, error) { return nil, errS3 }, @@ -734,7 +751,7 @@ func TestNegativeCasesObjectsStorage(t *testing.T) { }) Convey("Test FinishBlobUpload4", t, func(c C) { - imgStore = createMockStorage(testDir, &StorageDriverMock{ + imgStore = createMockStorage(testDir, tdir, false, &StorageDriverMock{ moveFn: func(ctx context.Context, sourcePath, destPath string) error { return errS3 }, @@ -745,7 +762,7 @@ func TestNegativeCasesObjectsStorage(t *testing.T) { }) Convey("Test FullBlobUpload", t, func(c C) { - imgStore = createMockStorage(testDir, &StorageDriverMock{ + imgStore = createMockStorage(testDir, tdir, false, &StorageDriverMock{ writerFn: func(ctx context.Context, path string, isAppend bool) (driver.FileWriter, error) { return &FileWriterMock{}, errS3 }, @@ -756,14 +773,14 @@ func TestNegativeCasesObjectsStorage(t *testing.T) { }) Convey("Test FullBlobUpload2", t, func(c C) { - imgStore = createMockStorage(testDir, &StorageDriverMock{}) + imgStore = createMockStorage(testDir, tdir, false, &StorageDriverMock{}) d := godigest.FromBytes([]byte(" ")) _, _, err := imgStore.FullBlobUpload(testImage, ioutil.NopCloser(strings.NewReader("")), d.String()) So(err, ShouldNotBeNil) }) Convey("Test FullBlobUpload3", t, func(c C) { - imgStore = createMockStorage(testDir, &StorageDriverMock{ + imgStore = createMockStorage(testDir, tdir, false, &StorageDriverMock{ moveFn: func(ctx context.Context, sourcePath, destPath string) error { return errS3 }, @@ -774,7 +791,7 @@ func TestNegativeCasesObjectsStorage(t *testing.T) { }) Convey("Test GetBlob", t, func(c C) { - imgStore = createMockStorage(testDir, &StorageDriverMock{ + imgStore = createMockStorage(testDir, tdir, false, &StorageDriverMock{ readerFn: func(ctx context.Context, path string, offset int64) (io.ReadCloser, error) { return ioutil.NopCloser(strings.NewReader("")), errS3 }, @@ -785,7 +802,7 @@ func TestNegativeCasesObjectsStorage(t *testing.T) { }) Convey("Test 
DeleteBlob", t, func(c C) { - imgStore = createMockStorage(testDir, &StorageDriverMock{ + imgStore = createMockStorage(testDir, tdir, false, &StorageDriverMock{ deleteFn: func(ctx context.Context, path string) error { return errS3 }, @@ -796,7 +813,7 @@ func TestNegativeCasesObjectsStorage(t *testing.T) { }) Convey("Test GetReferrers", t, func(c C) { - imgStore = createMockStorage(testDir, &StorageDriverMock{ + imgStore = createMockStorage(testDir, tdir, false, &StorageDriverMock{ deleteFn: func(ctx context.Context, path string) error { return errS3 }, @@ -807,3 +824,622 @@ func TestNegativeCasesObjectsStorage(t *testing.T) { So(err, ShouldEqual, zerr.ErrMethodNotSupported) }) } + +func TestS3Dedupe(t *testing.T) { + skipIt(t) + Convey("Dedupe", t, func(c C) { + uuid, err := guuid.NewV4() + if err != nil { + panic(err) + } + + testDir := path.Join("/oci-repo-test", uuid.String()) + + tdir := t.TempDir() + + storeDriver, imgStore, _ := createObjectsStore(testDir, tdir, true) + defer cleanupStorage(storeDriver, testDir) + + // manifest1 + upload, err := imgStore.NewBlobUpload("dedupe1") + So(err, ShouldBeNil) + So(upload, ShouldNotBeEmpty) + + content := []byte("test-data3") + buf := bytes.NewBuffer(content) + buflen := buf.Len() + digest := godigest.FromBytes(content) + blob, err := imgStore.PutBlobChunkStreamed("dedupe1", upload, buf) + So(err, ShouldBeNil) + So(blob, ShouldEqual, buflen) + blobDigest1 := strings.Split(digest.String(), ":")[1] + So(blobDigest1, ShouldNotBeEmpty) + + err = imgStore.FinishBlobUpload("dedupe1", upload, buf, digest.String()) + So(err, ShouldBeNil) + So(blob, ShouldEqual, buflen) + + _, checkBlobSize1, err := imgStore.CheckBlob("dedupe1", digest.String()) + So(checkBlobSize1, ShouldBeGreaterThan, 0) + So(err, ShouldBeNil) + + _, getBlobSize1, err := imgStore.GetBlob("dedupe1", digest.String(), "application/vnd.oci.image.layer.v1.tar+gzip") + So(getBlobSize1, ShouldBeGreaterThan, 0) + So(err, ShouldBeNil) + + cblob, cdigest := 
test.GetRandomImageConfig() + _, clen, err := imgStore.FullBlobUpload("dedupe1", bytes.NewReader(cblob), cdigest.String()) + So(err, ShouldBeNil) + So(clen, ShouldEqual, len(cblob)) + hasBlob, _, err := imgStore.CheckBlob("dedupe1", cdigest.String()) + So(err, ShouldBeNil) + So(hasBlob, ShouldEqual, true) + + manifest := ispec.Manifest{ + Config: ispec.Descriptor{ + MediaType: "application/vnd.oci.image.config.v1+json", + Digest: cdigest, + Size: int64(len(cblob)), + }, + Layers: []ispec.Descriptor{ + { + MediaType: "application/vnd.oci.image.layer.v1.tar", + Digest: digest, + Size: int64(buflen), + }, + }, + } + + manifest.SchemaVersion = 2 + manifestBuf, err := json.Marshal(manifest) + So(err, ShouldBeNil) + digest = godigest.FromBytes(manifestBuf) + _, err = imgStore.PutImageManifest("dedupe1", digest.String(), ispec.MediaTypeImageManifest, manifestBuf) + So(err, ShouldBeNil) + + _, _, _, err = imgStore.GetImageManifest("dedupe1", digest.String()) + So(err, ShouldBeNil) + + // manifest2 + upload, err = imgStore.NewBlobUpload("dedupe2") + So(err, ShouldBeNil) + So(upload, ShouldNotBeEmpty) + + content = []byte("test-data3") + buf = bytes.NewBuffer(content) + buflen = buf.Len() + digest = godigest.FromBytes(content) + + blob, err = imgStore.PutBlobChunkStreamed("dedupe2", upload, buf) + So(err, ShouldBeNil) + So(blob, ShouldEqual, buflen) + blobDigest2 := strings.Split(digest.String(), ":")[1] + So(blobDigest2, ShouldNotBeEmpty) + + err = imgStore.FinishBlobUpload("dedupe2", upload, buf, digest.String()) + So(err, ShouldBeNil) + So(blob, ShouldEqual, buflen) + + _, checkBlobSize2, err := imgStore.CheckBlob("dedupe2", digest.String()) + So(err, ShouldBeNil) + So(checkBlobSize2, ShouldBeGreaterThan, 0) + + _, getBlobSize2, err := imgStore.GetBlob("dedupe2", digest.String(), "application/vnd.oci.image.layer.v1.tar+gzip") + So(err, ShouldBeNil) + So(getBlobSize2, ShouldBeGreaterThan, 0) + So(checkBlobSize1, ShouldEqual, checkBlobSize2) + So(getBlobSize1, ShouldEqual, 
getBlobSize2) + + cblob, cdigest = test.GetRandomImageConfig() + _, clen, err = imgStore.FullBlobUpload("dedupe2", bytes.NewReader(cblob), cdigest.String()) + So(err, ShouldBeNil) + So(clen, ShouldEqual, len(cblob)) + hasBlob, _, err = imgStore.CheckBlob("dedupe2", cdigest.String()) + So(err, ShouldBeNil) + So(hasBlob, ShouldEqual, true) + + manifest = ispec.Manifest{ + Config: ispec.Descriptor{ + MediaType: "application/vnd.oci.image.config.v1+json", + Digest: cdigest, + Size: int64(len(cblob)), + }, + Layers: []ispec.Descriptor{ + { + MediaType: "application/vnd.oci.image.layer.v1.tar", + Digest: digest, + Size: int64(buflen), + }, + }, + } + manifest.SchemaVersion = 2 + manifestBuf, err = json.Marshal(manifest) + So(err, ShouldBeNil) + digest = godigest.FromBytes(manifestBuf) + _, err = imgStore.PutImageManifest("dedupe2", "1.0", ispec.MediaTypeImageManifest, manifestBuf) + So(err, ShouldBeNil) + + _, _, _, err = imgStore.GetImageManifest("dedupe2", digest.String()) + So(err, ShouldBeNil) + + fi1, err := storeDriver.Stat(context.Background(), path.Join(testDir, "dedupe1", "blobs", "sha256", blobDigest1)) + So(err, ShouldBeNil) + + fi2, err := storeDriver.Stat(context.Background(), path.Join(testDir, "dedupe2", "blobs", "sha256", blobDigest2)) + So(err, ShouldBeNil) + + // original blob should have the real content of blob + So(fi1.Size(), ShouldNotEqual, fi2.Size()) + So(fi1.Size(), ShouldBeGreaterThan, 0) + // deduped blob should be of size 0 + So(fi2.Size(), ShouldEqual, 0) + + Convey("Check that delete blobs moves the real content to the next contenders", func() { + // if we delete blob1, the content should be moved to blob2 + err = imgStore.DeleteBlob("dedupe1", "sha256:"+blobDigest1) + So(err, ShouldBeNil) + + _, err = storeDriver.Stat(context.Background(), path.Join(testDir, "dedupe1", "blobs", "sha256", blobDigest1)) + So(err, ShouldNotBeNil) + + fi2, err = storeDriver.Stat(context.Background(), path.Join(testDir, "dedupe2", "blobs", "sha256", 
blobDigest2)) + So(err, ShouldBeNil) + + So(fi2.Size(), ShouldBeGreaterThan, 0) + // the second blob should now be equal to the deleted blob. + So(fi2.Size(), ShouldEqual, fi1.Size()) + + err = imgStore.DeleteBlob("dedupe2", "sha256:"+blobDigest2) + So(err, ShouldBeNil) + + _, err = storeDriver.Stat(context.Background(), path.Join(testDir, "dedupe2", "blobs", "sha256", blobDigest2)) + So(err, ShouldNotBeNil) + }) + + Convey("Check backward compatibility - switch dedupe to false", func() { + /* copy cache to the new storage with dedupe false (doing this because we + already have a cache object holding the lock on cache db file) */ + input, err := ioutil.ReadFile(path.Join(tdir, s3.CacheDBName+storage.DBExtensionName)) + So(err, ShouldBeNil) + + tdir = t.TempDir() + + err = ioutil.WriteFile(path.Join(tdir, s3.CacheDBName+storage.DBExtensionName), input, 0o600) + So(err, ShouldBeNil) + + storeDriver, imgStore, _ := createObjectsStore(testDir, tdir, false) + defer cleanupStorage(storeDriver, testDir) + + // manifest3 without dedupe + upload, err = imgStore.NewBlobUpload("dedupe3") + So(err, ShouldBeNil) + So(upload, ShouldNotBeEmpty) + + content = []byte("test-data3") + buf = bytes.NewBuffer(content) + buflen = buf.Len() + digest = godigest.FromBytes(content) + + blob, err = imgStore.PutBlobChunkStreamed("dedupe3", upload, buf) + So(err, ShouldBeNil) + So(blob, ShouldEqual, buflen) + blobDigest2 := strings.Split(digest.String(), ":")[1] + So(blobDigest2, ShouldNotBeEmpty) + + err = imgStore.FinishBlobUpload("dedupe3", upload, buf, digest.String()) + So(err, ShouldBeNil) + So(blob, ShouldEqual, buflen) + + _, _, err = imgStore.CheckBlob("dedupe3", digest.String()) + So(err, ShouldBeNil) + + // check that we retrieve the real dedupe2/blob (which is deduped earlier - 0 size) when switching to dedupe false + _, getBlobSize2, err = imgStore.GetBlob("dedupe2", digest.String(), "application/vnd.oci.image.layer.v1.tar+gzip") + So(err, ShouldBeNil) + So(getBlobSize1, 
ShouldEqual, getBlobSize2) + + _, checkBlobSize2, err := imgStore.CheckBlob("dedupe2", digest.String()) + So(err, ShouldBeNil) + So(checkBlobSize2, ShouldBeGreaterThan, 0) + So(checkBlobSize2, ShouldEqual, getBlobSize2) + + _, getBlobSize3, err := imgStore.GetBlob("dedupe3", digest.String(), "application/vnd.oci.image.layer.v1.tar+gzip") + So(err, ShouldBeNil) + So(getBlobSize1, ShouldEqual, getBlobSize3) + + _, checkBlobSize3, err := imgStore.CheckBlob("dedupe3", digest.String()) + So(err, ShouldBeNil) + So(checkBlobSize3, ShouldBeGreaterThan, 0) + So(checkBlobSize3, ShouldEqual, getBlobSize3) + + cblob, cdigest = test.GetRandomImageConfig() + _, clen, err = imgStore.FullBlobUpload("dedupe3", bytes.NewReader(cblob), cdigest.String()) + So(err, ShouldBeNil) + So(clen, ShouldEqual, len(cblob)) + hasBlob, _, err = imgStore.CheckBlob("dedupe3", cdigest.String()) + So(err, ShouldBeNil) + So(hasBlob, ShouldEqual, true) + + manifest = ispec.Manifest{ + Config: ispec.Descriptor{ + MediaType: "application/vnd.oci.image.config.v1+json", + Digest: cdigest, + Size: int64(len(cblob)), + }, + Layers: []ispec.Descriptor{ + { + MediaType: "application/vnd.oci.image.layer.v1.tar", + Digest: digest, + Size: int64(buflen), + }, + }, + } + manifest.SchemaVersion = 2 + manifestBuf, err = json.Marshal(manifest) + So(err, ShouldBeNil) + digest = godigest.FromBytes(manifestBuf) + _, err = imgStore.PutImageManifest("dedupe3", "1.0", ispec.MediaTypeImageManifest, manifestBuf) + So(err, ShouldBeNil) + + _, _, _, err = imgStore.GetImageManifest("dedupe3", digest.String()) + So(err, ShouldBeNil) + + fi1, err := storeDriver.Stat(context.Background(), path.Join(testDir, "dedupe1", "blobs", "sha256", blobDigest1)) + So(err, ShouldBeNil) + + fi2, err := storeDriver.Stat(context.Background(), path.Join(testDir, "dedupe2", "blobs", "sha256", blobDigest1)) + So(err, ShouldBeNil) + So(fi2.Size(), ShouldEqual, 0) + + fi3, err := storeDriver.Stat(context.Background(), path.Join(testDir, "dedupe3", 
"blobs", "sha256", blobDigest2)) + So(err, ShouldBeNil) + + // the new blob with dedupe false should be equal with the origin blob from dedupe1 + So(fi1.Size(), ShouldEqual, fi3.Size()) + }) + }) +} + +func TestS3DedupeErr(t *testing.T) { + skipIt(t) + + uuid, err := guuid.NewV4() + if err != nil { + panic(err) + } + + testDir := path.Join("/oci-repo-test", uuid.String()) + + tdir := t.TempDir() + + storeDriver, imgStore, _ := createObjectsStore(testDir, tdir, true) + defer cleanupStorage(storeDriver, testDir) + + Convey("Test DedupeBlob", t, func(c C) { + tdir := t.TempDir() + + imgStore = createMockStorage(testDir, tdir, true, &StorageDriverMock{}) + + err = os.Remove(path.Join(tdir, s3.CacheDBName+storage.DBExtensionName)) + digest := godigest.NewDigestFromEncoded(godigest.SHA256, "digest") + + // trigger unable to insert blob record + err := imgStore.DedupeBlob("", digest, "") + So(err, ShouldNotBeNil) + + imgStore = createMockStorage(testDir, tdir, true, &StorageDriverMock{ + moveFn: func(ctx context.Context, sourcePath string, destPath string) error { + return errS3 + }, + statFn: func(ctx context.Context, path string) (driver.FileInfo, error) { + return driver.FileInfoInternal{}, errS3 + }, + }) + + // trigger unable to rename blob + err = imgStore.DedupeBlob("", digest, "dst") + So(err, ShouldNotBeNil) + + // trigger retry + err = imgStore.DedupeBlob("", digest, "dst") + So(err, ShouldNotBeNil) + }) + + Convey("Test DedupeBlob - error on second store.Stat()", t, func(c C) { + tdir := t.TempDir() + imgStore = createMockStorage(testDir, tdir, true, &StorageDriverMock{ + statFn: func(ctx context.Context, path string) (driver.FileInfo, error) { + if path == "dst2" { + return driver.FileInfoInternal{}, errS3 + } + + return driver.FileInfoInternal{}, nil + }, + }) + + digest := godigest.NewDigestFromEncoded(godigest.SHA256, "digest") + err := imgStore.DedupeBlob("", digest, "dst") + So(err, ShouldBeNil) + + err = imgStore.DedupeBlob("", digest, "dst2") + So(err, 
ShouldNotBeNil) + }) + + Convey("Test DedupeBlob - error on store.PutContent()", t, func(c C) { + tdir := t.TempDir() + imgStore = createMockStorage(testDir, tdir, true, &StorageDriverMock{ + putContentFn: func(ctx context.Context, path string, content []byte) error { + return errS3 + }, + statFn: func(ctx context.Context, path string) (driver.FileInfo, error) { + return nil, nil + }, + }) + + digest := godigest.NewDigestFromEncoded(godigest.SHA256, "digest") + err := imgStore.DedupeBlob("", digest, "dst") + So(err, ShouldBeNil) + + err = imgStore.DedupeBlob("", digest, "dst2") + So(err, ShouldNotBeNil) + }) + + Convey("Test DedupeBlob - error on store.Delete()", t, func(c C) { + tdir := t.TempDir() + imgStore = createMockStorage(testDir, tdir, true, &StorageDriverMock{ + deleteFn: func(ctx context.Context, path string) error { + return errS3 + }, + statFn: func(ctx context.Context, path string) (driver.FileInfo, error) { + return nil, nil + }, + }) + + digest := godigest.NewDigestFromEncoded(godigest.SHA256, "digest") + err := imgStore.DedupeBlob("", digest, "dst") + So(err, ShouldBeNil) + + err = imgStore.DedupeBlob("", digest, "dst") + So(err, ShouldNotBeNil) + }) + + Convey("Test copyBlob() - error on initRepo()", t, func(c C) { + tdir := t.TempDir() + imgStore = createMockStorage(testDir, tdir, true, &StorageDriverMock{ + putContentFn: func(ctx context.Context, path string, content []byte) error { + return errS3 + }, + statFn: func(ctx context.Context, path string) (driver.FileInfo, error) { + return driver.FileInfoInternal{}, errS3 + }, + writerFn: func(ctx context.Context, path string, isAppend bool) (driver.FileWriter, error) { + return &FileWriterMock{}, errS3 + }, + }) + + digest := godigest.NewDigestFromEncoded(godigest.SHA256, + "7173b809ca12ec5dee4506cd86be934c4596dd234ee82c0662eac04a8c2c71dc") + + err := imgStore.DedupeBlob("repo", digest, "dst") + So(err, ShouldBeNil) + + _, _, err = imgStore.CheckBlob("repo", digest.String()) + So(err, 
ShouldNotBeNil) + }) + + Convey("Test copyBlob() - error on store.PutContent()", t, func(c C) { + tdir := t.TempDir() + imgStore = createMockStorage(testDir, tdir, true, &StorageDriverMock{ + putContentFn: func(ctx context.Context, path string, content []byte) error { + return errS3 + }, + statFn: func(ctx context.Context, path string) (driver.FileInfo, error) { + return driver.FileInfoInternal{}, errS3 + }, + }) + + digest := godigest.NewDigestFromEncoded(godigest.SHA256, + "7173b809ca12ec5dee4506cd86be934c4596dd234ee82c0662eac04a8c2c71dc") + + err := imgStore.DedupeBlob("repo", digest, "dst") + So(err, ShouldBeNil) + + _, _, err = imgStore.CheckBlob("repo", digest.String()) + So(err, ShouldNotBeNil) + }) + + Convey("Test copyBlob() - error on store.Stat()", t, func(c C) { + tdir := t.TempDir() + imgStore = createMockStorage(testDir, tdir, true, &StorageDriverMock{ + statFn: func(ctx context.Context, path string) (driver.FileInfo, error) { + return driver.FileInfoInternal{}, errS3 + }, + }) + + digest := godigest.NewDigestFromEncoded(godigest.SHA256, + "7173b809ca12ec5dee4506cd86be934c4596dd234ee82c0662eac04a8c2c71dc") + + err := imgStore.DedupeBlob("repo", digest, "dst") + So(err, ShouldBeNil) + + _, _, err = imgStore.CheckBlob("repo", digest.String()) + So(err, ShouldNotBeNil) + }) + + Convey("Test GetBlob() - error on second store.Stat()", t, func(c C) { + tdir := t.TempDir() + + imgStore = createMockStorage(testDir, tdir, true, &StorageDriverMock{}) + + digest := godigest.NewDigestFromEncoded(godigest.SHA256, + "7173b809ca12ec5dee4506cd86be934c4596dd234ee82c0662eac04a8c2c71dc") + + err := imgStore.DedupeBlob("/src/dst", digest, "/repo1/dst1") + So(err, ShouldBeNil) + + err = imgStore.DedupeBlob("/src/dst", digest, "/repo2/dst2") + So(err, ShouldBeNil) + + // copy cache db to the new imagestore + input, err := ioutil.ReadFile(path.Join(tdir, s3.CacheDBName+storage.DBExtensionName)) + So(err, ShouldBeNil) + + tdir = t.TempDir() + + err = 
ioutil.WriteFile(path.Join(tdir, s3.CacheDBName+storage.DBExtensionName), input, 0o600) + So(err, ShouldBeNil) + + imgStore = createMockStorage(testDir, tdir, true, &StorageDriverMock{ + statFn: func(ctx context.Context, path string) (driver.FileInfo, error) { + if strings.Contains(path, "repo1/dst1") { + return driver.FileInfoInternal{}, driver.PathNotFoundError{} + } + + return driver.FileInfoInternal{}, nil + }, + }) + + _, _, err = imgStore.GetBlob("repo2", digest.String(), "application/vnd.oci.image.layer.v1.tar+gzip") + So(err, ShouldNotBeNil) + }) + + Convey("Test GetBlob() - error on store.Reader()", t, func(c C) { + tdir := t.TempDir() + + imgStore = createMockStorage(testDir, tdir, true, &StorageDriverMock{}) + + digest := godigest.NewDigestFromEncoded(godigest.SHA256, + "7173b809ca12ec5dee4506cd86be934c4596dd234ee82c0662eac04a8c2c71dc") + + err := imgStore.DedupeBlob("/src/dst", digest, "/repo1/dst1") + So(err, ShouldBeNil) + + err = imgStore.DedupeBlob("/src/dst", digest, "/repo2/dst2") + So(err, ShouldBeNil) + + // copy cache db to the new imagestore + input, err := ioutil.ReadFile(path.Join(tdir, s3.CacheDBName+storage.DBExtensionName)) + So(err, ShouldBeNil) + + tdir = t.TempDir() + + err = ioutil.WriteFile(path.Join(tdir, s3.CacheDBName+storage.DBExtensionName), input, 0o600) + So(err, ShouldBeNil) + + imgStore = createMockStorage(testDir, tdir, true, &StorageDriverMock{ + statFn: func(ctx context.Context, path string) (driver.FileInfo, error) { + return &FileInfoMock{ + sizeFn: func() int64 { + return 0 + }, + }, nil + }, + readerFn: func(ctx context.Context, path string, offset int64) (io.ReadCloser, error) { + if strings.Contains(path, "repo1/dst1") { + return ioutil.NopCloser(strings.NewReader("")), errS3 + } + + return ioutil.NopCloser(strings.NewReader("")), nil + }, + }) + + _, _, err = imgStore.GetBlob("repo2", digest.String(), "application/vnd.oci.image.layer.v1.tar+gzip") + So(err, ShouldNotBeNil) + }) + + Convey("Test DeleteBlob() - error 
on store.Move()", t, func(c C) { + tdir := t.TempDir() + hash := "7173b809ca12ec5dee4506cd86be934c4596dd234ee82c0662eac04a8c2c71dc" + + digest := godigest.NewDigestFromEncoded(godigest.SHA256, hash) + + blobPath := path.Join(testDir, "repo/blobs/sha256", hash) + + imgStore = createMockStorage(testDir, tdir, true, &StorageDriverMock{ + moveFn: func(ctx context.Context, sourcePath, destPath string) error { + if destPath == blobPath { + return nil + } + + return errS3 + }, + statFn: func(ctx context.Context, path string) (driver.FileInfo, error) { + if path != blobPath { + return nil, errS3 + } + + return &FileInfoMock{}, nil + }, + }) + + err := imgStore.DedupeBlob("repo", digest, blobPath) + So(err, ShouldBeNil) + + _, _, err = imgStore.CheckBlob("repo2", digest.String()) + So(err, ShouldBeNil) + + err = imgStore.DeleteBlob("repo", digest.String()) + So(err, ShouldNotBeNil) + }) + + Convey("Test FullBlobUpload", t, func(c C) { + tdir := t.TempDir() + imgStore = createMockStorage(testDir, tdir, true, &StorageDriverMock{ + moveFn: func(ctx context.Context, sourcePath, destPath string) error { + return errS3 + }, + }) + d := godigest.FromBytes([]byte("")) + _, _, err := imgStore.FullBlobUpload(testImage, ioutil.NopCloser(strings.NewReader("")), d.String()) + So(err, ShouldNotBeNil) + }) + + Convey("Test FinishBlobUpload", t, func(c C) { + tdir := t.TempDir() + imgStore = createMockStorage(testDir, tdir, true, &StorageDriverMock{ + moveFn: func(ctx context.Context, sourcePath, destPath string) error { + return errS3 + }, + }) + d := godigest.FromBytes([]byte("")) + err := imgStore.FinishBlobUpload(testImage, "uuid", ioutil.NopCloser(strings.NewReader("")), d.String()) + So(err, ShouldNotBeNil) + }) +} + +func TestInjectDedupe(t *testing.T) { + tdir := t.TempDir() + + uuid, err := guuid.NewV4() + if err != nil { + panic(err) + } + + testDir := path.Join("/oci-repo-test", uuid.String()) + + Convey("Inject errors in DedupeBlob function", t, func() { + imgStore := 
createMockStorage(testDir, tdir, true, &StorageDriverMock{ + statFn: func(ctx context.Context, path string) (driver.FileInfo, error) { + return &FileInfoMock{}, errS3 + }, + }) + err := imgStore.DedupeBlob("blob", "digest", "newblob") + So(err, ShouldBeNil) + + injected := test.InjectFailure(0) + err = imgStore.DedupeBlob("blob", "digest", "newblob") + if injected { + So(err, ShouldNotBeNil) + } else { + So(err, ShouldBeNil) + } + + injected = test.InjectFailure(1) + err = imgStore.DedupeBlob("blob", "digest", "newblob") + if injected { + So(err, ShouldNotBeNil) + } else { + So(err, ShouldBeNil) + } + }) +} diff --git a/pkg/storage/s3/storage.go b/pkg/storage/s3/storage.go index 26ccff9a..da845f24 100644 --- a/pkg/storage/s3/storage.go +++ b/pkg/storage/s3/storage.go @@ -8,9 +8,9 @@ import ( "errors" "fmt" "io" + "os" "path" "path/filepath" - "strings" "sync" "time" @@ -27,11 +27,13 @@ import ( "zotregistry.io/zot/pkg/extensions/monitoring" zlog "zotregistry.io/zot/pkg/log" "zotregistry.io/zot/pkg/storage" + "zotregistry.io/zot/pkg/test" ) const ( - RLOCK = "RLock" - RWLOCK = "RWLock" + RLOCK = "RLock" + RWLOCK = "RWLock" + CacheDBName = "s3_cache" ) // ObjectStorage provides the image storage operations. @@ -46,6 +48,8 @@ type ObjectStorage struct { // see: https://github.com/distribution/distribution/blob/main/registry/storage/driver/s3-aws/s3.go#L545 multiPartUploads sync.Map metrics monitoring.MetricServer + cache *storage.Cache + dedupe bool } func (is *ObjectStorage) RootDir() string { @@ -62,7 +66,7 @@ func (is *ObjectStorage) DirExists(d string) bool { // NewObjectStorage returns a new image store backed by cloud storages. 
// see https://github.com/docker/docker.github.io/tree/master/registry/storage-drivers -func NewImageStore(rootDir string, gc bool, gcDelay time.Duration, dedupe, commit bool, +func NewImageStore(rootDir string, cacheDir string, gc bool, gcDelay time.Duration, dedupe, commit bool, log zlog.Logger, metrics monitoring.MetricServer, store driver.StorageDriver, ) storage.ImageStore { @@ -74,6 +78,19 @@ func NewImageStore(rootDir string, gc bool, gcDelay time.Duration, dedupe, commi log: log.With().Caller().Logger(), multiPartUploads: sync.Map{}, metrics: metrics, + dedupe: dedupe, + } + + cachePath := path.Join(cacheDir, CacheDBName+storage.DBExtensionName) + + if dedupe { + imgStore.cache = storage.NewCache(cacheDir, CacheDBName, false, log) + } else { + // if dedupe was used in previous runs use it to serve blobs correctly + if _, err := os.Stat(cachePath); err == nil { + log.Info().Str("cache path", cachePath).Msg("found cache database") + imgStore.cache = storage.NewCache(cacheDir, CacheDBName, false, log) + } } return imgStore @@ -197,15 +214,11 @@ func (is *ObjectStorage) ValidateRepo(name string) (bool, error) { } for _, file := range files { - f, err := is.store.Stat(context.Background(), file) + _, err := is.store.Stat(context.Background(), file) if err != nil { return false, err } - if strings.HasSuffix(file, "blobs") && !f.IsDir() { - return false, nil - } - filename, err := filepath.Rel(dir, file) if err != nil { return false, err @@ -923,11 +936,20 @@ func (is *ObjectStorage) FinishBlobUpload(repo, uuid string, body io.Reader, dig is.Lock(&lockLatency) defer is.Unlock(&lockLatency) - if err := is.store.Move(context.Background(), src, dst); err != nil { - is.log.Error().Err(err).Str("src", src).Str("dstDigest", dstDigest.String()). - Str("dst", dst).Msg("unable to finish blob") + if is.dedupe && is.cache != nil { + if err := is.DedupeBlob(src, dstDigest, dst); err != nil { + is.log.Error().Err(err).Str("src", src).Str("dstDigest", dstDigest.String()). 
+ Str("dst", dst).Msg("unable to dedupe blob") - return err + return err + } + } else { + if err := is.store.Move(context.Background(), src, dst); err != nil { + is.log.Error().Err(err).Str("src", src).Str("dstDigest", dstDigest.String()). + Str("dst", dst).Msg("unable to finish blob") + + return err + } } is.multiPartUploads.Delete(src) @@ -994,17 +1016,104 @@ func (is *ObjectStorage) FullBlobUpload(repo string, body io.Reader, digest stri dst := is.BlobPath(repo, dstDigest) - if err := is.store.Move(context.Background(), src, dst); err != nil { - is.log.Error().Err(err).Str("src", src).Str("dstDigest", dstDigest.String()). - Str("dst", dst).Msg("unable to finish blob") + if is.dedupe && is.cache != nil { + if err := is.DedupeBlob(src, dstDigest, dst); err != nil { + is.log.Error().Err(err).Str("src", src).Str("dstDigest", dstDigest.String()). + Str("dst", dst).Msg("unable to dedupe blob") - return "", -1, err + return "", -1, err + } + } else { + if err := is.store.Move(context.Background(), src, dst); err != nil { + is.log.Error().Err(err).Str("src", src).Str("dstDigest", dstDigest.String()). 
+ Str("dst", dst).Msg("unable to finish blob") + + return "", -1, err + } } return uuid, int64(nbytes), nil } func (is *ObjectStorage) DedupeBlob(src string, dstDigest godigest.Digest, dst string) error { +retry: + is.log.Debug().Str("src", src).Str("dstDigest", dstDigest.String()).Str("dst", dst).Msg("dedupe: enter") + + dstRecord, err := is.cache.GetBlob(dstDigest.String()) + if err := test.Error(err); err != nil && !errors.Is(err, zerr.ErrCacheMiss) { + is.log.Error().Err(err).Str("blobPath", dst).Msg("dedupe: unable to lookup blob record") + + return err + } + + if dstRecord == "" { + // cache record doesn't exist, so first disk and cache entry for this digest + if err := is.cache.PutBlob(dstDigest.String(), dst); err != nil { + is.log.Error().Err(err).Str("blobPath", dst).Msg("dedupe: unable to insert blob record") + + return err + } + + // move the blob from uploads to final dest + if err := is.store.Move(context.Background(), src, dst); err != nil { + is.log.Error().Err(err).Str("src", src).Str("dst", dst).Msg("dedupe: unable to rename blob") + + return err + } + + is.log.Debug().Str("src", src).Str("dst", dst).Msg("dedupe: rename") + } else { + // cache record exists, but due to GC and upgrades from older versions, + // disk content and cache records may go out of sync + _, err := is.store.Stat(context.Background(), dstRecord) + if err != nil { + is.log.Error().Err(err).Str("blobPath", dstRecord).Msg("dedupe: unable to stat") + // the actual blob on disk may have been removed by GC, so sync the cache + err := is.cache.DeleteBlob(dstDigest.String(), dstRecord) + if err = test.Error(err); err != nil { + // nolint:lll + is.log.Error().Err(err).Str("dstDigest", dstDigest.String()).Str("dst", dst).Msg("dedupe: unable to delete blob record") + + return err + } + + goto retry + } + + fileInfo, err := is.store.Stat(context.Background(), dst) + if err != nil && !errors.As(err, &driver.PathNotFoundError{}) { + is.log.Error().Err(err).Str("blobPath", 
dstRecord).Msg("dedupe: unable to stat") + + return err + } + + if dstRecord == dst { + is.log.Warn().Msg("FOUND equal dsts") + } + + // prevent overwrite original blob + if fileInfo == nil && dstRecord != dst { + // put empty file so that we are compliant with oci layout, this will act as a deduped blob + err = is.store.PutContent(context.Background(), dst, []byte{}) + if err != nil { + is.log.Error().Err(err).Str("blobPath", dstRecord).Msg("dedupe: unable to write empty file") + + return err + } + } else { + is.log.Warn().Msg("prevent overwrite") + } + + // remove temp blobupload + if err := is.store.Delete(context.Background(), src); err != nil { + is.log.Error().Err(err).Str("src", src).Msg("dedupe: unable to remove blob") + + return err + } + + is.log.Debug().Str("src", src).Msg("dedupe: remove") + } + return nil } @@ -1041,24 +1150,81 @@ func (is *ObjectStorage) CheckBlob(repo, digest string) (bool, int64, error) { blobPath := is.BlobPath(repo, dgst) - is.RLock(&lockLatency) - defer is.RUnlock(&lockLatency) + if is.dedupe && is.cache != nil { + is.Lock(&lockLatency) + defer is.Unlock(&lockLatency) + } else { + is.RLock(&lockLatency) + defer is.RUnlock(&lockLatency) + } binfo, err := is.store.Stat(context.Background(), blobPath) - if err != nil { - var perr driver.PathNotFoundError - if errors.As(err, &perr) { - return false, -1, zerr.ErrBlobNotFound - } + if err == nil && binfo.Size() > 0 { + is.log.Debug().Str("blob path", blobPath).Msg("blob path found") - is.log.Error().Err(err).Str("blob", blobPath).Msg("failed to stat blob") + return true, binfo.Size(), nil + } + // otherwise is a 'deduped' blob (empty file) + + // Check blobs in cache + dstRecord, err := is.checkCacheBlob(digest) + if err != nil { + is.log.Error().Err(err).Str("digest", digest).Msg("cache: not found") + + return false, -1, zerr.ErrBlobNotFound + } + + // If found copy to location + blobSize, err := is.copyBlob(repo, blobPath, dstRecord) + if err != nil { + return false, -1, 
zerr.ErrBlobNotFound + } + + // put deduped blob in cache + if err := is.cache.PutBlob(digest, blobPath); err != nil { + is.log.Error().Err(err).Str("blobPath", blobPath).Msg("dedupe: unable to insert blob record") return false, -1, err } - is.log.Debug().Str("blob path", blobPath).Msg("blob path found") + return true, blobSize, nil +} - return true, binfo.Size(), nil +func (is *ObjectStorage) checkCacheBlob(digest string) (string, error) { + if is.cache == nil { + return "", zerr.ErrBlobNotFound + } + + dstRecord, err := is.cache.GetBlob(digest) + if err != nil { + return "", err + } + + is.log.Debug().Str("digest", digest).Str("dstRecord", dstRecord).Msg("cache: found dedupe record") + + return dstRecord, nil +} + +func (is *ObjectStorage) copyBlob(repo string, blobPath string, dstRecord string) (int64, error) { + if err := is.initRepo(repo); err != nil { + is.log.Error().Err(err).Str("repo", repo).Msg("unable to initialize an empty repo") + + return -1, err + } + + if err := is.store.PutContent(context.Background(), blobPath, []byte{}); err != nil { + is.log.Error().Err(err).Str("blobPath", blobPath).Str("link", dstRecord).Msg("dedupe: unable to link") + + return -1, zerr.ErrBlobNotFound + } + + // return original blob with content instead of the deduped one (blobPath) + binfo, err := is.store.Stat(context.Background(), dstRecord) + if err == nil { + return binfo.Size(), nil + } + + return -1, zerr.ErrBlobNotFound } // GetBlob returns a stream to read the blob. 
@@ -1092,6 +1258,36 @@ func (is *ObjectStorage) GetBlob(repo, digest, mediaType string) (io.Reader, int return nil, -1, err } + // is a 'deduped' blob + if binfo.Size() == 0 && is.cache != nil { + // Check blobs in cache + dstRecord, err := is.checkCacheBlob(digest) + if err == nil { + binfo, err := is.store.Stat(context.Background(), dstRecord) + if err != nil { + is.log.Error().Err(err).Str("blob", dstRecord).Msg("failed to stat blob") + + // the actual blob on disk may have been removed by GC, so sync the cache + if err := is.cache.DeleteBlob(digest, dstRecord); err != nil { + is.log.Error().Err(err).Str("dstDigest", digest).Str("dst", dstRecord).Msg("dedupe: unable to delete blob record") + + return nil, -1, err + } + + return nil, -1, zerr.ErrBlobNotFound + } + + blobReader, err := is.store.Reader(context.Background(), dstRecord, 0) + if err != nil { + is.log.Error().Err(err).Str("blob", dstRecord).Msg("failed to open blob") + + return nil, -1, err + } + + return blobReader, binfo.Size(), nil + } + } + return blobReader, binfo.Size(), nil } @@ -1158,6 +1354,44 @@ func (is *ObjectStorage) DeleteBlob(repo, digest string) error { return zerr.ErrBlobNotFound } + if is.cache != nil { + dstRecord, err := is.cache.GetBlob(digest) + if err != nil && !errors.Is(err, zerr.ErrCacheMiss) { + is.log.Error().Err(err).Str("blobPath", dstRecord).Msg("dedupe: unable to lookup blob record") + + return err + } + + // remove cache entry and move blob contents to the next candidate if there is any + if err := is.cache.DeleteBlob(digest, blobPath); err != nil { + is.log.Error().Err(err).Str("digest", digest).Str("blobPath", blobPath).Msg("unable to remove blob path from cache") + + return err + } + + // if the deleted blob is one with content + if dstRecord == blobPath { + // get next candidate + dstRecord, err := is.cache.GetBlob(digest) + if err != nil && !errors.Is(err, zerr.ErrCacheMiss) { + is.log.Error().Err(err).Str("blobPath", dstRecord).Msg("dedupe: unable to lookup blob 
record") + + return err + } + + // if we have a new candidate move the blob content to it + if dstRecord != "" { + if err := is.store.Move(context.Background(), blobPath, dstRecord); err != nil { + is.log.Error().Err(err).Str("blobPath", blobPath).Msg("unable to remove blob path") + + return err + } + + return nil + } + } + } + if err := is.store.Delete(context.Background(), blobPath); err != nil { is.log.Error().Err(err).Str("blobPath", blobPath).Msg("unable to remove blob path") diff --git a/pkg/storage/storage_fs.go b/pkg/storage/storage_fs.go index 1ea4b92b..29622989 100644 --- a/pkg/storage/storage_fs.go +++ b/pkg/storage/storage_fs.go @@ -129,7 +129,7 @@ func NewImageStore(rootDir string, gc bool, gcDelay time.Duration, dedupe, commi } if dedupe { - imgStore.cache = NewCache(rootDir, "cache", log) + imgStore.cache = NewCache(rootDir, "cache", true, log) } if gc { diff --git a/pkg/storage/storage_test.go b/pkg/storage/storage_test.go index d814f380..4f781a23 100644 --- a/pkg/storage/storage_test.go +++ b/pkg/storage/storage_test.go @@ -42,7 +42,7 @@ func skipIt(t *testing.T) { } } -func createObjectsStore(rootDir string) (driver.StorageDriver, storage.ImageStore, error) { +func createObjectsStore(rootDir string, cacheDir string) (driver.StorageDriver, storage.ImageStore, error) { bucket := "zot-storage-test" endpoint := os.Getenv("S3MOCK_ENDPOINT") storageDriverParams := map[string]interface{}{ @@ -51,6 +51,8 @@ func createObjectsStore(rootDir string) (driver.StorageDriver, storage.ImageStor "region": "us-east-2", "bucket": bucket, "regionendpoint": endpoint, + "accesskey": "minioadmin", + "secretkey": "minioadmin", "secure": false, "skipverify": false, } @@ -71,7 +73,7 @@ func createObjectsStore(rootDir string) (driver.StorageDriver, storage.ImageStor log := log.Logger{Logger: zerolog.New(os.Stdout)} metrics := monitoring.NewMetricsServer(false, log) - il := s3.NewImageStore(rootDir, false, storage.DefaultGCDelay, false, false, log, metrics, store) + il := 
s3.NewImageStore(rootDir, cacheDir, false, storage.DefaultGCDelay, true, false, log, metrics, store) return store, il, err } @@ -105,9 +107,10 @@ func TestStorageAPIs(t *testing.T) { } testDir := path.Join("/oci-repo-test", uuid.String()) + tdir := t.TempDir() var store driver.StorageDriver - store, imgStore, _ = createObjectsStore(testDir) + store, imgStore, _ = createObjectsStore(testDir, tdir) defer cleanupStorage(store, testDir) } else { dir := t.TempDir() @@ -676,15 +679,15 @@ func TestStorageHandler(t *testing.T) { var thirdStorageDriver driver.StorageDriver firstRootDir = "/util_test1" - firstStorageDriver, firstStore, _ = createObjectsStore(firstRootDir) + firstStorageDriver, firstStore, _ = createObjectsStore(firstRootDir, t.TempDir()) defer cleanupStorage(firstStorageDriver, firstRootDir) secondRootDir = "/util_test2" - secondStorageDriver, secondStore, _ = createObjectsStore(secondRootDir) + secondStorageDriver, secondStore, _ = createObjectsStore(secondRootDir, t.TempDir()) defer cleanupStorage(secondStorageDriver, secondRootDir) thirdRootDir = "/util_test3" - thirdStorageDriver, thirdStore, _ = createObjectsStore(thirdRootDir) + thirdStorageDriver, thirdStore, _ = createObjectsStore(thirdRootDir, t.TempDir()) defer cleanupStorage(thirdStorageDriver, thirdRootDir) } else { // Create temporary directory diff --git a/test/cluster/config-minio.json b/test/cluster/config-minio.json index 9da67e6e..64b9562b 100644 --- a/test/cluster/config-minio.json +++ b/test/cluster/config-minio.json @@ -1,11 +1,12 @@ { - "distSpecVersion": "1.0.0", + "distSpecVersion": "1.0.1", "storage": { - "rootDirectory": "/zot", + "rootDirectory": "/tmp/zot", "gc": false, "dedupe": false, "storageDriver": { "name": "s3", + "rootdirectory": "/zot", "region": "us-east-2", "bucket": "zot-storage", "regionendpoint": "http://localhost:9000",