From 386c72d332018a3e0deae50ce51c2f4cff3ee8ff Mon Sep 17 00:00:00 2001 From: Ramkumar Chinchani Date: Wed, 30 Sep 2020 17:16:34 -0700 Subject: [PATCH] routes: refactor locks to handle large file uploads The storage layer is protected with read-write locks. However, we may be holding the locks over unnecessarily large critical sections. The typical workflow is that a blob is first uploaded via a per-client private session-id meaning the blob is not publicly visible yet. When the blob being uploaded is very large, the transfer takes a long time while holding the lock. Private session-id based uploads don't really need locks, and hold locks only when blobs are published after the upload is complete. --- pkg/api/routes.go | 44 ++++++++++++------------------------------ pkg/storage/storage.go | 35 +++++++++++++++++++++++++++++++++ 2 files changed, 47 insertions(+), 32 deletions(-) diff --git a/pkg/api/routes.go b/pkg/api/routes.go index 74992ff3..dc88e062 100644 --- a/pkg/api/routes.go +++ b/pkg/api/routes.go @@ -57,26 +57,6 @@ func (rh *RouteHandler) searchHandler() *gqlHandler.Server { return gqlHandler.NewDefaultServer(search.NewExecutableSchema(resConfig)) } -// blobRLockWrapper calls the real handler with read-lock held. -func (rh *RouteHandler) blobRLockWrapper(f func(w http.ResponseWriter, - r *http.Request)) func(w http.ResponseWriter, r *http.Request) { - return func(w http.ResponseWriter, r *http.Request) { - rh.c.ImageStore.RLock() - f(w, r) - rh.c.ImageStore.RUnlock() - } -} - -// blobLockWrapper calls the real handler with write-lock held. -func (rh *RouteHandler) blobLockWrapper(f func(w http.ResponseWriter, - r *http.Request)) func(w http.ResponseWriter, r *http.Request) { - return func(w http.ResponseWriter, r *http.Request) { - rh.c.ImageStore.Lock() - f(w, r) - rh.c.ImageStore.Unlock() - } -} - func (rh *RouteHandler) SetupRoutes() { rh.c.Router.Use(AuthHandler(rh.c)) g := rh.c.Router.PathPrefix(RoutePrefix).Subrouter() @@ -84,29 +64,29 @@ func (rh *RouteHandler) SetupRoutes() { g.HandleFunc(fmt.Sprintf("/{name:%s}/tags/list", NameRegexp.String()), rh.ListTags).Methods("GET") g.HandleFunc(fmt.Sprintf("/{name:%s}/manifests/{reference}", NameRegexp.String()), - rh.blobRLockWrapper(rh.CheckManifest)).Methods("HEAD") + rh.CheckManifest).Methods("HEAD") g.HandleFunc(fmt.Sprintf("/{name:%s}/manifests/{reference}", NameRegexp.String()), - rh.blobRLockWrapper(rh.GetManifest)).Methods("GET") + rh.GetManifest).Methods("GET") g.HandleFunc(fmt.Sprintf("/{name:%s}/manifests/{reference}", NameRegexp.String()), - rh.blobLockWrapper(rh.UpdateManifest)).Methods("PUT") + rh.UpdateManifest).Methods("PUT") g.HandleFunc(fmt.Sprintf("/{name:%s}/manifests/{reference}", NameRegexp.String()), - rh.blobLockWrapper(rh.DeleteManifest)).Methods("DELETE") + rh.DeleteManifest).Methods("DELETE") g.HandleFunc(fmt.Sprintf("/{name:%s}/blobs/{digest}", NameRegexp.String()), - rh.blobRLockWrapper(rh.CheckBlob)).Methods("HEAD") + rh.CheckBlob).Methods("HEAD") g.HandleFunc(fmt.Sprintf("/{name:%s}/blobs/{digest}", NameRegexp.String()), - rh.blobRLockWrapper(rh.GetBlob)).Methods("GET") + rh.GetBlob).Methods("GET") g.HandleFunc(fmt.Sprintf("/{name:%s}/blobs/{digest}", NameRegexp.String()), - rh.blobLockWrapper(rh.DeleteBlob)).Methods("DELETE") + rh.DeleteBlob).Methods("DELETE") g.HandleFunc(fmt.Sprintf("/{name:%s}/blobs/uploads/", NameRegexp.String()), - rh.blobLockWrapper(rh.CreateBlobUpload)).Methods("POST") + rh.CreateBlobUpload).Methods("POST") g.HandleFunc(fmt.Sprintf("/{name:%s}/blobs/uploads/{session_id}", NameRegexp.String()), - rh.blobRLockWrapper(rh.GetBlobUpload)).Methods("GET") + rh.GetBlobUpload).Methods("GET") g.HandleFunc(fmt.Sprintf("/{name:%s}/blobs/uploads/{session_id}", NameRegexp.String()), - rh.blobLockWrapper(rh.PatchBlobUpload)).Methods("PATCH") + rh.PatchBlobUpload).Methods("PATCH") g.HandleFunc(fmt.Sprintf("/{name:%s}/blobs/uploads/{session_id}", NameRegexp.String()), - rh.blobLockWrapper(rh.UpdateBlobUpload)).Methods("PUT") + rh.UpdateBlobUpload).Methods("PUT") g.HandleFunc(fmt.Sprintf("/{name:%s}/blobs/uploads/{session_id}", NameRegexp.String()), - rh.blobLockWrapper(rh.DeleteBlobUpload)).Methods("DELETE") + rh.DeleteBlobUpload).Methods("DELETE") g.HandleFunc("/_catalog", rh.ListRepositories).Methods("GET") g.HandleFunc("/", diff --git a/pkg/storage/storage.go b/pkg/storage/storage.go index 917cab03..565a7d1c 100644 --- a/pkg/storage/storage.go +++ b/pkg/storage/storage.go @@ -111,6 +111,9 @@ func (is *ImageStore) Unlock() { func (is *ImageStore) InitRepo(name string) error { repoDir := path.Join(is.rootDir, name) + is.Lock() + defer is.Unlock() + if fi, err := os.Stat(repoDir); err == nil && fi.IsDir() { return nil } @@ -217,6 +220,9 @@ func (is *ImageStore) ValidateRepo(name string) (bool, error) { func (is *ImageStore) GetRepositories() ([]string, error) { dir := is.rootDir + is.RLock() + defer is.RUnlock() + _, err := ioutil.ReadDir(dir) if err != nil { is.log.Error().Err(err).Msg("failure walking storage root-dir") @@ -258,6 +264,9 @@ func (is *ImageStore) GetImageTags(repo string) ([]string, error) { return nil, errors.ErrRepoNotFound } + is.RLock() + defer is.RUnlock() + buf, err := ioutil.ReadFile(path.Join(dir, "index.json")) if err != nil { is.log.Error().Err(err).Str("dir", dir).Msg("failed to read index.json") @@ -289,6 +298,9 @@ func (is *ImageStore) GetImageManifest(repo string, reference string) ([]byte, s return nil, "", "", errors.ErrRepoNotFound } + is.RLock() + defer is.RUnlock() + buf, err := ioutil.ReadFile(path.Join(dir, "index.json")) if err != nil { @@ -414,6 +426,9 @@ func (is *ImageStore) PutImageManifest(repo string, reference string, mediaType refIsDigest = true } + is.Lock() + defer is.Unlock() + dir := path.Join(is.rootDir, repo) buf, err := ioutil.ReadFile(path.Join(dir, "index.json")) @@ -532,6 +547,9 @@ func (is *ImageStore) DeleteImageManifest(repo string, reference string) error { return errors.ErrBadManifest } + is.Lock() + defer is.Unlock() + buf, err := ioutil.ReadFile(path.Join(dir, "index.json")) if err != nil { @@ -770,6 +788,10 @@ func (is *ImageStore) FinishBlobUpload(repo string, uuid string, body io.Reader, } dir := path.Join(is.rootDir, repo, "blobs", dstDigest.Algorithm().String()) + + is.Lock() + defer is.Unlock() + ensureDir(dir, is.log) dst := is.BlobPath(repo, dstDigest) @@ -835,6 +857,10 @@ func (is *ImageStore) FullBlobUpload(repo string, body io.Reader, digest string) } dir := path.Join(is.rootDir, repo, "blobs", dstDigest.Algorithm().String()) + + is.Lock() + defer is.Unlock() + ensureDir(dir, is.log) dst := is.BlobPath(repo, dstDigest) @@ -948,6 +974,9 @@ func (is *ImageStore) CheckBlob(repo string, digest string, blobPath := is.BlobPath(repo, d) + is.RLock() + defer is.RUnlock() + blobInfo, err := os.Stat(blobPath) if err != nil { is.log.Error().Err(err).Str("blob", blobPath).Msg("failed to stat blob") @@ -969,6 +998,9 @@ func (is *ImageStore) GetBlob(repo string, digest string, mediaType string) (io. blobPath := is.BlobPath(repo, d) + is.RLock() + defer is.RUnlock() + blobInfo, err := os.Stat(blobPath) if err != nil { is.log.Error().Err(err).Str("blob", blobPath).Msg("failed to stat blob") @@ -994,6 +1026,9 @@ func (is *ImageStore) DeleteBlob(repo string, digest string) error { blobPath := is.BlobPath(repo, d) + is.Lock() + defer is.Unlock() + _, err = os.Stat(blobPath) if err != nil { is.log.Error().Err(err).Str("blob", blobPath).Msg("failed to stat blob")