feat(zb): Support for TTFB measurement + on-demand sync tests (#3929)

feat(zb): support for measuring TTFB + sync tests

Adds supports for measuring time to first byte (TTFB)
for Pull tests for the manifest check, the manifest get,
the config get, and the blob get.

Additionally, this introduces 2 new sync tests which measure
the performance of on-demand sync.

Setup code has been refactored to accomodate a new
blob size of 1GB.

Parts of zb have been refactored to address linter errors.

Signed-off-by: Vishwas Rajashekar <dev@vrajashkr.com>
This commit is contained in:
Vishwas Rajashekar
2026-04-24 21:33:11 +05:30
committed by GitHub
parent 9ba59559d2
commit 97b65b5b39
4 changed files with 543 additions and 191 deletions
+205 -76
View File
@@ -8,19 +8,20 @@ Usage:
zb <url> [flags]
Flags:
-A, --auth-creds string Use colon-separated BASIC auth creds
-c, --concurrency int Number of multiple requests to make at a time (default 1)
-h, --help help for zb
-l, --list-tests Print a list of all available tests. When used together with test regex, lists the tests that match the regex.
-o, --output-format string Output format of test results: stdout (default), json, ci-cd
-r, --repo string Use specified repo on remote registry for test data
-n, --requests int Number of requests to perform (default 1)
--skip-cleanup Skip clean up of pushed repos from remote registry after running benchmark (default false)
-s, --src-cidr string Use specified cidr to obtain ips to make requests from, src-ips and src-cidr are mutually exclusive
-i, --src-ips string Use colon-separated ips to make requests from, src-ips and src-cidr are mutually exclusive
-t, --test-regex string Optional regex for selectively running tests. If blank, all tests are run by default.
-v, --version Show the version and exit
-d, --working-dir string Use specified directory to store test data
-A, --auth-creds string Use colon-separated BASIC auth creds
-c, --concurrency int Number of multiple requests to make at a time (default 1)
-h, --help help for zb
-l, --list-tests Print a list of all available tests. When used together with test regex, lists the tests that match the regex.
-o, --output-format string Output format of test results: stdout (default), json, ci-cd
-r, --repo string Use specified repo on remote registry for test data
-n, --requests int Number of requests to perform (default 1)
--skip-cleanup Skip clean up of pushed repos from remote registry after running benchmark (default false)
-s, --src-cidr string Use specified cidr to obtain ips to make requests from, src-ips and src-cidr are mutually exclusive
-i, --src-ips string Use colon-separated ips to make requests from, src-ips and src-cidr are mutually exclusive
-t, --test-regex string Optional regex for selectively running tests. If blank, all tests are run by default.
-u, --upstream-server-url string Sets the upstream server URL for sync tests. Must be provided for sync tests.
-v, --version Show the version and exit
-d, --working-dir string Use specified directory to store test data
```
## Command example
@@ -35,14 +36,16 @@ docker run -net=host -it ghcr.io/project-zot/zb-linux-amd64:latest -c 2 -n 10 -s
## Command output
```console
$ zb -c 10 -n 1000 http://localhost:8080
Registry URL: http://localhost:8080
$ zb -c 2 -n 100 http://localhost:8080
Registry URL: http://localhost:8080
Concurrency Level: 2
Total requests: 100
Working dir:
Working dir: /home/user/test
Skipping test On-demand Sync 100MB
Skipping test On-demand Sync 1GB
Preparing test data ...
Starting tests ...
============
Test name: Get Catalog
Time taken for tests: 45.397205ms
@@ -98,6 +101,8 @@ Push Chunk Mixed 33% 1MB, 33% 10MB, 33% 100MB
Pull 75% and Push 25% Mixed 1MB
Pull 75% and Push 25% Mixed 10MB
Pull 75% and Push 25% Mixed 100MB
On-demand Sync 100MB
On-demand Sync 1GB
```
## List tests with Regex
@@ -112,31 +117,12 @@ Pull 1MB
```
$ zb --src-cidr 127.0.0.0/8 --test-regex "^Push Monolith 1MB$" http://localhost:9000
Registry URL: http://localhost:9000
Registry URL: http://localhost:9000
Concurrency Level: 1
Total requests: 1
Working dir: /home/darkaether/projects/github/zot
Working dir: /home/user/test
Preparing test data ...
Starting tests ...
Skipping test Get Catalog
============
Test name: Push Monolith 1MB
Time taken for tests: 18.700779ms
Requests per second: 53.47371
Complete requests: 1
Failed requests: 0
2xx responses: 1
min: 15.970773ms
max: 15.970773ms
p50: 15.970773ms
p75: 15.970773ms
p90: 15.970773ms
p99: 15.970773ms
Skipping test Push Monolith 10MB
Skipping test Push Monolith 100MB
Skipping test Push Chunk Streamed 1MB
@@ -151,58 +137,42 @@ Skipping test Push Chunk Mixed 33% 1MB, 33% 10MB, 33% 100MB
Skipping test Pull 75% and Push 25% Mixed 1MB
Skipping test Pull 75% and Push 25% Mixed 10MB
Skipping test Pull 75% and Push 25% Mixed 100MB
Skipping test On-demand Sync 100MB
Skipping test On-demand Sync 1GB
Preparing test data ...
Starting tests ...
============
Test name: Push Monolith 1MB
Time taken for tests: 20.821408ms
Requests per second: 48.027493
Complete requests: 1
Failed requests: 0
2xx responses: 1
min: 18.527633ms
max: 18.527633ms
p50: 18.527633ms
p75: 18.527633ms
p90: 18.527633ms
p99: 18.527633ms
```
## Selective test run with a push and corresponding pull
```
$ zb --src-cidr 127.0.0.0/8 --test-regex "^(Push Monolith|Pull) 1MB$" http://localhost:9000
Registry URL: http://localhost:9000
Registry URL: http://localhost:9000
Concurrency Level: 1
Total requests: 1
Working dir: /home/darkaether/projects/github/zot
Working dir: /home/user/test
Preparing test data ...
Starting tests ...
Skipping test Get Catalog
============
Test name: Push Monolith 1MB
Time taken for tests: 19.136523ms
Requests per second: 52.256096
Complete requests: 1
Failed requests: 0
2xx responses: 1
min: 16.496555ms
max: 16.496555ms
p50: 16.496555ms
p75: 16.496555ms
p90: 16.496555ms
p99: 16.496555ms
Skipping test Push Monolith 10MB
Skipping test Push Monolith 100MB
Skipping test Push Chunk Streamed 1MB
Skipping test Push Chunk Streamed 10MB
Skipping test Push Chunk Streamed 100MB
============
Test name: Pull 1MB
Time taken for tests: 17.836719ms
Requests per second: 56.06412
Complete requests: 1
Failed requests: 0
2xx responses: 1
min: 3.774833ms
max: 3.774833ms
p50: 3.774833ms
p75: 3.774833ms
p90: 3.774833ms
p99: 3.774833ms
Skipping test Pull 10MB
Skipping test Pull 100MB
Skipping test Pull Mixed 20% 1MB, 70% 10MB, 10% 100MB
@@ -211,6 +181,165 @@ Skipping test Push Chunk Mixed 33% 1MB, 33% 10MB, 33% 100MB
Skipping test Pull 75% and Push 25% Mixed 1MB
Skipping test Pull 75% and Push 25% Mixed 10MB
Skipping test Pull 75% and Push 25% Mixed 100MB
Skipping test On-demand Sync 100MB
Skipping test On-demand Sync 1GB
Preparing test data ...
Starting tests ...
============
Test name: Push Monolith 1MB
Time taken for tests: 21.497313ms
Requests per second: 46.51744
Complete requests: 1
Failed requests: 0
2xx responses: 1
min: 18.826599ms
max: 18.826599ms
p50: 18.826599ms
p75: 18.826599ms
p90: 18.826599ms
p99: 18.826599ms
============
Test name: Pull 1MB
Time taken for tests: 15.387887ms
Requests per second: 64.98618
Complete requests: 1
Failed requests: 0
2xx responses: 1
min: 2.343145ms
max: 2.343145ms
p50: 2.343145ms
p75: 2.343145ms
p90: 2.343145ms
p99: 2.343145ms
Manifest HEAD TTFB p50: 352.099µs
Manifest HEAD TTFB p75: 352.099µs
Manifest HEAD TTFB p90: 352.099µs
Manifest HEAD TTFB p99: 352.099µs
Manifest GET TTFB p50: 323.77µs
Manifest GET TTFB p75: 323.77µs
Manifest GET TTFB p90: 323.77µs
Manifest GET TTFB p99: 323.77µs
Config TTFB p50: 318.809µs
Config TTFB p75: 318.809µs
Config TTFB p90: 318.809µs
Config TTFB p99: 318.809µs
Layer TTFB p50: 219.679µs
Layer TTFB p75: 219.679µs
Layer TTFB p90: 219.679µs
Layer TTFB p99: 219.679µs
```
## Run on-demand sync tests
Sync tests require an upstream zot registry to be provided and the target zot instance must be configured with on-demand sync config that points to the upstream server.
If upstream registry is not provided, sync tests will be skipped.
```
$ zb --src-cidr 127.0.0.0/8 --test-regex "^On-demand Sync" --upstream-server-url http://localhost:9000 http://localhost:8080
Registry URL: http://localhost:8080
Upstream Registry URL: http://localhost:9000
Concurrency Level: 1
Total requests: 1
Working dir: /home/user/test
Skipping test Get Catalog
Skipping test Push Monolith 1MB
Skipping test Push Monolith 10MB
Skipping test Push Monolith 100MB
Skipping test Push Chunk Streamed 1MB
Skipping test Push Chunk Streamed 10MB
Skipping test Push Chunk Streamed 100MB
Skipping test Pull 1MB
Skipping test Pull 10MB
Skipping test Pull 100MB
Skipping test Pull Mixed 20% 1MB, 70% 10MB, 10% 100MB
Skipping test Push Monolith Mixed 20% 1MB, 70% 10MB, 10% 100MB
Skipping test Push Chunk Mixed 33% 1MB, 33% 10MB, 33% 100MB
Skipping test Pull 75% and Push 25% Mixed 1MB
Skipping test Pull 75% and Push 25% Mixed 10MB
Skipping test Pull 75% and Push 25% Mixed 100MB
Preparing test data ...
Starting tests ...
============
Test name: On-demand Sync 100MB
Time taken for tests: 1.444024183s
Requests per second: 0.6925092
Complete requests: 1
Failed requests: 0
2xx responses: 1
min: 550.943262ms
max: 550.943262ms
p50: 550.943262ms
p75: 550.943262ms
p90: 550.943262ms
p99: 550.943262ms
Manifest HEAD TTFB p50: 546.921878ms
Manifest HEAD TTFB p75: 546.921878ms
Manifest HEAD TTFB p90: 546.921878ms
Manifest HEAD TTFB p99: 546.921878ms
Manifest GET TTFB p50: 1.988577ms
Manifest GET TTFB p75: 1.988577ms
Manifest GET TTFB p90: 1.988577ms
Manifest GET TTFB p99: 1.988577ms
Config TTFB p50: 387.699µs
Config TTFB p75: 387.699µs
Config TTFB p90: 387.699µs
Config TTFB p99: 387.699µs
Layer TTFB p50: 232.091µs
Layer TTFB p75: 232.091µs
Layer TTFB p90: 232.091µs
Layer TTFB p99: 232.091µs
============
Test name: On-demand Sync 1GB
Time taken for tests: 16.783082396s
Requests per second: 0.05958381
Complete requests: 1
Failed requests: 0
2xx responses: 1
min: 5.175110487s
max: 5.175110487s
p50: 5.175110487s
p75: 5.175110487s
p90: 5.175110487s
p99: 5.175110487s
Manifest HEAD TTFB p50: 5.170570733s
Manifest HEAD TTFB p75: 5.170570733s
Manifest HEAD TTFB p90: 5.170570733s
Manifest HEAD TTFB p99: 5.170570733s
Manifest GET TTFB p50: 2.269987ms
Manifest GET TTFB p75: 2.269987ms
Manifest GET TTFB p90: 2.269987ms
Manifest GET TTFB p99: 2.269987ms
Config TTFB p50: 623.639µs
Config TTFB p75: 623.639µs
Config TTFB p90: 623.639µs
Config TTFB p99: 623.639µs
Layer TTFB p50: 439.369µs
Layer TTFB p75: 439.369µs
Layer TTFB p90: 439.369µs
Layer TTFB p99: 439.369µs
```
# References
+75 -8
View File
@@ -2,12 +2,14 @@ package main
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"log"
"math/rand"
"net/http"
"net/http/httptrace"
"net/url"
"os"
"path"
@@ -126,6 +128,8 @@ func pullAndCollect(url string, repos []string, manifestItem manifestStruct,
err error
)
timings := []perTagTiming{}
defer func() {
// send a stats record
statsCh <- statsRecord{
@@ -134,6 +138,7 @@ func pullAndCollect(url string, repos []string, manifestItem manifestStruct,
isConnFail: isConnFail,
isErr: isErr,
err: err,
timings: timings,
}
}()
@@ -148,9 +153,21 @@ func pullAndCollect(url string, repos []string, manifestItem manifestStruct,
var resp *resty.Response
tagTiming := perTagTiming{}
var manifestHeadFirstByte time.Time
manifestHeadReqStart := time.Now()
manifestHeadTrace := &httptrace.ClientTrace{
GotFirstResponseByte: func() {
manifestHeadFirstByte = time.Now()
},
}
// check manifest
resp, err = client.R().
SetHeader("Content-Type", "application/vnd.oci.image.manifest.v1+json").
SetContext(httptrace.WithClientTrace(context.Background(), manifestHeadTrace)).
Head(manifestLoc)
latency = time.Since(start)
@@ -169,9 +186,23 @@ func pullAndCollect(url string, repos []string, manifestItem manifestStruct,
return
}
if !manifestHeadFirstByte.IsZero() {
tagTiming.manifestHeadTTFB = manifestHeadFirstByte.Sub(manifestHeadReqStart)
}
var manifestGetFirstByte time.Time
manifestGetReqStart := time.Now()
manifestTrace := &httptrace.ClientTrace{
GotFirstResponseByte: func() {
manifestGetFirstByte = time.Now()
},
}
// send request and get the manifest
resp, err = client.R().
SetHeader("Content-Type", "application/vnd.oci.image.manifest.v1+json").
SetContext(httptrace.WithClientTrace(context.Background(), manifestTrace)).
Get(manifestLoc)
latency = time.Since(start)
@@ -190,6 +221,10 @@ func pullAndCollect(url string, repos []string, manifestItem manifestStruct,
return
}
if !manifestGetFirstByte.IsZero() {
tagTiming.manifestGetTTFB = manifestGetFirstByte.Sub(manifestGetReqStart)
}
manifestBody := resp.Body()
// file copy simulation
@@ -229,8 +264,18 @@ func pullAndCollect(url string, repos []string, manifestItem manifestStruct,
return
}
// send request and get the config
resp, err = client.R().Get(configLoc)
var configFirstByte time.Time
configReqStart := time.Now()
configTrace := &httptrace.ClientTrace{
GotFirstResponseByte: func() {
configFirstByte = time.Now()
},
}
resp, err = client.R().
SetContext(httptrace.WithClientTrace(context.Background(), configTrace)).
Get(configLoc)
latency = time.Since(start)
@@ -248,6 +293,10 @@ func pullAndCollect(url string, repos []string, manifestItem manifestStruct,
return
}
if !configFirstByte.IsZero() {
tagTiming.configTTFB = configFirstByte.Sub(configReqStart)
}
configBody := resp.Body()
// file copy simulation
@@ -283,8 +332,20 @@ func pullAndCollect(url string, repos []string, manifestItem manifestStruct,
return
}
// send request and get response the blob
resp, err = client.R().Get(blobLoc)
var blobFirstByte time.Time
blobReqStart := time.Now()
blobTrace := &httptrace.ClientTrace{
GotFirstResponseByte: func() {
blobFirstByte = time.Now()
},
}
// send request and stream the blob directly to discard without buffering
resp, err = client.R().
SetDoNotParseResponse(true).
SetContext(httptrace.WithClientTrace(context.Background(), blobTrace)).
Get(blobLoc)
latency = time.Since(start)
@@ -298,18 +359,24 @@ func pullAndCollect(url string, repos []string, manifestItem manifestStruct,
statusCode = resp.StatusCode()
if statusCode != http.StatusOK {
isErr = true
resp.RawBody().Close()
return
}
blobBody := resp.Body()
if !blobFirstByte.IsZero() {
tagTiming.layersTTFB = append(tagTiming.layersTTFB, blobFirstByte.Sub(blobReqStart))
}
// file copy simulation
_, err = io.Copy(io.Discard, bytes.NewReader(blobBody))
if err != nil {
// drain response body without buffering in memory
if _, err = io.Copy(io.Discard, resp.RawBody()); err != nil {
log.Fatal(err)
}
resp.RawBody().Close()
}
timings = append(timings, tagTiming)
}
}()
+7 -2
View File
@@ -15,7 +15,7 @@ import (
func NewPerfRootCmd() *cobra.Command {
showVersion := false
var auth, workdir, repo, outFmt, srcIPs, srcCIDR, testRegexStr string
var auth, workdir, repo, outFmt, srcIPs, srcCIDR, testRegexStr, upstreamServerURL string
var concurrency, requests int
@@ -67,7 +67,10 @@ func NewPerfRootCmd() *cobra.Command {
requests = concurrency * (requests / concurrency)
Perf(workdir, url, auth, repo, concurrency, requests, outFmt, srcIPs, srcCIDR, skipCleanup, testRegex)
Perf(
workdir, url, auth, repo, concurrency, requests, outFmt,
srcIPs, srcCIDR, skipCleanup, testRegex, upstreamServerURL,
)
},
}
@@ -93,6 +96,8 @@ func NewPerfRootCmd() *cobra.Command {
"Optional regex for selectively running tests. If blank, all tests are run by default.")
rootCmd.Flags().BoolVarP(&listTests, "list-tests", "l", false,
"Print a list of all available tests. When used together with test regex, lists the tests that match the regex.")
rootCmd.Flags().StringVarP(&upstreamServerURL, "upstream-server-url", "u", "",
"Sets the upstream server URL for sync tests. Must be provided for sync tests.")
// "version"
rootCmd.Flags().BoolVarP(&showVersion, "version", "v", false, "Show the version and exit")
+256 -105
View File
@@ -29,13 +29,13 @@ const (
KiB = 1 * 1024
MiB = 1 * KiB * 1024
GiB = 1 * MiB * 1024
maxSize = 1 * GiB // 1GiB
defaultDirPerms = 0o700
defaultFilePerms = 0o600
defaultSchemaVersion = 2
smallBlob = 1 * MiB
mediumBlob = 10 * MiB
largeBlob = 100 * MiB
superLargeBlob = 1 * GiB
cicdFmt = "ci-cd"
secureProtocol = "https"
httpKeepAlive = 30 * time.Second
@@ -50,14 +50,12 @@ var blobHash map[string]godigest.Digest = map[string]godigest.Digest{}
//nolint:gochecknoglobals // used only in this test
var statusRequests sync.Map
func setup(workingDir string) {
func setup(workingDir string, sizesToPrepare []int) {
_ = os.MkdirAll(workingDir, defaultDirPerms)
const multiplier = 10
const rndPageSize = 4 * KiB
for size := 1 * MiB; size < maxSize; size *= multiplier {
for _, size := range sizesToPrepare {
fname := path.Join(workingDir, fmt.Sprintf("%d.blob", size))
fhandle, err := os.OpenFile(fname, os.O_RDWR|os.O_CREATE|os.O_TRUNC, defaultFilePerms)
@@ -129,6 +127,10 @@ type statsSummary struct {
mixedSize, mixedType bool
errorCount int
errors map[string]int
manifestHeadTTFBs []time.Duration
manifestGetTTFBs []time.Duration
configTTFBs []time.Duration
layerTTFBs []time.Duration
}
func newStatsSummary(name string) statsSummary {
@@ -139,17 +141,28 @@ func newStatsSummary(name string) statsSummary {
statusHist: make(map[string]int),
mixedSize: false,
mixedType: false,
errors: make(map[string]int),
}
return summary
}
type perTagTiming struct {
manifestHeadTTFB time.Duration
manifestGetTTFB time.Duration
configTTFB time.Duration
layersTTFB []time.Duration
}
type statsRecord struct {
latency time.Duration
statusCode int
isConnFail bool
isErr bool
err error
// sync test specific items
timings []perTagTiming
}
func updateStats(summary *statsSummary, record statsRecord) {
@@ -194,6 +207,22 @@ func updateStats(summary *statsSummary, record statsRecord) {
}
summary.latencies = append(summary.latencies, record.latency)
for _, timing := range record.timings {
if timing.manifestHeadTTFB > 0 {
summary.manifestHeadTTFBs = append(summary.manifestHeadTTFBs, timing.manifestHeadTTFB)
}
if timing.manifestGetTTFB > 0 {
summary.manifestGetTTFBs = append(summary.manifestGetTTFBs, timing.manifestGetTTFB)
}
if timing.configTTFB > 0 {
summary.configTTFBs = append(summary.configTTFBs, timing.configTTFB)
}
summary.layerTTFBs = append(summary.layerTTFBs, timing.layersTTFB...)
}
}
type cicdTestSummary struct {
@@ -208,10 +237,7 @@ type manifestStruct struct {
manifestBySizeHash map[int](map[string]string)
}
//nolint:gochecknoglobals // used only in this test
var cicdSummary = []cicdTestSummary{}
func printStats(requests int, summary *statsSummary, outFmt string) {
func printStats(requests int, summary *statsSummary) {
log.Printf("============\n")
log.Printf("Test name:\t%s", summary.name)
log.Printf("Time taken for tests:\t%v", summary.total)
@@ -253,6 +279,7 @@ func printStats(requests int, summary *statsSummary, outFmt string) {
}
log.Printf("\n")
sort.Sort(Durations(summary.latencies))
log.Printf("min: %v", summary.min)
log.Printf("max: %v", summary.max)
log.Printf("%s:\t%v", "p50", summary.latencies[requests/2])
@@ -261,38 +288,62 @@ func printStats(requests int, summary *statsSummary, outFmt string) {
log.Printf("%s:\t%v", "p99", summary.latencies[requests*99/100])
log.Printf("\n")
// ci/cd
if outFmt == cicdFmt {
cicdSummary = append(cicdSummary,
cicdTestSummary{
Name: summary.name,
Unit: "requests per sec",
Value: summary.rps,
Range: "3",
},
)
if len(summary.manifestHeadTTFBs) > 0 {
sort.Sort(Durations(summary.manifestHeadTTFBs))
n := len(summary.manifestHeadTTFBs)
log.Printf("Manifest HEAD TTFB p50:\t%v", summary.manifestHeadTTFBs[n/2])
log.Printf("Manifest HEAD TTFB p75:\t%v", summary.manifestHeadTTFBs[n*3/4])
log.Printf("Manifest HEAD TTFB p90:\t%v", summary.manifestHeadTTFBs[n*9/10])
log.Printf("Manifest HEAD TTFB p99:\t%v", summary.manifestHeadTTFBs[n*99/100])
log.Printf("\n")
}
if len(summary.manifestGetTTFBs) > 0 {
sort.Sort(Durations(summary.manifestGetTTFBs))
n := len(summary.manifestGetTTFBs)
log.Printf("Manifest GET TTFB p50:\t%v", summary.manifestGetTTFBs[n/2])
log.Printf("Manifest GET TTFB p75:\t%v", summary.manifestGetTTFBs[n*3/4])
log.Printf("Manifest GET TTFB p90:\t%v", summary.manifestGetTTFBs[n*9/10])
log.Printf("Manifest GET TTFB p99:\t%v", summary.manifestGetTTFBs[n*99/100])
log.Printf("\n")
}
if len(summary.configTTFBs) > 0 {
sort.Sort(Durations(summary.configTTFBs))
n := len(summary.configTTFBs)
log.Printf("Config TTFB p50:\t%v", summary.configTTFBs[n/2])
log.Printf("Config TTFB p75:\t%v", summary.configTTFBs[n*3/4])
log.Printf("Config TTFB p90:\t%v", summary.configTTFBs[n*9/10])
log.Printf("Config TTFB p99:\t%v", summary.configTTFBs[n*99/100])
log.Printf("\n")
}
if len(summary.layerTTFBs) > 0 {
sort.Sort(Durations(summary.layerTTFBs))
n := len(summary.layerTTFBs)
log.Printf("Layer TTFB p50:\t%v", summary.layerTTFBs[n/2])
log.Printf("Layer TTFB p75:\t%v", summary.layerTTFBs[n*3/4])
log.Printf("Layer TTFB p90:\t%v", summary.layerTTFBs[n*9/10])
log.Printf("Layer TTFB p99:\t%v", summary.layerTTFBs[n*99/100])
log.Printf("\n")
}
}
// test suites/funcs.
type testFunc func(
workdir, url, repo string,
requests int,
config testConfig,
suiteCfg testSuiteCfg,
statsCh chan statsRecord,
client *resty.Client,
skipCleanup bool,
) error
//nolint:gosec
func GetCatalog(
workdir, url, repo string,
requests int,
config testConfig,
suiteCfg testSuiteCfg,
statsCh chan statsRecord,
client *resty.Client,
skipCleanup bool,
) error {
var repos []string
@@ -300,15 +351,15 @@ func GetCatalog(
statusRequests = sync.Map{}
for range requests {
for range suiteCfg.requests {
// Push random blob
_, repos, err = pushMonolithImage(workdir, url, repo, repos, config, client)
_, repos, err = pushMonolithImage(suiteCfg.workDir, suiteCfg.targetServerURL, suiteCfg.repo, repos, config, client)
if err != nil {
return err
}
}
for range requests {
for range suiteCfg.requests {
func() {
start := time.Now()
@@ -332,7 +383,7 @@ func GetCatalog(
}()
// send request and get response
resp, err := client.R().Get(url + constants.RoutePrefix + constants.ExtCatalogPrefix)
resp, err := client.R().Get(suiteCfg.targetServerURL + constants.RoutePrefix + constants.ExtCatalogPrefix)
latency = time.Since(start)
@@ -353,8 +404,8 @@ func GetCatalog(
}
// clean up
if !skipCleanup {
err = deleteTestRepo(repos, url, client)
if !suiteCfg.skipCleanup {
err = deleteTestRepo(repos, suiteCfg.targetServerURL, client)
if err != nil {
return err
}
@@ -364,12 +415,10 @@ func GetCatalog(
}
func PushMonolithStreamed(
workdir, url, trepo string,
requests int,
config testConfig,
suiteCfg testSuiteCfg,
statsCh chan statsRecord,
client *resty.Client,
skipCleanup bool,
) error {
var repos []string
@@ -377,14 +426,14 @@ func PushMonolithStreamed(
statusRequests = sync.Map{}
}
for count := range requests {
repos = pushMonolithAndCollect(workdir, url, trepo, count,
for count := range suiteCfg.requests {
repos = pushMonolithAndCollect(suiteCfg.workDir, suiteCfg.targetServerURL, suiteCfg.repo, count,
repos, config, client, statsCh)
}
// clean up
if !skipCleanup {
err := deleteTestRepo(repos, url, client)
if !suiteCfg.skipCleanup {
err := deleteTestRepo(repos, suiteCfg.targetServerURL, client)
if err != nil {
return err
}
@@ -394,12 +443,10 @@ func PushMonolithStreamed(
}
func PushChunkStreamed(
workdir, url, trepo string,
requests int,
config testConfig,
suiteCfg testSuiteCfg,
statsCh chan statsRecord,
client *resty.Client,
skipCleanup bool,
) error {
var repos []string
@@ -407,14 +454,14 @@ func PushChunkStreamed(
statusRequests = sync.Map{}
}
for count := range requests {
repos = pushChunkAndCollect(workdir, url, trepo, count,
for count := range suiteCfg.requests {
repos = pushChunkAndCollect(suiteCfg.workDir, suiteCfg.targetServerURL, suiteCfg.repo, count,
repos, config, client, statsCh)
}
// clean up
if !skipCleanup {
err := deleteTestRepo(repos, url, client)
if !suiteCfg.skipCleanup {
err := deleteTestRepo(repos, suiteCfg.targetServerURL, client)
if err != nil {
return err
}
@@ -424,12 +471,10 @@ func PushChunkStreamed(
}
func Pull(
workdir, url, trepo string,
requests int,
config testConfig,
suiteCfg testSuiteCfg,
statsCh chan statsRecord,
client *resty.Client,
skipCleanup bool,
) error {
var repos []string
@@ -441,6 +486,12 @@ func Pull(
statusRequests = sync.Map{}
}
pushTargetURL := suiteCfg.targetServerURL
if suiteCfg.syncTest {
pushTargetURL = suiteCfg.upstreamServerURL
}
if config.mixedSize {
var manifestBySize map[string]string
@@ -451,7 +502,8 @@ func Pull(
config.size = smallBlob
// Push small blob
manifestBySize, repos, err := pushMonolithImage(workdir, url, trepo, repos, config, client)
manifestBySize, repos, err := pushMonolithImage(
suiteCfg.workDir, pushTargetURL, suiteCfg.repo, repos, config, client)
if err != nil {
return err
}
@@ -461,7 +513,8 @@ func Pull(
config.size = mediumBlob
// Push medium blob
manifestBySize, repos, err = pushMonolithImage(workdir, url, trepo, repos, config, client)
manifestBySize, repos, err = pushMonolithImage(
suiteCfg.workDir, pushTargetURL, suiteCfg.repo, repos, config, client)
if err != nil {
return err
}
@@ -472,7 +525,8 @@ func Pull(
// Push large blob
//nolint: ineffassign, staticcheck, wastedassign
manifestBySize, repos, err = pushMonolithImage(workdir, url, trepo, repos, config, client)
manifestBySize, repos, err = pushMonolithImage(
suiteCfg.workDir, pushTargetURL, suiteCfg.repo, repos, config, client)
if err != nil {
return err
}
@@ -482,7 +536,8 @@ func Pull(
// Push blob given size
var err error
manifestHash, repos, err = pushMonolithImage(workdir, url, trepo, repos, config, client)
manifestHash, repos, err = pushMonolithImage(
suiteCfg.workDir, pushTargetURL, suiteCfg.repo, repos, config, client)
if err != nil {
return err
}
@@ -494,35 +549,41 @@ func Pull(
}
// download image
for range requests {
repos = pullAndCollect(url, repos, manifestItem, config, client, statsCh)
for range suiteCfg.requests {
repos = pullAndCollect(suiteCfg.targetServerURL, repos, manifestItem, config, client, statsCh)
}
// clean up
if !skipCleanup {
err := deleteTestRepo(repos, url, client)
if !suiteCfg.skipCleanup {
err := deleteTestRepo(repos, suiteCfg.targetServerURL, client)
if err != nil {
return err
}
if suiteCfg.syncTest {
err := deleteTestRepo(repos, suiteCfg.upstreamServerURL, client)
if err != nil {
return err
}
}
}
return nil
}
func MixedPullAndPush(
workdir, url, trepo string,
requests int,
config testConfig,
suiteCfg testSuiteCfg,
statsCh chan statsRecord,
client *resty.Client,
skipCleanup bool,
) error {
var repos []string
statusRequests = sync.Map{}
// Push blob given size
manifestHash, repos, err := pushMonolithImage(workdir, url, trepo, repos, config, client)
manifestHash, repos, err := pushMonolithImage(
suiteCfg.workDir, suiteCfg.targetServerURL, suiteCfg.repo, repos, config, client)
if err != nil {
return err
}
@@ -531,26 +592,28 @@ func MixedPullAndPush(
manifestHash: manifestHash,
}
for count := range requests {
for count := range suiteCfg.requests {
idx := flipFunc(config.probabilityRange)
readTestIdx := 0
writeTestIdx := 1
if idx == readTestIdx {
repos = pullAndCollect(url, repos, manifestItem, config, client, statsCh)
switch idx {
case readTestIdx:
repos = pullAndCollect(suiteCfg.targetServerURL, repos, manifestItem, config, client, statsCh)
current := loadOrStore(&statusRequests, "Pull", 0)
statusRequests.Store("Pull", current+1)
} else if idx == writeTestIdx {
repos = pushMonolithAndCollect(workdir, url, trepo, count, repos, config, client, statsCh)
case writeTestIdx:
repos = pushMonolithAndCollect(
suiteCfg.workDir, suiteCfg.targetServerURL, suiteCfg.repo, count, repos, config, client, statsCh)
current := loadOrStore(&statusRequests, "Push", 0)
statusRequests.Store("Pull", current+1)
}
}
// clean up
if !skipCleanup {
err = deleteTestRepo(repos, url, client)
if !suiteCfg.skipCleanup {
err = deleteTestRepo(repos, suiteCfg.targetServerURL, client)
if err != nil {
return err
}
@@ -561,6 +624,16 @@ func MixedPullAndPush(
// test driver.
type testSuiteCfg struct {
workDir string
targetServerURL string
upstreamServerURL string
repo string
requests int
skipCleanup bool
syncTest bool
}
type testConfig struct {
name string
tfunc testFunc
@@ -568,6 +641,7 @@ type testConfig struct {
size int
probabilityRange []float64
mixedSize, mixedType bool
syncTest bool
}
var testSuite = []testConfig{ //nolint:gochecknoglobals // used only in this test
@@ -660,6 +734,18 @@ var testSuite = []testConfig{ //nolint:gochecknoglobals // used only in this tes
mixedType: true,
probabilityRange: normalizeProbabilityRange([]float64{0.75, 0.25}),
},
{
name: "On-demand Sync 100MB",
tfunc: Pull,
size: largeBlob,
syncTest: true,
},
{
name: "On-demand Sync 1GB",
tfunc: Pull,
size: superLargeBlob,
syncTest: true,
},
}
// ListTests logs the available test names with one on each line.
@@ -677,30 +763,32 @@ func ListTests(testRegex *regexp.Regexp) {
}
}
// fatalWithCleanup calls teardown then logs fatal, ensuring cleanup happens before exit.
func fatalWithCleanup(syncObj *sync.Once, workdir string, err error) {
syncObj.Do(func() {
teardown(workdir)
})
log.Fatal(err)
}
func Perf(
workdir, url, auth, repo string,
concurrency int, requests int,
outFmt string, srcIPs string, srcCIDR string, skipCleanup bool,
testRegex *regexp.Regexp,
testRegex *regexp.Regexp, upstreamServerURL string,
) {
json := jsoniter.ConfigCompatibleWithStandardLibrary
// fatalWithCleanup calls teardown then logs fatal, ensuring cleanup happens before exit.
// Uses sync.Once to ensure teardown is only called once, even from goroutines.
var teardownOnce sync.Once
fatalWithCleanup := func(err error) {
teardownOnce.Do(func() {
teardown(workdir)
})
log.Fatal(err)
}
// logging
log.SetFlags(0)
log.SetOutput(tabwriter.NewWriter(os.Stdout, 0, 0, 1, ' ', tabwriter.TabIndent))
// teardown sync object to ensure cleanup happens on fatal or at the end of tests
var teardownOnce sync.Once
// common header
log.Printf("Registry URL:\t%s", url)
log.Printf("\n")
log.Printf("Registry URL:\t%s\n", url)
if upstreamServerURL != "" {
log.Printf("Upstream Registry URL:\t%s\n", upstreamServerURL)
}
log.Printf("Concurrency Level:\t%v", concurrency)
log.Printf("Total requests:\t%v", requests)
@@ -717,10 +805,44 @@ func Perf(
log.Printf("\n")
// pre-filter tests to know which data to initialize
fileSizesMap := map[int]struct{}{}
testsToRun := []testConfig{}
for _, tconfig := range testSuite {
if testRegex != nil && !testRegex.MatchString(tconfig.name) {
log.Printf("Skipping test %s\n", tconfig.name)
continue
}
if tconfig.syncTest && upstreamServerURL == "" {
log.Printf("Skipping test %s\n", tconfig.name)
continue
}
testsToRun = append(testsToRun, tconfig)
if tconfig.size == 0 {
sizes := []int{smallBlob, mediumBlob, largeBlob}
for _, size := range sizes {
fileSizesMap[size] = struct{}{}
}
} else if tconfig.size != 0 {
fileSizesMap[tconfig.size] = struct{}{}
}
}
// initialize test data
log.Printf("Preparing test data ...\n")
setup(workdir)
sizesToPrepare := []int{}
for size := range fileSizesMap {
sizesToPrepare = append(sizesToPrepare, size)
}
setup(workdir, sizesToPrepare)
log.Printf("Starting tests ...\n")
@@ -735,17 +857,13 @@ func Perf(
} else if len(srcCIDR) > 0 {
ips, err = getIPsFromCIDR(srcCIDR, maxSourceIPs)
if err != nil {
fatalWithCleanup(err)
fatalWithCleanup(&teardownOnce, workdir, err)
}
}
for _, tconfig := range testSuite {
if testRegex != nil && !testRegex.MatchString(tconfig.name) {
log.Printf("Skipping test %s\n", tconfig.name)
continue
}
statsSummaries := []statsSummary{}
for _, tconfig := range testsToRun {
statsCh := make(chan statsRecord, requests)
var wg sync.WaitGroup
@@ -759,12 +877,21 @@ func Perf(
wg.Go(func() {
httpClient, err := getRandomClientIPs(auth, url, ips)
if err != nil {
fatalWithCleanup(err)
fatalWithCleanup(&teardownOnce, workdir, err)
}
err = tconfig.tfunc(workdir, url, repo, requests/concurrency, tconfig, statsCh, httpClient, skipCleanup)
if err != nil {
fatalWithCleanup(err)
suiteConfig := testSuiteCfg{
workDir: workdir,
targetServerURL: url,
upstreamServerURL: upstreamServerURL,
repo: repo,
requests: requests / concurrency,
skipCleanup: skipCleanup,
syncTest: tconfig.syncTest,
}
if tFuncErr := tconfig.tfunc(tconfig, suiteConfig, statsCh, httpClient); tFuncErr != nil {
fatalWithCleanup(&teardownOnce, workdir, tFuncErr)
}
})
}
@@ -787,24 +914,16 @@ func Perf(
updateStats(&summary, record)
}
sort.Sort(Durations(summary.latencies))
printStats(requests, &summary, outFmt)
printStats(requests, &summary)
statsSummaries = append(statsSummaries, summary)
if summary.errorCount != 0 && !zbError {
zbError = true
}
}
if outFmt == cicdFmt {
jsonOut, err := json.Marshal(cicdSummary)
if err != nil {
fatalWithCleanup(err)
}
if err := os.WriteFile(outFmt+".json", jsonOut, defaultFilePerms); err != nil {
fatalWithCleanup(err)
}
if err = outputTestResults(statsSummaries, outFmt); err != nil {
fatalWithCleanup(&teardownOnce, workdir, err)
}
// Cleanup before exit (sync.Once ensures it only runs once, even if fatalWithCleanup was called)
@@ -817,6 +936,38 @@ func Perf(
}
}
// outputTestResults outputs the test results in the specified format.
// If the format is "ci-cd", it writes the results to a JSON file.
func outputTestResults(summary []statsSummary, outFmt string) error {
json := jsoniter.ConfigCompatibleWithStandardLibrary
if outFmt == cicdFmt {
cicdSummary := []cicdTestSummary{}
for _, s := range summary {
cicdSummary = append(cicdSummary,
cicdTestSummary{
Name: s.name,
Unit: "requests per sec",
Value: s.rps,
Range: "3",
},
)
}
jsonOut, err := json.Marshal(cicdSummary)
if err != nil {
return err
}
if err := os.WriteFile(outFmt+".json", jsonOut, defaultFilePerms); err != nil {
return err
}
}
return nil
}
// getRandomClientIPs returns a resty client with a random bind address from ips slice.
func getRandomClientIPs(auth string, url string, ips []string) (*resty.Client, error) {
client := resty.New()