From e20c08c96ddb1946685c6a4f5e5f909a39a1dd92 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Tue, 9 Jun 2026 21:50:49 +0000 Subject: [PATCH] sync: cache streaming index sub-manifests --- pkg/extensions/sync/on_demand.go | 62 +++++++++++++++-- pkg/extensions/sync/stream_manager.go | 11 +++ .../sync/stream_manager_internal_test.go | 60 ++++++++++++++++ pkg/extensions/sync/sync_internal_test.go | 68 +++++++++++++++++++ 4 files changed, 195 insertions(+), 6 deletions(-) diff --git a/pkg/extensions/sync/on_demand.go b/pkg/extensions/sync/on_demand.go index 95665ddf..6f3d01cb 100644 --- a/pkg/extensions/sync/on_demand.go +++ b/pkg/extensions/sync/on_demand.go @@ -80,29 +80,36 @@ func (onDemand *BaseOnDemand) FetchManifestForStream( return cachedManifest, nil } - var manifest manifest.Manifest + var fetchedManifest manifest.Manifest + var fetchedService Service for _, service := range onDemand.services { onDemand.log.Debug().Str("repo", repo).Str("ref", reference).Msg("attempting to fetch manifest") - fetchedManifest, err := service.FetchManifest(ctx, repo, reference) + man, err := service.FetchManifest(ctx, repo, reference) if err != nil { onDemand.log.Error().Err(err).Msg("failed to fetch manifest from service") continue } - manifest = fetchedManifest + + fetchedManifest = man + fetchedService = service break } - if manifest == nil { + if fetchedManifest == nil { return nil, zerr.ErrBlobNotFound } + if err := onDemand.prepareIndexManifestsForStreaming(ctx, fetchedService, repo, fetchedManifest); err != nil { + return nil, err + } + onDemand.log.Debug().Str("repo", repo).Str("reference", reference). Msg("storing image for streaming") - err := onDemand.streamManager.StoreImageForStreaming(repo, reference, manifest) + err := onDemand.streamManager.StoreImageForStreaming(repo, reference, fetchedManifest) if err != nil { onDemand.log.Error().Err(err).Str("repo", repo).Str("reference", reference). Msg("failed to store manifest for streaming") @@ -119,7 +126,50 @@ func (onDemand *BaseOnDemand) FetchManifestForStream( } }() - return manifest, nil + return fetchedManifest, nil +} + +func (onDemand *BaseOnDemand) prepareIndexManifestsForStreaming( + ctx context.Context, service Service, repo string, man manifest.Manifest, +) error { + indexer, ok := man.(manifest.Indexer) + if !ok { + return nil + } + + manifestList, err := indexer.GetManifestList() + if err != nil { + return err + } + + for _, desc := range manifestList { + reference := desc.Digest.String() + + if _, ok := onDemand.streamManager.StreamingImageManifest(repo, reference); ok { + continue + } + + onDemand.log.Debug().Str("repo", repo).Str("reference", reference). + Msg("fetching index sub-manifest for streaming") + + subManifest, err := service.FetchManifest(ctx, repo, reference) + if err != nil { + onDemand.log.Error().Err(err).Str("repo", repo).Str("reference", reference). + Msg("failed to fetch index sub-manifest for streaming") + + return err + } + + err = onDemand.streamManager.StoreImageForStreaming(repo, reference, subManifest) + if err != nil { + onDemand.log.Error().Err(err).Str("repo", repo).Str("reference", reference). + Msg("failed to store index sub-manifest for streaming") + + return err + } + } + + return nil } func (onDemand *BaseOnDemand) SyncImage(ctx context.Context, repo, reference string) error { diff --git a/pkg/extensions/sync/stream_manager.go b/pkg/extensions/sync/stream_manager.go index 704e049f..b2f54b80 100644 --- a/pkg/extensions/sync/stream_manager.go +++ b/pkg/extensions/sync/stream_manager.go @@ -129,6 +129,7 @@ func (sm *ChunkingStreamManager) StoreImageForStreaming(repo, reference string, defer sm.streamLock.Unlock() key := repo + ":" + reference + digestKey := repo + ":" + manifest.GetDescriptor().Digest.String() if _, ok := sm.streamingRefs[key]; ok { sm.logger.Warn().Str("repo", repo).Str("reference", reference). @@ -139,6 +140,7 @@ func (sm *ChunkingStreamManager) StoreImageForStreaming(repo, reference string, // populate the manifest into streamingRefs sm.streamingRefs[key] = manifest + sm.streamingRefs[digestKey] = manifest // pre-load the individual blobs into activeStreams // first, the manifest @@ -148,6 +150,7 @@ func (sm *ChunkingStreamManager) StoreImageForStreaming(repo, reference string, Msg("failed to prepare active stream for blob") delete(sm.streamingRefs, key) + delete(sm.streamingRefs, digestKey) return err } @@ -167,6 +170,7 @@ func (sm *ChunkingStreamManager) StoreImageForStreaming(repo, reference string, Msg("failed to get config descriptor from manifest") delete(sm.streamingRefs, key) + delete(sm.streamingRefs, digestKey) return err } @@ -176,6 +180,7 @@ func (sm *ChunkingStreamManager) StoreImageForStreaming(repo, reference string, sm.logger.Error().Err(err).Str("blob", configDesc.Digest.String()).Msg("failed to prepare active stream for blob") delete(sm.streamingRefs, key) + delete(sm.streamingRefs, digestKey) return err } @@ -186,6 +191,7 @@ func (sm *ChunkingStreamManager) StoreImageForStreaming(repo, reference string, sm.logger.Error().Err(err).Msg("failed to get layers from manifest") delete(sm.streamingRefs, key) + delete(sm.streamingRefs, digestKey) return err } @@ -235,6 +241,10 @@ func (sm *ChunkingStreamManager) RemoveStreamingImage(repo, reference string) { sm.logger.Warn().Str("repo", repo).Str("reference", reference). Msg("failed to cast manifest to imager, skipping removal of active streams for config and layers") + sm.waitForClientDrainAndDeleteStream(manifest.GetDescriptor().Digest.String()) + delete(sm.streamingRefs, key) + delete(sm.streamingRefs, repo+":"+manifest.GetDescriptor().Digest.String()) + return } @@ -265,6 +275,7 @@ func (sm *ChunkingStreamManager) RemoveStreamingImage(repo, reference string) { // remove the active streams for the manifest and its blobs delete(sm.streamingRefs, key) + delete(sm.streamingRefs, repo+":"+manifest.GetDescriptor().Digest.String()) sm.logger.Info().Str("repo", repo).Str("reference", reference).Msg("finished removing streaming image") } diff --git a/pkg/extensions/sync/stream_manager_internal_test.go b/pkg/extensions/sync/stream_manager_internal_test.go index 73344a38..243f04d9 100644 --- a/pkg/extensions/sync/stream_manager_internal_test.go +++ b/pkg/extensions/sync/stream_manager_internal_test.go @@ -69,6 +69,27 @@ func newTestOCIManifestWithBlobs(t *testing.T, configData, layerData []byte) rcM return m } +func newTestOCIIndexWithManifests(t *testing.T, manifests ...rcManifest.Manifest) rcManifest.Manifest { + t.Helper() + + descriptors := make([]descriptor.Descriptor, 0, len(manifests)) + for _, man := range manifests { + descriptors = append(descriptors, man.GetDescriptor()) + } + + origMan := rcOCIV1.Index{ + Versioned: rcOCIV1.IndexSchemaVersion, + Manifests: descriptors, + } + + m, err := rcManifest.New(rcManifest.WithOrig(origMan)) + if err != nil { + t.Fatalf("failed to create test OCI index: %v", err) + } + + return m +} + func TestChunkingStreamManagerConnectClient(t *testing.T) { Convey("ConnectClient", t, func() { sm := newTestChunkingStreamManager(t.TempDir()) @@ -241,6 +262,15 @@ func TestChunkingStreamManagerStreamingImageManifest(t *testing.T) { So(ok, ShouldBeTrue) So(m, ShouldEqual, manifest) }) + + Convey("returns the manifest by digest after it is stored by tag", func() { + err := sm.StoreImageForStreaming("repo", "tag", manifest) + So(err, ShouldBeNil) + + m, ok := sm.StreamingImageManifest("repo", manifest.GetDescriptor().Digest.String()) + So(ok, ShouldBeTrue) + So(m, ShouldEqual, manifest) + }) }) } @@ -284,5 +314,35 @@ func TestChunkingStreamManagerRemoveStreamingImage(t *testing.T) { _, stillHasLayer := sm.activeStreams[layerDigest] So(stillHasLayer, ShouldBeFalse) }) + + Convey("removes an index manifest without removing separately stored sub-manifests", func() { + configData := []byte("index-cfg-payload") + layerData := []byte("index-lyr-payload") + manifest := newTestOCIManifestWithBlobs(t, configData, layerData) + index := newTestOCIIndexWithManifests(t, manifest) + + err := sm.StoreImageForStreaming("myrepo", manifest.GetDescriptor().Digest.String(), manifest) + So(err, ShouldBeNil) + + err = sm.StoreImageForStreaming("myrepo", "multi", index) + So(err, ShouldBeNil) + + indexDigest := index.GetDescriptor().Digest.String() + manifestDigest := manifest.GetDescriptor().Digest.String() + + sm.RemoveStreamingImage("myrepo", "multi") + + _, found := sm.StreamingImageManifest("myrepo", "multi") + So(found, ShouldBeFalse) + + _, found = sm.StreamingImageManifest("myrepo", indexDigest) + So(found, ShouldBeFalse) + + _, stillHasIndex := sm.activeStreams[indexDigest] + So(stillHasIndex, ShouldBeFalse) + + _, found = sm.StreamingImageManifest("myrepo", manifestDigest) + So(found, ShouldBeTrue) + }) }) } diff --git a/pkg/extensions/sync/sync_internal_test.go b/pkg/extensions/sync/sync_internal_test.go index 034e72ce..6f1888a2 100644 --- a/pkg/extensions/sync/sync_internal_test.go +++ b/pkg/extensions/sync/sync_internal_test.go @@ -21,6 +21,7 @@ import ( ispec "github.com/opencontainers/image-spec/specs-go/v1" "github.com/regclient/regclient" rcBlob "github.com/regclient/regclient/types/blob" + "github.com/regclient/regclient/types/descriptor" rcManifest "github.com/regclient/regclient/types/manifest" rcOCIV1 "github.com/regclient/regclient/types/oci/v1" "github.com/regclient/regclient/types/ref" @@ -1386,6 +1387,27 @@ func newTestManifest(t *testing.T) rcManifest.Manifest { return m } +func newTestIndexManifest(t *testing.T, manifests ...rcManifest.Manifest) rcManifest.Manifest { + t.Helper() + + descriptors := make([]descriptor.Descriptor, 0, len(manifests)) + for _, man := range manifests { + descriptors = append(descriptors, man.GetDescriptor()) + } + + origMan := rcOCIV1.Index{ + Versioned: rcOCIV1.IndexSchemaVersion, + Manifests: descriptors, + } + + m, err := rcManifest.New(rcManifest.WithOrig(origMan)) + if err != nil { + t.Fatalf("failed to create test index manifest: %v", err) + } + + return m +} + func TestOnDemandSetAndGetStreamManager(t *testing.T) { Convey("StreamManager is nil before SetStreamManager is called", t, func() { onDemand := NewOnDemand(log.NewTestLogger()) @@ -1553,6 +1575,52 @@ func TestOnDemandFetchManifestForStream(t *testing.T) { So(secondFetchCalled, ShouldBeFalse) }) + Convey("fetches and stores index sub-manifests before returning the index", t, func() { + onDemand := NewOnDemand(log.NewTestLogger()) + childManifest := newTestManifest(t) + indexManifest := newTestIndexManifest(t, childManifest) + childDigest := childManifest.GetDescriptor().Digest.String() + + stored := map[string]rcManifest.Manifest{} + sm := &mockStreamManager{ + streamingImageManifestFn: func(repo, reference string) (rcManifest.Manifest, bool) { + man, ok := stored[repo+":"+reference] + + return man, ok + }, + storeImageForStreamingFn: func(repo, reference string, m rcManifest.Manifest) error { + stored[repo+":"+reference] = m + stored[repo+":"+m.GetDescriptor().Digest.String()] = m + + return nil + }, + } + onDemand.SetStreamManager(sm) + + fetchedRefs := []string{} + svc := &mockSyncService{ + fetchManifestFn: func(_ context.Context, _, reference string) (rcManifest.Manifest, error) { + fetchedRefs = append(fetchedRefs, reference) + if reference == "latest" { + return indexManifest, nil + } + + So(reference, ShouldEqual, childDigest) + + return childManifest, nil + }, + getSyncTimeoutFn: func() time.Duration { return 5 * time.Second }, + } + onDemand.Add(svc) + + result, err := onDemand.FetchManifestForStream(context.Background(), "myrepo", "latest") + So(err, ShouldBeNil) + So(result, ShouldEqual, indexManifest) + So(fetchedRefs, ShouldResemble, []string{"latest", childDigest}) + So(stored["myrepo:"+childDigest], ShouldEqual, childManifest) + So(stored["myrepo:latest"], ShouldEqual, indexManifest) + }) + Convey("returns error when StoreImageForStreaming fails", t, func() { onDemand := NewOnDemand(log.NewTestLogger()) fetched := newTestManifest(t)