diff --git a/pkg/storage/imagestore/imagestore.go b/pkg/storage/imagestore/imagestore.go index df8b55c4..a4d39c82 100644 --- a/pkg/storage/imagestore/imagestore.go +++ b/pkg/storage/imagestore/imagestore.go @@ -213,6 +213,80 @@ func (is *ImageStore) initRepo(name string) error { return nil } +// promoteBlobCandidate handles the core logic of copying/linking a single blob candidate +// into the global blobstore and registering it in the cache. +// +// blobCandidate holds metadata for a blob selected from the pre-blobstore layout. +type blobCandidate struct { + repoName string + blobPath string + size int64 +} + +// repoBlobRef tracks a blob's presence in a specific repo (for cache registration). +type repoBlobRef struct { + digest godigest.Digest + repoName string + blobPath string +} + +func (is *ImageStore) promoteBlobCandidate( + candidate blobCandidate, + digest godigest.Digest, + globalBlobPath string, +) error { + // ensure algorithm dir exists in _blobstore + algoDir := path.Join(is.rootDir, storageConstants.GlobalBlobsRepo, + ispec.ImageBlobsDir, digest.Algorithm().String()) + if err := is.storeDriver.EnsureDir(algoDir); err != nil { + is.log.Error().Err(err).Str("dir", algoDir).Msg("failed to create algorithm dir") + + return err + } + + if is.storeDriver.Name() == storageConstants.LocalStorageDriverName { + // local filesystem: use hard link (no extra disk space) + if err := is.storeDriver.Link(candidate.blobPath, globalBlobPath); err != nil { + is.log.Error().Err(err).Str("src", candidate.blobPath).Str("dst", globalBlobPath). + Msg("failed to link blob to global blobstore") + + return err + } + } else { + // S3/GCS: copy the actual blob content + content, err := is.storeDriver.ReadFile(candidate.blobPath) + if err != nil { + is.log.Error().Err(err).Str("src", candidate.blobPath). + Msg("failed to read blob during upgrade") + + return err + } + + if _, err := is.storeDriver.WriteFile(globalBlobPath, content); err != nil { + is.log.Error().Err(err).Str("dst", globalBlobPath). + Msg("failed to write blob to global blobstore") + + return err + } + } + + // register global blobstore path as the master/original cache entry first, + // so that subsequent PutBlob calls for per-repo paths go into DuplicatesBucket + if is.cache != nil { + if err := is.cache.PutBlob(digest, globalBlobPath); err != nil { + is.log.Error().Err(err).Str("digest", digest.String()). + Msg("failed to update cache with global blobstore path during upgrade") + + return err + } + } + + is.log.Info().Str("digest", digest.String()).Str("repo", candidate.repoName). + Msg("upgraded blob to global blobstore") + + return nil +} + // upgradeToGlobalBlobstore migrates blobs from per-repo directories into the global _blobstore // for older zot releases that did not have a centralized blobstore. // For local filesystem it uses hard links (no extra disk space). @@ -268,18 +342,6 @@ func (is *ImageStore) upgradeToGlobalBlobstore() error { is.log.Info().Msg("upgrading storage: populating global blobstore from existing repos") - type blobCandidate struct { - repoName string - blobPath string - size int64 - } - - type repoBlobRef struct { - digest godigest.Digest - repoName string - blobPath string - } - candidates := map[string]blobCandidate{} repoBlobRefs := []repoBlobRef{} promotedDigests := map[string]bool{} @@ -324,56 +386,11 @@ func (is *ImageStore) upgradeToGlobalBlobstore() error { continue } - // ensure algorithm dir exists in _blobstore - algoDir := path.Join(is.rootDir, storageConstants.GlobalBlobsRepo, - ispec.ImageBlobsDir, digest.Algorithm().String()) - if err := is.storeDriver.EnsureDir(algoDir); err != nil { - is.log.Error().Err(err).Str("dir", algoDir).Msg("failed to create algorithm dir") - + if err := is.promoteBlobCandidate(candidate, digest, globalBlobPath); err != nil { return err } - if is.storeDriver.Name() == storageConstants.LocalStorageDriverName { - // local filesystem: use hard link (no extra disk space) - if err := is.storeDriver.Link(candidate.blobPath, globalBlobPath); err != nil { - is.log.Error().Err(err).Str("src", candidate.blobPath).Str("dst", globalBlobPath). - Msg("failed to link blob to global blobstore") - - return err - } - } else { - // S3/GCS: copy the actual blob content - content, err := is.storeDriver.ReadFile(candidate.blobPath) - if err != nil { - is.log.Error().Err(err).Str("src", candidate.blobPath). - Msg("failed to read blob during upgrade") - - return err - } - - if _, err := is.storeDriver.WriteFile(globalBlobPath, content); err != nil { - is.log.Error().Err(err).Str("dst", globalBlobPath). - Msg("failed to write blob to global blobstore") - - return err - } - } - - // register global blobstore path as the master/original cache entry first, - // so that subsequent PutBlob calls for per-repo paths go into DuplicatesBucket - if is.cache != nil { - if err := is.cache.PutBlob(digest, globalBlobPath); err != nil { - is.log.Error().Err(err).Str("digest", digest.String()). - Msg("failed to update cache with global blobstore path during upgrade") - - return err - } - } - promotedDigests[digest.String()] = true - - is.log.Info().Str("digest", digest.String()).Str("repo", candidate.repoName). - Msg("upgraded blob to global blobstore") } for _, repoBlobRef := range repoBlobRefs {