mirror of
https://github.com/project-zot/zot.git
synced 2026-06-17 21:17:58 +08:00
feat(sync): fix review comments
Signed-off-by: Vishwas Rajashekar <dev@vrajashkr.com>
This commit is contained in:
@@ -82,7 +82,7 @@ func (ifbc *InFlightBlobCopier) Copy() error {
|
||||
ifbc.numBytesCopied = latestByteNum
|
||||
ifbc.Unlock()
|
||||
|
||||
if latestByteNum >= ifbc.Source.numBytesTotal {
|
||||
if latestByteNum >= ifbc.Source.Descriptor().Size {
|
||||
// transfer is complete
|
||||
break
|
||||
}
|
||||
|
||||
@@ -535,7 +535,8 @@ func (service *BaseService) syncRef(ctx context.Context, localRepo string, remot
|
||||
|
||||
copyOpts := []regclient.ImageOpts{}
|
||||
|
||||
if service.config.Stream != nil && *service.config.Stream && service.streamManager != nil {
|
||||
// When streaming is enabled, all blobs are read through the streaming reader.
|
||||
if service.config.IsStreamEnabled() {
|
||||
service.log.Debug().Str("repo", localRepo).Str("reference", remoteImageRef.Tag).
|
||||
Msg("streaming is enabled. Enabling reader hook")
|
||||
copyOpts = append(copyOpts, regclient.ImageWithBlobReaderHook(service.streamManager.StreamingBlobReader))
|
||||
|
||||
@@ -147,6 +147,8 @@ func (sm *ChunkingStreamManager) StoreImageForStreaming(repo, reference string,
|
||||
sm.logger.Error().Err(err).Str("blob", manifest.GetDescriptor().Digest.String()).
|
||||
Msg("failed to prepare active stream for blob")
|
||||
|
||||
delete(sm.streamingRefs, key)
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -164,6 +166,8 @@ func (sm *ChunkingStreamManager) StoreImageForStreaming(repo, reference string,
|
||||
sm.logger.Error().Err(err).Str("blob", configDesc.Digest.String()).
|
||||
Msg("failed to get config descriptor from manifest")
|
||||
|
||||
delete(sm.streamingRefs, key)
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -171,6 +175,8 @@ func (sm *ChunkingStreamManager) StoreImageForStreaming(repo, reference string,
|
||||
if err != nil {
|
||||
sm.logger.Error().Err(err).Str("blob", configDesc.Digest.String()).Msg("failed to prepare active stream for blob")
|
||||
|
||||
delete(sm.streamingRefs, key)
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -179,6 +185,8 @@ func (sm *ChunkingStreamManager) StoreImageForStreaming(repo, reference string,
|
||||
if err != nil {
|
||||
sm.logger.Error().Err(err).Msg("failed to get layers from manifest")
|
||||
|
||||
delete(sm.streamingRefs, key)
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -187,6 +195,8 @@ func (sm *ChunkingStreamManager) StoreImageForStreaming(repo, reference string,
|
||||
if err != nil {
|
||||
sm.logger.Error().Err(err).Str("blob", layer.Digest.String()).Msg("failed to prepare active stream for blob")
|
||||
|
||||
delete(sm.streamingRefs, key)
|
||||
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1693,7 +1693,7 @@ func TestBaseServiceIsStreamingForRepo(t *testing.T) {
|
||||
streamEnabled := true
|
||||
streamDisabled := false
|
||||
|
||||
Convey("returns false when Stream is not configured)", t, func() {
|
||||
Convey("returns false when Stream is not configured", t, func() {
|
||||
conf := syncconf.RegistryConfig{
|
||||
URLs: []string{"http://localhost"},
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user