mirror of
https://github.com/project-zot/zot.git
synced 2026-06-15 20:07:55 +08:00
routes: refactor locks to handle large file uploads
The storage layer is protected with read-write locks. However, we may be holding the locks over unnecessarily large critical sections. The typical workflow is that a blob is first uploaded via a per-client private session-id meaning the blob is not publicly visible yet. When the blob being uploaded is very large, the transfer takes a long time while holding the lock. Private session-id based uploads don't really need locks, and hold locks only when blobs are published after the upload is complete.
This commit is contained in:
committed by
Shivam Mishra
parent
25ad71787a
commit
386c72d332
+12
-32
@@ -57,26 +57,6 @@ func (rh *RouteHandler) searchHandler() *gqlHandler.Server {
|
|||||||
return gqlHandler.NewDefaultServer(search.NewExecutableSchema(resConfig))
|
return gqlHandler.NewDefaultServer(search.NewExecutableSchema(resConfig))
|
||||||
}
|
}
|
||||||
|
|
||||||
// blobRLockWrapper calls the real handler with read-lock held.
|
|
||||||
func (rh *RouteHandler) blobRLockWrapper(f func(w http.ResponseWriter,
|
|
||||||
r *http.Request)) func(w http.ResponseWriter, r *http.Request) {
|
|
||||||
return func(w http.ResponseWriter, r *http.Request) {
|
|
||||||
rh.c.ImageStore.RLock()
|
|
||||||
f(w, r)
|
|
||||||
rh.c.ImageStore.RUnlock()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// blobLockWrapper calls the real handler with write-lock held.
|
|
||||||
func (rh *RouteHandler) blobLockWrapper(f func(w http.ResponseWriter,
|
|
||||||
r *http.Request)) func(w http.ResponseWriter, r *http.Request) {
|
|
||||||
return func(w http.ResponseWriter, r *http.Request) {
|
|
||||||
rh.c.ImageStore.Lock()
|
|
||||||
f(w, r)
|
|
||||||
rh.c.ImageStore.Unlock()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (rh *RouteHandler) SetupRoutes() {
|
func (rh *RouteHandler) SetupRoutes() {
|
||||||
rh.c.Router.Use(AuthHandler(rh.c))
|
rh.c.Router.Use(AuthHandler(rh.c))
|
||||||
g := rh.c.Router.PathPrefix(RoutePrefix).Subrouter()
|
g := rh.c.Router.PathPrefix(RoutePrefix).Subrouter()
|
||||||
@@ -84,29 +64,29 @@ func (rh *RouteHandler) SetupRoutes() {
|
|||||||
g.HandleFunc(fmt.Sprintf("/{name:%s}/tags/list", NameRegexp.String()),
|
g.HandleFunc(fmt.Sprintf("/{name:%s}/tags/list", NameRegexp.String()),
|
||||||
rh.ListTags).Methods("GET")
|
rh.ListTags).Methods("GET")
|
||||||
g.HandleFunc(fmt.Sprintf("/{name:%s}/manifests/{reference}", NameRegexp.String()),
|
g.HandleFunc(fmt.Sprintf("/{name:%s}/manifests/{reference}", NameRegexp.String()),
|
||||||
rh.blobRLockWrapper(rh.CheckManifest)).Methods("HEAD")
|
rh.CheckManifest).Methods("HEAD")
|
||||||
g.HandleFunc(fmt.Sprintf("/{name:%s}/manifests/{reference}", NameRegexp.String()),
|
g.HandleFunc(fmt.Sprintf("/{name:%s}/manifests/{reference}", NameRegexp.String()),
|
||||||
rh.blobRLockWrapper(rh.GetManifest)).Methods("GET")
|
rh.GetManifest).Methods("GET")
|
||||||
g.HandleFunc(fmt.Sprintf("/{name:%s}/manifests/{reference}", NameRegexp.String()),
|
g.HandleFunc(fmt.Sprintf("/{name:%s}/manifests/{reference}", NameRegexp.String()),
|
||||||
rh.blobLockWrapper(rh.UpdateManifest)).Methods("PUT")
|
rh.UpdateManifest).Methods("PUT")
|
||||||
g.HandleFunc(fmt.Sprintf("/{name:%s}/manifests/{reference}", NameRegexp.String()),
|
g.HandleFunc(fmt.Sprintf("/{name:%s}/manifests/{reference}", NameRegexp.String()),
|
||||||
rh.blobLockWrapper(rh.DeleteManifest)).Methods("DELETE")
|
rh.DeleteManifest).Methods("DELETE")
|
||||||
g.HandleFunc(fmt.Sprintf("/{name:%s}/blobs/{digest}", NameRegexp.String()),
|
g.HandleFunc(fmt.Sprintf("/{name:%s}/blobs/{digest}", NameRegexp.String()),
|
||||||
rh.blobRLockWrapper(rh.CheckBlob)).Methods("HEAD")
|
rh.CheckBlob).Methods("HEAD")
|
||||||
g.HandleFunc(fmt.Sprintf("/{name:%s}/blobs/{digest}", NameRegexp.String()),
|
g.HandleFunc(fmt.Sprintf("/{name:%s}/blobs/{digest}", NameRegexp.String()),
|
||||||
rh.blobRLockWrapper(rh.GetBlob)).Methods("GET")
|
rh.GetBlob).Methods("GET")
|
||||||
g.HandleFunc(fmt.Sprintf("/{name:%s}/blobs/{digest}", NameRegexp.String()),
|
g.HandleFunc(fmt.Sprintf("/{name:%s}/blobs/{digest}", NameRegexp.String()),
|
||||||
rh.blobLockWrapper(rh.DeleteBlob)).Methods("DELETE")
|
rh.DeleteBlob).Methods("DELETE")
|
||||||
g.HandleFunc(fmt.Sprintf("/{name:%s}/blobs/uploads/", NameRegexp.String()),
|
g.HandleFunc(fmt.Sprintf("/{name:%s}/blobs/uploads/", NameRegexp.String()),
|
||||||
rh.blobLockWrapper(rh.CreateBlobUpload)).Methods("POST")
|
rh.CreateBlobUpload).Methods("POST")
|
||||||
g.HandleFunc(fmt.Sprintf("/{name:%s}/blobs/uploads/{session_id}", NameRegexp.String()),
|
g.HandleFunc(fmt.Sprintf("/{name:%s}/blobs/uploads/{session_id}", NameRegexp.String()),
|
||||||
rh.blobRLockWrapper(rh.GetBlobUpload)).Methods("GET")
|
rh.GetBlobUpload).Methods("GET")
|
||||||
g.HandleFunc(fmt.Sprintf("/{name:%s}/blobs/uploads/{session_id}", NameRegexp.String()),
|
g.HandleFunc(fmt.Sprintf("/{name:%s}/blobs/uploads/{session_id}", NameRegexp.String()),
|
||||||
rh.blobLockWrapper(rh.PatchBlobUpload)).Methods("PATCH")
|
rh.PatchBlobUpload).Methods("PATCH")
|
||||||
g.HandleFunc(fmt.Sprintf("/{name:%s}/blobs/uploads/{session_id}", NameRegexp.String()),
|
g.HandleFunc(fmt.Sprintf("/{name:%s}/blobs/uploads/{session_id}", NameRegexp.String()),
|
||||||
rh.blobLockWrapper(rh.UpdateBlobUpload)).Methods("PUT")
|
rh.UpdateBlobUpload).Methods("PUT")
|
||||||
g.HandleFunc(fmt.Sprintf("/{name:%s}/blobs/uploads/{session_id}", NameRegexp.String()),
|
g.HandleFunc(fmt.Sprintf("/{name:%s}/blobs/uploads/{session_id}", NameRegexp.String()),
|
||||||
rh.blobLockWrapper(rh.DeleteBlobUpload)).Methods("DELETE")
|
rh.DeleteBlobUpload).Methods("DELETE")
|
||||||
g.HandleFunc("/_catalog",
|
g.HandleFunc("/_catalog",
|
||||||
rh.ListRepositories).Methods("GET")
|
rh.ListRepositories).Methods("GET")
|
||||||
g.HandleFunc("/",
|
g.HandleFunc("/",
|
||||||
|
|||||||
@@ -111,6 +111,9 @@ func (is *ImageStore) Unlock() {
|
|||||||
func (is *ImageStore) InitRepo(name string) error {
|
func (is *ImageStore) InitRepo(name string) error {
|
||||||
repoDir := path.Join(is.rootDir, name)
|
repoDir := path.Join(is.rootDir, name)
|
||||||
|
|
||||||
|
is.Lock()
|
||||||
|
defer is.Unlock()
|
||||||
|
|
||||||
if fi, err := os.Stat(repoDir); err == nil && fi.IsDir() {
|
if fi, err := os.Stat(repoDir); err == nil && fi.IsDir() {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@@ -217,6 +220,9 @@ func (is *ImageStore) ValidateRepo(name string) (bool, error) {
|
|||||||
func (is *ImageStore) GetRepositories() ([]string, error) {
|
func (is *ImageStore) GetRepositories() ([]string, error) {
|
||||||
dir := is.rootDir
|
dir := is.rootDir
|
||||||
|
|
||||||
|
is.RLock()
|
||||||
|
defer is.RUnlock()
|
||||||
|
|
||||||
_, err := ioutil.ReadDir(dir)
|
_, err := ioutil.ReadDir(dir)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
is.log.Error().Err(err).Msg("failure walking storage root-dir")
|
is.log.Error().Err(err).Msg("failure walking storage root-dir")
|
||||||
@@ -258,6 +264,9 @@ func (is *ImageStore) GetImageTags(repo string) ([]string, error) {
|
|||||||
return nil, errors.ErrRepoNotFound
|
return nil, errors.ErrRepoNotFound
|
||||||
}
|
}
|
||||||
|
|
||||||
|
is.RLock()
|
||||||
|
defer is.RUnlock()
|
||||||
|
|
||||||
buf, err := ioutil.ReadFile(path.Join(dir, "index.json"))
|
buf, err := ioutil.ReadFile(path.Join(dir, "index.json"))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
is.log.Error().Err(err).Str("dir", dir).Msg("failed to read index.json")
|
is.log.Error().Err(err).Str("dir", dir).Msg("failed to read index.json")
|
||||||
@@ -289,6 +298,9 @@ func (is *ImageStore) GetImageManifest(repo string, reference string) ([]byte, s
|
|||||||
return nil, "", "", errors.ErrRepoNotFound
|
return nil, "", "", errors.ErrRepoNotFound
|
||||||
}
|
}
|
||||||
|
|
||||||
|
is.RLock()
|
||||||
|
defer is.RUnlock()
|
||||||
|
|
||||||
buf, err := ioutil.ReadFile(path.Join(dir, "index.json"))
|
buf, err := ioutil.ReadFile(path.Join(dir, "index.json"))
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -414,6 +426,9 @@ func (is *ImageStore) PutImageManifest(repo string, reference string, mediaType
|
|||||||
refIsDigest = true
|
refIsDigest = true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
is.Lock()
|
||||||
|
defer is.Unlock()
|
||||||
|
|
||||||
dir := path.Join(is.rootDir, repo)
|
dir := path.Join(is.rootDir, repo)
|
||||||
buf, err := ioutil.ReadFile(path.Join(dir, "index.json"))
|
buf, err := ioutil.ReadFile(path.Join(dir, "index.json"))
|
||||||
|
|
||||||
@@ -532,6 +547,9 @@ func (is *ImageStore) DeleteImageManifest(repo string, reference string) error {
|
|||||||
return errors.ErrBadManifest
|
return errors.ErrBadManifest
|
||||||
}
|
}
|
||||||
|
|
||||||
|
is.Lock()
|
||||||
|
defer is.Unlock()
|
||||||
|
|
||||||
buf, err := ioutil.ReadFile(path.Join(dir, "index.json"))
|
buf, err := ioutil.ReadFile(path.Join(dir, "index.json"))
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -770,6 +788,10 @@ func (is *ImageStore) FinishBlobUpload(repo string, uuid string, body io.Reader,
|
|||||||
}
|
}
|
||||||
|
|
||||||
dir := path.Join(is.rootDir, repo, "blobs", dstDigest.Algorithm().String())
|
dir := path.Join(is.rootDir, repo, "blobs", dstDigest.Algorithm().String())
|
||||||
|
|
||||||
|
is.Lock()
|
||||||
|
defer is.Unlock()
|
||||||
|
|
||||||
ensureDir(dir, is.log)
|
ensureDir(dir, is.log)
|
||||||
dst := is.BlobPath(repo, dstDigest)
|
dst := is.BlobPath(repo, dstDigest)
|
||||||
|
|
||||||
@@ -835,6 +857,10 @@ func (is *ImageStore) FullBlobUpload(repo string, body io.Reader, digest string)
|
|||||||
}
|
}
|
||||||
|
|
||||||
dir := path.Join(is.rootDir, repo, "blobs", dstDigest.Algorithm().String())
|
dir := path.Join(is.rootDir, repo, "blobs", dstDigest.Algorithm().String())
|
||||||
|
|
||||||
|
is.Lock()
|
||||||
|
defer is.Unlock()
|
||||||
|
|
||||||
ensureDir(dir, is.log)
|
ensureDir(dir, is.log)
|
||||||
dst := is.BlobPath(repo, dstDigest)
|
dst := is.BlobPath(repo, dstDigest)
|
||||||
|
|
||||||
@@ -948,6 +974,9 @@ func (is *ImageStore) CheckBlob(repo string, digest string,
|
|||||||
|
|
||||||
blobPath := is.BlobPath(repo, d)
|
blobPath := is.BlobPath(repo, d)
|
||||||
|
|
||||||
|
is.RLock()
|
||||||
|
defer is.RUnlock()
|
||||||
|
|
||||||
blobInfo, err := os.Stat(blobPath)
|
blobInfo, err := os.Stat(blobPath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
is.log.Error().Err(err).Str("blob", blobPath).Msg("failed to stat blob")
|
is.log.Error().Err(err).Str("blob", blobPath).Msg("failed to stat blob")
|
||||||
@@ -969,6 +998,9 @@ func (is *ImageStore) GetBlob(repo string, digest string, mediaType string) (io.
|
|||||||
|
|
||||||
blobPath := is.BlobPath(repo, d)
|
blobPath := is.BlobPath(repo, d)
|
||||||
|
|
||||||
|
is.RLock()
|
||||||
|
defer is.RUnlock()
|
||||||
|
|
||||||
blobInfo, err := os.Stat(blobPath)
|
blobInfo, err := os.Stat(blobPath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
is.log.Error().Err(err).Str("blob", blobPath).Msg("failed to stat blob")
|
is.log.Error().Err(err).Str("blob", blobPath).Msg("failed to stat blob")
|
||||||
@@ -994,6 +1026,9 @@ func (is *ImageStore) DeleteBlob(repo string, digest string) error {
|
|||||||
|
|
||||||
blobPath := is.BlobPath(repo, d)
|
blobPath := is.BlobPath(repo, d)
|
||||||
|
|
||||||
|
is.Lock()
|
||||||
|
defer is.Unlock()
|
||||||
|
|
||||||
_, err = os.Stat(blobPath)
|
_, err = os.Stat(blobPath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
is.log.Error().Err(err).Str("blob", blobPath).Msg("failed to stat blob")
|
is.log.Error().Err(err).Str("blob", blobPath).Msg("failed to stat blob")
|
||||||
|
|||||||
Reference in New Issue
Block a user