sync: support reloading sync config when the config file changes

Signed-off-by: Petu Eusebiu <peusebiu@cisco.com>
This commit is contained in:
Petu Eusebiu
2022-02-10 16:17:49 +02:00
committed by Ramkumar Chinchani
parent 7e8cc3c71c
commit 6d04ab3cdc
11 changed files with 728 additions and 99 deletions
+30 -19
View File
@@ -184,8 +184,8 @@ func filterImagesBySemver(upstreamReferences *[]types.ImageReference, content Co
}
// imagesToCopyFromRepos lists all images given a registry name and its repos.
func imagesToCopyFromUpstream(registryName string, repos []string, upstreamCtx *types.SystemContext,
content Content, log log.Logger) ([]types.ImageReference, error) {
func imagesToCopyFromUpstream(ctx context.Context, registryName string, repos []string,
upstreamCtx *types.SystemContext, content Content, log log.Logger) ([]types.ImageReference, error) {
var upstreamReferences []types.ImageReference
for _, repoName := range repos {
@@ -196,7 +196,7 @@ func imagesToCopyFromUpstream(registryName string, repos []string, upstreamCtx *
return nil, err
}
tags, err := getImageTags(context.Background(), upstreamCtx, repoRef)
tags, err := getImageTags(ctx, upstreamCtx, repoRef)
if err != nil {
log.Error().Err(err).Msgf("couldn't fetch tags for %s", repoRef)
@@ -279,8 +279,9 @@ func getUpstreamContext(regCfg *RegistryConfig, credentials Credentials) *types.
return upstreamCtx
}
func syncRegistry(regCfg RegistryConfig, upstreamURL string, storeController storage.StoreController,
localCtx *types.SystemContext, policyCtx *signature.PolicyContext, credentials Credentials, log log.Logger) error {
func syncRegistry(ctx context.Context, regCfg RegistryConfig, upstreamURL string,
storeController storage.StoreController, localCtx *types.SystemContext,
policyCtx *signature.PolicyContext, credentials Credentials, log log.Logger) error {
log.Info().Msgf("syncing registry: %s", upstreamURL)
var err error
@@ -306,7 +307,7 @@ func syncRegistry(regCfg RegistryConfig, upstreamURL string, storeController sto
return err
}
if err = retry.RetryIfNecessary(context.Background(), func() error {
if err = retry.RetryIfNecessary(ctx, func() error {
catalog, err = getUpstreamCatalog(httpClient, upstreamURL, log)
return err
@@ -330,8 +331,8 @@ func syncRegistry(regCfg RegistryConfig, upstreamURL string, storeController sto
r := repos
id := contentID
if err = retry.RetryIfNecessary(context.Background(), func() error {
refs, err := imagesToCopyFromUpstream(upstreamAddr, r, upstreamCtx, regCfg.Content[id], log)
if err = retry.RetryIfNecessary(ctx, func() error {
refs, err := imagesToCopyFromUpstream(ctx, upstreamAddr, r, upstreamCtx, regCfg.Content[id], log)
images = append(images, refs...)
return err
@@ -356,7 +357,7 @@ func syncRegistry(regCfg RegistryConfig, upstreamURL string, storeController sto
imageStore := storeController.GetImageStore(repo)
canBeSkipped, err := canSkipImage(repo, tag, upstreamImageRef, imageStore, upstreamCtx, log)
canBeSkipped, err := canSkipImage(ctx, repo, tag, upstreamImageRef, imageStore, upstreamCtx, log)
if err != nil {
log.Error().Err(err).Msgf("couldn't check if the upstream image %s can be skipped",
upstreamImageRef.DockerReference())
@@ -378,8 +379,8 @@ func syncRegistry(regCfg RegistryConfig, upstreamURL string, storeController sto
log.Info().Msgf("copying image %s to %s", upstreamImageRef.DockerReference(), localCachePath)
if err = retry.RetryIfNecessary(context.Background(), func() error {
_, err = copy.Image(context.Background(), policyCtx, localImageRef, upstreamImageRef, &options)
if err = retry.RetryIfNecessary(ctx, func() error {
_, err = copy.Image(ctx, policyCtx, localImageRef, upstreamImageRef, &options)
return err
}, retryOptions); err != nil {
@@ -397,7 +398,7 @@ func syncRegistry(regCfg RegistryConfig, upstreamURL string, storeController sto
return err
}
if err = retry.RetryIfNecessary(context.Background(), func() error {
if err = retry.RetryIfNecessary(ctx, func() error {
err = syncSignatures(httpClient, storeController, upstreamURL, repo, tag, log)
return err
@@ -435,7 +436,8 @@ func getLocalContexts(log log.Logger) (*types.SystemContext, *signature.PolicyCo
return localCtx, policyContext, nil
}
func Run(cfg Config, storeController storage.StoreController, wtgrp *goSync.WaitGroup, logger log.Logger) error {
func Run(ctx context.Context, cfg Config, storeController storage.StoreController,
wtgrp *goSync.WaitGroup, logger log.Logger) error {
var credentialsFile CredentialsFile
var err error
@@ -476,19 +478,18 @@ func Run(cfg Config, storeController storage.StoreController, wtgrp *goSync.Wait
tlogger := log.Logger{Logger: logger.With().Caller().Timestamp().Logger()}
// schedule each registry sync
go func(regCfg RegistryConfig, logger log.Logger) {
// run on intervals
for ; true; <-ticker.C {
go func(ctx context.Context, regCfg RegistryConfig, logger log.Logger) {
for {
// increment reference since will be busy, so shutdown has to wait
wtgrp.Add(1)
for _, upstreamURL := range regCfg.URLs {
upstreamAddr := StripRegistryTransport(upstreamURL)
// first try syncing main registry
if err := syncRegistry(regCfg, upstreamURL, storeController, localCtx, policyCtx,
if err := syncRegistry(ctx, regCfg, upstreamURL, storeController, localCtx, policyCtx,
credentialsFile[upstreamAddr], logger); err != nil {
logger.Error().Err(err).Str("registry", upstreamURL).
Msg("sync exited with error, falling back to auxiliary registries")
Msg("sync exited with error, falling back to auxiliary registries if any")
} else {
// if success fall back to main registry
break
@@ -496,8 +497,18 @@ func Run(cfg Config, storeController storage.StoreController, wtgrp *goSync.Wait
}
// mark as done after a single sync run
wtgrp.Done()
select {
case <-ctx.Done():
ticker.Stop()
return
case <-ticker.C:
// run on intervals
continue
}
}
}(regCfg, tlogger)
}(ctx, regCfg, tlogger)
}
logger.Info().Msg("finished setting up sync")
+15 -6
View File
@@ -139,9 +139,15 @@ func TestSyncInternal(t *testing.T) {
CertDir: "",
}
cfg := Config{Registries: []RegistryConfig{syncRegistryConfig}, CredentialsFile: "/invalid/path/to/file"}
defaultValue := true
cfg := Config{
Registries: []RegistryConfig{syncRegistryConfig},
Enable: &defaultValue,
CredentialsFile: "/invalid/path/to/file",
}
ctx := context.Background()
So(Run(cfg, storage.StoreController{}, new(goSync.WaitGroup), log.NewLogger("debug", "")), ShouldNotBeNil)
So(Run(ctx, cfg, storage.StoreController{}, new(goSync.WaitGroup), log.NewLogger("debug", "")), ShouldNotBeNil)
_, err = getFileCredentials("/invalid/path/to/file")
So(err, ShouldNotBeNil)
@@ -248,10 +254,11 @@ func TestSyncInternal(t *testing.T) {
repos := []string{"repo1"}
upstreamCtx := &types.SystemContext{}
_, err := imagesToCopyFromUpstream("localhost:4566", repos, upstreamCtx, Content{}, log.NewLogger("debug", ""))
_, err := imagesToCopyFromUpstream(context.Background(), "localhost:4566", repos, upstreamCtx,
Content{}, log.NewLogger("debug", ""))
So(err, ShouldNotBeNil)
_, err = imagesToCopyFromUpstream("docker://localhost:4566", repos, upstreamCtx,
_, err = imagesToCopyFromUpstream(context.Background(), "docker://localhost:4566", repos, upstreamCtx,
Content{}, log.NewLogger("debug", ""))
So(err, ShouldNotBeNil)
})
@@ -302,7 +309,8 @@ func TestSyncInternal(t *testing.T) {
So(err, ShouldBeNil)
So(taggedRef, ShouldNotBeNil)
canBeSkipped, err := canSkipImage(testImage, testImageTag, upstreamRef, imageStore, &types.SystemContext{}, log)
canBeSkipped, err := canSkipImage(context.Background(), testImage, testImageTag, upstreamRef,
imageStore, &types.SystemContext{}, log)
So(err, ShouldNotBeNil)
So(canBeSkipped, ShouldBeFalse)
@@ -311,7 +319,8 @@ func TestSyncInternal(t *testing.T) {
panic(err)
}
canBeSkipped, err = canSkipImage(testImage, testImageTag, upstreamRef, imageStore, &types.SystemContext{}, log)
canBeSkipped, err = canSkipImage(context.Background(), testImage, testImageTag, upstreamRef,
imageStore, &types.SystemContext{}, log)
So(err, ShouldNotBeNil)
So(canBeSkipped, ShouldBeFalse)
})
+122 -1
View File
@@ -35,6 +35,7 @@ import (
"gopkg.in/resty.v1"
"zotregistry.io/zot/pkg/api"
"zotregistry.io/zot/pkg/api/config"
"zotregistry.io/zot/pkg/cli"
extconf "zotregistry.io/zot/pkg/extensions/config"
"zotregistry.io/zot/pkg/extensions/sync"
"zotregistry.io/zot/pkg/storage"
@@ -642,6 +643,126 @@ func TestOnDemandPermsDenied(t *testing.T) {
})
}
func TestConfigReloader(t *testing.T) {
Convey("Verify periodically sync config reloader works", t, func() {
duration, _ := time.ParseDuration("3s")
sctlr, srcBaseURL, srcDir, _, _ := startUpstreamServer(t, false, false)
defer os.RemoveAll(srcDir)
defer func() {
sctlr.Shutdown()
}()
var tlsVerify bool
syncRegistryConfig := sync.RegistryConfig{
Content: []sync.Content{
{
Prefix: testImage,
},
},
URLs: []string{srcBaseURL},
PollInterval: duration,
TLSVerify: &tlsVerify,
CertDir: "",
OnDemand: true,
}
defaultVal := true
syncConfig := &sync.Config{
Enable: &defaultVal,
Registries: []sync.RegistryConfig{syncRegistryConfig},
}
destPort := test.GetFreePort()
destConfig := config.New()
destBaseURL := test.GetBaseURL(destPort)
destConfig.HTTP.Port = destPort
destDir, err := ioutil.TempDir("", "oci-dest-repo-test")
if err != nil {
panic(err)
}
defer os.RemoveAll(destDir)
destConfig.Storage.RootDirectory = destDir
destConfig.Extensions = &extconf.ExtensionConfig{}
destConfig.Extensions.Search = nil
destConfig.Extensions.Sync = syncConfig
logFile, err := ioutil.TempFile("", "zot-log*.txt")
So(err, ShouldBeNil)
defer os.Remove(logFile.Name()) // clean up
destConfig.Log.Output = logFile.Name()
dctlr := api.NewController(destConfig)
defer func() {
dctlr.Shutdown()
}()
go func() {
// this blocks
if err := dctlr.Run(); err != nil {
return
}
}()
// wait till ready
for {
_, err := resty.R().Get(destBaseURL)
if err == nil {
break
}
time.Sleep(100 * time.Millisecond)
}
content := fmt.Sprintf(`{"distSpecVersion": "0.1.0-dev", "storage": {"rootDirectory": "%s"},
"http": {"address": "127.0.0.1", "port": "%s", "ReadOnly": false},
"log": {"level": "debug", "output": "%s"}}`, destDir, destPort, logFile.Name())
cfgfile, err := ioutil.TempFile("", "zot-test*.json")
So(err, ShouldBeNil)
defer os.Remove(cfgfile.Name()) // clean up
_, err = cfgfile.Write([]byte(content))
So(err, ShouldBeNil)
hotReloader, err := cli.NewHotReloader(dctlr, cfgfile.Name())
So(err, ShouldBeNil)
hotReloader.Start()
// let it sync
time.Sleep(3 * time.Second)
// modify config
_, err = cfgfile.WriteString(" ")
So(err, ShouldBeNil)
err = cfgfile.Close()
So(err, ShouldBeNil)
time.Sleep(2 * time.Second)
data, err := os.ReadFile(logFile.Name())
t.Logf("downstream log: %s", string(data))
So(err, ShouldBeNil)
So(string(data), ShouldContainSubstring, "reloaded params")
So(string(data), ShouldContainSubstring, "new configuration settings")
So(string(data), ShouldContainSubstring, "\"Sync\":null")
So(string(data), ShouldNotContainSubstring, "sync:")
})
}
func TestBadTLS(t *testing.T) {
Convey("Verify sync TLS feature", t, func() {
updateDuration, _ := time.ParseDuration("30m")
@@ -2501,7 +2622,7 @@ func TestOnDemandMultipleRetries(t *testing.T) {
done := make(chan bool)
go func() {
/* watch .sync local cache, make sure just one .sync/subdir is populated with image
the lock from ondemand should prevent spawning multiple go routines for the same image*/
the channel from ondemand should prevent spawning multiple go routines for the same image*/
for {
time.Sleep(250 * time.Millisecond)
select {
+2 -2
View File
@@ -574,7 +574,7 @@ func getLocalImageRef(imageStore storage.ImageStore, repo, tag string) (types.Im
}
// canSkipImage returns whether or not the image can be skipped from syncing.
func canSkipImage(repo, tag string, upstreamRef types.ImageReference,
func canSkipImage(ctx context.Context, repo, tag string, upstreamRef types.ImageReference,
imageStore storage.ImageStore, upstreamCtx *types.SystemContext, log log.Logger) (bool, error) {
// filter already pulled images
_, localImageDigest, _, err := imageStore.GetImageManifest(repo, tag)
@@ -588,7 +588,7 @@ func canSkipImage(repo, tag string, upstreamRef types.ImageReference,
return false, err
}
upstreamImageDigest, err := docker.GetDigest(context.Background(), upstreamCtx, upstreamRef)
upstreamImageDigest, err := docker.GetDigest(ctx, upstreamCtx, upstreamRef)
if err != nil {
log.Error().Err(err).Msgf("couldn't get upstream image %s manifest", upstreamRef.DockerReference())