fix: more accurate storage metrics after zot restart (#1972)

Signed-off-by: Alexei Dodon <adodon@cisco.com>
This commit is contained in:
Alexei Dodon
2023-11-01 18:09:21 +02:00
committed by GitHub
parent 3e6053e1db
commit a79d79a03a
10 changed files with 195 additions and 20 deletions
+8
View File
@@ -372,6 +372,10 @@ func (c *Controller) StartBackgroundTasks(reloadCtx context.Context) {
ext.EnableMetricsExtension(c.Config, c.Log, c.Config.Storage.RootDirectory)
ext.EnableSearchExtension(c.Config, c.StoreController, c.MetaDB, taskScheduler, c.CveScanner, c.Log)
}
// runs once if metrics are enabled & imagestore is local
if c.Config.IsMetricsEnabled() && c.Config.Storage.StorageDriver == nil {
c.StoreController.DefaultStore.PopulateStorageMetrics(time.Duration(0), taskScheduler)
}
if c.Config.Storage.SubPaths != nil {
for route, storageConfig := range c.Config.Storage.SubPaths {
@@ -396,6 +400,10 @@ func (c *Controller) StartBackgroundTasks(reloadCtx context.Context) {
substore := c.StoreController.SubStore[route]
if substore != nil {
substore.RunDedupeBlobs(time.Duration(0), taskScheduler)
if c.Config.IsMetricsEnabled() && c.Config.Storage.StorageDriver == nil {
substore.PopulateStorageMetrics(time.Duration(0), taskScheduler)
}
}
}
}
+1 -1
View File
@@ -16,7 +16,7 @@ type MetricServer interface {
IsEnabled() bool
}
func getDirSize(path string) (int64, error) {
func GetDirSize(path string) (int64, error) {
var size int64
err := filepath.Walk(path, func(_ string, info os.FileInfo, err error) error {
+1 -1
View File
@@ -171,7 +171,7 @@ func IncDownloadCounter(ms MetricServer, repo string) {
func SetStorageUsage(ms MetricServer, rootDir, repo string) {
ms.SendMetric(func() {
dir := path.Join(rootDir, repo)
repoSize, err := getDirSize(dir)
repoSize, err := GetDirSize(dir)
if err == nil {
repoStorageBytes.WithLabelValues(repo).Set(float64(repoSize))
+1 -1
View File
@@ -486,7 +486,7 @@ func IncUploadCounter(ms MetricServer, repo string) {
func SetStorageUsage(ms MetricServer, rootDir, repo string) {
dir := path.Join(rootDir, repo)
repoSize, err := getDirSize(dir)
repoSize, err := GetDirSize(dir)
if err != nil {
ms.(*metricServer).log.Error().Err(err).Msg("failed to set storage usage")
}
@@ -4,9 +4,12 @@
package monitoring_test
import (
"context"
"fmt"
"math/rand"
"net/http"
"os"
"path"
"testing"
"time"
@@ -17,6 +20,8 @@ import (
"zotregistry.io/zot/pkg/api/config"
extconf "zotregistry.io/zot/pkg/extensions/config"
"zotregistry.io/zot/pkg/extensions/monitoring"
"zotregistry.io/zot/pkg/scheduler"
common "zotregistry.io/zot/pkg/storage/common"
test "zotregistry.io/zot/pkg/test/common"
. "zotregistry.io/zot/pkg/test/image-utils"
ociutils "zotregistry.io/zot/pkg/test/oci-utils"
@@ -413,6 +418,70 @@ func TestMetricsAuthorization(t *testing.T) {
})
}
// TestPopulateStorageMetrics starts a controller with the metrics extension
// enabled, writes two images straight to the root directory, runs a
// StorageMetricsInitGenerator through a scheduler and then checks that the
// /metrics endpoint reports, for each repo, the size returned by
// monitoring.GetDirSize.
func TestPopulateStorageMetrics(t *testing.T) {
	Convey("Start a scheduler when metrics enabled", t, func() {
		port := test.GetFreePort()
		baseURL := test.GetBaseURL(port)
		rootDir := t.TempDir()
		metricsEnabled := true

		conf := config.New()
		conf.HTTP.Port = port
		conf.Storage.RootDirectory = rootDir
		conf.Extensions = &extconf.ExtensionConfig{
			Metrics: &extconf.MetricsConfig{
				BaseConfig: extconf.BaseConfig{Enable: &metricsEnabled},
				Prometheus: &extconf.PrometheusConfig{Path: "/metrics"},
			},
		}

		ctlr := api.NewController(conf)
		So(ctlr, ShouldNotBeNil)

		ctlrManager := test.NewControllerManager(ctlr)
		ctlrManager.StartAndWait(port)
		defer ctlrManager.StopServer()

		// write a couple of images
		repos := []string{"alpine", "busybox"}
		srcStorageCtlr := ociutils.GetDefaultStoreController(rootDir, ctlr.Log)

		for _, repo := range repos {
			err := WriteImageToFileSystem(CreateDefaultImage(), repo, "0.0.1", srcStorageCtlr)
			So(err, ShouldBeNil)
		}

		taskScheduler := scheduler.NewScheduler(conf, ctlr.Log)
		ctx, cancel := context.WithCancel(context.Background())
		taskScheduler.RunScheduler(ctx)

		metricsGenerator := &common.StorageMetricsInitGenerator{
			ImgStore: ctlr.StoreController.DefaultStore,
			Metrics:  ctlr.Metrics,
			Log:      ctlr.Log,
			MaxDelay: 1, // maximum delay between jobs (each job computes repo's storage size)
		}
		taskScheduler.SubmitGenerator(metricsGenerator, time.Duration(0), scheduler.LowPriority)

		// give the scheduler time to run one job per repo, then stop it
		time.Sleep(5 * time.Second)
		cancel()

		// build the metric lines we expect from the on-disk sizes
		expectedMetrics := make([]string, 0, len(repos))

		for _, repo := range repos {
			repoSize, err := monitoring.GetDirSize(path.Join(rootDir, repo))
			So(err, ShouldBeNil)

			expectedMetrics = append(expectedMetrics,
				fmt.Sprintf("zot_repo_storage_bytes{repo=\"%s\"} %d", repo, repoSize))
		}

		resp, err := resty.R().Get(baseURL + "/metrics")
		So(err, ShouldBeNil)
		So(resp, ShouldNotBeNil)
		So(resp.StatusCode(), ShouldEqual, http.StatusOK)

		respStr := string(resp.Body())
		for _, metricLine := range expectedMetrics {
			So(respStr, ShouldContainSubstring, metricLine)
		}
	})
}
func generateRandomString() string {
//nolint: gosec
seededRand := rand.New(rand.NewSource(time.Now().UnixNano()))
+72
View File
@@ -6,8 +6,10 @@ import (
"encoding/json"
"errors"
"fmt"
"math/rand"
"path"
"strings"
"time"
"github.com/docker/distribution/registry/storage/driver"
godigest "github.com/opencontainers/go-digest"
@@ -18,6 +20,7 @@ import (
zerr "zotregistry.io/zot/errors"
zcommon "zotregistry.io/zot/pkg/common"
"zotregistry.io/zot/pkg/extensions/monitoring"
zlog "zotregistry.io/zot/pkg/log"
"zotregistry.io/zot/pkg/scheduler"
storageConstants "zotregistry.io/zot/pkg/storage/constants"
@@ -1052,3 +1055,72 @@ func (dt *dedupeTask) DoWork(ctx context.Context) error {
return err
}
// StorageMetricsInitGenerator is a scheduler task generator that walks the
// repositories of an image store one at a time and emits, for each of them,
// a task that records the repository's storage usage metric.
type StorageMetricsInitGenerator struct {
// ImgStore is the image store whose repositories are enumerated by Next.
ImgStore storageTypes.ImageStore
// done is set by Next once GetNextRepository returns an empty repo name.
done bool
// Metrics is the metric server handed to every generated task.
Metrics monitoring.MetricServer
// lastRepo is the repo returned by the previous Next call; it is passed to
// GetNextRepository as the iteration cursor.
lastRepo string
// nextRun is the instant after which IsReady reports true.
nextRun time.Time
// rand is the random source used to pick the delay before the next run.
rand *rand.Rand
// Log receives a debug entry on every Next call.
Log zlog.Logger
// MaxDelay is the exclusive upper bound, in seconds, of the random delay
// that Next inserts before the generator is ready again.
MaxDelay int
}
// Next returns a task that records the storage usage of the repository that
// follows lastRepo in the image store. When GetNextRepository returns an
// empty name the generator is marked done and a nil task is returned.
//
// Every call also moves nextRun forward by a random number of seconds in
// [0, MaxDelay), which is what IsReady consults. A MaxDelay of zero or less
// means "no delay": rand.Intn panics for a non-positive argument, so it is
// not called in that case.
func (gen *StorageMetricsInitGenerator) Next() (scheduler.Task, error) {
	// seed on first use and re-seed at the start of a fresh iteration (after Reset)
	if gen.rand == nil || (gen.lastRepo == "" && gen.nextRun.IsZero()) {
		gen.rand = rand.New(rand.NewSource(time.Now().UTC().UnixNano())) //nolint: gosec
	}

	delay := 0
	if gen.MaxDelay > 0 {
		delay = gen.rand.Intn(gen.MaxDelay)
	}

	gen.nextRun = time.Now().Add(time.Duration(delay) * time.Second)

	repo, err := gen.ImgStore.GetNextRepository(gen.lastRepo)
	if err != nil {
		return nil, err
	}

	gen.Log.Debug().Str("repo", repo).Int("randomDelay", delay).Msg("StorageMetricsInitGenerator")

	// an empty repo name signals that there is nothing left to visit
	if repo == "" {
		gen.done = true

		return nil, nil
	}

	gen.lastRepo = repo

	return NewStorageMetricsTask(gen.ImgStore, gen.Metrics, repo), nil
}
// IsDone reports whether Next has already run out of repositories.
func (gen *StorageMetricsInitGenerator) IsDone() bool {
	finished := gen.done

	return finished
}
// IsReady reports whether the instant stored in nextRun has passed.
func (gen *StorageMetricsInitGenerator) IsReady() bool {
	now := time.Now()

	return gen.nextRun.Before(now)
}
// Reset clears the iteration cursor, the done flag and the next-run time so
// that the following Next call starts again from the first repository.
func (gen *StorageMetricsInitGenerator) Reset() {
	gen.lastRepo, gen.done, gen.nextRun = "", false, time.Time{}
}
// smTask is the scheduler task that records the storage usage metric of a
// single repository.
type smTask struct {
// imgStore supplies the root directory under which the repo lives.
imgStore storageTypes.ImageStore
// metrics is the metric server the usage is reported to.
metrics monitoring.MetricServer
// repo is the name of the repository to measure.
repo string
}
// NewStorageMetricsTask builds the task that reports the storage usage of
// repo, located in imgStore, to the given metric server.
func NewStorageMetricsTask(imgStore storageTypes.ImageStore, metrics monitoring.MetricServer, repo string,
) *smTask {
	task := &smTask{
		imgStore: imgStore,
		metrics:  metrics,
		repo:     repo,
	}

	return task
}
// DoWork records the storage usage of the task's repository through
// monitoring.SetStorageUsage. It always returns nil.
func (smt *smTask) DoWork(ctx context.Context) error {
	rootDir := smt.imgStore.RootDir()

	monitoring.SetStorageUsage(smt.metrics, rootDir, smt.repo)

	return nil
}
+20 -2
View File
@@ -488,7 +488,10 @@ func (is *ImageStore) PutImageManifest(repo, reference, mediaType string, //noli
is.Unlock(&lockLatency)
if err == nil {
monitoring.SetStorageUsage(is.metrics, is.rootDir, repo)
if is.storeDriver.Name() == storageConstants.LocalStorageDriverName {
monitoring.SetStorageUsage(is.metrics, is.rootDir, repo)
}
monitoring.IncUploadCounter(is.metrics, repo)
}
}()
@@ -621,7 +624,11 @@ func (is *ImageStore) DeleteImageManifest(repo, reference string, detectCollisio
}
func (is *ImageStore) deleteImageManifest(repo, reference string, detectCollisions bool) error {
defer monitoring.SetStorageUsage(is.metrics, is.rootDir, repo)
defer func() {
if is.storeDriver.Name() == storageConstants.LocalStorageDriverName {
monitoring.SetStorageUsage(is.metrics, is.rootDir, repo)
}
}()
index, err := common.GetIndex(is, repo, is.log)
if err != nil {
@@ -1929,6 +1936,17 @@ func (is *ImageStore) RunDedupeBlobs(interval time.Duration, sch *scheduler.Sche
sch.SubmitGenerator(generator, interval, scheduler.MediumPriority)
}
// PopulateStorageMetrics submits, at low priority, a generator that emits one
// storage-usage task per repository of this image store, with up to 15
// seconds of random delay between consecutive tasks.
func (is *ImageStore) PopulateStorageMetrics(interval time.Duration, sch *scheduler.Scheduler) {
	// exclusive upper bound, in seconds, of the random delay between tasks
	const maxDelaySeconds = 15

	metricsGenerator := new(common.StorageMetricsInitGenerator)
	metricsGenerator.ImgStore = is
	metricsGenerator.Metrics = is.metrics
	metricsGenerator.Log = is.log
	metricsGenerator.MaxDelay = maxDelaySeconds

	sch.SubmitGenerator(metricsGenerator, interval, scheduler.LowPriority)
}
type blobStream struct {
reader io.Reader
closer io.Closer
+1
View File
@@ -61,6 +61,7 @@ type ImageStore interface { //nolint:interfacebloat
RunDedupeForDigest(digest godigest.Digest, dedupe bool, duplicateBlobs []string) error
GetNextDigestWithBlobPaths(repos []string, lastDigests []godigest.Digest) (godigest.Digest, []string, error)
GetAllBlobs(repo string) ([]string, error)
PopulateStorageMetrics(interval time.Duration, sch *scheduler.Scheduler)
}
type Driver interface { //nolint:interfacebloat
+7
View File
@@ -55,6 +55,7 @@ type MockedImageStore struct {
GetAllBlobsFn func(repo string) ([]string, error)
CleanupRepoFn func(repo string, blobs []godigest.Digest, removeRepo bool) (int, error)
PutIndexContentFn func(repo string, index ispec.Index) error
PopulateStorageMetricsFn func(interval time.Duration, sch *scheduler.Scheduler)
}
func (is MockedImageStore) Lock(t *time.Time) {
@@ -405,3 +406,9 @@ func (is MockedImageStore) PutIndexContent(repo string, index ispec.Index) error
return nil
}
// PopulateStorageMetrics forwards to PopulateStorageMetricsFn when the mock
// has one configured and is a no-op otherwise.
func (is MockedImageStore) PopulateStorageMetrics(interval time.Duration, sch *scheduler.Scheduler) {
	hook := is.PopulateStorageMetricsFn
	if hook == nil {
		return
	}

	hook(interval, sch)
}