diff --git a/pkg/extensions/sync/sync.go b/pkg/extensions/sync/sync.go index 891cfb03..ab093f7e 100644 --- a/pkg/extensions/sync/sync.go +++ b/pkg/extensions/sync/sync.go @@ -354,6 +354,16 @@ func syncRegistry(regCfg RegistryConfig, upstreamURL string, storeController sto imageStore := storeController.GetImageStore(repo) + canBeSkipped, err := canSkipImage(repo, tag, upstreamImageRef, imageStore, upstreamCtx, log) + if err != nil { + log.Error().Err(err).Msgf("couldn't check if the upstream image %s can be skipped", + upstreamImageRef.DockerReference()) + } + + if canBeSkipped { + continue + } + localCachePath, err := getLocalCachePath(imageStore, repo) if err != nil { log.Error().Err(err).Str("dir", localCachePath).Msg("couldn't create temporary dir") @@ -371,15 +381,15 @@ func syncRegistry(regCfg RegistryConfig, upstreamURL string, storeController sto return err } - log.Info().Msgf("copying image %s:%s to %s", upstreamImageRef.DockerReference(), tag, localCachePath) + log.Info().Msgf("copying image %s to %s", upstreamImageRef.DockerReference(), localCachePath) if err = retry.RetryIfNecessary(context.Background(), func() error { _, err = copy.Image(context.Background(), policyCtx, localImageRef, upstreamImageRef, &options) return err }, retryOptions); err != nil { - log.Error().Err(err).Msgf("error while copying image %s:%s to %s", - upstreamImageRef.DockerReference(), tag, localCachePath) + log.Error().Err(err).Msgf("error while copying image %s to %s", + upstreamImageRef.DockerReference(), localCachePath) return err } @@ -397,7 +407,7 @@ func syncRegistry(regCfg RegistryConfig, upstreamURL string, storeController sto return err }, retryOptions); err != nil { - log.Error().Err(err).Msgf("couldn't copy image signature %s:%s", upstreamImageRef.DockerReference(), tag) + log.Error().Err(err).Msgf("couldn't copy image signature %s", upstreamImageRef.DockerReference()) } } diff --git a/pkg/extensions/sync/sync_internal_test.go b/pkg/extensions/sync/sync_internal_test.go index 890c345c..68978acd 100644 --- a/pkg/extensions/sync/sync_internal_test.go +++ b/pkg/extensions/sync/sync_internal_test.go @@ -24,7 +24,7 @@ import ( "zotregistry.io/zot/pkg/extensions/monitoring" "zotregistry.io/zot/pkg/log" "zotregistry.io/zot/pkg/storage" - . "zotregistry.io/zot/pkg/test" + "zotregistry.io/zot/pkg/test" ) const ( @@ -88,8 +88,8 @@ func TestSyncInternal(t *testing.T) { var tlsVerify bool updateDuration := time.Microsecond - port := GetFreePort() - baseURL := GetBaseURL(port) + port := test.GetFreePort() + baseURL := test.GetBaseURL(port) syncRegistryConfig := RegistryConfig{ Content: []Content{ { @@ -119,14 +119,14 @@ func TestSyncInternal(t *testing.T) { Prefix: testImage, }, }, - URLs: []string{BaseURL}, + URLs: []string{test.BaseURL}, PollInterval: updateDuration, TLSVerify: &tlsVerify, CertDir: "/tmp/missing_certs/a/b/c/d/z", } - port := GetFreePort() - baseURL := GetBaseURL(port) + port := test.GetFreePort() + baseURL := test.GetBaseURL(port) httpClient, err := getHTTPClient(&syncRegistryConfig, baseURL, Credentials{}, log.NewLogger("debug", "")) So(err, ShouldNotBeNil) @@ -149,9 +149,9 @@ func TestSyncInternal(t *testing.T) { var tlsVerify bool updateDuration := time.Microsecond - port := GetFreePort() - baseURL := GetBaseURL(port) - baseSecureURL := GetSecureBaseURL(port) + port := test.GetFreePort() + baseURL := test.GetBaseURL(port) + baseSecureURL := test.GetSecureBaseURL(port) syncRegistryConfig := RegistryConfig{ Content: []Content{ @@ -187,7 +187,7 @@ func TestSyncInternal(t *testing.T) { _, err = getUpstreamCatalog(httpClient, "http://invalid:5000", log.NewLogger("debug", "")) So(err, ShouldNotBeNil) - syncRegistryConfig.URLs = []string{BaseURL} + syncRegistryConfig.URLs = []string{test.BaseURL} httpClient, err = getHTTPClient(&syncRegistryConfig, baseSecureURL, Credentials{}, log.NewLogger("debug", "")) So(err, ShouldNotBeNil) So(httpClient, ShouldBeNil) @@ -229,6 +229,51 @@ func TestSyncInternal(t *testing.T) { So(err, ShouldNotBeNil) }) + Convey("Test canSkipImage()", t, func() { + storageDir, err := ioutil.TempDir("", "oci-dest-repo-test") + if err != nil { + panic(err) + } + + err = test.CopyFiles("../../../test/data", storageDir) + if err != nil { + panic(err) + } + + defer os.RemoveAll(storageDir) + + log := log.Logger{Logger: zerolog.New(os.Stdout)} + metrics := monitoring.NewMetricsServer(false, log) + + imageStore := storage.NewImageStore(storageDir, false, false, false, log, metrics) + + repoRefStr := fmt.Sprintf("%s/%s", host, testImage) + repoRef, err := parseRepositoryReference(repoRefStr) + So(err, ShouldBeNil) + So(repoRef, ShouldNotBeNil) + + taggedRef, err := reference.WithTag(repoRef, testImageTag) + So(err, ShouldBeNil) + So(taggedRef, ShouldNotBeNil) + + upstreamRef, err := docker.NewReference(taggedRef) + So(err, ShouldBeNil) + So(taggedRef, ShouldNotBeNil) + + canBeSkipped, err := canSkipImage(testImage, testImageTag, upstreamRef, imageStore, &types.SystemContext{}, log) + So(err, ShouldNotBeNil) + So(canBeSkipped, ShouldBeFalse) + + err = os.Chmod(path.Join(imageStore.RootDir(), testImage, "index.json"), 0o000) + if err != nil { + panic(err) + } + + canBeSkipped, err = canSkipImage(testImage, testImageTag, upstreamRef, imageStore, &types.SystemContext{}, log) + So(err, ShouldNotBeNil) + So(canBeSkipped, ShouldBeFalse) + }) + Convey("Test filterRepos()", t, func() { repos := []string{"repo", "repo1", "repo2", "repo/repo2", "repo/repo2/repo3/repo4"} contents := []Content{ @@ -284,7 +329,7 @@ func TestSyncInternal(t *testing.T) { panic(err) } - err = CopyFiles("../../../test/data", testRootDir) + err = test.CopyFiles("../../../test/data", testRootDir) if err != nil { panic(err) } diff --git a/pkg/extensions/sync/sync_test.go b/pkg/extensions/sync/sync_test.go index 267afcbc..8c027d4e 100644 --- a/pkg/extensions/sync/sync_test.go +++ b/pkg/extensions/sync/sync_test.go @@ -555,8 +555,8 @@ func TestPeriodically(t *testing.T) { }) } -func TestPermsDenied(t *testing.T) { - Convey("Verify sync feature without perm on sync cache", t, func() { +func TestOnDemandPermsDenied(t *testing.T) { + Convey("Verify sync on demand feature without perm on sync cache", t, func() { updateDuration, _ := time.ParseDuration("30m") sctlr, srcBaseURL, srcDir, _, _ := startUpstreamServer(false, false) @@ -593,19 +593,58 @@ func TestPermsDenied(t *testing.T) { Registries: []sync.RegistryConfig{syncRegistryConfig}, } - dctlr, destBaseURL, destDir, destClient := startDownstreamServer(false, syncConfig) + destPort := test.GetFreePort() + destConfig := config.New() + destBaseURL := test.GetBaseURL(destPort) + + destConfig.HTTP.Port = destPort + + destDir, err := ioutil.TempDir("", "oci-dest-repo-test") + if err != nil { + panic(err) + } + defer os.RemoveAll(destDir) + destConfig.Storage.RootDirectory = destDir + + destConfig.Extensions = &extconf.ExtensionConfig{} + destConfig.Extensions.Search = nil + destConfig.Extensions.Sync = syncConfig + + dctlr := api.NewController(destConfig) + defer func() { dctlr.Shutdown() }() - err := os.Chmod(path.Join(destDir, testImage, sync.SyncBlobUploadDir), 0o000) - if err != nil { - panic(err) + go func() { + // this blocks + if err := dctlr.Run(); err != nil { + return + } + }() + + for { + err = os.Chmod(path.Join(destDir, testImage, sync.SyncBlobUploadDir), 0o000) + if err != nil { + continue + } + + break } - resp, err := destClient.R().Get(destBaseURL + "/v2/" + testImage + "/manifests/" + testImageTag) + // wait till ready + for { + _, err := resty.R().Get(destBaseURL) + if err == nil { + break + } + + time.Sleep(100 * time.Millisecond) + } + + resp, err := resty.R().Get(destBaseURL + "/v2/" + testImage + "/manifests/" + testImageTag) So(err, ShouldBeNil) So(resp.StatusCode(), ShouldEqual, 404) @@ -616,6 +655,101 @@ func TestPermsDenied(t *testing.T) { }) } +func TestPeriodicallyPermsDenied(t *testing.T) { + Convey("Verify periodically sync feature without perm on sync cache", t, func() { + updateDuration, _ := time.ParseDuration("30m") + + sctlr, srcBaseURL, srcDir, _, _ := startUpstreamServer(false, false) + defer os.RemoveAll(srcDir) + + defer func() { + sctlr.Shutdown() + }() + + regex := ".*" + semver := true + var tlsVerify bool + + syncRegistryConfig := sync.RegistryConfig{ + Content: []sync.Content{ + { + Prefix: testImage, + Tags: &sync.Tags{ + Regex: ®ex, + Semver: &semver, + }, + }, + }, + URLs: []string{srcBaseURL}, + PollInterval: updateDuration, + TLSVerify: &tlsVerify, + CertDir: "", + OnDemand: true, + } + + defaultVal := true + syncConfig := &sync.Config{ + Enable: &defaultVal, + Registries: []sync.RegistryConfig{syncRegistryConfig}, + } + + destPort := test.GetFreePort() + destConfig := config.New() + destBaseURL := test.GetBaseURL(destPort) + + destConfig.HTTP.Port = destPort + + destDir, err := ioutil.TempDir("", "oci-dest-repo-test") + if err != nil { + panic(err) + } + + defer os.RemoveAll(destDir) + + destConfig.Storage.RootDirectory = destDir + + destConfig.Extensions = &extconf.ExtensionConfig{} + destConfig.Extensions.Search = nil + destConfig.Extensions.Sync = syncConfig + + dctlr := api.NewController(destConfig) + + go func() { + // this blocks + if err := dctlr.Run(); err != nil { + return + } + }() + + for { + err = os.Chmod(path.Join(destDir, testImage, sync.SyncBlobUploadDir), 0o000) + if err != nil { + continue + } + + break + } + + // wait till ready + for { + _, err := resty.R().Get(destBaseURL) + if err == nil { + break + } + + time.Sleep(100 * time.Millisecond) + } + + defer func() { + dctlr.Shutdown() + err := os.Chmod(path.Join(destDir, testImage, sync.SyncBlobUploadDir), 0o755) + if err != nil { + panic(err) + } + }() + }) +} + func TestBadTLS(t *testing.T) { Convey("Verify sync TLS feature", t, func() { updateDuration, _ := time.ParseDuration("30m") @@ -2295,6 +2429,109 @@ func TestPeriodicallySignatures(t *testing.T) { }) } +func TestPeriodicallySignaturesErr(t *testing.T) { + Convey("Verify sync signatures gives error", t, func() { + updateDuration, _ := time.ParseDuration("30m") + + sctlr, srcBaseURL, srcDir, _, _ := startUpstreamServer(false, false) + defer os.RemoveAll(srcDir) + + defer func() { + sctlr.Shutdown() + }() + + // create repo, push and sign it + repoName := testSignedImage + var digest godigest.Digest + So(func() { digest = pushRepo(srcBaseURL, repoName) }, ShouldNotPanic) + + splittedURL := strings.SplitAfter(srcBaseURL, ":") + srcPort := splittedURL[len(splittedURL)-1] + + cwd, err := os.Getwd() + So(err, ShouldBeNil) + + defer func() { _ = os.Chdir(cwd) }() + tdir, err := ioutil.TempDir("", "sigs") + So(err, ShouldBeNil) + defer os.RemoveAll(tdir) + _ = os.Chdir(tdir) + generateKeyPairs(tdir) + + So(func() { signImage(tdir, srcPort, repoName, digest) }, ShouldNotPanic) + + regex := ".*" + var semver bool + var tlsVerify bool + + syncRegistryConfig := sync.RegistryConfig{ + Content: []sync.Content{ + { + Prefix: repoName, + Tags: &sync.Tags{ + Regex: ®ex, + Semver: &semver, + }, + }, + }, + URLs: []string{srcBaseURL}, + PollInterval: updateDuration, + TLSVerify: &tlsVerify, + CertDir: "", + OnDemand: true, + } + + defaultVal := true + syncConfig := &sync.Config{ + Enable: &defaultVal, + Registries: []sync.RegistryConfig{syncRegistryConfig}, + } + + // test negative cases (trigger errors) + // test notary signatures errors + + // based on manifest digest get referrers + getReferrersURL := srcBaseURL + path.Join("/oras/artifacts/v1/", repoName, "manifests", digest.String(), "referrers") + + resp, err := resty.R(). + SetHeader("Content-Type", "application/json"). + SetQueryParam("artifactType", "application/vnd.cncf.notary.v2.signature"). + Get(getReferrersURL) + + So(err, ShouldBeNil) + So(resp, ShouldNotBeEmpty) + + var referrers ReferenceList + + err = json.Unmarshal(resp.Body(), &referrers) + So(err, ShouldBeNil) + + // read manifest + var nm artifactspec.Manifest + for _, ref := range referrers.References { + refPath := path.Join(srcDir, repoName, "blobs", string(ref.Digest.Algorithm()), ref.Digest.Hex()) + body, err := ioutil.ReadFile(refPath) + So(err, ShouldBeNil) + + err = json.Unmarshal(body, &nm) + So(err, ShouldBeNil) + + // triggers perm denied on sig blobs + for _, blob := range nm.Blobs { + blobPath := path.Join(srcDir, repoName, "blobs", string(blob.Digest.Algorithm()), blob.Digest.Hex()) + err := os.Chmod(blobPath, 0o000) + So(err, ShouldBeNil) + } + } + + dctlr, _, destDir, _ := startDownstreamServer(false, syncConfig) + defer func() { + dctlr.Shutdown() + defer os.RemoveAll(destDir) + }() + }) +} + func TestOnDemandRetryGoroutine(t *testing.T) { Convey("Verify ondemand sync retries in background on error", t, func() { srcPort := test.GetFreePort() @@ -2917,6 +3154,277 @@ func TestOnlySignaturesOnDemand(t *testing.T) { }) } +func TestSyncOnlyDiff(t *testing.T) { + Convey("Verify sync only difference between local and upstream", t, func() { + updateDuration, _ := time.ParseDuration("30m") + + sctlr, srcBaseURL, srcDir, _, _ := startUpstreamServer(false, false) + defer os.RemoveAll(srcDir) + + defer func() { + sctlr.Shutdown() + }() + + var tlsVerify bool + + syncRegistryConfig := sync.RegistryConfig{ + Content: []sync.Content{ + { + Prefix: "**", + }, + }, + URLs: []string{srcBaseURL}, + PollInterval: updateDuration, + TLSVerify: &tlsVerify, + CertDir: "", + OnDemand: false, + } + + defaultVal := true + syncConfig := &sync.Config{ + Enable: &defaultVal, + Registries: []sync.RegistryConfig{syncRegistryConfig}, + } + + destPort := test.GetFreePort() + destConfig := config.New() + destBaseURL := test.GetBaseURL(destPort) + destConfig.HTTP.Port = destPort + + destDir, err := ioutil.TempDir("", "oci-dest-repo-test") + if err != nil { + panic(err) + } + + // copy images so we have them before syncing, sync should not pull them again + err = test.CopyFiles("../../../test/data", destDir) + if err != nil { + panic(err) + } + + destConfig.Storage.RootDirectory = destDir + destConfig.Storage.Dedupe = false + destConfig.Storage.GC = false + + destConfig.Extensions = &extconf.ExtensionConfig{} + destConfig.Extensions.Search = nil + destConfig.Extensions.Sync = syncConfig + + dctlr := api.NewController(destConfig) + + go func() { + // this blocks + if err := dctlr.Run(); err != nil { + return + } + }() + + // wait till ready + for { + _, err := resty.R().Get(destBaseURL) + if err == nil { + break + } + + time.Sleep(100 * time.Millisecond) + } + + defer func() { + dctlr.Shutdown() + os.RemoveAll(destDir) + }() + + // watch .sync subdir, shouldn't be populated + done := make(chan bool) + var isPopulated bool + go func() { + for { + select { + case <-done: + return + default: + _, err := os.ReadDir(path.Join(destDir, testImage, ".sync")) + if err == nil { + isPopulated = true + } + time.Sleep(200 * time.Millisecond) + } + } + }() + + resp, err := resty.R().Get(destBaseURL + "/v2/" + testImage + "/manifests/" + testImageTag) + So(err, ShouldBeNil) + So(resp.StatusCode(), ShouldEqual, 200) + + time.Sleep(2 * time.Second) + + done <- true + So(isPopulated, ShouldBeFalse) + }) +} + +func TestSyncWithDiffDigest(t *testing.T) { + Convey("Verify sync correctly detects changes in upstream images", t, func() { + updateDuration, _ := time.ParseDuration("30m") + + sctlr, srcBaseURL, srcDir, _, _ := startUpstreamServer(false, false) + defer os.RemoveAll(srcDir) + + defer func() { + sctlr.Shutdown() + }() + + var tlsVerify bool + + syncRegistryConfig := sync.RegistryConfig{ + Content: []sync.Content{ + { + Prefix: "**", + }, + }, + URLs: []string{srcBaseURL}, + PollInterval: updateDuration, + TLSVerify: &tlsVerify, + CertDir: "", + OnDemand: false, + } + + defaultVal := true + syncConfig := &sync.Config{ + Enable: &defaultVal, + Registries: []sync.RegistryConfig{syncRegistryConfig}, + } + + destPort := test.GetFreePort() + destConfig := config.New() + destBaseURL := test.GetBaseURL(destPort) + destConfig.HTTP.Port = destPort + + destDir, err := ioutil.TempDir("", "oci-dest-repo-test") + if err != nil { + panic(err) + } + + // copy images so we have them before syncing, sync should not pull them again + err = test.CopyFiles("../../../test/data", destDir) + if err != nil { + panic(err) + } + + destConfig.Storage.RootDirectory = destDir + destConfig.Storage.Dedupe = false + destConfig.Storage.GC = false + + destConfig.Extensions = &extconf.ExtensionConfig{} + destConfig.Extensions.Search = nil + destConfig.Extensions.Sync = syncConfig + + dctlr := api.NewController(destConfig) + + // before starting downstream server, let's modify an image manifest so that sync should pull it + // change digest of the manifest so that sync should happen + size := 5 * 1024 * 1024 + blob := make([]byte, size) + digest := godigest.FromBytes(blob) + + resp, err := resty.R().Get(srcBaseURL + "/v2/" + testImage + "/manifests/" + testImageTag) + So(err, ShouldBeNil) + So(resp.StatusCode(), ShouldEqual, 200) + + manifestBlob := resp.Body() + + var manifest ispec.Manifest + + err = json.Unmarshal(manifestBlob, &manifest) + So(err, ShouldBeNil) + + resp, err = resty.R().Post(srcBaseURL + "/v2/" + testImage + "/blobs/uploads/") + So(err, ShouldBeNil) + So(resp, ShouldNotBeNil) + So(resp.StatusCode(), ShouldEqual, http.StatusAccepted) + + loc := resp.Header().Get("Location") + + resp, err = resty.R(). + SetHeader("Content-Length", fmt.Sprintf("%d", len(blob))). + SetHeader("Content-Type", "application/octet-stream"). + SetQueryParam("digest", digest.String()). + SetBody(blob). + Put(srcBaseURL + loc) + So(err, ShouldBeNil) + So(resp, ShouldNotBeNil) + + newLayer := ispec.Descriptor{ + MediaType: ispec.MediaTypeImageLayer, + Digest: digest, + Size: int64(size), + } + + manifest.Layers = append(manifest.Layers, newLayer) + + manifestBody, err := json.Marshal(manifest) + if err != nil { + panic(err) + } + + resp, err = resty.R().SetHeader("Content-type", "application/vnd.oci.image.manifest.v1+json"). + SetBody(manifestBody). + Put(srcBaseURL + "/v2/" + testImage + "/manifests/" + testImageTag) + So(err, ShouldBeNil) + So(resp, ShouldNotBeNil) + So(resp.StatusCode(), ShouldEqual, 201) + + go func() { + // this blocks + if err := dctlr.Run(); err != nil { + return + } + }() + + // watch .sync subdir, shouldn't be populated + done := make(chan bool) + var isPopulated bool + go func() { + for { + select { + case <-done: + return + default: + _, err := os.ReadDir(path.Join(destDir, testImage, ".sync")) + if err == nil { + isPopulated = true + } + time.Sleep(200 * time.Millisecond) + } + } + }() + + defer func() { + dctlr.Shutdown() + os.RemoveAll(destDir) + }() + + // wait till ready + for { + _, err := resty.R().Get(destBaseURL) + if err == nil { + break + } + + time.Sleep(100 * time.Millisecond) + } + + resp, err = resty.R().Get(destBaseURL + "/v2/" + testImage + "/manifests/" + testImageTag) + So(err, ShouldBeNil) + So(resp.StatusCode(), ShouldEqual, 200) + + time.Sleep(5 * time.Second) + + done <- true + So(isPopulated, ShouldBeTrue) + }) +} + func generateKeyPairs(tdir string) { // generate a keypair os.Setenv("COSIGN_PASSWORD", "") diff --git a/pkg/extensions/sync/utils.go b/pkg/extensions/sync/utils.go index 7fb1689d..70e04b7a 100644 --- a/pkg/extensions/sync/utils.go +++ b/pkg/extensions/sync/utils.go @@ -1,9 +1,11 @@ package sync import ( + "context" "crypto/tls" "crypto/x509" "encoding/json" + "errors" "fmt" "io/ioutil" "net/url" @@ -21,7 +23,7 @@ import ( ispec "github.com/opencontainers/image-spec/specs-go/v1" artifactspec "github.com/oras-project/artifacts-spec/specs-go/v1" "gopkg.in/resty.v1" - "zotregistry.io/zot/errors" + zerr "zotregistry.io/zot/errors" "zotregistry.io/zot/pkg/common" "zotregistry.io/zot/pkg/extensions/monitoring" "zotregistry.io/zot/pkg/log" @@ -58,7 +60,7 @@ func parseRepositoryReference(input string) (reference.Named, error) { } if !reference.IsNameOnly(ref) { - return nil, errors.ErrInvalidRepositoryName + return nil, zerr.ErrInvalidRepositoryName } return ref, nil @@ -119,7 +121,7 @@ func getHTTPClient(regCfg *RegistryConfig, upstreamURL string, credentials Crede client := resty.New() if !common.Contains(regCfg.URLs, upstreamURL) { - return nil, errors.ErrSyncInvalidUpstreamURL + return nil, zerr.ErrSyncInvalidUpstreamURL } registryURL, err := url.Parse(upstreamURL) @@ -227,7 +229,7 @@ func syncCosignSignature(client *resty.Client, storeController storage.StoreCont if resp.IsError() { log.Info().Msgf("couldn't find cosign blob from %s, status code: %d", getBlobURL.String(), resp.StatusCode()) - return errors.ErrBadBlobDigest + return zerr.ErrBadBlobDigest } defer resp.RawBody().Close() @@ -256,7 +258,7 @@ func syncCosignSignature(client *resty.Client, storeController storage.StoreCont if resp.IsError() { log.Info().Msgf("couldn't find cosign config blob from %s, status code: %d", getBlobURL.String(), resp.StatusCode()) - return errors.ErrBadBlobDigest + return zerr.ErrBadBlobDigest } defer resp.RawBody().Close() @@ -360,7 +362,7 @@ func syncNotarySignature(client *resty.Client, storeController storage.StoreCont log.Info().Msgf("couldn't find notary blob from %s, status code: %d", getBlobURL.String(), resp.StatusCode()) - return errors.ErrBadBlobDigest + return zerr.ErrBadBlobDigest } _, _, err = imageStore.FullBlobUpload(repo, resp.RawBody(), blob.Digest.String()) @@ -407,17 +409,17 @@ func syncSignatures(client *resty.Client, storeController storage.StoreControlle digests, ok := resp.Header()["Docker-Content-Digest"] if !ok { - log.Error().Err(errors.ErrBadBlobDigest).Str("url", getManifestURL.String()). + log.Error().Err(zerr.ErrBadBlobDigest).Str("url", getManifestURL.String()). Msgf("couldn't get digest for manifest: %s:%s", repo, tag) - return errors.ErrBadBlobDigest + return zerr.ErrBadBlobDigest } if len(digests) != 1 { - log.Error().Err(errors.ErrBadBlobDigest).Str("url", getManifestURL.String()). + log.Error().Err(zerr.ErrBadBlobDigest).Str("url", getManifestURL.String()). Msgf("multiple digests found for: %s:%s", repo, tag) - return errors.ErrBadBlobDigest + return zerr.ErrBadBlobDigest } err = syncNotarySignature(client, storeController, *regURL, repo, digests[0], log) @@ -573,3 +575,34 @@ func getLocalImageRef(localCachePath, repo, tag string) (types.ImageReference, e return localImageRef, nil } + +// canSkipImage returns whether or not the image can be skipped from syncing. +func canSkipImage(repo, tag string, upstreamRef types.ImageReference, + imageStore storage.ImageStore, upstreamCtx *types.SystemContext, log log.Logger) (bool, error) { + // filter already pulled images + _, localImageDigest, _, err := imageStore.GetImageManifest(repo, tag) + if err != nil { + if errors.Is(err, zerr.ErrRepoNotFound) || errors.Is(err, zerr.ErrManifestNotFound) { + return false, nil + } + + log.Error().Err(err).Msgf("couldn't get local image %s:%s manifest", repo, tag) + + return false, err + } + + upstreamImageDigest, err := docker.GetDigest(context.Background(), upstreamCtx, upstreamRef) + if err != nil { + log.Error().Err(err).Msgf("couldn't get upstream image %s manifest", upstreamRef.DockerReference()) + + return false, err + } + + if localImageDigest == string(upstreamImageDigest) { + log.Info().Msgf("skipping syncing %s:%s, image already synced", repo, tag) + + return true, nil + } + + return false, nil +}