diff --git a/pkg/extensions/sync/on_demand.go b/pkg/extensions/sync/on_demand.go index 60c44778..abe5eede 100644 --- a/pkg/extensions/sync/on_demand.go +++ b/pkg/extensions/sync/on_demand.go @@ -51,10 +51,10 @@ func (di *demandedImages) delete(key string) { } func OneImage(cfg Config, storeController storage.StoreController, - repo, tag string, isArtifact bool, log log.Logger, + repo, reference string, isArtifact bool, log log.Logger, ) error { // guard against multiple parallel requests - demandedImage := fmt.Sprintf("%s:%s", repo, tag) + demandedImage := fmt.Sprintf("%s:%s", repo, reference) // loadOrStore image-based channel imageChannel, found := demandedImgs.loadOrStoreChan(demandedImage, make(chan error)) // if value found wait on channel receive or close @@ -73,7 +73,7 @@ func OneImage(cfg Config, storeController storage.StoreController, defer demandedImgs.delete(demandedImage) defer close(imageChannel) - go syncOneImage(imageChannel, cfg, storeController, repo, tag, isArtifact, log) + go syncOneImage(imageChannel, cfg, storeController, repo, reference, isArtifact, log) err, ok := <-imageChannel if !ok { @@ -84,7 +84,7 @@ func OneImage(cfg Config, storeController storage.StoreController, } func syncOneImage(imageChannel chan error, cfg Config, storeController storage.StoreController, - localRepo, tag string, isArtifact bool, log log.Logger, + localRepo, reference string, isArtifact bool, log log.Logger, ) { var credentialsFile CredentialsFile @@ -161,21 +161,21 @@ func syncOneImage(imageChannel chan error, cfg Config, storeController storage.S options := getCopyOptions(upstreamCtx, localCtx) // demanded 'image' is a signature - if isCosignTag(tag) { + if isCosignTag(reference) { // at tis point we should already have images synced, but not their signatures. // is cosign signature - cosignManifest, err := sig.getCosignManifest(upstreamRepo, tag) + cosignManifest, err := sig.getCosignManifest(upstreamRepo, reference) if err != nil { log.Error().Str("errorType", TypeOf(err)). - Err(err).Msgf("couldn't get upstream image %s:%s:%s cosign manifest", upstreamURL, upstreamRepo, tag) + Err(err).Msgf("couldn't get upstream image %s:%s:%s cosign manifest", upstreamURL, upstreamRepo, reference) continue } - err = sig.syncCosignSignature(localRepo, upstreamRepo, tag, cosignManifest) + err = sig.syncCosignSignature(localRepo, upstreamRepo, reference, cosignManifest) if err != nil { log.Error().Str("errorType", TypeOf(err)). - Err(err).Msgf("couldn't copy upstream image cosign signature %s/%s:%s", upstreamURL, upstreamRepo, tag) + Err(err).Msgf("couldn't copy upstream image cosign signature %s/%s:%s", upstreamURL, upstreamRepo, reference) continue } @@ -185,18 +185,18 @@ func syncOneImage(imageChannel chan error, cfg Config, storeController storage.S return } else if isArtifact { // is notary signature - refs, err := sig.getNotaryRefs(upstreamRepo, tag) + refs, err := sig.getNotaryRefs(upstreamRepo, reference) if err != nil { log.Error().Str("errorType", TypeOf(err)). - Err(err).Msgf("couldn't get upstream image %s/%s:%s notary references", upstreamURL, upstreamRepo, tag) + Err(err).Msgf("couldn't get upstream image %s/%s:%s notary references", upstreamURL, upstreamRepo, reference) continue } - err = sig.syncNotarySignature(localRepo, upstreamRepo, tag, refs) + err = sig.syncNotarySignature(localRepo, upstreamRepo, reference, refs) if err != nil { log.Error().Str("errorType", TypeOf(err)). - Err(err).Msgf("couldn't copy image signature %s/%s:%s", upstreamURL, upstreamRepo, tag) + Err(err).Msgf("couldn't copy image signature %s/%s:%s", upstreamURL, upstreamRepo, reference) continue } @@ -214,13 +214,13 @@ func syncOneImage(imageChannel chan error, cfg Config, storeController storage.S copyOptions: options, } - skipped, copyErr := syncRun(regCfg, localRepo, upstreamRepo, tag, syncContextUtils, sig, log) + skipped, copyErr := syncRun(regCfg, localRepo, upstreamRepo, reference, syncContextUtils, sig, log) if skipped { continue } // key used to check if we already have a go routine syncing this image - demandedImageRef := fmt.Sprintf("%s/%s:%s", upstreamAddr, upstreamRepo, tag) + demandedImageRef := fmt.Sprintf("%s/%s:%s", upstreamAddr, upstreamRepo, reference) if copyErr != nil { // don't retry in background if maxretry is 0 @@ -249,7 +249,7 @@ func syncOneImage(imageChannel chan error, cfg Config, storeController storage.S time.Sleep(retryOptions.Delay) if err = retry.RetryIfNecessary(context.Background(), func() error { - _, err := syncRun(regCfg, localRepo, upstreamRepo, tag, syncContextUtils, sig, log) + _, err := syncRun(regCfg, localRepo, upstreamRepo, reference, syncContextUtils, sig, log) return err }, retryOptions); err != nil { @@ -257,6 +257,10 @@ func syncOneImage(imageChannel chan error, cfg Config, storeController storage.S Err(err).Msgf("sync routine: error while copying image %s", demandedImageRef) } }() + } else { + imageChannel <- nil + + return } } } @@ -265,24 +269,28 @@ func syncOneImage(imageChannel chan error, cfg Config, storeController storage.S } func syncRun(regCfg RegistryConfig, - localRepo, upstreamRepo, tag string, utils syncContextUtils, sig *signaturesCopier, + localRepo, upstreamRepo, reference string, utils syncContextUtils, sig *signaturesCopier, log log.Logger, ) (bool, error) { - upstreamImageRef, err := getImageRef(utils.upstreamAddr, upstreamRepo, tag) + upstreamImageDigest, refIsDigest := parseDigest(reference) + + upstreamImageRef, err := getImageRef(utils.upstreamAddr, upstreamRepo, reference) if err != nil { log.Error().Str("errorType", TypeOf(err)). Err(err).Msgf("error creating docker reference for repository %s/%s:%s", - utils.upstreamAddr, upstreamRepo, tag) + utils.upstreamAddr, upstreamRepo, reference) return false, err } - upstreamImageDigest, err := docker.GetDigest(context.Background(), utils.upstreamCtx, upstreamImageRef) - if err != nil { - log.Error().Str("errorType", TypeOf(err)). - Err(err).Msgf("couldn't get upstream image %s manifest", upstreamImageRef.DockerReference()) + if !refIsDigest { + upstreamImageDigest, err = docker.GetDigest(context.Background(), utils.upstreamCtx, upstreamImageRef) + if err != nil { + log.Error().Str("errorType", TypeOf(err)). + Err(err).Msgf("couldn't get upstream image %s manifest", upstreamImageRef.DockerReference()) - return false, err + return false, err + } } // get upstream signatures @@ -316,11 +324,11 @@ func syncRun(regCfg RegistryConfig, log.Error().Err(err).Msgf("couldn't get localCachePath for %s", localRepo) } - localImageRef, err := getLocalImageRef(localCachePath, localRepo, tag) + localImageRef, err := getLocalImageRef(localCachePath, localRepo, reference) if err != nil { log.Error().Str("errorType", TypeOf(err)). Err(err).Msgf("couldn't obtain a valid image reference for reference %s/%s:%s", - localCachePath, localRepo, tag) + localCachePath, localRepo, reference) return false, err } @@ -338,11 +346,11 @@ func syncRun(regCfg RegistryConfig, return false, err } - err = pushSyncedLocalImage(localRepo, tag, localCachePath, imageStore, log) + err = pushSyncedLocalImage(localRepo, reference, localCachePath, imageStore, log) if err != nil { log.Error().Str("errorType", TypeOf(err)). Err(err).Msgf("error while pushing synced cached image %s", - fmt.Sprintf("%s/%s:%s", localCachePath, localRepo, tag)) + fmt.Sprintf("%s/%s:%s", localCachePath, localRepo, reference)) return false, err } @@ -350,7 +358,7 @@ func syncRun(regCfg RegistryConfig, err = sig.syncCosignSignature(localRepo, upstreamRepo, upstreamImageDigest.String(), cosignManifest) if err != nil { log.Error().Str("errorType", TypeOf(err)). - Err(err).Msgf("couldn't copy image cosign signature %s/%s:%s", utils.upstreamAddr, upstreamRepo, tag) + Err(err).Msgf("couldn't copy image cosign signature %s/%s:%s", utils.upstreamAddr, upstreamRepo, reference) return false, err } @@ -358,12 +366,12 @@ func syncRun(regCfg RegistryConfig, err = sig.syncNotarySignature(localRepo, upstreamRepo, upstreamImageDigest.String(), refs) if err != nil { log.Error().Str("errorType", TypeOf(err)). - Err(err).Msgf("couldn't copy image notary signature %s/%s:%s", utils.upstreamAddr, upstreamRepo, tag) + Err(err).Msgf("couldn't copy image notary signature %s/%s:%s", utils.upstreamAddr, upstreamRepo, reference) return false, err } - log.Info().Msgf("successfully synced %s/%s:%s", utils.upstreamAddr, upstreamRepo, tag) + log.Info().Msgf("successfully synced %s/%s:%s", utils.upstreamAddr, upstreamRepo, reference) return false, nil } diff --git a/pkg/extensions/sync/sync.go b/pkg/extensions/sync/sync.go index f328a2b0..399005c3 100644 --- a/pkg/extensions/sync/sync.go +++ b/pkg/extensions/sync/sync.go @@ -182,7 +182,6 @@ func getCopyOptions(upstreamCtx, localCtx *types.SystemContext) copy.Options { SourceCtx: upstreamCtx, ReportWriter: io.Discard, ForceManifestMIMEType: ispec.MediaTypeImageManifest, // force only oci manifest MIME type - PreserveDigests: true, ImageListSelection: copy.CopyAllImages, } diff --git a/pkg/extensions/sync/sync_test.go b/pkg/extensions/sync/sync_test.go index c4980de0..93d61d00 100644 --- a/pkg/extensions/sync/sync_test.go +++ b/pkg/extensions/sync/sync_test.go @@ -2865,6 +2865,56 @@ func TestOnDemandRetryGoroutine(t *testing.T) { }) } +func TestOnDemandWithDigest(t *testing.T) { + Convey("Verify ondemand sync retries in background on error", t, func() { + _, srcBaseURL, _, _, _ := startUpstreamServer(t, false, false) + + regex := ".*" + semver := true + var tlsVerify bool + + syncRegistryConfig := sync.RegistryConfig{ + Content: []sync.Content{ + { + Prefix: testImage, + Tags: &sync.Tags{ + Regex: ®ex, + Semver: &semver, + }, + }, + }, + URLs: []string{srcBaseURL}, + OnDemand: true, + TLSVerify: &tlsVerify, + CertDir: "", + } + + defaultVal := true + syncConfig := &sync.Config{ + Enable: &defaultVal, + Registries: []sync.RegistryConfig{syncRegistryConfig}, + } + + dctlr, destBaseURL, destDir, destClient := startDownstreamServer(t, false, syncConfig) + defer os.RemoveAll(destDir) + + defer func() { + dctlr.Shutdown() + }() + + // get manifest digest from source + resp, err := destClient.R().Get(srcBaseURL + "/v2/" + testImage + "/manifests/" + testImageTag) + So(err, ShouldBeNil) + So(resp.StatusCode(), ShouldEqual, 200) + + digest := godigest.FromBytes(resp.Body()) + + resp, err = destClient.R().Get(destBaseURL + "/v2/" + testImage + "/manifests/" + digest.String()) + So(err, ShouldBeNil) + So(resp.StatusCode(), ShouldEqual, 200) + }) +} + func TestOnDemandRetryGoroutineErr(t *testing.T) { Convey("Verify ondemand sync retries in background on error", t, func() { regex := ".*" diff --git a/pkg/extensions/sync/utils.go b/pkg/extensions/sync/utils.go index b0d31b6e..b24a5d65 100644 --- a/pkg/extensions/sync/utils.go +++ b/pkg/extensions/sync/utils.go @@ -328,28 +328,28 @@ func getHTTPClient(regCfg *RegistryConfig, upstreamURL string, credentials Crede return client, registryURL, nil } -func pushSyncedLocalImage(localRepo, tag, localCachePath string, +func pushSyncedLocalImage(localRepo, reference, localCachePath string, imageStore storage.ImageStore, log log.Logger, ) error { - log.Info().Msgf("pushing synced local image %s/%s:%s to local registry", localCachePath, localRepo, tag) + log.Info().Msgf("pushing synced local image %s/%s:%s to local registry", localCachePath, localRepo, reference) metrics := monitoring.NewMetricsServer(false, log) cacheImageStore := local.NewImageStore(localCachePath, false, storage.DefaultGCDelay, false, false, log, metrics, nil) - manifestContent, _, mediaType, err := cacheImageStore.GetImageManifest(localRepo, tag) + manifestContent, _, mediaType, err := cacheImageStore.GetImageManifest(localRepo, reference) if err != nil { log.Error().Str("errorType", TypeOf(err)). Err(err).Str("dir", path.Join(cacheImageStore.RootDir(), localRepo)). - Msg("couldn't find index.json") + Msgf("couldn't find %s manifest", reference) return err } // is image manifest if mediaType == ispec.MediaTypeImageManifest { - if err := copyManifest(localRepo, manifestContent, tag, cacheImageStore, imageStore, log); err != nil { + if err := copyManifest(localRepo, manifestContent, reference, cacheImageStore, imageStore, log); err != nil { if errors.Is(err, zerr.ErrImageLintAnnotations) { log.Error().Str("errorType", TypeOf(err)). Err(err).Msg("couldn't upload manifest because of missing annotations") @@ -397,7 +397,7 @@ func pushSyncedLocalImage(localRepo, tag, localCachePath string, } } - _, err = imageStore.PutImageManifest(localRepo, tag, mediaType, manifestContent) + _, err = imageStore.PutImageManifest(localRepo, reference, mediaType, manifestContent) if err != nil { log.Error().Str("errorType", TypeOf(err)). Err(err).Msg("couldn't upload manifest") @@ -486,18 +486,28 @@ func StripRegistryTransport(url string) string { } // get an ImageReference given the registry, repo and tag. -func getImageRef(registryDomain, repo, tag string) (types.ImageReference, error) { +func getImageRef(registryDomain, repo, ref string) (types.ImageReference, error) { repoRef, err := parseRepositoryReference(fmt.Sprintf("%s/%s", registryDomain, repo)) if err != nil { return nil, err } - taggedRepoRef, err := reference.WithTag(repoRef, tag) - if err != nil { - return nil, err + var namedRepoRef reference.Named + + digest, ok := parseDigest(ref) + if ok { + namedRepoRef, err = reference.WithDigest(repoRef, digest) + if err != nil { + return nil, err + } + } else { + namedRepoRef, err = reference.WithTag(repoRef, ref) + if err != nil { + return nil, err + } } - imageRef, err := docker.NewReference(taggedRepoRef) + imageRef, err := docker.NewReference(namedRepoRef) if err != nil { return nil, err } @@ -506,15 +516,20 @@ func getImageRef(registryDomain, repo, tag string) (types.ImageReference, error) } // get a local ImageReference used to temporary store one synced image. -func getLocalImageRef(localCachePath, repo, tag string) (types.ImageReference, error) { +func getLocalImageRef(localCachePath, repo, reference string) (types.ImageReference, error) { if _, err := os.ReadDir(localCachePath); err != nil { return nil, err } localRepo := path.Join(localCachePath, repo) - localTaggedRepo := fmt.Sprintf("%s:%s", localRepo, tag) - localImageRef, err := layout.ParseReference(localTaggedRepo) + _, refIsDigest := parseDigest(reference) + + if !refIsDigest { + localRepo = fmt.Sprintf("%s:%s", localRepo, reference) + } + + localImageRef, err := layout.ParseReference(localRepo) if err != nil { return nil, err } @@ -579,6 +594,18 @@ func canSkipImage(repo, tag string, digest godigest.Digest, imageStore storage.I return true, nil } +// parse a reference, return its digest and if it's valid. +func parseDigest(reference string) (godigest.Digest, bool) { + var ok bool + + d, err := godigest.Parse(reference) + if err == nil { + ok = true + } + + return d, ok +} + func manifestsEqual(manifest1, manifest2 ispec.Manifest) bool { if manifest1.Config.Digest == manifest2.Config.Digest && manifest1.Config.MediaType == manifest2.Config.MediaType &&