From f7444abbd477d72cb58c70e969dc515c4b0c82ed Mon Sep 17 00:00:00 2001 From: Vishwas Rajashekar Date: Wed, 20 May 2026 01:12:09 +0530 Subject: [PATCH] feat(sync): use waiting for descriptor fetch Signed-off-by: Vishwas Rajashekar --- pkg/api/routes.go | 5 +- pkg/extensions/sync/chunked_blob_reader.go | 53 +++++++++--- .../sync/chunked_blob_reader_internal_test.go | 85 ++++++++++++++++--- .../inflight_blob_copier_internal_test.go | 12 ++- pkg/extensions/sync/stream_manager.go | 5 +- test/blackbox/sync_streaming.bats | 2 +- 6 files changed, 129 insertions(+), 33 deletions(-) diff --git a/pkg/api/routes.go b/pkg/api/routes.go index 97a74120..a64bc436 100644 --- a/pkg/api/routes.go +++ b/pkg/api/routes.go @@ -1508,9 +1508,10 @@ func (rh *RouteHandler) GetBlob(response http.ResponseWriter, request *http.Requ return } } else { - response.Header().Set("Content-Length", strconv.FormatInt(copier.Source.InFlightReader.GetDescriptor().Size, 10)) + desc := copier.Source.Descriptor() + response.Header().Set("Content-Length", strconv.FormatInt(desc.Size, 10)) response.Header().Set(constants.DistContentDigestKey, digest.String()) - response.Header().Set("Content-Type", copier.Source.InFlightReader.GetDescriptor().MediaType) + response.Header().Set("Content-Type", desc.MediaType) response.WriteHeader(http.StatusOK) clientCopyErr := copier.Copy() diff --git a/pkg/extensions/sync/chunked_blob_reader.go b/pkg/extensions/sync/chunked_blob_reader.go index c40a88c0..63f19ae4 100644 --- a/pkg/extensions/sync/chunked_blob_reader.go +++ b/pkg/extensions/sync/chunked_blob_reader.go @@ -7,6 +7,7 @@ import ( "sync" "github.com/regclient/regclient/types/blob" + "github.com/regclient/regclient/types/descriptor" "zotregistry.dev/zot/v2/pkg/log" ) @@ -18,11 +19,13 @@ type ChunkedBlobReader struct { numBytesTotal int64 numBytesReadToDisk int64 bytesMu sync.RWMutex + readerReady chan struct{} + blobDesc descriptor.Descriptor onDiskPath string onDiskFile *os.File - InFlightReader *blob.BReader + inFlightReader *blob.BReader clientMu sync.RWMutex clientCond *sync.Cond clients map[int]chan int64 @@ -38,10 +41,11 @@ func NewChunkedBlobReader(onDiskPath string, logger log.Logger) (*ChunkedBlobRea } cbr := &ChunkedBlobReader{ - clients: make(map[int]chan int64), - logger: logger, - onDiskPath: onDiskPath, - onDiskFile: createdFile, + clients: make(map[int]chan int64), + logger: logger, + onDiskPath: onDiskPath, + onDiskFile: createdFile, + readerReady: make(chan struct{}), } cbr.clientCond = sync.NewCond(&cbr.clientMu) @@ -49,14 +53,37 @@ func NewChunkedBlobReader(onDiskPath string, logger log.Logger) (*ChunkedBlobRea return cbr, nil } +// Descriptor returns the descriptor of the blob being read. +// If the descriptor is not yet available, it waits until it is set by InitReader. +func (cbr *ChunkedBlobReader) Descriptor() descriptor.Descriptor { + cbr.bytesMu.RLock() + if cbr.inFlightReader != nil { + desc := cbr.blobDesc + cbr.bytesMu.RUnlock() + + return desc + } + cbr.bytesMu.RUnlock() + + // Block without holding any lock until InitReader signals readiness. + <-cbr.readerReady + + cbr.bytesMu.RLock() + defer cbr.bytesMu.RUnlock() + + return cbr.blobDesc +} + // InitReader sets the regclient blob reader and the total number of bytes to read for the blob. -func (cbr *ChunkedBlobReader) InitReader(blobReader *blob.BReader, numBytesTotal int64) { +func (cbr *ChunkedBlobReader) InitReader(blobReader *blob.BReader, desc descriptor.Descriptor) { cbr.bytesMu.Lock() defer cbr.bytesMu.Unlock() - if cbr.InFlightReader == nil { - cbr.numBytesTotal = numBytesTotal - cbr.InFlightReader = blobReader + if cbr.inFlightReader == nil { + cbr.numBytesTotal = desc.Size + cbr.inFlightReader = blobReader + cbr.blobDesc = desc + close(cbr.readerReady) } } @@ -65,7 +92,7 @@ func (cbr *ChunkedBlobReader) Read(buff []byte) (int, error) { // When Read is called the reader will always be initialized. cbr.bytesMu.Lock() - n, err := io.ReadFull(cbr.InFlightReader, buff) + n, err := io.ReadFull(cbr.inFlightReader, buff) if err != nil { if !errors.Is(err, io.EOF) && !errors.Is(err, io.ErrUnexpectedEOF) { // upstream download error @@ -160,7 +187,7 @@ func (cbr *ChunkedBlobReader) Subscribe() (chan int64, int) { // the reader is initialized. Send synchronously while clientMu is held so // that Unsubscribe cannot close the channel between the map insertion above // and this send. - if cbr.InFlightReader != nil { + if cbr.inFlightReader != nil { channel <- cbr.numBytesReadToDisk } @@ -185,8 +212,8 @@ func (cbr *ChunkedBlobReader) Unsubscribe(clientId int) { func (cbr *ChunkedBlobReader) ToBReader() *blob.BReader { return blob.NewReader( - blob.WithHeader(cbr.InFlightReader.RawHeaders()), - blob.WithDesc(cbr.InFlightReader.GetDescriptor()), + blob.WithHeader(cbr.inFlightReader.RawHeaders()), + blob.WithDesc(cbr.inFlightReader.GetDescriptor()), blob.WithReader(cbr), ) } diff --git a/pkg/extensions/sync/chunked_blob_reader_internal_test.go b/pkg/extensions/sync/chunked_blob_reader_internal_test.go index aba487a1..54ccd3cd 100644 --- a/pkg/extensions/sync/chunked_blob_reader_internal_test.go +++ b/pkg/extensions/sync/chunked_blob_reader_internal_test.go @@ -69,21 +69,21 @@ func TestInitReader(t *testing.T) { reader := newTestBReader(data) Convey("sets the in-flight reader and total bytes", func() { - So(cbr.InFlightReader, ShouldBeNil) + So(cbr.inFlightReader, ShouldBeNil) - cbr.InitReader(reader, int64(len(data))) + cbr.InitReader(reader, reader.GetDescriptor()) - So(cbr.InFlightReader, ShouldEqual, reader) + So(cbr.inFlightReader, ShouldEqual, reader) So(cbr.numBytesTotal, ShouldEqual, int64(len(data))) }) Convey("is idempotent — second call does not overwrite first reader", func() { - cbr.InitReader(reader, int64(len(data))) + cbr.InitReader(reader, reader.GetDescriptor()) secondReader := newTestBReader([]byte("other data")) - cbr.InitReader(secondReader, 99) + cbr.InitReader(secondReader, secondReader.GetDescriptor()) - So(cbr.InFlightReader, ShouldEqual, reader) + So(cbr.inFlightReader, ShouldEqual, reader) So(cbr.numBytesTotal, ShouldEqual, int64(len(data))) }) }) @@ -97,7 +97,8 @@ func TestRead(t *testing.T) { So(err, ShouldBeNil) data := []byte("hello world") - cbr.InitReader(newTestBReader(data), int64(len(data))) + testBReader := newTestBReader(data) + cbr.InitReader(testBReader, testBReader.GetDescriptor()) Convey("reads all data and writes it to disk", func() { buf := make([]byte, len(data)) @@ -220,7 +221,7 @@ func TestRead(t *testing.T) { return 0, zerr.ErrSyncUpstreamDownloadFailed })), ) - errCBR.InitReader(errReader, 100) + errCBR.InitReader(errReader, errReader.GetDescriptor()) buf := make([]byte, 50) n, readErr := errCBR.Read(buf) @@ -255,7 +256,8 @@ func TestSubscribeUnsubscribe(t *testing.T) { Convey("Subscribe sends current byte offset when reader is already initialized", func() { data := []byte("preloaded") - cbr.InitReader(newTestBReader(data), int64(len(data))) + testBReader := newTestBReader(data) + cbr.InitReader(testBReader, testBReader.GetDescriptor()) // Manually advance numBytesReadToDisk to simulate partial read. cbr.bytesMu.Lock() @@ -382,7 +384,7 @@ func TestToBReader(t *testing.T) { data := []byte("to-breader test data") original := newTestBReader(data) - cbr.InitReader(original, int64(len(data))) + cbr.InitReader(original, original.GetDescriptor()) br := cbr.ToBReader() So(br, ShouldNotBeNil) @@ -393,6 +395,69 @@ func TestToBReader(t *testing.T) { }) } +func TestDescriptor(t *testing.T) { + Convey("Descriptor", t, func() { + dir := t.TempDir() + cbr, err := NewChunkedBlobReader(filepath.Join(dir, "blob.bin"), log.NewTestLogger()) + So(err, ShouldBeNil) + defer cbr.onDiskFile.Close() + + data := []byte("descriptor test data") + testBReader := newTestBReader(data) + expectedDesc := testBReader.GetDescriptor() + + Convey("returns descriptor immediately when reader is already initialized", func() { + cbr.InitReader(testBReader, expectedDesc) + + desc := cbr.Descriptor() + So(desc.Digest, ShouldEqual, expectedDesc.Digest) + So(desc.Size, ShouldEqual, expectedDesc.Size) + }) + + Convey("blocks until InitReader is called and returns the correct descriptor", func() { + result := make(chan descriptor.Descriptor, 1) + + go func() { + result <- cbr.Descriptor() + }() + + // Give the goroutine time to block on readerReady. + // It must not have returned yet since InitReader has not been called. + select { + case <-result: + So("Descriptor returned before InitReader was called", ShouldBeEmpty) + default: + } + + cbr.InitReader(testBReader, expectedDesc) + + desc := <-result + So(desc.Digest, ShouldEqual, expectedDesc.Digest) + So(desc.Size, ShouldEqual, expectedDesc.Size) + }) + + Convey("multiple concurrent callers all receive the descriptor", func() { + const numCallers = 5 + + results := make([]chan descriptor.Descriptor, numCallers) + for i := range results { + results[i] = make(chan descriptor.Descriptor, 1) + go func(ch chan descriptor.Descriptor) { + ch <- cbr.Descriptor() + }(results[i]) + } + + cbr.InitReader(testBReader, expectedDesc) + + for _, ch := range results { + desc := <-ch + So(desc.Digest, ShouldEqual, expectedDesc.Digest) + So(desc.Size, ShouldEqual, expectedDesc.Size) + } + }) + }) +} + type errReaderFunc func(p []byte) (int, error) func (f errReaderFunc) Read(p []byte) (int, error) { diff --git a/pkg/extensions/sync/inflight_blob_copier_internal_test.go b/pkg/extensions/sync/inflight_blob_copier_internal_test.go index 9c9e605c..e08f1581 100644 --- a/pkg/extensions/sync/inflight_blob_copier_internal_test.go +++ b/pkg/extensions/sync/inflight_blob_copier_internal_test.go @@ -26,7 +26,8 @@ func TestInFlightBlobCopierCopy(t *testing.T) { cbr, err := NewChunkedBlobReader(blobPath, log.NewTestLogger()) So(err, ShouldBeNil) - cbr.InitReader(newTestBReader(data), int64(len(data))) + testBReader := newTestBReader(data) + cbr.InitReader(testBReader, testBReader.GetDescriptor()) var dest bytes.Buffer ifbc := NewInFlightBlobCopier(cbr, blobPath, &dest, log.NewTestLogger()) @@ -54,7 +55,8 @@ func TestInFlightBlobCopierCopy(t *testing.T) { cbr, err := NewChunkedBlobReader(blobPath, log.NewTestLogger()) So(err, ShouldBeNil) - cbr.InitReader(newTestBReader(data), int64(len(data))) + testBReader := newTestBReader(data) + cbr.InitReader(testBReader, testBReader.GetDescriptor()) var dest bytes.Buffer ifbc := NewInFlightBlobCopier(cbr, blobPath, &dest, log.NewTestLogger()) @@ -108,7 +110,7 @@ func TestInFlightBlobCopierCopy(t *testing.T) { errCBR, cerr := NewChunkedBlobReader(errPath, log.NewTestLogger()) So(cerr, ShouldBeNil) - errCBR.InitReader(blob.NewReader( + testReader := blob.NewReader( blob.WithDesc(descriptor.Descriptor{ Digest: godigest.FromBytes([]byte("x")), Size: 100, @@ -117,7 +119,9 @@ func TestInFlightBlobCopierCopy(t *testing.T) { blob.WithReader(errReaderFunc(func(p []byte) (int, error) { return 0, zerr.ErrSyncUpstreamDownloadFailed })), - ), 100) + ) + + errCBR.InitReader(testReader, testReader.GetDescriptor()) var dest bytes.Buffer ifbc := NewInFlightBlobCopier(errCBR, errPath, &dest, log.NewTestLogger()) diff --git a/pkg/extensions/sync/stream_manager.go b/pkg/extensions/sync/stream_manager.go index a195e071..7b9817dc 100644 --- a/pkg/extensions/sync/stream_manager.go +++ b/pkg/extensions/sync/stream_manager.go @@ -91,16 +91,15 @@ func (sm *ChunkingStreamManager) StreamingBlobReader(reader *blob.BReader) (*blo desc := reader.GetDescriptor() digest := desc.Digest.String() - size := desc.Size // This expects the chunked blob reader to be initialized and ready - // as the code here only supplies the reader and the number of bytes. + // as the code here only supplies the reader and the descriptor. chunkingReader, ok := sm.activeStreams[digest] if !ok { return nil, zerr.ErrBlobReaderMissing } - chunkingReader.InitReader(reader, size) + chunkingReader.InitReader(reader, desc) sm.logger.Debug().Str("blob", digest).Msg("finished init chunked blob reader") return chunkingReader.ToBReader(), nil diff --git a/test/blackbox/sync_streaming.bats b/test/blackbox/sync_streaming.bats index 2e3190b4..3a302145 100644 --- a/test/blackbox/sync_streaming.bats +++ b/test/blackbox/sync_streaming.bats @@ -106,13 +106,13 @@ EOF "extensions": { "sync": { "enable": true, - "stream": true, "registries": [ { "urls": [ "http://localhost:${upstream_port}" ], "onDemand": true, + "stream": true, "tlsVerify": false } ]