feat(cluster): Add support for request proxying for scale out (#2385)

* feat(cluster): initial commit for scale-out cluster

Signed-off-by: Ramkumar Chinchani <rchincha@cisco.com>

* feat(cluster): support shared storage scale out

This change introduces support for shared storage backed
zot cluster scale out.

New feature
Multiple stateless zot instances can run using the same shared
storage backend where each instance looks at a specific set
of repositories based on a siphash of the repository name to improve
scale as the load is distributed across multiple instances.
For a given config, there will only be one instance that can perform
dist-spec read/write on a given repository.

What's changed?
- introduced a transparent request proxy for dist-spec endpoints based on
siphash of repository name.
- new config for scale out cluster that specifies list of
cluster members.

Signed-off-by: Vishwas Rajashekar <vrajashe@cisco.com>

---------

Signed-off-by: Ramkumar Chinchani <rchincha@cisco.com>
Signed-off-by: Vishwas Rajashekar <vrajashe@cisco.com>
Co-authored-by: Ramkumar Chinchani <rchincha@cisco.com>
This commit is contained in:
Vishwas R
2024-05-20 21:35:21 +05:30
committed by GitHub
parent be5ad66797
commit 5ae7a028d9
30 changed files with 2320 additions and 24 deletions
+27
View File
@@ -121,6 +121,32 @@ type SchedulerConfig struct {
NumWorkers int
}
// contains the scale-out configuration which is identical for all zot replicas.
type ClusterConfig struct {
// contains the "host:port" of all the zot instances participating
// in the cluster.
Members []string `json:"members" mapstructure:"members"`
// contains the hash key that is required for siphash.
// must be a 128-bit (16-byte) key
// https://github.com/dchest/siphash?tab=readme-ov-file#func-newkey-byte-hashhash64
HashKey string `json:"hashKey" mapstructure:"hashKey"`
// contains client TLS config.
TLS *TLSConfig `json:"tls" mapstructure:"tls"`
// private field for storing Proxy details such as internal socket list.
Proxy *ClusterRequestProxyConfig `json:"-" mapstructure:"-"`
}
type ClusterRequestProxyConfig struct {
// holds the cluster socket (IP:port) derived from the host's
// interface configuration and the listening port of the HTTP server.
LocalMemberClusterSocket string
// index of the local member cluster socket in the members array.
LocalMemberClusterSocketIndex uint64
}
type LDAPCredentials struct {
BindDN string
BindPassword string
@@ -230,6 +256,7 @@ type Config struct {
Log *LogConfig
Extensions *extconf.ExtensionConfig
Scheduler *SchedulerConfig `json:"scheduler" mapstructure:",omitempty"`
Cluster *ClusterConfig `json:"cluster" mapstructure:",omitempty"`
}
func New() *Config {
+5
View File
@@ -31,4 +31,9 @@ const (
DeletePermission = "delete"
// behaviour actions.
DetectManifestCollisionPermission = "detectManifestCollision"
// zot scale-out hop count header.
ScaleOutHopCountHeader = "X-Zot-Cluster-Hop-Count"
// log string keys.
// these can be used together with the logger to add context to a log message.
RepositoryLogKey = "repository"
)
+43 -5
View File
@@ -19,6 +19,7 @@ import (
"zotregistry.dev/zot/errors"
"zotregistry.dev/zot/pkg/api/config"
"zotregistry.dev/zot/pkg/common"
ext "zotregistry.dev/zot/pkg/extensions"
extconf "zotregistry.dev/zot/pkg/extensions/config"
"zotregistry.dev/zot/pkg/extensions/monitoring"
@@ -54,15 +55,52 @@ type Controller struct {
chosenPort int // kernel-chosen port
}
func NewController(config *config.Config) *Controller {
func NewController(appConfig *config.Config) *Controller {
var controller Controller
logger := log.NewLogger(config.Log.Level, config.Log.Output)
controller.Config = config
logger := log.NewLogger(appConfig.Log.Level, appConfig.Log.Output)
if appConfig.Cluster != nil {
// we need the set of local sockets (IP address:port) for identifying
// the local member cluster socket for logging and lookup.
localSockets, err := common.GetLocalSockets(appConfig.HTTP.Port)
if err != nil {
logger.Error().Err(err).Msg("failed to get local sockets")
panic("failed to get local sockets")
}
// memberSocket is the local member's socket
// the index is also fetched for quick lookups during proxying
memberSocketIdx, memberSocket, err := GetLocalMemberClusterSocket(appConfig.Cluster.Members, localSockets)
if err != nil {
logger.Error().Err(err).Msg("failed to get member socket")
panic("failed to get member socket")
}
if memberSocket == "" {
// there is a misconfiguration if the memberSocket cannot be identified
logger.Error().
Str("members", strings.Join(appConfig.Cluster.Members, ",")).
Str("localSockets", strings.Join(localSockets, ",")).
Msg("failed to determine the local cluster socket")
panic("failed to determine the local cluster socket")
}
internalProxyConfig := &config.ClusterRequestProxyConfig{
LocalMemberClusterSocket: memberSocket,
LocalMemberClusterSocketIndex: uint64(memberSocketIdx),
}
appConfig.Cluster.Proxy = internalProxyConfig
logger.Logger = logger.Logger.With().
Str("clusterMember", memberSocket).
Str("clusterMemberIndex", strconv.Itoa(memberSocketIdx)).Logger()
}
controller.Config = appConfig
controller.Log = logger
if config.Log.Audit != "" {
audit := log.NewAuditLogger(config.Log.Level, config.Log.Audit)
if appConfig.Log.Audit != "" {
audit := log.NewAuditLogger(appConfig.Log.Level, appConfig.Log.Audit)
controller.Audit = audit
}
+454
View File
@@ -95,6 +95,32 @@ func TestNew(t *testing.T) {
So(conf, ShouldNotBeNil)
So(api.NewController(conf), ShouldNotBeNil)
})
Convey("Given a scale out cluster config where the local cluster socket cannot be found", t, func() {
conf := config.New()
So(conf, ShouldNotBeNil)
conf.HTTP = config.HTTPConfig{
Address: "127.0.0.2",
Port: "9000",
}
conf.Cluster = &config.ClusterConfig{
Members: []string{},
}
So(func() { api.NewController(conf) }, ShouldPanicWith, "failed to determine the local cluster socket")
})
Convey("Given a scale out cluster config where the local cluster socket cannot be found due to an error", t, func() {
conf := config.New()
So(conf, ShouldNotBeNil)
conf.HTTP = config.HTTPConfig{
Address: "127.0.0.2",
Port: "9000",
}
conf.Cluster = &config.ClusterConfig{
Members: []string{"127.0.0.1"},
}
So(func() { api.NewController(conf) }, ShouldPanicWith, "failed to get member socket")
})
}
func TestCreateCacheDatabaseDriver(t *testing.T) {
@@ -958,6 +984,434 @@ func TestBlobReferenced(t *testing.T) {
})
}
// tests for shared-storage scale-out cluster.
func TestScaleOutRequestProxy(t *testing.T) {
// when there is only one member, no proxying is expected and the responses should be correct.
Convey("Given a zot scale out cluster in http mode with only 1 member", t, func() {
port := test.GetFreePort()
clusterMembers := make([]string, 1)
clusterMembers[0] = fmt.Sprintf("127.0.0.1:%s", port)
conf := config.New()
conf.HTTP.Port = port
conf.Cluster = &config.ClusterConfig{
Members: clusterMembers,
HashKey: "loremipsumdolors",
}
ctrlr := makeController(conf, t.TempDir())
cm := test.NewControllerManager(ctrlr)
cm.StartAndWait(port)
defer cm.StopServer()
Convey("Controller should start up and respond without error", func() {
resp, err := resty.R().Get(test.GetBaseURL(port) + "/v2/")
So(err, ShouldBeNil)
So(resp, ShouldNotBeNil)
So(resp.StatusCode(), ShouldEqual, http.StatusOK)
})
Convey("Should upload images and fetch valid responses for repo tags list", func() {
reposToTest := []string{"debian", "alpine", "ubuntu"}
for _, repoName := range reposToTest {
img := CreateRandomImage()
err := UploadImage(img, test.GetBaseURL(port), repoName, "1.0")
So(err, ShouldBeNil)
resp, err := resty.R().Get(fmt.Sprintf("%s/v2/%s/tags/list", test.GetBaseURL(port), repoName))
So(err, ShouldBeNil)
So(resp, ShouldNotBeNil)
So(resp.StatusCode(), ShouldEqual, http.StatusOK)
result := common.ImageTags{}
err = json.Unmarshal(resp.Body(), &result)
if err != nil {
t.Fatalf("Failed to unmarshal")
}
So(result.Name, ShouldEqual, repoName)
So(len(result.Tags), ShouldEqual, 1)
So(result.Tags[0], ShouldEqual, "1.0")
}
})
})
// when only one member in the cluster is online, an error is expected when there is a
// request proxied to an offline member.
Convey("Given a scale out http cluster with only 1 online member", t, func() {
port := test.GetFreePort()
clusterMembers := make([]string, 3)
clusterMembers[0] = fmt.Sprintf("127.0.0.1:%s", port)
clusterMembers[1] = "127.0.0.1:1"
clusterMembers[2] = "127.0.0.1:2"
conf := config.New()
conf.HTTP.Port = port
conf.Cluster = &config.ClusterConfig{
Members: clusterMembers,
HashKey: "loremipsumdolors",
}
ctrlr := makeController(conf, t.TempDir())
cm := test.NewControllerManager(ctrlr)
cm.StartAndWait(port)
defer cm.StopServer()
Convey("Controller should start up and respond without error", func() {
resp, err := resty.R().Get(test.GetBaseURL(port) + "/v2/")
So(err, ShouldBeNil)
So(resp, ShouldNotBeNil)
So(resp.StatusCode(), ShouldEqual, http.StatusOK)
})
Convey("Should fail to upload an image that is proxied to another instance", func() {
repoName := "alpine"
img := CreateRandomImage()
err := UploadImage(img, test.GetBaseURL(port), repoName, "1.0")
So(err, ShouldNotBeNil)
So(err.Error(), ShouldEqual, "can't post blob")
resp, err := resty.R().Get(fmt.Sprintf("%s/v2/%s/tags/list", test.GetBaseURL(port), repoName))
So(err, ShouldBeNil)
So(resp, ShouldNotBeNil)
So(resp.StatusCode(), ShouldEqual, http.StatusInternalServerError)
})
})
// when there are multiple members in a cluster, requests are expected to return
// the same data for any member due to proxying.
Convey("Given a zot scale out cluster in http mode with 3 members", t, func() {
numMembers := 3
ports := make([]string, numMembers)
clusterMembers := make([]string, numMembers)
for idx := 0; idx < numMembers; idx++ {
port := test.GetFreePort()
ports[idx] = port
clusterMembers[idx] = fmt.Sprintf("127.0.0.1:%s", port)
}
for _, port := range ports {
conf := config.New()
conf.HTTP.Port = port
conf.Cluster = &config.ClusterConfig{
Members: clusterMembers,
HashKey: "loremipsumdolors",
}
ctrlr := makeController(conf, t.TempDir())
cm := test.NewControllerManager(ctrlr)
cm.StartAndWait(port)
defer cm.StopServer()
}
Convey("All 3 controllers should start up and respond without error", func() {
for _, port := range ports {
resp, err := resty.R().Get(test.GetBaseURL(port) + "/v2/")
So(err, ShouldBeNil)
So(resp, ShouldNotBeNil)
So(resp.StatusCode(), ShouldEqual, http.StatusOK)
}
})
Convey("Should upload images to repos and fetch same response from all 3 members", func() {
reposToTest := []string{"debian", "alpine", "ubuntu"}
for idx, repoName := range reposToTest {
img := CreateRandomImage()
// Upload to each instance based on loop counter
err := UploadImage(img, test.GetBaseURL(ports[idx]), repoName, "1.0")
So(err, ShouldBeNil)
// Query all 3 instances and expect the same response
for _, port := range ports {
resp, err := resty.R().Get(fmt.Sprintf("%s/v2/%s/tags/list", test.GetBaseURL(port), repoName))
So(err, ShouldBeNil)
So(resp, ShouldNotBeNil)
So(resp.StatusCode(), ShouldEqual, http.StatusOK)
result := common.ImageTags{}
err = json.Unmarshal(resp.Body(), &result)
if err != nil {
t.Fatalf("Failed to unmarshal")
}
So(result.Name, ShouldEqual, repoName)
So(len(result.Tags), ShouldEqual, 1)
So(result.Tags[0], ShouldEqual, "1.0")
}
}
})
})
// this test checks for functionality when TLS and htpasswd auth are enabled.
// it primarily checks that headers are correctly copied over during the proxying process.
Convey("Given a zot scale out cluster in https mode with auth enabled", t, func() {
numMembers := 3
ports := make([]string, numMembers)
clusterMembers := make([]string, numMembers)
for idx := 0; idx < numMembers; idx++ {
port := test.GetFreePort()
ports[idx] = port
clusterMembers[idx] = fmt.Sprintf("127.0.0.1:%s", port)
}
caCert, err := os.ReadFile(CACert)
So(err, ShouldBeNil)
caCertPool := x509.NewCertPool()
caCertPool.AppendCertsFromPEM(caCert)
username, _ := test.GenerateRandomString()
password, _ := test.GenerateRandomString()
htpasswdPath := test.MakeHtpasswdFileFromString(test.GetCredString(username, password))
defer os.Remove(htpasswdPath)
resty.SetTLSClientConfig(&tls.Config{RootCAs: caCertPool, MinVersion: tls.VersionTLS12})
defer func() { resty.SetTLSClientConfig(nil) }()
for _, port := range ports {
conf := config.New()
conf.HTTP.Port = port
conf.HTTP.TLS = &config.TLSConfig{
Cert: ServerCert,
Key: ServerKey,
}
conf.HTTP.Auth = &config.AuthConfig{
HTPasswd: config.AuthHTPasswd{
Path: htpasswdPath,
},
}
conf.Cluster = &config.ClusterConfig{
Members: clusterMembers,
HashKey: "loremipsumdolors",
TLS: &config.TLSConfig{
CACert: CACert,
},
}
ctrlr := makeController(conf, t.TempDir())
cm := test.NewControllerManager(ctrlr)
cm.StartAndWait(port)
defer cm.StopServer()
}
Convey("All 3 controllers should start up and respond without error", func() {
for _, port := range ports {
resp, err := resty.R().SetBasicAuth(username, password).Get(test.GetSecureBaseURL(port) + "/v2/")
So(err, ShouldBeNil)
So(resp, ShouldNotBeNil)
So(resp.StatusCode(), ShouldEqual, http.StatusOK)
}
})
Convey("Should upload images to repos and fetch same response from all 3 instances", func() {
reposToTest := []string{"debian", "alpine", "ubuntu"}
for idx, repoName := range reposToTest {
img := CreateRandomImage()
// Upload to each instance based on loop counter
err := UploadImageWithBasicAuth(img, test.GetSecureBaseURL(ports[idx]), repoName, "1.0", username, password)
So(err, ShouldBeNil)
// Query all 3 instances and expect the same response
for _, port := range ports {
resp, err := resty.R().SetBasicAuth(username, password).Get(
fmt.Sprintf("%s/v2/%s/tags/list", test.GetSecureBaseURL(port), repoName),
)
So(err, ShouldBeNil)
So(resp, ShouldNotBeNil)
So(resp.StatusCode(), ShouldEqual, http.StatusOK)
result := common.ImageTags{}
err = json.Unmarshal(resp.Body(), &result)
if err != nil {
t.Fatalf("Failed to unmarshal")
}
So(result.Name, ShouldEqual, repoName)
So(len(result.Tags), ShouldEqual, 1)
So(result.Tags[0], ShouldEqual, "1.0")
}
}
})
})
// when the RootCA file does not exist, expect an error
Convey("Given a zot scale out cluster in with 2 members and an incorrect RootCACert", t, func() {
numMembers := 2
ports := make([]string, numMembers)
clusterMembers := make([]string, numMembers)
for idx := 0; idx < numMembers; idx++ {
port := test.GetFreePort()
ports[idx] = port
clusterMembers[idx] = fmt.Sprintf("127.0.0.1:%s", port)
}
for _, port := range ports {
conf := config.New()
conf.HTTP.Port = port
conf.HTTP.TLS = &config.TLSConfig{
Cert: ServerCert,
Key: ServerKey,
}
conf.Cluster = &config.ClusterConfig{
Members: clusterMembers,
HashKey: "loremipsumdolors",
TLS: &config.TLSConfig{
CACert: "/tmp/does-not-exist.crt",
},
}
ctrlr := makeController(conf, t.TempDir())
cm := test.NewControllerManager(ctrlr)
cm.StartAndWait(port)
defer cm.StopServer()
}
caCert, err := os.ReadFile(CACert)
So(err, ShouldBeNil)
caCertPool := x509.NewCertPool()
caCertPool.AppendCertsFromPEM(caCert)
resty.SetTLSClientConfig(&tls.Config{RootCAs: caCertPool, MinVersion: tls.VersionTLS12})
defer func() { resty.SetTLSClientConfig(nil) }()
Convey("Both controllers should start up and respond without error", func() {
for _, port := range ports {
resp, err := resty.R().Get(test.GetSecureBaseURL(port) + "/v2/")
So(err, ShouldBeNil)
So(resp, ShouldNotBeNil)
So(resp.StatusCode(), ShouldEqual, http.StatusOK)
}
})
Convey("Proxying a request should fail with an error", func() {
// debian gets proxied to the second instance
resp, err := resty.R().Get(fmt.Sprintf("%s/v2/%s/tags/list", test.GetSecureBaseURL(ports[0]), "debian"))
So(err, ShouldBeNil)
So(resp, ShouldNotBeNil)
So(resp.StatusCode(), ShouldEqual, http.StatusInternalServerError)
})
})
// when the server cert file does not exist, expect an error while proxying
Convey("Given a zot scale out cluster in with 2 members and an incorrect server cert", t, func() {
numMembers := 2
ports := make([]string, numMembers)
clusterMembers := make([]string, numMembers)
for idx := 0; idx < numMembers; idx++ {
port := test.GetFreePort()
ports[idx] = port
clusterMembers[idx] = fmt.Sprintf("127.0.0.1:%s", port)
}
for _, port := range ports {
conf := config.New()
conf.HTTP.Port = port
conf.HTTP.TLS = &config.TLSConfig{
Cert: ServerCert,
Key: ServerKey,
}
conf.Cluster = &config.ClusterConfig{
Members: clusterMembers,
HashKey: "loremipsumdolors",
TLS: &config.TLSConfig{
CACert: CACert,
Cert: "/tmp/does-not-exist.crt",
},
}
ctrlr := makeController(conf, t.TempDir())
cm := test.NewControllerManager(ctrlr)
cm.StartAndWait(port)
defer cm.StopServer()
}
caCert, err := os.ReadFile(CACert)
So(err, ShouldBeNil)
caCertPool := x509.NewCertPool()
caCertPool.AppendCertsFromPEM(caCert)
resty.SetTLSClientConfig(&tls.Config{RootCAs: caCertPool, MinVersion: tls.VersionTLS12})
defer func() { resty.SetTLSClientConfig(nil) }()
Convey("Both controllers should start up and respond without error", func() {
for _, port := range ports {
resp, err := resty.R().Get(test.GetSecureBaseURL(port) + "/v2/")
So(err, ShouldBeNil)
So(resp, ShouldNotBeNil)
So(resp.StatusCode(), ShouldEqual, http.StatusOK)
}
})
Convey("Proxying a request should fail with an error", func() {
// debian gets proxied to the second instance
resp, err := resty.R().Get(fmt.Sprintf("%s/v2/%s/tags/list", test.GetSecureBaseURL(ports[0]), "debian"))
So(err, ShouldBeNil)
So(resp, ShouldNotBeNil)
So(resp.StatusCode(), ShouldEqual, http.StatusInternalServerError)
})
})
// when the server key file does not exist, expect an error while proxying
Convey("Given a zot scale out cluster in with 2 members and an incorrect server key", t, func() {
numMembers := 2
ports := make([]string, numMembers)
clusterMembers := make([]string, numMembers)
for idx := 0; idx < numMembers; idx++ {
port := test.GetFreePort()
ports[idx] = port
clusterMembers[idx] = fmt.Sprintf("127.0.0.1:%s", port)
}
for _, port := range ports {
conf := config.New()
conf.HTTP.Port = port
conf.HTTP.TLS = &config.TLSConfig{
Cert: ServerCert,
Key: ServerKey,
}
conf.Cluster = &config.ClusterConfig{
Members: clusterMembers,
HashKey: "loremipsumdolors",
TLS: &config.TLSConfig{
CACert: CACert,
Cert: ServerCert,
Key: "/tmp/does-not-exist.crt",
},
}
ctrlr := makeController(conf, t.TempDir())
cm := test.NewControllerManager(ctrlr)
cm.StartAndWait(port)
defer cm.StopServer()
}
caCert, err := os.ReadFile(CACert)
So(err, ShouldBeNil)
caCertPool := x509.NewCertPool()
caCertPool.AppendCertsFromPEM(caCert)
resty.SetTLSClientConfig(&tls.Config{RootCAs: caCertPool, MinVersion: tls.VersionTLS12})
defer func() { resty.SetTLSClientConfig(nil) }()
Convey("Both controllers should start up and respond without error", func() {
for _, port := range ports {
resp, err := resty.R().Get(test.GetSecureBaseURL(port) + "/v2/")
So(err, ShouldBeNil)
So(resp, ShouldNotBeNil)
So(resp.StatusCode(), ShouldEqual, http.StatusOK)
}
})
Convey("Proxying a request should fail with an error", func() {
// debian gets proxied to the second instance
resp, err := resty.R().Get(fmt.Sprintf("%s/v2/%s/tags/list", test.GetSecureBaseURL(ports[0]), "debian"))
So(err, ShouldBeNil)
So(resp, ShouldNotBeNil)
So(resp.StatusCode(), ShouldEqual, http.StatusInternalServerError)
})
})
}
func TestPrintTracebackOnPanic(t *testing.T) {
Convey("Run server on unavailable port", t, func() {
port := test.GetFreePort()
+258
View File
@@ -0,0 +1,258 @@
package api
import (
"bytes"
"context"
"fmt"
"io"
"net"
"net/http"
"github.com/dchest/siphash"
"github.com/gorilla/mux"
"zotregistry.dev/zot/pkg/api/config"
"zotregistry.dev/zot/pkg/api/constants"
"zotregistry.dev/zot/pkg/common"
)
// ClusterProxy wraps an http.HandlerFunc which requires proxying between zot instances to ensure
// that a given repository only has a single writer and reader for dist-spec operations in a scale-out cluster.
// based on the hash value of the repository name, the request will either be handled locally
// or proxied to another zot member in the cluster to get the data before sending a response to the client.
func ClusterProxy(ctrlr *Controller) func(http.HandlerFunc) http.HandlerFunc {
return func(next http.HandlerFunc) http.HandlerFunc {
return http.HandlerFunc(func(response http.ResponseWriter, request *http.Request) {
config := ctrlr.Config
logger := ctrlr.Log
// if no cluster or single-node cluster, handle locally.
if config.Cluster == nil || len(config.Cluster.Members) == 1 {
next.ServeHTTP(response, request)
return
}
// since the handler has been wrapped, it should be possible to get the name
// of the repository from the mux.
vars := mux.Vars(request)
name, ok := vars["name"]
if !ok || name == "" {
response.WriteHeader(http.StatusNotFound)
return
}
// the target member is the only one which should do read/write for the dist-spec APIs
// for the given repository.
targetMemberIndex, targetMember := computeTargetMember(config, name)
logger.Debug().Str(constants.RepositoryLogKey, name).
Msg(fmt.Sprintf("target member socket: %s index: %d", targetMember, targetMemberIndex))
// if the target member is the same as the local member, the current member should handle the request.
// since the instances have the same config, a quick index lookup is sufficient
if targetMemberIndex == config.Cluster.Proxy.LocalMemberClusterSocketIndex {
logger.Debug().Str(constants.RepositoryLogKey, name).Msg("handling the request locally")
next.ServeHTTP(response, request)
return
}
// if the header contains a hop-count, return an error response as there should be no multi-hop
if request.Header.Get(constants.ScaleOutHopCountHeader) != "" {
logger.Fatal().Str("url", request.URL.String()).
Msg("failed to process request - cannot proxy an already proxied request")
return
}
logger.Debug().Str(constants.RepositoryLogKey, name).Msg("proxying the request")
proxyResponse, err := proxyHTTPRequest(request.Context(), request, targetMember, ctrlr)
if err != nil {
logger.Error().Err(err).Str(constants.RepositoryLogKey, name).Msg("failed to proxy the request")
http.Error(response, err.Error(), http.StatusInternalServerError)
return
}
defer proxyResponse.Body.Close()
copyHeader(response.Header(), proxyResponse.Header)
response.WriteHeader(proxyResponse.StatusCode)
_, _ = io.Copy(response, proxyResponse.Body)
})
}
}
// computes the target member using siphash and returns the index and the member
// siphash was chosen to prevent against hash attacks where an attacker
// can target all requests to one given instance instead of balancing across the cluster
// resulting in a Denial-of-Service (DOS).
// ref: https://en.wikipedia.org/wiki/SipHash
func computeTargetMember(config *config.Config, name string) (uint64, string) {
h := siphash.New([]byte(config.Cluster.HashKey))
h.Write([]byte(name))
sum64 := h.Sum64()
targetIdx := sum64 % uint64(len(config.Cluster.Members))
return targetIdx, config.Cluster.Members[targetIdx]
}
// gets all the server sockets of a target member - IP:Port.
// for IPv6, the socket is [IPv6]:Port.
// if the input is an IP address, returns the same targetMember in an array.
// if the input is a host name, performs a lookup and returns the server sockets.
func getTargetMemberServerSockets(targetMemberSocket string) ([]string, error) {
targetHost, targetPort, err := net.SplitHostPort(targetMemberSocket)
if err != nil {
return []string{}, err
}
addr := net.ParseIP(targetHost)
if addr != nil {
// this is an IP address, return as is
return []string{targetMemberSocket}, nil
}
// this is a hostname - try to resolve to an IP
resolvedAddrs, err := common.GetIPFromHostName(targetHost)
if err != nil {
return []string{}, err
}
targetSockets := make([]string, len(resolvedAddrs))
for idx, resolvedAddr := range resolvedAddrs {
targetSockets[idx] = net.JoinHostPort(resolvedAddr, targetPort)
}
return targetSockets, nil
}
// proxy the request to the target member and return a pointer to the response or an error.
func proxyHTTPRequest(ctx context.Context, req *http.Request,
targetMember string, ctrlr *Controller,
) (*http.Response, error) {
cloneURL := *req.URL
proxyQueryScheme := "http"
if ctrlr.Config.HTTP.TLS != nil {
proxyQueryScheme = "https"
}
cloneURL.Scheme = proxyQueryScheme
cloneURL.Host = targetMember
clonedBody := cloneRequestBody(req)
fwdRequest, err := http.NewRequestWithContext(ctx, req.Method, cloneURL.String(), clonedBody)
if err != nil {
return nil, err
}
copyHeader(fwdRequest.Header, req.Header)
// always set hop count to 1 for now.
// the handler wrapper above will terminate the process if it sees a request that
// already has a hop count but is due for proxying.
fwdRequest.Header.Set(constants.ScaleOutHopCountHeader, "1")
clientOpts := common.HTTPClientOptions{
TLSEnabled: ctrlr.Config.HTTP.TLS != nil,
VerifyTLS: ctrlr.Config.HTTP.TLS != nil, // for now, always verify TLS when TLS mode is enabled
Host: targetMember,
}
tlsConfig := ctrlr.Config.Cluster.TLS
if tlsConfig != nil {
clientOpts.CertOptions.ClientCertFile = tlsConfig.Cert
clientOpts.CertOptions.ClientKeyFile = tlsConfig.Key
clientOpts.CertOptions.RootCaCertFile = tlsConfig.CACert
}
httpClient, err := common.CreateHTTPClient(&clientOpts)
if err != nil {
return nil, err
}
resp, err := httpClient.Do(fwdRequest)
if err != nil {
return nil, err
}
var clonedRespBody bytes.Buffer
// copy out the contents into a new buffer as the response body
// stream should be closed to get all the data out.
_, _ = io.Copy(&clonedRespBody, resp.Body)
resp.Body.Close()
// after closing the original body, substitute it with a new reader
// using the buffer that was just created.
// this buffer should be closed later by the consumer of the response.
resp.Body = io.NopCloser(bytes.NewReader(clonedRespBody.Bytes()))
return resp, nil
}
func cloneRequestBody(src *http.Request) io.Reader {
var bCloneForOriginal, bCloneForCopy bytes.Buffer
multiWriter := io.MultiWriter(&bCloneForOriginal, &bCloneForCopy)
numBytesCopied, _ := io.Copy(multiWriter, src.Body)
// if the body is a type of io.NopCloser and length is 0,
// the Content-Length header is not sent in the proxied request.
// explicitly returning http.NoBody allows the implementation
// to set the header.
// ref: https://github.com/golang/go/issues/34295
if numBytesCopied == 0 {
src.Body = http.NoBody
return http.NoBody
}
src.Body = io.NopCloser(&bCloneForOriginal)
return bytes.NewReader(bCloneForCopy.Bytes())
}
func copyHeader(dst, src http.Header) {
for k, vv := range src {
for _, v := range vv {
dst.Add(k, v)
}
}
}
// identifies and returns the cluster socket and index.
// this is the socket which the scale out cluster members will use for
// proxying and communication among each other.
// returns index, socket, error.
// returns an empty string and index value -1 if the cluster socket is not found.
func GetLocalMemberClusterSocket(members []string, localSockets []string) (int, string, error) {
for memberIdx, member := range members {
// for each member, get the full list of sockets, including DNS resolution
memberSockets, err := getTargetMemberServerSockets(member)
if err != nil {
return -1, "", err
}
// for each member socket that we have, compare all the local sockets with
// it to see if there is any match.
for _, memberSocket := range memberSockets {
for _, localSocket := range localSockets {
// this checks if the sockets are equal at a host port level
areSocketsEqual, err := common.AreSocketsEqual(memberSocket, localSocket)
if err != nil {
return -1, "", err
}
if areSocketsEqual {
return memberIdx, member, nil
}
}
}
}
return -1, "", nil
}
+59
View File
@@ -0,0 +1,59 @@
//go:build sync && scrub && metrics && search && lint && userprefs && mgmt && imagetrust && ui
// +build sync,scrub,metrics,search,lint,userprefs,mgmt,imagetrust,ui
package api_test
import (
"testing"
. "github.com/smartystreets/goconvey/convey"
"zotregistry.dev/zot/pkg/api"
)
func TestGetLocalMemberClusterSocket(t *testing.T) {
Convey("Should return an error if a domain name doesn't exist", t, func() {
localSockets := []string{"127.0.0.1:9000", "172.16.0.1:9000"}
members := []string{"127.0.0.1:9001", "thisdoesnotexist:9000", "127.0.0.1:9000"}
index, socket, err := api.GetLocalMemberClusterSocket(members, localSockets)
So(err.Error(), ShouldContainSubstring, "lookup thisdoesnotexist")
So(index, ShouldEqual, -1)
So(socket, ShouldEqual, "")
})
Convey("Should return an error if a local socket is missing a port", t, func() {
localSockets := []string{"127.0.0.1", "172.16.0.1:9000"}
members := []string{"127.0.0.1:9001", "www.github.com:443", "127.0.0.1:9000"}
index, socket, err := api.GetLocalMemberClusterSocket(members, localSockets)
So(err.Error(), ShouldEqual, "address 127.0.0.1: missing port in address")
So(index, ShouldEqual, -1)
So(socket, ShouldEqual, "")
})
Convey("Should return an error if a member socket is missing a port", t, func() {
localSockets := []string{"127.0.0.1:9000", "172.16.0.1:9000"}
members := []string{"127.0.0.1:9001", "www.github.com", "127.0.0.1:9000"}
index, socket, err := api.GetLocalMemberClusterSocket(members, localSockets)
So(err.Error(), ShouldEqual, "address www.github.com: missing port in address")
So(index, ShouldEqual, -1)
So(socket, ShouldEqual, "")
})
Convey("Should return the right socket when a local socket is part of members", t, func() {
localSockets := []string{"127.0.0.1:9000", "172.16.0.1:9000"}
members := []string{"127.0.0.1:9001", "www.github.com:443", "127.0.0.1:9000"}
index, socket, err := api.GetLocalMemberClusterSocket(members, localSockets)
So(err, ShouldBeNil)
So(index, ShouldEqual, 2)
So(socket, ShouldEqual, "127.0.0.1:9000")
})
Convey("Should return empty when no local socket is part of members", t, func() {
localSockets := []string{"127.0.0.1:9000", "172.16.0.1:9000"}
members := []string{"127.0.0.1:9002", "127.0.0.1:9001", "www.github.com:443"}
index, socket, err := api.GetLocalMemberClusterSocket(members, localSockets)
So(err, ShouldBeNil)
So(index, ShouldEqual, -1)
So(socket, ShouldBeEmpty)
})
}
+43 -17
View File
@@ -127,40 +127,66 @@ func (rh *RouteHandler) SetupRoutes() {
prefixedDistSpecRouter.Use(DistSpecAuthzHandler(rh.c))
}
clusterRouteProxy := ClusterProxy(rh.c)
// https://github.com/opencontainers/distribution-spec/blob/main/spec.md#endpoints
// dist-spec APIs that need to be proxied are wrapped in clusterRouteProxy for scale-out proxying.
// these are handlers that have a repository name.
{
prefixedDistSpecRouter.HandleFunc(fmt.Sprintf("/{name:%s}/tags/list", zreg.NameRegexp.String()),
getUIHeadersHandler(rh.c.Config, http.MethodGet, http.MethodOptions)(
applyCORSHeaders(rh.ListTags))).Methods(http.MethodGet, http.MethodOptions)
clusterRouteProxy(
getUIHeadersHandler(rh.c.Config, http.MethodGet, http.MethodOptions)(
applyCORSHeaders(rh.ListTags),
),
),
).Methods(http.MethodGet, http.MethodOptions)
prefixedDistSpecRouter.HandleFunc(fmt.Sprintf("/{name:%s}/manifests/{reference}", zreg.NameRegexp.String()),
getUIHeadersHandler(rh.c.Config, http.MethodHead, http.MethodGet, http.MethodDelete, http.MethodOptions)(
applyCORSHeaders(rh.CheckManifest))).Methods(http.MethodHead, http.MethodOptions)
clusterRouteProxy(
getUIHeadersHandler(rh.c.Config, http.MethodHead, http.MethodGet, http.MethodDelete, http.MethodOptions)(
applyCORSHeaders(rh.CheckManifest),
),
),
).Methods(http.MethodHead, http.MethodOptions)
prefixedDistSpecRouter.HandleFunc(fmt.Sprintf("/{name:%s}/manifests/{reference}", zreg.NameRegexp.String()),
applyCORSHeaders(rh.GetManifest)).Methods(http.MethodGet)
clusterRouteProxy(
applyCORSHeaders(rh.GetManifest),
),
).Methods(http.MethodGet)
prefixedDistSpecRouter.HandleFunc(fmt.Sprintf("/{name:%s}/manifests/{reference}", zreg.NameRegexp.String()),
rh.UpdateManifest).Methods(http.MethodPut)
clusterRouteProxy(rh.UpdateManifest)).Methods(http.MethodPut)
prefixedDistSpecRouter.HandleFunc(fmt.Sprintf("/{name:%s}/manifests/{reference}", zreg.NameRegexp.String()),
applyCORSHeaders(rh.DeleteManifest)).Methods(http.MethodDelete)
clusterRouteProxy(
applyCORSHeaders(rh.DeleteManifest),
),
).Methods(http.MethodDelete)
prefixedDistSpecRouter.HandleFunc(fmt.Sprintf("/{name:%s}/blobs/{digest}", zreg.NameRegexp.String()),
rh.CheckBlob).Methods(http.MethodHead)
clusterRouteProxy(rh.CheckBlob)).Methods(http.MethodHead)
prefixedDistSpecRouter.HandleFunc(fmt.Sprintf("/{name:%s}/blobs/{digest}", zreg.NameRegexp.String()),
rh.GetBlob).Methods(http.MethodGet)
clusterRouteProxy(rh.GetBlob)).Methods(http.MethodGet)
prefixedDistSpecRouter.HandleFunc(fmt.Sprintf("/{name:%s}/blobs/{digest}", zreg.NameRegexp.String()),
rh.DeleteBlob).Methods(http.MethodDelete)
clusterRouteProxy(rh.DeleteBlob)).Methods(http.MethodDelete)
prefixedDistSpecRouter.HandleFunc(fmt.Sprintf("/{name:%s}/blobs/uploads/", zreg.NameRegexp.String()),
rh.CreateBlobUpload).Methods(http.MethodPost)
clusterRouteProxy(rh.CreateBlobUpload)).Methods(http.MethodPost)
prefixedDistSpecRouter.HandleFunc(fmt.Sprintf("/{name:%s}/blobs/uploads/{session_id}", zreg.NameRegexp.String()),
rh.GetBlobUpload).Methods(http.MethodGet)
clusterRouteProxy(rh.GetBlobUpload)).Methods(http.MethodGet)
prefixedDistSpecRouter.HandleFunc(fmt.Sprintf("/{name:%s}/blobs/uploads/{session_id}", zreg.NameRegexp.String()),
rh.PatchBlobUpload).Methods(http.MethodPatch)
clusterRouteProxy(rh.PatchBlobUpload)).Methods(http.MethodPatch)
prefixedDistSpecRouter.HandleFunc(fmt.Sprintf("/{name:%s}/blobs/uploads/{session_id}", zreg.NameRegexp.String()),
rh.UpdateBlobUpload).Methods(http.MethodPut)
clusterRouteProxy(rh.UpdateBlobUpload)).Methods(http.MethodPut)
prefixedDistSpecRouter.HandleFunc(fmt.Sprintf("/{name:%s}/blobs/uploads/{session_id}", zreg.NameRegexp.String()),
rh.DeleteBlobUpload).Methods(http.MethodDelete)
clusterRouteProxy(rh.DeleteBlobUpload)).Methods(http.MethodDelete)
// support for OCI artifact references
prefixedDistSpecRouter.HandleFunc(fmt.Sprintf("/{name:%s}/referrers/{digest}", zreg.NameRegexp.String()),
getUIHeadersHandler(rh.c.Config, http.MethodGet, http.MethodOptions)(
applyCORSHeaders(rh.GetReferrers))).Methods(http.MethodGet, http.MethodOptions)
clusterRouteProxy(
getUIHeadersHandler(rh.c.Config, http.MethodGet, http.MethodOptions)(
applyCORSHeaders(rh.GetReferrers),
),
),
).Methods(http.MethodGet, http.MethodOptions)
// handlers which work fine with a single node do not need proxying.
// catalog handler doesn't require proxying as the metadata and storage are shared.
// discover and the default path handlers are node-specific so do not require proxying.
prefixedRouter.HandleFunc(constants.ExtCatalogPrefix,
getUIHeadersHandler(rh.c.Config, http.MethodGet, http.MethodOptions)(
applyCORSHeaders(rh.ListRepositories))).Methods(http.MethodGet, http.MethodOptions)