* feat(sync): local tmp store

Signed-off-by: a <a@tuxpa.in>

* fix(sync): various fixes for s3+remote storage feature

Signed-off-by: Petu Eusebiu <peusebiu@cisco.com>

---------

Signed-off-by: a <a@tuxpa.in>
Signed-off-by: Petu Eusebiu <peusebiu@cisco.com>
Co-authored-by: a <a@tuxpa.in>
This commit is contained in:
peusebiu
2023-11-28 22:08:15 +02:00
committed by GitHub
parent 0de2210686
commit 3c8da6e6fc
12 changed files with 934 additions and 61 deletions
+204
View File
@@ -1653,3 +1653,207 @@ func TestOverlappingSyncRetentionConfig(t *testing.T) {
So(string(data), ShouldContainSubstring, "overlapping sync content\":{\"Prefix\":\"prod/*")
})
}
func TestSyncWithRemoteStorageConfig(t *testing.T) {
oldArgs := os.Args
defer func() { os.Args = oldArgs }()
Convey("Test verify sync with remote storage works if sync.tmpdir is provided", t, func(c C) {
tmpfile, err := os.CreateTemp("", "zot-test*.json")
So(err, ShouldBeNil)
defer os.Remove(tmpfile.Name()) // clean up
content := `{
"distSpecVersion": "1.1.0-dev",
"storage": {
"rootDirectory": "%s",
"dedupe": false,
"remoteCache": false,
"storageDriver": {
"name": "s3",
"rootdirectory": "/zot",
"region": "us-east-2",
"regionendpoint": "localhost:4566",
"bucket": "zot-storage",
"secure": false,
"skipverify": false
}
},
"http": {
"address": "0.0.0.0",
"port": "%s"
},
"log": {
"level": "debug",
"output": "%s"
},
"extensions": {
"sync": {
"downloadDir": "/tmp/sync",
"registries": [
{
"urls": [
"http://localhost:9000"
],
"onDemand": true,
"tlsVerify": false,
"content": [
{
"prefix": "**"
}
]
}
]
}
}
}`
logPath, err := runCLIWithConfig(t.TempDir(), content)
So(err, ShouldBeNil)
data, err := os.ReadFile(logPath)
So(err, ShouldBeNil)
defer os.Remove(logPath) // clean up
So(string(data), ShouldNotContainSubstring,
"using both sync and remote storage features needs config.Extensions.Sync.DownloadDir to be specified")
})
Convey("Test verify sync with remote storage panics if sync.tmpdir is not provided", t, func(c C) {
port := GetFreePort()
logFile, err := os.CreateTemp("", "zot-log*.txt")
So(err, ShouldBeNil)
defer os.Remove(logFile.Name()) // clean up
tmpfile, err := os.CreateTemp("", "zot-test*.json")
So(err, ShouldBeNil)
defer os.Remove(tmpfile.Name()) // clean up
content := fmt.Sprintf(`{
"distSpecVersion": "1.1.0-dev",
"storage": {
"rootDirectory": "%s",
"dedupe": false,
"remoteCache": false,
"storageDriver": {
"name": "s3",
"rootdirectory": "/zot",
"region": "us-east-2",
"regionendpoint": "localhost:4566",
"bucket": "zot-storage",
"secure": false,
"skipverify": false
}
},
"http": {
"address": "0.0.0.0",
"port": "%s"
},
"log": {
"level": "debug",
"output": "%s"
},
"extensions": {
"sync": {
"registries": [
{
"urls": [
"http://localhost:9000"
],
"onDemand": true,
"tlsVerify": false,
"content": [
{
"prefix": "**"
}
]
}
]
}
}
}`, t.TempDir(), port, logFile.Name())
err = os.WriteFile(tmpfile.Name(), []byte(content), 0o0600)
So(err, ShouldBeNil)
os.Args = []string{"cli_test", "serve", tmpfile.Name()}
err = cli.NewServerRootCmd().Execute()
So(err, ShouldNotBeNil)
data, err := os.ReadFile(logFile.Name())
So(err, ShouldBeNil)
defer os.Remove(logFile.Name()) // clean up
So(string(data), ShouldContainSubstring,
"using both sync and remote storage features needs config.Extensions.Sync.DownloadDir to be specified")
})
Convey("Test verify sync with remote storage on subpath panics if sync.tmpdir is not provided", t, func(c C) {
port := GetFreePort()
logFile, err := os.CreateTemp("", "zot-log*.txt")
So(err, ShouldBeNil)
defer os.Remove(logFile.Name()) // clean up
tmpfile, err := os.CreateTemp("", "zot-test*.json")
So(err, ShouldBeNil)
defer os.Remove(tmpfile.Name()) // clean up
content := fmt.Sprintf(`{
"distSpecVersion": "1.1.0-dev",
"storage": {
"rootDirectory": "%s",
"subPaths":{
"/a": {
"rootDirectory": "%s",
"dedupe": false,
"remoteCache": false,
"storageDriver":{
"name":"s3",
"rootdirectory":"/zot-a",
"region":"us-east-2",
"bucket":"zot-storage",
"secure":true,
"skipverify":true
}
}
}
},
"http": {
"address": "0.0.0.0",
"port": "%s"
},
"log": {
"level": "debug",
"output": "%s"
},
"extensions": {
"sync": {
"registries": [
{
"urls": [
"http://localhost:9000"
],
"onDemand": true,
"tlsVerify": false,
"content": [
{
"prefix": "**"
}
]
}
]
}
}
}`, t.TempDir(), t.TempDir(), port, logFile.Name())
err = os.WriteFile(tmpfile.Name(), []byte(content), 0o0600)
So(err, ShouldBeNil)
os.Args = []string{"cli_test", "serve", tmpfile.Name()}
err = cli.NewServerRootCmd().Execute()
So(err, ShouldNotBeNil)
data, err := os.ReadFile(logFile.Name())
So(err, ShouldBeNil)
defer os.Remove(logFile.Name()) // clean up
So(string(data), ShouldContainSubstring,
"using both sync and remote storage features needs config.Extensions.Sync.DownloadDir to be specified")
})
}
+12 -3
View File
@@ -392,9 +392,10 @@ func validateConfiguration(config *config.Config, log zlog.Logger) error {
return zerr.ErrBadConfig
}
// enforce filesystem storage in case sync feature is enabled
if config.Extensions != nil && config.Extensions.Sync != nil {
log.Error().Err(zerr.ErrBadConfig).Msg("sync supports only filesystem storage")
// enforce tmpDir in case sync + s3
if config.Extensions != nil && config.Extensions.Sync != nil && config.Extensions.Sync.DownloadDir == "" {
log.Error().Err(zerr.ErrBadConfig).
Msg("using both sync and remote storage features needs config.Extensions.Sync.DownloadDir to be specified")
return zerr.ErrBadConfig
}
@@ -413,6 +414,14 @@ func validateConfiguration(config *config.Config, log zlog.Logger) error {
return zerr.ErrBadConfig
}
// enforce tmpDir in case sync + s3
if config.Extensions != nil && config.Extensions.Sync != nil && config.Extensions.Sync.DownloadDir == "" {
log.Error().Err(zerr.ErrBadConfig).
Msg("using both sync and remote storage features needs config.Extensions.Sync.DownloadDir to be specified")
return zerr.ErrBadConfig
}
}
}
}
+5 -1
View File
@@ -15,7 +15,11 @@ type Credentials struct {
type Config struct {
Enable *bool
CredentialsFile string
Registries []RegistryConfig
/* DownloadDir is needed only in case of using cloud based storages
it uses regclient to first copy images into this dir (as oci layout)
and then move them into storage. */
DownloadDir string
Registries []RegistryConfig
}
type RegistryConfig struct {
+19 -15
View File
@@ -41,23 +41,27 @@ func EnableSyncExtension(config *config.Config, metaDB mTypes.MetaDB,
isPeriodical := len(registryConfig.Content) != 0 && registryConfig.PollInterval != 0
isOnDemand := registryConfig.OnDemand
if isPeriodical || isOnDemand {
service, err := sync.New(registryConfig, config.Extensions.Sync.CredentialsFile,
storeController, metaDB, log)
if err != nil {
return nil, err
}
if !(isPeriodical || isOnDemand) {
continue
}
if isPeriodical {
// add to task scheduler periodic sync
gen := sync.NewTaskGenerator(service, log)
sch.SubmitGenerator(gen, registryConfig.PollInterval, scheduler.MediumPriority)
}
tmpDir := config.Extensions.Sync.DownloadDir
credsPath := config.Extensions.Sync.CredentialsFile
if isOnDemand {
// onDemand services used in routes.go
onDemand.Add(service)
}
service, err := sync.New(registryConfig, credsPath, tmpDir, storeController, metaDB, log)
if err != nil {
return nil, err
}
if isPeriodical {
// add to task scheduler periodic sync
gen := sync.NewTaskGenerator(service, log)
sch.SubmitGenerator(gen, registryConfig.PollInterval, scheduler.MediumPriority)
}
if isOnDemand {
// onDemand services used in routes.go
onDemand.Add(service)
}
}
@@ -29,25 +29,30 @@ import (
storageTypes "zotregistry.io/zot/pkg/storage/types"
)
type LocalRegistry struct {
type DestinationRegistry struct {
storeController storage.StoreController
tempStorage OciLayoutStorage
metaDB mTypes.MetaDB
log log.Logger
}
func NewLocalRegistry(storeController storage.StoreController, metaDB mTypes.MetaDB, log log.Logger) Local {
return &LocalRegistry{
func NewDestinationRegistry(
storeController storage.StoreController, // local store controller
tempStoreController storage.StoreController, // temp store controller
metaDB mTypes.MetaDB,
log log.Logger,
) Destination {
return &DestinationRegistry{
storeController: storeController,
tempStorage: NewOciLayoutStorage(tempStoreController),
metaDB: metaDB,
// first we sync from remote (using containers/image copy from docker:// to oci:) to a temp imageStore
// then we copy the image from tempStorage to zot's storage using ImageStore APIs
tempStorage: NewOciLayoutStorage(storeController),
log: log,
log: log,
}
}
func (registry *LocalRegistry) CanSkipImage(repo, tag string, imageDigest digest.Digest) (bool, error) {
func (registry *DestinationRegistry) CanSkipImage(repo, tag string, imageDigest digest.Digest) (bool, error) {
// check image already synced
imageStore := registry.storeController.GetImageStore(repo)
@@ -75,16 +80,16 @@ func (registry *LocalRegistry) CanSkipImage(repo, tag string, imageDigest digest
return true, nil
}
func (registry *LocalRegistry) GetContext() *types.SystemContext {
func (registry *DestinationRegistry) GetContext() *types.SystemContext {
return registry.tempStorage.GetContext()
}
func (registry *LocalRegistry) GetImageReference(repo, reference string) (types.ImageReference, error) {
func (registry *DestinationRegistry) GetImageReference(repo, reference string) (types.ImageReference, error) {
return registry.tempStorage.GetImageReference(repo, reference)
}
// finalize a syncing image.
func (registry *LocalRegistry) CommitImage(imageReference types.ImageReference, repo, reference string) error {
func (registry *DestinationRegistry) CommitImage(imageReference types.ImageReference, repo, reference string) error {
imageStore := registry.storeController.GetImageStore(repo)
tempImageStore := getImageStoreFromImageReference(imageReference, repo, reference)
@@ -180,7 +185,7 @@ func (registry *LocalRegistry) CommitImage(imageReference types.ImageReference,
return nil
}
func (registry *LocalRegistry) copyManifest(repo string, manifestContent []byte, reference string,
func (registry *DestinationRegistry) copyManifest(repo string, manifestContent []byte, reference string,
tempImageStore storageTypes.ImageStore,
) error {
imageStore := registry.storeController.GetImageStore(repo)
@@ -239,7 +244,7 @@ func (registry *LocalRegistry) copyManifest(repo string, manifestContent []byte,
}
// Copy a blob from one image store to another image store.
func (registry *LocalRegistry) copyBlob(repo string, blobDigest digest.Digest, blobMediaType string,
func (registry *DestinationRegistry) copyBlob(repo string, blobDigest digest.Digest, blobMediaType string,
tempImageStore storageTypes.ImageStore,
) error {
imageStore := registry.storeController.GetImageStore(repo)
@@ -279,9 +284,11 @@ func getImageStoreFromImageReference(imageReference types.ImageReference, repo,
tempRootDir = strings.ReplaceAll(imageReference.StringWithinTransport(), fmt.Sprintf("%s:", repo), "")
}
return getImageStore(tempRootDir)
}
func getImageStore(rootDir string) storageTypes.ImageStore {
metrics := monitoring.NewMetricsServer(false, log.Logger{})
tempImageStore := local.NewImageStore(tempRootDir, false, false, log.Logger{}, metrics, nil, nil)
return tempImageStore
return local.NewImageStore(rootDir, false, false, log.Logger{}, metrics, nil, nil)
}
+38 -18
View File
@@ -26,7 +26,7 @@ type BaseService struct {
config syncconf.RegistryConfig
credentials syncconf.CredentialsFile
remote Remote
local Local
destination Destination
retryOptions *retry.RetryOptions
contentManager ContentManager
storeController storage.StoreController
@@ -37,8 +37,13 @@ type BaseService struct {
log log.Logger
}
func New(opts syncconf.RegistryConfig, credentialsFilepath string,
storeController storage.StoreController, metadb mTypes.MetaDB, log log.Logger,
func New(
opts syncconf.RegistryConfig,
credentialsFilepath string,
tmpDir string,
storeController storage.StoreController,
metadb mTypes.MetaDB,
log log.Logger,
) (Service, error) {
service := &BaseService{}
@@ -60,7 +65,21 @@ func New(opts syncconf.RegistryConfig, credentialsFilepath string,
service.credentials = credentialsFile
service.contentManager = NewContentManager(opts.Content, log)
service.local = NewLocalRegistry(storeController, metadb, log)
if len(tmpDir) == 0 {
// first it will sync in tmpDir then it will move everything into local ImageStore
service.destination = NewDestinationRegistry(storeController, storeController, metadb, log)
} else {
// first it will sync under /rootDir/reponame/.sync/ then it will move everything into local ImageStore
service.destination = NewDestinationRegistry(
storeController,
storage.StoreController{
DefaultStore: getImageStore(tmpDir),
},
metadb,
log,
)
}
retryOptions := &retry.RetryOptions{}
@@ -127,7 +146,7 @@ func (service *BaseService) SetNextAvailableClient() error {
if err != nil {
service.log.Error().Err(err).Str("url", url).Msg("sync: failed to initialize http client")
continue
return err
}
if !service.client.Ping() {
@@ -289,7 +308,7 @@ func (service *BaseService) SyncRepo(ctx context.Context, repo string) error {
service.log.Info().Str("repo", repo).Msgf("sync: syncing tags %v", tags)
// apply content.destination rule
localRepo := service.contentManager.GetRepoDestination(repo)
destinationRepo := service.contentManager.GetRepoDestination(repo)
for _, tag := range tags {
if common.IsContextDone(ctx) {
@@ -303,7 +322,7 @@ func (service *BaseService) SyncRepo(ctx context.Context, repo string) error {
var manifestDigest digest.Digest
if err = retry.RetryIfNecessary(ctx, func() error {
manifestDigest, err = service.syncTag(ctx, localRepo, repo, tag)
manifestDigest, err = service.syncTag(ctx, destinationRepo, repo, tag)
return err
}, service.retryOptions); err != nil {
@@ -320,7 +339,7 @@ func (service *BaseService) SyncRepo(ctx context.Context, repo string) error {
if manifestDigest != "" {
if err = retry.RetryIfNecessary(ctx, func() error {
err = service.references.SyncAll(ctx, localRepo, repo, manifestDigest.String())
err = service.references.SyncAll(ctx, destinationRepo, repo, manifestDigest.String())
if errors.Is(err, zerr.ErrSyncReferrerNotFound) {
return nil
}
@@ -340,8 +359,9 @@ func (service *BaseService) SyncRepo(ctx context.Context, repo string) error {
return nil
}
func (service *BaseService) syncTag(ctx context.Context, localRepo, remoteRepo, tag string) (digest.Digest, error) {
copyOptions := getCopyOptions(service.remote.GetContext(), service.local.GetContext())
func (service *BaseService) syncTag(ctx context.Context, destinationRepo, remoteRepo, tag string,
) (digest.Digest, error) {
copyOptions := getCopyOptions(service.remote.GetContext(), service.destination.GetContext())
policyContext, err := getPolicyContext(service.log)
if err != nil {
@@ -384,38 +404,38 @@ func (service *BaseService) syncTag(ctx context.Context, localRepo, remoteRepo,
}
}
skipImage, err := service.local.CanSkipImage(localRepo, tag, manifestDigest)
skipImage, err := service.destination.CanSkipImage(destinationRepo, tag, manifestDigest)
if err != nil {
service.log.Error().Err(err).Str("errortype", common.TypeOf(err)).
Str("repo", localRepo).Str("reference", tag).
Str("repo", destinationRepo).Str("reference", tag).
Msg("couldn't check if the local image can be skipped")
}
if !skipImage {
localImageRef, err := service.local.GetImageReference(localRepo, tag)
localImageRef, err := service.destination.GetImageReference(destinationRepo, tag)
if err != nil {
service.log.Error().Err(err).Str("errortype", common.TypeOf(err)).
Str("repo", localRepo).Str("reference", tag).Msg("couldn't get a local image reference")
Str("repo", destinationRepo).Str("reference", tag).Msg("couldn't get a local image reference")
return "", err
}
service.log.Info().Str("remote image", remoteImageRef.DockerReference().String()).
Str("local image", fmt.Sprintf("%s:%s", localRepo, tag)).Msg("syncing image")
Str("local image", fmt.Sprintf("%s:%s", destinationRepo, tag)).Msg("syncing image")
_, err = copy.Image(ctx, policyContext, localImageRef, remoteImageRef, &copyOptions)
if err != nil {
service.log.Error().Err(err).Str("errortype", common.TypeOf(err)).
Str("remote image", remoteImageRef.DockerReference().String()).
Str("local image", fmt.Sprintf("%s:%s", localRepo, tag)).Msg("coulnd't sync image")
Str("local image", fmt.Sprintf("%s:%s", destinationRepo, tag)).Msg("coulnd't sync image")
return "", err
}
err = service.local.CommitImage(localImageRef, localRepo, tag)
err = service.destination.CommitImage(localImageRef, destinationRepo, tag)
if err != nil {
service.log.Error().Err(err).Str("errortype", common.TypeOf(err)).
Str("repo", localRepo).Str("reference", tag).Msg("couldn't commit image to local image store")
Str("repo", destinationRepo).Str("reference", tag).Msg("couldn't commit image to local image store")
return "", err
}
+1 -1
View File
@@ -65,7 +65,7 @@ type Remote interface {
}
// Local registry.
type Local interface {
type Destination interface {
Registry
// Check if an image is already synced
CanSkipImage(repo, tag string, imageDigest digest.Digest) (bool, error)
+10 -6
View File
@@ -162,7 +162,7 @@ func TestService(t *testing.T) {
URLs: []string{"http://localhost"},
}
service, err := New(conf, "", storage.StoreController{}, mocks.MetaDBMock{}, log.Logger{})
service, err := New(conf, "", os.TempDir(), storage.StoreController{}, mocks.MetaDBMock{}, log.Logger{})
So(err, ShouldBeNil)
err = service.SyncRepo(context.Background(), "repo")
@@ -170,7 +170,7 @@ func TestService(t *testing.T) {
})
}
func TestLocalRegistry(t *testing.T) {
func TestDestinationRegistry(t *testing.T) {
Convey("make StoreController", t, func() {
dir := t.TempDir()
@@ -185,7 +185,8 @@ func TestLocalRegistry(t *testing.T) {
syncImgStore := local.NewImageStore(dir, true, true, log, metrics, nil, cacheDriver)
repoName := "repo"
registry := NewLocalRegistry(storage.StoreController{DefaultStore: syncImgStore}, nil, log)
storeController := storage.StoreController{DefaultStore: syncImgStore}
registry := NewDestinationRegistry(storeController, storeController, nil, log)
imageReference, err := registry.GetImageReference(repoName, "1.0")
So(err, ShouldBeNil)
So(imageReference, ShouldNotBeNil)
@@ -302,7 +303,8 @@ func TestLocalRegistry(t *testing.T) {
syncImgStore := local.NewImageStore(dir, true, true, log, metrics, linter, cacheDriver)
repoName := "repo"
registry := NewLocalRegistry(storage.StoreController{DefaultStore: syncImgStore}, nil, log)
storeController := storage.StoreController{DefaultStore: syncImgStore}
registry := NewDestinationRegistry(storeController, storeController, nil, log)
err = registry.CommitImage(imageReference, repoName, "1.0")
So(err, ShouldBeNil)
@@ -336,7 +338,8 @@ func TestLocalRegistry(t *testing.T) {
})
Convey("trigger metaDB error on index manifest in CommitImage()", func() {
registry := NewLocalRegistry(storage.StoreController{DefaultStore: syncImgStore}, mocks.MetaDBMock{
storeController := storage.StoreController{DefaultStore: syncImgStore}
registry := NewDestinationRegistry(storeController, storeController, mocks.MetaDBMock{
SetRepoReferenceFn: func(ctx context.Context, repo string, reference string, imageMeta mTypes.ImageMeta) error {
if reference == "1.0" {
return zerr.ErrRepoMetaNotFound
@@ -351,7 +354,8 @@ func TestLocalRegistry(t *testing.T) {
})
Convey("trigger metaDB error on image manifest in CommitImage()", func() {
registry := NewLocalRegistry(storage.StoreController{DefaultStore: syncImgStore}, mocks.MetaDBMock{
storeController := storage.StoreController{DefaultStore: syncImgStore}
registry := NewDestinationRegistry(storeController, storeController, mocks.MetaDBMock{
SetRepoReferenceFn: func(ctx context.Context, repo, reference string, imageMeta mTypes.ImageMeta) error {
return zerr.ErrRepoMetaNotFound
},