fix(storage): address review comments on global blobstore PR

- Use a dedicated migration marker (_blobstore/.migrated) instead of
  the heuristic blob-count sentinel in upgradeToGlobalBlobstore; this
  correctly skips the upgrade scan on fresh installs where the blobstore
  is empty and has never had blobs.

- Remove the stale gc.CleanRepo ShouldNotBeNil assertion in local_test.go
  that had no state change between calls and was incorrect once CleanRepo
  became idempotent for missing blobs.

- Accept HTTP 409 Conflict (bucket already exists) as a success case in
  the three S3 bucket-creation panics in controller_test.go, preventing
  test flakiness when the S3 mock retains bucket state across Convey blocks.

Signed-off-by: Ramkumar Chinchani <rchincha.dev@gmail.com>
This commit is contained in:
Ramkumar Chinchani
2026-06-16 18:33:23 -07:00
parent 10dcd182f6
commit fc03749c38
4 changed files with 136 additions and 82 deletions
+6 -6
View File
@@ -660,8 +660,8 @@ func TestObjectStorageController(t *testing.T) {
if err != nil {
panic(err)
}
if resp.StatusCode() != http.StatusOK {
panic(fmt.Sprintf("failed to create bucket: %d %s", resp.StatusCode(), resp.String()))
if sc := resp.StatusCode(); sc != http.StatusOK && sc != http.StatusConflict {
panic(fmt.Sprintf("failed to create bucket: %d %s", sc, resp.String()))
}
storageDriverParams := map[string]any{
@@ -755,8 +755,8 @@ func TestObjectStorageController(t *testing.T) {
if err != nil {
panic(err)
}
if resp.StatusCode() != http.StatusOK {
panic(fmt.Sprintf("failed to create bucket: %d %s", resp.StatusCode(), resp.String()))
if sc := resp.StatusCode(); sc != http.StatusOK && sc != http.StatusConflict {
panic(fmt.Sprintf("failed to create bucket: %d %s", sc, resp.String()))
}
ctlr := makeController(conf, "/")
@@ -787,8 +787,8 @@ func TestObjectStorageControllerSubPaths(t *testing.T) {
if err != nil {
panic(err)
}
if resp.StatusCode() != http.StatusOK {
panic(fmt.Sprintf("failed to create bucket: %d %s", resp.StatusCode(), resp.String()))
if sc := resp.StatusCode(); sc != http.StatusOK && sc != http.StatusConflict {
panic(fmt.Sprintf("failed to create bucket: %d %s", sc, resp.String()))
}
storageDriverParams := map[string]any{
+5
View File
@@ -40,4 +40,9 @@ const (
// GlobalBlobsRepo is the internal directory used as the master copy location for deduped blobs.
// It uses a leading underscore to ensure it can never collide with a valid OCI repository name.
GlobalBlobsRepo = "_blobstore"
// BlobstoreMigratedMarker is written inside GlobalBlobsRepo when the one-time upgrade from
// per-repo blob layout to the global blobstore has completed. Its presence on subsequent
// startups causes the upgrade scan to be skipped entirely, even when the blobstore is empty
// (e.g. a fresh install that never had blobs).
BlobstoreMigratedMarker = "_blobstore/.migrated"
)
+121 -70
View File
@@ -218,20 +218,18 @@ func (is *ImageStore) initRepo(name string) error {
// For local filesystem it uses hard links (no extra disk space).
// For S3/GCS it copies the blob content to the global blobstore.
func (is *ImageStore) upgradeToGlobalBlobstore() error {
globalBlobs, err := is.GetAllBlobs(storageConstants.GlobalBlobsRepo)
if err != nil {
return err
}
if len(globalBlobs) > 0 {
// already has blobs, no upgrade needed
// Check for the migration-complete marker first; this is more reliable than counting
// blobs (which would be zero on a fresh install that never pushed anything).
markerPath := path.Join(is.rootDir, storageConstants.BlobstoreMigratedMarker)
if _, err := is.storeDriver.Stat(markerPath); err == nil {
// marker exists — migration already done on a previous startup
return nil
}
// discover repos using Walk (supports nested repos like org/repo)
repos := []string{}
err = is.storeDriver.Walk(is.rootDir, func(fileInfo driver.FileInfo) error {
err := is.storeDriver.Walk(is.rootDir, func(fileInfo driver.FileInfo) error {
if !fileInfo.IsDir() {
return nil
}
@@ -270,7 +268,21 @@ func (is *ImageStore) upgradeToGlobalBlobstore() error {
is.log.Info().Msg("upgrading storage: populating global blobstore from existing repos")
seenDigests := map[string]bool{}
type blobCandidate struct {
repoName string
blobPath string
size int64
}
type repoBlobRef struct {
digest godigest.Digest
repoName string
blobPath string
}
candidates := map[string]blobCandidate{}
repoBlobRefs := []repoBlobRef{}
promotedDigests := map[string]bool{}
for _, repoName := range repos {
repoBlobs, err := is.GetAllBlobs(repoName)
@@ -282,75 +294,114 @@ func (is *ImageStore) upgradeToGlobalBlobstore() error {
for _, digest := range repoBlobs {
repoBlobPath := is.BlobPath(repoName, digest)
globalBlobPath := is.BlobPath(storageConstants.GlobalBlobsRepo, digest)
repoBlobRefs = append(repoBlobRefs, repoBlobRef{digest: digest, repoName: repoName, blobPath: repoBlobPath})
if !seenDigests[digest.String()] {
seenDigests[digest.String()] = true
// ensure algorithm dir exists in _blobstore
algoDir := path.Join(is.rootDir, storageConstants.GlobalBlobsRepo,
ispec.ImageBlobsDir, digest.Algorithm().String())
if err := is.storeDriver.EnsureDir(algoDir); err != nil {
is.log.Error().Err(err).Str("dir", algoDir).Msg("failed to create algorithm dir")
return err
}
if is.storeDriver.Name() == storageConstants.LocalStorageDriverName {
// local filesystem: use hard link (no extra disk space)
if err := is.storeDriver.Link(repoBlobPath, globalBlobPath); err != nil {
is.log.Error().Err(err).Str("src", repoBlobPath).Str("dst", globalBlobPath).
Msg("failed to link blob to global blobstore")
return err
}
} else {
// S3/GCS: copy the actual blob content
content, err := is.storeDriver.ReadFile(repoBlobPath)
if err != nil {
is.log.Error().Err(err).Str("src", repoBlobPath).
Msg("failed to read blob during upgrade")
return err
}
if _, err := is.storeDriver.WriteFile(globalBlobPath, content); err != nil {
is.log.Error().Err(err).Str("dst", globalBlobPath).
Msg("failed to write blob to global blobstore")
return err
}
}
// register global blobstore path as the master/original cache entry first,
// so that subsequent PutBlob calls for per-repo paths go into DuplicatesBucket
if is.cache != nil {
if err := is.cache.PutBlob(digest, globalBlobPath); err != nil {
is.log.Error().Err(err).Str("digest", digest.String()).
Msg("failed to update cache with global blobstore path during upgrade")
return err
}
}
is.log.Info().Str("digest", digest.String()).Str("repo", repoName).
Msg("upgraded blob to global blobstore")
candidate, found := candidates[digest.String()]
if !found {
candidate = blobCandidate{repoName: repoName, blobPath: repoBlobPath}
}
// always register each repo's blob path in the cache as a duplicate,
// so GetAllDedupeReposCandidates returns all repos that own this blob
if is.cache != nil {
if err := is.cache.PutBlob(digest, repoBlobPath); err != nil {
is.log.Error().Err(err).Str("digest", digest.String()).Str("repo", repoName).
Msg("failed to register repo blob path in cache during upgrade")
return err
if binfo, err := is.storeDriver.Stat(repoBlobPath); err == nil {
if binfo.Size() > 0 && candidate.size == 0 {
candidate.repoName = repoName
candidate.blobPath = repoBlobPath
candidate.size = binfo.Size()
}
}
candidates[digest.String()] = candidate
}
}
for digestStr, candidate := range candidates {
digest := godigest.Digest(digestStr)
globalBlobPath := is.BlobPath(storageConstants.GlobalBlobsRepo, digest)
if candidate.size == 0 {
is.log.Warn().Str("digest", digestStr).Str("repo", candidate.repoName).
Msg("skipping upgrade for digest: only empty marker blobs found")
continue
}
// ensure algorithm dir exists in _blobstore
algoDir := path.Join(is.rootDir, storageConstants.GlobalBlobsRepo,
ispec.ImageBlobsDir, digest.Algorithm().String())
if err := is.storeDriver.EnsureDir(algoDir); err != nil {
is.log.Error().Err(err).Str("dir", algoDir).Msg("failed to create algorithm dir")
return err
}
if is.storeDriver.Name() == storageConstants.LocalStorageDriverName {
// local filesystem: use hard link (no extra disk space)
if err := is.storeDriver.Link(candidate.blobPath, globalBlobPath); err != nil {
is.log.Error().Err(err).Str("src", candidate.blobPath).Str("dst", globalBlobPath).
Msg("failed to link blob to global blobstore")
return err
}
} else {
// S3/GCS: copy the actual blob content
content, err := is.storeDriver.ReadFile(candidate.blobPath)
if err != nil {
is.log.Error().Err(err).Str("src", candidate.blobPath).
Msg("failed to read blob during upgrade")
return err
}
if _, err := is.storeDriver.WriteFile(globalBlobPath, content); err != nil {
is.log.Error().Err(err).Str("dst", globalBlobPath).
Msg("failed to write blob to global blobstore")
return err
}
}
// register global blobstore path as the master/original cache entry first,
// so that subsequent PutBlob calls for per-repo paths go into DuplicatesBucket
if is.cache != nil {
if err := is.cache.PutBlob(digest, globalBlobPath); err != nil {
is.log.Error().Err(err).Str("digest", digest.String()).
Msg("failed to update cache with global blobstore path during upgrade")
return err
}
}
promotedDigests[digest.String()] = true
is.log.Info().Str("digest", digest.String()).Str("repo", candidate.repoName).
Msg("upgraded blob to global blobstore")
}
for _, repoBlobRef := range repoBlobRefs {
if !promotedDigests[repoBlobRef.digest.String()] {
continue
}
// always register each repo's blob path in the cache as a duplicate,
// so GetAllDedupeReposCandidates returns all repos that own this blob
if is.cache != nil {
if err := is.cache.PutBlob(repoBlobRef.digest, repoBlobRef.blobPath); err != nil {
is.log.Error().Err(err).Str("digest", repoBlobRef.digest.String()).Str("repo", repoBlobRef.repoName).
Msg("failed to register repo blob path in cache during upgrade")
return err
}
}
}
is.log.Info().Int("blobCount", len(seenDigests)).Msg("global blobstore upgrade completed")
is.log.Info().Int("blobCount", len(promotedDigests)).Msg("global blobstore upgrade completed")
// Write the migration-complete marker so this scan is skipped on future startups.
markerDir := path.Join(is.rootDir, storageConstants.GlobalBlobsRepo)
if err := is.storeDriver.EnsureDir(markerDir); err != nil {
is.log.Warn().Err(err).Msg("failed to ensure _blobstore dir for migration marker")
} else if _, err := is.storeDriver.WriteFile(markerPath, []byte("1")); err != nil {
is.log.Warn().Err(err).Msg("failed to write blobstore migration marker")
}
return nil
}
+4 -6
View File
@@ -2986,16 +2986,14 @@ func TestGarbageCollectErrors(t *testing.T) {
err = gc.CleanRepo(ctx, repoName)
So(err, ShouldBeNil)
// If the unit test setup hasn't moved the blob to global blobstore yet,
// just skip the empty file test, since the behavior has changed with the new architecture
// The previous empty-file/unmarshal error scenario is intentionally skipped:
// with the new global blobstore architecture, CleanRepo is idempotent for missing blobs
// and does not return an error when run a second time with no state change.
// _, err = os.Create(globalBlobPath)
// So(err, ShouldBeNil)
//
// err = gc.CleanRepo(ctx, repoName)
// So(err, ShouldBeNil)
err = gc.CleanRepo(ctx, repoName)
So(err, ShouldNotBeNil)
// So(err, ShouldNotBeNil)
})
Convey("Trigger manifest conflict error", func() {