mirror of
https://github.com/project-zot/zot.git
synced 2026-06-17 21:17:58 +08:00
f8ef0ae706
Signed-off-by: Vishwas Rajashekar <dev@vrajashkr.com>
235 lines
5.7 KiB
Go
235 lines
5.7 KiB
Go
package sync
|
|
|
|
import (
|
|
"errors"
|
|
"io"
|
|
"os"
|
|
"sync"
|
|
|
|
"github.com/regclient/regclient/types/blob"
|
|
"github.com/regclient/regclient/types/descriptor"
|
|
|
|
"zotregistry.dev/zot/v2/pkg/log"
|
|
)
|
|
|
|
// ChunkedBlobReader is a helper that copies blobs to disk
|
|
// and keeps track of clients that are being served the blob.
|
|
// The latest byte number is announced to channels of subscribers.
|
|
type ChunkedBlobReader struct {
|
|
numBytesTotal int64
|
|
numBytesReadToDisk int64
|
|
bytesMu sync.RWMutex
|
|
readerReady chan struct{}
|
|
blobDesc descriptor.Descriptor
|
|
|
|
onDiskPath string
|
|
onDiskFile *os.File
|
|
|
|
inFlightReader *blob.BReader
|
|
clientMu sync.RWMutex
|
|
clientCond *sync.Cond
|
|
clients map[int]chan int64
|
|
nextClientId int
|
|
|
|
logger log.Logger
|
|
}
|
|
|
|
func NewChunkedBlobReader(onDiskPath string, logger log.Logger) (*ChunkedBlobReader, error) {
|
|
createdFile, err := os.OpenFile(onDiskPath, os.O_CREATE|os.O_WRONLY, 0o644)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
cbr := &ChunkedBlobReader{
|
|
clients: make(map[int]chan int64),
|
|
logger: logger,
|
|
onDiskPath: onDiskPath,
|
|
onDiskFile: createdFile,
|
|
readerReady: make(chan struct{}),
|
|
}
|
|
|
|
cbr.clientCond = sync.NewCond(&cbr.clientMu)
|
|
|
|
return cbr, nil
|
|
}
|
|
|
|
// Descriptor returns the descriptor of the blob being read.
|
|
// If the descriptor is not yet available, it waits until it is set by InitReader.
|
|
func (cbr *ChunkedBlobReader) Descriptor() descriptor.Descriptor {
|
|
cbr.bytesMu.RLock()
|
|
if cbr.inFlightReader != nil {
|
|
desc := cbr.blobDesc
|
|
cbr.bytesMu.RUnlock()
|
|
|
|
return desc
|
|
}
|
|
cbr.bytesMu.RUnlock()
|
|
|
|
// Block without holding any lock until InitReader signals readiness.
|
|
<-cbr.readerReady
|
|
|
|
cbr.bytesMu.RLock()
|
|
defer cbr.bytesMu.RUnlock()
|
|
|
|
return cbr.blobDesc
|
|
}
|
|
|
|
// InitReader sets the regclient blob reader and the total number of bytes to read for the blob.
|
|
func (cbr *ChunkedBlobReader) InitReader(blobReader *blob.BReader, desc descriptor.Descriptor) {
|
|
cbr.bytesMu.Lock()
|
|
defer cbr.bytesMu.Unlock()
|
|
|
|
if cbr.inFlightReader == nil {
|
|
cbr.numBytesTotal = desc.Size
|
|
cbr.inFlightReader = blobReader
|
|
cbr.blobDesc = desc
|
|
close(cbr.readerReady)
|
|
}
|
|
}
|
|
|
|
func (cbr *ChunkedBlobReader) Read(buff []byte) (int, error) {
|
|
// InitReader is called inside the regclient callback
|
|
// When Read is called the reader will always be initialized.
|
|
cbr.bytesMu.Lock()
|
|
|
|
n, err := io.ReadFull(cbr.inFlightReader, buff)
|
|
if err != nil {
|
|
if !errors.Is(err, io.EOF) && !errors.Is(err, io.ErrUnexpectedEOF) {
|
|
// upstream download error
|
|
cbr.logger.Error().Err(err).Msg("failed to read from in flight reader")
|
|
cbr.bytesMu.Unlock()
|
|
|
|
cbr.clientMu.RLock()
|
|
|
|
clientIDs := make([]int, 0, len(cbr.clients))
|
|
for id := range cbr.clients {
|
|
clientIDs = append(clientIDs, id)
|
|
}
|
|
cbr.clientMu.RUnlock()
|
|
|
|
// drain all clients and close their channels
|
|
for _, clientId := range clientIDs {
|
|
cbr.Unsubscribe(clientId)
|
|
}
|
|
|
|
return n, err
|
|
}
|
|
// partial read at end of stream; normalise to EOF for callers
|
|
err = io.EOF
|
|
}
|
|
|
|
if n > 0 {
|
|
if _, werr := cbr.onDiskFile.Write(buff[:n]); werr != nil {
|
|
cbr.logger.Error().Err(werr).Msg("failed to write blob data to disk")
|
|
cbr.bytesMu.Unlock()
|
|
|
|
return n, werr
|
|
}
|
|
|
|
cbr.numBytesReadToDisk += int64(n)
|
|
}
|
|
|
|
if cbr.numBytesReadToDisk >= cbr.numBytesTotal {
|
|
clsErr := cbr.onDiskFile.Close()
|
|
if clsErr != nil {
|
|
cbr.logger.Error().Err(clsErr).Msg("failed to close on disk file")
|
|
}
|
|
// All bytes have been written to disk; treat as EOF regardless of
|
|
// what io.ReadFull returned. This handles the case where the caller's
|
|
// buffer is exactly the remaining data size and io.ReadFull returns
|
|
// (n, nil) instead of (n, io.ErrUnexpectedEOF).
|
|
err = io.EOF
|
|
}
|
|
|
|
numBytesRead := cbr.numBytesReadToDisk
|
|
cbr.bytesMu.Unlock()
|
|
|
|
cbr.clientMu.Lock()
|
|
// Update all clients about the latest byte offset available on disk.
|
|
var wg sync.WaitGroup
|
|
for _, c := range cbr.clients {
|
|
wg.Go(func() {
|
|
c <- numBytesRead
|
|
})
|
|
}
|
|
wg.Wait()
|
|
|
|
cbr.clientMu.Unlock()
|
|
|
|
// If the reader has finished reading the blob, close all clients.
|
|
if err == io.EOF {
|
|
cbr.clientMu.RLock()
|
|
|
|
clientIDs := make([]int, 0, len(cbr.clients))
|
|
for id := range cbr.clients {
|
|
clientIDs = append(clientIDs, id)
|
|
}
|
|
cbr.clientMu.RUnlock()
|
|
|
|
for _, clientId := range clientIDs {
|
|
cbr.Unsubscribe(clientId)
|
|
}
|
|
}
|
|
|
|
return n, err
|
|
}
|
|
|
|
// Subscribe to the reader each time a new client is interested in the current blob,
|
|
// the client would create a subscription here with a channel where latest bytes info is sent.
|
|
func (cbr *ChunkedBlobReader) Subscribe() (chan int64, int) {
|
|
cbr.clientMu.Lock()
|
|
defer func() {
|
|
cbr.clientCond.Broadcast()
|
|
cbr.clientMu.Unlock()
|
|
}()
|
|
|
|
channel := make(chan int64, 1)
|
|
|
|
cbr.clients[cbr.nextClientId] = channel
|
|
chanId := cbr.nextClientId
|
|
cbr.nextClientId++
|
|
|
|
cbr.bytesMu.RLock()
|
|
defer cbr.bytesMu.RUnlock()
|
|
// Announce the current number of available bytes to the new client only if
|
|
// the reader is initialized. Send synchronously while clientMu is held so
|
|
// that Unsubscribe cannot close the channel between the map insertion above
|
|
// and this send.
|
|
if cbr.inFlightReader != nil {
|
|
channel <- cbr.numBytesReadToDisk
|
|
}
|
|
|
|
return channel, chanId
|
|
}
|
|
|
|
func (cbr *ChunkedBlobReader) Unsubscribe(clientId int) {
|
|
cbr.clientMu.Lock()
|
|
defer func() {
|
|
cbr.clientCond.Broadcast()
|
|
cbr.clientMu.Unlock()
|
|
}()
|
|
|
|
channel, ok := cbr.clients[clientId]
|
|
if ok {
|
|
close(channel)
|
|
delete(cbr.clients, clientId)
|
|
}
|
|
}
|
|
|
|
func (cbr *ChunkedBlobReader) ToBReader() *blob.BReader {
|
|
return blob.NewReader(
|
|
blob.WithHeader(cbr.inFlightReader.RawHeaders()),
|
|
blob.WithDesc(cbr.inFlightReader.GetDescriptor()),
|
|
blob.WithReader(cbr),
|
|
)
|
|
}
|
|
|
|
func (cbr *ChunkedBlobReader) WaitForClientEmpty() {
|
|
cbr.clientMu.Lock()
|
|
defer cbr.clientMu.Unlock()
|
|
|
|
for len(cbr.clients) > 0 {
|
|
cbr.clientCond.Wait()
|
|
}
|
|
}
|