From 7d077eaf5a57a3307bc3b24b53f295cd52f3bc39 Mon Sep 17 00:00:00 2001 From: Petu Eusebiu Date: Thu, 30 Sep 2021 16:27:13 +0300 Subject: [PATCH] Added storage interface --- pkg/api/controller.go | 2 +- pkg/api/routes.go | 2 +- pkg/extensions/search/common/common_test.go | 30 +- pkg/extensions/search/common/oci_layout.go | 52 +- pkg/extensions/search/cve/cve.go | 6 +- pkg/extensions/search/cve/cve_test.go | 4 +- pkg/extensions/search/digest/digest.go | 10 +- pkg/extensions/search/digest/digest_test.go | 102 +- pkg/extensions/search/resolver.go | 35 +- pkg/storage/storage.go | 1321 +----------------- pkg/storage/storage_fs.go | 1333 +++++++++++++++++++ pkg/storage/storage_test.go | 246 +++- 12 files changed, 1754 insertions(+), 1389 deletions(-) create mode 100644 pkg/storage/storage_fs.go diff --git a/pkg/api/controller.go b/pkg/api/controller.go index 6af74671..804c4452 100644 --- a/pkg/api/controller.go +++ b/pkg/api/controller.go @@ -115,7 +115,7 @@ func (c *Controller) Run() error { if len(c.Config.Storage.SubPaths) > 0 { subPaths := c.Config.Storage.SubPaths - subImageStore := make(map[string]*storage.ImageStore) + subImageStore := make(map[string]storage.ImageStore) // creating image store per subpaths for route, storageConfig := range subPaths { diff --git a/pkg/api/routes.go b/pkg/api/routes.go index e596d732..627ed701 100644 --- a/pkg/api/routes.go +++ b/pkg/api/routes.go @@ -1237,6 +1237,6 @@ func WriteDataFromReader(w http.ResponseWriter, status int, length int64, mediaT } // will return image storage corresponding to subpath provided in config. -func (rh *RouteHandler) getImageStore(name string) *storage.ImageStore { +func (rh *RouteHandler) getImageStore(name string) storage.ImageStore { return rh.c.StoreController.GetImageStore(name) } diff --git a/pkg/extensions/search/common/common_test.go b/pkg/extensions/search/common/common_test.go index b0ddb5cd..c98db392 100644 --- a/pkg/extensions/search/common/common_test.go +++ b/pkg/extensions/search/common/common_test.go @@ -157,48 +157,52 @@ func TestImageFormat(t *testing.T) { Convey("Test valid image", t, func() { log := log.NewLogger("debug", "") dbDir := "../../../../test/data" - olu := common.NewOciLayoutUtils(log) - isValidImage, err := olu.IsValidImageFormat(path.Join(dbDir, "zot-test")) + + defaultStore := storage.NewImageStore(dbDir, false, false, log) + storeController := storage.StoreController{DefaultStore: defaultStore} + olu := common.NewOciLayoutUtils(storeController, log) + + isValidImage, err := olu.IsValidImageFormat("zot-test") So(err, ShouldBeNil) So(isValidImage, ShouldEqual, true) - isValidImage, err = olu.IsValidImageFormat(path.Join(dbDir, "zot-test:0.0.1")) + isValidImage, err = olu.IsValidImageFormat("zot-test:0.0.1") So(err, ShouldBeNil) So(isValidImage, ShouldEqual, true) - isValidImage, err = olu.IsValidImageFormat(path.Join(dbDir, "zot-test:0.0.")) + isValidImage, err = olu.IsValidImageFormat("zot-test:0.0.") So(err, ShouldBeNil) So(isValidImage, ShouldEqual, false) - isValidImage, err = olu.IsValidImageFormat(path.Join(dbDir, "zot-noindex-test")) + isValidImage, err = olu.IsValidImageFormat("zot-noindex-test") So(err, ShouldNotBeNil) So(isValidImage, ShouldEqual, false) - isValidImage, err = olu.IsValidImageFormat(path.Join(dbDir, "zot--tet")) + isValidImage, err = olu.IsValidImageFormat("zot--tet") So(err, ShouldNotBeNil) So(isValidImage, ShouldEqual, false) - isValidImage, err = olu.IsValidImageFormat(path.Join(dbDir, "zot-noindex-test")) + isValidImage, err = olu.IsValidImageFormat("zot-noindex-test") So(err, ShouldNotBeNil) So(isValidImage, ShouldEqual, false) - isValidImage, err = olu.IsValidImageFormat(path.Join(dbDir, "zot-squashfs-noblobs")) + isValidImage, err = olu.IsValidImageFormat("zot-squashfs-noblobs") So(err, ShouldNotBeNil) So(isValidImage, ShouldEqual, false) - isValidImage, err = olu.IsValidImageFormat(path.Join(dbDir, "zot-squashfs-invalid-index")) + isValidImage, err = olu.IsValidImageFormat("zot-squashfs-invalid-index") So(err, ShouldNotBeNil) So(isValidImage, ShouldEqual, false) - isValidImage, err = olu.IsValidImageFormat(path.Join(dbDir, "zot-squashfs-invalid-blob")) + isValidImage, err = olu.IsValidImageFormat("zot-squashfs-invalid-blob") So(err, ShouldNotBeNil) So(isValidImage, ShouldEqual, false) - isValidImage, err = olu.IsValidImageFormat(path.Join(dbDir, "zot-squashfs-test:0.3.22-squashfs")) + isValidImage, err = olu.IsValidImageFormat("zot-squashfs-test:0.3.22-squashfs") So(err, ShouldNotBeNil) So(isValidImage, ShouldEqual, false) - isValidImage, err = olu.IsValidImageFormat(path.Join(dbDir, "zot-nonreadable-test")) + isValidImage, err = olu.IsValidImageFormat("zot-nonreadable-test") So(err, ShouldNotBeNil) So(isValidImage, ShouldEqual, false) }) @@ -443,7 +447,7 @@ func TestUtilsMethod(t *testing.T) { subStore := storage.NewImageStore(subRootDir, false, false, log) - subStoreMap := make(map[string]*storage.ImageStore) + subStoreMap := make(map[string]storage.ImageStore) subStoreMap["/b"] = subStore diff --git a/pkg/extensions/search/common/oci_layout.go b/pkg/extensions/search/common/oci_layout.go index 89074cb3..f3ac827e 100644 --- a/pkg/extensions/search/common/oci_layout.go +++ b/pkg/extensions/search/common/oci_layout.go @@ -3,14 +3,15 @@ package common import ( "encoding/json" - "io/ioutil" - "os" "path" "strings" "time" + goerrors "errors" + "github.com/anuvu/zot/errors" "github.com/anuvu/zot/pkg/log" + "github.com/anuvu/zot/pkg/storage" v1 "github.com/google/go-containerregistry/pkg/v1" "github.com/google/go-containerregistry/pkg/v1/types" godigest "github.com/opencontainers/go-digest" @@ -19,21 +20,22 @@ import ( // OciLayoutInfo ... type OciLayoutUtils struct { - Log log.Logger + Log log.Logger + StoreController storage.StoreController } // NewOciLayoutUtils initializes a new OciLayoutUtils object. -func NewOciLayoutUtils(log log.Logger) *OciLayoutUtils { - return &OciLayoutUtils{Log: log} +func NewOciLayoutUtils(storeController storage.StoreController, log log.Logger) *OciLayoutUtils { + return &OciLayoutUtils{Log: log, StoreController: storeController} } // Below method will return image path including root dir, root dir is determined by splitting. - -func (olu OciLayoutUtils) GetImageManifests(imagePath string) ([]ispec.Descriptor, error) { - buf, err := ioutil.ReadFile(path.Join(imagePath, "index.json")) +func (olu OciLayoutUtils) GetImageManifests(image string) ([]ispec.Descriptor, error) { + imageStore := olu.StoreController.GetImageStore(image) + buf, err := imageStore.GetIndexContent(image) if err != nil { - if os.IsNotExist(err) { + if goerrors.Is(errors.ErrRepoNotFound, err) { olu.Log.Error().Err(err).Msg("index.json doesn't exist") return nil, errors.ErrRepoNotFound @@ -47,17 +49,20 @@ func (olu OciLayoutUtils) GetImageManifests(imagePath string) ([]ispec.Descripto var index ispec.Index if err := json.Unmarshal(buf, &index); err != nil { - olu.Log.Error().Err(err).Str("dir", imagePath).Msg("invalid JSON") + olu.Log.Error().Err(err).Str("dir", path.Join(imageStore.RootDir(), image)).Msg("invalid JSON") return nil, errors.ErrRepoNotFound } return index.Manifests, nil } +//nolint: interfacer func (olu OciLayoutUtils) GetImageBlobManifest(imageDir string, digest godigest.Digest) (v1.Manifest, error) { var blobIndex v1.Manifest - blobBuf, err := ioutil.ReadFile(path.Join(imageDir, "blobs", digest.Algorithm().String(), digest.Encoded())) + imageStore := olu.StoreController.GetImageStore(imageDir) + + blobBuf, err := imageStore.GetBlobContent(imageDir, digest.String()) if err != nil { olu.Log.Error().Err(err).Msg("unable to open image metadata file") @@ -73,10 +78,13 @@ func (olu OciLayoutUtils) GetImageBlobManifest(imageDir string, digest godigest. return blobIndex, nil } +//nolint: interfacer func (olu OciLayoutUtils) GetImageInfo(imageDir string, hash v1.Hash) (ispec.Image, error) { var imageInfo ispec.Image - blobBuf, err := ioutil.ReadFile(path.Join(imageDir, "blobs", hash.Algorithm, hash.Hex)) + imageStore := olu.StoreController.GetImageStore(imageDir) + + blobBuf, err := imageStore.GetBlobContent(imageDir, hash.String()) if err != nil { olu.Log.Error().Err(err).Msg("unable to open image layers file") @@ -92,17 +100,10 @@ func (olu OciLayoutUtils) GetImageInfo(imageDir string, hash v1.Hash) (ispec.Ima return imageInfo, err } -func (olu OciLayoutUtils) IsValidImageFormat(imagePath string) (bool, error) { - imageDir, inputTag := GetImageDirAndTag(imagePath) - - if !DirExists(imageDir) { - olu.Log.Error().Msg("image directory doesn't exist") - - return false, errors.ErrRepoNotFound - } +func (olu OciLayoutUtils) IsValidImageFormat(image string) (bool, error) { + imageDir, inputTag := GetImageDirAndTag(image) manifests, err := olu.GetImageManifests(imageDir) - if err != nil { return false, err } @@ -181,15 +182,6 @@ func (olu OciLayoutUtils) GetImageTagsWithTimestamp(repo string) ([]TagInfo, err return tagsInfo, nil } -func DirExists(d string) bool { - fi, err := os.Stat(d) - if err != nil && os.IsNotExist(err) { - return false - } - - return fi.IsDir() -} - func GetImageDirAndTag(imageName string) (string, string) { var imageDir string diff --git a/pkg/extensions/search/cve/cve.go b/pkg/extensions/search/cve/cve.go index 11c9f39c..d9b261a7 100644 --- a/pkg/extensions/search/cve/cve.go +++ b/pkg/extensions/search/cve/cve.go @@ -40,7 +40,7 @@ func ScanImage(config *config.Config) (report.Results, error) { func GetCVEInfo(storeController storage.StoreController, log log.Logger) (*CveInfo, error) { cveController := CveTrivyController{} - layoutUtils := common.NewOciLayoutUtils(log) + layoutUtils := common.NewOciLayoutUtils(storeController, log) subCveConfig := make(map[string]*config.Config) @@ -118,7 +118,7 @@ func (cveinfo CveInfo) GetTrivyConfig(image string) *config.Config { return trivyConfig } -func (cveinfo CveInfo) GetImageListForCVE(repo string, id string, imgStore *storage.ImageStore, +func (cveinfo CveInfo) GetImageListForCVE(repo string, id string, imgStore storage.ImageStore, trivyConfig *config.Config) ([]*string, error) { tags := make([]*string, 0) @@ -134,7 +134,7 @@ func (cveinfo CveInfo) GetImageListForCVE(repo string, id string, imgStore *stor for _, tag := range tagList { trivyConfig.TrivyConfig.Input = fmt.Sprintf("%s:%s", path.Join(rootDir, repo), tag) - isValidImage, _ := cveinfo.LayoutUtils.IsValidImageFormat(trivyConfig.TrivyConfig.Input) + isValidImage, _ := cveinfo.LayoutUtils.IsValidImageFormat(fmt.Sprintf("%s:%s", repo, tag)) if !isValidImage { cveinfo.Log.Debug().Str("image", repo+":"+tag).Msg("image media type not supported for scanning") diff --git a/pkg/extensions/search/cve/cve_test.go b/pkg/extensions/search/cve/cve_test.go index 35249d30..782defb2 100644 --- a/pkg/extensions/search/cve/cve_test.go +++ b/pkg/extensions/search/cve/cve_test.go @@ -96,7 +96,7 @@ func testSetup() error { storeController := storage.StoreController{DefaultStore: storage.NewImageStore(dir, false, false, log)} - layoutUtils := common.NewOciLayoutUtils(log) + layoutUtils := common.NewOciLayoutUtils(storeController, log) cve = &cveinfo.CveInfo{Log: log, StoreController: storeController, LayoutUtils: layoutUtils} @@ -421,7 +421,7 @@ func TestMultipleStoragePath(t *testing.T) { storeController.DefaultStore = firstStore - subStore := make(map[string]*storage.ImageStore) + subStore := make(map[string]storage.ImageStore) subStore["/a"] = secondStore subStore["/b"] = thirdStore diff --git a/pkg/extensions/search/digest/digest.go b/pkg/extensions/search/digest/digest.go index 523bfc6b..3cc96925 100644 --- a/pkg/extensions/search/digest/digest.go +++ b/pkg/extensions/search/digest/digest.go @@ -3,9 +3,9 @@ package digestinfo import ( "strings" - "github.com/anuvu/zot/errors" "github.com/anuvu/zot/pkg/extensions/search/common" "github.com/anuvu/zot/pkg/log" + "github.com/anuvu/zot/pkg/storage" ispec "github.com/opencontainers/image-spec/specs-go/v1" ) @@ -16,8 +16,8 @@ type DigestInfo struct { } // NewDigestInfo initializes a new DigestInfo object. -func NewDigestInfo(log log.Logger) *DigestInfo { - layoutUtils := common.NewOciLayoutUtils(log) +func NewDigestInfo(storeController storage.StoreController, log log.Logger) *DigestInfo { + layoutUtils := common.NewOciLayoutUtils(storeController, log) return &DigestInfo{Log: log, LayoutUtils: layoutUtils} } @@ -26,10 +26,6 @@ func NewDigestInfo(log log.Logger) *DigestInfo { func (digestinfo DigestInfo) GetImageTagsByDigest(repo string, digest string) ([]*string, error) { uniqueTags := []*string{} - if !common.DirExists(repo) { - return nil, errors.ErrRepoNotFound - } - manifests, err := digestinfo.LayoutUtils.GetImageManifests(repo) if err != nil { diff --git a/pkg/extensions/search/digest/digest_test.go b/pkg/extensions/search/digest/digest_test.go index 5552b865..eb7ae0fe 100644 --- a/pkg/extensions/search/digest/digest_test.go +++ b/pkg/extensions/search/digest/digest_test.go @@ -15,6 +15,7 @@ import ( ext "github.com/anuvu/zot/pkg/extensions" digestinfo "github.com/anuvu/zot/pkg/extensions/search/digest" "github.com/anuvu/zot/pkg/log" + "github.com/anuvu/zot/pkg/storage" . "github.com/smartystreets/goconvey/convey" "gopkg.in/resty.v1" ) @@ -23,6 +24,7 @@ import ( var ( digestInfo *digestinfo.DigestInfo rootDir string + subRootDir string ) const ( @@ -62,8 +64,15 @@ func testSetup() error { return err } + subDir, err := ioutil.TempDir("", "sub_digest_test") + if err != nil { + return err + } + rootDir = dir + subRootDir = subDir + // Test images used/copied: // IMAGE NAME TAG DIGEST CONFIG LAYERS SIZE // zot-test 0.0.1 2bacca16 adf3bb6c 76MB @@ -71,14 +80,26 @@ func testSetup() error { // zot-cve-test 0.0.1 63a795ca 8dd57e17 75MB // 7a0437f0 75MB + err = os.Mkdir(subDir+"/a", 0700) + if err != nil { + return err + } + err = copyFiles("../../../../test/data", rootDir) if err != nil { return err } + err = copyFiles("../../../../test/data", subDir+"/a/") + if err != nil { + return err + } + log := log.NewLogger("debug", "") - digestInfo = digestinfo.NewDigestInfo(log) + storeController := storage.StoreController{DefaultStore: storage.NewImageStore(rootDir, false, false, log)} + + digestInfo = digestinfo.NewDigestInfo(storeController, log) return nil } @@ -131,30 +152,30 @@ func copyFiles(sourceDir string, destDir string) error { func TestDigestInfo(t *testing.T) { Convey("Test image tag", t, func() { // Search by manifest digest - imageTags, err := digestInfo.GetImageTagsByDigest(path.Join(rootDir, "zot-cve-test"), "63a795ca") + imageTags, err := digestInfo.GetImageTagsByDigest("zot-cve-test", "63a795ca") So(err, ShouldBeNil) So(len(imageTags), ShouldEqual, 1) So(*imageTags[0], ShouldEqual, "0.0.1") // Search by config digest - imageTags, err = digestInfo.GetImageTagsByDigest(path.Join(rootDir, "zot-test"), "adf3bb6c") + imageTags, err = digestInfo.GetImageTagsByDigest("zot-test", "adf3bb6c") So(err, ShouldBeNil) So(len(imageTags), ShouldEqual, 1) So(*imageTags[0], ShouldEqual, "0.0.1") // Search by layer digest - imageTags, err = digestInfo.GetImageTagsByDigest(path.Join(rootDir, "zot-cve-test"), "7a0437f0") + imageTags, err = digestInfo.GetImageTagsByDigest("zot-cve-test", "7a0437f0") So(err, ShouldBeNil) So(len(imageTags), ShouldEqual, 1) So(*imageTags[0], ShouldEqual, "0.0.1") // Search by non-existent image - imageTags, err = digestInfo.GetImageTagsByDigest(path.Join(rootDir, "zot-tes"), "63a795ca") + imageTags, err = digestInfo.GetImageTagsByDigest("zot-tes", "63a795ca") So(err, ShouldNotBeNil) So(len(imageTags), ShouldEqual, 0) // Search by non-existent digest - imageTags, err = digestInfo.GetImageTagsByDigest(path.Join(rootDir, "zot-test"), "111") + imageTags, err = digestInfo.GetImageTagsByDigest("zot-test", "111") So(err, ShouldBeNil) So(len(imageTags), ShouldEqual, 0) }) @@ -286,6 +307,75 @@ func TestDigestSearchHTTP(t *testing.T) { }) } +func TestDigestSearchHTTPSubPaths(t *testing.T) { + Convey("Test image search by digest scanning using storage subpaths", t, func() { + config := api.NewConfig() + config.HTTP.Port = Port1 + config.Extensions = &ext.ExtensionConfig{ + Search: &ext.SearchConfig{Enable: true}, + } + + c := api.NewController(config) + + globalDir, err := ioutil.TempDir("", "digest_test") + if err != nil { + panic(err) + } + defer os.RemoveAll(globalDir) + + c.Config.Storage.RootDirectory = globalDir + + subPathMap := make(map[string]api.StorageConfig) + + subPathMap["/a"] = api.StorageConfig{RootDirectory: subRootDir} + + c.Config.Storage.SubPaths = subPathMap + + go func() { + // this blocks + if err := c.Run(); err != nil { + return + } + }() + + // wait till ready + for { + _, err := resty.R().Get(BaseURL1) + if err == nil { + break + } + time.Sleep(100 * time.Millisecond) + } + + // shut down server + defer func() { + ctx := context.Background() + _ = c.Server.Shutdown(ctx) + }() + + resp, err := resty.R().Get(BaseURL1 + "/v2/") + So(resp, ShouldNotBeNil) + So(err, ShouldBeNil) + So(resp.StatusCode(), ShouldEqual, 200) + + resp, err = resty.R().Get(BaseURL1 + "/query") + So(resp, ShouldNotBeNil) + So(err, ShouldBeNil) + So(resp.StatusCode(), ShouldEqual, 200) + + resp, err = resty.R().Get(BaseURL1 + "/query?query={ImageListForDigest(id:\"sha\"){Name%20Tags}}") + So(resp, ShouldNotBeNil) + So(err, ShouldBeNil) + So(resp.StatusCode(), ShouldEqual, 200) + + var responseStruct ImgResponseForDigest + err = json.Unmarshal(resp.Body(), &responseStruct) + So(err, ShouldBeNil) + So(len(responseStruct.Errors), ShouldEqual, 0) + So(len(responseStruct.ImgListForDigest.Images), ShouldEqual, 2) + }) +} + func TestDigestSearchDisabled(t *testing.T) { Convey("Test disabling image search", t, func() { dir, err := ioutil.TempDir("", "digest_test") diff --git a/pkg/extensions/search/resolver.go b/pkg/extensions/search/resolver.go index 857b0a3b..d54e8aef 100644 --- a/pkg/extensions/search/resolver.go +++ b/pkg/extensions/search/resolver.go @@ -5,7 +5,6 @@ package search import ( "context" "fmt" - "path" "strconv" "strings" @@ -54,7 +53,7 @@ func GetResolverConfig(log log.Logger, storeController storage.StoreController, } } - digestInfo := digestinfo.NewDigestInfo(log) + digestInfo := digestinfo.NewDigestInfo(storeController, log) resConfig := &Resolver{cveInfo: cveInfo, storeController: storeController, digestInfo: digestInfo, log: log} @@ -69,7 +68,7 @@ func (r *queryResolver) CVEListForImage(ctx context.Context, image string) (*CVE r.log.Info().Str("image", image).Msg("scanning image") - isValidImage, err := r.cveInfo.LayoutUtils.IsValidImageFormat(trivyConfig.TrivyConfig.Input) + isValidImage, err := r.cveInfo.LayoutUtils.IsValidImageFormat(image) if !isValidImage { r.log.Debug().Str("image", image).Msg("image media type not supported for scanning") @@ -201,7 +200,7 @@ func (r *queryResolver) ImageListForCve(ctx context.Context, id string) ([]*ImgR return finalCveResult, nil } -func (r *queryResolver) getImageListForCVE(repoList []string, id string, imgStore *storage.ImageStore, +func (r *queryResolver) getImageListForCVE(repoList []string, id string, imgStore storage.ImageStore, trivyConfig *config.Config) ([]*ImgResultForCve, error) { cveResult := []*ImgResultForCve{} @@ -238,7 +237,7 @@ func (r *queryResolver) ImageListWithCVEFixed(ctx context.Context, id string, im r.log.Info().Str("image", image).Msg("extracting list of tags available in image") - tagsInfo, err := r.cveInfo.LayoutUtils.GetImageTagsWithTimestamp(imagePath) + tagsInfo, err := r.cveInfo.LayoutUtils.GetImageTagsWithTimestamp(image) if err != nil { r.log.Error().Err(err).Msg("unable to read image tags") @@ -252,7 +251,7 @@ func (r *queryResolver) ImageListWithCVEFixed(ctx context.Context, id string, im for _, tag := range tagsInfo { trivyConfig.TrivyConfig.Input = fmt.Sprintf("%s:%s", imagePath, tag.Name) - isValidImage, _ := r.cveInfo.LayoutUtils.IsValidImageFormat(trivyConfig.TrivyConfig.Input) + isValidImage, _ := r.cveInfo.LayoutUtils.IsValidImageFormat(fmt.Sprintf("%s:%s", image, tag.Name)) if !isValidImage { r.log.Debug().Str("image", fmt.Sprintf("%s:%s", image, tag.Name)). @@ -325,9 +324,7 @@ func (r *queryResolver) ImageListForDigest(ctx context.Context, id string) ([]*I r.log.Info().Msg("scanning each global repository") - rootDir := defaultStore.RootDir() - - partialImgResultForDigest, err := r.getImageListForDigest(rootDir, repoList, id) + partialImgResultForDigest, err := r.getImageListForDigest(repoList, id) if err != nil { r.log.Error().Err(err).Msg("unable to get image and tag list for global repositories") @@ -338,8 +335,6 @@ func (r *queryResolver) ImageListForDigest(ctx context.Context, id string) ([]*I subStore := r.storeController.SubStore for _, store := range subStore { - rootDir := store.RootDir() - subRepoList, err := store.GetRepositories() if err != nil { r.log.Error().Err(err).Msg("unable to search sub-repositories") @@ -347,7 +342,7 @@ func (r *queryResolver) ImageListForDigest(ctx context.Context, id string) ([]*I return imgResultForDigest, err } - partialImgResultForDigest, err = r.getImageListForDigest(rootDir, subRepoList, id) + partialImgResultForDigest, err = r.getImageListForDigest(subRepoList, id) if err != nil { r.log.Error().Err(err).Msg("unable to get image and tag list for sub-repositories") @@ -360,7 +355,7 @@ func (r *queryResolver) ImageListForDigest(ctx context.Context, id string) ([]*I return imgResultForDigest, nil } -func (r *queryResolver) getImageListForDigest(rootDir string, repoList []string, +func (r *queryResolver) getImageListForDigest(repoList []string, digest string) ([]*ImgResultForDigest, error) { imgResultForDigest := []*ImgResultForDigest{} @@ -369,7 +364,7 @@ func (r *queryResolver) getImageListForDigest(rootDir string, repoList []string, for _, repo := range repoList { r.log.Info().Str("repo", repo).Msg("filtering list of tags in image repo by digest") - tags, err := r.digestInfo.GetImageTagsByDigest(path.Join(rootDir, repo), digest) + tags, err := r.digestInfo.GetImageTagsByDigest(repo, digest) if err != nil { r.log.Error().Err(err).Msg("unable to get filtered list of image tags") @@ -424,7 +419,7 @@ func (r *queryResolver) ImageListWithLatestTag(ctx context.Context) ([]*ImageInf return imageList, nil } -func (r *queryResolver) getImageListWithLatestTag(store *storage.ImageStore) ([]*ImageInfo, error) { +func (r *queryResolver) getImageListWithLatestTag(store storage.ImageStore) ([]*ImageInfo, error) { results := make([]*ImageInfo, 0) repoList, err := store.GetRepositories() @@ -438,12 +433,10 @@ func (r *queryResolver) getImageListWithLatestTag(store *storage.ImageStore) ([] r.log.Info().Msg("no repositories found") } - dir := store.RootDir() - - layoutUtils := common.NewOciLayoutUtils(r.log) + layoutUtils := common.NewOciLayoutUtils(r.storeController, r.log) for _, repo := range repoList { - tagsInfo, err := layoutUtils.GetImageTagsWithTimestamp(path.Join(dir, repo)) + tagsInfo, err := layoutUtils.GetImageTagsWithTimestamp(repo) if err != nil { r.log.Error().Err(err).Msg("extension api: error getting tag timestamp info") @@ -460,7 +453,7 @@ func (r *queryResolver) getImageListWithLatestTag(store *storage.ImageStore) ([] digest := godigest.Digest(latestTag.Digest) - manifest, err := layoutUtils.GetImageBlobManifest(path.Join(dir, repo), digest) + manifest, err := layoutUtils.GetImageBlobManifest(repo, digest) if err != nil { r.log.Error().Err(err).Msg("extension api: error reading manifest") @@ -471,7 +464,7 @@ func (r *queryResolver) getImageListWithLatestTag(store *storage.ImageStore) ([] name := repo - imageConfig, err := layoutUtils.GetImageInfo(path.Join(dir, repo), manifest.Config.Digest) + imageConfig, err := layoutUtils.GetImageInfo(repo, manifest.Config.Digest) if err != nil { r.log.Error().Err(err).Msg("extension api: error reading image config") diff --git a/pkg/storage/storage.go b/pkg/storage/storage.go index 46072143..a9e10833 100644 --- a/pkg/storage/storage.go +++ b/pkg/storage/storage.go @@ -1,1298 +1,39 @@ package storage import ( - "context" - "crypto/sha256" - "encoding/json" - "fmt" "io" - "io/ioutil" - "os" - "path" - "path/filepath" - "strings" - "sync" - "time" - "github.com/anuvu/zot/errors" - zlog "github.com/anuvu/zot/pkg/log" - apexlog "github.com/apex/log" - guuid "github.com/gofrs/uuid" - godigest "github.com/opencontainers/go-digest" - ispec "github.com/opencontainers/image-spec/specs-go/v1" - "github.com/opencontainers/umoci" - "github.com/opencontainers/umoci/oci/casext" - "github.com/rs/zerolog" + "github.com/opencontainers/go-digest" ) -const ( - // BlobUploadDir defines the upload directory for blob uploads. - BlobUploadDir = ".uploads" - schemaVersion = 2 - gcDelay = 1 * time.Hour -) - -// BlobUpload models and upload request. -type BlobUpload struct { - StoreName string - ID string -} - -type StoreController struct { - DefaultStore *ImageStore - SubStore map[string]*ImageStore -} - -// ImageStore provides the image storage operations. -type ImageStore struct { - rootDir string - lock *sync.RWMutex - blobUploads map[string]BlobUpload - cache *Cache - gc bool - dedupe bool - log zerolog.Logger -} - -func (is *ImageStore) RootDir() string { - return is.rootDir -} - -func getRoutePrefix(name string) string { - names := strings.SplitN(name, "/", 2) - - if len(names) != 2 { // nolint: gomnd - // it means route is of global storage e.g "centos:latest" - if len(names) == 1 { - return "/" - } - } - - return fmt.Sprintf("/%s", names[0]) -} - -func (sc StoreController) GetImageStore(name string) *ImageStore { - if sc.SubStore != nil { - // SubStore is being provided, now we need to find equivalent image store and this will be found by splitting name - prefixName := getRoutePrefix(name) - - imgStore, ok := sc.SubStore[prefixName] - if !ok { - imgStore = sc.DefaultStore - } - - return imgStore - } - - return sc.DefaultStore -} - -// NewImageStore returns a new image store backed by a file storage. -func NewImageStore(rootDir string, gc bool, dedupe bool, log zlog.Logger) *ImageStore { - if _, err := os.Stat(rootDir); os.IsNotExist(err) { - if err := os.MkdirAll(rootDir, 0700); err != nil { - log.Error().Err(err).Str("rootDir", rootDir).Msg("unable to create root dir") - return nil - } - } - - is := &ImageStore{ - rootDir: rootDir, - lock: &sync.RWMutex{}, - blobUploads: make(map[string]BlobUpload), - gc: gc, - dedupe: dedupe, - log: log.With().Caller().Logger(), - } - - if dedupe { - is.cache = NewCache(rootDir, "cache", log) - } - - if gc { - // we use umoci GC to perform garbage-collection, but it uses its own logger - // - so capture those logs, could be useful - apexlog.SetLevel(apexlog.DebugLevel) - apexlog.SetHandler(apexlog.HandlerFunc(func(entry *apexlog.Entry) error { - e := log.Debug() - for k, v := range entry.Fields { - e = e.Interface(k, v) - } - e.Msg(entry.Message) - return nil - })) - } - - return is -} - -// RLock read-lock. -func (is *ImageStore) RLock() { - is.lock.RLock() -} - -// RUnlock read-unlock. -func (is *ImageStore) RUnlock() { - is.lock.RUnlock() -} - -// Lock write-lock. -func (is *ImageStore) Lock() { - is.lock.Lock() -} - -// Unlock write-unlock. -func (is *ImageStore) Unlock() { - is.lock.Unlock() -} -func (is *ImageStore) initRepo(name string) error { - repoDir := path.Join(is.rootDir, name) - - // create "blobs" subdir - err := ensureDir(path.Join(repoDir, "blobs"), is.log) - if err != nil { - is.log.Error().Err(err).Msg("error creating blobs subdir") - - return err - } - // create BlobUploadDir subdir - err = ensureDir(path.Join(repoDir, BlobUploadDir), is.log) - if err != nil { - is.log.Error().Err(err).Msg("error creating blob upload subdir") - - return err - } - - // "oci-layout" file - create if it doesn't exist - ilPath := path.Join(repoDir, ispec.ImageLayoutFile) - if _, err := os.Stat(ilPath); err != nil { - il := ispec.ImageLayout{Version: ispec.ImageLayoutVersion} - buf, err := json.Marshal(il) - - if err != nil { - is.log.Panic().Err(err).Msg("unable to marshal JSON") - } - - if err := ioutil.WriteFile(ilPath, buf, 0644); err != nil { //nolint: gosec - is.log.Error().Err(err).Str("file", ilPath).Msg("unable to write file") - return err - } - } - - // "index.json" file - create if it doesn't exist - indexPath := path.Join(repoDir, "index.json") - if _, err := os.Stat(indexPath); err != nil { - index := ispec.Index{} - index.SchemaVersion = 2 - buf, err := json.Marshal(index) - - if err != nil { - is.log.Panic().Err(err).Msg("unable to marshal JSON") - } - - if err := ioutil.WriteFile(indexPath, buf, 0644); err != nil { //nolint: gosec - is.log.Error().Err(err).Str("file", indexPath).Msg("unable to write file") - return err - } - } - - return nil -} - -// InitRepo creates an image repository under this store. -func (is *ImageStore) InitRepo(name string) error { - is.Lock() - defer is.Unlock() - - return is.initRepo(name) -} - -// ValidateRepo validates that the repository layout is complaint with the OCI repo layout. -func (is *ImageStore) ValidateRepo(name string) (bool, error) { - // https://github.com/opencontainers/image-spec/blob/master/image-layout.md#content - // at least, expect at least 3 entries - ["blobs", "oci-layout", "index.json"] - // and an additional/optional BlobUploadDir in each image store - dir := path.Join(is.rootDir, name) - if !dirExists(dir) { - return false, errors.ErrRepoNotFound - } - - files, err := ioutil.ReadDir(dir) - if err != nil { - is.log.Error().Err(err).Str("dir", dir).Msg("unable to read directory") - return false, errors.ErrRepoNotFound - } - // nolint:gomnd - if len(files) < 3 { - return false, errors.ErrRepoBadVersion - } - - found := map[string]bool{ - "blobs": false, - ispec.ImageLayoutFile: false, - "index.json": false, - } - - for _, file := range files { - if file.Name() == "blobs" && !file.IsDir() { - return false, nil - } - - found[file.Name()] = true - } - - for k, v := range found { - if !v && k != BlobUploadDir { - return false, nil - } - } - - buf, err := ioutil.ReadFile(path.Join(dir, ispec.ImageLayoutFile)) - if err != nil { - return false, err - } - - var il ispec.ImageLayout - if err := json.Unmarshal(buf, &il); err != nil { - return false, err - } - - if il.Version != ispec.ImageLayoutVersion { - return false, errors.ErrRepoBadVersion - } - - return true, nil -} - -// GetRepositories returns a list of all the repositories under this store. -func (is *ImageStore) GetRepositories() ([]string, error) { - dir := is.rootDir - - is.RLock() - defer is.RUnlock() - - _, err := ioutil.ReadDir(dir) - if err != nil { - is.log.Error().Err(err).Msg("failure walking storage root-dir") - return nil, err - } - - stores := make([]string, 0) - err = filepath.Walk(dir, func(path string, info os.FileInfo, err error) error { - if err != nil { - return err - } - - if !info.IsDir() { - return nil - } - - rel, err := filepath.Rel(is.rootDir, path) - if err != nil { - return nil - } - - if ok, err := is.ValidateRepo(rel); !ok || err != nil { - return nil - } - - //is.log.Debug().Str("dir", path).Str("name", info.Name()).Msg("found image store") - stores = append(stores, rel) - - return nil - }) - - return stores, err -} - -// GetImageTags returns a list of image tags available in the specified repository. -func (is *ImageStore) GetImageTags(repo string) ([]string, error) { - dir := path.Join(is.rootDir, repo) - if !dirExists(dir) { - return nil, errors.ErrRepoNotFound - } - - is.RLock() - defer is.RUnlock() - - buf, err := ioutil.ReadFile(path.Join(dir, "index.json")) - if err != nil { - is.log.Error().Err(err).Str("dir", dir).Msg("failed to read index.json") - return nil, errors.ErrRepoNotFound - } - - var index ispec.Index - if err := json.Unmarshal(buf, &index); err != nil { - is.log.Error().Err(err).Str("dir", dir).Msg("invalid JSON") - return nil, errors.ErrRepoNotFound - } - - tags := make([]string, 0) - - for _, manifest := range index.Manifests { - v, ok := manifest.Annotations[ispec.AnnotationRefName] - if ok { - tags = append(tags, v) - } - } - - return tags, nil -} - -// GetImageManifest returns the image manifest of an image in the specific repository. -func (is *ImageStore) GetImageManifest(repo string, reference string) ([]byte, string, string, error) { - dir := path.Join(is.rootDir, repo) - if !dirExists(dir) { - return nil, "", "", errors.ErrRepoNotFound - } - - is.RLock() - defer is.RUnlock() - - buf, err := ioutil.ReadFile(path.Join(dir, "index.json")) - - if err != nil { - is.log.Error().Err(err).Str("dir", dir).Msg("failed to read index.json") - - if os.IsNotExist(err) { - return nil, "", "", errors.ErrRepoNotFound - } - - return nil, "", "", err - } - - var index ispec.Index - if err := json.Unmarshal(buf, &index); err != nil { - is.log.Error().Err(err).Str("dir", dir).Msg("invalid JSON") - return nil, "", "", err - } - - found := false - - var digest godigest.Digest - - mediaType := "" - - for _, m := range index.Manifests { - if reference == m.Digest.String() { - digest = m.Digest - mediaType = m.MediaType - found = true - - break - } - - v, ok := m.Annotations[ispec.AnnotationRefName] - if ok && v == reference { - digest = m.Digest - mediaType = m.MediaType - found = true - - break - } - } - - if !found { - return nil, "", "", errors.ErrManifestNotFound - } - - p := path.Join(dir, "blobs", digest.Algorithm().String(), digest.Encoded()) - - buf, err = ioutil.ReadFile(p) - - if err != nil { - is.log.Error().Err(err).Str("blob", p).Msg("failed to read manifest") - - if os.IsNotExist(err) { - return nil, "", "", errors.ErrManifestNotFound - } - - return nil, "", "", err - } - - var manifest ispec.Manifest - if err := json.Unmarshal(buf, &manifest); err != nil { - is.log.Error().Err(err).Str("dir", dir).Msg("invalid JSON") - return nil, "", "", err - } - - return buf, digest.String(), mediaType, nil -} - -// PutImageManifest adds an image manifest to the repository. -func (is *ImageStore) PutImageManifest(repo string, reference string, mediaType string, - body []byte) (string, error) { - if err := is.InitRepo(repo); err != nil { - is.log.Debug().Err(err).Msg("init repo") - return "", err - } - - if mediaType != ispec.MediaTypeImageManifest { - is.log.Debug().Interface("actual", mediaType). - Interface("expected", ispec.MediaTypeImageManifest).Msg("bad manifest media type") - return "", errors.ErrBadManifest - } - - if len(body) == 0 { - is.log.Debug().Int("len", len(body)).Msg("invalid body length") - return "", errors.ErrBadManifest - } - - var m ispec.Manifest - if err := json.Unmarshal(body, &m); err != nil { - is.log.Error().Err(err).Msg("unable to unmarshal JSON") - return "", errors.ErrBadManifest - } - - if m.SchemaVersion != schemaVersion { - is.log.Error().Int("SchemaVersion", m.SchemaVersion).Msg("invalid manifest") - return "", errors.ErrBadManifest - } - - for _, l := range m.Layers { - digest := l.Digest - blobPath := is.BlobPath(repo, digest) - is.log.Info().Str("blobPath", blobPath).Str("reference", reference).Msg("manifest layers") - - if _, err := os.Stat(blobPath); err != nil { - is.log.Error().Err(err).Str("blobPath", blobPath).Msg("unable to find blob") - return digest.String(), errors.ErrBlobNotFound - } - } - - mDigest := godigest.FromBytes(body) - refIsDigest := false - d, err := godigest.Parse(reference) - - if err == nil { - if d.String() != mDigest.String() { - is.log.Error().Str("actual", mDigest.String()).Str("expected", d.String()). - Msg("manifest digest is not valid") - return "", errors.ErrBadManifest - } - - refIsDigest = true - } - - is.Lock() - defer is.Unlock() - - dir := path.Join(is.rootDir, repo) - buf, err := ioutil.ReadFile(path.Join(dir, "index.json")) - - if err != nil { - is.log.Error().Err(err).Str("dir", dir).Msg("failed to read index.json") - return "", err - } - - var index ispec.Index - if err := json.Unmarshal(buf, &index); err != nil { - is.log.Error().Err(err).Str("dir", dir).Msg("invalid JSON") - return "", errors.ErrRepoBadVersion - } - - updateIndex := true - // create a new descriptor - desc := ispec.Descriptor{MediaType: mediaType, Size: int64(len(body)), Digest: mDigest, - Platform: &ispec.Platform{Architecture: "amd64", OS: "linux"}} - if !refIsDigest { - desc.Annotations = map[string]string{ispec.AnnotationRefName: reference} - } - - for i, m := range index.Manifests { - if reference == m.Digest.String() { - // nothing changed, so don't update - desc = m - updateIndex = false - - break - } - - v, ok := m.Annotations[ispec.AnnotationRefName] - if ok && v == reference { - if m.Digest.String() == mDigest.String() { - // nothing changed, so don't update - desc = m - updateIndex = false - - break - } - // manifest contents have changed for the same tag, - // so update index.json descriptor - is.log.Info(). - Int64("old size", desc.Size). - Int64("new size", int64(len(body))). - Str("old digest", desc.Digest.String()). - Str("new digest", mDigest.String()). - Msg("updating existing tag with new manifest contents") - - desc = m - desc.Size = int64(len(body)) - desc.Digest = mDigest - - index.Manifests = append(index.Manifests[:i], index.Manifests[i+1:]...) - - break - } - } - - if !updateIndex { - return desc.Digest.String(), nil - } - - // write manifest to "blobs" - dir = path.Join(is.rootDir, repo, "blobs", mDigest.Algorithm().String()) - _ = ensureDir(dir, is.log) - file := path.Join(dir, mDigest.Encoded()) - - if err := ioutil.WriteFile(file, body, 0600); err != nil { - is.log.Error().Err(err).Str("file", file).Msg("unable to write") - return "", err - } - - // now update "index.json" - index.Manifests = append(index.Manifests, desc) - dir = path.Join(is.rootDir, repo) - file = path.Join(dir, "index.json") - buf, err = json.Marshal(index) - - if err != nil { - is.log.Error().Err(err).Str("file", file).Msg("unable to marshal JSON") - return "", err - } - - if err := ioutil.WriteFile(file, buf, 0644); err != nil { //nolint: gosec - is.log.Error().Err(err).Str("file", file).Msg("unable to write") - return "", err - } - - if is.gc { - oci, err := umoci.OpenLayout(dir) - if err != nil { - return "", err - } - defer oci.Close() - - if err := oci.GC(context.Background(), ifOlderThan(is, repo, gcDelay)); err != nil { - return "", err - } - } - - return desc.Digest.String(), nil -} - -// DeleteImageManifest deletes the image manifest from the repository. -func (is *ImageStore) DeleteImageManifest(repo string, reference string) error { - dir := path.Join(is.rootDir, repo) - if !dirExists(dir) { - return errors.ErrRepoNotFound - } - - isTag := false - - // as per spec "reference" can be a digest and a tag - digest, err := godigest.Parse(reference) - if err != nil { - is.log.Debug().Str("invalid digest: ", reference).Msg("storage: assuming tag") - - isTag = true - } - - is.Lock() - defer is.Unlock() - - buf, err := ioutil.ReadFile(path.Join(dir, "index.json")) - - if err != nil { - is.log.Error().Err(err).Str("dir", dir).Msg("failed to read index.json") - return err - } - - var index ispec.Index - if err := json.Unmarshal(buf, &index); err != nil { - is.log.Error().Err(err).Str("dir", dir).Msg("invalid JSON") - return err - } - - found := false - - var m ispec.Descriptor - - // we are deleting, so keep only those manifests that don't match - outIndex := index - outIndex.Manifests = []ispec.Descriptor{} - - for _, m = range index.Manifests { - if isTag { - tag, ok := m.Annotations[ispec.AnnotationRefName] - if ok && tag == reference { - is.log.Debug().Str("deleting tag", tag).Msg("") - - digest = m.Digest - - found = true - - continue - } - } else if reference == m.Digest.String() { - is.log.Debug().Str("deleting reference", reference).Msg("") - found = true - continue - } - - outIndex.Manifests = append(outIndex.Manifests, m) - } - - if !found { - return errors.ErrManifestNotFound - } - - // now update "index.json" - dir = path.Join(is.rootDir, repo) - file := path.Join(dir, "index.json") - buf, err = json.Marshal(outIndex) - - if err != nil { - return err - } - - if err := ioutil.WriteFile(file, buf, 0644); err != nil { //nolint: gosec - return err - } - - if is.gc { - oci, err := umoci.OpenLayout(dir) - if err != nil { - return err - } - defer oci.Close() - - if err := oci.GC(context.Background(), ifOlderThan(is, repo, gcDelay)); err != nil { - return err - } - } - - // Delete blob only when blob digest not present in manifest entry. - // e.g. 1.0.1 & 1.0.2 have same blob digest so if we delete 1.0.1, blob should not be removed. - toDelete := true - - for _, m = range outIndex.Manifests { - if digest.String() == m.Digest.String() { - toDelete = false - break - } - } - - if toDelete { - p := path.Join(dir, "blobs", digest.Algorithm().String(), digest.Encoded()) - - _ = os.Remove(p) - } - - return nil -} - -// BlobUploadPath returns the upload path for a blob in this store. -func (is *ImageStore) BlobUploadPath(repo string, uuid string) string { - dir := path.Join(is.rootDir, repo) - blobUploadPath := path.Join(dir, BlobUploadDir, uuid) - - return blobUploadPath -} - -// NewBlobUpload returns the unique ID for an upload in progress. -func (is *ImageStore) NewBlobUpload(repo string) (string, error) { - if err := is.InitRepo(repo); err != nil { - is.log.Error().Err(err).Msg("error initializing repo") - - return "", err - } - - uuid, err := guuid.NewV4() - if err != nil { - return "", err - } - - u := uuid.String() - - blobUploadPath := is.BlobUploadPath(repo, u) - - file, err := os.OpenFile(blobUploadPath, os.O_WRONLY|os.O_TRUNC|os.O_CREATE, 0600) - if err != nil { - return "", errors.ErrRepoNotFound - } - defer file.Close() - - return u, nil -} - -// GetBlobUpload returns the current size of a blob upload. -func (is *ImageStore) GetBlobUpload(repo string, uuid string) (int64, error) { - blobUploadPath := is.BlobUploadPath(repo, uuid) - fi, err := os.Stat(blobUploadPath) - - if err != nil { - if os.IsNotExist(err) { - return -1, errors.ErrUploadNotFound - } - - return -1, err - } - - return fi.Size(), nil -} - -// PutBlobChunkStreamed appends another chunk of data to the specified blob. It returns -// the number of actual bytes to the blob. -func (is *ImageStore) PutBlobChunkStreamed(repo string, uuid string, body io.Reader) (int64, error) { - if err := is.InitRepo(repo); err != nil { - return -1, err - } - - blobUploadPath := is.BlobUploadPath(repo, uuid) - - _, err := os.Stat(blobUploadPath) - if err != nil { - return -1, errors.ErrUploadNotFound - } - - file, err := os.OpenFile( - blobUploadPath, - os.O_WRONLY|os.O_CREATE, - 0600, - ) - if err != nil { - is.log.Fatal().Err(err).Msg("failed to open file") - } - defer file.Close() - - if _, err := file.Seek(0, io.SeekEnd); err != nil { - is.log.Fatal().Err(err).Msg("failed to seek file") - } - - n, err := io.Copy(file, body) - - return n, err -} - -// PutBlobChunk writes another chunk of data to the specified blob. It returns -// the number of actual bytes to the blob. -func (is *ImageStore) PutBlobChunk(repo string, uuid string, from int64, to int64, - body io.Reader) (int64, error) { - if err := is.InitRepo(repo); err != nil { - return -1, err - } - - blobUploadPath := is.BlobUploadPath(repo, uuid) - - fi, err := os.Stat(blobUploadPath) - if err != nil { - return -1, errors.ErrUploadNotFound - } - - if from != fi.Size() { - is.log.Error().Int64("expected", from).Int64("actual", fi.Size()). - Msg("invalid range start for blob upload") - return -1, errors.ErrBadUploadRange - } - - file, err := os.OpenFile( - blobUploadPath, - os.O_WRONLY|os.O_CREATE, - 0600, - ) - if err != nil { - is.log.Fatal().Err(err).Msg("failed to open file") - } - defer file.Close() - - if _, err := file.Seek(from, io.SeekStart); err != nil { - is.log.Fatal().Err(err).Msg("failed to seek file") - } - - n, err := io.Copy(file, body) - - return n, err -} - -// BlobUploadInfo returns the current blob size in bytes. -func (is *ImageStore) BlobUploadInfo(repo string, uuid string) (int64, error) { - blobUploadPath := is.BlobUploadPath(repo, uuid) - fi, err := os.Stat(blobUploadPath) - - if err != nil { - is.log.Error().Err(err).Str("blob", blobUploadPath).Msg("failed to stat blob") - return -1, err - } - - size := fi.Size() - - return size, nil -} - -// FinishBlobUpload finalizes the blob upload and moves blob the repository. -func (is *ImageStore) FinishBlobUpload(repo string, uuid string, body io.Reader, digest string) error { - dstDigest, err := godigest.Parse(digest) - if err != nil { - is.log.Error().Err(err).Str("digest", digest).Msg("failed to parse digest") - return errors.ErrBadBlobDigest - } - - src := is.BlobUploadPath(repo, uuid) - - _, err = os.Stat(src) - if err != nil { - is.log.Error().Err(err).Str("blob", src).Msg("failed to stat blob") - return errors.ErrUploadNotFound - } - - f, err := os.Open(src) - if err != nil { - is.log.Error().Err(err).Str("blob", src).Msg("failed to open blob") - return errors.ErrUploadNotFound - } - - srcDigest, err := godigest.FromReader(f) - f.Close() - - if err != nil { - is.log.Error().Err(err).Str("blob", src).Msg("failed to open blob") - return errors.ErrBadBlobDigest - } - - if srcDigest != dstDigest { - is.log.Error().Str("srcDigest", srcDigest.String()). - Str("dstDigest", dstDigest.String()).Msg("actual digest not equal to expected digest") - return errors.ErrBadBlobDigest - } - - dir := path.Join(is.rootDir, repo, "blobs", dstDigest.Algorithm().String()) - - is.Lock() - defer is.Unlock() - - err = ensureDir(dir, is.log) - if err != nil { - is.log.Error().Err(err).Msg("error creating blobs/sha256 dir") - - return err - } - - dst := is.BlobPath(repo, dstDigest) - - if is.dedupe && is.cache != nil { - if err := is.DedupeBlob(src, dstDigest, dst); err != nil { - is.log.Error().Err(err).Str("src", src).Str("dstDigest", dstDigest.String()). - Str("dst", dst).Msg("unable to dedupe blob") - return err - } - } else { - if err := os.Rename(src, dst); err != nil { - is.log.Error().Err(err).Str("src", src).Str("dstDigest", dstDigest.String()). - Str("dst", dst).Msg("unable to finish blob") - return err - } - } - - return nil -} - -// FullBlobUpload handles a full blob upload, and no partial session is created. -func (is *ImageStore) FullBlobUpload(repo string, body io.Reader, digest string) (string, int64, error) { - if err := is.InitRepo(repo); err != nil { - return "", -1, err - } - - dstDigest, err := godigest.Parse(digest) - if err != nil { - is.log.Error().Err(err).Str("digest", digest).Msg("failed to parse digest") - return "", -1, errors.ErrBadBlobDigest - } - - u, err := guuid.NewV4() - if err != nil { - return "", -1, err - } - - uuid := u.String() - - src := is.BlobUploadPath(repo, uuid) - - f, err := os.Create(src) - if err != nil { - is.log.Error().Err(err).Str("blob", src).Msg("failed to open blob") - return "", -1, errors.ErrUploadNotFound - } - - defer f.Close() - - digester := sha256.New() - mw := io.MultiWriter(f, digester) - n, err := io.Copy(mw, body) - - if err != nil { - return "", -1, err - } - - srcDigest := godigest.NewDigestFromEncoded(godigest.SHA256, fmt.Sprintf("%x", digester.Sum(nil))) - if srcDigest != dstDigest { - is.log.Error().Str("srcDigest", srcDigest.String()). - Str("dstDigest", dstDigest.String()).Msg("actual digest not equal to expected digest") - return "", -1, errors.ErrBadBlobDigest - } - - dir := path.Join(is.rootDir, repo, "blobs", dstDigest.Algorithm().String()) - - is.Lock() - defer is.Unlock() - - _ = ensureDir(dir, is.log) - dst := is.BlobPath(repo, dstDigest) - - if is.dedupe && is.cache != nil { - if err := is.DedupeBlob(src, dstDigest, dst); err != nil { - is.log.Error().Err(err).Str("src", src).Str("dstDigest", dstDigest.String()). - Str("dst", dst).Msg("unable to dedupe blob") - return "", -1, err - } - } else { - if err := os.Rename(src, dst); err != nil { - is.log.Error().Err(err).Str("src", src).Str("dstDigest", dstDigest.String()). - Str("dst", dst).Msg("unable to finish blob") - return "", -1, err - } - } - - return uuid, n, nil -} - -// nolint:interfacer -func (is *ImageStore) DedupeBlob(src string, dstDigest godigest.Digest, dst string) error { -retry: - is.log.Debug().Str("src", src).Str("dstDigest", dstDigest.String()).Str("dst", dst).Msg("dedupe: ENTER") - - dstRecord, err := is.cache.GetBlob(dstDigest.String()) - - // nolint:goerr113 - if err != nil && err != errors.ErrCacheMiss { - is.log.Error().Err(err).Str("blobPath", dst).Msg("dedupe: unable to lookup blob record") - return err - } - - if dstRecord == "" { - // cache record doesn't exist, so first disk and cache entry for this digest - if err := is.cache.PutBlob(dstDigest.String(), dst); err != nil { - is.log.Error().Err(err).Str("blobPath", dst).Msg("dedupe: unable to insert blob record") - - return err - } - - // move the blob from uploads to final dest - if err := os.Rename(src, dst); err != nil { - is.log.Error().Err(err).Str("src", src).Str("dst", dst).Msg("dedupe: unable to rename blob") - - return err - } - - is.log.Debug().Str("src", src).Str("dst", dst).Msg("dedupe: rename") - } else { - // cache record exists, but due to GC and upgrades from older versions, - // disk content and cache records may go out of sync - dstRecord = path.Join(is.rootDir, dstRecord) - - dstRecordFi, err := os.Stat(dstRecord) - if err != nil { - is.log.Error().Err(err).Str("blobPath", dstRecord).Msg("dedupe: unable to stat") - // the actual blob on disk may have been removed by GC, so sync the cache - if err := is.cache.DeleteBlob(dstDigest.String(), dstRecord); err != nil { - // nolint:lll - is.log.Error().Err(err).Str("dstDigest", dstDigest.String()).Str("dst", dst).Msg("dedupe: unable to delete blob record") - - return err - } - goto retry - } - - dstFi, err := os.Stat(dst) - if err != nil && !os.IsNotExist(err) { - is.log.Error().Err(err).Str("blobPath", dstRecord).Msg("dedupe: unable to stat") - - return err - } - - if !os.SameFile(dstFi, dstRecordFi) { - // blob lookup cache out of sync with actual disk contents - if err := os.Remove(dst); err != nil && !os.IsNotExist(err) { - is.log.Error().Err(err).Str("dst", dst).Msg("dedupe: unable to remove blob") - return err - } - - is.log.Debug().Str("blobPath", dst).Msg("dedupe: creating hard link") - - if err := os.Link(dstRecord, dst); err != nil { - is.log.Error().Err(err).Str("blobPath", dst).Str("link", dstRecord).Msg("dedupe: unable to hard link") - - return err - } - } - - if err := os.Remove(src); err != nil { - is.log.Error().Err(err).Str("src", src).Msg("dedupe: uname to remove blob") - return err - } - is.log.Debug().Str("src", src).Msg("dedupe: remove") - } - - return nil -} - -// DeleteBlobUpload deletes an existing blob upload that is currently in progress. -func (is *ImageStore) DeleteBlobUpload(repo string, uuid string) error { - blobUploadPath := is.BlobUploadPath(repo, uuid) - if err := os.Remove(blobUploadPath); err != nil { - is.log.Error().Err(err).Str("blobUploadPath", blobUploadPath).Msg("error deleting blob upload") - return err - } - - return nil -} - -// BlobPath returns the repository path of a blob. -func (is *ImageStore) BlobPath(repo string, digest godigest.Digest) string { - return path.Join(is.rootDir, repo, "blobs", digest.Algorithm().String(), digest.Encoded()) -} - -// CheckBlob verifies a blob and returns true if the blob is correct. -func (is *ImageStore) CheckBlob(repo string, digest string) (bool, int64, error) { - d, err := godigest.Parse(digest) - if err != nil { - is.log.Error().Err(err).Str("digest", digest).Msg("failed to parse digest") - return false, -1, errors.ErrBadBlobDigest - } - - blobPath := is.BlobPath(repo, d) - - if is.dedupe && is.cache != nil { - is.Lock() - defer is.Unlock() - } else { - is.RLock() - defer is.RUnlock() - } - - blobInfo, err := os.Stat(blobPath) - if err == nil { - is.log.Debug().Str("blob path", blobPath).Msg("blob path found") - - return true, blobInfo.Size(), nil - } - - is.log.Error().Err(err).Str("blob", blobPath).Msg("failed to stat blob") - - // Check blobs in cache - dstRecord, err := is.checkCacheBlob(digest) - if err != nil { - is.log.Error().Err(err).Str("digest", digest).Msg("cache: not found") - - return false, -1, errors.ErrBlobNotFound - } - - // If found copy to location - blobSize, err := is.copyBlob(repo, blobPath, dstRecord) - if err != nil { - return false, -1, errors.ErrBlobNotFound - } - - if err := is.cache.PutBlob(digest, blobPath); err != nil { - is.log.Error().Err(err).Str("blobPath", blobPath).Msg("dedupe: unable to insert blob record") - - return false, -1, err - } - - return true, blobSize, nil -} - -func (is *ImageStore) checkCacheBlob(digest string) (string, error) { - if !is.dedupe || is.cache == nil { - return "", errors.ErrBlobNotFound - } - - dstRecord, err := is.cache.GetBlob(digest) - if err != nil { - return "", err - } - - dstRecord = path.Join(is.rootDir, dstRecord) - - is.log.Debug().Str("digest", digest).Str("dstRecord", dstRecord).Msg("cache: found dedupe record") - - return dstRecord, nil -} - -func (is *ImageStore) copyBlob(repo string, blobPath string, dstRecord string) (int64, error) { - if err := is.initRepo(repo); err != nil { - is.log.Error().Err(err).Str("repo", repo).Msg("unable to initialize an empty repo") - return -1, err - } - - _ = ensureDir(filepath.Dir(blobPath), is.log) - - if err := os.Link(dstRecord, blobPath); err != nil { - is.log.Error().Err(err).Str("blobPath", blobPath).Str("link", dstRecord).Msg("dedupe: unable to hard link") - - return -1, errors.ErrBlobNotFound - } - - blobInfo, err := os.Stat(blobPath) - if err == nil { - return blobInfo.Size(), nil - } - - return -1, errors.ErrBlobNotFound -} - -// GetBlob returns a stream to read the blob. -// FIXME: we should probably parse the manifest and use (digest, mediaType) as a -// blob selector instead of directly downloading the blob. -func (is *ImageStore) GetBlob(repo string, digest string, mediaType string) (io.Reader, int64, error) { - d, err := godigest.Parse(digest) - if err != nil { - is.log.Error().Err(err).Str("digest", digest).Msg("failed to parse digest") - return nil, -1, errors.ErrBadBlobDigest - } - - blobPath := is.BlobPath(repo, d) - - is.RLock() - defer is.RUnlock() - - blobInfo, err := os.Stat(blobPath) - if err != nil { - is.log.Error().Err(err).Str("blob", blobPath).Msg("failed to stat blob") - return nil, -1, errors.ErrBlobNotFound - } - - blobReader, err := os.Open(blobPath) - if err != nil { - is.log.Error().Err(err).Str("blob", blobPath).Msg("failed to open blob") - return nil, -1, err - } - - return blobReader, blobInfo.Size(), nil -} - -// DeleteBlob removes the blob from the repository. -func (is *ImageStore) DeleteBlob(repo string, digest string) error { - d, err := godigest.Parse(digest) - if err != nil { - is.log.Error().Err(err).Str("digest", digest).Msg("failed to parse digest") - return errors.ErrBlobNotFound - } - - blobPath := is.BlobPath(repo, d) - - is.Lock() - defer is.Unlock() - - _, err = os.Stat(blobPath) - if err != nil { - is.log.Error().Err(err).Str("blob", blobPath).Msg("failed to stat blob") - return errors.ErrBlobNotFound - } - - if is.cache != nil { - if err := is.cache.DeleteBlob(digest, blobPath); err != nil { - is.log.Error().Err(err).Str("digest", digest).Str("blobPath", blobPath).Msg("unable to remove blob path from cache") - return err - } - } - - if err := os.Remove(blobPath); err != nil { - is.log.Error().Err(err).Str("blobPath", blobPath).Msg("unable to remove blob path") - return err - } - - return nil -} - -// garbage collection - -// Scrub will clean up all unreferenced blobs. -// TODO. -func Scrub(dir string, fix bool) error { - return nil -} - -// utility routines - -func CheckHardLink(srcFileName string, destFileName string) error { - return os.Link(srcFileName, destFileName) -} - -func ValidateHardLink(rootDir string) error { - err := ioutil.WriteFile(path.Join(rootDir, "hardlinkcheck.txt"), //nolint: gosec - []byte("check whether hardlinks work on filesystem"), 0644) - if err != nil { - return err - } - - err = CheckHardLink(path.Join(rootDir, "hardlinkcheck.txt"), path.Join(rootDir, "duphardlinkcheck.txt")) - if err != nil { - // Remove hardlinkcheck.txt if hardlink fails - zerr := os.RemoveAll(path.Join(rootDir, "hardlinkcheck.txt")) - if zerr != nil { - return zerr - } - - return err - } - - err = os.RemoveAll(path.Join(rootDir, "hardlinkcheck.txt")) - if err != nil { - return err - } - - return os.RemoveAll(path.Join(rootDir, "duphardlinkcheck.txt")) -} - -func dirExists(d string) bool { - fi, err := os.Stat(d) - if err != nil && os.IsNotExist(err) { - return false - } - - if !fi.IsDir() { - return false - } - - return true -} - -func ensureDir(dir string, log zerolog.Logger) error { - if err := os.MkdirAll(dir, 0755); err != nil { - log.Error().Err(err).Str("dir", dir).Msg("unable to create dir") - - return err - } - - return nil -} - -func ifOlderThan(is *ImageStore, repo string, delay time.Duration) casext.GCPolicy { - return func(ctx context.Context, digest godigest.Digest) (bool, error) { - blobPath := is.BlobPath(repo, digest) - fi, err := os.Stat(blobPath) - - if err != nil { - return false, err - } - - if fi.ModTime().Add(delay).After(time.Now()) { - return false, nil - } - - is.log.Info().Str("digest", digest.String()).Str("blobPath", blobPath).Msg("perform GC on blob") - - return true, nil - } +type ImageStore interface { + DirExists(d string) bool + RootDir() string + RLock() + RUnlock() + Lock() + Unlock() + InitRepo(name string) error + ValidateRepo(name string) (bool, error) + GetRepositories() ([]string, error) + GetImageTags(repo string) ([]string, error) + GetImageManifest(repo string, reference string) ([]byte, string, string, error) + PutImageManifest(repo string, reference string, mediaType string, body []byte) (string, error) + DeleteImageManifest(repo string, reference string) error + BlobUploadPath(repo string, uuid string) string + NewBlobUpload(repo string) (string, error) + GetBlobUpload(repo string, uuid string) (int64, error) + PutBlobChunkStreamed(repo string, uuid string, body io.Reader) (int64, error) + PutBlobChunk(repo string, uuid string, from int64, to int64, body io.Reader) (int64, error) + BlobUploadInfo(repo string, uuid string) (int64, error) + FinishBlobUpload(repo string, uuid string, body io.Reader, digest string) error + FullBlobUpload(repo string, body io.Reader, digest string) (string, int64, error) + DedupeBlob(src string, dstDigest digest.Digest, dst string) error + DeleteBlobUpload(repo string, uuid string) error + BlobPath(repo string, digest digest.Digest) string + CheckBlob(repo string, digest string) (bool, int64, error) + GetBlob(repo string, digest string, mediaType string) (io.Reader, int64, error) + DeleteBlob(repo string, digest string) error + GetIndexContent(repo string) ([]byte, error) + GetBlobContent(repo, digest string) ([]byte, error) } diff --git a/pkg/storage/storage_fs.go b/pkg/storage/storage_fs.go new file mode 100644 index 00000000..6efb2d31 --- /dev/null +++ b/pkg/storage/storage_fs.go @@ -0,0 +1,1333 @@ +package storage + +import ( + "bytes" + "context" + "crypto/sha256" + "encoding/json" + "fmt" + "io" + "io/ioutil" + "os" + "path" + "path/filepath" + "strings" + "sync" + "time" + + "github.com/anuvu/zot/errors" + zlog "github.com/anuvu/zot/pkg/log" + apexlog "github.com/apex/log" + guuid "github.com/gofrs/uuid" + godigest "github.com/opencontainers/go-digest" + ispec "github.com/opencontainers/image-spec/specs-go/v1" + "github.com/opencontainers/umoci" + "github.com/opencontainers/umoci/oci/casext" + "github.com/rs/zerolog" +) + +const ( + // BlobUploadDir defines the upload directory for blob uploads. + BlobUploadDir = ".uploads" + schemaVersion = 2 + gcDelay = 1 * time.Hour +) + +// BlobUpload models and upload request. +type BlobUpload struct { + StoreName string + ID string +} + +type StoreController struct { + DefaultStore ImageStore + SubStore map[string]ImageStore +} + +// ImageStoreFS provides the image storage operations. +type ImageStoreFS struct { + rootDir string + lock *sync.RWMutex + blobUploads map[string]BlobUpload + cache *Cache + gc bool + dedupe bool + log zerolog.Logger +} + +func (is *ImageStoreFS) RootDir() string { + return is.rootDir +} + +func (is *ImageStoreFS) DirExists(d string) bool { + fi, err := os.Stat(d) + if err != nil && os.IsNotExist(err) { + return false + } + + if !fi.IsDir() { + return false + } + + return true +} + +func getRoutePrefix(name string) string { + names := strings.SplitN(name, "/", 2) + + if len(names) != 2 { // nolint: gomnd + // it means route is of global storage e.g "centos:latest" + if len(names) == 1 { + return "/" + } + } + + return fmt.Sprintf("/%s", names[0]) +} + +func (sc StoreController) GetImageStore(name string) ImageStore { + if sc.SubStore != nil { + // SubStore is being provided, now we need to find equivalent image store and this will be found by splitting name + prefixName := getRoutePrefix(name) + + imgStore, ok := sc.SubStore[prefixName] + if !ok { + imgStore = sc.DefaultStore + } + + return imgStore + } + + return sc.DefaultStore +} + +// NewImageStore returns a new image store backed by a file storage. +func NewImageStore(rootDir string, gc bool, dedupe bool, log zlog.Logger) ImageStore { + if _, err := os.Stat(rootDir); os.IsNotExist(err) { + if err := os.MkdirAll(rootDir, 0700); err != nil { + log.Error().Err(err).Str("rootDir", rootDir).Msg("unable to create root dir") + return nil + } + } + + is := &ImageStoreFS{ + rootDir: rootDir, + lock: &sync.RWMutex{}, + blobUploads: make(map[string]BlobUpload), + gc: gc, + dedupe: dedupe, + log: log.With().Caller().Logger(), + } + + if dedupe { + is.cache = NewCache(rootDir, "cache", log) + } + + if gc { + // we use umoci GC to perform garbage-collection, but it uses its own logger + // - so capture those logs, could be useful + apexlog.SetLevel(apexlog.DebugLevel) + apexlog.SetHandler(apexlog.HandlerFunc(func(entry *apexlog.Entry) error { + e := log.Debug() + for k, v := range entry.Fields { + e = e.Interface(k, v) + } + e.Msg(entry.Message) + return nil + })) + } + + return is +} + +// RLock read-lock. +func (is *ImageStoreFS) RLock() { + is.lock.RLock() +} + +// RUnlock read-unlock. +func (is *ImageStoreFS) RUnlock() { + is.lock.RUnlock() +} + +// Lock write-lock. +func (is *ImageStoreFS) Lock() { + is.lock.Lock() +} + +// Unlock write-unlock. +func (is *ImageStoreFS) Unlock() { + is.lock.Unlock() +} + +func (is *ImageStoreFS) initRepo(name string) error { + repoDir := path.Join(is.rootDir, name) + // create "blobs" subdir + err := ensureDir(path.Join(repoDir, "blobs"), is.log) + if err != nil { + is.log.Error().Err(err).Msg("error creating blobs subdir") + + return err + } + // create BlobUploadDir subdir + err = ensureDir(path.Join(repoDir, BlobUploadDir), is.log) + if err != nil { + is.log.Error().Err(err).Msg("error creating blob upload subdir") + + return err + } + + // "oci-layout" file - create if it doesn't exist + ilPath := path.Join(repoDir, ispec.ImageLayoutFile) + if _, err := os.Stat(ilPath); err != nil { + il := ispec.ImageLayout{Version: ispec.ImageLayoutVersion} + buf, err := json.Marshal(il) + + if err != nil { + is.log.Panic().Err(err).Msg("unable to marshal JSON") + } + + if err := ioutil.WriteFile(ilPath, buf, 0644); err != nil { //nolint: gosec + is.log.Error().Err(err).Str("file", ilPath).Msg("unable to write file") + return err + } + } + + // "index.json" file - create if it doesn't exist + indexPath := path.Join(repoDir, "index.json") + if _, err := os.Stat(indexPath); err != nil { + index := ispec.Index{} + index.SchemaVersion = 2 + buf, err := json.Marshal(index) + + if err != nil { + is.log.Panic().Err(err).Msg("unable to marshal JSON") + } + + if err := ioutil.WriteFile(indexPath, buf, 0644); err != nil { //nolint: gosec + is.log.Error().Err(err).Str("file", indexPath).Msg("unable to write file") + return err + } + } + + return nil +} + +// InitRepo creates an image repository under this store. +func (is *ImageStoreFS) InitRepo(name string) error { + is.Lock() + defer is.Unlock() + + return is.initRepo(name) +} + +// ValidateRepo validates that the repository layout is complaint with the OCI repo layout. +func (is *ImageStoreFS) ValidateRepo(name string) (bool, error) { + // https://github.com/opencontainers/image-spec/blob/master/image-layout.md#content + // at least, expect at least 3 entries - ["blobs", "oci-layout", "index.json"] + // and an additional/optional BlobUploadDir in each image store + dir := path.Join(is.rootDir, name) + if !is.DirExists(dir) { + return false, errors.ErrRepoNotFound + } + + files, err := ioutil.ReadDir(dir) + if err != nil { + is.log.Error().Err(err).Str("dir", dir).Msg("unable to read directory") + return false, errors.ErrRepoNotFound + } + // nolint:gomnd + if len(files) < 3 { + return false, errors.ErrRepoBadVersion + } + + found := map[string]bool{ + "blobs": false, + ispec.ImageLayoutFile: false, + "index.json": false, + } + + for _, file := range files { + if file.Name() == "blobs" && !file.IsDir() { + return false, nil + } + + found[file.Name()] = true + } + + for k, v := range found { + if !v && k != BlobUploadDir { + return false, nil + } + } + + buf, err := ioutil.ReadFile(path.Join(dir, ispec.ImageLayoutFile)) + if err != nil { + return false, err + } + + var il ispec.ImageLayout + if err := json.Unmarshal(buf, &il); err != nil { + return false, err + } + + if il.Version != ispec.ImageLayoutVersion { + return false, errors.ErrRepoBadVersion + } + + return true, nil +} + +// GetRepositories returns a list of all the repositories under this store. +func (is *ImageStoreFS) GetRepositories() ([]string, error) { + dir := is.rootDir + + is.RLock() + defer is.RUnlock() + + _, err := ioutil.ReadDir(dir) + if err != nil { + is.log.Error().Err(err).Msg("failure walking storage root-dir") + return nil, err + } + + stores := make([]string, 0) + err = filepath.Walk(dir, func(path string, info os.FileInfo, err error) error { + if err != nil { + return err + } + + if !info.IsDir() { + return nil + } + + rel, err := filepath.Rel(is.rootDir, path) + if err != nil { + return nil + } + + if ok, err := is.ValidateRepo(rel); !ok || err != nil { + return nil + } + + //is.log.Debug().Str("dir", path).Str("name", info.Name()).Msg("found image store") + stores = append(stores, rel) + + return nil + }) + + return stores, err +} + +// GetImageTags returns a list of image tags available in the specified repository. +func (is *ImageStoreFS) GetImageTags(repo string) ([]string, error) { + dir := path.Join(is.rootDir, repo) + if !is.DirExists(dir) { + return nil, errors.ErrRepoNotFound + } + + is.RLock() + defer is.RUnlock() + + buf, err := ioutil.ReadFile(path.Join(dir, "index.json")) + if err != nil { + is.log.Error().Err(err).Str("dir", dir).Msg("failed to read index.json") + return nil, errors.ErrRepoNotFound + } + + var index ispec.Index + if err := json.Unmarshal(buf, &index); err != nil { + is.log.Error().Err(err).Str("dir", dir).Msg("invalid JSON") + return nil, errors.ErrRepoNotFound + } + + tags := make([]string, 0) + + for _, manifest := range index.Manifests { + v, ok := manifest.Annotations[ispec.AnnotationRefName] + if ok { + tags = append(tags, v) + } + } + + return tags, nil +} + +// GetImageManifest returns the image manifest of an image in the specific repository. +func (is *ImageStoreFS) GetImageManifest(repo string, reference string) ([]byte, string, string, error) { + dir := path.Join(is.rootDir, repo) + if !is.DirExists(dir) { + return nil, "", "", errors.ErrRepoNotFound + } + + is.RLock() + defer is.RUnlock() + + buf, err := ioutil.ReadFile(path.Join(dir, "index.json")) + + if err != nil { + is.log.Error().Err(err).Str("dir", dir).Msg("failed to read index.json") + + if os.IsNotExist(err) { + return nil, "", "", errors.ErrRepoNotFound + } + + return nil, "", "", err + } + + var index ispec.Index + if err := json.Unmarshal(buf, &index); err != nil { + is.log.Error().Err(err).Str("dir", dir).Msg("invalid JSON") + return nil, "", "", err + } + + found := false + + var digest godigest.Digest + + mediaType := "" + + for _, m := range index.Manifests { + if reference == m.Digest.String() { + digest = m.Digest + mediaType = m.MediaType + found = true + + break + } + + v, ok := m.Annotations[ispec.AnnotationRefName] + if ok && v == reference { + digest = m.Digest + mediaType = m.MediaType + found = true + + break + } + } + + if !found { + return nil, "", "", errors.ErrManifestNotFound + } + + p := path.Join(dir, "blobs", digest.Algorithm().String(), digest.Encoded()) + + buf, err = ioutil.ReadFile(p) + + if err != nil { + is.log.Error().Err(err).Str("blob", p).Msg("failed to read manifest") + + if os.IsNotExist(err) { + return nil, "", "", errors.ErrManifestNotFound + } + + return nil, "", "", err + } + + var manifest ispec.Manifest + if err := json.Unmarshal(buf, &manifest); err != nil { + is.log.Error().Err(err).Str("dir", dir).Msg("invalid JSON") + return nil, "", "", err + } + + return buf, digest.String(), mediaType, nil +} + +// PutImageManifest adds an image manifest to the repository. +func (is *ImageStoreFS) PutImageManifest(repo string, reference string, mediaType string, + body []byte) (string, error) { + if err := is.InitRepo(repo); err != nil { + is.log.Debug().Err(err).Msg("init repo") + return "", err + } + + if mediaType != ispec.MediaTypeImageManifest { + is.log.Debug().Interface("actual", mediaType). + Interface("expected", ispec.MediaTypeImageManifest).Msg("bad manifest media type") + return "", errors.ErrBadManifest + } + + if len(body) == 0 { + is.log.Debug().Int("len", len(body)).Msg("invalid body length") + return "", errors.ErrBadManifest + } + + var m ispec.Manifest + if err := json.Unmarshal(body, &m); err != nil { + is.log.Error().Err(err).Msg("unable to unmarshal JSON") + return "", errors.ErrBadManifest + } + + if m.SchemaVersion != schemaVersion { + is.log.Error().Int("SchemaVersion", m.SchemaVersion).Msg("invalid manifest") + return "", errors.ErrBadManifest + } + + for _, l := range m.Layers { + digest := l.Digest + blobPath := is.BlobPath(repo, digest) + is.log.Info().Str("blobPath", blobPath).Str("reference", reference).Msg("manifest layers") + + if _, err := os.Stat(blobPath); err != nil { + is.log.Error().Err(err).Str("blobPath", blobPath).Msg("unable to find blob") + return digest.String(), errors.ErrBlobNotFound + } + } + + mDigest := godigest.FromBytes(body) + refIsDigest := false + d, err := godigest.Parse(reference) + + if err == nil { + if d.String() != mDigest.String() { + is.log.Error().Str("actual", mDigest.String()).Str("expected", d.String()). + Msg("manifest digest is not valid") + return "", errors.ErrBadManifest + } + + refIsDigest = true + } + + is.Lock() + defer is.Unlock() + + dir := path.Join(is.rootDir, repo) + buf, err := ioutil.ReadFile(path.Join(dir, "index.json")) + + if err != nil { + is.log.Error().Err(err).Str("dir", dir).Msg("failed to read index.json") + return "", err + } + + var index ispec.Index + if err := json.Unmarshal(buf, &index); err != nil { + is.log.Error().Err(err).Str("dir", dir).Msg("invalid JSON") + return "", errors.ErrRepoBadVersion + } + + updateIndex := true + // create a new descriptor + desc := ispec.Descriptor{MediaType: mediaType, Size: int64(len(body)), Digest: mDigest, + Platform: &ispec.Platform{Architecture: "amd64", OS: "linux"}} + if !refIsDigest { + desc.Annotations = map[string]string{ispec.AnnotationRefName: reference} + } + + for i, m := range index.Manifests { + if reference == m.Digest.String() { + // nothing changed, so don't update + desc = m + updateIndex = false + + break + } + + v, ok := m.Annotations[ispec.AnnotationRefName] + if ok && v == reference { + if m.Digest.String() == mDigest.String() { + // nothing changed, so don't update + desc = m + updateIndex = false + + break + } + // manifest contents have changed for the same tag, + // so update index.json descriptor + is.log.Info(). + Int64("old size", desc.Size). + Int64("new size", int64(len(body))). + Str("old digest", desc.Digest.String()). + Str("new digest", mDigest.String()). + Msg("updating existing tag with new manifest contents") + + desc = m + desc.Size = int64(len(body)) + desc.Digest = mDigest + + index.Manifests = append(index.Manifests[:i], index.Manifests[i+1:]...) + + break + } + } + + if !updateIndex { + return desc.Digest.String(), nil + } + + // write manifest to "blobs" + dir = path.Join(is.rootDir, repo, "blobs", mDigest.Algorithm().String()) + _ = ensureDir(dir, is.log) + file := path.Join(dir, mDigest.Encoded()) + + if err := ioutil.WriteFile(file, body, 0600); err != nil { + is.log.Error().Err(err).Str("file", file).Msg("unable to write") + return "", err + } + + // now update "index.json" + index.Manifests = append(index.Manifests, desc) + dir = path.Join(is.rootDir, repo) + file = path.Join(dir, "index.json") + buf, err = json.Marshal(index) + + if err != nil { + is.log.Error().Err(err).Str("file", file).Msg("unable to marshal JSON") + return "", err + } + + if err := ioutil.WriteFile(file, buf, 0644); err != nil { //nolint: gosec + is.log.Error().Err(err).Str("file", file).Msg("unable to write") + return "", err + } + + if is.gc { + oci, err := umoci.OpenLayout(dir) + if err != nil { + return "", err + } + defer oci.Close() + + if err := oci.GC(context.Background(), ifOlderThan(is, repo, gcDelay)); err != nil { + return "", err + } + } + + return desc.Digest.String(), nil +} + +// DeleteImageManifest deletes the image manifest from the repository. +func (is *ImageStoreFS) DeleteImageManifest(repo string, reference string) error { + dir := path.Join(is.rootDir, repo) + if !is.DirExists(dir) { + return errors.ErrRepoNotFound + } + + isTag := false + + // as per spec "reference" can be a digest and a tag + digest, err := godigest.Parse(reference) + if err != nil { + is.log.Debug().Str("invalid digest: ", reference).Msg("storage: assuming tag") + + isTag = true + } + + is.Lock() + defer is.Unlock() + + buf, err := ioutil.ReadFile(path.Join(dir, "index.json")) + + if err != nil { + is.log.Error().Err(err).Str("dir", dir).Msg("failed to read index.json") + return err + } + + var index ispec.Index + if err := json.Unmarshal(buf, &index); err != nil { + is.log.Error().Err(err).Str("dir", dir).Msg("invalid JSON") + return err + } + + found := false + + var m ispec.Descriptor + + // we are deleting, so keep only those manifests that don't match + outIndex := index + outIndex.Manifests = []ispec.Descriptor{} + + for _, m = range index.Manifests { + if isTag { + tag, ok := m.Annotations[ispec.AnnotationRefName] + if ok && tag == reference { + is.log.Debug().Str("deleting tag", tag).Msg("") + + digest = m.Digest + + found = true + + continue + } + } else if reference == m.Digest.String() { + is.log.Debug().Str("deleting reference", reference).Msg("") + found = true + continue + } + + outIndex.Manifests = append(outIndex.Manifests, m) + } + + if !found { + return errors.ErrManifestNotFound + } + + // now update "index.json" + dir = path.Join(is.rootDir, repo) + file := path.Join(dir, "index.json") + buf, err = json.Marshal(outIndex) + + if err != nil { + return err + } + + if err := ioutil.WriteFile(file, buf, 0644); err != nil { //nolint: gosec + return err + } + + if is.gc { + oci, err := umoci.OpenLayout(dir) + if err != nil { + return err + } + defer oci.Close() + + if err := oci.GC(context.Background(), ifOlderThan(is, repo, gcDelay)); err != nil { + return err + } + } + + // Delete blob only when blob digest not present in manifest entry. + // e.g. 1.0.1 & 1.0.2 have same blob digest so if we delete 1.0.1, blob should not be removed. + toDelete := true + + for _, m = range outIndex.Manifests { + if digest.String() == m.Digest.String() { + toDelete = false + break + } + } + + if toDelete { + p := path.Join(dir, "blobs", digest.Algorithm().String(), digest.Encoded()) + + _ = os.Remove(p) + } + + return nil +} + +// BlobUploadPath returns the upload path for a blob in this store. +func (is *ImageStoreFS) BlobUploadPath(repo string, uuid string) string { + dir := path.Join(is.rootDir, repo) + blobUploadPath := path.Join(dir, BlobUploadDir, uuid) + + return blobUploadPath +} + +// NewBlobUpload returns the unique ID for an upload in progress. +func (is *ImageStoreFS) NewBlobUpload(repo string) (string, error) { + if err := is.InitRepo(repo); err != nil { + is.log.Error().Err(err).Msg("error initializing repo") + + return "", err + } + + uuid, err := guuid.NewV4() + if err != nil { + return "", err + } + + u := uuid.String() + + blobUploadPath := is.BlobUploadPath(repo, u) + + file, err := os.OpenFile(blobUploadPath, os.O_WRONLY|os.O_TRUNC|os.O_CREATE, 0600) + if err != nil { + return "", errors.ErrRepoNotFound + } + defer file.Close() + + return u, nil +} + +// GetBlobUpload returns the current size of a blob upload. +func (is *ImageStoreFS) GetBlobUpload(repo string, uuid string) (int64, error) { + blobUploadPath := is.BlobUploadPath(repo, uuid) + fi, err := os.Stat(blobUploadPath) + + if err != nil { + if os.IsNotExist(err) { + return -1, errors.ErrUploadNotFound + } + + return -1, err + } + + return fi.Size(), nil +} + +// PutBlobChunkStreamed appends another chunk of data to the specified blob. It returns +// the number of actual bytes to the blob. +func (is *ImageStoreFS) PutBlobChunkStreamed(repo string, uuid string, body io.Reader) (int64, error) { + if err := is.InitRepo(repo); err != nil { + return -1, err + } + + blobUploadPath := is.BlobUploadPath(repo, uuid) + + _, err := os.Stat(blobUploadPath) + if err != nil { + return -1, errors.ErrUploadNotFound + } + + file, err := os.OpenFile( + blobUploadPath, + os.O_WRONLY|os.O_CREATE, + 0600, + ) + if err != nil { + is.log.Fatal().Err(err).Msg("failed to open file") + } + defer file.Close() + + if _, err := file.Seek(0, io.SeekEnd); err != nil { + is.log.Fatal().Err(err).Msg("failed to seek file") + } + + n, err := io.Copy(file, body) + + return n, err +} + +// PutBlobChunk writes another chunk of data to the specified blob. It returns +// the number of actual bytes to the blob. +func (is *ImageStoreFS) PutBlobChunk(repo string, uuid string, from int64, to int64, + body io.Reader) (int64, error) { + if err := is.InitRepo(repo); err != nil { + return -1, err + } + + blobUploadPath := is.BlobUploadPath(repo, uuid) + + fi, err := os.Stat(blobUploadPath) + if err != nil { + return -1, errors.ErrUploadNotFound + } + + if from != fi.Size() { + is.log.Error().Int64("expected", from).Int64("actual", fi.Size()). + Msg("invalid range start for blob upload") + return -1, errors.ErrBadUploadRange + } + + file, err := os.OpenFile( + blobUploadPath, + os.O_WRONLY|os.O_CREATE, + 0600, + ) + if err != nil { + is.log.Fatal().Err(err).Msg("failed to open file") + } + defer file.Close() + + if _, err := file.Seek(from, io.SeekStart); err != nil { + is.log.Fatal().Err(err).Msg("failed to seek file") + } + + n, err := io.Copy(file, body) + + return n, err +} + +// BlobUploadInfo returns the current blob size in bytes. +func (is *ImageStoreFS) BlobUploadInfo(repo string, uuid string) (int64, error) { + blobUploadPath := is.BlobUploadPath(repo, uuid) + fi, err := os.Stat(blobUploadPath) + + if err != nil { + is.log.Error().Err(err).Str("blob", blobUploadPath).Msg("failed to stat blob") + return -1, err + } + + size := fi.Size() + + return size, nil +} + +// FinishBlobUpload finalizes the blob upload and moves blob the repository. +func (is *ImageStoreFS) FinishBlobUpload(repo string, uuid string, body io.Reader, digest string) error { + dstDigest, err := godigest.Parse(digest) + if err != nil { + is.log.Error().Err(err).Str("digest", digest).Msg("failed to parse digest") + return errors.ErrBadBlobDigest + } + + src := is.BlobUploadPath(repo, uuid) + + _, err = os.Stat(src) + if err != nil { + is.log.Error().Err(err).Str("blob", src).Msg("failed to stat blob") + return errors.ErrUploadNotFound + } + + f, err := os.Open(src) + if err != nil { + is.log.Error().Err(err).Str("blob", src).Msg("failed to open blob") + return errors.ErrUploadNotFound + } + + srcDigest, err := godigest.FromReader(f) + f.Close() + + if err != nil { + is.log.Error().Err(err).Str("blob", src).Msg("failed to open blob") + return errors.ErrBadBlobDigest + } + + if srcDigest != dstDigest { + is.log.Error().Str("srcDigest", srcDigest.String()). + Str("dstDigest", dstDigest.String()).Msg("actual digest not equal to expected digest") + return errors.ErrBadBlobDigest + } + + dir := path.Join(is.rootDir, repo, "blobs", dstDigest.Algorithm().String()) + + is.Lock() + defer is.Unlock() + + err = ensureDir(dir, is.log) + if err != nil { + is.log.Error().Err(err).Msg("error creating blobs/sha256 dir") + + return err + } + + dst := is.BlobPath(repo, dstDigest) + + if is.dedupe && is.cache != nil { + if err := is.DedupeBlob(src, dstDigest, dst); err != nil { + is.log.Error().Err(err).Str("src", src).Str("dstDigest", dstDigest.String()). + Str("dst", dst).Msg("unable to dedupe blob") + return err + } + } else { + if err := os.Rename(src, dst); err != nil { + is.log.Error().Err(err).Str("src", src).Str("dstDigest", dstDigest.String()). + Str("dst", dst).Msg("unable to finish blob") + return err + } + } + + return nil +} + +// FullBlobUpload handles a full blob upload, and no partial session is created. +func (is *ImageStoreFS) FullBlobUpload(repo string, body io.Reader, digest string) (string, int64, error) { + if err := is.InitRepo(repo); err != nil { + return "", -1, err + } + + dstDigest, err := godigest.Parse(digest) + if err != nil { + is.log.Error().Err(err).Str("digest", digest).Msg("failed to parse digest") + return "", -1, errors.ErrBadBlobDigest + } + + u, err := guuid.NewV4() + if err != nil { + return "", -1, err + } + + uuid := u.String() + + src := is.BlobUploadPath(repo, uuid) + + f, err := os.Create(src) + if err != nil { + is.log.Error().Err(err).Str("blob", src).Msg("failed to open blob") + return "", -1, errors.ErrUploadNotFound + } + + defer f.Close() + + digester := sha256.New() + mw := io.MultiWriter(f, digester) + n, err := io.Copy(mw, body) + + if err != nil { + return "", -1, err + } + + srcDigest := godigest.NewDigestFromEncoded(godigest.SHA256, fmt.Sprintf("%x", digester.Sum(nil))) + if srcDigest != dstDigest { + is.log.Error().Str("srcDigest", srcDigest.String()). + Str("dstDigest", dstDigest.String()).Msg("actual digest not equal to expected digest") + return "", -1, errors.ErrBadBlobDigest + } + + dir := path.Join(is.rootDir, repo, "blobs", dstDigest.Algorithm().String()) + + is.Lock() + defer is.Unlock() + + _ = ensureDir(dir, is.log) + dst := is.BlobPath(repo, dstDigest) + + if is.dedupe && is.cache != nil { + if err := is.DedupeBlob(src, dstDigest, dst); err != nil { + is.log.Error().Err(err).Str("src", src).Str("dstDigest", dstDigest.String()). + Str("dst", dst).Msg("unable to dedupe blob") + return "", -1, err + } + } else { + if err := os.Rename(src, dst); err != nil { + is.log.Error().Err(err).Str("src", src).Str("dstDigest", dstDigest.String()). + Str("dst", dst).Msg("unable to finish blob") + return "", -1, err + } + } + + return uuid, n, nil +} + +func (is *ImageStoreFS) DedupeBlob(src string, dstDigest godigest.Digest, dst string) error { +retry: + is.log.Debug().Str("src", src).Str("dstDigest", dstDigest.String()).Str("dst", dst).Msg("dedupe: ENTER") + + dstRecord, err := is.cache.GetBlob(dstDigest.String()) + + // nolint:goerr113 + if err != nil && err != errors.ErrCacheMiss { + is.log.Error().Err(err).Str("blobPath", dst).Msg("dedupe: unable to lookup blob record") + return err + } + + if dstRecord == "" { + // cache record doesn't exist, so first disk and cache entry for this diges + if err := is.cache.PutBlob(dstDigest.String(), dst); err != nil { + is.log.Error().Err(err).Str("blobPath", dst).Msg("dedupe: unable to insert blob record") + + return err + } + + // move the blob from uploads to final dest + if err := os.Rename(src, dst); err != nil { + is.log.Error().Err(err).Str("src", src).Str("dst", dst).Msg("dedupe: unable to rename blob") + + return err + } + + is.log.Debug().Str("src", src).Str("dst", dst).Msg("dedupe: rename") + } else { + // cache record exists, but due to GC and upgrades from older versions, + // disk content and cache records may go out of sync + dstRecord = path.Join(is.rootDir, dstRecord) + + dstRecordFi, err := os.Stat(dstRecord) + if err != nil { + is.log.Error().Err(err).Str("blobPath", dstRecord).Msg("dedupe: unable to stat") + // the actual blob on disk may have been removed by GC, so sync the cache + if err := is.cache.DeleteBlob(dstDigest.String(), dstRecord); err != nil { + // nolint:lll + is.log.Error().Err(err).Str("dstDigest", dstDigest.String()).Str("dst", dst).Msg("dedupe: unable to delete blob record") + + return err + } + goto retry + } + + dstFi, err := os.Stat(dst) + if err != nil && !os.IsNotExist(err) { + is.log.Error().Err(err).Str("blobPath", dstRecord).Msg("dedupe: unable to stat") + + return err + } + + if !os.SameFile(dstFi, dstRecordFi) { + // blob lookup cache out of sync with actual disk contents + if err := os.Remove(dst); err != nil && !os.IsNotExist(err) { + is.log.Error().Err(err).Str("dst", dst).Msg("dedupe: unable to remove blob") + return err + } + + is.log.Debug().Str("blobPath", dst).Msg("dedupe: creating hard link") + + if err := os.Link(dstRecord, dst); err != nil { + is.log.Error().Err(err).Str("blobPath", dst).Str("link", dstRecord).Msg("dedupe: unable to hard link") + + return err + } + } + + if err := os.Remove(src); err != nil { + is.log.Error().Err(err).Str("src", src).Msg("dedupe: uname to remove blob") + return err + } + is.log.Debug().Str("src", src).Msg("dedupe: remove") + } + + return nil +} + +// DeleteBlobUpload deletes an existing blob upload that is currently in progress. +func (is *ImageStoreFS) DeleteBlobUpload(repo string, uuid string) error { + blobUploadPath := is.BlobUploadPath(repo, uuid) + if err := os.Remove(blobUploadPath); err != nil { + is.log.Error().Err(err).Str("blobUploadPath", blobUploadPath).Msg("error deleting blob upload") + return err + } + + return nil +} + +// BlobPath returns the repository path of a blob. +func (is *ImageStoreFS) BlobPath(repo string, digest godigest.Digest) string { + return path.Join(is.rootDir, repo, "blobs", digest.Algorithm().String(), digest.Encoded()) +} + +// CheckBlob verifies a blob and returns true if the blob is correct. +func (is *ImageStoreFS) CheckBlob(repo string, digest string) (bool, int64, error) { + d, err := godigest.Parse(digest) + if err != nil { + is.log.Error().Err(err).Str("digest", digest).Msg("failed to parse digest") + return false, -1, errors.ErrBadBlobDigest + } + + blobPath := is.BlobPath(repo, d) + + if is.dedupe && is.cache != nil { + is.Lock() + defer is.Unlock() + } else { + is.RLock() + defer is.RUnlock() + } + + blobInfo, err := os.Stat(blobPath) + if err == nil { + is.log.Debug().Str("blob path", blobPath).Msg("blob path found") + + return true, blobInfo.Size(), nil + } + + is.log.Error().Err(err).Str("blob", blobPath).Msg("failed to stat blob") + + // Check blobs in cache + dstRecord, err := is.checkCacheBlob(digest) + if err != nil { + is.log.Error().Err(err).Str("digest", digest).Msg("cache: not found") + + return false, -1, errors.ErrBlobNotFound + } + + // If found copy to location + blobSize, err := is.copyBlob(repo, blobPath, dstRecord) + if err != nil { + return false, -1, errors.ErrBlobNotFound + } + + if err := is.cache.PutBlob(digest, blobPath); err != nil { + is.log.Error().Err(err).Str("blobPath", blobPath).Msg("dedupe: unable to insert blob record") + + return false, -1, err + } + + return true, blobSize, nil +} + +func (is *ImageStoreFS) checkCacheBlob(digest string) (string, error) { + if !is.dedupe || is.cache == nil { + return "", errors.ErrBlobNotFound + } + + dstRecord, err := is.cache.GetBlob(digest) + if err != nil { + return "", err + } + + dstRecord = path.Join(is.rootDir, dstRecord) + + is.log.Debug().Str("digest", digest).Str("dstRecord", dstRecord).Msg("cache: found dedupe record") + + return dstRecord, nil +} + +func (is *ImageStoreFS) copyBlob(repo string, blobPath string, dstRecord string) (int64, error) { + if err := is.initRepo(repo); err != nil { + is.log.Error().Err(err).Str("repo", repo).Msg("unable to initialize an empty repo") + return -1, err + } + + _ = ensureDir(filepath.Dir(blobPath), is.log) + + if err := os.Link(dstRecord, blobPath); err != nil { + is.log.Error().Err(err).Str("blobPath", blobPath).Str("link", dstRecord).Msg("dedupe: unable to hard link") + + return -1, errors.ErrBlobNotFound + } + + blobInfo, err := os.Stat(blobPath) + if err == nil { + return blobInfo.Size(), nil + } + + return -1, errors.ErrBlobNotFound +} + +// GetBlob returns a stream to read the blob. +// FIXME: we should probably parse the manifest and use (digest, mediaType) as a +// blob selector instead of directly downloading the blob. +func (is *ImageStoreFS) GetBlob(repo string, digest string, mediaType string) (io.Reader, int64, error) { + d, err := godigest.Parse(digest) + if err != nil { + is.log.Error().Err(err).Str("digest", digest).Msg("failed to parse digest") + return nil, -1, errors.ErrBadBlobDigest + } + + blobPath := is.BlobPath(repo, d) + + is.RLock() + defer is.RUnlock() + + blobInfo, err := os.Stat(blobPath) + if err != nil { + is.log.Error().Err(err).Str("blob", blobPath).Msg("failed to stat blob") + return nil, -1, errors.ErrBlobNotFound + } + + blobReader, err := os.Open(blobPath) + if err != nil { + is.log.Error().Err(err).Str("blob", blobPath).Msg("failed to open blob") + return nil, -1, err + } + + return blobReader, blobInfo.Size(), nil +} + +func (is *ImageStoreFS) GetBlobContent(repo string, digest string) ([]byte, error) { + blob, _, err := is.GetBlob(repo, digest, ispec.MediaTypeImageManifest) + if err != nil { + return []byte{}, err + } + + buf := new(bytes.Buffer) + + _, err = buf.ReadFrom(blob) + if err != nil { + is.log.Error().Err(err).Str("digest", digest).Msg("failed to read blob") + return []byte{}, err + } + + return buf.Bytes(), nil +} + +func (is *ImageStoreFS) GetIndexContent(repo string) ([]byte, error) { + dir := path.Join(is.rootDir, repo) + + buf, err := ioutil.ReadFile(path.Join(dir, "index.json")) + if err != nil { + if os.IsNotExist(err) { + is.log.Error().Err(err).Str("dir", dir).Msg("index.json doesn't exist") + return []byte{}, errors.ErrRepoNotFound + } + + is.log.Error().Err(err).Str("dir", dir).Msg("failed to read index.json") + + return []byte{}, err + } + + return buf, nil +} + +// DeleteBlob removes the blob from the repository. +func (is *ImageStoreFS) DeleteBlob(repo string, digest string) error { + d, err := godigest.Parse(digest) + if err != nil { + is.log.Error().Err(err).Str("digest", digest).Msg("failed to parse digest") + return errors.ErrBlobNotFound + } + + blobPath := is.BlobPath(repo, d) + + is.Lock() + defer is.Unlock() + + _, err = os.Stat(blobPath) + if err != nil { + is.log.Error().Err(err).Str("blob", blobPath).Msg("failed to stat blob") + return errors.ErrBlobNotFound + } + + if is.cache != nil { + if err := is.cache.DeleteBlob(digest, blobPath); err != nil { + is.log.Error().Err(err).Str("digest", digest).Str("blobPath", blobPath).Msg("unable to remove blob path from cache") + return err + } + } + + if err := os.Remove(blobPath); err != nil { + is.log.Error().Err(err).Str("blobPath", blobPath).Msg("unable to remove blob path") + return err + } + + return nil +} + +// garbage collection + +// Scrub will clean up all unreferenced blobs. +// TODO. +func Scrub(dir string, fix bool) error { + return nil +} + +// utility routines + +func CheckHardLink(srcFileName string, destFileName string) error { + return os.Link(srcFileName, destFileName) +} + +func ValidateHardLink(rootDir string) error { + err := ioutil.WriteFile(path.Join(rootDir, "hardlinkcheck.txt"), //nolint: gosec + []byte("check whether hardlinks work on filesystem"), 0644) + if err != nil { + return err + } + + err = CheckHardLink(path.Join(rootDir, "hardlinkcheck.txt"), path.Join(rootDir, "duphardlinkcheck.txt")) + if err != nil { + // Remove hardlinkcheck.txt if hardlink fails + zerr := os.RemoveAll(path.Join(rootDir, "hardlinkcheck.txt")) + if zerr != nil { + return zerr + } + + return err + } + + err = os.RemoveAll(path.Join(rootDir, "hardlinkcheck.txt")) + if err != nil { + return err + } + + return os.RemoveAll(path.Join(rootDir, "duphardlinkcheck.txt")) +} + +func ensureDir(dir string, log zerolog.Logger) error { + if err := os.MkdirAll(dir, 0755); err != nil { + log.Error().Err(err).Str("dir", dir).Msg("unable to create dir") + + return err + } + + return nil +} + +func ifOlderThan(is *ImageStoreFS, repo string, delay time.Duration) casext.GCPolicy { + return func(ctx context.Context, digest godigest.Digest) (bool, error) { + blobPath := is.BlobPath(repo, digest) + fi, err := os.Stat(blobPath) + + if err != nil { + return false, err + } + + if fi.ModTime().Add(delay).After(time.Now()) { + return false, nil + } + + is.log.Info().Str("digest", digest.String()).Str("blobPath", blobPath).Msg("perform GC on blob") + + return true, nil + } +} diff --git a/pkg/storage/storage_test.go b/pkg/storage/storage_test.go index 4807645c..ce69c6bf 100644 --- a/pkg/storage/storage_test.go +++ b/pkg/storage/storage_test.go @@ -38,11 +38,18 @@ func TestAPIs(t *testing.T) { v, err := il.ValidateRepo(repoName) So(v, ShouldEqual, false) So(err, ShouldNotBeNil) + ok := il.DirExists(path.Join(il.RootDir(), repoName)) + So(ok, ShouldBeFalse) }) Convey("Initialize repo", func() { err := il.InitRepo(repoName) So(err, ShouldBeNil) + ok := il.DirExists(path.Join(il.RootDir(), repoName)) + So(ok, ShouldBeTrue) + storeController := storage.StoreController{} + storeController.DefaultStore = il + So(storeController.GetImageStore("test"), ShouldResemble, il) }) Convey("Validate repo", func() { @@ -78,6 +85,13 @@ func TestAPIs(t *testing.T) { So(err, ShouldBeNil) So(v, ShouldNotBeEmpty) + err = il.DeleteBlobUpload("test", v) + So(err, ShouldBeNil) + + v, err = il.NewBlobUpload("test") + So(err, ShouldBeNil) + So(v, ShouldNotBeEmpty) + Convey("Get blob upload", func() { b, err := il.GetBlobUpload("test", "invalid") So(err, ShouldNotBeNil) @@ -92,17 +106,40 @@ func TestAPIs(t *testing.T) { So(b, ShouldBeGreaterThanOrEqualTo, 0) content := []byte("test-data1") + firstChunkContent := []byte("test") + firstChunkBuf := bytes.NewBuffer(firstChunkContent) + secondChunkContent := []byte("-data1") + secondChunkBuf := bytes.NewBuffer(secondChunkContent) + firstChunkLen := firstChunkBuf.Len() + secondChunkLen := secondChunkBuf.Len() + buf := bytes.NewBuffer(content) l := buf.Len() d := godigest.FromBytes(content) - b, err = il.PutBlobChunk("test", v, 0, int64(l), buf) - So(err, ShouldBeNil) - So(b, ShouldEqual, l) blobDigest := d + // invalid chunk range + _, err = il.PutBlobChunk("test", v, 10, int64(l), buf) + So(err, ShouldNotBeNil) + + b, err = il.PutBlobChunk("test", v, 0, int64(firstChunkLen), firstChunkBuf) + So(err, ShouldBeNil) + So(b, ShouldEqual, firstChunkLen) + + b, err = il.GetBlobUpload("test", v) + So(err, ShouldBeNil) + So(b, ShouldEqual, int64(firstChunkLen)) + + b, err = il.BlobUploadInfo("test", v) + So(err, ShouldBeNil) + So(b, ShouldEqual, int64(firstChunkLen)) + + b, err = il.PutBlobChunk("test", v, int64(firstChunkLen), int64(l), secondChunkBuf) + So(err, ShouldBeNil) + So(b, ShouldEqual, secondChunkLen) + err = il.FinishBlobUpload("test", v, buf, d.String()) So(err, ShouldBeNil) - So(b, ShouldEqual, l) _, _, err = il.CheckBlob("test", d.String()) So(err, ShouldBeNil) @@ -121,11 +158,62 @@ func TestAPIs(t *testing.T) { _, err = il.PutImageManifest("test", d.String(), ispec.MediaTypeImageManifest, []byte{}) So(err, ShouldNotBeNil) + _, err = il.PutImageManifest("test", d.String(), ispec.MediaTypeImageManifest, []byte(`{"test":true}`)) + So(err, ShouldNotBeNil) + _, err = il.PutImageManifest("test", d.String(), ispec.MediaTypeImageManifest, mb) So(err, ShouldNotBeNil) _, _, _, err = il.GetImageManifest("test", d.String()) So(err, ShouldNotBeNil) + + _, _, _, err = il.GetImageManifest("inexistent", d.String()) + So(err, ShouldNotBeNil) + + annotationsMap := make(map[string]string) + annotationsMap[ispec.AnnotationRefName] = "1.0" + m := ispec.Manifest{ + Config: ispec.Descriptor{ + Digest: d, + Size: int64(l), + }, + Layers: []ispec.Descriptor{ + { + MediaType: "application/vnd.oci.image.layer.v1.tar", + Digest: d, + Size: int64(l), + }, + }, + Annotations: annotationsMap, + } + + m.SchemaVersion = 2 + mb, _ = json.Marshal(m) + d := godigest.FromBytes(mb) + + So(os.Chmod(path.Join(il.RootDir(), "test", "index.json"), 0000), ShouldBeNil) + _, err = il.PutImageManifest("test", "1.0", ispec.MediaTypeImageManifest, mb) + So(err, ShouldNotBeNil) + + So(os.Chmod(path.Join(il.RootDir(), "test", "index.json"), 0755), ShouldBeNil) + _, err = il.PutImageManifest("test", "1.0", ispec.MediaTypeImageManifest, mb) + So(err, ShouldBeNil) + + manifestPath := path.Join(il.RootDir(), "test", "blobs", d.Algorithm().String(), d.Encoded()) + + So(os.Chmod(manifestPath, 0000), ShouldBeNil) + _, _, _, err = il.GetImageManifest("test", d.String()) + So(err, ShouldNotBeNil) + + So(os.Remove(manifestPath), ShouldBeNil) + _, _, _, err = il.GetImageManifest("test", d.String()) + So(err, ShouldNotBeNil) + + So(os.Chmod(path.Join(il.RootDir(), "test"), 0000), ShouldBeNil) + _, err = il.PutImageManifest("test", "2.0", ispec.MediaTypeImageManifest, mb) + So(err, ShouldNotBeNil) + So(os.Chmod(path.Join(il.RootDir(), "test"), 0755), ShouldBeNil) + So(os.RemoveAll(path.Join(il.RootDir(), "test")), ShouldBeNil) }) Convey("Good image manifest", func() { @@ -147,8 +235,20 @@ func TestAPIs(t *testing.T) { } m.SchemaVersion = 2 - mb, _ := json.Marshal(m) + mb, _ = json.Marshal(m) d := godigest.FromBytes(mb) + + // bad manifest + m.Layers[0].Digest = godigest.FromBytes([]byte("inexistent")) + badMb, _ := json.Marshal(m) + + _, err = il.PutImageManifest("test", "1.0", ispec.MediaTypeImageManifest, badMb) + So(err, ShouldNotBeNil) + + _, err = il.PutImageManifest("test", "1.0", ispec.MediaTypeImageManifest, mb) + So(err, ShouldBeNil) + + // same manifest for coverage _, err = il.PutImageManifest("test", "1.0", ispec.MediaTypeImageManifest, mb) So(err, ShouldBeNil) @@ -158,6 +258,9 @@ func TestAPIs(t *testing.T) { _, err = il.PutImageManifest("test", "3.0", ispec.MediaTypeImageManifest, mb) So(err, ShouldBeNil) + _, err = il.GetImageTags("inexistent") + So(err, ShouldNotBeNil) + // total tags should be 3 but they have same reference. tags, err := il.GetImageTags("test") So(err, ShouldBeNil) @@ -166,6 +269,9 @@ func TestAPIs(t *testing.T) { _, _, _, err = il.GetImageManifest("test", d.String()) So(err, ShouldBeNil) + _, _, _, err = il.GetImageManifest("test", "3.0") + So(err, ShouldBeNil) + err = il.DeleteImageManifest("test", "1.0") So(err, ShouldBeNil) @@ -178,6 +284,15 @@ func TestAPIs(t *testing.T) { So(err, ShouldBeNil) So(hasBlob, ShouldEqual, true) + // invalid DeleteImageManifest + indexPath := path.Join(il.RootDir(), "test", "index.json") + So(os.Chmod(indexPath, 0000), ShouldBeNil) + + err = il.DeleteImageManifest("test", d.String()) + So(err, ShouldNotBeNil) + + So(os.Chmod(indexPath, 0755), ShouldBeNil) + // If we pass reference all manifest with input reference should be deleted. err = il.DeleteImageManifest("test", d.String()) So(err, ShouldBeNil) @@ -191,6 +306,12 @@ func TestAPIs(t *testing.T) { So(err, ShouldNotBeNil) So(hasBlob, ShouldEqual, false) + err = il.DeleteBlob("test", "inexistent") + So(err, ShouldNotBeNil) + + err = il.DeleteBlob("test", godigest.FromBytes([]byte("inexistent")).String()) + So(err, ShouldNotBeNil) + err = il.DeleteBlob("test", blobDigest.String()) So(err, ShouldBeNil) @@ -209,6 +330,9 @@ func TestAPIs(t *testing.T) { So(v, ShouldNotBeEmpty) Convey("Get blob upload", func() { + err = il.FinishBlobUpload("test", v, bytes.NewBuffer([]byte{}), "inexistent") + So(err, ShouldNotBeNil) + b, err := il.GetBlobUpload("test", "invalid") So(err, ShouldNotBeNil) So(b, ShouldEqual, -1) @@ -217,6 +341,9 @@ func TestAPIs(t *testing.T) { So(err, ShouldBeNil) So(b, ShouldBeGreaterThanOrEqualTo, 0) + _, err = il.BlobUploadInfo("test", "inexistent") + So(err, ShouldNotBeNil) + b, err = il.BlobUploadInfo("test", v) So(err, ShouldBeNil) So(b, ShouldBeGreaterThanOrEqualTo, 0) @@ -229,24 +356,50 @@ func TestAPIs(t *testing.T) { So(err, ShouldBeNil) So(b, ShouldEqual, l) + _, err = il.PutBlobChunkStreamed("test", "inexistent", buf) + So(err, ShouldNotBeNil) + + err = il.FinishBlobUpload("test", "inexistent", buf, d.String()) + So(err, ShouldNotBeNil) + err = il.FinishBlobUpload("test", v, buf, d.String()) So(err, ShouldBeNil) - So(b, ShouldEqual, l) _, _, err = il.CheckBlob("test", d.String()) So(err, ShouldBeNil) + _, _, err = il.GetBlob("test", "inexistent", "application/vnd.oci.image.layer.v1.tar+gzip") + So(err, ShouldNotBeNil) + _, _, err = il.GetBlob("test", d.String(), "application/vnd.oci.image.layer.v1.tar+gzip") So(err, ShouldBeNil) + blobContent, err := il.GetBlobContent("test", d.String()) + So(err, ShouldBeNil) + So(content, ShouldResemble, blobContent) + + _, err = il.GetBlobContent("inexistent", d.String()) + So(err, ShouldNotBeNil) + m := ispec.Manifest{} m.SchemaVersion = 2 mb, _ := json.Marshal(m) + Convey("Bad digests", func() { + _, _, err := il.FullBlobUpload("test", bytes.NewBuffer([]byte{}), "inexistent") + So(err, ShouldNotBeNil) + + _, _, err = il.CheckBlob("test", "inexistent") + So(err, ShouldNotBeNil) + }) + Convey("Bad image manifest", func() { _, err = il.PutImageManifest("test", d.String(), ispec.MediaTypeImageManifest, mb) So(err, ShouldNotBeNil) + _, err = il.PutImageManifest("test", d.String(), ispec.MediaTypeImageManifest, []byte("bad json")) + So(err, ShouldNotBeNil) + _, _, _, err = il.GetImageManifest("test", d.String()) So(err, ShouldNotBeNil) }) @@ -265,18 +418,38 @@ func TestAPIs(t *testing.T) { }, }, } + m.SchemaVersion = 2 mb, _ = json.Marshal(m) d := godigest.FromBytes(mb) _, err = il.PutImageManifest("test", d.String(), ispec.MediaTypeImageManifest, mb) So(err, ShouldBeNil) + // same manifest for coverage + _, err = il.PutImageManifest("test", d.String(), ispec.MediaTypeImageManifest, mb) + So(err, ShouldBeNil) + _, _, _, err = il.GetImageManifest("test", d.String()) So(err, ShouldBeNil) + _, err = il.GetIndexContent("inexistent") + So(err, ShouldNotBeNil) + + indexContent, err := il.GetIndexContent("test") + So(err, ShouldBeNil) + + var index ispec.Index + + err = json.Unmarshal(indexContent, &index) + So(err, ShouldBeNil) + + So(len(index.Manifests), ShouldEqual, 1) err = il.DeleteImageManifest("test", "1.0") So(err, ShouldNotBeNil) + err = il.DeleteImageManifest("inexistent", "1.0") + So(err, ShouldNotBeNil) + err = il.DeleteImageManifest("test", d.String()) So(err, ShouldBeNil) @@ -508,7 +681,7 @@ func TestAPIs(t *testing.T) { func TestDedupe(t *testing.T) { Convey("Dedupe", t, func(c C) { Convey("Nil ImageStore", func() { - is := &storage.ImageStore{} + var is storage.ImageStore So(func() { _ = is.DedupeBlob("", "", "") }, ShouldPanic) }) @@ -645,8 +818,8 @@ func TestNegativeCases(t *testing.T) { }) Convey("Invalid get image tags", t, func(c C) { - il := &storage.ImageStore{} - _, err := il.GetImageTags("test") + var ilfs storage.ImageStoreFS + _, err := ilfs.GetImageTags("test") So(err, ShouldNotBeNil) dir, err := ioutil.TempDir("", "oci-repo-test") @@ -654,7 +827,7 @@ func TestNegativeCases(t *testing.T) { panic(err) } defer os.RemoveAll(dir) - il = storage.NewImageStore(dir, true, true, log.Logger{Logger: zerolog.New(os.Stdout)}) + il := storage.NewImageStore(dir, true, true, log.Logger{Logger: zerolog.New(os.Stdout)}) So(il, ShouldNotBeNil) So(il.InitRepo("test"), ShouldBeNil) So(os.Remove(path.Join(dir, "test", "index.json")), ShouldBeNil) @@ -668,8 +841,8 @@ func TestNegativeCases(t *testing.T) { }) Convey("Invalid get image manifest", t, func(c C) { - il := &storage.ImageStore{} - _, _, _, err := il.GetImageManifest("test", "") + var ilfs storage.ImageStoreFS + _, _, _, err := ilfs.GetImageManifest("test", "") So(err, ShouldNotBeNil) dir, err := ioutil.TempDir("", "oci-repo-test") @@ -677,9 +850,12 @@ func TestNegativeCases(t *testing.T) { panic(err) } defer os.RemoveAll(dir) - il = storage.NewImageStore(dir, true, true, log.Logger{Logger: zerolog.New(os.Stdout)}) + il := storage.NewImageStore(dir, true, true, log.Logger{Logger: zerolog.New(os.Stdout)}) So(il, ShouldNotBeNil) So(il.InitRepo("test"), ShouldBeNil) + So(os.Chmod(path.Join(dir, "test", "index.json"), 0000), ShouldBeNil) + _, _, _, err = il.GetImageManifest("test", "") + So(err, ShouldNotBeNil) So(os.Remove(path.Join(dir, "test", "index.json")), ShouldBeNil) _, _, _, err = il.GetImageManifest("test", "") So(err, ShouldNotBeNil) @@ -690,7 +866,47 @@ func TestNegativeCases(t *testing.T) { So(err, ShouldNotBeNil) }) - Convey("Invalid dedupe sceanrios", t, func() { + Convey("Invalid new blob upload", t, func(c C) { + dir, err := ioutil.TempDir("", "oci-repo-test") + if err != nil { + panic(err) + } + defer os.RemoveAll(dir) + + il := storage.NewImageStore(dir, true, true, log.Logger{Logger: zerolog.New(os.Stdout)}) + So(il, ShouldNotBeNil) + So(il.InitRepo("test"), ShouldBeNil) + + So(os.Chmod(path.Join(dir, "test", ".uploads"), 0000), ShouldBeNil) + _, err = il.NewBlobUpload("test") + So(err, ShouldNotBeNil) + + So(os.Chmod(path.Join(dir, "test"), 0000), ShouldBeNil) + _, err = il.NewBlobUpload("test") + So(err, ShouldNotBeNil) + + So(os.Chmod(path.Join(dir, "test"), 0755), ShouldBeNil) + So(il.InitRepo("test"), ShouldBeNil) + + _, err = il.NewBlobUpload("test") + So(err, ShouldNotBeNil) + + So(os.Chmod(path.Join(dir, "test", ".uploads"), 0755), ShouldBeNil) + v, err := il.NewBlobUpload("test") + So(err, ShouldBeNil) + + So(os.Chmod(path.Join(dir, "test", ".uploads"), 0000), ShouldBeNil) + content := []byte("test-data3") + buf := bytes.NewBuffer(content) + l := buf.Len() + _, err = il.PutBlobChunkStreamed("test", v, buf) + So(err, ShouldNotBeNil) + + _, err = il.PutBlobChunk("test", v, 0, int64(l), buf) + So(err, ShouldNotBeNil) + }) + + Convey("Invalid dedupe scenarios", t, func() { dir, err := ioutil.TempDir("", "oci-repo-test") if err != nil { panic(err) @@ -826,7 +1042,7 @@ func TestStorageHandler(t *testing.T) { storeController.DefaultStore = firstStore - subStore := make(map[string]*storage.ImageStore) + subStore := make(map[string]storage.ImageStore) subStore["/a"] = secondStore subStore["/b"] = thirdStore