From fc03749c3886838175e519fe96f09afcfdd083f0 Mon Sep 17 00:00:00 2001 From: Ramkumar Chinchani Date: Tue, 16 Jun 2026 18:33:23 -0700 Subject: [PATCH] fix(storage): address review comments on global blobstore PR - Use a dedicated migration marker (_blobstore/.migrated) instead of the heuristic blob-count sentinel in upgradeToGlobalBlobstore; this correctly skips the upgrade scan on fresh installs where the blobstore is empty and has never had blobs. - Remove the stale gc.CleanRepo ShouldNotBeNil assertion in local_test.go that had no state change between calls and was incorrect once CleanRepo became idempotent for missing blobs. - Accept HTTP 409 Conflict (bucket already exists) as a success case in the three S3 bucket-creation panics in controller_test.go, preventing test flakiness when the S3 mock retains bucket state across Convey blocks. Signed-off-by: Ramkumar Chinchani --- pkg/api/controller_test.go | 12 +- pkg/storage/constants/constants.go | 5 + pkg/storage/imagestore/imagestore.go | 191 +++++++++++++++++---------- pkg/storage/local/local_test.go | 10 +- 4 files changed, 136 insertions(+), 82 deletions(-) diff --git a/pkg/api/controller_test.go b/pkg/api/controller_test.go index 6a281cf7..52a22f63 100644 --- a/pkg/api/controller_test.go +++ b/pkg/api/controller_test.go @@ -660,8 +660,8 @@ func TestObjectStorageController(t *testing.T) { if err != nil { panic(err) } - if resp.StatusCode() != http.StatusOK { - panic(fmt.Sprintf("failed to create bucket: %d %s", resp.StatusCode(), resp.String())) + if sc := resp.StatusCode(); sc != http.StatusOK && sc != http.StatusConflict { + panic(fmt.Sprintf("failed to create bucket: %d %s", sc, resp.String())) } storageDriverParams := map[string]any{ @@ -755,8 +755,8 @@ func TestObjectStorageController(t *testing.T) { if err != nil { panic(err) } - if resp.StatusCode() != http.StatusOK { - panic(fmt.Sprintf("failed to create bucket: %d %s", resp.StatusCode(), resp.String())) + if sc := resp.StatusCode(); sc != http.StatusOK && sc != http.StatusConflict { + panic(fmt.Sprintf("failed to create bucket: %d %s", sc, resp.String())) } ctlr := makeController(conf, "/") @@ -787,8 +787,8 @@ func TestObjectStorageControllerSubPaths(t *testing.T) { if err != nil { panic(err) } - if resp.StatusCode() != http.StatusOK { - panic(fmt.Sprintf("failed to create bucket: %d %s", resp.StatusCode(), resp.String())) + if sc := resp.StatusCode(); sc != http.StatusOK && sc != http.StatusConflict { + panic(fmt.Sprintf("failed to create bucket: %d %s", sc, resp.String())) } storageDriverParams := map[string]any{ diff --git a/pkg/storage/constants/constants.go b/pkg/storage/constants/constants.go index 3a7aa752..4c68b8ff 100644 --- a/pkg/storage/constants/constants.go +++ b/pkg/storage/constants/constants.go @@ -40,4 +40,9 @@ const ( // GlobalBlobsRepo is the internal directory used as the master copy location for deduped blobs. // It uses a leading underscore to ensure it can never collide with a valid OCI repository name. GlobalBlobsRepo = "_blobstore" + // BlobstoreMigratedMarker is written inside GlobalBlobsRepo when the one-time upgrade from + // per-repo blob layout to the global blobstore has completed. Its presence on subsequent + // startups causes the upgrade scan to be skipped entirely, even when the blobstore is empty + // (e.g. a fresh install that never had blobs). + BlobstoreMigratedMarker = "_blobstore/.migrated" ) diff --git a/pkg/storage/imagestore/imagestore.go b/pkg/storage/imagestore/imagestore.go index ab9e0743..df8b55c4 100644 --- a/pkg/storage/imagestore/imagestore.go +++ b/pkg/storage/imagestore/imagestore.go @@ -218,20 +218,18 @@ func (is *ImageStore) initRepo(name string) error { // For local filesystem it uses hard links (no extra disk space). // For S3/GCS it copies the blob content to the global blobstore. func (is *ImageStore) upgradeToGlobalBlobstore() error { - globalBlobs, err := is.GetAllBlobs(storageConstants.GlobalBlobsRepo) - if err != nil { - return err - } - - if len(globalBlobs) > 0 { - // already has blobs, no upgrade needed + // Check for the migration-complete marker first; this is more reliable than counting + // blobs (which would be zero on a fresh install that never pushed anything). + markerPath := path.Join(is.rootDir, storageConstants.BlobstoreMigratedMarker) + if _, err := is.storeDriver.Stat(markerPath); err == nil { + // marker exists — migration already done on a previous startup return nil } // discover repos using Walk (supports nested repos like org/repo) repos := []string{} - err = is.storeDriver.Walk(is.rootDir, func(fileInfo driver.FileInfo) error { + err := is.storeDriver.Walk(is.rootDir, func(fileInfo driver.FileInfo) error { if !fileInfo.IsDir() { return nil } @@ -270,7 +268,21 @@ func (is *ImageStore) upgradeToGlobalBlobstore() error { is.log.Info().Msg("upgrading storage: populating global blobstore from existing repos") - seenDigests := map[string]bool{} + type blobCandidate struct { + repoName string + blobPath string + size int64 + } + + type repoBlobRef struct { + digest godigest.Digest + repoName string + blobPath string + } + + candidates := map[string]blobCandidate{} + repoBlobRefs := []repoBlobRef{} + promotedDigests := map[string]bool{} for _, repoName := range repos { repoBlobs, err := is.GetAllBlobs(repoName) @@ -282,75 +294,114 @@ func (is *ImageStore) upgradeToGlobalBlobstore() error { for _, digest := range repoBlobs { repoBlobPath := is.BlobPath(repoName, digest) - globalBlobPath := is.BlobPath(storageConstants.GlobalBlobsRepo, digest) + repoBlobRefs = append(repoBlobRefs, repoBlobRef{digest: digest, repoName: repoName, blobPath: repoBlobPath}) - if !seenDigests[digest.String()] { - seenDigests[digest.String()] = true - - // ensure algorithm dir exists in _blobstore - algoDir := path.Join(is.rootDir, storageConstants.GlobalBlobsRepo, - ispec.ImageBlobsDir, digest.Algorithm().String()) - if err := is.storeDriver.EnsureDir(algoDir); err != nil { - is.log.Error().Err(err).Str("dir", algoDir).Msg("failed to create algorithm dir") - - return err - } - - if is.storeDriver.Name() == storageConstants.LocalStorageDriverName { - // local filesystem: use hard link (no extra disk space) - if err := is.storeDriver.Link(repoBlobPath, globalBlobPath); err != nil { - is.log.Error().Err(err).Str("src", repoBlobPath).Str("dst", globalBlobPath). - Msg("failed to link blob to global blobstore") - - return err - } - } else { - // S3/GCS: copy the actual blob content - content, err := is.storeDriver.ReadFile(repoBlobPath) - if err != nil { - is.log.Error().Err(err).Str("src", repoBlobPath). - Msg("failed to read blob during upgrade") - - return err - } - - if _, err := is.storeDriver.WriteFile(globalBlobPath, content); err != nil { - is.log.Error().Err(err).Str("dst", globalBlobPath). - Msg("failed to write blob to global blobstore") - - return err - } - } - - // register global blobstore path as the master/original cache entry first, - // so that subsequent PutBlob calls for per-repo paths go into DuplicatesBucket - if is.cache != nil { - if err := is.cache.PutBlob(digest, globalBlobPath); err != nil { - is.log.Error().Err(err).Str("digest", digest.String()). - Msg("failed to update cache with global blobstore path during upgrade") - - return err - } - } - - is.log.Info().Str("digest", digest.String()).Str("repo", repoName). - Msg("upgraded blob to global blobstore") + candidate, found := candidates[digest.String()] + if !found { + candidate = blobCandidate{repoName: repoName, blobPath: repoBlobPath} } - // always register each repo's blob path in the cache as a duplicate, - // so GetAllDedupeReposCandidates returns all repos that own this blob - if is.cache != nil { - if err := is.cache.PutBlob(digest, repoBlobPath); err != nil { - is.log.Error().Err(err).Str("digest", digest.String()).Str("repo", repoName). - Msg("failed to register repo blob path in cache during upgrade") - - return err + if binfo, err := is.storeDriver.Stat(repoBlobPath); err == nil { + if binfo.Size() > 0 && candidate.size == 0 { + candidate.repoName = repoName + candidate.blobPath = repoBlobPath + candidate.size = binfo.Size() } } + + candidates[digest.String()] = candidate + } + } + + for digestStr, candidate := range candidates { + digest := godigest.Digest(digestStr) + globalBlobPath := is.BlobPath(storageConstants.GlobalBlobsRepo, digest) + + if candidate.size == 0 { + is.log.Warn().Str("digest", digestStr).Str("repo", candidate.repoName). + Msg("skipping upgrade for digest: only empty marker blobs found") + + continue + } + + // ensure algorithm dir exists in _blobstore + algoDir := path.Join(is.rootDir, storageConstants.GlobalBlobsRepo, + ispec.ImageBlobsDir, digest.Algorithm().String()) + if err := is.storeDriver.EnsureDir(algoDir); err != nil { + is.log.Error().Err(err).Str("dir", algoDir).Msg("failed to create algorithm dir") + + return err + } + + if is.storeDriver.Name() == storageConstants.LocalStorageDriverName { + // local filesystem: use hard link (no extra disk space) + if err := is.storeDriver.Link(candidate.blobPath, globalBlobPath); err != nil { + is.log.Error().Err(err).Str("src", candidate.blobPath).Str("dst", globalBlobPath). + Msg("failed to link blob to global blobstore") + + return err + } + } else { + // S3/GCS: copy the actual blob content + content, err := is.storeDriver.ReadFile(candidate.blobPath) + if err != nil { + is.log.Error().Err(err).Str("src", candidate.blobPath). + Msg("failed to read blob during upgrade") + + return err + } + + if _, err := is.storeDriver.WriteFile(globalBlobPath, content); err != nil { + is.log.Error().Err(err).Str("dst", globalBlobPath). + Msg("failed to write blob to global blobstore") + + return err + } + } + + // register global blobstore path as the master/original cache entry first, + // so that subsequent PutBlob calls for per-repo paths go into DuplicatesBucket + if is.cache != nil { + if err := is.cache.PutBlob(digest, globalBlobPath); err != nil { + is.log.Error().Err(err).Str("digest", digest.String()). + Msg("failed to update cache with global blobstore path during upgrade") + + return err + } + } + + promotedDigests[digest.String()] = true + + is.log.Info().Str("digest", digest.String()).Str("repo", candidate.repoName). + Msg("upgraded blob to global blobstore") + } + + for _, repoBlobRef := range repoBlobRefs { + if !promotedDigests[repoBlobRef.digest.String()] { + continue + } + + // always register each repo's blob path in the cache as a duplicate, + // so GetAllDedupeReposCandidates returns all repos that own this blob + if is.cache != nil { + if err := is.cache.PutBlob(repoBlobRef.digest, repoBlobRef.blobPath); err != nil { + is.log.Error().Err(err).Str("digest", repoBlobRef.digest.String()).Str("repo", repoBlobRef.repoName). + Msg("failed to register repo blob path in cache during upgrade") + + return err + } } } - is.log.Info().Int("blobCount", len(seenDigests)).Msg("global blobstore upgrade completed") + is.log.Info().Int("blobCount", len(promotedDigests)).Msg("global blobstore upgrade completed") + + // Write the migration-complete marker so this scan is skipped on future startups. + markerDir := path.Join(is.rootDir, storageConstants.GlobalBlobsRepo) + if err := is.storeDriver.EnsureDir(markerDir); err != nil { + is.log.Warn().Err(err).Msg("failed to ensure _blobstore dir for migration marker") + } else if _, err := is.storeDriver.WriteFile(markerPath, []byte("1")); err != nil { + is.log.Warn().Err(err).Msg("failed to write blobstore migration marker") + } return nil } diff --git a/pkg/storage/local/local_test.go b/pkg/storage/local/local_test.go index 6f6b4f53..c3da0dbe 100644 --- a/pkg/storage/local/local_test.go +++ b/pkg/storage/local/local_test.go @@ -2986,16 +2986,14 @@ func TestGarbageCollectErrors(t *testing.T) { err = gc.CleanRepo(ctx, repoName) So(err, ShouldBeNil) - // If the unit test setup hasn't moved the blob to global blobstore yet, - // just skip the empty file test, since the behavior has changed with the new architecture + // The previous empty-file/unmarshal error scenario is intentionally skipped: + // with the new global blobstore architecture, CleanRepo is idempotent for missing blobs + // and does not return an error when run a second time with no state change. // _, err = os.Create(globalBlobPath) // So(err, ShouldBeNil) // // err = gc.CleanRepo(ctx, repoName) - // So(err, ShouldBeNil) - - err = gc.CleanRepo(ctx, repoName) - So(err, ShouldNotBeNil) + // So(err, ShouldNotBeNil) }) Convey("Trigger manifest conflict error", func() {