mirror of
https://github.com/project-zot/zot.git
synced 2026-06-17 12:58:02 +08:00
fix(sync): fixed broken logic to get tags for repo (#900)
Signed-off-by: Petu Eusebiu <peusebiu@cisco.com>
This commit is contained in:
+148
-292
@@ -7,11 +7,9 @@ import (
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
"regexp"
|
||||
goSync "sync"
|
||||
"time"
|
||||
|
||||
"github.com/Masterminds/semver"
|
||||
"github.com/containers/common/pkg/retry"
|
||||
"github.com/containers/image/v5/copy"
|
||||
"github.com/containers/image/v5/docker"
|
||||
@@ -76,6 +74,12 @@ type Tags struct {
|
||||
Semver *bool
|
||||
}
|
||||
|
||||
type RepoReferences struct {
|
||||
contentID int // matched registry config content
|
||||
name string // repo name
|
||||
imageReferences []types.ImageReference // contained images(tags)
|
||||
}
|
||||
|
||||
// getUpstreamCatalog gets all repos from a registry.
|
||||
func getUpstreamCatalog(client *resty.Client, upstreamURL string, log log.Logger) (catalog, error) {
|
||||
var catalog catalog
|
||||
@@ -106,159 +110,70 @@ func getUpstreamCatalog(client *resty.Client, upstreamURL string, log log.Logger
|
||||
return catalog, nil
|
||||
}
|
||||
|
||||
// getImageTags lists all tags in a repository.
|
||||
// It returns a string slice of tags and any error encountered.
|
||||
func getImageTags(ctx context.Context, sysCtx *types.SystemContext, repoRef reference.Named) ([]string, error) {
|
||||
dockerRef, err := docker.NewReference(reference.TagNameOnly(repoRef))
|
||||
// hard to reach test case, injected error, see pkg/test/dev.go
|
||||
if err = test.Error(err); err != nil {
|
||||
return nil, err // Should never happen for a reference with tag and no digest
|
||||
}
|
||||
|
||||
tags, err := docker.GetRepositoryTags(ctx, sysCtx, dockerRef)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return tags, nil
|
||||
}
|
||||
|
||||
// filterImagesByTagRegex filters images by tag regex given in the config.
|
||||
func filterImagesByTagRegex(upstreamReferences *[]types.ImageReference, content Content, log log.Logger) error {
|
||||
refs := *upstreamReferences
|
||||
|
||||
if content.Tags == nil {
|
||||
// no need to filter anything
|
||||
return nil
|
||||
}
|
||||
|
||||
if content.Tags.Regex != nil {
|
||||
log.Info().Msgf("start filtering using the regular expression: %s", *content.Tags.Regex)
|
||||
|
||||
tagReg, err := regexp.Compile(*content.Tags.Regex)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
numTags := 0
|
||||
|
||||
for _, ref := range refs {
|
||||
tagged := getTagFromRef(ref, log)
|
||||
if tagged != nil {
|
||||
if tagReg.MatchString(tagged.Tag()) {
|
||||
refs[numTags] = ref
|
||||
numTags++
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
refs = refs[:numTags]
|
||||
}
|
||||
|
||||
*upstreamReferences = refs
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// filterImagesBySemver filters images by checking if their tags are semver compliant.
|
||||
func filterImagesBySemver(upstreamReferences *[]types.ImageReference, content Content, log log.Logger) {
|
||||
refs := *upstreamReferences
|
||||
|
||||
if content.Tags == nil {
|
||||
return
|
||||
}
|
||||
|
||||
if content.Tags.Semver != nil && *content.Tags.Semver {
|
||||
log.Info().Msg("start filtering using semver compliant rule")
|
||||
|
||||
numTags := 0
|
||||
|
||||
for _, ref := range refs {
|
||||
tagged := getTagFromRef(ref, log)
|
||||
if tagged != nil {
|
||||
_, ok := semver.NewVersion(tagged.Tag())
|
||||
if ok == nil {
|
||||
refs[numTags] = ref
|
||||
numTags++
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
refs = refs[:numTags]
|
||||
}
|
||||
|
||||
*upstreamReferences = refs
|
||||
}
|
||||
|
||||
// imagesToCopyFromRepos lists all images given a registry name and its repos.
|
||||
func imagesToCopyFromUpstream(ctx context.Context, registryName string, repos []string,
|
||||
func imagesToCopyFromUpstream(ctx context.Context, registryName string, repoName string,
|
||||
upstreamCtx *types.SystemContext, content Content, log log.Logger,
|
||||
) (map[string][]types.ImageReference, error) {
|
||||
upstreamReferences := make(map[string][]types.ImageReference)
|
||||
) ([]types.ImageReference, error) {
|
||||
imageRefs := []types.ImageReference{}
|
||||
|
||||
for _, repoName := range repos {
|
||||
repoUpstreamReferences := make([]types.ImageReference, 0)
|
||||
repoRef, err := parseRepositoryReference(fmt.Sprintf("%s/%s", registryName, repoName))
|
||||
if err != nil {
|
||||
log.Error().Str("errorType", TypeOf(err)).
|
||||
Err(err).Msgf("couldn't parse repository reference: %s", repoRef)
|
||||
|
||||
repoRef, err := parseRepositoryReference(fmt.Sprintf("%s/%s", registryName, repoName))
|
||||
if err != nil {
|
||||
log.Error().Str("errorType", TypeOf(err)).
|
||||
Err(err).Msgf("couldn't parse repository reference: %s", repoRef)
|
||||
|
||||
return nil, err
|
||||
}
|
||||
|
||||
tags, err := getImageTags(ctx, upstreamCtx, repoRef)
|
||||
if err != nil {
|
||||
log.Error().Str("errorType", TypeOf(err)).
|
||||
Err(err).Msgf("couldn't fetch tags for %s", repoRef)
|
||||
|
||||
return nil, err
|
||||
}
|
||||
|
||||
for _, tag := range tags {
|
||||
// don't copy cosign signature, containers/image doesn't support it
|
||||
// we will copy it manually later
|
||||
if isCosignTag(tag) {
|
||||
continue
|
||||
}
|
||||
|
||||
taggedRef, err := reference.WithTag(repoRef, tag)
|
||||
if err != nil {
|
||||
log.Err(err).Msgf("error creating a reference for repository %s and tag %q", repoRef.Name(), tag)
|
||||
|
||||
return nil, err
|
||||
}
|
||||
|
||||
ref, err := docker.NewReference(taggedRef)
|
||||
if err != nil {
|
||||
log.Err(err).Msgf("cannot obtain a valid image reference for transport %q and reference %s",
|
||||
docker.Transport.Name(), taggedRef.String())
|
||||
|
||||
return nil, err
|
||||
}
|
||||
|
||||
repoUpstreamReferences = append(repoUpstreamReferences, ref)
|
||||
}
|
||||
|
||||
upstreamReferences[repoName] = repoUpstreamReferences
|
||||
|
||||
log.Debug().Msgf("repo: %s - upstream refs to be copied: %v", repoName, upstreamReferences)
|
||||
|
||||
err = filterImagesByTagRegex(&repoUpstreamReferences, content, log)
|
||||
if err != nil {
|
||||
return map[string][]types.ImageReference{}, err
|
||||
}
|
||||
|
||||
log.Debug().Msgf("repo: %s - remaining upstream refs to be copied: %v", repoName, repoUpstreamReferences)
|
||||
|
||||
filterImagesBySemver(&repoUpstreamReferences, content, log)
|
||||
|
||||
log.Debug().Msgf("repo: %s - remaining upstream refs to be copied: %v", repoName, repoUpstreamReferences)
|
||||
|
||||
upstreamReferences[repoName] = repoUpstreamReferences
|
||||
return imageRefs, err
|
||||
}
|
||||
|
||||
return upstreamReferences, nil
|
||||
tags, err := getImageTags(ctx, upstreamCtx, repoRef)
|
||||
if err != nil {
|
||||
log.Error().Str("errorType", TypeOf(err)).
|
||||
Err(err).Msgf("couldn't fetch tags for %s", repoRef)
|
||||
|
||||
return imageRefs, err
|
||||
}
|
||||
|
||||
// filter based on tags rules
|
||||
if content.Tags != nil {
|
||||
if content.Tags.Regex != nil {
|
||||
tags, err = filterTagsByRegex(tags, *content.Tags.Regex, log)
|
||||
if err != nil {
|
||||
return imageRefs, err
|
||||
}
|
||||
}
|
||||
|
||||
if content.Tags.Semver != nil && *content.Tags.Semver {
|
||||
tags = filterTagsBySemver(tags, log)
|
||||
}
|
||||
}
|
||||
|
||||
log.Debug().Msgf("repo: %s - upstream tags to be copied: %v", repoName, tags)
|
||||
|
||||
for _, tag := range tags {
|
||||
// don't copy cosign signature, containers/image doesn't support it
|
||||
// we will copy it manually later
|
||||
if isCosignTag(tag) {
|
||||
continue
|
||||
}
|
||||
|
||||
taggedRef, err := reference.WithTag(repoRef, tag)
|
||||
if err != nil {
|
||||
log.Err(err).Msgf("error creating a reference for repository %s and tag %q", repoRef.Name(), tag)
|
||||
|
||||
return imageRefs, err
|
||||
}
|
||||
|
||||
ref, err := docker.NewReference(taggedRef)
|
||||
if err != nil {
|
||||
log.Err(err).Msgf("cannot obtain a valid image reference for transport %q and reference %s",
|
||||
docker.Transport.Name(), taggedRef.String())
|
||||
|
||||
return imageRefs, err
|
||||
}
|
||||
|
||||
imageRefs = append(imageRefs, ref)
|
||||
}
|
||||
|
||||
return imageRefs, nil
|
||||
}
|
||||
|
||||
func getCopyOptions(upstreamCtx, localCtx *types.SystemContext) copy.Options {
|
||||
@@ -339,69 +254,53 @@ func syncRegistry(ctx context.Context, regCfg RegistryConfig,
|
||||
|
||||
upstreamAddr := StripRegistryTransport(upstreamURL)
|
||||
|
||||
reposWithContentID := make(map[string][]struct {
|
||||
ref types.ImageReference
|
||||
content Content
|
||||
})
|
||||
reposReferences := []RepoReferences{}
|
||||
|
||||
for contentID, repos := range repos {
|
||||
r := repos
|
||||
contentID := contentID
|
||||
for _, repoName := range repos {
|
||||
var imageReferences []types.ImageReference
|
||||
|
||||
if err = retry.RetryIfNecessary(ctx, func() error {
|
||||
for _, repo := range r {
|
||||
refs, err := imagesToCopyFromUpstream(ctx, upstreamAddr, r, upstreamCtx, regCfg.Content[contentID], log)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err = retry.RetryIfNecessary(ctx, func() error {
|
||||
imageReferences, err = imagesToCopyFromUpstream(ctx, upstreamAddr, repoName, upstreamCtx,
|
||||
regCfg.Content[contentID], log)
|
||||
|
||||
for _, ref := range refs[repo] {
|
||||
reposWithContentID[repo] = append(reposWithContentID[repo], struct {
|
||||
ref types.ImageReference
|
||||
content Content
|
||||
}{
|
||||
ref: ref,
|
||||
content: regCfg.Content[contentID],
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}, retryOptions); err != nil {
|
||||
log.Error().Str("errorType", TypeOf(err)).
|
||||
Err(err).Msg("error while getting images references from upstream, retrying...")
|
||||
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
for remoteRepo, imageList := range reposWithContentID {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
default:
|
||||
break
|
||||
}
|
||||
|
||||
remoteRepoCopy := remoteRepo
|
||||
|
||||
for _, image := range imageList {
|
||||
localRepo := getRepoDestination(remoteRepo, image.content)
|
||||
|
||||
imageStore := storeController.GetImageStore(localRepo)
|
||||
|
||||
localCachePath, err := getLocalCachePath(imageStore, localRepo)
|
||||
if err != nil {
|
||||
return err
|
||||
}, retryOptions); err != nil {
|
||||
log.Error().Str("errorType", TypeOf(err)).
|
||||
Err(err).Msgf("couldn't get localCachePath for %s", remoteRepoCopy)
|
||||
Err(err).Msg("error while getting images references from upstream, retrying...")
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
defer os.RemoveAll(localCachePath)
|
||||
reposReferences = append(reposReferences, RepoReferences{
|
||||
contentID: contentID,
|
||||
name: repoName,
|
||||
imageReferences: imageReferences,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
upstreamImageRef := image.ref
|
||||
sig := newSignaturesCopier(httpClient, *registryURL, storeController, log)
|
||||
|
||||
for _, repoReference := range reposReferences {
|
||||
upstreamRepo := repoReference.name
|
||||
content := regCfg.Content[repoReference.contentID]
|
||||
|
||||
localRepo := getRepoDestination(upstreamRepo, content)
|
||||
|
||||
imageStore := storeController.GetImageStore(localRepo)
|
||||
|
||||
localCachePath, err := getLocalCachePath(imageStore, localRepo)
|
||||
if err != nil {
|
||||
log.Error().Str("errorType", TypeOf(err)).
|
||||
Err(err).Msgf("couldn't get localCachePath for %s", localRepo)
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
defer os.RemoveAll(localCachePath)
|
||||
|
||||
for _, upstreamImageRef := range repoReference.imageReferences {
|
||||
upstreamImageDigest, err := docker.GetDigest(ctx, upstreamCtx, upstreamImageRef)
|
||||
if err != nil {
|
||||
log.Error().Err(err).Msgf("couldn't get upstream image %s manifest", upstreamImageRef.DockerReference())
|
||||
@@ -409,17 +308,15 @@ func syncRegistry(ctx context.Context, regCfg RegistryConfig,
|
||||
return err
|
||||
}
|
||||
|
||||
tag := getTagFromRef(upstreamImageRef, log).Tag()
|
||||
// get upstream signatures
|
||||
cosignManifest, err := getCosignManifest(httpClient, *registryURL, remoteRepoCopy,
|
||||
upstreamImageDigest.String(), log)
|
||||
cosignManifest, err := sig.getCosignManifest(upstreamRepo, upstreamImageDigest.String())
|
||||
if err != nil && !errors.Is(err, zerr.ErrSyncSignatureNotFound) {
|
||||
log.Error().Err(err).Msgf("couldn't get upstream image %s cosign manifest", upstreamImageRef.DockerReference())
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
refs, err := getNotaryRefs(httpClient, *registryURL, remoteRepoCopy, upstreamImageDigest.String(), log)
|
||||
refs, err := sig.getNotaryRefs(upstreamRepo, upstreamImageDigest.String())
|
||||
if err != nil && !errors.Is(err, zerr.ErrSyncSignatureNotFound) {
|
||||
log.Error().Err(err).Msgf("couldn't get upstream image %s notary references", upstreamImageRef.DockerReference())
|
||||
|
||||
@@ -437,6 +334,8 @@ func syncRegistry(ctx context.Context, regCfg RegistryConfig,
|
||||
}
|
||||
}
|
||||
|
||||
tag := getTagFromRef(upstreamImageRef, log).Tag()
|
||||
|
||||
skipImage, err := canSkipImage(localRepo, tag, upstreamImageDigest.String(), imageStore, log)
|
||||
if err != nil {
|
||||
log.Error().Err(err).Msgf("couldn't check if the upstream image %s can be skipped",
|
||||
@@ -445,103 +344,60 @@ func syncRegistry(ctx context.Context, regCfg RegistryConfig,
|
||||
return err
|
||||
}
|
||||
|
||||
// sync only differences
|
||||
if skipImage {
|
||||
if !skipImage {
|
||||
// sync image
|
||||
localImageRef, err := getLocalImageRef(localCachePath, localRepo, tag)
|
||||
if err != nil {
|
||||
log.Error().Str("errorType", TypeOf(err)).
|
||||
Err(err).Msgf("couldn't obtain a valid image reference for reference %s/%s:%s",
|
||||
localCachePath, localRepo, tag)
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
log.Info().Msgf("copying image %s to %s", upstreamImageRef.DockerReference(), localCachePath)
|
||||
|
||||
if err = retry.RetryIfNecessary(ctx, func() error {
|
||||
_, err = copy.Image(ctx, policyCtx, localImageRef, upstreamImageRef, &options)
|
||||
|
||||
return err
|
||||
}, retryOptions); err != nil {
|
||||
log.Error().Str("errorType", TypeOf(err)).
|
||||
Err(err).Msgf("error while copying image %s to %s",
|
||||
upstreamImageRef.DockerReference(), localCachePath)
|
||||
|
||||
return err
|
||||
}
|
||||
// push from cache to repo
|
||||
err = pushSyncedLocalImage(localRepo, tag, localCachePath, imageStore, log)
|
||||
if err != nil {
|
||||
log.Error().Str("errorType", TypeOf(err)).
|
||||
Err(err).Msgf("error while pushing synced cached image %s",
|
||||
fmt.Sprintf("%s/%s:%s", localCachePath, localRepo, tag))
|
||||
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
log.Info().Msgf("already synced image %s, checking its signatures", upstreamImageRef.DockerReference())
|
||||
|
||||
skipNotarySig, err := canSkipNotarySignature(localRepo, tag, upstreamImageDigest.String(),
|
||||
refs, imageStore, log)
|
||||
if err != nil {
|
||||
log.Error().Err(err).Msgf("couldn't check if the upstream image %s notary signature can be skipped",
|
||||
upstreamImageRef.DockerReference())
|
||||
}
|
||||
|
||||
if !skipNotarySig {
|
||||
if err = retry.RetryIfNecessary(ctx, func() error {
|
||||
err = syncNotarySignature(httpClient, imageStore, *registryURL, localRepo, remoteRepoCopy,
|
||||
upstreamImageDigest.String(), refs, log)
|
||||
|
||||
return err
|
||||
}, retryOptions); err != nil {
|
||||
log.Error().Err(err).Msgf("couldn't copy notary signature for %s", upstreamImageRef.DockerReference())
|
||||
}
|
||||
}
|
||||
|
||||
skipCosignSig, err := canSkipCosignSignature(localRepo, tag, upstreamImageDigest.String(),
|
||||
cosignManifest, imageStore, log)
|
||||
if err != nil {
|
||||
log.Error().Err(err).Msgf("couldn't check if the upstream image %s cosign signature can be skipped",
|
||||
upstreamImageRef.DockerReference())
|
||||
}
|
||||
|
||||
if !skipCosignSig {
|
||||
if err = retry.RetryIfNecessary(ctx, func() error {
|
||||
err = syncCosignSignature(httpClient, imageStore, *registryURL, localRepo, remoteRepoCopy,
|
||||
upstreamImageDigest.String(), cosignManifest, log)
|
||||
|
||||
return err
|
||||
}, retryOptions); err != nil {
|
||||
log.Error().Err(err).Msgf("couldn't copy cosign signature for %s", upstreamImageRef.DockerReference())
|
||||
}
|
||||
}
|
||||
|
||||
continue
|
||||
}
|
||||
|
||||
localImageRef, err := getLocalImageRef(localCachePath, localRepo, tag)
|
||||
if err != nil {
|
||||
log.Error().Str("errorType", TypeOf(err)).
|
||||
Err(err).Msgf("couldn't obtain a valid image reference for reference %s/%s:%s",
|
||||
localCachePath, localRepo, tag)
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
log.Info().Msgf("copying image %s to %s", upstreamImageRef.DockerReference(), localCachePath)
|
||||
|
||||
// sync signatures
|
||||
if err = retry.RetryIfNecessary(ctx, func() error {
|
||||
_, err = copy.Image(ctx, policyCtx, localImageRef, upstreamImageRef, &options)
|
||||
err = sig.syncNotarySignature(localRepo, upstreamRepo, upstreamImageDigest.String(), refs)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return err
|
||||
}, retryOptions); err != nil {
|
||||
log.Error().Str("errorType", TypeOf(err)).
|
||||
Err(err).Msgf("error while copying image %s to %s",
|
||||
upstreamImageRef.DockerReference(), localCachePath)
|
||||
err = sig.syncCosignSignature(localRepo, upstreamRepo, upstreamImageDigest.String(), cosignManifest)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
// push from cache to repo
|
||||
err = pushSyncedLocalImage(localRepo, tag, localCachePath, imageStore, log)
|
||||
if err != nil {
|
||||
log.Error().Str("errorType", TypeOf(err)).
|
||||
Err(err).Msgf("error while pushing synced cached image %s",
|
||||
fmt.Sprintf("%s/%s:%s", localCachePath, localRepo, tag))
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
refs, err = getNotaryRefs(httpClient, *registryURL, remoteRepoCopy, upstreamImageDigest.String(), log)
|
||||
if err = retry.RetryIfNecessary(ctx, func() error {
|
||||
err = syncNotarySignature(httpClient, imageStore, *registryURL, localRepo,
|
||||
remoteRepoCopy, upstreamImageDigest.String(), refs, log)
|
||||
|
||||
return err
|
||||
return nil
|
||||
}, retryOptions); err != nil {
|
||||
log.Error().Str("errorType", TypeOf(err)).
|
||||
Err(err).Msgf("couldn't copy notary signature for %s", upstreamImageRef.DockerReference())
|
||||
}
|
||||
|
||||
cosignManifest, err = getCosignManifest(httpClient, *registryURL, remoteRepoCopy,
|
||||
upstreamImageDigest.String(), log)
|
||||
if err = retry.RetryIfNecessary(ctx, func() error {
|
||||
err = syncCosignSignature(httpClient, imageStore, *registryURL, localRepo,
|
||||
remoteRepoCopy, upstreamImageDigest.String(), cosignManifest, log)
|
||||
|
||||
return err
|
||||
}, retryOptions); err != nil {
|
||||
log.Error().Str("errorType", TypeOf(err)).
|
||||
Err(err).Msgf("couldn't copy cosign signature for %s", upstreamImageRef.DockerReference())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user