From f53dc9eb8d40c2c1b77e4d61b801def2aaefd2bf Mon Sep 17 00:00:00 2001 From: Petu Eusebiu Date: Mon, 7 Mar 2022 10:45:10 +0200 Subject: [PATCH] sync: Add a new flag to enforce syncing only signed images, closes #455 sync: When checking if a image is already synced also check for changes in upstream signatures. Signed-off-by: Petu Eusebiu --- errors/errors.go | 2 + examples/README.md | 1 + examples/config-sync.json | 3 +- go.sum | 17 + pkg/extensions/sync/on_demand.go | 234 +++--- pkg/extensions/sync/signatures.go | 352 +++++++++ pkg/extensions/sync/sync.go | 150 +++- pkg/extensions/sync/sync_internal_test.go | 125 ++-- pkg/extensions/sync/sync_test.go | 850 ++++++++++++++++------ pkg/extensions/sync/utils.go | 358 ++------- pkg/storage/s3/storage.go | 1 + 11 files changed, 1417 insertions(+), 676 deletions(-) create mode 100644 pkg/extensions/sync/signatures.go diff --git a/errors/errors.go b/errors/errors.go index 049d4bcd..9730bfa3 100644 --- a/errors/errors.go +++ b/errors/errors.go @@ -50,4 +50,6 @@ var ( ErrInjected = errors.New("test: injected failure") ErrSyncInvalidUpstreamURL = errors.New("sync: upstream url not found in sync config") ErrRegistryNoContent = errors.New("sync: could not find a Content that matches localRepo") + ErrSyncSignatureNotFound = errors.New("sync: couldn't find any upstream notary/cosign signatures") + ErrSyncSignature = errors.New("sync: couldn't get upstream notary/cosign signatures") ) diff --git a/examples/README.md b/examples/README.md index a32a40b1..cf8a35a8 100644 --- a/examples/README.md +++ b/examples/README.md @@ -390,6 +390,7 @@ Configure each registry sync: "certDir": "/home/user/certs", # use certificates at certDir path, if not specified then use the default certs dir "maxRetries": 5, # maxRetries in case of temporary errors (default: no retries) "retryDelay": "10m", # delay between retries, retry options are applied for both on demand and periodically sync and retryDelay is mandatory when using maxRetries. + "onlySigned": true, # sync only signed images (either notary or cosign) "content":[ # which content to periodically pull, also it's used for filtering ondemand images, if not set then periodically polling will not run { "prefix":"/repo1/repo", # pull image repo1/repo diff --git a/examples/config-sync.json b/examples/config-sync.json index 16658863..9789d3a7 100644 --- a/examples/config-sync.json +++ b/examples/config-sync.json @@ -22,6 +22,7 @@ "certDir": "/home/user/certs", "maxRetries": 3, "retryDelay": "5m", + "onlySigned": true, "content":[ { "prefix":"/repo1/repo", @@ -64,4 +65,4 @@ ] } } -} \ No newline at end of file +} diff --git a/go.sum b/go.sum index 44eaf13c..8a61067f 100644 --- a/go.sum +++ b/go.sum @@ -76,7 +76,10 @@ cloud.google.com/go/pubsub v1.3.1/go.mod h1:i+ucay31+CNRpDW4Lu78I4xXG+O1r/MAHgjp cloud.google.com/go/pubsub v1.17.1/go.mod h1:4qDxMr1WsM9+aQAz36ltDwCIM+R0QdlseyFjBuNvnss= cloud.google.com/go/secretmanager v1.0.0/go.mod h1:+Qkm5qxIJ5mk74xxIXA+87fseaY1JLYBcFPQoc/GQxg= cloud.google.com/go/security v1.1.1/go.mod h1:QZd0wTwNJNKnl0H4/wAFD10TSX8kI4nk8V6ie6fyc9w= +cloud.google.com/go/security v1.1.1/go.mod h1:QZd0wTwNJNKnl0H4/wAFD10TSX8kI4nk8V6ie6fyc9w= cloud.google.com/go/spanner v1.17.0/go.mod h1:+17t2ixFwRG4lWRwE+5kipDR9Ef07Jkmc8z0IbMDKUs= +cloud.google.com/go/spanner v1.17.0/go.mod h1:+17t2ixFwRG4lWRwE+5kipDR9Ef07Jkmc8z0IbMDKUs= +cloud.google.com/go/spanner v1.18.0/go.mod h1:LvAjUXPeJRGNuGpikMULjhLj/t9cRvdc+fxRoLiugXA= cloud.google.com/go/spanner v1.18.0/go.mod h1:LvAjUXPeJRGNuGpikMULjhLj/t9cRvdc+fxRoLiugXA= cloud.google.com/go/spanner v1.25.0/go.mod h1:kQUft3x355hzzaeFbObjsvkzZDgpDkesp3v75WBnI8w= cloud.google.com/go/storage v1.0.0/go.mod h1:IhtSnM/ZTZV8YYJWCY8RULGVqBDmpoyjwiyrjsg+URw= @@ -142,6 +145,8 @@ github.com/Azure/azure-sdk-for-go v61.5.0+incompatible h1:OSHSFeNm7D1InGsQrFjyN9 github.com/Azure/azure-sdk-for-go v61.5.0+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc= github.com/Azure/azure-service-bus-go v0.9.1/go.mod h1:yzBx6/BUGfjfeqbRZny9AQIbIe3AcV9WZbAdpkoXOa0= github.com/Azure/azure-service-bus-go v0.11.5/go.mod h1:MI6ge2CuQWBVq+ly456MY7XqNLJip5LO1iSFodbNLbU= +github.com/Azure/azure-service-bus-go v0.11.5/go.mod h1:MI6ge2CuQWBVq+ly456MY7XqNLJip5LO1iSFodbNLbU= +github.com/Azure/azure-storage-blob-go v0.8.0/go.mod h1:lPI3aLPpuLTeUwh1sViKXFxwl2B6teiRqI0deQUvsw0= github.com/Azure/azure-storage-blob-go v0.8.0/go.mod h1:lPI3aLPpuLTeUwh1sViKXFxwl2B6teiRqI0deQUvsw0= github.com/Azure/azure-storage-blob-go v0.14.0/go.mod h1:SMqIBi+SuiQH32bvyjngEewEeXoPfKMgWlBDaYf6fck= github.com/Azure/go-amqp v0.16.0/go.mod h1:9YJ3RhxRT1gquYnzpZO1vcYMMpAdJT+QEg6fwmw9Zlg= @@ -1370,6 +1375,8 @@ github.com/google/go-querystring v1.1.0 h1:AnCroh3fv4ZBgVIf1Iwtovgjaw/GiKJo8M8yD github.com/google/go-querystring v1.1.0/go.mod h1:Kcdr2DB4koayq7X8pmAG4sNG59So17icRSOU623lUBU= github.com/google/go-replayers/grpcreplay v0.1.0/go.mod h1:8Ig2Idjpr6gifRd6pNVggX6TC1Zw6Jx74AKp7QNH2QE= github.com/google/go-replayers/grpcreplay v1.1.0/go.mod h1:qzAvJ8/wi57zq7gWqaE6AwLM6miiXUQwP1S+I9icmhk= +github.com/google/go-replayers/grpcreplay v1.1.0/go.mod h1:qzAvJ8/wi57zq7gWqaE6AwLM6miiXUQwP1S+I9icmhk= +github.com/google/go-replayers/httpreplay v0.1.0/go.mod h1:YKZViNhiGgqdBlUbI2MwGpq4pXxNmhJLPHQ7cv2b5no= github.com/google/go-replayers/httpreplay v0.1.0/go.mod h1:YKZViNhiGgqdBlUbI2MwGpq4pXxNmhJLPHQ7cv2b5no= github.com/google/go-replayers/httpreplay v1.0.0/go.mod h1:LJhKoTwS5Wy5Ld/peq8dFFG5OfJyHEz7ft+DsTUv25M= github.com/google/gofuzz v0.0.0-20161122191042-44d81051d367/go.mod h1:HP5RmnzzSNb993RKQDq4+1A4ia9nllfqcQFTQJedwGI= @@ -2069,6 +2076,8 @@ github.com/openzipkin/zipkin-go v0.1.6/go.mod h1:QgAqvLzwWbR/WpD4A3cGpPtJrZXNIiJ github.com/openzipkin/zipkin-go v0.2.1/go.mod h1:NaW6tEwdmWMaCDZzg8sh+IBNOxHMPnhQw8ySjnjRyN4= github.com/openzipkin/zipkin-go v0.2.2/go.mod h1:NaW6tEwdmWMaCDZzg8sh+IBNOxHMPnhQw8ySjnjRyN4= github.com/openzipkin/zipkin-go v0.3.0/go.mod h1:4c3sLeE8xjNqehmF5RpAFLPLJxXscc0R4l6Zg0P1tTQ= +github.com/openzipkin/zipkin-go v0.3.0/go.mod h1:4c3sLeE8xjNqehmF5RpAFLPLJxXscc0R4l6Zg0P1tTQ= +github.com/oras-project/artifacts-spec v0.0.0-20210914235636-eecc5d95bcee/go.mod h1:Xch2aLzSwtkhbFFN6LUzTfLtukYvMMdXJ4oZ8O7BOdc= github.com/oras-project/artifacts-spec v0.0.0-20210914235636-eecc5d95bcee/go.mod h1:Xch2aLzSwtkhbFFN6LUzTfLtukYvMMdXJ4oZ8O7BOdc= github.com/oras-project/artifacts-spec v1.0.0-draft.1.1 h1:2YMUDyDH0glYA4gNG/zEg9HNVzgGX8kr/NBLR9AQkLQ= github.com/oras-project/artifacts-spec v1.0.0-draft.1.1/go.mod h1:Xch2aLzSwtkhbFFN6LUzTfLtukYvMMdXJ4oZ8O7BOdc= @@ -2257,8 +2266,11 @@ github.com/sassoftware/relic v0.0.0-20210427151427-dfb082b79b74/go.mod h1:YlB8wF github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0= github.com/sclevine/agouti v3.0.0+incompatible/go.mod h1:b4WX9W9L1sfQKXeJf1mUTLZKJ48R1S7H23Ji7oFO5Bw= github.com/sclevine/spec v1.2.0/go.mod h1:W4J29eT/Kzv7/b9IWLB055Z+qvVC9vt0Arko24q7p+U= +github.com/sclevine/spec v1.2.0/go.mod h1:W4J29eT/Kzv7/b9IWLB055Z+qvVC9vt0Arko24q7p+U= github.com/sclevine/spec v1.4.0/go.mod h1:LvpgJaFyvQzRvc1kaDs0bulYwzC70PbiYjC4QnFHkOM= github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc= +github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc= +github.com/seccomp/libseccomp-golang v0.9.1/go.mod h1:GbW5+tmTXfcxTToHLXlScSlAvWlF4P2Ca7zGrPiEpWo= github.com/seccomp/libseccomp-golang v0.9.1/go.mod h1:GbW5+tmTXfcxTToHLXlScSlAvWlF4P2Ca7zGrPiEpWo= github.com/seccomp/libseccomp-golang v0.9.2-0.20210429002308-3879420cc921/go.mod h1:JA8cRccbGaA1s33RQf7Y1+q9gHmZX1yB/z9WDN1C6fg= github.com/secure-systems-lab/go-securesystemslib v0.2.0/go.mod h1:eIjBmIP8LD2MLBL/DkQWayLiz006Q4p+hCu79rvWleY= @@ -2283,6 +2295,7 @@ github.com/shopspring/decimal v1.2.0/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFR github.com/shurcooL/go v0.0.0-20180423040247-9e1955d9fb6e/go.mod h1:TDJrrUr11Vxrven61rcy3hJMUqaf/CLWYhHNPmT14Lk= github.com/shurcooL/go-goon v0.0.0-20170922171312-37c2f522c041/go.mod h1:N5mDOmsrJOB+vfqUK+7DmDyjhSLIIBnXo9lvZJj3MWQ= github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc= +github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc= github.com/sigstore/cosign v1.6.0 h1:GBG+asPgsf2iawldL9uO8JDGbFO5yXuvaBTjdejsMzo= github.com/sigstore/cosign v1.6.0/go.mod h1:Ocd28z0Pwtd6+A8s/Vb4SbhwuWOqVdeYAW4yCGF4Ndg= github.com/sigstore/fulcio v0.1.2-0.20220114150912-86a2036f9bc7 h1:XE7A9lJ+wYhmUFBWYTaw3Ph943zHB4iBYd5R0SX0ZOA= @@ -2706,6 +2719,8 @@ go.uber.org/zap v1.20.0/go.mod h1:wjWOCqI0f2ZZrJF/UufIOkiC8ii6tm1iqIsLo76RfJw= go.uber.org/zap v1.21.0 h1:WefMeulhovoZ2sYXz7st6K0sLj7bBhpiFaud4r4zST8= go.uber.org/zap v1.21.0/go.mod h1:wjWOCqI0f2ZZrJF/UufIOkiC8ii6tm1iqIsLo76RfJw= go4.org v0.0.0-20180809161055-417644f6feb5/go.mod h1:MkTOUMDaeVYJUOUsaDXIhWPZYa1yOyC1qaOBpL57BhE= +go4.org v0.0.0-20180809161055-417644f6feb5/go.mod h1:MkTOUMDaeVYJUOUsaDXIhWPZYa1yOyC1qaOBpL57BhE= +gocloud.dev v0.19.0/go.mod h1:SmKwiR8YwIMMJvQBKLsC3fHNyMwXLw3PMDO+VVteJMI= gocloud.dev v0.19.0/go.mod h1:SmKwiR8YwIMMJvQBKLsC3fHNyMwXLw3PMDO+VVteJMI= gocloud.dev v0.24.1-0.20211119014450-028788aaaa4c/go.mod h1:EIJSlY7nvfeoWaV2GauF6es27gZfqtTVon47QFueoyE= golang.org/x/build v0.0.0-20190314133821-5284462c4bec/go.mod h1:atTaCNAy0f16Ah5aV1gMSwgiKVHwu/JncqDpuRr7lS4= @@ -3625,6 +3640,7 @@ k8s.io/component-base v0.20.1/go.mod h1:guxkoJnNoh8LNrbtiQOlyp2Y2XFCZQmrcg2n/DeY k8s.io/component-base v0.20.4/go.mod h1:t4p9EdiagbVCJKrQ1RsA5/V4rFQNDfRlevJajlGwgjI= k8s.io/component-base v0.20.6/go.mod h1:6f1MPBAeI+mvuts3sIdtpjljHWBQ2cIy38oBIWMYnrM= k8s.io/component-base v0.22.5/go.mod h1:VK3I+TjuF9eaa+Ln67dKxhGar5ynVbwnGrUiNF4MqCI= +k8s.io/component-base v0.22.5/go.mod h1:VK3I+TjuF9eaa+Ln67dKxhGar5ynVbwnGrUiNF4MqCI= k8s.io/cri-api v0.17.3/go.mod h1:X1sbHmuXhwaHs9xxYffLqJogVsnI+f6cPRcgPel7ywM= k8s.io/cri-api v0.20.1/go.mod h1:2JRbKt+BFLTjtrILYVqQK5jqhI+XNdF6UiGMgczeBCI= k8s.io/cri-api v0.20.4/go.mod h1:2JRbKt+BFLTjtrILYVqQK5jqhI+XNdF6UiGMgczeBCI= @@ -3670,6 +3686,7 @@ k8s.io/utils v0.0.0-20211116205334-6203023598ed/go.mod h1:jPW/WVKK9YHAvNhRxK0md/ k8s.io/utils v0.0.0-20220127004650-9b3446523e65 h1:ONWS0Wgdg5wRiQIAui7L/023aC9+IxrIrydY7l8llsE= k8s.io/utils v0.0.0-20220127004650-9b3446523e65/go.mod h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA= knative.dev/hack v0.0.0-20220118141833-9b2ed8471e30/go.mod h1:PHt8x8yX5Z9pPquBEfIj0X66f8iWkWfR0S/sarACJrI= +knative.dev/hack v0.0.0-20220118141833-9b2ed8471e30/go.mod h1:PHt8x8yX5Z9pPquBEfIj0X66f8iWkWfR0S/sarACJrI= knative.dev/hack/schema v0.0.0-20220224013837-e1785985d364/go.mod h1:ffjwmdcrH5vN3mPhO8RrF2KfNnbHeCE2C60A+2cv3U0= knative.dev/pkg v0.0.0-20220202132633-df430fa0dd96 h1:JU0DFa06CaUtwSkAY0b3j47ohEJLIYlpPPgNgbPHlAo= knative.dev/pkg v0.0.0-20220202132633-df430fa0dd96/go.mod h1:etVT7Tm8pSDf4RKhGk4r7j/hj3dNBpvT7bO6a6wpahs= diff --git a/pkg/extensions/sync/on_demand.go b/pkg/extensions/sync/on_demand.go index ec63fc03..378d4219 100644 --- a/pkg/extensions/sync/on_demand.go +++ b/pkg/extensions/sync/on_demand.go @@ -10,11 +10,26 @@ import ( "github.com/containers/common/pkg/retry" "github.com/containers/image/v5/copy" + "github.com/containers/image/v5/docker" + "github.com/containers/image/v5/signature" + "github.com/containers/image/v5/types" "gopkg.in/resty.v1" "zotregistry.io/zot/pkg/log" "zotregistry.io/zot/pkg/storage" ) +type syncContextUtils struct { + imageStore storage.ImageStore + policyCtx *signature.PolicyContext + localCtx *types.SystemContext + upstreamCtx *types.SystemContext + client *resty.Client + url *url.URL + upstreamAddr string + retryOptions *retry.RetryOptions + copyOptions copy.Options +} + // nolint: gochecknoglobals var demandedImgs demandedImages @@ -89,8 +104,6 @@ func syncOneImage(imageChannel chan error, cfg Config, storeController storage.S } } - var copyErr error - localCtx, policyCtx, err := getLocalContexts(log) if err != nil { imageChannel <- err @@ -139,42 +152,50 @@ func syncOneImage(imageChannel chan error, cfg Config, storeController storage.S upstreamAddr := StripRegistryTransport(upstreamURL) - httpClient, err := getHTTPClient(®Cfg, upstreamURL, credentialsFile[upstreamAddr], log) + httpClient, registryURL, err := getHTTPClient(®Cfg, upstreamURL, credentialsFile[upstreamAddr], log) if err != nil { imageChannel <- err return } + // it's an image + upstreamCtx := getUpstreamContext(®Cfg, credentialsFile[upstreamAddr]) + options := getCopyOptions(upstreamCtx, localCtx) + // demanded 'image' is a signature - if isCosignTag(tag) || isArtifact { + if isCosignTag(tag) { // at tis point we should already have images synced, but not their signatures. - regURL, err := url.Parse(upstreamURL) - if err != nil { - log.Error().Err(err).Msgf("couldn't parse registry URL: %s", upstreamURL) - - imageChannel <- err - - return - } - - // is notary signature - if isArtifact { - err = syncNotarySignature(httpClient, storeController, *regURL, remoteRepo, localRepo, tag, log) - if err != nil { - log.Error().Err(err).Msgf("couldn't copy image signature %s/%s:%s", upstreamURL, localRepo, tag) - - continue - } - - imageChannel <- nil - - return - } // is cosign signature - err = syncCosignSignature(httpClient, storeController, *regURL, remoteRepo, localRepo, tag, log) + cosignManifest, err := getCosignManifest(httpClient, *registryURL, remoteRepo, tag, log) if err != nil { - log.Error().Err(err).Msgf("couldn't copy image signature %s/%s:%s", upstreamURL, localRepo, tag) + log.Error().Err(err).Msgf("couldn't get upstream image %s:%s:%s cosign manifest", upstreamURL, remoteRepo, tag) + + continue + } + + err = syncCosignSignature(httpClient, imageStore, *registryURL, localRepo, remoteRepo, tag, cosignManifest, log) + if err != nil { + log.Error().Err(err).Msgf("couldn't copy upstream image cosign signature %s/%s:%s", upstreamURL, remoteRepo, tag) + + continue + } + + imageChannel <- nil + + return + } else if isArtifact { + // is notary signature + refs, err := getNotaryRefs(httpClient, *registryURL, remoteRepo, tag, log) + if err != nil { + log.Error().Err(err).Msgf("couldn't get upstream image %s/%s:%s notary references", upstreamURL, remoteRepo, tag) + + continue + } + + err = syncNotarySignature(httpClient, imageStore, *registryURL, localRepo, remoteRepo, tag, refs, log) + if err != nil { + log.Error().Err(err).Msgf("couldn't copy image signature %s/%s:%s", upstreamURL, remoteRepo, tag) continue } @@ -184,43 +205,35 @@ func syncOneImage(imageChannel chan error, cfg Config, storeController storage.S return } - // it's an image - upstreamCtx := getUpstreamContext(®Cfg, credentialsFile[upstreamAddr]) - options := getCopyOptions(upstreamCtx, localCtx) - - upstreamImageRef, err := getImageRef(upstreamAddr, remoteRepo, tag) - if err != nil { - log.Error().Err(err).Msgf("error creating docker reference for repository %s/%s:%s", - upstreamAddr, remoteRepo, tag) - - imageChannel <- err - - return + syncContextUtils := syncContextUtils{ + imageStore: imageStore, + policyCtx: policyCtx, + localCtx: localCtx, + upstreamCtx: upstreamCtx, + client: httpClient, + url: registryURL, + upstreamAddr: upstreamAddr, + retryOptions: retryOptions, + copyOptions: options, } - localImageRef, localCachePath, err := getLocalImageRef(imageStore, localRepo, tag) - if err != nil { - log.Error().Err(err).Msgf("couldn't obtain a valid image reference for reference %s/%s:%s", - localCachePath, localRepo, tag) - - imageChannel <- err - - return + skipped, copyErr := syncRun(regCfg, localRepo, remoteRepo, tag, syncContextUtils, log) + if skipped { + continue } - log.Info().Msgf("copying image %s to %s", upstreamImageRef.DockerReference(), localCachePath) - + // key used to check if we already have a go routine syncing this image demandedImageRef := fmt.Sprintf("%s/%s:%s", upstreamAddr, remoteRepo, tag) - _, copyErr = copy.Image(context.Background(), policyCtx, localImageRef, upstreamImageRef, &options) if copyErr != nil { - log.Error().Err(err).Msgf("error encountered while syncing on demand %s to %s", - upstreamImageRef.DockerReference(), localCachePath) + // don't retry in background if maxretry is 0 + if retryOptions.MaxRetry == 0 { + continue + } _, found := demandedImgs.loadOrStoreStr(demandedImageRef, "") - if found || retryOptions.MaxRetry == 0 { - defer os.RemoveAll(localCachePath) - log.Info().Msgf("image %s already demanded in background or sync.registries[].MaxRetries == 0", demandedImageRef) + if found { + log.Info().Msgf("image %s already demanded in background", demandedImageRef) /* we already have a go routine spawned for this image or retryOptions is not configured */ continue @@ -230,8 +243,6 @@ func syncOneImage(imageChannel chan error, cfg Config, storeController storage.S go func() { // remove image after syncing defer func() { - _ = os.RemoveAll(localCachePath) - demandedImgs.delete(demandedImageRef) log.Info().Msgf("sync routine: %s exited", demandedImageRef) }() @@ -241,52 +252,105 @@ func syncOneImage(imageChannel chan error, cfg Config, storeController storage.S time.Sleep(retryOptions.Delay) if err = retry.RetryIfNecessary(context.Background(), func() error { - _, err := copy.Image(context.Background(), policyCtx, localImageRef, upstreamImageRef, &options) + _, err := syncRun(regCfg, localRepo, remoteRepo, tag, syncContextUtils, log) return err }, retryOptions); err != nil { - log.Error().Err(err).Msgf("sync routine: error while copying image %s to %s", - demandedImageRef, localCachePath) - } else { - _ = finishSyncing(localRepo, remoteRepo, tag, localCachePath, upstreamURL, storeController, - retryOptions, httpClient, log) + log.Error().Err(err).Msgf("sync routine: error while copying image %s", demandedImageRef) } }() - } else { - err := finishSyncing(localRepo, remoteRepo, tag, localCachePath, upstreamURL, storeController, - retryOptions, httpClient, log) - - imageChannel <- err - - return } } } - imageChannel <- err + imageChannel <- nil } -// push the local image into the storage, sync signatures. -func finishSyncing(localRepo, remoteRepo, tag, localCachePath, upstreamURL string, - storeController storage.StoreController, retryOptions *retry.RetryOptions, - httpClient *resty.Client, log log.Logger) error { - err := pushSyncedLocalImage(localRepo, tag, localCachePath, storeController, log) +func syncRun(regCfg RegistryConfig, localRepo, remoteRepo, tag string, utils syncContextUtils, + log log.Logger) (bool, error) { + upstreamImageRef, err := getImageRef(utils.upstreamAddr, remoteRepo, tag) + if err != nil { + log.Error().Err(err).Msgf("error creating docker reference for repository %s/%s:%s", + utils.upstreamAddr, remoteRepo, tag) + + return false, err + } + + upstreamImageDigest, err := docker.GetDigest(context.Background(), utils.upstreamCtx, upstreamImageRef) + if err != nil { + log.Error().Err(err).Msgf("couldn't get upstream image %s manifest", upstreamImageRef.DockerReference()) + + return false, err + } + + // get upstream signatures + cosignManifest, err := getCosignManifest(utils.client, *utils.url, remoteRepo, + upstreamImageDigest.String(), log) + if err != nil { + log.Error().Err(err).Msgf("couldn't get upstream image %s cosign manifest", upstreamImageRef.DockerReference()) + } + + refs, err := getNotaryRefs(utils.client, *utils.url, remoteRepo, upstreamImageDigest.String(), log) + if err != nil { + log.Error().Err(err).Msgf("couldn't get upstream image %s notary references", upstreamImageRef.DockerReference()) + } + + // check if upstream image is signed + if cosignManifest == nil && len(refs.References) == 0 { + // upstream image not signed + if regCfg.OnlySigned != nil && *regCfg.OnlySigned { + // skip unsigned images + log.Info().Msgf("skipping image without signature %s", upstreamImageRef.DockerReference()) + + return true, nil + } + } + + localImageRef, localCachePath, err := getLocalImageRef(utils.imageStore, localRepo, tag) + if err != nil { + log.Error().Err(err).Msgf("couldn't obtain a valid image reference for reference %s/%s:%s", + localCachePath, localRepo, tag) + + return false, err + } + + defer os.RemoveAll(localCachePath) + + log.Info().Msgf("copying image %s to %s", upstreamImageRef.DockerReference(), localCachePath) + + _, err = copy.Image(context.Background(), utils.policyCtx, localImageRef, upstreamImageRef, &utils.copyOptions) + if err != nil { + log.Error().Err(err).Msgf("error encountered while syncing on demand %s to %s", + upstreamImageRef.DockerReference(), localCachePath) + + return false, err + } + + err = pushSyncedLocalImage(localRepo, tag, localCachePath, utils.imageStore, log) if err != nil { log.Error().Err(err).Msgf("error while pushing synced cached image %s", fmt.Sprintf("%s/%s:%s", localCachePath, localRepo, tag)) - return err + return false, err } - if err = retry.RetryIfNecessary(context.Background(), func() error { - err = syncSignatures(httpClient, storeController, upstreamURL, remoteRepo, localRepo, tag, log) + err = syncCosignSignature(utils.client, utils.imageStore, *utils.url, localRepo, remoteRepo, + upstreamImageDigest.String(), cosignManifest, log) + if err != nil { + log.Error().Err(err).Msgf("couldn't copy image cosign signature %s/%s:%s", utils.upstreamAddr, remoteRepo, tag) - return err - }, retryOptions); err != nil { - log.Error().Err(err).Msgf("couldn't copy image signature for %s/%s:%s", upstreamURL, remoteRepo, tag) + return false, err } - log.Info().Msgf("successfully synced %s/%s:%s", upstreamURL, remoteRepo, tag) + err = syncNotarySignature(utils.client, utils.imageStore, *utils.url, localRepo, remoteRepo, + upstreamImageDigest.String(), refs, log) + if err != nil { + log.Error().Err(err).Msgf("couldn't copy image notary signature %s/%s:%s", utils.upstreamAddr, remoteRepo, tag) - return nil + return false, err + } + + log.Info().Msgf("successfully synced %s/%s:%s", utils.upstreamAddr, remoteRepo, tag) + + return false, nil } diff --git a/pkg/extensions/sync/signatures.go b/pkg/extensions/sync/signatures.go new file mode 100644 index 00000000..1742635e --- /dev/null +++ b/pkg/extensions/sync/signatures.go @@ -0,0 +1,352 @@ +package sync + +import ( + "encoding/json" + "errors" + "net/http" + "net/url" + "path" + "strings" + + notreg "github.com/notaryproject/notation/pkg/registry" + ispec "github.com/opencontainers/image-spec/specs-go/v1" + artifactspec "github.com/oras-project/artifacts-spec/specs-go/v1" + "github.com/sigstore/cosign/pkg/cosign" + "gopkg.in/resty.v1" + zerr "zotregistry.io/zot/errors" + "zotregistry.io/zot/pkg/log" + "zotregistry.io/zot/pkg/storage" +) + +func getCosignManifest(client *resty.Client, regURL url.URL, repo, digest string, + log log.Logger) (*ispec.Manifest, error) { + var m ispec.Manifest + + cosignTag := getCosignTagFromImageDigest(digest) + + getCosignManifestURL := regURL + + getCosignManifestURL.Path = path.Join(getCosignManifestURL.Path, "v2", repo, "manifests", cosignTag) + + getCosignManifestURL.RawQuery = getCosignManifestURL.Query().Encode() + + resp, err := client.R().Get(getCosignManifestURL.String()) + if err != nil { + log.Error().Err(err).Str("url", getCosignManifestURL.String()). + Msgf("couldn't get cosign manifest: %s", cosignTag) + + return nil, err + } + + if resp.StatusCode() == http.StatusNotFound { + log.Info().Msgf("couldn't find any cosign signature from %s, status code: %d skipping", + getCosignManifestURL.String(), resp.StatusCode()) + + return nil, zerr.ErrSyncSignatureNotFound + } else if resp.IsError() { + log.Error().Err(zerr.ErrSyncSignature).Msgf("couldn't get cosign signature from %s, status code: %d skipping", + getCosignManifestURL.String(), resp.StatusCode()) + + return nil, zerr.ErrSyncSignature + } + + err = json.Unmarshal(resp.Body(), &m) + if err != nil { + log.Error().Err(err).Str("url", getCosignManifestURL.String()). + Msgf("couldn't unmarshal cosign manifest %s", cosignTag) + + return nil, err + } + + return &m, nil +} + +func getNotaryRefs(client *resty.Client, regURL url.URL, repo, digest string, log log.Logger) (ReferenceList, error) { + var referrers ReferenceList + + getReferrersURL := regURL + + // based on manifest digest get referrers + getReferrersURL.Path = path.Join(getReferrersURL.Path, "oras/artifacts/v1/", repo, "manifests", digest, "referrers") + getReferrersURL.RawQuery = getReferrersURL.Query().Encode() + + resp, err := client.R(). + SetHeader("Content-Type", "application/json"). + SetQueryParam("artifactType", notreg.ArtifactTypeNotation). + Get(getReferrersURL.String()) + if err != nil { + log.Error().Err(err).Str("url", getReferrersURL.String()).Msg("couldn't get referrers") + + return referrers, err + } + + if resp.StatusCode() == http.StatusNotFound || resp.StatusCode() == http.StatusBadRequest { + log.Info().Msgf("couldn't find any notary signature from %s, status code: %d, skipping", + getReferrersURL.String(), resp.StatusCode()) + + return ReferenceList{}, zerr.ErrSyncSignatureNotFound + } else if resp.IsError() { + log.Error().Err(zerr.ErrSyncSignature).Msgf("couldn't get notary signature from %s, status code: %d skipping", + getReferrersURL.String(), resp.StatusCode()) + + return ReferenceList{}, zerr.ErrSyncSignature + } + + err = json.Unmarshal(resp.Body(), &referrers) + if err != nil { + log.Error().Err(err).Str("url", getReferrersURL.String()). + Msgf("couldn't unmarshal notary signature") + + return referrers, err + } + + return referrers, nil +} + +func syncCosignSignature(client *resty.Client, imageStore storage.ImageStore, + regURL url.URL, localRepo, remoteRepo, digest string, cosignManifest *ispec.Manifest, log log.Logger, +) error { + cosignTag := getCosignTagFromImageDigest(digest) + + // if no manifest found + if cosignManifest == nil { + return nil + } + + log.Info().Msg("syncing cosign signatures") + + for _, blob := range cosignManifest.Layers { + // get blob + getBlobURL := regURL + getBlobURL.Path = path.Join(getBlobURL.Path, "v2", remoteRepo, "blobs", blob.Digest.String()) + getBlobURL.RawQuery = getBlobURL.Query().Encode() + + resp, err := client.R().SetDoNotParseResponse(true).Get(getBlobURL.String()) + if err != nil { + log.Error().Err(err).Msgf("couldn't get cosign blob: %s", blob.Digest.String()) + + return err + } + + if resp.IsError() { + log.Info().Msgf("couldn't find cosign blob from %s, status code: %d", getBlobURL.String(), resp.StatusCode()) + + return zerr.ErrSyncSignature + } + + defer resp.RawBody().Close() + + // push blob + _, _, err = imageStore.FullBlobUpload(localRepo, resp.RawBody(), blob.Digest.String()) + if err != nil { + log.Error().Err(err).Msg("couldn't upload cosign blob") + + return err + } + } + + // get config blob + getBlobURL := regURL + getBlobURL.Path = path.Join(getBlobURL.Path, "v2", remoteRepo, "blobs", cosignManifest.Config.Digest.String()) + getBlobURL.RawQuery = getBlobURL.Query().Encode() + + resp, err := client.R().SetDoNotParseResponse(true).Get(getBlobURL.String()) + if err != nil { + log.Error().Err(err).Msgf("couldn't get cosign config blob: %s", getBlobURL.String()) + + return err + } + + if resp.IsError() { + log.Info().Msgf("couldn't find cosign config blob from %s, status code: %d", getBlobURL.String(), resp.StatusCode()) + + return zerr.ErrSyncSignature + } + + defer resp.RawBody().Close() + + // push config blob + _, _, err = imageStore.FullBlobUpload(localRepo, resp.RawBody(), cosignManifest.Config.Digest.String()) + if err != nil { + log.Error().Err(err).Msg("couldn't upload cosign config blob") + + return err + } + + cosignManifestBuf, err := json.Marshal(cosignManifest) + if err != nil { + log.Error().Err(err).Msg("couldn't marshal cosign manifest") + } + + // push manifest + _, err = imageStore.PutImageManifest(localRepo, cosignTag, ispec.MediaTypeImageManifest, cosignManifestBuf) + if err != nil { + log.Error().Err(err).Msg("couldn't upload cosign manifest") + + return err + } + + log.Info().Msgf("successfully synced cosign signature for repo %s digest %s", localRepo, digest) + + return nil +} + +func syncNotarySignature(client *resty.Client, imageStore storage.ImageStore, + regURL url.URL, localRepo, remoteRepo, digest string, referrers ReferenceList, log log.Logger) error { + if len(referrers.References) == 0 { + return nil + } + + log.Info().Msg("syncing notary signatures") + + for _, ref := range referrers.References { + // get referrer manifest + getRefManifestURL := regURL + getRefManifestURL.Path = path.Join(getRefManifestURL.Path, "v2", remoteRepo, "manifests", ref.Digest.String()) + getRefManifestURL.RawQuery = getRefManifestURL.Query().Encode() + + resp, err := client.R(). + Get(getRefManifestURL.String()) + if err != nil { + log.Error().Err(err).Msgf("couldn't get notary manifest: %s", getRefManifestURL.String()) + + return err + } + + // read manifest + var m artifactspec.Manifest + + err = json.Unmarshal(resp.Body(), &m) + if err != nil { + log.Error().Err(err).Msgf("couldn't unmarshal notary manifest: %s", getRefManifestURL.String()) + + return err + } + + for _, blob := range m.Blobs { + getBlobURL := regURL + getBlobURL.Path = path.Join(getBlobURL.Path, "v2", remoteRepo, "blobs", blob.Digest.String()) + getBlobURL.RawQuery = getBlobURL.Query().Encode() + + resp, err := client.R().SetDoNotParseResponse(true).Get(getBlobURL.String()) + if err != nil { + log.Error().Err(err).Msgf("couldn't get notary blob: %s", getBlobURL.String()) + + return err + } + + defer resp.RawBody().Close() + + if resp.IsError() { + log.Info().Msgf("couldn't find notary blob from %s, status code: %d", + getBlobURL.String(), resp.StatusCode()) + + return zerr.ErrSyncSignature + } + + _, _, err = imageStore.FullBlobUpload(localRepo, resp.RawBody(), blob.Digest.String()) + if err != nil { + log.Error().Err(err).Msg("couldn't upload notary sig blob") + + return err + } + } + + _, err = imageStore.PutImageManifest(localRepo, ref.Digest.String(), + artifactspec.MediaTypeArtifactManifest, resp.Body()) + if err != nil { + log.Error().Err(err).Msg("couldn't upload notary sig manifest") + + return err + } + } + + log.Info().Msgf("successfully synced notary signature for repo %s digest %s", localRepo, digest) + + return nil +} + +func canSkipNotarySignature(repo, tag, digest string, refs ReferenceList, imageStore storage.ImageStore, + log log.Logger) (bool, error) { + // check notary signature already synced + if len(refs.References) > 0 { + localRefs, err := imageStore.GetReferrers(repo, digest, notreg.ArtifactTypeNotation) + if err != nil { + if errors.Is(err, zerr.ErrManifestNotFound) { + return false, nil + } + + log.Error().Err(err).Msgf("couldn't get local notary signature %s:%s manifest", repo, tag) + + return false, err + } + + if !artifactDescriptorsEqual(localRefs, refs.References) { + log.Info().Msgf("upstream notary signatures %s:%s changed, syncing again", repo, tag) + + return false, nil + } + } + + log.Info().Msgf("skipping notary signature %s:%s, already synced", repo, tag) + + return true, nil +} + +func canSkipCosignSignature(repo, tag, digest string, cosignManifest *ispec.Manifest, imageStore storage.ImageStore, + log log.Logger) (bool, error) { + // check cosign signature already synced + if cosignManifest != nil { + var localCosignManifest ispec.Manifest + + /* we need to use tag (cosign format: sha256-$IMAGE_TAG.sig) instead of digest to get local cosign manifest + because of an issue where cosign digests differs between upstream and downstream */ + cosignManifestTag := getCosignTagFromImageDigest(digest) + + localCosignManifestBuf, _, _, err := imageStore.GetImageManifest(repo, cosignManifestTag) + if err != nil { + if errors.Is(err, zerr.ErrManifestNotFound) { + return false, nil + } + + log.Error().Err(err).Msgf("couldn't get local cosign %s:%s manifest", repo, tag) + + return false, err + } + + err = json.Unmarshal(localCosignManifestBuf, &localCosignManifest) + if err != nil { + log.Error().Err(err).Msgf("couldn't unmarshal local cosign signature %s:%s manifest", repo, tag) + + return false, err + } + + if !manifestsEqual(localCosignManifest, *cosignManifest) { + log.Info().Msgf("upstream cosign signatures %s:%s changed, syncing again", repo, tag) + + return false, nil + } + } + + log.Info().Msgf("skipping cosign signature %s:%s, already synced", repo, tag) + + return true, nil +} + +// sync feature will try to pull cosign signature because for sync cosign signature is just an image +// this function will check if tag is a cosign tag. +func isCosignTag(tag string) bool { + if strings.HasPrefix(tag, "sha256-") && strings.HasSuffix(tag, cosign.SignatureTagSuffix) { + return true + } + + return false +} + +func getCosignTagFromImageDigest(digest string) string { + if !isCosignTag(digest) { + return strings.Replace(digest, ":", "-", 1) + cosign.SignatureTagSuffix + } + + return digest +} diff --git a/pkg/extensions/sync/sync.go b/pkg/extensions/sync/sync.go index 930cae1a..1c24f0fa 100644 --- a/pkg/extensions/sync/sync.go +++ b/pkg/extensions/sync/sync.go @@ -3,6 +3,7 @@ package sync import ( "context" "encoding/json" + "errors" "fmt" "io" "os" @@ -19,7 +20,7 @@ import ( "github.com/containers/image/v5/types" ispec "github.com/opencontainers/image-spec/specs-go/v1" "gopkg.in/resty.v1" - "zotregistry.io/zot/errors" + zerr "zotregistry.io/zot/errors" "zotregistry.io/zot/pkg/log" "zotregistry.io/zot/pkg/storage" "zotregistry.io/zot/pkg/test" @@ -57,6 +58,7 @@ type RegistryConfig struct { CertDir string MaxRetries *int RetryDelay *time.Duration + OnlySigned *bool } type Content struct { @@ -88,7 +90,7 @@ func getUpstreamCatalog(client *resty.Client, upstreamURL string, log log.Logger log.Error().Msgf("couldn't query %s, status code: %d, body: %s", registryCatalogURL, resp.StatusCode(), resp.Body()) - return c, errors.ErrSyncMissingCatalog + return c, zerr.ErrSyncMissingCatalog } err = json.Unmarshal(resp.Body(), &c) @@ -283,7 +285,8 @@ func getUpstreamContext(regCfg *RegistryConfig, credentials Credentials) *types. func syncRegistry(ctx context.Context, regCfg RegistryConfig, upstreamURL string, storeController storage.StoreController, localCtx *types.SystemContext, - policyCtx *signature.PolicyContext, credentials Credentials, log log.Logger) error { + policyCtx *signature.PolicyContext, credentials Credentials, + retryOptions *retry.RetryOptions, log log.Logger) error { log.Info().Msgf("syncing registry: %s", upstreamURL) var err error @@ -293,22 +296,13 @@ func syncRegistry(ctx context.Context, regCfg RegistryConfig, upstreamURL string upstreamCtx := getUpstreamContext(®Cfg, credentials) options := getCopyOptions(upstreamCtx, localCtx) - retryOptions := &retry.RetryOptions{} - - if regCfg.MaxRetries != nil { - retryOptions.MaxRetry = *regCfg.MaxRetries - if regCfg.RetryDelay != nil { - retryOptions.Delay = *regCfg.RetryDelay - } - } - - var catalog catalog - - httpClient, err := getHTTPClient(®Cfg, upstreamURL, credentials, log) + httpClient, registryURL, err := getHTTPClient(®Cfg, upstreamURL, credentials, log) if err != nil { return err } + var catalog catalog + if err = retry.RetryIfNecessary(ctx, func() error { catalog, err = getUpstreamCatalog(httpClient, upstreamURL, log) @@ -356,28 +350,105 @@ func syncRegistry(ctx context.Context, regCfg RegistryConfig, upstreamURL string } } - if len(images) == 0 { - log.Info().Msg("no images to copy, no need to sync") + for _, image := range images { + select { + case <-ctx.Done(): + return ctx.Err() + default: + break + } - return nil - } - - for _, ref := range images { - upstreamImageRef := ref.ref + upstreamImageRef := image.ref remoteRepo := getRepoFromRef(upstreamImageRef, upstreamAddr) - localRepo := getRepoDestination(remoteRepo, ref.content) + localRepo := getRepoDestination(remoteRepo, image.content) + + upstreamImageDigest, err := docker.GetDigest(ctx, upstreamCtx, upstreamImageRef) + if err != nil { + log.Error().Err(err).Msgf("couldn't get upstream image %s manifest", upstreamImageRef.DockerReference()) + + return err + } + tag := getTagFromRef(upstreamImageRef, log).Tag() imageStore := storeController.GetImageStore(localRepo) - canBeSkipped, err := canSkipImage(ctx, localRepo, tag, upstreamImageRef, imageStore, upstreamCtx, log) + // get upstream signatures + cosignManifest, err := getCosignManifest(httpClient, *registryURL, remoteRepo, + upstreamImageDigest.String(), log) + if err != nil && !errors.Is(err, zerr.ErrSyncSignatureNotFound) { + log.Error().Err(err).Msgf("couldn't get upstream image %s cosign manifest", upstreamImageRef.DockerReference()) + + return err + } + + refs, err := getNotaryRefs(httpClient, *registryURL, remoteRepo, upstreamImageDigest.String(), log) + if err != nil && !errors.Is(err, zerr.ErrSyncSignatureNotFound) { + log.Error().Err(err).Msgf("couldn't get upstream image %s notary references", upstreamImageRef.DockerReference()) + + return err + } + + // check if upstream image is signed + if cosignManifest == nil && len(refs.References) == 0 { + // upstream image not signed + if regCfg.OnlySigned != nil && *regCfg.OnlySigned { + // skip unsigned images + log.Info().Msgf("skipping image without signature %s", upstreamImageRef.DockerReference()) + + continue + } + } + + skipImage, err := canSkipImage(localRepo, tag, upstreamImageDigest.String(), imageStore, log) if err != nil { log.Error().Err(err).Msgf("couldn't check if the upstream image %s can be skipped", upstreamImageRef.DockerReference()) + + return err } - if canBeSkipped { + // sync only differences + if skipImage { + log.Info().Msgf("already synced image %s, checking its signatures", upstreamImageRef.DockerReference()) + + skipNotarySig, err := canSkipNotarySignature(localRepo, tag, upstreamImageDigest.String(), + refs, imageStore, log) + if err != nil { + log.Error().Err(err).Msgf("couldn't check if the upstream image %s notary signature can be skipped", + upstreamImageRef.DockerReference()) + } + + if !skipNotarySig { + if err = retry.RetryIfNecessary(ctx, func() error { + err = syncNotarySignature(httpClient, imageStore, *registryURL, localRepo, remoteRepo, + upstreamImageDigest.String(), refs, log) + + return err + }, retryOptions); err != nil { + log.Error().Err(err).Msgf("couldn't copy notary signature for %s", upstreamImageRef.DockerReference()) + } + } + + skipCosignSig, err := canSkipCosignSignature(localRepo, tag, upstreamImageDigest.String(), + cosignManifest, imageStore, log) + if err != nil { + log.Error().Err(err).Msgf("couldn't check if the upstream image %s cosign signature can be skipped", + upstreamImageRef.DockerReference()) + } + + if !skipCosignSig { + if err = retry.RetryIfNecessary(ctx, func() error { + err = syncCosignSignature(httpClient, imageStore, *registryURL, localRepo, remoteRepo, + upstreamImageDigest.String(), cosignManifest, log) + + return err + }, retryOptions); err != nil { + log.Error().Err(err).Msgf("couldn't copy cosign signature for %s", upstreamImageRef.DockerReference()) + } + } + continue } @@ -404,7 +475,7 @@ func syncRegistry(ctx context.Context, regCfg RegistryConfig, upstreamURL string return err } - err = pushSyncedLocalImage(localRepo, tag, localCachePath, storeController, log) + err = pushSyncedLocalImage(localRepo, tag, localCachePath, imageStore, log) if err != nil { log.Error().Err(err).Msgf("error while pushing synced cached image %s", fmt.Sprintf("%s/%s:%s", localCachePath, localRepo, tag)) @@ -413,11 +484,21 @@ func syncRegistry(ctx context.Context, regCfg RegistryConfig, upstreamURL string } if err = retry.RetryIfNecessary(ctx, func() error { - err = syncSignatures(httpClient, storeController, upstreamURL, remoteRepo, localRepo, tag, log) + err = syncNotarySignature(httpClient, imageStore, *registryURL, localRepo, remoteRepo, upstreamImageDigest.String(), + refs, log) return err }, retryOptions); err != nil { - log.Error().Err(err).Msgf("couldn't copy image signature %s", upstreamImageRef.DockerReference()) + log.Error().Err(err).Msgf("couldn't copy notary signature for %s", upstreamImageRef.DockerReference()) + } + + if err = retry.RetryIfNecessary(ctx, func() error { + err = syncCosignSignature(httpClient, imageStore, *registryURL, localRepo, remoteRepo, upstreamImageDigest.String(), + cosignManifest, log) + + return err + }, retryOptions); err != nil { + log.Error().Err(err).Msgf("couldn't copy cosign signature for %s", upstreamImageRef.DockerReference()) } } @@ -489,7 +570,16 @@ func Run(ctx context.Context, cfg Config, storeController storage.StoreControlle ticker := time.NewTicker(regCfg.PollInterval) // fork a new zerolog child to avoid data race - tlogger := log.Logger{Logger: logger.With().Caller().Timestamp().Logger()} + tlogger := log.Logger{Logger: logger.Logger} + + retryOptions := &retry.RetryOptions{} + + if regCfg.MaxRetries != nil { + retryOptions.MaxRetry = *regCfg.MaxRetries + if regCfg.RetryDelay != nil { + retryOptions.Delay = *regCfg.RetryDelay + } + } // schedule each registry sync go func(ctx context.Context, regCfg RegistryConfig, logger log.Logger) { @@ -501,7 +591,7 @@ func Run(ctx context.Context, cfg Config, storeController storage.StoreControlle upstreamAddr := StripRegistryTransport(upstreamURL) // first try syncing main registry if err := syncRegistry(ctx, regCfg, upstreamURL, storeController, localCtx, policyCtx, - credentialsFile[upstreamAddr], logger); err != nil { + credentialsFile[upstreamAddr], retryOptions, logger); err != nil { logger.Error().Err(err).Str("registry", upstreamURL). Msg("sync exited with error, falling back to auxiliary registries if any") } else { diff --git a/pkg/extensions/sync/sync_internal_test.go b/pkg/extensions/sync/sync_internal_test.go index e9c79375..a36b784e 100644 --- a/pkg/extensions/sync/sync_internal_test.go +++ b/pkg/extensions/sync/sync_internal_test.go @@ -17,6 +17,7 @@ import ( "github.com/containers/image/v5/types" godigest "github.com/opencontainers/go-digest" ispec "github.com/opencontainers/image-spec/specs-go/v1" + artifactspec "github.com/oras-project/artifacts-spec/specs-go/v1" "github.com/rs/zerolog" . "github.com/smartystreets/goconvey/convey" "gopkg.in/resty.v1" @@ -190,9 +191,10 @@ func TestSyncInternal(t *testing.T) { port := test.GetFreePort() baseURL := test.GetBaseURL(port) - httpClient, err := getHTTPClient(&syncRegistryConfig, baseURL, Credentials{}, log.NewLogger("debug", "")) + httpClient, registryURL, err := getHTTPClient(&syncRegistryConfig, baseURL, Credentials{}, log.NewLogger("debug", "")) So(err, ShouldNotBeNil) So(httpClient, ShouldBeNil) + So(registryURL, ShouldBeNil) // _, err = getUpstreamCatalog(httpClient, baseURL, log.NewLogger("debug", "")) // So(err, ShouldNotBeNil) }) @@ -222,21 +224,24 @@ func TestSyncInternal(t *testing.T) { CertDir: badCertsDir, } - httpClient, err := getHTTPClient(&syncRegistryConfig, baseURL, Credentials{}, log.NewLogger("debug", "")) + httpClient, _, err := getHTTPClient(&syncRegistryConfig, baseURL, Credentials{}, log.NewLogger("debug", "")) So(err, ShouldNotBeNil) So(httpClient, ShouldBeNil) syncRegistryConfig.CertDir = "/path/to/invalid/cert" - httpClient, err = getHTTPClient(&syncRegistryConfig, baseURL, Credentials{}, log.NewLogger("debug", "")) + + httpClient, _, err = getHTTPClient(&syncRegistryConfig, baseURL, Credentials{}, log.NewLogger("debug", "")) So(err, ShouldNotBeNil) So(httpClient, ShouldBeNil) syncRegistryConfig.CertDir = "" syncRegistryConfig.URLs = []string{baseSecureURL} - httpClient, err = getHTTPClient(&syncRegistryConfig, baseSecureURL, Credentials{}, log.NewLogger("debug", "")) + httpClient, registryURL, err := getHTTPClient(&syncRegistryConfig, baseSecureURL, + Credentials{}, log.NewLogger("debug", "")) So(err, ShouldBeNil) So(httpClient, ShouldNotBeNil) + So(registryURL.String(), ShouldEqual, baseSecureURL) _, err = getUpstreamCatalog(httpClient, baseURL, log.NewLogger("debug", "")) So(err, ShouldNotBeNil) @@ -245,7 +250,12 @@ func TestSyncInternal(t *testing.T) { So(err, ShouldNotBeNil) syncRegistryConfig.URLs = []string{test.BaseURL} - httpClient, err = getHTTPClient(&syncRegistryConfig, baseSecureURL, Credentials{}, log.NewLogger("debug", "")) + httpClient, _, err = getHTTPClient(&syncRegistryConfig, baseSecureURL, Credentials{}, log.NewLogger("debug", "")) + So(err, ShouldNotBeNil) + So(httpClient, ShouldBeNil) + + syncRegistryConfig.URLs = []string{"%"} + httpClient, _, err = getHTTPClient(&syncRegistryConfig, "%", Credentials{}, log.NewLogger("debug", "")) So(err, ShouldNotBeNil) So(httpClient, ShouldBeNil) }) @@ -263,23 +273,37 @@ func TestSyncInternal(t *testing.T) { So(err, ShouldNotBeNil) }) - // Convey("Test OneImage() skips cosign signatures", t, func() { - // err := OneImage(Config{}, storage.StoreController{}, "repo", "sha256-.sig", log.NewLogger("", "")) - // So(err, ShouldBeNil) - // }) - - Convey("Test syncSignatures()", t, func() { + Convey("Test signatures", t, func() { log := log.NewLogger("debug", "") - err := syncSignatures(resty.New(), storage.StoreController{}, "%", "repo", "repo", "tag", log) + + client := resty.New() + + regURL, err := url.Parse("http://zot") + So(err, ShouldBeNil) + So(regURL, ShouldNotBeNil) + + ref := artifactspec.Descriptor{ + Digest: "fakeDigest", + } + + desc := ispec.Descriptor{ + Digest: "fakeDigest", + } + + manifest := ispec.Manifest{ + Layers: []ispec.Descriptor{desc}, + } + + err = syncCosignSignature(client, &storage.ImageStoreFS{}, *regURL, testImage, testImage, + testImageTag, &ispec.Manifest{}, log) So(err, ShouldNotBeNil) - err = syncSignatures(resty.New(), storage.StoreController{}, "http://zot", "repo", "repo", "tag", log) + + err = syncCosignSignature(client, &storage.ImageStoreFS{}, *regURL, testImage, testImage, + testImageTag, &manifest, log) So(err, ShouldNotBeNil) - err = syncSignatures(resty.New(), storage.StoreController{}, "https://google.com", "repo", "repo", "tag", log) - So(err, ShouldNotBeNil) - url, _ := url.Parse("invalid") - err = syncCosignSignature(resty.New(), storage.StoreController{}, *url, "repo", "repo", "tag", log) - So(err, ShouldNotBeNil) - err = syncNotarySignature(resty.New(), storage.StoreController{}, *url, "repo", "repo", "tag", log) + + err = syncNotarySignature(client, &storage.ImageStoreFS{}, *regURL, testImage, testImage, + "invalidDigest", ReferenceList{[]artifactspec.Descriptor{ref}}, log) So(err, ShouldNotBeNil) }) @@ -296,32 +320,49 @@ func TestSyncInternal(t *testing.T) { imageStore := storage.NewImageStore(storageDir, false, storage.DefaultGCDelay, false, false, log, metrics) - repoRefStr := fmt.Sprintf("%s/%s", host, testImage) - repoRef, err := parseRepositoryReference(repoRefStr) - So(err, ShouldBeNil) - So(repoRef, ShouldNotBeNil) + refs := ReferenceList{[]artifactspec.Descriptor{ + { + Digest: "fakeDigest", + }, + }} - taggedRef, err := reference.WithTag(repoRef, testImageTag) + err = os.Chmod(path.Join(imageStore.RootDir(), testImage, "index.json"), 0o000) So(err, ShouldBeNil) - So(taggedRef, ShouldNotBeNil) - upstreamRef, err := docker.NewReference(taggedRef) - So(err, ShouldBeNil) - So(taggedRef, ShouldNotBeNil) - - canBeSkipped, err := canSkipImage(context.Background(), testImage, testImageTag, upstreamRef, - imageStore, &types.SystemContext{}, log) + canBeSkipped, err := canSkipImage(testImage, testImageTag, "fakeDigest", imageStore, log) So(err, ShouldNotBeNil) So(canBeSkipped, ShouldBeFalse) + err = os.Chmod(path.Join(imageStore.RootDir(), testImage, "index.json"), 0o755) + So(err, ShouldBeNil) + + _, testImageManifestDigest, _, err := imageStore.GetImageManifest(testImage, testImageTag) + So(err, ShouldBeNil) + So(testImageManifestDigest, ShouldNotBeEmpty) + + canBeSkipped, err = canSkipNotarySignature(testImage, testImageTag, + testImageManifestDigest, refs, imageStore, log) + So(err, ShouldBeNil) + So(canBeSkipped, ShouldBeFalse) + err = os.Chmod(path.Join(imageStore.RootDir(), testImage, "index.json"), 0o000) - if err != nil { - panic(err) + So(err, ShouldBeNil) + + canBeSkipped, err = canSkipNotarySignature(testImage, testImageTag, + testImageManifestDigest, refs, imageStore, log) + So(err, ShouldNotBeNil) + So(canBeSkipped, ShouldBeFalse) + + cosignManifest := ispec.Manifest{ + Layers: []ispec.Descriptor{{Digest: "fakeDigest"}}, } - canBeSkipped, err = canSkipImage(context.Background(), testImage, testImageTag, upstreamRef, - imageStore, &types.SystemContext{}, log) - So(err, ShouldNotBeNil) + err = os.Chmod(path.Join(imageStore.RootDir(), testImage, "index.json"), 0o755) + So(err, ShouldBeNil) + + canBeSkipped, err = canSkipCosignSignature(testImage, testImageTag, + testImageManifestDigest, &cosignManifest, imageStore, log) + So(err, ShouldBeNil) So(canBeSkipped, ShouldBeFalse) }) @@ -367,7 +408,7 @@ func TestSyncInternal(t *testing.T) { testRootDir := path.Join(imageStore.RootDir(), testImage, SyncBlobUploadDir) // testImagePath := path.Join(testRootDir, testImage) - err := pushSyncedLocalImage(testImage, testImageTag, testRootDir, storeController, log) + err := pushSyncedLocalImage(testImage, testImageTag, testRootDir, imageStore, log) So(err, ShouldNotBeNil) err = os.MkdirAll(testRootDir, 0o755) @@ -396,7 +437,7 @@ func TestSyncInternal(t *testing.T) { if os.Geteuid() != 0 { So(func() { - _ = pushSyncedLocalImage(testImage, testImageTag, testRootDir, storeController, log) + _ = pushSyncedLocalImage(testImage, testImageTag, testRootDir, imageStore, log) }, ShouldPanic) } @@ -409,7 +450,7 @@ func TestSyncInternal(t *testing.T) { panic(err) } - err = pushSyncedLocalImage(testImage, testImageTag, testRootDir, storeController, log) + err = pushSyncedLocalImage(testImage, testImageTag, testRootDir, imageStore, log) So(err, ShouldNotBeNil) if err := os.Chmod(path.Join(testRootDir, testImage, "blobs", "sha256", @@ -423,7 +464,7 @@ func TestSyncInternal(t *testing.T) { panic(err) } - err = pushSyncedLocalImage(testImage, testImageTag, testRootDir, storeController, log) + err = pushSyncedLocalImage(testImage, testImageTag, testRootDir, imageStore, log) So(err, ShouldNotBeNil) if err := os.Chmod(cachedManifestConfigPath, 0o755); err != nil { @@ -435,7 +476,7 @@ func TestSyncInternal(t *testing.T) { panic(err) } - err = pushSyncedLocalImage(testImage, testImageTag, testRootDir, storeController, log) + err = pushSyncedLocalImage(testImage, testImageTag, testRootDir, imageStore, log) So(err, ShouldNotBeNil) if err := os.Remove(manifestConfigPath); err != nil { @@ -449,7 +490,7 @@ func TestSyncInternal(t *testing.T) { panic(err) } - err = pushSyncedLocalImage(testImage, testImageTag, testRootDir, storeController, log) + err = pushSyncedLocalImage(testImage, testImageTag, testRootDir, imageStore, log) So(err, ShouldNotBeNil) }) } diff --git a/pkg/extensions/sync/sync_test.go b/pkg/extensions/sync/sync_test.go index e78a87ac..7d859d88 100644 --- a/pkg/extensions/sync/sync_test.go +++ b/pkg/extensions/sync/sync_test.go @@ -23,6 +23,7 @@ import ( "time" "github.com/notaryproject/notation-go-lib" + notreg "github.com/notaryproject/notation/pkg/registry" godigest "github.com/opencontainers/go-digest" ispec "github.com/opencontainers/image-spec/specs-go/v1" artifactspec "github.com/oras-project/artifacts-spec/specs-go/v1" @@ -31,6 +32,7 @@ import ( "github.com/sigstore/cosign/cmd/cosign/cli/options" "github.com/sigstore/cosign/cmd/cosign/cli/sign" "github.com/sigstore/cosign/cmd/cosign/cli/verify" + "github.com/sigstore/cosign/pkg/cosign" . "github.com/smartystreets/goconvey/convey" "gopkg.in/resty.v1" "zotregistry.io/zot/pkg/api" @@ -310,7 +312,7 @@ func TestOnDemand(t *testing.T) { So(err, ShouldBeNil) So(resp.StatusCode(), ShouldEqual, 404) - err = os.Chmod(path.Join(destDir, testImage), 0o000) + err = os.MkdirAll(path.Join(destDir, testImage), 0o000) if err != nil { panic(err) } @@ -328,7 +330,7 @@ func TestOnDemand(t *testing.T) { So(err, ShouldBeNil) So(resp.StatusCode(), ShouldEqual, 404) - err = os.Chmod(path.Join(destDir, testImage, sync.SyncBlobUploadDir), 0o000) + err = os.MkdirAll(path.Join(destDir, testImage, sync.SyncBlobUploadDir), 0o000) if err != nil { panic(err) } @@ -614,6 +616,8 @@ func TestOnDemandPermsDenied(t *testing.T) { } }() + test.WaitTillServerReady(destBaseURL) + syncSubDir := path.Join(destDir, testImage, sync.SyncBlobUploadDir) err := os.MkdirAll(syncSubDir, 0o755) @@ -622,16 +626,6 @@ func TestOnDemandPermsDenied(t *testing.T) { err = os.Chmod(syncSubDir, 0o000) So(err, ShouldBeNil) - // wait till ready - for { - _, err := resty.R().Get(destBaseURL) - if err == nil { - break - } - - time.Sleep(100 * time.Millisecond) - } - resp, err := resty.R().Get(destBaseURL + "/v2/" + testImage + "/manifests/" + testImageTag) So(err, ShouldBeNil) So(resp.StatusCode(), ShouldEqual, 404) @@ -806,7 +800,7 @@ func TestBadTLS(t *testing.T) { }() // give it time to set up sync - time.Sleep(2 * time.Second) + time.Sleep(3 * time.Second) resp, _ := destClient.R().Get(destBaseURL + "/v2/" + testImage + "/manifests/" + "invalid") So(resp, ShouldNotBeNil) @@ -1070,16 +1064,9 @@ func TestBasicAuth(t *testing.T) { dctlr.Shutdown() }() - // wait till ready - for { - _, err := resty.R().Get(destBaseURL) - if err == nil { - break - } - time.Sleep(100 * time.Millisecond) - } + test.WaitTillServerReady(destBaseURL) - time.Sleep(2 * time.Second) + time.Sleep(3 * time.Second) resp, err := resty.R().Get(destBaseURL + "/v2/" + testImage + "/manifests/" + testImageTag) So(err, ShouldBeNil) @@ -1140,7 +1127,7 @@ func TestBasicAuth(t *testing.T) { dctlr.Shutdown() }() - time.Sleep(2 * time.Second) + time.Sleep(3 * time.Second) resp, err := destClient.R().Get(destBaseURL + "/v2/" + testImage + "/manifests/" + testImageTag) So(err, ShouldBeNil) @@ -1263,7 +1250,7 @@ func TestBadURL(t *testing.T) { }, }, }, - URLs: []string{"bad-registry-url]"}, + URLs: []string{"bad-registry-url]", "%"}, PollInterval: updateDuration, TLSVerify: &tlsVerify, CertDir: "", @@ -1656,7 +1643,6 @@ func TestSubPaths(t *testing.T) { srcPort := test.GetFreePort() srcConfig := config.New() - client := resty.New() srcBaseURL := test.GetBaseURL(srcPort) srcConfig.HTTP.Port = srcPort @@ -1681,15 +1667,7 @@ func TestSubPaths(t *testing.T) { } }() - // wait till ready - for { - _, err := client.R().Get(srcBaseURL) - if err == nil { - break - } - - time.Sleep(100 * time.Millisecond) - } + test.WaitTillServerReady(srcBaseURL) defer func() { sctlr.Shutdown() @@ -1756,15 +1734,7 @@ func TestSubPaths(t *testing.T) { } }() - // wait till ready - for { - _, err := resty.R().Get(destBaseURL) - if err == nil { - break - } - - time.Sleep(100 * time.Millisecond) - } + test.WaitTillServerReady(destBaseURL) defer func() { dctlr.Shutdown() @@ -2065,10 +2035,11 @@ func TestMultipleURLs(t *testing.T) { Registries: []sync.RegistryConfig{syncRegistryConfig}, } - dc, destBaseURL, _, destClient := startDownstreamServer(t, false, syncConfig) + dctlr, destBaseURL, destDir, destClient := startDownstreamServer(t, false, syncConfig) + defer os.RemoveAll(destDir) defer func() { - dc.Shutdown() + dctlr.Shutdown() }() var srcTagsList TagsList @@ -2105,7 +2076,175 @@ func TestMultipleURLs(t *testing.T) { }) } -func TestPeriodicallySignatures(t *testing.T) { +func TestPeriodicallySignaturesErr(t *testing.T) { + Convey("Verify sync periodically signatures errors", t, func() { + updateDuration, _ := time.ParseDuration("30m") + + sctlr, srcBaseURL, srcDir, _, _ := startUpstreamServer(t, false, false) + + defer func() { + sctlr.Shutdown() + }() + + // create repo, push and sign it + repoName := testSignedImage + var digest godigest.Digest + So(func() { digest = pushRepo(srcBaseURL, repoName) }, ShouldNotPanic) + + splittedURL := strings.SplitAfter(srcBaseURL, ":") + srcPort := splittedURL[len(splittedURL)-1] + + cwd, err := os.Getwd() + So(err, ShouldBeNil) + + defer func() { _ = os.Chdir(cwd) }() + tdir := t.TempDir() + _ = os.Chdir(tdir) + generateKeyPairs(tdir) + + So(func() { signImage(tdir, srcPort, repoName, digest) }, ShouldNotPanic) + + regex := ".*" + var semver bool + var tlsVerify bool + + syncRegistryConfig := sync.RegistryConfig{ + Content: []sync.Content{ + { + Prefix: repoName, + Tags: &sync.Tags{ + Regex: ®ex, + Semver: &semver, + }, + }, + }, + URLs: []string{srcBaseURL}, + PollInterval: updateDuration, + TLSVerify: &tlsVerify, + CertDir: "", + OnDemand: true, + } + + defaultVal := true + syncConfig := &sync.Config{ + Enable: &defaultVal, + Registries: []sync.RegistryConfig{syncRegistryConfig}, + } + + // trigger permission denied on upstream manifest + var srcIndex ispec.Index + + srcBuf, err := ioutil.ReadFile(path.Join(srcDir, repoName, "index.json")) + if err != nil { + panic(err) + } + + if err := json.Unmarshal(srcBuf, &srcIndex); err != nil { + panic(err) + } + + imageManifestDigest := srcIndex.Manifests[0].Digest + + Convey("Trigger error on image manifest", func() { + // trigger permission denied on image manifest + manifestPath := path.Join(srcDir, repoName, "blobs", + string(imageManifestDigest.Algorithm()), imageManifestDigest.Hex()) + err = os.Chmod(manifestPath, 0o000) + So(err, ShouldBeNil) + + dctlr, destBaseURL, _, _ := startDownstreamServer(t, false, syncConfig) + defer dctlr.Shutdown() + + time.Sleep(2 * time.Second) + + // should not be synced nor sync on demand + resp, err := resty.R().Get(destBaseURL + "/v2/" + repoName + "/manifests/1.0") + So(err, ShouldBeNil) + So(resp.StatusCode(), ShouldEqual, 404) + }) + + Convey("Trigger error on cosign signature", func() { + // trigger permission error on cosign signature on upstream + cosignTag := string(imageManifestDigest.Algorithm()) + "-" + imageManifestDigest.Hex() + cosign.SignatureTagSuffix + + getCosignManifestURL := srcBaseURL + path.Join("/v2", repoName, "manifests", cosignTag) + mResp, err := resty.R().Get(getCosignManifestURL) + So(err, ShouldBeNil) + + var cm ispec.Manifest + + err = json.Unmarshal(mResp.Body(), &cm) + So(err, ShouldBeNil) + + for _, blob := range cm.Layers { + blobPath := path.Join(srcDir, repoName, "blobs", string(blob.Digest.Algorithm()), blob.Digest.Hex()) + err := os.Chmod(blobPath, 0o000) + So(err, ShouldBeNil) + } + + // start downstream server + dctlr, destBaseURL, _, _ := startDownstreamServer(t, false, syncConfig) + defer dctlr.Shutdown() + + time.Sleep(2 * time.Second) + + // should not be synced nor sync on demand + resp, err := resty.R().Get(destBaseURL + "/v2/" + repoName + "/manifests/" + cosignTag) + So(err, ShouldBeNil) + So(resp.StatusCode(), ShouldEqual, 404) + }) + + Convey("Trigger error on notary signature", func() { + // trigger permission error on cosign signature on upstream + notaryURLPath := path.Join("/oras/artifacts/v1/", repoName, "manifests", imageManifestDigest.String(), "referrers") + + // based on image manifest digest get referrers + resp, err := resty.R(). + SetHeader("Content-Type", "application/json"). + SetQueryParam("artifactType", "application/vnd.cncf.notary.v2.signature"). + Get(srcBaseURL + notaryURLPath) + + So(err, ShouldBeNil) + So(resp, ShouldNotBeEmpty) + + var referrers ReferenceList + + err = json.Unmarshal(resp.Body(), &referrers) + So(err, ShouldBeNil) + + // read manifest + var nm artifactspec.Manifest + for _, ref := range referrers.References { + refPath := path.Join(srcDir, repoName, "blobs", string(ref.Digest.Algorithm()), ref.Digest.Hex()) + body, err := ioutil.ReadFile(refPath) + So(err, ShouldBeNil) + + err = json.Unmarshal(body, &nm) + So(err, ShouldBeNil) + + // triggers perm denied on sig blobs + for _, blob := range nm.Blobs { + blobPath := path.Join(srcDir, repoName, "blobs", string(blob.Digest.Algorithm()), blob.Digest.Hex()) + err := os.Chmod(blobPath, 0o000) + So(err, ShouldBeNil) + } + } + + // start downstream server + dctlr, destBaseURL, _, _ := startDownstreamServer(t, false, syncConfig) + defer dctlr.Shutdown() + + time.Sleep(2 * time.Second) + + // should not be synced nor sync on demand + resp, err = resty.R().Get(destBaseURL + notaryURLPath) + So(err, ShouldBeNil) + So(resp.StatusCode(), ShouldEqual, 400) + }) + }) +} + +func TestSignatures(t *testing.T) { Convey("Verify sync signatures", t, func() { updateDuration, _ := time.ParseDuration("30m") @@ -2215,6 +2354,7 @@ func TestPeriodicallySignatures(t *testing.T) { KeyRef: path.Join(tdir, "cosign.pub"), Annotations: amap, } + err = vrfy.Exec(context.TODO(), []string{fmt.Sprintf("localhost:%s/%s:%s", destPort, repoName, "1.0")}) So(err, ShouldBeNil) @@ -2237,7 +2377,10 @@ func TestPeriodicallySignatures(t *testing.T) { err = json.Unmarshal(resp.Body(), &referrers) So(err, ShouldBeNil) - // read manifest + // remove already synced image + err = os.RemoveAll(path.Join(destDir, repoName)) + So(err, ShouldBeNil) + var nm artifactspec.Manifest for _, ref := range referrers.References { refPath := path.Join(srcDir, repoName, "blobs", string(ref.Digest.Algorithm()), ref.Digest.Hex()) @@ -2247,14 +2390,46 @@ func TestPeriodicallySignatures(t *testing.T) { err = json.Unmarshal(body, &nm) So(err, ShouldBeNil) - // triggers perm denied on sig blobs + // triggers perm denied on notary sig blobs on downstream for _, blob := range nm.Blobs { - blobPath := path.Join(srcDir, repoName, "blobs", string(blob.Digest.Algorithm()), blob.Digest.Hex()) - err := os.Chmod(blobPath, 0o000) + blobPath := path.Join(destDir, repoName, "blobs", string(blob.Digest.Algorithm()), blob.Digest.Hex()) + err := os.MkdirAll(blobPath, 0o755) + So(err, ShouldBeNil) + err = os.Chmod(blobPath, 0o000) So(err, ShouldBeNil) } } + // sync on demand + resp, err = resty.R().Get(destBaseURL + "/v2/" + repoName + "/manifests/1.0") + So(err, ShouldBeNil) + So(resp.StatusCode(), ShouldEqual, 200) + + // remove already synced image + err = os.RemoveAll(path.Join(destDir, repoName)) + So(err, ShouldBeNil) + + // triggers perm denied on notary manifest on downstream + for _, ref := range referrers.References { + refPath := path.Join(destDir, repoName, "blobs", string(ref.Digest.Algorithm()), ref.Digest.Hex()) + err := os.MkdirAll(refPath, 0o755) + So(err, ShouldBeNil) + err = os.Chmod(refPath, 0o000) + So(err, ShouldBeNil) + } + + // sync on demand + resp, err = resty.R().Get(destBaseURL + "/v2/" + repoName + "/manifests/1.0") + So(err, ShouldBeNil) + So(resp.StatusCode(), ShouldEqual, 200) + + // triggers perm denied on sig blobs + for _, blob := range nm.Blobs { + blobPath := path.Join(srcDir, repoName, "blobs", string(blob.Digest.Algorithm()), blob.Digest.Hex()) + err := os.Chmod(blobPath, 0o000) + So(err, ShouldBeNil) + } + // remove already synced image err = os.RemoveAll(path.Join(destDir, repoName)) So(err, ShouldBeNil) @@ -2264,31 +2439,6 @@ func TestPeriodicallySignatures(t *testing.T) { So(err, ShouldBeNil) So(resp.StatusCode(), ShouldEqual, 200) - for _, blob := range nm.Blobs { - srcBlobPath := path.Join(srcDir, repoName, "blobs", string(blob.Digest.Algorithm()), blob.Digest.Hex()) - err := os.Chmod(srcBlobPath, 0o755) - So(err, ShouldBeNil) - - destBlobPath := path.Join(destDir, repoName, "blobs", string(blob.Digest.Algorithm()), blob.Digest.Hex()) - _ = os.Remove(destBlobPath) - err = os.MkdirAll(destBlobPath, 0o000) - So(err, ShouldBeNil) - } - - // sync on demand - resp, err = resty.R().Get(destBaseURL + "/v2/" + repoName + "/manifests/1.0") - So(err, ShouldBeNil) - So(resp.StatusCode(), ShouldEqual, 200) - - // clean - for _, blob := range nm.Blobs { - destBlobPath := path.Join(destDir, repoName, "blobs", string(blob.Digest.Algorithm()), blob.Digest.Hex()) - err = os.Chmod(destBlobPath, 0o755) - So(err, ShouldBeNil) - err = os.Remove(destBlobPath) - So(err, ShouldBeNil) - } - // test cosign signatures errors // based on manifest digest get cosign manifest cosignEncodedDigest := strings.Replace(digest.String(), ":", "-", 1) + ".sig" @@ -2302,6 +2452,23 @@ func TestPeriodicallySignatures(t *testing.T) { err = json.Unmarshal(mResp.Body(), &cm) So(err, ShouldBeNil) + downstreaamCosignManifest := ispec.Manifest{ + MediaType: cm.MediaType, + Config: ispec.Descriptor{ + MediaType: cm.Config.MediaType, + Size: cm.Config.Size, + Digest: cm.Config.Digest, + Annotations: cm.Config.Annotations, + }, + Layers: cm.Layers, + Versioned: cm.Versioned, + Annotations: cm.Annotations, + } + + buf, err := json.Marshal(downstreaamCosignManifest) + So(err, ShouldBeNil) + cosignManifestDigest := godigest.FromBytes(buf) + for _, blob := range cm.Layers { blobPath := path.Join(srcDir, repoName, "blobs", string(blob.Digest.Algorithm()), blob.Digest.Hex()) err := os.Chmod(blobPath, 0o000) @@ -2317,21 +2484,22 @@ func TestPeriodicallySignatures(t *testing.T) { So(err, ShouldBeNil) So(resp.StatusCode(), ShouldEqual, 200) + // remove already synced image + err = os.RemoveAll(path.Join(destDir, repoName)) + So(err, ShouldBeNil) + for _, blob := range cm.Layers { srcBlobPath := path.Join(srcDir, repoName, "blobs", string(blob.Digest.Algorithm()), blob.Digest.Hex()) err := os.Chmod(srcBlobPath, 0o755) So(err, ShouldBeNil) destBlobPath := path.Join(destDir, repoName, "blobs", string(blob.Digest.Algorithm()), blob.Digest.Hex()) - - err = os.MkdirAll(destBlobPath, 0o000) + err = os.MkdirAll(destBlobPath, 0o755) + So(err, ShouldBeNil) + err = os.Chmod(destBlobPath, 0o755) So(err, ShouldBeNil) } - // remove already synced image - err = os.RemoveAll(path.Join(destDir, repoName)) - So(err, ShouldBeNil) - // sync on demand resp, err = resty.R().Get(destBaseURL + "/v2/" + repoName + "/manifests/1.0") So(err, ShouldBeNil) @@ -2345,7 +2513,7 @@ func TestPeriodicallySignatures(t *testing.T) { So(err, ShouldBeNil) } - // trigger error on config blob + // trigger error on upstream config blob srcConfigBlobPath := path.Join(srcDir, repoName, "blobs", string(cm.Config.Digest.Algorithm()), cm.Config.Digest.Hex()) err = os.Chmod(srcConfigBlobPath, 0o000) @@ -2363,120 +2531,40 @@ func TestPeriodicallySignatures(t *testing.T) { err = os.Chmod(srcConfigBlobPath, 0o755) So(err, ShouldBeNil) - destConfigBlobPath := path.Join(destDir, repoName, "blobs", string(cm.Config.Digest.Algorithm()), - cm.Config.Digest.Hex()) - // err = os.Remove(destConfigBlobPath) - // So(err, ShouldBeNil) - err = os.MkdirAll(destConfigBlobPath, 0o000) - So(err, ShouldBeNil) - + // trigger error on upstream config blob // remove already synced image err = os.RemoveAll(path.Join(destDir, repoName)) So(err, ShouldBeNil) + destConfigBlobPath := path.Join(destDir, repoName, "blobs", string(cm.Config.Digest.Algorithm()), + cm.Config.Digest.Hex()) + + err = os.MkdirAll(destConfigBlobPath, 0o755) + So(err, ShouldBeNil) + err = os.Chmod(destConfigBlobPath, 0o000) + So(err, ShouldBeNil) + // sync on demand resp, err = resty.R().Get(destBaseURL + "/v2/" + repoName + "/manifests/1.0") So(err, ShouldBeNil) So(resp.StatusCode(), ShouldEqual, 200) - }) -} -func TestPeriodicallySignaturesErr(t *testing.T) { - Convey("Verify sync signatures gives error", t, func() { - updateDuration, _ := time.ParseDuration("30m") - - sctlr, srcBaseURL, srcDir, _, _ := startUpstreamServer(t, false, false) - - defer func() { - sctlr.Shutdown() - }() - - // create repo, push and sign it - repoName := testSignedImage - var digest godigest.Digest - So(func() { digest = pushRepo(srcBaseURL, repoName) }, ShouldNotPanic) - - splittedURL := strings.SplitAfter(srcBaseURL, ":") - srcPort := splittedURL[len(splittedURL)-1] - - cwd, err := os.Getwd() + // remove already synced image + err = os.RemoveAll(path.Join(destDir, repoName)) So(err, ShouldBeNil) - defer func() { _ = os.Chdir(cwd) }() - tdir := t.TempDir() - _ = os.Chdir(tdir) - generateKeyPairs(tdir) - - So(func() { signImage(tdir, srcPort, repoName, digest) }, ShouldNotPanic) - - regex := ".*" - var semver bool - var tlsVerify bool - - syncRegistryConfig := sync.RegistryConfig{ - Content: []sync.Content{ - { - Prefix: repoName, - Tags: &sync.Tags{ - Regex: ®ex, - Semver: &semver, - }, - }, - }, - URLs: []string{srcBaseURL}, - PollInterval: updateDuration, - TLSVerify: &tlsVerify, - CertDir: "", - OnDemand: true, - } - - defaultVal := true - syncConfig := &sync.Config{ - Enable: &defaultVal, - Registries: []sync.RegistryConfig{syncRegistryConfig}, - } - - // test negative cases (trigger errors) - // test notary signatures errors - - // based on manifest digest get referrers - getReferrersURL := srcBaseURL + path.Join("/oras/artifacts/v1/", repoName, "manifests", digest.String(), "referrers") - - resp, err := resty.R(). - SetHeader("Content-Type", "application/json"). - SetQueryParam("artifactType", "application/vnd.cncf.notary.v2.signature"). - Get(getReferrersURL) - + // trigger error on downstream manifest + destManifestPath := path.Join(destDir, repoName, "blobs", string(cosignManifestDigest.Algorithm()), + cosignManifestDigest.Hex()) + err = os.MkdirAll(destManifestPath, 0o755) So(err, ShouldBeNil) - So(resp, ShouldNotBeEmpty) - - var referrers ReferenceList - - err = json.Unmarshal(resp.Body(), &referrers) + err = os.Chmod(destManifestPath, 0o000) So(err, ShouldBeNil) - // read manifest - var nm artifactspec.Manifest - for _, ref := range referrers.References { - refPath := path.Join(srcDir, repoName, "blobs", string(ref.Digest.Algorithm()), ref.Digest.Hex()) - body, err := ioutil.ReadFile(refPath) - So(err, ShouldBeNil) - - err = json.Unmarshal(body, &nm) - So(err, ShouldBeNil) - - // triggers perm denied on sig blobs - for _, blob := range nm.Blobs { - blobPath := path.Join(srcDir, repoName, "blobs", string(blob.Digest.Algorithm()), blob.Digest.Hex()) - err := os.Chmod(blobPath, 0o000) - So(err, ShouldBeNil) - } - } - - dctlr, _, _, _ := startDownstreamServer(t, false, syncConfig) - defer func() { - dctlr.Shutdown() - }() + // sync on demand + resp, err = resty.R().Get(destBaseURL + "/v2/" + repoName + "/manifests/1.0") + So(err, ShouldBeNil) + So(resp.StatusCode(), ShouldEqual, 200) }) } @@ -2530,10 +2618,11 @@ func TestOnDemandRetryGoroutine(t *testing.T) { Registries: []sync.RegistryConfig{syncRegistryConfig}, } - dc, destBaseURL, destDir, destClient := startDownstreamServer(t, false, syncConfig) + dctlr, destBaseURL, destDir, destClient := startDownstreamServer(t, false, syncConfig) + defer os.RemoveAll(destDir) defer func() { - dc.Shutdown() + dctlr.Shutdown() }() resp, err := destClient.R().Get(destBaseURL + "/v2/" + testImage + "/manifests/" + testImageTag) @@ -2567,6 +2656,59 @@ func TestOnDemandRetryGoroutine(t *testing.T) { }) } +func TestOnDemandRetryGoroutineErr(t *testing.T) { + Convey("Verify ondemand sync retries in background on error", t, func() { + regex := ".*" + semver := true + var tlsVerify bool + + syncRegistryConfig := sync.RegistryConfig{ + Content: []sync.Content{ + { + Prefix: testImage, + Tags: &sync.Tags{ + Regex: ®ex, + Semver: &semver, + }, + }, + }, + URLs: []string{"http://127.0.0.1"}, + OnDemand: true, + TLSVerify: &tlsVerify, + CertDir: "", + } + + maxRetries := 1 + delay := 1 * time.Second + syncRegistryConfig.MaxRetries = &maxRetries + syncRegistryConfig.RetryDelay = &delay + + defaultVal := true + syncConfig := &sync.Config{ + Enable: &defaultVal, + Registries: []sync.RegistryConfig{syncRegistryConfig}, + } + + dctlr, destBaseURL, destDir, destClient := startDownstreamServer(t, false, syncConfig) + defer os.RemoveAll(destDir) + + defer func() { + dctlr.Shutdown() + }() + + resp, err := destClient.R().Get(destBaseURL + "/v2/" + testImage + "/manifests/" + testImageTag) + So(err, ShouldBeNil) + So(resp.StatusCode(), ShouldEqual, 404) + + // in the meantime ondemand should retry syncing and finish with error + time.Sleep(3 * time.Second) + + resp, err = destClient.R().Get(destBaseURL + "/v2/" + testImage + "/manifests/" + testImageTag) + So(err, ShouldBeNil) + So(resp.StatusCode(), ShouldEqual, 404) + }) +} + func TestOnDemandMultipleRetries(t *testing.T) { Convey("Verify ondemand sync retries in background on error, multiple calls should spawn one routine", t, func() { srcPort := test.GetFreePort() @@ -2606,10 +2748,11 @@ func TestOnDemandMultipleRetries(t *testing.T) { Registries: []sync.RegistryConfig{syncRegistryConfig}, } - dc, destBaseURL, destDir, destClient := startDownstreamServer(t, false, syncConfig) + dctlr, destBaseURL, destDir, destClient := startDownstreamServer(t, false, syncConfig) + defer os.RemoveAll(destDir) defer func() { - dc.Shutdown() + dctlr.Shutdown() }() callsNo := 5 @@ -2652,15 +2795,7 @@ func TestOnDemandMultipleRetries(t *testing.T) { } }() - // wait till ready - for { - _, err := resty.R().Get(destBaseURL) - if err == nil { - break - } - - time.Sleep(100 * time.Millisecond) - } + test.WaitTillServerReady(srcBaseURL) defer func() { sctlr.Shutdown() @@ -2678,7 +2813,7 @@ func TestOnDemandMultipleRetries(t *testing.T) { time.Sleep(500 * time.Millisecond) } - waitSyncOndemand(destDir, testImage) + waitSync(destDir, testImage) So(len(populatedDirs), ShouldEqual, 1) }) @@ -2718,10 +2853,11 @@ func TestOnDemandPullsOnce(t *testing.T) { Registries: []sync.RegistryConfig{syncRegistryConfig}, } - dc, destBaseURL, destDir, _ := startDownstreamServer(t, false, syncConfig) + dctlr, destBaseURL, destDir, _ := startDownstreamServer(t, false, syncConfig) + defer os.RemoveAll(destDir) defer func() { - dc.Shutdown() + dctlr.Shutdown() }() var wg goSync.WaitGroup @@ -3142,15 +3278,7 @@ func TestSyncOnlyDiff(t *testing.T) { } }() - // wait till ready - for { - _, err := resty.R().Get(destBaseURL) - if err == nil { - break - } - - time.Sleep(100 * time.Millisecond) - } + test.WaitTillServerReady(destBaseURL) defer func() { dctlr.Shutdown() @@ -3178,7 +3306,7 @@ func TestSyncOnlyDiff(t *testing.T) { So(err, ShouldBeNil) So(resp.StatusCode(), ShouldEqual, 200) - time.Sleep(2 * time.Second) + time.Sleep(3 * time.Second) done <- true So(isPopulated, ShouldBeFalse) @@ -3299,7 +3427,7 @@ func TestSyncWithDiffDigest(t *testing.T) { } }() - // watch .sync subdir, shouldn't be populated + // watch .sync subdir, should be populated done := make(chan bool) var isPopulated bool go func() { @@ -3321,27 +3449,309 @@ func TestSyncWithDiffDigest(t *testing.T) { dctlr.Shutdown() }() - // wait till ready - for { - _, err := resty.R().Get(destBaseURL) - if err == nil { - break - } - - time.Sleep(100 * time.Millisecond) - } + test.WaitTillServerReady(destBaseURL) resp, err = resty.R().Get(destBaseURL + "/v2/" + testImage + "/manifests/" + testImageTag) So(err, ShouldBeNil) So(resp.StatusCode(), ShouldEqual, 200) - time.Sleep(5 * time.Second) + waitSync(destDir, testImage) done <- true So(isPopulated, ShouldBeTrue) }) } +func TestSyncSignaturesDiff(t *testing.T) { + Convey("Verify sync detects changes in the upstream signatures", t, func() { + updateDuration, _ := time.ParseDuration("10s") + + sctlr, srcBaseURL, srcDir, _, _ := startUpstreamServer(t, false, false) + defer os.RemoveAll(srcDir) + + defer func() { + sctlr.Shutdown() + }() + + // create repo, push and sign it + repoName := testSignedImage + var digest godigest.Digest + So(func() { digest = pushRepo(srcBaseURL, repoName) }, ShouldNotPanic) + + splittedURL := strings.SplitAfter(srcBaseURL, ":") + srcPort := splittedURL[len(splittedURL)-1] + + cwd, err := os.Getwd() + So(err, ShouldBeNil) + + defer func() { _ = os.Chdir(cwd) }() + tdir, err := ioutil.TempDir("", "sigs") + So(err, ShouldBeNil) + defer os.RemoveAll(tdir) + _ = os.Chdir(tdir) + generateKeyPairs(tdir) + + So(func() { signImage(tdir, srcPort, repoName, digest) }, ShouldNotPanic) + + regex := ".*" + var semver bool + var tlsVerify bool + + syncRegistryConfig := sync.RegistryConfig{ + Content: []sync.Content{ + { + Prefix: repoName, + Tags: &sync.Tags{ + Regex: ®ex, + Semver: &semver, + }, + }, + }, + URLs: []string{srcBaseURL}, + PollInterval: updateDuration, + TLSVerify: &tlsVerify, + CertDir: "", + OnDemand: false, + } + + defaultVal := true + syncConfig := &sync.Config{ + Enable: &defaultVal, + Registries: []sync.RegistryConfig{syncRegistryConfig}, + } + + dctlr, destBaseURL, destDir, destClient := startDownstreamServer(t, false, syncConfig) + defer func() { + os.RemoveAll(destDir) + dctlr.Shutdown() + }() + + // wait for sync + var destTagsList TagsList + + for { + resp, err := destClient.R().Get(destBaseURL + "/v2/" + repoName + "/tags/list") + if err != nil { + panic(err) + } + + err = json.Unmarshal(resp.Body(), &destTagsList) + if err != nil { + panic(err) + } + + if len(destTagsList.Tags) > 0 { + break + } + + time.Sleep(500 * time.Millisecond) + } + + splittedURL = strings.SplitAfter(destBaseURL, ":") + destPort := splittedURL[len(splittedURL)-1] + + a := &options.AnnotationOptions{Annotations: []string{"tag=1.0"}} + amap, err := a.AnnotationsMap() + if err != nil { + panic(err) + } + + // notation verify the image + image := fmt.Sprintf("localhost:%s/%s:%s", destPort, repoName, "1.0") + cmd := exec.Command("notation", "verify", "--cert", "good", "--plain-http", image) + out, err := cmd.CombinedOutput() + So(err, ShouldBeNil) + + msg := string(out) + So(msg, ShouldNotBeEmpty) + So(strings.Contains(msg, "verification failure"), ShouldBeFalse) + + // cosign verify the image + vrfy := verify.VerifyCommand{ + RegistryOptions: options.RegistryOptions{AllowInsecure: true}, + CheckClaims: true, + KeyRef: path.Join(tdir, "cosign.pub"), + Annotations: amap, + } + err = vrfy.Exec(context.TODO(), []string{fmt.Sprintf("localhost:%s/%s:%s", destPort, repoName, "1.0")}) + So(err, ShouldBeNil) + + // now add signatures to upstream and let sync detect that upstream signatures changed and pull them + So(func() { signImage(tdir, srcPort, repoName, digest) }, ShouldNotPanic) + + // wait for signatures + time.Sleep(10 * time.Second) + + // notation verify the image + image = fmt.Sprintf("localhost:%s/%s:%s", destPort, repoName, "1.0") + cmd = exec.Command("notation", "verify", "--cert", "good", "--plain-http", image) + out, err = cmd.CombinedOutput() + So(err, ShouldBeNil) + + msg = string(out) + So(msg, ShouldNotBeEmpty) + So(strings.Contains(msg, "verification failure"), ShouldBeFalse) + + // cosign verify the image + vrfy = verify.VerifyCommand{ + RegistryOptions: options.RegistryOptions{AllowInsecure: true}, + CheckClaims: true, + KeyRef: path.Join(tdir, "cosign.pub"), + Annotations: amap, + } + err = vrfy.Exec(context.TODO(), []string{fmt.Sprintf("localhost:%s/%s:%s", destPort, repoName, "1.0")}) + So(err, ShouldBeNil) + + // compare signatures + var srcIndex ispec.Index + var destIndex ispec.Index + + srcBuf, err := ioutil.ReadFile(path.Join(srcDir, repoName, "index.json")) + if err != nil { + panic(err) + } + + if err := json.Unmarshal(srcBuf, &srcIndex); err != nil { + panic(err) + } + + destBuf, err := ioutil.ReadFile(path.Join(destDir, repoName, "index.json")) + if err != nil { + panic(err) + } + + if err := json.Unmarshal(destBuf, &destIndex); err != nil { + panic(err) + } + + // find image manifest digest (signed-repo) and upstream notary digests + var upstreamRefsDigests []string + var downstreamRefsDigests []string + + var manifestDigest string + for _, manifestDesc := range srcIndex.Manifests { + if manifestDesc.Annotations[ispec.AnnotationRefName] == "1.0" { + manifestDigest = string(manifestDesc.Digest) + } else if manifestDesc.MediaType == notreg.ArtifactTypeNotation { + upstreamRefsDigests = append(upstreamRefsDigests, manifestDesc.Digest.String()) + } + } + + for _, manifestDesc := range destIndex.Manifests { + if manifestDesc.MediaType == notreg.ArtifactTypeNotation { + downstreamRefsDigests = append(downstreamRefsDigests, manifestDesc.Digest.String()) + } + } + + // compare notary signatures + So(upstreamRefsDigests, ShouldResemble, downstreamRefsDigests) + + cosignManifestTag := strings.Replace(manifestDigest, ":", "-", 1) + ".sig" + + // get synced cosign manifest from downstream + resp, err := resty.R().Get(destBaseURL + "/v2/" + repoName + "/manifests/" + cosignManifestTag) + So(err, ShouldBeNil) + So(resp.StatusCode(), ShouldEqual, 200) + + var syncedCosignManifest ispec.Manifest + + err = json.Unmarshal(resp.Body(), &syncedCosignManifest) + So(err, ShouldBeNil) + + // get cosign manifest from upstream + resp, err = resty.R().Get(srcBaseURL + "/v2/" + repoName + "/manifests/" + cosignManifestTag) + So(err, ShouldBeNil) + So(resp.StatusCode(), ShouldEqual, 200) + + var cosignManifest ispec.Manifest + + err = json.Unmarshal(resp.Body(), &cosignManifest) + So(err, ShouldBeNil) + + // compare cosign signatures + So(reflect.DeepEqual(cosignManifest, syncedCosignManifest), ShouldEqual, true) + + // let it sync one more time + time.Sleep(10 * time.Second) + }) +} + +func TestOnlySignedFlag(t *testing.T) { + updateDuration, _ := time.ParseDuration("30m") + + sctlr, srcBaseURL, srcDir, _, _ := startUpstreamServer(t, false, false) + defer os.RemoveAll(srcDir) + + defer func() { + sctlr.Shutdown() + }() + + regex := ".*" + semver := true + onlySigned := true + + var tlsVerify bool + + syncRegistryConfig := sync.RegistryConfig{ + Content: []sync.Content{ + { + Prefix: testImage, + Tags: &sync.Tags{ + Regex: ®ex, + Semver: &semver, + }, + }, + }, + URLs: []string{srcBaseURL}, + PollInterval: updateDuration, + TLSVerify: &tlsVerify, + CertDir: "", + OnlySigned: &onlySigned, + } + + defaultVal := true + + Convey("Verify sync revokes unsigned images", t, func() { + syncRegistryConfig.OnDemand = false + syncConfig := &sync.Config{ + Enable: &defaultVal, + Registries: []sync.RegistryConfig{syncRegistryConfig}, + } + + dctlr, destBaseURL, destDir, client := startDownstreamServer(t, false, syncConfig) + defer os.RemoveAll(destDir) + + defer func() { + dctlr.Shutdown() + }() + + time.Sleep(3 * time.Second) + + resp, err := client.R().Get(destBaseURL + "/v2/" + testImage + "/manifests/" + testImageTag) + So(err, ShouldBeNil) + So(resp.StatusCode(), ShouldEqual, 404) + }) + + Convey("Verify sync ondemand revokes unsigned images", t, func() { + syncRegistryConfig.OnDemand = true + syncConfig := &sync.Config{ + Enable: &defaultVal, + Registries: []sync.RegistryConfig{syncRegistryConfig}, + } + + dctlr, destBaseURL, destDir, client := startDownstreamServer(t, false, syncConfig) + defer os.RemoveAll(destDir) + + defer func() { + dctlr.Shutdown() + }() + + resp, err := client.R().Get(destBaseURL + "/v2/" + testImage + "/manifests/" + testImageTag) + So(err, ShouldBeNil) + So(resp.StatusCode(), ShouldEqual, 404) + }) +} + func generateKeyPairs(tdir string) { // generate a keypair os.Setenv("COSIGN_PASSWORD", "") @@ -3508,7 +3918,7 @@ func pushRepo(url, repoName string) godigest.Digest { return digest } -func waitSyncOndemand(rootDir, repoName string) { +func waitSync(rootDir, repoName string) { // wait for .sync subdirs to be removed for { dirs, err := os.ReadDir(path.Join(rootDir, repoName, sync.SyncBlobUploadDir)) diff --git a/pkg/extensions/sync/utils.go b/pkg/extensions/sync/utils.go index 8aa96719..3a9bc672 100644 --- a/pkg/extensions/sync/utils.go +++ b/pkg/extensions/sync/utils.go @@ -1,7 +1,6 @@ package sync import ( - "context" "crypto/tls" "crypto/x509" "encoding/json" @@ -19,9 +18,9 @@ import ( "github.com/containers/image/v5/oci/layout" "github.com/containers/image/v5/types" guuid "github.com/gofrs/uuid" - "github.com/notaryproject/notation-go-lib" ispec "github.com/opencontainers/image-spec/specs-go/v1" artifactspec "github.com/oras-project/artifacts-spec/specs-go/v1" + "github.com/sigstore/cosign/pkg/oci/static" "gopkg.in/resty.v1" zerr "zotregistry.io/zot/errors" "zotregistry.io/zot/pkg/common" @@ -32,7 +31,7 @@ import ( ) type ReferenceList struct { - References []notation.Descriptor `json:"references"` + References []artifactspec.Descriptor `json:"references"` } // getTagFromRef returns a tagged reference from an image reference. @@ -207,18 +206,18 @@ func getFileCredentials(filepath string) (CredentialsFile, error) { } func getHTTPClient(regCfg *RegistryConfig, upstreamURL string, credentials Credentials, - log log.Logger) (*resty.Client, error) { + log log.Logger) (*resty.Client, *url.URL, error) { client := resty.New() if !common.Contains(regCfg.URLs, upstreamURL) { - return nil, zerr.ErrSyncInvalidUpstreamURL + return nil, nil, zerr.ErrSyncInvalidUpstreamURL } registryURL, err := url.Parse(upstreamURL) if err != nil { log.Error().Err(err).Str("url", upstreamURL).Msg("couldn't parse url") - return nil, err + return nil, nil, err } if regCfg.CertDir != "" { @@ -231,7 +230,7 @@ func getHTTPClient(regCfg *RegistryConfig, upstreamURL string, credentials Crede if err != nil { log.Error().Err(err).Msg("couldn't read CA certificate") - return nil, err + return nil, nil, err } caCertPool := x509.NewCertPool() @@ -243,7 +242,7 @@ func getHTTPClient(regCfg *RegistryConfig, upstreamURL string, credentials Crede if err != nil { log.Error().Err(err).Msg("couldn't read certificates key pairs") - return nil, err + return nil, nil, err } client.SetCertificates(cert) @@ -259,275 +258,13 @@ func getHTTPClient(regCfg *RegistryConfig, upstreamURL string, credentials Crede client.SetBasicAuth(credentials.Username, credentials.Password) } - return client, nil -} - -func syncCosignSignature(client *resty.Client, storeController storage.StoreController, - regURL url.URL, remoteRepo, localRepo, digest string, log log.Logger) error { - log.Info().Msg("syncing cosign signatures") - - getCosignManifestURL := regURL - - if !isCosignTag(digest) { - digest = strings.Replace(digest, ":", "-", 1) + ".sig" - } - - getCosignManifestURL.Path = path.Join(getCosignManifestURL.Path, "v2", remoteRepo, "manifests", digest) - - getCosignManifestURL.RawQuery = getCosignManifestURL.Query().Encode() - - mResp, err := client.R().Get(getCosignManifestURL.String()) - if err != nil { - log.Error().Err(err).Str("url", getCosignManifestURL.String()). - Msgf("couldn't get cosign manifest: %s", digest) - - return err - } - - if mResp.IsError() { - log.Info().Msgf("couldn't find any cosign signature from %s, status code: %d skipping", - getCosignManifestURL.String(), mResp.StatusCode()) - - return nil - } - - var m ispec.Manifest - - err = json.Unmarshal(mResp.Body(), &m) - if err != nil { - log.Error().Err(err).Str("url", getCosignManifestURL.String()). - Msgf("couldn't unmarshal cosign manifest %s", digest) - - return err - } - - imageStore := storeController.GetImageStore(localRepo) - - for _, blob := range m.Layers { - // get blob - getBlobURL := regURL - getBlobURL.Path = path.Join(getBlobURL.Path, "v2", remoteRepo, "blobs", blob.Digest.String()) - getBlobURL.RawQuery = getBlobURL.Query().Encode() - - resp, err := client.R().SetDoNotParseResponse(true).Get(getBlobURL.String()) - if err != nil { - log.Error().Err(err).Msgf("couldn't get cosign blob: %s", blob.Digest.String()) - - return err - } - - if resp.IsError() { - log.Info().Msgf("couldn't find cosign blob from %s, status code: %d", getBlobURL.String(), resp.StatusCode()) - - return zerr.ErrBadBlobDigest - } - - defer resp.RawBody().Close() - - // push blob - _, _, err = imageStore.FullBlobUpload(localRepo, resp.RawBody(), blob.Digest.String()) - if err != nil { - log.Error().Err(err).Msg("couldn't upload cosign blob") - - return err - } - } - - // get config blob - getBlobURL := regURL - getBlobURL.Path = path.Join(getBlobURL.Path, "v2", remoteRepo, "blobs", m.Config.Digest.String()) - getBlobURL.RawQuery = getBlobURL.Query().Encode() - - resp, err := client.R().SetDoNotParseResponse(true).Get(getBlobURL.String()) - if err != nil { - log.Error().Err(err).Msgf("couldn't get cosign config blob: %s", getBlobURL.String()) - - return err - } - - if resp.IsError() { - log.Info().Msgf("couldn't find cosign config blob from %s, status code: %d", getBlobURL.String(), resp.StatusCode()) - - return zerr.ErrBadBlobDigest - } - - defer resp.RawBody().Close() - - // push config blob - _, _, err = imageStore.FullBlobUpload(localRepo, resp.RawBody(), m.Config.Digest.String()) - if err != nil { - log.Error().Err(err).Msg("couldn't upload cosign blob") - - return err - } - - // push manifest - _, err = imageStore.PutImageManifest(localRepo, digest, ispec.MediaTypeImageManifest, mResp.Body()) - if err != nil { - log.Error().Err(err).Msg("couldn't upload cosing manifest") - - return err - } - - return nil -} - -func syncNotarySignature(client *resty.Client, storeController storage.StoreController, - regURL url.URL, remoteRepo, localRepo, digest string, log log.Logger) error { - log.Info().Msg("syncing notary signatures") - - getReferrersURL := regURL - - // based on manifest digest get referrers - getReferrersURL.Path = path.Join(getReferrersURL.Path, "oras/artifacts/v1/", - remoteRepo, "manifests", digest, "referrers") - getReferrersURL.RawQuery = getReferrersURL.Query().Encode() - - resp, err := client.R(). - SetHeader("Content-Type", "application/json"). - SetQueryParam("artifactType", "application/vnd.cncf.notary.v2.signature"). - Get(getReferrersURL.String()) - if err != nil { - log.Error().Err(err).Msgf("couldn't get referrers from %s", getReferrersURL.String()) - - return err - } - - if resp.IsError() { - log.Info().Msgf("couldn't find any notary signature from %s, status code: %d, skipping", - getReferrersURL.String(), resp.StatusCode()) - - return nil - } - - var referrers ReferenceList - - err = json.Unmarshal(resp.Body(), &referrers) - if err != nil { - log.Error().Err(err).Msgf("couldn't unmarshal notary signature from %s", getReferrersURL.String()) - - return err - } - - imageStore := storeController.GetImageStore(localRepo) - - for _, ref := range referrers.References { - // get referrer manifest - getRefManifestURL := regURL - getRefManifestURL.Path = path.Join(getRefManifestURL.Path, "v2", remoteRepo, "manifests", ref.Digest.String()) - getRefManifestURL.RawQuery = getRefManifestURL.Query().Encode() - - resp, err := client.R(). - Get(getRefManifestURL.String()) - if err != nil { - log.Error().Err(err).Msgf("couldn't get notary manifest: %s", getRefManifestURL.String()) - - return err - } - - // read manifest - var m artifactspec.Manifest - - err = json.Unmarshal(resp.Body(), &m) - if err != nil { - log.Error().Err(err).Msgf("couldn't unmarshal notary manifest: %s", getRefManifestURL.String()) - - return err - } - - for _, blob := range m.Blobs { - getBlobURL := regURL - getBlobURL.Path = path.Join(getBlobURL.Path, "v2", remoteRepo, "blobs", blob.Digest.String()) - getBlobURL.RawQuery = getBlobURL.Query().Encode() - - resp, err := client.R().SetDoNotParseResponse(true).Get(getBlobURL.String()) - if err != nil { - log.Error().Err(err).Msgf("couldn't get notary blob: %s", getBlobURL.String()) - - return err - } - - defer resp.RawBody().Close() - - if resp.IsError() { - log.Info().Msgf("couldn't find notary blob from %s, status code: %d", - getBlobURL.String(), resp.StatusCode()) - - return zerr.ErrBadBlobDigest - } - - _, _, err = imageStore.FullBlobUpload(localRepo, resp.RawBody(), blob.Digest.String()) - if err != nil { - log.Error().Err(err).Msg("couldn't upload notary sig blob") - - return err - } - } - - _, err = imageStore.PutImageManifest(localRepo, ref.Digest.String(), artifactspec.MediaTypeArtifactManifest, - resp.Body()) - if err != nil { - log.Error().Err(err).Msg("couldn't upload notary sig manifest") - - return err - } - } - - return nil -} - -func syncSignatures(client *resty.Client, storeController storage.StoreController, - registryURL, remoteRepo, localRepo, tag string, log log.Logger) error { - log.Info().Msgf("syncing signatures from %s/%s:%s", registryURL, remoteRepo, tag) - // get manifest and find out its digest - regURL, err := url.Parse(registryURL) - if err != nil { - log.Error().Err(err).Msgf("couldn't parse registry URL: %s", registryURL) - - return err - } - - getManifestURL := *regURL - - getManifestURL.Path = path.Join(getManifestURL.Path, "v2", remoteRepo, "manifests", tag) - - resp, err := client.R().SetHeader("Content-Type", "application/json").Head(getManifestURL.String()) - if err != nil { - log.Error().Err(err).Str("url", getManifestURL.String()). - Msgf("couldn't query %s", registryURL) - - return err - } - - digest := resp.Header().Get("Docker-Content-Digest") - if digest == "" { - log.Error().Err(zerr.ErrBadBlobDigest).Str("url", getManifestURL.String()). - Msgf("couldn't get digest for manifest: %s:%s", remoteRepo, tag) - - return zerr.ErrBadBlobDigest - } - - err = syncNotarySignature(client, storeController, *regURL, remoteRepo, localRepo, digest, log) - if err != nil { - return err - } - - err = syncCosignSignature(client, storeController, *regURL, remoteRepo, localRepo, digest, log) - if err != nil { - return err - } - - log.Info().Msgf("successfully synced %s/%s:%s signatures", registryURL, remoteRepo, tag) - - return nil + return client, registryURL, nil } func pushSyncedLocalImage(localRepo, tag, localCachePath string, - storeController storage.StoreController, log log.Logger) error { + imageStore storage.ImageStore, log log.Logger) error { log.Info().Msgf("pushing synced local image %s/%s:%s to local registry", localCachePath, localRepo, tag) - imageStore := storeController.GetImageStore(localRepo) - metrics := monitoring.NewMetricsServer(false, log) cacheImageStore := storage.NewImageStore(localCachePath, false, storage.DefaultGCDelay, false, false, log, metrics) @@ -598,16 +335,6 @@ func pushSyncedLocalImage(localRepo, tag, localCachePath string, return nil } -// sync feature will try to pull cosign signature because for sync cosign signature is just an image -// this function will check if tag is a cosign tag. -func isCosignTag(tag string) bool { - if strings.HasPrefix(tag, "sha256-") && strings.HasSuffix(tag, ".sig") { - return true - } - - return false -} - // sync needs transport to be stripped to not be wrongly interpreted as an image reference // at a non-fully qualified registry (hostname as image and port as tag). func StripRegistryTransport(url string) string { @@ -659,11 +386,10 @@ func getLocalImageRef(imageStore storage.ImageStore, repo, tag string) (types.Im return localImageRef, localCachePath, nil } -// canSkipImage returns whether or not the image can be skipped from syncing. -func canSkipImage(ctx context.Context, repo, tag string, upstreamRef types.ImageReference, - imageStore storage.ImageStore, upstreamCtx *types.SystemContext, log log.Logger) (bool, error) { - // filter already pulled images - _, localImageDigest, _, err := imageStore.GetImageManifest(repo, tag) +// canSkipImage returns whether or not we already synced this image. +func canSkipImage(repo, tag, digest string, imageStore storage.ImageStore, log log.Logger) (bool, error) { + // check image already synced + _, localImageManifestDigest, _, err := imageStore.GetImageManifest(repo, tag) if err != nil { if errors.Is(err, zerr.ErrRepoNotFound) || errors.Is(err, zerr.ErrManifestNotFound) { return false, nil @@ -674,18 +400,54 @@ func canSkipImage(ctx context.Context, repo, tag string, upstreamRef types.Image return false, err } - upstreamImageDigest, err := docker.GetDigest(ctx, upstreamCtx, upstreamRef) - if err != nil { - log.Error().Err(err).Msgf("couldn't get upstream image %s manifest", upstreamRef.DockerReference()) + if localImageManifestDigest != digest { + log.Info().Msgf("upstream image %s:%s digest changed, syncing again", repo, tag) - return false, err + return false, nil } - if localImageDigest == string(upstreamImageDigest) { - log.Info().Msgf("skipping syncing %s:%s, image already synced", repo, tag) - - return true, nil - } - - return false, nil + return true, nil +} + +func manifestsEqual(manifest1, manifest2 ispec.Manifest) bool { + if manifest1.Config.Digest == manifest2.Config.Digest && + manifest1.Config.MediaType == manifest2.Config.MediaType && + manifest1.Config.Size == manifest2.Config.Size && + len(manifest1.Layers) == len(manifest2.Layers) { + if descriptorEqual(manifest1.Layers, manifest2.Layers) { + return true + } + } + + return false +} + +func artifactDescriptorsEqual(desc1, desc2 []artifactspec.Descriptor) bool { + if len(desc1) == len(desc2) { + for id, desc := range desc1 { + if desc.Digest == desc2[id].Digest && + desc.Size == desc2[id].Size && + desc.MediaType == desc2[id].MediaType && + desc.ArtifactType == desc2[id].ArtifactType { + return true + } + } + } + + return false +} + +func descriptorEqual(desc1, desc2 []ispec.Descriptor) bool { + if len(desc1) == len(desc2) { + for id, desc := range desc1 { + if desc.Digest == desc2[id].Digest && + desc.Size == desc2[id].Size && + desc.MediaType == desc2[id].MediaType && + desc.Annotations[static.SignatureAnnotationKey] == desc2[id].Annotations[static.SignatureAnnotationKey] { + return true + } + } + } + + return false } diff --git a/pkg/storage/s3/storage.go b/pkg/storage/s3/storage.go index 2bac4dca..c333a861 100644 --- a/pkg/storage/s3/storage.go +++ b/pkg/storage/s3/storage.go @@ -495,6 +495,7 @@ func (is *ObjectStorage) PutImageManifest(repo string, reference string, mediaTy } // manifest contents have changed for the same tag, // so update index.json descriptor + is.log.Info(). Int64("old size", desc.Size). Int64("new size", int64(len(body))).