mirror of
https://github.com/project-zot/zot.git
synced 2026-06-16 20:38:08 +08:00
feat(sync): sync can include self url in registry.URLs (#1562)
sync now ignores self referencing urls, this will help in clustering mode where we can have the same config for multiple zots closes #1335 Signed-off-by: Petu Eusebiu <peusebiu@cisco.com>
This commit is contained in:
@@ -4,7 +4,13 @@
|
||||
package extensions
|
||||
|
||||
import (
|
||||
"net"
|
||||
"net/url"
|
||||
"strings"
|
||||
|
||||
zerr "zotregistry.io/zot/errors"
|
||||
"zotregistry.io/zot/pkg/api/config"
|
||||
syncconf "zotregistry.io/zot/pkg/extensions/config/sync"
|
||||
"zotregistry.io/zot/pkg/extensions/sync"
|
||||
"zotregistry.io/zot/pkg/log"
|
||||
"zotregistry.io/zot/pkg/meta/repodb"
|
||||
@@ -19,6 +25,19 @@ func EnableSyncExtension(config *config.Config, repoDB repodb.RepoDB,
|
||||
onDemand := sync.NewOnDemand(log)
|
||||
|
||||
for _, registryConfig := range config.Extensions.Sync.Registries {
|
||||
registryConfig := registryConfig
|
||||
if len(registryConfig.URLs) > 1 {
|
||||
if err := removeSelfURLs(config, ®istryConfig, log); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
if len(registryConfig.URLs) == 0 {
|
||||
log.Error().Err(zerr.ErrSyncNoURLsLeft).Msg("unable to start sync extension")
|
||||
|
||||
return nil, zerr.ErrSyncNoURLsLeft
|
||||
}
|
||||
|
||||
isPeriodical := len(registryConfig.Content) != 0 && registryConfig.PollInterval != 0
|
||||
isOnDemand := registryConfig.OnDemand
|
||||
|
||||
@@ -49,3 +68,106 @@ func EnableSyncExtension(config *config.Config, repoDB repodb.RepoDB,
|
||||
|
||||
return nil, nil //nolint: nilnil
|
||||
}
|
||||
|
||||
func getLocalIPs() ([]string, error) {
|
||||
var localIPs []string
|
||||
|
||||
ifaces, err := net.Interfaces()
|
||||
if err != nil {
|
||||
return []string{}, err
|
||||
}
|
||||
|
||||
for _, i := range ifaces {
|
||||
addrs, err := i.Addrs()
|
||||
if err != nil {
|
||||
return localIPs, err
|
||||
}
|
||||
|
||||
for _, addr := range addrs {
|
||||
if localIP, ok := addr.(*net.IPNet); ok {
|
||||
localIPs = append(localIPs, localIP.IP.String())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return localIPs, nil
|
||||
}
|
||||
|
||||
func getIPFromHostName(host string) ([]string, error) {
|
||||
addrs, err := net.LookupIP(host)
|
||||
if err != nil {
|
||||
return []string{}, err
|
||||
}
|
||||
|
||||
ips := make([]string, 0, len(addrs))
|
||||
|
||||
for _, ip := range addrs {
|
||||
ips = append(ips, ip.String())
|
||||
}
|
||||
|
||||
return ips, nil
|
||||
}
|
||||
|
||||
func removeSelfURLs(config *config.Config, registryConfig *syncconf.RegistryConfig, log log.Logger) error {
|
||||
// get IP from config
|
||||
port := config.HTTP.Port
|
||||
selfAddress := net.JoinHostPort(config.HTTP.Address, port)
|
||||
|
||||
// get all local IPs from interfaces
|
||||
localIPs, err := getLocalIPs()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for idx := len(registryConfig.URLs) - 1; idx >= 0; idx-- {
|
||||
registryURL := registryConfig.URLs[idx]
|
||||
|
||||
url, err := url.Parse(registryURL)
|
||||
if err != nil {
|
||||
log.Error().Str("url", registryURL).Msg("failed to parse sync registry url, removing it")
|
||||
|
||||
registryConfig.URLs = append(registryConfig.URLs[:idx], registryConfig.URLs[idx+1:]...)
|
||||
|
||||
continue
|
||||
}
|
||||
|
||||
// check self address
|
||||
if strings.Contains(registryURL, selfAddress) {
|
||||
log.Info().Str("url", registryURL).Msg("removing local registry url")
|
||||
|
||||
registryConfig.URLs = append(registryConfig.URLs[:idx], registryConfig.URLs[idx+1:]...)
|
||||
|
||||
continue
|
||||
}
|
||||
|
||||
// check dns
|
||||
ips, err := getIPFromHostName(url.Hostname())
|
||||
if err != nil {
|
||||
// will not remove, maybe it will get resolved later after multiple retries
|
||||
log.Warn().Str("url", registryURL).Msg("failed to lookup sync registry url's hostname")
|
||||
|
||||
continue
|
||||
}
|
||||
|
||||
var removed bool
|
||||
|
||||
for _, localIP := range localIPs {
|
||||
// if ip resolved from hostname/dns is equal with any local ip
|
||||
for _, ip := range ips {
|
||||
if net.JoinHostPort(ip, url.Port()) == net.JoinHostPort(localIP, port) {
|
||||
registryConfig.URLs = append(registryConfig.URLs[:idx], registryConfig.URLs[idx+1:]...)
|
||||
|
||||
removed = true
|
||||
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if removed {
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -125,7 +125,7 @@ func (service *BaseService) SetNextAvailableClient() error {
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
continue
|
||||
}
|
||||
|
||||
if !service.client.IsAvailable() {
|
||||
|
||||
@@ -802,6 +802,14 @@ func TestOnDemand(t *testing.T) {
|
||||
regex := ".*"
|
||||
semver := true
|
||||
|
||||
destPort := test.GetFreePort()
|
||||
destConfig := config.New()
|
||||
|
||||
destBaseURL := test.GetBaseURL(destPort)
|
||||
|
||||
hostname, err := os.Hostname()
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
syncRegistryConfig := syncconf.RegistryConfig{
|
||||
Content: []syncconf.Content{
|
||||
{
|
||||
@@ -812,7 +820,11 @@ func TestOnDemand(t *testing.T) {
|
||||
},
|
||||
},
|
||||
},
|
||||
URLs: []string{srcBaseURL},
|
||||
// include self url, should be ignored
|
||||
URLs: []string{
|
||||
fmt.Sprintf("http://%s", hostname), destBaseURL,
|
||||
srcBaseURL, fmt.Sprintf("http://localhost:%s", destPort),
|
||||
},
|
||||
TLSVerify: &tlsVerify,
|
||||
CertDir: "",
|
||||
OnDemand: true,
|
||||
@@ -824,11 +836,6 @@ func TestOnDemand(t *testing.T) {
|
||||
Registries: []syncconf.RegistryConfig{syncRegistryConfig},
|
||||
}
|
||||
|
||||
destPort := test.GetFreePort()
|
||||
destConfig := config.New()
|
||||
|
||||
destBaseURL := test.GetBaseURL(destPort)
|
||||
|
||||
destConfig.HTTP.Port = destPort
|
||||
|
||||
destDir := t.TempDir()
|
||||
@@ -3384,7 +3391,7 @@ func TestMultipleURLs(t *testing.T) {
|
||||
},
|
||||
},
|
||||
},
|
||||
URLs: []string{"badURL", "http://invalid.invalid/invalid/", srcBaseURL},
|
||||
URLs: []string{"badURL", "@!#!$#@%", "http://invalid.invalid/invalid/", srcBaseURL},
|
||||
PollInterval: updateDuration,
|
||||
TLSVerify: &tlsVerify,
|
||||
CertDir: "",
|
||||
@@ -3438,6 +3445,49 @@ func TestMultipleURLs(t *testing.T) {
|
||||
})
|
||||
}
|
||||
|
||||
func TestNoURLsLeftInConfig(t *testing.T) {
|
||||
Convey("Verify sync feature", t, func() {
|
||||
updateDuration, _ := time.ParseDuration("30m")
|
||||
|
||||
regex := ".*"
|
||||
semver := true
|
||||
var tlsVerify bool
|
||||
|
||||
syncRegistryConfig := syncconf.RegistryConfig{
|
||||
Content: []syncconf.Content{
|
||||
{
|
||||
Prefix: testImage,
|
||||
Tags: &syncconf.Tags{
|
||||
Regex: ®ex,
|
||||
Semver: &semver,
|
||||
},
|
||||
},
|
||||
},
|
||||
URLs: []string{"@!#!$#@%", "@!#!$#@%"},
|
||||
PollInterval: updateDuration,
|
||||
TLSVerify: &tlsVerify,
|
||||
CertDir: "",
|
||||
}
|
||||
|
||||
defaultVal := true
|
||||
syncConfig := &syncconf.Config{
|
||||
Enable: &defaultVal,
|
||||
Registries: []syncconf.RegistryConfig{syncRegistryConfig},
|
||||
}
|
||||
|
||||
dctlr, destBaseURL, _, destClient := makeDownstreamServer(t, false, syncConfig)
|
||||
|
||||
dcm := test.NewControllerManager(dctlr)
|
||||
dcm.StartAndWait(dctlr.Config.HTTP.Port)
|
||||
defer dcm.StopServer()
|
||||
|
||||
resp, err := destClient.R().Get(destBaseURL + "/v2/" + testImage + "/tags/list")
|
||||
So(err, ShouldBeNil)
|
||||
So(resp, ShouldNotBeNil)
|
||||
So(resp.StatusCode(), ShouldEqual, http.StatusNotFound)
|
||||
})
|
||||
}
|
||||
|
||||
func TestPeriodicallySignaturesErr(t *testing.T) {
|
||||
Convey("Verify sync periodically signatures errors", t, func() {
|
||||
updateDuration, _ := time.ParseDuration("30m")
|
||||
|
||||
Reference in New Issue
Block a user