refactor(sync): use task scheduler (#1301)

Signed-off-by: Petu Eusebiu <peusebiu@cisco.com>
This commit is contained in:
peusebiu
2023-05-31 20:26:23 +03:00
committed by GitHub
parent e148343540
commit 612a12e5a8
40 changed files with 4343 additions and 3604 deletions
+34 -14
View File
@@ -43,6 +43,7 @@ type Controller struct {
Server *http.Server
Metrics monitoring.MetricServer
CveInfo ext.CveInfo
SyncOnDemand SyncOnDemand
// runtime params
chosenPort int // kernel-chosen port
}
@@ -279,16 +280,30 @@ func (c *Controller) LoadNewConfig(reloadCtx context.Context, config *config.Con
// reload access control config
c.Config.HTTP.AccessControl = config.HTTP.AccessControl
// Enable extensions if extension config is provided
if config.Extensions != nil && config.Extensions.Sync != nil {
// reload sync config
// reload periodical gc interval
c.Config.Storage.GCInterval = config.Storage.GCInterval
// reload background tasks
if config.Extensions != nil {
// reload sync extension
c.Config.Extensions.Sync = config.Extensions.Sync
ext.EnableSyncExtension(reloadCtx, c.Config, c.RepoDB, c.StoreController, c.Log)
} else if c.Config.Extensions != nil {
c.Config.Extensions.Sync = nil
// reload search cve extension
if c.Config.Extensions.Search != nil {
// reload only if search is enabled and reloaded config has search extension
if *c.Config.Extensions.Search.Enable && config.Extensions.Search != nil {
c.Config.Extensions.Search.CVE = config.Extensions.Search.CVE
}
}
// reload scrub extension
c.Config.Extensions.Scrub = config.Extensions.Scrub
} else {
c.Config.Extensions = nil
}
c.Log.Info().Interface("reloaded params", c.Config.Sanitize()).Msg("new configuration settings")
c.StartBackgroundTasks(reloadCtx)
c.Log.Info().Interface("reloaded params", c.Config.Sanitize()).
Msg("loaded new configuration settings")
}
func (c *Controller) Shutdown() {
@@ -334,14 +349,19 @@ func (c *Controller) StartBackgroundTasks(reloadCtx context.Context) {
}
}
// Enable extensions if extension config is provided for storeController
if c.Config.Extensions != nil {
if c.Config.Extensions.Sync != nil {
ext.EnableSyncExtension(reloadCtx, c.Config, c.RepoDB, c.StoreController, c.Log)
}
}
if c.Config.Extensions != nil {
ext.EnableScrubExtension(c.Config, c.Log, c.StoreController, taskScheduler)
syncOnDemand, err := ext.EnableSyncExtension(c.Config, c.RepoDB, c.StoreController, taskScheduler, c.Log)
if err != nil {
c.Log.Error().Err(err).Msg("unable to start sync extension")
}
c.SyncOnDemand = syncOnDemand
}
}
type SyncOnDemand interface {
SyncImage(repo, reference string) error
SyncReference(repo string, subjectDigestStr string, referenceType string) error
}
+55 -45
View File
@@ -8,7 +8,6 @@
package api
import (
"context"
"encoding/json"
"errors"
"fmt"
@@ -27,6 +26,7 @@ import (
godigest "github.com/opencontainers/go-digest"
ispec "github.com/opencontainers/image-spec/specs-go/v1"
artifactspec "github.com/oras-project/artifacts-spec/specs-go/v1"
"github.com/sigstore/cosign/v2/pkg/oci/remote"
zerr "zotregistry.io/zot/errors"
"zotregistry.io/zot/pkg/api/constants"
@@ -34,7 +34,7 @@ import (
gqlPlayground "zotregistry.io/zot/pkg/debug/gqlplayground"
debug "zotregistry.io/zot/pkg/debug/swagger"
ext "zotregistry.io/zot/pkg/extensions"
"zotregistry.io/zot/pkg/extensions/sync"
syncConstants "zotregistry.io/zot/pkg/extensions/sync/constants"
"zotregistry.io/zot/pkg/log"
"zotregistry.io/zot/pkg/meta"
zreg "zotregistry.io/zot/pkg/regexp"
@@ -376,8 +376,7 @@ func (rh *RouteHandler) CheckManifest(response http.ResponseWriter, request *htt
return
}
content, digest, mediaType, err := getImageManifest(request.Context(), rh, imgStore,
name, reference) //nolint:contextcheck
content, digest, mediaType, err := getImageManifest(rh, imgStore, name, reference) //nolint:contextcheck
if err != nil {
if errors.Is(err, zerr.ErrRepoNotFound) { //nolint:gocritic // errorslint conflicts with gocritic:IfElseChain
WriteJSON(response, http.StatusNotFound,
@@ -449,8 +448,7 @@ func (rh *RouteHandler) GetManifest(response http.ResponseWriter, request *http.
return
}
content, digest, mediaType, err := getImageManifest(request.Context(), rh,
imgStore, name, reference) //nolint: contextcheck
content, digest, mediaType, err := getImageManifest(rh, imgStore, name, reference) //nolint: contextcheck
if err != nil {
if errors.Is(err, zerr.ErrRepoNotFound) { //nolint:gocritic // errorslint conflicts with gocritic:IfElseChain
WriteJSON(response, http.StatusNotFound,
@@ -488,31 +486,26 @@ type ImageIndex struct {
ispec.Index
}
func getReferrers(ctx context.Context, routeHandler *RouteHandler,
func getReferrers(routeHandler *RouteHandler,
imgStore storageTypes.ImageStore, name string, digest godigest.Digest,
artifactTypes []string,
) (ispec.Index, error) {
references, err := imgStore.GetReferrers(name, digest, artifactTypes)
if err != nil || len(references.Manifests) == 0 {
if routeHandler.c.Config.Extensions != nil &&
routeHandler.c.Config.Extensions.Sync != nil &&
*routeHandler.c.Config.Extensions.Sync.Enable {
refs, err := imgStore.GetReferrers(name, digest, artifactTypes)
if err != nil || len(refs.Manifests) == 0 {
if isSyncOnDemandEnabled(*routeHandler.c) {
routeHandler.c.Log.Info().Str("repository", name).Str("reference", digest.String()).
Msg("referrers not found, trying to get reference by syncing on demand")
errSync := ext.SyncOneImage(ctx, routeHandler.c.Config, routeHandler.c.RepoDB, routeHandler.c.StoreController,
name, digest.String(), sync.OCIReference, routeHandler.c.Log)
if errSync != nil {
routeHandler.c.Log.Error().Err(err).Str("name", name).Str("digest", digest.String()).Msg("unable to get references")
return ispec.Index{}, err
if errSync := routeHandler.c.SyncOnDemand.SyncReference(name, digest.String(), syncConstants.OCI); errSync != nil {
routeHandler.c.Log.Err(errSync).Str("repository", name).Str("reference", digest.String()).
Msg("error encounter while syncing OCI reference for image")
}
references, err = imgStore.GetReferrers(name, digest, artifactTypes)
refs, err = imgStore.GetReferrers(name, digest, artifactTypes)
}
}
return references, err
return refs, err
}
// GetReferrers godoc
@@ -559,7 +552,7 @@ func (rh *RouteHandler) GetReferrers(response http.ResponseWriter, request *http
imgStore := rh.getImageStore(name)
referrers, err := getReferrers(request.Context(), rh, imgStore, name, digest, artifactTypes)
referrers, err := getReferrers(rh, imgStore, name, digest, artifactTypes)
if err != nil {
if errors.Is(err, zerr.ErrManifestNotFound) || errors.Is(err, zerr.ErrRepoNotFound) {
rh.c.Log.Error().Err(err).Str("name", name).Str("digest", digest.String()).Msg("manifest not found")
@@ -1722,15 +1715,10 @@ func (rh *RouteHandler) getImageStore(name string) storageTypes.ImageStore {
}
// will sync on demand if an image is not found, in case sync extensions is enabled.
func getImageManifest(ctx context.Context, routeHandler *RouteHandler, imgStore storageTypes.ImageStore,
name, reference string,
func getImageManifest(routeHandler *RouteHandler, imgStore storageTypes.ImageStore, name,
reference string,
) ([]byte, godigest.Digest, string, error) {
syncEnabled := false
if routeHandler.c.Config.Extensions != nil &&
routeHandler.c.Config.Extensions.Sync != nil &&
*routeHandler.c.Config.Extensions.Sync.Enable {
syncEnabled = true
}
syncEnabled := isSyncOnDemandEnabled(*routeHandler.c)
_, digestErr := godigest.Parse(reference)
if digestErr == nil {
@@ -1745,36 +1733,47 @@ func getImageManifest(ctx context.Context, routeHandler *RouteHandler, imgStore
routeHandler.c.Log.Info().Str("repository", name).Str("reference", reference).
Msg("trying to get updated image by syncing on demand")
errSync := ext.SyncOneImage(ctx, routeHandler.c.Config, routeHandler.c.RepoDB, routeHandler.c.StoreController,
name, reference, "", routeHandler.c.Log)
if errSync != nil {
routeHandler.c.Log.Err(errSync).Str("repository", name).Str("reference", reference).
Msg("error encounter while syncing image")
// we use a custom method for syncing cosign signatures for the moment, even though it's also an oci image.
if isCosignTag(reference) {
if errSync := routeHandler.c.SyncOnDemand.SyncReference(name, reference, syncConstants.Cosign); errSync != nil {
routeHandler.c.Log.Err(errSync).Str("repository", name).Str("reference", reference).
Msg("error encounter while syncing cosign signature for image")
}
} else {
if errSync := routeHandler.c.SyncOnDemand.SyncImage(name, reference); errSync != nil {
routeHandler.c.Log.Err(errSync).Str("repository", name).Str("reference", reference).
Msg("error encounter while syncing image")
}
}
}
return imgStore.GetImageManifest(name, reference)
}
// this function will check if tag is a cosign tag (signature or sbom).
func isCosignTag(tag string) bool {
if strings.HasPrefix(tag, "sha256-") &&
(strings.HasSuffix(tag, remote.SignatureTagSuffix) || strings.HasSuffix(tag, remote.SBOMTagSuffix)) {
return true
}
return false
}
// will sync referrers on demand if they are not found, in case sync extensions is enabled.
func getOrasReferrers(ctx context.Context, routeHandler *RouteHandler,
func getOrasReferrers(routeHandler *RouteHandler,
imgStore storageTypes.ImageStore, name string, digest godigest.Digest,
artifactType string,
) ([]artifactspec.Descriptor, error) {
refs, err := imgStore.GetOrasReferrers(name, digest, artifactType)
if err != nil {
if routeHandler.c.Config.Extensions != nil &&
routeHandler.c.Config.Extensions.Sync != nil &&
*routeHandler.c.Config.Extensions.Sync.Enable {
if isSyncOnDemandEnabled(*routeHandler.c) {
routeHandler.c.Log.Info().Str("repository", name).Str("reference", digest.String()).
Msg("artifact not found, trying to get artifact by syncing on demand")
errSync := ext.SyncOneImage(ctx, routeHandler.c.Config, routeHandler.c.RepoDB, routeHandler.c.StoreController,
name, digest.String(), sync.OrasArtifact, routeHandler.c.Log)
if errSync != nil {
routeHandler.c.Log.Error().Err(err).Str("name", name).Str("digest", digest.String()).Msg("unable to get references")
return []artifactspec.Descriptor{}, err
if errSync := routeHandler.c.SyncOnDemand.SyncReference(name, digest.String(), syncConstants.Oras); errSync != nil {
routeHandler.c.Log.Error().Err(err).Str("name", name).Str("digest", digest.String()).
Msg("unable to get references")
}
refs, err = imgStore.GetOrasReferrers(name, digest, artifactType)
@@ -1838,7 +1837,7 @@ func (rh *RouteHandler) GetOrasReferrers(response http.ResponseWriter, request *
rh.c.Log.Info().Str("digest", digest.String()).Str("artifactType", artifactType).Msg("getting manifest")
refs, err := getOrasReferrers(request.Context(), rh, imgStore, name, digest, artifactType) //nolint:contextcheck
refs, err := getOrasReferrers(rh, imgStore, name, digest, artifactType) //nolint:contextcheck
if err != nil {
if errors.Is(err, zerr.ErrManifestNotFound) || errors.Is(err, zerr.ErrRepoNotFound) {
rh.c.Log.Error().Err(err).Str("name", name).Str("digest", digest.String()).Msg("manifest not found")
@@ -1883,3 +1882,14 @@ func getBlobUploadLocation(url *url.URL, name string, digest godigest.Digest) st
return url.String()
}
func isSyncOnDemandEnabled(ctlr Controller) bool {
if ctlr.Config.Extensions != nil &&
ctlr.Config.Extensions.Sync != nil &&
*ctlr.Config.Extensions.Sync.Enable &&
fmt.Sprintf("%v", ctlr.SyncOnDemand) != fmt.Sprintf("%v", nil) {
return true
}
return false
}