feat(sync): fix errors and cleanup code

Signed-off-by: Vishwas Rajashekar <dev@vrajashkr.com>
This commit is contained in:
Vishwas Rajashekar
2026-05-16 20:04:21 +05:30
parent 2fb691cd3b
commit 3adf36a6c7
10 changed files with 58 additions and 88 deletions
+4
View File
@@ -212,4 +212,8 @@ var (
ErrCertificateWatcherAlreadyRunning = errors.New("certificate watcher is already running")
ErrInvalidEndSessionEndpoint = errors.New("end_session_endpoint must be an absolute http(s) URL")
ErrPolicyConditionNotCompiled = errors.New("policy condition not compiled")
ErrStreamManagerNotInitialized = errors.New("stream manager not initialized")
ErrStreamReaderNotInitialized = errors.New("reader not initialized")
ErrBlobNotFoundInActiveStreams = errors.New("blob not found in active streams")
ErrBlobReaderMissing = errors.New("blob reader missing for this blob")
)
-31
View File
@@ -1,31 +0,0 @@
{
"distSpecVersion": "1.1.1",
"storage": {
"rootDirectory": "./temp/zot"
},
"http": {
"address": "127.0.0.1",
"port": "8080"
},
"log": {
"level": "debug"
},
"extensions": {
"sync": {
"enable": true,
"enableStream": true,
"registries": [
{
"urls": [
"http://localhost:9000"
],
"onDemand": true,
"tlsVerify": false,
"maxRetries": 5,
"retryDelay": "30s",
"syncTimeout": "10m"
}
]
}
}
}
+1 -2
View File
@@ -13,8 +13,7 @@
"extensions": {
"sync": {
"enable": true,
"enableStreaming": true,
"streamChunkSizeBytes": 32768,
"stream": true,
"registries": [
{
"urls": [
+31 -22
View File
@@ -1119,23 +1119,28 @@ func (rh *RouteHandler) CheckBlob(response http.ResponseWriter, request *http.Re
if err != nil {
details := zerr.GetDetails(err)
if errors.Is(err, zerr.ErrBadBlobDigest) { //nolint:gocritic,dupl // errorslint conflicts with gocritic:IfElseChain
details["digest"] = digest.String()
e := apiErr.NewError(apiErr.DIGEST_INVALID).AddDetail(details)
zcommon.WriteJSON(response, http.StatusBadRequest, apiErr.NewErrorList(e))
} else if errors.Is(err, zerr.ErrRepoNotFound) {
streamErr := rh.getBlobInfoFromStreamCache(digest.String(), response)
if streamErr == nil {
return
extConf := rh.c.Config.CopyExtensionsConfig()
if extConf.IsStreamingEnabled() {
streamErr := rh.getBlobInfoFromStreamCache(digest.String(), response)
if streamErr == nil {
return
}
}
details["name"] = name
e := apiErr.NewError(apiErr.NAME_UNKNOWN).AddDetail(details)
zcommon.WriteJSON(response, http.StatusNotFound, apiErr.NewErrorList(e))
} else if errors.Is(err, zerr.ErrBlobNotFound) {
streamErr := rh.getBlobInfoFromStreamCache(digest.String(), response)
if streamErr == nil {
return
extConf := rh.c.Config.CopyExtensionsConfig()
if extConf.IsStreamingEnabled() {
streamErr := rh.getBlobInfoFromStreamCache(digest.String(), response)
if streamErr == nil {
return
}
}
details["digest"] = digest.String()
e := apiErr.NewError(apiErr.BLOB_UNKNOWN).AddDetail(details)
@@ -1165,24 +1170,28 @@ func (rh *RouteHandler) CheckBlob(response http.ResponseWriter, request *http.Re
func (rh *RouteHandler) getBlobInfoFromStreamCache(digest string, response http.ResponseWriter) error {
rh.c.Log.Debug().Str("digest", digest).Msg("checking stream cache for blob existence")
extConf := rh.c.Config.CopyExtensionsConfig()
if extConf.IsStreamingEnabled() {
// when streaming is enabled, the blob might exist in the stream cache
blobSize, blobMediaType, err := rh.c.SyncOnDemand.StreamManager().CachedBlobInfo(digest)
if err != nil {
rh.c.Log.Error().Err(err).Str("digest", digest).Msg("error checking stream cache for blob existence")
streamMgr := rh.c.SyncOnDemand.StreamManager()
if streamMgr == nil {
rh.c.Log.Error().Str("digest", digest).Msg("stream manager is not initialized")
return err
}
blen := blobSize
response.Header().Set("Content-Length", strconv.FormatInt(blen, 10))
response.Header().Set("Accept-Ranges", "bytes")
response.Header().Set("Content-Type", blobMediaType)
response.Header().Set(constants.DistContentDigestKey, digest)
response.WriteHeader(http.StatusOK)
return zerr.ErrStreamManagerNotInitialized
}
// when streaming is enabled, the blob might exist in the stream cache
blobSize, blobMediaType, err := streamMgr.CachedBlobInfo(digest)
if err != nil {
rh.c.Log.Error().Err(err).Str("digest", digest).Msg("failed to check stream cache for blob existence")
return err
}
blen := blobSize
response.Header().Set("Content-Length", strconv.FormatInt(blen, 10))
response.Header().Set("Accept-Ranges", "bytes")
response.Header().Set("Content-Type", blobMediaType)
response.Header().Set(constants.DistContentDigestKey, digest)
response.WriteHeader(http.StatusOK)
return nil
}
+1 -1
View File
@@ -139,7 +139,7 @@ func (e *ExtensionConfig) IsStreamingEnabled() bool {
return false
}
return e.Sync != nil && e.Sync.EnableStreaming != nil && *e.Sync.EnableStreaming
return e.Sync != nil && e.Sync.Stream != nil && *e.Sync.Stream
}
// IsScrubEnabled checks if scrub is enabled in this extensions config.
+3 -4
View File
@@ -13,10 +13,9 @@ type Credentials struct {
}
type Config struct {
Enable *bool
EnableStreaming *bool
StreamChunkSizeBytes *int64
CredentialsFile string
Enable *bool
Stream *bool
CredentialsFile string
/* DownloadDir is needed only in case of using cloud based storages
it uses regclient to first copy images into this dir (as oci layout)
and then move them into storage. */
+2 -1
View File
@@ -9,6 +9,7 @@ import (
"github.com/regclient/regclient/types/blob"
zerr "zotregistry.dev/zot/v2/errors"
"zotregistry.dev/zot/v2/pkg/log"
)
@@ -60,7 +61,7 @@ func (cbr *ChunkedBlobReader) InitReader(r *blob.BReader, numChunksTotal int64)
func (cbr *ChunkedBlobReader) Read(buff []byte) (int, error) {
if cbr.InFlightReader == nil {
return 0, ErrReaderNotInitialized
return 0, zerr.ErrStreamReaderNotInitialized
}
cbr.chunksMu.Lock()
-10
View File
@@ -1,10 +0,0 @@
package sync
import "errors"
var (
ErrReaderNotInitialized = errors.New("reader not initialized")
ErrManifestNotFoundOnDemandDisabl = errors.New("manifest not found in ondemand disabled")
ErrBlobNotFoundInActiveStreams = errors.New("blob not found in active streams")
ErrChunkingReaderNotInitialized = errors.New("chunking blob reader not initialized for this blob!")
)
+1 -1
View File
@@ -21,7 +21,7 @@ func (onDemand *BaseOnDemand) SyncReferrers(ctx context.Context, repo string,
}
func (onDemand *BaseOnDemand) FetchManifest(ctx context.Context, repo, reference string) (manifest.Manifest, error) {
return nil, ErrManifestNotFoundOnDemandDisabl
return manifest.New()
}
func (onDemand *BaseOnDemand) StreamManager() StreamManager {
+15 -16
View File
@@ -25,6 +25,8 @@ type StreamManager interface {
CachedBlobInfo(blobDigest string) (blen int64, mediaType string, err error)
}
const chunkSizeBytes = 32768
type ChunkingStreamManager struct {
tempStore StreamTempStore
// activeStreams maps blob digest to the corresponding chunked blob reader
@@ -33,23 +35,20 @@ type ChunkingStreamManager struct {
// streamingRefs holds the references to the images that are currently being streamed and their corresponding manifest.
streamingRefs map[string]manifestpkg.Manifest
// blobInfo holds blobs and their corresponding descriptor.
blobInfoMap map[string]descriptor.Descriptor
logger log.Logger
streamLock sync.Mutex
chunkSizeBytes int64
blobInfoMap map[string]descriptor.Descriptor
logger log.Logger
streamLock sync.Mutex
}
func NewChunkingStreamManager(config *config.Config, logger log.Logger) *ChunkingStreamManager {
store := NewLocalTempStore(path.Join(config.Storage.RootDirectory, "stream"))
extConf := config.CopyExtensionsConfig()
return &ChunkingStreamManager{
tempStore: store,
activeStreams: map[string]*ChunkedBlobReader{},
streamingRefs: map[string]manifestpkg.Manifest{},
blobInfoMap: map[string]descriptor.Descriptor{},
logger: logger,
chunkSizeBytes: *extConf.Sync.StreamChunkSizeBytes,
tempStore: store,
activeStreams: map[string]*ChunkedBlobReader{},
streamingRefs: map[string]manifestpkg.Manifest{},
blobInfoMap: map[string]descriptor.Descriptor{},
logger: logger,
}
}
@@ -60,7 +59,7 @@ func (sm *ChunkingStreamManager) ConnectClient(blobDigest string, writer io.Writ
stream, ok := sm.activeStreams[blobDigest]
if !ok {
return nil, ErrBlobNotFoundInActiveStreams
return nil, zerr.ErrBlobNotFoundInActiveStreams
}
dig, err := godigest.Parse(blobDigest)
@@ -68,7 +67,7 @@ func (sm *ChunkingStreamManager) ConnectClient(blobDigest string, writer io.Writ
return nil, err
}
copier := NewInFlightBlobCopier(stream, sm.tempStore.BlobPath(dig), writer, sm.chunkSizeBytes, sm.logger)
copier := NewInFlightBlobCopier(stream, sm.tempStore.BlobPath(dig), writer, chunkSizeBytes, sm.logger)
sm.logger.Info().Str("blob", blobDigest).Msg("connected client for blob")
return copier, nil
@@ -99,10 +98,10 @@ func (sm *ChunkingStreamManager) StreamingBlobReader(reader *blob.BReader) (*blo
// as the code here only supplies the reader and the chunk count
chunkingReader, ok := sm.activeStreams[digest]
if !ok {
return nil, ErrChunkingReaderNotInitialized
return nil, zerr.ErrBlobReaderMissing
}
chunkingReader.InitReader(reader, chunkCount(size, sm.chunkSizeBytes))
chunkingReader.InitReader(reader, chunkCount(size, chunkSizeBytes))
sm.logger.Info().Str("blob", digest).Msg("finished init chunked blob reader")
return chunkingReader.ToBReader(), nil
@@ -127,7 +126,7 @@ func (sm *ChunkingStreamManager) prepareActiveStreamForBlob(descriptor descripto
return nil
}
r, err := NewChunkedBlobReader(sm.tempStore.BlobPath(descriptor.Digest), sm.chunkSizeBytes, sm.logger)
r, err := NewChunkedBlobReader(sm.tempStore.BlobPath(descriptor.Digest), chunkSizeBytes, sm.logger)
if err != nil {
return err
}