From 2fb691cd3b24edf63f803ace088cef493802ed3b Mon Sep 17 00:00:00 2001 From: Vishwas Rajashekar Date: Sat, 16 May 2026 18:44:10 +0530 Subject: [PATCH] feat(sync): additional changes for streaming Signed-off-by: Vishwas Rajashekar --- ...ync-normal.json => config-sync-local.json} | 3 +- examples/config-sync-normal-local-build.json | 30 +++ examples/config-sync-normal-mainline.json | 30 +++ pkg/api/controller.go | 11 +- pkg/api/routes.go | 45 +++- pkg/extensions/extension_sync.go | 13 +- pkg/extensions/extension_sync_disabled.go | 2 +- pkg/extensions/sync/chunked_blob_reader.go | 37 ++- pkg/extensions/sync/errors.go | 10 + pkg/extensions/sync/on_demand.go | 51 +++- pkg/extensions/sync/on_demand_disabled.go | 7 +- pkg/extensions/sync/service.go | 65 +----- pkg/extensions/sync/stream_manager.go | 219 ++++++++++++++++-- pkg/extensions/sync/sync_internal_test.go | 8 +- 14 files changed, 421 insertions(+), 110 deletions(-) rename examples/{config-sync-normal.json => config-sync-local.json} (87%) create mode 100644 examples/config-sync-normal-local-build.json create mode 100644 examples/config-sync-normal-mainline.json create mode 100644 pkg/extensions/sync/errors.go diff --git a/examples/config-sync-normal.json b/examples/config-sync-local.json similarity index 87% rename from examples/config-sync-normal.json rename to examples/config-sync-local.json index 28dd6bf7..14b7a997 100644 --- a/examples/config-sync-normal.json +++ b/examples/config-sync-local.json @@ -1,7 +1,7 @@ { "distSpecVersion": "1.1.1", "storage": { - "rootDirectory": "./temp/zot1" + "rootDirectory": "./temp/zot" }, "http": { "address": "127.0.0.1", @@ -13,6 +13,7 @@ "extensions": { "sync": { "enable": true, + "enableStream": true, "registries": [ { "urls": [ diff --git a/examples/config-sync-normal-local-build.json b/examples/config-sync-normal-local-build.json new file mode 100644 index 00000000..c398a4de --- /dev/null +++ b/examples/config-sync-normal-local-build.json @@ -0,0 +1,30 @@ +{ + 
"distSpecVersion": "1.1.1", + "storage": { + "rootDirectory": "./temp/zotlocalbuildnormal" + }, + "http": { + "address": "127.0.0.1", + "port": "8082" + }, + "log": { + "level": "debug" + }, + "extensions": { + "sync": { + "enable": true, + "registries": [ + { + "urls": [ + "http://localhost:9000" + ], + "onDemand": true, + "tlsVerify": false, + "maxRetries": 5, + "retryDelay": "30s", + "syncTimeout": "10m" + } + ] + } + } +} \ No newline at end of file diff --git a/examples/config-sync-normal-mainline.json b/examples/config-sync-normal-mainline.json new file mode 100644 index 00000000..179ee10a --- /dev/null +++ b/examples/config-sync-normal-mainline.json @@ -0,0 +1,30 @@ +{ + "distSpecVersion": "1.1.1", + "storage": { + "rootDirectory": "./temp/zotmainlinenormal" + }, + "http": { + "address": "127.0.0.1", + "port": "8081" + }, + "log": { + "level": "debug" + }, + "extensions": { + "sync": { + "enable": true, + "registries": [ + { + "urls": [ + "http://localhost:9000" + ], + "onDemand": true, + "tlsVerify": false, + "maxRetries": 5, + "retryDelay": "30s", + "syncTimeout": "10m" + } + ] + } + } +} \ No newline at end of file diff --git a/pkg/api/controller.go b/pkg/api/controller.go index 3d1817ba..778a6d74 100644 --- a/pkg/api/controller.go +++ b/pkg/api/controller.go @@ -43,7 +43,6 @@ const ( type Controller struct { Config *config.Config Router *mux.Router - StreamManager sync.StreamManager MetaDB mTypes.MetaDB StoreController storage.StoreController Log log.Logger @@ -377,12 +376,6 @@ func (c *Controller) Init() error { } } - if extensionsConfig.IsStreamingEnabled() { - c.Log.Info().Msg("streaming sync enabled") - sm := sync.NewChunkingStreamManager(c.Config, c.Log) - c.StreamManager = sm - } - return nil } @@ -606,8 +599,7 @@ func (c *Controller) StartBackgroundTasks() { // Always call EnableSyncExtension to ensure proper logging, even when sync is disabled //nolint: contextcheck - syncOnDemand, err := ext.EnableSyncExtension( - c.Config, c.MetaDB, 
c.StoreController, c.taskScheduler, c.StreamManager, c.Log) + syncOnDemand, err := ext.EnableSyncExtension(c.Config, c.MetaDB, c.StoreController, c.taskScheduler, c.Log) if err != nil { c.Log.Error().Err(err).Msg("failed to start sync extension") } @@ -663,4 +655,5 @@ type SyncOnDemand interface { SyncImage(ctx context.Context, repo, reference string) error SyncReferrers(ctx context.Context, repo string, subjectDigestStr string, referenceTypes []string) error FetchManifest(ctx context.Context, repo, reference string) (manifest.Manifest, error) + StreamManager() sync.StreamManager } diff --git a/pkg/api/routes.go b/pkg/api/routes.go index ed8f5967..d95a340a 100644 --- a/pkg/api/routes.go +++ b/pkg/api/routes.go @@ -1119,15 +1119,24 @@ func (rh *RouteHandler) CheckBlob(response http.ResponseWriter, request *http.Re if err != nil { details := zerr.GetDetails(err) + if errors.Is(err, zerr.ErrBadBlobDigest) { //nolint:gocritic,dupl // errorslint conflicts with gocritic:IfElseChain details["digest"] = digest.String() e := apiErr.NewError(apiErr.DIGEST_INVALID).AddDetail(details) zcommon.WriteJSON(response, http.StatusBadRequest, apiErr.NewErrorList(e)) } else if errors.Is(err, zerr.ErrRepoNotFound) { + streamErr := rh.getBlobInfoFromStreamCache(digest.String(), response) + if streamErr == nil { + return + } details["name"] = name e := apiErr.NewError(apiErr.NAME_UNKNOWN).AddDetail(details) zcommon.WriteJSON(response, http.StatusNotFound, apiErr.NewErrorList(e)) } else if errors.Is(err, zerr.ErrBlobNotFound) { + streamErr := rh.getBlobInfoFromStreamCache(digest.String(), response) + if streamErr == nil { + return + } details["digest"] = digest.String() e := apiErr.NewError(apiErr.BLOB_UNKNOWN).AddDetail(details) zcommon.WriteJSON(response, http.StatusNotFound, apiErr.NewErrorList(e)) @@ -1153,6 +1162,30 @@ func (rh *RouteHandler) CheckBlob(response http.ResponseWriter, request *http.Re response.WriteHeader(http.StatusOK) } +func (rh *RouteHandler) 
getBlobInfoFromStreamCache(digest string, response http.ResponseWriter) error { + rh.c.Log.Debug().Str("digest", digest).Msg("checking stream cache for blob existence") + + // a nil return tells CheckBlob the response is already written, so report not-found when streaming is off + if extConf := rh.c.Config.CopyExtensionsConfig(); !extConf.IsStreamingEnabled() { + return zerr.ErrBlobNotFound + } + + blobSize, blobMediaType, err := rh.c.SyncOnDemand.StreamManager().CachedBlobInfo(digest) + if err != nil { + rh.c.Log.Error().Err(err).Str("digest", digest).Msg("error checking stream cache for blob existence") + + return err + } + + response.Header().Set("Content-Length", strconv.FormatInt(blobSize, 10)) + response.Header().Set("Accept-Ranges", "bytes") + response.Header().Set("Content-Type", blobMediaType) + response.Header().Set(constants.DistContentDigestKey, digest) + response.WriteHeader(http.StatusOK) + + return nil +} + type httpRange struct { start int64 end int64 @@ -1463,7 +1496,7 @@ func (rh *RouteHandler) GetBlob(response http.ResponseWriter, request *http.Requ if errors.Is(err, zerr.ErrRepoNotFound) || errors.Is(err, zerr.ErrBlobNotFound) { rh.c.Log.Info().Msg("blob was not found. 
Connecting client to stream") - copier, err := rh.c.StreamManager.ConnectClient(digest.String(), response) + copier, err := rh.c.SyncOnDemand.StreamManager().ConnectClient(digest.String(), response) if err != nil { rh.c.Log.Error().Err(err).Msg("failed to connect client to stream") response.WriteHeader(http.StatusInternalServerError) @@ -1471,7 +1504,6 @@ func (rh *RouteHandler) GetBlob(response http.ResponseWriter, request *http.Requ return } - // TODO: handle partial err = copier.Copy() if err != nil { rh.c.Log.Error().Err(err).Msg("unexpected error during stream copy") @@ -2678,7 +2710,7 @@ func getImageManifest(ctx context.Context, routeHandler *RouteHandler, imgStore extConf := routeHandler.c.Config.CopyExtensionsConfig() - // if streaming enabled, return manifest immediately, start sync in background + // if streaming enabled, return manifest immediately if extConf.IsStreamingEnabled() { routeHandler.c.Log.Info().Str("repository", name).Str("reference", reference). Msg("streaming is enabled. Direct fetching manifest.") @@ -2699,13 +2731,6 @@ func getImageManifest(ctx context.Context, routeHandler *RouteHandler, imgStore return imgStore.GetImageManifest(name, reference) } - go func() { - if errSync := routeHandler.c.SyncOnDemand.SyncImage(ctx, name, reference); errSync != nil { - routeHandler.c.Log.Err(errSync).Str("repository", name).Str("reference", reference). 
- Msg("failed to sync image") - } - }() - return content, fetchedManifest.GetDescriptor().Digest, fetchedManifest.GetDescriptor().MediaType, nil } diff --git a/pkg/extensions/extension_sync.go b/pkg/extensions/extension_sync.go index e3cfc482..f24cbc0e 100644 --- a/pkg/extensions/extension_sync.go +++ b/pkg/extensions/extension_sync.go @@ -19,7 +19,7 @@ import ( ) func EnableSyncExtension(config *config.Config, metaDB mTypes.MetaDB, - storeController storage.StoreController, sch *scheduler.Scheduler, sm sync.StreamManager, log log.Logger, + storeController storage.StoreController, sch *scheduler.Scheduler, log log.Logger, ) (*sync.BaseOnDemand, error) { // Get extensions config safely extensionsConfig := config.CopyExtensionsConfig() @@ -32,6 +32,14 @@ func EnableSyncExtension(config *config.Config, metaDB mTypes.MetaDB, onDemand := sync.NewOnDemand(log) syncConfig := extensionsConfig.GetSyncConfig() + var streamManager sync.StreamManager + + if extensionsConfig.IsStreamingEnabled() { + log.Info().Msg("streaming sync enabled. 
Initializing stream manager.") + streamManager = sync.NewChunkingStreamManager(config, log) + onDemand.SetStreamManager(streamManager) + } + for _, registryConfig := range syncConfig.Registries { if len(registryConfig.URLs) > 1 { if err := removeSelfURLs(httpAddress, httpPort, &registryConfig, log); err != nil { @@ -57,7 +65,8 @@ func EnableSyncExtension(config *config.Config, metaDB mTypes.MetaDB, // Get cluster config safely clusterConfig := config.CopyClusterConfig() - service, err := sync.New(registryConfig, credsPath, clusterConfig, tmpDir, storeController, sm, metaDB, log) + service, err := sync.New( + registryConfig, credsPath, clusterConfig, tmpDir, storeController, streamManager, metaDB, log) if err != nil { log.Error().Err(err).Msg("failed to initialize sync extension") diff --git a/pkg/extensions/extension_sync_disabled.go b/pkg/extensions/extension_sync_disabled.go index ea66fcd1..dbab182b 100644 --- a/pkg/extensions/extension_sync_disabled.go +++ b/pkg/extensions/extension_sync_disabled.go @@ -13,7 +13,7 @@ import ( // EnableSyncExtension ... 
func EnableSyncExtension(config *config.Config, metaDB mTypes.MetaDB, - storeController storage.StoreController, sch *scheduler.Scheduler, sm sync.StreamManager, log log.Logger, + storeController storage.StoreController, sch *scheduler.Scheduler, log log.Logger, ) (*sync.BaseOnDemand, error) { log.Warn().Msg("skipping enabling sync extension because given zot binary doesn't include this feature," + "please build a binary that does so") diff --git a/pkg/extensions/sync/chunked_blob_reader.go b/pkg/extensions/sync/chunked_blob_reader.go index 6336679b..7e3c96d0 100644 --- a/pkg/extensions/sync/chunked_blob_reader.go +++ b/pkg/extensions/sync/chunked_blob_reader.go @@ -24,6 +24,7 @@ type ChunkedBlobReader struct { InFlightReader *blob.BReader clientMu sync.Mutex + clientCond *sync.Cond chunksMu sync.RWMutex clients map[int]chan int64 numClientsTotal int @@ -37,13 +38,17 @@ func NewChunkedBlobReader(onDiskPath string, chunkSizeBytes int64, logger log.Lo return nil, err } - return &ChunkedBlobReader{ + cbr := &ChunkedBlobReader{ clients: make(map[int]chan int64), logger: logger, onDiskPath: onDiskPath, onDiskFile: createdFile, chunkSizeBytes: chunkSizeBytes, - }, nil + } + + cbr.clientCond = sync.NewCond(&cbr.clientMu) + + return cbr, nil } func (cbr *ChunkedBlobReader) InitReader(r *blob.BReader, numChunksTotal int64) { @@ -55,12 +60,11 @@ func (cbr *ChunkedBlobReader) InitReader(r *blob.BReader, numChunksTotal int64) func (cbr *ChunkedBlobReader) Read(buff []byte) (int, error) { if cbr.InFlightReader == nil { - return 0, errors.New("reader not initialized") + return 0, ErrReaderNotInitialized } cbr.chunksMu.Lock() - // TODO: This is duplicating file IO so that the stream logic can access it easily. It would be more efficient to // Access the file that regclient is writing to avoid this extra duplication. 
var internalBuffBytes []byte = make([]byte, 0, cbr.chunkSizeBytes) internalBuff := bytes.NewBuffer(internalBuffBytes) @@ -71,7 +75,6 @@ func (cbr *ChunkedBlobReader) Read(buff []byte) (int, error) { if err != nil { if !errors.Is(err, io.EOF) { cbr.logger.Error().Err(err).Msg("failed to copy from in flight reader") - // TODO: This means there was an upstream read error. Should the in-progress streams be terminated? copy(buff, internalBuff.Bytes()) cbr.chunksMu.Unlock() @@ -108,7 +111,10 @@ func (cbr *ChunkedBlobReader) Read(buff []byte) (int, error) { // the client would create a subscription here with a channel where latest chunk info is sent. func (cbr *ChunkedBlobReader) Subscribe(channel chan int64) int { cbr.clientMu.Lock() - defer cbr.clientMu.Unlock() + defer func() { + cbr.clientCond.Broadcast() + cbr.clientMu.Unlock() + }() cbr.clients[cbr.numClientsTotal] = channel chanId := cbr.numClientsTotal @@ -127,11 +133,15 @@ func (cbr *ChunkedBlobReader) Subscribe(channel chan int64) int { return chanId } -func (cbr *ChunkedBlobReader) Unsubscribe(id int) { +func (cbr *ChunkedBlobReader) Unsubscribe(clientId int) { cbr.clientMu.Lock() - defer cbr.clientMu.Unlock() + defer func() { + cbr.clientCond.Broadcast() + cbr.clientMu.Unlock() + }() - delete(cbr.clients, id) + // drop only the map entry: Subscribe keys new clients by numClientsTotal, so lowering it would reuse live ids + delete(cbr.clients, clientId) } func (cbr *ChunkedBlobReader) ToBReader() *blob.BReader { @@ -141,3 +151,12 @@ func (cbr *ChunkedBlobReader) ToBReader() *blob.BReader { blob.WithReader(cbr), ) } + +func (cbr *ChunkedBlobReader) WaitForClientEmpty() { + cbr.clientMu.Lock() + defer cbr.clientMu.Unlock() + + for len(cbr.clients) > 0 { + cbr.clientCond.Wait() + } +} diff --git a/pkg/extensions/sync/errors.go b/pkg/extensions/sync/errors.go new file mode 100644 index 00000000..7ab1fab6 --- /dev/null +++ b/pkg/extensions/sync/errors.go @@ -0,0 +1,10 @@ +package sync + +import "errors" + +var ( + ErrReaderNotInitialized = errors.New("reader not initialized") + 
ErrManifestNotFoundOnDemandDisabl = errors.New("manifest not found in ondemand disabled") + ErrBlobNotFoundInActiveStreams = errors.New("blob not found in active streams") + ErrChunkingReaderNotInitialized = errors.New("chunking blob reader not initialized for this blob!") +) diff --git a/pkg/extensions/sync/on_demand.go b/pkg/extensions/sync/on_demand.go index 81b3228f..4c476da4 100644 --- a/pkg/extensions/sync/on_demand.go +++ b/pkg/extensions/sync/on_demand.go @@ -9,6 +9,7 @@ import ( "time" "github.com/regclient/regclient/types/manifest" + zerr "zotregistry.dev/zot/v2/errors" "zotregistry.dev/zot/v2/pkg/common" "zotregistry.dev/zot/v2/pkg/log" @@ -31,8 +32,9 @@ process just the first one, also keep track of all background retrying routines. type BaseOnDemand struct { services []Service // map[request]chan err - requestStore *sync.Map - log log.Logger + requestStore *sync.Map + log log.Logger + streamManager StreamManager } func NewOnDemand(log log.Logger) *BaseOnDemand { @@ -43,21 +45,64 @@ func (onDemand *BaseOnDemand) Add(service Service) { onDemand.services = append(onDemand.services, service) } +func (onDemand *BaseOnDemand) SetStreamManager(sm StreamManager) { + onDemand.streamManager = sm +} + +func (onDemand *BaseOnDemand) StreamManager() StreamManager { + return onDemand.streamManager +} + func (onDemand *BaseOnDemand) FetchManifest(ctx context.Context, repo, reference string) (manifest.Manifest, error) { + // An image might already be streaming in which case, just return the one in cache. + if onDemand.streamManager != nil { + manifest, ok := onDemand.streamManager.StreamingImageManifest(repo, reference) + if ok { + onDemand.log.Debug().Str("repo", repo).Str("reference", reference). 
+ Msg("streaming manifest already present in cache.") + + return manifest, nil + } + } + var manifest manifest.Manifest + for _, service := range onDemand.services { onDemand.log.Info().Str("repo", repo).Str("ref", reference).Msg("attempting to fetch manifest") fetchedManifest, err := service.FetchManifest(ctx, repo, reference) if err != nil { onDemand.log.Error().Err(err).Msg("failed to fetch manifest from service") + continue } manifest = fetchedManifest + break } if manifest == nil { - return nil, errors.New("not found") + return nil, zerr.ErrBlobNotFound + } + + if onDemand.streamManager != nil { + onDemand.log.Debug().Str("repo", repo).Str("reference", reference). + Msg("storing image for streaming.") + + err := onDemand.streamManager.StoreImageForStreaming(repo, reference, manifest) + if err != nil { + onDemand.log.Error().Err(err).Str("repo", repo).Str("reference", reference). + Msg("failed to store manifest for streaming") + + return nil, err + } + + // sync the image in the background + go func() { + if errSync := onDemand.SyncImage(ctx, repo, reference); errSync != nil { + onDemand.log.Err(errSync).Str("repository", repo).Str("reference", reference). 
+ Msg("failed to sync image") + } + }() } return manifest, nil diff --git a/pkg/extensions/sync/on_demand_disabled.go b/pkg/extensions/sync/on_demand_disabled.go index 5566e877..bde7a786 100644 --- a/pkg/extensions/sync/on_demand_disabled.go +++ b/pkg/extensions/sync/on_demand_disabled.go @@ -4,7 +4,6 @@ package sync import ( "context" - "errors" "github.com/regclient/regclient/types/manifest" ) @@ -22,5 +21,9 @@ func (onDemand *BaseOnDemand) SyncReferrers(ctx context.Context, repo string, } func (onDemand *BaseOnDemand) FetchManifest(ctx context.Context, repo, reference string) (manifest.Manifest, error) { - return nil, errors.New("manifest not found in ondemand disabled") + return nil, ErrManifestNotFoundOnDemandDisabl +} + +func (onDemand *BaseOnDemand) StreamManager() StreamManager { + return nil } diff --git a/pkg/extensions/sync/service.go b/pkg/extensions/sync/service.go index 38169244..149c57c5 100644 --- a/pkg/extensions/sync/service.go +++ b/pkg/extensions/sync/service.go @@ -4,7 +4,6 @@ package sync import ( "context" - "encoding/json" "errors" "fmt" "net/http" @@ -16,7 +15,6 @@ import ( "time" godigest "github.com/opencontainers/go-digest" - ispec "github.com/opencontainers/image-spec/specs-go/v1" "github.com/regclient/regclient" "github.com/regclient/regclient/config" "github.com/regclient/regclient/mod" @@ -333,57 +331,6 @@ func (service *BaseService) FetchManifest(ctx context.Context, repo, reference s return nil, err } - // if this is being executed, it is for sure part of streaming. 
- // install chunked blob readers for each blob into the stream manager's cache - if m != nil { - // first for the manifest blob - err := service.streamManager.PrepareActiveStreamForBlob(m.GetDescriptor().Digest) - if err != nil { - return nil, err - } - - var contents ispec.Manifest - contentBytes, err := m.RawBody() - if err != nil { - return nil, err - } - - err = json.Unmarshal(contentBytes, &contents) - if err != nil { - return nil, err - } - - // imager, ok := orig.(manifest.Imager) - // if !ok { - // return nil, errors.New("failed to convert to imager") - // } - - // next, for config - // cfg, err := imager.GetConfig() - // if err != nil { - // return nil, err - // } - - err = service.streamManager.PrepareActiveStreamForBlob(contents.Config.Digest) - if err != nil { - return nil, err - } - - // finally, for all layers - // layers, err := imager.GetLayers() - // if err != nil { - // return nil, err - // } - - layers := contents.Layers - for _, layer := range layers { - err = service.streamManager.PrepareActiveStreamForBlob(layer.Digest) - if err != nil { - return nil, err - } - } - } - return m, nil } @@ -574,7 +521,8 @@ func (service *BaseService) syncRef(ctx context.Context, localRepo string, remot copyOpts := []regclient.ImageOpts{} if service.streamManager != nil { - service.log.Info().Str("repo", localRepo).Str("reference", remoteImageRef.Tag).Msg("streaming is enabled. Enabling reader hook") + service.log.Info().Str("repo", localRepo).Str("reference", remoteImageRef.Tag). + Msg("streaming is enabled. Enabling reader hook") copyOpts = append(copyOpts, regclient.ImageWithBlobReaderHook(service.streamManager.StreamingBlobReader)) } @@ -717,6 +665,15 @@ func (service *BaseService) syncImage(ctx context.Context, localRepo, remoteRepo // just in case there is an error before commit() which cleans up. 
defer service.destination.CleanupImage(localImageRef, localRepo) //nolint: errcheck + // clears the stream cache after the sync is done in both error as well as committed cases. + defer func() { + if service.streamManager != nil { + service.log.Debug().Str("repo", localRepo).Str("reference", tag).Msg("cleaning up stream cache after sync") + // run in a goroutine as the cleanup waits for clients to drain + go service.streamManager.RemoveStreamingImage(localRepo, tag) + } + }() + // first sync image skipped, err := service.syncRef(ctx, localRepo, remoteImageRef, localImageRef, localDigest) if err != nil { diff --git a/pkg/extensions/sync/stream_manager.go b/pkg/extensions/sync/stream_manager.go index 4027c5c9..410b425b 100644 --- a/pkg/extensions/sync/stream_manager.go +++ b/pkg/extensions/sync/stream_manager.go @@ -1,14 +1,17 @@ package sync import ( - "errors" "io" + "os" "path" "sync" godigest "github.com/opencontainers/go-digest" "github.com/regclient/regclient/types/blob" + "github.com/regclient/regclient/types/descriptor" + manifestpkg "github.com/regclient/regclient/types/manifest" + zerr "zotregistry.dev/zot/v2/errors" "zotregistry.dev/zot/v2/pkg/api/config" "zotregistry.dev/zot/v2/pkg/log" ) @@ -16,12 +19,21 @@ import ( type StreamManager interface { ConnectClient(blobDigest string, writer io.Writer) (*InFlightBlobCopier, error) StreamingBlobReader(reader *blob.BReader) (*blob.BReader, error) - PrepareActiveStreamForBlob(blobDigest godigest.Digest) error + StoreImageForStreaming(repo, reference string, manifest manifestpkg.Manifest) error + StreamingImageManifest(repo, reference string) (manifestpkg.Manifest, bool) + RemoveStreamingImage(repo, reference string) + CachedBlobInfo(blobDigest string) (blen int64, mediaType string, err error) } type ChunkingStreamManager struct { - tempStore StreamTempStore - activeStreams map[string]*ChunkedBlobReader + tempStore StreamTempStore + // activeStreams maps blob digest to the corresponding chunked blob reader + // 
that is currently active and receiving data for that blob. + activeStreams map[string]*ChunkedBlobReader + // streamingRefs holds the references to the images that are currently being streamed and their corresponding manifest. + streamingRefs map[string]manifestpkg.Manifest + // blobInfo holds blobs and their corresponding descriptor. + blobInfoMap map[string]descriptor.Descriptor logger log.Logger streamLock sync.Mutex chunkSizeBytes int64 @@ -34,6 +46,8 @@ func NewChunkingStreamManager(config *config.Config, logger log.Logger) *Chunkin return &ChunkingStreamManager{ tempStore: store, activeStreams: map[string]*ChunkedBlobReader{}, + streamingRefs: map[string]manifestpkg.Manifest{}, + blobInfoMap: map[string]descriptor.Descriptor{}, logger: logger, chunkSizeBytes: *extConf.Sync.StreamChunkSizeBytes, } @@ -44,10 +58,9 @@ func (sm *ChunkingStreamManager) ConnectClient(blobDigest string, writer io.Writ sm.streamLock.Lock() defer sm.streamLock.Unlock() - // TODO: this can result in a race condition if the ImageCopy with Options hasn't triggered the hook yet stream, ok := sm.activeStreams[blobDigest] if !ok { - return nil, errors.New("blob not found in active streams") + return nil, ErrBlobNotFoundInActiveStreams } dig, err := godigest.Parse(blobDigest) @@ -61,6 +74,18 @@ func (sm *ChunkingStreamManager) ConnectClient(blobDigest string, writer io.Writ return copier, nil } +func (sm *ChunkingStreamManager) CachedBlobInfo(blobDigest string) (int64, string, error) { + sm.streamLock.Lock() + defer sm.streamLock.Unlock() + + desc, ok := sm.blobInfoMap[blobDigest] + if !ok { + return 0, "", zerr.ErrBlobNotFound + } + + return desc.Size, desc.MediaType, nil +} + // StreamingBlobReader is executed inside regclient as part of the reader hook. 
func (sm *ChunkingStreamManager) StreamingBlobReader(reader *blob.BReader) (*blob.BReader, error) { sm.streamLock.Lock() @@ -74,7 +99,7 @@ func (sm *ChunkingStreamManager) StreamingBlobReader(reader *blob.BReader) (*blo // as the code here only supplies the reader and the chunk count chunkingReader, ok := sm.activeStreams[digest] if !ok { - return nil, errors.New("chunking blob reader not initialized for this blob!") + return nil, ErrChunkingReaderNotInitialized } chunkingReader.InitReader(reader, chunkCount(size, sm.chunkSizeBytes)) @@ -94,23 +119,187 @@ func chunkCount(blobSize int64, chunkSizeBytes int64) int64 { return chunkCount } -func (sm *ChunkingStreamManager) PrepareActiveStreamForBlob(blobDigest godigest.Digest) error { - sm.streamLock.Lock() - defer sm.streamLock.Unlock() - - _, ok := sm.activeStreams[blobDigest.String()] +func (sm *ChunkingStreamManager) prepareActiveStreamForBlob(descriptor descriptor.Descriptor) error { + _, ok := sm.activeStreams[descriptor.Digest.String()] if ok { - sm.logger.Warn().Str("blob", blobDigest.String()).Msg("active stream already exists for blob") + sm.logger.Warn().Str("blob", descriptor.Digest.String()).Msg("active stream already exists for blob") return nil } - r, err := NewChunkedBlobReader(sm.tempStore.BlobPath(blobDigest), sm.chunkSizeBytes, sm.logger) + r, err := NewChunkedBlobReader(sm.tempStore.BlobPath(descriptor.Digest), sm.chunkSizeBytes, sm.logger) if err != nil { return err } - sm.activeStreams[blobDigest.String()] = r + sm.activeStreams[descriptor.Digest.String()] = r + sm.blobInfoMap[descriptor.Digest.String()] = descriptor return nil } + +func (sm *ChunkingStreamManager) StoreImageForStreaming(repo, reference string, manifest manifestpkg.Manifest) error { + sm.streamLock.Lock() + defer sm.streamLock.Unlock() + + key := repo + ":" + reference + + if _, ok := sm.streamingRefs[key]; ok { + sm.logger.Warn().Str("repo", repo).Str("reference", reference). 
+ Msg("streaming manifest already exists for repo:reference") + + return nil + } + + // populate the manifest into streamingRefs + sm.streamingRefs[key] = manifest + + // pre-load the individual blobs into activeStreams + // first, the manifest + err := sm.prepareActiveStreamForBlob(manifest.GetDescriptor()) + if err != nil { + sm.logger.Error().Err(err).Str("blob", manifest.GetDescriptor().Digest.String()). + Msg("failed to prepare active stream for blob") + + return err + } + + imager, ok := manifest.(manifestpkg.Imager) + if !ok { + sm.logger.Warn().Str("repo", repo).Str("reference", reference). + Msg("failed to cast manifest to imager, skipping pre-loading config and layers for streaming") + + return nil + } + + // then, the config blob + configDesc, err := imager.GetConfig() + if err != nil { + sm.logger.Error().Err(err).Str("blob", configDesc.Digest.String()). + Msg("failed to get config descriptor from manifest") + + return err + } + + err = sm.prepareActiveStreamForBlob(configDesc) + if err != nil { + sm.logger.Error().Err(err).Str("blob", configDesc.Digest.String()).Msg("failed to prepare active stream for blob") + + return err + } + + // finally, the layer blobs + layers, err := imager.GetLayers() + if err != nil { + sm.logger.Error().Err(err).Msg("failed to get layers from manifest") + + return err + } + + for _, layer := range layers { + err = sm.prepareActiveStreamForBlob(layer) + if err != nil { + sm.logger.Error().Err(err).Str("blob", layer.Digest.String()).Msg("failed to prepare active stream for blob") + + return err + } + } + + return nil +} + +func (sm *ChunkingStreamManager) StreamingImageManifest(repo, reference string) (manifestpkg.Manifest, bool) { + sm.streamLock.Lock() + defer sm.streamLock.Unlock() + + key := repo + ":" + reference + manifest, ok := sm.streamingRefs[key] + + return manifest, ok +} + +func (sm *ChunkingStreamManager) RemoveStreamingImage(repo, reference string) { + sm.streamLock.Lock() + defer sm.streamLock.Unlock() + + key 
:= repo + ":" + reference + + manifest, ok := sm.streamingRefs[key] + if !ok { + sm.logger.Warn().Str("repo", repo).Str("reference", reference). + Msg("no streaming manifest found for repo:reference") + + return + } + + sm.logger.Info().Str("repo", repo).Str("reference", reference).Msg("removing streaming image") + + imager, ok := manifest.(manifestpkg.Imager) + if !ok { + sm.logger.Warn().Str("repo", repo).Str("reference", reference). + Msg("failed to cast manifest to imager, skipping removal of active streams for config and layers") + + return + } + + // config blob + configDesc, err := imager.GetConfig() + if err != nil { + sm.logger.Error().Err(err).Str("blob", configDesc.Digest.String()). + Msg("failed to get config descriptor from manifest") + + return + } + + sm.waitForClientDrainAndDeleteStream(configDesc.Digest.String()) + + layers, err := imager.GetLayers() + if err != nil { + sm.logger.Error().Err(err).Msg("failed to get layers from manifest") + + return + } + + for _, layer := range layers { + sm.waitForClientDrainAndDeleteStream(layer.Digest.String()) + } + + // finally, remove the manifest + sm.waitForClientDrainAndDeleteStream(manifest.GetDescriptor().Digest.String()) + + // forget the cached manifest for this repo:reference + delete(sm.streamingRefs, key) + + sm.logger.Info().Str("repo", repo).Str("reference", reference).Msg("finished removing streaming image") +} + +func (sm *ChunkingStreamManager) waitForClientDrainAndDeleteStream(blobDigest string) { + reader, ok := sm.activeStreams[blobDigest] + if !ok { + sm.logger.Warn().Str("blob", blobDigest).Msg("no active stream found for blob") + + return + } + + reader.WaitForClientEmpty() + + delete(sm.activeStreams, blobDigest) + delete(sm.blobInfoMap, blobDigest) + + blobPath := sm.tempStore.BlobPath(godigest.Digest(blobDigest)) // already a digest string, must not be hashed again + _, err := os.Stat(blobPath) + if err != nil { + if os.IsNotExist(err) { + return + } + + sm.logger.Error().Err(err).Str("blob", blobDigest).Msg("failed to stat 
blob in temp store") + + return + } + + err = os.Remove(blobPath) + if err != nil { + sm.logger.Error().Err(err).Str("blob", blobDigest).Msg("failed to remove blob from temp store") + } +} diff --git a/pkg/extensions/sync/sync_internal_test.go b/pkg/extensions/sync/sync_internal_test.go index 1309c5fc..4b8b3df9 100644 --- a/pkg/extensions/sync/sync_internal_test.go +++ b/pkg/extensions/sync/sync_internal_test.go @@ -135,7 +135,7 @@ func TestService(t *testing.T) { URLs: []string{"http://localhost"}, } - service, err := New(conf, "", nil, t.TempDir(), storage.StoreController{}, mocks.MetaDBMock{}, log.NewTestLogger()) + service, err := New(conf, "", nil, t.TempDir(), storage.StoreController{}, nil, mocks.MetaDBMock{}, log.NewTestLogger()) So(err, ShouldBeNil) // Mock remote returns isConverted=true so OCI conversion would be attempted if not skipped @@ -654,7 +654,7 @@ func TestSyncLegacyCosignTagsSyncReferrers(t *testing.T) { SyncLegacyCosignTags: &syncLegacyFalse, } - service, err := New(conf, "", nil, t.TempDir(), storage.StoreController{}, mocks.MetaDBMock{}, log.NewTestLogger()) + service, err := New(conf, "", nil, t.TempDir(), storage.StoreController{}, nil, mocks.MetaDBMock{}, log.NewTestLogger()) So(err, ShouldBeNil) service.rc = regclient.New() @@ -701,7 +701,7 @@ func TestSyncLegacyCosignTagsSyncReferrers(t *testing.T) { SyncLegacyCosignTags: &syncLegacyTrue, } - service, err := New(conf, "", nil, t.TempDir(), storage.StoreController{}, mocks.MetaDBMock{}, log.NewTestLogger()) + service, err := New(conf, "", nil, t.TempDir(), storage.StoreController{}, nil, mocks.MetaDBMock{}, log.NewTestLogger()) So(err, ShouldBeNil) service.rc = regclient.New() @@ -744,7 +744,7 @@ func TestOnDemandSyncReferrersNonRecursive(t *testing.T) { SyncLegacyCosignTags: &syncLegacyFalse, } - service, err := New(conf, "", nil, t.TempDir(), storage.StoreController{}, mocks.MetaDBMock{}, log.NewTestLogger()) + service, err := New(conf, 
"", nil, t.TempDir(), storage.StoreController{}, nil, mocks.MetaDBMock{}, log.NewTestLogger()) So(err, ShouldBeNil) service.rc = regclient.New()