diff --git a/pkg/api/authn.go b/pkg/api/authn.go index 7b4ffb81..f146ec16 100644 --- a/pkg/api/authn.go +++ b/pkg/api/authn.go @@ -548,9 +548,9 @@ func (rh *RouteHandler) AuthURLHandler() http.HandlerFunc { client, ok := rh.c.RelyingParties[provider] if !ok { - http.HandlerFunc(func(response http.ResponseWriter, request *http.Request) { - response.WriteHeader(http.StatusBadRequest) - })(w, r) + rh.c.Log.Error().Msg("unrecognized openid provider") + + w.WriteHeader(http.StatusBadRequest) return } diff --git a/pkg/api/controller.go b/pkg/api/controller.go index b424457f..c854384d 100644 --- a/pkg/api/controller.go +++ b/pkg/api/controller.go @@ -358,7 +358,7 @@ func (c *Controller) StartBackgroundTasks(reloadCtx context.Context) { if c.Config.Extensions != nil { ext.EnableScrubExtension(c.Config, c.Log, c.StoreController, taskScheduler) - + //nolint: contextcheck syncOnDemand, err := ext.EnableSyncExtension(c.Config, c.MetaDB, c.StoreController, taskScheduler, c.Log) if err != nil { c.Log.Error().Err(err).Msg("unable to start sync extension") @@ -372,6 +372,6 @@ func (c *Controller) StartBackgroundTasks(reloadCtx context.Context) { } type SyncOnDemand interface { - SyncImage(repo, reference string) error - SyncReference(repo string, subjectDigestStr string, referenceType string) error + SyncImage(ctx context.Context, repo, reference string) error + SyncReference(ctx context.Context, repo string, subjectDigestStr string, referenceType string) error } diff --git a/pkg/api/routes.go b/pkg/api/routes.go index d43ecf63..7a8efb35 100644 --- a/pkg/api/routes.go +++ b/pkg/api/routes.go @@ -8,6 +8,7 @@ package api import ( + "context" "encoding/json" "errors" "fmt" @@ -429,7 +430,7 @@ func (rh *RouteHandler) CheckManifest(response http.ResponseWriter, request *htt return } - content, digest, mediaType, err := getImageManifest(rh, imgStore, name, reference) //nolint:contextcheck + content, digest, mediaType, err := getImageManifest(request.Context(), rh, imgStore, name, reference) if err != nil { details := zerr.GetDetails(err) details["reference"] = reference @@ -499,7 +500,7 @@ func (rh *RouteHandler) GetManifest(response http.ResponseWriter, request *http. return } - content, digest, mediaType, err := getImageManifest(rh, imgStore, name, reference) //nolint: contextcheck + content, digest, mediaType, err := getImageManifest(request.Context(), rh, imgStore, name, reference) if err != nil { details := zerr.GetDetails(err) if errors.Is(err, zerr.ErrRepoNotFound) { //nolint:gocritic // errorslint conflicts with gocritic:IfElseChain @@ -541,7 +542,7 @@ type ImageIndex struct { ispec.Index } -func getReferrers(routeHandler *RouteHandler, +func getReferrers(ctx context.Context, routeHandler *RouteHandler, imgStore storageTypes.ImageStore, name string, digest godigest.Digest, artifactTypes []string, ) (ispec.Index, error) { @@ -551,7 +552,8 @@ func getReferrers(routeHandler *RouteHandler, routeHandler.c.Log.Info().Str("repository", name).Str("reference", digest.String()). Msg("referrers not found, trying to get reference by syncing on demand") - if errSync := routeHandler.c.SyncOnDemand.SyncReference(name, digest.String(), syncConstants.OCI); errSync != nil { + if errSync := routeHandler.c.SyncOnDemand.SyncReference(ctx, name, digest.String(), + syncConstants.OCI); errSync != nil { routeHandler.c.Log.Err(errSync).Str("repository", name).Str("reference", digest.String()). Msg("error encounter while syncing OCI reference for image") } @@ -605,7 +607,7 @@ func (rh *RouteHandler) GetReferrers(response http.ResponseWriter, request *http imgStore := rh.getImageStore(name) - referrers, err := getReferrers(rh, imgStore, name, digest, artifactTypes) + referrers, err := getReferrers(request.Context(), rh, imgStore, name, digest, artifactTypes) if err != nil { if errors.Is(err, zerr.ErrManifestNotFound) || errors.Is(err, zerr.ErrRepoNotFound) { rh.c.Log.Error().Err(err).Str("name", name).Str("digest", digest.String()).Msg("manifest not found") @@ -1913,7 +1915,7 @@ func (rh *RouteHandler) getImageStore(name string) storageTypes.ImageStore { } // will sync on demand if an image is not found, in case sync extensions is enabled. -func getImageManifest(routeHandler *RouteHandler, imgStore storageTypes.ImageStore, name, +func getImageManifest(ctx context.Context, routeHandler *RouteHandler, imgStore storageTypes.ImageStore, name, reference string, ) ([]byte, godigest.Digest, string, error) { syncEnabled := isSyncOnDemandEnabled(*routeHandler.c) @@ -1931,7 +1933,7 @@ func getImageManifest(routeHandler *RouteHandler, imgStore storageTypes.ImageSto routeHandler.c.Log.Info().Str("repository", name).Str("reference", reference). Msg("trying to get updated image by syncing on demand") - if errSync := routeHandler.c.SyncOnDemand.SyncImage(name, reference); errSync != nil { + if errSync := routeHandler.c.SyncOnDemand.SyncImage(ctx, name, reference); errSync != nil { routeHandler.c.Log.Err(errSync).Str("repository", name).Str("reference", reference). Msg("error encounter while syncing image") } @@ -1941,7 +1943,7 @@ func getImageManifest(routeHandler *RouteHandler, imgStore storageTypes.ImageSto } // will sync referrers on demand if they are not found, in case sync extensions is enabled. -func getOrasReferrers(routeHandler *RouteHandler, +func getOrasReferrers(ctx context.Context, routeHandler *RouteHandler, imgStore storageTypes.ImageStore, name string, digest godigest.Digest, artifactType string, ) ([]artifactspec.Descriptor, error) { @@ -1951,7 +1953,8 @@ func getOrasReferrers(routeHandler *RouteHandler, routeHandler.c.Log.Info().Str("repository", name).Str("reference", digest.String()). Msg("artifact not found, trying to get artifact by syncing on demand") - if errSync := routeHandler.c.SyncOnDemand.SyncReference(name, digest.String(), syncConstants.Oras); errSync != nil { + if errSync := routeHandler.c.SyncOnDemand.SyncReference(ctx, name, digest.String(), + syncConstants.Oras); errSync != nil { routeHandler.c.Log.Error().Err(err).Str("name", name).Str("digest", digest.String()). Msg("unable to get references") } @@ -2017,7 +2020,7 @@ func (rh *RouteHandler) GetOrasReferrers(response http.ResponseWriter, request * rh.c.Log.Info().Str("digest", digest.String()).Str("artifactType", artifactType).Msg("getting manifest") - refs, err := getOrasReferrers(rh, imgStore, name, digest, artifactType) //nolint:contextcheck + refs, err := getOrasReferrers(request.Context(), rh, imgStore, name, digest, artifactType) //nolint:contextcheck if err != nil { if errors.Is(err, zerr.ErrManifestNotFound) || errors.Is(err, zerr.ErrRepoNotFound) { rh.c.Log.Error().Err(err).Str("name", name).Str("digest", digest.String()).Msg("manifest not found") diff --git a/pkg/cli/root.go b/pkg/cli/root.go index ce00cbe8..b17ce6b3 100644 --- a/pkg/cli/root.go +++ b/pkg/cli/root.go @@ -124,7 +124,7 @@ func newScrubCmd(conf *config.Config) *cobra.Command { panic(err) } - result, err := ctlr.StoreController.CheckAllBlobsIntegrity() + result, err := ctlr.StoreController.CheckAllBlobsIntegrity(cmd.Context()) if err != nil { panic(err) } diff --git a/pkg/common/http_client.go b/pkg/common/http_client.go index 03a3f210..26ecedb3 100644 --- a/pkg/common/http_client.go +++ b/pkg/common/http_client.go @@ -1,6 +1,7 @@ package common import ( + "context" "crypto/tls" "crypto/x509" "encoding/json" @@ -112,10 +113,11 @@ func CreateHTTPClient(verifyTLS bool, host string, certDir string) (*http.Client }, nil } -func MakeHTTPGetRequest(httpClient *http.Client, username string, password string, resultPtr interface{}, +func MakeHTTPGetRequest(ctx context.Context, httpClient *http.Client, + username string, password string, resultPtr interface{}, blobURL string, mediaType string, log log.Logger, ) ([]byte, string, int, error) { - req, err := http.NewRequest(http.MethodGet, blobURL, nil) //nolint + req, err := http.NewRequestWithContext(ctx, http.MethodGet, blobURL, nil) //nolint if err != nil { return nil, "", 0, err } diff --git a/pkg/common/http_client_test.go b/pkg/common/http_client_test.go index be61e170..4a64cc2d 100644 --- a/pkg/common/http_client_test.go +++ b/pkg/common/http_client_test.go @@ -1,6 +1,7 @@ package common_test import ( + "context" "crypto/x509" "os" "path" @@ -75,7 +76,7 @@ func TestHTTPClient(t *testing.T) { var resultPtr interface{} httpClient, err := common.CreateHTTPClient(true, "localhost", tempDir) So(err, ShouldBeNil) - _, _, _, err = common.MakeHTTPGetRequest(httpClient, "", "", + _, _, _, err = common.MakeHTTPGetRequest(context.Background(), httpClient, "", "", resultPtr, baseURL+"/v2/", ispec.MediaTypeImageManifest, log.NewLogger("", "")) So(err, ShouldBeNil) }) diff --git a/pkg/extensions/extension_search.go b/pkg/extensions/extension_search.go index ff3d8c91..83024a51 100644 --- a/pkg/extensions/extension_search.go +++ b/pkg/extensions/extension_search.go @@ -4,6 +4,7 @@ package extensions import ( + "context" "net/http" "sync" "time" @@ -132,7 +133,7 @@ func newTrivyTask(interval time.Duration, cveInfo cveinfo.CveInfo, return &trivyTask{interval, cveInfo, generator, log} } -func (trivyT *trivyTask) DoWork() error { +func (trivyT *trivyTask) DoWork(ctx context.Context) error { trivyT.log.Info().Msg("updating the CVE database") err := trivyT.cveInfo.UpdateDB() diff --git a/pkg/extensions/imagetrust/image_trust.go b/pkg/extensions/imagetrust/image_trust.go index ca4c8f2f..42c1466c 100644 --- a/pkg/extensions/imagetrust/image_trust.go +++ b/pkg/extensions/imagetrust/image_trust.go @@ -135,7 +135,7 @@ func NewValidityTask(metaDB mTypes.MetaDB, repo mTypes.RepoMetadata, log log.Log return &validityTask{metaDB, repo, log} } -func (validityT *validityTask) DoWork() error { +func (validityT *validityTask) DoWork(ctx context.Context) error { validityT.log.Info().Msg("updating signatures validity") for signedManifest, sigs := range validityT.repo.Signatures { diff --git a/pkg/extensions/scrub/scrub.go b/pkg/extensions/scrub/scrub.go index 9a94d05d..35d7f3d6 100644 --- a/pkg/extensions/scrub/scrub.go +++ b/pkg/extensions/scrub/scrub.go @@ -4,6 +4,7 @@ package scrub import ( + "context" "fmt" "path" @@ -13,11 +14,11 @@ import ( ) // Scrub Extension for repo... -func RunScrubRepo(imgStore storageTypes.ImageStore, repo string, log log.Logger) error { +func RunScrubRepo(ctx context.Context, imgStore storageTypes.ImageStore, repo string, log log.Logger) error { execMsg := fmt.Sprintf("executing scrub to check manifest/blob integrity for %s", path.Join(imgStore.RootDir(), repo)) log.Info().Msg(execMsg) - results, err := storage.CheckRepo(repo, imgStore) + results, err := storage.CheckRepo(ctx, repo, imgStore) if err != nil { errMessage := fmt.Sprintf("error while running scrub for %s", path.Join(imgStore.RootDir(), repo)) log.Error().Err(err).Msg(errMessage) @@ -58,6 +59,6 @@ func NewTask(imgStore storageTypes.ImageStore, repo string, log log.Logger) *Tas return &Task{imgStore, repo, log} } -func (scrubT *Task) DoWork() error { - return RunScrubRepo(scrubT.imgStore, scrubT.repo, scrubT.log) +func (scrubT *Task) DoWork(ctx context.Context) error { + return RunScrubRepo(ctx, scrubT.imgStore, scrubT.repo, scrubT.log) //nolint: contextcheck } diff --git a/pkg/extensions/scrub/scrub_test.go b/pkg/extensions/scrub/scrub_test.go index 29814850..c03c6fe6 100644 --- a/pkg/extensions/scrub/scrub_test.go +++ b/pkg/extensions/scrub/scrub_test.go @@ -4,6 +4,7 @@ package scrub_test import ( + "context" "fmt" "os" "path" @@ -207,7 +208,7 @@ func TestRunScrubRepo(t *testing.T) { err = test.WriteImageToFileSystem(image, repoName, "0.0.1", srcStorageCtlr) So(err, ShouldBeNil) - err = scrub.RunScrubRepo(imgStore, repoName, log) + err = scrub.RunScrubRepo(context.Background(), imgStore, repoName, log) So(err, ShouldBeNil) data, err := os.ReadFile(logFile.Name()) @@ -250,7 +251,7 @@ func TestRunScrubRepo(t *testing.T) { panic(err) } - err = scrub.RunScrubRepo(imgStore, repoName, log) + err = scrub.RunScrubRepo(context.Background(), imgStore, repoName, log) So(err, ShouldBeNil) data, err := os.ReadFile(logFile.Name()) @@ -288,7 +289,7 @@ func TestRunScrubRepo(t *testing.T) { So(os.Chmod(path.Join(dir, repoName), 0o000), ShouldBeNil) - err = scrub.RunScrubRepo(imgStore, repoName, log) + err = scrub.RunScrubRepo(context.Background(), imgStore, repoName, log) So(err, ShouldNotBeNil) data, err := os.ReadFile(logFile.Name()) diff --git a/pkg/extensions/sync/httpclient/client.go b/pkg/extensions/sync/httpclient/client.go index eff00698..d71233d2 100644 --- a/pkg/extensions/sync/httpclient/client.go +++ b/pkg/extensions/sync/httpclient/client.go @@ -1,6 +1,7 @@ package client import ( + "context" "net/http" "net/url" "sync" @@ -71,7 +72,7 @@ func (httpClient *Client) SetConfig(config Config) error { } func (httpClient *Client) IsAvailable() bool { - _, _, statusCode, err := httpClient.MakeGetRequest(nil, "", "/v2/") + _, _, statusCode, err := httpClient.MakeGetRequest(context.Background(), nil, "", "/v2/") if err != nil || statusCode != http.StatusOK { return false } @@ -79,7 +80,7 @@ func (httpClient *Client) IsAvailable() bool { return true } -func (httpClient *Client) MakeGetRequest(resultPtr interface{}, mediaType string, +func (httpClient *Client) MakeGetRequest(ctx context.Context, resultPtr interface{}, mediaType string, route ...string, ) ([]byte, string, int, error) { httpClient.lock.RLock() @@ -93,7 +94,7 @@ func (httpClient *Client) MakeGetRequest(resultPtr interface{}, mediaType string url.RawQuery = url.Query().Encode() - body, mediaType, statusCode, err := common.MakeHTTPGetRequest(httpClient.client, httpClient.config.Username, + body, mediaType, statusCode, err := common.MakeHTTPGetRequest(ctx, httpClient.client, httpClient.config.Username, httpClient.config.Password, resultPtr, url.String(), mediaType, httpClient.log) diff --git a/pkg/extensions/sync/on_demand.go b/pkg/extensions/sync/on_demand.go index 14a73f40..06212804 100644 --- a/pkg/extensions/sync/on_demand.go +++ b/pkg/extensions/sync/on_demand.go @@ -45,7 +45,7 @@ func (onDemand *BaseOnDemand) Add(service Service) { onDemand.services = append(onDemand.services, service) } -func (onDemand *BaseOnDemand) SyncImage(repo, reference string) error { +func (onDemand *BaseOnDemand) SyncImage(ctx context.Context, repo, reference string) error { req := request{ repo: repo, reference: reference, @@ -73,7 +73,7 @@ func (onDemand *BaseOnDemand) SyncImage(repo, reference string) error { defer onDemand.requestStore.Delete(req) defer close(syncResult) - go onDemand.syncImage(repo, reference, syncResult) + go onDemand.syncImage(ctx, repo, reference, syncResult) err, ok := <-syncResult if !ok { @@ -83,7 +83,9 @@ func (onDemand *BaseOnDemand) SyncImage(repo, reference string) error { return err } -func (onDemand *BaseOnDemand) SyncReference(repo string, subjectDigestStr string, referenceType string) error { +func (onDemand *BaseOnDemand) SyncReference(ctx context.Context, repo string, + subjectDigestStr string, referenceType string, +) error { var err error for _, service := range onDemand.services { @@ -92,7 +94,7 @@ func (onDemand *BaseOnDemand) SyncReference(repo string, subjectDigestStr string return err } - err = service.SyncReference(repo, subjectDigestStr, referenceType) + err = service.SyncReference(ctx, repo, subjectDigestStr, referenceType) if err != nil { continue } else { @@ -103,7 +105,7 @@ func (onDemand *BaseOnDemand) SyncReference(repo string, subjectDigestStr string return err } -func (onDemand *BaseOnDemand) syncImage(repo, reference string, syncResult chan error) { +func (onDemand *BaseOnDemand) syncImage(ctx context.Context, repo, reference string, syncResult chan error) { var err error for serviceID, service := range onDemand.services { err = service.SetNextAvailableURL() @@ -113,7 +115,7 @@ func (onDemand *BaseOnDemand) syncImage(repo, reference string, syncResult chan return } - err = service.SyncImage(repo, reference) + err = service.SyncImage(ctx, repo, reference) if err != nil { if errors.Is(err, zerr.ErrManifestNotFound) || errors.Is(err, zerr.ErrSyncImageFilteredOut) || @@ -150,8 +152,9 @@ func (onDemand *BaseOnDemand) syncImage(repo, reference string, syncResult chan time.Sleep(retryOptions.Delay) + // retrying in background, can't use the same context which should be cancelled by now. if err = retry.RetryIfNecessary(context.Background(), func() error { - err := service.SyncImage(repo, reference) + err := service.SyncImage(context.Background(), repo, reference) return err }, retryOptions); err != nil { diff --git a/pkg/extensions/sync/on_demand_disabled.go b/pkg/extensions/sync/on_demand_disabled.go index 7a284bda..57772d58 100644 --- a/pkg/extensions/sync/on_demand_disabled.go +++ b/pkg/extensions/sync/on_demand_disabled.go @@ -3,12 +3,16 @@ package sync +import "context" + type BaseOnDemand struct{} -func (onDemand *BaseOnDemand) SyncImage(repo, reference string) error { +func (onDemand *BaseOnDemand) SyncImage(ctx context.Context, repo, reference string) error { return nil } -func (onDemand *BaseOnDemand) SyncReference(repo string, subjectDigestStr string, referenceType string) error { +func (onDemand *BaseOnDemand) SyncReference(ctx context.Context, repo string, subjectDigestStr string, + referenceType string, +) error { return nil } diff --git a/pkg/extensions/sync/references/cosign.go b/pkg/extensions/sync/references/cosign.go index 7dd5d13b..dee938ee 100644 --- a/pkg/extensions/sync/references/cosign.go +++ b/pkg/extensions/sync/references/cosign.go @@ -4,6 +4,7 @@ package references import ( + "context" "errors" "fmt" "net/http" @@ -45,9 +46,9 @@ func (ref CosignReference) Name() string { return constants.Cosign } -func (ref CosignReference) IsSigned(upstreamRepo, subjectDigestStr string) bool { +func (ref CosignReference) IsSigned(ctx context.Context, upstreamRepo, subjectDigestStr string) bool { cosignSignatureTag := getCosignSignatureTagFromSubjectDigest(subjectDigestStr) - _, _, err := ref.getManifest(upstreamRepo, cosignSignatureTag) + _, _, err := ref.getManifest(ctx, upstreamRepo, cosignSignatureTag) return err == nil } @@ -85,13 +86,15 @@ func (ref CosignReference) canSkipReferences(localRepo, digest string, manifest return true, nil } -func (ref CosignReference) SyncReferences(localRepo, remoteRepo, subjectDigestStr string) ([]godigest.Digest, error) { +func (ref CosignReference) SyncReferences(ctx context.Context, localRepo, remoteRepo, subjectDigestStr string) ( + []godigest.Digest, error, +) { cosignTags := getCosignTagsFromSubjectDigest(subjectDigestStr) refsDigests := make([]godigest.Digest, 0, len(cosignTags)) for _, cosignTag := range cosignTags { - manifest, manifestBuf, err := ref.getManifest(remoteRepo, cosignTag) + manifest, manifestBuf, err := ref.getManifest(ctx, remoteRepo, cosignTag) if err != nil { if errors.Is(err, zerr.ErrSyncReferrerNotFound) { continue @@ -120,13 +123,13 @@ func (ref CosignReference) SyncReferences(localRepo, remoteRepo, subjectDigestSt Msg("syncing cosign reference for image") for _, blob := range manifest.Layers { - if err := syncBlob(ref.client, imageStore, localRepo, remoteRepo, blob.Digest, ref.log); err != nil { + if err := syncBlob(ctx, ref.client, imageStore, localRepo, remoteRepo, blob.Digest, ref.log); err != nil { return refsDigests, err } } // sync config blob - if err := syncBlob(ref.client, imageStore, localRepo, remoteRepo, manifest.Config.Digest, ref.log); err != nil { + if err := syncBlob(ctx, ref.client, imageStore, localRepo, remoteRepo, manifest.Config.Digest, ref.log); err != nil { return refsDigests, err } @@ -181,10 +184,10 @@ func (ref CosignReference) SyncReferences(localRepo, remoteRepo, subjectDigestSt return refsDigests, nil } -func (ref CosignReference) getManifest(repo, cosignTag string) (*ispec.Manifest, []byte, error) { +func (ref CosignReference) getManifest(ctx context.Context, repo, cosignTag string) (*ispec.Manifest, []byte, error) { var cosignManifest ispec.Manifest - body, _, statusCode, err := ref.client.MakeGetRequest(&cosignManifest, ispec.MediaTypeImageManifest, + body, _, statusCode, err := ref.client.MakeGetRequest(ctx, &cosignManifest, ispec.MediaTypeImageManifest, "v2", repo, "manifests", cosignTag) if err != nil { if statusCode == http.StatusNotFound { diff --git a/pkg/extensions/sync/references/oci.go b/pkg/extensions/sync/references/oci.go index 26592b9c..86f55c31 100644 --- a/pkg/extensions/sync/references/oci.go +++ b/pkg/extensions/sync/references/oci.go @@ -4,6 +4,7 @@ package references import ( + "context" "encoding/json" "errors" "fmt" @@ -45,9 +46,9 @@ func (ref OciReferences) Name() string { return constants.OCI } -func (ref OciReferences) IsSigned(remoteRepo, subjectDigestStr string) bool { +func (ref OciReferences) IsSigned(ctx context.Context, remoteRepo, subjectDigestStr string) bool { // use artifactTypeFilter - index, err := ref.getIndex(remoteRepo, subjectDigestStr) + index, err := ref.getIndex(ctx, remoteRepo, subjectDigestStr) if err != nil { return false } @@ -92,10 +93,12 @@ func (ref OciReferences) canSkipReferences(localRepo, subjectDigestStr string, i return true, nil } -func (ref OciReferences) SyncReferences(localRepo, remoteRepo, subjectDigestStr string) ([]godigest.Digest, error) { +func (ref OciReferences) SyncReferences(ctx context.Context, localRepo, remoteRepo, subjectDigestStr string) ( + []godigest.Digest, error, +) { refsDigests := make([]godigest.Digest, 0, 10) - index, err := ref.getIndex(remoteRepo, subjectDigestStr) + index, err := ref.getIndex(ctx, remoteRepo, subjectDigestStr) if err != nil { return refsDigests, err } @@ -122,7 +125,7 @@ func (ref OciReferences) SyncReferences(localRepo, remoteRepo, subjectDigestStr Msg("syncing oci references for image") for _, referrer := range index.Manifests { - referenceBuf, referenceDigest, err := syncManifest(ref.client, imageStore, localRepo, remoteRepo, + referenceBuf, referenceDigest, err := syncManifest(ctx, ref.client, imageStore, localRepo, remoteRepo, referrer, subjectDigestStr, ref.log) if err != nil { return refsDigests, err @@ -168,10 +171,10 @@ func (ref OciReferences) SyncReferences(localRepo, remoteRepo, subjectDigestStr return refsDigests, nil } -func (ref OciReferences) getIndex(repo, subjectDigestStr string) (ispec.Index, error) { +func (ref OciReferences) getIndex(ctx context.Context, repo, subjectDigestStr string) (ispec.Index, error) { var index ispec.Index - _, _, statusCode, err := ref.client.MakeGetRequest(&index, ispec.MediaTypeImageIndex, + _, _, statusCode, err := ref.client.MakeGetRequest(ctx, &index, ispec.MediaTypeImageIndex, "v2", repo, "referrers", subjectDigestStr) if err != nil { if statusCode == http.StatusNotFound { @@ -191,14 +194,14 @@ func (ref OciReferences) getIndex(repo, subjectDigestStr string) (ispec.Index, e return index, nil } -func syncManifest(client *client.Client, imageStore storageTypes.ImageStore, localRepo, remoteRepo string, - desc ispec.Descriptor, subjectDigestStr string, log log.Logger, +func syncManifest(ctx context.Context, client *client.Client, imageStore storageTypes.ImageStore, localRepo, + remoteRepo string, desc ispec.Descriptor, subjectDigestStr string, log log.Logger, ) ([]byte, godigest.Digest, error) { var manifest ispec.Manifest var refDigest godigest.Digest - OCIRefBuf, _, statusCode, err := client.MakeGetRequest(&manifest, ispec.MediaTypeImageManifest, + OCIRefBuf, _, statusCode, err := client.MakeGetRequest(ctx, &manifest, ispec.MediaTypeImageManifest, "v2", remoteRepo, "manifests", desc.Digest.String()) if err != nil { if statusCode == http.StatusNotFound { @@ -226,13 +229,13 @@ func syncManifest(client *client.Client, imageStore storageTypes.ImageStore, loc } for _, layer := range manifest.Layers { - if err := syncBlob(client, imageStore, localRepo, remoteRepo, layer.Digest, log); err != nil { + if err := syncBlob(ctx, client, imageStore, localRepo, remoteRepo, layer.Digest, log); err != nil { return []byte{}, refDigest, err } } // sync config blob - if err := syncBlob(client, imageStore, localRepo, remoteRepo, manifest.Config.Digest, log); err != nil { + if err := syncBlob(ctx, client, imageStore, localRepo, remoteRepo, manifest.Config.Digest, log); err != nil { return []byte{}, refDigest, err } } else { diff --git a/pkg/extensions/sync/references/oras.go b/pkg/extensions/sync/references/oras.go index 6e2316ce..0a9fd4e3 100644 --- a/pkg/extensions/sync/references/oras.go +++ b/pkg/extensions/sync/references/oras.go @@ -4,6 +4,7 @@ package references import ( + "context" "errors" "fmt" "net/http" @@ -48,7 +49,7 @@ func (ref ORASReferences) Name() string { return constants.Oras } -func (ref ORASReferences) IsSigned(remoteRepo, subjectDigestStr string) bool { +func (ref ORASReferences) IsSigned(ctx context.Context, remoteRepo, subjectDigestStr string) bool { return false } @@ -85,10 +86,12 @@ func (ref ORASReferences) canSkipReferences(localRepo, subjectDigestStr string, return true, nil } -func (ref ORASReferences) SyncReferences(localRepo, remoteRepo, subjectDigestStr string) ([]godigest.Digest, error) { +func (ref ORASReferences) SyncReferences(ctx context.Context, localRepo, remoteRepo, subjectDigestStr string) ( + []godigest.Digest, error, +) { refsDigests := make([]godigest.Digest, 0, 10) - referrers, err := ref.getReferenceList(remoteRepo, subjectDigestStr) + referrers, err := ref.getReferenceList(ctx, remoteRepo, subjectDigestStr) if err != nil { return refsDigests, err } @@ -115,7 +118,7 @@ func (ref ORASReferences) SyncReferences(localRepo, remoteRepo, subjectDigestStr for _, referrer := range referrers.References { var artifactManifest oras.Manifest - orasBuf, _, statusCode, err := ref.client.MakeGetRequest(&artifactManifest, oras.MediaTypeDescriptor, + orasBuf, _, statusCode, err := ref.client.MakeGetRequest(ctx, &artifactManifest, oras.MediaTypeDescriptor, "v2", remoteRepo, "manifests", referrer.Digest.String()) if err != nil { if statusCode == http.StatusNotFound { @@ -130,7 +133,7 @@ func (ref ORASReferences) SyncReferences(localRepo, remoteRepo, subjectDigestStr } for _, blob := range artifactManifest.Blobs { - if err := syncBlob(ref.client, imageStore, localRepo, remoteRepo, blob.Digest, ref.log); err != nil { + if err := syncBlob(ctx, ref.client, imageStore, localRepo, remoteRepo, blob.Digest, ref.log); err != nil { return refsDigests, err } } @@ -170,10 +173,10 @@ func (ref ORASReferences) SyncReferences(localRepo, remoteRepo, subjectDigestStr return refsDigests, nil } -func (ref ORASReferences) getReferenceList(repo, subjectDigestStr string) (ReferenceList, error) { +func (ref ORASReferences) getReferenceList(ctx context.Context, repo, subjectDigestStr string) (ReferenceList, error) { var referrers ReferenceList - _, _, statusCode, err := ref.client.MakeGetRequest(&referrers, "application/json", + _, _, statusCode, err := ref.client.MakeGetRequest(ctx, &referrers, "application/json", apiConstants.ArtifactSpecRoutePrefix, repo, "manifests", subjectDigestStr, "referrers") if err != nil { if statusCode == http.StatusNotFound || statusCode == http.StatusBadRequest { diff --git a/pkg/extensions/sync/references/references.go b/pkg/extensions/sync/references/references.go index 08c2259b..7dcee283 100644 --- a/pkg/extensions/sync/references/references.go +++ b/pkg/extensions/sync/references/references.go @@ -5,6 +5,7 @@ package references import ( "bytes" + "context" "fmt" "net/http" @@ -26,9 +27,9 @@ type Reference interface { // Returns name of reference (OCIReference/CosignReference/OrasReference) Name() string // Returns whether or not image is signed - IsSigned(upstreamRepo, subjectDigestStr string) bool + IsSigned(ctx context.Context, upstreamRepo, subjectDigestStr string) bool // Sync recursively all references for a subject digest (can be image/artifacts/signatures) - SyncReferences(localRepo, upstreamRepo, subjectDigestStr string) ([]godigest.Digest, error) + SyncReferences(ctx context.Context, localRepo, upstreamRepo, subjectDigestStr string) ([]godigest.Digest, error) } type References struct { @@ -48,9 +49,9 @@ func NewReferences(httpClient *client.Client, storeController storage.StoreContr return refs } -func (refs References) IsSigned(upstreamRepo, subjectDigestStr string) bool { +func (refs References) IsSigned(ctx context.Context, upstreamRepo, subjectDigestStr string) bool { for _, ref := range refs.referenceList { - ok := ref.IsSigned(upstreamRepo, subjectDigestStr) + ok := ref.IsSigned(ctx, upstreamRepo, subjectDigestStr) if ok { return true } @@ -59,13 +60,15 @@ func (refs References) IsSigned(upstreamRepo, subjectDigestStr string) bool { return false } -func (refs References) SyncAll(localRepo, upstreamRepo, subjectDigestStr string) error { +func (refs References) SyncAll(ctx context.Context, localRepo, upstreamRepo, subjectDigestStr string) error { seen := &[]godigest.Digest{} - return refs.syncAll(localRepo, upstreamRepo, subjectDigestStr, seen) + return refs.syncAll(ctx, localRepo, upstreamRepo, subjectDigestStr, seen) } -func (refs References) syncAll(localRepo, upstreamRepo, subjectDigestStr string, seen *[]godigest.Digest) error { +func (refs References) syncAll(ctx context.Context, localRepo, upstreamRepo, + subjectDigestStr string, seen *[]godigest.Digest, +) error { var err error var syncedRefsDigests []godigest.Digest @@ -75,7 +78,7 @@ func (refs References) syncAll(localRepo, upstreamRepo, subjectDigestStr string, // for each reference type(cosign/oci/oras reference) for _, ref := range refs.referenceList { - syncedRefsDigests, err = ref.SyncReferences(localRepo, upstreamRepo, subjectDigestStr) + syncedRefsDigests, err = ref.SyncReferences(ctx, localRepo, upstreamRepo, subjectDigestStr) if err != nil { refs.log.Debug().Err(err). Str("reference type", ref.Name()). @@ -87,7 +90,7 @@ func (refs References) syncAll(localRepo, upstreamRepo, subjectDigestStr string, for _, refDigest := range syncedRefsDigests { if !common.Contains(*seen, refDigest) { // sync all references pointing to this one - err = refs.syncAll(localRepo, upstreamRepo, refDigest.String(), seen) + err = refs.syncAll(ctx, localRepo, upstreamRepo, refDigest.String(), seen) } } } @@ -95,14 +98,16 @@ func (refs References) syncAll(localRepo, upstreamRepo, subjectDigestStr string, return err } -func (refs References) SyncReference(localRepo, upstreamRepo, subjectDigestStr, referenceType string) error { +func (refs References) SyncReference(ctx context.Context, localRepo, upstreamRepo, + subjectDigestStr, referenceType string, +) error { var err error var syncedRefsDigests []godigest.Digest for _, ref := range refs.referenceList { if ref.Name() == referenceType { - syncedRefsDigests, err = ref.SyncReferences(localRepo, upstreamRepo, subjectDigestStr) + syncedRefsDigests, err = ref.SyncReferences(ctx, localRepo, upstreamRepo, subjectDigestStr) if err != nil { refs.log.Error().Err(err). Str("reference type", ref.Name()). @@ -113,7 +118,7 @@ func (refs References) SyncReference(localRepo, upstreamRepo, subjectDigestStr, } for _, refDigest := range syncedRefsDigests { - err = refs.SyncAll(localRepo, upstreamRepo, refDigest.String()) + err = refs.SyncAll(ctx, localRepo, upstreamRepo, refDigest.String()) } } } @@ -121,12 +126,12 @@ func (refs References) SyncReference(localRepo, upstreamRepo, subjectDigestStr, return err } -func syncBlob(client *client.Client, imageStore storageTypes.ImageStore, localRepo, remoteRepo string, - digest godigest.Digest, log log.Logger, +func syncBlob(ctx context.Context, client *client.Client, imageStore storageTypes.ImageStore, + localRepo, remoteRepo string, digest godigest.Digest, log log.Logger, ) error { var resultPtr interface{} - body, _, statusCode, err := client.MakeGetRequest(resultPtr, "", "v2", remoteRepo, "blobs", digest.String()) + body, _, statusCode, err := client.MakeGetRequest(ctx, resultPtr, "", "v2", remoteRepo, "blobs", digest.String()) if err != nil { if statusCode != http.StatusOK { log.Info().Str("repo", remoteRepo).Str("digest", digest.String()).Msg("couldn't get remote blob") diff --git a/pkg/extensions/sync/references/references_internal_test.go b/pkg/extensions/sync/references/references_internal_test.go index e0183861..2f968906 100644 --- a/pkg/extensions/sync/references/references_internal_test.go +++ b/pkg/extensions/sync/references/references_internal_test.go @@ -4,6 +4,7 @@ package references import ( + "context" "errors" "testing" @@ -75,7 +76,7 @@ func TestOci(t *testing.T) { }, }}, nil, log.NewLogger("debug", "")) - ok := oci.IsSigned("repo", "") + ok := oci.IsSigned(context.Background(), "repo", "") So(ok, ShouldBeFalse) // trigger GetReferrers err @@ -136,11 +137,12 @@ func TestSyncManifest(t *testing.T) { digest := godigest.FromString("test") - buf, refDigest, err := syncManifest(client, mocks.MockedImageStore{}, "repo", "repo", ispec.Descriptor{ - Digest: digest, - Size: 10, - MediaType: ispec.MediaTypeImageManifest, - }, digest.String(), log.Logger{}) + buf, refDigest, err := syncManifest(context.Background(), client, mocks.MockedImageStore{}, + "repo", "repo", ispec.Descriptor{ + Digest: digest, + Size: 10, + MediaType: ispec.MediaTypeImageManifest, + }, digest.String(), log.Logger{}) So(buf, ShouldBeEmpty) So(refDigest, ShouldBeEmpty) diff --git a/pkg/extensions/sync/remote.go b/pkg/extensions/sync/remote.go index 0694e22b..295fc765 100644 --- a/pkg/extensions/sync/remote.go +++ b/pkg/extensions/sync/remote.go @@ -49,7 +49,7 @@ func (registry *RemoteRegistry) GetContext() *types.SystemContext { func (registry *RemoteRegistry) GetRepositories(ctx context.Context) ([]string, error) { var catalog catalog - _, _, _, err := registry.client.MakeGetRequest(&catalog, "application/json", //nolint: dogsled + _, _, _, err := registry.client.MakeGetRequest(ctx, &catalog, "application/json", //nolint: dogsled constants.RoutePrefix, constants.ExtCatalogPrefix) if err != nil { return []string{}, err diff --git a/pkg/extensions/sync/service.go b/pkg/extensions/sync/service.go index bbf99958..cb426f45 100644 --- a/pkg/extensions/sync/service.go +++ b/pkg/extensions/sync/service.go @@ -203,7 +203,9 @@ func (service *BaseService) GetNextRepo(lastRepo string) (string, error) { } // SyncReference on demand. -func (service *BaseService) SyncReference(repo string, subjectDigestStr string, referenceType string) error { +func (service *BaseService) SyncReference(ctx context.Context, repo string, + subjectDigestStr string, referenceType string, +) error { remoteRepo := repo remoteURL := service.client.GetConfig().URL @@ -221,11 +223,11 @@ func (service *BaseService) SyncReference(repo string, subjectDigestStr string, service.log.Info().Str("remote", remoteURL).Str("repo", repo).Str("subject", subjectDigestStr). Str("reference type", referenceType).Msg("sync: syncing reference for image") - return service.references.SyncReference(repo, remoteRepo, subjectDigestStr, referenceType) + return service.references.SyncReference(ctx, repo, remoteRepo, subjectDigestStr, referenceType) } // SyncImage on demand. -func (service *BaseService) SyncImage(repo, reference string) error { +func (service *BaseService) SyncImage(ctx context.Context, repo, reference string) error { remoteRepo := repo remoteURL := service.client.GetConfig().URL @@ -243,12 +245,12 @@ func (service *BaseService) SyncImage(repo, reference string) error { service.log.Info().Str("remote", remoteURL).Str("repo", repo).Str("reference", reference). Msg("sync: syncing image") - manifestDigest, err := service.syncTag(repo, remoteRepo, reference) + manifestDigest, err := service.syncTag(ctx, repo, remoteRepo, reference) if err != nil { return err } - err = service.references.SyncAll(repo, remoteRepo, manifestDigest.String()) + err = service.references.SyncAll(ctx, repo, remoteRepo, manifestDigest.String()) if err != nil && !errors.Is(err, zerr.ErrSyncReferrerNotFound) { service.log.Error().Err(err).Str("remote", remoteURL).Str("repo", repo).Str("reference", reference). Msg("error while syncing references for image") @@ -260,7 +262,7 @@ func (service *BaseService) SyncImage(repo, reference string) error { } // sync repo periodically. -func (service *BaseService) SyncRepo(repo string) error { +func (service *BaseService) SyncRepo(ctx context.Context, repo string) error { service.log.Info().Str("repo", repo).Str("registry", service.client.GetConfig().URL). Msg("sync: syncing repo") @@ -268,7 +270,7 @@ func (service *BaseService) SyncRepo(repo string) error { var tags []string - if err = retry.RetryIfNecessary(context.Background(), func() error { + if err = retry.RetryIfNecessary(ctx, func() error { tags, err = service.remote.GetRepoTags(repo) return err @@ -291,14 +293,20 @@ func (service *BaseService) SyncRepo(repo string) error { localRepo := service.contentManager.GetRepoDestination(repo) for _, tag := range tags { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + if references.IsCosignTag(tag) { continue } var manifestDigest digest.Digest - if err = retry.RetryIfNecessary(context.Background(), func() error { - manifestDigest, err = service.syncTag(localRepo, repo, tag) + if err = retry.RetryIfNecessary(ctx, func() error { + manifestDigest, err = service.syncTag(ctx, localRepo, repo, tag) return err }, service.retryOptions); err != nil { @@ -314,8 +322,8 @@ func (service *BaseService) SyncRepo(repo string) error { } if manifestDigest != "" { - if err = retry.RetryIfNecessary(context.Background(), func() error { - err = service.references.SyncAll(localRepo, repo, manifestDigest.String()) + if err = retry.RetryIfNecessary(ctx, func() error { + err = service.references.SyncAll(ctx, localRepo, repo, manifestDigest.String()) if errors.Is(err, zerr.ErrSyncReferrerNotFound) { return nil } @@ -335,7 +343,7 @@ func (service *BaseService) SyncRepo(repo string) error { return nil } -func (service *BaseService) syncTag(localRepo, remoteRepo, tag string) (digest.Digest, error) { +func (service *BaseService) syncTag(ctx context.Context, localRepo, remoteRepo, tag string) (digest.Digest, error) { copyOptions := getCopyOptions(service.remote.GetContext(), service.local.GetContext()) policyContext, err := getPolicyContext(service.log) @@ -368,7 +376,7 @@ func (service *BaseService) syncTag(localRepo, remoteRepo, tag string) (digest.D } if service.config.OnlySigned != nil && *service.config.OnlySigned && !references.IsCosignTag(tag) { - signed := service.references.IsSigned(remoteRepo, manifestDigest.String()) + signed := service.references.IsSigned(ctx, remoteRepo, manifestDigest.String()) if !signed { // skip unsigned images service.log.Info().Str("image", remoteImageRef.DockerReference().String()). @@ -397,7 +405,7 @@ func (service *BaseService) syncTag(localRepo, remoteRepo, tag string) (digest.D service.log.Info().Str("remote image", remoteImageRef.DockerReference().String()). Str("local image", fmt.Sprintf("%s:%s", localRepo, tag)).Msg("syncing image") - _, err = copy.Image(context.Background(), policyContext, localImageRef, remoteImageRef, ©Options) + _, err = copy.Image(ctx, policyContext, localImageRef, remoteImageRef, ©Options) if err != nil { service.log.Error().Err(err).Str("errortype", common.TypeOf(err)). Str("remote image", remoteImageRef.DockerReference().String()). diff --git a/pkg/extensions/sync/sync.go b/pkg/extensions/sync/sync.go index bfd672b1..bb4dd0bf 100644 --- a/pkg/extensions/sync/sync.go +++ b/pkg/extensions/sync/sync.go @@ -23,11 +23,12 @@ type Service interface { // Get next repo from remote /v2/_catalog, will return empty string when there is no repo left. GetNextRepo(lastRepo string) (string, error) // used by task scheduler // Sync a repo with all of its tags and references (signatures, artifacts, sboms) into ImageStore. - SyncRepo(repo string) error // used by periodically sync + SyncRepo(ctx context.Context, repo string) error // used by periodically sync // Sync an image (repo:tag || repo:digest) into ImageStore. - SyncImage(repo, reference string) error // used by sync on demand + SyncImage(ctx context.Context, repo, reference string) error // used by sync on demand // Sync a single reference for an image. - SyncReference(repo string, subjectDigestStr string, referenceType string) error // used by sync on demand + SyncReference(ctx context.Context, repo string, subjectDigestStr string, + referenceType string) error // used by sync on demand // Remove all internal catalog entries. ResetCatalog() // used by scheduler to empty out the catalog after a sync periodically roundtrip finishes // Sync supports multiple urls per registry, before a sync repo/image/ref 'ping' each url. @@ -133,6 +134,6 @@ func newSyncRepoTask(repo string, service Service) *syncRepoTask { return &syncRepoTask{repo, service} } -func (srt *syncRepoTask) DoWork() error { - return srt.service.SyncRepo(srt.repo) +func (srt *syncRepoTask) DoWork(ctx context.Context) error { + return srt.service.SyncRepo(ctx, srt.repo) } diff --git a/pkg/extensions/sync/sync_internal_test.go b/pkg/extensions/sync/sync_internal_test.go index 43a77902..fae4e02e 100644 --- a/pkg/extensions/sync/sync_internal_test.go +++ b/pkg/extensions/sync/sync_internal_test.go @@ -165,7 +165,7 @@ func TestService(t *testing.T) { service, err := New(conf, "", storage.StoreController{}, mocks.MetaDBMock{}, log.Logger{}) So(err, ShouldBeNil) - err = service.SyncRepo("repo") + err = service.SyncRepo(context.Background(), "repo") So(err, ShouldNotBeNil) }) } diff --git a/pkg/scheduler/README.md b/pkg/scheduler/README.md index 2b6d9096..c5826f58 100644 --- a/pkg/scheduler/README.md +++ b/pkg/scheduler/README.md @@ -6,7 +6,7 @@ In order to create a new generator (which will generate new tasks one by one) an ``` This method should implement the logic for generating a new task. Basically, when this method is called by the scheduler it should return the next task until there are no more tasks to be generated. - Also, the Task returned by this method should implement DoWork() method which should contain the logic that should be executed when this task is run by the scheduler. + Also, the Task returned by this method should implement DoWork(ctx context.Context) method which should contain the logic that should be executed when this task is run by the scheduler. ``` 2. ***IsDone() bool*** ``` @@ -35,10 +35,10 @@ Notes: # How to submit a Task to the scheduler -In order to create a new task and add it to the scheduler ***DoWork() error*** is the method that should be implemented. This should contain the logic that should be executed when this task is run by the scheduler. +In order to create a new task and add it to the scheduler ***DoWork(ctx context.Context) error*** is the method that should be implemented. This should contain the logic that should be executed when this task is run by the scheduler. To submit a task to the scheduler ***SubmitTask*** should be called with the implemented task and the priority of the task as parameters. Note: - - A task can not be periodic. In order to add a periodic task, it can be created a generator which will generate periodically the same task. \ No newline at end of file + - A task can not be periodic. In order to add a periodic task, it can be created a generator which will generate periodically the same task. diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index de574da9..cc771d06 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -12,7 +12,7 @@ import ( ) type Task interface { - DoWork() error + DoWork(ctx context.Context) error } type generatorsPriorityQueue []*generator @@ -97,13 +97,13 @@ func NewScheduler(cfg *config.Config, logC log.Logger) *Scheduler { } } -func (scheduler *Scheduler) poolWorker(numWorkers int, tasks chan Task) { +func (scheduler *Scheduler) poolWorker(ctx context.Context, numWorkers int, tasks chan Task) { for i := 0; i < numWorkers; i++ { go func(workerID int) { for task := range tasks { scheduler.log.Debug().Int("worker", workerID).Msg("scheduler: starting task") - if err := task.DoWork(); err != nil { + if err := task.DoWork(ctx); err != nil { scheduler.log.Error().Int("worker", workerID).Err(err).Msg("scheduler: error while executing task") } @@ -120,7 +120,7 @@ func (scheduler *Scheduler) RunScheduler(ctx context.Context) { tasksWorker := make(chan Task, numWorkers) // start worker pool - go scheduler.poolWorker(numWorkers, tasksWorker) + go scheduler.poolWorker(ctx, numWorkers, tasksWorker) go func() { for { diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go index c74c5c09..14e2c83e 100644 --- a/pkg/scheduler/scheduler_test.go +++ b/pkg/scheduler/scheduler_test.go @@ -24,7 +24,7 @@ type task struct { var errInternal = errors.New("task: internal error") -func (t *task) DoWork() error { +func (t *task) DoWork(ctx context.Context) error { if t.err { return errInternal } diff --git a/pkg/storage/common/common.go b/pkg/storage/common/common.go index d58495a9..1f819c63 100644 --- a/pkg/storage/common/common.go +++ b/pkg/storage/common/common.go @@ -2,6 +2,7 @@ package storage import ( "bytes" + "context" "encoding/json" "errors" "fmt" @@ -1032,9 +1033,9 @@ func newDedupeTask(imgStore storageTypes.ImageStore, digest godigest.Digest, ded return &dedupeTask{imgStore, digest, duplicateBlobs, dedupe, log} } -func (dt *dedupeTask) DoWork() error { +func (dt *dedupeTask) DoWork(ctx context.Context) error { // run task - err := dt.imgStore.RunDedupeForDigest(dt.digest, dt.dedupe, dt.duplicateBlobs) + err := dt.imgStore.RunDedupeForDigest(dt.digest, dt.dedupe, dt.duplicateBlobs) //nolint: contextcheck if err != nil { // log it dt.log.Error().Err(err).Str("digest", dt.digest.String()).Msg("rebuild dedupe: failed to rebuild digest") @@ -1112,7 +1113,7 @@ func NewGCTask(imgStore storageTypes.ImageStore, repo string, return &gcTask{imgStore, repo} } -func (gct *gcTask) DoWork() error { +func (gct *gcTask) DoWork(ctx context.Context) error { // run task return gct.imgStore.RunGCRepo(gct.repo) } diff --git a/pkg/storage/scrub.go b/pkg/storage/scrub.go index 327ddb30..7ec34e16 100644 --- a/pkg/storage/scrub.go +++ b/pkg/storage/scrub.go @@ -43,7 +43,7 @@ type ScrubResults struct { ScrubResults []ScrubImageResult `json:"scrubResults"` } -func (sc StoreController) CheckAllBlobsIntegrity() (ScrubResults, error) { +func (sc StoreController) CheckAllBlobsIntegrity(ctx context.Context) (ScrubResults, error) { results := ScrubResults{} imageStoreList := make(map[string]storageTypes.ImageStore) @@ -54,7 +54,7 @@ func (sc StoreController) CheckAllBlobsIntegrity() (ScrubResults, error) { imageStoreList[""] = sc.DefaultStore for _, imgStore := range imageStoreList { - imgStoreResults, err := CheckImageStoreBlobsIntegrity(imgStore) + imgStoreResults, err := CheckImageStoreBlobsIntegrity(ctx, imgStore) if err != nil { return results, err } @@ -65,7 +65,7 @@ func (sc StoreController) CheckAllBlobsIntegrity() (ScrubResults, error) { return results, nil } -func CheckImageStoreBlobsIntegrity(imgStore storageTypes.ImageStore) ([]ScrubImageResult, error) { +func CheckImageStoreBlobsIntegrity(ctx context.Context, imgStore storageTypes.ImageStore) ([]ScrubImageResult, error) { results := []ScrubImageResult{} repos, err := imgStore.GetRepositories() @@ -74,7 +74,7 @@ func CheckImageStoreBlobsIntegrity(imgStore storageTypes.ImageStore) ([]ScrubIma } for _, repo := range repos { - imageResults, err := CheckRepo(repo, imgStore) + imageResults, err := CheckRepo(ctx, repo, imgStore) if err != nil { return results, err } @@ -85,20 +85,23 @@ func CheckImageStoreBlobsIntegrity(imgStore storageTypes.ImageStore) ([]ScrubIma return results, nil } -func CheckRepo(imageName string, imgStore storageTypes.ImageStore) ([]ScrubImageResult, error) { +func CheckRepo(ctx context.Context, imageName string, imgStore storageTypes.ImageStore) ([]ScrubImageResult, error) { results := []ScrubImageResult{} + if ctx.Err() != nil { + return results, ctx.Err() + } + dir := path.Join(imgStore.RootDir(), imageName) if !imgStore.DirExists(dir) { return results, errors.ErrRepoNotFound } - ctxUmoci := context.Background() - oci, err := umoci.OpenLayout(dir) if err != nil { return results, err } + defer oci.Close() var lockLatency time.Time @@ -146,7 +149,7 @@ func CheckRepo(imageName string, imgStore storageTypes.ImageStore) ([]ScrubImage for _, m := range listOfManifests { tag := m.Annotations[ispec.AnnotationRefName] - imageResult := CheckIntegrity(ctxUmoci, imageName, tag, oci, m, dir) + imageResult := CheckIntegrity(ctx, imageName, tag, oci, m, dir) results = append(results, imageResult) } @@ -160,10 +163,10 @@ func CheckIntegrity(ctx context.Context, imageName, tagName string, oci casext.E } // check layers - return CheckLayers(imageName, tagName, dir, manifest) + return CheckLayers(ctx, imageName, tagName, dir, manifest) } -func CheckLayers(imageName, tagName, dir string, manifest ispec.Descriptor) ScrubImageResult { +func CheckLayers(ctx context.Context, imageName, tagName, dir string, manifest ispec.Descriptor) ScrubImageResult { imageRes := ScrubImageResult{} buf, err := os.ReadFile(path.Join(dir, "blobs", manifest.Digest.Algorithm().String(), manifest.Digest.Encoded())) diff --git a/pkg/storage/scrub_test.go b/pkg/storage/scrub_test.go index 1253784b..26a974ba 100644 --- a/pkg/storage/scrub_test.go +++ b/pkg/storage/scrub_test.go @@ -2,6 +2,7 @@ package storage_test import ( "bytes" + "context" "encoding/json" "os" "path" @@ -78,7 +79,7 @@ func TestCheckAllBlobsIntegrity(t *testing.T) { Convey("Blobs integrity not affected", func() { buff := bytes.NewBufferString("") - res, err := storeCtlr.CheckAllBlobsIntegrity() + res, err := storeCtlr.CheckAllBlobsIntegrity(context.Background()) res.PrintScrubResults(buff) So(err, ShouldBeNil) @@ -102,7 +103,7 @@ func TestCheckAllBlobsIntegrity(t *testing.T) { buff := bytes.NewBufferString("") - res, err := storeCtlr.CheckAllBlobsIntegrity() + res, err := storeCtlr.CheckAllBlobsIntegrity(context.Background()) res.PrintScrubResults(buff) So(err, ShouldBeNil) @@ -120,7 +121,7 @@ func TestCheckAllBlobsIntegrity(t *testing.T) { manifestDescriptor := index.Manifests[0] repoDir := path.Join(dir, repoName) - imageRes := storage.CheckLayers(repoName, tag, repoDir, manifestDescriptor) + imageRes := storage.CheckLayers(context.Background(), repoName, tag, repoDir, manifestDescriptor) So(imageRes.Status, ShouldEqual, "affected") So(imageRes.Error, ShouldEqual, "unexpected end of JSON input") @@ -142,7 +143,7 @@ func TestCheckAllBlobsIntegrity(t *testing.T) { buff := bytes.NewBufferString("") - res, err := storeCtlr.CheckAllBlobsIntegrity() + res, err := storeCtlr.CheckAllBlobsIntegrity(context.Background()) res.PrintScrubResults(buff) So(err, ShouldBeNil) @@ -170,7 +171,7 @@ func TestCheckAllBlobsIntegrity(t *testing.T) { buff := bytes.NewBufferString("") - res, err := storeCtlr.CheckAllBlobsIntegrity() + res, err := storeCtlr.CheckAllBlobsIntegrity(context.Background()) res.PrintScrubResults(buff) So(err, ShouldBeNil) @@ -199,7 +200,7 @@ func TestCheckAllBlobsIntegrity(t *testing.T) { So(len(index.Manifests), ShouldEqual, 1) manifestDescriptor := index.Manifests[0] - imageRes := storage.CheckLayers(repoName, tag, repoDir, manifestDescriptor) + imageRes := storage.CheckLayers(context.Background(), repoName, tag, repoDir, manifestDescriptor) So(imageRes.Status, ShouldEqual, "affected") So(imageRes.Error, ShouldEqual, "blob: not found") err = os.Chmod(layerFile, 0x0600) @@ -211,7 +212,7 @@ func TestCheckAllBlobsIntegrity(t *testing.T) { buff := bytes.NewBufferString("") - res, err := storeCtlr.CheckAllBlobsIntegrity() + res, err := storeCtlr.CheckAllBlobsIntegrity(context.Background()) res.PrintScrubResults(buff) So(err, ShouldBeNil) @@ -262,7 +263,7 @@ func TestCheckAllBlobsIntegrity(t *testing.T) { buff := bytes.NewBufferString("") - res, err := storeCtlr.CheckAllBlobsIntegrity() + res, err := storeCtlr.CheckAllBlobsIntegrity(context.Background()) res.PrintScrubResults(buff) So(err, ShouldBeNil) @@ -280,7 +281,7 @@ func TestCheckAllBlobsIntegrity(t *testing.T) { buff = bytes.NewBufferString("") - res, err = storeCtlr.CheckAllBlobsIntegrity() + res, err = storeCtlr.CheckAllBlobsIntegrity(context.Background()) res.PrintScrubResults(buff) So(err, ShouldBeNil) @@ -297,7 +298,7 @@ func TestCheckAllBlobsIntegrity(t *testing.T) { buff = bytes.NewBufferString("") - res, err = storeCtlr.CheckAllBlobsIntegrity() + res, err = storeCtlr.CheckAllBlobsIntegrity(context.Background()) res.PrintScrubResults(buff) So(err, ShouldBeNil) @@ -316,7 +317,7 @@ func TestCheckAllBlobsIntegrity(t *testing.T) { buff := bytes.NewBufferString("") - res, err := storeCtlr.CheckAllBlobsIntegrity() + res, err := storeCtlr.CheckAllBlobsIntegrity(context.Background()) res.PrintScrubResults(buff) So(err, ShouldBeNil) @@ -334,7 +335,7 @@ func TestCheckAllBlobsIntegrity(t *testing.T) { manifestDescriptor := index.Manifests[0] repoDir := path.Join(dir, repoName) - imageRes := storage.CheckLayers(repoName, tag, repoDir, manifestDescriptor) + imageRes := storage.CheckLayers(context.Background(), repoName, tag, repoDir, manifestDescriptor) So(imageRes.Status, ShouldEqual, "affected") So(imageRes.Error, ShouldContainSubstring, "no such file or directory") })