mirror of
https://github.com/project-zot/zot.git
synced 2026-06-18 13:37:57 +08:00
feat(sync): fix review comments
Signed-off-by: Vishwas Rajashekar <dev@vrajashkr.com>
This commit is contained in:
+8
-10
@@ -1182,9 +1182,8 @@ func (rh *RouteHandler) getBlobInfoFromStreamCache(digest string, response http.
|
||||
|
||||
return err
|
||||
}
|
||||
blen := blobSize
|
||||
|
||||
response.Header().Set("Content-Length", strconv.FormatInt(blen, 10))
|
||||
response.Header().Set("Content-Length", strconv.FormatInt(blobSize, 10))
|
||||
response.Header().Set("Accept-Ranges", "bytes")
|
||||
response.Header().Set("Content-Type", blobMediaType)
|
||||
response.Header().Set(constants.DistContentDigestKey, digest)
|
||||
@@ -1512,19 +1511,18 @@ func (rh *RouteHandler) GetBlob(response http.ResponseWriter, request *http.Requ
|
||||
return
|
||||
}
|
||||
} else {
|
||||
clientCopyErr := copier.Copy()
|
||||
if clientCopyErr != nil {
|
||||
rh.c.Log.Error().Err(clientCopyErr).Str("digest", digest.String()).Msg("unexpected error during stream copy")
|
||||
response.WriteHeader(http.StatusInternalServerError)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
response.Header().Set("Content-Length", strconv.FormatInt(copier.Source.InFlightReader.GetDescriptor().Size, 10))
|
||||
response.Header().Set(constants.DistContentDigestKey, digest.String())
|
||||
response.Header().Set("Content-Type", copier.Source.InFlightReader.GetDescriptor().MediaType)
|
||||
response.WriteHeader(http.StatusOK)
|
||||
|
||||
clientCopyErr := copier.Copy()
|
||||
if clientCopyErr != nil {
|
||||
rh.c.Log.Error().Err(clientCopyErr).Str("digest", digest.String()).Msg("unexpected error during stream copy")
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
@@ -13,7 +13,8 @@ type Credentials struct {
|
||||
}
|
||||
|
||||
type Config struct {
|
||||
Enable *bool
|
||||
Enable *bool
|
||||
// Stream is set to true when it is desired to stream blobs to clients as they are being synced to zot.
|
||||
Stream *bool
|
||||
CredentialsFile string
|
||||
/* DownloadDir is needed only in case of using cloud based storages
|
||||
|
||||
@@ -49,8 +49,7 @@ func NewChunkedBlobReader(onDiskPath string, logger log.Logger) (*ChunkedBlobRea
|
||||
return cbr, nil
|
||||
}
|
||||
|
||||
// InitReader initializes sets the regclient blob reader
|
||||
// and the total number of bytes to read for the blob.
|
||||
// InitReader sets the regclient blob reader and the total number of bytes to read for the blob.
|
||||
func (cbr *ChunkedBlobReader) InitReader(blobReader *blob.BReader, numBytesTotal int64) {
|
||||
cbr.bytesMu.Lock()
|
||||
defer cbr.bytesMu.Unlock()
|
||||
@@ -137,28 +136,29 @@ func (cbr *ChunkedBlobReader) Read(buff []byte) (int, error) {
|
||||
|
||||
// Subscribe to the reader each time a new client is interested in the current blob,
|
||||
// the client would create a subscription here with a channel where latest chunk info is sent.
|
||||
func (cbr *ChunkedBlobReader) Subscribe(channel chan int64) int {
|
||||
func (cbr *ChunkedBlobReader) Subscribe() (chan int64, int) {
|
||||
cbr.clientMu.Lock()
|
||||
defer func() {
|
||||
cbr.clientCond.Broadcast()
|
||||
cbr.clientMu.Unlock()
|
||||
}()
|
||||
|
||||
channel := make(chan int64, 1)
|
||||
|
||||
cbr.clients[cbr.numClientsTotal] = channel
|
||||
chanId := cbr.numClientsTotal
|
||||
cbr.numClientsTotal++
|
||||
|
||||
cbr.bytesMu.RLock()
|
||||
defer cbr.bytesMu.RUnlock()
|
||||
// Announce the current number of available chunks to the new client only if the reader is initialized
|
||||
if cbr.InFlightReader != nil {
|
||||
cbr.bytesMu.RLock()
|
||||
defer cbr.bytesMu.RUnlock()
|
||||
|
||||
go func() {
|
||||
channel <- cbr.numBytesReadToDisk
|
||||
}()
|
||||
}
|
||||
|
||||
return chanId
|
||||
return channel, chanId
|
||||
}
|
||||
|
||||
func (cbr *ChunkedBlobReader) Unsubscribe(clientId int) {
|
||||
|
||||
@@ -47,10 +47,7 @@ func (ifbc *InFlightBlobCopier) Copy() error {
|
||||
}
|
||||
defer onDiskFile.Close()
|
||||
|
||||
// Register channel for latest byte count updates
|
||||
byteAnnounceChan := make(chan int64, 1)
|
||||
|
||||
id := ifbc.Source.Subscribe(byteAnnounceChan)
|
||||
byteAnnounceChan, id := ifbc.Source.Subscribe()
|
||||
defer ifbc.Source.Unsubscribe(id)
|
||||
|
||||
for {
|
||||
|
||||
@@ -106,21 +106,21 @@ func (sm *ChunkingStreamManager) StreamingBlobReader(reader *blob.BReader) (*blo
|
||||
return chunkingReader.ToBReader(), nil
|
||||
}
|
||||
|
||||
func (sm *ChunkingStreamManager) prepareActiveStreamForBlob(descriptor descriptor.Descriptor) error {
|
||||
_, ok := sm.activeStreams[descriptor.Digest.String()]
|
||||
func (sm *ChunkingStreamManager) prepareActiveStreamForBlob(desc descriptor.Descriptor) error {
|
||||
_, ok := sm.activeStreams[desc.Digest.String()]
|
||||
if ok {
|
||||
sm.logger.Warn().Str("blob", descriptor.Digest.String()).Msg("active stream already exists for blob")
|
||||
sm.logger.Warn().Str("blob", desc.Digest.String()).Msg("active stream already exists for blob")
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
r, err := NewChunkedBlobReader(sm.tempStore.BlobPath(descriptor.Digest), sm.logger)
|
||||
r, err := NewChunkedBlobReader(sm.tempStore.BlobPath(desc.Digest), sm.logger)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
sm.activeStreams[descriptor.Digest.String()] = r
|
||||
sm.blobInfoMap[descriptor.Digest.String()] = descriptor
|
||||
sm.activeStreams[desc.Digest.String()] = r
|
||||
sm.blobInfoMap[desc.Digest.String()] = desc
|
||||
|
||||
return nil
|
||||
}
|
||||
@@ -273,8 +273,15 @@ func (sm *ChunkingStreamManager) waitForClientDrainAndDeleteStream(blobDigest st
|
||||
delete(sm.activeStreams, blobDigest)
|
||||
delete(sm.blobInfoMap, blobDigest)
|
||||
|
||||
blobPath := sm.tempStore.BlobPath(godigest.FromString(blobDigest))
|
||||
_, err := os.Stat(blobPath)
|
||||
dgst, err := godigest.Parse(blobDigest)
|
||||
if err != nil {
|
||||
sm.logger.Error().Err(err).Str("blob", blobDigest).Msg("failed to parse blob digest")
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
blobPath := sm.tempStore.BlobPath(dgst)
|
||||
_, err = os.Stat(blobPath)
|
||||
if err != nil {
|
||||
if os.IsNotExist(err) {
|
||||
return
|
||||
@@ -285,7 +292,7 @@ func (sm *ChunkingStreamManager) waitForClientDrainAndDeleteStream(blobDigest st
|
||||
return
|
||||
}
|
||||
|
||||
err = os.Remove(sm.tempStore.BlobPath(godigest.FromString(blobDigest)))
|
||||
err = os.Remove(sm.tempStore.BlobPath(dgst))
|
||||
if err != nil {
|
||||
sm.logger.Error().Err(err).Str("blob", blobDigest).Msg("failed to remove blob from temp store")
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user