From ca8b866c46461631bf8eb030d9fa03c48d4e584c Mon Sep 17 00:00:00 2001 From: Petu Eusebiu Date: Wed, 23 Mar 2022 17:22:45 +0200 Subject: [PATCH] zb: pick client IPs from a pool, closes #472 Signed-off-by: Petu Eusebiu --- cmd/zb/README.md | 28 +++++- cmd/zb/main.go | 8 +- cmd/zb/perf.go | 232 +++++++++++++++++++++++++---------------------- 3 files changed, 154 insertions(+), 114 deletions(-) diff --git a/cmd/zb/README.md b/cmd/zb/README.md index 87fbe96c..ef96813a 100644 --- a/cmd/zb/README.md +++ b/cmd/zb/README.md @@ -1,11 +1,29 @@ # `zb` -`zb` is a registry benchmarking tool which can run against any [distribution spec](https://github.com/opencontainers/distribution-spec) comformant registry. +## `zb` is a registry benchmarking tool which can run against any [distribution spec](https://github.com/opencontainers/distribution-spec) comformant registry. --n : total number of requests --c : number of concurrent clients performing (n/c) requests per client --d : working dir to store test data --A : BASIC authentication in `username:passwd` format + +``` +Usage: + zb [options] [flags] + +Flags: + -A, --auth-creds string Use colon-separated BASIC auth creds + -c, --concurrency int Number of multiple requests to make at a time (default 1) + -h, --help help for zb + -o, --output-format string Output format of test results: stdout (default), json, ci-cd + -r, --repo string Use specified repo on remote registry for test data + -n, --requests int Number of requests to perform (default 1) + -s, --src-cidr string Use specified cidr to obtain ips to make requests from, src-ips and src-cidr are mutually exclusive + -i, --src-ips string Use colon-separated ips to make requests from, src-ips and src-cidr are mutually exclusive + -v, --version Show the version and exit + -d, --working-dir string Use specified directory to store test data + ``` + + ## Command example + ``` + ./bin/zb-linux-amd64 -c 10 -n 100 --src-cidr 127.0.0.0/8 -A user:pass http://localhost:8080 + ``` # References diff --git a/cmd/zb/main.go b/cmd/zb/main.go index 5f039627..fa1bcae1 100644 --- a/cmd/zb/main.go +++ b/cmd/zb/main.go @@ -13,7 +13,7 @@ import ( func NewPerfRootCmd() *cobra.Command { showVersion := false - var auth, workdir, repo, outFmt string + var auth, workdir, repo, outFmt, srcIPs, srcCIDR string var concurrency, requests int @@ -45,12 +45,16 @@ func NewPerfRootCmd() *cobra.Command { requests = concurrency * (requests / concurrency) - Perf(workdir, url, auth, repo, concurrency, requests, outFmt) + Perf(workdir, url, auth, repo, concurrency, requests, outFmt, srcIPs, srcCIDR) }, } rootCmd.Flags().StringVarP(&auth, "auth-creds", "A", "", "Use colon-separated BASIC auth creds") + rootCmd.Flags().StringVarP(&srcIPs, "src-ips", "i", "", + "Use colon-separated ips to make requests from, src-ips and src-cidr are mutually exclusive") + rootCmd.Flags().StringVarP(&srcCIDR, "src-cidr", "s", "", + "Use specified cidr to obtain ips to make requests from, src-ips and src-cidr are mutually exclusive") rootCmd.Flags().StringVarP(&workdir, "working-dir", "d", "", "Use specified directory to store test data") rootCmd.Flags().StringVarP(&repo, "repo", "r", "", diff --git a/cmd/zb/perf.go b/cmd/zb/perf.go index ced94e13..41998f20 100644 --- a/cmd/zb/perf.go +++ b/cmd/zb/perf.go @@ -6,7 +6,9 @@ import ( "fmt" "io/ioutil" "log" + "math/big" mrand "math/rand" + "net" "net/http" urlparser "net/url" "os" @@ -36,9 +38,13 @@ const ( largeBlob = 100 * MiB cicdFmt = "ci-cd" secureProtocol = "https" + httpKeepAlive = 30 * time.Second + maxSourceIPs = 1000 + httpTimeout = 30 * time.Second + TLSHandshakeTimeout = 10 * time.Second ) -// nolint:gochecknoglobals // used only in this test +// nolint:gochecknoglobals var blobHash map[string]godigest.Digest = map[string]godigest.Digest{} // nolint:gochecknoglobals // used only in this test @@ -283,26 +289,11 @@ func normalizeProbabilityRange(pbty []float64) []float64 { // test suites/funcs. -type testFunc func(workdir, url, auth, repo string, requests int, config testConfig, statsCh chan statsRecord) error - -func GetCatalog(workdir, url, auth, repo string, requests int, config testConfig, statsCh chan statsRecord) error { - client := resty.New() - - if auth != "" { - creds := strings.Split(auth, ":") - client.SetBasicAuth(creds[0], creds[1]) - } - - parsedURL, err := urlparser.Parse(url) - if err != nil { - log.Fatal(err) - } - - // nolint: gosec - if parsedURL.Scheme == secureProtocol { - client.SetTLSClientConfig(&tls.Config{InsecureSkipVerify: true}) - } +type testFunc func(workdir, url, repo string, requests int, config testConfig, + statsCh chan statsRecord, client *resty.Client) error +func GetCatalog(workdir, url, repo string, requests int, config testConfig, + statsCh chan statsRecord, client *resty.Client) error { for count := 0; count < requests; count++ { func() { start := time.Now() @@ -347,26 +338,8 @@ func GetCatalog(workdir, url, auth, repo string, requests int, config testConfig return nil } -func PushMonolithStreamed(workdir, url, auth, trepo string, requests int, - config testConfig, statsCh chan statsRecord, -) error { - client := resty.New() - - if auth != "" { - creds := strings.Split(auth, ":") - client.SetBasicAuth(creds[0], creds[1]) - } - - parsedURL, err := urlparser.Parse(url) - if err != nil { - log.Fatal(err) - } - - // nolint: gosec - if parsedURL.Scheme == secureProtocol { - client.SetTLSClientConfig(&tls.Config{InsecureSkipVerify: true}) - } - +func PushMonolithStreamed(workdir, url, trepo string, requests int, config testConfig, + statsCh chan statsRecord, client *resty.Client) error { var repos []string if config.mixedSize { @@ -379,7 +352,7 @@ func PushMonolithStreamed(workdir, url, auth, trepo string, requests int, } // clean up - err = deleteTestRepo(repos, url, client) + err := deleteTestRepo(repos, url, client) if err != nil { return err } @@ -387,26 +360,8 @@ func PushMonolithStreamed(workdir, url, auth, trepo string, requests int, return nil } -func PushChunkStreamed(workdir, url, auth, trepo string, requests int, - config testConfig, statsCh chan statsRecord, -) error { - client := resty.New() - - if auth != "" { - creds := strings.Split(auth, ":") - client.SetBasicAuth(creds[0], creds[1]) - } - - parsedURL, err := urlparser.Parse(url) - if err != nil { - log.Fatal(err) - } - - // nolint: gosec - if parsedURL.Scheme == secureProtocol { - client.SetTLSClientConfig(&tls.Config{InsecureSkipVerify: true}) - } - +func PushChunkStreamed(workdir, url, trepo string, requests int, config testConfig, + statsCh chan statsRecord, client *resty.Client) error { var repos []string if config.mixedSize { @@ -419,7 +374,7 @@ func PushChunkStreamed(workdir, url, auth, trepo string, requests int, } // clean up - err = deleteTestRepo(repos, url, client) + err := deleteTestRepo(repos, url, client) if err != nil { return err } @@ -427,26 +382,8 @@ func PushChunkStreamed(workdir, url, auth, trepo string, requests int, return nil } -func Pull(workdir, url, auth, trepo string, requests int, - config testConfig, statsCh chan statsRecord, -) error { - client := resty.New() - - if auth != "" { - creds := strings.Split(auth, ":") - client.SetBasicAuth(creds[0], creds[1]) - } - - parsedURL, err := urlparser.Parse(url) - if err != nil { - log.Fatal(err) - } - - // nolint: gosec - if parsedURL.Scheme == secureProtocol { - client.SetTLSClientConfig(&tls.Config{InsecureSkipVerify: true}) - } - +func Pull(workdir, url, trepo string, requests int, + config testConfig, statsCh chan statsRecord, client *resty.Client) error { var repos []string var manifestHash map[string]string @@ -465,7 +402,7 @@ func Pull(workdir, url, auth, trepo string, requests int, largeSizeIdx := 2 // Push small blob - manifestBySize, repos, err = pushMonolithImage(workdir, url, trepo, repos, smallBlob, client) + manifestBySize, repos, err := pushMonolithImage(workdir, url, trepo, repos, smallBlob, client) if err != nil { return err } @@ -481,6 +418,7 @@ func Pull(workdir, url, auth, trepo string, requests int, manifestBySizeHash[mediumSizeIdx] = manifestBySize // Push large blob + // nolint: ineffassign, staticcheck, wastedassign manifestBySize, repos, err = pushMonolithImage(workdir, url, trepo, repos, largeBlob, client) if err != nil { return err @@ -489,6 +427,7 @@ func Pull(workdir, url, auth, trepo string, requests int, manifestBySizeHash[largeSizeIdx] = manifestBySize } else { // Push blob given size + var err error manifestHash, repos, err = pushMonolithImage(workdir, url, trepo, repos, config.size, client) if err != nil { return err @@ -506,7 +445,7 @@ func Pull(workdir, url, auth, trepo string, requests int, } // clean up - err = deleteTestRepo(repos, url, client) + err := deleteTestRepo(repos, url, client) if err != nil { return err } @@ -514,26 +453,8 @@ func Pull(workdir, url, auth, trepo string, requests int, return nil } -func MixedPullAndPush(workdir, url, auth, trepo string, requests int, - config testConfig, statsCh chan statsRecord, -) error { - client := resty.New() - - if auth != "" { - creds := strings.Split(auth, ":") - client.SetBasicAuth(creds[0], creds[1]) - } - - parsedURL, err := urlparser.Parse(url) - if err != nil { - log.Fatal(err) - } - - // nolint: gosec - if parsedURL.Scheme == secureProtocol { - client.SetTLSClientConfig(&tls.Config{InsecureSkipVerify: true}) - } - +func MixedPullAndPush(workdir, url, trepo string, requests int, + config testConfig, statsCh chan statsRecord, client *resty.Client) error { var repos []string statusRequests = make(map[string]int) @@ -674,7 +595,8 @@ var testSuite = []testConfig{ // nolint:gochecknoglobals // used only in this te }, } -func Perf(workdir, url, auth, repo string, concurrency int, requests int, outFmt string) { +func Perf(workdir, url, auth, repo string, concurrency int, requests int, outFmt string, + srcIPs string, srcCIDR string) { json := jsoniter.ConfigCompatibleWithStandardLibrary // logging log.SetFlags(0) @@ -694,6 +616,19 @@ func Perf(workdir, url, auth, repo string, concurrency int, requests int, outFmt zbError := false + var err error + + // get host ips from command line to make requests from + var ips []string + if len(srcIPs) > 0 { + ips = strings.Split(srcIPs, ",") + } else if len(srcCIDR) > 0 { + ips, err = getIPsFromCIDR(srcCIDR, maxSourceIPs) + if err != nil { + log.Fatal(err) //nolint: gocritic + } + } + for _, tconfig := range testSuite { statsCh := make(chan statsRecord, requests) @@ -710,7 +645,12 @@ func Perf(workdir, url, auth, repo string, concurrency int, requests int, outFmt go func() { defer wg.Done() - _ = tconfig.tfunc(workdir, url, auth, repo, requests/concurrency, tconfig, statsCh) + httpClient, err := getRandomClientIPs(auth, url, ips) + if err != nil { + log.Fatal(err) + } + + _ = tconfig.tfunc(workdir, url, repo, requests/concurrency, tconfig, statsCh, httpClient) }() } wg.Wait() @@ -743,7 +683,7 @@ func Perf(workdir, url, auth, repo string, concurrency int, requests int, outFmt if outFmt == cicdFmt { jsonOut, err := json.Marshal(cicdSummary) if err != nil { - log.Fatal(err) // nolint:gocritic // file closed on exit + log.Fatal(err) // file closed on exit } if err := ioutil.WriteFile(fmt.Sprintf("%s.json", outFmt), jsonOut, defaultFilePerms); err != nil { @@ -755,3 +695,81 @@ func Perf(workdir, url, auth, repo string, concurrency int, requests int, outFmt os.Exit(1) } } + +// getRandomClientIPs returns a resty client with a random bind address from ips slice. +func getRandomClientIPs(auth string, url string, ips []string) (*resty.Client, error) { + client := resty.New() + + if auth != "" { + creds := strings.Split(auth, ":") + client.SetBasicAuth(creds[0], creds[1]) + } + + // get random ip client + if len(ips) != 0 { + // get random number + nBig, err := crand.Int(crand.Reader, big.NewInt(int64(len(ips)))) + if err != nil { + return nil, err + } + + // get random ip + ip := ips[nBig.Int64()] + + // set ip in transport + localAddr, err := net.ResolveTCPAddr("tcp", fmt.Sprintf("%s:0", ip)) + if err != nil { + return nil, err + } + + transport := &http.Transport{ + Proxy: http.ProxyFromEnvironment, + DialContext: (&net.Dialer{ + Timeout: httpTimeout, + KeepAlive: httpKeepAlive, + LocalAddr: localAddr, + }).DialContext, + TLSHandshakeTimeout: TLSHandshakeTimeout, + } + + client.SetTransport(transport) + } + + parsedURL, err := urlparser.Parse(url) + if err != nil { + log.Fatal(err) + } + + // nolint: gosec + if parsedURL.Scheme == secureProtocol { + client.SetTLSClientConfig(&tls.Config{InsecureSkipVerify: true}) + } + + return client, nil +} + +// getIPsFromCIDR returns a list of ips given a cidr. +func getIPsFromCIDR(cidr string, maxIPs int) ([]string, error) { + // nolint:varnamelen + ip, ipnet, err := net.ParseCIDR(cidr) + if err != nil { + return nil, err + } + + var ips []string + for ip := ip.Mask(ipnet.Mask); ipnet.Contains(ip) && len(ips) < maxIPs; inc(ip) { + ips = append(ips, ip.String()) + } + // remove network address and broadcast address + return ips[1 : len(ips)-1], nil +} + +// https://go.dev/play/p/sdzcMvZYWnc +func inc(ip net.IP) { + for j := len(ip) - 1; j >= 0; j-- { + ip[j]++ + if ip[j] > 0 { + break + } + } +}