From 38f0e498abe53b41715161e6924d4f86fc3a95a4 Mon Sep 17 00:00:00 2001 From: Vishwas Rajashekar Date: Wed, 20 May 2026 23:15:01 +0530 Subject: [PATCH] feat(sync): address review comments Signed-off-by: Vishwas Rajashekar --- pkg/api/routes.go | 6 +++++ pkg/extensions/extension_sync_test.go | 1 + pkg/extensions/sync/chunked_blob_reader.go | 22 +++++++-------- .../sync/chunked_blob_reader_internal_test.go | 3 +-- test/blackbox/sync_streaming.bats | 27 +++++++------------ 5 files changed, 28 insertions(+), 31 deletions(-) diff --git a/pkg/api/routes.go b/pkg/api/routes.go index a64bc436..61091105 100644 --- a/pkg/api/routes.go +++ b/pkg/api/routes.go @@ -1176,6 +1176,12 @@ func (rh *RouteHandler) getBlobInfoFromStreamCache(digest string, response http. // when streaming is enabled, the blob might exist in the stream cache blobSize, blobMediaType, err := streamMgr.CachedBlobInfo(digest) if err != nil { + if errors.Is(err, zerr.ErrBlobNotFound) { + rh.c.Log.Debug().Str("digest", digest).Msg("blob not found in stream cache") + + return err + } + rh.c.Log.Error().Err(err).Str("digest", digest).Msg("failed to check stream cache for blob existence") return err diff --git a/pkg/extensions/extension_sync_test.go b/pkg/extensions/extension_sync_test.go index 686799b4..8ce5b0c8 100644 --- a/pkg/extensions/extension_sync_test.go +++ b/pkg/extensions/extension_sync_test.go @@ -27,6 +27,7 @@ func TestEnableSyncExtension_StreamManager(t *testing.T) { metaDB := mocks.MetaDBMock{} storeController := storage.StoreController{} metrics := monitoring.NewMetricsServer(false, logger) + t.Cleanup(metrics.Stop) sch := scheduler.NewScheduler(cfg, metrics, logger) Convey("stream manager is nil when Stream is not set on any registry", func() { diff --git a/pkg/extensions/sync/chunked_blob_reader.go b/pkg/extensions/sync/chunked_blob_reader.go index 63f19ae4..3ab10741 100644 --- a/pkg/extensions/sync/chunked_blob_reader.go +++ b/pkg/extensions/sync/chunked_blob_reader.go @@ -25,11 +25,11 @@ type ChunkedBlobReader struct { onDiskPath string onDiskFile *os.File - inFlightReader *blob.BReader - clientMu sync.RWMutex - clientCond *sync.Cond - clients map[int]chan int64 - numClientsTotal int + inFlightReader *blob.BReader + clientMu sync.RWMutex + clientCond *sync.Cond + clients map[int]chan int64 + nextClientId int logger log.Logger } @@ -108,7 +108,7 @@ func (cbr *ChunkedBlobReader) Read(buff []byte) (int, error) { cbr.Unsubscribe(clientId) } - return -1, err + return n, err } // partial read at end of stream; normalise to EOF for callers err = io.EOF @@ -119,7 +119,7 @@ func (cbr *ChunkedBlobReader) Read(buff []byte) (int, error) { cbr.logger.Error().Err(werr).Msg("failed to write blob data to disk") cbr.bytesMu.Unlock() - return -1, werr + return n, werr } cbr.numBytesReadToDisk += int64(n) @@ -177,9 +177,9 @@ func (cbr *ChunkedBlobReader) Subscribe() (chan int64, int) { channel := make(chan int64, 1) - cbr.clients[cbr.numClientsTotal] = channel - chanId := cbr.numClientsTotal - cbr.numClientsTotal++ + cbr.clients[cbr.nextClientId] = channel + chanId := cbr.nextClientId + cbr.nextClientId++ cbr.bytesMu.RLock() defer cbr.bytesMu.RUnlock() @@ -204,8 +204,6 @@ func (cbr *ChunkedBlobReader) Unsubscribe(clientId int) { channel, ok := cbr.clients[clientId] if ok { close(channel) - - cbr.numClientsTotal-- delete(cbr.clients, clientId) } } diff --git a/pkg/extensions/sync/chunked_blob_reader_internal_test.go b/pkg/extensions/sync/chunked_blob_reader_internal_test.go index 54ccd3cd..1df47b32 100644 --- a/pkg/extensions/sync/chunked_blob_reader_internal_test.go +++ b/pkg/extensions/sync/chunked_blob_reader_internal_test.go @@ -224,9 +224,8 @@ func TestRead(t *testing.T) { errCBR.InitReader(errReader, errReader.GetDescriptor()) buf := make([]byte, 50) - n, readErr := errCBR.Read(buf) + _, readErr := errCBR.Read(buf) So(readErr, ShouldNotBeNil) - So(n, ShouldEqual, -1) // Channel should have been closed. _, open := <-bytesUpdateChan diff --git a/test/blackbox/sync_streaming.bats b/test/blackbox/sync_streaming.bats index 1082455c..a3a573fd 100644 --- a/test/blackbox/sync_streaming.bats +++ b/test/blackbox/sync_streaming.bats @@ -7,22 +7,15 @@ load helpers_wait load ../port_helper function verify_prerequisites() { - if [ ! $(command -v curl) ]; then - echo "you need to install curl as a prerequisite to running the tests" >&3 - return 1 - fi + local ok=0 + for cmd in curl jq skopeo; do + if ! command -v "${cmd}" &>/dev/null; then + echo "you need to install ${cmd} as a prerequisite to running the tests" >&3 + ok=1 + fi + done - if [ ! $(command -v jq) ]; then - echo "you need to install jq as a prerequisite to running the tests" >&3 - return 1 - fi - - if [ ! $(command -v skopeo) ]; then - echo "you need to install skopeo as a prerequisite to running the tests" >&3 - return 1 - fi - - return 0 + return "${ok}" } # delete_repo_from_zot @@ -45,7 +38,7 @@ function delete_repo_from_zot() { curl -s -X DELETE "http://127.0.0.1:${port}/v2/${repo}/manifests/${digest}" >/dev/null # delete blobs from disk - rm -r "${root}/${repo}/blobs" + rm -rf "${root}/${repo}/blobs" } function setup_file() { @@ -125,7 +118,7 @@ EOF local upstream_bin="${BATS_FILE_TMPDIR}/${zot_bin_name}" if [ ! -f "${upstream_bin}" ]; then if ! curl -f -L -o "${upstream_bin}" \ - "https://github.com/project-zot/zot/releases/latest/download/${zot_bin_name}"; then + "https://github.com/project-zot/zot/releases/download/v2.1.17/${zot_bin_name}"; then echo "ERROR: failed to download upstream zot release binary" >&2 exit 1 fi