feat(scheduler): pass the shutdown/reload ctx to running tasks (#1671)

Signed-off-by: Petu Eusebiu <peusebiu@cisco.com>
This commit is contained in:
peusebiu
2023-09-05 19:48:56 +03:00
committed by GitHub
parent a0290b4b37
commit 59dc4c3229
28 changed files with 190 additions and 143 deletions
+4 -3
View File
@@ -1,6 +1,7 @@
package client
import (
"context"
"net/http"
"net/url"
"sync"
@@ -71,7 +72,7 @@ func (httpClient *Client) SetConfig(config Config) error {
}
func (httpClient *Client) IsAvailable() bool {
_, _, statusCode, err := httpClient.MakeGetRequest(nil, "", "/v2/")
_, _, statusCode, err := httpClient.MakeGetRequest(context.Background(), nil, "", "/v2/")
if err != nil || statusCode != http.StatusOK {
return false
}
@@ -79,7 +80,7 @@ func (httpClient *Client) IsAvailable() bool {
return true
}
func (httpClient *Client) MakeGetRequest(resultPtr interface{}, mediaType string,
func (httpClient *Client) MakeGetRequest(ctx context.Context, resultPtr interface{}, mediaType string,
route ...string,
) ([]byte, string, int, error) {
httpClient.lock.RLock()
@@ -93,7 +94,7 @@ func (httpClient *Client) MakeGetRequest(resultPtr interface{}, mediaType string
url.RawQuery = url.Query().Encode()
body, mediaType, statusCode, err := common.MakeHTTPGetRequest(httpClient.client, httpClient.config.Username,
body, mediaType, statusCode, err := common.MakeHTTPGetRequest(ctx, httpClient.client, httpClient.config.Username,
httpClient.config.Password, resultPtr,
url.String(), mediaType, httpClient.log)
+10 -7
View File
@@ -45,7 +45,7 @@ func (onDemand *BaseOnDemand) Add(service Service) {
onDemand.services = append(onDemand.services, service)
}
func (onDemand *BaseOnDemand) SyncImage(repo, reference string) error {
func (onDemand *BaseOnDemand) SyncImage(ctx context.Context, repo, reference string) error {
req := request{
repo: repo,
reference: reference,
@@ -73,7 +73,7 @@ func (onDemand *BaseOnDemand) SyncImage(repo, reference string) error {
defer onDemand.requestStore.Delete(req)
defer close(syncResult)
go onDemand.syncImage(repo, reference, syncResult)
go onDemand.syncImage(ctx, repo, reference, syncResult)
err, ok := <-syncResult
if !ok {
@@ -83,7 +83,9 @@ func (onDemand *BaseOnDemand) SyncImage(repo, reference string) error {
return err
}
func (onDemand *BaseOnDemand) SyncReference(repo string, subjectDigestStr string, referenceType string) error {
func (onDemand *BaseOnDemand) SyncReference(ctx context.Context, repo string,
subjectDigestStr string, referenceType string,
) error {
var err error
for _, service := range onDemand.services {
@@ -92,7 +94,7 @@ func (onDemand *BaseOnDemand) SyncReference(repo string, subjectDigestStr string
return err
}
err = service.SyncReference(repo, subjectDigestStr, referenceType)
err = service.SyncReference(ctx, repo, subjectDigestStr, referenceType)
if err != nil {
continue
} else {
@@ -103,7 +105,7 @@ func (onDemand *BaseOnDemand) SyncReference(repo string, subjectDigestStr string
return err
}
func (onDemand *BaseOnDemand) syncImage(repo, reference string, syncResult chan error) {
func (onDemand *BaseOnDemand) syncImage(ctx context.Context, repo, reference string, syncResult chan error) {
var err error
for serviceID, service := range onDemand.services {
err = service.SetNextAvailableURL()
@@ -113,7 +115,7 @@ func (onDemand *BaseOnDemand) syncImage(repo, reference string, syncResult chan
return
}
err = service.SyncImage(repo, reference)
err = service.SyncImage(ctx, repo, reference)
if err != nil {
if errors.Is(err, zerr.ErrManifestNotFound) ||
errors.Is(err, zerr.ErrSyncImageFilteredOut) ||
@@ -150,8 +152,9 @@ func (onDemand *BaseOnDemand) syncImage(repo, reference string, syncResult chan
time.Sleep(retryOptions.Delay)
// retrying in background, can't use the same context which should be cancelled by now.
if err = retry.RetryIfNecessary(context.Background(), func() error {
err := service.SyncImage(repo, reference)
err := service.SyncImage(context.Background(), repo, reference)
return err
}, retryOptions); err != nil {
+6 -2
View File
@@ -3,12 +3,16 @@
package sync
import "context"
type BaseOnDemand struct{}
func (onDemand *BaseOnDemand) SyncImage(repo, reference string) error {
func (onDemand *BaseOnDemand) SyncImage(ctx context.Context, repo, reference string) error {
return nil
}
func (onDemand *BaseOnDemand) SyncReference(repo string, subjectDigestStr string, referenceType string) error {
func (onDemand *BaseOnDemand) SyncReference(ctx context.Context, repo string, subjectDigestStr string,
referenceType string,
) error {
return nil
}
+11 -8
View File
@@ -4,6 +4,7 @@
package references
import (
"context"
"errors"
"fmt"
"net/http"
@@ -45,9 +46,9 @@ func (ref CosignReference) Name() string {
return constants.Cosign
}
func (ref CosignReference) IsSigned(upstreamRepo, subjectDigestStr string) bool {
func (ref CosignReference) IsSigned(ctx context.Context, upstreamRepo, subjectDigestStr string) bool {
cosignSignatureTag := getCosignSignatureTagFromSubjectDigest(subjectDigestStr)
_, _, err := ref.getManifest(upstreamRepo, cosignSignatureTag)
_, _, err := ref.getManifest(ctx, upstreamRepo, cosignSignatureTag)
return err == nil
}
@@ -85,13 +86,15 @@ func (ref CosignReference) canSkipReferences(localRepo, digest string, manifest
return true, nil
}
func (ref CosignReference) SyncReferences(localRepo, remoteRepo, subjectDigestStr string) ([]godigest.Digest, error) {
func (ref CosignReference) SyncReferences(ctx context.Context, localRepo, remoteRepo, subjectDigestStr string) (
[]godigest.Digest, error,
) {
cosignTags := getCosignTagsFromSubjectDigest(subjectDigestStr)
refsDigests := make([]godigest.Digest, 0, len(cosignTags))
for _, cosignTag := range cosignTags {
manifest, manifestBuf, err := ref.getManifest(remoteRepo, cosignTag)
manifest, manifestBuf, err := ref.getManifest(ctx, remoteRepo, cosignTag)
if err != nil {
if errors.Is(err, zerr.ErrSyncReferrerNotFound) {
continue
@@ -120,13 +123,13 @@ func (ref CosignReference) SyncReferences(localRepo, remoteRepo, subjectDigestSt
Msg("syncing cosign reference for image")
for _, blob := range manifest.Layers {
if err := syncBlob(ref.client, imageStore, localRepo, remoteRepo, blob.Digest, ref.log); err != nil {
if err := syncBlob(ctx, ref.client, imageStore, localRepo, remoteRepo, blob.Digest, ref.log); err != nil {
return refsDigests, err
}
}
// sync config blob
if err := syncBlob(ref.client, imageStore, localRepo, remoteRepo, manifest.Config.Digest, ref.log); err != nil {
if err := syncBlob(ctx, ref.client, imageStore, localRepo, remoteRepo, manifest.Config.Digest, ref.log); err != nil {
return refsDigests, err
}
@@ -181,10 +184,10 @@ func (ref CosignReference) SyncReferences(localRepo, remoteRepo, subjectDigestSt
return refsDigests, nil
}
func (ref CosignReference) getManifest(repo, cosignTag string) (*ispec.Manifest, []byte, error) {
func (ref CosignReference) getManifest(ctx context.Context, repo, cosignTag string) (*ispec.Manifest, []byte, error) {
var cosignManifest ispec.Manifest
body, _, statusCode, err := ref.client.MakeGetRequest(&cosignManifest, ispec.MediaTypeImageManifest,
body, _, statusCode, err := ref.client.MakeGetRequest(ctx, &cosignManifest, ispec.MediaTypeImageManifest,
"v2", repo, "manifests", cosignTag)
if err != nil {
if statusCode == http.StatusNotFound {
+15 -12
View File
@@ -4,6 +4,7 @@
package references
import (
"context"
"encoding/json"
"errors"
"fmt"
@@ -45,9 +46,9 @@ func (ref OciReferences) Name() string {
return constants.OCI
}
func (ref OciReferences) IsSigned(remoteRepo, subjectDigestStr string) bool {
func (ref OciReferences) IsSigned(ctx context.Context, remoteRepo, subjectDigestStr string) bool {
// use artifactTypeFilter
index, err := ref.getIndex(remoteRepo, subjectDigestStr)
index, err := ref.getIndex(ctx, remoteRepo, subjectDigestStr)
if err != nil {
return false
}
@@ -92,10 +93,12 @@ func (ref OciReferences) canSkipReferences(localRepo, subjectDigestStr string, i
return true, nil
}
func (ref OciReferences) SyncReferences(localRepo, remoteRepo, subjectDigestStr string) ([]godigest.Digest, error) {
func (ref OciReferences) SyncReferences(ctx context.Context, localRepo, remoteRepo, subjectDigestStr string) (
[]godigest.Digest, error,
) {
refsDigests := make([]godigest.Digest, 0, 10)
index, err := ref.getIndex(remoteRepo, subjectDigestStr)
index, err := ref.getIndex(ctx, remoteRepo, subjectDigestStr)
if err != nil {
return refsDigests, err
}
@@ -122,7 +125,7 @@ func (ref OciReferences) SyncReferences(localRepo, remoteRepo, subjectDigestStr
Msg("syncing oci references for image")
for _, referrer := range index.Manifests {
referenceBuf, referenceDigest, err := syncManifest(ref.client, imageStore, localRepo, remoteRepo,
referenceBuf, referenceDigest, err := syncManifest(ctx, ref.client, imageStore, localRepo, remoteRepo,
referrer, subjectDigestStr, ref.log)
if err != nil {
return refsDigests, err
@@ -168,10 +171,10 @@ func (ref OciReferences) SyncReferences(localRepo, remoteRepo, subjectDigestStr
return refsDigests, nil
}
func (ref OciReferences) getIndex(repo, subjectDigestStr string) (ispec.Index, error) {
func (ref OciReferences) getIndex(ctx context.Context, repo, subjectDigestStr string) (ispec.Index, error) {
var index ispec.Index
_, _, statusCode, err := ref.client.MakeGetRequest(&index, ispec.MediaTypeImageIndex,
_, _, statusCode, err := ref.client.MakeGetRequest(ctx, &index, ispec.MediaTypeImageIndex,
"v2", repo, "referrers", subjectDigestStr)
if err != nil {
if statusCode == http.StatusNotFound {
@@ -191,14 +194,14 @@ func (ref OciReferences) getIndex(repo, subjectDigestStr string) (ispec.Index, e
return index, nil
}
func syncManifest(client *client.Client, imageStore storageTypes.ImageStore, localRepo, remoteRepo string,
desc ispec.Descriptor, subjectDigestStr string, log log.Logger,
func syncManifest(ctx context.Context, client *client.Client, imageStore storageTypes.ImageStore, localRepo,
remoteRepo string, desc ispec.Descriptor, subjectDigestStr string, log log.Logger,
) ([]byte, godigest.Digest, error) {
var manifest ispec.Manifest
var refDigest godigest.Digest
OCIRefBuf, _, statusCode, err := client.MakeGetRequest(&manifest, ispec.MediaTypeImageManifest,
OCIRefBuf, _, statusCode, err := client.MakeGetRequest(ctx, &manifest, ispec.MediaTypeImageManifest,
"v2", remoteRepo, "manifests", desc.Digest.String())
if err != nil {
if statusCode == http.StatusNotFound {
@@ -226,13 +229,13 @@ func syncManifest(client *client.Client, imageStore storageTypes.ImageStore, loc
}
for _, layer := range manifest.Layers {
if err := syncBlob(client, imageStore, localRepo, remoteRepo, layer.Digest, log); err != nil {
if err := syncBlob(ctx, client, imageStore, localRepo, remoteRepo, layer.Digest, log); err != nil {
return []byte{}, refDigest, err
}
}
// sync config blob
if err := syncBlob(client, imageStore, localRepo, remoteRepo, manifest.Config.Digest, log); err != nil {
if err := syncBlob(ctx, client, imageStore, localRepo, remoteRepo, manifest.Config.Digest, log); err != nil {
return []byte{}, refDigest, err
}
} else {
+10 -7
View File
@@ -4,6 +4,7 @@
package references
import (
"context"
"errors"
"fmt"
"net/http"
@@ -48,7 +49,7 @@ func (ref ORASReferences) Name() string {
return constants.Oras
}
func (ref ORASReferences) IsSigned(remoteRepo, subjectDigestStr string) bool {
func (ref ORASReferences) IsSigned(ctx context.Context, remoteRepo, subjectDigestStr string) bool {
return false
}
@@ -85,10 +86,12 @@ func (ref ORASReferences) canSkipReferences(localRepo, subjectDigestStr string,
return true, nil
}
func (ref ORASReferences) SyncReferences(localRepo, remoteRepo, subjectDigestStr string) ([]godigest.Digest, error) {
func (ref ORASReferences) SyncReferences(ctx context.Context, localRepo, remoteRepo, subjectDigestStr string) (
[]godigest.Digest, error,
) {
refsDigests := make([]godigest.Digest, 0, 10)
referrers, err := ref.getReferenceList(remoteRepo, subjectDigestStr)
referrers, err := ref.getReferenceList(ctx, remoteRepo, subjectDigestStr)
if err != nil {
return refsDigests, err
}
@@ -115,7 +118,7 @@ func (ref ORASReferences) SyncReferences(localRepo, remoteRepo, subjectDigestStr
for _, referrer := range referrers.References {
var artifactManifest oras.Manifest
orasBuf, _, statusCode, err := ref.client.MakeGetRequest(&artifactManifest, oras.MediaTypeDescriptor,
orasBuf, _, statusCode, err := ref.client.MakeGetRequest(ctx, &artifactManifest, oras.MediaTypeDescriptor,
"v2", remoteRepo, "manifests", referrer.Digest.String())
if err != nil {
if statusCode == http.StatusNotFound {
@@ -130,7 +133,7 @@ func (ref ORASReferences) SyncReferences(localRepo, remoteRepo, subjectDigestStr
}
for _, blob := range artifactManifest.Blobs {
if err := syncBlob(ref.client, imageStore, localRepo, remoteRepo, blob.Digest, ref.log); err != nil {
if err := syncBlob(ctx, ref.client, imageStore, localRepo, remoteRepo, blob.Digest, ref.log); err != nil {
return refsDigests, err
}
}
@@ -170,10 +173,10 @@ func (ref ORASReferences) SyncReferences(localRepo, remoteRepo, subjectDigestStr
return refsDigests, nil
}
func (ref ORASReferences) getReferenceList(repo, subjectDigestStr string) (ReferenceList, error) {
func (ref ORASReferences) getReferenceList(ctx context.Context, repo, subjectDigestStr string) (ReferenceList, error) {
var referrers ReferenceList
_, _, statusCode, err := ref.client.MakeGetRequest(&referrers, "application/json",
_, _, statusCode, err := ref.client.MakeGetRequest(ctx, &referrers, "application/json",
apiConstants.ArtifactSpecRoutePrefix, repo, "manifests", subjectDigestStr, "referrers")
if err != nil {
if statusCode == http.StatusNotFound || statusCode == http.StatusBadRequest {
+20 -15
View File
@@ -5,6 +5,7 @@ package references
import (
"bytes"
"context"
"fmt"
"net/http"
@@ -26,9 +27,9 @@ type Reference interface {
// Returns name of reference (OCIReference/CosignReference/OrasReference)
Name() string
// Returns whether or not image is signed
IsSigned(upstreamRepo, subjectDigestStr string) bool
IsSigned(ctx context.Context, upstreamRepo, subjectDigestStr string) bool
// Sync recursively all references for a subject digest (can be image/artifacts/signatures)
SyncReferences(localRepo, upstreamRepo, subjectDigestStr string) ([]godigest.Digest, error)
SyncReferences(ctx context.Context, localRepo, upstreamRepo, subjectDigestStr string) ([]godigest.Digest, error)
}
type References struct {
@@ -48,9 +49,9 @@ func NewReferences(httpClient *client.Client, storeController storage.StoreContr
return refs
}
func (refs References) IsSigned(upstreamRepo, subjectDigestStr string) bool {
func (refs References) IsSigned(ctx context.Context, upstreamRepo, subjectDigestStr string) bool {
for _, ref := range refs.referenceList {
ok := ref.IsSigned(upstreamRepo, subjectDigestStr)
ok := ref.IsSigned(ctx, upstreamRepo, subjectDigestStr)
if ok {
return true
}
@@ -59,13 +60,15 @@ func (refs References) IsSigned(upstreamRepo, subjectDigestStr string) bool {
return false
}
func (refs References) SyncAll(localRepo, upstreamRepo, subjectDigestStr string) error {
func (refs References) SyncAll(ctx context.Context, localRepo, upstreamRepo, subjectDigestStr string) error {
seen := &[]godigest.Digest{}
return refs.syncAll(localRepo, upstreamRepo, subjectDigestStr, seen)
return refs.syncAll(ctx, localRepo, upstreamRepo, subjectDigestStr, seen)
}
func (refs References) syncAll(localRepo, upstreamRepo, subjectDigestStr string, seen *[]godigest.Digest) error {
func (refs References) syncAll(ctx context.Context, localRepo, upstreamRepo,
subjectDigestStr string, seen *[]godigest.Digest,
) error {
var err error
var syncedRefsDigests []godigest.Digest
@@ -75,7 +78,7 @@ func (refs References) syncAll(localRepo, upstreamRepo, subjectDigestStr string,
// for each reference type(cosign/oci/oras reference)
for _, ref := range refs.referenceList {
syncedRefsDigests, err = ref.SyncReferences(localRepo, upstreamRepo, subjectDigestStr)
syncedRefsDigests, err = ref.SyncReferences(ctx, localRepo, upstreamRepo, subjectDigestStr)
if err != nil {
refs.log.Debug().Err(err).
Str("reference type", ref.Name()).
@@ -87,7 +90,7 @@ func (refs References) syncAll(localRepo, upstreamRepo, subjectDigestStr string,
for _, refDigest := range syncedRefsDigests {
if !common.Contains(*seen, refDigest) {
// sync all references pointing to this one
err = refs.syncAll(localRepo, upstreamRepo, refDigest.String(), seen)
err = refs.syncAll(ctx, localRepo, upstreamRepo, refDigest.String(), seen)
}
}
}
@@ -95,14 +98,16 @@ func (refs References) syncAll(localRepo, upstreamRepo, subjectDigestStr string,
return err
}
func (refs References) SyncReference(localRepo, upstreamRepo, subjectDigestStr, referenceType string) error {
func (refs References) SyncReference(ctx context.Context, localRepo, upstreamRepo,
subjectDigestStr, referenceType string,
) error {
var err error
var syncedRefsDigests []godigest.Digest
for _, ref := range refs.referenceList {
if ref.Name() == referenceType {
syncedRefsDigests, err = ref.SyncReferences(localRepo, upstreamRepo, subjectDigestStr)
syncedRefsDigests, err = ref.SyncReferences(ctx, localRepo, upstreamRepo, subjectDigestStr)
if err != nil {
refs.log.Error().Err(err).
Str("reference type", ref.Name()).
@@ -113,7 +118,7 @@ func (refs References) SyncReference(localRepo, upstreamRepo, subjectDigestStr,
}
for _, refDigest := range syncedRefsDigests {
err = refs.SyncAll(localRepo, upstreamRepo, refDigest.String())
err = refs.SyncAll(ctx, localRepo, upstreamRepo, refDigest.String())
}
}
}
@@ -121,12 +126,12 @@ func (refs References) SyncReference(localRepo, upstreamRepo, subjectDigestStr,
return err
}
func syncBlob(client *client.Client, imageStore storageTypes.ImageStore, localRepo, remoteRepo string,
digest godigest.Digest, log log.Logger,
func syncBlob(ctx context.Context, client *client.Client, imageStore storageTypes.ImageStore,
localRepo, remoteRepo string, digest godigest.Digest, log log.Logger,
) error {
var resultPtr interface{}
body, _, statusCode, err := client.MakeGetRequest(resultPtr, "", "v2", remoteRepo, "blobs", digest.String())
body, _, statusCode, err := client.MakeGetRequest(ctx, resultPtr, "", "v2", remoteRepo, "blobs", digest.String())
if err != nil {
if statusCode != http.StatusOK {
log.Info().Str("repo", remoteRepo).Str("digest", digest.String()).Msg("couldn't get remote blob")
@@ -4,6 +4,7 @@
package references
import (
"context"
"errors"
"testing"
@@ -75,7 +76,7 @@ func TestOci(t *testing.T) {
},
}}, nil, log.NewLogger("debug", ""))
ok := oci.IsSigned("repo", "")
ok := oci.IsSigned(context.Background(), "repo", "")
So(ok, ShouldBeFalse)
// trigger GetReferrers err
@@ -136,11 +137,12 @@ func TestSyncManifest(t *testing.T) {
digest := godigest.FromString("test")
buf, refDigest, err := syncManifest(client, mocks.MockedImageStore{}, "repo", "repo", ispec.Descriptor{
Digest: digest,
Size: 10,
MediaType: ispec.MediaTypeImageManifest,
}, digest.String(), log.Logger{})
buf, refDigest, err := syncManifest(context.Background(), client, mocks.MockedImageStore{},
"repo", "repo", ispec.Descriptor{
Digest: digest,
Size: 10,
MediaType: ispec.MediaTypeImageManifest,
}, digest.String(), log.Logger{})
So(buf, ShouldBeEmpty)
So(refDigest, ShouldBeEmpty)
+1 -1
View File
@@ -49,7 +49,7 @@ func (registry *RemoteRegistry) GetContext() *types.SystemContext {
func (registry *RemoteRegistry) GetRepositories(ctx context.Context) ([]string, error) {
var catalog catalog
_, _, _, err := registry.client.MakeGetRequest(&catalog, "application/json", //nolint: dogsled
_, _, _, err := registry.client.MakeGetRequest(ctx, &catalog, "application/json", //nolint: dogsled
constants.RoutePrefix, constants.ExtCatalogPrefix)
if err != nil {
return []string{}, err
+22 -14
View File
@@ -203,7 +203,9 @@ func (service *BaseService) GetNextRepo(lastRepo string) (string, error) {
}
// SyncReference on demand.
func (service *BaseService) SyncReference(repo string, subjectDigestStr string, referenceType string) error {
func (service *BaseService) SyncReference(ctx context.Context, repo string,
subjectDigestStr string, referenceType string,
) error {
remoteRepo := repo
remoteURL := service.client.GetConfig().URL
@@ -221,11 +223,11 @@ func (service *BaseService) SyncReference(repo string, subjectDigestStr string,
service.log.Info().Str("remote", remoteURL).Str("repo", repo).Str("subject", subjectDigestStr).
Str("reference type", referenceType).Msg("sync: syncing reference for image")
return service.references.SyncReference(repo, remoteRepo, subjectDigestStr, referenceType)
return service.references.SyncReference(ctx, repo, remoteRepo, subjectDigestStr, referenceType)
}
// SyncImage on demand.
func (service *BaseService) SyncImage(repo, reference string) error {
func (service *BaseService) SyncImage(ctx context.Context, repo, reference string) error {
remoteRepo := repo
remoteURL := service.client.GetConfig().URL
@@ -243,12 +245,12 @@ func (service *BaseService) SyncImage(repo, reference string) error {
service.log.Info().Str("remote", remoteURL).Str("repo", repo).Str("reference", reference).
Msg("sync: syncing image")
manifestDigest, err := service.syncTag(repo, remoteRepo, reference)
manifestDigest, err := service.syncTag(ctx, repo, remoteRepo, reference)
if err != nil {
return err
}
err = service.references.SyncAll(repo, remoteRepo, manifestDigest.String())
err = service.references.SyncAll(ctx, repo, remoteRepo, manifestDigest.String())
if err != nil && !errors.Is(err, zerr.ErrSyncReferrerNotFound) {
service.log.Error().Err(err).Str("remote", remoteURL).Str("repo", repo).Str("reference", reference).
Msg("error while syncing references for image")
@@ -260,7 +262,7 @@ func (service *BaseService) SyncImage(repo, reference string) error {
}
// sync repo periodically.
func (service *BaseService) SyncRepo(repo string) error {
func (service *BaseService) SyncRepo(ctx context.Context, repo string) error {
service.log.Info().Str("repo", repo).Str("registry", service.client.GetConfig().URL).
Msg("sync: syncing repo")
@@ -268,7 +270,7 @@ func (service *BaseService) SyncRepo(repo string) error {
var tags []string
if err = retry.RetryIfNecessary(context.Background(), func() error {
if err = retry.RetryIfNecessary(ctx, func() error {
tags, err = service.remote.GetRepoTags(repo)
return err
@@ -291,14 +293,20 @@ func (service *BaseService) SyncRepo(repo string) error {
localRepo := service.contentManager.GetRepoDestination(repo)
for _, tag := range tags {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
if references.IsCosignTag(tag) {
continue
}
var manifestDigest digest.Digest
if err = retry.RetryIfNecessary(context.Background(), func() error {
manifestDigest, err = service.syncTag(localRepo, repo, tag)
if err = retry.RetryIfNecessary(ctx, func() error {
manifestDigest, err = service.syncTag(ctx, localRepo, repo, tag)
return err
}, service.retryOptions); err != nil {
@@ -314,8 +322,8 @@ func (service *BaseService) SyncRepo(repo string) error {
}
if manifestDigest != "" {
if err = retry.RetryIfNecessary(context.Background(), func() error {
err = service.references.SyncAll(localRepo, repo, manifestDigest.String())
if err = retry.RetryIfNecessary(ctx, func() error {
err = service.references.SyncAll(ctx, localRepo, repo, manifestDigest.String())
if errors.Is(err, zerr.ErrSyncReferrerNotFound) {
return nil
}
@@ -335,7 +343,7 @@ func (service *BaseService) SyncRepo(repo string) error {
return nil
}
func (service *BaseService) syncTag(localRepo, remoteRepo, tag string) (digest.Digest, error) {
func (service *BaseService) syncTag(ctx context.Context, localRepo, remoteRepo, tag string) (digest.Digest, error) {
copyOptions := getCopyOptions(service.remote.GetContext(), service.local.GetContext())
policyContext, err := getPolicyContext(service.log)
@@ -368,7 +376,7 @@ func (service *BaseService) syncTag(localRepo, remoteRepo, tag string) (digest.D
}
if service.config.OnlySigned != nil && *service.config.OnlySigned && !references.IsCosignTag(tag) {
signed := service.references.IsSigned(remoteRepo, manifestDigest.String())
signed := service.references.IsSigned(ctx, remoteRepo, manifestDigest.String())
if !signed {
// skip unsigned images
service.log.Info().Str("image", remoteImageRef.DockerReference().String()).
@@ -397,7 +405,7 @@ func (service *BaseService) syncTag(localRepo, remoteRepo, tag string) (digest.D
service.log.Info().Str("remote image", remoteImageRef.DockerReference().String()).
Str("local image", fmt.Sprintf("%s:%s", localRepo, tag)).Msg("syncing image")
_, err = copy.Image(context.Background(), policyContext, localImageRef, remoteImageRef, &copyOptions)
_, err = copy.Image(ctx, policyContext, localImageRef, remoteImageRef, &copyOptions)
if err != nil {
service.log.Error().Err(err).Str("errortype", common.TypeOf(err)).
Str("remote image", remoteImageRef.DockerReference().String()).
+6 -5
View File
@@ -23,11 +23,12 @@ type Service interface {
// Get next repo from remote /v2/_catalog, will return empty string when there is no repo left.
GetNextRepo(lastRepo string) (string, error) // used by task scheduler
// Sync a repo with all of its tags and references (signatures, artifacts, sboms) into ImageStore.
SyncRepo(repo string) error // used by periodically sync
SyncRepo(ctx context.Context, repo string) error // used by periodically sync
// Sync an image (repo:tag || repo:digest) into ImageStore.
SyncImage(repo, reference string) error // used by sync on demand
SyncImage(ctx context.Context, repo, reference string) error // used by sync on demand
// Sync a single reference for an image.
SyncReference(repo string, subjectDigestStr string, referenceType string) error // used by sync on demand
SyncReference(ctx context.Context, repo string, subjectDigestStr string,
referenceType string) error // used by sync on demand
// Remove all internal catalog entries.
ResetCatalog() // used by scheduler to empty out the catalog after a sync periodically roundtrip finishes
// Sync supports multiple urls per registry, before a sync repo/image/ref 'ping' each url.
@@ -133,6 +134,6 @@ func newSyncRepoTask(repo string, service Service) *syncRepoTask {
return &syncRepoTask{repo, service}
}
func (srt *syncRepoTask) DoWork() error {
return srt.service.SyncRepo(srt.repo)
func (srt *syncRepoTask) DoWork(ctx context.Context) error {
return srt.service.SyncRepo(ctx, srt.repo)
}
+1 -1
View File
@@ -165,7 +165,7 @@ func TestService(t *testing.T) {
service, err := New(conf, "", storage.StoreController{}, mocks.MetaDBMock{}, log.Logger{})
So(err, ShouldBeNil)
err = service.SyncRepo("repo")
err = service.SyncRepo(context.Background(), "repo")
So(err, ShouldNotBeNil)
})
}