From 80b1712b6262e1466856358872c343d327873c5d Mon Sep 17 00:00:00 2001 From: Vishwas Rajashekar Date: Wed, 20 May 2026 08:53:17 +0530 Subject: [PATCH] feat(sync): fix review comments Signed-off-by: Vishwas Rajashekar --- pkg/extensions/sync/inflight_blob_copier.go | 2 +- pkg/extensions/sync/service.go | 3 ++- pkg/extensions/sync/stream_manager.go | 10 ++++++++++ pkg/extensions/sync/sync_internal_test.go | 2 +- 4 files changed, 14 insertions(+), 3 deletions(-) diff --git a/pkg/extensions/sync/inflight_blob_copier.go b/pkg/extensions/sync/inflight_blob_copier.go index b716b9c2..331ca8d7 100644 --- a/pkg/extensions/sync/inflight_blob_copier.go +++ b/pkg/extensions/sync/inflight_blob_copier.go @@ -82,7 +82,7 @@ func (ifbc *InFlightBlobCopier) Copy() error { ifbc.numBytesCopied = latestByteNum ifbc.Unlock() - if latestByteNum >= ifbc.Source.numBytesTotal { + if latestByteNum >= ifbc.Source.Descriptor().Size { // transfer is complete break } diff --git a/pkg/extensions/sync/service.go b/pkg/extensions/sync/service.go index 39c3cdfc..279428e0 100644 --- a/pkg/extensions/sync/service.go +++ b/pkg/extensions/sync/service.go @@ -535,7 +535,8 @@ func (service *BaseService) syncRef(ctx context.Context, localRepo string, remot copyOpts := []regclient.ImageOpts{} - if service.config.Stream != nil && *service.config.Stream && service.streamManager != nil { + // When streaming is enabled, all blobs are read through the streaming reader. + if service.config.IsStreamEnabled() { service.log.Debug().Str("repo", localRepo).Str("reference", remoteImageRef.Tag). Msg("streaming is enabled. Enabling reader hook") copyOpts = append(copyOpts, regclient.ImageWithBlobReaderHook(service.streamManager.StreamingBlobReader)) diff --git a/pkg/extensions/sync/stream_manager.go b/pkg/extensions/sync/stream_manager.go index 7b9817dc..704e049f 100644 --- a/pkg/extensions/sync/stream_manager.go +++ b/pkg/extensions/sync/stream_manager.go @@ -147,6 +147,8 @@ func (sm *ChunkingStreamManager) StoreImageForStreaming(repo, reference string, sm.logger.Error().Err(err).Str("blob", manifest.GetDescriptor().Digest.String()). Msg("failed to prepare active stream for blob") + delete(sm.streamingRefs, key) + return err } @@ -164,6 +166,8 @@ func (sm *ChunkingStreamManager) StoreImageForStreaming(repo, reference string, sm.logger.Error().Err(err).Str("blob", configDesc.Digest.String()). Msg("failed to get config descriptor from manifest") + delete(sm.streamingRefs, key) + return err } @@ -171,6 +175,8 @@ func (sm *ChunkingStreamManager) StoreImageForStreaming(repo, reference string, if err != nil { sm.logger.Error().Err(err).Str("blob", configDesc.Digest.String()).Msg("failed to prepare active stream for blob") + delete(sm.streamingRefs, key) + return err } @@ -179,6 +185,8 @@ func (sm *ChunkingStreamManager) StoreImageForStreaming(repo, reference string, if err != nil { sm.logger.Error().Err(err).Msg("failed to get layers from manifest") + delete(sm.streamingRefs, key) + return err } @@ -187,6 +195,8 @@ func (sm *ChunkingStreamManager) StoreImageForStreaming(repo, reference string, if err != nil { sm.logger.Error().Err(err).Str("blob", layer.Digest.String()).Msg("failed to prepare active stream for blob") + delete(sm.streamingRefs, key) + return err } } diff --git a/pkg/extensions/sync/sync_internal_test.go b/pkg/extensions/sync/sync_internal_test.go index ede4251b..034e72ce 100644 --- a/pkg/extensions/sync/sync_internal_test.go +++ b/pkg/extensions/sync/sync_internal_test.go @@ -1693,7 +1693,7 @@ func TestBaseServiceIsStreamingForRepo(t *testing.T) { streamEnabled := true streamDisabled := false - Convey("returns false when Stream is not configured)", t, func() { + Convey("returns false when Stream is not configured", t, func() { conf := syncconf.RegistryConfig{ URLs: []string{"http://localhost"}, }