mirror of
https://github.com/project-zot/zot.git
synced 2026-06-16 20:38:08 +08:00
zb: pick client IPs from a pool, closes #472
Signed-off-by: Petu Eusebiu <peusebiu@cisco.com>
This commit is contained in:
committed by
Ramkumar Chinchani
parent
a5e091e3d2
commit
ca8b866c46
+125
-107
@@ -6,7 +6,9 @@ import (
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"log"
|
||||
"math/big"
|
||||
mrand "math/rand"
|
||||
"net"
|
||||
"net/http"
|
||||
urlparser "net/url"
|
||||
"os"
|
||||
@@ -36,9 +38,13 @@ const (
|
||||
largeBlob = 100 * MiB
|
||||
cicdFmt = "ci-cd"
|
||||
secureProtocol = "https"
|
||||
httpKeepAlive = 30 * time.Second
|
||||
maxSourceIPs = 1000
|
||||
httpTimeout = 30 * time.Second
|
||||
TLSHandshakeTimeout = 10 * time.Second
|
||||
)
|
||||
|
||||
// nolint:gochecknoglobals // used only in this test
|
||||
// nolint:gochecknoglobals
|
||||
var blobHash map[string]godigest.Digest = map[string]godigest.Digest{}
|
||||
|
||||
// nolint:gochecknoglobals // used only in this test
|
||||
@@ -283,26 +289,11 @@ func normalizeProbabilityRange(pbty []float64) []float64 {
|
||||
|
||||
// test suites/funcs.
|
||||
|
||||
type testFunc func(workdir, url, auth, repo string, requests int, config testConfig, statsCh chan statsRecord) error
|
||||
|
||||
func GetCatalog(workdir, url, auth, repo string, requests int, config testConfig, statsCh chan statsRecord) error {
|
||||
client := resty.New()
|
||||
|
||||
if auth != "" {
|
||||
creds := strings.Split(auth, ":")
|
||||
client.SetBasicAuth(creds[0], creds[1])
|
||||
}
|
||||
|
||||
parsedURL, err := urlparser.Parse(url)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
||||
// nolint: gosec
|
||||
if parsedURL.Scheme == secureProtocol {
|
||||
client.SetTLSClientConfig(&tls.Config{InsecureSkipVerify: true})
|
||||
}
|
||||
type testFunc func(workdir, url, repo string, requests int, config testConfig,
|
||||
statsCh chan statsRecord, client *resty.Client) error
|
||||
|
||||
func GetCatalog(workdir, url, repo string, requests int, config testConfig,
|
||||
statsCh chan statsRecord, client *resty.Client) error {
|
||||
for count := 0; count < requests; count++ {
|
||||
func() {
|
||||
start := time.Now()
|
||||
@@ -347,26 +338,8 @@ func GetCatalog(workdir, url, auth, repo string, requests int, config testConfig
|
||||
return nil
|
||||
}
|
||||
|
||||
func PushMonolithStreamed(workdir, url, auth, trepo string, requests int,
|
||||
config testConfig, statsCh chan statsRecord,
|
||||
) error {
|
||||
client := resty.New()
|
||||
|
||||
if auth != "" {
|
||||
creds := strings.Split(auth, ":")
|
||||
client.SetBasicAuth(creds[0], creds[1])
|
||||
}
|
||||
|
||||
parsedURL, err := urlparser.Parse(url)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
||||
// nolint: gosec
|
||||
if parsedURL.Scheme == secureProtocol {
|
||||
client.SetTLSClientConfig(&tls.Config{InsecureSkipVerify: true})
|
||||
}
|
||||
|
||||
func PushMonolithStreamed(workdir, url, trepo string, requests int, config testConfig,
|
||||
statsCh chan statsRecord, client *resty.Client) error {
|
||||
var repos []string
|
||||
|
||||
if config.mixedSize {
|
||||
@@ -379,7 +352,7 @@ func PushMonolithStreamed(workdir, url, auth, trepo string, requests int,
|
||||
}
|
||||
|
||||
// clean up
|
||||
err = deleteTestRepo(repos, url, client)
|
||||
err := deleteTestRepo(repos, url, client)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -387,26 +360,8 @@ func PushMonolithStreamed(workdir, url, auth, trepo string, requests int,
|
||||
return nil
|
||||
}
|
||||
|
||||
func PushChunkStreamed(workdir, url, auth, trepo string, requests int,
|
||||
config testConfig, statsCh chan statsRecord,
|
||||
) error {
|
||||
client := resty.New()
|
||||
|
||||
if auth != "" {
|
||||
creds := strings.Split(auth, ":")
|
||||
client.SetBasicAuth(creds[0], creds[1])
|
||||
}
|
||||
|
||||
parsedURL, err := urlparser.Parse(url)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
||||
// nolint: gosec
|
||||
if parsedURL.Scheme == secureProtocol {
|
||||
client.SetTLSClientConfig(&tls.Config{InsecureSkipVerify: true})
|
||||
}
|
||||
|
||||
func PushChunkStreamed(workdir, url, trepo string, requests int, config testConfig,
|
||||
statsCh chan statsRecord, client *resty.Client) error {
|
||||
var repos []string
|
||||
|
||||
if config.mixedSize {
|
||||
@@ -419,7 +374,7 @@ func PushChunkStreamed(workdir, url, auth, trepo string, requests int,
|
||||
}
|
||||
|
||||
// clean up
|
||||
err = deleteTestRepo(repos, url, client)
|
||||
err := deleteTestRepo(repos, url, client)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -427,26 +382,8 @@ func PushChunkStreamed(workdir, url, auth, trepo string, requests int,
|
||||
return nil
|
||||
}
|
||||
|
||||
func Pull(workdir, url, auth, trepo string, requests int,
|
||||
config testConfig, statsCh chan statsRecord,
|
||||
) error {
|
||||
client := resty.New()
|
||||
|
||||
if auth != "" {
|
||||
creds := strings.Split(auth, ":")
|
||||
client.SetBasicAuth(creds[0], creds[1])
|
||||
}
|
||||
|
||||
parsedURL, err := urlparser.Parse(url)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
||||
// nolint: gosec
|
||||
if parsedURL.Scheme == secureProtocol {
|
||||
client.SetTLSClientConfig(&tls.Config{InsecureSkipVerify: true})
|
||||
}
|
||||
|
||||
func Pull(workdir, url, trepo string, requests int,
|
||||
config testConfig, statsCh chan statsRecord, client *resty.Client) error {
|
||||
var repos []string
|
||||
|
||||
var manifestHash map[string]string
|
||||
@@ -465,7 +402,7 @@ func Pull(workdir, url, auth, trepo string, requests int,
|
||||
largeSizeIdx := 2
|
||||
|
||||
// Push small blob
|
||||
manifestBySize, repos, err = pushMonolithImage(workdir, url, trepo, repos, smallBlob, client)
|
||||
manifestBySize, repos, err := pushMonolithImage(workdir, url, trepo, repos, smallBlob, client)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -481,6 +418,7 @@ func Pull(workdir, url, auth, trepo string, requests int,
|
||||
manifestBySizeHash[mediumSizeIdx] = manifestBySize
|
||||
|
||||
// Push large blob
|
||||
// nolint: ineffassign, staticcheck, wastedassign
|
||||
manifestBySize, repos, err = pushMonolithImage(workdir, url, trepo, repos, largeBlob, client)
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -489,6 +427,7 @@ func Pull(workdir, url, auth, trepo string, requests int,
|
||||
manifestBySizeHash[largeSizeIdx] = manifestBySize
|
||||
} else {
|
||||
// Push blob given size
|
||||
var err error
|
||||
manifestHash, repos, err = pushMonolithImage(workdir, url, trepo, repos, config.size, client)
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -506,7 +445,7 @@ func Pull(workdir, url, auth, trepo string, requests int,
|
||||
}
|
||||
|
||||
// clean up
|
||||
err = deleteTestRepo(repos, url, client)
|
||||
err := deleteTestRepo(repos, url, client)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -514,26 +453,8 @@ func Pull(workdir, url, auth, trepo string, requests int,
|
||||
return nil
|
||||
}
|
||||
|
||||
func MixedPullAndPush(workdir, url, auth, trepo string, requests int,
|
||||
config testConfig, statsCh chan statsRecord,
|
||||
) error {
|
||||
client := resty.New()
|
||||
|
||||
if auth != "" {
|
||||
creds := strings.Split(auth, ":")
|
||||
client.SetBasicAuth(creds[0], creds[1])
|
||||
}
|
||||
|
||||
parsedURL, err := urlparser.Parse(url)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
||||
// nolint: gosec
|
||||
if parsedURL.Scheme == secureProtocol {
|
||||
client.SetTLSClientConfig(&tls.Config{InsecureSkipVerify: true})
|
||||
}
|
||||
|
||||
func MixedPullAndPush(workdir, url, trepo string, requests int,
|
||||
config testConfig, statsCh chan statsRecord, client *resty.Client) error {
|
||||
var repos []string
|
||||
|
||||
statusRequests = make(map[string]int)
|
||||
@@ -674,7 +595,8 @@ var testSuite = []testConfig{ // nolint:gochecknoglobals // used only in this te
|
||||
},
|
||||
}
|
||||
|
||||
func Perf(workdir, url, auth, repo string, concurrency int, requests int, outFmt string) {
|
||||
func Perf(workdir, url, auth, repo string, concurrency int, requests int, outFmt string,
|
||||
srcIPs string, srcCIDR string) {
|
||||
json := jsoniter.ConfigCompatibleWithStandardLibrary
|
||||
// logging
|
||||
log.SetFlags(0)
|
||||
@@ -694,6 +616,19 @@ func Perf(workdir, url, auth, repo string, concurrency int, requests int, outFmt
|
||||
|
||||
zbError := false
|
||||
|
||||
var err error
|
||||
|
||||
// get host ips from command line to make requests from
|
||||
var ips []string
|
||||
if len(srcIPs) > 0 {
|
||||
ips = strings.Split(srcIPs, ",")
|
||||
} else if len(srcCIDR) > 0 {
|
||||
ips, err = getIPsFromCIDR(srcCIDR, maxSourceIPs)
|
||||
if err != nil {
|
||||
log.Fatal(err) //nolint: gocritic
|
||||
}
|
||||
}
|
||||
|
||||
for _, tconfig := range testSuite {
|
||||
statsCh := make(chan statsRecord, requests)
|
||||
|
||||
@@ -710,7 +645,12 @@ func Perf(workdir, url, auth, repo string, concurrency int, requests int, outFmt
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
|
||||
_ = tconfig.tfunc(workdir, url, auth, repo, requests/concurrency, tconfig, statsCh)
|
||||
httpClient, err := getRandomClientIPs(auth, url, ips)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
||||
_ = tconfig.tfunc(workdir, url, repo, requests/concurrency, tconfig, statsCh, httpClient)
|
||||
}()
|
||||
}
|
||||
wg.Wait()
|
||||
@@ -743,7 +683,7 @@ func Perf(workdir, url, auth, repo string, concurrency int, requests int, outFmt
|
||||
if outFmt == cicdFmt {
|
||||
jsonOut, err := json.Marshal(cicdSummary)
|
||||
if err != nil {
|
||||
log.Fatal(err) // nolint:gocritic // file closed on exit
|
||||
log.Fatal(err) // file closed on exit
|
||||
}
|
||||
|
||||
if err := ioutil.WriteFile(fmt.Sprintf("%s.json", outFmt), jsonOut, defaultFilePerms); err != nil {
|
||||
@@ -755,3 +695,81 @@ func Perf(workdir, url, auth, repo string, concurrency int, requests int, outFmt
|
||||
os.Exit(1)
|
||||
}
|
||||
}
|
||||
|
||||
// getRandomClientIPs returns a resty client with a random bind address from ips slice.
|
||||
func getRandomClientIPs(auth string, url string, ips []string) (*resty.Client, error) {
|
||||
client := resty.New()
|
||||
|
||||
if auth != "" {
|
||||
creds := strings.Split(auth, ":")
|
||||
client.SetBasicAuth(creds[0], creds[1])
|
||||
}
|
||||
|
||||
// get random ip client
|
||||
if len(ips) != 0 {
|
||||
// get random number
|
||||
nBig, err := crand.Int(crand.Reader, big.NewInt(int64(len(ips))))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// get random ip
|
||||
ip := ips[nBig.Int64()]
|
||||
|
||||
// set ip in transport
|
||||
localAddr, err := net.ResolveTCPAddr("tcp", fmt.Sprintf("%s:0", ip))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
transport := &http.Transport{
|
||||
Proxy: http.ProxyFromEnvironment,
|
||||
DialContext: (&net.Dialer{
|
||||
Timeout: httpTimeout,
|
||||
KeepAlive: httpKeepAlive,
|
||||
LocalAddr: localAddr,
|
||||
}).DialContext,
|
||||
TLSHandshakeTimeout: TLSHandshakeTimeout,
|
||||
}
|
||||
|
||||
client.SetTransport(transport)
|
||||
}
|
||||
|
||||
parsedURL, err := urlparser.Parse(url)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
||||
// nolint: gosec
|
||||
if parsedURL.Scheme == secureProtocol {
|
||||
client.SetTLSClientConfig(&tls.Config{InsecureSkipVerify: true})
|
||||
}
|
||||
|
||||
return client, nil
|
||||
}
|
||||
|
||||
// getIPsFromCIDR returns a list of ips given a cidr.
|
||||
func getIPsFromCIDR(cidr string, maxIPs int) ([]string, error) {
|
||||
// nolint:varnamelen
|
||||
ip, ipnet, err := net.ParseCIDR(cidr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var ips []string
|
||||
for ip := ip.Mask(ipnet.Mask); ipnet.Contains(ip) && len(ips) < maxIPs; inc(ip) {
|
||||
ips = append(ips, ip.String())
|
||||
}
|
||||
// remove network address and broadcast address
|
||||
return ips[1 : len(ips)-1], nil
|
||||
}
|
||||
|
||||
// https://go.dev/play/p/sdzcMvZYWnc
|
||||
func inc(ip net.IP) {
|
||||
for j := len(ip) - 1; j >= 0; j-- {
|
||||
ip[j]++
|
||||
if ip[j] > 0 {
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user