sync: cache streaming index sub-manifests

This commit is contained in:
copilot-swe-agent[bot]
2026-06-09 21:50:49 +00:00
committed by GitHub
parent 6262cba0d0
commit e20c08c96d
4 changed files with 195 additions and 6 deletions
+56 -6
View File
@@ -80,29 +80,36 @@ func (onDemand *BaseOnDemand) FetchManifestForStream(
return cachedManifest, nil
}
var manifest manifest.Manifest
var fetchedManifest manifest.Manifest
var fetchedService Service
for _, service := range onDemand.services {
onDemand.log.Debug().Str("repo", repo).Str("ref", reference).Msg("attempting to fetch manifest")
fetchedManifest, err := service.FetchManifest(ctx, repo, reference)
man, err := service.FetchManifest(ctx, repo, reference)
if err != nil {
onDemand.log.Error().Err(err).Msg("failed to fetch manifest from service")
continue
}
manifest = fetchedManifest
fetchedManifest = man
fetchedService = service
break
}
if manifest == nil {
if fetchedManifest == nil {
return nil, zerr.ErrBlobNotFound
}
if err := onDemand.prepareIndexManifestsForStreaming(ctx, fetchedService, repo, fetchedManifest); err != nil {
return nil, err
}
onDemand.log.Debug().Str("repo", repo).Str("reference", reference).
Msg("storing image for streaming")
err := onDemand.streamManager.StoreImageForStreaming(repo, reference, manifest)
err := onDemand.streamManager.StoreImageForStreaming(repo, reference, fetchedManifest)
if err != nil {
onDemand.log.Error().Err(err).Str("repo", repo).Str("reference", reference).
Msg("failed to store manifest for streaming")
@@ -119,7 +126,50 @@ func (onDemand *BaseOnDemand) FetchManifestForStream(
}
}()
return manifest, nil
return fetchedManifest, nil
}
func (onDemand *BaseOnDemand) prepareIndexManifestsForStreaming(
ctx context.Context, service Service, repo string, man manifest.Manifest,
) error {
indexer, ok := man.(manifest.Indexer)
if !ok {
return nil
}
manifestList, err := indexer.GetManifestList()
if err != nil {
return err
}
for _, desc := range manifestList {
reference := desc.Digest.String()
if _, ok := onDemand.streamManager.StreamingImageManifest(repo, reference); ok {
continue
}
onDemand.log.Debug().Str("repo", repo).Str("reference", reference).
Msg("fetching index sub-manifest for streaming")
subManifest, err := service.FetchManifest(ctx, repo, reference)
if err != nil {
onDemand.log.Error().Err(err).Str("repo", repo).Str("reference", reference).
Msg("failed to fetch index sub-manifest for streaming")
return err
}
err = onDemand.streamManager.StoreImageForStreaming(repo, reference, subManifest)
if err != nil {
onDemand.log.Error().Err(err).Str("repo", repo).Str("reference", reference).
Msg("failed to store index sub-manifest for streaming")
return err
}
}
return nil
}
func (onDemand *BaseOnDemand) SyncImage(ctx context.Context, repo, reference string) error {
+11
View File
@@ -129,6 +129,7 @@ func (sm *ChunkingStreamManager) StoreImageForStreaming(repo, reference string,
defer sm.streamLock.Unlock()
key := repo + ":" + reference
digestKey := repo + ":" + manifest.GetDescriptor().Digest.String()
if _, ok := sm.streamingRefs[key]; ok {
sm.logger.Warn().Str("repo", repo).Str("reference", reference).
@@ -139,6 +140,7 @@ func (sm *ChunkingStreamManager) StoreImageForStreaming(repo, reference string,
// populate the manifest into streamingRefs
sm.streamingRefs[key] = manifest
sm.streamingRefs[digestKey] = manifest
// pre-load the individual blobs into activeStreams
// first, the manifest
@@ -148,6 +150,7 @@ func (sm *ChunkingStreamManager) StoreImageForStreaming(repo, reference string,
Msg("failed to prepare active stream for blob")
delete(sm.streamingRefs, key)
delete(sm.streamingRefs, digestKey)
return err
}
@@ -167,6 +170,7 @@ func (sm *ChunkingStreamManager) StoreImageForStreaming(repo, reference string,
Msg("failed to get config descriptor from manifest")
delete(sm.streamingRefs, key)
delete(sm.streamingRefs, digestKey)
return err
}
@@ -176,6 +180,7 @@ func (sm *ChunkingStreamManager) StoreImageForStreaming(repo, reference string,
sm.logger.Error().Err(err).Str("blob", configDesc.Digest.String()).Msg("failed to prepare active stream for blob")
delete(sm.streamingRefs, key)
delete(sm.streamingRefs, digestKey)
return err
}
@@ -186,6 +191,7 @@ func (sm *ChunkingStreamManager) StoreImageForStreaming(repo, reference string,
sm.logger.Error().Err(err).Msg("failed to get layers from manifest")
delete(sm.streamingRefs, key)
delete(sm.streamingRefs, digestKey)
return err
}
@@ -235,6 +241,10 @@ func (sm *ChunkingStreamManager) RemoveStreamingImage(repo, reference string) {
sm.logger.Warn().Str("repo", repo).Str("reference", reference).
Msg("failed to cast manifest to imager, skipping removal of active streams for config and layers")
sm.waitForClientDrainAndDeleteStream(manifest.GetDescriptor().Digest.String())
delete(sm.streamingRefs, key)
delete(sm.streamingRefs, repo+":"+manifest.GetDescriptor().Digest.String())
return
}
@@ -265,6 +275,7 @@ func (sm *ChunkingStreamManager) RemoveStreamingImage(repo, reference string) {
// remove the active streams for the manifest and its blobs
delete(sm.streamingRefs, key)
delete(sm.streamingRefs, repo+":"+manifest.GetDescriptor().Digest.String())
sm.logger.Info().Str("repo", repo).Str("reference", reference).Msg("finished removing streaming image")
}
@@ -69,6 +69,27 @@ func newTestOCIManifestWithBlobs(t *testing.T, configData, layerData []byte) rcM
return m
}
func newTestOCIIndexWithManifests(t *testing.T, manifests ...rcManifest.Manifest) rcManifest.Manifest {
t.Helper()
descriptors := make([]descriptor.Descriptor, 0, len(manifests))
for _, man := range manifests {
descriptors = append(descriptors, man.GetDescriptor())
}
origMan := rcOCIV1.Index{
Versioned: rcOCIV1.IndexSchemaVersion,
Manifests: descriptors,
}
m, err := rcManifest.New(rcManifest.WithOrig(origMan))
if err != nil {
t.Fatalf("failed to create test OCI index: %v", err)
}
return m
}
func TestChunkingStreamManagerConnectClient(t *testing.T) {
Convey("ConnectClient", t, func() {
sm := newTestChunkingStreamManager(t.TempDir())
@@ -241,6 +262,15 @@ func TestChunkingStreamManagerStreamingImageManifest(t *testing.T) {
So(ok, ShouldBeTrue)
So(m, ShouldEqual, manifest)
})
Convey("returns the manifest by digest after it is stored by tag", func() {
err := sm.StoreImageForStreaming("repo", "tag", manifest)
So(err, ShouldBeNil)
m, ok := sm.StreamingImageManifest("repo", manifest.GetDescriptor().Digest.String())
So(ok, ShouldBeTrue)
So(m, ShouldEqual, manifest)
})
})
}
@@ -284,5 +314,35 @@ func TestChunkingStreamManagerRemoveStreamingImage(t *testing.T) {
_, stillHasLayer := sm.activeStreams[layerDigest]
So(stillHasLayer, ShouldBeFalse)
})
Convey("removes an index manifest without removing separately stored sub-manifests", func() {
configData := []byte("index-cfg-payload")
layerData := []byte("index-lyr-payload")
manifest := newTestOCIManifestWithBlobs(t, configData, layerData)
index := newTestOCIIndexWithManifests(t, manifest)
err := sm.StoreImageForStreaming("myrepo", manifest.GetDescriptor().Digest.String(), manifest)
So(err, ShouldBeNil)
err = sm.StoreImageForStreaming("myrepo", "multi", index)
So(err, ShouldBeNil)
indexDigest := index.GetDescriptor().Digest.String()
manifestDigest := manifest.GetDescriptor().Digest.String()
sm.RemoveStreamingImage("myrepo", "multi")
_, found := sm.StreamingImageManifest("myrepo", "multi")
So(found, ShouldBeFalse)
_, found = sm.StreamingImageManifest("myrepo", indexDigest)
So(found, ShouldBeFalse)
_, stillHasIndex := sm.activeStreams[indexDigest]
So(stillHasIndex, ShouldBeFalse)
_, found = sm.StreamingImageManifest("myrepo", manifestDigest)
So(found, ShouldBeTrue)
})
})
}
+68
View File
@@ -21,6 +21,7 @@ import (
ispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/regclient/regclient"
rcBlob "github.com/regclient/regclient/types/blob"
"github.com/regclient/regclient/types/descriptor"
rcManifest "github.com/regclient/regclient/types/manifest"
rcOCIV1 "github.com/regclient/regclient/types/oci/v1"
"github.com/regclient/regclient/types/ref"
@@ -1386,6 +1387,27 @@ func newTestManifest(t *testing.T) rcManifest.Manifest {
return m
}
func newTestIndexManifest(t *testing.T, manifests ...rcManifest.Manifest) rcManifest.Manifest {
t.Helper()
descriptors := make([]descriptor.Descriptor, 0, len(manifests))
for _, man := range manifests {
descriptors = append(descriptors, man.GetDescriptor())
}
origMan := rcOCIV1.Index{
Versioned: rcOCIV1.IndexSchemaVersion,
Manifests: descriptors,
}
m, err := rcManifest.New(rcManifest.WithOrig(origMan))
if err != nil {
t.Fatalf("failed to create test index manifest: %v", err)
}
return m
}
func TestOnDemandSetAndGetStreamManager(t *testing.T) {
Convey("StreamManager is nil before SetStreamManager is called", t, func() {
onDemand := NewOnDemand(log.NewTestLogger())
@@ -1553,6 +1575,52 @@ func TestOnDemandFetchManifestForStream(t *testing.T) {
So(secondFetchCalled, ShouldBeFalse)
})
Convey("fetches and stores index sub-manifests before returning the index", t, func() {
onDemand := NewOnDemand(log.NewTestLogger())
childManifest := newTestManifest(t)
indexManifest := newTestIndexManifest(t, childManifest)
childDigest := childManifest.GetDescriptor().Digest.String()
stored := map[string]rcManifest.Manifest{}
sm := &mockStreamManager{
streamingImageManifestFn: func(repo, reference string) (rcManifest.Manifest, bool) {
man, ok := stored[repo+":"+reference]
return man, ok
},
storeImageForStreamingFn: func(repo, reference string, m rcManifest.Manifest) error {
stored[repo+":"+reference] = m
stored[repo+":"+m.GetDescriptor().Digest.String()] = m
return nil
},
}
onDemand.SetStreamManager(sm)
fetchedRefs := []string{}
svc := &mockSyncService{
fetchManifestFn: func(_ context.Context, _, reference string) (rcManifest.Manifest, error) {
fetchedRefs = append(fetchedRefs, reference)
if reference == "latest" {
return indexManifest, nil
}
So(reference, ShouldEqual, childDigest)
return childManifest, nil
},
getSyncTimeoutFn: func() time.Duration { return 5 * time.Second },
}
onDemand.Add(svc)
result, err := onDemand.FetchManifestForStream(context.Background(), "myrepo", "latest")
So(err, ShouldBeNil)
So(result, ShouldEqual, indexManifest)
So(fetchedRefs, ShouldResemble, []string{"latest", childDigest})
So(stored["myrepo:"+childDigest], ShouldEqual, childManifest)
So(stored["myrepo:latest"], ShouldEqual, indexManifest)
})
Convey("returns error when StoreImageForStreaming fails", t, func() {
onDemand := NewOnDemand(log.NewTestLogger())
fetched := newTestManifest(t)