diff --git a/errors/errors.go b/errors/errors.go index 03d38ec7..55918709 100644 --- a/errors/errors.go +++ b/errors/errors.go @@ -216,4 +216,5 @@ var ( ErrStreamReaderNotInitialized = errors.New("reader not initialized") ErrBlobNotFoundInActiveStreams = errors.New("blob not found in active streams") ErrBlobReaderMissing = errors.New("blob reader missing for this blob") + ErrSyncUpstreamDownloadFailed = errors.New("upstream blob download failed") ) diff --git a/pkg/api/controller.go b/pkg/api/controller.go index 778a6d74..b714e620 100644 --- a/pkg/api/controller.go +++ b/pkg/api/controller.go @@ -654,6 +654,6 @@ func RunGCTasks(conf *config.Config, storeController storage.StoreController, me type SyncOnDemand interface { SyncImage(ctx context.Context, repo, reference string) error SyncReferrers(ctx context.Context, repo string, subjectDigestStr string, referenceTypes []string) error - FetchManifest(ctx context.Context, repo, reference string) (manifest.Manifest, error) + FetchManifestForStream(ctx context.Context, repo, reference string) (manifest.Manifest, error) StreamManager() sync.StreamManager } diff --git a/pkg/api/routes.go b/pkg/api/routes.go index e49b6aac..b0672fec 100644 --- a/pkg/api/routes.go +++ b/pkg/api/routes.go @@ -1167,15 +1167,13 @@ func (rh *RouteHandler) CheckBlob(response http.ResponseWriter, request *http.Re response.WriteHeader(http.StatusOK) } +// getBlobInfoFromStreamCache checks if a blob exists in the stream cache +// and writes appropriate headers to the response if it does. +// This is only applicable when streaming is enabled. func (rh *RouteHandler) getBlobInfoFromStreamCache(digest string, response http.ResponseWriter) error { rh.c.Log.Debug().Str("digest", digest).Msg("checking stream cache for blob existence") streamMgr := rh.c.SyncOnDemand.StreamManager() - if streamMgr == nil { - rh.c.Log.Error().Str("digest", digest).Msg("stream manager is not initialized") - - return zerr.ErrStreamManagerNotInitialized - } // when streaming is enabled, the blob might exist in the stream cache blobSize, blobMediaType, err := streamMgr.CachedBlobInfo(digest) @@ -1507,15 +1505,20 @@ func (rh *RouteHandler) GetBlob(response http.ResponseWriter, request *http.Requ copier, err := rh.c.SyncOnDemand.StreamManager().ConnectClient(digest.String(), response) if err != nil { - rh.c.Log.Error().Err(err).Msg("failed to connect client to stream") - response.WriteHeader(http.StatusInternalServerError) + if !errors.Is(err, zerr.ErrBlobNotFoundInActiveStreams) { + rh.c.Log.Error().Err(err).Str("digest", digest.String()).Msg("failed to connect client to stream") + response.WriteHeader(http.StatusInternalServerError) - return + return + } } err = copier.Copy() if err != nil { - rh.c.Log.Error().Err(err).Msg("unexpected error during stream copy") + rh.c.Log.Error().Err(err).Str("digest", digest.String()).Msg("unexpected error during stream copy") + response.WriteHeader(http.StatusInternalServerError) + + return } response.Header().Set("Content-Length", strconv.FormatInt(copier.Source.InFlightReader.GetDescriptor().Size, 10)) @@ -2724,7 +2727,7 @@ func getImageManifest(ctx context.Context, routeHandler *RouteHandler, imgStore routeHandler.c.Log.Info().Str("repository", name).Str("reference", reference). Msg("streaming is enabled. Direct fetching manifest.") - fetchedManifest, err := routeHandler.c.SyncOnDemand.FetchManifest(ctx, name, reference) + fetchedManifest, err := routeHandler.c.SyncOnDemand.FetchManifestForStream(ctx, name, reference) if err != nil { routeHandler.c.Log.Err(err).Str("repository", name).Str("reference", reference). Msg("failed to fetch manifest") diff --git a/pkg/extensions/sync/chunked_blob_reader.go b/pkg/extensions/sync/chunked_blob_reader.go index bf2ec1c1..69cdb7c5 100644 --- a/pkg/extensions/sync/chunked_blob_reader.go +++ b/pkg/extensions/sync/chunked_blob_reader.go @@ -75,11 +75,16 @@ func (cbr *ChunkedBlobReader) Read(buff []byte) (int, error) { numBytesRead, err := io.CopyN(multiWriter, cbr.InFlightReader, cbr.chunkSizeBytes) if err != nil { if !errors.Is(err, io.EOF) { + // upstream download error cbr.logger.Error().Err(err).Msg("failed to copy from in flight reader") - copy(buff, internalBuff.Bytes()) cbr.chunksMu.Unlock() - return int(numBytesRead), err + // drain all clients and close their channels + for clientId := range cbr.clients { + cbr.Unsubscribe(clientId) + } + + return -1, err } } @@ -141,8 +146,13 @@ func (cbr *ChunkedBlobReader) Unsubscribe(clientId int) { cbr.clientMu.Unlock() }() - delete(cbr.clients, clientId) - cbr.numClientsTotal-- + channel, ok := cbr.clients[clientId] + if ok { + close(channel) + + cbr.numClientsTotal-- + delete(cbr.clients, clientId) + } } func (cbr *ChunkedBlobReader) ToBReader() *blob.BReader { diff --git a/pkg/extensions/sync/inflight_blob_copier.go b/pkg/extensions/sync/inflight_blob_copier.go index 227dc81b..4ef77ac6 100644 --- a/pkg/extensions/sync/inflight_blob_copier.go +++ b/pkg/extensions/sync/inflight_blob_copier.go @@ -6,6 +6,7 @@ import ( "os" "sync" + zerr "zotregistry.dev/zot/v2/errors" "zotregistry.dev/zot/v2/pkg/log" ) @@ -37,7 +38,7 @@ func NewInFlightBlobCopier( } func (ifbc *InFlightBlobCopier) Copy() error { - ifbc.log.Info().Msg("starting inflight copy") + ifbc.log.Debug().Str("onDiskPath", ifbc.onDiskPath).Msg("starting inflight copy") onDiskFile, err := os.Open(ifbc.onDiskPath) if err != nil { @@ -51,12 +52,15 @@ func (ifbc *InFlightBlobCopier) Copy() error { chunkChan := make(chan int64, 1) id := ifbc.Source.Subscribe(chunkChan) - defer ifbc.Source.Unsubscribe(id) - defer close(chunkChan) for { - latestChunkNum := <-chunkChan + latestChunkNum, ok := <-chunkChan + if !ok { + ifbc.log.Error().Str("onDiskPath", ifbc.onDiskPath).Msg("failed to download from upstream, aborting inflight copy") + + return zerr.ErrSyncUpstreamDownloadFailed + } ifbc.Lock() if latestChunkNum <= ifbc.numChunksCopied { diff --git a/pkg/extensions/sync/on_demand.go b/pkg/extensions/sync/on_demand.go index 4c476da4..eec81630 100644 --- a/pkg/extensions/sync/on_demand.go +++ b/pkg/extensions/sync/on_demand.go @@ -53,22 +53,26 @@ func (onDemand *BaseOnDemand) StreamManager() StreamManager { return onDemand.streamManager } -func (onDemand *BaseOnDemand) FetchManifest(ctx context.Context, repo, reference string) (manifest.Manifest, error) { - // An image might already be streaming in which case, just return the one in cache. - if onDemand.streamManager != nil { - manifest, ok := onDemand.streamManager.StreamingImageManifest(repo, reference) - if ok { - onDemand.log.Debug().Str("repo", repo).Str("reference", reference). - Msg("streaming manifest already present in cache.") +// FetchManifestForStream directly fetches the manifest from the upstream services and prepares the image +// for streaming. +// This is only intended for use with streaming sync. +func (onDemand *BaseOnDemand) FetchManifestForStream( + ctx context.Context, repo, reference string, +) (manifest.Manifest, error) { + // If an image is already streaming, return the one in cache. + // There is no need to start a new background sync if the manifest is already cached. + cachedManifest, ok := onDemand.streamManager.StreamingImageManifest(repo, reference) + if ok { + onDemand.log.Debug().Str("repo", repo).Str("reference", reference). + Msg("streaming manifest already present in cache.") - return manifest, nil - } + return cachedManifest, nil } var manifest manifest.Manifest for _, service := range onDemand.services { - onDemand.log.Info().Str("repo", repo).Str("ref", reference).Msg("attempting to fetch manifest") + onDemand.log.Debug().Str("repo", repo).Str("ref", reference).Msg("attempting to fetch manifest") fetchedManifest, err := service.FetchManifest(ctx, repo, reference) if err != nil { onDemand.log.Error().Err(err).Msg("failed to fetch manifest from service") @@ -84,27 +88,26 @@ func (onDemand *BaseOnDemand) FetchManifest(ctx context.Context, repo, reference return nil, zerr.ErrBlobNotFound } - if onDemand.streamManager != nil { - onDemand.log.Debug().Str("repo", repo).Str("reference", reference). - Msg("storing image for streaming.") + onDemand.log.Debug().Str("repo", repo).Str("reference", reference). + Msg("storing image for streaming") - err := onDemand.streamManager.StoreImageForStreaming(repo, reference, manifest) - if err != nil { - onDemand.log.Error().Err(err).Str("repo", repo).Str("reference", reference). - Msg("failed to store manifest for streaming") + err := onDemand.streamManager.StoreImageForStreaming(repo, reference, manifest) + if err != nil { + onDemand.log.Error().Err(err).Str("repo", repo).Str("reference", reference). + Msg("failed to store manifest for streaming") - return nil, err - } - - // sync the image in the background - go func() { - if errSync := onDemand.SyncImage(ctx, repo, reference); errSync != nil { - onDemand.log.Err(errSync).Str("repository", repo).Str("reference", reference). - Msg("failed to sync image") - } - }() + return nil, err } + // sync the image in the background + go func() { + syncCtx := context.WithoutCancel(ctx) + if errSync := onDemand.SyncImage(syncCtx, repo, reference); errSync != nil { + onDemand.log.Err(errSync).Str("repository", repo).Str("reference", reference). + Msg("failed to sync image") + } + }() + return manifest, nil } diff --git a/pkg/extensions/sync/on_demand_disabled.go b/pkg/extensions/sync/on_demand_disabled.go index 93b30c73..6d040c98 100644 --- a/pkg/extensions/sync/on_demand_disabled.go +++ b/pkg/extensions/sync/on_demand_disabled.go @@ -20,7 +20,9 @@ func (onDemand *BaseOnDemand) SyncReferrers(ctx context.Context, repo string, return nil } -func (onDemand *BaseOnDemand) FetchManifest(ctx context.Context, repo, reference string) (manifest.Manifest, error) { +func (onDemand *BaseOnDemand) FetchManifestForStream( + ctx context.Context, repo, reference string, +) (manifest.Manifest, error) { return manifest.New() } diff --git a/pkg/extensions/sync/service.go b/pkg/extensions/sync/service.go index 149c57c5..2e6b5f3c 100644 --- a/pkg/extensions/sync/service.go +++ b/pkg/extensions/sync/service.go @@ -521,7 +521,7 @@ func (service *BaseService) syncRef(ctx context.Context, localRepo string, remot copyOpts := []regclient.ImageOpts{} if service.streamManager != nil { - service.log.Info().Str("repo", localRepo).Str("reference", remoteImageRef.Tag). + service.log.Debug().Str("repo", localRepo).Str("reference", remoteImageRef.Tag). Msg("streaming is enabled. Enabling reader hook") copyOpts = append(copyOpts, regclient.ImageWithBlobReaderHook(service.streamManager.StreamingBlobReader)) } diff --git a/pkg/extensions/sync/stream_manager.go b/pkg/extensions/sync/stream_manager.go index 8742b03f..8af5967d 100644 --- a/pkg/extensions/sync/stream_manager.go +++ b/pkg/extensions/sync/stream_manager.go @@ -41,7 +41,7 @@ type ChunkingStreamManager struct { } func NewChunkingStreamManager(config *config.Config, logger log.Logger) *ChunkingStreamManager { - store := NewLocalTempStore(path.Join(config.Storage.RootDirectory, "stream")) + store := NewLocalTempStore(path.Join(config.Storage.RootDirectory, "_stream"), logger) return &ChunkingStreamManager{ tempStore: store, diff --git a/pkg/extensions/sync/stream_temp_store.go b/pkg/extensions/sync/stream_temp_store.go index efd997d1..9bda8c42 100644 --- a/pkg/extensions/sync/stream_temp_store.go +++ b/pkg/extensions/sync/stream_temp_store.go @@ -2,11 +2,12 @@ package sync import ( "errors" - "fmt" "os" "path" godigest "github.com/opencontainers/go-digest" + + "zotregistry.dev/zot/v2/pkg/log" ) type StreamTempStore interface { @@ -15,23 +16,27 @@ type StreamTempStore interface { type LocalTempStore struct { rootPath string + logger log.Logger } -func NewLocalTempStore(rootDir string) *LocalTempStore { +func NewLocalTempStore(rootDir string, logger log.Logger) *LocalTempStore { _, err := os.Stat(rootDir) if err != nil { if errors.Is(err, os.ErrNotExist) { err := os.MkdirAll(rootDir, 0o755) if err != nil { - fmt.Println("failed to create root dir " + err.Error()) + // If the root directory cannot be created, log a fatal error and exit. + logger.Fatal().Str("rootDir", rootDir).Err(err).Msg("failed to create root dir for stream temp store") } } else { - fmt.Println("failed to stat root dir " + err.Error()) + // If there is an error other than "not exists", log a fatal error and exit. + logger.Fatal().Str("rootDir", rootDir).Err(err).Msg("failed to stat root dir for stream temp store") } } return &LocalTempStore{ rootPath: rootDir, + logger: logger, } } @@ -41,7 +46,9 @@ func (lts *LocalTempStore) BlobPath(digest godigest.Digest) string { if err != nil && errors.Is(err, os.ErrNotExist) { err := os.MkdirAll(parentDir, 0o755) if err != nil { - fmt.Println("failed to create directory " + err.Error()) + // It is safe to not use fatal here as the stream will hit an error while + // trying to use the blob path, and the error will be handled there. + lts.logger.Error().Str("parentDir", parentDir).Err(err).Msg("failed to create directory") } }