diff --git a/pkg/cli/server/root.go b/pkg/cli/server/root.go index 3af97d97..40189a7f 100644 --- a/pkg/cli/server/root.go +++ b/pkg/cli/server/root.go @@ -663,9 +663,15 @@ func applyDefaultValues(config *config.Config, viperInstance *viper.Viper, logge config.Extensions.Sync.Enable = &defaultVal } - for id, regCfg := range config.Extensions.Sync.Registries { + defaultSyncTimeout := 3 * time.Hour + + for idx, regCfg := range config.Extensions.Sync.Registries { if regCfg.TLSVerify == nil { - config.Extensions.Sync.Registries[id].TLSVerify = &defaultVal + config.Extensions.Sync.Registries[idx].TLSVerify = &defaultVal + } + + if config.Extensions.Sync.Registries[idx].SyncTimeout <= 0 { + config.Extensions.Sync.Registries[idx].SyncTimeout = defaultSyncTimeout } } } diff --git a/pkg/extensions/config/sync/config.go b/pkg/extensions/config/sync/config.go index 77509287..e065d138 100644 --- a/pkg/extensions/config/sync/config.go +++ b/pkg/extensions/config/sync/config.go @@ -33,7 +33,8 @@ type RegistryConfig struct { RetryDelay *time.Duration OnlySigned *bool CredentialHelper string - PreserveDigest bool // sync without converting + PreserveDigest bool // sync without converting + SyncTimeout time.Duration // timeout for on-demand sync operations; unset or non-positive values are replaced with 3 hours when config defaults are applied } type Content struct { diff --git a/pkg/extensions/sync/on_demand.go b/pkg/extensions/sync/on_demand.go index 0772bb9c..9ef2cd1d 100644 --- a/pkg/extensions/sync/on_demand.go +++ b/pkg/extensions/sync/on_demand.go @@ -7,6 +7,7 @@ import ( "context" "errors" "sync" + "time" zerr "zotregistry.dev/zot/v2/errors" "zotregistry.dev/zot/v2/pkg/common" @@ -64,7 +65,7 @@ func (onDemand *BaseOnDemand) SyncImage(ctx context.Context, repo, reference str defer onDemand.requestStore.Delete(req) - go onDemand.syncImage(ctx, repo, reference, syncResult) + go onDemand.syncImage(repo, reference, syncResult) err := <-syncResult @@ -95,21 +96,37 @@ func (onDemand *BaseOnDemand) SyncReferrers(ctx
context.Context, repo string, defer onDemand.requestStore.Delete(req) - go onDemand.syncReferrers(ctx, repo, subjectDigestStr, referenceTypes, syncResult) + go onDemand.syncReferrers(repo, subjectDigestStr, referenceTypes, syncResult) err := <-syncResult return err } -func (onDemand *BaseOnDemand) syncReferrers(ctx context.Context, repo, subjectDigestStr string, +func (onDemand *BaseOnDemand) syncReferrers(repo, subjectDigestStr string, referenceTypes []string, syncResult chan error, ) { defer close(syncResult) var err error + for serviceID, service := range onDemand.services { - err = service.SyncReferrers(ctx, repo, subjectDigestStr, referenceTypes) + timeout := service.GetSyncTimeout() + + onDemand.log.Debug(). + Str("repo", repo). + Str("reference", subjectDigestStr). + Int("serviceID", serviceID). + Dur("timeout", timeout). + Msg("starting on-demand referrer sync") + + // Create a detached context with timeout to ensure sync completes even if HTTP client disconnects. + // This prevents Kubernetes timeout/retries from aborting in-progress referrer downloads. + syncCtx, cancel := context.WithTimeout(context.Background(), timeout) + err = service.SyncReferrers(syncCtx, repo, subjectDigestStr, referenceTypes) + + cancel() + if err != nil { if errors.Is(err, zerr.ErrManifestNotFound) || errors.Is(err, zerr.ErrSyncImageFilteredOut) || @@ -132,8 +149,10 @@ func (onDemand *BaseOnDemand) syncReferrers(ctx context.Context, repo, subjectDi } if service.CanRetryOnError() { + retryErr := err + // retry in background - go func(service Service) { + go func(service Service, serviceTimeout time.Duration) { // remove image after syncing defer func() { onDemand.requestStore.Delete(req) @@ -141,15 +160,19 @@ func (onDemand *BaseOnDemand) syncReferrers(ctx context.Context, repo, subjectDi Msg("sync routine for image exited") }() - onDemand.log.Info().Str("repo", repo).Str("reference", subjectDigestStr).Str("err", err.Error()). 
+ onDemand.log.Info().Str("repo", repo).Str("reference", subjectDigestStr).Str("err", retryErr.Error()). Msg("sync routine: starting routine to copy image, because of error") - err := service.SyncReferrers(context.Background(), repo, subjectDigestStr, referenceTypes) + // Use detached context with timeout for background retry + retryCtx, cancel := context.WithTimeout(context.Background(), serviceTimeout) + defer cancel() + + err := service.SyncReferrers(retryCtx, repo, subjectDigestStr, referenceTypes) if err != nil { onDemand.log.Error().Str("errorType", common.TypeOf(err)).Str("repo", repo).Str("reference", subjectDigestStr). Err(err).Msg("sync routine: starting routine to retry copy image due to error") } - }(service) + }(service, timeout) } } else { break @@ -159,13 +182,28 @@ func (onDemand *BaseOnDemand) syncReferrers(ctx context.Context, repo, subjectDi syncResult <- err } -func (onDemand *BaseOnDemand) syncImage(ctx context.Context, repo, reference string, syncResult chan error) { +func (onDemand *BaseOnDemand) syncImage(repo, reference string, syncResult chan error) { defer close(syncResult) var err error for serviceID, service := range onDemand.services { - err = service.SyncImage(ctx, repo, reference) + timeout := service.GetSyncTimeout() + + onDemand.log.Debug(). + Str("repo", repo). + Str("reference", reference). + Int("serviceID", serviceID). + Dur("timeout", timeout). + Msg("starting on-demand image sync") + + // Create a detached context with timeout to ensure sync completes even if HTTP client disconnects. + // This prevents Kubernetes timeout/retries from aborting in-progress image downloads. 
+ syncCtx, cancel := context.WithTimeout(context.Background(), timeout) + err = service.SyncImage(syncCtx, repo, reference) + + cancel() + if err != nil { if errors.Is(err, zerr.ErrManifestNotFound) || errors.Is(err, zerr.ErrSyncImageFilteredOut) || @@ -191,7 +229,7 @@ func (onDemand *BaseOnDemand) syncImage(ctx context.Context, repo, reference str retryErr := err // retry in background - go func(service Service) { + go func(service Service, serviceTimeout time.Duration) { // remove image after syncing defer func() { onDemand.requestStore.Delete(req) @@ -202,12 +240,16 @@ func (onDemand *BaseOnDemand) syncImage(ctx context.Context, repo, reference str onDemand.log.Info().Str("repo", repo).Str("reference", reference).Str("err", retryErr.Error()). Msg("sync routine: starting routine to retry copy image due to error") - err := service.SyncImage(context.Background(), repo, reference) + // Use detached context with timeout for background retry + retryCtx, cancel := context.WithTimeout(context.Background(), serviceTimeout) + defer cancel() + + err := service.SyncImage(retryCtx, repo, reference) if err != nil { onDemand.log.Error().Str("errorType", common.TypeOf(err)).Str("repo", repo).Str("reference", reference). 
Err(err).Msg("sync routine: error while copying image") } - }(service) + }(service, timeout) } } else { break diff --git a/pkg/extensions/sync/service.go b/pkg/extensions/sync/service.go index e450ef82..d61e7846 100644 --- a/pkg/extensions/sync/service.go +++ b/pkg/extensions/sync/service.go @@ -209,6 +209,14 @@ func (service *BaseService) CanRetryOnError() bool { return false } +func (service *BaseService) GetSyncTimeout() time.Duration { + if service.config.SyncTimeout <= 0 { + return 3 * time.Hour // default; also covers negative values, which would produce an already-expired context + } + + return service.config.SyncTimeout +} + func (service *BaseService) getNextRepoFromCatalog(lastRepo string) string { var found bool diff --git a/pkg/extensions/sync/sync.go b/pkg/extensions/sync/sync.go index aee2cf1e..d8dbda0b 100644 --- a/pkg/extensions/sync/sync.go +++ b/pkg/extensions/sync/sync.go @@ -35,6 +35,8 @@ type Service interface { /* Returns if service has retry option set. Is used by ondemand to decide if it retries pulling an image in background or not. */ CanRetryOnError() bool // used by sync on demand to retry in background + // GetSyncTimeout returns the timeout applied to on-demand sync operations for this service; unset or non-positive configured values yield the 3-hour default + GetSyncTimeout() time.Duration } // Local and remote registries must implement this interface.