mirror of
https://github.com/project-zot/zot.git
synced 2026-06-16 04:17:55 +08:00
Added new extension "sync"
Periodically poll registries and pull images according to sync's config Added sync on demand, syncing when clients asks for an image which zot doesn't have. Signed-off-by: Petu Eusebiu <peusebiu@cisco.com>
This commit is contained in:
committed by
Ramkumar Chinchani
parent
1027f872ec
commit
19003e8a71
@@ -0,0 +1,60 @@
|
||||
package sync
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net/http"
|
||||
"strings"
|
||||
|
||||
"github.com/anuvu/zot/pkg/log"
|
||||
)
|
||||
|
||||
type PostHandler struct {
|
||||
Address string
|
||||
Port string
|
||||
ServerCert string
|
||||
ServerKey string
|
||||
CACert string
|
||||
Cfg Config
|
||||
Log log.Logger
|
||||
}
|
||||
|
||||
func (h *PostHandler) Handler(w http.ResponseWriter, r *http.Request) {
|
||||
upstreamCtx, policyCtx, err := getLocalContexts(h.ServerCert, h.ServerKey, h.CACert, h.Log)
|
||||
if err != nil {
|
||||
WriteData(w, http.StatusInternalServerError, err.Error())
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
defer policyCtx.Destroy() //nolint: errcheck
|
||||
|
||||
var credentialsFile CredentialsFile
|
||||
|
||||
if h.Cfg.CredentialsFile != "" {
|
||||
credentialsFile, err = getFileCredentials(h.Cfg.CredentialsFile)
|
||||
if err != nil {
|
||||
h.Log.Error().Err(err).Msgf("couldn't get registry credentials from %s", h.Cfg.CredentialsFile)
|
||||
WriteData(w, http.StatusInternalServerError, err.Error())
|
||||
}
|
||||
}
|
||||
|
||||
localRegistryName := strings.Replace(fmt.Sprintf("%s:%s", h.Address, h.Port), "0.0.0.0", "127.0.0.1", 1)
|
||||
|
||||
for _, regCfg := range h.Cfg.Registries {
|
||||
upstreamRegistryName := strings.Replace(strings.Replace(regCfg.URL, "http://", "", 1), "https://", "", 1)
|
||||
|
||||
if err := syncRegistry(regCfg, h.Log, localRegistryName, upstreamCtx, policyCtx,
|
||||
credentialsFile[upstreamRegistryName]); err != nil {
|
||||
h.Log.Err(err).Msg("error while syncing")
|
||||
WriteData(w, http.StatusInternalServerError, err.Error())
|
||||
}
|
||||
}
|
||||
|
||||
WriteData(w, http.StatusOK, "")
|
||||
}
|
||||
|
||||
func WriteData(w http.ResponseWriter, status int, msg string) {
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
w.WriteHeader(status)
|
||||
_, _ = w.Write([]byte(msg))
|
||||
}
|
||||
@@ -0,0 +1,90 @@
|
||||
package sync
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"github.com/anuvu/zot/pkg/log"
|
||||
"github.com/containers/common/pkg/retry"
|
||||
"github.com/containers/image/v5/copy"
|
||||
"github.com/containers/image/v5/docker"
|
||||
"github.com/containers/image/v5/docker/reference"
|
||||
)
|
||||
|
||||
func OneImage(cfg Config, log log.Logger,
|
||||
address, port, serverCert, serverKey, caCert, repoName, tag string) (bool, error) {
|
||||
localCtx, policyCtx, err := getLocalContexts(serverCert, serverKey, caCert, log)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
localRegistryName := strings.Replace(fmt.Sprintf("%s:%s", address, port), "0.0.0.0", "127.0.0.1", 1)
|
||||
|
||||
var credentialsFile CredentialsFile
|
||||
|
||||
if cfg.CredentialsFile != "" {
|
||||
credentialsFile, err = getFileCredentials(cfg.CredentialsFile)
|
||||
if err != nil {
|
||||
log.Error().Err(err).Msgf("couldn't get registry credentials from %s", cfg.CredentialsFile)
|
||||
return false, err
|
||||
}
|
||||
}
|
||||
|
||||
var synced bool
|
||||
|
||||
for _, regCfg := range cfg.Registries {
|
||||
if !regCfg.OnDemand {
|
||||
log.Info().Msgf("skipping syncing on demand from %s, onDemand flag is false", regCfg.URL)
|
||||
continue
|
||||
}
|
||||
|
||||
registryConfig := regCfg
|
||||
log.Info().Msgf("syncing on demand with %s", registryConfig.URL)
|
||||
|
||||
upstreamRegistryName := strings.Replace(strings.Replace(regCfg.URL, "http://", "", 1), "https://", "", 1)
|
||||
|
||||
upstreamCtx := getUpstreamContext(®istryConfig, credentialsFile[upstreamRegistryName])
|
||||
|
||||
upstreamRepoRef, err := parseRepositoryReference(fmt.Sprintf("%s/%s", upstreamRegistryName, repoName))
|
||||
|
||||
upstreamTaggedRef, err := reference.WithTag(upstreamRepoRef, tag)
|
||||
if err != nil {
|
||||
log.Err(err).Msgf("error creating a reference for repository %s and tag %q", upstreamRepoRef.Name(), tag)
|
||||
return synced, err
|
||||
}
|
||||
|
||||
upstreamRef, err := docker.NewReference(upstreamTaggedRef)
|
||||
ref := strings.Replace(upstreamRef.DockerReference().String(), upstreamRegistryName, "", 1)
|
||||
|
||||
localRef, err := docker.Transport.ParseReference(
|
||||
fmt.Sprintf("//%s%s", localRegistryName, ref),
|
||||
)
|
||||
if err != nil {
|
||||
return synced, err
|
||||
}
|
||||
|
||||
log.Info().Msgf("copying image %s to %s", upstreamRef.DockerReference().Name(), localRef.DockerReference().Name())
|
||||
|
||||
options := getCopyOptions(upstreamCtx, localCtx)
|
||||
|
||||
retryOptions := &retry.RetryOptions{
|
||||
MaxRetry: maxRetries,
|
||||
}
|
||||
|
||||
if err = retry.RetryIfNecessary(context.Background(), func() error {
|
||||
_, err = copy.Image(context.Background(), policyCtx, localRef, upstreamRef, &options)
|
||||
return err
|
||||
}, retryOptions); err != nil {
|
||||
log.Error().Err(err).Msgf("error while copying image %s to %s",
|
||||
upstreamRef.DockerReference().Name(), localRef.DockerReference().Name())
|
||||
} else {
|
||||
log.Info().Msgf("successfully synced %s", upstreamRef.DockerReference().Name())
|
||||
synced = true
|
||||
|
||||
return synced, nil
|
||||
}
|
||||
}
|
||||
|
||||
return synced, nil
|
||||
}
|
||||
@@ -0,0 +1,469 @@
|
||||
package sync
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/tls"
|
||||
"crypto/x509"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"regexp"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/Masterminds/semver"
|
||||
"github.com/anuvu/zot/errors"
|
||||
"github.com/anuvu/zot/pkg/log"
|
||||
"github.com/containers/common/pkg/retry"
|
||||
"github.com/containers/image/v5/copy"
|
||||
"github.com/containers/image/v5/docker"
|
||||
"github.com/containers/image/v5/docker/reference"
|
||||
"github.com/containers/image/v5/signature"
|
||||
"github.com/containers/image/v5/types"
|
||||
ispec "github.com/opencontainers/image-spec/specs-go/v1"
|
||||
"gopkg.in/resty.v1"
|
||||
)
|
||||
|
||||
const (
|
||||
maxRetries = 3
|
||||
delay = 5 * time.Minute
|
||||
)
|
||||
|
||||
// /v2/_catalog struct.
|
||||
type catalog struct {
|
||||
Repositories []string `json:"repositories"`
|
||||
}
|
||||
|
||||
// key is registry address.
|
||||
type CredentialsFile map[string]Credentials
|
||||
|
||||
type Credentials struct {
|
||||
Username string
|
||||
Password string
|
||||
}
|
||||
|
||||
type Config struct {
|
||||
CredentialsFile string
|
||||
Registries []RegistryConfig
|
||||
}
|
||||
|
||||
type RegistryConfig struct {
|
||||
URL string
|
||||
PollInterval time.Duration
|
||||
Content []Content
|
||||
TLSVerify *bool
|
||||
OnDemand bool
|
||||
CertDir string
|
||||
}
|
||||
|
||||
type Content struct {
|
||||
Prefix string
|
||||
Tags *Tags
|
||||
}
|
||||
|
||||
type Tags struct {
|
||||
Regex *string
|
||||
Semver *bool
|
||||
}
|
||||
|
||||
// getUpstreamCatalog gets all repos from a registry.
|
||||
func getUpstreamCatalog(regCfg *RegistryConfig, credentials Credentials, log log.Logger) (catalog, error) {
|
||||
var c catalog
|
||||
|
||||
registryCatalogURL := fmt.Sprintf("%s%s", regCfg.URL, "/v2/_catalog")
|
||||
client := resty.New()
|
||||
|
||||
if regCfg.CertDir != "" {
|
||||
log.Debug().Msgf("sync: using certs directory: %s", regCfg.CertDir)
|
||||
clientCert := fmt.Sprintf("%s/client.cert", regCfg.CertDir)
|
||||
clientKey := fmt.Sprintf("%s/client.key", regCfg.CertDir)
|
||||
caCertPath := fmt.Sprintf("%s/ca.crt", regCfg.CertDir)
|
||||
|
||||
caCert, err := ioutil.ReadFile(caCertPath)
|
||||
if err != nil {
|
||||
return c, err
|
||||
}
|
||||
|
||||
caCertPool := x509.NewCertPool()
|
||||
caCertPool.AppendCertsFromPEM(caCert)
|
||||
|
||||
client.SetTLSClientConfig(&tls.Config{RootCAs: caCertPool})
|
||||
|
||||
cert, err := tls.LoadX509KeyPair(clientCert, clientKey)
|
||||
if err != nil {
|
||||
return c, err
|
||||
}
|
||||
|
||||
client.SetCertificates(cert)
|
||||
}
|
||||
|
||||
if credentials.Username != "" && credentials.Password != "" {
|
||||
log.Debug().Msgf("sync: using basic auth")
|
||||
client.SetBasicAuth(credentials.Username, credentials.Password)
|
||||
}
|
||||
|
||||
resp, err := client.R().SetHeader("Content-Type", "application/json").Get(registryCatalogURL)
|
||||
if err != nil {
|
||||
log.Err(err).Msgf("couldn't query %s", registryCatalogURL)
|
||||
return c, err
|
||||
}
|
||||
|
||||
if resp.IsError() {
|
||||
log.Error().Msgf("couldn't query %s, status code: %d, body: %s", registryCatalogURL,
|
||||
resp.StatusCode(), resp.Body())
|
||||
return c, errors.ErrSyncMissingCatalog
|
||||
}
|
||||
|
||||
err = json.Unmarshal(resp.Body(), &c)
|
||||
if err != nil {
|
||||
log.Err(err).Str("body", string(resp.Body())).Msg("couldn't unmarshal registry's catalog")
|
||||
return c, err
|
||||
}
|
||||
|
||||
return c, nil
|
||||
}
|
||||
|
||||
// getImageTags lists all tags in a repository.
|
||||
// It returns a string slice of tags and any error encountered.
|
||||
func getImageTags(ctx context.Context, sysCtx *types.SystemContext, repoRef reference.Named) ([]string, error) {
|
||||
dockerRef, err := docker.NewReference(reference.TagNameOnly(repoRef))
|
||||
if err != nil {
|
||||
return nil, err // Should never happen for a reference with tag and no digest
|
||||
}
|
||||
|
||||
tags, err := docker.GetRepositoryTags(ctx, sysCtx, dockerRef)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return tags, nil
|
||||
}
|
||||
|
||||
// filterImagesByTagRegex filters images by tag regex give in the config.
|
||||
func filterImagesByTagRegex(upstreamReferences *[]types.ImageReference, content Content, log log.Logger) error {
|
||||
refs := *upstreamReferences
|
||||
|
||||
if content.Tags == nil {
|
||||
// no need to filter anything
|
||||
return nil
|
||||
}
|
||||
|
||||
if content.Tags.Regex != nil {
|
||||
log.Info().Msgf("start filtering using the regular expression: %s", *content.Tags.Regex)
|
||||
|
||||
tagReg, err := regexp.Compile(*content.Tags.Regex)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
n := 0
|
||||
|
||||
for _, ref := range refs {
|
||||
tagged := getTagFromRef(ref, log)
|
||||
if tagged != nil {
|
||||
if tagReg.MatchString(tagged.Tag()) {
|
||||
refs[n] = ref
|
||||
n++
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
refs = refs[:n]
|
||||
}
|
||||
|
||||
*upstreamReferences = refs
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// filterImagesBySemver filters images by checking if their tags are semver compliant.
|
||||
func filterImagesBySemver(upstreamReferences *[]types.ImageReference, content Content, log log.Logger) {
|
||||
refs := *upstreamReferences
|
||||
|
||||
if content.Tags == nil {
|
||||
return
|
||||
}
|
||||
|
||||
if content.Tags.Semver != nil && *content.Tags.Semver {
|
||||
log.Info().Msg("start filtering using semver compliant rule")
|
||||
|
||||
n := 0
|
||||
|
||||
for _, ref := range refs {
|
||||
tagged := getTagFromRef(ref, log)
|
||||
if tagged != nil {
|
||||
_, ok := semver.NewVersion(tagged.Tag())
|
||||
if ok == nil {
|
||||
refs[n] = ref
|
||||
n++
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
refs = refs[:n]
|
||||
}
|
||||
|
||||
*upstreamReferences = refs
|
||||
}
|
||||
|
||||
// imagesToCopyFromRepos lists all images given a registry name and its repos.
|
||||
func imagesToCopyFromUpstream(registryName string, repos []string, sourceCtx *types.SystemContext,
|
||||
content Content, log log.Logger) ([]types.ImageReference, error) {
|
||||
var upstreamReferences []types.ImageReference
|
||||
|
||||
for _, repoName := range repos {
|
||||
repoRef, err := parseRepositoryReference(fmt.Sprintf("%s/%s", registryName, repoName))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
tags, err := getImageTags(context.Background(), sourceCtx, repoRef)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
for _, tag := range tags {
|
||||
taggedRef, err := reference.WithTag(repoRef, tag)
|
||||
if err != nil {
|
||||
log.Err(err).Msgf("error creating a reference for repository %s and tag %q", repoRef.Name(), tag)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
ref, err := docker.NewReference(taggedRef)
|
||||
if err != nil {
|
||||
log.Err(err).Msgf("cannot obtain a valid image reference for transport %q and reference %s",
|
||||
docker.Transport.Name(), taggedRef.String())
|
||||
return nil, err
|
||||
}
|
||||
|
||||
upstreamReferences = append(upstreamReferences, ref)
|
||||
}
|
||||
}
|
||||
|
||||
log.Debug().Msgf("upstream refs to be copied: %v", upstreamReferences)
|
||||
|
||||
err := filterImagesByTagRegex(&upstreamReferences, content, log)
|
||||
if err != nil {
|
||||
return []types.ImageReference{}, err
|
||||
}
|
||||
|
||||
log.Debug().Msgf("remaining upstream refs to be copied: %v", upstreamReferences)
|
||||
filterImagesBySemver(&upstreamReferences, content, log)
|
||||
|
||||
log.Debug().Msgf("remaining upstream refs to be copied: %v", upstreamReferences)
|
||||
|
||||
return upstreamReferences, nil
|
||||
}
|
||||
|
||||
func getCopyOptions(upstreamCtx, localCtx *types.SystemContext) copy.Options {
|
||||
options := copy.Options{
|
||||
DestinationCtx: localCtx,
|
||||
SourceCtx: upstreamCtx,
|
||||
// force only oci manifest MIME type
|
||||
ForceManifestMIMEType: ispec.MediaTypeImageManifest,
|
||||
}
|
||||
|
||||
return options
|
||||
}
|
||||
|
||||
func getUpstreamContext(regCfg *RegistryConfig, credentials Credentials) *types.SystemContext {
|
||||
upstreamCtx := &types.SystemContext{}
|
||||
|
||||
upstreamCtx.DockerCertPath = regCfg.CertDir
|
||||
upstreamCtx.DockerDaemonCertPath = regCfg.CertDir
|
||||
|
||||
if regCfg.TLSVerify != nil && *regCfg.TLSVerify {
|
||||
upstreamCtx.DockerDaemonInsecureSkipTLSVerify = false
|
||||
upstreamCtx.DockerInsecureSkipTLSVerify = types.NewOptionalBool(false)
|
||||
} else {
|
||||
upstreamCtx.DockerDaemonInsecureSkipTLSVerify = true
|
||||
upstreamCtx.DockerInsecureSkipTLSVerify = types.NewOptionalBool(true)
|
||||
}
|
||||
|
||||
if credentials != (Credentials{}) {
|
||||
upstreamCtx.DockerAuthConfig = &types.DockerAuthConfig{
|
||||
Username: credentials.Username,
|
||||
Password: credentials.Password,
|
||||
}
|
||||
}
|
||||
|
||||
return upstreamCtx
|
||||
}
|
||||
|
||||
func syncRegistry(regCfg RegistryConfig, log log.Logger, localRegistryName string, localCtx *types.SystemContext,
|
||||
policyCtx *signature.PolicyContext, credentials Credentials) error {
|
||||
if len(regCfg.Content) == 0 {
|
||||
log.Info().Msgf("no content found for %s, will not run periodically sync", regCfg.URL)
|
||||
return nil
|
||||
}
|
||||
|
||||
log.Info().Msgf("syncing registry: %s", regCfg.URL)
|
||||
|
||||
var err error
|
||||
|
||||
log.Debug().Msg("getting upstream context")
|
||||
|
||||
upstreamCtx := getUpstreamContext(®Cfg, credentials)
|
||||
options := getCopyOptions(upstreamCtx, localCtx)
|
||||
|
||||
retryOptions := &retry.RetryOptions{
|
||||
MaxRetry: maxRetries,
|
||||
Delay: delay,
|
||||
}
|
||||
|
||||
var catalog catalog
|
||||
|
||||
if err = retry.RetryIfNecessary(context.Background(), func() error {
|
||||
catalog, err = getUpstreamCatalog(®Cfg, credentials, log)
|
||||
return err
|
||||
}, retryOptions); err != nil {
|
||||
log.Error().Err(err).Msg("error while getting upstream catalog, retrying...")
|
||||
return err
|
||||
}
|
||||
|
||||
upstreamRegistryName := strings.Replace(strings.Replace(regCfg.URL, "http://", "", 1), "https://", "", 1)
|
||||
|
||||
log.Info().Msg("filtering repos based on sync prefixes")
|
||||
|
||||
repos := filterRepos(catalog.Repositories, regCfg.Content)
|
||||
|
||||
log.Info().Msgf("got repos: %v", repos)
|
||||
|
||||
var images []types.ImageReference
|
||||
|
||||
for contentID, repos := range repos {
|
||||
r := repos
|
||||
id := contentID
|
||||
|
||||
if err = retry.RetryIfNecessary(context.Background(), func() error {
|
||||
refs, err := imagesToCopyFromUpstream(upstreamRegistryName, r, upstreamCtx, regCfg.Content[id], log)
|
||||
images = append(images, refs...)
|
||||
return err
|
||||
}, retryOptions); err != nil {
|
||||
log.Error().Err(err).Msg("error while getting images references from upstream, retrying...")
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if len(images) == 0 {
|
||||
log.Info().Msg("no images to copy, no need to sync")
|
||||
return nil
|
||||
}
|
||||
|
||||
for _, ref := range images {
|
||||
upstreamRef := ref
|
||||
|
||||
suffix := strings.Replace(ref.DockerReference().String(), upstreamRegistryName, "", 1)
|
||||
|
||||
localRef, err := docker.Transport.ParseReference(
|
||||
fmt.Sprintf("//%s%s", localRegistryName, suffix),
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
log.Info().Msgf("copying image %s to %s", upstreamRef.DockerReference().Name(), localRef.DockerReference().Name())
|
||||
|
||||
if err = retry.RetryIfNecessary(context.Background(), func() error {
|
||||
_, err = copy.Image(context.Background(), policyCtx, localRef, upstreamRef, &options)
|
||||
return err
|
||||
}, retryOptions); err != nil {
|
||||
log.Error().Err(err).Msgf("error while copying image %s to %s",
|
||||
upstreamRef.DockerReference().Name(), localRef.DockerReference().Name())
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
log.Info().Msgf("finished syncing %s", regCfg.URL)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func getLocalContexts(serverCert, serverKey,
|
||||
caCert string, log log.Logger) (*types.SystemContext, *signature.PolicyContext, error) {
|
||||
log.Debug().Msg("getting local context")
|
||||
|
||||
var policy *signature.Policy
|
||||
|
||||
var err error
|
||||
|
||||
localCtx := &types.SystemContext{}
|
||||
|
||||
if serverCert != "" && serverKey != "" {
|
||||
certsDir, err := copyLocalCerts(serverCert, serverKey, caCert, log)
|
||||
if err != nil {
|
||||
return &types.SystemContext{}, &signature.PolicyContext{}, err
|
||||
}
|
||||
|
||||
localCtx.DockerDaemonCertPath = certsDir
|
||||
localCtx.DockerCertPath = certsDir
|
||||
|
||||
policy, err = signature.DefaultPolicy(localCtx)
|
||||
if err != nil {
|
||||
return &types.SystemContext{}, &signature.PolicyContext{}, err
|
||||
}
|
||||
} else {
|
||||
localCtx.DockerDaemonInsecureSkipTLSVerify = true
|
||||
localCtx.DockerInsecureSkipTLSVerify = types.NewOptionalBool(true)
|
||||
policy = &signature.Policy{Default: []signature.PolicyRequirement{signature.NewPRInsecureAcceptAnything()}}
|
||||
}
|
||||
|
||||
policyContext, err := signature.NewPolicyContext(policy)
|
||||
if err != nil {
|
||||
return &types.SystemContext{}, &signature.PolicyContext{}, err
|
||||
}
|
||||
|
||||
return localCtx, policyContext, nil
|
||||
}
|
||||
|
||||
func Run(cfg Config, log log.Logger, address, port, serverCert, serverKey, caCert string) error {
|
||||
localCtx, policyCtx, err := getLocalContexts(serverCert, serverKey, caCert, log)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
localRegistry := strings.Replace(fmt.Sprintf("%s:%s", address, port), "0.0.0.0", "127.0.0.1", 1)
|
||||
|
||||
var credentialsFile CredentialsFile
|
||||
|
||||
if cfg.CredentialsFile != "" {
|
||||
credentialsFile, err = getFileCredentials(cfg.CredentialsFile)
|
||||
if err != nil {
|
||||
log.Error().Err(err).Msgf("couldn't get registry credentials from %s", cfg.CredentialsFile)
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
var ticker *time.Ticker
|
||||
|
||||
for _, regCfg := range cfg.Registries {
|
||||
// schedule each registry sync
|
||||
ticker = time.NewTicker(regCfg.PollInterval)
|
||||
|
||||
upstreamRegistry := strings.Replace(strings.Replace(regCfg.URL, "http://", "", 1), "https://", "", 1)
|
||||
|
||||
go func(regCfg RegistryConfig) {
|
||||
defer os.RemoveAll(certsDir)
|
||||
// run sync first, then run on interval
|
||||
if err := syncRegistry(regCfg, log, localRegistry, localCtx, policyCtx,
|
||||
credentialsFile[upstreamRegistry]); err != nil {
|
||||
log.Err(err).Msg("sync exited with error, stopping it...")
|
||||
ticker.Stop()
|
||||
}
|
||||
|
||||
// run on intervals
|
||||
for range ticker.C {
|
||||
if err := syncRegistry(regCfg, log, localRegistry, localCtx, policyCtx,
|
||||
credentialsFile[upstreamRegistry]); err != nil {
|
||||
log.Err(err).Msg("sync exited with error, stopping it...")
|
||||
ticker.Stop()
|
||||
}
|
||||
}
|
||||
}(regCfg)
|
||||
}
|
||||
|
||||
log.Info().Msg("finished setting up sync")
|
||||
|
||||
return nil
|
||||
}
|
||||
@@ -0,0 +1,109 @@
|
||||
package sync
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/anuvu/zot/errors"
|
||||
"github.com/anuvu/zot/pkg/log"
|
||||
"github.com/containers/image/v5/docker"
|
||||
"github.com/containers/image/v5/docker/reference"
|
||||
"github.com/containers/image/v5/types"
|
||||
. "github.com/smartystreets/goconvey/convey"
|
||||
)
|
||||
|
||||
const (
|
||||
BaseURL = "http://127.0.0.1:5001"
|
||||
ServerCert = "../../../test/data/server.cert"
|
||||
ServerKey = "../../../test/data/server.key"
|
||||
CACert = "../../../test/data/ca.crt"
|
||||
|
||||
testImage = "zot-test"
|
||||
testImageTag = "0.0.1"
|
||||
|
||||
host = "127.0.0.1:45117"
|
||||
)
|
||||
|
||||
func TestSyncInternal(t *testing.T) {
|
||||
Convey("test parseRepositoryReference func", t, func() {
|
||||
repositoryReference := fmt.Sprintf("%s/%s", host, testImage)
|
||||
ref, err := parseRepositoryReference(repositoryReference)
|
||||
So(err, ShouldBeNil)
|
||||
So(ref.Name(), ShouldEqual, repositoryReference)
|
||||
|
||||
repositoryReference = fmt.Sprintf("%s/%s:tagged", host, testImage)
|
||||
_, err = parseRepositoryReference(repositoryReference)
|
||||
So(err, ShouldEqual, errors.ErrInvalidRepositoryName)
|
||||
|
||||
repositoryReference = fmt.Sprintf("http://%s/%s", host, testImage)
|
||||
_, err = parseRepositoryReference(repositoryReference)
|
||||
So(err, ShouldNotBeNil)
|
||||
|
||||
repositoryReference = fmt.Sprintf("docker://%s/%s", host, testImage)
|
||||
_, err = parseRepositoryReference(repositoryReference)
|
||||
So(err, ShouldNotBeNil)
|
||||
|
||||
_, err = getFileCredentials("/path/to/inexistent/file")
|
||||
So(err, ShouldNotBeNil)
|
||||
|
||||
f, err := ioutil.TempFile("", "sync-credentials-")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
content := []byte(`{`)
|
||||
if err := ioutil.WriteFile(f.Name(), content, 0600); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
_, err = getFileCredentials(f.Name())
|
||||
So(err, ShouldNotBeNil)
|
||||
|
||||
srcCtx := &types.SystemContext{}
|
||||
_, err = getImageTags(context.Background(), srcCtx, ref)
|
||||
|
||||
So(err, ShouldNotBeNil)
|
||||
|
||||
_, _, err = getLocalContexts("inexistent.cert", "inexistent.key", "inexistent.crt", log.NewLogger("", ""))
|
||||
So(err, ShouldNotBeNil)
|
||||
|
||||
_, _, err = getLocalContexts(ServerCert, "inexistent.key", "inexistent.crt", log.NewLogger("", ""))
|
||||
So(err, ShouldNotBeNil)
|
||||
|
||||
_, _, err = getLocalContexts(ServerCert, ServerKey, "inexistent.crt", log.NewLogger("", ""))
|
||||
So(err, ShouldNotBeNil)
|
||||
|
||||
taggedRef, err := reference.WithTag(ref, testImageTag)
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
dockerRef, err := docker.NewReference(taggedRef)
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
So(getTagFromRef(dockerRef, log.NewLogger("", "")), ShouldNotBeNil)
|
||||
|
||||
var tlsVerify bool
|
||||
updateDuration := time.Microsecond
|
||||
syncRegistryConfig := RegistryConfig{
|
||||
Content: []Content{
|
||||
{
|
||||
Prefix: testImage,
|
||||
},
|
||||
},
|
||||
URL: BaseURL,
|
||||
PollInterval: updateDuration,
|
||||
TLSVerify: &tlsVerify,
|
||||
CertDir: "",
|
||||
}
|
||||
|
||||
cfg := Config{Registries: []RegistryConfig{syncRegistryConfig}, CredentialsFile: "/invalid/path/to/file"}
|
||||
|
||||
So(Run(cfg, log.NewLogger("", ""),
|
||||
"127.0.0.1", "5000", ServerCert, ServerKey, CACert), ShouldNotBeNil)
|
||||
|
||||
_, err = getFileCredentials("/invalid/path/to/file")
|
||||
So(err, ShouldNotBeNil)
|
||||
})
|
||||
}
|
||||
File diff suppressed because it is too large
Load Diff
@@ -0,0 +1,166 @@
|
||||
package sync
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"path"
|
||||
"strings"
|
||||
|
||||
"github.com/anuvu/zot/errors"
|
||||
"github.com/anuvu/zot/pkg/log"
|
||||
"github.com/containers/image/v5/docker/reference"
|
||||
"github.com/containers/image/v5/types"
|
||||
)
|
||||
|
||||
var certsDir = fmt.Sprintf("%s/zot-certs-dir/", os.TempDir()) //nolint: gochecknoglobals
|
||||
|
||||
func copyFile(sourceFilePath, destFilePath string) error {
|
||||
destFile, err := os.Create(destFilePath)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer destFile.Close()
|
||||
|
||||
// should never get error because server certs are already handled by zot, by the time
|
||||
// it gets here
|
||||
sourceFile, _ := os.Open(sourceFilePath)
|
||||
defer sourceFile.Close()
|
||||
|
||||
if _, err := io.Copy(destFile, sourceFile); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func copyLocalCerts(serverCert, serverKey, caCert string, log log.Logger) (string, error) {
|
||||
log.Debug().Msgf("Creating certs directory: %s", certsDir)
|
||||
|
||||
err := os.Mkdir(certsDir, 0755)
|
||||
if err != nil && !os.IsExist(err) {
|
||||
return "", err
|
||||
}
|
||||
|
||||
if serverCert != "" {
|
||||
log.Debug().Msgf("Copying server cert: %s", serverCert)
|
||||
|
||||
err := copyFile(serverCert, path.Join(certsDir, "server.cert"))
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
}
|
||||
|
||||
if serverKey != "" {
|
||||
log.Debug().Msgf("Copying server key: %s", serverKey)
|
||||
|
||||
err := copyFile(serverKey, path.Join(certsDir, "server.key"))
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
}
|
||||
|
||||
if caCert != "" {
|
||||
log.Debug().Msgf("Copying CA cert: %s", caCert)
|
||||
|
||||
err := copyFile(caCert, path.Join(certsDir, "ca.crt"))
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
}
|
||||
|
||||
return certsDir, nil
|
||||
}
|
||||
|
||||
// getTagFromRef returns a tagged reference from an image reference.
|
||||
func getTagFromRef(ref types.ImageReference, log log.Logger) reference.Tagged {
|
||||
tagged, isTagged := ref.DockerReference().(reference.Tagged)
|
||||
if !isTagged {
|
||||
log.Warn().Msgf("internal server error, reference %s does not have a tag, skipping", ref.DockerReference())
|
||||
return nil
|
||||
}
|
||||
|
||||
return tagged
|
||||
}
|
||||
|
||||
// parseRepositoryReference parses input into a reference.Named, and verifies that it names a repository, not an image.
|
||||
func parseRepositoryReference(input string) (reference.Named, error) {
|
||||
ref, err := reference.ParseNormalizedNamed(input)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if !reference.IsNameOnly(ref) {
|
||||
return nil, errors.ErrInvalidRepositoryName
|
||||
}
|
||||
|
||||
return ref, nil
|
||||
}
|
||||
|
||||
// filterRepos filters repos based on prefix given in the config.
|
||||
func filterRepos(repos []string, content []Content) map[int][]string {
|
||||
// prefix: repo
|
||||
filtered := make(map[int][]string)
|
||||
|
||||
for _, repo := range repos {
|
||||
matched := false
|
||||
// we use contentID to figure out tags filtering
|
||||
for contentID, c := range content {
|
||||
// handle prefixes starting with '/'
|
||||
var prefix string
|
||||
if strings.HasPrefix(c.Prefix, "/") {
|
||||
prefix = c.Prefix[1:]
|
||||
} else {
|
||||
prefix = c.Prefix
|
||||
}
|
||||
|
||||
// split both prefix and repository and compare each part
|
||||
splittedPrefix := strings.Split(prefix, "/")
|
||||
// split at most n + 1
|
||||
splittedRepo := strings.SplitN(repo, "/", len(splittedPrefix)+1)
|
||||
|
||||
// if prefix is longer than a repository, no match
|
||||
if len(splittedPrefix) > len(splittedRepo) {
|
||||
continue
|
||||
}
|
||||
|
||||
// check if matched each part of prefix and repository
|
||||
for i := 0; i < len(splittedPrefix); i++ {
|
||||
if splittedRepo[i] == splittedPrefix[i] {
|
||||
matched = true
|
||||
} else {
|
||||
// if a part doesn't match, check next prefix
|
||||
matched = false
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
// if matched no need to check the next prefixes
|
||||
if matched {
|
||||
filtered[contentID] = append(filtered[contentID], repo)
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return filtered
|
||||
}
|
||||
|
||||
// Get sync.FileCredentials from file.
|
||||
func getFileCredentials(filepath string) (CredentialsFile, error) {
|
||||
f, err := ioutil.ReadFile(filepath)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var creds CredentialsFile
|
||||
|
||||
err = json.Unmarshal(f, &creds)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return creds, nil
|
||||
}
|
||||
Reference in New Issue
Block a user