Catalog content discovery (#2782)

fix(sync): use pagination when querying remote catalog

feat(api): added /v2/_catalog pagination, fixes #2715

Signed-off-by: Eusebiu Petu <petu.eusebiu@gmail.com>
This commit is contained in:
peusebiu
2024-12-19 19:38:35 +02:00
committed by GitHub
parent 037d6bf3d7
commit 772e90a6c5
16 changed files with 768 additions and 68 deletions
+15
View File
@@ -12,6 +12,7 @@ import (
"zotregistry.dev/zot/pkg/common"
"zotregistry.dev/zot/pkg/log"
reqCtx "zotregistry.dev/zot/pkg/requestcontext"
storageTypes "zotregistry.dev/zot/pkg/storage/types"
)
const (
@@ -20,6 +21,20 @@ const (
OPENID = "OpenID"
)
func AuthzFilterFunc(userAc *reqCtx.UserAccessControl) storageTypes.FilterRepoFunc {
return func(repo string) (bool, error) {
if userAc == nil {
return true, nil
}
if userAc.Can(constants.ReadPermission, repo) {
return true, nil
}
return false, nil
}
}
// AccessController authorizes users to act on resources.
type AccessController struct {
Config *config.AccessControlConfig
+433
View File
@@ -8145,6 +8145,439 @@ func TestRouteFailures(t *testing.T) {
})
}
func TestPagedRepositoriesWithAuthorization(t *testing.T) {
port := test.GetFreePort()
baseURL := test.GetBaseURL(port)
conf := config.New()
conf.HTTP.Port = port
username, _ := test.GenerateRandomString()
password, _ := test.GenerateRandomString()
htpasswdPath := test.MakeHtpasswdFileFromString(test.GetCredString(username, password))
defer os.Remove(htpasswdPath)
conf.HTTP.Auth = &config.AuthConfig{
HTPasswd: config.AuthHTPasswd{
Path: htpasswdPath,
},
}
readPolicyGroup := config.PolicyGroup{
Policies: []config.Policy{
{
Users: []string{username},
Actions: []string{
constants.ReadPermission,
},
},
},
DefaultPolicy: []string{},
}
conf.HTTP.AccessControl = &config.AccessControlConfig{
Repositories: config.Repositories{
test.AuthorizationAllRepos: config.PolicyGroup{
Policies: []config.Policy{
{
Users: []string{username},
Actions: []string{
constants.ReadPermission,
constants.CreatePermission,
},
},
},
DefaultPolicy: []string{},
},
},
AdminPolicy: config.Policy{
Users: []string{},
Actions: []string{},
},
}
ctlr := api.NewController(conf)
ctlr.Config.Storage.RootDirectory = t.TempDir()
cm := test.NewControllerManager(ctlr)
cm.StartAndWait(port)
defer cm.StopServer()
client := resty.New()
client.SetBasicAuth(username, password)
img := CreateRandomImage()
repoNames := []string{
"alpine1", "alpine2", "alpine3",
"alpine4", "alpine5", "alpine6",
"alpine7", "alpine8", "alpine9",
}
for _, repo := range repoNames {
err := UploadImageWithBasicAuth(img, baseURL, repo, "0.1", username, password)
if err != nil {
panic(err)
}
}
conf.HTTP.AccessControl.Repositories = config.Repositories{
"alpine[13579]": readPolicyGroup,
"alpine[2468]": config.PolicyGroup{},
}
// Note empty strings signify the query parameter is not set
// There are separate tests for passing the empty string as query parameter
testCases := []struct {
testCaseName string
pageSize string
last string
expectedRepos []string
}{
{
testCaseName: "no parameters",
pageSize: "",
last: "",
expectedRepos: []string{"alpine1", "alpine3", "alpine5", "alpine7", "alpine9"},
},
{
testCaseName: "first 3",
pageSize: "3",
last: "",
expectedRepos: []string{"alpine1", "alpine3", "alpine5"},
},
{
testCaseName: "next 2",
pageSize: "3",
last: "alpine5",
expectedRepos: []string{"alpine7", "alpine9"},
},
{
testCaseName: "0",
pageSize: "0",
last: "",
expectedRepos: []string{},
},
{
testCaseName: "Test the parameter 'last' without parameter 'n'",
pageSize: "",
last: "alpine3",
expectedRepos: []string{"alpine5", "alpine7", "alpine9"},
},
{
testCaseName: "Test the parameter 'last' with the final repo as value",
pageSize: "",
last: "alpine9",
expectedRepos: []string{},
},
}
for _, testCase := range testCases {
Convey(testCase.testCaseName, t, func() {
testHTTPPagedRepositories(t, client, baseURL, testCase.testCaseName, testCase.pageSize,
testCase.last, testCase.expectedRepos, repoNames[len(repoNames)-1])
})
}
}
func TestPagedRepositoriesWithSubpaths(t *testing.T) {
port := test.GetFreePort()
baseURL := test.GetBaseURL(port)
conf := config.New()
conf.HTTP.Port = port
dir := t.TempDir()
firstSubDir := t.TempDir()
secondSubDir := t.TempDir()
subPaths := make(map[string]config.StorageConfig)
subPaths["/a"] = config.StorageConfig{RootDirectory: firstSubDir}
subPaths["/b"] = config.StorageConfig{RootDirectory: secondSubDir}
ctlr := makeController(conf, dir)
ctlr.Config.Storage.SubPaths = subPaths
ctlr.Config.Storage.Commit = true
cm := test.NewControllerManager(ctlr)
cm.StartAndWait(port)
defer cm.StopServer()
rthdlr := api.NewRouteHandler(ctlr)
img := CreateRandomImage()
repoNames := []string{
"alpine1", "alpine2", "alpine3",
"a/alpine4", "a/alpine5", "a/alpine6",
"b/alpine7", "b/alpine8", "b/alpine9",
}
for _, repo := range repoNames {
err := UploadImage(img, baseURL, repo, "0.1")
if err != nil {
panic(err)
}
}
// Note empty strings signify the query parameter is not set
// There are separate tests for passing the empty string as query parameter
testCases := []struct {
testCaseName string
pageSize string
last string
expectedRepos []string
}{
{
testCaseName: "no parameters",
pageSize: "",
last: "",
expectedRepos: repoNames,
},
{
testCaseName: "first 5",
pageSize: "5",
last: "",
expectedRepos: repoNames[:5],
},
{
testCaseName: "next 5",
pageSize: "5",
last: "a/alpine5",
expectedRepos: repoNames[5:9],
},
{
testCaseName: "0",
pageSize: "0",
last: "",
expectedRepos: []string{},
},
{
testCaseName: "Test the parameter 'last' without parameter 'n'",
pageSize: "",
last: "alpine2",
expectedRepos: repoNames[2:9],
},
{
testCaseName: "Test the parameter 'last' with the final repo as value",
pageSize: "",
last: repoNames[len(repoNames)-1],
expectedRepos: []string{},
},
}
for _, testCase := range testCases {
Convey(testCase.testCaseName, t, func() {
testPagedRepositories(t, rthdlr, baseURL, testCase.testCaseName, testCase.pageSize,
testCase.last, testCase.expectedRepos, repoNames[len(repoNames)-1])
})
}
}
func TestPagedRepositories(t *testing.T) {
port := test.GetFreePort()
baseURL := test.GetBaseURL(port)
conf := config.New()
conf.HTTP.Port = port
ctlr := makeController(conf, t.TempDir())
ctlr.Config.Storage.Commit = true
cm := test.NewControllerManager(ctlr)
cm.StartAndWait(port)
defer cm.StopServer()
rthdlr := api.NewRouteHandler(ctlr)
img := CreateRandomImage()
repoName := "alpine"
repoNames := []string{
"alpine1", "alpine2", "alpine3",
"alpine4", "alpine5", "alpine6",
"alpine7", "alpine8", "alpine9",
}
for _, repo := range repoNames {
err := UploadImage(img, baseURL, repo, "0.1")
if err != nil {
panic(err)
}
}
// Note empty strings signify the query parameter is not set
// There are separate tests for passing the empty string as query parameter
testCases := []struct {
testCaseName string
pageSize string
last string
expectedRepos []string
}{
{
testCaseName: "no parameters",
pageSize: "",
last: "",
expectedRepos: repoNames,
},
{
testCaseName: "first 5",
pageSize: "5",
last: "",
expectedRepos: repoNames[:5],
},
{
testCaseName: "next 5",
pageSize: "5",
last: repoName + "5",
expectedRepos: repoNames[5:9],
},
{
testCaseName: "0",
pageSize: "0",
last: "",
expectedRepos: []string{},
},
{
testCaseName: "Test the parameter 'last' without parameter 'n'",
pageSize: "",
last: repoName + "2",
expectedRepos: repoNames[2:9],
},
{
testCaseName: "Test the parameter 'last' with the final repo as value",
pageSize: "",
last: repoName + "9",
expectedRepos: []string{},
},
}
for _, testCase := range testCases {
Convey(testCase.testCaseName, t, func() {
testPagedRepositories(t, rthdlr, baseURL, testCase.testCaseName, testCase.pageSize,
testCase.last, testCase.expectedRepos, repoNames[len(repoNames)-1])
})
}
}
func testHTTPPagedRepositories(t *testing.T, client *resty.Client, baseURL string, testCaseName string,
pageSize string,
last string,
expectedRepos []string, lastRepoInStorage string,
) {
t.Helper()
Convey(testCaseName, func() {
t.Log("Running " + testCaseName)
params := make(map[string]string)
if pageSize != "" || last != "" {
if pageSize != "" {
params["n"] = pageSize
}
if last != "" {
params["last"] = last
}
}
resp, err := client.R().SetQueryParams(params).Get(baseURL + "/v2/_catalog")
So(err, ShouldBeNil)
So(resp, ShouldNotBeNil)
So(resp.StatusCode(), ShouldEqual, http.StatusOK)
catalog := struct {
Repositories []string `json:"repositories"`
}{}
err = json.Unmarshal(resp.Body(), &catalog)
So(err, ShouldBeNil)
So(catalog.Repositories, ShouldEqual, expectedRepos)
actualLinkValue := resp.Header().Get("Link")
if pageSize == "0" || pageSize == "" { //nolint:gocritic
So(actualLinkValue, ShouldEqual, "")
} else if expectedRepos[len(expectedRepos)-1] == lastRepoInStorage {
So(actualLinkValue, ShouldEqual, "")
} else {
expectedLinkValue := fmt.Sprintf("</v2/_catalog?n=%s&last=%s>; rel=\"next\"",
pageSize, catalog.Repositories[len(catalog.Repositories)-1],
)
So(actualLinkValue, ShouldEqual, expectedLinkValue)
}
t.Log("Finished " + testCaseName)
})
}
func testPagedRepositories(t *testing.T, rthdlr *api.RouteHandler, baseURL string, testCaseName string,
pageSize string,
last string,
expectedRepos []string, lastRepoInStorage string,
) {
t.Helper()
Convey(testCaseName, func() {
t.Log("Running " + testCaseName)
request, _ := http.NewRequestWithContext(context.TODO(), http.MethodGet,
baseURL+constants.RoutePrefix+constants.ExtCatalogPrefix, nil)
if pageSize != "" || last != "" {
qparm := request.URL.Query()
if pageSize != "" {
qparm.Add("n", pageSize)
}
if last != "" {
qparm.Add("last", last)
}
request.URL.RawQuery = qparm.Encode()
}
response := httptest.NewRecorder()
rthdlr.ListRepositories(response, request)
resp := response.Result()
defer resp.Body.Close()
So(resp, ShouldNotBeNil)
So(resp.StatusCode, ShouldEqual, http.StatusOK)
catalog := struct {
Repositories []string `json:"repositories"`
}{}
body, err := io.ReadAll(resp.Body)
So(err, ShouldBeNil)
err = json.Unmarshal(body, &catalog)
So(err, ShouldBeNil)
So(catalog.Repositories, ShouldEqual, expectedRepos)
actualLinkValue := resp.Header.Get("Link")
if pageSize == "0" || pageSize == "" { //nolint:gocritic
So(actualLinkValue, ShouldEqual, "")
} else if expectedRepos[len(expectedRepos)-1] == lastRepoInStorage {
So(actualLinkValue, ShouldEqual, "")
} else {
expectedLinkValue := fmt.Sprintf("</v2/_catalog?n=%s&last=%s>; rel=\"next\"",
pageSize, catalog.Repositories[len(catalog.Repositories)-1],
)
So(actualLinkValue, ShouldEqual, expectedLinkValue)
}
t.Log("Finished " + testCaseName)
})
}
func TestListingTags(t *testing.T) {
port := test.GetFreePort()
baseURL := test.GetBaseURL(port)
+98 -32
View File
@@ -47,6 +47,7 @@ import (
mTypes "zotregistry.dev/zot/pkg/meta/types"
zreg "zotregistry.dev/zot/pkg/regexp"
reqCtx "zotregistry.dev/zot/pkg/requestcontext"
"zotregistry.dev/zot/pkg/storage"
storageCommon "zotregistry.dev/zot/pkg/storage/common"
storageTypes "zotregistry.dev/zot/pkg/storage/types"
"zotregistry.dev/zot/pkg/test/inject"
@@ -1763,6 +1764,81 @@ type RepositoryList struct {
Repositories []string `json:"repositories"`
}
func (rh *RouteHandler) listStorageRepositories(lastEntry string, maxEntries int,
userAc *reqCtx.UserAccessControl,
) ([]string, bool, error) {
var moreEntries bool
var err error
var repos []string
remainder := maxEntries
combineRepoList := make([]string, 0)
subStore := rh.c.StoreController.SubStore
subPaths := make([]string, 0)
for subPath := range subStore {
subPaths = append(subPaths, subPath)
}
sort.Strings(subPaths)
storePath := rh.c.StoreController.GetStorePath(lastEntry)
if storePath == storage.DefaultStorePath {
singleStore := rh.c.StoreController.DefaultStore
repos, moreEntries, err = singleStore.GetNextRepositories(lastEntry, remainder, AuthzFilterFunc(userAc))
if err != nil {
return repos, false, err
}
remainder = maxEntries - len(repos)
if moreEntries && remainder <= 0 && len(repos) > 0 {
// maxEntries has been hit
lastEntry = repos[len(repos)-1]
} else {
// reset for the next substores
lastEntry = ""
}
combineRepoList = append(combineRepoList, repos...)
}
for _, subPath := range subPaths {
imgStore := subStore[subPath]
if lastEntry != "" && subPath != storePath {
continue
}
if remainder > 0 || maxEntries == -1 {
repos, moreEntries, err = imgStore.GetNextRepositories(lastEntry, remainder, AuthzFilterFunc(userAc))
if err != nil {
return combineRepoList, false, err
}
// compute remainder
remainder -= len(repos)
if moreEntries && remainder <= 0 && len(repos) > 0 {
// maxEntries has been hit
lastEntry = repos[len(repos)-1]
} else {
// reset for the next substores
lastEntry = ""
}
combineRepoList = append(combineRepoList, repos...)
}
}
return combineRepoList, moreEntries, nil
}
// ListRepositories godoc
// @Summary List image repositories
// @Description List all image repositories
@@ -1776,34 +1852,15 @@ func (rh *RouteHandler) ListRepositories(response http.ResponseWriter, request *
return
}
combineRepoList := make([]string, 0)
q := request.URL.Query()
subStore := rh.c.StoreController.SubStore
lastEntry := q.Get("last")
for _, imgStore := range subStore {
repos, err := imgStore.GetRepositories()
if err != nil {
response.WriteHeader(http.StatusInternalServerError)
return
}
combineRepoList = append(combineRepoList, repos...)
maxEntries, err := strconv.Atoi(q.Get("n"))
if err != nil {
maxEntries = -1
}
singleStore := rh.c.StoreController.DefaultStore
if singleStore != nil {
repos, err := singleStore.GetRepositories()
if err != nil {
response.WriteHeader(http.StatusInternalServerError)
return
}
combineRepoList = append(combineRepoList, repos...)
}
repos := make([]string, 0)
// authz context
userAc, err := reqCtx.UserAcFromContext(request.Context())
if err != nil {
@@ -1812,14 +1869,23 @@ func (rh *RouteHandler) ListRepositories(response http.ResponseWriter, request *
return
}
if userAc != nil {
for _, r := range combineRepoList {
if userAc.Can(constants.ReadPermission, r) {
repos = append(repos, r)
}
}
} else {
repos = combineRepoList
repos, moreEntries, err := rh.listStorageRepositories(lastEntry, maxEntries, userAc)
if err != nil {
response.WriteHeader(http.StatusInternalServerError)
return
}
if moreEntries && len(repos) > 0 {
lastRepo := repos[len(repos)-1]
response.Header().Set(
"Link",
fmt.Sprintf("</v2/_catalog?n=%d&last=%s>; rel=\"next\"",
maxEntries,
lastRepo,
),
)
}
is := RepositoryList{Repositories: repos}
+20 -10
View File
@@ -1359,8 +1359,10 @@ func TestRoutes(t *testing.T) {
"session_id": "test",
},
&mocks.MockedImageStore{
GetRepositoriesFn: func() ([]string, error) {
return []string{}, ErrUnexpectedError
GetNextRepositoriesFn: func(lastRepo string, maxEntries int,
fn storageTypes.FilterRepoFunc,
) ([]string, bool, error) {
return []string{}, false, ErrUnexpectedError
},
},
)
@@ -1374,8 +1376,10 @@ func TestRoutes(t *testing.T) {
"session_id": "test",
},
&mocks.MockedImageStore{
GetRepositoriesFn: func() ([]string, error) {
return []string{}, ErrUnexpectedError
GetNextRepositoriesFn: func(lastRepo string, maxEntries int,
fn storageTypes.FilterRepoFunc,
) ([]string, bool, error) {
return []string{}, false, ErrUnexpectedError
},
},
)
@@ -1384,19 +1388,25 @@ func TestRoutes(t *testing.T) {
Convey("ListRepositories with Authz", func() {
ctlr.StoreController.DefaultStore = &mocks.MockedImageStore{
GetRepositoriesFn: func() ([]string, error) {
return []string{"repo"}, nil
GetNextRepositoriesFn: func(lastRepo string, maxEntries int,
fn storageTypes.FilterRepoFunc,
) ([]string, bool, error) {
return []string{"repo"}, false, nil
},
}
ctlr.StoreController.SubStore = map[string]storageTypes.ImageStore{
"test1": &mocks.MockedImageStore{
GetRepositoriesFn: func() ([]string, error) {
return []string{"repo1"}, nil
GetNextRepositoriesFn: func(lastRepo string, maxEntries int,
fn storageTypes.FilterRepoFunc,
) ([]string, bool, error) {
return []string{"repo1"}, false, nil
},
},
"test2": &mocks.MockedImageStore{
GetRepositoriesFn: func() ([]string, error) {
return []string{"repo2"}, nil
GetNextRepositoriesFn: func(lastRepo string, maxEntries int,
fn storageTypes.FilterRepoFunc,
) ([]string, bool, error) {
return []string{"repo2"}, false, nil
},
},
}