fix: removed resty calls from sync (#1016)

Signed-off-by: Ana-Roberta Lisca <ana.kagome@yahoo.com>
This commit is contained in:
Lisca Ana-Roberta
2022-12-22 20:19:42 +02:00
committed by GitHub
parent 50bdc2f402
commit 14238d4a8d
12 changed files with 820 additions and 568 deletions
+39 -34
View File
@@ -2,10 +2,11 @@ package sync
import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"net/url"
"os"
goSync "sync"
"time"
@@ -18,10 +19,10 @@ import (
"github.com/containers/image/v5/types"
"github.com/opencontainers/go-digest"
ispec "github.com/opencontainers/image-spec/specs-go/v1"
"gopkg.in/resty.v1"
zerr "zotregistry.io/zot/errors"
"zotregistry.io/zot/pkg/api/constants"
"zotregistry.io/zot/pkg/common"
"zotregistry.io/zot/pkg/log"
"zotregistry.io/zot/pkg/storage"
"zotregistry.io/zot/pkg/test"
@@ -82,28 +83,17 @@ type RepoReferences struct {
}
// getUpstreamCatalog gets all repos from a registry.
func getUpstreamCatalog(client *resty.Client, upstreamURL string, log log.Logger) (catalog, error) {
func GetUpstreamCatalog(client *http.Client, upstreamURL, username, password string, log log.Logger) (catalog, error) { //nolint
var catalog catalog
registryCatalogURL := fmt.Sprintf("%s%s%s", upstreamURL, constants.RoutePrefix, constants.ExtCatalogPrefix)
resp, err := client.R().SetHeader("Content-Type", "application/json").Get(registryCatalogURL)
body, statusCode, err := common.MakeHTTPGetRequest(client, username,
password, &catalog,
registryCatalogURL, "application/json", log)
if err != nil {
log.Err(err).Msgf("couldn't query %s", registryCatalogURL)
return catalog, err
}
if resp.IsError() {
log.Error().Msgf("couldn't query %s, status code: %d, body: %s", registryCatalogURL,
resp.StatusCode(), resp.Body())
return catalog, zerr.ErrSyncMissingCatalog
}
err = json.Unmarshal(resp.Body(), &catalog)
if err != nil {
log.Err(err).Str("body", string(resp.Body())).Msg("couldn't unmarshal registry's catalog")
statusCode, body)
return catalog, err
}
@@ -119,7 +109,7 @@ func imagesToCopyFromUpstream(ctx context.Context, registryName string, repoName
repoRef, err := parseRepositoryReference(fmt.Sprintf("%s/%s", registryName, repoName))
if err != nil {
log.Error().Str("errorType", TypeOf(err)).
log.Error().Str("errorType", common.TypeOf(err)).
Err(err).Msgf("couldn't parse repository reference: %s", repoRef)
return imageRefs, err
@@ -127,7 +117,7 @@ func imagesToCopyFromUpstream(ctx context.Context, registryName string, repoName
tags, err := getImageTags(ctx, upstreamCtx, repoRef)
if err != nil {
log.Error().Str("errorType", TypeOf(err)).
log.Error().Str("errorType", common.TypeOf(err)).
Err(err).Msgf("couldn't fetch tags for %s", repoRef)
return imageRefs, err
@@ -228,19 +218,34 @@ func syncRegistry(ctx context.Context, regCfg RegistryConfig,
upstreamCtx := getUpstreamContext(&regCfg, credentials)
options := getCopyOptions(upstreamCtx, localCtx)
httpClient, registryURL, err := getHTTPClient(&regCfg, upstreamURL, credentials, log)
if !common.Contains(regCfg.URLs, upstreamURL) {
return zerr.ErrSyncInvalidUpstreamURL
}
registryURL, err := url.Parse(upstreamURL)
if err != nil {
log.Error().Str("errorType", common.TypeOf(err)).
Err(err).Str("url", upstreamURL).Msg("couldn't parse url")
return err
}
httpClient, err := common.CreateHTTPClient(*regCfg.TLSVerify, registryURL.Host, regCfg.CertDir)
if err != nil {
log.Error().Str("errorType", common.TypeOf(err)).
Err(err).Msg("error while creating http client")
return err
}
var catalog catalog
if err = retry.RetryIfNecessary(ctx, func() error {
catalog, err = getUpstreamCatalog(httpClient, upstreamURL, log)
catalog, err = GetUpstreamCatalog(httpClient, upstreamURL, credentials.Username, credentials.Password, log)
return err
}, retryOptions); err != nil {
log.Error().Str("errorType", TypeOf(err)).
log.Error().Str("errorType", common.TypeOf(err)).
Err(err).Msg("error while getting upstream catalog, retrying...")
return err
@@ -266,7 +271,7 @@ func syncRegistry(ctx context.Context, regCfg RegistryConfig,
return err
}, retryOptions); err != nil {
log.Error().Str("errorType", TypeOf(err)).
log.Error().Str("errorType", common.TypeOf(err)).
Err(err).Msg("error while getting images references from upstream, retrying...")
return err
@@ -280,7 +285,7 @@ func syncRegistry(ctx context.Context, regCfg RegistryConfig,
}
}
sig := newSignaturesCopier(httpClient, *registryURL, storeController, log)
sig := newSignaturesCopier(httpClient, credentials, *registryURL, storeController, log)
for _, repoReference := range reposReferences {
upstreamRepo := repoReference.name
@@ -292,7 +297,7 @@ func syncRegistry(ctx context.Context, regCfg RegistryConfig,
localCachePath, err := getLocalCachePath(imageStore, localRepo)
if err != nil {
log.Error().Str("errorType", TypeOf(err)).
log.Error().Str("errorType", common.TypeOf(err)).
Err(err).Msgf("couldn't get localCachePath for %s", localRepo)
return err
@@ -312,7 +317,7 @@ func syncRegistry(ctx context.Context, regCfg RegistryConfig,
if !isSupportedMediaType(mediaType) {
if mediaType == ispec.MediaTypeArtifactManifest {
err = sig.syncOCIArtifact(localRepo, upstreamRepo, tag, manifestBuf)
err = sig.syncOCIArtifact(localRepo, upstreamRepo, tag, manifestBuf) //nolint
if err != nil {
return err
}
@@ -359,7 +364,7 @@ func syncRegistry(ctx context.Context, regCfg RegistryConfig,
// sync image
localImageRef, err := getLocalImageRef(localCachePath, localRepo, tag)
if err != nil {
log.Error().Str("errorType", TypeOf(err)).
log.Error().Str("errorType", common.TypeOf(err)).
Err(err).Msgf("couldn't obtain a valid image reference for reference %s/%s:%s",
localCachePath, localRepo, tag)
@@ -373,7 +378,7 @@ func syncRegistry(ctx context.Context, regCfg RegistryConfig,
return err
}, retryOptions); err != nil {
log.Error().Str("errorType", TypeOf(err)).
log.Error().Str("errorType", common.TypeOf(err)).
Err(err).Msgf("error while copying image %s to %s",
upstreamImageRef.DockerReference(), localCachePath)
@@ -382,7 +387,7 @@ func syncRegistry(ctx context.Context, regCfg RegistryConfig,
// push from cache to repo
err = pushSyncedLocalImage(localRepo, tag, localCachePath, imageStore, log)
if err != nil {
log.Error().Str("errorType", TypeOf(err)).
log.Error().Str("errorType", common.TypeOf(err)).
Err(err).Msgf("error while pushing synced cached image %s",
fmt.Sprintf("%s/%s:%s", localCachePath, localRepo, tag))
@@ -416,7 +421,7 @@ func syncRegistry(ctx context.Context, regCfg RegistryConfig,
return nil
}, retryOptions); err != nil {
log.Error().Str("errorType", TypeOf(err)).
log.Error().Str("errorType", common.TypeOf(err)).
Err(err).Msgf("couldn't copy referrer for %s", upstreamImageRef.DockerReference())
}
}
@@ -443,7 +448,7 @@ func getLocalContexts(log log.Logger) (*types.SystemContext, *signature.PolicyCo
policyContext, err := signature.NewPolicyContext(policy)
if err := test.Error(err); err != nil {
log.Error().Str("errorType", TypeOf(err)).
log.Error().Str("errorType", common.TypeOf(err)).
Err(err).Msg("couldn't create policy context")
return &types.SystemContext{}, &signature.PolicyContext{}, err
@@ -463,7 +468,7 @@ func Run(ctx context.Context, cfg Config,
if cfg.CredentialsFile != "" {
credentialsFile, err = getFileCredentials(cfg.CredentialsFile)
if err != nil {
logger.Error().Str("errortype", TypeOf(err)).
logger.Error().Str("errortype", common.TypeOf(err)).
Err(err).Msgf("couldn't get registry credentials from %s", cfg.CredentialsFile)
return err
@@ -513,7 +518,7 @@ func Run(ctx context.Context, cfg Config,
// first try syncing main registry
if err := syncRegistry(ctx, regCfg, upstreamURL, storeController, localCtx, policyCtx,
credentialsFile[upstreamAddr], retryOptions, logger); err != nil {
logger.Error().Str("errortype", TypeOf(err)).
logger.Error().Str("errortype", common.TypeOf(err)).
Err(err).Str("registry", upstreamURL).
Msg("sync exited with error, falling back to auxiliary registries if any")
} else {