diff --git a/pkg/api/controller.go b/pkg/api/controller.go index 27c2aeed..c57b6e3d 100644 --- a/pkg/api/controller.go +++ b/pkg/api/controller.go @@ -14,9 +14,9 @@ import ( "strings" "time" - godigest "github.com/opencontainers/go-digest" "github.com/gorilla/mux" "github.com/gorilla/securecookie" + godigest "github.com/opencontainers/go-digest" "github.com/zitadel/oidc/v3/pkg/client/rp" "zotregistry.dev/zot/v2/errors" diff --git a/pkg/extensions/sync/blob_stream_manager.go b/pkg/extensions/sync/blob_stream_manager.go index 358a0749..940313ca 100644 --- a/pkg/extensions/sync/blob_stream_manager.go +++ b/pkg/extensions/sync/blob_stream_manager.go @@ -14,6 +14,9 @@ import ( "zotregistry.dev/zot/v2/pkg/storage" ) +// Temporary directory for in-progress blob downloads +const blobSyncTempDir = ".zot-sync-temp" + // BlobDownloadKey uniquely identifies a blob download request. type BlobDownloadKey struct { Repo string @@ -77,7 +80,7 @@ func (bsm *BlobStreamManager) GetOrCreateStreamer( imgStore := bsm.storeController.GetImageStore(repo) // Generate temp and final paths - tempPath := filepath.Join(imgStore.RootDir(), ".zot-sync-temp", digest.Encoded()+".tmp") + tempPath := filepath.Join(imgStore.RootDir(), blobSyncTempDir, digest.Encoded()+".tmp") finalPath := imgStore.BlobPath(repo, digest) streamer = NewBlobStreamer(digest, tempPath, finalPath, blobSize, bsm.log) @@ -154,15 +157,19 @@ func (bsm *BlobStreamManager) removeDownload(key BlobDownloadKey) { // verifyBlobDigest verifies that the downloaded blob matches the expected digest. func (bsm *BlobStreamManager) verifyBlobDigest(path string, expectedDigest godigest.Digest) error { - // For now, we'll rely on the upstream registry providing correct data - // A full implementation would compute the digest of the downloaded file - // and compare it with expectedDigest - - // TODO: Implement actual digest verification by computing hash of the file + // TODO: Security - Implement digest verification + // Currently relying on upstream registry integrity. For production use, + // this MUST compute the actual digest of the downloaded file and compare + // it with expectedDigest to detect corruption or tampering. + // Implementation should: + // 1. Open the file and compute its digest using expectedDigest.Algorithm() + // 2. Compare computed digest with expectedDigest + // 3. Return error if mismatch + bsm.log.Debug(). Str("path", path). Str("expectedDigest", expectedDigest.String()). - Msg("blob digest verification (placeholder)") + Msg("blob digest verification not yet implemented - relying on upstream integrity") return nil } diff --git a/pkg/extensions/sync/blob_streamer.go b/pkg/extensions/sync/blob_streamer.go index 0739fd56..b9fe43f3 100644 --- a/pkg/extensions/sync/blob_streamer.go +++ b/pkg/extensions/sync/blob_streamer.go @@ -19,20 +19,20 @@ const defaultChunkSize = 10 * 1024 * 1024 // 10MB default chunk size // BlobStreamer manages streaming of a blob from upstream to local storage // while serving multiple concurrent clients. type BlobStreamer struct { - digest godigest.Digest - tempPath string - finalPath string - totalSize int64 - chunkSize int64 - chunksTotal int - chunksOnDisk int - clients map[int]chan int - clientID int - clientMu sync.Mutex - downloadErr error - downloadDone bool - downloadMu sync.RWMutex - log log.Logger + digest godigest.Digest + tempPath string + finalPath string + totalSize int64 + chunkSize int64 + chunksTotal int + chunksOnDisk int + clients map[int]chan int + clientID int + clientMu sync.Mutex + downloadErr error + downloadDone bool + downloadMu sync.RWMutex + log log.Logger } // NewBlobStreamer creates a new blob streamer instance. @@ -95,8 +95,8 @@ func (bs *BlobStreamer) Download(ctx context.Context, reader io.Reader) error { Int64("totalSize", bs.totalSize). Msg("starting blob download") - // Create temp file - tempFile, err := os.OpenFile(bs.tempPath, os.O_WRONLY|os.O_CREATE, 0o644) + // Create temp file, truncating if it exists to ensure clean state + tempFile, err := os.OpenFile(bs.tempPath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0o644) if err != nil { bs.setDownloadError(err) return err @@ -114,22 +114,28 @@ func (bs *BlobStreamer) Download(ctx context.Context, reader io.Reader) error { default: } - // Read and write one chunk + // Calculate bytes to read for this chunk bytesToRead := bs.chunkSize if bs.chunksOnDisk == bs.chunksTotal-1 { - // Last chunk might be smaller + // Last chunk: read remaining bytes remainder := bs.totalSize % bs.chunkSize if remainder > 0 { bytesToRead = remainder } } - _, err := io.CopyN(tempFile, reader, bytesToRead) - if err != nil { - if !errors.Is(err, io.EOF) { - bs.setDownloadError(err) - return err - } + n, err := io.CopyN(tempFile, reader, bytesToRead) + if err != nil && !errors.Is(err, io.EOF) { + // Real error occurred + bs.setDownloadError(err) + return err + } + + // Check if we got fewer bytes than expected (but only if not EOF) + if n < bytesToRead && !errors.Is(err, io.EOF) { + err := io.ErrUnexpectedEOF + bs.setDownloadError(err) + return err } // Update chunk count and notify clients diff --git a/pkg/extensions/sync/on_demand.go b/pkg/extensions/sync/on_demand.go index 9a0ee807..a5ccfa1e 100644 --- a/pkg/extensions/sync/on_demand.go +++ b/pkg/extensions/sync/on_demand.go @@ -292,39 +292,39 @@ func (onDemand *BaseOnDemand) syncImage(repo, reference string, syncResult chan } func (onDemand *BaseOnDemand) syncBlob(repo string, digest godigest.Digest, syncResult chan error) { -defer close(syncResult) + defer close(syncResult) -var err error + var err error -for serviceID, service := range onDemand.services { -timeout := service.GetSyncTimeout() + for serviceID, service := range onDemand.services { + timeout := service.GetSyncTimeout() -onDemand.log.Debug(). -Str("repo", repo). -Str("digest", digest.String()). -Int("serviceID", serviceID). -Dur("timeout", timeout). -Msg("starting on-demand blob sync") + onDemand.log.Debug(). + Str("repo", repo). + Str("digest", digest.String()). + Int("serviceID", serviceID). + Dur("timeout", timeout). + Msg("starting on-demand blob sync") -// Create a detached context with timeout to ensure sync completes even if HTTP client disconnects. -syncCtx, cancel := context.WithTimeout(context.Background(), timeout) -err = service.SyncBlob(syncCtx, repo, digest) + // Create a detached context with timeout to ensure sync completes even if HTTP client disconnects. + syncCtx, cancel := context.WithTimeout(context.Background(), timeout) + err = service.SyncBlob(syncCtx, repo, digest) -cancel() + cancel() -if err != nil { -if errors.Is(err, zerr.ErrBlobNotFound) || -errors.Is(err, zerr.ErrRepoNotFound) || -errors.Is(err, zerr.ErrUnauthorizedAccess) { -continue -} - -onDemand.log.Error().Str("errorType", common.TypeOf(err)).Str("repo", repo).Str("digest", digest.String()). -Err(err).Msg("sync routine: error while syncing blob") -} else { -break -} -} - -syncResult <- err + if err != nil { + if errors.Is(err, zerr.ErrBlobNotFound) || + errors.Is(err, zerr.ErrRepoNotFound) || + errors.Is(err, zerr.ErrUnauthorizedAccess) { + continue + } + + onDemand.log.Error().Str("errorType", common.TypeOf(err)).Str("repo", repo).Str("digest", digest.String()). + Err(err).Msg("sync routine: error while syncing blob") + } else { + break + } + } + + syncResult <- err } diff --git a/pkg/extensions/sync/service.go b/pkg/extensions/sync/service.go index b0ad1c7a..9bbdc7cd 100644 --- a/pkg/extensions/sync/service.go +++ b/pkg/extensions/sync/service.go @@ -951,15 +951,18 @@ func (service *BaseService) SyncBlob(ctx context.Context, repo string, digest go imgStore := service.storeController.GetImageStore(repo) // Create remote reference for blob access - // Use a dummy tag since we only need the repository reference - remoteRef, err := service.remote.GetImageReference(remoteRepo, "dummy") + // Note: regclient requires a full reference (repo:tag), but for blob-only operations + // the tag value is not actually used by the registry API + const dummyTag = "dummy" + remoteRef, err := service.remote.GetImageReference(remoteRepo, dummyTag) if err != nil { return err } // Create a descriptor for the blob + // digest is already godigest.Digest type, just use it directly blobDesc := descriptor.Descriptor{ - Digest: godigest.Digest(digest.String()), + Digest: digest, } // Get the actual blob content from upstream diff --git a/zot b/zot new file mode 100755 index 00000000..8b5e2dd9 Binary files /dev/null and b/zot differ