Files
zot/cmd/zb/perf.go
T
Vishwas Rajashekar 9c7e77e12a feat(zb): list tests, test regex filter, docs update (#3884)
feat(zb): list tests and test regex filter + misc

This change introduces the following features to zb.

Test Filtering
===============
Allows users to selectively run tests by specifying
a standard regex that matches on the name of the test.

Test Listing
===============
Allows users to list out the available tests as well as
the matched tests when using the regex filter.

The documentation README has also been updated with
examples and the command help.

The documentation for skip cleanup has been updated.

Signed-off-by: Vishwas Rajashekar <dev@vrajashkr.com>
2026-03-21 11:43:17 +02:00

897 lines
19 KiB
Go

package main
import (
crand "crypto/rand"
"crypto/tls"
"fmt"
"log"
"math/big"
"net"
"net/http"
urlparser "net/url"
"os"
"path"
"regexp"
"sort"
"strings"
"sync"
"text/tabwriter"
"time"
jsoniter "github.com/json-iterator/go"
godigest "github.com/opencontainers/go-digest"
"gopkg.in/resty.v1"
"zotregistry.dev/zot/v2/pkg/api/constants"
)
const (
	// binary size units
	KiB = 1 << 10
	MiB = 1 << 20
	GiB = 1 << 30

	// maxSize is the exclusive upper bound on the blob file sizes generated by setup (1GiB).
	maxSize = GiB

	// file and directory permissions used for the generated test data
	defaultDirPerms  = 0o700
	defaultFilePerms = 0o600

	defaultSchemaVersion = 2

	// blob sizes referenced by the test suite
	smallBlob  = MiB
	mediumBlob = 10 * MiB
	largeBlob  = 100 * MiB

	// cicdFmt is the output format that additionally writes a JSON summary file.
	cicdFmt = "ci-cd"
	// secureProtocol is the URL scheme for which TLS settings are applied to clients.
	secureProtocol = "https"

	// HTTP client tuning
	httpKeepAlive = 30 * time.Second

	// maxSourceIPs caps how many addresses are expanded from a source CIDR.
	maxSourceIPs = 1000

	httpTimeout         = 30 * time.Second
	TLSHandshakeTimeout = 10 * time.Second
)
// blobHash maps the path of every blob file created by setup to the digest of
// its content.
//
//nolint:gochecknoglobals
var blobHash = map[string]godigest.Digest{}

// statusRequests holds shared request counters keyed by category (for example
// "Pull", "Push", "1MB") that printStats reports for mixed tests.
//
//nolint:gochecknoglobals // used only in this test
var statusRequests sync.Map
// setup creates workingDir and writes one blob file per size 1MiB, 10MiB,
// 100MiB, ... (each ten times the previous) while the size stays below
// maxSize. Each file is zero-filled except for a random first page, so every
// run produces different blob content. The digest of each file is recorded in
// blobHash under the file's path. Any I/O failure is fatal.
func setup(workingDir string) {
	// Best-effort: if the directory is unusable, creating the first blob
	// file below fails fatally anyway.
	_ = os.MkdirAll(workingDir, defaultDirPerms)

	const multiplier = 10

	const rndPageSize = 4 * KiB

	// reused for every file and refilled with fresh random bytes each time
	rnd := make([]byte, rndPageSize)

	for size := 1 * MiB; size < maxSize; size *= multiplier {
		fname := path.Join(workingDir, fmt.Sprintf("%d.blob", size))

		fhandle, err := os.OpenFile(fname, os.O_RDWR|os.O_CREATE|os.O_TRUNC, defaultFilePerms)
		if err != nil {
			log.Fatal(err)
		}

		// extend the file to its full size; everything past the random page reads back as zeros
		if err := fhandle.Truncate(int64(size)); err != nil {
			log.Fatal(err)
		}

		// write a random first page so every test run has different blob content
		if _, err := crand.Read(rnd); err != nil {
			log.Fatal(err)
		}

		if _, err := fhandle.WriteAt(rnd, 0); err != nil {
			log.Fatal(err)
		}

		// pre-compute the digest from the same handle instead of reopening the file
		if _, err := fhandle.Seek(0, 0); err != nil {
			log.Fatal(err)
		}

		digest, err := godigest.FromReader(fhandle)
		if err != nil {
			log.Fatal(err)
		}

		// Close now rather than deferring: a defer inside this loop would keep
		// every file open until setup returns.
		if err := fhandle.Close(); err != nil {
			log.Fatal(err)
		}

		blobHash[fname] = digest
	}
}
// teardown deletes workingDir together with everything inside it.
// Cleanup is best-effort: the error from os.RemoveAll is deliberately ignored.
func teardown(workingDir string) {
	if err := os.RemoveAll(workingDir); err != nil {
		return
	}
}
// statistics handling.

// Durations is a list of latencies that implements sort.Interface, ordering
// from shortest to longest.
type Durations []time.Duration

// Len reports how many latencies the list holds.
func (d Durations) Len() int {
	return len(d)
}

// Swap exchanges the latencies at positions i and j.
func (d Durations) Swap(i, j int) {
	d[j], d[i] = d[i], d[j]
}

// Less reports whether the latency at i is shorter than the one at j.
func (d Durations) Less(i, j int) bool {
	return d[j] > d[i]
}
// statsSummary aggregates the statsRecords collected for a single test.
type statsSummary struct {
	latencies []time.Duration // one entry per record, in arrival order
	name      string
	// min and max are -1 until the first record is seen; total is the wall
	// time of the whole test
	min, max, total time.Duration
	statusHist      map[string]int // counts per status class: "2xx".."5xx"
	rps             float32
	// flags that make printStats report the shared statusRequests counters
	mixedSize, mixedType bool
	errorCount           int            // records flagged isConnFail or isErr
	errors               map[string]int // occurrences per distinct error message
}

// newStatsSummary returns an empty summary for the test called name, with
// min/max marked as unset and its maps ready for use.
func newStatsSummary(name string) statsSummary {
	return statsSummary{
		name:       name,
		min:        -1,
		max:        -1,
		statusHist: make(map[string]int),
		// must be allocated: updateStats writes to it for every record carrying an error
		errors: make(map[string]int),
	}
}

// statsRecord describes the outcome of a single measured request; test
// functions send one over the stats channel.
type statsRecord struct {
	latency    time.Duration
	statusCode int // stays 0 when no response was received
	isConnFail bool
	isErr      bool
	err        error
}

// updateStats folds record into summary: failure counts, per-message error
// counts, min/max latency, the status-class histogram and the latency list.
func updateStats(summary *statsSummary, record statsRecord) {
	if record.isConnFail || record.isErr {
		summary.errorCount++
	}

	if record.err != nil {
		// writing to a nil map panics; summaries not built by newStatsSummary have none
		if summary.errors == nil {
			summary.errors = make(map[string]int)
		}

		summary.errors[record.err.Error()]++
	}

	// a negative min/max means "no sample yet"
	if summary.min < 0 || record.latency < summary.min {
		summary.min = record.latency
	}

	if summary.max < 0 || record.latency > summary.max {
		summary.max = record.latency
	}

	// Bucket by status class so every code from 200 to 599 is counted
	// (including e.g. 204 No Content and 206 Partial Content). A zero status
	// (no response), 1xx and out-of-range codes are not counted.
	var class string

	switch record.statusCode / 100 {
	case 2:
		class = "2xx"
	case 3:
		class = "3xx"
	case 4:
		class = "4xx"
	case 5:
		class = "5xx"
	}

	if class != "" {
		if summary.statusHist == nil {
			summary.statusHist = make(map[string]int)
		}

		summary.statusHist[class]++
	}

	summary.latencies = append(summary.latencies, record.latency)
}
// cicdTestSummary is one entry of the JSON report written when the ci-cd
// output format is selected. printStats appends one per test; Perf marshals
// the collected entries.
type cicdTestSummary struct {
	Name  string `json:"name"`
	Unit  string `json:"unit"`
	Value any    `json:"value"`
	Range string `json:"range,omitempty"` // omitted from the JSON when empty
}
// manifestStruct bundles the manifests pushed before a pull test. Pull and
// MixedPullAndPush pass it to pullAndCollect.
type manifestStruct struct {
	// manifestHash is filled when a single image of config.size was pushed.
	manifestHash map[string]string
	// manifestBySizeHash is filled in mixed-size mode and keyed by size index
	// (0 small, 1 medium, 2 large).
	manifestBySizeHash map[int]map[string]string
}
// cicdSummary collects one entry per executed test for the ci-cd JSON report.
// printStats appends the entries and Perf marshals them. It starts as an
// empty, non-nil slice so that a report with no entries marshals as [] rather
// than null.
//
//nolint:gochecknoglobals // used only in this test
var cicdSummary = []cicdTestSummary{}
// printStats logs the results of one test: timing, request counts, per-error
// counts, the shared statusRequests counters (for mixed-size / mixed-type
// tests), the status histogram and latency percentiles. When outFmt is
// cicdFmt, the requests-per-second figure is also appended to cicdSummary.
//
// summary.latencies must already be sorted in ascending order, because
// percentiles are read by index.
func printStats(requests int, summary *statsSummary, outFmt string) {
	// map iteration order is random; report keys sorted so runs are comparable
	sortedKeys := func(counts map[string]int) []string {
		keys := make([]string, 0, len(counts))
		for key := range counts {
			keys = append(keys, key)
		}

		sort.Strings(keys)

		return keys
	}

	log.Printf("============\n")
	log.Printf("Test name:\t%s", summary.name)
	log.Printf("Time taken for tests:\t%v", summary.total)
	log.Printf("Requests per second:\t%v", summary.rps)
	log.Printf("Complete requests:\t%v", requests-summary.errorCount)
	log.Printf("Failed requests:\t%v", summary.errorCount)

	for _, errStr := range sortedKeys(summary.errors) {
		log.Printf("Error %s count:\t%d", errStr, summary.errors[errStr])
	}

	log.Printf("\n")

	if summary.mixedSize {
		for _, size := range []string{"1MB", "10MB", "100MB"} {
			current := loadOrStore(&statusRequests, size, 0)
			log.Printf("%s:\t%v", size, current)
		}

		log.Printf("\n")
	}

	if summary.mixedType {
		pull := loadOrStore(&statusRequests, "Pull", 0)
		log.Printf("Pull:\t%v", pull)

		push := loadOrStore(&statusRequests, "Push", 0)
		log.Printf("Push:\t%v", push)

		log.Printf("\n")
	}

	for _, class := range sortedKeys(summary.statusHist) {
		log.Printf("%s responses:\t%v", class, summary.statusHist[class])
	}

	log.Printf("\n")
	log.Printf("min: %v", summary.min)
	log.Printf("max: %v", summary.max)

	// Index by the number of latencies actually collected rather than by
	// requests, so a short or empty sample cannot cause an out-of-range panic.
	if count := len(summary.latencies); count > 0 {
		log.Printf("%s:\t%v", "p50", summary.latencies[count/2])
		log.Printf("%s:\t%v", "p75", summary.latencies[count*3/4])
		log.Printf("%s:\t%v", "p90", summary.latencies[count*9/10])
		log.Printf("%s:\t%v", "p99", summary.latencies[count*99/100])
	}

	log.Printf("\n")

	// ci/cd
	if outFmt == cicdFmt {
		cicdSummary = append(cicdSummary,
			cicdTestSummary{
				Name:  summary.name,
				Unit:  "requests per sec",
				Value: summary.rps,
				Range: "3",
			},
		)
	}
}
// test suites/funcs.

// testFunc is the common signature of the benchmark tests listed in testSuite.
// Perf calls it once per worker goroutine, passing that worker's share of the
// requests, the test's own testConfig, a stats channel shared by all workers,
// and a client created for the worker. Implementations report measured
// requests as statsRecords on statsCh. They delete the repositories they
// created unless skipCleanup is set. A returned error is treated as fatal by
// Perf.
type testFunc func(
	workdir, url, repo string,
	requests int,
	config testConfig,
	statsCh chan statsRecord,
	client *resty.Client,
	skipCleanup bool,
) error
// GetCatalog first pushes `requests` images without measuring them. It then
// times `requests` GET requests against the catalog endpoint and sends one
// statsRecord per request on statsCh. A request fails on a transport error
// (isConnFail) or on any status other than 200 OK (isErr). The pushed
// repositories are deleted afterwards unless skipCleanup is set.
//
//nolint:gosec
func GetCatalog(
	workdir, url, repo string,
	requests int,
	config testConfig,
	statsCh chan statsRecord,
	client *resty.Client,
	skipCleanup bool,
) error {
	var repos []string

	// Clear is safe for concurrent use, unlike reassigning the shared
	// sync.Map while other worker goroutines may be using it.
	statusRequests.Clear()

	// populate the registry first; these pushes are not measured
	for range requests {
		var err error

		_, repos, err = pushMonolithImage(workdir, url, repo, repos, config, client)
		if err != nil {
			return err
		}
	}

	catalogURL := url + constants.RoutePrefix + constants.ExtCatalogPrefix

	for range requests {
		start := time.Now()

		resp, err := client.R().Get(catalogURL)

		record := statsRecord{latency: time.Since(start), err: err}

		if err != nil {
			// no response was received
			record.isConnFail = true
		} else {
			record.statusCode = resp.StatusCode()
			// any status other than 200 OK counts as a failed request
			record.isErr = record.statusCode != http.StatusOK
		}

		statsCh <- record
	}

	if skipCleanup {
		return nil
	}

	return deleteTestRepo(repos, url, client)
}
// PushMonolithStreamed runs `requests` monolithic image pushes through
// pushMonolithAndCollect, which is handed statsCh for reporting. The pushed
// repositories are deleted afterwards unless skipCleanup is set.
func PushMonolithStreamed(
	workdir, url, trepo string,
	requests int,
	config testConfig,
	statsCh chan statsRecord,
	client *resty.Client,
	skipCleanup bool,
) error {
	var repos []string

	if config.mixedSize {
		// Clear is safe for concurrent use, unlike reassigning the shared
		// sync.Map while other worker goroutines may be using it.
		statusRequests.Clear()
	}

	for count := range requests {
		repos = pushMonolithAndCollect(workdir, url, trepo, count,
			repos, config, client, statsCh)
	}

	if skipCleanup {
		return nil
	}

	return deleteTestRepo(repos, url, client)
}
// PushChunkStreamed runs `requests` chunked image pushes through
// pushChunkAndCollect, which is handed statsCh for reporting. The pushed
// repositories are deleted afterwards unless skipCleanup is set.
func PushChunkStreamed(
	workdir, url, trepo string,
	requests int,
	config testConfig,
	statsCh chan statsRecord,
	client *resty.Client,
	skipCleanup bool,
) error {
	var repos []string

	if config.mixedSize {
		// Clear is safe for concurrent use, unlike reassigning the shared
		// sync.Map while other worker goroutines may be using it.
		statusRequests.Clear()
	}

	for count := range requests {
		repos = pushChunkAndCollect(workdir, url, trepo, count,
			repos, config, client, statsCh)
	}

	if skipCleanup {
		return nil
	}

	return deleteTestRepo(repos, url, client)
}
// Pull pushes the image(s) to download without measuring them, then runs
// `requests` pulls through pullAndCollect, which is handed statsCh.
//
// In mixed-size mode one image per blob size is pushed and stored in
// manifestBySizeHash under index 0 (small), 1 (medium) and 2 (large).
// Otherwise a single image of config.size is pushed. The repositories are
// deleted afterwards unless skipCleanup is set.
func Pull(
	workdir, url, trepo string,
	requests int,
	config testConfig,
	statsCh chan statsRecord,
	client *resty.Client,
	skipCleanup bool,
) error {
	var (
		repos        []string
		manifestHash map[string]string
		err          error
	)

	manifestBySizeHash := make(map[int]map[string]string)

	if config.mixedSize {
		// Clear is safe for concurrent use, unlike reassigning the shared
		// sync.Map while other worker goroutines may be using it.
		statusRequests.Clear()

		// the slice position becomes the key in manifestBySizeHash
		for sizeIdx, size := range []int{smallBlob, mediumBlob, largeBlob} {
			config.size = size

			var manifestBySize map[string]string

			// Assign with '=' (not ':='), so the pushed repos land in the outer
			// slice and are removed by the cleanup below.
			manifestBySize, repos, err = pushMonolithImage(workdir, url, trepo, repos, config, client)
			if err != nil {
				return err
			}

			manifestBySizeHash[sizeIdx] = manifestBySize
		}
	} else {
		// push a single image of the configured size
		manifestHash, repos, err = pushMonolithImage(workdir, url, trepo, repos, config, client)
		if err != nil {
			return err
		}
	}

	manifestItem := manifestStruct{
		manifestHash:       manifestHash,
		manifestBySizeHash: manifestBySizeHash,
	}

	// download image
	for range requests {
		repos = pullAndCollect(url, repos, manifestItem, config, client, statsCh)
	}

	if skipCleanup {
		return nil
	}

	return deleteTestRepo(repos, url, client)
}
// MixedPullAndPush pushes one image without measuring it. Then, for each of
// `requests` iterations, it uses flipFunc with config.probabilityRange to
// choose between pulling that image (index 0) and pushing a new one
// (index 1). Each choice is counted under the "Pull" or "Push" key of
// statusRequests. The repositories are deleted afterwards unless skipCleanup
// is set.
func MixedPullAndPush(
	workdir, url, trepo string,
	requests int,
	config testConfig,
	statsCh chan statsRecord,
	client *resty.Client,
	skipCleanup bool,
) error {
	// indices returned by flipFunc
	const (
		readTestIdx  = 0
		writeTestIdx = 1
	)

	var repos []string

	// Clear is safe for concurrent use, unlike reassigning the shared
	// sync.Map while other worker goroutines may be using it.
	statusRequests.Clear()

	// Push blob given size
	manifestHash, repos, err := pushMonolithImage(workdir, url, trepo, repos, config, client)
	if err != nil {
		return err
	}

	manifestItem := manifestStruct{
		manifestHash: manifestHash,
	}

	for count := range requests {
		switch flipFunc(config.probabilityRange) {
		case readTestIdx:
			repos = pullAndCollect(url, repos, manifestItem, config, client, statsCh)
			current := loadOrStore(&statusRequests, "Pull", 0)
			statusRequests.Store("Pull", current+1)
		case writeTestIdx:
			repos = pushMonolithAndCollect(workdir, url, trepo, count, repos, config, client, statsCh)
			current := loadOrStore(&statusRequests, "Push", 0)
			// store under the key that was read, so pushes are not reported as pulls
			statusRequests.Store("Push", current+1)
		}
	}

	if skipCleanup {
		return nil
	}

	return deleteTestRepo(repos, url, client)
}
// test driver.

// testConfig describes one entry of testSuite; it is passed by value to the
// entry's tfunc.
type testConfig struct {
	name  string   // shown in reports and by ListTests; matched by the test filter regex
	tfunc testFunc // implementation called by each Perf worker goroutine
	// test-specific params
	size             int       // blob size in bytes; 0 makes Perf report the test as mixed-size
	probabilityRange []float64 // weights built with normalizeProbabilityRange; MixedPullAndPush passes them to flipFunc
	// mixedSize: Pull pushes one image per blob size and the size counters are reported;
	// mixedType: printStats reports the Pull/Push counters
	mixedSize, mixedType bool
}
// testSuite lists every benchmark in the order Perf runs them. ListTests
// prints the names, and both functions match them against the optional test
// filter regex. Entries without a size are reported by Perf as mixed-size.
var testSuite = []testConfig{ //nolint:gochecknoglobals // used only in this test
	{
		name:             "Get Catalog",
		tfunc:            GetCatalog,
		probabilityRange: normalizeProbabilityRange([]float64{0.7, 0.2, 0.1}),
	},
	{
		name:  "Push Monolith 1MB",
		tfunc: PushMonolithStreamed,
		size:  smallBlob,
	},
	{
		name:  "Push Monolith 10MB",
		tfunc: PushMonolithStreamed,
		size:  mediumBlob,
	},
	{
		name:  "Push Monolith 100MB",
		tfunc: PushMonolithStreamed,
		size:  largeBlob,
	},
	{
		name:  "Push Chunk Streamed 1MB",
		tfunc: PushChunkStreamed,
		size:  smallBlob,
	},
	{
		name:  "Push Chunk Streamed 10MB",
		tfunc: PushChunkStreamed,
		size:  mediumBlob,
	},
	{
		name:  "Push Chunk Streamed 100MB",
		tfunc: PushChunkStreamed,
		size:  largeBlob,
	},
	{
		name:  "Pull 1MB",
		tfunc: Pull,
		size:  smallBlob,
	},
	{
		name:  "Pull 10MB",
		tfunc: Pull,
		size:  mediumBlob,
	},
	{
		name:  "Pull 100MB",
		tfunc: Pull,
		size:  largeBlob,
	},
	{
		name:             "Pull Mixed 20% 1MB, 70% 10MB, 10% 100MB",
		tfunc:            Pull,
		probabilityRange: normalizeProbabilityRange([]float64{0.2, 0.7, 0.1}),
		mixedSize:        true,
	},
	{
		name:             "Push Monolith Mixed 20% 1MB, 70% 10MB, 10% 100MB",
		tfunc:            PushMonolithStreamed,
		probabilityRange: normalizeProbabilityRange([]float64{0.2, 0.7, 0.1}),
		mixedSize:        true,
	},
	{
		name:             "Push Chunk Mixed 33% 1MB, 33% 10MB, 33% 100MB",
		tfunc:            PushChunkStreamed,
		probabilityRange: normalizeProbabilityRange([]float64{0.33, 0.33, 0.33}),
		mixedSize:        true,
	},
	{
		name:             "Pull 75% and Push 25% Mixed 1MB",
		tfunc:            MixedPullAndPush,
		size:             smallBlob,
		mixedType:        true,
		probabilityRange: normalizeProbabilityRange([]float64{0.75, 0.25}),
	},
	{
		name:             "Pull 75% and Push 25% Mixed 10MB",
		tfunc:            MixedPullAndPush,
		size:             mediumBlob,
		mixedType:        true,
		probabilityRange: normalizeProbabilityRange([]float64{0.75, 0.25}),
	},
	{
		name:             "Pull 75% and Push 25% Mixed 100MB",
		tfunc:            MixedPullAndPush,
		size:             largeBlob,
		mixedType:        true,
		probabilityRange: normalizeProbabilityRange([]float64{0.75, 0.25}),
	},
}
// ListTests logs the available test names with one on each line.
// When testRegex is not nil, only the tests that match the regex are listed.
// It configures the standard logger the same way Perf does: no flags, with
// output going to stdout through a tabwriter.
func ListTests(testRegex *regexp.Regexp) {
	out := tabwriter.NewWriter(os.Stdout, 0, 0, 1, ' ', tabwriter.TabIndent)

	log.SetFlags(0)
	log.SetOutput(out)

	// a tabwriter.Writer buffers its input and must be flushed when writing is done
	defer func() {
		_ = out.Flush()
	}()

	for _, tconfig := range testSuite {
		if testRegex == nil || testRegex.MatchString(tconfig.name) {
			log.Println(tconfig.name)
		}
	}
}
// Perf runs the test suite against the registry at url and logs per-test
// statistics.
//
// Every test whose name matches testRegex runs; all tests run when testRegex
// is nil. For each test, `concurrency` goroutines share `requests`
// iterations, each goroutine with its own HTTP client. A client is optionally
// bound to a random source address taken from srcIPs (comma-separated) or
// expanded from srcCIDR. With outFmt == cicdFmt, a JSON summary is written to
// "<outFmt>.json". Test data in workdir is removed before returning or
// exiting. The process exits with status 1 if any test recorded failed
// requests, and exits fatally on setup or test errors.
func Perf(
	workdir, url, auth, repo string,
	concurrency int, requests int,
	outFmt string, srcIPs string, srcCIDR string, skipCleanup bool,
	testRegex *regexp.Regexp,
) {
	json := jsoniter.ConfigCompatibleWithStandardLibrary

	// cleanup removes the test data at most once, even when it is triggered
	// from several goroutines or from both a fatal path and the normal exit.
	var teardownOnce sync.Once

	cleanup := func() {
		teardownOnce.Do(func() {
			teardown(workdir)
		})
	}

	// fatalWithCleanup calls teardown then logs fatal, ensuring cleanup happens before exit.
	fatalWithCleanup := func(err error) {
		cleanup()
		log.Fatal(err)
	}

	// logging: keep a handle on the tabwriter so buffered output can be flushed before exiting
	logOut := tabwriter.NewWriter(os.Stdout, 0, 0, 1, ' ', tabwriter.TabIndent)

	log.SetFlags(0)
	log.SetOutput(logOut)

	// Requests are split across the workers below; reject values that would
	// divide by zero or create a channel with negative capacity.
	if concurrency < 1 || requests < 0 {
		log.Fatalf("invalid parameters: concurrency must be >= 1 and requests >= 0 (got %d and %d)",
			concurrency, requests)
	}

	// common header
	log.Printf("Registry URL:\t%s", url)
	log.Printf("\n")
	log.Printf("Concurrency Level:\t%v", concurrency)
	log.Printf("Total requests:\t%v", requests)

	if workdir == "" {
		cwd, err := os.Getwd()
		if err != nil {
			log.Fatal("unable to get current working dir")
		}

		log.Printf("Working dir:\t%v", cwd)
	} else {
		log.Printf("Working dir:\t%v", workdir)
	}

	log.Printf("\n")

	// initialize test data
	log.Printf("Preparing test data ...\n")
	setup(workdir)
	log.Printf("Starting tests ...\n")

	zbError := false

	// get host ips from command line to make requests from
	var ips []string

	switch {
	case len(srcIPs) > 0:
		ips = strings.Split(srcIPs, ",")
	case len(srcCIDR) > 0:
		cidrIPs, err := getIPsFromCIDR(srcCIDR, maxSourceIPs)
		if err != nil {
			fatalWithCleanup(err)
		}

		ips = cidrIPs
	}

	for _, tconfig := range testSuite {
		if testRegex != nil && !testRegex.MatchString(tconfig.name) {
			log.Printf("Skipping test %s\n", tconfig.name)

			continue
		}

		statsCh := make(chan statsRecord, requests)
		summary := newStatsSummary(tconfig.name)

		// Split the requests so exactly `requests` iterations run in total. The
		// first requests%concurrency workers take one extra; plain integer
		// division would drop the remainder, leaving fewer records than are
		// read below.
		perWorker, remainder := requests/concurrency, requests%concurrency

		var wg sync.WaitGroup

		start := time.Now()

		for worker := range concurrency {
			workerRequests := perWorker
			if worker < remainder {
				workerRequests++
			}

			// parallelize with clients
			wg.Go(func() {
				httpClient, err := getRandomClientIPs(auth, url, ips)
				if err != nil {
					fatalWithCleanup(err)
				}

				err = tconfig.tfunc(workdir, url, repo, workerRequests, tconfig, statsCh, httpClient, skipCleanup)
				if err != nil {
					fatalWithCleanup(err)
				}
			})
		}

		wg.Wait()

		// every worker has returned, so nothing else is sent; closing lets the
		// drain below stop after exactly the records that were produced
		close(statsCh)

		summary.total = time.Since(start)
		summary.rps = float32(requests) / float32(summary.total.Seconds())
		summary.mixedSize = tconfig.mixedSize || tconfig.size == 0
		summary.mixedType = tconfig.mixedType

		for record := range statsCh {
			updateStats(&summary, record)
		}

		sort.Sort(Durations(summary.latencies))
		printStats(requests, &summary, outFmt)

		if summary.errorCount != 0 {
			zbError = true
		}
	}

	if outFmt == cicdFmt {
		jsonOut, err := json.Marshal(cicdSummary)
		if err != nil {
			fatalWithCleanup(err)
		}

		if err := os.WriteFile(outFmt+".json", jsonOut, defaultFilePerms); err != nil {
			fatalWithCleanup(err)
		}
	}

	// Cleanup before exit (sync.Once ensures it only runs once, even if fatalWithCleanup was called)
	cleanup()

	// os.Exit skips deferred calls, so flush buffered log output explicitly
	_ = logOut.Flush()

	if zbError {
		os.Exit(1)
	}
}
// getRandomClientIPs returns a resty client for url.
//
// A non-empty auth must have the form "user:password" and is sent as HTTP
// basic auth. When ips is non-empty, outgoing connections are bound to one
// address picked at random from it. For https URLs, certificate verification
// is disabled. Errors are returned rather than terminating the process.
func getRandomClientIPs(auth string, url string, ips []string) (*resty.Client, error) {
	parsedURL, err := urlparser.Parse(url)
	if err != nil {
		return nil, err
	}

	client := resty.New()

	if auth != "" {
		// Split on the first ':' only: RFC 7617 forbids ':' in the user-id but
		// allows it in the password.
		username, password, found := strings.Cut(auth, ":")
		if !found {
			return nil, fmt.Errorf("invalid credentials: expected the form user:password")
		}

		client.SetBasicAuth(username, password)
	}

	// get random ip client
	if len(ips) != 0 {
		nBig, err := crand.Int(crand.Reader, big.NewInt(int64(len(ips))))
		if err != nil {
			return nil, err
		}

		ip := strings.TrimSpace(ips[nBig.Int64()])

		// Port 0 lets the OS pick the local port; JoinHostPort brackets IPv6
		// literals, which plain ip+":0" would not.
		localAddr, err := net.ResolveTCPAddr("tcp", net.JoinHostPort(ip, "0"))
		if err != nil {
			return nil, err
		}

		client.SetTransport(&http.Transport{
			Proxy: http.ProxyFromEnvironment,
			DialContext: (&net.Dialer{
				Timeout:   httpTimeout,
				KeepAlive: httpKeepAlive,
				LocalAddr: localAddr,
			}).DialContext,
			TLSHandshakeTimeout: TLSHandshakeTimeout,
		})
	}

	if parsedURL.Scheme == secureProtocol {
		//nolint:gosec // certificate verification is intentionally disabled for https registries
		client.SetTLSClientConfig(&tls.Config{InsecureSkipVerify: true})
	}

	return client, nil
}
// getIPsFromCIDR returns host addresses from cidr.
//
// At most maxIPs addresses are enumerated, starting at the network address.
// The network address is then dropped. The last address is dropped only when
// enumeration reached the end of the network, i.e. when it really is the
// broadcast address. Networks with fewer than two host bits (IPv4 /31 and
// /32, IPv6 /127 and /128) are returned unchanged, since every address in
// them is usable (RFC 3021). Returns an error if cidr cannot be parsed.
func getIPsFromCIDR(cidr string, maxIPs int) ([]string, error) {
	//nolint:varnamelen
	ip, ipnet, err := net.ParseCIDR(cidr)
	if err != nil {
		return nil, err
	}

	var ips []string

	cur := ip.Mask(ipnet.Mask)
	for ; ipnet.Contains(cur) && len(ips) < maxIPs; inc(cur) {
		ips = append(ips, cur.String())
	}

	ones, bits := ipnet.Mask.Size()
	if bits-ones < 2 {
		return ips, nil
	}

	// remove network address
	if len(ips) > 0 {
		ips = ips[1:]
	}

	// Remove the broadcast address, but only if enumeration ran to the end of
	// the network; when capped by maxIPs, the last entry is an ordinary host.
	if reachedEnd := !ipnet.Contains(cur); reachedEnd && len(ips) > 0 {
		ips = ips[:len(ips)-1]
	}

	return ips, nil
}

// inc increments ip in place by one, treating it as a big-endian unsigned
// integer; the highest address wraps around to all zeros.
// https://go.dev/play/p/sdzcMvZYWnc
func inc(ip net.IP) {
	for j := len(ip) - 1; j >= 0; j-- {
		ip[j]++
		if ip[j] > 0 {
			break
		}
	}
}