From e2aa088e0dfc0c39ad9b9d5cb6965b92d85506d7 Mon Sep 17 00:00:00 2001 From: Vishwas Rajashekar Date: Sun, 8 Feb 2026 00:37:45 +0530 Subject: [PATCH] feat(sync): initial commit for streaming sync initial working prototype for sync fix: pre-load chunk readers on manifest fetch feat: make chunkSize configurable fix minimal build fix: linter errors Signed-off-by: Vishwas Rajashekar --- examples/config-sync-normal.json | 30 ++++ examples/config-sync-stream.json | 32 +++++ pkg/api/controller.go | 13 +- pkg/api/routes.go | 64 +++++++++ pkg/extensions/config/config.go | 9 ++ pkg/extensions/config/sync/config.go | 6 +- pkg/extensions/extension_sync.go | 4 +- pkg/extensions/extension_sync_disabled.go | 2 +- pkg/extensions/sync/chunked_blob_reader.go | 143 ++++++++++++++++++++ pkg/extensions/sync/inflight_blob_copier.go | 86 ++++++++++++ pkg/extensions/sync/on_demand.go | 21 +++ pkg/extensions/sync/on_demand_disabled.go | 11 +- pkg/extensions/sync/service.go | 98 ++++++++++++++ pkg/extensions/sync/stream_manager.go | 116 ++++++++++++++++ pkg/extensions/sync/stream_temp_store.go | 49 +++++++ pkg/extensions/sync/sync.go | 3 + pkg/extensions/sync/sync_internal_test.go | 22 +-- 17 files changed, 691 insertions(+), 18 deletions(-) create mode 100644 examples/config-sync-normal.json create mode 100644 examples/config-sync-stream.json create mode 100644 pkg/extensions/sync/chunked_blob_reader.go create mode 100644 pkg/extensions/sync/inflight_blob_copier.go create mode 100644 pkg/extensions/sync/stream_manager.go create mode 100644 pkg/extensions/sync/stream_temp_store.go diff --git a/examples/config-sync-normal.json b/examples/config-sync-normal.json new file mode 100644 index 00000000..28dd6bf7 --- /dev/null +++ b/examples/config-sync-normal.json @@ -0,0 +1,30 @@ +{ + "distSpecVersion": "1.1.1", + "storage": { + "rootDirectory": "./temp/zot1" + }, + "http": { + "address": "127.0.0.1", + "port": "8080" + }, + "log": { + "level": "debug" + }, + "extensions": { + "sync": { + "enable": true, + "registries": [ + { + "urls": [ + "http://localhost:9000" + ], + "onDemand": true, + "tlsVerify": false, + "maxRetries": 5, + "retryDelay": "30s", + "syncTimeout": "10m" + } + ] + } + } +} \ No newline at end of file diff --git a/examples/config-sync-stream.json b/examples/config-sync-stream.json new file mode 100644 index 00000000..1d02490d --- /dev/null +++ b/examples/config-sync-stream.json @@ -0,0 +1,32 @@ +{ + "distSpecVersion": "1.1.1", + "storage": { + "rootDirectory": "./temp/zotstream" + }, + "http": { + "address": "127.0.0.1", + "port": "8080" + }, + "log": { + "level": "debug" + }, + "extensions": { + "sync": { + "enable": true, + "enableStreaming": true, + "streamChunkSizeBytes": 32768, + "registries": [ + { + "urls": [ + "http://localhost:9000" + ], + "onDemand": true, + "tlsVerify": false, + "maxRetries": 5, + "retryDelay": "30s", + "syncTimeout": "10m" + } + ] + } + } +} \ No newline at end of file diff --git a/pkg/api/controller.go b/pkg/api/controller.go index 749433d2..3d1817ba 100644 --- a/pkg/api/controller.go +++ b/pkg/api/controller.go @@ -17,6 +17,7 @@ import ( "github.com/gorilla/mux" "github.com/gorilla/securecookie" + "github.com/regclient/regclient/types/manifest" "github.com/zitadel/oidc/v3/pkg/client/rp" "zotregistry.dev/zot/v2/errors" @@ -25,6 +26,7 @@ import ( ext "zotregistry.dev/zot/v2/pkg/extensions" events "zotregistry.dev/zot/v2/pkg/extensions/events" monitoring "zotregistry.dev/zot/v2/pkg/extensions/monitoring" + "zotregistry.dev/zot/v2/pkg/extensions/sync" log "zotregistry.dev/zot/v2/pkg/log" meta "zotregistry.dev/zot/v2/pkg/meta" mTypes "zotregistry.dev/zot/v2/pkg/meta/types" @@ -41,6 +43,7 @@ const ( type Controller struct { Config *config.Config Router *mux.Router + StreamManager sync.StreamManager MetaDB mTypes.MetaDB StoreController storage.StoreController Log log.Logger @@ -374,6 +377,12 @@ func (c *Controller) Init() error { } } + if extensionsConfig.IsStreamingEnabled() { + c.Log.Info().Msg("streaming sync enabled") + sm := sync.NewChunkingStreamManager(c.Config, c.Log) + c.StreamManager = sm + } + return nil } @@ -597,7 +606,8 @@ func (c *Controller) StartBackgroundTasks() { // Always call EnableSyncExtension to ensure proper logging, even when sync is disabled //nolint: contextcheck - syncOnDemand, err := ext.EnableSyncExtension(c.Config, c.MetaDB, c.StoreController, c.taskScheduler, c.Log) + syncOnDemand, err := ext.EnableSyncExtension( + c.Config, c.MetaDB, c.StoreController, c.taskScheduler, c.StreamManager, c.Log) if err != nil { c.Log.Error().Err(err).Msg("failed to start sync extension") } @@ -652,4 +662,5 @@ func RunGCTasks(conf *config.Config, storeController storage.StoreController, me type SyncOnDemand interface { SyncImage(ctx context.Context, repo, reference string) error SyncReferrers(ctx context.Context, repo string, subjectDigestStr string, referenceTypes []string) error + FetchManifest(ctx context.Context, repo, reference string) (manifest.Manifest, error) } diff --git a/pkg/api/routes.go b/pkg/api/routes.go index 55e009be..ed8f5967 100644 --- a/pkg/api/routes.go +++ b/pkg/api/routes.go @@ -1455,6 +1455,37 @@ func (rh *RouteHandler) GetBlob(response http.ResponseWriter, request *http.Requ writeBlobError := func(err error) { details := zerr.GetDetails(err) + extConf := rh.c.Config.CopyExtensionsConfig() + + if extConf.IsStreamingEnabled() { + rh.c.Log.Info().Msg("streaming enabled. using stream logic") + + if errors.Is(err, zerr.ErrRepoNotFound) || errors.Is(err, zerr.ErrBlobNotFound) { + rh.c.Log.Info().Msg("blob was not found. Connecting client to stream") + + copier, err := rh.c.StreamManager.ConnectClient(digest.String(), response) + if err != nil { + rh.c.Log.Error().Err(err).Msg("failed to connect client to stream") + response.WriteHeader(http.StatusInternalServerError) + + return + } + + // TODO: handle partial + err = copier.Copy() + if err != nil { + rh.c.Log.Error().Err(err).Msg("unexpected error during stream copy") + } + + response.Header().Set("Content-Length", strconv.FormatInt(copier.Source.InFlightReader.GetDescriptor().Size, 10)) + response.Header().Set(constants.DistContentDigestKey, digest.String()) + response.Header().Set("Content-Type", copier.Source.InFlightReader.GetDescriptor().MediaType) + response.WriteHeader(http.StatusOK) + + return + } + } + if errors.Is(err, zerr.ErrBadBlobDigest) { //nolint:gocritic // errorslint conflicts with gocritic:IfElseChain details["digest"] = digest.String() e := apiErr.NewError(apiErr.DIGEST_INVALID).AddDetail(details) @@ -2645,6 +2676,39 @@ func getImageManifest(ctx context.Context, routeHandler *RouteHandler, imgStore routeHandler.c.Log.Info().Str("repository", name).Str("reference", reference). Msg("trying to get updated image by syncing on demand") + extConf := routeHandler.c.Config.CopyExtensionsConfig() + + // if streaming enabled, return manifest immediately, start sync in background + if extConf.IsStreamingEnabled() { + routeHandler.c.Log.Info().Str("repository", name).Str("reference", reference). + Msg("streaming is enabled. Direct fetching manifest.") + + fetchedManifest, err := routeHandler.c.SyncOnDemand.FetchManifest(ctx, name, reference) + if err != nil { + routeHandler.c.Log.Err(err).Str("repository", name).Str("reference", reference). + Msg("failed to fetch manifest") + + return imgStore.GetImageManifest(name, reference) + } + + content, err := fetchedManifest.RawBody() + if err != nil { + routeHandler.c.Log.Err(err).Str("repository", name).Str("reference", reference). + Msg("failed to read manifest") + + return imgStore.GetImageManifest(name, reference) + } + + go func() { + if errSync := routeHandler.c.SyncOnDemand.SyncImage(ctx, name, reference); errSync != nil { + routeHandler.c.Log.Err(errSync).Str("repository", name).Str("reference", reference). + Msg("failed to sync image") + } + }() + + return content, fetchedManifest.GetDescriptor().Digest, fetchedManifest.GetDescriptor().MediaType, nil + } + if errSync := routeHandler.c.SyncOnDemand.SyncImage(ctx, name, reference); errSync != nil { routeHandler.c.Log.Err(errSync).Str("repository", name).Str("reference", reference). Msg("failed to sync image") diff --git a/pkg/extensions/config/config.go b/pkg/extensions/config/config.go index 15b442cd..792a07ed 100644 --- a/pkg/extensions/config/config.go +++ b/pkg/extensions/config/config.go @@ -133,6 +133,15 @@ func (e *ExtensionConfig) IsSyncEnabled() bool { (e.Sync.Enable == nil && len(e.Sync.Registries) > 0)) } +// IsStreamingEnabled checks if streaming is enabled in this extensions config. +func (e *ExtensionConfig) IsStreamingEnabled() bool { + if e == nil { + return false + } + + return e.Sync != nil && e.Sync.EnableStreaming != nil && *e.Sync.EnableStreaming +} + // IsScrubEnabled checks if scrub is enabled in this extensions config. func (e *ExtensionConfig) IsScrubEnabled() bool { if e == nil { diff --git a/pkg/extensions/config/sync/config.go b/pkg/extensions/config/sync/config.go index c7266b0a..c242a0df 100644 --- a/pkg/extensions/config/sync/config.go +++ b/pkg/extensions/config/sync/config.go @@ -13,8 +13,10 @@ type Credentials struct { } type Config struct { - Enable *bool - CredentialsFile string + Enable *bool + EnableStreaming *bool + StreamChunkSizeBytes *int64 + CredentialsFile string /* DownloadDir is needed only in case of using cloud based storages it uses regclient to first copy images into this dir (as oci layout) and then move them into storage. */ diff --git a/pkg/extensions/extension_sync.go b/pkg/extensions/extension_sync.go index b92f6b4b..e3cfc482 100644 --- a/pkg/extensions/extension_sync.go +++ b/pkg/extensions/extension_sync.go @@ -19,7 +19,7 @@ import ( ) func EnableSyncExtension(config *config.Config, metaDB mTypes.MetaDB, - storeController storage.StoreController, sch *scheduler.Scheduler, log log.Logger, + storeController storage.StoreController, sch *scheduler.Scheduler, sm sync.StreamManager, log log.Logger, ) (*sync.BaseOnDemand, error) { // Get extensions config safely extensionsConfig := config.CopyExtensionsConfig() @@ -57,7 +57,7 @@ func EnableSyncExtension(config *config.Config, metaDB mTypes.MetaDB, // Get cluster config safely clusterConfig := config.CopyClusterConfig() - service, err := sync.New(registryConfig, credsPath, clusterConfig, tmpDir, storeController, metaDB, log) + service, err := sync.New(registryConfig, credsPath, clusterConfig, tmpDir, storeController, sm, metaDB, log) if err != nil { log.Error().Err(err).Msg("failed to initialize sync extension") diff --git a/pkg/extensions/extension_sync_disabled.go b/pkg/extensions/extension_sync_disabled.go index dbab182b..ea66fcd1 100644 --- a/pkg/extensions/extension_sync_disabled.go +++ b/pkg/extensions/extension_sync_disabled.go @@ -13,7 +13,7 @@ import ( // EnableSyncExtension ... func EnableSyncExtension(config *config.Config, metaDB mTypes.MetaDB, - storeController storage.StoreController, sch *scheduler.Scheduler, log log.Logger, + storeController storage.StoreController, sch *scheduler.Scheduler, sm sync.StreamManager, log log.Logger, ) (*sync.BaseOnDemand, error) { log.Warn().Msg("skipping enabling sync extension because given zot binary doesn't include this feature," + "please build a binary that does so") diff --git a/pkg/extensions/sync/chunked_blob_reader.go b/pkg/extensions/sync/chunked_blob_reader.go new file mode 100644 index 00000000..6336679b --- /dev/null +++ b/pkg/extensions/sync/chunked_blob_reader.go @@ -0,0 +1,143 @@ +package sync + +import ( + "bytes" + "errors" + "io" + "os" + "sync" + + "github.com/regclient/regclient/types/blob" + + "zotregistry.dev/zot/v2/pkg/log" +) + +// ChunkedBlobReader is a helper that splits a blob into chunks based on chunkSize +// It then copies chunks to disk. +// The latest chunk number is announced to channels of subscribers. +type ChunkedBlobReader struct { + numChunksTotal int64 + numChunksRead int64 + chunkSizeBytes int64 + onDiskPath string + onDiskFile *os.File + + InFlightReader *blob.BReader + clientMu sync.Mutex + chunksMu sync.RWMutex + clients map[int]chan int64 + numClientsTotal int + + logger log.Logger +} + +func NewChunkedBlobReader(onDiskPath string, chunkSizeBytes int64, logger log.Logger) (*ChunkedBlobReader, error) { + createdFile, err := os.OpenFile(onDiskPath, os.O_CREATE|os.O_WRONLY, 0o644) + if err != nil { + return nil, err + } + + return &ChunkedBlobReader{ + clients: make(map[int]chan int64), + logger: logger, + onDiskPath: onDiskPath, + onDiskFile: createdFile, + chunkSizeBytes: chunkSizeBytes, + }, nil +} + +func (cbr *ChunkedBlobReader) InitReader(r *blob.BReader, numChunksTotal int64) { + if cbr.InFlightReader == nil { + cbr.numChunksTotal = numChunksTotal + cbr.InFlightReader = r + } +} + +func (cbr *ChunkedBlobReader) Read(buff []byte) (int, error) { + if cbr.InFlightReader == nil { + return 0, errors.New("reader not initialized") + } + + cbr.chunksMu.Lock() + + // TODO: This is duplicating file IO so that the stream logic can access it easily. It would be more efficient to + // Access the file that regclient is writing to avoid this extra duplication. + var internalBuffBytes []byte = make([]byte, 0, cbr.chunkSizeBytes) + internalBuff := bytes.NewBuffer(internalBuffBytes) + + multiWriter := io.MultiWriter(cbr.onDiskFile, internalBuff) + + numBytesRead, err := io.CopyN(multiWriter, cbr.InFlightReader, cbr.chunkSizeBytes) + if err != nil { + if !errors.Is(err, io.EOF) { + cbr.logger.Error().Err(err).Msg("failed to copy from in flight reader") + // TODO: This means there was an upstream read error. Should the in-progress streams be terminated? + copy(buff, internalBuff.Bytes()) + cbr.chunksMu.Unlock() + + return int(numBytesRead), err + } + } + + copy(buff, internalBuff.Bytes()) + + cbr.numChunksRead++ + if cbr.numChunksRead == cbr.numChunksTotal { + cbr.onDiskFile.Close() + } + + cbr.chunksMu.Unlock() + + cbr.clientMu.Lock() + // Update all clients about the new chunk + // Clients always read the chunk from disk + var wg sync.WaitGroup + for _, c := range cbr.clients { + wg.Go(func() { + c <- cbr.numChunksRead + }) + } + cbr.clientMu.Unlock() + + wg.Wait() + + return int(numBytesRead), err +} + +// Subscribe to the reader each time a new client is interested in the current blob, +// the client would create a subscription here with a channel where latest chunk info is sent. +func (cbr *ChunkedBlobReader) Subscribe(channel chan int64) int { + cbr.clientMu.Lock() + defer cbr.clientMu.Unlock() + + cbr.clients[cbr.numClientsTotal] = channel + chanId := cbr.numClientsTotal + cbr.numClientsTotal++ + + // Announce the current number of available chunks to the new client only if the reader is initialized + if cbr.InFlightReader != nil { + cbr.chunksMu.RLock() + defer cbr.chunksMu.RUnlock() + + go func() { + channel <- cbr.numChunksRead + }() + } + + return chanId +} + +func (cbr *ChunkedBlobReader) Unsubscribe(id int) { + cbr.clientMu.Lock() + defer cbr.clientMu.Unlock() + + delete(cbr.clients, id) +} + +func (cbr *ChunkedBlobReader) ToBReader() *blob.BReader { + return blob.NewReader( + blob.WithHeader(cbr.InFlightReader.RawHeaders()), + blob.WithDesc(cbr.InFlightReader.GetDescriptor()), + blob.WithReader(cbr), + ) +} diff --git a/pkg/extensions/sync/inflight_blob_copier.go b/pkg/extensions/sync/inflight_blob_copier.go new file mode 100644 index 00000000..227dc81b --- /dev/null +++ b/pkg/extensions/sync/inflight_blob_copier.go @@ -0,0 +1,86 @@ +package sync + +import ( + "errors" + "io" + "os" + "sync" + + "zotregistry.dev/zot/v2/pkg/log" +) + +// InFlightBlobCopier represents a client that wants to stream an image while it is being downloaded. +// The data is copied first from disk up to the latest chunk and further copies wait for an announcement +// over a channel when a new chunk is available. +type InFlightBlobCopier struct { + sync.Mutex + + numChunksCopied int64 + Source *ChunkedBlobReader + onDiskPath string + dest io.Writer + log log.Logger + chunkSizeBytes int64 +} + +func NewInFlightBlobCopier( + source *ChunkedBlobReader, onDiskPath string, dest io.Writer, chunkSizeBytes int64, logger log.Logger, +) *InFlightBlobCopier { + return &InFlightBlobCopier{ + numChunksCopied: 0, + Source: source, + dest: dest, + onDiskPath: onDiskPath, + chunkSizeBytes: chunkSizeBytes, + log: logger, + } +} + +func (ifbc *InFlightBlobCopier) Copy() error { + ifbc.log.Info().Msg("starting inflight copy") + + onDiskFile, err := os.Open(ifbc.onDiskPath) + if err != nil { + ifbc.log.Error().Err(err).Msg("failed to open on disk path") + + return err + } + defer onDiskFile.Close() + + // Register channel for latest chunk count updates + chunkChan := make(chan int64, 1) + + id := ifbc.Source.Subscribe(chunkChan) + + defer ifbc.Source.Unsubscribe(id) + defer close(chunkChan) + + for { + latestChunkNum := <-chunkChan + + ifbc.Lock() + if latestChunkNum <= ifbc.numChunksCopied { + ifbc.Unlock() + + continue + } + + _, err = io.CopyN(ifbc.dest, onDiskFile, (latestChunkNum-ifbc.numChunksCopied)*ifbc.chunkSizeBytes) + if err != nil { + if !errors.Is(err, io.EOF) { + ifbc.log.Error().Err(err).Msg("failed to copy data to downstream client") + + return err + } + } + ifbc.numChunksCopied = latestChunkNum + ifbc.Unlock() + + if latestChunkNum == ifbc.Source.numChunksTotal { + // transfer is complete + break + } + } + + return nil +} diff --git a/pkg/extensions/sync/on_demand.go b/pkg/extensions/sync/on_demand.go index aedbfa87..81b3228f 100644 --- a/pkg/extensions/sync/on_demand.go +++ b/pkg/extensions/sync/on_demand.go @@ -8,6 +8,7 @@ import ( "sync" "time" + "github.com/regclient/regclient/types/manifest" zerr "zotregistry.dev/zot/v2/errors" "zotregistry.dev/zot/v2/pkg/common" "zotregistry.dev/zot/v2/pkg/log" @@ -42,6 +43,26 @@ func (onDemand *BaseOnDemand) Add(service Service) { onDemand.services = append(onDemand.services, service) } +func (onDemand *BaseOnDemand) FetchManifest(ctx context.Context, repo, reference string) (manifest.Manifest, error) { + var manifest manifest.Manifest + for _, service := range onDemand.services { + onDemand.log.Info().Str("repo", repo).Str("ref", reference).Msg("attempting to fetch manifest") + fetchedManifest, err := service.FetchManifest(ctx, repo, reference) + if err != nil { + onDemand.log.Error().Err(err).Msg("failed to fetch manifest from service") + continue + } + manifest = fetchedManifest + break + } + + if manifest == nil { + return nil, errors.New("not found") + } + + return manifest, nil +} + func (onDemand *BaseOnDemand) SyncImage(ctx context.Context, repo, reference string) error { req := request{ repo: repo, diff --git a/pkg/extensions/sync/on_demand_disabled.go b/pkg/extensions/sync/on_demand_disabled.go index 54c2ed36..5566e877 100644 --- a/pkg/extensions/sync/on_demand_disabled.go +++ b/pkg/extensions/sync/on_demand_disabled.go @@ -2,7 +2,12 @@ package sync -import "context" +import ( + "context" + "errors" + + "github.com/regclient/regclient/types/manifest" +) type BaseOnDemand struct{} @@ -15,3 +20,7 @@ func (onDemand *BaseOnDemand) SyncReferrers(ctx context.Context, repo string, ) error { return nil } + +func (onDemand *BaseOnDemand) FetchManifest(ctx context.Context, repo, reference string) (manifest.Manifest, error) { + return nil, errors.New("manifest not found in ondemand disabled") +} diff --git a/pkg/extensions/sync/service.go b/pkg/extensions/sync/service.go index 009af113..38169244 100644 --- a/pkg/extensions/sync/service.go +++ b/pkg/extensions/sync/service.go @@ -4,6 +4,7 @@ package sync import ( "context" + "encoding/json" "errors" "fmt" "net/http" @@ -15,10 +16,12 @@ import ( "time" godigest "github.com/opencontainers/go-digest" + ispec "github.com/opencontainers/image-spec/specs-go/v1" "github.com/regclient/regclient" "github.com/regclient/regclient/config" "github.com/regclient/regclient/mod" "github.com/regclient/regclient/scheme/reg" + "github.com/regclient/regclient/types/manifest" "github.com/regclient/regclient/types/ref" zerr "zotregistry.dev/zot/v2/errors" @@ -49,6 +52,7 @@ type BaseService struct { rc *regclient.RegClient hosts []config.Host tagsCache *tagsCache + streamManager StreamManager clientLock sync.RWMutex log log.Logger @@ -60,6 +64,7 @@ func New( clusterConfig *zconfig.ClusterConfig, tmpDir string, storeController storage.StoreController, + streamManager StreamManager, metadb mTypes.MetaDB, log log.Logger, ) (*BaseService, error) { @@ -71,6 +76,7 @@ func New( service.contentManager = NewContentManager(config.Content, log) service.storeController = storeController service.tagsCache = newTagsCache(defaultExpireMinutes) + service.streamManager = streamManager var err error @@ -294,6 +300,93 @@ func (service *BaseService) GetNextRepo(lastRepo string) (string, error) { return lastRepo, nil } +// FetchManifest on demand. +func (service *BaseService) FetchManifest(ctx context.Context, repo, reference string) (manifest.Manifest, error) { + remoteRepo := repo + + remoteURL := service.remote.GetHostName() + + if len(service.config.Content) > 0 { + remoteRepo = service.contentManager.GetRepoSource(repo) + if remoteRepo == "" { + service.log.Info().Str("remote", remoteURL).Str("repo", repo).Str("reference", reference). + Msg("will not sync image, filtered out by content") + + return nil, zerr.ErrSyncImageFilteredOut + } + } + + service.log.Info().Str("remote", remoteURL).Str("repo", repo).Str("reference", reference). + Msg("sync: fetching manifest") + + if err := service.refreshRegistryTemporaryCredentials(); err != nil { + service.log.Error().Err(err).Msg("failed to refresh credentials") + } + + artifactRef, err := service.remote.GetImageReference(remoteRepo, reference) + if err != nil { + return nil, err + } + + m, err := service.rc.ManifestGet(ctx, artifactRef) + if err != nil { + return nil, err + } + + // if this is being executed, it is for sure part of streaming. + // install chunked blob readers for each blob into the stream manager's cache + if m != nil { + // first for the manifest blob + err := service.streamManager.PrepareActiveStreamForBlob(m.GetDescriptor().Digest) + if err != nil { + return nil, err + } + + var contents ispec.Manifest + contentBytes, err := m.RawBody() + if err != nil { + return nil, err + } + + err = json.Unmarshal(contentBytes, &contents) + if err != nil { + return nil, err + } + + // imager, ok := orig.(manifest.Imager) + // if !ok { + // return nil, errors.New("failed to convert to imager") + // } + + // next, for config + // cfg, err := imager.GetConfig() + // if err != nil { + // return nil, err + // } + + err = service.streamManager.PrepareActiveStreamForBlob(contents.Config.Digest) + if err != nil { + return nil, err + } + + // finally, for all layers + // layers, err := imager.GetLayers() + // if err != nil { + // return nil, err + // } + + layers := contents.Layers + for _, layer := range layers { + err = service.streamManager.PrepareActiveStreamForBlob(layer.Digest) + if err != nil { + return nil, err + } + } + } + + return m, nil +} + // SyncImage on demand. func (service *BaseService) SyncImage(ctx context.Context, repo, reference string) error { remoteRepo := repo @@ -480,6 +573,11 @@ func (service *BaseService) syncRef(ctx context.Context, localRepo string, remot copyOpts := []regclient.ImageOpts{} + if service.streamManager != nil { + service.log.Info().Str("repo", localRepo).Str("reference", remoteImageRef.Tag).Msg("streaming is enabled. Enabling reader hook") + copyOpts = append(copyOpts, regclient.ImageWithBlobReaderHook(service.streamManager.StreamingBlobReader)) + } + // check if image is already synced skipImage, err = service.destination.CanSkipImage(localRepo, reference, remoteDigest) if err != nil { diff --git a/pkg/extensions/sync/stream_manager.go b/pkg/extensions/sync/stream_manager.go new file mode 100644 index 00000000..4027c5c9 --- /dev/null +++ b/pkg/extensions/sync/stream_manager.go @@ -0,0 +1,116 @@ +package sync + +import ( + "errors" + "io" + "path" + "sync" + + godigest "github.com/opencontainers/go-digest" + "github.com/regclient/regclient/types/blob" + + "zotregistry.dev/zot/v2/pkg/api/config" + "zotregistry.dev/zot/v2/pkg/log" +) + +type StreamManager interface { + ConnectClient(blobDigest string, writer io.Writer) (*InFlightBlobCopier, error) + StreamingBlobReader(reader *blob.BReader) (*blob.BReader, error) + PrepareActiveStreamForBlob(blobDigest godigest.Digest) error +} + +type ChunkingStreamManager struct { + tempStore StreamTempStore + activeStreams map[string]*ChunkedBlobReader + logger log.Logger + streamLock sync.Mutex + chunkSizeBytes int64 +} + +func NewChunkingStreamManager(config *config.Config, logger log.Logger) *ChunkingStreamManager { + store := NewLocalTempStore(path.Join(config.Storage.RootDirectory, "stream")) + extConf := config.CopyExtensionsConfig() + + return &ChunkingStreamManager{ + tempStore: store, + activeStreams: map[string]*ChunkedBlobReader{}, + logger: logger, + chunkSizeBytes: *extConf.Sync.StreamChunkSizeBytes, + } +} + +func (sm *ChunkingStreamManager) ConnectClient(blobDigest string, writer io.Writer) (*InFlightBlobCopier, error) { + // Creates a new inflight blob copier if the blobDigest is an active stream + sm.streamLock.Lock() + defer sm.streamLock.Unlock() + + // TODO: this can result in a race condition if the ImageCopy with Options hasn't triggered the hook yet + stream, ok := sm.activeStreams[blobDigest] + if !ok { + return nil, errors.New("blob not found in active streams") + } + + dig, err := godigest.Parse(blobDigest) + if err != nil { + return nil, err + } + + copier := NewInFlightBlobCopier(stream, sm.tempStore.BlobPath(dig), writer, sm.chunkSizeBytes, sm.logger) + sm.logger.Info().Str("blob", blobDigest).Msg("connected client for blob") + + return copier, nil +} + +// StreamingBlobReader is executed inside regclient as part of the reader hook. +func (sm *ChunkingStreamManager) StreamingBlobReader(reader *blob.BReader) (*blob.BReader, error) { + sm.streamLock.Lock() + defer sm.streamLock.Unlock() + + desc := reader.GetDescriptor() + digest := desc.Digest.String() + size := desc.Size + + // This expects the chunked blob reader to be initialized and ready + // as the code here only supplies the reader and the chunk count + chunkingReader, ok := sm.activeStreams[digest] + if !ok { + return nil, errors.New("chunking blob reader not initialized for this blob!") + } + + chunkingReader.InitReader(reader, chunkCount(size, sm.chunkSizeBytes)) + sm.logger.Info().Str("blob", digest).Msg("finished init chunked blob reader") + + return chunkingReader.ToBReader(), nil +} + +func chunkCount(blobSize int64, chunkSizeBytes int64) int64 { + chunkCount := blobSize / chunkSizeBytes + remainder := blobSize % chunkSizeBytes + + if remainder > 0 { + chunkCount++ + } + + return chunkCount +} + +func (sm *ChunkingStreamManager) PrepareActiveStreamForBlob(blobDigest godigest.Digest) error { + sm.streamLock.Lock() + defer sm.streamLock.Unlock() + + _, ok := sm.activeStreams[blobDigest.String()] + if ok { + sm.logger.Warn().Str("blob", blobDigest.String()).Msg("active stream already exists for blob") + + return nil + } + + r, err := NewChunkedBlobReader(sm.tempStore.BlobPath(blobDigest), sm.chunkSizeBytes, sm.logger) + if err != nil { + return err + } + + sm.activeStreams[blobDigest.String()] = r + + return nil +} diff --git a/pkg/extensions/sync/stream_temp_store.go b/pkg/extensions/sync/stream_temp_store.go new file mode 100644 index 00000000..efd997d1 --- /dev/null +++ b/pkg/extensions/sync/stream_temp_store.go @@ -0,0 +1,49 @@ +package sync + +import ( + "errors" + "fmt" + "os" + "path" + + godigest "github.com/opencontainers/go-digest" +) + +type StreamTempStore interface { + BlobPath(digest godigest.Digest) string +} + +type LocalTempStore struct { + rootPath string +} + +func NewLocalTempStore(rootDir string) *LocalTempStore { + _, err := os.Stat(rootDir) + if err != nil { + if errors.Is(err, os.ErrNotExist) { + err := os.MkdirAll(rootDir, 0o755) + if err != nil { + fmt.Println("failed to create root dir " + err.Error()) + } + } else { + fmt.Println("failed to stat root dir " + err.Error()) + } + } + + return &LocalTempStore{ + rootPath: rootDir, + } +} + +func (lts *LocalTempStore) BlobPath(digest godigest.Digest) string { + parentDir := path.Join(lts.rootPath, digest.Algorithm().String()) + _, err := os.Stat(parentDir) + if err != nil && errors.Is(err, os.ErrNotExist) { + err := os.MkdirAll(parentDir, 0o755) + if err != nil { + fmt.Println("failed to create directory " + err.Error()) + } + } + + return path.Join(parentDir, digest.Encoded()) +} diff --git a/pkg/extensions/sync/sync.go b/pkg/extensions/sync/sync.go index 52bff553..13b15c9f 100644 --- a/pkg/extensions/sync/sync.go +++ b/pkg/extensions/sync/sync.go @@ -9,6 +9,7 @@ import ( "time" godigest "github.com/opencontainers/go-digest" + "github.com/regclient/regclient/types/manifest" "github.com/regclient/regclient/types/ref" syncconf "zotregistry.dev/zot/v2/pkg/extensions/config/sync" @@ -36,6 +37,8 @@ type Service interface { CanRetryOnError() bool // used by sync on demand to retry in background // Get the sync timeout configured for this service GetSyncTimeout() time.Duration + + FetchManifest(ctx context.Context, repo, reference string) (manifest.Manifest, error) } // Registry interface must be implemented by local and remote registries. diff --git a/pkg/extensions/sync/sync_internal_test.go b/pkg/extensions/sync/sync_internal_test.go index 57e7875a..1309c5fc 100644 --- a/pkg/extensions/sync/sync_internal_test.go +++ b/pkg/extensions/sync/sync_internal_test.go @@ -44,7 +44,7 @@ func TestService(t *testing.T) { URLs: []string{"http://localhost"}, } - service, err := New(conf, "", nil, t.TempDir(), storage.StoreController{}, mocks.MetaDBMock{}, log.NewTestLogger()) + service, err := New(conf, "", nil, t.TempDir(), storage.StoreController{}, nil, mocks.MetaDBMock{}, log.NewTestLogger()) So(err, ShouldBeNil) err = service.SyncRepo(context.Background(), "repo") @@ -56,7 +56,7 @@ func TestService(t *testing.T) { URLs: []string{"http://localhost"}, } - service, err := New(conf, "", nil, t.TempDir(), storage.StoreController{}, mocks.MetaDBMock{}, log.NewTestLogger()) + service, err := New(conf, "", nil, t.TempDir(), storage.StoreController{}, nil, mocks.MetaDBMock{}, log.NewTestLogger()) So(err, ShouldBeNil) // Create a context that's already cancelled @@ -73,7 +73,7 @@ func TestService(t *testing.T) { URLs: []string{"http://localhost"}, } - service, err := New(conf, "", nil, t.TempDir(), storage.StoreController{}, mocks.MetaDBMock{}, log.NewTestLogger()) + service, err := New(conf, "", nil, t.TempDir(), storage.StoreController{}, nil, mocks.MetaDBMock{}, log.NewTestLogger()) So(err, ShouldBeNil) // Create a mock remote that returns tags so we can reach the loop @@ -100,7 +100,7 @@ func TestService(t *testing.T) { URLs: []string{"http://localhost"}, } - service, err := New(conf, "", nil, t.TempDir(), storage.StoreController{}, mocks.MetaDBMock{}, log.NewTestLogger()) + service, err := New(conf, "", nil, t.TempDir(), storage.StoreController{}, nil, mocks.MetaDBMock{}, log.NewTestLogger()) So(err, ShouldBeNil) // Create a minimal mock remote that only returns tags @@ -188,7 +188,7 @@ func TestService(t *testing.T) { OnlySigned: &onlySigned, } - service, err := New(conf, "", nil, t.TempDir(), storage.StoreController{}, mocks.MetaDBMock{}, log.NewTestLogger()) + service, err := New(conf, "", nil, t.TempDir(), storage.StoreController{}, nil, mocks.MetaDBMock{}, log.NewTestLogger()) So(err, ShouldBeNil) // Create a mock remote that returns an invalid reference to trigger ReferrerList error @@ -224,7 +224,7 @@ func TestService(t *testing.T) { URLs: []string{"http://localhost"}, } - service, err := New(conf, "", nil, t.TempDir(), storage.StoreController{}, mocks.MetaDBMock{}, log.NewTestLogger()) + service, err := New(conf, "", nil, t.TempDir(), storage.StoreController{}, nil, mocks.MetaDBMock{}, log.NewTestLogger()) So(err, ShouldBeNil) // Create a mock remote that returns valid references @@ -276,7 +276,7 @@ func TestService(t *testing.T) { RetryDelay: &retryDelay, } - service, err := New(conf, "", nil, t.TempDir(), storage.StoreController{}, mocks.MetaDBMock{}, log.NewTestLogger()) + service, err := New(conf, "", nil, t.TempDir(), storage.StoreController{}, nil, mocks.MetaDBMock{}, log.NewTestLogger()) So(err, ShouldBeNil) onDemand := NewOnDemand(log.NewTestLogger()) @@ -394,7 +394,7 @@ func TestService(t *testing.T) { }}, } - service1, err := New(conf1, "", nil, t.TempDir(), storage.StoreController{}, mocks.MetaDBMock{}, log.NewTestLogger()) + service1, err := New(conf1, "", nil, t.TempDir(), storage.StoreController{}, nil, mocks.MetaDBMock{}, log.NewTestLogger()) So(err, ShouldBeNil) // Create second service for normal processing @@ -407,7 +407,7 @@ func TestService(t *testing.T) { }}, } - service2, err := New(conf2, "", nil, t.TempDir(), storage.StoreController{}, mocks.MetaDBMock{}, log.NewTestLogger()) + service2, err := New(conf2, "", nil, t.TempDir(), storage.StoreController{}, nil, mocks.MetaDBMock{}, log.NewTestLogger()) So(err, ShouldBeNil) onDemand := NewOnDemand(log.NewTestLogger()) @@ -439,7 +439,7 @@ func TestService(t *testing.T) { }}, } - service, err := New(conf, "", nil, t.TempDir(), storage.StoreController{}, mocks.MetaDBMock{}, log.NewTestLogger()) + service, err := New(conf, "", nil, t.TempDir(), storage.StoreController{}, nil, mocks.MetaDBMock{}, log.NewTestLogger()) So(err, ShouldBeNil) onDemand := NewOnDemand(log.NewTestLogger()) @@ -479,7 +479,7 @@ func TestService(t *testing.T) { }}, } - service, err := New(conf, "", nil, t.TempDir(), storage.StoreController{}, mocks.MetaDBMock{}, log.NewTestLogger()) + service, err := New(conf, "", nil, t.TempDir(), storage.StoreController{}, nil, mocks.MetaDBMock{}, log.NewTestLogger()) So(err, ShouldBeNil) onDemand := NewOnDemand(log.NewTestLogger())