mirror of
https://github.com/project-zot/zot.git
synced 2026-06-17 12:58:02 +08:00
feat(api): add repository quota enforcement middleware (#3923)
Adds a configurable maximum repository count per registry instance. When maxRepos is set on StorageConfig, manifest pushes that would create a new repository beyond the limit are rejected with HTTP 429 TOOMANYREQUESTS. Pushes to existing repositories are always allowed. Implemented as an always-available feature in pkg/api (not a build-tag extension). MaxRepos is a field on StorageConfig, enabled when > 0. - repoQuotaMiddleware on the dist-spec router intercepts manifest PUTs. New-repo pushes are serialized with a sync.Mutex to prevent concurrent requests from exceeding the limit. - Adds CountRepos(ctx) to the MetaDB interface with efficient implementations: BoltDB (Stats().KeyN), Redis (HLen), DynamoDB (Scan with Select=COUNT). - Config.IsQuotaEnabled() added, wired into controller.go metaDB init. - Four integration tests (enforcement, concurrency, disabled, unconfigured) and backend-specific CountRepos tests for BoltDB, Redis, and DynamoDB. Signed-off-by: Bachir Khiati <bachir.khiati@gmail.com>
This commit is contained in:
@@ -29,6 +29,7 @@ var (
|
||||
|
||||
type StorageConfig struct {
|
||||
RootDirectory string
|
||||
MaxRepos int
|
||||
Dedupe bool
|
||||
RemoteCache bool
|
||||
GC bool
|
||||
@@ -1144,6 +1145,17 @@ func (c *Config) IsRetentionEnabled() bool {
|
||||
return c.isRetentionEnabledInternal()
|
||||
}
|
||||
|
||||
func (c *Config) IsQuotaEnabled() bool {
|
||||
if c == nil {
|
||||
return false
|
||||
}
|
||||
|
||||
c.mu.RLock()
|
||||
defer c.mu.RUnlock()
|
||||
|
||||
return c.Storage.MaxRepos > 0
|
||||
}
|
||||
|
||||
// IsCompatEnabled checks if compatibility mode is enabled.
|
||||
func (c *Config) IsCompatEnabled() bool {
|
||||
if c == nil {
|
||||
|
||||
@@ -411,7 +411,7 @@ func (c *Controller) InitMetaDB() error {
|
||||
extensionsConfig := c.Config.CopyExtensionsConfig()
|
||||
|
||||
if extensionsConfig.IsSearchEnabled() || authConfig.IsBasicAuthnEnabled() || extensionsConfig.IsImageTrustEnabled() ||
|
||||
c.Config.IsRetentionEnabled() {
|
||||
c.Config.IsRetentionEnabled() || c.Config.IsQuotaEnabled() {
|
||||
// Get storage config safely
|
||||
storageConfig := c.Config.CopyStorageConfig()
|
||||
|
||||
|
||||
@@ -12114,7 +12114,7 @@ func TestPeriodicGC(t *testing.T) {
|
||||
|
||||
// periodic GC is enabled for sub store
|
||||
So(string(data), ShouldContainSubstring,
|
||||
fmt.Sprintf("\"SubPaths\":{\"/a\":{\"RootDirectory\":\"%s\",\"Dedupe\":false,\"RemoteCache\":false,\"GC\":true,\"Commit\":false,\"GCDelay\":1000000000,\"GCInterval\":86400000000000", subDir)) //nolint:lll // gofumpt conflicts with lll
|
||||
fmt.Sprintf("\"SubPaths\":{\"/a\":{\"RootDirectory\":\"%s\",\"MaxRepos\":0,\"Dedupe\":false,\"RemoteCache\":false,\"GC\":true,\"Commit\":false,\"GCDelay\":1000000000,\"GCInterval\":86400000000000", subDir)) //nolint:lll // gofumpt conflicts with lll
|
||||
})
|
||||
|
||||
Convey("Periodic gc error", t, func() {
|
||||
|
||||
@@ -0,0 +1,118 @@
|
||||
package api
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"net/http"
|
||||
"strconv"
|
||||
"sync"
|
||||
|
||||
"github.com/gorilla/mux"
|
||||
|
||||
zerr "zotregistry.dev/zot/v2/errors"
|
||||
"zotregistry.dev/zot/v2/pkg/api/config"
|
||||
apiErr "zotregistry.dev/zot/v2/pkg/api/errors"
|
||||
zcommon "zotregistry.dev/zot/v2/pkg/common"
|
||||
"zotregistry.dev/zot/v2/pkg/log"
|
||||
mTypes "zotregistry.dev/zot/v2/pkg/meta/types"
|
||||
)
|
||||
|
||||
func repoQuotaMiddleware(maxRepos int, metaDB mTypes.MetaDB, log log.Logger) mux.MiddlewareFunc {
|
||||
var quotaMu sync.Mutex
|
||||
|
||||
return func(next http.Handler) http.Handler {
|
||||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
if r.Method != http.MethodPut {
|
||||
next.ServeHTTP(w, r)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
vars := mux.Vars(r)
|
||||
|
||||
// "reference" is only set on /v2/{name}/manifests/{reference} routes.
|
||||
if _, ok := vars["reference"]; !ok {
|
||||
next.ServeHTTP(w, r)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
repoName := vars["name"]
|
||||
if repoName == "" {
|
||||
next.ServeHTTP(w, r)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
_, err := metaDB.GetRepoMeta(r.Context(), repoName)
|
||||
if err == nil {
|
||||
next.ServeHTTP(w, r)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
if !errors.Is(err, zerr.ErrRepoMetaNotFound) {
|
||||
log.Error().Err(err).Str("repo", repoName).
|
||||
Msg("failed to check repo existence for quota, allowing push")
|
||||
next.ServeHTTP(w, r)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
quotaMu.Lock()
|
||||
defer quotaMu.Unlock()
|
||||
|
||||
// Re-check after acquiring the lock: another request may have
|
||||
// created this repo while we were waiting.
|
||||
_, err = metaDB.GetRepoMeta(r.Context(), repoName)
|
||||
if err == nil {
|
||||
next.ServeHTTP(w, r)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
count, err := metaDB.CountRepos(r.Context())
|
||||
if err != nil {
|
||||
log.Error().Err(err).Msg("failed to count repos for quota, allowing push")
|
||||
next.ServeHTTP(w, r)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
if count >= maxRepos {
|
||||
log.Warn().
|
||||
Str("repo", repoName).
|
||||
Int("current", count).
|
||||
Int("limit", maxRepos).
|
||||
Msg("repository quota limit reached, rejecting push")
|
||||
|
||||
detail := map[string]string{"limit": strconv.Itoa(maxRepos)}
|
||||
zcommon.WriteJSON(w, http.StatusTooManyRequests,
|
||||
apiErr.NewErrorList(apiErr.NewError(apiErr.TOOMANYREQUESTS).AddDetail(detail)))
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
next.ServeHTTP(w, r)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func setupQuotaMiddleware(
|
||||
conf *config.Config,
|
||||
router *mux.Router,
|
||||
metaDB mTypes.MetaDB,
|
||||
log log.Logger,
|
||||
) {
|
||||
if !conf.IsQuotaEnabled() {
|
||||
return
|
||||
}
|
||||
|
||||
if metaDB == nil {
|
||||
log.Warn().Msg("metaDB is not initialized, repository quota enforcement disabled")
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
log.Info().Int("maxRepos", conf.Storage.MaxRepos).Msg("repository quota enforcement enabled")
|
||||
router.Use(repoQuotaMiddleware(conf.Storage.MaxRepos, metaDB, log))
|
||||
}
|
||||
@@ -0,0 +1,139 @@
|
||||
package api_test
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"sync"
|
||||
"testing"
|
||||
|
||||
. "github.com/smartystreets/goconvey/convey"
|
||||
"gopkg.in/resty.v1"
|
||||
|
||||
"zotregistry.dev/zot/v2/pkg/api"
|
||||
"zotregistry.dev/zot/v2/pkg/api/config"
|
||||
test "zotregistry.dev/zot/v2/pkg/test/common"
|
||||
. "zotregistry.dev/zot/v2/pkg/test/image-utils"
|
||||
)
|
||||
|
||||
func startQuotaServer(t *testing.T, maxRepos int) (string, func()) {
|
||||
t.Helper()
|
||||
|
||||
port := test.GetFreePort()
|
||||
conf := config.New()
|
||||
conf.HTTP.Port = port
|
||||
conf.Storage.RootDirectory = t.TempDir()
|
||||
conf.Storage.MaxRepos = maxRepos
|
||||
|
||||
ctlr := api.NewController(conf)
|
||||
ctlrManager := test.NewControllerManager(ctlr)
|
||||
ctlrManager.StartAndWait(port)
|
||||
|
||||
return test.GetBaseURL(port), func() { ctlrManager.StopServer() }
|
||||
}
|
||||
|
||||
func TestQuotaEnforcement(t *testing.T) {
|
||||
Convey("Given a registry with maxRepos set to 2", t, func() {
|
||||
baseURL, stop := startQuotaServer(t, 2)
|
||||
defer stop()
|
||||
|
||||
Convey("Push to two different repos succeeds", func() {
|
||||
err := UploadImage(CreateRandomImage(), baseURL, "repo1", "v1")
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
err = UploadImage(CreateRandomImage(), baseURL, "repo2", "v1")
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
Convey("Push to a third new repo is rejected with 429", func() {
|
||||
img := CreateRandomImage()
|
||||
manifestBody, err := json.Marshal(img.Manifest)
|
||||
So(err, ShouldBeNil)
|
||||
|
||||
resp, err := resty.R().
|
||||
SetHeader("Content-Type", "application/vnd.oci.image.manifest.v1+json").
|
||||
SetBody(manifestBody).
|
||||
Put(baseURL + "/v2/repo3/manifests/v1")
|
||||
So(err, ShouldBeNil)
|
||||
So(resp.StatusCode(), ShouldEqual, http.StatusTooManyRequests)
|
||||
|
||||
var body map[string]any
|
||||
So(json.Unmarshal(resp.Body(), &body), ShouldBeNil)
|
||||
errors, ok := body["errors"].([]any)
|
||||
So(ok, ShouldBeTrue)
|
||||
So(len(errors), ShouldBeGreaterThan, 0)
|
||||
firstErr, ok := errors[0].(map[string]any)
|
||||
So(ok, ShouldBeTrue)
|
||||
So(firstErr["code"], ShouldEqual, "TOOMANYREQUESTS")
|
||||
|
||||
detail, ok := firstErr["detail"].(map[string]any)
|
||||
So(ok, ShouldBeTrue)
|
||||
So(detail["limit"], ShouldEqual, "2")
|
||||
})
|
||||
|
||||
Convey("Push a new tag to an existing repo is allowed at the limit", func() {
|
||||
err := UploadImage(CreateRandomImage(), baseURL, "repo1", "v2")
|
||||
So(err, ShouldBeNil)
|
||||
})
|
||||
|
||||
Convey("Re-pushing an existing tag is allowed at the limit", func() {
|
||||
err := UploadImage(CreateRandomImage(), baseURL, "repo2", "v1")
|
||||
So(err, ShouldBeNil)
|
||||
})
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
func TestQuotaDisabled(t *testing.T) {
|
||||
Convey("Given a registry with maxRepos set to 0 (disabled)", t, func() {
|
||||
baseURL, stop := startQuotaServer(t, 0)
|
||||
defer stop()
|
||||
|
||||
Convey("Pushing any number of repos succeeds", func() {
|
||||
for _, repo := range []string{"repo1", "repo2", "repo3"} {
|
||||
err := UploadImage(CreateRandomImage(), baseURL, repo, "v1")
|
||||
So(err, ShouldBeNil)
|
||||
}
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
func TestQuotaConcurrency(t *testing.T) {
|
||||
Convey("Given a registry with maxRepos set to 5", t, func() {
|
||||
baseURL, stop := startQuotaServer(t, 5)
|
||||
defer stop()
|
||||
|
||||
Convey("Concurrent pushes to different new repos do not exceed the limit", func() {
|
||||
const goroutines = 10
|
||||
|
||||
var wg sync.WaitGroup
|
||||
results := make([]int, goroutines)
|
||||
|
||||
for i := range goroutines {
|
||||
idx := i
|
||||
wg.Go(func() {
|
||||
err := UploadImage(CreateRandomImage(), baseURL, fmt.Sprintf("concurrent-repo-%d", idx), "v1")
|
||||
if err != nil {
|
||||
results[idx] = http.StatusTooManyRequests
|
||||
} else {
|
||||
results[idx] = http.StatusCreated
|
||||
}
|
||||
})
|
||||
}
|
||||
wg.Wait()
|
||||
|
||||
created := 0
|
||||
rejected := 0
|
||||
|
||||
for _, code := range results {
|
||||
if code == http.StatusCreated {
|
||||
created++
|
||||
} else {
|
||||
rejected++
|
||||
}
|
||||
}
|
||||
|
||||
So(created, ShouldBeLessThanOrEqualTo, 5)
|
||||
So(rejected, ShouldBeGreaterThanOrEqualTo, 5)
|
||||
})
|
||||
})
|
||||
}
|
||||
@@ -224,6 +224,7 @@ func (rh *RouteHandler) SetupRoutes() {
|
||||
ext.SetupImageTrustRoutes(rh.c.Config, prefixedRouter, rh.c.MetaDB, rh.c.Log)
|
||||
ext.SetupMgmtRoutes(rh.c.Config, prefixedRouter, rh.c.Log)
|
||||
ext.SetupUserPreferencesRoutes(rh.c.Config, prefixedRouter, rh.c.MetaDB, rh.c.Log)
|
||||
setupQuotaMiddleware(rh.c.Config, prefixedDistSpecRouter, rh.c.MetaDB, rh.c.Log)
|
||||
// last should always be UI because it will setup a http.FileServer and paths will be resolved by this FileServer.
|
||||
ext.SetupUIRoutes(rh.c.Config, rh.c.Router, rh.c.Log)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user