From 3adf36a6c75ad109c822b8d66cb0d27117fcbb2e Mon Sep 17 00:00:00 2001 From: Vishwas Rajashekar Date: Sat, 16 May 2026 20:04:21 +0530 Subject: [PATCH] feat(sync): fix errors and cleanup code Signed-off-by: Vishwas Rajashekar --- errors/errors.go | 4 ++ examples/config-sync-local.json | 31 ------------- examples/config-sync-stream.json | 3 +- pkg/api/routes.go | 53 +++++++++++++--------- pkg/extensions/config/config.go | 2 +- pkg/extensions/config/sync/config.go | 7 ++- pkg/extensions/sync/chunked_blob_reader.go | 3 +- pkg/extensions/sync/errors.go | 10 ---- pkg/extensions/sync/on_demand_disabled.go | 2 +- pkg/extensions/sync/stream_manager.go | 31 ++++++------- 10 files changed, 58 insertions(+), 88 deletions(-) delete mode 100644 examples/config-sync-local.json delete mode 100644 pkg/extensions/sync/errors.go diff --git a/errors/errors.go b/errors/errors.go index 07a4e875..03d38ec7 100644 --- a/errors/errors.go +++ b/errors/errors.go @@ -212,4 +212,8 @@ var ( ErrCertificateWatcherAlreadyRunning = errors.New("certificate watcher is already running") ErrInvalidEndSessionEndpoint = errors.New("end_session_endpoint must be an absolute http(s) URL") ErrPolicyConditionNotCompiled = errors.New("policy condition not compiled") + ErrStreamManagerNotInitialized = errors.New("stream manager not initialized") + ErrStreamReaderNotInitialized = errors.New("reader not initialized") + ErrBlobNotFoundInActiveStreams = errors.New("blob not found in active streams") + ErrBlobReaderMissing = errors.New("blob reader missing for this blob") ) diff --git a/examples/config-sync-local.json b/examples/config-sync-local.json deleted file mode 100644 index 14b7a997..00000000 --- a/examples/config-sync-local.json +++ /dev/null @@ -1,31 +0,0 @@ -{ - "distSpecVersion": "1.1.1", - "storage": { - "rootDirectory": "./temp/zot" - }, - "http": { - "address": "127.0.0.1", - "port": "8080" - }, - "log": { - "level": "debug" - }, - "extensions": { - "sync": { - "enable": true, - "enableStream": true, - "registries": [ - { - "urls": [ - "http://localhost:9000" - ], - "onDemand": true, - "tlsVerify": false, - "maxRetries": 5, - "retryDelay": "30s", - "syncTimeout": "10m" - } - ] - } - } -} \ No newline at end of file diff --git a/examples/config-sync-stream.json b/examples/config-sync-stream.json index 1d02490d..55e29053 100644 --- a/examples/config-sync-stream.json +++ b/examples/config-sync-stream.json @@ -13,8 +13,7 @@ "extensions": { "sync": { "enable": true, - "enableStreaming": true, - "streamChunkSizeBytes": 32768, + "stream": true, "registries": [ { "urls": [ diff --git a/pkg/api/routes.go b/pkg/api/routes.go index d95a340a..e49b6aac 100644 --- a/pkg/api/routes.go +++ b/pkg/api/routes.go @@ -1119,23 +1119,28 @@ func (rh *RouteHandler) CheckBlob(response http.ResponseWriter, request *http.Re if err != nil { details := zerr.GetDetails(err) - if errors.Is(err, zerr.ErrBadBlobDigest) { //nolint:gocritic,dupl // errorslint conflicts with gocritic:IfElseChain details["digest"] = digest.String() e := apiErr.NewError(apiErr.DIGEST_INVALID).AddDetail(details) zcommon.WriteJSON(response, http.StatusBadRequest, apiErr.NewErrorList(e)) } else if errors.Is(err, zerr.ErrRepoNotFound) { - streamErr := rh.getBlobInfoFromStreamCache(digest.String(), response) - if streamErr == nil { - return + extConf := rh.c.Config.CopyExtensionsConfig() + if extConf.IsStreamingEnabled() { + streamErr := rh.getBlobInfoFromStreamCache(digest.String(), response) + if streamErr == nil { + return + } } details["name"] = name e := apiErr.NewError(apiErr.NAME_UNKNOWN).AddDetail(details) zcommon.WriteJSON(response, http.StatusNotFound, apiErr.NewErrorList(e)) } else if errors.Is(err, zerr.ErrBlobNotFound) { - streamErr := rh.getBlobInfoFromStreamCache(digest.String(), response) - if streamErr == nil { - return + extConf := rh.c.Config.CopyExtensionsConfig() + if extConf.IsStreamingEnabled() { + streamErr := rh.getBlobInfoFromStreamCache(digest.String(), response) + if streamErr == nil { + return + } } details["digest"] = digest.String() e := apiErr.NewError(apiErr.BLOB_UNKNOWN).AddDetail(details) @@ -1165,24 +1170,28 @@ func (rh *RouteHandler) CheckBlob(response http.ResponseWriter, request *http.Re func (rh *RouteHandler) getBlobInfoFromStreamCache(digest string, response http.ResponseWriter) error { rh.c.Log.Debug().Str("digest", digest).Msg("checking stream cache for blob existence") - extConf := rh.c.Config.CopyExtensionsConfig() - if extConf.IsStreamingEnabled() { - // when streaming is enabled, the blob might exist in the stream cache - blobSize, blobMediaType, err := rh.c.SyncOnDemand.StreamManager().CachedBlobInfo(digest) - if err != nil { - rh.c.Log.Error().Err(err).Str("digest", digest).Msg("error checking stream cache for blob existence") + streamMgr := rh.c.SyncOnDemand.StreamManager() + if streamMgr == nil { + rh.c.Log.Error().Str("digest", digest).Msg("stream manager is not initialized") - return err - } - blen := blobSize - - response.Header().Set("Content-Length", strconv.FormatInt(blen, 10)) - response.Header().Set("Accept-Ranges", "bytes") - response.Header().Set("Content-Type", blobMediaType) - response.Header().Set(constants.DistContentDigestKey, digest) - response.WriteHeader(http.StatusOK) + return zerr.ErrStreamManagerNotInitialized } + // when streaming is enabled, the blob might exist in the stream cache + blobSize, blobMediaType, err := streamMgr.CachedBlobInfo(digest) + if err != nil { + rh.c.Log.Error().Err(err).Str("digest", digest).Msg("failed to check stream cache for blob existence") + + return err + } + blen := blobSize + + response.Header().Set("Content-Length", strconv.FormatInt(blen, 10)) + response.Header().Set("Accept-Ranges", "bytes") + response.Header().Set("Content-Type", blobMediaType) + response.Header().Set(constants.DistContentDigestKey, digest) + response.WriteHeader(http.StatusOK) + return nil } diff --git a/pkg/extensions/config/config.go b/pkg/extensions/config/config.go index 792a07ed..3575723f 100644 --- a/pkg/extensions/config/config.go +++ b/pkg/extensions/config/config.go @@ -139,7 +139,7 @@ func (e *ExtensionConfig) IsStreamingEnabled() bool { return false } - return e.Sync != nil && e.Sync.EnableStreaming != nil && *e.Sync.EnableStreaming + return e.Sync != nil && e.Sync.Stream != nil && *e.Sync.Stream } // IsScrubEnabled checks if scrub is enabled in this extensions config. diff --git a/pkg/extensions/config/sync/config.go b/pkg/extensions/config/sync/config.go index c242a0df..da3bcd2b 100644 --- a/pkg/extensions/config/sync/config.go +++ b/pkg/extensions/config/sync/config.go @@ -13,10 +13,9 @@ type Credentials struct { } type Config struct { - Enable *bool - EnableStreaming *bool - StreamChunkSizeBytes *int64 - CredentialsFile string + Enable *bool + Stream *bool + CredentialsFile string /* DownloadDir is needed only in case of using cloud based storages it uses regclient to first copy images into this dir (as oci layout) and then move them into storage. */ diff --git a/pkg/extensions/sync/chunked_blob_reader.go b/pkg/extensions/sync/chunked_blob_reader.go index 7e3c96d0..bf2ec1c1 100644 --- a/pkg/extensions/sync/chunked_blob_reader.go +++ b/pkg/extensions/sync/chunked_blob_reader.go @@ -9,6 +9,7 @@ import ( "github.com/regclient/regclient/types/blob" + zerr "zotregistry.dev/zot/v2/errors" "zotregistry.dev/zot/v2/pkg/log" ) @@ -60,7 +61,7 @@ func (cbr *ChunkedBlobReader) InitReader(r *blob.BReader, numChunksTotal int64) func (cbr *ChunkedBlobReader) Read(buff []byte) (int, error) { if cbr.InFlightReader == nil { - return 0, ErrReaderNotInitialized + return 0, zerr.ErrStreamReaderNotInitialized } cbr.chunksMu.Lock() diff --git a/pkg/extensions/sync/errors.go b/pkg/extensions/sync/errors.go deleted file mode 100644 index 7ab1fab6..00000000 --- a/pkg/extensions/sync/errors.go +++ /dev/null @@ -1,10 +0,0 @@ -package sync - -import "errors" - -var ( - ErrReaderNotInitialized = errors.New("reader not initialized") - ErrManifestNotFoundOnDemandDisabl = errors.New("manifest not found in ondemand disabled") - ErrBlobNotFoundInActiveStreams = errors.New("blob not found in active streams") - ErrChunkingReaderNotInitialized = errors.New("chunking blob reader not initialized for this blob!") -) diff --git a/pkg/extensions/sync/on_demand_disabled.go b/pkg/extensions/sync/on_demand_disabled.go index bde7a786..93b30c73 100644 --- a/pkg/extensions/sync/on_demand_disabled.go +++ b/pkg/extensions/sync/on_demand_disabled.go @@ -21,7 +21,7 @@ func (onDemand *BaseOnDemand) SyncReferrers(ctx context.Context, repo string, } func (onDemand *BaseOnDemand) FetchManifest(ctx context.Context, repo, reference string) (manifest.Manifest, error) { - return nil, ErrManifestNotFoundOnDemandDisabl + return manifest.New() } func (onDemand *BaseOnDemand) StreamManager() StreamManager { diff --git a/pkg/extensions/sync/stream_manager.go b/pkg/extensions/sync/stream_manager.go index 410b425b..8742b03f 100644 --- a/pkg/extensions/sync/stream_manager.go +++ b/pkg/extensions/sync/stream_manager.go @@ -25,6 +25,8 @@ type StreamManager interface { CachedBlobInfo(blobDigest string) (blen int64, mediaType string, err error) } +const chunkSizeBytes = 32768 + type ChunkingStreamManager struct { tempStore StreamTempStore // activeStreams maps blob digest to the corresponding chunked blob reader @@ -33,23 +35,20 @@ type ChunkingStreamManager struct { // streamingRefs holds the references to the images that are currently being streamed and their corresponding manifest. streamingRefs map[string]manifestpkg.Manifest // blobInfo holds blobs and their corresponding descriptor. - blobInfoMap map[string]descriptor.Descriptor - logger log.Logger - streamLock sync.Mutex - chunkSizeBytes int64 + blobInfoMap map[string]descriptor.Descriptor + logger log.Logger + streamLock sync.Mutex } func NewChunkingStreamManager(config *config.Config, logger log.Logger) *ChunkingStreamManager { store := NewLocalTempStore(path.Join(config.Storage.RootDirectory, "stream")) - extConf := config.CopyExtensionsConfig() return &ChunkingStreamManager{ - tempStore: store, - activeStreams: map[string]*ChunkedBlobReader{}, - streamingRefs: map[string]manifestpkg.Manifest{}, - blobInfoMap: map[string]descriptor.Descriptor{}, - logger: logger, - chunkSizeBytes: *extConf.Sync.StreamChunkSizeBytes, + tempStore: store, + activeStreams: map[string]*ChunkedBlobReader{}, + streamingRefs: map[string]manifestpkg.Manifest{}, + blobInfoMap: map[string]descriptor.Descriptor{}, + logger: logger, } } @@ -60,7 +59,7 @@ func (sm *ChunkingStreamManager) ConnectClient(blobDigest string, writer io.Writ stream, ok := sm.activeStreams[blobDigest] if !ok { - return nil, ErrBlobNotFoundInActiveStreams + return nil, zerr.ErrBlobNotFoundInActiveStreams } dig, err := godigest.Parse(blobDigest) @@ -68,7 +67,7 @@ func (sm *ChunkingStreamManager) ConnectClient(blobDigest string, writer io.Writ return nil, err } - copier := NewInFlightBlobCopier(stream, sm.tempStore.BlobPath(dig), writer, sm.chunkSizeBytes, sm.logger) + copier := NewInFlightBlobCopier(stream, sm.tempStore.BlobPath(dig), writer, chunkSizeBytes, sm.logger) sm.logger.Info().Str("blob", blobDigest).Msg("connected client for blob") return copier, nil @@ -99,10 +98,10 @@ func (sm *ChunkingStreamManager) StreamingBlobReader(reader *blob.BReader) (*blo // as the code here only supplies the reader and the chunk count chunkingReader, ok := sm.activeStreams[digest] if !ok { - return nil, ErrChunkingReaderNotInitialized + return nil, zerr.ErrBlobReaderMissing } - chunkingReader.InitReader(reader, chunkCount(size, sm.chunkSizeBytes)) + chunkingReader.InitReader(reader, chunkCount(size, chunkSizeBytes)) sm.logger.Info().Str("blob", digest).Msg("finished init chunked blob reader") return chunkingReader.ToBReader(), nil @@ -127,7 +126,7 @@ func (sm *ChunkingStreamManager) prepareActiveStreamForBlob(descriptor descripto return nil } - r, err := NewChunkedBlobReader(sm.tempStore.BlobPath(descriptor.Digest), sm.chunkSizeBytes, sm.logger) + r, err := NewChunkedBlobReader(sm.tempStore.BlobPath(descriptor.Digest), chunkSizeBytes, sm.logger) if err != nil { return err }