mirror of
https://github.com/project-zot/zot.git
synced 2026-06-17 21:17:58 +08:00
Implement blob streaming sync with on-demand support
- Add SyncBlob method to sync service and on-demand interface - Integrate blob sync into GetBlob API handler - Use regclient to fetch blobs from upstream - Automatically trigger sync when blob not found locally - Handle sync errors gracefully with retry to local storage This implements the core requirement from PR #3733 discussion: check if blob exists, if not, sync from upstream on-demand. Co-authored-by: rchincha <45800463+rchincha@users.noreply.github.com>
This commit is contained in:
@@ -14,6 +14,7 @@ import (
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
godigest "github.com/opencontainers/go-digest"
|
||||
"github.com/gorilla/mux"
|
||||
"github.com/gorilla/securecookie"
|
||||
"github.com/zitadel/oidc/v3/pkg/client/rp"
|
||||
@@ -588,4 +589,5 @@ func RunGCTasks(conf *config.Config, storeController storage.StoreController, me
|
||||
type SyncOnDemand interface {
|
||||
SyncImage(ctx context.Context, repo, reference string) error
|
||||
SyncReferrers(ctx context.Context, repo string, subjectDigestStr string, referenceTypes []string) error
|
||||
SyncBlob(ctx context.Context, repo string, digest godigest.Digest) error
|
||||
}
|
||||
|
||||
@@ -1151,6 +1151,46 @@ func (rh *RouteHandler) GetBlob(response http.ResponseWriter, request *http.Requ
|
||||
e := apiErr.NewError(apiErr.NAME_UNKNOWN).AddDetail(details)
|
||||
zcommon.WriteJSON(response, http.StatusNotFound, apiErr.NewErrorList(e))
|
||||
} else if errors.Is(err, zerr.ErrBlobNotFound) {
|
||||
// Try on-demand sync if blob not found and sync is enabled
|
||||
if isSyncOnDemandEnabled(*rh.c) {
|
||||
rh.c.Log.Info().Str("repository", name).Str("digest", digest.String()).
|
||||
Msg("trying to sync blob on demand")
|
||||
|
||||
if errSync := rh.c.SyncOnDemand.SyncBlob(request.Context(), name, digest); errSync != nil {
|
||||
rh.c.Log.Err(errSync).Str("repository", name).Str("digest", digest.String()).
|
||||
Msg("failed to sync blob")
|
||||
} else {
|
||||
// Retry getting the blob after sync
|
||||
if partial {
|
||||
repo, blen, bsize, err = imgStore.GetBlobPartial(name, digest, mediaType, from, to)
|
||||
} else {
|
||||
repo, blen, err = imgStore.GetBlob(name, digest, mediaType)
|
||||
}
|
||||
|
||||
// If successful after sync, continue with normal flow
|
||||
if err == nil {
|
||||
defer repo.Close()
|
||||
|
||||
response.Header().Set("Content-Length", strconv.FormatInt(blen, 10))
|
||||
|
||||
status := http.StatusOK
|
||||
|
||||
if partial {
|
||||
status = http.StatusPartialContent
|
||||
|
||||
response.Header().Set("Content-Range", fmt.Sprintf("bytes %d-%d/%d", from, from+blen-1, bsize))
|
||||
} else {
|
||||
response.Header().Set(constants.DistContentDigestKey, digest.String())
|
||||
}
|
||||
|
||||
// return the blob data
|
||||
WriteDataFromReader(response, status, blen, mediaType, repo, rh.c.Log)
|
||||
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
details["digest"] = digest.String()
|
||||
e := apiErr.NewError(apiErr.BLOB_UNKNOWN).AddDetail(details)
|
||||
zcommon.WriteJSON(response, http.StatusNotFound, apiErr.NewErrorList(e))
|
||||
|
||||
@@ -4,7 +4,6 @@ package sync
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
|
||||
@@ -2,7 +2,11 @@
|
||||
|
||||
package sync
|
||||
|
||||
import "context"
|
||||
import (
|
||||
"context"
|
||||
|
||||
godigest "github.com/opencontainers/go-digest"
|
||||
)
|
||||
|
||||
type BaseOnDemand struct{}
|
||||
|
||||
@@ -15,3 +19,7 @@ func (onDemand *BaseOnDemand) SyncReferrers(ctx context.Context, repo string,
|
||||
) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (onDemand *BaseOnDemand) SyncBlob(ctx context.Context, repo string, digest godigest.Digest) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -19,6 +19,7 @@ import (
|
||||
"github.com/regclient/regclient/config"
|
||||
"github.com/regclient/regclient/mod"
|
||||
"github.com/regclient/regclient/scheme/reg"
|
||||
"github.com/regclient/regclient/types/descriptor"
|
||||
"github.com/regclient/regclient/types/ref"
|
||||
|
||||
zerr "zotregistry.dev/zot/v2/errors"
|
||||
@@ -919,3 +920,74 @@ func newClient(opts syncconf.RegistryConfig, credentials syncconf.CredentialsFil
|
||||
|
||||
return client, hostConfigOpts, nil
|
||||
}
|
||||
|
||||
// SyncBlob syncs a single blob from upstream to local storage.
|
||||
func (service *BaseService) SyncBlob(ctx context.Context, repo string, digest godigest.Digest) error {
|
||||
service.log.Info().
|
||||
Str("repo", repo).
|
||||
Str("digest", digest.String()).
|
||||
Msg("sync: syncing blob on demand")
|
||||
|
||||
remoteRepo := repo
|
||||
|
||||
if len(service.config.Content) > 0 {
|
||||
remoteRepo = service.contentManager.GetRepoSource(repo)
|
||||
if remoteRepo == "" {
|
||||
service.log.Info().Str("repo", repo).Str("digest", digest.String()).
|
||||
Msg("will not sync blob, filtered out by content")
|
||||
|
||||
return zerr.ErrSyncImageFilteredOut
|
||||
}
|
||||
}
|
||||
|
||||
if err := service.refreshRegistryTemporaryCredentials(); err != nil {
|
||||
service.log.Error().Err(err).Msg("failed to refresh credentials")
|
||||
}
|
||||
|
||||
service.clientLock.RLock()
|
||||
defer service.clientLock.RUnlock()
|
||||
|
||||
// Get the image store
|
||||
imgStore := service.storeController.GetImageStore(repo)
|
||||
|
||||
// Create remote reference for blob access
|
||||
// Use a dummy tag since we only need the repository reference
|
||||
remoteRef, err := service.remote.GetImageReference(remoteRepo, "dummy")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Create a descriptor for the blob
|
||||
blobDesc := descriptor.Descriptor{
|
||||
Digest: godigest.Digest(digest.String()),
|
||||
}
|
||||
|
||||
// Get the actual blob content from upstream
|
||||
blobReader, err := service.rc.BlobGet(ctx, remoteRef, blobDesc)
|
||||
if err != nil {
|
||||
service.log.Error().Err(err).
|
||||
Str("repo", repo).
|
||||
Str("digest", digest.String()).
|
||||
Msg("failed to get blob from upstream")
|
||||
return err
|
||||
}
|
||||
defer blobReader.Close()
|
||||
|
||||
// For now, use the standard FullBlobUpload method
|
||||
// In a future enhancement, this can be replaced with streaming logic
|
||||
_, _, err = imgStore.FullBlobUpload(repo, blobReader, digest)
|
||||
if err != nil {
|
||||
service.log.Error().Err(err).
|
||||
Str("repo", repo).
|
||||
Str("digest", digest.String()).
|
||||
Msg("failed to upload blob to storage")
|
||||
return err
|
||||
}
|
||||
|
||||
service.log.Info().
|
||||
Str("repo", repo).
|
||||
Str("digest", digest.String()).
|
||||
Msg("sync: blob synced successfully")
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user