feat(sync): rework concurrency, use bytes instead of chunk

Signed-off-by: Vishwas Rajashekar <dev@vrajashkr.com>
This commit is contained in:
Vishwas Rajashekar
2026-05-17 18:29:58 +05:30
parent fa39761700
commit 4f49cea1a4
4 changed files with 126 additions and 111 deletions
+16 -16
View File
@@ -1503,30 +1503,30 @@ func (rh *RouteHandler) GetBlob(response http.ResponseWriter, request *http.Requ
if errors.Is(err, zerr.ErrRepoNotFound) || errors.Is(err, zerr.ErrBlobNotFound) {
rh.c.Log.Info().Msg("blob was not found. Connecting client to stream")
copier, err := rh.c.SyncOnDemand.StreamManager().ConnectClient(digest.String(), response)
if err != nil {
if !errors.Is(err, zerr.ErrBlobNotFoundInActiveStreams) {
rh.c.Log.Error().Err(err).Str("digest", digest.String()).Msg("failed to connect client to stream")
copier, clientConnErr := rh.c.SyncOnDemand.StreamManager().ConnectClient(digest.String(), response)
if clientConnErr != nil {
if !errors.Is(clientConnErr, zerr.ErrBlobNotFoundInActiveStreams) {
rh.c.Log.Error().Err(clientConnErr).Str("digest", digest.String()).Msg("failed to connect client to stream")
response.WriteHeader(http.StatusInternalServerError)
return
}
}
} else {
clientCopyErr := copier.Copy()
if clientCopyErr != nil {
rh.c.Log.Error().Err(clientCopyErr).Str("digest", digest.String()).Msg("unexpected error during stream copy")
response.WriteHeader(http.StatusInternalServerError)
err = copier.Copy()
if err != nil {
rh.c.Log.Error().Err(err).Str("digest", digest.String()).Msg("unexpected error during stream copy")
response.WriteHeader(http.StatusInternalServerError)
return
}
response.Header().Set("Content-Length", strconv.FormatInt(copier.Source.InFlightReader.GetDescriptor().Size, 10))
response.Header().Set(constants.DistContentDigestKey, digest.String())
response.Header().Set("Content-Type", copier.Source.InFlightReader.GetDescriptor().MediaType)
response.WriteHeader(http.StatusOK)
return
}
response.Header().Set("Content-Length", strconv.FormatInt(copier.Source.InFlightReader.GetDescriptor().Size, 10))
response.Header().Set(constants.DistContentDigestKey, digest.String())
response.Header().Set("Content-Type", copier.Source.InFlightReader.GetDescriptor().MediaType)
response.WriteHeader(http.StatusOK)
return
}
}
+73 -51
View File
@@ -1,7 +1,6 @@
package sync
import (
"bytes"
"errors"
"io"
"os"
@@ -9,42 +8,40 @@ import (
"github.com/regclient/regclient/types/blob"
zerr "zotregistry.dev/zot/v2/errors"
"zotregistry.dev/zot/v2/pkg/log"
)
// ChunkedBlobReader is a helper that splits a blob into chunks based on chunkSize
// It then copies chunks to disk.
// The latest chunk number is announced to channels of subscribers.
// ChunkedBlobReader is a helper that copies blobs to disk
// and keeps track of clients that are being served the blob.
// The latest byte number is announced to channels of subscribers.
type ChunkedBlobReader struct {
numChunksTotal int64
numChunksRead int64
chunkSizeBytes int64
onDiskPath string
onDiskFile *os.File
numBytesTotal int64
numBytesReadToDisk int64
bytesMu sync.RWMutex
onDiskPath string
onDiskFile *os.File
InFlightReader *blob.BReader
clientMu sync.Mutex
clientMu sync.RWMutex
clientCond *sync.Cond
chunksMu sync.RWMutex
clients map[int]chan int64
numClientsTotal int
logger log.Logger
}
func NewChunkedBlobReader(onDiskPath string, chunkSizeBytes int64, logger log.Logger) (*ChunkedBlobReader, error) {
func NewChunkedBlobReader(onDiskPath string, logger log.Logger) (*ChunkedBlobReader, error) {
createdFile, err := os.OpenFile(onDiskPath, os.O_CREATE|os.O_WRONLY, 0o644)
if err != nil {
return nil, err
}
cbr := &ChunkedBlobReader{
clients: make(map[int]chan int64),
logger: logger,
onDiskPath: onDiskPath,
onDiskFile: createdFile,
chunkSizeBytes: chunkSizeBytes,
clients: make(map[int]chan int64),
logger: logger,
onDiskPath: onDiskPath,
onDiskFile: createdFile,
}
cbr.clientCond = sync.NewCond(&cbr.clientMu)
@@ -52,65 +49,90 @@ func NewChunkedBlobReader(onDiskPath string, chunkSizeBytes int64, logger log.Lo
return cbr, nil
}
func (cbr *ChunkedBlobReader) InitReader(r *blob.BReader, numChunksTotal int64) {
// InitReader initializes sets the regclient blob reader
// and the total number of bytes to read for the blob.
func (cbr *ChunkedBlobReader) InitReader(blobReader *blob.BReader, numBytesTotal int64) {
cbr.bytesMu.Lock()
defer cbr.bytesMu.Unlock()
if cbr.InFlightReader == nil {
cbr.numChunksTotal = numChunksTotal
cbr.InFlightReader = r
cbr.numBytesTotal = numBytesTotal
cbr.InFlightReader = blobReader
}
}
func (cbr *ChunkedBlobReader) Read(buff []byte) (int, error) {
if cbr.InFlightReader == nil {
return 0, zerr.ErrStreamReaderNotInitialized
}
// InitReader is called inside the regclient callback
// When Read is called the reader will always be initialized.
cbr.bytesMu.Lock()
cbr.chunksMu.Lock()
// Access the file that regclient is writing to avoid this extra duplication.
var internalBuffBytes []byte = make([]byte, 0, cbr.chunkSizeBytes)
internalBuff := bytes.NewBuffer(internalBuffBytes)
multiWriter := io.MultiWriter(cbr.onDiskFile, internalBuff)
numBytesRead, err := io.CopyN(multiWriter, cbr.InFlightReader, cbr.chunkSizeBytes)
n, err := io.ReadFull(cbr.InFlightReader, buff)
if err != nil {
if !errors.Is(err, io.EOF) {
if !errors.Is(err, io.EOF) && !errors.Is(err, io.ErrUnexpectedEOF) {
// upstream download error
cbr.logger.Error().Err(err).Msg("failed to copy from in flight reader")
cbr.chunksMu.Unlock()
cbr.logger.Error().Err(err).Msg("failed to read from in flight reader")
cbr.bytesMu.Unlock()
cbr.clientMu.RLock()
clients := cbr.clients
cbr.clientMu.RUnlock()
// drain all clients and close their channels
for clientId := range cbr.clients {
for clientId := range clients {
cbr.Unsubscribe(clientId)
}
return -1, err
}
// partial read at end of stream; normalise to EOF for callers
err = io.EOF
}
copy(buff, internalBuff.Bytes())
if n > 0 {
if _, werr := cbr.onDiskFile.Write(buff[:n]); werr != nil {
cbr.logger.Error().Err(werr).Msg("failed to write blob data to disk")
cbr.bytesMu.Unlock()
cbr.numChunksRead++
if cbr.numChunksRead == cbr.numChunksTotal {
cbr.onDiskFile.Close()
return -1, werr
}
cbr.numBytesReadToDisk += int64(n)
}
cbr.chunksMu.Unlock()
if cbr.numBytesReadToDisk >= cbr.numBytesTotal {
clsErr := cbr.onDiskFile.Close()
if clsErr != nil {
cbr.logger.Error().Err(clsErr).Msg("failed to close on disk file")
}
}
numBytesRead := cbr.numBytesReadToDisk
cbr.bytesMu.Unlock()
cbr.clientMu.Lock()
// Update all clients about the new chunk
// Clients always read the chunk from disk
// Update all clients about the latest byte offset available on disk.
var wg sync.WaitGroup
for _, c := range cbr.clients {
wg.Go(func() {
c <- cbr.numChunksRead
c <- numBytesRead
})
}
cbr.clientMu.Unlock()
wg.Wait()
return int(numBytesRead), err
cbr.clientMu.Unlock()
// If the reader has finished reading the blob, close all clients.
if err == io.EOF {
cbr.clientMu.RLock()
clients := cbr.clients
cbr.clientMu.RUnlock()
for clientId := range clients {
cbr.Unsubscribe(clientId)
}
}
return n, err
}
// Subscribe to the reader each time a new client is interested in the current blob,
@@ -128,11 +150,11 @@ func (cbr *ChunkedBlobReader) Subscribe(channel chan int64) int {
// Announce the current number of available chunks to the new client only if the reader is initialized
if cbr.InFlightReader != nil {
cbr.chunksMu.RLock()
defer cbr.chunksMu.RUnlock()
cbr.bytesMu.RLock()
defer cbr.bytesMu.RUnlock()
go func() {
channel <- cbr.numChunksRead
channel <- cbr.numBytesReadToDisk
}()
}
+30 -25
View File
@@ -11,29 +11,28 @@ import (
)
// InFlightBlobCopier represents a client that wants to stream an image while it is being downloaded.
// The data is copied first from disk up to the latest chunk and further copies wait for an announcement
// over a channel when a new chunk is available.
// The data is copied first from disk up to the latest byte and further copies wait for an announcement
// over a channel when a new set of bytes are available.
// Announcements are made after a set of bytes are copied to the disk.
type InFlightBlobCopier struct {
sync.Mutex
numChunksCopied int64
Source *ChunkedBlobReader
onDiskPath string
dest io.Writer
log log.Logger
chunkSizeBytes int64
numBytesCopied int64
Source *ChunkedBlobReader
onDiskPath string
dest io.Writer
log log.Logger
}
func NewInFlightBlobCopier(
source *ChunkedBlobReader, onDiskPath string, dest io.Writer, chunkSizeBytes int64, logger log.Logger,
source *ChunkedBlobReader, onDiskPath string, dest io.Writer, logger log.Logger,
) *InFlightBlobCopier {
return &InFlightBlobCopier{
numChunksCopied: 0,
Source: source,
dest: dest,
onDiskPath: onDiskPath,
chunkSizeBytes: chunkSizeBytes,
log: logger,
numBytesCopied: 0,
Source: source,
dest: dest,
onDiskPath: onDiskPath,
log: logger,
}
}
@@ -42,34 +41,40 @@ func (ifbc *InFlightBlobCopier) Copy() error {
onDiskFile, err := os.Open(ifbc.onDiskPath)
if err != nil {
ifbc.log.Error().Err(err).Msg("failed to open on disk path")
ifbc.log.Error().Err(err).Str("onDiskPath", ifbc.onDiskPath).Msg("failed to open on disk path")
return err
}
defer onDiskFile.Close()
// Register channel for latest chunk count updates
chunkChan := make(chan int64, 1)
// Register channel for latest byte count updates
byteAnnounceChan := make(chan int64, 1)
id := ifbc.Source.Subscribe(chunkChan)
id := ifbc.Source.Subscribe(byteAnnounceChan)
defer ifbc.Source.Unsubscribe(id)
for {
latestChunkNum, ok := <-chunkChan
latestByteNum, ok := <-byteAnnounceChan
if !ok {
ifbc.log.Error().Str("onDiskPath", ifbc.onDiskPath).Msg("failed to download from upstream, aborting inflight copy")
ifbc.log.Error().Str("onDiskPath", ifbc.onDiskPath).
Msg("failed to download from upstream, aborting inflight copy")
return zerr.ErrSyncUpstreamDownloadFailed
}
ifbc.Lock()
if latestChunkNum <= ifbc.numChunksCopied {
// If somehow, the copier receives an announcement for a byte number
// that has already been copied, skip the copy and wait for the next announcement.
if latestByteNum <= ifbc.numBytesCopied {
ifbc.Unlock()
continue
}
_, err = io.CopyN(ifbc.dest, onDiskFile, (latestChunkNum-ifbc.numChunksCopied)*ifbc.chunkSizeBytes)
// As the blob size is known ahead of time, CopyN is not expected
// to encounter a partial read if the onDiskFile is healthy.
_, err = io.CopyN(ifbc.dest, onDiskFile, latestByteNum-ifbc.numBytesCopied)
if err != nil {
if !errors.Is(err, io.EOF) {
ifbc.log.Error().Err(err).Msg("failed to copy data to downstream client")
@@ -77,10 +82,10 @@ func (ifbc *InFlightBlobCopier) Copy() error {
return err
}
}
ifbc.numChunksCopied = latestChunkNum
ifbc.numBytesCopied = latestByteNum
ifbc.Unlock()
if latestChunkNum == ifbc.Source.numChunksTotal {
if latestByteNum >= ifbc.Source.numBytesTotal {
// transfer is complete
break
}
+7 -19
View File
@@ -25,14 +25,13 @@ type StreamManager interface {
CachedBlobInfo(blobDigest string) (blen int64, mediaType string, err error)
}
const chunkSizeBytes = 32768
type ChunkingStreamManager struct {
tempStore StreamTempStore
// activeStreams maps blob digest to the corresponding chunked blob reader
// that is currently active and receiving data for that blob.
activeStreams map[string]*ChunkedBlobReader
// streamingRefs holds the references to the images that are currently being streamed and their corresponding manifest.
// streamingRefs holds the references to the images that are
// currently being streamed and their corresponding manifest.
streamingRefs map[string]manifestpkg.Manifest
// blobInfo holds blobs and their corresponding descriptor.
blobInfoMap map[string]descriptor.Descriptor
@@ -67,7 +66,7 @@ func (sm *ChunkingStreamManager) ConnectClient(blobDigest string, writer io.Writ
return nil, err
}
copier := NewInFlightBlobCopier(stream, sm.tempStore.BlobPath(dig), writer, chunkSizeBytes, sm.logger)
copier := NewInFlightBlobCopier(stream, sm.tempStore.BlobPath(dig), writer, sm.logger)
sm.logger.Info().Str("blob", blobDigest).Msg("connected client for blob")
return copier, nil
@@ -95,29 +94,18 @@ func (sm *ChunkingStreamManager) StreamingBlobReader(reader *blob.BReader) (*blo
size := desc.Size
// This expects the chunked blob reader to be initialized and ready
// as the code here only supplies the reader and the chunk count
// as the code here only supplies the reader and the number of bytes.
chunkingReader, ok := sm.activeStreams[digest]
if !ok {
return nil, zerr.ErrBlobReaderMissing
}
chunkingReader.InitReader(reader, chunkCount(size, chunkSizeBytes))
sm.logger.Info().Str("blob", digest).Msg("finished init chunked blob reader")
chunkingReader.InitReader(reader, size)
sm.logger.Debug().Str("blob", digest).Msg("finished init chunked blob reader")
return chunkingReader.ToBReader(), nil
}
func chunkCount(blobSize int64, chunkSizeBytes int64) int64 {
chunkCount := blobSize / chunkSizeBytes
remainder := blobSize % chunkSizeBytes
if remainder > 0 {
chunkCount++
}
return chunkCount
}
func (sm *ChunkingStreamManager) prepareActiveStreamForBlob(descriptor descriptor.Descriptor) error {
_, ok := sm.activeStreams[descriptor.Digest.String()]
if ok {
@@ -126,7 +114,7 @@ func (sm *ChunkingStreamManager) prepareActiveStreamForBlob(descriptor descripto
return nil
}
r, err := NewChunkedBlobReader(sm.tempStore.BlobPath(descriptor.Digest), chunkSizeBytes, sm.logger)
r, err := NewChunkedBlobReader(sm.tempStore.BlobPath(descriptor.Digest), sm.logger)
if err != nil {
return err
}