From 14cd52e99330e6d09aa6f2d37e9db7b92945e202 Mon Sep 17 00:00:00 2001 From: Vishwas Rajashekar Date: Tue, 19 May 2026 00:55:07 +0530 Subject: [PATCH] feat(sync): move stream from global to per upstream Signed-off-by: Vishwas Rajashekar --- examples/config-sync-stream.json | 2 +- pkg/api/controller.go | 1 + pkg/api/routes.go | 23 +++--- pkg/cli/server/root.go | 27 +++++++ pkg/cli/server/root_test.go | 98 +++++++++++++++++++++++ pkg/extensions/config/config.go | 12 ++- pkg/extensions/config/sync/config.go | 21 +++-- pkg/extensions/extension_sync.go | 8 +- pkg/extensions/sync/on_demand.go | 11 +++ pkg/extensions/sync/on_demand_disabled.go | 4 + pkg/extensions/sync/service.go | 17 +++- pkg/extensions/sync/sync.go | 2 + 12 files changed, 198 insertions(+), 28 deletions(-) diff --git a/examples/config-sync-stream.json b/examples/config-sync-stream.json index fc392972..0e1c109d 100644 --- a/examples/config-sync-stream.json +++ b/examples/config-sync-stream.json @@ -13,13 +13,13 @@ "extensions": { "sync": { "enable": true, - "stream": true, "registries": [ { "urls": [ "http://localhost:9000" ], "onDemand": true, + "stream": true, "tlsVerify": false } ] diff --git a/pkg/api/controller.go b/pkg/api/controller.go index b714e620..6fccace3 100644 --- a/pkg/api/controller.go +++ b/pkg/api/controller.go @@ -656,4 +656,5 @@ type SyncOnDemand interface { SyncReferrers(ctx context.Context, repo string, subjectDigestStr string, referenceTypes []string) error FetchManifestForStream(ctx context.Context, repo, reference string) (manifest.Manifest, error) StreamManager() sync.StreamManager + IsStreamingEnabledForRepo(repo string) bool } diff --git a/pkg/api/routes.go b/pkg/api/routes.go index 61ca9e09..97a74120 100644 --- a/pkg/api/routes.go +++ b/pkg/api/routes.go @@ -1124,8 +1124,7 @@ func (rh *RouteHandler) CheckBlob(response http.ResponseWriter, request *http.Re e := apiErr.NewError(apiErr.DIGEST_INVALID).AddDetail(details) zcommon.WriteJSON(response, http.StatusBadRequest, apiErr.NewErrorList(e)) } else if errors.Is(err, zerr.ErrRepoNotFound) { - extConf := rh.c.Config.CopyExtensionsConfig() - if extConf.IsStreamingEnabled() { + if rh.c.SyncOnDemand != nil && rh.c.SyncOnDemand.IsStreamingEnabledForRepo(name) { streamErr := rh.getBlobInfoFromStreamCache(digest.String(), response) if streamErr == nil { return @@ -1135,8 +1134,7 @@ func (rh *RouteHandler) CheckBlob(response http.ResponseWriter, request *http.Re e := apiErr.NewError(apiErr.NAME_UNKNOWN).AddDetail(details) zcommon.WriteJSON(response, http.StatusNotFound, apiErr.NewErrorList(e)) } else if errors.Is(err, zerr.ErrBlobNotFound) { - extConf := rh.c.Config.CopyExtensionsConfig() - if extConf.IsStreamingEnabled() { + if rh.c.SyncOnDemand != nil && rh.c.SyncOnDemand.IsStreamingEnabledForRepo(name) { streamErr := rh.getBlobInfoFromStreamCache(digest.String(), response) if streamErr == nil { return @@ -1494,13 +1492,12 @@ func (rh *RouteHandler) GetBlob(response http.ResponseWriter, request *http.Requ writeBlobError := func(err error) { details := zerr.GetDetails(err) - extConf := rh.c.Config.CopyExtensionsConfig() - if extConf.IsStreamingEnabled() { - rh.c.Log.Info().Msg("streaming enabled. using stream logic") + if rh.c.SyncOnDemand != nil && rh.c.SyncOnDemand.IsStreamingEnabledForRepo(name) { + rh.c.Log.Debug().Str("repo", name).Msg("streaming enabled for repo. using stream logic for blob.") if errors.Is(err, zerr.ErrRepoNotFound) || errors.Is(err, zerr.ErrBlobNotFound) { - rh.c.Log.Info().Msg("blob was not found. Connecting client to stream") + rh.c.Log.Debug().Str("repo", name).Str("digest", digest.String()).Msg("connecting client to stream") copier, clientConnErr := rh.c.SyncOnDemand.StreamManager().ConnectClient(digest.String(), response) if clientConnErr != nil { @@ -2718,12 +2715,10 @@ func getImageManifest(ctx context.Context, routeHandler *RouteHandler, imgStore routeHandler.c.Log.Info().Str("repository", name).Str("reference", reference). Msg("trying to get updated image by syncing on demand") - extConf := routeHandler.c.Config.CopyExtensionsConfig() - - // if streaming enabled, return manifest immediately - if extConf.IsStreamingEnabled() { - routeHandler.c.Log.Info().Str("repository", name).Str("reference", reference). - Msg("streaming is enabled. Direct fetching manifest.") + // If streaming is enabled for this repo, return manifest immediately. + if routeHandler.c.SyncOnDemand.IsStreamingEnabledForRepo(name) { + routeHandler.c.Log.Debug().Str("repository", name).Str("reference", reference). + Msg("streaming is enabled for repo. Direct fetching manifest.") fetchedManifest, err := routeHandler.c.SyncOnDemand.FetchManifestForStream(ctx, name, reference) if err != nil { diff --git a/pkg/cli/server/root.go b/pkg/cli/server/root.go index 61f4e4e2..6ae5bb16 100644 --- a/pkg/cli/server/root.go +++ b/pkg/cli/server/root.go @@ -1565,6 +1565,33 @@ func validateSync(config *config.Config, logger zlog.Logger) error { // can't check with IsSyncEnabled(), because it can't test invalid sync configs if extensionsConfig != nil && extensionsConfig.Sync != nil && len(extensionsConfig.Sync.Registries) > 0 { for regID, regCfg := range extensionsConfig.Sync.Registries { + // check streaming sync configuration + if regCfg.IsStreamEnabled() { + if !regCfg.OnDemand { + msg := "streaming sync requires onDemand to be enabled" + logger.Error().Err(zerr.ErrBadConfig).Int("id", regID).Interface("extensions.sync.registries[id]", + extensionsConfig.Sync.Registries[regID]).Msg(msg) + + return fmt.Errorf("%w: %s", zerr.ErrBadConfig, msg) + } + + if regCfg.MaxRetries != nil { + msg := "maxRetries cannot be used when streaming sync is enabled" + logger.Error().Err(zerr.ErrBadConfig).Int("id", regID).Interface("extensions.sync.registries[id]", + extensionsConfig.Sync.Registries[regID]).Msg(msg) + + return fmt.Errorf("%w: %s", zerr.ErrBadConfig, msg) + } + + if regCfg.RetryDelay != nil { + msg := "retryDelay cannot be used when streaming sync is enabled" + logger.Error().Err(zerr.ErrBadConfig).Int("id", regID).Interface("extensions.sync.registries[id]", + extensionsConfig.Sync.Registries[regID]).Msg(msg) + + return fmt.Errorf("%w: %s", zerr.ErrBadConfig, msg) + } + } + // check retry options are configured for sync if regCfg.MaxRetries != nil && regCfg.RetryDelay == nil { msg := "retryDelay is required when using maxRetries" diff --git a/pkg/cli/server/root_test.go b/pkg/cli/server/root_test.go index a4d4439d..5524e4c6 100644 --- a/pkg/cli/server/root_test.go +++ b/pkg/cli/server/root_test.go @@ -3446,3 +3446,101 @@ func TestBearerASMConfigValidation(t *testing.T) { }) }) } + +func TestValidateStreamingSync(t *testing.T) { + Convey("Test streaming sync config validation", t, func() { + Convey("Valid streaming sync with onDemand enabled", func() { + content := `{"storage":{"rootDirectory":"/tmp/zot"}, + "http":{"address":"127.0.0.1","port":"8080","realm":"zot", + "auth":{"htpasswd":{"path":"test/data/htpasswd"},"failDelay":1}}, + "extensions":{"sync": {"registries": [{"urls":["localhost:9999"], + "onDemand": true, "stream": true}]}}}` + cfg := config.New() + tmpfile := MakeTempFileWithContent(t, "zot-test.json", content) + err := cli.LoadConfiguration(cfg, tmpfile) + So(err, ShouldBeNil) + }) + + Convey("Reject streaming sync when onDemand is false", func() { + content := `{"storage":{"rootDirectory":"/tmp/zot"}, + "http":{"address":"127.0.0.1","port":"8080","realm":"zot", + "auth":{"htpasswd":{"path":"test/data/htpasswd"},"failDelay":1}}, + "extensions":{"sync": {"registries": [{"urls":["localhost:9999"], + "onDemand": false, "stream": true}]}}}` + cfg := config.New() + tmpfile := MakeTempFileWithContent(t, "zot-test.json", content) + err := cli.LoadConfiguration(cfg, tmpfile) + So(err, ShouldNotBeNil) + So(err, ShouldWrap, zerr.ErrBadConfig) + So(err.Error(), ShouldContainSubstring, "streaming sync requires onDemand to be enabled") + }) + + Convey("Reject streaming sync when maxRetries is set", func() { + content := `{"storage":{"rootDirectory":"/tmp/zot"}, + "http":{"address":"127.0.0.1","port":"8080","realm":"zot", + "auth":{"htpasswd":{"path":"test/data/htpasswd"},"failDelay":1}}, + "extensions":{"sync": {"registries": [{"urls":["localhost:9999"], + "onDemand": true, "stream": true, "maxRetries": 3, "retryDelay": "10s"}]}}}` + cfg := config.New() + tmpfile := MakeTempFileWithContent(t, "zot-test.json", content) + err := cli.LoadConfiguration(cfg, tmpfile) + So(err, ShouldNotBeNil) + So(err, ShouldWrap, zerr.ErrBadConfig) + So(err.Error(), ShouldContainSubstring, "maxRetries cannot be used when streaming sync is enabled") + }) + + Convey("Reject streaming sync when retryDelay is set without maxRetries", func() { + content := `{"storage":{"rootDirectory":"/tmp/zot"}, + "http":{"address":"127.0.0.1","port":"8080","realm":"zot", + "auth":{"htpasswd":{"path":"test/data/htpasswd"},"failDelay":1}}, + "extensions":{"sync": {"registries": [{"urls":["localhost:9999"], + "onDemand": true, "stream": true, "retryDelay": "10s"}]}}}` + cfg := config.New() + tmpfile := MakeTempFileWithContent(t, "zot-test.json", content) + err := cli.LoadConfiguration(cfg, tmpfile) + So(err, ShouldNotBeNil) + So(err, ShouldWrap, zerr.ErrBadConfig) + So(err.Error(), ShouldContainSubstring, "retryDelay cannot be used when streaming sync is enabled") + }) + + Convey("Non-streaming sync allows maxRetries with retryDelay", func() { + content := `{"storage":{"rootDirectory":"/tmp/zot"}, + "http":{"address":"127.0.0.1","port":"8080","realm":"zot", + "auth":{"htpasswd":{"path":"test/data/htpasswd"},"failDelay":1}}, + "extensions":{"sync": {"registries": [{"urls":["localhost:9999"], + "onDemand": true, "maxRetries": 3, "retryDelay": "10s"}]}}}` + cfg := config.New() + tmpfile := MakeTempFileWithContent(t, "zot-test.json", content) + err := cli.LoadConfiguration(cfg, tmpfile) + So(err, ShouldBeNil) + }) + + Convey("Streaming registry and non-streaming registry can coexist", func() { + content := `{"storage":{"rootDirectory":"/tmp/zot"}, + "http":{"address":"127.0.0.1","port":"8080","realm":"zot", + "auth":{"htpasswd":{"path":"test/data/htpasswd"},"failDelay":1}}, + "extensions":{"sync": {"registries": [ + {"urls":["localhost:9999"], "onDemand": true, "stream": true}, + {"urls":["localhost:9998"], "onDemand": true, "maxRetries": 3, "retryDelay": "10s"} + ]}}}` + cfg := config.New() + tmpfile := MakeTempFileWithContent(t, "zot-test.json", content) + err := cli.LoadConfiguration(cfg, tmpfile) + So(err, ShouldBeNil) + }) + + Convey("Streaming upstream and periodic sync upstreams can coexist", func() { + content := `{"storage":{"rootDirectory":"/tmp/zot"}, + "http":{"address":"127.0.0.1","port":"8080","realm":"zot", + "auth":{"htpasswd":{"path":"test/data/htpasswd"},"failDelay":1}}, + "extensions":{"sync": {"registries": [ + {"urls":["localhost:9999"], "onDemand": true, "stream": true}, + {"urls":["localhost:9998"], "onDemand": false, "pollInterval": "12h"} + ]}}}` + cfg := config.New() + tmpfile := MakeTempFileWithContent(t, "zot-test.json", content) + err := cli.LoadConfiguration(cfg, tmpfile) + So(err, ShouldBeNil) + }) + }) +} diff --git a/pkg/extensions/config/config.go b/pkg/extensions/config/config.go index 3575723f..006ec858 100644 --- a/pkg/extensions/config/config.go +++ b/pkg/extensions/config/config.go @@ -133,13 +133,19 @@ func (e *ExtensionConfig) IsSyncEnabled() bool { (e.Sync.Enable == nil && len(e.Sync.Registries) > 0)) } -// IsStreamingEnabled checks if streaming is enabled in this extensions config. +// IsStreamingEnabled checks if streaming is enabled for any upstream registry in the sync config. func (e *ExtensionConfig) IsStreamingEnabled() bool { - if e == nil { + if e == nil || e.Sync == nil { return false } - return e.Sync != nil && e.Sync.Stream != nil && *e.Sync.Stream + for i := range e.Sync.Registries { + if e.Sync.Registries[i].IsStreamEnabled() { + return true + } + } + + return false } // IsScrubEnabled checks if scrub is enabled in this extensions config. diff --git a/pkg/extensions/config/sync/config.go b/pkg/extensions/config/sync/config.go index 3e29327e..9ad31c53 100644 --- a/pkg/extensions/config/sync/config.go +++ b/pkg/extensions/config/sync/config.go @@ -13,9 +13,7 @@ type Credentials struct { } type Config struct { - Enable *bool - // Stream is set to true when it is desired to stream blobs to clients as they are being synced to zot. - Stream *bool + Enable *bool CredentialsFile string /* DownloadDir is needed only in case of using cloud based storages it uses regclient to first copy images into this dir (as oci layout) @@ -25,11 +23,13 @@ type Config struct { } type RegistryConfig struct { - URLs []string - PollInterval time.Duration - Content []Content - TLSVerify *bool - OnDemand bool + URLs []string + PollInterval time.Duration + Content []Content + TLSVerify *bool + OnDemand bool + // Stream is set to true when it is desired to stream blobs to clients as they are being synced from this upstream. + Stream *bool CertDir string MaxRetries *int RetryDelay *time.Duration @@ -47,6 +47,11 @@ func (r RegistryConfig) ShouldSyncLegacyCosignTags() bool { return r.SyncLegacyCosignTags == nil || *r.SyncLegacyCosignTags } +// IsStreamEnabled returns true if streaming is enabled for this registry config. +func (r RegistryConfig) IsStreamEnabled() bool { + return r.Stream != nil && *r.Stream +} + type Content struct { Prefix string Tags *Tags diff --git a/pkg/extensions/extension_sync.go b/pkg/extensions/extension_sync.go index f24cbc0e..289ca318 100644 --- a/pkg/extensions/extension_sync.go +++ b/pkg/extensions/extension_sync.go @@ -65,8 +65,14 @@ func EnableSyncExtension(config *config.Config, metaDB mTypes.MetaDB, // Get cluster config safely clusterConfig := config.CopyClusterConfig() + // Only pass the stream manager to services that have streaming enabled on their registry config. + var svcStreamManager sync.StreamManager + if registryConfig.Stream != nil && *registryConfig.Stream { + svcStreamManager = streamManager + } + service, err := sync.New( - registryConfig, credsPath, clusterConfig, tmpDir, storeController, streamManager, metaDB, log) + registryConfig, credsPath, clusterConfig, tmpDir, storeController, svcStreamManager, metaDB, log) if err != nil { log.Error().Err(err).Msg("failed to initialize sync extension") diff --git a/pkg/extensions/sync/on_demand.go b/pkg/extensions/sync/on_demand.go index eec81630..95665ddf 100644 --- a/pkg/extensions/sync/on_demand.go +++ b/pkg/extensions/sync/on_demand.go @@ -53,6 +53,17 @@ func (onDemand *BaseOnDemand) StreamManager() StreamManager { return onDemand.streamManager } +// IsStreamingEnabledForRepo returns true if any on-demand service has streaming enabled for the given repo. +func (onDemand *BaseOnDemand) IsStreamingEnabledForRepo(repo string) bool { + for _, service := range onDemand.services { + if service.IsStreamingForRepo(repo) { + return true + } + } + + return false +} + // FetchManifestForStream directly fetches the manifest from the upstream services and prepares the image // for streaming. // This is only intended for use with streaming sync. diff --git a/pkg/extensions/sync/on_demand_disabled.go b/pkg/extensions/sync/on_demand_disabled.go index 6d040c98..c2d668f9 100644 --- a/pkg/extensions/sync/on_demand_disabled.go +++ b/pkg/extensions/sync/on_demand_disabled.go @@ -29,3 +29,7 @@ func (onDemand *BaseOnDemand) FetchManifestForStream( func (onDemand *BaseOnDemand) StreamManager() StreamManager { return nil } + +func (onDemand *BaseOnDemand) IsStreamingEnabledForRepo(_ string) bool { + return false +} diff --git a/pkg/extensions/sync/service.go b/pkg/extensions/sync/service.go index 2e6b5f3c..39c3cdfc 100644 --- a/pkg/extensions/sync/service.go +++ b/pkg/extensions/sync/service.go @@ -215,6 +215,21 @@ func (service *BaseService) CanRetryOnError() bool { return false } +// IsStreamingForRepo returns whether streaming is enabled for the given local repo on this service. +// Streaming is enabled if the registry config has Stream set to true and the repo matches the content config. +func (service *BaseService) IsStreamingForRepo(repo string) bool { + if !service.config.IsStreamEnabled() { + return false + } + + // If no content filter is configured, all repos match. + if len(service.config.Content) == 0 { + return true + } + + return service.contentManager.GetContentByLocalRepo(repo) != nil +} + func (service *BaseService) GetSyncTimeout() time.Duration { if service.config.SyncTimeout == 0 { return syncConstants.DefaultSyncTimeout @@ -520,7 +535,7 @@ func (service *BaseService) syncRef(ctx context.Context, localRepo string, remot copyOpts := []regclient.ImageOpts{} - if service.streamManager != nil { + if service.config.Stream != nil && *service.config.Stream && service.streamManager != nil { service.log.Debug().Str("repo", localRepo).Str("reference", remoteImageRef.Tag). Msg("streaming is enabled. Enabling reader hook") copyOpts = append(copyOpts, regclient.ImageWithBlobReaderHook(service.streamManager.StreamingBlobReader)) diff --git a/pkg/extensions/sync/sync.go b/pkg/extensions/sync/sync.go index 13b15c9f..92874280 100644 --- a/pkg/extensions/sync/sync.go +++ b/pkg/extensions/sync/sync.go @@ -39,6 +39,8 @@ type Service interface { GetSyncTimeout() time.Duration FetchManifest(ctx context.Context, repo, reference string) (manifest.Manifest, error) + // Returns whether streaming is enabled for the given local repo on this service. + IsStreamingForRepo(repo string) bool } // Registry interface must be implemented by local and remote registries.