feat(repodb): sync-repodb WIP (#1241)

Signed-off-by: Laurentiu Niculae <niculae.laurentiu1@gmail.com>
This commit is contained in:
LaurentiuNiculae
2023-03-09 20:41:48 +02:00
committed by GitHub
parent fd5a2af10b
commit 4c156234cb
30 changed files with 652 additions and 315 deletions
+6 -5
View File
@@ -10,14 +10,15 @@ import (
"zotregistry.io/zot/pkg/api/config"
"zotregistry.io/zot/pkg/extensions/sync"
"zotregistry.io/zot/pkg/log"
"zotregistry.io/zot/pkg/meta/repodb"
"zotregistry.io/zot/pkg/storage"
)
func EnableSyncExtension(ctx context.Context, config *config.Config, wg *goSync.WaitGroup,
storeController storage.StoreController, log log.Logger,
repoDB repodb.RepoDB, storeController storage.StoreController, log log.Logger,
) {
if config.Extensions.Sync != nil && *config.Extensions.Sync.Enable {
if err := sync.Run(ctx, *config.Extensions.Sync, storeController, wg, log); err != nil {
if err := sync.Run(ctx, *config.Extensions.Sync, repoDB, storeController, wg, log); err != nil {
log.Error().Err(err).Msg("Error encountered while setting up syncing")
}
} else {
@@ -25,12 +26,12 @@ func EnableSyncExtension(ctx context.Context, config *config.Config, wg *goSync.
}
}
func SyncOneImage(ctx context.Context, config *config.Config, storeController storage.StoreController,
repoName, reference string, artifactType string, log log.Logger,
func SyncOneImage(ctx context.Context, config *config.Config, repoDB repodb.RepoDB,
storeController storage.StoreController, repoName, reference string, artifactType string, log log.Logger,
) error {
log.Info().Msgf("syncing image %s:%s", repoName, reference)
err := sync.OneImage(ctx, *config.Extensions.Sync, storeController, repoName, reference, artifactType, log)
err := sync.OneImage(ctx, *config.Extensions.Sync, repoDB, storeController, repoName, reference, artifactType, log)
return err
}
+4 -3
View File
@@ -9,12 +9,13 @@ import (
"zotregistry.io/zot/pkg/api/config"
"zotregistry.io/zot/pkg/log"
"zotregistry.io/zot/pkg/meta/repodb"
"zotregistry.io/zot/pkg/storage"
)
// EnableSyncExtension ...
func EnableSyncExtension(ctx context.Context,
config *config.Config, wg *goSync.WaitGroup,
config *config.Config, wg *goSync.WaitGroup, repoDB repodb.RepoDB,
storeController storage.StoreController, log log.Logger,
) {
log.Warn().Msg("skipping enabling sync extension because given zot binary doesn't include this feature," +
@@ -22,8 +23,8 @@ func EnableSyncExtension(ctx context.Context,
}
// SyncOneImage ...
func SyncOneImage(ctx context.Context, config *config.Config, storeController storage.StoreController,
repoName, reference string, artifactType string, log log.Logger,
func SyncOneImage(ctx context.Context, config *config.Config, repoDB repodb.RepoDB,
storeController storage.StoreController, repoName, reference string, artifactType string, log log.Logger,
) error {
log.Warn().Msg("skipping syncing on demand because given zot binary doesn't include this feature," +
"please build a binary that does so")
+2 -2
View File
@@ -4588,9 +4588,9 @@ func TestRepoDBWhenPushingImages(t *testing.T) {
So(err, ShouldNotBeNil)
})
Convey("SetManifestMeta succeeds but SetRepoTag fails", func() {
Convey("SetManifestMeta succeeds but SetRepoReference fails", func() {
ctlr.RepoDB = mocks.RepoDBMock{
SetRepoTagFn: func(repo, tag string, manifestDigest godigest.Digest, mediaType string) error {
SetRepoReferenceFn: func(repo, reference string, manifestDigest godigest.Digest, mediaType string) error {
return ErrTestError
},
}
@@ -57,7 +57,7 @@ func TestConvertErrors(t *testing.T) {
digest11 := godigest.FromString("abc1")
err = repoDB.SetManifestMeta("repo1", digest11, repoMeta11)
So(err, ShouldBeNil)
err = repoDB.SetRepoTag("repo1", "0.1.0", digest11, ispec.MediaTypeImageManifest)
err = repoDB.SetRepoReference("repo1", "0.1.0", digest11, ispec.MediaTypeImageManifest)
So(err, ShouldBeNil)
repoMetas, manifestMetaMap, _, _, err := repoDB.SearchRepos(context.Background(), "", repodb.Filter{},
+10 -10
View File
@@ -755,7 +755,7 @@ func TestCVEStruct(t *testing.T) {
digest11 := godigest.FromBytes(manifestBlob11)
err = repoDB.SetManifestMeta("repo1", digest11, repoMeta11)
So(err, ShouldBeNil)
err = repoDB.SetRepoTag("repo1", "0.1.0", digest11, ispec.MediaTypeImageManifest)
err = repoDB.SetRepoReference("repo1", "0.1.0", digest11, ispec.MediaTypeImageManifest)
So(err, ShouldBeNil)
timeStamp12 := time.Date(2009, 1, 1, 12, 0, 0, 0, time.UTC)
@@ -791,7 +791,7 @@ func TestCVEStruct(t *testing.T) {
digest12 := godigest.FromBytes(manifestBlob12)
err = repoDB.SetManifestMeta("repo1", digest12, repoMeta12)
So(err, ShouldBeNil)
err = repoDB.SetRepoTag("repo1", "1.0.0", digest12, ispec.MediaTypeImageManifest)
err = repoDB.SetRepoReference("repo1", "1.0.0", digest12, ispec.MediaTypeImageManifest)
So(err, ShouldBeNil)
timeStamp13 := time.Date(2010, 1, 1, 12, 0, 0, 0, time.UTC)
@@ -825,7 +825,7 @@ func TestCVEStruct(t *testing.T) {
digest13 := godigest.FromBytes(manifestBlob13)
err = repoDB.SetManifestMeta("repo1", digest13, repoMeta13)
So(err, ShouldBeNil)
err = repoDB.SetRepoTag("repo1", "1.1.0", digest13, ispec.MediaTypeImageManifest)
err = repoDB.SetRepoReference("repo1", "1.1.0", digest13, ispec.MediaTypeImageManifest)
So(err, ShouldBeNil)
timeStamp14 := time.Date(2011, 1, 1, 12, 0, 0, 0, time.UTC)
@@ -859,7 +859,7 @@ func TestCVEStruct(t *testing.T) {
digest14 := godigest.FromBytes(manifestBlob14)
err = repoDB.SetManifestMeta("repo1", digest14, repoMeta14)
So(err, ShouldBeNil)
err = repoDB.SetRepoTag("repo1", "1.0.1", digest14, ispec.MediaTypeImageManifest)
err = repoDB.SetRepoReference("repo1", "1.0.1", digest14, ispec.MediaTypeImageManifest)
So(err, ShouldBeNil)
// Create repodb data for scannable image with no vulnerabilities
@@ -894,7 +894,7 @@ func TestCVEStruct(t *testing.T) {
digest61 := godigest.FromBytes(manifestBlob61)
err = repoDB.SetManifestMeta("repo6", digest61, repoMeta61)
So(err, ShouldBeNil)
err = repoDB.SetRepoTag("repo6", "1.0.0", digest61, ispec.MediaTypeImageManifest)
err = repoDB.SetRepoReference("repo6", "1.0.0", digest61, ispec.MediaTypeImageManifest)
So(err, ShouldBeNil)
// Create repodb data for image not supporting scanning
@@ -929,7 +929,7 @@ func TestCVEStruct(t *testing.T) {
digest21 := godigest.FromBytes(manifestBlob21)
err = repoDB.SetManifestMeta("repo2", digest21, repoMeta21)
So(err, ShouldBeNil)
err = repoDB.SetRepoTag("repo2", "1.0.0", digest21, ispec.MediaTypeImageManifest)
err = repoDB.SetRepoReference("repo2", "1.0.0", digest21, ispec.MediaTypeImageManifest)
So(err, ShouldBeNil)
// Create repodb data for invalid images/negative tests
@@ -943,7 +943,7 @@ func TestCVEStruct(t *testing.T) {
digest31 := godigest.FromBytes(manifestBlob31)
err = repoDB.SetManifestMeta("repo3", digest31, repoMeta31)
So(err, ShouldBeNil)
err = repoDB.SetRepoTag("repo3", "invalid-manifest", digest31, ispec.MediaTypeImageManifest)
err = repoDB.SetRepoReference("repo3", "invalid-manifest", digest31, ispec.MediaTypeImageManifest)
So(err, ShouldBeNil)
configBlob41 := []byte("invalid config blob")
@@ -956,11 +956,11 @@ func TestCVEStruct(t *testing.T) {
digest41 := godigest.FromString("abc7")
err = repoDB.SetManifestMeta("repo4", digest41, repoMeta41)
So(err, ShouldBeNil)
err = repoDB.SetRepoTag("repo4", "invalid-config", digest41, ispec.MediaTypeImageManifest)
err = repoDB.SetRepoReference("repo4", "invalid-config", digest41, ispec.MediaTypeImageManifest)
So(err, ShouldBeNil)
digest51 := godigest.FromString("abc8")
err = repoDB.SetRepoTag("repo5", "nonexitent-manifest", digest51, ispec.MediaTypeImageManifest)
err = repoDB.SetRepoReference("repo5", "nonexitent-manifest", digest51, ispec.MediaTypeImageManifest)
So(err, ShouldBeNil)
// ------ Multiarch image
@@ -997,7 +997,7 @@ func TestCVEStruct(t *testing.T) {
})
So(err, ShouldBeNil)
err = repoDB.SetRepoTag("repoIndex", "tagIndex", indexDigest, ispec.MediaTypeImageIndex)
err = repoDB.SetRepoReference("repoIndex", "tagIndex", indexDigest, ispec.MediaTypeImageIndex)
So(err, ShouldBeNil)
// RepoDB loaded with initial data, mock the scanner
+2 -2
View File
@@ -61,7 +61,7 @@ func TestCVEPagination(t *testing.T) {
digest11 := godigest.FromBytes(manifestBlob11)
err = repoDB.SetManifestMeta("repo1", digest11, repoMeta11)
So(err, ShouldBeNil)
err = repoDB.SetRepoTag("repo1", "0.1.0", digest11, ispec.MediaTypeImageManifest)
err = repoDB.SetRepoReference("repo1", "0.1.0", digest11, ispec.MediaTypeImageManifest)
So(err, ShouldBeNil)
timeStamp12 := time.Date(2009, 1, 1, 12, 0, 0, 0, time.UTC)
@@ -95,7 +95,7 @@ func TestCVEPagination(t *testing.T) {
digest12 := godigest.FromBytes(manifestBlob12)
err = repoDB.SetManifestMeta("repo1", digest12, repoMeta12)
So(err, ShouldBeNil)
err = repoDB.SetRepoTag("repo1", "1.0.0", digest12, ispec.MediaTypeImageManifest)
err = repoDB.SetRepoReference("repo1", "1.0.0", digest12, ispec.MediaTypeImageManifest)
So(err, ShouldBeNil)
// RepoDB loaded with initial data, mock the scanner
@@ -273,7 +273,7 @@ func TestImageScannable(t *testing.T) {
panic(err)
}
err = repoDB.SetRepoTag("repo1", "valid", digestValidManifest, ispec.MediaTypeImageManifest)
err = repoDB.SetRepoReference("repo1", "valid", digestValidManifest, ispec.MediaTypeImageManifest)
if err != nil {
panic(err)
}
@@ -309,7 +309,8 @@ func TestImageScannable(t *testing.T) {
panic(err)
}
err = repoDB.SetRepoTag("repo1", "unscannable-layer", digestManifestUnscannableLayer, ispec.MediaTypeImageManifest)
err = repoDB.SetRepoReference("repo1", "unscannable-layer", digestManifestUnscannableLayer,
ispec.MediaTypeImageManifest)
if err != nil {
panic(err)
}
@@ -328,7 +329,7 @@ func TestImageScannable(t *testing.T) {
panic(err)
}
err = repoDB.SetRepoTag("repo1", "unmarshable", digestUnmarshableManifest, ispec.MediaTypeImageManifest)
err = repoDB.SetRepoReference("repo1", "unmarshable", digestUnmarshableManifest, ispec.MediaTypeImageManifest)
if err != nil {
panic(err)
}
@@ -336,13 +337,13 @@ func TestImageScannable(t *testing.T) {
// Manifest meta cannot be found
digestMissingManifest := godigest.FromBytes([]byte("Some other string"))
err = repoDB.SetRepoTag("repo1", "missing", digestMissingManifest, ispec.MediaTypeImageManifest)
err = repoDB.SetRepoReference("repo1", "missing", digestMissingManifest, ispec.MediaTypeImageManifest)
if err != nil {
panic(err)
}
// RepoMeta contains invalid digest
err = repoDB.SetRepoTag("repo1", "invalid-digest", "invalid", ispec.MediaTypeImageManifest)
err = repoDB.SetRepoReference("repo1", "invalid-digest", "invalid", ispec.MediaTypeImageManifest)
if err != nil {
panic(err)
}
+1 -1
View File
@@ -2020,7 +2020,7 @@ func TestCVEResolvers(t *testing.T) { //nolint:gocyclo
for image, digest := range tagsMap {
repo, tag := common.GetImageDirAndTag(image)
err := repoDB.SetRepoTag(repo, tag, digest, ispec.MediaTypeImageManifest)
err := repoDB.SetRepoReference(repo, tag, digest, ispec.MediaTypeImageManifest)
if err != nil {
panic(err)
}
+8 -6
View File
@@ -16,6 +16,7 @@ import (
"zotregistry.io/zot/pkg/common"
syncconf "zotregistry.io/zot/pkg/extensions/config/sync"
"zotregistry.io/zot/pkg/log"
"zotregistry.io/zot/pkg/meta/repodb"
"zotregistry.io/zot/pkg/storage"
)
@@ -59,8 +60,8 @@ func (di *demandedImages) delete(key string) {
di.syncedMap.Delete(key)
}
func OneImage(ctx context.Context, cfg syncconf.Config, storeController storage.StoreController,
repo, reference string, artifactType string, log log.Logger,
func OneImage(ctx context.Context, cfg syncconf.Config, repoDB repodb.RepoDB,
storeController storage.StoreController, repo, reference string, artifactType string, log log.Logger,
) error {
// guard against multiple parallel requests
demandedImage := fmt.Sprintf("%s:%s", repo, reference)
@@ -82,7 +83,7 @@ func OneImage(ctx context.Context, cfg syncconf.Config, storeController storage.
defer demandedImgs.delete(demandedImage)
defer close(imageChannel)
go syncOneImage(ctx, imageChannel, cfg, storeController, repo, reference, artifactType, log)
go syncOneImage(ctx, imageChannel, cfg, repoDB, storeController, repo, reference, artifactType, log)
err, ok := <-imageChannel
if !ok {
@@ -92,8 +93,8 @@ func OneImage(ctx context.Context, cfg syncconf.Config, storeController storage.
return err
}
func syncOneImage(ctx context.Context, imageChannel chan error,
cfg syncconf.Config, storeController storage.StoreController,
func syncOneImage(ctx context.Context, imageChannel chan error, cfg syncconf.Config,
repoDB repodb.RepoDB, storeController storage.StoreController,
localRepo, reference string, artifactType string, log log.Logger,
) {
var credentialsFile syncconf.CredentialsFile
@@ -180,7 +181,8 @@ func syncOneImage(ctx context.Context, imageChannel chan error,
return
}
sig := newSignaturesCopier(httpClient, credentialsFile[upstreamAddr], *registryURL, storeController, log)
sig := newSignaturesCopier(httpClient, credentialsFile[upstreamAddr], *registryURL, repoDB,
storeController, log)
upstreamCtx := getUpstreamContext(&regCfg, credentialsFile[upstreamAddr])
options := getCopyOptions(upstreamCtx, localCtx)
+66 -11
View File
@@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"errors"
"fmt"
"net/http"
"net/url"
"path"
@@ -20,25 +21,28 @@ import (
"zotregistry.io/zot/pkg/common"
syncconf "zotregistry.io/zot/pkg/extensions/config/sync"
"zotregistry.io/zot/pkg/log"
"zotregistry.io/zot/pkg/meta/repodb"
"zotregistry.io/zot/pkg/storage"
)
type signaturesCopier struct {
client *http.Client
upstreamURL url.URL
storeController storage.StoreController
credentials syncconf.Credentials
repoDB repodb.RepoDB
storeController storage.StoreController
log log.Logger
}
func newSignaturesCopier(httpClient *http.Client, credentials syncconf.Credentials,
upstreamURL url.URL,
upstreamURL url.URL, repoDB repodb.RepoDB,
storeController storage.StoreController, log log.Logger,
) *signaturesCopier {
return &signaturesCopier{
client: httpClient,
credentials: credentials,
upstreamURL: upstreamURL,
repoDB: repoDB,
storeController: storeController,
log: log,
}
@@ -185,7 +189,18 @@ func (sig *signaturesCopier) syncCosignSignature(localRepo, remoteRepo, digestSt
return err
}
sig.log.Info().Msgf("successfully synced cosign signature for repo %s digest %s", localRepo, digestStr)
if sig.repoDB != nil {
sig.log.Debug().Msgf("trying to sync cosign signature for repo %s digest %s", localRepo, digestStr)
err = repodb.SetMetadataFromInput(localRepo, cosignTag, ispec.MediaTypeImageManifest,
godigest.FromBytes(cosignManifestBuf), cosignManifestBuf, sig.storeController.GetImageStore(localRepo),
sig.repoDB, sig.log)
if err != nil {
return fmt.Errorf("failed to set metadata for cosign signature '%s@%s': %w", localRepo, digestStr, err)
}
sig.log.Info().Msgf("successfully added cosign signature to RepoDB for repo %s digest %s", localRepo, digestStr)
}
return nil
}
@@ -249,6 +264,19 @@ func (sig *signaturesCopier) syncORASRefs(localRepo, remoteRepo, digestStr strin
return err
}
// this is for notation signatures
if sig.repoDB != nil {
sig.log.Debug().Msgf("trying to sync oras artifact for repo %s digest %s", localRepo, digestStr)
err = repodb.SetMetadataFromInput(localRepo, ref.Digest.String(), ref.MediaType,
ref.Digest, body, sig.storeController.GetImageStore(localRepo), sig.repoDB, sig.log)
if err != nil {
return fmt.Errorf("failed to set metadata for oras artifact '%s@%s': %w", localRepo, digestStr, err)
}
sig.log.Info().Msgf("successfully added oras artifacts to RepoDB for repo %s digest %s", localRepo, digestStr)
}
}
sig.log.Info().Msgf("successfully synced ORAS artifacts for repo %s digest %s", localRepo, digestStr)
@@ -283,7 +311,7 @@ func (sig *signaturesCopier) syncOCIRefs(localRepo, remoteRepo, digestStr string
var artifactManifest oras.Manifest
body, statusCode, err := common.MakeHTTPGetRequest(sig.client, sig.credentials.Username,
OCIRefBody, statusCode, err := common.MakeHTTPGetRequest(sig.client, sig.credentials.Username,
sig.credentials.Password, &artifactManifest,
getRefManifestURL.String(), ref.MediaType, sig.log)
if err != nil {
@@ -304,7 +332,7 @@ func (sig *signaturesCopier) syncOCIRefs(localRepo, remoteRepo, digestStr string
// read manifest
var manifest ispec.Manifest
err = json.Unmarshal(body, &manifest)
err = json.Unmarshal(OCIRefBody, &manifest)
if err != nil {
sig.log.Error().Str("errorType", common.TypeOf(err)).
Err(err).Msgf("couldn't unmarshal oci reference manifest: %s", getRefManifestURL.String())
@@ -326,7 +354,7 @@ func (sig *signaturesCopier) syncOCIRefs(localRepo, remoteRepo, digestStr string
// read manifest
var manifest ispec.Artifact
err = json.Unmarshal(body, &manifest)
err = json.Unmarshal(OCIRefBody, &manifest)
if err != nil {
sig.log.Error().Str("errorType", common.TypeOf(err)).
Err(err).Msgf("couldn't unmarshal oci reference manifest: %s", getRefManifestURL.String())
@@ -341,17 +369,30 @@ func (sig *signaturesCopier) syncOCIRefs(localRepo, remoteRepo, digestStr string
}
}
_, err = imageStore.PutImageManifest(localRepo, ref.Digest.String(),
ref.MediaType, body)
digest, err := imageStore.PutImageManifest(localRepo, ref.Digest.String(),
ref.MediaType, OCIRefBody)
if err != nil {
sig.log.Error().Str("errorType", common.TypeOf(err)).
Err(err).Msg("couldn't upload oci reference manifest")
return err
}
if sig.repoDB != nil {
sig.log.Debug().Msgf("trying to add OCI refs for repo %s digest %s", localRepo, digestStr)
err = repodb.SetMetadataFromInput(localRepo, digestStr, ref.MediaType,
digest, OCIRefBody, sig.storeController.GetImageStore(localRepo),
sig.repoDB, sig.log)
if err != nil {
return fmt.Errorf("failed to set metadata for OCI ref in '%s@%s': %w", localRepo, digestStr, err)
}
sig.log.Info().Msgf("successfully added OCI refs to RepoDB for repo %s digest %s", localRepo, digestStr)
}
}
sig.log.Info().Msgf("successfully synced oci references for repo %s digest %s", localRepo, digestStr)
sig.log.Info().Msgf("successfully synced OCI refs for repo %s digest %s", localRepo, digestStr)
return nil
}
@@ -397,7 +438,7 @@ func (sig *signaturesCopier) syncOCIArtifact(localRepo, remoteRepo, reference st
}
// push manifest
_, err = imageStore.PutImageManifest(localRepo, reference,
digest, err := imageStore.PutImageManifest(localRepo, reference,
ispec.MediaTypeArtifactManifest, artifactManifestBuf)
if err != nil {
sig.log.Error().Str("errorType", common.TypeOf(err)).
@@ -406,6 +447,20 @@ func (sig *signaturesCopier) syncOCIArtifact(localRepo, remoteRepo, reference st
return err
}
if sig.repoDB != nil {
sig.log.Debug().Msgf("trying to OCI refs for repo %s digest %s", localRepo, digest.String())
err = repodb.SetMetadataFromInput(localRepo, reference, ispec.MediaTypeArtifactManifest,
digest, artifactManifestBuf, sig.storeController.GetImageStore(localRepo),
sig.repoDB, sig.log)
if err != nil {
return fmt.Errorf("failed to set metadata for OCI Artifact '%s@%s': %w", localRepo, digest.String(), err)
}
sig.log.Info().Msgf("successfully added oci artifacts to RepoDB for repo %s digest %s", localRepo,
digest.String())
}
sig.log.Info().Msgf("successfully synced OCI artifact for repo %s tag %s", localRepo, reference)
return nil
@@ -450,7 +505,7 @@ func (sig *signaturesCopier) canSkipOCIArtifact(localRepo, reference string, art
localArtifactBuf, _, _, err := imageStore.GetImageManifest(localRepo, reference)
if err != nil {
if errors.Is(err, zerr.ErrManifestNotFound) {
if errors.Is(err, zerr.ErrManifestNotFound) || errors.Is(err, zerr.ErrRepoNotFound) {
return false, nil
}
+6 -6
View File
@@ -23,6 +23,7 @@ import (
"zotregistry.io/zot/pkg/common"
syncconf "zotregistry.io/zot/pkg/extensions/config/sync"
"zotregistry.io/zot/pkg/log"
"zotregistry.io/zot/pkg/meta/repodb"
"zotregistry.io/zot/pkg/storage"
"zotregistry.io/zot/pkg/test"
)
@@ -165,7 +166,7 @@ func getUpstreamContext(regCfg *syncconf.RegistryConfig, credentials syncconf.Cr
//nolint:gocyclo // offloading some of the functionalities from here would make the code harder to follow
func syncRegistry(ctx context.Context, regCfg syncconf.RegistryConfig,
upstreamURL string,
upstreamURL string, repoDB repodb.RepoDB,
storeController storage.StoreController, localCtx *types.SystemContext,
policyCtx *signature.PolicyContext, credentials syncconf.Credentials,
retryOptions *retry.RetryOptions, log log.Logger,
@@ -246,7 +247,7 @@ func syncRegistry(ctx context.Context, regCfg syncconf.RegistryConfig,
}
}
sig := newSignaturesCopier(httpClient, credentials, *registryURL, storeController, log)
sig := newSignaturesCopier(httpClient, credentials, *registryURL, repoDB, storeController, log)
for _, repoReference := range reposReferences {
upstreamRepo := repoReference.name
@@ -323,9 +324,8 @@ func getLocalContexts(log log.Logger) (*types.SystemContext, *signature.PolicyCo
return localCtx, policyContext, nil
}
func Run(ctx context.Context, cfg syncconf.Config,
storeController storage.StoreController,
wtgrp *goSync.WaitGroup, logger log.Logger,
func Run(ctx context.Context, cfg syncconf.Config, repoDB repodb.RepoDB,
storeController storage.StoreController, wtgrp *goSync.WaitGroup, logger log.Logger,
) error {
var credentialsFile syncconf.CredentialsFile
@@ -382,7 +382,7 @@ func Run(ctx context.Context, cfg syncconf.Config,
for _, upstreamURL := range regCfg.URLs {
upstreamAddr := StripRegistryTransport(upstreamURL)
// first try syncing main registry
if err := syncRegistry(ctx, regCfg, upstreamURL, storeController, localCtx, policyCtx,
if err := syncRegistry(ctx, regCfg, upstreamURL, repoDB, storeController, localCtx, policyCtx,
credentialsFile[upstreamAddr], retryOptions, logger); err != nil {
logger.Error().Str("errortype", common.TypeOf(err)).
Err(err).Str("registry", upstreamURL).
+51 -15
View File
@@ -40,6 +40,8 @@ const (
host = "127.0.0.1:45117"
)
var ErrTestError = fmt.Errorf("testError")
func TestInjectSyncUtils(t *testing.T) {
Convey("Inject errors in utils functions", t, func() {
repositoryReference := fmt.Sprintf("%s/%s", host, testImage)
@@ -154,7 +156,7 @@ func TestSyncInternal(t *testing.T) {
}
ctx := context.Background()
So(Run(ctx, cfg, storage.StoreController{},
So(Run(ctx, cfg, mocks.RepoDBMock{}, storage.StoreController{},
new(goSync.WaitGroup), log.NewLogger("debug", "")), ShouldNotBeNil)
_, err = getFileCredentials("/invalid/path/to/file")
@@ -190,7 +192,7 @@ func TestSyncInternal(t *testing.T) {
localCtx, policyCtx, err := getLocalContexts(log)
So(err, ShouldBeNil)
err = syncRegistry(ctx, syncRegistryConfig, "randomUpstreamURL",
err = syncRegistry(ctx, syncRegistryConfig, "randomUpstreamURL", mocks.RepoDBMock{},
storage.StoreController{DefaultStore: imageStore}, localCtx, policyCtx,
syncconf.Credentials{}, nil, log)
So(err, ShouldNotBeNil)
@@ -378,9 +380,10 @@ func TestSyncInternal(t *testing.T) {
imageStore := local.NewImageStore(t.TempDir(), false, storage.DefaultGCDelay,
false, false, log, metrics, nil, nil,
)
mockRepoDB := mocks.RepoDBMock{}
sig := newSignaturesCopier(client, syncconf.Credentials{},
*regURL, storage.StoreController{DefaultStore: imageStore}, log)
*regURL, mockRepoDB, storage.StoreController{DefaultStore: imageStore}, log)
err = sig.syncCosignSignature(testImage, testImage, testImageTag, &ispec.Manifest{})
So(err, ShouldNotBeNil)
@@ -430,8 +433,9 @@ func TestSyncInternal(t *testing.T) {
So(regURL, ShouldNotBeNil)
client := &http.Client{}
mockRepoDB := mocks.RepoDBMock{}
sig := newSignaturesCopier(client, syncconf.Credentials{},
*regURL, storage.StoreController{DefaultStore: imageStore}, log)
*regURL, mockRepoDB, storage.StoreController{DefaultStore: imageStore}, log)
canBeSkipped, err = sig.canSkipOCIRefs(testImage, testImageManifestDigest.String(), refs)
So(err, ShouldBeNil)
@@ -596,7 +600,7 @@ func TestSyncInternal(t *testing.T) {
testRootDir := path.Join(imageStore.RootDir(), testImage, SyncBlobUploadDir)
// testImagePath := path.Join(testRootDir, testImage)
err := pushSyncedLocalImage(testImage, testImageTag, testRootDir, imageStore, log)
err := pushSyncedLocalImage(testImage, testImageTag, testRootDir, nil, imageStore, log)
So(err, ShouldNotBeNil)
err = os.MkdirAll(testRootDir, 0o755)
@@ -663,7 +667,7 @@ func TestSyncInternal(t *testing.T) {
_, err = testImageStore.PutImageManifest(repo, "latest", ispec.MediaTypeImageIndex, content)
So(err, ShouldBeNil)
err = pushSyncedLocalImage(repo, "latest", testRootDir, imageStore, log)
err = pushSyncedLocalImage(repo, "latest", testRootDir, nil, imageStore, log)
So(err, ShouldBeNil)
// trigger error on manifest pull
@@ -671,7 +675,7 @@ func TestSyncInternal(t *testing.T) {
index.Manifests[0].Digest.Algorithm().String(), index.Manifests[0].Digest.Encoded()), 0o000)
So(err, ShouldBeNil)
err = pushSyncedLocalImage(repo, "latest", testRootDir, imageStore, log)
err = pushSyncedLocalImage(repo, "latest", testRootDir, nil, imageStore, log)
So(err, ShouldNotBeNil)
err = os.Chmod(path.Join(testRootDir, repo, "blobs",
@@ -687,7 +691,7 @@ func TestSyncInternal(t *testing.T) {
}, nil,
)
err = pushSyncedLocalImage(repo, "latest", testRootDir, imageStoreWithLinter, log)
err = pushSyncedLocalImage(repo, "latest", testRootDir, nil, imageStoreWithLinter, log)
// linter error will be ignored by sync
So(err, ShouldBeNil)
@@ -711,7 +715,7 @@ func TestSyncInternal(t *testing.T) {
manifest.Config.Digest.Algorithm().String(), manifest.Config.Digest.Encoded()))
So(err, ShouldBeNil)
err = pushSyncedLocalImage(repo, "latest", testRootDir, imageStore, log)
err = pushSyncedLocalImage(repo, "latest", testRootDir, nil, imageStore, log)
So(err, ShouldNotBeNil)
err = os.Chmod(configBlobPath, local.DefaultDirPerms)
@@ -729,13 +733,45 @@ func TestSyncInternal(t *testing.T) {
err = os.MkdirAll(indexManifestPath, 0o000)
So(err, ShouldBeNil)
err = pushSyncedLocalImage(repo, "latest", testRootDir, imageStore, log)
err = pushSyncedLocalImage(repo, "latest", testRootDir, nil, imageStore, log)
So(err, ShouldNotBeNil)
err = os.Remove(indexManifestPath)
So(err, ShouldBeNil)
})
Convey("RepoDB Errors", func() {
multiArch, err := test.GetRandomMultiarchImage("bad-repodb-tag")
So(err, ShouldBeNil)
err = test.WriteMultiArchImageToFileSystem(multiArch, "repo", storage.StoreController{
DefaultStore: testImageStore,
})
So(err, ShouldBeNil)
// copyManifest Errors
err = pushSyncedLocalImage("repo", "bad-repodb-tag", testRootDir,
mocks.RepoDBMock{
SetRepoReferenceFn: func(repo, Reference string, manifestDigest godigest.Digest, mediaType string) error {
return ErrTestError
},
}, imageStore, log)
So(err, ShouldNotBeNil)
// SetMetadataFromInput
err = pushSyncedLocalImage("repo", "bad-repodb-tag", testRootDir,
mocks.RepoDBMock{
SetRepoReferenceFn: func(repo, Reference string, manifestDigest godigest.Digest, mediaType string) error {
if Reference == "bad-repodb-tag" {
return ErrTestError
}
return nil
},
}, imageStore, log)
So(err, ShouldNotBeNil)
})
Convey("manifest image errors", func() {
var manifest ispec.Manifest
@@ -749,7 +785,7 @@ func TestSyncInternal(t *testing.T) {
if os.Geteuid() != 0 {
So(func() {
_ = pushSyncedLocalImage(testImage, testImageTag, testRootDir, imageStore, log)
_ = pushSyncedLocalImage(testImage, testImageTag, testRootDir, nil, imageStore, log)
}, ShouldPanic)
}
@@ -762,7 +798,7 @@ func TestSyncInternal(t *testing.T) {
panic(err)
}
err = pushSyncedLocalImage(testImage, testImageTag, testRootDir, imageStore, log)
err = pushSyncedLocalImage(testImage, testImageTag, testRootDir, nil, imageStore, log)
So(err, ShouldNotBeNil)
if err := os.Chmod(path.Join(testRootDir, testImage, "blobs", "sha256",
@@ -776,7 +812,7 @@ func TestSyncInternal(t *testing.T) {
panic(err)
}
err = pushSyncedLocalImage(testImage, testImageTag, testRootDir, imageStore, log)
err = pushSyncedLocalImage(testImage, testImageTag, testRootDir, nil, imageStore, log)
So(err, ShouldNotBeNil)
if err := os.Chmod(cachedManifestConfigPath, 0o755); err != nil {
@@ -803,7 +839,7 @@ func TestSyncInternal(t *testing.T) {
panic(err)
}
err = pushSyncedLocalImage(testImage, testImageTag, testRootDir, imageStore, log)
err = pushSyncedLocalImage(testImage, testImageTag, testRootDir, nil, imageStore, log)
So(err, ShouldNotBeNil)
manifest.Config.Digest = configDigestBackup
@@ -828,7 +864,7 @@ func TestSyncInternal(t *testing.T) {
panic(err)
}
err = pushSyncedLocalImage(testImage, testImageTag, testRootDir, imageStore, log)
err = pushSyncedLocalImage(testImage, testImageTag, testRootDir, nil, imageStore, log)
So(err, ShouldNotBeNil)
})
})
+215 -6
View File
@@ -46,6 +46,7 @@ import (
"zotregistry.io/zot/pkg/storage"
"zotregistry.io/zot/pkg/storage/local"
"zotregistry.io/zot/pkg/test"
"zotregistry.io/zot/pkg/test/mocks"
)
const (
@@ -137,6 +138,12 @@ func startUpstreamServer(
srcConfig.Storage.RootDirectory = srcDir
defVal := true
srcConfig.Extensions = &extconf.ExtensionConfig{}
srcConfig.Extensions.Search = &extconf.SearchConfig{
BaseConfig: extconf.BaseConfig{Enable: &defVal},
}
sctlr := api.NewController(srcConfig)
scm := test.NewControllerManager(sctlr)
@@ -193,7 +200,10 @@ func startDownstreamServer(
destConfig.Storage.GC = false
destConfig.Extensions = &extconf.ExtensionConfig{}
destConfig.Extensions.Search = nil
defVal := true
destConfig.Extensions.Search = &extconf.SearchConfig{
BaseConfig: extconf.BaseConfig{Enable: &defVal},
}
destConfig.Extensions.Sync = syncConfig
dctlr := api.NewController(destConfig)
@@ -627,6 +637,186 @@ func TestOnDemand(t *testing.T) {
So(err, ShouldBeNil)
So(resp.StatusCode(), ShouldEqual, 500)
})
Convey("Sync on Demand errors", t, func() {
Convey("Signature copier errors", func() {
// start upstream server
rootDir := t.TempDir()
port := test.GetFreePort()
srcBaseURL := test.GetBaseURL(port)
conf := config.New()
conf.HTTP.Port = port
conf.Storage.GC = false
ctlr := api.NewController(conf)
ctlr.Config.Storage.RootDirectory = rootDir
cm := test.NewControllerManager(ctlr)
cm.StartAndWait(conf.HTTP.Port)
defer cm.StopServer()
imageConfig, layers, manifest, err := test.GetRandomImageComponents(10)
So(err, ShouldBeNil)
manifestBlob, err := json.Marshal(manifest)
So(err, ShouldBeNil)
manifestDigest := godigest.FromBytes(manifestBlob)
err = test.UploadImage(
test.Image{Config: imageConfig, Layers: layers, Manifest: manifest, Reference: "test"},
srcBaseURL,
"remote-repo",
)
So(err, ShouldBeNil)
// sign using cosign
err = test.SignImageUsingCosign(fmt.Sprintf("remote-repo@%s", manifestDigest.String()), port)
So(err, ShouldBeNil)
// add OCI Ref
OCIRefManifest := ispec.Artifact{
Subject: &ispec.Descriptor{
MediaType: ispec.MediaTypeImageManifest,
Digest: manifestDigest,
},
Blobs: []ispec.Descriptor{},
MediaType: ispec.MediaTypeArtifactManifest,
}
OCIRefManifestBlob, err := json.Marshal(OCIRefManifest)
So(err, ShouldBeNil)
resp, err := resty.R().
SetHeader("Content-type", ispec.MediaTypeArtifactManifest).
SetBody(OCIRefManifestBlob).
Put(srcBaseURL + "/v2/remote-repo/manifests/oci.ref")
So(err, ShouldBeNil)
So(resp.StatusCode(), ShouldEqual, http.StatusCreated)
// add ORAS Ref
ORASRefManifest := artifactspec.Manifest{
Subject: &artifactspec.Descriptor{
MediaType: ispec.MediaTypeImageManifest,
Digest: manifestDigest,
},
Blobs: []artifactspec.Descriptor{},
MediaType: artifactspec.MediaTypeArtifactManifest,
}
ORASRefManifestBlob, err := json.Marshal(ORASRefManifest)
So(err, ShouldBeNil)
resp, err = resty.R().
SetHeader("Content-type", artifactspec.MediaTypeArtifactManifest).
SetBody(ORASRefManifestBlob).
Put(srcBaseURL + "/v2/remote-repo/manifests/oras.ref")
So(err, ShouldBeNil)
So(resp.StatusCode(), ShouldEqual, http.StatusCreated)
//------- Start downstream server
var tlsVerify bool
regex := ".*"
semver := true
syncRegistryConfig := syncconf.RegistryConfig{
Content: []syncconf.Content{
{
Prefix: "remote-repo",
Tags: &syncconf.Tags{
Regex: &regex,
Semver: &semver,
},
},
},
URLs: []string{srcBaseURL},
TLSVerify: &tlsVerify,
CertDir: "",
OnDemand: true,
}
defaultVal := true
syncConfig := &syncconf.Config{
Enable: &defaultVal,
Registries: []syncconf.RegistryConfig{syncRegistryConfig},
}
destPort := test.GetFreePort()
destConfig := config.New()
destBaseURL := test.GetBaseURL(destPort)
destConfig.HTTP.Port = destPort
destDir := t.TempDir()
destConfig.Storage.RootDirectory = destDir
destConfig.Storage.Dedupe = false
destConfig.Storage.GC = false
destConfig.Extensions = &extconf.ExtensionConfig{}
defVal := true
destConfig.Extensions.Search = &extconf.SearchConfig{
BaseConfig: extconf.BaseConfig{Enable: &defVal},
}
destConfig.Extensions.Sync = syncConfig
dctlr := api.NewController(destConfig)
dcm := test.NewControllerManager(dctlr)
dcm.StartAndWait(destPort)
// repodb fails for syncOCIRefs
dctlr.RepoDB = mocks.RepoDBMock{
SetRepoReferenceFn: func(repo, Reference string, manifestDigest godigest.Digest, mediaType string) error {
if mediaType == ispec.MediaTypeArtifactManifest {
return sync.ErrTestError
}
return nil
},
}
resp, err = resty.R().Get(destBaseURL + "/v2/remote-repo/manifests/test")
So(err, ShouldBeNil)
So(resp.StatusCode(), ShouldEqual, 200)
// repodb fails for syncCosignSignature"
dctlr.RepoDB = mocks.RepoDBMock{
SetRepoReferenceFn: func(repo, reference string, manifestDigest godigest.Digest, mediaType string) error {
if strings.HasPrefix(reference, "sha256") || strings.HasSuffix(reference, ".sig") {
return sync.ErrTestError
}
return nil
},
}
resp, err = resty.R().Get(destBaseURL + "/v2/remote-repo/manifests/test")
So(err, ShouldBeNil)
So(resp.StatusCode(), ShouldEqual, 200)
// repodb fails for getORASRefs
dctlr.RepoDB = mocks.RepoDBMock{
SetRepoReferenceFn: func(repo, Reference string, manifestDigest godigest.Digest, mediaType string) error {
if mediaType == artifactspec.MediaTypeArtifactManifest {
return sync.ErrTestError
}
return nil
},
}
resp, err = resty.R().Get(destBaseURL + "/v2/remote-repo/manifests/test")
So(err, ShouldBeNil)
So(resp.StatusCode(), ShouldEqual, 200)
})
})
}
func TestPeriodically(t *testing.T) {
@@ -4510,11 +4700,24 @@ func TestSyncWithDestination(t *testing.T) {
},
}
sctlr, srcBaseURL, _, _, _ := startUpstreamServer(t, false, false)
srcPort := test.GetFreePort()
srcConfig := config.New()
srcBaseURL := test.GetBaseURL(srcPort)
defer func() {
sctlr.Shutdown()
}()
srcConfig.HTTP.Port = srcPort
srcDir := t.TempDir()
srcConfig.Storage.RootDirectory = srcDir
defVal := true
srcConfig.Extensions = &extconf.ExtensionConfig{}
srcConfig.Extensions.Search = &extconf.SearchConfig{
BaseConfig: extconf.BaseConfig{Enable: &defVal},
}
sctlr := api.NewController(srcConfig)
test.CopyTestFiles("../../../test/data", srcDir)
err := os.MkdirAll(path.Join(sctlr.Config.Storage.RootDirectory, "/zot-fold"), local.DefaultDirPerms)
So(err, ShouldBeNil)
@@ -4524,9 +4727,15 @@ func TestSyncWithDestination(t *testing.T) {
path.Join(sctlr.Config.Storage.RootDirectory, "zot-test"),
path.Join(sctlr.Config.Storage.RootDirectory, "/zot-fold/zot-test"),
)
So(err, ShouldBeNil)
scm := test.NewControllerManager(sctlr)
scm.StartAndWait(srcPort)
defer func() {
sctlr.Shutdown()
}()
Convey("Test peridiocally sync", func() {
for _, testCase := range testCases {
updateDuration, _ := time.ParseDuration("30m")
+33 -9
View File
@@ -31,6 +31,7 @@ import (
syncconf "zotregistry.io/zot/pkg/extensions/config/sync"
"zotregistry.io/zot/pkg/extensions/monitoring"
"zotregistry.io/zot/pkg/log"
"zotregistry.io/zot/pkg/meta/repodb"
"zotregistry.io/zot/pkg/storage"
"zotregistry.io/zot/pkg/storage/local"
"zotregistry.io/zot/pkg/test"
@@ -264,7 +265,7 @@ func getFileCredentials(filepath string) (syncconf.CredentialsFile, error) {
}
func pushSyncedLocalImage(localRepo, reference, localCachePath string,
imageStore storage.ImageStore, log log.Logger,
repoDB repodb.RepoDB, imageStore storage.ImageStore, log log.Logger,
) error {
log.Info().Msgf("pushing synced local image %s/%s:%s to local registry", localCachePath, localRepo, reference)
@@ -275,7 +276,7 @@ func pushSyncedLocalImage(localRepo, reference, localCachePath string,
cacheImageStore := local.NewImageStore(localCachePath, false,
storage.DefaultGCDelay, false, false, log, metrics, nil, nil)
manifestContent, _, mediaType, err := cacheImageStore.GetImageManifest(localRepo, reference)
manifestBlob, manifestDigest, mediaType, err := cacheImageStore.GetImageManifest(localRepo, reference)
if err != nil {
log.Error().Str("errorType", common.TypeOf(err)).
Err(err).Str("dir", path.Join(cacheImageStore.RootDir(), localRepo)).
@@ -287,7 +288,7 @@ func pushSyncedLocalImage(localRepo, reference, localCachePath string,
// is image manifest
switch mediaType {
case ispec.MediaTypeImageManifest:
if err := copyManifest(localRepo, manifestContent, reference, cacheImageStore, imageStore, log); err != nil {
if err := copyManifest(localRepo, manifestBlob, reference, repoDB, cacheImageStore, imageStore, log); err != nil {
if errors.Is(err, zerr.ErrImageLintAnnotations) {
log.Error().Str("errorType", common.TypeOf(err)).
Err(err).Msg("couldn't upload manifest because of missing annotations")
@@ -301,7 +302,7 @@ func pushSyncedLocalImage(localRepo, reference, localCachePath string,
// is image index
var indexManifest ispec.Index
if err := json.Unmarshal(manifestContent, &indexManifest); err != nil {
if err := json.Unmarshal(manifestBlob, &indexManifest); err != nil {
log.Error().Str("errorType", common.TypeOf(err)).
Err(err).Str("dir", path.Join(cacheImageStore.RootDir(), localRepo)).
Msg("invalid JSON")
@@ -322,7 +323,7 @@ func pushSyncedLocalImage(localRepo, reference, localCachePath string,
return err
}
if err := copyManifest(localRepo, manifestBuf, manifest.Digest.String(),
if err := copyManifest(localRepo, manifestBuf, manifest.Digest.String(), repoDB,
cacheImageStore, imageStore, log); err != nil {
if errors.Is(err, zerr.ErrImageLintAnnotations) {
log.Error().Str("errorType", common.TypeOf(err)).
@@ -335,19 +336,29 @@ func pushSyncedLocalImage(localRepo, reference, localCachePath string,
}
}
_, err = imageStore.PutImageManifest(localRepo, reference, mediaType, manifestContent)
_, err = imageStore.PutImageManifest(localRepo, reference, mediaType, manifestBlob)
if err != nil {
log.Error().Str("errorType", common.TypeOf(err)).
Err(err).Msg("couldn't upload manifest")
return err
}
if repoDB != nil {
err = repodb.SetMetadataFromInput(localRepo, reference, mediaType,
manifestDigest, manifestBlob, imageStore, repoDB, log)
if err != nil {
return fmt.Errorf("failed to set metadata for image '%s %s': %w", localRepo, reference, err)
}
log.Debug().Msgf("successfully set metadata for %s:%s", localRepo, reference)
}
}
return nil
}
func copyManifest(localRepo string, manifestContent []byte, reference string,
func copyManifest(localRepo string, manifestContent []byte, reference string, repoDB repodb.RepoDB,
cacheImageStore, imageStore storage.ImageStore, log log.Logger,
) error {
var manifest ispec.Manifest
@@ -376,7 +387,7 @@ func copyManifest(localRepo string, manifestContent []byte, reference string,
return err
}
_, err = imageStore.PutImageManifest(localRepo, reference,
digest, err := imageStore.PutImageManifest(localRepo, reference,
ispec.MediaTypeImageManifest, manifestContent)
if err != nil {
log.Error().Str("errorType", common.TypeOf(err)).
@@ -385,6 +396,19 @@ func copyManifest(localRepo string, manifestContent []byte, reference string,
return err
}
if repoDB != nil {
err = repodb.SetMetadataFromInput(localRepo, reference, ispec.MediaTypeImageManifest,
digest, manifestContent, imageStore, repoDB, log)
if err != nil {
log.Error().Str("errorType", common.TypeOf(err)).
Err(err).Msg("couldn't set metadata from input")
return err
}
log.Debug().Msgf("successfully set metadata for %s:%s", localRepo, reference)
}
return nil
}
@@ -720,7 +744,7 @@ func syncImageWithRefs(ctx context.Context, localRepo, upstreamRepo, reference s
}
// push from cache to repo
err = pushSyncedLocalImage(localRepo, reference, localCachePath, imageStore, log)
err = pushSyncedLocalImage(localRepo, reference, localCachePath, sig.repoDB, imageStore, log)
if err != nil {
log.Error().Str("errorType", common.TypeOf(err)).
Err(err).Msgf("error while pushing synced cached image %s",