feat(sync): additional changes for streaming

Signed-off-by: Vishwas Rajashekar <dev@vrajashkr.com>
This commit is contained in:
Vishwas Rajashekar
2026-05-16 18:44:10 +05:30
parent e2aa088e0d
commit 2fb691cd3b
14 changed files with 421 additions and 110 deletions
@@ -1,7 +1,7 @@
{
"distSpecVersion": "1.1.1",
"storage": {
"rootDirectory": "./temp/zot1"
"rootDirectory": "./temp/zot"
},
"http": {
"address": "127.0.0.1",
@@ -13,6 +13,7 @@
"extensions": {
"sync": {
"enable": true,
"enableStream": true,
"registries": [
{
"urls": [
@@ -0,0 +1,30 @@
{
"distSpecVersion": "1.1.1",
"storage": {
"rootDirectory": "./temp/zotlocalbuildnormal"
},
"http": {
"address": "127.0.0.1",
"port": "8082"
},
"log": {
"level": "debug"
},
"extensions": {
"sync": {
"enable": true,
"registries": [
{
"urls": [
"http://localhost:9000"
],
"onDemand": true,
"tlsVerify": false,
"maxRetries": 5,
"retryDelay": "30s",
"syncTimeout": "10m"
}
]
}
}
}
+30
View File
@@ -0,0 +1,30 @@
{
"distSpecVersion": "1.1.1",
"storage": {
"rootDirectory": "./temp/zotmainlinenormal"
},
"http": {
"address": "127.0.0.1",
"port": "8081"
},
"log": {
"level": "debug"
},
"extensions": {
"sync": {
"enable": true,
"registries": [
{
"urls": [
"http://localhost:9000"
],
"onDemand": true,
"tlsVerify": false,
"maxRetries": 5,
"retryDelay": "30s",
"syncTimeout": "10m"
}
]
}
}
}
+2 -9
View File
@@ -43,7 +43,6 @@ const (
type Controller struct {
Config *config.Config
Router *mux.Router
StreamManager sync.StreamManager
MetaDB mTypes.MetaDB
StoreController storage.StoreController
Log log.Logger
@@ -377,12 +376,6 @@ func (c *Controller) Init() error {
}
}
if extensionsConfig.IsStreamingEnabled() {
c.Log.Info().Msg("streaming sync enabled")
sm := sync.NewChunkingStreamManager(c.Config, c.Log)
c.StreamManager = sm
}
return nil
}
@@ -606,8 +599,7 @@ func (c *Controller) StartBackgroundTasks() {
// Always call EnableSyncExtension to ensure proper logging, even when sync is disabled
//nolint: contextcheck
syncOnDemand, err := ext.EnableSyncExtension(
c.Config, c.MetaDB, c.StoreController, c.taskScheduler, c.StreamManager, c.Log)
syncOnDemand, err := ext.EnableSyncExtension(c.Config, c.MetaDB, c.StoreController, c.taskScheduler, c.Log)
if err != nil {
c.Log.Error().Err(err).Msg("failed to start sync extension")
}
@@ -663,4 +655,5 @@ type SyncOnDemand interface {
SyncImage(ctx context.Context, repo, reference string) error
SyncReferrers(ctx context.Context, repo string, subjectDigestStr string, referenceTypes []string) error
FetchManifest(ctx context.Context, repo, reference string) (manifest.Manifest, error)
StreamManager() sync.StreamManager
}
+35 -10
View File
@@ -1119,15 +1119,24 @@ func (rh *RouteHandler) CheckBlob(response http.ResponseWriter, request *http.Re
if err != nil {
details := zerr.GetDetails(err)
if errors.Is(err, zerr.ErrBadBlobDigest) { //nolint:gocritic,dupl // errorslint conflicts with gocritic:IfElseChain
details["digest"] = digest.String()
e := apiErr.NewError(apiErr.DIGEST_INVALID).AddDetail(details)
zcommon.WriteJSON(response, http.StatusBadRequest, apiErr.NewErrorList(e))
} else if errors.Is(err, zerr.ErrRepoNotFound) {
streamErr := rh.getBlobInfoFromStreamCache(digest.String(), response)
if streamErr == nil {
return
}
details["name"] = name
e := apiErr.NewError(apiErr.NAME_UNKNOWN).AddDetail(details)
zcommon.WriteJSON(response, http.StatusNotFound, apiErr.NewErrorList(e))
} else if errors.Is(err, zerr.ErrBlobNotFound) {
streamErr := rh.getBlobInfoFromStreamCache(digest.String(), response)
if streamErr == nil {
return
}
details["digest"] = digest.String()
e := apiErr.NewError(apiErr.BLOB_UNKNOWN).AddDetail(details)
zcommon.WriteJSON(response, http.StatusNotFound, apiErr.NewErrorList(e))
@@ -1153,6 +1162,30 @@ func (rh *RouteHandler) CheckBlob(response http.ResponseWriter, request *http.Re
response.WriteHeader(http.StatusOK)
}
func (rh *RouteHandler) getBlobInfoFromStreamCache(digest string, response http.ResponseWriter) error {
rh.c.Log.Debug().Str("digest", digest).Msg("checking stream cache for blob existence")
extConf := rh.c.Config.CopyExtensionsConfig()
if extConf.IsStreamingEnabled() {
// when streaming is enabled, the blob might exist in the stream cache
blobSize, blobMediaType, err := rh.c.SyncOnDemand.StreamManager().CachedBlobInfo(digest)
if err != nil {
rh.c.Log.Error().Err(err).Str("digest", digest).Msg("error checking stream cache for blob existence")
return err
}
blen := blobSize
response.Header().Set("Content-Length", strconv.FormatInt(blen, 10))
response.Header().Set("Accept-Ranges", "bytes")
response.Header().Set("Content-Type", blobMediaType)
response.Header().Set(constants.DistContentDigestKey, digest)
response.WriteHeader(http.StatusOK)
}
return nil
}
type httpRange struct {
start int64
end int64
@@ -1463,7 +1496,7 @@ func (rh *RouteHandler) GetBlob(response http.ResponseWriter, request *http.Requ
if errors.Is(err, zerr.ErrRepoNotFound) || errors.Is(err, zerr.ErrBlobNotFound) {
rh.c.Log.Info().Msg("blob was not found. Connecting client to stream")
copier, err := rh.c.StreamManager.ConnectClient(digest.String(), response)
copier, err := rh.c.SyncOnDemand.StreamManager().ConnectClient(digest.String(), response)
if err != nil {
rh.c.Log.Error().Err(err).Msg("failed to connect client to stream")
response.WriteHeader(http.StatusInternalServerError)
@@ -1471,7 +1504,6 @@ func (rh *RouteHandler) GetBlob(response http.ResponseWriter, request *http.Requ
return
}
// TODO: handle partial
err = copier.Copy()
if err != nil {
rh.c.Log.Error().Err(err).Msg("unexpected error during stream copy")
@@ -2678,7 +2710,7 @@ func getImageManifest(ctx context.Context, routeHandler *RouteHandler, imgStore
extConf := routeHandler.c.Config.CopyExtensionsConfig()
// if streaming enabled, return manifest immediately, start sync in background
// if streaming enabled, return manifest immediately
if extConf.IsStreamingEnabled() {
routeHandler.c.Log.Info().Str("repository", name).Str("reference", reference).
Msg("streaming is enabled. Direct fetching manifest.")
@@ -2699,13 +2731,6 @@ func getImageManifest(ctx context.Context, routeHandler *RouteHandler, imgStore
return imgStore.GetImageManifest(name, reference)
}
go func() {
if errSync := routeHandler.c.SyncOnDemand.SyncImage(ctx, name, reference); errSync != nil {
routeHandler.c.Log.Err(errSync).Str("repository", name).Str("reference", reference).
Msg("failed to sync image")
}
}()
return content, fetchedManifest.GetDescriptor().Digest, fetchedManifest.GetDescriptor().MediaType, nil
}
+11 -2
View File
@@ -19,7 +19,7 @@ import (
)
func EnableSyncExtension(config *config.Config, metaDB mTypes.MetaDB,
storeController storage.StoreController, sch *scheduler.Scheduler, sm sync.StreamManager, log log.Logger,
storeController storage.StoreController, sch *scheduler.Scheduler, log log.Logger,
) (*sync.BaseOnDemand, error) {
// Get extensions config safely
extensionsConfig := config.CopyExtensionsConfig()
@@ -32,6 +32,14 @@ func EnableSyncExtension(config *config.Config, metaDB mTypes.MetaDB,
onDemand := sync.NewOnDemand(log)
syncConfig := extensionsConfig.GetSyncConfig()
var streamManager sync.StreamManager
if extensionsConfig.IsStreamingEnabled() {
log.Info().Msg("streaming sync enabled. Initializing stream manager.")
streamManager = sync.NewChunkingStreamManager(config, log)
onDemand.SetStreamManager(streamManager)
}
for _, registryConfig := range syncConfig.Registries {
if len(registryConfig.URLs) > 1 {
if err := removeSelfURLs(httpAddress, httpPort, &registryConfig, log); err != nil {
@@ -57,7 +65,8 @@ func EnableSyncExtension(config *config.Config, metaDB mTypes.MetaDB,
// Get cluster config safely
clusterConfig := config.CopyClusterConfig()
service, err := sync.New(registryConfig, credsPath, clusterConfig, tmpDir, storeController, sm, metaDB, log)
service, err := sync.New(
registryConfig, credsPath, clusterConfig, tmpDir, storeController, streamManager, metaDB, log)
if err != nil {
log.Error().Err(err).Msg("failed to initialize sync extension")
+1 -1
View File
@@ -13,7 +13,7 @@ import (
// EnableSyncExtension ...
func EnableSyncExtension(config *config.Config, metaDB mTypes.MetaDB,
storeController storage.StoreController, sch *scheduler.Scheduler, sm sync.StreamManager, log log.Logger,
storeController storage.StoreController, sch *scheduler.Scheduler, log log.Logger,
) (*sync.BaseOnDemand, error) {
log.Warn().Msg("skipping enabling sync extension because given zot binary doesn't include this feature," +
"please build a binary that does so")
+28 -9
View File
@@ -24,6 +24,7 @@ type ChunkedBlobReader struct {
InFlightReader *blob.BReader
clientMu sync.Mutex
clientCond *sync.Cond
chunksMu sync.RWMutex
clients map[int]chan int64
numClientsTotal int
@@ -37,13 +38,17 @@ func NewChunkedBlobReader(onDiskPath string, chunkSizeBytes int64, logger log.Lo
return nil, err
}
return &ChunkedBlobReader{
cbr := &ChunkedBlobReader{
clients: make(map[int]chan int64),
logger: logger,
onDiskPath: onDiskPath,
onDiskFile: createdFile,
chunkSizeBytes: chunkSizeBytes,
}, nil
}
cbr.clientCond = sync.NewCond(&cbr.clientMu)
return cbr, nil
}
func (cbr *ChunkedBlobReader) InitReader(r *blob.BReader, numChunksTotal int64) {
@@ -55,12 +60,11 @@ func (cbr *ChunkedBlobReader) InitReader(r *blob.BReader, numChunksTotal int64)
func (cbr *ChunkedBlobReader) Read(buff []byte) (int, error) {
if cbr.InFlightReader == nil {
return 0, errors.New("reader not initialized")
return 0, ErrReaderNotInitialized
}
cbr.chunksMu.Lock()
// TODO: This is duplicating file IO so that the stream logic can access it easily. It would be more efficient to
// Access the file that regclient is writing to avoid this extra duplication.
var internalBuffBytes []byte = make([]byte, 0, cbr.chunkSizeBytes)
internalBuff := bytes.NewBuffer(internalBuffBytes)
@@ -71,7 +75,6 @@ func (cbr *ChunkedBlobReader) Read(buff []byte) (int, error) {
if err != nil {
if !errors.Is(err, io.EOF) {
cbr.logger.Error().Err(err).Msg("failed to copy from in flight reader")
// TODO: This means there was an upstream read error. Should the in-progress streams be terminated?
copy(buff, internalBuff.Bytes())
cbr.chunksMu.Unlock()
@@ -108,7 +111,10 @@ func (cbr *ChunkedBlobReader) Read(buff []byte) (int, error) {
// the client would create a subscription here with a channel where latest chunk info is sent.
func (cbr *ChunkedBlobReader) Subscribe(channel chan int64) int {
cbr.clientMu.Lock()
defer cbr.clientMu.Unlock()
defer func() {
cbr.clientCond.Broadcast()
cbr.clientMu.Unlock()
}()
cbr.clients[cbr.numClientsTotal] = channel
chanId := cbr.numClientsTotal
@@ -127,11 +133,15 @@ func (cbr *ChunkedBlobReader) Subscribe(channel chan int64) int {
return chanId
}
func (cbr *ChunkedBlobReader) Unsubscribe(id int) {
func (cbr *ChunkedBlobReader) Unsubscribe(clientId int) {
cbr.clientMu.Lock()
defer cbr.clientMu.Unlock()
defer func() {
cbr.clientCond.Broadcast()
cbr.clientMu.Unlock()
}()
delete(cbr.clients, id)
delete(cbr.clients, clientId)
cbr.numClientsTotal--
}
func (cbr *ChunkedBlobReader) ToBReader() *blob.BReader {
@@ -141,3 +151,12 @@ func (cbr *ChunkedBlobReader) ToBReader() *blob.BReader {
blob.WithReader(cbr),
)
}
func (cbr *ChunkedBlobReader) WaitForClientEmpty() {
cbr.clientMu.Lock()
defer cbr.clientMu.Unlock()
for len(cbr.clients) > 0 {
cbr.clientCond.Wait()
}
}
+10
View File
@@ -0,0 +1,10 @@
package sync
import "errors"
var (
ErrReaderNotInitialized = errors.New("reader not initialized")
ErrManifestNotFoundOnDemandDisabl = errors.New("manifest not found in ondemand disabled")
ErrBlobNotFoundInActiveStreams = errors.New("blob not found in active streams")
ErrChunkingReaderNotInitialized = errors.New("chunking blob reader not initialized for this blob!")
)
+48 -3
View File
@@ -9,6 +9,7 @@ import (
"time"
"github.com/regclient/regclient/types/manifest"
zerr "zotregistry.dev/zot/v2/errors"
"zotregistry.dev/zot/v2/pkg/common"
"zotregistry.dev/zot/v2/pkg/log"
@@ -31,8 +32,9 @@ process just the first one, also keep track of all background retrying routines.
type BaseOnDemand struct {
services []Service
// map[request]chan err
requestStore *sync.Map
log log.Logger
requestStore *sync.Map
log log.Logger
streamManager StreamManager
}
func NewOnDemand(log log.Logger) *BaseOnDemand {
@@ -43,21 +45,64 @@ func (onDemand *BaseOnDemand) Add(service Service) {
onDemand.services = append(onDemand.services, service)
}
func (onDemand *BaseOnDemand) SetStreamManager(sm StreamManager) {
onDemand.streamManager = sm
}
func (onDemand *BaseOnDemand) StreamManager() StreamManager {
return onDemand.streamManager
}
func (onDemand *BaseOnDemand) FetchManifest(ctx context.Context, repo, reference string) (manifest.Manifest, error) {
// An image might already be streaming in which case, just return the one in cache.
if onDemand.streamManager != nil {
manifest, ok := onDemand.streamManager.StreamingImageManifest(repo, reference)
if ok {
onDemand.log.Debug().Str("repo", repo).Str("reference", reference).
Msg("streaming manifest already present in cache.")
return manifest, nil
}
}
var manifest manifest.Manifest
for _, service := range onDemand.services {
onDemand.log.Info().Str("repo", repo).Str("ref", reference).Msg("attempting to fetch manifest")
fetchedManifest, err := service.FetchManifest(ctx, repo, reference)
if err != nil {
onDemand.log.Error().Err(err).Msg("failed to fetch manifest from service")
continue
}
manifest = fetchedManifest
break
}
if manifest == nil {
return nil, errors.New("not found")
return nil, zerr.ErrBlobNotFound
}
if onDemand.streamManager != nil {
onDemand.log.Debug().Str("repo", repo).Str("reference", reference).
Msg("storing image for streaming.")
err := onDemand.streamManager.StoreImageForStreaming(repo, reference, manifest)
if err != nil {
onDemand.log.Error().Err(err).Str("repo", repo).Str("reference", reference).
Msg("failed to store manifest for streaming")
return nil, err
}
// sync the image in the background
go func() {
if errSync := onDemand.SyncImage(ctx, repo, reference); errSync != nil {
onDemand.log.Err(errSync).Str("repository", repo).Str("reference", reference).
Msg("failed to sync image")
}
}()
}
return manifest, nil
+5 -2
View File
@@ -4,7 +4,6 @@ package sync
import (
"context"
"errors"
"github.com/regclient/regclient/types/manifest"
)
@@ -22,5 +21,9 @@ func (onDemand *BaseOnDemand) SyncReferrers(ctx context.Context, repo string,
}
func (onDemand *BaseOnDemand) FetchManifest(ctx context.Context, repo, reference string) (manifest.Manifest, error) {
return nil, errors.New("manifest not found in ondemand disabled")
return nil, ErrManifestNotFoundOnDemandDisabl
}
func (onDemand *BaseOnDemand) StreamManager() StreamManager {
return nil
}
+11 -54
View File
@@ -4,7 +4,6 @@ package sync
import (
"context"
"encoding/json"
"errors"
"fmt"
"net/http"
@@ -16,7 +15,6 @@ import (
"time"
godigest "github.com/opencontainers/go-digest"
ispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/regclient/regclient"
"github.com/regclient/regclient/config"
"github.com/regclient/regclient/mod"
@@ -333,57 +331,6 @@ func (service *BaseService) FetchManifest(ctx context.Context, repo, reference s
return nil, err
}
// if this is being executed, it is for sure part of streaming.
// install chunked blob readers for each blob into the stream manager's cache
if m != nil {
// first for the manifest blob
err := service.streamManager.PrepareActiveStreamForBlob(m.GetDescriptor().Digest)
if err != nil {
return nil, err
}
var contents ispec.Manifest
contentBytes, err := m.RawBody()
if err != nil {
return nil, err
}
err = json.Unmarshal(contentBytes, &contents)
if err != nil {
return nil, err
}
// imager, ok := orig.(manifest.Imager)
// if !ok {
// return nil, errors.New("failed to convert to imager")
// }
// next, for config
// cfg, err := imager.GetConfig()
// if err != nil {
// return nil, err
// }
err = service.streamManager.PrepareActiveStreamForBlob(contents.Config.Digest)
if err != nil {
return nil, err
}
// finally, for all layers
// layers, err := imager.GetLayers()
// if err != nil {
// return nil, err
// }
layers := contents.Layers
for _, layer := range layers {
err = service.streamManager.PrepareActiveStreamForBlob(layer.Digest)
if err != nil {
return nil, err
}
}
}
return m, nil
}
@@ -574,7 +521,8 @@ func (service *BaseService) syncRef(ctx context.Context, localRepo string, remot
copyOpts := []regclient.ImageOpts{}
if service.streamManager != nil {
service.log.Info().Str("repo", localRepo).Str("reference", remoteImageRef.Tag).Msg("streaming is enabled. Enabling reader hook")
service.log.Info().Str("repo", localRepo).Str("reference", remoteImageRef.Tag).
Msg("streaming is enabled. Enabling reader hook")
copyOpts = append(copyOpts, regclient.ImageWithBlobReaderHook(service.streamManager.StreamingBlobReader))
}
@@ -717,6 +665,15 @@ func (service *BaseService) syncImage(ctx context.Context, localRepo, remoteRepo
// just in case there is an error before commit() which cleans up.
defer service.destination.CleanupImage(localImageRef, localRepo) //nolint: errcheck
// clears the stream cache after the sync is done in both error as well as committed cases.
defer func() {
if service.streamManager != nil {
service.log.Debug().Str("repo", localRepo).Str("reference", tag).Msg("cleaning up stream cache after sync")
// run in a goroutine as the cleanup waits for clients to drain
go service.streamManager.RemoveStreamingImage(localRepo, tag)
}
}()
// first sync image
skipped, err := service.syncRef(ctx, localRepo, remoteImageRef, localImageRef, localDigest)
if err != nil {
+204 -15
View File
@@ -1,14 +1,17 @@
package sync
import (
"errors"
"io"
"os"
"path"
"sync"
godigest "github.com/opencontainers/go-digest"
"github.com/regclient/regclient/types/blob"
"github.com/regclient/regclient/types/descriptor"
manifestpkg "github.com/regclient/regclient/types/manifest"
zerr "zotregistry.dev/zot/v2/errors"
"zotregistry.dev/zot/v2/pkg/api/config"
"zotregistry.dev/zot/v2/pkg/log"
)
@@ -16,12 +19,21 @@ import (
type StreamManager interface {
ConnectClient(blobDigest string, writer io.Writer) (*InFlightBlobCopier, error)
StreamingBlobReader(reader *blob.BReader) (*blob.BReader, error)
PrepareActiveStreamForBlob(blobDigest godigest.Digest) error
StoreImageForStreaming(repo, reference string, manifest manifestpkg.Manifest) error
StreamingImageManifest(repo, reference string) (manifestpkg.Manifest, bool)
RemoveStreamingImage(repo, reference string)
CachedBlobInfo(blobDigest string) (blen int64, mediaType string, err error)
}
type ChunkingStreamManager struct {
tempStore StreamTempStore
activeStreams map[string]*ChunkedBlobReader
tempStore StreamTempStore
// activeStreams maps blob digest to the corresponding chunked blob reader
// that is currently active and receiving data for that blob.
activeStreams map[string]*ChunkedBlobReader
// streamingRefs holds the references to the images that are currently being streamed and their corresponding manifest.
streamingRefs map[string]manifestpkg.Manifest
// blobInfo holds blobs and their corresponding descriptor.
blobInfoMap map[string]descriptor.Descriptor
logger log.Logger
streamLock sync.Mutex
chunkSizeBytes int64
@@ -34,6 +46,8 @@ func NewChunkingStreamManager(config *config.Config, logger log.Logger) *Chunkin
return &ChunkingStreamManager{
tempStore: store,
activeStreams: map[string]*ChunkedBlobReader{},
streamingRefs: map[string]manifestpkg.Manifest{},
blobInfoMap: map[string]descriptor.Descriptor{},
logger: logger,
chunkSizeBytes: *extConf.Sync.StreamChunkSizeBytes,
}
@@ -44,10 +58,9 @@ func (sm *ChunkingStreamManager) ConnectClient(blobDigest string, writer io.Writ
sm.streamLock.Lock()
defer sm.streamLock.Unlock()
// TODO: this can result in a race condition if the ImageCopy with Options hasn't triggered the hook yet
stream, ok := sm.activeStreams[blobDigest]
if !ok {
return nil, errors.New("blob not found in active streams")
return nil, ErrBlobNotFoundInActiveStreams
}
dig, err := godigest.Parse(blobDigest)
@@ -61,6 +74,18 @@ func (sm *ChunkingStreamManager) ConnectClient(blobDigest string, writer io.Writ
return copier, nil
}
func (sm *ChunkingStreamManager) CachedBlobInfo(blobDigest string) (int64, string, error) {
sm.streamLock.Lock()
defer sm.streamLock.Unlock()
desc, ok := sm.blobInfoMap[blobDigest]
if !ok {
return 0, "", zerr.ErrBlobNotFound
}
return desc.Size, desc.MediaType, nil
}
// StreamingBlobReader is executed inside regclient as part of the reader hook.
func (sm *ChunkingStreamManager) StreamingBlobReader(reader *blob.BReader) (*blob.BReader, error) {
sm.streamLock.Lock()
@@ -74,7 +99,7 @@ func (sm *ChunkingStreamManager) StreamingBlobReader(reader *blob.BReader) (*blo
// as the code here only supplies the reader and the chunk count
chunkingReader, ok := sm.activeStreams[digest]
if !ok {
return nil, errors.New("chunking blob reader not initialized for this blob!")
return nil, ErrChunkingReaderNotInitialized
}
chunkingReader.InitReader(reader, chunkCount(size, sm.chunkSizeBytes))
@@ -94,23 +119,187 @@ func chunkCount(blobSize int64, chunkSizeBytes int64) int64 {
return chunkCount
}
func (sm *ChunkingStreamManager) PrepareActiveStreamForBlob(blobDigest godigest.Digest) error {
sm.streamLock.Lock()
defer sm.streamLock.Unlock()
_, ok := sm.activeStreams[blobDigest.String()]
func (sm *ChunkingStreamManager) prepareActiveStreamForBlob(descriptor descriptor.Descriptor) error {
_, ok := sm.activeStreams[descriptor.Digest.String()]
if ok {
sm.logger.Warn().Str("blob", blobDigest.String()).Msg("active stream already exists for blob")
sm.logger.Warn().Str("blob", descriptor.Digest.String()).Msg("active stream already exists for blob")
return nil
}
r, err := NewChunkedBlobReader(sm.tempStore.BlobPath(blobDigest), sm.chunkSizeBytes, sm.logger)
r, err := NewChunkedBlobReader(sm.tempStore.BlobPath(descriptor.Digest), sm.chunkSizeBytes, sm.logger)
if err != nil {
return err
}
sm.activeStreams[blobDigest.String()] = r
sm.activeStreams[descriptor.Digest.String()] = r
sm.blobInfoMap[descriptor.Digest.String()] = descriptor
return nil
}
func (sm *ChunkingStreamManager) StoreImageForStreaming(repo, reference string, manifest manifestpkg.Manifest) error {
sm.streamLock.Lock()
defer sm.streamLock.Unlock()
key := repo + ":" + reference
if _, ok := sm.streamingRefs[key]; ok {
sm.logger.Warn().Str("repo", repo).Str("reference", reference).
Msg("streaming manifest already exists for repo:reference")
return nil
}
// populate the manifest into streamingRefs
sm.streamingRefs[key] = manifest
// pre-load the individual blobs into activeStreams
// first, the manifest
err := sm.prepareActiveStreamForBlob(manifest.GetDescriptor())
if err != nil {
sm.logger.Error().Err(err).Str("blob", manifest.GetDescriptor().Digest.String()).
Msg("failed to prepare active stream for blob")
return err
}
imager, ok := manifest.(manifestpkg.Imager)
if !ok {
sm.logger.Warn().Str("repo", repo).Str("reference", reference).
Msg("failed to cast manifest to imager, skipping pre-loading config and layers for streaming")
return nil
}
// then, the config blob
configDesc, err := imager.GetConfig()
if err != nil {
sm.logger.Error().Err(err).Str("blob", configDesc.Digest.String()).
Msg("failed to get config descriptor from manifest")
return err
}
err = sm.prepareActiveStreamForBlob(configDesc)
if err != nil {
sm.logger.Error().Err(err).Str("blob", configDesc.Digest.String()).Msg("failed to prepare active stream for blob")
return err
}
// finally, the layer blobs
layers, err := imager.GetLayers()
if err != nil {
sm.logger.Error().Err(err).Msg("failed to get layers from manifest")
return err
}
for _, layer := range layers {
err = sm.prepareActiveStreamForBlob(layer)
if err != nil {
sm.logger.Error().Err(err).Str("blob", layer.Digest.String()).Msg("failed to prepare active stream for blob")
return err
}
}
return nil
}
func (sm *ChunkingStreamManager) StreamingImageManifest(repo, reference string) (manifestpkg.Manifest, bool) {
sm.streamLock.Lock()
defer sm.streamLock.Unlock()
key := repo + ":" + reference
manifest, ok := sm.streamingRefs[key]
return manifest, ok
}
func (sm *ChunkingStreamManager) RemoveStreamingImage(repo, reference string) {
sm.streamLock.Lock()
defer sm.streamLock.Unlock()
key := repo + ":" + reference
manifest, ok := sm.streamingRefs[key]
if !ok {
sm.logger.Warn().Str("repo", repo).Str("reference", reference).
Msg("no streaming manifest found for repo:reference")
return
}
sm.logger.Info().Str("repo", repo).Str("reference", reference).Msg("removing streaming image")
imager, ok := manifest.(manifestpkg.Imager)
if !ok {
sm.logger.Warn().Str("repo", repo).Str("reference", reference).
Msg("failed to cast manifest to imager, skipping removal of active streams for config and layers")
return
}
// config blob
configDesc, err := imager.GetConfig()
if err != nil {
sm.logger.Error().Err(err).Str("blob", configDesc.Digest.String()).
Msg("failed to get config descriptor from manifest")
return
}
sm.waitForClientDrainAndDeleteStream(configDesc.Digest.String())
layers, err := imager.GetLayers()
if err != nil {
sm.logger.Error().Err(err).Msg("failed to get layers from manifest")
return
}
for _, layer := range layers {
sm.waitForClientDrainAndDeleteStream(layer.Digest.String())
}
// finally, remove the manifest
sm.waitForClientDrainAndDeleteStream(manifest.GetDescriptor().Digest.String())
// remove the active streams for the manifest and its blobs
delete(sm.streamingRefs, key)
sm.logger.Info().Str("repo", repo).Str("reference", reference).Msg("finished removing streaming image")
}
func (sm *ChunkingStreamManager) waitForClientDrainAndDeleteStream(blobDigest string) {
reader, ok := sm.activeStreams[blobDigest]
if !ok {
sm.logger.Warn().Str("blob", blobDigest).Msg("no active stream found for blob")
return
}
reader.WaitForClientEmpty()
delete(sm.activeStreams, blobDigest)
delete(sm.blobInfoMap, blobDigest)
blobPath := sm.tempStore.BlobPath(godigest.FromString(blobDigest))
_, err := os.Stat(blobPath)
if err != nil {
if os.IsNotExist(err) {
return
}
sm.logger.Error().Err(err).Str("blob", blobDigest).Msg("failed to stat blob in temp store")
return
}
err = os.Remove(sm.tempStore.BlobPath(godigest.FromString(blobDigest)))
if err != nil {
sm.logger.Error().Err(err).Str("blob", blobDigest).Msg("failed to remove blob from temp store")
}
}
+4 -4
View File
@@ -135,7 +135,7 @@ func TestService(t *testing.T) {
URLs: []string{"http://localhost"},
}
service, err := New(conf, "", nil, t.TempDir(), storage.StoreController{}, mocks.MetaDBMock{}, log.NewTestLogger())
service, err := New(conf, "", nil, t.TempDir(), storage.StoreController{}, nil, mocks.MetaDBMock{}, log.NewTestLogger())
So(err, ShouldBeNil)
// Mock remote returns isConverted=true so OCI conversion would be attempted if not skipped
@@ -654,7 +654,7 @@ func TestSyncLegacyCosignTagsSyncReferrers(t *testing.T) {
SyncLegacyCosignTags: &syncLegacyFalse,
}
service, err := New(conf, "", nil, t.TempDir(), storage.StoreController{}, mocks.MetaDBMock{}, log.NewTestLogger())
service, err := New(conf, "", nil, t.TempDir(), storage.StoreController{}, nil, mocks.MetaDBMock{}, log.NewTestLogger())
So(err, ShouldBeNil)
service.rc = regclient.New()
@@ -701,7 +701,7 @@ func TestSyncLegacyCosignTagsSyncReferrers(t *testing.T) {
SyncLegacyCosignTags: &syncLegacyTrue,
}
service, err := New(conf, "", nil, t.TempDir(), storage.StoreController{}, mocks.MetaDBMock{}, log.NewTestLogger())
service, err := New(conf, "", nil, t.TempDir(), storage.StoreController{}, nil, mocks.MetaDBMock{}, log.NewTestLogger())
So(err, ShouldBeNil)
service.rc = regclient.New()
@@ -744,7 +744,7 @@ func TestOnDemandSyncReferrersNonRecursive(t *testing.T) {
SyncLegacyCosignTags: &syncLegacyFalse,
}
service, err := New(conf, "", nil, t.TempDir(), storage.StoreController{}, mocks.MetaDBMock{}, log.NewTestLogger())
service, err := New(conf, "", nil, t.TempDir(), storage.StoreController{}, nil, mocks.MetaDBMock{}, log.NewTestLogger())
So(err, ShouldBeNil)
service.rc = regclient.New()