diff --git a/pkg/extensions/extension_sync.go b/pkg/extensions/extension_sync.go index 04d17640..9bf86214 100644 --- a/pkg/extensions/extension_sync.go +++ b/pkg/extensions/extension_sync.go @@ -57,8 +57,10 @@ func EnableSyncExtension(config *config.Config, metaDB mTypes.MetaDB, if isPeriodical { // add to task scheduler periodic sync - gen := sync.NewTaskGenerator(service, log) - sch.SubmitGenerator(gen, registryConfig.PollInterval, scheduler.MediumPriority) + interval := registryConfig.PollInterval + + gen := sync.NewTaskGenerator(service, interval, log) + sch.SubmitGenerator(gen, interval, scheduler.MediumPriority) } if isOnDemand { diff --git a/pkg/extensions/search/cve/update.go b/pkg/extensions/search/cve/update.go index 06a81096..010599ba 100644 --- a/pkg/extensions/search/cve/update.go +++ b/pkg/extensions/search/cve/update.go @@ -59,6 +59,7 @@ func (gen *DBUpdateTaskGenerator) Next() (scheduler.Task, error) { newTask = newDBUpdadeTask(gen.interval, gen.scanner, gen, gen.log) gen.status = running } + gen.lock.Unlock() return newTask, nil diff --git a/pkg/extensions/sync/features/features.go b/pkg/extensions/sync/features/features.go new file mode 100644 index 00000000..0dd88a47 --- /dev/null +++ b/pkg/extensions/sync/features/features.go @@ -0,0 +1,53 @@ +package features + +import ( + "sync" + "time" +) + +const defaultExpireMinutes = 10 + +type featureKey struct { + kind string + repo string +} + +type featureVal struct { + enabled bool + expire time.Time +} + +type Map struct { + store map[featureKey]*featureVal + expireAfter time.Duration + mu *sync.Mutex +} + +func New() *Map { + return &Map{ + store: make(map[featureKey]*featureVal), + expireAfter: defaultExpireMinutes * time.Minute, + mu: new(sync.Mutex), + } +} + +// returns if registry supports this feature and if ok. +func (f *Map) Get(kind, repo string) (bool, bool) { + f.mu.Lock() + defer f.mu.Unlock() + + if feature, ok := f.store[featureKey{kind, repo}]; ok { + if time.Now().Before(feature.expire) { + return feature.enabled, true + } + } + + // feature expired or not found + return false, false +} + +func (f *Map) Set(kind, repo string, enabled bool) { + f.mu.Lock() + f.store[featureKey{kind: kind, repo: repo}] = &featureVal{enabled: enabled, expire: time.Now().Add(f.expireAfter)} + f.mu.Unlock() +} diff --git a/pkg/extensions/sync/references/references.go b/pkg/extensions/sync/references/references.go index e808e20a..99be3c9d 100644 --- a/pkg/extensions/sync/references/references.go +++ b/pkg/extensions/sync/references/references.go @@ -6,6 +6,7 @@ package references import ( "bytes" "context" + "errors" "fmt" "net/http" @@ -14,7 +15,10 @@ import ( artifactspec "github.com/oras-project/artifacts-spec/specs-go/v1" "github.com/sigstore/cosign/v2/pkg/oci/static" + zerr "zotregistry.dev/zot/errors" "zotregistry.dev/zot/pkg/common" + "zotregistry.dev/zot/pkg/extensions/sync/constants" + "zotregistry.dev/zot/pkg/extensions/sync/features" client "zotregistry.dev/zot/pkg/extensions/sync/httpclient" "zotregistry.dev/zot/pkg/log" mTypes "zotregistry.dev/zot/pkg/meta/types" @@ -33,13 +37,14 @@ type Reference interface { type References struct { referenceList []Reference + features *features.Map log log.Logger } func NewReferences(httpClient *client.Client, storeController storage.StoreController, metaDB mTypes.MetaDB, log log.Logger, ) References { - refs := References{log: log} + refs := References{features: features.New(), log: log} refs.referenceList = append(refs.referenceList, NewCosignReference(httpClient, storeController, metaDB, log)) refs.referenceList = append(refs.referenceList, NewTagReferences(httpClient, storeController, metaDB, log)) @@ -78,12 +83,30 @@ func (refs References) syncAll(ctx context.Context, localRepo, upstreamRepo, // for each reference type(cosign/oci/oras reference) for _, ref := range refs.referenceList { + supported, ok := refs.features.Get(ref.Name(), upstreamRepo) + if !supported && ok { + continue + } + syncedRefsDigests, err = ref.SyncReferences(ctx, localRepo, upstreamRepo, subjectDigestStr) if err != nil { + // for all referrers we can stop querying same repo (for ten minutes) if the errors are different than 404 + if !errors.Is(err, zerr.ErrSyncReferrerNotFound) { + refs.features.Set(ref.Name(), upstreamRepo, false) + } + + // in the case of oci referrers, it will return 404 only if the repo is not found or refferers API is not supported + // no need to continue to make requests to the same repo + if ref.Name() == constants.OCI && errors.Is(err, zerr.ErrSyncReferrerNotFound) { + refs.features.Set(ref.Name(), upstreamRepo, false) + } + refs.log.Debug().Err(err). Str("reference type", ref.Name()). Str("image", fmt.Sprintf("%s:%s", upstreamRepo, subjectDigestStr)). Msg("couldn't sync image referrer") + } else { + refs.features.Set(ref.Name(), upstreamRepo, true) } // for each synced references diff --git a/pkg/extensions/sync/service.go b/pkg/extensions/sync/service.go index 237822f0..a8cc10e2 100644 --- a/pkg/extensions/sync/service.go +++ b/pkg/extensions/sync/service.go @@ -364,8 +364,6 @@ func (service *BaseService) SyncRepo(ctx context.Context, repo string) error { }, service.retryOptions); err != nil { service.log.Error().Str("errorType", common.TypeOf(err)).Str("repository", repo). Err(err).Msg("failed to sync tags for repository") - - return err } } } diff --git a/pkg/extensions/sync/sync.go b/pkg/extensions/sync/sync.go index cfd9eb85..b0e67b25 100644 --- a/pkg/extensions/sync/sync.go +++ b/pkg/extensions/sync/sync.go @@ -6,6 +6,8 @@ package sync import ( "context" "fmt" + "sync" + "time" "github.com/containers/common/pkg/retry" "github.com/containers/image/v5/types" @@ -80,18 +82,26 @@ type Destination interface { } type TaskGenerator struct { - Service Service - lastRepo string - done bool - log log.Logger + Service Service + lastRepo string + done bool + waitTime time.Duration + lastTaskTime time.Time + maxWaitTime time.Duration + lock *sync.Mutex + log log.Logger } -func NewTaskGenerator(service Service, log log.Logger) *TaskGenerator { +func NewTaskGenerator(service Service, maxWaitTime time.Duration, log log.Logger) *TaskGenerator { return &TaskGenerator{ - Service: service, - done: false, - lastRepo: "", - log: log, + Service: service, + done: false, + waitTime: 0, + lastTaskTime: time.Now(), + lock: &sync.Mutex{}, + lastRepo: "", + maxWaitTime: maxWaitTime, + log: log, } } @@ -100,27 +110,35 @@ func (gen *TaskGenerator) Name() string { } func (gen *TaskGenerator) Next() (scheduler.Task, error) { + gen.lock.Lock() + defer gen.lock.Unlock() + + if time.Since(gen.lastTaskTime) <= gen.waitTime { + return nil, nil + } + if err := gen.Service.SetNextAvailableURL(); err != nil { + gen.increaseWaitTime() + return nil, err } repo, err := gen.Service.GetNextRepo(gen.lastRepo) if err != nil { + gen.increaseWaitTime() + return nil, err } + gen.resetWaitTime() + if repo == "" { - gen.log.Info().Str("component", "sync").Msg("finished syncing all repos") + gen.log.Info().Str("component", "sync").Msg("finished syncing all repositories") gen.done = true return nil, nil } - // a task with this repo is already running - if gen.lastRepo == repo { - return nil, nil - } - gen.lastRepo = repo return newSyncRepoTask(gen.lastRepo, gen.Service), nil @@ -135,9 +153,34 @@ func (gen *TaskGenerator) IsReady() bool { } func (gen *TaskGenerator) Reset() { + gen.lock.Lock() + defer gen.lock.Unlock() + gen.lastRepo = "" gen.Service.ResetCatalog() gen.done = false + gen.waitTime = 0 +} + +func (gen *TaskGenerator) increaseWaitTime() { + if gen.waitTime == 0 { + gen.waitTime = time.Second + } + + gen.waitTime *= 2 + + // max wait time should not exceed generator interval. + if gen.waitTime > gen.maxWaitTime { + gen.waitTime = gen.maxWaitTime + } + + gen.lastTaskTime = time.Now() +} + +// resets wait time. +func (gen *TaskGenerator) resetWaitTime() { + gen.lastTaskTime = time.Now() + gen.waitTime = 0 } type syncRepoTask struct { @@ -154,7 +197,7 @@ func (srt *syncRepoTask) DoWork(ctx context.Context) error { } func (srt *syncRepoTask) String() string { - return fmt.Sprintf("{Name: \"%s\", repo: \"%s\"}", + return fmt.Sprintf("{Name: \"%s\", repository: \"%s\"}", srt.Name(), srt.repo) }