diff --git a/pkg/api/routes.go b/pkg/api/routes.go index b0672fec..044da335 100644 --- a/pkg/api/routes.go +++ b/pkg/api/routes.go @@ -1503,30 +1503,30 @@ func (rh *RouteHandler) GetBlob(response http.ResponseWriter, request *http.Requ if errors.Is(err, zerr.ErrRepoNotFound) || errors.Is(err, zerr.ErrBlobNotFound) { rh.c.Log.Info().Msg("blob was not found. Connecting client to stream") - copier, err := rh.c.SyncOnDemand.StreamManager().ConnectClient(digest.String(), response) - if err != nil { - if !errors.Is(err, zerr.ErrBlobNotFoundInActiveStreams) { - rh.c.Log.Error().Err(err).Str("digest", digest.String()).Msg("failed to connect client to stream") + copier, clientConnErr := rh.c.SyncOnDemand.StreamManager().ConnectClient(digest.String(), response) + if clientConnErr != nil { + if !errors.Is(clientConnErr, zerr.ErrBlobNotFoundInActiveStreams) { + rh.c.Log.Error().Err(clientConnErr).Str("digest", digest.String()).Msg("failed to connect client to stream") response.WriteHeader(http.StatusInternalServerError) return } - } + } else { + clientCopyErr := copier.Copy() + if clientCopyErr != nil { + rh.c.Log.Error().Err(clientCopyErr).Str("digest", digest.String()).Msg("unexpected error during stream copy") + response.WriteHeader(http.StatusInternalServerError) - err = copier.Copy() - if err != nil { - rh.c.Log.Error().Err(err).Str("digest", digest.String()).Msg("unexpected error during stream copy") - response.WriteHeader(http.StatusInternalServerError) + return + } + + response.Header().Set("Content-Length", strconv.FormatInt(copier.Source.InFlightReader.GetDescriptor().Size, 10)) + response.Header().Set(constants.DistContentDigestKey, digest.String()) + response.Header().Set("Content-Type", copier.Source.InFlightReader.GetDescriptor().MediaType) + response.WriteHeader(http.StatusOK) return } - - response.Header().Set("Content-Length", strconv.FormatInt(copier.Source.InFlightReader.GetDescriptor().Size, 10)) - response.Header().Set(constants.DistContentDigestKey, digest.String()) - response.Header().Set("Content-Type", copier.Source.InFlightReader.GetDescriptor().MediaType) - response.WriteHeader(http.StatusOK) - - return } } diff --git a/pkg/extensions/sync/chunked_blob_reader.go b/pkg/extensions/sync/chunked_blob_reader.go index 69cdb7c5..d1afe5fd 100644 --- a/pkg/extensions/sync/chunked_blob_reader.go +++ b/pkg/extensions/sync/chunked_blob_reader.go @@ -1,7 +1,6 @@ package sync import ( - "bytes" "errors" "io" "os" @@ -9,42 +8,40 @@ import ( "github.com/regclient/regclient/types/blob" - zerr "zotregistry.dev/zot/v2/errors" "zotregistry.dev/zot/v2/pkg/log" ) -// ChunkedBlobReader is a helper that splits a blob into chunks based on chunkSize -// It then copies chunks to disk. -// The latest chunk number is announced to channels of subscribers. +// ChunkedBlobReader is a helper that copies blobs to disk +// and keeps track of clients that are being served the blob. +// The latest byte number is announced to channels of subscribers. type ChunkedBlobReader struct { - numChunksTotal int64 - numChunksRead int64 - chunkSizeBytes int64 - onDiskPath string - onDiskFile *os.File + numBytesTotal int64 + numBytesReadToDisk int64 + bytesMu sync.RWMutex + + onDiskPath string + onDiskFile *os.File InFlightReader *blob.BReader - clientMu sync.Mutex + clientMu sync.RWMutex clientCond *sync.Cond - chunksMu sync.RWMutex clients map[int]chan int64 numClientsTotal int logger log.Logger } -func NewChunkedBlobReader(onDiskPath string, chunkSizeBytes int64, logger log.Logger) (*ChunkedBlobReader, error) { +func NewChunkedBlobReader(onDiskPath string, logger log.Logger) (*ChunkedBlobReader, error) { createdFile, err := os.OpenFile(onDiskPath, os.O_CREATE|os.O_WRONLY, 0o644) if err != nil { return nil, err } cbr := &ChunkedBlobReader{ - clients: make(map[int]chan int64), - logger: logger, - onDiskPath: onDiskPath, - onDiskFile: createdFile, - chunkSizeBytes: chunkSizeBytes, + clients: make(map[int]chan int64), + logger: logger, + onDiskPath: onDiskPath, + onDiskFile: createdFile, } cbr.clientCond = sync.NewCond(&cbr.clientMu) @@ -52,65 +49,90 @@ func NewChunkedBlobReader(onDiskPath string, chunkSizeBytes int64, logger log.Lo return cbr, nil } -func (cbr *ChunkedBlobReader) InitReader(r *blob.BReader, numChunksTotal int64) { +// InitReader initializes sets the regclient blob reader +// and the total number of bytes to read for the blob. +func (cbr *ChunkedBlobReader) InitReader(blobReader *blob.BReader, numBytesTotal int64) { + cbr.bytesMu.Lock() + defer cbr.bytesMu.Unlock() + if cbr.InFlightReader == nil { - cbr.numChunksTotal = numChunksTotal - cbr.InFlightReader = r + cbr.numBytesTotal = numBytesTotal + cbr.InFlightReader = blobReader } } func (cbr *ChunkedBlobReader) Read(buff []byte) (int, error) { - if cbr.InFlightReader == nil { - return 0, zerr.ErrStreamReaderNotInitialized - } + // InitReader is called inside the regclient callback + // When Read is called the reader will always be initialized. + cbr.bytesMu.Lock() - cbr.chunksMu.Lock() - - // Access the file that regclient is writing to avoid this extra duplication. - var internalBuffBytes []byte = make([]byte, 0, cbr.chunkSizeBytes) - internalBuff := bytes.NewBuffer(internalBuffBytes) - - multiWriter := io.MultiWriter(cbr.onDiskFile, internalBuff) - - numBytesRead, err := io.CopyN(multiWriter, cbr.InFlightReader, cbr.chunkSizeBytes) + n, err := io.ReadFull(cbr.InFlightReader, buff) if err != nil { - if !errors.Is(err, io.EOF) { + if !errors.Is(err, io.EOF) && !errors.Is(err, io.ErrUnexpectedEOF) { // upstream download error - cbr.logger.Error().Err(err).Msg("failed to copy from in flight reader") - cbr.chunksMu.Unlock() + cbr.logger.Error().Err(err).Msg("failed to read from in flight reader") + cbr.bytesMu.Unlock() + + cbr.clientMu.RLock() + clients := cbr.clients + cbr.clientMu.RUnlock() // drain all clients and close their channels - for clientId := range cbr.clients { + for clientId := range clients { cbr.Unsubscribe(clientId) } return -1, err } + // partial read at end of stream; normalise to EOF for callers + err = io.EOF } - copy(buff, internalBuff.Bytes()) + if n > 0 { + if _, werr := cbr.onDiskFile.Write(buff[:n]); werr != nil { + cbr.logger.Error().Err(werr).Msg("failed to write blob data to disk") + cbr.bytesMu.Unlock() - cbr.numChunksRead++ - if cbr.numChunksRead == cbr.numChunksTotal { - cbr.onDiskFile.Close() + return -1, werr + } + + cbr.numBytesReadToDisk += int64(n) } - cbr.chunksMu.Unlock() + if cbr.numBytesReadToDisk >= cbr.numBytesTotal { + clsErr := cbr.onDiskFile.Close() + if clsErr != nil { + cbr.logger.Error().Err(clsErr).Msg("failed to close on disk file") + } + } + + numBytesRead := cbr.numBytesReadToDisk + cbr.bytesMu.Unlock() cbr.clientMu.Lock() - // Update all clients about the new chunk - // Clients always read the chunk from disk + // Update all clients about the latest byte offset available on disk. var wg sync.WaitGroup for _, c := range cbr.clients { wg.Go(func() { - c <- cbr.numChunksRead + c <- numBytesRead }) } - cbr.clientMu.Unlock() - wg.Wait() - return int(numBytesRead), err + cbr.clientMu.Unlock() + + // If the reader has finished reading the blob, close all clients. + if err == io.EOF { + cbr.clientMu.RLock() + clients := cbr.clients + cbr.clientMu.RUnlock() + + for clientId := range clients { + cbr.Unsubscribe(clientId) + } + } + + return n, err } // Subscribe to the reader each time a new client is interested in the current blob, @@ -128,11 +150,11 @@ func (cbr *ChunkedBlobReader) Subscribe(channel chan int64) int { // Announce the current number of available chunks to the new client only if the reader is initialized if cbr.InFlightReader != nil { - cbr.chunksMu.RLock() - defer cbr.chunksMu.RUnlock() + cbr.bytesMu.RLock() + defer cbr.bytesMu.RUnlock() go func() { - channel <- cbr.numChunksRead + channel <- cbr.numBytesReadToDisk }() } diff --git a/pkg/extensions/sync/inflight_blob_copier.go b/pkg/extensions/sync/inflight_blob_copier.go index 4ef77ac6..507a3481 100644 --- a/pkg/extensions/sync/inflight_blob_copier.go +++ b/pkg/extensions/sync/inflight_blob_copier.go @@ -11,29 +11,28 @@ import ( ) // InFlightBlobCopier represents a client that wants to stream an image while it is being downloaded. -// The data is copied first from disk up to the latest chunk and further copies wait for an announcement -// over a channel when a new chunk is available. +// The data is copied first from disk up to the latest byte and further copies wait for an announcement +// over a channel when a new set of bytes are available. +// Announcements are made after a set of bytes are copied to the disk. type InFlightBlobCopier struct { sync.Mutex - numChunksCopied int64 - Source *ChunkedBlobReader - onDiskPath string - dest io.Writer - log log.Logger - chunkSizeBytes int64 + numBytesCopied int64 + Source *ChunkedBlobReader + onDiskPath string + dest io.Writer + log log.Logger } func NewInFlightBlobCopier( - source *ChunkedBlobReader, onDiskPath string, dest io.Writer, chunkSizeBytes int64, logger log.Logger, + source *ChunkedBlobReader, onDiskPath string, dest io.Writer, logger log.Logger, ) *InFlightBlobCopier { return &InFlightBlobCopier{ - numChunksCopied: 0, - Source: source, - dest: dest, - onDiskPath: onDiskPath, - chunkSizeBytes: chunkSizeBytes, - log: logger, + numBytesCopied: 0, + Source: source, + dest: dest, + onDiskPath: onDiskPath, + log: logger, } } @@ -42,34 +41,40 @@ func (ifbc *InFlightBlobCopier) Copy() error { onDiskFile, err := os.Open(ifbc.onDiskPath) if err != nil { - ifbc.log.Error().Err(err).Msg("failed to open on disk path") + ifbc.log.Error().Err(err).Str("onDiskPath", ifbc.onDiskPath).Msg("failed to open on disk path") return err } defer onDiskFile.Close() - // Register channel for latest chunk count updates - chunkChan := make(chan int64, 1) + // Register channel for latest byte count updates + byteAnnounceChan := make(chan int64, 1) - id := ifbc.Source.Subscribe(chunkChan) + id := ifbc.Source.Subscribe(byteAnnounceChan) defer ifbc.Source.Unsubscribe(id) for { - latestChunkNum, ok := <-chunkChan + latestByteNum, ok := <-byteAnnounceChan if !ok { - ifbc.log.Error().Str("onDiskPath", ifbc.onDiskPath).Msg("failed to download from upstream, aborting inflight copy") + ifbc.log.Error().Str("onDiskPath", ifbc.onDiskPath). + Msg("failed to download from upstream, aborting inflight copy") return zerr.ErrSyncUpstreamDownloadFailed } ifbc.Lock() - if latestChunkNum <= ifbc.numChunksCopied { + + // If somehow, the copier receives an announcement for a byte number + // that has already been copied, skip the copy and wait for the next announcement. + if latestByteNum <= ifbc.numBytesCopied { ifbc.Unlock() continue } - _, err = io.CopyN(ifbc.dest, onDiskFile, (latestChunkNum-ifbc.numChunksCopied)*ifbc.chunkSizeBytes) + // As the blob size is known ahead of time, CopyN is not expected + // to encounter a partial read if the onDiskFile is healthy. + _, err = io.CopyN(ifbc.dest, onDiskFile, latestByteNum-ifbc.numBytesCopied) if err != nil { if !errors.Is(err, io.EOF) { ifbc.log.Error().Err(err).Msg("failed to copy data to downstream client") @@ -77,10 +82,10 @@ func (ifbc *InFlightBlobCopier) Copy() error { return err } } - ifbc.numChunksCopied = latestChunkNum + ifbc.numBytesCopied = latestByteNum ifbc.Unlock() - if latestChunkNum == ifbc.Source.numChunksTotal { + if latestByteNum >= ifbc.Source.numBytesTotal { // transfer is complete break } diff --git a/pkg/extensions/sync/stream_manager.go b/pkg/extensions/sync/stream_manager.go index 8af5967d..1909433f 100644 --- a/pkg/extensions/sync/stream_manager.go +++ b/pkg/extensions/sync/stream_manager.go @@ -25,14 +25,13 @@ type StreamManager interface { CachedBlobInfo(blobDigest string) (blen int64, mediaType string, err error) } -const chunkSizeBytes = 32768 - type ChunkingStreamManager struct { tempStore StreamTempStore // activeStreams maps blob digest to the corresponding chunked blob reader // that is currently active and receiving data for that blob. activeStreams map[string]*ChunkedBlobReader - // streamingRefs holds the references to the images that are currently being streamed and their corresponding manifest. + // streamingRefs holds the references to the images that are + // currently being streamed and their corresponding manifest. streamingRefs map[string]manifestpkg.Manifest // blobInfo holds blobs and their corresponding descriptor. blobInfoMap map[string]descriptor.Descriptor @@ -67,7 +66,7 @@ func (sm *ChunkingStreamManager) ConnectClient(blobDigest string, writer io.Writ return nil, err } - copier := NewInFlightBlobCopier(stream, sm.tempStore.BlobPath(dig), writer, chunkSizeBytes, sm.logger) + copier := NewInFlightBlobCopier(stream, sm.tempStore.BlobPath(dig), writer, sm.logger) sm.logger.Info().Str("blob", blobDigest).Msg("connected client for blob") return copier, nil @@ -95,29 +94,18 @@ func (sm *ChunkingStreamManager) StreamingBlobReader(reader *blob.BReader) (*blo size := desc.Size // This expects the chunked blob reader to be initialized and ready - // as the code here only supplies the reader and the chunk count + // as the code here only supplies the reader and the number of bytes. chunkingReader, ok := sm.activeStreams[digest] if !ok { return nil, zerr.ErrBlobReaderMissing } - chunkingReader.InitReader(reader, chunkCount(size, chunkSizeBytes)) - sm.logger.Info().Str("blob", digest).Msg("finished init chunked blob reader") + chunkingReader.InitReader(reader, size) + sm.logger.Debug().Str("blob", digest).Msg("finished init chunked blob reader") return chunkingReader.ToBReader(), nil } -func chunkCount(blobSize int64, chunkSizeBytes int64) int64 { - chunkCount := blobSize / chunkSizeBytes - remainder := blobSize % chunkSizeBytes - - if remainder > 0 { - chunkCount++ - } - - return chunkCount -} - func (sm *ChunkingStreamManager) prepareActiveStreamForBlob(descriptor descriptor.Descriptor) error { _, ok := sm.activeStreams[descriptor.Digest.String()] if ok { @@ -126,7 +114,7 @@ func (sm *ChunkingStreamManager) prepareActiveStreamForBlob(descriptor descripto return nil } - r, err := NewChunkedBlobReader(sm.tempStore.BlobPath(descriptor.Digest), chunkSizeBytes, sm.logger) + r, err := NewChunkedBlobReader(sm.tempStore.BlobPath(descriptor.Digest), sm.logger) if err != nil { return err }