From 63b7654d503e6dc57fa97ed6f70fdad812a82965 Mon Sep 17 00:00:00 2001 From: Vishwas Rajashekar Date: Mon, 18 May 2026 23:47:18 +0530 Subject: [PATCH] feat(sync): fix review comments Signed-off-by: Vishwas Rajashekar --- examples/README.md | 4 +++- pkg/api/routes.go | 18 +++++++-------- pkg/extensions/config/sync/config.go | 3 ++- pkg/extensions/sync/chunked_blob_reader.go | 14 ++++++------ pkg/extensions/sync/inflight_blob_copier.go | 5 +---- pkg/extensions/sync/stream_manager.go | 25 +++++++++++++-------- test/blackbox/ci.sh | 2 +- 7 files changed, 38 insertions(+), 33 deletions(-) diff --git a/examples/README.md b/examples/README.md index fd6a9cb6..68104c83 100644 --- a/examples/README.md +++ b/examples/README.md @@ -1289,7 +1289,9 @@ Besides sync-auth.json file, zot also reads and uses docker credentials by defau ### Streaming sync Under sync, set `"stream": true` to enable streaming. With this option enabled, blobs are streamed to clients as they are being downloaded -from the upstream registry. This does not require any configuration on the upstream. +from the upstream registry. This does not require any configuration on the upstream registry. + +Note that when streaming is enabled, sync retry cannot be configured. If a stream fails, all clients on that stream are disconnected. When a client next retries, a fresh stream is started. ## Search and CVE scanning (Trivy) diff --git a/pkg/api/routes.go b/pkg/api/routes.go index 044da335..61ca9e09 100644 --- a/pkg/api/routes.go +++ b/pkg/api/routes.go @@ -1182,9 +1182,8 @@ func (rh *RouteHandler) getBlobInfoFromStreamCache(digest string, response http. return err } - blen := blobSize - response.Header().Set("Content-Length", strconv.FormatInt(blen, 10)) + response.Header().Set("Content-Length", strconv.FormatInt(blobSize, 10)) response.Header().Set("Accept-Ranges", "bytes") response.Header().Set("Content-Type", blobMediaType) response.Header().Set(constants.DistContentDigestKey, digest) @@ -1512,19 +1511,18 @@ func (rh *RouteHandler) GetBlob(response http.ResponseWriter, request *http.Requ return } } else { - clientCopyErr := copier.Copy() - if clientCopyErr != nil { - rh.c.Log.Error().Err(clientCopyErr).Str("digest", digest.String()).Msg("unexpected error during stream copy") - response.WriteHeader(http.StatusInternalServerError) - - return - } - response.Header().Set("Content-Length", strconv.FormatInt(copier.Source.InFlightReader.GetDescriptor().Size, 10)) response.Header().Set(constants.DistContentDigestKey, digest.String()) response.Header().Set("Content-Type", copier.Source.InFlightReader.GetDescriptor().MediaType) response.WriteHeader(http.StatusOK) + clientCopyErr := copier.Copy() + if clientCopyErr != nil { + rh.c.Log.Error().Err(clientCopyErr).Str("digest", digest.String()).Msg("unexpected error during stream copy") + + return + } + return } } diff --git a/pkg/extensions/config/sync/config.go b/pkg/extensions/config/sync/config.go index da3bcd2b..3e29327e 100644 --- a/pkg/extensions/config/sync/config.go +++ b/pkg/extensions/config/sync/config.go @@ -13,7 +13,8 @@ type Credentials struct { } type Config struct { - Enable *bool + Enable *bool + // Stream is set to true when it is desired to stream blobs to clients as they are being synced to zot. Stream *bool CredentialsFile string /* DownloadDir is needed only in case of using cloud based storages diff --git a/pkg/extensions/sync/chunked_blob_reader.go b/pkg/extensions/sync/chunked_blob_reader.go index d1afe5fd..991c9fa8 100644 --- a/pkg/extensions/sync/chunked_blob_reader.go +++ b/pkg/extensions/sync/chunked_blob_reader.go @@ -49,8 +49,7 @@ func NewChunkedBlobReader(onDiskPath string, logger log.Logger) (*ChunkedBlobRea return cbr, nil } -// InitReader initializes sets the regclient blob reader -// and the total number of bytes to read for the blob. +// InitReader sets the regclient blob reader and the total number of bytes to read for the blob. func (cbr *ChunkedBlobReader) InitReader(blobReader *blob.BReader, numBytesTotal int64) { cbr.bytesMu.Lock() defer cbr.bytesMu.Unlock() @@ -137,28 +136,29 @@ func (cbr *ChunkedBlobReader) Read(buff []byte) (int, error) { // Subscribe to the reader each time a new client is interested in the current blob, // the client would create a subscription here with a channel where latest chunk info is sent. -func (cbr *ChunkedBlobReader) Subscribe(channel chan int64) int { +func (cbr *ChunkedBlobReader) Subscribe() (chan int64, int) { cbr.clientMu.Lock() defer func() { cbr.clientCond.Broadcast() cbr.clientMu.Unlock() }() + channel := make(chan int64, 1) + cbr.clients[cbr.numClientsTotal] = channel chanId := cbr.numClientsTotal cbr.numClientsTotal++ + cbr.bytesMu.RLock() + defer cbr.bytesMu.RUnlock() // Announce the current number of available chunks to the new client only if the reader is initialized if cbr.InFlightReader != nil { - cbr.bytesMu.RLock() - defer cbr.bytesMu.RUnlock() - go func() { channel <- cbr.numBytesReadToDisk }() } - return chanId + return channel, chanId } func (cbr *ChunkedBlobReader) Unsubscribe(clientId int) { diff --git a/pkg/extensions/sync/inflight_blob_copier.go b/pkg/extensions/sync/inflight_blob_copier.go index 507a3481..b716b9c2 100644 --- a/pkg/extensions/sync/inflight_blob_copier.go +++ b/pkg/extensions/sync/inflight_blob_copier.go @@ -47,10 +47,7 @@ func (ifbc *InFlightBlobCopier) Copy() error { } defer onDiskFile.Close() - // Register channel for latest byte count updates - byteAnnounceChan := make(chan int64, 1) - - id := ifbc.Source.Subscribe(byteAnnounceChan) + byteAnnounceChan, id := ifbc.Source.Subscribe() defer ifbc.Source.Unsubscribe(id) for { diff --git a/pkg/extensions/sync/stream_manager.go b/pkg/extensions/sync/stream_manager.go index 1909433f..a195e071 100644 --- a/pkg/extensions/sync/stream_manager.go +++ b/pkg/extensions/sync/stream_manager.go @@ -106,21 +106,21 @@ func (sm *ChunkingStreamManager) StreamingBlobReader(reader *blob.BReader) (*blo return chunkingReader.ToBReader(), nil } -func (sm *ChunkingStreamManager) prepareActiveStreamForBlob(descriptor descriptor.Descriptor) error { - _, ok := sm.activeStreams[descriptor.Digest.String()] +func (sm *ChunkingStreamManager) prepareActiveStreamForBlob(desc descriptor.Descriptor) error { + _, ok := sm.activeStreams[desc.Digest.String()] if ok { - sm.logger.Warn().Str("blob", descriptor.Digest.String()).Msg("active stream already exists for blob") + sm.logger.Warn().Str("blob", desc.Digest.String()).Msg("active stream already exists for blob") return nil } - r, err := NewChunkedBlobReader(sm.tempStore.BlobPath(descriptor.Digest), sm.logger) + r, err := NewChunkedBlobReader(sm.tempStore.BlobPath(desc.Digest), sm.logger) if err != nil { return err } - sm.activeStreams[descriptor.Digest.String()] = r - sm.blobInfoMap[descriptor.Digest.String()] = descriptor + sm.activeStreams[desc.Digest.String()] = r + sm.blobInfoMap[desc.Digest.String()] = desc return nil } @@ -273,8 +273,15 @@ func (sm *ChunkingStreamManager) waitForClientDrainAndDeleteStream(blobDigest st delete(sm.activeStreams, blobDigest) delete(sm.blobInfoMap, blobDigest) - blobPath := sm.tempStore.BlobPath(godigest.FromString(blobDigest)) - _, err := os.Stat(blobPath) + dgst, err := godigest.Parse(blobDigest) + if err != nil { + sm.logger.Error().Err(err).Str("blob", blobDigest).Msg("failed to parse blob digest") + + return + } + + blobPath := sm.tempStore.BlobPath(dgst) + _, err = os.Stat(blobPath) if err != nil { if os.IsNotExist(err) { return @@ -285,7 +292,7 @@ func (sm *ChunkingStreamManager) waitForClientDrainAndDeleteStream(blobDigest st return } - err = os.Remove(sm.tempStore.BlobPath(godigest.FromString(blobDigest))) + err = os.Remove(sm.tempStore.BlobPath(dgst)) if err != nil { sm.logger.Error().Err(err).Str("blob", blobDigest).Msg("failed to remove blob from temp store") } diff --git a/test/blackbox/ci.sh b/test/blackbox/ci.sh index 171c9484..2ad502d3 100755 --- a/test/blackbox/ci.sh +++ b/test/blackbox/ci.sh @@ -19,7 +19,7 @@ tests=("pushpull" "pushpull_authn" "delete_images" "referrers" "metadata" "anony "annotations" "detect_manifest_collision" "cve" "sync" "sync_docker" "sync_replica_cluster" "scrub" "garbage_collect" "metrics" "metrics_minimal" "multiarch_index" "docker_compat" "redis_local" "redis_session_store" "events_nats" "events_http" "events_nats_lint_failure" "events_http_lint_failure" "events_sink_failure" "events_config_decoding" - "fips140" "fips140_authn" "openid_claim_mapping" "upgrade" "upgrade_minimal" "dynamic_tls" "quota", "sync_streaming") + "fips140" "fips140_authn" "openid_claim_mapping" "upgrade" "upgrade_minimal" "dynamic_tls" "quota" "sync_streaming") for test in ${tests[*]}; do ${BATS} ${BATS_FLAGS} ${SCRIPTPATH}/${test}.bats > ${test}.log & pids+=($!)