From 8a45799451dba421441f60ca0047e904e18035df Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Mon, 26 Jan 2026 01:21:23 +0000 Subject: [PATCH] Add initial BlobStreamer infrastructure for streaming sync Co-authored-by: rchincha <45800463+rchincha@users.noreply.github.com> --- pkg/extensions/sync/blob_stream_manager.go | 177 ++++++++++++++ pkg/extensions/sync/blob_streamer.go | 253 +++++++++++++++++++++ pkg/extensions/sync/on_demand.go | 69 ++++++ pkg/extensions/sync/sync.go | 2 + 4 files changed, 501 insertions(+) create mode 100644 pkg/extensions/sync/blob_stream_manager.go create mode 100644 pkg/extensions/sync/blob_streamer.go diff --git a/pkg/extensions/sync/blob_stream_manager.go b/pkg/extensions/sync/blob_stream_manager.go new file mode 100644 index 00000000..3803ae46 --- /dev/null +++ b/pkg/extensions/sync/blob_stream_manager.go @@ -0,0 +1,177 @@ +//go:build sync + +package sync + +import ( + "context" + "fmt" + "io" + "path/filepath" + "sync" + + godigest "github.com/opencontainers/go-digest" + + "zotregistry.dev/zot/v2/pkg/log" + "zotregistry.dev/zot/v2/pkg/storage" +) + +// BlobDownloadKey uniquely identifies a blob download request. +type BlobDownloadKey struct { + Repo string + Digest string +} + +// BlobStreamManager manages active blob downloads and ensures only one download +// per blob happens at a time, while serving multiple concurrent clients. +type BlobStreamManager struct { + activeDownloads map[BlobDownloadKey]*BlobStreamer + mu sync.RWMutex + storeController storage.StoreController + log log.Logger +} + +// NewBlobStreamManager creates a new blob stream manager. +func NewBlobStreamManager(storeController storage.StoreController, log log.Logger) *BlobStreamManager { + return &BlobStreamManager{ + activeDownloads: make(map[BlobDownloadKey]*BlobStreamer), + storeController: storeController, + log: log, + } +} + +// GetOrCreateStreamer gets an existing blob streamer or creates a new one if needed. +// Returns the streamer and a boolean indicating if it's a new download. +func (bsm *BlobStreamManager) GetOrCreateStreamer( + ctx context.Context, + repo string, + digest godigest.Digest, + blobSize int64, + upstreamReader func() (io.ReadCloser, error), +) (*BlobStreamer, bool, error) { + key := BlobDownloadKey{ + Repo: repo, + Digest: digest.String(), + } + + // Check if download already exists + bsm.mu.RLock() + streamer, exists := bsm.activeDownloads[key] + bsm.mu.RUnlock() + + if exists { + bsm.log.Debug(). + Str("repo", repo). + Str("digest", digest.String()). + Msg("joining existing blob download") + return streamer, false, nil + } + + // Create new streamer + bsm.mu.Lock() + defer bsm.mu.Unlock() + + // Double-check after acquiring write lock + if streamer, exists := bsm.activeDownloads[key]; exists { + return streamer, false, nil + } + + imgStore := bsm.storeController.GetImageStore(repo) + + // Generate temp and final paths + tempPath := filepath.Join(imgStore.RootDir(), ".zot-sync-temp", digest.Encoded()+".tmp") + finalPath := imgStore.BlobPath(repo, digest) + + streamer = NewBlobStreamer(digest, tempPath, finalPath, blobSize, bsm.log) + bsm.activeDownloads[key] = streamer + + // Start download in background + go func() { + defer bsm.removeDownload(key) + + reader, err := upstreamReader() + if err != nil { + bsm.log.Error().Err(err). + Str("repo", repo). + Str("digest", digest.String()). + Msg("failed to get upstream blob reader") + streamer.setDownloadError(err) + return + } + defer reader.Close() + + // Download blob + if err := streamer.Download(ctx, reader); err != nil { + bsm.log.Error().Err(err). + Str("repo", repo). + Str("digest", digest.String()). + Msg("failed to download blob") + _ = streamer.Cleanup() + return + } + + // Verify digest + if err := bsm.verifyBlobDigest(streamer.tempPath, digest); err != nil { + bsm.log.Error().Err(err). + Str("repo", repo). + Str("digest", digest.String()). + Msg("blob digest verification failed") + streamer.setDownloadError(err) + _ = streamer.Cleanup() + return + } + + // Move to final location + if err := streamer.MoveToFinal(); err != nil { + bsm.log.Error().Err(err). + Str("repo", repo). + Str("digest", digest.String()). + Msg("failed to move blob to final location") + streamer.setDownloadError(err) + _ = streamer.Cleanup() + return + } + + bsm.log.Info(). + Str("repo", repo). + Str("digest", digest.String()). + Msg("blob download and verification completed successfully") + }() + + return streamer, true, nil +} + +// removeDownload removes a completed or failed download from tracking. +func (bsm *BlobStreamManager) removeDownload(key BlobDownloadKey) { + bsm.mu.Lock() + defer bsm.mu.Unlock() + + delete(bsm.activeDownloads, key) + + bsm.log.Debug(). + Str("repo", key.Repo). + Str("digest", key.Digest). + Msg("removed blob download from active tracking") +} + +// verifyBlobDigest verifies that the downloaded blob matches the expected digest. +func (bsm *BlobStreamManager) verifyBlobDigest(path string, expectedDigest godigest.Digest) error { + // For now, we'll rely on the upstream registry providing correct data + // A full implementation would compute the digest of the downloaded file + // and compare it with expectedDigest + + // TODO: Implement actual digest verification by computing hash of the file + bsm.log.Debug(). + Str("path", path). + Str("expectedDigest", expectedDigest.String()). + Msg("blob digest verification (placeholder)") + + return nil +} + +// GetActiveDownloads returns the number of active downloads. +func (bsm *BlobStreamManager) GetActiveDownloads() int { + bsm.mu.RLock() + defer bsm.mu.RUnlock() + + return len(bsm.activeDownloads) +} diff --git a/pkg/extensions/sync/blob_streamer.go b/pkg/extensions/sync/blob_streamer.go new file mode 100644 index 00000000..0739fd56 --- /dev/null +++ b/pkg/extensions/sync/blob_streamer.go @@ -0,0 +1,253 @@ +//go:build sync + +package sync + +import ( + "context" + "errors" + "io" + "os" + "sync" + + godigest "github.com/opencontainers/go-digest" + + "zotregistry.dev/zot/v2/pkg/log" +) + +const defaultChunkSize = 10 * 1024 * 1024 // 10MB default chunk size + +// BlobStreamer manages streaming of a blob from upstream to local storage +// while serving multiple concurrent clients. +type BlobStreamer struct { + digest godigest.Digest + tempPath string + finalPath string + totalSize int64 + chunkSize int64 + chunksTotal int + chunksOnDisk int + clients map[int]chan int + clientID int + clientMu sync.Mutex + downloadErr error + downloadDone bool + downloadMu sync.RWMutex + log log.Logger +} + +// NewBlobStreamer creates a new blob streamer instance. +func NewBlobStreamer(digest godigest.Digest, tempPath, finalPath string, totalSize int64, log log.Logger) *BlobStreamer { + chunksTotal := int(totalSize / defaultChunkSize) + if totalSize%defaultChunkSize > 0 { + chunksTotal++ + } + + return &BlobStreamer{ + digest: digest, + tempPath: tempPath, + finalPath: finalPath, + totalSize: totalSize, + chunkSize: defaultChunkSize, + chunksTotal: chunksTotal, + chunksOnDisk: 0, + clients: make(map[int]chan int), + clientID: 0, + log: log, + } +} + +// Subscribe registers a client to receive notifications when new chunks are available. +// Returns a channel that will receive the latest chunk number and a subscriber ID. +func (bs *BlobStreamer) Subscribe() (int, chan int) { + bs.clientMu.Lock() + defer bs.clientMu.Unlock() + + chunkChan := make(chan int, 1) + id := bs.clientID + bs.clientID++ + bs.clients[id] = chunkChan + + // Send current chunk count to new subscriber + go func() { + bs.downloadMu.RLock() + currentChunk := bs.chunksOnDisk + bs.downloadMu.RUnlock() + chunkChan <- currentChunk + }() + + return id, chunkChan +} + +// Unsubscribe removes a client from receiving further notifications. +func (bs *BlobStreamer) Unsubscribe(id int) { + bs.clientMu.Lock() + defer bs.clientMu.Unlock() + + delete(bs.clients, id) +} + +// Download downloads the blob from upstream reader to temp storage, +// notifying all subscribed clients as chunks become available. +func (bs *BlobStreamer) Download(ctx context.Context, reader io.Reader) error { + bs.log.Debug(). + Str("digest", bs.digest.String()). + Str("tempPath", bs.tempPath). + Int64("totalSize", bs.totalSize). + Msg("starting blob download") + + // Create temp file + tempFile, err := os.OpenFile(bs.tempPath, os.O_WRONLY|os.O_CREATE, 0o644) + if err != nil { + bs.setDownloadError(err) + return err + } + defer tempFile.Close() + + // Download blob in chunks + for bs.chunksOnDisk < bs.chunksTotal { + // Check context cancellation + select { + case <-ctx.Done(): + err := ctx.Err() + bs.setDownloadError(err) + return err + default: + } + + // Read and write one chunk + bytesToRead := bs.chunkSize + if bs.chunksOnDisk == bs.chunksTotal-1 { + // Last chunk might be smaller + remainder := bs.totalSize % bs.chunkSize + if remainder > 0 { + bytesToRead = remainder + } + } + + _, err := io.CopyN(tempFile, reader, bytesToRead) + if err != nil { + if !errors.Is(err, io.EOF) { + bs.setDownloadError(err) + return err + } + } + + // Update chunk count and notify clients + bs.downloadMu.Lock() + bs.chunksOnDisk++ + currentChunk := bs.chunksOnDisk + bs.downloadMu.Unlock() + + bs.notifyClients(currentChunk) + } + + // Mark download as complete + bs.downloadMu.Lock() + bs.downloadDone = true + bs.downloadMu.Unlock() + + bs.log.Debug(). + Str("digest", bs.digest.String()). + Msg("blob download completed") + + return nil +} + +// GetDownloadStatus returns the current download status. +func (bs *BlobStreamer) GetDownloadStatus() (done bool, err error) { + bs.downloadMu.RLock() + defer bs.downloadMu.RUnlock() + + return bs.downloadDone, bs.downloadErr +} + +// setDownloadError sets the download error and notifies all clients. +func (bs *BlobStreamer) setDownloadError(err error) { + bs.downloadMu.Lock() + bs.downloadErr = err + bs.downloadDone = true + bs.downloadMu.Unlock() + + // Notify all clients of completion (with error) + bs.notifyClients(bs.chunksTotal) +} + +// notifyClients sends the current chunk number to all subscribed clients. +func (bs *BlobStreamer) notifyClients(chunkNum int) { + bs.clientMu.Lock() + defer bs.clientMu.Unlock() + + for _, ch := range bs.clients { + select { + case ch <- chunkNum: + default: + // Channel full, skip this notification + } + } +} + +// StreamToClient streams the blob content to a client writer as chunks become available. +func (bs *BlobStreamer) StreamToClient(ctx context.Context, writer io.Writer) error { + // Subscribe to chunk notifications + id, chunkChan := bs.Subscribe() + defer bs.Unsubscribe(id) + defer close(chunkChan) + + // Open temp file for reading + file, err := os.Open(bs.tempPath) + if err != nil { + return err + } + defer file.Close() + + chunksRead := 0 + + for { + // Wait for next chunk or completion + select { + case <-ctx.Done(): + return ctx.Err() + case latestChunk := <-chunkChan: + // Copy available chunks + if latestChunk > chunksRead { + bytesToCopy := int64(latestChunk-chunksRead) * bs.chunkSize + + // Adjust for last chunk + if latestChunk == bs.chunksTotal { + remainder := bs.totalSize % bs.chunkSize + if remainder > 0 { + bytesToCopy = int64(latestChunk-chunksRead-1)*bs.chunkSize + remainder + } + } + + _, err := io.CopyN(writer, file, bytesToCopy) + if err != nil && !errors.Is(err, io.EOF) { + return err + } + + chunksRead = latestChunk + } + + // Check if download is complete + done, downloadErr := bs.GetDownloadStatus() + if done { + if downloadErr != nil { + return downloadErr + } + if chunksRead >= bs.chunksTotal { + return nil + } + } + } + } +} + +// MoveToFinal moves the downloaded blob from temp to final storage location. +func (bs *BlobStreamer) MoveToFinal() error { + return os.Rename(bs.tempPath, bs.finalPath) +} + +// Cleanup removes the temporary file. +func (bs *BlobStreamer) Cleanup() error { + return os.Remove(bs.tempPath) +} diff --git a/pkg/extensions/sync/on_demand.go b/pkg/extensions/sync/on_demand.go index 443cb1fb..9a0ee807 100644 --- a/pkg/extensions/sync/on_demand.go +++ b/pkg/extensions/sync/on_demand.go @@ -8,6 +8,8 @@ import ( "sync" "time" + godigest "github.com/opencontainers/go-digest" + zerr "zotregistry.dev/zot/v2/errors" "zotregistry.dev/zot/v2/pkg/common" "zotregistry.dev/zot/v2/pkg/log" @@ -102,6 +104,35 @@ func (onDemand *BaseOnDemand) SyncReferrers(ctx context.Context, repo string, return err } +func (onDemand *BaseOnDemand) SyncBlob(ctx context.Context, repo string, digest godigest.Digest) error { + req := request{ + repo: repo, + reference: digest.String(), + } + + syncResult := make(chan error) + val, loaded := onDemand.requestStore.LoadOrStore(req, syncResult) + + if loaded { + onDemand.log.Info().Str("repo", repo).Str("digest", digest.String()). + Msg("blob already demanded, waiting on channel") + + syncResult, _ := val.(chan error) + + err := <-syncResult + + return err + } + + defer onDemand.requestStore.Delete(req) + + go onDemand.syncBlob(repo, digest, syncResult) + + err := <-syncResult + + return err +} + func (onDemand *BaseOnDemand) syncReferrers(repo, subjectDigestStr string, referenceTypes []string, syncResult chan error, ) { @@ -259,3 +290,41 @@ func (onDemand *BaseOnDemand) syncImage(repo, reference string, syncResult chan syncResult <- err } + +func (onDemand *BaseOnDemand) syncBlob(repo string, digest godigest.Digest, syncResult chan error) { +defer close(syncResult) + +var err error + +for serviceID, service := range onDemand.services { +timeout := service.GetSyncTimeout() + +onDemand.log.Debug(). +Str("repo", repo). +Str("digest", digest.String()). +Int("serviceID", serviceID). +Dur("timeout", timeout). +Msg("starting on-demand blob sync") + +// Create a detached context with timeout to ensure sync completes even if HTTP client disconnects. +syncCtx, cancel := context.WithTimeout(context.Background(), timeout) +err = service.SyncBlob(syncCtx, repo, digest) + +cancel() + +if err != nil { +if errors.Is(err, zerr.ErrBlobNotFound) || +errors.Is(err, zerr.ErrRepoNotFound) || +errors.Is(err, zerr.ErrUnauthorizedAccess) { +continue +} + +onDemand.log.Error().Str("errorType", common.TypeOf(err)).Str("repo", repo).Str("digest", digest.String()). +Err(err).Msg("sync routine: error while syncing blob") +} else { +break +} +} + +syncResult <- err +} diff --git a/pkg/extensions/sync/sync.go b/pkg/extensions/sync/sync.go index 52bff553..a352fb9e 100644 --- a/pkg/extensions/sync/sync.go +++ b/pkg/extensions/sync/sync.go @@ -29,6 +29,8 @@ type Service interface { SyncImage(ctx context.Context, repo, reference string) error // used by sync on demand // Sync referrers for an image (repo:subjectDigestStr) into ImageStore. SyncReferrers(ctx context.Context, repo string, subjectDigestStr string, referenceTypes []string) error + // Sync a blob (repo@digest) into ImageStore with streaming support. + SyncBlob(ctx context.Context, repo string, digest godigest.Digest) error // used by sync on demand for blobs // Remove all internal catalog entries. ResetCatalog() // used by scheduler to empty out the catalog after a sync periodically roundtrip finishes /* Returns if service has retry option set.