refactor(storage): refactoring storage (#1459)

Signed-off-by: Laurentiu Niculae <niculae.laurentiu1@gmail.com>
This commit is contained in:
LaurentiuNiculae
2023-05-26 21:08:19 +03:00
committed by GitHub
parent 9acd19f7ea
commit a3f355c278
45 changed files with 850 additions and 751 deletions
+2 -2
View File
@@ -8,7 +8,7 @@ import (
distspec "github.com/opencontainers/distribution-spec/specs-go"
extconf "zotregistry.io/zot/pkg/extensions/config"
"zotregistry.io/zot/pkg/storage"
storageConstants "zotregistry.io/zot/pkg/storage/constants"
)
var (
@@ -147,7 +147,7 @@ func New() *Config {
ReleaseTag: ReleaseTag,
BinaryType: BinaryType,
Storage: GlobalStorageConfig{
StorageConfig: StorageConfig{GC: true, GCDelay: storage.DefaultGCDelay, Dedupe: true},
StorageConfig: StorageConfig{GC: true, GCDelay: storageConstants.DefaultGCDelay, Dedupe: true},
},
HTTP: HTTPConfig{Address: "127.0.0.1", Port: "8080", Auth: &AuthConfig{FailDelay: 0}},
Log: &LogConfig{Level: "debug"},
+5 -230
View File
@@ -14,7 +14,6 @@ import (
"syscall"
"time"
"github.com/docker/distribution/registry/storage/driver/factory"
"github.com/gorilla/handlers"
"github.com/gorilla/mux"
@@ -27,10 +26,6 @@ import (
"zotregistry.io/zot/pkg/meta/repodb/repodbfactory"
"zotregistry.io/zot/pkg/scheduler"
"zotregistry.io/zot/pkg/storage"
"zotregistry.io/zot/pkg/storage/cache"
"zotregistry.io/zot/pkg/storage/constants"
"zotregistry.io/zot/pkg/storage/local"
"zotregistry.io/zot/pkg/storage/s3"
)
const (
@@ -224,7 +219,7 @@ func (c *Controller) Init(reloadCtx context.Context) error {
c.Metrics = monitoring.NewMetricsServer(enabled, c.Log)
if err := c.InitImageStore(reloadCtx); err != nil {
if err := c.InitImageStore(); err != nil { //nolint:contextcheck
return err
}
@@ -244,235 +239,15 @@ func (c *Controller) InitCVEInfo() {
}
}
func (c *Controller) InitImageStore(ctx context.Context) error {
c.StoreController = storage.StoreController{}
func (c *Controller) InitImageStore() error {
linter := ext.GetLinter(c.Config, c.Log)
if c.Config.Storage.RootDirectory != "" {
// no need to validate hard links work on s3
if c.Config.Storage.Dedupe && c.Config.Storage.StorageDriver == nil {
err := local.ValidateHardLink(c.Config.Storage.RootDirectory)
if err != nil {
c.Log.Warn().Msg("input storage root directory filesystem does not supports hardlinking," +
"disabling dedupe functionality")
c.Config.Storage.Dedupe = false
}
}
var defaultStore storage.ImageStore
if c.Config.Storage.StorageDriver == nil {
// false positive lint - linter does not implement Lint method
//nolint:typecheck,contextcheck
defaultStore = local.NewImageStore(c.Config.Storage.RootDirectory,
c.Config.Storage.GC, c.Config.Storage.GCDelay,
c.Config.Storage.Dedupe, c.Config.Storage.Commit, c.Log, c.Metrics, linter,
CreateCacheDatabaseDriver(c.Config.Storage.StorageConfig, c.Log),
)
} else {
storeName := fmt.Sprintf("%v", c.Config.Storage.StorageDriver["name"])
if storeName != storage.S3StorageDriverName {
c.Log.Fatal().Err(errors.ErrBadConfig).Str("storageDriver", storeName).
Msg("unsupported storage driver")
}
// Init a Storager from connection string.
store, err := factory.Create(storeName, c.Config.Storage.StorageDriver)
if err != nil {
c.Log.Error().Err(err).Str("rootDir", c.Config.Storage.RootDirectory).Msg("unable to create s3 service")
return err
}
/* in the case of s3 c.Config.Storage.RootDirectory is used for caching blobs locally and
c.Config.Storage.StorageDriver["rootdirectory"] is the actual rootDir in s3 */
rootDir := "/"
if c.Config.Storage.StorageDriver["rootdirectory"] != nil {
rootDir = fmt.Sprintf("%v", c.Config.Storage.StorageDriver["rootdirectory"])
}
// false positive lint - linter does not implement Lint method
//nolint: typecheck,contextcheck
defaultStore = s3.NewImageStore(rootDir, c.Config.Storage.RootDirectory,
c.Config.Storage.GC, c.Config.Storage.GCDelay, c.Config.Storage.Dedupe,
c.Config.Storage.Commit, c.Log, c.Metrics, linter, store,
CreateCacheDatabaseDriver(c.Config.Storage.StorageConfig, c.Log))
}
c.StoreController.DefaultStore = defaultStore
} else {
// we can't proceed without global storage
c.Log.Error().Err(errors.ErrImgStoreNotFound).Msg("controller: no storage config provided")
return errors.ErrImgStoreNotFound
}
if c.Config.Storage.SubPaths != nil {
if len(c.Config.Storage.SubPaths) > 0 {
subPaths := c.Config.Storage.SubPaths
//nolint: contextcheck
subImageStore, err := c.getSubStore(subPaths, linter)
if err != nil {
c.Log.Error().Err(err).Msg("controller: error getting sub image store")
return err
}
c.StoreController.SubStore = subImageStore
}
}
return nil
}
func (c *Controller) getSubStore(subPaths map[string]config.StorageConfig,
linter storage.Lint,
) (map[string]storage.ImageStore, error) {
imgStoreMap := make(map[string]storage.ImageStore, 0)
subImageStore := make(map[string]storage.ImageStore)
// creating image store per subpaths
for route, storageConfig := range subPaths {
// no need to validate hard links work on s3
if storageConfig.Dedupe && storageConfig.StorageDriver == nil {
err := local.ValidateHardLink(storageConfig.RootDirectory)
if err != nil {
c.Log.Warn().Msg("input storage root directory filesystem does not supports hardlinking, " +
"disabling dedupe functionality")
storageConfig.Dedupe = false
}
}
if storageConfig.StorageDriver == nil {
// Compare if subpath root dir is same as default root dir
isSame, _ := config.SameFile(c.Config.Storage.RootDirectory, storageConfig.RootDirectory)
if isSame {
c.Log.Error().Err(errors.ErrBadConfig).Msg("sub path storage directory is same as root directory")
return nil, errors.ErrBadConfig
}
isUnique := true
// Compare subpath unique files
for file := range imgStoreMap {
// We already have image storage for this file
if compareImageStore(file, storageConfig.RootDirectory) {
subImageStore[route] = imgStoreMap[file]
isUnique = true
}
}
// subpath root directory is unique
// add it to uniqueSubFiles
// Create a new image store and assign it to imgStoreMap
if isUnique {
imgStoreMap[storageConfig.RootDirectory] = local.NewImageStore(storageConfig.RootDirectory,
storageConfig.GC, storageConfig.GCDelay, storageConfig.Dedupe,
storageConfig.Commit, c.Log, c.Metrics, linter, CreateCacheDatabaseDriver(storageConfig, c.Log))
subImageStore[route] = imgStoreMap[storageConfig.RootDirectory]
}
} else {
storeName := fmt.Sprintf("%v", storageConfig.StorageDriver["name"])
if storeName != storage.S3StorageDriverName {
c.Log.Fatal().Err(errors.ErrBadConfig).Str("storageDriver", storeName).
Msg("unsupported storage driver")
}
// Init a Storager from connection string.
store, err := factory.Create(storeName, storageConfig.StorageDriver)
if err != nil {
c.Log.Error().Err(err).Str("rootDir", storageConfig.RootDirectory).Msg("Unable to create s3 service")
return nil, err
}
/* in the case of s3 c.Config.Storage.RootDirectory is used for caching blobs locally and
c.Config.Storage.StorageDriver["rootdirectory"] is the actual rootDir in s3 */
rootDir := "/"
if c.Config.Storage.StorageDriver["rootdirectory"] != nil {
rootDir = fmt.Sprintf("%v", c.Config.Storage.StorageDriver["rootdirectory"])
}
// false positive lint - linter does not implement Lint method
//nolint: typecheck
subImageStore[route] = s3.NewImageStore(rootDir, storageConfig.RootDirectory,
storageConfig.GC, storageConfig.GCDelay,
storageConfig.Dedupe, storageConfig.Commit, c.Log, c.Metrics, linter, store,
CreateCacheDatabaseDriver(storageConfig, c.Log),
)
}
}
return subImageStore, nil
}
func compareImageStore(root1, root2 string) bool {
isSameFile, err := config.SameFile(root1, root2)
// This error is path error that means either of root directory doesn't exist, in that case do string match
storeController, err := storage.New(c.Config, linter, c.Metrics, c.Log)
if err != nil {
return strings.EqualFold(root1, root2)
return err
}
return isSameFile
}
func getUseRelPaths(storageConfig *config.StorageConfig) bool {
return storageConfig.StorageDriver == nil
}
func CreateCacheDatabaseDriver(storageConfig config.StorageConfig, log log.Logger) cache.Cache {
if storageConfig.Dedupe || storageConfig.StorageDriver != nil {
if !storageConfig.RemoteCache {
params := cache.BoltDBDriverParameters{}
params.RootDir = storageConfig.RootDirectory
params.Name = constants.BoltdbName
if storageConfig.StorageDriver != nil {
params.Name = s3.CacheDBName
}
params.UseRelPaths = getUseRelPaths(&storageConfig)
driver, _ := storage.Create("boltdb", params, log)
return driver
}
// remote cache
if storageConfig.CacheDriver != nil {
name, ok := storageConfig.CacheDriver["name"].(string)
if !ok {
log.Warn().Msg("remote cache driver name missing!")
return nil
}
if name != constants.DynamoDBDriverName {
log.Warn().Str("driver", name).Msg("remote cache driver unsupported!")
return nil
}
// dynamodb
dynamoParams := cache.DynamoDBDriverParameters{}
dynamoParams.Endpoint, _ = storageConfig.CacheDriver["endpoint"].(string)
dynamoParams.Region, _ = storageConfig.CacheDriver["region"].(string)
dynamoParams.TableName, _ = storageConfig.CacheDriver["cachetablename"].(string)
driver, _ := storage.Create("dynamodb", dynamoParams, log)
return driver
}
return nil
}
c.StoreController = storeController
return nil
}
+18 -17
View File
@@ -51,8 +51,9 @@ import (
"zotregistry.io/zot/pkg/log"
"zotregistry.io/zot/pkg/meta/repodb/repodbfactory"
"zotregistry.io/zot/pkg/storage"
"zotregistry.io/zot/pkg/storage/local"
storageConstants "zotregistry.io/zot/pkg/storage/constants"
"zotregistry.io/zot/pkg/test"
"zotregistry.io/zot/pkg/test/inject"
)
const (
@@ -121,13 +122,13 @@ func TestCreateCacheDatabaseDriver(t *testing.T) {
panic(err)
}
driver := api.CreateCacheDatabaseDriver(conf.Storage.StorageConfig, log)
driver := storage.CreateCacheDatabaseDriver(conf.Storage.StorageConfig, log)
So(driver, ShouldBeNil)
conf.Storage.RemoteCache = true
conf.Storage.RootDirectory = t.TempDir()
driver = api.CreateCacheDatabaseDriver(conf.Storage.StorageConfig, log)
driver = storage.CreateCacheDatabaseDriver(conf.Storage.StorageConfig, log)
So(driver, ShouldBeNil)
})
skipDynamo(t)
@@ -160,7 +161,7 @@ func TestCreateCacheDatabaseDriver(t *testing.T) {
"versionTablename": "Version",
}
driver := api.CreateCacheDatabaseDriver(conf.Storage.StorageConfig, log)
driver := storage.CreateCacheDatabaseDriver(conf.Storage.StorageConfig, log)
So(driver, ShouldNotBeNil)
// negative test cases
@@ -175,7 +176,7 @@ func TestCreateCacheDatabaseDriver(t *testing.T) {
"versionTablename": "Version",
}
driver = api.CreateCacheDatabaseDriver(conf.Storage.StorageConfig, log)
driver = storage.CreateCacheDatabaseDriver(conf.Storage.StorageConfig, log)
So(driver, ShouldBeNil)
conf.Storage.CacheDriver = map[string]interface{}{
@@ -189,7 +190,7 @@ func TestCreateCacheDatabaseDriver(t *testing.T) {
"versionTablename": "Version",
}
driver = api.CreateCacheDatabaseDriver(conf.Storage.StorageConfig, log)
driver = storage.CreateCacheDatabaseDriver(conf.Storage.StorageConfig, log)
So(driver, ShouldBeNil)
})
}
@@ -360,7 +361,7 @@ func TestObjectStorageController(t *testing.T) {
conf.HTTP.Port = port
storageDriverParams := map[string]interface{}{
"rootdirectory": "zot",
"name": storage.S3StorageDriverName,
"name": storageConstants.S3StorageDriverName,
}
conf.Storage.StorageDriver = storageDriverParams
ctlr := makeController(conf, "zot", "")
@@ -380,7 +381,7 @@ func TestObjectStorageController(t *testing.T) {
storageDriverParams := map[string]interface{}{
"rootdirectory": "zot",
"name": storage.S3StorageDriverName,
"name": storageConstants.S3StorageDriverName,
"region": "us-east-2",
"bucket": bucket,
"regionendpoint": endpoint,
@@ -409,7 +410,7 @@ func TestObjectStorageControllerSubPaths(t *testing.T) {
storageDriverParams := map[string]interface{}{
"rootdirectory": "zot",
"name": storage.S3StorageDriverName,
"name": storageConstants.S3StorageDriverName,
"region": "us-east-2",
"bucket": bucket,
"regionendpoint": endpoint,
@@ -5533,7 +5534,7 @@ func TestManifestImageIndex(t *testing.T) {
Convey("Corrupt index", func() {
err = os.WriteFile(path.Join(dir, "index", "blobs", index1dgst.Algorithm().String(), index1dgst.Encoded()),
[]byte("deadbeef"), local.DefaultFilePerms)
[]byte("deadbeef"), storageConstants.DefaultFilePerms)
So(err, ShouldBeNil)
resp, err = resty.R().Delete(baseURL + fmt.Sprintf("/v2/index/manifests/%s", index1dgst))
So(err, ShouldBeNil)
@@ -5906,7 +5907,7 @@ func TestInjectInterruptedImageManifest(t *testing.T) {
// Testing router path: @Router /v2/{name}/manifests/{reference} [put]
Convey("Uploading an image manifest blob (when injected simulates an interrupted image manifest upload)", func() {
injected := test.InjectFailure(0)
injected := inject.InjectFailure(0)
request, _ := http.NewRequestWithContext(context.TODO(), http.MethodPut, baseURL, bytes.NewReader(content))
request = mux.SetURLVars(request, map[string]string{"name": "repotest", "reference": "1.0"})
@@ -5967,7 +5968,7 @@ func TestInjectTooManyOpenFiles(t *testing.T) {
So(digest, ShouldNotBeNil)
// monolithic blob upload
injected := test.InjectFailure(0)
injected := inject.InjectFailure(0)
if injected {
request, _ := http.NewRequestWithContext(context.TODO(), http.MethodPut, loc, bytes.NewReader(content))
tokens := strings.Split(loc, "/")
@@ -6040,7 +6041,7 @@ func TestInjectTooManyOpenFiles(t *testing.T) {
// Testing router path: @Router /v2/{name}/manifests/{reference} [put]
//nolint:lll // gofumpt conflicts with lll
Convey("Uploading an image manifest blob (when injected simulates that PutImageManifest failed due to 'too many open files' error)", func() {
injected := test.InjectFailure(1)
injected := inject.InjectFailure(1)
request, _ := http.NewRequestWithContext(context.TODO(), http.MethodPut, baseURL, bytes.NewReader(content))
request = mux.SetURLVars(request, map[string]string{"name": "repotest", "reference": "1.0"})
@@ -6060,7 +6061,7 @@ func TestInjectTooManyOpenFiles(t *testing.T) {
}
})
Convey("when injected simulates a `too many open files` error inside PutImageManifest method of img store", func() {
injected := test.InjectFailure(2)
injected := inject.InjectFailure(2)
request, _ := http.NewRequestWithContext(context.TODO(), http.MethodPut, baseURL, bytes.NewReader(content))
request = mux.SetURLVars(request, map[string]string{"name": "repotest", "reference": "1.0"})
@@ -6081,7 +6082,7 @@ func TestInjectTooManyOpenFiles(t *testing.T) {
}
})
Convey("code coverage: error inside PutImageManifest method of img store (unable to marshal JSON)", func() {
injected := test.InjectFailure(1)
injected := inject.InjectFailure(1)
request, _ := http.NewRequestWithContext(context.TODO(), http.MethodPut, baseURL, bytes.NewReader(content))
request = mux.SetURLVars(request, map[string]string{"name": "repotest", "reference": "1.0"})
@@ -6102,7 +6103,7 @@ func TestInjectTooManyOpenFiles(t *testing.T) {
}
})
Convey("code coverage: error inside PutImageManifest method of img store (umoci.OpenLayout error)", func() {
injected := test.InjectFailure(3)
injected := inject.InjectFailure(3)
request, _ := http.NewRequestWithContext(context.TODO(), http.MethodPut, baseURL, bytes.NewReader(content))
request = mux.SetURLVars(request, map[string]string{"name": "repotest", "reference": "1.0"})
@@ -6123,7 +6124,7 @@ func TestInjectTooManyOpenFiles(t *testing.T) {
}
})
Convey("code coverage: error inside PutImageManifest method of img store (oci.GC)", func() {
injected := test.InjectFailure(4)
injected := inject.InjectFailure(4)
request, _ := http.NewRequestWithContext(context.TODO(), http.MethodPut, baseURL, bytes.NewReader(content))
request = mux.SetURLVars(request, map[string]string{"name": "repotest", "reference": "1.0"})
+9 -8
View File
@@ -39,8 +39,9 @@ import (
"zotregistry.io/zot/pkg/meta"
zreg "zotregistry.io/zot/pkg/regexp"
localCtx "zotregistry.io/zot/pkg/requestcontext"
"zotregistry.io/zot/pkg/storage"
"zotregistry.io/zot/pkg/test" //nolint:goimports
storageCommon "zotregistry.io/zot/pkg/storage/common"
storageTypes "zotregistry.io/zot/pkg/storage/types"
"zotregistry.io/zot/pkg/test/inject"
)
type RouteHandler struct {
@@ -488,7 +489,7 @@ type ImageIndex struct {
}
func getReferrers(ctx context.Context, routeHandler *RouteHandler,
imgStore storage.ImageStore, name string, digest godigest.Digest,
imgStore storageTypes.ImageStore, name string, digest godigest.Digest,
artifactTypes []string,
) (ispec.Index, error) {
references, err := imgStore.GetReferrers(name, digest, artifactTypes)
@@ -621,7 +622,7 @@ func (rh *RouteHandler) UpdateManifest(response http.ResponseWriter, request *ht
}
mediaType := request.Header.Get("Content-Type")
if !storage.IsSupportedMediaType(mediaType) {
if !storageCommon.IsSupportedMediaType(mediaType) {
// response.WriteHeader(http.StatusUnsupportedMediaType)
WriteJSON(response, http.StatusUnsupportedMediaType,
NewErrorList(NewError(MANIFEST_INVALID, map[string]string{"mediaType": mediaType})))
@@ -632,7 +633,7 @@ func (rh *RouteHandler) UpdateManifest(response http.ResponseWriter, request *ht
body, err := io.ReadAll(request.Body)
// hard to reach test case, injected error (simulates an interrupted image manifest upload)
// err could be io.ErrUnexpectedEOF
if err := test.Error(err); err != nil {
if err := inject.Error(err); err != nil {
rh.c.Log.Error().Err(err).Msg("unexpected error")
response.WriteHeader(http.StatusInternalServerError)
@@ -1716,12 +1717,12 @@ func WriteDataFromReader(response http.ResponseWriter, status int, length int64,
}
// will return image storage corresponding to subpath provided in config.
func (rh *RouteHandler) getImageStore(name string) storage.ImageStore {
func (rh *RouteHandler) getImageStore(name string) storageTypes.ImageStore {
return rh.c.StoreController.GetImageStore(name)
}
// will sync on demand if an image is not found, in case sync extensions is enabled.
func getImageManifest(ctx context.Context, routeHandler *RouteHandler, imgStore storage.ImageStore,
func getImageManifest(ctx context.Context, routeHandler *RouteHandler, imgStore storageTypes.ImageStore,
name, reference string,
) ([]byte, godigest.Digest, string, error) {
syncEnabled := false
@@ -1757,7 +1758,7 @@ func getImageManifest(ctx context.Context, routeHandler *RouteHandler, imgStore
// will sync referrers on demand if they are not found, in case sync extensions is enabled.
func getOrasReferrers(ctx context.Context, routeHandler *RouteHandler,
imgStore storage.ImageStore, name string, digest godigest.Digest,
imgStore storageTypes.ImageStore, name string, digest godigest.Digest,
artifactType string,
) ([]artifactspec.Descriptor, error) {
refs, err := imgStore.GetOrasReferrers(name, digest, artifactType)
+4 -4
View File
@@ -22,7 +22,7 @@ import (
"zotregistry.io/zot/pkg/api/config"
"zotregistry.io/zot/pkg/api/constants"
localCtx "zotregistry.io/zot/pkg/requestcontext"
"zotregistry.io/zot/pkg/storage"
storageTypes "zotregistry.io/zot/pkg/storage/types"
"zotregistry.io/zot/pkg/test"
"zotregistry.io/zot/pkg/test/mocks"
)
@@ -1200,7 +1200,7 @@ func TestRoutes(t *testing.T) {
ism *mocks.MockedImageStore,
) int {
ctlr.StoreController.DefaultStore = ism
ctlr.StoreController.SubStore = map[string]storage.ImageStore{
ctlr.StoreController.SubStore = map[string]storageTypes.ImageStore{
"test": &mocks.MockedImageStore{
GetRepositoriesFn: func() ([]string, error) {
return []string{}, ErrUnexpectedError
@@ -1238,7 +1238,7 @@ func TestRoutes(t *testing.T) {
ism *mocks.MockedImageStore,
) int {
ctlr.StoreController.DefaultStore = ism
ctlr.StoreController.SubStore = map[string]storage.ImageStore{}
ctlr.StoreController.SubStore = map[string]storageTypes.ImageStore{}
request, _ := http.NewRequestWithContext(context.TODO(), http.MethodPatch, baseURL, nil)
request = mux.SetURLVars(request, vars)
@@ -1300,7 +1300,7 @@ func TestRoutes(t *testing.T) {
return []string{"repo"}, nil
},
}
ctlr.StoreController.SubStore = map[string]storage.ImageStore{
ctlr.StoreController.SubStore = map[string]storageTypes.ImageStore{
"test1": &mocks.MockedImageStore{
GetRepositoriesFn: func() ([]string, error) {
return []string{"repo1"}, nil